|
|
|
|
@@ -1,7 +1,22 @@
|
|
|
|
|
"""library's internal tools"""
|
|
|
|
|
|
|
|
|
|
from collections import ChainMap
|
|
|
|
|
from typing import Any, Annotated, get_origin, get_args, Union, Self, Optional, List, Dict, Tuple, Set, FrozenSet, Mapping, Callable
|
|
|
|
|
from typing import (
|
|
|
|
|
Any,
|
|
|
|
|
Annotated,
|
|
|
|
|
get_origin,
|
|
|
|
|
get_args,
|
|
|
|
|
Union,
|
|
|
|
|
Self,
|
|
|
|
|
Optional,
|
|
|
|
|
List,
|
|
|
|
|
Dict,
|
|
|
|
|
Tuple,
|
|
|
|
|
Set,
|
|
|
|
|
FrozenSet,
|
|
|
|
|
Mapping,
|
|
|
|
|
Callable,
|
|
|
|
|
)
|
|
|
|
|
import typing
|
|
|
|
|
from dataclasses import dataclass
|
|
|
|
|
from types import UnionType, NoneType
|
|
|
|
|
@@ -58,322 +73,3 @@ def _resolve_annotation(ann):
|
|
|
|
|
# Safe eval against a **whitelist** only
|
|
|
|
|
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS) # pylint: disable=eval-used
|
|
|
|
|
return ann
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AnnotationWalkerCtx:
|
|
|
|
|
def __init__(
|
|
|
|
|
self,
|
|
|
|
|
origin: Any,
|
|
|
|
|
args: Any,
|
|
|
|
|
layer: int,
|
|
|
|
|
parent: Optional[Self] = None,
|
|
|
|
|
allowed_types: set[type, ...] = frozenset(),
|
|
|
|
|
allowed_annotations: dict[str, Any] = {},
|
|
|
|
|
):
|
|
|
|
|
self.__origin = origin
|
|
|
|
|
self.args = args
|
|
|
|
|
self.__layer = layer
|
|
|
|
|
self.__parent = parent
|
|
|
|
|
|
|
|
|
|
self.__allowed_types: set[type, ...] = allowed_types
|
|
|
|
|
self.__allowed_annotations: dict[str, Any] = allowed_annotations
|
|
|
|
|
self.__ext: dict[Any, ChainMap] = {} # per-trigger namespaces (lazy)
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def origin(self) -> Any:
|
|
|
|
|
return self.__origin
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def layer(self) -> int:
|
|
|
|
|
return self.__layer
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def parent(self) -> Self:
|
|
|
|
|
return self.__parent
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def allowed_types(self) -> FrozenSet[type]:
|
|
|
|
|
return self.__allowed_types
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def allowed_annotations(self) -> Mapping[str, Any]:
|
|
|
|
|
return self.__allowed_annotations
|
|
|
|
|
|
|
|
|
|
def ns(self, owner: Any) -> ChainMap:
|
|
|
|
|
"""
|
|
|
|
|
A per-trigger overlay namespace that inherits from parent ctx.
|
|
|
|
|
Use as: bag = ctx.ns(self); bag['whatever'] = ...
|
|
|
|
|
Lookups fall back to parent's bag automatically.
|
|
|
|
|
"""
|
|
|
|
|
if owner in self.__ext:
|
|
|
|
|
return self.__ext[owner]
|
|
|
|
|
parent_map = self.__parent.__ext.get(owner) if (self.__parent and hasattr(self.__parent, "_AnnotationWalkerCtx__ext")) else {}
|
|
|
|
|
cm = ChainMap({}, parent_map if isinstance(parent_map, ChainMap) else dict(parent_map))
|
|
|
|
|
self.__ext[owner] = cm
|
|
|
|
|
return cm
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
|
|
|
class TriggerResult:
|
|
|
|
|
# If provided, children won't be walked and this value is returned.
|
|
|
|
|
replace_with: Any | None = None
|
|
|
|
|
# If true, skip walking children but don't replace current node value.
|
|
|
|
|
skip_children: bool = False
|
|
|
|
|
# If provided, walker will restart processing with the given value
|
|
|
|
|
restart_with: Any | None = None # NEW
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def passthrough() -> Self:
|
|
|
|
|
return TriggerResult()
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def replace(value: Any) -> Self:
|
|
|
|
|
return TriggerResult(replace_with=value, skip_children=True)
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def skip() -> Self:
|
|
|
|
|
return TriggerResult(skip_children=True)
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def restart(value: Any) -> Self:
|
|
|
|
|
print("Doo!")
|
|
|
|
|
return TriggerResult(restart_with=value)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AnnotationTrigger:
|
|
|
|
|
|
|
|
|
|
def init_trigger(self) -> None:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def process_annotated(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def process_union(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def process_dict(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def process_tuple(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def process_list(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def process_set(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def process_unknown(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def process_allowed(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class LAMSchemaValidation(AnnotationTrigger):
|
|
|
|
|
def init_trigger(self) -> None:
|
|
|
|
|
print(f"Initializing {self.__class__.__name__}")
|
|
|
|
|
|
|
|
|
|
def process_annotated(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
|
|
|
|
print("process_annotated")
|
|
|
|
|
print(ctx.origin)
|
|
|
|
|
print(ctx.args)
|
|
|
|
|
if len(ctx.args) != 2:
|
|
|
|
|
raise UnsupportedFieldType("Annotated[T,x] requires 2 parameters")
|
|
|
|
|
if ctx.parent is not None:
|
|
|
|
|
raise UnsupportedFieldType("Annotated[T,x] is only supported as parent annotation")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def process_union(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
|
|
|
|
print("process_union")
|
|
|
|
|
print(ctx.args)
|
|
|
|
|
if (len(ctx.args) != 2) or (type(None) not in list(ctx.args)):
|
|
|
|
|
raise UnsupportedFieldType("Union[] is only supported to implement Optional[] (takes 2 parameters, including None)")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def process_dict(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
|
|
|
|
print("process_dict")
|
|
|
|
|
if len(ctx.args) != 2:
|
|
|
|
|
raise IncompletelyAnnotatedField(f"Dict Annotation requires 2 inner definitions: {ctx.origin}")
|
|
|
|
|
if not ctx.args[0] in ctx.allowed_types:
|
|
|
|
|
raise IncompletelyAnnotatedField(f"Dict Key must be simple builtin: {ctx.origin}")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def process_tuple(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
|
|
|
|
print("process_tuple")
|
|
|
|
|
if len(ctx.args) == 0:
|
|
|
|
|
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {ctx.origin}")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def process_list(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
|
|
|
|
print("process_list")
|
|
|
|
|
if len(ctx.args) == 0:
|
|
|
|
|
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {ctx.origin}")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def process_set(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
|
|
|
|
print("process_set")
|
|
|
|
|
if len(ctx.args) == 0:
|
|
|
|
|
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {ctx.origin}")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def process_allowed(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
|
|
|
|
print("process_allowed")
|
|
|
|
|
if ctx.origin is type(None) or ctx.origin is None:
|
|
|
|
|
if ctx.parent is None or not (ctx.parent.origin is Union or ctx.parent.origin is UnionType):
|
|
|
|
|
raise IncompletelyAnnotatedField(f"None is only accepted with Union, to implement Optional[]")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AnnotationWalker:
|
|
|
|
|
DEFAULT_ALLOWED_TYPES = frozenset({str, int, float, complex, bool, bytes, NoneType})
|
|
|
|
|
DEFAULT_ALLOWED_ANNOTATIONS: dict[str, Any] = frozendict(
|
|
|
|
|
{
|
|
|
|
|
"Union": Union,
|
|
|
|
|
"Optional": Optional,
|
|
|
|
|
"List": List,
|
|
|
|
|
"Dict": Dict,
|
|
|
|
|
"Tuple": Tuple,
|
|
|
|
|
"Set": Set,
|
|
|
|
|
"FrozenSet": FrozenSet,
|
|
|
|
|
"Annotated": Annotated,
|
|
|
|
|
# builtins:
|
|
|
|
|
"int": int,
|
|
|
|
|
"str": str,
|
|
|
|
|
"float": float,
|
|
|
|
|
"bool": bool,
|
|
|
|
|
"complex": complex,
|
|
|
|
|
"bytes": bytes,
|
|
|
|
|
"None": type(None),
|
|
|
|
|
"list": list,
|
|
|
|
|
"dict": dict,
|
|
|
|
|
"set": set,
|
|
|
|
|
"frozenset": frozenset,
|
|
|
|
|
"tuple": tuple,
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def __init__(self, ann: Any, triggers: tuple[AnnotationTrigger, ...], **kwargs):
|
|
|
|
|
if not triggers:
|
|
|
|
|
raise RuntimeError("AnnotationWalker requires trigger(s)")
|
|
|
|
|
|
|
|
|
|
# Normalize triggers into instances
|
|
|
|
|
insts: list[AnnotationTrigger] = []
|
|
|
|
|
for t in triggers if isinstance(triggers, tuple) else (triggers,):
|
|
|
|
|
if isinstance(t, AnnotationTrigger):
|
|
|
|
|
insts.append(t)
|
|
|
|
|
elif isinstance(t, type) and issubclass(t, AnnotationTrigger):
|
|
|
|
|
insts.append(t())
|
|
|
|
|
else:
|
|
|
|
|
raise RuntimeError(f"Unsupported trigger: {t}")
|
|
|
|
|
self._triggers = tuple(insts)
|
|
|
|
|
|
|
|
|
|
# Allowed types / annotations
|
|
|
|
|
atypes = set(type(self).DEFAULT_ALLOWED_TYPES)
|
|
|
|
|
if "ex_allowed_types" in kwargs:
|
|
|
|
|
atypes.update(kwargs["ex_allowed_types"])
|
|
|
|
|
self._allowed_types = frozenset(atypes)
|
|
|
|
|
|
|
|
|
|
annots = dict(type(self).DEFAULT_ALLOWED_ANNOTATIONS)
|
|
|
|
|
if "ex_allowed_annotations" in kwargs:
|
|
|
|
|
annots.update(kwargs["ex_allowed_annotations"])
|
|
|
|
|
self._allowed_annotations = frozendict(annots)
|
|
|
|
|
|
|
|
|
|
# Annotation can be string
|
|
|
|
|
self.__ann = ann
|
|
|
|
|
if isinstance(ann, str):
|
|
|
|
|
self.__ann = eval(ann, {"__builtins__": {}}, self._allowed_annotations)
|
|
|
|
|
|
|
|
|
|
def run(self) -> TriggerResult:
|
|
|
|
|
for trigger in self._triggers:
|
|
|
|
|
trigger.init_trigger()
|
|
|
|
|
return self._walk(self.__ann, None)
|
|
|
|
|
|
|
|
|
|
# --- Helpers ---
|
|
|
|
|
|
|
|
|
|
def _new_ctx(self, origin, args, layer, parent):
|
|
|
|
|
return AnnotationWalkerCtx(origin, args, layer, parent, self._allowed_types, self._allowed_annotations)
|
|
|
|
|
|
|
|
|
|
def _apply_triggers(self, method: str, ctx: AnnotationWalkerCtx) -> TriggerResult:
|
|
|
|
|
final = TriggerResult.passthrough()
|
|
|
|
|
for trig in self._triggers:
|
|
|
|
|
res = getattr(trig, method)(ctx)
|
|
|
|
|
if not res:
|
|
|
|
|
continue
|
|
|
|
|
if res.restart_with is not None:
|
|
|
|
|
return res # short-circuit on restart
|
|
|
|
|
if res.replace_with is not None:
|
|
|
|
|
final = TriggerResult.replace(res.replace_with)
|
|
|
|
|
if res.skip_children:
|
|
|
|
|
final = TriggerResult(
|
|
|
|
|
replace_with=final.replace_with,
|
|
|
|
|
skip_children=True,
|
|
|
|
|
)
|
|
|
|
|
return final
|
|
|
|
|
|
|
|
|
|
def _handle_with_triggers(
|
|
|
|
|
self,
|
|
|
|
|
trigger_name: str,
|
|
|
|
|
ctx: AnnotationWalkerCtx,
|
|
|
|
|
args_handler: Callable[[AnnotationWalkerCtx], Any] | None = None,
|
|
|
|
|
) -> Any:
|
|
|
|
|
"""Generic handler: run triggers, maybe recurse into args with a custom handler."""
|
|
|
|
|
res = self._apply_triggers(trigger_name, ctx)
|
|
|
|
|
if res.restart_with is not None:
|
|
|
|
|
return self._walk(res.restart_with, ctx.parent)
|
|
|
|
|
if res.replace_with is not None:
|
|
|
|
|
return res.replace_with
|
|
|
|
|
if not res.skip_children:
|
|
|
|
|
if args_handler:
|
|
|
|
|
return args_handler(ctx)
|
|
|
|
|
return tuple(self._walk(a, ctx) for a in ctx.args)
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def _walk_args_tuple(self, ctx: AnnotationWalkerCtx):
|
|
|
|
|
# special Ellipsis case for Tuple
|
|
|
|
|
if len(ctx.args) == 2 and ctx.args[1] is Ellipsis:
|
|
|
|
|
return (self._walk(ctx.args[0], ctx), Ellipsis)
|
|
|
|
|
return tuple(self._walk(a, ctx) for a in ctx.args)
|
|
|
|
|
|
|
|
|
|
# --- Dispatcher ---
|
|
|
|
|
|
|
|
|
|
def _walk(self, type_: Any, parent_ctx: Optional[AnnotationWalkerCtx]) -> Any:
|
|
|
|
|
print(f"[{parent_ctx.layer if parent_ctx else 0}] walking through: {type_}")
|
|
|
|
|
|
|
|
|
|
origin = get_origin(type_) or type_
|
|
|
|
|
if origin is None:
|
|
|
|
|
origin = NoneType
|
|
|
|
|
if origin is Union:
|
|
|
|
|
origin = UnionType
|
|
|
|
|
if not isinstance(origin, type):
|
|
|
|
|
raise RuntimeError("Annotation must be using type(s), not instances")
|
|
|
|
|
|
|
|
|
|
args = get_args(type_)
|
|
|
|
|
layer = 0 if parent_ctx is None else parent_ctx.layer + 1
|
|
|
|
|
ctx = self._new_ctx(origin, args, layer, parent_ctx)
|
|
|
|
|
|
|
|
|
|
print(origin)
|
|
|
|
|
match origin:
|
|
|
|
|
case typing.Annotated:
|
|
|
|
|
return self._handle_with_triggers(
|
|
|
|
|
"process_annotated", ctx, args_handler=lambda c: self._walk(c.args[0], c) if c.args else None
|
|
|
|
|
)
|
|
|
|
|
case types.UnionType:
|
|
|
|
|
return self._handle_with_triggers("process_union", ctx)
|
|
|
|
|
case _ if issubclass(origin, dict):
|
|
|
|
|
return self._handle_with_triggers("process_dict", ctx)
|
|
|
|
|
case _ if issubclass(origin, tuple):
|
|
|
|
|
return self._handle_with_triggers("process_tuple", ctx, self._walk_args_tuple)
|
|
|
|
|
case _ if issubclass(origin, list):
|
|
|
|
|
return self._handle_with_triggers("process_list", ctx)
|
|
|
|
|
case _ if issubclass(origin, set):
|
|
|
|
|
return self._handle_with_triggers("process_set", ctx)
|
|
|
|
|
case _ if origin in self._allowed_types:
|
|
|
|
|
return self._handle_with_triggers("process_allowed", ctx)
|
|
|
|
|
case _:
|
|
|
|
|
res = self._apply_triggers("process_unknown", ctx)
|
|
|
|
|
if res.restart_with is not None:
|
|
|
|
|
return self._walk(res.restart_with, ctx.parent)
|
|
|
|
|
if res.replace_with is not None:
|
|
|
|
|
return res.replace_with
|
|
|
|
|
raise UnsupportedFieldType(f"Not supported Field: {ctx.origin}, " f"Supported list: {self._allowed_types}")
|
|
|
|
|
|