continue features implementation + code lint + typing + etc

This commit is contained in:
chacha
2025-09-18 23:21:42 +02:00
parent 3e0defc574
commit 86eee2e378
3 changed files with 1222 additions and 411 deletions

View File

@@ -15,20 +15,26 @@ import warnings
try: # pragma: no cover
__version__ = version("dabmodel")
except PackageNotFoundError: # pragma: no cover
warnings.warn("can not read __version__, assuming local test context, setting it to ?.?.?")
except PackageNotFoundError: # pragma: no cover
warnings.warn(
"can not read __version__, assuming local test context, setting it to ?.?.?"
)
__version__ = "?.?.?"
try: # pragma: no cover
dist = distribution("dabmodel")
__Summuary__ = dist.metadata["Summary"]
except PackageNotFoundError: # pragma: no cover
warnings.warn('can not read dist.metadata["Summary"], assuming local test context, setting it to <dabmodel description>')
warnings.warn(
'can not read dist.metadata["Summary"], assuming local test context, setting it to <dabmodel description>'
)
__Summuary__ = "dabmodel description"
try: # pragma: no cover
dist = distribution("dabmodel")
__Name__ = dist.metadata["Name"]
except PackageNotFoundError: # pragma: no cover
warnings.warn('can not read dist.metadata["Name"], assuming local test context, setting it to <dabmodel>')
warnings.warn(
'can not read dist.metadata["Name"], assuming local test context, setting it to <dabmodel>'
)
__Name__ = "dabmodel"

View File

@@ -1,4 +1,4 @@
""" dabmodel model module
"""dabmodel model module
This module implements DAB model classes.
This module contains metaclass and bases classes used to create models.
BaseAppliance can be used to create a new Appliance Data.
@@ -21,7 +21,7 @@ from typing import (
Callable,
Type,
)
from types import UnionType, FunctionType, SimpleNamespace, MethodType
from types import UnionType, FunctionType, SimpleNamespace
from copy import deepcopy, copy
# from pprint import pprint
@@ -65,11 +65,13 @@ class DABModelException(Exception):
Base Exception for DABModelException class
"""
class ReservedFieldName(Exception):
"""DABModelException Exception class
Base Exception for DABModelException class
"""
class MultipleInheritanceForbidden(DABModelException):
"""MultipleInheritanceForbidden Exception class
Multiple inheritance is forbidden when using dabmodel
@@ -141,21 +143,25 @@ class ImportForbidden(DABModelException):
Imports are forbidden
"""
class InvalidFeatureInheritance(DABModelException):
"""InvalidFeatureInheritance Exception class
Features of same name in child appliance need to be from same type
"""
class FunctionForbidden(DABModelException):
"""FunctionForbidden Exception class
function call are forbidden
"""
class FeatureNotBound(DABModelException):
"""FeatureNotBound Exception class
a Feature must be bound to an Appliance
"""
ALLOWED_HELPERS_MATH = SimpleNamespace(
sqrt=math.sqrt,
floor=math.floor,
@@ -211,7 +217,9 @@ def _blocked_import(*args, **kwargs):
def _resolve_annotation(ann):
if isinstance(ann, str):
# Safe eval against a **whitelist** only
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS) # pylint: disable=eval-used
return eval( # pylint: disable=eval-used
ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS
)
return ann
@@ -221,7 +229,11 @@ def _peel_annotated(t: Any) -> Any:
origin = get_origin(t)
if origin is None:
return t
name = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
name = (
getattr(origin, "__name__", "")
or getattr(origin, "__qualname__", "")
or str(origin)
)
if "Annotated" in name:
args = get_args(t)
t = args[0] if args else t
@@ -229,11 +241,19 @@ def _peel_annotated(t: Any) -> Any:
return t
def _check_annotation_definition(_type) -> bool: # pylint: disable=too-complex,too-many-return-statements
def _check_annotation_definition( # pylint: disable=too-complex,too-many-return-statements
_type,
) -> bool:
_type = _peel_annotated(_type)
# handle Optional[] and Union[None,...]
if (get_origin(_type) is Union or get_origin(_type) is UnionType) and type(None) in get_args(_type):
return all(_check_annotation_definition(_) for _ in get_args(_type) if _ is not type(None))
if (get_origin(_type) is Union or get_origin(_type) is UnionType) and type(
None
) in get_args(_type):
return all(
_check_annotation_definition(_)
for _ in get_args(_type)
if _ is not type(None)
)
# handle other Union[...]
if get_origin(_type) is Union or get_origin(_type) is UnionType:
@@ -243,14 +263,20 @@ def _check_annotation_definition(_type) -> bool: # pylint: disable=too-complex,
if get_origin(_type) is dict:
inner = get_args(_type)
if len(inner) != 2:
raise IncompletelyAnnotatedField(f"Dict Annotation requires 2 inner definitions: {_type}")
return _peel_annotated(inner[0]) in ALLOWED_MODEL_FIELDS_TYPES and _check_annotation_definition(inner[1])
raise IncompletelyAnnotatedField(
f"Dict Annotation requires 2 inner definitions: {_type}"
)
return _peel_annotated(
inner[0]
) in ALLOWED_MODEL_FIELDS_TYPES and _check_annotation_definition(inner[1])
# handle Tuple[]
if get_origin(_type) in [tuple]:
inner_types = get_args(_type)
if len(inner_types) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
raise IncompletelyAnnotatedField(
f"Annotation requires inner definition: {_type}"
)
if len(inner_types) == 2 and inner_types[1] is Ellipsis:
return _check_annotation_definition(inner_types[0])
return all(_check_annotation_definition(_) for _ in inner_types)
@@ -259,7 +285,9 @@ def _check_annotation_definition(_type) -> bool: # pylint: disable=too-complex,
if get_origin(_type) in [set, frozenset, tuple, list]:
inner_types = get_args(_type)
if len(inner_types) == 0:
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
raise IncompletelyAnnotatedField(
f"Annotation requires inner definition: {_type}"
)
return all(_check_annotation_definition(_) for _ in inner_types)
if _type in ALLOWED_MODEL_FIELDS_TYPES:
@@ -300,7 +328,9 @@ def _deepfreeze(value):
class DABFieldInfo:
"""This Class allows to describe a Field in Appliance class"""
def __init__(self, *, doc: str = "", constraints: Optional[list[BaseConstraint]] = None):
def __init__(
self, *, doc: str = "", constraints: Optional[list[BaseConstraint]] = None
):
self._doc: str = doc
self._constraints: list[BaseConstraint]
if constraints is None:
@@ -414,7 +444,9 @@ class ModelSpecView:
__slots__ = ("_vals", "_types", "_touched", "_name", "_module")
def __init__(self, values: dict[str, Any], types_map: dict[str, type], name: str, module: str):
def __init__(
self, values: dict[str, Any], types_map: dict[str, type], name: str, module: str
):
self._name: str
self._vals: dict[str, Any]
self._types: dict[str, type]
@@ -458,7 +490,9 @@ class ModelSpecView:
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(f"Field <{name}> value is not of expected type {T}.") from exp
raise InvalidFieldValue(
f"Field <{name}> value is not of expected type {T}."
) from exp
self._vals[name] = value
@@ -474,45 +508,82 @@ T_BE = TypeVar("T_BE", bound="BaseElement")
class BaseMeta(type):
"""metaclass to use to build BaseElement"""
modified_field: Dict[str, Any] = {}
modified_fields: Dict[str, Any] = {}
new_fields: Dict[str, DABField[Any]] = {}
initializer: Optional[Callable[..., Any]] = None
__DABSchema__: dict[str, Any] = {}
@classmethod
def pre_check(
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], # pylint: disable=unused-argument
extensions : dict[str,Any]
def check_class(
mcs: type["BaseMeta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
) -> None:
"""early BaseElement checks"""
"""
Early class-build hook.
Validates the inheritance shape, initializes an empty schema for root classes,
copies the parent schema for subclasses, and ensures all annotated fields
have a default (inserting `None` when missing).
This runs before the class object is created.
"""
# print("__NEW__ Defining:", name, "with keys:", list(namespace))
if len(bases) > 1:
raise MultipleInheritanceForbidden("Multiple inheritance is not supported by dabmodel")
raise MultipleInheritanceForbidden(
"Multiple inheritance is not supported by dabmodel"
)
if len(bases) == 0: # base class (BaseElement)
namespace["__DABSchema__"] = {}
else: # standard inheritance
# check class tree origin
if "__DABSchema__" not in dir(bases[0]):
raise BrokenInheritance("__DABSchema__ not found in base class, broken inheritance chain.")
raise BrokenInheritance(
"__DABSchema__ not found in base class, broken inheritance chain."
)
# copy inherited schema
namespace["__DABSchema__"] = copy(bases[0].__DABSchema__)
# force field without default value to be instantiated (with None)
if "__annotations__" in namespace:
for _funknown in [_ for _ in namespace["__annotations__"] if _ not in namespace.keys()]:
for _funknown in [
_ for _ in namespace["__annotations__"] if _ not in namespace.keys()
]:
namespace[_funknown] = None
@classmethod
def pre_processing(mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any],extensions : dict[str,Any]):
"""preprocessing BaseElement"""
def process_class_fields( # pylint: disable=too-complex,too-many-branches
mcs: type["BaseMeta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
extensions: dict[str, Any],
):
"""
Scan the class namespace and partition fields.
Detects:
- modified fields (shadowing parent values),
- new fields (present in annotations),
- the optional `__initializer` classmethod (in mangled or unmangled form).
Validates annotations and types and removes processed items from `namespace`
so they won't become normal attributes. Results are staged into:
mcs.new_fields, mcs.modified_fields, mcs.initializer
to be committed later.
"""
# iterating new and modified fields
mcs.modified_field = {}
mcs.modified_fields = {}
mcs.new_fields = {}
mcs.initializer = None
initializer_name: Optional[str] = None
for _fname, _fvalue in namespace.items():
if _fname == f"_{name}__initializer" or (name.startswith("_") and _fname == "__initializer"):
if _fname == f"_{name}__initializer" or (
name.startswith("_") and _fname == "__initializer"
):
if not isinstance(_fvalue, classmethod):
raise InvalidInitializerType()
mcs.initializer = _fvalue.__func__
@@ -526,26 +597,44 @@ class BaseMeta(type):
pass
else:
print(f"Parsing Field: {_fname} / {_fvalue}")
if len(bases) == 1 and _fname in namespace["__DABSchema__"].keys(): # Modified fields
mcs.pre_processing_modified_fields(name, bases, namespace, _fname, _fvalue, extensions)
if (
len(bases) == 1 and _fname in namespace["__DABSchema__"].keys()
): # Modified fields
mcs.process_modified_field(
name, bases, namespace, _fname, _fvalue, extensions
)
else: # New fieds
mcs.pre_processing_new_fields(name, bases, namespace, _fname, _fvalue, extensions)
mcs.process_new_field(
name, bases, namespace, _fname, _fvalue, extensions
)
# removing modified fields from class (will add them back later)
for _fname in mcs.new_fields:
del namespace[_fname]
for _fname in mcs.modified_field:
for _fname in mcs.modified_fields:
del namespace[_fname]
if mcs.initializer is not None and initializer_name is not None:
del namespace[initializer_name]
@classmethod
def pre_processing_modified_fields(
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], _fname: str, _fvalue: Any,
extensions : dict[str,Any]
def process_modified_field(
mcs: type["BaseMeta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
_fname: str,
_fvalue: Any,
extensions: dict[str, Any],
): # pylint: disable=unused-argument
"""preprocessing BaseElement modified Fields"""
"""
Handle a *modified* field declared by a subclass.
Forbids annotation changes, validates the new default value against
the inherited annotation, and stages the new default into `mcs.modified_fields`.
"""
if "__annotations__" in namespace and _fname in namespace["__annotations__"]:
raise ReadOnlyFieldAnnotation(f"annotations cannot be modified on derived classes {_fname}")
raise ReadOnlyFieldAnnotation(
f"annotations cannot be modified on derived classes {_fname}"
)
try:
check_type(
_fvalue,
@@ -556,62 +645,114 @@ class BaseMeta(type):
raise InvalidFieldValue(
f"Field <{_fname}> New Field value is not of expected type {bases[0].__annotations__[_fname]}."
) from exp
mcs.modified_field[_fname] = _fvalue
mcs.modified_fields[_fname] = _fvalue
@classmethod
def pre_processing_new_fields(
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], _fname: str, _fvalue: Any,
extensions : dict[str,Any]
def process_new_field(
mcs: type["BaseMeta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
_fname: str,
_fvalue: Any,
extensions: dict[str, Any],
): # pylint: disable=unused-argument
"""preprocessing BaseElement new Fields"""
#print(f"New field: {_fname}")
"""
Handle a *new* field declared on the class.
Resolves string annotations against a whitelist, validates `Annotated[...]`
payloads (allowing only DABFieldInfo), checks the default value type,
and stages the field as a `DABField` in `mcs.new_fields`.
"""
# print(f"New field: {_fname}")
# check if field is annotated
if "__annotations__" not in namespace or _fname not in namespace["__annotations__"]:
raise NotAnnotatedField(f"Every dabmodel Fields must be annotated ({_fname})")
if (
"__annotations__" not in namespace
or _fname not in namespace["__annotations__"]
):
raise NotAnnotatedField(
f"Every dabmodel Fields must be annotated ({_fname})"
)
# check if annotation is allowed
if isinstance(namespace["__annotations__"][_fname], str):
namespace["__annotations__"][_fname] = _resolve_annotation(namespace["__annotations__"][_fname])
namespace["__annotations__"][_fname] = _resolve_annotation(
namespace["__annotations__"][_fname]
)
if not _check_annotation_definition(namespace["__annotations__"][_fname]):
raise InvalidFieldAnnotation(f"Field <{_fname}> has not an allowed or valid annotation.")
raise InvalidFieldAnnotation(
f"Field <{_fname}> has not an allowed or valid annotation."
)
_finfo: DABFieldInfo = DABFieldInfo()
origin = get_origin(namespace["__annotations__"][_fname])
tname = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
tname = (
getattr(origin, "__name__", "")
or getattr(origin, "__qualname__", "")
or str(origin)
)
if "Annotated" in tname:
args = get_args(namespace["__annotations__"][_fname])
if args:
if len(args) > 2:
raise InvalidFieldAnnotation(f"Field <{_fname}> had invalid Annotated value.")
raise InvalidFieldAnnotation(
f"Field <{_fname}> had invalid Annotated value."
)
if len(args) == 2 and not isinstance(args[1], DABFieldInfo):
raise InvalidFieldAnnotation("Only DABFieldInfo object is allowed as Annotated data.")
raise InvalidFieldAnnotation(
"Only DABFieldInfo object is allowed as Annotated data."
)
_finfo = args[1]
# check if value is valid
try:
check_type(_fvalue, namespace["__annotations__"][_fname], collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS)
check_type(
_fvalue,
namespace["__annotations__"][_fname],
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}.") from exp
mcs.new_fields[_fname] = DABField(_fname, _fvalue, namespace["__annotations__"][_fname], _finfo)
raise InvalidFieldValue(
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
) from exp
mcs.new_fields[_fname] = DABField(
_fname, _fvalue, namespace["__annotations__"][_fname], _finfo
)
@classmethod
def call_initializer(
mcs: type["BaseMeta"], cls, name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], # pylint: disable=unused-argument
extensions : dict[str,Any]
def apply_initializer(
mcs: type["BaseMeta"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
):
"""BaseElement initializer processing"""
"""
Apply the optional `__initializer` classmethod to compute derived defaults.
The initializer runs in a restricted, import-blocked environment using a
`ModelSpecView` proxy that enforces type checking on assignments.
On success, the computed values are validated and written back into the
class schema's DABFields.
"""
if mcs.initializer is not None:
init_fieldvalues = {}
init_fieldtypes = {}
for _fname, _fvalue in cls.__DABSchema__.items():
if isinstance(_fvalue,DABField):
if isinstance(_fvalue, DABField):
init_fieldvalues[_fname] = deepcopy(_fvalue.raw_value)
init_fieldtypes[_fname] = _fvalue.annotations
fakecls = ModelSpecView(init_fieldvalues, init_fieldtypes, cls.__name__, cls.__module__)
safe_globals = {"__builtins__": {"__import__": _blocked_import}, **ALLOWED_HELPERS_DEFAULT}
fakecls = ModelSpecView(
init_fieldvalues, init_fieldtypes, cls.__name__, cls.__module__
)
safe_globals = {
"__builtins__": {"__import__": _blocked_import},
**ALLOWED_HELPERS_DEFAULT,
}
if mcs.initializer.__code__.co_freevars:
raise FunctionForbidden("__initializer must not use closures")
safe_initializer = FunctionType(
@@ -624,70 +765,145 @@ class BaseMeta(type):
safe_initializer(fakecls) # pylint: disable=not-callable
for _fname, _fvalue in fakecls.export().items():
try:
check_type(_fvalue, cls.__DABSchema__[_fname].annotations, collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS)
check_type(
_fvalue,
cls.__DABSchema__[_fname].annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
) from exp
cls.__DABSchema__[_fname].update_value(_fvalue)
def __new__(mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any]) -> Type:
def __new__(
mcs: type["BaseMeta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
) -> Type:
"""BaseElement new class"""
extensions : dict[str,Any] = dict()
mcs.pre_check(name, bases, namespace, extensions)
mcs.pre_processing(name, bases, namespace, extensions)
extensions: dict[str, Any] = {}
mcs.check_class(name, bases, namespace, extensions)
mcs.process_class_fields(name, bases, namespace, extensions)
_cls = super().__new__(mcs, name, bases, namespace)
mcs.save_values(_cls, name, bases, namespace, extensions)
mcs.call_initializer(_cls, name, bases, namespace, extensions)
_cls.install_guard(extensions)
mcs.commit_fields(_cls, name, bases, namespace, extensions)
mcs.apply_initializer(_cls, name, bases, namespace, extensions)
_cls.install_instance_guard(extensions)
return _cls
@classmethod
def save_values(
mcs: type["BaseMeta"], cls, name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], # pylint: disable=unused-argument
extensions : dict[str,Any]
def commit_fields(
mcs: type["BaseMeta"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
):
for _fname, _fvalue in mcs.modified_field.items():
"""
Commit staged fields into the class schema (`__DABSchema__`).
- For modified fields: copy the parent's DABField, update its value.
- For new fields: set the freshly built DABField and record its source.
"""
for _fname, _fvalue in mcs.modified_fields.items():
cls.__DABSchema__[_fname] = deepcopy(bases[0].__DABSchema__[_fname])
cls.__DABSchema__[_fname].update_value(_fvalue)
for _fname, _fvalue in mcs.new_fields.items():
_fvalue.add_source(mcs)
_fvalue.add_source(cls)
cls.__DABSchema__[_fname] = _fvalue
def __call__(cls: Type, *args: Any, **kw: Any): # intentionally untyped
"""BaseElement new instance"""
obj = super().__call__(*args, **kw)
extensions = dict()
obj = super().__call__(*args)
extensions: dict[str, Any] = {}
cls.populate_instance( # pylint: disable=no-value-for-parameter
obj, extensions, *args, **kw
)
cls.freeze_instance_schema( # pylint: disable=no-value-for-parameter
obj, extensions, *args, **kw
)
cls.apply_overrides( # pylint: disable=no-value-for-parameter
obj, extensions, *args, **kw
)
cls.finalize_instance(obj, extensions) # pylint: disable=no-value-for-parameter
return obj
def populate_instance(
cls: Type, obj: Any, extensions: dict[str, Any], *args: Any, **kw: Any
):
"""
Populate the new instance with field values from the class schema.
Copies each DABField.value to an instance attribute (deep-frozen view).
"""
for _fname, _fvalue in cls.__DABSchema__.items():
if isinstance(_fvalue,DABField):
if isinstance(_fvalue, DABField):
object.__setattr__(obj, _fname, _fvalue.value)
def freeze_instance_schema(
cls: Type, obj: Any, extensions: dict[str, Any], *args: Any, **kw: Any
):
"""
Freeze the instance's schema by wrapping DABFields into FrozenDABField.
Creates a per-instance `__DABSchema__` dict where each field is read-only.
"""
inst_schema = copy(obj.__DABSchema__)
for _fname, _fvalue in cls.__DABSchema__.items():
if isinstance(_fvalue,DABField):
if isinstance(_fvalue, DABField):
inst_schema[_fname] = FrozenDABField(_fvalue)
if "features" in inst_schema:
inst_schema["features"] = dict(inst_schema["features"])
object.__setattr__(obj, "__DABSchema__", inst_schema)
cls.modify_object(obj,extensions)
return obj
def modify_object(cls:Type,obj,extensions : dict[str,Any]):
pass
def install_guard(cls:Type,extensions : dict[str,Any]):
orig_setattr = getattr(cls,"__setattr__")
def apply_overrides(cls, obj, extensions, *args, **kwargs):
"""
Hook for runtime overrides at instance creation.
Invoked after the schema has been frozen but before finalize_instance.
Subclasses of BaseMeta can override this to support things like:
- Field overrides: MyApp(field=value)
- Feature overrides: MyApp(FeatureName=CustomFeature)
- Feature attachments: MyApp(NewFeature=FeatureClass)
By default this does nothing.
"""
def finalize_instance(cls: Type, obj: Any, extensions: dict[str, Any]):
"""
Finalization hook invoked at the end of instance construction.
Subclasses of the metaclass override this to attach runtime components
to the instance. (Example: BaseMetaAppliance instantiates bound Features
and sets them as attributes on the appliance instance.)
"""
def install_instance_guard(cls: Type, extensions: dict[str, Any]):
"""
Install the runtime `__setattr__` guard on the class.
After instances are constructed, prevents:
- creating new public fields,
- reassigning existing fields post-initialization.
Private/dunder attributes are exempt to allow internal bookkeeping.
"""
orig_setattr = getattr(cls, "__setattr__")
# cls.orig_setattr = orig_setattr
def guarded_setattr(_self, key: str, value: Any):
if key.startswith("_"): # allow private and dunder attrs
return orig_setattr(_self, key, value)
@@ -695,9 +911,9 @@ class BaseMeta(type):
if key in _self.__DABSchema__.keys():
if key in _self.__dict__:
raise ReadOnlyField(f"{key} is read-only")
elif key in _self.__DABSchema__["features"].keys():
if key in _self.__dict__:
raise ReadOnlyField(f"{key} is read-only")
# elif key in _self.__DABSchema__["features"].keys():
# if key in _self.__dict__:
# raise ReadOnlyField(f"{key} is read-only")
else:
raise NewFieldForbidden("creating new fields is not allowed")
@@ -705,92 +921,201 @@ class BaseMeta(type):
setattr(cls, "__setattr__", guarded_setattr)
class BaseElement(metaclass=BaseMeta):
"""BaseElement class
Base class to apply metaclass and set common Fields.
"""
class BaseMetaFeature(BaseMeta):
"""BaseMetaFeature class
Feature specific metaclass code
"""
def __call__(cls: Type, *args: Any, **kw: Any): # intentionally untyped
"""BaseFeature new instance"""
if cls._BoundAppliance is None:
raise FeatureNotBound()
obj = super().__call__(*args, **kw)
return obj
class BaseFeature(BaseElement,metaclass=BaseMetaFeature):
class BaseFeature(BaseElement, metaclass=BaseMetaFeature):
"""BaseFeature class
Base class for Appliance's Features.
Features are optional traits of an appliance.
"""
_BoundAppliance:"Optional[Type[BaseAppliance]]" = None
Enabled:bool=False
_BoundAppliance: "Optional[Type[BaseAppliance]]" = None
Enabled: bool = False
class BaseMetaAppliance(BaseMeta):
"""BaseMetaAppliance class
Appliance specific metaclass code
"""
@classmethod
def pre_check(
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], # pylint: disable=unused-argument
extensions : dict[str,Any]
def check_class(
mcs: type["BaseMeta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
) -> None:
"""early BaseElement checks"""
super().pre_check(name,bases,namespace,extensions)
"""
Appliance-specific pre-check: ensure the `features` slot exists in schema.
Copies the parent's `features` mapping when inheriting to keep it per-class.
"""
super().check_class(name, bases, namespace, extensions) # type: ignore[misc]
if "features" not in namespace["__DABSchema__"]:
namespace["__DABSchema__"]["features"]={}
namespace["__DABSchema__"]["features"] = {}
else:
namespace["__DABSchema__"]["features"] = copy(namespace["__DABSchema__"]["features"])
return
namespace["__DABSchema__"]["features"] = copy(
namespace["__DABSchema__"]["features"]
)
@classmethod
def pre_processing(mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any],extensions : dict[str,Any]):
extensions["new_features"]: dict[str,type[BaseFeature]] = {}
extensions["modified_features"]: dict[str,type[BaseFeature]] = {}
super().pre_processing(name,bases,namespace,extensions)
@classmethod
def pre_processing_new_fields(
mcs: type["BaseMeta"], name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], _fname: str, _fvalue: Any,
extensions : dict[str,Any]
): # pylint: disable=unused-argument
"""preprocessing BaseElement new Fields"""
#print('pre_processing_new_fields')
if _fname in namespace["__DABSchema__"]["features"].keys():
if not issubclass(_fvalue,namespace["__DABSchema__"]["features"][_fname]):
raise InvalidFeatureInheritance(f"Feature {_fname} is not an instance of {bases[0]}.{_fname}")
extensions["modified_features"][_fname]=_fvalue
elif isinstance(_fvalue,BaseMetaFeature):
extensions["new_features"][_fname]=_fvalue
else:
super().pre_processing_new_fields(name,bases,namespace,_fname,_fvalue,extensions)
@classmethod
def save_values(
mcs: type["BaseMeta"], cls, name: str, bases: tuple[type[Any], ...], namespace: dict[str, Any], # pylint: disable=unused-argument
extensions : dict[str,Any]
def process_class_fields(
mcs: type["BaseMeta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
extensions: dict[str, Any],
):
super().save_values(cls,name,bases,namespace,extensions)
"""
Like BaseMeta.process_class_fields but also stages Feature declarations.
for _ftname,_ftvalue in extensions["modified_features"].items():
_ftvalue._BoundAppliance = cls
Initializes:
extensions["new_features"], extensions["modified_features"]
then defers to the base scanner for regular fields.
"""
extensions["new_features"] = {}
extensions["modified_features"] = {}
super().process_class_fields(name, bases, namespace, extensions) # type: ignore[misc]
@classmethod
def process_new_field(
mcs: type["BaseMeta"],
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any],
_fname: str,
_fvalue: Any,
extensions: dict[str, Any],
): # pylint: disable=unused-argument
"""
Intercept Feature declarations.
- If `_fname` already exists in parent's `features`, enforce same type;
stage into `modified_features`.
- Else, if `_fvalue` is a Feature *class*, stage into `new_features`.
- Otherwise, it is a regular field: delegate to BaseMeta.process_new_field.
"""
if _fname in namespace["__DABSchema__"]["features"].keys():
if not issubclass(_fvalue, namespace["__DABSchema__"]["features"][_fname]):
raise InvalidFeatureInheritance(
f"Feature {_fname} is not an instance of {bases[0]}.{_fname}"
)
extensions["modified_features"][_fname] = _fvalue
elif isinstance(_fvalue, type) and issubclass(_fvalue, BaseFeature):
extensions["new_features"][_fname] = _fvalue
else:
super().process_new_field(name, bases, namespace, _fname, _fvalue, extensions) # type: ignore[misc]
@classmethod
def commit_fields(
mcs: type["BaseMeta"],
cls,
name: str,
bases: tuple[type[Any], ...],
namespace: dict[str, Any], # pylint: disable=unused-argument
extensions: dict[str, Any],
):
"""
Commit regular fields (via BaseMeta) and then bind staged Feature classes.
For each new/modified feature:
- bind it to `cls` (sets the feature's `_BoundAppliance`),
- register it under `cls.__DABSchema__["features"]`.
"""
super().commit_fields(cls, name, bases, namespace, extensions) # type: ignore[misc]
for _ftname, _ftvalue in extensions["modified_features"].items():
_ftvalue._BoundAppliance = cls # pylint: disable=protected-access
cls.__DABSchema__["features"][_ftname] = _ftvalue
for _ftname,_ftvalue in extensions["new_features"].items():
_ftvalue._BoundAppliance = cls
for _ftname, _ftvalue in extensions["new_features"].items():
_ftvalue._BoundAppliance = cls # pylint: disable=protected-access
cls.__DABSchema__["features"][_ftname] = _ftvalue
def modify_object(cls:Type, obj, extensions : dict[str,Any]): # intentionally untyped
for _ftname,_ftvalue in cls.__DABSchema__["features"].items():
def finalize_instance(cls: Type, obj, extensions: dict[str, Any]):
"""
Instantiate and attach all declared Feature classes on the appliance instance.
Each feature is constructed (running the same populate/freeze steps),
then assigned to `obj.<FeatureName>`.
"""
for _ftname, _ftvalue in cls.__DABSchema__["features"].items():
instft = _ftvalue()
object.__setattr__(obj, _ftname,instft )
class BaseAppliance(BaseElement,metaclass=BaseMetaAppliance):
object.__setattr__(obj, _ftname, instft)
def apply_overrides(cls, obj, extensions, *args, **kwargs):
"""
Support for runtime field and feature overrides.
Examples:
MyApp(name="foo") # field override
MyApp(Nginx=CustomNginxFeature) # override existing feature
MyApp(Redis=RedisFeature) # attach new feature
"""
# Handle field overrides
for k, v in list(kwargs.items()):
if k in cls.__DABSchema__: # regular field
field = cls.__DABSchema__[k]
try:
check_type(
v,
field.annotations,
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
)
except TypeCheckError as exp:
raise InvalidFieldValue(
f"Invalid value for field '{k}': expected {field.annotations}, got {v!r}"
) from exp
object.__setattr__(obj, k, _deepfreeze(v))
obj.__DABSchema__[k] = FrozenDABField(
DABField(k, v, field.annotations, field._info)
)
kwargs.pop(k)
# Handle feature overrides/attachments
for k, v in list(kwargs.items()):
if k in cls.__DABSchema__.get("features", {}):
feat_cls = v if isinstance(v, type) else v.__class__
if not issubclass(feat_cls, BaseFeature):
raise InvalidFieldValue(
f"Override for feature '{k}' must be a Feature class or instance"
)
feat_cls._BoundAppliance = cls
inst = v if isinstance(v, BaseFeature) else feat_cls()
object.__setattr__(obj, k, inst)
obj.__DABSchema__["features"][k] = feat_cls
kwargs.pop(k)
elif isinstance(v, type) and issubclass(v, BaseFeature):
v._BoundAppliance = cls
inst = v()
object.__setattr__(obj, k, inst)
obj.__DABSchema__["features"][k] = v
kwargs.pop(k)
class BaseAppliance(BaseElement, metaclass=BaseMetaAppliance):
"""BaseFeature class
Base class for Appliance.
An appliance is a server configuration / image that is built using appliance's code and Fields.

File diff suppressed because it is too large Load Diff