feat: add main / argparse (tap)

feat: add logging system
refactoring: split project into multiples files
This commit is contained in:
cclecle
2024-03-28 17:59:10 +00:00
parent 5fca93e4a8
commit 9f4f2bbba1
11 changed files with 448 additions and 237 deletions

View File

@@ -1,2 +1,3 @@
eclipse.preferences.version=1
encoding//src/dabdatasync/__main__.py=utf-8
encoding/<project>=UTF-8

View File

@@ -35,8 +35,10 @@ classifiers = [
dependencies = [
'importlib-metadata; python_version<"3.9"',
'packaging',
'webdavclient3',
'pydantic'
'webdavclient3==1.*',
'pydantic==2.*',
'typed-argument-parser==1.*',
'loguru==0.7.*'
]
dynamic = ["version"]

View File

@@ -11,5 +11,6 @@ Main module __init__ file.
"""
from .__metadata__ import __version__, __Summuary__, __Name__
from .datasync import I_DataSync, DataSync_Factory, C_DataSync_NextCloud
from .datasync import DataSyncException, DataSyncException_InvalidManifest
from .datasync import I_DataSync, DataSync_Factory
from .datasync_nextcloud import C_DataSync_NextCloud
from .exceptions import DataSyncException, DataSyncException_InvalidManifest

View File

@@ -0,0 +1,82 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# pyGameCFG(c) by chacha
#
# pyGameCFG is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""CLI interface module"""
from __future__ import annotations
from typing import Literal, cast, Union
import sys
from tap import Tap
from . import __Summuary__, __Name__
from . import datasync
class dabdatasync_args_PullData(Tap):
"""PullData CLI arg subparser"""
class dabdatasync_args_PushData(Tap):
"""PushData CLI arg subparser"""
class dabdatasync_args_WipeLocalData(Tap):
"""WipeLocalData CLI arg subparser"""
class dabdatasync_args_WipeRemoteData(Tap):
"""WipeRemoteData CLI arg subparser"""
class dabdatasync_args(Tap):
"""Main CLI arg parser"""
verbosity: int = 0
def configure(self) -> None:
self.add_argument("-v", "--verbosity", action="count", help="increase output verbosity")
self.add_subparsers(dest="command", help="command type", required=True)
self.add_subparser("PullData", dabdatasync_args_PullData, help="Pull data from the service")
self.add_subparser("PushData", dabdatasync_args_PushData, help="Push data to the service")
self.add_subparser("WipeLocalData", dabdatasync_args_WipeLocalData, help="Wipe local data")
self.add_subparser("WipeRemoteData", dabdatasync_args_WipeRemoteData, help="Wipe service data")
def process_args(self) -> None:
"""dynamically add self.command to avoid conflict with Tap/argparse while keep pylint happy"""
self.command: Union[str, None] = cast(Union[str, None], self.command) # pylint: disable=attribute-defined-outside-init
def fct_main(i_args: list[str]) -> None:
"""CLI main function"""
parser: dabdatasync_args = dabdatasync_args(prog=__Name__, description=__Summuary__)
args: dabdatasync_args = parser.parse_args(i_args)
if args.verbosity:
pass
datasync = datasync.DataSync_Factory.get_DataSync()
if args.command == "PullData":
datasync.read_data()
elif args.command == "PushData":
datasync.write_data()
elif args.command == "WipeLocalData":
datasync.wipe_local_data()
elif args.command == "WipeRemoteData":
datasync.wipe_remote_data()
else:
raise RuntimeError("Invalid argument")
if __name__ == "__main__":
fct_main(sys.argv[1:])

View File

@@ -0,0 +1,46 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
import os
import tarfile
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import IO
from loguru import logger
class A_DataSync_Compressor(ABC):
"""abstract compressor class"""
compressor_name: str = "Abtract"
suffix: str
@abstractmethod
def compress(self, path_in: Path, file_out: IO):
"""compress method - virtual"""
@abstractmethod
def decompress(self, path_in: Path, path_out: Path):
"""decompress method - virtual"""
class DataSync_Compressor_targz(A_DataSync_Compressor):
"""Concrete compressor class - .tar.gz compressor"""
compressor_name: str = "targz"
suffix: str = ".tar.gz"
def compress(self, path_in: Path, file_out: IO):
"""compress method - .tar.gz concrete"""
logger.info(f"compressing using {self.compressor_name}")
with tarfile.open(fileobj=file_out, mode="w:gz") as tar:
tar.add(path_in, arcname=os.path.basename(path_in))
def decompress(self, path_in: Path, path_out: Path):
"""uncompressing method - .tar.gz concrete"""
logger.info(f"uncompressing using {self.compressor_name}")
with tarfile.open(path_in, "r") as tar:
tar.extractall(path_out)

View File

@@ -1,140 +1,34 @@
"""Main datasync class"""
import json
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import final, TYPE_CHECKING, IO
from typing import Self, Any, Set, Optional
from uuid import UUID
from pathlib import Path
import os
import tarfile
from tempfile import NamedTemporaryFile, TemporaryDirectory
import shutil
import json
from pydantic import BaseModel
from webdav3.client import Client as webdav3_Client
from typing import final, TYPE_CHECKING
if TYPE_CHECKING:
from typing import Optional, IO, Any, Self
class DataSyncException(Exception):
"""generic datasync exception class"""
from loguru import logger
class DataSyncException_InvalidManifest(DataSyncException):
"""specific datasync exception class - Dab appliance manifest not found"""
def urljoin(*args):
"""
Joins given arguments into an url. Trailing but not leading slashes are
stripped for each argument.
"""
return "/".join(map(lambda x: str(x).rstrip("/"), args))
class A_DataSync_Compressor(ABC):
"""abstract compressor class"""
suffix: str
@abstractmethod
def compress(self, path_in: Path, file_out: IO):
"""compress method - virtual"""
@abstractmethod
def decompress(self, path_in: Path, path_out: Path):
"""decompress method - virtual"""
class DataSync_Compressor_targz(A_DataSync_Compressor):
"""Concrete compressor class - .tar.gz compressor"""
suffix: str = ".tar.gz"
def compress(self, path_in: Path, file_out: IO):
"""compress method - .tar.gz concrete"""
with tarfile.open(fileobj=file_out, mode="w:gz") as tar:
tar.add(path_in, arcname=os.path.basename(path_in))
def decompress(self, path_in: Path, path_out: Path):
"""decompress method - .tar.gz concrete"""
with tarfile.open(path_in, "r") as tar:
tar.extractall(path_out)
class A_DataSync_Record(BaseModel, ABC):
"""Abstract DataSync Record class"""
name: str
rec_type: str
value: str
@abstractmethod
def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None:
"""compress the record - virtual"""
@abstractmethod
def decompress(self, compressor: A_DataSync_Compressor, path_in: Path) -> None:
"""compress the record - virtual"""
class DataSync_Record_Factory:
"""DataSync Record Factory"""
ar_cls_DataSync_Record: Set[type[A_DataSync_Record]] = set()
@classmethod
def get_C_DataSync_Record(cls, name: str, rec_type: str, value: str) -> A_DataSync_Record | None:
"""get a concrete DataSync Record class instance"""
for cls_DataSync_Record in cls.ar_cls_DataSync_Record:
if cls_DataSync_Record.model_fields["rec_type"].default == rec_type:
return C_DataSync_Record_FS(name=name, rec_type=rec_type, value=value)
raise RuntimeError("No DataSync_Record concrete class found")
@classmethod
def register(cls, _cls: type[A_DataSync_Record]) -> type[A_DataSync_Record]:
"""decorator to register a concrete DataSync Record class"""
cls.ar_cls_DataSync_Record.add(_cls)
return _cls
@DataSync_Record_Factory.register
class C_DataSync_Record_FS(A_DataSync_Record):
"""Concrete DataSync Record class - FileSystem"""
rec_type: str = "fs"
path: Optional[Path] = None
def model_post_init(self, __context) -> None:
self.path = Path(self.value)
def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None:
"""compress the DataSync Record - concrete FS implementation"""
if TYPE_CHECKING:
assert isinstance(self.path, Path)
compressor.compress(self.path, file_out)
def decompress(self, compressor: A_DataSync_Compressor, path_in: Path) -> None:
"""compress the record - concrete FS implementation"""
if TYPE_CHECKING:
assert isinstance(self.path, Path)
if os.path.isdir(self.path):
shutil.rmtree(self.path)
if os.path.isfile(self.path):
os.remove(self.path)
compressor.decompress(path_in, self.path.parent)
from .records import A_DataSync_Record, DataSync_Record_Factory
from .compressors import A_DataSync_Compressor, DataSync_Compressor_targz
from .exceptions import DataSyncException_RemoteDataNotFound
class I_DataSync(ABC):
"""Abstract DataSync class"""
service_name: str = "Abtract"
manifest_path: str = "/opt/pyDABFactoryAppliance/Manifest.json"
cls_compressor: type[A_DataSync_Compressor] = DataSync_Compressor_targz
@classmethod
@final
def get_manifest_data(cls) -> dict[str, Any]:
"""utilitary method to get manifest"""
"""tool method to get manifest"""
with open(cls.manifest_path, encoding="utf-8") as f_DAB_manifest:
return json.load(f_DAB_manifest)
@@ -179,152 +73,90 @@ class I_DataSync(ABC):
def connect(self) -> None:
"""connect to the service"""
if not self.connected:
logger.info(f"connection to service {self.service_name}")
self._impl_connect()
self.connected = True
logger.info(f"connection done")
@abstractmethod
def _impl_connect(self) -> None:
"""connect to the service - virtual"""
def read_data(self) -> None:
"""read data from the service"""
def pull_data(self) -> None:
"""pull data from the service"""
logger.info(f"pulling data from service {self.service_name}")
self.connect()
with TemporaryDirectory() as tmpdir:
for datasync_record in self.ar_datasync_record:
self._impl_read_data(Path(datasync_record.name + self.compressor.suffix), Path(tmpdir))
datasync_record.decompress(self.compressor, Path(tmpdir) / (datasync_record.name + self.compressor.suffix))
logger.info(f"pulling record {datasync_record.name}")
try:
self._impl_pull_data(Path(datasync_record.name + self.compressor.suffix), Path(tmpdir))
datasync_record.decompress(self.compressor, Path(tmpdir) / (datasync_record.name + self.compressor.suffix))
except DataSyncException_RemoteDataNotFound:
logger.warning(f"remote record file not found {datasync_record.name}")
logger.info(f"done")
@abstractmethod
def _impl_read_data(self, file_in: Path, file_out: Path) -> None:
"""read data from the service - virtual"""
def _impl_pull_data(self, file_in: Path, file_out: Path) -> None:
"""pull data from the service - virtual"""
def write_data(self) -> None:
"""write data to the service"""
def push_data(self) -> None:
"""push data to the service"""
logger.info(f"pushing data to service {self.service_name}")
self.connect()
self._impl_wipe_data()
self._impl_wipe_remote_data()
for datasync_record in self.ar_datasync_record:
logger.info(f"pushing record {datasync_record.name}")
try:
with NamedTemporaryFile("wb", suffix=self.compressor.suffix, delete=False) as tmp_file:
datasync_record.compress(self.compressor, tmp_file)
tmp_file.seek(0)
tmp_file.close()
self._impl_write_data(datasync_record.name + "".join(Path(tmp_file.name).suffixes), tmp_file)
self._impl_push_data(datasync_record.name + "".join(Path(tmp_file.name).suffixes), tmp_file)
finally:
os.unlink(tmp_file.name)
logger.info(f"done")
@abstractmethod
def _impl_write_data(self, record_name: str, file_in: IO) -> None:
"""write data to the service - virtual"""
def _impl_push_data(self, record_name: str, file_in: IO) -> None:
"""push data to the service - virtual"""
def wipe_data(self) -> None:
def wipe_remote_data(self) -> None:
"""wipe data on the service"""
logger.info(f"wiping remote data on service {self.service_name}")
self.connect()
self._impl_wipe_data()
self._impl_wipe_remote_data()
def wipe_local_data(self) -> None:
"""wipe local data"""
logger.info(f"wiping local data")
for datasync_record in self.ar_datasync_record:
datasync_record.wipe()
@abstractmethod
def _impl_wipe_data(self) -> None:
def _impl_wipe_remote_data(self) -> None:
"""wipe data on the service - virtual"""
class DataSync_Factory:
"""DataSync Factory"""
ar_cls_DataSync: Set[type[I_DataSync]] = set()
ar_cls_DataSync: set[type[I_DataSync]] = set()
@classmethod
def get_DataSync(cls) -> I_DataSync | None:
def get_DataSync(cls) -> list[I_DataSync]:
"""get and configure a DataSync Concrete class instance"""
ar_datasync: list[I_DataSync] = []
manifest = I_DataSync.get_manifest_data()
for cls_DataSync in cls.ar_cls_DataSync:
if res := cls_DataSync.try_get_instance(manifest):
res.configure()
return res
return None
ar_datasync.append(res)
return ar_datasync
@classmethod
def register(cls, _cls: type[I_DataSync]) -> type[I_DataSync]:
"""decorator to register a concrete class to the factory"""
cls.ar_cls_DataSync.add(_cls)
return _cls
@DataSync_Factory.register
class C_DataSync_NextCloud(I_DataSync):
"""Concrete DataSync class - Nextcloud"""
def __init__(self, manifest: dict[Any, Any]) -> None:
super().__init__(manifest)
self.nextcloud_address: str
self.nextcloud_user: str
self.nextcloud_password: str
self.nextcloud_path: str
self.client: webdav3_Client
self.connected: bool = False
@classmethod
def test_applicable(cls, manifest: dict[str, Any]) -> bool:
"""check if a concrete class is applicable - Nextcloud override"""
if "Args" in manifest:
if "FSSync_NextCloud_Enabled" in manifest["Args"]:
if manifest["Args"]["FSSync_NextCloud_Enabled"]["value"] is True:
return True
return False
raise DataSyncException_InvalidManifest()
def _impl_configure(self) -> None:
"""configure the class instance - Nextcloud concrete implementation"""
if "FSSync_NextCloud_Address" in self.manifest["Args"]:
self.nextcloud_address = self.manifest["Args"]["FSSync_NextCloud_Address"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_User" in self.manifest["Args"]:
self.nextcloud_user = self.manifest["Args"]["FSSync_NextCloud_User"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_Password" in self.manifest["Args"]:
self.nextcloud_password = self.manifest["Args"]["FSSync_NextCloud_Password"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_Path" in self.manifest["Args"]:
self.nextcloud_path = str(Path(self.manifest["Args"]["FSSync_NextCloud_Path"]["value"]) / Path(str(self.app_id))).replace(
os.sep, "/"
)
else:
raise DataSyncException_InvalidManifest()
def _impl_connect(self) -> None:
"""connect to the remote service - Nextcloud concrete implementation"""
full_adress = urljoin(self.nextcloud_address, "remote.php/dav/files/", self.nextcloud_user)
self.client = webdav3_Client(
{"webdav_hostname": full_adress, "webdav_login": self.nextcloud_user, "webdav_password": self.nextcloud_password}
)
def _check_create_dir(self) -> None:
"""check and create directory in remote service"""
url_accumulator: str = ""
for url_part in self.nextcloud_path.split("/"):
url_accumulator += "/" + url_part
if not self.client.check(url_accumulator):
self.client.mkdir(url_accumulator)
def _impl_read_data(self, file_in: Path, file_out: Path) -> None:
"""read data from the remote service - Nextcloud concrete implementation"""
self._check_create_dir()
self.client.download_sync(
remote_path=str(self.nextcloud_path / file_in).replace(os.sep, "/"), local_path=str(file_out / file_in).replace(os.sep, "/")
)
def _impl_write_data(self, record_name: str, file_in: IO) -> None:
"""write data to the remote service - Nextcloud concrete implementation"""
self._check_create_dir()
self.client.upload_sync(
remote_path=str(Path(self.nextcloud_path) / record_name).replace(os.sep, "/"),
local_path=file_in.name,
)
def _impl_wipe_data(self) -> None:
"""wipe data on the service - concrete implementation"""
if self.client.check(self.nextcloud_path):
self.client.clean(self.nextcloud_path)

View File

@@ -0,0 +1,100 @@
from __future__ import annotations
from pathlib import Path
import os
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any, IO
from webdav3.client import Client as webdav3_Client
from webdav3.exceptions import RemoteResourceNotFound as webdav3_RemoteResourceNotFound
from .datasync import I_DataSync, DataSync_Factory
from .exceptions import DataSyncException_InvalidManifest, DataSyncException_RemoteDataNotFound
from .utils import urljoin
@DataSync_Factory.register
class C_DataSync_NextCloud(I_DataSync):
"""Concrete DataSync class - Nextcloud"""
service_name: str = "Nextcloud"
def __init__(self, manifest: dict[Any, Any]) -> None:
super().__init__(manifest)
self.nextcloud_address: str
self.nextcloud_user: str
self.nextcloud_password: str
self.nextcloud_path: str
self.client: webdav3_Client
self.connected: bool = False
@classmethod
def test_applicable(cls, manifest: dict[str, Any]) -> bool:
"""check if a concrete class is applicable - Nextcloud override"""
if "Args" in manifest:
if "FSSync_NextCloud_Enabled" in manifest["Args"]:
if manifest["Args"]["FSSync_NextCloud_Enabled"]["value"] is True:
return True
return False
raise DataSyncException_InvalidManifest()
def _impl_configure(self) -> None:
"""configure the class instance - Nextcloud concrete implementation"""
if "FSSync_NextCloud_Address" in self.manifest["Args"]:
self.nextcloud_address = self.manifest["Args"]["FSSync_NextCloud_Address"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_User" in self.manifest["Args"]:
self.nextcloud_user = self.manifest["Args"]["FSSync_NextCloud_User"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_Password" in self.manifest["Args"]:
self.nextcloud_password = self.manifest["Args"]["FSSync_NextCloud_Password"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_Path" in self.manifest["Args"]:
self.nextcloud_path = str(Path(self.manifest["Args"]["FSSync_NextCloud_Path"]["value"]) / Path(str(self.app_id))).replace(
os.sep, "/"
)
else:
raise DataSyncException_InvalidManifest()
def _impl_connect(self) -> None:
"""connect to the remote service - Nextcloud concrete implementation"""
full_adress = urljoin(self.nextcloud_address, "remote.php/dav/files/", self.nextcloud_user)
self.client = webdav3_Client(
{"webdav_hostname": full_adress, "webdav_login": self.nextcloud_user, "webdav_password": self.nextcloud_password}
)
def _check_create_dir(self) -> None:
"""check and create directory in remote service"""
url_accumulator: str = ""
for url_part in self.nextcloud_path.split("/"):
url_accumulator += "/" + url_part
if not self.client.check(url_accumulator):
self.client.mkdir(url_accumulator)
def _impl_pull_data(self, file_in: Path, file_out: Path) -> None:
"""pull data from the remote service - Nextcloud concrete implementation"""
self._check_create_dir()
try:
self.client.download_sync(
remote_path=str(self.nextcloud_path / file_in).replace(os.sep, "/"), local_path=str(file_out / file_in).replace(os.sep, "/")
)
except webdav3_RemoteResourceNotFound:
raise DataSyncException_RemoteDataNotFound(webdav3_RemoteResourceNotFound)
def _impl_push_data(self, record_name: str, file_in: IO) -> None:
"""push data to the remote service - Nextcloud concrete implementation"""
self._check_create_dir()
self.client.upload_sync(
remote_path=str(Path(self.nextcloud_path) / record_name).replace(os.sep, "/"),
local_path=file_in.name,
)
def _impl_wipe_remote_data(self) -> None:
"""wipe data on the service - concrete implementation"""
if self.client.check(self.nextcloud_path):
self.client.clean(self.nextcloud_path)

View File

@@ -0,0 +1,14 @@
class DataSyncException(Exception):
"""generic datasync exception class"""
class DataSyncException_InvalidManifest(DataSyncException):
"""specific datasync exception class - Dab appliance manifest not found"""
class DataSyncException_NoConcreteRecordClassFound(DataSyncException):
"""specific datasync exception class - No concrete Record Class Found"""
class DataSyncException_RemoteDataNotFound(DataSyncException):
"""specific datasync exception class - Remote Data Not Found"""

View File

@@ -0,0 +1,87 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
import os
import shutil
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from typing import IO
from pydantic import BaseModel
from .compressors import A_DataSync_Compressor
from .exceptions import DataSyncException_NoConcreteRecordClassFound
class A_DataSync_Record(BaseModel, ABC):
"""Abstract DataSync Record class"""
name: str
rec_type: str
value: str
@abstractmethod
def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None:
"""compress the DataSync Record - virtual"""
@abstractmethod
def decompress(self, compressor: A_DataSync_Compressor, path_in: Path) -> None:
"""decompress the DataSync record - virtual"""
@abstractmethod
def wipe(self):
"""wipe the record local data - virtual"""
class DataSync_Record_Factory:
"""DataSync Record Factory"""
ar_cls_DataSync_Record: set[type[A_DataSync_Record]] = set()
@classmethod
def get_C_DataSync_Record(cls, name: str, rec_type: str, value: str) -> A_DataSync_Record | None:
"""get a concrete DataSync Record class instance"""
for cls_DataSync_Record in cls.ar_cls_DataSync_Record:
if cls_DataSync_Record.model_fields["rec_type"].default == rec_type:
return C_DataSync_Record_FS(name=name, rec_type=rec_type, value=value)
raise DataSyncException_NoConcreteRecordClassFound()
@classmethod
def register(cls, _cls: type[A_DataSync_Record]) -> type[A_DataSync_Record]:
"""decorator to register a concrete DataSync Record class"""
cls.ar_cls_DataSync_Record.add(_cls)
return _cls
@DataSync_Record_Factory.register
class C_DataSync_Record_FS(A_DataSync_Record):
"""Concrete DataSync Record class - FileSystem"""
rec_type: str = "fs"
path: Optional[Path] = None
def model_post_init(self, __context) -> None:
self.path = Path(self.value)
def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None:
"""compress the DataSync Record - concrete FS implementation"""
if TYPE_CHECKING:
assert isinstance(self.path, Path)
compressor.compress(self.path, file_out)
def decompress(self, compressor: A_DataSync_Compressor, path_in: Path) -> None:
"""decompress the DataSync record - concrete FS implementation"""
if TYPE_CHECKING:
assert isinstance(self.path, Path)
self.wipe()
compressor.decompress(path_in, self.path.parent)
def wipe(self):
if TYPE_CHECKING:
assert isinstance(self.path, Path)
if os.path.isdir(self.path):
shutil.rmtree(self.path)
if os.path.isfile(self.path):
os.remove(self.path)

7
src/dabdatasync/utils.py Normal file
View File

@@ -0,0 +1,7 @@
def urljoin(*args: str) -> str:
"""
Joins given arguments into an url. Trailing but not leading slashes are
stripped for each argument.
"""
return "/".join(map(lambda x: str(x).rstrip("/"), args))

View File

@@ -7,7 +7,7 @@
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
import unittest
from os import chdir
from os import chdir, path as os_path
from pathlib import Path
import pprint
import shutil
@@ -39,23 +39,26 @@ class TestDabDataSync(unittest.TestCase):
def test_load_nextcloud(self):
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_nextcloud.json"
datasync = dabdatasync.DataSync_Factory.get_DataSync()
self.assertIsInstance(datasync, dabdatasync.I_DataSync)
self.assertIsInstance(datasync, dabdatasync.C_DataSync_NextCloud)
self.assertIsInstance(datasync, list)
self.assertEqual(len(datasync), 1)
self.assertEqual(len(datasync.ar_datasync_record), 2)
self.assertEqual(datasync.ar_datasync_record[0].name, "SOTF_map")
self.assertEqual(datasync.ar_datasync_record[1].name, "SOTF_map2")
self.assertEqual(datasync.ar_datasync_record[0].rec_type, "fs")
self.assertEqual(datasync.ar_datasync_record[1].rec_type, "fs")
self.assertEqual(datasync.ar_datasync_record[0].value, "test/test_data")
self.assertEqual(datasync.ar_datasync_record[1].value, "test/test_data2/SAVE_FILE.txt")
self.assertIsInstance(datasync[0], dabdatasync.I_DataSync)
self.assertIsInstance(datasync[0], dabdatasync.C_DataSync_NextCloud)
self.assertEqual(len(datasync[0].ar_datasync_record), 2)
self.assertEqual(datasync[0].ar_datasync_record[0].name, "SOTF_map")
self.assertEqual(datasync[0].ar_datasync_record[1].name, "SOTF_map2")
self.assertEqual(datasync[0].ar_datasync_record[0].rec_type, "fs")
self.assertEqual(datasync[0].ar_datasync_record[1].rec_type, "fs")
self.assertEqual(datasync[0].ar_datasync_record[0].value, "test/test_data")
self.assertEqual(datasync[0].ar_datasync_record[1].value, "test/test_data2/SAVE_FILE.txt")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
datasync.write_data()
datasync[0].push_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
@@ -72,7 +75,7 @@ class TestDabDataSync(unittest.TestCase):
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE2")
datasync.read_data()
datasync[0].pull_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
@@ -84,7 +87,7 @@ class TestDabDataSync(unittest.TestCase):
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE32")
datasync.write_data()
datasync[0].push_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
@@ -101,22 +104,58 @@ class TestDabDataSync(unittest.TestCase):
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
datasync.read_data()
datasync[0].pull_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
datasync[0].wipe_remote_data()
datasync[0].pull_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
datasync[0].push_data()
datasync[0].wipe_local_data()
self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
datasync[0].pull_data()
self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
def test_load_empty(self):
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_empty.json"
datasync = dabdatasync.DataSync_Factory.get_DataSync()
self.assertIsNone(datasync)
self.assertIsInstance(datasync, list)
self.assertEqual(len(datasync), 0)
def test_load_nextcloud_disabled(self):
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json"
datasync = dabdatasync.DataSync_Factory.get_DataSync()
self.assertIsNone(datasync)
self.assertIsInstance(datasync, list)
self.assertEqual(len(datasync), 0)
def test_load_invalid(self):
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_invalid.json"