diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs index 99f26c0..22d8696 100644 --- a/.settings/org.eclipse.core.resources.prefs +++ b/.settings/org.eclipse.core.resources.prefs @@ -1,2 +1,3 @@ eclipse.preferences.version=1 +encoding//src/dabdatasync/__main__.py=utf-8 encoding/=UTF-8 diff --git a/pyproject.toml b/pyproject.toml index 689229d..e5899cd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,8 +35,11 @@ classifiers = [ dependencies = [ 'importlib-metadata; python_version<"3.9"', 'packaging', - 'webdavclient3', - 'pydantic' + 'webdavclient3==3.14.*', + 'pydantic==2.*', + 'typed-argument-parser==1.*', + 'loguru==0.7.*', + 'lz4' ] dynamic = ["version"] @@ -55,6 +58,18 @@ where = ["src"] module = "webdav3.client" ignore_missing_imports = true +[[tool.mypy.overrides]] +module = "webdav3.exceptions" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "lz4" +ignore_missing_imports = true + +[[tool.mypy.overrides]] +module = "lz4.frame" +ignore_missing_imports = true + [tool.coverage.run] cover_pylib = false branch = true @@ -76,13 +91,13 @@ Documentation = "https://chacha.ddns.net/mkdocs-web/chacha/dabdatasync/master/ Tracker = "https://chacha.ddns.net/gitea/chacha/dabdatasync/issues" [project.optional-dependencies] -test = ["chacha_cicd_helper"] +test = ["chacha_cicd_helper","contexttimer"] coverage-check = ["chacha_cicd_helper"] complexity-check = ["chacha_cicd_helper"] quality-check = ["chacha_cicd_helper"] type-check = ["chacha_cicd_helper"] doc-gen = ["chacha_cicd_helper"] -# [project.scripts] -# my-script = "my_package.module:function" +[project.scripts] +dabdatasync = "dabdatasync.__main__:CLI" diff --git a/src/dabdatasync/__init__.py b/src/dabdatasync/__init__.py index 395cd8e..c273a16 100644 --- a/src/dabdatasync/__init__.py +++ b/src/dabdatasync/__init__.py @@ -11,5 +11,18 @@ Main module __init__ file. """ from .__metadata__ import __version__, __Summuary__, __Name__ -from .datasync import I_DataSync, DataSync_Factory, C_DataSync_NextCloud -from .datasync import DataSyncException, DataSyncException_InvalidManifest +from .datasync import A_DataSync, DataSync_Factory +from .datasync_nextcloud import C_DataSync_NextCloud +from .exceptions import ( + DataSyncException, + DataSyncException_InvalidManifest, + DataSyncException_NoConcreteRecordClassFound, + DataSyncException_RemoteDataNotFound, + DataSyncException_NoValidServiceFound, + DataSyncException_ServiceNotFound, + DataSyncException_TooManyServiceFound, + DataSyncException_CompressorNotFound, + DataSyncException_TooManyCompressorFound, +) + +from .compressors import DataSync_Compressors diff --git a/src/dabdatasync/__main__.py b/src/dabdatasync/__main__.py new file mode 100644 index 0000000..4ab0b31 --- /dev/null +++ b/src/dabdatasync/__main__.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# dabdatasync (c) by chacha +# +# dabdatasync is licensed under a +# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License. +# +# You should have received a copy of the license along with this +# work. If not, see . + +"""CLI interface module""" +from __future__ import annotations +from typing import cast, Union, Optional + +import sys +from tap import Tap +from loguru import logger + +from . import __Summuary__, __Name__ +from . import datasync +from . import exceptions + + +class dabdatasync_args_GetServices(Tap): + """GetServices CLI arg subparser""" + + +class dabdatasync_args_WipeLocalData(Tap): + """WipeLocalData CLI arg subparser""" + + +class dabdatasync_args_service_abstract(Tap): + """service abstract CLI arg subparser""" + + service: Optional[str] = None + + def configure(self) -> None: + self.add_argument("--service") + + +class dabdatasync_args_service_compress_abstract(dabdatasync_args_service_abstract): + """service compressor abstract CLI arg subparser""" + + compressor: Optional[str] = None + + def configure(self) -> None: + super().configure() + self.add_argument("--compressor") + + +class dabdatasync_args_PullData(dabdatasync_args_service_compress_abstract): + """PullData CLI arg subparser""" + + +class dabdatasync_args_PushData(dabdatasync_args_service_compress_abstract): + """PushData CLI arg subparser""" + + +class dabdatasync_args_WipeRemoteData(dabdatasync_args_service_abstract): + """WipeRemoteData CLI arg subparser""" + + +class dabdatasync_args(Tap): + """Main CLI arg parser""" + + verbosity: int = 0 + + def configure(self) -> None: + self.add_argument("-v", "--verbosity", action="count", help="increase output verbosity") + + self.add_subparsers(dest="command", help="command type", required=True) + self.add_subparser("GetServices", dabdatasync_args_GetServices, help="Get registered services list") + self.add_subparser("PullData", dabdatasync_args_PullData, help="Pull data from the service") + self.add_subparser("PushData", dabdatasync_args_PushData, help="Push data to the service") + self.add_subparser("WipeLocalData", dabdatasync_args_WipeLocalData, help="Wipe local data") + self.add_subparser("WipeRemoteData", dabdatasync_args_WipeRemoteData, help="Wipe remote service data") + + def process_args(self) -> None: + """dynamically add self.command to avoid conflict with Tap/argparse while keep pylint happy""" + self.command: Union[str, None] = cast(Union[str, None], self.command) # pylint: disable=attribute-defined-outside-init + + +def fct_main(i_args: list[str]) -> None: # pylint: disable=too-many-branches,too-complex + """CLI main function""" + parser: dabdatasync_args = dabdatasync_args(prog=__Name__, description=__Summuary__) + + args: dabdatasync_args = parser.parse_args(i_args) + + logger.remove() + if args.verbosity: + if args.verbosity == 1: + logger.add(sys.stdout, level="WARNING") + elif args.verbosity == 2: + logger.add(sys.stdout, level="INFO") + else: + logger.add(sys.stdout, level="DEBUG") + else: + logger.add(sys.stdout, level="ERROR") + logger.add(sys.stderr, level="ERROR") + + dabdatasync = datasync.DataSync_Factory.get_DataSync() + if len(dabdatasync) == 0: + raise exceptions.DataSyncException_NoValidServiceFound("No valid service found") + + if args.command == "GetServices": + for service in dabdatasync: + print(service.service_name) + return + + if args.command == "WipeLocalData": + dabdatasync[0].wipe_local_data() + return + + selected_dabdatasync: datasync.A_DataSync + if args.command in ["PullData", "PushData", "WipeRemoteData"]: + requested_service = cast(dabdatasync_args_service_abstract, args).service # pylint: disable=no-member + if requested_service: + services = [_ for _ in dabdatasync if type(_).service_name == requested_service] + if len(services) == 0: + raise exceptions.DataSyncException_ServiceNotFound() + if len(services) == 1: + selected_dabdatasync = services[0] + else: + raise exceptions.DataSyncException_TooManyServiceFound() + else: + selected_dabdatasync = dabdatasync[0] + + if args.command in ["PullData", "PushData"]: + compressor = cast(dabdatasync_args_service_compress_abstract, args).compressor # pylint: disable=no-member + if compressor: + selected_dabdatasync.set_compressor(compressor) + + if args.command == "PullData": + selected_dabdatasync.pull_data() + return + + if args.command == "PushData": + selected_dabdatasync.push_data() + return + + if args.command == "WipeRemoteData": + selected_dabdatasync.wipe_remote_data() + return + + raise RuntimeError("Invalid argument") + + +def CLI(): + """wrapper for .toml declared script""" + fct_main(sys.argv) + + +if __name__ == "__main__": + fct_main(sys.argv[1:]) diff --git a/src/dabdatasync/compressors.py b/src/dabdatasync/compressors.py new file mode 100644 index 0000000..a6293f9 --- /dev/null +++ b/src/dabdatasync/compressors.py @@ -0,0 +1,123 @@ +# dabdatasync (c) by chacha +# +# dabdatasync is licensed under a +# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License. +# +# You should have received a copy of the license along with this +# work. If not, see . + +""" +Compressor interface and implementation +""" + +from __future__ import annotations +from abc import ABC, abstractmethod +from pathlib import Path +import os +from io import BytesIO +from typing import TYPE_CHECKING +import tarfile +import lz4.frame +from loguru import logger + +from .exceptions import DataSyncException_CompressorNotFound, DataSyncException_TooManyCompressorFound + +if TYPE_CHECKING: + from typing import IO + + +class A_DataSync_Compressor(ABC): + """abstract compressor class""" + + suffix: str + compressor_name: str = "ABSTRACT" + + @classmethod + def compress(cls, path_in: Path, file_out: IO): + """compress method""" + logger.debug(f"compressing <{path_in}> data using <{cls.compressor_name}> to <{file_out.name}>") + cls._impl_compress(path_in, file_out) + + @classmethod + @abstractmethod + def _impl_compress(cls, path_in: Path, file_out: IO): + """compress method - virtual""" + + @classmethod + def uncompress(cls, path_in: Path, path_out: Path): + """uncompress method""" + logger.debug(f"uncompressing <{path_in}> data using <{cls.compressor_name}> to <{path_out}>") + cls._impl_uncompress(path_in, path_out) + + @classmethod + @abstractmethod + def _impl_uncompress(cls, path_in: Path, path_out: Path): + """uncompress method - virtual""" + + +class DataSync_Compressors: + """compressers container/factory class""" + + _availables: list[type[A_DataSync_Compressor]] = [] + + @classmethod + def register(cls, _cls: type[A_DataSync_Compressor]) -> type[A_DataSync_Compressor]: + """register a new compressor""" + cls._availables.append(_cls) + return _cls + + @classmethod + def get(cls, compressor_name: str) -> type[A_DataSync_Compressor]: + """get a specific compressor""" + found = [_ for _ in cls._availables if _.compressor_name == compressor_name] + if len(found) == 0: + raise DataSyncException_CompressorNotFound() + if len(found) == 1: + return found[0] + raise DataSyncException_TooManyCompressorFound() + + +@DataSync_Compressors.register +class DataSync_Compressor__tar_gz(A_DataSync_Compressor): + """Concrete compressor class - .tar.gz compressor""" + + suffix: str = ".tar.gz" + compressor_name: str = "tar_gz" + + @classmethod + def _impl_compress(cls, path_in: Path, file_out: IO): + """compress method - .tar.gz concrete""" + with tarfile.open(fileobj=file_out, mode="w:gz") as tar: + tar.add(path_in, arcname=os.path.basename(path_in)) + + @classmethod + def _impl_uncompress(cls, path_in: Path, path_out: Path): + """uncompressing method - .tar.gz concrete""" + with tarfile.open(path_in, mode="r:gz") as tar: + tar.extractall(path_out) + + +@DataSync_Compressors.register +class DataSync_Compressor__tar_lz4(A_DataSync_Compressor): + """Concrete compressor class - .tar.lz4 compressor""" + + suffix: str = ".tar.lz4" + compressor_name: str = "tar_lz4" + + @classmethod + def _impl_compress(cls, path_in: Path, file_out: IO): + """compress method - .tar.lz4 concrete""" + with BytesIO() as memBuff: + with tarfile.open(fileobj=memBuff, mode="w:") as tar: + tar.add(path_in, arcname=os.path.basename(path_in)) + memBuff.seek(0) + file_out.write(lz4.frame.compress(memBuff.read())) + + @classmethod + def _impl_uncompress(cls, path_in: Path, path_out: Path): + """uncompressing method - .tar.lz4 concrete""" + with open(path_in, "rb") as file_in, BytesIO() as memBuff: + memBuff.write(lz4.frame.decompress(file_in.read())) + memBuff.seek(0) + with tarfile.open(fileobj=memBuff, mode="r:") as tar: + tar.extractall(path_out) diff --git a/src/dabdatasync/data/.keep b/src/dabdatasync/data/.keep deleted file mode 100644 index e69de29..0000000 diff --git a/src/dabdatasync/datasync.py b/src/dabdatasync/datasync.py index 808507e..461b57f 100644 --- a/src/dabdatasync/datasync.py +++ b/src/dabdatasync/datasync.py @@ -1,157 +1,53 @@ -"""Main datasync class""" +# dabdatasync (c) by chacha +# +# dabdatasync is licensed under a +# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License. +# +# You should have received a copy of the license along with this +# work. If not, see . -import json +""" +Nextcloud abstract interface +""" +from __future__ import annotations from abc import ABC, abstractmethod -from typing import final, TYPE_CHECKING, IO -from typing import Self, Any, Set, Optional from uuid import UUID from pathlib import Path import os -import tarfile from tempfile import NamedTemporaryFile, TemporaryDirectory -import shutil +import json +from typing import final, TYPE_CHECKING -from pydantic import BaseModel -from webdav3.client import Client as webdav3_Client +from loguru import logger + +from .records import A_DataSync_Record, DataSync_Record_Factory +from .compressors import DataSync_Compressor__tar_gz, DataSync_Compressors +from .exceptions import DataSyncException_RemoteDataNotFound + +if TYPE_CHECKING: + from typing import Optional, IO, Any, Self + from .compressors import A_DataSync_Compressor -class DataSyncException(Exception): - """generic datasync exception class""" - - -class DataSyncException_InvalidManifest(DataSyncException): - """specific datasync exception class - Dab appliance manifest not found""" - - -def urljoin(*args): - """ - Joins given arguments into an url. Trailing but not leading slashes are - stripped for each argument. - """ - - return "/".join(map(lambda x: str(x).rstrip("/"), args)) - - -class A_DataSync_Compressor(ABC): - """abstract compressor class""" - - suffix: str - - @abstractmethod - def compress(self, path_in: Path, file_out: IO): - """compress method - virtual""" - - @abstractmethod - def decompress(self, path_in: Path, path_out: Path): - """decompress method - virtual""" - - -class DataSync_Compressor_targz(A_DataSync_Compressor): - """Concrete compressor class - .tar.gz compressor""" - - suffix: str = ".tar.gz" - - def compress(self, path_in: Path, file_out: IO): - """compress method - .tar.gz concrete""" - with tarfile.open(fileobj=file_out, mode="w:gz") as tar: - tar.add(path_in, arcname=os.path.basename(path_in)) - - def decompress(self, path_in: Path, path_out: Path): - """decompress method - .tar.gz concrete""" - with tarfile.open(path_in, "r") as tar: - tar.extractall(path_out) - - -class A_DataSync_Record(BaseModel, ABC): - """Abstract DataSync Record class""" - - name: str - rec_type: str - value: str - - @abstractmethod - def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None: - """compress the record - virtual""" - - @abstractmethod - def decompress(self, compressor: A_DataSync_Compressor, path_in: Path) -> None: - """compress the record - virtual""" - - -class DataSync_Record_Factory: - """DataSync Record Factory""" - - ar_cls_DataSync_Record: Set[type[A_DataSync_Record]] = set() - - @classmethod - def get_C_DataSync_Record(cls, name: str, rec_type: str, value: str) -> A_DataSync_Record | None: - """get a concrete DataSync Record class instance""" - for cls_DataSync_Record in cls.ar_cls_DataSync_Record: - if cls_DataSync_Record.model_fields["rec_type"].default == rec_type: - return C_DataSync_Record_FS(name=name, rec_type=rec_type, value=value) - raise RuntimeError("No DataSync_Record concrete class found") - - @classmethod - def register(cls, _cls: type[A_DataSync_Record]) -> type[A_DataSync_Record]: - """decorator to register a concrete DataSync Record class""" - cls.ar_cls_DataSync_Record.add(_cls) - return _cls - - -@DataSync_Record_Factory.register -class C_DataSync_Record_FS(A_DataSync_Record): - """Concrete DataSync Record class - FileSystem""" - - rec_type: str = "fs" - path: Optional[Path] = None - - def model_post_init(self, __context) -> None: - self.path = Path(self.value) - - def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None: - """compress the DataSync Record - concrete FS implementation""" - if TYPE_CHECKING: - assert isinstance(self.path, Path) - compressor.compress(self.path, file_out) - - def decompress(self, compressor: A_DataSync_Compressor, path_in: Path) -> None: - """compress the record - concrete FS implementation""" - if TYPE_CHECKING: - assert isinstance(self.path, Path) - if os.path.isdir(self.path): - shutil.rmtree(self.path) - if os.path.isfile(self.path): - os.remove(self.path) - compressor.decompress(path_in, self.path.parent) - - -class I_DataSync(ABC): +class A_DataSync(ABC): """Abstract DataSync class""" - manifest_path: str = "/opt/pyDABFactoryAppliance/Manifest.json" - cls_compressor: type[A_DataSync_Compressor] = DataSync_Compressor_targz + service_name: str = "ABSTRACT" @classmethod @final - def get_manifest_data(cls) -> dict[str, Any]: - """utilitary method to get manifest""" - with open(cls.manifest_path, encoding="utf-8") as f_DAB_manifest: - return json.load(f_DAB_manifest) - - @classmethod - @final - def try_get_instance(cls, manifest: dict[str, Any]) -> Self | None: + def try_get_instance(cls, manifest: dict[str, Any], cls_compressor: type[A_DataSync_Compressor]) -> Self | None: """try to get an instance of a concrete class""" if cls.test_applicable(manifest): - return cls(manifest) + return cls(manifest, cls_compressor) return None - def __init__(self, manifest: dict[str, Any]) -> None: + def __init__(self, manifest: dict[str, Any], cls_compressor: type[A_DataSync_Compressor]) -> None: self.connected: bool = False - self.compressor: A_DataSync_Compressor = type(self).cls_compressor() - self.manifest: dict[str, Any] = manifest - self.app_id: UUID = UUID(manifest["APP_ID"]) - self.ar_datasync_record: list[A_DataSync_Record] = [] + self._cls_compressor: type[A_DataSync_Compressor] = cls_compressor + self._manifest: dict[str, Any] = manifest + self._app_id: UUID = UUID(manifest["APP_ID"]) + self._ar_datasync_record: list[A_DataSync_Record] = [] if "FSSYNC_RECORD" in manifest["Args"]: for record in manifest["Args"]["FSSYNC_RECORD"]["value"]: record = DataSync_Record_Factory.get_C_DataSync_Record( @@ -160,7 +56,15 @@ class I_DataSync(ABC): record["value"]["value"]["value"], ) assert isinstance(record, A_DataSync_Record) - self.ar_datasync_record.append(record) + self._ar_datasync_record.append(record) + + def set_compressor(self, compressor_name: str) -> None: + """set compressor to be used""" + self._cls_compressor = DataSync_Compressors.get(compressor_name) + + def get_datasync_records(self) -> list[A_DataSync_Record]: + """get list of records""" + return self._ar_datasync_record @classmethod def test_applicable(cls, manifest: dict[str, Any]) -> bool: @@ -179,152 +83,98 @@ class I_DataSync(ABC): def connect(self) -> None: """connect to the service""" if not self.connected: + logger.info(f"connection to service <{self.service_name}>") self._impl_connect() self.connected = True + logger.info("connection done") @abstractmethod def _impl_connect(self) -> None: """connect to the service - virtual""" - def read_data(self) -> None: - """read data from the service""" + def pull_data(self) -> None: + """pull data from the service""" + logger.info(f"pulling data from service <{self.service_name}>") self.connect() with TemporaryDirectory() as tmpdir: - for datasync_record in self.ar_datasync_record: - self._impl_read_data(Path(datasync_record.name + self.compressor.suffix), Path(tmpdir)) - datasync_record.decompress(self.compressor, Path(tmpdir) / (datasync_record.name + self.compressor.suffix)) + for datasync_record in self._ar_datasync_record: + logger.info(f"pulling record <{datasync_record.name}>") + try: + self._impl_pull_data(Path(datasync_record.name + self._cls_compressor.suffix), Path(tmpdir)) + datasync_record.uncompress(self._cls_compressor, Path(tmpdir) / (datasync_record.name + self._cls_compressor.suffix)) + except DataSyncException_RemoteDataNotFound: + logger.warning(f"remote record file not found <{datasync_record.name}>") + logger.info("done") @abstractmethod - def _impl_read_data(self, file_in: Path, file_out: Path) -> None: - """read data from the service - virtual""" + def _impl_pull_data(self, file_in: Path, file_out: Path) -> None: + """pull data from the service - virtual""" - def write_data(self) -> None: - """write data to the service""" + def push_data(self) -> None: + """push data to the service""" + logger.info(f"pushing data to service <{self.service_name}>") self.connect() - self._impl_wipe_data() - for datasync_record in self.ar_datasync_record: + self._impl_wipe_remote_data() + for datasync_record in self._ar_datasync_record: + logger.info(f"pushing record <{datasync_record.name}>") try: - with NamedTemporaryFile("wb", suffix=self.compressor.suffix, delete=False) as tmp_file: - datasync_record.compress(self.compressor, tmp_file) + with NamedTemporaryFile("wb", suffix=self._cls_compressor.suffix, delete=False) as tmp_file: + datasync_record.compress(self._cls_compressor, tmp_file) tmp_file.seek(0) tmp_file.close() - self._impl_write_data(datasync_record.name + "".join(Path(tmp_file.name).suffixes), tmp_file) + self._impl_push_data(datasync_record.name + "".join(Path(tmp_file.name).suffixes), tmp_file) finally: os.unlink(tmp_file.name) + logger.info("done") @abstractmethod - def _impl_write_data(self, record_name: str, file_in: IO) -> None: - """write data to the service - virtual""" + def _impl_push_data(self, record_name: str, file_in: IO) -> None: + """push data to the service - virtual""" - def wipe_data(self) -> None: + def wipe_remote_data(self) -> None: """wipe data on the service""" + logger.info(f"wiping remote data on service <{self.service_name}>") self.connect() - self._impl_wipe_data() + self._impl_wipe_remote_data() + + def wipe_local_data(self) -> None: + """wipe local data""" + logger.info("wiping local data") + for datasync_record in self._ar_datasync_record: + datasync_record.wipe() @abstractmethod - def _impl_wipe_data(self) -> None: + def _impl_wipe_remote_data(self) -> None: """wipe data on the service - virtual""" class DataSync_Factory: """DataSync Factory""" - ar_cls_DataSync: Set[type[I_DataSync]] = set() + ar_cls_DataSync: set[type[A_DataSync]] = set() + manifest_path: str = "/opt/pyDABFactoryAppliance/Manifest.json" + cls_compressor: type[A_DataSync_Compressor] = DataSync_Compressor__tar_gz @classmethod - def get_DataSync(cls) -> I_DataSync | None: + def get_manifest_data(cls) -> dict[str, Any]: + """tool method to get manifest""" + with open(cls.manifest_path, encoding="utf-8") as f_DAB_manifest: + return json.load(f_DAB_manifest) + + @classmethod + def get_DataSync(cls) -> list[A_DataSync]: """get and configure a DataSync Concrete class instance""" - manifest = I_DataSync.get_manifest_data() + ar_datasync: list[A_DataSync] = [] + manifest = cls.get_manifest_data() for cls_DataSync in cls.ar_cls_DataSync: - if res := cls_DataSync.try_get_instance(manifest): + if res := cls_DataSync.try_get_instance(manifest, cls.cls_compressor): res.configure() - return res - return None + ar_datasync.append(res) + return ar_datasync @classmethod - def register(cls, _cls: type[I_DataSync]) -> type[I_DataSync]: + def register(cls, _cls: type[A_DataSync]) -> type[A_DataSync]: """decorator to register a concrete class to the factory""" cls.ar_cls_DataSync.add(_cls) return _cls - - -@DataSync_Factory.register -class C_DataSync_NextCloud(I_DataSync): - """Concrete DataSync class - Nextcloud""" - - def __init__(self, manifest: dict[Any, Any]) -> None: - super().__init__(manifest) - self.nextcloud_address: str - self.nextcloud_user: str - self.nextcloud_password: str - self.nextcloud_path: str - self.client: webdav3_Client - self.connected: bool = False - - @classmethod - def test_applicable(cls, manifest: dict[str, Any]) -> bool: - """check if a concrete class is applicable - Nextcloud override""" - if "Args" in manifest: - if "FSSync_NextCloud_Enabled" in manifest["Args"]: - if manifest["Args"]["FSSync_NextCloud_Enabled"]["value"] is True: - return True - return False - raise DataSyncException_InvalidManifest() - - def _impl_configure(self) -> None: - """configure the class instance - Nextcloud concrete implementation""" - if "FSSync_NextCloud_Address" in self.manifest["Args"]: - self.nextcloud_address = self.manifest["Args"]["FSSync_NextCloud_Address"]["value"] - else: - raise DataSyncException_InvalidManifest() - if "FSSync_NextCloud_User" in self.manifest["Args"]: - self.nextcloud_user = self.manifest["Args"]["FSSync_NextCloud_User"]["value"] - else: - raise DataSyncException_InvalidManifest() - if "FSSync_NextCloud_Password" in self.manifest["Args"]: - self.nextcloud_password = self.manifest["Args"]["FSSync_NextCloud_Password"]["value"] - else: - raise DataSyncException_InvalidManifest() - if "FSSync_NextCloud_Path" in self.manifest["Args"]: - self.nextcloud_path = str(Path(self.manifest["Args"]["FSSync_NextCloud_Path"]["value"]) / Path(str(self.app_id))).replace( - os.sep, "/" - ) - - else: - raise DataSyncException_InvalidManifest() - - def _impl_connect(self) -> None: - """connect to the remote service - Nextcloud concrete implementation""" - full_adress = urljoin(self.nextcloud_address, "remote.php/dav/files/", self.nextcloud_user) - self.client = webdav3_Client( - {"webdav_hostname": full_adress, "webdav_login": self.nextcloud_user, "webdav_password": self.nextcloud_password} - ) - - def _check_create_dir(self) -> None: - """check and create directory in remote service""" - url_accumulator: str = "" - for url_part in self.nextcloud_path.split("/"): - url_accumulator += "/" + url_part - if not self.client.check(url_accumulator): - self.client.mkdir(url_accumulator) - - def _impl_read_data(self, file_in: Path, file_out: Path) -> None: - """read data from the remote service - Nextcloud concrete implementation""" - self._check_create_dir() - self.client.download_sync( - remote_path=str(self.nextcloud_path / file_in).replace(os.sep, "/"), local_path=str(file_out / file_in).replace(os.sep, "/") - ) - - def _impl_write_data(self, record_name: str, file_in: IO) -> None: - """write data to the remote service - Nextcloud concrete implementation""" - self._check_create_dir() - self.client.upload_sync( - remote_path=str(Path(self.nextcloud_path) / record_name).replace(os.sep, "/"), - local_path=file_in.name, - ) - - def _impl_wipe_data(self) -> None: - """wipe data on the service - concrete implementation""" - if self.client.check(self.nextcloud_path): - self.client.clean(self.nextcloud_path) diff --git a/src/dabdatasync/datasync_nextcloud.py b/src/dabdatasync/datasync_nextcloud.py new file mode 100644 index 0000000..0f2b02d --- /dev/null +++ b/src/dabdatasync/datasync_nextcloud.py @@ -0,0 +1,113 @@ +# dabdatasync (c) by chacha +# +# dabdatasync is licensed under a +# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License. +# +# You should have received a copy of the license along with this +# work. If not, see . + +""" +Nextcloud datasync implementation +""" +from __future__ import annotations +from pathlib import Path +import os +from typing import TYPE_CHECKING + + +from webdav3.client import Client as webdav3_Client +from webdav3.exceptions import RemoteResourceNotFound as webdav3_RemoteResourceNotFound + +from .datasync import A_DataSync, DataSync_Factory +from .exceptions import DataSyncException_InvalidManifest, DataSyncException_RemoteDataNotFound +from .utils import urljoin + +if TYPE_CHECKING: + from typing import Any, IO + from .compressors import A_DataSync_Compressor + + +@DataSync_Factory.register +class C_DataSync_NextCloud(A_DataSync): + """Concrete DataSync class - Nextcloud""" + + service_name: str = "Nextcloud" + + def __init__(self, manifest: dict[Any, Any], cls_compressor: type[A_DataSync_Compressor]) -> None: + super().__init__(manifest, cls_compressor) + self.nextcloud_address: str + self.nextcloud_user: str + self.nextcloud_password: str + self.nextcloud_path: str + self.client: webdav3_Client + self.connected: bool = False + + @classmethod + def test_applicable(cls, manifest: dict[str, Any]) -> bool: + """check if a concrete class is applicable - Nextcloud override""" + if "Args" in manifest: + if "FSSync_NextCloud_Enabled" in manifest["Args"]: + if manifest["Args"]["FSSync_NextCloud_Enabled"]["value"] is True: + return True + return False + raise DataSyncException_InvalidManifest() + + def _impl_configure(self) -> None: + """configure the class instance - Nextcloud concrete implementation""" + if "FSSync_NextCloud_Address" in self._manifest["Args"]: + self.nextcloud_address = self._manifest["Args"]["FSSync_NextCloud_Address"]["value"] + else: + raise DataSyncException_InvalidManifest() + if "FSSync_NextCloud_User" in self._manifest["Args"]: + self.nextcloud_user = self._manifest["Args"]["FSSync_NextCloud_User"]["value"] + else: + raise DataSyncException_InvalidManifest() + if "FSSync_NextCloud_Password" in self._manifest["Args"]: + self.nextcloud_password = self._manifest["Args"]["FSSync_NextCloud_Password"]["value"] + else: + raise DataSyncException_InvalidManifest() + if "FSSync_NextCloud_Path" in self._manifest["Args"]: + self.nextcloud_path = str(Path(self._manifest["Args"]["FSSync_NextCloud_Path"]["value"]) / Path(str(self._app_id))).replace( + os.sep, "/" + ) + + else: + raise DataSyncException_InvalidManifest() + + def _impl_connect(self) -> None: + """connect to the remote service - Nextcloud concrete implementation""" + full_adress = urljoin(self.nextcloud_address, "remote.php/dav/files/", self.nextcloud_user) + self.client = webdav3_Client( + {"webdav_hostname": full_adress, "webdav_login": self.nextcloud_user, "webdav_password": self.nextcloud_password} + ) + + def _check_create_dir(self) -> None: + """check and create directory in remote service""" + url_accumulator: str = "" + for url_part in self.nextcloud_path.split("/"): + url_accumulator += "/" + url_part + if not self.client.check(url_accumulator): + self.client.mkdir(url_accumulator) + + def _impl_pull_data(self, file_in: Path, file_out: Path) -> None: + """pull data from the remote service - Nextcloud concrete implementation""" + self._check_create_dir() + try: + self.client.download_sync( + remote_path=str(self.nextcloud_path / file_in).replace(os.sep, "/"), local_path=str(file_out / file_in).replace(os.sep, "/") + ) + except webdav3_RemoteResourceNotFound as exc: + raise DataSyncException_RemoteDataNotFound from exc + + def _impl_push_data(self, record_name: str, file_in: IO) -> None: + """push data to the remote service - Nextcloud concrete implementation""" + self._check_create_dir() + self.client.upload_sync( + remote_path=str(Path(self.nextcloud_path) / record_name).replace(os.sep, "/"), + local_path=file_in.name, + ) + + def _impl_wipe_remote_data(self) -> None: + """wipe data on the service - concrete implementation""" + if self.client.check(self.nextcloud_path): + self.client.clean(self.nextcloud_path) diff --git a/src/dabdatasync/exceptions.py b/src/dabdatasync/exceptions.py new file mode 100644 index 0000000..0e940b9 --- /dev/null +++ b/src/dabdatasync/exceptions.py @@ -0,0 +1,47 @@ +# dabdatasync (c) by chacha +# +# dabdatasync is licensed under a +# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License. +# +# You should have received a copy of the license along with this +# work. If not, see . + +""" +Exception declaration +""" + + +class DataSyncException(Exception): + """generic datasync exception class""" + + +class DataSyncException_InvalidManifest(DataSyncException): + """specific datasync exception class - Dab appliance manifest not found""" + + +class DataSyncException_NoConcreteRecordClassFound(DataSyncException): + """specific datasync exception class - No concrete Record Class Found""" + + +class DataSyncException_RemoteDataNotFound(DataSyncException): + """specific datasync exception class - Remote Data Not Found""" + + +class DataSyncException_NoValidServiceFound(DataSyncException): + """specific datasync exception class - Remote Valid Service Found""" + + +class DataSyncException_ServiceNotFound(DataSyncException): + """specific datasync exception class - Service Not Found""" + + +class DataSyncException_TooManyServiceFound(DataSyncException): + """specific datasync exception class - Too Many Service Found""" + + +class DataSyncException_CompressorNotFound(DataSyncException): + """specific datasync exception class - Compressor Not Found""" + + +class DataSyncException_TooManyCompressorFound(DataSyncException): + """specific datasync exception class - Too Many Compressor Found""" diff --git a/src/dabdatasync/records.py b/src/dabdatasync/records.py new file mode 100644 index 0000000..81f30fc --- /dev/null +++ b/src/dabdatasync/records.py @@ -0,0 +1,99 @@ +# dabdatasync (c) by chacha +# +# dabdatasync is licensed under a +# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License. +# +# You should have received a copy of the license along with this +# work. If not, see . + +""" +datasync records description and implementation +""" +from __future__ import annotations +from abc import ABC, abstractmethod +from pathlib import Path +import os +import shutil + +from typing import TYPE_CHECKING, Optional + + +from pydantic import BaseModel + +from .exceptions import DataSyncException_NoConcreteRecordClassFound + +if TYPE_CHECKING: + from .compressors import A_DataSync_Compressor + from typing import IO + + +class A_DataSync_Record(BaseModel, ABC): + """Abstract DataSync Record class""" + + name: str + rec_type: str + value: str + + @abstractmethod + def compress(self, cls_compressor: type[A_DataSync_Compressor], file_out: IO) -> None: + """compress the DataSync Record - virtual""" + + @abstractmethod + def uncompress(self, cls_compressor: type[A_DataSync_Compressor], path_in: Path) -> None: + """uncompress the DataSync record - virtual""" + + @abstractmethod + def wipe(self): + """wipe the record local data - virtual""" + + +class DataSync_Record_Factory: + """DataSync Record Factory""" + + ar_cls_DataSync_Record: set[type[A_DataSync_Record]] = set() + + @classmethod + def get_C_DataSync_Record(cls, name: str, rec_type: str, value: str) -> A_DataSync_Record | None: + """get a concrete DataSync Record class instance""" + for cls_DataSync_Record in cls.ar_cls_DataSync_Record: + if cls_DataSync_Record.model_fields["rec_type"].default == rec_type: + return C_DataSync_Record_FS(name=name, rec_type=rec_type, value=value) + raise DataSyncException_NoConcreteRecordClassFound() + + @classmethod + def register(cls, _cls: type[A_DataSync_Record]) -> type[A_DataSync_Record]: + """decorator to register a concrete DataSync Record class""" + cls.ar_cls_DataSync_Record.add(_cls) + return _cls + + +@DataSync_Record_Factory.register +class C_DataSync_Record_FS(A_DataSync_Record): + """Concrete DataSync Record class - FileSystem""" + + rec_type: str = "fs" + path: Optional[Path] = None + + def model_post_init(self, __context) -> None: + self.path = Path(self.value) + + def compress(self, cls_compressor: type[A_DataSync_Compressor], file_out: IO) -> None: + """compress the DataSync Record - concrete FS implementation""" + if TYPE_CHECKING: + assert isinstance(self.path, Path) + cls_compressor.compress(self.path, file_out) + + def uncompress(self, cls_compressor: type[A_DataSync_Compressor], path_in: Path) -> None: + """uncompress the DataSync record - concrete FS implementation""" + if TYPE_CHECKING: + assert isinstance(self.path, Path) + self.wipe() + cls_compressor.uncompress(path_in, self.path.parent) + + def wipe(self): + if TYPE_CHECKING: + assert isinstance(self.path, Path) + if os.path.isdir(self.path): + shutil.rmtree(self.path) + if os.path.isfile(self.path): + os.remove(self.path) diff --git a/src/dabdatasync/data/__init__.py b/src/dabdatasync/utils.py similarity index 53% rename from src/dabdatasync/data/__init__.py rename to src/dabdatasync/utils.py index d0d551c..b6e6192 100644 --- a/src/dabdatasync/data/__init__.py +++ b/src/dabdatasync/utils.py @@ -5,3 +5,16 @@ # # You should have received a copy of the license along with this # work. If not, see . + +""" +tools classes / functions +""" + + +def urljoin(*args: str) -> str: + """ + Joins given arguments into an url. Trailing but not leading slashes are + stripped for each argument. + """ + + return "/".join(map(lambda x: str(x).rstrip("/"), args)) diff --git a/test/test_datasync.py b/test/test_datasync.py index c031ff0..7212d23 100644 --- a/test/test_datasync.py +++ b/test/test_datasync.py @@ -7,15 +7,19 @@ # work. If not, see . import unittest -from os import chdir +from os import chdir, path as os_path from pathlib import Path import pprint +from io import StringIO +from contextlib import redirect_stdout, redirect_stderr import shutil +from contexttimer import Timer print(__name__) print(__package__) from src import dabdatasync +from src.dabdatasync.__main__ import fct_main testdir_path = Path(__file__).parent.resolve() chdir(testdir_path.parent.resolve()) @@ -36,26 +40,97 @@ class TestDabDataSync(unittest.TestCase): def test_version(self): self.assertNotEqual(dabdatasync.__version__, "?.?.?") - def test_load_nextcloud(self): - dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_nextcloud.json" - datasync = dabdatasync.DataSync_Factory.get_DataSync() - self.assertIsInstance(datasync, dabdatasync.I_DataSync) - self.assertIsInstance(datasync, dabdatasync.C_DataSync_NextCloud) + def test_cli_help(self): + with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: + with self.assertRaises(SystemExit): + fct_main(["-h"]) + print(capted_stdout.getvalue()) + print(capted_stderr.getvalue()) - self.assertEqual(len(datasync.ar_datasync_record), 2) - self.assertEqual(datasync.ar_datasync_record[0].name, "SOTF_map") - self.assertEqual(datasync.ar_datasync_record[1].name, "SOTF_map2") - self.assertEqual(datasync.ar_datasync_record[0].rec_type, "fs") - self.assertEqual(datasync.ar_datasync_record[1].rec_type, "fs") - self.assertEqual(datasync.ar_datasync_record[0].value, "test/test_data") - self.assertEqual(datasync.ar_datasync_record[1].value, "test/test_data2/SAVE_FILE.txt") + def test_cli_verbosity(self): + dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json" + fct_main(["WipeRemoteData"]) + + with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: + fct_main(["PullData"]) + self.assertEqual(capted_stdout.getvalue(), "") + self.assertEqual(capted_stderr.getvalue(), "") + + fct_main(["WipeLocalData"]) + + with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: + fct_main(["-v", "PullData"]) + self.assertTrue("WARNING" in capted_stdout.getvalue()) + self.assertFalse("INFO" in capted_stdout.getvalue()) + self.assertEqual(capted_stderr.getvalue(), "") + + fct_main(["WipeLocalData"]) + + with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: + fct_main(["-vv", "PullData"]) + self.assertTrue("INFO" in capted_stdout.getvalue()) + self.assertTrue("WARNING" in capted_stdout.getvalue()) + self.assertEqual(capted_stderr.getvalue(), "") + + def test_cli_GetServices(self): + dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json" + with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: + fct_main(["GetServices"]) + print(capted_stdout.getvalue()) + print(capted_stderr.getvalue()) + self.assertTrue("Nextcloud" in capted_stdout.getvalue()) + self.assertEqual(capted_stderr.getvalue(), "") + + def test_cli_GetServices_noservice(self): + dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json" + with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: + with self.assertRaises(dabdatasync.DataSyncException_NoValidServiceFound): + fct_main(["GetServices"]) + self.assertEqual(capted_stdout.getvalue(), "") + self.assertEqual(capted_stderr.getvalue(), "") + + def test_cli_GetServices_invalid(self): + dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_invalid.json" + with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: + with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest): + fct_main(["GetServices"]) + self.assertEqual(capted_stdout.getvalue(), "") + self.assertEqual(capted_stderr.getvalue(), "") + + def test_cli_GetServices_nextcloud_invalid(self): + dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_invalid.json" + with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: + with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest): + fct_main(["GetServices"]) + self.assertEqual(capted_stdout.getvalue(), "") + self.assertEqual(capted_stderr.getvalue(), "") + + def test_cli_WipeLocalData(self): + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "SAVED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "SAVED_VALUE") + + dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json" + + with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: + fct_main(["WipeLocalData"]) + + self.assertEqual(capted_stdout.getvalue(), "") + self.assertEqual(capted_stderr.getvalue(), "") + + self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt")) + self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt")) + + def test_cli_simple(self): + dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json" with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") - datasync.write_data() + fct_main(["PushData"]) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") @@ -72,7 +147,7 @@ class TestDabDataSync(unittest.TestCase): with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE2") - datasync.read_data() + fct_main(["PullData"]) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") @@ -84,7 +159,7 @@ class TestDabDataSync(unittest.TestCase): with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE32") - datasync.write_data() + fct_main(["PushData"]) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE3") @@ -101,29 +176,285 @@ class TestDabDataSync(unittest.TestCase): with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") - datasync.read_data() + fct_main(["PullData"]) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE3") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE32") - def test_load_empty(self): - dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_empty.json" + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + + fct_main(["WipeRemoteData"]) + fct_main(["PullData"]) + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + + fct_main(["PushData"]) + fct_main(["WipeLocalData"]) + + self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt")) + self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt")) + + fct_main(["PullData"]) + + self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt")) + self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt")) + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + + def test_defect_cli_select_wrong_service(self): + dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json" + with self.assertRaises(dabdatasync.DataSyncException_ServiceNotFound): + fct_main(["PullData", "--service", "WRONGSERVICE"]) + + def test_cli_select_service(self): + dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json" + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "SAVED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "SAVED_VALUE") + + fct_main(["PushData", "--service", "Nextcloud"]) + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "SAVED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "SAVED_VALUE") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE2") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE2") + + fct_main(["PullData", "--service", "Nextcloud"]) + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "SAVED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "SAVED_VALUE") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE3") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE32") + + fct_main(["PushData", "--service", "Nextcloud"]) + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE3") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE32") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + + fct_main(["PullData", "--service", "Nextcloud"]) + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE3") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE32") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + + fct_main(["WipeRemoteData", "--service", "Nextcloud"]) + fct_main(["PullData", "--service", "Nextcloud"]) + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + + fct_main(["PushData", "--service", "Nextcloud"]) + fct_main(["WipeLocalData"]) + + self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt")) + self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt")) + + fct_main(["PullData", "--service", "Nextcloud"]) + + self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt")) + self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt")) + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + + def load_nextcloud_gen(self, compressor): + dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json" datasync = dabdatasync.DataSync_Factory.get_DataSync() - self.assertIsNone(datasync) + datasync[0].set_compressor(compressor) + self.assertIsInstance(datasync, list) + self.assertEqual(len(datasync), 1) + + self.assertIsInstance(datasync[0], dabdatasync.A_DataSync) + self.assertIsInstance(datasync[0], dabdatasync.C_DataSync_NextCloud) + + self.assertEqual(len(datasync[0].get_datasync_records()), 2) + self.assertEqual(datasync[0].get_datasync_records()[0].name, "SOTF_map") + self.assertEqual(datasync[0].get_datasync_records()[1].name, "SOTF_map2") + self.assertEqual(datasync[0].get_datasync_records()[0].rec_type, "fs") + self.assertEqual(datasync[0].get_datasync_records()[1].rec_type, "fs") + self.assertEqual(datasync[0].get_datasync_records()[0].value, "test/test_data") + self.assertEqual(datasync[0].get_datasync_records()[1].value, "test/test_data2/SAVE_FILE.txt") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "SAVED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "SAVED_VALUE") + + datasync[0].push_data() + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "SAVED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "SAVED_VALUE") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE2") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE2") + + datasync[0].pull_data() + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "SAVED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "SAVED_VALUE") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE3") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE32") + + datasync[0].push_data() + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE3") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE32") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + + datasync[0].pull_data() + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE3") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE32") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + + datasync[0].wipe_remote_data() + datasync[0].pull_data() + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + + datasync[0].push_data() + datasync[0].wipe_local_data() + + self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt")) + self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt")) + + datasync[0].pull_data() + + self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt")) + self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt")) + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + + def test_load_nextcloud__tar_gz(self): + with Timer() as t: + self.load_nextcloud_gen("tar_gz") + print(t.elapsed) + + def test_load_nextcloud__tar_lz4(self): + with Timer() as t: + self.load_nextcloud_gen("tar_lz4") + print(t.elapsed) + + def test_load_empty(self): + dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_empty.json" + datasync = dabdatasync.DataSync_Factory.get_DataSync() + self.assertIsInstance(datasync, list) + self.assertEqual(len(datasync), 0) def test_load_nextcloud_disabled(self): - dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json" + dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json" datasync = dabdatasync.DataSync_Factory.get_DataSync() - self.assertIsNone(datasync) + self.assertIsInstance(datasync, list) + self.assertEqual(len(datasync), 0) - def test_load_invalid(self): - dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_invalid.json" + def test_defect_load_invalid(self): + dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_invalid.json" with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest): dabdatasync.DataSync_Factory.get_DataSync() - def test_load_nextcloud_invalid(self): - dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_nextcloud_invalid.json" + def test_defect_load_nextcloud_invalid(self): + dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_invalid.json" with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest): dabdatasync.DataSync_Factory.get_DataSync()