Compare commits

...

27 Commits

Author SHA1 Message Date
chacha
9aec2d66cd reorganize and rename (partial) 2025-09-20 18:27:36 +02:00
chacha
af81ec5fd3 more test cases 2025-09-20 13:18:40 +02:00
chacha
26e32a004f increase coverage 2025-09-20 12:43:43 +02:00
chacha
b7cbc50f79 work 2025-09-20 11:38:05 +02:00
chacha
86eee2e378 continue features implementation + code lint + typing + etc 2025-09-18 23:21:42 +02:00
chacha
3e0defc574 work 2025-09-18 00:32:32 +02:00
chacha
f6e581381d cleaning 2025-09-17 00:16:30 +02:00
chacha
981c5201a9 partially fix features 2025-09-16 23:40:41 +02:00
chacha
ab11052c8f work 2025-09-09 00:13:06 +02:00
chacha
4f5dade949 first feature implementation 2025-09-08 01:23:46 +02:00
cclecle
cce260bc5e reordering 2025-09-07 18:42:38 +02:00
cclecle
915a4332ee tiny fix :) 2025-09-06 01:47:49 +02:00
cclecle
4dca3eb9d1 improve typing 2025-09-06 01:43:20 +02:00
cclecle
e11c541139 small opt 2025-09-06 01:35:28 +02:00
cclecle
637b50b325 quality & typing fixes 2025-09-06 01:31:55 +02:00
cclecle
f45c9cc8f3 fix unittest 2025-09-05 23:04:16 +02:00
cclecle
95b0c298ce update deps 2025-09-05 23:00:36 +02:00
cclecle
04a4cf7b36 add deps 2025-09-05 22:56:17 +02:00
cclecle
f42a839cff work 2025-09-05 22:53:47 +02:00
cclecle
7f3a4ef545 work 2025-09-03 23:15:04 +02:00
cclecle
608c8a1010 work 2025-09-03 01:14:09 +02:00
cclecle
210781f086 new again 2025-09-02 19:18:31 +02:00
cclecle
df966ccac4 Merge branch 'dev' of https://chacha.ddns.net/gitea/chacha/dabmodel.git
into dev
2025-09-01 00:07:22 +02:00
chacha
29827b51bc add chatgpt code :) 2025-08-31 22:19:58 +02:00
chacha
7440731135 dev 2025-08-31 21:23:03 +02:00
cclecle
87682c2c9c cleaning a little bit... 2025-01-08 18:00:33 +01:00
chacha
0eef35e36f continue work 2024-12-08 01:04:27 +01:00
19 changed files with 3924 additions and 518 deletions

View File

@@ -1,13 +1,20 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?eclipse-pydev version="1.0"?><pydev_project>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python interpreter</pydev_property>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
<path>/${PROJECT_DIR_NAME}/src</path>
<path>/${PROJECT_DIR_NAME}</path>
</pydev_pathproperty>
</pydev_project>
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?eclipse-pydev version="1.0"?><pydev_project>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Python311</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python interpreter</pydev_property>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
<path>/${PROJECT_DIR_NAME}/src</path>
<path>/${PROJECT_DIR_NAME}</path>
</pydev_pathproperty>
</pydev_project>

View File

@@ -34,7 +34,9 @@ classifiers = [
]
dependencies = [
'importlib-metadata; python_version<"3.9"',
'packaging'
'packaging',
'frozendict',
'typeguard'
]
dynamic = ["version"]
@@ -78,7 +80,7 @@ test = ["chacha_cicd_helper"]
coverage-check = ["chacha_cicd_helper"]
complexity-check = ["chacha_cicd_helper"]
quality-check = ["chacha_cicd_helper"]
type-check = ["chacha_cicd_helper"]
type-check = ["chacha_cicd_helper","types-pytz"]
doc-gen = ["chacha_cicd_helper"]
# [project.scripts]

View File

@@ -0,0 +1,17 @@
from typing import Generic, TypeVar
T_Field = TypeVar("T_Field")
class BaseConstraint(Generic[T_Field]):
"""BaseConstraint class
Base class for Field's constraints
"""
_bound_type: type
def __init__(self): ...
def check(self, value: T_Field) -> bool:
"""Check if a Constraint is completed"""
return True

View File

@@ -0,0 +1,41 @@
from typing import Generic, TypeVar, Any
from .LAMField import LAMField
from .Constraint import BaseConstraint
from ..tools import LAMdeepfreeze
T_Field = TypeVar("T_Field")
class FrozenLAMField(Generic[T_Field]):
"""FrozenLAMField class
a read-only proxy of a Field
"""
def __init__(self, inner_field: LAMField):
self._inner_field = inner_field
@property
def doc(self) -> str:
"""Returns Field's documentation (frozen)"""
return LAMdeepfreeze(self._inner_field.doc)
@property
def constraints(self) -> tuple[BaseConstraint]:
"""Returns Field's constraint (frozen)"""
return LAMdeepfreeze(self._inner_field.constraints)
@property
def default_value(self) -> Any:
"""Returns Field's default value (frozen)"""
return self._inner_field.default_value
@property
def value(self) -> Any:
"""Returns Field's value (frosen)"""
return self._inner_field.value
@property
def annotations(self) -> Any:
"""Returns Field's annotation (frozen)"""
return LAMdeepfreeze(self._inner_field.annotations)

View File

@@ -0,0 +1,62 @@
from typing import Generic, TypeVar, Optional, Any
from .LAMFieldInfo import LAMFieldInfo
from .Constraint import BaseConstraint
from ..tools import LAMdeepfreeze
T_Field = TypeVar("T_Field")
class LAMField(Generic[T_Field]):
"""This class describe a Field in Schema"""
def __init__(self, name: str, v: Optional[T_Field], a: Any, i: LAMFieldInfo):
self._name: str = name
self._source: Optional[type] = None
self._default_value: Optional[T_Field] = v
self._value: Optional[T_Field] = v
self._annotations: Any = a
self._info: LAMFieldInfo = i
self._constraints: list[BaseConstraint[Any]] = i.constraints
def add_source(self, s: type) -> None:
"""Adds source Appliance to the Field"""
self._source = s
@property
def doc(self) -> str:
"""Returns Field's documentation"""
return self._info.doc
def add_constraint(self, c: BaseConstraint) -> None:
"""Adds constraint to the Field"""
self._constraints.append(c)
@property
def constraints(self) -> list[BaseConstraint]:
"""Returns Field's constraint"""
return self._info.constraints
@property
def default_value(self) -> Any:
"""Returns Field's default value (frozen)"""
return LAMdeepfreeze(self._default_value)
def update_value(self, v: Optional[T_Field] = None) -> None:
"""Updates Field's value"""
self._value = v
@property
def value(self) -> Any:
"""Returns Field's value (frozen)"""
return LAMdeepfreeze(self._value)
@property
def raw_value(self) -> Optional[T_Field]:
"""Returns Field's value"""
return self._value
@property
def annotations(self) -> Any:
"""Returns Field's annotation"""
return self._annotations

View File

@@ -0,0 +1,26 @@
from typing import Optional, Any
from .Constraint import BaseConstraint
class LAMFieldInfo:
"""This Class allows to describe a Field in Appliance class"""
def __init__(
self, *, doc: str = "", constraints: Optional[list[BaseConstraint]] = None
):
self._doc: str = doc
self._constraints: list[BaseConstraint]
if constraints is None:
self._constraints = []
else:
self._constraints = constraints
@property
def doc(self) -> str:
"""Returns Field's documentation"""
return self._doc
@property
def constraints(self) -> list[BaseConstraint[Any]]:
"""Returns Field's constraints"""
return self._constraints

View File

@@ -11,4 +11,28 @@ Main module __init__ file.
"""
from .__metadata__ import __version__, __Summuary__, __Name__
from .model import DABField, BaseFeature, BaseAppliance, default_values_override
from .LAMFields.LAMField import LAMField
from .LAMFields.LAMFieldInfo import LAMFieldInfo
from .LAMFields.FrozenLAMField import FrozenLAMField
from .appliance import BaseAppliance
from .feature import BaseFeature
from .exception import (
DABModelException,
MultipleInheritanceForbidden,
BrokenInheritance,
ReadOnlyField,
NewFieldForbidden,
NotAnnotatedField,
ReadOnlyFieldAnnotation,
InvalidFieldValue,
InvalidFieldAnnotation,
IncompletelyAnnotatedField,
ImportForbidden,
FunctionForbidden,
InvalidFeatureInheritance,
FeatureNotBound,
)

View File

@@ -15,20 +15,26 @@ import warnings
try: # pragma: no cover
__version__ = version("dabmodel")
except PackageNotFoundError: # pragma: no cover
warnings.warn("can not read __version__, assuming local test context, setting it to ?.?.?")
except PackageNotFoundError: # pragma: no cover
warnings.warn(
"can not read __version__, assuming local test context, setting it to ?.?.?"
)
__version__ = "?.?.?"
try: # pragma: no cover
dist = distribution("dabmodel")
__Summuary__ = dist.metadata["Summary"]
except PackageNotFoundError: # pragma: no cover
warnings.warn('can not read dist.metadata["Summary"], assuming local test context, setting it to <dabmodel description>')
warnings.warn(
'can not read dist.metadata["Summary"], assuming local test context, setting it to <dabmodel description>'
)
__Summuary__ = "dabmodel description"
try: # pragma: no cover
dist = distribution("dabmodel")
__Name__ = dist.metadata["Name"]
except PackageNotFoundError: # pragma: no cover
warnings.warn('can not read dist.metadata["Name"], assuming local test context, setting it to <dabmodel>')
warnings.warn(
'can not read dist.metadata["Name"], assuming local test context, setting it to <dabmodel>'
)
__Name__ = "dabmodel"

View File

@@ -0,0 +1,9 @@
from .element import BaseElement
from .meta.appliance import BaseMetaAppliance
class BaseAppliance(BaseElement, metaclass=BaseMetaAppliance):
"""BaseFeature class
Base class for Appliance.
An appliance is a server configuration / image that is built using appliance's code and Fields.
"""

7
src/dabmodel/element.py Normal file
View File

@@ -0,0 +1,7 @@
from .meta.base import BaseMeta
class BaseElement(metaclass=BaseMeta):
"""BaseElement class
Base class to apply metaclass and set common Fields.
"""

103
src/dabmodel/exception.py Normal file
View File

@@ -0,0 +1,103 @@
class DABModelException(Exception):
"""DABModelException Exception class
Base Exception for DABModelException class
"""
class FunctionForbidden(DABModelException): ...
class ExternalCodeForbidden(FunctionForbidden): ...
class ClosureForbidden(FunctionForbidden): ...
class ReservedFieldName(Exception):
"""DABModelException Exception class
Base Exception for DABModelException class
"""
class MultipleInheritanceForbidden(DABModelException):
"""MultipleInheritanceForbidden Exception class
Multiple inheritance is forbidden when using dabmodel
"""
class BrokenInheritance(DABModelException):
"""BrokenInheritance Exception class
inheritance chain is broken
"""
class ReadOnlyField(DABModelException):
"""ReadOnlyField Exception class
The used Field is ReadOnly
"""
class NewFieldForbidden(DABModelException):
"""NewFieldForbidden Exception class
Field creation is forbidden
"""
class InvalidFieldAnnotation(DABModelException):
"""InvalidFieldAnnotation Exception class
The field annotation is invalid
"""
class InvalidInitializerType(DABModelException):
"""InvalidInitializerType Exception class
The initializer is not a valid type
"""
class NotAnnotatedField(InvalidFieldAnnotation):
"""NotAnnotatedField Exception class
The Field is not Annotated
"""
class IncompletelyAnnotatedField(InvalidFieldAnnotation):
"""IncompletelyAnnotatedField Exception class
The field annotation is incomplete
"""
class ReadOnlyFieldAnnotation(DABModelException):
"""ReadOnlyFieldAnnotation Exception class
Field annotation connot be modified
"""
class InvalidFieldValue(DABModelException):
"""InvalidFieldValue Exception class
The Field value is invalid
"""
class NonExistingField(DABModelException):
"""NonExistingField Exception class
The given Field is non existing
"""
class ImportForbidden(DABModelException):
"""ImportForbidden Exception class
Imports are forbidden
"""
class InvalidFeatureInheritance(DABModelException):
"""InvalidFeatureInheritance Exception class
Features of same name in child appliance need to be from same type
"""
class FeatureNotBound(DABModelException):
"""FeatureNotBound Exception class
a Feature must be bound to the appliance (or parent)
"""

12
src/dabmodel/feature.py Normal file
View File

@@ -0,0 +1,12 @@
from .element import BaseElement
from .meta.feature import BaseMetaFeature
class BaseFeature(BaseElement, metaclass=BaseMetaFeature):
"""BaseFeature class
Base class for Appliance's Features.
Features are optional traits of an appliance.
"""
_BoundAppliance: "Optional[Type[BaseAppliance]]" = None
Enabled: bool = False

View File

@@ -0,0 +1,235 @@
from typing import Any, Type
from copy import copy
from typeguard import check_type, CollectionCheckStrategy, TypeCheckError
from ..tools import LAMdeepfreeze
from ..LAMFields.LAMField import LAMField
from ..LAMFields.FrozenLAMField import FrozenLAMField
from .base import BaseMeta
from ..feature import BaseFeature
from ..exception import (
InvalidFieldValue,
InvalidFeatureInheritance,
FeatureNotBound,
)
class BaseMetaAppliance(BaseMeta):
"""BaseMetaAppliance class
Appliance specific metaclass code
"""
@classmethod
def check_class(
mcs: type["meta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
) -> None:
"""
Appliance-specific pre-check: ensure the `features` slot exists in schema.
Copies the parent's `features` mapping when inheriting to keep it per-class.
"""
super().check_class(name, bases, namespace, extensions) # type: ignore[misc]
if "features" not in namespace["__DABSchema__"]:
namespace["__DABSchema__"]["features"] = {}
else:
namespace["__DABSchema__"]["features"] = copy(
namespace["__DABSchema__"]["features"]
)
@classmethod
def process_class_fields(
mcs: type["meta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
extensions: dict[str, Any],
):
"""
Like meta.process_class_fields but also stages Feature declarations.
Initializes:
extensions["new_features"], extensions["modified_features"]
then defers to the base scanner for regular fields.
"""
extensions["new_features"] = {}
extensions["modified_features"] = {}
super().process_class_fields(name, bases, namespace, extensions) # type: ignore[misc]
@classmethod
def process_new_field(
mcs: type["meta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
_fname: str,
_fvalue: Any,
extensions: dict[str, Any],
): # pylint: disable=unused-argument
"""
Intercept Feature declarations.
- If `_fname` already exists in parent's `features`, enforce same type;
stage into `modified_features`.
- Else, if `_fvalue` is a Feature *class*, stage into `new_features`.
- Otherwise, it is a regular field: delegate to meta.process_new_field.
"""
if _fname in namespace["__DABSchema__"]["features"].keys():
if not issubclass(_fvalue, namespace["__DABSchema__"]["features"][_fname]):
raise InvalidFeatureInheritance(
f"Feature {_fname} is not an instance of {bases[0]}.{_fname}"
)
extensions["modified_features"][_fname] = _fvalue
elif isinstance(_fvalue, type) and issubclass(_fvalue, BaseFeature):
extensions["new_features"][_fname] = _fvalue
else:
super().process_new_field(name, bases, namespace, _fname, _fvalue, extensions) # type: ignore[misc]
@classmethod
def commit_fields(
mcs: type["meta"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
):
"""
Commit regular fields (via meta) and then bind staged Feature classes.
For each new/modified feature:
- bind it to `cls` (sets the feature's `_BoundAppliance`),
- register it under `cls.__DABSchema__["features"]`.
"""
super().commit_fields(cls, name, bases, namespace, extensions) # type: ignore[misc]
for _ftname, _ftvalue in extensions["modified_features"].items():
_ftvalue._BoundAppliance = cls # pylint: disable=protected-access
cls.__DABSchema__["features"][_ftname] = _ftvalue
for _ftname, _ftvalue in extensions["new_features"].items():
_ftvalue._BoundAppliance = cls # pylint: disable=protected-access
cls.__DABSchema__["features"][_ftname] = _ftvalue
def finalize_instance(cls: Type, obj, extensions: dict[str, Any]):
"""
Instantiate and attach all features declared (or overridden) in the instance schema.
Handles:
- Declared features (plain class)
- Subclass replacements
- Dict overrides (class + patch dict)
"""
for fname, fdef in obj.__DABSchema__.get("features", {}).items():
# Case 1: plain class or subclass
if isinstance(fdef, type) and issubclass(fdef, BaseFeature):
inst = fdef()
object.__setattr__(obj, fname, inst)
# Case 2: (class, dict) → dict overrides
elif isinstance(fdef, tuple) and len(fdef) == 2:
feat_cls, overrides = fdef
inst = feat_cls()
for field_name, new_val in overrides.items():
if field_name not in feat_cls.__DABSchema__:
raise InvalidFieldValue(
f"Feature '{fname}' has no field '{field_name}'"
)
field = feat_cls.__DABSchema__[field_name]
try:
check_type(
new_val,
field.annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Invalid value for {fname}.{field_name}: "
f"expected {field.annotations}, got {new_val!r}"
) from exp
object.__setattr__(inst, field_name, LAMdeepfreeze(new_val))
inst.__DABSchema__[field_name] = FrozenLAMField(
LAMField(field_name, new_val, field.annotations, field._info)
)
object.__setattr__(obj, fname, inst)
else:
raise InvalidFieldValue(
f"Invalid feature definition stored for '{fname}': {fdef!r}"
)
def apply_overrides(cls, obj, extensions, *args, **kwargs):
"""
Support for runtime field and feature overrides.
Fields:
MyApp(name="foo")
Features:
MyApp(F1=MyF1) # inheritance / replacement
MyApp(F1={"val": 42, ...}) # dict override of existing feature
"""
# --- field overrides (unchanged) ---
for k, v in list(kwargs.items()):
if k in cls.__DABSchema__: # regular field
field = cls.__DABSchema__[k]
try:
check_type(
v,
field.annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Invalid value for field '{k}': expected {field.annotations}, got {v!r}"
) from exp
object.__setattr__(obj, k, LAMdeepfreeze(v))
obj.__DABSchema__[k] = FrozenLAMField(
LAMField(k, v, field.annotations, field._info)
)
kwargs.pop(k)
# --- feature overrides ---
for k, v in list(kwargs.items()):
if k in cls.__DABSchema__.get("features", {}):
base_feat_cls = cls.__DABSchema__["features"][k]
# Case 1: subclass replacement (inheritance)
if isinstance(v, type) and issubclass(v, base_feat_cls):
bound = getattr(v, "_BoundAppliance", None)
if bound is None or not issubclass(cls, bound):
raise FeatureNotBound(
f"Feature {v.__name__} is not bound to {cls.__name__}"
)
# record subclass into instance schema
obj.__DABSchema__["features"][k] = v
kwargs.pop(k)
# Case 2: dict override
elif isinstance(v, dict):
# store (class, override_dict) for finalize_instance
obj.__DABSchema__["features"][k] = (base_feat_cls, v)
kwargs.pop(k)
else:
raise InvalidFieldValue(
f"Feature override for '{k}' must be a Feature subclass or dict, got {type(v)}"
)
# --- new features not declared at class level ---
for k, v in list(kwargs.items()):
if isinstance(v, type) and issubclass(v, BaseFeature):
bound = getattr(v, "_BoundAppliance", None)
if bound is None or not issubclass(cls, bound):
raise FeatureNotBound(
f"Feature {v.__name__} is not bound to {cls.__name__}"
)
obj.__DABSchema__["features"][k] = v
kwargs.pop(k)
if kwargs:
unknown = ", ".join(sorted(kwargs.keys()))
raise InvalidFieldValue(f"Unknown parameters: {unknown}")

749
src/dabmodel/meta/base.py Normal file
View File

@@ -0,0 +1,749 @@
from typing import (
Optional,
TypeVar,
Union,
get_origin,
get_args,
List,
Dict,
Any,
Tuple,
Set,
Annotated,
FrozenSet,
Callable,
Type,
)
from types import UnionType, FunctionType, SimpleNamespace
from copy import deepcopy, copy
import math
import inspect, ast, textwrap
from typeguard import check_type, TypeCheckError, CollectionCheckStrategy
from ..LAMFields.LAMField import LAMField
from ..LAMFields.LAMFieldInfo import LAMFieldInfo
from ..LAMFields.FrozenLAMField import FrozenLAMField
from ..exception import (
MultipleInheritanceForbidden,
BrokenInheritance,
ReadOnlyField,
NewFieldForbidden,
NotAnnotatedField,
ReadOnlyFieldAnnotation,
InvalidFieldValue,
InvalidFieldAnnotation,
IncompletelyAnnotatedField,
ImportForbidden,
FunctionForbidden,
NonExistingField,
InvalidInitializerType,
)
ALLOWED_ANNOTATIONS: dict[str, Any] = {
"Union": Union,
"Optional": Optional,
"List": List,
"Dict": Dict,
"Tuple": Tuple,
"Set": Set,
"FrozenSet": FrozenSet,
"Annotated": Annotated,
# builtins:
"int": int,
"str": str,
"float": float,
"bool": bool,
"complex": complex,
"bytes": bytes,
"None": type(None),
"list": list,
"dict": dict,
"set": set,
"frozenset": frozenset,
"tuple": tuple,
}
ALLOWED_MODEL_FIELDS_TYPES: tuple[type[Any], ...] = (
str,
int,
float,
complex,
bool,
bytes,
)
ALLOWED_HELPERS_MATH = SimpleNamespace(
sqrt=math.sqrt,
floor=math.floor,
ceil=math.ceil,
trunc=math.trunc,
fabs=math.fabs,
copysign=math.copysign,
hypot=math.hypot,
exp=math.exp,
log=math.log,
log10=math.log10,
sin=math.sin,
cos=math.cos,
tan=math.tan,
atan2=math.atan2,
radians=math.radians,
degrees=math.degrees,
)
ALLOWED_HELPERS_DEFAULT: dict[str, object] = {
"math": ALLOWED_HELPERS_MATH,
"print": print,
# Numbers & reducers (pure, deterministic)
"abs": abs,
"round": round,
"min": min,
"max": max,
"sum": sum,
# Introspection-free basics
"len": len,
"sorted": sorted,
# Basic constructors (for copy-on-write patterns)
"tuple": tuple,
"list": list,
"dict": dict,
"set": set,
# Simple casts if they need to normalize types
"int": int,
"float": float,
"str": str,
"bool": bool,
"bytes": bytes,
"complex": complex,
# Easy iteration helpers (optional but handy)
"range": range,
}
def _check_initializer_safety(func) -> None:
"""
Preliminary structural check for __initializer__.
Policy (minimal):
- Forbid 'import' / 'from ... import ...' inside the initializer body.
- Forbid nested function definitions (closures/helpers) in the body.
- Allow lambdas.
- No restrictions on calls here (keep it simple).
- Optionally forbid closures (free vars) for determinism.
"""
try:
src = inspect.getsource(func)
except OSError as exc:
# If source isn't available (rare), fail closed (or skip if you prefer)
raise FunctionForbidden("Cannot inspect __initializer__ source") from exc
src = textwrap.dedent(src)
mod = ast.parse(src)
# Find the FunctionDef node that corresponds to this initializer
init_node = None
for n in mod.body:
if (
isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef))
and n.name == func.__name__
):
init_node = n
break
if init_node is None:
# Fallback: if not found, analyze nothing further to avoid false positives
return
# Walk ONLY the body of the initializer (don't flag the def itself)
body_tree = ast.Module(body=init_node.body, type_ignores=[])
for node in ast.walk(body_tree):
# Forbid imports
if isinstance(node, (ast.Import, ast.ImportFrom)):
raise ImportForbidden("imports disabled in __initializer")
# Forbid nested defs (but allow lambdas)
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
raise FunctionForbidden(
"Nested function definitions are forbidden in __initializer"
)
if isinstance(node, ast.Lambda): # Forbid lambda
raise FunctionForbidden("Lambdas are forbidden in __initializer")
# Optional: forbid closures (keeps determinism; allows lambdas that don't capture)
if func.__code__.co_freevars:
raise FunctionForbidden("Closures are forbidden in __initializer__")
def _blocked_import(*args, **kwargs):
raise ImportForbidden("imports disabled in __initializer")
def _resolve_annotation(ann):
if isinstance(ann, str):
# Safe eval against a **whitelist** only
return eval( # pylint: disable=eval-used
ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS
)
return ann
def _peel_annotated(t: Any) -> Any:
# If you ever allow Annotated[T, ...], peel to T
while True:
origin = get_origin(t)
if origin is None:
return t
name = (
getattr(origin, "__name__", "")
or getattr(origin, "__qualname__", "")
or str(origin)
)
if "Annotated" in name:
args = get_args(t)
t = args[0] if args else t
else:
return t
def _check_annotation_definition( # pylint: disable=too-complex,too-many-return-statements
_type,
) -> bool:
_type = _peel_annotated(_type)
# handle Optional[] and Union[None,...]
if (get_origin(_type) is Union or get_origin(_type) is UnionType) and type(
None
) in get_args(_type):
return all(
_check_annotation_definition(_)
for _ in get_args(_type)
if _ is not type(None)
)
# handle other Union[...]
if get_origin(_type) is Union or get_origin(_type) is UnionType:
return all(_check_annotation_definition(_) for _ in get_args(_type))
# handle Dict[...]
if get_origin(_type) is dict:
inner = get_args(_type)
if len(inner) != 2:
raise IncompletelyAnnotatedField(
f"Dict Annotation requires 2 inner definitions: {_type}"
)
return _peel_annotated(
inner[0]
) in ALLOWED_MODEL_FIELDS_TYPES and _check_annotation_definition(inner[1])
# handle Tuple[]
if get_origin(_type) in [tuple]:
inner_types = get_args(_type)
if len(inner_types) == 0:
raise IncompletelyAnnotatedField(
f"Annotation requires inner definition: {_type}"
)
if len(inner_types) == 2 and inner_types[1] is Ellipsis:
return _check_annotation_definition(inner_types[0])
return all(_check_annotation_definition(_) for _ in inner_types)
# handle Set[],Tuple[],FrozenSet[],List[]
if get_origin(_type) in [set, frozenset, tuple, list]:
inner_types = get_args(_type)
if len(inner_types) == 0:
raise IncompletelyAnnotatedField(
f"Annotation requires inner definition: {_type}"
)
return all(_check_annotation_definition(_) for _ in inner_types)
if _type in ALLOWED_MODEL_FIELDS_TYPES:
return True
return False
class ModelSpecView:
"""ModelSpecView class
A class that will act as fake BaseElement proxy to allow setting values"""
__slots__ = ("_vals", "_types", "_touched", "_name", "_module")
def __init__(
self, values: dict[str, Any], types_map: dict[str, type], name: str, module: str
):
self._name: str
self._vals: dict[str, Any]
self._types: dict[str, type]
self._touched: set
self._module: str
object.__setattr__(self, "_vals", dict(values))
object.__setattr__(self, "_types", types_map)
object.__setattr__(self, "_name", name)
object.__setattr__(self, "_module", module)
@property
def __name__(self) -> str:
"""returns proxified class' name"""
return self._name
@property
def __module__(self) -> str:
"""returns proxified module's name"""
return self._module
@__module__.setter
def __module__(self, value: str):
pass
def __getattr__(self, name: str) -> Any:
"""internal proxy getattr"""
if name not in self._types:
raise AttributeError(f"Unknown field {name}")
return self._vals[name]
def __setattr__(self, name: str, value: Any):
"""internal proxy setattr"""
if name not in self._types:
raise NonExistingField(f"Cannot set unknown field {name}")
T = self._types[name]
try:
check_type(
value,
T,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Field <{name}> value is not of expected type {T}."
) from exp
self._vals[name] = value
def export(self) -> dict:
"""exports all proxified values"""
return dict(self._vals)
T_Meta = TypeVar("T_Meta", bound="BaseMeta")
T_BE = TypeVar("T_BE", bound="BaseElement")
class BaseMeta(type):
"""metaclass to use to build BaseElement"""
modified_fields: Dict[str, Any] = {}
new_fields: Dict[str, LAMField[Any]] = {}
initializer: Optional[Callable[..., Any]] = None
__DABSchema__: dict[str, Any] = {}
@classmethod
def check_class(
mcs: type["BaseMeta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
) -> None:
"""
Early class-build hook.
Validates the inheritance shape, initializes an empty schema for root classes,
copies the parent schema for subclasses, and ensures all annotated fields
have a default (inserting `None` when missing).
This runs before the class object is created.
"""
# print("__NEW__ Defining:", name, "with keys:", list(namespace))
if len(bases) > 1:
raise MultipleInheritanceForbidden(
"Multiple inheritance is not supported by dabmodel"
)
if len(bases) == 0: # base class (BaseElement)
namespace["__DABSchema__"] = {}
else: # standard inheritance
# check class tree origin
if "__DABSchema__" not in dir(bases[0]):
raise BrokenInheritance(
"__DABSchema__ not found in base class, broken inheritance chain."
)
# copy inherited schema
namespace["__DABSchema__"] = copy(bases[0].__DABSchema__)
# force field without default value to be instantiated (with None)
if "__annotations__" in namespace:
for _funknown in [
_ for _ in namespace["__annotations__"] if _ not in namespace.keys()
]:
namespace[_funknown] = None
@classmethod
def process_class_fields( # pylint: disable=too-complex,too-many-branches
mcs: type["BaseMeta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
extensions: dict[str, Any],
):
"""
Scan the class namespace and partition fields.
Detects:
- modified fields (shadowing parent values),
- new fields (present in annotations),
- the optional `__initializer` classmethod (in mangled or unmangled form).
Validates annotations and types and removes processed items from `namespace`
so they won't become normal attributes. Results are staged into:
mcs.new_fields, mcs.modified_fields, mcs.initializer
to be committed later.
"""
# iterating new and modified fields
mcs.modified_fields = {}
mcs.new_fields = {}
mcs.initializer = None
initializer_name: Optional[str] = None
for _fname, _fvalue in namespace.items():
if _fname == f"_{name}__initializer" or (
name.startswith("_") and _fname == "__initializer"
):
if not isinstance(_fvalue, classmethod):
raise InvalidInitializerType()
mcs.initializer = _fvalue.__func__
if name.startswith("_"):
initializer_name = "__initializer"
else:
initializer_name = f"_{name}__initializer"
elif _fname.startswith("_"):
pass
elif isinstance(_fvalue, classmethod):
pass
else:
print(f"Parsing Field: {_fname} / {_fvalue}")
if (
len(bases) == 1 and _fname in namespace["__DABSchema__"].keys()
): # Modified fields
mcs.process_modified_field(
name, bases, namespace, _fname, _fvalue, extensions
)
else: # New fieds
mcs.process_new_field(
name, bases, namespace, _fname, _fvalue, extensions
)
# removing modified fields from class (will add them back later)
for _fname in mcs.new_fields:
del namespace[_fname]
for _fname in mcs.modified_fields:
del namespace[_fname]
if mcs.initializer is not None and initializer_name is not None:
del namespace[initializer_name]
@classmethod
def process_modified_field(
mcs: type["BaseMeta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
_fname: str,
_fvalue: Any,
extensions: dict[str, Any],
): # pylint: disable=unused-argument
"""
Handle a *modified* field declared by a subclass.
Forbids annotation changes, validates the new default value against
the inherited annotation, and stages the new default into `mcs.modified_fields`.
"""
if "__annotations__" in namespace and _fname in namespace["__annotations__"]:
raise ReadOnlyFieldAnnotation(
f"annotations cannot be modified on derived classes {_fname}"
)
try:
check_type(
_fvalue,
namespace["__DABSchema__"][_fname].annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Field <{_fname}> New Field value is not of expected type {bases[0].__annotations__[_fname]}."
) from exp
mcs.modified_fields[_fname] = _fvalue
@classmethod
def process_new_field(
mcs: type["BaseMeta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
_fname: str,
_fvalue: Any,
extensions: dict[str, Any],
): # pylint: disable=unused-argument
"""
Handle a *new* field declared on the class.
Resolves string annotations against a whitelist, validates `Annotated[...]`
payloads (allowing only LAMFieldInfo), checks the default value type,
and stages the field as a `LAMField` in `mcs.new_fields`.
"""
# print(f"New field: {_fname}")
# check if field is annotated
if (
"__annotations__" not in namespace
or _fname not in namespace["__annotations__"]
):
raise NotAnnotatedField(
f"Every dabmodel Fields must be annotated ({_fname})"
)
# check if annotation is allowed
if isinstance(namespace["__annotations__"][_fname], str):
namespace["__annotations__"][_fname] = _resolve_annotation(
namespace["__annotations__"][_fname]
)
if not _check_annotation_definition(namespace["__annotations__"][_fname]):
raise InvalidFieldAnnotation(
f"Field <{_fname}> has not an allowed or valid annotation."
)
_finfo: LAMFieldInfo = LAMFieldInfo()
origin = get_origin(namespace["__annotations__"][_fname])
tname = (
getattr(origin, "__name__", "")
or getattr(origin, "__qualname__", "")
or str(origin)
)
if "Annotated" in tname:
args = get_args(namespace["__annotations__"][_fname])
if args:
if len(args) > 2:
raise InvalidFieldAnnotation(
f"Field <{_fname}> had invalid Annotated value."
)
if len(args) == 2 and not isinstance(args[1], LAMFieldInfo):
raise InvalidFieldAnnotation(
"Only LAMFieldInfo object is allowed as Annotated data."
)
_finfo = args[1]
# check if value is valid
try:
check_type(
_fvalue,
namespace["__annotations__"][_fname],
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
) from exp
mcs.new_fields[_fname] = LAMField(
_fname, _fvalue, namespace["__annotations__"][_fname], _finfo
)
@classmethod
def apply_initializer(
mcs: type["BaseMeta"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
):
"""
Apply the optional `__initializer` classmethod to compute derived defaults.
The initializer runs in a restricted, import-blocked environment using a
`ModelSpecView` proxy that enforces type checking on assignments.
On success, the computed values are validated and written back into the
class schema's DABFields.
"""
if mcs.initializer is not None:
_check_initializer_safety(mcs.initializer)
init_fieldvalues = {}
init_fieldtypes = {}
for _fname, _fvalue in cls.__DABSchema__.items():
if isinstance(_fvalue, LAMField):
init_fieldvalues[_fname] = deepcopy(_fvalue.raw_value)
init_fieldtypes[_fname] = _fvalue.annotations
fakecls = ModelSpecView(
init_fieldvalues, init_fieldtypes, cls.__name__, cls.__module__
)
safe_globals = {
"__builtins__": {"__import__": _blocked_import},
**ALLOWED_HELPERS_DEFAULT,
}
if mcs.initializer.__code__.co_freevars:
raise FunctionForbidden("__initializer must not use closures")
safe_initializer = FunctionType(
mcs.initializer.__code__,
safe_globals,
name=mcs.initializer.__name__,
argdefs=mcs.initializer.__defaults__,
closure=None,
)
safe_initializer(fakecls) # pylint: disable=not-callable
for _fname, _fvalue in fakecls.export().items():
try:
check_type(
_fvalue,
cls.__DABSchema__[_fname].annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
) from exp
cls.__DABSchema__[_fname].update_value(_fvalue)
def __new__(
mcs: type["BaseMeta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
) -> Type:
"""BaseElement new class"""
extensions: dict[str, Any] = {}
mcs.check_class(name, bases, namespace, extensions)
mcs.process_class_fields(name, bases, namespace, extensions)
_cls = super().__new__(mcs, name, bases, namespace)
mcs.commit_fields(_cls, name, bases, namespace, extensions)
mcs.apply_initializer(_cls, name, bases, namespace, extensions)
_cls.install_instance_guard(extensions)
return _cls
@classmethod
def commit_fields(
mcs: type["BaseMeta"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
):
"""
Commit staged fields into the class schema (`__DABSchema__`).
- For modified fields: copy the parent's LAMField, update its value.
- For new fields: set the freshly built LAMField and record its source.
"""
for _fname, _fvalue in mcs.modified_fields.items():
cls.__DABSchema__[_fname] = deepcopy(bases[0].__DABSchema__[_fname])
cls.__DABSchema__[_fname].update_value(_fvalue)
for _fname, _fvalue in mcs.new_fields.items():
_fvalue.add_source(cls)
cls.__DABSchema__[_fname] = _fvalue
def __call__(cls: Type, *args: Any, **kw: Any): # intentionally untyped
"""BaseElement new instance"""
obj = super().__call__(*args)
extensions: dict[str, Any] = {}
cls.populate_instance( # pylint: disable=no-value-for-parameter
obj, extensions, *args, **kw
)
cls.freeze_instance_schema( # pylint: disable=no-value-for-parameter
obj, extensions, *args, **kw
)
cls.apply_overrides( # pylint: disable=no-value-for-parameter
obj, extensions, *args, **kw
)
cls.finalize_instance(obj, extensions) # pylint: disable=no-value-for-parameter
return obj
def populate_instance(
cls: Type, obj: Any, extensions: dict[str, Any], *args: Any, **kw: Any
):
"""
Populate the new instance with field values from the class schema.
Copies each LAMField.value to an instance attribute (deep-frozen view).
"""
for _fname, _fvalue in cls.__DABSchema__.items():
if isinstance(_fvalue, LAMField):
object.__setattr__(obj, _fname, _fvalue.value)
def freeze_instance_schema(
cls: Type, obj: Any, extensions: dict[str, Any], *args: Any, **kw: Any
):
"""
Freeze the instance's schema by wrapping DABFields into FrozenLAMField.
Creates a per-instance `__DABSchema__` dict where each field is read-only.
"""
inst_schema = copy(obj.__DABSchema__)
for _fname, _fvalue in cls.__DABSchema__.items():
if isinstance(_fvalue, LAMField):
inst_schema[_fname] = FrozenLAMField(_fvalue)
if "features" in inst_schema:
inst_schema["features"] = dict(inst_schema["features"])
object.__setattr__(obj, "__DABSchema__", inst_schema)
def apply_overrides(cls, obj, extensions, *args, **kwargs):
"""
Hook for runtime overrides at instance creation.
Invoked after the schema has been frozen but before finalize_instance.
Subclasses of BaseMeta can override this to support things like:
- Field overrides: MyApp(field=value)
- Feature overrides: MyApp(FeatureName=CustomFeature)
- Feature attachments: MyApp(NewFeature=FeatureClass)
By default this does nothing.
"""
def finalize_instance(cls: Type, obj: Any, extensions: dict[str, Any]):
"""
Finalization hook invoked at the end of instance construction.
Subclasses of the metaclass override this to attach runtime components
to the instance. (Example: BaseMetaAppliance instantiates bound Features
and sets them as attributes on the appliance instance.)
"""
def install_instance_guard(cls: Type, extensions: dict[str, Any]):
"""
Install the runtime `__setattr__` guard on the class.
After instances are constructed, prevents:
- creating new public fields,
- reassigning existing fields post-initialization.
Private/dunder attributes are exempt to allow internal bookkeeping.
"""
orig_setattr = getattr(cls, "__setattr__")
# cls.orig_setattr = orig_setattr
def guarded_setattr(_self, key: str, value: Any):
if key.startswith("_"): # allow private and dunder attrs
return orig_setattr(_self, key, value)
# block writes after init if key is readonly
if key in _self.__DABSchema__.keys():
if key in _self.__dict__:
raise ReadOnlyField(f"{key} is read-only")
# elif key in _self.__DABSchema__["features"].keys():
# if key in _self.__dict__:
# raise ReadOnlyField(f"{key} is read-only")
else:
raise NewFieldForbidden("creating new fields is not allowed")
return orig_setattr(_self, key, value)
setattr(cls, "__setattr__", guarded_setattr)

View File

@@ -0,0 +1,19 @@
from typing import Type, Any
from .base import BaseMeta
from ..exception import FeatureNotBound
class BaseMetaFeature(BaseMeta):
"""BaseMetaFeature class
Feature specific metaclass code
"""
def __call__(cls: Type, *args: Any, **kw: Any): # intentionally untyped
"""BaseFeature new instance"""
if cls._BoundAppliance is None:
raise FeatureNotBound()
obj = super().__call__(*args, **kw)
return obj

View File

@@ -1,267 +0,0 @@
from __future__ import annotations
from abc import ABC, ABCMeta, abstractmethod
from uuid import uuid4
from typing import Annotated, ClassVar, Any, Self, TypeVar, TypeAlias, Generic, Union
from datetime import datetime
from copy import deepcopy, copy
from typing_extensions import dataclass_transform, get_origin
from pydantic import (
ConfigDict,
BaseModel,
StrictInt,
StrictStr,
constr,
ByteSize,
AwareDatetime,
UUID4,
model_validator,
field_validator,
field_serializer,
SerializeAsAny,
)
from pydantic.fields import Field, _Unset, PydanticUndefined
from pydantic._internal._model_construction import ModelMetaclass, PydanticModelField
from pydantic._internal._generics import PydanticGenericMetadata
from pydantic._internal._decorators import ensure_classmethod_based_on_signature
import pytz
import inspect
from runtype import issubclass as runtype_issubclass
class NoInstanceMethod:
"""Descriptor to forbid that other descriptors can be looked up on an instance"""
def __init__(self, descr, name=None):
self.descr = descr
self.name = name
def __set_name__(self, owner, name):
self.name = name
def __get__(self, instance, owner):
# enforce the instance cannot look up the attribute at all
if instance is not None:
raise AttributeError(f"{type(instance).__name__!r} has no attribute {self.name!r}")
# invoke any descriptor we are wrapping
return self.descr.__get__(instance, owner)
def DABField(default: Any = PydanticUndefined, *, final: bool | None = _Unset, finalize_only: bool | None = _Unset, **kwargs):
return Field(default, **kwargs)
T_BaseElement = TypeVar("T_BaseElement", bound="BaseElement")
T_BaseElement_ConfigMethod_Arg: TypeAlias = dict[str, Any]
T_BaseElement_ConfigMethod: TypeAlias = "classmethod[T_BaseElement, [T_BaseElement_ConfigMethod_Arg], T_BaseElement_ConfigMethod_Arg]"
class ConfigElement:
def __init__(self) -> None:
self.default_values_override_methods: dict[T_BaseElement_ConfigMethod, None] = {}
self.main_build_method: dict[T_BaseElement_ConfigMethod, None] = {}
def __copy__(self) -> Self:
# we cannot deepcopy because of classmethods, so we do a manual enhanced copy
cls = self.__class__
result = cls.__new__(cls)
for k, v in self.__dict__.items():
setattr(result, k, copy(v))
return result
class IBaseElement(BaseModel, ABC):
_config_element: ClassVar[ConfigElement] = ConfigElement()
@dataclass_transform(kw_only_default=True, field_specifiers=(PydanticModelField,))
class BaseElementMeta(ModelMetaclass, ABCMeta):
def __new__(
mcs,
cls_name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
__pydantic_generic_metadata__: PydanticGenericMetadata | None = None,
__pydantic_reset_parent_namespace__: bool = True,
_create_model_module: str | None = None,
**kwargs: Any,
) -> type:
result = super().__new__(
mcs,
cls_name,
bases,
namespace,
__pydantic_generic_metadata__,
__pydantic_reset_parent_namespace__,
_create_model_module,
**kwargs,
)
print(cls_name)
assert issubclass(result, IBaseElement), "Only IBaseElement subclasses are supported"
# forcing all Fields to be frozen
for _, field_val in result.model_fields.items():
field_val.frozen = True
# copying/forwarding base classes default-configs
if "_config_element" not in result.__dict__:
assert result.__base__ is not None, "Only IBaseElement subclasses are supported"
if issubclass(result.__base__, IBaseElement):
result._config_element = copy(result.__base__._config_element)
else:
result._config_element = ConfigElement()
# searching and storing current class default-configs
for _, method in result.__dict__.items():
if isinstance(method, classmethod):
if hasattr(method, "default_values_override"):
result._config_element.default_values_override_methods[method] = None
# todo: find a way to 'lock' and add restriction to a field after inheritance
return result
class BaseElement(
IBaseElement,
ABC,
validate_assignment=True,
# revalidate_instances="subclass-instances", # pydantic issue #10681
validate_default=True,
extra="forbid",
metaclass=BaseElementMeta,
):
class Config:
ignored_types = (NoInstanceMethod,)
template_id: Annotated[UUID4, DABField(..., repr=True)]
template_short_name: Annotated[
StrictStr, constr(strip_whitespace=True, to_lower=True, strict=True, max_length=16), DABField(..., repr=True)
]
template_long_name: Annotated[StrictStr | None, DABField()]
template_description: Annotated[StrictStr | None, DABField()]
_saved_default_value: ClassVar[dict[str, Any]]
@model_validator(mode="before")
@classmethod
def __default_values_override_hook__(cls, values: T_BaseElement_ConfigMethod_Arg) -> T_BaseElement_ConfigMethod_Arg:
# extracting default values that were set in model fields
cls._saved_default_value = dict()
for field_key, field_val in cls.model_fields.items():
assert field_val.annotation is not None, "all fields must have annotation"
assert not runtype_issubclass(
field_val.annotation, BaseFeature
), "Features can only be in Appliance's features[] dict attribute"
"""
if field_key == "features":
cls._saved_default_value[field_key] = dict()
for feat_key, feat_value in field_val.items():
cls._saved_default_value[field_key][feat_key] = feat_value.dict()
"""
if field_val.default != PydanticUndefined:
cls._saved_default_value[field_key] = deepcopy(field_val.default)
for method, _ in cls._config_element.default_values_override_methods.items():
method.__func__(cls, cls._saved_default_value)
cls._default_values_override_hook__input_apply__(values)
return cls._saved_default_value
@classmethod
@abstractmethod
def _default_values_override_hook__input_apply__(
cls,
values: T_BaseElement_ConfigMethod_Arg,
): ...
class BaseFeature(BaseElement, ABC):
@NoInstanceMethod
@classmethod
def _default_values_override_hook__input_apply__(
cls,
values: T_BaseElement_ConfigMethod_Arg,
):
print(f"BaseFeature._default_values_override_hook__input_apply__ {cls}")
# applying user-defined values
for attr_key, attr_val in values.items():
assert attr_key in cls.model_fields, f"given feature attribute does not exist ({attr_key})"
cls._saved_default_value[attr_key] = attr_val
print(f"BaseFeature._default_values_override_hook__input_apply__ {cls} DONE")
def default_values_override(func: T_BaseElement_ConfigMethod) -> T_BaseElement_ConfigMethod:
func = ensure_classmethod_based_on_signature(func)
setattr(func, "default_values_override", lambda: True)
return func
T_Feature = TypeVar("T_Feature", bound=BaseFeature)
def get_discriminator_value(v: Any) -> str:
if isinstance(v, dict):
return v.get("fruit", v.get("filling"))
return getattr(v, "fruit", getattr(v, "filling", None))
class BaseAppliance(Generic[T_Feature], BaseElement, ABC):
cpu_cnt: Annotated[StrictInt, DABField(1, gt=0)]
ram_size: Annotated[ByteSize, DABField(256, gt=128)]
swap_size: Annotated[ByteSize, DABField(200, ge=0)]
rootfs_size: Annotated[ByteSize, DABField(2048, ge=2048)]
dabinst_id: Annotated[UUID4, DABField(uuid4(), repr=True)]
dabinst_short_name: Annotated[
StrictStr, constr(strip_whitespace=True, to_lower=True, strict=True, max_length=16), DABField(..., repr=True)
]
dabinst_long_name: Annotated[StrictStr | None, DABField("")]
dabinst_description: Annotated[StrictStr | None, DABField("")]
dabinst_creationdate: Annotated[AwareDatetime | None, DABField(datetime.now(tz=pytz.utc))]
features: SerializeAsAny[dict[str, T_Feature]] = DABField({})
@NoInstanceMethod
@classmethod
def add_feature(cls, feat: T_Feature):
cls._saved_default_value["features"][type(feat).__name__] = feat.dict()
@NoInstanceMethod
@classmethod
def del_feature(cls, type_feat: type[T_Feature]):
del cls._saved_default_value["features"][type_feat.__name__]
@NoInstanceMethod
@classmethod
def get_feature(cls, type_feat: type[T_Feature]) -> T_Feature:
return cls._saved_default_value["features"][type_feat.__name__]
@NoInstanceMethod
@classmethod
def _default_values_override_hook__input_apply__(
cls,
values: T_BaseElement_ConfigMethod_Arg,
):
print(f"BaseAppliance._default_values_override_hook__input_apply__ {cls}")
# applying user-defined values
for attr_key, attr_val in values.items():
if attr_key == "features":
if cls._saved_default_value["features"] is None:
cls._saved_default_value["features"] = {}
for feature_key, feature_val in attr_val.items():
print(f"searching feature: {feature_key}")
assert hasattr(cls, feature_key), f"feature not found ({feature_key})"
cls_feature = getattr(cls, feature_key)
assert (
cls_feature is not None and inspect.isclass(cls_feature) and issubclass(cls_feature, BaseFeature),
"The requested feature does not exist in the current Appliance class tree",
)
cls._saved_default_value["features"][feature_key] = cls_feature(**feature_val)
else:
assert attr_key in cls.model_fields, f"given attribute does not exist ({attr_key})"
cls._saved_default_value[attr_key] = attr_val
print(f"BaseAppliance._default_values_override_hook__input_apply__ {cls} DONE")

32
src/dabmodel/tools.py Normal file
View File

@@ -0,0 +1,32 @@
"""library's internal tools"""
from uuid import UUID
from datetime import datetime
import json
from frozendict import deepfreeze
class LAMJSONEncoder(json.JSONEncoder):
"""allows to JSON encode non supported data type"""
def default(self, o):
if isinstance(o, UUID):
# if the o is uuid, we simply return the value of uuid
return o.hex
if isinstance(o, datetime):
return str(o)
return json.JSONEncoder.default(self, o)
def LAMdeepfreeze(value):
"""recursive freeze helper function"""
if isinstance(value, dict):
return deepfreeze(value)
if isinstance(value, set):
return frozenset(LAMdeepfreeze(v) for v in value)
if isinstance(value, list):
return tuple(LAMdeepfreeze(v) for v in value)
if isinstance(value, tuple):
return tuple(LAMdeepfreeze(v) for v in value)
return value

View File

@@ -1,37 +0,0 @@
from pydantic import BaseModel, SerializeAsAny
class commonbase(
BaseModel,
revalidate_instances="subclass-instances", # toogle to generate error
): ...
class basechild(commonbase):
test_val: int = 1
class derivedchild(basechild):
test_val2: int = 2
class container(commonbase):
ct_child_1: dict[str, basechild] = {}
ct_child_2: SerializeAsAny[dict[str, basechild]] = {}
ct_child_3: dict[str, SerializeAsAny[basechild]] = {}
if __name__ == "__main__":
test_val = container(
ct_child_1={"test1": derivedchild()},
ct_child_2={"test2": derivedchild()},
ct_child_3={"test3": derivedchild()},
)
print(test_val.model_dump_json(indent=1))
print(test_val.model_dump())
assert "test_val2" not in test_val.model_dump()["ct_child_1"]["test1"]
assert "test_val2" in test_val.model_dump()["ct_child_2"]["test2"]
assert "test_val2" in test_val.model_dump()["ct_child_3"]["test3"]

File diff suppressed because it is too large Load Diff