Compare commits
3 Commits
0.0.1.post
...
0.0.1.post
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6efd914de1 | ||
|
|
2e81b3f0e6 | ||
|
|
5d206ef266 |
@@ -37,22 +37,35 @@ class Appliance(IAppliance, metaclass=_MetaAppliance):
|
||||
return
|
||||
super()._validate_unknown_field_schema(name)
|
||||
|
||||
def _freeze_unknown_attr(self, name: str):
|
||||
print(f"??????????????? {name}")
|
||||
def _freeze_unknown_attr(self, name: str, force: bool = False):
|
||||
if isinstance(self.__dict__[name], Feature):
|
||||
self.__dict__[name].freeze()
|
||||
self.__dict__[name].freeze(force)
|
||||
return
|
||||
super()._freeze_unknown_attr(name)
|
||||
super()._freeze_unknown_attr(name, force)
|
||||
|
||||
def _freeze_missing_attr(self, name: str):
|
||||
def _freeze_missing_attr(self, name: str, force: bool = False):
|
||||
if name == "features":
|
||||
for k, v in self.__lam_schema__["features"].items():
|
||||
v.freeze_class(force)
|
||||
return
|
||||
super()._freeze_missing_attr(name)
|
||||
super()._freeze_missing_attr(name, force)
|
||||
|
||||
@classmethod
|
||||
def _freeze_unknown_field_schema(cls, name: str):
|
||||
def _freeze_unknown_field_schema(cls, name: str, force: bool = False):
|
||||
if name == "features":
|
||||
for v in cls.__lam_schema__["features"].values():
|
||||
v.freeze_class()
|
||||
v.freeze_class(force)
|
||||
return
|
||||
super()._freeze_unknown_field_schema(name)
|
||||
super()._freeze_unknown_field_schema(name, force)
|
||||
|
||||
@classmethod
|
||||
def _validate_unknown_attr_class(cls, name: str) -> None:
|
||||
if issubclass(cls.__dict__[name], Feature):
|
||||
return
|
||||
super()._validate_unknown_attr_class(name)
|
||||
|
||||
@classmethod
|
||||
def _freeze_unknown_attr_class(cls, name: str, force: bool = False) -> None:
|
||||
if issubclass(cls.__dict__[name], Feature):
|
||||
return
|
||||
super()._freeze_unknown_attr_class(name, force)
|
||||
|
||||
@@ -3,7 +3,6 @@ from typing import Any, Self, Dict, Optional
|
||||
from typeguard import check_type, CollectionCheckStrategy, TypeCheckError
|
||||
from copy import deepcopy
|
||||
|
||||
# from .LAMFields.FrozenLAMField import FrozenLAMField
|
||||
from .lam_field.lam_field import LAMField, LAMField_Element
|
||||
from .exception import ReadOnlyField, SchemaViolation, NonExistingField, InvalidFieldValue
|
||||
from .tools import LAMdeepfreeze, is_data_attribute
|
||||
@@ -129,9 +128,7 @@ class BaseElement:
|
||||
return ElementViewCls(init_fieldvalues, init_fieldtypes, cls.__name__, cls.__modules__)
|
||||
"""
|
||||
|
||||
def clone_as_mutable_variant(
|
||||
self, *, deep: bool = True, _memo: Dict[int, Self] | None = None
|
||||
) -> Self:
|
||||
def clone_as_mutable_variant(self, *, deep: bool = True, _memo: Dict[int, Self] | None = None) -> Self:
|
||||
raise NotImplemented()
|
||||
|
||||
@classmethod
|
||||
@@ -163,7 +160,6 @@ class BaseElement:
|
||||
return super().__setattr__(key, value)
|
||||
|
||||
def freeze(self, force: bool = False) -> None:
|
||||
print("Freezing instance")
|
||||
if self.__lam_object_mutable__ or force:
|
||||
if self.__lam_object_mutable__:
|
||||
self.validate_schema()
|
||||
@@ -172,27 +168,27 @@ class BaseElement:
|
||||
setInstanceKeys = {_[0] for _ in self.__dict__.items() if is_data_attribute(_[0], _[1])}
|
||||
|
||||
for k_unknown in setInstanceKeys - setSchemaKeys:
|
||||
self._freeze_unknown_attr(k_unknown)
|
||||
self._freeze_unknown_attr(k_unknown, force)
|
||||
|
||||
for k_missing in setSchemaKeys - setInstanceKeys:
|
||||
self._freeze_missing_attr(k_missing)
|
||||
self._freeze_missing_attr(k_missing, force)
|
||||
|
||||
for k in list(setSchemaKeys & setInstanceKeys):
|
||||
self.__lam_schema__[k].freeze()
|
||||
if isinstance(self.__dict__[k], BaseElement):
|
||||
self.__dict__[k].freeze()
|
||||
self.__dict__[k].freeze(force)
|
||||
else:
|
||||
object.__setattr__(self, k, LAMdeepfreeze(self.__dict__[k]))
|
||||
self.__dict__[k] = LAMdeepfreeze(self.__dict__[k])
|
||||
|
||||
self.__lam_object_mutable__ = False
|
||||
|
||||
def _freeze_unknown_attr(self, name: str) -> None:
|
||||
def _freeze_unknown_attr(self, name: str, force: bool = False) -> None:
|
||||
raise SchemaViolation(f"Attribute <{name}> is not in the schema")
|
||||
|
||||
def _freeze_missing_attr(self, name: str) -> None:
|
||||
def _freeze_missing_attr(self, name: str, force: bool = False) -> None:
|
||||
raise SchemaViolation(f"Attribute <{name}> is missing from instance")
|
||||
|
||||
def validate_schema(self) -> None:
|
||||
print("Validating instance schema")
|
||||
setSchemaKeys = set(self.__lam_schema__)
|
||||
setInstanceKeys = {_[0] for _ in self.__dict__.items() if is_data_attribute(_[0], _[1])}
|
||||
|
||||
@@ -213,33 +209,31 @@ class BaseElement:
|
||||
|
||||
@classmethod
|
||||
def freeze_class(cls, force: bool = False) -> None:
|
||||
print("Freezing class")
|
||||
if cls.__lam_class_mutable__ or force:
|
||||
cls.validate_schema_class()
|
||||
|
||||
# class should not have any elements so they are all unknown
|
||||
for k_unknown in {_[0] for _ in cls.__dict__.items() if is_data_attribute(_[0], _[1])}:
|
||||
cls._freeze_unknown_attr_class(k_unknown)
|
||||
cls._freeze_unknown_attr_class(k_unknown, force)
|
||||
|
||||
for k, v in cls.__lam_schema__.items():
|
||||
if isinstance(v, LAMField):
|
||||
cls.__lam_schema__[k].freeze()
|
||||
else:
|
||||
cls._freeze_unknown_field_schema(k)
|
||||
cls._freeze_unknown_field_schema(k, force)
|
||||
|
||||
cls.__lam_class_mutable__ = False
|
||||
|
||||
@classmethod
|
||||
def _freeze_unknown_attr_class(cls, name: str) -> None:
|
||||
def _freeze_unknown_attr_class(cls, name: str, force: bool = False) -> None:
|
||||
raise SchemaViolation(f"Class attribute <{name}> is not in the schema")
|
||||
|
||||
@classmethod
|
||||
def _freeze_unknown_field_schema(cls, name: str) -> None:
|
||||
def _freeze_unknown_field_schema(cls, name: str, force: bool = False) -> None:
|
||||
raise SchemaViolation(f"Unknown field <{name} in the schema> ")
|
||||
|
||||
@classmethod
|
||||
def validate_schema_class(cls) -> None:
|
||||
print("Valisating class schema")
|
||||
# class should not have any elements so they are all unknown
|
||||
for k_unknown in {_[0] for _ in cls.__dict__.items() if is_data_attribute(_[0], _[1])}:
|
||||
cls._validate_unknown_attr_class(k_unknown)
|
||||
|
||||
@@ -2,14 +2,14 @@ from typing import Any, Union, Optional, List, Dict, Tuple, Set, FrozenSet, Anno
|
||||
from types import SimpleNamespace
|
||||
import math
|
||||
|
||||
ALLOWED_MODEL_FIELDS_TYPES: tuple[type[Any], ...] = (
|
||||
ALLOWED_MODEL_FIELDS_TYPES: set[type[Any], ...] = {
|
||||
str,
|
||||
int,
|
||||
float,
|
||||
complex,
|
||||
bool,
|
||||
bytes,
|
||||
)
|
||||
}
|
||||
|
||||
ALLOWED_ANNOTATIONS: dict[str, Any] = {
|
||||
"Union": Union,
|
||||
|
||||
@@ -80,30 +80,30 @@ class ReadOnlyFieldAnnotation(AttributeError, DABModelException):
|
||||
"""
|
||||
|
||||
|
||||
class InvalidFieldValue(AttributeError, DABModelException):
|
||||
class SchemaViolation(AttributeError, DABModelException):
|
||||
"""SchemaViolation Exception class
|
||||
The Element Schema is not respected
|
||||
"""
|
||||
|
||||
|
||||
class InvalidFieldValue(SchemaViolation):
|
||||
"""InvalidFieldValue Exception class
|
||||
The Field value is invalid
|
||||
"""
|
||||
|
||||
|
||||
class NonExistingField(SchemaViolation):
|
||||
"""NonExistingField Exception class
|
||||
The given Field is non existing
|
||||
"""
|
||||
|
||||
|
||||
class InvalidFieldName(AttributeError, DABModelException):
|
||||
"""InvalidFieldName Exception class
|
||||
The Field name is invalid
|
||||
"""
|
||||
|
||||
|
||||
class NonExistingField(AttributeError, DABModelException):
|
||||
"""NonExistingField Exception class
|
||||
The given Field is non existing
|
||||
"""
|
||||
|
||||
|
||||
class SchemaViolation(AttributeError, DABModelException):
|
||||
"""SchemaViolation Exception class
|
||||
The Element Schema is not respected
|
||||
"""
|
||||
|
||||
|
||||
class ImportForbidden(DABModelException):
|
||||
"""ImportForbidden Exception class
|
||||
Imports are forbidden
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from typing import Generic, TypeVar, Dict, Protocol, runtime_checkable
|
||||
from typing import Generic, TypeVar, Dict, Protocol, runtime_checkable, Self
|
||||
|
||||
|
||||
TV_Freezable = TypeVar("TV_Freezable")
|
||||
@@ -7,9 +7,7 @@ TV_Freezable = TypeVar("TV_Freezable")
|
||||
@runtime_checkable
|
||||
class FreezableElement(Protocol, Generic[TV_Freezable]):
|
||||
|
||||
def clone_as_mutable_variant(
|
||||
self, *, deep: bool = True, _memo: Dict[int, TV_Freezable] | None = None
|
||||
) -> TV_Freezable:
|
||||
def clone_as_mutable_variant(self, *, deep: bool = True, _memo: Dict[int, Self] | None = None) -> Self:
|
||||
pass
|
||||
|
||||
def freeze(self, force: bool = False) -> None:
|
||||
@@ -23,7 +21,7 @@ class FreezableElement(Protocol, Generic[TV_Freezable]):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def validate_schema_class(self) -> None:
|
||||
def validate_schema_class(cls) -> None:
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from typing import Generic, TypeVar, Optional, Any, Self
|
||||
from typing import Generic, TypeVar, Optional, Any, Self, Annotated, get_origin, get_args
|
||||
from typeguard import check_type, CollectionCheckStrategy, TypeCheckError
|
||||
from copy import deepcopy
|
||||
from .lam_field_info import LAMFieldInfo
|
||||
@@ -13,19 +13,29 @@ TV_LABField = TypeVar("TV_LABField")
|
||||
class LAMField(Generic[TV_LABField]):
|
||||
"""This class describe a Field in Schema"""
|
||||
|
||||
def __init__(self, name: str, val: Optional[TV_LABField], a: Any, i: LAMFieldInfo):
|
||||
def __init__(self, name: str, val: Optional[TV_LABField], ann: Any, i: LAMFieldInfo):
|
||||
self._default_value: Optional[TV_LABField]
|
||||
self._value: Optional[TV_LABField]
|
||||
self.__annotations: Any
|
||||
self.__name: str = name
|
||||
self.__source: Optional[type] = None
|
||||
self.__info: LAMFieldInfo = deepcopy(i)
|
||||
self.__annotations: Any = LAMdeepfreeze(a)
|
||||
self._set_annotations(ann)
|
||||
self.__frozen: bool = False
|
||||
self._frozen_value: Any = None
|
||||
self.__frozen_value_set: True = False
|
||||
self.validate(val)
|
||||
self._init_value(val)
|
||||
|
||||
def _set_annotations(self, ann: Any) -> None:
|
||||
_origin = get_origin(ann) or ann
|
||||
_args = get_args(ann)
|
||||
|
||||
if _origin is Annotated:
|
||||
self.__annotations: Any = LAMdeepfreeze(_args[0])
|
||||
else:
|
||||
self.__annotations: Any = LAMdeepfreeze(ann)
|
||||
|
||||
def _init_value(self, val: Optional[TV_LABField | FreezableElement]):
|
||||
self._default_value: Optional[TV_LABField] = deepcopy(val)
|
||||
self._value: Optional[TV_LABField] = val
|
||||
@@ -38,11 +48,10 @@ class LAMField(Generic[TV_LABField]):
|
||||
return self.__frozen
|
||||
|
||||
def freeze(self):
|
||||
print(f"Freezing field : {self.name}")
|
||||
self.__frozen = True
|
||||
|
||||
def clone_unfrozen(self) -> Self:
|
||||
field = LAMField(self.__name, self._default_value, self.__annotations, self.__info)
|
||||
field = LAMFieldFactory.create_field(self.__name, self._default_value, self.__annotations, self.__info)
|
||||
field.update_value(self._value)
|
||||
return field
|
||||
|
||||
@@ -80,9 +89,7 @@ class LAMField(Generic[TV_LABField]):
|
||||
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
|
||||
)
|
||||
except TypeCheckError as exp:
|
||||
raise InvalidFieldValue(
|
||||
f"Value of Field <{self.__name}> is not of expected type {self.annotations}."
|
||||
) from exp
|
||||
raise InvalidFieldValue(f"Value of Field <{self.__name}> is not of expected type {self.annotations}.") from exp
|
||||
|
||||
@property
|
||||
def default_value(self) -> Any:
|
||||
@@ -160,11 +167,10 @@ class LAMField_Element(LAMField[FreezableElement]):
|
||||
|
||||
class LAMFieldFactory:
|
||||
@staticmethod
|
||||
def create_field(
|
||||
name: str, val: Optional[TV_LABField], anno: Any, info: LAMFieldInfo
|
||||
) -> LAMField:
|
||||
print(f"Constructing a Field for {name} -> {val}")
|
||||
def create_field(name: str, val: Optional[TV_LABField], anno: Any, info: LAMFieldInfo) -> LAMField:
|
||||
if isinstance(val, FreezableElement):
|
||||
print(f"Spawn LAMField_Element {name} !!!")
|
||||
return LAMField_Element(name, val, anno, info)
|
||||
else:
|
||||
print(f"Spawn LAMField {name} !!!")
|
||||
return LAMField(name, val, anno, info)
|
||||
|
||||
@@ -41,8 +41,7 @@ class _MetaAppliance(_MetaElement):
|
||||
):
|
||||
super().inherit_schema(name, base, namespace, stack_exts)
|
||||
if "features" in base.__lam_schema__:
|
||||
namespace["__lam_schema__"]["features"] = {}
|
||||
namespace["__lam_schema__"]["features"].update(base.__lam_schema__["features"])
|
||||
namespace["__lam_schema__"]["features"] = dict(base.__lam_schema__["features"])
|
||||
|
||||
@classmethod
|
||||
def process_class_fields(
|
||||
@@ -86,15 +85,11 @@ class _MetaAppliance(_MetaElement):
|
||||
raise InvalidFieldName("'feature' is a reserved Field name")
|
||||
if _fname in namespace["__lam_schema__"]["features"]:
|
||||
if not issubclass(_fvalue, namespace["__lam_schema__"]["features"][_fname]):
|
||||
raise InvalidFeatureInheritance(
|
||||
f"Feature {_fname} is not an instance of {bases[0]}.{_fname}"
|
||||
)
|
||||
raise InvalidFeatureInheritance(f"Feature {_fname} is not a subclass of {bases[0]}.{_fname}")
|
||||
stack_exts["modified_features"][_fname] = get_mutable_variant(_fvalue)
|
||||
# stack_exts["modified_features"][_fname] = _fvalue
|
||||
namespace[_fname] = stack_exts["modified_features"][_fname]
|
||||
elif isinstance(_fvalue, type) and issubclass(_fvalue, Feature):
|
||||
stack_exts["new_features"][_fname] = get_mutable_variant(_fvalue)
|
||||
# stack_exts["new_features"][_fname] = _fvalue
|
||||
namespace[_fname] = stack_exts["new_features"][_fname]
|
||||
else:
|
||||
super().process_new_field(name, bases, namespace, _fname, _fvalue, stack_exts) # type: ignore[misc]
|
||||
@@ -133,12 +128,7 @@ class _MetaAppliance(_MetaElement):
|
||||
init_fieldtypes: dict[str, Any],
|
||||
stack_exts: dict[str, Any],
|
||||
):
|
||||
print(f"prepare_initializer_fields")
|
||||
print(init_fieldvalues)
|
||||
print(init_fieldtypes)
|
||||
|
||||
for k, v in cls.__lam_schema__["features"].items():
|
||||
# mutable_cls = get_mutable_variant(v)
|
||||
init_fieldvalues[k] = v
|
||||
init_fieldtypes[k] = v
|
||||
|
||||
@@ -152,51 +142,30 @@ class _MetaAppliance(_MetaElement):
|
||||
fakecls_exports: dict[str, Any],
|
||||
stack_exts: dict[str, Any],
|
||||
):
|
||||
print(f"commit_initializer_fields {cls}")
|
||||
from pprint import pprint
|
||||
|
||||
pprint(dir(cls))
|
||||
print(fakecls_exports)
|
||||
for fk, fv in cls.__lam_schema__["features"].items():
|
||||
print(f"Setting value for {fk} from {fakecls_exports[fk]}")
|
||||
# cls.__lam_schema__["features"][k] = fakecls_exports[k] # .freeze_class()
|
||||
|
||||
for k, v in fv.__lam_schema__.items():
|
||||
# print(f"collected value: {fakecls_exports[fk].__dict__[k]}")
|
||||
v.update_value(fakecls_exports[fk].__getattr__(k))
|
||||
# cls.__dict__[k] = fakecls_exports[k]
|
||||
|
||||
@classmethod
|
||||
def finalize_class(
|
||||
mcs: type["_MetaElement"],
|
||||
cls,
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any], # pylint: disable=unused-argument
|
||||
stack_exts: dict[str, Any],
|
||||
):
|
||||
cls.__lam_schema__["features"] = frozendict(cls.__lam_schema__["features"])
|
||||
|
||||
super().finalize_class(cls, name, bases, namespace, stack_exts)
|
||||
if not cls.__lam_class_mutable__:
|
||||
for feat in cls.__lam_schema__["features"].values():
|
||||
feat.freeze_class(True)
|
||||
|
||||
def populate_instance(cls: Type, obj: Any, stack_exts: dict[str, Any], *args: Any, **kw: Any):
|
||||
super().populate_instance(obj, stack_exts, *args, **kw)
|
||||
obj.__lam_schema__["features"] = dict(cls.__lam_schema__["features"])
|
||||
|
||||
def finalize_instance(cls: Type, obj, stack_exts: dict[str, Any]):
|
||||
"""
|
||||
Instantiate and attach all features declared (or overridden) in the instance schema.
|
||||
Handles:
|
||||
- Declared features (plain class)
|
||||
- Subclass replacements
|
||||
- Dict overrides (class + patch dict)
|
||||
"""
|
||||
super().finalize_instance(obj, stack_exts)
|
||||
for k, v in obj.__lam_schema__["features"].items():
|
||||
# Case 1: plain class or subclass
|
||||
if isinstance(v, type) and issubclass(v, Feature):
|
||||
inst = v()
|
||||
inst.freeze()
|
||||
object.__setattr__(obj, k, inst)
|
||||
|
||||
# Case 2: (class, dict) → dict overrides
|
||||
elif isinstance(v, tuple) and len(v) == 2:
|
||||
feat_cls, overrides = v
|
||||
inst = feat_cls(**overrides)
|
||||
inst.freeze()
|
||||
object.__setattr__(obj, k, inst)
|
||||
obj.__lam_schema__["features"][k] = feat_cls
|
||||
|
||||
else:
|
||||
raise InvalidFieldValue(f"Invalid feature definition stored for '{k}': {fdef!r}")
|
||||
|
||||
def apply_overrides(cls, obj, stack_exts, *args, **kwargs):
|
||||
"""
|
||||
Support for runtime field and feature overrides.
|
||||
@@ -211,15 +180,17 @@ class _MetaAppliance(_MetaElement):
|
||||
|
||||
# --- feature overrides ---
|
||||
for k, v in list(kwargs.items()):
|
||||
if k in cls.__lam_schema__["features"]:
|
||||
base_feat_cls = cls.__lam_schema__["features"][k]
|
||||
if k in obj.__lam_schema__["features"]:
|
||||
base_feat_cls = obj.__lam_schema__["features"][k]
|
||||
|
||||
# Case 1: subclass replacement (inheritance)
|
||||
if isinstance(v, type) and issubclass(v, base_feat_cls):
|
||||
v.check_appliance_compatibility(cls)
|
||||
|
||||
# record subclass into instance schema
|
||||
obj.__lam_schema__["features"][k] = v
|
||||
obj.__lam_schema__["features"][k] = get_mutable_variant(v)
|
||||
if not obj.__lam_class_mutable__:
|
||||
obj.__lam_schema__["features"][k].freeze_class(True)
|
||||
kwargs.pop(k)
|
||||
|
||||
# Case 2: dict override
|
||||
@@ -229,27 +200,43 @@ class _MetaAppliance(_MetaElement):
|
||||
kwargs.pop(k)
|
||||
|
||||
else:
|
||||
raise InvalidFieldValue(
|
||||
f"Feature override for '{k}' must be a Feature subclass or dict, got {type(v)}"
|
||||
)
|
||||
raise InvalidFieldValue(f"Feature override for '{k}' must be a Feature subclass or dict, got {type(v)}")
|
||||
|
||||
# --- new features not declared at class level ---
|
||||
for k, v in list(kwargs.items()):
|
||||
if isinstance(v, type) and issubclass(v, Feature):
|
||||
v.check_appliance_compatibility(cls)
|
||||
obj.__lam_schema__["features"][k] = v
|
||||
obj.__lam_schema__["features"][k] = get_mutable_variant(v)
|
||||
if not obj.__lam_class_mutable__:
|
||||
obj.__lam_schema__["features"][k].freeze_class(True)
|
||||
kwargs.pop(k)
|
||||
|
||||
super().apply_overrides(obj, stack_exts, *args, **kwargs)
|
||||
|
||||
@classmethod
|
||||
def finalize_class(
|
||||
mcs: type["_MetaElement"],
|
||||
cls,
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any], # pylint: disable=unused-argument
|
||||
stack_exts: dict[str, Any],
|
||||
):
|
||||
cls.__lam_schema__["features"] = frozendict(cls.__lam_schema__["features"])
|
||||
super().finalize_class(cls, name, bases, namespace, stack_exts)
|
||||
def finalize_instance(cls: Type, obj, stack_exts: dict[str, Any]):
|
||||
"""
|
||||
Instantiate and attach all features declared (or overridden) in the instance schema.
|
||||
Handles:
|
||||
- Declared features (plain class)
|
||||
- Subclass replacements
|
||||
- Dict overrides (class + patch dict)
|
||||
"""
|
||||
|
||||
for k, v in obj.__lam_schema__["features"].items():
|
||||
# Case 1: plain class or subclass
|
||||
if isinstance(v, type) and issubclass(v, Feature):
|
||||
inst = v()
|
||||
object.__setattr__(obj, k, inst)
|
||||
|
||||
# Case 2: (class, dict) → dict overrides
|
||||
elif isinstance(v, tuple) and len(v) == 2:
|
||||
feat_cls, overrides = v
|
||||
inst = feat_cls(**overrides)
|
||||
object.__setattr__(obj, k, inst)
|
||||
obj.__lam_schema__["features"][k] = feat_cls
|
||||
|
||||
else:
|
||||
raise InvalidFieldValue(f"Invalid feature definition stored for '{k}': {fdef!r}")
|
||||
|
||||
obj.__lam_schema__["features"] = frozendict(obj.__lam_schema__["features"])
|
||||
super().finalize_instance(obj, stack_exts)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from typing import Optional, TypeVar, get_origin, get_args, Any, Type, Union, Dict
|
||||
from typing import Optional, TypeVar, get_origin, get_args, Any, Type, Union, Dict, Annotated
|
||||
|
||||
from types import FunctionType, UnionType, new_class
|
||||
from copy import deepcopy, copy
|
||||
@@ -9,8 +9,8 @@ import inspect, ast, textwrap
|
||||
|
||||
from typeguard import check_type, TypeCheckError, CollectionCheckStrategy
|
||||
from frozendict import frozendict
|
||||
from ..tools import _resolve_annotation, _peel_annotated
|
||||
from ..lam_field.lam_field import LAMFieldFactory, LAMField
|
||||
from ..tools import _resolve_annotation
|
||||
from ..lam_field.lam_field import LAMFieldFactory, LAMField, LAMField_Element
|
||||
from ..lam_field.lam_field_info import LAMFieldInfo
|
||||
|
||||
from ..defines import ALLOWED_HELPERS_MATH, ALLOWED_HELPERS_DEFAULT, ALLOWED_MODEL_FIELDS_TYPES
|
||||
@@ -80,9 +80,7 @@ def get_mutable_variant(base: Type[BaseElement]) -> Type[BaseElement]:
|
||||
for b in base.__bases__:
|
||||
if b in (IElement, IFeature, IAppliance):
|
||||
if root_base:
|
||||
raise BrokenInheritance(
|
||||
f"Multiple exclusive root bases (previous {root_base}, now {b}"
|
||||
)
|
||||
raise BrokenInheritance(f"Multiple exclusive root bases (previous {root_base}, now {b}")
|
||||
else:
|
||||
root_base = b
|
||||
elif b is BaseElement:
|
||||
@@ -135,9 +133,7 @@ def get_mutable_variant(base: Type[BaseElement]) -> Type[BaseElement]:
|
||||
|
||||
class IBaseElement(BaseElement):
|
||||
|
||||
def clone_as_mutable_variant(
|
||||
self, *, deep: bool = True, _memo: Dict[int, BaseElement] | None = None
|
||||
) -> BaseElement:
|
||||
def clone_as_mutable_variant(self, *, deep: bool = True, _memo: Dict[int, BaseElement] | None = None) -> BaseElement:
|
||||
|
||||
if isinstance(self, type):
|
||||
raise WrongUsage("clone_as_mutable_variant can only be applied to an instance")
|
||||
@@ -180,45 +176,48 @@ class IFeature(IBaseElement): ...
|
||||
class IAppliance(IBaseElement): ...
|
||||
|
||||
|
||||
def _check_annotation_definition( # pylint: disable=too-complex,too-many-return-statements
|
||||
_type,
|
||||
):
|
||||
# print(f"_type={_type}")
|
||||
_type = _peel_annotated(_type)
|
||||
def _check_annotation_definition(_type, first_layer: bool = True): # pylint: disable=too-complex,too-many-return-statements
|
||||
print(f"_type={_type}, {first_layer}")
|
||||
|
||||
_origin = get_origin(_type) or _type
|
||||
_args = get_args(_type)
|
||||
|
||||
# handle Optional[] and Union[None,...]
|
||||
if (_origin is Union or _origin is UnionType) and type(None) in _args:
|
||||
return all(_check_annotation_definition(_) for _ in get_args(_type) if _ is not type(None))
|
||||
# handle Annotated[,]
|
||||
if _origin is Annotated:
|
||||
if not first_layer:
|
||||
raise UnsupportedFieldType("Annotated[] is only supported as parent annotation")
|
||||
return _check_annotation_definition(_args[0], False)
|
||||
|
||||
# handle other Union[...]
|
||||
# handle Optional[] and Union[None,...]
|
||||
if _origin is Union or _origin is UnionType:
|
||||
return all(_check_annotation_definition(_) for _ in _args)
|
||||
if (len(_args) != 2) or (type(None) not in list(_args)) or (not first_layer):
|
||||
raise UnsupportedFieldType(
|
||||
"Union[] is only supported to implement Optional (takes 2 parameters) and is only supported as parent annotation"
|
||||
)
|
||||
return any(_check_annotation_definition(_, False) for _ in get_args(_type) if _ is not type(None))
|
||||
|
||||
# handle Dict[...]
|
||||
if _origin is dict:
|
||||
if len(_args) != 2:
|
||||
raise IncompletelyAnnotatedField(
|
||||
f"Dict Annotation requires 2 inner definitions: {_type}"
|
||||
)
|
||||
if not _peel_annotated(_args[0]) in ALLOWED_MODEL_FIELDS_TYPES:
|
||||
raise IncompletelyAnnotatedField(f"Dict Annotation requires 2 inner definitions: {_type}")
|
||||
if not _args[0] in ALLOWED_MODEL_FIELDS_TYPES:
|
||||
raise IncompletelyAnnotatedField(f"Dict Key must be simple builtin: {_type}")
|
||||
return _check_annotation_definition(_args[1])
|
||||
# return _check_annotation_definition(_args[1], False)
|
||||
return any(_check_annotation_definition(_, False) for _ in _args)
|
||||
|
||||
# handle Tuple[]
|
||||
if _origin is tuple:
|
||||
if len(_args) == 0:
|
||||
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
|
||||
if len(_args) == 2 and _args[1] is Ellipsis:
|
||||
return _check_annotation_definition(_args[0])
|
||||
return all(_check_annotation_definition(_) for _ in _args)
|
||||
return _check_annotation_definition(_args[0], False)
|
||||
return any(_check_annotation_definition(_, False) for _ in _args)
|
||||
|
||||
# handle Set[],Tuple[],FrozenSet[],List[]
|
||||
if _origin in [set, frozenset, tuple, list]:
|
||||
if len(_args) == 0:
|
||||
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
|
||||
return all(_check_annotation_definition(_) for _ in _args)
|
||||
return any(_check_annotation_definition(_, False) for _ in _args)
|
||||
|
||||
if isinstance(_origin, type):
|
||||
if issubclass(_origin, IElement):
|
||||
@@ -226,9 +225,9 @@ def _check_annotation_definition( # pylint: disable=too-complex,too-many-return
|
||||
elif issubclass(_origin, IAppliance):
|
||||
raise UnsupportedFieldType(f"Nested Appliance are not supported: {_type}")
|
||||
|
||||
if _type in ALLOWED_MODEL_FIELDS_TYPES:
|
||||
if _origin in ALLOWED_MODEL_FIELDS_TYPES:
|
||||
return
|
||||
raise UnsupportedFieldType(_type)
|
||||
raise UnsupportedFieldType(_origin)
|
||||
|
||||
|
||||
def _check_initializer_safety(func) -> None:
|
||||
@@ -253,11 +252,7 @@ def _check_initializer_safety(func) -> None:
|
||||
|
||||
# Find the FunctionDef node that corresponds to this initializer
|
||||
init_node = next(
|
||||
(
|
||||
n
|
||||
for n in mod.body
|
||||
if isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef)) and n.name == func.__name__
|
||||
),
|
||||
(n for n in mod.body if isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef)) and n.name == func.__name__),
|
||||
None,
|
||||
)
|
||||
if init_node is None:
|
||||
@@ -282,9 +277,7 @@ def _check_initializer_safety(func) -> None:
|
||||
continue
|
||||
if isinstance(val, (int, str, float, bool, type(None))):
|
||||
continue
|
||||
raise FunctionForbidden(
|
||||
f"Closures are forbidden in __initializer__ (captured: {name}={val!r})"
|
||||
)
|
||||
raise FunctionForbidden(f"Closures are forbidden in __initializer__ (captured: {name}={val!r})")
|
||||
|
||||
|
||||
def _blocked_import(*args, **kwargs):
|
||||
@@ -424,17 +417,12 @@ class _MetaElement(type):
|
||||
|
||||
This runs before the class object is created.
|
||||
"""
|
||||
from pprint import pprint
|
||||
|
||||
print(f"__NEW__ Defining {name}, bases {bases}")
|
||||
pprint(namespace)
|
||||
pprint(stack_exts["kwargs"]["options"])
|
||||
|
||||
if len(bases) > 1:
|
||||
if _MutableClone not in stack_exts["kwargs"]["options"]:
|
||||
raise MultipleInheritanceForbidden(
|
||||
f"Multiple inheritance is not supported by dabmodel: {bases}"
|
||||
)
|
||||
raise MultipleInheritanceForbidden(f"Multiple inheritance is not supported by dabmodel: {bases}")
|
||||
|
||||
if len(bases) == 0:
|
||||
raise BrokenInheritance(f"missing base class")
|
||||
@@ -456,9 +444,6 @@ class _MetaElement(type):
|
||||
namespace["__lam_class_mutable__"] = ClassMutable in stack_exts["kwargs"]["options"]
|
||||
namespace["__lam_object_mutable__"] = ObjectMutable in stack_exts["kwargs"]["options"]
|
||||
namespace["__lam_options__"] = stack_exts["kwargs"]["options"]
|
||||
pprint(stack_exts["kwargs"]["options"])
|
||||
pprint(namespace["__lam_class_mutable__"])
|
||||
pprint(namespace["__lam_object_mutable__"])
|
||||
|
||||
@classmethod
|
||||
def inherit_schema( # pylint: disable=too-complex,too-many-branches
|
||||
@@ -530,9 +515,7 @@ class _MetaElement(type):
|
||||
else:
|
||||
print(f"Staging Field: {k} / {v}")
|
||||
# Modified fields (already in parent's schema as LAMField)
|
||||
if k in namespace["__lam_schema__"] and isinstance(
|
||||
namespace["__lam_schema__"][k], LAMField
|
||||
):
|
||||
if k in namespace["__lam_schema__"] and isinstance(namespace["__lam_schema__"][k], LAMField):
|
||||
mcs.process_modified_field(name, bases, namespace, k, v, stack_exts)
|
||||
# New fields (others)
|
||||
else:
|
||||
@@ -564,9 +547,7 @@ class _MetaElement(type):
|
||||
"""
|
||||
# forbid already existing Field's schema from being modified
|
||||
if "__annotations__" in namespace and _fname in namespace["__annotations__"]:
|
||||
raise ReadOnlyFieldAnnotation(
|
||||
f"annotations cannot be modified on derived classes {_fname}"
|
||||
)
|
||||
raise ReadOnlyFieldAnnotation(f"annotations cannot be modified on derived classes {_fname}")
|
||||
# validate the new value
|
||||
namespace["__lam_schema__"][_fname].validate(_fvalue)
|
||||
# stage the new value
|
||||
@@ -597,9 +578,7 @@ class _MetaElement(type):
|
||||
|
||||
# check if annotations' format is correct and save them
|
||||
if isinstance(namespace["__annotations__"][_fname], str):
|
||||
namespace["__annotations__"][_fname] = _resolve_annotation(
|
||||
namespace["__annotations__"][_fname]
|
||||
)
|
||||
namespace["__annotations__"][_fname] = _resolve_annotation(namespace["__annotations__"][_fname])
|
||||
|
||||
# effectively checking the value is conform to annotations
|
||||
try:
|
||||
@@ -607,31 +586,23 @@ class _MetaElement(type):
|
||||
except InvalidFieldAnnotation:
|
||||
raise
|
||||
except Exception as ex:
|
||||
raise InvalidFieldAnnotation(
|
||||
f"Field <{_fname}> has not an allowed or valid annotation."
|
||||
) from ex
|
||||
raise InvalidFieldAnnotation(f"Field <{_fname}> has not an allowed or valid annotation.") from ex
|
||||
|
||||
# extracting LAMFieldInfo from Annotated[] annotations
|
||||
_finfo: LAMFieldInfo = LAMFieldInfo()
|
||||
origin = get_origin(namespace["__annotations__"][_fname])
|
||||
tname = (
|
||||
getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
|
||||
)
|
||||
tname = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
|
||||
if "Annotated" in tname:
|
||||
args = get_args(namespace["__annotations__"][_fname])
|
||||
if args:
|
||||
if len(args) > 2:
|
||||
raise InvalidFieldAnnotation(f"Field <{_fname}> had invalid Annotated value.")
|
||||
if len(args) == 2 and not issubclass(type(args[1]), LAMFieldInfo):
|
||||
raise InvalidFieldAnnotation(
|
||||
"Only LAMFieldInfo object is allowed as Annotated data."
|
||||
)
|
||||
raise InvalidFieldAnnotation("Only LAMFieldInfo object is allowed as Annotated data.")
|
||||
|
||||
_finfo = args[1]
|
||||
# stage the new field
|
||||
stack_exts["new_fields"][_fname] = LAMFieldFactory.create_field(
|
||||
_fname, _fvalue, namespace["__annotations__"][_fname], _finfo
|
||||
)
|
||||
stack_exts["new_fields"][_fname] = LAMFieldFactory.create_field(_fname, _fvalue, namespace["__annotations__"][_fname], _finfo)
|
||||
|
||||
@classmethod
|
||||
def commit_fields(
|
||||
@@ -714,9 +685,7 @@ class _MetaElement(type):
|
||||
clone = deepcopy(v.value)
|
||||
init_fieldvalues[k] = clone
|
||||
init_fieldtypes[k] = v.annotations
|
||||
mcs.prepare_initializer_fields(
|
||||
cls, name, bases, namespace, init_fieldvalues, init_fieldtypes, stack_exts
|
||||
)
|
||||
mcs.prepare_initializer_fields(cls, name, bases, namespace, init_fieldvalues, init_fieldtypes, stack_exts)
|
||||
# creating a fake class container to hold the context
|
||||
fakecls = ModelSpecView(init_fieldvalues, init_fieldtypes, cls.__name__, cls.__module__)
|
||||
# fakecls = cls
|
||||
@@ -772,21 +741,18 @@ class _MetaElement(type):
|
||||
if not cls.__lam_class_mutable__:
|
||||
raise ReadOnlyField(f"Class is immutable.")
|
||||
|
||||
field = cls.__lam_schema__[name]
|
||||
field.update_value(value)
|
||||
cls.__lam_schema__[name].update_value(value)
|
||||
return
|
||||
|
||||
def __getattr__(cls, name) -> Any:
|
||||
if (
|
||||
hasattr(cls, "__lam_initialized__")
|
||||
and getattr(cls, "__lam_initialized__")
|
||||
and name in cls.__lam_schema__
|
||||
):
|
||||
if hasattr(cls, "__lam_initialized__") and getattr(cls, "__lam_initialized__") and name in cls.__lam_schema__:
|
||||
return cls.__lam_schema__[name].value
|
||||
raise NonExistingField(f"Non existing class attribute: {name}")
|
||||
|
||||
def __call__(cls: Type, *args: Any, **kw: Any): # intentionally untyped
|
||||
"""BaseElement new instance"""
|
||||
cls.validate_schema_class()
|
||||
|
||||
obj = super().__call__(*args)
|
||||
|
||||
# create a dict to pass contextual arg onto the stack (multithread safe class init)
|
||||
@@ -796,8 +762,7 @@ class _MetaElement(type):
|
||||
cls.populate_instance(obj, stack_exts, *args, **kw)
|
||||
cls.apply_overrides(obj, stack_exts, *args, **kw)
|
||||
cls.finalize_instance(obj, stack_exts)
|
||||
if not cls.__lam_object_mutable__:
|
||||
obj.freeze(True)
|
||||
|
||||
return obj
|
||||
|
||||
def populate_instance(cls: Type, obj: Any, stack_exts: dict[str, Any], *args: Any, **kw: Any):
|
||||
@@ -827,13 +792,35 @@ class _MetaElement(type):
|
||||
"""
|
||||
|
||||
# --- field overrides (unchanged) ---
|
||||
print("!!!???????")
|
||||
for k, v in list(kwargs.items()):
|
||||
if k in obj.__lam_schema__: # regular field
|
||||
obj.__lam_schema__[k].update_value(v)
|
||||
if not obj.__lam_object_mutable__:
|
||||
object.__setattr__(obj, k, obj.__lam_schema__[k].frozen_value)
|
||||
print(f"??????? {k} {v}")
|
||||
print(obj.__lam_schema__[k])
|
||||
|
||||
if isinstance(obj.__lam_schema__[k], LAMField_Element):
|
||||
valid_val = True
|
||||
try:
|
||||
obj.__lam_schema__[k].validate(v)
|
||||
except InvalidFieldValue:
|
||||
valid_val = False
|
||||
if valid_val:
|
||||
obj.__lam_schema__[k].update_value(v)
|
||||
if not obj.__lam_object_mutable__:
|
||||
object.__setattr__(obj, k, obj.__lam_schema__[k].frozen_value)
|
||||
else:
|
||||
object.__setattr__(obj, k, obj.__lam_schema__[k].value)
|
||||
elif isinstance(v, dict):
|
||||
raise RuntimeError("initializing Elem with dict is not supported yet")
|
||||
else:
|
||||
raise InvalidFieldValue(f"Element override for '{k}' must be a Feature subclass or dict, got {type(v)}")
|
||||
|
||||
else:
|
||||
object.__setattr__(obj, k, obj.__lam_schema__[k].value)
|
||||
obj.__lam_schema__[k].update_value(v)
|
||||
if not obj.__lam_object_mutable__:
|
||||
object.__setattr__(obj, k, obj.__lam_schema__[k].frozen_value)
|
||||
else:
|
||||
object.__setattr__(obj, k, obj.__lam_schema__[k].value)
|
||||
kwargs.pop(k)
|
||||
|
||||
if kwargs:
|
||||
@@ -849,7 +836,9 @@ class _MetaElement(type):
|
||||
and sets them as attributes on the appliance instance.)
|
||||
"""
|
||||
obj.__lam_schema__ = frozendict(obj.__lam_schema__)
|
||||
|
||||
if not obj.__lam_object_mutable__:
|
||||
for v in obj.__lam_schema__.values():
|
||||
if isinstance(v, LAMField):
|
||||
v.freeze()
|
||||
obj.freeze(True)
|
||||
|
||||
@@ -1,16 +1,21 @@
|
||||
"""library's internal tools"""
|
||||
|
||||
from typing import Any, get_origin, get_args
|
||||
from collections import ChainMap
|
||||
from typing import Any, Annotated, get_origin, get_args, Union, Self, Optional, List, Dict, Tuple, Set, FrozenSet, Mapping, Callable
|
||||
import typing
|
||||
from dataclasses import dataclass
|
||||
from types import UnionType, NoneType
|
||||
import types
|
||||
from uuid import UUID
|
||||
from datetime import datetime
|
||||
import json
|
||||
import inspect
|
||||
|
||||
from frozendict import deepfreeze
|
||||
|
||||
from .defines import (
|
||||
ALLOWED_ANNOTATIONS,
|
||||
)
|
||||
from frozendict import deepfreeze, frozendict
|
||||
|
||||
from .defines import ALLOWED_ANNOTATIONS, ALLOWED_MODEL_FIELDS_TYPES
|
||||
from .exception import IncompletelyAnnotatedField, UnsupportedFieldType
|
||||
|
||||
|
||||
class LAMJSONEncoder(json.JSONEncoder):
|
||||
@@ -39,31 +44,336 @@ def LAMdeepfreeze(value):
|
||||
|
||||
|
||||
def is_data_attribute(name: str, value: any) -> bool:
|
||||
if name.startswith("__") and name.endswith("__"):
|
||||
if name.startswith("_"):
|
||||
return False
|
||||
if isinstance(value, (staticmethod, classmethod, property)):
|
||||
return False
|
||||
if inspect.isfunction(value) or inspect.isclass(value) or inspect.ismethoddescriptor(value):
|
||||
if inspect.isfunction(value) or inspect.ismethoddescriptor(value):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _peel_annotated(t: Any) -> Any:
|
||||
# If you ever allow Annotated[T, ...], peel to T
|
||||
while True:
|
||||
origin = get_origin(t)
|
||||
if origin is None:
|
||||
return t
|
||||
name = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
|
||||
if "Annotated" in name:
|
||||
args = get_args(t)
|
||||
t = args[0] if args else t
|
||||
else:
|
||||
return t
|
||||
|
||||
|
||||
def _resolve_annotation(ann):
|
||||
if isinstance(ann, str):
|
||||
# Safe eval against a **whitelist** only
|
||||
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS) # pylint: disable=eval-used
|
||||
return ann
|
||||
|
||||
|
||||
class AnnotationWalkerCtx:
|
||||
def __init__(
|
||||
self,
|
||||
origin: Any,
|
||||
args: Any,
|
||||
layer: int,
|
||||
parent: Optional[Self] = None,
|
||||
allowed_types: set[type, ...] = frozenset(),
|
||||
allowed_annotations: dict[str, Any] = {},
|
||||
):
|
||||
self.__origin = origin
|
||||
self.args = args
|
||||
self.__layer = layer
|
||||
self.__parent = parent
|
||||
|
||||
self.__allowed_types: set[type, ...] = allowed_types
|
||||
self.__allowed_annotations: dict[str, Any] = allowed_annotations
|
||||
self.__ext: dict[Any, ChainMap] = {} # per-trigger namespaces (lazy)
|
||||
|
||||
@property
|
||||
def origin(self) -> Any:
|
||||
return self.__origin
|
||||
|
||||
@property
|
||||
def layer(self) -> int:
|
||||
return self.__layer
|
||||
|
||||
@property
|
||||
def parent(self) -> Self:
|
||||
return self.__parent
|
||||
|
||||
@property
|
||||
def allowed_types(self) -> FrozenSet[type]:
|
||||
return self.__allowed_types
|
||||
|
||||
@property
|
||||
def allowed_annotations(self) -> Mapping[str, Any]:
|
||||
return self.__allowed_annotations
|
||||
|
||||
def ns(self, owner: Any) -> ChainMap:
|
||||
"""
|
||||
A per-trigger overlay namespace that inherits from parent ctx.
|
||||
Use as: bag = ctx.ns(self); bag['whatever'] = ...
|
||||
Lookups fall back to parent's bag automatically.
|
||||
"""
|
||||
if owner in self.__ext:
|
||||
return self.__ext[owner]
|
||||
parent_map = self.__parent.__ext.get(owner) if (self.__parent and hasattr(self.__parent, "_AnnotationWalkerCtx__ext")) else {}
|
||||
cm = ChainMap({}, parent_map if isinstance(parent_map, ChainMap) else dict(parent_map))
|
||||
self.__ext[owner] = cm
|
||||
return cm
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TriggerResult:
|
||||
# If provided, children won't be walked and this value is returned.
|
||||
replace_with: Any | None = None
|
||||
# If true, skip walking children but don't replace current node value.
|
||||
skip_children: bool = False
|
||||
# If provided, walker will restart processing with the given value
|
||||
restart_with: Any | None = None # NEW
|
||||
|
||||
@staticmethod
|
||||
def passthrough() -> Self:
|
||||
return TriggerResult()
|
||||
|
||||
@staticmethod
|
||||
def replace(value: Any) -> Self:
|
||||
return TriggerResult(replace_with=value, skip_children=True)
|
||||
|
||||
@staticmethod
|
||||
def skip() -> Self:
|
||||
return TriggerResult(skip_children=True)
|
||||
|
||||
@staticmethod
|
||||
def restart(value: Any) -> Self:
|
||||
print("Doo!")
|
||||
return TriggerResult(restart_with=value)
|
||||
|
||||
|
||||
class AnnotationTrigger:
|
||||
|
||||
def init_trigger(self) -> None:
|
||||
pass
|
||||
|
||||
def process_annotated(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
return None
|
||||
|
||||
def process_union(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
return None
|
||||
|
||||
def process_dict(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
return None
|
||||
|
||||
def process_tuple(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
return None
|
||||
|
||||
def process_list(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
return None
|
||||
|
||||
def process_set(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
return None
|
||||
|
||||
def process_unknown(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
return None
|
||||
|
||||
def process_allowed(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
return None
|
||||
|
||||
|
||||
class LAMSchemaValidation(AnnotationTrigger):
|
||||
def init_trigger(self) -> None:
|
||||
print(f"Initializing {self.__class__.__name__}")
|
||||
|
||||
def process_annotated(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
print("process_annotated")
|
||||
print(ctx.origin)
|
||||
print(ctx.args)
|
||||
if len(ctx.args) != 2:
|
||||
raise UnsupportedFieldType("Annotated[T,x] requires 2 parameters")
|
||||
if ctx.parent is not None:
|
||||
raise UnsupportedFieldType("Annotated[T,x] is only supported as parent annotation")
|
||||
return None
|
||||
|
||||
def process_union(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
print("process_union")
|
||||
print(ctx.args)
|
||||
if (len(ctx.args) != 2) or (type(None) not in list(ctx.args)):
|
||||
raise UnsupportedFieldType("Union[] is only supported to implement Optional[] (takes 2 parameters, including None)")
|
||||
return None
|
||||
|
||||
def process_dict(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
print("process_dict")
|
||||
if len(ctx.args) != 2:
|
||||
raise IncompletelyAnnotatedField(f"Dict Annotation requires 2 inner definitions: {ctx.origin}")
|
||||
if not ctx.args[0] in ctx.allowed_types:
|
||||
raise IncompletelyAnnotatedField(f"Dict Key must be simple builtin: {ctx.origin}")
|
||||
return None
|
||||
|
||||
def process_tuple(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
print("process_tuple")
|
||||
if len(ctx.args) == 0:
|
||||
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {ctx.origin}")
|
||||
return None
|
||||
|
||||
def process_list(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
print("process_list")
|
||||
if len(ctx.args) == 0:
|
||||
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {ctx.origin}")
|
||||
return None
|
||||
|
||||
def process_set(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
print("process_set")
|
||||
if len(ctx.args) == 0:
|
||||
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {ctx.origin}")
|
||||
return None
|
||||
|
||||
def process_allowed(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
print("process_allowed")
|
||||
if ctx.origin is type(None) or ctx.origin is None:
|
||||
if ctx.parent is None or not (ctx.parent.origin is Union or ctx.parent.origin is UnionType):
|
||||
raise IncompletelyAnnotatedField(f"None is only accepted with Union, to implement Optional[]")
|
||||
return None
|
||||
|
||||
|
||||
class AnnotationWalker:
|
||||
DEFAULT_ALLOWED_TYPES = frozenset({str, int, float, complex, bool, bytes, NoneType})
|
||||
DEFAULT_ALLOWED_ANNOTATIONS: dict[str, Any] = frozendict(
|
||||
{
|
||||
"Union": Union,
|
||||
"Optional": Optional,
|
||||
"List": List,
|
||||
"Dict": Dict,
|
||||
"Tuple": Tuple,
|
||||
"Set": Set,
|
||||
"FrozenSet": FrozenSet,
|
||||
"Annotated": Annotated,
|
||||
# builtins:
|
||||
"int": int,
|
||||
"str": str,
|
||||
"float": float,
|
||||
"bool": bool,
|
||||
"complex": complex,
|
||||
"bytes": bytes,
|
||||
"None": type(None),
|
||||
"list": list,
|
||||
"dict": dict,
|
||||
"set": set,
|
||||
"frozenset": frozenset,
|
||||
"tuple": tuple,
|
||||
}
|
||||
)
|
||||
|
||||
def __init__(self, ann: Any, triggers: tuple[AnnotationTrigger, ...], **kwargs):
|
||||
if not triggers:
|
||||
raise RuntimeError("AnnotationWalker requires trigger(s)")
|
||||
|
||||
# Normalize triggers into instances
|
||||
insts: list[AnnotationTrigger] = []
|
||||
for t in triggers if isinstance(triggers, tuple) else (triggers,):
|
||||
if isinstance(t, AnnotationTrigger):
|
||||
insts.append(t)
|
||||
elif isinstance(t, type) and issubclass(t, AnnotationTrigger):
|
||||
insts.append(t())
|
||||
else:
|
||||
raise RuntimeError(f"Unsupported trigger: {t}")
|
||||
self._triggers = tuple(insts)
|
||||
|
||||
# Allowed types / annotations
|
||||
atypes = set(type(self).DEFAULT_ALLOWED_TYPES)
|
||||
if "ex_allowed_types" in kwargs:
|
||||
atypes.update(kwargs["ex_allowed_types"])
|
||||
self._allowed_types = frozenset(atypes)
|
||||
|
||||
annots = dict(type(self).DEFAULT_ALLOWED_ANNOTATIONS)
|
||||
if "ex_allowed_annotations" in kwargs:
|
||||
annots.update(kwargs["ex_allowed_annotations"])
|
||||
self._allowed_annotations = frozendict(annots)
|
||||
|
||||
# Annotation can be string
|
||||
self.__ann = ann
|
||||
if isinstance(ann, str):
|
||||
self.__ann = eval(ann, {"__builtins__": {}}, self._allowed_annotations)
|
||||
|
||||
def run(self) -> TriggerResult:
|
||||
for trigger in self._triggers:
|
||||
trigger.init_trigger()
|
||||
return self._walk(self.__ann, None)
|
||||
|
||||
# --- Helpers ---
|
||||
|
||||
def _new_ctx(self, origin, args, layer, parent):
|
||||
return AnnotationWalkerCtx(origin, args, layer, parent, self._allowed_types, self._allowed_annotations)
|
||||
|
||||
def _apply_triggers(self, method: str, ctx: AnnotationWalkerCtx) -> TriggerResult:
|
||||
final = TriggerResult.passthrough()
|
||||
for trig in self._triggers:
|
||||
res = getattr(trig, method)(ctx)
|
||||
if not res:
|
||||
continue
|
||||
if res.restart_with is not None:
|
||||
return res # short-circuit on restart
|
||||
if res.replace_with is not None:
|
||||
final = TriggerResult.replace(res.replace_with)
|
||||
if res.skip_children:
|
||||
final = TriggerResult(
|
||||
replace_with=final.replace_with,
|
||||
skip_children=True,
|
||||
)
|
||||
return final
|
||||
|
||||
def _handle_with_triggers(
|
||||
self,
|
||||
trigger_name: str,
|
||||
ctx: AnnotationWalkerCtx,
|
||||
args_handler: Callable[[AnnotationWalkerCtx], Any] | None = None,
|
||||
) -> Any:
|
||||
"""Generic handler: run triggers, maybe recurse into args with a custom handler."""
|
||||
res = self._apply_triggers(trigger_name, ctx)
|
||||
if res.restart_with is not None:
|
||||
return self._walk(res.restart_with, ctx.parent)
|
||||
if res.replace_with is not None:
|
||||
return res.replace_with
|
||||
if not res.skip_children:
|
||||
if args_handler:
|
||||
return args_handler(ctx)
|
||||
return tuple(self._walk(a, ctx) for a in ctx.args)
|
||||
return None
|
||||
|
||||
def _walk_args_tuple(self, ctx: AnnotationWalkerCtx):
|
||||
# special Ellipsis case for Tuple
|
||||
if len(ctx.args) == 2 and ctx.args[1] is Ellipsis:
|
||||
return (self._walk(ctx.args[0], ctx), Ellipsis)
|
||||
return tuple(self._walk(a, ctx) for a in ctx.args)
|
||||
|
||||
# --- Dispatcher ---
|
||||
|
||||
def _walk(self, type_: Any, parent_ctx: Optional[AnnotationWalkerCtx]) -> Any:
|
||||
print(f"[{parent_ctx.layer if parent_ctx else 0}] walking through: {type_}")
|
||||
|
||||
origin = get_origin(type_) or type_
|
||||
if origin is None:
|
||||
origin = NoneType
|
||||
if origin is Union:
|
||||
origin = UnionType
|
||||
if not isinstance(origin, type):
|
||||
raise RuntimeError("Annotation must be using type(s), not instances")
|
||||
|
||||
args = get_args(type_)
|
||||
layer = 0 if parent_ctx is None else parent_ctx.layer + 1
|
||||
ctx = self._new_ctx(origin, args, layer, parent_ctx)
|
||||
|
||||
print(origin)
|
||||
match origin:
|
||||
case typing.Annotated:
|
||||
return self._handle_with_triggers(
|
||||
"process_annotated", ctx, args_handler=lambda c: self._walk(c.args[0], c) if c.args else None
|
||||
)
|
||||
case types.UnionType:
|
||||
return self._handle_with_triggers("process_union", ctx)
|
||||
case _ if issubclass(origin, dict):
|
||||
return self._handle_with_triggers("process_dict", ctx)
|
||||
case _ if issubclass(origin, tuple):
|
||||
return self._handle_with_triggers("process_tuple", ctx, self._walk_args_tuple)
|
||||
case _ if issubclass(origin, list):
|
||||
return self._handle_with_triggers("process_list", ctx)
|
||||
case _ if issubclass(origin, set):
|
||||
return self._handle_with_triggers("process_set", ctx)
|
||||
case _ if origin in self._allowed_types:
|
||||
return self._handle_with_triggers("process_allowed", ctx)
|
||||
case _:
|
||||
res = self._apply_triggers("process_unknown", ctx)
|
||||
if res.restart_with is not None:
|
||||
return self._walk(res.restart_with, ctx.parent)
|
||||
if res.replace_with is not None:
|
||||
return res.replace_with
|
||||
raise UnsupportedFieldType(f"Not supported Field: {ctx.origin}, " f"Supported list: {self._allowed_types}")
|
||||
|
||||
46
test/test_AnnotationTool.py
Normal file
46
test/test_AnnotationTool.py
Normal file
@@ -0,0 +1,46 @@
|
||||
# dabmodel (c) by chacha
|
||||
#
|
||||
# dabmodel is licensed under a
|
||||
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
|
||||
#
|
||||
# You should have received a copy of the license along with this
|
||||
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
|
||||
|
||||
import unittest
|
||||
from typing import Annotated, Union, Optional
|
||||
import sys
|
||||
import subprocess
|
||||
from os import chdir, environ
|
||||
from pathlib import Path
|
||||
|
||||
print(__name__)
|
||||
print(__package__)
|
||||
|
||||
from src import dabmodel as dm
|
||||
|
||||
testdir_path = Path(__file__).parent.resolve()
|
||||
chdir(testdir_path.parent.resolve())
|
||||
|
||||
|
||||
class ElementTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
print("\n->", unittest.TestCase.id(self))
|
||||
|
||||
def test_element_simple(self):
|
||||
print(isinstance(None, type(None)))
|
||||
print("\n== From OBJs ==")
|
||||
res = dm.tools.AnnotationWalker(Annotated[Optional[dict[int, list[str]]], "comment"], (dm.tools.LAMSchemaValidation(),))
|
||||
print(f"res={res.run()}")
|
||||
|
||||
print("\n== From STRING ==")
|
||||
res = dm.tools.AnnotationWalker('Annotated[Optional[dict[int, list[str]]], "comment"]', (dm.tools.LAMSchemaValidation(),))
|
||||
print(f"res={res.run()}")
|
||||
|
||||
res = dm.tools.AnnotationWalker(Annotated[Optional[dict[int, list[None]]], "comment"], (dm.tools.LAMSchemaValidation(),))
|
||||
print(f"res={res.run()}")
|
||||
|
||||
|
||||
# ---------- main ----------
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -88,9 +88,7 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.immutable_vars__test_field(app1, "VarComplex2", complex(8, 6), complex(3, 2))
|
||||
self.immutable_vars__test_field(app1, "VarBool", True, False)
|
||||
self.immutable_vars__test_field(app1, "VarBool2", False, True)
|
||||
self.immutable_vars__test_field(
|
||||
app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 ")
|
||||
)
|
||||
self.immutable_vars__test_field(app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 "))
|
||||
self.immutable_vars__test_field(
|
||||
app1,
|
||||
"VarBytes2",
|
||||
@@ -158,9 +156,7 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.immutable_vars__test_field(app1, "VarComplex2", complex(8, 6), complex(3, 2))
|
||||
self.immutable_vars__test_field(app1, "VarBool", True, False)
|
||||
self.immutable_vars__test_field(app1, "VarBool2", False, True)
|
||||
self.immutable_vars__test_field(
|
||||
app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 ")
|
||||
)
|
||||
self.immutable_vars__test_field(app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 "))
|
||||
self.immutable_vars__test_field(
|
||||
app1,
|
||||
"VarBytes2",
|
||||
@@ -255,7 +251,6 @@ class ApplianceTest(unittest.TestCase):
|
||||
testVar4: "Set[str]" = {"a", "c"}
|
||||
testVar5: set[str] = {"a", "b"}
|
||||
testVar6: "set[str]" = {"a", "b"}
|
||||
testVar7: set[int | str] = {1, 2, "abcd", "efg"}
|
||||
|
||||
app1 = Appliance1()
|
||||
self.immutable_vars__test_field(app1, "testVar", {1, 2}, {1, 5})
|
||||
@@ -267,7 +262,6 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.immutable_vars__test_field(app1, "testVar4", {"a", "c"}, {"h", "e"})
|
||||
self.immutable_vars__test_field(app1, "testVar5", {"a", "b"}, {"h", "c"})
|
||||
self.immutable_vars__test_field(app1, "testVar6", {"a", "b"}, {"h", "c"})
|
||||
self.immutable_vars__test_field(app1, "testVar7", {1, 2, "abcd", "efg"}, {"h", "c"})
|
||||
|
||||
# must work
|
||||
sorted(app1.testVar)
|
||||
@@ -334,7 +328,6 @@ class ApplianceTest(unittest.TestCase):
|
||||
testVar4: "frozenset[str]" = frozenset({"a", "c"})
|
||||
testVar5: FrozenSet[int] = frozenset({1, 2})
|
||||
testVar6: "FrozenSet[int]" = frozenset({1, 2})
|
||||
testVar7: FrozenSet[int | str] = frozenset({1, 2, "abcd", "efg"})
|
||||
|
||||
app1 = Appliance1()
|
||||
self.immutable_vars__test_field(app1, "testVar", {1, 2}, {1, 5})
|
||||
@@ -346,7 +339,6 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.immutable_vars__test_field(app1, "testVar4", {"a", "c"}, {"h", "e"})
|
||||
self.immutable_vars__test_field(app1, "testVar5", {1, 2}, {1, 5})
|
||||
self.immutable_vars__test_field(app1, "testVar6", {1, 2}, {1, 5})
|
||||
self.immutable_vars__test_field(app1, "testVar7", {1, 2, "abcd", "efg"}, {1, 5})
|
||||
|
||||
# must work
|
||||
sorted(app1.testVar)
|
||||
@@ -410,7 +402,6 @@ class ApplianceTest(unittest.TestCase):
|
||||
testVar4: "List[str]" = ["a", "c"]
|
||||
testVar5: list[str] = ["a", "b"]
|
||||
testVar6: "list[str]" = ["a", "b"]
|
||||
testVar7: List[Union[int, str]] = [1, 2, 3, "one", "two", "three"]
|
||||
|
||||
app1 = Appliance1()
|
||||
# Note: lists are converted to tuples
|
||||
@@ -420,9 +411,6 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.immutable_vars__test_field(app1, "testVar4", ("a", "c"), ["h", "e"])
|
||||
self.immutable_vars__test_field(app1, "testVar5", ("a", "b"), ["h", "c"])
|
||||
self.immutable_vars__test_field(app1, "testVar6", ("a", "b"), ["h", "c"])
|
||||
self.immutable_vars__test_field(
|
||||
app1, "testVar7", (1, 2, 3, "one", "two", "three"), ["h", "c"]
|
||||
)
|
||||
|
||||
# must work
|
||||
sorted(app1.testVar)
|
||||
@@ -518,7 +506,6 @@ class ApplianceTest(unittest.TestCase):
|
||||
testVar5: tuple[str, ...] = ("a", "b")
|
||||
testVar6: "tuple[str,...]" = ("a", "b")
|
||||
testVar7: Tuple[int, str] = (1, "b")
|
||||
# testVar7: Tuple[Union[int, str]] = (1, 2, 3, "one", "two", "three")
|
||||
|
||||
app1 = Appliance1()
|
||||
|
||||
@@ -612,12 +599,8 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.check_immutable_fields_schema(app1, "VarInt2", 21, 21, int)
|
||||
self.check_immutable_fields_schema(app1, "VarFloat", 12.1, 12.1, float)
|
||||
self.check_immutable_fields_schema(app1, "VarFloat2", 21.2, 21.2, float)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "VarComplex", complex(3, 5), complex(3, 5), complex
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "VarComplex2", complex(8, 6), complex(8, 6), complex
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "VarComplex", complex(3, 5), complex(3, 5), complex)
|
||||
self.check_immutable_fields_schema(app1, "VarComplex2", complex(8, 6), complex(8, 6), complex)
|
||||
self.check_immutable_fields_schema(app1, "VarBool", True, True, bool)
|
||||
self.check_immutable_fields_schema(app1, "VarBool2", False, False, bool)
|
||||
self.check_immutable_fields_schema(
|
||||
@@ -656,12 +639,8 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.assertIn("__lam_schema__", dir(app1))
|
||||
self.assertIn("__lam_schema__", app1.__dict__)
|
||||
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "ListStr2", ("val3", "val4"), ("val3", "val4"), list[str]
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str])
|
||||
self.check_immutable_fields_schema(app1, "ListStr2", ("val3", "val4"), ("val3", "val4"), list[str])
|
||||
self.check_immutable_fields_schema(
|
||||
app1,
|
||||
"Dict1",
|
||||
@@ -678,18 +657,10 @@ class ApplianceTest(unittest.TestCase):
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "Tuple1", ("a", "c"), ("a", "c"), tuple[str, ...])
|
||||
self.check_immutable_fields_schema(app1, "Tuple2", ("a", "b"), ("a", "b"), tuple[str, ...])
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "FrozenSet2", frozenset({1, 2}), frozenset({1, 2}), frozenset[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "Set2", frozenset({1, 2}), frozenset({1, 2}), set[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int])
|
||||
self.check_immutable_fields_schema(app1, "FrozenSet2", frozenset({1, 2}), frozenset({1, 2}), frozenset[int])
|
||||
self.check_immutable_fields_schema(app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int])
|
||||
self.check_immutable_fields_schema(app1, "Set2", frozenset({1, 2}), frozenset({1, 2}), set[int])
|
||||
|
||||
# same test with Typing types (list -> List ...)
|
||||
# class can be created
|
||||
@@ -710,12 +681,8 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.assertIn("__lam_schema__", dir(app1))
|
||||
self.assertIn("__lam_schema__", app1.__dict__)
|
||||
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "ListStr", ("val1", "val2"), ("val1", "val2"), List[str]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "ListStr2", ("val3", "val4"), ("val3", "val4"), List[str]
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "ListStr", ("val1", "val2"), ("val1", "val2"), List[str])
|
||||
self.check_immutable_fields_schema(app1, "ListStr2", ("val3", "val4"), ("val3", "val4"), List[str])
|
||||
self.check_immutable_fields_schema(
|
||||
app1,
|
||||
"Dict1",
|
||||
@@ -732,18 +699,10 @@ class ApplianceTest(unittest.TestCase):
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "Tuple1", ("a", "c"), ("a", "c"), Tuple[str, ...])
|
||||
self.check_immutable_fields_schema(app1, "Tuple2", ("a", "b"), ("a", "b"), Tuple[str, ...])
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), FrozenSet[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "FrozenSet2", frozenset({1, 2}), frozenset({1, 2}), FrozenSet[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), Set[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "Set2", frozenset({1, 2}), frozenset({1, 2}), Set[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), FrozenSet[int])
|
||||
self.check_immutable_fields_schema(app1, "FrozenSet2", frozenset({1, 2}), frozenset({1, 2}), FrozenSet[int])
|
||||
self.check_immutable_fields_schema(app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), Set[int])
|
||||
self.check_immutable_fields_schema(app1, "Set2", frozenset({1, 2}), frozenset({1, 2}), Set[int])
|
||||
|
||||
def test_immutable_fields_annotated(self):
|
||||
"""Testing first appliance level, and Field types (annotated)"""
|
||||
@@ -785,9 +744,7 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.assertEqual(app1.__lam_schema__["VarBool"].doc, "foo9")
|
||||
self.immutable_vars__test_field(app1, "VarBool2", False, True)
|
||||
self.assertEqual(app1.__lam_schema__["VarBool2"].doc, "foo10")
|
||||
self.immutable_vars__test_field(
|
||||
app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 ")
|
||||
)
|
||||
self.immutable_vars__test_field(app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 "))
|
||||
self.assertEqual(app1.__lam_schema__["VarBytes"].doc, "foo11")
|
||||
self.immutable_vars__test_field(
|
||||
app1,
|
||||
@@ -852,9 +809,7 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.immutable_vars__test_field(app1, "VarComplex2", complex(8, 6), complex(3, 2))
|
||||
self.immutable_vars__test_field(app1, "VarBool", True, False)
|
||||
self.immutable_vars__test_field(app1, "VarBool2", False, True)
|
||||
self.immutable_vars__test_field(
|
||||
app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 ")
|
||||
)
|
||||
self.immutable_vars__test_field(app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 "))
|
||||
self.immutable_vars__test_field(
|
||||
app1,
|
||||
"VarBytes2",
|
||||
@@ -868,12 +823,8 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.check_immutable_fields_schema(app1, "VarInt2", 21, 21, int)
|
||||
self.check_immutable_fields_schema(app1, "VarFloat", 12.1, 12.1, float)
|
||||
self.check_immutable_fields_schema(app1, "VarFloat2", 21.2, 21.2, float)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "VarComplex", complex(3, 5), complex(3, 5), complex
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "VarComplex2", complex(8, 6), complex(8, 6), complex
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "VarComplex", complex(3, 5), complex(3, 5), complex)
|
||||
self.check_immutable_fields_schema(app1, "VarComplex2", complex(8, 6), complex(8, 6), complex)
|
||||
self.check_immutable_fields_schema(app1, "VarBool", True, True, bool)
|
||||
self.check_immutable_fields_schema(app1, "VarBool2", False, False, bool)
|
||||
self.check_immutable_fields_schema(
|
||||
@@ -903,9 +854,7 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.immutable_vars__test_field(app2, "VarComplex2", complex(3, 0), complex(3, 2))
|
||||
self.immutable_vars__test_field(app2, "VarBool", False, False)
|
||||
self.immutable_vars__test_field(app2, "VarBool2", True, True)
|
||||
self.immutable_vars__test_field(
|
||||
app2, "VarBytes", bytes.fromhex("21f0 e1f2 "), bytes.fromhex("11f0 F1f2 ")
|
||||
)
|
||||
self.immutable_vars__test_field(app2, "VarBytes", bytes.fromhex("21f0 e1f2 "), bytes.fromhex("11f0 F1f2 "))
|
||||
self.immutable_vars__test_field(
|
||||
app2,
|
||||
"VarBytes2",
|
||||
@@ -919,12 +868,8 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.check_immutable_fields_schema(app2, "VarInt2", 23, 21, int)
|
||||
self.check_immutable_fields_schema(app2, "VarFloat", 2.6, 12.1, float)
|
||||
self.check_immutable_fields_schema(app2, "VarFloat2", 1.5, 21.2, float)
|
||||
self.check_immutable_fields_schema(
|
||||
app2, "VarComplex", complex(7, 1), complex(3, 5), complex
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app2, "VarComplex2", complex(3, 0), complex(8, 6), complex
|
||||
)
|
||||
self.check_immutable_fields_schema(app2, "VarComplex", complex(7, 1), complex(3, 5), complex)
|
||||
self.check_immutable_fields_schema(app2, "VarComplex2", complex(3, 0), complex(8, 6), complex)
|
||||
self.check_immutable_fields_schema(app2, "VarBool", False, True, bool)
|
||||
self.check_immutable_fields_schema(app2, "VarBool2", True, False, bool)
|
||||
self.check_immutable_fields_schema(
|
||||
@@ -961,9 +906,7 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.immutable_vars__test_field(app3, "VarComplex2", complex(3, 0), complex(3, 2))
|
||||
self.immutable_vars__test_field(app3, "VarBool", False, False)
|
||||
self.immutable_vars__test_field(app3, "VarBool2", True, True)
|
||||
self.immutable_vars__test_field(
|
||||
app3, "VarBytes", bytes.fromhex("21f0 e1f2 "), bytes.fromhex("11f0 F1f2 ")
|
||||
)
|
||||
self.immutable_vars__test_field(app3, "VarBytes", bytes.fromhex("21f0 e1f2 "), bytes.fromhex("11f0 F1f2 "))
|
||||
self.immutable_vars__test_field(
|
||||
app3,
|
||||
"VarBytes2",
|
||||
@@ -978,12 +921,8 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.check_immutable_fields_schema(app3, "VarInt2", 23, 21, int)
|
||||
self.check_immutable_fields_schema(app3, "VarFloat", 2.6, 12.1, float)
|
||||
self.check_immutable_fields_schema(app3, "VarFloat2", 1.5, 21.2, float)
|
||||
self.check_immutable_fields_schema(
|
||||
app3, "VarComplex", complex(7, 1), complex(3, 5), complex
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app3, "VarComplex2", complex(3, 0), complex(8, 6), complex
|
||||
)
|
||||
self.check_immutable_fields_schema(app3, "VarComplex", complex(7, 1), complex(3, 5), complex)
|
||||
self.check_immutable_fields_schema(app3, "VarComplex2", complex(3, 0), complex(8, 6), complex)
|
||||
self.check_immutable_fields_schema(app3, "VarBool", False, True, bool)
|
||||
self.check_immutable_fields_schema(app3, "VarBool2", True, False, bool)
|
||||
self.check_immutable_fields_schema(
|
||||
@@ -1012,9 +951,7 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.immutable_vars__test_field(app1, "VarComplex2", complex(8, 6), complex(3, 2))
|
||||
self.immutable_vars__test_field(app1, "VarBool", True, False)
|
||||
self.immutable_vars__test_field(app1, "VarBool2", False, True)
|
||||
self.immutable_vars__test_field(
|
||||
app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 ")
|
||||
)
|
||||
self.immutable_vars__test_field(app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 "))
|
||||
self.immutable_vars__test_field(
|
||||
app1,
|
||||
"VarBytes2",
|
||||
@@ -1028,12 +965,8 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.check_immutable_fields_schema(app1, "VarInt2", 21, 21, int)
|
||||
self.check_immutable_fields_schema(app1, "VarFloat", 12.1, 12.1, float)
|
||||
self.check_immutable_fields_schema(app1, "VarFloat2", 21.2, 21.2, float)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "VarComplex", complex(3, 5), complex(3, 5), complex
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "VarComplex2", complex(8, 6), complex(8, 6), complex
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "VarComplex", complex(3, 5), complex(3, 5), complex)
|
||||
self.check_immutable_fields_schema(app1, "VarComplex2", complex(8, 6), complex(8, 6), complex)
|
||||
self.check_immutable_fields_schema(app1, "VarBool", True, True, bool)
|
||||
self.check_immutable_fields_schema(app1, "VarBool2", False, False, bool)
|
||||
self.check_immutable_fields_schema(
|
||||
@@ -1061,9 +994,7 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.immutable_vars__test_field(app2, "VarComplex2", complex(3, 0), complex(3, 2))
|
||||
self.immutable_vars__test_field(app2, "VarBool", False, False)
|
||||
self.immutable_vars__test_field(app2, "VarBool2", True, True)
|
||||
self.immutable_vars__test_field(
|
||||
app2, "VarBytes", bytes.fromhex("21f0 e1f2 "), bytes.fromhex("11f0 F1f2 ")
|
||||
)
|
||||
self.immutable_vars__test_field(app2, "VarBytes", bytes.fromhex("21f0 e1f2 "), bytes.fromhex("11f0 F1f2 "))
|
||||
self.immutable_vars__test_field(
|
||||
app2,
|
||||
"VarBytes2",
|
||||
@@ -1077,12 +1008,8 @@ class ApplianceTest(unittest.TestCase):
|
||||
self.check_immutable_fields_schema(app2, "VarInt2", 23, 21, int)
|
||||
self.check_immutable_fields_schema(app2, "VarFloat", 2.6, 12.1, float)
|
||||
self.check_immutable_fields_schema(app2, "VarFloat2", 1.5, 21.2, float)
|
||||
self.check_immutable_fields_schema(
|
||||
app2, "VarComplex", complex(7, 1), complex(3, 5), complex
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app2, "VarComplex2", complex(3, 0), complex(8, 6), complex
|
||||
)
|
||||
self.check_immutable_fields_schema(app2, "VarComplex", complex(7, 1), complex(3, 5), complex)
|
||||
self.check_immutable_fields_schema(app2, "VarComplex2", complex(3, 0), complex(8, 6), complex)
|
||||
self.check_immutable_fields_schema(app2, "VarBool", False, True, bool)
|
||||
self.check_immutable_fields_schema(app2, "VarBool2", True, False, bool)
|
||||
self.check_immutable_fields_schema(
|
||||
@@ -1137,16 +1064,12 @@ class ApplianceTest(unittest.TestCase):
|
||||
app1 = Appliance1()
|
||||
|
||||
self.immutable_vars__test_field(app1, "ListStr", ("val1", "val2"), ["val2", "val3"])
|
||||
self.immutable_vars__test_field(
|
||||
app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}
|
||||
)
|
||||
self.immutable_vars__test_field(app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
|
||||
self.immutable_vars__test_field(app1, "Tuple1", ("a", "c"), ("h", "r"))
|
||||
self.immutable_vars__test_field(app1, "FrozenSet1", frozenset({1, 2}), frozenset({4, 0}))
|
||||
self.immutable_vars__test_field(app1, "Set1", frozenset({1, 2}), set({4, 0}))
|
||||
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str]
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str])
|
||||
self.check_immutable_fields_schema(
|
||||
app1,
|
||||
"Dict1",
|
||||
@@ -1155,26 +1078,18 @@ class ApplianceTest(unittest.TestCase):
|
||||
dict[int, float],
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "Tuple1", ("a", "c"), ("a", "c"), tuple[str, ...])
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int])
|
||||
self.check_immutable_fields_schema(app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int])
|
||||
|
||||
app2 = Appliance2()
|
||||
|
||||
self.immutable_vars__test_field(app2, "ListStr", ("mod val1", "mod val2"), ["val2", "val3"])
|
||||
self.immutable_vars__test_field(
|
||||
app2, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}
|
||||
)
|
||||
self.immutable_vars__test_field(app2, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
|
||||
self.immutable_vars__test_field(app2, "Tuple1", ("aa", "cc"), ("h", "r"))
|
||||
self.immutable_vars__test_field(app2, "FrozenSet1", frozenset({14, 27}), frozenset({4, 0}))
|
||||
self.immutable_vars__test_field(app2, "Set1", frozenset({1, 20}), set({4, 0}))
|
||||
|
||||
self.check_immutable_fields_schema(
|
||||
app2, "ListStr", ("mod val1", "mod val2"), ("val1", "val2"), list[str]
|
||||
)
|
||||
self.check_immutable_fields_schema(app2, "ListStr", ("mod val1", "mod val2"), ("val1", "val2"), list[str])
|
||||
self.check_immutable_fields_schema(
|
||||
app2,
|
||||
"Dict1",
|
||||
@@ -1182,27 +1097,17 @@ class ApplianceTest(unittest.TestCase):
|
||||
{1: 1.1, 4: 7.6, 91: 23.6},
|
||||
dict[int, float],
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app2, "Tuple1", ("aa", "cc"), ("a", "c"), tuple[str, ...]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app2, "FrozenSet1", frozenset({14, 27}), frozenset({1, 2}), frozenset[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app2, "Set1", frozenset({1, 20}), frozenset({1, 2}), set[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(app2, "Tuple1", ("aa", "cc"), ("a", "c"), tuple[str, ...])
|
||||
self.check_immutable_fields_schema(app2, "FrozenSet1", frozenset({14, 27}), frozenset({1, 2}), frozenset[int])
|
||||
self.check_immutable_fields_schema(app2, "Set1", frozenset({1, 20}), frozenset({1, 2}), set[int])
|
||||
|
||||
self.immutable_vars__test_field(app1, "ListStr", ("val1", "val2"), ["val2", "val3"])
|
||||
self.immutable_vars__test_field(
|
||||
app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}
|
||||
)
|
||||
self.immutable_vars__test_field(app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
|
||||
self.immutable_vars__test_field(app1, "Tuple1", ("a", "c"), ("h", "r"))
|
||||
self.immutable_vars__test_field(app1, "FrozenSet1", frozenset({1, 2}), frozenset({4, 0}))
|
||||
self.immutable_vars__test_field(app1, "Set1", frozenset({1, 2}), set({4, 0}))
|
||||
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str]
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str])
|
||||
self.check_immutable_fields_schema(
|
||||
app1,
|
||||
"Dict1",
|
||||
@@ -1211,12 +1116,8 @@ class ApplianceTest(unittest.TestCase):
|
||||
dict[int, float],
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "Tuple1", ("a", "c"), ("a", "c"), tuple[str, ...])
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int])
|
||||
self.check_immutable_fields_schema(app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int])
|
||||
|
||||
# class can be created
|
||||
class Appliance3(Appliance2):
|
||||
@@ -1229,28 +1130,18 @@ class ApplianceTest(unittest.TestCase):
|
||||
app3 = Appliance3()
|
||||
|
||||
self.immutable_vars__test_field(app3, "ListStr", ("mod val1", "mod val2"), ["val2", "val3"])
|
||||
self.immutable_vars__test_field(
|
||||
app3, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}
|
||||
)
|
||||
self.immutable_vars__test_field(app3, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
|
||||
self.immutable_vars__test_field(app3, "Tuple1", ("aa", "cc"), ("h", "r"))
|
||||
self.immutable_vars__test_field(app3, "FrozenSet1", frozenset({14, 27}), frozenset({4, 0}))
|
||||
self.immutable_vars__test_field(app3, "Set1", frozenset({1, 20}), set({4, 0}))
|
||||
|
||||
self.immutable_vars__test_field(
|
||||
app3, "ListStr2", ("mod val3", "mod val3"), ["mod val3", "mod val3"]
|
||||
)
|
||||
self.immutable_vars__test_field(
|
||||
app3, "Dict2", {9: 8.1, 5: 98.6, 551: 3.6}, {9: 8.1, 5: 98.6, 551: 3.6}
|
||||
)
|
||||
self.immutable_vars__test_field(app3, "ListStr2", ("mod val3", "mod val3"), ["mod val3", "mod val3"])
|
||||
self.immutable_vars__test_field(app3, "Dict2", {9: 8.1, 5: 98.6, 551: 3.6}, {9: 8.1, 5: 98.6, 551: 3.6})
|
||||
self.immutable_vars__test_field(app3, "Tuple2", ("aaa", "ccc"), ("aaa", "ccc"))
|
||||
self.immutable_vars__test_field(
|
||||
app3, "FrozenSet2", frozenset({114, 127}), frozenset({114, 127})
|
||||
)
|
||||
self.immutable_vars__test_field(app3, "FrozenSet2", frozenset({114, 127}), frozenset({114, 127}))
|
||||
self.immutable_vars__test_field(app3, "Set2", frozenset({10, 250}), set({10, 250}))
|
||||
|
||||
self.check_immutable_fields_schema(
|
||||
app3, "ListStr", ("mod val1", "mod val2"), ("val1", "val2"), list[str]
|
||||
)
|
||||
self.check_immutable_fields_schema(app3, "ListStr", ("mod val1", "mod val2"), ("val1", "val2"), list[str])
|
||||
self.check_immutable_fields_schema(
|
||||
app3,
|
||||
"Dict1",
|
||||
@@ -1258,15 +1149,9 @@ class ApplianceTest(unittest.TestCase):
|
||||
{1: 1.1, 4: 7.6, 91: 23.6},
|
||||
dict[int, float],
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app3, "Tuple1", ("aa", "cc"), ("a", "c"), tuple[str, ...]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app3, "FrozenSet1", frozenset({14, 27}), frozenset({1, 2}), frozenset[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app3, "Set1", frozenset({1, 20}), frozenset({1, 2}), set[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(app3, "Tuple1", ("aa", "cc"), ("a", "c"), tuple[str, ...])
|
||||
self.check_immutable_fields_schema(app3, "FrozenSet1", frozenset({14, 27}), frozenset({1, 2}), frozenset[int])
|
||||
self.check_immutable_fields_schema(app3, "Set1", frozenset({1, 20}), frozenset({1, 2}), set[int])
|
||||
|
||||
self.check_immutable_fields_schema(
|
||||
app3,
|
||||
@@ -1282,9 +1167,7 @@ class ApplianceTest(unittest.TestCase):
|
||||
{9: 8.1, 5: 98.6, 551: 3.6},
|
||||
dict[int, float],
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app3, "Tuple2", ("aaa", "ccc"), ("aaa", "ccc"), tuple[str, ...]
|
||||
)
|
||||
self.check_immutable_fields_schema(app3, "Tuple2", ("aaa", "ccc"), ("aaa", "ccc"), tuple[str, ...])
|
||||
self.check_immutable_fields_schema(
|
||||
app3,
|
||||
"FrozenSet2",
|
||||
@@ -1292,21 +1175,15 @@ class ApplianceTest(unittest.TestCase):
|
||||
frozenset({114, 127}),
|
||||
frozenset[int],
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app3, "Set2", frozenset({10, 250}), frozenset({10, 250}), set[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(app3, "Set2", frozenset({10, 250}), frozenset({10, 250}), set[int])
|
||||
|
||||
self.immutable_vars__test_field(app2, "ListStr", ("mod val1", "mod val2"), ["val2", "val3"])
|
||||
self.immutable_vars__test_field(
|
||||
app2, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}
|
||||
)
|
||||
self.immutable_vars__test_field(app2, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
|
||||
self.immutable_vars__test_field(app2, "Tuple1", ("aa", "cc"), ("h", "r"))
|
||||
self.immutable_vars__test_field(app2, "FrozenSet1", frozenset({14, 27}), frozenset({4, 0}))
|
||||
self.immutable_vars__test_field(app2, "Set1", frozenset({1, 20}), set({4, 0}))
|
||||
|
||||
self.check_immutable_fields_schema(
|
||||
app2, "ListStr", ("mod val1", "mod val2"), ("val1", "val2"), list[str]
|
||||
)
|
||||
self.check_immutable_fields_schema(app2, "ListStr", ("mod val1", "mod val2"), ("val1", "val2"), list[str])
|
||||
self.check_immutable_fields_schema(
|
||||
app2,
|
||||
"Dict1",
|
||||
@@ -1314,27 +1191,17 @@ class ApplianceTest(unittest.TestCase):
|
||||
{1: 1.1, 4: 7.6, 91: 23.6},
|
||||
dict[int, float],
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app2, "Tuple1", ("aa", "cc"), ("a", "c"), tuple[str, ...]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app2, "FrozenSet1", frozenset({14, 27}), frozenset({1, 2}), frozenset[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app2, "Set1", frozenset({1, 20}), frozenset({1, 2}), set[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(app2, "Tuple1", ("aa", "cc"), ("a", "c"), tuple[str, ...])
|
||||
self.check_immutable_fields_schema(app2, "FrozenSet1", frozenset({14, 27}), frozenset({1, 2}), frozenset[int])
|
||||
self.check_immutable_fields_schema(app2, "Set1", frozenset({1, 20}), frozenset({1, 2}), set[int])
|
||||
|
||||
self.immutable_vars__test_field(app1, "ListStr", ("val1", "val2"), ["val2", "val3"])
|
||||
self.immutable_vars__test_field(
|
||||
app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}
|
||||
)
|
||||
self.immutable_vars__test_field(app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
|
||||
self.immutable_vars__test_field(app1, "Tuple1", ("a", "c"), ("h", "r"))
|
||||
self.immutable_vars__test_field(app1, "FrozenSet1", frozenset({1, 2}), frozenset({4, 0}))
|
||||
self.immutable_vars__test_field(app1, "Set1", frozenset({1, 2}), set({4, 0}))
|
||||
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str]
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str])
|
||||
self.check_immutable_fields_schema(
|
||||
app1,
|
||||
"Dict1",
|
||||
@@ -1343,12 +1210,8 @@ class ApplianceTest(unittest.TestCase):
|
||||
dict[int, float],
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "Tuple1", ("a", "c"), ("a", "c"), tuple[str, ...])
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(
|
||||
app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int]
|
||||
)
|
||||
self.check_immutable_fields_schema(app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int])
|
||||
self.check_immutable_fields_schema(app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int])
|
||||
|
||||
def test_initializer(self):
|
||||
"""Testing first appliance level, and Field types (simple)"""
|
||||
@@ -1804,34 +1667,37 @@ class ApplianceTest(unittest.TestCase):
|
||||
|
||||
def test_deepfreeze_nested_dict_list_set(self):
|
||||
class App(dm.Appliance):
|
||||
data: dict[str, list[int] | set[str] | dict[str, list[int] | set[str]]] = {
|
||||
"numbers": [1, 2, 3],
|
||||
data: dict[str, list[int]] = {"numbers": [1, 2, 3]}
|
||||
data2: dict[str, set[str]] = {
|
||||
"letters": {"a", "b", "c"},
|
||||
"mixed": {"x": [4, 5], "y": {"z"}},
|
||||
}
|
||||
data3: dict[str, dict[str, list[int]]] = {
|
||||
"mixed": {"x": [4, 5]},
|
||||
}
|
||||
|
||||
app = App()
|
||||
|
||||
# Top-level: should be frozendict
|
||||
self.assertEqual(type(app.data).__name__, "frozendict")
|
||||
self.assertEqual(type(app.data2).__name__, "frozendict")
|
||||
self.assertEqual(type(app.data3).__name__, "frozendict")
|
||||
|
||||
# Lists must be frozen to tuple
|
||||
self.assertIsInstance(app.data["numbers"], tuple)
|
||||
self.assertIsInstance(app.data["mixed"]["x"], tuple)
|
||||
self.assertIsInstance(app.data3["mixed"]["x"], tuple)
|
||||
|
||||
# Sets must be frozen to frozenset
|
||||
self.assertIsInstance(app.data["letters"], frozenset)
|
||||
self.assertIsInstance(app.data["mixed"]["y"], frozenset)
|
||||
self.assertIsInstance(app.data2["letters"], frozenset)
|
||||
|
||||
# Check immutability
|
||||
with self.assertRaises(TypeError):
|
||||
app.data["numbers"] += (99,)
|
||||
|
||||
with self.assertRaises(AttributeError):
|
||||
app.data["letters"].add("d")
|
||||
app.data2["letters"].add("d")
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
app.data["mixed"]["x"] += (6,)
|
||||
app.data3["mixed"]["x"] += (6,)
|
||||
|
||||
def test_unknown_parameters_raise(self):
|
||||
class App(dm.Appliance):
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
|
||||
|
||||
import unittest
|
||||
|
||||
from typing import Annotated, Union
|
||||
import sys
|
||||
import subprocess
|
||||
from os import chdir, environ
|
||||
@@ -445,6 +445,19 @@ class ElementTest(unittest.TestCase):
|
||||
e.ivalue = 15
|
||||
self.assertEqual(e.ivalue, 14)
|
||||
|
||||
def test_mutable_object_and_class_freeze_2(self):
|
||||
|
||||
class E(dm.Element, options=(dm.ObjectMutable, dm.ClassMutable)):
|
||||
ivalue: int = 43
|
||||
|
||||
self.assertFalse(E.__lam_schema__["ivalue"].is_frozen())
|
||||
E.freeze_class()
|
||||
self.assertTrue(E.__lam_schema__["ivalue"].is_frozen())
|
||||
e = E()
|
||||
self.assertFalse(e.__lam_schema__["ivalue"].is_frozen())
|
||||
e.freeze()
|
||||
self.assertTrue(e.__lam_schema__["ivalue"].is_frozen())
|
||||
|
||||
def test_mutable_class_freeze_container(self):
|
||||
|
||||
class E(dm.Element, options=(dm.ClassMutable)):
|
||||
@@ -788,6 +801,60 @@ class ElementTest(unittest.TestCase):
|
||||
self.assertIsInstance(_elemB.el2, type(elemB.el1))
|
||||
self.assertIsInstance(_elemB.el2, type(_elemA.el1))
|
||||
|
||||
def test_element_mutable_invalid_instance_override_container(self):
|
||||
class Elem(dm.Element, options=(dm.ClassMutable, dm.ObjectMutable)):
|
||||
val: list[int] = []
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
Elem(val=["test"])
|
||||
|
||||
def test_element_mutable_invalid_instance_override_container_nested(self):
|
||||
class ElemInner(dm.Element, options=(dm.ClassMutable, dm.ObjectMutable)):
|
||||
val: list[int] = []
|
||||
|
||||
class ElemOuter(dm.Element, options=(dm.ClassMutable, dm.ObjectMutable)):
|
||||
inner: ElemInner = ElemInner()
|
||||
|
||||
# with self.assertRaises(dm.InvalidFieldValue):
|
||||
elem = ElemOuter(inner={"val": [1, 2, 3]})
|
||||
|
||||
self.assertEqual(elem.inner.val, [1, 2, 3])
|
||||
|
||||
def test_class_protocol(self):
|
||||
self.assertIsInstance(dm.base_element.BaseElement, dm.interfaces.FreezableElement)
|
||||
|
||||
def test_wrong_annotated(self):
|
||||
|
||||
with self.assertRaises(dm.UnsupportedFieldType):
|
||||
|
||||
class Elem(dm.Appliance):
|
||||
StrVar: list[Annotated[str, dm.LAMFieldInfo(doc="foo1")]] = ["default value"]
|
||||
|
||||
def test_wrong_annotation_union(self):
|
||||
|
||||
with self.assertRaises(dm.UnsupportedFieldType):
|
||||
|
||||
class Elem(dm.Appliance):
|
||||
StrVar: str | int = "test"
|
||||
|
||||
with self.assertRaises(dm.UnsupportedFieldType):
|
||||
|
||||
class Elem(dm.Appliance):
|
||||
StrVar: list[str | int] = "test"
|
||||
|
||||
with self.assertRaises(dm.UnsupportedFieldType):
|
||||
|
||||
class Elem(dm.Appliance):
|
||||
StrVar: list[None | int] = "test"
|
||||
|
||||
def test_annotation_union(self):
|
||||
|
||||
class Elem(dm.Appliance):
|
||||
strvar1: str | None = "test"
|
||||
strvar1: str | None = None
|
||||
ivar1: Union[int, None] = 12
|
||||
ivar2: Union[int, None] = None
|
||||
|
||||
|
||||
# ---------- main ----------
|
||||
|
||||
|
||||
@@ -70,6 +70,9 @@ class FeatureTest(unittest.TestCase):
|
||||
)
|
||||
self.assertTrue(hasattr(app1, "Feature1"))
|
||||
self.assertTrue(app1.Feature1.frozen)
|
||||
print(app1)
|
||||
print(app1.Feature1)
|
||||
print(app1.Feature1.__lam_schema__["VarStrInner"])
|
||||
self.assertTrue(app1.Feature1.__lam_schema__["VarStrInner"].is_frozen())
|
||||
self.assertTrue(hasattr(app1.Feature1, "VarStrInner"))
|
||||
|
||||
@@ -661,6 +664,165 @@ class FeatureTest(unittest.TestCase):
|
||||
self.assertEqual(App.F.tag, "test")
|
||||
self.assertEqual(App.F.nums, (1, 2, 3))
|
||||
|
||||
def test_initializer_nested(self):
|
||||
class App(dm.Appliance):
|
||||
integ: int = 18
|
||||
|
||||
class F(dm.Feature):
|
||||
nums: list[int] = [1, 2]
|
||||
tag: str = "x"
|
||||
|
||||
@classmethod
|
||||
def __initializer(cls):
|
||||
cls.tag = "test"
|
||||
cls.nums.append(3)
|
||||
|
||||
self.assertEqual(App.F.tag, "test")
|
||||
self.assertEqual(App.F.nums, (1, 2, 3))
|
||||
|
||||
def test_initializer_nested_dual(self):
|
||||
class App(dm.Appliance):
|
||||
integ: int = 18
|
||||
|
||||
class F(dm.Feature):
|
||||
nums: list[int] = [1, 2]
|
||||
tag: str = "x"
|
||||
|
||||
@classmethod
|
||||
def __initializer(cls):
|
||||
cls.tag = "test1"
|
||||
cls.nums.append(3)
|
||||
|
||||
@classmethod
|
||||
def __initializer(cls):
|
||||
cls.F.tag = "test2"
|
||||
cls.F.nums.append(4)
|
||||
|
||||
self.assertEqual(App.F.tag, "test2")
|
||||
self.assertEqual(App.F.nums, (1, 2, 3, 4))
|
||||
|
||||
def test_container_frozen(self):
|
||||
class App(dm.Appliance):
|
||||
integ: int = 18
|
||||
|
||||
class F(dm.Feature):
|
||||
nums: list[int] = [1, 2]
|
||||
tag: str = "x"
|
||||
|
||||
with self.assertRaises(AttributeError):
|
||||
App.F.nums.append(3)
|
||||
|
||||
def test_container_class_mutable_validation(self):
|
||||
class App(dm.Appliance, options=(dm.ClassMutable)):
|
||||
integ: int = 18
|
||||
|
||||
class F(dm.Feature):
|
||||
nums: list[int] = [1, 2]
|
||||
tag: str = "x"
|
||||
|
||||
App.F.nums.append("test")
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
App.freeze_class()
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
a = App()
|
||||
|
||||
def test_container_object_mutable_validation(self):
|
||||
class App(dm.Appliance, options=(dm.ObjectMutable)):
|
||||
integ: int = 18
|
||||
|
||||
class F(dm.Feature):
|
||||
nums: list[int] = [1, 2]
|
||||
tag: str = "x"
|
||||
|
||||
a = App()
|
||||
a.F.nums.append("test")
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
a.freeze()
|
||||
|
||||
def test_container_object_and_class_mutable_validation(self):
|
||||
class App(dm.Appliance, options=(dm.ClassMutable, dm.ObjectMutable)):
|
||||
integ: int = 18
|
||||
|
||||
class F(dm.Feature):
|
||||
nums: list[int] = [1, 2]
|
||||
tag: str = "x"
|
||||
|
||||
App.F.nums.append("test")
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
a = App()
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
App.freeze_class()
|
||||
|
||||
def test_nested_element_container_object_and_class_mutable(self):
|
||||
class E(dm.Element):
|
||||
val: str = "testelem"
|
||||
i: list[int] = [1]
|
||||
|
||||
class App(dm.Appliance, options=(dm.ClassMutable, dm.ObjectMutable)):
|
||||
integ: int = 18
|
||||
|
||||
class F(dm.Feature):
|
||||
nums: list[int] = [1, 2]
|
||||
tag: str = "x"
|
||||
e: E = E(val="modified")
|
||||
|
||||
App.F.e.i.append(21)
|
||||
self.assertEquals(App.F.e.i, [1, 21])
|
||||
App.freeze_class()
|
||||
with self.assertRaises(AttributeError):
|
||||
App.F.e.i.append(22)
|
||||
a = App()
|
||||
self.assertEquals(a.F.e.i, [1, 21])
|
||||
a.F.e.i.append(28)
|
||||
self.assertEquals(a.F.e.i, [1, 21, 28])
|
||||
a.freeze()
|
||||
self.assertEquals(a.F.e.i, (1, 21, 28))
|
||||
with self.assertRaises(AttributeError):
|
||||
a.F.e.i.append(23)
|
||||
self.assertEquals(a.F.e.i, (1, 21, 28))
|
||||
|
||||
def test_nested_element_container_object_and_class_mutable_validation(self):
|
||||
class E(dm.Element):
|
||||
val: str = "testelem"
|
||||
i: list[int] = [1]
|
||||
|
||||
class App(dm.Appliance, options=(dm.ClassMutable, dm.ObjectMutable)):
|
||||
integ: int = 18
|
||||
|
||||
class F(dm.Feature):
|
||||
nums: list[int] = [1, 2]
|
||||
tag: str = "x"
|
||||
e: E = E(val="modified")
|
||||
|
||||
App.F.e.i.append("test")
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
a = App()
|
||||
|
||||
def test_nested_element_container_object_and_class_mutable_validation2(self):
|
||||
class E(dm.Element):
|
||||
val: str = "testelem"
|
||||
i: list[int] = [1]
|
||||
|
||||
class App(dm.Appliance, options=(dm.ClassMutable, dm.ObjectMutable)):
|
||||
integ: int = 18
|
||||
|
||||
class F(dm.Feature):
|
||||
nums: list[int] = [1, 2]
|
||||
tag: str = "x"
|
||||
e: E = E(val="modified")
|
||||
|
||||
a = App()
|
||||
a.F.e.i.append("test")
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
a.freeze()
|
||||
|
||||
|
||||
# ---------- main ----------
|
||||
|
||||
|
||||
Reference in New Issue
Block a user