17 Commits

Author SHA1 Message Date
7625461929 Update Jenkinsfile 2024-10-12 16:43:41 +02:00
cclecle
8ba0ab5f5e chore: wip 2024-04-12 22:56:27 +01:00
cclecle
c05b541148 improve code quality 2023-11-11 17:17:35 +00:00
cclecle
afaebec633 complete base dict plugin + unittest 2023-11-11 15:04:47 +00:00
cclecle
f677f7bf28 fix dict delete + plugin 2023-11-11 12:09:03 +00:00
cclecle
6cc6056220 start unittesting dict & add some fixes
allocate plugin at app creation, not at call to gain time and keep
context.
2023-11-11 11:49:51 +00:00
cclecle
de71a19956 fix import order in unittest 2023-11-11 10:03:01 +00:00
cclecle
3b358ab49c remove unused resource_handler_walker
uniform all if TYPE_CHECKING
ignore TYPE_CHECKING in coverage
2023-11-11 09:01:38 +00:00
cclecle
6311d90a2d update jenkins & toml from project template 2023-11-06 14:56:46 +00:00
cclecle
7e13d49feb remove unused data dir from toml 2023-11-06 13:57:00 +00:00
cclecle
9b3e847908 use threaded uvicorn during test 2023-11-06 13:50:34 +00:00
cclecle
3afebdba33 temporary disable PERF tests 2023-11-06 11:25:49 +00:00
cclecle
0ec875e497 add parallel coverage option 2023-11-06 10:35:05 +00:00
cclecle
d0e146ac76 try to fix toml / coverage 2023-11-06 09:33:55 +00:00
cclecle
4af812cf80 enable multiprocessing on coverage 2023-11-06 09:27:53 +00:00
cclecle
d58173f07b fix typing issues 2023-11-06 01:11:43 +00:00
cclecle
04ef407a6f typo fix 2023-11-05 22:21:07 +00:00
33 changed files with 1011 additions and 584 deletions

View File

@@ -3,5 +3,4 @@ encoding//src/pyrestresource/__init__.py=utf-8
encoding//src/pyrestresource/__metadata__.py=utf-8
encoding//src/pyrestresource/rest_login.py=utf-8
encoding//src/pyrestresource/rest_resource.py=utf-8
encoding//src/pyrestresource/rest_resource_handler_walker.py=utf-8
encoding/<project>=UTF-8

18
Jenkinsfile vendored
View File

@@ -184,7 +184,7 @@ pipeline {
sh("virtualenv --pip=embed --setuptools=embed --wheel=embed --no-periodic-update --activators bash,python TOOLS_ENV")
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade setuptools build pip")
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade 'copier==8.*' jinja2-slug toml")
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade 'copier==9.*' jinja2-slug toml")
sh(". ~/TEST_ENV/bin/activate && pip install --upgrade pip")
@@ -426,9 +426,17 @@ pipeline {
}
post {
always {
dir("gitrepo") {
publishCoverage adapters: [cobertura(mergeToOneReport: true, path: "helpers-results/cl_types_check/cobertura.xml")]
junit 'helpers-results/cl_types_check/junit.xml'
dir("gitrepo") {
//publish coverage
recordCoverage( sourceDirectories: [[path: 'src']],
tools: [[parser: 'COBERTURA', pattern: 'helpers-results/cl_types_check/cobertura.xml']],
id: 'COBERTURA', name: 'COBERTURA Coverage',
sourceCodeRetention: 'EVERY_BUILD',)
//add type check to junit result set
junit 'helpers-results/cl_types_check/junit.xml'
//publish html reports files
publishHTML([
reportDir: "helpers-results/cl_quality_check",
reportFiles: "report.html",
@@ -538,7 +546,7 @@ pipeline {
dir("gitrepo") {
junit 'helpers-results/cl_unit_test/*.xml'
// using cobertura format (= coverage xml format)
publishCoverage adapters: [cobertura(mergeToOneReport: true, path: "helpers-results/cl_unit_test_coverage/test_coverage.xml")]
recordCoverage(tools: [[parser: 'COBERTURA', pattern: 'helpers-results/cl_unit_test_coverage/test_coverage.xml']])
publishHTML([
reportDir: "helpers-results/cl_unit_test_coverage",
reportFiles: "index.html",

View File

@@ -12,21 +12,23 @@
A RESTful API library built on top of pydantic & uvicorn to make service API from a data model.
/!\ early in-progress project for internal use ATM.
/!\\ early in-progress project for internal use ATM.
Feel free to contribute.
Features:
- use annotation
Features (available):
- type annotation used
- support containers (dict)
- support plugins (for hook and biding)
- user authentification (WIP)
- ACL (WIP)
- python internal model instance (with possible serialization/auto-save on-disk)
- user auth
- ACL
- daemon mode
Features(planned):
- group support
- python internal model instance (with possible serialization/auto-save on-disk)
Limitations:
- no nested reads / writes
- weak unitest (atm)
Checkout [Latest Documentation](https://chacha.ddns.net/mkdocs-web/chacha/pyrestresource/master/latest/).

View File

@@ -10,7 +10,7 @@
<stringAttribute key="org.eclipse.debug.ui.ATTR_CONSOLE_ENCODING" value="UTF-8"/>
<stringAttribute key="org.eclipse.ui.externaltools.ATTR_LOCATION" value="${workspace_loc:pyrestresource/helpers_proxy}"/>
<stringAttribute key="org.eclipse.ui.externaltools.ATTR_OTHER_WORKING_DIRECTORY" value=""/>
<stringAttribute key="org.eclipse.ui.externaltools.ATTR_TOOL_ARGUMENTS" value="--typecheck --qualitycheck"/>
<stringAttribute key="org.eclipse.ui.externaltools.ATTR_TOOL_ARGUMENTS" value="--qualitycheck"/>
<stringAttribute key="org.python.pydev.debug.ATTR_INTERPRETER" value="__default"/>
<stringAttribute key="org.python.pydev.debug.ATTR_PROJECT" value="pyrestresource"/>
<stringAttribute key="process_factory_id" value="org.python.pydev.debug.processfactory.PyProcessFactory"/>

17
RUN_types.launch Normal file
View File

@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<launchConfiguration type="org.python.pydev.debug.regularLaunchConfigurationType">
<booleanAttribute key="org.eclipse.debug.core.ATTR_FORCE_SYSTEM_CONSOLE_ENCODING" value="false"/>
<listAttribute key="org.eclipse.debug.core.MAPPED_RESOURCE_PATHS">
<listEntry value="/pyrestresource/helpers_proxy"/>
</listAttribute>
<listAttribute key="org.eclipse.debug.core.MAPPED_RESOURCE_TYPES">
<listEntry value="2"/>
</listAttribute>
<stringAttribute key="org.eclipse.debug.ui.ATTR_CONSOLE_ENCODING" value="UTF-8"/>
<stringAttribute key="org.eclipse.ui.externaltools.ATTR_LOCATION" value="${workspace_loc:pyrestresource/helpers_proxy}"/>
<stringAttribute key="org.eclipse.ui.externaltools.ATTR_OTHER_WORKING_DIRECTORY" value=""/>
<stringAttribute key="org.eclipse.ui.externaltools.ATTR_TOOL_ARGUMENTS" value="--typecheck"/>
<stringAttribute key="org.python.pydev.debug.ATTR_INTERPRETER" value="__default"/>
<stringAttribute key="org.python.pydev.debug.ATTR_PROJECT" value="pyrestresource"/>
<stringAttribute key="process_factory_id" value="org.python.pydev.debug.processfactory.PyProcessFactory"/>
</launchConfiguration>

View File

@@ -34,7 +34,7 @@ classifiers = [
]
dependencies = [
'packaging',
'typegard',
'typeguard',
'pydantic>=2.4,<3',
'uvicorn>=0.23'
]
@@ -48,21 +48,42 @@ include-package-data = true
where = ["src"]
[tool.setuptools.package-data]
"pyrestresource.data" = ["*.*"]
"pyrestresource" = ["py.typed"]
[tool.pylint.main]
disable = ["missing-class-docstring","missing-function-docstring","missing-module-docstring"]
# [[tool.mypy.overrides]]
# module = ""
# ignore_missing_imports = true
[tool.coverage.run]
cover_pylib = false
branch = true
data_file="helpers-results/cl_unit_test_raw_coverage/.coverage"
# debug = ["config","multiproc","process"]
parallel = true
concurrency = [
'thread'
]
[tool.coverage.report]
exclude_also = [
"if TYPE_CHECKING:",
]
[project.urls]
Homepage = "https://chacha.ddns.net/gitea/chacha/pyrestresource"
Documentation = "https://chacha.ddns.net/mkdocs-web/chacha/pyrestresource/master/latest/"
Tracker = "https://chacha.ddns.net/gitea/chacha/pyrestresource/issues"
[project.optional-dependencies]
test = ["chacha_cicd_helper@git+https://chacha.ddns.net/gitea/chacha/chacha_cicd_helper.git@master"]
coverage-check = ["chacha_cicd_helper@git+https://chacha.ddns.net/gitea/chacha/chacha_cicd_helper.git@master"]
complexity-check = ["chacha_cicd_helper@git+https://chacha.ddns.net/gitea/chacha/chacha_cicd_helper.git@master"]
quality-check = ["chacha_cicd_helper@git+https://chacha.ddns.net/gitea/chacha/chacha_cicd_helper.git@master"]
type-check = ["chacha_cicd_helper@git+https://chacha.ddns.net/gitea/chacha/chacha_cicd_helper.git@master"]
doc-gen = ["chacha_cicd_helper@git+https://chacha.ddns.net/gitea/chacha/chacha_cicd_helper.git@master"]
test = ["chacha_cicd_helper"]
coverage-check = ["chacha_cicd_helper"]
complexity-check = ["chacha_cicd_helper"]
quality-check = ["chacha_cicd_helper"]
type-check = ["chacha_cicd_helper"]
doc-gen = ["chacha_cicd_helper"]
# [project.scripts]
# my-script = "my_package.module:function"

View File

@@ -18,22 +18,9 @@ from typing import TYPE_CHECKING
from .__metadata__ import __version__, __Summuary__, __Name__
from .rest_resource import RestResourceBase
from .rest_model import RestField
from .rest_resource_rootpoint import register_rest_rootpoint
from .rest_types import rsrc_verb, T_SupportedRESTFields
if TYPE_CHECKING:
from .rest_types import (
T_ListIndex,
T_ListSize,
T_DictKey,
T_T_DictKey,
T_DictValues,
T_T_DictValues,
)
from .rest_request_opt import (
RestRequestParams_POST,
RestRequestParams_DELETE,
@@ -44,19 +31,19 @@ from .rest_request_opt import (
RestRequestParams_Dict_POST,
RestRequestParams_Dict_DELETE,
RestRequestParams_Dict_GET,
RestRequestParams_Dict_elem_GET,
RestRequestParams_Dict_elem_PUT,
)
from .rest_resource_plugin import (
ResourcePlugin_field_default,
ResourcePlugin_RestResourceBase_default,
ResourcePlugin_dict_default,
)
from .rest_ACL import ACL_target_user, ACL_target_group, ACL_target_group_Any, ACL_record, ACL_rule
from .rest_login import (
RestResourceBaseLogin,
UserLogin,
)
from .rest_resource import RestResourceBase
from .rest_login import UserLogin
from .rest_resource_login import RestResourceBaseLogin
from .rest_exceptions import (
RestResourceException,
RestResourceLoginException,
@@ -67,3 +54,13 @@ from .rest_exceptions import (
RestResourcePluginException_InvalidPluginSignature,
RestResourceHandlerException_Forbiden,
)
if TYPE_CHECKING:
from .rest_types import (
T_ListIndex,
T_ListSize,
T_DictKey,
T_T_DictKey,
T_DictValues,
T_T_DictValues,
)

View File

@@ -19,8 +19,8 @@ class _JSONEncoder(json.JSONEncoder):
return json.JSONEncoder.default(self, o)
def parse_dict_cookies(cookies: str) -> dict[str, str]:
result = {}
def parse_dict_cookies(cookies: str) -> dict[str, str | None]:
result: dict[str, str | None] = {}
for item in cookies.split(";"):
item = item.strip()
if not item:
@@ -36,5 +36,4 @@ def parse_dict_cookies(cookies: str) -> dict[str, str]:
def forward_exception(e: Exception, forward: bool) -> None:
if forward:
raise e from None
else:
traceback.print_exc()
traceback.print_exc()

View File

@@ -1,12 +1,13 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from pydantic import BaseModel
from enum import Enum, auto
from pydantic import BaseModel
from .rest_types import rsrc_verb
if TYPE_CHECKING is True:
if TYPE_CHECKING:
from .rest_login import UserLogin
@@ -22,7 +23,7 @@ class ACL_target_user(ACL_target):
return cls(name=user_login.username)
class ACL_target_user_Annonymous(ACL_target):
class ACL_target_user_Annonymous(ACL_target_user):
name: str = "__ANNONYMOUS__"

View File

@@ -2,6 +2,10 @@ class RestResourceException(Exception):
pass
class RestResourceConfigException(RestResourceException):
pass
class RestResourceModelException(RestResourceException):
pass

View File

@@ -12,25 +12,20 @@
"""CLI interface module"""
from __future__ import annotations
from typing import Optional, ClassVar, TYPE_CHECKING
from typing import Optional, TYPE_CHECKING
from secrets import token_hex, compare_digest
from datetime import datetime, timedelta
from pydantic import BaseModel, Field
from datetime import datetime
from pydantic import BaseModel
from .rest_types import rsrc_verb
from .rest_resource import RestResourceBase
from .rest_ACL import ACL_record, ACL_target_group_Any, ACL_rule, ACL_target_user
from .rest_resource_plugin import ResourcePlugin_RestResourceBase_default
from .rest_exceptions import (
RestResourceLoginException_InvalidCredentials,
RestResourceLoginException_ClientChange,
RestResourceLoginException_SessionTimeout,
RestResourceLoginException_InvalidSession,
)
if TYPE_CHECKING is True:
from .rest_request import RestRequest, RestRequestParams_GET
# from .rest_resource import RestResourceBase
from . import rest_resource
from .rest_model import RestField
from .rest_ACL import ACL_record, ACL_target_group_Any, ACL_rule
if TYPE_CHECKING:
pass
class UserLogin(BaseModel):
@@ -41,24 +36,12 @@ class UserLogin(BaseModel):
class UserSession(BaseModel):
last_update: datetime
user_login: UserLogin
client: Optional[tuple[str, int]]
client: tuple[str, int] | tuple[()] | None
class ResourcePlugin_Login(ResourcePlugin_RestResourceBase_default):
ar_UserLogin: list[UserLogin] = []
def handle_resource_get(self, resource: Login, params: RestRequestParams_GET) -> Login:
return Login(username=self.get_user_login())
def handle_resource_put(self, resource: Login, params: RestRequestParams_GET) -> Login:
token = self.user_login(resource.username, resource.secret)
self.set_resp_cookie_value("Authorization", f"Bearer {token}")
return resource
class Login(RestResourceBase):
username: Optional[str] = Field(None)
secret: Optional[str] = Field(
class Login(rest_resource.RestResourceBase):
username: Optional[str] = RestField(None)
secret: Optional[str] = RestField(
None,
exclude=True,
ACL=[
@@ -66,65 +49,3 @@ class Login(RestResourceBase):
ACL_record(verbs=[rsrc_verb.GET], target=ACL_target_group_Any(), rule=ACL_rule.DENY),
],
)
class RestResourceBaseLogin(RestResourceBase):
_ar_user_login: ClassVar[list[UserLogin]] = []
_ar_user_session: dict[str, UserSession] = {}
_max_session_inactive: ClassVar[timedelta] = timedelta(minutes=20)
_max_session_time: ClassVar[timedelta] = timedelta(hours=12)
login: Login = Field(default=Login(), plugin=ResourcePlugin_Login)
def get_new_cookie_expiration_date(self) -> datetime:
return datetime.now() + self._max_session_time
def _process_request_session(self, request: RestRequest) -> None:
# print(f"[TRACE] {type(self).__name__}->_process_request_session()")
# print(f"[TRACE] request: {id(request)}")
auth_cookie = request.get_cookie("Authorization")
if auth_cookie != None:
if auth_cookie in self._ar_user_session:
# print(f"SESSION FOUND for {request.get_client()}")
if self._ar_user_session[auth_cookie].client != request.get_client():
del self._ar_user_session[auth_cookie]
raise RestResourceLoginException_ClientChange()
time_diff = datetime.now() - self._ar_user_session[auth_cookie].last_update
if time_diff > self._max_session_inactive:
del self._ar_user_session[auth_cookie]
raise RestResourceLoginException_SessionTimeout()
request.set_user(ACL_target_user(name=self._ar_user_session[auth_cookie].user_login.username))
# print("SESSION RECOVERED")
return
raise RestResourceLoginException_InvalidSession()
return
# print(f"non-connected user {request.get_client()}")
def user_login(self, user_name: str, user_secret: str, request: RestRequest) -> str:
already_failed: bool = False
for iter_user_login in self._ar_user_login:
username_ok: bool = compare_digest(user_name, iter_user_login.username)
secret_ok: bool = compare_digest(user_secret, iter_user_login.secret)
if username_ok is True:
if secret_ok is True and not already_failed:
return self._register_user_session(iter_user_login, request)
else:
already_failed = True
else:
pass
pass
if already_failed:
raise RestResourceLoginException_InvalidCredentials()
def _register_user_session(self, user_login: UserLogin, request: RestRequest) -> str:
token = token_hex(16)
new_user_session = UserSession(last_update=datetime.now(), user_login=user_login, client=request.get_client())
self._ar_user_session[f"Bearer {token}"] = new_user_session
return token

View File

@@ -0,0 +1,97 @@
from __future__ import annotations
from typing import Literal, Any, Callable, TYPE_CHECKING
from pydantic.fields import Field, _Unset, PydanticUndefined
from .rest_exceptions import RestResourceModelException
if TYPE_CHECKING:
from .rest_ACL import ACL_record
from .rest_resource_plugin import ResourcePlugin
from typing import Unpack
from pydantic.fields import _EmptyKwargs, AliasPath, AliasChoices
def RestField( # pylint: disable=too-many-locals
default: Any = PydanticUndefined,
*,
default_factory: Callable[[], Any] | None = _Unset,
alias: str | None = _Unset,
alias_priority: int | None = _Unset,
validation_alias: str | "AliasPath" | "AliasChoices" | None = _Unset,
serialization_alias: str | None = _Unset,
title: str | None = _Unset,
description: str | None = _Unset,
examples: "list[Any] | None" = _Unset,
exclude: bool | None = _Unset,
discriminator: str | None = _Unset,
json_schema_extra: dict[str, Any] | Callable[[dict[str, Any]], None] | None = _Unset,
frozen: bool | None = _Unset,
validate_default: bool | None = _Unset,
repr: bool = _Unset, # pylint: disable=redefined-builtin
init_var: bool | None = _Unset,
kw_only: bool | None = _Unset,
pattern: str | None = _Unset,
strict: bool | None = _Unset,
gt: float | None = _Unset,
ge: float | None = _Unset,
lt: float | None = _Unset,
le: float | None = _Unset,
multiple_of: float | None = _Unset,
allow_inf_nan: bool | None = _Unset,
max_digits: int | None = _Unset,
decimal_places: int | None = _Unset,
min_length: int | None = _Unset,
max_length: int | None = _Unset,
union_mode: Literal["smart", "left_to_right"] = _Unset,
ACL: list["ACL_record"] | None = _Unset,
plugin: type["ResourcePlugin"] | None = _Unset,
**extra: Unpack[_EmptyKwargs],
) -> Any:
if not json_schema_extra or json_schema_extra is _Unset:
if extra:
json_schema_extra = extra # type: ignore
else:
json_schema_extra = {}
if ACL is not _Unset:
json_schema_extra["ACL"] = ACL
if plugin is not _Unset:
json_schema_extra["plugin"] = plugin
else:
raise RestResourceModelException("json_schema_extra must not be set")
return Field(
default,
default_factory=default_factory,
alias=alias,
alias_priority=alias_priority,
validation_alias=validation_alias,
serialization_alias=serialization_alias,
title=title,
description=description,
examples=examples,
exclude=exclude,
discriminator=discriminator,
json_schema_extra=json_schema_extra,
frozen=frozen,
validate_default=validate_default,
repr=repr,
init_var=init_var,
kw_only=kw_only,
pattern=pattern,
strict=strict,
gt=gt,
ge=ge,
lt=lt,
le=le,
multiple_of=multiple_of,
allow_inf_nan=allow_inf_nan,
max_digits=max_digits,
decimal_places=decimal_places,
min_length=min_length,
max_length=max_length,
union_mode=union_mode,
**extra,
)

View File

@@ -13,6 +13,7 @@ from urllib.parse import urlparse, parse_qs
from pydantic import BaseModel, Field
from typeguard import check_type
from .rest_types import rsrc_verb, T_AllSupportedFields
from .rest_request_opt import (
RestRequestParams_POST,
@@ -27,8 +28,14 @@ from .rest_request_opt import (
)
from .rest_ACL import ACL_target_user, ACL_target_user_Annonymous, ACL_target_group
from .helpers import parse_dict_cookies
from .rest_exceptions import (
RestResourceHandlerException_MethodNotAllowed,
RestResourceHandlerException_BadRequest,
RestResourceException,
RestResourceConfigException,
)
if TYPE_CHECKING is True:
if TYPE_CHECKING:
from typing import Optional
from .rest_types import T_SupportedRESTFields
from .rest_resource import RestResourceBase
@@ -89,11 +96,11 @@ class RequestFactory(
request.update_ReqParams(self.cls_RestRequestParams_DELETE)
else:
raise RestResourceHandlerException_MethodNotAllowed("Invalid Verb")
return
class RestRequest(Generic[_T_RestRequestParams]):
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-public-methods
"""Main RestRequets class"""
def __init__(
@@ -121,7 +128,7 @@ class RestRequest(Generic[_T_RestRequestParams]):
self.verb: rsrc_verb
self.data: dict
self._raw_headers: list[Any] = []
self._client: tuple[str, int] = ()
self._client: tuple[str, int] | tuple[()] = ()
self.headers: dict[str, None | str | dict[str, None | str]] = {"host": None, "cookie": {}}
self._saved_url_params: dict
self.ReqParams: _T_RestRequestParams = type_request_params()
@@ -179,7 +186,7 @@ class RestRequest(Generic[_T_RestRequestParams]):
def set_client(self, client: tuple[str, int]) -> None:
self._client = client
def get_client(self) -> tuple[str, int]:
def get_client(self) -> tuple[str, int] | tuple[()]:
return self._client
def set_headers(self, headers: list[Any]) -> None:
@@ -193,19 +200,26 @@ class RestRequest(Generic[_T_RestRequestParams]):
self.headers["cookie"] = parse_dict_cookies(elem[1].decode("utf-8"))
def get_cookie(self, key: str) -> str | None:
if self.headers["cookie"] is None:
return None
if key not in self.headers["cookie"]:
return None
return self.headers["cookie"][key]
if isinstance(self.headers["cookie"], dict):
return self.headers["cookie"][key]
return None
def set_resp_cookie_value(self, key: str, value: str) -> None:
self.outgoing_cookie[
key
] = f"{value}; expires={self.root_resource.get_new_cookie_expiration_date().strftime('%a, %d %b %Y %H:%M:%S GMT')}; path=/; HttpOnly"
from .rest_resource_login import RestResourceBaseLogin
if not isinstance(self.root_resource, RestResourceBaseLogin):
raise RestResourceConfigException("root_resource must be RestResourceBaseLogin to use user_login")
expire_date = self.root_resource.get_new_cookie_expiration_date().strftime("%a, %d %b %Y %H:%M:%S GMT")
self.outgoing_cookie[key] = f"{value}; expires={expire_date}; path=/; HttpOnly"
def reset_resp_cookie(self, key: str) -> None:
self.outgoing_cookie[key] = "null; path=/; expires=Thu, 01 Jan 1970 00:00:00 GMT"
def get_host(self) -> str:
def get_host(self) -> str | dict[str, str | None] | None:
return self.headers["host"]
def set_result(self, result: str):

View File

@@ -9,7 +9,7 @@ from .rest_types import (
_T_DictKey,
)
if TYPE_CHECKING is True:
if TYPE_CHECKING:
pass

View File

@@ -8,12 +8,13 @@ from typing import (
from abc import ABC
import json
import pprint
from pydantic import BaseModel
from .rest_types import rsrc_verb
from .helpers import _JSONEncoder, forward_exception
from .rest_types import rsrc_verb, _T_SupportedRESTFields
from .rest_ACL import (
ACL_record,
@@ -23,7 +24,6 @@ from .rest_ACL import (
ACL_rule,
)
from .rest_request import RestRequest
from .rest_exceptions import (
RestResourceLoginException_InvalidSession,
RestResourceLoginException_SessionTimeout,
@@ -36,15 +36,21 @@ from .rest_exceptions import (
RestResourceException,
)
if TYPE_CHECKING is True:
if TYPE_CHECKING:
from .rest_request import RestRequest
from .rest_types import T_SupportedRESTFields
from .rest_resource_plugin import ResourcePlugin
from .rest_types import (
T_T_DictKey,
T_T_DictValues,
)
from .rest_resource_handler import (
ResourceHandler,
)
class RestResourceBase(ABC, BaseModel, validate_assignment=True):
# _resp_cookies: ClassVar[dict[str, str]] = {}
_dict_key_type_: ClassVar[dict[str, T_T_DictKey]] = {}
_dict_value_type_: ClassVar[dict[str, T_T_DictValues]] = {}
_model_dump_excluded_: ClassVar[dict[str, bool]] = {}
@@ -52,13 +58,13 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
_plugins_: ClassVar[
dict[
str,
list[ACL_record],
ResourcePlugin,
]
] = {}
_ACL_record_: ClassVar[
dict[
str,
ACL_record,
list[ACL_record],
]
] = {}
@@ -92,15 +98,16 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
"""Check ACL on requested field access"""
self._check_acl(request.user, request.groups, request.get_verb(), request.get_resource_origin(req_index), False)
def check_acl_self(self, request: RestRequest, new_data: Optional[dict[str, _T_SupportedRESTFields]]) -> None:
def check_acl_self(self, request: RestRequest, new_data: Optional[dict[str, T_SupportedRESTFields]]) -> None:
"""Check ACL on requested field operation (involving checking sub-fields)"""
if request.get_verb() is rsrc_verb.GET:
for key in self.model_fields.keys():
for key in self.model_fields:
self._check_acl(request.user, request.groups, rsrc_verb.GET, key)
elif request.get_verb() is rsrc_verb.PUT:
for key in new_data.keys():
if key in self.model_fields:
self._check_acl(request.user, request.groups, rsrc_verb.PUT, key)
if new_data is not None:
for key in new_data.keys():
if key in self.model_fields:
self._check_acl(request.user, request.groups, rsrc_verb.PUT, key)
else:
raise RestResourceException("Incompatible verb")
@@ -122,7 +129,7 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
return body
async def __call__(self, scope, receive, send):
async def __call__(self, scope, receive, send) -> None:
assert scope["type"] == "http"
method = scope["method"]
@@ -148,7 +155,7 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
assert request is not None
header_resp = {
header_resp: dict[str, Any] = {
"type": "http.response.start",
"status": request.get_status(),
"headers": [
@@ -162,8 +169,9 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
await send(header_resp)
body = None
if request.get_result():
body = request.get_result().encode("utf-8")
result = request.get_result()
if result:
body = result.encode("utf-8")
await send(
{
@@ -175,7 +183,7 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
def _process_request_session(self, request: RestRequest) -> None:
pass
def process_request(
def process_request( # pylint: disable=too-complex
self,
url: str,
verb: rsrc_verb = rsrc_verb.GET,
@@ -185,10 +193,7 @@ class RestResourceBase(ABC, BaseModel, validate_assignment=True):
headers: Optional[list[Any]] = None,
http_mode: bool = False,
) -> RestRequest:
from .rest_resource_handler import (
ResourceHandler,
ResourceHandler_RestResourceBase,
)
from .rest_resource_handler import ResourceHandler_RestResourceBase
data: dict = {}
if data_json:

View File

@@ -1,9 +1,12 @@
# pylint: disable=protected-access
from __future__ import annotations
from typing import Optional, cast, TypeVar, Generic, Self, TYPE_CHECKING
import abc
from .rest_types import (
NoneType,
rsrc_verb,
T_SupportedRESTFields,
T_DictKey,
@@ -11,10 +14,14 @@ from .rest_types import (
T_Dict,
T_DictValues,
)
from .rest_resource import RestResourceBase
# from .rest_resource import RestResourceBase
from . import rest_resource
from .rest_request import RequestFactory
from .rest_resource_plugin import (
ResourcePlugin_field,
ResourcePlugin_dict,
ResourcePlugin_RestResourceBase,
)
from .rest_request_opt import (
@@ -38,14 +45,14 @@ from .rest_exceptions import (
RestResourceHandlerException_ResourceNotFound,
RestResourceHandlerException_MethodNotAllowed,
RestResourceHandlerException_BadRequest,
RestResourceHandlerException_Forbiden,
)
if TYPE_CHECKING is True:
if TYPE_CHECKING:
from .rest_types import T_T_DictKey, T_T_DictValues
from .rest_request import RestRequest
_T_Resource = TypeVar("_T_Resource", T_DictValues, T_Dict, T_SupportedRESTFields, RestResourceBase)
_T_Resource = TypeVar("_T_Resource", T_DictValues, T_Dict, T_SupportedRESTFields, rest_resource.RestResourceBase)
class ResourceHandler(
@@ -100,8 +107,8 @@ class ResourceHandler(
elif None in [url, verb]:
raise RestResourceHandlerException("if req not set, url,verb must be setted")
else:
if url is None or verb is None:
raise RestResourceHandlerException("url and verb must be set")
assert url is not None and verb is not None
assert isinstance(resource, rest_resource.RestResourceBase)
if data is None:
data = {}
self.req = self._request_factory.get_RestRequest(resource, url, verb, data, query_string)
@@ -141,14 +148,6 @@ class ResourceHandler(
resource_handler = self._find_resource()
return resource_handler._process_verb()
def access_resource(
self,
) -> _T_Resource:
# print(f"[TRACE] {type(self).__name__}->access_resource()")
self._reset_context()
resource_handler = self._find_resource()
return resource_handler.resource
def _reset_context(self) -> None:
self.req.reset_url_stack()
@@ -175,11 +174,7 @@ class ResourceHandler(
# reveal_type(_next_resource)
# print(f"[DEBUG] next_resource = {type(next_resource).__name__}")
if (
isinstance(_next_resource, RestResourceBase)
or isinstance(_next_resource, dict)
or type(_next_resource) in _T_SupportedRESTFields
):
if isinstance(_next_resource, (rest_resource.RestResourceBase, dict)) or type(_next_resource) in _T_SupportedRESTFields:
next_resource_handler_cls: type[ResourceHandler] = self._get_resource_handler(_next_resource, self.req)
self.saved_url = self.req.consume_url_stack(self._nb_url_element_to_consume_)
@@ -290,6 +285,14 @@ class ResourceHandler_dict(
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
if self.prev_handler is not None and self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
plugin_dict: ResourcePlugin_dict = cast(
ResourcePlugin_dict,
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)],
)
plugin_dict.set_context(self.req, self.req.get_root_resource())
return plugin_dict.handle_dict_get_keys(self.resource, params)
return list(_dict.keys())
def _handle_process_delete(self, params) -> None:
@@ -298,58 +301,113 @@ class ResourceHandler_dict(
assert self.prev_handler is not None
dict_key_type: T_T_DictKey = cast(RestResourceBase, self.prev_handler.resource)._dict_key_type_[self.req.get_resource_origin(1)]
dict_key_type: T_T_DictKey = cast(rest_resource.RestResourceBase, self.prev_handler.resource)._dict_key_type_[
self.req.get_resource_origin(1)
]
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
_dict_key: T_DictKey
plugin_dict: ResourcePlugin_dict | None = None
if self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
plugin_dict = cast(
ResourcePlugin_dict,
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)],
)
plugin_dict.set_context(self.req, self.req.get_root_resource())
if params.API_key is not None:
del _dict[dict_key_type(params.API_key)]
if issubclass(dict_key_type, bytes):
key_byte = dict_key_type(params.API_key, "utf-8")
_dict_key = key_byte
else:
key_std = dict_key_type(params.API_key)
_dict_key = key_std
if plugin_dict:
plugin_dict.handle_dict_delete(_dict, _dict_key, params)
return
del _dict[_dict_key]
else:
if plugin_dict:
plugin_dict.handle_dict_delete_all(_dict, params)
return
_dict.clear()
return
def _handle_process_post(self, params) -> Optional[T_DictKey]:
# pylint: disable=protected-access
def _handle_process_post(self, params) -> Optional[T_DictKey]: # pylint: disable=too-complex,too-many-branches
# print(f"{type(self).__name__}->_handle_process_post()")
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
dict_key_type: T_T_DictKey = cast(RestResourceBase, self.prev_handler.resource)._dict_key_type_[self.req.get_resource_origin(1)]
dict_value_type: T_T_DictValues = cast(RestResourceBase, self.prev_handler.resource)._dict_value_type_[
dict_key_type: T_T_DictKey = cast(rest_resource.RestResourceBase, self.prev_handler.resource)._dict_key_type_[
self.req.get_resource_origin(1)
]
_obj = dict_value_type(**self.req.get_data())
dict_value_type: T_T_DictValues = cast(rest_resource.RestResourceBase, self.prev_handler.resource)._dict_value_type_[
self.req.get_resource_origin(1)
]
_obj: T_DictValues
if issubclass(dict_value_type, rest_resource.RestResourceBase):
_obj = dict_value_type(**self.req.get_data())
_obj_restrsrc = cast(rest_resource.RestResourceBase, _obj)
for key, _ in _obj_restrsrc.model_fields.items():
if key in _obj_restrsrc._plugins_:
if isinstance(_obj_restrsrc._plugins_[key], ResourcePlugin_field):
plugin_field: ResourcePlugin_field = cast(ResourcePlugin_field, _obj_restrsrc._plugins_[key])
plugin_field.set_context(self.req, self.req.get_root_resource())
value = getattr(_obj_restrsrc, key)
setattr(_obj_restrsrc, key, plugin_field.handle_field_put(value, params))
elif not issubclass(dict_value_type, NoneType): # type: ignore # => mypy bug with Type[None]
_obj = dict_value_type(**self.req.get_data()) # type: ignore # => mypy bug with Type[None]
else:
_obj = None
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
_dict_key: T_DictKey | None = None
# 1st try/ using request param provided dict API_key
if params.API_key is not None:
if issubclass(dict_key_type, bytes):
key_byte: bytes = dict_key_type(params.API_key, "utf-8")
_dict_key = key_byte
else:
key_std = dict_key_type(params.API_key)
_dict_key = key_std
# if a primary key is set for the resource, updating it
if isinstance(_obj, RestResourceBase):
if isinstance(_obj, rest_resource.RestResourceBase):
if _obj._primary_key_ is not None:
_pri: T_DictKey = dict_key_type(params.API_key)
setattr(_obj, _obj._primary_key_, _pri)
# storing resource
_dict[dict_key_type(params.API_key)] = _obj
return dict_key_type(params.API_key)
setattr(_obj, _obj._primary_key_, _dict_key)
# 2nd try/ using provided resource internal primary key
# & 3rd try/ using resource internal auto-generated primary key
# => this case is automatic because if self.req.get_data() doesn't contain the key, it should be automatically created
if isinstance(_obj, RestResourceBase):
elif isinstance(_obj, rest_resource.RestResourceBase):
if _obj._primary_key_ is not None:
_obj_primary_key: Optional[T_DictKey] = getattr(_obj, _obj._primary_key_)
if _obj_primary_key is not None:
_dict[_obj_primary_key] = _obj
return _obj_primary_key
_dict_key = _obj_primary_key
if _dict_key is not None:
if self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
plugin_dict: ResourcePlugin_dict = cast(
ResourcePlugin_dict,
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)],
)
plugin_dict.set_context(self.req, self.req.get_root_resource())
return plugin_dict.handle_dict_post(_dict, _dict_key, _obj, params)
_dict[_dict_key] = _obj
return _dict_key
raise RestResourceHandlerException_BadRequest(
"Either the object needs defined primary key or the request must contain an API_key param to process this command"
)
return None # for mypy....
@ResourceHandler.register_resource_handler
@@ -389,15 +447,28 @@ class ResourceHandler_dict_elem(
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
dict_key_type: T_T_DictKey = cast(RestResourceBase, self.prev_handler.resource)._dict_key_type_[self.req.get_resource_origin(1)]
dict_key_type: T_T_DictKey = self.prev_handler.resource._dict_key_type_[self.req.get_resource_origin(1)]
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
_dict_key: T_DictKey
if issubclass(dict_key_type, bytes):
key_byte = dict_key_type(self.req.get_resource_origin(0), "utf-8")
return cast(dict[T_DictKey, T_DictValues], self.resource)[key_byte]
_dict_key = key_byte
else:
key = dict_key_type(self.req.get_resource_origin(0))
return cast(dict[T_DictKey, T_DictValues], self.resource)[key]
key_std = dict_key_type(self.req.get_resource_origin(0))
_dict_key = key_std
if self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
plugin_dict: ResourcePlugin_dict = cast(
ResourcePlugin_dict,
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)],
)
plugin_dict.set_context(self.req, self.req.get_root_resource())
return plugin_dict.handle_dict_elem_get(_dict, _dict_key, params)
return _dict[_dict_key]
def _handle_process_delete(self, params) -> None:
# print(f"{type(self).__name__}->_handle_process_delete()")
@@ -408,21 +479,35 @@ class ResourceHandler_dict_elem(
# because self.req is another context that is not saved to improve performances
assert self.prev_handler is not None
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
dict_key_type: T_T_DictKey = cast(RestResourceBase, self.prev_handler.resource)._dict_key_type_[self.req.get_resource_origin(2)]
dict_key_type: T_T_DictKey = self.prev_handler.resource._dict_key_type_[self.req.get_resource_origin(2)]
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.resource)
_dict_key: T_DictKey
if issubclass(dict_key_type, bytes):
key_byte = dict_key_type(self.req.get_resource_origin(1), "utf-8")
del cast(dict[T_DictKey, T_DictValues], self.resource)[key_byte]
_dict_key = key_byte
else:
key = dict_key_type(self.req.get_resource_origin(1))
del cast(dict[T_DictKey, T_DictValues], self.resource)[key]
key_std = dict_key_type(self.req.get_resource_origin(1))
_dict_key = key_std
if self.req.get_resource_origin(2) in self.prev_handler.resource._plugins_:
plugin_dict: ResourcePlugin_dict = cast(
ResourcePlugin_dict, self.prev_handler.resource._plugins_[self.req.get_resource_origin(2)]
)
plugin_dict.set_context(self.req, self.req.get_root_resource())
plugin_dict.handle_dict_delete(_dict, _dict_key, params)
return
del _dict[_dict_key]
return
@ResourceHandler.register_resource_handler
class ResourceHandler_RestResourceBase(
ResourceHandler[
RestResourceBase,
rest_resource.RestResourceBase,
_T_RestRequestParams_POST,
_T_RestRequestParams_DELETE,
RestRequestParams_RestResourceBase_GET,
@@ -448,7 +533,7 @@ class ResourceHandler_RestResourceBase(
def _check_resource_handler(cls, resource: _T_Resource, req: RestRequest) -> bool:
# print(f"{cls.__name__}->_check_resource_handler()")
return isinstance(resource, RestResourceBase)
return isinstance(resource, rest_resource.RestResourceBase)
def _check_access_rights(self) -> None:
super()._check_access_rights()
@@ -473,35 +558,35 @@ class ResourceHandler_RestResourceBase(
if self.resource.model_fields[self.req.get_resource_origin(0)].exclude is True and self.req.get_verb() is rsrc_verb.GET:
raise RestResourceHandlerException_ResourceNotFound(f"Not allowed READ access detected: {self.req.get_url_stack()}")
def _handle_process_get(self, params) -> RestResourceBase:
def _handle_process_get(self, params) -> rest_resource.RestResourceBase:
# print(f"{type(self).__name__}->_process_get()")
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
# CASE 1: no more item in url_stack => we reached the endpoint (operation)
# So we are in a RestResourceBase instance and must return the content
plugin_field: ResourcePlugin_field
plugin_resource: ResourcePlugin_RestResourceBase
if len(self.req.get_url_stack()) == 0:
self.resource.check_acl_self(self.req, None)
for key, attr in self.resource.model_fields.items():
for key, _ in self.resource.model_fields.items():
if key in self.resource._plugins_:
if issubclass(self.resource._plugins_[key], ResourcePlugin_field):
plugin_field: ResourcePlugin_field = cast(
ResourcePlugin_field, self.resource._plugins_[key](self.req, self.req.get_root_resource())
)
if isinstance(self.resource._plugins_[key], ResourcePlugin_field):
plugin_field = cast(ResourcePlugin_field, self.resource._plugins_[key])
plugin_field.set_context(self.req, self.req.get_root_resource())
value = getattr(self.resource, key)
setattr(self.resource, key, plugin_field.handle_field_get(value, params))
elif issubclass(self.resource._plugins_[key], ResourcePlugin_RestResourceBase):
plugin_field: ResourcePlugin_field = cast(
ResourcePlugin_RestResourceBase, self.resource._plugins_[key](self.req, self.req.get_root_resource())
)
elif isinstance(self.resource._plugins_[key], ResourcePlugin_RestResourceBase):
plugin_resource = cast(ResourcePlugin_RestResourceBase, self.resource._plugins_[key])
plugin_resource.set_context(self.req, self.req.get_root_resource())
value = getattr(self.resource, key)
setattr(self.resource, key, plugin_field.handle_resource_get(value, params))
setattr(self.resource, key, plugin_resource.handle_resource_get(value, params))
# result = RestResourceWalker_Root__handler(self.resource).process()
# print(result)
return self.resource
# CASE 2: specific (operation) case for root Node
# TODO: this must probably be merged with the previous bloc
if self.req.get_resource_origin(0) == "/":
return self.resource
@@ -511,70 +596,96 @@ class ResourceHandler_RestResourceBase(
key = self.req.get_resource_origin(0)
if key in self.resource._plugins_:
if issubclass(self.resource._plugins_[key], ResourcePlugin_field):
plugin_rsrc: ResourcePlugin_RestResourceBase = cast(
ResourcePlugin_RestResourceBase,
self.resource._plugins_[key](self.req, self.req.get_root_resource()),
)
value = plugin_rsrc.handle_field_get(value, params)
if isinstance(self.resource._plugins_[key], ResourcePlugin_field):
plugin_field = cast(ResourcePlugin_field, self.resource._plugins_[key])
plugin_field.set_context(self.req, self.req.get_root_resource())
value = plugin_field.handle_field_get(value, params)
elif issubclass(self.resource._plugins_[key], ResourcePlugin_RestResourceBase):
plugin_rsrc: ResourcePlugin_RestResourceBase = cast(
ResourcePlugin_RestResourceBase,
self.resource._plugins_[key](self.req, self.req.get_root_resource()),
)
value = plugin_rsrc.handle_resource_get(value, params)
elif isinstance(self.resource._plugins_[key], ResourcePlugin_RestResourceBase):
plugin_resource = cast(ResourcePlugin_RestResourceBase, self.resource._plugins_[key])
plugin_resource.set_context(self.req, self.req.get_root_resource())
value = plugin_resource.handle_resource_get(value, params)
return value
def _handle_process_put(self, params) -> None:
def _handle_process_put(self, params) -> None: # pylint: disable=too-complex,too-many-branches
# print(f"{type(self).__name__}->_process_put()")
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
assert isinstance(self.resource, rest_resource.RestResourceBase)
self.resource.check_acl_self(self.req, self.req.get_data())
# creating a copy of the current resource
_new_resrc = self.resource.copy()
# updating values based on nex data
# updating values based on new data
_new_resrc.update(**self.req.get_data())
# applying plugins (to nested element)
if isinstance(_new_resrc, RestResourceBase):
for key, attr in _new_resrc.model_fields.items():
if key in _new_resrc._plugins_:
if issubclass(_new_resrc._plugins_[key], ResourcePlugin_field):
plugin_field: ResourcePlugin_field = cast(
ResourcePlugin_field, _new_resrc._plugins_[key](self.req, self.req.get_root_resource())
)
value = getattr(_new_resrc, key)
setattr(_new_resrc, key, plugin_field.handle_field_put(value, params))
for key, _ in _new_resrc.model_fields.items():
if key in _new_resrc._plugins_:
if isinstance(_new_resrc._plugins_[key], ResourcePlugin_field):
plugin_field: ResourcePlugin_field = cast(ResourcePlugin_field, _new_resrc._plugins_[key])
plugin_field.set_context(self.req, self.req.get_root_resource())
value = getattr(_new_resrc, key)
setattr(_new_resrc, key, plugin_field.handle_field_put(value, params))
# applying plugins (from parent element)
if self.prev_handler is not None:
if (
isinstance(self.prev_handler.resource, dict) # element is within a dict
and self.prev_handler.prev_handler is not None
and isinstance(self.prev_handler.prev_handler.resource, RestResourceBase)
):
key = self.req.get_resource_origin(2)
if key in self.prev_handler.prev_handler.resource._plugins_:
plugin_rsrc: ResourcePlugin_RestResourceBase = cast(
ResourcePlugin_RestResourceBase,
self.prev_handler.prev_handler.resource._plugins_[key](self.req, self.req.get_root_resource()),
)
_new_resrc = plugin_rsrc.handle_dict_elem_put(_new_resrc, params)
# element is within a RestResourceBase
elif isinstance(self.prev_handler.resource, RestResourceBase):
key = self.req.get_resource_origin(1)
if key in self.prev_handler.resource._plugins_:
plugin_rsrc: ResourcePlugin_RestResourceBase = cast(
ResourcePlugin_RestResourceBase,
self.prev_handler.resource._plugins_[key](self.req, self.req.get_root_resource()),
)
_new_resrc = plugin_rsrc.handle_resource_put(_new_resrc, params)
self.resource.update(**_new_resrc.__dict__)
return
# element is within a dict
if (
isinstance(self.prev_handler.resource, dict)
and self.prev_handler.prev_handler is not None
and isinstance(self.prev_handler.prev_handler.resource, rest_resource.RestResourceBase)
):
key = self.req.get_resource_origin(2)
dict_key_type: T_T_DictKey = self.prev_handler.prev_handler.resource._dict_key_type_[self.req.get_resource_origin(2)]
_dict: dict[T_DictKey, T_DictValues] = cast(dict[T_DictKey, T_DictValues], self.prev_handler.resource)
_dict_key: T_DictKey
if key in self.prev_handler.prev_handler.resource._plugins_:
plugin_dict: ResourcePlugin_dict = cast(ResourcePlugin_dict, self.prev_handler.prev_handler.resource._plugins_[key])
plugin_dict.set_context(self.req, self.req.get_root_resource())
if issubclass(dict_key_type, bytes):
key_byte = dict_key_type(self.req.get_resource_origin(1), "utf-8")
_dict_key = key_byte
else:
key_std = dict_key_type(self.req.get_resource_origin(1))
_dict_key = key_std
plugin_dict.handle_dict_elem_put(_dict, _dict_key, _new_resrc, params)
else:
if issubclass(dict_key_type, bytes):
key_byte = dict_key_type(self.req.get_resource_origin(1), "utf-8")
_dict_key = key_byte
else:
key_std = dict_key_type(self.req.get_resource_origin(1))
_dict_key = key_std
if _dict_key not in _dict:
raise RuntimeError(f"Key not found: {str(_dict_key)}")
_dict[_dict_key] = _new_resrc
# element is within a RestResourceBase
elif isinstance(self.prev_handler.resource, rest_resource.RestResourceBase):
key = self.req.get_resource_origin(1)
if key in self.prev_handler.resource._plugins_:
plugin_rsrc: ResourcePlugin_RestResourceBase = cast(
ResourcePlugin_RestResourceBase, self.prev_handler.resource._plugins_[key]
)
plugin_rsrc.set_context(self.req, self.req.get_root_resource())
_new_resrc = plugin_rsrc.handle_resource_put(_new_resrc, params)
else:
self.resource.update(**_new_resrc.__dict__)
else:
raise RuntimeError("unsupported operation")
# print("***************")
# print(self.resource)
# print(_new_resrc)
# print(_new_resrc.__dict__)
def _handle_process_delete(self, params) -> None:
# print(f"{type(self).__name__}->_handle_process_delete()")
@@ -585,7 +696,7 @@ class ResourceHandler_RestResourceBase(
self.prev_handler is not None
and isinstance(self.prev_handler.resource, dict)
and self.prev_handler.prev_handler is not None
and isinstance(self.prev_handler.prev_handler.resource, RestResourceBase)
and isinstance(self.prev_handler.prev_handler.resource, rest_resource.RestResourceBase)
):
self.prev_handler._process_delete()
else:
@@ -613,15 +724,15 @@ class ResourceHandler_simple(
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
assert isinstance(self.prev_handler.resource, RestResourceBase)
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
self.prev_handler.resource.check_acl_field(self.req, 1)
if self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
plugin_simple: ResourcePlugin_field = cast(
ResourcePlugin_field,
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)](self.req, self.req.get_root_resource()),
ResourcePlugin_field, self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)]
)
plugin_simple.set_context(self.req, self.req.get_root_resource())
return plugin_simple.handle_field_get(self.resource, params)
return self.resource
@@ -631,7 +742,7 @@ class ResourceHandler_simple(
# print(f"{type(self).__name__}->resource = {type(self.resource).__name__}")
assert self.prev_handler is not None
assert isinstance(self.prev_handler.resource, RestResourceBase)
assert isinstance(self.prev_handler.resource, rest_resource.RestResourceBase)
self.prev_handler.resource.check_acl_field(self.req, 1)
@@ -640,9 +751,9 @@ class ResourceHandler_simple(
if self.req.get_resource_origin(1) in self.prev_handler.resource._plugins_:
# print("PLUGIN FOUND")
plugin_simple: ResourcePlugin_field = cast(
ResourcePlugin_field,
self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)](self.req, self.req.get_root_resource()),
ResourcePlugin_field, self.prev_handler.resource._plugins_[self.req.get_resource_origin(1)]
)
plugin_simple.set_context(self.req, self.req.get_root_resource())
# print(value)
value = plugin_simple.handle_field_put(value, params)
# print(value)

View File

@@ -1,80 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# pyrestresource(c) by chacha
#
# pyrestresource is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
# pylint: disable=missing-module-docstring,missing-class-docstring,missing-function-docstring
"""CLI interface module"""
from __future__ import annotations
from typing import TYPE_CHECKING
from .rest_resource_walker import (
RestResourceWalkerFutureResult,
RestResourceWalker_Root,
RestResourceWalker_Sub_T_Dict,
RestResourceWalker_Sub_RestFields,
RestResourceWalker_Sub_RestResourceBase,
)
if TYPE_CHECKING is True:
from typing import Optional
class RestResourceWalkerFutureResult_RestResourceBase_handler(RestResourceWalkerFutureResult[dict]):
def process_future(self, result: Optional[list[dict]]) -> Optional[dict]:
print(f"RestResourceWalkerFutureResult_RestResourceBase_handler {result}")
res = {}
res[self.source.resource_name] = dict()
for subres in result:
key = next(iter(subres))
print(key)
res[self.source.resource_name] = res[self.source.resource_name] | subres
return res
class RestResourceWalkerFutureResult_Dict_handler(RestResourceWalkerFutureResult[dict]):
def process_future(self, result: Optional[list[dict]]) -> Optional[dict]:
print(f"RestResourceWalkerFutureResult_Dict_handler {result}")
res = {}
for subres in result:
res = res | subres
return res
class RestResourceWalkerFutureResult_RestFields_handler(RestResourceWalkerFutureResult[dict]):
def process_future(self, result: Optional[list[dict]]) -> Optional[dict]:
print(f"RestResourceWalkerFutureResult_RestFields_handler {result}")
print(self.source.resource)
res = {}
res[self.source.resource_name] = dict()
for subres in result:
key = next(iter(subres))
print(key)
res[self.source.resource_name] = res[self.source.resource_name] | subres
return res
class RestResourceWalker_Sub_T_Dict__handler(RestResourceWalker_Sub_T_Dict):
cls_RestResourceWalkerFutureResult = RestResourceWalkerFutureResult_Dict_handler
class RestResourceWalker_Sub_RestResourceBase__handler(RestResourceWalker_Sub_RestResourceBase):
cls_RestResourceWalkerFutureResult = RestResourceWalkerFutureResult_RestResourceBase_handler
class RestResourceWalker_Sub_RestResourceFields__handler(RestResourceWalker_Sub_RestFields):
cls_RestResourceWalkerFutureResult = RestResourceWalkerFutureResult_RestFields_handler
class RestResourceWalker_Root__handler(RestResourceWalker_Root):
cls_RestResourceWalker_Sub = [
RestResourceWalker_Sub_T_Dict__handler,
RestResourceWalker_Sub_RestResourceFields__handler,
RestResourceWalker_Sub_RestResourceBase__handler,
]

View File

@@ -0,0 +1,83 @@
from __future__ import annotations
from typing import ClassVar, TYPE_CHECKING
from secrets import token_hex, compare_digest
from datetime import datetime, timedelta
from .rest_resource import RestResourceBase
from .rest_model import RestField
from .rest_ACL import ACL_target_user
from .rest_resource_plugin_login import ResourcePlugin_Login
from .rest_login import UserSession, Login
from .rest_exceptions import (
RestResourceLoginException_InvalidCredentials,
RestResourceLoginException_ClientChange,
RestResourceLoginException_SessionTimeout,
RestResourceLoginException_InvalidSession,
)
if TYPE_CHECKING:
from .rest_request import RestRequest
from .rest_login import UserLogin
class RestResourceBaseLogin(RestResourceBase):
_ar_user_login: ClassVar[list[UserLogin]] = []
_ar_user_session: dict[str, UserSession] = {}
_max_session_inactive: ClassVar[timedelta] = timedelta(minutes=20)
_max_session_time: ClassVar[timedelta] = timedelta(hours=12)
login: Login = RestField(default=Login(), plugin=ResourcePlugin_Login)
def get_new_cookie_expiration_date(self) -> datetime:
return datetime.now() + self._max_session_time
def _process_request_session(self, request: RestRequest) -> None:
# print(f"[TRACE] {type(self).__name__}->_process_request_session()")
# print(f"[TRACE] request: {id(request)}")
auth_cookie = request.get_cookie("Authorization")
if auth_cookie is not None:
if auth_cookie in self._ar_user_session:
# print(f"SESSION FOUND for {request.get_client()}")
if self._ar_user_session[auth_cookie].client != request.get_client():
del self._ar_user_session[auth_cookie]
raise RestResourceLoginException_ClientChange()
time_diff = datetime.now() - self._ar_user_session[auth_cookie].last_update
if time_diff > self._max_session_inactive:
del self._ar_user_session[auth_cookie]
raise RestResourceLoginException_SessionTimeout()
request.set_user(ACL_target_user(name=self._ar_user_session[auth_cookie].user_login.username))
# print("SESSION RECOVERED")
return
raise RestResourceLoginException_InvalidSession()
# print(f"non-connected user {request.get_client()}")
return
def user_login(self, user_name: str, user_secret: str, request: RestRequest) -> str:
already_failed: bool = False
for iter_user_login in self._ar_user_login:
username_ok: bool = compare_digest(user_name, iter_user_login.username)
secret_ok: bool = compare_digest(user_secret, iter_user_login.secret)
if username_ok is True:
if secret_ok is True and not already_failed:
return self._register_user_session(iter_user_login, request)
already_failed = True
else:
pass # pylint: disable=unnecessary-pass
pass # pylint: disable=unnecessary-pass
raise RestResourceLoginException_InvalidCredentials()
def _register_user_session(self, user_login: UserLogin, request: RestRequest) -> str:
token = token_hex(16)
new_user_session = UserSession(last_update=datetime.now(), user_login=user_login, client=request.get_client())
self._ar_user_session[f"Bearer {token}"] = new_user_session
return token

View File

@@ -10,10 +10,10 @@ from .rest_types import (
TV_SupportedRESTFields,
TV_RestResourceBase,
)
from .rest_request import RestRequest
from .rest_exceptions import RestResourceConfigException
if TYPE_CHECKING is True:
if TYPE_CHECKING:
from .rest_request import RestRequest
from .rest_resource import RestResourceBase
from .rest_request_opt import (
RestRequestParams_GET,
@@ -29,11 +29,19 @@ if TYPE_CHECKING is True:
class ResourcePlugin(ABC):
def __init__(self, request: RestRequest, root_resource: RestResourceBase) -> None:
self.__request: RestRequest = request
self.__root_resource: RestRequest = root_resource
def __init__(self) -> None:
self.__request: RestRequest
self.__root_resource: RestResourceBase
def set_context(self, request: RestRequest, root_resource: RestResourceBase) -> None:
self.__request = request
self.__root_resource = root_resource
def user_login(self, user_name: str, user_secret: str) -> str:
from .rest_resource_login import RestResourceBaseLogin
if not isinstance(self.__root_resource, RestResourceBaseLogin):
raise RestResourceConfigException("root_resource must be RestResourceBaseLogin to use user_login")
return self.__root_resource.user_login(user_name, user_secret, self.__request)
def get_user_login(self) -> str:
@@ -46,6 +54,10 @@ class ResourcePlugin(ABC):
self.__request.reset_resp_cookie(key)
def get_new_cookie_expiration_date(self) -> datetime:
from .rest_resource_login import RestResourceBaseLogin
if not isinstance(self.__root_resource, RestResourceBaseLogin):
raise RestResourceConfigException("root_resource must be RestResourceBaseLogin to use get_new_cookie_expiration_date")
return self.__root_resource.get_new_cookie_expiration_date()
def set_resp_status(self, status: int) -> None:
@@ -121,6 +133,7 @@ class ResourcePlugin_dict(ResourcePlugin, Generic[_T_DictKey, _T_DictValues]):
def handle_dict_post(
self,
resource_dict: dict[_T_DictKey, _T_DictValues],
key: _T_DictKey,
resource: _T_DictValues,
params: RestRequestParams_Dict_POST[_T_DictKey],
) -> Optional[_T_DictKey]:
@@ -128,6 +141,15 @@ class ResourcePlugin_dict(ResourcePlugin, Generic[_T_DictKey, _T_DictValues]):
@abstractmethod
def handle_dict_delete(
self,
resource_dict: dict[_T_DictKey, _T_DictValues],
key: _T_DictKey,
params: RestRequestParams_Dict_DELETE[_T_DictKey],
) -> None:
...
@abstractmethod
def handle_dict_delete_all(
self,
resource_dict: dict[_T_DictKey, _T_DictValues],
params: RestRequestParams_Dict_DELETE[_T_DictKey],
@@ -137,17 +159,20 @@ class ResourcePlugin_dict(ResourcePlugin, Generic[_T_DictKey, _T_DictValues]):
@abstractmethod
def handle_dict_elem_get(
self,
resource: TV_RestResourceBase,
resource_dict: dict[_T_DictKey, _T_DictValues],
key: _T_DictKey,
params: RestRequestParams_Dict_elem_GET,
) -> TV_RestResourceBase:
) -> _T_DictValues:
...
@abstractmethod
def handle_dict_elem_put(
self,
resource: TV_RestResourceBase,
resource_dict: dict[_T_DictKey, _T_DictValues],
key: _T_DictKey,
resource: _T_DictValues,
params: RestRequestParams_Dict_elem_PUT,
) -> TV_RestResourceBase:
) -> None:
...
@@ -164,31 +189,43 @@ class ResourcePlugin_dict_default(ResourcePlugin_dict[_T_DictKey, _T_DictValues]
def handle_dict_post(
self,
resource_dict: dict[_T_DictKey, _T_DictValues],
key: _T_DictKey,
resource: _T_DictValues,
params: RestRequestParams_Dict_POST[_T_DictKey],
) -> Optional[_T_DictKey]:
if params.API_key is not None:
resource_dict[params.API_key] = resource
return params.API_key
resource_dict[key] = resource
return key
def handle_dict_delete(
self,
resource_dict: dict[_T_DictKey, _T_DictValues],
key: _T_DictKey,
params: RestRequestParams_Dict_DELETE[_T_DictKey],
) -> None:
if params.API_key is not None:
del resource_dict[params.API_key]
del resource_dict[key]
def handle_dict_delete_all(
self,
resource_dict: dict[_T_DictKey, _T_DictValues],
params: RestRequestParams_Dict_DELETE[_T_DictKey],
) -> None:
resource_dict.clear()
def handle_dict_elem_get(
self,
resource: TV_RestResourceBase,
resource_dict: dict[_T_DictKey, _T_DictValues],
key: _T_DictKey,
params: RestRequestParams_Dict_elem_GET,
) -> TV_RestResourceBase:
return resource
) -> _T_DictValues:
return resource_dict[key]
def handle_dict_elem_put(
self,
resource: TV_RestResourceBase,
resource_dict: dict[_T_DictKey, _T_DictValues],
key: _T_DictKey,
resource: _T_DictValues,
params: RestRequestParams_Dict_elem_PUT,
) -> TV_RestResourceBase:
return resource
) -> None:
if key not in resource_dict:
raise RuntimeError(f"Key not found: {str(key)}")
resource_dict[key] = resource

View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from .rest_resource_plugin import ResourcePlugin_RestResourceBase_default
from .rest_login import UserLogin, Login
from .rest_exceptions import RestResourceLoginException_InvalidCredentials
if TYPE_CHECKING:
from .rest_request_opt import RestRequestParams_RestResourceBase_PUT, RestRequestParams_RestResourceBase_GET
class ResourcePlugin_Login(ResourcePlugin_RestResourceBase_default):
ar_UserLogin: list[UserLogin] = []
def handle_resource_get(self, resource: Login, params: RestRequestParams_RestResourceBase_GET) -> Login:
return Login(username=self.get_user_login(), secret=None)
def handle_resource_put(self, resource: Login, params: RestRequestParams_RestResourceBase_PUT) -> Login:
if resource.username is None or resource.secret is None:
raise RestResourceLoginException_InvalidCredentials()
token = self.user_login(resource.username, resource.secret)
self.set_resp_cookie_value("Authorization", f"Bearer {token}")
return resource

View File

@@ -1,5 +1,8 @@
# pylint: disable=protected-access
from __future__ import annotations
from typing import (
cast,
get_args,
get_origin,
TYPE_CHECKING,
@@ -27,8 +30,8 @@ from .rest_ACL import (
)
from .rest_exceptions import RestResourcePluginException_InvalidPluginSignature, RestResourceModelException, RestResourceModelException_ACL
if TYPE_CHECKING is True:
pass
if TYPE_CHECKING:
...
class RestResourceWalker_Sub_T_Dict__tree_init(RestResourceWalker_Sub_T_Dict):
@@ -39,7 +42,7 @@ class RestResourceWalker_Sub_T_Dict__tree_init(RestResourceWalker_Sub_T_Dict):
if not get_origin(datatype[1]) is None:
raise RestResourceModelException("complex dict types are not supported (should create a RestResourceBase container)")
if not datatype[0] in _T_SupportedRESTFields:
raise RestResourceModelException(f"Unsupported Dict Field value type in class (key)")
raise RestResourceModelException("Unsupported Dict Field value type in class (key)")
# preprocessing types / structure
if self.parent is not None and isinstance(self.parent, RestResourceWalker_Sub_RestResourceBase):
@@ -47,21 +50,26 @@ class RestResourceWalker_Sub_T_Dict__tree_init(RestResourceWalker_Sub_T_Dict):
self.parent.annotation._dict_value_type_[self.resource_name] = datatype[1] # pylint: disable=protected-access
self.parent.annotation._model_dump_excluded_[self.resource_name] = True # pylint: disable=protected-access
self.resource.exclude = True
self.parent.resource.model_rebuild(force=True)
assert isinstance(self.resource, FieldInfo)
current_resource = cast(FieldInfo, self.resource)
current_resource.exclude = True
parent_resource = cast(type[RestResourceBase], self.parent.resource)
assert issubclass(parent_resource, RestResourceBase)
parent_resource.model_rebuild(force=True)
self.parent.annotation._ACL_record_[self.resource_name] = []
if (
isinstance(self.resource, FieldInfo)
and self.resource.json_schema_extra is not None
and type(self.resource.json_schema_extra) is dict
and isinstance(self.resource.json_schema_extra, dict)
):
if "plugin" in self.resource.json_schema_extra:
plugin_dict: ResourcePlugin_dict = self.resource.json_schema_extra["plugin"]
plugin_dict: type[ResourcePlugin_dict] = self.resource.json_schema_extra["plugin"]
if not issubclass(plugin_dict, ResourcePlugin_dict):
raise RestResourcePluginException_InvalidPluginSignature()
self.parent.annotation._plugins_[self.resource_name] = plugin_dict
self.parent.annotation._plugins_[self.resource_name] = plugin_dict()
# print("ADD DICT PLUGIN")
if "ACL" in self.resource.json_schema_extra:
@@ -90,7 +98,7 @@ class RestResourceWalker_Sub_RestFields__tree_init(RestResourceWalker_Sub_RestFi
if (
isinstance(self.resource, FieldInfo)
and self.resource.json_schema_extra is not None
and type(self.resource.json_schema_extra) is dict
and isinstance(self.resource.json_schema_extra, dict)
):
# print("aaaaaaaaaa")
@@ -105,10 +113,10 @@ class RestResourceWalker_Sub_RestFields__tree_init(RestResourceWalker_Sub_RestFi
]
if "plugin" in self.resource.json_schema_extra:
plugin_field: ResourcePlugin_field = self.resource.json_schema_extra["plugin"]
plugin_field: type[ResourcePlugin_field] = self.resource.json_schema_extra["plugin"]
if not issubclass(plugin_field, ResourcePlugin_field):
raise RestResourcePluginException_InvalidPluginSignature()
self.parent.annotation._plugins_[self.resource_name] = plugin_field
self.parent.annotation._plugins_[self.resource_name] = plugin_field()
# print("ADD FIELD PLUGIN")
if "ACL" in self.resource.json_schema_extra:
@@ -134,20 +142,24 @@ class RestResourceWalker_Sub_RestResourceBase__tree_init(RestResourceWalker_Sub_
# preprocessing types / structure
if self.parent is not None and isinstance(self.parent, RestResourceWalker_Sub_RestResourceBase):
self.parent.annotation._model_dump_excluded_[self.resource_name] = True
self.resource.exclude = True
self.parent.resource.model_rebuild(force=True)
assert isinstance(self.resource, FieldInfo)
current_resource = cast(FieldInfo, self.resource)
current_resource.exclude = True
parent_resource = cast(type[RestResourceBase], self.parent.resource)
assert issubclass(parent_resource, RestResourceBase)
parent_resource.model_rebuild(force=True)
self.parent.annotation._ACL_record_[self.resource_name] = []
if (
isinstance(self.resource, FieldInfo)
and self.resource.json_schema_extra is not None
and type(self.resource.json_schema_extra) is dict
and isinstance(self.resource.json_schema_extra, dict)
):
if "plugin" in self.resource.json_schema_extra:
plugin_resource: ResourcePlugin_RestResourceBase = self.resource.json_schema_extra["plugin"]
plugin_resource: type[ResourcePlugin_RestResourceBase] = self.resource.json_schema_extra["plugin"]
if not issubclass(plugin_resource, ResourcePlugin_RestResourceBase):
raise RestResourcePluginException_InvalidPluginSignature()
self.parent.annotation._plugins_[self.resource_name] = plugin_resource
self.parent.annotation._plugins_[self.resource_name] = plugin_resource()
# print("ADD RESOURCE PLUGIN")
if "ACL" in self.resource.json_schema_extra:

View File

@@ -5,7 +5,6 @@ from typing import (
get_args,
get_origin,
TypeVar,
Type,
Generic,
TYPE_CHECKING,
)
@@ -17,7 +16,7 @@ from .rest_types import _T_SupportedRESTFields
from .rest_resource import RestResourceBase
from .rest_exceptions import RestResourceModelException
if TYPE_CHECKING is True:
if TYPE_CHECKING:
from typing import Any, Optional
TV_RestResourceWalkerFutureResult = TypeVar("TV_RestResourceWalkerFutureResult")
@@ -36,24 +35,24 @@ class RestResourceWalkerFutureResult(ABC, Generic[TV_RestResourceWalkerFutureRes
class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
# pylint: disable=too-many-instance-attributes
cls_RestResourceWalkerFutureResult: Optional[type[RestResourceWalkerFutureResult[TV_RestResourceWalkerFutureResult]]] = None
@classmethod
@abstractmethod
def check_type(cls, resource: FieldInfo | Type[RestResourceBase]) -> tuple[bool, Type[Any], bool]:
def check_type(cls, resource: FieldInfo | type[RestResourceBase]) -> tuple[bool, type[Any], bool]:
"""implementation interface to Factory.
The factory will call this specialized method on each implementation to find a supported one.
"""
...
@classmethod
def get(
self,
cls,
subs: list[type[RestResourceWalker_Sub]],
resource_name: str,
resource: FieldInfo | Type[RestResourceBase],
resource: FieldInfo | type[RestResourceBase],
parent: Optional[RestResourceWalker_Sub] = None,
argument: Optional[any] = None,
argument: Optional[Any] = None,
) -> Optional[RestResourceWalker_Sub]:
for sub in subs:
_is_valid, _anno, _optional = sub.check_type(resource)
@@ -61,20 +60,19 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
if _is_valid is True:
return sub(resource_name, resource, parent, _anno, _optional, argument)
raise RestResourceModelException(f"Incompatible Field Found: {type(resource).__name__}")
return None
def __init__(
self,
resource_name: str,
resource: FieldInfo | Type[RestResourceBase],
resource: FieldInfo | type[RestResourceBase],
parent: Optional[RestResourceWalker_Sub] = None,
annotation: Optional[type[RestResourceBase]] = None,
_optional: Optional[bool] = None,
argument: Optional[any] = None,
argument: Optional[Any] = None,
):
self.argument: any = argument
self.argument: Any = argument
self.resource_name: str = resource_name
self.resource: FieldInfo | Type[RestResourceBase] = resource
self.resource: FieldInfo | type[RestResourceBase] = resource
self.parent: Optional[RestResourceWalker_Sub] = parent
self.future_results_subs: Optional[list[RestResourceWalkerFutureResult[TV_RestResourceWalkerFutureResult]]] = None
@@ -102,7 +100,7 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
def chain_process_future(self) -> Optional[TV_RestResourceWalkerFutureResult]:
if self.future_results_subs is not None and self.future_result is not None:
return_future_results_subs: list[Any] = [] # TODO: use typevar
return_future_results_subs: list[Any] = []
for future_result in self.future_results_subs:
return_future_results_subs.append(future_result.chain_process_future())
return self.future_result.process_future(return_future_results_subs)
@@ -124,11 +122,11 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
@staticmethod
def ProcessAnnotation(
resource: FieldInfo | Type[RestResourceBase],
resource: FieldInfo | type[RestResourceBase],
) -> tuple[type[Any], bool]:
# from .rest_resource import RestResourceBase
_anno: Type[Any]
_anno: type[Any]
# print("!!!!!!!!!!!!!!!!!!!!!!!")
# print(resource)
@@ -159,7 +157,7 @@ class RestResourceWalker_Sub(ABC, Generic[TV_RestResourceWalkerFutureResult]):
class RestResourceWalker_Sub_T_Dict(RestResourceWalker_Sub):
@classmethod
def check_type(cls, resource: FieldInfo | Type[RestResourceBase]) -> tuple[bool, Type[Any], bool]:
def check_type(cls, resource: FieldInfo | type[RestResourceBase]) -> tuple[bool, type[Any], bool]:
_anno, _optional = cls.ProcessAnnotation(resource)
_type_resource = get_origin(_anno)
return (_type_resource is dict), _anno, _optional
@@ -175,7 +173,7 @@ class RestResourceWalker_Sub_T_Dict(RestResourceWalker_Sub):
class RestResourceWalker_Sub_RestFields(RestResourceWalker_Sub):
@classmethod
def check_type(cls, resource: FieldInfo | Type[RestResourceBase]) -> tuple[bool, Type[Any], bool]:
def check_type(cls, resource: FieldInfo | type[RestResourceBase]) -> tuple[bool, type[Any], bool]:
_anno, _optional = cls.ProcessAnnotation(resource)
return (_anno in _T_SupportedRESTFields), _anno, _optional
@@ -185,7 +183,7 @@ class RestResourceWalker_Sub_RestFields(RestResourceWalker_Sub):
class RestResourceWalker_Sub_RestResourceBase(RestResourceWalker_Sub):
@classmethod
def check_type(cls, resource: FieldInfo | Type[RestResourceBase]) -> tuple[bool, Type[Any], bool]:
def check_type(cls, resource: FieldInfo | type[RestResourceBase]) -> tuple[bool, type[Any], bool]:
_anno, _optional = cls.ProcessAnnotation(resource)
return (
((get_origin(_anno) is None) and issubclass(_anno, RestResourceBase)),
@@ -201,21 +199,21 @@ class RestResourceWalker_Sub_RestResourceBase(RestResourceWalker_Sub):
class RestResourceWalker_Root:
cls_RestResourceWalker_Sub: list[Type[RestResourceWalker_Sub]] = [
cls_RestResourceWalker_Sub: list[type[RestResourceWalker_Sub]] = [
RestResourceWalker_Sub_T_Dict,
RestResourceWalker_Sub_RestFields,
RestResourceWalker_Sub_RestResourceBase,
]
def __init__(self, resource: RestResourceBase | Type[RestResourceBase]) -> None:
self.subwalker_argument: any = None
self.resource: Type[RestResourceBase]
def __init__(self, resource: RestResourceBase | type[RestResourceBase]) -> None:
self.subwalker_argument: Any = None
self.resource: type[RestResourceBase]
if isinstance(resource, RestResourceBase):
self.resource = type(resource)
else:
self.resource = resource
def process(self, argument: Optional[any] = None, deep_limit: Optional[int] = None) -> Optional[TV_RestResourceWalkerFutureResult]:
def process(self, argument: Optional[Any] = None, deep_limit: Optional[int] = None) -> Optional[TV_RestResourceWalkerFutureResult]:
current_deep: int = 0
sub_walker_initial: Optional[RestResourceWalker_Sub] = RestResourceWalker_Sub.get(
@@ -225,7 +223,7 @@ class RestResourceWalker_Root:
if sub_walker_initial is not None:
sub_walker_initial.process()
sub_walker_initial.get_future()
resource_list: list[tuple[str, FieldInfo | Type[RestResourceBase], RestResourceWalker_Sub]] = [
resource_list: list[tuple[str, FieldInfo | type[RestResourceBase], RestResourceWalker_Sub]] = [
(subresource_name, subresource, sub_walker_initial)
for subresource_name, subresource in sub_walker_initial.get_sub_resources()
]
@@ -252,6 +250,4 @@ class RestResourceWalker_Root:
resource_list = list(new_resource_list)
current_deep = current_deep + 1
return sub_walker_initial.chain_process_future()
else:
raise RestResourceModelException("Invalid Rootpoint")
return None
raise RestResourceModelException("Invalid Rootpoint")

View File

@@ -8,9 +8,8 @@ from pathlib import Path
from uuid import UUID
from ipaddress import IPv4Address, IPv4Network
if TYPE_CHECKING is True:
pass
if TYPE_CHECKING:
from .rest_resource import RestResourceBase
T_Gen_DictKeys: type = type({}.keys())
NoneType = type(None)
@@ -92,7 +91,7 @@ _T_DictValues = TypeVar(
NoneType,
)
T_T_FieldValue = type(T_FieldValue)
T_T_FieldValue = type[T_FieldValue]
T_T_DictValues = type[T_DictValues]
T_Dict = dict[T_DictKey, T_DictValues]

23
test/ThreadedUvicorn.py Normal file
View File

@@ -0,0 +1,23 @@
from uvicorn import Config, Server
from threading import Thread
import asyncio
class ThreadedUvicorn:
def __init__(self, config: Config):
self.server = Server(config)
self.thread = Thread(daemon=True, target=self.server.run)
def start(self):
self.thread.start()
asyncio.run(self.wait_for_started())
async def wait_for_started(self):
while not self.server.started:
await asyncio.sleep(0.1)
def stop(self):
if self.thread.is_alive():
self.server.should_exit = True
while self.thread.is_alive():
continue

View File

@@ -5,3 +5,5 @@
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
from .ThreadedUvicorn import ThreadedUvicorn

View File

@@ -3,9 +3,9 @@ import unittest
from os import chdir
from pathlib import Path
from typing import Optional
from pydantic import Field
from src.pyrestresource import (
RestField,
RestResourceHandlerException_Forbiden,
register_rest_rootpoint,
RestResourceBase,
@@ -30,8 +30,8 @@ chdir(testdir_path.parent.resolve())
# to allow mock-ing, all the tested classes are in a function
def init_classes():
class TestResource(RestResourceBase):
username: Optional[str] = Field(None)
secret: Optional[str] = Field(
username: Optional[str] = RestField(None)
secret: Optional[str] = RestField(
None,
exclude=True,
ACL=[
@@ -41,21 +41,21 @@ def init_classes():
)
class TestResource2(RestResourceBase):
version_ro: Optional[str] = Field(
version_ro: Optional[str] = RestField(
"1.2.3",
ACL=[
ACL_record(verbs=[rsrc_verb.PUT], target=ACL_target_group_Any(), rule=ACL_rule.DENY),
],
)
version: Optional[str] = Field("3.2.1")
version: Optional[str] = RestField("3.2.1")
@register_rest_rootpoint
class RootApp(RestResourceBase):
resource_with_secret: TestResource = Field(default=TestResource())
resource_with_secret_ACL: TestResource = Field(
resource_with_secret: TestResource = RestField(default=TestResource())
resource_with_secret_ACL: TestResource = RestField(
default=TestResource(), ACL=[ACL_record(verbs=[rsrc_verb.PUT], target=ACL_target_group_Any(), rule=ACL_rule.DENY)]
)
resource_ro: TestResource2 = Field(TestResource2())
resource_ro: TestResource2 = RestField(TestResource2())
# this add the classes to globals to allow using them later on
# => this is only for uinit-testing purpose and is not needed in real use

View File

@@ -3,16 +3,16 @@ import unittest
from os import chdir
from pathlib import Path
from typing import Optional, ClassVar
from pydantic import Field
from time import sleep
import uvicorn
import socket
import requests
from contextlib import closing
from multiprocessing import Process
import requests
from requests.adapters import HTTPAdapter
import uvicorn
from src.pyrestresource import (
RestField,
ACL_target_user,
UserLogin,
RestResourceBase,
@@ -32,6 +32,8 @@ from src.pyrestresource import (
)
from test import ThreadedUvicorn
testdir_path = Path(__file__).parent.resolve()
chdir(testdir_path.parent.resolve())
@@ -42,24 +44,24 @@ def init_classes():
user_test2 = UserLogin(username="TestUser2", secret="abcdef")
class TestResource(RestResourceBase):
test_field: Optional[str] = Field("ORIGIN_VALUE")
test_field: Optional[str] = RestField("ORIGIN_VALUE")
class TestResourceACL(RestResourceBase):
test_field: Optional[str] = Field(
test_field: Optional[str] = RestField(
"ORIGIN_VALUE",
ACL=[
ACL_record(verbs=[rsrc_verb.PUT], target=ACL_target_user.from_user_login(user_test), rule=ACL_rule.ALLOW),
ACL_record(verbs=[rsrc_verb.PUT], target=ACL_target_group_Any(), rule=ACL_rule.DENY),
],
)
test_field2: Optional[str] = Field(
test_field2: Optional[str] = RestField(
"ORIGIN_VALUE",
ACL=[
ACL_record(verbs=[rsrc_verb.PUT], target=ACL_target_user.from_user_login(user_test2), rule=ACL_rule.ALLOW),
ACL_record(verbs=[rsrc_verb.PUT], target=ACL_target_group_Any(), rule=ACL_rule.DENY),
],
)
test_field_both: Optional[str] = Field(
test_field_both: Optional[str] = RestField(
"ORIGIN_VALUE",
ACL=[
ACL_record(verbs=[rsrc_verb.PUT], target=ACL_target_user.from_user_login(user_test), rule=ACL_rule.ALLOW),
@@ -71,7 +73,7 @@ def init_classes():
@register_rest_rootpoint
class RootApp(RestResourceBaseLogin):
_ar_user_login: ClassVar[list[UserLogin]] = [user_test, user_test2]
test_resourceACL: TestResource = Field(
test_resourceACL: TestResource = RestField(
TestResource(),
ACL=[
ACL_record(verbs=[rsrc_verb.PUT], target=ACL_target_user(name=user_test.username), rule=ACL_rule.ALLOW),
@@ -93,26 +95,18 @@ def find_free_port():
return "localhost", s.getsockname()[1]
def launch_server(ip, port):
init_classes()
uvicorn.run(f"{__loader__.name}:RootApp", port=port, host="0.0.0.0", log_level="warning", factory=True)
class Test_RestAPI_LOGIN_Web(unittest.TestCase):
def setUp(self) -> None:
chdir(testdir_path.parent.resolve())
def test_login_two_users(self):
ip, port = find_free_port()
proc = Process(
target=launch_server,
args=(
ip,
port,
),
)
proc.start()
init_classes()
server = ThreadedUvicorn(uvicorn.Config(f"{__loader__.name}:RootApp", port=port, host="0.0.0.0", log_level="warning", factory=True))
server.start()
sleep(1)
s = requests.Session()
s.mount("http://", HTTPAdapter(max_retries=0))
@@ -206,19 +200,15 @@ class Test_RestAPI_LOGIN_Web(unittest.TestCase):
self.assertEqual(response.json(), "A TEST SET VALUE 2")
finally:
proc.terminate()
s.close()
server.stop()
def test_login(self):
ip, port = find_free_port()
proc = Process(
target=launch_server,
args=(
ip,
port,
),
)
proc.start()
init_classes()
server = ThreadedUvicorn(uvicorn.Config(f"{__loader__.name}:RootApp", port=port, host="0.0.0.0", log_level="warning", factory=True))
server.start()
sleep(1)
s = requests.Session()
s.mount("http://", HTTPAdapter(max_retries=0))
@@ -260,19 +250,15 @@ class Test_RestAPI_LOGIN_Web(unittest.TestCase):
self.assertEqual(response.json(), "TestUser")
finally:
proc.terminate()
s.close()
server.stop()
def test_change_host(self):
ip, port = find_free_port()
proc = Process(
target=launch_server,
args=(
ip,
port,
),
)
proc.start()
init_classes()
server = ThreadedUvicorn(uvicorn.Config(f"{__loader__.name}:RootApp", port=port, host="0.0.0.0", log_level="warning", factory=True))
server.start()
sleep(1)
s1 = requests.Session()
s1.mount("http://", HTTPAdapter(max_retries=0))
@@ -378,20 +364,16 @@ class Test_RestAPI_LOGIN_Web(unittest.TestCase):
self.assertEqual(response.json(), "__ANNONYMOUS__")
finally:
proc.terminate()
s1.close()
s2.close()
server.stop()
def test_login_wrong_pwd(self):
ip, port = find_free_port()
proc = Process(
target=launch_server,
args=(
ip,
port,
),
)
proc.start()
init_classes()
server = ThreadedUvicorn(uvicorn.Config(f"{__loader__.name}:RootApp", port=port, host="0.0.0.0", log_level="warning", factory=True))
server.start()
sleep(1)
s = requests.Session()
s.mount("http://", HTTPAdapter(max_retries=0))
@@ -493,19 +475,15 @@ class Test_RestAPI_LOGIN_Web(unittest.TestCase):
self.assertDictEqual(s.cookies.get_dict(), {})
finally:
proc.terminate()
s.close()
server.stop()
def test_access_resourceACL(self):
ip, port = find_free_port()
proc = Process(
target=launch_server,
args=(
ip,
port,
),
)
proc.start()
init_classes()
server = ThreadedUvicorn(uvicorn.Config(f"{__loader__.name}:RootApp", port=port, host="0.0.0.0", log_level="warning", factory=True))
server.start()
sleep(1)
s = requests.Session()
s.mount("http://", HTTPAdapter(max_retries=0))
@@ -570,19 +548,15 @@ class Test_RestAPI_LOGIN_Web(unittest.TestCase):
self.assertEqual(response.json(), "TEST SET VALUE 2")
finally:
proc.terminate()
s.close()
server.stop()
def test_access_fieldACL(self):
ip, port = find_free_port()
proc = Process(
target=launch_server,
args=(
ip,
port,
),
)
proc.start()
init_classes()
server = ThreadedUvicorn(uvicorn.Config(f"{__loader__.name}:RootApp", port=port, host="0.0.0.0", log_level="warning", factory=True))
server.start()
sleep(1)
s = requests.Session()
s.mount("http://", HTTPAdapter(max_retries=0))
@@ -647,5 +621,5 @@ class Test_RestAPI_LOGIN_Web(unittest.TestCase):
self.assertEqual(response.json(), "TEST SET VALUE 2")
finally:
proc.terminate()
s.close()
server.stop()

View File

@@ -4,16 +4,12 @@ from unittest.mock import patch
from os import chdir
from pathlib import Path
from typing import Optional
from pydantic import Field
from uuid import UUID, uuid4
from time import time
import json
print(__name__)
print(__package__)
from src.pyrestresource import (
RestField,
RestResourceHandlerException_Forbiden,
register_rest_rootpoint,
RestResourceBase,
@@ -39,19 +35,19 @@ def init_classes():
api_version: str
class Patch(RestResourceBase):
uuid: UUID = Field(default_factory=uuid4, primary_key=True)
uuid: UUID = RestField(default_factory=uuid4, primary_key=True)
shortname: str
name: Optional[str] = None
description: Optional[str] = None
class Profile(RestResourceBase):
uuid: UUID = Field(default_factory=uuid4, primary_key=True)
uuid: UUID = RestField(default_factory=uuid4, primary_key=True)
shortname: str
name: Optional[str] = None
description: Optional[str] = None
class Game(RestResourceBase):
uuid: UUID = Field(default_factory=uuid4, primary_key=True)
uuid: UUID = RestField(default_factory=uuid4, primary_key=True)
shortname: str
name: Optional[str] = None
description: Optional[str] = None
@@ -62,12 +58,12 @@ def init_classes():
Patch_2 = Patch(uuid="d385a1d2-65fa-11ee-8c99-0242ac120002", shortname="testPatch2")
class User(RestResourceBase):
uuid: UUID = Field(
uuid: UUID = RestField(
default_factory=uuid4,
primary_key=True,
)
name: str
secret: str = Field(
secret: str = RestField(
...,
exclude=True,
ACL=[
@@ -83,7 +79,7 @@ def init_classes():
)
class Patch2(RestResourceBase):
uuid: UUID = Field(default_factory=uuid4, primary_key=True)
uuid: UUID = RestField(default_factory=uuid4, primary_key=True)
shortname: str
name: Optional[str] = None
description: Optional[str] = None
@@ -484,7 +480,7 @@ class Test_RestAPI_PERFO(unittest.TestCase):
init_classes()
self.testapp = RootApp()
# @unittest.skip
@unittest.skip
def test_perf_dict(self):
print(f"LIB INTERNAL PERF TEST")
n_loop = 10000

View File

@@ -3,9 +3,9 @@ import unittest
from os import chdir
from pathlib import Path
from typing import Annotated
from pydantic import Field
from src.pyrestresource import (
RestField,
register_rest_rootpoint,
RestResourceBase,
rsrc_verb,
@@ -40,27 +40,27 @@ def init_classes():
class Info_get(RestResourceBase):
# test plugin injection within annotation
# + test plugin on a simple field
version: Annotated[str, Field(plugin=ResourcePlugin_version_get)]
version: Annotated[str, RestField(plugin=ResourcePlugin_version_get)]
api_version: str
class Info_put(RestResourceBase):
# test plugin injection within annotation
# + test plugin on a simple field
version: Annotated[str, Field(plugin=ResourcePlugin_version_put)]
version: Annotated[str, RestField(plugin=ResourcePlugin_version_put)]
api_version: str
@register_rest_rootpoint
class RootApp(RestResourceBase):
# test plugin injection within Field value
# + test plugin on a RestResourceBase field
info: Info_get = Field(
info: Info_get = RestField(
default=Info_get(version="0.0.1", api_version="0.0.2"),
plugin=ResourcePlugin_Info,
)
info_put: Info_put = Field(
info_put: Info_put = RestField(
default=Info_put(version="0.0.1", api_version="0.0.2"),
)
info2: Info_get = Field(default=Info_get(version="0.0.2", api_version="0.0.3"))
info2: Info_get = RestField(default=Info_get(version="0.0.2", api_version="0.0.3"))
# this add the classes to globals to allow using them later on
# => this is only for uinit-testing purpose and is not needed in real use
@@ -75,11 +75,11 @@ def init_bad_plugin1():
...
class TestResource(RestResourceBase):
tetvaluestr: Annotated[str, Field(plugin=ResourcePlugin_TestResource)]
tetvaluestr: Annotated[str, RestField(plugin=ResourcePlugin_TestResource)]
@register_rest_rootpoint
class RootApp2(RestResourceBase):
test: TestResource = Field(default=TestResource(tetvaluestr="testvalue"))
test: TestResource = RestField(default=TestResource(tetvaluestr="testvalue"))
RootApp2()

View File

@@ -0,0 +1,193 @@
from __future__ import annotations
import unittest
from os import chdir
from pathlib import Path
from typing import Optional
from src.pyrestresource import (
RestField,
register_rest_rootpoint,
RestResourceBase,
rsrc_verb,
RestRequestParams_GET,
RestRequestParams_POST,
RestRequestParams_Dict_GET,
RestRequestParams_PUT,
RestRequestParams_Dict_elem_GET,
RestRequestParams_Dict_elem_PUT,
RestRequestParams_Dict_DELETE,
RestRequestParams_Dict_POST,
T_SupportedRESTFields,
ResourcePlugin_dict_default,
)
testdir_path = Path(__file__).parent.resolve()
chdir(testdir_path.parent.resolve())
# to allow mock-ing, all the tested classes are in a function
def init_classes():
class Test_Record(RestResourceBase):
test_str: str
test_int: int
class ResourcePlugin_dict_Test_Record(ResourcePlugin_dict_default[str, Test_Record]):
static_Test_Record_active: bool = True
static_Test_Record = Test_Record(test_str="mytest", test_int=84)
def handle_dict_get_keys(
self,
resource_dict: dict[str, Test_Record],
params: RestRequestParams_Dict_GET,
) -> list[str]:
result = super().handle_dict_get_keys(resource_dict, params)
if self.static_Test_Record_active is True:
result.append("static_elem")
return result
def handle_dict_elem_get(
self,
resource_dict: dict[str, Test_Record],
key: str,
params: RestRequestParams_Dict_elem_GET,
) -> Test_Record:
if key == "static_elem":
if self.static_Test_Record_active is True:
return self.static_Test_Record
else:
raise RuntimeError("Key Not Found")
return super().handle_dict_elem_get(resource_dict, key, params)
def handle_dict_delete(
self,
resource_dict: dict[str, Test_Record],
key: str,
params: RestRequestParams_Dict_DELETE[str],
) -> None:
if key == "static_elem":
self.static_Test_Record_active = False
else:
super().handle_dict_delete(resource_dict, key, params)
def handle_dict_delete_all(
self,
resource_dict: dict[str, Test_Record],
params: RestRequestParams_Dict_DELETE[str],
) -> None:
self.static_Test_Record_active = False
super().handle_dict_delete_all(resource_dict, params)
def handle_dict_elem_put(
self,
resource_dict: dict[str, Test_Record],
key: str,
resource: Test_Record,
params: RestRequestParams_Dict_elem_PUT,
) -> None:
if key == "static_elem":
if self.static_Test_Record_active is True:
self.static_Test_Record = resource
else:
raise RuntimeError("Key Not Found")
else:
super().handle_dict_elem_put(resource_dict, key, resource, params)
def handle_dict_post(
self,
resource_dict: dict[str, Test_Record],
key: str,
resource: Test_Record,
params: RestRequestParams_Dict_POST[str],
) -> Optional[str]:
resource.test_int = resource.test_int + 1
return super().handle_dict_post(resource_dict, key, resource, params)
@register_rest_rootpoint
class RootApp(RestResourceBase):
str_dict_Test_Record: dict[str, Test_Record] = RestField(
default={"test": Test_Record(test_str="Hi", test_int=42)},
plugin=ResourcePlugin_dict_Test_Record,
)
# this add the classes to globals to allow using them later on
# => this is only for uinit-testing purpose and is not needed in real use
globals()[Test_Record.__name__] = Test_Record
globals()[RootApp.__name__] = RootApp
class Test_RestAPI_Plugin_Dict(unittest.TestCase):
def setUp(self) -> None:
chdir(testdir_path.parent.resolve())
init_classes()
self.testapp = RootApp()
def test_get_root(self):
result = self.testapp.process_request("/", rsrc_verb.GET)
self.assertEqual(result.get_result(), "{}")
# print(result.get_result())
def test_get_dict_keys(self):
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
self.assertEqual(result.get_result(), '["test", "static_elem"]')
def test_get_dict_elems(self):
result = self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.GET)
self.assertEqual(result.get_result(), '{"test_str": "Hi", "test_int": 42}')
result = self.testapp.process_request("/str_dict_Test_Record/static_elem", rsrc_verb.GET)
self.assertEqual(result.get_result(), '{"test_str": "mytest", "test_int": 84}')
def test_delete_dict_elems(self):
self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.DELETE)
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
self.assertEqual(result.get_result(), '["static_elem"]')
self.testapp.process_request("/str_dict_Test_Record/static_elem", rsrc_verb.DELETE)
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
self.assertEqual(result.get_result(), "[]")
def test_delete_all_dict_elems(self):
self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.DELETE)
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
self.assertEqual(result.get_result(), '["static_elem"]')
self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.DELETE)
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
self.assertEqual(result.get_result(), "[]")
def test_delete_dict_elems_API_key(self):
self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.DELETE)
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
self.assertEqual(result.get_result(), '["static_elem"]')
self.testapp.process_request("/str_dict_Test_Record?API_key=static_elem", rsrc_verb.DELETE)
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
self.assertEqual(result.get_result(), "[]")
def test_put_dict_elem(self):
self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.PUT, '{"test_str": "Hi", "test_int": 43}')
result = self.testapp.process_request("/str_dict_Test_Record/test", rsrc_verb.GET)
self.assertEqual(result.get_result(), '{"test_str": "Hi", "test_int": 43}')
self.testapp.process_request("/str_dict_Test_Record/static_elem", rsrc_verb.PUT, '{"test_str": "Hi you", "test_int": 7}')
result = self.testapp.process_request("/str_dict_Test_Record/static_elem", rsrc_verb.GET)
self.assertEqual(result.get_result(), '{"test_str": "Hi you", "test_int": 7}')
def test_post_dict_elem(self):
result = self.testapp.process_request("/str_dict_Test_Record?API_key=newval", rsrc_verb.POST, '{"test_str": "Hi2", "test_int": 77}')
self.assertEqual(result.get_result(), '"newval"')
result = self.testapp.process_request("/str_dict_Test_Record/newval", rsrc_verb.GET)
self.assertEqual(result.get_result(), '{"test_str": "Hi2", "test_int": 78}')
result = self.testapp.process_request("/str_dict_Test_Record", rsrc_verb.GET)
self.assertEqual(result.get_result(), '["test", "newval", "static_elem"]')

View File

@@ -1,18 +1,13 @@
from __future__ import annotations
import unittest
from typing import Optional
from os import chdir
from pathlib import Path
from pydantic import Field
from io import StringIO
from contextlib import redirect_stdout
print(__name__)
print(__package__)
from src.pyrestresource import (
RestField,
RestResourceBase,
)
@@ -80,7 +75,7 @@ def init_classes():
last_name: str
class RootApp(RestResourceBase):
info: Info = Field(default=Info(version="0.0.1", api_version="0.0.2"))
info: Info = RestField(default=Info(version="0.0.1", api_version="0.0.2"))
info2: Info = Info(version="0.0.2", api_version="0.0.3")
peoples: dict[str, People] = {
"john": People(last_name="Doe"),

View File

@@ -1,17 +1,11 @@
from __future__ import annotations
import unittest
from typing import Optional
from os import chdir
from pathlib import Path
from pydantic import Field
print(__name__)
print(__package__)
from src.pyrestresource import (
RestField,
RestResourceBase,
)
@@ -80,7 +74,7 @@ def init_classes():
last_name: str
class RootApp(RestResourceBase):
info: Info = Field(default=Info(version="0.0.1", api_version="0.0.2"))
info: Info = RestField(default=Info(version="0.0.1", api_version="0.0.2"))
info2: Info = Info(version="0.0.2", api_version="0.0.3")
peoples: dict[str, People] = {
"john": People(last_name="Doe"),

View File

@@ -1,24 +1,19 @@
from __future__ import annotations
import unittest
from unittest.mock import patch
from os import chdir
from pathlib import Path
from typing import Optional
from pydantic import Field
from uuid import UUID, uuid4
from time import time, sleep
import json
import uvicorn
import socket
import requests
from contextlib import closing
from multiprocessing import Process
from requests.adapters import HTTPAdapter
print(__name__)
print(__package__)
import requests
from requests.adapters import HTTPAdapter
import uvicorn
from src.pyrestresource import (
RestField,
register_rest_rootpoint,
RestResourceBase,
rsrc_verb,
@@ -27,7 +22,8 @@ from src.pyrestresource import (
RestRequestParams_Dict_GET,
T_SupportedRESTFields,
)
from pprint import pprint
from test import ThreadedUvicorn
testdir_path = Path(__file__).parent.resolve()
chdir(testdir_path.parent.resolve())
@@ -40,19 +36,19 @@ def init_classes():
api_version: str
class Patch(RestResourceBase):
uuid: UUID = Field(default_factory=uuid4, primary_key=True)
uuid: UUID = RestField(default_factory=uuid4, primary_key=True)
shortname: str
name: Optional[str] = None
description: Optional[str] = None
class Profile(RestResourceBase):
uuid: UUID = Field(default_factory=uuid4, primary_key=True)
uuid: UUID = RestField(default_factory=uuid4, primary_key=True)
shortname: str
name: Optional[str] = None
description: Optional[str] = None
class Game(RestResourceBase):
uuid: UUID = Field(default_factory=uuid4, primary_key=True)
uuid: UUID = RestField(default_factory=uuid4, primary_key=True)
shortname: str
name: Optional[str] = None
description: Optional[str] = None
@@ -63,9 +59,9 @@ def init_classes():
Patch_2 = Patch(uuid="d385a1d2-65fa-11ee-8c99-0242ac120002", shortname="testPatch2")
class User(RestResourceBase):
uuid: UUID = Field(default_factory=uuid4, primary_key=True)
uuid: UUID = RestField(default_factory=uuid4, primary_key=True)
name: str
secret: str = Field(..., exclude=True)
secret: str = RestField(..., exclude=True)
User1 = User(
uuid="8da57a3c-661f-11ee-8c99-0242ac120002",
@@ -74,7 +70,7 @@ def init_classes():
)
class Patch2(RestResourceBase):
uuid: UUID = Field(default_factory=uuid4, primary_key=True)
uuid: UUID = RestField(default_factory=uuid4, primary_key=True)
shortname: str
name: Optional[str] = None
description: Optional[str] = None
@@ -120,25 +116,16 @@ def find_free_port():
return "localhost", s.getsockname()[1]
def launch_server(ip, port):
init_classes()
uvicorn.run(f"{__loader__.name}:RootApp", port=port, host="0.0.0.0", log_level="warning", factory=True)
class Test_RestAPI_WebServer(unittest.TestCase):
def setUp(self) -> None:
chdir(testdir_path.parent.resolve())
def test_nomal_AllCmd_games(self):
ip, port = find_free_port()
proc = Process(
target=launch_server,
args=(
ip,
port,
),
)
proc.start()
init_classes()
server = ThreadedUvicorn(uvicorn.Config(f"{__loader__.name}:RootApp", port=port, host="0.0.0.0", log_level="warning", factory=True))
server.start()
sleep(1)
s = requests.Session()
s.mount("http://", HTTPAdapter(max_retries=0))
@@ -272,23 +259,19 @@ class Test_RestAPI_WebServer(unittest.TestCase):
data = response.json()
self.assertTrue(len(data) == 0)
finally:
proc.terminate()
s.close()
server.stop()
# @unittest.skip
@unittest.skip
def test_perf_dict(self):
print(f"SOCKET PERF TEST")
n_loop = 10000
ip, port = find_free_port()
proc = Process(
target=launch_server,
args=(
ip,
port,
),
)
proc.start()
init_classes()
server = ThreadedUvicorn(uvicorn.Config(f"{__loader__.name}:RootApp", port=port, host="0.0.0.0", log_level="warning", factory=True))
server.start()
sleep(1)
s = requests.Session()
s.mount("http://", HTTPAdapter(max_retries=0))
@@ -366,5 +349,5 @@ class Test_RestAPI_WebServer(unittest.TestCase):
print(f"PUT/GET 2nd level (value) dict: {int(n_loop/(end-start))} Req/s")
finally:
proc.terminate()
s.close()
server.stop()