25 Commits
0.0.1 ... dev

Author SHA1 Message Date
507abf1c76 Update Jenkinsfile 2024-10-12 16:39:27 +02:00
cclecle
054d9329fd unittest: test remote empty
chore: remove useless config keys
2024-04-12 22:55:08 +01:00
cclecle
e95b59ae3d fix quality report 2024-03-31 02:22:42 +01:00
cclecle
7e6aacc82f fix: properly remove credentials after test 2024-03-31 02:19:47 +01:00
cclecle
cb4872d057 chore: apply black formater 2024-03-31 02:09:41 +01:00
cclecle
f9c03a83f3 chore: fix export and nextcloud pwd script
test: fix typing
2024-03-31 02:04:08 +01:00
cclecle
f4ac5360b7 fix nextcloud password setting
add missing function for help
2024-03-31 00:45:55 +00:00
cclecle
9581772c9e chore: extract nextcloud password from source files 2024-03-31 00:36:34 +00:00
cclecle
da10ea2c19 feat: improve help for subparsers 2024-03-31 00:36:10 +00:00
cclecle
842b8b6a5c fix main 2024-03-29 12:15:40 +00:00
cclecle
5975be2ca5 chore: update project keywords 2024-03-29 03:34:56 +00:00
cclecle
2d753205ca feat: implement verbosity 2024-03-29 03:25:18 +00:00
cclecle
872dab1ed9 chore: optimize imports
fix: remove useless printf
2024-03-29 03:11:08 +00:00
cclecle
96d8f8fdd7 fix webdavclient3 version dep 2024-03-29 02:55:08 +00:00
cclecle
0ff9f442d2 chore: remove useless data
feature: add lz4 compressor
refactoring: make compressor configurable
feature: add CLI interface and configure int in .toml
feature: add logging using loguru
2024-03-29 02:51:49 +00:00
cclecle
9f4f2bbba1 feat: add main / argparse (tap)
feat: add logging system
refactoring: split project into multiples files
2024-03-28 17:59:10 +00:00
cclecle
5fca93e4a8 chore: rework suffix (now in compressor class only) 2024-03-28 02:16:14 +00:00
cclecle
79518946ac fix quality warning 2024-03-28 02:06:01 +00:00
cclecle
6aea5311bb feat: fully implement read for dir and files
test: implement full unitest scenario
2024-03-28 01:55:17 +00:00
cclecle
b4292a6f57 feat: start implement read
chore: improve type check
2024-03-28 00:51:18 +00:00
cclecle
38031deba2 fix: ignore webdav3.client typing
fix: ignore coverage for 'if TYPE_CHECKING:' directives
2024-03-28 00:02:18 +00:00
cclecle
abe800814d fix quality and typing 2024-03-27 02:37:47 +00:00
cclecle
81ba24d6f3 improve quality 2024-03-27 02:17:08 +00:00
cclecle
b56a61b796 implement Nextcloud write 2024-03-27 01:52:19 +00:00
cclecle
f1a748c09f import code 2024-03-23 11:56:52 +00:00
24 changed files with 1908 additions and 92 deletions

3
.gitignore vendored
View File

@@ -43,4 +43,5 @@ helpers-results
/.mypy_cache/
.coverage
.mypy_cache
test/tmp
test/tmp
test/nextcloud_pwd.mdp

View File

@@ -1,2 +1,3 @@
eclipse.preferences.version=1
encoding//src/dabdatasync/__main__.py=utf-8
encoding/<project>=UTF-8

4
Jenkinsfile vendored
View File

@@ -184,7 +184,7 @@ pipeline {
sh("virtualenv --pip=embed --setuptools=embed --wheel=embed --no-periodic-update --activators bash,python TOOLS_ENV")
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade setuptools build pip")
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade 'copier==8.*' jinja2-slug toml")
sh(". ~/BUILD_ENV/bin/activate && pip install --upgrade 'copier==9.*' jinja2-slug toml")
sh(". ~/TEST_ENV/bin/activate && pip install --upgrade pip")
@@ -546,7 +546,7 @@ pipeline {
dir("gitrepo") {
junit 'helpers-results/cl_unit_test/*.xml'
// using cobertura format (= coverage xml format)
publishCoverage adapters: [cobertura(mergeToOneReport: true, path: "helpers-results/cl_unit_test_coverage/test_coverage.xml")]
recordCoverage(tools: [[parser: 'COBERTURA', pattern: 'helpers-results/cl_unit_test_coverage/test_coverage.xml']])
publishHTML([
reportDir: "helpers-results/cl_unit_test_coverage",
reportFiles: "index.html",

View File

@@ -18,7 +18,7 @@ name = "dabdatasync"
description = "dabdatasync"
readme = "README.md"
requires-python = ">=3.11"
keywords = ["chacha","chacha","template","dabdatasync"]
keywords = ["chacha","dabdatasync","dab","debian","proxmox","pydabfactory"]
license = { file = "LICENSE.md" }
authors = [
@@ -34,7 +34,12 @@ classifiers = [
]
dependencies = [
'importlib-metadata; python_version<"3.9"',
'packaging'
'packaging',
'webdavclient3==3.14.*',
'pydantic==2.*',
'typed-argument-parser==1.*',
'loguru==0.7.*',
'lz4'
]
dynamic = ["version"]
@@ -49,9 +54,21 @@ where = ["src"]
"dabdatasync.data" = ["*.*"]
"dabdatasync" = ["py.typed"]
# [[tool.mypy.overrides]]
# module = ""
# ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "webdav3.client"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "webdav3.exceptions"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "lz4"
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "lz4.frame"
ignore_missing_imports = true
[tool.coverage.run]
cover_pylib = false
@@ -63,19 +80,24 @@ concurrency = [
'thread'
]
[tool.coverage.report]
exclude_also = [
"if TYPE_CHECKING:",
]
[project.urls]
Homepage = "https://chacha.ddns.net/gitea/chacha/dabdatasync"
Documentation = "https://chacha.ddns.net/mkdocs-web/chacha/dabdatasync/master/latest/"
Tracker = "https://chacha.ddns.net/gitea/chacha/dabdatasync/issues"
[project.optional-dependencies]
test = ["chacha_cicd_helper"]
test = ["chacha_cicd_helper","contexttimer"]
coverage-check = ["chacha_cicd_helper"]
complexity-check = ["chacha_cicd_helper"]
quality-check = ["chacha_cicd_helper"]
type-check = ["chacha_cicd_helper"]
doc-gen = ["chacha_cicd_helper"]
# [project.scripts]
# my-script = "my_package.module:function"
[project.scripts]
dabdatasync = "dabdatasync.__main__:CLI"

View File

@@ -11,4 +11,18 @@ Main module __init__ file.
"""
from .__metadata__ import __version__, __Summuary__, __Name__
from .test_module import test_function
from .datasync import A_DataSync, DataSync_Factory
from .datasync_nextcloud import C_DataSync_NextCloud
from .exceptions import (
DataSyncException,
DataSyncException_InvalidManifest,
DataSyncException_NoConcreteRecordClassFound,
DataSyncException_RemoteDataNotFound,
DataSyncException_NoValidServiceFound,
DataSyncException_ServiceNotFound,
DataSyncException_TooManyServiceFound,
DataSyncException_CompressorNotFound,
DataSyncException_TooManyCompressorFound,
)
from .compressors import DataSync_Compressors

180
src/dabdatasync/__main__.py Normal file
View File

@@ -0,0 +1,180 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""CLI interface module"""
from __future__ import annotations
from typing import cast, Union, Optional
import sys
from tap import Tap
from loguru import logger
from . import __Summuary__, __Name__, __version__
from . import datasync
from . import exceptions
from . import compressors
class dabdatasync_args_GetServices(Tap):
"""GetServices CLI arg subparser"""
class dabdatasync_args_WipeLocalData(Tap):
"""WipeLocalData CLI arg subparser"""
class dabdatasync_args_service_abstract(Tap):
"""service abstract CLI arg subparser"""
service: Optional[str] = None
def configure(self) -> None:
self.add_argument(
"--service",
help=f"specify the service to use (availables: {datasync.DataSync_Factory.get_list()}, first found in priority if not set)",
)
class dabdatasync_args_service_compress_abstract(dabdatasync_args_service_abstract):
"""service compressor abstract CLI arg subparser"""
compressor: Optional[str] = None
def configure(self) -> None:
super().configure()
self.add_argument(
"--compressor",
help=f"specify the compressor method (availables: {compressors.DataSync_Compressors.get_list()}, .tar.gz if not set)",
)
class dabdatasync_args_PullData(dabdatasync_args_service_compress_abstract):
"""PullData CLI arg subparser"""
class dabdatasync_args_PushData(dabdatasync_args_service_compress_abstract):
"""PushData CLI arg subparser"""
class dabdatasync_args_WipeRemoteData(dabdatasync_args_service_abstract):
"""WipeRemoteData CLI arg subparser"""
class dabdatasync_args(Tap):
"""Main CLI arg parser"""
verbosity: int = 0
help: bool = False
version: bool = False
def configure(self) -> None:
self.add_argument("--version", action="store_true", help="show version string")
self.add_argument("-h", "--help", action="store_true", help="full help")
self.add_argument("-v", "--verbosity", action="count", help="increase output verbosity")
self.add_subparsers(dest="command", help="command type")
self.add_subparser("GetServices", dabdatasync_args_GetServices, help="Get registered services list")
self.add_subparser("PullData", dabdatasync_args_PullData, help="Pull data from the service")
self.add_subparser("PushData", dabdatasync_args_PushData, help="Push data to the service")
self.add_subparser("WipeLocalData", dabdatasync_args_WipeLocalData, help="Wipe local data")
self.add_subparser("WipeRemoteData", dabdatasync_args_WipeRemoteData, help="Wipe remote service data")
def process_args(self) -> None:
"""dynamically add self.command to avoid conflict with Tap/argparse while keep pylint happy"""
self.command: Union[str, None] = cast(Union[str, None], self.command) # pylint: disable=attribute-defined-outside-init
def fct_main(i_args: list[str]) -> None: # pylint: disable=too-many-branches,too-complex,too-many-statements
"""CLI main function"""
parser: dabdatasync_args = dabdatasync_args(prog=__Name__, description=__Summuary__)
args: dabdatasync_args = parser.parse_args(i_args)
if args.help or args.version:
print(f"{__Name__} version {__version__}")
if args.help:
print(parser.format_help())
for name, subparser in parser._subparsers.choices.items(): # type: ignore[union-attr] # pylint: disable=protected-access
print("=========================")
print(f"Subparser {{{name}}}")
print(subparser.format_help())
if args.help or args.version:
sys.exit(0)
logger.remove()
if args.verbosity:
if args.verbosity == 1:
logger.add(sys.stdout, level="WARNING")
elif args.verbosity == 2:
logger.add(sys.stdout, level="INFO")
else:
logger.add(sys.stdout, level="DEBUG")
else:
logger.add(sys.stdout, level="ERROR")
logger.add(sys.stderr, level="ERROR")
dabdatasync = datasync.DataSync_Factory.get_DataSync()
if len(dabdatasync) == 0:
raise exceptions.DataSyncException_NoValidServiceFound("No valid service found")
if args.command == "GetServices":
for service in dabdatasync:
print(service.service_name)
return
if args.command == "WipeLocalData":
dabdatasync[0].wipe_local_data()
return
selected_dabdatasync: datasync.A_DataSync
if args.command in ["PullData", "PushData", "WipeRemoteData"]:
requested_service = cast(dabdatasync_args_service_abstract, args).service # pylint: disable=no-member
if requested_service:
services = [_ for _ in dabdatasync if type(_).service_name == requested_service]
if len(services) == 0:
raise exceptions.DataSyncException_ServiceNotFound()
if len(services) == 1:
selected_dabdatasync = services[0]
else:
raise exceptions.DataSyncException_TooManyServiceFound()
else:
selected_dabdatasync = dabdatasync[0]
if args.command in ["PullData", "PushData"]:
compressor = cast(dabdatasync_args_service_compress_abstract, args).compressor # pylint: disable=no-member
if compressor:
selected_dabdatasync.set_compressor(compressor)
if args.command == "PullData":
selected_dabdatasync.pull_data()
return
if args.command == "PushData":
selected_dabdatasync.push_data()
return
if args.command == "WipeRemoteData":
selected_dabdatasync.wipe_remote_data()
return
raise RuntimeError("a command is required")
def CLI():
"""wrapper for .toml declared script"""
fct_main(sys.argv[1:])
if __name__ == "__main__":
CLI()

View File

@@ -15,7 +15,7 @@ import warnings
try: # pragma: no cover
__version__ = version("dabdatasync")
except PackageNotFoundError: # pragma: no cover
except PackageNotFoundError: # pragma: no cover
warnings.warn("can not read __version__, assuming local test context, setting it to ?.?.?")
__version__ = "?.?.?"

View File

@@ -0,0 +1,128 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
Compressor interface and implementation
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
import os
from io import BytesIO
from typing import TYPE_CHECKING
import tarfile
import lz4.frame
from loguru import logger
from .exceptions import DataSyncException_CompressorNotFound, DataSyncException_TooManyCompressorFound
if TYPE_CHECKING:
from typing import IO
class A_DataSync_Compressor(ABC):
"""abstract compressor class"""
suffix: str
compressor_name: str = "ABSTRACT"
@classmethod
def compress(cls, path_in: Path, file_out: IO):
"""compress method"""
logger.debug(f"compressing <{path_in}> data using <{cls.compressor_name}> to <{file_out.name}>")
cls._impl_compress(path_in, file_out)
@classmethod
@abstractmethod
def _impl_compress(cls, path_in: Path, file_out: IO):
"""compress method - virtual"""
@classmethod
def uncompress(cls, path_in: Path, path_out: Path):
"""uncompress method"""
logger.debug(f"uncompressing <{path_in}> data using <{cls.compressor_name}> to <{path_out}>")
cls._impl_uncompress(path_in, path_out)
@classmethod
@abstractmethod
def _impl_uncompress(cls, path_in: Path, path_out: Path):
"""uncompress method - virtual"""
class DataSync_Compressors:
"""compressers container/factory class"""
_availables: list[type[A_DataSync_Compressor]] = []
@classmethod
def register(cls, _cls: type[A_DataSync_Compressor]) -> type[A_DataSync_Compressor]:
"""register a new compressor"""
cls._availables.append(_cls)
return _cls
@classmethod
def get(cls, compressor_name: str) -> type[A_DataSync_Compressor]:
"""get a specific compressor"""
found = [_ for _ in cls._availables if _.compressor_name == compressor_name]
if len(found) == 0:
raise DataSyncException_CompressorNotFound()
if len(found) == 1:
return found[0]
raise DataSyncException_TooManyCompressorFound()
@classmethod
def get_list(cls) -> list[str]:
"""return the available compressor name list"""
return [_.compressor_name for _ in cls._availables]
@DataSync_Compressors.register
class DataSync_Compressor__tar_gz(A_DataSync_Compressor):
"""Concrete compressor class - .tar.gz compressor"""
suffix: str = ".tar.gz"
compressor_name: str = "tar_gz"
@classmethod
def _impl_compress(cls, path_in: Path, file_out: IO):
"""compress method - .tar.gz concrete"""
with tarfile.open(fileobj=file_out, mode="w:gz") as tar:
tar.add(path_in, arcname=os.path.basename(path_in))
@classmethod
def _impl_uncompress(cls, path_in: Path, path_out: Path):
"""uncompressing method - .tar.gz concrete"""
with tarfile.open(path_in, mode="r:gz") as tar:
tar.extractall(path_out)
@DataSync_Compressors.register
class DataSync_Compressor__tar_lz4(A_DataSync_Compressor):
"""Concrete compressor class - .tar.lz4 compressor"""
suffix: str = ".tar.lz4"
compressor_name: str = "tar_lz4"
@classmethod
def _impl_compress(cls, path_in: Path, file_out: IO):
"""compress method - .tar.lz4 concrete"""
with BytesIO() as memBuff:
with tarfile.open(fileobj=memBuff, mode="w:") as tar:
tar.add(path_in, arcname=os.path.basename(path_in))
memBuff.seek(0)
file_out.write(lz4.frame.compress(memBuff.read()))
@classmethod
def _impl_uncompress(cls, path_in: Path, path_out: Path):
"""uncompressing method - .tar.lz4 concrete"""
with open(path_in, "rb") as file_in, BytesIO() as memBuff:
memBuff.write(lz4.frame.decompress(file_in.read()))
memBuff.seek(0)
with tarfile.open(fileobj=memBuff, mode="r:") as tar:
tar.extractall(path_out)

186
src/dabdatasync/datasync.py Normal file
View File

@@ -0,0 +1,186 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
Nextcloud abstract interface
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from uuid import UUID
from pathlib import Path
import os
from tempfile import NamedTemporaryFile, TemporaryDirectory
import json
from typing import final, TYPE_CHECKING
from loguru import logger
from .records import A_DataSync_Record, DataSync_Record_Factory
from .compressors import DataSync_Compressor__tar_gz, DataSync_Compressors
from .exceptions import DataSyncException_RemoteDataNotFound
if TYPE_CHECKING:
from typing import Optional, IO, Any, Self
from .compressors import A_DataSync_Compressor
class A_DataSync(ABC):
"""Abstract DataSync class"""
service_name: str = "ABSTRACT"
priority: int = 0
@classmethod
@final
def try_get_instance(cls, manifest: dict[str, Any], cls_compressor: type[A_DataSync_Compressor]) -> Self | None:
"""try to get an instance of a concrete class"""
if cls.test_applicable(manifest):
return cls(manifest, cls_compressor)
return None
def __init__(self, manifest: dict[str, Any], cls_compressor: type[A_DataSync_Compressor]) -> None:
self.connected: bool = False
self._cls_compressor: type[A_DataSync_Compressor] = cls_compressor
self._manifest: dict[str, Any] = manifest
self._app_id: UUID = UUID(manifest["APP_ID"])
self._ar_datasync_record: list[A_DataSync_Record] = []
if "FSSYNC_RECORD" in manifest["Args"]:
for record in manifest["Args"]["FSSYNC_RECORD"]["value"]:
record = DataSync_Record_Factory.get_C_DataSync_Record(
record["value"]["name"]["value"],
record["value"]["type"]["value"],
record["value"]["value"]["value"],
)
assert isinstance(record, A_DataSync_Record)
self._ar_datasync_record.append(record)
def set_compressor(self, compressor_name: str) -> None:
"""set compressor to be used"""
self._cls_compressor = DataSync_Compressors.get(compressor_name)
def get_datasync_records(self) -> list[A_DataSync_Record]:
"""get list of records"""
return self._ar_datasync_record
@classmethod
def test_applicable(cls, manifest: dict[str, Any]) -> bool:
"""check if a concrete class is applicable - generic"""
del manifest # quality warning removal
return False
def configure(self) -> None:
"""configure the class instance"""
self._impl_configure()
@abstractmethod
def _impl_configure(self) -> None:
"""configure the class instance - virtual"""
def connect(self) -> None:
"""connect to the service"""
if not self.connected:
logger.info(f"connection to service <{self.service_name}>")
self._impl_connect()
self.connected = True
logger.info("connection done")
@abstractmethod
def _impl_connect(self) -> None:
"""connect to the service - virtual"""
def pull_data(self) -> None:
"""pull data from the service"""
logger.info(f"pulling data from service <{self.service_name}>")
self.connect()
with TemporaryDirectory() as tmpdir:
for datasync_record in self._ar_datasync_record:
logger.info(f"pulling record <{datasync_record.name}>")
try:
self._impl_pull_data(Path(datasync_record.name + self._cls_compressor.suffix), Path(tmpdir))
datasync_record.uncompress(self._cls_compressor, Path(tmpdir) / (datasync_record.name + self._cls_compressor.suffix))
except DataSyncException_RemoteDataNotFound:
logger.warning(f"remote record file not found <{datasync_record.name}>")
logger.info("done")
@abstractmethod
def _impl_pull_data(self, file_in: Path, file_out: Path) -> None:
"""pull data from the service - virtual"""
def push_data(self) -> None:
"""push data to the service"""
logger.info(f"pushing data to service <{self.service_name}>")
self.connect()
self._impl_wipe_remote_data()
for datasync_record in self._ar_datasync_record:
logger.info(f"pushing record <{datasync_record.name}>")
try:
with NamedTemporaryFile("wb", suffix=self._cls_compressor.suffix, delete=False) as tmp_file:
datasync_record.compress(self._cls_compressor, tmp_file)
tmp_file.seek(0)
tmp_file.close()
self._impl_push_data(datasync_record.name + "".join(Path(tmp_file.name).suffixes), tmp_file)
finally:
os.unlink(tmp_file.name)
logger.info("done")
@abstractmethod
def _impl_push_data(self, record_name: str, file_in: IO) -> None:
"""push data to the service - virtual"""
def wipe_remote_data(self) -> None:
"""wipe data on the service"""
logger.info(f"wiping remote data on service <{self.service_name}>")
self.connect()
self._impl_wipe_remote_data()
def wipe_local_data(self) -> None:
"""wipe local data"""
logger.info("wiping local data")
for datasync_record in self._ar_datasync_record:
datasync_record.wipe()
@abstractmethod
def _impl_wipe_remote_data(self) -> None:
"""wipe data on the service - virtual"""
class DataSync_Factory:
"""DataSync Factory"""
ar_cls_DataSync: set[type[A_DataSync]] = set()
manifest_path: str = "/opt/pyDABFactoryAppliance/Manifest.json"
cls_compressor: type[A_DataSync_Compressor] = DataSync_Compressor__tar_gz
@classmethod
def get_manifest_data(cls) -> dict[str, Any]:
"""tool method to get manifest"""
with open(cls.manifest_path, encoding="utf-8") as f_DAB_manifest:
return json.load(f_DAB_manifest)
@classmethod
def get_DataSync(cls) -> list[A_DataSync]:
"""get and configure a DataSync Concrete class instance"""
ar_datasync: list[A_DataSync] = []
manifest = cls.get_manifest_data()
for cls_DataSync in cls.ar_cls_DataSync:
if res := cls_DataSync.try_get_instance(manifest, cls.cls_compressor):
res.configure()
ar_datasync.append(res)
return ar_datasync
@classmethod
def register(cls, _cls: type[A_DataSync]) -> type[A_DataSync]:
"""decorator to register a concrete class to the factory"""
cls.ar_cls_DataSync.add(_cls)
return _cls
@classmethod
def get_list(cls) -> list[str]:
"""return the available DataSync concrete class name list"""
return [_.service_name for _ in cls.ar_cls_DataSync]

View File

@@ -0,0 +1,114 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
Nextcloud datasync implementation
"""
from __future__ import annotations
from pathlib import Path
import os
from typing import TYPE_CHECKING
from webdav3.client import Client as webdav3_Client
from webdav3.exceptions import RemoteResourceNotFound as webdav3_RemoteResourceNotFound
from .datasync import A_DataSync, DataSync_Factory
from .exceptions import DataSyncException_InvalidManifest, DataSyncException_RemoteDataNotFound
from .utils import urljoin
if TYPE_CHECKING:
from typing import Any, IO
from .compressors import A_DataSync_Compressor
@DataSync_Factory.register
class C_DataSync_NextCloud(A_DataSync):
"""Concrete DataSync class - Nextcloud"""
service_name: str = "Nextcloud"
priority: int = 100
def __init__(self, manifest: dict[Any, Any], cls_compressor: type[A_DataSync_Compressor]) -> None:
super().__init__(manifest, cls_compressor)
self.nextcloud_address: str
self.nextcloud_user: str
self.nextcloud_password: str
self.nextcloud_path: str
self.client: webdav3_Client
self.connected: bool = False
@classmethod
def test_applicable(cls, manifest: dict[str, Any]) -> bool:
"""check if a concrete class is applicable - Nextcloud override"""
if "Args" in manifest:
if "FSSync_NextCloud_Enabled" in manifest["Args"]:
if manifest["Args"]["FSSync_NextCloud_Enabled"]["value"] is True:
return True
return False
raise DataSyncException_InvalidManifest()
def _impl_configure(self) -> None:
"""configure the class instance - Nextcloud concrete implementation"""
if "FSSync_NextCloud_Address" in self._manifest["Args"]:
self.nextcloud_address = self._manifest["Args"]["FSSync_NextCloud_Address"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_User" in self._manifest["Args"]:
self.nextcloud_user = self._manifest["Args"]["FSSync_NextCloud_User"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_Password" in self._manifest["Args"]:
self.nextcloud_password = self._manifest["Args"]["FSSync_NextCloud_Password"]["value"]
else:
raise DataSyncException_InvalidManifest()
if "FSSync_NextCloud_Path" in self._manifest["Args"]:
self.nextcloud_path = str(Path(self._manifest["Args"]["FSSync_NextCloud_Path"]["value"]) / Path(str(self._app_id))).replace(
os.sep, "/"
)
else:
raise DataSyncException_InvalidManifest()
def _impl_connect(self) -> None:
"""connect to the remote service - Nextcloud concrete implementation"""
full_adress = urljoin(self.nextcloud_address, "remote.php/dav/files/", self.nextcloud_user)
self.client = webdav3_Client(
{"webdav_hostname": full_adress, "webdav_login": self.nextcloud_user, "webdav_password": self.nextcloud_password}
)
def _check_create_dir(self) -> None:
"""check and create directory in remote service"""
url_accumulator: str = ""
for url_part in self.nextcloud_path.split("/"):
url_accumulator += "/" + url_part
if not self.client.check(url_accumulator):
self.client.mkdir(url_accumulator)
def _impl_pull_data(self, file_in: Path, file_out: Path) -> None:
"""pull data from the remote service - Nextcloud concrete implementation"""
self._check_create_dir()
try:
self.client.download_sync(
remote_path=str(self.nextcloud_path / file_in).replace(os.sep, "/"), local_path=str(file_out / file_in).replace(os.sep, "/")
)
except webdav3_RemoteResourceNotFound as exc:
raise DataSyncException_RemoteDataNotFound from exc
def _impl_push_data(self, record_name: str, file_in: IO) -> None:
"""push data to the remote service - Nextcloud concrete implementation"""
self._check_create_dir()
self.client.upload_sync(
remote_path=str(Path(self.nextcloud_path) / record_name).replace(os.sep, "/"),
local_path=file_in.name,
)
def _impl_wipe_remote_data(self) -> None:
"""wipe data on the service - concrete implementation"""
if self.client.check(self.nextcloud_path):
self.client.clean(self.nextcloud_path)

View File

@@ -0,0 +1,47 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
Exception declaration
"""
class DataSyncException(Exception):
"""generic datasync exception class"""
class DataSyncException_InvalidManifest(DataSyncException):
"""specific datasync exception class - Dab appliance manifest not found"""
class DataSyncException_NoConcreteRecordClassFound(DataSyncException):
"""specific datasync exception class - No concrete Record Class Found"""
class DataSyncException_RemoteDataNotFound(DataSyncException):
"""specific datasync exception class - Remote Data Not Found"""
class DataSyncException_NoValidServiceFound(DataSyncException):
"""specific datasync exception class - Remote Valid Service Found"""
class DataSyncException_ServiceNotFound(DataSyncException):
"""specific datasync exception class - Service Not Found"""
class DataSyncException_TooManyServiceFound(DataSyncException):
"""specific datasync exception class - Too Many Service Found"""
class DataSyncException_CompressorNotFound(DataSyncException):
"""specific datasync exception class - Compressor Not Found"""
class DataSyncException_TooManyCompressorFound(DataSyncException):
"""specific datasync exception class - Too Many Compressor Found"""

View File

@@ -0,0 +1,99 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
datasync records description and implementation
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
import os
import shutil
from typing import TYPE_CHECKING, Optional
from pydantic import BaseModel
from .exceptions import DataSyncException_NoConcreteRecordClassFound
if TYPE_CHECKING:
from .compressors import A_DataSync_Compressor
from typing import IO
class A_DataSync_Record(BaseModel, ABC):
"""Abstract DataSync Record class"""
name: str
rec_type: str
value: str
@abstractmethod
def compress(self, cls_compressor: type[A_DataSync_Compressor], file_out: IO) -> None:
"""compress the DataSync Record - virtual"""
@abstractmethod
def uncompress(self, cls_compressor: type[A_DataSync_Compressor], path_in: Path) -> None:
"""uncompress the DataSync record - virtual"""
@abstractmethod
def wipe(self):
"""wipe the record local data - virtual"""
class DataSync_Record_Factory:
"""DataSync Record Factory"""
ar_cls_DataSync_Record: set[type[A_DataSync_Record]] = set()
@classmethod
def get_C_DataSync_Record(cls, name: str, rec_type: str, value: str) -> A_DataSync_Record | None:
"""get a concrete DataSync Record class instance"""
for cls_DataSync_Record in cls.ar_cls_DataSync_Record:
if cls_DataSync_Record.model_fields["rec_type"].default == rec_type:
return C_DataSync_Record_FS(name=name, rec_type=rec_type, value=value)
raise DataSyncException_NoConcreteRecordClassFound()
@classmethod
def register(cls, _cls: type[A_DataSync_Record]) -> type[A_DataSync_Record]:
"""decorator to register a concrete DataSync Record class"""
cls.ar_cls_DataSync_Record.add(_cls)
return _cls
@DataSync_Record_Factory.register
class C_DataSync_Record_FS(A_DataSync_Record):
"""Concrete DataSync Record class - FileSystem"""
rec_type: str = "fs"
path: Optional[Path] = None
def model_post_init(self, __context) -> None:
self.path = Path(self.value)
def compress(self, cls_compressor: type[A_DataSync_Compressor], file_out: IO) -> None:
"""compress the DataSync Record - concrete FS implementation"""
if TYPE_CHECKING:
assert isinstance(self.path, Path)
cls_compressor.compress(self.path, file_out)
def uncompress(self, cls_compressor: type[A_DataSync_Compressor], path_in: Path) -> None:
"""uncompress the DataSync record - concrete FS implementation"""
if TYPE_CHECKING:
assert isinstance(self.path, Path)
self.wipe()
cls_compressor.uncompress(path_in, self.path.parent)
def wipe(self):
if TYPE_CHECKING:
assert isinstance(self.path, Path)
if os.path.isdir(self.path):
shutil.rmtree(self.path)
if os.path.isfile(self.path):
os.remove(self.path)

View File

@@ -1,43 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""Phasellus tellus lectus, volutpat eu dapibus ut, suscipit vel augue.
Tips:
Aliquam non leo vel libero sagittis viverra. Quisque lobortis nunc sit amet augue euismod laoreet.
Note:
Maecenas volutpat porttitor pretium. Aliquam suscipit quis nisi non imperdiet.
Note:
Vivamus et efficitur lorem, eget imperdiet tortor. Integer vel interdum sem.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING: # Only imports the below statements during type checking
pass
def test_function(testvar: int) -> int:
""" A test function that return testvar+1 and print "Hello world !"
Proin eget sapien eget ipsum efficitur mollis nec ac nibh.
Note:
Morbi id lectus maximus, condimentum nunc eget, porta felis. In tristique velit tortor.
Args:
testvar: any integer
Returns:
testvar+1
"""
print("Hello world !")
return testvar+1

View File

@@ -5,3 +5,16 @@
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
"""
tools classes / functions
"""
def urljoin(*args: str) -> str:
"""
Joins given arguments into an url. Trailing but not leading slashes are
stripped for each argument.
"""
return "/".join(map(lambda x: str(x).rstrip("/"), args))

View File

@@ -4,4 +4,4 @@
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.

View File

@@ -0,0 +1 @@
SAVED_VALUE

518
test/test_datasync.py Normal file
View File

@@ -0,0 +1,518 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
import unittest
from os import chdir, path as os_path
from pathlib import Path
import pprint
import os
import glob
from io import StringIO
from contextlib import redirect_stdout, redirect_stderr
import shutil
import json
from contexttimer import Timer
print(__name__)
print(__package__)
from src import dabdatasync
from src.dabdatasync.__main__ import fct_main
testdir_path = Path(__file__).parent.resolve()
chdir(testdir_path.parent.resolve())
class TestDabDataSync(unittest.TestCase):
def setUp(self) -> None:
chdir(testdir_path.parent.resolve())
shutil.rmtree(testdir_path / "test_data", ignore_errors=True)
shutil.rmtree(testdir_path / "test_data2", ignore_errors=True)
shutil.copytree(testdir_path / "test_data_origin", testdir_path / "test_data")
shutil.copytree(testdir_path / "test_data_origin", testdir_path / "test_data2")
nextcloud_pwd = ""
if "nextcloud_pwd" in os.environ:
nextcloud_pwd = os.environ.get("nextcloud_pwd")
elif os.path.isfile(testdir_path / "nextcloud_pwd.mdp"):
with open(testdir_path / "nextcloud_pwd.mdp", "rt", encoding="utf-8") as pwd_file:
nextcloud_pwd = pwd_file.read()
else:
raise RuntimeError("NextCloud pwd file not found")
files = glob.glob(str(testdir_path / "*.json"))
files += glob.glob(str(testdir_path / "*.json"))
for file in files:
with open(file) as f:
data = json.load(f)
try:
data["Args"]["FSSync_NextCloud_Password"]["value"] = nextcloud_pwd
except Exception:
pass
os.remove(file)
with open(file, "w") as f:
json.dump(data, f, sort_keys=True, indent=4)
def tearDown(self) -> None:
shutil.rmtree(testdir_path / "test_data", ignore_errors=True)
shutil.rmtree(testdir_path / "test_data2", ignore_errors=True)
files = glob.glob(str(testdir_path / "*.json"))
files += glob.glob(str(testdir_path / "*.json"))
for file in files:
with open(file) as f:
data = json.load(f)
try:
data["Args"]["FSSync_NextCloud_Password"]["value"] = ""
except Exception:
pass
os.remove(file)
with open(file, "w") as f:
json.dump(data, f, sort_keys=True, indent=4)
def test_version(self):
self.assertNotEqual(dabdatasync.__version__, "?.?.?")
def test_cli_help(self):
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
with self.assertRaises(SystemExit):
fct_main(["-h"])
print(capted_stdout.getvalue())
print(capted_stderr.getvalue())
def test_cli_verbosity(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
fct_main(["WipeRemoteData"])
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
fct_main(["PullData"])
self.assertEqual(capted_stdout.getvalue(), "")
self.assertEqual(capted_stderr.getvalue(), "")
fct_main(["WipeLocalData"])
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
fct_main(["-v", "PullData"])
self.assertTrue("WARNING" in capted_stdout.getvalue())
self.assertFalse("INFO" in capted_stdout.getvalue())
self.assertEqual(capted_stderr.getvalue(), "")
fct_main(["WipeLocalData"])
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
fct_main(["-vv", "PullData"])
self.assertTrue("INFO" in capted_stdout.getvalue())
self.assertTrue("WARNING" in capted_stdout.getvalue())
self.assertEqual(capted_stderr.getvalue(), "")
def test_cli_GetServices(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
fct_main(["GetServices"])
print(capted_stdout.getvalue())
print(capted_stderr.getvalue())
self.assertTrue("Nextcloud" in capted_stdout.getvalue())
self.assertEqual(capted_stderr.getvalue(), "")
def test_cli_GetServices_noservice(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json"
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
with self.assertRaises(dabdatasync.DataSyncException_NoValidServiceFound):
fct_main(["GetServices"])
self.assertEqual(capted_stdout.getvalue(), "")
self.assertEqual(capted_stderr.getvalue(), "")
def test_cli_GetServices_invalid(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_invalid.json"
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest):
fct_main(["GetServices"])
self.assertEqual(capted_stdout.getvalue(), "")
self.assertEqual(capted_stderr.getvalue(), "")
def test_cli_GetServices_nextcloud_invalid(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_invalid.json"
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest):
fct_main(["GetServices"])
self.assertEqual(capted_stdout.getvalue(), "")
self.assertEqual(capted_stderr.getvalue(), "")
def test_cli_WipeLocalData(self):
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
fct_main(["WipeLocalData"])
self.assertEqual(capted_stdout.getvalue(), "")
self.assertEqual(capted_stderr.getvalue(), "")
self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
def test_cli_simple(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
fct_main(["PushData"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE2")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE2")
fct_main(["PullData"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE32")
fct_main(["PushData"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["PullData"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["WipeRemoteData"])
fct_main(["PullData"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["PushData"])
fct_main(["WipeLocalData"])
self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
fct_main(["PullData"])
self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
def test_defect_cli_select_wrong_service(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
with self.assertRaises(dabdatasync.DataSyncException_ServiceNotFound):
fct_main(["PullData", "--service", "WRONGSERVICE"])
def test_cli_select_service(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
fct_main(["PushData", "--service", "Nextcloud"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE2")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE2")
fct_main(["PullData", "--service", "Nextcloud"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE32")
fct_main(["PushData", "--service", "Nextcloud"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["PullData", "--service", "Nextcloud"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["WipeRemoteData", "--service", "Nextcloud"])
fct_main(["PullData", "--service", "Nextcloud"])
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
fct_main(["PushData", "--service", "Nextcloud"])
fct_main(["WipeLocalData"])
self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
fct_main(["PullData", "--service", "Nextcloud"])
self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
def test_load_remote_empty(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
datasync = dabdatasync.DataSync_Factory.get_DataSync()
datasync[0].wipe_remote_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
datasync[0].pull_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
def load_nextcloud_gen(self, compressor):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
datasync = dabdatasync.DataSync_Factory.get_DataSync()
datasync[0].set_compressor(compressor)
self.assertIsInstance(datasync, list)
self.assertEqual(len(datasync), 1)
self.assertIsInstance(datasync[0], dabdatasync.A_DataSync)
self.assertIsInstance(datasync[0], dabdatasync.C_DataSync_NextCloud)
self.assertEqual(len(datasync[0].get_datasync_records()), 2)
self.assertEqual(datasync[0].get_datasync_records()[0].name, "SOTF_map")
self.assertEqual(datasync[0].get_datasync_records()[1].name, "SOTF_map2")
self.assertEqual(datasync[0].get_datasync_records()[0].rec_type, "fs")
self.assertEqual(datasync[0].get_datasync_records()[1].rec_type, "fs")
self.assertEqual(datasync[0].get_datasync_records()[0].value, "test/test_data")
self.assertEqual(datasync[0].get_datasync_records()[1].value, "test/test_data2/SAVE_FILE.txt")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
datasync[0].push_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE2")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE2")
datasync[0].pull_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "SAVED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE32")
datasync[0].push_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
datasync[0].pull_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE3")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE32")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile:
testfile.write("MODIFIED_VALUE")
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
datasync[0].wipe_remote_data()
datasync[0].pull_data()
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
datasync[0].push_data()
datasync[0].wipe_local_data()
self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
datasync[0].pull_data()
self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt"))
self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt"))
with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile:
self.assertEqual(testfile.read(), "MODIFIED_VALUE")
def test_load_nextcloud__tar_gz(self):
with Timer() as t:
self.load_nextcloud_gen("tar_gz")
print(t.elapsed)
def test_load_nextcloud__tar_lz4(self):
with Timer() as t:
self.load_nextcloud_gen("tar_lz4")
print(t.elapsed)
def test_load_empty(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_empty.json"
datasync = dabdatasync.DataSync_Factory.get_DataSync()
self.assertIsInstance(datasync, list)
self.assertEqual(len(datasync), 0)
def test_load_nextcloud_disabled(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json"
datasync = dabdatasync.DataSync_Factory.get_DataSync()
self.assertIsInstance(datasync, list)
self.assertEqual(len(datasync), 0)
def test_defect_load_invalid(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_invalid.json"
with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest):
dabdatasync.DataSync_Factory.get_DataSync()
def test_defect_load_nextcloud_invalid(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_invalid.json"
with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest):
dabdatasync.DataSync_Factory.get_DataSync()

View File

@@ -0,0 +1,4 @@
{
"APP_ID": "2a13dff2-1298-11ee-be56-0242ac120002",
"Args": {}
}

View File

@@ -0,0 +1,3 @@
{
"APP_ID": "2a13dff2-1298-11ee-be56-0242ac120002"
}

View File

@@ -0,0 +1,529 @@
{
"APP_DESC": "ChaCha SonOfTheForset Dedicated Server",
"APP_ID": "2a13dff2-1298-11ee-be56-0242ac120002",
"APP_NAME": "CHACHA-SOTF",
"Args": {
"ACLSavePath": {
"type": "STRING",
"value": "/saved.acl"
},
"AnimalSpawnRate": {
"type": "STRING",
"value": "Normal"
},
"BlobSyncPort": {
"type": "EXT_LISTEN_PORT",
"value": {
"port_type": {
"type": "STRING",
"value": "udp"
},
"value": {
"type": "UINT",
"value": 9700
}
}
},
"CPUQUOTA": {
"type": "SYSTEMD_CPUQUOTA",
"value": 98
},
"ColdPenalties": {
"type": "STRING",
"value": "Off"
},
"ConsumableEffects": {
"type": "STRING",
"value": "Normal"
},
"CustomFirstBootFilePath": {
"type": "STRING",
"value": "/customfirstboot.sh"
},
"DAB_EXT_LISTEN_PORTS": {
"type": "AR_EXT_LISTEN_PORT",
"value": [
{
"type": "EXT_LISTEN_PORT",
"value": {
"port_type": {
"type": "STRING",
"value": "udp"
},
"value": {
"type": "UINT",
"value": 8766
}
}
},
{
"type": "EXT_LISTEN_PORT",
"value": {
"port_type": {
"type": "STRING",
"value": "udp"
},
"value": {
"type": "UINT",
"value": 27018
}
}
},
{
"type": "EXT_LISTEN_PORT",
"value": {
"port_type": {
"type": "STRING",
"value": "udp"
},
"value": {
"type": "UINT",
"value": 9700
}
}
}
]
},
"DAB_LISTEN_PORTS": {
"type": "AR_LISTEN_PORT",
"value": [
{
"type": "LISTEN_PORT",
"value": {
"port_type": {
"type": "STRING",
"value": "tcp"
},
"value": {
"type": "UINT",
"value": 50020
}
}
}
]
},
"DAB_MOUNT_POINTS": {
"type": "AR_MOUNT_POINT",
"value": []
},
"DAB_SHARED_MOUNT_POINTS": {
"type": "AR_MOUNT_POINT",
"value": []
},
"DEBUG_TOOLS": {
"type": "BOOL",
"value": false
},
"DEFAULT_CHACHA_GIT_BRANCH": {
"type": "STRING",
"value": "production"
},
"DayLength": {
"type": "STRING",
"value": "Default"
},
"ENABLE_WINE_ESYNC": {
"type": "BOOL",
"value": false
},
"ENABLE_WINE_FSYNC": {
"type": "BOOL",
"value": true
},
"ENABLE_WINE_PRELOADER": {
"type": "BOOL",
"value": false
},
"EXEC_USER": {
"type": "STRING",
"value": "GenUser"
},
"EXEC_USER_ID": {
"type": "UINT",
"value": 1000
},
"EXEC_USER_PASSWD": {
"type": "PASSWD",
"value": "######"
},
"EnableAutoReboot": {
"type": "BOOL",
"value": true
},
"EnableJava": {
"type": "BOOL",
"value": false
},
"EnableLog2Ram": {
"type": "BOOL",
"value": false
},
"EnemyAggression": {
"type": "STRING",
"value": "Normal"
},
"EnemyArmour": {
"type": "STRING",
"value": "Normal"
},
"EnemyDamage": {
"type": "STRING",
"value": "Normal"
},
"EnemyHealth": {
"type": "STRING",
"value": "Normal"
},
"EnemySpawn": {
"type": "BOOL",
"value": true
},
"FSSYNC_RECORD": {
"type": "T_ARRAY_FSSYNC_RECORD",
"value": [
{
"type": "T_FSSYNC_RECORD",
"value": {
"name": {
"type": "SIMPLE_STRING",
"value": "SOTF_map"
},
"type": {
"type": "SIMPLE_STRING",
"value": "fs"
},
"value": {
"type": "STRING",
"value": "test/test_data"
}
}
},
{
"type": "T_FSSYNC_RECORD",
"value": {
"name": {
"type": "SIMPLE_STRING",
"value": "SOTF_map2"
},
"type": {
"type": "SIMPLE_STRING",
"value": "fs"
},
"value": {
"type": "STRING",
"value": "test/test_data2/SAVE_FILE.txt"
}
}
}
]
},
"FSSync_NextCloud_Address": {
"type": "URL",
"value": "https://chacha.ddns.net/nextcloud"
},
"FSSync_NextCloud_Enabled": {
"type": "BOOL",
"value": true
},
"FSSync_NextCloud_Password": {
"type": "STRING",
"value": ""
},
"FSSync_NextCloud_Path": {
"type": "STRING",
"value": "pydabfactory-test"
},
"FSSync_NextCloud_User": {
"type": "STRING",
"value": "chacha-bot"
},
"FirstBootFilePath": {
"type": "STRING",
"value": "/firstboot.sh"
},
"ForcePython39": {
"type": "BOOL",
"value": false
},
"GAMETYPENAME": {
"type": "STRING",
"value": "sotf"
},
"GAME_MNG_DEFAULT_MODE": {
"type": "STRING",
"value": "BLACKLIST"
},
"GAME_MNG_LISTENING_PORT": {
"type": "UINT",
"value": 50000
},
"GAME_MNG_PWD": {
"type": "STRING",
"value": "cfographut"
},
"GAME_MNG_RESTART_DELAY": {
"type": "UINT",
"value": 30
},
"GamePassword": {
"type": "STRING",
"value": "!bourges2023"
},
"GamePort": {
"type": "EXT_LISTEN_PORT",
"value": {
"port_type": {
"type": "STRING",
"value": "udp"
},
"value": {
"type": "UINT",
"value": 8766
}
}
},
"HostName": {
"type": "STRING",
"value": "ChaCha - Sons Of The Forest Server"
},
"LanOnly": {
"type": "BOOL",
"value": false
},
"MEMLIMITHIGH": {
"type": "SYSTEMD_RAM",
"value": "10G"
},
"MEMLIMITMAX": {
"type": "SYSTEMD_RAM",
"value": "11G"
},
"MaxPlayers": {
"type": "UINT",
"value": 8
},
"PlayerStatsDamage": {
"type": "STRING",
"value": "Off"
},
"PrecipitationFrequency": {
"type": "STRING",
"value": "Default"
},
"QueryPort": {
"type": "EXT_LISTEN_PORT",
"value": {
"port_type": {
"type": "STRING",
"value": "udp"
},
"value": {
"type": "UINT",
"value": 27018
}
}
},
"ReducedFoodInContainers": {
"type": "BOOL",
"value": false
},
"RootPasswd": {
"type": "ROOT_PASSWD",
"value": "######"
},
"SECTION": {
"type": "STRING",
"value": "games"
},
"SSH_PORT": {
"type": "LISTEN_PORT",
"value": {
"port_type": {
"type": "STRING",
"value": "tcp"
},
"value": {
"type": "UINT",
"value": 50020
}
}
},
"STEAM_APP_ID": {
"type": "UINT",
"value": 2465200
},
"STEAM_LOGIN": {
"type": "STRING",
"value": "cclecle"
},
"STEAM_PWD": {
"type": "PASSWD",
"value": "######"
},
"SaveInterval": {
"type": "UINT",
"value": 600
},
"SaveSlot": {
"type": "UINT",
"value": 1
},
"SeasonLength": {
"type": "STRING",
"value": "Default"
},
"SingleUseContainers": {
"type": "BOOL",
"value": false
},
"SkipNetworkAccessibilityTest": {
"type": "BOOL",
"value": true
},
"StartingSeason": {
"type": "STRING",
"value": "Summer"
},
"StructureDamage": {
"type": "BOOL",
"value": false
},
"SystemDJournalMaxSize": {
"type": "STRING",
"value": "40M"
},
"TreeRegrowth": {
"type": "BOOL",
"value": true
},
"WINEPREFIX": {
"type": "STRING",
"value": "DEFAULT"
},
"WINE_ARCH": {
"type": "STRING",
"value": "win64"
},
"WINE_NAME": {
"type": "STRING",
"value": "wine-9.4-staging-tkg-amd64"
},
"WINE_URL": {
"type": "URL",
"value": "https://github.com/Kron4ek/Wine-Builds/releases/download/9.4/wine-9.4-staging-tkg-amd64.tar.xz"
},
"locale": {
"type": "STRING",
"value": "fr_FR.UTF-8"
},
"locale_gen": {
"type": "STRING",
"value": "fr_FR.UTF-8 UTF-8"
},
"timezone": {
"type": "STRING",
"value": "Europe/Paris"
}
},
"NOBOOTSTRAP": false,
"NOFINALIZE": false,
"NOSTART": false,
"Params": {
"AR_CFG_OPT": {
"modified": false,
"value": []
},
"AR_TAGS": {
"modified": true,
"value": [
{
"value": "pydabfactory"
},
{
"value": "debianbase"
},
{
"value": "pydabfactory"
},
{
"value": "chacha"
},
{
"value": "pydabfactory"
},
{
"value": "games"
},
{
"value": "pydabfactory"
},
{
"value": "sotf"
}
]
},
"AUTOSTART": {
"modified": true,
"value": true
},
"CPU_COUNT": {
"modified": true,
"value": 2
},
"CPU_UNIT": {
"modified": true,
"value": 1536
},
"DEST_NODE": {
"modified": true,
"value": "hypervisor2"
},
"FEATURE_FUSE": {
"modified": false,
"value": false
},
"FEATURE_MKNODE": {
"modified": false,
"value": false
},
"FEATURE_NESTING": {
"modified": false,
"value": false
},
"MAIN_MACADDR": {
"modified": true,
"value": "D2:A9:59:72:C4:B4"
},
"NETWORK_BRIDGE": {
"modified": true,
"value": "vmbr1"
},
"PRIVILEGIED": {
"modified": false,
"value": false
},
"RAM_M": {
"modified": true,
"value": 12000
},
"ROOTFS_SIZE_G": {
"modified": true,
"value": 20
},
"RUNNING_STORAGE": {
"modified": true,
"value": "VMStore2"
},
"SWAP_M": {
"modified": true,
"value": 2048
},
"TEMPLATE_STORAGE": {
"modified": true,
"value": "live-storage-h2"
}
},
"REFERENCE_CONFIG_ID": "cf698a62-120a-11ee-be56-0242ac120002",
"VIRTUAL": false,
"creation_date": "2024-03-24T19:07:48.862542"
}

View File

@@ -0,0 +1,25 @@
{
"APP_ID": "2a13dff2-1298-11ee-be56-0242ac120002",
"Args": {
"FSSync_NextCloud_Address": {
"type": "URL",
"value": "https://chacha.ddns.net/nextcloud"
},
"FSSync_NextCloud_Enabled": {
"type": "BOOL",
"value": false
},
"FSSync_NextCloud_Password": {
"type": "STRING",
"value": ""
},
"FSSync_NextCloud_Path": {
"type": "STRING",
"value": "pydabfactory"
},
"FSSync_NextCloud_User": {
"type": "STRING",
"value": "chacha-bot"
}
}
}

View File

@@ -0,0 +1,9 @@
{
"APP_ID": "2a13dff2-1298-11ee-be56-0242ac120002",
"Args": {
"FSSync_NextCloud_Enabled": {
"type": "BOOL",
"value": true
}
}
}

View File

@@ -1,35 +0,0 @@
# dabdatasync (c) by chacha
#
# dabdatasync is licensed under a
# Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License.
#
# You should have received a copy of the license along with this
# work. If not, see <https://creativecommons.org/licenses/by-nc-sa/4.0/>.
import unittest
from os import chdir
from io import StringIO
from contextlib import redirect_stdout,redirect_stderr
from pathlib import Path
print(__name__)
print(__package__)
from src import dabdatasync
testdir_path = Path(__file__).parent.resolve()
chdir(testdir_path.parent.resolve())
class Testtest_module(unittest.TestCase):
def setUp(self) -> None:
chdir(testdir_path.parent.resolve())
def test_version(self):
self.assertNotEqual(dabdatasync.__version__,"?.?.?")
def test_test_module(self):
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
self.assertEqual(dabdatasync.test_function(41),42)
self.assertEqual(len(capted_stderr.getvalue()),0)
self.assertEqual(capted_stdout.getvalue().strip(),"Hello world !")