Compare commits

...

17 Commits

Author SHA1 Message Date
cclecle
cce260bc5e reordering 2025-09-07 18:42:38 +02:00
cclecle
915a4332ee tiny fix :) 2025-09-06 01:47:49 +02:00
cclecle
4dca3eb9d1 improve typing 2025-09-06 01:43:20 +02:00
cclecle
e11c541139 small opt 2025-09-06 01:35:28 +02:00
cclecle
637b50b325 quality & typing fixes 2025-09-06 01:31:55 +02:00
cclecle
f45c9cc8f3 fix unittest 2025-09-05 23:04:16 +02:00
cclecle
95b0c298ce update deps 2025-09-05 23:00:36 +02:00
cclecle
04a4cf7b36 add deps 2025-09-05 22:56:17 +02:00
cclecle
f42a839cff work 2025-09-05 22:53:47 +02:00
cclecle
7f3a4ef545 work 2025-09-03 23:15:04 +02:00
cclecle
608c8a1010 work 2025-09-03 01:14:09 +02:00
cclecle
210781f086 new again 2025-09-02 19:18:31 +02:00
cclecle
df966ccac4 Merge branch 'dev' of https://chacha.ddns.net/gitea/chacha/dabmodel.git
into dev
2025-09-01 00:07:22 +02:00
chacha
29827b51bc add chatgpt code :) 2025-08-31 22:19:58 +02:00
chacha
7440731135 dev 2025-08-31 21:23:03 +02:00
cclecle
87682c2c9c cleaning a little bit... 2025-01-08 18:00:33 +01:00
chacha
0eef35e36f continue work 2024-12-08 01:04:27 +01:00
7 changed files with 1870 additions and 486 deletions

View File

@@ -1,13 +1,20 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?> <?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?eclipse-pydev version="1.0"?><pydev_project> <?eclipse-pydev version="1.0"?><pydev_project>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Python311</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python interpreter</pydev_property>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH"> <pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python interpreter</pydev_property>
<path>/${PROJECT_DIR_NAME}/src</path>
<path>/${PROJECT_DIR_NAME}</path>
</pydev_pathproperty> <pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
</pydev_project> <path>/${PROJECT_DIR_NAME}/src</path>
<path>/${PROJECT_DIR_NAME}</path>
</pydev_pathproperty>
</pydev_project>

View File

@@ -34,7 +34,9 @@ classifiers = [
] ]
dependencies = [ dependencies = [
'importlib-metadata; python_version<"3.9"', 'importlib-metadata; python_version<"3.9"',
'packaging' 'packaging',
'frozendict',
'typeguard'
] ]
dynamic = ["version"] dynamic = ["version"]
@@ -78,7 +80,7 @@ test = ["chacha_cicd_helper"]
coverage-check = ["chacha_cicd_helper"] coverage-check = ["chacha_cicd_helper"]
complexity-check = ["chacha_cicd_helper"] complexity-check = ["chacha_cicd_helper"]
quality-check = ["chacha_cicd_helper"] quality-check = ["chacha_cicd_helper"]
type-check = ["chacha_cicd_helper"] type-check = ["chacha_cicd_helper","types-pytz"]
doc-gen = ["chacha_cicd_helper"] doc-gen = ["chacha_cicd_helper"]
# [project.scripts] # [project.scripts]

View File

@@ -11,4 +11,20 @@ Main module __init__ file.
""" """
from .__metadata__ import __version__, __Summuary__, __Name__ from .__metadata__ import __version__, __Summuary__, __Name__
from .model import DABField, BaseFeature, BaseAppliance, default_values_override from .model import (
DABFieldInfo,
BaseAppliance,
BaseFeature,
DABModelException,
MultipleInheritanceForbidden,
BrokenInheritance,
ReadOnlyField,
NewFieldForbidden,
NotAnnotatedField,
ReadOnlyFieldAnnotation,
InvalidFieldValue,
InvalidFieldAnnotation,
IncompletelyAnnotatedField,
ImportForbidden,
FunctionForbidden,
)

View File

@@ -1,267 +1,685 @@
from __future__ import annotations """ dabmodel model module
This module implements DAB model classes.
This module contains metaclass and bases classes used to create models.
BaseAppliance can be used to create a new Appliance Data.
BaseFeature can be used to create new Appliance's Features."""
from abc import ABC, ABCMeta, abstractmethod from typing import (
from uuid import uuid4 Optional,
from typing import Annotated, ClassVar, Any, Self, TypeVar, TypeAlias, Generic, Union TypeVar,
from datetime import datetime Generic,
from copy import deepcopy, copy Union,
from typing_extensions import dataclass_transform, get_origin get_origin,
from pydantic import ( get_args,
ConfigDict, List,
BaseModel, Dict,
StrictInt, Any,
StrictStr, Tuple,
constr, Set,
ByteSize, Annotated,
AwareDatetime, FrozenSet,
UUID4, Callable,
model_validator, Type,
field_validator,
field_serializer,
SerializeAsAny,
) )
from pydantic.fields import Field, _Unset, PydanticUndefined from types import UnionType, FunctionType, SimpleNamespace
from pydantic._internal._model_construction import ModelMetaclass, PydanticModelField from copy import deepcopy, copy
from pydantic._internal._generics import PydanticGenericMetadata
from pydantic._internal._decorators import ensure_classmethod_based_on_signature
import pytz
import inspect
from runtype import issubclass as runtype_issubclass # from pprint import pprint
import math
from frozendict import deepfreeze
from typeguard import check_type, TypeCheckError, CollectionCheckStrategy
ALLOWED_ANNOTATIONS = {
"Union": Union,
"Optional": Optional,
"List": List,
"Dict": Dict,
"Tuple": Tuple,
"Set": Set,
"FrozenSet": FrozenSet,
"Annotated": Annotated,
# builtins:
"int": int,
"str": str,
"float": float,
"bool": bool,
"complex": complex,
"bytes": bytes,
"None": type(None),
"list": list,
"dict": dict,
"set": set,
"frozenset": frozenset,
"tuple": tuple,
}
ALLOWED_MODEL_FIELDS_TYPES = (str, int, float, complex, bool, bytes)
ALLOWED_MODEL_FIELDS_CONTAINERS = (dict, list, set, frozenset, tuple)
class NoInstanceMethod: # TV_ALLOWED_MODEL_FIELDS_TYPES = TypeVar("TV_ALLOWED_MODEL_FIELDS_TYPES", *ALLOWED_MODEL_FIELDS_TYPES, *ALLOWED_MODEL_FIELDS_CONTAINERS)
"""Descriptor to forbid that other descriptors can be looked up on an instance"""
def __init__(self, descr, name=None):
self.descr = descr
self.name = name
def __set_name__(self, owner, name):
self.name = name
def __get__(self, instance, owner):
# enforce the instance cannot look up the attribute at all
if instance is not None:
raise AttributeError(f"{type(instance).__name__!r} has no attribute {self.name!r}")
# invoke any descriptor we are wrapping
return self.descr.__get__(instance, owner)
def DABField(default: Any = PydanticUndefined, *, final: bool | None = _Unset, finalize_only: bool | None = _Unset, **kwargs): class DABModelException(Exception):
return Field(default, **kwargs) """DABModelException Exception class
Base Exception for DABModelException class
"""
T_BaseElement = TypeVar("T_BaseElement", bound="BaseElement") class MultipleInheritanceForbidden(DABModelException):
T_BaseElement_ConfigMethod_Arg: TypeAlias = dict[str, Any] """MultipleInheritanceForbidden Exception class
T_BaseElement_ConfigMethod: TypeAlias = "classmethod[T_BaseElement, [T_BaseElement_ConfigMethod_Arg], T_BaseElement_ConfigMethod_Arg]" Multiple inheritance is forbidden when using dabmodel
"""
class ConfigElement: class BrokenInheritance(DABModelException):
def __init__(self) -> None: """BrokenInheritance Exception class
self.default_values_override_methods: dict[T_BaseElement_ConfigMethod, None] = {} inheritance chain is broken
self.main_build_method: dict[T_BaseElement_ConfigMethod, None] = {} """
def __copy__(self) -> Self:
# we cannot deepcopy because of classmethods, so we do a manual enhanced copy
cls = self.__class__
result = cls.__new__(cls)
for k, v in self.__dict__.items():
setattr(result, k, copy(v))
return result
class IBaseElement(BaseModel, ABC): class ReadOnlyField(DABModelException):
_config_element: ClassVar[ConfigElement] = ConfigElement() """ReadOnlyField Exception class
The used Field is ReadOnly
"""
@dataclass_transform(kw_only_default=True, field_specifiers=(PydanticModelField,)) class NewFieldForbidden(DABModelException):
class BaseElementMeta(ModelMetaclass, ABCMeta): """NewFieldForbidden Exception class
def __new__( Field creation is forbidden
mcs, """
cls_name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
__pydantic_generic_metadata__: PydanticGenericMetadata | None = None,
__pydantic_reset_parent_namespace__: bool = True,
_create_model_module: str | None = None,
**kwargs: Any,
) -> type:
result = super().__new__(
mcs,
cls_name,
bases,
namespace,
__pydantic_generic_metadata__,
__pydantic_reset_parent_namespace__,
_create_model_module,
**kwargs,
)
print(cls_name)
assert issubclass(result, IBaseElement), "Only IBaseElement subclasses are supported"
# forcing all Fields to be frozen class InvalidFieldAnnotation(DABModelException):
for _, field_val in result.model_fields.items(): """InvalidFieldAnnotation Exception class
field_val.frozen = True The field annotation is invalid
"""
# copying/forwarding base classes default-configs
if "_config_element" not in result.__dict__: class InvalidInitializerType(DABModelException):
assert result.__base__ is not None, "Only IBaseElement subclasses are supported" """InvalidInitializerType Exception class
if issubclass(result.__base__, IBaseElement): The initializer is not a valid type
result._config_element = copy(result.__base__._config_element) """
class NotAnnotatedField(InvalidFieldAnnotation):
"""NotAnnotatedField Exception class
The Field is not Annotated
"""
class IncompletelyAnnotatedField(InvalidFieldAnnotation):
"""IncompletelyAnnotatedField Exception class
The field annotation is incomplete
"""
class ReadOnlyFieldAnnotation(DABModelException):
"""ReadOnlyFieldAnnotation Exception class
Field annotation connot be modified
"""
class InvalidFieldValue(DABModelException):
"""InvalidFieldValue Exception class
The Field value is invalid
"""
class NonExistingField(DABModelException):
"""NonExistingField Exception class
The given Field is non existing
"""
class ImportForbidden(DABModelException):
"""ImportForbidden Exception class
Imports are forbidden
"""
class FunctionForbidden(DABModelException):
"""FunctionForbidden Exception class
function call are forbidden
"""
ALLOWED_HELPERS_MATH = SimpleNamespace(
sqrt=math.sqrt,
floor=math.floor,
ceil=math.ceil,
trunc=math.trunc,
fabs=math.fabs,
copysign=math.copysign,
hypot=math.hypot,
exp=math.exp,
log=math.log,
log10=math.log10,
sin=math.sin,
cos=math.cos,
tan=math.tan,
atan2=math.atan2,
radians=math.radians,
degrees=math.degrees,
)
ALLOWED_HELPERS_DEFAULT = {
"math": ALLOWED_HELPERS_MATH,
"print": print,
# Numbers & reducers (pure, deterministic)
"abs": abs,
"round": round,
"min": min,
"max": max,
"sum": sum,
# Introspection-free basics
"len": len,
"sorted": sorted,
# Basic constructors (for copy-on-write patterns)
"tuple": tuple,
"list": list,
"dict": dict,
"set": set,
# Simple casts if they need to normalize types
"int": int,
"float": float,
"str": str,
"bool": bool,
"bytes": bytes,
"complex": complex,
# Easy iteration helpers (optional but handy)
"range": range,
}
def _blocked_import(*args, **kwargs):
raise ImportForbidden("imports disabled in __initializer")
def _resolve_annotation(ann):
if isinstance(ann, str):
# Safe eval against a **whitelist** only
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS) # pylint: disable=eval-used
return ann
def _peel_annotated(t: Any) -> Any:
# If you ever allow Annotated[T, ...], peel to T
while True:
origin = get_origin(t)
if origin is None:
return t
name = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
if "Annotated" in name:
args = get_args(t)
t = args[0] if args else t
else:
return t
def _check_annotation_definition(_type) -> bool: # pylint: disable=too-complex,too-many-return-statements
_type = _peel_annotated(_type)
# handle Optional[] and Union[None,...]
if (get_origin(_type) is Union or get_origin(_type) is UnionType) and type(None) in get_args(_type):
return all(_check_annotation_definition(_) for _ in get_args(_type) if _ is not type(None))
# handle other Union[...]
if get_origin(_type) is Union or get_origin(_type) is UnionType:
return all(_check_annotation_definition(_) for _ in get_args(_type))
# handle Dict[...]
if get_origin(_type) is dict:
inner = get_args(_type)
if len(inner) != 2:
raise IncompletelyAnnotatedField(f"Dict Annotation requires 2 inner definitions: {_type}")
return _peel_annotated(inner[0]) in ALLOWED_MODEL_FIELDS_TYPES and _check_annotation_definition(inner[1])
# handle Tuple[]
if get_origin(_type) in [tuple]:
inner_types = get_args(_type)
if len(inner_types) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
if len(inner_types) == 2 and inner_types[1] is Ellipsis:
return _check_annotation_definition(inner_types[0])
return all(_check_annotation_definition(_) for _ in inner_types)
# handle Set[],Tuple[],FrozenSet[],List[]
if get_origin(_type) in [set, frozenset, tuple, list]:
inner_types = get_args(_type)
if len(inner_types) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
return all(_check_annotation_definition(_) for _ in inner_types)
if _type in ALLOWED_MODEL_FIELDS_TYPES:
return True
return False
T_Field = TypeVar("T_Field")
class BaseConstraint(Generic[T_Field]):
"""BaseConstraint class
Base class for Field's constraints
"""
_bound_type: type
def __init__(self): ...
def check(self, value: T_Field) -> bool:
"""Check if a Constraint is completed"""
return True
def _deepfreeze(value):
"""recursive freeze helper function"""
if isinstance(value, dict):
return deepfreeze(value)
if isinstance(value, set):
return frozenset(_deepfreeze(v) for v in value)
if isinstance(value, list):
return tuple(_deepfreeze(v) for v in value)
if isinstance(value, tuple):
return tuple(_deepfreeze(v) for v in value)
return value
class DABFieldInfo:
"""This Class allows to describe a Field in Appliance class"""
def __init__(self, *, doc: str = "", constraints: Optional[list[BaseConstraint]] = None):
self._doc: str = doc
self._constraints: list[BaseConstraint]
if constraints is None:
self._constraints = []
else:
self._constraints = constraints
@property
def doc(self) -> str:
"""Returns Field's documentation"""
return self._doc
@property
def constraints(self) -> list[BaseConstraint[Any]]:
"""Returns Field's constraints"""
return self._constraints
class DABField(Generic[T_Field]):
"""This class describe a Field in Schema"""
def __init__(self, name: str, v: Optional[T_Field], a: Any, i: DABFieldInfo):
self._name: str = name
self._source: Optional[type] = None
self._default_value: Optional[T_Field] = v
self._value: Optional[T_Field] = v
self._annotations: Any = a
self._info: DABFieldInfo = i
self._constraints: List[BaseConstraint[Any]] = i.constraints
def add_source(self, s: type) -> None:
"""Adds source Appliance to the Field"""
self._source = s
@property
def doc(self) -> str:
"""Returns Field's documentation"""
return self._info.doc
def add_constraint(self, c: BaseConstraint) -> None:
"""Adds constraint to the Field"""
self._constraints.append(c)
@property
def constraints(self) -> list[BaseConstraint]:
"""Returns Field's constraint"""
return self._info.constraints
@property
def default_value(self) -> Any:
"""Returns Field's default value (frozen)"""
return _deepfreeze(self._default_value)
def update_value(self, v: Optional[T_Field] = None) -> None:
"""Updates Field's value"""
self._value = v
@property
def value(self) -> Any:
"""Returns Field's value (frosen)"""
return _deepfreeze(self._value)
@property
def raw_value(self) -> Optional[T_Field]:
"""Returns Field's value"""
return self._value
@property
def annotations(self) -> Any:
"""Returns Field's annotation"""
return self._annotations
class FrozenDABField(Generic[T_Field]):
"""FrozenDABField class
a read-only proxy of a Field
"""
def __init__(self, inner_field: DABField):
self._inner_field = inner_field
@property
def doc(self) -> str:
"""Returns Field's documentation (frozen)"""
return _deepfreeze(self._inner_field.doc)
@property
def constraints(self) -> tuple[BaseConstraint]:
"""Returns Field's constraint (frozen)"""
return _deepfreeze(self._inner_field.constraints)
@property
def default_value(self) -> Any:
"""Returns Field's default value (frozen)"""
return self._inner_field.default_value
@property
def value(self) -> Any:
"""Returns Field's value (frosen)"""
return self._inner_field.value
@property
def annotations(self) -> Any:
"""Returns Field's annotation (frozen)"""
return _deepfreeze(self._inner_field.annotations)
class ModelSpecView:
"""ModelSpecView class
A class that will act as fake BaseElement proxy to allow setting values"""
__slots__ = ("_vals", "_types", "_touched", "_name", "_module")
def __init__(self, values: dict[str, Any], types_map: dict[str, type], name: str, module: str):
self._name: str
self._vals: dict[str, Any]
self._types: dict[str, type]
self._touched: set
self._module: str
object.__setattr__(self, "_vals", dict(values))
object.__setattr__(self, "_types", types_map)
object.__setattr__(self, "_name", name)
object.__setattr__(self, "_module", module)
@property
def __name__(self) -> str:
"""returns proxified class' name"""
return self._name
@property
def __module__(self) -> str:
"""returns proxified module's name"""
return self._module
@__module__.setter
def __module__(self, value: str):
pass
def __getattr__(self, name: str) -> Any:
"""internal proxy getattr"""
if name not in self._types:
raise AttributeError(f"Unknown field {name}")
return self._vals[name]
def __setattr__(self, name: str, value: Any):
"""internal proxy setattr"""
if name not in self._types:
raise NonExistingField(f"Cannot set unknown field {name}")
T = self._types[name]
try:
check_type(
value,
T,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(f"Field <{name}> value is not of expected type {T}.") from exp
self._vals[name] = value
def export(self) -> dict:
"""exports all proxified values"""
return dict(self._vals)
T_Meta = TypeVar("T_Meta", bound="BaseMeta")
T_BE = TypeVar("T_BE", bound="BaseElement")
class BaseMeta(type):
"""metaclass to use to build BaseElement"""
modified_field: Dict[str, Any] = {}
new_fields: Dict[str, DABField[Any]] = {}
initializer: Optional[Callable[..., Any]] = None
__DABSchema__: dict[str, Any] = {}
@classmethod
def pre_check(
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any] # pylint: disable=unused-argument
) -> None:
"""early BaseElement checks"""
# print("__NEW__ Defining:", name, "with keys:", list(namespace))
if len(bases) > 1:
raise MultipleInheritanceForbidden("Multiple inheritance is not supported by dabmodel")
if len(bases) == 0: # base class (BaseElement)
namespace["__DABSchema__"] = {}
else: # standard inheritance
# check class tree origin
if "__DABSchema__" not in dir(bases[0]):
raise BrokenInheritance("__DABSchema__ not found in base class, broken inheritance chain.")
# copy inherited schema
namespace["__DABSchema__"] = copy(bases[0].__DABSchema__)
# force field without default value to be instantiated (with None)
if "__annotations__" in namespace:
for _funknown in [_ for _ in namespace["__annotations__"] if _ not in namespace.keys()]:
namespace[_funknown] = None
@classmethod
def pre_processing(mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any]):
"""preprocessing BaseElement"""
# iterating new and modified fields
mcs.modified_field = {}
mcs.new_fields = {}
mcs.initializer = None
initializer_name: Optional[str] = None
for _fname, _fvalue in namespace.items():
if _fname == f"_{name}__initializer" or (name.startswith("_") and _fname == "__initializer"):
if not isinstance(_fvalue, classmethod):
raise InvalidInitializerType()
mcs.initializer = _fvalue.__func__
if name.startswith("_"):
initializer_name = "__initializer"
else:
initializer_name = f"_{name}__initializer"
elif _fname.startswith("__"):
pass
else: else:
result._config_element = ConfigElement() # print(f"Parsing Field: {_fname} / {_fvalue}")
if len(bases) == 1 and _fname in namespace["__DABSchema__"].keys(): # Modified fields
# searching and storing current class default-configs mcs.pre_processing_modified_fields(name, bases, namespace, _fname, _fvalue)
for _, method in result.__dict__.items(): else: # New fieds
if isinstance(method, classmethod): mcs.pre_processing_new_fields(name, bases, namespace, _fname, _fvalue)
if hasattr(method, "default_values_override"): # removing modified fields from class (will add them back later)
result._config_element.default_values_override_methods[method] = None for _fname in mcs.new_fields:
del namespace[_fname]
# todo: find a way to 'lock' and add restriction to a field after inheritance for _fname in mcs.modified_field:
del namespace[_fname]
return result if mcs.initializer is not None and initializer_name is not None:
del namespace[initializer_name]
class BaseElement(
IBaseElement,
ABC,
validate_assignment=True,
# revalidate_instances="subclass-instances", # pydantic issue #10681
validate_default=True,
extra="forbid",
metaclass=BaseElementMeta,
):
class Config:
ignored_types = (NoInstanceMethod,)
template_id: Annotated[UUID4, DABField(..., repr=True)]
template_short_name: Annotated[
StrictStr, constr(strip_whitespace=True, to_lower=True, strict=True, max_length=16), DABField(..., repr=True)
]
template_long_name: Annotated[StrictStr | None, DABField()]
template_description: Annotated[StrictStr | None, DABField()]
_saved_default_value: ClassVar[dict[str, Any]]
@model_validator(mode="before")
@classmethod
def __default_values_override_hook__(cls, values: T_BaseElement_ConfigMethod_Arg) -> T_BaseElement_ConfigMethod_Arg:
# extracting default values that were set in model fields
cls._saved_default_value = dict()
for field_key, field_val in cls.model_fields.items():
assert field_val.annotation is not None, "all fields must have annotation"
assert not runtype_issubclass(
field_val.annotation, BaseFeature
), "Features can only be in Appliance's features[] dict attribute"
"""
if field_key == "features":
cls._saved_default_value[field_key] = dict()
for feat_key, feat_value in field_val.items():
cls._saved_default_value[field_key][feat_key] = feat_value.dict()
"""
if field_val.default != PydanticUndefined:
cls._saved_default_value[field_key] = deepcopy(field_val.default)
for method, _ in cls._config_element.default_values_override_methods.items():
method.__func__(cls, cls._saved_default_value)
cls._default_values_override_hook__input_apply__(values)
return cls._saved_default_value
@classmethod @classmethod
@abstractmethod def pre_processing_modified_fields(
def _default_values_override_hook__input_apply__( mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], _fname: str, _fvalue: Any
cls, ): # pylint: disable=unused-argument
values: T_BaseElement_ConfigMethod_Arg, """preprocessing BaseElement modified Fields"""
): ... # print(f"Modified field: {_fname}")
if "__annotations__" in namespace and _fname in namespace["__annotations__"]:
raise ReadOnlyFieldAnnotation("annotations cannot be modified on derived classes")
try:
check_type(
_fvalue,
namespace["__DABSchema__"][_fname].annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Field <{_fname}> New Field value is not of expected type {bases[0].__annotations__[_fname]}."
) from exp
mcs.modified_field[_fname] = _fvalue
class BaseFeature(BaseElement, ABC):
@NoInstanceMethod
@classmethod @classmethod
def _default_values_override_hook__input_apply__( def pre_processing_new_fields(
cls, mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], _fname: str, _fvalue: Any
values: T_BaseElement_ConfigMethod_Arg, ): # pylint: disable=unused-argument
"""preprocessing BaseElement new Fields"""
# print(f"New field: {_fname}")
# print(f"type is: {type(_fvalue)}")
# print(f"value is: {_fvalue}")
# check if field is annotated
if "__annotations__" not in namespace or _fname not in namespace["__annotations__"]:
raise NotAnnotatedField(f"Every dabmodel Fields must be annotated ({_fname})")
# check if annotation is allowed
if isinstance(namespace["__annotations__"][_fname], str):
namespace["__annotations__"][_fname] = _resolve_annotation(namespace["__annotations__"][_fname])
if not _check_annotation_definition(namespace["__annotations__"][_fname]):
raise InvalidFieldAnnotation(f"Field <{_fname}> has not an allowed or valid annotation.")
_finfo: DABFieldInfo = DABFieldInfo()
origin = get_origin(namespace["__annotations__"][_fname])
tname = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
if "Annotated" in tname:
args = get_args(namespace["__annotations__"][_fname])
if args:
if len(args) > 2:
raise InvalidFieldAnnotation(f"Field <{_fname}> had invalid Annotated value.")
if len(args) == 2 and not isinstance(args[1], DABFieldInfo):
raise InvalidFieldAnnotation("Only DABFieldInfo object is allowed as Annotated data.")
_finfo = args[1]
# print(f"annotation is: {namespace['__annotations__'][_fname]}")
# check if value is valid
try:
check_type(_fvalue, namespace["__annotations__"][_fname], collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS)
except TypeCheckError as exp:
raise InvalidFieldValue(f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}.") from exp
mcs.new_fields[_fname] = DABField(_fname, _fvalue, namespace["__annotations__"][_fname], _finfo)
@classmethod
def call_initializer(
mcs: type["BaseMeta"], cls, name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any] # pylint: disable=unused-argument
): ):
print(f"BaseFeature._default_values_override_hook__input_apply__ {cls}") """BaseElement initializer processing"""
# applying user-defined values if mcs.initializer is not None:
for attr_key, attr_val in values.items(): init_fieldvalues = {}
assert attr_key in cls.model_fields, f"given feature attribute does not exist ({attr_key})" init_fieldtypes = {}
cls._saved_default_value[attr_key] = attr_val for _fname, _fvalue in cls.__DABSchema__.items():
print(f"BaseFeature._default_values_override_hook__input_apply__ {cls} DONE") init_fieldvalues[_fname] = deepcopy(_fvalue.raw_value)
init_fieldtypes[_fname] = _fvalue.annotations
fakecls = ModelSpecView(init_fieldvalues, init_fieldtypes, cls.__name__, cls.__module__)
safe_globals = {"__builtins__": {"__import__": _blocked_import}, **ALLOWED_HELPERS_DEFAULT}
if mcs.initializer.__code__.co_freevars:
raise FunctionForbidden("__initializer must not use closures")
safe_initializer = FunctionType(
mcs.initializer.__code__,
safe_globals,
name=mcs.initializer.__name__,
argdefs=mcs.initializer.__defaults__,
closure=None,
)
safe_initializer(fakecls) # pylint: disable=not-callable
for _fname, _fvalue in fakecls.export().items():
try:
check_type(_fvalue, cls.__DABSchema__[_fname].annotations, collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
) from exp
cls.__DABSchema__[_fname].update_value(_fvalue)
def __new__(mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any]) -> Type:
"""BaseElement new class"""
mcs.pre_check(name, bases, namespace)
mcs.pre_processing(name, bases, namespace)
def default_values_override(func: T_BaseElement_ConfigMethod) -> T_BaseElement_ConfigMethod: orig_setattr = namespace.get("__setattr__", object.__setattr__)
func = ensure_classmethod_based_on_signature(func)
setattr(func, "default_values_override", lambda: True)
return func
def guarded_setattr(self, key: str, value: Any):
T_Feature = TypeVar("T_Feature", bound=BaseFeature) if key.startswith("_"): # allow private and dunder attrs
return orig_setattr(self, key, value)
# block writes after init if key is readonly
def get_discriminator_value(v: Any) -> str: if key in self.__DABSchema__.keys():
if isinstance(v, dict): if hasattr(self, key):
return v.get("fruit", v.get("filling")) raise ReadOnlyField(f"{key} is read-only")
return getattr(v, "fruit", getattr(v, "filling", None))
class BaseAppliance(Generic[T_Feature], BaseElement, ABC):
cpu_cnt: Annotated[StrictInt, DABField(1, gt=0)]
ram_size: Annotated[ByteSize, DABField(256, gt=128)]
swap_size: Annotated[ByteSize, DABField(200, ge=0)]
rootfs_size: Annotated[ByteSize, DABField(2048, ge=2048)]
dabinst_id: Annotated[UUID4, DABField(uuid4(), repr=True)]
dabinst_short_name: Annotated[
StrictStr, constr(strip_whitespace=True, to_lower=True, strict=True, max_length=16), DABField(..., repr=True)
]
dabinst_long_name: Annotated[StrictStr | None, DABField("")]
dabinst_description: Annotated[StrictStr | None, DABField("")]
dabinst_creationdate: Annotated[AwareDatetime | None, DABField(datetime.now(tz=pytz.utc))]
features: SerializeAsAny[dict[str, T_Feature]] = DABField({})
@NoInstanceMethod
@classmethod
def add_feature(cls, feat: T_Feature):
cls._saved_default_value["features"][type(feat).__name__] = feat.dict()
@NoInstanceMethod
@classmethod
def del_feature(cls, type_feat: type[T_Feature]):
del cls._saved_default_value["features"][type_feat.__name__]
@NoInstanceMethod
@classmethod
def get_feature(cls, type_feat: type[T_Feature]) -> T_Feature:
return cls._saved_default_value["features"][type_feat.__name__]
@NoInstanceMethod
@classmethod
def _default_values_override_hook__input_apply__(
cls,
values: T_BaseElement_ConfigMethod_Arg,
):
print(f"BaseAppliance._default_values_override_hook__input_apply__ {cls}")
# applying user-defined values
for attr_key, attr_val in values.items():
if attr_key == "features":
if cls._saved_default_value["features"] is None:
cls._saved_default_value["features"] = {}
for feature_key, feature_val in attr_val.items():
print(f"searching feature: {feature_key}")
assert hasattr(cls, feature_key), f"feature not found ({feature_key})"
cls_feature = getattr(cls, feature_key)
assert (
cls_feature is not None and inspect.isclass(cls_feature) and issubclass(cls_feature, BaseFeature),
"The requested feature does not exist in the current Appliance class tree",
)
cls._saved_default_value["features"][feature_key] = cls_feature(**feature_val)
else: else:
assert attr_key in cls.model_fields, f"given attribute does not exist ({attr_key})" raise NewFieldForbidden("creating new fields is not allowed")
cls._saved_default_value[attr_key] = attr_val
print(f"BaseAppliance._default_values_override_hook__input_apply__ {cls} DONE") return orig_setattr(self, key, value)
namespace["__setattr__"] = guarded_setattr
_cls = super().__new__(mcs, name, bases, namespace)
for _fname, _fvalue in mcs.modified_field.items():
_cls.__DABSchema__[_fname] = deepcopy(bases[0].__DABSchema__[_fname])
_cls.__DABSchema__[_fname].update_value(_fvalue)
for _fname, _fvalue in mcs.new_fields.items():
_fvalue.add_source(mcs)
_cls.__DABSchema__[_fname] = _fvalue
mcs.call_initializer(_cls, name, bases, namespace)
return _cls
def __call__(cls, *args: Any, **kw: Any): # intentionally untyped
"""BaseElement new instance"""
obj = super().__call__(*args, **kw)
for _fname, _fvalue in cls.__DABSchema__.items():
setattr(obj, _fname, _fvalue.value)
inst_schema: dict[str, Any] = {}
for _fname, _fvalue in cls.__DABSchema__.items():
inst_schema[_fname] = FrozenDABField(_fvalue)
setattr(obj, "__DABSchema__", inst_schema)
return obj
class BaseElement(metaclass=BaseMeta):
"""BaseElement class
Base class to apply metaclass and set common Fields.
"""
class BaseFeature(BaseElement):
"""BaseFeature class
Base class for Appliance's Features.
Features are optional traits of an appliance.
"""
class BaseAppliance(BaseElement):
"""BaseFeature class
Base class for Appliance.
An appliance is a server configuration / image that is built using appliance's code and Fields.
"""

17
src/dabmodel/tools.py Normal file
View File

@@ -0,0 +1,17 @@
"""library's internal tools"""
from uuid import UUID
from datetime import datetime
import json
class DABJSONEncoder(json.JSONEncoder):
"""allows to JSON encode non supported data type"""
def default(self, o):
if isinstance(o, UUID):
# if the o is uuid, we simply return the value of uuid
return o.hex
if isinstance(o, datetime):
return str(o)
return json.JSONEncoder.default(self, o)

View File

@@ -1,37 +0,0 @@
from pydantic import BaseModel, SerializeAsAny
class commonbase(
BaseModel,
revalidate_instances="subclass-instances", # toogle to generate error
): ...
class basechild(commonbase):
test_val: int = 1
class derivedchild(basechild):
test_val2: int = 2
class container(commonbase):
ct_child_1: dict[str, basechild] = {}
ct_child_2: SerializeAsAny[dict[str, basechild]] = {}
ct_child_3: dict[str, SerializeAsAny[basechild]] = {}
if __name__ == "__main__":
test_val = container(
ct_child_1={"test1": derivedchild()},
ct_child_2={"test2": derivedchild()},
ct_child_3={"test3": derivedchild()},
)
print(test_val.model_dump_json(indent=1))
print(test_val.model_dump())
assert "test_val2" not in test_val.model_dump()["ct_child_1"]["test1"]
assert "test_val2" in test_val.model_dump()["ct_child_2"]["test2"]
assert "test_val2" in test_val.model_dump()["ct_child_3"]["test3"]

File diff suppressed because it is too large Load Diff