Compare commits
6 Commits
0.0.1.post
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b485fbf27 | ||
|
|
fdfda6bd1d | ||
|
|
2ce2855c61 | ||
|
|
4fe6918045 | ||
|
|
130a8c6319 | ||
|
|
9e68b35586 |
@@ -14,44 +14,63 @@ class DataValidation(AnnotationTrigger):
|
||||
self._root = value
|
||||
|
||||
def init_trigger(self) -> None:
|
||||
self._data_id = 0
|
||||
pass
|
||||
|
||||
def _get_new_id(self) -> int:
|
||||
self._data_id = self._data_id + 1
|
||||
return self._data_id
|
||||
def _new_branch(self, ctx: AnnotationWalkerCtx):
|
||||
ns = ctx.ns(self)
|
||||
if ctx.parent:
|
||||
pns = ctx.parent.ns(self)
|
||||
ns["current_branch_id"] = pns["current_branch_id"] + 1
|
||||
else:
|
||||
ns["current_branch_id"] = 1
|
||||
|
||||
ns["nb_leaf"] = 0
|
||||
ns["leaf_status"] = dict()
|
||||
ns["leak_origin_branch"] = dict()
|
||||
|
||||
def _new_leaf(self, ctx: AnnotationWalkerCtx):
|
||||
ns = ctx.ns(self)
|
||||
if ctx.parent:
|
||||
pns = ctx.parent.ns(self)
|
||||
pns["nb_leaf"] = pns["nb_leaf"] + 1
|
||||
ns["current_leaf_id"] = pns["nb_leaf"]
|
||||
ns["leak_origin_branch"][ns["current_leaf_id"]] = pns["current_branch_id"]
|
||||
else:
|
||||
ns["nb_leaf"] = 1
|
||||
ns["current_leaf_id"] = 1
|
||||
ns["leak_origin_branch"][1] = ns["current_branch_id"]
|
||||
|
||||
ns["leaf_status"][ns["current_leaf_id"]] = True
|
||||
|
||||
def process_enter(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
print("")
|
||||
print(f"process_enter ({id(ctx)}): {ctx.node_role} / {ctx.edge_token}")
|
||||
# print(f"process_enter ({id(ctx)}): {ctx.node_role} / {ctx.edge_token}")
|
||||
# print(f"arg_index: {ctx.arg_index}")
|
||||
ns = ctx.ns(self)
|
||||
if not ctx.parent:
|
||||
print("= ROOT NODE =")
|
||||
# print("= ROOT NODE =")
|
||||
ns["cursors"] = [self._root]
|
||||
ns["next_cursors"] = []
|
||||
ns["current_branch_id"] = self._get_new_id()
|
||||
ns["current_leaf_id"] = self._get_new_id()
|
||||
ns["leaf_status"] = dict()
|
||||
ns["leaf_status"][ns["current_leaf_id"]] = True
|
||||
ns["leak_origin_branch"] = dict()
|
||||
ns["leak_origin_branch"][ns["current_leaf_id"]] = ns["current_branch_id"]
|
||||
self._new_branch(ctx)
|
||||
self._new_leaf(ctx)
|
||||
|
||||
else:
|
||||
pns = ctx.parent.ns(self)
|
||||
ns["cursors"] = pns["next_cursors"]
|
||||
|
||||
if ctx.node_role == NodeRole.BRANCH:
|
||||
ns["current_leaf_id"] = self._get_new_id()
|
||||
ns["leaf_status"][ns["current_leaf_id"]] = True
|
||||
ns["leak_origin_branch"][ns["current_leaf_id"]] = ns["current_branch_id"]
|
||||
print(f"- NEW leaf ({ns['current_leaf_id']}), from branch ({ns['current_branch_id']})")
|
||||
self._new_leaf(ctx)
|
||||
print(
|
||||
f"- NEW leaf ({ns['current_leaf_id']}), from branch ({ns['current_branch_id']})"
|
||||
)
|
||||
|
||||
print(f"Branch ctx: < B{ns['current_branch_id'] } : L{ns['current_leaf_id'] } >")
|
||||
print(f"DATA to check: {ns['cursors'] }")
|
||||
|
||||
def process_union(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
print(f"process_union ({id(ctx)})")
|
||||
ns = ctx.ns(self)
|
||||
ns["current_branch_id"] = self._get_new_id()
|
||||
self._new_branch(ctx)
|
||||
print(f"+ NEW branch ({ns['current_branch_id']})")
|
||||
|
||||
def process_list(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
@@ -79,7 +98,7 @@ class DataValidation(AnnotationTrigger):
|
||||
ns["branch_is_valid"][path_id] = ns["branch_is_valid"]
|
||||
|
||||
def process_simple(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
print(f"??? process_allowed ({id(ctx)})")
|
||||
# print(f"??? process_simple ({id(ctx)})")
|
||||
ns = ctx.ns(self)
|
||||
return
|
||||
|
||||
@@ -89,7 +108,7 @@ class DataValidation(AnnotationTrigger):
|
||||
ns["candidates"].extend(remaining)
|
||||
|
||||
def process_exit(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
print(f"!!! process_exit ({id(ctx)})")
|
||||
# print(f"!!! process_exit ({id(ctx)})")
|
||||
ns = ctx.ns(self)
|
||||
print("")
|
||||
|
||||
@@ -137,13 +156,17 @@ class LAMSchemaValidation(AnnotationTrigger):
|
||||
print("process_union")
|
||||
print(ctx.args)
|
||||
if (len(ctx.args) != 2) or (type(None) not in list(ctx.args)):
|
||||
raise UnsupportedFieldType("Union[] is only supported to implement Optional[] (takes 2 parameters, including None)")
|
||||
raise UnsupportedFieldType(
|
||||
"Union[] is only supported to implement Optional[] (takes 2 parameters, including None)"
|
||||
)
|
||||
return None
|
||||
|
||||
def process_dict(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
print("process_dict")
|
||||
if len(ctx.args) != 2:
|
||||
raise IncompletelyAnnotatedField(f"Dict Annotation requires 2 inner definitions: {ctx.origin}")
|
||||
raise IncompletelyAnnotatedField(
|
||||
f"Dict Annotation requires 2 inner definitions: {ctx.origin}"
|
||||
)
|
||||
if not ctx.args[0] in ctx.allowed_types:
|
||||
raise IncompletelyAnnotatedField(f"Dict Key must be simple builtin: {ctx.origin}")
|
||||
return None
|
||||
@@ -169,8 +192,12 @@ class LAMSchemaValidation(AnnotationTrigger):
|
||||
def process_allowed(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
print("process_allowed")
|
||||
if ctx.origin is type(None) or ctx.origin is None:
|
||||
if ctx.parent is None or not (ctx.parent.origin is Union or ctx.parent.origin is UnionType):
|
||||
raise IncompletelyAnnotatedField(f"None is only accepted with Union, to implement Optional[]")
|
||||
if ctx.parent is None or not (
|
||||
ctx.parent.origin is Union or ctx.parent.origin is UnionType
|
||||
):
|
||||
raise IncompletelyAnnotatedField(
|
||||
f"None is only accepted with Union, to implement Optional[]"
|
||||
)
|
||||
return None
|
||||
|
||||
def process_exit(self, ctx: AnnotationWalkerCtx) -> None | TriggerResult:
|
||||
|
||||
Reference in New Issue
Block a user