Compare commits
7 Commits
master
...
0.0.1.post
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
608c8a1010 | ||
|
|
210781f086 | ||
|
|
df966ccac4 | ||
|
|
29827b51bc | ||
|
|
7440731135 | ||
|
|
87682c2c9c | ||
|
|
0eef35e36f |
@@ -1,13 +1,20 @@
|
||||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<?eclipse-pydev version="1.0"?><pydev_project>
|
||||
|
||||
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
|
||||
|
||||
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python interpreter</pydev_property>
|
||||
|
||||
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
|
||||
<path>/${PROJECT_DIR_NAME}/src</path>
|
||||
<path>/${PROJECT_DIR_NAME}</path>
|
||||
</pydev_pathproperty>
|
||||
|
||||
</pydev_project>
|
||||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<?eclipse-pydev version="1.0"?><pydev_project>
|
||||
|
||||
|
||||
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Python311</pydev_property>
|
||||
|
||||
|
||||
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python interpreter</pydev_property>
|
||||
|
||||
|
||||
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
|
||||
|
||||
<path>/${PROJECT_DIR_NAME}/src</path>
|
||||
|
||||
<path>/${PROJECT_DIR_NAME}</path>
|
||||
|
||||
</pydev_pathproperty>
|
||||
|
||||
|
||||
</pydev_project>
|
||||
|
||||
@@ -34,7 +34,9 @@ classifiers = [
|
||||
]
|
||||
dependencies = [
|
||||
'importlib-metadata; python_version<"3.9"',
|
||||
'packaging'
|
||||
'packaging',
|
||||
'pydantic',
|
||||
'runtype'
|
||||
]
|
||||
dynamic = ["version"]
|
||||
|
||||
@@ -78,7 +80,7 @@ test = ["chacha_cicd_helper"]
|
||||
coverage-check = ["chacha_cicd_helper"]
|
||||
complexity-check = ["chacha_cicd_helper"]
|
||||
quality-check = ["chacha_cicd_helper"]
|
||||
type-check = ["chacha_cicd_helper"]
|
||||
type-check = ["chacha_cicd_helper","types-pytz"]
|
||||
doc-gen = ["chacha_cicd_helper"]
|
||||
|
||||
# [project.scripts]
|
||||
|
||||
@@ -11,4 +11,17 @@ Main module __init__ file.
|
||||
"""
|
||||
|
||||
from .__metadata__ import __version__, __Summuary__, __Name__
|
||||
from .model import DABField, BaseFeature, BaseAppliance, default_values_override
|
||||
from .model import (
|
||||
BaseAppliance,
|
||||
BaseFeature,
|
||||
DABModelException,
|
||||
MultipleInheritanceForbidden,
|
||||
BrokenInheritance,
|
||||
ReadOnlyField,
|
||||
NewFieldForbidden,
|
||||
NotAnnotatedField,
|
||||
ReadOnlyFieldAnnotation,
|
||||
InvalidFieldValue,
|
||||
InvalidFieldAnnotation,
|
||||
IncompletelyAnnotatedField,
|
||||
)
|
||||
|
||||
@@ -1,267 +1,297 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from abc import ABC, ABCMeta, abstractmethod
|
||||
from uuid import uuid4
|
||||
from typing import Annotated, ClassVar, Any, Self, TypeVar, TypeAlias, Generic, Union
|
||||
from datetime import datetime
|
||||
from typing import Optional, TypeVar, Generic, Union, get_origin, get_args, List, Dict, Any, Tuple, Set, Annotated, FrozenSet
|
||||
from types import UnionType
|
||||
from frozendict import deepfreeze
|
||||
from copy import deepcopy, copy
|
||||
from typing_extensions import dataclass_transform, get_origin
|
||||
from pydantic import (
|
||||
ConfigDict,
|
||||
BaseModel,
|
||||
StrictInt,
|
||||
StrictStr,
|
||||
constr,
|
||||
ByteSize,
|
||||
AwareDatetime,
|
||||
UUID4,
|
||||
model_validator,
|
||||
field_validator,
|
||||
field_serializer,
|
||||
SerializeAsAny,
|
||||
)
|
||||
from pydantic.fields import Field, _Unset, PydanticUndefined
|
||||
from pydantic._internal._model_construction import ModelMetaclass, PydanticModelField
|
||||
from pydantic._internal._generics import PydanticGenericMetadata
|
||||
from pydantic._internal._decorators import ensure_classmethod_based_on_signature
|
||||
import pytz
|
||||
import inspect
|
||||
from pprint import pprint
|
||||
|
||||
from runtype import issubclass as runtype_issubclass
|
||||
from typeguard import check_type, TypeCheckError, CollectionCheckStrategy
|
||||
|
||||
ALLOWED_ANNOTATIONS = {
|
||||
"Union": Union,
|
||||
"Optional": Optional,
|
||||
"List": List,
|
||||
"Dict": Dict,
|
||||
"Tuple": Tuple,
|
||||
"Set": Set,
|
||||
"FrozenSet": FrozenSet,
|
||||
"Annotated": Annotated,
|
||||
# builtins:
|
||||
"int": int,
|
||||
"str": str,
|
||||
"float": float,
|
||||
"bool": bool,
|
||||
"complex": complex,
|
||||
"bytes": bytes,
|
||||
"None": type(None),
|
||||
"list": list,
|
||||
"dict": dict,
|
||||
"set": set,
|
||||
"frozenset": frozenset,
|
||||
"tuple": tuple,
|
||||
}
|
||||
|
||||
ALLOWED_MODEL_FIELDS_TYPES = (str, int, float, complex, bool, bytes)
|
||||
ALLOWED_MODEL_FIELDS_CONTAINERS = (dict, list, set, frozenset, tuple)
|
||||
|
||||
|
||||
class NoInstanceMethod:
|
||||
"""Descriptor to forbid that other descriptors can be looked up on an instance"""
|
||||
|
||||
def __init__(self, descr, name=None):
|
||||
self.descr = descr
|
||||
self.name = name
|
||||
|
||||
def __set_name__(self, owner, name):
|
||||
self.name = name
|
||||
|
||||
def __get__(self, instance, owner):
|
||||
# enforce the instance cannot look up the attribute at all
|
||||
if instance is not None:
|
||||
raise AttributeError(f"{type(instance).__name__!r} has no attribute {self.name!r}")
|
||||
# invoke any descriptor we are wrapping
|
||||
return self.descr.__get__(instance, owner)
|
||||
TV_ALLOWED_MODEL_FIELDS_TYPES = TypeVar("TV_ALLOWED_MODEL_FIELDS_TYPES", *ALLOWED_MODEL_FIELDS_TYPES, *ALLOWED_MODEL_FIELDS_CONTAINERS)
|
||||
|
||||
|
||||
def DABField(default: Any = PydanticUndefined, *, final: bool | None = _Unset, finalize_only: bool | None = _Unset, **kwargs):
|
||||
return Field(default, **kwargs)
|
||||
class DABModelException(Exception): ...
|
||||
|
||||
|
||||
T_BaseElement = TypeVar("T_BaseElement", bound="BaseElement")
|
||||
T_BaseElement_ConfigMethod_Arg: TypeAlias = dict[str, Any]
|
||||
T_BaseElement_ConfigMethod: TypeAlias = "classmethod[T_BaseElement, [T_BaseElement_ConfigMethod_Arg], T_BaseElement_ConfigMethod_Arg]"
|
||||
class MultipleInheritanceForbidden(DABModelException): ...
|
||||
|
||||
|
||||
class ConfigElement:
|
||||
def __init__(self) -> None:
|
||||
self.default_values_override_methods: dict[T_BaseElement_ConfigMethod, None] = {}
|
||||
self.main_build_method: dict[T_BaseElement_ConfigMethod, None] = {}
|
||||
|
||||
def __copy__(self) -> Self:
|
||||
# we cannot deepcopy because of classmethods, so we do a manual enhanced copy
|
||||
cls = self.__class__
|
||||
result = cls.__new__(cls)
|
||||
for k, v in self.__dict__.items():
|
||||
setattr(result, k, copy(v))
|
||||
return result
|
||||
class BrokenInheritance(DABModelException): ...
|
||||
|
||||
|
||||
class IBaseElement(BaseModel, ABC):
|
||||
_config_element: ClassVar[ConfigElement] = ConfigElement()
|
||||
class ReadOnlyField(DABModelException): ...
|
||||
|
||||
|
||||
@dataclass_transform(kw_only_default=True, field_specifiers=(PydanticModelField,))
|
||||
class BaseElementMeta(ModelMetaclass, ABCMeta):
|
||||
def __new__(
|
||||
mcs,
|
||||
cls_name: str,
|
||||
bases: tuple[type[Any], ...],
|
||||
namespace: dict[str, Any],
|
||||
__pydantic_generic_metadata__: PydanticGenericMetadata | None = None,
|
||||
__pydantic_reset_parent_namespace__: bool = True,
|
||||
_create_model_module: str | None = None,
|
||||
**kwargs: Any,
|
||||
) -> type:
|
||||
result = super().__new__(
|
||||
mcs,
|
||||
cls_name,
|
||||
bases,
|
||||
namespace,
|
||||
__pydantic_generic_metadata__,
|
||||
__pydantic_reset_parent_namespace__,
|
||||
_create_model_module,
|
||||
**kwargs,
|
||||
)
|
||||
print(cls_name)
|
||||
class NewFieldForbidden(DABModelException): ...
|
||||
|
||||
assert issubclass(result, IBaseElement), "Only IBaseElement subclasses are supported"
|
||||
|
||||
# forcing all Fields to be frozen
|
||||
for _, field_val in result.model_fields.items():
|
||||
field_val.frozen = True
|
||||
class InvalidFieldAnnotation(DABModelException): ...
|
||||
|
||||
# copying/forwarding base classes default-configs
|
||||
if "_config_element" not in result.__dict__:
|
||||
assert result.__base__ is not None, "Only IBaseElement subclasses are supported"
|
||||
if issubclass(result.__base__, IBaseElement):
|
||||
result._config_element = copy(result.__base__._config_element)
|
||||
|
||||
class NotAnnotatedField(InvalidFieldAnnotation): ...
|
||||
|
||||
|
||||
class IncompletelyAnnotatedField(InvalidFieldAnnotation): ...
|
||||
|
||||
|
||||
class ReadOnlyFieldAnnotation(DABModelException): ...
|
||||
|
||||
|
||||
class InvalidFieldValue(DABModelException): ...
|
||||
|
||||
|
||||
def _resolve_annotation(ann):
|
||||
if isinstance(ann, str):
|
||||
# Safe eval against a **whitelist** only
|
||||
return eval(ann, {"__builtins__": {}}, ALLOWED_ANNOTATIONS)
|
||||
return ann
|
||||
|
||||
|
||||
def _peel_annotated(t: Any) -> Any:
|
||||
# If you ever allow Annotated[T, ...], peel to T
|
||||
while True:
|
||||
origin = get_origin(t)
|
||||
if origin is None:
|
||||
return t
|
||||
name = getattr(origin, "__name__", "") or getattr(origin, "__qualname__", "") or str(origin)
|
||||
if "Annotated" in name:
|
||||
args = get_args(t)
|
||||
t = args[0] if args else t
|
||||
else:
|
||||
return t
|
||||
|
||||
|
||||
def __check_annotation_definition__(_type) -> bool:
|
||||
_type = _peel_annotated(_type)
|
||||
# handle Optional[] and Union[None,...]
|
||||
if (get_origin(_type) is Union or get_origin(_type) is UnionType) and type(None) in get_args(_type):
|
||||
return all([__check_annotation_definition__(_) for _ in get_args(_type) if _ is not type(None)])
|
||||
|
||||
# handle other Union[...]
|
||||
if get_origin(_type) is Union or get_origin(_type) is UnionType:
|
||||
return all([__check_annotation_definition__(_) for _ in get_args(_type)])
|
||||
|
||||
# handle Dict[...]
|
||||
if get_origin(_type) is dict:
|
||||
inner = get_args(_type)
|
||||
if len(inner) != 2:
|
||||
raise IncompletelyAnnotatedField(f"Dict Annotation requires 2 inner definitions: {_type}")
|
||||
return _peel_annotated(inner[0]) in ALLOWED_MODEL_FIELDS_TYPES and __check_annotation_definition__(inner[1])
|
||||
|
||||
# handle Set[],Tuple[],FrozenSet[],List[]
|
||||
if get_origin(_type) in [set, frozenset, tuple, list]:
|
||||
inner_types = get_args(_type)
|
||||
if len(inner_types) == 0:
|
||||
raise IncompletelyAnnotatedField(f"Annotation requires inner definition: {_type}")
|
||||
return all([__check_annotation_definition__(_) for _ in inner_types])
|
||||
|
||||
if _type in ALLOWED_MODEL_FIELDS_TYPES:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class Constraint: ...
|
||||
|
||||
|
||||
def _deepfreeze(value):
|
||||
if isinstance(value, dict):
|
||||
return deepfreeze(value)
|
||||
elif isinstance(value, set):
|
||||
return frozenset(_deepfreeze(v) for v in value)
|
||||
elif isinstance(value, list):
|
||||
return tuple(_deepfreeze(v) for v in value)
|
||||
elif isinstance(value, tuple):
|
||||
return tuple(_deepfreeze(v) for v in value)
|
||||
return value
|
||||
|
||||
|
||||
class DABField(Generic[TV_ALLOWED_MODEL_FIELDS_TYPES]):
|
||||
def __init__(self, name: str, v: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES], a: Any):
|
||||
self._constraints: List[Constraint] = []
|
||||
self._name: str = name
|
||||
self._source: Optional[type] = None
|
||||
self._default_value: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = v
|
||||
self._value: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = v
|
||||
self._annotations: Any = a
|
||||
self._documentation: str = ""
|
||||
|
||||
def add_documentation(self, d: str) -> None:
|
||||
self._documentation = d
|
||||
|
||||
def add_source(self, s: type) -> None:
|
||||
self._source = s
|
||||
|
||||
def add_constraint(self, c: Constraint) -> None:
|
||||
self._constraints.append(c)
|
||||
|
||||
def update_value(self, v: Optional[TV_ALLOWED_MODEL_FIELDS_TYPES] = None) -> None:
|
||||
self._value = v
|
||||
|
||||
def render(self) -> TV_ALLOWED_MODEL_FIELDS_TYPES:
|
||||
return _deepfreeze(self._value)
|
||||
|
||||
def render_default(self) -> TV_ALLOWED_MODEL_FIELDS_TYPES:
|
||||
return _deepfreeze(self._default_value)
|
||||
|
||||
def get_annotation(self) -> Any:
|
||||
return self._annotations
|
||||
|
||||
|
||||
class BaseMeta(type):
|
||||
def __new__(mcls, name, bases, namespace):
|
||||
# print("__NEW__ Defining:", name, "with keys:", list(namespace))
|
||||
|
||||
if len(bases) > 1:
|
||||
raise MultipleInheritanceForbidden("Multiple inheritance is not supported by dabmodel")
|
||||
elif len(bases) == 0: # base class (BaseElement)
|
||||
namespace["__DABSchema__"] = dict()
|
||||
else: # standard inheritance
|
||||
# check class tree origin
|
||||
if "__DABSchema__" not in dir(bases[0]):
|
||||
raise BrokenInheritance("__DABSchema__ not found in base class, broken inheritance chain.")
|
||||
# copy inherited schema
|
||||
namespace["__DABSchema__"] = copy(bases[0].__DABSchema__)
|
||||
|
||||
# force field without default value to be instantiated (with None)
|
||||
if "__annotations__" in namespace:
|
||||
for _funknown in [_ for _ in namespace["__annotations__"] if _ not in namespace.keys()]:
|
||||
namespace[_funknown] = None
|
||||
|
||||
# iterating new and modified fields
|
||||
modified_field: Dict[str, Any] = {}
|
||||
new_fields: Dict[str, DABField] = {}
|
||||
for _fname, _fvalue in namespace.items():
|
||||
if _fname.startswith("__"):
|
||||
pass
|
||||
elif _fname == "Constraints" and type(_fvalue) is type:
|
||||
...
|
||||
# print("FOUND Constraints")
|
||||
else:
|
||||
result._config_element = ConfigElement()
|
||||
# print(f"Parsing Field: {_fname} / {_fvalue}")
|
||||
# Modified fields
|
||||
if len(bases) == 1 and _fname in namespace["__DABSchema__"].keys():
|
||||
# print(f"Modified field: {_fname}")
|
||||
if _fname in namespace["__annotations__"]:
|
||||
raise ReadOnlyFieldAnnotation("annotations cannot be modified on derived classes")
|
||||
try:
|
||||
check_type(
|
||||
_fvalue,
|
||||
namespace["__DABSchema__"][_fname].get_annotation(),
|
||||
collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS,
|
||||
)
|
||||
except TypeCheckError as exp:
|
||||
raise InvalidFieldValue(
|
||||
f"Field <{_fname}> New Field value is not of expected type {bases[0].__annotations__[_fname]}."
|
||||
) from exp
|
||||
modified_field[_fname] = _fvalue
|
||||
# New fieds
|
||||
else:
|
||||
# print(f"New field: {_fname}")
|
||||
# print(f"type is: {type(_fvalue)}")
|
||||
# print(f"value is: {_fvalue}")
|
||||
|
||||
# searching and storing current class default-configs
|
||||
for _, method in result.__dict__.items():
|
||||
if isinstance(method, classmethod):
|
||||
if hasattr(method, "default_values_override"):
|
||||
result._config_element.default_values_override_methods[method] = None
|
||||
# check if field is annotated
|
||||
if "__annotations__" not in namespace or _fname not in namespace["__annotations__"]:
|
||||
raise NotAnnotatedField(f"Every dabmodel Fields must be annotated ({_fname})")
|
||||
|
||||
# todo: find a way to 'lock' and add restriction to a field after inheritance
|
||||
# check if annotation is allowed
|
||||
if isinstance(namespace["__annotations__"][_fname], str):
|
||||
namespace["__annotations__"][_fname] = _resolve_annotation(namespace["__annotations__"][_fname])
|
||||
|
||||
return result
|
||||
if not __check_annotation_definition__(namespace["__annotations__"][_fname]):
|
||||
raise InvalidFieldAnnotation(f"Field <{_fname}> has not an allowed or valid annotation.")
|
||||
|
||||
# print(f"annotation is: {namespace['__annotations__'][_fname]}")
|
||||
# check if value is valid
|
||||
try:
|
||||
check_type(
|
||||
_fvalue, namespace["__annotations__"][_fname], collection_check_strategy=CollectionCheckStrategy.ALL_ITEMS
|
||||
)
|
||||
except TypeCheckError as exp:
|
||||
raise InvalidFieldValue(
|
||||
f"Value of Field <{_fname}> is not of expected type {namespace['__annotations__'][_fname]}."
|
||||
) from exp
|
||||
new_fields[_fname] = DABField(_fname, _fvalue, namespace["__annotations__"][_fname])
|
||||
# namespace[_fname].add_documentation()
|
||||
|
||||
class BaseElement(
|
||||
IBaseElement,
|
||||
ABC,
|
||||
validate_assignment=True,
|
||||
# revalidate_instances="subclass-instances", # pydantic issue #10681
|
||||
validate_default=True,
|
||||
extra="forbid",
|
||||
metaclass=BaseElementMeta,
|
||||
):
|
||||
class Config:
|
||||
ignored_types = (NoInstanceMethod,)
|
||||
# removing modified fields from class (will add them back later)
|
||||
for _fname in new_fields.keys():
|
||||
del namespace[_fname]
|
||||
for _fname in modified_field.keys():
|
||||
del namespace[_fname]
|
||||
|
||||
template_id: Annotated[UUID4, DABField(..., repr=True)]
|
||||
template_short_name: Annotated[
|
||||
StrictStr, constr(strip_whitespace=True, to_lower=True, strict=True, max_length=16), DABField(..., repr=True)
|
||||
]
|
||||
template_long_name: Annotated[StrictStr | None, DABField()]
|
||||
template_description: Annotated[StrictStr | None, DABField()]
|
||||
_saved_default_value: ClassVar[dict[str, Any]]
|
||||
orig_setattr = namespace.get("__setattr__", object.__setattr__)
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def __default_values_override_hook__(cls, values: T_BaseElement_ConfigMethod_Arg) -> T_BaseElement_ConfigMethod_Arg:
|
||||
# extracting default values that were set in model fields
|
||||
cls._saved_default_value = dict()
|
||||
for field_key, field_val in cls.model_fields.items():
|
||||
assert field_val.annotation is not None, "all fields must have annotation"
|
||||
assert not runtype_issubclass(
|
||||
field_val.annotation, BaseFeature
|
||||
), "Features can only be in Appliance's features[] dict attribute"
|
||||
"""
|
||||
if field_key == "features":
|
||||
cls._saved_default_value[field_key] = dict()
|
||||
for feat_key, feat_value in field_val.items():
|
||||
cls._saved_default_value[field_key][feat_key] = feat_value.dict()
|
||||
"""
|
||||
if field_val.default != PydanticUndefined:
|
||||
cls._saved_default_value[field_key] = deepcopy(field_val.default)
|
||||
for method, _ in cls._config_element.default_values_override_methods.items():
|
||||
method.__func__(cls, cls._saved_default_value)
|
||||
|
||||
cls._default_values_override_hook__input_apply__(values)
|
||||
|
||||
return cls._saved_default_value
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def _default_values_override_hook__input_apply__(
|
||||
cls,
|
||||
values: T_BaseElement_ConfigMethod_Arg,
|
||||
): ...
|
||||
|
||||
|
||||
class BaseFeature(BaseElement, ABC):
|
||||
@NoInstanceMethod
|
||||
@classmethod
|
||||
def _default_values_override_hook__input_apply__(
|
||||
cls,
|
||||
values: T_BaseElement_ConfigMethod_Arg,
|
||||
):
|
||||
print(f"BaseFeature._default_values_override_hook__input_apply__ {cls}")
|
||||
# applying user-defined values
|
||||
for attr_key, attr_val in values.items():
|
||||
assert attr_key in cls.model_fields, f"given feature attribute does not exist ({attr_key})"
|
||||
cls._saved_default_value[attr_key] = attr_val
|
||||
print(f"BaseFeature._default_values_override_hook__input_apply__ {cls} DONE")
|
||||
|
||||
|
||||
def default_values_override(func: T_BaseElement_ConfigMethod) -> T_BaseElement_ConfigMethod:
|
||||
func = ensure_classmethod_based_on_signature(func)
|
||||
setattr(func, "default_values_override", lambda: True)
|
||||
return func
|
||||
|
||||
|
||||
T_Feature = TypeVar("T_Feature", bound=BaseFeature)
|
||||
|
||||
|
||||
def get_discriminator_value(v: Any) -> str:
|
||||
if isinstance(v, dict):
|
||||
return v.get("fruit", v.get("filling"))
|
||||
return getattr(v, "fruit", getattr(v, "filling", None))
|
||||
|
||||
|
||||
class BaseAppliance(Generic[T_Feature], BaseElement, ABC):
|
||||
|
||||
cpu_cnt: Annotated[StrictInt, DABField(1, gt=0)]
|
||||
ram_size: Annotated[ByteSize, DABField(256, gt=128)]
|
||||
swap_size: Annotated[ByteSize, DABField(200, ge=0)]
|
||||
|
||||
rootfs_size: Annotated[ByteSize, DABField(2048, ge=2048)]
|
||||
|
||||
dabinst_id: Annotated[UUID4, DABField(uuid4(), repr=True)]
|
||||
dabinst_short_name: Annotated[
|
||||
StrictStr, constr(strip_whitespace=True, to_lower=True, strict=True, max_length=16), DABField(..., repr=True)
|
||||
]
|
||||
dabinst_long_name: Annotated[StrictStr | None, DABField("")]
|
||||
dabinst_description: Annotated[StrictStr | None, DABField("")]
|
||||
dabinst_creationdate: Annotated[AwareDatetime | None, DABField(datetime.now(tz=pytz.utc))]
|
||||
|
||||
features: SerializeAsAny[dict[str, T_Feature]] = DABField({})
|
||||
|
||||
@NoInstanceMethod
|
||||
@classmethod
|
||||
def add_feature(cls, feat: T_Feature):
|
||||
cls._saved_default_value["features"][type(feat).__name__] = feat.dict()
|
||||
|
||||
@NoInstanceMethod
|
||||
@classmethod
|
||||
def del_feature(cls, type_feat: type[T_Feature]):
|
||||
del cls._saved_default_value["features"][type_feat.__name__]
|
||||
|
||||
@NoInstanceMethod
|
||||
@classmethod
|
||||
def get_feature(cls, type_feat: type[T_Feature]) -> T_Feature:
|
||||
return cls._saved_default_value["features"][type_feat.__name__]
|
||||
|
||||
@NoInstanceMethod
|
||||
@classmethod
|
||||
def _default_values_override_hook__input_apply__(
|
||||
cls,
|
||||
values: T_BaseElement_ConfigMethod_Arg,
|
||||
):
|
||||
print(f"BaseAppliance._default_values_override_hook__input_apply__ {cls}")
|
||||
# applying user-defined values
|
||||
for attr_key, attr_val in values.items():
|
||||
if attr_key == "features":
|
||||
if cls._saved_default_value["features"] is None:
|
||||
cls._saved_default_value["features"] = {}
|
||||
for feature_key, feature_val in attr_val.items():
|
||||
print(f"searching feature: {feature_key}")
|
||||
assert hasattr(cls, feature_key), f"feature not found ({feature_key})"
|
||||
cls_feature = getattr(cls, feature_key)
|
||||
assert (
|
||||
cls_feature is not None and inspect.isclass(cls_feature) and issubclass(cls_feature, BaseFeature),
|
||||
"The requested feature does not exist in the current Appliance class tree",
|
||||
)
|
||||
cls._saved_default_value["features"][feature_key] = cls_feature(**feature_val)
|
||||
def guarded_setattr(self, key, value):
|
||||
if key.startswith("_"): # allow private and dunder attrs
|
||||
return orig_setattr(self, key, value)
|
||||
# block writes after init if key is readonly
|
||||
if key in self.__DABSchema__.keys():
|
||||
if hasattr(self, key):
|
||||
raise ReadOnlyField(f"{key} is read-only")
|
||||
else:
|
||||
assert attr_key in cls.model_fields, f"given attribute does not exist ({attr_key})"
|
||||
cls._saved_default_value[attr_key] = attr_val
|
||||
print(f"BaseAppliance._default_values_override_hook__input_apply__ {cls} DONE")
|
||||
raise NewFieldForbidden(f"creating new fields is not allowed")
|
||||
|
||||
return orig_setattr(self, key, value)
|
||||
|
||||
namespace["__setattr__"] = guarded_setattr
|
||||
|
||||
cls = super().__new__(mcls, name, bases, namespace)
|
||||
|
||||
for _fname, _fvalue in modified_field.items():
|
||||
cls.__DABSchema__[_fname] = deepcopy(bases[0].__DABSchema__[_fname])
|
||||
cls.__DABSchema__[_fname].update_value(_fvalue)
|
||||
|
||||
for _fname, _fvalue in new_fields.items():
|
||||
_fvalue.add_source(cls)
|
||||
cls.__DABSchema__[_fname] = _fvalue
|
||||
|
||||
return cls
|
||||
|
||||
def __call__(cls, *args, **kw):
|
||||
obj = super().__call__(*args, **kw)
|
||||
|
||||
for _fname in cls.__DABSchema__.keys():
|
||||
setattr(obj, _fname, cls.__DABSchema__[_fname].render())
|
||||
# obj.__DABSchema__ = deepfreeze(obj.__DABSchema__)
|
||||
# setattr(obj, "__DABSchema__", deepfreeze(obj.__DABSchema__))
|
||||
return obj
|
||||
|
||||
|
||||
class BaseElement(metaclass=BaseMeta):
|
||||
pass
|
||||
|
||||
|
||||
class BaseFeature(BaseElement):
|
||||
pass
|
||||
|
||||
|
||||
class BaseAppliance(BaseElement):
|
||||
pass
|
||||
|
||||
13
src/dabmodel/tools.py
Normal file
13
src/dabmodel/tools.py
Normal file
@@ -0,0 +1,13 @@
|
||||
from uuid import UUID
|
||||
from datetime import datetime
|
||||
import json
|
||||
|
||||
|
||||
class DABJSONEncoder(json.JSONEncoder):
|
||||
def default(self, obj):
|
||||
if isinstance(obj, UUID):
|
||||
# if the obj is uuid, we simply return the value of uuid
|
||||
return obj.hex
|
||||
elif isinstance(obj, datetime):
|
||||
return str(obj)
|
||||
return json.JSONEncoder.default(self, obj)
|
||||
@@ -1,37 +0,0 @@
|
||||
from pydantic import BaseModel, SerializeAsAny
|
||||
|
||||
|
||||
class commonbase(
|
||||
BaseModel,
|
||||
revalidate_instances="subclass-instances", # toogle to generate error
|
||||
): ...
|
||||
|
||||
|
||||
class basechild(commonbase):
|
||||
test_val: int = 1
|
||||
|
||||
|
||||
class derivedchild(basechild):
|
||||
test_val2: int = 2
|
||||
|
||||
|
||||
class container(commonbase):
|
||||
|
||||
ct_child_1: dict[str, basechild] = {}
|
||||
ct_child_2: SerializeAsAny[dict[str, basechild]] = {}
|
||||
ct_child_3: dict[str, SerializeAsAny[basechild]] = {}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_val = container(
|
||||
ct_child_1={"test1": derivedchild()},
|
||||
ct_child_2={"test2": derivedchild()},
|
||||
ct_child_3={"test3": derivedchild()},
|
||||
)
|
||||
|
||||
print(test_val.model_dump_json(indent=1))
|
||||
|
||||
print(test_val.model_dump())
|
||||
assert "test_val2" not in test_val.model_dump()["ct_child_1"]["test1"]
|
||||
assert "test_val2" in test_val.model_dump()["ct_child_2"]["test2"]
|
||||
assert "test_val2" in test_val.model_dump()["ct_child_3"]["test3"]
|
||||
@@ -7,215 +7,528 @@
|
||||
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
|
||||
|
||||
import unittest
|
||||
from os import chdir
|
||||
from pathlib import Path
|
||||
|
||||
from pydantic import StrictInt, model_validator
|
||||
from pydantic.fields import Field
|
||||
import sys
|
||||
import subprocess
|
||||
from os import chdir, environ
|
||||
from pathlib import Path
|
||||
import textwrap
|
||||
from typing import List, Optional, Dict, Union, Tuple, Set, FrozenSet, TypeVar, Generic, Any, Annotated
|
||||
from pprint import pprint
|
||||
|
||||
print(__name__)
|
||||
print(__package__)
|
||||
|
||||
from src import dabmodel
|
||||
from typing import Annotated, Any
|
||||
from uuid import uuid4
|
||||
import json
|
||||
from uuid import UUID
|
||||
from datetime import datetime
|
||||
from src import dabmodel as dm
|
||||
|
||||
|
||||
testdir_path = Path(__file__).parent.resolve()
|
||||
chdir(testdir_path.parent.resolve())
|
||||
|
||||
|
||||
class UUIDEncoder(json.JSONEncoder):
|
||||
def default(self, obj):
|
||||
if isinstance(obj, UUID):
|
||||
# if the obj is uuid, we simply return the value of uuid
|
||||
return obj.hex
|
||||
elif isinstance(obj, datetime):
|
||||
return str(obj)
|
||||
return json.JSONEncoder.default(self, obj)
|
||||
class TestConfigWithoutEnabledFlag(unittest.TestCase):
|
||||
def setUp(self):
|
||||
print("\n->", unittest.TestCase.id(self))
|
||||
|
||||
def immutable_vars__test_field(self, obj: Any, name: str, default_value: Any, test_value: Any):
|
||||
# field is not in the class
|
||||
self.assertNotIn(name, dir(obj.__class__))
|
||||
# field is in the object
|
||||
self.assertIn(name, dir(obj))
|
||||
# field is in the schema
|
||||
self.assertIn(name, obj.__DABSchema__.keys())
|
||||
# field is readable
|
||||
self.assertEqual(getattr(obj, name), default_value)
|
||||
# field is read only
|
||||
with self.assertRaises(dm.ReadOnlyField):
|
||||
setattr(obj, name, test_value)
|
||||
|
||||
def test_immutable_fields(self):
|
||||
"""Testing first appliance level, and Field types (simple)"""
|
||||
|
||||
# class can be created
|
||||
class Appliance1(dm.BaseAppliance):
|
||||
StrVar: str = "default value"
|
||||
StrVar2: str = "default value2"
|
||||
VarInt: int = 12
|
||||
VarInt2: int = 21
|
||||
VarFloat: float = 12.1
|
||||
VarFloat2: float = 21.2
|
||||
VarComplex: complex = complex(3, 5)
|
||||
VarComplex2: complex = complex(8, 6)
|
||||
VarBool: bool = True
|
||||
VarBool2: bool = False
|
||||
VarBytes: bytes = bytes.fromhex("2Ef0 F1f2 ")
|
||||
VarBytes2: bytes = bytes.fromhex("2ff0 F7f2 ")
|
||||
|
||||
app1 = Appliance1()
|
||||
|
||||
self.immutable_vars__test_field(app1, "StrVar", "default value", "test")
|
||||
self.immutable_vars__test_field(app1, "StrVar2", "default value2", "test2")
|
||||
self.immutable_vars__test_field(app1, "VarInt", 12, 13)
|
||||
self.immutable_vars__test_field(app1, "VarInt2", 21, 22)
|
||||
self.immutable_vars__test_field(app1, "VarFloat", 12.1, 32)
|
||||
self.immutable_vars__test_field(app1, "VarFloat2", 21.2, 42)
|
||||
self.immutable_vars__test_field(app1, "VarComplex", complex(3, 5), complex(1, 2))
|
||||
self.immutable_vars__test_field(app1, "VarComplex2", complex(8, 6), complex(3, 2))
|
||||
self.immutable_vars__test_field(app1, "VarBool", True, False)
|
||||
self.immutable_vars__test_field(app1, "VarBool2", False, True)
|
||||
self.immutable_vars__test_field(app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 "))
|
||||
self.immutable_vars__test_field(app1, "VarBytes2", bytes.fromhex("2ff0 F7f2 "), bytes.fromhex("11f0 F1e2 "))
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: str = 12
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: int = "value"
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: float = "value"
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: complex = "value"
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: bool = "value"
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: bytes = "value"
|
||||
|
||||
def test_annotation(self):
|
||||
"""Testing first appliance level, and Field types (simple annotations)"""
|
||||
|
||||
# class can be created if annotation is a string
|
||||
class Appliance1(dm.BaseAppliance):
|
||||
StrVar: "str" = "default value"
|
||||
StrVar2: "str" = "default value2"
|
||||
VarInt: "int" = 12
|
||||
VarInt2: "int" = 21
|
||||
VarFloat: "float" = 12.1
|
||||
VarFloat2: "float" = 21.2
|
||||
VarComplex: "complex" = complex(3, 5)
|
||||
VarComplex2: "complex" = complex(8, 6)
|
||||
VarBool: "bool" = True
|
||||
VarBool2: "bool" = False
|
||||
VarBytes: "bytes" = bytes.fromhex("2Ef0 F1f2 ")
|
||||
VarBytes2: "bytes" = bytes.fromhex("2ff0 F7f2 ")
|
||||
|
||||
app1 = Appliance1()
|
||||
|
||||
self.immutable_vars__test_field(app1, "StrVar", "default value", "test")
|
||||
self.immutable_vars__test_field(app1, "StrVar2", "default value2", "test2")
|
||||
self.immutable_vars__test_field(app1, "VarInt", 12, 13)
|
||||
self.immutable_vars__test_field(app1, "VarInt2", 21, 22)
|
||||
self.immutable_vars__test_field(app1, "VarFloat", 12.1, 32)
|
||||
self.immutable_vars__test_field(app1, "VarFloat2", 21.2, 42)
|
||||
self.immutable_vars__test_field(app1, "VarComplex", complex(3, 5), complex(1, 2))
|
||||
self.immutable_vars__test_field(app1, "VarComplex2", complex(8, 6), complex(3, 2))
|
||||
self.immutable_vars__test_field(app1, "VarBool", True, False)
|
||||
self.immutable_vars__test_field(app1, "VarBool2", False, True)
|
||||
self.immutable_vars__test_field(app1, "VarBytes", bytes.fromhex("2Ef0 F1f2 "), bytes.fromhex("11f0 F1f2 "))
|
||||
self.immutable_vars__test_field(app1, "VarBytes2", bytes.fromhex("2ff0 F7f2 "), bytes.fromhex("11f0 F1e2 "))
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: "str" = 12
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: "int" = "value"
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: "float" = "value"
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: "complex" = "value"
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: "bool" = "value"
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: "bytes" = "value"
|
||||
|
||||
# class cannot be created if not annotated field
|
||||
with self.assertRaises(dm.NotAnnotatedField):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_ = "default value"
|
||||
|
||||
def test_annotated(self):
|
||||
"""Testing first appliance level, and Field types (annotated one)"""
|
||||
|
||||
# class can be created if annotation is a string
|
||||
class Appliance1(dm.BaseAppliance):
|
||||
StrVar: Annotated[str, "my string"] = "default value"
|
||||
|
||||
def test_optionnal(self):
|
||||
"""Testing first appliance level, and Field types (Optionnal annotations)"""
|
||||
|
||||
# class can be created with Optionnal (and variant)
|
||||
class Appliance1(dm.BaseAppliance):
|
||||
StrVar: Optional[str] = "default value"
|
||||
StrVar2: Optional[str]
|
||||
StrVar3: Union[None | str] = "default value"
|
||||
StrVar4: Union[None | str]
|
||||
StrVar5: None | str = "default value"
|
||||
StrVar6: None | str
|
||||
|
||||
app1 = Appliance1()
|
||||
|
||||
self.immutable_vars__test_field(app1, "StrVar", "default value", "123")
|
||||
self.immutable_vars__test_field(app1, "StrVar2", None, "123")
|
||||
self.immutable_vars__test_field(app1, "StrVar3", "default value", "123")
|
||||
self.immutable_vars__test_field(app1, "StrVar4", None, "123")
|
||||
self.immutable_vars__test_field(app1, "StrVar5", "default value", "123")
|
||||
self.immutable_vars__test_field(app1, "StrVar6", None, "123")
|
||||
|
||||
# class can be created with Optionnal (and variant), as string annotation
|
||||
class Appliance2(dm.BaseAppliance):
|
||||
StrVar: "Optional[str]" = "default value"
|
||||
StrVar2: "Optional[str]"
|
||||
StrVar3: "Union[None | str]" = "default value"
|
||||
StrVar4: "Union[None | str]"
|
||||
StrVar5: "None | str" = "default value"
|
||||
StrVar6: "None | str"
|
||||
|
||||
app2 = Appliance2()
|
||||
|
||||
self.immutable_vars__test_field(app2, "StrVar", "default value", "123")
|
||||
self.immutable_vars__test_field(app2, "StrVar2", None, "123")
|
||||
self.immutable_vars__test_field(app2, "StrVar3", "default value", "123")
|
||||
self.immutable_vars__test_field(app2, "StrVar4", None, "123")
|
||||
self.immutable_vars__test_field(app2, "StrVar5", "default value", "123")
|
||||
self.immutable_vars__test_field(app2, "StrVar6", None, "123")
|
||||
|
||||
@unittest.skip
|
||||
def test_containers__set(self):
|
||||
"""Testing first appliance level, and Field types (Set)"""
|
||||
|
||||
# class can be created with set
|
||||
class Appliance1(dm.BaseAppliance):
|
||||
testVar: Set[int] = {1, 2}
|
||||
testVar2: Set[str] = {"a", "b"}
|
||||
testVar3: Set[float] = {0.5, 0.456, 12}
|
||||
testVar4: "Set[str]" = {"a", "c"}
|
||||
testVar5: set[str] = {"a", "b"}
|
||||
testVar6: "set[str]" = {"a", "b"}
|
||||
testVar7: set[int | str] = {1, 2, "abcd", "efg"}
|
||||
|
||||
app1 = Appliance1()
|
||||
self.immutable_vars__test_field(app1, "testVar", {1, 2}, {1, 5})
|
||||
self.assertEqual(app1.testVar, {2, 1})
|
||||
self.immutable_vars__test_field(app1, "testVar2", {"a", "b"}, {"h", "c"})
|
||||
self.assertEqual(app1.testVar2, {"b", "a"})
|
||||
self.immutable_vars__test_field(app1, "testVar3", {0.5, 0.456, 12}, {0.9, 0.4156, 11})
|
||||
self.assertEqual(app1.testVar3, {0.456, 0.5, 12})
|
||||
self.immutable_vars__test_field(app1, "testVar4", {"a", "c"}, {"h", "e"})
|
||||
self.immutable_vars__test_field(app1, "testVar5", {"a", "b"}, {"h", "c"})
|
||||
self.immutable_vars__test_field(app1, "testVar6", {"a", "b"}, {"h", "c"})
|
||||
self.immutable_vars__test_field(app1, "testVar7", {1, 2, "abcd", "efg"}, {"h", "c"})
|
||||
|
||||
# must work
|
||||
sorted(app1.testVar)
|
||||
|
||||
with self.assertRaises(AttributeError):
|
||||
app1.testVar.add(3)
|
||||
|
||||
with self.assertRaises(AttributeError):
|
||||
app1.testVar4.add("coucou")
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: Set[int] = {"a"}
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: "Set[int]" = {"a"}
|
||||
|
||||
with self.assertRaises(dm.IncompletelyAnnotatedField):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: Set = {1, 2}
|
||||
|
||||
with self.assertRaises(dm.IncompletelyAnnotatedField):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: "Set" = {1, 2}
|
||||
|
||||
# Hacky part !
|
||||
# set() are randomly ordered, and typeguard can be configured to only check 1st element of containers.
|
||||
# we need to make sure it is properly configured, so we are expecting it to detect wrong type at any element
|
||||
# this is why we need to run the code multiple time with a new interpreter that will use a different random seed
|
||||
code = textwrap.dedent(
|
||||
r"""
|
||||
from src import dabmodel as dm
|
||||
from typing import Set
|
||||
try:
|
||||
class Yours(dm.BaseAppliance):
|
||||
My1: Set[int] = {99, 1, 3, "a", 5,6}
|
||||
except dm.InvalidFieldValue as ex:
|
||||
raise SystemExit(2)
|
||||
raise SystemExit(0)
|
||||
|
||||
"""
|
||||
)
|
||||
|
||||
for i in range(15):
|
||||
env = environ.copy()
|
||||
env["PYTHONHASHSEED"] = str(i)
|
||||
res = subprocess.run([sys.executable, "-c", code], env=env)
|
||||
self.assertEqual(res.returncode, 2)
|
||||
|
||||
@unittest.skip
|
||||
def test_containers__frozenset(self):
|
||||
"""Testing first appliance level, and Field types (FrozenSet)"""
|
||||
|
||||
# class can be created with set
|
||||
class Appliance1(dm.BaseAppliance):
|
||||
testVar: frozenset[int] = frozenset({1, 2})
|
||||
testVar2: frozenset[str] = frozenset({"a", "b"})
|
||||
testVar3: frozenset[float] = frozenset({0.5, 0.456, 12})
|
||||
testVar4: "frozenset[str]" = frozenset({"a", "c"})
|
||||
testVar5: FrozenSet[int] = frozenset({1, 2})
|
||||
testVar6: "FrozenSet[int]" = frozenset({1, 2})
|
||||
testVar7: FrozenSet[int | str] = frozenset({1, 2, "abcd", "efg"})
|
||||
|
||||
app1 = Appliance1()
|
||||
self.immutable_vars__test_field(app1, "testVar", {1, 2}, {1, 5})
|
||||
self.assertEqual(app1.testVar, {2, 1})
|
||||
self.immutable_vars__test_field(app1, "testVar2", {"a", "b"}, {"h", "c"})
|
||||
self.assertEqual(app1.testVar2, {"b", "a"})
|
||||
self.immutable_vars__test_field(app1, "testVar3", {0.5, 0.456, 12}, {0.9, 0.4156, 11})
|
||||
self.assertEqual(app1.testVar3, {0.456, 0.5, 12})
|
||||
self.immutable_vars__test_field(app1, "testVar4", {"a", "c"}, {"h", "e"})
|
||||
self.immutable_vars__test_field(app1, "testVar5", {1, 2}, {1, 5})
|
||||
self.immutable_vars__test_field(app1, "testVar6", {1, 2}, {1, 5})
|
||||
self.immutable_vars__test_field(app1, "testVar7", {1, 2, "abcd", "efg"}, {1, 5})
|
||||
|
||||
# must work
|
||||
sorted(app1.testVar)
|
||||
|
||||
with self.assertRaises(AttributeError):
|
||||
app1.testVar.add(3)
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: FrozenSet[int] = {"a"}
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: "FrozenSet[int]" = {"a"}
|
||||
|
||||
with self.assertRaises(dm.IncompletelyAnnotatedField):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: FrozenSet = {1, 2}
|
||||
|
||||
with self.assertRaises(dm.IncompletelyAnnotatedField):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: "FrozenSet" = {1, 2}
|
||||
|
||||
# Hacky part !
|
||||
# set() are randomly ordered, and typeguard can be configured to only check 1st element of containers.
|
||||
# we need to make sure it is properly configured, so we are expecting it to detect wrong type at any element
|
||||
# this is why we need to run the code multiple time with a new interpreter that will use a different random seed
|
||||
code = textwrap.dedent(
|
||||
r"""
|
||||
from src import dabmodel as dm
|
||||
from typing import FrozenSet
|
||||
try:
|
||||
class Yours(dm.BaseAppliance):
|
||||
My1: FrozenSet[int] = frozenset({99, 1, 3, "a", 5,6})
|
||||
except dm.InvalidFieldValue as ex:
|
||||
raise SystemExit(2)
|
||||
raise SystemExit(0)
|
||||
|
||||
"""
|
||||
)
|
||||
|
||||
for i in range(15):
|
||||
env = environ.copy()
|
||||
env["PYTHONHASHSEED"] = str(i)
|
||||
res = subprocess.run([sys.executable, "-c", code], env=env)
|
||||
self.assertEqual(res.returncode, 2)
|
||||
|
||||
def test_containers__list(self):
|
||||
"""Testing first appliance level, and Field types (List)"""
|
||||
|
||||
# class can be created with list
|
||||
class Appliance1(dm.BaseAppliance):
|
||||
testVar: List[int] = [1, 2]
|
||||
testVar2: List[str] = ["a", "b"]
|
||||
testVar3: List[float] = [0.5, 0.456, 12]
|
||||
testVar4: "List[str]" = ["a", "c"]
|
||||
testVar5: list[str] = ["a", "b"]
|
||||
testVar6: "list[str]" = ["a", "b"]
|
||||
testVar7: List[Union[int, str]] = [1, 2, 3, "one", "two", "three"]
|
||||
|
||||
app1 = Appliance1()
|
||||
# Note: lists are converted to tuples
|
||||
self.immutable_vars__test_field(app1, "testVar", (1, 2), [1, 5])
|
||||
self.immutable_vars__test_field(app1, "testVar2", ("a", "b"), ["h", "c"])
|
||||
self.immutable_vars__test_field(app1, "testVar3", (0.5, 0.456, 12), [0.9, 0.4156, 11])
|
||||
self.immutable_vars__test_field(app1, "testVar4", ("a", "c"), ["h", "e"])
|
||||
self.immutable_vars__test_field(app1, "testVar5", ("a", "b"), ["h", "c"])
|
||||
self.immutable_vars__test_field(app1, "testVar6", ("a", "b"), ["h", "c"])
|
||||
self.immutable_vars__test_field(app1, "testVar7", (1, 2, 3, "one", "two", "three"), ["h", "c"])
|
||||
|
||||
# must work
|
||||
sorted(app1.testVar)
|
||||
|
||||
with self.assertRaises(AttributeError):
|
||||
app1.testVar.append(3)
|
||||
|
||||
with self.assertRaises(AttributeError):
|
||||
app1.testVar.pop()
|
||||
|
||||
with self.assertRaises(AttributeError):
|
||||
app1.testVar.sort()
|
||||
|
||||
with self.assertRaises(AttributeError):
|
||||
app1.testVar4.append("coucou")
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: List[int] = ["a"]
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: "List[int]" = ["a"]
|
||||
|
||||
with self.assertRaises(dm.IncompletelyAnnotatedField):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: List = [1, 2]
|
||||
|
||||
with self.assertRaises(dm.IncompletelyAnnotatedField):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: "List" = [1, 2]
|
||||
|
||||
def test_containers__dict(self):
|
||||
"""Testing first appliance level, and Field types (Dict)"""
|
||||
|
||||
# class can be created with dict
|
||||
class Appliance1(dm.BaseAppliance):
|
||||
testVar: Dict[int, str] = {1: "a", 2: "b"}
|
||||
testVar2: "Dict[int, str]" = {1: "c", 99: "d"}
|
||||
|
||||
app1 = Appliance1()
|
||||
self.immutable_vars__test_field(app1, "testVar", {1: "a", 2: "b"}, {1: "", 99: "i"})
|
||||
self.immutable_vars__test_field(app1, "testVar2", {1: "c", 99: "d"}, {10: "", 50: "i"})
|
||||
|
||||
# TODO: wrap exception type
|
||||
with self.assertRaises(TypeError):
|
||||
app1.testVar[58] = "aaa"
|
||||
|
||||
# TODO: wrap exception type
|
||||
with self.assertRaises(TypeError):
|
||||
app1.testVar[1] = "ggg"
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: Dict[int, str] = {1: 64, 2: "b"}
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: "Dict[int, str]" = {1: 64, 2: "b"}
|
||||
|
||||
with self.assertRaises(dm.IncompletelyAnnotatedField):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: Dict = {1: 64, 2: "b"}
|
||||
|
||||
# annotation is parsed before the library can do anything, so the exception can only be TypeError
|
||||
with self.assertRaises(TypeError):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: Dict[int] = {1: 64, 2: "b"}
|
||||
|
||||
# annotation is parsed before the library can do anything, so the exception can only be TypeError
|
||||
with self.assertRaises(TypeError):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: "Dict[int]" = {1: 64, 2: "b"}
|
||||
|
||||
def test_containers__tuple(self):
|
||||
"""Testing first appliance level, and Field types (Tuple)"""
|
||||
|
||||
# class can be created with list
|
||||
class Appliance1(dm.BaseAppliance):
|
||||
testVar: Tuple[int, ...] = (1, 2)
|
||||
testVar2: Tuple[str, ...] = ("a", "b")
|
||||
testVar3: Tuple[float, ...] = (0.5, 0.456, 12)
|
||||
testVar4: "Tuple[str,...]" = ("a", "c")
|
||||
testVar5: tuple[str, ...] = ("a", "b")
|
||||
testVar6: "tuple[str,...]" = ("a", "b")
|
||||
# testVar7: Tuple[Union[int, str]] = (1, 2, 3, "one", "two", "three")
|
||||
|
||||
app1 = Appliance1()
|
||||
|
||||
self.immutable_vars__test_field(app1, "testVar", (1, 2), (1, 5))
|
||||
self.immutable_vars__test_field(app1, "testVar2", ("a", "b"), ("h", "c"))
|
||||
self.immutable_vars__test_field(app1, "testVar3", (0.5, 0.456, 12), (0.9, 0.4156, 11))
|
||||
self.immutable_vars__test_field(app1, "testVar4", ("a", "c"), ("h", "e"))
|
||||
self.immutable_vars__test_field(app1, "testVar5", ("a", "b"), ("h", "c"))
|
||||
self.immutable_vars__test_field(app1, "testVar6", ("a", "b"), ("h", "c"))
|
||||
# self.immutable_vars__test_field(app1, "testVar7", (1, 2, 3, "one", "two", "three"), ("h", "c"))
|
||||
|
||||
# must work
|
||||
sorted(app1.testVar)
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: Tuple[int] = "a"
|
||||
|
||||
with self.assertRaises(dm.InvalidFieldValue):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: "Tuple[int]" = "a"
|
||||
|
||||
with self.assertRaises(dm.IncompletelyAnnotatedField):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: Tuple = (1, 2)
|
||||
|
||||
with self.assertRaises(dm.IncompletelyAnnotatedField):
|
||||
|
||||
class _(dm.BaseAppliance):
|
||||
_: "Tuple" = (1, 2)
|
||||
|
||||
|
||||
class MyAppliance(dabmodel.BaseAppliance):
|
||||
app_specifi_integer_arg: Annotated[StrictInt | None, dabmodel.DABField(42)]
|
||||
# ---------- main ----------
|
||||
|
||||
class MyFeature(dabmodel.BaseFeature):
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Feature 1")
|
||||
values["template_id"] = "421d61cb-e364-46d8-9b77-ec439f1fa666"
|
||||
values["template_short_name"] = "my-feature-1"
|
||||
values["template_long_name"] = "My feature template 1 !!"
|
||||
values["template_description"] = """A very nice FEature 1"""
|
||||
|
||||
class MyFeature2(dabmodel.BaseFeature):
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Feature 2")
|
||||
values["template_id"] = "421d61cb-e364-46d8-9b77-ec439f1fa666"
|
||||
values["template_short_name"] = "my-feature-2"
|
||||
values["template_long_name"] = "My feature template 2 !!"
|
||||
values["template_description"] = """A very nice FEature 2"""
|
||||
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Appliance 1")
|
||||
print(f"!!!! {values['rootfs_size']}")
|
||||
values["template_id"] = "421d61cb-e364-46d8-9b64-ec439f1faae8"
|
||||
values["template_short_name"] = "my-app- tem 1"
|
||||
values["template_long_name"] = "My appliance template 1 !!"
|
||||
values["template_description"] = """A very nice Appliance 1"""
|
||||
values["ram_size"] = 1024
|
||||
cls.add_feature(cls.MyFeature())
|
||||
cls.add_feature(cls.MyFeature2())
|
||||
|
||||
|
||||
class MyAppliance2(MyAppliance):
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Appliance 2")
|
||||
print(f"!!!! {values['template_id']}")
|
||||
values["template_id"] = "421d61cb-e664-46d8-9b64-ec439f1fafff"
|
||||
values["template_short_name"] = "my-app- tem 2"
|
||||
values["template_long_name"] = "My appliance template 2 !!"
|
||||
values["template_description"] = """A very nice Appliance 2"""
|
||||
cls.del_feature(MyAppliance.MyFeature)
|
||||
# values["features"]["MyFeature2"].template_description = """Override feature desc"""
|
||||
|
||||
|
||||
class MyAppliance3(dabmodel.BaseAppliance):
|
||||
|
||||
class MyFeature6(MyAppliance.MyFeature):
|
||||
# testtt: Annotated[MyAppliance.MyFeature, Field(MyAppliance.MyFeature())] # error case
|
||||
test_integer: Annotated[int, dabmodel.DABField(200, ge=0)]
|
||||
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Feature 1 (modified)")
|
||||
values["template_id"] = "421d61cb-e364-46d8-9b77-ec439f1fa778"
|
||||
values["template_short_name"] = "my-feature-1-bis"
|
||||
values["test_integer"] = 666
|
||||
|
||||
class MyFeature7(dabmodel.BaseFeature):
|
||||
# testtt: Annotated[MyAppliance.MyFeature, Field(MyAppliance.MyFeature())] # error case
|
||||
test_integer_2: Annotated[int, dabmodel.DABField(759, ge=0)]
|
||||
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Feature 7")
|
||||
values["template_id"] = "421d61cb-e364-46d8-ac55-ec439f1fa778"
|
||||
values["template_short_name"] = "my-feature-7"
|
||||
values["template_long_name"] = "My appliance template 7 !!"
|
||||
values["template_description"] = """A very nice Appliance 7"""
|
||||
values["test_integer_2"] = 3844
|
||||
|
||||
# testtt: Annotated[MyAppliance.MyFeature, Field(MyAppliance.MyFeature())] # error case
|
||||
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Appliance 3")
|
||||
values["template_id"] = "421d61cb-e364-46d8-9b64-ec439f1faaaa"
|
||||
values["template_short_name"] = "my-app- tem 3"
|
||||
values["template_long_name"] = "My appliance template 3 !!"
|
||||
values["template_description"] = """A very nice Appliance 3"""
|
||||
values["ram_size"] = 3076
|
||||
print("CREATE FEATURE")
|
||||
cls.add_feature(cls.MyFeature6())
|
||||
cls.add_feature(cls.MyFeature7())
|
||||
print("!!! CONFIG Appliance 3 DONE")
|
||||
|
||||
|
||||
class MyAppliance4(MyAppliance):
|
||||
|
||||
class MyFeature8(dabmodel.BaseFeature):
|
||||
# testtt: Annotated[MyAppliance.MyFeature, Field(MyAppliance.MyFeature())] # error case (nested feature)
|
||||
# test_integer_10: Annotated[int, dabmodel.DABField(3189, ge=0, toto="tata")] # error case (extra field)
|
||||
test_integer_10: Annotated[int, dabmodel.DABField(3189, ge=0, toto="tata")]
|
||||
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Feature 8")
|
||||
values["template_id"] = "421d61cb-e364-46d8-ac55-ec4398888778"
|
||||
values["template_short_name"] = "my-feature-8"
|
||||
values["template_long_name"] = "My appliance template 8 !!"
|
||||
values["template_description"] = """A very nice Appliance 8"""
|
||||
values["test_integer_10"] = 951753
|
||||
# values["tete"] = 1 # error case (extra field in feature)
|
||||
|
||||
# testtt: Annotated[MyAppliance.MyFeature, Field(MyAppliance.MyFeature())] # error case (feature not in features[] list)
|
||||
|
||||
@dabmodel.default_values_override
|
||||
@classmethod
|
||||
def __override_config__(cls, values):
|
||||
print("!!! CONFIG Appliance 4")
|
||||
values["template_id"] = "421d1234-e364-46d8-9b64-ec439f1faaaa"
|
||||
values["template_short_name"] = "my-app-tem 4"
|
||||
values["template_long_name"] = "My appliance template 4 !!"
|
||||
values["template_description"] = """A very nice Appliance 4"""
|
||||
values["ram_size"] = 954
|
||||
print("CREATE FEATURE")
|
||||
cls.add_feature(cls.MyFeature8())
|
||||
print("!!! CONFIG Appliance 4 DONE")
|
||||
|
||||
|
||||
class TestModel(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
chdir(testdir_path.parent.resolve())
|
||||
|
||||
def test_version(self):
|
||||
self.assertNotEqual(dabmodel.__version__, "?.?.?")
|
||||
|
||||
def test_model(self):
|
||||
|
||||
feature1 = MyAppliance.MyFeature()
|
||||
print(feature1)
|
||||
print(MyAppliance.MyFeature)
|
||||
print(MyAppliance.MyFeature.__name__)
|
||||
print(MyAppliance.MyFeature.__class__)
|
||||
print("==")
|
||||
print(feature1.model_dump_json(indent=1))
|
||||
|
||||
app = MyAppliance(dabinst_short_name="my-app-1", app_specifi_integer_arg=123)
|
||||
app2 = MyAppliance2(dabinst_short_name="my-app-2", app_specifi_integer_arg=654)
|
||||
app3 = MyAppliance3(dabinst_short_name="my-app-3", template_description="FORCED")
|
||||
|
||||
print(app.model_dump_json(indent=1))
|
||||
print(app2.model_dump_json(indent=1))
|
||||
print(app3.model_dump_json(indent=1))
|
||||
|
||||
app3 = MyAppliance3(dabinst_short_name="my-app-3", template_description="FORCED")
|
||||
tmp_json = app3.dict()
|
||||
tmp_json["features"]["MyFeature7"]["test_integer_2"] = 123
|
||||
print(tmp_json)
|
||||
recreated_obj = MyAppliance3.model_validate_json(json.dumps(tmp_json, cls=UUIDEncoder))
|
||||
print(recreated_obj)
|
||||
print(recreated_obj.model_dump_json(indent=1))
|
||||
|
||||
app4 = MyAppliance4(dabinst_short_name="my-app-4", template_description="FORCED2")
|
||||
tmp_json = app4.dict()
|
||||
tmp_json["features"]["MyFeature"]["template_description"] = "blablabla"
|
||||
tmp_json["features"]["MyFeature2"]["template_description"] = "blablabla2"
|
||||
print(tmp_json)
|
||||
recreated_obj = MyAppliance4.model_validate_json(json.dumps(tmp_json, cls=UUIDEncoder))
|
||||
print(recreated_obj)
|
||||
print(recreated_obj.model_dump_json(indent=1))
|
||||
|
||||
# tmp_json["non-existing"] = "test" # error case
|
||||
# tmp_json["features"]["non-existing"] = "test" # error case
|
||||
# tmp_json["features"]["MyFeature"]["132"] = "test" # error case
|
||||
|
||||
recreated_obj = MyAppliance4.model_validate_json(json.dumps(tmp_json, cls=UUIDEncoder))
|
||||
|
||||
# app3.add_feature(MyAppliance.MyFeature()) # error case (add_feature not callable from instance)
|
||||
|
||||
for name in globals().keys():
|
||||
print(name)
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user