diff --git a/src/dabdatasync/datasync.py b/src/dabdatasync/datasync.py index 0b3464d..cdeffac 100644 --- a/src/dabdatasync/datasync.py +++ b/src/dabdatasync/datasync.py @@ -2,7 +2,7 @@ import json from abc import ABC, abstractmethod -from typing import final, BinaryIO +from typing import final, TYPE_CHECKING, IO from typing import Self, Any, Set, Optional from uuid import UUID from pathlib import Path @@ -35,14 +35,14 @@ class A_DataSync_Compressor(ABC): """abstract compressor class""" @abstractmethod - def compress(self, path_in: Path, file_out: Path): + def compress(self, path_in: Path, file_out: IO): """compress method - virtual""" class DataSync_Compressor_targz(A_DataSync_Compressor): """Concrete compressor class - .tar.gz compressor""" - def compress(self, path_in: Path, file_out): + def compress(self, path_in: Path, file_out: IO): """compress method - .tar.gz concrete""" with tarfile.open(fileobj=file_out, mode="w:gz") as tar: tar.add(path_in, arcname=os.path.basename(path_in)) @@ -56,14 +56,14 @@ class A_DataSync_Record(BaseModel, ABC): value: str @abstractmethod - def compress(self, compressor: A_DataSync_Compressor, file_out: BinaryIO) -> None: + def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None: """compress the record - virtual""" class DataSync_Record_Factory: """DataSync Record Factory""" - ar_cls_DataSync_Record: Set[A_DataSync_Record] = set() + ar_cls_DataSync_Record: Set[type[A_DataSync_Record]] = set() @classmethod def get_C_DataSync_Record(cls, name: str, rec_type: str, value: str) -> A_DataSync_Record | None: @@ -74,7 +74,7 @@ class DataSync_Record_Factory: raise RuntimeError("No DataSync_Record concrete class found") @classmethod - def register(cls, _cls): + def register(cls, _cls: type[A_DataSync_Record]) -> type[A_DataSync_Record]: """decorator to register a concrete DataSync Record class""" cls.ar_cls_DataSync_Record.add(_cls) return _cls @@ -90,9 +90,11 @@ class C_DataSync_Record_FS(A_DataSync_Record): def model_post_init(self, __context) -> None: self.path = Path(self.value) - def compress(self, compressor: A_DataSync_Compressor, file_out: BinaryIO) -> None: + def compress(self, compressor: A_DataSync_Compressor, file_out: IO) -> None: """compress the DataSync Record - concrete FS implementation""" - compressor.compress(self.path, self.path, file_out) + if TYPE_CHECKING: + assert isinstance(self.path, Path) + compressor.compress(self.path, file_out) class I_DataSync(ABC): @@ -124,13 +126,13 @@ class I_DataSync(ABC): self.ar_datasync_record: list[A_DataSync_Record] = [] if "FSSYNC_RECORD" in manifest["Args"]: for record in manifest["Args"]["FSSYNC_RECORD"]["value"]: - self.ar_datasync_record.append( - DataSync_Record_Factory.get_C_DataSync_Record( - record["value"]["name"]["value"], - record["value"]["type"]["value"], - record["value"]["value"]["value"], - ) + record = DataSync_Record_Factory.get_C_DataSync_Record( + record["value"]["name"]["value"], + record["value"]["type"]["value"], + record["value"]["value"]["value"], ) + assert isinstance(record, A_DataSync_Record) + self.ar_datasync_record.append(record) @classmethod def test_applicable(cls, manifest: dict[Any, Any]) -> bool: @@ -171,7 +173,7 @@ class I_DataSync(ABC): for datasync_record in self.ar_datasync_record: try: with NamedTemporaryFile("wb", suffix=".tar.gz", delete=False) as tmp_file: - datasync_record.compress(type(self).cls_compressor, tmp_file) + datasync_record.compress(self.compressor, tmp_file) tmp_file.seek(0) tmp_file.close() @@ -180,14 +182,14 @@ class I_DataSync(ABC): os.unlink(tmp_file.name) @abstractmethod - def _impl_write_data(self, record_name: str, file_in: BinaryIO) -> None: + def _impl_write_data(self, record_name: str, file_in: IO) -> None: """write data to the service - virtual""" class DataSync_Factory: """DataSync Factory""" - ar_cls_DataSync: Set[I_DataSync] = set() + ar_cls_DataSync: Set[type[I_DataSync]] = set() @classmethod def get_DataSync(cls) -> I_DataSync | None: @@ -200,7 +202,7 @@ class DataSync_Factory: return None @classmethod - def register(cls, _cls): + def register(cls, _cls: type[I_DataSync]) -> type[I_DataSync]: """decorator to register a concrete class to the factory""" cls.ar_cls_DataSync.add(_cls) return _cls @@ -228,7 +230,7 @@ class C_DataSync_NextCloud(I_DataSync): return False raise DataSyncException_InvalidManifest() - def _impl_configure(self): + def _impl_configure(self) -> None: """configure the class instance - concrete implementation""" if "FSSync_NextCloud_Address" in self.manifest["Args"]: self.nextcloud_address = self.manifest["Args"]["FSSync_NextCloud_Address"]["value"] @@ -250,14 +252,14 @@ class C_DataSync_NextCloud(I_DataSync): else: raise DataSyncException_InvalidManifest() - def _impl_connect(self): + def _impl_connect(self) -> None: """connect to the remote service - concrete implementation""" full_adress = urljoin(self.nextcloud_address, "remote.php/dav/files/", self.nextcloud_user) self.client = webdav3_Client( {"webdav_hostname": full_adress, "webdav_login": self.nextcloud_user, "webdav_password": self.nextcloud_password} ) - def _check_create_dir(self): + def _check_create_dir(self) -> None: """check and create directory in remote service""" url_accumulator: str = "" for url_part in self.nextcloud_path.split("/"): @@ -265,11 +267,11 @@ class C_DataSync_NextCloud(I_DataSync): if not self.client.check(url_accumulator): self.client.mkdir(url_accumulator) - def _impl_read_data(self): + def _impl_read_data(self) -> None: """read data from the remote service - concrete implementation""" self._check_create_dir() - def _impl_write_data(self, record_name: str, file_in: BinaryIO): + def _impl_write_data(self, record_name: str, file_in: IO) -> None: """write data to the remote service - concrete implementation""" self._check_create_dir() self.client.upload_sync(