Compare commits

...

11 Commits

Author SHA1 Message Date
cclecle
95b0c298ce update deps 2025-09-05 23:00:36 +02:00
cclecle
04a4cf7b36 add deps 2025-09-05 22:56:17 +02:00
cclecle
f42a839cff work 2025-09-05 22:53:47 +02:00
cclecle
7f3a4ef545 work 2025-09-03 23:15:04 +02:00
cclecle
608c8a1010 work 2025-09-03 01:14:09 +02:00
cclecle
210781f086 new again 2025-09-02 19:18:31 +02:00
cclecle
df966ccac4 Merge branch 'dev' of https://chacha.ddns.net/gitea/chacha/dabmodel.git
into dev
2025-09-01 00:07:22 +02:00
chacha
29827b51bc add chatgpt code :) 2025-08-31 22:19:58 +02:00
chacha
7440731135 dev 2025-08-31 21:23:03 +02:00
cclecle
87682c2c9c cleaning a little bit... 2025-01-08 18:00:33 +01:00
chacha
0eef35e36f continue work 2024-12-08 01:04:27 +01:00
7 changed files with 1699 additions and 484 deletions

View File

@@ -1,13 +1,20 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?eclipse-pydev version="1.0"?><pydev_project>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python interpreter</pydev_property>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
<path>/${PROJECT_DIR_NAME}/src</path>
<path>/${PROJECT_DIR_NAME}</path>
</pydev_pathproperty>
</pydev_project>
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?eclipse-pydev version="1.0"?><pydev_project>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Python311</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python interpreter</pydev_property>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
<path>/${PROJECT_DIR_NAME}/src</path>
<path>/${PROJECT_DIR_NAME}</path>
</pydev_pathproperty>
</pydev_project>

View File

@@ -34,7 +34,9 @@ classifiers = [
]
dependencies = [
'importlib-metadata; python_version<"3.9"',
'packaging'
'packaging',
'frozendict',
'typeguard'
]
dynamic = ["version"]
@@ -78,7 +80,7 @@ test = ["chacha_cicd_helper"]
coverage-check = ["chacha_cicd_helper"]
complexity-check = ["chacha_cicd_helper"]
quality-check = ["chacha_cicd_helper"]
type-check = ["chacha_cicd_helper"]
type-check = ["chacha_cicd_helper","types-pytz"]
doc-gen = ["chacha_cicd_helper"]
# [project.scripts]

View File

@@ -11,4 +11,20 @@ Main module __init__ file.
"""
from .__metadata__ import __version__, __Summuary__, __Name__
from .model import DABField, BaseFeature, BaseAppliance, default_values_override
from .model import (
DABFieldInfo,
BaseAppliance,
BaseFeature,
DABModelException,
MultipleInheritanceForbidden,
BrokenInheritance,
ReadOnlyField,
NewFieldForbidden,
NotAnnotatedField,
ReadOnlyFieldAnnotation,
InvalidFieldValue,
InvalidFieldAnnotation,
IncompletelyAnnotatedField,
ImportForbidden,
FunctionForbidden,
)

View File

@@ -1,267 +1,523 @@
from __future__ import annotations
from abc import ABC, ABCMeta, abstractmethod
from uuid import uuid4
from typing import Annotated, ClassVar, Any, Self, TypeVar, TypeAlias, Generic, Union
from datetime import datetime
from typing import Optional, TypeVar, Generic, Union, get_origin, get_args, List, Dict, Any, Tuple, Set, Annotated, FrozenSet
from types import UnionType, FunctionType, SimpleNamespace
from frozendict import deepfreeze
from copy import deepcopy, copy
from typing_extensions import dataclass_transform, get_origin
from pydantic import (
ConfigDict,
BaseModel,
StrictInt,
StrictStr,
constr,
ByteSize,
AwareDatetime,
UUID4,
model_validator,
field_validator,
field_serializer,
SerializeAsAny,
from pprint import pprint
from typeguard import check_type, TypeCheckError, CollectionCheckStrategy
import math
ALLOWED_ANNOTATIONS = {
"Union": Union,
"Optional": Optional,
"List": List,
"Dict": Dict,
"Tuple": Tuple,
"Set": Set,
"FrozenSet": FrozenSet,
"Annotated": Annotated,
# builtins:
"int": int,
"str": str,
"float": float,
"bool": bool,
"complex": complex,
"bytes": bytes,
"None": type(None),
"list": list,
"dict": dict,
"set": set,
"frozenset": frozenset,
"tuple": tuple,
}
ALLOWED_MODEL_FIELDS_TYPES = (str, int, float, complex, bool, bytes)
ALLOWED_MODEL_FIELDS_CONTAINERS = (dict, list, set, frozenset, tuple)
TV_ALLOWED_MODEL_FIELDS_TYPES = TypeVar("TV_ALLOWED_MODEL_FIELDS_TYPES", *ALLOWED_MODEL_FIELDS_TYPES, *ALLOWED_MODEL_FIELDS_CONTAINERS)
class DABModelException(Exception): ...
class MultipleInheritanceForbidden(DABModelException): ...
class BrokenInheritance(DABModelException): ...
class ReadOnlyField(DABModelException): ...
class NewFieldForbidden(DABModelException): ...
class InvalidFieldAnnotation(DABModelException): ...
class InvalidInitializerType(DABModelException): ...
class NotAnnotatedField(InvalidFieldAnnotation): ...
class IncompletelyAnnotatedField(InvalidFieldAnnotation): ...
class ReadOnlyFieldAnnotation(DABModelException): ...
class InvalidFieldValue(DABModelException): ...
class NonExistingField(DABModelException): ...
class ImportForbidden(DABModelException): ...
class FunctionForbidden(DABModelException): ...
ALLOWED_HELPERS_MATH = SimpleNamespace(
sqrt=math.sqrt,
floor=math.floor,
ceil=math.ceil,
trunc=math.trunc,
fabs=math.fabs,
copysign=math.copysign,
hypot=math.hypot,
exp=math.exp,
log=math.log,
log10=math.log10,
sin=math.sin,
cos=math.cos,
tan=math.tan,
atan2=math.atan2,
radians=math.radians,
degrees=math.degrees,
)
from pydantic.fields import Field, _Unset, PydanticUndefined
from pydantic._internal._model_construction import ModelMetaclass, PydanticModelField
from pydantic._internal._generics import PydanticGenericMetadata
from pydantic._internal._decorators import ensure_classmethod_based_on_signature
import pytz
import inspect
from runtype import issubclass as runtype_issubclass
ALLOWED_HELPERS_DEFAULT = {
"math": ALLOWED_HELPERS_MATH,
"print": print,
# Numbers & reducers (pure, deterministic)
"abs": abs,
"round": round,
"min": min,
"max": max,
"sum": sum,
# Introspection-free basics
"len": len,
"sorted": sorted,
# Basic constructors (for copy-on-write patterns)
"tuple": tuple,
"list": list,
"dict": dict,
"set": set,
# Simple casts if they need to normalize types
"int": int,
"float": float,
"str": str,
"bool": bool,
"bytes": bytes,
"complex": complex,
# Easy iteration helpers (optional but handy)
"range": range,
}
class NoInstanceMethod:
"""Descriptor to forbid that other descriptors can be looked up on an instance"""
def __init__(self, descr, name=None):
self.descr = descr
self.name = name
def __set_name__(self, owner, name):
self.name = name
def __get__(self, instance, owner):
# enforce the instance cannot look up the attribute at all
if instance is not None:
raise AttributeError(f"{type(instance).__name__!r} has no attribute {self.name!r}")
# invoke any descriptor we are wrapping
return self.descr.__get__(instance, owner)
def _blocked_import(*args, **kwargs):
raise ImportForbidden("imports disabled in __initializer")
def DABField(default: Any = PydanticUndefined, *, final: bool | None = _Unset, finalize_only: bool | None = _Unset, **kwargs):
return Field(default, **kwargs)
def _resolve_annotation(ann):
if isinstance(ann, str):
# Safe eval against a **whitelist** only
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS)
return ann
T_BaseElement = TypeVar("T_BaseElement", bound="BaseElement")
T_BaseElement_ConfigMethod_Arg: TypeAlias = dict[str, Any]
T_BaseElement_ConfigMethod: TypeAlias = "classmethod[T_BaseElement, [T_BaseElement_ConfigMethod_Arg], T_BaseElement_ConfigMethod_Arg]"
def _peel_annotated(t: Any) -> Any:
# If you ever allow Annotated[T, ...], peel to T
while True:
origin = get_origin(t)
if origin is None:
return t
name = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
if "Annotated" in name:
args = get_args(t)
t = args[0] if args else t
else:
return t
class ConfigElement:
def __init__(self) -> None:
self.default_values_override_methods: dict[T_BaseElement_ConfigMethod, None] = {}
self.main_build_method: dict[T_BaseElement_ConfigMethod, None] = {}
def _check_annotation_definition(_type) -> bool:
_type = _peel_annotated(_type)
# handle Optional[] and Union[None,...]
if (get_origin(_type) is Union or get_origin(_type) is UnionType) and type(None) in get_args(_type):
return all([_check_annotation_definition(_) for _ in get_args(_type) if _ is not type(None)])
def __copy__(self) -> Self:
# we cannot deepcopy because of classmethods, so we do a manual enhanced copy
cls = self.__class__
result = cls.__new__(cls)
for k, v in self.__dict__.items():
setattr(result, k, copy(v))
return result
# handle other Union[...]
if get_origin(_type) is Union or get_origin(_type) is UnionType:
return all([_check_annotation_definition(_) for _ in get_args(_type)])
# handle Dict[...]
if get_origin(_type) is dict:
inner = get_args(_type)
if len(inner) != 2:
raise IncompletelyAnnotatedField(f"Dict Annotation requires 2 inner definitions: {_type}")
return _peel_annotated(inner[0]) in ALLOWED_MODEL_FIELDS_TYPES and _check_annotation_definition(inner[1])
# handle Tuple[]
if get_origin(_type) in [tuple]:
inner_types = get_args(_type)
if len(inner_types) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
if len(inner_types) == 2 and inner_types[1] is Ellipsis:
return _check_annotation_definition(inner_types[0])
return all([_check_annotation_definition(_) for _ in inner_types])
# handle Set[],Tuple[],FrozenSet[],List[]
if get_origin(_type) in [set, frozenset, tuple, list]:
inner_types = get_args(_type)
if len(inner_types) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
return all([_check_annotation_definition(_) for _ in inner_types])
if _type in ALLOWED_MODEL_FIELDS_TYPES:
return True
return False
class IBaseElement(BaseModel, ABC):
_config_element: ClassVar[ConfigElement] = ConfigElement()
class BaseConstraint(Generic[TV_ALLOWED_MODEL_FIELDS_TYPES]):
_bound_type: type
def __init__(self): ...
def check(self, value: TV_ALLOWED_MODEL_FIELDS_TYPES) -> bool: ...
@dataclass_transform(kw_only_default=True, field_specifiers=(PydanticModelField,))
class BaseElementMeta(ModelMetaclass, ABCMeta):
def __new__(
mcs,
cls_name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
__pydantic_generic_metadata__: PydanticGenericMetadata | None = None,
__pydantic_reset_parent_namespace__: bool = True,
_create_model_module: str | None = None,
**kwargs: Any,
) -> type:
result = super().__new__(
mcs,
cls_name,
bases,
namespace,
__pydantic_generic_metadata__,
__pydantic_reset_parent_namespace__,
_create_model_module,
**kwargs,
)
print(cls_name)
def _deepfreeze(value):
if isinstance(value, dict):
return deepfreeze(value)
elif isinstance(value, set):
return frozenset(_deepfreeze(v) for v in value)
elif isinstance(value, list):
return tuple(_deepfreeze(v) for v in value)
elif isinstance(value, tuple):
return tuple(_deepfreeze(v) for v in value)
return value
assert issubclass(result, IBaseElement), "Only IBaseElement subclasses are supported"
# forcing all Fields to be frozen
for _, field_val in result.model_fields.items():
field_val.frozen = True
class DABFieldInfo:
def __init__(self, *, doc: str = "", constraints: list[BaseConstraint] = []):
self._doc: str = doc
self._constraints: list[BaseConstraint] = constraints
# copying/forwarding base classes default-configs
if "_config_element" not in result.__dict__:
assert result.__base__ is not None, "Only IBaseElement subclasses are supported"
if issubclass(result.__base__, IBaseElement):
result._config_element = copy(result.__base__._config_element)
@property
def doc(self):
return self._doc
@property
def constraints(self):
return self._constraints
class DABField(Generic[TV_ALLOWED_MODEL_FIELDS_TYPES]):
def __init__(self, name: str, v: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES], a: Any, i: str):
self._name: str = name
self._source: Optional[type] = None
self._default_value: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = v
self._value: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = v
self._annotations: Any = a
self._info: DABFieldInfo = i
self._constraints: List[BaseConstraint] = i.constraints
def add_source(self, s: type) -> None:
self._source = s
@property
def doc(self):
return self._info.doc
def add_constraint(self, c: BaseConstraint) -> None:
self._constraints.append(c)
@property
def constraints(self) -> list[BaseConstraint]:
return self._info.constraints
@property
def default_value(self):
return _deepfreeze(self._default_value)
def update_value(self, v: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = None) -> None:
self._value = v
@property
def value(self):
return _deepfreeze(self._value)
@property
def raw_value(self):
return self._value
@property
def annotations(self) -> Any:
return self._annotations
class FrozenDABField(Generic[TV_ALLOWED_MODEL_FIELDS_TYPES]):
def __init__(self, inner_field: DABField):
self._inner_field = inner_field
@property
def doc(self):
return self._inner_field.doc
@property
def constraints(self):
return _deepfreeze(self._inner_field.constraints)
@property
def default_value(self):
return self._inner_field.default_value
@property
def value(self):
return self._inner_field.value
@property
def annotations(self) -> Any:
return _deepfreeze(self._inner_field.annotations)
class ModelSpecView:
__slots__ = ("_vals", "_types", "_touched", "_name", "_module")
def __init__(self, values: dict, types_map: dict[str, type], name, module):
object.__setattr__(self, "_vals", dict(values))
object.__setattr__(self, "_types", types_map)
object.__setattr__(self, "_touched", set())
object.__setattr__(self, "_name", name)
object.__setattr__(self, "_module", module)
@property
def __name__(self) -> str:
return self._name
@property
def __module__(self) -> str:
return self._module
def __getattr__(self, name):
if name not in self._types:
raise AttributeError(f"Unknown field {name}")
return self._vals[name]
def __setattr__(self, name, value):
if name not in self._types:
raise NonExistingField(f"Cannot set unknown field {name}")
T = self._types[name]
try:
check_type(
value,
T,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(f"Field <{name}> value is not of expected type {T}.") from exp
self._vals[name] = value
self._touched.add(name)
def export(self) -> dict:
return dict(self._vals)
def diff(self) -> dict:
return {k: self._vals[k] for k in self._touched}
class BaseMeta(type):
def __new__(mcls, name, bases, namespace):
# print("__NEW__ Defining:", name, "with keys:", list(namespace))
if len(bases) > 1:
raise MultipleInheritanceForbidden("Multiple inheritance is not supported by dabmodel")
elif len(bases) == 0: # base class (BaseElement)
namespace["__DABSchema__"] = dict()
else: # standard inheritance
# check class tree origin
if "__DABSchema__" not in dir(bases[0]):
raise BrokenInheritance("__DABSchema__ not found in base class, broken inheritance chain.")
# copy inherited schema
namespace["__DABSchema__"] = copy(bases[0].__DABSchema__)
# force field without default value to be instantiated (with None)
if "__annotations__" in namespace:
for _funknown in [_ for _ in namespace["__annotations__"] if _ not in namespace.keys()]:
namespace[_funknown] = None
# iterating new and modified fields
modified_field: Dict[str, Any] = {}
new_fields: Dict[str, DABField] = {}
initializer: Optional[type] = None
initializer_name: Optional[str] = None
for _fname, _fvalue in namespace.items():
if _fname == f"_{name}__initializer" or (name.startswith("_") and _fname == "__initializer"):
if not isinstance(_fvalue, classmethod):
raise InvalidInitializerType()
initializer = _fvalue.__func__
if name.startswith("_"):
initializer_name = "__initializer"
else:
initializer_name = f"_{name}__initializer"
elif _fname.startswith("__"):
pass
else:
result._config_element = ConfigElement()
# print(f"Parsing Field: {_fname} / {_fvalue}")
# Modified fields
if len(bases) == 1 and _fname in namespace["__DABSchema__"].keys():
# print(f"Modified field: {_fname}")
if "__annotations__" in namespace and _fname in namespace["__annotations__"]:
raise ReadOnlyFieldAnnotation("annotations cannot be modified on derived classes")
try:
check_type(
_fvalue,
namespace["__DABSchema__"][_fname].annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Field <{_fname}> New Field value is not of expected type {bases[0].__annotations__[_fname]}."
) from exp
modified_field[_fname] = _fvalue
# New fieds
else:
# print(f"New field: {_fname}")
# print(f"type is: {type(_fvalue)}")
# print(f"value is: {_fvalue}")
# searching and storing current class default-configs
for _, method in result.__dict__.items():
if isinstance(method, classmethod):
if hasattr(method, "default_values_override"):
result._config_element.default_values_override_methods[method] = None
# check if field is annotated
if "__annotations__" not in namespace or _fname not in namespace["__annotations__"]:
raise NotAnnotatedField(f"Every dabmodel Fields must be annotated ({_fname})")
# todo: find a way to 'lock' and add restriction to a field after inheritance
# check if annotation is allowed
if isinstance(namespace["__annotations__"][_fname], str):
namespace["__annotations__"][_fname] = _resolve_annotation(namespace["__annotations__"][_fname])
return result
if not _check_annotation_definition(namespace["__annotations__"][_fname]):
raise InvalidFieldAnnotation(f"Field <{_fname}> has not an allowed or valid annotation.")
_finfo: Optional[DABFieldInfo] = DABFieldInfo()
origin = get_origin(namespace["__annotations__"][_fname])
tname = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
if "Annotated" in tname:
args = get_args(namespace["__annotations__"][_fname])
if args:
if len(args) > 2:
raise InvalidFieldAnnotation(f"Field <{_fname}> had invalid Annotated value.")
if len(args) == 2 and not isinstance(args[1], DABFieldInfo):
raise InvalidFieldAnnotation(f"Only DABFieldInfo object is allowed as Annotated data.")
class BaseElement(
IBaseElement,
ABC,
validate_assignment=True,
# revalidate_instances="subclass-instances", # pydantic issue #10681
validate_default=True,
extra="forbid",
metaclass=BaseElementMeta,
):
class Config:
ignored_types = (NoInstanceMethod,)
_finfo = args[1]
template_id: Annotated[UUID4, DABField(..., repr=True)]
template_short_name: Annotated[
StrictStr, constr(strip_whitespace=True, to_lower=True, strict=True, max_length=16), DABField(..., repr=True)
]
template_long_name: Annotated[StrictStr | None, DABField()]
template_description: Annotated[StrictStr | None, DABField()]
_saved_default_value: ClassVar[dict[str, Any]]
# print(f"annotation is: {namespace['__annotations__'][_fname]}")
# check if value is valid
try:
check_type(
_fvalue, namespace["__annotations__"][_fname], collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
) from exp
new_fields[_fname] = DABField(_fname, _fvalue, namespace["__annotations__"][_fname], _finfo)
@model_validator(mode="before")
@classmethod
def __default_values_override_hook__(cls, values: T_BaseElement_ConfigMethod_Arg) -> T_BaseElement_ConfigMethod_Arg:
# extracting default values that were set in model fields
cls._saved_default_value = dict()
for field_key, field_val in cls.model_fields.items():
assert field_val.annotation is not None, "all fields must have annotation"
assert not runtype_issubclass(
field_val.annotation, BaseFeature
), "Features can only be in Appliance's features[] dict attribute"
"""
if field_key == "features":
cls._saved_default_value[field_key] = dict()
for feat_key, feat_value in field_val.items():
cls._saved_default_value[field_key][feat_key] = feat_value.dict()
"""
if field_val.default != PydanticUndefined:
cls._saved_default_value[field_key] = deepcopy(field_val.default)
for method, _ in cls._config_element.default_values_override_methods.items():
method.__func__(cls, cls._saved_default_value)
# removing modified fields from class (will add them back later)
for _fname in new_fields.keys():
del namespace[_fname]
for _fname in modified_field.keys():
del namespace[_fname]
if initializer is not None:
del namespace[initializer_name]
cls._default_values_override_hook__input_apply__(values)
orig_setattr = namespace.get("__setattr__", object.__setattr__)
return cls._saved_default_value
@classmethod
@abstractmethod
def _default_values_override_hook__input_apply__(
cls,
values: T_BaseElement_ConfigMethod_Arg,
): ...
class BaseFeature(BaseElement, ABC):
@NoInstanceMethod
@classmethod
def _default_values_override_hook__input_apply__(
cls,
values: T_BaseElement_ConfigMethod_Arg,
):
print(f"BaseFeature._default_values_override_hook__input_apply__ {cls}")
# applying user-defined values
for attr_key, attr_val in values.items():
assert attr_key in cls.model_fields, f"given feature attribute does not exist ({attr_key})"
cls._saved_default_value[attr_key] = attr_val
print(f"BaseFeature._default_values_override_hook__input_apply__ {cls} DONE")
def default_values_override(func: T_BaseElement_ConfigMethod) -> T_BaseElement_ConfigMethod:
func = ensure_classmethod_based_on_signature(func)
setattr(func, "default_values_override", lambda: True)
return func
T_Feature = TypeVar("T_Feature", bound=BaseFeature)
def get_discriminator_value(v: Any) -> str:
if isinstance(v, dict):
return v.get("fruit", v.get("filling"))
return getattr(v, "fruit", getattr(v, "filling", None))
class BaseAppliance(Generic[T_Feature], BaseElement, ABC):
cpu_cnt: Annotated[StrictInt, DABField(1, gt=0)]
ram_size: Annotated[ByteSize, DABField(256, gt=128)]
swap_size: Annotated[ByteSize, DABField(200, ge=0)]
rootfs_size: Annotated[ByteSize, DABField(2048, ge=2048)]
dabinst_id: Annotated[UUID4, DABField(uuid4(), repr=True)]
dabinst_short_name: Annotated[
StrictStr, constr(strip_whitespace=True, to_lower=True, strict=True, max_length=16), DABField(..., repr=True)
]
dabinst_long_name: Annotated[StrictStr | None, DABField("")]
dabinst_description: Annotated[StrictStr | None, DABField("")]
dabinst_creationdate: Annotated[AwareDatetime | None, DABField(datetime.now(tz=pytz.utc))]
features: SerializeAsAny[dict[str, T_Feature]] = DABField({})
@NoInstanceMethod
@classmethod
def add_feature(cls, feat: T_Feature):
cls._saved_default_value["features"][type(feat).__name__] = feat.dict()
@NoInstanceMethod
@classmethod
def del_feature(cls, type_feat: type[T_Feature]):
del cls._saved_default_value["features"][type_feat.__name__]
@NoInstanceMethod
@classmethod
def get_feature(cls, type_feat: type[T_Feature]) -> T_Feature:
return cls._saved_default_value["features"][type_feat.__name__]
@NoInstanceMethod
@classmethod
def _default_values_override_hook__input_apply__(
cls,
values: T_BaseElement_ConfigMethod_Arg,
):
print(f"BaseAppliance._default_values_override_hook__input_apply__ {cls}")
# applying user-defined values
for attr_key, attr_val in values.items():
if attr_key == "features":
if cls._saved_default_value["features"] is None:
cls._saved_default_value["features"] = {}
for feature_key, feature_val in attr_val.items():
print(f"searching feature: {feature_key}")
assert hasattr(cls, feature_key), f"feature not found ({feature_key})"
cls_feature = getattr(cls, feature_key)
assert (
cls_feature is not None and inspect.isclass(cls_feature) and issubclass(cls_feature, BaseFeature),
"The requested feature does not exist in the current Appliance class tree",
)
cls._saved_default_value["features"][feature_key] = cls_feature(**feature_val)
def guarded_setattr(self, key, value):
if key.startswith("_"): # allow private and dunder attrs
return orig_setattr(self, key, value)
# block writes after init if key is readonly
if key in self.__DABSchema__.keys():
if hasattr(self, key):
raise ReadOnlyField(f"{key} is read-only")
else:
assert attr_key in cls.model_fields, f"given attribute does not exist ({attr_key})"
cls._saved_default_value[attr_key] = attr_val
print(f"BaseAppliance._default_values_override_hook__input_apply__ {cls} DONE")
raise NewFieldForbidden(f"creating new fields is not allowed")
return orig_setattr(self, key, value)
namespace["__setattr__"] = guarded_setattr
cls = super().__new__(mcls, name, bases, namespace)
for _fname, _fvalue in modified_field.items():
cls.__DABSchema__[_fname] = deepcopy(bases[0].__DABSchema__[_fname])
cls.__DABSchema__[_fname].update_value(_fvalue)
for _fname, _fvalue in new_fields.items():
_fvalue.add_source(cls)
cls.__DABSchema__[_fname] = _fvalue
if initializer is not None:
init_fieldvalues = dict()
init_fieldtypes = dict()
for _fname, _fvalue in cls.__DABSchema__.items():
init_fieldvalues[_fname] = deepcopy(_fvalue.raw_value)
init_fieldtypes[_fname] = _fvalue.annotations
fakecls = ModelSpecView(init_fieldvalues, init_fieldtypes, cls.__name__, cls.__module__)
safe_globals = {"__builtins__": {"__import__": _blocked_import}, **ALLOWED_HELPERS_DEFAULT}
if initializer.__code__.co_freevars:
raise FunctionForbidden("__initializer must not use closures")
safe_initializer = FunctionType(
initializer.__code__,
safe_globals,
name=initializer.__name__,
argdefs=initializer.__defaults__,
closure=None,
)
safe_initializer(fakecls)
print(fakecls.diff())
for _fname, _fvalue in fakecls.export().items():
try:
check_type(_fvalue, cls.__DABSchema__[_fname].annotations, collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
) from exp
cls.__DABSchema__[_fname].update_value(_fvalue)
return cls
def __call__(cls, *args, **kw):
obj = super().__call__(*args, **kw)
for _fname in cls.__DABSchema__.keys():
setattr(obj, _fname, cls.__DABSchema__[_fname].value)
inst_schema = dict()
for _fname, _fvalue in cls.__DABSchema__.items():
inst_schema[_fname] = FrozenDABField(_fvalue)
setattr(obj, "__DABSchema__", inst_schema)
return obj
class BaseElement(metaclass=BaseMeta):
pass
class BaseFeature(BaseElement):
pass
class BaseAppliance(BaseElement):
pass

13
src/dabmodel/tools.py Normal file
View File

@@ -0,0 +1,13 @@
from uuid import UUID
from datetime import datetime
import json
class DABJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, UUID):
# if the obj is uuid, we simply return the value of uuid
return obj.hex
elif isinstance(obj, datetime):
return str(obj)
return json.JSONEncoder.default(self, obj)

View File

@@ -1,37 +0,0 @@
from pydantic import BaseModel, SerializeAsAny
class commonbase(
BaseModel,
revalidate_instances="subclass-instances", # toogle to generate error
): ...
class basechild(commonbase):
test_val: int = 1
class derivedchild(basechild):
test_val2: int = 2
class container(commonbase):
ct_child_1: dict[str, basechild] = {}
ct_child_2: SerializeAsAny[dict[str, basechild]] = {}
ct_child_3: dict[str, SerializeAsAny[basechild]] = {}
if __name__ == "__main__":
test_val = container(
ct_child_1={"test1": derivedchild()},
ct_child_2={"test2": derivedchild()},
ct_child_3={"test3": derivedchild()},
)
print(test_val.model_dump_json(indent=1))
print(test_val.model_dump())
assert "test_val2" not in test_val.model_dump()["ct_child_1"]["test1"]
assert "test_val2" in test_val.model_dump()["ct_child_2"]["test2"]
assert "test_val2" in test_val.model_dump()["ct_child_3"]["test3"]

File diff suppressed because it is too large Load Diff