Compare commits

..

3 Commits

Author SHA1 Message Date
cclecle
85a2ca2753 work 2025-10-12 22:45:56 +02:00
chacha
df50632458 work 2025-09-29 21:14:24 +02:00
cclecle
25d5339946 work 2025-09-28 21:18:35 +02:00
2 changed files with 25 additions and 322 deletions

View File

@@ -1,7 +1,22 @@
"""library's internal tools"""
from collections import ChainMap
from typing import Any, Annotated, get_origin, get_args, Union, Self, Optional, List, Dict, Tuple, Set, FrozenSet, Mapping, Callable
from typing import (
Any,
Annotated,
get_origin,
get_args,
Union,
Self,
Optional,
List,
Dict,
Tuple,
Set,
FrozenSet,
Mapping,
Callable,
)
import typing
from dataclasses import dataclass
from types import UnionType, NoneType
@@ -58,322 +73,3 @@ def _resolve_annotation(ann):
# Safe eval against a **whitelist** only
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS) # pylint: disable=eval-used
return ann
class AnnotationWalkerCtx:
def __init__(
self,
origin: Any,
args: Any,
layer: int,
parent: Optional[Self] = None,
allowed_types: set[type, ...] = frozenset(),
allowed_annotations: dict[str, Any] = {},
):
self.__origin = origin
self.args = args
self.__layer = layer
self.__parent = parent
self.__allowed_types: set[type, ...] = allowed_types
self.__allowed_annotations: dict[str, Any] = allowed_annotations
self.__ext: dict[Any, ChainMap] = {} # per-trigger namespaces (lazy)
@property
def origin(self) -> Any:
return self.__origin
@property
def layer(self) -> int:
return self.__layer
@property
def parent(self) -> Self:
return self.__parent
@property
def allowed_types(self) -> FrozenSet[type]:
return self.__allowed_types
@property
def allowed_annotations(self) -> Mapping[str, Any]:
return self.__allowed_annotations
def ns(self, owner: Any) -> ChainMap:
"""
A per-trigger overlay namespace that inherits from parent ctx.
Use as: bag = ctx.ns(self); bag['whatever'] = ...
Lookups fall back to parent's bag automatically.
"""
if owner in self.__ext:
return self.__ext[owner]
parent_map = self.__parent.__ext.get(owner) if (self.__parent and hasattr(self.__parent, "_AnnotationWalkerCtx__ext")) else {}
cm = ChainMap({}, parent_map if isinstance(parent_map, ChainMap) else dict(parent_map))
self.__ext[owner] = cm
return cm
@dataclass(frozen=True)
class TriggerResult:
# If provided, children won't be walked and this value is returned.
replace_with: Any | None = None
# If true, skip walking children but don't replace current node value.
skip_children: bool = False
# If provided, walker will restart processing with the given value
restart_with: Any | None = None # NEW
@staticmethod
def passthrough() -> Self:
return TriggerResult()
@staticmethod
def replace(value: Any) -> Self:
return TriggerResult(replace_with=value, skip_children=True)
@staticmethod
def skip() -> Self:
return TriggerResult(skip_children=True)
@staticmethod
def restart(value: Any) -> Self:
print("Doo!")
return TriggerResult(restart_with=value)
class AnnotationTrigger:
def init_trigger(self) -> None:
pass
def process_annotated(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_union(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_dict(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_tuple(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_list(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_set(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_unknown(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_allowed(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
class LAMSchemaValidation(AnnotationTrigger):
def init_trigger(self) -> None:
print(f"Initializing {self.__class__.__name__}")
def process_annotated(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_annotated")
print(ctx.origin)
print(ctx.args)
if len(ctx.args) != 2:
raise UnsupportedFieldType("Annotated[T,x] requires 2 parameters")
if ctx.parent is not None:
raise UnsupportedFieldType("Annotated[T,x] is only supported as parent annotation")
return None
def process_union(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_union")
print(ctx.args)
if (len(ctx.args) != 2) or (type(None) not in list(ctx.args)):
raise UnsupportedFieldType("Union[] is only supported to implement Optional[] (takes 2 parameters, including None)")
return None
def process_dict(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_dict")
if len(ctx.args) != 2:
raise IncompletelyAnnotatedField(f"Dict Annotation requires 2 inner definitions: {ctx.origin}")
if not ctx.args[0] in ctx.allowed_types:
raise IncompletelyAnnotatedField(f"Dict Key must be simple builtin: {ctx.origin}")
return None
def process_tuple(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_tuple")
if len(ctx.args) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {ctx.origin}")
return None
def process_list(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_list")
if len(ctx.args) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {ctx.origin}")
return None
def process_set(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_set")
if len(ctx.args) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {ctx.origin}")
return None
def process_allowed(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_allowed")
if ctx.origin is type(None) or ctx.origin is None:
if ctx.parent is None or not (ctx.parent.origin is Union or ctx.parent.origin is UnionType):
raise IncompletelyAnnotatedField(f"None is only accepted with Union, to implement Optional[]")
return None
class AnnotationWalker:
DEFAULT_ALLOWED_TYPES = frozenset({str, int, float, complex, bool, bytes, NoneType})
DEFAULT_ALLOWED_ANNOTATIONS: dict[str, Any] = frozendict(
{
"Union": Union,
"Optional": Optional,
"List": List,
"Dict": Dict,
"Tuple": Tuple,
"Set": Set,
"FrozenSet": FrozenSet,
"Annotated": Annotated,
# builtins:
"int": int,
"str": str,
"float": float,
"bool": bool,
"complex": complex,
"bytes": bytes,
"None": type(None),
"list": list,
"dict": dict,
"set": set,
"frozenset": frozenset,
"tuple": tuple,
}
)
def __init__(self, ann: Any, triggers: tuple[AnnotationTrigger, ...], **kwargs):
if not triggers:
raise RuntimeError("AnnotationWalker requires trigger(s)")
# Normalize triggers into instances
insts: list[AnnotationTrigger] = []
for t in triggers if isinstance(triggers, tuple) else (triggers,):
if isinstance(t, AnnotationTrigger):
insts.append(t)
elif isinstance(t, type) and issubclass(t, AnnotationTrigger):
insts.append(t())
else:
raise RuntimeError(f"Unsupported trigger: {t}")
self._triggers = tuple(insts)
# Allowed types / annotations
atypes = set(type(self).DEFAULT_ALLOWED_TYPES)
if "ex_allowed_types" in kwargs:
atypes.update(kwargs["ex_allowed_types"])
self._allowed_types = frozenset(atypes)
annots = dict(type(self).DEFAULT_ALLOWED_ANNOTATIONS)
if "ex_allowed_annotations" in kwargs:
annots.update(kwargs["ex_allowed_annotations"])
self._allowed_annotations = frozendict(annots)
# Annotation can be string
self.__ann = ann
if isinstance(ann, str):
self.__ann = eval(ann, {"__builtins__": {}}, self._allowed_annotations)
def run(self) -> TriggerResult:
for trigger in self._triggers:
trigger.init_trigger()
return self._walk(self.__ann, None)
# --- Helpers ---
def _new_ctx(self, origin, args, layer, parent):
return AnnotationWalkerCtx(origin, args, layer, parent, self._allowed_types, self._allowed_annotations)
def _apply_triggers(self, method: str, ctx: AnnotationWalkerCtx) -> TriggerResult:
final = TriggerResult.passthrough()
for trig in self._triggers:
res = getattr(trig, method)(ctx)
if not res:
continue
if res.restart_with is not None:
return res # short-circuit on restart
if res.replace_with is not None:
final = TriggerResult.replace(res.replace_with)
if res.skip_children:
final = TriggerResult(
replace_with=final.replace_with,
skip_children=True,
)
return final
def _handle_with_triggers(
self,
trigger_name: str,
ctx: AnnotationWalkerCtx,
args_handler: Callable[[AnnotationWalkerCtx], Any] | None = None,
) -> Any:
"""Generic handler: run triggers, maybe recurse into args with a custom handler."""
res = self._apply_triggers(trigger_name, ctx)
if res.restart_with is not None:
return self._walk(res.restart_with, ctx.parent)
if res.replace_with is not None:
return res.replace_with
if not res.skip_children:
if args_handler:
return args_handler(ctx)
return tuple(self._walk(a, ctx) for a in ctx.args)
return None
def _walk_args_tuple(self, ctx: AnnotationWalkerCtx):
# special Ellipsis case for Tuple
if len(ctx.args) == 2 and ctx.args[1] is Ellipsis:
return (self._walk(ctx.args[0], ctx), Ellipsis)
return tuple(self._walk(a, ctx) for a in ctx.args)
# --- Dispatcher ---
def _walk(self, type_: Any, parent_ctx: Optional[AnnotationWalkerCtx]) -> Any:
print(f"[{parent_ctx.layer if parent_ctx else 0}] walking through: {type_}")
origin = get_origin(type_) or type_
if origin is None:
origin = NoneType
if origin is Union:
origin = UnionType
if not isinstance(origin, type):
raise RuntimeError("Annotation must be using type(s), not instances")
args = get_args(type_)
layer = 0 if parent_ctx is None else parent_ctx.layer + 1
ctx = self._new_ctx(origin, args, layer, parent_ctx)
print(origin)
match origin:
case typing.Annotated:
return self._handle_with_triggers(
"process_annotated", ctx, args_handler=lambda c: self._walk(c.args[0], c) if c.args else None
)
case types.UnionType:
return self._handle_with_triggers("process_union", ctx)
case _ if issubclass(origin, dict):
return self._handle_with_triggers("process_dict", ctx)
case _ if issubclass(origin, tuple):
return self._handle_with_triggers("process_tuple", ctx, self._walk_args_tuple)
case _ if issubclass(origin, list):
return self._handle_with_triggers("process_list", ctx)
case _ if issubclass(origin, set):
return self._handle_with_triggers("process_set", ctx)
case _ if origin in self._allowed_types:
return self._handle_with_triggers("process_allowed", ctx)
case _:
res = self._apply_triggers("process_unknown", ctx)
if res.restart_with is not None:
return self._walk(res.restart_with, ctx.parent)
if res.replace_with is not None:
return res.replace_with
raise UnsupportedFieldType(f"Not supported Field: {ctx.origin}, " f"Supported list: {self._allowed_types}")

View File

@@ -22,11 +22,18 @@ testdir_path = Path(__file__).parent.resolve()
chdir(testdir_path.parent.resolve())
class ElementTest(unittest.TestCase):
class AnnotationsWalkerTest(unittest.TestCase):
def setUp(self):
print("\n->", unittest.TestCase.id(self))
def test_element_simple(self):
def test_validate(self):
ann = dict[int, list[int]] | dict[int, list[int | str]]
val = {1: [2], 2: ["a", [1]]}
res = dm.tools.AnnotationWalker(ann, (dm.tools.HorizontalValidationTrigger(val),))
res.run()
def test_simple(self):
print(isinstance(None, type(None)))
print("\n== From OBJs ==")
res = dm.tools.AnnotationWalker(Annotated[Optional[dict[int, list[str]]], "comment"], (dm.tools.LAMSchemaValidation(),))