Compare commits
9 Commits
0.0.1.post
...
0.0.1.post
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
86eee2e378 | ||
|
|
3e0defc574 | ||
|
|
f6e581381d | ||
|
|
981c5201a9 | ||
|
|
ab11052c8f | ||
|
|
4f5dade949 | ||
|
|
cce260bc5e | ||
|
|
915a4332ee | ||
|
|
4dca3eb9d1 |
@@ -13,6 +13,7 @@ Main module __init__ file.
|
||||
from .__metadata__ import __version__, __Summuary__, __Name__
|
||||
from .model import (
|
||||
DABFieldInfo,
|
||||
DABField,
|
||||
BaseAppliance,
|
||||
BaseFeature,
|
||||
DABModelException,
|
||||
@@ -27,4 +28,7 @@ from .model import (
|
||||
IncompletelyAnnotatedField,
|
||||
ImportForbidden,
|
||||
FunctionForbidden,
|
||||
FrozenDABField,
|
||||
InvalidFeatureInheritance,
|
||||
FeatureNotBound,
|
||||
)
|
||||
|
||||
@@ -15,20 +15,26 @@ import warnings
|
||||
|
||||
try: # pragma: no cover
|
||||
__version__ = version("dabmodel")
|
||||
except PackageNotFoundError: # pragma: no cover
|
||||
warnings.warn("can not read __version__, assuming local test context, setting it to ?.?.?")
|
||||
except PackageNotFoundError: # pragma: no cover
|
||||
warnings.warn(
|
||||
"can not read __version__, assuming local test context, setting it to ?.?.?"
|
||||
)
|
||||
__version__ = "?.?.?"
|
||||
|
||||
try: # pragma: no cover
|
||||
dist = distribution("dabmodel")
|
||||
__Summuary__ = dist.metadata["Summary"]
|
||||
except PackageNotFoundError: # pragma: no cover
|
||||
warnings.warn('can not read dist.metadata["Summary"], assuming local test context, setting it to <dabmodel description>')
|
||||
warnings.warn(
|
||||
'can not read dist.metadata["Summary"], assuming local test context, setting it to <dabmodel description>'
|
||||
)
|
||||
__Summuary__ = "dabmodel description"
|
||||
|
||||
try: # pragma: no cover
|
||||
dist = distribution("dabmodel")
|
||||
__Name__ = dist.metadata["Name"]
|
||||
except PackageNotFoundError: # pragma: no cover
|
||||
warnings.warn('can not read dist.metadata["Name"], assuming local test context, setting it to <dabmodel>')
|
||||
warnings.warn(
|
||||
'can not read dist.metadata["Name"], assuming local test context, setting it to <dabmodel>'
|
||||
)
|
||||
__Name__ = "dabmodel"
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
""" dabmodel model module
|
||||
"""dabmodel model module
|
||||
This module implements DAB model classes.
|
||||
This module contains metaclass and bases classes used to create models.
|
||||
BaseAppliance can be used to create a new Appliance Data.
|
||||
@@ -55,7 +55,6 @@ ALLOWED_ANNOTATIONS = {
|
||||
}
|
||||
|
||||
ALLOWED_MODEL_FIELDS_TYPES = (str, int, float, complex, bool, bytes)
|
||||
ALLOWED_MODEL_FIELDS_CONTAINERS = (dict, list, set, frozenset, tuple)
|
||||
|
||||
|
||||
# TV_ALLOWED_MODEL_FIELDS_TYPES = TypeVar("TV_ALLOWED_MODEL_FIELDS_TYPES", *ALLOWED_MODEL_FIELDS_TYPES, *ALLOWED_MODEL_FIELDS_CONTAINERS)
|
||||
@@ -67,6 +66,12 @@ class DABModelException(Exception):
|
||||
"""
|
||||
|
||||
|
||||
class ReservedFieldName(Exception):
|
||||
"""DABModelException Exception class
|
||||
Base Exception for DABModelException class
|
||||
"""
|
||||
|
||||
|
||||
class MultipleInheritanceForbidden(DABModelException):
|
||||
"""MultipleInheritanceForbidden Exception class
|
||||
Multiple inheritance is forbidden when using dabmodel
|
||||
@@ -139,12 +144,24 @@ class ImportForbidden(DABModelException):
|
||||
"""
|
||||
|
||||
|
||||
class InvalidFeatureInheritance(DABModelException):
|
||||
"""InvalidFeatureInheritance Exception class
|
||||
Features of same name in child appliance need to be from same type
|
||||
"""
|
||||
|
||||
|
||||
class FunctionForbidden(DABModelException):
|
||||
"""FunctionForbidden Exception class
|
||||
function call are forbidden
|
||||
"""
|
||||
|
||||
|
||||
class FeatureNotBound(DABModelException):
|
||||
"""FeatureNotBound Exception class
|
||||
a Feature must be bound to an Appliance
|
||||
"""
|
||||
|
||||
|
||||
ALLOWED_HELPERS_MATH = SimpleNamespace(
|
||||
sqrt=math.sqrt,
|
||||
floor=math.floor,
|
||||
@@ -200,7 +217,9 @@ def _blocked_import(*args, **kwargs):
|
||||
def _resolve_annotation(ann):
|
||||
if isinstance(ann, str):
|
||||
# Safe eval against a **whitelist** only
|
||||
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS) # pylint: disable=eval-used
|
||||
return eval( # pylint: disable=eval-used
|
||||
ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS
|
||||
)
|
||||
return ann
|
||||
|
||||
|
||||
@@ -210,7 +229,11 @@ def _peel_annotated(t: Any) -> Any:
|
||||
origin = get_origin(t)
|
||||
if origin is None:
|
||||
return t
|
||||
name = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
|
||||
name = (
|
||||
getattr(origin, "__name__", "")
|
||||
or getattr(origin, "__qualname__", "")
|
||||
or str(origin)
|
||||
)
|
||||
if "Annotated" in name:
|
||||
args = get_args(t)
|
||||
t = args[0] if args else t
|
||||
@@ -218,11 +241,19 @@ def _peel_annotated(t: Any) -> Any:
|
||||
return t
|
||||
|
||||
|
||||
def _check_annotation_definition(_type) -> bool: # pylint: disable=too-complex,too-many-return-statements
|
||||
def _check_annotation_definition( # pylint: disable=too-complex,too-many-return-statements
|
||||
_type,
|
||||
) -> bool:
|
||||
_type = _peel_annotated(_type)
|
||||
# handle Optional[] and Union[None,...]
|
||||
if (get_origin(_type) is Union or get_origin(_type) is UnionType) and type(None) in get_args(_type):
|
||||
return all(_check_annotation_definition(_) for _ in get_args(_type) if _ is not type(None))
|
||||
if (get_origin(_type) is Union or get_origin(_type) is UnionType) and type(
|
||||
None
|
||||
) in get_args(_type):
|
||||
return all(
|
||||
_check_annotation_definition(_)
|
||||
for _ in get_args(_type)
|
||||
if _ is not type(None)
|
||||
)
|
||||
|
||||
# handle other Union[...]
|
||||
if get_origin(_type) is Union or get_origin(_type) is UnionType:
|
||||
@@ -232,14 +263,20 @@ def _check_annotation_definition(_type) -> bool: # pylint: disable=too-complex,
|
||||
if get_origin(_type) is dict:
|
||||
inner = get_args(_type)
|
||||
if len(inner) != 2:
|
||||
raise IncompletelyAnnotatedField(f"Dict Annotation requires 2 inner definitions: {_type}")
|
||||
return _peel_annotated(inner[0]) in ALLOWED_MODEL_FIELDS_TYPES and _check_annotation_definition(inner[1])
|
||||
raise IncompletelyAnnotatedField(
|
||||
f"Dict Annotation requires 2 inner definitions: {_type}"
|
||||
)
|
||||
return _peel_annotated(
|
||||
inner[0]
|
||||
) in ALLOWED_MODEL_FIELDS_TYPES and _check_annotation_definition(inner[1])
|
||||
|
||||
# handle Tuple[]
|
||||
if get_origin(_type) in [tuple]:
|
||||
inner_types = get_args(_type)
|
||||
if len(inner_types) == 0:
|
||||
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
|
||||
raise IncompletelyAnnotatedField(
|
||||
f"Annotation requires inner definition: {_type}"
|
||||
)
|
||||
if len(inner_types) == 2 and inner_types[1] is Ellipsis:
|
||||
return _check_annotation_definition(inner_types[0])
|
||||
return all(_check_annotation_definition(_) for _ in inner_types)
|
||||
@@ -248,7 +285,9 @@ def _check_annotation_definition(_type) -> bool: # pylint: disable=too-complex,
|
||||
if get_origin(_type) in [set, frozenset, tuple, list]:
|
||||
inner_types = get_args(_type)
|
||||
if len(inner_types) == 0:
|
||||
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
|
||||
raise IncompletelyAnnotatedField(
|
||||
f"Annotation requires inner definition: {_type}"
|
||||
)
|
||||
return all(_check_annotation_definition(_) for _ in inner_types)
|
||||
|
||||
if _type in ALLOWED_MODEL_FIELDS_TYPES:
|
||||
@@ -289,7 +328,9 @@ def _deepfreeze(value):
|
||||
class DABFieldInfo:
|
||||
"""This Class allows to describe a Field in Appliance class"""
|
||||
|
||||
def __init__(self, *, doc: str = "", constraints: Optional[list[BaseConstraint]] = None):
|
||||
def __init__(
|
||||
self, *, doc: str = "", constraints: Optional[list[BaseConstraint]] = None
|
||||
):
|
||||
self._doc: str = doc
|
||||
self._constraints: list[BaseConstraint]
|
||||
if constraints is None:
|
||||
@@ -298,7 +339,7 @@ class DABFieldInfo:
|
||||
self._constraints = constraints
|
||||
|
||||
@property
|
||||
def doc(self):
|
||||
def doc(self) -> str:
|
||||
"""Returns Field's documentation"""
|
||||
return self._doc
|
||||
|
||||
@@ -325,7 +366,7 @@ class DABField(Generic[T_Field]):
|
||||
self._source = s
|
||||
|
||||
@property
|
||||
def doc(self):
|
||||
def doc(self) -> str:
|
||||
"""Returns Field's documentation"""
|
||||
return self._info.doc
|
||||
|
||||
@@ -403,10 +444,16 @@ class ModelSpecView:
|
||||
|
||||
__slots__ = ("_vals", "_types", "_touched", "_name", "_module")
|
||||
|
||||
def __init__(self, values: dict, types_map: dict[str, type], name, module):
|
||||
def __init__(
|
||||
self, values: dict[str, Any], types_map: dict[str, type], name: str, module: str
|
||||
):
|
||||
self._name: str
|
||||
self._vals: dict[str, Any]
|
||||
self._types: dict[str, type]
|
||||
self._touched: set
|
||||
self._module: str
|
||||
object.__setattr__(self, "_vals", dict(values))
|
||||
object.__setattr__(self, "_types", types_map)
|
||||
object.__setattr__(self, "_touched", set())
|
||||
object.__setattr__(self, "_name", name)
|
||||
object.__setattr__(self, "_module", module)
|
||||
|
||||
@@ -421,16 +468,16 @@ class ModelSpecView:
|
||||
return self._module
|
||||
|
||||
@__module__.setter
|
||||
def __module__(self, value):
|
||||
def __module__(self, value: str):
|
||||
pass
|
||||
|
||||
def __getattr__(self, name):
|
||||
def __getattr__(self, name: str) -> Any:
|
||||
"""internal proxy getattr"""
|
||||
if name not in self._types:
|
||||
raise AttributeError(f"Unknown field {name}")
|
||||
return self._vals[name]
|
||||
|
||||
def __setattr__(self, name, value):
|
||||
def __setattr__(self, name: str, value: Any):
|
||||
"""internal proxy setattr"""
|
||||
if name not in self._types:
|
||||
raise NonExistingField(f"Cannot set unknown field {name}")
|
||||
@@ -443,10 +490,11 @@ class ModelSpecView:
|
||||
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
|
||||
)
|
||||
except TypeCheckError as exp:
|
||||
raise InvalidFieldValue(f"Field <{name}> value is not of expected type {T}.") from exp
|
||||
raise InvalidFieldValue(
|
||||
f"Field <{name}> value is not of expected type {T}."
|
||||
) from exp
|
||||
|
||||
self._vals[name] = value
|
||||
self._touched.add(name)
|
||||
|
||||
def export(self) -> dict:
|
||||
"""exports all proxified values"""
|
||||
@@ -460,42 +508,133 @@ T_BE = TypeVar("T_BE", bound="BaseElement")
|
||||
class BaseMeta(type):
|
||||
"""metaclass to use to build BaseElement"""
|
||||
|
||||
modified_field: Dict[str, Any] = {}
|
||||
modified_fields: Dict[str, Any] = {}
|
||||
new_fields: Dict[str, DABField[Any]] = {}
|
||||
initializer: Optional[Callable[..., Any]] = None
|
||||
__DABSchema__: dict[str, Any] = {}
|
||||
|
||||
@classmethod
|
||||
def pre_check(
|
||||
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any] # pylint: disable=unused-argument
|
||||
def check_class(
|
||||
mcs: type["BaseMeta"],
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any], # pylint: disable=unused-argument
|
||||
extensions: dict[str, Any],
|
||||
) -> None:
|
||||
"""early BaseElement checks"""
|
||||
"""
|
||||
Early class-build hook.
|
||||
|
||||
Validates the inheritance shape, initializes an empty schema for root classes,
|
||||
copies the parent schema for subclasses, and ensures all annotated fields
|
||||
have a default (inserting `None` when missing).
|
||||
|
||||
This runs before the class object is created.
|
||||
"""
|
||||
# print("__NEW__ Defining:", name, "with keys:", list(namespace))
|
||||
|
||||
if len(bases) > 1:
|
||||
raise MultipleInheritanceForbidden("Multiple inheritance is not supported by dabmodel")
|
||||
raise MultipleInheritanceForbidden(
|
||||
"Multiple inheritance is not supported by dabmodel"
|
||||
)
|
||||
if len(bases) == 0: # base class (BaseElement)
|
||||
namespace["__DABSchema__"] = {}
|
||||
else: # standard inheritance
|
||||
# check class tree origin
|
||||
if "__DABSchema__" not in dir(bases[0]):
|
||||
raise BrokenInheritance("__DABSchema__ not found in base class, broken inheritance chain.")
|
||||
raise BrokenInheritance(
|
||||
"__DABSchema__ not found in base class, broken inheritance chain."
|
||||
)
|
||||
# copy inherited schema
|
||||
namespace["__DABSchema__"] = copy(bases[0].__DABSchema__)
|
||||
|
||||
# force field without default value to be instantiated (with None)
|
||||
if "__annotations__" in namespace:
|
||||
for _funknown in [_ for _ in namespace["__annotations__"] if _ not in namespace.keys()]:
|
||||
for _funknown in [
|
||||
_ for _ in namespace["__annotations__"] if _ not in namespace.keys()
|
||||
]:
|
||||
namespace[_funknown] = None
|
||||
|
||||
@classmethod
|
||||
def pre_processing_modified(
|
||||
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], _fname: str, _fvalue: Any
|
||||
def process_class_fields( # pylint: disable=too-complex,too-many-branches
|
||||
mcs: type["BaseMeta"],
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any],
|
||||
extensions: dict[str, Any],
|
||||
):
|
||||
"""
|
||||
Scan the class namespace and partition fields.
|
||||
|
||||
Detects:
|
||||
- modified fields (shadowing parent values),
|
||||
- new fields (present in annotations),
|
||||
- the optional `__initializer` classmethod (in mangled or unmangled form).
|
||||
|
||||
Validates annotations and types and removes processed items from `namespace`
|
||||
so they won't become normal attributes. Results are staged into:
|
||||
mcs.new_fields, mcs.modified_fields, mcs.initializer
|
||||
to be committed later.
|
||||
"""
|
||||
# iterating new and modified fields
|
||||
mcs.modified_fields = {}
|
||||
mcs.new_fields = {}
|
||||
mcs.initializer = None
|
||||
initializer_name: Optional[str] = None
|
||||
for _fname, _fvalue in namespace.items():
|
||||
if _fname == f"_{name}__initializer" or (
|
||||
name.startswith("_") and _fname == "__initializer"
|
||||
):
|
||||
if not isinstance(_fvalue, classmethod):
|
||||
raise InvalidInitializerType()
|
||||
mcs.initializer = _fvalue.__func__
|
||||
if name.startswith("_"):
|
||||
initializer_name = "__initializer"
|
||||
else:
|
||||
initializer_name = f"_{name}__initializer"
|
||||
elif _fname.startswith("_"):
|
||||
pass
|
||||
elif isinstance(_fvalue, classmethod):
|
||||
pass
|
||||
else:
|
||||
print(f"Parsing Field: {_fname} / {_fvalue}")
|
||||
if (
|
||||
len(bases) == 1 and _fname in namespace["__DABSchema__"].keys()
|
||||
): # Modified fields
|
||||
mcs.process_modified_field(
|
||||
name, bases, namespace, _fname, _fvalue, extensions
|
||||
)
|
||||
else: # New fieds
|
||||
mcs.process_new_field(
|
||||
name, bases, namespace, _fname, _fvalue, extensions
|
||||
)
|
||||
# removing modified fields from class (will add them back later)
|
||||
for _fname in mcs.new_fields:
|
||||
del namespace[_fname]
|
||||
for _fname in mcs.modified_fields:
|
||||
del namespace[_fname]
|
||||
if mcs.initializer is not None and initializer_name is not None:
|
||||
del namespace[initializer_name]
|
||||
|
||||
@classmethod
|
||||
def process_modified_field(
|
||||
mcs: type["BaseMeta"],
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any],
|
||||
_fname: str,
|
||||
_fvalue: Any,
|
||||
extensions: dict[str, Any],
|
||||
): # pylint: disable=unused-argument
|
||||
"""preprocessing BaseElement modified Fields"""
|
||||
# print(f"Modified field: {_fname}")
|
||||
"""
|
||||
Handle a *modified* field declared by a subclass.
|
||||
|
||||
Forbids annotation changes, validates the new default value against
|
||||
the inherited annotation, and stages the new default into `mcs.modified_fields`.
|
||||
"""
|
||||
if "__annotations__" in namespace and _fname in namespace["__annotations__"]:
|
||||
raise ReadOnlyFieldAnnotation("annotations cannot be modified on derived classes")
|
||||
raise ReadOnlyFieldAnnotation(
|
||||
f"annotations cannot be modified on derived classes {_fname}"
|
||||
)
|
||||
try:
|
||||
check_type(
|
||||
_fvalue,
|
||||
@@ -506,95 +645,114 @@ class BaseMeta(type):
|
||||
raise InvalidFieldValue(
|
||||
f"Field <{_fname}> New Field value is not of expected type {bases[0].__annotations__[_fname]}."
|
||||
) from exp
|
||||
mcs.modified_field[_fname] = _fvalue
|
||||
mcs.modified_fields[_fname] = _fvalue
|
||||
|
||||
@classmethod
|
||||
def pre_processing_new(
|
||||
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], _fname: str, _fvalue: Any
|
||||
def process_new_field(
|
||||
mcs: type["BaseMeta"],
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any],
|
||||
_fname: str,
|
||||
_fvalue: Any,
|
||||
extensions: dict[str, Any],
|
||||
): # pylint: disable=unused-argument
|
||||
"""preprocessing BaseElement new Fields"""
|
||||
"""
|
||||
Handle a *new* field declared on the class.
|
||||
|
||||
Resolves string annotations against a whitelist, validates `Annotated[...]`
|
||||
payloads (allowing only DABFieldInfo), checks the default value type,
|
||||
and stages the field as a `DABField` in `mcs.new_fields`.
|
||||
"""
|
||||
# print(f"New field: {_fname}")
|
||||
# print(f"type is: {type(_fvalue)}")
|
||||
# print(f"value is: {_fvalue}")
|
||||
|
||||
# check if field is annotated
|
||||
if "__annotations__" not in namespace or _fname not in namespace["__annotations__"]:
|
||||
raise NotAnnotatedField(f"Every dabmodel Fields must be annotated ({_fname})")
|
||||
if (
|
||||
"__annotations__" not in namespace
|
||||
or _fname not in namespace["__annotations__"]
|
||||
):
|
||||
raise NotAnnotatedField(
|
||||
f"Every dabmodel Fields must be annotated ({_fname})"
|
||||
)
|
||||
|
||||
# check if annotation is allowed
|
||||
if isinstance(namespace["__annotations__"][_fname], str):
|
||||
namespace["__annotations__"][_fname] = _resolve_annotation(namespace["__annotations__"][_fname])
|
||||
namespace["__annotations__"][_fname] = _resolve_annotation(
|
||||
namespace["__annotations__"][_fname]
|
||||
)
|
||||
|
||||
if not _check_annotation_definition(namespace["__annotations__"][_fname]):
|
||||
raise InvalidFieldAnnotation(f"Field <{_fname}> has not an allowed or valid annotation.")
|
||||
raise InvalidFieldAnnotation(
|
||||
f"Field <{_fname}> has not an allowed or valid annotation."
|
||||
)
|
||||
|
||||
_finfo: DABFieldInfo = DABFieldInfo()
|
||||
origin = get_origin(namespace["__annotations__"][_fname])
|
||||
tname = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
|
||||
tname = (
|
||||
getattr(origin, "__name__", "")
|
||||
or getattr(origin, "__qualname__", "")
|
||||
or str(origin)
|
||||
)
|
||||
if "Annotated" in tname:
|
||||
args = get_args(namespace["__annotations__"][_fname])
|
||||
if args:
|
||||
if len(args) > 2:
|
||||
raise InvalidFieldAnnotation(f"Field <{_fname}> had invalid Annotated value.")
|
||||
raise InvalidFieldAnnotation(
|
||||
f"Field <{_fname}> had invalid Annotated value."
|
||||
)
|
||||
if len(args) == 2 and not isinstance(args[1], DABFieldInfo):
|
||||
raise InvalidFieldAnnotation("Only DABFieldInfo object is allowed as Annotated data.")
|
||||
raise InvalidFieldAnnotation(
|
||||
"Only DABFieldInfo object is allowed as Annotated data."
|
||||
)
|
||||
|
||||
_finfo = args[1]
|
||||
|
||||
# print(f"annotation is: {namespace['__annotations__'][_fname]}")
|
||||
# check if value is valid
|
||||
try:
|
||||
check_type(_fvalue, namespace["__annotations__"][_fname], collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS)
|
||||
check_type(
|
||||
_fvalue,
|
||||
namespace["__annotations__"][_fname],
|
||||
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
|
||||
)
|
||||
except TypeCheckError as exp:
|
||||
raise InvalidFieldValue(f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}.") from exp
|
||||
mcs.new_fields[_fname] = DABField(_fname, _fvalue, namespace["__annotations__"][_fname], _finfo)
|
||||
raise InvalidFieldValue(
|
||||
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
|
||||
) from exp
|
||||
mcs.new_fields[_fname] = DABField(
|
||||
_fname, _fvalue, namespace["__annotations__"][_fname], _finfo
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def pre_processing(mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any]):
|
||||
"""preprocessing BaseElement"""
|
||||
# iterating new and modified fields
|
||||
mcs.modified_field = {}
|
||||
mcs.new_fields = {}
|
||||
mcs.initializer = None
|
||||
initializer_name: Optional[str] = None
|
||||
for _fname, _fvalue in namespace.items():
|
||||
if _fname == f"_{name}__initializer" or (name.startswith("_") and _fname == "__initializer"):
|
||||
if not isinstance(_fvalue, classmethod):
|
||||
raise InvalidInitializerType()
|
||||
mcs.initializer = _fvalue.__func__
|
||||
if name.startswith("_"):
|
||||
initializer_name = "__initializer"
|
||||
else:
|
||||
initializer_name = f"_{name}__initializer"
|
||||
elif _fname.startswith("__"):
|
||||
pass
|
||||
else:
|
||||
# print(f"Parsing Field: {_fname} / {_fvalue}")
|
||||
if len(bases) == 1 and _fname in namespace["__DABSchema__"].keys(): # Modified fields
|
||||
mcs.pre_processing_modified(name, bases, namespace, _fname, _fvalue)
|
||||
else: # New fieds
|
||||
mcs.pre_processing_new(name, bases, namespace, _fname, _fvalue)
|
||||
# removing modified fields from class (will add them back later)
|
||||
for _fname in mcs.new_fields:
|
||||
del namespace[_fname]
|
||||
for _fname in mcs.modified_field:
|
||||
del namespace[_fname]
|
||||
if mcs.initializer is not None and initializer_name is not None:
|
||||
del namespace[initializer_name]
|
||||
def apply_initializer(
|
||||
mcs: type["BaseMeta"],
|
||||
cls,
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any], # pylint: disable=unused-argument
|
||||
extensions: dict[str, Any],
|
||||
):
|
||||
"""
|
||||
Apply the optional `__initializer` classmethod to compute derived defaults.
|
||||
|
||||
@classmethod
|
||||
def call_initializer(
|
||||
mcs: type["BaseMeta"], cls, name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any]
|
||||
): # pylint: disable=unused-argument
|
||||
"""BaseElement initializer processing"""
|
||||
The initializer runs in a restricted, import-blocked environment using a
|
||||
`ModelSpecView` proxy that enforces type checking on assignments.
|
||||
On success, the computed values are validated and written back into the
|
||||
class schema's DABFields.
|
||||
"""
|
||||
if mcs.initializer is not None:
|
||||
init_fieldvalues = {}
|
||||
init_fieldtypes = {}
|
||||
for _fname, _fvalue in cls.__DABSchema__.items():
|
||||
init_fieldvalues[_fname] = deepcopy(_fvalue.raw_value)
|
||||
init_fieldtypes[_fname] = _fvalue.annotations
|
||||
fakecls = ModelSpecView(init_fieldvalues, init_fieldtypes, cls.__name__, cls.__module__)
|
||||
safe_globals = {"__builtins__": {"__import__": _blocked_import}, **ALLOWED_HELPERS_DEFAULT}
|
||||
if isinstance(_fvalue, DABField):
|
||||
init_fieldvalues[_fname] = deepcopy(_fvalue.raw_value)
|
||||
init_fieldtypes[_fname] = _fvalue.annotations
|
||||
fakecls = ModelSpecView(
|
||||
init_fieldvalues, init_fieldtypes, cls.__name__, cls.__module__
|
||||
)
|
||||
safe_globals = {
|
||||
"__builtins__": {"__import__": _blocked_import},
|
||||
**ALLOWED_HELPERS_DEFAULT,
|
||||
}
|
||||
if mcs.initializer.__code__.co_freevars:
|
||||
raise FunctionForbidden("__initializer must not use closures")
|
||||
safe_initializer = FunctionType(
|
||||
@@ -607,60 +765,162 @@ class BaseMeta(type):
|
||||
safe_initializer(fakecls) # pylint: disable=not-callable
|
||||
for _fname, _fvalue in fakecls.export().items():
|
||||
try:
|
||||
check_type(_fvalue, cls.__DABSchema__[_fname].annotations, collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS)
|
||||
check_type(
|
||||
_fvalue,
|
||||
cls.__DABSchema__[_fname].annotations,
|
||||
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
|
||||
)
|
||||
except TypeCheckError as exp:
|
||||
raise InvalidFieldValue(
|
||||
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
|
||||
) from exp
|
||||
cls.__DABSchema__[_fname].update_value(_fvalue)
|
||||
|
||||
def __new__(mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any]) -> Type:
|
||||
def __new__(
|
||||
mcs: type["BaseMeta"],
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any],
|
||||
) -> Type:
|
||||
"""BaseElement new class"""
|
||||
mcs.pre_check(name, bases, namespace)
|
||||
mcs.pre_processing(name, bases, namespace)
|
||||
|
||||
orig_setattr = namespace.get("__setattr__", object.__setattr__)
|
||||
|
||||
def guarded_setattr(self, key, value):
|
||||
if key.startswith("_"): # allow private and dunder attrs
|
||||
return orig_setattr(self, key, value)
|
||||
# block writes after init if key is readonly
|
||||
if key in self.__DABSchema__.keys():
|
||||
if hasattr(self, key):
|
||||
raise ReadOnlyField(f"{key} is read-only")
|
||||
else:
|
||||
raise NewFieldForbidden("creating new fields is not allowed")
|
||||
|
||||
return orig_setattr(self, key, value)
|
||||
|
||||
namespace["__setattr__"] = guarded_setattr
|
||||
extensions: dict[str, Any] = {}
|
||||
mcs.check_class(name, bases, namespace, extensions)
|
||||
mcs.process_class_fields(name, bases, namespace, extensions)
|
||||
|
||||
_cls = super().__new__(mcs, name, bases, namespace)
|
||||
|
||||
for _fname, _fvalue in mcs.modified_field.items():
|
||||
_cls.__DABSchema__[_fname] = deepcopy(bases[0].__DABSchema__[_fname])
|
||||
_cls.__DABSchema__[_fname].update_value(_fvalue)
|
||||
|
||||
for _fname, _fvalue in mcs.new_fields.items():
|
||||
_fvalue.add_source(mcs)
|
||||
_cls.__DABSchema__[_fname] = _fvalue
|
||||
|
||||
mcs.call_initializer(_cls, name, bases, namespace)
|
||||
mcs.commit_fields(_cls, name, bases, namespace, extensions)
|
||||
mcs.apply_initializer(_cls, name, bases, namespace, extensions)
|
||||
_cls.install_instance_guard(extensions)
|
||||
|
||||
return _cls
|
||||
|
||||
def __call__(cls, *args: Any, **kw: Any): # intentionally untyped
|
||||
"""BaseElement new instance"""
|
||||
obj = super().__call__(*args, **kw)
|
||||
@classmethod
|
||||
def commit_fields(
|
||||
mcs: type["BaseMeta"],
|
||||
cls,
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any], # pylint: disable=unused-argument
|
||||
extensions: dict[str, Any],
|
||||
):
|
||||
"""
|
||||
Commit staged fields into the class schema (`__DABSchema__`).
|
||||
|
||||
- For modified fields: copy the parent's DABField, update its value.
|
||||
- For new fields: set the freshly built DABField and record its source.
|
||||
"""
|
||||
for _fname, _fvalue in mcs.modified_fields.items():
|
||||
cls.__DABSchema__[_fname] = deepcopy(bases[0].__DABSchema__[_fname])
|
||||
cls.__DABSchema__[_fname].update_value(_fvalue)
|
||||
|
||||
for _fname, _fvalue in mcs.new_fields.items():
|
||||
_fvalue.add_source(cls)
|
||||
cls.__DABSchema__[_fname] = _fvalue
|
||||
|
||||
def __call__(cls: Type, *args: Any, **kw: Any): # intentionally untyped
|
||||
"""BaseElement new instance"""
|
||||
obj = super().__call__(*args)
|
||||
|
||||
extensions: dict[str, Any] = {}
|
||||
|
||||
cls.populate_instance( # pylint: disable=no-value-for-parameter
|
||||
obj, extensions, *args, **kw
|
||||
)
|
||||
cls.freeze_instance_schema( # pylint: disable=no-value-for-parameter
|
||||
obj, extensions, *args, **kw
|
||||
)
|
||||
cls.apply_overrides( # pylint: disable=no-value-for-parameter
|
||||
obj, extensions, *args, **kw
|
||||
)
|
||||
cls.finalize_instance(obj, extensions) # pylint: disable=no-value-for-parameter
|
||||
|
||||
for _fname, _fvalue in cls.__DABSchema__.items():
|
||||
setattr(obj, _fname, _fvalue)
|
||||
inst_schema: dict[str, Any] = {}
|
||||
for _fname, _fvalue in cls.__DABSchema__.items():
|
||||
inst_schema[_fname] = FrozenDABField(_fvalue)
|
||||
setattr(obj, "__DABSchema__", inst_schema)
|
||||
return obj
|
||||
|
||||
def populate_instance(
|
||||
cls: Type, obj: Any, extensions: dict[str, Any], *args: Any, **kw: Any
|
||||
):
|
||||
"""
|
||||
Populate the new instance with field values from the class schema.
|
||||
|
||||
Copies each DABField.value to an instance attribute (deep-frozen view).
|
||||
"""
|
||||
for _fname, _fvalue in cls.__DABSchema__.items():
|
||||
if isinstance(_fvalue, DABField):
|
||||
object.__setattr__(obj, _fname, _fvalue.value)
|
||||
|
||||
def freeze_instance_schema(
|
||||
cls: Type, obj: Any, extensions: dict[str, Any], *args: Any, **kw: Any
|
||||
):
|
||||
"""
|
||||
Freeze the instance's schema by wrapping DABFields into FrozenDABField.
|
||||
|
||||
Creates a per-instance `__DABSchema__` dict where each field is read-only.
|
||||
"""
|
||||
inst_schema = copy(obj.__DABSchema__)
|
||||
for _fname, _fvalue in cls.__DABSchema__.items():
|
||||
if isinstance(_fvalue, DABField):
|
||||
inst_schema[_fname] = FrozenDABField(_fvalue)
|
||||
|
||||
if "features" in inst_schema:
|
||||
inst_schema["features"] = dict(inst_schema["features"])
|
||||
|
||||
object.__setattr__(obj, "__DABSchema__", inst_schema)
|
||||
|
||||
def apply_overrides(cls, obj, extensions, *args, **kwargs):
|
||||
"""
|
||||
Hook for runtime overrides at instance creation.
|
||||
|
||||
Invoked after the schema has been frozen but before finalize_instance.
|
||||
Subclasses of BaseMeta can override this to support things like:
|
||||
|
||||
- Field overrides: MyApp(field=value)
|
||||
- Feature overrides: MyApp(FeatureName=CustomFeature)
|
||||
- Feature attachments: MyApp(NewFeature=FeatureClass)
|
||||
|
||||
By default this does nothing.
|
||||
"""
|
||||
|
||||
def finalize_instance(cls: Type, obj: Any, extensions: dict[str, Any]):
|
||||
"""
|
||||
Finalization hook invoked at the end of instance construction.
|
||||
|
||||
Subclasses of the metaclass override this to attach runtime components
|
||||
to the instance. (Example: BaseMetaAppliance instantiates bound Features
|
||||
and sets them as attributes on the appliance instance.)
|
||||
"""
|
||||
|
||||
def install_instance_guard(cls: Type, extensions: dict[str, Any]):
|
||||
"""
|
||||
Install the runtime `__setattr__` guard on the class.
|
||||
|
||||
After instances are constructed, prevents:
|
||||
- creating new public fields,
|
||||
- reassigning existing fields post-initialization.
|
||||
|
||||
Private/dunder attributes are exempt to allow internal bookkeeping.
|
||||
"""
|
||||
orig_setattr = getattr(cls, "__setattr__")
|
||||
|
||||
# cls.orig_setattr = orig_setattr
|
||||
|
||||
def guarded_setattr(_self, key: str, value: Any):
|
||||
if key.startswith("_"): # allow private and dunder attrs
|
||||
return orig_setattr(_self, key, value)
|
||||
# block writes after init if key is readonly
|
||||
if key in _self.__DABSchema__.keys():
|
||||
if key in _self.__dict__:
|
||||
raise ReadOnlyField(f"{key} is read-only")
|
||||
# elif key in _self.__DABSchema__["features"].keys():
|
||||
# if key in _self.__dict__:
|
||||
# raise ReadOnlyField(f"{key} is read-only")
|
||||
else:
|
||||
raise NewFieldForbidden("creating new fields is not allowed")
|
||||
|
||||
return orig_setattr(_self, key, value)
|
||||
|
||||
setattr(cls, "__setattr__", guarded_setattr)
|
||||
|
||||
|
||||
class BaseElement(metaclass=BaseMeta):
|
||||
"""BaseElement class
|
||||
@@ -668,14 +928,194 @@ class BaseElement(metaclass=BaseMeta):
|
||||
"""
|
||||
|
||||
|
||||
class BaseFeature(BaseElement):
|
||||
class BaseMetaFeature(BaseMeta):
|
||||
"""BaseMetaFeature class
|
||||
Feature specific metaclass code
|
||||
"""
|
||||
|
||||
def __call__(cls: Type, *args: Any, **kw: Any): # intentionally untyped
|
||||
"""BaseFeature new instance"""
|
||||
|
||||
if cls._BoundAppliance is None:
|
||||
raise FeatureNotBound()
|
||||
|
||||
obj = super().__call__(*args, **kw)
|
||||
|
||||
return obj
|
||||
|
||||
|
||||
class BaseFeature(BaseElement, metaclass=BaseMetaFeature):
|
||||
"""BaseFeature class
|
||||
Base class for Appliance's Features.
|
||||
Features are optional traits of an appliance.
|
||||
"""
|
||||
|
||||
_BoundAppliance: "Optional[Type[BaseAppliance]]" = None
|
||||
Enabled: bool = False
|
||||
|
||||
class BaseAppliance(BaseElement):
|
||||
|
||||
class BaseMetaAppliance(BaseMeta):
|
||||
"""BaseMetaAppliance class
|
||||
Appliance specific metaclass code
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def check_class(
|
||||
mcs: type["BaseMeta"],
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any], # pylint: disable=unused-argument
|
||||
extensions: dict[str, Any],
|
||||
) -> None:
|
||||
"""
|
||||
Appliance-specific pre-check: ensure the `features` slot exists in schema.
|
||||
|
||||
Copies the parent's `features` mapping when inheriting to keep it per-class.
|
||||
"""
|
||||
super().check_class(name, bases, namespace, extensions) # type: ignore[misc]
|
||||
if "features" not in namespace["__DABSchema__"]:
|
||||
namespace["__DABSchema__"]["features"] = {}
|
||||
else:
|
||||
namespace["__DABSchema__"]["features"] = copy(
|
||||
namespace["__DABSchema__"]["features"]
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def process_class_fields(
|
||||
mcs: type["BaseMeta"],
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any],
|
||||
extensions: dict[str, Any],
|
||||
):
|
||||
"""
|
||||
Like BaseMeta.process_class_fields but also stages Feature declarations.
|
||||
|
||||
Initializes:
|
||||
extensions["new_features"], extensions["modified_features"]
|
||||
then defers to the base scanner for regular fields.
|
||||
"""
|
||||
extensions["new_features"] = {}
|
||||
extensions["modified_features"] = {}
|
||||
super().process_class_fields(name, bases, namespace, extensions) # type: ignore[misc]
|
||||
|
||||
@classmethod
|
||||
def process_new_field(
|
||||
mcs: type["BaseMeta"],
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any],
|
||||
_fname: str,
|
||||
_fvalue: Any,
|
||||
extensions: dict[str, Any],
|
||||
): # pylint: disable=unused-argument
|
||||
"""
|
||||
Intercept Feature declarations.
|
||||
|
||||
- If `_fname` already exists in parent's `features`, enforce same type;
|
||||
stage into `modified_features`.
|
||||
- Else, if `_fvalue` is a Feature *class*, stage into `new_features`.
|
||||
- Otherwise, it is a regular field: delegate to BaseMeta.process_new_field.
|
||||
"""
|
||||
if _fname in namespace["__DABSchema__"]["features"].keys():
|
||||
if not issubclass(_fvalue, namespace["__DABSchema__"]["features"][_fname]):
|
||||
raise InvalidFeatureInheritance(
|
||||
f"Feature {_fname} is not an instance of {bases[0]}.{_fname}"
|
||||
)
|
||||
extensions["modified_features"][_fname] = _fvalue
|
||||
elif isinstance(_fvalue, type) and issubclass(_fvalue, BaseFeature):
|
||||
extensions["new_features"][_fname] = _fvalue
|
||||
else:
|
||||
super().process_new_field(name, bases, namespace, _fname, _fvalue, extensions) # type: ignore[misc]
|
||||
|
||||
@classmethod
|
||||
def commit_fields(
|
||||
mcs: type["BaseMeta"],
|
||||
cls,
|
||||
name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any], # pylint: disable=unused-argument
|
||||
extensions: dict[str, Any],
|
||||
):
|
||||
"""
|
||||
Commit regular fields (via BaseMeta) and then bind staged Feature classes.
|
||||
|
||||
For each new/modified feature:
|
||||
- bind it to `cls` (sets the feature's `_BoundAppliance`),
|
||||
- register it under `cls.__DABSchema__["features"]`.
|
||||
"""
|
||||
super().commit_fields(cls, name, bases, namespace, extensions) # type: ignore[misc]
|
||||
|
||||
for _ftname, _ftvalue in extensions["modified_features"].items():
|
||||
_ftvalue._BoundAppliance = cls # pylint: disable=protected-access
|
||||
cls.__DABSchema__["features"][_ftname] = _ftvalue
|
||||
for _ftname, _ftvalue in extensions["new_features"].items():
|
||||
_ftvalue._BoundAppliance = cls # pylint: disable=protected-access
|
||||
cls.__DABSchema__["features"][_ftname] = _ftvalue
|
||||
|
||||
def finalize_instance(cls: Type, obj, extensions: dict[str, Any]):
|
||||
"""
|
||||
Instantiate and attach all declared Feature classes on the appliance instance.
|
||||
|
||||
Each feature is constructed (running the same populate/freeze steps),
|
||||
then assigned to `obj.<FeatureName>`.
|
||||
"""
|
||||
for _ftname, _ftvalue in cls.__DABSchema__["features"].items():
|
||||
instft = _ftvalue()
|
||||
object.__setattr__(obj, _ftname, instft)
|
||||
|
||||
def apply_overrides(cls, obj, extensions, *args, **kwargs):
|
||||
"""
|
||||
Support for runtime field and feature overrides.
|
||||
|
||||
Examples:
|
||||
MyApp(name="foo") # field override
|
||||
MyApp(Nginx=CustomNginxFeature) # override existing feature
|
||||
MyApp(Redis=RedisFeature) # attach new feature
|
||||
"""
|
||||
# Handle field overrides
|
||||
for k, v in list(kwargs.items()):
|
||||
if k in cls.__DABSchema__: # regular field
|
||||
field = cls.__DABSchema__[k]
|
||||
try:
|
||||
check_type(
|
||||
v,
|
||||
field.annotations,
|
||||
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
|
||||
)
|
||||
except TypeCheckError as exp:
|
||||
raise InvalidFieldValue(
|
||||
f"Invalid value for field '{k}': expected {field.annotations}, got {v!r}"
|
||||
) from exp
|
||||
|
||||
object.__setattr__(obj, k, _deepfreeze(v))
|
||||
obj.__DABSchema__[k] = FrozenDABField(
|
||||
DABField(k, v, field.annotations, field._info)
|
||||
)
|
||||
kwargs.pop(k)
|
||||
|
||||
# Handle feature overrides/attachments
|
||||
for k, v in list(kwargs.items()):
|
||||
if k in cls.__DABSchema__.get("features", {}):
|
||||
feat_cls = v if isinstance(v, type) else v.__class__
|
||||
if not issubclass(feat_cls, BaseFeature):
|
||||
raise InvalidFieldValue(
|
||||
f"Override for feature '{k}' must be a Feature class or instance"
|
||||
)
|
||||
feat_cls._BoundAppliance = cls
|
||||
inst = v if isinstance(v, BaseFeature) else feat_cls()
|
||||
object.__setattr__(obj, k, inst)
|
||||
obj.__DABSchema__["features"][k] = feat_cls
|
||||
kwargs.pop(k)
|
||||
elif isinstance(v, type) and issubclass(v, BaseFeature):
|
||||
v._BoundAppliance = cls
|
||||
inst = v()
|
||||
object.__setattr__(obj, k, inst)
|
||||
obj.__DABSchema__["features"][k] = v
|
||||
kwargs.pop(k)
|
||||
|
||||
|
||||
class BaseAppliance(BaseElement, metaclass=BaseMetaAppliance):
|
||||
"""BaseFeature class
|
||||
Base class for Appliance.
|
||||
An appliance is a server configuration / image that is built using appliance's code and Fields.
|
||||
|
||||
1042
test/test_model.py
1042
test/test_model.py
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user