2 Commits

Author SHA1 Message Date
7625461929 Update Jenkinsfile 2024-10-12 16:43:41 +02:00
cclecle
8ba0ab5f5e chore: wip 2024-04-12 22:56:27 +01:00
9 changed files with 155 additions and 121 deletions

4
Jenkinsfile vendored
View File

@@ -184,7 +184,7 @@ pipeline {
sh("virtualenv --pip=embed --setuptools=embed --wheel=embed --no-periodic-update --activators bash,python TOOLS_ENV")
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade setuptools build pip")
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade 'copier==8.*' jinja2-slug toml")
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade 'copier==9.*' jinja2-slug toml")
sh(". ~/TEST_ENV/bin/activate && pip install --upgrade pip")
@@ -546,7 +546,7 @@ pipeline {
dir("gitrepo") {
junit 'helpers-results/cl_unit_test/*.xml'
// using cobertura format (= coverage xml format)
publishCoverage adapters: [cobertura(mergeToOneReport: true, path: "helpers-results/cl_unit_test_coverage/test_coverage.xml")]
recordCoverage(tools: [[parser: 'COBERTURA', pattern: 'helpers-results/cl_unit_test_coverage/test_coverage.xml']])
publishHTML([
reportDir: "helpers-results/cl_unit_test_coverage",
reportFiles: "index.html",

View File

@@ -41,10 +41,9 @@ from .rest_resource_plugin import (
)
from .rest_ACL import ACL_target_user, ACL_target_group, ACL_target_group_Any, ACL_record, ACL_rule
from .rest_resource import RestResourceBase
from .rest_login import (
RestResourceBaseLogin,
UserLogin,
)
from .rest_login import UserLogin
from .rest_resource_login import RestResourceBaseLogin
from .rest_exceptions import (
RestResourceException,
RestResourceLoginException,

View File

@@ -12,27 +12,20 @@
"""CLI interface module"""
from __future__ import annotations
from typing import Optional, ClassVar, TYPE_CHECKING
from typing import Optional, TYPE_CHECKING
from secrets import token_hex, compare_digest
from datetime import datetime, timedelta
from datetime import datetime
from pydantic import BaseModel
from .rest_types import rsrc_verb
from .rest_resource import RestResourceBase
# from .rest_resource import RestResourceBase
from . import rest_resource
from .rest_model import RestField
from .rest_ACL import ACL_record, ACL_target_group_Any, ACL_rule, ACL_target_user
from .rest_resource_plugin import ResourcePlugin_RestResourceBase_default
from .rest_exceptions import (
RestResourceLoginException_InvalidCredentials,
RestResourceLoginException_ClientChange,
RestResourceLoginException_SessionTimeout,
RestResourceLoginException_InvalidSession,
)
from .rest_ACL import ACL_record, ACL_target_group_Any, ACL_rule
if TYPE_CHECKING:
from .rest_request import RestRequest
from .rest_request_opt import RestRequestParams_RestResourceBase_PUT, RestRequestParams_RestResourceBase_GET
pass
class UserLogin(BaseModel):
@@ -46,21 +39,7 @@ class UserSession(BaseModel):
client: tuple[str, int] | tuple[()] | None
class ResourcePlugin_Login(ResourcePlugin_RestResourceBase_default):
ar_UserLogin: list[UserLogin] = []
def handle_resource_get(self, resource: Login, params: RestRequestParams_RestResourceBase_GET) -> Login:
return Login(username=self.get_user_login(), secret=None)
def handle_resource_put(self, resource: Login, params: RestRequestParams_RestResourceBase_PUT) -> Login:
if resource.username is None or resource.secret is None:
raise RestResourceLoginException_InvalidCredentials()
token = self.user_login(resource.username, resource.secret)
self.set_resp_cookie_value("Authorization", f"Bearer {token}")
return resource
class Login(RestResourceBase):
class Login(rest_resource.RestResourceBase):
username: Optional[str] = RestField(None)
secret: Optional[str] = RestField(
None,
@@ -70,63 +49,3 @@ class Login(RestResourceBase):
ACL_record(verbs=[rsrc_verb.GET], target=ACL_target_group_Any(), rule=ACL_rule.DENY),
],
)
class RestResourceBaseLogin(RestResourceBase):
_ar_user_login: ClassVar[list[UserLogin]] = []
_ar_user_session: dict[str, UserSession] = {}
_max_session_inactive: ClassVar[timedelta] = timedelta(minutes=20)
_max_session_time: ClassVar[timedelta] = timedelta(hours=12)
login: Login = RestField(default=Login(), plugin=ResourcePlugin_Login)
def get_new_cookie_expiration_date(self) -> datetime:
return datetime.now() + self._max_session_time
def _process_request_session(self, request: RestRequest) -> None:
# print(f"[TRACE] {type(self).__name__}->_process_request_session()")
# print(f"[TRACE] request: {id(request)}")
auth_cookie = request.get_cookie("Authorization")
if auth_cookie is not None:
if auth_cookie in self._ar_user_session:
# print(f"SESSION FOUND for {request.get_client()}")
if self._ar_user_session[auth_cookie].client != request.get_client():
del self._ar_user_session[auth_cookie]
raise RestResourceLoginException_ClientChange()
time_diff = datetime.now() - self._ar_user_session[auth_cookie].last_update
if time_diff > self._max_session_inactive:
del self._ar_user_session[auth_cookie]
raise RestResourceLoginException_SessionTimeout()
request.set_user(ACL_target_user(name=self._ar_user_session[auth_cookie].user_login.username))
# print("SESSION RECOVERED")
return
raise RestResourceLoginException_InvalidSession()
# print(f"non-connected user {request.get_client()}")
return
def user_login(self, user_name: str, user_secret: str, request: RestRequest) -> str:
already_failed: bool = False
for iter_user_login in self._ar_user_login:
username_ok: bool = compare_digest(user_name, iter_user_login.username)
secret_ok: bool = compare_digest(user_secret, iter_user_login.secret)
if username_ok is True:
if secret_ok is True and not already_failed:
return self._register_user_session(iter_user_login, request)
already_failed = True
else:
pass # pylint: disable=unnecessary-pass
pass # pylint: disable=unnecessary-pass
raise RestResourceLoginException_InvalidCredentials()
def _register_user_session(self, user_login: UserLogin, request: RestRequest) -> str:
token = token_hex(16)
new_user_session = UserSession(last_update=datetime.now(), user_login=user_login, client=request.get_client())
self._ar_user_session[f"Bearer {token}"] = new_user_session
return token

View File

@@ -13,7 +13,7 @@ from urllib.parse import urlparse, parse_qs
from pydantic import BaseModel, Field
from typeguard import check_type
from .rest_login import RestResourceBaseLogin
from .rest_types import rsrc_verb, T_AllSupportedFields
from .rest_request_opt import (
RestRequestParams_POST,
@@ -209,6 +209,8 @@ class RestRequest(Generic[_T_RestRequestParams]):
return None
def set_resp_cookie_value(self, key: str, value: str) -> None:
from .rest_resource_login import RestResourceBaseLogin
if not isinstance(self.root_resource, RestResourceBaseLogin):
raise RestResourceConfigException("root_resource must be RestResourceBaseLogin to use user_login")
expire_date = self.root_resource.get_new_cookie_expiration_date().strftime("%a, %d %b %Y %H:%M:%S GMT")

View File

@@ -12,6 +12,7 @@ import json
from pydantic import BaseModel
from .rest_types import rsrc_verb
from .helpers import _JSONEncoder, forward_exception

View File

@@ -14,7 +14,10 @@ from .rest_types import (
T_Dict,
T_DictValues,
)
from .rest_resource import RestResourceBase
# from .rest_resource import RestResourceBase
from . import rest_resource
from .rest_request import RequestFactory
from .rest_resource_plugin import (
ResourcePlugin_field,
@@ -49,7 +52,7 @@ if TYPE_CHECKING:
from .rest_request import RestRequest
_T_Resource = TypeVar("_T_Resource", T_DictValues, T_Dict, T_SupportedRESTFields, RestResourceBase)
_T_Resource = TypeVar("_T_Resource", T_DictValues, T_Dict, T_SupportedRESTFields, rest_resource.RestResourceBase)
class ResourceHandler(
@@ -105,7 +108,7 @@ class ResourceHandler(
raise RestResourceHandlerException("if req not set, url,verb must be setted")
else:
assert url is not None and verb is not None
assert isinstance(resource, RestResourceBase)
assert isinstance(resource, rest_resource.RestResourceBase)
if data is None:
data = {}
self.req = self._request_factory.get_RestRequest(resource, url, verb, data, query_string)
@@ -171,7 +174,7 @@ class ResourceHandler(
# reveal_type(_next_resource)
# print(f"[DEBUG] next_resource = {type(next_resource).__name__}")
if isinstance(_next_resource, (RestResourceBase, dict)) or type(_next_resource) in _T_SupportedRESTFields:
if isinstance(_next_resource, (rest_resource.RestResourceBase, dict)) or type(_next_resource) in _T_SupportedRESTFields:
next_resource_handler_cls: type[ResourceHandler] = self._get_resource_handler(_next_resource, self.req)
self.saved_url = self.req.consume_url_stack(self._nb_url_element_to_consume_)
@@ -298,7 +301,9 @@ class ResourceHandler_dict(
assert self.prev_handler is not None
dict_key_type: T_T_DictKey = cast(RestResourceBase, self.prev_handler.resource)._dict_key_type_[self.req.get_resource_origin(1)]
dict_key_type: T_T_DictKey = cast(rest_resource.RestResourceBase, self.prev_handler.resource)._dict_key_type_[
self.req.get_resource_origin(1)
]
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
_dict_key: T_DictKey
@@ -337,16 +342,18 @@ class ResourceHandler_dict(
assert self.prev_handler is not None
dict_key_type: T_T_DictKey = cast(RestResourceBase, self.prev_handler.resource)._dict_key_type_[self.req.get_resource_origin(1)]
dict_key_type: T_T_DictKey = cast(rest_resource.RestResourceBase, self.prev_handler.resource)._dict_key_type_[
self.req.get_resource_origin(1)
]
dict_value_type: T_T_DictValues = cast(RestResourceBase, self.prev_handler.resource)._dict_value_type_[
dict_value_type: T_T_DictValues = cast(rest_resource.RestResourceBase, self.prev_handler.resource)._dict_value_type_[
self.req.get_resource_origin(1)
]
_obj: T_DictValues
if issubclass(dict_value_type, RestResourceBase):
if issubclass(dict_value_type, rest_resource.RestResourceBase):
_obj = dict_value_type(**self.req.get_data())
_obj_restrsrc = cast(RestResourceBase, _obj)
_obj_restrsrc = cast(rest_resource.RestResourceBase, _obj)
for key, _ in _obj_restrsrc.model_fields.items():
if key in _obj_restrsrc._plugins_:
@@ -374,14 +381,14 @@ class ResourceHandler_dict(
_dict_key = key_std
# if a primary key is set for the resource, updating it
if isinstance(_obj, RestResourceBase):
if isinstance(_obj, rest_resource.RestResourceBase):
if _obj._primary_key_ is not None:
setattr(_obj, _obj._primary_key_, _dict_key)
# 2nd try/ using provided resource internal primary key
# & 3rd try/ using resource internal auto-generated primary key
# => this case is automatic because if self.req.get_data() doesn't contain the key, it should be automatically created
elif isinstance(_obj, RestResourceBase):
elif isinstance(_obj, rest_resource.RestResourceBase):
if _obj._primary_key_ is not None:
_obj_primary_key: Optional[T_DictKey] = getattr(_obj, _obj._primary_key_)
if _obj_primary_key is not None:
@@ -440,7 +447,7 @@ class ResourceHandler_dict_elem(
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
assert isinstance(self.prev_handler.resource, RestResourceBase)
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
dict_key_type: T_T_DictKey = self.prev_handler.resource._dict_key_type_[self.req.get_resource_origin(1)]
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
@@ -472,7 +479,7 @@ class ResourceHandler_dict_elem(
# because self.req is another context that is not saved to improve performances
assert self.prev_handler is not None
assert isinstance(self.prev_handler.resource, RestResourceBase)
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
dict_key_type: T_T_DictKey = self.prev_handler.resource._dict_key_type_[self.req.get_resource_origin(2)]
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
@@ -500,7 +507,7 @@ class ResourceHandler_dict_elem(
@ResourceHandler.register_resource_handler
class ResourceHandler_RestResourceBase(
ResourceHandler[
RestResourceBase,
rest_resource.RestResourceBase,
_T_RestRequestParams_POST,
_T_RestRequestParams_DELETE,
RestRequestParams_RestResourceBase_GET,
@@ -526,7 +533,7 @@ class ResourceHandler_RestResourceBase(
def _check_resource_handler(cls, resource: _T_Resource, req: RestRequest) -> bool:
# print(f"{cls.__name__}->_check_resource_handler()")
return isinstance(resource, RestResourceBase)
return isinstance(resource, rest_resource.RestResourceBase)
def _check_access_rights(self) -> None:
super()._check_access_rights()
@@ -551,7 +558,7 @@ class ResourceHandler_RestResourceBase(
if self.resource.model_fields[self.req.get_resource_origin(0)].exclude is True and self.req.get_verb() is rsrc_verb.GET:
raise RestResourceHandlerException_ResourceNotFound(f"Not allowed READ access detected: {self.req.get_url_stack()}")
def _handle_process_get(self, params) -> RestResourceBase:
def _handle_process_get(self, params) -> rest_resource.RestResourceBase:
# print(f"{type(self).__name__}->_process_get()")
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
@@ -606,7 +613,7 @@ class ResourceHandler_RestResourceBase(
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
assert isinstance(self.resource, RestResourceBase)
assert isinstance(self.resource, rest_resource.RestResourceBase)
self.resource.check_acl_self(self.req, self.req.get_data())
@@ -631,7 +638,7 @@ class ResourceHandler_RestResourceBase(
if (
isinstance(self.prev_handler.resource, dict)
and self.prev_handler.prev_handler is not None
and isinstance(self.prev_handler.prev_handler.resource, RestResourceBase)
and isinstance(self.prev_handler.prev_handler.resource, rest_resource.RestResourceBase)
):
key = self.req.get_resource_origin(2)
dict_key_type: T_T_DictKey = self.prev_handler.prev_handler.resource._dict_key_type_[self.req.get_resource_origin(2)]
@@ -662,7 +669,7 @@ class ResourceHandler_RestResourceBase(
_dict[_dict_key] = _new_resrc
# element is within a RestResourceBase
elif isinstance(self.prev_handler.resource, RestResourceBase):
elif isinstance(self.prev_handler.resource, rest_resource.RestResourceBase):
key = self.req.get_resource_origin(1)
if key in self.prev_handler.resource._plugins_:
plugin_rsrc: ResourcePlugin_RestResourceBase = cast(
@@ -689,7 +696,7 @@ class ResourceHandler_RestResourceBase(
self.prev_handler is not None
and isinstance(self.prev_handler.resource, dict)
and self.prev_handler.prev_handler is not None
and isinstance(self.prev_handler.prev_handler.resource, RestResourceBase)
and isinstance(self.prev_handler.prev_handler.resource, rest_resource.RestResourceBase)
):
self.prev_handler._process_delete()
else:
@@ -717,7 +724,7 @@ class ResourceHandler_simple(
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
assert isinstance(self.prev_handler.resource, RestResourceBase)
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
self.prev_handler.resource.check_acl_field(self.req, 1)
@@ -735,7 +742,7 @@ class ResourceHandler_simple(
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
assert isinstance(self.prev_handler.resource, RestResourceBase)
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
self.prev_handler.resource.check_acl_field(self.req, 1)

View File

@@ -0,0 +1,83 @@
from __future__ import annotations
from typing import ClassVar, TYPE_CHECKING
from secrets import token_hex, compare_digest
from datetime import datetime, timedelta
from .rest_resource import RestResourceBase
from .rest_model import RestField
from .rest_ACL import ACL_target_user
from .rest_resource_plugin_login import ResourcePlugin_Login
from .rest_login import UserSession, Login
from .rest_exceptions import (
RestResourceLoginException_InvalidCredentials,
RestResourceLoginException_ClientChange,
RestResourceLoginException_SessionTimeout,
RestResourceLoginException_InvalidSession,
)
if TYPE_CHECKING:
from .rest_request import RestRequest
from .rest_login import UserLogin
class RestResourceBaseLogin(RestResourceBase):
_ar_user_login: ClassVar[list[UserLogin]] = []
_ar_user_session: dict[str, UserSession] = {}
_max_session_inactive: ClassVar[timedelta] = timedelta(minutes=20)
_max_session_time: ClassVar[timedelta] = timedelta(hours=12)
login: Login = RestField(default=Login(), plugin=ResourcePlugin_Login)
def get_new_cookie_expiration_date(self) -> datetime:
return datetime.now() + self._max_session_time
def _process_request_session(self, request: RestRequest) -> None:
# print(f"[TRACE] {type(self).__name__}->_process_request_session()")
# print(f"[TRACE] request: {id(request)}")
auth_cookie = request.get_cookie("Authorization")
if auth_cookie is not None:
if auth_cookie in self._ar_user_session:
# print(f"SESSION FOUND for {request.get_client()}")
if self._ar_user_session[auth_cookie].client != request.get_client():
del self._ar_user_session[auth_cookie]
raise RestResourceLoginException_ClientChange()
time_diff = datetime.now() - self._ar_user_session[auth_cookie].last_update
if time_diff > self._max_session_inactive:
del self._ar_user_session[auth_cookie]
raise RestResourceLoginException_SessionTimeout()
request.set_user(ACL_target_user(name=self._ar_user_session[auth_cookie].user_login.username))
# print("SESSION RECOVERED")
return
raise RestResourceLoginException_InvalidSession()
# print(f"non-connected user {request.get_client()}")
return
def user_login(self, user_name: str, user_secret: str, request: RestRequest) -> str:
already_failed: bool = False
for iter_user_login in self._ar_user_login:
username_ok: bool = compare_digest(user_name, iter_user_login.username)
secret_ok: bool = compare_digest(user_secret, iter_user_login.secret)
if username_ok is True:
if secret_ok is True and not already_failed:
return self._register_user_session(iter_user_login, request)
already_failed = True
else:
pass # pylint: disable=unnecessary-pass
pass # pylint: disable=unnecessary-pass
raise RestResourceLoginException_InvalidCredentials()
def _register_user_session(self, user_login: UserLogin, request: RestRequest) -> str:
token = token_hex(16)
new_user_session = UserSession(last_update=datetime.now(), user_login=user_login, client=request.get_client())
self._ar_user_session[f"Bearer {token}"] = new_user_session
return token

View File

@@ -12,7 +12,6 @@ from .rest_types import (
)
from .rest_exceptions import RestResourceConfigException
if TYPE_CHECKING:
from .rest_request import RestRequest
from .rest_resource import RestResourceBase
@@ -39,7 +38,7 @@ class ResourcePlugin(ABC):
self.__root_resource = root_resource
def user_login(self, user_name: str, user_secret: str) -> str:
from .rest_login import RestResourceBaseLogin
from .rest_resource_login import RestResourceBaseLogin
if not isinstance(self.__root_resource, RestResourceBaseLogin):
raise RestResourceConfigException("root_resource must be RestResourceBaseLogin to use user_login")
@@ -55,7 +54,7 @@ class ResourcePlugin(ABC):
self.__request.reset_resp_cookie(key)
def get_new_cookie_expiration_date(self) -> datetime:
from .rest_login import RestResourceBaseLogin
from .rest_resource_login import RestResourceBaseLogin
if not isinstance(self.__root_resource, RestResourceBaseLogin):
raise RestResourceConfigException("root_resource must be RestResourceBaseLogin to use get_new_cookie_expiration_date")

View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from .rest_resource_plugin import ResourcePlugin_RestResourceBase_default
from .rest_login import UserLogin, Login
from .rest_exceptions import RestResourceLoginException_InvalidCredentials
if TYPE_CHECKING:
from .rest_request_opt import RestRequestParams_RestResourceBase_PUT, RestRequestParams_RestResourceBase_GET
class ResourcePlugin_Login(ResourcePlugin_RestResourceBase_default):
ar_UserLogin: list[UserLogin] = []
def handle_resource_get(self, resource: Login, params: RestRequestParams_RestResourceBase_GET) -> Login:
return Login(username=self.get_user_login(), secret=None)
def handle_resource_put(self, resource: Login, params: RestRequestParams_RestResourceBase_PUT) -> Login:
if resource.username is None or resource.secret is None:
raise RestResourceLoginException_InvalidCredentials()
token = self.user_login(resource.username, resource.secret)
self.set_resp_cookie_value("Authorization", f"Bearer {token}")
return resource