Merge branch 'dev' of https://chacha.ddns.net/gitea/chacha/dabmodel.git
into dev
This commit is contained in:
@@ -1,13 +1,20 @@
|
||||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<?eclipse-pydev version="1.0"?><pydev_project>
|
||||
|
||||
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
|
||||
|
||||
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python interpreter</pydev_property>
|
||||
|
||||
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
|
||||
<path>/${PROJECT_DIR_NAME}/src</path>
|
||||
<path>/${PROJECT_DIR_NAME}</path>
|
||||
</pydev_pathproperty>
|
||||
|
||||
</pydev_project>
|
||||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<?eclipse-pydev version="1.0"?><pydev_project>
|
||||
|
||||
|
||||
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Python311</pydev_property>
|
||||
|
||||
|
||||
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python interpreter</pydev_property>
|
||||
|
||||
|
||||
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
|
||||
|
||||
<path>/${PROJECT_DIR_NAME}/src</path>
|
||||
|
||||
<path>/${PROJECT_DIR_NAME}</path>
|
||||
|
||||
</pydev_pathproperty>
|
||||
|
||||
|
||||
</pydev_project>
|
||||
|
||||
@@ -11,5 +11,4 @@ Main module __init__ file.
|
||||
"""
|
||||
|
||||
from .__metadata__ import __version__, __Summuary__, __Name__
|
||||
from .model import DABField, BaseFeature, BaseAppliance, default_values_override
|
||||
from .tools import DABJSONEncoder
|
||||
from .model import BaseFeature, BaseAppliance
|
||||
|
||||
@@ -1,198 +1,437 @@
|
||||
# pylint: disable=C0114,C0115,C0116
|
||||
from __future__ import annotations
|
||||
from typing import Any, Dict, Optional, TypeVar, Generic, get_origin, get_args, Annotated as _Annotated, Union
|
||||
|
||||
from uuid import uuid4, UUID
|
||||
from datetime import datetime as _dt
|
||||
|
||||
from abc import ABC, ABCMeta
|
||||
from uuid import uuid4
|
||||
from typing import Annotated, ClassVar, Any, Self, TypeVar, TypeAlias, Generic
|
||||
from datetime import datetime
|
||||
from copy import deepcopy, copy
|
||||
from typing_extensions import dataclass_transform
|
||||
from pydantic import (
|
||||
BaseModel,
|
||||
StrictInt,
|
||||
StrictStr,
|
||||
constr,
|
||||
ByteSize,
|
||||
Field,
|
||||
ConfigDict,
|
||||
model_validator,
|
||||
AwareDatetime,
|
||||
UUID4,
|
||||
model_validator,
|
||||
ByteSize,
|
||||
StrictInt,
|
||||
StrictStr,
|
||||
)
|
||||
from pydantic.fields import Field, _Unset, PydanticUndefined
|
||||
from pydantic._internal._model_construction import ModelMetaclass, PydanticModelField
|
||||
from pydantic._internal._generics import PydanticGenericMetadata
|
||||
from pydantic._internal._decorators import ensure_classmethod_based_on_signature
|
||||
from pydantic._internal._model_construction import ModelMetaclass
|
||||
import pytz
|
||||
|
||||
from runtype import issubclass as runtype_issubclass
|
||||
import copy as _copy
|
||||
|
||||
|
||||
class NoInstanceMethod:
|
||||
"""Descriptor to forbid that other descriptors can be looked up on an instance"""
|
||||
# ===================== Maintainers’ policy =====================
|
||||
|
||||
def __init__(self, descr, name=None):
|
||||
self.descr = descr
|
||||
self.name = name
|
||||
|
||||
def __set_name__(self, owner, name):
|
||||
self.name = name
|
||||
|
||||
def __get__(self, instance, owner):
|
||||
# enforce the instance cannot look up the attribute at all
|
||||
if instance is not None:
|
||||
raise AttributeError(f"{type(instance).__name__!r} has no attribute {self.name!r}")
|
||||
# invoke any descriptor we are wrapping
|
||||
return self.descr.__get__(instance, owner)
|
||||
ALLOWED_APPLIANCE_FIELD_TYPES: set[type] = {
|
||||
StrictInt,
|
||||
StrictStr,
|
||||
ByteSize,
|
||||
UUID4,
|
||||
UUID,
|
||||
AwareDatetime,
|
||||
_dt,
|
||||
bool,
|
||||
int,
|
||||
str,
|
||||
}
|
||||
ALLOWED_FEATURE_FIELD_TYPES: set[type] = {
|
||||
StrictInt,
|
||||
StrictStr,
|
||||
ByteSize,
|
||||
UUID4,
|
||||
UUID,
|
||||
AwareDatetime,
|
||||
_dt,
|
||||
bool,
|
||||
int,
|
||||
str,
|
||||
}
|
||||
|
||||
|
||||
def DABField(default: Any = PydanticUndefined, *, final: bool | None = _Unset, finalize_only: bool | None = _Unset, **kwargs):
|
||||
kwargs["final"] = final is not None
|
||||
kwargs["finalize_only"] = finalize_only is not None
|
||||
return Field(default, **kwargs)
|
||||
def _is_allowed_type(annotation: Any, allowed: set[type]) -> bool:
|
||||
origin = get_origin(annotation)
|
||||
if origin is _Annotated or (origin is not None and getattr(origin, "__name__", "") == "Annotated"):
|
||||
return _is_allowed_type(get_args(annotation)[0], allowed)
|
||||
if origin is None:
|
||||
return annotation in allowed
|
||||
if origin is Union:
|
||||
args = [a for a in get_args(annotation) if a is not type(None)]
|
||||
return all(_is_allowed_type(a, allowed) for a in args)
|
||||
args = get_args(annotation)
|
||||
if origin in (list, tuple, dict, set):
|
||||
return all(_is_allowed_type(a, allowed) for a in args if a is not None)
|
||||
return all(_is_allowed_type(a, allowed) for a in args if a is not None)
|
||||
|
||||
|
||||
T_BaseElement = TypeVar("T_BaseElement", bound="BaseElement")
|
||||
T_BaseElement_ConfigMethod_Arg: TypeAlias = dict[str, Any]
|
||||
T_BaseElement_ConfigMethod: TypeAlias = "classmethod[T_BaseElement, [T_BaseElement_ConfigMethod_Arg], T_BaseElement_ConfigMethod_Arg]"
|
||||
def _is_feature_annotation(annotation: Any) -> bool:
|
||||
origin = get_origin(annotation)
|
||||
if origin is Union:
|
||||
args = [a for a in get_args(annotation) if a is not type(None)]
|
||||
return any(_is_feature_annotation(a) for a in args)
|
||||
base = annotation
|
||||
try:
|
||||
return isinstance(base, type) and issubclass(base, BaseFeature) # type: ignore[name-defined]
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
class ConfigElement:
|
||||
def __init__(self) -> None:
|
||||
self.default_values_override_methods: dict[T_BaseElement_ConfigMethod, None] = {}
|
||||
self.main_build_method: dict[T_BaseElement_ConfigMethod, None] = {}
|
||||
|
||||
def __copy__(self) -> Self:
|
||||
# we cannot deepcopy because of classmethods, so we do a manual enhanced copy
|
||||
cls = self.__class__
|
||||
result = cls.__new__(cls)
|
||||
for k, v in self.__dict__.items():
|
||||
setattr(result, k, copy(v))
|
||||
return result
|
||||
def _annotation_equal(a: Any, b: Any) -> bool:
|
||||
oa, ob = get_origin(a), get_origin(b)
|
||||
if oa is None and ob is None:
|
||||
return a is b
|
||||
if oa != ob:
|
||||
return False
|
||||
aa, ab = get_args(a), get_args(b)
|
||||
if len(aa) != len(ab):
|
||||
return False
|
||||
return all(_annotation_equal(x, y) for x, y in zip(aa, ab))
|
||||
|
||||
|
||||
class IBaseElement(BaseModel, ABC):
|
||||
_config_element: ClassVar[ConfigElement] = ConfigElement()
|
||||
# ===================== Defaults mini-DSL (values only) =====================
|
||||
|
||||
|
||||
@dataclass_transform(kw_only_default=True, field_specifiers=(PydanticModelField,))
|
||||
class BaseElementMeta(ModelMetaclass, ABCMeta):
|
||||
def __new__(
|
||||
mcs,
|
||||
cls_name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any],
|
||||
__pydantic_generic_metadata__: PydanticGenericMetadata | None = None,
|
||||
__pydantic_reset_parent_namespace__: bool = True,
|
||||
_create_model_module: str | None = None,
|
||||
**kwargs: Any,
|
||||
) -> type:
|
||||
result = super().__new__(
|
||||
mcs,
|
||||
cls_name,
|
||||
bases,
|
||||
namespace,
|
||||
None, # __pydantic_generic_metadata__,
|
||||
True, # __pydantic_reset_parent_namespace__,
|
||||
None, # _create_model_module,
|
||||
**kwargs,
|
||||
)
|
||||
class _DefaultsSpec:
|
||||
__slots__ = ("mapping",)
|
||||
|
||||
assert issubclass(result, IBaseElement), "Only IBaseElement subclasses are supported"
|
||||
def __init__(self, mapping: Dict[str, Any]):
|
||||
self.mapping = mapping
|
||||
|
||||
# forcing all Fields to be frozen
|
||||
for _, field_val in result.model_fields.items():
|
||||
field_val.frozen = True
|
||||
|
||||
# copying/forwarding base classes default-configs
|
||||
if "_config_element" not in result.__dict__:
|
||||
assert result.__base__ is not None, "Only IBaseElement subclasses are supported"
|
||||
if issubclass(result.__base__, IBaseElement):
|
||||
result._config_element = copy(result.__base__._config_element)
|
||||
def defaults(**kwargs) -> _DefaultsSpec:
|
||||
"""
|
||||
Flat, values-only defaults.
|
||||
Supports '.' or '__' for nesting, e.g.:
|
||||
Defaults = defaults(
|
||||
template_short_name="apx",
|
||||
Network__enabled=True,
|
||||
Network__mtu=9000,
|
||||
)
|
||||
"""
|
||||
return _DefaultsSpec(kwargs)
|
||||
|
||||
|
||||
def _split_path(key: str) -> list[str]:
|
||||
return key.replace("__", ".").split(".")
|
||||
|
||||
|
||||
def _merge_path(target: Dict[str, Any], path: str, value: Any):
|
||||
parts = _split_path(path)
|
||||
node = target
|
||||
for i, p in enumerate(parts):
|
||||
if i == len(parts) - 1:
|
||||
node[p] = value
|
||||
else:
|
||||
node = node.setdefault(p, {}) # type: ignore[assignment]
|
||||
|
||||
|
||||
def _flatten_defaults_class(cls: type) -> Dict[str, Any]:
|
||||
out: Dict[str, Any] = {}
|
||||
for name, val in vars(cls).items():
|
||||
if name.startswith("__"):
|
||||
continue
|
||||
if isinstance(val, type):
|
||||
sub = _flatten_defaults_class(val)
|
||||
for k, v in sub.items():
|
||||
out[f"{name}.{k}"] = v
|
||||
else:
|
||||
out[name] = val
|
||||
return out
|
||||
|
||||
|
||||
def _deep_copy(v):
|
||||
try:
|
||||
return _copy.deepcopy(v)
|
||||
except Exception:
|
||||
if isinstance(v, dict):
|
||||
return {k: _deep_copy(val) for k, val in v.items()}
|
||||
if isinstance(v, list):
|
||||
return [_deep_copy(x) for x in v]
|
||||
if isinstance(v, tuple):
|
||||
return tuple(_deep_copy(x) for x in v)
|
||||
return v
|
||||
|
||||
|
||||
def _deep_merge(dst: Dict[str, Any], src: Dict[str, Any]):
|
||||
for k, v in src.items():
|
||||
if isinstance(v, dict):
|
||||
if isinstance(dst.get(k), dict):
|
||||
_deep_merge(dst[k], v) # type: ignore[index]
|
||||
else:
|
||||
result._config_element = ConfigElement()
|
||||
|
||||
# searching and storing current class default-configs
|
||||
for _, method in result.__dict__.items():
|
||||
if isinstance(method, classmethod):
|
||||
if hasattr(method, "default_values_override"):
|
||||
result._config_element.default_values_override_methods[method] = None
|
||||
|
||||
# todo: find a way to 'lock' and add restriction to a field after inheritance
|
||||
|
||||
return result
|
||||
dst[k] = _deep_copy(v)
|
||||
else:
|
||||
dst[k] = v
|
||||
|
||||
|
||||
class BaseElement(
|
||||
IBaseElement,
|
||||
ABC,
|
||||
validate_assignment=True,
|
||||
# revalidate_instances="subclass-instances", # pydantic issue #10681
|
||||
validate_default=True,
|
||||
extra="forbid",
|
||||
metaclass=BaseElementMeta,
|
||||
):
|
||||
class Config:
|
||||
ignored_types = (NoInstanceMethod,)
|
||||
# ===================== Core base classes =====================
|
||||
|
||||
template_id: Annotated[UUID4, DABField(..., repr=True)]
|
||||
template_short_name: Annotated[
|
||||
StrictStr, constr(strip_whitespace=True, to_lower=True, strict=True, max_length=16), DABField(..., repr=True)
|
||||
]
|
||||
template_long_name: Annotated[StrictStr | None, DABField()]
|
||||
template_description: Annotated[StrictStr | None, DABField()]
|
||||
_saved_default_value: ClassVar[dict[str, Any]]
|
||||
T_Feature = TypeVar("T_Feature", bound="BaseFeature")
|
||||
|
||||
|
||||
class BaseElement(BaseModel):
|
||||
"""
|
||||
Base for Appliance/Feature:
|
||||
- Declarative Defaults (inner class or mapping), values only
|
||||
- Allowed-type enforcement at class creation (schema-time)
|
||||
- Frozen instances
|
||||
"""
|
||||
|
||||
model_config = ConfigDict(
|
||||
frozen=True,
|
||||
extra="forbid",
|
||||
validate_assignment=True,
|
||||
protected_namespaces=(), # allow names like __defaults_map__
|
||||
)
|
||||
|
||||
__defaults_map__: Dict[str, Any] = {}
|
||||
|
||||
def __init_subclass__(cls, **kwargs):
|
||||
super().__init_subclass__(**kwargs)
|
||||
|
||||
# 1) Collect Defaults (values) along MRO (base -> subclass)
|
||||
merged: Dict[str, Any] = {}
|
||||
for base in reversed(cls.mro()):
|
||||
spec = base.__dict__.get("Defaults")
|
||||
if spec is None:
|
||||
continue
|
||||
if isinstance(spec, _DefaultsSpec):
|
||||
flat = spec.mapping
|
||||
elif isinstance(spec, type):
|
||||
flat = _flatten_defaults_class(spec)
|
||||
else:
|
||||
continue
|
||||
for k, v in flat.items():
|
||||
_merge_path(merged, k, v)
|
||||
cls.__defaults_map__ = merged # keep 'enabled' here; strip later during validation
|
||||
|
||||
# 2) Schema-time allowed types for non-feature fields
|
||||
is_appl = any(b.__name__ == "BaseAppliance" for b in cls.mro())
|
||||
is_feat = any(b.__name__ == "BaseFeature" for b in cls.mro())
|
||||
if is_appl or is_feat:
|
||||
allowed = ALLOWED_APPLIANCE_FIELD_TYPES if is_appl else ALLOWED_FEATURE_FIELD_TYPES
|
||||
for name, f in getattr(cls, "model_fields", {}).items():
|
||||
ann = f.annotation
|
||||
if ann is None:
|
||||
continue
|
||||
if _is_feature_annotation(ann):
|
||||
continue # feature fields checked in ApplianceMeta
|
||||
if not _is_allowed_type(ann, allowed):
|
||||
kind = "Appliance" if is_appl else "Feature"
|
||||
raise TypeError(f"{kind} field '{cls.__name__}.{name}' has disallowed type: {ann}")
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def __default_values_override_hook__(cls, values: T_BaseElement_ConfigMethod_Arg) -> T_BaseElement_ConfigMethod_Arg:
|
||||
# extracting default values that were set in model fields
|
||||
cls._saved_default_value = {}
|
||||
for field_key, field_val in cls.model_fields.items():
|
||||
assert field_val.annotation is not None, "all fields must have annotation"
|
||||
assert not runtype_issubclass(field_val.annotation, BaseFeature), "Features can only be contained within an Appliance"
|
||||
if field_val.default != PydanticUndefined:
|
||||
cls._saved_default_value[field_key] = deepcopy(field_val.default)
|
||||
for method, _ in cls._config_element.default_values_override_methods.items():
|
||||
method.__func__(cls, cls._saved_default_value)
|
||||
def _apply_defaults_then_input(cls, values: Any):
|
||||
if not isinstance(values, dict):
|
||||
return values
|
||||
|
||||
cls._default_values_override_hook__input_apply__(values)
|
||||
# 1) seed from declared field defaults
|
||||
merged: Dict[str, Any] = {}
|
||||
for name, f in cls.model_fields.items():
|
||||
if f.default is not None:
|
||||
merged[name] = f.default
|
||||
elif getattr(f, "default_factory", None) is not None:
|
||||
merged[name] = f.default_factory() # type: ignore[misc]
|
||||
|
||||
return cls._saved_default_value
|
||||
# 2) merge class Defaults (values only), then 3) user values
|
||||
_deep_merge(merged, cls.__defaults_map__)
|
||||
_deep_merge(merged, values)
|
||||
|
||||
@NoInstanceMethod
|
||||
# 3) per-feature handling
|
||||
for name, f in cls.model_fields.items():
|
||||
ann = f.annotation
|
||||
args_origin = get_origin(ann)
|
||||
base_ann = [a for a in get_args(ann) if a is not type(None)][0] if args_origin is Union else ann
|
||||
try:
|
||||
if isinstance(base_ann, type) and issubclass(base_ann, BaseFeature):
|
||||
# user override flag?
|
||||
user_block = values.get(name) if isinstance(values, dict) else None
|
||||
user_enabled = None
|
||||
if isinstance(user_block, dict) and "enabled" in user_block:
|
||||
user_enabled = bool(user_block.get("enabled"))
|
||||
|
||||
# class default flag?
|
||||
class_enabled = None
|
||||
defaults_block = cls.__defaults_map__.get(name) if isinstance(cls.__defaults_map__, dict) else None
|
||||
if isinstance(defaults_block, dict) and "enabled" in defaults_block:
|
||||
class_enabled = bool(defaults_block.get("enabled"))
|
||||
|
||||
v = merged.get(name)
|
||||
|
||||
# fallback: enabled still present in merged?
|
||||
merged_enabled = None
|
||||
if isinstance(v, dict) and "enabled" in v:
|
||||
merged_enabled = bool(v.get("enabled"))
|
||||
|
||||
# precedence: user > class > merged
|
||||
final_enabled = (
|
||||
user_enabled if user_enabled is not None else (class_enabled if class_enabled is not None else merged_enabled)
|
||||
)
|
||||
|
||||
# strip enabled key from payload (feature models forbid extras)
|
||||
if isinstance(v, dict) and "enabled" in v:
|
||||
v = dict(v)
|
||||
v.pop("enabled", None)
|
||||
merged[name] = v
|
||||
|
||||
if final_enabled is False:
|
||||
merged[name] = None
|
||||
continue
|
||||
|
||||
# Merge feature defaults to ensure required fields are present
|
||||
feat_defaults = getattr(base_ann, "__defaults_map__", {}) or {}
|
||||
if final_enabled is True and v is None:
|
||||
merged[name] = dict(feat_defaults)
|
||||
elif isinstance(v, dict):
|
||||
cfg: Dict[str, Any] = {}
|
||||
_deep_merge(cfg, feat_defaults)
|
||||
_deep_merge(cfg, v)
|
||||
merged[name] = cfg
|
||||
# final_enabled is None and v is None → leave as None
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return merged
|
||||
|
||||
|
||||
class BaseFeature(BaseElement):
|
||||
# Make required-at-runtime fields optional here to avoid early ValidationError
|
||||
# when someone does `Extra: Optional[Extra] = Extra()` during class body execution.
|
||||
template_id: Optional[UUID4] = None
|
||||
template_short_name: Optional[StrictStr] = None
|
||||
template_long_name: Optional[StrictStr] = None
|
||||
template_description: Optional[StrictStr] = None
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def _default_values_override_hook__input_apply__(
|
||||
cls,
|
||||
values: T_BaseElement_ConfigMethod_Arg,
|
||||
):
|
||||
# applying user-defined values
|
||||
for attr_key, attr_val in values.items():
|
||||
assert attr_key in cls.model_fields, f"given feature attribute does not exist ({attr_key})"
|
||||
cls._saved_default_value[attr_key] = attr_val
|
||||
def _strip_enabled_flag(cls, v: Any):
|
||||
# Features never accept `enabled`; strip if present from Defaults or user input.
|
||||
if isinstance(v, dict) and "enabled" in v:
|
||||
v = dict(v)
|
||||
v.pop("enabled", None)
|
||||
return v
|
||||
|
||||
|
||||
class BaseFeature(BaseElement, ABC): ...
|
||||
class ApplianceMeta(ModelMetaclass):
|
||||
def __new__(mcls, name, bases, namespace, **kwargs):
|
||||
# Skip special handling for the abstract BaseAppliance itself
|
||||
if name == "BaseAppliance":
|
||||
return super().__new__(mcls, name, bases, namespace, **kwargs)
|
||||
|
||||
# 1) Auto-inject feature fields from THIS class's inner `Features`
|
||||
declared_here: Dict[str, type[BaseFeature]] = {}
|
||||
inner = namespace.get("Features")
|
||||
if isinstance(inner, type):
|
||||
for fname, fval in vars(inner).items():
|
||||
if fname.startswith("__"):
|
||||
continue
|
||||
if isinstance(fval, type) and issubclass(fval, BaseFeature):
|
||||
declared_here[fname] = fval
|
||||
namespace.setdefault("__annotations__", {})
|
||||
if fname not in namespace["__annotations__"]:
|
||||
namespace["__annotations__"][fname] = Optional[fval] # type: ignore[index]
|
||||
namespace[fname] = None # default None; Defaults can enable/configure
|
||||
|
||||
# PRE-CHECK: feature fields added in this class must be declared in this class's inner Features
|
||||
parent = next((b for b in bases if isinstance(b, ApplianceMeta)), None)
|
||||
parent_features_early = getattr(parent, "__feature_fields__", {}) if parent else {}
|
||||
annotations = namespace.get("__annotations__", {})
|
||||
for field_name, ann_type in list(annotations.items()):
|
||||
origin = get_origin(ann_type)
|
||||
base_ann = [a for a in get_args(ann_type) if a is not type(None)][0] if origin is Union else ann_type
|
||||
try:
|
||||
is_feat = isinstance(base_ann, type) and issubclass(base_ann, BaseFeature)
|
||||
except Exception:
|
||||
is_feat = False
|
||||
if is_feat:
|
||||
if field_name not in parent_features_early and field_name not in declared_here:
|
||||
raise TypeError(f"{name}: new feature '{field_name}' must be declared in this class's inner `Features`.")
|
||||
|
||||
# 2) Create the class via Pydantic's metaclass
|
||||
cls = super().__new__(mcls, name, bases, namespace, **kwargs)
|
||||
|
||||
# 3) Discover current feature fields (after Pydantic processed annotations)
|
||||
features_now: Dict[str, type[BaseFeature]] = {}
|
||||
for field_name, f in cls.model_fields.items():
|
||||
ann = f.annotation
|
||||
args_origin = get_origin(ann)
|
||||
base_ann = [a for a in get_args(ann) if a is not type(None)][0] if args_origin is Union else ann
|
||||
try:
|
||||
if issubclass(base_ann, BaseFeature): # type: ignore[arg-type]
|
||||
features_now[field_name] = base_ann # type: ignore[assignment]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 4) Enforce parent schema monotonicity (no removal/type changes), allow additions
|
||||
parent_cls = next((b for b in cls.__mro__[1:] if isinstance(b, ApplianceMeta)), None)
|
||||
parent_fields = dict(getattr(parent_cls, "model_fields", {})) if parent_cls else {}
|
||||
parent_features = getattr(parent_cls, "__feature_fields__", {}) if parent_cls else {}
|
||||
|
||||
# PARENT FIELDS: present with identical annotation + same constraints (skip feature fields)
|
||||
for pname, pfield in parent_fields.items():
|
||||
if pname in parent_features:
|
||||
continue
|
||||
if pname not in cls.model_fields:
|
||||
raise TypeError(f"{cls.__name__}: removing parent field '{pname}' is forbidden.")
|
||||
child_field = cls.model_fields[pname]
|
||||
if (not _annotation_equal(child_field.annotation, pfield.annotation)) or (
|
||||
tuple(child_field.metadata) != tuple(pfield.metadata)
|
||||
):
|
||||
raise TypeError(f"{cls.__name__}: changing type/constraints of parent field '{pname}' is forbidden.")
|
||||
|
||||
# PARENT FEATURES: present with identical type
|
||||
for fname, ftype in parent_features.items():
|
||||
if fname not in features_now:
|
||||
raise TypeError(f"{cls.__name__}: removing parent feature '{fname}' is forbidden.")
|
||||
if features_now[fname] is not ftype:
|
||||
raise TypeError(f"{cls.__name__}: retargeting feature '{fname}' is forbidden.")
|
||||
|
||||
# NEW FEATURES must be declared in THIS class's inner Features
|
||||
new_feature_names = set(features_now) - set(parent_features)
|
||||
if new_feature_names:
|
||||
if not declared_here:
|
||||
raise TypeError(
|
||||
f"{cls.__name__}: new features detected {sorted(new_feature_names)} "
|
||||
f"but none declared in this class's inner `Features`."
|
||||
)
|
||||
undeclared = [n for n in new_feature_names if n not in declared_here]
|
||||
if undeclared:
|
||||
raise TypeError(f"{cls.__name__}: new features {sorted(undeclared)} must be declared in this class's `Features`.")
|
||||
|
||||
# 5) Publish the canonical sets
|
||||
if parent_cls and getattr(parent_cls, "__declared_features__", {}):
|
||||
merged_decl = dict(parent_cls.__declared_features__)
|
||||
merged_decl.update(declared_here)
|
||||
cls.__declared_features__ = merged_decl
|
||||
else:
|
||||
cls.__declared_features__ = declared_here
|
||||
cls.__feature_fields__ = features_now
|
||||
|
||||
return cls
|
||||
|
||||
|
||||
T_Feature = TypeVar("T_Feature", bound=BaseFeature)
|
||||
class BaseAppliance(BaseElement, Generic[T_Feature], metaclass=ApplianceMeta):
|
||||
"""
|
||||
Feature binding is implicit via inner `class Features:`.
|
||||
Subclasses may add fields and add features freely.
|
||||
They may NOT change types of parent fields/features or remove them.
|
||||
"""
|
||||
|
||||
__declared_features__: Dict[str, type[BaseFeature]] = {}
|
||||
__feature_fields__: Dict[str, type[BaseFeature]] = {}
|
||||
|
||||
class BaseAppliance(Generic[T_Feature], BaseElement, ABC):
|
||||
cpu_cnt: Annotated[StrictInt, DABField(1, gt=0)]
|
||||
ram_size: Annotated[ByteSize, DABField(256, gt=128)]
|
||||
swap_size: Annotated[ByteSize, DABField(200, ge=0)]
|
||||
# core appliance schema
|
||||
template_id: UUID4
|
||||
template_short_name: StrictStr
|
||||
template_long_name: Optional[StrictStr] = None
|
||||
template_description: Optional[StrictStr] = None
|
||||
|
||||
rootfs_size: Annotated[ByteSize, DABField(2048, ge=2048)]
|
||||
cpu_cnt: StrictInt = Field(1, gt=0)
|
||||
ram_size: ByteSize = Field(256, gt=128)
|
||||
swap_size: ByteSize = Field(200, ge=0)
|
||||
rootfs_size: ByteSize = Field(2048, ge=2048)
|
||||
|
||||
dabinst_id: Annotated[UUID4, DABField(uuid4(), repr=True)]
|
||||
dabinst_short_name: Annotated[
|
||||
StrictStr, constr(strip_whitespace=True, to_lower=True, strict=True, max_length=16), DABField(..., repr=True)
|
||||
]
|
||||
dabinst_long_name: Annotated[StrictStr | None, DABField("")]
|
||||
dabinst_description: Annotated[StrictStr | None, DABField("")]
|
||||
dabinst_creationdate: Annotated[AwareDatetime | None, DABField(datetime.now(tz=pytz.utc))]
|
||||
|
||||
|
||||
def default_values_override(func: T_BaseElement_ConfigMethod) -> T_BaseElement_ConfigMethod:
|
||||
func = ensure_classmethod_based_on_signature(func)
|
||||
setattr(func, "default_values_override", lambda: True)
|
||||
return func
|
||||
dabinst_id: UUID4 = Field(default_factory=uuid4)
|
||||
dabinst_short_name: StrictStr
|
||||
dabinst_long_name: Optional[StrictStr] = ""
|
||||
dabinst_description: Optional[StrictStr] = ""
|
||||
dabinst_creationdate: Optional[AwareDatetime] = Field(default_factory=lambda: _dt.now(tz=pytz.utc))
|
||||
|
||||
@@ -1,37 +0,0 @@
|
||||
from pydantic import BaseModel, SerializeAsAny
|
||||
|
||||
|
||||
class commonbase(
|
||||
BaseModel,
|
||||
revalidate_instances="subclass-instances", # toogle to generate error
|
||||
): ...
|
||||
|
||||
|
||||
class basechild(commonbase):
|
||||
test_val: int = 1
|
||||
|
||||
|
||||
class derivedchild(basechild):
|
||||
test_val2: int = 2
|
||||
|
||||
|
||||
class container(commonbase):
|
||||
|
||||
ct_child_1: dict[str, basechild] = {}
|
||||
ct_child_2: SerializeAsAny[dict[str, basechild]] = {}
|
||||
ct_child_3: dict[str, SerializeAsAny[basechild]] = {}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_val = container(
|
||||
ct_child_1={"test1": derivedchild()},
|
||||
ct_child_2={"test2": derivedchild()},
|
||||
ct_child_3={"test3": derivedchild()},
|
||||
)
|
||||
|
||||
print(test_val.model_dump_json(indent=1))
|
||||
|
||||
print(test_val.model_dump())
|
||||
assert "test_val2" not in test_val.model_dump()["ct_child_1"]["test1"]
|
||||
assert "test_val2" in test_val.model_dump()["ct_child_2"]["test2"]
|
||||
assert "test_val2" in test_val.model_dump()["ct_child_3"]["test3"]
|
||||
@@ -10,217 +10,352 @@ import unittest
|
||||
from os import chdir
|
||||
from pathlib import Path
|
||||
|
||||
from pydantic import StrictInt
|
||||
|
||||
print(__name__)
|
||||
print(__package__)
|
||||
|
||||
# print(__name__)
|
||||
# print(__package__)
|
||||
|
||||
from src import dabmodel
|
||||
from typing import Annotated, Optional
|
||||
from src import dabmodel as dm
|
||||
from typing import Annotated, Any, Optional
|
||||
from uuid import uuid4
|
||||
from pydantic import UUID4
|
||||
import json
|
||||
from uuid import UUID
|
||||
from datetime import datetime
|
||||
|
||||
testdir_path = Path(__file__).parent.resolve()
|
||||
chdir(testdir_path.parent.resolve())
|
||||
|
||||
"""
|
||||
class UUIDEncoder(json.JSONEncoder):
|
||||
def default(self, obj):
|
||||
if isinstance(obj, UUID):
|
||||
# if the obj is uuid, we simply return the value of uuid
|
||||
return obj.hex
|
||||
elif isinstance(obj, datetime):
|
||||
return str(obj)
|
||||
return json.JSONEncoder.default(self, obj)
|
||||
"""
|
||||
from typing import Optional, Union
|
||||
|
||||
class MyAppliance(dabmodel.BaseAppliance):
|
||||
app_specifi_integer_arg: Annotated[StrictInt | None, dabmodel.DABField(42)]
|
||||
|
||||
class _MyFeature(dabmodel.BaseFeature):
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Feature 1")
|
||||
values["template_id"] = "421d61cb-e364-46d8-9b77-ec439f1fa666"
|
||||
values["template_short_name"] = "my-feature-1"
|
||||
values["template_long_name"] = "My feature template 1 !!"
|
||||
values["template_description"] = """A very nice FEature 1"""
|
||||
|
||||
MyFeature: Annotated[Optional[_MyFeature], dabmodel.DABField(_MyFeature())]
|
||||
|
||||
class _MyFeature2(dabmodel.BaseFeature):
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Feature 2")
|
||||
values["template_id"] = "421d61cb-e364-46d8-9b77-ec439f1fa666"
|
||||
values["template_short_name"] = "my-feature-2"
|
||||
values["template_long_name"] = "My feature template 2 !!"
|
||||
values["template_description"] = """A very nice FEature 2"""
|
||||
|
||||
MyFeature2: Annotated[Optional[_MyFeature2], dabmodel.DABField(_MyFeature2())]
|
||||
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Appliance 1")
|
||||
print(f"!!!! {values['rootfs_size']}")
|
||||
values["template_id"] = "421d61cb-e364-46d8-9b64-ec439f1faae8"
|
||||
values["template_short_name"] = "my-app- tem 1"
|
||||
values["template_long_name"] = "My appliance template 1 !!"
|
||||
values["template_description"] = """A very nice Appliance 1"""
|
||||
values["ram_size"] = 1024
|
||||
# cls.add_feature(cls.MyFeature())
|
||||
# cls.add_feature(cls.MyFeature2())
|
||||
from pydantic import ValidationError, StrictInt, StrictStr, Field
|
||||
|
||||
|
||||
class MyAppliance2(MyAppliance):
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Appliance 2")
|
||||
print(f"!!!! {values['template_id']}")
|
||||
values["template_id"] = "421d61cb-e664-46d8-9b64-ec439f1fafff"
|
||||
values["template_short_name"] = "my-app- tem 2"
|
||||
values["template_long_name"] = "My appliance template 2 !!"
|
||||
values["template_description"] = """A very nice Appliance 2"""
|
||||
values["MyFeature"] = None
|
||||
# cls.del_feature(MyAppliance.MyFeature)
|
||||
# values["features"]["MyFeature2"].template_description = """Override feature desc"""
|
||||
# ---------- Base models to reuse in many tests ----------
|
||||
|
||||
|
||||
class MyAppliance3(dabmodel.BaseAppliance):
|
||||
class Router(dm.BaseAppliance):
|
||||
class Features:
|
||||
class Network(dm.BaseFeature):
|
||||
mtu: StrictInt = 1500
|
||||
|
||||
class _MyFeature6(MyAppliance._MyFeature):
|
||||
# testtt: Annotated[MyAppliance.MyFeature, Field(MyAppliance.MyFeature())] # error case
|
||||
class Defaults:
|
||||
template_id = "00000000-0000-4000-8000-000000000001"
|
||||
template_short_name = "net"
|
||||
|
||||
test_integer: Annotated[int, dabmodel.DABField(200, ge=0)]
|
||||
test_integer1234: Annotated[int, dabmodel.DABField(200, ge=0)]
|
||||
# test_integer: int = dabmodel.DABField(200, ge=0)
|
||||
class Defaults:
|
||||
template_id = "11111111-1111-4111-8111-111111111111"
|
||||
template_short_name = "router"
|
||||
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Feature 1 (modified)")
|
||||
values["template_id"] = "421d61cb-e364-46d8-9b77-ec439f1fa778"
|
||||
values["template_short_name"] = "my-feature-1-bis"
|
||||
values["test_integer"] = 666
|
||||
|
||||
MyFeature6: Annotated[Optional[_MyFeature6], dabmodel.DABField(_MyFeature6())]
|
||||
|
||||
class _MyFeature7(dabmodel.BaseFeature):
|
||||
# testtt: Annotated[MyAppliance.MyFeature, Field(MyAppliance.MyFeature())] # error case
|
||||
test_integer_2: Annotated[int, dabmodel.DABField(759, ge=0)]
|
||||
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Feature 7")
|
||||
values["template_id"] = "421d61cb-e364-46d8-ac55-ec439f1fa778"
|
||||
values["template_short_name"] = "my-feature-7"
|
||||
values["template_long_name"] = "My appliance template 7 !!"
|
||||
values["template_description"] = """A very nice Appliance 7"""
|
||||
values["test_integer_2"] = 3844
|
||||
|
||||
MyFeature7: Annotated[Optional[_MyFeature7], dabmodel.DABField(_MyFeature7())]
|
||||
|
||||
# testtt: Annotated[MyAppliance.MyFeature, Field(MyAppliance.MyFeature())] # error case
|
||||
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Appliance 3")
|
||||
values["template_id"] = "421d61cb-e364-46d8-9b64-ec439f1faaaa"
|
||||
values["template_short_name"] = "my-app- tem 3"
|
||||
values["template_long_name"] = "My appliance template 3 !!"
|
||||
values["template_description"] = """A very nice Appliance 3"""
|
||||
values["ram_size"] = 3076
|
||||
print("CREATE FEATURE")
|
||||
# cls.add_feature(cls.MyFeature6())
|
||||
# cls.add_feature(cls.MyFeature7())
|
||||
print("!!! CONFIG Appliance 3 DONE")
|
||||
class Network:
|
||||
enabled = True
|
||||
mtu = 9000
|
||||
|
||||
|
||||
class MyAppliance4(MyAppliance):
|
||||
custom_uuid: Annotated[UUID4, dabmodel.DABField(uuid4(), repr=True)]
|
||||
class RouterPlus(Router):
|
||||
model_name: StrictStr = "plus"
|
||||
|
||||
class _MyFeature8(dabmodel.BaseFeature):
|
||||
# testtt: Annotated[MyAppliance._MyFeature, Field(MyAppliance._MyFeature())] # error case (nested feature)
|
||||
class Features:
|
||||
class Firewall(dm.BaseFeature):
|
||||
enabled_by_default: bool = True
|
||||
|
||||
# toto: fix that
|
||||
# test_integer_10_bad: Annotated[int, dabmodel.DABField(3189, ge=0, toto="tata")] # error case (extra field), /!\ this one is not working anymore :(
|
||||
class Defaults:
|
||||
template_id = "22222222-2222-4222-8222-222222222222"
|
||||
template_short_name = "fw"
|
||||
|
||||
test_integer_10: Annotated[int, dabmodel.DABField(3189, ge=0)]
|
||||
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Feature 8")
|
||||
values["template_id"] = "421d61cb-e364-46d8-ac55-ec4398888778"
|
||||
values["template_short_name"] = "my-feature-8"
|
||||
values["template_long_name"] = "My appliance template 8 !!"
|
||||
values["template_description"] = """A very nice Appliance 8"""
|
||||
values["test_integer_10"] = 951753
|
||||
# values["tete"] = 1 # error case (extra field in feature)
|
||||
|
||||
MyFeature8: Annotated[Optional[_MyFeature8], dabmodel.DABField(_MyFeature8())]
|
||||
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Appliance 4")
|
||||
values["template_id"] = "421d1234-e364-46d8-9b64-ec439f1faaaa"
|
||||
values["template_short_name"] = "my-app-tem 4"
|
||||
values["template_long_name"] = "My appliance template 4 !!"
|
||||
values["template_description"] = """A very nice Appliance 4"""
|
||||
values["ram_size"] = 954
|
||||
print("CREATE FEATURE")
|
||||
# cls.add_feature(cls.MyFeature8())
|
||||
print("!!! CONFIG Appliance 4 DONE")
|
||||
class Defaults:
|
||||
class Firewall:
|
||||
enabled = True
|
||||
enabled_by_default = True
|
||||
|
||||
|
||||
class TestModel(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
chdir(testdir_path.parent.resolve())
|
||||
class RouterLite(RouterPlus):
|
||||
class Defaults:
|
||||
template_short_name = "router-lite"
|
||||
model_name = "lite"
|
||||
|
||||
def test_version(self):
|
||||
self.assertNotEqual(dabmodel.__version__, "?.?.?")
|
||||
class Network:
|
||||
enabled = False
|
||||
|
||||
def test_model(self):
|
||||
class Firewall:
|
||||
enabled = True
|
||||
enabled_by_default = False
|
||||
|
||||
feature1 = MyAppliance._MyFeature()
|
||||
print(feature1)
|
||||
print(MyAppliance._MyFeature)
|
||||
print(MyAppliance._MyFeature.__name__)
|
||||
print(MyAppliance._MyFeature.__class__)
|
||||
print("==")
|
||||
print(feature1.model_dump_json(indent=1))
|
||||
|
||||
app = MyAppliance(dabinst_short_name="my-app-1", app_specifi_integer_arg=123)
|
||||
app2 = MyAppliance2(dabinst_short_name="my-app-2", app_specifi_integer_arg=654)
|
||||
app3 = MyAppliance3(dabinst_short_name="my-app-3", template_description="FORCED")
|
||||
# ---------- happy path: defaults & merges ----------
|
||||
|
||||
print(app.model_dump_json(indent=1))
|
||||
print(app2.model_dump_json(indent=1))
|
||||
print(app3.model_dump_json(indent=1))
|
||||
|
||||
app3 = MyAppliance3(dabinst_short_name="my-app-3", template_description="FORCED")
|
||||
tmp_json = app3.dict()
|
||||
tmp_json["MyFeature7"]["test_integer_2"] = 123
|
||||
print(tmp_json)
|
||||
recreated_obj = MyAppliance3.model_validate_json(json.dumps(tmp_json, cls=dabmodel.DABJSONEncoder))
|
||||
print(recreated_obj)
|
||||
print(recreated_obj.model_dump_json(indent=1))
|
||||
class TestDefaultsAndMerges(unittest.TestCase):
|
||||
def test_root_defaults_and_feature(self):
|
||||
a = Router(dabinst_short_name="r1")
|
||||
self.assertEqual(a.template_short_name, "router")
|
||||
self.assertIsNotNone(a.Network)
|
||||
self.assertEqual(a.Network.mtu, 9000)
|
||||
# required feature defaults from Feature.Defaults are present
|
||||
self.assertIsNotNone(a.Network.template_id)
|
||||
self.assertEqual(str(a.Network.template_id), "00000000-0000-4000-8000-000000000001")
|
||||
self.assertEqual(a.Network.template_short_name, "net")
|
||||
|
||||
app4 = MyAppliance4(dabinst_short_name="my-app-4", template_description="FORCED2")
|
||||
tmp_json = app4.dict()
|
||||
tmp_json["MyFeature"]["template_description"] = "blablabla"
|
||||
tmp_json["MyFeature2"]["template_description"] = "blablabla2"
|
||||
print(tmp_json)
|
||||
recreated_obj = MyAppliance4.model_validate_json(json.dumps(tmp_json, cls=dabmodel.DABJSONEncoder))
|
||||
print(recreated_obj)
|
||||
print(recreated_obj.model_dump_json(indent=1))
|
||||
def test_subclass_adds_field_and_feature(self):
|
||||
a = RouterPlus(dabinst_short_name="rp1")
|
||||
self.assertEqual(a.model_name, "plus")
|
||||
self.assertIsNotNone(a.Network)
|
||||
self.assertIsNotNone(a.Firewall)
|
||||
self.assertTrue(a.Firewall.enabled_by_default)
|
||||
|
||||
# tmp_json["non-existing"] = "test" # error case
|
||||
# tmp_json["non-existing"] = "test" # error case
|
||||
# tmp_json["MyFeature"]["132"] = "test" # error case
|
||||
def test_subclass_value_overrides(self):
|
||||
a = RouterLite(dabinst_short_name="rl1")
|
||||
self.assertEqual(a.template_short_name, "router-lite")
|
||||
self.assertEqual(a.model_name, "lite")
|
||||
self.assertIsNone(a.Network) # disabled by subclass default
|
||||
self.assertIsNotNone(a.Firewall)
|
||||
self.assertFalse(a.Firewall.enabled_by_default)
|
||||
|
||||
recreated_obj = MyAppliance4.model_validate_json(json.dumps(tmp_json, cls=dabmodel.DABJSONEncoder))
|
||||
def test_user_input_overrides_enabled_and_values(self):
|
||||
# user value merge (no 'enabled'): stays enabled from class default
|
||||
a = Router(dabinst_short_name="u1", Network={"mtu": 4096})
|
||||
self.assertEqual(a.Network.mtu, 4096)
|
||||
|
||||
# app3.add_feature(MyAppliance.MyFeature()) # error case (add_feature not callable from instance)
|
||||
# user disables feature
|
||||
b = RouterPlus(dabinst_short_name="u2", Firewall={"enabled": False})
|
||||
self.assertIsNone(b.Firewall)
|
||||
|
||||
for name in globals().keys():
|
||||
print(name)
|
||||
# user enables & overrides a field
|
||||
c = RouterPlus(dabinst_short_name="u3", Firewall={"enabled": True, "enabled_by_default": False})
|
||||
self.assertIsNotNone(c.Firewall)
|
||||
self.assertFalse(c.Firewall.enabled_by_default)
|
||||
|
||||
def test_enabling_without_config_pulls_feature_defaults(self):
|
||||
class R(Router):
|
||||
class Defaults:
|
||||
class Network:
|
||||
enabled = True # no extra values here
|
||||
|
||||
r = R(dabinst_short_name="x")
|
||||
self.assertIsNotNone(r.Network)
|
||||
# pulled from Network.Defaults (template fields & mtu default 1500 -> overridden by Router.Defaults to 9000)
|
||||
self.assertEqual(r.Network.mtu, 9000)
|
||||
self.assertEqual(r.Network.template_short_name, "net")
|
||||
|
||||
|
||||
# ---------- invariants: schema monotonicity & declarations ----------
|
||||
|
||||
|
||||
class TestInheritanceRules(unittest.TestCase):
|
||||
def test_cannot_remove_parent_feature(self):
|
||||
def make_bad():
|
||||
class Bad(RouterPlus):
|
||||
class Features:
|
||||
class Firewall(dm.BaseFeature): # re-declare only one, drop 'Network'
|
||||
enabled_by_default: bool = True
|
||||
|
||||
return Bad
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
_ = make_bad()
|
||||
|
||||
def test_cannot_retarget_parent_feature_type(self):
|
||||
def make_bad():
|
||||
class Router2(Router):
|
||||
class Features:
|
||||
class Network(dm.BaseFeature): # Trying to retarget 'Network' to a new type
|
||||
mtu: StrictInt = 1600
|
||||
|
||||
return Router2
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
_ = make_bad()
|
||||
|
||||
def test_cannot_change_parent_field_type_or_constraints(self):
|
||||
# Type change (StrictInt -> int)
|
||||
def bad_type():
|
||||
class Bad(Router):
|
||||
cpu_cnt: int = 2 # loses StrictInt constraint
|
||||
|
||||
return Bad
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
_ = bad_type()
|
||||
|
||||
# Constraint change (gt=0 -> ge=0)
|
||||
def bad_constraint():
|
||||
class Bad2(Router):
|
||||
cpu_cnt: StrictInt = Field(1, ge=0)
|
||||
|
||||
return Bad2
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
_ = bad_constraint()
|
||||
|
||||
def test_new_feature_must_be_declared_in_inner_Features(self):
|
||||
# Add a new feature field but forget to declare it in inner Features
|
||||
def bad_undeclared():
|
||||
class Bad(Router):
|
||||
class Features:
|
||||
class Network(dm.BaseFeature):
|
||||
mtu: StrictInt = 1500
|
||||
|
||||
class Extra(dm.BaseFeature):
|
||||
foo: StrictInt = 1
|
||||
|
||||
Extra: Optional[Extra] = Extra()
|
||||
|
||||
return Bad
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
_ = bad_undeclared()
|
||||
|
||||
def test_subclass_can_add_new_feature_when_declared(self):
|
||||
class Good(Router):
|
||||
class Features:
|
||||
class QoS(dm.BaseFeature):
|
||||
priority: StrictInt = 5
|
||||
|
||||
class Defaults:
|
||||
template_id = "33333333-3333-4333-8333-333333333333"
|
||||
template_short_name = "qos"
|
||||
|
||||
class Defaults:
|
||||
class QoS:
|
||||
enabled = True
|
||||
priority = 7
|
||||
|
||||
g = Good(dabinst_short_name="g1")
|
||||
self.assertIsNotNone(g.QoS)
|
||||
self.assertEqual(g.QoS.priority, 7)
|
||||
self.assertEqual(str(g.QoS.template_id), "33333333-3333-4333-8333-333333333333")
|
||||
|
||||
|
||||
# ---------- type allowlist enforcement ----------
|
||||
|
||||
|
||||
class TestAllowedTypes(unittest.TestCase):
|
||||
def test_disallow_bad_appliance_field_type(self):
|
||||
def bad_field():
|
||||
class Bad(Router):
|
||||
bad: float = 1.2 # float not in allowed set
|
||||
|
||||
return Bad
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
_ = bad_field()
|
||||
|
||||
def test_disallow_bad_feature_field_type(self):
|
||||
def bad_feature():
|
||||
class Bad(Router):
|
||||
class Features:
|
||||
class BadFeat(dm.BaseFeature):
|
||||
bad: float = 1.2
|
||||
|
||||
return Bad
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
_ = bad_feature()
|
||||
|
||||
|
||||
# ---------- extras=forbid & immutability ----------
|
||||
|
||||
|
||||
class TestValidationAndImmutability(unittest.TestCase):
|
||||
def test_appliance_extra_forbidden(self):
|
||||
with self.assertRaises(ValidationError):
|
||||
_ = Router(dabinst_short_name="r", unknown=1)
|
||||
|
||||
def test_feature_extra_forbidden(self):
|
||||
# 'enabled' is stripped, but unknown keys should error
|
||||
with self.assertRaises(ValidationError):
|
||||
_ = Router(dabinst_short_name="r", Network={"enabled": True, "unknown": 42})
|
||||
|
||||
def test_instances_are_frozen(self):
|
||||
r = Router(dabinst_short_name="f")
|
||||
with self.assertRaises(TypeError):
|
||||
r.template_short_name = "hack" # frozen
|
||||
|
||||
|
||||
# ---------- Optional/Union handling ----------
|
||||
|
||||
|
||||
class TestOptionalUnionHandling(unittest.TestCase):
|
||||
def test_optional_feature_annotation_is_detected(self):
|
||||
class R(dm.BaseAppliance):
|
||||
class Features:
|
||||
class Net(dm.BaseFeature):
|
||||
mtu: StrictInt = 1500
|
||||
|
||||
class Defaults:
|
||||
template_id = "44444444-4444-4444-8444-444444444444"
|
||||
template_short_name = "net2"
|
||||
|
||||
# explicit Optional[...] annotation (the metaclass injects it too if missing)
|
||||
Net: Optional[Features.Net] = None
|
||||
|
||||
class Defaults:
|
||||
template_id = "55555555-5555-4555-8555-555555555551"
|
||||
template_short_name = "R"
|
||||
|
||||
class Net:
|
||||
enabled = True
|
||||
mtu = 2000
|
||||
|
||||
r = R(dabinst_short_name="o1")
|
||||
self.assertIsNotNone(r.Net)
|
||||
self.assertEqual(r.Net.mtu, 2000)
|
||||
self.assertEqual(str(r.Net.template_id), "44444444-4444-4444-8444-444444444444")
|
||||
|
||||
def test_union_with_none_is_detected(self):
|
||||
class R(dm.BaseAppliance):
|
||||
class Features:
|
||||
class Net(dm.BaseFeature):
|
||||
mtu: StrictInt = 1500
|
||||
|
||||
class Defaults:
|
||||
template_id = "55555555-5555-4555-8555-555555555555"
|
||||
template_short_name = "net3"
|
||||
|
||||
# fancier spelling of Optional
|
||||
Net: Union[Features.Net, None] = None
|
||||
|
||||
class Defaults:
|
||||
template_id = "55555555-5555-4555-8555-555555555551"
|
||||
template_short_name = "R"
|
||||
|
||||
class Net:
|
||||
enabled = True
|
||||
|
||||
r = R(dabinst_short_name="u1")
|
||||
self.assertIsNotNone(r.Net)
|
||||
self.assertEqual(str(r.Net.template_id), "55555555-5555-4555-8555-555555555555")
|
||||
|
||||
|
||||
# ---------- class defaults MUST NOT mutate during instance creation ----------
|
||||
|
||||
|
||||
class TestDefaultsMutationSafety(unittest.TestCase):
|
||||
def test_defaults_map_not_mutated(self):
|
||||
# Preconditions
|
||||
self.assertIn("Firewall", RouterPlus.__defaults_map__)
|
||||
self.assertIn("enabled", RouterPlus.__defaults_map__["Firewall"])
|
||||
# Create an instance that disables Firewall, which previously used to mutate defaults
|
||||
_ = RouterPlus(dabinst_short_name="x", Firewall={"enabled": False})
|
||||
# Postconditions: class defaults unchanged
|
||||
self.assertIn("Firewall", RouterPlus.__defaults_map__)
|
||||
self.assertIn("enabled", RouterPlus.__defaults_map__["Firewall"])
|
||||
self.assertTrue(RouterPlus.__defaults_map__["Firewall"]["enabled"])
|
||||
|
||||
|
||||
# ---------- regression: providing config dict without flags still merges ----------
|
||||
|
||||
|
||||
class TestConfigWithoutEnabledFlag(unittest.TestCase):
|
||||
def test_merge_when_no_flags(self):
|
||||
# Provide a dict (no 'enabled'), should merge and keep enabled from class default
|
||||
r = Router(dabinst_short_name="m", Network={"mtu": 7777})
|
||||
self.assertEqual(r.Network.mtu, 7777)
|
||||
|
||||
Reference in New Issue
Block a user