Compare commits

...

16 Commits

Author SHA1 Message Date
cclecle
85a2ca2753 work 2025-10-12 22:45:56 +02:00
chacha
df50632458 work 2025-09-29 21:14:24 +02:00
cclecle
25d5339946 work 2025-09-28 21:18:35 +02:00
cclecle
6efd914de1 non finished work 2025-09-28 17:33:34 +02:00
cclecle
2e81b3f0e6 work 2025-09-27 23:58:47 +02:00
cclecle
5d206ef266 fix Feature + cleaning 2025-09-27 22:00:47 +02:00
chacha
ba993b14b9 work 2025-09-27 16:41:36 +02:00
chacha
5232ad66fc some progress 2025-09-26 00:50:52 +02:00
chacha
44554006db work 2025-09-25 01:40:05 +02:00
chacha
1e84fd691e clean 2025-09-24 19:10:03 +02:00
chacha
2ac931edeb more work 2025-09-24 02:16:10 +02:00
chacha
7ffe05f514 clean 2025-09-23 22:43:39 +02:00
chacha
100d2cadcb renaming 2025-09-23 22:27:00 +02:00
chacha
71faefcf68 code cleaning 2025-09-23 22:10:03 +02:00
chacha
23f14a042d make metaclass thread safer 2025-09-23 22:06:02 +02:00
chacha
19a6c802bb yet another huge rework 2025-09-23 21:47:23 +02:00
22 changed files with 1988 additions and 847 deletions

View File

@@ -1,43 +0,0 @@
from typing import Generic, Any, Optional
from .LAMField import LAMField
from .Constraint import Constraint
from ..tools import LAMdeepfreeze
from .LAMField import TV_LABField
class FrozenLAMField(Generic[TV_LABField]):
"""FrozenLAMField class
a read-only proxy of a Field
"""
def __init__(self, inner_field: LAMField[TV_LABField]):
self._inner_field = inner_field
def validate(self, v: Optional[TV_LABField]):
self._inner_field.validate(v)
@property
def doc(self) -> str:
"""Returns Field's documentation (frozen)"""
return LAMdeepfreeze(self._inner_field.doc)
@property
def constraints(self) -> tuple[Constraint]:
"""Returns Field's constraint (frozen)"""
return LAMdeepfreeze(self._inner_field.constraints)
@property
def default_value(self) -> Any:
"""Returns Field's default value (frozen)"""
return self._inner_field.default_value
@property
def value(self) -> Any:
"""Returns Field's value (frosen)"""
return self._inner_field.value
@property
def annotations(self) -> Any:
"""Returns Field's annotation (frozen)"""
return LAMdeepfreeze(self._inner_field.annotations)

View File

@@ -1,83 +0,0 @@
from typing import Generic, TypeVar, Optional, Any
from typeguard import check_type, CollectionCheckStrategy, TypeCheckError
from .LAMFieldInfo import LAMFieldInfo
from .Constraint import Constraint
from ..tools import LAMdeepfreeze
from ..exception import InvalidFieldValue
TV_LABField = TypeVar("TV_LABField")
class LAMField(Generic[TV_LABField]):
"""This class describe a Field in Schema"""
def __init__(self, name: str, v: Optional[TV_LABField], a: Any, i: LAMFieldInfo):
self._name: str = name
self._source: Optional[type] = None
self._info: LAMFieldInfo = i
self._annotations: Any = a
self.validate(v)
self._default_value: Optional[TV_LABField] = v
self._value: Optional[TV_LABField] = v
self._constraints: list[Constraint[Any]] = i.constraints
def add_source(self, s: type) -> None:
"""Adds source Appliance to the Field"""
self._source = s
@property
def doc(self) -> str:
"""Returns Field's documentation"""
return self._info.doc
def add_constraint(self, c: Constraint) -> None:
"""Adds constraint to the Field"""
self._constraints.append(c)
@property
def constraints(self) -> list[Constraint]:
"""Returns Field's constraint"""
return self._info.constraints
def validate(self, v: Optional[TV_LABField]):
try:
check_type(
v,
self.annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Value of Field <{self._name}> is not of expected type {self.annotations}."
) from exp
@property
def default_value(self) -> Any:
"""Returns Field's default value (frozen)"""
return LAMdeepfreeze(self._default_value)
def update_value(self, v: Optional[TV_LABField] = None) -> None:
"""Updates Field's value"""
self.validate(v)
self._value = v
@property
def value(self) -> Any:
"""Returns Field's value (frozen)"""
return LAMdeepfreeze(self._value)
@property
def raw_value(self) -> Optional[TV_LABField]:
"""Returns Field's value"""
return self._value
@property
def annotations(self) -> Any:
"""Returns Field's annotation"""
return self._annotations
@property
def info(self) -> LAMFieldInfo:
"""Returns Field's info"""
return self._info

View File

@@ -1,26 +0,0 @@
from typing import Optional, Any
from .Constraint import Constraint
class LAMFieldInfo:
"""This Class allows to describe a Field in Appliance class"""
def __init__(
self, *, doc: str = "", constraints: Optional[list[Constraint]] = None
):
self._doc: str = doc
self._constraints: list[Constraint]
if constraints is None:
self._constraints = []
else:
self._constraints = constraints
@property
def doc(self) -> str:
"""Returns Field's documentation"""
return self._doc
@property
def constraints(self) -> list[Constraint[Any]]:
"""Returns Field's constraints"""
return self._constraints

View File

@@ -15,9 +15,10 @@ from .__metadata__ import __version__, __Summuary__, __Name__
from .meta.element import ClassMutable, ObjectMutable
from .element import Element
from .LAMFields.LAMField import LAMField
from .LAMFields.LAMFieldInfo import LAMFieldInfo
from .LAMFields.FrozenLAMField import FrozenLAMField
from .lam_field.lam_field import LAMField
from .lam_field.lam_field_info import LAMFieldInfo
# from .LAMFields.FrozenLAMField import FrozenLAMField
from .appliance import Appliance
from .feature import Feature
@@ -27,7 +28,6 @@ from .exception import (
MultipleInheritanceForbidden,
BrokenInheritance,
ReadOnlyField,
NewFieldForbidden,
NotAnnotatedField,
ReadOnlyFieldAnnotation,
InvalidFieldValue,
@@ -38,6 +38,8 @@ from .exception import (
InvalidFeatureInheritance,
FeatureNotBound,
UnsupportedFieldType,
NonExistingField,
InvalidFieldName,
)
__all__ = [name for name in globals() if not name.startswith("_")]

View File

@@ -9,9 +9,19 @@ class Appliance(IAppliance, metaclass=_MetaAppliance):
An appliance is a server configuration / image that is built using appliance's code and Fields.
"""
def validate_schema(self):
super().validate_schema()
for k in self.__lam_schema__["features"]:
self.__dict__[k].validate_schema()
@classmethod
def validate_schema_class(cls):
super().validate_schema_class()
print(cls.__lam_schema__["features"])
for v in cls.__lam_schema__["features"].values():
v.validate_schema_class()
def _validate_schema_unknown_attr(self, name: str):
print(f"(appliance) _validate_schema_unknown_attr {self}:{name}")
print(self.__dict__[name])
if isinstance(self.__dict__[name], Feature):
return
super()._validate_schema_unknown_attr(name)
@@ -20,3 +30,42 @@ class Appliance(IAppliance, metaclass=_MetaAppliance):
if name == "features":
return
super()._validate_schema_missing_attr(name)
@classmethod
def _validate_unknown_field_schema(cls, name: str):
if name == "features":
return
super()._validate_unknown_field_schema(name)
def _freeze_unknown_attr(self, name: str, force: bool = False):
if isinstance(self.__dict__[name], Feature):
self.__dict__[name].freeze(force)
return
super()._freeze_unknown_attr(name, force)
def _freeze_missing_attr(self, name: str, force: bool = False):
if name == "features":
for k, v in self.__lam_schema__["features"].items():
v.freeze_class(force)
return
super()._freeze_missing_attr(name, force)
@classmethod
def _freeze_unknown_field_schema(cls, name: str, force: bool = False):
if name == "features":
for v in cls.__lam_schema__["features"].values():
v.freeze_class(force)
return
super()._freeze_unknown_field_schema(name, force)
@classmethod
def _validate_unknown_attr_class(cls, name: str) -> None:
if issubclass(cls.__dict__[name], Feature):
return
super()._validate_unknown_attr_class(name)
@classmethod
def _freeze_unknown_attr_class(cls, name: str, force: bool = False) -> None:
if issubclass(cls.__dict__[name], Feature):
return
super()._freeze_unknown_attr_class(name, force)

View File

@@ -1,93 +1,253 @@
from typing import Any
from typeguard import check_type, CollectionCheckStrategy, TypeCheckError
from typing import Any, Self, Dict, Optional
from .exception import ReadOnlyField, InvalidFieldValue, NewFieldForbidden, SchemaViolation
from typeguard import check_type, CollectionCheckStrategy, TypeCheckError
from copy import deepcopy
from .lam_field.lam_field import LAMField, LAMField_Element
from .exception import ReadOnlyField, SchemaViolation, NonExistingField, InvalidFieldValue
from .tools import LAMdeepfreeze, is_data_attribute
'''
class ElementView:
__slots__ = ("_vals", "_types", "_touched")
def __init__(
self,
values: dict[str, Any],
types_map: dict[str, type],
):
self._vals: dict[str, Any]
self._types: dict[str, type]
self._touched: set
object.__setattr__(self, "_vals", dict(values))
object.__setattr__(self, "_types", types_map)
def __getattr__(self, name: str) -> Any:
"""internal proxy getattr"""
if name not in self._types:
raise NonExistingField(f"Unknown field {name}")
return self._vals[name]
def __setattr__(self, name: str, value: Any):
"""internal proxy setattr"""
if name not in self._types:
raise NonExistingField(f"Cannot set unknown field {name}")
T = self._types[name]
try:
check_type(
value,
T,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(f"Field <{name}> value is not of expected type {T}.") from exp
self._vals[name] = value
def export(self) -> dict:
"""exports all proxified values"""
return dict(self._vals)
class ElementViewCls:
__slots__ = ("_vals", "_types", "_touched", "_name", "_module")
def __init__(
self,
values: dict[str, Any],
types_map: dict[str, type],
name: Optional[str] = None,
module: Optional[str] = None,
):
super().__init__(values, types_map)
self._name: str
self._module: str
if name is not None:
object.__setattr__(self, "_name", name)
if module is not None:
object.__setattr__(self, "_module", module)
@property
def __name__(self) -> str:
"""returns proxified class' name"""
return self._name
@property
def __module__(self) -> str:
"""returns proxified module's name"""
return self._module
@__module__.setter
def __module__(self, value: str):
pass
'''
class BaseElement:
__lam_schema__ = {}
__lam_initialized__ = False
__lam_class_mutable__ = False
__lam_object_mutable__ = False
__lam_options__ = {}
"""
def get_model_spec(self, name: str, module: str, memo: dict[str, Any]) -> ElementView:
# memo[self.__name__] = {}
init_fieldvalues = {}
init_fieldtypes = {}
for k, v in self.__lam_schema__.items():
if isinstance(v, LAMField_Element):
memo[k] = {}
init_fieldvalues[k] = v.value.get_model_spec(memo[k])
# clone = v.clone_unfrozen().value
elif isinstance(v, LAMField):
clone = deepcopy(v.value)
init_fieldvalues[k] = clone
else:
pass
init_fieldtypes[k] = v.annotations
return ElementView(init_fieldvalues, init_fieldtypes)
def __setattr__(self, key: str, value: Any):
print(f"!guarded_setattr {self} {key} {value}")
@classmethod
def get_model_spec_cls(cls, memo: dict[str, Any]) -> ElementView:
# memo[self.__name__] = {}
init_fieldvalues = {}
init_fieldtypes = {}
for k, v in cls.__lam_schema__.items():
if isinstance(v, LAMField_Element):
memo[k] = {}
init_fieldvalues[k] = v.value.get_model_spec(k, memo[k])
# clone = v.clone_unfrozen().value
elif isinstance(v, LAMField):
clone = deepcopy(v.value)
init_fieldvalues[k] = clone
else:
pass
init_fieldtypes[k] = v.annotations
return ElementViewCls(init_fieldvalues, init_fieldtypes, cls.__name__, cls.__modules__)
"""
def clone_as_mutable_variant(self, *, deep: bool = True, _memo: Dict[int, Self] | None = None) -> Self:
raise NotImplemented()
@classmethod
@property
def frozen_cls(cls) -> bool:
return not cls.__lam_class_mutable__
@classmethod
@property
def mutable_obj(cls) -> bool:
return cls.__lam_object_mutable__
@property
def frozen(self) -> bool:
return not self.__lam_object_mutable__
def __setattr__(self, key: str, value: Any) -> None:
if key.startswith("_"):
return super().__setattr__(key, value)
if key not in self.__lam_schema__:
raise NewFieldForbidden(f"Can't create new object attributes: {key}")
raise NonExistingField(f"Can't create new object attributes: {key}")
if not self.__lam_object_mutable__:
raise ReadOnlyField(f"{key} is read-only")
try:
check_type(
value,
self.__lam_schema__[key].annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Value of Field <{key}> is not of expected type {self.__lam_schema__[key].annotations}."
) from exp
self.__lam_schema__[key].validate(value)
return super().__setattr__(key, value)
def freeze(self, force: bool = False):
print("freeze")
def freeze(self, force: bool = False) -> None:
if self.__lam_object_mutable__ or force:
print("freezing !!!!!!!!!!!")
self.validate_schema()
if self.__lam_object_mutable__:
self.validate_schema()
setSchemaKeys = set(self.__lam_schema__.keys())
setSchemaKeys = set(self.__lam_schema__)
setInstanceKeys = {_[0] for _ in self.__dict__.items() if is_data_attribute(_[0], _[1])}
for unknown_attr in setInstanceKeys - setSchemaKeys:
self._validate_schema_unknown_attr(unknown_attr)
for k_unknown in setInstanceKeys - setSchemaKeys:
self._freeze_unknown_attr(k_unknown, force)
for unknown_attr in setSchemaKeys - setInstanceKeys:
self._validate_schema_missing_attr(unknown_attr)
for k_missing in setSchemaKeys - setInstanceKeys:
self._freeze_missing_attr(k_missing, force)
for attrName in setSchemaKeys & setInstanceKeys:
print(f"Freezing {attrName}")
object.__setattr__(self, attrName, LAMdeepfreeze(self.__dict__[attrName]))
for k in list(setSchemaKeys & setInstanceKeys):
self.__lam_schema__[k].freeze()
if isinstance(self.__dict__[k], BaseElement):
self.__dict__[k].freeze(force)
else:
self.__dict__[k] = LAMdeepfreeze(self.__dict__[k])
self.__lam_object_mutable__ = False
else:
print("already frozen")
def _freeze_unknown_attr(self, name: str):
def _freeze_unknown_attr(self, name: str, force: bool = False) -> None:
raise SchemaViolation(f"Attribute <{name}> is not in the schema")
def _freeze_missing_attr(self, name: str):
def _freeze_missing_attr(self, name: str, force: bool = False) -> None:
raise SchemaViolation(f"Attribute <{name}> is missing from instance")
def validate_schema(self):
print("validate_schema")
setSchemaKeys = set(self.__lam_schema__.keys())
def validate_schema(self) -> None:
setSchemaKeys = set(self.__lam_schema__)
setInstanceKeys = {_[0] for _ in self.__dict__.items() if is_data_attribute(_[0], _[1])}
for unknown_attr in setInstanceKeys - setSchemaKeys:
self._validate_schema_unknown_attr(unknown_attr)
for k_unknown in setInstanceKeys - setSchemaKeys:
self._validate_schema_unknown_attr(k_unknown)
for unknown_attr in setSchemaKeys - setInstanceKeys:
self._validate_schema_missing_attr(unknown_attr)
for k_missing in setSchemaKeys - setInstanceKeys:
self._validate_schema_missing_attr(k_missing)
for attrName in setSchemaKeys & setInstanceKeys:
print(
f"Validating {attrName}: {self.__dict__[attrName]} / {self.__lam_schema__[attrName].annotations}"
)
self.__lam_schema__[attrName].validate(self.__dict__[attrName])
print("validate_schema done")
for k in list(setSchemaKeys & setInstanceKeys):
self.__lam_schema__[k].validate_self()
def _validate_schema_unknown_attr(self, name: str):
def _validate_schema_unknown_attr(self, name: str) -> None:
raise SchemaViolation(f"Attribute <{name}> is not in the schema")
def _validate_schema_missing_attr(self, name: str):
def _validate_schema_missing_attr(self, name: str) -> None:
raise SchemaViolation(f"Attribute <{name}> is missing from instance")
@classmethod
def freeze_class(cls, force: bool = False):
pass
def freeze_class(cls, force: bool = False) -> None:
if cls.__lam_class_mutable__ or force:
cls.validate_schema_class()
# class should not have any elements so they are all unknown
for k_unknown in {_[0] for _ in cls.__dict__.items() if is_data_attribute(_[0], _[1])}:
cls._freeze_unknown_attr_class(k_unknown, force)
for k, v in cls.__lam_schema__.items():
if isinstance(v, LAMField):
cls.__lam_schema__[k].freeze()
else:
cls._freeze_unknown_field_schema(k, force)
cls.__lam_class_mutable__ = False
@classmethod
def _freeze_unknown_attr_class(cls, name: str, force: bool = False) -> None:
raise SchemaViolation(f"Class attribute <{name}> is not in the schema")
@classmethod
def _freeze_unknown_field_schema(cls, name: str, force: bool = False) -> None:
raise SchemaViolation(f"Unknown field <{name} in the schema> ")
@classmethod
def validate_schema_class(cls) -> None:
# class should not have any elements so they are all unknown
for k_unknown in {_[0] for _ in cls.__dict__.items() if is_data_attribute(_[0], _[1])}:
cls._validate_unknown_attr_class(k_unknown)
for k, v in cls.__lam_schema__.items():
if isinstance(v, LAMField):
v.validate_self()
else:
cls._validate_unknown_field_schema(k)
@classmethod
def _validate_unknown_attr_class(cls, name: str) -> None:
raise SchemaViolation(f"Class attribute <{name}> is not in the schema")
@classmethod
def _validate_unknown_field_schema(cls, name: str) -> None:
raise SchemaViolation(f"Unknown field <{name} in the schema> ")

View File

@@ -2,14 +2,14 @@ from typing import Any, Union, Optional, List, Dict, Tuple, Set, FrozenSet, Anno
from types import SimpleNamespace
import math
ALLOWED_MODEL_FIELDS_TYPES: tuple[type[Any], ...] = (
ALLOWED_MODEL_FIELDS_TYPES: set[type[Any], ...] = {
str,
int,
float,
complex,
bool,
bytes,
)
}
ALLOWED_ANNOTATIONS: dict[str, Any] = {
"Union": Union,

View File

@@ -4,6 +4,10 @@ class DABModelException(Exception):
"""
class WrongUsage(DABModelException, RuntimeError):
pass
class FunctionForbidden(DABModelException):
"""FunctionForbidden Exception class"""
@@ -40,12 +44,6 @@ class ReadOnlyField(AttributeError, DABModelException):
"""
class NewFieldForbidden(AttributeError, DABModelException):
"""NewFieldForbidden Exception class
Field creation is forbidden
"""
class InvalidFieldAnnotation(AttributeError, DABModelException):
"""InvalidFieldAnnotation Exception class
The field annotation is invalid
@@ -82,30 +80,30 @@ class ReadOnlyFieldAnnotation(AttributeError, DABModelException):
"""
class InvalidFieldValue(AttributeError, DABModelException):
class SchemaViolation(AttributeError, DABModelException):
"""SchemaViolation Exception class
The Element Schema is not respected
"""
class InvalidFieldValue(SchemaViolation):
"""InvalidFieldValue Exception class
The Field value is invalid
"""
class NonExistingField(SchemaViolation):
"""NonExistingField Exception class
The given Field is non existing
"""
class InvalidFieldName(AttributeError, DABModelException):
"""InvalidFieldName Exception class
The Field name is invalid
"""
class NonExistingField(AttributeError, DABModelException):
"""NonExistingField Exception class
The given Field is non existing
"""
class SchemaViolation(AttributeError, DABModelException):
"""SchemaViolation Exception class
The Element Schema is not respected
"""
class ImportForbidden(DABModelException):
"""ImportForbidden Exception class
Imports are forbidden
@@ -136,7 +134,7 @@ class FeatureBoundToNonAppliance(DABModelException):
"""
class FeatureBoundToIncompatiblegAppliance(DABModelException):
class FeatureBoundToIncompatibleAppliance(DABModelException):
"""FeatureBoundToWrongAppliance Exception class
Feature have to be bound to correct appliance
"""

View File

@@ -4,7 +4,7 @@ from .exception import (
FeatureAlreadyBound,
FeatureNotBound,
FeatureBoundToNonAppliance,
FeatureBoundToIncompatiblegAppliance,
FeatureBoundToIncompatibleAppliance,
)
@@ -26,13 +26,12 @@ class Feature(IFeature, metaclass=_MetaFeature):
cls.check_appliance_bound()
if not issubclass(appliance_cls, cls.__lam_bound_appliance__):
raise FeatureBoundToIncompatiblegAppliance(
raise FeatureBoundToIncompatibleAppliance(
f"Feature {cls} is bound to an incompatible Appliance {appliance_cls}"
)
@classmethod
def bind_appliance(cls, appliance_cls):
print(f"BINDING {cls} to {appliance_cls}")
if cls.__lam_bound_appliance__ is not None:
raise FeatureAlreadyBound(
f"Feature {cls} already bound to an Appliance {cls.__lam_bound_appliance__}"

View File

@@ -0,0 +1,39 @@
from typing import Generic, TypeVar, Dict, Protocol, runtime_checkable, Self
TV_Freezable = TypeVar("TV_Freezable")
@runtime_checkable
class FreezableElement(Protocol, Generic[TV_Freezable]):
def clone_as_mutable_variant(self, *, deep: bool = True, _memo: Dict[int, Self] | None = None) -> Self:
pass
def freeze(self, force: bool = False) -> None:
pass
@classmethod
def freeze_class(cls, force: bool = False) -> None:
pass
def validate_schema(self) -> None:
pass
@classmethod
def validate_schema_class(cls) -> None:
pass
@classmethod
@property
def frozen_cls(cls) -> bool:
pass
@classmethod
@property
def mutable_obj(cls) -> bool:
pass
@property
def frozen(self) -> bool:
pass

View File

@@ -0,0 +1,176 @@
from typing import Generic, TypeVar, Optional, Any, Self, Annotated, get_origin, get_args
from typeguard import check_type, CollectionCheckStrategy, TypeCheckError
from copy import deepcopy
from .lam_field_info import LAMFieldInfo
from .constraint import Constraint
from ..tools import LAMdeepfreeze
from ..exception import InvalidFieldValue, ReadOnlyField
from ..interfaces import FreezableElement
TV_LABField = TypeVar("TV_LABField")
class LAMField(Generic[TV_LABField]):
"""This class describe a Field in Schema"""
def __init__(self, name: str, val: Optional[TV_LABField], ann: Any, i: LAMFieldInfo):
self._default_value: Optional[TV_LABField]
self._value: Optional[TV_LABField]
self.__annotations: Any
self.__name: str = name
self.__source: Optional[type] = None
self.__info: LAMFieldInfo = deepcopy(i)
self._set_annotations(ann)
self.__frozen: bool = False
self._frozen_value: Any = None
self.__frozen_value_set: True = False
self.validate(val)
self._init_value(val)
def _set_annotations(self, ann: Any) -> None:
_origin = get_origin(ann) or ann
_args = get_args(ann)
if _origin is Annotated:
self.__annotations: Any = LAMdeepfreeze(_args[0])
else:
self.__annotations: Any = LAMdeepfreeze(ann)
def _init_value(self, val: Optional[TV_LABField | FreezableElement]):
self._default_value: Optional[TV_LABField] = deepcopy(val)
self._value: Optional[TV_LABField] = val
@property
def name(self) -> str:
return self.__name
def is_frozen(self) -> bool:
return self.__frozen
def freeze(self):
self.__frozen = True
def clone_unfrozen(self) -> Self:
field = LAMFieldFactory.create_field(self.__name, self._default_value, self.__annotations, self.__info)
field.update_value(self._value)
return field
def add_source(self, src: type) -> None:
"""Adds source Appliance to the Field"""
if self.__frozen:
raise ReadOnlyField("Field is frozen, cannot add source now")
self.__source = src
@property
def doc(self) -> str:
"""Returns Field's documentation"""
return self.__info.doc
def add_constraint(self, cons: Constraint) -> None:
"""Adds constraint to the Field"""
if self.__frozen:
raise ReadOnlyField("Field is frozen")
self.__info.add_constraint(cons)
@property
def constraints(self) -> list[Constraint]:
"""Returns Field's constraint"""
return LAMdeepfreeze(self.__info.constraints)
def validate_self(self):
self.validate(self._value)
def validate(self, val: Optional[TV_LABField]):
try:
check_type(
val,
self.annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(f"Value of Field <{self.__name}> is not of expected type {self.annotations}.") from exp
@property
def default_value(self) -> Any:
"""Returns Field's default value (frozen)"""
return LAMdeepfreeze(self._default_value)
def update_value(self, val: Optional[TV_LABField] = None) -> None:
"""Updates Field's value"""
if self.__frozen:
raise ReadOnlyField("Field is frozen")
self.validate(val)
self._value = val
self.__frozen_value_set = False
@property
def value(self) -> Any:
"""Returns Field's value (frozen)"""
if self.__frozen:
return self.frozen_value
else:
return self.raw_value
@property
def raw_value(self) -> Optional[TV_LABField]:
"""Returns Field's value"""
if self.__frozen:
raise ReadOnlyField("Field is frozen")
return self._value
def _generate_frozen_value(self):
self._frozen_value = LAMdeepfreeze(self._value)
@property
def frozen_value(self) -> Any:
if not self.__frozen_value_set:
self._generate_frozen_value()
self.__frozen_value_set = True
return self._frozen_value
@property
def annotations(self) -> Any:
"""Returns Field's annotation"""
return self.__annotations
@property
def info(self) -> LAMFieldInfo:
"""Returns Field's info"""
return self.__info
class LAMField_Element(LAMField[FreezableElement]):
def _init_value(self, val: Optional[FreezableElement]):
self._default_value = deepcopy(val)
self._default_value.freeze()
self._value = val.clone_as_mutable_variant()
def validate(self, val: Optional[FreezableElement]):
super().validate(val)
if val is not None:
print(val)
val.validate_schema()
@property
def default_value(self) -> Any:
return self._default_value
def update_value(self, val: Optional[FreezableElement] = None) -> None:
super().update_value(val.clone_as_mutable_variant())
def _generate_frozen_value(self):
self._frozen_value = deepcopy(self._value)
self._frozen_value.freeze()
class LAMFieldFactory:
@staticmethod
def create_field(name: str, val: Optional[TV_LABField], anno: Any, info: LAMFieldInfo) -> LAMField:
if isinstance(val, FreezableElement):
print(f"Spawn LAMField_Element {name} !!!")
return LAMField_Element(name, val, anno, info)
else:
print(f"Spawn LAMField {name} !!!")
return LAMField(name, val, anno, info)

View File

@@ -0,0 +1,27 @@
from typing import Optional, Any
from .constraint import Constraint
class LAMFieldInfo:
"""This Class allows to describe a Field in Appliance class"""
def __init__(self, *, doc: str = "", constraints: Optional[list[Constraint]] = None):
self.__doc: str = doc
self.__constraints: list[Constraint]
if constraints is None:
self.__constraints = []
else:
self.__constraints = constraints
def add_constraint(self, constraint: Constraint):
self.__constraints.append(constraint)
@property
def doc(self) -> str:
"""Returns Field's documentation"""
return self.__doc
@property
def constraints(self) -> list[Constraint[Any]]:
"""Returns Field's constraints"""
return self.__constraints

View File

@@ -1,12 +1,10 @@
from typing import Any, Type
from frozendict import frozendict
from copy import copy
from ..LAMFields.LAMField import LAMField
from ..LAMFields.FrozenLAMField import FrozenLAMField
from .element import _MetaElement
from .element import _MetaElement, get_mutable_variant
from ..feature import Feature
from ..exception import InvalidFieldValue, InvalidFeatureInheritance, InvalidFieldName
from ..tools import LAMdeepfreeze
class _MetaAppliance(_MetaElement):
@@ -20,7 +18,7 @@ class _MetaAppliance(_MetaElement):
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
stack_exts: dict[str, Any],
) -> None:
"""
Appliance-specific pre-check: ensure the `features` slot exists in schema.
@@ -28,7 +26,7 @@ class _MetaAppliance(_MetaElement):
Copies the parent's `features` mapping when inheriting to keep it per-class.
"""
super().check_class(name, bases, namespace, extensions) # type: ignore[misc]
super().check_class(name, bases, namespace, stack_exts) # type: ignore[misc]
if "features" not in namespace["__lam_schema__"]:
namespace["__lam_schema__"]["features"] = {}
@@ -39,11 +37,11 @@ class _MetaAppliance(_MetaElement):
name: str,
base: type[Any],
namespace: dict[str, Any],
extensions: dict[str, Any],
stack_exts: dict[str, Any],
):
super().inherit_schema(name, base, namespace, extensions)
super().inherit_schema(name, base, namespace, stack_exts)
if "features" in base.__lam_schema__:
namespace["__lam_schema__"]["features"] = copy(base.__lam_schema__["features"])
namespace["__lam_schema__"]["features"] = dict(base.__lam_schema__["features"])
@classmethod
def process_class_fields(
@@ -51,18 +49,18 @@ class _MetaAppliance(_MetaElement):
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
extensions: dict[str, Any],
stack_exts: dict[str, Any],
):
"""
Like meta.process_class_fields but also stages Feature declarations.
Initializes:
extensions["new_features"], extensions["modified_features"]
stack_exts["new_features"], stack_exts["modified_features"]
then defers to the base scanner for regular fields.
"""
extensions["new_features"] = {}
extensions["modified_features"] = {}
super().process_class_fields(name, bases, namespace, extensions) # type: ignore[misc]
stack_exts["new_features"] = {}
stack_exts["modified_features"] = {}
super().process_class_fields(name, bases, namespace, stack_exts) # type: ignore[misc]
@classmethod
def process_new_field(
@@ -72,7 +70,7 @@ class _MetaAppliance(_MetaElement):
namespace: dict[str, Any],
_fname: str,
_fvalue: Any,
extensions: dict[str, Any],
stack_exts: dict[str, Any],
): # pylint: disable=unused-argument
"""
Intercept Feature declarations.
@@ -82,22 +80,19 @@ class _MetaAppliance(_MetaElement):
- Else, if `_fvalue` is a Feature *class*, stage into `new_features`.
- Otherwise, it is a regular field: delegate to meta.process_new_field.
"""
print(f"(appliance) process_new_field: {_fname}")
print(namespace["__lam_schema__"]["features"].keys())
if _fname == "feature":
if _fname == "features":
raise InvalidFieldName("'feature' is a reserved Field name")
if _fname in namespace["__lam_schema__"]["features"].keys():
print("??a")
if _fname in namespace["__lam_schema__"]["features"]:
if not issubclass(_fvalue, namespace["__lam_schema__"]["features"][_fname]):
raise InvalidFeatureInheritance(
f"Feature {_fname} is not an instance of {bases[0]}.{_fname}"
)
extensions["modified_features"][_fname] = _fvalue
raise InvalidFeatureInheritance(f"Feature {_fname} is not a subclass of {bases[0]}.{_fname}")
stack_exts["modified_features"][_fname] = get_mutable_variant(_fvalue)
namespace[_fname] = stack_exts["modified_features"][_fname]
elif isinstance(_fvalue, type) and issubclass(_fvalue, Feature):
print("??b")
extensions["new_features"][_fname] = _fvalue
stack_exts["new_features"][_fname] = get_mutable_variant(_fvalue)
namespace[_fname] = stack_exts["new_features"][_fname]
else:
super().process_new_field(name, bases, namespace, _fname, _fvalue, extensions) # type: ignore[misc]
super().process_new_field(name, bases, namespace, _fname, _fvalue, stack_exts) # type: ignore[misc]
@classmethod
def commit_fields(
@@ -106,7 +101,7 @@ class _MetaAppliance(_MetaElement):
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
stack_exts: dict[str, Any],
):
"""
Commit regular fields (via meta) and then bind staged Feature classes.
@@ -115,49 +110,63 @@ class _MetaAppliance(_MetaElement):
- bind it to `cls` (sets the feature's `_BoundAppliance`),
- register it under `cls.__LAMSchema__["features"]`.
"""
super().commit_fields(cls, name, bases, namespace, extensions) # type: ignore[misc]
super().commit_fields(cls, name, bases, namespace, stack_exts) # type: ignore[misc]
for _ftname, _ftvalue in extensions["modified_features"].items():
cls.__lam_schema__["features"][_ftname] = _ftvalue
for _ftname, _ftvalue in extensions["new_features"].items():
_ftvalue.bind_appliance(cls)
cls.__lam_schema__["features"][_ftname] = _ftvalue
cls.__lam_schema__["features"].update(stack_exts["modified_features"])
for v in stack_exts["new_features"].values():
v.bind_appliance(cls)
cls.__lam_schema__["features"].update(stack_exts["new_features"])
def finalize_instance(cls: Type, obj, extensions: dict[str, Any]):
"""
Instantiate and attach all features declared (or overridden) in the instance schema.
Handles:
- Declared features (plain class)
- Subclass replacements
- Dict overrides (class + patch dict)
"""
for fname, fdef in obj.__lam_schema__.get("features", {}).items():
# Case 1: plain class or subclass
if isinstance(fdef, type) and issubclass(fdef, Feature):
inst = fdef()
object.__setattr__(obj, fname, inst)
@classmethod
def prepare_initializer_fields(
mcs: type["_MetaElement"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
init_fieldvalues: dict[str, Any],
init_fieldtypes: dict[str, Any],
stack_exts: dict[str, Any],
):
for k, v in cls.__lam_schema__["features"].items():
init_fieldvalues[k] = v
init_fieldtypes[k] = v
# Case 2: (class, dict) → dict overrides
elif isinstance(fdef, tuple) and len(fdef) == 2:
feat_cls, overrides = fdef
inst = feat_cls()
for field_name, new_val in overrides.items():
if field_name not in feat_cls.__lam_schema__:
raise InvalidFieldValue(f"Feature '{fname}' has no field '{field_name}'")
field = feat_cls.__lam_schema__[field_name]
field.validate(new_val)
object.__setattr__(inst, field_name, LAMdeepfreeze(new_val))
inst.__lam_schema__[field_name] = FrozenLAMField(
LAMField(field_name, new_val, field.annotations, field.info)
)
object.__setattr__(obj, fname, inst)
@classmethod
def commit_initializer_fields(
mcs: type["_MetaElement"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
fakecls_exports: dict[str, Any],
stack_exts: dict[str, Any],
):
for fk, fv in cls.__lam_schema__["features"].items():
for k, v in fv.__lam_schema__.items():
v.update_value(fakecls_exports[fk].__getattr__(k))
else:
raise InvalidFieldValue(
f"Invalid feature definition stored for '{fname}': {fdef!r}"
)
@classmethod
def finalize_class(
mcs: type["_MetaElement"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
stack_exts: dict[str, Any],
):
cls.__lam_schema__["features"] = frozendict(cls.__lam_schema__["features"])
def apply_overrides(cls, obj, extensions, *args, **kwargs):
super().finalize_class(cls, name, bases, namespace, stack_exts)
if not cls.__lam_class_mutable__:
for feat in cls.__lam_schema__["features"].values():
feat.freeze_class(True)
def populate_instance(cls: Type, obj: Any, stack_exts: dict[str, Any], *args: Any, **kw: Any):
super().populate_instance(obj, stack_exts, *args, **kw)
obj.__lam_schema__["features"] = dict(cls.__lam_schema__["features"])
def apply_overrides(cls, obj, stack_exts, *args, **kwargs):
"""
Support for runtime field and feature overrides.
@@ -171,15 +180,17 @@ class _MetaAppliance(_MetaElement):
# --- feature overrides ---
for k, v in list(kwargs.items()):
if k in cls.__lam_schema__.get("features", {}):
base_feat_cls = cls.__lam_schema__["features"][k]
if k in obj.__lam_schema__["features"]:
base_feat_cls = obj.__lam_schema__["features"][k]
# Case 1: subclass replacement (inheritance)
if isinstance(v, type) and issubclass(v, base_feat_cls):
v.check_appliance_compatibility(cls)
# record subclass into instance schema
obj.__lam_schema__["features"][k] = v
obj.__lam_schema__["features"][k] = get_mutable_variant(v)
if not obj.__lam_class_mutable__:
obj.__lam_schema__["features"][k].freeze_class(True)
kwargs.pop(k)
# Case 2: dict override
@@ -189,15 +200,43 @@ class _MetaAppliance(_MetaElement):
kwargs.pop(k)
else:
raise InvalidFieldValue(
f"Feature override for '{k}' must be a Feature subclass or dict, got {type(v)}"
)
raise InvalidFieldValue(f"Feature override for '{k}' must be a Feature subclass or dict, got {type(v)}")
# --- new features not declared at class level ---
for k, v in list(kwargs.items()):
if isinstance(v, type) and issubclass(v, Feature):
v.check_appliance_compatibility(cls)
obj.__lam_schema__["features"][k] = v
obj.__lam_schema__["features"][k] = get_mutable_variant(v)
if not obj.__lam_class_mutable__:
obj.__lam_schema__["features"][k].freeze_class(True)
kwargs.pop(k)
super().apply_overrides(obj, extensions, *args, **kwargs)
super().apply_overrides(obj, stack_exts, *args, **kwargs)
def finalize_instance(cls: Type, obj, stack_exts: dict[str, Any]):
"""
Instantiate and attach all features declared (or overridden) in the instance schema.
Handles:
- Declared features (plain class)
- Subclass replacements
- Dict overrides (class + patch dict)
"""
for k, v in obj.__lam_schema__["features"].items():
# Case 1: plain class or subclass
if isinstance(v, type) and issubclass(v, Feature):
inst = v()
object.__setattr__(obj, k, inst)
# Case 2: (class, dict) → dict overrides
elif isinstance(v, tuple) and len(v) == 2:
feat_cls, overrides = v
inst = feat_cls(**overrides)
object.__setattr__(obj, k, inst)
obj.__lam_schema__["features"][k] = feat_cls
else:
raise InvalidFieldValue(f"Invalid feature definition stored for '{k}': {fdef!r}")
obj.__lam_schema__["features"] = frozendict(obj.__lam_schema__["features"])
super().finalize_instance(obj, stack_exts)

View File

@@ -1,16 +1,18 @@
from typing import Optional, TypeVar, get_origin, get_args, Dict, Any, Callable, Type, Union
from typing import Optional, TypeVar, get_origin, get_args, Any, Type, Union, Dict, Annotated
from types import FunctionType, UnionType
from types import FunctionType, UnionType, new_class
from copy import deepcopy, copy
import sys
import weakref
import inspect, ast, textwrap
from typeguard import check_type, TypeCheckError, CollectionCheckStrategy
from ..tools import _resolve_annotation, _peel_annotated
from ..LAMFields.LAMField import LAMField
from ..LAMFields.LAMFieldInfo import LAMFieldInfo
from ..LAMFields.FrozenLAMField import FrozenLAMField
from typeguard import check_type, TypeCheckError, CollectionCheckStrategy
from frozendict import frozendict
from ..tools import _resolve_annotation
from ..lam_field.lam_field import LAMFieldFactory, LAMField, LAMField_Element
from ..lam_field.lam_field_info import LAMFieldInfo
from ..defines import ALLOWED_HELPERS_MATH, ALLOWED_HELPERS_DEFAULT, ALLOWED_MODEL_FIELDS_TYPES
from ..base_element import BaseElement
@@ -29,64 +31,203 @@ from ..exception import (
InvalidInitializerType,
IncompletelyAnnotatedField,
UnsupportedFieldType,
WrongUsage,
)
class IElement(BaseElement): ...
# Cache per base class -> mutable variant
_MUTABLE_VARIANTS = weakref.WeakKeyDictionary()
class IFeature(BaseElement): ...
def _merge_options(base_opts: tuple, *adds) -> tuple:
merged = list(base_opts)
for a in adds:
if a not in merged:
merged.append(a)
return tuple(merged)
class IAppliance(BaseElement): ...
class IMutableVariant:
pass
def _check_annotation_definition( # pylint: disable=too-complex,too-many-return-statements
_type,
):
# print(f"_type={_type}")
_type = _peel_annotated(_type)
def get_mutable_variant(base: Type[BaseElement]) -> Type[BaseElement]:
"""
Return a subclass of `base` that behaves the same except that instances
are created object-mutable (because the class was defined with options=(ObjectMutable,)).
"""
print(f"____ Walking through: {base}")
print(f"bases: {base.__bases__}")
if base in (IElement, IFeature, IAppliance):
return base
if base.mutable_obj or issubclass(base, IMutableVariant):
print("already mutable")
return base # already mutable
cached = _MUTABLE_VARIANTS.get(base)
if cached:
print("cached")
return cached
meta = type(base) # keep the same metaclass
base_opts = getattr(base, "__lam_options__", ())
new_opts = _merge_options(base_opts, ObjectMutable, ClassMutable, _MutableClone)
# Recursively lift each direct base
lifted_bases = []
root_base: Optional[type[BaseElement]] = None
for b in base.__bases__:
if b in (IElement, IFeature, IAppliance):
if root_base:
raise BrokenInheritance(f"Multiple exclusive root bases (previous {root_base}, now {b}")
else:
root_base = b
elif b is BaseElement:
raise BrokenInheritance("BaseElement must not be used")
elif issubclass(b, IMutableVariant):
lifted_bases.append(b)
elif issubclass(b, BaseElement) and b is not BaseElement:
lifted_bases.append(get_mutable_variant(b))
print(f"lifted_bases: {lifted_bases}")
# Keep original behavior (inherit from C), AND attach lifted base variants
if len(lifted_bases) == 0:
if root_base is None:
raise BrokenInheritance("inheritance root not found")
bases = (root_base,)
else:
bases = tuple(lifted_bases) + (base,) # ensures B' is subclass of B and A'
if IMutableVariant not in bases:
bases = bases + (IMutableVariant,)
name = f"{base.__name__}__Mutable"
def body(ns: dict[str, Any]) -> None:
ns["__module__"] = base.__module__
ns["__annotations__"] = base.__annotations__
# ns["__lam_class_mutable__"] = True
# ns["__lam_object_mutable__"] = True
ns["__qualname__"] = f"{base.__qualname__}__Mutable"
ns["__doc__"] = f"Mutable runtime variant of {base.__qualname__}"
for fname, fval in base.__lam_schema__.items():
if isinstance(fval, LAMField):
# v = getattr(base, fname)
ns[fname] = fval.clone_unfrozen().value
# IMPORTANT: pass options via kwds so your meta receives them
print(f"CREATING {name}")
variant = new_class(
name,
bases,
{"metaclass": meta, "options": new_opts},
body,
)
# Optional: register for pickling/import
sys.modules[base.__module__].__dict__[name] = variant
_MUTABLE_VARIANTS[base] = variant
return variant
class IBaseElement(BaseElement):
def clone_as_mutable_variant(self, *, deep: bool = True, _memo: Dict[int, BaseElement] | None = None) -> BaseElement:
if isinstance(self, type):
raise WrongUsage("clone_as_mutable_variant can only be applied to an instance")
if self.mutable_obj:
return self # already mutable
if _memo is None:
_memo = {}
sid = id(self)
if sid in _memo:
return _memo[sid]
dst_cls = get_mutable_variant(type(self))
# Create a fresh instance using the normal constructor path
dst = dst_cls() # your meta will populate defaults; we'll overwrite below
_memo[sid] = dst
for fname, fval in self.__lam_schema__.items():
if isinstance(fval, LAMField):
v = getattr(self, fname)
if isinstance(v, BaseElement):
if deep:
v = v.clone_as_mutable_variant(deep=True, _memo=_memo)
setattr(dst, fname, v)
else:
setattr(dst, fname, fval.clone_unfrozen().value)
# dst.__dict__[fname] = v
return dst
class IElement(IBaseElement): ...
class IFeature(IBaseElement): ...
class IAppliance(IBaseElement): ...
def _check_annotation_definition(_type, first_layer: bool = True): # pylint: disable=too-complex,too-many-return-statements
print(f"_type={_type}, {first_layer}")
_origin = get_origin(_type) or _type
_args = get_args(_type)
# handle Optional[] and Union[None,...]
if (_origin is Union or _origin is UnionType) and type(None) in _args:
return all(_check_annotation_definition(_) for _ in get_args(_type) if _ is not type(None))
# handle Annotated[,]
if _origin is Annotated:
if not first_layer:
raise UnsupportedFieldType("Annotated[] is only supported as parent annotation")
return _check_annotation_definition(_args[0], False)
# handle other Union[...]
# handle Optional[] and Union[None,...]
if _origin is Union or _origin is UnionType:
return all(_check_annotation_definition(_) for _ in _args)
if (len(_args) != 2) or (type(None) not in list(_args)) or (not first_layer):
raise UnsupportedFieldType(
"Union[] is only supported to implement Optional (takes 2 parameters) and is only supported as parent annotation"
)
return any(_check_annotation_definition(_, False) for _ in get_args(_type) if _ is not type(None))
# handle Dict[...]
if _origin is dict:
if len(_args) != 2:
raise IncompletelyAnnotatedField(
f"Dict Annotation requires 2 inner definitions: {_type}"
)
if not _peel_annotated(_args[0]) in ALLOWED_MODEL_FIELDS_TYPES:
raise IncompletelyAnnotatedField(f"Dict Annotation requires 2 inner definitions: {_type}")
if not _args[0] in ALLOWED_MODEL_FIELDS_TYPES:
raise IncompletelyAnnotatedField(f"Dict Key must be simple builtin: {_type}")
return _check_annotation_definition(_args[1])
# return _check_annotation_definition(_args[1], False)
return any(_check_annotation_definition(_, False) for _ in _args)
# handle Tuple[]
if _origin is tuple:
if len(_args) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
if len(_args) == 2 and _args[1] is Ellipsis:
return _check_annotation_definition(_args[0])
return all(_check_annotation_definition(_) for _ in _args)
return _check_annotation_definition(_args[0], False)
return any(_check_annotation_definition(_, False) for _ in _args)
# handle Set[],Tuple[],FrozenSet[],List[]
if _origin in [set, frozenset, tuple, list]:
if len(_args) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
return all(_check_annotation_definition(_) for _ in _args)
return any(_check_annotation_definition(_, False) for _ in _args)
if isinstance(_origin, type) and issubclass(_origin, IElement):
return
if isinstance(_origin, type):
if issubclass(_origin, IElement):
return
elif issubclass(_origin, IAppliance):
raise UnsupportedFieldType(f"Nested Appliance are not supported: {_type}")
if _type in ALLOWED_MODEL_FIELDS_TYPES:
if _origin in ALLOWED_MODEL_FIELDS_TYPES:
return
raise UnsupportedFieldType(_type)
raise UnsupportedFieldType(_origin)
def _check_initializer_safety(func) -> None:
@@ -111,11 +252,7 @@ def _check_initializer_safety(func) -> None:
# Find the FunctionDef node that corresponds to this initializer
init_node = next(
(
n
for n in mod.body
if isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef)) and n.name == func.__name__
),
(n for n in mod.body if isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef)) and n.name == func.__name__),
None,
)
if init_node is None:
@@ -140,9 +277,7 @@ def _check_initializer_safety(func) -> None:
continue
if isinstance(val, (int, str, float, bool, type(None))):
continue
raise FunctionForbidden(
f"Closures are forbidden in __initializer__ (captured: {name}={val!r})"
)
raise FunctionForbidden(f"Closures are forbidden in __initializer__ (captured: {name}={val!r})")
def _blocked_import(*args, **kwargs):
@@ -183,7 +318,7 @@ class ModelSpecView:
def __getattr__(self, name: str) -> Any:
"""internal proxy getattr"""
if name not in self._types:
raise AttributeError(f"Unknown field {name}")
raise NonExistingField(f"Unknown field {name}")
return self._vals[name]
def __setattr__(self, name: str, value: Any):
@@ -212,25 +347,58 @@ T_Meta = TypeVar("T_Meta", bound="_MetaElement")
T_BE = TypeVar("T_BE", bound="BaseElement")
class ElementOption:
class ElementOptions:
pass
class ClassMutable:
class ClassMutable(ElementOptions):
pass
class ObjectMutable:
class ObjectMutable(ElementOptions):
pass
class _MutableClone(ElementOptions):
pass
class _MetaElement(type):
"""metaclass to use to build BaseElement"""
modified_fields: Dict[str, Any] = {}
new_fields: Dict[str, LAMField[Any]] = {}
initializer: Optional[Callable[..., Any]] = None
__lam_schema__: dict[str, Any] = {}
def __new__(
mcs: type["_MetaElement"],
name: str,
bases: tuple[type[BaseElement], ...],
namespace: dict[str, Any],
**kwargs,
) -> Type:
"""BaseElement new class"""
# create a dict to pass contextual arg onto the stack (multithread safe class init)
stack_exts: dict[str, Any] = {}
stack_exts["kwargs"] = kwargs
# retrieve options and normalize it (always a tuple)
if "options" not in stack_exts["kwargs"]:
stack_exts["kwargs"]["options"] = ()
elif not isinstance(stack_exts["kwargs"]["options"], tuple):
stack_exts["kwargs"]["options"] = (stack_exts["kwargs"]["options"],)
else:
stack_exts["kwargs"]["options"] = stack_exts["kwargs"]["options"]
# main class creation pipeline
mcs.check_class(name, bases, namespace, stack_exts)
mcs.process_class_fields(name, bases, namespace, stack_exts)
_cls = super().__new__(mcs, name, bases, namespace)
mcs.commit_fields(_cls, name, bases, namespace, stack_exts)
mcs.apply_initializer(_cls, name, bases, namespace, stack_exts)
mcs.finalize_class(_cls, name, bases, namespace, stack_exts)
if not _cls.__lam_class_mutable__:
_cls.freeze_class(True)
_cls.__lam_initialized__ = True
return _cls
@classmethod
def check_class(
@@ -238,7 +406,7 @@ class _MetaElement(type):
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
stack_exts: dict[str, Any],
) -> None:
"""
Early class-build hook.
@@ -249,27 +417,33 @@ class _MetaElement(type):
This runs before the class object is created.
"""
print(f"__NEW__ Defining {name}, len(bases) {len(bases)}, with keys { list(namespace)}")
print(f"__NEW__ Defining {name}, bases {bases}")
if len(bases) > 1:
raise MultipleInheritanceForbidden("Multiple inheritance is not supported by dabmodel")
if _MutableClone not in stack_exts["kwargs"]["options"]:
raise MultipleInheritanceForbidden(f"Multiple inheritance is not supported by dabmodel: {bases}")
if len(bases) == 0:
raise BrokenInheritance("wrong base class (missing base class")
raise BrokenInheritance(f"missing base class")
if not issubclass(bases[0], BaseElement):
raise BrokenInheritance("wrong base class")
if not any([issubclass(base, BaseElement) for base in bases]):
raise BrokenInheritance(f"wrong base class: {bases}")
mcs.inherit_schema(name, bases[0], namespace, extensions)
# handle schema/fields inheritance
mcs.inherit_schema(name, bases[0], namespace, stack_exts)
# force field without default value to be instantiated (with None)
# force annotated fields without value to be still instantiated (with None)
if "__annotations__" in namespace:
for _funknown in [_ for _ in namespace["__annotations__"] if _ not in namespace.keys()]:
namespace[_funknown] = None
for k_unknown in [_ for _ in namespace["__annotations__"] if _ not in namespace]:
namespace[k_unknown] = None
namespace["__lam_initialized__"] = False
namespace["__lam_class_mutable__"] = ClassMutable in extensions["kwargs"]["options"]
namespace["__lam_object_mutable__"] = ObjectMutable in extensions["kwargs"]["options"]
# process options
namespace["__lam_class_mutable__"] = ClassMutable in stack_exts["kwargs"]["options"]
namespace["__lam_object_mutable__"] = ObjectMutable in stack_exts["kwargs"]["options"]
namespace["__lam_options__"] = stack_exts["kwargs"]["options"]
@classmethod
def inherit_schema( # pylint: disable=too-complex,too-many-branches
@@ -277,9 +451,16 @@ class _MetaElement(type):
name: str,
base: type[Any],
namespace: dict[str, Any],
extensions: dict[str, Any],
stack_exts: dict[str, Any],
):
namespace["__lam_schema__"] = copy(base.__lam_schema__)
# create a new schema instance
namespace["__lam_schema__"] = {}
# copy elements from the parent class
namespace["__lam_schema__"].update(base.__lam_schema__)
# clone element (unfrozen)
for k, v in namespace["__lam_schema__"].items():
if isinstance(v, LAMField):
namespace["__lam_schema__"][k] = namespace["__lam_schema__"][k].clone_unfrozen()
@classmethod
def process_class_fields( # pylint: disable=too-complex,too-many-branches
@@ -287,7 +468,7 @@ class _MetaElement(type):
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
extensions: dict[str, Any],
stack_exts: dict[str, Any],
):
"""
Scan the class namespace and partition fields.
@@ -299,43 +480,53 @@ class _MetaElement(type):
Validates annotations and types and removes processed items from `namespace`
so they won't become normal attributes. Results are staged into:
mcs.new_fields, mcs.modified_fields, mcs.initializer
stack_exts["new_fields"], stack_exts["modified_fields"], stack_exts["initializer"]
to be committed later.
"""
# iterating new and modified fields
mcs.modified_fields = {}
mcs.new_fields = {}
mcs.initializer = None
# Fields Factory
stack_exts["modified_fields"] = {}
stack_exts["new_fields"] = {}
stack_exts["initializer"] = None
initializer_name: Optional[str] = None
for _fname, _fvalue in namespace.items():
if _fname == f"_{name}__initializer" or (
name.startswith("_") and _fname == "__initializer"
):
if not isinstance(_fvalue, classmethod):
raise InvalidInitializerType("__initializer should be a classmethod")
mcs.initializer = _fvalue.__func__
for k, v in namespace.items():
print(f" {name} Processing Field: {k} / {v}")
# handling initializer method
if k == f"_{name}__initializer" or (name.startswith("_") and k == "__initializer"):
if not isinstance(v, classmethod):
raise InvalidInitializerType("__initializer must be a classmethod")
stack_exts["initializer"] = v.__func__
if name.startswith("_"):
initializer_name = "__initializer"
else:
initializer_name = f"_{name}__initializer"
elif _fname.startswith("_"):
# skipping protected/private/dunder methods and attributes
elif k.startswith("_"):
pass
elif isinstance(_fvalue, classmethod):
# skipping classmethods
elif isinstance(v, classmethod):
pass
elif isinstance(_fvalue, FunctionType):
# skipping methods
elif isinstance(v, FunctionType):
pass
# disallowing nested appliances
elif isinstance(v, IAppliance) or (isinstance(v, type) and issubclass(v, IAppliance)):
raise UnsupportedFieldType(f"Nested Appliance are not supported: {name}:{v}")
# supported Fields
else:
print(f"Parsing Field: {_fname} / {_fvalue}")
if _fname in namespace["__lam_schema__"].keys(): # Modified fields
mcs.process_modified_field(name, bases, namespace, _fname, _fvalue, extensions)
else: # New fieds
mcs.process_new_field(name, bases, namespace, _fname, _fvalue, extensions)
# removing modified fields from class (will add them back later)
for _fname in mcs.new_fields:
del namespace[_fname]
for _fname in mcs.modified_fields:
del namespace[_fname]
if mcs.initializer is not None and initializer_name is not None:
print(f"Staging Field: {k} / {v}")
# Modified fields (already in parent's schema as LAMField)
if k in namespace["__lam_schema__"] and isinstance(namespace["__lam_schema__"][k], LAMField):
mcs.process_modified_field(name, bases, namespace, k, v, stack_exts)
# New fields (others)
else:
mcs.process_new_field(name, bases, namespace, k, v, stack_exts)
# removing processed Fields and initializer from namespace (will add them back in the class later)
for k in stack_exts["new_fields"]:
del namespace[k]
for k in stack_exts["modified_fields"]:
del namespace[k]
if stack_exts["initializer"] is not None and initializer_name is not None:
del namespace[initializer_name]
@classmethod
@@ -346,20 +537,21 @@ class _MetaElement(type):
namespace: dict[str, Any],
_fname: str,
_fvalue: Any,
extensions: dict[str, Any],
stack_exts: dict[str, Any],
): # pylint: disable=unused-argument
"""
Handle a *modified* field declared by a subclass.
Forbids annotation changes, validates the new default value against
the inherited annotation, and stages the new default into `mcs.modified_fields`.
the inherited annotation, and stages the new default into `stack_exts["modified_fields"]`.
"""
# forbid already existing Field's schema from being modified
if "__annotations__" in namespace and _fname in namespace["__annotations__"]:
raise ReadOnlyFieldAnnotation(
f"annotations cannot be modified on derived classes {_fname}"
)
raise ReadOnlyFieldAnnotation(f"annotations cannot be modified on derived classes {_fname}")
# validate the new value
namespace["__lam_schema__"][_fname].validate(_fvalue)
mcs.modified_fields[_fname] = _fvalue
# stage the new value
stack_exts["modified_fields"][_fname] = _fvalue
@classmethod
def process_new_field(
@@ -369,56 +561,98 @@ class _MetaElement(type):
namespace: dict[str, Any],
_fname: str,
_fvalue: Any,
extensions: dict[str, Any],
stack_exts: dict[str, Any],
): # pylint: disable=unused-argument
"""
Handle a *new* field declared on the class.
Resolves string annotations against a whitelist, validates `Annotated[...]`
payloads (allowing only LAMFieldInfo), checks the default value type,
and stages the field as a `LAMField` in `mcs.new_fields`.
and stages the field as a `LAMField` in `stack_exts["new_fields"]`.
"""
# print(f"New field: {_fname}")
print(f"New field: {_fname} / {_fvalue}")
# check if field is annotated
# forbid non annotated field
if "__annotations__" not in namespace or _fname not in namespace["__annotations__"]:
raise NotAnnotatedField(f"Every dabmodel Fields must be annotated ({_fname})")
# check if annotation is allowed
# check if annotations' format is correct and save them
if isinstance(namespace["__annotations__"][_fname], str):
namespace["__annotations__"][_fname] = _resolve_annotation(
namespace["__annotations__"][_fname]
)
namespace["__annotations__"][_fname] = _resolve_annotation(namespace["__annotations__"][_fname])
# effectively checking the value is conform to annotations
try:
_check_annotation_definition(namespace["__annotations__"][_fname])
except InvalidFieldAnnotation:
raise
except Exception as ex:
raise InvalidFieldAnnotation(
f"Field <{_fname}> has not an allowed or valid annotation."
) from ex
raise InvalidFieldAnnotation(f"Field <{_fname}> has not an allowed or valid annotation.") from ex
# extracting LAMFieldInfo from Annotated[] annotations
_finfo: LAMFieldInfo = LAMFieldInfo()
origin = get_origin(namespace["__annotations__"][_fname])
tname = (
getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
)
tname = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
if "Annotated" in tname:
args = get_args(namespace["__annotations__"][_fname])
if args:
if len(args) > 2:
raise InvalidFieldAnnotation(f"Field <{_fname}> had invalid Annotated value.")
if len(args) == 2 and not isinstance(args[1], LAMFieldInfo):
raise InvalidFieldAnnotation(
"Only LAMFieldInfo object is allowed as Annotated data."
)
if len(args) == 2 and not issubclass(type(args[1]), LAMFieldInfo):
raise InvalidFieldAnnotation("Only LAMFieldInfo object is allowed as Annotated data.")
_finfo = args[1]
# stage the new field
stack_exts["new_fields"][_fname] = LAMFieldFactory.create_field(_fname, _fvalue, namespace["__annotations__"][_fname], _finfo)
mcs.new_fields[_fname] = LAMField(
_fname, _fvalue, namespace["__annotations__"][_fname], _finfo
)
@classmethod
def commit_fields(
mcs: type["_MetaElement"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
stack_exts: dict[str, Any],
):
"""
Commit staged fields into the class schema (`__lam_schema__`).
- For modified fields: copy the parent's LAMField, update its value.
- For new fields: set the freshly built LAMField and record its source.
"""
# updating values of modified fields
for k, v in stack_exts["modified_fields"].items():
cls.__lam_schema__[k].update_value(v)
# registering new fields
for k, v in stack_exts["new_fields"].items():
v.add_source(cls)
cls.__lam_schema__[k] = v
@classmethod
def prepare_initializer_fields(
mcs: type["_MetaElement"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
init_fieldvalues: dict[str, Any],
init_fieldtypes: dict[str, Any],
stack_exts: dict[str, Any],
):
pass
@classmethod
def commit_initializer_fields(
mcs: type["_MetaElement"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
fakecls_exports: dict[str, Any],
stack_exts: dict[str, Any],
):
pass
@classmethod
def apply_initializer(
@@ -427,7 +661,7 @@ class _MetaElement(type):
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
stack_exts: dict[str, Any],
):
"""
Apply the optional `__initializer` classmethod to compute derived defaults.
@@ -438,172 +672,61 @@ class _MetaElement(type):
class schema's DABFields.
"""
if mcs.initializer is not None:
_check_initializer_safety(mcs.initializer)
if stack_exts["initializer"] is not None:
# checking initializer function sanity
_check_initializer_safety(stack_exts["initializer"])
# preparing initializer context values
init_fieldvalues = {}
init_fieldtypes = {}
for _fname, _fvalue in cls.__lam_schema__.items():
if isinstance(_fvalue, LAMField):
init_fieldvalues[_fname] = deepcopy(_fvalue.raw_value)
init_fieldtypes[_fname] = _fvalue.annotations
for k, v in cls.__lam_schema__.items():
if isinstance(v, LAMField):
# clone = v.clone_unfrozen().value
clone = deepcopy(v.value)
init_fieldvalues[k] = clone
init_fieldtypes[k] = v.annotations
mcs.prepare_initializer_fields(cls, name, bases, namespace, init_fieldvalues, init_fieldtypes, stack_exts)
# creating a fake class container to hold the context
fakecls = ModelSpecView(init_fieldvalues, init_fieldtypes, cls.__name__, cls.__module__)
# fakecls = cls
# preparing a fake and safe environment
safe_globals = {
"__builtins__": {"__import__": _blocked_import},
**ALLOWED_HELPERS_DEFAULT,
}
# if mcs.initializer.__code__.co_freevars:
# if stack_exts["initializer"].__code__.co_freevars:
# raise FunctionForbidden("__initializer must not use closures")
# creating the fake call
safe_initializer = FunctionType(
mcs.initializer.__code__,
stack_exts["initializer"].__code__,
safe_globals,
name=mcs.initializer.__name__,
argdefs=mcs.initializer.__defaults__,
closure=mcs.initializer.__closure__,
name=stack_exts["initializer"].__name__,
argdefs=stack_exts["initializer"].__defaults__,
closure=stack_exts["initializer"].__closure__,
)
# calling initializer
safe_initializer(fakecls) # pylint: disable=not-callable
for _fname, _fvalue in fakecls.export().items():
field = cls.__lam_schema__[_fname]
field.validate(_fvalue)
cls.__lam_schema__[_fname] = LAMField(
_fname, _fvalue, field.annotations, field.info
)
def __new__(
mcs: type["_MetaElement"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
**kwargs,
) -> Type:
"""BaseElement new class"""
extensions: dict[str, Any] = {}
extensions["kwargs"] = kwargs
if "options" not in extensions["kwargs"]:
extensions["kwargs"]["options"] = ()
elif extensions["kwargs"]["options"] is not tuple:
extensions["kwargs"]["options"] = (extensions["kwargs"]["options"],)
mcs.check_class(name, bases, namespace, extensions)
mcs.process_class_fields(name, bases, namespace, extensions)
_cls = super().__new__(mcs, name, bases, namespace)
mcs.commit_fields(_cls, name, bases, namespace, extensions)
mcs.apply_initializer(_cls, name, bases, namespace, extensions)
mcs.finalize_class(_cls, name, bases, namespace, extensions)
if not _cls.__lam_class_mutable__:
_cls.freeze_class(True)
_cls.__lam_initialized__ = True
return _cls
# copying values back to the class
fakecls_exports = fakecls.export()
for k, v in cls.__lam_schema__.items():
if isinstance(v, LAMField):
cls.__lam_schema__[k].update_value(fakecls_exports[k])
mcs.commit_initializer_fields(cls, name, bases, namespace, fakecls_exports, stack_exts)
@classmethod
def commit_fields(
def finalize_class(
mcs: type["_MetaElement"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
stack_exts: dict[str, Any],
):
"""
Commit staged fields into the class schema (`__lam_schema__`).
- For modified fields: copy the parent's LAMField, update its value.
- For new fields: set the freshly built LAMField and record its source.
"""
for _fname, _fvalue in mcs.modified_fields.items():
cls.__lam_schema__[_fname] = deepcopy(bases[0].__lam_schema__[_fname])
cls.__lam_schema__[_fname].update_value(_fvalue)
for _fname, _fvalue in mcs.new_fields.items():
_fvalue.add_source(cls)
cls.__lam_schema__[_fname] = _fvalue
def __call__(cls: Type, *args: Any, **kw: Any): # intentionally untyped
"""BaseElement new instance"""
obj = super().__call__(*args)
extensions: dict[str, Any] = {}
cls.populate_instance(
obj, extensions, *args, **kw
) # pylint: disable=no-value-for-parameter
cls.freeze_instance_schema(
obj, extensions, *args, **kw
) # pylint: disable=no-value-for-parameter
cls.apply_overrides(obj, extensions, *args, **kw) # pylint: disable=no-value-for-parameter
cls.finalize_instance(obj, extensions) # pylint: disable=no-value-for-parameter
if not cls.__lam_object_mutable__:
obj.freeze(True)
return obj
def populate_instance(cls: Type, obj: Any, extensions: dict[str, Any], *args: Any, **kw: Any):
"""
Populate the new instance with field values from the class schema.
Copies each LAMField.value to an instance attribute (deep-frozen view).
"""
for _fname, _fvalue in cls.__lam_schema__.items():
if isinstance(_fvalue, LAMField):
object.__setattr__(obj, _fname, _fvalue.raw_value)
def freeze_instance_schema(
cls: Type, obj: Any, extensions: dict[str, Any], *args: Any, **kw: Any
):
"""
Freeze the instance's schema by wrapping DABFields into FrozenLAMField.
Creates a per-instance `__lam_schema__` dict where each field is read-only.
"""
inst_schema = copy(obj.__lam_schema__)
for _fname, _fvalue in cls.__lam_schema__.items():
if isinstance(_fvalue, LAMField):
inst_schema[_fname] = FrozenLAMField(_fvalue)
if "features" in inst_schema:
inst_schema["features"] = dict(inst_schema["features"])
object.__setattr__(obj, "__lam_schema__", inst_schema)
def apply_overrides(cls, obj, extensions, *args, **kwargs):
"""
Hook for runtime overrides at instance creation.
Invoked after the schema has been frozen but before finalize_instance.
Subclasses of _MetaElement can override this to support things like:
- Field overrides: MyApp(field=value)
"""
# --- field overrides (unchanged) ---
for k, v in list(kwargs.items()):
if k in cls.__lam_schema__: # regular field
field = cls.__lam_schema__[k]
field.validate(v)
object.__setattr__(obj, k, v)
obj.__lam_schema__[k] = FrozenLAMField(
LAMField(k, v, field.annotations, field.info)
)
kwargs.pop(k)
if kwargs:
unknown = ", ".join(sorted(kwargs.keys()))
raise InvalidFieldValue(f"Unknown parameters: {unknown}")
def finalize_instance(cls: Type, obj: Any, extensions: dict[str, Any]):
"""
Finalization hook invoked at the end of instance construction.
Subclasses of the metaclass override this to attach runtime components
to the instance. (Example: BaseMetaAppliance instantiates bound Features
and sets them as attributes on the appliance instance.)
"""
# freezing the field schema
cls.__lam_schema__ = frozendict(cls.__lam_schema__)
def __setattr__(cls, name: str, value: Any):
if not hasattr(cls, "__lam_initialized__") or not getattr(cls, "__lam_initialized__"):
@@ -616,31 +739,106 @@ class _MetaElement(type):
raise NonExistingField(f"Can't create new class attributes: {name}")
if not cls.__lam_class_mutable__:
raise ReadOnlyField(f"Class Field <{name}> is read only.")
raise ReadOnlyField(f"Class is immutable.")
field = cls.__lam_schema__[name]
field.validate(value)
cls.__lam_schema__[name] = LAMField(name, value, field.annotations, field.info)
cls.__lam_schema__[name].update_value(value)
return
def __getattr__(cls, name) -> Any:
if (
hasattr(cls, "__lam_initialized__")
and getattr(cls, "__lam_initialized__")
and name in cls.__lam_schema__
):
if cls.__lam_class_mutable__:
return cls.__lam_schema__[name].raw_value
if hasattr(cls, "__lam_initialized__") and getattr(cls, "__lam_initialized__") and name in cls.__lam_schema__:
return cls.__lam_schema__[name].value
raise NonExistingField(f"Non existing class attribute: {name}")
@classmethod
def finalize_class(
mcs: type["_MetaElement"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
):
pass
def __call__(cls: Type, *args: Any, **kw: Any): # intentionally untyped
"""BaseElement new instance"""
cls.validate_schema_class()
obj = super().__call__(*args)
# create a dict to pass contextual arg onto the stack (multithread safe class init)
stack_exts: dict[str, Any] = {}
# create instance attributes from schema
cls.populate_instance(obj, stack_exts, *args, **kw)
cls.apply_overrides(obj, stack_exts, *args, **kw)
cls.finalize_instance(obj, stack_exts)
return obj
def populate_instance(cls: Type, obj: Any, stack_exts: dict[str, Any], *args: Any, **kw: Any):
"""
Populate the new instance with field values from the class schema.
Copies each LAMField.value to an instance attribute (deep-frozen view).
"""
obj.__lam_schema__ = dict(cls.__lam_schema__)
for k, v in obj.__lam_schema__.items():
if isinstance(v, LAMField):
unfrozen_clone = v.clone_unfrozen()
obj.__lam_schema__[k] = unfrozen_clone
if obj.__lam_object_mutable__:
object.__setattr__(obj, k, unfrozen_clone.value)
else:
object.__setattr__(obj, k, unfrozen_clone.frozen_value)
def apply_overrides(cls, obj, stack_exts, *args, **kwargs):
"""
Hook for runtime overrides at instance creation.
Invoked after the schema has been frozen but before finalize_instance.
Subclasses of _MetaElement can override this to support things like:
- Field overrides: MyApp(field=value)
"""
# --- field overrides (unchanged) ---
print("!!!???????")
for k, v in list(kwargs.items()):
if k in obj.__lam_schema__: # regular field
print(f"??????? {k} {v}")
print(obj.__lam_schema__[k])
if isinstance(obj.__lam_schema__[k], LAMField_Element):
valid_val = True
try:
obj.__lam_schema__[k].validate(v)
except InvalidFieldValue:
valid_val = False
if valid_val:
obj.__lam_schema__[k].update_value(v)
if not obj.__lam_object_mutable__:
object.__setattr__(obj, k, obj.__lam_schema__[k].frozen_value)
else:
object.__setattr__(obj, k, obj.__lam_schema__[k].value)
elif isinstance(v, dict):
raise RuntimeError("initializing Elem with dict is not supported yet")
else:
raise InvalidFieldValue(f"Element override for '{k}' must be a Feature subclass or dict, got {type(v)}")
else:
obj.__lam_schema__[k].update_value(v)
if not obj.__lam_object_mutable__:
object.__setattr__(obj, k, obj.__lam_schema__[k].frozen_value)
else:
object.__setattr__(obj, k, obj.__lam_schema__[k].value)
kwargs.pop(k)
if kwargs:
unknown = ", ".join(sorted(kwargs))
raise InvalidFieldValue(f"Unknown parameters: {unknown}")
def finalize_instance(cls: Type, obj: Any, stack_exts: dict[str, Any]):
"""
Finalization hook invoked at the end of instance construction.
Subclasses of the metaclass override this to attach runtime components
to the instance. (Example: BaseMetaAppliance instantiates bound Features
and sets them as attributes on the appliance instance.)
"""
obj.__lam_schema__ = frozendict(obj.__lam_schema__)
if not obj.__lam_object_mutable__:
for v in obj.__lam_schema__.values():
if isinstance(v, LAMField):
v.freeze()
obj.freeze(True)

View File

@@ -14,12 +14,11 @@ class _MetaFeature(_MetaElement):
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
stack_exts: dict[str, Any],
):
if "appliance" in extensions["kwargs"]:
cls.bind_appliance(extensions["kwargs"]["appliance"])
if "appliance" in stack_exts["kwargs"]:
cls.bind_appliance(stack_exts["kwargs"]["appliance"])
def finalize_instance(cls: Type, obj: Any, extensions: dict[str, Any]):
print(f"2GOT {cls}: {cls.__lam_bound_appliance__}")
def finalize_instance(cls: Type, obj: Any, stack_exts: dict[str, Any]):
cls.check_appliance_bound()
super().finalize_instance(obj, extensions)
super().finalize_instance(obj, stack_exts)

View File

@@ -1,16 +1,36 @@
"""library's internal tools"""
from typing import Any, get_origin, get_args
from collections import ChainMap
from typing import (
Any,
Annotated,
get_origin,
get_args,
Union,
Self,
Optional,
List,
Dict,
Tuple,
Set,
FrozenSet,
Mapping,
Callable,
)
import typing
from dataclasses import dataclass
from types import UnionType, NoneType
import types
from uuid import UUID
from datetime import datetime
import json
import inspect
from frozendict import deepfreeze
from .defines import (
ALLOWED_ANNOTATIONS,
)
from frozendict import deepfreeze, frozendict
from .defines import ALLOWED_ANNOTATIONS, ALLOWED_MODEL_FIELDS_TYPES
from .exception import IncompletelyAnnotatedField, UnsupportedFieldType
class LAMJSONEncoder(json.JSONEncoder):
@@ -39,29 +59,15 @@ def LAMdeepfreeze(value):
def is_data_attribute(name: str, value: any) -> bool:
if name.startswith("__") and name.endswith("__"):
if name.startswith("_"):
return False
if isinstance(value, (staticmethod, classmethod, property)):
return False
if inspect.isfunction(value) or inspect.isclass(value) or inspect.ismethoddescriptor(value):
if inspect.isfunction(value) or inspect.ismethoddescriptor(value):
return False
return True
def _peel_annotated(t: Any) -> Any:
# If you ever allow Annotated[T, ...], peel to T
while True:
origin = get_origin(t)
if origin is None:
return t
name = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
if "Annotated" in name:
args = get_args(t)
t = args[0] if args else t
else:
return t
def _resolve_annotation(ann):
if isinstance(ann, str):
# Safe eval against a **whitelist** only

View File

@@ -0,0 +1,53 @@
# dabmodel (c) by chacha
#
# dabmodel is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
import unittest
from typing import Annotated, Union, Optional
import sys
import subprocess
from os import chdir, environ
from pathlib import Path
print(__name__)
print(__package__)
from src import dabmodel as dm
testdir_path = Path(__file__).parent.resolve()
chdir(testdir_path.parent.resolve())
class AnnotationsWalkerTest(unittest.TestCase):
def setUp(self):
print("\n->", unittest.TestCase.id(self))
def test_validate(self):
ann = dict[int, list[int]] | dict[int, list[int | str]]
val = {1: [2], 2: ["a", [1]]}
res = dm.tools.AnnotationWalker(ann, (dm.tools.HorizontalValidationTrigger(val),))
res.run()
def test_simple(self):
print(isinstance(None, type(None)))
print("\n== From OBJs ==")
res = dm.tools.AnnotationWalker(Annotated[Optional[dict[int, list[str]]], "comment"], (dm.tools.LAMSchemaValidation(),))
print(f"res={res.run()}")
print("\n== From STRING ==")
res = dm.tools.AnnotationWalker('Annotated[Optional[dict[int, list[str]]], "comment"]', (dm.tools.LAMSchemaValidation(),))
print(f"res={res.run()}")
res = dm.tools.AnnotationWalker(Annotated[Optional[dict[int, list[None]]], "comment"], (dm.tools.LAMSchemaValidation(),))
print(f"res={res.run()}")
# ---------- main ----------
if __name__ == "__main__":
unittest.main()

View File

@@ -88,9 +88,7 @@ class ApplianceTest(unittest.TestCase):
self.immutable_vars__test_field(app1, "VarComplex2", complex(8, 6), complex(3, 2))
self.immutable_vars__test_field(app1, "VarBool", True, False)
self.immutable_vars__test_field(app1, "VarBool2", False, True)
self.immutable_vars__test_field(
app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 ")
)
self.immutable_vars__test_field(app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 "))
self.immutable_vars__test_field(
app1,
"VarBytes2",
@@ -158,9 +156,7 @@ class ApplianceTest(unittest.TestCase):
self.immutable_vars__test_field(app1, "VarComplex2", complex(8, 6), complex(3, 2))
self.immutable_vars__test_field(app1, "VarBool", True, False)
self.immutable_vars__test_field(app1, "VarBool2", False, True)
self.immutable_vars__test_field(
app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 ")
)
self.immutable_vars__test_field(app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 "))
self.immutable_vars__test_field(
app1,
"VarBytes2",
@@ -255,7 +251,6 @@ class ApplianceTest(unittest.TestCase):
testVar4: "Set[str]" = {"a", "c"}
testVar5: set[str] = {"a", "b"}
testVar6: "set[str]" = {"a", "b"}
testVar7: set[int | str] = {1, 2, "abcd", "efg"}
app1 = Appliance1()
self.immutable_vars__test_field(app1, "testVar", {1, 2}, {1, 5})
@@ -267,7 +262,6 @@ class ApplianceTest(unittest.TestCase):
self.immutable_vars__test_field(app1, "testVar4", {"a", "c"}, {"h", "e"})
self.immutable_vars__test_field(app1, "testVar5", {"a", "b"}, {"h", "c"})
self.immutable_vars__test_field(app1, "testVar6", {"a", "b"}, {"h", "c"})
self.immutable_vars__test_field(app1, "testVar7", {1, 2, "abcd", "efg"}, {"h", "c"})
# must work
sorted(app1.testVar)
@@ -334,7 +328,6 @@ class ApplianceTest(unittest.TestCase):
testVar4: "frozenset[str]" = frozenset({"a", "c"})
testVar5: FrozenSet[int] = frozenset({1, 2})
testVar6: "FrozenSet[int]" = frozenset({1, 2})
testVar7: FrozenSet[int | str] = frozenset({1, 2, "abcd", "efg"})
app1 = Appliance1()
self.immutable_vars__test_field(app1, "testVar", {1, 2}, {1, 5})
@@ -346,7 +339,6 @@ class ApplianceTest(unittest.TestCase):
self.immutable_vars__test_field(app1, "testVar4", {"a", "c"}, {"h", "e"})
self.immutable_vars__test_field(app1, "testVar5", {1, 2}, {1, 5})
self.immutable_vars__test_field(app1, "testVar6", {1, 2}, {1, 5})
self.immutable_vars__test_field(app1, "testVar7", {1, 2, "abcd", "efg"}, {1, 5})
# must work
sorted(app1.testVar)
@@ -410,7 +402,6 @@ class ApplianceTest(unittest.TestCase):
testVar4: "List[str]" = ["a", "c"]
testVar5: list[str] = ["a", "b"]
testVar6: "list[str]" = ["a", "b"]
testVar7: List[Union[int, str]] = [1, 2, 3, "one", "two", "three"]
app1 = Appliance1()
# Note: lists are converted to tuples
@@ -420,9 +411,6 @@ class ApplianceTest(unittest.TestCase):
self.immutable_vars__test_field(app1, "testVar4", ("a", "c"), ["h", "e"])
self.immutable_vars__test_field(app1, "testVar5", ("a", "b"), ["h", "c"])
self.immutable_vars__test_field(app1, "testVar6", ("a", "b"), ["h", "c"])
self.immutable_vars__test_field(
app1, "testVar7", (1, 2, 3, "one", "two", "three"), ["h", "c"]
)
# must work
sorted(app1.testVar)
@@ -518,7 +506,6 @@ class ApplianceTest(unittest.TestCase):
testVar5: tuple[str, ...] = ("a", "b")
testVar6: "tuple[str,...]" = ("a", "b")
testVar7: Tuple[int, str] = (1, "b")
# testVar7: Tuple[Union[int, str]] = (1, 2, 3, "one", "two", "three")
app1 = Appliance1()
@@ -612,12 +599,8 @@ class ApplianceTest(unittest.TestCase):
self.check_immutable_fields_schema(app1, "VarInt2", 21, 21, int)
self.check_immutable_fields_schema(app1, "VarFloat", 12.1, 12.1, float)
self.check_immutable_fields_schema(app1, "VarFloat2", 21.2, 21.2, float)
self.check_immutable_fields_schema(
app1, "VarComplex", complex(3, 5), complex(3, 5), complex
)
self.check_immutable_fields_schema(
app1, "VarComplex2", complex(8, 6), complex(8, 6), complex
)
self.check_immutable_fields_schema(app1, "VarComplex", complex(3, 5), complex(3, 5), complex)
self.check_immutable_fields_schema(app1, "VarComplex2", complex(8, 6), complex(8, 6), complex)
self.check_immutable_fields_schema(app1, "VarBool", True, True, bool)
self.check_immutable_fields_schema(app1, "VarBool2", False, False, bool)
self.check_immutable_fields_schema(
@@ -656,12 +639,8 @@ class ApplianceTest(unittest.TestCase):
self.assertIn("__lam_schema__", dir(app1))
self.assertIn("__lam_schema__", app1.__dict__)
self.check_immutable_fields_schema(
app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str]
)
self.check_immutable_fields_schema(
app1, "ListStr2", ("val3", "val4"), ("val3", "val4"), list[str]
)
self.check_immutable_fields_schema(app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str])
self.check_immutable_fields_schema(app1, "ListStr2", ("val3", "val4"), ("val3", "val4"), list[str])
self.check_immutable_fields_schema(
app1,
"Dict1",
@@ -678,18 +657,10 @@ class ApplianceTest(unittest.TestCase):
)
self.check_immutable_fields_schema(app1, "Tuple1", ("a", "c"), ("a", "c"), tuple[str, ...])
self.check_immutable_fields_schema(app1, "Tuple2", ("a", "b"), ("a", "b"), tuple[str, ...])
self.check_immutable_fields_schema(
app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int]
)
self.check_immutable_fields_schema(
app1, "FrozenSet2", frozenset({1, 2}), frozenset({1, 2}), frozenset[int]
)
self.check_immutable_fields_schema(
app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int]
)
self.check_immutable_fields_schema(
app1, "Set2", frozenset({1, 2}), frozenset({1, 2}), set[int]
)
self.check_immutable_fields_schema(app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int])
self.check_immutable_fields_schema(app1, "FrozenSet2", frozenset({1, 2}), frozenset({1, 2}), frozenset[int])
self.check_immutable_fields_schema(app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int])
self.check_immutable_fields_schema(app1, "Set2", frozenset({1, 2}), frozenset({1, 2}), set[int])
# same test with Typing types (list -> List ...)
# class can be created
@@ -710,12 +681,8 @@ class ApplianceTest(unittest.TestCase):
self.assertIn("__lam_schema__", dir(app1))
self.assertIn("__lam_schema__", app1.__dict__)
self.check_immutable_fields_schema(
app1, "ListStr", ("val1", "val2"), ("val1", "val2"), List[str]
)
self.check_immutable_fields_schema(
app1, "ListStr2", ("val3", "val4"), ("val3", "val4"), List[str]
)
self.check_immutable_fields_schema(app1, "ListStr", ("val1", "val2"), ("val1", "val2"), List[str])
self.check_immutable_fields_schema(app1, "ListStr2", ("val3", "val4"), ("val3", "val4"), List[str])
self.check_immutable_fields_schema(
app1,
"Dict1",
@@ -732,18 +699,10 @@ class ApplianceTest(unittest.TestCase):
)
self.check_immutable_fields_schema(app1, "Tuple1", ("a", "c"), ("a", "c"), Tuple[str, ...])
self.check_immutable_fields_schema(app1, "Tuple2", ("a", "b"), ("a", "b"), Tuple[str, ...])
self.check_immutable_fields_schema(
app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), FrozenSet[int]
)
self.check_immutable_fields_schema(
app1, "FrozenSet2", frozenset({1, 2}), frozenset({1, 2}), FrozenSet[int]
)
self.check_immutable_fields_schema(
app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), Set[int]
)
self.check_immutable_fields_schema(
app1, "Set2", frozenset({1, 2}), frozenset({1, 2}), Set[int]
)
self.check_immutable_fields_schema(app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), FrozenSet[int])
self.check_immutable_fields_schema(app1, "FrozenSet2", frozenset({1, 2}), frozenset({1, 2}), FrozenSet[int])
self.check_immutable_fields_schema(app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), Set[int])
self.check_immutable_fields_schema(app1, "Set2", frozenset({1, 2}), frozenset({1, 2}), Set[int])
def test_immutable_fields_annotated(self):
"""Testing first appliance level, and Field types (annotated)"""
@@ -785,9 +744,7 @@ class ApplianceTest(unittest.TestCase):
self.assertEqual(app1.__lam_schema__["VarBool"].doc, "foo9")
self.immutable_vars__test_field(app1, "VarBool2", False, True)
self.assertEqual(app1.__lam_schema__["VarBool2"].doc, "foo10")
self.immutable_vars__test_field(
app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 ")
)
self.immutable_vars__test_field(app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 "))
self.assertEqual(app1.__lam_schema__["VarBytes"].doc, "foo11")
self.immutable_vars__test_field(
app1,
@@ -852,9 +809,7 @@ class ApplianceTest(unittest.TestCase):
self.immutable_vars__test_field(app1, "VarComplex2", complex(8, 6), complex(3, 2))
self.immutable_vars__test_field(app1, "VarBool", True, False)
self.immutable_vars__test_field(app1, "VarBool2", False, True)
self.immutable_vars__test_field(
app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 ")
)
self.immutable_vars__test_field(app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 "))
self.immutable_vars__test_field(
app1,
"VarBytes2",
@@ -868,12 +823,8 @@ class ApplianceTest(unittest.TestCase):
self.check_immutable_fields_schema(app1, "VarInt2", 21, 21, int)
self.check_immutable_fields_schema(app1, "VarFloat", 12.1, 12.1, float)
self.check_immutable_fields_schema(app1, "VarFloat2", 21.2, 21.2, float)
self.check_immutable_fields_schema(
app1, "VarComplex", complex(3, 5), complex(3, 5), complex
)
self.check_immutable_fields_schema(
app1, "VarComplex2", complex(8, 6), complex(8, 6), complex
)
self.check_immutable_fields_schema(app1, "VarComplex", complex(3, 5), complex(3, 5), complex)
self.check_immutable_fields_schema(app1, "VarComplex2", complex(8, 6), complex(8, 6), complex)
self.check_immutable_fields_schema(app1, "VarBool", True, True, bool)
self.check_immutable_fields_schema(app1, "VarBool2", False, False, bool)
self.check_immutable_fields_schema(
@@ -903,9 +854,7 @@ class ApplianceTest(unittest.TestCase):
self.immutable_vars__test_field(app2, "VarComplex2", complex(3, 0), complex(3, 2))
self.immutable_vars__test_field(app2, "VarBool", False, False)
self.immutable_vars__test_field(app2, "VarBool2", True, True)
self.immutable_vars__test_field(
app2, "VarBytes", bytes.fromhex("21f0 e1f2 "), bytes.fromhex("11f0 F1f2 ")
)
self.immutable_vars__test_field(app2, "VarBytes", bytes.fromhex("21f0 e1f2 "), bytes.fromhex("11f0 F1f2 "))
self.immutable_vars__test_field(
app2,
"VarBytes2",
@@ -919,12 +868,8 @@ class ApplianceTest(unittest.TestCase):
self.check_immutable_fields_schema(app2, "VarInt2", 23, 21, int)
self.check_immutable_fields_schema(app2, "VarFloat", 2.6, 12.1, float)
self.check_immutable_fields_schema(app2, "VarFloat2", 1.5, 21.2, float)
self.check_immutable_fields_schema(
app2, "VarComplex", complex(7, 1), complex(3, 5), complex
)
self.check_immutable_fields_schema(
app2, "VarComplex2", complex(3, 0), complex(8, 6), complex
)
self.check_immutable_fields_schema(app2, "VarComplex", complex(7, 1), complex(3, 5), complex)
self.check_immutable_fields_schema(app2, "VarComplex2", complex(3, 0), complex(8, 6), complex)
self.check_immutable_fields_schema(app2, "VarBool", False, True, bool)
self.check_immutable_fields_schema(app2, "VarBool2", True, False, bool)
self.check_immutable_fields_schema(
@@ -961,9 +906,7 @@ class ApplianceTest(unittest.TestCase):
self.immutable_vars__test_field(app3, "VarComplex2", complex(3, 0), complex(3, 2))
self.immutable_vars__test_field(app3, "VarBool", False, False)
self.immutable_vars__test_field(app3, "VarBool2", True, True)
self.immutable_vars__test_field(
app3, "VarBytes", bytes.fromhex("21f0 e1f2 "), bytes.fromhex("11f0 F1f2 ")
)
self.immutable_vars__test_field(app3, "VarBytes", bytes.fromhex("21f0 e1f2 "), bytes.fromhex("11f0 F1f2 "))
self.immutable_vars__test_field(
app3,
"VarBytes2",
@@ -978,12 +921,8 @@ class ApplianceTest(unittest.TestCase):
self.check_immutable_fields_schema(app3, "VarInt2", 23, 21, int)
self.check_immutable_fields_schema(app3, "VarFloat", 2.6, 12.1, float)
self.check_immutable_fields_schema(app3, "VarFloat2", 1.5, 21.2, float)
self.check_immutable_fields_schema(
app3, "VarComplex", complex(7, 1), complex(3, 5), complex
)
self.check_immutable_fields_schema(
app3, "VarComplex2", complex(3, 0), complex(8, 6), complex
)
self.check_immutable_fields_schema(app3, "VarComplex", complex(7, 1), complex(3, 5), complex)
self.check_immutable_fields_schema(app3, "VarComplex2", complex(3, 0), complex(8, 6), complex)
self.check_immutable_fields_schema(app3, "VarBool", False, True, bool)
self.check_immutable_fields_schema(app3, "VarBool2", True, False, bool)
self.check_immutable_fields_schema(
@@ -1012,9 +951,7 @@ class ApplianceTest(unittest.TestCase):
self.immutable_vars__test_field(app1, "VarComplex2", complex(8, 6), complex(3, 2))
self.immutable_vars__test_field(app1, "VarBool", True, False)
self.immutable_vars__test_field(app1, "VarBool2", False, True)
self.immutable_vars__test_field(
app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 ")
)
self.immutable_vars__test_field(app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 "))
self.immutable_vars__test_field(
app1,
"VarBytes2",
@@ -1028,12 +965,8 @@ class ApplianceTest(unittest.TestCase):
self.check_immutable_fields_schema(app1, "VarInt2", 21, 21, int)
self.check_immutable_fields_schema(app1, "VarFloat", 12.1, 12.1, float)
self.check_immutable_fields_schema(app1, "VarFloat2", 21.2, 21.2, float)
self.check_immutable_fields_schema(
app1, "VarComplex", complex(3, 5), complex(3, 5), complex
)
self.check_immutable_fields_schema(
app1, "VarComplex2", complex(8, 6), complex(8, 6), complex
)
self.check_immutable_fields_schema(app1, "VarComplex", complex(3, 5), complex(3, 5), complex)
self.check_immutable_fields_schema(app1, "VarComplex2", complex(8, 6), complex(8, 6), complex)
self.check_immutable_fields_schema(app1, "VarBool", True, True, bool)
self.check_immutable_fields_schema(app1, "VarBool2", False, False, bool)
self.check_immutable_fields_schema(
@@ -1061,9 +994,7 @@ class ApplianceTest(unittest.TestCase):
self.immutable_vars__test_field(app2, "VarComplex2", complex(3, 0), complex(3, 2))
self.immutable_vars__test_field(app2, "VarBool", False, False)
self.immutable_vars__test_field(app2, "VarBool2", True, True)
self.immutable_vars__test_field(
app2, "VarBytes", bytes.fromhex("21f0 e1f2 "), bytes.fromhex("11f0 F1f2 ")
)
self.immutable_vars__test_field(app2, "VarBytes", bytes.fromhex("21f0 e1f2 "), bytes.fromhex("11f0 F1f2 "))
self.immutable_vars__test_field(
app2,
"VarBytes2",
@@ -1077,12 +1008,8 @@ class ApplianceTest(unittest.TestCase):
self.check_immutable_fields_schema(app2, "VarInt2", 23, 21, int)
self.check_immutable_fields_schema(app2, "VarFloat", 2.6, 12.1, float)
self.check_immutable_fields_schema(app2, "VarFloat2", 1.5, 21.2, float)
self.check_immutable_fields_schema(
app2, "VarComplex", complex(7, 1), complex(3, 5), complex
)
self.check_immutable_fields_schema(
app2, "VarComplex2", complex(3, 0), complex(8, 6), complex
)
self.check_immutable_fields_schema(app2, "VarComplex", complex(7, 1), complex(3, 5), complex)
self.check_immutable_fields_schema(app2, "VarComplex2", complex(3, 0), complex(8, 6), complex)
self.check_immutable_fields_schema(app2, "VarBool", False, True, bool)
self.check_immutable_fields_schema(app2, "VarBool2", True, False, bool)
self.check_immutable_fields_schema(
@@ -1137,16 +1064,12 @@ class ApplianceTest(unittest.TestCase):
app1 = Appliance1()
self.immutable_vars__test_field(app1, "ListStr", ("val1", "val2"), ["val2", "val3"])
self.immutable_vars__test_field(
app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}
)
self.immutable_vars__test_field(app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
self.immutable_vars__test_field(app1, "Tuple1", ("a", "c"), ("h", "r"))
self.immutable_vars__test_field(app1, "FrozenSet1", frozenset({1, 2}), frozenset({4, 0}))
self.immutable_vars__test_field(app1, "Set1", frozenset({1, 2}), set({4, 0}))
self.check_immutable_fields_schema(
app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str]
)
self.check_immutable_fields_schema(app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str])
self.check_immutable_fields_schema(
app1,
"Dict1",
@@ -1155,26 +1078,18 @@ class ApplianceTest(unittest.TestCase):
dict[int, float],
)
self.check_immutable_fields_schema(app1, "Tuple1", ("a", "c"), ("a", "c"), tuple[str, ...])
self.check_immutable_fields_schema(
app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int]
)
self.check_immutable_fields_schema(
app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int]
)
self.check_immutable_fields_schema(app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int])
self.check_immutable_fields_schema(app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int])
app2 = Appliance2()
self.immutable_vars__test_field(app2, "ListStr", ("mod val1", "mod val2"), ["val2", "val3"])
self.immutable_vars__test_field(
app2, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}
)
self.immutable_vars__test_field(app2, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
self.immutable_vars__test_field(app2, "Tuple1", ("aa", "cc"), ("h", "r"))
self.immutable_vars__test_field(app2, "FrozenSet1", frozenset({14, 27}), frozenset({4, 0}))
self.immutable_vars__test_field(app2, "Set1", frozenset({1, 20}), set({4, 0}))
self.check_immutable_fields_schema(
app2, "ListStr", ("mod val1", "mod val2"), ("val1", "val2"), list[str]
)
self.check_immutable_fields_schema(app2, "ListStr", ("mod val1", "mod val2"), ("val1", "val2"), list[str])
self.check_immutable_fields_schema(
app2,
"Dict1",
@@ -1182,27 +1097,17 @@ class ApplianceTest(unittest.TestCase):
{1: 1.1, 4: 7.6, 91: 23.6},
dict[int, float],
)
self.check_immutable_fields_schema(
app2, "Tuple1", ("aa", "cc"), ("a", "c"), tuple[str, ...]
)
self.check_immutable_fields_schema(
app2, "FrozenSet1", frozenset({14, 27}), frozenset({1, 2}), frozenset[int]
)
self.check_immutable_fields_schema(
app2, "Set1", frozenset({1, 20}), frozenset({1, 2}), set[int]
)
self.check_immutable_fields_schema(app2, "Tuple1", ("aa", "cc"), ("a", "c"), tuple[str, ...])
self.check_immutable_fields_schema(app2, "FrozenSet1", frozenset({14, 27}), frozenset({1, 2}), frozenset[int])
self.check_immutable_fields_schema(app2, "Set1", frozenset({1, 20}), frozenset({1, 2}), set[int])
self.immutable_vars__test_field(app1, "ListStr", ("val1", "val2"), ["val2", "val3"])
self.immutable_vars__test_field(
app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}
)
self.immutable_vars__test_field(app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
self.immutable_vars__test_field(app1, "Tuple1", ("a", "c"), ("h", "r"))
self.immutable_vars__test_field(app1, "FrozenSet1", frozenset({1, 2}), frozenset({4, 0}))
self.immutable_vars__test_field(app1, "Set1", frozenset({1, 2}), set({4, 0}))
self.check_immutable_fields_schema(
app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str]
)
self.check_immutable_fields_schema(app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str])
self.check_immutable_fields_schema(
app1,
"Dict1",
@@ -1211,12 +1116,8 @@ class ApplianceTest(unittest.TestCase):
dict[int, float],
)
self.check_immutable_fields_schema(app1, "Tuple1", ("a", "c"), ("a", "c"), tuple[str, ...])
self.check_immutable_fields_schema(
app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int]
)
self.check_immutable_fields_schema(
app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int]
)
self.check_immutable_fields_schema(app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int])
self.check_immutable_fields_schema(app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int])
# class can be created
class Appliance3(Appliance2):
@@ -1229,28 +1130,18 @@ class ApplianceTest(unittest.TestCase):
app3 = Appliance3()
self.immutable_vars__test_field(app3, "ListStr", ("mod val1", "mod val2"), ["val2", "val3"])
self.immutable_vars__test_field(
app3, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}
)
self.immutable_vars__test_field(app3, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
self.immutable_vars__test_field(app3, "Tuple1", ("aa", "cc"), ("h", "r"))
self.immutable_vars__test_field(app3, "FrozenSet1", frozenset({14, 27}), frozenset({4, 0}))
self.immutable_vars__test_field(app3, "Set1", frozenset({1, 20}), set({4, 0}))
self.immutable_vars__test_field(
app3, "ListStr2", ("mod val3", "mod val3"), ["mod val3", "mod val3"]
)
self.immutable_vars__test_field(
app3, "Dict2", {9: 8.1, 5: 98.6, 551: 3.6}, {9: 8.1, 5: 98.6, 551: 3.6}
)
self.immutable_vars__test_field(app3, "ListStr2", ("mod val3", "mod val3"), ["mod val3", "mod val3"])
self.immutable_vars__test_field(app3, "Dict2", {9: 8.1, 5: 98.6, 551: 3.6}, {9: 8.1, 5: 98.6, 551: 3.6})
self.immutable_vars__test_field(app3, "Tuple2", ("aaa", "ccc"), ("aaa", "ccc"))
self.immutable_vars__test_field(
app3, "FrozenSet2", frozenset({114, 127}), frozenset({114, 127})
)
self.immutable_vars__test_field(app3, "FrozenSet2", frozenset({114, 127}), frozenset({114, 127}))
self.immutable_vars__test_field(app3, "Set2", frozenset({10, 250}), set({10, 250}))
self.check_immutable_fields_schema(
app3, "ListStr", ("mod val1", "mod val2"), ("val1", "val2"), list[str]
)
self.check_immutable_fields_schema(app3, "ListStr", ("mod val1", "mod val2"), ("val1", "val2"), list[str])
self.check_immutable_fields_schema(
app3,
"Dict1",
@@ -1258,15 +1149,9 @@ class ApplianceTest(unittest.TestCase):
{1: 1.1, 4: 7.6, 91: 23.6},
dict[int, float],
)
self.check_immutable_fields_schema(
app3, "Tuple1", ("aa", "cc"), ("a", "c"), tuple[str, ...]
)
self.check_immutable_fields_schema(
app3, "FrozenSet1", frozenset({14, 27}), frozenset({1, 2}), frozenset[int]
)
self.check_immutable_fields_schema(
app3, "Set1", frozenset({1, 20}), frozenset({1, 2}), set[int]
)
self.check_immutable_fields_schema(app3, "Tuple1", ("aa", "cc"), ("a", "c"), tuple[str, ...])
self.check_immutable_fields_schema(app3, "FrozenSet1", frozenset({14, 27}), frozenset({1, 2}), frozenset[int])
self.check_immutable_fields_schema(app3, "Set1", frozenset({1, 20}), frozenset({1, 2}), set[int])
self.check_immutable_fields_schema(
app3,
@@ -1282,9 +1167,7 @@ class ApplianceTest(unittest.TestCase):
{9: 8.1, 5: 98.6, 551: 3.6},
dict[int, float],
)
self.check_immutable_fields_schema(
app3, "Tuple2", ("aaa", "ccc"), ("aaa", "ccc"), tuple[str, ...]
)
self.check_immutable_fields_schema(app3, "Tuple2", ("aaa", "ccc"), ("aaa", "ccc"), tuple[str, ...])
self.check_immutable_fields_schema(
app3,
"FrozenSet2",
@@ -1292,21 +1175,15 @@ class ApplianceTest(unittest.TestCase):
frozenset({114, 127}),
frozenset[int],
)
self.check_immutable_fields_schema(
app3, "Set2", frozenset({10, 250}), frozenset({10, 250}), set[int]
)
self.check_immutable_fields_schema(app3, "Set2", frozenset({10, 250}), frozenset({10, 250}), set[int])
self.immutable_vars__test_field(app2, "ListStr", ("mod val1", "mod val2"), ["val2", "val3"])
self.immutable_vars__test_field(
app2, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}
)
self.immutable_vars__test_field(app2, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
self.immutable_vars__test_field(app2, "Tuple1", ("aa", "cc"), ("h", "r"))
self.immutable_vars__test_field(app2, "FrozenSet1", frozenset({14, 27}), frozenset({4, 0}))
self.immutable_vars__test_field(app2, "Set1", frozenset({1, 20}), set({4, 0}))
self.check_immutable_fields_schema(
app2, "ListStr", ("mod val1", "mod val2"), ("val1", "val2"), list[str]
)
self.check_immutable_fields_schema(app2, "ListStr", ("mod val1", "mod val2"), ("val1", "val2"), list[str])
self.check_immutable_fields_schema(
app2,
"Dict1",
@@ -1314,27 +1191,17 @@ class ApplianceTest(unittest.TestCase):
{1: 1.1, 4: 7.6, 91: 23.6},
dict[int, float],
)
self.check_immutable_fields_schema(
app2, "Tuple1", ("aa", "cc"), ("a", "c"), tuple[str, ...]
)
self.check_immutable_fields_schema(
app2, "FrozenSet1", frozenset({14, 27}), frozenset({1, 2}), frozenset[int]
)
self.check_immutable_fields_schema(
app2, "Set1", frozenset({1, 20}), frozenset({1, 2}), set[int]
)
self.check_immutable_fields_schema(app2, "Tuple1", ("aa", "cc"), ("a", "c"), tuple[str, ...])
self.check_immutable_fields_schema(app2, "FrozenSet1", frozenset({14, 27}), frozenset({1, 2}), frozenset[int])
self.check_immutable_fields_schema(app2, "Set1", frozenset({1, 20}), frozenset({1, 2}), set[int])
self.immutable_vars__test_field(app1, "ListStr", ("val1", "val2"), ["val2", "val3"])
self.immutable_vars__test_field(
app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}
)
self.immutable_vars__test_field(app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
self.immutable_vars__test_field(app1, "Tuple1", ("a", "c"), ("h", "r"))
self.immutable_vars__test_field(app1, "FrozenSet1", frozenset({1, 2}), frozenset({4, 0}))
self.immutable_vars__test_field(app1, "Set1", frozenset({1, 2}), set({4, 0}))
self.check_immutable_fields_schema(
app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str]
)
self.check_immutable_fields_schema(app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str])
self.check_immutable_fields_schema(
app1,
"Dict1",
@@ -1343,12 +1210,8 @@ class ApplianceTest(unittest.TestCase):
dict[int, float],
)
self.check_immutable_fields_schema(app1, "Tuple1", ("a", "c"), ("a", "c"), tuple[str, ...])
self.check_immutable_fields_schema(
app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int]
)
self.check_immutable_fields_schema(
app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int]
)
self.check_immutable_fields_schema(app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int])
self.check_immutable_fields_schema(app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int])
def test_initializer(self):
"""Testing first appliance level, and Field types (simple)"""
@@ -1619,7 +1482,7 @@ class ApplianceTest(unittest.TestCase):
x: int = 1
app = App()
with self.assertRaises(dm.NewFieldForbidden):
with self.assertRaises(dm.NonExistingField):
app.y = 2
def test_private_field_allowed(self):
@@ -1804,34 +1667,37 @@ class ApplianceTest(unittest.TestCase):
def test_deepfreeze_nested_dict_list_set(self):
class App(dm.Appliance):
data: dict[str, list[int] | set[str] | dict[str, list[int] | set[str]]] = {
"numbers": [1, 2, 3],
data: dict[str, list[int]] = {"numbers": [1, 2, 3]}
data2: dict[str, set[str]] = {
"letters": {"a", "b", "c"},
"mixed": {"x": [4, 5], "y": {"z"}},
}
data3: dict[str, dict[str, list[int]]] = {
"mixed": {"x": [4, 5]},
}
app = App()
# Top-level: should be frozendict
self.assertEqual(type(app.data).__name__, "frozendict")
self.assertEqual(type(app.data2).__name__, "frozendict")
self.assertEqual(type(app.data3).__name__, "frozendict")
# Lists must be frozen to tuple
self.assertIsInstance(app.data["numbers"], tuple)
self.assertIsInstance(app.data["mixed"]["x"], tuple)
self.assertIsInstance(app.data3["mixed"]["x"], tuple)
# Sets must be frozen to frozenset
self.assertIsInstance(app.data["letters"], frozenset)
self.assertIsInstance(app.data["mixed"]["y"], frozenset)
self.assertIsInstance(app.data2["letters"], frozenset)
# Check immutability
with self.assertRaises(TypeError):
app.data["numbers"] += (99,)
with self.assertRaises(AttributeError):
app.data["letters"].add("d")
app.data2["letters"].add("d")
with self.assertRaises(TypeError):
app.data["mixed"]["x"] += (6,)
app.data3["mixed"]["x"] += (6,)
def test_unknown_parameters_raise(self):
class App(dm.Appliance):
@@ -1906,6 +1772,52 @@ class ApplianceTest(unittest.TestCase):
app = App()
self.assertEqual(app.data["nums"], (1, 2)) # frozen tuple after init
def test_features_field_not_allowed(self):
with self.assertRaises(dm.InvalidFieldName):
class App(dm.Appliance):
features: str = ""
def test_features_field_not_allowed_2(self):
with self.assertRaises(dm.InvalidFieldName):
class App(dm.Appliance):
features: dm.Element = dm.Element()
def test_nested_appliance_not_allowed(self):
with self.assertRaises(dm.UnsupportedFieldType):
class App(dm.Appliance):
class NestdApp(dm.Appliance):
pass
def test_nested_appliance_not_allowed_2(self):
class NestdApp(dm.Appliance):
pass
with self.assertRaises(dm.UnsupportedFieldType):
class App(dm.Appliance):
wrong: NestdApp = NestdApp()
def test_nested_appliance_not_allowed_3(self):
class NestdApp(dm.Appliance):
pass
with self.assertRaises(dm.UnsupportedFieldType):
class App(dm.Appliance):
wrong: Optional[NestdApp] = None
def test_nested_appliance_not_allowed_4(self):
class NestdApp(dm.Appliance):
pass
with self.assertRaises(dm.UnsupportedFieldType):
class App(dm.Appliance):
wrong: NestdApp = None
# ---------- main ----------

View File

@@ -7,7 +7,7 @@
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
import unittest
from typing import Annotated, Union
import sys
import subprocess
from os import chdir, environ
@@ -340,7 +340,6 @@ class ElementTest(unittest.TestCase):
elem: E = E(ivalue=45)
a = A()
self.assertIsInstance(a.elem, E)
self.assertEqual(a.elem.ivalue, 45)
self.assertEqual(a.elem.get_increment(), 46)
@@ -352,7 +351,6 @@ class ElementTest(unittest.TestCase):
cls.elem = cls.elem.increment()
b = B()
self.assertEqual(b.elem.ivalue, 46)
self.assertEqual(b.elem.get_increment(), 47)
@@ -374,27 +372,488 @@ class ElementTest(unittest.TestCase):
with self.assertRaises(dm.FunctionForbidden):
class B(dm.Element):
class C(dm.Element):
elem: E = E()
@classmethod
def __initializer(cls):
print("COUCOU")
cls.elem = E(ivalue=test_fun())
def test_mutable_class(self):
def test_mutable_class2_newelement_fails(self):
class E(dm.Element, options=(dm.ClassMutable)):
ivalue: int = 43
with self.assertRaises(dm.NonExistingField):
E.test = 123
e = E()
with self.assertRaises(dm.NonExistingField):
e.test = 123
def test_mutable_class_freeze(self):
class E(dm.Element, options=(dm.ClassMutable)):
ivalue: int = 43
E.ivalue = 12
self.assertEqual(E.ivalue, 12)
E.freeze_class()
self.assertEqual(E.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
E.ivalue = 13
self.assertEqual(E.ivalue, 12)
e = E()
self.assertEqual(e.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 14
self.assertEqual(e.ivalue, 12)
def test_mutable_object_freeze(self):
class E(dm.Element, options=(dm.ObjectMutable)):
ivalue: int = 43
self.assertEqual(E.ivalue, 43)
with self.assertRaises(dm.ReadOnlyField):
E.ivalue = 13
self.assertEqual(E.ivalue, 43)
e = E()
self.assertEqual(e.ivalue, 43)
e.ivalue = 14
self.assertEqual(e.ivalue, 14)
e.freeze()
self.assertEqual(e.ivalue, 14)
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 15
self.assertEqual(e.ivalue, 14)
def test_mutable_object_and_class_freeze(self):
class E(dm.Element, options=(dm.ObjectMutable, dm.ClassMutable)):
ivalue: int = 43
self.assertEqual(E.ivalue, 43)
E.ivalue = 13
self.assertEqual(E.ivalue, 13)
e = E()
self.assertEqual(e.ivalue, 13)
e.ivalue = 14
self.assertEqual(e.ivalue, 14)
e.freeze()
self.assertEqual(e.ivalue, 14)
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 15
self.assertEqual(e.ivalue, 14)
def test_mutable_object_and_class_freeze_2(self):
class E(dm.Element, options=(dm.ObjectMutable, dm.ClassMutable)):
ivalue: int = 43
self.assertFalse(E.__lam_schema__["ivalue"].is_frozen())
E.freeze_class()
self.assertTrue(E.__lam_schema__["ivalue"].is_frozen())
e = E()
self.assertFalse(e.__lam_schema__["ivalue"].is_frozen())
e.freeze()
self.assertTrue(e.__lam_schema__["ivalue"].is_frozen())
def test_mutable_class_freeze_container(self):
class E(dm.Element, options=(dm.ClassMutable)):
ar_ivalue: list[int] = [43]
E.ar_ivalue.append(12)
self.assertEqual(E.ar_ivalue, [43, 12])
E.freeze_class()
self.assertEqual(E.ar_ivalue, (43, 12))
with self.assertRaises(AttributeError):
E.ar_ivalue.append(52)
self.assertEqual(E.ar_ivalue, (43, 12))
e = E()
self.assertEqual(e.ar_ivalue, (43, 12))
with self.assertRaises(AttributeError):
e.ar_ivalue.append(52)
self.assertEqual(e.ar_ivalue, (43, 12))
def test_mutable_object_freeze_container(self):
class E(dm.Element, options=(dm.ObjectMutable)):
ar_ivalue: list[int] = [43, 54]
self.assertEqual(E.ar_ivalue, (43, 54))
with self.assertRaises(AttributeError):
E.ar_ivalue.append(13)
self.assertEqual(E.ar_ivalue, (43, 54))
e = E()
self.assertEqual(e.ar_ivalue, [43, 54])
e.ar_ivalue.append(32)
self.assertEqual(e.ar_ivalue, [43, 54, 32])
e.freeze()
self.assertEqual(e.ar_ivalue, (43, 54, 32))
with self.assertRaises(AttributeError):
e.ar_ivalue.append(12)
self.assertEqual(e.ar_ivalue, (43, 54, 32))
def test_mutable_object_and_class_freeze_container(self):
class E(dm.Element, options=(dm.ObjectMutable, dm.ClassMutable)):
ar_ivalue: list[int] = [43, 54]
self.assertEqual(E.ar_ivalue, [43, 54])
E.ar_ivalue.append(13)
self.assertEqual(E.ar_ivalue, [43, 54, 13])
e = E()
self.assertEqual(e.ar_ivalue, [43, 54, 13])
e.ar_ivalue.append(14)
self.assertEqual(e.ar_ivalue, [43, 54, 13, 14])
e.freeze()
self.assertEqual(e.ar_ivalue, (43, 54, 13, 14))
with self.assertRaises(AttributeError):
e.ar_ivalue.append(15)
self.assertEqual(e.ar_ivalue, (43, 54, 13, 14))
def test_mutable_class_freeze_inheritance(self):
class E(dm.Element, options=(dm.ClassMutable)):
ivalue: int = 12
class E2(E):
pass
self.assertEqual(E2.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
E2.ivalue = 13
self.assertEqual(E2.ivalue, 12)
e = E2()
self.assertEqual(e.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 14
self.assertEqual(e.ivalue, 12)
def test_mutable_object_freeze_inheritance(self):
class E(dm.Element, options=(dm.ObjectMutable)):
ivalue: int = 12
class E2(E):
pass
self.assertEqual(E2.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
E2.ivalue = 13
self.assertEqual(E2.ivalue, 12)
e = E2()
self.assertEqual(e.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 14
self.assertEqual(e.ivalue, 12)
def test_mutable_object_and_class_freeze_inheritance(self):
class E(dm.Element, options=(dm.ObjectMutable, dm.ClassMutable)):
ivalue: int = 12
class E2(E):
pass
self.assertEqual(E2.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
E2.ivalue = 13
self.assertEqual(E2.ivalue, 12)
e = E2()
self.assertEqual(e.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 14
self.assertEqual(e.ivalue, 12)
def test_mutable_object_and_class_freeze_inheritance_noleak(self):
class E(dm.Element, options=(dm.ObjectMutable, dm.ClassMutable)):
ivalue: int = 12
class E2(E):
pass
self.assertEqual(E2.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
E2.ivalue = 13
self.assertEqual(E2.ivalue, 12)
e2 = E2()
self.assertEqual(e2.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
e2.ivalue = 14
self.assertEqual(e2.ivalue, 12)
# no Leak
self.assertEqual(E.ivalue, 12)
E.ivalue = 13
self.assertEqual(E.ivalue, 13)
e = E()
self.assertEqual(e.ivalue, 13)
e.ivalue = 14
self.assertEqual(e.ivalue, 14)
e.freeze()
self.assertEqual(e.ivalue, 14)
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 15
self.assertEqual(e.ivalue, 14)
# no Leak 2
self.assertEqual(E2.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
E2.ivalue = 13
self.assertEqual(E2.ivalue, 12)
e2 = E2()
self.assertEqual(e2.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
e2.ivalue = 14
self.assertEqual(e2.ivalue, 12)
def test_mutable_class_freeze_nested_element(self):
class NElem(dm.Element):
ivalue: int = 43
class E(dm.Element, options=(dm.ClassMutable)):
e: NElem = NElem()
E.e.ivalue = 12
self.assertEqual(E.e.ivalue, 12)
E.freeze_class()
self.assertEqual(E.e.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
E.e.ivalue = 13
self.assertEqual(E.e.ivalue, 12)
e = E()
self.assertEqual(e.e.ivalue, 12)
with self.assertRaises(dm.ReadOnlyField):
e.e.ivalue = 14
self.assertEqual(e.e.ivalue, 12)
def test_mutable_object_freeze_nested_element(self):
class NElem(dm.Element):
ivalue: int = 43
class E(dm.Element, options=(dm.ObjectMutable)):
e: NElem = NElem()
self.assertEqual(E.e.ivalue, 43)
with self.assertRaises(dm.ReadOnlyField):
E.e.ivalue = 13
self.assertEqual(E.e.ivalue, 43)
e = E()
self.assertEqual(e.e.ivalue, 43)
e.e.ivalue = 14
self.assertEqual(e.e.ivalue, 14)
e.freeze()
self.assertEqual(e.e.ivalue, 14)
with self.assertRaises(dm.ReadOnlyField):
e.e.ivalue = 15
self.assertEqual(e.e.ivalue, 14)
def test_mutable_object_and_class_freeze_nested_element(self):
class NElem(dm.Element):
ivalue: int = 43
class E(dm.Element, options=(dm.ObjectMutable, dm.ClassMutable)):
e: NElem = NElem()
self.assertEqual(E.e.ivalue, 43)
E.e.ivalue = 13
self.assertEqual(E.e.ivalue, 13)
e = E()
self.assertEqual(e.e.ivalue, 13)
e.e.ivalue = 14
self.assertEqual(e.e.ivalue, 14)
e.e.freeze()
self.assertEqual(e.e.ivalue, 14)
with self.assertRaises(dm.ReadOnlyField):
e.e.ivalue = 15
self.assertEqual(e.e.ivalue, 14)
def test_element_clone(self):
class Elem(dm.Element):
ivalue: int = 43
self.assertTrue(Elem.frozen_cls)
self.assertFalse(Elem.mutable_obj)
e = Elem()
print(e)
new_obj1 = e.clone_as_mutable_variant()
self.assertFalse(new_obj1.frozen_cls)
self.assertTrue(new_obj1.mutable_obj)
self.assertIsInstance(new_obj1, Elem)
self.assertEqual(new_obj1.ivalue, 43)
new_obj1.ivalue = 55
self.assertEqual(new_obj1.ivalue, 55)
new_obj2 = e.clone_as_mutable_variant()
self.assertFalse(new_obj2.frozen_cls)
self.assertTrue(new_obj2.mutable_obj)
self.assertNotEqual(new_obj1, new_obj2)
self.assertEqual(type(new_obj1), type(new_obj2))
self.assertEqual(new_obj2.ivalue, 43)
new_obj2.ivalue = 56
self.assertEqual(new_obj2.ivalue, 56)
new_obj3 = e.clone_as_mutable_variant()
self.assertFalse(new_obj3.frozen_cls)
self.assertTrue(new_obj3.mutable_obj)
self.assertNotEqual(new_obj1, new_obj3)
self.assertNotEqual(new_obj2, new_obj3)
self.assertEqual(type(new_obj1), type(new_obj3))
self.assertEqual(type(new_obj2), type(new_obj3))
self.assertEqual(new_obj3.ivalue, 43)
new_obj3.ivalue = 57
self.assertEqual(new_obj3.ivalue, 57)
self.assertEqual(e.ivalue, 43)
with self.assertRaises(dm.ReadOnlyField):
e.ivalue = 15
self.assertEqual(e.ivalue, 43)
self.assertEqual(new_obj1.ivalue, 55)
new_obj1.freeze()
self.assertEqual(new_obj1.ivalue, 55)
with self.assertRaises(dm.ReadOnlyField):
new_obj1.ivalue = 15
self.assertEqual(new_obj1.ivalue, 55)
self.assertEqual(new_obj2.ivalue, 56)
self.assertEqual(new_obj3.ivalue, 57)
new_obj2.ivalue = 76
self.assertEqual(new_obj2.ivalue, 76)
self.assertEqual(new_obj3.ivalue, 57)
self.assertEqual(new_obj1.ivalue, 55)
new_obj2.freeze()
self.assertEqual(new_obj2.ivalue, 76)
with self.assertRaises(dm.ReadOnlyField):
new_obj2.ivalue = 15
self.assertEqual(new_obj2.ivalue, 76)
self.assertTrue(e.frozen)
self.assertTrue(new_obj1.frozen)
self.assertTrue(new_obj2.frozen)
self.assertFalse(new_obj3.frozen)
def test_element_clone_inheritance(self):
class ElemA(dm.Element):
ivalue: int = 43
class ElemB(ElemA):
ivalue = 18
elemA = ElemA()
elemB = ElemB()
_elemA = elemA.clone_as_mutable_variant()
_elemB = elemB.clone_as_mutable_variant()
self.assertIsInstance(elemB, type(elemA))
self.assertIsInstance(_elemA, type(elemA))
self.assertIsInstance(_elemB, type(elemB))
self.assertIsInstance(_elemB, type(_elemA))
def test_element_clone_inheritance_nested(self):
class ElemAInner(dm.Element):
fvalue: float = 0.123
class ElemA(dm.Element):
ivalue: int = 43
el1: ElemAInner = ElemAInner()
class ElemBInner(ElemAInner):
fvalue = 0.9
class ElemB(ElemA):
ivalue = 18
el1 = ElemAInner(fvalue=6.54)
el2: ElemBInner = ElemBInner()
self.assertIsInstance(ElemA.el1, ElemAInner)
self.assertIsInstance(ElemB.el1, ElemAInner)
self.assertIsInstance(ElemB.el2, ElemAInner)
self.assertIsInstance(ElemB.el2, ElemBInner)
self.assertNotIsInstance(ElemB.el1, ElemBInner)
elemA = ElemA()
elemB = ElemB()
_elemA = elemA.clone_as_mutable_variant()
_elemB = elemB.clone_as_mutable_variant()
self.assertIsInstance(elemB, type(elemA))
self.assertIsInstance(_elemA, type(elemA))
self.assertIsInstance(_elemB, type(elemB))
self.assertIsInstance(_elemB, type(_elemA))
self.assertIsInstance(_elemB.el1, type(_elemA.el1))
self.assertIsInstance(_elemA.el1, type(elemA.el1))
self.assertIsInstance(_elemB.el1, type(elemB.el1))
self.assertIsInstance(_elemB.el1, type(_elemA.el1))
self.assertIsInstance(_elemB.el2, type(_elemA.el1))
self.assertIsInstance(_elemB.el2, type(elemB.el1))
self.assertIsInstance(_elemB.el2, type(_elemA.el1))
def test_element_mutable_invalid_instance_override_container(self):
class Elem(dm.Element, options=(dm.ClassMutable, dm.ObjectMutable)):
val: list[int] = []
with self.assertRaises(dm.InvalidFieldValue):
Elem(val=["test"])
def test_element_mutable_invalid_instance_override_container_nested(self):
class ElemInner(dm.Element, options=(dm.ClassMutable, dm.ObjectMutable)):
val: list[int] = []
class ElemOuter(dm.Element, options=(dm.ClassMutable, dm.ObjectMutable)):
inner: ElemInner = ElemInner()
# with self.assertRaises(dm.InvalidFieldValue):
elem = ElemOuter(inner={"val": [1, 2, 3]})
self.assertEqual(elem.inner.val, [1, 2, 3])
def test_class_protocol(self):
self.assertIsInstance(dm.base_element.BaseElement, dm.interfaces.FreezableElement)
def test_wrong_annotated(self):
with self.assertRaises(dm.UnsupportedFieldType):
class Elem(dm.Appliance):
StrVar: list[Annotated[str, dm.LAMFieldInfo(doc="foo1")]] = ["default value"]
def test_wrong_annotation_union(self):
with self.assertRaises(dm.UnsupportedFieldType):
class Elem(dm.Appliance):
StrVar: str | int = "test"
with self.assertRaises(dm.UnsupportedFieldType):
class Elem(dm.Appliance):
StrVar: list[str | int] = "test"
with self.assertRaises(dm.UnsupportedFieldType):
class Elem(dm.Appliance):
StrVar: list[None | int] = "test"
def test_annotation_union(self):
class Elem(dm.Appliance):
strvar1: str | None = "test"
strvar1: str | None = None
ivar1: Union[int, None] = 12
ivar2: Union[int, None] = None
# ---------- main ----------

View File

@@ -61,7 +61,7 @@ class FeatureTest(unittest.TestCase):
app1 = Appliance1()
self.assertIsInstance(Appliance1.__lam_schema__["VarStrOuter"], dm.LAMField)
self.assertIsInstance(app1.__lam_schema__["VarStrOuter"], dm.FrozenLAMField)
self.assertTrue(app1.__lam_schema__["VarStrOuter"].is_frozen())
self.assertIn("Feature1", app1.__lam_schema__["features"])
self.assertIn("VarStrInner", app1.__lam_schema__["features"]["Feature1"].__lam_schema__)
self.assertIsInstance(
@@ -69,7 +69,11 @@ class FeatureTest(unittest.TestCase):
dm.LAMField,
)
self.assertTrue(hasattr(app1, "Feature1"))
self.assertIsInstance(app1.Feature1.__lam_schema__["VarStrInner"], dm.FrozenLAMField)
self.assertTrue(app1.Feature1.frozen)
print(app1)
print(app1.Feature1)
print(app1.Feature1.__lam_schema__["VarStrInner"])
self.assertTrue(app1.Feature1.__lam_schema__["VarStrInner"].is_frozen())
self.assertTrue(hasattr(app1.Feature1, "VarStrInner"))
def test_inheritance(self):
@@ -109,7 +113,7 @@ class FeatureTest(unittest.TestCase):
app3 = Appliance3()
self.assertIsInstance(Appliance1.__lam_schema__["VarStrOuter"], dm.LAMField)
self.assertIsInstance(app1.__lam_schema__["VarStrOuter"], dm.FrozenLAMField)
self.assertTrue(app1.__lam_schema__["VarStrOuter"].is_frozen())
self.assertIn("Feature1", app1.__lam_schema__["features"])
self.assertIn("VarStrInner", app1.__lam_schema__["features"]["Feature1"].__lam_schema__)
self.assertIsInstance(
@@ -117,7 +121,7 @@ class FeatureTest(unittest.TestCase):
dm.LAMField,
)
self.assertTrue(hasattr(app1, "Feature1"))
self.assertIsInstance(app1.Feature1.__lam_schema__["VarStrInner"], dm.FrozenLAMField)
self.assertTrue(app1.Feature1.__lam_schema__["VarStrInner"].is_frozen())
self.assertTrue(hasattr(app1.Feature1, "VarStrInner"))
self.assertEqual(app1.VarStrOuter, "testvalue APPLIANCE1")
self.assertEqual(app1.Feature1.VarStrInner, "testvalue FEATURE1")
@@ -129,6 +133,12 @@ class FeatureTest(unittest.TestCase):
self.assertEqual(app3.Feature1.VarInt, 42)
self.assertEqual(app3.Feature3.VarStrInner, "testvalue FEATURE3")
class Appliance4(Appliance3):
VarStrOuter = "testvalue APPLIANCE4"
class Feature1(Appliance3.Feature1):
VarStrInner = "testvalue FEATURE1 modded 3"
def test_inheritance2(self):
"""Testing first appliance feature, and Field types (simple)"""
@@ -178,14 +188,6 @@ class FeatureTest(unittest.TestCase):
self.assertEqual(app3.Feature1.VarStrInner, "testvalue FEATURE1")
self.assertEqual(app4.Feature1.VarStrInner, "testvalue FEATURE4")
def test_new_field_forbidden(self):
class App(dm.Appliance):
x: int = 1
app = App()
with self.assertRaises(dm.NewFieldForbidden):
app.y = 2
def test_inherit_declared(self):
class App(dm.Appliance):
class F1(dm.Feature):
@@ -646,6 +648,181 @@ class FeatureTest(unittest.TestCase):
class F1(dm.Feature):
fail: str = "oops"
def test_initializer(self):
class App(dm.Appliance):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
@classmethod
def __initializer(cls):
cls.F.tag = "test"
cls.F.nums.append(3)
self.assertEqual(App.F.tag, "test")
self.assertEqual(App.F.nums, (1, 2, 3))
def test_initializer_nested(self):
class App(dm.Appliance):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
@classmethod
def __initializer(cls):
cls.tag = "test"
cls.nums.append(3)
self.assertEqual(App.F.tag, "test")
self.assertEqual(App.F.nums, (1, 2, 3))
def test_initializer_nested_dual(self):
class App(dm.Appliance):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
@classmethod
def __initializer(cls):
cls.tag = "test1"
cls.nums.append(3)
@classmethod
def __initializer(cls):
cls.F.tag = "test2"
cls.F.nums.append(4)
self.assertEqual(App.F.tag, "test2")
self.assertEqual(App.F.nums, (1, 2, 3, 4))
def test_container_frozen(self):
class App(dm.Appliance):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
with self.assertRaises(AttributeError):
App.F.nums.append(3)
def test_container_class_mutable_validation(self):
class App(dm.Appliance, options=(dm.ClassMutable)):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
App.F.nums.append("test")
with self.assertRaises(dm.InvalidFieldValue):
App.freeze_class()
with self.assertRaises(dm.InvalidFieldValue):
a = App()
def test_container_object_mutable_validation(self):
class App(dm.Appliance, options=(dm.ObjectMutable)):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
a = App()
a.F.nums.append("test")
with self.assertRaises(dm.InvalidFieldValue):
a.freeze()
def test_container_object_and_class_mutable_validation(self):
class App(dm.Appliance, options=(dm.ClassMutable, dm.ObjectMutable)):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
App.F.nums.append("test")
with self.assertRaises(dm.InvalidFieldValue):
a = App()
with self.assertRaises(dm.InvalidFieldValue):
App.freeze_class()
def test_nested_element_container_object_and_class_mutable(self):
class E(dm.Element):
val: str = "testelem"
i: list[int] = [1]
class App(dm.Appliance, options=(dm.ClassMutable, dm.ObjectMutable)):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
e: E = E(val="modified")
App.F.e.i.append(21)
self.assertEquals(App.F.e.i, [1, 21])
App.freeze_class()
with self.assertRaises(AttributeError):
App.F.e.i.append(22)
a = App()
self.assertEquals(a.F.e.i, [1, 21])
a.F.e.i.append(28)
self.assertEquals(a.F.e.i, [1, 21, 28])
a.freeze()
self.assertEquals(a.F.e.i, (1, 21, 28))
with self.assertRaises(AttributeError):
a.F.e.i.append(23)
self.assertEquals(a.F.e.i, (1, 21, 28))
def test_nested_element_container_object_and_class_mutable_validation(self):
class E(dm.Element):
val: str = "testelem"
i: list[int] = [1]
class App(dm.Appliance, options=(dm.ClassMutable, dm.ObjectMutable)):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
e: E = E(val="modified")
App.F.e.i.append("test")
with self.assertRaises(dm.InvalidFieldValue):
a = App()
def test_nested_element_container_object_and_class_mutable_validation2(self):
class E(dm.Element):
val: str = "testelem"
i: list[int] = [1]
class App(dm.Appliance, options=(dm.ClassMutable, dm.ObjectMutable)):
integ: int = 18
class F(dm.Feature):
nums: list[int] = [1, 2]
tag: str = "x"
e: E = E(val="modified")
a = App()
a.F.e.i.append("test")
with self.assertRaises(dm.InvalidFieldValue):
a.freeze()
# ---------- main ----------