5 Commits

Author SHA1 Message Date
7625461929 Update Jenkinsfile 2024-10-12 16:43:41 +02:00
cclecle
8ba0ab5f5e chore: wip 2024-04-12 22:56:27 +01:00
cclecle
c05b541148 improve code quality 2023-11-11 17:17:35 +00:00
cclecle
afaebec633 complete base dict plugin + unittest 2023-11-11 15:04:47 +00:00
cclecle
f677f7bf28 fix dict delete + plugin 2023-11-11 12:09:03 +00:00
19 changed files with 445 additions and 294 deletions

4
Jenkinsfile vendored
View File

@@ -184,7 +184,7 @@ pipeline {
sh("virtualenv --pip=embed --setuptools=embed --wheel=embed --no-periodic-update --activators bash,python TOOLS_ENV")
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade setuptools build pip")
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade 'copier==8.*' jinja2-slug toml")
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade 'copier==9.*' jinja2-slug toml")
sh(". ~/TEST_ENV/bin/activate && pip install --upgrade pip")
@@ -546,7 +546,7 @@ pipeline {
dir("gitrepo") {
junit 'helpers-results/cl_unit_test/*.xml'
// using cobertura format (= coverage xml format)
publishCoverage adapters: [cobertura(mergeToOneReport: true, path: "helpers-results/cl_unit_test_coverage/test_coverage.xml")]
recordCoverage(tools: [[parser: 'COBERTURA', pattern: 'helpers-results/cl_unit_test_coverage/test_coverage.xml']])
publishHTML([
reportDir: "helpers-results/cl_unit_test_coverage",
reportFiles: "index.html",

View File

@@ -50,6 +50,9 @@ where = ["src"]
[tool.setuptools.package-data]
"pyrestresource" = ["py.typed"]
[tool.pylint.main]
disable = ["missing-class-docstring","missing-function-docstring","missing-module-docstring"]
# [[tool.mypy.overrides]]
# module = ""
# ignore_missing_imports = true

View File

@@ -32,6 +32,7 @@ from .rest_request_opt import (
RestRequestParams_Dict_DELETE,
RestRequestParams_Dict_GET,
RestRequestParams_Dict_elem_GET,
RestRequestParams_Dict_elem_PUT,
)
from .rest_resource_plugin import (
ResourcePlugin_field_default,
@@ -40,10 +41,9 @@ from .rest_resource_plugin import (
)
from .rest_ACL import ACL_target_user, ACL_target_group, ACL_target_group_Any, ACL_record, ACL_rule
from .rest_resource import RestResourceBase
from .rest_login import (
RestResourceBaseLogin,
UserLogin,
)
from .rest_login import UserLogin
from .rest_resource_login import RestResourceBaseLogin
from .rest_exceptions import (
RestResourceException,
RestResourceLoginException,

View File

@@ -36,5 +36,4 @@ def parse_dict_cookies(cookies: str) -> dict[str, str | None]:
def forward_exception(e: Exception, forward: bool) -> None:
if forward:
raise e from None
else:
traceback.print_exc()
traceback.print_exc()

View File

@@ -1,9 +1,10 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from pydantic import BaseModel
from enum import Enum, auto
from pydantic import BaseModel
from .rest_types import rsrc_verb
if TYPE_CHECKING:

View File

@@ -12,27 +12,20 @@
"""CLI interface module"""
from __future__ import annotations
from typing import Optional, ClassVar, TYPE_CHECKING
from typing import Optional, TYPE_CHECKING
from secrets import token_hex, compare_digest
from datetime import datetime, timedelta
from datetime import datetime
from pydantic import BaseModel
from .rest_types import rsrc_verb
from .rest_resource import RestResourceBase
# from .rest_resource import RestResourceBase
from . import rest_resource
from .rest_model import RestField
from .rest_ACL import ACL_record, ACL_target_group_Any, ACL_rule, ACL_target_user
from .rest_resource_plugin import ResourcePlugin_RestResourceBase_default
from .rest_exceptions import (
RestResourceLoginException_InvalidCredentials,
RestResourceLoginException_ClientChange,
RestResourceLoginException_SessionTimeout,
RestResourceLoginException_InvalidSession,
)
from .rest_ACL import ACL_record, ACL_target_group_Any, ACL_rule
if TYPE_CHECKING:
from .rest_request import RestRequest
from .rest_request_opt import RestRequestParams_RestResourceBase_PUT, RestRequestParams_RestResourceBase_GET
pass
class UserLogin(BaseModel):
@@ -46,21 +39,7 @@ class UserSession(BaseModel):
client: tuple[str, int] | tuple[()] | None
class ResourcePlugin_Login(ResourcePlugin_RestResourceBase_default):
ar_UserLogin: list[UserLogin] = []
def handle_resource_get(self, resource: Login, params: RestRequestParams_RestResourceBase_GET) -> Login:
return Login(username=self.get_user_login(), secret=None)
def handle_resource_put(self, resource: Login, params: RestRequestParams_RestResourceBase_PUT) -> Login:
if resource.username is None or resource.secret is None:
raise RestResourceLoginException_InvalidCredentials()
token = self.user_login(resource.username, resource.secret)
self.set_resp_cookie_value("Authorization", f"Bearer {token}")
return resource
class Login(RestResourceBase):
class Login(rest_resource.RestResourceBase):
username: Optional[str] = RestField(None)
secret: Optional[str] = RestField(
None,
@@ -70,64 +49,3 @@ class Login(RestResourceBase):
ACL_record(verbs=[rsrc_verb.GET], target=ACL_target_group_Any(), rule=ACL_rule.DENY),
],
)
class RestResourceBaseLogin(RestResourceBase):
_ar_user_login: ClassVar[list[UserLogin]] = []
_ar_user_session: dict[str, UserSession] = {}
_max_session_inactive: ClassVar[timedelta] = timedelta(minutes=20)
_max_session_time: ClassVar[timedelta] = timedelta(hours=12)
login: Login = RestField(default=Login(), plugin=ResourcePlugin_Login)
def get_new_cookie_expiration_date(self) -> datetime:
return datetime.now() + self._max_session_time
def _process_request_session(self, request: RestRequest) -> None:
# print(f"[TRACE] {type(self).__name__}->_process_request_session()")
# print(f"[TRACE] request: {id(request)}")
auth_cookie = request.get_cookie("Authorization")
if auth_cookie != None:
if auth_cookie in self._ar_user_session:
# print(f"SESSION FOUND for {request.get_client()}")
if self._ar_user_session[auth_cookie].client != request.get_client():
del self._ar_user_session[auth_cookie]
raise RestResourceLoginException_ClientChange()
time_diff = datetime.now() - self._ar_user_session[auth_cookie].last_update
if time_diff > self._max_session_inactive:
del self._ar_user_session[auth_cookie]
raise RestResourceLoginException_SessionTimeout()
request.set_user(ACL_target_user(name=self._ar_user_session[auth_cookie].user_login.username))
# print("SESSION RECOVERED")
return
raise RestResourceLoginException_InvalidSession()
return
# print(f"non-connected user {request.get_client()}")
def user_login(self, user_name: str, user_secret: str, request: RestRequest) -> str:
already_failed: bool = False
for iter_user_login in self._ar_user_login:
username_ok: bool = compare_digest(user_name, iter_user_login.username)
secret_ok: bool = compare_digest(user_secret, iter_user_login.secret)
if username_ok is True:
if secret_ok is True and not already_failed:
return self._register_user_session(iter_user_login, request)
else:
already_failed = True
else:
pass
pass
raise RestResourceLoginException_InvalidCredentials()
def _register_user_session(self, user_login: UserLogin, request: RestRequest) -> str:
token = token_hex(16)
new_user_session = UserSession(last_update=datetime.now(), user_login=user_login, client=request.get_client())
self._ar_user_session[f"Bearer {token}"] = new_user_session
return token

View File

@@ -1,5 +1,5 @@
from __future__ import annotations
from typing import Literal, Type, Any, Callable, TYPE_CHECKING
from typing import Literal, Any, Callable, TYPE_CHECKING
from pydantic.fields import Field, _Unset, PydanticUndefined
@@ -12,7 +12,7 @@ if TYPE_CHECKING:
from pydantic.fields import _EmptyKwargs, AliasPath, AliasChoices
def RestField(
def RestField( # pylint: disable=too-many-locals
default: Any = PydanticUndefined,
*,
default_factory: Callable[[], Any] | None = _Unset,
@@ -28,7 +28,7 @@ def RestField(
json_schema_extra: dict[str, Any] | Callable[[dict[str, Any]], None] | None = _Unset,
frozen: bool | None = _Unset,
validate_default: bool | None = _Unset,
repr: bool = _Unset,
repr: bool = _Unset, # pylint: disable=redefined-builtin
init_var: bool | None = _Unset,
kw_only: bool | None = _Unset,
pattern: str | None = _Unset,
@@ -45,7 +45,7 @@ def RestField(
max_length: int | None = _Unset,
union_mode: Literal["smart", "left_to_right"] = _Unset,
ACL: list["ACL_record"] | None = _Unset,
plugin: Type["ResourcePlugin"] | None = _Unset,
plugin: type["ResourcePlugin"] | None = _Unset,
**extra: Unpack[_EmptyKwargs],
) -> Any:
if not json_schema_extra or json_schema_extra is _Unset:

View File

@@ -13,7 +13,7 @@ from urllib.parse import urlparse, parse_qs
from pydantic import BaseModel, Field
from typeguard import check_type
from .rest_login import RestResourceBaseLogin
from .rest_types import rsrc_verb, T_AllSupportedFields
from .rest_request_opt import (
RestRequestParams_POST,
@@ -96,11 +96,11 @@ class RequestFactory(
request.update_ReqParams(self.cls_RestRequestParams_DELETE)
else:
raise RestResourceHandlerException_MethodNotAllowed("Invalid Verb")
return
class RestRequest(Generic[_T_RestRequestParams]):
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-public-methods
"""Main RestRequets class"""
def __init__(
@@ -206,15 +206,15 @@ class RestRequest(Generic[_T_RestRequestParams]):
return None
if isinstance(self.headers["cookie"], dict):
return self.headers["cookie"][key]
else:
return None
return None
def set_resp_cookie_value(self, key: str, value: str) -> None:
from .rest_resource_login import RestResourceBaseLogin
if not isinstance(self.root_resource, RestResourceBaseLogin):
raise RestResourceConfigException("root_resource must be RestResourceBaseLogin to use user_login")
self.outgoing_cookie[
key
] = f"{value}; expires={self.root_resource.get_new_cookie_expiration_date().strftime('%a, %d %b %Y %H:%M:%S GMT')}; path=/; HttpOnly"
expire_date = self.root_resource.get_new_cookie_expiration_date().strftime("%a, %d %b %Y %H:%M:%S GMT")
self.outgoing_cookie[key] = f"{value}; expires={expire_date}; path=/; HttpOnly"
def reset_resp_cookie(self, key: str) -> None:
self.outgoing_cookie[key] = "null; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT"

View File

@@ -8,10 +8,11 @@ from typing import (
from abc import ABC
import json
import pprint
from pydantic import BaseModel
from .rest_types import rsrc_verb
from .helpers import _JSONEncoder, forward_exception
@@ -35,6 +36,7 @@ from .rest_exceptions import (
RestResourceException,
)
if TYPE_CHECKING:
from .rest_request import RestRequest
from .rest_types import T_SupportedRESTFields
@@ -43,6 +45,9 @@ if TYPE_CHECKING:
T_T_DictKey,
T_T_DictValues,
)
from .rest_resource_handler import (
ResourceHandler,
)
class RestResourceBase(ABC, BaseModel, validate_assignment=True):
@@ -53,7 +58,7 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
_plugins_: ClassVar[
dict[
str,
type[ResourcePlugin],
ResourcePlugin,
]
] = {}
_ACL_record_: ClassVar[
@@ -96,7 +101,7 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
def check_acl_self(self, request: RestRequest, new_data: Optional[dict[str, T_SupportedRESTFields]]) -> None:
"""Check ACL on requested field operation (involving checking sub-fields)"""
if request.get_verb() is rsrc_verb.GET:
for key in self.model_fields.keys():
for key in self.model_fields:
self._check_acl(request.user, request.groups, rsrc_verb.GET, key)
elif request.get_verb() is rsrc_verb.PUT:
if new_data is not None:
@@ -178,7 +183,7 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
def _process_request_session(self, request: RestRequest) -> None:
pass
def process_request(
def process_request( # pylint: disable=too-complex
self,
url: str,
verb: rsrc_verb = rsrc_verb.GET,
@@ -188,10 +193,7 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
headers: Optional[list[Any]] = None,
http_mode: bool = False,
) -> RestRequest:
from .rest_resource_handler import (
ResourceHandler,
ResourceHandler_RestResourceBase,
)
from .rest_resource_handler import ResourceHandler_RestResourceBase
data: dict = {}
if data_json:

View File

@@ -1,3 +1,5 @@
# pylint: disable=protected-access
from __future__ import annotations
from typing import Optional, cast, TypeVar, Generic, Self, TYPE_CHECKING
@@ -12,7 +14,10 @@ from .rest_types import (
T_Dict,
T_DictValues,
)
from .rest_resource import RestResourceBase
# from .rest_resource import RestResourceBase
from . import rest_resource
from .rest_request import RequestFactory
from .rest_resource_plugin import (
ResourcePlugin_field,
@@ -40,14 +45,14 @@ from .rest_exceptions import (
RestResourceHandlerException_ResourceNotFound,
RestResourceHandlerException_MethodNotAllowed,
RestResourceHandlerException_BadRequest,
RestResourceHandlerException_Forbiden,
)
if TYPE_CHECKING:
from .rest_types import T_T_DictKey, T_T_DictValues
from .rest_request import RestRequest
_T_Resource = TypeVar("_T_Resource", T_DictValues, T_Dict, T_SupportedRESTFields, RestResourceBase)
_T_Resource = TypeVar("_T_Resource", T_DictValues, T_Dict, T_SupportedRESTFields, rest_resource.RestResourceBase)
class ResourceHandler(
@@ -103,7 +108,7 @@ class ResourceHandler(
raise RestResourceHandlerException("if req not set, url,verb must be setted")
else:
assert url is not None and verb is not None
assert isinstance(resource, RestResourceBase)
assert isinstance(resource, rest_resource.RestResourceBase)
if data is None:
data = {}
self.req = self._request_factory.get_RestRequest(resource, url, verb, data, query_string)
@@ -143,14 +148,6 @@ class ResourceHandler(
resource_handler = self._find_resource()
return resource_handler._process_verb()
def access_resource(
self,
) -> _T_Resource:
# print(f"[TRACE] {type(self).__name__}->access_resource()")
self._reset_context()
resource_handler = self._find_resource()
return resource_handler.resource
def _reset_context(self) -> None:
self.req.reset_url_stack()
@@ -177,11 +174,7 @@ class ResourceHandler(
# reveal_type(_next_resource)
# print(f"[DEBUG] next_resource = {type(next_resource).__name__}")
if (
isinstance(_next_resource, RestResourceBase)
or isinstance(_next_resource, dict)
or type(_next_resource) in _T_SupportedRESTFields
):
if isinstance(_next_resource, (rest_resource.RestResourceBase, dict)) or type(_next_resource) in _T_SupportedRESTFields:
next_resource_handler_cls: type[ResourceHandler] = self._get_resource_handler(_next_resource, self.req)
self.saved_url = self.req.consume_url_stack(self._nb_url_element_to_consume_)
@@ -292,7 +285,7 @@ class ResourceHandler_dict(
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
if self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
if self.prev_handler is not None and self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
plugin_dict: ResourcePlugin_dict = cast(
ResourcePlugin_dict,
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)],
@@ -308,63 +301,113 @@ class ResourceHandler_dict(
assert self.prev_handler is not None
dict_key_type: T_T_DictKey = cast(RestResourceBase, self.prev_handler.resource)._dict_key_type_[self.req.get_resource_origin(1)]
dict_key_type: T_T_DictKey = cast(rest_resource.RestResourceBase, self.prev_handler.resource)._dict_key_type_[
self.req.get_resource_origin(1)
]
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
_dict_key: T_DictKey
plugin_dict: ResourcePlugin_dict | None = None
if self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
plugin_dict = cast(
ResourcePlugin_dict,
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)],
)
plugin_dict.set_context(self.req, self.req.get_root_resource())
if params.API_key is not None:
del _dict[dict_key_type(params.API_key)]
if issubclass(dict_key_type, bytes):
key_byte = dict_key_type(params.API_key, "utf-8")
_dict_key = key_byte
else:
key_std = dict_key_type(params.API_key)
_dict_key = key_std
if plugin_dict:
plugin_dict.handle_dict_delete(_dict, _dict_key, params)
return
del _dict[_dict_key]
else:
if plugin_dict:
plugin_dict.handle_dict_delete_all(_dict, params)
return
_dict.clear()
return
def _handle_process_post(self, params) -> Optional[T_DictKey]:
# pylint: disable=protected-access
def _handle_process_post(self, params) -> Optional[T_DictKey]: # pylint: disable=too-complex,too-many-branches
# print(f"{type(self).__name__}->_handle_process_post()")
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
dict_key_type: T_T_DictKey = cast(RestResourceBase, self.prev_handler.resource)._dict_key_type_[self.req.get_resource_origin(1)]
dict_key_type: T_T_DictKey = cast(rest_resource.RestResourceBase, self.prev_handler.resource)._dict_key_type_[
self.req.get_resource_origin(1)
]
dict_value_type: T_T_DictValues = cast(RestResourceBase, self.prev_handler.resource)._dict_value_type_[
dict_value_type: T_T_DictValues = cast(rest_resource.RestResourceBase, self.prev_handler.resource)._dict_value_type_[
self.req.get_resource_origin(1)
]
_obj: T_DictValues
if not issubclass(dict_value_type, NoneType): # type: ignore # => mypy bug with type[None]
_obj = dict_value_type(**self.req.get_data()) # type: ignore # => mypy bug with type[None]
if issubclass(dict_value_type, rest_resource.RestResourceBase):
_obj = dict_value_type(**self.req.get_data())
_obj_restrsrc = cast(rest_resource.RestResourceBase, _obj)
for key, _ in _obj_restrsrc.model_fields.items():
if key in _obj_restrsrc._plugins_:
if isinstance(_obj_restrsrc._plugins_[key], ResourcePlugin_field):
plugin_field: ResourcePlugin_field = cast(ResourcePlugin_field, _obj_restrsrc._plugins_[key])
plugin_field.set_context(self.req, self.req.get_root_resource())
value = getattr(_obj_restrsrc, key)
setattr(_obj_restrsrc, key, plugin_field.handle_field_put(value, params))
elif not issubclass(dict_value_type, NoneType): # type: ignore # => mypy bug with Type[None]
_obj = dict_value_type(**self.req.get_data()) # type: ignore # => mypy bug with Type[None]
else:
_obj = None
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
_dict_key: T_DictKey | None = None
# 1st try/ using request param provided dict API_key
if params.API_key is not None:
if issubclass(dict_key_type, bytes):
key_byte: bytes = dict_key_type(params.API_key, "utf-8")
_dict_key = key_byte
else:
key_std = dict_key_type(params.API_key)
_dict_key = key_std
# if a primary key is set for the resource, updating it
if isinstance(_obj, RestResourceBase):
if isinstance(_obj, rest_resource.RestResourceBase):
if _obj._primary_key_ is not None:
_pri: T_DictKey = dict_key_type(params.API_key)
setattr(_obj, _obj._primary_key_, _pri)
# storing resource
_dict[dict_key_type(params.API_key)] = _obj
return dict_key_type(params.API_key)
setattr(_obj, _obj._primary_key_, _dict_key)
# 2nd try/ using provided resource internal primary key
# & 3rd try/ using resource internal auto-generated primary key
# => this case is automatic because if self.req.get_data() doesn't contain the key, it should be automatically created
if isinstance(_obj, RestResourceBase):
elif isinstance(_obj, rest_resource.RestResourceBase):
if _obj._primary_key_ is not None:
_obj_primary_key: Optional[T_DictKey] = getattr(_obj, _obj._primary_key_)
if _obj_primary_key is not None:
_dict[_obj_primary_key] = _obj
return _obj_primary_key
_dict_key = _obj_primary_key
if _dict_key is not None:
if self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
plugin_dict: ResourcePlugin_dict = cast(
ResourcePlugin_dict,
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)],
)
plugin_dict.set_context(self.req, self.req.get_root_resource())
return plugin_dict.handle_dict_post(_dict, _dict_key, _obj, params)
_dict[_dict_key] = _obj
return _dict_key
raise RestResourceHandlerException_BadRequest(
"Either the object needs defined primary key or the request must contain an API_key param to process this command"
)
return None # for mypy....
@ResourceHandler.register_resource_handler
@@ -404,38 +447,28 @@ class ResourceHandler_dict_elem(
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
assert isinstance(self.prev_handler.resource, RestResourceBase)
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
dict_key_type: T_T_DictKey = self.prev_handler.resource._dict_key_type_[self.req.get_resource_origin(1)]
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
_dict_key: T_DictKey
if issubclass(dict_key_type, bytes):
key_byte = dict_key_type(self.req.get_resource_origin(0), "utf-8")
dict_byte: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
_dict_key = key_byte
if self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
plugin_dict: ResourcePlugin_dict = cast(
ResourcePlugin_dict,
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)],
)
plugin_dict.set_context(self.req, self.req.get_root_resource())
return plugin_dict.handle_dict_elem_get(dict_byte, key_byte, params)
return dict_byte[key_byte]
else:
key_std = dict_key_type(self.req.get_resource_origin(0))
dict_std: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
_dict_key = key_std
if self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
plugin_dict: ResourcePlugin_dict = cast(
ResourcePlugin_dict,
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)],
)
plugin_dict.set_context(self.req, self.req.get_root_resource())
return plugin_dict.handle_dict_elem_get(dict_std, key_std, params)
return dict_std[key_std]
if self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
plugin_dict: ResourcePlugin_dict = cast(
ResourcePlugin_dict,
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)],
)
plugin_dict.set_context(self.req, self.req.get_root_resource())
return plugin_dict.handle_dict_elem_get(_dict, _dict_key, params)
return _dict[_dict_key]
def _handle_process_delete(self, params) -> None:
# print(f"{type(self).__name__}->_handle_process_delete()")
@@ -446,42 +479,35 @@ class ResourceHandler_dict_elem(
# because self.req is another context that is not saved to improve performances
assert self.prev_handler is not None
assert isinstance(self.prev_handler.resource, RestResourceBase)
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
dict_key_type: T_T_DictKey = self.prev_handler.resource._dict_key_type_[self.req.get_resource_origin(2)]
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
_dict_key: T_DictKey
if issubclass(dict_key_type, bytes):
key_byte = dict_key_type(self.req.get_resource_origin(1), "utf-8")
dict_byte: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
_dict_key = key_byte
if self.req.get_resource_origin(2) in self.prev_handler.resource._plugins_:
plugin_dict: ResourcePlugin_dict = cast(
ResourcePlugin_dict, self.prev_handler.resource._plugins_[self.req.get_resource_origin(2)]
)
plugin_dict.set_context(self.req, self.req.get_root_resource())
plugin_dict.handle_dict_delete(dict_byte, key_byte, params)
return None
del dict_byte[key_byte]
else:
key_std = dict_key_type(self.req.get_resource_origin(1))
dict_std: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
_dict_key = key_std
if self.req.get_resource_origin(2) in self.prev_handler.resource._plugins_:
plugin_dict: ResourcePlugin_dict = cast(
ResourcePlugin_dict, self.prev_handler.resource._plugins_[self.req.get_resource_origin(2)]
)
plugin_dict.set_context(self.req, self.req.get_root_resource())
plugin_dict.handle_dict_delete(dict_std, key_std, params)
return None
del dict_std[key_std]
if self.req.get_resource_origin(2) in self.prev_handler.resource._plugins_:
plugin_dict: ResourcePlugin_dict = cast(
ResourcePlugin_dict, self.prev_handler.resource._plugins_[self.req.get_resource_origin(2)]
)
plugin_dict.set_context(self.req, self.req.get_root_resource())
plugin_dict.handle_dict_delete(_dict, _dict_key, params)
return
del _dict[_dict_key]
return
@ResourceHandler.register_resource_handler
class ResourceHandler_RestResourceBase(
ResourceHandler[
RestResourceBase,
rest_resource.RestResourceBase,
_T_RestRequestParams_POST,
_T_RestRequestParams_DELETE,
RestRequestParams_RestResourceBase_GET,
@@ -507,7 +533,7 @@ class ResourceHandler_RestResourceBase(
def _check_resource_handler(cls, resource: _T_Resource, req: RestRequest) -> bool:
# print(f"{cls.__name__}->_check_resource_handler()")
return isinstance(resource, RestResourceBase)
return isinstance(resource, rest_resource.RestResourceBase)
def _check_access_rights(self) -> None:
super()._check_access_rights()
@@ -532,7 +558,7 @@ class ResourceHandler_RestResourceBase(
if self.resource.model_fields[self.req.get_resource_origin(0)].exclude is True and self.req.get_verb() is rsrc_verb.GET:
raise RestResourceHandlerException_ResourceNotFound(f"Not allowed READ access detected: {self.req.get_url_stack()}")
def _handle_process_get(self, params) -> RestResourceBase:
def _handle_process_get(self, params) -> rest_resource.RestResourceBase:
# print(f"{type(self).__name__}->_process_get()")
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
@@ -543,7 +569,7 @@ class ResourceHandler_RestResourceBase(
if len(self.req.get_url_stack()) == 0:
self.resource.check_acl_self(self.req, None)
for key, attr in self.resource.model_fields.items():
for key, _ in self.resource.model_fields.items():
if key in self.resource._plugins_:
if isinstance(self.resource._plugins_[key], ResourcePlugin_field):
plugin_field = cast(ResourcePlugin_field, self.resource._plugins_[key])
@@ -561,7 +587,6 @@ class ResourceHandler_RestResourceBase(
return self.resource
# CASE 2: specific (operation) case for root Node
# TODO: this must probably be merged with the previous bloc
if self.req.get_resource_origin(0) == "/":
return self.resource
@@ -583,52 +608,84 @@ class ResourceHandler_RestResourceBase(
return value
def _handle_process_put(self, params) -> None:
def _handle_process_put(self, params) -> None: # pylint: disable=too-complex,too-many-branches
# print(f"{type(self).__name__}->_process_put()")
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
assert isinstance(self.resource, rest_resource.RestResourceBase)
self.resource.check_acl_self(self.req, self.req.get_data())
# creating a copy of the current resource
_new_resrc = self.resource.copy()
# updating values based on nex data
# updating values based on new data
_new_resrc.update(**self.req.get_data())
# applying plugins (to nested element)
if isinstance(_new_resrc, RestResourceBase):
for key, attr in _new_resrc.model_fields.items():
if key in _new_resrc._plugins_:
if isinstance(_new_resrc._plugins_[key], ResourcePlugin_field):
plugin_field: ResourcePlugin_field = cast(ResourcePlugin_field, _new_resrc._plugins_[key])
plugin_field.set_context(self.req, self.req.get_root_resource())
value = getattr(_new_resrc, key)
setattr(_new_resrc, key, plugin_field.handle_field_put(value, params))
for key, _ in _new_resrc.model_fields.items():
if key in _new_resrc._plugins_:
if isinstance(_new_resrc._plugins_[key], ResourcePlugin_field):
plugin_field: ResourcePlugin_field = cast(ResourcePlugin_field, _new_resrc._plugins_[key])
plugin_field.set_context(self.req, self.req.get_root_resource())
value = getattr(_new_resrc, key)
setattr(_new_resrc, key, plugin_field.handle_field_put(value, params))
# applying plugins (from parent element)
if self.prev_handler is not None:
# element is within a dict
if (
isinstance(self.prev_handler.resource, dict)
and self.prev_handler.prev_handler is not None
and isinstance(self.prev_handler.prev_handler.resource, RestResourceBase)
):
key = self.req.get_resource_origin(2)
if key in self.prev_handler.prev_handler.resource._plugins_:
plugin_dict: ResourcePlugin_dict = cast(ResourcePlugin_dict, self.prev_handler.prev_handler.resource._plugins_[key])
plugin_dict.set_context(self.req, self.req.get_root_resource())
_new_resrc = plugin_dict.handle_dict_elem_put(_new_resrc, params)
# element is within a RestResourceBase
elif isinstance(self.prev_handler.resource, RestResourceBase):
key = self.req.get_resource_origin(1)
if key in self.prev_handler.resource._plugins_:
plugin_rsrc: ResourcePlugin_RestResourceBase = cast(
ResourcePlugin_RestResourceBase, self.prev_handler.resource._plugins_[key]
)
plugin_rsrc.set_context(self.req, self.req.get_root_resource())
_new_resrc = plugin_rsrc.handle_resource_put(_new_resrc, params)
self.resource.update(**_new_resrc.__dict__)
return
# element is within a dict
if (
isinstance(self.prev_handler.resource, dict)
and self.prev_handler.prev_handler is not None
and isinstance(self.prev_handler.prev_handler.resource, rest_resource.RestResourceBase)
):
key = self.req.get_resource_origin(2)
dict_key_type: T_T_DictKey = self.prev_handler.prev_handler.resource._dict_key_type_[self.req.get_resource_origin(2)]
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.prev_handler.resource)
_dict_key: T_DictKey
if key in self.prev_handler.prev_handler.resource._plugins_:
plugin_dict: ResourcePlugin_dict = cast(ResourcePlugin_dict, self.prev_handler.prev_handler.resource._plugins_[key])
plugin_dict.set_context(self.req, self.req.get_root_resource())
if issubclass(dict_key_type, bytes):
key_byte = dict_key_type(self.req.get_resource_origin(1), "utf-8")
_dict_key = key_byte
else:
key_std = dict_key_type(self.req.get_resource_origin(1))
_dict_key = key_std
plugin_dict.handle_dict_elem_put(_dict, _dict_key, _new_resrc, params)
else:
if issubclass(dict_key_type, bytes):
key_byte = dict_key_type(self.req.get_resource_origin(1), "utf-8")
_dict_key = key_byte
else:
key_std = dict_key_type(self.req.get_resource_origin(1))
_dict_key = key_std
if _dict_key not in _dict:
raise RuntimeError(f"Key not found: {str(_dict_key)}")
_dict[_dict_key] = _new_resrc
# element is within a RestResourceBase
elif isinstance(self.prev_handler.resource, rest_resource.RestResourceBase):
key = self.req.get_resource_origin(1)
if key in self.prev_handler.resource._plugins_:
plugin_rsrc: ResourcePlugin_RestResourceBase = cast(
ResourcePlugin_RestResourceBase, self.prev_handler.resource._plugins_[key]
)
plugin_rsrc.set_context(self.req, self.req.get_root_resource())
_new_resrc = plugin_rsrc.handle_resource_put(_new_resrc, params)
else:
self.resource.update(**_new_resrc.__dict__)
else:
raise RuntimeError("unsupported operation")
# print("***************")
# print(self.resource)
# print(_new_resrc)
# print(_new_resrc.__dict__)
def _handle_process_delete(self, params) -> None:
# print(f"{type(self).__name__}->_handle_process_delete()")
@@ -639,7 +696,7 @@ class ResourceHandler_RestResourceBase(
self.prev_handler is not None
and isinstance(self.prev_handler.resource, dict)
and self.prev_handler.prev_handler is not None
and isinstance(self.prev_handler.prev_handler.resource, RestResourceBase)
and isinstance(self.prev_handler.prev_handler.resource, rest_resource.RestResourceBase)
):
self.prev_handler._process_delete()
else:
@@ -667,7 +724,7 @@ class ResourceHandler_simple(
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
assert isinstance(self.prev_handler.resource, RestResourceBase)
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
self.prev_handler.resource.check_acl_field(self.req, 1)
@@ -685,7 +742,7 @@ class ResourceHandler_simple(
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
assert isinstance(self.prev_handler.resource, RestResourceBase)
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
self.prev_handler.resource.check_acl_field(self.req, 1)

View File

@@ -0,0 +1,83 @@
from __future__ import annotations
from typing import ClassVar, TYPE_CHECKING
from secrets import token_hex, compare_digest
from datetime import datetime, timedelta
from .rest_resource import RestResourceBase
from .rest_model import RestField
from .rest_ACL import ACL_target_user
from .rest_resource_plugin_login import ResourcePlugin_Login
from .rest_login import UserSession, Login
from .rest_exceptions import (
RestResourceLoginException_InvalidCredentials,
RestResourceLoginException_ClientChange,
RestResourceLoginException_SessionTimeout,
RestResourceLoginException_InvalidSession,
)
if TYPE_CHECKING:
from .rest_request import RestRequest
from .rest_login import UserLogin
class RestResourceBaseLogin(RestResourceBase):
_ar_user_login: ClassVar[list[UserLogin]] = []
_ar_user_session: dict[str, UserSession] = {}
_max_session_inactive: ClassVar[timedelta] = timedelta(minutes=20)
_max_session_time: ClassVar[timedelta] = timedelta(hours=12)
login: Login = RestField(default=Login(), plugin=ResourcePlugin_Login)
def get_new_cookie_expiration_date(self) -> datetime:
return datetime.now() + self._max_session_time
def _process_request_session(self, request: RestRequest) -> None:
# print(f"[TRACE] {type(self).__name__}->_process_request_session()")
# print(f"[TRACE] request: {id(request)}")
auth_cookie = request.get_cookie("Authorization")
if auth_cookie is not None:
if auth_cookie in self._ar_user_session:
# print(f"SESSION FOUND for {request.get_client()}")
if self._ar_user_session[auth_cookie].client != request.get_client():
del self._ar_user_session[auth_cookie]
raise RestResourceLoginException_ClientChange()
time_diff = datetime.now() - self._ar_user_session[auth_cookie].last_update
if time_diff > self._max_session_inactive:
del self._ar_user_session[auth_cookie]
raise RestResourceLoginException_SessionTimeout()
request.set_user(ACL_target_user(name=self._ar_user_session[auth_cookie].user_login.username))
# print("SESSION RECOVERED")
return
raise RestResourceLoginException_InvalidSession()
# print(f"non-connected user {request.get_client()}")
return
def user_login(self, user_name: str, user_secret: str, request: RestRequest) -> str:
already_failed: bool = False
for iter_user_login in self._ar_user_login:
username_ok: bool = compare_digest(user_name, iter_user_login.username)
secret_ok: bool = compare_digest(user_secret, iter_user_login.secret)
if username_ok is True:
if secret_ok is True and not already_failed:
return self._register_user_session(iter_user_login, request)
already_failed = True
else:
pass # pylint: disable=unnecessary-pass
pass # pylint: disable=unnecessary-pass
raise RestResourceLoginException_InvalidCredentials()
def _register_user_session(self, user_login: UserLogin, request: RestRequest) -> str:
token = token_hex(16)
new_user_session = UserSession(last_update=datetime.now(), user_login=user_login, client=request.get_client())
self._ar_user_session[f"Bearer {token}"] = new_user_session
return token

View File

@@ -12,7 +12,6 @@ from .rest_types import (
)
from .rest_exceptions import RestResourceConfigException
if TYPE_CHECKING:
from .rest_request import RestRequest
from .rest_resource import RestResourceBase
@@ -31,14 +30,15 @@ if TYPE_CHECKING:
class ResourcePlugin(ABC):
def __init__(self) -> None:
...
self.__request: RestRequest
self.__root_resource: RestResourceBase
def set_context(self, request: RestRequest, root_resource: RestResourceBase) -> None:
self.__request: RestRequest = request
self.__root_resource: RestResourceBase = root_resource
self.__request = request
self.__root_resource = root_resource
def user_login(self, user_name: str, user_secret: str) -> str:
from .rest_login import RestResourceBaseLogin
from .rest_resource_login import RestResourceBaseLogin
if not isinstance(self.__root_resource, RestResourceBaseLogin):
raise RestResourceConfigException("root_resource must be RestResourceBaseLogin to use user_login")
@@ -54,7 +54,7 @@ class ResourcePlugin(ABC):
self.__request.reset_resp_cookie(key)
def get_new_cookie_expiration_date(self) -> datetime:
from .rest_login import RestResourceBaseLogin
from .rest_resource_login import RestResourceBaseLogin
if not isinstance(self.__root_resource, RestResourceBaseLogin):
raise RestResourceConfigException("root_resource must be RestResourceBaseLogin to use get_new_cookie_expiration_date")
@@ -226,4 +226,6 @@ class ResourcePlugin_dict_default(ResourcePlugin_dict[_T_DictKey, _T_DictValues]
resource: _T_DictValues,
params: RestRequestParams_Dict_elem_PUT,
) -> None:
if key not in resource_dict:
raise RuntimeError(f"Key not found: {str(key)}")
resource_dict[key] = resource

View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from .rest_resource_plugin import ResourcePlugin_RestResourceBase_default
from .rest_login import UserLogin, Login
from .rest_exceptions import RestResourceLoginException_InvalidCredentials
if TYPE_CHECKING:
from .rest_request_opt import RestRequestParams_RestResourceBase_PUT, RestRequestParams_RestResourceBase_GET
class ResourcePlugin_Login(ResourcePlugin_RestResourceBase_default):
ar_UserLogin: list[UserLogin] = []
def handle_resource_get(self, resource: Login, params: RestRequestParams_RestResourceBase_GET) -> Login:
return Login(username=self.get_user_login(), secret=None)
def handle_resource_put(self, resource: Login, params: RestRequestParams_RestResourceBase_PUT) -> Login:
if resource.username is None or resource.secret is None:
raise RestResourceLoginException_InvalidCredentials()
token = self.user_login(resource.username, resource.secret)
self.set_resp_cookie_value("Authorization", f"Bearer {token}")
return resource

View File

@@ -1,3 +1,5 @@
# pylint: disable=protected-access
from __future__ import annotations
from typing import (
cast,
@@ -6,7 +8,6 @@ from typing import (
TYPE_CHECKING,
)
from pydantic import BaseModel
from pydantic.fields import FieldInfo
from .rest_resource import RestResourceBase
@@ -41,7 +42,7 @@ class RestResourceWalker_Sub_T_Dict__tree_init(RestResourceWalker_Sub_T_Dict):
if not get_origin(datatype[1]) is None:
raise RestResourceModelException("complex dict types are not supported (should create a RestResourceBase container)")
if not datatype[0] in _T_SupportedRESTFields:
raise RestResourceModelException(f"Unsupported Dict Field value type in class (key)")
raise RestResourceModelException("Unsupported Dict Field value type in class (key)")
# preprocessing types / structure
if self.parent is not None and isinstance(self.parent, RestResourceWalker_Sub_RestResourceBase):
@@ -62,7 +63,7 @@ class RestResourceWalker_Sub_T_Dict__tree_init(RestResourceWalker_Sub_T_Dict):
if (
isinstance(self.resource, FieldInfo)
and self.resource.json_schema_extra is not None
and type(self.resource.json_schema_extra) is dict
and isinstance(self.resource.json_schema_extra, dict)
):
if "plugin" in self.resource.json_schema_extra:
plugin_dict: type[ResourcePlugin_dict] = self.resource.json_schema_extra["plugin"]
@@ -97,7 +98,7 @@ class RestResourceWalker_Sub_RestFields__tree_init(RestResourceWalker_Sub_RestFi
if (
isinstance(self.resource, FieldInfo)
and self.resource.json_schema_extra is not None
and type(self.resource.json_schema_extra) is dict
and isinstance(self.resource.json_schema_extra, dict)
):
# print("aaaaaaaaaa")
@@ -152,7 +153,7 @@ class RestResourceWalker_Sub_RestResourceBase__tree_init(RestResourceWalker_Sub_
if (
isinstance(self.resource, FieldInfo)
and self.resource.json_schema_extra is not None
and type(self.resource.json_schema_extra) is dict
and isinstance(self.resource.json_schema_extra, dict)
):
if "plugin" in self.resource.json_schema_extra:
plugin_resource: type[ResourcePlugin_RestResourceBase] = self.resource.json_schema_extra["plugin"]

View File

@@ -5,7 +5,6 @@ from typing import (
get_args,
get_origin,
TypeVar,
Type,
Generic,
TYPE_CHECKING,
)
@@ -36,22 +35,22 @@ class RestResourceWalkerFutureResult(ABC, Generic[TV_RestResourceWalkerFutureRes
class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
# pylint: disable=too-many-instance-attributes
cls_RestResourceWalkerFutureResult: Optional[type[RestResourceWalkerFutureResult[TV_RestResourceWalkerFutureResult]]] = None
@classmethod
@abstractmethod
def check_type(cls, resource: FieldInfo | Type[RestResourceBase]) -> tuple[bool, Type[Any], bool]:
def check_type(cls, resource: FieldInfo | type[RestResourceBase]) -> tuple[bool, type[Any], bool]:
"""implementation interface to Factory.
The factory will call this specialized method on each implementation to find a supported one.
"""
...
@classmethod
def get(
self,
cls,
subs: list[type[RestResourceWalker_Sub]],
resource_name: str,
resource: FieldInfo | Type[RestResourceBase],
resource: FieldInfo | type[RestResourceBase],
parent: Optional[RestResourceWalker_Sub] = None,
argument: Optional[Any] = None,
) -> Optional[RestResourceWalker_Sub]:
@@ -61,12 +60,11 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
if _is_valid is True:
return sub(resource_name, resource, parent, _anno, _optional, argument)
raise RestResourceModelException(f"Incompatible Field Found: {type(resource).__name__}")
return None
def __init__(
self,
resource_name: str,
resource: FieldInfo | Type[RestResourceBase],
resource: FieldInfo | type[RestResourceBase],
parent: Optional[RestResourceWalker_Sub] = None,
annotation: Optional[type[RestResourceBase]] = None,
_optional: Optional[bool] = None,
@@ -74,7 +72,7 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
):
self.argument: Any = argument
self.resource_name: str = resource_name
self.resource: FieldInfo | Type[RestResourceBase] = resource
self.resource: FieldInfo | type[RestResourceBase] = resource
self.parent: Optional[RestResourceWalker_Sub] = parent
self.future_results_subs: Optional[list[RestResourceWalkerFutureResult[TV_RestResourceWalkerFutureResult]]] = None
@@ -102,7 +100,7 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
def chain_process_future(self) -> Optional[TV_RestResourceWalkerFutureResult]:
if self.future_results_subs is not None and self.future_result is not None:
return_future_results_subs: list[Any] = [] # TODO: use typevar
return_future_results_subs: list[Any] = []
for future_result in self.future_results_subs:
return_future_results_subs.append(future_result.chain_process_future())
return self.future_result.process_future(return_future_results_subs)
@@ -124,11 +122,11 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
@staticmethod
def ProcessAnnotation(
resource: FieldInfo | Type[RestResourceBase],
resource: FieldInfo | type[RestResourceBase],
) -> tuple[type[Any], bool]:
# from .rest_resource import RestResourceBase
_anno: Type[Any]
_anno: type[Any]
# print("!!!!!!!!!!!!!!!!!!!!!!!")
# print(resource)
@@ -159,7 +157,7 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
class RestResourceWalker_Sub_T_Dict(RestResourceWalker_Sub):
@classmethod
def check_type(cls, resource: FieldInfo | Type[RestResourceBase]) -> tuple[bool, Type[Any], bool]:
def check_type(cls, resource: FieldInfo | type[RestResourceBase]) -> tuple[bool, type[Any], bool]:
_anno, _optional = cls.ProcessAnnotation(resource)
_type_resource = get_origin(_anno)
return (_type_resource is dict), _anno, _optional
@@ -175,7 +173,7 @@ class RestResourceWalker_Sub_T_Dict(RestResourceWalker_Sub):
class RestResourceWalker_Sub_RestFields(RestResourceWalker_Sub):
@classmethod
def check_type(cls, resource: FieldInfo | Type[RestResourceBase]) -> tuple[bool, Type[Any], bool]:
def check_type(cls, resource: FieldInfo | type[RestResourceBase]) -> tuple[bool, type[Any], bool]:
_anno, _optional = cls.ProcessAnnotation(resource)
return (_anno in _T_SupportedRESTFields), _anno, _optional
@@ -185,7 +183,7 @@ class RestResourceWalker_Sub_RestFields(RestResourceWalker_Sub):
class RestResourceWalker_Sub_RestResourceBase(RestResourceWalker_Sub):
@classmethod
def check_type(cls, resource: FieldInfo | Type[RestResourceBase]) -> tuple[bool, Type[Any], bool]:
def check_type(cls, resource: FieldInfo | type[RestResourceBase]) -> tuple[bool, type[Any], bool]:
_anno, _optional = cls.ProcessAnnotation(resource)
return (
((get_origin(_anno) is None) and issubclass(_anno, RestResourceBase)),
@@ -201,15 +199,15 @@ class RestResourceWalker_Sub_RestResourceBase(RestResourceWalker_Sub):
class RestResourceWalker_Root:
cls_RestResourceWalker_Sub: list[Type[RestResourceWalker_Sub]] = [
cls_RestResourceWalker_Sub: list[type[RestResourceWalker_Sub]] = [
RestResourceWalker_Sub_T_Dict,
RestResourceWalker_Sub_RestFields,
RestResourceWalker_Sub_RestResourceBase,
]
def __init__(self, resource: RestResourceBase | Type[RestResourceBase]) -> None:
def __init__(self, resource: RestResourceBase | type[RestResourceBase]) -> None:
self.subwalker_argument: Any = None
self.resource: Type[RestResourceBase]
self.resource: type[RestResourceBase]
if isinstance(resource, RestResourceBase):
self.resource = type(resource)
else:
@@ -225,7 +223,7 @@ class RestResourceWalker_Root:
if sub_walker_initial is not None:
sub_walker_initial.process()
sub_walker_initial.get_future()
resource_list: list[tuple[str, FieldInfo | Type[RestResourceBase], RestResourceWalker_Sub]] = [
resource_list: list[tuple[str, FieldInfo | type[RestResourceBase], RestResourceWalker_Sub]] = [
(subresource_name, subresource, sub_walker_initial)
for subresource_name, subresource in sub_walker_initial.get_sub_resources()
]
@@ -252,5 +250,4 @@ class RestResourceWalker_Root:
resource_list = list(new_resource_list)
current_deep = current_deep + 1
return sub_walker_initial.chain_process_future()
else:
raise RestResourceModelException("Invalid Rootpoint")
raise RestResourceModelException("Invalid Rootpoint")

View File

@@ -1,6 +1,6 @@
# pylint: disable=missing-module-docstring,missing-class-docstring,missing-function-docstring
from __future__ import annotations
from typing import Union, get_origin, NewType, TypeVar, Type, TYPE_CHECKING
from typing import Union, get_origin, NewType, TypeVar, TYPE_CHECKING
from enum import Enum, auto
from datetime import datetime
@@ -91,8 +91,8 @@ _T_DictValues = TypeVar(
NoneType,
)
T_T_FieldValue = Type[T_FieldValue]
T_T_DictValues = Type[T_DictValues]
T_T_FieldValue = type[T_FieldValue]
T_T_DictValues = type[T_DictValues]
T_Dict = dict[T_DictKey, T_DictValues]
_T_Dict = dict[_T_DictKey, _T_DictValues]

View File

@@ -480,7 +480,7 @@ class Test_RestAPI_PERFO(unittest.TestCase):
init_classes()
self.testapp = RootApp()
# @unittest.skip
@unittest.skip
def test_perf_dict(self):
print(f"LIB INTERNAL PERF TEST")
n_loop = 10000

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
import unittest
from os import chdir
from pathlib import Path
from typing import Annotated
from typing import Optional
from src.pyrestresource import (
RestField,
@@ -14,7 +14,9 @@ from src.pyrestresource import (
RestRequestParams_Dict_GET,
RestRequestParams_PUT,
RestRequestParams_Dict_elem_GET,
RestRequestParams_Dict_elem_PUT,
RestRequestParams_Dict_DELETE,
RestRequestParams_Dict_POST,
T_SupportedRESTFields,
ResourcePlugin_dict_default,
)
@@ -38,9 +40,6 @@ def init_classes():
resource_dict: dict[str, Test_Record],
params: RestRequestParams_Dict_GET,
) -> list[str]:
# print("HOOK handle_dict_get_keys")
# print(resource_dict)
# print(params)
result = super().handle_dict_get_keys(resource_dict, params)
if self.static_Test_Record_active is True:
result.append("static_elem")
@@ -52,10 +51,6 @@ def init_classes():
key: str,
params: RestRequestParams_Dict_elem_GET,
) -> Test_Record:
# print("HOOK handle_dict_elem_get")
# print(resource_dict)
# print(key)
# print(params)
if key == "static_elem":
if self.static_Test_Record_active is True:
return self.static_Test_Record
@@ -69,15 +64,43 @@ def init_classes():
key: str,
params: RestRequestParams_Dict_DELETE[str],
) -> None:
print("HOOK handle_dict_delete")
print(resource_dict)
print(key)
print(params)
print(self.static_Test_Record_active)
if key == "static_elem":
self.static_Test_Record_active = False
else:
del resource_dict[key]
super().handle_dict_delete(resource_dict, key, params)
def handle_dict_delete_all(
self,
resource_dict: dict[str, Test_Record],
params: RestRequestParams_Dict_DELETE[str],
) -> None:
self.static_Test_Record_active = False
super().handle_dict_delete_all(resource_dict, params)
def handle_dict_elem_put(
self,
resource_dict: dict[str, Test_Record],
key: str,
resource: Test_Record,
params: RestRequestParams_Dict_elem_PUT,
) -> None:
if key == "static_elem":
if self.static_Test_Record_active is True:
self.static_Test_Record = resource
else:
raise RuntimeError("Key Not Found")
else:
super().handle_dict_elem_put(resource_dict, key, resource, params)
def handle_dict_post(
self,
resource_dict: dict[str, Test_Record],
key: str,
resource: Test_Record,
params: RestRequestParams_Dict_POST[str],
) -> Optional[str]:
resource.test_int = resource.test_int + 1
return super().handle_dict_post(resource_dict, key, resource, params)
@register_rest_rootpoint
class RootApp(RestResourceBase):
@@ -116,14 +139,55 @@ class Test_RestAPI_Plugin_Dict(unittest.TestCase):
self.assertEqual(result.get_result(), '{"test_str": "mytest", "test_int": 84}')
def test_delete_dict_elems(self):
result = self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.DELETE)
print(result.get_result())
self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.DELETE)
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
self.assertEqual(result.get_result(), '["static_elem"]')
result = self.testapp.process_request("/str_dict_Test_Record/static_elem", rsrc_verb.DELETE)
print(result.get_result())
self.testapp.process_request("/str_dict_Test_Record/static_elem", rsrc_verb.DELETE)
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
self.assertEqual(result.get_result(), "[]")
def test_delete_all_dict_elems(self):
self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.DELETE)
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
self.assertEqual(result.get_result(), '["static_elem"]')
self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.DELETE)
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
self.assertEqual(result.get_result(), "[]")
def test_delete_dict_elems_API_key(self):
self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.DELETE)
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
self.assertEqual(result.get_result(), '["static_elem"]')
self.testapp.process_request("/str_dict_Test_Record?API_key=static_elem", rsrc_verb.DELETE)
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
self.assertEqual(result.get_result(), "[]")
def test_put_dict_elem(self):
self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.PUT, '{"test_str": "Hi", "test_int": 43}')
result = self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.GET)
self.assertEqual(result.get_result(), '{"test_str": "Hi", "test_int": 43}')
self.testapp.process_request("/str_dict_Test_Record/static_elem", rsrc_verb.PUT, '{"test_str": "Hi you", "test_int": 7}')
result = self.testapp.process_request("/str_dict_Test_Record/static_elem", rsrc_verb.GET)
self.assertEqual(result.get_result(), '{"test_str": "Hi you", "test_int": 7}')
def test_post_dict_elem(self):
result = self.testapp.process_request("/str_dict_Test_Record?API_key=newval", rsrc_verb.POST, '{"test_str": "Hi2", "test_int": 77}')
self.assertEqual(result.get_result(), '"newval"')
result = self.testapp.process_request("/str_dict_Test_Record/newval", rsrc_verb.GET)
self.assertEqual(result.get_result(), '{"test_str": "Hi2", "test_int": 78}')
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
self.assertEqual(result.get_result(), '["test", "newval", "static_elem"]')

View File

@@ -262,7 +262,7 @@ class Test_RestAPI_WebServer(unittest.TestCase):
s.close()
server.stop()
# @unittest.skip
@unittest.skip
def test_perf_dict(self):
print(f"SOCKET PERF TEST")
n_loop = 10000