Compare commits
2 Commits
0.0.1.post
...
0.0.1.post
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
26e32a004f | ||
|
|
b7cbc50f79 |
@@ -26,11 +26,12 @@ from copy import deepcopy, copy
|
||||
|
||||
# from pprint import pprint
|
||||
import math
|
||||
import inspect, ast, textwrap
|
||||
|
||||
from frozendict import deepfreeze
|
||||
from typeguard import check_type, TypeCheckError, CollectionCheckStrategy
|
||||
|
||||
ALLOWED_ANNOTATIONS = {
|
||||
ALLOWED_ANNOTATIONS: dict[str, Any] = {
|
||||
"Union": Union,
|
||||
"Optional": Optional,
|
||||
"List": List,
|
||||
@@ -54,10 +55,14 @@ ALLOWED_ANNOTATIONS = {
|
||||
"tuple": tuple,
|
||||
}
|
||||
|
||||
ALLOWED_MODEL_FIELDS_TYPES = (str, int, float, complex, bool, bytes)
|
||||
|
||||
|
||||
# TV_ALLOWED_MODEL_FIELDS_TYPES = TypeVar("TV_ALLOWED_MODEL_FIELDS_TYPES", *ALLOWED_MODEL_FIELDS_TYPES, *ALLOWED_MODEL_FIELDS_CONTAINERS)
|
||||
ALLOWED_MODEL_FIELDS_TYPES: tuple[type[Any], ...] = (
|
||||
str,
|
||||
int,
|
||||
float,
|
||||
complex,
|
||||
bool,
|
||||
bytes,
|
||||
)
|
||||
|
||||
|
||||
class DABModelException(Exception):
|
||||
@@ -66,6 +71,15 @@ class DABModelException(Exception):
|
||||
"""
|
||||
|
||||
|
||||
class FunctionForbidden(DABModelException): ...
|
||||
|
||||
|
||||
class ExternalCodeForbidden(FunctionForbidden): ...
|
||||
|
||||
|
||||
class ClosureForbidden(FunctionForbidden): ...
|
||||
|
||||
|
||||
class ReservedFieldName(Exception):
|
||||
"""DABModelException Exception class
|
||||
Base Exception for DABModelException class
|
||||
@@ -150,15 +164,9 @@ class InvalidFeatureInheritance(DABModelException):
|
||||
"""
|
||||
|
||||
|
||||
class FunctionForbidden(DABModelException):
|
||||
"""FunctionForbidden Exception class
|
||||
function call are forbidden
|
||||
"""
|
||||
|
||||
|
||||
class FeatureNotBound(DABModelException):
|
||||
"""FeatureNotBound Exception class
|
||||
a Feature must be bound to an Appliance
|
||||
a Feature must be bound to the appliance (or parent)
|
||||
"""
|
||||
|
||||
|
||||
@@ -181,7 +189,7 @@ ALLOWED_HELPERS_MATH = SimpleNamespace(
|
||||
degrees=math.degrees,
|
||||
)
|
||||
|
||||
ALLOWED_HELPERS_DEFAULT = {
|
||||
ALLOWED_HELPERS_DEFAULT: dict[str, object] = {
|
||||
"math": ALLOWED_HELPERS_MATH,
|
||||
"print": print,
|
||||
# Numbers & reducers (pure, deterministic)
|
||||
@@ -210,6 +218,61 @@ ALLOWED_HELPERS_DEFAULT = {
|
||||
}
|
||||
|
||||
|
||||
def _check_initializer_safety(func) -> None:
|
||||
"""
|
||||
Preliminary structural check for __initializer__.
|
||||
|
||||
Policy (minimal):
|
||||
- Forbid 'import' / 'from ... import ...' inside the initializer body.
|
||||
- Forbid nested function definitions (closures/helpers) in the body.
|
||||
- Allow lambdas.
|
||||
- No restrictions on calls here (keep it simple).
|
||||
- Optionally forbid closures (free vars) for determinism.
|
||||
"""
|
||||
try:
|
||||
src = inspect.getsource(func)
|
||||
except OSError as exc:
|
||||
# If source isn't available (rare), fail closed (or skip if you prefer)
|
||||
raise FunctionForbidden("Cannot inspect __initializer__ source") from exc
|
||||
|
||||
src = textwrap.dedent(src)
|
||||
mod = ast.parse(src)
|
||||
|
||||
# Find the FunctionDef node that corresponds to this initializer
|
||||
init_node = None
|
||||
for n in mod.body:
|
||||
if (
|
||||
isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef))
|
||||
and n.name == func.__name__
|
||||
):
|
||||
init_node = n
|
||||
break
|
||||
if init_node is None:
|
||||
# Fallback: if not found, analyze nothing further to avoid false positives
|
||||
return
|
||||
|
||||
# Walk ONLY the body of the initializer (don't flag the def itself)
|
||||
body_tree = ast.Module(body=init_node.body, type_ignores=[])
|
||||
|
||||
for node in ast.walk(body_tree):
|
||||
# Forbid imports
|
||||
if isinstance(node, (ast.Import, ast.ImportFrom)):
|
||||
raise ImportForbidden("imports disabled in __initializer")
|
||||
|
||||
# Forbid nested defs (but allow lambdas)
|
||||
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
|
||||
raise FunctionForbidden(
|
||||
"Nested function definitions are forbidden in __initializer"
|
||||
)
|
||||
|
||||
if isinstance(node, ast.Lambda): # Forbid lambda
|
||||
raise FunctionForbidden("Lambdas are forbidden in __initializer")
|
||||
|
||||
# Optional: forbid closures (keeps determinism; allows lambdas that don't capture)
|
||||
if func.__code__.co_freevars:
|
||||
raise FunctionForbidden("Closures are forbidden in __initializer__")
|
||||
|
||||
|
||||
def _blocked_import(*args, **kwargs):
|
||||
raise ImportForbidden("imports disabled in __initializer")
|
||||
|
||||
@@ -740,6 +803,7 @@ class BaseMeta(type):
|
||||
class schema's DABFields.
|
||||
"""
|
||||
if mcs.initializer is not None:
|
||||
_check_initializer_safety(mcs.initializer)
|
||||
init_fieldvalues = {}
|
||||
init_fieldtypes = {}
|
||||
for _fname, _fvalue in cls.__DABSchema__.items():
|
||||
@@ -1055,25 +1119,62 @@ class BaseMetaAppliance(BaseMeta):
|
||||
|
||||
def finalize_instance(cls: Type, obj, extensions: dict[str, Any]):
|
||||
"""
|
||||
Instantiate and attach all declared Feature classes on the appliance instance.
|
||||
|
||||
Each feature is constructed (running the same populate/freeze steps),
|
||||
then assigned to `obj.<FeatureName>`.
|
||||
Instantiate and attach all features declared (or overridden) in the instance schema.
|
||||
Handles:
|
||||
- Declared features (plain class)
|
||||
- Subclass replacements
|
||||
- Dict overrides (class + patch dict)
|
||||
"""
|
||||
for _ftname, _ftvalue in cls.__DABSchema__["features"].items():
|
||||
instft = _ftvalue()
|
||||
object.__setattr__(obj, _ftname, instft)
|
||||
for fname, fdef in obj.__DABSchema__.get("features", {}).items():
|
||||
# Case 1: plain class or subclass
|
||||
if isinstance(fdef, type) and issubclass(fdef, BaseFeature):
|
||||
inst = fdef()
|
||||
object.__setattr__(obj, fname, inst)
|
||||
|
||||
# Case 2: (class, dict) → dict overrides
|
||||
elif isinstance(fdef, tuple) and len(fdef) == 2:
|
||||
feat_cls, overrides = fdef
|
||||
inst = feat_cls()
|
||||
for field_name, new_val in overrides.items():
|
||||
if field_name not in feat_cls.__DABSchema__:
|
||||
raise InvalidFieldValue(
|
||||
f"Feature '{fname}' has no field '{field_name}'"
|
||||
)
|
||||
field = feat_cls.__DABSchema__[field_name]
|
||||
try:
|
||||
check_type(
|
||||
new_val,
|
||||
field.annotations,
|
||||
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
|
||||
)
|
||||
except TypeCheckError as exp:
|
||||
raise InvalidFieldValue(
|
||||
f"Invalid value for {fname}.{field_name}: "
|
||||
f"expected {field.annotations}, got {new_val!r}"
|
||||
) from exp
|
||||
object.__setattr__(inst, field_name, _deepfreeze(new_val))
|
||||
inst.__DABSchema__[field_name] = FrozenDABField(
|
||||
DABField(field_name, new_val, field.annotations, field._info)
|
||||
)
|
||||
object.__setattr__(obj, fname, inst)
|
||||
|
||||
else:
|
||||
raise InvalidFieldValue(
|
||||
f"Invalid feature definition stored for '{fname}': {fdef!r}"
|
||||
)
|
||||
|
||||
def apply_overrides(cls, obj, extensions, *args, **kwargs):
|
||||
"""
|
||||
Support for runtime field and feature overrides.
|
||||
|
||||
Examples:
|
||||
MyApp(name="foo") # field override
|
||||
MyApp(Nginx=CustomNginxFeature) # override existing feature
|
||||
MyApp(Redis=RedisFeature) # attach new feature
|
||||
Fields:
|
||||
MyApp(name="foo")
|
||||
|
||||
Features:
|
||||
MyApp(F1=MyF1) # inheritance / replacement
|
||||
MyApp(F1={"val": 42, ...}) # dict override of existing feature
|
||||
"""
|
||||
# Handle field overrides
|
||||
# --- field overrides (unchanged) ---
|
||||
for k, v in list(kwargs.items()):
|
||||
if k in cls.__DABSchema__: # regular field
|
||||
field = cls.__DABSchema__[k]
|
||||
@@ -1094,26 +1195,48 @@ class BaseMetaAppliance(BaseMeta):
|
||||
)
|
||||
kwargs.pop(k)
|
||||
|
||||
# Handle feature overrides/attachments
|
||||
# --- feature overrides ---
|
||||
for k, v in list(kwargs.items()):
|
||||
if k in cls.__DABSchema__.get("features", {}):
|
||||
feat_cls = v if isinstance(v, type) else v.__class__
|
||||
if not issubclass(feat_cls, BaseFeature):
|
||||
base_feat_cls = cls.__DABSchema__["features"][k]
|
||||
|
||||
# Case 1: subclass replacement (inheritance)
|
||||
if isinstance(v, type) and issubclass(v, base_feat_cls):
|
||||
bound = getattr(v, "_BoundAppliance", None)
|
||||
if bound is None or not issubclass(cls, bound):
|
||||
raise FeatureNotBound(
|
||||
f"Feature {v.__name__} is not bound to {cls.__name__}"
|
||||
)
|
||||
# record subclass into instance schema
|
||||
obj.__DABSchema__["features"][k] = v
|
||||
kwargs.pop(k)
|
||||
|
||||
# Case 2: dict override
|
||||
elif isinstance(v, dict):
|
||||
# store (class, override_dict) for finalize_instance
|
||||
obj.__DABSchema__["features"][k] = (base_feat_cls, v)
|
||||
kwargs.pop(k)
|
||||
|
||||
else:
|
||||
raise InvalidFieldValue(
|
||||
f"Override for feature '{k}' must be a Feature class or instance"
|
||||
f"Feature override for '{k}' must be a Feature subclass or dict, got {type(v)}"
|
||||
)
|
||||
|
||||
# --- new features not declared at class level ---
|
||||
for k, v in list(kwargs.items()):
|
||||
if isinstance(v, type) and issubclass(v, BaseFeature):
|
||||
bound = getattr(v, "_BoundAppliance", None)
|
||||
if bound is None or not issubclass(cls, bound):
|
||||
raise FeatureNotBound(
|
||||
f"Feature {v.__name__} is not bound to {cls.__name__}"
|
||||
)
|
||||
feat_cls._BoundAppliance = cls
|
||||
inst = v if isinstance(v, BaseFeature) else feat_cls()
|
||||
object.__setattr__(obj, k, inst)
|
||||
obj.__DABSchema__["features"][k] = feat_cls
|
||||
kwargs.pop(k)
|
||||
elif isinstance(v, type) and issubclass(v, BaseFeature):
|
||||
v._BoundAppliance = cls
|
||||
inst = v()
|
||||
object.__setattr__(obj, k, inst)
|
||||
obj.__DABSchema__["features"][k] = v
|
||||
kwargs.pop(k)
|
||||
|
||||
if kwargs:
|
||||
unknown = ", ".join(sorted(kwargs.keys()))
|
||||
raise InvalidFieldValue(f"Unknown parameters: {unknown}")
|
||||
|
||||
|
||||
class BaseAppliance(BaseElement, metaclass=BaseMetaAppliance):
|
||||
"""BaseFeature class
|
||||
|
||||
@@ -1825,6 +1825,19 @@ class MainTests(unittest.TestCase):
|
||||
with self.assertRaises(AttributeError):
|
||||
app.feat1
|
||||
|
||||
def test_feature_register_notbound(self):
|
||||
"""Testing first appliance feature, and Field types (simple)"""
|
||||
|
||||
# class can be created
|
||||
class Appliance1(dm.BaseAppliance):
|
||||
pass
|
||||
|
||||
class Feature1(dm.BaseFeature):
|
||||
VarStrInner: str = "testvalue FEATURE1"
|
||||
|
||||
with self.assertRaises(dm.FeatureNotBound):
|
||||
app = Appliance1(feat1=Feature1)
|
||||
|
||||
def test_feature_register_defect(self):
|
||||
|
||||
class Feature1(dm.BaseFeature):
|
||||
@@ -1833,6 +1846,641 @@ class MainTests(unittest.TestCase):
|
||||
with self.assertRaises(dm.FeatureNotBound):
|
||||
feat1 = Feature1()
|
||||
|
||||
def test_override(self):
|
||||
"""Testing first appliance level, and Field types (List)"""
|
||||
|
||||
# class can be created with list
|
||||
class Appliance1(dm.BaseAppliance):
|
||||
testVar: List[int] = [1, 2]
|
||||
testVar2: List[str] = ["a", "b"]
|
||||
testVar3: float = 0.5
|
||||
testVar4: "List[str]" = ["a", "c"]
|
||||
testVar5: list[str] = ["a", "b"]
|
||||
testVar6: "list[str]" = ["a", "b"]
|
||||
|
||||
app1 = Appliance1(testVar=[5, 6], testVar2=[], testVar3=45.8)
|
||||
|
||||
self.immutable_vars__test_field(app1, "testVar", (5, 6), (1, 5))
|
||||
self.immutable_vars__test_field(app1, "testVar2", (), ("h", "c"))
|
||||
self.immutable_vars__test_field(app1, "testVar3", 45.8, 43.9)
|
||||
self.immutable_vars__test_field(app1, "testVar4", ("a", "c"), ("h", "e"))
|
||||
self.immutable_vars__test_field(app1, "testVar5", ("a", "b"), ("h", "c"))
|
||||
self.immutable_vars__test_field(app1, "testVar6", ("a", "b"), ("h", "c"))
|
||||
|
||||
# test no leaks
|
||||
app2 = Appliance1()
|
||||
self.immutable_vars__test_field(app2, "testVar", (1, 2), (1, 5))
|
||||
self.immutable_vars__test_field(app2, "testVar2", ("a", "b"), ("h", "c"))
|
||||
self.immutable_vars__test_field(app2, "testVar3", 0.5, 43.9)
|
||||
self.immutable_vars__test_field(app2, "testVar4", ("a", "c"), ("h", "e"))
|
||||
self.immutable_vars__test_field(app2, "testVar5", ("a", "b"), ("h", "c"))
|
||||
self.immutable_vars__test_field(app2, "testVar6", ("a", "b"), ("h", "c"))
|
||||
|
||||
def test_new_field_forbidden(self):
|
||||
class App(dm.BaseAppliance):
|
||||
x: int = 1
|
||||
|
||||
app = App()
|
||||
with self.assertRaises(dm.NewFieldForbidden):
|
||||
app.y = 2
|
||||
|
||||
def test_private_field_allowed(self):
|
||||
class App(dm.BaseAppliance):
|
||||
x: int = 1
|
||||
|
||||
app = App()
|
||||
app._internal = 42 # should be allowed
|
||||
self.assertEqual(app._internal, 42)
|
||||
|
||||
def test_inherit_declared_feature(self):
|
||||
class App(dm.BaseAppliance):
|
||||
class F1(dm.BaseFeature):
|
||||
val: int = 1
|
||||
|
||||
class MyF1(App.F1):
|
||||
val = 2
|
||||
val2: str = "toto"
|
||||
|
||||
app = App(F1=MyF1)
|
||||
self.assertIsInstance(app.F1, MyF1)
|
||||
self.assertEqual(app.F1.val, 2)
|
||||
self.assertEqual(app.F1.val2, "toto")
|
||||
|
||||
def test_override_declared_feature(self):
|
||||
class App(dm.BaseAppliance):
|
||||
class F1(dm.BaseFeature):
|
||||
val: int = 1
|
||||
val2: str = "toto"
|
||||
|
||||
app = App(F1={"val": 42, "val2": "tata"})
|
||||
self.assertEqual(app.F1.val, 42)
|
||||
self.assertEqual(app.F1.val2, "tata")
|
||||
|
||||
def test_feature_dict_override_type_error(self):
|
||||
class App(dm.BaseAppliance):
|
||||
class F1(dm.BaseFeature):
|
||||
val: int = 1
|
||||
|
||||
# wrong type for val → must raise InvalidFieldValue
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
App(F1={"val": "not-an-int"})
|
||||
|
||||
def test_feature_dict_override_nonexisting_field(self):
|
||||
class App(dm.BaseAppliance):
|
||||
class F1(dm.BaseFeature):
|
||||
val: int = 1
|
||||
|
||||
# field does not exist → must raise
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
App(F1={"doesnotexist": 123})
|
||||
|
||||
def test_feature_inheritance_with_extra_fields(self):
|
||||
class App(dm.BaseAppliance):
|
||||
class F1(dm.BaseFeature):
|
||||
val: int = 1
|
||||
|
||||
class MyF1(App.F1):
|
||||
val = 2
|
||||
extra: str = "hello"
|
||||
|
||||
app = App(F1=MyF1)
|
||||
self.assertEqual(app.F1.val, 2)
|
||||
self.assertEqual(app.F1.extra, "hello")
|
||||
|
||||
def test_feature_not_bound_runtime_attach_fails(self):
|
||||
class App(dm.BaseAppliance):
|
||||
pass
|
||||
|
||||
class UnboundFeature(dm.BaseFeature):
|
||||
x: int = 1
|
||||
|
||||
# attaching an unbound feature should raise
|
||||
with self.assertRaises(dm.FeatureNotBound):
|
||||
App(Unbound=UnboundFeature)
|
||||
|
||||
def test_field_cannot_be_reassigned(self):
|
||||
class App(dm.BaseAppliance):
|
||||
x: int = 1
|
||||
|
||||
app = App()
|
||||
with self.assertRaises(dm.ReadOnlyField):
|
||||
app.x = 99
|
||||
|
||||
def test_feature_override_does_not_leak_between_instances(self):
|
||||
class App(dm.BaseAppliance):
|
||||
class F1(dm.BaseFeature):
|
||||
val: int = 1
|
||||
|
||||
app1 = App(F1={"val": 99})
|
||||
app2 = App()
|
||||
self.assertEqual(app1.F1.val, 99)
|
||||
self.assertEqual(app2.F1.val, 1)
|
||||
|
||||
def test_schema_fields_are_frozen(self):
|
||||
class App(dm.BaseAppliance):
|
||||
x: list[int] = [1, 2]
|
||||
|
||||
app = App()
|
||||
with self.assertRaises(AttributeError): # frozendict / tuple immutability
|
||||
app.x.insert(3)
|
||||
|
||||
with self.assertRaises(dm.ReadOnlyField): # frozendict / tuple immutability
|
||||
app.x = (1, 3)
|
||||
|
||||
def test_initializer_invalid_type_raises(self):
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class App(dm.BaseAppliance):
|
||||
x: int = 1
|
||||
|
||||
@classmethod
|
||||
def __initializer(cls):
|
||||
cls.x = "wrong" # wrong type
|
||||
|
||||
def test_initializer_forbid_nested_functions(self):
|
||||
with self.assertRaises(dm.FunctionForbidden):
|
||||
|
||||
class App(dm.BaseAppliance):
|
||||
x: int = 1
|
||||
|
||||
@classmethod
|
||||
def __initializer(cls):
|
||||
def inner():
|
||||
return 2
|
||||
|
||||
cls.x = inner()
|
||||
|
||||
with self.assertRaises(dm.FunctionForbidden):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
x: int = 1
|
||||
|
||||
@classmethod
|
||||
def __initializer(cls):
|
||||
y = 2
|
||||
|
||||
def inner():
|
||||
return y + 1
|
||||
|
||||
cls.x = inner()
|
||||
|
||||
def test_initializer_lambda_forbidden(self):
|
||||
with self.assertRaises(dm.FunctionForbidden):
|
||||
|
||||
class App(dm.BaseAppliance):
|
||||
x: int = 1
|
||||
|
||||
@classmethod
|
||||
def __initializer(cls):
|
||||
f = lambda: 42 # forbidden
|
||||
cls.x = f()
|
||||
|
||||
def test_multiple_inheritance_forbidden(self):
|
||||
with self.assertRaises(dm.MultipleInheritanceForbidden):
|
||||
|
||||
class A(dm.BaseAppliance):
|
||||
pass
|
||||
|
||||
class B(dm.BaseAppliance):
|
||||
pass
|
||||
|
||||
class C(A, B):
|
||||
pass
|
||||
|
||||
def test_initializer_can_use_math(self):
|
||||
|
||||
class App(dm.BaseAppliance):
|
||||
val: float = 0.0
|
||||
|
||||
@classmethod
|
||||
def __initializer(cls):
|
||||
# should be able to access math without NameError
|
||||
cls.val = math.ceil(1.2) + math.floor(2.8)
|
||||
|
||||
app = App()
|
||||
# math.ceil(1.2)=2, math.floor(2.8)=2 → 4
|
||||
self.assertEqual(app.val, 4)
|
||||
|
||||
def test_deepfreeze_nested_mixed_tuple_list(self):
|
||||
class App(dm.BaseAppliance):
|
||||
data: tuple[list[int], tuple[int, list[int]]] = ([1, 2], (3, [4, 5]))
|
||||
|
||||
app = App()
|
||||
|
||||
# Top-level: must be tuple
|
||||
self.assertIsInstance(app.data, tuple)
|
||||
|
||||
# First element of tuple: should have been frozen to tuple, not list
|
||||
self.assertIsInstance(app.data[0], tuple)
|
||||
|
||||
# Nested second element: itself a tuple
|
||||
self.assertIsInstance(app.data[1], tuple)
|
||||
|
||||
# Deepest element: inner list should also be frozen to tuple
|
||||
self.assertIsInstance(app.data[1][1], tuple)
|
||||
|
||||
# Check immutability
|
||||
with self.assertRaises(TypeError):
|
||||
app.data[0] += (99,) # tuples are immutable
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
app.data[1][1] += (42,) # inner tuple also immutable
|
||||
|
||||
def test_inacurate_type(self):
|
||||
with self.assertRaises(dm.InvalidFieldAnnotation):
|
||||
|
||||
class Appliance1(dm.BaseAppliance):
|
||||
SomeVar: list = []
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldAnnotation):
|
||||
|
||||
class Appliance2(dm.BaseAppliance):
|
||||
SomeVar: list[Any] = []
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldAnnotation):
|
||||
|
||||
class Appliance3(dm.BaseAppliance):
|
||||
SomeVar: list[object] = []
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldAnnotation):
|
||||
|
||||
class Appliance4(dm.BaseAppliance):
|
||||
SomeVar: dict = {}
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldAnnotation):
|
||||
|
||||
class Appliance5(dm.BaseAppliance):
|
||||
SomeVar: dict[str, Any] = {}
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldAnnotation):
|
||||
|
||||
class Appliance6(dm.BaseAppliance):
|
||||
SomeVar: dict[Any, Any] = {}
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldAnnotation):
|
||||
|
||||
class Appliance7(dm.BaseAppliance):
|
||||
SomeVar: dict[Any, str] = {}
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldAnnotation):
|
||||
|
||||
class Appliance8(dm.BaseAppliance):
|
||||
SomeVar: dict[str, object] = {}
|
||||
|
||||
def test_deepfreeze_nested_dict_list_set(self):
|
||||
class App(dm.BaseAppliance):
|
||||
data: dict[str, list[int] | set[str] | dict[str, list[int] | set[str]]] = {
|
||||
"numbers": [1, 2, 3],
|
||||
"letters": {"a", "b", "c"},
|
||||
"mixed": {"x": [4, 5], "y": {"z"}},
|
||||
}
|
||||
|
||||
app = App()
|
||||
|
||||
# Top-level: should be frozendict
|
||||
self.assertEqual(type(app.data).__name__, "frozendict")
|
||||
|
||||
# Lists must be frozen to tuple
|
||||
self.assertIsInstance(app.data["numbers"], tuple)
|
||||
self.assertIsInstance(app.data["mixed"]["x"], tuple)
|
||||
|
||||
# Sets must be frozen to frozenset
|
||||
self.assertIsInstance(app.data["letters"], frozenset)
|
||||
self.assertIsInstance(app.data["mixed"]["y"], frozenset)
|
||||
|
||||
# Check immutability
|
||||
with self.assertRaises(TypeError):
|
||||
app.data["numbers"] += (99,)
|
||||
|
||||
with self.assertRaises(AttributeError):
|
||||
app.data["letters"].add("d")
|
||||
|
||||
with self.assertRaises(TypeError):
|
||||
app.data["mixed"]["x"] += (6,)
|
||||
|
||||
def test_unknown_parameters_raise(self):
|
||||
class App(dm.BaseAppliance):
|
||||
a: int = 1
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue) as cm:
|
||||
App(foo=123)
|
||||
|
||||
self.assertIn("Unknown parameters:", str(cm.exception))
|
||||
self.assertIn("foo", str(cm.exception))
|
||||
|
||||
def test_variadic_tuple_annotation_ok_and_errors(self):
|
||||
class A(dm.BaseAppliance):
|
||||
xs: tuple[int, ...] = (1, 2, 3)
|
||||
|
||||
self.assertEqual(A().xs, (1, 2, 3))
|
||||
|
||||
class B(dm.BaseAppliance):
|
||||
xs: "Tuple[int, ...]" = (4, 5)
|
||||
|
||||
self.assertEqual(B().xs, (4, 5))
|
||||
|
||||
# wrong inner type should fail
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _Bad(dm.BaseAppliance):
|
||||
xs: tuple[int, ...] = (1, "x")
|
||||
|
||||
def test_field_override_typechecked_and_instance_local(self):
|
||||
class App(dm.BaseAppliance):
|
||||
a: list[int] = [1]
|
||||
|
||||
app1 = App(a=[9])
|
||||
app2 = App()
|
||||
# deepfreeze: lists become tuples
|
||||
self.assertEqual(app1.a, (9,))
|
||||
self.assertEqual(app2.a, (1,)) # no leakage
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
App(a=["oops"]) # wrong inner type
|
||||
|
||||
def test_runtime_attach_bound_feature_success(self):
|
||||
class App(dm.BaseAppliance):
|
||||
class F1(dm.BaseFeature):
|
||||
val: int = 1
|
||||
|
||||
class Extra(App.F1): # stays bound to App
|
||||
val = 7
|
||||
|
||||
app = App(Extra=Extra)
|
||||
self.assertTrue(hasattr(app, "Extra"))
|
||||
self.assertIsInstance(app.Extra, Extra)
|
||||
self.assertEqual(app.Extra.val, 7)
|
||||
|
||||
def test_cant_override_inherited_feature_annotation(self):
|
||||
class App(dm.BaseAppliance):
|
||||
class F1(dm.BaseFeature):
|
||||
val: int = 1
|
||||
|
||||
with self.assertRaises(dm.ReadOnlyFieldAnnotation):
|
||||
|
||||
class Extra(App.F1):
|
||||
val: str = "test"
|
||||
|
||||
def test_feature_fields_are_frozen_after_override(self):
|
||||
class App(dm.BaseAppliance):
|
||||
class F(dm.BaseFeature):
|
||||
nums: list[int] = [1, 2]
|
||||
tag: str = "x"
|
||||
|
||||
# dict override
|
||||
app1 = App(F={"nums": [9], "tag": "y"})
|
||||
self.assertEqual(app1.F.nums, (9,))
|
||||
self.assertEqual(app1.F.tag, "y")
|
||||
with self.assertRaises(AttributeError):
|
||||
app1.F.nums.append(3) # tuple
|
||||
|
||||
# subclass override
|
||||
class F2(App.F):
|
||||
nums = [4, 5]
|
||||
|
||||
app2 = App(F=F2)
|
||||
self.assertEqual(app2.F.nums, (4, 5))
|
||||
with self.assertRaises(dm.ReadOnlyField):
|
||||
app2.F.nums += (6,) # still immutable
|
||||
|
||||
def test_feature_dict_partial_override_keeps_other_defaults(self):
|
||||
class App(dm.BaseAppliance):
|
||||
class F(dm.BaseFeature):
|
||||
a: int = 1
|
||||
b: str = "k"
|
||||
|
||||
app = App(F={"b": "z"})
|
||||
self.assertEqual(app.F.a, 1) # default remains
|
||||
self.assertEqual(app.F.b, "z") # overridden
|
||||
|
||||
def test_root_field_override_nonexisting_rejected(self):
|
||||
class App(dm.BaseAppliance):
|
||||
x: int = 1
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
App(y=2) # not in schema -> unknown parameter
|
||||
|
||||
def test_feature_override_linear_chain(self):
|
||||
# Base appliance defines Feat1
|
||||
class A(dm.BaseAppliance):
|
||||
class Feat1(dm.BaseFeature):
|
||||
x: int = 1
|
||||
|
||||
# ✅ Appliance B overrides Feat1 by subclassing A.Feat1
|
||||
class B(A):
|
||||
class Feat1(A.Feat1):
|
||||
y: int = 2
|
||||
|
||||
self.assertTrue(issubclass(B.Feat1, A.Feat1))
|
||||
|
||||
# ✅ Appliance C overrides Feat1 again by subclassing B.Feat1 (not A.Feat1)
|
||||
class C(B):
|
||||
class Feat1(B.Feat1):
|
||||
z: int = 3
|
||||
|
||||
self.assertTrue(issubclass(C.Feat1, B.Feat1))
|
||||
self.assertTrue(issubclass(C.Feat1, A.Feat1))
|
||||
|
||||
# ❌ Bad: D tries to override with a *fresh* BaseFeature, not subclass of B.Feat1
|
||||
with self.assertRaises(dm.InvalidFeatureInheritance):
|
||||
|
||||
class D(B):
|
||||
class Feat1(dm.BaseFeature):
|
||||
fail: str = "oops"
|
||||
|
||||
# ❌ Bad: E tries to override with ancestor (A.Feat1) instead of B.Feat1
|
||||
with self.assertRaises(dm.InvalidFeatureInheritance):
|
||||
|
||||
class E(B):
|
||||
class Feat1(A.Feat1):
|
||||
fail: str = "oops"
|
||||
|
||||
# ✅ New feature name in child is always fine
|
||||
class F(B):
|
||||
class Feat2(dm.BaseFeature):
|
||||
other: str = "ok"
|
||||
|
||||
self.assertTrue(hasattr(F, "Feat2"))
|
||||
|
||||
def test_feature_override_chain_runtime_replacement(self):
|
||||
# Build a linear chain: A -> B -> C for feature 'Feat1'
|
||||
class A(dm.BaseAppliance):
|
||||
class Feat1(dm.BaseFeature):
|
||||
x: int = 1
|
||||
|
||||
class B(A):
|
||||
class Feat1(A.Feat1):
|
||||
y: int = 2
|
||||
|
||||
class C(B):
|
||||
class Feat1(B.Feat1):
|
||||
z: int = 3
|
||||
|
||||
# ✅ OK: at instantiation of C, replacing Feat1 with a subclass of the LATEST (C.Feat1)
|
||||
class CFeat1Plus(C.Feat1):
|
||||
w: int = 4
|
||||
|
||||
c_ok = C(Feat1=CFeat1Plus)
|
||||
self.assertIsInstance(c_ok.Feat1, CFeat1Plus)
|
||||
self.assertEqual(
|
||||
(c_ok.Feat1.x, c_ok.Feat1.y, c_ok.Feat1.z, c_ok.Feat1.w), (1, 2, 3, 4)
|
||||
)
|
||||
|
||||
# ❌ Not OK: replacing with a subclass of the ancestor (A.Feat1) — must target latest (C.Feat1)
|
||||
class AFeat1Alt(A.Feat1):
|
||||
pass
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
C(Feat1=AFeat1Alt)
|
||||
|
||||
# ❌ Not OK: replacing with a subclass of the mid ancestor (B.Feat1) — still must target latest (C.Feat1)
|
||||
class BFeat1Alt(B.Feat1):
|
||||
pass
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
C(Feat1=BFeat1Alt)
|
||||
|
||||
def test_feature_inheritance_tree_and_no_leakage(self):
|
||||
class A(dm.BaseAppliance):
|
||||
class F1(dm.BaseFeature):
|
||||
a: int = 1
|
||||
|
||||
class F2(dm.BaseFeature):
|
||||
b: int = 2
|
||||
|
||||
# ✅ Child inherits both features automatically
|
||||
class B(A):
|
||||
c: str = "extra"
|
||||
|
||||
b1 = B()
|
||||
self.assertIsInstance(b1.F1, A.F1)
|
||||
self.assertIsInstance(b1.F2, A.F2)
|
||||
self.assertEqual((b1.F1.a, b1.F2.b, b1.c), (1, 2, "extra"))
|
||||
|
||||
# ✅ Override only F2, F1 should still come from A
|
||||
class C(B):
|
||||
class F2(B.F2):
|
||||
bb: int = 22
|
||||
|
||||
c1 = C()
|
||||
self.assertIsInstance(c1.F1, A.F1) # unchanged
|
||||
self.assertIsInstance(c1.F2, C.F2) # overridden
|
||||
self.assertEqual((c1.F1.a, c1.F2.b, c1.F2.bb), (1, 2, 22))
|
||||
|
||||
# ✅ No leakage: instances of B are not affected by C's override
|
||||
b2 = B()
|
||||
self.assertIsInstance(b2.F2, A.F2)
|
||||
self.assertFalse(hasattr(b2.F2, "bb"))
|
||||
|
||||
# ✅ Adding a new feature in D is independent of previous appliances
|
||||
class D(C):
|
||||
class F3(dm.BaseFeature):
|
||||
d: int = 3
|
||||
|
||||
d1 = D()
|
||||
self.assertIsInstance(d1.F1, A.F1)
|
||||
self.assertIsInstance(d1.F2, C.F2)
|
||||
self.assertIsInstance(d1.F3, D.F3)
|
||||
|
||||
# ✅ No leakage: instances of A and B should not see F3
|
||||
a1 = A()
|
||||
self.assertFalse(hasattr(a1, "F3"))
|
||||
b3 = B()
|
||||
self.assertFalse(hasattr(b3, "F3"))
|
||||
|
||||
def test_appliance_inheritance_tree_feature_isolation(self):
|
||||
class A(dm.BaseAppliance):
|
||||
class F1(dm.BaseFeature):
|
||||
a: int = 1
|
||||
|
||||
# Branch 1 overrides F1
|
||||
class B(A):
|
||||
class F1(A.F1):
|
||||
b: int = 2
|
||||
|
||||
# Branch 2 also overrides F1 differently
|
||||
class C(A):
|
||||
class F1(A.F1):
|
||||
c: int = 3
|
||||
|
||||
# ✅ Instances of B use B.F1
|
||||
b = B()
|
||||
self.assertIsInstance(b.F1, B.F1)
|
||||
self.assertEqual((b.F1.a, b.F1.b), (1, 2))
|
||||
self.assertFalse(hasattr(b.F1, "c"))
|
||||
|
||||
# ✅ Instances of C use C.F1
|
||||
c = C()
|
||||
self.assertIsInstance(c.F1, C.F1)
|
||||
self.assertEqual((c.F1.a, c.F1.c), (1, 3))
|
||||
self.assertFalse(hasattr(c.F1, "b"))
|
||||
|
||||
# ✅ Base appliance A still uses its original feature
|
||||
a = A()
|
||||
self.assertIsInstance(a.F1, A.F1)
|
||||
self.assertEqual(a.F1.a, 1)
|
||||
self.assertFalse(hasattr(a.F1, "b"))
|
||||
self.assertFalse(hasattr(a.F1, "c"))
|
||||
|
||||
# ✅ No leakage: B's override doesn't affect C and vice versa
|
||||
b2 = B()
|
||||
c2 = C()
|
||||
self.assertTrue(hasattr(b2.F1, "b"))
|
||||
self.assertFalse(hasattr(b2.F1, "c"))
|
||||
self.assertTrue(hasattr(c2.F1, "c"))
|
||||
self.assertFalse(hasattr(c2.F1, "b"))
|
||||
|
||||
def test_appliance_inheritance_tree_runtime_attach_isolation(self):
|
||||
class A(dm.BaseAppliance):
|
||||
class F1(dm.BaseFeature):
|
||||
a: int = 1
|
||||
|
||||
class B(A):
|
||||
class F1(A.F1):
|
||||
b: int = 2
|
||||
|
||||
class C(A):
|
||||
class F1(A.F1):
|
||||
c: int = 3
|
||||
|
||||
# Define new runtime-attachable features
|
||||
class FextraB(B.F1):
|
||||
xb: int = 99
|
||||
|
||||
class FextraC(C.F1):
|
||||
xc: int = -99
|
||||
|
||||
# ✅ Attach to B at instantiation
|
||||
b = B(F1=FextraB)
|
||||
self.assertIsInstance(b.F1, FextraB)
|
||||
self.assertEqual((b.F1.a, b.F1.b, b.F1.xb), (1, 2, 99))
|
||||
self.assertFalse(hasattr(b.F1, "c"))
|
||||
self.assertFalse(hasattr(b.F1, "xc"))
|
||||
|
||||
# ✅ Attach to C at instantiation
|
||||
c = C(F1=FextraC)
|
||||
self.assertIsInstance(c.F1, FextraC)
|
||||
self.assertEqual((c.F1.a, c.F1.c, c.F1.xc), (1, 3, -99))
|
||||
self.assertFalse(hasattr(c.F1, "b"))
|
||||
self.assertFalse(hasattr(c.F1, "xb"))
|
||||
|
||||
# ✅ Base appliance still untouched
|
||||
a = A()
|
||||
self.assertIsInstance(a.F1, A.F1)
|
||||
self.assertEqual(a.F1.a, 1)
|
||||
self.assertFalse(hasattr(a.F1, "b"))
|
||||
self.assertFalse(hasattr(a.F1, "c"))
|
||||
self.assertFalse(hasattr(a.F1, "xb"))
|
||||
self.assertFalse(hasattr(a.F1, "xc"))
|
||||
|
||||
# ✅ Repeated instantiations stay isolated
|
||||
b2 = B()
|
||||
c2 = C()
|
||||
self.assertIsInstance(b2.F1, B.F1)
|
||||
self.assertIsInstance(c2.F1, C.F1)
|
||||
self.assertFalse(hasattr(b2.F1, "xb"))
|
||||
self.assertFalse(hasattr(c2.F1, "xc"))
|
||||
|
||||
|
||||
# ---------- main ----------
|
||||
|
||||
|
||||
Reference in New Issue
Block a user