5 Commits

Author SHA1 Message Date
cclecle
2d753205ca feat: implement verbosity 2024-03-29 03:25:18 +00:00
cclecle
872dab1ed9 chore: optimize imports
fix: remove useless printf
2024-03-29 03:11:08 +00:00
cclecle
96d8f8fdd7 fix webdavclient3 version dep 2024-03-29 02:55:08 +00:00
cclecle
0ff9f442d2 chore: remove useless data
feature: add lz4 compressor
refactoring: make compressor configurable
feature: add CLI interface and configure int in .toml
feature: add logging using loguru
2024-03-29 02:51:49 +00:00
cclecle
9f4f2bbba1 feat: add main / argparse (tap)
feat: add logging system
refactoring: split project into multiples files
2024-03-28 17:59:10 +00:00
12 changed files with 1036 additions and 276 deletions

View File

@@ -1,2 +1,3 @@
eclipse.preferences.version=1
encoding//src/dabdatasync/__main__.py=utf-8
encoding/<project>=UTF-8

View File

@@ -35,8 +35,11 @@ classifiers = [
dependencies = [
'importlib-metadata; python_version<"3.9"',
'packaging',
'webdavclient3',
'pydantic'
'webdavclient3==3.14.*',
'pydantic==2.*',
'typed-argument-parser==1.*',
'loguru==0.7.*',
'lz4'
]
dynamic = ["version"]
@@ -55,6 +58,18 @@ where = ["src"]
module = "webdav3.client"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "webdav3.exceptions"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "lz4"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "lz4.frame"
ignore_missing_imports = true
[tool.coverage.run]
cover_pylib = false
branch = true
@@ -76,13 +91,13 @@ Documentation = "https://chacha.ddns.net/mkdocs-web/chacha/dabdatasync/master/
Tracker = "https://chacha.ddns.net/gitea/chacha/dabdatasync/issues"
[project.optional-dependencies]
test = ["chacha_cicd_helper"]
test = ["chacha_cicd_helper","contexttimer"]
coverage-check = ["chacha_cicd_helper"]
complexity-check = ["chacha_cicd_helper"]
quality-check = ["chacha_cicd_helper"]
type-check = ["chacha_cicd_helper"]
doc-gen = ["chacha_cicd_helper"]
# [project.scripts]
# my-script = "my_package.module:function"
[project.scripts]
dabdatasync = "dabdatasync.__main__:CLI"

View File

@@ -11,5 +11,18 @@ Main module __init__ file.
"""
from .__metadata__ import __version__, __Summuary__, __Name__
from .datasync import I_DataSync, DataSync_Factory, C_DataSync_NextCloud
from .datasync import DataSyncException, DataSyncException_InvalidManifest
from .datasync import A_DataSync, DataSync_Factory
from .datasync_nextcloud import C_DataSync_NextCloud
from .exceptions import (
DataSyncException,
DataSyncException_InvalidManifest,
DataSyncException_NoConcreteRecordClassFound,
DataSyncException_RemoteDataNotFound,
DataSyncException_NoValidServiceFound,
DataSyncException_ServiceNotFound,
DataSyncException_TooManyServiceFound,
DataSyncException_CompressorNotFound,
DataSyncException_TooManyCompressorFound,
)
from .compressors import DataSync_Compressors

155
src/dabdatasync/__main__.py Normal file
View File

@@ -0,0 +1,155 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""CLI interface module"""
from __future__ import annotations
from typing import cast, Union, Optional
import sys
from tap import Tap
from loguru import logger
from . import __Summuary__, __Name__
from . import datasync
from . import exceptions
class dabdatasync_args_GetServices(Tap):
"""GetServices CLI arg subparser"""
class dabdatasync_args_WipeLocalData(Tap):
"""WipeLocalData CLI arg subparser"""
class dabdatasync_args_service_abstract(Tap):
"""service abstract CLI arg subparser"""
service: Optional[str] = None
def configure(self) -> None:
self.add_argument("--service")
class dabdatasync_args_service_compress_abstract(dabdatasync_args_service_abstract):
"""service compressor abstract CLI arg subparser"""
compressor: Optional[str] = None
def configure(self) -> None:
super().configure()
self.add_argument("--compressor")
class dabdatasync_args_PullData(dabdatasync_args_service_compress_abstract):
"""PullData CLI arg subparser"""
class dabdatasync_args_PushData(dabdatasync_args_service_compress_abstract):
"""PushData CLI arg subparser"""
class dabdatasync_args_WipeRemoteData(dabdatasync_args_service_abstract):
"""WipeRemoteData CLI arg subparser"""
class dabdatasync_args(Tap):
"""Main CLI arg parser"""
verbosity: int = 0
def configure(self) -> None:
self.add_argument("-v", "--verbosity", action="count", help="increase output verbosity")
self.add_subparsers(dest="command", help="command type", required=True)
self.add_subparser("GetServices", dabdatasync_args_GetServices, help="Get registered services list")
self.add_subparser("PullData", dabdatasync_args_PullData, help="Pull data from the service")
self.add_subparser("PushData", dabdatasync_args_PushData, help="Push data to the service")
self.add_subparser("WipeLocalData", dabdatasync_args_WipeLocalData, help="Wipe local data")
self.add_subparser("WipeRemoteData", dabdatasync_args_WipeRemoteData, help="Wipe remote service data")
def process_args(self) -> None:
"""dynamically add self.command to avoid conflict with Tap/argparse while keep pylint happy"""
self.command: Union[str, None] = cast(Union[str, None], self.command) # pylint: disable=attribute-defined-outside-init
def fct_main(i_args: list[str]) -> None: # pylint: disable=too-many-branches,too-complex
"""CLI main function"""
parser: dabdatasync_args = dabdatasync_args(prog=__Name__, description=__Summuary__)
args: dabdatasync_args = parser.parse_args(i_args)
logger.remove()
if args.verbosity:
if args.verbosity == 1:
logger.add(sys.stdout, level="WARNING")
elif args.verbosity == 2:
logger.add(sys.stdout, level="INFO")
else:
logger.add(sys.stdout, level="DEBUG")
else:
logger.add(sys.stdout, level="ERROR")
logger.add(sys.stderr, level="ERROR")
dabdatasync = datasync.DataSync_Factory.get_DataSync()
if len(dabdatasync) == 0:
raise exceptions.DataSyncException_NoValidServiceFound("No valid service found")
if args.command == "GetServices":
for service in dabdatasync:
print(service.service_name)
return
if args.command == "WipeLocalData":
dabdatasync[0].wipe_local_data()
return
selected_dabdatasync: datasync.A_DataSync
if args.command in ["PullData", "PushData", "WipeRemoteData"]:
requested_service = cast(dabdatasync_args_service_abstract, args).service # pylint: disable=no-member
if requested_service:
services = [_ for _ in dabdatasync if type(_).service_name == requested_service]
if len(services) == 0:
raise exceptions.DataSyncException_ServiceNotFound()
if len(services) == 1:
selected_dabdatasync = services[0]
else:
raise exceptions.DataSyncException_TooManyServiceFound()
else:
selected_dabdatasync = dabdatasync[0]
if args.command in ["PullData", "PushData"]:
compressor = cast(dabdatasync_args_service_compress_abstract, args).compressor # pylint: disable=no-member
if compressor:
selected_dabdatasync.set_compressor(compressor)
if args.command == "PullData":
selected_dabdatasync.pull_data()
return
if args.command == "PushData":
selected_dabdatasync.push_data()
return
if args.command == "WipeRemoteData":
selected_dabdatasync.wipe_remote_data()
return
raise RuntimeError("Invalid argument")
def CLI():
"""wrapper for .toml declared script"""
fct_main(sys.argv)
if __name__ == "__main__":
fct_main(sys.argv[1:])

View File

@@ -0,0 +1,123 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
Compressor interface and implementation
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
import os
from io import BytesIO
from typing import TYPE_CHECKING
import tarfile
import lz4.frame
from loguru import logger
from .exceptions import DataSyncException_CompressorNotFound, DataSyncException_TooManyCompressorFound
if TYPE_CHECKING:
from typing import IO
class A_DataSync_Compressor(ABC):
"""abstract compressor class"""
suffix: str
compressor_name: str = "ABSTRACT"
@classmethod
def compress(cls, path_in: Path, file_out: IO):
"""compress method"""
logger.debug(f"compressing <{path_in}> data using <{cls.compressor_name}> to <{file_out.name}>")
cls._impl_compress(path_in, file_out)
@classmethod
@abstractmethod
def _impl_compress(cls, path_in: Path, file_out: IO):
"""compress method - virtual"""
@classmethod
def uncompress(cls, path_in: Path, path_out: Path):
"""uncompress method"""
logger.debug(f"uncompressing <{path_in}> data using <{cls.compressor_name}> to <{path_out}>")
cls._impl_uncompress(path_in, path_out)
@classmethod
@abstractmethod
def _impl_uncompress(cls, path_in: Path, path_out: Path):
"""uncompress method - virtual"""
class DataSync_Compressors:
"""compressers container/factory class"""
_availables: list[type[A_DataSync_Compressor]] = []
@classmethod
def register(cls, _cls: type[A_DataSync_Compressor]) -> type[A_DataSync_Compressor]:
"""register a new compressor"""
cls._availables.append(_cls)
return _cls
@classmethod
def get(cls, compressor_name: str) -> type[A_DataSync_Compressor]:
"""get a specific compressor"""
found = [_ for _ in cls._availables if _.compressor_name == compressor_name]
if len(found) == 0:
raise DataSyncException_CompressorNotFound()
if len(found) == 1:
return found[0]
raise DataSyncException_TooManyCompressorFound()
@DataSync_Compressors.register
class DataSync_Compressor__tar_gz(A_DataSync_Compressor):
"""Concrete compressor class - .tar.gz compressor"""
suffix: str = ".tar.gz"
compressor_name: str = "tar_gz"
@classmethod
def _impl_compress(cls, path_in: Path, file_out: IO):
"""compress method - .tar.gz concrete"""
with tarfile.open(fileobj=file_out, mode="w:gz") as tar:
tar.add(path_in, arcname=os.path.basename(path_in))
@classmethod
def _impl_uncompress(cls, path_in: Path, path_out: Path):
"""uncompressing method - .tar.gz concrete"""
with tarfile.open(path_in, mode="r:gz") as tar:
tar.extractall(path_out)
@DataSync_Compressors.register
class DataSync_Compressor__tar_lz4(A_DataSync_Compressor):
"""Concrete compressor class - .tar.lz4 compressor"""
suffix: str = ".tar.lz4"
compressor_name: str = "tar_lz4"
@classmethod
def _impl_compress(cls, path_in: Path, file_out: IO):
"""compress method - .tar.lz4 concrete"""
with BytesIO() as memBuff:
with tarfile.open(fileobj=memBuff, mode="w:") as tar:
tar.add(path_in, arcname=os.path.basename(path_in))
memBuff.seek(0)
file_out.write(lz4.frame.compress(memBuff.read()))
@classmethod
def _impl_uncompress(cls, path_in: Path, path_out: Path):
"""uncompressing method - .tar.lz4 concrete"""
with open(path_in, "rb") as file_in, BytesIO() as memBuff:
memBuff.write(lz4.frame.decompress(file_in.read()))
memBuff.seek(0)
with tarfile.open(fileobj=memBuff, mode="r:") as tar:
tar.extractall(path_out)

View File

@@ -1,157 +1,53 @@
"""Main datasync class"""
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
import json
"""
Nextcloud abstract interface
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import final, TYPE_CHECKING, IO
from typing import Self, Any, Set, Optional
from uuid import UUID
from pathlib import Path
import os
import tarfile
from tempfile import NamedTemporaryFile, TemporaryDirectory
import shutil
import json
from typing import final, TYPE_CHECKING
from pydantic import BaseModel
from webdav3.client import Client as webdav3_Client
from loguru import logger
from .records import A_DataSync_Record, DataSync_Record_Factory
from .compressors import DataSync_Compressor__tar_gz, DataSync_Compressors
from .exceptions import DataSyncException_RemoteDataNotFound
if TYPE_CHECKING:
from typing import Optional, IO, Any, Self
from .compressors import A_DataSync_Compressor
class DataSyncException(Exception):
"""generic datasync exception class"""
class DataSyncException_InvalidManifest(DataSyncException):
"""specific datasync exception class - Dab appliance manifest not found"""
def urljoin(*args):
"""
Joins given arguments into an url. Trailing but not leading slashes are
stripped for each argument.
"""
return "/".join(map(lambda x: str(x).rstrip("/"), args))
class A_DataSync_Compressor(ABC):
"""abstract compressor class"""
suffix: str
@abstractmethod
def compress(self, path_in: Path, file_out: IO):
"""compress method - virtual"""
@abstractmethod
def decompress(self, path_in: Path, path_out: Path):
"""decompress method - virtual"""
class DataSync_Compressor_targz(A_DataSync_Compressor):
"""Concrete compressor class - .tar.gz compressor"""
suffix: str = ".tar.gz"
def compress(self, path_in: Path, file_out: IO):
"""compress method - .tar.gz concrete"""
with tarfile.open(fileobj=file_out, mode="w:gz") as tar:
tar.add(path_in, arcname=os.path.basename(path_in))
def decompress(self, path_in: Path, path_out: Path):
"""decompress method - .tar.gz concrete"""
with tarfile.open(path_in, "r") as tar:
tar.extractall(path_out)
class A_DataSync_Record(BaseModel, ABC):
"""Abstract DataSync Record class"""
name: str
rec_type: str
value: str
@abstractmethod
def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None:
"""compress the record - virtual"""
@abstractmethod
def decompress(self, compressor: A_DataSync_Compressor, path_in: Path) -> None:
"""compress the record - virtual"""
class DataSync_Record_Factory:
"""DataSync Record Factory"""
ar_cls_DataSync_Record: Set[type[A_DataSync_Record]] = set()
@classmethod
def get_C_DataSync_Record(cls, name: str, rec_type: str, value: str) -> A_DataSync_Record | None:
"""get a concrete DataSync Record class instance"""
for cls_DataSync_Record in cls.ar_cls_DataSync_Record:
if cls_DataSync_Record.model_fields["rec_type"].default == rec_type:
return C_DataSync_Record_FS(name=name, rec_type=rec_type, value=value)
raise RuntimeError("No DataSync_Record concrete class found")
@classmethod
def register(cls, _cls: type[A_DataSync_Record]) -> type[A_DataSync_Record]:
"""decorator to register a concrete DataSync Record class"""
cls.ar_cls_DataSync_Record.add(_cls)
return _cls
@DataSync_Record_Factory.register
class C_DataSync_Record_FS(A_DataSync_Record):
"""Concrete DataSync Record class - FileSystem"""
rec_type: str = "fs"
path: Optional[Path] = None
def model_post_init(self, __context) -> None:
self.path = Path(self.value)
def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None:
"""compress the DataSync Record - concrete FS implementation"""
if TYPE_CHECKING:
assert isinstance(self.path, Path)
compressor.compress(self.path, file_out)
def decompress(self, compressor: A_DataSync_Compressor, path_in: Path) -> None:
"""compress the record - concrete FS implementation"""
if TYPE_CHECKING:
assert isinstance(self.path, Path)
if os.path.isdir(self.path):
shutil.rmtree(self.path)
if os.path.isfile(self.path):
os.remove(self.path)
compressor.decompress(path_in, self.path.parent)
class I_DataSync(ABC):
class A_DataSync(ABC):
"""Abstract DataSync class"""
manifest_path: str = "/opt/pyDABFactoryAppliance/Manifest.json"
cls_compressor: type[A_DataSync_Compressor] = DataSync_Compressor_targz
service_name: str = "ABSTRACT"
@classmethod
@final
def get_manifest_data(cls) -> dict[str, Any]:
"""utilitary method to get manifest"""
with open(cls.manifest_path, encoding="utf-8") as f_DAB_manifest:
return json.load(f_DAB_manifest)
@classmethod
@final
def try_get_instance(cls, manifest: dict[str, Any]) -> Self | None:
def try_get_instance(cls, manifest: dict[str, Any], cls_compressor: type[A_DataSync_Compressor]) -> Self | None:
"""try to get an instance of a concrete class"""
if cls.test_applicable(manifest):
return cls(manifest)
return cls(manifest, cls_compressor)
return None
def __init__(self, manifest: dict[str, Any]) -> None:
def __init__(self, manifest: dict[str, Any], cls_compressor: type[A_DataSync_Compressor]) -> None:
self.connected: bool = False
self.compressor: A_DataSync_Compressor = type(self).cls_compressor()
self.manifest: dict[str, Any] = manifest
self.app_id: UUID = UUID(manifest["APP_ID"])
self.ar_datasync_record: list[A_DataSync_Record] = []
self._cls_compressor: type[A_DataSync_Compressor] = cls_compressor
self._manifest: dict[str, Any] = manifest
self._app_id: UUID = UUID(manifest["APP_ID"])
self._ar_datasync_record: list[A_DataSync_Record] = []
if "FSSYNC_RECORD" in manifest["Args"]:
for record in manifest["Args"]["FSSYNC_RECORD"]["value"]:
record = DataSync_Record_Factory.get_C_DataSync_Record(
@@ -160,7 +56,15 @@ class I_DataSync(ABC):
record["value"]["value"]["value"],
)
assert isinstance(record, A_DataSync_Record)
self.ar_datasync_record.append(record)
self._ar_datasync_record.append(record)
def set_compressor(self, compressor_name: str) -> None:
"""set compressor to be used"""
self._cls_compressor = DataSync_Compressors.get(compressor_name)
def get_datasync_records(self) -> list[A_DataSync_Record]:
"""get list of records"""
return self._ar_datasync_record
@classmethod
def test_applicable(cls, manifest: dict[str, Any]) -> bool:
@@ -179,152 +83,98 @@ class I_DataSync(ABC):
def connect(self) -> None:
"""connect to the service"""
if not self.connected:
logger.info(f"connection to service <{self.service_name}>")
self._impl_connect()
self.connected = True
logger.info("connection done")
@abstractmethod
def _impl_connect(self) -> None:
"""connect to the service - virtual"""
def read_data(self) -> None:
"""read data from the service"""
def pull_data(self) -> None:
"""pull data from the service"""
logger.info(f"pulling data from service <{self.service_name}>")
self.connect()
with TemporaryDirectory() as tmpdir:
for datasync_record in self.ar_datasync_record:
self._impl_read_data(Path(datasync_record.name + self.compressor.suffix), Path(tmpdir))
datasync_record.decompress(self.compressor, Path(tmpdir) / (datasync_record.name + self.compressor.suffix))
for datasync_record in self._ar_datasync_record:
logger.info(f"pulling record <{datasync_record.name}>")
try:
self._impl_pull_data(Path(datasync_record.name + self._cls_compressor.suffix), Path(tmpdir))
datasync_record.uncompress(self._cls_compressor, Path(tmpdir) / (datasync_record.name + self._cls_compressor.suffix))
except DataSyncException_RemoteDataNotFound:
logger.warning(f"remote record file not found <{datasync_record.name}>")
logger.info("done")
@abstractmethod
def _impl_read_data(self, file_in: Path, file_out: Path) -> None:
"""read data from the service - virtual"""
def _impl_pull_data(self, file_in: Path, file_out: Path) -> None:
"""pull data from the service - virtual"""
def write_data(self) -> None:
"""write data to the service"""
def push_data(self) -> None:
"""push data to the service"""
logger.info(f"pushing data to service <{self.service_name}>")
self.connect()
self._impl_wipe_data()
for datasync_record in self.ar_datasync_record:
self._impl_wipe_remote_data()
for datasync_record in self._ar_datasync_record:
logger.info(f"pushing record <{datasync_record.name}>")
try:
with NamedTemporaryFile("wb", suffix=self.compressor.suffix, delete=False) as tmp_file:
datasync_record.compress(self.compressor, tmp_file)
with NamedTemporaryFile("wb", suffix=self._cls_compressor.suffix, delete=False) as tmp_file:
datasync_record.compress(self._cls_compressor, tmp_file)
tmp_file.seek(0)
tmp_file.close()
self._impl_write_data(datasync_record.name + "".join(Path(tmp_file.name).suffixes), tmp_file)
self._impl_push_data(datasync_record.name + "".join(Path(tmp_file.name).suffixes), tmp_file)
finally:
os.unlink(tmp_file.name)
logger.info("done")
@abstractmethod
def _impl_write_data(self, record_name: str, file_in: IO) -> None:
"""write data to the service - virtual"""
def _impl_push_data(self, record_name: str, file_in: IO) -> None:
"""push data to the service - virtual"""
def wipe_data(self) -> None:
def wipe_remote_data(self) -> None:
"""wipe data on the service"""
logger.info(f"wiping remote data on service <{self.service_name}>")
self.connect()
self._impl_wipe_data()
self._impl_wipe_remote_data()
def wipe_local_data(self) -> None:
"""wipe local data"""
logger.info("wiping local data")
for datasync_record in self._ar_datasync_record:
datasync_record.wipe()
@abstractmethod
def _impl_wipe_data(self) -> None:
def _impl_wipe_remote_data(self) -> None:
"""wipe data on the service - virtual"""
class DataSync_Factory:
"""DataSync Factory"""
ar_cls_DataSync: Set[type[I_DataSync]] = set()
ar_cls_DataSync: set[type[A_DataSync]] = set()
manifest_path: str = "/opt/pyDABFactoryAppliance/Manifest.json"
cls_compressor: type[A_DataSync_Compressor] = DataSync_Compressor__tar_gz
@classmethod
def get_DataSync(cls) -> I_DataSync | None:
def get_manifest_data(cls) -> dict[str, Any]:
"""tool method to get manifest"""
with open(cls.manifest_path, encoding="utf-8") as f_DAB_manifest:
return json.load(f_DAB_manifest)
@classmethod
def get_DataSync(cls) -> list[A_DataSync]:
"""get and configure a DataSync Concrete class instance"""
manifest = I_DataSync.get_manifest_data()
ar_datasync: list[A_DataSync] = []
manifest = cls.get_manifest_data()
for cls_DataSync in cls.ar_cls_DataSync:
if res := cls_DataSync.try_get_instance(manifest):
if res := cls_DataSync.try_get_instance(manifest, cls.cls_compressor):
res.configure()
return res
return None
ar_datasync.append(res)
return ar_datasync
@classmethod
def register(cls, _cls: type[I_DataSync]) -> type[I_DataSync]:
def register(cls, _cls: type[A_DataSync]) -> type[A_DataSync]:
"""decorator to register a concrete class to the factory"""
cls.ar_cls_DataSync.add(_cls)
return _cls
@DataSync_Factory.register
class C_DataSync_NextCloud(I_DataSync):
"""Concrete DataSync class - Nextcloud"""
def __init__(self, manifest: dict[Any, Any]) -> None:
super().__init__(manifest)
self.nextcloud_address: str
self.nextcloud_user: str
self.nextcloud_password: str
self.nextcloud_path: str
self.client: webdav3_Client
self.connected: bool = False
@classmethod
def test_applicable(cls, manifest: dict[str, Any]) -> bool:
"""check if a concrete class is applicable - Nextcloud override"""
if "Args" in manifest:
if "FSSync_NextCloud_Enabled" in manifest["Args"]:
if manifest["Args"]["FSSync_NextCloud_Enabled"]["value"] is True:
return True
return False
raise DataSyncException_InvalidManifest()
def _impl_configure(self) -> None:
"""configure the class instance - Nextcloud concrete implementation"""
if "FSSync_NextCloud_Address" in self.manifest["Args"]:
self.nextcloud_address = self.manifest["Args"]["FSSync_NextCloud_Address"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_User" in self.manifest["Args"]:
self.nextcloud_user = self.manifest["Args"]["FSSync_NextCloud_User"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_Password" in self.manifest["Args"]:
self.nextcloud_password = self.manifest["Args"]["FSSync_NextCloud_Password"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_Path" in self.manifest["Args"]:
self.nextcloud_path = str(Path(self.manifest["Args"]["FSSync_NextCloud_Path"]["value"]) / Path(str(self.app_id))).replace(
os.sep, "/"
)
else:
raise DataSyncException_InvalidManifest()
def _impl_connect(self) -> None:
"""connect to the remote service - Nextcloud concrete implementation"""
full_adress = urljoin(self.nextcloud_address, "remote.php/dav/files/", self.nextcloud_user)
self.client = webdav3_Client(
{"webdav_hostname": full_adress, "webdav_login": self.nextcloud_user, "webdav_password": self.nextcloud_password}
)
def _check_create_dir(self) -> None:
"""check and create directory in remote service"""
url_accumulator: str = ""
for url_part in self.nextcloud_path.split("/"):
url_accumulator += "/" + url_part
if not self.client.check(url_accumulator):
self.client.mkdir(url_accumulator)
def _impl_read_data(self, file_in: Path, file_out: Path) -> None:
"""read data from the remote service - Nextcloud concrete implementation"""
self._check_create_dir()
self.client.download_sync(
remote_path=str(self.nextcloud_path / file_in).replace(os.sep, "/"), local_path=str(file_out / file_in).replace(os.sep, "/")
)
def _impl_write_data(self, record_name: str, file_in: IO) -> None:
"""write data to the remote service - Nextcloud concrete implementation"""
self._check_create_dir()
self.client.upload_sync(
remote_path=str(Path(self.nextcloud_path) / record_name).replace(os.sep, "/"),
local_path=file_in.name,
)
def _impl_wipe_data(self) -> None:
"""wipe data on the service - concrete implementation"""
if self.client.check(self.nextcloud_path):
self.client.clean(self.nextcloud_path)

View File

@@ -0,0 +1,113 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
Nextcloud datasync implementation
"""
from __future__ import annotations
from pathlib import Path
import os
from typing import TYPE_CHECKING
from webdav3.client import Client as webdav3_Client
from webdav3.exceptions import RemoteResourceNotFound as webdav3_RemoteResourceNotFound
from .datasync import A_DataSync, DataSync_Factory
from .exceptions import DataSyncException_InvalidManifest, DataSyncException_RemoteDataNotFound
from .utils import urljoin
if TYPE_CHECKING:
from typing import Any, IO
from .compressors import A_DataSync_Compressor
@DataSync_Factory.register
class C_DataSync_NextCloud(A_DataSync):
"""Concrete DataSync class - Nextcloud"""
service_name: str = "Nextcloud"
def __init__(self, manifest: dict[Any, Any], cls_compressor: type[A_DataSync_Compressor]) -> None:
super().__init__(manifest, cls_compressor)
self.nextcloud_address: str
self.nextcloud_user: str
self.nextcloud_password: str
self.nextcloud_path: str
self.client: webdav3_Client
self.connected: bool = False
@classmethod
def test_applicable(cls, manifest: dict[str, Any]) -> bool:
"""check if a concrete class is applicable - Nextcloud override"""
if "Args" in manifest:
if "FSSync_NextCloud_Enabled" in manifest["Args"]:
if manifest["Args"]["FSSync_NextCloud_Enabled"]["value"] is True:
return True
return False
raise DataSyncException_InvalidManifest()
def _impl_configure(self) -> None:
"""configure the class instance - Nextcloud concrete implementation"""
if "FSSync_NextCloud_Address" in self._manifest["Args"]:
self.nextcloud_address = self._manifest["Args"]["FSSync_NextCloud_Address"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_User" in self._manifest["Args"]:
self.nextcloud_user = self._manifest["Args"]["FSSync_NextCloud_User"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_Password" in self._manifest["Args"]:
self.nextcloud_password = self._manifest["Args"]["FSSync_NextCloud_Password"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_Path" in self._manifest["Args"]:
self.nextcloud_path = str(Path(self._manifest["Args"]["FSSync_NextCloud_Path"]["value"]) / Path(str(self._app_id))).replace(
os.sep, "/"
)
else:
raise DataSyncException_InvalidManifest()
def _impl_connect(self) -> None:
"""connect to the remote service - Nextcloud concrete implementation"""
full_adress = urljoin(self.nextcloud_address, "remote.php/dav/files/", self.nextcloud_user)
self.client = webdav3_Client(
{"webdav_hostname": full_adress, "webdav_login": self.nextcloud_user, "webdav_password": self.nextcloud_password}
)
def _check_create_dir(self) -> None:
"""check and create directory in remote service"""
url_accumulator: str = ""
for url_part in self.nextcloud_path.split("/"):
url_accumulator += "/" + url_part
if not self.client.check(url_accumulator):
self.client.mkdir(url_accumulator)
def _impl_pull_data(self, file_in: Path, file_out: Path) -> None:
"""pull data from the remote service - Nextcloud concrete implementation"""
self._check_create_dir()
try:
self.client.download_sync(
remote_path=str(self.nextcloud_path / file_in).replace(os.sep, "/"), local_path=str(file_out / file_in).replace(os.sep, "/")
)
except webdav3_RemoteResourceNotFound as exc:
raise DataSyncException_RemoteDataNotFound from exc
def _impl_push_data(self, record_name: str, file_in: IO) -> None:
"""push data to the remote service - Nextcloud concrete implementation"""
self._check_create_dir()
self.client.upload_sync(
remote_path=str(Path(self.nextcloud_path) / record_name).replace(os.sep, "/"),
local_path=file_in.name,
)
def _impl_wipe_remote_data(self) -> None:
"""wipe data on the service - concrete implementation"""
if self.client.check(self.nextcloud_path):
self.client.clean(self.nextcloud_path)

View File

@@ -0,0 +1,47 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
Exception declaration
"""
class DataSyncException(Exception):
"""generic datasync exception class"""
class DataSyncException_InvalidManifest(DataSyncException):
"""specific datasync exception class - Dab appliance manifest not found"""
class DataSyncException_NoConcreteRecordClassFound(DataSyncException):
"""specific datasync exception class - No concrete Record Class Found"""
class DataSyncException_RemoteDataNotFound(DataSyncException):
"""specific datasync exception class - Remote Data Not Found"""
class DataSyncException_NoValidServiceFound(DataSyncException):
"""specific datasync exception class - Remote Valid Service Found"""
class DataSyncException_ServiceNotFound(DataSyncException):
"""specific datasync exception class - Service Not Found"""
class DataSyncException_TooManyServiceFound(DataSyncException):
"""specific datasync exception class - Too Many Service Found"""
class DataSyncException_CompressorNotFound(DataSyncException):
"""specific datasync exception class - Compressor Not Found"""
class DataSyncException_TooManyCompressorFound(DataSyncException):
"""specific datasync exception class - Too Many Compressor Found"""

View File

@@ -0,0 +1,99 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
datasync records description and implementation
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
import os
import shutil
from typing import TYPE_CHECKING, Optional
from pydantic import BaseModel
from .exceptions import DataSyncException_NoConcreteRecordClassFound
if TYPE_CHECKING:
from .compressors import A_DataSync_Compressor
from typing import IO
class A_DataSync_Record(BaseModel, ABC):
"""Abstract DataSync Record class"""
name: str
rec_type: str
value: str
@abstractmethod
def compress(self, cls_compressor: type[A_DataSync_Compressor], file_out: IO) -> None:
"""compress the DataSync Record - virtual"""
@abstractmethod
def uncompress(self, cls_compressor: type[A_DataSync_Compressor], path_in: Path) -> None:
"""uncompress the DataSync record - virtual"""
@abstractmethod
def wipe(self):
"""wipe the record local data - virtual"""
class DataSync_Record_Factory:
"""DataSync Record Factory"""
ar_cls_DataSync_Record: set[type[A_DataSync_Record]] = set()
@classmethod
def get_C_DataSync_Record(cls, name: str, rec_type: str, value: str) -> A_DataSync_Record | None:
"""get a concrete DataSync Record class instance"""
for cls_DataSync_Record in cls.ar_cls_DataSync_Record:
if cls_DataSync_Record.model_fields["rec_type"].default == rec_type:
return C_DataSync_Record_FS(name=name, rec_type=rec_type, value=value)
raise DataSyncException_NoConcreteRecordClassFound()
@classmethod
def register(cls, _cls: type[A_DataSync_Record]) -> type[A_DataSync_Record]:
"""decorator to register a concrete DataSync Record class"""
cls.ar_cls_DataSync_Record.add(_cls)
return _cls
@DataSync_Record_Factory.register
class C_DataSync_Record_FS(A_DataSync_Record):
"""Concrete DataSync Record class - FileSystem"""
rec_type: str = "fs"
path: Optional[Path] = None
def model_post_init(self, __context) -> None:
self.path = Path(self.value)
def compress(self, cls_compressor: type[A_DataSync_Compressor], file_out: IO) -> None:
"""compress the DataSync Record - concrete FS implementation"""
if TYPE_CHECKING:
assert isinstance(self.path, Path)
cls_compressor.compress(self.path, file_out)
def uncompress(self, cls_compressor: type[A_DataSync_Compressor], path_in: Path) -> None:
"""uncompress the DataSync record - concrete FS implementation"""
if TYPE_CHECKING:
assert isinstance(self.path, Path)
self.wipe()
cls_compressor.uncompress(path_in, self.path.parent)
def wipe(self):
if TYPE_CHECKING:
assert isinstance(self.path, Path)
if os.path.isdir(self.path):
shutil.rmtree(self.path)
if os.path.isfile(self.path):
os.remove(self.path)

View File

@@ -5,3 +5,16 @@
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
tools classes / functions
"""
def urljoin(*args: str) -> str:
"""
Joins given arguments into an url. Trailing but not leading slashes are
stripped for each argument.
"""
return "/".join(map(lambda x: str(x).rstrip("/"), args))

View File

@@ -7,15 +7,19 @@
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
import unittest
from os import chdir
from os import chdir, path as os_path
from pathlib import Path
import pprint
from io import StringIO
from contextlib import redirect_stdout, redirect_stderr
import shutil
from contexttimer import Timer
print(__name__)
print(__package__)
from src import dabdatasync
from src.dabdatasync.__main__ import fct_main
testdir_path = Path(__file__).parent.resolve()
chdir(testdir_path.parent.resolve())
@@ -36,26 +40,97 @@ class TestDabDataSync(unittest.TestCase):
def test_version(self):
self.assertNotEqual(dabdatasync.__version__, "?.?.?")
def test_load_nextcloud(self):
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_nextcloud.json"
datasync = dabdatasync.DataSync_Factory.get_DataSync()
self.assertIsInstance(datasync, dabdatasync.I_DataSync)
self.assertIsInstance(datasync, dabdatasync.C_DataSync_NextCloud)
def test_cli_help(self):
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
with self.assertRaises(SystemExit):
fct_main(["-h"])
print(capted_stdout.getvalue())
print(capted_stderr.getvalue())
self.assertEqual(len(datasync.ar_datasync_record), 2)
self.assertEqual(datasync.ar_datasync_record[0].name, "SOTF_map")
self.assertEqual(datasync.ar_datasync_record[1].name, "SOTF_map2")
self.assertEqual(datasync.ar_datasync_record[0].rec_type, "fs")
self.assertEqual(datasync.ar_datasync_record[1].rec_type, "fs")
self.assertEqual(datasync.ar_datasync_record[0].value, "test/test_data")
self.assertEqual(datasync.ar_datasync_record[1].value, "test/test_data2/SAVE_FILE.txt")
def test_cli_verbosity(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
fct_main(["WipeRemoteData"])
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
fct_main(["PullData"])
self.assertEqual(capted_stdout.getvalue(), "")
self.assertEqual(capted_stderr.getvalue(), "")
fct_main(["WipeLocalData"])
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
fct_main(["-v", "PullData"])
self.assertTrue("WARNING" in capted_stdout.getvalue())
self.assertFalse("INFO" in capted_stdout.getvalue())
self.assertEqual(capted_stderr.getvalue(), "")
fct_main(["WipeLocalData"])
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
fct_main(["-vv", "PullData"])
self.assertTrue("INFO" in capted_stdout.getvalue())
self.assertTrue("WARNING" in capted_stdout.getvalue())
self.assertEqual(capted_stderr.getvalue(), "")
def test_cli_GetServices(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
fct_main(["GetServices"])
print(capted_stdout.getvalue())
print(capted_stderr.getvalue())
self.assertTrue("Nextcloud" in capted_stdout.getvalue())
self.assertEqual(capted_stderr.getvalue(), "")
def test_cli_GetServices_noservice(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json"
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
with self.assertRaises(dabdatasync.DataSyncException_NoValidServiceFound):
fct_main(["GetServices"])
self.assertEqual(capted_stdout.getvalue(), "")
self.assertEqual(capted_stderr.getvalue(), "")
def test_cli_GetServices_invalid(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_invalid.json"
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest):
fct_main(["GetServices"])
self.assertEqual(capted_stdout.getvalue(), "")
self.assertEqual(capted_stderr.getvalue(), "")
def test_cli_GetServices_nextcloud_invalid(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_invalid.json"
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest):
fct_main(["GetServices"])
self.assertEqual(capted_stdout.getvalue(), "")
self.assertEqual(capted_stderr.getvalue(), "")
def test_cli_WipeLocalData(self):
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
fct_main(["WipeLocalData"])
self.assertEqual(capted_stdout.getvalue(), "")
self.assertEqual(capted_stderr.getvalue(), "")
self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
def test_cli_simple(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
datasync.write_data()
fct_main(["PushData"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
@@ -72,7 +147,7 @@ class TestDabDataSync(unittest.TestCase):
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE2")
datasync.read_data()
fct_main(["PullData"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
@@ -84,7 +159,7 @@ class TestDabDataSync(unittest.TestCase):
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE32")
datasync.write_data()
fct_main(["PushData"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
@@ -101,29 +176,285 @@ class TestDabDataSync(unittest.TestCase):
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
datasync.read_data()
fct_main(["PullData"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
def test_load_empty(self):
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_empty.json"
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["WipeRemoteData"])
fct_main(["PullData"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["PushData"])
fct_main(["WipeLocalData"])
self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
fct_main(["PullData"])
self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
def test_defect_cli_select_wrong_service(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
with self.assertRaises(dabdatasync.DataSyncException_ServiceNotFound):
fct_main(["PullData", "--service", "WRONGSERVICE"])
def test_cli_select_service(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
fct_main(["PushData", "--service", "Nextcloud"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE2")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE2")
fct_main(["PullData", "--service", "Nextcloud"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE32")
fct_main(["PushData", "--service", "Nextcloud"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["PullData", "--service", "Nextcloud"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["WipeRemoteData", "--service", "Nextcloud"])
fct_main(["PullData", "--service", "Nextcloud"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["PushData", "--service", "Nextcloud"])
fct_main(["WipeLocalData"])
self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
fct_main(["PullData", "--service", "Nextcloud"])
self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
def load_nextcloud_gen(self, compressor):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
datasync = dabdatasync.DataSync_Factory.get_DataSync()
self.assertIsNone(datasync)
datasync[0].set_compressor(compressor)
self.assertIsInstance(datasync, list)
self.assertEqual(len(datasync), 1)
self.assertIsInstance(datasync[0], dabdatasync.A_DataSync)
self.assertIsInstance(datasync[0], dabdatasync.C_DataSync_NextCloud)
self.assertEqual(len(datasync[0].get_datasync_records()), 2)
self.assertEqual(datasync[0].get_datasync_records()[0].name, "SOTF_map")
self.assertEqual(datasync[0].get_datasync_records()[1].name, "SOTF_map2")
self.assertEqual(datasync[0].get_datasync_records()[0].rec_type, "fs")
self.assertEqual(datasync[0].get_datasync_records()[1].rec_type, "fs")
self.assertEqual(datasync[0].get_datasync_records()[0].value, "test/test_data")
self.assertEqual(datasync[0].get_datasync_records()[1].value, "test/test_data2/SAVE_FILE.txt")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
datasync[0].push_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE2")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE2")
datasync[0].pull_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE32")
datasync[0].push_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
datasync[0].pull_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
datasync[0].wipe_remote_data()
datasync[0].pull_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
datasync[0].push_data()
datasync[0].wipe_local_data()
self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
datasync[0].pull_data()
self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
def test_load_nextcloud__tar_gz(self):
with Timer() as t:
self.load_nextcloud_gen("tar_gz")
print(t.elapsed)
def test_load_nextcloud__tar_lz4(self):
with Timer() as t:
self.load_nextcloud_gen("tar_lz4")
print(t.elapsed)
def test_load_empty(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_empty.json"
datasync = dabdatasync.DataSync_Factory.get_DataSync()
self.assertIsInstance(datasync, list)
self.assertEqual(len(datasync), 0)
def test_load_nextcloud_disabled(self):
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json"
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json"
datasync = dabdatasync.DataSync_Factory.get_DataSync()
self.assertIsNone(datasync)
self.assertIsInstance(datasync, list)
self.assertEqual(len(datasync), 0)
def test_load_invalid(self):
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_invalid.json"
def test_defect_load_invalid(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_invalid.json"
with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest):
dabdatasync.DataSync_Factory.get_DataSync()
def test_load_nextcloud_invalid(self):
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_nextcloud_invalid.json"
def test_defect_load_nextcloud_invalid(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_invalid.json"
with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest):
dabdatasync.DataSync_Factory.get_DataSync()