Compare commits

...

18 Commits

Author SHA1 Message Date
cclecle
6efd914de1 non finished work 2025-09-28 17:33:34 +02:00
cclecle
2e81b3f0e6 work 2025-09-27 23:58:47 +02:00
cclecle
5d206ef266 fix Feature + cleaning 2025-09-27 22:00:47 +02:00
chacha
ba993b14b9 work 2025-09-27 16:41:36 +02:00
chacha
5232ad66fc some progress 2025-09-26 00:50:52 +02:00
chacha
44554006db work 2025-09-25 01:40:05 +02:00
chacha
1e84fd691e clean 2025-09-24 19:10:03 +02:00
chacha
2ac931edeb more work 2025-09-24 02:16:10 +02:00
chacha
7ffe05f514 clean 2025-09-23 22:43:39 +02:00
chacha
100d2cadcb renaming 2025-09-23 22:27:00 +02:00
chacha
71faefcf68 code cleaning 2025-09-23 22:10:03 +02:00
chacha
23f14a042d make metaclass thread safer 2025-09-23 22:06:02 +02:00
chacha
19a6c802bb yet another huge rework 2025-09-23 21:47:23 +02:00
chacha
0537d2d912 fix exceptions 2025-09-22 22:29:18 +02:00
chacha
09237ff8cd work 2025-09-22 21:43:03 +02:00
chacha
ff55ef18d1 all tests passes ! 2025-09-22 01:50:26 +02:00
chacha
827e5e3f55 huge work 2025-09-22 01:14:14 +02:00
chacha
b9f5b83690 work 2025-09-21 10:47:53 +02:00
24 changed files with 2981 additions and 1316 deletions

View File

@@ -1,41 +0,0 @@
from typing import Generic, TypeVar, Any
from .LAMField import LAMField
from .Constraint import Constraint
from ..tools import LAMdeepfreeze
T_Field = TypeVar("T_Field")
class FrozenLAMField(Generic[T_Field]):
"""FrozenLAMField class
a read-only proxy of a Field
"""
def __init__(self, inner_field: LAMField[T_Field]):
self._inner_field = inner_field
@property
def doc(self) -> str:
"""Returns Field's documentation (frozen)"""
return LAMdeepfreeze(self._inner_field.doc)
@property
def constraints(self) -> tuple[Constraint]:
"""Returns Field's constraint (frozen)"""
return LAMdeepfreeze(self._inner_field.constraints)
@property
def default_value(self) -> Any:
"""Returns Field's default value (frozen)"""
return self._inner_field.default_value
@property
def value(self) -> Any:
"""Returns Field's value (frosen)"""
return self._inner_field.value
@property
def annotations(self) -> Any:
"""Returns Field's annotation (frozen)"""
return LAMdeepfreeze(self._inner_field.annotations)

View File

@@ -1,38 +0,0 @@
from typing import Generic, TypeVar, Self, Hashable, Any
from abc import ABC, abstractmethod
from ..tools import JSONType
TV_LAMCompatbile = TypeVar("TV_LABCompatbile", bound="LABCompatible")
class LAMCompatible(Generic[TV_LAMCompatbile], ABC):
"""Any type that can safely live inside a LABField."""
@classmethod
def lam_validate_annotation(cls, annotation: Any) -> None:
"""
Validate the type annotation (e.g., SpecialList[int]).
Raise if it's not compatible.
"""
return # default: do nothing (simple types dont need it)
@abstractmethod
def lam_validate(self) -> None:
"""Raise if the value is invalid for this type."""
...
@abstractmethod
def lam_freeze(self) -> Hashable:
"""Return an immutable/hashable representation of this value."""
...
@abstractmethod
def lam_to_plain(self) -> JSONType:
"""Return a plain serializable form (str, dict, etc.)."""
...
@classmethod
@abstractmethod
def lam_from_plain(cls, plain: JSONType) -> Self:
"""Return an Object from a plain serializable form (str, dict, etc.)."""
...

View File

@@ -1,62 +0,0 @@
from typing import Generic, TypeVar, Optional, Any
from .LAMFieldInfo import LAMFieldInfo
from .Constraint import Constraint
from ..tools import LAMdeepfreeze
TV_LABField = TypeVar("TV_LABField")
class LAMField(Generic[TV_LABField]):
"""This class describe a Field in Schema"""
def __init__(self, name: str, v: Optional[TV_LABField], a: Any, i: LAMFieldInfo):
self._name: str = name
self._source: Optional[type] = None
self._default_value: Optional[TV_LABField] = v
self._value: Optional[TV_LABField] = v
self._annotations: Any = a
self._info: LAMFieldInfo = i
self._constraints: list[Constraint[Any]] = i.constraints
def add_source(self, s: type) -> None:
"""Adds source Appliance to the Field"""
self._source = s
@property
def doc(self) -> str:
"""Returns Field's documentation"""
return self._info.doc
def add_constraint(self, c: Constraint) -> None:
"""Adds constraint to the Field"""
self._constraints.append(c)
@property
def constraints(self) -> list[Constraint]:
"""Returns Field's constraint"""
return self._info.constraints
@property
def default_value(self) -> Any:
"""Returns Field's default value (frozen)"""
return LAMdeepfreeze(self._default_value)
def update_value(self, v: Optional[TV_LABField] = None) -> None:
"""Updates Field's value"""
self._value = v
@property
def value(self) -> Any:
"""Returns Field's value (frozen)"""
return LAMdeepfreeze(self._value)
@property
def raw_value(self) -> Optional[TV_LABField]:
"""Returns Field's value"""
return self._value
@property
def annotations(self) -> Any:
"""Returns Field's annotation"""
return self._annotations

View File

@@ -1,26 +0,0 @@
from typing import Optional, Any
from .Constraint import Constraint
class LAMFieldInfo:
"""This Class allows to describe a Field in Appliance class"""
def __init__(
self, *, doc: str = "", constraints: Optional[list[Constraint]] = None
):
self._doc: str = doc
self._constraints: list[Constraint]
if constraints is None:
self._constraints = []
else:
self._constraints = constraints
@property
def doc(self) -> str:
"""Returns Field's documentation"""
return self._doc
@property
def constraints(self) -> list[Constraint[Any]]:
"""Returns Field's constraints"""
return self._constraints

View File

@@ -13,13 +13,14 @@ Main module __init__ file.
from .__metadata__ import __version__, __Summuary__, __Name__
from .LAMFields.LAMField import LAMField
from .LAMFields.LAMFieldInfo import LAMFieldInfo
from .LAMFields.FrozenLAMField import FrozenLAMField
from .LAMFields.LAMCompatible import LAMCompatible
from .meta.element import ClassMutable, ObjectMutable
from .element import Element
from .lam_field.lam_field import LAMField
from .lam_field.lam_field_info import LAMFieldInfo
# from .LAMFields.FrozenLAMField import FrozenLAMField
from .appliance import Appliance
from .feature import Feature
from .element import Element
from .exception import (
@@ -27,7 +28,6 @@ from .exception import (
MultipleInheritanceForbidden,
BrokenInheritance,
ReadOnlyField,
NewFieldForbidden,
NotAnnotatedField,
ReadOnlyFieldAnnotation,
InvalidFieldValue,
@@ -38,6 +38,8 @@ from .exception import (
InvalidFeatureInheritance,
FeatureNotBound,
UnsupportedFieldType,
NonExistingField,
InvalidFieldName,
)
__all__ = [name for name in globals() if not name.startswith("_")]

View File

@@ -1,9 +1,71 @@
from .element import Element
from .meta.element import IAppliance
from .meta.appliance import _MetaAppliance
from .feature import Feature
class Appliance(metaclass=_MetaAppliance):
class Appliance(IAppliance, metaclass=_MetaAppliance):
"""BaseFeature class
Base class for Appliance.
An appliance is a server configuration / image that is built using appliance's code and Fields.
"""
def validate_schema(self):
super().validate_schema()
for k in self.__lam_schema__["features"]:
self.__dict__[k].validate_schema()
@classmethod
def validate_schema_class(cls):
super().validate_schema_class()
print(cls.__lam_schema__["features"])
for v in cls.__lam_schema__["features"].values():
v.validate_schema_class()
def _validate_schema_unknown_attr(self, name: str):
if isinstance(self.__dict__[name], Feature):
return
super()._validate_schema_unknown_attr(name)
def _validate_schema_missing_attr(self, name: str):
if name == "features":
return
super()._validate_schema_missing_attr(name)
@classmethod
def _validate_unknown_field_schema(cls, name: str):
if name == "features":
return
super()._validate_unknown_field_schema(name)
def _freeze_unknown_attr(self, name: str, force: bool = False):
if isinstance(self.__dict__[name], Feature):
self.__dict__[name].freeze(force)
return
super()._freeze_unknown_attr(name, force)
def _freeze_missing_attr(self, name: str, force: bool = False):
if name == "features":
for k, v in self.__lam_schema__["features"].items():
v.freeze_class(force)
return
super()._freeze_missing_attr(name, force)
@classmethod
def _freeze_unknown_field_schema(cls, name: str, force: bool = False):
if name == "features":
for v in cls.__lam_schema__["features"].values():
v.freeze_class(force)
return
super()._freeze_unknown_field_schema(name, force)
@classmethod
def _validate_unknown_attr_class(cls, name: str) -> None:
if issubclass(cls.__dict__[name], Feature):
return
super()._validate_unknown_attr_class(name)
@classmethod
def _freeze_unknown_attr_class(cls, name: str, force: bool = False) -> None:
if issubclass(cls.__dict__[name], Feature):
return
super()._freeze_unknown_attr_class(name, force)

View File

@@ -0,0 +1,253 @@
from typing import Any, Self, Dict, Optional
from typeguard import check_type, CollectionCheckStrategy, TypeCheckError
from copy import deepcopy
from .lam_field.lam_field import LAMField, LAMField_Element
from .exception import ReadOnlyField, SchemaViolation, NonExistingField, InvalidFieldValue
from .tools import LAMdeepfreeze, is_data_attribute
'''
class ElementView:
__slots__ = ("_vals", "_types", "_touched")
def __init__(
self,
values: dict[str, Any],
types_map: dict[str, type],
):
self._vals: dict[str, Any]
self._types: dict[str, type]
self._touched: set
object.__setattr__(self, "_vals", dict(values))
object.__setattr__(self, "_types", types_map)
def __getattr__(self, name: str) -> Any:
"""internal proxy getattr"""
if name not in self._types:
raise NonExistingField(f"Unknown field {name}")
return self._vals[name]
def __setattr__(self, name: str, value: Any):
"""internal proxy setattr"""
if name not in self._types:
raise NonExistingField(f"Cannot set unknown field {name}")
T = self._types[name]
try:
check_type(
value,
T,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(f"Field <{name}> value is not of expected type {T}.") from exp
self._vals[name] = value
def export(self) -> dict:
"""exports all proxified values"""
return dict(self._vals)
class ElementViewCls:
__slots__ = ("_vals", "_types", "_touched", "_name", "_module")
def __init__(
self,
values: dict[str, Any],
types_map: dict[str, type],
name: Optional[str] = None,
module: Optional[str] = None,
):
super().__init__(values, types_map)
self._name: str
self._module: str
if name is not None:
object.__setattr__(self, "_name", name)
if module is not None:
object.__setattr__(self, "_module", module)
@property
def __name__(self) -> str:
"""returns proxified class' name"""
return self._name
@property
def __module__(self) -> str:
"""returns proxified module's name"""
return self._module
@__module__.setter
def __module__(self, value: str):
pass
'''
class BaseElement:
__lam_schema__ = {}
__lam_initialized__ = False
__lam_class_mutable__ = False
__lam_object_mutable__ = False
__lam_options__ = {}
"""
def get_model_spec(self, name: str, module: str, memo: dict[str, Any]) -> ElementView:
# memo[self.__name__] = {}
init_fieldvalues = {}
init_fieldtypes = {}
for k, v in self.__lam_schema__.items():
if isinstance(v, LAMField_Element):
memo[k] = {}
init_fieldvalues[k] = v.value.get_model_spec(memo[k])
# clone = v.clone_unfrozen().value
elif isinstance(v, LAMField):
clone = deepcopy(v.value)
init_fieldvalues[k] = clone
else:
pass
init_fieldtypes[k] = v.annotations
return ElementView(init_fieldvalues, init_fieldtypes)
@classmethod
def get_model_spec_cls(cls, memo: dict[str, Any]) -> ElementView:
# memo[self.__name__] = {}
init_fieldvalues = {}
init_fieldtypes = {}
for k, v in cls.__lam_schema__.items():
if isinstance(v, LAMField_Element):
memo[k] = {}
init_fieldvalues[k] = v.value.get_model_spec(k, memo[k])
# clone = v.clone_unfrozen().value
elif isinstance(v, LAMField):
clone = deepcopy(v.value)
init_fieldvalues[k] = clone
else:
pass
init_fieldtypes[k] = v.annotations
return ElementViewCls(init_fieldvalues, init_fieldtypes, cls.__name__, cls.__modules__)
"""
def clone_as_mutable_variant(self, *, deep: bool = True, _memo: Dict[int, Self] | None = None) -> Self:
raise NotImplemented()
@classmethod
@property
def frozen_cls(cls) -> bool:
return not cls.__lam_class_mutable__
@classmethod
@property
def mutable_obj(cls) -> bool:
return cls.__lam_object_mutable__
@property
def frozen(self) -> bool:
return not self.__lam_object_mutable__
def __setattr__(self, key: str, value: Any) -> None:
if key.startswith("_"):
return super().__setattr__(key, value)
if key not in self.__lam_schema__:
raise NonExistingField(f"Can't create new object attributes: {key}")
if not self.__lam_object_mutable__:
raise ReadOnlyField(f"{key} is read-only")
self.__lam_schema__[key].validate(value)
return super().__setattr__(key, value)
def freeze(self, force: bool = False) -> None:
if self.__lam_object_mutable__ or force:
if self.__lam_object_mutable__:
self.validate_schema()
setSchemaKeys = set(self.__lam_schema__)
setInstanceKeys = {_[0] for _ in self.__dict__.items() if is_data_attribute(_[0], _[1])}
for k_unknown in setInstanceKeys - setSchemaKeys:
self._freeze_unknown_attr(k_unknown, force)
for k_missing in setSchemaKeys - setInstanceKeys:
self._freeze_missing_attr(k_missing, force)
for k in list(setSchemaKeys & setInstanceKeys):
self.__lam_schema__[k].freeze()
if isinstance(self.__dict__[k], BaseElement):
self.__dict__[k].freeze(force)
else:
self.__dict__[k] = LAMdeepfreeze(self.__dict__[k])
self.__lam_object_mutable__ = False
def _freeze_unknown_attr(self, name: str, force: bool = False) -> None:
raise SchemaViolation(f"Attribute <{name}> is not in the schema")
def _freeze_missing_attr(self, name: str, force: bool = False) -> None:
raise SchemaViolation(f"Attribute <{name}> is missing from instance")
def validate_schema(self) -> None:
setSchemaKeys = set(self.__lam_schema__)
setInstanceKeys = {_[0] for _ in self.__dict__.items() if is_data_attribute(_[0], _[1])}
for k_unknown in setInstanceKeys - setSchemaKeys:
self._validate_schema_unknown_attr(k_unknown)
for k_missing in setSchemaKeys - setInstanceKeys:
self._validate_schema_missing_attr(k_missing)
for k in list(setSchemaKeys & setInstanceKeys):
self.__lam_schema__[k].validate_self()
def _validate_schema_unknown_attr(self, name: str) -> None:
raise SchemaViolation(f"Attribute <{name}> is not in the schema")
def _validate_schema_missing_attr(self, name: str) -> None:
raise SchemaViolation(f"Attribute <{name}> is missing from instance")
@classmethod
def freeze_class(cls, force: bool = False) -> None:
if cls.__lam_class_mutable__ or force:
cls.validate_schema_class()
# class should not have any elements so they are all unknown
for k_unknown in {_[0] for _ in cls.__dict__.items() if is_data_attribute(_[0], _[1])}:
cls._freeze_unknown_attr_class(k_unknown, force)
for k, v in cls.__lam_schema__.items():
if isinstance(v, LAMField):
cls.__lam_schema__[k].freeze()
else:
cls._freeze_unknown_field_schema(k, force)
cls.__lam_class_mutable__ = False
@classmethod
def _freeze_unknown_attr_class(cls, name: str, force: bool = False) -> None:
raise SchemaViolation(f"Class attribute <{name}> is not in the schema")
@classmethod
def _freeze_unknown_field_schema(cls, name: str, force: bool = False) -> None:
raise SchemaViolation(f"Unknown field <{name} in the schema> ")
@classmethod
def validate_schema_class(cls) -> None:
# class should not have any elements so they are all unknown
for k_unknown in {_[0] for _ in cls.__dict__.items() if is_data_attribute(_[0], _[1])}:
cls._validate_unknown_attr_class(k_unknown)
for k, v in cls.__lam_schema__.items():
if isinstance(v, LAMField):
v.validate_self()
else:
cls._validate_unknown_field_schema(k)
@classmethod
def _validate_unknown_attr_class(cls, name: str) -> None:
raise SchemaViolation(f"Class attribute <{name}> is not in the schema")
@classmethod
def _validate_unknown_field_schema(cls, name: str) -> None:
raise SchemaViolation(f"Unknown field <{name} in the schema> ")

87
src/dabmodel/defines.py Normal file
View File

@@ -0,0 +1,87 @@
from typing import Any, Union, Optional, List, Dict, Tuple, Set, FrozenSet, Annotated
from types import SimpleNamespace
import math
ALLOWED_MODEL_FIELDS_TYPES: set[type[Any], ...] = {
str,
int,
float,
complex,
bool,
bytes,
}
ALLOWED_ANNOTATIONS: dict[str, Any] = {
"Union": Union,
"Optional": Optional,
"List": List,
"Dict": Dict,
"Tuple": Tuple,
"Set": Set,
"FrozenSet": FrozenSet,
"Annotated": Annotated,
# builtins:
"int": int,
"str": str,
"float": float,
"bool": bool,
"complex": complex,
"bytes": bytes,
"None": type(None),
"list": list,
"dict": dict,
"set": set,
"frozenset": frozenset,
"tuple": tuple,
}
ALLOWED_HELPERS_MATH = SimpleNamespace(
sqrt=math.sqrt,
floor=math.floor,
ceil=math.ceil,
trunc=math.trunc,
fabs=math.fabs,
copysign=math.copysign,
hypot=math.hypot,
exp=math.exp,
log=math.log,
log10=math.log10,
sin=math.sin,
cos=math.cos,
tan=math.tan,
atan2=math.atan2,
radians=math.radians,
degrees=math.degrees,
)
ALLOWED_HELPERS_DEFAULT: dict[str, object] = {
"math": ALLOWED_HELPERS_MATH,
"print": print,
# Numbers & reducers (pure, deterministic)
"abs": abs,
"round": round,
"min": min,
"max": max,
"sum": sum,
# Introspection-free basics
"len": len,
"sorted": sorted,
# Basic constructors (for copy-on-write patterns)
"tuple": tuple,
"list": list,
"dict": dict,
"set": set,
# Simple casts if they need to normalize types
"int": int,
"float": float,
"str": str,
"bool": bool,
"bytes": bytes,
"complex": complex,
# Easy iteration helpers (optional but handy)
"range": range,
}
JSONPrimitive = Union[str, int, float, bool, None]
JSONType = Union[JSONPrimitive, List[Any], Dict[str, Any]] # recursive in practice

View File

@@ -4,17 +4,24 @@ class DABModelException(Exception):
"""
class FunctionForbidden(DABModelException): ...
class WrongUsage(DABModelException, RuntimeError):
pass
class ExternalCodeForbidden(FunctionForbidden): ...
class FunctionForbidden(DABModelException):
"""FunctionForbidden Exception class"""
class ClosureForbidden(FunctionForbidden): ...
class ExternalCodeForbidden(FunctionForbidden):
"""ExternalCodeForbidden Exception class"""
class ReservedFieldName(Exception):
"""DABModelException Exception class
class ClosureForbidden(FunctionForbidden):
"""ClosureForbidden Exception class"""
class ReservedFieldName(AttributeError, DABModelException):
"""ReservedFieldName Exception class
Base Exception for DABModelException class
"""
@@ -31,19 +38,13 @@ class BrokenInheritance(DABModelException):
"""
class ReadOnlyField(DABModelException):
class ReadOnlyField(AttributeError, DABModelException):
"""ReadOnlyField Exception class
The used Field is ReadOnly
"""
class NewFieldForbidden(DABModelException):
"""NewFieldForbidden Exception class
Field creation is forbidden
"""
class InvalidFieldAnnotation(DABModelException):
class InvalidFieldAnnotation(AttributeError, DABModelException):
"""InvalidFieldAnnotation Exception class
The field annotation is invalid
"""
@@ -73,24 +74,36 @@ class UnsupportedFieldType(InvalidFieldAnnotation):
"""
class ReadOnlyFieldAnnotation(DABModelException):
class ReadOnlyFieldAnnotation(AttributeError, DABModelException):
"""ReadOnlyFieldAnnotation Exception class
Field annotation connot be modified
"""
class InvalidFieldValue(DABModelException):
class SchemaViolation(AttributeError, DABModelException):
"""SchemaViolation Exception class
The Element Schema is not respected
"""
class InvalidFieldValue(SchemaViolation):
"""InvalidFieldValue Exception class
The Field value is invalid
"""
class NonExistingField(DABModelException):
class NonExistingField(SchemaViolation):
"""NonExistingField Exception class
The given Field is non existing
"""
class InvalidFieldName(AttributeError, DABModelException):
"""InvalidFieldName Exception class
The Field name is invalid
"""
class ImportForbidden(DABModelException):
"""ImportForbidden Exception class
Imports are forbidden
@@ -107,3 +120,27 @@ class FeatureNotBound(DABModelException):
"""FeatureNotBound Exception class
a Feature must be bound to the appliance (or parent)
"""
class FeatureAlreadyBound(DABModelException):
"""FeatureAlreadyBound Exception class
Feature can only be bind once
"""
class FeatureBoundToNonAppliance(DABModelException):
"""FeatureBoundToNonAppliance Exception class
Feature can only be bind to Appliance class
"""
class FeatureBoundToIncompatibleAppliance(DABModelException):
"""FeatureBoundToWrongAppliance Exception class
Feature have to be bound to correct appliance
"""
class FeatureWrongBound(DABModelException):
"""FeatureWrongBound Exception class
Feature can be bind to one and only one Appliance
"""

View File

@@ -1,12 +1,50 @@
from .element import Element
from .meta.element import IFeature, IAppliance
from .meta.feature import _MetaFeature
from .exception import (
FeatureAlreadyBound,
FeatureNotBound,
FeatureBoundToNonAppliance,
FeatureBoundToIncompatibleAppliance,
)
class Feature(metaclass=_MetaFeature):
class Feature(IFeature, metaclass=_MetaFeature):
"""Feature class
Base class for Appliance's Features.
Features are optional traits of an appliance.
"""
_BoundAppliance: "Optional[Type[BaseAppliance]]" = None
__lam_bound_appliance__ = None
@classmethod
def check_appliance_bound(cls):
if cls.__lam_bound_appliance__ is None:
raise FeatureNotBound(f"Feature {cls} is not bound to any Appliance")
@classmethod
def check_appliance_compatibility(cls, appliance_cls):
cls.check_appliance_bound()
if not issubclass(appliance_cls, cls.__lam_bound_appliance__):
raise FeatureBoundToIncompatibleAppliance(
f"Feature {cls} is bound to an incompatible Appliance {appliance_cls}"
)
@classmethod
def bind_appliance(cls, appliance_cls):
if cls.__lam_bound_appliance__ is not None:
raise FeatureAlreadyBound(
f"Feature {cls} already bound to an Appliance {cls.__lam_bound_appliance__}"
)
if (
appliance_cls is None
or not isinstance(appliance_cls, type)
or not issubclass(appliance_cls, IAppliance)
):
raise FeatureBoundToNonAppliance(
f"Trying to bind Feature {cls} to an invalid Appliance Reference: {appliance_cls}"
)
cls.__lam_bound_appliance__ = appliance_cls
Enabled: bool = False

View File

@@ -0,0 +1,39 @@
from typing import Generic, TypeVar, Dict, Protocol, runtime_checkable, Self
TV_Freezable = TypeVar("TV_Freezable")
@runtime_checkable
class FreezableElement(Protocol, Generic[TV_Freezable]):
def clone_as_mutable_variant(self, *, deep: bool = True, _memo: Dict[int, Self] | None = None) -> Self:
pass
def freeze(self, force: bool = False) -> None:
pass
@classmethod
def freeze_class(cls, force: bool = False) -> None:
pass
def validate_schema(self) -> None:
pass
@classmethod
def validate_schema_class(cls) -> None:
pass
@classmethod
@property
def frozen_cls(cls) -> bool:
pass
@classmethod
@property
def mutable_obj(cls) -> bool:
pass
@property
def frozen(self) -> bool:
pass

View File

@@ -0,0 +1,176 @@
from typing import Generic, TypeVar, Optional, Any, Self, Annotated, get_origin, get_args
from typeguard import check_type, CollectionCheckStrategy, TypeCheckError
from copy import deepcopy
from .lam_field_info import LAMFieldInfo
from .constraint import Constraint
from ..tools import LAMdeepfreeze
from ..exception import InvalidFieldValue, ReadOnlyField
from ..interfaces import FreezableElement
TV_LABField = TypeVar("TV_LABField")
class LAMField(Generic[TV_LABField]):
"""This class describe a Field in Schema"""
def __init__(self, name: str, val: Optional[TV_LABField], ann: Any, i: LAMFieldInfo):
self._default_value: Optional[TV_LABField]
self._value: Optional[TV_LABField]
self.__annotations: Any
self.__name: str = name
self.__source: Optional[type] = None
self.__info: LAMFieldInfo = deepcopy(i)
self._set_annotations(ann)
self.__frozen: bool = False
self._frozen_value: Any = None
self.__frozen_value_set: True = False
self.validate(val)
self._init_value(val)
def _set_annotations(self, ann: Any) -> None:
_origin = get_origin(ann) or ann
_args = get_args(ann)
if _origin is Annotated:
self.__annotations: Any = LAMdeepfreeze(_args[0])
else:
self.__annotations: Any = LAMdeepfreeze(ann)
def _init_value(self, val: Optional[TV_LABField | FreezableElement]):
self._default_value: Optional[TV_LABField] = deepcopy(val)
self._value: Optional[TV_LABField] = val
@property
def name(self) -> str:
return self.__name
def is_frozen(self) -> bool:
return self.__frozen
def freeze(self):
self.__frozen = True
def clone_unfrozen(self) -> Self:
field = LAMFieldFactory.create_field(self.__name, self._default_value, self.__annotations, self.__info)
field.update_value(self._value)
return field
def add_source(self, src: type) -> None:
"""Adds source Appliance to the Field"""
if self.__frozen:
raise ReadOnlyField("Field is frozen, cannot add source now")
self.__source = src
@property
def doc(self) -> str:
"""Returns Field's documentation"""
return self.__info.doc
def add_constraint(self, cons: Constraint) -> None:
"""Adds constraint to the Field"""
if self.__frozen:
raise ReadOnlyField("Field is frozen")
self.__info.add_constraint(cons)
@property
def constraints(self) -> list[Constraint]:
"""Returns Field's constraint"""
return LAMdeepfreeze(self.__info.constraints)
def validate_self(self):
self.validate(self._value)
def validate(self, val: Optional[TV_LABField]):
try:
check_type(
val,
self.annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(f"Value of Field <{self.__name}> is not of expected type {self.annotations}.") from exp
@property
def default_value(self) -> Any:
"""Returns Field's default value (frozen)"""
return LAMdeepfreeze(self._default_value)
def update_value(self, val: Optional[TV_LABField] = None) -> None:
"""Updates Field's value"""
if self.__frozen:
raise ReadOnlyField("Field is frozen")
self.validate(val)
self._value = val
self.__frozen_value_set = False
@property
def value(self) -> Any:
"""Returns Field's value (frozen)"""
if self.__frozen:
return self.frozen_value
else:
return self.raw_value
@property
def raw_value(self) -> Optional[TV_LABField]:
"""Returns Field's value"""
if self.__frozen:
raise ReadOnlyField("Field is frozen")
return self._value
def _generate_frozen_value(self):
self._frozen_value = LAMdeepfreeze(self._value)
@property
def frozen_value(self) -> Any:
if not self.__frozen_value_set:
self._generate_frozen_value()
self.__frozen_value_set = True
return self._frozen_value
@property
def annotations(self) -> Any:
"""Returns Field's annotation"""
return self.__annotations
@property
def info(self) -> LAMFieldInfo:
"""Returns Field's info"""
return self.__info
class LAMField_Element(LAMField[FreezableElement]):
def _init_value(self, val: Optional[FreezableElement]):
self._default_value = deepcopy(val)
self._default_value.freeze()
self._value = val.clone_as_mutable_variant()
def validate(self, val: Optional[FreezableElement]):
super().validate(val)
if val is not None:
print(val)
val.validate_schema()
@property
def default_value(self) -> Any:
return self._default_value
def update_value(self, val: Optional[FreezableElement] = None) -> None:
super().update_value(val.clone_as_mutable_variant())
def _generate_frozen_value(self):
self._frozen_value = deepcopy(self._value)
self._frozen_value.freeze()
class LAMFieldFactory:
@staticmethod
def create_field(name: str, val: Optional[TV_LABField], anno: Any, info: LAMFieldInfo) -> LAMField:
if isinstance(val, FreezableElement):
print(f"Spawn LAMField_Element {name} !!!")
return LAMField_Element(name, val, anno, info)
else:
print(f"Spawn LAMField {name} !!!")
return LAMField(name, val, anno, info)

View File

@@ -0,0 +1,27 @@
from typing import Optional, Any
from .constraint import Constraint
class LAMFieldInfo:
"""This Class allows to describe a Field in Appliance class"""
def __init__(self, *, doc: str = "", constraints: Optional[list[Constraint]] = None):
self.__doc: str = doc
self.__constraints: list[Constraint]
if constraints is None:
self.__constraints = []
else:
self.__constraints = constraints
def add_constraint(self, constraint: Constraint):
self.__constraints.append(constraint)
@property
def doc(self) -> str:
"""Returns Field's documentation"""
return self.__doc
@property
def constraints(self) -> list[Constraint[Any]]:
"""Returns Field's constraints"""
return self.__constraints

View File

@@ -1,18 +1,10 @@
from typing import Any, Type
from frozendict import frozendict
from copy import copy
from typeguard import check_type, CollectionCheckStrategy, TypeCheckError
from ..tools import LAMdeepfreeze
from ..LAMFields.LAMField import LAMField
from ..LAMFields.FrozenLAMField import FrozenLAMField
from .element import _MetaElement
from .element import _MetaElement, get_mutable_variant
from ..feature import Feature
from ..exception import (
InvalidFieldValue,
InvalidFeatureInheritance,
FeatureNotBound,
)
from ..exception import InvalidFieldValue, InvalidFeatureInheritance, InvalidFieldName
class _MetaAppliance(_MetaElement):
@@ -26,20 +18,30 @@ class _MetaAppliance(_MetaElement):
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
stack_exts: dict[str, Any],
) -> None:
"""
Appliance-specific pre-check: ensure the `features` slot exists in schema.
Copies the parent's `features` mapping when inheriting to keep it per-class.
"""
super().check_class(name, bases, namespace, extensions) # type: ignore[misc]
if "features" not in namespace["__LAMSchema__"]:
namespace["__LAMSchema__"]["features"] = {}
else:
namespace["__LAMSchema__"]["features"] = copy(
namespace["__LAMSchema__"]["features"]
)
super().check_class(name, bases, namespace, stack_exts) # type: ignore[misc]
if "features" not in namespace["__lam_schema__"]:
namespace["__lam_schema__"]["features"] = {}
@classmethod
def inherit_schema( # pylint: disable=too-complex,too-many-branches
mcs: type["_MetaElement"],
name: str,
base: type[Any],
namespace: dict[str, Any],
stack_exts: dict[str, Any],
):
super().inherit_schema(name, base, namespace, stack_exts)
if "features" in base.__lam_schema__:
namespace["__lam_schema__"]["features"] = dict(base.__lam_schema__["features"])
@classmethod
def process_class_fields(
@@ -47,18 +49,18 @@ class _MetaAppliance(_MetaElement):
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
extensions: dict[str, Any],
stack_exts: dict[str, Any],
):
"""
Like meta.process_class_fields but also stages Feature declarations.
Initializes:
extensions["new_features"], extensions["modified_features"]
stack_exts["new_features"], stack_exts["modified_features"]
then defers to the base scanner for regular fields.
"""
extensions["new_features"] = {}
extensions["modified_features"] = {}
super().process_class_fields(name, bases, namespace, extensions) # type: ignore[misc]
stack_exts["new_features"] = {}
stack_exts["modified_features"] = {}
super().process_class_fields(name, bases, namespace, stack_exts) # type: ignore[misc]
@classmethod
def process_new_field(
@@ -68,7 +70,7 @@ class _MetaAppliance(_MetaElement):
namespace: dict[str, Any],
_fname: str,
_fvalue: Any,
extensions: dict[str, Any],
stack_exts: dict[str, Any],
): # pylint: disable=unused-argument
"""
Intercept Feature declarations.
@@ -78,16 +80,19 @@ class _MetaAppliance(_MetaElement):
- Else, if `_fvalue` is a Feature *class*, stage into `new_features`.
- Otherwise, it is a regular field: delegate to meta.process_new_field.
"""
if _fname in namespace["__LAMSchema__"]["features"].keys():
if not issubclass(_fvalue, namespace["__LAMSchema__"]["features"][_fname]):
raise InvalidFeatureInheritance(
f"Feature {_fname} is not an instance of {bases[0]}.{_fname}"
)
extensions["modified_features"][_fname] = _fvalue
if _fname == "features":
raise InvalidFieldName("'feature' is a reserved Field name")
if _fname in namespace["__lam_schema__"]["features"]:
if not issubclass(_fvalue, namespace["__lam_schema__"]["features"][_fname]):
raise InvalidFeatureInheritance(f"Feature {_fname} is not a subclass of {bases[0]}.{_fname}")
stack_exts["modified_features"][_fname] = get_mutable_variant(_fvalue)
namespace[_fname] = stack_exts["modified_features"][_fname]
elif isinstance(_fvalue, type) and issubclass(_fvalue, Feature):
extensions["new_features"][_fname] = _fvalue
stack_exts["new_features"][_fname] = get_mutable_variant(_fvalue)
namespace[_fname] = stack_exts["new_features"][_fname]
else:
super().process_new_field(name, bases, namespace, _fname, _fvalue, extensions) # type: ignore[misc]
super().process_new_field(name, bases, namespace, _fname, _fvalue, stack_exts) # type: ignore[misc]
@classmethod
def commit_fields(
@@ -96,7 +101,7 @@ class _MetaAppliance(_MetaElement):
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
stack_exts: dict[str, Any],
):
"""
Commit regular fields (via meta) and then bind staged Feature classes.
@@ -105,62 +110,63 @@ class _MetaAppliance(_MetaElement):
- bind it to `cls` (sets the feature's `_BoundAppliance`),
- register it under `cls.__LAMSchema__["features"]`.
"""
super().commit_fields(cls, name, bases, namespace, extensions) # type: ignore[misc]
super().commit_fields(cls, name, bases, namespace, stack_exts) # type: ignore[misc]
for _ftname, _ftvalue in extensions["modified_features"].items():
_ftvalue._BoundAppliance = cls # pylint: disable=protected-access
cls.__LAMSchema__["features"][_ftname] = _ftvalue
for _ftname, _ftvalue in extensions["new_features"].items():
_ftvalue._BoundAppliance = cls # pylint: disable=protected-access
cls.__LAMSchema__["features"][_ftname] = _ftvalue
cls.__lam_schema__["features"].update(stack_exts["modified_features"])
for v in stack_exts["new_features"].values():
v.bind_appliance(cls)
cls.__lam_schema__["features"].update(stack_exts["new_features"])
def finalize_instance(cls: Type, obj, extensions: dict[str, Any]):
"""
Instantiate and attach all features declared (or overridden) in the instance schema.
Handles:
- Declared features (plain class)
- Subclass replacements
- Dict overrides (class + patch dict)
"""
for fname, fdef in obj.__LAMSchema__.get("features", {}).items():
# Case 1: plain class or subclass
if isinstance(fdef, type) and issubclass(fdef, Feature):
inst = fdef()
object.__setattr__(obj, fname, inst)
@classmethod
def prepare_initializer_fields(
mcs: type["_MetaElement"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
init_fieldvalues: dict[str, Any],
init_fieldtypes: dict[str, Any],
stack_exts: dict[str, Any],
):
for k, v in cls.__lam_schema__["features"].items():
init_fieldvalues[k] = v
init_fieldtypes[k] = v
# Case 2: (class, dict) → dict overrides
elif isinstance(fdef, tuple) and len(fdef) == 2:
feat_cls, overrides = fdef
inst = feat_cls()
for field_name, new_val in overrides.items():
if field_name not in feat_cls.__LAMSchema__:
raise InvalidFieldValue(
f"Feature '{fname}' has no field '{field_name}'"
)
field = feat_cls.__LAMSchema__[field_name]
try:
check_type(
new_val,
field.annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Invalid value for {fname}.{field_name}: "
f"expected {field.annotations}, got {new_val!r}"
) from exp
object.__setattr__(inst, field_name, LAMdeepfreeze(new_val))
inst.__LAMSchema__[field_name] = FrozenLAMField(
LAMField(field_name, new_val, field.annotations, field._info)
)
object.__setattr__(obj, fname, inst)
@classmethod
def commit_initializer_fields(
mcs: type["_MetaElement"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
fakecls_exports: dict[str, Any],
stack_exts: dict[str, Any],
):
for fk, fv in cls.__lam_schema__["features"].items():
for k, v in fv.__lam_schema__.items():
v.update_value(fakecls_exports[fk].__getattr__(k))
else:
raise InvalidFieldValue(
f"Invalid feature definition stored for '{fname}': {fdef!r}"
)
@classmethod
def finalize_class(
mcs: type["_MetaElement"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
stack_exts: dict[str, Any],
):
cls.__lam_schema__["features"] = frozendict(cls.__lam_schema__["features"])
def apply_overrides(cls, obj, extensions, *args, **kwargs):
super().finalize_class(cls, name, bases, namespace, stack_exts)
if not cls.__lam_class_mutable__:
for feat in cls.__lam_schema__["features"].values():
feat.freeze_class(True)
def populate_instance(cls: Type, obj: Any, stack_exts: dict[str, Any], *args: Any, **kw: Any):
super().populate_instance(obj, stack_exts, *args, **kw)
obj.__lam_schema__["features"] = dict(cls.__lam_schema__["features"])
def apply_overrides(cls, obj, stack_exts, *args, **kwargs):
"""
Support for runtime field and feature overrides.
@@ -174,40 +180,63 @@ class _MetaAppliance(_MetaElement):
# --- feature overrides ---
for k, v in list(kwargs.items()):
if k in cls.__LAMSchema__.get("features", {}):
base_feat_cls = cls.__LAMSchema__["features"][k]
if k in obj.__lam_schema__["features"]:
base_feat_cls = obj.__lam_schema__["features"][k]
# Case 1: subclass replacement (inheritance)
if isinstance(v, type) and issubclass(v, base_feat_cls):
bound = getattr(v, "_BoundAppliance", None)
if bound is None or not issubclass(cls, bound):
raise FeatureNotBound(
f"Feature {v.__name__} is not bound to {cls.__name__}"
)
v.check_appliance_compatibility(cls)
# record subclass into instance schema
obj.__LAMSchema__["features"][k] = v
obj.__lam_schema__["features"][k] = get_mutable_variant(v)
if not obj.__lam_class_mutable__:
obj.__lam_schema__["features"][k].freeze_class(True)
kwargs.pop(k)
# Case 2: dict override
elif isinstance(v, dict):
# store (class, override_dict) for finalize_instance
obj.__LAMSchema__["features"][k] = (base_feat_cls, v)
obj.__lam_schema__["features"][k] = (base_feat_cls, v)
kwargs.pop(k)
else:
raise InvalidFieldValue(
f"Feature override for '{k}' must be a Feature subclass or dict, got {type(v)}"
)
raise InvalidFieldValue(f"Feature override for '{k}' must be a Feature subclass or dict, got {type(v)}")
# --- new features not declared at class level ---
for k, v in list(kwargs.items()):
if isinstance(v, type) and issubclass(v, Feature):
bound = getattr(v, "_BoundAppliance", None)
if bound is None or not issubclass(cls, bound):
raise FeatureNotBound(
f"Feature {v.__name__} is not bound to {cls.__name__}"
)
obj.__LAMSchema__["features"][k] = v
v.check_appliance_compatibility(cls)
obj.__lam_schema__["features"][k] = get_mutable_variant(v)
if not obj.__lam_class_mutable__:
obj.__lam_schema__["features"][k].freeze_class(True)
kwargs.pop(k)
super().apply_overrides(obj, extensions, *args, **kwargs)
super().apply_overrides(obj, stack_exts, *args, **kwargs)
def finalize_instance(cls: Type, obj, stack_exts: dict[str, Any]):
"""
Instantiate and attach all features declared (or overridden) in the instance schema.
Handles:
- Declared features (plain class)
- Subclass replacements
- Dict overrides (class + patch dict)
"""
for k, v in obj.__lam_schema__["features"].items():
# Case 1: plain class or subclass
if isinstance(v, type) and issubclass(v, Feature):
inst = v()
object.__setattr__(obj, k, inst)
# Case 2: (class, dict) → dict overrides
elif isinstance(v, tuple) and len(v) == 2:
feat_cls, overrides = v
inst = feat_cls(**overrides)
object.__setattr__(obj, k, inst)
obj.__lam_schema__["features"][k] = feat_cls
else:
raise InvalidFieldValue(f"Invalid feature definition stored for '{k}': {fdef!r}")
obj.__lam_schema__["features"] = frozendict(obj.__lam_schema__["features"])
super().finalize_instance(obj, stack_exts)

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,5 @@
from typing import Type, Any
from .element import _MetaElement
from ..exception import FeatureNotBound
class _MetaFeature(_MetaElement):
@@ -8,12 +7,18 @@ class _MetaFeature(_MetaElement):
Feature specific metaclass code
"""
def __call__(cls: Type, *args: Any, **kw: Any): # intentionally untyped
"""BaseFeature new instance"""
@classmethod
def finalize_class(
mcs: type["_MetaElement"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
stack_exts: dict[str, Any],
):
if "appliance" in stack_exts["kwargs"]:
cls.bind_appliance(stack_exts["kwargs"]["appliance"])
if cls._BoundAppliance is None:
raise FeatureNotBound()
obj = super().__call__(*args, **kw)
return obj
def finalize_instance(cls: Type, obj: Any, stack_exts: dict[str, Any]):
cls.check_appliance_bound()
super().finalize_instance(obj, stack_exts)

View File

@@ -1,14 +1,21 @@
"""library's internal tools"""
from typing import Union, List, Any, Dict
from collections import ChainMap
from typing import Any, Annotated, get_origin, get_args, Union, Self, Optional, List, Dict, Tuple, Set, FrozenSet, Mapping, Callable
import typing
from dataclasses import dataclass
from types import UnionType, NoneType
import types
from uuid import UUID
from datetime import datetime
import json
import inspect
from frozendict import deepfreeze
JSONPrimitive = Union[str, int, float, bool, None]
JSONType = Union[JSONPrimitive, List[Any], Dict[str, Any]] # recursive in practice
from frozendict import deepfreeze, frozendict
from .defines import ALLOWED_ANNOTATIONS, ALLOWED_MODEL_FIELDS_TYPES
from .exception import IncompletelyAnnotatedField, UnsupportedFieldType
class LAMJSONEncoder(json.JSONEncoder):
@@ -34,3 +41,339 @@ def LAMdeepfreeze(value):
if isinstance(value, tuple):
return tuple(LAMdeepfreeze(v) for v in value)
return value
def is_data_attribute(name: str, value: any) -> bool:
if name.startswith("_"):
return False
if isinstance(value, (staticmethod, classmethod, property)):
return False
if inspect.isfunction(value) or inspect.ismethoddescriptor(value):
return False
return True
def _resolve_annotation(ann):
if isinstance(ann, str):
# Safe eval against a **whitelist** only
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS) # pylint: disable=eval-used
return ann
class AnnotationWalkerCtx:
def __init__(
self,
origin: Any,
args: Any,
layer: int,
parent: Optional[Self] = None,
allowed_types: set[type, ...] = frozenset(),
allowed_annotations: dict[str, Any] = {},
):
self.__origin = origin
self.args = args
self.__layer = layer
self.__parent = parent
self.__allowed_types: set[type, ...] = allowed_types
self.__allowed_annotations: dict[str, Any] = allowed_annotations
self.__ext: dict[Any, ChainMap] = {} # per-trigger namespaces (lazy)
@property
def origin(self) -> Any:
return self.__origin
@property
def layer(self) -> int:
return self.__layer
@property
def parent(self) -> Self:
return self.__parent
@property
def allowed_types(self) -> FrozenSet[type]:
return self.__allowed_types
@property
def allowed_annotations(self) -> Mapping[str, Any]:
return self.__allowed_annotations
def ns(self, owner: Any) -> ChainMap:
"""
A per-trigger overlay namespace that inherits from parent ctx.
Use as: bag = ctx.ns(self); bag['whatever'] = ...
Lookups fall back to parent's bag automatically.
"""
if owner in self.__ext:
return self.__ext[owner]
parent_map = self.__parent.__ext.get(owner) if (self.__parent and hasattr(self.__parent, "_AnnotationWalkerCtx__ext")) else {}
cm = ChainMap({}, parent_map if isinstance(parent_map, ChainMap) else dict(parent_map))
self.__ext[owner] = cm
return cm
@dataclass(frozen=True)
class TriggerResult:
# If provided, children won't be walked and this value is returned.
replace_with: Any | None = None
# If true, skip walking children but don't replace current node value.
skip_children: bool = False
# If provided, walker will restart processing with the given value
restart_with: Any | None = None # NEW
@staticmethod
def passthrough() -> Self:
return TriggerResult()
@staticmethod
def replace(value: Any) -> Self:
return TriggerResult(replace_with=value, skip_children=True)
@staticmethod
def skip() -> Self:
return TriggerResult(skip_children=True)
@staticmethod
def restart(value: Any) -> Self:
print("Doo!")
return TriggerResult(restart_with=value)
class AnnotationTrigger:
def init_trigger(self) -> None:
pass
def process_annotated(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_union(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_dict(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_tuple(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_list(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_set(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_unknown(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
def process_allowed(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
return None
class LAMSchemaValidation(AnnotationTrigger):
def init_trigger(self) -> None:
print(f"Initializing {self.__class__.__name__}")
def process_annotated(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_annotated")
print(ctx.origin)
print(ctx.args)
if len(ctx.args) != 2:
raise UnsupportedFieldType("Annotated[T,x] requires 2 parameters")
if ctx.parent is not None:
raise UnsupportedFieldType("Annotated[T,x] is only supported as parent annotation")
return None
def process_union(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_union")
print(ctx.args)
if (len(ctx.args) != 2) or (type(None) not in list(ctx.args)):
raise UnsupportedFieldType("Union[] is only supported to implement Optional[] (takes 2 parameters, including None)")
return None
def process_dict(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_dict")
if len(ctx.args) != 2:
raise IncompletelyAnnotatedField(f"Dict Annotation requires 2 inner definitions: {ctx.origin}")
if not ctx.args[0] in ctx.allowed_types:
raise IncompletelyAnnotatedField(f"Dict Key must be simple builtin: {ctx.origin}")
return None
def process_tuple(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_tuple")
if len(ctx.args) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {ctx.origin}")
return None
def process_list(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_list")
if len(ctx.args) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {ctx.origin}")
return None
def process_set(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_set")
if len(ctx.args) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {ctx.origin}")
return None
def process_allowed(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
print("process_allowed")
if ctx.origin is type(None) or ctx.origin is None:
if ctx.parent is None or not (ctx.parent.origin is Union or ctx.parent.origin is UnionType):
raise IncompletelyAnnotatedField(f"None is only accepted with Union, to implement Optional[]")
return None
class AnnotationWalker:
DEFAULT_ALLOWED_TYPES = frozenset({str, int, float, complex, bool, bytes, NoneType})
DEFAULT_ALLOWED_ANNOTATIONS: dict[str, Any] = frozendict(
{
"Union": Union,
"Optional": Optional,
"List": List,
"Dict": Dict,
"Tuple": Tuple,
"Set": Set,
"FrozenSet": FrozenSet,
"Annotated": Annotated,
# builtins:
"int": int,
"str": str,
"float": float,
"bool": bool,
"complex": complex,
"bytes": bytes,
"None": type(None),
"list": list,
"dict": dict,
"set": set,
"frozenset": frozenset,
"tuple": tuple,
}
)
def __init__(self, ann: Any, triggers: tuple[AnnotationTrigger, ...], **kwargs):
if not triggers:
raise RuntimeError("AnnotationWalker requires trigger(s)")
# Normalize triggers into instances
insts: list[AnnotationTrigger] = []
for t in triggers if isinstance(triggers, tuple) else (triggers,):
if isinstance(t, AnnotationTrigger):
insts.append(t)
elif isinstance(t, type) and issubclass(t, AnnotationTrigger):
insts.append(t())
else:
raise RuntimeError(f"Unsupported trigger: {t}")
self._triggers = tuple(insts)
# Allowed types / annotations
atypes = set(type(self).DEFAULT_ALLOWED_TYPES)
if "ex_allowed_types" in kwargs:
atypes.update(kwargs["ex_allowed_types"])
self._allowed_types = frozenset(atypes)
annots = dict(type(self).DEFAULT_ALLOWED_ANNOTATIONS)
if "ex_allowed_annotations" in kwargs:
annots.update(kwargs["ex_allowed_annotations"])
self._allowed_annotations = frozendict(annots)
# Annotation can be string
self.__ann = ann
if isinstance(ann, str):
self.__ann = eval(ann, {"__builtins__": {}}, self._allowed_annotations)
def run(self) -> TriggerResult:
for trigger in self._triggers:
trigger.init_trigger()
return self._walk(self.__ann, None)
# --- Helpers ---
def _new_ctx(self, origin, args, layer, parent):
return AnnotationWalkerCtx(origin, args, layer, parent, self._allowed_types, self._allowed_annotations)
def _apply_triggers(self, method: str, ctx: AnnotationWalkerCtx) -> TriggerResult:
final = TriggerResult.passthrough()
for trig in self._triggers:
res = getattr(trig, method)(ctx)
if not res:
continue
if res.restart_with is not None:
return res # short-circuit on restart
if res.replace_with is not None:
final = TriggerResult.replace(res.replace_with)
if res.skip_children:
final = TriggerResult(
replace_with=final.replace_with,
skip_children=True,
)
return final
def _handle_with_triggers(
self,
trigger_name: str,
ctx: AnnotationWalkerCtx,
args_handler: Callable[[AnnotationWalkerCtx], Any] | None = None,
) -> Any:
"""Generic handler: run triggers, maybe recurse into args with a custom handler."""
res = self._apply_triggers(trigger_name, ctx)
if res.restart_with is not None:
return self._walk(res.restart_with, ctx.parent)
if res.replace_with is not None:
return res.replace_with
if not res.skip_children:
if args_handler:
return args_handler(ctx)
return tuple(self._walk(a, ctx) for a in ctx.args)
return None
def _walk_args_tuple(self, ctx: AnnotationWalkerCtx):
# special Ellipsis case for Tuple
if len(ctx.args) == 2 and ctx.args[1] is Ellipsis:
return (self._walk(ctx.args[0], ctx), Ellipsis)
return tuple(self._walk(a, ctx) for a in ctx.args)
# --- Dispatcher ---
def _walk(self, type_: Any, parent_ctx: Optional[AnnotationWalkerCtx]) -> Any:
print(f"[{parent_ctx.layer if parent_ctx else 0}] walking through: {type_}")
origin = get_origin(type_) or type_
if origin is None:
origin = NoneType
if origin is Union:
origin = UnionType
if not isinstance(origin, type):
raise RuntimeError("Annotation must be using type(s), not instances")
args = get_args(type_)
layer = 0 if parent_ctx is None else parent_ctx.layer + 1
ctx = self._new_ctx(origin, args, layer, parent_ctx)
print(origin)
match origin:
case typing.Annotated:
return self._handle_with_triggers(
"process_annotated", ctx, args_handler=lambda c: self._walk(c.args[0], c) if c.args else None
)
case types.UnionType:
return self._handle_with_triggers("process_union", ctx)
case _ if issubclass(origin, dict):
return self._handle_with_triggers("process_dict", ctx)
case _ if issubclass(origin, tuple):
return self._handle_with_triggers("process_tuple", ctx, self._walk_args_tuple)
case _ if issubclass(origin, list):
return self._handle_with_triggers("process_list", ctx)
case _ if issubclass(origin, set):
return self._handle_with_triggers("process_set", ctx)
case _ if origin in self._allowed_types:
return self._handle_with_triggers("process_allowed", ctx)
case _:
res = self._apply_triggers("process_unknown", ctx)
if res.restart_with is not None:
return self._walk(res.restart_with, ctx.parent)
if res.replace_with is not None:
return res.replace_with
raise UnsupportedFieldType(f"Not supported Field: {ctx.origin}, " f"Supported list: {self._allowed_types}")

View File

@@ -0,0 +1,46 @@
# dabmodel (c) by chacha
#
# dabmodel is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
import unittest
from typing import Annotated, Union, Optional
import sys
import subprocess
from os import chdir, environ
from pathlib import Path
print(__name__)
print(__package__)
from src import dabmodel as dm
testdir_path = Path(__file__).parent.resolve()
chdir(testdir_path.parent.resolve())
class ElementTest(unittest.TestCase):
def setUp(self):
print("\n->", unittest.TestCase.id(self))
def test_element_simple(self):
print(isinstance(None, type(None)))
print("\n== From OBJs ==")
res = dm.tools.AnnotationWalker(Annotated[Optional[dict[int, list[str]]], "comment"], (dm.tools.LAMSchemaValidation(),))
print(f"res={res.run()}")
print("\n== From STRING ==")
res = dm.tools.AnnotationWalker('Annotated[Optional[dict[int, list[str]]], "comment"]', (dm.tools.LAMSchemaValidation(),))
print(f"res={res.run()}")
res = dm.tools.AnnotationWalker(Annotated[Optional[dict[int, list[None]]], "comment"], (dm.tools.LAMSchemaValidation(),))
print(f"res={res.run()}")
# ---------- main ----------
if __name__ == "__main__":
unittest.main()

File diff suppressed because it is too large Load Diff

View File

@@ -7,7 +7,7 @@
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
import unittest
from typing import Annotated, Union
import sys
import subprocess
from os import chdir, environ
@@ -94,7 +94,47 @@ class ElementTest(unittest.TestCase):
self.assertIsInstance(a.elems[1].ar_int2, tuple)
self.assertEqual(a.elems[1].ar_int2, (1, 54, 65))
def test_element_frozen(self):
def test_class_frozen(self):
class E(dm.Element):
ivalue: int = 43
strvalue: str = "test"
fvalue: float = 1.4322
ar_int: list[int] = [1, 54, 65]
with self.assertRaises(dm.ReadOnlyField):
E.ivalue = 3
with self.assertRaises(dm.ReadOnlyField):
E.strvalue = "toto"
with self.assertRaises(dm.ReadOnlyField):
E.fvalue = 3.14
with self.assertRaises(AttributeError):
E.ar_int.append(5)
def test_instance_frozen(self):
class E(dm.Element):
ivalue: int = 43
strvalue: str = "test"
fvalue: float = 1.4322
ar_int: list[int] = [1, 54, 65]
e = E()
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 3
with self.assertRaises(dm.ReadOnlyField):
e.strvalue = "toto"
with self.assertRaises(dm.ReadOnlyField):
e.fvalue = 3.14
with self.assertRaises(AttributeError):
e.ar_int.append(5)
def test_composition_frozen(self):
class E(dm.Element):
ivalue: int = 43
strvalue: str = "test"
@@ -286,6 +326,535 @@ class ElementTest(unittest.TestCase):
self.assertIsInstance(b.elems[1].ar_int2, tuple)
self.assertEqual(b.elems[1].ar_int2, (1, 54, 65))
def test_method(self):
class E(dm.Element):
ivalue: int = 43
def get_increment(self) -> int:
return self.ivalue + 1
def increment(self) -> int:
return type(self)(ivalue=self.ivalue + 1)
class A(dm.Appliance):
elem: E = E(ivalue=45)
a = A()
self.assertIsInstance(a.elem, E)
self.assertEqual(a.elem.ivalue, 45)
self.assertEqual(a.elem.get_increment(), 46)
class B(A):
@classmethod
def __initializer(cls):
cls.elem = cls.elem.increment()
b = B()
self.assertEqual(b.elem.ivalue, 46)
self.assertEqual(b.elem.get_increment(), 47)
def test_initializer_appliance_function_forbidden(self):
def test_fun() -> int:
return 12
class E(dm.Element):
ivalue: int = 43
with self.assertRaises(dm.FunctionForbidden):
class B(dm.Element):
elem: E = E()
@classmethod
def __initializer(cls):
cls.elem.ivalue = test_fun()
with self.assertRaises(dm.FunctionForbidden):
class C(dm.Element):
elem: E = E()
@classmethod
def __initializer(cls):
cls.elem = E(ivalue=test_fun())
def test_mutable_class2_newelement_fails(self):
class E(dm.Element, options=(dm.ClassMutable)):
ivalue: int = 43
with self.assertRaises(dm.NonExistingField):
E.test = 123
e = E()
with self.assertRaises(dm.NonExistingField):
e.test = 123
def test_mutable_class_freeze(self):
class E(dm.Element, options=(dm.ClassMutable)):
ivalue: int = 43
E.ivalue = 12
self.assertEqual(E.ivalue, 12)
E.freeze_class()
self.assertEqual(E.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
E.ivalue = 13
self.assertEqual(E.ivalue, 12)
e = E()
self.assertEqual(e.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 14
self.assertEqual(e.ivalue, 12)
def test_mutable_object_freeze(self):
class E(dm.Element, options=(dm.ObjectMutable)):
ivalue: int = 43
self.assertEqual(E.ivalue, 43)
with self.assertRaises(dm.ReadOnlyField):
E.ivalue = 13
self.assertEqual(E.ivalue, 43)
e = E()
self.assertEqual(e.ivalue, 43)
e.ivalue = 14
self.assertEqual(e.ivalue, 14)
e.freeze()
self.assertEqual(e.ivalue, 14)
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 15
self.assertEqual(e.ivalue, 14)
def test_mutable_object_and_class_freeze(self):
class E(dm.Element, options=(dm.ObjectMutable, dm.ClassMutable)):
ivalue: int = 43
self.assertEqual(E.ivalue, 43)
E.ivalue = 13
self.assertEqual(E.ivalue, 13)
e = E()
self.assertEqual(e.ivalue, 13)
e.ivalue = 14
self.assertEqual(e.ivalue, 14)
e.freeze()
self.assertEqual(e.ivalue, 14)
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 15
self.assertEqual(e.ivalue, 14)
def test_mutable_object_and_class_freeze_2(self):
class E(dm.Element, options=(dm.ObjectMutable, dm.ClassMutable)):
ivalue: int = 43
self.assertFalse(E.__lam_schema__["ivalue"].is_frozen())
E.freeze_class()
self.assertTrue(E.__lam_schema__["ivalue"].is_frozen())
e = E()
self.assertFalse(e.__lam_schema__["ivalue"].is_frozen())
e.freeze()
self.assertTrue(e.__lam_schema__["ivalue"].is_frozen())
def test_mutable_class_freeze_container(self):
class E(dm.Element, options=(dm.ClassMutable)):
ar_ivalue: list[int] = [43]
E.ar_ivalue.append(12)
self.assertEqual(E.ar_ivalue, [43, 12])
E.freeze_class()
self.assertEqual(E.ar_ivalue, (43, 12))
with self.assertRaises(AttributeError):
E.ar_ivalue.append(52)
self.assertEqual(E.ar_ivalue, (43, 12))
e = E()
self.assertEqual(e.ar_ivalue, (43, 12))
with self.assertRaises(AttributeError):
e.ar_ivalue.append(52)
self.assertEqual(e.ar_ivalue, (43, 12))
def test_mutable_object_freeze_container(self):
class E(dm.Element, options=(dm.ObjectMutable)):
ar_ivalue: list[int] = [43, 54]
self.assertEqual(E.ar_ivalue, (43, 54))
with self.assertRaises(AttributeError):
E.ar_ivalue.append(13)
self.assertEqual(E.ar_ivalue, (43, 54))
e = E()
self.assertEqual(e.ar_ivalue, [43, 54])
e.ar_ivalue.append(32)
self.assertEqual(e.ar_ivalue, [43, 54, 32])
e.freeze()
self.assertEqual(e.ar_ivalue, (43, 54, 32))
with self.assertRaises(AttributeError):
e.ar_ivalue.append(12)
self.assertEqual(e.ar_ivalue, (43, 54, 32))
def test_mutable_object_and_class_freeze_container(self):
class E(dm.Element, options=(dm.ObjectMutable, dm.ClassMutable)):
ar_ivalue: list[int] = [43, 54]
self.assertEqual(E.ar_ivalue, [43, 54])
E.ar_ivalue.append(13)
self.assertEqual(E.ar_ivalue, [43, 54, 13])
e = E()
self.assertEqual(e.ar_ivalue, [43, 54, 13])
e.ar_ivalue.append(14)
self.assertEqual(e.ar_ivalue, [43, 54, 13, 14])
e.freeze()
self.assertEqual(e.ar_ivalue, (43, 54, 13, 14))
with self.assertRaises(AttributeError):
e.ar_ivalue.append(15)
self.assertEqual(e.ar_ivalue, (43, 54, 13, 14))
def test_mutable_class_freeze_inheritance(self):
class E(dm.Element, options=(dm.ClassMutable)):
ivalue: int = 12
class E2(E):
pass
self.assertEqual(E2.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
E2.ivalue = 13
self.assertEqual(E2.ivalue, 12)
e = E2()
self.assertEqual(e.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 14
self.assertEqual(e.ivalue, 12)
def test_mutable_object_freeze_inheritance(self):
class E(dm.Element, options=(dm.ObjectMutable)):
ivalue: int = 12
class E2(E):
pass
self.assertEqual(E2.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
E2.ivalue = 13
self.assertEqual(E2.ivalue, 12)
e = E2()
self.assertEqual(e.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 14
self.assertEqual(e.ivalue, 12)
def test_mutable_object_and_class_freeze_inheritance(self):
class E(dm.Element, options=(dm.ObjectMutable, dm.ClassMutable)):
ivalue: int = 12
class E2(E):
pass
self.assertEqual(E2.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
E2.ivalue = 13
self.assertEqual(E2.ivalue, 12)
e = E2()
self.assertEqual(e.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 14
self.assertEqual(e.ivalue, 12)
def test_mutable_object_and_class_freeze_inheritance_noleak(self):
class E(dm.Element, options=(dm.ObjectMutable, dm.ClassMutable)):
ivalue: int = 12
class E2(E):
pass
self.assertEqual(E2.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
E2.ivalue = 13
self.assertEqual(E2.ivalue, 12)
e2 = E2()
self.assertEqual(e2.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
e2.ivalue = 14
self.assertEqual(e2.ivalue, 12)
# no Leak
self.assertEqual(E.ivalue, 12)
E.ivalue = 13
self.assertEqual(E.ivalue, 13)
e = E()
self.assertEqual(e.ivalue, 13)
e.ivalue = 14
self.assertEqual(e.ivalue, 14)
e.freeze()
self.assertEqual(e.ivalue, 14)
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 15
self.assertEqual(e.ivalue, 14)
# no Leak 2
self.assertEqual(E2.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
E2.ivalue = 13
self.assertEqual(E2.ivalue, 12)
e2 = E2()
self.assertEqual(e2.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
e2.ivalue = 14
self.assertEqual(e2.ivalue, 12)
def test_mutable_class_freeze_nested_element(self):
class NElem(dm.Element):
ivalue: int = 43
class E(dm.Element, options=(dm.ClassMutable)):
e: NElem = NElem()
E.e.ivalue = 12
self.assertEqual(E.e.ivalue, 12)
E.freeze_class()
self.assertEqual(E.e.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
E.e.ivalue = 13
self.assertEqual(E.e.ivalue, 12)
e = E()
self.assertEqual(e.e.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
e.e.ivalue = 14
self.assertEqual(e.e.ivalue, 12)
def test_mutable_object_freeze_nested_element(self):
class NElem(dm.Element):
ivalue: int = 43
class E(dm.Element, options=(dm.ObjectMutable)):
e: NElem = NElem()
self.assertEqual(E.e.ivalue, 43)
with self.assertRaises(dm.ReadOnlyField):
E.e.ivalue = 13
self.assertEqual(E.e.ivalue, 43)
e = E()
self.assertEqual(e.e.ivalue, 43)
e.e.ivalue = 14
self.assertEqual(e.e.ivalue, 14)
e.freeze()
self.assertEqual(e.e.ivalue, 14)
with self.assertRaises(dm.ReadOnlyField):
e.e.ivalue = 15
self.assertEqual(e.e.ivalue, 14)
def test_mutable_object_and_class_freeze_nested_element(self):
class NElem(dm.Element):
ivalue: int = 43
class E(dm.Element, options=(dm.ObjectMutable, dm.ClassMutable)):
e: NElem = NElem()
self.assertEqual(E.e.ivalue, 43)
E.e.ivalue = 13
self.assertEqual(E.e.ivalue, 13)
e = E()
self.assertEqual(e.e.ivalue, 13)
e.e.ivalue = 14
self.assertEqual(e.e.ivalue, 14)
e.e.freeze()
self.assertEqual(e.e.ivalue, 14)
with self.assertRaises(dm.ReadOnlyField):
e.e.ivalue = 15
self.assertEqual(e.e.ivalue, 14)
def test_element_clone(self):
class Elem(dm.Element):
ivalue: int = 43
self.assertTrue(Elem.frozen_cls)
self.assertFalse(Elem.mutable_obj)
e = Elem()
print(e)
new_obj1 = e.clone_as_mutable_variant()
self.assertFalse(new_obj1.frozen_cls)
self.assertTrue(new_obj1.mutable_obj)
self.assertIsInstance(new_obj1, Elem)
self.assertEqual(new_obj1.ivalue, 43)
new_obj1.ivalue = 55
self.assertEqual(new_obj1.ivalue, 55)
new_obj2 = e.clone_as_mutable_variant()
self.assertFalse(new_obj2.frozen_cls)
self.assertTrue(new_obj2.mutable_obj)
self.assertNotEqual(new_obj1, new_obj2)
self.assertEqual(type(new_obj1), type(new_obj2))
self.assertEqual(new_obj2.ivalue, 43)
new_obj2.ivalue = 56
self.assertEqual(new_obj2.ivalue, 56)
new_obj3 = e.clone_as_mutable_variant()
self.assertFalse(new_obj3.frozen_cls)
self.assertTrue(new_obj3.mutable_obj)
self.assertNotEqual(new_obj1, new_obj3)
self.assertNotEqual(new_obj2, new_obj3)
self.assertEqual(type(new_obj1), type(new_obj3))
self.assertEqual(type(new_obj2), type(new_obj3))
self.assertEqual(new_obj3.ivalue, 43)
new_obj3.ivalue = 57
self.assertEqual(new_obj3.ivalue, 57)
self.assertEqual(e.ivalue, 43)
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 15
self.assertEqual(e.ivalue, 43)
self.assertEqual(new_obj1.ivalue, 55)
new_obj1.freeze()
self.assertEqual(new_obj1.ivalue, 55)
with self.assertRaises(dm.ReadOnlyField):
new_obj1.ivalue = 15
self.assertEqual(new_obj1.ivalue, 55)
self.assertEqual(new_obj2.ivalue, 56)
self.assertEqual(new_obj3.ivalue, 57)
new_obj2.ivalue = 76
self.assertEqual(new_obj2.ivalue, 76)
self.assertEqual(new_obj3.ivalue, 57)
self.assertEqual(new_obj1.ivalue, 55)
new_obj2.freeze()
self.assertEqual(new_obj2.ivalue, 76)
with self.assertRaises(dm.ReadOnlyField):
new_obj2.ivalue = 15
self.assertEqual(new_obj2.ivalue, 76)
self.assertTrue(e.frozen)
self.assertTrue(new_obj1.frozen)
self.assertTrue(new_obj2.frozen)
self.assertFalse(new_obj3.frozen)
def test_element_clone_inheritance(self):
class ElemA(dm.Element):
ivalue: int = 43
class ElemB(ElemA):
ivalue = 18
elemA = ElemA()
elemB = ElemB()
_elemA = elemA.clone_as_mutable_variant()
_elemB = elemB.clone_as_mutable_variant()
self.assertIsInstance(elemB, type(elemA))
self.assertIsInstance(_elemA, type(elemA))
self.assertIsInstance(_elemB, type(elemB))
self.assertIsInstance(_elemB, type(_elemA))
def test_element_clone_inheritance_nested(self):
class ElemAInner(dm.Element):
fvalue: float = 0.123
class ElemA(dm.Element):
ivalue: int = 43
el1: ElemAInner = ElemAInner()
class ElemBInner(ElemAInner):
fvalue = 0.9
class ElemB(ElemA):
ivalue = 18
el1 = ElemAInner(fvalue=6.54)
el2: ElemBInner = ElemBInner()
self.assertIsInstance(ElemA.el1, ElemAInner)
self.assertIsInstance(ElemB.el1, ElemAInner)
self.assertIsInstance(ElemB.el2, ElemAInner)
self.assertIsInstance(ElemB.el2, ElemBInner)
self.assertNotIsInstance(ElemB.el1, ElemBInner)
elemA = ElemA()
elemB = ElemB()
_elemA = elemA.clone_as_mutable_variant()
_elemB = elemB.clone_as_mutable_variant()
self.assertIsInstance(elemB, type(elemA))
self.assertIsInstance(_elemA, type(elemA))
self.assertIsInstance(_elemB, type(elemB))
self.assertIsInstance(_elemB, type(_elemA))
self.assertIsInstance(_elemB.el1, type(_elemA.el1))
self.assertIsInstance(_elemA.el1, type(elemA.el1))
self.assertIsInstance(_elemB.el1, type(elemB.el1))
self.assertIsInstance(_elemB.el1, type(_elemA.el1))
self.assertIsInstance(_elemB.el2, type(_elemA.el1))
self.assertIsInstance(_elemB.el2, type(elemB.el1))
self.assertIsInstance(_elemB.el2, type(_elemA.el1))
def test_element_mutable_invalid_instance_override_container(self):
class Elem(dm.Element, options=(dm.ClassMutable, dm.ObjectMutable)):
val: list[int] = []
with self.assertRaises(dm.InvalidFieldValue):
Elem(val=["test"])
def test_element_mutable_invalid_instance_override_container_nested(self):
class ElemInner(dm.Element, options=(dm.ClassMutable, dm.ObjectMutable)):
val: list[int] = []
class ElemOuter(dm.Element, options=(dm.ClassMutable, dm.ObjectMutable)):
inner: ElemInner = ElemInner()
# with self.assertRaises(dm.InvalidFieldValue):
elem = ElemOuter(inner={"val": [1, 2, 3]})
self.assertEqual(elem.inner.val, [1, 2, 3])
def test_class_protocol(self):
self.assertIsInstance(dm.base_element.BaseElement, dm.interfaces.FreezableElement)
def test_wrong_annotated(self):
with self.assertRaises(dm.UnsupportedFieldType):
class Elem(dm.Appliance):
StrVar: list[Annotated[str, dm.LAMFieldInfo(doc="foo1")]] = ["default value"]
def test_wrong_annotation_union(self):
with self.assertRaises(dm.UnsupportedFieldType):
class Elem(dm.Appliance):
StrVar: str | int = "test"
with self.assertRaises(dm.UnsupportedFieldType):
class Elem(dm.Appliance):
StrVar: list[str | int] = "test"
with self.assertRaises(dm.UnsupportedFieldType):
class Elem(dm.Appliance):
StrVar: list[None | int] = "test"
def test_annotation_union(self):
class Elem(dm.Appliance):
strvar1: str | None = "test"
strvar1: str | None = None
ivar1: Union[int, None] = 12
ivar2: Union[int, None] = None
# ---------- main ----------

View File

@@ -35,15 +35,13 @@ class FeatureTest(unittest.TestCase):
def setUp(self):
print("\n->", unittest.TestCase.id(self))
def immutable_vars__test_field(
self, obj: Any, name: str, default_value: Any, test_value: Any
):
def immutable_vars__test_field(self, obj: Any, name: str, default_value: Any, test_value: Any):
# field is not in the class
self.assertNotIn(name, dir(obj.__class__))
# field is in the object
self.assertIn(name, dir(obj))
# field is in the schema
self.assertIn(name, obj.__LAMSchema__.keys())
self.assertIn(name, obj.__lam_schema__.keys())
# field is readable
self.assertEqual(getattr(obj, name), default_value)
# field is read only
@@ -62,20 +60,20 @@ class FeatureTest(unittest.TestCase):
app1 = Appliance1()
self.assertIsInstance(Appliance1.__LAMSchema__["VarStrOuter"], dm.LAMField)
self.assertIsInstance(app1.__LAMSchema__["VarStrOuter"], dm.FrozenLAMField)
self.assertIn("Feature1", app1.__LAMSchema__["features"])
self.assertIn(
"VarStrInner", app1.__LAMSchema__["features"]["Feature1"].__LAMSchema__
)
self.assertIsInstance(Appliance1.__lam_schema__["VarStrOuter"], dm.LAMField)
self.assertTrue(app1.__lam_schema__["VarStrOuter"].is_frozen())
self.assertIn("Feature1", app1.__lam_schema__["features"])
self.assertIn("VarStrInner", app1.__lam_schema__["features"]["Feature1"].__lam_schema__)
self.assertIsInstance(
app1.__LAMSchema__["features"]["Feature1"].__LAMSchema__["VarStrInner"],
app1.__lam_schema__["features"]["Feature1"].__lam_schema__["VarStrInner"],
dm.LAMField,
)
self.assertTrue(hasattr(app1, "Feature1"))
self.assertIsInstance(
app1.Feature1.__LAMSchema__["VarStrInner"], dm.FrozenLAMField
)
self.assertTrue(app1.Feature1.frozen)
print(app1)
print(app1.Feature1)
print(app1.Feature1.__lam_schema__["VarStrInner"])
self.assertTrue(app1.Feature1.__lam_schema__["VarStrInner"].is_frozen())
self.assertTrue(hasattr(app1.Feature1, "VarStrInner"))
def test_inheritance(self):
@@ -114,20 +112,16 @@ class FeatureTest(unittest.TestCase):
app2 = Appliance2()
app3 = Appliance3()
self.assertIsInstance(Appliance1.__LAMSchema__["VarStrOuter"], dm.LAMField)
self.assertIsInstance(app1.__LAMSchema__["VarStrOuter"], dm.FrozenLAMField)
self.assertIn("Feature1", app1.__LAMSchema__["features"])
self.assertIn(
"VarStrInner", app1.__LAMSchema__["features"]["Feature1"].__LAMSchema__
)
self.assertIsInstance(Appliance1.__lam_schema__["VarStrOuter"], dm.LAMField)
self.assertTrue(app1.__lam_schema__["VarStrOuter"].is_frozen())
self.assertIn("Feature1", app1.__lam_schema__["features"])
self.assertIn("VarStrInner", app1.__lam_schema__["features"]["Feature1"].__lam_schema__)
self.assertIsInstance(
app1.__LAMSchema__["features"]["Feature1"].__LAMSchema__["VarStrInner"],
app1.__lam_schema__["features"]["Feature1"].__lam_schema__["VarStrInner"],
dm.LAMField,
)
self.assertTrue(hasattr(app1, "Feature1"))
self.assertIsInstance(
app1.Feature1.__LAMSchema__["VarStrInner"], dm.FrozenLAMField
)
self.assertTrue(app1.Feature1.__lam_schema__["VarStrInner"].is_frozen())
self.assertTrue(hasattr(app1.Feature1, "VarStrInner"))
self.assertEqual(app1.VarStrOuter, "testvalue APPLIANCE1")
self.assertEqual(app1.Feature1.VarStrInner, "testvalue FEATURE1")
@@ -139,6 +133,12 @@ class FeatureTest(unittest.TestCase):
self.assertEqual(app3.Feature1.VarInt, 42)
self.assertEqual(app3.Feature3.VarStrInner, "testvalue FEATURE3")
class Appliance4(Appliance3):
VarStrOuter = "testvalue APPLIANCE4"
class Feature1(Appliance3.Feature1):
VarStrInner = "testvalue FEATURE1 modded 3"
def test_inheritance2(self):
"""Testing first appliance feature, and Field types (simple)"""
@@ -188,57 +188,6 @@ class FeatureTest(unittest.TestCase):
self.assertEqual(app3.Feature1.VarStrInner, "testvalue FEATURE1")
self.assertEqual(app4.Feature1.VarStrInner, "testvalue FEATURE4")
def test_register(self):
"""Testing first appliance feature, and Field types (simple)"""
# class can be created
class Appliance1(dm.Appliance):
pass
class Feature1(dm.Feature):
_BoundAppliance = Appliance1
VarStrInner: str = "testvalue FEATURE1"
app = Appliance1(feat1=Feature1)
self.assertEqual(app.feat1.VarStrInner, "testvalue FEATURE1")
# check it does not leak accross instances
app = Appliance1(feat2=Feature1)
self.assertEqual(app.feat2.VarStrInner, "testvalue FEATURE1")
with self.assertRaises(AttributeError):
app.feat1
def test_register_notbound(self):
"""Testing first appliance feature, and Field types (simple)"""
# class can be created
class Appliance1(dm.Appliance):
pass
class Feature1(dm.Feature):
VarStrInner: str = "testvalue FEATURE1"
with self.assertRaises(dm.FeatureNotBound):
Appliance1(feat1=Feature1)
def test_register_defect(self):
class Feature1(dm.Feature):
pass
with self.assertRaises(dm.FeatureNotBound):
Feature1()
def test_new_field_forbidden(self):
class App(dm.Appliance):
x: int = 1
app = App()
with self.assertRaises(dm.NewFieldForbidden):
app.y = 2
def test_inherit_declared(self):
class App(dm.Appliance):
class F1(dm.Feature):
@@ -294,17 +243,6 @@ class FeatureTest(unittest.TestCase):
self.assertEqual(app.F1.val, 2)
self.assertEqual(app.F1.extra, "hello")
def test_not_bound_runtime_attach_fails(self):
class App(dm.Appliance):
pass
class UnboundFeature(dm.Feature):
x: int = 1
# attaching an unbound feature should raise
with self.assertRaises(dm.FeatureNotBound):
App(Unbound=UnboundFeature)
def test_override_does_not_leak_between_instances(self):
class App(dm.Appliance):
class F1(dm.Feature):
@@ -381,19 +319,6 @@ class FeatureTest(unittest.TestCase):
class Appliance8(dm.Appliance):
SomeVar: dict[str, object] = {}
def test_runtime_attach_bound_success(self):
class App(dm.Appliance):
class F1(dm.Feature):
val: int = 1
class Extra(App.F1): # stays bound to App
val = 7
app = App(Extra=Extra)
self.assertTrue(hasattr(app, "Extra"))
self.assertIsInstance(app.Extra, Extra)
self.assertEqual(app.Extra.val, 7)
def test_cant_override_inherited_annotation(self):
class App(dm.Appliance):
class F1(dm.Feature):
@@ -498,9 +423,7 @@ class FeatureTest(unittest.TestCase):
c_ok = C(Feat1=CFeat1Plus)
self.assertIsInstance(c_ok.Feat1, CFeat1Plus)
self.assertEqual(
(c_ok.Feat1.x, c_ok.Feat1.y, c_ok.Feat1.z, c_ok.Feat1.w), (1, 2, 3, 4)
)
self.assertEqual((c_ok.Feat1.x, c_ok.Feat1.y, c_ok.Feat1.z, c_ok.Feat1.w), (1, 2, 3, 4))
# ❌ Not OK: replacing with a subclass of the ancestor (A.Feat1) — must target latest (C.Feat1)
class AFeat1Alt(A.Feat1):
@@ -582,6 +505,8 @@ class FeatureTest(unittest.TestCase):
# ✅ Instances of B use B.F1
b = B()
self.assertIsInstance(b.F1, B.F1)
print(b.F1)
print(dir(b.F1))
self.assertEqual((b.F1.a, b.F1.b), (1, 2))
self.assertFalse(hasattr(b.F1, "c"))
@@ -723,6 +648,181 @@ class FeatureTest(unittest.TestCase):
class F1(dm.Feature):
fail: str = "oops"
def test_initializer(self):
class App(dm.Appliance):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
@classmethod
def __initializer(cls):
cls.F.tag = "test"
cls.F.nums.append(3)
self.assertEqual(App.F.tag, "test")
self.assertEqual(App.F.nums, (1, 2, 3))
def test_initializer_nested(self):
class App(dm.Appliance):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
@classmethod
def __initializer(cls):
cls.tag = "test"
cls.nums.append(3)
self.assertEqual(App.F.tag, "test")
self.assertEqual(App.F.nums, (1, 2, 3))
def test_initializer_nested_dual(self):
class App(dm.Appliance):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
@classmethod
def __initializer(cls):
cls.tag = "test1"
cls.nums.append(3)
@classmethod
def __initializer(cls):
cls.F.tag = "test2"
cls.F.nums.append(4)
self.assertEqual(App.F.tag, "test2")
self.assertEqual(App.F.nums, (1, 2, 3, 4))
def test_container_frozen(self):
class App(dm.Appliance):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
with self.assertRaises(AttributeError):
App.F.nums.append(3)
def test_container_class_mutable_validation(self):
class App(dm.Appliance, options=(dm.ClassMutable)):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
App.F.nums.append("test")
with self.assertRaises(dm.InvalidFieldValue):
App.freeze_class()
with self.assertRaises(dm.InvalidFieldValue):
a = App()
def test_container_object_mutable_validation(self):
class App(dm.Appliance, options=(dm.ObjectMutable)):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
a = App()
a.F.nums.append("test")
with self.assertRaises(dm.InvalidFieldValue):
a.freeze()
def test_container_object_and_class_mutable_validation(self):
class App(dm.Appliance, options=(dm.ClassMutable, dm.ObjectMutable)):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
App.F.nums.append("test")
with self.assertRaises(dm.InvalidFieldValue):
a = App()
with self.assertRaises(dm.InvalidFieldValue):
App.freeze_class()
def test_nested_element_container_object_and_class_mutable(self):
class E(dm.Element):
val: str = "testelem"
i: list[int] = [1]
class App(dm.Appliance, options=(dm.ClassMutable, dm.ObjectMutable)):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
e: E = E(val="modified")
App.F.e.i.append(21)
self.assertEquals(App.F.e.i, [1, 21])
App.freeze_class()
with self.assertRaises(AttributeError):
App.F.e.i.append(22)
a = App()
self.assertEquals(a.F.e.i, [1, 21])
a.F.e.i.append(28)
self.assertEquals(a.F.e.i, [1, 21, 28])
a.freeze()
self.assertEquals(a.F.e.i, (1, 21, 28))
with self.assertRaises(AttributeError):
a.F.e.i.append(23)
self.assertEquals(a.F.e.i, (1, 21, 28))
def test_nested_element_container_object_and_class_mutable_validation(self):
class E(dm.Element):
val: str = "testelem"
i: list[int] = [1]
class App(dm.Appliance, options=(dm.ClassMutable, dm.ObjectMutable)):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
e: E = E(val="modified")
App.F.e.i.append("test")
with self.assertRaises(dm.InvalidFieldValue):
a = App()
def test_nested_element_container_object_and_class_mutable_validation2(self):
class E(dm.Element):
val: str = "testelem"
i: list[int] = [1]
class App(dm.Appliance, options=(dm.ClassMutable, dm.ObjectMutable)):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
e: E = E(val="modified")
a = App()
a.F.e.i.append("test")
with self.assertRaises(dm.InvalidFieldValue):
a.freeze()
# ---------- main ----------

166
test/test_feature_bind.py Normal file
View File

@@ -0,0 +1,166 @@
# dabmodel (c) by chacha
#
# dabmodel is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
import unittest
from os import chdir
from pathlib import Path
from typing import (
Any,
Annotated,
)
from dabmodel.appliance import Appliance
print(__name__)
print(__package__)
from src import dabmodel as dm
testdir_path = Path(__file__).parent.resolve()
chdir(testdir_path.parent.resolve())
def test_initializer_safe_testfc():
eval("print('hi')")
class FeatureBindTest(unittest.TestCase):
def setUp(self):
print("\n->", unittest.TestCase.id(self))
def test_simple(self):
"""Testing first appliance feature, and Field types (simple)"""
# class can be created
class Appliance1(dm.Appliance):
VarStrOuter: str = "testvalue APPLIANCE"
class Feature1(dm.Feature):
VarStrInner: str = "testvalue FEATURE"
app1 = Appliance1()
self.assertIn("Feature1", app1.__lam_schema__["features"])
self.assertTrue(hasattr(app1, "Feature1"))
def test_outside_bind(self):
"""Testing first appliance feature, and Field types (simple)"""
# class can be created
class Appliance1(dm.Appliance):
pass
class Feature1(dm.Feature, appliance=Appliance1):
VarStrInner: str = "testvalue FEATURE1"
app = Appliance1(feat1=Feature1)
self.assertEqual(app.feat1.VarStrInner, "testvalue FEATURE1")
# check it does not leak accross instances
app = Appliance1(feat2=Feature1)
self.assertEqual(app.feat2.VarStrInner, "testvalue FEATURE1")
with self.assertRaises(AttributeError):
app.feat1
def test_outside_bind2(self):
"""Testing first appliance feature, and Field types (simple)"""
# class can be created
class Appliance1(dm.Appliance):
pass
class Feature1(dm.Feature):
VarStrInner: str = "testvalue FEATURE1"
Feature1.bind_appliance(Appliance1)
app = Appliance1(feat1=Feature1)
self.assertEqual(app.feat1.VarStrInner, "testvalue FEATURE1")
# check it does not leak accross instances
app = Appliance1(feat2=Feature1)
self.assertEqual(app.feat2.VarStrInner, "testvalue FEATURE1")
with self.assertRaises(AttributeError):
app.feat1
def test_bind_inheritance_no_leak(self):
"""Testing first appliance feature, and Field types (simple)"""
# class can be created
class Appliance1(dm.Appliance):
pass
class Feature1(dm.Feature):
VarStrInner: str = "testvalue FEATURE1"
class Feature2(Feature1, appliance=Appliance1):
VarStrInner = "testvalue FEATURE2"
app = Appliance1(feat=Feature2)
self.assertEqual(app.feat.VarStrInner, "testvalue FEATURE2")
with self.assertRaises(dm.FeatureNotBound):
app = Appliance1(feat=Feature1)
def test_bind_notbound(self):
"""Testing first appliance feature, and Field types (simple)"""
# class can be created
class Appliance1(dm.Appliance):
pass
class Feature1(dm.Feature):
VarStrInner: str = "testvalue FEATURE1"
with self.assertRaises(dm.FeatureNotBound):
Appliance1(feat1=Feature1)
def test_bind_defect(self):
class Feature1(dm.Feature):
pass
with self.assertRaises(dm.FeatureNotBound):
Feature1()
def test_not_bound_runtime_attach_fails(self):
class App(dm.Appliance):
pass
class UnboundFeature(dm.Feature):
x: int = 1
# attaching an unbound feature should raise
with self.assertRaises(dm.FeatureNotBound):
App(Unbound=UnboundFeature)
def test_runtime_attach_bound_success(self):
class App(dm.Appliance):
class F1(dm.Feature):
val: int = 1
class Extra(App.F1): # stays bound to App
val = 7
app = App(Extra=Extra)
self.assertTrue(hasattr(app, "Extra"))
self.assertIsInstance(app.Extra, Extra)
self.assertEqual(app.Extra.val, 7)
# ---------- main ----------
if __name__ == "__main__":
unittest.main()