Compare commits

...

1 Commits

Author SHA1 Message Date
cclecle
6efd914de1 non finished work 2025-09-28 17:33:34 +02:00
5 changed files with 398 additions and 33 deletions

View File

@@ -2,14 +2,14 @@ from typing import Any, Union, Optional, List, Dict, Tuple, Set, FrozenSet, Anno
from types import SimpleNamespace
import math
ALLOWED_MODEL_FIELDS_TYPES: tuple[type[Any], ...] = (
ALLOWED_MODEL_FIELDS_TYPES: set[type[Any], ...] = {
str,
int,
float,
complex,
bool,
bytes,
)
}
ALLOWED_ANNOTATIONS: dict[str, Any] = {
"Union": Union,

View File

@@ -1,4 +1,4 @@
from typing import Generic, TypeVar, Optional, Any, Self
from typing import Generic, TypeVar, Optional, Any, Self, Annotated, get_origin, get_args
from typeguard import check_type, CollectionCheckStrategy, TypeCheckError
from copy import deepcopy
from .lam_field_info import LAMFieldInfo
@@ -13,19 +13,29 @@ TV_LABField = TypeVar("TV_LABField")
class LAMField(Generic[TV_LABField]):
"""This class describe a Field in Schema"""
def __init__(self, name: str, val: Optional[TV_LABField], a: Any, i: LAMFieldInfo):
def __init__(self, name: str, val: Optional[TV_LABField], ann: Any, i: LAMFieldInfo):
self._default_value: Optional[TV_LABField]
self._value: Optional[TV_LABField]
self.__annotations: Any
self.__name: str = name
self.__source: Optional[type] = None
self.__info: LAMFieldInfo = deepcopy(i)
self.__annotations: Any = LAMdeepfreeze(a)
self._set_annotations(ann)
self.__frozen: bool = False
self._frozen_value: Any = None
self.__frozen_value_set: True = False
self.validate(val)
self._init_value(val)
def _set_annotations(self, ann: Any) -> None:
_origin = get_origin(ann) or ann
_args = get_args(ann)
if _origin is Annotated:
self.__annotations: Any = LAMdeepfreeze(_args[0])
else:
self.__annotations: Any = LAMdeepfreeze(ann)
def _init_value(self, val: Optional[TV_LABField | FreezableElement]):
self._default_value: Optional[TV_LABField] = deepcopy(val)
self._value: Optional[TV_LABField] = val

View File

@@ -177,7 +177,7 @@ class IAppliance(IBaseElement): ...
def _check_annotation_definition(_type, first_layer: bool = True): # pylint: disable=too-complex,too-many-return-statements
# print(f"_type={_type}, {first_layer}")
print(f"_type={_type}, {first_layer}")
_origin = get_origin(_type) or _type
_args = get_args(_type)
@@ -194,7 +194,7 @@ def _check_annotation_definition(_type, first_layer: bool = True): # pylint: di
raise UnsupportedFieldType(
"Union[] is only supported to implement Optional (takes 2 parameters) and is only supported as parent annotation"
)
return all(_check_annotation_definition(_, False) for _ in get_args(_type) if _ is not type(None))
return any(_check_annotation_definition(_, False) for _ in get_args(_type) if _ is not type(None))
# handle Dict[...]
if _origin is dict:
@@ -202,7 +202,8 @@ def _check_annotation_definition(_type, first_layer: bool = True): # pylint: di
raise IncompletelyAnnotatedField(f"Dict Annotation requires 2 inner definitions: {_type}")
if not _args[0] in ALLOWED_MODEL_FIELDS_TYPES:
raise IncompletelyAnnotatedField(f"Dict Key must be simple builtin: {_type}")
return _check_annotation_definition(_args[1], False)
# return _check_annotation_definition(_args[1], False)
return any(_check_annotation_definition(_, False) for _ in _args)
# handle Tuple[]
if _origin is tuple:
@@ -210,13 +211,13 @@ def _check_annotation_definition(_type, first_layer: bool = True): # pylint: di
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
if len(_args) == 2 and _args[1] is Ellipsis:
return _check_annotation_definition(_args[0], False)
return all(_check_annotation_definition(_, False) for _ in _args)
return any(_check_annotation_definition(_, False) for _ in _args)
# handle Set[],Tuple[],FrozenSet[],List[]
if _origin in [set, frozenset, tuple, list]:
if len(_args) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
return all(_check_annotation_definition(_, False) for _ in _args)
return any(_check_annotation_definition(_, False) for _ in _args)
if isinstance(_origin, type):
if issubclass(_origin, IElement):
@@ -224,9 +225,9 @@ def _check_annotation_definition(_type, first_layer: bool = True): # pylint: di
elif issubclass(_origin, IAppliance):
raise UnsupportedFieldType(f"Nested Appliance are not supported: {_type}")
if _type in ALLOWED_MODEL_FIELDS_TYPES:
if _origin in ALLOWED_MODEL_FIELDS_TYPES:
return
raise UnsupportedFieldType(_type)
raise UnsupportedFieldType(_origin)
def _check_initializer_safety(func) -> None:

View File

@@ -1,16 +1,21 @@
"""library's internal tools"""
from typing import Any, get_origin, get_args
from collections import ChainMap
from typing import Any, Annotated, get_origin, get_args, Union, Self, Optional, List, Dict, Tuple, Set, FrozenSet, Mapping, Callable
import typing
from dataclasses import dataclass
from types import UnionType, NoneType
import types
from uuid import UUID
from datetime import datetime
import json
import inspect
from frozendict import deepfreeze
from .defines import (
ALLOWED_ANNOTATIONS,
)
from frozendict import deepfreeze, frozendict
from .defines import ALLOWED_ANNOTATIONS, ALLOWED_MODEL_FIELDS_TYPES
from .exception import IncompletelyAnnotatedField, UnsupportedFieldType
class LAMJSONEncoder(json.JSONEncoder):
@@ -48,24 +53,327 @@ def is_data_attribute(name: str, value: any) -> bool:
return True
"""
def _peel_annotated(t: Any) -> Any:
# If you ever allow Annotated[T, ...], peel to T
while True:
origin = get_origin(t)
if origin is None:
return t
name = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
if "Annotated" in name:
args = get_args(t)
t = args[0] if args else t
else:
return t
"""
def _resolve_annotation(ann):
if isinstance(ann, str):
# Safe eval against a **whitelist** only
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS) # pylint: disable=eval-used
return ann
class AnnotationWalkerCtx:
def __init__(
self,
origin: Any,
args: Any,
layer: int,
parent: Optional[Self] = None,
allowed_types: set[type, ...] = frozenset(),
allowed_annotations: dict[str, Any] = {},
):
self.__origin = origin
self.args = args
self.__layer = layer
self.__parent = parent
self.__allowed_types: set[type, ...] = allowed_types
self.__allowed_annotations: dict[str, Any] = allowed_annotations
self.__ext: dict[Any, ChainMap] = {} # per-trigger namespaces (lazy)
@property
def origin(self) -> Any:
return self.__origin
@property
def layer(self) -> int:
return self.__layer
@property
def parent(self) -> Self:
return self.__parent
@property
def allowed_types(self) -> FrozenSet[type]:
return self.__allowed_types
@property
def allowed_annotations(self) -> Mapping[str, Any]:
return self.__allowed_annotations
def ns(self, owner: Any) -> ChainMap:
"""
A per-trigger overlay namespace that inherits from parent ctx.
Use as: bag = ctx.ns(self); bag['whatever'] = ...
Lookups fall back to parent's bag automatically.
"""
if owner in self.__ext:
return self.__ext[owner]
parent_map = self.__parent.__ext.get(owner) if (self.__parent and hasattr(self.__parent, "_AnnotationWalkerCtx__ext")) else {}
cm = ChainMap({}, parent_map if isinstance(parent_map, ChainMap) else dict(parent_map))
self.__ext[owner] = cm
return cm
@dataclass(frozen=True)
class TriggerResult:
# If provided, children won't be walked and this value is returned.
replace_with: Any | None = None
# If true, skip walking children but don't replace current node value.
skip_children: bool = False
# If provided, walker will restart processing with the given value
restart_with: Any | None = None # NEW
@staticmethod
def passthrough() -> Self:
return TriggerResult()
@staticmethod
def replace(value: Any) -> Self:
return TriggerResult(replace_with=value, skip_children=True)
@staticmethod
def skip() -> Self:
return TriggerResult(skip_children=True)
@staticmethod
def restart(value: Any) -> Self:
print("Doo!")
return TriggerResult(restart_with=value)
class AnnotationTrigger:
def init_trigger(self) -> None:
pass
def process_annotated(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_union(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_dict(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_tuple(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_list(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_set(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_unknown(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_allowed(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
class LAMSchemaValidation(AnnotationTrigger):
def init_trigger(self) -> None:
print(f"Initializing {self.__class__.__name__}")
def process_annotated(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_annotated")
print(ctx.origin)
print(ctx.args)
if len(ctx.args) != 2:
raise UnsupportedFieldType("Annotated[T,x] requires 2 parameters")
if ctx.parent is not None:
raise UnsupportedFieldType("Annotated[T,x] is only supported as parent annotation")
return None
def process_union(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_union")
print(ctx.args)
if (len(ctx.args) != 2) or (type(None) not in list(ctx.args)):
raise UnsupportedFieldType("Union[] is only supported to implement Optional[] (takes 2 parameters, including None)")
return None
def process_dict(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_dict")
if len(ctx.args) != 2:
raise IncompletelyAnnotatedField(f"Dict Annotation requires 2 inner definitions: {ctx.origin}")
if not ctx.args[0] in ctx.allowed_types:
raise IncompletelyAnnotatedField(f"Dict Key must be simple builtin: {ctx.origin}")
return None
def process_tuple(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_tuple")
if len(ctx.args) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {ctx.origin}")
return None
def process_list(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_list")
if len(ctx.args) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {ctx.origin}")
return None
def process_set(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_set")
if len(ctx.args) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {ctx.origin}")
return None
def process_allowed(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_allowed")
if ctx.origin is type(None) or ctx.origin is None:
if ctx.parent is None or not (ctx.parent.origin is Union or ctx.parent.origin is UnionType):
raise IncompletelyAnnotatedField(f"None is only accepted with Union, to implement Optional[]")
return None
class AnnotationWalker:
DEFAULT_ALLOWED_TYPES = frozenset({str, int, float, complex, bool, bytes, NoneType})
DEFAULT_ALLOWED_ANNOTATIONS: dict[str, Any] = frozendict(
{
"Union": Union,
"Optional": Optional,
"List": List,
"Dict": Dict,
"Tuple": Tuple,
"Set": Set,
"FrozenSet": FrozenSet,
"Annotated": Annotated,
# builtins:
"int": int,
"str": str,
"float": float,
"bool": bool,
"complex": complex,
"bytes": bytes,
"None": type(None),
"list": list,
"dict": dict,
"set": set,
"frozenset": frozenset,
"tuple": tuple,
}
)
def __init__(self, ann: Any, triggers: tuple[AnnotationTrigger, ...], **kwargs):
if not triggers:
raise RuntimeError("AnnotationWalker requires trigger(s)")
# Normalize triggers into instances
insts: list[AnnotationTrigger] = []
for t in triggers if isinstance(triggers, tuple) else (triggers,):
if isinstance(t, AnnotationTrigger):
insts.append(t)
elif isinstance(t, type) and issubclass(t, AnnotationTrigger):
insts.append(t())
else:
raise RuntimeError(f"Unsupported trigger: {t}")
self._triggers = tuple(insts)
# Allowed types / annotations
atypes = set(type(self).DEFAULT_ALLOWED_TYPES)
if "ex_allowed_types" in kwargs:
atypes.update(kwargs["ex_allowed_types"])
self._allowed_types = frozenset(atypes)
annots = dict(type(self).DEFAULT_ALLOWED_ANNOTATIONS)
if "ex_allowed_annotations" in kwargs:
annots.update(kwargs["ex_allowed_annotations"])
self._allowed_annotations = frozendict(annots)
# Annotation can be string
self.__ann = ann
if isinstance(ann, str):
self.__ann = eval(ann, {"__builtins__": {}}, self._allowed_annotations)
def run(self) -> TriggerResult:
for trigger in self._triggers:
trigger.init_trigger()
return self._walk(self.__ann, None)
# --- Helpers ---
def _new_ctx(self, origin, args, layer, parent):
return AnnotationWalkerCtx(origin, args, layer, parent, self._allowed_types, self._allowed_annotations)
def _apply_triggers(self, method: str, ctx: AnnotationWalkerCtx) -> TriggerResult:
final = TriggerResult.passthrough()
for trig in self._triggers:
res = getattr(trig, method)(ctx)
if not res:
continue
if res.restart_with is not None:
return res # short-circuit on restart
if res.replace_with is not None:
final = TriggerResult.replace(res.replace_with)
if res.skip_children:
final = TriggerResult(
replace_with=final.replace_with,
skip_children=True,
)
return final
def _handle_with_triggers(
self,
trigger_name: str,
ctx: AnnotationWalkerCtx,
args_handler: Callable[[AnnotationWalkerCtx], Any] | None = None,
) -> Any:
"""Generic handler: run triggers, maybe recurse into args with a custom handler."""
res = self._apply_triggers(trigger_name, ctx)
if res.restart_with is not None:
return self._walk(res.restart_with, ctx.parent)
if res.replace_with is not None:
return res.replace_with
if not res.skip_children:
if args_handler:
return args_handler(ctx)
return tuple(self._walk(a, ctx) for a in ctx.args)
return None
def _walk_args_tuple(self, ctx: AnnotationWalkerCtx):
# special Ellipsis case for Tuple
if len(ctx.args) == 2 and ctx.args[1] is Ellipsis:
return (self._walk(ctx.args[0], ctx), Ellipsis)
return tuple(self._walk(a, ctx) for a in ctx.args)
# --- Dispatcher ---
def _walk(self, type_: Any, parent_ctx: Optional[AnnotationWalkerCtx]) -> Any:
print(f"[{parent_ctx.layer if parent_ctx else 0}] walking through: {type_}")
origin = get_origin(type_) or type_
if origin is None:
origin = NoneType
if origin is Union:
origin = UnionType
if not isinstance(origin, type):
raise RuntimeError("Annotation must be using type(s), not instances")
args = get_args(type_)
layer = 0 if parent_ctx is None else parent_ctx.layer + 1
ctx = self._new_ctx(origin, args, layer, parent_ctx)
print(origin)
match origin:
case typing.Annotated:
return self._handle_with_triggers(
"process_annotated", ctx, args_handler=lambda c: self._walk(c.args[0], c) if c.args else None
)
case types.UnionType:
return self._handle_with_triggers("process_union", ctx)
case _ if issubclass(origin, dict):
return self._handle_with_triggers("process_dict", ctx)
case _ if issubclass(origin, tuple):
return self._handle_with_triggers("process_tuple", ctx, self._walk_args_tuple)
case _ if issubclass(origin, list):
return self._handle_with_triggers("process_list", ctx)
case _ if issubclass(origin, set):
return self._handle_with_triggers("process_set", ctx)
case _ if origin in self._allowed_types:
return self._handle_with_triggers("process_allowed", ctx)
case _:
res = self._apply_triggers("process_unknown", ctx)
if res.restart_with is not None:
return self._walk(res.restart_with, ctx.parent)
if res.replace_with is not None:
return res.replace_with
raise UnsupportedFieldType(f"Not supported Field: {ctx.origin}, " f"Supported list: {self._allowed_types}")

View File

@@ -0,0 +1,46 @@
# dabmodel (c) by chacha
#
# dabmodel is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
import unittest
from typing import Annotated, Union, Optional
import sys
import subprocess
from os import chdir, environ
from pathlib import Path
print(__name__)
print(__package__)
from src import dabmodel as dm
testdir_path = Path(__file__).parent.resolve()
chdir(testdir_path.parent.resolve())
class ElementTest(unittest.TestCase):
def setUp(self):
print("\n->", unittest.TestCase.id(self))
def test_element_simple(self):
print(isinstance(None, type(None)))
print("\n== From OBJs ==")
res = dm.tools.AnnotationWalker(Annotated[Optional[dict[int, list[str]]], "comment"], (dm.tools.LAMSchemaValidation(),))
print(f"res={res.run()}")
print("\n== From STRING ==")
res = dm.tools.AnnotationWalker('Annotated[Optional[dict[int, list[str]]], "comment"]', (dm.tools.LAMSchemaValidation(),))
print(f"res={res.run()}")
res = dm.tools.AnnotationWalker(Annotated[Optional[dict[int, list[None]]], "comment"], (dm.tools.LAMSchemaValidation(),))
print(f"res={res.run()}")
# ---------- main ----------
if __name__ == "__main__":
unittest.main()