chore: remove useless data

feature: add lz4 compressor
refactoring: make compressor configurable
feature: add CLI interface and configure int in .toml
feature: add logging using loguru
This commit is contained in:
cclecle
2024-03-29 02:51:49 +00:00
parent 9f4f2bbba1
commit 0ff9f442d2
12 changed files with 655 additions and 140 deletions

View File

@@ -38,7 +38,8 @@ dependencies = [
'webdavclient3==1.*',
'pydantic==2.*',
'typed-argument-parser==1.*',
'loguru==0.7.*'
'loguru==0.7.*',
'lz4'
]
dynamic = ["version"]
@@ -57,6 +58,18 @@ where = ["src"]
module = "webdav3.client"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "webdav3.exceptions"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "lz4"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "lz4.frame"
ignore_missing_imports = true
[tool.coverage.run]
cover_pylib = false
branch = true
@@ -78,13 +91,13 @@ Documentation = "https://chacha.ddns.net/mkdocs-web/chacha/dabdatasync/master/
Tracker = "https://chacha.ddns.net/gitea/chacha/dabdatasync/issues"
[project.optional-dependencies]
test = ["chacha_cicd_helper"]
test = ["chacha_cicd_helper","contexttimer"]
coverage-check = ["chacha_cicd_helper"]
complexity-check = ["chacha_cicd_helper"]
quality-check = ["chacha_cicd_helper"]
type-check = ["chacha_cicd_helper"]
doc-gen = ["chacha_cicd_helper"]
# [project.scripts]
# my-script = "my_package.module:function"
[project.scripts]
dabdatasync = "dabdatasync.__main__:CLI"

View File

@@ -11,6 +11,18 @@ Main module __init__ file.
"""
from .__metadata__ import __version__, __Summuary__, __Name__
from .datasync import I_DataSync, DataSync_Factory
from .datasync import A_DataSync, DataSync_Factory
from .datasync_nextcloud import C_DataSync_NextCloud
from .exceptions import DataSyncException, DataSyncException_InvalidManifest
from .exceptions import (
DataSyncException,
DataSyncException_InvalidManifest,
DataSyncException_NoConcreteRecordClassFound,
DataSyncException_RemoteDataNotFound,
DataSyncException_NoValidServiceFound,
DataSyncException_ServiceNotFound,
DataSyncException_TooManyServiceFound,
DataSyncException_CompressorNotFound,
DataSyncException_TooManyCompressorFound,
)
from .compressors import DataSync_Compressors

View File

@@ -1,9 +1,9 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# pyGameCFG(c) by chacha
# dabdatasync (c) by chacha
#
# pyGameCFG is licensed under a
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
@@ -11,28 +11,52 @@
"""CLI interface module"""
from __future__ import annotations
from typing import Literal, cast, Union
from typing import cast, Union, Optional
import sys
from tap import Tap
from . import __Summuary__, __Name__
from . import datasync
from . import exceptions
class dabdatasync_args_PullData(Tap):
"""PullData CLI arg subparser"""
class dabdatasync_args_PushData(Tap):
"""PushData CLI arg subparser"""
class dabdatasync_args_GetServices(Tap):
"""GetServices CLI arg subparser"""
class dabdatasync_args_WipeLocalData(Tap):
"""WipeLocalData CLI arg subparser"""
class dabdatasync_args_WipeRemoteData(Tap):
class dabdatasync_args_service_abstract(Tap):
"""service abstract CLI arg subparser"""
service: Optional[str] = None
def configure(self) -> None:
self.add_argument("--service")
class dabdatasync_args_service_compress_abstract(dabdatasync_args_service_abstract):
"""service compressor abstract CLI arg subparser"""
compressor: Optional[str] = None
def configure(self) -> None:
super().configure()
self.add_argument("--compressor")
class dabdatasync_args_PullData(dabdatasync_args_service_compress_abstract):
"""PullData CLI arg subparser"""
class dabdatasync_args_PushData(dabdatasync_args_service_compress_abstract):
"""PushData CLI arg subparser"""
class dabdatasync_args_WipeRemoteData(dabdatasync_args_service_abstract):
"""WipeRemoteData CLI arg subparser"""
@@ -45,17 +69,18 @@ class dabdatasync_args(Tap):
self.add_argument("-v", "--verbosity", action="count", help="increase output verbosity")
self.add_subparsers(dest="command", help="command type", required=True)
self.add_subparser("GetServices", dabdatasync_args_GetServices, help="Get registered services list")
self.add_subparser("PullData", dabdatasync_args_PullData, help="Pull data from the service")
self.add_subparser("PushData", dabdatasync_args_PushData, help="Push data to the service")
self.add_subparser("WipeLocalData", dabdatasync_args_WipeLocalData, help="Wipe local data")
self.add_subparser("WipeRemoteData", dabdatasync_args_WipeRemoteData, help="Wipe service data")
self.add_subparser("WipeRemoteData", dabdatasync_args_WipeRemoteData, help="Wipe remote service data")
def process_args(self) -> None:
"""dynamically add self.command to avoid conflict with Tap/argparse while keep pylint happy"""
self.command: Union[str, None] = cast(Union[str, None], self.command) # pylint: disable=attribute-defined-outside-init
def fct_main(i_args: list[str]) -> None:
def fct_main(i_args: list[str]) -> None: # pylint: disable=too-many-branches,too-complex
"""CLI main function"""
parser: dabdatasync_args = dabdatasync_args(prog=__Name__, description=__Summuary__)
@@ -64,18 +89,56 @@ def fct_main(i_args: list[str]) -> None:
if args.verbosity:
pass
datasync = datasync.DataSync_Factory.get_DataSync()
dabdatasync = datasync.DataSync_Factory.get_DataSync()
if len(dabdatasync) == 0:
raise exceptions.DataSyncException_NoValidServiceFound("No valid service found")
if args.command == "GetServices":
for service in dabdatasync:
print(service.service_name)
return
if args.command == "WipeLocalData":
dabdatasync[0].wipe_local_data()
return
selected_dabdatasync: datasync.A_DataSync
if args.command in ["PullData", "PushData", "WipeRemoteData"]:
requested_service = cast(dabdatasync_args_service_abstract, args).service # pylint: disable=no-member
if requested_service:
services = [_ for _ in dabdatasync if type(_).service_name == requested_service]
if len(services) == 0:
raise exceptions.DataSyncException_ServiceNotFound()
if len(services) == 1:
selected_dabdatasync = services[0]
else:
raise exceptions.DataSyncException_TooManyServiceFound()
else:
selected_dabdatasync = dabdatasync[0]
if args.command in ["PullData", "PushData"]:
compressor = cast(dabdatasync_args_service_compress_abstract, args).compressor # pylint: disable=no-member
if compressor:
selected_dabdatasync.set_compressor(compressor)
if args.command == "PullData":
datasync.read_data()
elif args.command == "PushData":
datasync.write_data()
elif args.command == "WipeLocalData":
datasync.wipe_local_data()
elif args.command == "WipeRemoteData":
datasync.wipe_remote_data()
else:
raise RuntimeError("Invalid argument")
selected_dabdatasync.pull_data()
return
if args.command == "PushData":
selected_dabdatasync.push_data()
return
if args.command == "WipeRemoteData":
selected_dabdatasync.wipe_remote_data()
return
raise RuntimeError("Invalid argument")
def CLI():
"""wrapper for .toml declared script"""
fct_main(sys.argv)
if __name__ == "__main__":

View File

@@ -1,46 +1,125 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
Compressor interface and implementation
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
import os
import tarfile
from io import BytesIO
from typing import TYPE_CHECKING
import tarfile
import lz4.frame
from loguru import logger
from .exceptions import DataSyncException_CompressorNotFound, DataSyncException_TooManyCompressorFound
if TYPE_CHECKING:
from typing import IO
from loguru import logger
class A_DataSync_Compressor(ABC):
"""abstract compressor class"""
compressor_name: str = "Abtract"
suffix: str
compressor_name: str = "ABSTRACT"
@classmethod
def compress(cls, path_in: Path, file_out: IO):
"""compress method"""
logger.debug(f"compressing <{path_in}> data using <{cls.compressor_name}> to <{file_out.name}>")
cls._impl_compress(path_in, file_out)
@classmethod
@abstractmethod
def compress(self, path_in: Path, file_out: IO):
def _impl_compress(cls, path_in: Path, file_out: IO):
"""compress method - virtual"""
@classmethod
def uncompress(cls, path_in: Path, path_out: Path):
"""uncompress method"""
logger.debug(f"uncompressing <{path_in}> data using <{cls.compressor_name}> to <{path_out}>")
cls._impl_uncompress(path_in, path_out)
@classmethod
@abstractmethod
def decompress(self, path_in: Path, path_out: Path):
"""decompress method - virtual"""
def _impl_uncompress(cls, path_in: Path, path_out: Path):
"""uncompress method - virtual"""
class DataSync_Compressor_targz(A_DataSync_Compressor):
class DataSync_Compressors:
"""compressers container/factory class"""
_availables: list[type[A_DataSync_Compressor]] = []
@classmethod
def register(cls, _cls: type[A_DataSync_Compressor]) -> type[A_DataSync_Compressor]:
"""register a new compressor"""
cls._availables.append(_cls)
return _cls
@classmethod
def get(cls, compressor_name: str) -> type[A_DataSync_Compressor]:
"""get a specific compressor"""
print([_.compressor_name for _ in cls._availables])
print(compressor_name)
found = [_ for _ in cls._availables if _.compressor_name == compressor_name]
if len(found) == 0:
raise DataSyncException_CompressorNotFound()
if len(found) == 1:
return found[0]
raise DataSyncException_TooManyCompressorFound()
@DataSync_Compressors.register
class DataSync_Compressor__tar_gz(A_DataSync_Compressor):
"""Concrete compressor class - .tar.gz compressor"""
compressor_name: str = "targz"
suffix: str = ".tar.gz"
compressor_name: str = "tar_gz"
def compress(self, path_in: Path, file_out: IO):
@classmethod
def _impl_compress(cls, path_in: Path, file_out: IO):
"""compress method - .tar.gz concrete"""
logger.info(f"compressing using {self.compressor_name}")
with tarfile.open(fileobj=file_out, mode="w:gz") as tar:
tar.add(path_in, arcname=os.path.basename(path_in))
def decompress(self, path_in: Path, path_out: Path):
@classmethod
def _impl_uncompress(cls, path_in: Path, path_out: Path):
"""uncompressing method - .tar.gz concrete"""
logger.info(f"uncompressing using {self.compressor_name}")
with tarfile.open(path_in, "r") as tar:
with tarfile.open(path_in, mode="r:gz") as tar:
tar.extractall(path_out)
@DataSync_Compressors.register
class DataSync_Compressor__tar_lz4(A_DataSync_Compressor):
"""Concrete compressor class - .tar.lz4 compressor"""
suffix: str = ".tar.lz4"
compressor_name: str = "tar_lz4"
@classmethod
def _impl_compress(cls, path_in: Path, file_out: IO):
"""compress method - .tar.lz4 concrete"""
with BytesIO() as memBuff:
with tarfile.open(fileobj=memBuff, mode="w:") as tar:
tar.add(path_in, arcname=os.path.basename(path_in))
memBuff.seek(0)
file_out.write(lz4.frame.compress(memBuff.read()))
@classmethod
def _impl_uncompress(cls, path_in: Path, path_out: Path):
"""uncompressing method - .tar.lz4 concrete"""
with open(path_in, "rb") as file_in, BytesIO() as memBuff:
memBuff.write(lz4.frame.decompress(file_in.read()))
memBuff.seek(0)
with tarfile.open(fileobj=memBuff, mode="r:") as tar:
tar.extractall(path_out)

View File

@@ -1,7 +0,0 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.

View File

@@ -1,3 +1,14 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
Nextcloud abstract interface
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from uuid import UUID
@@ -5,47 +16,37 @@ from pathlib import Path
import os
from tempfile import NamedTemporaryFile, TemporaryDirectory
import json
from typing import final, TYPE_CHECKING
if TYPE_CHECKING:
from typing import Optional, IO, Any, Self
from loguru import logger
from .records import A_DataSync_Record, DataSync_Record_Factory
from .compressors import A_DataSync_Compressor, DataSync_Compressor_targz
from .compressors import A_DataSync_Compressor, DataSync_Compressor__tar_gz, DataSync_Compressors
from .exceptions import DataSyncException_RemoteDataNotFound
if TYPE_CHECKING:
from typing import Optional, IO, Any, Self
class I_DataSync(ABC):
class A_DataSync(ABC):
"""Abstract DataSync class"""
service_name: str = "Abtract"
manifest_path: str = "/opt/pyDABFactoryAppliance/Manifest.json"
cls_compressor: type[A_DataSync_Compressor] = DataSync_Compressor_targz
service_name: str = "ABSTRACT"
@classmethod
@final
def get_manifest_data(cls) -> dict[str, Any]:
"""tool method to get manifest"""
with open(cls.manifest_path, encoding="utf-8") as f_DAB_manifest:
return json.load(f_DAB_manifest)
@classmethod
@final
def try_get_instance(cls, manifest: dict[str, Any]) -> Self | None:
def try_get_instance(cls, manifest: dict[str, Any], cls_compressor: type[A_DataSync_Compressor]) -> Self | None:
"""try to get an instance of a concrete class"""
if cls.test_applicable(manifest):
return cls(manifest)
return cls(manifest, cls_compressor)
return None
def __init__(self, manifest: dict[str, Any]) -> None:
def __init__(self, manifest: dict[str, Any], cls_compressor: type[A_DataSync_Compressor]) -> None:
self.connected: bool = False
self.compressor: A_DataSync_Compressor = type(self).cls_compressor()
self.manifest: dict[str, Any] = manifest
self.app_id: UUID = UUID(manifest["APP_ID"])
self.ar_datasync_record: list[A_DataSync_Record] = []
self._cls_compressor: type[A_DataSync_Compressor] = cls_compressor
self._manifest: dict[str, Any] = manifest
self._app_id: UUID = UUID(manifest["APP_ID"])
self._ar_datasync_record: list[A_DataSync_Record] = []
if "FSSYNC_RECORD" in manifest["Args"]:
for record in manifest["Args"]["FSSYNC_RECORD"]["value"]:
record = DataSync_Record_Factory.get_C_DataSync_Record(
@@ -54,7 +55,15 @@ class I_DataSync(ABC):
record["value"]["value"]["value"],
)
assert isinstance(record, A_DataSync_Record)
self.ar_datasync_record.append(record)
self._ar_datasync_record.append(record)
def set_compressor(self, compressor_name: str) -> None:
"""set compressor to be used"""
self._cls_compressor = DataSync_Compressors.get(compressor_name)
def get_datasync_records(self) -> list[A_DataSync_Record]:
"""get list of records"""
return self._ar_datasync_record
@classmethod
def test_applicable(cls, manifest: dict[str, Any]) -> bool:
@@ -73,10 +82,10 @@ class I_DataSync(ABC):
def connect(self) -> None:
"""connect to the service"""
if not self.connected:
logger.info(f"connection to service {self.service_name}")
logger.info(f"connection to service <{self.service_name}>")
self._impl_connect()
self.connected = True
logger.info(f"connection done")
logger.info("connection done")
@abstractmethod
def _impl_connect(self) -> None:
@@ -84,17 +93,17 @@ class I_DataSync(ABC):
def pull_data(self) -> None:
"""pull data from the service"""
logger.info(f"pulling data from service {self.service_name}")
logger.info(f"pulling data from service <{self.service_name}>")
self.connect()
with TemporaryDirectory() as tmpdir:
for datasync_record in self.ar_datasync_record:
logger.info(f"pulling record {datasync_record.name}")
for datasync_record in self._ar_datasync_record:
logger.info(f"pulling record <{datasync_record.name}>")
try:
self._impl_pull_data(Path(datasync_record.name + self.compressor.suffix), Path(tmpdir))
datasync_record.decompress(self.compressor, Path(tmpdir) / (datasync_record.name + self.compressor.suffix))
self._impl_pull_data(Path(datasync_record.name + self._cls_compressor.suffix), Path(tmpdir))
datasync_record.uncompress(self._cls_compressor, Path(tmpdir) / (datasync_record.name + self._cls_compressor.suffix))
except DataSyncException_RemoteDataNotFound:
logger.warning(f"remote record file not found {datasync_record.name}")
logger.info(f"done")
logger.warning(f"remote record file not found <{datasync_record.name}>")
logger.info("done")
@abstractmethod
def _impl_pull_data(self, file_in: Path, file_out: Path) -> None:
@@ -102,21 +111,21 @@ class I_DataSync(ABC):
def push_data(self) -> None:
"""push data to the service"""
logger.info(f"pushing data to service {self.service_name}")
logger.info(f"pushing data to service <{self.service_name}>")
self.connect()
self._impl_wipe_remote_data()
for datasync_record in self.ar_datasync_record:
logger.info(f"pushing record {datasync_record.name}")
for datasync_record in self._ar_datasync_record:
logger.info(f"pushing record <{datasync_record.name}>")
try:
with NamedTemporaryFile("wb", suffix=self.compressor.suffix, delete=False) as tmp_file:
datasync_record.compress(self.compressor, tmp_file)
with NamedTemporaryFile("wb", suffix=self._cls_compressor.suffix, delete=False) as tmp_file:
datasync_record.compress(self._cls_compressor, tmp_file)
tmp_file.seek(0)
tmp_file.close()
self._impl_push_data(datasync_record.name + "".join(Path(tmp_file.name).suffixes), tmp_file)
finally:
os.unlink(tmp_file.name)
logger.info(f"done")
logger.info("done")
@abstractmethod
def _impl_push_data(self, record_name: str, file_in: IO) -> None:
@@ -124,14 +133,14 @@ class I_DataSync(ABC):
def wipe_remote_data(self) -> None:
"""wipe data on the service"""
logger.info(f"wiping remote data on service {self.service_name}")
logger.info(f"wiping remote data on service <{self.service_name}>")
self.connect()
self._impl_wipe_remote_data()
def wipe_local_data(self) -> None:
"""wipe local data"""
logger.info(f"wiping local data")
for datasync_record in self.ar_datasync_record:
logger.info("wiping local data")
for datasync_record in self._ar_datasync_record:
datasync_record.wipe()
@abstractmethod
@@ -142,21 +151,29 @@ class I_DataSync(ABC):
class DataSync_Factory:
"""DataSync Factory"""
ar_cls_DataSync: set[type[I_DataSync]] = set()
ar_cls_DataSync: set[type[A_DataSync]] = set()
manifest_path: str = "/opt/pyDABFactoryAppliance/Manifest.json"
cls_compressor: type[A_DataSync_Compressor] = DataSync_Compressor__tar_gz
@classmethod
def get_DataSync(cls) -> list[I_DataSync]:
def get_manifest_data(cls) -> dict[str, Any]:
"""tool method to get manifest"""
with open(cls.manifest_path, encoding="utf-8") as f_DAB_manifest:
return json.load(f_DAB_manifest)
@classmethod
def get_DataSync(cls) -> list[A_DataSync]:
"""get and configure a DataSync Concrete class instance"""
ar_datasync: list[I_DataSync] = []
manifest = I_DataSync.get_manifest_data()
ar_datasync: list[A_DataSync] = []
manifest = cls.get_manifest_data()
for cls_DataSync in cls.ar_cls_DataSync:
if res := cls_DataSync.try_get_instance(manifest):
if res := cls_DataSync.try_get_instance(manifest, cls.cls_compressor):
res.configure()
ar_datasync.append(res)
return ar_datasync
@classmethod
def register(cls, _cls: type[I_DataSync]) -> type[I_DataSync]:
def register(cls, _cls: type[A_DataSync]) -> type[A_DataSync]:
"""decorator to register a concrete class to the factory"""
cls.ar_cls_DataSync.add(_cls)
return _cls

View File

@@ -1,27 +1,40 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
Nextcloud datasync implementation
"""
from __future__ import annotations
from pathlib import Path
import os
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any, IO
from webdav3.client import Client as webdav3_Client
from webdav3.exceptions import RemoteResourceNotFound as webdav3_RemoteResourceNotFound
from .datasync import I_DataSync, DataSync_Factory
from .datasync import A_DataSync, DataSync_Factory
from .compressors import A_DataSync_Compressor
from .exceptions import DataSyncException_InvalidManifest, DataSyncException_RemoteDataNotFound
from .utils import urljoin
if TYPE_CHECKING:
from typing import Any, IO
@DataSync_Factory.register
class C_DataSync_NextCloud(I_DataSync):
class C_DataSync_NextCloud(A_DataSync):
"""Concrete DataSync class - Nextcloud"""
service_name: str = "Nextcloud"
def __init__(self, manifest: dict[Any, Any]) -> None:
super().__init__(manifest)
def __init__(self, manifest: dict[Any, Any], cls_compressor: type[A_DataSync_Compressor]) -> None:
super().__init__(manifest, cls_compressor)
self.nextcloud_address: str
self.nextcloud_user: str
self.nextcloud_password: str
@@ -41,20 +54,20 @@ class C_DataSync_NextCloud(I_DataSync):
def _impl_configure(self) -> None:
"""configure the class instance - Nextcloud concrete implementation"""
if "FSSync_NextCloud_Address" in self.manifest["Args"]:
self.nextcloud_address = self.manifest["Args"]["FSSync_NextCloud_Address"]["value"]
if "FSSync_NextCloud_Address" in self._manifest["Args"]:
self.nextcloud_address = self._manifest["Args"]["FSSync_NextCloud_Address"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_User" in self.manifest["Args"]:
self.nextcloud_user = self.manifest["Args"]["FSSync_NextCloud_User"]["value"]
if "FSSync_NextCloud_User" in self._manifest["Args"]:
self.nextcloud_user = self._manifest["Args"]["FSSync_NextCloud_User"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_Password" in self.manifest["Args"]:
self.nextcloud_password = self.manifest["Args"]["FSSync_NextCloud_Password"]["value"]
if "FSSync_NextCloud_Password" in self._manifest["Args"]:
self.nextcloud_password = self._manifest["Args"]["FSSync_NextCloud_Password"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_Path" in self.manifest["Args"]:
self.nextcloud_path = str(Path(self.manifest["Args"]["FSSync_NextCloud_Path"]["value"]) / Path(str(self.app_id))).replace(
if "FSSync_NextCloud_Path" in self._manifest["Args"]:
self.nextcloud_path = str(Path(self._manifest["Args"]["FSSync_NextCloud_Path"]["value"]) / Path(str(self._app_id))).replace(
os.sep, "/"
)
@@ -83,8 +96,8 @@ class C_DataSync_NextCloud(I_DataSync):
self.client.download_sync(
remote_path=str(self.nextcloud_path / file_in).replace(os.sep, "/"), local_path=str(file_out / file_in).replace(os.sep, "/")
)
except webdav3_RemoteResourceNotFound:
raise DataSyncException_RemoteDataNotFound(webdav3_RemoteResourceNotFound)
except webdav3_RemoteResourceNotFound as exc:
raise DataSyncException_RemoteDataNotFound from exc
def _impl_push_data(self, record_name: str, file_in: IO) -> None:
"""push data to the remote service - Nextcloud concrete implementation"""

View File

@@ -1,3 +1,16 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
Exception declaration
"""
class DataSyncException(Exception):
"""generic datasync exception class"""
@@ -12,3 +25,23 @@ class DataSyncException_NoConcreteRecordClassFound(DataSyncException):
class DataSyncException_RemoteDataNotFound(DataSyncException):
"""specific datasync exception class - Remote Data Not Found"""
class DataSyncException_NoValidServiceFound(DataSyncException):
"""specific datasync exception class - Remote Valid Service Found"""
class DataSyncException_ServiceNotFound(DataSyncException):
"""specific datasync exception class - Service Not Found"""
class DataSyncException_TooManyServiceFound(DataSyncException):
"""specific datasync exception class - Too Many Service Found"""
class DataSyncException_CompressorNotFound(DataSyncException):
"""specific datasync exception class - Compressor Not Found"""
class DataSyncException_TooManyCompressorFound(DataSyncException):
"""specific datasync exception class - Too Many Compressor Found"""

View File

@@ -1,3 +1,14 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
datasync records description and implementation
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
@@ -6,14 +17,15 @@ import shutil
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from typing import IO
from pydantic import BaseModel
from .compressors import A_DataSync_Compressor
from .exceptions import DataSyncException_NoConcreteRecordClassFound
if TYPE_CHECKING:
from typing import IO
class A_DataSync_Record(BaseModel, ABC):
"""Abstract DataSync Record class"""
@@ -23,12 +35,12 @@ class A_DataSync_Record(BaseModel, ABC):
value: str
@abstractmethod
def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None:
def compress(self, cls_compressor: type[A_DataSync_Compressor], file_out: IO) -> None:
"""compress the DataSync Record - virtual"""
@abstractmethod
def decompress(self, compressor: A_DataSync_Compressor, path_in: Path) -> None:
"""decompress the DataSync record - virtual"""
def uncompress(self, cls_compressor: type[A_DataSync_Compressor], path_in: Path) -> None:
"""uncompress the DataSync record - virtual"""
@abstractmethod
def wipe(self):
@@ -65,18 +77,18 @@ class C_DataSync_Record_FS(A_DataSync_Record):
def model_post_init(self, __context) -> None:
self.path = Path(self.value)
def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None:
def compress(self, cls_compressor: type[A_DataSync_Compressor], file_out: IO) -> None:
"""compress the DataSync Record - concrete FS implementation"""
if TYPE_CHECKING:
assert isinstance(self.path, Path)
compressor.compress(self.path, file_out)
cls_compressor.compress(self.path, file_out)
def decompress(self, compressor: A_DataSync_Compressor, path_in: Path) -> None:
"""decompress the DataSync record - concrete FS implementation"""
def uncompress(self, cls_compressor: type[A_DataSync_Compressor], path_in: Path) -> None:
"""uncompress the DataSync record - concrete FS implementation"""
if TYPE_CHECKING:
assert isinstance(self.path, Path)
self.wipe()
compressor.decompress(path_in, self.path.parent)
cls_compressor.uncompress(path_in, self.path.parent)
def wipe(self):
if TYPE_CHECKING:

View File

@@ -1,3 +1,16 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
tools classes / functions
"""
def urljoin(*args: str) -> str:
"""
Joins given arguments into an url. Trailing but not leading slashes are

View File

@@ -10,12 +10,16 @@ import unittest
from os import chdir, path as os_path
from pathlib import Path
import pprint
from io import StringIO
from contextlib import redirect_stdout, redirect_stderr
import shutil
from contexttimer import Timer
print(__name__)
print(__package__)
from src import dabdatasync
from src.dabdatasync.__main__ import fct_main
testdir_path = Path(__file__).parent.resolve()
chdir(testdir_path.parent.resolve())
@@ -36,22 +40,275 @@ class TestDabDataSync(unittest.TestCase):
def test_version(self):
self.assertNotEqual(dabdatasync.__version__, "?.?.?")
def test_load_nextcloud(self):
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_nextcloud.json"
def test_cli_help(self):
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
with self.assertRaises(SystemExit):
fct_main(["-h"])
print(capted_stdout.getvalue())
print(capted_stderr.getvalue())
def test_cli_GetServices(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
fct_main(["GetServices"])
print(capted_stdout.getvalue())
print(capted_stderr.getvalue())
self.assertTrue("Nextcloud" in capted_stdout.getvalue())
self.assertEqual(capted_stderr.getvalue(), "")
def test_cli_GetServices_noservice(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json"
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
with self.assertRaises(dabdatasync.DataSyncException_NoValidServiceFound):
fct_main(["GetServices"])
self.assertEqual(capted_stdout.getvalue(), "")
self.assertEqual(capted_stderr.getvalue(), "")
def test_cli_GetServices_invalid(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_invalid.json"
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest):
fct_main(["GetServices"])
self.assertEqual(capted_stdout.getvalue(), "")
self.assertEqual(capted_stderr.getvalue(), "")
def test_cli_GetServices_nextcloud_invalid(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_invalid.json"
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest):
fct_main(["GetServices"])
self.assertEqual(capted_stdout.getvalue(), "")
self.assertEqual(capted_stderr.getvalue(), "")
def test_cli_WipeLocalData(self):
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
fct_main(["WipeLocalData"])
self.assertEqual(capted_stdout.getvalue(), "")
self.assertEqual(capted_stderr.getvalue(), "")
self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
def test_cli_simple(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
fct_main(["PushData"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE2")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE2")
fct_main(["PullData"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE32")
fct_main(["PushData"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["PullData"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["WipeRemoteData"])
fct_main(["PullData"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["PushData"])
fct_main(["WipeLocalData"])
self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
fct_main(["PullData"])
self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
def test_defect_cli_select_wrong_service(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
with self.assertRaises(dabdatasync.DataSyncException_ServiceNotFound):
fct_main(["PullData", "--service", "WRONGSERVICE"])
def test_cli_select_service(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
fct_main(["PushData", "--service", "Nextcloud"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE2")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE2")
fct_main(["PullData", "--service", "Nextcloud"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE32")
fct_main(["PushData", "--service", "Nextcloud"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["PullData", "--service", "Nextcloud"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["WipeRemoteData", "--service", "Nextcloud"])
fct_main(["PullData", "--service", "Nextcloud"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["PushData", "--service", "Nextcloud"])
fct_main(["WipeLocalData"])
self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
fct_main(["PullData", "--service", "Nextcloud"])
self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
def load_nextcloud_gen(self, compressor):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
datasync = dabdatasync.DataSync_Factory.get_DataSync()
datasync[0].set_compressor(compressor)
self.assertIsInstance(datasync, list)
self.assertEqual(len(datasync), 1)
self.assertIsInstance(datasync[0], dabdatasync.I_DataSync)
self.assertIsInstance(datasync[0], dabdatasync.A_DataSync)
self.assertIsInstance(datasync[0], dabdatasync.C_DataSync_NextCloud)
self.assertEqual(len(datasync[0].ar_datasync_record), 2)
self.assertEqual(datasync[0].ar_datasync_record[0].name, "SOTF_map")
self.assertEqual(datasync[0].ar_datasync_record[1].name, "SOTF_map2")
self.assertEqual(datasync[0].ar_datasync_record[0].rec_type, "fs")
self.assertEqual(datasync[0].ar_datasync_record[1].rec_type, "fs")
self.assertEqual(datasync[0].ar_datasync_record[0].value, "test/test_data")
self.assertEqual(datasync[0].ar_datasync_record[1].value, "test/test_data2/SAVE_FILE.txt")
self.assertEqual(len(datasync[0].get_datasync_records()), 2)
self.assertEqual(datasync[0].get_datasync_records()[0].name, "SOTF_map")
self.assertEqual(datasync[0].get_datasync_records()[1].name, "SOTF_map2")
self.assertEqual(datasync[0].get_datasync_records()[0].rec_type, "fs")
self.assertEqual(datasync[0].get_datasync_records()[1].rec_type, "fs")
self.assertEqual(datasync[0].get_datasync_records()[0].value, "test/test_data")
self.assertEqual(datasync[0].get_datasync_records()[1].value, "test/test_data2/SAVE_FILE.txt")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
@@ -145,24 +402,34 @@ class TestDabDataSync(unittest.TestCase):
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
def test_load_nextcloud__tar_gz(self):
with Timer() as t:
self.load_nextcloud_gen("tar_gz")
print(t.elapsed)
def test_load_nextcloud__tar_lz4(self):
with Timer() as t:
self.load_nextcloud_gen("tar_lz4")
print(t.elapsed)
def test_load_empty(self):
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_empty.json"
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_empty.json"
datasync = dabdatasync.DataSync_Factory.get_DataSync()
self.assertIsInstance(datasync, list)
self.assertEqual(len(datasync), 0)
def test_load_nextcloud_disabled(self):
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json"
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json"
datasync = dabdatasync.DataSync_Factory.get_DataSync()
self.assertIsInstance(datasync, list)
self.assertEqual(len(datasync), 0)
def test_load_invalid(self):
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_invalid.json"
def test_defect_load_invalid(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_invalid.json"
with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest):
dabdatasync.DataSync_Factory.get_DataSync()
def test_load_nextcloud_invalid(self):
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_nextcloud_invalid.json"
def test_defect_load_nextcloud_invalid(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_invalid.json"
with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest):
dabdatasync.DataSync_Factory.get_DataSync()