|
|
|
|
@@ -1,11 +1,35 @@
|
|
|
|
|
from typing import Optional, TypeVar, Generic, Union, get_origin, get_args, List, Dict, Any, Tuple, Set, Annotated, FrozenSet
|
|
|
|
|
""" dabmodel model module
|
|
|
|
|
This module implements DAB model classes.
|
|
|
|
|
This module contains metaclass and bases classes used to create models.
|
|
|
|
|
BaseAppliance can be used to create a new Appliance Data.
|
|
|
|
|
BaseFeature can be used to create new Appliance's Features."""
|
|
|
|
|
|
|
|
|
|
from typing import (
|
|
|
|
|
Optional,
|
|
|
|
|
TypeVar,
|
|
|
|
|
Generic,
|
|
|
|
|
Union,
|
|
|
|
|
get_origin,
|
|
|
|
|
get_args,
|
|
|
|
|
List,
|
|
|
|
|
Dict,
|
|
|
|
|
Any,
|
|
|
|
|
Tuple,
|
|
|
|
|
Set,
|
|
|
|
|
Annotated,
|
|
|
|
|
FrozenSet,
|
|
|
|
|
Callable,
|
|
|
|
|
Type,
|
|
|
|
|
)
|
|
|
|
|
from types import UnionType, FunctionType, SimpleNamespace
|
|
|
|
|
from frozendict import deepfreeze
|
|
|
|
|
from copy import deepcopy, copy
|
|
|
|
|
from pprint import pprint
|
|
|
|
|
from typeguard import check_type, TypeCheckError, CollectionCheckStrategy
|
|
|
|
|
|
|
|
|
|
# from pprint import pprint
|
|
|
|
|
import math
|
|
|
|
|
|
|
|
|
|
from frozendict import deepfreeze
|
|
|
|
|
from typeguard import check_type, TypeCheckError, CollectionCheckStrategy
|
|
|
|
|
|
|
|
|
|
ALLOWED_ANNOTATIONS = {
|
|
|
|
|
"Union": Union,
|
|
|
|
|
"Optional": Optional,
|
|
|
|
|
@@ -34,49 +58,91 @@ ALLOWED_MODEL_FIELDS_TYPES = (str, int, float, complex, bool, bytes)
|
|
|
|
|
ALLOWED_MODEL_FIELDS_CONTAINERS = (dict, list, set, frozenset, tuple)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
TV_ALLOWED_MODEL_FIELDS_TYPES = TypeVar("TV_ALLOWED_MODEL_FIELDS_TYPES", *ALLOWED_MODEL_FIELDS_TYPES, *ALLOWED_MODEL_FIELDS_CONTAINERS)
|
|
|
|
|
# TV_ALLOWED_MODEL_FIELDS_TYPES = TypeVar("TV_ALLOWED_MODEL_FIELDS_TYPES", *ALLOWED_MODEL_FIELDS_TYPES, *ALLOWED_MODEL_FIELDS_CONTAINERS)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DABModelException(Exception): ...
|
|
|
|
|
class DABModelException(Exception):
|
|
|
|
|
"""DABModelException Exception class
|
|
|
|
|
Base Exception for DABModelException class
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MultipleInheritanceForbidden(DABModelException): ...
|
|
|
|
|
class MultipleInheritanceForbidden(DABModelException):
|
|
|
|
|
"""MultipleInheritanceForbidden Exception class
|
|
|
|
|
Multiple inheritance is forbidden when using dabmodel
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BrokenInheritance(DABModelException): ...
|
|
|
|
|
class BrokenInheritance(DABModelException):
|
|
|
|
|
"""BrokenInheritance Exception class
|
|
|
|
|
inheritance chain is broken
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ReadOnlyField(DABModelException): ...
|
|
|
|
|
class ReadOnlyField(DABModelException):
|
|
|
|
|
"""ReadOnlyField Exception class
|
|
|
|
|
The used Field is ReadOnly
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class NewFieldForbidden(DABModelException): ...
|
|
|
|
|
class NewFieldForbidden(DABModelException):
|
|
|
|
|
"""NewFieldForbidden Exception class
|
|
|
|
|
Field creation is forbidden
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class InvalidFieldAnnotation(DABModelException): ...
|
|
|
|
|
class InvalidFieldAnnotation(DABModelException):
|
|
|
|
|
"""InvalidFieldAnnotation Exception class
|
|
|
|
|
The field annotation is invalid
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class InvalidInitializerType(DABModelException): ...
|
|
|
|
|
class InvalidInitializerType(DABModelException):
|
|
|
|
|
"""InvalidInitializerType Exception class
|
|
|
|
|
The initializer is not a valid type
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class NotAnnotatedField(InvalidFieldAnnotation): ...
|
|
|
|
|
class NotAnnotatedField(InvalidFieldAnnotation):
|
|
|
|
|
"""NotAnnotatedField Exception class
|
|
|
|
|
The Field is not Annotated
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class IncompletelyAnnotatedField(InvalidFieldAnnotation): ...
|
|
|
|
|
class IncompletelyAnnotatedField(InvalidFieldAnnotation):
|
|
|
|
|
"""IncompletelyAnnotatedField Exception class
|
|
|
|
|
The field annotation is incomplete
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ReadOnlyFieldAnnotation(DABModelException): ...
|
|
|
|
|
class ReadOnlyFieldAnnotation(DABModelException):
|
|
|
|
|
"""ReadOnlyFieldAnnotation Exception class
|
|
|
|
|
Field annotation connot be modified
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class InvalidFieldValue(DABModelException): ...
|
|
|
|
|
class InvalidFieldValue(DABModelException):
|
|
|
|
|
"""InvalidFieldValue Exception class
|
|
|
|
|
The Field value is invalid
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class NonExistingField(DABModelException): ...
|
|
|
|
|
class NonExistingField(DABModelException):
|
|
|
|
|
"""NonExistingField Exception class
|
|
|
|
|
The given Field is non existing
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ImportForbidden(DABModelException): ...
|
|
|
|
|
class ImportForbidden(DABModelException):
|
|
|
|
|
"""ImportForbidden Exception class
|
|
|
|
|
Imports are forbidden
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FunctionForbidden(DABModelException): ...
|
|
|
|
|
class FunctionForbidden(DABModelException):
|
|
|
|
|
"""FunctionForbidden Exception class
|
|
|
|
|
function call are forbidden
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ALLOWED_HELPERS_MATH = SimpleNamespace(
|
|
|
|
|
@@ -134,7 +200,7 @@ def _blocked_import(*args, **kwargs):
|
|
|
|
|
def _resolve_annotation(ann):
|
|
|
|
|
if isinstance(ann, str):
|
|
|
|
|
# Safe eval against a **whitelist** only
|
|
|
|
|
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS)
|
|
|
|
|
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS) # pylint: disable=eval-used
|
|
|
|
|
return ann
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -152,15 +218,15 @@ def _peel_annotated(t: Any) -> Any:
|
|
|
|
|
return t
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _check_annotation_definition(_type) -> bool:
|
|
|
|
|
def _check_annotation_definition(_type) -> bool: # pylint: disable=too-complex,too-many-return-statements
|
|
|
|
|
_type = _peel_annotated(_type)
|
|
|
|
|
# handle Optional[] and Union[None,...]
|
|
|
|
|
if (get_origin(_type) is Union or get_origin(_type) is UnionType) and type(None) in get_args(_type):
|
|
|
|
|
return all([_check_annotation_definition(_) for _ in get_args(_type) if _ is not type(None)])
|
|
|
|
|
return all(_check_annotation_definition(_) for _ in get_args(_type) if _ is not type(None))
|
|
|
|
|
|
|
|
|
|
# handle other Union[...]
|
|
|
|
|
if get_origin(_type) is Union or get_origin(_type) is UnionType:
|
|
|
|
|
return all([_check_annotation_definition(_) for _ in get_args(_type)])
|
|
|
|
|
return all(_check_annotation_definition(_) for _ in get_args(_type))
|
|
|
|
|
|
|
|
|
|
# handle Dict[...]
|
|
|
|
|
if get_origin(_type) is dict:
|
|
|
|
|
@@ -176,124 +242,165 @@ def _check_annotation_definition(_type) -> bool:
|
|
|
|
|
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
|
|
|
|
|
if len(inner_types) == 2 and inner_types[1] is Ellipsis:
|
|
|
|
|
return _check_annotation_definition(inner_types[0])
|
|
|
|
|
return all([_check_annotation_definition(_) for _ in inner_types])
|
|
|
|
|
return all(_check_annotation_definition(_) for _ in inner_types)
|
|
|
|
|
|
|
|
|
|
# handle Set[],Tuple[],FrozenSet[],List[]
|
|
|
|
|
if get_origin(_type) in [set, frozenset, tuple, list]:
|
|
|
|
|
inner_types = get_args(_type)
|
|
|
|
|
if len(inner_types) == 0:
|
|
|
|
|
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
|
|
|
|
|
return all([_check_annotation_definition(_) for _ in inner_types])
|
|
|
|
|
return all(_check_annotation_definition(_) for _ in inner_types)
|
|
|
|
|
|
|
|
|
|
if _type in ALLOWED_MODEL_FIELDS_TYPES:
|
|
|
|
|
return True
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BaseConstraint(Generic[TV_ALLOWED_MODEL_FIELDS_TYPES]):
|
|
|
|
|
T_Field = TypeVar("T_Field")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BaseConstraint(Generic[T_Field]):
|
|
|
|
|
"""BaseConstraint class
|
|
|
|
|
Base class for Field's constraints
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
_bound_type: type
|
|
|
|
|
|
|
|
|
|
def __init__(self): ...
|
|
|
|
|
|
|
|
|
|
def check(self, value: TV_ALLOWED_MODEL_FIELDS_TYPES) -> bool: ...
|
|
|
|
|
def check(self, value: T_Field) -> bool:
|
|
|
|
|
"""Check if a Constraint is completed"""
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _deepfreeze(value):
|
|
|
|
|
"""recursive freeze helper function"""
|
|
|
|
|
if isinstance(value, dict):
|
|
|
|
|
return deepfreeze(value)
|
|
|
|
|
elif isinstance(value, set):
|
|
|
|
|
if isinstance(value, set):
|
|
|
|
|
return frozenset(_deepfreeze(v) for v in value)
|
|
|
|
|
elif isinstance(value, list):
|
|
|
|
|
if isinstance(value, list):
|
|
|
|
|
return tuple(_deepfreeze(v) for v in value)
|
|
|
|
|
elif isinstance(value, tuple):
|
|
|
|
|
if isinstance(value, tuple):
|
|
|
|
|
return tuple(_deepfreeze(v) for v in value)
|
|
|
|
|
return value
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DABFieldInfo:
|
|
|
|
|
def __init__(self, *, doc: str = "", constraints: list[BaseConstraint] = []):
|
|
|
|
|
"""This Class allows to describe a Field in Appliance class"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, *, doc: str = "", constraints: Optional[list[BaseConstraint]] = None):
|
|
|
|
|
self._doc: str = doc
|
|
|
|
|
self._constraints: list[BaseConstraint] = constraints
|
|
|
|
|
self._constraints: list[BaseConstraint]
|
|
|
|
|
if constraints is None:
|
|
|
|
|
self._constraints = []
|
|
|
|
|
else:
|
|
|
|
|
self._constraints = constraints
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def doc(self):
|
|
|
|
|
"""Returns Field's documentation"""
|
|
|
|
|
return self._doc
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def constraints(self):
|
|
|
|
|
def constraints(self) -> list[BaseConstraint[Any]]:
|
|
|
|
|
"""Returns Field's constraints"""
|
|
|
|
|
return self._constraints
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class DABField(Generic[TV_ALLOWED_MODEL_FIELDS_TYPES]):
|
|
|
|
|
def __init__(self, name: str, v: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES], a: Any, i: str):
|
|
|
|
|
class DABField(Generic[T_Field]):
|
|
|
|
|
"""This class describe a Field in Schema"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, name: str, v: Optional[T_Field], a: Any, i: DABFieldInfo):
|
|
|
|
|
self._name: str = name
|
|
|
|
|
self._source: Optional[type] = None
|
|
|
|
|
self._default_value: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = v
|
|
|
|
|
self._value: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = v
|
|
|
|
|
self._default_value: Optional[T_Field] = v
|
|
|
|
|
self._value: Optional[T_Field] = v
|
|
|
|
|
self._annotations: Any = a
|
|
|
|
|
self._info: DABFieldInfo = i
|
|
|
|
|
self._constraints: List[BaseConstraint] = i.constraints
|
|
|
|
|
self._constraints: List[BaseConstraint[Any]] = i.constraints
|
|
|
|
|
|
|
|
|
|
def add_source(self, s: type) -> None:
|
|
|
|
|
"""Adds source Appliance to the Field"""
|
|
|
|
|
self._source = s
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def doc(self):
|
|
|
|
|
"""Returns Field's documentation"""
|
|
|
|
|
return self._info.doc
|
|
|
|
|
|
|
|
|
|
def add_constraint(self, c: BaseConstraint) -> None:
|
|
|
|
|
"""Adds constraint to the Field"""
|
|
|
|
|
self._constraints.append(c)
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def constraints(self) -> list[BaseConstraint]:
|
|
|
|
|
"""Returns Field's constraint"""
|
|
|
|
|
return self._info.constraints
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def default_value(self):
|
|
|
|
|
def default_value(self) -> Any:
|
|
|
|
|
"""Returns Field's default value (frozen)"""
|
|
|
|
|
return _deepfreeze(self._default_value)
|
|
|
|
|
|
|
|
|
|
def update_value(self, v: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = None) -> None:
|
|
|
|
|
def update_value(self, v: Optional[T_Field] = None) -> None:
|
|
|
|
|
"""Updates Field's value"""
|
|
|
|
|
self._value = v
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def value(self):
|
|
|
|
|
def value(self) -> Any:
|
|
|
|
|
"""Returns Field's value (frosen)"""
|
|
|
|
|
return _deepfreeze(self._value)
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def raw_value(self):
|
|
|
|
|
def raw_value(self) -> Optional[T_Field]:
|
|
|
|
|
"""Returns Field's value"""
|
|
|
|
|
return self._value
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def annotations(self) -> Any:
|
|
|
|
|
"""Returns Field's annotation"""
|
|
|
|
|
return self._annotations
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FrozenDABField(Generic[TV_ALLOWED_MODEL_FIELDS_TYPES]):
|
|
|
|
|
class FrozenDABField(Generic[T_Field]):
|
|
|
|
|
"""FrozenDABField class
|
|
|
|
|
a read-only proxy of a Field
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, inner_field: DABField):
|
|
|
|
|
self._inner_field = inner_field
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def doc(self):
|
|
|
|
|
return self._inner_field.doc
|
|
|
|
|
def doc(self) -> str:
|
|
|
|
|
"""Returns Field's documentation (frozen)"""
|
|
|
|
|
return _deepfreeze(self._inner_field.doc)
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def constraints(self):
|
|
|
|
|
def constraints(self) -> tuple[BaseConstraint]:
|
|
|
|
|
"""Returns Field's constraint (frozen)"""
|
|
|
|
|
return _deepfreeze(self._inner_field.constraints)
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def default_value(self):
|
|
|
|
|
def default_value(self) -> Any:
|
|
|
|
|
"""Returns Field's default value (frozen)"""
|
|
|
|
|
return self._inner_field.default_value
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def value(self):
|
|
|
|
|
def value(self) -> Any:
|
|
|
|
|
"""Returns Field's value (frosen)"""
|
|
|
|
|
return self._inner_field.value
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def annotations(self) -> Any:
|
|
|
|
|
"""Returns Field's annotation (frozen)"""
|
|
|
|
|
return _deepfreeze(self._inner_field.annotations)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ModelSpecView:
|
|
|
|
|
"""ModelSpecView class
|
|
|
|
|
A class that will act as fake BaseElement proxy to allow setting values"""
|
|
|
|
|
|
|
|
|
|
__slots__ = ("_vals", "_types", "_touched", "_name", "_module")
|
|
|
|
|
|
|
|
|
|
def __init__(self, values: dict, types_map: dict[str, type], name, module):
|
|
|
|
|
@@ -305,18 +412,26 @@ class ModelSpecView:
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def __name__(self) -> str:
|
|
|
|
|
"""returns proxified class' name"""
|
|
|
|
|
return self._name
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def __module__(self) -> str:
|
|
|
|
|
"""returns proxified module's name"""
|
|
|
|
|
return self._module
|
|
|
|
|
|
|
|
|
|
@__module__.setter
|
|
|
|
|
def __module__(self, value):
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def __getattr__(self, name):
|
|
|
|
|
"""internal proxy getattr"""
|
|
|
|
|
if name not in self._types:
|
|
|
|
|
raise AttributeError(f"Unknown field {name}")
|
|
|
|
|
return self._vals[name]
|
|
|
|
|
|
|
|
|
|
def __setattr__(self, name, value):
|
|
|
|
|
"""internal proxy setattr"""
|
|
|
|
|
if name not in self._types:
|
|
|
|
|
raise NonExistingField(f"Cannot set unknown field {name}")
|
|
|
|
|
T = self._types[name]
|
|
|
|
|
@@ -334,20 +449,33 @@ class ModelSpecView:
|
|
|
|
|
self._touched.add(name)
|
|
|
|
|
|
|
|
|
|
def export(self) -> dict:
|
|
|
|
|
"""exports all proxified values"""
|
|
|
|
|
return dict(self._vals)
|
|
|
|
|
|
|
|
|
|
def diff(self) -> dict:
|
|
|
|
|
return {k: self._vals[k] for k in self._touched}
|
|
|
|
|
|
|
|
|
|
T_Meta = TypeVar("T_Meta", bound="BaseMeta")
|
|
|
|
|
T_BE = TypeVar("T_BE", bound="BaseElement")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BaseMeta(type):
|
|
|
|
|
def __new__(mcls, name, bases, namespace):
|
|
|
|
|
"""metaclass to use to build BaseElement"""
|
|
|
|
|
|
|
|
|
|
modified_field: Dict[str, Any] = {}
|
|
|
|
|
new_fields: Dict[str, DABField[Any]] = {}
|
|
|
|
|
initializer: Optional[Callable[..., Any]] = None
|
|
|
|
|
__DABSchema__: dict[str, Any] = {}
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def pre_check(
|
|
|
|
|
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any] # pylint: disable=unused-argument
|
|
|
|
|
) -> None:
|
|
|
|
|
"""early BaseElement checks"""
|
|
|
|
|
# print("__NEW__ Defining:", name, "with keys:", list(namespace))
|
|
|
|
|
|
|
|
|
|
if len(bases) > 1:
|
|
|
|
|
raise MultipleInheritanceForbidden("Multiple inheritance is not supported by dabmodel")
|
|
|
|
|
elif len(bases) == 0: # base class (BaseElement)
|
|
|
|
|
namespace["__DABSchema__"] = dict()
|
|
|
|
|
if len(bases) == 0: # base class (BaseElement)
|
|
|
|
|
namespace["__DABSchema__"] = {}
|
|
|
|
|
else: # standard inheritance
|
|
|
|
|
# check class tree origin
|
|
|
|
|
if "__DABSchema__" not in dir(bases[0]):
|
|
|
|
|
@@ -360,16 +488,80 @@ class BaseMeta(type):
|
|
|
|
|
for _funknown in [_ for _ in namespace["__annotations__"] if _ not in namespace.keys()]:
|
|
|
|
|
namespace[_funknown] = None
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def pre_processing_modified(
|
|
|
|
|
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], _fname: str, _fvalue: Any
|
|
|
|
|
): # pylint: disable=unused-argument
|
|
|
|
|
"""preprocessing BaseElement modified Fields"""
|
|
|
|
|
# print(f"Modified field: {_fname}")
|
|
|
|
|
if "__annotations__" in namespace and _fname in namespace["__annotations__"]:
|
|
|
|
|
raise ReadOnlyFieldAnnotation("annotations cannot be modified on derived classes")
|
|
|
|
|
try:
|
|
|
|
|
check_type(
|
|
|
|
|
_fvalue,
|
|
|
|
|
namespace["__DABSchema__"][_fname].annotations,
|
|
|
|
|
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
|
|
|
|
|
)
|
|
|
|
|
except TypeCheckError as exp:
|
|
|
|
|
raise InvalidFieldValue(
|
|
|
|
|
f"Field <{_fname}> New Field value is not of expected type {bases[0].__annotations__[_fname]}."
|
|
|
|
|
) from exp
|
|
|
|
|
mcs.modified_field[_fname] = _fvalue
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def pre_processing_new(
|
|
|
|
|
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], _fname: str, _fvalue: Any
|
|
|
|
|
): # pylint: disable=unused-argument
|
|
|
|
|
"""preprocessing BaseElement new Fields"""
|
|
|
|
|
# print(f"New field: {_fname}")
|
|
|
|
|
# print(f"type is: {type(_fvalue)}")
|
|
|
|
|
# print(f"value is: {_fvalue}")
|
|
|
|
|
|
|
|
|
|
# check if field is annotated
|
|
|
|
|
if "__annotations__" not in namespace or _fname not in namespace["__annotations__"]:
|
|
|
|
|
raise NotAnnotatedField(f"Every dabmodel Fields must be annotated ({_fname})")
|
|
|
|
|
|
|
|
|
|
# check if annotation is allowed
|
|
|
|
|
if isinstance(namespace["__annotations__"][_fname], str):
|
|
|
|
|
namespace["__annotations__"][_fname] = _resolve_annotation(namespace["__annotations__"][_fname])
|
|
|
|
|
|
|
|
|
|
if not _check_annotation_definition(namespace["__annotations__"][_fname]):
|
|
|
|
|
raise InvalidFieldAnnotation(f"Field <{_fname}> has not an allowed or valid annotation.")
|
|
|
|
|
|
|
|
|
|
_finfo: DABFieldInfo = DABFieldInfo()
|
|
|
|
|
origin = get_origin(namespace["__annotations__"][_fname])
|
|
|
|
|
tname = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
|
|
|
|
|
if "Annotated" in tname:
|
|
|
|
|
args = get_args(namespace["__annotations__"][_fname])
|
|
|
|
|
if args:
|
|
|
|
|
if len(args) > 2:
|
|
|
|
|
raise InvalidFieldAnnotation(f"Field <{_fname}> had invalid Annotated value.")
|
|
|
|
|
if len(args) == 2 and not isinstance(args[1], DABFieldInfo):
|
|
|
|
|
raise InvalidFieldAnnotation("Only DABFieldInfo object is allowed as Annotated data.")
|
|
|
|
|
|
|
|
|
|
_finfo = args[1]
|
|
|
|
|
|
|
|
|
|
# print(f"annotation is: {namespace['__annotations__'][_fname]}")
|
|
|
|
|
# check if value is valid
|
|
|
|
|
try:
|
|
|
|
|
check_type(_fvalue, namespace["__annotations__"][_fname], collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS)
|
|
|
|
|
except TypeCheckError as exp:
|
|
|
|
|
raise InvalidFieldValue(f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}.") from exp
|
|
|
|
|
mcs.new_fields[_fname] = DABField(_fname, _fvalue, namespace["__annotations__"][_fname], _finfo)
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def pre_processing(mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any]):
|
|
|
|
|
"""preprocessing BaseElement"""
|
|
|
|
|
# iterating new and modified fields
|
|
|
|
|
modified_field: Dict[str, Any] = {}
|
|
|
|
|
new_fields: Dict[str, DABField] = {}
|
|
|
|
|
initializer: Optional[type] = None
|
|
|
|
|
mcs.modified_field = {}
|
|
|
|
|
mcs.new_fields = {}
|
|
|
|
|
mcs.initializer = None
|
|
|
|
|
initializer_name: Optional[str] = None
|
|
|
|
|
for _fname, _fvalue in namespace.items():
|
|
|
|
|
if _fname == f"_{name}__initializer" or (name.startswith("_") and _fname == "__initializer"):
|
|
|
|
|
if not isinstance(_fvalue, classmethod):
|
|
|
|
|
raise InvalidInitializerType()
|
|
|
|
|
initializer = _fvalue.__func__
|
|
|
|
|
mcs.initializer = _fvalue.__func__
|
|
|
|
|
if name.startswith("_"):
|
|
|
|
|
initializer_name = "__initializer"
|
|
|
|
|
else:
|
|
|
|
|
@@ -378,72 +570,55 @@ class BaseMeta(type):
|
|
|
|
|
pass
|
|
|
|
|
else:
|
|
|
|
|
# print(f"Parsing Field: {_fname} / {_fvalue}")
|
|
|
|
|
# Modified fields
|
|
|
|
|
if len(bases) == 1 and _fname in namespace["__DABSchema__"].keys():
|
|
|
|
|
# print(f"Modified field: {_fname}")
|
|
|
|
|
if "__annotations__" in namespace and _fname in namespace["__annotations__"]:
|
|
|
|
|
raise ReadOnlyFieldAnnotation("annotations cannot be modified on derived classes")
|
|
|
|
|
try:
|
|
|
|
|
check_type(
|
|
|
|
|
_fvalue,
|
|
|
|
|
namespace["__DABSchema__"][_fname].annotations,
|
|
|
|
|
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
|
|
|
|
|
)
|
|
|
|
|
except TypeCheckError as exp:
|
|
|
|
|
raise InvalidFieldValue(
|
|
|
|
|
f"Field <{_fname}> New Field value is not of expected type {bases[0].__annotations__[_fname]}."
|
|
|
|
|
) from exp
|
|
|
|
|
modified_field[_fname] = _fvalue
|
|
|
|
|
# New fieds
|
|
|
|
|
else:
|
|
|
|
|
# print(f"New field: {_fname}")
|
|
|
|
|
# print(f"type is: {type(_fvalue)}")
|
|
|
|
|
# print(f"value is: {_fvalue}")
|
|
|
|
|
|
|
|
|
|
# check if field is annotated
|
|
|
|
|
if "__annotations__" not in namespace or _fname not in namespace["__annotations__"]:
|
|
|
|
|
raise NotAnnotatedField(f"Every dabmodel Fields must be annotated ({_fname})")
|
|
|
|
|
|
|
|
|
|
# check if annotation is allowed
|
|
|
|
|
if isinstance(namespace["__annotations__"][_fname], str):
|
|
|
|
|
namespace["__annotations__"][_fname] = _resolve_annotation(namespace["__annotations__"][_fname])
|
|
|
|
|
|
|
|
|
|
if not _check_annotation_definition(namespace["__annotations__"][_fname]):
|
|
|
|
|
raise InvalidFieldAnnotation(f"Field <{_fname}> has not an allowed or valid annotation.")
|
|
|
|
|
|
|
|
|
|
_finfo: Optional[DABFieldInfo] = DABFieldInfo()
|
|
|
|
|
origin = get_origin(namespace["__annotations__"][_fname])
|
|
|
|
|
tname = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
|
|
|
|
|
if "Annotated" in tname:
|
|
|
|
|
args = get_args(namespace["__annotations__"][_fname])
|
|
|
|
|
if args:
|
|
|
|
|
if len(args) > 2:
|
|
|
|
|
raise InvalidFieldAnnotation(f"Field <{_fname}> had invalid Annotated value.")
|
|
|
|
|
if len(args) == 2 and not isinstance(args[1], DABFieldInfo):
|
|
|
|
|
raise InvalidFieldAnnotation(f"Only DABFieldInfo object is allowed as Annotated data.")
|
|
|
|
|
|
|
|
|
|
_finfo = args[1]
|
|
|
|
|
|
|
|
|
|
# print(f"annotation is: {namespace['__annotations__'][_fname]}")
|
|
|
|
|
# check if value is valid
|
|
|
|
|
try:
|
|
|
|
|
check_type(
|
|
|
|
|
_fvalue, namespace["__annotations__"][_fname], collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS
|
|
|
|
|
)
|
|
|
|
|
except TypeCheckError as exp:
|
|
|
|
|
raise InvalidFieldValue(
|
|
|
|
|
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
|
|
|
|
|
) from exp
|
|
|
|
|
new_fields[_fname] = DABField(_fname, _fvalue, namespace["__annotations__"][_fname], _finfo)
|
|
|
|
|
|
|
|
|
|
if len(bases) == 1 and _fname in namespace["__DABSchema__"].keys(): # Modified fields
|
|
|
|
|
mcs.pre_processing_modified(name, bases, namespace, _fname, _fvalue)
|
|
|
|
|
else: # New fieds
|
|
|
|
|
mcs.pre_processing_new(name, bases, namespace, _fname, _fvalue)
|
|
|
|
|
# removing modified fields from class (will add them back later)
|
|
|
|
|
for _fname in new_fields.keys():
|
|
|
|
|
for _fname in mcs.new_fields:
|
|
|
|
|
del namespace[_fname]
|
|
|
|
|
for _fname in modified_field.keys():
|
|
|
|
|
for _fname in mcs.modified_field:
|
|
|
|
|
del namespace[_fname]
|
|
|
|
|
if initializer is not None:
|
|
|
|
|
if mcs.initializer is not None and initializer_name is not None:
|
|
|
|
|
del namespace[initializer_name]
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def call_initializer(
|
|
|
|
|
mcs: type["BaseMeta"], cls, name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any]
|
|
|
|
|
): # pylint: disable=unused-argument
|
|
|
|
|
"""BaseElement initializer processing"""
|
|
|
|
|
if mcs.initializer is not None:
|
|
|
|
|
init_fieldvalues = {}
|
|
|
|
|
init_fieldtypes = {}
|
|
|
|
|
for _fname, _fvalue in cls.__DABSchema__.items():
|
|
|
|
|
init_fieldvalues[_fname] = deepcopy(_fvalue.raw_value)
|
|
|
|
|
init_fieldtypes[_fname] = _fvalue.annotations
|
|
|
|
|
fakecls = ModelSpecView(init_fieldvalues, init_fieldtypes, cls.__name__, cls.__module__)
|
|
|
|
|
safe_globals = {"__builtins__": {"__import__": _blocked_import}, **ALLOWED_HELPERS_DEFAULT}
|
|
|
|
|
if mcs.initializer.__code__.co_freevars:
|
|
|
|
|
raise FunctionForbidden("__initializer must not use closures")
|
|
|
|
|
safe_initializer = FunctionType(
|
|
|
|
|
mcs.initializer.__code__,
|
|
|
|
|
safe_globals,
|
|
|
|
|
name=mcs.initializer.__name__,
|
|
|
|
|
argdefs=mcs.initializer.__defaults__,
|
|
|
|
|
closure=None,
|
|
|
|
|
)
|
|
|
|
|
safe_initializer(fakecls) # pylint: disable=not-callable
|
|
|
|
|
for _fname, _fvalue in fakecls.export().items():
|
|
|
|
|
try:
|
|
|
|
|
check_type(_fvalue, cls.__DABSchema__[_fname].annotations, collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS)
|
|
|
|
|
except TypeCheckError as exp:
|
|
|
|
|
raise InvalidFieldValue(
|
|
|
|
|
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
|
|
|
|
|
) from exp
|
|
|
|
|
cls.__DABSchema__[_fname].update_value(_fvalue)
|
|
|
|
|
|
|
|
|
|
def __new__(mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any]) -> Type:
|
|
|
|
|
"""BaseElement new class"""
|
|
|
|
|
mcs.pre_check(name, bases, namespace)
|
|
|
|
|
mcs.pre_processing(name, bases, namespace)
|
|
|
|
|
|
|
|
|
|
orig_setattr = namespace.get("__setattr__", object.__setattr__)
|
|
|
|
|
|
|
|
|
|
def guarded_setattr(self, key, value):
|
|
|
|
|
@@ -454,57 +629,33 @@ class BaseMeta(type):
|
|
|
|
|
if hasattr(self, key):
|
|
|
|
|
raise ReadOnlyField(f"{key} is read-only")
|
|
|
|
|
else:
|
|
|
|
|
raise NewFieldForbidden(f"creating new fields is not allowed")
|
|
|
|
|
raise NewFieldForbidden("creating new fields is not allowed")
|
|
|
|
|
|
|
|
|
|
return orig_setattr(self, key, value)
|
|
|
|
|
|
|
|
|
|
namespace["__setattr__"] = guarded_setattr
|
|
|
|
|
|
|
|
|
|
cls = super().__new__(mcls, name, bases, namespace)
|
|
|
|
|
_cls = super().__new__(mcs, name, bases, namespace)
|
|
|
|
|
|
|
|
|
|
for _fname, _fvalue in modified_field.items():
|
|
|
|
|
cls.__DABSchema__[_fname] = deepcopy(bases[0].__DABSchema__[_fname])
|
|
|
|
|
cls.__DABSchema__[_fname].update_value(_fvalue)
|
|
|
|
|
for _fname, _fvalue in mcs.modified_field.items():
|
|
|
|
|
_cls.__DABSchema__[_fname] = deepcopy(bases[0].__DABSchema__[_fname])
|
|
|
|
|
_cls.__DABSchema__[_fname].update_value(_fvalue)
|
|
|
|
|
|
|
|
|
|
for _fname, _fvalue in new_fields.items():
|
|
|
|
|
_fvalue.add_source(cls)
|
|
|
|
|
cls.__DABSchema__[_fname] = _fvalue
|
|
|
|
|
for _fname, _fvalue in mcs.new_fields.items():
|
|
|
|
|
_fvalue.add_source(mcs)
|
|
|
|
|
_cls.__DABSchema__[_fname] = _fvalue
|
|
|
|
|
|
|
|
|
|
if initializer is not None:
|
|
|
|
|
init_fieldvalues = dict()
|
|
|
|
|
init_fieldtypes = dict()
|
|
|
|
|
for _fname, _fvalue in cls.__DABSchema__.items():
|
|
|
|
|
init_fieldvalues[_fname] = deepcopy(_fvalue.raw_value)
|
|
|
|
|
init_fieldtypes[_fname] = _fvalue.annotations
|
|
|
|
|
fakecls = ModelSpecView(init_fieldvalues, init_fieldtypes, cls.__name__, cls.__module__)
|
|
|
|
|
safe_globals = {"__builtins__": {"__import__": _blocked_import}, **ALLOWED_HELPERS_DEFAULT}
|
|
|
|
|
if initializer.__code__.co_freevars:
|
|
|
|
|
raise FunctionForbidden("__initializer must not use closures")
|
|
|
|
|
safe_initializer = FunctionType(
|
|
|
|
|
initializer.__code__,
|
|
|
|
|
safe_globals,
|
|
|
|
|
name=initializer.__name__,
|
|
|
|
|
argdefs=initializer.__defaults__,
|
|
|
|
|
closure=None,
|
|
|
|
|
)
|
|
|
|
|
safe_initializer(fakecls)
|
|
|
|
|
print(fakecls.diff())
|
|
|
|
|
for _fname, _fvalue in fakecls.export().items():
|
|
|
|
|
try:
|
|
|
|
|
check_type(_fvalue, cls.__DABSchema__[_fname].annotations, collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS)
|
|
|
|
|
except TypeCheckError as exp:
|
|
|
|
|
raise InvalidFieldValue(
|
|
|
|
|
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
|
|
|
|
|
) from exp
|
|
|
|
|
cls.__DABSchema__[_fname].update_value(_fvalue)
|
|
|
|
|
return cls
|
|
|
|
|
mcs.call_initializer(_cls, name, bases, namespace)
|
|
|
|
|
|
|
|
|
|
def __call__(cls, *args, **kw):
|
|
|
|
|
return _cls
|
|
|
|
|
|
|
|
|
|
def __call__(cls, *args: Any, **kw: Any): # intentionally untyped
|
|
|
|
|
"""BaseElement new instance"""
|
|
|
|
|
obj = super().__call__(*args, **kw)
|
|
|
|
|
|
|
|
|
|
for _fname in cls.__DABSchema__.keys():
|
|
|
|
|
for _fname, _fvalue in cls.__DABSchema__.items():
|
|
|
|
|
setattr(obj, _fname, cls.__DABSchema__[_fname].value)
|
|
|
|
|
inst_schema = dict()
|
|
|
|
|
inst_schema: dict[str, Any] = {}
|
|
|
|
|
for _fname, _fvalue in cls.__DABSchema__.items():
|
|
|
|
|
inst_schema[_fname] = FrozenDABField(_fvalue)
|
|
|
|
|
setattr(obj, "__DABSchema__", inst_schema)
|
|
|
|
|
@@ -512,12 +663,20 @@ class BaseMeta(type):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BaseElement(metaclass=BaseMeta):
|
|
|
|
|
pass
|
|
|
|
|
"""BaseElement class
|
|
|
|
|
Base class to apply metaclass and set common Fields.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BaseFeature(BaseElement):
|
|
|
|
|
pass
|
|
|
|
|
"""BaseFeature class
|
|
|
|
|
Base class for Appliance's Features.
|
|
|
|
|
Features are optional traits of an appliance.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BaseAppliance(BaseElement):
|
|
|
|
|
pass
|
|
|
|
|
"""BaseFeature class
|
|
|
|
|
Base class for Appliance.
|
|
|
|
|
An appliance is a server configuration / image that is built using appliance's code and Fields.
|
|
|
|
|
"""
|
|
|
|
|
|