feat: start implement read
chore: improve type check
This commit is contained in:
@@ -8,7 +8,7 @@ from uuid import UUID
|
||||
from pathlib import Path
|
||||
import os
|
||||
import tarfile
|
||||
from tempfile import NamedTemporaryFile
|
||||
from tempfile import NamedTemporaryFile, TemporaryDirectory
|
||||
|
||||
from pydantic import BaseModel
|
||||
from webdav3.client import Client as webdav3_Client
|
||||
@@ -54,6 +54,7 @@ class A_DataSync_Record(BaseModel, ABC):
|
||||
name: str
|
||||
rec_type: str
|
||||
value: str
|
||||
suffix: str
|
||||
|
||||
@abstractmethod
|
||||
def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None:
|
||||
@@ -85,6 +86,7 @@ class C_DataSync_Record_FS(A_DataSync_Record):
|
||||
"""Concrete DataSync Record class - FileSystem"""
|
||||
|
||||
rec_type: str = "fs"
|
||||
suffix: str = ".tar.gz"
|
||||
path: Optional[Path] = None
|
||||
|
||||
def model_post_init(self, __context) -> None:
|
||||
@@ -105,23 +107,23 @@ class I_DataSync(ABC):
|
||||
|
||||
@classmethod
|
||||
@final
|
||||
def get_manifest_data(cls) -> dict[Any, Any]:
|
||||
def get_manifest_data(cls) -> dict[str, Any]:
|
||||
"""utilitary method to get manifest"""
|
||||
with open(cls.manifest_path, encoding="utf-8") as f_DAB_manifest:
|
||||
return json.load(f_DAB_manifest)
|
||||
|
||||
@classmethod
|
||||
@final
|
||||
def try_get_instance(cls, manifest) -> Self | None:
|
||||
def try_get_instance(cls, manifest: dict[str, Any]) -> Self | None:
|
||||
"""try to get an instance of a concrete class"""
|
||||
if cls.test_applicable(manifest):
|
||||
return cls(manifest)
|
||||
return None
|
||||
|
||||
def __init__(self, manifest: dict[Any, Any]) -> None:
|
||||
def __init__(self, manifest: dict[str, Any]) -> None:
|
||||
self.connected: bool = False
|
||||
self.compressor: A_DataSync_Compressor = type(self).cls_compressor()
|
||||
self.manifest: dict[Any, Any] = manifest
|
||||
self.manifest: dict[str, Any] = manifest
|
||||
self.app_id: UUID = UUID(manifest["APP_ID"])
|
||||
self.ar_datasync_record: list[A_DataSync_Record] = []
|
||||
if "FSSYNC_RECORD" in manifest["Args"]:
|
||||
@@ -135,8 +137,8 @@ class I_DataSync(ABC):
|
||||
self.ar_datasync_record.append(record)
|
||||
|
||||
@classmethod
|
||||
def test_applicable(cls, manifest: dict[Any, Any]) -> bool:
|
||||
"""check if a concrete class is applicable"""
|
||||
def test_applicable(cls, manifest: dict[str, Any]) -> bool:
|
||||
"""check if a concrete class is applicable - generic"""
|
||||
del manifest # quality warning removal
|
||||
return False
|
||||
|
||||
@@ -161,18 +163,21 @@ class I_DataSync(ABC):
|
||||
def read_data(self) -> None:
|
||||
"""read data from the service"""
|
||||
self.connect()
|
||||
self._impl_read_data()
|
||||
with TemporaryDirectory() as tmpdir:
|
||||
for datasync_record in self.ar_datasync_record:
|
||||
self._impl_read_data(Path(datasync_record.name + datasync_record.suffix), Path(tmpdir))
|
||||
|
||||
@abstractmethod
|
||||
def _impl_read_data(self) -> None:
|
||||
def _impl_read_data(self, file_in: Path, file_out: Path) -> None:
|
||||
"""read data from the service - virtual"""
|
||||
|
||||
def write_data(self) -> None:
|
||||
"""write data to the service"""
|
||||
self.connect()
|
||||
self._impl_wipe_data()
|
||||
for datasync_record in self.ar_datasync_record:
|
||||
try:
|
||||
with NamedTemporaryFile("wb", suffix=".tar.gz", delete=False) as tmp_file:
|
||||
with NamedTemporaryFile("wb", suffix=datasync_record.suffix, delete=False) as tmp_file:
|
||||
datasync_record.compress(self.compressor, tmp_file)
|
||||
tmp_file.seek(0)
|
||||
tmp_file.close()
|
||||
@@ -185,6 +190,15 @@ class I_DataSync(ABC):
|
||||
def _impl_write_data(self, record_name: str, file_in: IO) -> None:
|
||||
"""write data to the service - virtual"""
|
||||
|
||||
def wipe_data(self) -> None:
|
||||
"""wipe data on the service"""
|
||||
self.connect()
|
||||
self._impl_wipe_data()
|
||||
|
||||
@abstractmethod
|
||||
def _impl_wipe_data(self) -> None:
|
||||
"""wipe data on the service - virtual"""
|
||||
|
||||
|
||||
class DataSync_Factory:
|
||||
"""DataSync Factory"""
|
||||
@@ -222,7 +236,8 @@ class C_DataSync_NextCloud(I_DataSync):
|
||||
self.connected: bool = False
|
||||
|
||||
@classmethod
|
||||
def test_applicable(cls, manifest) -> bool:
|
||||
def test_applicable(cls, manifest: dict[str, Any]) -> bool:
|
||||
"""check if a concrete class is applicable - Nextcloud override"""
|
||||
if "Args" in manifest:
|
||||
if "FSSync_NextCloud_Enabled" in manifest["Args"]:
|
||||
if manifest["Args"]["FSSync_NextCloud_Enabled"]["value"] is True:
|
||||
@@ -231,7 +246,7 @@ class C_DataSync_NextCloud(I_DataSync):
|
||||
raise DataSyncException_InvalidManifest()
|
||||
|
||||
def _impl_configure(self) -> None:
|
||||
"""configure the class instance - concrete implementation"""
|
||||
"""configure the class instance - Nextcloud concrete implementation"""
|
||||
if "FSSync_NextCloud_Address" in self.manifest["Args"]:
|
||||
self.nextcloud_address = self.manifest["Args"]["FSSync_NextCloud_Address"]["value"]
|
||||
else:
|
||||
@@ -253,7 +268,7 @@ class C_DataSync_NextCloud(I_DataSync):
|
||||
raise DataSyncException_InvalidManifest()
|
||||
|
||||
def _impl_connect(self) -> None:
|
||||
"""connect to the remote service - concrete implementation"""
|
||||
"""connect to the remote service - Nextcloud concrete implementation"""
|
||||
full_adress = urljoin(self.nextcloud_address, "remote.php/dav/files/", self.nextcloud_user)
|
||||
self.client = webdav3_Client(
|
||||
{"webdav_hostname": full_adress, "webdav_login": self.nextcloud_user, "webdav_password": self.nextcloud_password}
|
||||
@@ -267,14 +282,22 @@ class C_DataSync_NextCloud(I_DataSync):
|
||||
if not self.client.check(url_accumulator):
|
||||
self.client.mkdir(url_accumulator)
|
||||
|
||||
def _impl_read_data(self) -> None:
|
||||
"""read data from the remote service - concrete implementation"""
|
||||
def _impl_read_data(self, file_in: Path, file_out: Path) -> None:
|
||||
"""read data from the remote service - Nextcloud concrete implementation"""
|
||||
self._check_create_dir()
|
||||
self.client.download_sync(
|
||||
remote_path=str(self.nextcloud_path / file_in).replace(os.sep, "/"), local_path=str(file_out / file_in).replace(os.sep, "/")
|
||||
)
|
||||
|
||||
def _impl_write_data(self, record_name: str, file_in: IO) -> None:
|
||||
"""write data to the remote service - concrete implementation"""
|
||||
"""write data to the remote service - Nextcloud concrete implementation"""
|
||||
self._check_create_dir()
|
||||
self.client.upload_sync(
|
||||
remote_path=str(Path(self.nextcloud_path) / record_name).replace(os.sep, "/"),
|
||||
local_path=file_in.name,
|
||||
)
|
||||
|
||||
def _impl_wipe_data(self) -> None:
|
||||
"""wipe data on the service - concrete implementation"""
|
||||
if self.client.check(self.nextcloud_path):
|
||||
self.client.clean(self.nextcloud_path)
|
||||
|
||||
@@ -8,8 +8,6 @@
|
||||
|
||||
import unittest
|
||||
from os import chdir
|
||||
from io import StringIO
|
||||
from contextlib import redirect_stdout, redirect_stderr
|
||||
from pathlib import Path
|
||||
import pprint
|
||||
|
||||
@@ -34,9 +32,9 @@ class TestDabDataSync(unittest.TestCase):
|
||||
datasync = dabdatasync.DataSync_Factory.get_DataSync()
|
||||
self.assertIsInstance(datasync, dabdatasync.I_DataSync)
|
||||
self.assertIsInstance(datasync, dabdatasync.C_DataSync_NextCloud)
|
||||
datasync.read_data()
|
||||
pprint.pprint(datasync.ar_datasync_record)
|
||||
datasync.write_data()
|
||||
datasync.read_data()
|
||||
|
||||
def test_load_empty(self):
|
||||
dabdatasync.I_DataSync.manifest_path = testdir_path / "test_manifest_empty.json"
|
||||
|
||||
Reference in New Issue
Block a user