improve quality

This commit is contained in:
cclecle
2024-03-27 02:17:08 +00:00
parent b56a61b796
commit 81ba24d6f3

View File

@@ -1,22 +1,25 @@
"""Main datasync class"""
import json
from abc import ABC, abstractmethod
from typing import final, BinaryIO
from typing import Self, Any, Set, Optional
from webdav3.client import Client as webdav3_Client
from uuid import UUID
from pathlib import Path
import os
from pydantic import BaseModel
import tarfile
from tempfile import NamedTemporaryFile
from pydantic import BaseModel
from webdav3.client import Client as webdav3_Client
class DataSyncException(Exception):
pass
"""generic datasync exception class"""
class DataSyncException_InvalidManifest(DataSyncException):
pass
"""specific datasync exception class - Dab appliance manifest not found"""
def urljoin(*args):
@@ -28,33 +31,43 @@ def urljoin(*args):
return "/".join(map(lambda x: str(x).rstrip("/"), args))
class I_DataSync_Compressor(ABC):
class A_DataSync_Compressor(ABC):
"""abstract compressor class"""
@abstractmethod
def compress(self, path_in: Path, file_out: Path):
pass
"""compress method - virtual"""
class DataSync_Compressor_targz(I_DataSync_Compressor):
class DataSync_Compressor_targz(A_DataSync_Compressor):
"""Concrete compressor class - .tar.gz compressor"""
def compress(self, path_in: Path, file_out):
"""compress method - .tar.gz concrete"""
with tarfile.open(fileobj=file_out, mode="w:gz") as tar:
tar.add(path_in, arcname=os.path.basename(path_in))
class A_DataSync_Record(BaseModel):
class A_DataSync_Record(BaseModel, ABC):
"""Abstract DataSync Record class"""
name: str
rec_type: str
value: str
@abstractmethod
def compress(self, compressor: A_DataSync_Compressor, file_out: BinaryIO) -> None:
"""compress the record - virtual"""
class DataSync_Record_Factory:
"""DataSync Record Factory"""
ar_cls_DataSync_Record: Set[A_DataSync_Record] = set()
@classmethod
def _register_C_DataSync_Record(cls, cls_DataSync_Record):
cls.ar_cls_DataSync_Record.add(cls_DataSync_Record)
@classmethod
def get_C_DataSync_Record(cls, name: str, rec_type: str, value: str) -> A_DataSync_Record | None:
"""get a concrete DataSync Record class instance"""
for cls_DataSync_Record in cls.ar_cls_DataSync_Record:
if cls_DataSync_Record.model_fields["rec_type"].default == rec_type:
return C_DataSync_Record_FS(name=name, rec_type=rec_type, value=value)
@@ -62,42 +75,50 @@ class DataSync_Record_Factory:
@classmethod
def register(cls, _cls):
cls._register_C_DataSync_Record(_cls)
"""decorator to register a concrete DataSync Record class"""
cls.ar_cls_DataSync_Record.add(_cls)
return _cls
@DataSync_Record_Factory.register
class C_DataSync_Record_FS(A_DataSync_Record):
"""Concrete DataSync Record class - FileSystem"""
rec_type: str = "fs"
path: Optional[Path] = None
def model_post_init(self, __context) -> None:
self.path = Path(self.value)
def compress(self, compressor: I_DataSync_Compressor, file_out: BinaryIO) -> None:
def compress(self, compressor: A_DataSync_Compressor, file_out: BinaryIO) -> None:
"""compress the DataSync Record - concrete FS implementation"""
compressor.compress(self.path, self.path, file_out)
class I_DataSync(ABC):
"""Abstract DataSync class"""
manifest_path: str = "/opt/pyDABFactoryAppliance/Manifest.json"
cls_compressor: type(I_DataSync_Compressor) = DataSync_Compressor_targz
cls_compressor: type[A_DataSync_Compressor] = DataSync_Compressor_targz
@classmethod
@final
def get_manifest_data(cls) -> dict[Any, Any]:
with open(cls.manifest_path) as f_DAB_manifest:
"""utilitary method to get manifest"""
with open(cls.manifest_path, encoding="utf-8") as f_DAB_manifest:
return json.load(f_DAB_manifest)
@classmethod
@final
def try_get_instance(cls, manifest) -> Self | None:
"""try to get an instance of a concrete class"""
if cls.test_applicable(manifest):
return cls(manifest)
return None
def __init__(self, manifest: dict[Any, Any]) -> None:
self.connected: bool = False
self.compressor: I_DataSync_Compressor = type(self).cls_compressor()
self.compressor: A_DataSync_Compressor = type(self).cls_compressor()
self.manifest: dict[Any, Any] = manifest
self.app_id: UUID = UUID(manifest["APP_ID"])
self.ar_datasync_record: list[A_DataSync_Record] = []
@@ -113,33 +134,39 @@ class I_DataSync(ABC):
@classmethod
def test_applicable(cls, manifest: dict[Any, Any]) -> bool:
"""check if a concrete class is applicable"""
del manifest # quality warning removal
return False
def configure(self) -> None:
"""configure the class instance"""
self._impl_configure()
@abstractmethod
def _impl_configure(self) -> None:
pass
"""configure the class instance - virtual"""
def connect(self) -> None:
"""connect to the service"""
if not self.connected:
self._impl_connect()
self.connected = True
@abstractmethod
def _impl_connect(self) -> None:
pass
"""connect to the service - virtual"""
def read_data(self) -> None:
"""read data from the service"""
self.connect()
self._impl_read_data()
@abstractmethod
def _impl_read_data(self) -> None:
pass
"""read data from the service - virtual"""
def write_data(self) -> None:
"""write data to the service"""
self.connect()
for datasync_record in self.ar_datasync_record:
try:
@@ -154,18 +181,17 @@ class I_DataSync(ABC):
@abstractmethod
def _impl_write_data(self, record_name: str, file_in: BinaryIO) -> None:
pass
"""write data to the service - virtual"""
class DataSync_Factory:
"""DataSync Factory"""
ar_cls_DataSync: Set[I_DataSync] = set()
@classmethod
def _register_C_DataSync(cls, cls_DataSync):
cls.ar_cls_DataSync.add(cls_DataSync)
@classmethod
def get_DataSync(cls) -> I_DataSync | None:
"""get and configure a DataSync Concrete class instance"""
manifest = I_DataSync.get_manifest_data()
for cls_DataSync in cls.ar_cls_DataSync:
if res := cls_DataSync.try_get_instance(manifest):
@@ -175,12 +201,15 @@ class DataSync_Factory:
@classmethod
def register(cls, _cls):
cls._register_C_DataSync(_cls)
"""decorator to register a concrete class to the factory"""
cls.ar_cls_DataSync.add(_cls)
return _cls
@DataSync_Factory.register
class C_DataSync_NextCloud(I_DataSync):
"""Concrete DataSync class - Nextcloud"""
def __init__(self, manifest: dict[Any, Any]) -> None:
super().__init__(manifest)
self.nextcloud_address: str
@@ -197,11 +226,10 @@ class C_DataSync_NextCloud(I_DataSync):
if manifest["Args"]["FSSync_NextCloud_Enabled"]["value"] is True:
return True
return False
else:
raise DataSyncException_InvalidManifest()
return True
raise DataSyncException_InvalidManifest()
def _impl_configure(self):
"""configure the class instance - concrete implementation"""
if "FSSync_NextCloud_Address" in self.manifest["Args"]:
self.nextcloud_address = self.manifest["Args"]["FSSync_NextCloud_Address"]["value"]
else:
@@ -223,12 +251,14 @@ class C_DataSync_NextCloud(I_DataSync):
raise DataSyncException_InvalidManifest()
def _impl_connect(self):
"""connect to the remote service - concrete implementation"""
full_adress = urljoin(self.nextcloud_address, "remote.php/dav/files/", self.nextcloud_user)
self.client = webdav3_Client(
{"webdav_hostname": full_adress, "webdav_login": self.nextcloud_user, "webdav_password": self.nextcloud_password}
)
def _check_create_dir(self):
"""check and create directory in remote service"""
url_accumulator: str = ""
for url_part in self.nextcloud_path.split("/"):
url_accumulator += "/" + url_part
@@ -236,9 +266,11 @@ class C_DataSync_NextCloud(I_DataSync):
self.client.mkdir(url_accumulator)
def _impl_read_data(self):
"""read data from the remote service - concrete implementation"""
self._check_create_dir()
def _impl_write_data(self, record_name: str, file_in: BinaryIO):
"""write data to the remote service - concrete implementation"""
self._check_create_dir()
self.client.upload_sync(
remote_path=str(Path(self.nextcloud_path) / record_name).replace(os.sep, "/"),