Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 636316650e | |||
|
|
2d753205ca | ||
|
|
872dab1ed9 | ||
|
|
96d8f8fdd7 | ||
|
|
0ff9f442d2 | ||
|
|
9f4f2bbba1 | ||
| bfcf65b6e7 | |||
|
|
5fca93e4a8 | ||
|
|
79518946ac | ||
|
|
6aea5311bb | ||
|
|
b4292a6f57 | ||
|
|
38031deba2 | ||
|
|
abe800814d | ||
|
|
81ba24d6f3 | ||
|
|
b56a61b796 | ||
| f39fd9cf44 | |||
|
|
f1a748c09f | ||
|
|
dd45b1281f |
2
.project
2
.project
@@ -1,6 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<projectDescription>
|
||||
<name>{{project_name}}</name>
|
||||
<name>dabdatasync</name>
|
||||
<comment></comment>
|
||||
<projects>
|
||||
</projects>
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
eclipse.preferences.version=1
|
||||
encoding//src/dabdatasync/__main__.py=utf-8
|
||||
encoding/<project>=UTF-8
|
||||
|
||||
@@ -34,7 +34,12 @@ classifiers = [
|
||||
]
|
||||
dependencies = [
|
||||
'importlib-metadata; python_version<"3.9"',
|
||||
'packaging'
|
||||
'packaging',
|
||||
'webdavclient3==3.14.*',
|
||||
'pydantic==2.*',
|
||||
'typed-argument-parser==1.*',
|
||||
'loguru==0.7.*',
|
||||
'lz4'
|
||||
]
|
||||
dynamic = ["version"]
|
||||
|
||||
@@ -49,9 +54,21 @@ where = ["src"]
|
||||
"dabdatasync.data" = ["*.*"]
|
||||
"dabdatasync" = ["py.typed"]
|
||||
|
||||
# [[tool.mypy.overrides]]
|
||||
# module = ""
|
||||
# ignore_missing_imports = true
|
||||
[[tool.mypy.overrides]]
|
||||
module = "webdav3.client"
|
||||
ignore_missing_imports = true
|
||||
|
||||
[[tool.mypy.overrides]]
|
||||
module = "webdav3.exceptions"
|
||||
ignore_missing_imports = true
|
||||
|
||||
[[tool.mypy.overrides]]
|
||||
module = "lz4"
|
||||
ignore_missing_imports = true
|
||||
|
||||
[[tool.mypy.overrides]]
|
||||
module = "lz4.frame"
|
||||
ignore_missing_imports = true
|
||||
|
||||
[tool.coverage.run]
|
||||
cover_pylib = false
|
||||
@@ -63,19 +80,24 @@ concurrency = [
|
||||
'thread'
|
||||
]
|
||||
|
||||
[tool.coverage.report]
|
||||
exclude_also = [
|
||||
"if TYPE_CHECKING:",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
Homepage = "https://chacha.ddns.net/gitea/chacha/dabdatasync"
|
||||
Documentation = "https://chacha.ddns.net/mkdocs-web/chacha/dabdatasync/master/latest/"
|
||||
Tracker = "https://chacha.ddns.net/gitea/chacha/dabdatasync/issues"
|
||||
|
||||
[project.optional-dependencies]
|
||||
test = ["chacha_cicd_helper"]
|
||||
test = ["chacha_cicd_helper","contexttimer"]
|
||||
coverage-check = ["chacha_cicd_helper"]
|
||||
complexity-check = ["chacha_cicd_helper"]
|
||||
quality-check = ["chacha_cicd_helper"]
|
||||
type-check = ["chacha_cicd_helper"]
|
||||
doc-gen = ["chacha_cicd_helper"]
|
||||
|
||||
# [project.scripts]
|
||||
# my-script = "my_package.module:function"
|
||||
[project.scripts]
|
||||
dabdatasync = "dabdatasync.__main__:CLI"
|
||||
|
||||
|
||||
@@ -11,4 +11,18 @@ Main module __init__ file.
|
||||
"""
|
||||
|
||||
from .__metadata__ import __version__, __Summuary__, __Name__
|
||||
from .test_module import test_function
|
||||
from .datasync import A_DataSync, DataSync_Factory
|
||||
from .datasync_nextcloud import C_DataSync_NextCloud
|
||||
from .exceptions import (
|
||||
DataSyncException,
|
||||
DataSyncException_InvalidManifest,
|
||||
DataSyncException_NoConcreteRecordClassFound,
|
||||
DataSyncException_RemoteDataNotFound,
|
||||
DataSyncException_NoValidServiceFound,
|
||||
DataSyncException_ServiceNotFound,
|
||||
DataSyncException_TooManyServiceFound,
|
||||
DataSyncException_CompressorNotFound,
|
||||
DataSyncException_TooManyCompressorFound,
|
||||
)
|
||||
|
||||
from .compressors import DataSync_Compressors
|
||||
|
||||
155
src/dabdatasync/__main__.py
Normal file
155
src/dabdatasync/__main__.py
Normal file
@@ -0,0 +1,155 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# dabdatasync (c) by chacha
|
||||
#
|
||||
# dabdatasync is licensed under a
|
||||
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
|
||||
#
|
||||
# You should have received a copy of the license along with this
|
||||
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
|
||||
|
||||
"""CLI interface module"""
|
||||
from __future__ import annotations
|
||||
from typing import cast, Union, Optional
|
||||
|
||||
import sys
|
||||
from tap import Tap
|
||||
from loguru import logger
|
||||
|
||||
from . import __Summuary__, __Name__
|
||||
from . import datasync
|
||||
from . import exceptions
|
||||
|
||||
|
||||
class dabdatasync_args_GetServices(Tap):
|
||||
"""GetServices CLI arg subparser"""
|
||||
|
||||
|
||||
class dabdatasync_args_WipeLocalData(Tap):
|
||||
"""WipeLocalData CLI arg subparser"""
|
||||
|
||||
|
||||
class dabdatasync_args_service_abstract(Tap):
|
||||
"""service abstract CLI arg subparser"""
|
||||
|
||||
service: Optional[str] = None
|
||||
|
||||
def configure(self) -> None:
|
||||
self.add_argument("--service")
|
||||
|
||||
|
||||
class dabdatasync_args_service_compress_abstract(dabdatasync_args_service_abstract):
|
||||
"""service compressor abstract CLI arg subparser"""
|
||||
|
||||
compressor: Optional[str] = None
|
||||
|
||||
def configure(self) -> None:
|
||||
super().configure()
|
||||
self.add_argument("--compressor")
|
||||
|
||||
|
||||
class dabdatasync_args_PullData(dabdatasync_args_service_compress_abstract):
|
||||
"""PullData CLI arg subparser"""
|
||||
|
||||
|
||||
class dabdatasync_args_PushData(dabdatasync_args_service_compress_abstract):
|
||||
"""PushData CLI arg subparser"""
|
||||
|
||||
|
||||
class dabdatasync_args_WipeRemoteData(dabdatasync_args_service_abstract):
|
||||
"""WipeRemoteData CLI arg subparser"""
|
||||
|
||||
|
||||
class dabdatasync_args(Tap):
|
||||
"""Main CLI arg parser"""
|
||||
|
||||
verbosity: int = 0
|
||||
|
||||
def configure(self) -> None:
|
||||
self.add_argument("-v", "--verbosity", action="count", help="increase output verbosity")
|
||||
|
||||
self.add_subparsers(dest="command", help="command type", required=True)
|
||||
self.add_subparser("GetServices", dabdatasync_args_GetServices, help="Get registered services list")
|
||||
self.add_subparser("PullData", dabdatasync_args_PullData, help="Pull data from the service")
|
||||
self.add_subparser("PushData", dabdatasync_args_PushData, help="Push data to the service")
|
||||
self.add_subparser("WipeLocalData", dabdatasync_args_WipeLocalData, help="Wipe local data")
|
||||
self.add_subparser("WipeRemoteData", dabdatasync_args_WipeRemoteData, help="Wipe remote service data")
|
||||
|
||||
def process_args(self) -> None:
|
||||
"""dynamically add self.command to avoid conflict with Tap/argparse while keep pylint happy"""
|
||||
self.command: Union[str, None] = cast(Union[str, None], self.command) # pylint: disable=attribute-defined-outside-init
|
||||
|
||||
|
||||
def fct_main(i_args: list[str]) -> None: # pylint: disable=too-many-branches,too-complex
|
||||
"""CLI main function"""
|
||||
parser: dabdatasync_args = dabdatasync_args(prog=__Name__, description=__Summuary__)
|
||||
|
||||
args: dabdatasync_args = parser.parse_args(i_args)
|
||||
|
||||
logger.remove()
|
||||
if args.verbosity:
|
||||
if args.verbosity == 1:
|
||||
logger.add(sys.stdout, level="WARNING")
|
||||
elif args.verbosity == 2:
|
||||
logger.add(sys.stdout, level="INFO")
|
||||
else:
|
||||
logger.add(sys.stdout, level="DEBUG")
|
||||
else:
|
||||
logger.add(sys.stdout, level="ERROR")
|
||||
logger.add(sys.stderr, level="ERROR")
|
||||
|
||||
dabdatasync = datasync.DataSync_Factory.get_DataSync()
|
||||
if len(dabdatasync) == 0:
|
||||
raise exceptions.DataSyncException_NoValidServiceFound("No valid service found")
|
||||
|
||||
if args.command == "GetServices":
|
||||
for service in dabdatasync:
|
||||
print(service.service_name)
|
||||
return
|
||||
|
||||
if args.command == "WipeLocalData":
|
||||
dabdatasync[0].wipe_local_data()
|
||||
return
|
||||
|
||||
selected_dabdatasync: datasync.A_DataSync
|
||||
if args.command in ["PullData", "PushData", "WipeRemoteData"]:
|
||||
requested_service = cast(dabdatasync_args_service_abstract, args).service # pylint: disable=no-member
|
||||
if requested_service:
|
||||
services = [_ for _ in dabdatasync if type(_).service_name == requested_service]
|
||||
if len(services) == 0:
|
||||
raise exceptions.DataSyncException_ServiceNotFound()
|
||||
if len(services) == 1:
|
||||
selected_dabdatasync = services[0]
|
||||
else:
|
||||
raise exceptions.DataSyncException_TooManyServiceFound()
|
||||
else:
|
||||
selected_dabdatasync = dabdatasync[0]
|
||||
|
||||
if args.command in ["PullData", "PushData"]:
|
||||
compressor = cast(dabdatasync_args_service_compress_abstract, args).compressor # pylint: disable=no-member
|
||||
if compressor:
|
||||
selected_dabdatasync.set_compressor(compressor)
|
||||
|
||||
if args.command == "PullData":
|
||||
selected_dabdatasync.pull_data()
|
||||
return
|
||||
|
||||
if args.command == "PushData":
|
||||
selected_dabdatasync.push_data()
|
||||
return
|
||||
|
||||
if args.command == "WipeRemoteData":
|
||||
selected_dabdatasync.wipe_remote_data()
|
||||
return
|
||||
|
||||
raise RuntimeError("Invalid argument")
|
||||
|
||||
|
||||
def CLI():
|
||||
"""wrapper for .toml declared script"""
|
||||
fct_main(sys.argv)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
fct_main(sys.argv[1:])
|
||||
123
src/dabdatasync/compressors.py
Normal file
123
src/dabdatasync/compressors.py
Normal file
@@ -0,0 +1,123 @@
|
||||
# dabdatasync (c) by chacha
|
||||
#
|
||||
# dabdatasync is licensed under a
|
||||
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
|
||||
#
|
||||
# You should have received a copy of the license along with this
|
||||
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
|
||||
|
||||
"""
|
||||
Compressor interface and implementation
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
from abc import ABC, abstractmethod
|
||||
from pathlib import Path
|
||||
import os
|
||||
from io import BytesIO
|
||||
from typing import TYPE_CHECKING
|
||||
import tarfile
|
||||
import lz4.frame
|
||||
from loguru import logger
|
||||
|
||||
from .exceptions import DataSyncException_CompressorNotFound, DataSyncException_TooManyCompressorFound
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import IO
|
||||
|
||||
|
||||
class A_DataSync_Compressor(ABC):
|
||||
"""abstract compressor class"""
|
||||
|
||||
suffix: str
|
||||
compressor_name: str = "ABSTRACT"
|
||||
|
||||
@classmethod
|
||||
def compress(cls, path_in: Path, file_out: IO):
|
||||
"""compress method"""
|
||||
logger.debug(f"compressing <{path_in}> data using <{cls.compressor_name}> to <{file_out.name}>")
|
||||
cls._impl_compress(path_in, file_out)
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def _impl_compress(cls, path_in: Path, file_out: IO):
|
||||
"""compress method - virtual"""
|
||||
|
||||
@classmethod
|
||||
def uncompress(cls, path_in: Path, path_out: Path):
|
||||
"""uncompress method"""
|
||||
logger.debug(f"uncompressing <{path_in}> data using <{cls.compressor_name}> to <{path_out}>")
|
||||
cls._impl_uncompress(path_in, path_out)
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def _impl_uncompress(cls, path_in: Path, path_out: Path):
|
||||
"""uncompress method - virtual"""
|
||||
|
||||
|
||||
class DataSync_Compressors:
|
||||
"""compressers container/factory class"""
|
||||
|
||||
_availables: list[type[A_DataSync_Compressor]] = []
|
||||
|
||||
@classmethod
|
||||
def register(cls, _cls: type[A_DataSync_Compressor]) -> type[A_DataSync_Compressor]:
|
||||
"""register a new compressor"""
|
||||
cls._availables.append(_cls)
|
||||
return _cls
|
||||
|
||||
@classmethod
|
||||
def get(cls, compressor_name: str) -> type[A_DataSync_Compressor]:
|
||||
"""get a specific compressor"""
|
||||
found = [_ for _ in cls._availables if _.compressor_name == compressor_name]
|
||||
if len(found) == 0:
|
||||
raise DataSyncException_CompressorNotFound()
|
||||
if len(found) == 1:
|
||||
return found[0]
|
||||
raise DataSyncException_TooManyCompressorFound()
|
||||
|
||||
|
||||
@DataSync_Compressors.register
|
||||
class DataSync_Compressor__tar_gz(A_DataSync_Compressor):
|
||||
"""Concrete compressor class - .tar.gz compressor"""
|
||||
|
||||
suffix: str = ".tar.gz"
|
||||
compressor_name: str = "tar_gz"
|
||||
|
||||
@classmethod
|
||||
def _impl_compress(cls, path_in: Path, file_out: IO):
|
||||
"""compress method - .tar.gz concrete"""
|
||||
with tarfile.open(fileobj=file_out, mode="w:gz") as tar:
|
||||
tar.add(path_in, arcname=os.path.basename(path_in))
|
||||
|
||||
@classmethod
|
||||
def _impl_uncompress(cls, path_in: Path, path_out: Path):
|
||||
"""uncompressing method - .tar.gz concrete"""
|
||||
with tarfile.open(path_in, mode="r:gz") as tar:
|
||||
tar.extractall(path_out)
|
||||
|
||||
|
||||
@DataSync_Compressors.register
|
||||
class DataSync_Compressor__tar_lz4(A_DataSync_Compressor):
|
||||
"""Concrete compressor class - .tar.lz4 compressor"""
|
||||
|
||||
suffix: str = ".tar.lz4"
|
||||
compressor_name: str = "tar_lz4"
|
||||
|
||||
@classmethod
|
||||
def _impl_compress(cls, path_in: Path, file_out: IO):
|
||||
"""compress method - .tar.lz4 concrete"""
|
||||
with BytesIO() as memBuff:
|
||||
with tarfile.open(fileobj=memBuff, mode="w:") as tar:
|
||||
tar.add(path_in, arcname=os.path.basename(path_in))
|
||||
memBuff.seek(0)
|
||||
file_out.write(lz4.frame.compress(memBuff.read()))
|
||||
|
||||
@classmethod
|
||||
def _impl_uncompress(cls, path_in: Path, path_out: Path):
|
||||
"""uncompressing method - .tar.lz4 concrete"""
|
||||
with open(path_in, "rb") as file_in, BytesIO() as memBuff:
|
||||
memBuff.write(lz4.frame.decompress(file_in.read()))
|
||||
memBuff.seek(0)
|
||||
with tarfile.open(fileobj=memBuff, mode="r:") as tar:
|
||||
tar.extractall(path_out)
|
||||
180
src/dabdatasync/datasync.py
Normal file
180
src/dabdatasync/datasync.py
Normal file
@@ -0,0 +1,180 @@
|
||||
# dabdatasync (c) by chacha
|
||||
#
|
||||
# dabdatasync is licensed under a
|
||||
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
|
||||
#
|
||||
# You should have received a copy of the license along with this
|
||||
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
|
||||
|
||||
"""
|
||||
Nextcloud abstract interface
|
||||
"""
|
||||
from __future__ import annotations
|
||||
from abc import ABC, abstractmethod
|
||||
from uuid import UUID
|
||||
from pathlib import Path
|
||||
import os
|
||||
from tempfile import NamedTemporaryFile, TemporaryDirectory
|
||||
import json
|
||||
from typing import final, TYPE_CHECKING
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from .records import A_DataSync_Record, DataSync_Record_Factory
|
||||
from .compressors import DataSync_Compressor__tar_gz, DataSync_Compressors
|
||||
from .exceptions import DataSyncException_RemoteDataNotFound
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Optional, IO, Any, Self
|
||||
from .compressors import A_DataSync_Compressor
|
||||
|
||||
|
||||
class A_DataSync(ABC):
|
||||
"""Abstract DataSync class"""
|
||||
|
||||
service_name: str = "ABSTRACT"
|
||||
|
||||
@classmethod
|
||||
@final
|
||||
def try_get_instance(cls, manifest: dict[str, Any], cls_compressor: type[A_DataSync_Compressor]) -> Self | None:
|
||||
"""try to get an instance of a concrete class"""
|
||||
if cls.test_applicable(manifest):
|
||||
return cls(manifest, cls_compressor)
|
||||
return None
|
||||
|
||||
def __init__(self, manifest: dict[str, Any], cls_compressor: type[A_DataSync_Compressor]) -> None:
|
||||
self.connected: bool = False
|
||||
self._cls_compressor: type[A_DataSync_Compressor] = cls_compressor
|
||||
self._manifest: dict[str, Any] = manifest
|
||||
self._app_id: UUID = UUID(manifest["APP_ID"])
|
||||
self._ar_datasync_record: list[A_DataSync_Record] = []
|
||||
if "FSSYNC_RECORD" in manifest["Args"]:
|
||||
for record in manifest["Args"]["FSSYNC_RECORD"]["value"]:
|
||||
record = DataSync_Record_Factory.get_C_DataSync_Record(
|
||||
record["value"]["name"]["value"],
|
||||
record["value"]["type"]["value"],
|
||||
record["value"]["value"]["value"],
|
||||
)
|
||||
assert isinstance(record, A_DataSync_Record)
|
||||
self._ar_datasync_record.append(record)
|
||||
|
||||
def set_compressor(self, compressor_name: str) -> None:
|
||||
"""set compressor to be used"""
|
||||
self._cls_compressor = DataSync_Compressors.get(compressor_name)
|
||||
|
||||
def get_datasync_records(self) -> list[A_DataSync_Record]:
|
||||
"""get list of records"""
|
||||
return self._ar_datasync_record
|
||||
|
||||
@classmethod
|
||||
def test_applicable(cls, manifest: dict[str, Any]) -> bool:
|
||||
"""check if a concrete class is applicable - generic"""
|
||||
del manifest # quality warning removal
|
||||
return False
|
||||
|
||||
def configure(self) -> None:
|
||||
"""configure the class instance"""
|
||||
self._impl_configure()
|
||||
|
||||
@abstractmethod
|
||||
def _impl_configure(self) -> None:
|
||||
"""configure the class instance - virtual"""
|
||||
|
||||
def connect(self) -> None:
|
||||
"""connect to the service"""
|
||||
if not self.connected:
|
||||
logger.info(f"connection to service <{self.service_name}>")
|
||||
self._impl_connect()
|
||||
self.connected = True
|
||||
logger.info("connection done")
|
||||
|
||||
@abstractmethod
|
||||
def _impl_connect(self) -> None:
|
||||
"""connect to the service - virtual"""
|
||||
|
||||
def pull_data(self) -> None:
|
||||
"""pull data from the service"""
|
||||
logger.info(f"pulling data from service <{self.service_name}>")
|
||||
self.connect()
|
||||
with TemporaryDirectory() as tmpdir:
|
||||
for datasync_record in self._ar_datasync_record:
|
||||
logger.info(f"pulling record <{datasync_record.name}>")
|
||||
try:
|
||||
self._impl_pull_data(Path(datasync_record.name + self._cls_compressor.suffix), Path(tmpdir))
|
||||
datasync_record.uncompress(self._cls_compressor, Path(tmpdir) / (datasync_record.name + self._cls_compressor.suffix))
|
||||
except DataSyncException_RemoteDataNotFound:
|
||||
logger.warning(f"remote record file not found <{datasync_record.name}>")
|
||||
logger.info("done")
|
||||
|
||||
@abstractmethod
|
||||
def _impl_pull_data(self, file_in: Path, file_out: Path) -> None:
|
||||
"""pull data from the service - virtual"""
|
||||
|
||||
def push_data(self) -> None:
|
||||
"""push data to the service"""
|
||||
logger.info(f"pushing data to service <{self.service_name}>")
|
||||
self.connect()
|
||||
self._impl_wipe_remote_data()
|
||||
for datasync_record in self._ar_datasync_record:
|
||||
logger.info(f"pushing record <{datasync_record.name}>")
|
||||
try:
|
||||
with NamedTemporaryFile("wb", suffix=self._cls_compressor.suffix, delete=False) as tmp_file:
|
||||
datasync_record.compress(self._cls_compressor, tmp_file)
|
||||
tmp_file.seek(0)
|
||||
tmp_file.close()
|
||||
|
||||
self._impl_push_data(datasync_record.name + "".join(Path(tmp_file.name).suffixes), tmp_file)
|
||||
finally:
|
||||
os.unlink(tmp_file.name)
|
||||
logger.info("done")
|
||||
|
||||
@abstractmethod
|
||||
def _impl_push_data(self, record_name: str, file_in: IO) -> None:
|
||||
"""push data to the service - virtual"""
|
||||
|
||||
def wipe_remote_data(self) -> None:
|
||||
"""wipe data on the service"""
|
||||
logger.info(f"wiping remote data on service <{self.service_name}>")
|
||||
self.connect()
|
||||
self._impl_wipe_remote_data()
|
||||
|
||||
def wipe_local_data(self) -> None:
|
||||
"""wipe local data"""
|
||||
logger.info("wiping local data")
|
||||
for datasync_record in self._ar_datasync_record:
|
||||
datasync_record.wipe()
|
||||
|
||||
@abstractmethod
|
||||
def _impl_wipe_remote_data(self) -> None:
|
||||
"""wipe data on the service - virtual"""
|
||||
|
||||
|
||||
class DataSync_Factory:
|
||||
"""DataSync Factory"""
|
||||
|
||||
ar_cls_DataSync: set[type[A_DataSync]] = set()
|
||||
manifest_path: str = "/opt/pyDABFactoryAppliance/Manifest.json"
|
||||
cls_compressor: type[A_DataSync_Compressor] = DataSync_Compressor__tar_gz
|
||||
|
||||
@classmethod
|
||||
def get_manifest_data(cls) -> dict[str, Any]:
|
||||
"""tool method to get manifest"""
|
||||
with open(cls.manifest_path, encoding="utf-8") as f_DAB_manifest:
|
||||
return json.load(f_DAB_manifest)
|
||||
|
||||
@classmethod
|
||||
def get_DataSync(cls) -> list[A_DataSync]:
|
||||
"""get and configure a DataSync Concrete class instance"""
|
||||
ar_datasync: list[A_DataSync] = []
|
||||
manifest = cls.get_manifest_data()
|
||||
for cls_DataSync in cls.ar_cls_DataSync:
|
||||
if res := cls_DataSync.try_get_instance(manifest, cls.cls_compressor):
|
||||
res.configure()
|
||||
ar_datasync.append(res)
|
||||
return ar_datasync
|
||||
|
||||
@classmethod
|
||||
def register(cls, _cls: type[A_DataSync]) -> type[A_DataSync]:
|
||||
"""decorator to register a concrete class to the factory"""
|
||||
cls.ar_cls_DataSync.add(_cls)
|
||||
return _cls
|
||||
113
src/dabdatasync/datasync_nextcloud.py
Normal file
113
src/dabdatasync/datasync_nextcloud.py
Normal file
@@ -0,0 +1,113 @@
|
||||
# dabdatasync (c) by chacha
|
||||
#
|
||||
# dabdatasync is licensed under a
|
||||
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
|
||||
#
|
||||
# You should have received a copy of the license along with this
|
||||
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
|
||||
|
||||
"""
|
||||
Nextcloud datasync implementation
|
||||
"""
|
||||
from __future__ import annotations
|
||||
from pathlib import Path
|
||||
import os
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
|
||||
from webdav3.client import Client as webdav3_Client
|
||||
from webdav3.exceptions import RemoteResourceNotFound as webdav3_RemoteResourceNotFound
|
||||
|
||||
from .datasync import A_DataSync, DataSync_Factory
|
||||
from .exceptions import DataSyncException_InvalidManifest, DataSyncException_RemoteDataNotFound
|
||||
from .utils import urljoin
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Any, IO
|
||||
from .compressors import A_DataSync_Compressor
|
||||
|
||||
|
||||
@DataSync_Factory.register
|
||||
class C_DataSync_NextCloud(A_DataSync):
|
||||
"""Concrete DataSync class - Nextcloud"""
|
||||
|
||||
service_name: str = "Nextcloud"
|
||||
|
||||
def __init__(self, manifest: dict[Any, Any], cls_compressor: type[A_DataSync_Compressor]) -> None:
|
||||
super().__init__(manifest, cls_compressor)
|
||||
self.nextcloud_address: str
|
||||
self.nextcloud_user: str
|
||||
self.nextcloud_password: str
|
||||
self.nextcloud_path: str
|
||||
self.client: webdav3_Client
|
||||
self.connected: bool = False
|
||||
|
||||
@classmethod
|
||||
def test_applicable(cls, manifest: dict[str, Any]) -> bool:
|
||||
"""check if a concrete class is applicable - Nextcloud override"""
|
||||
if "Args" in manifest:
|
||||
if "FSSync_NextCloud_Enabled" in manifest["Args"]:
|
||||
if manifest["Args"]["FSSync_NextCloud_Enabled"]["value"] is True:
|
||||
return True
|
||||
return False
|
||||
raise DataSyncException_InvalidManifest()
|
||||
|
||||
def _impl_configure(self) -> None:
|
||||
"""configure the class instance - Nextcloud concrete implementation"""
|
||||
if "FSSync_NextCloud_Address" in self._manifest["Args"]:
|
||||
self.nextcloud_address = self._manifest["Args"]["FSSync_NextCloud_Address"]["value"]
|
||||
else:
|
||||
raise DataSyncException_InvalidManifest()
|
||||
if "FSSync_NextCloud_User" in self._manifest["Args"]:
|
||||
self.nextcloud_user = self._manifest["Args"]["FSSync_NextCloud_User"]["value"]
|
||||
else:
|
||||
raise DataSyncException_InvalidManifest()
|
||||
if "FSSync_NextCloud_Password" in self._manifest["Args"]:
|
||||
self.nextcloud_password = self._manifest["Args"]["FSSync_NextCloud_Password"]["value"]
|
||||
else:
|
||||
raise DataSyncException_InvalidManifest()
|
||||
if "FSSync_NextCloud_Path" in self._manifest["Args"]:
|
||||
self.nextcloud_path = str(Path(self._manifest["Args"]["FSSync_NextCloud_Path"]["value"]) / Path(str(self._app_id))).replace(
|
||||
os.sep, "/"
|
||||
)
|
||||
|
||||
else:
|
||||
raise DataSyncException_InvalidManifest()
|
||||
|
||||
def _impl_connect(self) -> None:
|
||||
"""connect to the remote service - Nextcloud concrete implementation"""
|
||||
full_adress = urljoin(self.nextcloud_address, "remote.php/dav/files/", self.nextcloud_user)
|
||||
self.client = webdav3_Client(
|
||||
{"webdav_hostname": full_adress, "webdav_login": self.nextcloud_user, "webdav_password": self.nextcloud_password}
|
||||
)
|
||||
|
||||
def _check_create_dir(self) -> None:
|
||||
"""check and create directory in remote service"""
|
||||
url_accumulator: str = ""
|
||||
for url_part in self.nextcloud_path.split("/"):
|
||||
url_accumulator += "/" + url_part
|
||||
if not self.client.check(url_accumulator):
|
||||
self.client.mkdir(url_accumulator)
|
||||
|
||||
def _impl_pull_data(self, file_in: Path, file_out: Path) -> None:
|
||||
"""pull data from the remote service - Nextcloud concrete implementation"""
|
||||
self._check_create_dir()
|
||||
try:
|
||||
self.client.download_sync(
|
||||
remote_path=str(self.nextcloud_path / file_in).replace(os.sep, "/"), local_path=str(file_out / file_in).replace(os.sep, "/")
|
||||
)
|
||||
except webdav3_RemoteResourceNotFound as exc:
|
||||
raise DataSyncException_RemoteDataNotFound from exc
|
||||
|
||||
def _impl_push_data(self, record_name: str, file_in: IO) -> None:
|
||||
"""push data to the remote service - Nextcloud concrete implementation"""
|
||||
self._check_create_dir()
|
||||
self.client.upload_sync(
|
||||
remote_path=str(Path(self.nextcloud_path) / record_name).replace(os.sep, "/"),
|
||||
local_path=file_in.name,
|
||||
)
|
||||
|
||||
def _impl_wipe_remote_data(self) -> None:
|
||||
"""wipe data on the service - concrete implementation"""
|
||||
if self.client.check(self.nextcloud_path):
|
||||
self.client.clean(self.nextcloud_path)
|
||||
47
src/dabdatasync/exceptions.py
Normal file
47
src/dabdatasync/exceptions.py
Normal file
@@ -0,0 +1,47 @@
|
||||
# dabdatasync (c) by chacha
|
||||
#
|
||||
# dabdatasync is licensed under a
|
||||
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
|
||||
#
|
||||
# You should have received a copy of the license along with this
|
||||
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
|
||||
|
||||
"""
|
||||
Exception declaration
|
||||
"""
|
||||
|
||||
|
||||
class DataSyncException(Exception):
|
||||
"""generic datasync exception class"""
|
||||
|
||||
|
||||
class DataSyncException_InvalidManifest(DataSyncException):
|
||||
"""specific datasync exception class - Dab appliance manifest not found"""
|
||||
|
||||
|
||||
class DataSyncException_NoConcreteRecordClassFound(DataSyncException):
|
||||
"""specific datasync exception class - No concrete Record Class Found"""
|
||||
|
||||
|
||||
class DataSyncException_RemoteDataNotFound(DataSyncException):
|
||||
"""specific datasync exception class - Remote Data Not Found"""
|
||||
|
||||
|
||||
class DataSyncException_NoValidServiceFound(DataSyncException):
|
||||
"""specific datasync exception class - Remote Valid Service Found"""
|
||||
|
||||
|
||||
class DataSyncException_ServiceNotFound(DataSyncException):
|
||||
"""specific datasync exception class - Service Not Found"""
|
||||
|
||||
|
||||
class DataSyncException_TooManyServiceFound(DataSyncException):
|
||||
"""specific datasync exception class - Too Many Service Found"""
|
||||
|
||||
|
||||
class DataSyncException_CompressorNotFound(DataSyncException):
|
||||
"""specific datasync exception class - Compressor Not Found"""
|
||||
|
||||
|
||||
class DataSyncException_TooManyCompressorFound(DataSyncException):
|
||||
"""specific datasync exception class - Too Many Compressor Found"""
|
||||
99
src/dabdatasync/records.py
Normal file
99
src/dabdatasync/records.py
Normal file
@@ -0,0 +1,99 @@
|
||||
# dabdatasync (c) by chacha
|
||||
#
|
||||
# dabdatasync is licensed under a
|
||||
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
|
||||
#
|
||||
# You should have received a copy of the license along with this
|
||||
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
|
||||
|
||||
"""
|
||||
datasync records description and implementation
|
||||
"""
|
||||
from __future__ import annotations
|
||||
from abc import ABC, abstractmethod
|
||||
from pathlib import Path
|
||||
import os
|
||||
import shutil
|
||||
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from .exceptions import DataSyncException_NoConcreteRecordClassFound
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .compressors import A_DataSync_Compressor
|
||||
from typing import IO
|
||||
|
||||
|
||||
class A_DataSync_Record(BaseModel, ABC):
|
||||
"""Abstract DataSync Record class"""
|
||||
|
||||
name: str
|
||||
rec_type: str
|
||||
value: str
|
||||
|
||||
@abstractmethod
|
||||
def compress(self, cls_compressor: type[A_DataSync_Compressor], file_out: IO) -> None:
|
||||
"""compress the DataSync Record - virtual"""
|
||||
|
||||
@abstractmethod
|
||||
def uncompress(self, cls_compressor: type[A_DataSync_Compressor], path_in: Path) -> None:
|
||||
"""uncompress the DataSync record - virtual"""
|
||||
|
||||
@abstractmethod
|
||||
def wipe(self):
|
||||
"""wipe the record local data - virtual"""
|
||||
|
||||
|
||||
class DataSync_Record_Factory:
|
||||
"""DataSync Record Factory"""
|
||||
|
||||
ar_cls_DataSync_Record: set[type[A_DataSync_Record]] = set()
|
||||
|
||||
@classmethod
|
||||
def get_C_DataSync_Record(cls, name: str, rec_type: str, value: str) -> A_DataSync_Record | None:
|
||||
"""get a concrete DataSync Record class instance"""
|
||||
for cls_DataSync_Record in cls.ar_cls_DataSync_Record:
|
||||
if cls_DataSync_Record.model_fields["rec_type"].default == rec_type:
|
||||
return C_DataSync_Record_FS(name=name, rec_type=rec_type, value=value)
|
||||
raise DataSyncException_NoConcreteRecordClassFound()
|
||||
|
||||
@classmethod
|
||||
def register(cls, _cls: type[A_DataSync_Record]) -> type[A_DataSync_Record]:
|
||||
"""decorator to register a concrete DataSync Record class"""
|
||||
cls.ar_cls_DataSync_Record.add(_cls)
|
||||
return _cls
|
||||
|
||||
|
||||
@DataSync_Record_Factory.register
|
||||
class C_DataSync_Record_FS(A_DataSync_Record):
|
||||
"""Concrete DataSync Record class - FileSystem"""
|
||||
|
||||
rec_type: str = "fs"
|
||||
path: Optional[Path] = None
|
||||
|
||||
def model_post_init(self, __context) -> None:
|
||||
self.path = Path(self.value)
|
||||
|
||||
def compress(self, cls_compressor: type[A_DataSync_Compressor], file_out: IO) -> None:
|
||||
"""compress the DataSync Record - concrete FS implementation"""
|
||||
if TYPE_CHECKING:
|
||||
assert isinstance(self.path, Path)
|
||||
cls_compressor.compress(self.path, file_out)
|
||||
|
||||
def uncompress(self, cls_compressor: type[A_DataSync_Compressor], path_in: Path) -> None:
|
||||
"""uncompress the DataSync record - concrete FS implementation"""
|
||||
if TYPE_CHECKING:
|
||||
assert isinstance(self.path, Path)
|
||||
self.wipe()
|
||||
cls_compressor.uncompress(path_in, self.path.parent)
|
||||
|
||||
def wipe(self):
|
||||
if TYPE_CHECKING:
|
||||
assert isinstance(self.path, Path)
|
||||
if os.path.isdir(self.path):
|
||||
shutil.rmtree(self.path)
|
||||
if os.path.isfile(self.path):
|
||||
os.remove(self.path)
|
||||
@@ -1,43 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# dabdatasync (c) by chacha
|
||||
#
|
||||
# dabdatasync is licensed under a
|
||||
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
|
||||
#
|
||||
# You should have received a copy of the license along with this
|
||||
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
|
||||
|
||||
"""Phasellus tellus lectus, volutpat eu dapibus ut, suscipit vel augue.
|
||||
|
||||
Tips:
|
||||
Aliquam non leo vel libero sagittis viverra. Quisque lobortis nunc sit amet augue euismod laoreet.
|
||||
Note:
|
||||
Maecenas volutpat porttitor pretium. Aliquam suscipit quis nisi non imperdiet.
|
||||
Note:
|
||||
Vivamus et efficitur lorem, eget imperdiet tortor. Integer vel interdum sem.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
if TYPE_CHECKING: # Only imports the below statements during type checking
|
||||
pass
|
||||
|
||||
def test_function(testvar: int) -> int:
|
||||
""" A test function that return testvar+1 and print "Hello world !"
|
||||
|
||||
Proin eget sapien eget ipsum efficitur mollis nec ac nibh.
|
||||
|
||||
Note:
|
||||
Morbi id lectus maximus, condimentum nunc eget, porta felis. In tristique velit tortor.
|
||||
|
||||
Args:
|
||||
testvar: any integer
|
||||
|
||||
Returns:
|
||||
testvar+1
|
||||
"""
|
||||
print("Hello world !")
|
||||
return testvar+1
|
||||
@@ -5,3 +5,16 @@
|
||||
#
|
||||
# You should have received a copy of the license along with this
|
||||
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
|
||||
|
||||
"""
|
||||
tools classes / functions
|
||||
"""
|
||||
|
||||
|
||||
def urljoin(*args: str) -> str:
|
||||
"""
|
||||
Joins given arguments into an url. Trailing but not leading slashes are
|
||||
stripped for each argument.
|
||||
"""
|
||||
|
||||
return "/".join(map(lambda x: str(x).rstrip("/"), args))
|
||||
1
test/test_data_origin/SAVE_FILE.txt
Normal file
1
test/test_data_origin/SAVE_FILE.txt
Normal file
@@ -0,0 +1 @@
|
||||
SAVED_VALUE
|
||||
460
test/test_datasync.py
Normal file
460
test/test_datasync.py
Normal file
@@ -0,0 +1,460 @@
|
||||
# dabdatasync (c) by chacha
|
||||
#
|
||||
# dabdatasync is licensed under a
|
||||
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
|
||||
#
|
||||
# You should have received a copy of the license along with this
|
||||
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
|
||||
|
||||
import unittest
|
||||
from os import chdir, path as os_path
|
||||
from pathlib import Path
|
||||
import pprint
|
||||
from io import StringIO
|
||||
from contextlib import redirect_stdout, redirect_stderr
|
||||
import shutil
|
||||
from contexttimer import Timer
|
||||
|
||||
print(__name__)
|
||||
print(__package__)
|
||||
|
||||
from src import dabdatasync
|
||||
from src.dabdatasync.__main__ import fct_main
|
||||
|
||||
testdir_path = Path(__file__).parent.resolve()
|
||||
chdir(testdir_path.parent.resolve())
|
||||
|
||||
|
||||
class TestDabDataSync(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
chdir(testdir_path.parent.resolve())
|
||||
shutil.rmtree(testdir_path / "test_data", ignore_errors=True)
|
||||
shutil.rmtree(testdir_path / "test_data2", ignore_errors=True)
|
||||
shutil.copytree(testdir_path / "test_data_origin", testdir_path / "test_data")
|
||||
shutil.copytree(testdir_path / "test_data_origin", testdir_path / "test_data2")
|
||||
|
||||
def tearDown(self) -> None:
|
||||
shutil.rmtree(testdir_path / "test_data", ignore_errors=True)
|
||||
shutil.rmtree(testdir_path / "test_data2", ignore_errors=True)
|
||||
|
||||
def test_version(self):
|
||||
self.assertNotEqual(dabdatasync.__version__, "?.?.?")
|
||||
|
||||
def test_cli_help(self):
|
||||
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
|
||||
with self.assertRaises(SystemExit):
|
||||
fct_main(["-h"])
|
||||
print(capted_stdout.getvalue())
|
||||
print(capted_stderr.getvalue())
|
||||
|
||||
def test_cli_verbosity(self):
|
||||
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
|
||||
fct_main(["WipeRemoteData"])
|
||||
|
||||
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
|
||||
fct_main(["PullData"])
|
||||
self.assertEqual(capted_stdout.getvalue(), "")
|
||||
self.assertEqual(capted_stderr.getvalue(), "")
|
||||
|
||||
fct_main(["WipeLocalData"])
|
||||
|
||||
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
|
||||
fct_main(["-v", "PullData"])
|
||||
self.assertTrue("WARNING" in capted_stdout.getvalue())
|
||||
self.assertFalse("INFO" in capted_stdout.getvalue())
|
||||
self.assertEqual(capted_stderr.getvalue(), "")
|
||||
|
||||
fct_main(["WipeLocalData"])
|
||||
|
||||
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
|
||||
fct_main(["-vv", "PullData"])
|
||||
self.assertTrue("INFO" in capted_stdout.getvalue())
|
||||
self.assertTrue("WARNING" in capted_stdout.getvalue())
|
||||
self.assertEqual(capted_stderr.getvalue(), "")
|
||||
|
||||
def test_cli_GetServices(self):
|
||||
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
|
||||
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
|
||||
fct_main(["GetServices"])
|
||||
print(capted_stdout.getvalue())
|
||||
print(capted_stderr.getvalue())
|
||||
self.assertTrue("Nextcloud" in capted_stdout.getvalue())
|
||||
self.assertEqual(capted_stderr.getvalue(), "")
|
||||
|
||||
def test_cli_GetServices_noservice(self):
|
||||
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json"
|
||||
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
|
||||
with self.assertRaises(dabdatasync.DataSyncException_NoValidServiceFound):
|
||||
fct_main(["GetServices"])
|
||||
self.assertEqual(capted_stdout.getvalue(), "")
|
||||
self.assertEqual(capted_stderr.getvalue(), "")
|
||||
|
||||
def test_cli_GetServices_invalid(self):
|
||||
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_invalid.json"
|
||||
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
|
||||
with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest):
|
||||
fct_main(["GetServices"])
|
||||
self.assertEqual(capted_stdout.getvalue(), "")
|
||||
self.assertEqual(capted_stderr.getvalue(), "")
|
||||
|
||||
def test_cli_GetServices_nextcloud_invalid(self):
|
||||
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_invalid.json"
|
||||
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
|
||||
with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest):
|
||||
fct_main(["GetServices"])
|
||||
self.assertEqual(capted_stdout.getvalue(), "")
|
||||
self.assertEqual(capted_stderr.getvalue(), "")
|
||||
|
||||
def test_cli_WipeLocalData(self):
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
|
||||
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
|
||||
|
||||
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
|
||||
fct_main(["WipeLocalData"])
|
||||
|
||||
self.assertEqual(capted_stdout.getvalue(), "")
|
||||
self.assertEqual(capted_stderr.getvalue(), "")
|
||||
|
||||
self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
|
||||
self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
|
||||
|
||||
def test_cli_simple(self):
|
||||
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
|
||||
fct_main(["PushData"])
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE2")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE2")
|
||||
|
||||
fct_main(["PullData"])
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE3")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE32")
|
||||
|
||||
fct_main(["PushData"])
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
|
||||
fct_main(["PullData"])
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
|
||||
fct_main(["WipeRemoteData"])
|
||||
fct_main(["PullData"])
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
|
||||
fct_main(["PushData"])
|
||||
fct_main(["WipeLocalData"])
|
||||
|
||||
self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
|
||||
self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
|
||||
|
||||
fct_main(["PullData"])
|
||||
|
||||
self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
|
||||
self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
|
||||
def test_defect_cli_select_wrong_service(self):
|
||||
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
|
||||
with self.assertRaises(dabdatasync.DataSyncException_ServiceNotFound):
|
||||
fct_main(["PullData", "--service", "WRONGSERVICE"])
|
||||
|
||||
def test_cli_select_service(self):
|
||||
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
|
||||
fct_main(["PushData", "--service", "Nextcloud"])
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE2")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE2")
|
||||
|
||||
fct_main(["PullData", "--service", "Nextcloud"])
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE3")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE32")
|
||||
|
||||
fct_main(["PushData", "--service", "Nextcloud"])
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
|
||||
fct_main(["PullData", "--service", "Nextcloud"])
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
|
||||
fct_main(["WipeRemoteData", "--service", "Nextcloud"])
|
||||
fct_main(["PullData", "--service", "Nextcloud"])
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
|
||||
fct_main(["PushData", "--service", "Nextcloud"])
|
||||
fct_main(["WipeLocalData"])
|
||||
|
||||
self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
|
||||
self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
|
||||
|
||||
fct_main(["PullData", "--service", "Nextcloud"])
|
||||
|
||||
self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
|
||||
self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
|
||||
def load_nextcloud_gen(self, compressor):
|
||||
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
|
||||
datasync = dabdatasync.DataSync_Factory.get_DataSync()
|
||||
datasync[0].set_compressor(compressor)
|
||||
self.assertIsInstance(datasync, list)
|
||||
self.assertEqual(len(datasync), 1)
|
||||
|
||||
self.assertIsInstance(datasync[0], dabdatasync.A_DataSync)
|
||||
self.assertIsInstance(datasync[0], dabdatasync.C_DataSync_NextCloud)
|
||||
|
||||
self.assertEqual(len(datasync[0].get_datasync_records()), 2)
|
||||
self.assertEqual(datasync[0].get_datasync_records()[0].name, "SOTF_map")
|
||||
self.assertEqual(datasync[0].get_datasync_records()[1].name, "SOTF_map2")
|
||||
self.assertEqual(datasync[0].get_datasync_records()[0].rec_type, "fs")
|
||||
self.assertEqual(datasync[0].get_datasync_records()[1].rec_type, "fs")
|
||||
self.assertEqual(datasync[0].get_datasync_records()[0].value, "test/test_data")
|
||||
self.assertEqual(datasync[0].get_datasync_records()[1].value, "test/test_data2/SAVE_FILE.txt")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
|
||||
datasync[0].push_data()
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE2")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE2")
|
||||
|
||||
datasync[0].pull_data()
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "SAVED_VALUE")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE3")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE32")
|
||||
|
||||
datasync[0].push_data()
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
|
||||
datasync[0].pull_data()
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
|
||||
testfile.write("MODIFIED_VALUE")
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
|
||||
datasync[0].wipe_remote_data()
|
||||
datasync[0].pull_data()
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
|
||||
datasync[0].push_data()
|
||||
datasync[0].wipe_local_data()
|
||||
|
||||
self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
|
||||
self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
|
||||
|
||||
datasync[0].pull_data()
|
||||
|
||||
self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
|
||||
self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
|
||||
|
||||
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
|
||||
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
|
||||
|
||||
def test_load_nextcloud__tar_gz(self):
|
||||
with Timer() as t:
|
||||
self.load_nextcloud_gen("tar_gz")
|
||||
print(t.elapsed)
|
||||
|
||||
def test_load_nextcloud__tar_lz4(self):
|
||||
with Timer() as t:
|
||||
self.load_nextcloud_gen("tar_lz4")
|
||||
print(t.elapsed)
|
||||
|
||||
def test_load_empty(self):
|
||||
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_empty.json"
|
||||
datasync = dabdatasync.DataSync_Factory.get_DataSync()
|
||||
self.assertIsInstance(datasync, list)
|
||||
self.assertEqual(len(datasync), 0)
|
||||
|
||||
def test_load_nextcloud_disabled(self):
|
||||
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json"
|
||||
datasync = dabdatasync.DataSync_Factory.get_DataSync()
|
||||
self.assertIsInstance(datasync, list)
|
||||
self.assertEqual(len(datasync), 0)
|
||||
|
||||
def test_defect_load_invalid(self):
|
||||
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_invalid.json"
|
||||
with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest):
|
||||
dabdatasync.DataSync_Factory.get_DataSync()
|
||||
|
||||
def test_defect_load_nextcloud_invalid(self):
|
||||
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_invalid.json"
|
||||
with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest):
|
||||
dabdatasync.DataSync_Factory.get_DataSync()
|
||||
3
test/test_manifest_empty.json
Normal file
3
test/test_manifest_empty.json
Normal file
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"APP_ID": "2a13dff2-1298-11ee-be56-0242ac120002",
|
||||
"Args": {}}
|
||||
2
test/test_manifest_invalid.json
Normal file
2
test/test_manifest_invalid.json
Normal file
@@ -0,0 +1,2 @@
|
||||
{
|
||||
"APP_ID": "2a13dff2-1298-11ee-be56-0242ac120002"}
|
||||
541
test/test_manifest_nextcloud.json
Normal file
541
test/test_manifest_nextcloud.json
Normal file
@@ -0,0 +1,541 @@
|
||||
{
|
||||
"APP_NAME": "CHACHA-SOTF",
|
||||
"APP_DESC": "ChaCha SonOfTheForset Dedicated Server",
|
||||
"APP_ID": "2a13dff2-1298-11ee-be56-0242ac120002",
|
||||
"REFERENCE_CONFIG_ID": "cf698a62-120a-11ee-be56-0242ac120002",
|
||||
"VIRTUAL": false,
|
||||
"NOBOOTSTRAP": false,
|
||||
"NOFINALIZE": false,
|
||||
"NOSTART": false,
|
||||
"creation_date": "2024-03-24T19:07:48.862542",
|
||||
"Params": {
|
||||
"ROOTFS_SIZE_G": {
|
||||
"value": 20,
|
||||
"modified": true
|
||||
},
|
||||
"AR_TAGS": {
|
||||
"value": [
|
||||
{
|
||||
"value": "pydabfactory"
|
||||
},
|
||||
{
|
||||
"value": "debianbase"
|
||||
},
|
||||
{
|
||||
"value": "pydabfactory"
|
||||
},
|
||||
{
|
||||
"value": "chacha"
|
||||
},
|
||||
{
|
||||
"value": "pydabfactory"
|
||||
},
|
||||
{
|
||||
"value": "games"
|
||||
},
|
||||
{
|
||||
"value": "pydabfactory"
|
||||
},
|
||||
{
|
||||
"value": "sotf"
|
||||
}
|
||||
],
|
||||
"modified": true
|
||||
},
|
||||
"FEATURE_NESTING": {
|
||||
"value": false,
|
||||
"modified": false
|
||||
},
|
||||
"MAIN_MACADDR": {
|
||||
"value": "D2:A9:59:72:C4:B4",
|
||||
"modified": true
|
||||
},
|
||||
"AUTOSTART": {
|
||||
"value": true,
|
||||
"modified": true
|
||||
},
|
||||
"CPU_UNIT": {
|
||||
"value": 1536,
|
||||
"modified": true
|
||||
},
|
||||
"PRIVILEGIED": {
|
||||
"value": false,
|
||||
"modified": false
|
||||
},
|
||||
"DEST_NODE": {
|
||||
"value": "hypervisor2",
|
||||
"modified": true
|
||||
},
|
||||
"SWAP_M": {
|
||||
"value": 2048,
|
||||
"modified": true
|
||||
},
|
||||
"AR_CFG_OPT": {
|
||||
"value": [],
|
||||
"modified": false
|
||||
},
|
||||
"RUNNING_STORAGE": {
|
||||
"value": "VMStore2",
|
||||
"modified": true
|
||||
},
|
||||
"FEATURE_FUSE": {
|
||||
"value": false,
|
||||
"modified": false
|
||||
},
|
||||
"FEATURE_MKNODE": {
|
||||
"value": false,
|
||||
"modified": false
|
||||
},
|
||||
"NETWORK_BRIDGE": {
|
||||
"value": "vmbr1",
|
||||
"modified": true
|
||||
},
|
||||
"CPU_COUNT": {
|
||||
"value": 2,
|
||||
"modified": true
|
||||
},
|
||||
"TEMPLATE_STORAGE": {
|
||||
"value": "live-storage-h2",
|
||||
"modified": true
|
||||
},
|
||||
"RAM_M": {
|
||||
"value": 12000,
|
||||
"modified": true
|
||||
}
|
||||
},
|
||||
"Args": {
|
||||
"RootPasswd": {
|
||||
"type": "ROOT_PASSWD",
|
||||
"value": "######"
|
||||
},
|
||||
"DEBUG_TOOLS": {
|
||||
"type": "BOOL",
|
||||
"value": false
|
||||
},
|
||||
"SSH_PORT": {
|
||||
"type": "LISTEN_PORT",
|
||||
"value": {
|
||||
"port_type": {
|
||||
"type": "STRING",
|
||||
"value": "tcp"
|
||||
},
|
||||
"value": {
|
||||
"type": "UINT",
|
||||
"value": 50020
|
||||
}
|
||||
}
|
||||
},
|
||||
"FirstBootFilePath": {
|
||||
"type": "STRING",
|
||||
"value": "/firstboot.sh"
|
||||
},
|
||||
"CustomFirstBootFilePath": {
|
||||
"type": "STRING",
|
||||
"value": "/customfirstboot.sh"
|
||||
},
|
||||
"ACLSavePath": {
|
||||
"type": "STRING",
|
||||
"value": "/saved.acl"
|
||||
},
|
||||
"locale": {
|
||||
"type": "STRING",
|
||||
"value": "fr_FR.UTF-8"
|
||||
},
|
||||
"locale_gen": {
|
||||
"type": "STRING",
|
||||
"value": "fr_FR.UTF-8 UTF-8"
|
||||
},
|
||||
"timezone": {
|
||||
"type": "STRING",
|
||||
"value": "Europe/Paris"
|
||||
},
|
||||
"EnableLog2Ram": {
|
||||
"type": "BOOL",
|
||||
"value": false
|
||||
},
|
||||
"EnableAutoReboot": {
|
||||
"type": "BOOL",
|
||||
"value": true
|
||||
},
|
||||
"EnableJava": {
|
||||
"type": "BOOL",
|
||||
"value": false
|
||||
},
|
||||
"ForcePython39": {
|
||||
"type": "BOOL",
|
||||
"value": false
|
||||
},
|
||||
"EXEC_USER": {
|
||||
"type": "STRING",
|
||||
"value": "GenUser"
|
||||
},
|
||||
"EXEC_USER_ID": {
|
||||
"type": "UINT",
|
||||
"value": 1000
|
||||
},
|
||||
"EXEC_USER_PASSWD": {
|
||||
"type": "PASSWD",
|
||||
"value": "######"
|
||||
},
|
||||
"DEFAULT_CHACHA_GIT_BRANCH": {
|
||||
"type": "STRING",
|
||||
"value": "production"
|
||||
},
|
||||
"SECTION": {
|
||||
"type": "STRING",
|
||||
"value": "games"
|
||||
},
|
||||
"SystemDJournalMaxSize": {
|
||||
"type": "STRING",
|
||||
"value": "40M"
|
||||
},
|
||||
"FSSYNC_PRESYNC_CMD": {
|
||||
"type": "STRING",
|
||||
"value": ""
|
||||
},
|
||||
"FSSYNC_POSTSYNC_CMD": {
|
||||
"type": "STRING",
|
||||
"value": ""
|
||||
},
|
||||
"FSSYNC_INITIAL_FETCH": {
|
||||
"type": "BOOL",
|
||||
"value": true
|
||||
},
|
||||
"FSSYNC_RECORD": {
|
||||
"type": "T_ARRAY_FSSYNC_RECORD",
|
||||
"value": [
|
||||
{
|
||||
"type": "T_FSSYNC_RECORD",
|
||||
"value": {
|
||||
"name": {
|
||||
"type": "SIMPLE_STRING",
|
||||
"value": "SOTF_map"
|
||||
},
|
||||
"type": {
|
||||
"type": "SIMPLE_STRING",
|
||||
"value": "fs"
|
||||
},
|
||||
"value": {
|
||||
"type": "STRING",
|
||||
"value": "test/test_data"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "T_FSSYNC_RECORD",
|
||||
"value": {
|
||||
"name": {
|
||||
"type": "SIMPLE_STRING",
|
||||
"value": "SOTF_map2"
|
||||
},
|
||||
"type": {
|
||||
"type": "SIMPLE_STRING",
|
||||
"value": "fs"
|
||||
},
|
||||
"value": {
|
||||
"type": "STRING",
|
||||
"value": "test/test_data2/SAVE_FILE.txt"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"FSSync_NextCloud_Enabled": {
|
||||
"type": "BOOL",
|
||||
"value": true
|
||||
},
|
||||
"FSSync_NextCloud_Address": {
|
||||
"type": "URL",
|
||||
"value": "https://chacha.ddns.net/nextcloud"
|
||||
},
|
||||
"FSSync_NextCloud_User": {
|
||||
"type": "STRING",
|
||||
"value": "chacha-bot"
|
||||
},
|
||||
"FSSync_NextCloud_Password": {
|
||||
"type": "STRING",
|
||||
"value": "F3P8m-nQHik-NSmb2-mnFEF-s85RE"
|
||||
},
|
||||
"FSSync_NextCloud_Path": {
|
||||
"type": "STRING",
|
||||
"value": "pydabfactory-test"
|
||||
},
|
||||
"GAME_MNG_PWD": {
|
||||
"type": "STRING",
|
||||
"value": "cfographut"
|
||||
},
|
||||
"GAME_MNG_DEFAULT_MODE": {
|
||||
"type": "STRING",
|
||||
"value": "BLACKLIST"
|
||||
},
|
||||
"GAME_MNG_LISTENING_PORT": {
|
||||
"type": "UINT",
|
||||
"value": 50000
|
||||
},
|
||||
"GAME_MNG_RESTART_DELAY": {
|
||||
"type": "UINT",
|
||||
"value": 30
|
||||
},
|
||||
"GAMETYPENAME": {
|
||||
"type": "STRING",
|
||||
"value": "sotf"
|
||||
},
|
||||
"WINE_ARCH": {
|
||||
"type": "STRING",
|
||||
"value": "win64"
|
||||
},
|
||||
"WINE_NAME": {
|
||||
"type": "STRING",
|
||||
"value": "wine-9.4-staging-tkg-amd64"
|
||||
},
|
||||
"WINEPREFIX": {
|
||||
"type": "STRING",
|
||||
"value": "DEFAULT"
|
||||
},
|
||||
"WINE_URL": {
|
||||
"type": "URL",
|
||||
"value": "https://github.com/Kron4ek/Wine-Builds/releases/download/9.4/wine-9.4-staging-tkg-amd64.tar.xz"
|
||||
},
|
||||
"ENABLE_WINE_ESYNC": {
|
||||
"type": "BOOL",
|
||||
"value": false
|
||||
},
|
||||
"ENABLE_WINE_FSYNC": {
|
||||
"type": "BOOL",
|
||||
"value": true
|
||||
},
|
||||
"ENABLE_WINE_PRELOADER": {
|
||||
"type": "BOOL",
|
||||
"value": false
|
||||
},
|
||||
"MEMLIMITHIGH": {
|
||||
"type": "SYSTEMD_RAM",
|
||||
"value": "10G"
|
||||
},
|
||||
"MEMLIMITMAX": {
|
||||
"type": "SYSTEMD_RAM",
|
||||
"value": "11G"
|
||||
},
|
||||
"CPUQUOTA": {
|
||||
"type": "SYSTEMD_CPUQUOTA",
|
||||
"value": 98
|
||||
},
|
||||
"STEAM_APP_ID": {
|
||||
"type": "UINT",
|
||||
"value": 2465200
|
||||
},
|
||||
"STEAM_LOGIN": {
|
||||
"type": "STRING",
|
||||
"value": "cclecle"
|
||||
},
|
||||
"STEAM_PWD": {
|
||||
"type": "PASSWD",
|
||||
"value": "######"
|
||||
},
|
||||
"HostName": {
|
||||
"type": "STRING",
|
||||
"value": "ChaCha - Sons Of The Forest Server"
|
||||
},
|
||||
"MaxPlayers": {
|
||||
"type": "UINT",
|
||||
"value": 8
|
||||
},
|
||||
"GamePassword": {
|
||||
"type": "STRING",
|
||||
"value": "!bourges2023"
|
||||
},
|
||||
"GamePort": {
|
||||
"type": "EXT_LISTEN_PORT",
|
||||
"value": {
|
||||
"port_type": {
|
||||
"type": "STRING",
|
||||
"value": "udp"
|
||||
},
|
||||
"value": {
|
||||
"type": "UINT",
|
||||
"value": 8766
|
||||
}
|
||||
}
|
||||
},
|
||||
"QueryPort": {
|
||||
"type": "EXT_LISTEN_PORT",
|
||||
"value": {
|
||||
"port_type": {
|
||||
"type": "STRING",
|
||||
"value": "udp"
|
||||
},
|
||||
"value": {
|
||||
"type": "UINT",
|
||||
"value": 27018
|
||||
}
|
||||
}
|
||||
},
|
||||
"BlobSyncPort": {
|
||||
"type": "EXT_LISTEN_PORT",
|
||||
"value": {
|
||||
"port_type": {
|
||||
"type": "STRING",
|
||||
"value": "udp"
|
||||
},
|
||||
"value": {
|
||||
"type": "UINT",
|
||||
"value": 9700
|
||||
}
|
||||
}
|
||||
},
|
||||
"LanOnly": {
|
||||
"type": "BOOL",
|
||||
"value": false
|
||||
},
|
||||
"SaveSlot": {
|
||||
"type": "UINT",
|
||||
"value": 1
|
||||
},
|
||||
"SaveInterval": {
|
||||
"type": "UINT",
|
||||
"value": 600
|
||||
},
|
||||
"TreeRegrowth": {
|
||||
"type": "BOOL",
|
||||
"value": true
|
||||
},
|
||||
"StructureDamage": {
|
||||
"type": "BOOL",
|
||||
"value": false
|
||||
},
|
||||
"EnemySpawn": {
|
||||
"type": "BOOL",
|
||||
"value": true
|
||||
},
|
||||
"SkipNetworkAccessibilityTest": {
|
||||
"type": "BOOL",
|
||||
"value": true
|
||||
},
|
||||
"EnemyHealth": {
|
||||
"type": "STRING",
|
||||
"value": "Normal"
|
||||
},
|
||||
"EnemyDamage": {
|
||||
"type": "STRING",
|
||||
"value": "Normal"
|
||||
},
|
||||
"EnemyArmour": {
|
||||
"type": "STRING",
|
||||
"value": "Normal"
|
||||
},
|
||||
"EnemyAggression": {
|
||||
"type": "STRING",
|
||||
"value": "Normal"
|
||||
},
|
||||
"AnimalSpawnRate": {
|
||||
"type": "STRING",
|
||||
"value": "Normal"
|
||||
},
|
||||
"StartingSeason": {
|
||||
"type": "STRING",
|
||||
"value": "Summer"
|
||||
},
|
||||
"SeasonLength": {
|
||||
"type": "STRING",
|
||||
"value": "Default"
|
||||
},
|
||||
"DayLength": {
|
||||
"type": "STRING",
|
||||
"value": "Default"
|
||||
},
|
||||
"PrecipitationFrequency": {
|
||||
"type": "STRING",
|
||||
"value": "Default"
|
||||
},
|
||||
"ConsumableEffects": {
|
||||
"type": "STRING",
|
||||
"value": "Normal"
|
||||
},
|
||||
"PlayerStatsDamage": {
|
||||
"type": "STRING",
|
||||
"value": "Off"
|
||||
},
|
||||
"ColdPenalties": {
|
||||
"type": "STRING",
|
||||
"value": "Off"
|
||||
},
|
||||
"ReducedFoodInContainers": {
|
||||
"type": "BOOL",
|
||||
"value": false
|
||||
},
|
||||
"SingleUseContainers": {
|
||||
"type": "BOOL",
|
||||
"value": false
|
||||
},
|
||||
"DAB_LISTEN_PORTS": {
|
||||
"type": "AR_LISTEN_PORT",
|
||||
"value": [
|
||||
{
|
||||
"type": "LISTEN_PORT",
|
||||
"value": {
|
||||
"port_type": {
|
||||
"type": "STRING",
|
||||
"value": "tcp"
|
||||
},
|
||||
"value": {
|
||||
"type": "UINT",
|
||||
"value": 50020
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"DAB_EXT_LISTEN_PORTS": {
|
||||
"type": "AR_EXT_LISTEN_PORT",
|
||||
"value": [
|
||||
{
|
||||
"type": "EXT_LISTEN_PORT",
|
||||
"value": {
|
||||
"port_type": {
|
||||
"type": "STRING",
|
||||
"value": "udp"
|
||||
},
|
||||
"value": {
|
||||
"type": "UINT",
|
||||
"value": 8766
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "EXT_LISTEN_PORT",
|
||||
"value": {
|
||||
"port_type": {
|
||||
"type": "STRING",
|
||||
"value": "udp"
|
||||
},
|
||||
"value": {
|
||||
"type": "UINT",
|
||||
"value": 27018
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "EXT_LISTEN_PORT",
|
||||
"value": {
|
||||
"port_type": {
|
||||
"type": "STRING",
|
||||
"value": "udp"
|
||||
},
|
||||
"value": {
|
||||
"type": "UINT",
|
||||
"value": 9700
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"DAB_MOUNT_POINTS": {
|
||||
"type": "AR_MOUNT_POINT",
|
||||
"value": []
|
||||
},
|
||||
"DAB_SHARED_MOUNT_POINTS": {
|
||||
"type": "AR_MOUNT_POINT",
|
||||
"value": []
|
||||
}
|
||||
}
|
||||
}
|
||||
9
test/test_manifest_nextcloud_disabled.json
Normal file
9
test/test_manifest_nextcloud_disabled.json
Normal file
@@ -0,0 +1,9 @@
|
||||
{
|
||||
"APP_ID": "2a13dff2-1298-11ee-be56-0242ac120002",
|
||||
"Args": {
|
||||
"FSSync_NextCloud_Enabled": {"type": "BOOL", "value": false},
|
||||
"FSSync_NextCloud_Address": {"type": "URL", "value": "https://chacha.ddns.net/nextcloud"},
|
||||
"FSSync_NextCloud_User": {"type": "STRING", "value": "chacha-bot"},
|
||||
"FSSync_NextCloud_Password": {"type": "STRING", "value": "F3P8m-nQHik-NSmb2-mnFEF-s85RE"},
|
||||
"FSSync_NextCloud_Path": {"type": "STRING", "value": "pydabfactory"}
|
||||
}}
|
||||
5
test/test_manifest_nextcloud_invalid.json
Normal file
5
test/test_manifest_nextcloud_invalid.json
Normal file
@@ -0,0 +1,5 @@
|
||||
{
|
||||
"APP_ID": "2a13dff2-1298-11ee-be56-0242ac120002",
|
||||
"Args": {
|
||||
"FSSync_NextCloud_Enabled": {"type": "BOOL", "value": true}
|
||||
}}
|
||||
@@ -1,35 +0,0 @@
|
||||
# dabdatasync (c) by chacha
|
||||
#
|
||||
# dabdatasync is licensed under a
|
||||
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
|
||||
#
|
||||
# You should have received a copy of the license along with this
|
||||
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
|
||||
|
||||
import unittest
|
||||
from os import chdir
|
||||
from io import StringIO
|
||||
from contextlib import redirect_stdout,redirect_stderr
|
||||
from pathlib import Path
|
||||
|
||||
print(__name__)
|
||||
print(__package__)
|
||||
|
||||
from src import dabdatasync
|
||||
|
||||
testdir_path = Path(__file__).parent.resolve()
|
||||
chdir(testdir_path.parent.resolve())
|
||||
|
||||
class Testtest_module(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
chdir(testdir_path.parent.resolve())
|
||||
|
||||
def test_version(self):
|
||||
self.assertNotEqual(dabdatasync.__version__,"?.?.?")
|
||||
|
||||
def test_test_module(self):
|
||||
|
||||
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
|
||||
self.assertEqual(dabdatasync.test_function(41),42)
|
||||
self.assertEqual(len(capted_stderr.getvalue()),0)
|
||||
self.assertEqual(capted_stdout.getvalue().strip(),"Hello world !")
|
||||
Reference in New Issue
Block a user