Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 7625461929 | |||
|
|
8ba0ab5f5e | ||
|
|
c05b541148 | ||
|
|
afaebec633 | ||
|
|
f677f7bf28 | ||
|
|
6cc6056220 | ||
|
|
de71a19956 | ||
|
|
3b358ab49c |
@@ -3,5 +3,4 @@ encoding//src/pyrestresource/__init__.py=utf-8
|
||||
encoding//src/pyrestresource/__metadata__.py=utf-8
|
||||
encoding//src/pyrestresource/rest_login.py=utf-8
|
||||
encoding//src/pyrestresource/rest_resource.py=utf-8
|
||||
encoding//src/pyrestresource/rest_resource_handler_walker.py=utf-8
|
||||
encoding/<project>=UTF-8
|
||||
|
||||
4
Jenkinsfile
vendored
4
Jenkinsfile
vendored
@@ -184,7 +184,7 @@ pipeline {
|
||||
sh("virtualenv --pip=embed --setuptools=embed --wheel=embed --no-periodic-update --activators bash,python TOOLS_ENV")
|
||||
|
||||
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade setuptools build pip")
|
||||
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade 'copier==8.*' jinja2-slug toml")
|
||||
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade 'copier==9.*' jinja2-slug toml")
|
||||
|
||||
sh(". ~/TEST_ENV/bin/activate && pip install --upgrade pip")
|
||||
|
||||
@@ -546,7 +546,7 @@ pipeline {
|
||||
dir("gitrepo") {
|
||||
junit 'helpers-results/cl_unit_test/*.xml'
|
||||
// using cobertura format (= coverage xml format)
|
||||
publishCoverage adapters: [cobertura(mergeToOneReport: true, path: "helpers-results/cl_unit_test_coverage/test_coverage.xml")]
|
||||
recordCoverage(tools: [[parser: 'COBERTURA', pattern: 'helpers-results/cl_unit_test_coverage/test_coverage.xml']])
|
||||
publishHTML([
|
||||
reportDir: "helpers-results/cl_unit_test_coverage",
|
||||
reportFiles: "index.html",
|
||||
|
||||
@@ -50,6 +50,9 @@ where = ["src"]
|
||||
[tool.setuptools.package-data]
|
||||
"pyrestresource" = ["py.typed"]
|
||||
|
||||
[tool.pylint.main]
|
||||
disable = ["missing-class-docstring","missing-function-docstring","missing-module-docstring"]
|
||||
|
||||
# [[tool.mypy.overrides]]
|
||||
# module = ""
|
||||
# ignore_missing_imports = true
|
||||
@@ -64,6 +67,11 @@ concurrency = [
|
||||
'thread'
|
||||
]
|
||||
|
||||
[tool.coverage.report]
|
||||
exclude_also = [
|
||||
"if TYPE_CHECKING:",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://chacha.ddns.net/gitea/chacha/pyrestresource"
|
||||
Documentation = "https://chacha.ddns.net/mkdocs-web/chacha/pyrestresource/master/latest/"
|
||||
|
||||
@@ -31,6 +31,8 @@ from .rest_request_opt import (
|
||||
RestRequestParams_Dict_POST,
|
||||
RestRequestParams_Dict_DELETE,
|
||||
RestRequestParams_Dict_GET,
|
||||
RestRequestParams_Dict_elem_GET,
|
||||
RestRequestParams_Dict_elem_PUT,
|
||||
)
|
||||
from .rest_resource_plugin import (
|
||||
ResourcePlugin_field_default,
|
||||
@@ -39,10 +41,9 @@ from .rest_resource_plugin import (
|
||||
)
|
||||
from .rest_ACL import ACL_target_user, ACL_target_group, ACL_target_group_Any, ACL_record, ACL_rule
|
||||
from .rest_resource import RestResourceBase
|
||||
from .rest_login import (
|
||||
RestResourceBaseLogin,
|
||||
UserLogin,
|
||||
)
|
||||
from .rest_login import UserLogin
|
||||
|
||||
from .rest_resource_login import RestResourceBaseLogin
|
||||
from .rest_exceptions import (
|
||||
RestResourceException,
|
||||
RestResourceLoginException,
|
||||
|
||||
@@ -36,5 +36,4 @@ def parse_dict_cookies(cookies: str) -> dict[str, str | None]:
|
||||
def forward_exception(e: Exception, forward: bool) -> None:
|
||||
if forward:
|
||||
raise e from None
|
||||
else:
|
||||
traceback.print_exc()
|
||||
traceback.print_exc()
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
from __future__ import annotations
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from pydantic import BaseModel
|
||||
from enum import Enum, auto
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from .rest_types import rsrc_verb
|
||||
|
||||
if TYPE_CHECKING is True:
|
||||
if TYPE_CHECKING:
|
||||
from .rest_login import UserLogin
|
||||
|
||||
|
||||
|
||||
@@ -12,27 +12,20 @@
|
||||
|
||||
"""CLI interface module"""
|
||||
from __future__ import annotations
|
||||
from typing import Optional, ClassVar, TYPE_CHECKING
|
||||
from typing import Optional, TYPE_CHECKING
|
||||
|
||||
from secrets import token_hex, compare_digest
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
from pydantic import BaseModel
|
||||
|
||||
from .rest_types import rsrc_verb
|
||||
from .rest_resource import RestResourceBase
|
||||
from .rest_model import RestField
|
||||
from .rest_ACL import ACL_record, ACL_target_group_Any, ACL_rule, ACL_target_user
|
||||
from .rest_resource_plugin import ResourcePlugin_RestResourceBase_default
|
||||
from .rest_exceptions import (
|
||||
RestResourceLoginException_InvalidCredentials,
|
||||
RestResourceLoginException_ClientChange,
|
||||
RestResourceLoginException_SessionTimeout,
|
||||
RestResourceLoginException_InvalidSession,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING is True:
|
||||
from .rest_request import RestRequest
|
||||
from .rest_request_opt import RestRequestParams_RestResourceBase_PUT, RestRequestParams_RestResourceBase_GET
|
||||
# from .rest_resource import RestResourceBase
|
||||
from . import rest_resource
|
||||
from .rest_model import RestField
|
||||
from .rest_ACL import ACL_record, ACL_target_group_Any, ACL_rule
|
||||
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
|
||||
|
||||
class UserLogin(BaseModel):
|
||||
@@ -46,21 +39,7 @@ class UserSession(BaseModel):
|
||||
client: tuple[str, int] | tuple[()] | None
|
||||
|
||||
|
||||
class ResourcePlugin_Login(ResourcePlugin_RestResourceBase_default):
|
||||
ar_UserLogin: list[UserLogin] = []
|
||||
|
||||
def handle_resource_get(self, resource: Login, params: RestRequestParams_RestResourceBase_GET) -> Login:
|
||||
return Login(username=self.get_user_login(), secret=None)
|
||||
|
||||
def handle_resource_put(self, resource: Login, params: RestRequestParams_RestResourceBase_PUT) -> Login:
|
||||
if resource.username is None or resource.secret is None:
|
||||
raise RestResourceLoginException_InvalidCredentials()
|
||||
token = self.user_login(resource.username, resource.secret)
|
||||
self.set_resp_cookie_value("Authorization", f"Bearer {token}")
|
||||
return resource
|
||||
|
||||
|
||||
class Login(RestResourceBase):
|
||||
class Login(rest_resource.RestResourceBase):
|
||||
username: Optional[str] = RestField(None)
|
||||
secret: Optional[str] = RestField(
|
||||
None,
|
||||
@@ -70,64 +49,3 @@ class Login(RestResourceBase):
|
||||
ACL_record(verbs=[rsrc_verb.GET], target=ACL_target_group_Any(), rule=ACL_rule.DENY),
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class RestResourceBaseLogin(RestResourceBase):
|
||||
_ar_user_login: ClassVar[list[UserLogin]] = []
|
||||
_ar_user_session: dict[str, UserSession] = {}
|
||||
_max_session_inactive: ClassVar[timedelta] = timedelta(minutes=20)
|
||||
_max_session_time: ClassVar[timedelta] = timedelta(hours=12)
|
||||
login: Login = RestField(default=Login(), plugin=ResourcePlugin_Login)
|
||||
|
||||
def get_new_cookie_expiration_date(self) -> datetime:
|
||||
return datetime.now() + self._max_session_time
|
||||
|
||||
def _process_request_session(self, request: RestRequest) -> None:
|
||||
# print(f"[TRACE] {type(self).__name__}->_process_request_session()")
|
||||
# print(f"[TRACE] request: {id(request)}")
|
||||
auth_cookie = request.get_cookie("Authorization")
|
||||
if auth_cookie != None:
|
||||
if auth_cookie in self._ar_user_session:
|
||||
# print(f"SESSION FOUND for {request.get_client()}")
|
||||
|
||||
if self._ar_user_session[auth_cookie].client != request.get_client():
|
||||
del self._ar_user_session[auth_cookie]
|
||||
raise RestResourceLoginException_ClientChange()
|
||||
|
||||
time_diff = datetime.now() - self._ar_user_session[auth_cookie].last_update
|
||||
if time_diff > self._max_session_inactive:
|
||||
del self._ar_user_session[auth_cookie]
|
||||
raise RestResourceLoginException_SessionTimeout()
|
||||
|
||||
request.set_user(ACL_target_user(name=self._ar_user_session[auth_cookie].user_login.username))
|
||||
# print("SESSION RECOVERED")
|
||||
return
|
||||
|
||||
raise RestResourceLoginException_InvalidSession()
|
||||
return
|
||||
|
||||
# print(f"non-connected user {request.get_client()}")
|
||||
|
||||
def user_login(self, user_name: str, user_secret: str, request: RestRequest) -> str:
|
||||
already_failed: bool = False
|
||||
|
||||
for iter_user_login in self._ar_user_login:
|
||||
username_ok: bool = compare_digest(user_name, iter_user_login.username)
|
||||
secret_ok: bool = compare_digest(user_secret, iter_user_login.secret)
|
||||
|
||||
if username_ok is True:
|
||||
if secret_ok is True and not already_failed:
|
||||
return self._register_user_session(iter_user_login, request)
|
||||
else:
|
||||
already_failed = True
|
||||
else:
|
||||
pass
|
||||
pass
|
||||
|
||||
raise RestResourceLoginException_InvalidCredentials()
|
||||
|
||||
def _register_user_session(self, user_login: UserLogin, request: RestRequest) -> str:
|
||||
token = token_hex(16)
|
||||
new_user_session = UserSession(last_update=datetime.now(), user_login=user_login, client=request.get_client())
|
||||
self._ar_user_session[f"Bearer {token}"] = new_user_session
|
||||
return token
|
||||
|
||||
@@ -1,40 +1,34 @@
|
||||
from __future__ import annotations
|
||||
from typing import (
|
||||
Any,
|
||||
Literal,
|
||||
Callable,
|
||||
Optional,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
from typing import Literal, Any, Callable, TYPE_CHECKING
|
||||
|
||||
from pydantic.fields import Field, _Unset, PydanticUndefined
|
||||
|
||||
from .rest_exceptions import RestResourceModelException
|
||||
|
||||
if TYPE_CHECKING is True:
|
||||
if TYPE_CHECKING:
|
||||
from .rest_ACL import ACL_record
|
||||
from .rest_resource_plugin import ResourcePlugin
|
||||
from typing import Unpack
|
||||
from pydantic.fields import _EmptyKwargs, AliasPath, AliasChoices
|
||||
|
||||
|
||||
def RestField(
|
||||
def RestField( # pylint: disable=too-many-locals
|
||||
default: Any = PydanticUndefined,
|
||||
*,
|
||||
default_factory: Callable[[], Any] | None = _Unset,
|
||||
alias: str | None = _Unset,
|
||||
alias_priority: int | None = _Unset,
|
||||
validation_alias: str | AliasPath | AliasChoices | None = _Unset,
|
||||
validation_alias: str | "AliasPath" | "AliasChoices" | None = _Unset,
|
||||
serialization_alias: str | None = _Unset,
|
||||
title: str | None = _Unset,
|
||||
description: str | None = _Unset,
|
||||
examples: list[Any] | None = _Unset,
|
||||
examples: "list[Any] | None" = _Unset,
|
||||
exclude: bool | None = _Unset,
|
||||
discriminator: str | None = _Unset,
|
||||
json_schema_extra: dict[str, Any] | Callable[[dict[str, Any]], None] | None = _Unset,
|
||||
frozen: bool | None = _Unset,
|
||||
validate_default: bool | None = _Unset,
|
||||
repr: bool = _Unset,
|
||||
repr: bool = _Unset, # pylint: disable=redefined-builtin
|
||||
init_var: bool | None = _Unset,
|
||||
kw_only: bool | None = _Unset,
|
||||
pattern: str | None = _Unset,
|
||||
@@ -50,8 +44,8 @@ def RestField(
|
||||
min_length: int | None = _Unset,
|
||||
max_length: int | None = _Unset,
|
||||
union_mode: Literal["smart", "left_to_right"] = _Unset,
|
||||
ACL: Optional[list[ACL_record]] = _Unset,
|
||||
plugin: Optional[type[ResourcePlugin]] = _Unset,
|
||||
ACL: list["ACL_record"] | None = _Unset,
|
||||
plugin: type["ResourcePlugin"] | None = _Unset,
|
||||
**extra: Unpack[_EmptyKwargs],
|
||||
) -> Any:
|
||||
if not json_schema_extra or json_schema_extra is _Unset:
|
||||
|
||||
@@ -13,7 +13,7 @@ from urllib.parse import urlparse, parse_qs
|
||||
from pydantic import BaseModel, Field
|
||||
from typeguard import check_type
|
||||
|
||||
from .rest_login import RestResourceBaseLogin
|
||||
|
||||
from .rest_types import rsrc_verb, T_AllSupportedFields
|
||||
from .rest_request_opt import (
|
||||
RestRequestParams_POST,
|
||||
@@ -35,7 +35,7 @@ from .rest_exceptions import (
|
||||
RestResourceConfigException,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING is True:
|
||||
if TYPE_CHECKING:
|
||||
from typing import Optional
|
||||
from .rest_types import T_SupportedRESTFields
|
||||
from .rest_resource import RestResourceBase
|
||||
@@ -96,11 +96,11 @@ class RequestFactory(
|
||||
request.update_ReqParams(self.cls_RestRequestParams_DELETE)
|
||||
else:
|
||||
raise RestResourceHandlerException_MethodNotAllowed("Invalid Verb")
|
||||
return
|
||||
|
||||
|
||||
class RestRequest(Generic[_T_RestRequestParams]):
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
# pylint: disable=too-many-public-methods
|
||||
"""Main RestRequets class"""
|
||||
|
||||
def __init__(
|
||||
@@ -206,15 +206,15 @@ class RestRequest(Generic[_T_RestRequestParams]):
|
||||
return None
|
||||
if isinstance(self.headers["cookie"], dict):
|
||||
return self.headers["cookie"][key]
|
||||
else:
|
||||
return None
|
||||
return None
|
||||
|
||||
def set_resp_cookie_value(self, key: str, value: str) -> None:
|
||||
from .rest_resource_login import RestResourceBaseLogin
|
||||
|
||||
if not isinstance(self.root_resource, RestResourceBaseLogin):
|
||||
raise RestResourceConfigException("root_resource must be RestResourceBaseLogin to use user_login")
|
||||
self.outgoing_cookie[
|
||||
key
|
||||
] = f"{value}; expires={self.root_resource.get_new_cookie_expiration_date().strftime('%a, %d %b %Y %H:%M:%S GMT')}; path=/; HttpOnly"
|
||||
expire_date = self.root_resource.get_new_cookie_expiration_date().strftime("%a, %d %b %Y %H:%M:%S GMT")
|
||||
self.outgoing_cookie[key] = f"{value}; expires={expire_date}; path=/; HttpOnly"
|
||||
|
||||
def reset_resp_cookie(self, key: str) -> None:
|
||||
self.outgoing_cookie[key] = "null; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT"
|
||||
|
||||
@@ -9,7 +9,7 @@ from .rest_types import (
|
||||
_T_DictKey,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING is True:
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
|
||||
|
||||
|
||||
@@ -8,10 +8,11 @@ from typing import (
|
||||
|
||||
from abc import ABC
|
||||
import json
|
||||
import pprint
|
||||
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
from .rest_types import rsrc_verb
|
||||
from .helpers import _JSONEncoder, forward_exception
|
||||
|
||||
@@ -35,7 +36,8 @@ from .rest_exceptions import (
|
||||
RestResourceException,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING is True:
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .rest_request import RestRequest
|
||||
from .rest_types import T_SupportedRESTFields
|
||||
from .rest_resource_plugin import ResourcePlugin
|
||||
@@ -43,6 +45,9 @@ if TYPE_CHECKING is True:
|
||||
T_T_DictKey,
|
||||
T_T_DictValues,
|
||||
)
|
||||
from .rest_resource_handler import (
|
||||
ResourceHandler,
|
||||
)
|
||||
|
||||
|
||||
class RestResourceBase(ABC, BaseModel, validate_assignment=True):
|
||||
@@ -53,7 +58,7 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
|
||||
_plugins_: ClassVar[
|
||||
dict[
|
||||
str,
|
||||
type[ResourcePlugin],
|
||||
ResourcePlugin,
|
||||
]
|
||||
] = {}
|
||||
_ACL_record_: ClassVar[
|
||||
@@ -96,7 +101,7 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
|
||||
def check_acl_self(self, request: RestRequest, new_data: Optional[dict[str, T_SupportedRESTFields]]) -> None:
|
||||
"""Check ACL on requested field operation (involving checking sub-fields)"""
|
||||
if request.get_verb() is rsrc_verb.GET:
|
||||
for key in self.model_fields.keys():
|
||||
for key in self.model_fields:
|
||||
self._check_acl(request.user, request.groups, rsrc_verb.GET, key)
|
||||
elif request.get_verb() is rsrc_verb.PUT:
|
||||
if new_data is not None:
|
||||
@@ -178,7 +183,7 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
|
||||
def _process_request_session(self, request: RestRequest) -> None:
|
||||
pass
|
||||
|
||||
def process_request(
|
||||
def process_request( # pylint: disable=too-complex
|
||||
self,
|
||||
url: str,
|
||||
verb: rsrc_verb = rsrc_verb.GET,
|
||||
@@ -188,10 +193,7 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
|
||||
headers: Optional[list[Any]] = None,
|
||||
http_mode: bool = False,
|
||||
) -> RestRequest:
|
||||
from .rest_resource_handler import (
|
||||
ResourceHandler,
|
||||
ResourceHandler_RestResourceBase,
|
||||
)
|
||||
from .rest_resource_handler import ResourceHandler_RestResourceBase
|
||||
|
||||
data: dict = {}
|
||||
if data_json:
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
# pylint: disable=protected-access
|
||||
|
||||
from __future__ import annotations
|
||||
from typing import Optional, cast, TypeVar, Generic, Self, TYPE_CHECKING
|
||||
|
||||
@@ -12,7 +14,10 @@ from .rest_types import (
|
||||
T_Dict,
|
||||
T_DictValues,
|
||||
)
|
||||
from .rest_resource import RestResourceBase
|
||||
|
||||
# from .rest_resource import RestResourceBase
|
||||
from . import rest_resource
|
||||
|
||||
from .rest_request import RequestFactory
|
||||
from .rest_resource_plugin import (
|
||||
ResourcePlugin_field,
|
||||
@@ -40,14 +45,14 @@ from .rest_exceptions import (
|
||||
RestResourceHandlerException_ResourceNotFound,
|
||||
RestResourceHandlerException_MethodNotAllowed,
|
||||
RestResourceHandlerException_BadRequest,
|
||||
RestResourceHandlerException_Forbiden,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING is True:
|
||||
if TYPE_CHECKING:
|
||||
from .rest_types import T_T_DictKey, T_T_DictValues
|
||||
from .rest_request import RestRequest
|
||||
|
||||
_T_Resource = TypeVar("_T_Resource", T_DictValues, T_Dict, T_SupportedRESTFields, RestResourceBase)
|
||||
|
||||
_T_Resource = TypeVar("_T_Resource", T_DictValues, T_Dict, T_SupportedRESTFields, rest_resource.RestResourceBase)
|
||||
|
||||
|
||||
class ResourceHandler(
|
||||
@@ -103,7 +108,7 @@ class ResourceHandler(
|
||||
raise RestResourceHandlerException("if req not set, url,verb must be setted")
|
||||
else:
|
||||
assert url is not None and verb is not None
|
||||
assert isinstance(resource, RestResourceBase)
|
||||
assert isinstance(resource, rest_resource.RestResourceBase)
|
||||
if data is None:
|
||||
data = {}
|
||||
self.req = self._request_factory.get_RestRequest(resource, url, verb, data, query_string)
|
||||
@@ -143,14 +148,6 @@ class ResourceHandler(
|
||||
resource_handler = self._find_resource()
|
||||
return resource_handler._process_verb()
|
||||
|
||||
def access_resource(
|
||||
self,
|
||||
) -> _T_Resource:
|
||||
# print(f"[TRACE] {type(self).__name__}->access_resource()")
|
||||
self._reset_context()
|
||||
resource_handler = self._find_resource()
|
||||
return resource_handler.resource
|
||||
|
||||
def _reset_context(self) -> None:
|
||||
self.req.reset_url_stack()
|
||||
|
||||
@@ -177,11 +174,7 @@ class ResourceHandler(
|
||||
# reveal_type(_next_resource)
|
||||
# print(f"[DEBUG] next_resource = {type(next_resource).__name__}")
|
||||
|
||||
if (
|
||||
isinstance(_next_resource, RestResourceBase)
|
||||
or isinstance(_next_resource, dict)
|
||||
or type(_next_resource) in _T_SupportedRESTFields
|
||||
):
|
||||
if isinstance(_next_resource, (rest_resource.RestResourceBase, dict)) or type(_next_resource) in _T_SupportedRESTFields:
|
||||
next_resource_handler_cls: type[ResourceHandler] = self._get_resource_handler(_next_resource, self.req)
|
||||
|
||||
self.saved_url = self.req.consume_url_stack(self._nb_url_element_to_consume_)
|
||||
@@ -292,6 +285,14 @@ class ResourceHandler_dict(
|
||||
|
||||
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
|
||||
|
||||
if self.prev_handler is not None and self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
|
||||
plugin_dict: ResourcePlugin_dict = cast(
|
||||
ResourcePlugin_dict,
|
||||
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)],
|
||||
)
|
||||
plugin_dict.set_context(self.req, self.req.get_root_resource())
|
||||
return plugin_dict.handle_dict_get_keys(self.resource, params)
|
||||
|
||||
return list(_dict.keys())
|
||||
|
||||
def _handle_process_delete(self, params) -> None:
|
||||
@@ -300,63 +301,113 @@ class ResourceHandler_dict(
|
||||
|
||||
assert self.prev_handler is not None
|
||||
|
||||
dict_key_type: T_T_DictKey = cast(RestResourceBase, self.prev_handler.resource)._dict_key_type_[self.req.get_resource_origin(1)]
|
||||
dict_key_type: T_T_DictKey = cast(rest_resource.RestResourceBase, self.prev_handler.resource)._dict_key_type_[
|
||||
self.req.get_resource_origin(1)
|
||||
]
|
||||
|
||||
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
|
||||
_dict_key: T_DictKey
|
||||
|
||||
plugin_dict: ResourcePlugin_dict | None = None
|
||||
if self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
|
||||
plugin_dict = cast(
|
||||
ResourcePlugin_dict,
|
||||
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)],
|
||||
)
|
||||
plugin_dict.set_context(self.req, self.req.get_root_resource())
|
||||
|
||||
if params.API_key is not None:
|
||||
del _dict[dict_key_type(params.API_key)]
|
||||
if issubclass(dict_key_type, bytes):
|
||||
key_byte = dict_key_type(params.API_key, "utf-8")
|
||||
_dict_key = key_byte
|
||||
else:
|
||||
key_std = dict_key_type(params.API_key)
|
||||
_dict_key = key_std
|
||||
|
||||
if plugin_dict:
|
||||
plugin_dict.handle_dict_delete(_dict, _dict_key, params)
|
||||
return
|
||||
del _dict[_dict_key]
|
||||
|
||||
else:
|
||||
if plugin_dict:
|
||||
plugin_dict.handle_dict_delete_all(_dict, params)
|
||||
return
|
||||
_dict.clear()
|
||||
return
|
||||
|
||||
def _handle_process_post(self, params) -> Optional[T_DictKey]:
|
||||
# pylint: disable=protected-access
|
||||
|
||||
def _handle_process_post(self, params) -> Optional[T_DictKey]: # pylint: disable=too-complex,too-many-branches
|
||||
# print(f"{type(self).__name__}->_handle_process_post()")
|
||||
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
|
||||
|
||||
assert self.prev_handler is not None
|
||||
|
||||
dict_key_type: T_T_DictKey = cast(RestResourceBase, self.prev_handler.resource)._dict_key_type_[self.req.get_resource_origin(1)]
|
||||
dict_key_type: T_T_DictKey = cast(rest_resource.RestResourceBase, self.prev_handler.resource)._dict_key_type_[
|
||||
self.req.get_resource_origin(1)
|
||||
]
|
||||
|
||||
dict_value_type: T_T_DictValues = cast(RestResourceBase, self.prev_handler.resource)._dict_value_type_[
|
||||
dict_value_type: T_T_DictValues = cast(rest_resource.RestResourceBase, self.prev_handler.resource)._dict_value_type_[
|
||||
self.req.get_resource_origin(1)
|
||||
]
|
||||
|
||||
_obj: T_DictValues
|
||||
if not issubclass(dict_value_type, NoneType): # type: ignore # => mypy bug with type[None]
|
||||
_obj = dict_value_type(**self.req.get_data()) # type: ignore # => mypy bug with type[None]
|
||||
if issubclass(dict_value_type, rest_resource.RestResourceBase):
|
||||
_obj = dict_value_type(**self.req.get_data())
|
||||
_obj_restrsrc = cast(rest_resource.RestResourceBase, _obj)
|
||||
|
||||
for key, _ in _obj_restrsrc.model_fields.items():
|
||||
if key in _obj_restrsrc._plugins_:
|
||||
if isinstance(_obj_restrsrc._plugins_[key], ResourcePlugin_field):
|
||||
plugin_field: ResourcePlugin_field = cast(ResourcePlugin_field, _obj_restrsrc._plugins_[key])
|
||||
plugin_field.set_context(self.req, self.req.get_root_resource())
|
||||
value = getattr(_obj_restrsrc, key)
|
||||
setattr(_obj_restrsrc, key, plugin_field.handle_field_put(value, params))
|
||||
|
||||
elif not issubclass(dict_value_type, NoneType): # type: ignore # => mypy bug with Type[None]
|
||||
_obj = dict_value_type(**self.req.get_data()) # type: ignore # => mypy bug with Type[None]
|
||||
else:
|
||||
_obj = None
|
||||
|
||||
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
|
||||
_dict_key: T_DictKey | None = None
|
||||
|
||||
# 1st try/ using request param provided dict API_key
|
||||
if params.API_key is not None:
|
||||
if issubclass(dict_key_type, bytes):
|
||||
key_byte: bytes = dict_key_type(params.API_key, "utf-8")
|
||||
_dict_key = key_byte
|
||||
else:
|
||||
key_std = dict_key_type(params.API_key)
|
||||
_dict_key = key_std
|
||||
|
||||
# if a primary key is set for the resource, updating it
|
||||
if isinstance(_obj, RestResourceBase):
|
||||
if isinstance(_obj, rest_resource.RestResourceBase):
|
||||
if _obj._primary_key_ is not None:
|
||||
_pri: T_DictKey = dict_key_type(params.API_key)
|
||||
setattr(_obj, _obj._primary_key_, _pri)
|
||||
# storing resource
|
||||
_dict[dict_key_type(params.API_key)] = _obj
|
||||
return dict_key_type(params.API_key)
|
||||
setattr(_obj, _obj._primary_key_, _dict_key)
|
||||
|
||||
# 2nd try/ using provided resource internal primary key
|
||||
# & 3rd try/ using resource internal auto-generated primary key
|
||||
# => this case is automatic because if self.req.get_data() doesn't contain the key, it should be automatically created
|
||||
if isinstance(_obj, RestResourceBase):
|
||||
elif isinstance(_obj, rest_resource.RestResourceBase):
|
||||
if _obj._primary_key_ is not None:
|
||||
_obj_primary_key: Optional[T_DictKey] = getattr(_obj, _obj._primary_key_)
|
||||
if _obj_primary_key is not None:
|
||||
_dict[_obj_primary_key] = _obj
|
||||
return _obj_primary_key
|
||||
_dict_key = _obj_primary_key
|
||||
|
||||
if _dict_key is not None:
|
||||
if self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
|
||||
plugin_dict: ResourcePlugin_dict = cast(
|
||||
ResourcePlugin_dict,
|
||||
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)],
|
||||
)
|
||||
plugin_dict.set_context(self.req, self.req.get_root_resource())
|
||||
return plugin_dict.handle_dict_post(_dict, _dict_key, _obj, params)
|
||||
_dict[_dict_key] = _obj
|
||||
return _dict_key
|
||||
|
||||
raise RestResourceHandlerException_BadRequest(
|
||||
"Either the object needs defined primary key or the request must contain an API_key param to process this command"
|
||||
)
|
||||
return None # for mypy....
|
||||
|
||||
|
||||
@ResourceHandler.register_resource_handler
|
||||
@@ -396,15 +447,28 @@ class ResourceHandler_dict_elem(
|
||||
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
|
||||
|
||||
assert self.prev_handler is not None
|
||||
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
|
||||
|
||||
dict_key_type: T_T_DictKey = cast(RestResourceBase, self.prev_handler.resource)._dict_key_type_[self.req.get_resource_origin(1)]
|
||||
dict_key_type: T_T_DictKey = self.prev_handler.resource._dict_key_type_[self.req.get_resource_origin(1)]
|
||||
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
|
||||
_dict_key: T_DictKey
|
||||
|
||||
if issubclass(dict_key_type, bytes):
|
||||
key_byte = dict_key_type(self.req.get_resource_origin(0), "utf-8")
|
||||
return cast(dict[T_DictKey, T_DictValues], self.resource)[key_byte]
|
||||
_dict_key = key_byte
|
||||
|
||||
else:
|
||||
key = dict_key_type(self.req.get_resource_origin(0))
|
||||
return cast(dict[T_DictKey, T_DictValues], self.resource)[key]
|
||||
key_std = dict_key_type(self.req.get_resource_origin(0))
|
||||
_dict_key = key_std
|
||||
|
||||
if self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
|
||||
plugin_dict: ResourcePlugin_dict = cast(
|
||||
ResourcePlugin_dict,
|
||||
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)],
|
||||
)
|
||||
plugin_dict.set_context(self.req, self.req.get_root_resource())
|
||||
return plugin_dict.handle_dict_elem_get(_dict, _dict_key, params)
|
||||
return _dict[_dict_key]
|
||||
|
||||
def _handle_process_delete(self, params) -> None:
|
||||
# print(f"{type(self).__name__}->_handle_process_delete()")
|
||||
@@ -415,21 +479,35 @@ class ResourceHandler_dict_elem(
|
||||
# because self.req is another context that is not saved to improve performances
|
||||
|
||||
assert self.prev_handler is not None
|
||||
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
|
||||
|
||||
dict_key_type: T_T_DictKey = cast(RestResourceBase, self.prev_handler.resource)._dict_key_type_[self.req.get_resource_origin(2)]
|
||||
dict_key_type: T_T_DictKey = self.prev_handler.resource._dict_key_type_[self.req.get_resource_origin(2)]
|
||||
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
|
||||
_dict_key: T_DictKey
|
||||
|
||||
if issubclass(dict_key_type, bytes):
|
||||
key_byte = dict_key_type(self.req.get_resource_origin(1), "utf-8")
|
||||
del cast(dict[T_DictKey, T_DictValues], self.resource)[key_byte]
|
||||
_dict_key = key_byte
|
||||
|
||||
else:
|
||||
key = dict_key_type(self.req.get_resource_origin(1))
|
||||
del cast(dict[T_DictKey, T_DictValues], self.resource)[key]
|
||||
key_std = dict_key_type(self.req.get_resource_origin(1))
|
||||
_dict_key = key_std
|
||||
|
||||
if self.req.get_resource_origin(2) in self.prev_handler.resource._plugins_:
|
||||
plugin_dict: ResourcePlugin_dict = cast(
|
||||
ResourcePlugin_dict, self.prev_handler.resource._plugins_[self.req.get_resource_origin(2)]
|
||||
)
|
||||
plugin_dict.set_context(self.req, self.req.get_root_resource())
|
||||
plugin_dict.handle_dict_delete(_dict, _dict_key, params)
|
||||
return
|
||||
del _dict[_dict_key]
|
||||
return
|
||||
|
||||
|
||||
@ResourceHandler.register_resource_handler
|
||||
class ResourceHandler_RestResourceBase(
|
||||
ResourceHandler[
|
||||
RestResourceBase,
|
||||
rest_resource.RestResourceBase,
|
||||
_T_RestRequestParams_POST,
|
||||
_T_RestRequestParams_DELETE,
|
||||
RestRequestParams_RestResourceBase_GET,
|
||||
@@ -455,7 +533,7 @@ class ResourceHandler_RestResourceBase(
|
||||
def _check_resource_handler(cls, resource: _T_Resource, req: RestRequest) -> bool:
|
||||
# print(f"{cls.__name__}->_check_resource_handler()")
|
||||
|
||||
return isinstance(resource, RestResourceBase)
|
||||
return isinstance(resource, rest_resource.RestResourceBase)
|
||||
|
||||
def _check_access_rights(self) -> None:
|
||||
super()._check_access_rights()
|
||||
@@ -480,7 +558,7 @@ class ResourceHandler_RestResourceBase(
|
||||
if self.resource.model_fields[self.req.get_resource_origin(0)].exclude is True and self.req.get_verb() is rsrc_verb.GET:
|
||||
raise RestResourceHandlerException_ResourceNotFound(f"Not allowed READ access detected: {self.req.get_url_stack()}")
|
||||
|
||||
def _handle_process_get(self, params) -> RestResourceBase:
|
||||
def _handle_process_get(self, params) -> rest_resource.RestResourceBase:
|
||||
# print(f"{type(self).__name__}->_process_get()")
|
||||
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
|
||||
|
||||
@@ -491,16 +569,16 @@ class ResourceHandler_RestResourceBase(
|
||||
|
||||
if len(self.req.get_url_stack()) == 0:
|
||||
self.resource.check_acl_self(self.req, None)
|
||||
for key, attr in self.resource.model_fields.items():
|
||||
for key, _ in self.resource.model_fields.items():
|
||||
if key in self.resource._plugins_:
|
||||
if issubclass(self.resource._plugins_[key], ResourcePlugin_field):
|
||||
plugin_field = cast(ResourcePlugin_field, self.resource._plugins_[key](self.req, self.req.get_root_resource()))
|
||||
if isinstance(self.resource._plugins_[key], ResourcePlugin_field):
|
||||
plugin_field = cast(ResourcePlugin_field, self.resource._plugins_[key])
|
||||
plugin_field.set_context(self.req, self.req.get_root_resource())
|
||||
value = getattr(self.resource, key)
|
||||
setattr(self.resource, key, plugin_field.handle_field_get(value, params))
|
||||
elif issubclass(self.resource._plugins_[key], ResourcePlugin_RestResourceBase):
|
||||
plugin_resource = cast(
|
||||
ResourcePlugin_RestResourceBase, self.resource._plugins_[key](self.req, self.req.get_root_resource())
|
||||
)
|
||||
elif isinstance(self.resource._plugins_[key], ResourcePlugin_RestResourceBase):
|
||||
plugin_resource = cast(ResourcePlugin_RestResourceBase, self.resource._plugins_[key])
|
||||
plugin_resource.set_context(self.req, self.req.get_root_resource())
|
||||
value = getattr(self.resource, key)
|
||||
setattr(self.resource, key, plugin_resource.handle_resource_get(value, params))
|
||||
|
||||
@@ -509,7 +587,6 @@ class ResourceHandler_RestResourceBase(
|
||||
return self.resource
|
||||
|
||||
# CASE 2: specific (operation) case for root Node
|
||||
# TODO: this must probably be merged with the previous bloc
|
||||
if self.req.get_resource_origin(0) == "/":
|
||||
return self.resource
|
||||
|
||||
@@ -519,71 +596,96 @@ class ResourceHandler_RestResourceBase(
|
||||
|
||||
key = self.req.get_resource_origin(0)
|
||||
if key in self.resource._plugins_:
|
||||
if issubclass(self.resource._plugins_[key], ResourcePlugin_field):
|
||||
plugin_field = cast(
|
||||
ResourcePlugin_field,
|
||||
self.resource._plugins_[key](self.req, self.req.get_root_resource()),
|
||||
)
|
||||
if isinstance(self.resource._plugins_[key], ResourcePlugin_field):
|
||||
plugin_field = cast(ResourcePlugin_field, self.resource._plugins_[key])
|
||||
plugin_field.set_context(self.req, self.req.get_root_resource())
|
||||
value = plugin_field.handle_field_get(value, params)
|
||||
|
||||
elif issubclass(self.resource._plugins_[key], ResourcePlugin_RestResourceBase):
|
||||
plugin_resource = cast(
|
||||
ResourcePlugin_RestResourceBase,
|
||||
self.resource._plugins_[key](self.req, self.req.get_root_resource()),
|
||||
)
|
||||
elif isinstance(self.resource._plugins_[key], ResourcePlugin_RestResourceBase):
|
||||
plugin_resource = cast(ResourcePlugin_RestResourceBase, self.resource._plugins_[key])
|
||||
plugin_resource.set_context(self.req, self.req.get_root_resource())
|
||||
value = plugin_resource.handle_resource_get(value, params)
|
||||
|
||||
return value
|
||||
|
||||
def _handle_process_put(self, params) -> None:
|
||||
def _handle_process_put(self, params) -> None: # pylint: disable=too-complex,too-many-branches
|
||||
# print(f"{type(self).__name__}->_process_put()")
|
||||
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
|
||||
|
||||
assert self.prev_handler is not None
|
||||
assert isinstance(self.resource, rest_resource.RestResourceBase)
|
||||
|
||||
self.resource.check_acl_self(self.req, self.req.get_data())
|
||||
|
||||
# creating a copy of the current resource
|
||||
_new_resrc = self.resource.copy()
|
||||
# updating values based on nex data
|
||||
|
||||
# updating values based on new data
|
||||
_new_resrc.update(**self.req.get_data())
|
||||
|
||||
# applying plugins (to nested element)
|
||||
if isinstance(_new_resrc, RestResourceBase):
|
||||
for key, attr in _new_resrc.model_fields.items():
|
||||
if key in _new_resrc._plugins_:
|
||||
if issubclass(_new_resrc._plugins_[key], ResourcePlugin_field):
|
||||
plugin_field: ResourcePlugin_field = cast(
|
||||
ResourcePlugin_field, _new_resrc._plugins_[key](self.req, self.req.get_root_resource())
|
||||
)
|
||||
value = getattr(_new_resrc, key)
|
||||
setattr(_new_resrc, key, plugin_field.handle_field_put(value, params))
|
||||
for key, _ in _new_resrc.model_fields.items():
|
||||
if key in _new_resrc._plugins_:
|
||||
if isinstance(_new_resrc._plugins_[key], ResourcePlugin_field):
|
||||
plugin_field: ResourcePlugin_field = cast(ResourcePlugin_field, _new_resrc._plugins_[key])
|
||||
plugin_field.set_context(self.req, self.req.get_root_resource())
|
||||
value = getattr(_new_resrc, key)
|
||||
setattr(_new_resrc, key, plugin_field.handle_field_put(value, params))
|
||||
|
||||
# applying plugins (from parent element)
|
||||
if self.prev_handler is not None:
|
||||
# element is within a dict
|
||||
if (
|
||||
isinstance(self.prev_handler.resource, dict)
|
||||
and self.prev_handler.prev_handler is not None
|
||||
and isinstance(self.prev_handler.prev_handler.resource, RestResourceBase)
|
||||
):
|
||||
key = self.req.get_resource_origin(2)
|
||||
if key in self.prev_handler.prev_handler.resource._plugins_:
|
||||
plugin_dict: ResourcePlugin_dict = cast(
|
||||
ResourcePlugin_dict,
|
||||
self.prev_handler.prev_handler.resource._plugins_[key](self.req, self.req.get_root_resource()),
|
||||
)
|
||||
_new_resrc = plugin_dict.handle_dict_elem_put(_new_resrc, params)
|
||||
# element is within a RestResourceBase
|
||||
elif isinstance(self.prev_handler.resource, RestResourceBase):
|
||||
key = self.req.get_resource_origin(1)
|
||||
if key in self.prev_handler.resource._plugins_:
|
||||
plugin_rsrc: ResourcePlugin_RestResourceBase = cast(
|
||||
ResourcePlugin_RestResourceBase,
|
||||
self.prev_handler.resource._plugins_[key](self.req, self.req.get_root_resource()),
|
||||
)
|
||||
_new_resrc = plugin_rsrc.handle_resource_put(_new_resrc, params)
|
||||
|
||||
self.resource.update(**_new_resrc.__dict__)
|
||||
return
|
||||
# element is within a dict
|
||||
if (
|
||||
isinstance(self.prev_handler.resource, dict)
|
||||
and self.prev_handler.prev_handler is not None
|
||||
and isinstance(self.prev_handler.prev_handler.resource, rest_resource.RestResourceBase)
|
||||
):
|
||||
key = self.req.get_resource_origin(2)
|
||||
dict_key_type: T_T_DictKey = self.prev_handler.prev_handler.resource._dict_key_type_[self.req.get_resource_origin(2)]
|
||||
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.prev_handler.resource)
|
||||
_dict_key: T_DictKey
|
||||
|
||||
if key in self.prev_handler.prev_handler.resource._plugins_:
|
||||
plugin_dict: ResourcePlugin_dict = cast(ResourcePlugin_dict, self.prev_handler.prev_handler.resource._plugins_[key])
|
||||
plugin_dict.set_context(self.req, self.req.get_root_resource())
|
||||
|
||||
if issubclass(dict_key_type, bytes):
|
||||
key_byte = dict_key_type(self.req.get_resource_origin(1), "utf-8")
|
||||
_dict_key = key_byte
|
||||
else:
|
||||
key_std = dict_key_type(self.req.get_resource_origin(1))
|
||||
_dict_key = key_std
|
||||
plugin_dict.handle_dict_elem_put(_dict, _dict_key, _new_resrc, params)
|
||||
else:
|
||||
if issubclass(dict_key_type, bytes):
|
||||
key_byte = dict_key_type(self.req.get_resource_origin(1), "utf-8")
|
||||
_dict_key = key_byte
|
||||
|
||||
else:
|
||||
key_std = dict_key_type(self.req.get_resource_origin(1))
|
||||
_dict_key = key_std
|
||||
if _dict_key not in _dict:
|
||||
raise RuntimeError(f"Key not found: {str(_dict_key)}")
|
||||
_dict[_dict_key] = _new_resrc
|
||||
|
||||
# element is within a RestResourceBase
|
||||
elif isinstance(self.prev_handler.resource, rest_resource.RestResourceBase):
|
||||
key = self.req.get_resource_origin(1)
|
||||
if key in self.prev_handler.resource._plugins_:
|
||||
plugin_rsrc: ResourcePlugin_RestResourceBase = cast(
|
||||
ResourcePlugin_RestResourceBase, self.prev_handler.resource._plugins_[key]
|
||||
)
|
||||
plugin_rsrc.set_context(self.req, self.req.get_root_resource())
|
||||
_new_resrc = plugin_rsrc.handle_resource_put(_new_resrc, params)
|
||||
else:
|
||||
self.resource.update(**_new_resrc.__dict__)
|
||||
else:
|
||||
raise RuntimeError("unsupported operation")
|
||||
|
||||
# print("***************")
|
||||
# print(self.resource)
|
||||
# print(_new_resrc)
|
||||
# print(_new_resrc.__dict__)
|
||||
|
||||
def _handle_process_delete(self, params) -> None:
|
||||
# print(f"{type(self).__name__}->_handle_process_delete()")
|
||||
@@ -594,7 +696,7 @@ class ResourceHandler_RestResourceBase(
|
||||
self.prev_handler is not None
|
||||
and isinstance(self.prev_handler.resource, dict)
|
||||
and self.prev_handler.prev_handler is not None
|
||||
and isinstance(self.prev_handler.prev_handler.resource, RestResourceBase)
|
||||
and isinstance(self.prev_handler.prev_handler.resource, rest_resource.RestResourceBase)
|
||||
):
|
||||
self.prev_handler._process_delete()
|
||||
else:
|
||||
@@ -622,15 +724,15 @@ class ResourceHandler_simple(
|
||||
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
|
||||
|
||||
assert self.prev_handler is not None
|
||||
assert isinstance(self.prev_handler.resource, RestResourceBase)
|
||||
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
|
||||
|
||||
self.prev_handler.resource.check_acl_field(self.req, 1)
|
||||
|
||||
if self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
|
||||
plugin_simple: ResourcePlugin_field = cast(
|
||||
ResourcePlugin_field,
|
||||
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)](self.req, self.req.get_root_resource()),
|
||||
ResourcePlugin_field, self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)]
|
||||
)
|
||||
plugin_simple.set_context(self.req, self.req.get_root_resource())
|
||||
return plugin_simple.handle_field_get(self.resource, params)
|
||||
|
||||
return self.resource
|
||||
@@ -640,7 +742,7 @@ class ResourceHandler_simple(
|
||||
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
|
||||
|
||||
assert self.prev_handler is not None
|
||||
assert isinstance(self.prev_handler.resource, RestResourceBase)
|
||||
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
|
||||
|
||||
self.prev_handler.resource.check_acl_field(self.req, 1)
|
||||
|
||||
@@ -649,9 +751,9 @@ class ResourceHandler_simple(
|
||||
if self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
|
||||
# print("PLUGIN FOUND")
|
||||
plugin_simple: ResourcePlugin_field = cast(
|
||||
ResourcePlugin_field,
|
||||
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)](self.req, self.req.get_root_resource()),
|
||||
ResourcePlugin_field, self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)]
|
||||
)
|
||||
plugin_simple.set_context(self.req, self.req.get_root_resource())
|
||||
# print(value)
|
||||
value = plugin_simple.handle_field_put(value, params)
|
||||
# print(value)
|
||||
|
||||
@@ -1,83 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
# pyrestresource(c) by chacha
|
||||
#
|
||||
# pyrestresource is licensed under a
|
||||
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
|
||||
#
|
||||
# You should have received a copy of the license along with this
|
||||
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
|
||||
|
||||
# pylint: disable=missing-module-docstring,missing-class-docstring,missing-function-docstring
|
||||
|
||||
"""CLI interface module"""
|
||||
from __future__ import annotations
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from .rest_resource_walker import (
|
||||
RestResourceWalkerFutureResult,
|
||||
RestResourceWalker_Root,
|
||||
RestResourceWalker_Sub_T_Dict,
|
||||
RestResourceWalker_Sub_RestFields,
|
||||
RestResourceWalker_Sub_RestResourceBase,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING is True:
|
||||
from typing import Optional, Any
|
||||
|
||||
|
||||
class RestResourceWalkerFutureResult_RestResourceBase_handler(RestResourceWalkerFutureResult[dict]):
|
||||
def process_future(self, result: Optional[list[dict]]) -> Optional[dict]:
|
||||
# print(f"RestResourceWalkerFutureResult_RestResourceBase_handler {result}")
|
||||
res: dict[str, Any] = {}
|
||||
res[self.source.resource_name] = {}
|
||||
if result:
|
||||
for subres in result:
|
||||
key = next(iter(subres))
|
||||
print(key)
|
||||
res[self.source.resource_name] = res[self.source.resource_name] | subres
|
||||
return res
|
||||
|
||||
|
||||
class RestResourceWalkerFutureResult_Dict_handler(RestResourceWalkerFutureResult[dict]):
|
||||
def process_future(self, result: Optional[list[dict]]) -> Optional[dict]:
|
||||
# print(f"RestResourceWalkerFutureResult_Dict_handler {result}")
|
||||
res: dict[str, Any] = {}
|
||||
if result:
|
||||
for subres in result:
|
||||
res = res | subres
|
||||
return res
|
||||
|
||||
|
||||
class RestResourceWalkerFutureResult_RestFields_handler(RestResourceWalkerFutureResult[dict]):
|
||||
def process_future(self, result: Optional[list[dict]]) -> Optional[dict]:
|
||||
# print(f"RestResourceWalkerFutureResult_RestFields_handler {result}")
|
||||
# print(self.source.resource)
|
||||
res: dict[str, Any] = {}
|
||||
res[self.source.resource_name] = {}
|
||||
if result:
|
||||
for subres in result:
|
||||
key = next(iter(subres))
|
||||
print(key)
|
||||
res[self.source.resource_name] = res[self.source.resource_name] | subres
|
||||
return res
|
||||
|
||||
|
||||
class RestResourceWalker_Sub_T_Dict__handler(RestResourceWalker_Sub_T_Dict):
|
||||
cls_RestResourceWalkerFutureResult = RestResourceWalkerFutureResult_Dict_handler
|
||||
|
||||
|
||||
class RestResourceWalker_Sub_RestResourceBase__handler(RestResourceWalker_Sub_RestResourceBase):
|
||||
cls_RestResourceWalkerFutureResult = RestResourceWalkerFutureResult_RestResourceBase_handler
|
||||
|
||||
|
||||
class RestResourceWalker_Sub_RestResourceFields__handler(RestResourceWalker_Sub_RestFields):
|
||||
cls_RestResourceWalkerFutureResult = RestResourceWalkerFutureResult_RestFields_handler
|
||||
|
||||
|
||||
class RestResourceWalker_Root__handler(RestResourceWalker_Root):
|
||||
cls_RestResourceWalker_Sub = [
|
||||
RestResourceWalker_Sub_T_Dict__handler,
|
||||
RestResourceWalker_Sub_RestResourceFields__handler,
|
||||
RestResourceWalker_Sub_RestResourceBase__handler,
|
||||
]
|
||||
83
src/pyrestresource/rest_resource_login.py
Normal file
83
src/pyrestresource/rest_resource_login.py
Normal file
@@ -0,0 +1,83 @@
|
||||
from __future__ import annotations
|
||||
from typing import ClassVar, TYPE_CHECKING
|
||||
|
||||
from secrets import token_hex, compare_digest
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
||||
from .rest_resource import RestResourceBase
|
||||
from .rest_model import RestField
|
||||
from .rest_ACL import ACL_target_user
|
||||
from .rest_resource_plugin_login import ResourcePlugin_Login
|
||||
from .rest_login import UserSession, Login
|
||||
|
||||
from .rest_exceptions import (
|
||||
RestResourceLoginException_InvalidCredentials,
|
||||
RestResourceLoginException_ClientChange,
|
||||
RestResourceLoginException_SessionTimeout,
|
||||
RestResourceLoginException_InvalidSession,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .rest_request import RestRequest
|
||||
from .rest_login import UserLogin
|
||||
|
||||
|
||||
class RestResourceBaseLogin(RestResourceBase):
|
||||
_ar_user_login: ClassVar[list[UserLogin]] = []
|
||||
_ar_user_session: dict[str, UserSession] = {}
|
||||
_max_session_inactive: ClassVar[timedelta] = timedelta(minutes=20)
|
||||
_max_session_time: ClassVar[timedelta] = timedelta(hours=12)
|
||||
login: Login = RestField(default=Login(), plugin=ResourcePlugin_Login)
|
||||
|
||||
def get_new_cookie_expiration_date(self) -> datetime:
|
||||
return datetime.now() + self._max_session_time
|
||||
|
||||
def _process_request_session(self, request: RestRequest) -> None:
|
||||
# print(f"[TRACE] {type(self).__name__}->_process_request_session()")
|
||||
# print(f"[TRACE] request: {id(request)}")
|
||||
auth_cookie = request.get_cookie("Authorization")
|
||||
if auth_cookie is not None:
|
||||
if auth_cookie in self._ar_user_session:
|
||||
# print(f"SESSION FOUND for {request.get_client()}")
|
||||
|
||||
if self._ar_user_session[auth_cookie].client != request.get_client():
|
||||
del self._ar_user_session[auth_cookie]
|
||||
raise RestResourceLoginException_ClientChange()
|
||||
|
||||
time_diff = datetime.now() - self._ar_user_session[auth_cookie].last_update
|
||||
if time_diff > self._max_session_inactive:
|
||||
del self._ar_user_session[auth_cookie]
|
||||
raise RestResourceLoginException_SessionTimeout()
|
||||
|
||||
request.set_user(ACL_target_user(name=self._ar_user_session[auth_cookie].user_login.username))
|
||||
# print("SESSION RECOVERED")
|
||||
return
|
||||
|
||||
raise RestResourceLoginException_InvalidSession()
|
||||
|
||||
# print(f"non-connected user {request.get_client()}")
|
||||
return
|
||||
|
||||
def user_login(self, user_name: str, user_secret: str, request: RestRequest) -> str:
|
||||
already_failed: bool = False
|
||||
|
||||
for iter_user_login in self._ar_user_login:
|
||||
username_ok: bool = compare_digest(user_name, iter_user_login.username)
|
||||
secret_ok: bool = compare_digest(user_secret, iter_user_login.secret)
|
||||
|
||||
if username_ok is True:
|
||||
if secret_ok is True and not already_failed:
|
||||
return self._register_user_session(iter_user_login, request)
|
||||
already_failed = True
|
||||
else:
|
||||
pass # pylint: disable=unnecessary-pass
|
||||
pass # pylint: disable=unnecessary-pass
|
||||
|
||||
raise RestResourceLoginException_InvalidCredentials()
|
||||
|
||||
def _register_user_session(self, user_login: UserLogin, request: RestRequest) -> str:
|
||||
token = token_hex(16)
|
||||
new_user_session = UserSession(last_update=datetime.now(), user_login=user_login, client=request.get_client())
|
||||
self._ar_user_session[f"Bearer {token}"] = new_user_session
|
||||
return token
|
||||
@@ -12,8 +12,7 @@ from .rest_types import (
|
||||
)
|
||||
from .rest_exceptions import RestResourceConfigException
|
||||
|
||||
|
||||
if TYPE_CHECKING is True:
|
||||
if TYPE_CHECKING:
|
||||
from .rest_request import RestRequest
|
||||
from .rest_resource import RestResourceBase
|
||||
from .rest_request_opt import (
|
||||
@@ -30,12 +29,16 @@ if TYPE_CHECKING is True:
|
||||
|
||||
|
||||
class ResourcePlugin(ABC):
|
||||
def __init__(self, request: RestRequest, root_resource: RestResourceBase) -> None:
|
||||
self.__request: RestRequest = request
|
||||
self.__root_resource: RestResourceBase = root_resource
|
||||
def __init__(self) -> None:
|
||||
self.__request: RestRequest
|
||||
self.__root_resource: RestResourceBase
|
||||
|
||||
def set_context(self, request: RestRequest, root_resource: RestResourceBase) -> None:
|
||||
self.__request = request
|
||||
self.__root_resource = root_resource
|
||||
|
||||
def user_login(self, user_name: str, user_secret: str) -> str:
|
||||
from .rest_login import RestResourceBaseLogin
|
||||
from .rest_resource_login import RestResourceBaseLogin
|
||||
|
||||
if not isinstance(self.__root_resource, RestResourceBaseLogin):
|
||||
raise RestResourceConfigException("root_resource must be RestResourceBaseLogin to use user_login")
|
||||
@@ -51,7 +54,7 @@ class ResourcePlugin(ABC):
|
||||
self.__request.reset_resp_cookie(key)
|
||||
|
||||
def get_new_cookie_expiration_date(self) -> datetime:
|
||||
from .rest_login import RestResourceBaseLogin
|
||||
from .rest_resource_login import RestResourceBaseLogin
|
||||
|
||||
if not isinstance(self.__root_resource, RestResourceBaseLogin):
|
||||
raise RestResourceConfigException("root_resource must be RestResourceBaseLogin to use get_new_cookie_expiration_date")
|
||||
@@ -130,6 +133,7 @@ class ResourcePlugin_dict(ResourcePlugin, Generic[_T_DictKey, _T_DictValues]):
|
||||
def handle_dict_post(
|
||||
self,
|
||||
resource_dict: dict[_T_DictKey, _T_DictValues],
|
||||
key: _T_DictKey,
|
||||
resource: _T_DictValues,
|
||||
params: RestRequestParams_Dict_POST[_T_DictKey],
|
||||
) -> Optional[_T_DictKey]:
|
||||
@@ -137,6 +141,15 @@ class ResourcePlugin_dict(ResourcePlugin, Generic[_T_DictKey, _T_DictValues]):
|
||||
|
||||
@abstractmethod
|
||||
def handle_dict_delete(
|
||||
self,
|
||||
resource_dict: dict[_T_DictKey, _T_DictValues],
|
||||
key: _T_DictKey,
|
||||
params: RestRequestParams_Dict_DELETE[_T_DictKey],
|
||||
) -> None:
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def handle_dict_delete_all(
|
||||
self,
|
||||
resource_dict: dict[_T_DictKey, _T_DictValues],
|
||||
params: RestRequestParams_Dict_DELETE[_T_DictKey],
|
||||
@@ -146,17 +159,20 @@ class ResourcePlugin_dict(ResourcePlugin, Generic[_T_DictKey, _T_DictValues]):
|
||||
@abstractmethod
|
||||
def handle_dict_elem_get(
|
||||
self,
|
||||
resource: TV_RestResourceBase,
|
||||
resource_dict: dict[_T_DictKey, _T_DictValues],
|
||||
key: _T_DictKey,
|
||||
params: RestRequestParams_Dict_elem_GET,
|
||||
) -> TV_RestResourceBase:
|
||||
) -> _T_DictValues:
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def handle_dict_elem_put(
|
||||
self,
|
||||
resource: TV_RestResourceBase,
|
||||
resource_dict: dict[_T_DictKey, _T_DictValues],
|
||||
key: _T_DictKey,
|
||||
resource: _T_DictValues,
|
||||
params: RestRequestParams_Dict_elem_PUT,
|
||||
) -> TV_RestResourceBase:
|
||||
) -> None:
|
||||
...
|
||||
|
||||
|
||||
@@ -173,31 +189,43 @@ class ResourcePlugin_dict_default(ResourcePlugin_dict[_T_DictKey, _T_DictValues]
|
||||
def handle_dict_post(
|
||||
self,
|
||||
resource_dict: dict[_T_DictKey, _T_DictValues],
|
||||
key: _T_DictKey,
|
||||
resource: _T_DictValues,
|
||||
params: RestRequestParams_Dict_POST[_T_DictKey],
|
||||
) -> Optional[_T_DictKey]:
|
||||
if params.API_key is not None:
|
||||
resource_dict[params.API_key] = resource
|
||||
return params.API_key
|
||||
resource_dict[key] = resource
|
||||
return key
|
||||
|
||||
def handle_dict_delete(
|
||||
self,
|
||||
resource_dict: dict[_T_DictKey, _T_DictValues],
|
||||
key: _T_DictKey,
|
||||
params: RestRequestParams_Dict_DELETE[_T_DictKey],
|
||||
) -> None:
|
||||
if params.API_key is not None:
|
||||
del resource_dict[params.API_key]
|
||||
del resource_dict[key]
|
||||
|
||||
def handle_dict_delete_all(
|
||||
self,
|
||||
resource_dict: dict[_T_DictKey, _T_DictValues],
|
||||
params: RestRequestParams_Dict_DELETE[_T_DictKey],
|
||||
) -> None:
|
||||
resource_dict.clear()
|
||||
|
||||
def handle_dict_elem_get(
|
||||
self,
|
||||
resource: TV_RestResourceBase,
|
||||
resource_dict: dict[_T_DictKey, _T_DictValues],
|
||||
key: _T_DictKey,
|
||||
params: RestRequestParams_Dict_elem_GET,
|
||||
) -> TV_RestResourceBase:
|
||||
return resource
|
||||
) -> _T_DictValues:
|
||||
return resource_dict[key]
|
||||
|
||||
def handle_dict_elem_put(
|
||||
self,
|
||||
resource: TV_RestResourceBase,
|
||||
resource_dict: dict[_T_DictKey, _T_DictValues],
|
||||
key: _T_DictKey,
|
||||
resource: _T_DictValues,
|
||||
params: RestRequestParams_Dict_elem_PUT,
|
||||
) -> TV_RestResourceBase:
|
||||
return resource
|
||||
) -> None:
|
||||
if key not in resource_dict:
|
||||
raise RuntimeError(f"Key not found: {str(key)}")
|
||||
resource_dict[key] = resource
|
||||
|
||||
24
src/pyrestresource/rest_resource_plugin_login.py
Normal file
24
src/pyrestresource/rest_resource_plugin_login.py
Normal file
@@ -0,0 +1,24 @@
|
||||
from __future__ import annotations
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from .rest_resource_plugin import ResourcePlugin_RestResourceBase_default
|
||||
from .rest_login import UserLogin, Login
|
||||
|
||||
from .rest_exceptions import RestResourceLoginException_InvalidCredentials
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .rest_request_opt import RestRequestParams_RestResourceBase_PUT, RestRequestParams_RestResourceBase_GET
|
||||
|
||||
|
||||
class ResourcePlugin_Login(ResourcePlugin_RestResourceBase_default):
|
||||
ar_UserLogin: list[UserLogin] = []
|
||||
|
||||
def handle_resource_get(self, resource: Login, params: RestRequestParams_RestResourceBase_GET) -> Login:
|
||||
return Login(username=self.get_user_login(), secret=None)
|
||||
|
||||
def handle_resource_put(self, resource: Login, params: RestRequestParams_RestResourceBase_PUT) -> Login:
|
||||
if resource.username is None or resource.secret is None:
|
||||
raise RestResourceLoginException_InvalidCredentials()
|
||||
token = self.user_login(resource.username, resource.secret)
|
||||
self.set_resp_cookie_value("Authorization", f"Bearer {token}")
|
||||
return resource
|
||||
@@ -1,3 +1,5 @@
|
||||
# pylint: disable=protected-access
|
||||
|
||||
from __future__ import annotations
|
||||
from typing import (
|
||||
cast,
|
||||
@@ -6,7 +8,6 @@ from typing import (
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
from pydantic import BaseModel
|
||||
from pydantic.fields import FieldInfo
|
||||
|
||||
from .rest_resource import RestResourceBase
|
||||
@@ -29,8 +30,8 @@ from .rest_ACL import (
|
||||
)
|
||||
from .rest_exceptions import RestResourcePluginException_InvalidPluginSignature, RestResourceModelException, RestResourceModelException_ACL
|
||||
|
||||
if TYPE_CHECKING is True:
|
||||
pass
|
||||
if TYPE_CHECKING:
|
||||
...
|
||||
|
||||
|
||||
class RestResourceWalker_Sub_T_Dict__tree_init(RestResourceWalker_Sub_T_Dict):
|
||||
@@ -41,7 +42,7 @@ class RestResourceWalker_Sub_T_Dict__tree_init(RestResourceWalker_Sub_T_Dict):
|
||||
if not get_origin(datatype[1]) is None:
|
||||
raise RestResourceModelException("complex dict types are not supported (should create a RestResourceBase container)")
|
||||
if not datatype[0] in _T_SupportedRESTFields:
|
||||
raise RestResourceModelException(f"Unsupported Dict Field value type in class (key)")
|
||||
raise RestResourceModelException("Unsupported Dict Field value type in class (key)")
|
||||
|
||||
# preprocessing types / structure
|
||||
if self.parent is not None and isinstance(self.parent, RestResourceWalker_Sub_RestResourceBase):
|
||||
@@ -62,13 +63,13 @@ class RestResourceWalker_Sub_T_Dict__tree_init(RestResourceWalker_Sub_T_Dict):
|
||||
if (
|
||||
isinstance(self.resource, FieldInfo)
|
||||
and self.resource.json_schema_extra is not None
|
||||
and type(self.resource.json_schema_extra) is dict
|
||||
and isinstance(self.resource.json_schema_extra, dict)
|
||||
):
|
||||
if "plugin" in self.resource.json_schema_extra:
|
||||
plugin_dict: type[ResourcePlugin_dict] = self.resource.json_schema_extra["plugin"]
|
||||
if not issubclass(plugin_dict, ResourcePlugin_dict):
|
||||
raise RestResourcePluginException_InvalidPluginSignature()
|
||||
self.parent.annotation._plugins_[self.resource_name] = plugin_dict
|
||||
self.parent.annotation._plugins_[self.resource_name] = plugin_dict()
|
||||
# print("ADD DICT PLUGIN")
|
||||
|
||||
if "ACL" in self.resource.json_schema_extra:
|
||||
@@ -97,7 +98,7 @@ class RestResourceWalker_Sub_RestFields__tree_init(RestResourceWalker_Sub_RestFi
|
||||
if (
|
||||
isinstance(self.resource, FieldInfo)
|
||||
and self.resource.json_schema_extra is not None
|
||||
and type(self.resource.json_schema_extra) is dict
|
||||
and isinstance(self.resource.json_schema_extra, dict)
|
||||
):
|
||||
# print("aaaaaaaaaa")
|
||||
|
||||
@@ -115,7 +116,7 @@ class RestResourceWalker_Sub_RestFields__tree_init(RestResourceWalker_Sub_RestFi
|
||||
plugin_field: type[ResourcePlugin_field] = self.resource.json_schema_extra["plugin"]
|
||||
if not issubclass(plugin_field, ResourcePlugin_field):
|
||||
raise RestResourcePluginException_InvalidPluginSignature()
|
||||
self.parent.annotation._plugins_[self.resource_name] = plugin_field
|
||||
self.parent.annotation._plugins_[self.resource_name] = plugin_field()
|
||||
# print("ADD FIELD PLUGIN")
|
||||
|
||||
if "ACL" in self.resource.json_schema_extra:
|
||||
@@ -152,13 +153,13 @@ class RestResourceWalker_Sub_RestResourceBase__tree_init(RestResourceWalker_Sub_
|
||||
if (
|
||||
isinstance(self.resource, FieldInfo)
|
||||
and self.resource.json_schema_extra is not None
|
||||
and type(self.resource.json_schema_extra) is dict
|
||||
and isinstance(self.resource.json_schema_extra, dict)
|
||||
):
|
||||
if "plugin" in self.resource.json_schema_extra:
|
||||
plugin_resource: type[ResourcePlugin_RestResourceBase] = self.resource.json_schema_extra["plugin"]
|
||||
if not issubclass(plugin_resource, ResourcePlugin_RestResourceBase):
|
||||
raise RestResourcePluginException_InvalidPluginSignature()
|
||||
self.parent.annotation._plugins_[self.resource_name] = plugin_resource
|
||||
self.parent.annotation._plugins_[self.resource_name] = plugin_resource()
|
||||
# print("ADD RESOURCE PLUGIN")
|
||||
|
||||
if "ACL" in self.resource.json_schema_extra:
|
||||
|
||||
@@ -5,7 +5,6 @@ from typing import (
|
||||
get_args,
|
||||
get_origin,
|
||||
TypeVar,
|
||||
Type,
|
||||
Generic,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
@@ -17,7 +16,7 @@ from .rest_types import _T_SupportedRESTFields
|
||||
from .rest_resource import RestResourceBase
|
||||
from .rest_exceptions import RestResourceModelException
|
||||
|
||||
if TYPE_CHECKING is True:
|
||||
if TYPE_CHECKING:
|
||||
from typing import Any, Optional
|
||||
|
||||
TV_RestResourceWalkerFutureResult = TypeVar("TV_RestResourceWalkerFutureResult")
|
||||
@@ -36,22 +35,22 @@ class RestResourceWalkerFutureResult(ABC, Generic[TV_RestResourceWalkerFutureRes
|
||||
|
||||
|
||||
class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
|
||||
# pylint: disable=too-many-instance-attributes
|
||||
cls_RestResourceWalkerFutureResult: Optional[type[RestResourceWalkerFutureResult[TV_RestResourceWalkerFutureResult]]] = None
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def check_type(cls, resource: FieldInfo | Type[RestResourceBase]) -> tuple[bool, Type[Any], bool]:
|
||||
def check_type(cls, resource: FieldInfo | type[RestResourceBase]) -> tuple[bool, type[Any], bool]:
|
||||
"""implementation interface to Factory.
|
||||
The factory will call this specialized method on each implementation to find a supported one.
|
||||
"""
|
||||
...
|
||||
|
||||
@classmethod
|
||||
def get(
|
||||
self,
|
||||
cls,
|
||||
subs: list[type[RestResourceWalker_Sub]],
|
||||
resource_name: str,
|
||||
resource: FieldInfo | Type[RestResourceBase],
|
||||
resource: FieldInfo | type[RestResourceBase],
|
||||
parent: Optional[RestResourceWalker_Sub] = None,
|
||||
argument: Optional[Any] = None,
|
||||
) -> Optional[RestResourceWalker_Sub]:
|
||||
@@ -61,12 +60,11 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
|
||||
if _is_valid is True:
|
||||
return sub(resource_name, resource, parent, _anno, _optional, argument)
|
||||
raise RestResourceModelException(f"Incompatible Field Found: {type(resource).__name__}")
|
||||
return None
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
resource_name: str,
|
||||
resource: FieldInfo | Type[RestResourceBase],
|
||||
resource: FieldInfo | type[RestResourceBase],
|
||||
parent: Optional[RestResourceWalker_Sub] = None,
|
||||
annotation: Optional[type[RestResourceBase]] = None,
|
||||
_optional: Optional[bool] = None,
|
||||
@@ -74,7 +72,7 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
|
||||
):
|
||||
self.argument: Any = argument
|
||||
self.resource_name: str = resource_name
|
||||
self.resource: FieldInfo | Type[RestResourceBase] = resource
|
||||
self.resource: FieldInfo | type[RestResourceBase] = resource
|
||||
self.parent: Optional[RestResourceWalker_Sub] = parent
|
||||
|
||||
self.future_results_subs: Optional[list[RestResourceWalkerFutureResult[TV_RestResourceWalkerFutureResult]]] = None
|
||||
@@ -102,7 +100,7 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
|
||||
|
||||
def chain_process_future(self) -> Optional[TV_RestResourceWalkerFutureResult]:
|
||||
if self.future_results_subs is not None and self.future_result is not None:
|
||||
return_future_results_subs: list[Any] = [] # TODO: use typevar
|
||||
return_future_results_subs: list[Any] = []
|
||||
for future_result in self.future_results_subs:
|
||||
return_future_results_subs.append(future_result.chain_process_future())
|
||||
return self.future_result.process_future(return_future_results_subs)
|
||||
@@ -124,11 +122,11 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
|
||||
|
||||
@staticmethod
|
||||
def ProcessAnnotation(
|
||||
resource: FieldInfo | Type[RestResourceBase],
|
||||
resource: FieldInfo | type[RestResourceBase],
|
||||
) -> tuple[type[Any], bool]:
|
||||
# from .rest_resource import RestResourceBase
|
||||
|
||||
_anno: Type[Any]
|
||||
_anno: type[Any]
|
||||
|
||||
# print("!!!!!!!!!!!!!!!!!!!!!!!")
|
||||
# print(resource)
|
||||
@@ -159,7 +157,7 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
|
||||
|
||||
class RestResourceWalker_Sub_T_Dict(RestResourceWalker_Sub):
|
||||
@classmethod
|
||||
def check_type(cls, resource: FieldInfo | Type[RestResourceBase]) -> tuple[bool, Type[Any], bool]:
|
||||
def check_type(cls, resource: FieldInfo | type[RestResourceBase]) -> tuple[bool, type[Any], bool]:
|
||||
_anno, _optional = cls.ProcessAnnotation(resource)
|
||||
_type_resource = get_origin(_anno)
|
||||
return (_type_resource is dict), _anno, _optional
|
||||
@@ -175,7 +173,7 @@ class RestResourceWalker_Sub_T_Dict(RestResourceWalker_Sub):
|
||||
|
||||
class RestResourceWalker_Sub_RestFields(RestResourceWalker_Sub):
|
||||
@classmethod
|
||||
def check_type(cls, resource: FieldInfo | Type[RestResourceBase]) -> tuple[bool, Type[Any], bool]:
|
||||
def check_type(cls, resource: FieldInfo | type[RestResourceBase]) -> tuple[bool, type[Any], bool]:
|
||||
_anno, _optional = cls.ProcessAnnotation(resource)
|
||||
return (_anno in _T_SupportedRESTFields), _anno, _optional
|
||||
|
||||
@@ -185,7 +183,7 @@ class RestResourceWalker_Sub_RestFields(RestResourceWalker_Sub):
|
||||
|
||||
class RestResourceWalker_Sub_RestResourceBase(RestResourceWalker_Sub):
|
||||
@classmethod
|
||||
def check_type(cls, resource: FieldInfo | Type[RestResourceBase]) -> tuple[bool, Type[Any], bool]:
|
||||
def check_type(cls, resource: FieldInfo | type[RestResourceBase]) -> tuple[bool, type[Any], bool]:
|
||||
_anno, _optional = cls.ProcessAnnotation(resource)
|
||||
return (
|
||||
((get_origin(_anno) is None) and issubclass(_anno, RestResourceBase)),
|
||||
@@ -201,15 +199,15 @@ class RestResourceWalker_Sub_RestResourceBase(RestResourceWalker_Sub):
|
||||
|
||||
|
||||
class RestResourceWalker_Root:
|
||||
cls_RestResourceWalker_Sub: list[Type[RestResourceWalker_Sub]] = [
|
||||
cls_RestResourceWalker_Sub: list[type[RestResourceWalker_Sub]] = [
|
||||
RestResourceWalker_Sub_T_Dict,
|
||||
RestResourceWalker_Sub_RestFields,
|
||||
RestResourceWalker_Sub_RestResourceBase,
|
||||
]
|
||||
|
||||
def __init__(self, resource: RestResourceBase | Type[RestResourceBase]) -> None:
|
||||
def __init__(self, resource: RestResourceBase | type[RestResourceBase]) -> None:
|
||||
self.subwalker_argument: Any = None
|
||||
self.resource: Type[RestResourceBase]
|
||||
self.resource: type[RestResourceBase]
|
||||
if isinstance(resource, RestResourceBase):
|
||||
self.resource = type(resource)
|
||||
else:
|
||||
@@ -225,7 +223,7 @@ class RestResourceWalker_Root:
|
||||
if sub_walker_initial is not None:
|
||||
sub_walker_initial.process()
|
||||
sub_walker_initial.get_future()
|
||||
resource_list: list[tuple[str, FieldInfo | Type[RestResourceBase], RestResourceWalker_Sub]] = [
|
||||
resource_list: list[tuple[str, FieldInfo | type[RestResourceBase], RestResourceWalker_Sub]] = [
|
||||
(subresource_name, subresource, sub_walker_initial)
|
||||
for subresource_name, subresource in sub_walker_initial.get_sub_resources()
|
||||
]
|
||||
@@ -252,5 +250,4 @@ class RestResourceWalker_Root:
|
||||
resource_list = list(new_resource_list)
|
||||
current_deep = current_deep + 1
|
||||
return sub_walker_initial.chain_process_future()
|
||||
else:
|
||||
raise RestResourceModelException("Invalid Rootpoint")
|
||||
raise RestResourceModelException("Invalid Rootpoint")
|
||||
|
||||
@@ -8,7 +8,7 @@ from pathlib import Path
|
||||
from uuid import UUID
|
||||
from ipaddress import IPv4Address, IPv4Network
|
||||
|
||||
if TYPE_CHECKING is True:
|
||||
if TYPE_CHECKING:
|
||||
from .rest_resource import RestResourceBase
|
||||
|
||||
T_Gen_DictKeys: type = type({}.keys())
|
||||
@@ -91,7 +91,7 @@ _T_DictValues = TypeVar(
|
||||
NoneType,
|
||||
)
|
||||
|
||||
T_T_FieldValue = type(T_FieldValue)
|
||||
T_T_FieldValue = type[T_FieldValue]
|
||||
T_T_DictValues = type[T_DictValues]
|
||||
|
||||
T_Dict = dict[T_DictKey, T_DictValues]
|
||||
|
||||
@@ -4,12 +4,12 @@ from os import chdir
|
||||
from pathlib import Path
|
||||
from typing import Optional, ClassVar
|
||||
from time import sleep
|
||||
import uvicorn
|
||||
import socket
|
||||
import requests
|
||||
from contextlib import closing
|
||||
from multiprocessing import Process
|
||||
|
||||
import requests
|
||||
from requests.adapters import HTTPAdapter
|
||||
import uvicorn
|
||||
|
||||
from src.pyrestresource import (
|
||||
RestField,
|
||||
|
||||
@@ -8,10 +8,6 @@ from uuid import UUID, uuid4
|
||||
from time import time
|
||||
import json
|
||||
|
||||
|
||||
print(__name__)
|
||||
print(__package__)
|
||||
|
||||
from src.pyrestresource import (
|
||||
RestField,
|
||||
RestResourceHandlerException_Forbiden,
|
||||
|
||||
193
test/test_rest_resource_plugins_dict.py
Normal file
193
test/test_rest_resource_plugins_dict.py
Normal file
@@ -0,0 +1,193 @@
|
||||
from __future__ import annotations
|
||||
import unittest
|
||||
from os import chdir
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from src.pyrestresource import (
|
||||
RestField,
|
||||
register_rest_rootpoint,
|
||||
RestResourceBase,
|
||||
rsrc_verb,
|
||||
RestRequestParams_GET,
|
||||
RestRequestParams_POST,
|
||||
RestRequestParams_Dict_GET,
|
||||
RestRequestParams_PUT,
|
||||
RestRequestParams_Dict_elem_GET,
|
||||
RestRequestParams_Dict_elem_PUT,
|
||||
RestRequestParams_Dict_DELETE,
|
||||
RestRequestParams_Dict_POST,
|
||||
T_SupportedRESTFields,
|
||||
ResourcePlugin_dict_default,
|
||||
)
|
||||
|
||||
testdir_path = Path(__file__).parent.resolve()
|
||||
chdir(testdir_path.parent.resolve())
|
||||
|
||||
|
||||
# to allow mock-ing, all the tested classes are in a function
|
||||
def init_classes():
|
||||
class Test_Record(RestResourceBase):
|
||||
test_str: str
|
||||
test_int: int
|
||||
|
||||
class ResourcePlugin_dict_Test_Record(ResourcePlugin_dict_default[str, Test_Record]):
|
||||
static_Test_Record_active: bool = True
|
||||
static_Test_Record = Test_Record(test_str="mytest", test_int=84)
|
||||
|
||||
def handle_dict_get_keys(
|
||||
self,
|
||||
resource_dict: dict[str, Test_Record],
|
||||
params: RestRequestParams_Dict_GET,
|
||||
) -> list[str]:
|
||||
result = super().handle_dict_get_keys(resource_dict, params)
|
||||
if self.static_Test_Record_active is True:
|
||||
result.append("static_elem")
|
||||
return result
|
||||
|
||||
def handle_dict_elem_get(
|
||||
self,
|
||||
resource_dict: dict[str, Test_Record],
|
||||
key: str,
|
||||
params: RestRequestParams_Dict_elem_GET,
|
||||
) -> Test_Record:
|
||||
if key == "static_elem":
|
||||
if self.static_Test_Record_active is True:
|
||||
return self.static_Test_Record
|
||||
else:
|
||||
raise RuntimeError("Key Not Found")
|
||||
return super().handle_dict_elem_get(resource_dict, key, params)
|
||||
|
||||
def handle_dict_delete(
|
||||
self,
|
||||
resource_dict: dict[str, Test_Record],
|
||||
key: str,
|
||||
params: RestRequestParams_Dict_DELETE[str],
|
||||
) -> None:
|
||||
if key == "static_elem":
|
||||
self.static_Test_Record_active = False
|
||||
else:
|
||||
super().handle_dict_delete(resource_dict, key, params)
|
||||
|
||||
def handle_dict_delete_all(
|
||||
self,
|
||||
resource_dict: dict[str, Test_Record],
|
||||
params: RestRequestParams_Dict_DELETE[str],
|
||||
) -> None:
|
||||
self.static_Test_Record_active = False
|
||||
super().handle_dict_delete_all(resource_dict, params)
|
||||
|
||||
def handle_dict_elem_put(
|
||||
self,
|
||||
resource_dict: dict[str, Test_Record],
|
||||
key: str,
|
||||
resource: Test_Record,
|
||||
params: RestRequestParams_Dict_elem_PUT,
|
||||
) -> None:
|
||||
if key == "static_elem":
|
||||
if self.static_Test_Record_active is True:
|
||||
self.static_Test_Record = resource
|
||||
else:
|
||||
raise RuntimeError("Key Not Found")
|
||||
else:
|
||||
super().handle_dict_elem_put(resource_dict, key, resource, params)
|
||||
|
||||
def handle_dict_post(
|
||||
self,
|
||||
resource_dict: dict[str, Test_Record],
|
||||
key: str,
|
||||
resource: Test_Record,
|
||||
params: RestRequestParams_Dict_POST[str],
|
||||
) -> Optional[str]:
|
||||
resource.test_int = resource.test_int + 1
|
||||
return super().handle_dict_post(resource_dict, key, resource, params)
|
||||
|
||||
@register_rest_rootpoint
|
||||
class RootApp(RestResourceBase):
|
||||
str_dict_Test_Record: dict[str, Test_Record] = RestField(
|
||||
default={"test": Test_Record(test_str="Hi", test_int=42)},
|
||||
plugin=ResourcePlugin_dict_Test_Record,
|
||||
)
|
||||
|
||||
# this add the classes to globals to allow using them later on
|
||||
# => this is only for uinit-testing purpose and is not needed in real use
|
||||
|
||||
globals()[Test_Record.__name__] = Test_Record
|
||||
globals()[RootApp.__name__] = RootApp
|
||||
|
||||
|
||||
class Test_RestAPI_Plugin_Dict(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
chdir(testdir_path.parent.resolve())
|
||||
init_classes()
|
||||
self.testapp = RootApp()
|
||||
|
||||
def test_get_root(self):
|
||||
result = self.testapp.process_request("/", rsrc_verb.GET)
|
||||
self.assertEqual(result.get_result(), "{}")
|
||||
# print(result.get_result())
|
||||
|
||||
def test_get_dict_keys(self):
|
||||
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
|
||||
self.assertEqual(result.get_result(), '["test", "static_elem"]')
|
||||
|
||||
def test_get_dict_elems(self):
|
||||
result = self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.GET)
|
||||
self.assertEqual(result.get_result(), '{"test_str": "Hi", "test_int": 42}')
|
||||
|
||||
result = self.testapp.process_request("/str_dict_Test_Record/static_elem", rsrc_verb.GET)
|
||||
self.assertEqual(result.get_result(), '{"test_str": "mytest", "test_int": 84}')
|
||||
|
||||
def test_delete_dict_elems(self):
|
||||
self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.DELETE)
|
||||
|
||||
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
|
||||
self.assertEqual(result.get_result(), '["static_elem"]')
|
||||
|
||||
self.testapp.process_request("/str_dict_Test_Record/static_elem", rsrc_verb.DELETE)
|
||||
|
||||
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
|
||||
self.assertEqual(result.get_result(), "[]")
|
||||
|
||||
def test_delete_all_dict_elems(self):
|
||||
self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.DELETE)
|
||||
|
||||
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
|
||||
self.assertEqual(result.get_result(), '["static_elem"]')
|
||||
|
||||
self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.DELETE)
|
||||
|
||||
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
|
||||
self.assertEqual(result.get_result(), "[]")
|
||||
|
||||
def test_delete_dict_elems_API_key(self):
|
||||
self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.DELETE)
|
||||
|
||||
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
|
||||
self.assertEqual(result.get_result(), '["static_elem"]')
|
||||
|
||||
self.testapp.process_request("/str_dict_Test_Record?API_key=static_elem", rsrc_verb.DELETE)
|
||||
|
||||
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
|
||||
self.assertEqual(result.get_result(), "[]")
|
||||
|
||||
def test_put_dict_elem(self):
|
||||
self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.PUT, '{"test_str": "Hi", "test_int": 43}')
|
||||
|
||||
result = self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.GET)
|
||||
self.assertEqual(result.get_result(), '{"test_str": "Hi", "test_int": 43}')
|
||||
|
||||
self.testapp.process_request("/str_dict_Test_Record/static_elem", rsrc_verb.PUT, '{"test_str": "Hi you", "test_int": 7}')
|
||||
|
||||
result = self.testapp.process_request("/str_dict_Test_Record/static_elem", rsrc_verb.GET)
|
||||
self.assertEqual(result.get_result(), '{"test_str": "Hi you", "test_int": 7}')
|
||||
|
||||
def test_post_dict_elem(self):
|
||||
result = self.testapp.process_request("/str_dict_Test_Record?API_key=newval", rsrc_verb.POST, '{"test_str": "Hi2", "test_int": 77}')
|
||||
self.assertEqual(result.get_result(), '"newval"')
|
||||
|
||||
result = self.testapp.process_request("/str_dict_Test_Record/newval", rsrc_verb.GET)
|
||||
self.assertEqual(result.get_result(), '{"test_str": "Hi2", "test_int": 78}')
|
||||
|
||||
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
|
||||
self.assertEqual(result.get_result(), '["test", "newval", "static_elem"]')
|
||||
@@ -1,16 +1,11 @@
|
||||
from __future__ import annotations
|
||||
import unittest
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from os import chdir
|
||||
from pathlib import Path
|
||||
from io import StringIO
|
||||
from contextlib import redirect_stdout
|
||||
|
||||
print(__name__)
|
||||
print(__package__)
|
||||
|
||||
from src.pyrestresource import (
|
||||
RestField,
|
||||
RestResourceBase,
|
||||
|
||||
@@ -1,15 +1,9 @@
|
||||
from __future__ import annotations
|
||||
import unittest
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from os import chdir
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
print(__name__)
|
||||
print(__package__)
|
||||
|
||||
from src.pyrestresource import (
|
||||
RestField,
|
||||
RestResourceBase,
|
||||
|
||||
@@ -1,22 +1,16 @@
|
||||
from __future__ import annotations
|
||||
import unittest
|
||||
from unittest.mock import patch
|
||||
from os import chdir
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from uuid import UUID, uuid4
|
||||
from time import time, sleep
|
||||
import json
|
||||
import uvicorn
|
||||
import socket
|
||||
import requests
|
||||
from contextlib import closing
|
||||
from multiprocessing import Process
|
||||
from requests.adapters import HTTPAdapter
|
||||
import coverage
|
||||
|
||||
print(__name__)
|
||||
print(__package__)
|
||||
import requests
|
||||
from requests.adapters import HTTPAdapter
|
||||
import uvicorn
|
||||
|
||||
from src.pyrestresource import (
|
||||
RestField,
|
||||
@@ -28,7 +22,6 @@ from src.pyrestresource import (
|
||||
RestRequestParams_Dict_GET,
|
||||
T_SupportedRESTFields,
|
||||
)
|
||||
from pprint import pprint
|
||||
|
||||
from test import ThreadedUvicorn
|
||||
|
||||
|
||||
Reference in New Issue
Block a user