diff --git a/.settings/org.eclipse.core.resources.prefs b/.settings/org.eclipse.core.resources.prefs index 99f26c0..22d8696 100644 --- a/.settings/org.eclipse.core.resources.prefs +++ b/.settings/org.eclipse.core.resources.prefs @@ -1,2 +1,3 @@ eclipse.preferences.version=1 +encoding//src/dabdatasync/__main__.py=utf-8 encoding/=UTF-8 diff --git a/pyproject.toml b/pyproject.toml index 689229d..0b1ae1d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,8 +35,10 @@ classifiers = [ dependencies = [ 'importlib-metadata; python_version<"3.9"', 'packaging', - 'webdavclient3', - 'pydantic' + 'webdavclient3==1.*', + 'pydantic==2.*', + 'typed-argument-parser==1.*', + 'loguru==0.7.*' ] dynamic = ["version"] diff --git a/src/dabdatasync/__init__.py b/src/dabdatasync/__init__.py index 395cd8e..923a557 100644 --- a/src/dabdatasync/__init__.py +++ b/src/dabdatasync/__init__.py @@ -11,5 +11,6 @@ Main module __init__ file. """ from .__metadata__ import __version__, __Summuary__, __Name__ -from .datasync import I_DataSync, DataSync_Factory, C_DataSync_NextCloud -from .datasync import DataSyncException, DataSyncException_InvalidManifest +from .datasync import I_DataSync, DataSync_Factory +from .datasync_nextcloud import C_DataSync_NextCloud +from .exceptions import DataSyncException, DataSyncException_InvalidManifest diff --git a/src/dabdatasync/__main__.py b/src/dabdatasync/__main__.py new file mode 100644 index 0000000..4d684f0 --- /dev/null +++ b/src/dabdatasync/__main__.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# pyGameCFG(c) by chacha +# +# pyGameCFG is licensed under a +# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License. +# +# You should have received a copy of the license along with this +# work. If not, see . + +"""CLI interface module""" +from __future__ import annotations +from typing import Literal, cast, Union + +import sys +from tap import Tap + +from . import __Summuary__, __Name__ +from . import datasync + + +class dabdatasync_args_PullData(Tap): + """PullData CLI arg subparser""" + + +class dabdatasync_args_PushData(Tap): + """PushData CLI arg subparser""" + + +class dabdatasync_args_WipeLocalData(Tap): + """WipeLocalData CLI arg subparser""" + + +class dabdatasync_args_WipeRemoteData(Tap): + """WipeRemoteData CLI arg subparser""" + + +class dabdatasync_args(Tap): + """Main CLI arg parser""" + + verbosity: int = 0 + + def configure(self) -> None: + self.add_argument("-v", "--verbosity", action="count", help="increase output verbosity") + + self.add_subparsers(dest="command", help="command type", required=True) + self.add_subparser("PullData", dabdatasync_args_PullData, help="Pull data from the service") + self.add_subparser("PushData", dabdatasync_args_PushData, help="Push data to the service") + self.add_subparser("WipeLocalData", dabdatasync_args_WipeLocalData, help="Wipe local data") + self.add_subparser("WipeRemoteData", dabdatasync_args_WipeRemoteData, help="Wipe service data") + + def process_args(self) -> None: + """dynamically add self.command to avoid conflict with Tap/argparse while keep pylint happy""" + self.command: Union[str, None] = cast(Union[str, None], self.command) # pylint: disable=attribute-defined-outside-init + + +def fct_main(i_args: list[str]) -> None: + """CLI main function""" + parser: dabdatasync_args = dabdatasync_args(prog=__Name__, description=__Summuary__) + + args: dabdatasync_args = parser.parse_args(i_args) + + if args.verbosity: + pass + + datasync = datasync.DataSync_Factory.get_DataSync() + + if args.command == "PullData": + datasync.read_data() + elif args.command == "PushData": + datasync.write_data() + elif args.command == "WipeLocalData": + datasync.wipe_local_data() + elif args.command == "WipeRemoteData": + datasync.wipe_remote_data() + else: + raise RuntimeError("Invalid argument") + + +if __name__ == "__main__": + fct_main(sys.argv[1:]) diff --git a/src/dabdatasync/compressors.py b/src/dabdatasync/compressors.py new file mode 100644 index 0000000..b9f66ed --- /dev/null +++ b/src/dabdatasync/compressors.py @@ -0,0 +1,46 @@ +from __future__ import annotations +from abc import ABC, abstractmethod +from pathlib import Path +import os +import tarfile + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import IO + +from loguru import logger + + +class A_DataSync_Compressor(ABC): + """abstract compressor class""" + + compressor_name: str = "Abtract" + suffix: str + + @abstractmethod + def compress(self, path_in: Path, file_out: IO): + """compress method - virtual""" + + @abstractmethod + def decompress(self, path_in: Path, path_out: Path): + """decompress method - virtual""" + + +class DataSync_Compressor_targz(A_DataSync_Compressor): + """Concrete compressor class - .tar.gz compressor""" + + compressor_name: str = "targz" + suffix: str = ".tar.gz" + + def compress(self, path_in: Path, file_out: IO): + """compress method - .tar.gz concrete""" + logger.info(f"compressing using {self.compressor_name}") + with tarfile.open(fileobj=file_out, mode="w:gz") as tar: + tar.add(path_in, arcname=os.path.basename(path_in)) + + def decompress(self, path_in: Path, path_out: Path): + """uncompressing method - .tar.gz concrete""" + logger.info(f"uncompressing using {self.compressor_name}") + with tarfile.open(path_in, "r") as tar: + tar.extractall(path_out) diff --git a/src/dabdatasync/datasync.py b/src/dabdatasync/datasync.py index 808507e..7494ded 100644 --- a/src/dabdatasync/datasync.py +++ b/src/dabdatasync/datasync.py @@ -1,140 +1,34 @@ -"""Main datasync class""" - -import json +from __future__ import annotations from abc import ABC, abstractmethod -from typing import final, TYPE_CHECKING, IO -from typing import Self, Any, Set, Optional from uuid import UUID from pathlib import Path import os -import tarfile from tempfile import NamedTemporaryFile, TemporaryDirectory -import shutil +import json -from pydantic import BaseModel -from webdav3.client import Client as webdav3_Client +from typing import final, TYPE_CHECKING +if TYPE_CHECKING: + from typing import Optional, IO, Any, Self -class DataSyncException(Exception): - """generic datasync exception class""" +from loguru import logger - -class DataSyncException_InvalidManifest(DataSyncException): - """specific datasync exception class - Dab appliance manifest not found""" - - -def urljoin(*args): - """ - Joins given arguments into an url. Trailing but not leading slashes are - stripped for each argument. - """ - - return "/".join(map(lambda x: str(x).rstrip("/"), args)) - - -class A_DataSync_Compressor(ABC): - """abstract compressor class""" - - suffix: str - - @abstractmethod - def compress(self, path_in: Path, file_out: IO): - """compress method - virtual""" - - @abstractmethod - def decompress(self, path_in: Path, path_out: Path): - """decompress method - virtual""" - - -class DataSync_Compressor_targz(A_DataSync_Compressor): - """Concrete compressor class - .tar.gz compressor""" - - suffix: str = ".tar.gz" - - def compress(self, path_in: Path, file_out: IO): - """compress method - .tar.gz concrete""" - with tarfile.open(fileobj=file_out, mode="w:gz") as tar: - tar.add(path_in, arcname=os.path.basename(path_in)) - - def decompress(self, path_in: Path, path_out: Path): - """decompress method - .tar.gz concrete""" - with tarfile.open(path_in, "r") as tar: - tar.extractall(path_out) - - -class A_DataSync_Record(BaseModel, ABC): - """Abstract DataSync Record class""" - - name: str - rec_type: str - value: str - - @abstractmethod - def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None: - """compress the record - virtual""" - - @abstractmethod - def decompress(self, compressor: A_DataSync_Compressor, path_in: Path) -> None: - """compress the record - virtual""" - - -class DataSync_Record_Factory: - """DataSync Record Factory""" - - ar_cls_DataSync_Record: Set[type[A_DataSync_Record]] = set() - - @classmethod - def get_C_DataSync_Record(cls, name: str, rec_type: str, value: str) -> A_DataSync_Record | None: - """get a concrete DataSync Record class instance""" - for cls_DataSync_Record in cls.ar_cls_DataSync_Record: - if cls_DataSync_Record.model_fields["rec_type"].default == rec_type: - return C_DataSync_Record_FS(name=name, rec_type=rec_type, value=value) - raise RuntimeError("No DataSync_Record concrete class found") - - @classmethod - def register(cls, _cls: type[A_DataSync_Record]) -> type[A_DataSync_Record]: - """decorator to register a concrete DataSync Record class""" - cls.ar_cls_DataSync_Record.add(_cls) - return _cls - - -@DataSync_Record_Factory.register -class C_DataSync_Record_FS(A_DataSync_Record): - """Concrete DataSync Record class - FileSystem""" - - rec_type: str = "fs" - path: Optional[Path] = None - - def model_post_init(self, __context) -> None: - self.path = Path(self.value) - - def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None: - """compress the DataSync Record - concrete FS implementation""" - if TYPE_CHECKING: - assert isinstance(self.path, Path) - compressor.compress(self.path, file_out) - - def decompress(self, compressor: A_DataSync_Compressor, path_in: Path) -> None: - """compress the record - concrete FS implementation""" - if TYPE_CHECKING: - assert isinstance(self.path, Path) - if os.path.isdir(self.path): - shutil.rmtree(self.path) - if os.path.isfile(self.path): - os.remove(self.path) - compressor.decompress(path_in, self.path.parent) +from .records import A_DataSync_Record, DataSync_Record_Factory +from .compressors import A_DataSync_Compressor, DataSync_Compressor_targz +from .exceptions import DataSyncException_RemoteDataNotFound class I_DataSync(ABC): """Abstract DataSync class""" + service_name: str = "Abtract" manifest_path: str = "/opt/pyDABFactoryAppliance/Manifest.json" cls_compressor: type[A_DataSync_Compressor] = DataSync_Compressor_targz @classmethod @final def get_manifest_data(cls) -> dict[str, Any]: - """utilitary method to get manifest""" + """tool method to get manifest""" with open(cls.manifest_path, encoding="utf-8") as f_DAB_manifest: return json.load(f_DAB_manifest) @@ -179,152 +73,90 @@ class I_DataSync(ABC): def connect(self) -> None: """connect to the service""" if not self.connected: + logger.info(f"connection to service {self.service_name}") self._impl_connect() self.connected = True + logger.info(f"connection done") @abstractmethod def _impl_connect(self) -> None: """connect to the service - virtual""" - def read_data(self) -> None: - """read data from the service""" + def pull_data(self) -> None: + """pull data from the service""" + logger.info(f"pulling data from service {self.service_name}") self.connect() with TemporaryDirectory() as tmpdir: for datasync_record in self.ar_datasync_record: - self._impl_read_data(Path(datasync_record.name + self.compressor.suffix), Path(tmpdir)) - datasync_record.decompress(self.compressor, Path(tmpdir) / (datasync_record.name + self.compressor.suffix)) + logger.info(f"pulling record {datasync_record.name}") + try: + self._impl_pull_data(Path(datasync_record.name + self.compressor.suffix), Path(tmpdir)) + datasync_record.decompress(self.compressor, Path(tmpdir) / (datasync_record.name + self.compressor.suffix)) + except DataSyncException_RemoteDataNotFound: + logger.warning(f"remote record file not found {datasync_record.name}") + logger.info(f"done") @abstractmethod - def _impl_read_data(self, file_in: Path, file_out: Path) -> None: - """read data from the service - virtual""" + def _impl_pull_data(self, file_in: Path, file_out: Path) -> None: + """pull data from the service - virtual""" - def write_data(self) -> None: - """write data to the service""" + def push_data(self) -> None: + """push data to the service""" + logger.info(f"pushing data to service {self.service_name}") self.connect() - self._impl_wipe_data() + self._impl_wipe_remote_data() for datasync_record in self.ar_datasync_record: + logger.info(f"pushing record {datasync_record.name}") try: with NamedTemporaryFile("wb", suffix=self.compressor.suffix, delete=False) as tmp_file: datasync_record.compress(self.compressor, tmp_file) tmp_file.seek(0) tmp_file.close() - self._impl_write_data(datasync_record.name + "".join(Path(tmp_file.name).suffixes), tmp_file) + self._impl_push_data(datasync_record.name + "".join(Path(tmp_file.name).suffixes), tmp_file) finally: os.unlink(tmp_file.name) + logger.info(f"done") @abstractmethod - def _impl_write_data(self, record_name: str, file_in: IO) -> None: - """write data to the service - virtual""" + def _impl_push_data(self, record_name: str, file_in: IO) -> None: + """push data to the service - virtual""" - def wipe_data(self) -> None: + def wipe_remote_data(self) -> None: """wipe data on the service""" + logger.info(f"wiping remote data on service {self.service_name}") self.connect() - self._impl_wipe_data() + self._impl_wipe_remote_data() + + def wipe_local_data(self) -> None: + """wipe local data""" + logger.info(f"wiping local data") + for datasync_record in self.ar_datasync_record: + datasync_record.wipe() @abstractmethod - def _impl_wipe_data(self) -> None: + def _impl_wipe_remote_data(self) -> None: """wipe data on the service - virtual""" class DataSync_Factory: """DataSync Factory""" - ar_cls_DataSync: Set[type[I_DataSync]] = set() + ar_cls_DataSync: set[type[I_DataSync]] = set() @classmethod - def get_DataSync(cls) -> I_DataSync | None: + def get_DataSync(cls) -> list[I_DataSync]: """get and configure a DataSync Concrete class instance""" + ar_datasync: list[I_DataSync] = [] manifest = I_DataSync.get_manifest_data() for cls_DataSync in cls.ar_cls_DataSync: if res := cls_DataSync.try_get_instance(manifest): res.configure() - return res - return None + ar_datasync.append(res) + return ar_datasync @classmethod def register(cls, _cls: type[I_DataSync]) -> type[I_DataSync]: """decorator to register a concrete class to the factory""" cls.ar_cls_DataSync.add(_cls) return _cls - - -@DataSync_Factory.register -class C_DataSync_NextCloud(I_DataSync): - """Concrete DataSync class - Nextcloud""" - - def __init__(self, manifest: dict[Any, Any]) -> None: - super().__init__(manifest) - self.nextcloud_address: str - self.nextcloud_user: str - self.nextcloud_password: str - self.nextcloud_path: str - self.client: webdav3_Client - self.connected: bool = False - - @classmethod - def test_applicable(cls, manifest: dict[str, Any]) -> bool: - """check if a concrete class is applicable - Nextcloud override""" - if "Args" in manifest: - if "FSSync_NextCloud_Enabled" in manifest["Args"]: - if manifest["Args"]["FSSync_NextCloud_Enabled"]["value"] is True: - return True - return False - raise DataSyncException_InvalidManifest() - - def _impl_configure(self) -> None: - """configure the class instance - Nextcloud concrete implementation""" - if "FSSync_NextCloud_Address" in self.manifest["Args"]: - self.nextcloud_address = self.manifest["Args"]["FSSync_NextCloud_Address"]["value"] - else: - raise DataSyncException_InvalidManifest() - if "FSSync_NextCloud_User" in self.manifest["Args"]: - self.nextcloud_user = self.manifest["Args"]["FSSync_NextCloud_User"]["value"] - else: - raise DataSyncException_InvalidManifest() - if "FSSync_NextCloud_Password" in self.manifest["Args"]: - self.nextcloud_password = self.manifest["Args"]["FSSync_NextCloud_Password"]["value"] - else: - raise DataSyncException_InvalidManifest() - if "FSSync_NextCloud_Path" in self.manifest["Args"]: - self.nextcloud_path = str(Path(self.manifest["Args"]["FSSync_NextCloud_Path"]["value"]) / Path(str(self.app_id))).replace( - os.sep, "/" - ) - - else: - raise DataSyncException_InvalidManifest() - - def _impl_connect(self) -> None: - """connect to the remote service - Nextcloud concrete implementation""" - full_adress = urljoin(self.nextcloud_address, "remote.php/dav/files/", self.nextcloud_user) - self.client = webdav3_Client( - {"webdav_hostname": full_adress, "webdav_login": self.nextcloud_user, "webdav_password": self.nextcloud_password} - ) - - def _check_create_dir(self) -> None: - """check and create directory in remote service""" - url_accumulator: str = "" - for url_part in self.nextcloud_path.split("/"): - url_accumulator += "/" + url_part - if not self.client.check(url_accumulator): - self.client.mkdir(url_accumulator) - - def _impl_read_data(self, file_in: Path, file_out: Path) -> None: - """read data from the remote service - Nextcloud concrete implementation""" - self._check_create_dir() - self.client.download_sync( - remote_path=str(self.nextcloud_path / file_in).replace(os.sep, "/"), local_path=str(file_out / file_in).replace(os.sep, "/") - ) - - def _impl_write_data(self, record_name: str, file_in: IO) -> None: - """write data to the remote service - Nextcloud concrete implementation""" - self._check_create_dir() - self.client.upload_sync( - remote_path=str(Path(self.nextcloud_path) / record_name).replace(os.sep, "/"), - local_path=file_in.name, - ) - - def _impl_wipe_data(self) -> None: - """wipe data on the service - concrete implementation""" - if self.client.check(self.nextcloud_path): - self.client.clean(self.nextcloud_path) diff --git a/src/dabdatasync/datasync_nextcloud.py b/src/dabdatasync/datasync_nextcloud.py new file mode 100644 index 0000000..a723de2 --- /dev/null +++ b/src/dabdatasync/datasync_nextcloud.py @@ -0,0 +1,100 @@ +from __future__ import annotations +from pathlib import Path +import os +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import Any, IO + +from webdav3.client import Client as webdav3_Client +from webdav3.exceptions import RemoteResourceNotFound as webdav3_RemoteResourceNotFound + +from .datasync import I_DataSync, DataSync_Factory +from .exceptions import DataSyncException_InvalidManifest, DataSyncException_RemoteDataNotFound +from .utils import urljoin + + +@DataSync_Factory.register +class C_DataSync_NextCloud(I_DataSync): + """Concrete DataSync class - Nextcloud""" + + service_name: str = "Nextcloud" + + def __init__(self, manifest: dict[Any, Any]) -> None: + super().__init__(manifest) + self.nextcloud_address: str + self.nextcloud_user: str + self.nextcloud_password: str + self.nextcloud_path: str + self.client: webdav3_Client + self.connected: bool = False + + @classmethod + def test_applicable(cls, manifest: dict[str, Any]) -> bool: + """check if a concrete class is applicable - Nextcloud override""" + if "Args" in manifest: + if "FSSync_NextCloud_Enabled" in manifest["Args"]: + if manifest["Args"]["FSSync_NextCloud_Enabled"]["value"] is True: + return True + return False + raise DataSyncException_InvalidManifest() + + def _impl_configure(self) -> None: + """configure the class instance - Nextcloud concrete implementation""" + if "FSSync_NextCloud_Address" in self.manifest["Args"]: + self.nextcloud_address = self.manifest["Args"]["FSSync_NextCloud_Address"]["value"] + else: + raise DataSyncException_InvalidManifest() + if "FSSync_NextCloud_User" in self.manifest["Args"]: + self.nextcloud_user = self.manifest["Args"]["FSSync_NextCloud_User"]["value"] + else: + raise DataSyncException_InvalidManifest() + if "FSSync_NextCloud_Password" in self.manifest["Args"]: + self.nextcloud_password = self.manifest["Args"]["FSSync_NextCloud_Password"]["value"] + else: + raise DataSyncException_InvalidManifest() + if "FSSync_NextCloud_Path" in self.manifest["Args"]: + self.nextcloud_path = str(Path(self.manifest["Args"]["FSSync_NextCloud_Path"]["value"]) / Path(str(self.app_id))).replace( + os.sep, "/" + ) + + else: + raise DataSyncException_InvalidManifest() + + def _impl_connect(self) -> None: + """connect to the remote service - Nextcloud concrete implementation""" + full_adress = urljoin(self.nextcloud_address, "remote.php/dav/files/", self.nextcloud_user) + self.client = webdav3_Client( + {"webdav_hostname": full_adress, "webdav_login": self.nextcloud_user, "webdav_password": self.nextcloud_password} + ) + + def _check_create_dir(self) -> None: + """check and create directory in remote service""" + url_accumulator: str = "" + for url_part in self.nextcloud_path.split("/"): + url_accumulator += "/" + url_part + if not self.client.check(url_accumulator): + self.client.mkdir(url_accumulator) + + def _impl_pull_data(self, file_in: Path, file_out: Path) -> None: + """pull data from the remote service - Nextcloud concrete implementation""" + self._check_create_dir() + try: + self.client.download_sync( + remote_path=str(self.nextcloud_path / file_in).replace(os.sep, "/"), local_path=str(file_out / file_in).replace(os.sep, "/") + ) + except webdav3_RemoteResourceNotFound: + raise DataSyncException_RemoteDataNotFound(webdav3_RemoteResourceNotFound) + + def _impl_push_data(self, record_name: str, file_in: IO) -> None: + """push data to the remote service - Nextcloud concrete implementation""" + self._check_create_dir() + self.client.upload_sync( + remote_path=str(Path(self.nextcloud_path) / record_name).replace(os.sep, "/"), + local_path=file_in.name, + ) + + def _impl_wipe_remote_data(self) -> None: + """wipe data on the service - concrete implementation""" + if self.client.check(self.nextcloud_path): + self.client.clean(self.nextcloud_path) diff --git a/src/dabdatasync/exceptions.py b/src/dabdatasync/exceptions.py new file mode 100644 index 0000000..799574a --- /dev/null +++ b/src/dabdatasync/exceptions.py @@ -0,0 +1,14 @@ +class DataSyncException(Exception): + """generic datasync exception class""" + + +class DataSyncException_InvalidManifest(DataSyncException): + """specific datasync exception class - Dab appliance manifest not found""" + + +class DataSyncException_NoConcreteRecordClassFound(DataSyncException): + """specific datasync exception class - No concrete Record Class Found""" + + +class DataSyncException_RemoteDataNotFound(DataSyncException): + """specific datasync exception class - Remote Data Not Found""" diff --git a/src/dabdatasync/records.py b/src/dabdatasync/records.py new file mode 100644 index 0000000..c09ae80 --- /dev/null +++ b/src/dabdatasync/records.py @@ -0,0 +1,87 @@ +from __future__ import annotations +from abc import ABC, abstractmethod +from pathlib import Path +import os +import shutil + +from typing import TYPE_CHECKING, Optional + +if TYPE_CHECKING: + from typing import IO + +from pydantic import BaseModel + +from .compressors import A_DataSync_Compressor +from .exceptions import DataSyncException_NoConcreteRecordClassFound + + +class A_DataSync_Record(BaseModel, ABC): + """Abstract DataSync Record class""" + + name: str + rec_type: str + value: str + + @abstractmethod + def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None: + """compress the DataSync Record - virtual""" + + @abstractmethod + def decompress(self, compressor: A_DataSync_Compressor, path_in: Path) -> None: + """decompress the DataSync record - virtual""" + + @abstractmethod + def wipe(self): + """wipe the record local data - virtual""" + + +class DataSync_Record_Factory: + """DataSync Record Factory""" + + ar_cls_DataSync_Record: set[type[A_DataSync_Record]] = set() + + @classmethod + def get_C_DataSync_Record(cls, name: str, rec_type: str, value: str) -> A_DataSync_Record | None: + """get a concrete DataSync Record class instance""" + for cls_DataSync_Record in cls.ar_cls_DataSync_Record: + if cls_DataSync_Record.model_fields["rec_type"].default == rec_type: + return C_DataSync_Record_FS(name=name, rec_type=rec_type, value=value) + raise DataSyncException_NoConcreteRecordClassFound() + + @classmethod + def register(cls, _cls: type[A_DataSync_Record]) -> type[A_DataSync_Record]: + """decorator to register a concrete DataSync Record class""" + cls.ar_cls_DataSync_Record.add(_cls) + return _cls + + +@DataSync_Record_Factory.register +class C_DataSync_Record_FS(A_DataSync_Record): + """Concrete DataSync Record class - FileSystem""" + + rec_type: str = "fs" + path: Optional[Path] = None + + def model_post_init(self, __context) -> None: + self.path = Path(self.value) + + def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None: + """compress the DataSync Record - concrete FS implementation""" + if TYPE_CHECKING: + assert isinstance(self.path, Path) + compressor.compress(self.path, file_out) + + def decompress(self, compressor: A_DataSync_Compressor, path_in: Path) -> None: + """decompress the DataSync record - concrete FS implementation""" + if TYPE_CHECKING: + assert isinstance(self.path, Path) + self.wipe() + compressor.decompress(path_in, self.path.parent) + + def wipe(self): + if TYPE_CHECKING: + assert isinstance(self.path, Path) + if os.path.isdir(self.path): + shutil.rmtree(self.path) + if os.path.isfile(self.path): + os.remove(self.path) diff --git a/src/dabdatasync/utils.py b/src/dabdatasync/utils.py new file mode 100644 index 0000000..93a86ea --- /dev/null +++ b/src/dabdatasync/utils.py @@ -0,0 +1,7 @@ +def urljoin(*args: str) -> str: + """ + Joins given arguments into an url. Trailing but not leading slashes are + stripped for each argument. + """ + + return "/".join(map(lambda x: str(x).rstrip("/"), args)) diff --git a/test/test_datasync.py b/test/test_datasync.py index c031ff0..2b2e7db 100644 --- a/test/test_datasync.py +++ b/test/test_datasync.py @@ -7,7 +7,7 @@ # work. If not, see . import unittest -from os import chdir +from os import chdir, path as os_path from pathlib import Path import pprint import shutil @@ -39,23 +39,26 @@ class TestDabDataSync(unittest.TestCase): def test_load_nextcloud(self): dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_nextcloud.json" datasync = dabdatasync.DataSync_Factory.get_DataSync() - self.assertIsInstance(datasync, dabdatasync.I_DataSync) - self.assertIsInstance(datasync, dabdatasync.C_DataSync_NextCloud) + self.assertIsInstance(datasync, list) + self.assertEqual(len(datasync), 1) - self.assertEqual(len(datasync.ar_datasync_record), 2) - self.assertEqual(datasync.ar_datasync_record[0].name, "SOTF_map") - self.assertEqual(datasync.ar_datasync_record[1].name, "SOTF_map2") - self.assertEqual(datasync.ar_datasync_record[0].rec_type, "fs") - self.assertEqual(datasync.ar_datasync_record[1].rec_type, "fs") - self.assertEqual(datasync.ar_datasync_record[0].value, "test/test_data") - self.assertEqual(datasync.ar_datasync_record[1].value, "test/test_data2/SAVE_FILE.txt") + self.assertIsInstance(datasync[0], dabdatasync.I_DataSync) + self.assertIsInstance(datasync[0], dabdatasync.C_DataSync_NextCloud) + + self.assertEqual(len(datasync[0].ar_datasync_record), 2) + self.assertEqual(datasync[0].ar_datasync_record[0].name, "SOTF_map") + self.assertEqual(datasync[0].ar_datasync_record[1].name, "SOTF_map2") + self.assertEqual(datasync[0].ar_datasync_record[0].rec_type, "fs") + self.assertEqual(datasync[0].ar_datasync_record[1].rec_type, "fs") + self.assertEqual(datasync[0].ar_datasync_record[0].value, "test/test_data") + self.assertEqual(datasync[0].ar_datasync_record[1].value, "test/test_data2/SAVE_FILE.txt") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") - datasync.write_data() + datasync[0].push_data() with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") @@ -72,7 +75,7 @@ class TestDabDataSync(unittest.TestCase): with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE2") - datasync.read_data() + datasync[0].pull_data() with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") @@ -84,7 +87,7 @@ class TestDabDataSync(unittest.TestCase): with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE32") - datasync.write_data() + datasync[0].push_data() with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE3") @@ -101,22 +104,58 @@ class TestDabDataSync(unittest.TestCase): with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") - datasync.read_data() + datasync[0].pull_data() with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE3") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE32") + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: + testfile.write("MODIFIED_VALUE") + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + + datasync[0].wipe_remote_data() + datasync[0].pull_data() + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + + datasync[0].push_data() + datasync[0].wipe_local_data() + + self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt")) + self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt")) + + datasync[0].pull_data() + + self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt")) + self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt")) + + with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: + self.assertEqual(testfile.read(), "MODIFIED_VALUE") + def test_load_empty(self): dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_empty.json" datasync = dabdatasync.DataSync_Factory.get_DataSync() - self.assertIsNone(datasync) + self.assertIsInstance(datasync, list) + self.assertEqual(len(datasync), 0) def test_load_nextcloud_disabled(self): dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json" datasync = dabdatasync.DataSync_Factory.get_DataSync() - self.assertIsNone(datasync) + self.assertIsInstance(datasync, list) + self.assertEqual(len(datasync), 0) def test_load_invalid(self): dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_invalid.json"