Compare commits

...

10 Commits

Author SHA1 Message Date
chacha
981c5201a9 partially fix features 2025-09-16 23:40:41 +02:00
chacha
ab11052c8f work 2025-09-09 00:13:06 +02:00
chacha
4f5dade949 first feature implementation 2025-09-08 01:23:46 +02:00
cclecle
cce260bc5e reordering 2025-09-07 18:42:38 +02:00
cclecle
915a4332ee tiny fix :) 2025-09-06 01:47:49 +02:00
cclecle
4dca3eb9d1 improve typing 2025-09-06 01:43:20 +02:00
cclecle
e11c541139 small opt 2025-09-06 01:35:28 +02:00
cclecle
637b50b325 quality & typing fixes 2025-09-06 01:31:55 +02:00
cclecle
f45c9cc8f3 fix unittest 2025-09-05 23:04:16 +02:00
cclecle
95b0c298ce update deps 2025-09-05 23:00:36 +02:00
5 changed files with 633 additions and 202 deletions

View File

@@ -7,7 +7,7 @@
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
[build-system]
requires = ["setuptools>=63", "wheel", "setuptools_scm","frozendict","typeguard"]
requires = ["setuptools>=63", "wheel", "setuptools_scm"]
build-backend = "setuptools.build_meta"
[tool.setuptools_scm]
@@ -35,8 +35,8 @@ classifiers = [
dependencies = [
'importlib-metadata; python_version<"3.9"',
'packaging',
'pydantic',
'runtype'
'frozendict',
'typeguard'
]
dynamic = ["version"]

View File

@@ -13,6 +13,7 @@ Main module __init__ file.
from .__metadata__ import __version__, __Summuary__, __Name__
from .model import (
DABFieldInfo,
DABField,
BaseAppliance,
BaseFeature,
DABModelException,
@@ -27,4 +28,6 @@ from .model import (
IncompletelyAnnotatedField,
ImportForbidden,
FunctionForbidden,
FrozenDABField,
InvalidFeatureInheritance,
)

View File

@@ -1,11 +1,35 @@
from typing import Optional, TypeVar, Generic, Union, get_origin, get_args, List, Dict, Any, Tuple, Set, Annotated, FrozenSet
""" dabmodel model module
This module implements DAB model classes.
This module contains metaclass and bases classes used to create models.
BaseAppliance can be used to create a new Appliance Data.
BaseFeature can be used to create new Appliance's Features."""
from typing import (
Optional,
TypeVar,
Generic,
Union,
get_origin,
get_args,
List,
Dict,
Any,
Tuple,
Set,
Annotated,
FrozenSet,
Callable,
Type,
)
from types import UnionType, FunctionType, SimpleNamespace
from frozendict import deepfreeze
from copy import deepcopy, copy
from pprint import pprint
from typeguard import check_type, TypeCheckError, CollectionCheckStrategy
# from pprint import pprint
import math
from frozendict import deepfreeze
from typeguard import check_type, TypeCheckError, CollectionCheckStrategy
ALLOWED_ANNOTATIONS = {
"Union": Union,
"Optional": Optional,
@@ -31,52 +55,101 @@ ALLOWED_ANNOTATIONS = {
}
ALLOWED_MODEL_FIELDS_TYPES = (str, int, float, complex, bool, bytes)
ALLOWED_MODEL_FIELDS_CONTAINERS = (dict, list, set, frozenset, tuple)
TV_ALLOWED_MODEL_FIELDS_TYPES = TypeVar("TV_ALLOWED_MODEL_FIELDS_TYPES", *ALLOWED_MODEL_FIELDS_TYPES, *ALLOWED_MODEL_FIELDS_CONTAINERS)
# TV_ALLOWED_MODEL_FIELDS_TYPES = TypeVar("TV_ALLOWED_MODEL_FIELDS_TYPES", *ALLOWED_MODEL_FIELDS_TYPES, *ALLOWED_MODEL_FIELDS_CONTAINERS)
class DABModelException(Exception): ...
class DABModelException(Exception):
"""DABModelException Exception class
Base Exception for DABModelException class
"""
class ReservedFieldName(Exception):
"""DABModelException Exception class
Base Exception for DABModelException class
"""
class MultipleInheritanceForbidden(DABModelException):
"""MultipleInheritanceForbidden Exception class
Multiple inheritance is forbidden when using dabmodel
"""
class MultipleInheritanceForbidden(DABModelException): ...
class BrokenInheritance(DABModelException):
"""BrokenInheritance Exception class
inheritance chain is broken
"""
class BrokenInheritance(DABModelException): ...
class ReadOnlyField(DABModelException):
"""ReadOnlyField Exception class
The used Field is ReadOnly
"""
class ReadOnlyField(DABModelException): ...
class NewFieldForbidden(DABModelException):
"""NewFieldForbidden Exception class
Field creation is forbidden
"""
class NewFieldForbidden(DABModelException): ...
class InvalidFieldAnnotation(DABModelException):
"""InvalidFieldAnnotation Exception class
The field annotation is invalid
"""
class InvalidFieldAnnotation(DABModelException): ...
class InvalidInitializerType(DABModelException):
"""InvalidInitializerType Exception class
The initializer is not a valid type
"""
class InvalidInitializerType(DABModelException): ...
class NotAnnotatedField(InvalidFieldAnnotation):
"""NotAnnotatedField Exception class
The Field is not Annotated
"""
class NotAnnotatedField(InvalidFieldAnnotation): ...
class IncompletelyAnnotatedField(InvalidFieldAnnotation):
"""IncompletelyAnnotatedField Exception class
The field annotation is incomplete
"""
class IncompletelyAnnotatedField(InvalidFieldAnnotation): ...
class ReadOnlyFieldAnnotation(DABModelException):
"""ReadOnlyFieldAnnotation Exception class
Field annotation connot be modified
"""
class ReadOnlyFieldAnnotation(DABModelException): ...
class InvalidFieldValue(DABModelException):
"""InvalidFieldValue Exception class
The Field value is invalid
"""
class InvalidFieldValue(DABModelException): ...
class NonExistingField(DABModelException):
"""NonExistingField Exception class
The given Field is non existing
"""
class NonExistingField(DABModelException): ...
class ImportForbidden(DABModelException):
"""ImportForbidden Exception class
Imports are forbidden
"""
class InvalidFeatureInheritance(DABModelException):
"""InvalidFeatureInheritance Exception class
Features of same name in child appliance need to be from same type
"""
class ImportForbidden(DABModelException): ...
class FunctionForbidden(DABModelException): ...
class FunctionForbidden(DABModelException):
"""FunctionForbidden Exception class
function call are forbidden
"""
ALLOWED_HELPERS_MATH = SimpleNamespace(
@@ -134,7 +207,7 @@ def _blocked_import(*args, **kwargs):
def _resolve_annotation(ann):
if isinstance(ann, str):
# Safe eval against a **whitelist** only
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS)
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS) # pylint: disable=eval-used
return ann
@@ -152,15 +225,15 @@ def _peel_annotated(t: Any) -> Any:
return t
def _check_annotation_definition(_type) -> bool:
def _check_annotation_definition(_type) -> bool: # pylint: disable=too-complex,too-many-return-statements
_type = _peel_annotated(_type)
# handle Optional[] and Union[None,...]
if (get_origin(_type) is Union or get_origin(_type) is UnionType) and type(None) in get_args(_type):
return all([_check_annotation_definition(_) for _ in get_args(_type) if _ is not type(None)])
return all(_check_annotation_definition(_) for _ in get_args(_type) if _ is not type(None))
# handle other Union[...]
if get_origin(_type) is Union or get_origin(_type) is UnionType:
return all([_check_annotation_definition(_) for _ in get_args(_type)])
return all(_check_annotation_definition(_) for _ in get_args(_type))
# handle Dict[...]
if get_origin(_type) is dict:
@@ -176,147 +249,200 @@ def _check_annotation_definition(_type) -> bool:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
if len(inner_types) == 2 and inner_types[1] is Ellipsis:
return _check_annotation_definition(inner_types[0])
return all([_check_annotation_definition(_) for _ in inner_types])
return all(_check_annotation_definition(_) for _ in inner_types)
# handle Set[],Tuple[],FrozenSet[],List[]
if get_origin(_type) in [set, frozenset, tuple, list]:
inner_types = get_args(_type)
if len(inner_types) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
return all([_check_annotation_definition(_) for _ in inner_types])
return all(_check_annotation_definition(_) for _ in inner_types)
if _type in ALLOWED_MODEL_FIELDS_TYPES:
return True
return False
class BaseConstraint(Generic[TV_ALLOWED_MODEL_FIELDS_TYPES]):
T_Field = TypeVar("T_Field")
class BaseConstraint(Generic[T_Field]):
"""BaseConstraint class
Base class for Field's constraints
"""
_bound_type: type
def __init__(self): ...
def check(self, value: TV_ALLOWED_MODEL_FIELDS_TYPES) -> bool: ...
def check(self, value: T_Field) -> bool:
"""Check if a Constraint is completed"""
return True
def _deepfreeze(value):
"""recursive freeze helper function"""
if isinstance(value, dict):
return deepfreeze(value)
elif isinstance(value, set):
if isinstance(value, set):
return frozenset(_deepfreeze(v) for v in value)
elif isinstance(value, list):
if isinstance(value, list):
return tuple(_deepfreeze(v) for v in value)
elif isinstance(value, tuple):
if isinstance(value, tuple):
return tuple(_deepfreeze(v) for v in value)
return value
class DABFieldInfo:
def __init__(self, *, doc: str = "", constraints: list[BaseConstraint] = []):
"""This Class allows to describe a Field in Appliance class"""
def __init__(self, *, doc: str = "", constraints: Optional[list[BaseConstraint]] = None):
self._doc: str = doc
self._constraints: list[BaseConstraint] = constraints
self._constraints: list[BaseConstraint]
if constraints is None:
self._constraints = []
else:
self._constraints = constraints
@property
def doc(self):
def doc(self) -> str:
"""Returns Field's documentation"""
return self._doc
@property
def constraints(self):
def constraints(self) -> list[BaseConstraint[Any]]:
"""Returns Field's constraints"""
return self._constraints
class DABField(Generic[TV_ALLOWED_MODEL_FIELDS_TYPES]):
def __init__(self, name: str, v: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES], a: Any, i: str):
class DABField(Generic[T_Field]):
"""This class describe a Field in Schema"""
def __init__(self, name: str, v: Optional[T_Field], a: Any, i: DABFieldInfo):
self._name: str = name
self._source: Optional[type] = None
self._default_value: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = v
self._value: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = v
self._default_value: Optional[T_Field] = v
self._value: Optional[T_Field] = v
self._annotations: Any = a
self._info: DABFieldInfo = i
self._constraints: List[BaseConstraint] = i.constraints
self._constraints: List[BaseConstraint[Any]] = i.constraints
def add_source(self, s: type) -> None:
"""Adds source Appliance to the Field"""
self._source = s
@property
def doc(self):
def doc(self) -> str:
"""Returns Field's documentation"""
return self._info.doc
def add_constraint(self, c: BaseConstraint) -> None:
"""Adds constraint to the Field"""
self._constraints.append(c)
@property
def constraints(self) -> list[BaseConstraint]:
"""Returns Field's constraint"""
return self._info.constraints
@property
def default_value(self):
def default_value(self) -> Any:
"""Returns Field's default value (frozen)"""
return _deepfreeze(self._default_value)
def update_value(self, v: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = None) -> None:
def update_value(self, v: Optional[T_Field] = None) -> None:
"""Updates Field's value"""
self._value = v
@property
def value(self):
def value(self) -> Any:
"""Returns Field's value (frosen)"""
return _deepfreeze(self._value)
@property
def raw_value(self):
def raw_value(self) -> Optional[T_Field]:
"""Returns Field's value"""
return self._value
@property
def annotations(self) -> Any:
"""Returns Field's annotation"""
return self._annotations
class FrozenDABField(Generic[TV_ALLOWED_MODEL_FIELDS_TYPES]):
class FrozenDABField(Generic[T_Field]):
"""FrozenDABField class
a read-only proxy of a Field
"""
def __init__(self, inner_field: DABField):
self._inner_field = inner_field
@property
def doc(self):
return self._inner_field.doc
def doc(self) -> str:
"""Returns Field's documentation (frozen)"""
return _deepfreeze(self._inner_field.doc)
@property
def constraints(self):
def constraints(self) -> tuple[BaseConstraint]:
"""Returns Field's constraint (frozen)"""
return _deepfreeze(self._inner_field.constraints)
@property
def default_value(self):
def default_value(self) -> Any:
"""Returns Field's default value (frozen)"""
return self._inner_field.default_value
@property
def value(self):
def value(self) -> Any:
"""Returns Field's value (frosen)"""
return self._inner_field.value
@property
def annotations(self) -> Any:
"""Returns Field's annotation (frozen)"""
return _deepfreeze(self._inner_field.annotations)
class ModelSpecView:
"""ModelSpecView class
A class that will act as fake BaseElement proxy to allow setting values"""
__slots__ = ("_vals", "_types", "_touched", "_name", "_module")
def __init__(self, values: dict, types_map: dict[str, type], name, module):
def __init__(self, values: dict[str, Any], types_map: dict[str, type], name: str, module: str):
self._name: str
self._vals: dict[str, Any]
self._types: dict[str, type]
self._touched: set
self._module: str
object.__setattr__(self, "_vals", dict(values))
object.__setattr__(self, "_types", types_map)
object.__setattr__(self, "_touched", set())
object.__setattr__(self, "_name", name)
object.__setattr__(self, "_module", module)
@property
def __name__(self) -> str:
"""returns proxified class' name"""
return self._name
@property
def __module__(self) -> str:
"""returns proxified module's name"""
return self._module
def __getattr__(self, name):
@__module__.setter
def __module__(self, value: str):
pass
def __getattr__(self, name: str) -> Any:
"""internal proxy getattr"""
if name not in self._types:
raise AttributeError(f"Unknown field {name}")
return self._vals[name]
def __setattr__(self, name, value):
def __setattr__(self, name: str, value: Any):
"""internal proxy setattr"""
if name not in self._types:
raise NonExistingField(f"Cannot set unknown field {name}")
T = self._types[name]
@@ -331,23 +457,35 @@ class ModelSpecView:
raise InvalidFieldValue(f"Field <{name}> value is not of expected type {T}.") from exp
self._vals[name] = value
self._touched.add(name)
def export(self) -> dict:
"""exports all proxified values"""
return dict(self._vals)
def diff(self) -> dict:
return {k: self._vals[k] for k in self._touched}
T_Meta = TypeVar("T_Meta", bound="BaseMeta")
T_BE = TypeVar("T_BE", bound="BaseElement")
class BaseMeta(type):
def __new__(mcls, name, bases, namespace):
# print("__NEW__ Defining:", name, "with keys:", list(namespace))
"""metaclass to use to build BaseElement"""
modified_field: Dict[str, Any] = {}
new_fields: Dict[str, DABField[Any]] = {}
initializer: Optional[Callable[..., Any]] = None
__DABSchema__: dict[str, Any] = {}
@classmethod
def pre_check(
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any] # pylint: disable=unused-argument
) -> None:
"""early BaseElement checks"""
print("__NEW__ Defining:", name, "with keys:", list(namespace))
if len(bases) > 1:
raise MultipleInheritanceForbidden("Multiple inheritance is not supported by dabmodel")
elif len(bases) == 0: # base class (BaseElement)
namespace["__DABSchema__"] = dict()
if len(bases) == 0: # base class (BaseElement)
namespace["__DABSchema__"] = {}
else: # standard inheritance
# check class tree origin
if "__DABSchema__" not in dir(bases[0]):
@@ -360,16 +498,19 @@ class BaseMeta(type):
for _funknown in [_ for _ in namespace["__annotations__"] if _ not in namespace.keys()]:
namespace[_funknown] = None
@classmethod
def pre_processing(mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any]):
"""preprocessing BaseElement"""
# iterating new and modified fields
modified_field: Dict[str, Any] = {}
new_fields: Dict[str, DABField] = {}
initializer: Optional[type] = None
mcs.modified_field = {}
mcs.new_fields = {}
mcs.initializer = None
initializer_name: Optional[str] = None
for _fname, _fvalue in namespace.items():
if _fname == f"_{name}__initializer" or (name.startswith("_") and _fname == "__initializer"):
if not isinstance(_fvalue, classmethod):
raise InvalidInitializerType()
initializer = _fvalue.__func__
mcs.initializer = _fvalue.__func__
if name.startswith("_"):
initializer_name = "__initializer"
else:
@@ -378,117 +519,102 @@ class BaseMeta(type):
pass
else:
# print(f"Parsing Field: {_fname} / {_fvalue}")
# Modified fields
if len(bases) == 1 and _fname in namespace["__DABSchema__"].keys():
# print(f"Modified field: {_fname}")
if "__annotations__" in namespace and _fname in namespace["__annotations__"]:
raise ReadOnlyFieldAnnotation("annotations cannot be modified on derived classes")
try:
check_type(
_fvalue,
namespace["__DABSchema__"][_fname].annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Field <{_fname}> New Field value is not of expected type {bases[0].__annotations__[_fname]}."
) from exp
modified_field[_fname] = _fvalue
# New fieds
else:
# print(f"New field: {_fname}")
# print(f"type is: {type(_fvalue)}")
# print(f"value is: {_fvalue}")
# check if field is annotated
if "__annotations__" not in namespace or _fname not in namespace["__annotations__"]:
raise NotAnnotatedField(f"Every dabmodel Fields must be annotated ({_fname})")
# check if annotation is allowed
if isinstance(namespace["__annotations__"][_fname], str):
namespace["__annotations__"][_fname] = _resolve_annotation(namespace["__annotations__"][_fname])
if not _check_annotation_definition(namespace["__annotations__"][_fname]):
raise InvalidFieldAnnotation(f"Field <{_fname}> has not an allowed or valid annotation.")
_finfo: Optional[DABFieldInfo] = DABFieldInfo()
origin = get_origin(namespace["__annotations__"][_fname])
tname = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
if "Annotated" in tname:
args = get_args(namespace["__annotations__"][_fname])
if args:
if len(args) > 2:
raise InvalidFieldAnnotation(f"Field <{_fname}> had invalid Annotated value.")
if len(args) == 2 and not isinstance(args[1], DABFieldInfo):
raise InvalidFieldAnnotation(f"Only DABFieldInfo object is allowed as Annotated data.")
_finfo = args[1]
# print(f"annotation is: {namespace['__annotations__'][_fname]}")
# check if value is valid
try:
check_type(
_fvalue, namespace["__annotations__"][_fname], collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
) from exp
new_fields[_fname] = DABField(_fname, _fvalue, namespace["__annotations__"][_fname], _finfo)
if len(bases) == 1 and _fname in namespace["__DABSchema__"].keys(): # Modified fields
mcs.pre_processing_modified_fields(name, bases, namespace, _fname, _fvalue)
else: # New fieds
mcs.pre_processing_new_fields(name, bases, namespace, _fname, _fvalue)
# removing modified fields from class (will add them back later)
for _fname in new_fields.keys():
for _fname in mcs.new_fields:
del namespace[_fname]
for _fname in modified_field.keys():
for _fname in mcs.modified_field:
del namespace[_fname]
if initializer is not None:
if mcs.initializer is not None and initializer_name is not None:
del namespace[initializer_name]
orig_setattr = namespace.get("__setattr__", object.__setattr__)
@classmethod
def pre_processing_modified_fields(
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], _fname: str, _fvalue: Any
): # pylint: disable=unused-argument
"""preprocessing BaseElement modified Fields"""
print(f"Modified field: {_fname}")
if "__annotations__" in namespace and _fname in namespace["__annotations__"]:
raise ReadOnlyFieldAnnotation(f"annotations cannot be modified on derived classes {_fname}")
try:
check_type(
_fvalue,
namespace["__DABSchema__"][_fname].annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Field <{_fname}> New Field value is not of expected type {bases[0].__annotations__[_fname]}."
) from exp
mcs.modified_field[_fname] = _fvalue
def guarded_setattr(self, key, value):
if key.startswith("_"): # allow private and dunder attrs
return orig_setattr(self, key, value)
# block writes after init if key is readonly
if key in self.__DABSchema__.keys():
if hasattr(self, key):
raise ReadOnlyField(f"{key} is read-only")
else:
raise NewFieldForbidden(f"creating new fields is not allowed")
@classmethod
def pre_processing_new_fields(
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], _fname: str, _fvalue: Any
): # pylint: disable=unused-argument
"""preprocessing BaseElement new Fields"""
print(f"New field: {_fname}")
# print(f"type is: {type(_fvalue)}")
# print(f"value is: {_fvalue}")
return orig_setattr(self, key, value)
# check if field is annotated
if "__annotations__" not in namespace or _fname not in namespace["__annotations__"]:
raise NotAnnotatedField(f"Every dabmodel Fields must be annotated ({_fname})")
namespace["__setattr__"] = guarded_setattr
# check if annotation is allowed
if isinstance(namespace["__annotations__"][_fname], str):
namespace["__annotations__"][_fname] = _resolve_annotation(namespace["__annotations__"][_fname])
cls = super().__new__(mcls, name, bases, namespace)
if not _check_annotation_definition(namespace["__annotations__"][_fname]):
raise InvalidFieldAnnotation(f"Field <{_fname}> has not an allowed or valid annotation.")
for _fname, _fvalue in modified_field.items():
cls.__DABSchema__[_fname] = deepcopy(bases[0].__DABSchema__[_fname])
cls.__DABSchema__[_fname].update_value(_fvalue)
_finfo: DABFieldInfo = DABFieldInfo()
origin = get_origin(namespace["__annotations__"][_fname])
tname = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
if "Annotated" in tname:
args = get_args(namespace["__annotations__"][_fname])
if args:
if len(args) > 2:
raise InvalidFieldAnnotation(f"Field <{_fname}> had invalid Annotated value.")
if len(args) == 2 and not isinstance(args[1], DABFieldInfo):
raise InvalidFieldAnnotation("Only DABFieldInfo object is allowed as Annotated data.")
for _fname, _fvalue in new_fields.items():
_fvalue.add_source(cls)
cls.__DABSchema__[_fname] = _fvalue
_finfo = args[1]
# check if value is valid
try:
check_type(_fvalue, namespace["__annotations__"][_fname], collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS)
except TypeCheckError as exp:
raise InvalidFieldValue(f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}.") from exp
mcs.new_fields[_fname] = DABField(_fname, _fvalue, namespace["__annotations__"][_fname], _finfo)
if initializer is not None:
init_fieldvalues = dict()
init_fieldtypes = dict()
@classmethod
def call_initializer(
mcs: type["BaseMeta"], cls, name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any] # pylint: disable=unused-argument
):
"""BaseElement initializer processing"""
if mcs.initializer is not None:
init_fieldvalues = {}
init_fieldtypes = {}
for _fname, _fvalue in cls.__DABSchema__.items():
init_fieldvalues[_fname] = deepcopy(_fvalue.raw_value)
init_fieldtypes[_fname] = _fvalue.annotations
if isinstance(_fvalue,DABField):
init_fieldvalues[_fname] = deepcopy(_fvalue.raw_value)
init_fieldtypes[_fname] = _fvalue.annotations
fakecls = ModelSpecView(init_fieldvalues, init_fieldtypes, cls.__name__, cls.__module__)
safe_globals = {"__builtins__": {"__import__": _blocked_import}, **ALLOWED_HELPERS_DEFAULT}
if initializer.__code__.co_freevars:
if mcs.initializer.__code__.co_freevars:
raise FunctionForbidden("__initializer must not use closures")
safe_initializer = FunctionType(
initializer.__code__,
mcs.initializer.__code__,
safe_globals,
name=initializer.__name__,
argdefs=initializer.__defaults__,
name=mcs.initializer.__name__,
argdefs=mcs.initializer.__defaults__,
closure=None,
)
safe_initializer(fakecls)
print(fakecls.diff())
safe_initializer(fakecls) # pylint: disable=not-callable
for _fname, _fvalue in fakecls.export().items():
try:
check_type(_fvalue, cls.__DABSchema__[_fname].annotations, collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS)
@@ -497,27 +623,150 @@ class BaseMeta(type):
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
) from exp
cls.__DABSchema__[_fname].update_value(_fvalue)
return cls
def __new__(mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any]) -> Type:
"""BaseElement new class"""
mcs.pre_check(name, bases, namespace)
mcs.pre_processing(name, bases, namespace)
_cls = super().__new__(mcs, name, bases, namespace)
def __call__(cls, *args, **kw):
mcs.save_values(_cls, name, bases, namespace)
mcs.call_initializer(_cls, name, bases, namespace)
_cls.install_guard()
return _cls
@classmethod
def save_values(
mcs: type["BaseMeta"], cls, name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any] # pylint: disable=unused-argument
):
for _fname, _fvalue in mcs.modified_field.items():
cls.__DABSchema__[_fname] = deepcopy(bases[0].__DABSchema__[_fname])
cls.__DABSchema__[_fname].update_value(_fvalue)
for _fname, _fvalue in mcs.new_fields.items():
_fvalue.add_source(mcs)
cls.__DABSchema__[_fname] = _fvalue
def __call__(cls: Type, *args: Any, **kw: Any): # intentionally untyped
"""BaseElement new instance"""
obj = super().__call__(*args, **kw)
for _fname in cls.__DABSchema__.keys():
setattr(obj, _fname, cls.__DABSchema__[_fname].value)
inst_schema = dict()
for _fname, _fvalue in cls.__DABSchema__.items():
inst_schema[_fname] = FrozenDABField(_fvalue)
setattr(obj, "__DABSchema__", inst_schema)
return obj
if isinstance(_fvalue,DABField):
object.__setattr__(obj, _fname, _fvalue.value)
inst_schema = copy(obj.__DABSchema__)
for _fname, _fvalue in cls.__DABSchema__.items():
if isinstance(_fvalue,DABField):
inst_schema[_fname] = FrozenDABField(_fvalue)
object.__setattr__(obj, "__DABSchema__", inst_schema)
cls.modify_object(obj)
return obj
def modify_object(cls:Type,obj):
pass
def install_guard(cls:Type):
orig_setattr = getattr(cls,"__setattr__")
# cls.orig_setattr = orig_setattr
def guarded_setattr(_self, key: str, value: Any):
if key.startswith("_"): # allow private and dunder attrs
return orig_setattr(_self, key, value)
# block writes after init if key is readonly
if key in _self.__DABSchema__.keys():
if key in _self.__dict__:
raise ReadOnlyField(f"{key} is read-only")
elif key in _self.__DABSchema__["features"].keys():
if key in _self.__dict__:
raise ReadOnlyField(f"{key} is read-only")
else:
raise NewFieldForbidden("creating new fields is not allowed")
return orig_setattr(_self, key, value)
setattr(cls, "__setattr__", guarded_setattr)
class BaseElement(metaclass=BaseMeta):
"""BaseElement class
Base class to apply metaclass and set common Fields.
"""
class BaseMetaFeature(BaseMeta):
pass
class BaseFeature(BaseElement,metaclass=BaseMetaFeature):
"""BaseFeature class
Base class for Appliance's Features.
Features are optional traits of an appliance.
"""
Enabled:bool=False
class BaseFeature(BaseElement):
pass
class BaseMetaAppliance(BaseMeta):
@classmethod
def pre_check(
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any] # pylint: disable=unused-argument
) -> None:
"""early BaseElement checks"""
print("__NEW__ Defining:", name, "with keys:", list(namespace))
super().pre_check(name,bases,namespace)
if "features" not in namespace["__DABSchema__"]:
namespace["__DABSchema__"]["features"]={}
else:
namespace["__DABSchema__"]["features"] = copy(namespace["__DABSchema__"]["features"])
return
class BaseAppliance(BaseElement):
pass
@classmethod
def pre_processing(mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any]):
mcs.new_features: dict[str,type[BaseFeature]] = {}
mcs.modified_features: dict[str,type[BaseFeature]] = {}
super().pre_processing(name,bases,namespace)
@classmethod
def pre_processing_new_fields(
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], _fname: str, _fvalue: Any
): # pylint: disable=unused-argument
"""preprocessing BaseElement new Fields"""
print('pre_processing_new_fields')
if _fname in namespace["__DABSchema__"]["features"].keys():
print("existing Feature !")
if not issubclass(_fvalue,namespace["__DABSchema__"]["features"][_fname]):
raise InvalidFeatureInheritance(f"Feature {_fname} is not an instance of {bases[0]}.{_fname}")
mcs.modified_features[_fname]=_fvalue
elif isinstance(_fvalue,BaseMetaFeature):
print("find Feature")
mcs.new_features[_fname]=_fvalue
else:
super().pre_processing_new_fields(name,bases,namespace,_fname,_fvalue)
@classmethod
def save_values(
mcs: type["BaseMeta"], cls, name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any] # pylint: disable=unused-argument
):
super().save_values(cls,name,bases,namespace)
print(dir(mcs))
for _ftname,_ftvalue in mcs.modified_features.items():
cls.__DABSchema__["features"][_ftname] = _ftvalue
for _ftname,_ftvalue in mcs.new_features.items():
cls.__DABSchema__["features"][_ftname] = _ftvalue
def modify_object(cls:Type, obj): # intentionally untyped
for _ftname,_ftvalue in cls.__DABSchema__["features"].items():
instft = _ftvalue()
object.__setattr__(obj, _ftname,instft )
class BaseAppliance(BaseElement,metaclass=BaseMetaAppliance):
"""BaseFeature class
Base class for Appliance.
An appliance is a server configuration / image that is built using appliance's code and Fields.
"""

View File

@@ -1,13 +1,17 @@
"""library's internal tools"""
from uuid import UUID
from datetime import datetime
import json
class DABJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, UUID):
# if the obj is uuid, we simply return the value of uuid
return obj.hex
elif isinstance(obj, datetime):
return str(obj)
return json.JSONEncoder.default(self, obj)
"""allows to JSON encode non supported data type"""
def default(self, o):
if isinstance(o, UUID):
# if the o is uuid, we simply return the value of uuid
return o.hex
if isinstance(o, datetime):
return str(o)
return json.JSONEncoder.default(self, o)

View File

@@ -15,6 +15,7 @@ from pathlib import Path
import textwrap
from typing import List, Optional, Dict, Union, Tuple, Set, FrozenSet, TypeVar, Generic, Any, Annotated
from pprint import pprint
from frozendict import frozendict
print(__name__)
print(__package__)
@@ -30,7 +31,7 @@ def test_initializer_safe_testfc():
eval("print('hi')")
class TestConfigWithoutEnabledFlag(unittest.TestCase):
class MainTests(unittest.TestCase):
def setUp(self):
print("\n->", unittest.TestCase.id(self))
@@ -1040,11 +1041,11 @@ class TestConfigWithoutEnabledFlag(unittest.TestCase):
app1 = Appliance1()
self.assertEquals(app1.VarInt, 12)
self.assertEquals(app1.VarInt2, 13)
self.assertEquals(app1.set1, frozenset({0}))
self.assertEquals(app1.set2, frozenset((1.43, 1234)))
self.assertEquals(app1.VarStr1, "ABCD")
self.assertEqual(app1.VarInt, 12)
self.assertEqual(app1.VarInt2, 13)
self.assertEqual(app1.set1, frozenset({0}))
self.assertEqual(app1.set2, frozenset((1.43, 1234)))
self.assertEqual(app1.VarStr1, "ABCD")
# class can be created
class _(dm.BaseAppliance):
@@ -1056,7 +1057,7 @@ class TestConfigWithoutEnabledFlag(unittest.TestCase):
cls.VarInt = cls.VarInt + 1
app2 = _()
self.assertEquals(app2.VarInt, 42)
self.assertEqual(app2.VarInt, 42)
def test_initializer_safe(self):
@@ -1155,24 +1156,198 @@ class TestConfigWithoutEnabledFlag(unittest.TestCase):
cls.set2 = {math.ceil(list(cls.set1)[0])}
app2 = Appliance2()
self.assertEquals(app2.VarInt2, 56)
self.assertEquals(app2.VarFloat, 1)
self.assertEquals(app2.VarInt, 0)
self.assertEquals(app2.VarInt3, 4)
self.assertEquals(app2.VarInt4, 7)
self.assertEquals(app2.VarInt5, 4)
self.assertEquals(app2.list2, (0, 1, 2, 4))
self.assertEquals(app2.list3, (0, 1, 2, 3, 4))
self.assertEquals(app2.set2, {2})
self.assertEqual(app2.VarInt2, 56)
self.assertEqual(app2.VarFloat, 1)
self.assertEqual(app2.VarInt, 0)
self.assertEqual(app2.VarInt3, 4)
self.assertEqual(app2.VarInt4, 7)
self.assertEqual(app2.VarInt5, 4)
self.assertEqual(app2.list2, (0, 1, 2, 4))
self.assertEqual(app2.list3, (0, 1, 2, 3, 4))
self.assertEqual(app2.set2, {2})
class _(dm.BaseAppliance):
_: int = 0
with self.assertRaises(NameError):
@classmethod
def __initializer(cls):
test_initializer_safe_testfc()
class _(dm.BaseAppliance):
_: int = 0
@classmethod
def __initializer(cls):
test_initializer_safe_testfc()
def test_feature(self):
"""Testing first appliance feature, and Field types (simple)"""
# class can be created
class Appliance1(dm.BaseAppliance):
VarStrOuter: str = "testvalue APPLIANCE"
class Feature1(dm.BaseFeature):
VarStrInner: str = "testvalue FEATURE"
app1 = Appliance1()
self.assertIsInstance(Appliance1.__DABSchema__["VarStrOuter"],dm.DABField)
self.assertIsInstance(app1.__DABSchema__["VarStrOuter"],dm.FrozenDABField)
self.assertIn("Feature1",app1.__DABSchema__["features"])
self.assertIn("VarStrInner",app1.__DABSchema__["features"]["Feature1"].__DABSchema__)
self.assertIsInstance(app1.__DABSchema__["features"]["Feature1"].__DABSchema__["VarStrInner"],dm.DABField)
self.assertTrue(hasattr(app1, "Feature1"))
self.assertIsInstance(app1.Feature1.__DABSchema__["VarStrInner"],dm.FrozenDABField)
self.assertTrue(hasattr(app1.Feature1, "VarStrInner"))
def test_feature_inheritance(self):
"""Testing first appliance feature, and Field types (simple)"""
# class can be created
class Appliance1(dm.BaseAppliance):
VarStrOuter: str = "testvalue APPLIANCE1"
class Feature1(dm.BaseFeature):
VarStrInner: str = "testvalue FEATURE1"
VarInt:int=42
print(dir(Appliance1))
class Appliance2(Appliance1):
VarStrOuter = "testvalue APPLIANCE2"
class Feature2(dm.BaseFeature):
VarStrInner: str = "testvalue FEATURE2"
print(dir(Appliance2))
class Appliance3(Appliance2):
VarStrOuter = "testvalue APPLIANCE3"
class Feature1(Appliance1.Feature1):
VarStrInner = "testvalue FEATURE1 modded"
class Feature3(dm.BaseFeature):
VarStrInner: str = "testvalue FEATURE3"
print(dir(Appliance3))
app1 = Appliance1()
app2 = Appliance2()
app3 = Appliance3()
self.assertIsInstance(Appliance1.__DABSchema__["VarStrOuter"],dm.DABField)
self.assertIsInstance(app1.__DABSchema__["VarStrOuter"],dm.FrozenDABField)
self.assertIn("Feature1",app1.__DABSchema__["features"])
self.assertIn("VarStrInner",app1.__DABSchema__["features"]["Feature1"].__DABSchema__)
self.assertIsInstance(app1.__DABSchema__["features"]["Feature1"].__DABSchema__["VarStrInner"],dm.DABField)
self.assertTrue(hasattr(app1, "Feature1"))
self.assertIsInstance(app1.Feature1.__DABSchema__["VarStrInner"],dm.FrozenDABField)
self.assertTrue(hasattr(app1.Feature1, "VarStrInner"))
self.assertEqual(app1.VarStrOuter,"testvalue APPLIANCE1")
self.assertEqual(app1.Feature1.VarStrInner,"testvalue FEATURE1")
self.assertEqual(app1.Feature1.VarInt,42)
self.assertEqual(app2.VarStrOuter,"testvalue APPLIANCE2")
self.assertEqual(app2.Feature2.VarStrInner,"testvalue FEATURE2")
self.assertEqual(app3.VarStrOuter,"testvalue APPLIANCE3")
self.assertEqual(app3.Feature1.VarStrInner,"testvalue FEATURE1 modded")
self.assertEqual(app3.Feature1.VarInt,42)
self.assertEqual(app3.Feature3.VarStrInner,"testvalue FEATURE3")
def test_feature_inheritance2(self):
"""Testing first appliance feature, and Field types (simple)"""
# class can be created
class Appliance1(dm.BaseAppliance):
class Feature1(dm.BaseFeature):
VarStrInner: str = "testvalue FEATURE1"
# check cannot REdefine a feature from BaseFeature
with self.assertRaises(dm.InvalidFeatureInheritance):
class Appliance2(Appliance1):
class Feature1(dm.BaseFeature):
...
class Appliance2b(Appliance1):
class Feature1(Appliance1.Feature1):
...
# check only REdefine a feature from direct parent
with self.assertRaises(dm.InvalidFeatureInheritance):
class Appliance3(Appliance2b):
class Feature1(Appliance1.Feature1):
...
class Appliance3b(Appliance2b):
class Feature1(Appliance2b.Feature1):
...
app1 = Appliance1()
app2 = Appliance2b()
print("youhou1")
print(Appliance3b)
print(Appliance3b.Feature1)
print("=====")
app3 = Appliance3b()
print("youhou2")
print(Appliance3b)
print(Appliance3b.Feature1)
print("=====")
#self.assertEqual(app1.Feature1.VarStrInner,"testvalue FEATURE1")
#self.assertEqual(app2.Feature1.VarStrInner,"testvalue FEATURE1")
#self.assertEqual(app3.Feature1.VarStrInner,"testvalue FEATURE1")
class Appliance4(Appliance3b):
class Feature1(Appliance3b.Feature1):
VarStrInner = "testvalue FEATURE4"
self.assertEqual(app1.Feature1.VarStrInner,"testvalue FEATURE1")
self.assertEqual(app2.Feature1.VarStrInner,"testvalue FEATURE1")
self.assertEqual(app3.Feature1.VarStrInner,"testvalue FEATURE1")
app4 = Appliance4()
self.assertEqual(app1.Feature1.VarStrInner,"testvalue FEATURE1")
self.assertEqual(app2.Feature1.VarStrInner,"testvalue FEATURE1")
self.assertEqual(app3.Feature1.VarStrInner,"testvalue FEATURE1")
self.assertEqual(app4.Feature1.VarStrInner,"testvalue FEATURE4")
def test_inheritance_chain(self):
# class can be created
class Appliance1(dm.BaseAppliance):
VarStr: str = "testvalue1"
class Appliance2(Appliance1):
pass
class Appliance3(Appliance2):
pass
app1 = Appliance1()
app2 = Appliance2()
app3 = Appliance3()
self.assertEqual(app1.VarStr,"testvalue1")
self.assertEqual(app2.VarStr,"testvalue1")
self.assertEqual(app3.VarStr,"testvalue1")
class Appliance4(Appliance3):
VarStr = "testvalue moded"
app4 = Appliance4()
self.assertEqual(app1.VarStr,"testvalue1")
self.assertEqual(app2.VarStr,"testvalue1")
self.assertEqual(app3.VarStr,"testvalue1")
self.assertEqual(app4.VarStr,"testvalue moded")
app1b = Appliance1()
app2b = Appliance2()
app3b = Appliance3()
self.assertEqual(app1b.VarStr,"testvalue1")
self.assertEqual(app2b.VarStr,"testvalue1")
self.assertEqual(app3b.VarStr,"testvalue1")
# ---------- main ----------
if __name__ == "__main__":