2 Commits

Author SHA1 Message Date
cclecle
b4292a6f57 feat: start implement read
chore: improve type check
2024-03-28 00:51:18 +00:00
cclecle
38031deba2 fix: ignore webdav3.client typing
fix: ignore coverage for 'if TYPE_CHECKING:' directives
2024-03-28 00:02:18 +00:00
3 changed files with 48 additions and 22 deletions

View File

@@ -51,9 +51,9 @@ where = ["src"]
"dabdatasync.data" = ["*.*"]
"dabdatasync" = ["py.typed"]
# [[tool.mypy.overrides]]
# module = ""
# ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "webdav3.client"
ignore_missing_imports = true
[tool.coverage.run]
cover_pylib = false
@@ -65,6 +65,11 @@ concurrency = [
'thread'
]
[tool.coverage.report]
exclude_also = [
"if TYPE_CHECKING:",
]
[project.urls]
Homepage = "https://chacha.ddns.net/gitea/chacha/dabdatasync"
Documentation = "https://chacha.ddns.net/mkdocs-web/chacha/dabdatasync/master/latest/"

View File

@@ -8,7 +8,7 @@ from uuid import UUID
from pathlib import Path
import os
import tarfile
from tempfile import NamedTemporaryFile
from tempfile import NamedTemporaryFile, TemporaryDirectory
from pydantic import BaseModel
from webdav3.client import Client as webdav3_Client
@@ -54,6 +54,7 @@ class A_DataSync_Record(BaseModel, ABC):
name: str
rec_type: str
value: str
suffix: str
@abstractmethod
def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None:
@@ -85,6 +86,7 @@ class C_DataSync_Record_FS(A_DataSync_Record):
"""Concrete DataSync Record class - FileSystem"""
rec_type: str = "fs"
suffix: str = ".tar.gz"
path: Optional[Path] = None
def model_post_init(self, __context) -> None:
@@ -105,23 +107,23 @@ class I_DataSync(ABC):
@classmethod
@final
def get_manifest_data(cls) -> dict[Any, Any]:
def get_manifest_data(cls) -> dict[str, Any]:
"""utilitary method to get manifest"""
with open(cls.manifest_path, encoding="utf-8") as f_DAB_manifest:
return json.load(f_DAB_manifest)
@classmethod
@final
def try_get_instance(cls, manifest) -> Self | None:
def try_get_instance(cls, manifest: dict[str, Any]) -> Self | None:
"""try to get an instance of a concrete class"""
if cls.test_applicable(manifest):
return cls(manifest)
return None
def __init__(self, manifest: dict[Any, Any]) -> None:
def __init__(self, manifest: dict[str, Any]) -> None:
self.connected: bool = False
self.compressor: A_DataSync_Compressor = type(self).cls_compressor()
self.manifest: dict[Any, Any] = manifest
self.manifest: dict[str, Any] = manifest
self.app_id: UUID = UUID(manifest["APP_ID"])
self.ar_datasync_record: list[A_DataSync_Record] = []
if "FSSYNC_RECORD" in manifest["Args"]:
@@ -135,8 +137,8 @@ class I_DataSync(ABC):
self.ar_datasync_record.append(record)
@classmethod
def test_applicable(cls, manifest: dict[Any, Any]) -> bool:
"""check if a concrete class is applicable"""
def test_applicable(cls, manifest: dict[str, Any]) -> bool:
"""check if a concrete class is applicable - generic"""
del manifest # quality warning removal
return False
@@ -161,18 +163,21 @@ class I_DataSync(ABC):
def read_data(self) -> None:
"""read data from the service"""
self.connect()
self._impl_read_data()
with TemporaryDirectory() as tmpdir:
for datasync_record in self.ar_datasync_record:
self._impl_read_data(Path(datasync_record.name + datasync_record.suffix), Path(tmpdir))
@abstractmethod
def _impl_read_data(self) -> None:
def _impl_read_data(self, file_in: Path, file_out: Path) -> None:
"""read data from the service - virtual"""
def write_data(self) -> None:
"""write data to the service"""
self.connect()
self._impl_wipe_data()
for datasync_record in self.ar_datasync_record:
try:
with NamedTemporaryFile("wb", suffix=".tar.gz", delete=False) as tmp_file:
with NamedTemporaryFile("wb", suffix=datasync_record.suffix, delete=False) as tmp_file:
datasync_record.compress(self.compressor, tmp_file)
tmp_file.seek(0)
tmp_file.close()
@@ -185,6 +190,15 @@ class I_DataSync(ABC):
def _impl_write_data(self, record_name: str, file_in: IO) -> None:
"""write data to the service - virtual"""
def wipe_data(self) -> None:
"""wipe data on the service"""
self.connect()
self._impl_wipe_data()
@abstractmethod
def _impl_wipe_data(self) -> None:
"""wipe data on the service - virtual"""
class DataSync_Factory:
"""DataSync Factory"""
@@ -222,7 +236,8 @@ class C_DataSync_NextCloud(I_DataSync):
self.connected: bool = False
@classmethod
def test_applicable(cls, manifest) -> bool:
def test_applicable(cls, manifest: dict[str, Any]) -> bool:
"""check if a concrete class is applicable - Nextcloud override"""
if "Args" in manifest:
if "FSSync_NextCloud_Enabled" in manifest["Args"]:
if manifest["Args"]["FSSync_NextCloud_Enabled"]["value"] is True:
@@ -231,7 +246,7 @@ class C_DataSync_NextCloud(I_DataSync):
raise DataSyncException_InvalidManifest()
def _impl_configure(self) -> None:
"""configure the class instance - concrete implementation"""
"""configure the class instance - Nextcloud concrete implementation"""
if "FSSync_NextCloud_Address" in self.manifest["Args"]:
self.nextcloud_address = self.manifest["Args"]["FSSync_NextCloud_Address"]["value"]
else:
@@ -253,7 +268,7 @@ class C_DataSync_NextCloud(I_DataSync):
raise DataSyncException_InvalidManifest()
def _impl_connect(self) -> None:
"""connect to the remote service - concrete implementation"""
"""connect to the remote service - Nextcloud concrete implementation"""
full_adress = urljoin(self.nextcloud_address, "remote.php/dav/files/", self.nextcloud_user)
self.client = webdav3_Client(
{"webdav_hostname": full_adress, "webdav_login": self.nextcloud_user, "webdav_password": self.nextcloud_password}
@@ -267,14 +282,22 @@ class C_DataSync_NextCloud(I_DataSync):
if not self.client.check(url_accumulator):
self.client.mkdir(url_accumulator)
def _impl_read_data(self) -> None:
"""read data from the remote service - concrete implementation"""
def _impl_read_data(self, file_in: Path, file_out: Path) -> None:
"""read data from the remote service - Nextcloud concrete implementation"""
self._check_create_dir()
self.client.download_sync(
remote_path=str(self.nextcloud_path / file_in).replace(os.sep, "/"), local_path=str(file_out / file_in).replace(os.sep, "/")
)
def _impl_write_data(self, record_name: str, file_in: IO) -> None:
"""write data to the remote service - concrete implementation"""
"""write data to the remote service - Nextcloud concrete implementation"""
self._check_create_dir()
self.client.upload_sync(
remote_path=str(Path(self.nextcloud_path) / record_name).replace(os.sep, "/"),
local_path=file_in.name,
)
def _impl_wipe_data(self) -> None:
"""wipe data on the service - concrete implementation"""
if self.client.check(self.nextcloud_path):
self.client.clean(self.nextcloud_path)

View File

@@ -8,8 +8,6 @@
import unittest
from os import chdir
from io import StringIO
from contextlib import redirect_stdout, redirect_stderr
from pathlib import Path
import pprint
@@ -34,9 +32,9 @@ class TestDabDataSync(unittest.TestCase):
datasync = dabdatasync.DataSync_Factory.get_DataSync()
self.assertIsInstance(datasync, dabdatasync.I_DataSync)
self.assertIsInstance(datasync, dabdatasync.C_DataSync_NextCloud)
datasync.read_data()
pprint.pprint(datasync.ar_datasync_record)
datasync.write_data()
datasync.read_data()
def test_load_empty(self):
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_empty.json"