3 Commits

Author SHA1 Message Date
7625461929 Update Jenkinsfile 2024-10-12 16:43:41 +02:00
cclecle
8ba0ab5f5e chore: wip 2024-04-12 22:56:27 +01:00
cclecle
c05b541148 improve code quality 2023-11-11 17:17:35 +00:00
15 changed files with 204 additions and 182 deletions

4
Jenkinsfile vendored
View File

@@ -184,7 +184,7 @@ pipeline {
sh("virtualenv --pip=embed --setuptools=embed --wheel=embed --no-periodic-update --activators bash,python TOOLS_ENV")
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade setuptools build pip")
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade 'copier==8.*' jinja2-slug toml")
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade 'copier==9.*' jinja2-slug toml")
sh(". ~/TEST_ENV/bin/activate && pip install --upgrade pip")
@@ -546,7 +546,7 @@ pipeline {
dir("gitrepo") {
junit 'helpers-results/cl_unit_test/*.xml'
// using cobertura format (= coverage xml format)
publishCoverage adapters: [cobertura(mergeToOneReport: true, path: "helpers-results/cl_unit_test_coverage/test_coverage.xml")]
recordCoverage(tools: [[parser: 'COBERTURA', pattern: 'helpers-results/cl_unit_test_coverage/test_coverage.xml']])
publishHTML([
reportDir: "helpers-results/cl_unit_test_coverage",
reportFiles: "index.html",

View File

@@ -50,6 +50,9 @@ where = ["src"]
[tool.setuptools.package-data]
"pyrestresource" = ["py.typed"]
[tool.pylint.main]
disable = ["missing-class-docstring","missing-function-docstring","missing-module-docstring"]
# [[tool.mypy.overrides]]
# module = ""
# ignore_missing_imports = true

View File

@@ -41,10 +41,9 @@ from .rest_resource_plugin import (
)
from .rest_ACL import ACL_target_user, ACL_target_group, ACL_target_group_Any, ACL_record, ACL_rule
from .rest_resource import RestResourceBase
from .rest_login import (
RestResourceBaseLogin,
UserLogin,
)
from .rest_login import UserLogin
from .rest_resource_login import RestResourceBaseLogin
from .rest_exceptions import (
RestResourceException,
RestResourceLoginException,

View File

@@ -36,5 +36,4 @@ def parse_dict_cookies(cookies: str) -> dict[str, str | None]:
def forward_exception(e: Exception, forward: bool) -> None:
if forward:
raise e from None
else:
traceback.print_exc()
traceback.print_exc()

View File

@@ -1,9 +1,10 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from pydantic import BaseModel
from enum import Enum, auto
from pydantic import BaseModel
from .rest_types import rsrc_verb
if TYPE_CHECKING:

View File

@@ -12,27 +12,20 @@
"""CLI interface module"""
from __future__ import annotations
from typing import Optional, ClassVar, TYPE_CHECKING
from typing import Optional, TYPE_CHECKING
from secrets import token_hex, compare_digest
from datetime import datetime, timedelta
from datetime import datetime
from pydantic import BaseModel
from .rest_types import rsrc_verb
from .rest_resource import RestResourceBase
# from .rest_resource import RestResourceBase
from . import rest_resource
from .rest_model import RestField
from .rest_ACL import ACL_record, ACL_target_group_Any, ACL_rule, ACL_target_user
from .rest_resource_plugin import ResourcePlugin_RestResourceBase_default
from .rest_exceptions import (
RestResourceLoginException_InvalidCredentials,
RestResourceLoginException_ClientChange,
RestResourceLoginException_SessionTimeout,
RestResourceLoginException_InvalidSession,
)
from .rest_ACL import ACL_record, ACL_target_group_Any, ACL_rule
if TYPE_CHECKING:
from .rest_request import RestRequest
from .rest_request_opt import RestRequestParams_RestResourceBase_PUT, RestRequestParams_RestResourceBase_GET
pass
class UserLogin(BaseModel):
@@ -46,21 +39,7 @@ class UserSession(BaseModel):
client: tuple[str, int] | tuple[()] | None
class ResourcePlugin_Login(ResourcePlugin_RestResourceBase_default):
ar_UserLogin: list[UserLogin] = []
def handle_resource_get(self, resource: Login, params: RestRequestParams_RestResourceBase_GET) -> Login:
return Login(username=self.get_user_login(), secret=None)
def handle_resource_put(self, resource: Login, params: RestRequestParams_RestResourceBase_PUT) -> Login:
if resource.username is None or resource.secret is None:
raise RestResourceLoginException_InvalidCredentials()
token = self.user_login(resource.username, resource.secret)
self.set_resp_cookie_value("Authorization", f"Bearer {token}")
return resource
class Login(RestResourceBase):
class Login(rest_resource.RestResourceBase):
username: Optional[str] = RestField(None)
secret: Optional[str] = RestField(
None,
@@ -70,64 +49,3 @@ class Login(RestResourceBase):
ACL_record(verbs=[rsrc_verb.GET], target=ACL_target_group_Any(), rule=ACL_rule.DENY),
],
)
class RestResourceBaseLogin(RestResourceBase):
_ar_user_login: ClassVar[list[UserLogin]] = []
_ar_user_session: dict[str, UserSession] = {}
_max_session_inactive: ClassVar[timedelta] = timedelta(minutes=20)
_max_session_time: ClassVar[timedelta] = timedelta(hours=12)
login: Login = RestField(default=Login(), plugin=ResourcePlugin_Login)
def get_new_cookie_expiration_date(self) -> datetime:
return datetime.now() + self._max_session_time
def _process_request_session(self, request: RestRequest) -> None:
# print(f"[TRACE] {type(self).__name__}->_process_request_session()")
# print(f"[TRACE] request: {id(request)}")
auth_cookie = request.get_cookie("Authorization")
if auth_cookie != None:
if auth_cookie in self._ar_user_session:
# print(f"SESSION FOUND for {request.get_client()}")
if self._ar_user_session[auth_cookie].client != request.get_client():
del self._ar_user_session[auth_cookie]
raise RestResourceLoginException_ClientChange()
time_diff = datetime.now() - self._ar_user_session[auth_cookie].last_update
if time_diff > self._max_session_inactive:
del self._ar_user_session[auth_cookie]
raise RestResourceLoginException_SessionTimeout()
request.set_user(ACL_target_user(name=self._ar_user_session[auth_cookie].user_login.username))
# print("SESSION RECOVERED")
return
raise RestResourceLoginException_InvalidSession()
return
# print(f"non-connected user {request.get_client()}")
def user_login(self, user_name: str, user_secret: str, request: RestRequest) -> str:
already_failed: bool = False
for iter_user_login in self._ar_user_login:
username_ok: bool = compare_digest(user_name, iter_user_login.username)
secret_ok: bool = compare_digest(user_secret, iter_user_login.secret)
if username_ok is True:
if secret_ok is True and not already_failed:
return self._register_user_session(iter_user_login, request)
else:
already_failed = True
else:
pass
pass
raise RestResourceLoginException_InvalidCredentials()
def _register_user_session(self, user_login: UserLogin, request: RestRequest) -> str:
token = token_hex(16)
new_user_session = UserSession(last_update=datetime.now(), user_login=user_login, client=request.get_client())
self._ar_user_session[f"Bearer {token}"] = new_user_session
return token

View File

@@ -12,7 +12,7 @@ if TYPE_CHECKING:
from pydantic.fields import _EmptyKwargs, AliasPath, AliasChoices
def RestField(
def RestField( # pylint: disable=too-many-locals
default: Any = PydanticUndefined,
*,
default_factory: Callable[[], Any] | None = _Unset,
@@ -28,7 +28,7 @@ def RestField(
json_schema_extra: dict[str, Any] | Callable[[dict[str, Any]], None] | None = _Unset,
frozen: bool | None = _Unset,
validate_default: bool | None = _Unset,
repr: bool = _Unset,
repr: bool = _Unset, # pylint: disable=redefined-builtin
init_var: bool | None = _Unset,
kw_only: bool | None = _Unset,
pattern: str | None = _Unset,

View File

@@ -13,7 +13,7 @@ from urllib.parse import urlparse, parse_qs
from pydantic import BaseModel, Field
from typeguard import check_type
from .rest_login import RestResourceBaseLogin
from .rest_types import rsrc_verb, T_AllSupportedFields
from .rest_request_opt import (
RestRequestParams_POST,
@@ -96,11 +96,11 @@ class RequestFactory(
request.update_ReqParams(self.cls_RestRequestParams_DELETE)
else:
raise RestResourceHandlerException_MethodNotAllowed("Invalid Verb")
return
class RestRequest(Generic[_T_RestRequestParams]):
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-public-methods
"""Main RestRequets class"""
def __init__(
@@ -206,15 +206,15 @@ class RestRequest(Generic[_T_RestRequestParams]):
return None
if isinstance(self.headers["cookie"], dict):
return self.headers["cookie"][key]
else:
return None
return None
def set_resp_cookie_value(self, key: str, value: str) -> None:
from .rest_resource_login import RestResourceBaseLogin
if not isinstance(self.root_resource, RestResourceBaseLogin):
raise RestResourceConfigException("root_resource must be RestResourceBaseLogin to use user_login")
self.outgoing_cookie[
key
] = f"{value}; expires={self.root_resource.get_new_cookie_expiration_date().strftime('%a, %d %b %Y %H:%M:%S GMT')}; path=/; HttpOnly"
expire_date = self.root_resource.get_new_cookie_expiration_date().strftime("%a, %d %b %Y %H:%M:%S GMT")
self.outgoing_cookie[key] = f"{value}; expires={expire_date}; path=/; HttpOnly"
def reset_resp_cookie(self, key: str) -> None:
self.outgoing_cookie[key] = "null; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT"

View File

@@ -12,6 +12,7 @@ import json
from pydantic import BaseModel
from .rest_types import rsrc_verb
from .helpers import _JSONEncoder, forward_exception
@@ -35,6 +36,7 @@ from .rest_exceptions import (
RestResourceException,
)
if TYPE_CHECKING:
from .rest_request import RestRequest
from .rest_types import T_SupportedRESTFields
@@ -43,6 +45,9 @@ if TYPE_CHECKING:
T_T_DictKey,
T_T_DictValues,
)
from .rest_resource_handler import (
ResourceHandler,
)
class RestResourceBase(ABC, BaseModel, validate_assignment=True):
@@ -96,7 +101,7 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
def check_acl_self(self, request: RestRequest, new_data: Optional[dict[str, T_SupportedRESTFields]]) -> None:
"""Check ACL on requested field operation (involving checking sub-fields)"""
if request.get_verb() is rsrc_verb.GET:
for key in self.model_fields.keys():
for key in self.model_fields:
self._check_acl(request.user, request.groups, rsrc_verb.GET, key)
elif request.get_verb() is rsrc_verb.PUT:
if new_data is not None:
@@ -178,7 +183,7 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
def _process_request_session(self, request: RestRequest) -> None:
pass
def process_request(
def process_request( # pylint: disable=too-complex
self,
url: str,
verb: rsrc_verb = rsrc_verb.GET,
@@ -188,10 +193,7 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
headers: Optional[list[Any]] = None,
http_mode: bool = False,
) -> RestRequest:
from .rest_resource_handler import (
ResourceHandler,
ResourceHandler_RestResourceBase,
)
from .rest_resource_handler import ResourceHandler_RestResourceBase
data: dict = {}
if data_json:

View File

@@ -1,3 +1,5 @@
# pylint: disable=protected-access
from __future__ import annotations
from typing import Optional, cast, TypeVar, Generic, Self, TYPE_CHECKING
@@ -12,7 +14,10 @@ from .rest_types import (
T_Dict,
T_DictValues,
)
from .rest_resource import RestResourceBase
# from .rest_resource import RestResourceBase
from . import rest_resource
from .rest_request import RequestFactory
from .rest_resource_plugin import (
ResourcePlugin_field,
@@ -40,14 +45,14 @@ from .rest_exceptions import (
RestResourceHandlerException_ResourceNotFound,
RestResourceHandlerException_MethodNotAllowed,
RestResourceHandlerException_BadRequest,
RestResourceHandlerException_Forbiden,
)
if TYPE_CHECKING:
from .rest_types import T_T_DictKey, T_T_DictValues
from .rest_request import RestRequest
_T_Resource = TypeVar("_T_Resource", T_DictValues, T_Dict, T_SupportedRESTFields, RestResourceBase)
_T_Resource = TypeVar("_T_Resource", T_DictValues, T_Dict, T_SupportedRESTFields, rest_resource.RestResourceBase)
class ResourceHandler(
@@ -103,7 +108,7 @@ class ResourceHandler(
raise RestResourceHandlerException("if req not set, url,verb must be setted")
else:
assert url is not None and verb is not None
assert isinstance(resource, RestResourceBase)
assert isinstance(resource, rest_resource.RestResourceBase)
if data is None:
data = {}
self.req = self._request_factory.get_RestRequest(resource, url, verb, data, query_string)
@@ -143,14 +148,6 @@ class ResourceHandler(
resource_handler = self._find_resource()
return resource_handler._process_verb()
def access_resource(
self,
) -> _T_Resource:
# print(f"[TRACE] {type(self).__name__}->access_resource()")
self._reset_context()
resource_handler = self._find_resource()
return resource_handler.resource
def _reset_context(self) -> None:
self.req.reset_url_stack()
@@ -177,11 +174,7 @@ class ResourceHandler(
# reveal_type(_next_resource)
# print(f"[DEBUG] next_resource = {type(next_resource).__name__}")
if (
isinstance(_next_resource, RestResourceBase)
or isinstance(_next_resource, dict)
or type(_next_resource) in _T_SupportedRESTFields
):
if isinstance(_next_resource, (rest_resource.RestResourceBase, dict)) or type(_next_resource) in _T_SupportedRESTFields:
next_resource_handler_cls: type[ResourceHandler] = self._get_resource_handler(_next_resource, self.req)
self.saved_url = self.req.consume_url_stack(self._nb_url_element_to_consume_)
@@ -308,7 +301,9 @@ class ResourceHandler_dict(
assert self.prev_handler is not None
dict_key_type: T_T_DictKey = cast(RestResourceBase, self.prev_handler.resource)._dict_key_type_[self.req.get_resource_origin(1)]
dict_key_type: T_T_DictKey = cast(rest_resource.RestResourceBase, self.prev_handler.resource)._dict_key_type_[
self.req.get_resource_origin(1)
]
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
_dict_key: T_DictKey
@@ -331,34 +326,34 @@ class ResourceHandler_dict(
if plugin_dict:
plugin_dict.handle_dict_delete(_dict, _dict_key, params)
return None
return
del _dict[_dict_key]
else:
if plugin_dict:
plugin_dict.handle_dict_delete_all(_dict, params)
return None
return
_dict.clear()
return
def _handle_process_post(self, params) -> Optional[T_DictKey]:
# pylint: disable=protected-access
def _handle_process_post(self, params) -> Optional[T_DictKey]: # pylint: disable=too-complex,too-many-branches
# print(f"{type(self).__name__}->_handle_process_post()")
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
dict_key_type: T_T_DictKey = cast(RestResourceBase, self.prev_handler.resource)._dict_key_type_[self.req.get_resource_origin(1)]
dict_key_type: T_T_DictKey = cast(rest_resource.RestResourceBase, self.prev_handler.resource)._dict_key_type_[
self.req.get_resource_origin(1)
]
dict_value_type: T_T_DictValues = cast(RestResourceBase, self.prev_handler.resource)._dict_value_type_[
dict_value_type: T_T_DictValues = cast(rest_resource.RestResourceBase, self.prev_handler.resource)._dict_value_type_[
self.req.get_resource_origin(1)
]
_obj: T_DictValues
if issubclass(dict_value_type, RestResourceBase):
if issubclass(dict_value_type, rest_resource.RestResourceBase):
_obj = dict_value_type(**self.req.get_data())
_obj_restrsrc = cast(RestResourceBase, _obj)
_obj_restrsrc = cast(rest_resource.RestResourceBase, _obj)
for key, _ in _obj_restrsrc.model_fields.items():
if key in _obj_restrsrc._plugins_:
@@ -386,14 +381,14 @@ class ResourceHandler_dict(
_dict_key = key_std
# if a primary key is set for the resource, updating it
if isinstance(_obj, RestResourceBase):
if isinstance(_obj, rest_resource.RestResourceBase):
if _obj._primary_key_ is not None:
setattr(_obj, _obj._primary_key_, _dict_key)
# 2nd try/ using provided resource internal primary key
# & 3rd try/ using resource internal auto-generated primary key
# => this case is automatic because if self.req.get_data() doesn't contain the key, it should be automatically created
elif isinstance(_obj, RestResourceBase):
elif isinstance(_obj, rest_resource.RestResourceBase):
if _obj._primary_key_ is not None:
_obj_primary_key: Optional[T_DictKey] = getattr(_obj, _obj._primary_key_)
if _obj_primary_key is not None:
@@ -407,13 +402,12 @@ class ResourceHandler_dict(
)
plugin_dict.set_context(self.req, self.req.get_root_resource())
return plugin_dict.handle_dict_post(_dict, _dict_key, _obj, params)
else:
_dict[_dict_key] = _obj
return _dict_key
else:
raise RestResourceHandlerException_BadRequest(
"Either the object needs defined primary key or the request must contain an API_key param to process this command"
)
_dict[_dict_key] = _obj
return _dict_key
raise RestResourceHandlerException_BadRequest(
"Either the object needs defined primary key or the request must contain an API_key param to process this command"
)
@ResourceHandler.register_resource_handler
@@ -453,7 +447,7 @@ class ResourceHandler_dict_elem(
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
assert isinstance(self.prev_handler.resource, RestResourceBase)
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
dict_key_type: T_T_DictKey = self.prev_handler.resource._dict_key_type_[self.req.get_resource_origin(1)]
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
@@ -485,7 +479,7 @@ class ResourceHandler_dict_elem(
# because self.req is another context that is not saved to improve performances
assert self.prev_handler is not None
assert isinstance(self.prev_handler.resource, RestResourceBase)
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
dict_key_type: T_T_DictKey = self.prev_handler.resource._dict_key_type_[self.req.get_resource_origin(2)]
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
@@ -505,14 +499,15 @@ class ResourceHandler_dict_elem(
)
plugin_dict.set_context(self.req, self.req.get_root_resource())
plugin_dict.handle_dict_delete(_dict, _dict_key, params)
return None
return
del _dict[_dict_key]
return
@ResourceHandler.register_resource_handler
class ResourceHandler_RestResourceBase(
ResourceHandler[
RestResourceBase,
rest_resource.RestResourceBase,
_T_RestRequestParams_POST,
_T_RestRequestParams_DELETE,
RestRequestParams_RestResourceBase_GET,
@@ -538,7 +533,7 @@ class ResourceHandler_RestResourceBase(
def _check_resource_handler(cls, resource: _T_Resource, req: RestRequest) -> bool:
# print(f"{cls.__name__}->_check_resource_handler()")
return isinstance(resource, RestResourceBase)
return isinstance(resource, rest_resource.RestResourceBase)
def _check_access_rights(self) -> None:
super()._check_access_rights()
@@ -563,7 +558,7 @@ class ResourceHandler_RestResourceBase(
if self.resource.model_fields[self.req.get_resource_origin(0)].exclude is True and self.req.get_verb() is rsrc_verb.GET:
raise RestResourceHandlerException_ResourceNotFound(f"Not allowed READ access detected: {self.req.get_url_stack()}")
def _handle_process_get(self, params) -> RestResourceBase:
def _handle_process_get(self, params) -> rest_resource.RestResourceBase:
# print(f"{type(self).__name__}->_process_get()")
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
@@ -574,7 +569,7 @@ class ResourceHandler_RestResourceBase(
if len(self.req.get_url_stack()) == 0:
self.resource.check_acl_self(self.req, None)
for key, attr in self.resource.model_fields.items():
for key, _ in self.resource.model_fields.items():
if key in self.resource._plugins_:
if isinstance(self.resource._plugins_[key], ResourcePlugin_field):
plugin_field = cast(ResourcePlugin_field, self.resource._plugins_[key])
@@ -592,7 +587,6 @@ class ResourceHandler_RestResourceBase(
return self.resource
# CASE 2: specific (operation) case for root Node
# TODO: this must probably be merged with the previous bloc
if self.req.get_resource_origin(0) == "/":
return self.resource
@@ -614,12 +608,12 @@ class ResourceHandler_RestResourceBase(
return value
def _handle_process_put(self, params) -> None:
def _handle_process_put(self, params) -> None: # pylint: disable=too-complex,too-many-branches
# print(f"{type(self).__name__}->_process_put()")
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
assert isinstance(self.resource, RestResourceBase)
assert isinstance(self.resource, rest_resource.RestResourceBase)
self.resource.check_acl_self(self.req, self.req.get_data())
@@ -644,7 +638,7 @@ class ResourceHandler_RestResourceBase(
if (
isinstance(self.prev_handler.resource, dict)
and self.prev_handler.prev_handler is not None
and isinstance(self.prev_handler.prev_handler.resource, RestResourceBase)
and isinstance(self.prev_handler.prev_handler.resource, rest_resource.RestResourceBase)
):
key = self.req.get_resource_origin(2)
dict_key_type: T_T_DictKey = self.prev_handler.prev_handler.resource._dict_key_type_[self.req.get_resource_origin(2)]
@@ -675,7 +669,7 @@ class ResourceHandler_RestResourceBase(
_dict[_dict_key] = _new_resrc
# element is within a RestResourceBase
elif isinstance(self.prev_handler.resource, RestResourceBase):
elif isinstance(self.prev_handler.resource, rest_resource.RestResourceBase):
key = self.req.get_resource_origin(1)
if key in self.prev_handler.resource._plugins_:
plugin_rsrc: ResourcePlugin_RestResourceBase = cast(
@@ -692,7 +686,6 @@ class ResourceHandler_RestResourceBase(
# print(self.resource)
# print(_new_resrc)
# print(_new_resrc.__dict__)
return
def _handle_process_delete(self, params) -> None:
# print(f"{type(self).__name__}->_handle_process_delete()")
@@ -703,7 +696,7 @@ class ResourceHandler_RestResourceBase(
self.prev_handler is not None
and isinstance(self.prev_handler.resource, dict)
and self.prev_handler.prev_handler is not None
and isinstance(self.prev_handler.prev_handler.resource, RestResourceBase)
and isinstance(self.prev_handler.prev_handler.resource, rest_resource.RestResourceBase)
):
self.prev_handler._process_delete()
else:
@@ -731,7 +724,7 @@ class ResourceHandler_simple(
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
assert isinstance(self.prev_handler.resource, RestResourceBase)
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
self.prev_handler.resource.check_acl_field(self.req, 1)
@@ -749,7 +742,7 @@ class ResourceHandler_simple(
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
assert isinstance(self.prev_handler.resource, RestResourceBase)
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
self.prev_handler.resource.check_acl_field(self.req, 1)

View File

@@ -0,0 +1,83 @@
from __future__ import annotations
from typing import ClassVar, TYPE_CHECKING
from secrets import token_hex, compare_digest
from datetime import datetime, timedelta
from .rest_resource import RestResourceBase
from .rest_model import RestField
from .rest_ACL import ACL_target_user
from .rest_resource_plugin_login import ResourcePlugin_Login
from .rest_login import UserSession, Login
from .rest_exceptions import (
RestResourceLoginException_InvalidCredentials,
RestResourceLoginException_ClientChange,
RestResourceLoginException_SessionTimeout,
RestResourceLoginException_InvalidSession,
)
if TYPE_CHECKING:
from .rest_request import RestRequest
from .rest_login import UserLogin
class RestResourceBaseLogin(RestResourceBase):
_ar_user_login: ClassVar[list[UserLogin]] = []
_ar_user_session: dict[str, UserSession] = {}
_max_session_inactive: ClassVar[timedelta] = timedelta(minutes=20)
_max_session_time: ClassVar[timedelta] = timedelta(hours=12)
login: Login = RestField(default=Login(), plugin=ResourcePlugin_Login)
def get_new_cookie_expiration_date(self) -> datetime:
return datetime.now() + self._max_session_time
def _process_request_session(self, request: RestRequest) -> None:
# print(f"[TRACE] {type(self).__name__}->_process_request_session()")
# print(f"[TRACE] request: {id(request)}")
auth_cookie = request.get_cookie("Authorization")
if auth_cookie is not None:
if auth_cookie in self._ar_user_session:
# print(f"SESSION FOUND for {request.get_client()}")
if self._ar_user_session[auth_cookie].client != request.get_client():
del self._ar_user_session[auth_cookie]
raise RestResourceLoginException_ClientChange()
time_diff = datetime.now() - self._ar_user_session[auth_cookie].last_update
if time_diff > self._max_session_inactive:
del self._ar_user_session[auth_cookie]
raise RestResourceLoginException_SessionTimeout()
request.set_user(ACL_target_user(name=self._ar_user_session[auth_cookie].user_login.username))
# print("SESSION RECOVERED")
return
raise RestResourceLoginException_InvalidSession()
# print(f"non-connected user {request.get_client()}")
return
def user_login(self, user_name: str, user_secret: str, request: RestRequest) -> str:
already_failed: bool = False
for iter_user_login in self._ar_user_login:
username_ok: bool = compare_digest(user_name, iter_user_login.username)
secret_ok: bool = compare_digest(user_secret, iter_user_login.secret)
if username_ok is True:
if secret_ok is True and not already_failed:
return self._register_user_session(iter_user_login, request)
already_failed = True
else:
pass # pylint: disable=unnecessary-pass
pass # pylint: disable=unnecessary-pass
raise RestResourceLoginException_InvalidCredentials()
def _register_user_session(self, user_login: UserLogin, request: RestRequest) -> str:
token = token_hex(16)
new_user_session = UserSession(last_update=datetime.now(), user_login=user_login, client=request.get_client())
self._ar_user_session[f"Bearer {token}"] = new_user_session
return token

View File

@@ -12,7 +12,6 @@ from .rest_types import (
)
from .rest_exceptions import RestResourceConfigException
if TYPE_CHECKING:
from .rest_request import RestRequest
from .rest_resource import RestResourceBase
@@ -31,14 +30,15 @@ if TYPE_CHECKING:
class ResourcePlugin(ABC):
def __init__(self) -> None:
...
self.__request: RestRequest
self.__root_resource: RestResourceBase
def set_context(self, request: RestRequest, root_resource: RestResourceBase) -> None:
self.__request: RestRequest = request
self.__root_resource: RestResourceBase = root_resource
self.__request = request
self.__root_resource = root_resource
def user_login(self, user_name: str, user_secret: str) -> str:
from .rest_login import RestResourceBaseLogin
from .rest_resource_login import RestResourceBaseLogin
if not isinstance(self.__root_resource, RestResourceBaseLogin):
raise RestResourceConfigException("root_resource must be RestResourceBaseLogin to use user_login")
@@ -54,7 +54,7 @@ class ResourcePlugin(ABC):
self.__request.reset_resp_cookie(key)
def get_new_cookie_expiration_date(self) -> datetime:
from .rest_login import RestResourceBaseLogin
from .rest_resource_login import RestResourceBaseLogin
if not isinstance(self.__root_resource, RestResourceBaseLogin):
raise RestResourceConfigException("root_resource must be RestResourceBaseLogin to use get_new_cookie_expiration_date")

View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from .rest_resource_plugin import ResourcePlugin_RestResourceBase_default
from .rest_login import UserLogin, Login
from .rest_exceptions import RestResourceLoginException_InvalidCredentials
if TYPE_CHECKING:
from .rest_request_opt import RestRequestParams_RestResourceBase_PUT, RestRequestParams_RestResourceBase_GET
class ResourcePlugin_Login(ResourcePlugin_RestResourceBase_default):
ar_UserLogin: list[UserLogin] = []
def handle_resource_get(self, resource: Login, params: RestRequestParams_RestResourceBase_GET) -> Login:
return Login(username=self.get_user_login(), secret=None)
def handle_resource_put(self, resource: Login, params: RestRequestParams_RestResourceBase_PUT) -> Login:
if resource.username is None or resource.secret is None:
raise RestResourceLoginException_InvalidCredentials()
token = self.user_login(resource.username, resource.secret)
self.set_resp_cookie_value("Authorization", f"Bearer {token}")
return resource

View File

@@ -1,3 +1,5 @@
# pylint: disable=protected-access
from __future__ import annotations
from typing import (
cast,
@@ -40,7 +42,7 @@ class RestResourceWalker_Sub_T_Dict__tree_init(RestResourceWalker_Sub_T_Dict):
if not get_origin(datatype[1]) is None:
raise RestResourceModelException("complex dict types are not supported (should create a RestResourceBase container)")
if not datatype[0] in _T_SupportedRESTFields:
raise RestResourceModelException(f"Unsupported Dict Field value type in class (key)")
raise RestResourceModelException("Unsupported Dict Field value type in class (key)")
# preprocessing types / structure
if self.parent is not None and isinstance(self.parent, RestResourceWalker_Sub_RestResourceBase):
@@ -61,7 +63,7 @@ class RestResourceWalker_Sub_T_Dict__tree_init(RestResourceWalker_Sub_T_Dict):
if (
isinstance(self.resource, FieldInfo)
and self.resource.json_schema_extra is not None
and type(self.resource.json_schema_extra) is dict
and isinstance(self.resource.json_schema_extra, dict)
):
if "plugin" in self.resource.json_schema_extra:
plugin_dict: type[ResourcePlugin_dict] = self.resource.json_schema_extra["plugin"]
@@ -96,7 +98,7 @@ class RestResourceWalker_Sub_RestFields__tree_init(RestResourceWalker_Sub_RestFi
if (
isinstance(self.resource, FieldInfo)
and self.resource.json_schema_extra is not None
and type(self.resource.json_schema_extra) is dict
and isinstance(self.resource.json_schema_extra, dict)
):
# print("aaaaaaaaaa")
@@ -151,7 +153,7 @@ class RestResourceWalker_Sub_RestResourceBase__tree_init(RestResourceWalker_Sub_
if (
isinstance(self.resource, FieldInfo)
and self.resource.json_schema_extra is not None
and type(self.resource.json_schema_extra) is dict
and isinstance(self.resource.json_schema_extra, dict)
):
if "plugin" in self.resource.json_schema_extra:
plugin_resource: type[ResourcePlugin_RestResourceBase] = self.resource.json_schema_extra["plugin"]

View File

@@ -35,6 +35,7 @@ class RestResourceWalkerFutureResult(ABC, Generic[TV_RestResourceWalkerFutureRes
class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
# pylint: disable=too-many-instance-attributes
cls_RestResourceWalkerFutureResult: Optional[type[RestResourceWalkerFutureResult[TV_RestResourceWalkerFutureResult]]] = None
@classmethod
@@ -43,11 +44,10 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
"""implementation interface to Factory.
The factory will call this specialized method on each implementation to find a supported one.
"""
...
@classmethod
def get(
self,
cls,
subs: list[type[RestResourceWalker_Sub]],
resource_name: str,
resource: FieldInfo | type[RestResourceBase],
@@ -60,7 +60,6 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
if _is_valid is True:
return sub(resource_name, resource, parent, _anno, _optional, argument)
raise RestResourceModelException(f"Incompatible Field Found: {type(resource).__name__}")
return None
def __init__(
self,
@@ -101,7 +100,7 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
def chain_process_future(self) -> Optional[TV_RestResourceWalkerFutureResult]:
if self.future_results_subs is not None and self.future_result is not None:
return_future_results_subs: list[Any] = [] # TODO: use typevar
return_future_results_subs: list[Any] = []
for future_result in self.future_results_subs:
return_future_results_subs.append(future_result.chain_process_future())
return self.future_result.process_future(return_future_results_subs)
@@ -251,5 +250,4 @@ class RestResourceWalker_Root:
resource_list = list(new_resource_list)
current_deep = current_deep + 1
return sub_walker_initial.chain_process_future()
else:
raise RestResourceModelException("Invalid Rootpoint")
raise RestResourceModelException("Invalid Rootpoint")