Compare commits

...

23 Commits

Author SHA1 Message Date
chacha
d20712a72f remove last references to DAB 2025-09-20 19:01:21 +02:00
chacha
2837b6439f cleaning 2025-09-20 18:57:32 +02:00
chacha
b4d6ed6130 add missing __init__ files 2025-09-20 18:48:17 +02:00
chacha
cd69fc22a8 continue renaming 2025-09-20 18:44:19 +02:00
chacha
9aec2d66cd reorganize and rename (partial) 2025-09-20 18:27:36 +02:00
chacha
af81ec5fd3 more test cases 2025-09-20 13:18:40 +02:00
chacha
26e32a004f increase coverage 2025-09-20 12:43:43 +02:00
chacha
b7cbc50f79 work 2025-09-20 11:38:05 +02:00
chacha
86eee2e378 continue features implementation + code lint + typing + etc 2025-09-18 23:21:42 +02:00
chacha
3e0defc574 work 2025-09-18 00:32:32 +02:00
chacha
f6e581381d cleaning 2025-09-17 00:16:30 +02:00
chacha
981c5201a9 partially fix features 2025-09-16 23:40:41 +02:00
chacha
ab11052c8f work 2025-09-09 00:13:06 +02:00
chacha
4f5dade949 first feature implementation 2025-09-08 01:23:46 +02:00
cclecle
cce260bc5e reordering 2025-09-07 18:42:38 +02:00
cclecle
915a4332ee tiny fix :) 2025-09-06 01:47:49 +02:00
cclecle
4dca3eb9d1 improve typing 2025-09-06 01:43:20 +02:00
cclecle
e11c541139 small opt 2025-09-06 01:35:28 +02:00
cclecle
637b50b325 quality & typing fixes 2025-09-06 01:31:55 +02:00
cclecle
f45c9cc8f3 fix unittest 2025-09-05 23:04:16 +02:00
cclecle
95b0c298ce update deps 2025-09-05 23:00:36 +02:00
cclecle
04a4cf7b36 add deps 2025-09-05 22:56:17 +02:00
cclecle
f42a839cff work 2025-09-05 22:53:47 +02:00
19 changed files with 3247 additions and 627 deletions

View File

@@ -35,8 +35,8 @@ classifiers = [
dependencies = [
'importlib-metadata; python_version<"3.9"',
'packaging',
'pydantic',
'runtype'
'frozendict',
'typeguard'
]
dynamic = ["version"]

View File

@@ -0,0 +1,17 @@
from typing import Generic, TypeVar
T_Field = TypeVar("T_Field")
class BaseConstraint(Generic[T_Field]):
"""BaseConstraint class
Base class for Field's constraints
"""
_bound_type: type
def __init__(self): ...
def check(self, value: T_Field) -> bool:
"""Check if a Constraint is completed"""
return True

View File

@@ -0,0 +1,41 @@
from typing import Generic, TypeVar, Any
from .LAMField import LAMField
from .Constraint import BaseConstraint
from ..tools import LAMdeepfreeze
T_Field = TypeVar("T_Field")
class FrozenLAMField(Generic[T_Field]):
"""FrozenLAMField class
a read-only proxy of a Field
"""
def __init__(self, inner_field: LAMField):
self._inner_field = inner_field
@property
def doc(self) -> str:
"""Returns Field's documentation (frozen)"""
return LAMdeepfreeze(self._inner_field.doc)
@property
def constraints(self) -> tuple[BaseConstraint]:
"""Returns Field's constraint (frozen)"""
return LAMdeepfreeze(self._inner_field.constraints)
@property
def default_value(self) -> Any:
"""Returns Field's default value (frozen)"""
return self._inner_field.default_value
@property
def value(self) -> Any:
"""Returns Field's value (frosen)"""
return self._inner_field.value
@property
def annotations(self) -> Any:
"""Returns Field's annotation (frozen)"""
return LAMdeepfreeze(self._inner_field.annotations)

View File

@@ -0,0 +1,62 @@
from typing import Generic, TypeVar, Optional, Any
from .LAMFieldInfo import LAMFieldInfo
from .Constraint import BaseConstraint
from ..tools import LAMdeepfreeze
T_Field = TypeVar("T_Field")
class LAMField(Generic[T_Field]):
"""This class describe a Field in Schema"""
def __init__(self, name: str, v: Optional[T_Field], a: Any, i: LAMFieldInfo):
self._name: str = name
self._source: Optional[type] = None
self._default_value: Optional[T_Field] = v
self._value: Optional[T_Field] = v
self._annotations: Any = a
self._info: LAMFieldInfo = i
self._constraints: list[BaseConstraint[Any]] = i.constraints
def add_source(self, s: type) -> None:
"""Adds source Appliance to the Field"""
self._source = s
@property
def doc(self) -> str:
"""Returns Field's documentation"""
return self._info.doc
def add_constraint(self, c: BaseConstraint) -> None:
"""Adds constraint to the Field"""
self._constraints.append(c)
@property
def constraints(self) -> list[BaseConstraint]:
"""Returns Field's constraint"""
return self._info.constraints
@property
def default_value(self) -> Any:
"""Returns Field's default value (frozen)"""
return LAMdeepfreeze(self._default_value)
def update_value(self, v: Optional[T_Field] = None) -> None:
"""Updates Field's value"""
self._value = v
@property
def value(self) -> Any:
"""Returns Field's value (frozen)"""
return LAMdeepfreeze(self._value)
@property
def raw_value(self) -> Optional[T_Field]:
"""Returns Field's value"""
return self._value
@property
def annotations(self) -> Any:
"""Returns Field's annotation"""
return self._annotations

View File

@@ -0,0 +1,26 @@
from typing import Optional, Any
from .Constraint import BaseConstraint
class LAMFieldInfo:
"""This Class allows to describe a Field in Appliance class"""
def __init__(
self, *, doc: str = "", constraints: Optional[list[BaseConstraint]] = None
):
self._doc: str = doc
self._constraints: list[BaseConstraint]
if constraints is None:
self._constraints = []
else:
self._constraints = constraints
@property
def doc(self) -> str:
"""Returns Field's documentation"""
return self._doc
@property
def constraints(self) -> list[BaseConstraint[Any]]:
"""Returns Field's constraints"""
return self._constraints

View File

View File

@@ -11,10 +11,16 @@ Main module __init__ file.
"""
from .__metadata__ import __version__, __Summuary__, __Name__
from .model import (
DABFieldInfo,
BaseAppliance,
BaseFeature,
from .LAMFields.LAMField import LAMField
from .LAMFields.LAMFieldInfo import LAMFieldInfo
from .LAMFields.FrozenLAMField import FrozenLAMField
from .appliance import Appliance
from .feature import Feature
from .exception import (
DABModelException,
MultipleInheritanceForbidden,
BrokenInheritance,
@@ -25,4 +31,10 @@ from .model import (
InvalidFieldValue,
InvalidFieldAnnotation,
IncompletelyAnnotatedField,
ImportForbidden,
FunctionForbidden,
InvalidFeatureInheritance,
FeatureNotBound,
)
__all__ = [name for name in globals() if not name.startswith("_")]

View File

@@ -15,20 +15,26 @@ import warnings
try: # pragma: no cover
__version__ = version("dabmodel")
except PackageNotFoundError: # pragma: no cover
warnings.warn("can not read __version__, assuming local test context, setting it to ?.?.?")
except PackageNotFoundError: # pragma: no cover
warnings.warn(
"can not read __version__, assuming local test context, setting it to ?.?.?"
)
__version__ = "?.?.?"
try: # pragma: no cover
dist = distribution("dabmodel")
__Summuary__ = dist.metadata["Summary"]
except PackageNotFoundError: # pragma: no cover
warnings.warn('can not read dist.metadata["Summary"], assuming local test context, setting it to <dabmodel description>')
warnings.warn(
'can not read dist.metadata["Summary"], assuming local test context, setting it to <dabmodel description>'
)
__Summuary__ = "dabmodel description"
try: # pragma: no cover
dist = distribution("dabmodel")
__Name__ = dist.metadata["Name"]
except PackageNotFoundError: # pragma: no cover
warnings.warn('can not read dist.metadata["Name"], assuming local test context, setting it to <dabmodel>')
warnings.warn(
'can not read dist.metadata["Name"], assuming local test context, setting it to <dabmodel>'
)
__Name__ = "dabmodel"

View File

@@ -0,0 +1,9 @@
from .element import Element
from .meta.appliance import _MetaAppliance
class Appliance(Element, metaclass=_MetaAppliance):
"""BaseFeature class
Base class for Appliance.
An appliance is a server configuration / image that is built using appliance's code and Fields.
"""

7
src/dabmodel/element.py Normal file
View File

@@ -0,0 +1,7 @@
from .meta.base import _MetaElement
class Element(metaclass=_MetaElement):
"""Element class
Base class to apply metaclass and set common Fields.
"""

103
src/dabmodel/exception.py Normal file
View File

@@ -0,0 +1,103 @@
class DABModelException(Exception):
"""DABModelException Exception class
Base Exception for DABModelException class
"""
class FunctionForbidden(DABModelException): ...
class ExternalCodeForbidden(FunctionForbidden): ...
class ClosureForbidden(FunctionForbidden): ...
class ReservedFieldName(Exception):
"""DABModelException Exception class
Base Exception for DABModelException class
"""
class MultipleInheritanceForbidden(DABModelException):
"""MultipleInheritanceForbidden Exception class
Multiple inheritance is forbidden when using dabmodel
"""
class BrokenInheritance(DABModelException):
"""BrokenInheritance Exception class
inheritance chain is broken
"""
class ReadOnlyField(DABModelException):
"""ReadOnlyField Exception class
The used Field is ReadOnly
"""
class NewFieldForbidden(DABModelException):
"""NewFieldForbidden Exception class
Field creation is forbidden
"""
class InvalidFieldAnnotation(DABModelException):
"""InvalidFieldAnnotation Exception class
The field annotation is invalid
"""
class InvalidInitializerType(DABModelException):
"""InvalidInitializerType Exception class
The initializer is not a valid type
"""
class NotAnnotatedField(InvalidFieldAnnotation):
"""NotAnnotatedField Exception class
The Field is not Annotated
"""
class IncompletelyAnnotatedField(InvalidFieldAnnotation):
"""IncompletelyAnnotatedField Exception class
The field annotation is incomplete
"""
class ReadOnlyFieldAnnotation(DABModelException):
"""ReadOnlyFieldAnnotation Exception class
Field annotation connot be modified
"""
class InvalidFieldValue(DABModelException):
"""InvalidFieldValue Exception class
The Field value is invalid
"""
class NonExistingField(DABModelException):
"""NonExistingField Exception class
The given Field is non existing
"""
class ImportForbidden(DABModelException):
"""ImportForbidden Exception class
Imports are forbidden
"""
class InvalidFeatureInheritance(DABModelException):
"""InvalidFeatureInheritance Exception class
Features of same name in child appliance need to be from same type
"""
class FeatureNotBound(DABModelException):
"""FeatureNotBound Exception class
a Feature must be bound to the appliance (or parent)
"""

12
src/dabmodel/feature.py Normal file
View File

@@ -0,0 +1,12 @@
from .element import Element
from .meta.feature import _MetaFeature
class Feature(Element, metaclass=_MetaFeature):
"""Feature class
Base class for Appliance's Features.
Features are optional traits of an appliance.
"""
_BoundAppliance: "Optional[Type[BaseAppliance]]" = None
Enabled: bool = False

View File

View File

@@ -0,0 +1,235 @@
from typing import Any, Type
from copy import copy
from typeguard import check_type, CollectionCheckStrategy, TypeCheckError
from ..tools import LAMdeepfreeze
from ..LAMFields.LAMField import LAMField
from ..LAMFields.FrozenLAMField import FrozenLAMField
from .base import _MetaElement
from ..feature import Feature
from ..exception import (
InvalidFieldValue,
InvalidFeatureInheritance,
FeatureNotBound,
)
class _MetaAppliance(_MetaElement):
"""_MetaAppliance class
Appliance specific metaclass code
"""
@classmethod
def check_class(
mcs: type["meta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
) -> None:
"""
Appliance-specific pre-check: ensure the `features` slot exists in schema.
Copies the parent's `features` mapping when inheriting to keep it per-class.
"""
super().check_class(name, bases, namespace, extensions) # type: ignore[misc]
if "features" not in namespace["__LAMSchema__"]:
namespace["__LAMSchema__"]["features"] = {}
else:
namespace["__LAMSchema__"]["features"] = copy(
namespace["__LAMSchema__"]["features"]
)
@classmethod
def process_class_fields(
mcs: type["meta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
extensions: dict[str, Any],
):
"""
Like meta.process_class_fields but also stages Feature declarations.
Initializes:
extensions["new_features"], extensions["modified_features"]
then defers to the base scanner for regular fields.
"""
extensions["new_features"] = {}
extensions["modified_features"] = {}
super().process_class_fields(name, bases, namespace, extensions) # type: ignore[misc]
@classmethod
def process_new_field(
mcs: type["meta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
_fname: str,
_fvalue: Any,
extensions: dict[str, Any],
): # pylint: disable=unused-argument
"""
Intercept Feature declarations.
- If `_fname` already exists in parent's `features`, enforce same type;
stage into `modified_features`.
- Else, if `_fvalue` is a Feature *class*, stage into `new_features`.
- Otherwise, it is a regular field: delegate to meta.process_new_field.
"""
if _fname in namespace["__LAMSchema__"]["features"].keys():
if not issubclass(_fvalue, namespace["__LAMSchema__"]["features"][_fname]):
raise InvalidFeatureInheritance(
f"Feature {_fname} is not an instance of {bases[0]}.{_fname}"
)
extensions["modified_features"][_fname] = _fvalue
elif isinstance(_fvalue, type) and issubclass(_fvalue, Feature):
extensions["new_features"][_fname] = _fvalue
else:
super().process_new_field(name, bases, namespace, _fname, _fvalue, extensions) # type: ignore[misc]
@classmethod
def commit_fields(
mcs: type["meta"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
):
"""
Commit regular fields (via meta) and then bind staged Feature classes.
For each new/modified feature:
- bind it to `cls` (sets the feature's `_BoundAppliance`),
- register it under `cls.__LAMSchema__["features"]`.
"""
super().commit_fields(cls, name, bases, namespace, extensions) # type: ignore[misc]
for _ftname, _ftvalue in extensions["modified_features"].items():
_ftvalue._BoundAppliance = cls # pylint: disable=protected-access
cls.__LAMSchema__["features"][_ftname] = _ftvalue
for _ftname, _ftvalue in extensions["new_features"].items():
_ftvalue._BoundAppliance = cls # pylint: disable=protected-access
cls.__LAMSchema__["features"][_ftname] = _ftvalue
def finalize_instance(cls: Type, obj, extensions: dict[str, Any]):
"""
Instantiate and attach all features declared (or overridden) in the instance schema.
Handles:
- Declared features (plain class)
- Subclass replacements
- Dict overrides (class + patch dict)
"""
for fname, fdef in obj.__LAMSchema__.get("features", {}).items():
# Case 1: plain class or subclass
if isinstance(fdef, type) and issubclass(fdef, Feature):
inst = fdef()
object.__setattr__(obj, fname, inst)
# Case 2: (class, dict) → dict overrides
elif isinstance(fdef, tuple) and len(fdef) == 2:
feat_cls, overrides = fdef
inst = feat_cls()
for field_name, new_val in overrides.items():
if field_name not in feat_cls.__LAMSchema__:
raise InvalidFieldValue(
f"Feature '{fname}' has no field '{field_name}'"
)
field = feat_cls.__LAMSchema__[field_name]
try:
check_type(
new_val,
field.annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Invalid value for {fname}.{field_name}: "
f"expected {field.annotations}, got {new_val!r}"
) from exp
object.__setattr__(inst, field_name, LAMdeepfreeze(new_val))
inst.__LAMSchema__[field_name] = FrozenLAMField(
LAMField(field_name, new_val, field.annotations, field._info)
)
object.__setattr__(obj, fname, inst)
else:
raise InvalidFieldValue(
f"Invalid feature definition stored for '{fname}': {fdef!r}"
)
def apply_overrides(cls, obj, extensions, *args, **kwargs):
"""
Support for runtime field and feature overrides.
Fields:
MyApp(name="foo")
Features:
MyApp(F1=MyF1) # inheritance / replacement
MyApp(F1={"val": 42, ...}) # dict override of existing feature
"""
# --- field overrides (unchanged) ---
for k, v in list(kwargs.items()):
if k in cls.__LAMSchema__: # regular field
field = cls.__LAMSchema__[k]
try:
check_type(
v,
field.annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Invalid value for field '{k}': expected {field.annotations}, got {v!r}"
) from exp
object.__setattr__(obj, k, LAMdeepfreeze(v))
obj.__LAMSchema__[k] = FrozenLAMField(
LAMField(k, v, field.annotations, field._info)
)
kwargs.pop(k)
# --- feature overrides ---
for k, v in list(kwargs.items()):
if k in cls.__LAMSchema__.get("features", {}):
base_feat_cls = cls.__LAMSchema__["features"][k]
# Case 1: subclass replacement (inheritance)
if isinstance(v, type) and issubclass(v, base_feat_cls):
bound = getattr(v, "_BoundAppliance", None)
if bound is None or not issubclass(cls, bound):
raise FeatureNotBound(
f"Feature {v.__name__} is not bound to {cls.__name__}"
)
# record subclass into instance schema
obj.__LAMSchema__["features"][k] = v
kwargs.pop(k)
# Case 2: dict override
elif isinstance(v, dict):
# store (class, override_dict) for finalize_instance
obj.__LAMSchema__["features"][k] = (base_feat_cls, v)
kwargs.pop(k)
else:
raise InvalidFieldValue(
f"Feature override for '{k}' must be a Feature subclass or dict, got {type(v)}"
)
# --- new features not declared at class level ---
for k, v in list(kwargs.items()):
if isinstance(v, type) and issubclass(v, Feature):
bound = getattr(v, "_BoundAppliance", None)
if bound is None or not issubclass(cls, bound):
raise FeatureNotBound(
f"Feature {v.__name__} is not bound to {cls.__name__}"
)
obj.__LAMSchema__["features"][k] = v
kwargs.pop(k)
if kwargs:
unknown = ", ".join(sorted(kwargs.keys()))
raise InvalidFieldValue(f"Unknown parameters: {unknown}")

749
src/dabmodel/meta/base.py Normal file
View File

@@ -0,0 +1,749 @@
from typing import (
Optional,
TypeVar,
Union,
get_origin,
get_args,
List,
Dict,
Any,
Tuple,
Set,
Annotated,
FrozenSet,
Callable,
Type,
)
from types import UnionType, FunctionType, SimpleNamespace
from copy import deepcopy, copy
import math
import inspect, ast, textwrap
from typeguard import check_type, TypeCheckError, CollectionCheckStrategy
from ..LAMFields.LAMField import LAMField
from ..LAMFields.LAMFieldInfo import LAMFieldInfo
from ..LAMFields.FrozenLAMField import FrozenLAMField
from ..exception import (
MultipleInheritanceForbidden,
BrokenInheritance,
ReadOnlyField,
NewFieldForbidden,
NotAnnotatedField,
ReadOnlyFieldAnnotation,
InvalidFieldValue,
InvalidFieldAnnotation,
IncompletelyAnnotatedField,
ImportForbidden,
FunctionForbidden,
NonExistingField,
InvalidInitializerType,
)
ALLOWED_ANNOTATIONS: dict[str, Any] = {
"Union": Union,
"Optional": Optional,
"List": List,
"Dict": Dict,
"Tuple": Tuple,
"Set": Set,
"FrozenSet": FrozenSet,
"Annotated": Annotated,
# builtins:
"int": int,
"str": str,
"float": float,
"bool": bool,
"complex": complex,
"bytes": bytes,
"None": type(None),
"list": list,
"dict": dict,
"set": set,
"frozenset": frozenset,
"tuple": tuple,
}
ALLOWED_MODEL_FIELDS_TYPES: tuple[type[Any], ...] = (
str,
int,
float,
complex,
bool,
bytes,
)
ALLOWED_HELPERS_MATH = SimpleNamespace(
sqrt=math.sqrt,
floor=math.floor,
ceil=math.ceil,
trunc=math.trunc,
fabs=math.fabs,
copysign=math.copysign,
hypot=math.hypot,
exp=math.exp,
log=math.log,
log10=math.log10,
sin=math.sin,
cos=math.cos,
tan=math.tan,
atan2=math.atan2,
radians=math.radians,
degrees=math.degrees,
)
ALLOWED_HELPERS_DEFAULT: dict[str, object] = {
"math": ALLOWED_HELPERS_MATH,
"print": print,
# Numbers & reducers (pure, deterministic)
"abs": abs,
"round": round,
"min": min,
"max": max,
"sum": sum,
# Introspection-free basics
"len": len,
"sorted": sorted,
# Basic constructors (for copy-on-write patterns)
"tuple": tuple,
"list": list,
"dict": dict,
"set": set,
# Simple casts if they need to normalize types
"int": int,
"float": float,
"str": str,
"bool": bool,
"bytes": bytes,
"complex": complex,
# Easy iteration helpers (optional but handy)
"range": range,
}
def _check_initializer_safety(func) -> None:
"""
Preliminary structural check for __initializer__.
Policy (minimal):
- Forbid 'import' / 'from ... import ...' inside the initializer body.
- Forbid nested function definitions (closures/helpers) in the body.
- Allow lambdas.
- No restrictions on calls here (keep it simple).
- Optionally forbid closures (free vars) for determinism.
"""
try:
src = inspect.getsource(func)
except OSError as exc:
# If source isn't available (rare), fail closed (or skip if you prefer)
raise FunctionForbidden("Cannot inspect __initializer__ source") from exc
src = textwrap.dedent(src)
mod = ast.parse(src)
# Find the FunctionDef node that corresponds to this initializer
init_node = None
for n in mod.body:
if (
isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef))
and n.name == func.__name__
):
init_node = n
break
if init_node is None:
# Fallback: if not found, analyze nothing further to avoid false positives
return
# Walk ONLY the body of the initializer (don't flag the def itself)
body_tree = ast.Module(body=init_node.body, type_ignores=[])
for node in ast.walk(body_tree):
# Forbid imports
if isinstance(node, (ast.Import, ast.ImportFrom)):
raise ImportForbidden("imports disabled in __initializer")
# Forbid nested defs (but allow lambdas)
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
raise FunctionForbidden(
"Nested function definitions are forbidden in __initializer"
)
if isinstance(node, ast.Lambda): # Forbid lambda
raise FunctionForbidden("Lambdas are forbidden in __initializer")
# Optional: forbid closures (keeps determinism; allows lambdas that don't capture)
if func.__code__.co_freevars:
raise FunctionForbidden("Closures are forbidden in __initializer__")
def _blocked_import(*args, **kwargs):
raise ImportForbidden("imports disabled in __initializer")
def _resolve_annotation(ann):
if isinstance(ann, str):
# Safe eval against a **whitelist** only
return eval( # pylint: disable=eval-used
ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS
)
return ann
def _peel_annotated(t: Any) -> Any:
# If you ever allow Annotated[T, ...], peel to T
while True:
origin = get_origin(t)
if origin is None:
return t
name = (
getattr(origin, "__name__", "")
or getattr(origin, "__qualname__", "")
or str(origin)
)
if "Annotated" in name:
args = get_args(t)
t = args[0] if args else t
else:
return t
def _check_annotation_definition( # pylint: disable=too-complex,too-many-return-statements
_type,
) -> bool:
_type = _peel_annotated(_type)
# handle Optional[] and Union[None,...]
if (get_origin(_type) is Union or get_origin(_type) is UnionType) and type(
None
) in get_args(_type):
return all(
_check_annotation_definition(_)
for _ in get_args(_type)
if _ is not type(None)
)
# handle other Union[...]
if get_origin(_type) is Union or get_origin(_type) is UnionType:
return all(_check_annotation_definition(_) for _ in get_args(_type))
# handle Dict[...]
if get_origin(_type) is dict:
inner = get_args(_type)
if len(inner) != 2:
raise IncompletelyAnnotatedField(
f"Dict Annotation requires 2 inner definitions: {_type}"
)
return _peel_annotated(
inner[0]
) in ALLOWED_MODEL_FIELDS_TYPES and _check_annotation_definition(inner[1])
# handle Tuple[]
if get_origin(_type) in [tuple]:
inner_types = get_args(_type)
if len(inner_types) == 0:
raise IncompletelyAnnotatedField(
f"Annotation requires inner definition: {_type}"
)
if len(inner_types) == 2 and inner_types[1] is Ellipsis:
return _check_annotation_definition(inner_types[0])
return all(_check_annotation_definition(_) for _ in inner_types)
# handle Set[],Tuple[],FrozenSet[],List[]
if get_origin(_type) in [set, frozenset, tuple, list]:
inner_types = get_args(_type)
if len(inner_types) == 0:
raise IncompletelyAnnotatedField(
f"Annotation requires inner definition: {_type}"
)
return all(_check_annotation_definition(_) for _ in inner_types)
if _type in ALLOWED_MODEL_FIELDS_TYPES:
return True
return False
class ModelSpecView:
"""ModelSpecView class
A class that will act as fake BaseElement proxy to allow setting values"""
__slots__ = ("_vals", "_types", "_touched", "_name", "_module")
def __init__(
self, values: dict[str, Any], types_map: dict[str, type], name: str, module: str
):
self._name: str
self._vals: dict[str, Any]
self._types: dict[str, type]
self._touched: set
self._module: str
object.__setattr__(self, "_vals", dict(values))
object.__setattr__(self, "_types", types_map)
object.__setattr__(self, "_name", name)
object.__setattr__(self, "_module", module)
@property
def __name__(self) -> str:
"""returns proxified class' name"""
return self._name
@property
def __module__(self) -> str:
"""returns proxified module's name"""
return self._module
@__module__.setter
def __module__(self, value: str):
pass
def __getattr__(self, name: str) -> Any:
"""internal proxy getattr"""
if name not in self._types:
raise AttributeError(f"Unknown field {name}")
return self._vals[name]
def __setattr__(self, name: str, value: Any):
"""internal proxy setattr"""
if name not in self._types:
raise NonExistingField(f"Cannot set unknown field {name}")
T = self._types[name]
try:
check_type(
value,
T,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Field <{name}> value is not of expected type {T}."
) from exp
self._vals[name] = value
def export(self) -> dict:
"""exports all proxified values"""
return dict(self._vals)
T_Meta = TypeVar("T_Meta", bound="_MetaElement")
T_BE = TypeVar("T_BE", bound="BaseElement")
class _MetaElement(type):
"""metaclass to use to build BaseElement"""
modified_fields: Dict[str, Any] = {}
new_fields: Dict[str, LAMField[Any]] = {}
initializer: Optional[Callable[..., Any]] = None
__LAMSchema__: dict[str, Any] = {}
@classmethod
def check_class(
mcs: type["_MetaElement"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
) -> None:
"""
Early class-build hook.
Validates the inheritance shape, initializes an empty schema for root classes,
copies the parent schema for subclasses, and ensures all annotated fields
have a default (inserting `None` when missing).
This runs before the class object is created.
"""
# print("__NEW__ Defining:", name, "with keys:", list(namespace))
if len(bases) > 1:
raise MultipleInheritanceForbidden(
"Multiple inheritance is not supported by dabmodel"
)
if len(bases) == 0: # base class (BaseElement)
namespace["__DABSchema__"] = {}
else: # standard inheritance
# check class tree origin
if "__DABSchema__" not in dir(bases[0]):
raise BrokenInheritance(
"__DABSchema__ not found in base class, broken inheritance chain."
)
# copy inherited schema
namespace["__DABSchema__"] = copy(bases[0].__LAMSchema__)
# force field without default value to be instantiated (with None)
if "__annotations__" in namespace:
for _funknown in [
_ for _ in namespace["__annotations__"] if _ not in namespace.keys()
]:
namespace[_funknown] = None
@classmethod
def process_class_fields( # pylint: disable=too-complex,too-many-branches
mcs: type["_MetaElement"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
extensions: dict[str, Any],
):
"""
Scan the class namespace and partition fields.
Detects:
- modified fields (shadowing parent values),
- new fields (present in annotations),
- the optional `__initializer` classmethod (in mangled or unmangled form).
Validates annotations and types and removes processed items from `namespace`
so they won't become normal attributes. Results are staged into:
mcs.new_fields, mcs.modified_fields, mcs.initializer
to be committed later.
"""
# iterating new and modified fields
mcs.modified_fields = {}
mcs.new_fields = {}
mcs.initializer = None
initializer_name: Optional[str] = None
for _fname, _fvalue in namespace.items():
if _fname == f"_{name}__initializer" or (
name.startswith("_") and _fname == "__initializer"
):
if not isinstance(_fvalue, classmethod):
raise InvalidInitializerType()
mcs.initializer = _fvalue.__func__
if name.startswith("_"):
initializer_name = "__initializer"
else:
initializer_name = f"_{name}__initializer"
elif _fname.startswith("_"):
pass
elif isinstance(_fvalue, classmethod):
pass
else:
print(f"Parsing Field: {_fname} / {_fvalue}")
if (
len(bases) == 1 and _fname in namespace["__DABSchema__"].keys()
): # Modified fields
mcs.process_modified_field(
name, bases, namespace, _fname, _fvalue, extensions
)
else: # New fieds
mcs.process_new_field(
name, bases, namespace, _fname, _fvalue, extensions
)
# removing modified fields from class (will add them back later)
for _fname in mcs.new_fields:
del namespace[_fname]
for _fname in mcs.modified_fields:
del namespace[_fname]
if mcs.initializer is not None and initializer_name is not None:
del namespace[initializer_name]
@classmethod
def process_modified_field(
mcs: type["_MetaElement"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
_fname: str,
_fvalue: Any,
extensions: dict[str, Any],
): # pylint: disable=unused-argument
"""
Handle a *modified* field declared by a subclass.
Forbids annotation changes, validates the new default value against
the inherited annotation, and stages the new default into `mcs.modified_fields`.
"""
if "__annotations__" in namespace and _fname in namespace["__annotations__"]:
raise ReadOnlyFieldAnnotation(
f"annotations cannot be modified on derived classes {_fname}"
)
try:
check_type(
_fvalue,
namespace["__DABSchema__"][_fname].annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Field <{_fname}> New Field value is not of expected type {bases[0].__annotations__[_fname]}."
) from exp
mcs.modified_fields[_fname] = _fvalue
@classmethod
def process_new_field(
mcs: type["_MetaElement"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
_fname: str,
_fvalue: Any,
extensions: dict[str, Any],
): # pylint: disable=unused-argument
"""
Handle a *new* field declared on the class.
Resolves string annotations against a whitelist, validates `Annotated[...]`
payloads (allowing only LAMFieldInfo), checks the default value type,
and stages the field as a `LAMField` in `mcs.new_fields`.
"""
# print(f"New field: {_fname}")
# check if field is annotated
if (
"__annotations__" not in namespace
or _fname not in namespace["__annotations__"]
):
raise NotAnnotatedField(
f"Every dabmodel Fields must be annotated ({_fname})"
)
# check if annotation is allowed
if isinstance(namespace["__annotations__"][_fname], str):
namespace["__annotations__"][_fname] = _resolve_annotation(
namespace["__annotations__"][_fname]
)
if not _check_annotation_definition(namespace["__annotations__"][_fname]):
raise InvalidFieldAnnotation(
f"Field <{_fname}> has not an allowed or valid annotation."
)
_finfo: LAMFieldInfo = LAMFieldInfo()
origin = get_origin(namespace["__annotations__"][_fname])
tname = (
getattr(origin, "__name__", "")
or getattr(origin, "__qualname__", "")
or str(origin)
)
if "Annotated" in tname:
args = get_args(namespace["__annotations__"][_fname])
if args:
if len(args) > 2:
raise InvalidFieldAnnotation(
f"Field <{_fname}> had invalid Annotated value."
)
if len(args) == 2 and not isinstance(args[1], LAMFieldInfo):
raise InvalidFieldAnnotation(
"Only LAMFieldInfo object is allowed as Annotated data."
)
_finfo = args[1]
# check if value is valid
try:
check_type(
_fvalue,
namespace["__annotations__"][_fname],
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
) from exp
mcs.new_fields[_fname] = LAMField(
_fname, _fvalue, namespace["__annotations__"][_fname], _finfo
)
@classmethod
def apply_initializer(
mcs: type["_MetaElement"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
):
"""
Apply the optional `__initializer` classmethod to compute derived defaults.
The initializer runs in a restricted, import-blocked environment using a
`ModelSpecView` proxy that enforces type checking on assignments.
On success, the computed values are validated and written back into the
class schema's DABFields.
"""
if mcs.initializer is not None:
_check_initializer_safety(mcs.initializer)
init_fieldvalues = {}
init_fieldtypes = {}
for _fname, _fvalue in cls.__LAMSchema__.items():
if isinstance(_fvalue, LAMField):
init_fieldvalues[_fname] = deepcopy(_fvalue.raw_value)
init_fieldtypes[_fname] = _fvalue.annotations
fakecls = ModelSpecView(
init_fieldvalues, init_fieldtypes, cls.__name__, cls.__module__
)
safe_globals = {
"__builtins__": {"__import__": _blocked_import},
**ALLOWED_HELPERS_DEFAULT,
}
if mcs.initializer.__code__.co_freevars:
raise FunctionForbidden("__initializer must not use closures")
safe_initializer = FunctionType(
mcs.initializer.__code__,
safe_globals,
name=mcs.initializer.__name__,
argdefs=mcs.initializer.__defaults__,
closure=None,
)
safe_initializer(fakecls) # pylint: disable=not-callable
for _fname, _fvalue in fakecls.export().items():
try:
check_type(
_fvalue,
cls.__LAMSchema__[_fname].annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
) from exp
cls.__LAMSchema__[_fname].update_value(_fvalue)
def __new__(
mcs: type["_MetaElement"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
) -> Type:
"""BaseElement new class"""
extensions: dict[str, Any] = {}
mcs.check_class(name, bases, namespace, extensions)
mcs.process_class_fields(name, bases, namespace, extensions)
_cls = super().__new__(mcs, name, bases, namespace)
mcs.commit_fields(_cls, name, bases, namespace, extensions)
mcs.apply_initializer(_cls, name, bases, namespace, extensions)
_cls.install_instance_guard(extensions)
return _cls
@classmethod
def commit_fields(
mcs: type["_MetaElement"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
):
"""
Commit staged fields into the class schema (`__DABSchema__`).
- For modified fields: copy the parent's LAMField, update its value.
- For new fields: set the freshly built LAMField and record its source.
"""
for _fname, _fvalue in mcs.modified_fields.items():
cls.__LAMSchema__[_fname] = deepcopy(bases[0].__LAMSchema__[_fname])
cls.__LAMSchema__[_fname].update_value(_fvalue)
for _fname, _fvalue in mcs.new_fields.items():
_fvalue.add_source(cls)
cls.__LAMSchema__[_fname] = _fvalue
def __call__(cls: Type, *args: Any, **kw: Any): # intentionally untyped
"""BaseElement new instance"""
obj = super().__call__(*args)
extensions: dict[str, Any] = {}
cls.populate_instance( # pylint: disable=no-value-for-parameter
obj, extensions, *args, **kw
)
cls.freeze_instance_schema( # pylint: disable=no-value-for-parameter
obj, extensions, *args, **kw
)
cls.apply_overrides( # pylint: disable=no-value-for-parameter
obj, extensions, *args, **kw
)
cls.finalize_instance(obj, extensions) # pylint: disable=no-value-for-parameter
return obj
def populate_instance(
cls: Type, obj: Any, extensions: dict[str, Any], *args: Any, **kw: Any
):
"""
Populate the new instance with field values from the class schema.
Copies each LAMField.value to an instance attribute (deep-frozen view).
"""
for _fname, _fvalue in cls.__LAMSchema__.items():
if isinstance(_fvalue, LAMField):
object.__setattr__(obj, _fname, _fvalue.value)
def freeze_instance_schema(
cls: Type, obj: Any, extensions: dict[str, Any], *args: Any, **kw: Any
):
"""
Freeze the instance's schema by wrapping DABFields into FrozenLAMField.
Creates a per-instance `__DABSchema__` dict where each field is read-only.
"""
inst_schema = copy(obj.__LAMSchema__)
for _fname, _fvalue in cls.__LAMSchema__.items():
if isinstance(_fvalue, LAMField):
inst_schema[_fname] = FrozenLAMField(_fvalue)
if "features" in inst_schema:
inst_schema["features"] = dict(inst_schema["features"])
object.__setattr__(obj, "__DABSchema__", inst_schema)
def apply_overrides(cls, obj, extensions, *args, **kwargs):
"""
Hook for runtime overrides at instance creation.
Invoked after the schema has been frozen but before finalize_instance.
Subclasses of _MetaElement can override this to support things like:
- Field overrides: MyApp(field=value)
- Feature overrides: MyApp(FeatureName=CustomFeature)
- Feature attachments: MyApp(NewFeature=FeatureClass)
By default this does nothing.
"""
def finalize_instance(cls: Type, obj: Any, extensions: dict[str, Any]):
"""
Finalization hook invoked at the end of instance construction.
Subclasses of the metaclass override this to attach runtime components
to the instance. (Example: BaseMetaAppliance instantiates bound Features
and sets them as attributes on the appliance instance.)
"""
def install_instance_guard(cls: Type, extensions: dict[str, Any]):
"""
Install the runtime `__setattr__` guard on the class.
After instances are constructed, prevents:
- creating new public fields,
- reassigning existing fields post-initialization.
Private/dunder attributes are exempt to allow internal bookkeeping.
"""
orig_setattr = getattr(cls, "__setattr__")
# cls.orig_setattr = orig_setattr
def guarded_setattr(_self, key: str, value: Any):
if key.startswith("_"): # allow private and dunder attrs
return orig_setattr(_self, key, value)
# block writes after init if key is readonly
if key in _self.__LAMSchema__.keys():
if key in _self.__dict__:
raise ReadOnlyField(f"{key} is read-only")
# elif key in _self.__DABSchema__["features"].keys():
# if key in _self.__dict__:
# raise ReadOnlyField(f"{key} is read-only")
else:
raise NewFieldForbidden("creating new fields is not allowed")
return orig_setattr(_self, key, value)
setattr(cls, "__setattr__", guarded_setattr)

View File

@@ -0,0 +1,19 @@
from typing import Type, Any
from .base import _MetaElement
from ..exception import FeatureNotBound
class _MetaFeature(_MetaElement):
"""_MetaFeature class
Feature specific metaclass code
"""
def __call__(cls: Type, *args: Any, **kw: Any): # intentionally untyped
"""BaseFeature new instance"""
if cls._BoundAppliance is None:
raise FeatureNotBound()
obj = super().__call__(*args, **kw)
return obj

View File

@@ -1,374 +0,0 @@
from typing import Optional, TypeVar, Generic, Union, get_origin, get_args, List, Dict, Any, Tuple, Set, Annotated, FrozenSet
from types import UnionType
from frozendict import deepfreeze
from copy import deepcopy, copy
from pprint import pprint
from typeguard import check_type, TypeCheckError, CollectionCheckStrategy
ALLOWED_ANNOTATIONS = {
"Union": Union,
"Optional": Optional,
"List": List,
"Dict": Dict,
"Tuple": Tuple,
"Set": Set,
"FrozenSet": FrozenSet,
"Annotated": Annotated,
# builtins:
"int": int,
"str": str,
"float": float,
"bool": bool,
"complex": complex,
"bytes": bytes,
"None": type(None),
"list": list,
"dict": dict,
"set": set,
"frozenset": frozenset,
"tuple": tuple,
}
ALLOWED_MODEL_FIELDS_TYPES = (str, int, float, complex, bool, bytes)
ALLOWED_MODEL_FIELDS_CONTAINERS = (dict, list, set, frozenset, tuple)
TV_ALLOWED_MODEL_FIELDS_TYPES = TypeVar("TV_ALLOWED_MODEL_FIELDS_TYPES", *ALLOWED_MODEL_FIELDS_TYPES, *ALLOWED_MODEL_FIELDS_CONTAINERS)
class DABModelException(Exception): ...
class MultipleInheritanceForbidden(DABModelException): ...
class BrokenInheritance(DABModelException): ...
class ReadOnlyField(DABModelException): ...
class NewFieldForbidden(DABModelException): ...
class InvalidFieldAnnotation(DABModelException): ...
class NotAnnotatedField(InvalidFieldAnnotation): ...
class IncompletelyAnnotatedField(InvalidFieldAnnotation): ...
class ReadOnlyFieldAnnotation(DABModelException): ...
class InvalidFieldValue(DABModelException): ...
def _resolve_annotation(ann):
if isinstance(ann, str):
# Safe eval against a **whitelist** only
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS)
return ann
def _peel_annotated(t: Any) -> Any:
# If you ever allow Annotated[T, ...], peel to T
while True:
origin = get_origin(t)
if origin is None:
return t
name = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
if "Annotated" in name:
args = get_args(t)
t = args[0] if args else t
else:
return t
def _check_annotation_definition(_type) -> bool:
_type = _peel_annotated(_type)
# handle Optional[] and Union[None,...]
if (get_origin(_type) is Union or get_origin(_type) is UnionType) and type(None) in get_args(_type):
return all([_check_annotation_definition(_) for _ in get_args(_type) if _ is not type(None)])
# handle other Union[...]
if get_origin(_type) is Union or get_origin(_type) is UnionType:
return all([_check_annotation_definition(_) for _ in get_args(_type)])
# handle Dict[...]
if get_origin(_type) is dict:
inner = get_args(_type)
if len(inner) != 2:
raise IncompletelyAnnotatedField(f"Dict Annotation requires 2 inner definitions: {_type}")
return _peel_annotated(inner[0]) in ALLOWED_MODEL_FIELDS_TYPES and _check_annotation_definition(inner[1])
# handle Tuple[]
if get_origin(_type) in [tuple]:
inner_types = get_args(_type)
if len(inner_types) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
if len(inner_types) == 2 and inner_types[1] is Ellipsis:
return _check_annotation_definition(inner_types[0])
return all([_check_annotation_definition(_) for _ in inner_types])
# handle Set[],Tuple[],FrozenSet[],List[]
if get_origin(_type) in [set, frozenset, tuple, list]:
inner_types = get_args(_type)
if len(inner_types) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
return all([_check_annotation_definition(_) for _ in inner_types])
if _type in ALLOWED_MODEL_FIELDS_TYPES:
return True
return False
class BaseConstraint(Generic[TV_ALLOWED_MODEL_FIELDS_TYPES]):
_bound_type: type
def __init__(self): ...
def check(self, value: TV_ALLOWED_MODEL_FIELDS_TYPES) -> bool: ...
def _deepfreeze(value):
if isinstance(value, dict):
return deepfreeze(value)
elif isinstance(value, set):
return frozenset(_deepfreeze(v) for v in value)
elif isinstance(value, list):
return tuple(_deepfreeze(v) for v in value)
elif isinstance(value, tuple):
return tuple(_deepfreeze(v) for v in value)
return value
class DABFieldInfo:
def __init__(self, *, doc: str = "", constraints: list[BaseConstraint] = []):
self._doc: str = doc
self._constraints: list[BaseConstraint] = constraints
@property
def doc(self):
return self._doc
@property
def constraints(self):
return self._constraints
class DABField(Generic[TV_ALLOWED_MODEL_FIELDS_TYPES]):
def __init__(self, name: str, v: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES], a: Any, i: str):
self._name: str = name
self._source: Optional[type] = None
self._default_value: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = v
self._value: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = v
self._annotations: Any = a
self._info: DABFieldInfo = i
self._constraints: List[BaseConstraint] = i.constraints
def add_source(self, s: type) -> None:
self._source = s
@property
def doc(self):
return self._info.doc
def add_constraint(self, c: BaseConstraint) -> None:
self._constraints.append(c)
@property
def constraints(self) -> list[BaseConstraint]:
return self._info.constraints
@property
def default_value(self):
return _deepfreeze(self._default_value)
def update_value(self, v: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = None) -> None:
self._value = v
@property
def value(self):
return _deepfreeze(self._value)
@property
def annotations(self) -> Any:
return self._annotations
class FrozenDABField(Generic[TV_ALLOWED_MODEL_FIELDS_TYPES]):
def __init__(self, inner_field: DABField):
self._inner_field = inner_field
@property
def doc(self):
return self._inner_field.doc
@property
def constraints(self):
return _deepfreeze(self._inner_field.constraints)
@property
def default_value(self):
return self._inner_field.default_value
@property
def value(self):
return self._inner_field.value
@property
def annotations(self) -> Any:
return _deepfreeze(self._inner_field.annotations)
class BaseMeta(type):
def __new__(mcls, name, bases, namespace):
# print("__NEW__ Defining:", name, "with keys:", list(namespace))
if len(bases) > 1:
raise MultipleInheritanceForbidden("Multiple inheritance is not supported by dabmodel")
elif len(bases) == 0: # base class (BaseElement)
namespace["__DABSchema__"] = dict()
else: # standard inheritance
# check class tree origin
if "__DABSchema__" not in dir(bases[0]):
raise BrokenInheritance("__DABSchema__ not found in base class, broken inheritance chain.")
# copy inherited schema
namespace["__DABSchema__"] = copy(bases[0].__DABSchema__)
# force field without default value to be instantiated (with None)
if "__annotations__" in namespace:
for _funknown in [_ for _ in namespace["__annotations__"] if _ not in namespace.keys()]:
namespace[_funknown] = None
# iterating new and modified fields
modified_field: Dict[str, Any] = {}
new_fields: Dict[str, DABField] = {}
for _fname, _fvalue in namespace.items():
if _fname.startswith("__"):
pass
elif _fname == "Constraints" and type(_fvalue) is type:
...
# print("FOUND Constraints")
else:
# print(f"Parsing Field: {_fname} / {_fvalue}")
# Modified fields
if len(bases) == 1 and _fname in namespace["__DABSchema__"].keys():
# print(f"Modified field: {_fname}")
if "__annotations__" in namespace and _fname in namespace["__annotations__"]:
raise ReadOnlyFieldAnnotation("annotations cannot be modified on derived classes")
try:
check_type(
_fvalue,
namespace["__DABSchema__"][_fname].annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Field <{_fname}> New Field value is not of expected type {bases[0].__annotations__[_fname]}."
) from exp
modified_field[_fname] = _fvalue
# New fieds
else:
# print(f"New field: {_fname}")
# print(f"type is: {type(_fvalue)}")
# print(f"value is: {_fvalue}")
# check if field is annotated
if "__annotations__" not in namespace or _fname not in namespace["__annotations__"]:
raise NotAnnotatedField(f"Every dabmodel Fields must be annotated ({_fname})")
# check if annotation is allowed
if isinstance(namespace["__annotations__"][_fname], str):
namespace["__annotations__"][_fname] = _resolve_annotation(namespace["__annotations__"][_fname])
if not _check_annotation_definition(namespace["__annotations__"][_fname]):
raise InvalidFieldAnnotation(f"Field <{_fname}> has not an allowed or valid annotation.")
_finfo: Optional[DABFieldInfo] = DABFieldInfo()
origin = get_origin(namespace["__annotations__"][_fname])
name = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
if "Annotated" in name:
args = get_args(namespace["__annotations__"][_fname])
if args:
if len(args) > 2:
raise InvalidFieldAnnotation(f"Field <{_fname}> had invalid Annotated value.")
if len(args) == 2 and not isinstance(args[1], DABFieldInfo):
raise InvalidFieldAnnotation(f"Only DABFieldInfo object is allowed as Annotated data.")
_finfo = args[1]
# print(f"annotation is: {namespace['__annotations__'][_fname]}")
# check if value is valid
try:
check_type(
_fvalue, namespace["__annotations__"][_fname], collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
) from exp
new_fields[_fname] = DABField(_fname, _fvalue, namespace["__annotations__"][_fname], _finfo)
# removing modified fields from class (will add them back later)
for _fname in new_fields.keys():
del namespace[_fname]
for _fname in modified_field.keys():
del namespace[_fname]
orig_setattr = namespace.get("__setattr__", object.__setattr__)
def guarded_setattr(self, key, value):
if key.startswith("_"): # allow private and dunder attrs
return orig_setattr(self, key, value)
# block writes after init if key is readonly
if key in self.__DABSchema__.keys():
if hasattr(self, key):
raise ReadOnlyField(f"{key} is read-only")
else:
raise NewFieldForbidden(f"creating new fields is not allowed")
return orig_setattr(self, key, value)
namespace["__setattr__"] = guarded_setattr
cls = super().__new__(mcls, name, bases, namespace)
for _fname, _fvalue in modified_field.items():
cls.__DABSchema__[_fname] = deepcopy(bases[0].__DABSchema__[_fname])
cls.__DABSchema__[_fname].update_value(_fvalue)
for _fname, _fvalue in new_fields.items():
_fvalue.add_source(cls)
cls.__DABSchema__[_fname] = _fvalue
return cls
def __call__(cls, *args, **kw):
obj = super().__call__(*args, **kw)
for _fname in cls.__DABSchema__.keys():
setattr(obj, _fname, cls.__DABSchema__[_fname].value)
# obj.__DABSchema__ = deepfreeze(obj.__DABSchema__)
# setattr(obj, "__DABSchema__", deepfreeze(obj.__DABSchema__))
inst_schema = dict()
for _fname, _fvalue in cls.__DABSchema__.items():
inst_schema[_fname] = FrozenDABField(_fvalue)
setattr(obj, "__DABSchema__", inst_schema)
return obj
class BaseElement(metaclass=BaseMeta):
pass
class BaseFeature(BaseElement):
pass
class BaseAppliance(BaseElement):
pass

View File

@@ -1,13 +1,32 @@
"""library's internal tools"""
from uuid import UUID
from datetime import datetime
import json
from frozendict import deepfreeze
class DABJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, UUID):
# if the obj is uuid, we simply return the value of uuid
return obj.hex
elif isinstance(obj, datetime):
return str(obj)
return json.JSONEncoder.default(self, obj)
class LAMJSONEncoder(json.JSONEncoder):
"""allows to JSON encode non supported data type"""
def default(self, o):
if isinstance(o, UUID):
# if the o is uuid, we simply return the value of uuid
return o.hex
if isinstance(o, datetime):
return str(o)
return json.JSONEncoder.default(self, o)
def LAMdeepfreeze(value):
"""recursive freeze helper function"""
if isinstance(value, dict):
return deepfreeze(value)
if isinstance(value, set):
return frozenset(LAMdeepfreeze(v) for v in value)
if isinstance(value, list):
return tuple(LAMdeepfreeze(v) for v in value)
if isinstance(value, tuple):
return tuple(LAMdeepfreeze(v) for v in value)
return value

File diff suppressed because it is too large Load Diff