Compare commits

...

5 Commits

Author SHA1 Message Date
cclecle
637b50b325 quality & typing fixes 2025-09-06 01:31:55 +02:00
cclecle
f45c9cc8f3 fix unittest 2025-09-05 23:04:16 +02:00
cclecle
95b0c298ce update deps 2025-09-05 23:00:36 +02:00
cclecle
04a4cf7b36 add deps 2025-09-05 22:56:17 +02:00
cclecle
f42a839cff work 2025-09-05 22:53:47 +02:00
5 changed files with 734 additions and 141 deletions

View File

@@ -35,8 +35,8 @@ classifiers = [
dependencies = [
'importlib-metadata; python_version<"3.9"',
'packaging',
'pydantic',
'runtype'
'frozendict',
'typeguard'
]
dynamic = ["version"]

View File

@@ -25,4 +25,6 @@ from .model import (
InvalidFieldValue,
InvalidFieldAnnotation,
IncompletelyAnnotatedField,
ImportForbidden,
FunctionForbidden,
)

View File

@@ -1,9 +1,33 @@
from typing import Optional, TypeVar, Generic, Union, get_origin, get_args, List, Dict, Any, Tuple, Set, Annotated, FrozenSet
from types import UnionType
from frozendict import deepfreeze
from copy import deepcopy, copy
from pprint import pprint
""" dabmodel model module
This module implements DAB model classes.
This module contains metaclass and bases classes used to create models.
BaseAppliance can be used to create a new Appliance Data.
BaseFeature can be used to create new Appliance's Features."""
from typing import (
Optional,
TypeVar,
Generic,
Union,
get_origin,
get_args,
List,
Dict,
Any,
Tuple,
Set,
Annotated,
FrozenSet,
Callable,
Type,
)
from types import UnionType, FunctionType, SimpleNamespace
from copy import deepcopy, copy
# from pprint import pprint
import math
from frozendict import deepfreeze
from typeguard import check_type, TypeCheckError, CollectionCheckStrategy
ALLOWED_ANNOTATIONS = {
@@ -34,43 +58,149 @@ ALLOWED_MODEL_FIELDS_TYPES = (str, int, float, complex, bool, bytes)
ALLOWED_MODEL_FIELDS_CONTAINERS = (dict, list, set, frozenset, tuple)
TV_ALLOWED_MODEL_FIELDS_TYPES = TypeVar("TV_ALLOWED_MODEL_FIELDS_TYPES", *ALLOWED_MODEL_FIELDS_TYPES, *ALLOWED_MODEL_FIELDS_CONTAINERS)
# TV_ALLOWED_MODEL_FIELDS_TYPES = TypeVar("TV_ALLOWED_MODEL_FIELDS_TYPES", *ALLOWED_MODEL_FIELDS_TYPES, *ALLOWED_MODEL_FIELDS_CONTAINERS)
class DABModelException(Exception): ...
class DABModelException(Exception):
"""DABModelException Exception class
Base Exception for DABModelException class
"""
class MultipleInheritanceForbidden(DABModelException): ...
class MultipleInheritanceForbidden(DABModelException):
"""MultipleInheritanceForbidden Exception class
Multiple inheritance is forbidden when using dabmodel
"""
class BrokenInheritance(DABModelException): ...
class BrokenInheritance(DABModelException):
"""BrokenInheritance Exception class
inheritance chain is broken
"""
class ReadOnlyField(DABModelException): ...
class ReadOnlyField(DABModelException):
"""ReadOnlyField Exception class
The used Field is ReadOnly
"""
class NewFieldForbidden(DABModelException): ...
class NewFieldForbidden(DABModelException):
"""NewFieldForbidden Exception class
Field creation is forbidden
"""
class InvalidFieldAnnotation(DABModelException): ...
class InvalidFieldAnnotation(DABModelException):
"""InvalidFieldAnnotation Exception class
The field annotation is invalid
"""
class NotAnnotatedField(InvalidFieldAnnotation): ...
class InvalidInitializerType(DABModelException):
"""InvalidInitializerType Exception class
The initializer is not a valid type
"""
class IncompletelyAnnotatedField(InvalidFieldAnnotation): ...
class NotAnnotatedField(InvalidFieldAnnotation):
"""NotAnnotatedField Exception class
The Field is not Annotated
"""
class ReadOnlyFieldAnnotation(DABModelException): ...
class IncompletelyAnnotatedField(InvalidFieldAnnotation):
"""IncompletelyAnnotatedField Exception class
The field annotation is incomplete
"""
class InvalidFieldValue(DABModelException): ...
class ReadOnlyFieldAnnotation(DABModelException):
"""ReadOnlyFieldAnnotation Exception class
Field annotation connot be modified
"""
class InvalidFieldValue(DABModelException):
"""InvalidFieldValue Exception class
The Field value is invalid
"""
class NonExistingField(DABModelException):
"""NonExistingField Exception class
The given Field is non existing
"""
class ImportForbidden(DABModelException):
"""ImportForbidden Exception class
Imports are forbidden
"""
class FunctionForbidden(DABModelException):
"""FunctionForbidden Exception class
function call are forbidden
"""
ALLOWED_HELPERS_MATH = SimpleNamespace(
sqrt=math.sqrt,
floor=math.floor,
ceil=math.ceil,
trunc=math.trunc,
fabs=math.fabs,
copysign=math.copysign,
hypot=math.hypot,
exp=math.exp,
log=math.log,
log10=math.log10,
sin=math.sin,
cos=math.cos,
tan=math.tan,
atan2=math.atan2,
radians=math.radians,
degrees=math.degrees,
)
ALLOWED_HELPERS_DEFAULT = {
"math": ALLOWED_HELPERS_MATH,
"print": print,
# Numbers & reducers (pure, deterministic)
"abs": abs,
"round": round,
"min": min,
"max": max,
"sum": sum,
# Introspection-free basics
"len": len,
"sorted": sorted,
# Basic constructors (for copy-on-write patterns)
"tuple": tuple,
"list": list,
"dict": dict,
"set": set,
# Simple casts if they need to normalize types
"int": int,
"float": float,
"str": str,
"bool": bool,
"bytes": bytes,
"complex": complex,
# Easy iteration helpers (optional but handy)
"range": range,
}
def _blocked_import(*args, **kwargs):
raise ImportForbidden("imports disabled in __initializer")
def _resolve_annotation(ann):
if isinstance(ann, str):
# Safe eval against a **whitelist** only
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS)
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS) # pylint: disable=eval-used
return ann
@@ -88,15 +218,15 @@ def _peel_annotated(t: Any) -> Any:
return t
def _check_annotation_definition(_type) -> bool:
def _check_annotation_definition(_type) -> bool: # pylint: disable=too-complex,too-many-return-statements
_type = _peel_annotated(_type)
# handle Optional[] and Union[None,...]
if (get_origin(_type) is Union or get_origin(_type) is UnionType) and type(None) in get_args(_type):
return all([_check_annotation_definition(_) for _ in get_args(_type) if _ is not type(None)])
return all(_check_annotation_definition(_) for _ in get_args(_type) if _ is not type(None))
# handle other Union[...]
if get_origin(_type) is Union or get_origin(_type) is UnionType:
return all([_check_annotation_definition(_) for _ in get_args(_type)])
return all(_check_annotation_definition(_) for _ in get_args(_type))
# handle Dict[...]
if get_origin(_type) is dict:
@@ -112,127 +242,240 @@ def _check_annotation_definition(_type) -> bool:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
if len(inner_types) == 2 and inner_types[1] is Ellipsis:
return _check_annotation_definition(inner_types[0])
return all([_check_annotation_definition(_) for _ in inner_types])
return all(_check_annotation_definition(_) for _ in inner_types)
# handle Set[],Tuple[],FrozenSet[],List[]
if get_origin(_type) in [set, frozenset, tuple, list]:
inner_types = get_args(_type)
if len(inner_types) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
return all([_check_annotation_definition(_) for _ in inner_types])
return all(_check_annotation_definition(_) for _ in inner_types)
if _type in ALLOWED_MODEL_FIELDS_TYPES:
return True
return False
class BaseConstraint(Generic[TV_ALLOWED_MODEL_FIELDS_TYPES]):
T_Field = TypeVar("T_Field")
class BaseConstraint(Generic[T_Field]):
"""BaseConstraint class
Base class for Field's constraints
"""
_bound_type: type
def __init__(self): ...
def check(self, value: TV_ALLOWED_MODEL_FIELDS_TYPES) -> bool: ...
def check(self, value: T_Field) -> bool:
"""Check if a Constraint is completed"""
return True
def _deepfreeze(value):
"""recursive freeze helper function"""
if isinstance(value, dict):
return deepfreeze(value)
elif isinstance(value, set):
if isinstance(value, set):
return frozenset(_deepfreeze(v) for v in value)
elif isinstance(value, list):
if isinstance(value, list):
return tuple(_deepfreeze(v) for v in value)
elif isinstance(value, tuple):
if isinstance(value, tuple):
return tuple(_deepfreeze(v) for v in value)
return value
class DABFieldInfo:
def __init__(self, *, doc: str = "", constraints: list[BaseConstraint] = []):
"""This Class allows to describe a Field in Appliance class"""
def __init__(self, *, doc: str = "", constraints: Optional[list[BaseConstraint]] = None):
self._doc: str = doc
self._constraints: list[BaseConstraint] = constraints
self._constraints: list[BaseConstraint]
if constraints is None:
self._constraints = []
else:
self._constraints = constraints
@property
def doc(self):
"""Returns Field's documentation"""
return self._doc
@property
def constraints(self):
def constraints(self) -> list[BaseConstraint[Any]]:
"""Returns Field's constraints"""
return self._constraints
class DABField(Generic[TV_ALLOWED_MODEL_FIELDS_TYPES]):
def __init__(self, name: str, v: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES], a: Any, i: str):
class DABField(Generic[T_Field]):
"""This class describe a Field in Schema"""
def __init__(self, name: str, v: Optional[T_Field], a: Any, i: DABFieldInfo):
self._name: str = name
self._source: Optional[type] = None
self._default_value: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = v
self._value: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = v
self._default_value: Optional[T_Field] = v
self._value: Optional[T_Field] = v
self._annotations: Any = a
self._info: DABFieldInfo = i
self._constraints: List[BaseConstraint] = i.constraints
self._constraints: List[BaseConstraint[Any]] = i.constraints
def add_source(self, s: type) -> None:
"""Adds source Appliance to the Field"""
self._source = s
@property
def doc(self):
"""Returns Field's documentation"""
return self._info.doc
def add_constraint(self, c: BaseConstraint) -> None:
"""Adds constraint to the Field"""
self._constraints.append(c)
@property
def constraints(self) -> list[BaseConstraint]:
"""Returns Field's constraint"""
return self._info.constraints
@property
def default_value(self):
def default_value(self) -> Any:
"""Returns Field's default value (frozen)"""
return _deepfreeze(self._default_value)
def update_value(self, v: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = None) -> None:
def update_value(self, v: Optional[T_Field] = None) -> None:
"""Updates Field's value"""
self._value = v
@property
def value(self):
def value(self) -> Any:
"""Returns Field's value (frosen)"""
return _deepfreeze(self._value)
@property
def raw_value(self) -> Optional[T_Field]:
"""Returns Field's value"""
return self._value
@property
def annotations(self) -> Any:
"""Returns Field's annotation"""
return self._annotations
class FrozenDABField(Generic[TV_ALLOWED_MODEL_FIELDS_TYPES]):
class FrozenDABField(Generic[T_Field]):
"""FrozenDABField class
a read-only proxy of a Field
"""
def __init__(self, inner_field: DABField):
self._inner_field = inner_field
@property
def doc(self):
return self._inner_field.doc
def doc(self) -> str:
"""Returns Field's documentation (frozen)"""
return _deepfreeze(self._inner_field.doc)
@property
def constraints(self):
def constraints(self) -> tuple[BaseConstraint]:
"""Returns Field's constraint (frozen)"""
return _deepfreeze(self._inner_field.constraints)
@property
def default_value(self):
def default_value(self) -> Any:
"""Returns Field's default value (frozen)"""
return self._inner_field.default_value
@property
def value(self):
def value(self) -> Any:
"""Returns Field's value (frosen)"""
return self._inner_field.value
@property
def annotations(self) -> Any:
"""Returns Field's annotation (frozen)"""
return _deepfreeze(self._inner_field.annotations)
class ModelSpecView:
"""ModelSpecView class
A class that will act as fake BaseElement proxy to allow setting values"""
__slots__ = ("_vals", "_types", "_touched", "_name", "_module")
def __init__(self, values: dict, types_map: dict[str, type], name, module):
object.__setattr__(self, "_vals", dict(values))
object.__setattr__(self, "_types", types_map)
object.__setattr__(self, "_touched", set())
object.__setattr__(self, "_name", name)
object.__setattr__(self, "_module", module)
@property
def __name__(self) -> str:
"""returns proxified class' name"""
return self._name
@property
def __module__(self) -> str:
"""returns proxified module's name"""
return self._module
@__module__.setter
def __module__(self, value):
pass
def __getattr__(self, name):
"""internal proxy getattr"""
if name not in self._types:
raise AttributeError(f"Unknown field {name}")
return self._vals[name]
def __setattr__(self, name, value):
"""internal proxy setattr"""
if name not in self._types:
raise NonExistingField(f"Cannot set unknown field {name}")
T = self._types[name]
try:
check_type(
value,
T,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(f"Field <{name}> value is not of expected type {T}.") from exp
self._vals[name] = value
self._touched.add(name)
def export(self) -> dict:
"""exports all proxified values"""
return dict(self._vals)
T_Meta = TypeVar("T_Meta", bound="BaseMeta")
T_BE = TypeVar("T_BE", bound="BaseElement")
class BaseMeta(type):
def __new__(mcls, name, bases, namespace):
"""metaclass to use to build BaseElement"""
modified_field: Dict[str, Any] = {}
new_fields: Dict[str, DABField[Any]] = {}
initializer: Optional[Callable[..., Any]] = None
__DABSchema__: dict[str, Any] = {}
@classmethod
def pre_check(
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any] # pylint: disable=unused-argument
) -> None:
"""early BaseElement checks"""
# print("__NEW__ Defining:", name, "with keys:", list(namespace))
if len(bases) > 1:
raise MultipleInheritanceForbidden("Multiple inheritance is not supported by dabmodel")
elif len(bases) == 0: # base class (BaseElement)
namespace["__DABSchema__"] = dict()
if len(bases) == 0: # base class (BaseElement)
namespace["__DABSchema__"] = {}
else: # standard inheritance
# check class tree origin
if "__DABSchema__" not in dir(bases[0]):
@@ -245,80 +488,136 @@ class BaseMeta(type):
for _funknown in [_ for _ in namespace["__annotations__"] if _ not in namespace.keys()]:
namespace[_funknown] = None
@classmethod
def pre_processing_modified(
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], _fname: str, _fvalue: Any
): # pylint: disable=unused-argument
"""preprocessing BaseElement modified Fields"""
# print(f"Modified field: {_fname}")
if "__annotations__" in namespace and _fname in namespace["__annotations__"]:
raise ReadOnlyFieldAnnotation("annotations cannot be modified on derived classes")
try:
check_type(
_fvalue,
namespace["__DABSchema__"][_fname].annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Field <{_fname}> New Field value is not of expected type {bases[0].__annotations__[_fname]}."
) from exp
mcs.modified_field[_fname] = _fvalue
@classmethod
def pre_processing_new(
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], _fname: str, _fvalue: Any
): # pylint: disable=unused-argument
"""preprocessing BaseElement new Fields"""
# print(f"New field: {_fname}")
# print(f"type is: {type(_fvalue)}")
# print(f"value is: {_fvalue}")
# check if field is annotated
if "__annotations__" not in namespace or _fname not in namespace["__annotations__"]:
raise NotAnnotatedField(f"Every dabmodel Fields must be annotated ({_fname})")
# check if annotation is allowed
if isinstance(namespace["__annotations__"][_fname], str):
namespace["__annotations__"][_fname] = _resolve_annotation(namespace["__annotations__"][_fname])
if not _check_annotation_definition(namespace["__annotations__"][_fname]):
raise InvalidFieldAnnotation(f"Field <{_fname}> has not an allowed or valid annotation.")
_finfo: DABFieldInfo = DABFieldInfo()
origin = get_origin(namespace["__annotations__"][_fname])
tname = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
if "Annotated" in tname:
args = get_args(namespace["__annotations__"][_fname])
if args:
if len(args) > 2:
raise InvalidFieldAnnotation(f"Field <{_fname}> had invalid Annotated value.")
if len(args) == 2 and not isinstance(args[1], DABFieldInfo):
raise InvalidFieldAnnotation("Only DABFieldInfo object is allowed as Annotated data.")
_finfo = args[1]
# print(f"annotation is: {namespace['__annotations__'][_fname]}")
# check if value is valid
try:
check_type(_fvalue, namespace["__annotations__"][_fname], collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS)
except TypeCheckError as exp:
raise InvalidFieldValue(f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}.") from exp
mcs.new_fields[_fname] = DABField(_fname, _fvalue, namespace["__annotations__"][_fname], _finfo)
@classmethod
def pre_processing(mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any]):
"""preprocessing BaseElement"""
# iterating new and modified fields
modified_field: Dict[str, Any] = {}
new_fields: Dict[str, DABField] = {}
mcs.modified_field = {}
mcs.new_fields = {}
mcs.initializer = None
initializer_name: Optional[str] = None
for _fname, _fvalue in namespace.items():
if _fname.startswith("__"):
if _fname == f"_{name}__initializer" or (name.startswith("_") and _fname == "__initializer"):
if not isinstance(_fvalue, classmethod):
raise InvalidInitializerType()
mcs.initializer = _fvalue.__func__
if name.startswith("_"):
initializer_name = "__initializer"
else:
initializer_name = f"_{name}__initializer"
elif _fname.startswith("__"):
pass
elif _fname == "Constraints" and type(_fvalue) is type:
...
# print("FOUND Constraints")
else:
# print(f"Parsing Field: {_fname} / {_fvalue}")
# Modified fields
if len(bases) == 1 and _fname in namespace["__DABSchema__"].keys():
# print(f"Modified field: {_fname}")
if "__annotations__" in namespace and _fname in namespace["__annotations__"]:
raise ReadOnlyFieldAnnotation("annotations cannot be modified on derived classes")
try:
check_type(
_fvalue,
namespace["__DABSchema__"][_fname].annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Field <{_fname}> New Field value is not of expected type {bases[0].__annotations__[_fname]}."
) from exp
modified_field[_fname] = _fvalue
# New fieds
else:
# print(f"New field: {_fname}")
# print(f"type is: {type(_fvalue)}")
# print(f"value is: {_fvalue}")
# check if field is annotated
if "__annotations__" not in namespace or _fname not in namespace["__annotations__"]:
raise NotAnnotatedField(f"Every dabmodel Fields must be annotated ({_fname})")
# check if annotation is allowed
if isinstance(namespace["__annotations__"][_fname], str):
namespace["__annotations__"][_fname] = _resolve_annotation(namespace["__annotations__"][_fname])
if not _check_annotation_definition(namespace["__annotations__"][_fname]):
raise InvalidFieldAnnotation(f"Field <{_fname}> has not an allowed or valid annotation.")
_finfo: Optional[DABFieldInfo] = DABFieldInfo()
origin = get_origin(namespace["__annotations__"][_fname])
name = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
if "Annotated" in name:
args = get_args(namespace["__annotations__"][_fname])
if args:
if len(args) > 2:
raise InvalidFieldAnnotation(f"Field <{_fname}> had invalid Annotated value.")
if len(args) == 2 and not isinstance(args[1], DABFieldInfo):
raise InvalidFieldAnnotation(f"Only DABFieldInfo object is allowed as Annotated data.")
_finfo = args[1]
# print(f"annotation is: {namespace['__annotations__'][_fname]}")
# check if value is valid
try:
check_type(
_fvalue, namespace["__annotations__"][_fname], collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
) from exp
new_fields[_fname] = DABField(_fname, _fvalue, namespace["__annotations__"][_fname], _finfo)
if len(bases) == 1 and _fname in namespace["__DABSchema__"].keys(): # Modified fields
mcs.pre_processing_modified(name, bases, namespace, _fname, _fvalue)
else: # New fieds
mcs.pre_processing_new(name, bases, namespace, _fname, _fvalue)
# removing modified fields from class (will add them back later)
for _fname in new_fields.keys():
for _fname in mcs.new_fields:
del namespace[_fname]
for _fname in modified_field.keys():
for _fname in mcs.modified_field:
del namespace[_fname]
if mcs.initializer is not None and initializer_name is not None:
del namespace[initializer_name]
@classmethod
def call_initializer(
mcs: type["BaseMeta"], cls, name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any]
): # pylint: disable=unused-argument
"""BaseElement initializer processing"""
if mcs.initializer is not None:
init_fieldvalues = {}
init_fieldtypes = {}
for _fname, _fvalue in cls.__DABSchema__.items():
init_fieldvalues[_fname] = deepcopy(_fvalue.raw_value)
init_fieldtypes[_fname] = _fvalue.annotations
fakecls = ModelSpecView(init_fieldvalues, init_fieldtypes, cls.__name__, cls.__module__)
safe_globals = {"__builtins__": {"__import__": _blocked_import}, **ALLOWED_HELPERS_DEFAULT}
if mcs.initializer.__code__.co_freevars:
raise FunctionForbidden("__initializer must not use closures")
safe_initializer = FunctionType(
mcs.initializer.__code__,
safe_globals,
name=mcs.initializer.__name__,
argdefs=mcs.initializer.__defaults__,
closure=None,
)
safe_initializer(fakecls) # pylint: disable=not-callable
for _fname, _fvalue in fakecls.export().items():
try:
check_type(_fvalue, cls.__DABSchema__[_fname].annotations, collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
) from exp
cls.__DABSchema__[_fname].update_value(_fvalue)
def __new__(mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any]) -> Type:
"""BaseElement new class"""
mcs.pre_check(name, bases, namespace)
mcs.pre_processing(name, bases, namespace)
orig_setattr = namespace.get("__setattr__", object.__setattr__)
@@ -330,32 +629,33 @@ class BaseMeta(type):
if hasattr(self, key):
raise ReadOnlyField(f"{key} is read-only")
else:
raise NewFieldForbidden(f"creating new fields is not allowed")
raise NewFieldForbidden("creating new fields is not allowed")
return orig_setattr(self, key, value)
namespace["__setattr__"] = guarded_setattr
cls = super().__new__(mcls, name, bases, namespace)
_cls = super().__new__(mcs, name, bases, namespace)
for _fname, _fvalue in modified_field.items():
cls.__DABSchema__[_fname] = deepcopy(bases[0].__DABSchema__[_fname])
cls.__DABSchema__[_fname].update_value(_fvalue)
for _fname, _fvalue in mcs.modified_field.items():
_cls.__DABSchema__[_fname] = deepcopy(bases[0].__DABSchema__[_fname])
_cls.__DABSchema__[_fname].update_value(_fvalue)
for _fname, _fvalue in new_fields.items():
_fvalue.add_source(cls)
cls.__DABSchema__[_fname] = _fvalue
for _fname, _fvalue in mcs.new_fields.items():
_fvalue.add_source(mcs)
_cls.__DABSchema__[_fname] = _fvalue
return cls
mcs.call_initializer(_cls, name, bases, namespace)
def __call__(cls, *args, **kw):
return _cls
def __call__(cls, *args: Any, **kw: Any): # intentionally untyped
"""BaseElement new instance"""
obj = super().__call__(*args, **kw)
for _fname in cls.__DABSchema__.keys():
for _fname, _fvalue in cls.__DABSchema__.items():
setattr(obj, _fname, cls.__DABSchema__[_fname].value)
# obj.__DABSchema__ = deepfreeze(obj.__DABSchema__)
# setattr(obj, "__DABSchema__", deepfreeze(obj.__DABSchema__))
inst_schema = dict()
inst_schema: dict[str, Any] = {}
for _fname, _fvalue in cls.__DABSchema__.items():
inst_schema[_fname] = FrozenDABField(_fvalue)
setattr(obj, "__DABSchema__", inst_schema)
@@ -363,12 +663,20 @@ class BaseMeta(type):
class BaseElement(metaclass=BaseMeta):
pass
"""BaseElement class
Base class to apply metaclass and set common Fields.
"""
class BaseFeature(BaseElement):
pass
"""BaseFeature class
Base class for Appliance's Features.
Features are optional traits of an appliance.
"""
class BaseAppliance(BaseElement):
pass
"""BaseFeature class
Base class for Appliance.
An appliance is a server configuration / image that is built using appliance's code and Fields.
"""

View File

@@ -1,13 +1,17 @@
"""library's internal tools"""
from uuid import UUID
from datetime import datetime
import json
class DABJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, UUID):
# if the obj is uuid, we simply return the value of uuid
return obj.hex
elif isinstance(obj, datetime):
return str(obj)
return json.JSONEncoder.default(self, obj)
"""allows to JSON encode non supported data type"""
def default(self, o):
if isinstance(o, UUID):
# if the o is uuid, we simply return the value of uuid
return o.hex
if isinstance(o, datetime):
return str(o)
return json.JSONEncoder.default(self, o)

View File

@@ -15,6 +15,7 @@ from pathlib import Path
import textwrap
from typing import List, Optional, Dict, Union, Tuple, Set, FrozenSet, TypeVar, Generic, Any, Annotated
from pprint import pprint
from frozendict import frozendict
print(__name__)
print(__package__)
@@ -26,6 +27,10 @@ testdir_path = Path(__file__).parent.resolve()
chdir(testdir_path.parent.resolve())
def test_initializer_safe_testfc():
eval("print('hi')")
class TestConfigWithoutEnabledFlag(unittest.TestCase):
def setUp(self):
print("\n->", unittest.TestCase.id(self))
@@ -214,7 +219,7 @@ class TestConfigWithoutEnabledFlag(unittest.TestCase):
self.immutable_vars__test_field(app2, "StrVar5", "default value", "123")
self.immutable_vars__test_field(app2, "StrVar6", None, "123")
@unittest.skip
# @unittest.skip
def test_containers__set(self):
"""Testing first appliance level, and Field types (Set)"""
@@ -293,7 +298,7 @@ class TestConfigWithoutEnabledFlag(unittest.TestCase):
res = subprocess.run([sys.executable, "-c", code], env=env)
self.assertEqual(res.returncode, 2)
@unittest.skip
# @unittest.skip
def test_containers__frozenset(self):
"""Testing first appliance level, and Field types (FrozenSet)"""
@@ -896,6 +901,280 @@ class TestConfigWithoutEnabledFlag(unittest.TestCase):
class _(Appliance3):
StrVar: int = 12
def test_containers_field_inheritance(self):
"""Testing first appliance level, and Field types (annotated)"""
# class can be created
class Appliance1(dm.BaseAppliance):
ListStr: list[str] = ["val1", "val2"]
Dict1: dict[int, float] = {1: 1.1, 4: 7.6, 91: 23.6}
Tuple1: "tuple[str,...]" = ("a", "c")
FrozenSet1: frozenset[int] = frozenset({1, 2})
Set1: set[int] = set({1, 2})
# class can be created
class Appliance2(Appliance1):
ListStr = ["mod val1", "mod val2"]
Dict1 = {4: 1.1, 9: 7.6, 51: 23.6}
Tuple1 = ("aa", "cc")
FrozenSet1 = frozenset({14, 27})
Set1 = set({1, 20})
app1 = Appliance1()
self.immutable_vars__test_field(app1, "ListStr", ("val1", "val2"), ["val2", "val3"])
self.immutable_vars__test_field(app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
self.immutable_vars__test_field(app1, "Tuple1", ("a", "c"), ("h", "r"))
self.immutable_vars__test_field(app1, "FrozenSet1", frozenset({1, 2}), frozenset({4, 0}))
self.immutable_vars__test_field(app1, "Set1", frozenset({1, 2}), set({4, 0}))
self.check_immutable_fields_schema(app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str])
self.check_immutable_fields_schema(app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}, dict[int, float])
self.check_immutable_fields_schema(app1, "Tuple1", ("a", "c"), ("a", "c"), tuple[str, ...])
self.check_immutable_fields_schema(app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int])
self.check_immutable_fields_schema(app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int])
app2 = Appliance2()
self.immutable_vars__test_field(app2, "ListStr", ("mod val1", "mod val2"), ["val2", "val3"])
self.immutable_vars__test_field(app2, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
self.immutable_vars__test_field(app2, "Tuple1", ("aa", "cc"), ("h", "r"))
self.immutable_vars__test_field(app2, "FrozenSet1", frozenset({14, 27}), frozenset({4, 0}))
self.immutable_vars__test_field(app2, "Set1", frozenset({1, 20}), set({4, 0}))
self.check_immutable_fields_schema(app2, "ListStr", ("mod val1", "mod val2"), ("val1", "val2"), list[str])
self.check_immutable_fields_schema(app2, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}, dict[int, float])
self.check_immutable_fields_schema(app2, "Tuple1", ("aa", "cc"), ("a", "c"), tuple[str, ...])
self.check_immutable_fields_schema(app2, "FrozenSet1", frozenset({14, 27}), frozenset({1, 2}), frozenset[int])
self.check_immutable_fields_schema(app2, "Set1", frozenset({1, 20}), frozenset({1, 2}), set[int])
self.immutable_vars__test_field(app1, "ListStr", ("val1", "val2"), ["val2", "val3"])
self.immutable_vars__test_field(app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
self.immutable_vars__test_field(app1, "Tuple1", ("a", "c"), ("h", "r"))
self.immutable_vars__test_field(app1, "FrozenSet1", frozenset({1, 2}), frozenset({4, 0}))
self.immutable_vars__test_field(app1, "Set1", frozenset({1, 2}), set({4, 0}))
self.check_immutable_fields_schema(app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str])
self.check_immutable_fields_schema(app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}, dict[int, float])
self.check_immutable_fields_schema(app1, "Tuple1", ("a", "c"), ("a", "c"), tuple[str, ...])
self.check_immutable_fields_schema(app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int])
self.check_immutable_fields_schema(app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int])
# class can be created
class Appliance3(Appliance2):
ListStr2: list[str] = ["mod val3", "mod val3"]
Dict2: dict[int, float] = {9: 8.1, 5: 98.6, 551: 3.6}
Tuple2: "tuple[str,...]" = ("aaa", "ccc")
FrozenSet2: frozenset[int] = frozenset({114, 127})
Set2: set[int] = set({10, 250})
app3 = Appliance3()
self.immutable_vars__test_field(app3, "ListStr", ("mod val1", "mod val2"), ["val2", "val3"])
self.immutable_vars__test_field(app3, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
self.immutable_vars__test_field(app3, "Tuple1", ("aa", "cc"), ("h", "r"))
self.immutable_vars__test_field(app3, "FrozenSet1", frozenset({14, 27}), frozenset({4, 0}))
self.immutable_vars__test_field(app3, "Set1", frozenset({1, 20}), set({4, 0}))
self.immutable_vars__test_field(app3, "ListStr2", ("mod val3", "mod val3"), ["mod val3", "mod val3"])
self.immutable_vars__test_field(app3, "Dict2", {9: 8.1, 5: 98.6, 551: 3.6}, {9: 8.1, 5: 98.6, 551: 3.6})
self.immutable_vars__test_field(app3, "Tuple2", ("aaa", "ccc"), ("aaa", "ccc"))
self.immutable_vars__test_field(app3, "FrozenSet2", frozenset({114, 127}), frozenset({114, 127}))
self.immutable_vars__test_field(app3, "Set2", frozenset({10, 250}), set({10, 250}))
self.check_immutable_fields_schema(app3, "ListStr", ("mod val1", "mod val2"), ("val1", "val2"), list[str])
self.check_immutable_fields_schema(app3, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}, dict[int, float])
self.check_immutable_fields_schema(app3, "Tuple1", ("aa", "cc"), ("a", "c"), tuple[str, ...])
self.check_immutable_fields_schema(app3, "FrozenSet1", frozenset({14, 27}), frozenset({1, 2}), frozenset[int])
self.check_immutable_fields_schema(app3, "Set1", frozenset({1, 20}), frozenset({1, 2}), set[int])
self.check_immutable_fields_schema(app3, "ListStr2", ("mod val3", "mod val3"), ("mod val3", "mod val3"), list[str])
self.check_immutable_fields_schema(app3, "Dict2", {9: 8.1, 5: 98.6, 551: 3.6}, {9: 8.1, 5: 98.6, 551: 3.6}, dict[int, float])
self.check_immutable_fields_schema(app3, "Tuple2", ("aaa", "ccc"), ("aaa", "ccc"), tuple[str, ...])
self.check_immutable_fields_schema(app3, "FrozenSet2", frozenset({114, 127}), frozenset({114, 127}), frozenset[int])
self.check_immutable_fields_schema(app3, "Set2", frozenset({10, 250}), frozenset({10, 250}), set[int])
self.immutable_vars__test_field(app2, "ListStr", ("mod val1", "mod val2"), ["val2", "val3"])
self.immutable_vars__test_field(app2, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
self.immutable_vars__test_field(app2, "Tuple1", ("aa", "cc"), ("h", "r"))
self.immutable_vars__test_field(app2, "FrozenSet1", frozenset({14, 27}), frozenset({4, 0}))
self.immutable_vars__test_field(app2, "Set1", frozenset({1, 20}), set({4, 0}))
self.check_immutable_fields_schema(app2, "ListStr", ("mod val1", "mod val2"), ("val1", "val2"), list[str])
self.check_immutable_fields_schema(app2, "Dict1", {4: 1.1, 9: 7.6, 51: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}, dict[int, float])
self.check_immutable_fields_schema(app2, "Tuple1", ("aa", "cc"), ("a", "c"), tuple[str, ...])
self.check_immutable_fields_schema(app2, "FrozenSet1", frozenset({14, 27}), frozenset({1, 2}), frozenset[int])
self.check_immutable_fields_schema(app2, "Set1", frozenset({1, 20}), frozenset({1, 2}), set[int])
self.immutable_vars__test_field(app1, "ListStr", ("val1", "val2"), ["val2", "val3"])
self.immutable_vars__test_field(app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6})
self.immutable_vars__test_field(app1, "Tuple1", ("a", "c"), ("h", "r"))
self.immutable_vars__test_field(app1, "FrozenSet1", frozenset({1, 2}), frozenset({4, 0}))
self.immutable_vars__test_field(app1, "Set1", frozenset({1, 2}), set({4, 0}))
self.check_immutable_fields_schema(app1, "ListStr", ("val1", "val2"), ("val1", "val2"), list[str])
self.check_immutable_fields_schema(app1, "Dict1", {1: 1.1, 4: 7.6, 91: 23.6}, {1: 1.1, 4: 7.6, 91: 23.6}, dict[int, float])
self.check_immutable_fields_schema(app1, "Tuple1", ("a", "c"), ("a", "c"), tuple[str, ...])
self.check_immutable_fields_schema(app1, "FrozenSet1", frozenset({1, 2}), frozenset({1, 2}), frozenset[int])
self.check_immutable_fields_schema(app1, "Set1", frozenset({1, 2}), frozenset({1, 2}), set[int])
def test_initializer(self):
"""Testing first appliance level, and Field types (simple)"""
# class can be created
class Appliance1(dm.BaseAppliance):
VarInt: int = 12
VarInt2: int = 21
list1: list[int] = [1]
set1: set[float] = {1.43}
set2: set[float] = {5.43}
VarStr1: str = "abcd"
@classmethod
def __initializer(cls):
cls.VarInt2 = cls.VarInt + 1
cls.list1.append(2)
cls.set2 = cls.set1 | {1234}
cls.set1 = {0}
cls.VarStr1 = cls.VarStr1.upper()
app1 = Appliance1()
self.assertEqual(app1.VarInt, 12)
self.assertEqual(app1.VarInt2, 13)
self.assertEqual(app1.set1, frozenset({0}))
self.assertEqual(app1.set2, frozenset((1.43, 1234)))
self.assertEqual(app1.VarStr1, "ABCD")
# class can be created
class _(dm.BaseAppliance):
VarInt: int = 41
@classmethod
def __initializer(cls):
cls.VarInt = cls.VarInt + 1
app2 = _()
self.assertEqual(app2.VarInt, 42)
def test_initializer_safe(self):
with self.assertRaises(dm.ImportForbidden):
class test(dm.BaseAppliance):
_: int = 0
@classmethod
def __initializer(cls):
import pprint
with self.assertRaises(NameError):
class _(dm.BaseAppliance):
_: int = 0
@classmethod
def __initializer(cls):
eval("2 ** 8")
with self.assertRaises(NameError):
class _(dm.BaseAppliance):
_: int = 0
@classmethod
def __initializer(cls):
open("foo")
with self.assertRaises(NameError):
class _(dm.BaseAppliance):
_: int = 0
@classmethod
def __initializer(cls):
exec("exit()")
with self.assertRaises(NameError):
class _(dm.BaseAppliance):
_: int = 0
@classmethod
def __initializer(cls):
compile("print(55)", "test", "eval")
with self.assertRaises(NameError):
class _(dm.BaseAppliance):
_: int = 0
@classmethod
def __initializer(cls):
compile("print(55)", "test", "eval")
with self.assertRaises(dm.FunctionForbidden):
def testfc():
eval("print('test')")
class _(dm.BaseAppliance):
_: int = 0
@classmethod
def __initializer(cls):
testfc()
# class can be created
class Appliance2(dm.BaseAppliance):
VarInt: int = -56
VarInt2: int = 0
VarInt3: int = 0
VarInt4: int = 0
VarInt5: int = 0
VarFloat: float = 1.23
list1: list[int] = [1, 2, 0, 4]
list2: Optional[list[int]] = None
list3: Optional[list[int]] = None
set1: set[float] = {1.43}
set2: set[float] = {5.43}
@classmethod
def __initializer(cls):
cls.VarInt2 = abs(cls.VarInt)
cls.VarFloat = round(cls.VarFloat)
cls.VarInt = min(cls.list1)
cls.VarInt3 = max(cls.list1)
cls.VarInt4 = sum(cls.list1)
cls.VarInt5 = len(cls.list1)
cls.list2 = sorted(cls.list1)
cls.list3 = []
for i in range(5):
cls.list3.append(i)
cls.set2 = {math.ceil(list(cls.set1)[0])}
app2 = Appliance2()
self.assertEqual(app2.VarInt2, 56)
self.assertEqual(app2.VarFloat, 1)
self.assertEqual(app2.VarInt, 0)
self.assertEqual(app2.VarInt3, 4)
self.assertEqual(app2.VarInt4, 7)
self.assertEqual(app2.VarInt5, 4)
self.assertEqual(app2.list2, (0, 1, 2, 4))
self.assertEqual(app2.list3, (0, 1, 2, 3, 4))
self.assertEqual(app2.set2, {2})
with self.assertRaises(NameError):
class _(dm.BaseAppliance):
_: int = 0
@classmethod
def __initializer(cls):
test_initializer_safe_testfc()
# ---------- main ----------