10 Commits

Author SHA1 Message Date
bfcf65b6e7 Merge pull request 'dev' (#3) from dev into master
Reviewed-on: https://chacha.ddns.net/gitea/chacha/dabdatasync/pulls/3
new-tag:0.2.0
2024-03-28 03:30:02 +01:00
cclecle
5fca93e4a8 chore: rework suffix (now in compressor class only) 2024-03-28 02:16:14 +00:00
cclecle
79518946ac fix quality warning 2024-03-28 02:06:01 +00:00
cclecle
6aea5311bb feat: fully implement read for dir and files
test: implement full unitest scenario
2024-03-28 01:55:17 +00:00
cclecle
b4292a6f57 feat: start implement read
chore: improve type check
2024-03-28 00:51:18 +00:00
cclecle
38031deba2 fix: ignore webdav3.client typing
fix: ignore coverage for 'if TYPE_CHECKING:' directives
2024-03-28 00:02:18 +00:00
cclecle
abe800814d fix quality and typing 2024-03-27 02:37:47 +00:00
cclecle
81ba24d6f3 improve quality 2024-03-27 02:17:08 +00:00
f39fd9cf44 Merge pull request 'import code' (#2) from dev into master
Reviewed-on: https://chacha.ddns.net/gitea/chacha/dabdatasync/pulls/2
new-tag:0.1.0
2024-03-23 13:00:47 +01:00
cclecle
dd45b1281f rename project 2024-03-23 11:45:18 +00:00
6 changed files with 242 additions and 66 deletions

View File

@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>{{project_name}}</name>
<name>dabdatasync</name>
<comment></comment>
<projects>
</projects>

View File

@@ -51,9 +51,9 @@ where = ["src"]
"dabdatasync.data" = ["*.*"]
"dabdatasync" = ["py.typed"]
# [[tool.mypy.overrides]]
# module = ""
# ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "webdav3.client"
ignore_missing_imports = true
[tool.coverage.run]
cover_pylib = false
@@ -65,6 +65,11 @@ concurrency = [
'thread'
]
[tool.coverage.report]
exclude_also = [
"if TYPE_CHECKING:",
]
[project.urls]
Homepage = "https://chacha.ddns.net/gitea/chacha/dabdatasync"
Documentation = "https://chacha.ddns.net/mkdocs-web/chacha/dabdatasync/master/latest/"

View File

@@ -1,22 +1,26 @@
"""Main datasync class"""
import json
from abc import ABC, abstractmethod
from typing import final, BinaryIO
from typing import final, TYPE_CHECKING, IO
from typing import Self, Any, Set, Optional
from webdav3.client import Client as webdav3_Client
from uuid import UUID
from pathlib import Path
import os
from pydantic import BaseModel
import tarfile
from tempfile import NamedTemporaryFile
from tempfile import NamedTemporaryFile, TemporaryDirectory
import shutil
from pydantic import BaseModel
from webdav3.client import Client as webdav3_Client
class DataSyncException(Exception):
pass
"""generic datasync exception class"""
class DataSyncException_InvalidManifest(DataSyncException):
pass
"""specific datasync exception class - Dab appliance manifest not found"""
def urljoin(*args):
@@ -28,123 +32,180 @@ def urljoin(*args):
return "/".join(map(lambda x: str(x).rstrip("/"), args))
class I_DataSync_Compressor(ABC):
class A_DataSync_Compressor(ABC):
"""abstract compressor class"""
suffix: str
@abstractmethod
def compress(self, path_in: Path, file_out: Path):
pass
def compress(self, path_in: Path, file_out: IO):
"""compress method - virtual"""
@abstractmethod
def decompress(self, path_in: Path, path_out: Path):
"""decompress method - virtual"""
class DataSync_Compressor_targz(I_DataSync_Compressor):
def compress(self, path_in: Path, file_out):
class DataSync_Compressor_targz(A_DataSync_Compressor):
"""Concrete compressor class - .tar.gz compressor"""
suffix: str = ".tar.gz"
def compress(self, path_in: Path, file_out: IO):
"""compress method - .tar.gz concrete"""
with tarfile.open(fileobj=file_out, mode="w:gz") as tar:
tar.add(path_in, arcname=os.path.basename(path_in))
def decompress(self, path_in: Path, path_out: Path):
"""decompress method - .tar.gz concrete"""
with tarfile.open(path_in, "r") as tar:
tar.extractall(path_out)
class A_DataSync_Record(BaseModel, ABC):
"""Abstract DataSync Record class"""
class A_DataSync_Record(BaseModel):
name: str
rec_type: str
value: str
@abstractmethod
def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None:
"""compress the record - virtual"""
@abstractmethod
def decompress(self, compressor: A_DataSync_Compressor, path_in: Path) -> None:
"""compress the record - virtual"""
class DataSync_Record_Factory:
ar_cls_DataSync_Record: Set[A_DataSync_Record] = set()
"""DataSync Record Factory"""
@classmethod
def _register_C_DataSync_Record(cls, cls_DataSync_Record):
cls.ar_cls_DataSync_Record.add(cls_DataSync_Record)
ar_cls_DataSync_Record: Set[type[A_DataSync_Record]] = set()
@classmethod
def get_C_DataSync_Record(cls, name: str, rec_type: str, value: str) -> A_DataSync_Record | None:
"""get a concrete DataSync Record class instance"""
for cls_DataSync_Record in cls.ar_cls_DataSync_Record:
if cls_DataSync_Record.model_fields["rec_type"].default == rec_type:
return C_DataSync_Record_FS(name=name, rec_type=rec_type, value=value)
raise RuntimeError("No DataSync_Record concrete class found")
@classmethod
def register(cls, _cls):
cls._register_C_DataSync_Record(_cls)
def register(cls, _cls: type[A_DataSync_Record]) -> type[A_DataSync_Record]:
"""decorator to register a concrete DataSync Record class"""
cls.ar_cls_DataSync_Record.add(_cls)
return _cls
@DataSync_Record_Factory.register
class C_DataSync_Record_FS(A_DataSync_Record):
"""Concrete DataSync Record class - FileSystem"""
rec_type: str = "fs"
path: Optional[Path] = None
def model_post_init(self, __context) -> None:
self.path = Path(self.value)
def compress(self, compressor: I_DataSync_Compressor, file_out: BinaryIO) -> None:
compressor.compress(self.path, self.path, file_out)
def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None:
"""compress the DataSync Record - concrete FS implementation"""
if TYPE_CHECKING:
assert isinstance(self.path, Path)
compressor.compress(self.path, file_out)
def decompress(self, compressor: A_DataSync_Compressor, path_in: Path) -> None:
"""compress the record - concrete FS implementation"""
if TYPE_CHECKING:
assert isinstance(self.path, Path)
if os.path.isdir(self.path):
shutil.rmtree(self.path)
if os.path.isfile(self.path):
os.remove(self.path)
compressor.decompress(path_in, self.path.parent)
class I_DataSync(ABC):
"""Abstract DataSync class"""
manifest_path: str = "/opt/pyDABFactoryAppliance/Manifest.json"
cls_compressor: type(I_DataSync_Compressor) = DataSync_Compressor_targz
cls_compressor: type[A_DataSync_Compressor] = DataSync_Compressor_targz
@classmethod
@final
def get_manifest_data(cls) -> dict[Any, Any]:
with open(cls.manifest_path) as f_DAB_manifest:
def get_manifest_data(cls) -> dict[str, Any]:
"""utilitary method to get manifest"""
with open(cls.manifest_path, encoding="utf-8") as f_DAB_manifest:
return json.load(f_DAB_manifest)
@classmethod
@final
def try_get_instance(cls, manifest) -> Self | None:
def try_get_instance(cls, manifest: dict[str, Any]) -> Self | None:
"""try to get an instance of a concrete class"""
if cls.test_applicable(manifest):
return cls(manifest)
return None
def __init__(self, manifest: dict[Any, Any]) -> None:
def __init__(self, manifest: dict[str, Any]) -> None:
self.connected: bool = False
self.compressor: I_DataSync_Compressor = type(self).cls_compressor()
self.manifest: dict[Any, Any] = manifest
self.compressor: A_DataSync_Compressor = type(self).cls_compressor()
self.manifest: dict[str, Any] = manifest
self.app_id: UUID = UUID(manifest["APP_ID"])
self.ar_datasync_record: list[A_DataSync_Record] = []
if "FSSYNC_RECORD" in manifest["Args"]:
for record in manifest["Args"]["FSSYNC_RECORD"]["value"]:
self.ar_datasync_record.append(
DataSync_Record_Factory.get_C_DataSync_Record(
record["value"]["name"]["value"],
record["value"]["type"]["value"],
record["value"]["value"]["value"],
)
record = DataSync_Record_Factory.get_C_DataSync_Record(
record["value"]["name"]["value"],
record["value"]["type"]["value"],
record["value"]["value"]["value"],
)
assert isinstance(record, A_DataSync_Record)
self.ar_datasync_record.append(record)
@classmethod
def test_applicable(cls, manifest: dict[Any, Any]) -> bool:
def test_applicable(cls, manifest: dict[str, Any]) -> bool:
"""check if a concrete class is applicable - generic"""
del manifest # quality warning removal
return False
def configure(self) -> None:
"""configure the class instance"""
self._impl_configure()
@abstractmethod
def _impl_configure(self) -> None:
pass
"""configure the class instance - virtual"""
def connect(self) -> None:
"""connect to the service"""
if not self.connected:
self._impl_connect()
self.connected = True
@abstractmethod
def _impl_connect(self) -> None:
pass
"""connect to the service - virtual"""
def read_data(self) -> None:
"""read data from the service"""
self.connect()
self._impl_read_data()
with TemporaryDirectory() as tmpdir:
for datasync_record in self.ar_datasync_record:
self._impl_read_data(Path(datasync_record.name + self.compressor.suffix), Path(tmpdir))
datasync_record.decompress(self.compressor, Path(tmpdir) / (datasync_record.name + self.compressor.suffix))
@abstractmethod
def _impl_read_data(self) -> None:
pass
def _impl_read_data(self, file_in: Path, file_out: Path) -> None:
"""read data from the service - virtual"""
def write_data(self) -> None:
"""write data to the service"""
self.connect()
self._impl_wipe_data()
for datasync_record in self.ar_datasync_record:
try:
with NamedTemporaryFile("wb", suffix=".tar.gz", delete=False) as tmp_file:
datasync_record.compress(type(self).cls_compressor, tmp_file)
with NamedTemporaryFile("wb", suffix=self.compressor.suffix, delete=False) as tmp_file:
datasync_record.compress(self.compressor, tmp_file)
tmp_file.seek(0)
tmp_file.close()
@@ -153,19 +214,27 @@ class I_DataSync(ABC):
os.unlink(tmp_file.name)
@abstractmethod
def _impl_write_data(self, record_name: str, file_in: BinaryIO) -> None:
pass
def _impl_write_data(self, record_name: str, file_in: IO) -> None:
"""write data to the service - virtual"""
def wipe_data(self) -> None:
"""wipe data on the service"""
self.connect()
self._impl_wipe_data()
@abstractmethod
def _impl_wipe_data(self) -> None:
"""wipe data on the service - virtual"""
class DataSync_Factory:
ar_cls_DataSync: Set[I_DataSync] = set()
"""DataSync Factory"""
@classmethod
def _register_C_DataSync(cls, cls_DataSync):
cls.ar_cls_DataSync.add(cls_DataSync)
ar_cls_DataSync: Set[type[I_DataSync]] = set()
@classmethod
def get_DataSync(cls) -> I_DataSync | None:
"""get and configure a DataSync Concrete class instance"""
manifest = I_DataSync.get_manifest_data()
for cls_DataSync in cls.ar_cls_DataSync:
if res := cls_DataSync.try_get_instance(manifest):
@@ -174,13 +243,16 @@ class DataSync_Factory:
return None
@classmethod
def register(cls, _cls):
cls._register_C_DataSync(_cls)
def register(cls, _cls: type[I_DataSync]) -> type[I_DataSync]:
"""decorator to register a concrete class to the factory"""
cls.ar_cls_DataSync.add(_cls)
return _cls
@DataSync_Factory.register
class C_DataSync_NextCloud(I_DataSync):
"""Concrete DataSync class - Nextcloud"""
def __init__(self, manifest: dict[Any, Any]) -> None:
super().__init__(manifest)
self.nextcloud_address: str
@@ -191,17 +263,17 @@ class C_DataSync_NextCloud(I_DataSync):
self.connected: bool = False
@classmethod
def test_applicable(cls, manifest) -> bool:
def test_applicable(cls, manifest: dict[str, Any]) -> bool:
"""check if a concrete class is applicable - Nextcloud override"""
if "Args" in manifest:
if "FSSync_NextCloud_Enabled" in manifest["Args"]:
if manifest["Args"]["FSSync_NextCloud_Enabled"]["value"] is True:
return True
return False
else:
raise DataSyncException_InvalidManifest()
return True
raise DataSyncException_InvalidManifest()
def _impl_configure(self):
def _impl_configure(self) -> None:
"""configure the class instance - Nextcloud concrete implementation"""
if "FSSync_NextCloud_Address" in self.manifest["Args"]:
self.nextcloud_address = self.manifest["Args"]["FSSync_NextCloud_Address"]["value"]
else:
@@ -222,25 +294,37 @@ class C_DataSync_NextCloud(I_DataSync):
else:
raise DataSyncException_InvalidManifest()
def _impl_connect(self):
def _impl_connect(self) -> None:
"""connect to the remote service - Nextcloud concrete implementation"""
full_adress = urljoin(self.nextcloud_address, "remote.php/dav/files/", self.nextcloud_user)
self.client = webdav3_Client(
{"webdav_hostname": full_adress, "webdav_login": self.nextcloud_user, "webdav_password": self.nextcloud_password}
)
def _check_create_dir(self):
def _check_create_dir(self) -> None:
"""check and create directory in remote service"""
url_accumulator: str = ""
for url_part in self.nextcloud_path.split("/"):
url_accumulator += "/" + url_part
if not self.client.check(url_accumulator):
self.client.mkdir(url_accumulator)
def _impl_read_data(self):
def _impl_read_data(self, file_in: Path, file_out: Path) -> None:
"""read data from the remote service - Nextcloud concrete implementation"""
self._check_create_dir()
self.client.download_sync(
remote_path=str(self.nextcloud_path / file_in).replace(os.sep, "/"), local_path=str(file_out / file_in).replace(os.sep, "/")
)
def _impl_write_data(self, record_name: str, file_in: BinaryIO):
def _impl_write_data(self, record_name: str, file_in: IO) -> None:
"""write data to the remote service - Nextcloud concrete implementation"""
self._check_create_dir()
self.client.upload_sync(
remote_path=str(Path(self.nextcloud_path) / record_name).replace(os.sep, "/"),
local_path=file_in.name,
)
def _impl_wipe_data(self) -> None:
"""wipe data on the service - concrete implementation"""
if self.client.check(self.nextcloud_path):
self.client.clean(self.nextcloud_path)

View File

@@ -8,10 +8,9 @@
import unittest
from os import chdir
from io import StringIO
from contextlib import redirect_stdout, redirect_stderr
from pathlib import Path
import pprint
import shutil
print(__name__)
print(__package__)
@@ -25,6 +24,14 @@ chdir(testdir_path.parent.resolve())
class TestDabDataSync(unittest.TestCase):
def setUp(self) -> None:
chdir(testdir_path.parent.resolve())
shutil.rmtree(testdir_path / "test_data", ignore_errors=True)
shutil.rmtree(testdir_path / "test_data2", ignore_errors=True)
shutil.copytree(testdir_path / "test_data_origin", testdir_path / "test_data")
shutil.copytree(testdir_path / "test_data_origin", testdir_path / "test_data2")
def tearDown(self) -> None:
shutil.rmtree(testdir_path / "test_data", ignore_errors=True)
shutil.rmtree(testdir_path / "test_data2", ignore_errors=True)
def test_version(self):
self.assertNotEqual(dabdatasync.__version__, "?.?.?")
@@ -34,10 +41,73 @@ class TestDabDataSync(unittest.TestCase):
datasync = dabdatasync.DataSync_Factory.get_DataSync()
self.assertIsInstance(datasync, dabdatasync.I_DataSync)
self.assertIsInstance(datasync, dabdatasync.C_DataSync_NextCloud)
datasync.read_data()
pprint.pprint(datasync.ar_datasync_record)
self.assertEqual(len(datasync.ar_datasync_record), 2)
self.assertEqual(datasync.ar_datasync_record[0].name, "SOTF_map")
self.assertEqual(datasync.ar_datasync_record[1].name, "SOTF_map2")
self.assertEqual(datasync.ar_datasync_record[0].rec_type, "fs")
self.assertEqual(datasync.ar_datasync_record[1].rec_type, "fs")
self.assertEqual(datasync.ar_datasync_record[0].value, "test/test_data")
self.assertEqual(datasync.ar_datasync_record[1].value, "test/test_data2/SAVE_FILE.txt")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
datasync.write_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE2")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE2")
datasync.read_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE32")
datasync.write_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
datasync.read_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
def test_load_empty(self):
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_empty.json"
datasync = dabdatasync.DataSync_Factory.get_DataSync()

View File

@@ -220,6 +220,23 @@
"value": "test/test_data"
}
}
},
{
"type": "T_FSSYNC_RECORD",
"value": {
"name": {
"type": "SIMPLE_STRING",
"value": "SOTF_map2"
},
"type": {
"type": "SIMPLE_STRING",
"value": "fs"
},
"value": {
"type": "STRING",
"value": "test/test_data2/SAVE_FILE.txt"
}
}
}
]
},