Compare commits
25 Commits
0.0.1.post
...
0.0.1.post
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cd69fc22a8 | ||
|
|
9aec2d66cd | ||
|
|
af81ec5fd3 | ||
|
|
26e32a004f | ||
|
|
b7cbc50f79 | ||
|
|
86eee2e378 | ||
|
|
3e0defc574 | ||
|
|
f6e581381d | ||
|
|
981c5201a9 | ||
|
|
ab11052c8f | ||
|
|
4f5dade949 | ||
|
|
cce260bc5e | ||
|
|
915a4332ee | ||
|
|
4dca3eb9d1 | ||
|
|
e11c541139 | ||
|
|
637b50b325 | ||
|
|
f45c9cc8f3 | ||
|
|
95b0c298ce | ||
|
|
04a4cf7b36 | ||
|
|
f42a839cff | ||
|
|
7f3a4ef545 | ||
|
|
608c8a1010 | ||
|
|
210781f086 | ||
|
|
df966ccac4 | ||
|
|
87682c2c9c |
BIN
dabmodel.zip
Normal file
BIN
dabmodel.zip
Normal file
Binary file not shown.
@@ -35,8 +35,8 @@ classifiers = [
|
||||
dependencies = [
|
||||
'importlib-metadata; python_version<"3.9"',
|
||||
'packaging',
|
||||
'pydantic',
|
||||
'runtype'
|
||||
'frozendict',
|
||||
'typeguard'
|
||||
]
|
||||
dynamic = ["version"]
|
||||
|
||||
|
||||
17
src/dabmodel/LAMFields/Constraint.py
Normal file
17
src/dabmodel/LAMFields/Constraint.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from typing import Generic, TypeVar
|
||||
|
||||
T_Field = TypeVar("T_Field")
|
||||
|
||||
|
||||
class BaseConstraint(Generic[T_Field]):
|
||||
"""BaseConstraint class
|
||||
Base class for Field's constraints
|
||||
"""
|
||||
|
||||
_bound_type: type
|
||||
|
||||
def __init__(self): ...
|
||||
|
||||
def check(self, value: T_Field) -> bool:
|
||||
"""Check if a Constraint is completed"""
|
||||
return True
|
||||
41
src/dabmodel/LAMFields/FrozenLAMField.py
Normal file
41
src/dabmodel/LAMFields/FrozenLAMField.py
Normal file
@@ -0,0 +1,41 @@
|
||||
from typing import Generic, TypeVar, Any
|
||||
|
||||
from .LAMField import LAMField
|
||||
from .Constraint import BaseConstraint
|
||||
from ..tools import LAMdeepfreeze
|
||||
|
||||
T_Field = TypeVar("T_Field")
|
||||
|
||||
|
||||
class FrozenLAMField(Generic[T_Field]):
|
||||
"""FrozenLAMField class
|
||||
a read-only proxy of a Field
|
||||
"""
|
||||
|
||||
def __init__(self, inner_field: LAMField):
|
||||
self._inner_field = inner_field
|
||||
|
||||
@property
|
||||
def doc(self) -> str:
|
||||
"""Returns Field's documentation (frozen)"""
|
||||
return LAMdeepfreeze(self._inner_field.doc)
|
||||
|
||||
@property
|
||||
def constraints(self) -> tuple[BaseConstraint]:
|
||||
"""Returns Field's constraint (frozen)"""
|
||||
return LAMdeepfreeze(self._inner_field.constraints)
|
||||
|
||||
@property
|
||||
def default_value(self) -> Any:
|
||||
"""Returns Field's default value (frozen)"""
|
||||
return self._inner_field.default_value
|
||||
|
||||
@property
|
||||
def value(self) -> Any:
|
||||
"""Returns Field's value (frosen)"""
|
||||
return self._inner_field.value
|
||||
|
||||
@property
|
||||
def annotations(self) -> Any:
|
||||
"""Returns Field's annotation (frozen)"""
|
||||
return LAMdeepfreeze(self._inner_field.annotations)
|
||||
62
src/dabmodel/LAMFields/LAMField.py
Normal file
62
src/dabmodel/LAMFields/LAMField.py
Normal file
@@ -0,0 +1,62 @@
|
||||
from typing import Generic, TypeVar, Optional, Any
|
||||
|
||||
from .LAMFieldInfo import LAMFieldInfo
|
||||
from .Constraint import BaseConstraint
|
||||
from ..tools import LAMdeepfreeze
|
||||
|
||||
T_Field = TypeVar("T_Field")
|
||||
|
||||
|
||||
class LAMField(Generic[T_Field]):
|
||||
"""This class describe a Field in Schema"""
|
||||
|
||||
def __init__(self, name: str, v: Optional[T_Field], a: Any, i: LAMFieldInfo):
|
||||
self._name: str = name
|
||||
self._source: Optional[type] = None
|
||||
self._default_value: Optional[T_Field] = v
|
||||
self._value: Optional[T_Field] = v
|
||||
self._annotations: Any = a
|
||||
self._info: LAMFieldInfo = i
|
||||
self._constraints: list[BaseConstraint[Any]] = i.constraints
|
||||
|
||||
def add_source(self, s: type) -> None:
|
||||
"""Adds source Appliance to the Field"""
|
||||
self._source = s
|
||||
|
||||
@property
|
||||
def doc(self) -> str:
|
||||
"""Returns Field's documentation"""
|
||||
return self._info.doc
|
||||
|
||||
def add_constraint(self, c: BaseConstraint) -> None:
|
||||
"""Adds constraint to the Field"""
|
||||
self._constraints.append(c)
|
||||
|
||||
@property
|
||||
def constraints(self) -> list[BaseConstraint]:
|
||||
"""Returns Field's constraint"""
|
||||
return self._info.constraints
|
||||
|
||||
@property
|
||||
def default_value(self) -> Any:
|
||||
"""Returns Field's default value (frozen)"""
|
||||
return LAMdeepfreeze(self._default_value)
|
||||
|
||||
def update_value(self, v: Optional[T_Field] = None) -> None:
|
||||
"""Updates Field's value"""
|
||||
self._value = v
|
||||
|
||||
@property
|
||||
def value(self) -> Any:
|
||||
"""Returns Field's value (frozen)"""
|
||||
return LAMdeepfreeze(self._value)
|
||||
|
||||
@property
|
||||
def raw_value(self) -> Optional[T_Field]:
|
||||
"""Returns Field's value"""
|
||||
return self._value
|
||||
|
||||
@property
|
||||
def annotations(self) -> Any:
|
||||
"""Returns Field's annotation"""
|
||||
return self._annotations
|
||||
26
src/dabmodel/LAMFields/LAMFieldInfo.py
Normal file
26
src/dabmodel/LAMFields/LAMFieldInfo.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from typing import Optional, Any
|
||||
from .Constraint import BaseConstraint
|
||||
|
||||
|
||||
class LAMFieldInfo:
|
||||
"""This Class allows to describe a Field in Appliance class"""
|
||||
|
||||
def __init__(
|
||||
self, *, doc: str = "", constraints: Optional[list[BaseConstraint]] = None
|
||||
):
|
||||
self._doc: str = doc
|
||||
self._constraints: list[BaseConstraint]
|
||||
if constraints is None:
|
||||
self._constraints = []
|
||||
else:
|
||||
self._constraints = constraints
|
||||
|
||||
@property
|
||||
def doc(self) -> str:
|
||||
"""Returns Field's documentation"""
|
||||
return self._doc
|
||||
|
||||
@property
|
||||
def constraints(self) -> list[BaseConstraint[Any]]:
|
||||
"""Returns Field's constraints"""
|
||||
return self._constraints
|
||||
@@ -11,4 +11,28 @@ Main module __init__ file.
|
||||
"""
|
||||
|
||||
from .__metadata__ import __version__, __Summuary__, __Name__
|
||||
from .model import BaseFeature, BaseAppliance
|
||||
|
||||
|
||||
from .LAMFields.LAMField import LAMField
|
||||
from .LAMFields.LAMFieldInfo import LAMFieldInfo
|
||||
from .LAMFields.FrozenLAMField import FrozenLAMField
|
||||
from .appliance import Appliance
|
||||
from .feature import Feature
|
||||
|
||||
|
||||
from .exception import (
|
||||
DABModelException,
|
||||
MultipleInheritanceForbidden,
|
||||
BrokenInheritance,
|
||||
ReadOnlyField,
|
||||
NewFieldForbidden,
|
||||
NotAnnotatedField,
|
||||
ReadOnlyFieldAnnotation,
|
||||
InvalidFieldValue,
|
||||
InvalidFieldAnnotation,
|
||||
IncompletelyAnnotatedField,
|
||||
ImportForbidden,
|
||||
FunctionForbidden,
|
||||
InvalidFeatureInheritance,
|
||||
FeatureNotBound,
|
||||
)
|
||||
|
||||
@@ -15,20 +15,26 @@ import warnings
|
||||
|
||||
try: # pragma: no cover
|
||||
__version__ = version("dabmodel")
|
||||
except PackageNotFoundError: # pragma: no cover
|
||||
warnings.warn("can not read __version__, assuming local test context, setting it to ?.?.?")
|
||||
except PackageNotFoundError: # pragma: no cover
|
||||
warnings.warn(
|
||||
"can not read __version__, assuming local test context, setting it to ?.?.?"
|
||||
)
|
||||
__version__ = "?.?.?"
|
||||
|
||||
try: # pragma: no cover
|
||||
dist = distribution("dabmodel")
|
||||
__Summuary__ = dist.metadata["Summary"]
|
||||
except PackageNotFoundError: # pragma: no cover
|
||||
warnings.warn('can not read dist.metadata["Summary"], assuming local test context, setting it to <dabmodel description>')
|
||||
warnings.warn(
|
||||
'can not read dist.metadata["Summary"], assuming local test context, setting it to <dabmodel description>'
|
||||
)
|
||||
__Summuary__ = "dabmodel description"
|
||||
|
||||
try: # pragma: no cover
|
||||
dist = distribution("dabmodel")
|
||||
__Name__ = dist.metadata["Name"]
|
||||
except PackageNotFoundError: # pragma: no cover
|
||||
warnings.warn('can not read dist.metadata["Name"], assuming local test context, setting it to <dabmodel>')
|
||||
warnings.warn(
|
||||
'can not read dist.metadata["Name"], assuming local test context, setting it to <dabmodel>'
|
||||
)
|
||||
__Name__ = "dabmodel"
|
||||
|
||||
9
src/dabmodel/appliance.py
Normal file
9
src/dabmodel/appliance.py
Normal file
@@ -0,0 +1,9 @@
|
||||
from .element import Element
|
||||
from .meta.appliance import _MetaAppliance
|
||||
|
||||
|
||||
class Appliance(Element, metaclass=_MetaAppliance):
|
||||
"""BaseFeature class
|
||||
Base class for Appliance.
|
||||
An appliance is a server configuration / image that is built using appliance's code and Fields.
|
||||
"""
|
||||
7
src/dabmodel/element.py
Normal file
7
src/dabmodel/element.py
Normal file
@@ -0,0 +1,7 @@
|
||||
from .meta.base import _MetaElement
|
||||
|
||||
|
||||
class Element(metaclass=_MetaElement):
|
||||
"""Element class
|
||||
Base class to apply metaclass and set common Fields.
|
||||
"""
|
||||
103
src/dabmodel/exception.py
Normal file
103
src/dabmodel/exception.py
Normal file
@@ -0,0 +1,103 @@
|
||||
class DABModelException(Exception):
|
||||
"""DABModelException Exception class
|
||||
Base Exception for DABModelException class
|
||||
"""
|
||||
|
||||
|
||||
class FunctionForbidden(DABModelException): ...
|
||||
|
||||
|
||||
class ExternalCodeForbidden(FunctionForbidden): ...
|
||||
|
||||
|
||||
class ClosureForbidden(FunctionForbidden): ...
|
||||
|
||||
|
||||
class ReservedFieldName(Exception):
|
||||
"""DABModelException Exception class
|
||||
Base Exception for DABModelException class
|
||||
"""
|
||||
|
||||
|
||||
class MultipleInheritanceForbidden(DABModelException):
|
||||
"""MultipleInheritanceForbidden Exception class
|
||||
Multiple inheritance is forbidden when using dabmodel
|
||||
"""
|
||||
|
||||
|
||||
class BrokenInheritance(DABModelException):
|
||||
"""BrokenInheritance Exception class
|
||||
inheritance chain is broken
|
||||
"""
|
||||
|
||||
|
||||
class ReadOnlyField(DABModelException):
|
||||
"""ReadOnlyField Exception class
|
||||
The used Field is ReadOnly
|
||||
"""
|
||||
|
||||
|
||||
class NewFieldForbidden(DABModelException):
|
||||
"""NewFieldForbidden Exception class
|
||||
Field creation is forbidden
|
||||
"""
|
||||
|
||||
|
||||
class InvalidFieldAnnotation(DABModelException):
|
||||
"""InvalidFieldAnnotation Exception class
|
||||
The field annotation is invalid
|
||||
"""
|
||||
|
||||
|
||||
class InvalidInitializerType(DABModelException):
|
||||
"""InvalidInitializerType Exception class
|
||||
The initializer is not a valid type
|
||||
"""
|
||||
|
||||
|
||||
class NotAnnotatedField(InvalidFieldAnnotation):
|
||||
"""NotAnnotatedField Exception class
|
||||
The Field is not Annotated
|
||||
"""
|
||||
|
||||
|
||||
class IncompletelyAnnotatedField(InvalidFieldAnnotation):
|
||||
"""IncompletelyAnnotatedField Exception class
|
||||
The field annotation is incomplete
|
||||
"""
|
||||
|
||||
|
||||
class ReadOnlyFieldAnnotation(DABModelException):
|
||||
"""ReadOnlyFieldAnnotation Exception class
|
||||
Field annotation connot be modified
|
||||
"""
|
||||
|
||||
|
||||
class InvalidFieldValue(DABModelException):
|
||||
"""InvalidFieldValue Exception class
|
||||
The Field value is invalid
|
||||
"""
|
||||
|
||||
|
||||
class NonExistingField(DABModelException):
|
||||
"""NonExistingField Exception class
|
||||
The given Field is non existing
|
||||
"""
|
||||
|
||||
|
||||
class ImportForbidden(DABModelException):
|
||||
"""ImportForbidden Exception class
|
||||
Imports are forbidden
|
||||
"""
|
||||
|
||||
|
||||
class InvalidFeatureInheritance(DABModelException):
|
||||
"""InvalidFeatureInheritance Exception class
|
||||
Features of same name in child appliance need to be from same type
|
||||
"""
|
||||
|
||||
|
||||
class FeatureNotBound(DABModelException):
|
||||
"""FeatureNotBound Exception class
|
||||
a Feature must be bound to the appliance (or parent)
|
||||
"""
|
||||
12
src/dabmodel/feature.py
Normal file
12
src/dabmodel/feature.py
Normal file
@@ -0,0 +1,12 @@
|
||||
from .element import Element
|
||||
from .meta.feature import _MetaFeature
|
||||
|
||||
|
||||
class Feature(Element, metaclass=_MetaFeature):
|
||||
"""Feature class
|
||||
Base class for Appliance's Features.
|
||||
Features are optional traits of an appliance.
|
||||
"""
|
||||
|
||||
_BoundAppliance: "Optional[Type[BaseAppliance]]" = None
|
||||
Enabled: bool = False
|
||||
235
src/dabmodel/meta/appliance.py
Normal file
235
src/dabmodel/meta/appliance.py
Normal file
@@ -0,0 +1,235 @@
|
||||
from typing import Any, Type
|
||||
from copy import copy
|
||||
|
||||
from typeguard import check_type, CollectionCheckStrategy, TypeCheckError
|
||||
|
||||
from ..tools import LAMdeepfreeze
|
||||
from ..LAMFields.LAMField import LAMField
|
||||
from ..LAMFields.FrozenLAMField import FrozenLAMField
|
||||
from .base import _MetaElement
|
||||
from ..feature import Feature
|
||||
from ..exception import (
|
||||
InvalidFieldValue,
|
||||
InvalidFeatureInheritance,
|
||||
FeatureNotBound,
|
||||
)
|
||||
|
||||
|
||||
class _MetaAppliance(_MetaElement):
|
||||
"""_MetaAppliance class
|
||||
Appliance specific metaclass code
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def check_class(
|
||||
mcs: type["meta"],
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any], # pylint: disable=unused-argument
|
||||
extensions: dict[str, Any],
|
||||
) -> None:
|
||||
"""
|
||||
Appliance-specific pre-check: ensure the `features` slot exists in schema.
|
||||
|
||||
Copies the parent's `features` mapping when inheriting to keep it per-class.
|
||||
"""
|
||||
super().check_class(name, bases, namespace, extensions) # type: ignore[misc]
|
||||
if "features" not in namespace["__DABSchema__"]:
|
||||
namespace["__DABSchema__"]["features"] = {}
|
||||
else:
|
||||
namespace["__DABSchema__"]["features"] = copy(
|
||||
namespace["__DABSchema__"]["features"]
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def process_class_fields(
|
||||
mcs: type["meta"],
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any],
|
||||
extensions: dict[str, Any],
|
||||
):
|
||||
"""
|
||||
Like meta.process_class_fields but also stages Feature declarations.
|
||||
|
||||
Initializes:
|
||||
extensions["new_features"], extensions["modified_features"]
|
||||
then defers to the base scanner for regular fields.
|
||||
"""
|
||||
extensions["new_features"] = {}
|
||||
extensions["modified_features"] = {}
|
||||
super().process_class_fields(name, bases, namespace, extensions) # type: ignore[misc]
|
||||
|
||||
@classmethod
|
||||
def process_new_field(
|
||||
mcs: type["meta"],
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any],
|
||||
_fname: str,
|
||||
_fvalue: Any,
|
||||
extensions: dict[str, Any],
|
||||
): # pylint: disable=unused-argument
|
||||
"""
|
||||
Intercept Feature declarations.
|
||||
|
||||
- If `_fname` already exists in parent's `features`, enforce same type;
|
||||
stage into `modified_features`.
|
||||
- Else, if `_fvalue` is a Feature *class*, stage into `new_features`.
|
||||
- Otherwise, it is a regular field: delegate to meta.process_new_field.
|
||||
"""
|
||||
if _fname in namespace["__DABSchema__"]["features"].keys():
|
||||
if not issubclass(_fvalue, namespace["__DABSchema__"]["features"][_fname]):
|
||||
raise InvalidFeatureInheritance(
|
||||
f"Feature {_fname} is not an instance of {bases[0]}.{_fname}"
|
||||
)
|
||||
extensions["modified_features"][_fname] = _fvalue
|
||||
elif isinstance(_fvalue, type) and issubclass(_fvalue, Feature):
|
||||
extensions["new_features"][_fname] = _fvalue
|
||||
else:
|
||||
super().process_new_field(name, bases, namespace, _fname, _fvalue, extensions) # type: ignore[misc]
|
||||
|
||||
@classmethod
|
||||
def commit_fields(
|
||||
mcs: type["meta"],
|
||||
cls,
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any], # pylint: disable=unused-argument
|
||||
extensions: dict[str, Any],
|
||||
):
|
||||
"""
|
||||
Commit regular fields (via meta) and then bind staged Feature classes.
|
||||
|
||||
For each new/modified feature:
|
||||
- bind it to `cls` (sets the feature's `_BoundAppliance`),
|
||||
- register it under `cls.__DABSchema__["features"]`.
|
||||
"""
|
||||
super().commit_fields(cls, name, bases, namespace, extensions) # type: ignore[misc]
|
||||
|
||||
for _ftname, _ftvalue in extensions["modified_features"].items():
|
||||
_ftvalue._BoundAppliance = cls # pylint: disable=protected-access
|
||||
cls.__DABSchema__["features"][_ftname] = _ftvalue
|
||||
for _ftname, _ftvalue in extensions["new_features"].items():
|
||||
_ftvalue._BoundAppliance = cls # pylint: disable=protected-access
|
||||
cls.__DABSchema__["features"][_ftname] = _ftvalue
|
||||
|
||||
def finalize_instance(cls: Type, obj, extensions: dict[str, Any]):
|
||||
"""
|
||||
Instantiate and attach all features declared (or overridden) in the instance schema.
|
||||
Handles:
|
||||
- Declared features (plain class)
|
||||
- Subclass replacements
|
||||
- Dict overrides (class + patch dict)
|
||||
"""
|
||||
for fname, fdef in obj.__DABSchema__.get("features", {}).items():
|
||||
# Case 1: plain class or subclass
|
||||
if isinstance(fdef, type) and issubclass(fdef, Feature):
|
||||
inst = fdef()
|
||||
object.__setattr__(obj, fname, inst)
|
||||
|
||||
# Case 2: (class, dict) → dict overrides
|
||||
elif isinstance(fdef, tuple) and len(fdef) == 2:
|
||||
feat_cls, overrides = fdef
|
||||
inst = feat_cls()
|
||||
for field_name, new_val in overrides.items():
|
||||
if field_name not in feat_cls.__DABSchema__:
|
||||
raise InvalidFieldValue(
|
||||
f"Feature '{fname}' has no field '{field_name}'"
|
||||
)
|
||||
field = feat_cls.__DABSchema__[field_name]
|
||||
try:
|
||||
check_type(
|
||||
new_val,
|
||||
field.annotations,
|
||||
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
|
||||
)
|
||||
except TypeCheckError as exp:
|
||||
raise InvalidFieldValue(
|
||||
f"Invalid value for {fname}.{field_name}: "
|
||||
f"expected {field.annotations}, got {new_val!r}"
|
||||
) from exp
|
||||
object.__setattr__(inst, field_name, LAMdeepfreeze(new_val))
|
||||
inst.__DABSchema__[field_name] = FrozenLAMField(
|
||||
LAMField(field_name, new_val, field.annotations, field._info)
|
||||
)
|
||||
object.__setattr__(obj, fname, inst)
|
||||
|
||||
else:
|
||||
raise InvalidFieldValue(
|
||||
f"Invalid feature definition stored for '{fname}': {fdef!r}"
|
||||
)
|
||||
|
||||
def apply_overrides(cls, obj, extensions, *args, **kwargs):
|
||||
"""
|
||||
Support for runtime field and feature overrides.
|
||||
|
||||
Fields:
|
||||
MyApp(name="foo")
|
||||
|
||||
Features:
|
||||
MyApp(F1=MyF1) # inheritance / replacement
|
||||
MyApp(F1={"val": 42, ...}) # dict override of existing feature
|
||||
"""
|
||||
# --- field overrides (unchanged) ---
|
||||
for k, v in list(kwargs.items()):
|
||||
if k in cls.__DABSchema__: # regular field
|
||||
field = cls.__DABSchema__[k]
|
||||
try:
|
||||
check_type(
|
||||
v,
|
||||
field.annotations,
|
||||
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
|
||||
)
|
||||
except TypeCheckError as exp:
|
||||
raise InvalidFieldValue(
|
||||
f"Invalid value for field '{k}': expected {field.annotations}, got {v!r}"
|
||||
) from exp
|
||||
|
||||
object.__setattr__(obj, k, LAMdeepfreeze(v))
|
||||
obj.__DABSchema__[k] = FrozenLAMField(
|
||||
LAMField(k, v, field.annotations, field._info)
|
||||
)
|
||||
kwargs.pop(k)
|
||||
|
||||
# --- feature overrides ---
|
||||
for k, v in list(kwargs.items()):
|
||||
if k in cls.__DABSchema__.get("features", {}):
|
||||
base_feat_cls = cls.__DABSchema__["features"][k]
|
||||
|
||||
# Case 1: subclass replacement (inheritance)
|
||||
if isinstance(v, type) and issubclass(v, base_feat_cls):
|
||||
bound = getattr(v, "_BoundAppliance", None)
|
||||
if bound is None or not issubclass(cls, bound):
|
||||
raise FeatureNotBound(
|
||||
f"Feature {v.__name__} is not bound to {cls.__name__}"
|
||||
)
|
||||
# record subclass into instance schema
|
||||
obj.__DABSchema__["features"][k] = v
|
||||
kwargs.pop(k)
|
||||
|
||||
# Case 2: dict override
|
||||
elif isinstance(v, dict):
|
||||
# store (class, override_dict) for finalize_instance
|
||||
obj.__DABSchema__["features"][k] = (base_feat_cls, v)
|
||||
kwargs.pop(k)
|
||||
|
||||
else:
|
||||
raise InvalidFieldValue(
|
||||
f"Feature override for '{k}' must be a Feature subclass or dict, got {type(v)}"
|
||||
)
|
||||
|
||||
# --- new features not declared at class level ---
|
||||
for k, v in list(kwargs.items()):
|
||||
if isinstance(v, type) and issubclass(v, Feature):
|
||||
bound = getattr(v, "_BoundAppliance", None)
|
||||
if bound is None or not issubclass(cls, bound):
|
||||
raise FeatureNotBound(
|
||||
f"Feature {v.__name__} is not bound to {cls.__name__}"
|
||||
)
|
||||
obj.__DABSchema__["features"][k] = v
|
||||
kwargs.pop(k)
|
||||
|
||||
if kwargs:
|
||||
unknown = ", ".join(sorted(kwargs.keys()))
|
||||
raise InvalidFieldValue(f"Unknown parameters: {unknown}")
|
||||
749
src/dabmodel/meta/base.py
Normal file
749
src/dabmodel/meta/base.py
Normal file
@@ -0,0 +1,749 @@
|
||||
from typing import (
|
||||
Optional,
|
||||
TypeVar,
|
||||
Union,
|
||||
get_origin,
|
||||
get_args,
|
||||
List,
|
||||
Dict,
|
||||
Any,
|
||||
Tuple,
|
||||
Set,
|
||||
Annotated,
|
||||
FrozenSet,
|
||||
Callable,
|
||||
Type,
|
||||
)
|
||||
|
||||
from types import UnionType, FunctionType, SimpleNamespace
|
||||
from copy import deepcopy, copy
|
||||
|
||||
import math
|
||||
import inspect, ast, textwrap
|
||||
|
||||
from typeguard import check_type, TypeCheckError, CollectionCheckStrategy
|
||||
|
||||
from ..LAMFields.LAMField import LAMField
|
||||
from ..LAMFields.LAMFieldInfo import LAMFieldInfo
|
||||
from ..LAMFields.FrozenLAMField import FrozenLAMField
|
||||
|
||||
from ..exception import (
|
||||
MultipleInheritanceForbidden,
|
||||
BrokenInheritance,
|
||||
ReadOnlyField,
|
||||
NewFieldForbidden,
|
||||
NotAnnotatedField,
|
||||
ReadOnlyFieldAnnotation,
|
||||
InvalidFieldValue,
|
||||
InvalidFieldAnnotation,
|
||||
IncompletelyAnnotatedField,
|
||||
ImportForbidden,
|
||||
FunctionForbidden,
|
||||
NonExistingField,
|
||||
InvalidInitializerType,
|
||||
)
|
||||
|
||||
ALLOWED_ANNOTATIONS: dict[str, Any] = {
|
||||
"Union": Union,
|
||||
"Optional": Optional,
|
||||
"List": List,
|
||||
"Dict": Dict,
|
||||
"Tuple": Tuple,
|
||||
"Set": Set,
|
||||
"FrozenSet": FrozenSet,
|
||||
"Annotated": Annotated,
|
||||
# builtins:
|
||||
"int": int,
|
||||
"str": str,
|
||||
"float": float,
|
||||
"bool": bool,
|
||||
"complex": complex,
|
||||
"bytes": bytes,
|
||||
"None": type(None),
|
||||
"list": list,
|
||||
"dict": dict,
|
||||
"set": set,
|
||||
"frozenset": frozenset,
|
||||
"tuple": tuple,
|
||||
}
|
||||
ALLOWED_MODEL_FIELDS_TYPES: tuple[type[Any], ...] = (
|
||||
str,
|
||||
int,
|
||||
float,
|
||||
complex,
|
||||
bool,
|
||||
bytes,
|
||||
)
|
||||
|
||||
|
||||
ALLOWED_HELPERS_MATH = SimpleNamespace(
|
||||
sqrt=math.sqrt,
|
||||
floor=math.floor,
|
||||
ceil=math.ceil,
|
||||
trunc=math.trunc,
|
||||
fabs=math.fabs,
|
||||
copysign=math.copysign,
|
||||
hypot=math.hypot,
|
||||
exp=math.exp,
|
||||
log=math.log,
|
||||
log10=math.log10,
|
||||
sin=math.sin,
|
||||
cos=math.cos,
|
||||
tan=math.tan,
|
||||
atan2=math.atan2,
|
||||
radians=math.radians,
|
||||
degrees=math.degrees,
|
||||
)
|
||||
|
||||
ALLOWED_HELPERS_DEFAULT: dict[str, object] = {
|
||||
"math": ALLOWED_HELPERS_MATH,
|
||||
"print": print,
|
||||
# Numbers & reducers (pure, deterministic)
|
||||
"abs": abs,
|
||||
"round": round,
|
||||
"min": min,
|
||||
"max": max,
|
||||
"sum": sum,
|
||||
# Introspection-free basics
|
||||
"len": len,
|
||||
"sorted": sorted,
|
||||
# Basic constructors (for copy-on-write patterns)
|
||||
"tuple": tuple,
|
||||
"list": list,
|
||||
"dict": dict,
|
||||
"set": set,
|
||||
# Simple casts if they need to normalize types
|
||||
"int": int,
|
||||
"float": float,
|
||||
"str": str,
|
||||
"bool": bool,
|
||||
"bytes": bytes,
|
||||
"complex": complex,
|
||||
# Easy iteration helpers (optional but handy)
|
||||
"range": range,
|
||||
}
|
||||
|
||||
|
||||
def _check_initializer_safety(func) -> None:
|
||||
"""
|
||||
Preliminary structural check for __initializer__.
|
||||
|
||||
Policy (minimal):
|
||||
- Forbid 'import' / 'from ... import ...' inside the initializer body.
|
||||
- Forbid nested function definitions (closures/helpers) in the body.
|
||||
- Allow lambdas.
|
||||
- No restrictions on calls here (keep it simple).
|
||||
- Optionally forbid closures (free vars) for determinism.
|
||||
"""
|
||||
try:
|
||||
src = inspect.getsource(func)
|
||||
except OSError as exc:
|
||||
# If source isn't available (rare), fail closed (or skip if you prefer)
|
||||
raise FunctionForbidden("Cannot inspect __initializer__ source") from exc
|
||||
|
||||
src = textwrap.dedent(src)
|
||||
mod = ast.parse(src)
|
||||
|
||||
# Find the FunctionDef node that corresponds to this initializer
|
||||
init_node = None
|
||||
for n in mod.body:
|
||||
if (
|
||||
isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef))
|
||||
and n.name == func.__name__
|
||||
):
|
||||
init_node = n
|
||||
break
|
||||
if init_node is None:
|
||||
# Fallback: if not found, analyze nothing further to avoid false positives
|
||||
return
|
||||
|
||||
# Walk ONLY the body of the initializer (don't flag the def itself)
|
||||
body_tree = ast.Module(body=init_node.body, type_ignores=[])
|
||||
|
||||
for node in ast.walk(body_tree):
|
||||
# Forbid imports
|
||||
if isinstance(node, (ast.Import, ast.ImportFrom)):
|
||||
raise ImportForbidden("imports disabled in __initializer")
|
||||
|
||||
# Forbid nested defs (but allow lambdas)
|
||||
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
|
||||
raise FunctionForbidden(
|
||||
"Nested function definitions are forbidden in __initializer"
|
||||
)
|
||||
|
||||
if isinstance(node, ast.Lambda): # Forbid lambda
|
||||
raise FunctionForbidden("Lambdas are forbidden in __initializer")
|
||||
|
||||
# Optional: forbid closures (keeps determinism; allows lambdas that don't capture)
|
||||
if func.__code__.co_freevars:
|
||||
raise FunctionForbidden("Closures are forbidden in __initializer__")
|
||||
|
||||
|
||||
def _blocked_import(*args, **kwargs):
|
||||
raise ImportForbidden("imports disabled in __initializer")
|
||||
|
||||
|
||||
def _resolve_annotation(ann):
|
||||
if isinstance(ann, str):
|
||||
# Safe eval against a **whitelist** only
|
||||
return eval( # pylint: disable=eval-used
|
||||
ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS
|
||||
)
|
||||
return ann
|
||||
|
||||
|
||||
def _peel_annotated(t: Any) -> Any:
|
||||
# If you ever allow Annotated[T, ...], peel to T
|
||||
while True:
|
||||
origin = get_origin(t)
|
||||
if origin is None:
|
||||
return t
|
||||
name = (
|
||||
getattr(origin, "__name__", "")
|
||||
or getattr(origin, "__qualname__", "")
|
||||
or str(origin)
|
||||
)
|
||||
if "Annotated" in name:
|
||||
args = get_args(t)
|
||||
t = args[0] if args else t
|
||||
else:
|
||||
return t
|
||||
|
||||
|
||||
def _check_annotation_definition( # pylint: disable=too-complex,too-many-return-statements
|
||||
_type,
|
||||
) -> bool:
|
||||
_type = _peel_annotated(_type)
|
||||
# handle Optional[] and Union[None,...]
|
||||
if (get_origin(_type) is Union or get_origin(_type) is UnionType) and type(
|
||||
None
|
||||
) in get_args(_type):
|
||||
return all(
|
||||
_check_annotation_definition(_)
|
||||
for _ in get_args(_type)
|
||||
if _ is not type(None)
|
||||
)
|
||||
|
||||
# handle other Union[...]
|
||||
if get_origin(_type) is Union or get_origin(_type) is UnionType:
|
||||
return all(_check_annotation_definition(_) for _ in get_args(_type))
|
||||
|
||||
# handle Dict[...]
|
||||
if get_origin(_type) is dict:
|
||||
inner = get_args(_type)
|
||||
if len(inner) != 2:
|
||||
raise IncompletelyAnnotatedField(
|
||||
f"Dict Annotation requires 2 inner definitions: {_type}"
|
||||
)
|
||||
return _peel_annotated(
|
||||
inner[0]
|
||||
) in ALLOWED_MODEL_FIELDS_TYPES and _check_annotation_definition(inner[1])
|
||||
|
||||
# handle Tuple[]
|
||||
if get_origin(_type) in [tuple]:
|
||||
inner_types = get_args(_type)
|
||||
if len(inner_types) == 0:
|
||||
raise IncompletelyAnnotatedField(
|
||||
f"Annotation requires inner definition: {_type}"
|
||||
)
|
||||
if len(inner_types) == 2 and inner_types[1] is Ellipsis:
|
||||
return _check_annotation_definition(inner_types[0])
|
||||
return all(_check_annotation_definition(_) for _ in inner_types)
|
||||
|
||||
# handle Set[],Tuple[],FrozenSet[],List[]
|
||||
if get_origin(_type) in [set, frozenset, tuple, list]:
|
||||
inner_types = get_args(_type)
|
||||
if len(inner_types) == 0:
|
||||
raise IncompletelyAnnotatedField(
|
||||
f"Annotation requires inner definition: {_type}"
|
||||
)
|
||||
return all(_check_annotation_definition(_) for _ in inner_types)
|
||||
|
||||
if _type in ALLOWED_MODEL_FIELDS_TYPES:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class ModelSpecView:
|
||||
"""ModelSpecView class
|
||||
A class that will act as fake BaseElement proxy to allow setting values"""
|
||||
|
||||
__slots__ = ("_vals", "_types", "_touched", "_name", "_module")
|
||||
|
||||
def __init__(
|
||||
self, values: dict[str, Any], types_map: dict[str, type], name: str, module: str
|
||||
):
|
||||
self._name: str
|
||||
self._vals: dict[str, Any]
|
||||
self._types: dict[str, type]
|
||||
self._touched: set
|
||||
self._module: str
|
||||
object.__setattr__(self, "_vals", dict(values))
|
||||
object.__setattr__(self, "_types", types_map)
|
||||
object.__setattr__(self, "_name", name)
|
||||
object.__setattr__(self, "_module", module)
|
||||
|
||||
@property
|
||||
def __name__(self) -> str:
|
||||
"""returns proxified class' name"""
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def __module__(self) -> str:
|
||||
"""returns proxified module's name"""
|
||||
return self._module
|
||||
|
||||
@__module__.setter
|
||||
def __module__(self, value: str):
|
||||
pass
|
||||
|
||||
def __getattr__(self, name: str) -> Any:
|
||||
"""internal proxy getattr"""
|
||||
if name not in self._types:
|
||||
raise AttributeError(f"Unknown field {name}")
|
||||
return self._vals[name]
|
||||
|
||||
def __setattr__(self, name: str, value: Any):
|
||||
"""internal proxy setattr"""
|
||||
if name not in self._types:
|
||||
raise NonExistingField(f"Cannot set unknown field {name}")
|
||||
T = self._types[name]
|
||||
|
||||
try:
|
||||
check_type(
|
||||
value,
|
||||
T,
|
||||
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
|
||||
)
|
||||
except TypeCheckError as exp:
|
||||
raise InvalidFieldValue(
|
||||
f"Field <{name}> value is not of expected type {T}."
|
||||
) from exp
|
||||
|
||||
self._vals[name] = value
|
||||
|
||||
def export(self) -> dict:
|
||||
"""exports all proxified values"""
|
||||
return dict(self._vals)
|
||||
|
||||
|
||||
T_Meta = TypeVar("T_Meta", bound="_MetaElement")
|
||||
T_BE = TypeVar("T_BE", bound="BaseElement")
|
||||
|
||||
|
||||
class _MetaElement(type):
|
||||
"""metaclass to use to build BaseElement"""
|
||||
|
||||
modified_fields: Dict[str, Any] = {}
|
||||
new_fields: Dict[str, LAMField[Any]] = {}
|
||||
initializer: Optional[Callable[..., Any]] = None
|
||||
__DABSchema__: dict[str, Any] = {}
|
||||
|
||||
@classmethod
|
||||
def check_class(
|
||||
mcs: type["_MetaElement"],
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any], # pylint: disable=unused-argument
|
||||
extensions: dict[str, Any],
|
||||
) -> None:
|
||||
"""
|
||||
Early class-build hook.
|
||||
|
||||
Validates the inheritance shape, initializes an empty schema for root classes,
|
||||
copies the parent schema for subclasses, and ensures all annotated fields
|
||||
have a default (inserting `None` when missing).
|
||||
|
||||
This runs before the class object is created.
|
||||
"""
|
||||
# print("__NEW__ Defining:", name, "with keys:", list(namespace))
|
||||
|
||||
if len(bases) > 1:
|
||||
raise MultipleInheritanceForbidden(
|
||||
"Multiple inheritance is not supported by dabmodel"
|
||||
)
|
||||
if len(bases) == 0: # base class (BaseElement)
|
||||
namespace["__DABSchema__"] = {}
|
||||
else: # standard inheritance
|
||||
# check class tree origin
|
||||
if "__DABSchema__" not in dir(bases[0]):
|
||||
raise BrokenInheritance(
|
||||
"__DABSchema__ not found in base class, broken inheritance chain."
|
||||
)
|
||||
# copy inherited schema
|
||||
namespace["__DABSchema__"] = copy(bases[0].__DABSchema__)
|
||||
|
||||
# force field without default value to be instantiated (with None)
|
||||
if "__annotations__" in namespace:
|
||||
for _funknown in [
|
||||
_ for _ in namespace["__annotations__"] if _ not in namespace.keys()
|
||||
]:
|
||||
namespace[_funknown] = None
|
||||
|
||||
@classmethod
|
||||
def process_class_fields( # pylint: disable=too-complex,too-many-branches
|
||||
mcs: type["_MetaElement"],
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any],
|
||||
extensions: dict[str, Any],
|
||||
):
|
||||
"""
|
||||
Scan the class namespace and partition fields.
|
||||
|
||||
Detects:
|
||||
- modified fields (shadowing parent values),
|
||||
- new fields (present in annotations),
|
||||
- the optional `__initializer` classmethod (in mangled or unmangled form).
|
||||
|
||||
Validates annotations and types and removes processed items from `namespace`
|
||||
so they won't become normal attributes. Results are staged into:
|
||||
mcs.new_fields, mcs.modified_fields, mcs.initializer
|
||||
to be committed later.
|
||||
"""
|
||||
# iterating new and modified fields
|
||||
mcs.modified_fields = {}
|
||||
mcs.new_fields = {}
|
||||
mcs.initializer = None
|
||||
initializer_name: Optional[str] = None
|
||||
for _fname, _fvalue in namespace.items():
|
||||
if _fname == f"_{name}__initializer" or (
|
||||
name.startswith("_") and _fname == "__initializer"
|
||||
):
|
||||
if not isinstance(_fvalue, classmethod):
|
||||
raise InvalidInitializerType()
|
||||
mcs.initializer = _fvalue.__func__
|
||||
if name.startswith("_"):
|
||||
initializer_name = "__initializer"
|
||||
else:
|
||||
initializer_name = f"_{name}__initializer"
|
||||
elif _fname.startswith("_"):
|
||||
pass
|
||||
elif isinstance(_fvalue, classmethod):
|
||||
pass
|
||||
else:
|
||||
print(f"Parsing Field: {_fname} / {_fvalue}")
|
||||
if (
|
||||
len(bases) == 1 and _fname in namespace["__DABSchema__"].keys()
|
||||
): # Modified fields
|
||||
mcs.process_modified_field(
|
||||
name, bases, namespace, _fname, _fvalue, extensions
|
||||
)
|
||||
else: # New fieds
|
||||
mcs.process_new_field(
|
||||
name, bases, namespace, _fname, _fvalue, extensions
|
||||
)
|
||||
# removing modified fields from class (will add them back later)
|
||||
for _fname in mcs.new_fields:
|
||||
del namespace[_fname]
|
||||
for _fname in mcs.modified_fields:
|
||||
del namespace[_fname]
|
||||
if mcs.initializer is not None and initializer_name is not None:
|
||||
del namespace[initializer_name]
|
||||
|
||||
@classmethod
|
||||
def process_modified_field(
|
||||
mcs: type["_MetaElement"],
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any],
|
||||
_fname: str,
|
||||
_fvalue: Any,
|
||||
extensions: dict[str, Any],
|
||||
): # pylint: disable=unused-argument
|
||||
"""
|
||||
Handle a *modified* field declared by a subclass.
|
||||
|
||||
Forbids annotation changes, validates the new default value against
|
||||
the inherited annotation, and stages the new default into `mcs.modified_fields`.
|
||||
"""
|
||||
if "__annotations__" in namespace and _fname in namespace["__annotations__"]:
|
||||
raise ReadOnlyFieldAnnotation(
|
||||
f"annotations cannot be modified on derived classes {_fname}"
|
||||
)
|
||||
try:
|
||||
check_type(
|
||||
_fvalue,
|
||||
namespace["__DABSchema__"][_fname].annotations,
|
||||
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
|
||||
)
|
||||
except TypeCheckError as exp:
|
||||
raise InvalidFieldValue(
|
||||
f"Field <{_fname}> New Field value is not of expected type {bases[0].__annotations__[_fname]}."
|
||||
) from exp
|
||||
mcs.modified_fields[_fname] = _fvalue
|
||||
|
||||
@classmethod
|
||||
def process_new_field(
|
||||
mcs: type["_MetaElement"],
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any],
|
||||
_fname: str,
|
||||
_fvalue: Any,
|
||||
extensions: dict[str, Any],
|
||||
): # pylint: disable=unused-argument
|
||||
"""
|
||||
Handle a *new* field declared on the class.
|
||||
|
||||
Resolves string annotations against a whitelist, validates `Annotated[...]`
|
||||
payloads (allowing only LAMFieldInfo), checks the default value type,
|
||||
and stages the field as a `LAMField` in `mcs.new_fields`.
|
||||
"""
|
||||
# print(f"New field: {_fname}")
|
||||
|
||||
# check if field is annotated
|
||||
if (
|
||||
"__annotations__" not in namespace
|
||||
or _fname not in namespace["__annotations__"]
|
||||
):
|
||||
raise NotAnnotatedField(
|
||||
f"Every dabmodel Fields must be annotated ({_fname})"
|
||||
)
|
||||
|
||||
# check if annotation is allowed
|
||||
if isinstance(namespace["__annotations__"][_fname], str):
|
||||
namespace["__annotations__"][_fname] = _resolve_annotation(
|
||||
namespace["__annotations__"][_fname]
|
||||
)
|
||||
|
||||
if not _check_annotation_definition(namespace["__annotations__"][_fname]):
|
||||
raise InvalidFieldAnnotation(
|
||||
f"Field <{_fname}> has not an allowed or valid annotation."
|
||||
)
|
||||
|
||||
_finfo: LAMFieldInfo = LAMFieldInfo()
|
||||
origin = get_origin(namespace["__annotations__"][_fname])
|
||||
tname = (
|
||||
getattr(origin, "__name__", "")
|
||||
or getattr(origin, "__qualname__", "")
|
||||
or str(origin)
|
||||
)
|
||||
if "Annotated" in tname:
|
||||
args = get_args(namespace["__annotations__"][_fname])
|
||||
if args:
|
||||
if len(args) > 2:
|
||||
raise InvalidFieldAnnotation(
|
||||
f"Field <{_fname}> had invalid Annotated value."
|
||||
)
|
||||
if len(args) == 2 and not isinstance(args[1], LAMFieldInfo):
|
||||
raise InvalidFieldAnnotation(
|
||||
"Only LAMFieldInfo object is allowed as Annotated data."
|
||||
)
|
||||
|
||||
_finfo = args[1]
|
||||
|
||||
# check if value is valid
|
||||
try:
|
||||
check_type(
|
||||
_fvalue,
|
||||
namespace["__annotations__"][_fname],
|
||||
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
|
||||
)
|
||||
except TypeCheckError as exp:
|
||||
raise InvalidFieldValue(
|
||||
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
|
||||
) from exp
|
||||
mcs.new_fields[_fname] = LAMField(
|
||||
_fname, _fvalue, namespace["__annotations__"][_fname], _finfo
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def apply_initializer(
|
||||
mcs: type["_MetaElement"],
|
||||
cls,
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any], # pylint: disable=unused-argument
|
||||
extensions: dict[str, Any],
|
||||
):
|
||||
"""
|
||||
Apply the optional `__initializer` classmethod to compute derived defaults.
|
||||
|
||||
The initializer runs in a restricted, import-blocked environment using a
|
||||
`ModelSpecView` proxy that enforces type checking on assignments.
|
||||
On success, the computed values are validated and written back into the
|
||||
class schema's DABFields.
|
||||
"""
|
||||
if mcs.initializer is not None:
|
||||
_check_initializer_safety(mcs.initializer)
|
||||
init_fieldvalues = {}
|
||||
init_fieldtypes = {}
|
||||
for _fname, _fvalue in cls.__DABSchema__.items():
|
||||
if isinstance(_fvalue, LAMField):
|
||||
init_fieldvalues[_fname] = deepcopy(_fvalue.raw_value)
|
||||
init_fieldtypes[_fname] = _fvalue.annotations
|
||||
fakecls = ModelSpecView(
|
||||
init_fieldvalues, init_fieldtypes, cls.__name__, cls.__module__
|
||||
)
|
||||
safe_globals = {
|
||||
"__builtins__": {"__import__": _blocked_import},
|
||||
**ALLOWED_HELPERS_DEFAULT,
|
||||
}
|
||||
if mcs.initializer.__code__.co_freevars:
|
||||
raise FunctionForbidden("__initializer must not use closures")
|
||||
safe_initializer = FunctionType(
|
||||
mcs.initializer.__code__,
|
||||
safe_globals,
|
||||
name=mcs.initializer.__name__,
|
||||
argdefs=mcs.initializer.__defaults__,
|
||||
closure=None,
|
||||
)
|
||||
safe_initializer(fakecls) # pylint: disable=not-callable
|
||||
for _fname, _fvalue in fakecls.export().items():
|
||||
try:
|
||||
check_type(
|
||||
_fvalue,
|
||||
cls.__DABSchema__[_fname].annotations,
|
||||
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
|
||||
)
|
||||
except TypeCheckError as exp:
|
||||
raise InvalidFieldValue(
|
||||
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
|
||||
) from exp
|
||||
cls.__DABSchema__[_fname].update_value(_fvalue)
|
||||
|
||||
def __new__(
|
||||
mcs: type["_MetaElement"],
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any],
|
||||
) -> Type:
|
||||
"""BaseElement new class"""
|
||||
extensions: dict[str, Any] = {}
|
||||
mcs.check_class(name, bases, namespace, extensions)
|
||||
mcs.process_class_fields(name, bases, namespace, extensions)
|
||||
|
||||
_cls = super().__new__(mcs, name, bases, namespace)
|
||||
|
||||
mcs.commit_fields(_cls, name, bases, namespace, extensions)
|
||||
mcs.apply_initializer(_cls, name, bases, namespace, extensions)
|
||||
_cls.install_instance_guard(extensions)
|
||||
|
||||
return _cls
|
||||
|
||||
@classmethod
|
||||
def commit_fields(
|
||||
mcs: type["_MetaElement"],
|
||||
cls,
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any], # pylint: disable=unused-argument
|
||||
extensions: dict[str, Any],
|
||||
):
|
||||
"""
|
||||
Commit staged fields into the class schema (`__DABSchema__`).
|
||||
|
||||
- For modified fields: copy the parent's LAMField, update its value.
|
||||
- For new fields: set the freshly built LAMField and record its source.
|
||||
"""
|
||||
for _fname, _fvalue in mcs.modified_fields.items():
|
||||
cls.__DABSchema__[_fname] = deepcopy(bases[0].__DABSchema__[_fname])
|
||||
cls.__DABSchema__[_fname].update_value(_fvalue)
|
||||
|
||||
for _fname, _fvalue in mcs.new_fields.items():
|
||||
_fvalue.add_source(cls)
|
||||
cls.__DABSchema__[_fname] = _fvalue
|
||||
|
||||
def __call__(cls: Type, *args: Any, **kw: Any): # intentionally untyped
|
||||
"""BaseElement new instance"""
|
||||
obj = super().__call__(*args)
|
||||
|
||||
extensions: dict[str, Any] = {}
|
||||
|
||||
cls.populate_instance( # pylint: disable=no-value-for-parameter
|
||||
obj, extensions, *args, **kw
|
||||
)
|
||||
cls.freeze_instance_schema( # pylint: disable=no-value-for-parameter
|
||||
obj, extensions, *args, **kw
|
||||
)
|
||||
cls.apply_overrides( # pylint: disable=no-value-for-parameter
|
||||
obj, extensions, *args, **kw
|
||||
)
|
||||
cls.finalize_instance(obj, extensions) # pylint: disable=no-value-for-parameter
|
||||
|
||||
return obj
|
||||
|
||||
def populate_instance(
|
||||
cls: Type, obj: Any, extensions: dict[str, Any], *args: Any, **kw: Any
|
||||
):
|
||||
"""
|
||||
Populate the new instance with field values from the class schema.
|
||||
|
||||
Copies each LAMField.value to an instance attribute (deep-frozen view).
|
||||
"""
|
||||
for _fname, _fvalue in cls.__DABSchema__.items():
|
||||
if isinstance(_fvalue, LAMField):
|
||||
object.__setattr__(obj, _fname, _fvalue.value)
|
||||
|
||||
def freeze_instance_schema(
|
||||
cls: Type, obj: Any, extensions: dict[str, Any], *args: Any, **kw: Any
|
||||
):
|
||||
"""
|
||||
Freeze the instance's schema by wrapping DABFields into FrozenLAMField.
|
||||
|
||||
Creates a per-instance `__DABSchema__` dict where each field is read-only.
|
||||
"""
|
||||
inst_schema = copy(obj.__DABSchema__)
|
||||
for _fname, _fvalue in cls.__DABSchema__.items():
|
||||
if isinstance(_fvalue, LAMField):
|
||||
inst_schema[_fname] = FrozenLAMField(_fvalue)
|
||||
|
||||
if "features" in inst_schema:
|
||||
inst_schema["features"] = dict(inst_schema["features"])
|
||||
|
||||
object.__setattr__(obj, "__DABSchema__", inst_schema)
|
||||
|
||||
def apply_overrides(cls, obj, extensions, *args, **kwargs):
|
||||
"""
|
||||
Hook for runtime overrides at instance creation.
|
||||
|
||||
Invoked after the schema has been frozen but before finalize_instance.
|
||||
Subclasses of _MetaElement can override this to support things like:
|
||||
|
||||
- Field overrides: MyApp(field=value)
|
||||
- Feature overrides: MyApp(FeatureName=CustomFeature)
|
||||
- Feature attachments: MyApp(NewFeature=FeatureClass)
|
||||
|
||||
By default this does nothing.
|
||||
"""
|
||||
|
||||
def finalize_instance(cls: Type, obj: Any, extensions: dict[str, Any]):
|
||||
"""
|
||||
Finalization hook invoked at the end of instance construction.
|
||||
|
||||
Subclasses of the metaclass override this to attach runtime components
|
||||
to the instance. (Example: BaseMetaAppliance instantiates bound Features
|
||||
and sets them as attributes on the appliance instance.)
|
||||
"""
|
||||
|
||||
def install_instance_guard(cls: Type, extensions: dict[str, Any]):
|
||||
"""
|
||||
Install the runtime `__setattr__` guard on the class.
|
||||
|
||||
After instances are constructed, prevents:
|
||||
- creating new public fields,
|
||||
- reassigning existing fields post-initialization.
|
||||
|
||||
Private/dunder attributes are exempt to allow internal bookkeeping.
|
||||
"""
|
||||
orig_setattr = getattr(cls, "__setattr__")
|
||||
|
||||
# cls.orig_setattr = orig_setattr
|
||||
|
||||
def guarded_setattr(_self, key: str, value: Any):
|
||||
if key.startswith("_"): # allow private and dunder attrs
|
||||
return orig_setattr(_self, key, value)
|
||||
# block writes after init if key is readonly
|
||||
if key in _self.__DABSchema__.keys():
|
||||
if key in _self.__dict__:
|
||||
raise ReadOnlyField(f"{key} is read-only")
|
||||
# elif key in _self.__DABSchema__["features"].keys():
|
||||
# if key in _self.__dict__:
|
||||
# raise ReadOnlyField(f"{key} is read-only")
|
||||
else:
|
||||
raise NewFieldForbidden("creating new fields is not allowed")
|
||||
|
||||
return orig_setattr(_self, key, value)
|
||||
|
||||
setattr(cls, "__setattr__", guarded_setattr)
|
||||
19
src/dabmodel/meta/feature.py
Normal file
19
src/dabmodel/meta/feature.py
Normal file
@@ -0,0 +1,19 @@
|
||||
from typing import Type, Any
|
||||
from .base import _MetaElement
|
||||
from ..exception import FeatureNotBound
|
||||
|
||||
|
||||
class _MetaFeature(_MetaElement):
|
||||
"""_MetaFeature class
|
||||
Feature specific metaclass code
|
||||
"""
|
||||
|
||||
def __call__(cls: Type, *args: Any, **kw: Any): # intentionally untyped
|
||||
"""BaseFeature new instance"""
|
||||
|
||||
if cls._BoundAppliance is None:
|
||||
raise FeatureNotBound()
|
||||
|
||||
obj = super().__call__(*args, **kw)
|
||||
|
||||
return obj
|
||||
@@ -1,383 +0,0 @@
|
||||
from __future__ import annotations
|
||||
from typing import Any, Dict, Optional, TypeVar, Generic, get_origin, get_args, Annotated as _Annotated, Union
|
||||
|
||||
from uuid import uuid4, UUID
|
||||
from datetime import datetime as _dt
|
||||
|
||||
from pydantic import (
|
||||
BaseModel,
|
||||
Field,
|
||||
ConfigDict,
|
||||
model_validator,
|
||||
AwareDatetime,
|
||||
UUID4,
|
||||
ByteSize,
|
||||
StrictInt,
|
||||
StrictStr,
|
||||
)
|
||||
from pydantic._internal._model_construction import ModelMetaclass
|
||||
import pytz
|
||||
|
||||
|
||||
# ===================== Maintainers’ policy =====================
|
||||
|
||||
ALLOWED_APPLIANCE_FIELD_TYPES: set[type] = {
|
||||
StrictInt, StrictStr, ByteSize, UUID4, UUID, AwareDatetime, _dt, bool, int, str,
|
||||
}
|
||||
ALLOWED_FEATURE_FIELD_TYPES: set[type] = {
|
||||
StrictInt, StrictStr, ByteSize, UUID4, UUID, AwareDatetime, _dt, bool, int, str,
|
||||
}
|
||||
|
||||
|
||||
def _is_allowed_type(annotation: Any, allowed: set[type]) -> bool:
|
||||
origin = get_origin(annotation)
|
||||
if origin is _Annotated or (origin is not None and getattr(origin, "__name__", "") == "Annotated"):
|
||||
return _is_allowed_type(get_args(annotation)[0], allowed)
|
||||
if origin is None:
|
||||
return annotation in allowed
|
||||
if origin is Union:
|
||||
args = [a for a in get_args(annotation) if a is not type(None)]
|
||||
return all(_is_allowed_type(a, allowed) for a in args)
|
||||
args = get_args(annotation)
|
||||
if origin in (list, tuple, dict, set):
|
||||
return all(_is_allowed_type(a, allowed) for a in args if a is not None)
|
||||
return all(_is_allowed_type(a, allowed) for a in args if a is not None)
|
||||
|
||||
|
||||
def _is_feature_annotation(annotation: Any) -> bool:
|
||||
origin = get_origin(annotation)
|
||||
if origin is Union:
|
||||
args = [a for a in get_args(annotation) if a is not type(None)]
|
||||
return any(_is_feature_annotation(a) for a in args)
|
||||
base = get_args(annotation)[0] if get_origin(annotation) is Optional else annotation
|
||||
try:
|
||||
return isinstance(base, type) and issubclass(base, BaseFeature) # type: ignore[name-defined]
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _annotation_equal(a: Any, b: Any) -> bool:
|
||||
oa, ob = get_origin(a), get_origin(b)
|
||||
if oa is None and ob is None:
|
||||
return a is b
|
||||
if oa != ob:
|
||||
return False
|
||||
aa, ab = get_args(a), get_args(b)
|
||||
if len(aa) != len(ab):
|
||||
return False
|
||||
return all(_annotation_equal(x, y) for x, y in zip(aa, ab))
|
||||
|
||||
|
||||
# ===================== Defaults mini-DSL (values only) =====================
|
||||
|
||||
class _DefaultsSpec:
|
||||
__slots__ = ("mapping",)
|
||||
def __init__(self, mapping: Dict[str, Any]): self.mapping = mapping
|
||||
|
||||
|
||||
def defaults(**kwargs) -> _DefaultsSpec:
|
||||
"""
|
||||
Declare class defaults in a flat mapping (values only).
|
||||
Supports '.' or '__' for nesting:
|
||||
Defaults = defaults(
|
||||
template_short_name="apx",
|
||||
Network__enabled=True,
|
||||
Network__mtu=9000,
|
||||
)
|
||||
"""
|
||||
return _DefaultsSpec(kwargs)
|
||||
|
||||
|
||||
def _split_path(key: str) -> list[str]:
|
||||
return key.replace("__", ".").split(".")
|
||||
|
||||
|
||||
def _merge_path(target: Dict[str, Any], path: str, value: Any):
|
||||
parts = _split_path(path)
|
||||
node = target
|
||||
for i, p in enumerate(parts):
|
||||
if i == len(parts) - 1:
|
||||
node[p] = value
|
||||
else:
|
||||
node = node.setdefault(p, {}) # type: ignore[assignment]
|
||||
|
||||
|
||||
def _flatten_defaults_class(cls: type) -> Dict[str, Any]:
|
||||
out: Dict[str, Any] = {}
|
||||
for name, val in vars(cls).items():
|
||||
if name.startswith("__"):
|
||||
continue
|
||||
if isinstance(val, type):
|
||||
sub = _flatten_defaults_class(val)
|
||||
for k, v in sub.items():
|
||||
out[f"{name}.{k}"] = v
|
||||
else:
|
||||
out[name] = val
|
||||
return out
|
||||
|
||||
|
||||
def _deep_merge(dst: Dict[str, Any], src: Dict[str, Any]):
|
||||
for k, v in src.items():
|
||||
if isinstance(v, dict) and isinstance(dst.get(k), dict):
|
||||
_deep_merge(dst[k], v) # type: ignore[index]
|
||||
else:
|
||||
dst[k] = v
|
||||
|
||||
|
||||
# ===================== Core base classes =====================
|
||||
|
||||
T_Feature = TypeVar("T_Feature", bound="BaseFeature")
|
||||
|
||||
|
||||
class BaseElement(BaseModel):
|
||||
"""
|
||||
Base for Appliance/Feature:
|
||||
- Declarative Defaults (inner class or mapping), values only
|
||||
- Allowed-type enforcement at class creation (schema-time)
|
||||
- Frozen instances
|
||||
"""
|
||||
model_config = ConfigDict(
|
||||
frozen=True,
|
||||
extra="forbid",
|
||||
validate_assignment=True,
|
||||
protected_namespaces=(), # allow names like __defaults_map__
|
||||
)
|
||||
|
||||
__defaults_map__: Dict[str, Any] = {}
|
||||
|
||||
def __init_subclass__(cls, **kwargs):
|
||||
super().__init_subclass__(**kwargs)
|
||||
|
||||
# 1) Collect Defaults (values) along MRO (base -> subclass)
|
||||
merged: Dict[str, Any] = {}
|
||||
for base in reversed(cls.mro()): # parent first, subclass last (subclass wins)
|
||||
spec = base.__dict__.get("Defaults")
|
||||
if spec is None:
|
||||
continue
|
||||
if isinstance(spec, _DefaultsSpec):
|
||||
flat = spec.mapping
|
||||
elif isinstance(spec, type):
|
||||
flat = _flatten_defaults_class(spec)
|
||||
else:
|
||||
continue
|
||||
for k, v in flat.items():
|
||||
_merge_path(merged, k, v)
|
||||
|
||||
# keep 'enabled' inside this map; we'll strip at validation time
|
||||
cls.__defaults_map__ = merged
|
||||
|
||||
# 2) Schema-time allowed types for non-feature fields
|
||||
is_appl = any(b.__name__ == "BaseAppliance" for b in cls.mro())
|
||||
is_feat = any(b.__name__ == "BaseFeature" for b in cls.mro())
|
||||
if is_appl or is_feat:
|
||||
allowed = ALLOWED_APPLIANCE_FIELD_TYPES if is_appl else ALLOWED_FEATURE_FIELD_TYPES
|
||||
for name, f in getattr(cls, "model_fields", {}).items():
|
||||
ann = f.annotation
|
||||
if ann is None:
|
||||
continue
|
||||
if _is_feature_annotation(ann):
|
||||
continue # feature fields checked in ApplianceMeta
|
||||
if not _is_allowed_type(ann, allowed):
|
||||
kind = "Appliance" if is_appl else "Feature"
|
||||
raise TypeError(f"{kind} field '{cls.__name__}.{name}' has disallowed type: {ann}")
|
||||
|
||||
# >>> THIS MUST BE INSIDE THE CLASS <<<
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def _apply_defaults_then_input(cls, values: Any):
|
||||
if not isinstance(values, dict):
|
||||
return values
|
||||
|
||||
# 1) seed from declared field defaults
|
||||
merged: Dict[str, Any] = {}
|
||||
for name, f in cls.model_fields.items():
|
||||
if f.default is not None:
|
||||
merged[name] = f.default
|
||||
elif getattr(f, "default_factory", None) is not None:
|
||||
merged[name] = f.default_factory() # type: ignore[misc]
|
||||
|
||||
# 2) merge class Defaults (values only), then 3) user values
|
||||
_deep_merge(merged, cls.__defaults_map__)
|
||||
_deep_merge(merged, values)
|
||||
|
||||
# 3) per-feature handling
|
||||
for name, f in cls.model_fields.items():
|
||||
ann = f.annotation
|
||||
base = get_args(ann)[0] if get_origin(ann) is Optional else ann
|
||||
try:
|
||||
if isinstance(base, type) and issubclass(base, BaseFeature):
|
||||
# user override flag?
|
||||
user_block = values.get(name) if isinstance(values, dict) else None
|
||||
user_enabled = None
|
||||
if isinstance(user_block, dict) and "enabled" in user_block:
|
||||
user_enabled = bool(user_block.get("enabled"))
|
||||
|
||||
# class default flag?
|
||||
class_enabled = None
|
||||
defaults_block = cls.__defaults_map__.get(name) if isinstance(cls.__defaults_map__, dict) else None
|
||||
if isinstance(defaults_block, dict) and "enabled" in defaults_block:
|
||||
class_enabled = bool(defaults_block.get("enabled"))
|
||||
|
||||
v = merged.get(name)
|
||||
|
||||
# *** fallback: read flag from merged payload if still present ***
|
||||
merged_enabled = None
|
||||
if isinstance(v, dict) and "enabled" in v:
|
||||
merged_enabled = bool(v.get("enabled"))
|
||||
|
||||
# final precedence: user > class > merged
|
||||
final_enabled = (
|
||||
user_enabled
|
||||
if user_enabled is not None
|
||||
else (class_enabled if class_enabled is not None else merged_enabled)
|
||||
)
|
||||
|
||||
# strip any lingering 'enabled' key so the feature never sees it
|
||||
if isinstance(v, dict) and "enabled" in v:
|
||||
v = dict(v)
|
||||
v.pop("enabled", None)
|
||||
merged[name] = v
|
||||
|
||||
if final_enabled is False:
|
||||
merged[name] = None
|
||||
continue
|
||||
|
||||
# Merge feature defaults to ensure required fields are present
|
||||
feat_defaults = getattr(base, "__defaults_map__", {}) or {}
|
||||
if final_enabled is True and v is None:
|
||||
merged[name] = dict(feat_defaults)
|
||||
elif isinstance(v, dict):
|
||||
cfg: Dict[str, Any] = {}
|
||||
_deep_merge(cfg, feat_defaults)
|
||||
_deep_merge(cfg, v)
|
||||
merged[name] = cfg
|
||||
# If final_enabled is None and v is None → leave as None
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return merged
|
||||
|
||||
|
||||
class BaseFeature(BaseElement):
|
||||
template_id: UUID4
|
||||
template_short_name: StrictStr
|
||||
template_long_name: Optional[StrictStr] = None
|
||||
template_description: Optional[StrictStr] = None
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def _strip_enabled_flag(cls, v: Any):
|
||||
# Features never accept `enabled`; strip if present from Defaults or user input.
|
||||
if isinstance(v, dict) and "enabled" in v:
|
||||
v = dict(v)
|
||||
v.pop("enabled", None)
|
||||
return v
|
||||
|
||||
|
||||
class ApplianceMeta(ModelMetaclass):
|
||||
def __new__(mcls, name, bases, namespace, **kwargs):
|
||||
# Skip special handling for the abstract BaseAppliance itself
|
||||
if name == 'BaseAppliance':
|
||||
return super().__new__(mcls, name, bases, namespace, **kwargs)
|
||||
|
||||
# 1) Auto-inject feature fields from THIS class's inner `Features`
|
||||
declared_here: Dict[str, type[BaseFeature]] = {}
|
||||
inner = namespace.get("Features")
|
||||
if isinstance(inner, type):
|
||||
for fname, fval in vars(inner).items():
|
||||
if fname.startswith("__"):
|
||||
continue
|
||||
if isinstance(fval, type) and issubclass(fval, BaseFeature):
|
||||
declared_here[fname] = fval
|
||||
namespace.setdefault("__annotations__", {})
|
||||
if fname not in namespace["__annotations__"]:
|
||||
namespace["__annotations__"][fname] = Optional[fval] # type: ignore[index]
|
||||
namespace[fname] = None # default None; Defaults can enable/configure
|
||||
|
||||
# 2) Create the class via Pydantic's metaclass
|
||||
cls = super().__new__(mcls, name, bases, namespace, **kwargs)
|
||||
|
||||
# 3) Discover current feature fields (after Pydantic processed annotations)
|
||||
features_now: Dict[str, type[BaseFeature]] = {}
|
||||
for field_name, f in cls.model_fields.items():
|
||||
ann = f.annotation
|
||||
base = get_args(ann)[0] if get_origin(ann) is Optional else ann
|
||||
try:
|
||||
if issubclass(base, BaseFeature): # type: ignore[arg-type]
|
||||
features_now[field_name] = base # type: ignore[assignment]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 4) Enforce parent schema monotonicity (no removal/type changes), allow additions
|
||||
parent = next((b for b in cls.__mro__[1:] if isinstance(b, ApplianceMeta)), None)
|
||||
parent_fields = dict(getattr(parent, "model_fields", {})) if parent else {}
|
||||
parent_features = getattr(parent, "__feature_fields__", {}) if parent else {}
|
||||
|
||||
# PARENT FIELDS: present with identical annotation + same constraints (skip feature fields)
|
||||
for pname, pfield in parent_fields.items():
|
||||
if pname in parent_features:
|
||||
continue
|
||||
if pname not in cls.model_fields:
|
||||
raise TypeError(f"{cls.__name__}: removing parent field '{pname}' is forbidden.")
|
||||
child_field = cls.model_fields[pname]
|
||||
if (not _annotation_equal(child_field.annotation, pfield.annotation)) or (tuple(child_field.metadata) != tuple(pfield.metadata)):
|
||||
raise TypeError(f"{cls.__name__}: changing type/constraints of parent field '{pname}' is forbidden.")
|
||||
|
||||
# PARENT FEATURES: present with identical type
|
||||
for fname, ftype in parent_features.items():
|
||||
if fname not in features_now:
|
||||
raise TypeError(f"{cls.__name__}: removing parent feature '{fname}' is forbidden.")
|
||||
if features_now[fname] is not ftype:
|
||||
raise TypeError(f"{cls.__name__}: retargeting feature '{fname}' is forbidden.")
|
||||
|
||||
# NEW FEATURES must be declared in THIS class's inner Features
|
||||
new_feature_names = set(features_now) - set(parent_features)
|
||||
if new_feature_names:
|
||||
if not declared_here:
|
||||
raise TypeError(
|
||||
f"{cls.__name__}: new features detected {sorted(new_feature_names)} "
|
||||
f"but none declared in this class's inner `Features`."
|
||||
)
|
||||
undeclared = [n for n in new_feature_names if n not in declared_here]
|
||||
if undeclared:
|
||||
raise TypeError(
|
||||
f"{cls.__name__}: new features {sorted(undeclared)} must be declared in this class's `Features`."
|
||||
)
|
||||
|
||||
# 5) Publish the canonical sets
|
||||
if parent and getattr(parent, "__declared_features__", {}):
|
||||
merged_decl = dict(parent.__declared_features__)
|
||||
merged_decl.update(declared_here)
|
||||
cls.__declared_features__ = merged_decl
|
||||
else:
|
||||
cls.__declared_features__ = declared_here
|
||||
cls.__feature_fields__ = features_now
|
||||
|
||||
return cls
|
||||
|
||||
|
||||
class BaseAppliance(BaseElement, Generic[T_Feature], metaclass=ApplianceMeta):
|
||||
"""
|
||||
Feature binding is implicit via inner `class Features:`.
|
||||
Subclasses may add fields and add features freely.
|
||||
They may NOT change types of parent fields/features or remove them.
|
||||
"""
|
||||
__declared_features__: Dict[str, type[BaseFeature]] = {}
|
||||
__feature_fields__: Dict[str, type[BaseFeature]] = {}
|
||||
|
||||
# core appliance schema
|
||||
template_id: UUID4
|
||||
template_short_name: StrictStr
|
||||
template_long_name: Optional[StrictStr] = None
|
||||
template_description: Optional[StrictStr] = None
|
||||
|
||||
cpu_cnt: StrictInt = Field(1, gt=0)
|
||||
ram_size: ByteSize = Field(256, gt=128)
|
||||
swap_size: ByteSize = Field(200, ge=0)
|
||||
rootfs_size: ByteSize = Field(2048, ge=2048)
|
||||
|
||||
dabinst_id: UUID4 = Field(default_factory=uuid4)
|
||||
dabinst_short_name: StrictStr
|
||||
dabinst_long_name: Optional[StrictStr] = ""
|
||||
dabinst_description: Optional[StrictStr] = ""
|
||||
dabinst_creationdate: Optional[AwareDatetime] = Field(default_factory=lambda: _dt.now(tz=pytz.utc))
|
||||
32
src/dabmodel/tools.py
Normal file
32
src/dabmodel/tools.py
Normal file
@@ -0,0 +1,32 @@
|
||||
"""library's internal tools"""
|
||||
|
||||
from uuid import UUID
|
||||
from datetime import datetime
|
||||
import json
|
||||
|
||||
from frozendict import deepfreeze
|
||||
|
||||
|
||||
class LAMJSONEncoder(json.JSONEncoder):
|
||||
"""allows to JSON encode non supported data type"""
|
||||
|
||||
def default(self, o):
|
||||
if isinstance(o, UUID):
|
||||
# if the o is uuid, we simply return the value of uuid
|
||||
return o.hex
|
||||
if isinstance(o, datetime):
|
||||
return str(o)
|
||||
return json.JSONEncoder.default(self, o)
|
||||
|
||||
|
||||
def LAMdeepfreeze(value):
|
||||
"""recursive freeze helper function"""
|
||||
if isinstance(value, dict):
|
||||
return deepfreeze(value)
|
||||
if isinstance(value, set):
|
||||
return frozenset(LAMdeepfreeze(v) for v in value)
|
||||
if isinstance(value, list):
|
||||
return tuple(LAMdeepfreeze(v) for v in value)
|
||||
if isinstance(value, tuple):
|
||||
return tuple(LAMdeepfreeze(v) for v in value)
|
||||
return value
|
||||
2638
test/test_model.py
2638
test/test_model.py
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user