9 Commits

Author SHA1 Message Date
cclecle
f9c03a83f3 chore: fix export and nextcloud pwd script
test: fix typing
2024-03-31 02:04:08 +01:00
cclecle
f4ac5360b7 fix nextcloud password setting
add missing function for help
2024-03-31 00:45:55 +00:00
cclecle
9581772c9e chore: extract nextcloud password from source files 2024-03-31 00:36:34 +00:00
cclecle
da10ea2c19 feat: improve help for subparsers 2024-03-31 00:36:10 +00:00
cclecle
842b8b6a5c fix main 2024-03-29 12:15:40 +00:00
cclecle
5975be2ca5 chore: update project keywords 2024-03-29 03:34:56 +00:00
cclecle
2d753205ca feat: implement verbosity 2024-03-29 03:25:18 +00:00
cclecle
872dab1ed9 chore: optimize imports
fix: remove useless printf
2024-03-29 03:11:08 +00:00
cclecle
96d8f8fdd7 fix webdavclient3 version dep 2024-03-29 02:55:08 +00:00
13 changed files with 667 additions and 556 deletions

3
.gitignore vendored
View File

@@ -43,4 +43,5 @@ helpers-results
/.mypy_cache/
.coverage
.mypy_cache
test/tmp
test/tmp
test/nextcloud_pwd.mdp

View File

@@ -18,7 +18,7 @@ name = "dabdatasync"
description = "dabdatasync"
readme = "README.md"
requires-python = ">=3.11"
keywords = ["chacha","chacha","template","dabdatasync"]
keywords = ["chacha","dabdatasync","dab","debian","proxmox","pydabfactory"]
license = { file = "LICENSE.md" }
authors = [
@@ -35,7 +35,7 @@ classifiers = [
dependencies = [
'importlib-metadata; python_version<"3.9"',
'packaging',
'webdavclient3==1.*',
'webdavclient3==3.14.*',
'pydantic==2.*',
'typed-argument-parser==1.*',
'loguru==0.7.*',

View File

@@ -15,10 +15,12 @@ from typing import cast, Union, Optional
import sys
from tap import Tap
from loguru import logger
from . import __Summuary__, __Name__
from . import __Summuary__, __Name__,__version__
from . import datasync
from . import exceptions
from . import compressors
class dabdatasync_args_GetServices(Tap):
@@ -35,7 +37,7 @@ class dabdatasync_args_service_abstract(Tap):
service: Optional[str] = None
def configure(self) -> None:
self.add_argument("--service")
self.add_argument("--service",help=f"specify the service to use (availables: {datasync.DataSync_Factory.get_list()}, first found in priority if not set)")
class dabdatasync_args_service_compress_abstract(dabdatasync_args_service_abstract):
@@ -45,7 +47,7 @@ class dabdatasync_args_service_compress_abstract(dabdatasync_args_service_abstra
def configure(self) -> None:
super().configure()
self.add_argument("--compressor")
self.add_argument("--compressor",help=f"specify the compressor method (availables: {compressors.DataSync_Compressors.get_list()}, .tar.gz if not set)")
class dabdatasync_args_PullData(dabdatasync_args_service_compress_abstract):
@@ -63,12 +65,16 @@ class dabdatasync_args_WipeRemoteData(dabdatasync_args_service_abstract):
class dabdatasync_args(Tap):
"""Main CLI arg parser"""
verbosity: int = 0
verbosity: int = 0
help: bool = False
version: bool = False
def configure(self) -> None:
self.add_argument("-v", "--verbosity", action="count", help="increase output verbosity")
self.add_argument("--version", action="store_true", help="show version string")
self.add_argument("-h", "--help", action="store_true", help="full help")
self.add_argument("-v", "--verbosity", action="count", help="increase output verbosity")
self.add_subparsers(dest="command", help="command type", required=True)
self.add_subparsers(dest="command", help="command type")
self.add_subparser("GetServices", dabdatasync_args_GetServices, help="Get registered services list")
self.add_subparser("PullData", dabdatasync_args_PullData, help="Pull data from the service")
self.add_subparser("PushData", dabdatasync_args_PushData, help="Push data to the service")
@@ -85,9 +91,32 @@ def fct_main(i_args: list[str]) -> None: # pylint: disable=too-many-branches,to
parser: dabdatasync_args = dabdatasync_args(prog=__Name__, description=__Summuary__)
args: dabdatasync_args = parser.parse_args(i_args)
if args.help or args.version:
print(f"{__Name__} version {__version__}")
if args.help:
print(parser.format_help())
for name, subparser in parser._subparsers.choices.items(): # type: ignore[union-attr]
print("=========================")
print(f"Subparser {{{name}}}")
print(subparser.format_help())
if args.help or args.version:
exit(0)
logger.remove()
if args.verbosity:
pass
if args.verbosity == 1:
logger.add(sys.stdout, level="WARNING")
elif args.verbosity == 2:
logger.add(sys.stdout, level="INFO")
else:
logger.add(sys.stdout, level="DEBUG")
else:
logger.add(sys.stdout, level="ERROR")
logger.add(sys.stderr, level="ERROR")
dabdatasync = datasync.DataSync_Factory.get_DataSync()
if len(dabdatasync) == 0:
@@ -133,13 +162,13 @@ def fct_main(i_args: list[str]) -> None: # pylint: disable=too-many-branches,to
selected_dabdatasync.wipe_remote_data()
return
raise RuntimeError("Invalid argument")
raise RuntimeError("a command is required")
def CLI():
"""wrapper for .toml declared script"""
fct_main(sys.argv)
fct_main(sys.argv[1:])
if __name__ == "__main__":
fct_main(sys.argv[1:])
CLI()

View File

@@ -69,15 +69,17 @@ class DataSync_Compressors:
@classmethod
def get(cls, compressor_name: str) -> type[A_DataSync_Compressor]:
"""get a specific compressor"""
print([_.compressor_name for _ in cls._availables])
print(compressor_name)
found = [_ for _ in cls._availables if _.compressor_name == compressor_name]
if len(found) == 0:
raise DataSyncException_CompressorNotFound()
if len(found) == 1:
return found[0]
raise DataSyncException_TooManyCompressorFound()
@classmethod
def get_list(cls) -> list[str]:
"""return the available compressor name list"""
return [ _.compressor_name for _ in cls._availables]
@DataSync_Compressors.register
class DataSync_Compressor__tar_gz(A_DataSync_Compressor):

View File

@@ -21,17 +21,19 @@ from typing import final, TYPE_CHECKING
from loguru import logger
from .records import A_DataSync_Record, DataSync_Record_Factory
from .compressors import A_DataSync_Compressor, DataSync_Compressor__tar_gz, DataSync_Compressors
from .compressors import DataSync_Compressor__tar_gz, DataSync_Compressors
from .exceptions import DataSyncException_RemoteDataNotFound
if TYPE_CHECKING:
from typing import Optional, IO, Any, Self
from .compressors import A_DataSync_Compressor
class A_DataSync(ABC):
"""Abstract DataSync class"""
service_name: str = "ABSTRACT"
priority: int = 0
@classmethod
@final
@@ -177,3 +179,8 @@ class DataSync_Factory:
"""decorator to register a concrete class to the factory"""
cls.ar_cls_DataSync.add(_cls)
return _cls
@classmethod
def get_list(cls) -> list[str]:
"""return the available DataSync concrete class name list"""
return [ _.service_name for _ in cls.ar_cls_DataSync]

View File

@@ -19,12 +19,12 @@ from webdav3.client import Client as webdav3_Client
from webdav3.exceptions import RemoteResourceNotFound as webdav3_RemoteResourceNotFound
from .datasync import A_DataSync, DataSync_Factory
from .compressors import A_DataSync_Compressor
from .exceptions import DataSyncException_InvalidManifest, DataSyncException_RemoteDataNotFound
from .utils import urljoin
if TYPE_CHECKING:
from typing import Any, IO
from .compressors import A_DataSync_Compressor
@DataSync_Factory.register
@@ -32,6 +32,7 @@ class C_DataSync_NextCloud(A_DataSync):
"""Concrete DataSync class - Nextcloud"""
service_name: str = "Nextcloud"
priority: int = 100
def __init__(self, manifest: dict[Any, Any], cls_compressor: type[A_DataSync_Compressor]) -> None:
super().__init__(manifest, cls_compressor)

View File

@@ -20,10 +20,10 @@ from typing import TYPE_CHECKING, Optional
from pydantic import BaseModel
from .compressors import A_DataSync_Compressor
from .exceptions import DataSyncException_NoConcreteRecordClassFound
if TYPE_CHECKING:
from .compressors import A_DataSync_Compressor
from typing import IO

View File

@@ -10,9 +10,12 @@ import unittest
from os import chdir, path as os_path
from pathlib import Path
import pprint
import os
import glob
from io import StringIO
from contextlib import redirect_stdout, redirect_stderr
import shutil
import json
from contexttimer import Timer
print(__name__)
@@ -32,7 +35,28 @@ class TestDabDataSync(unittest.TestCase):
shutil.rmtree(testdir_path / "test_data2", ignore_errors=True)
shutil.copytree(testdir_path / "test_data_origin", testdir_path / "test_data")
shutil.copytree(testdir_path / "test_data_origin", testdir_path / "test_data2")
nextcloud_pwd=""
if 'nextcloud_pwd' in os.environ:
nextcloud_pwd = os.environ.get("nextcloud_pwd")
elif os.path.isfile(testdir_path / "nextcloud_pwd.mdp"):
with open(testdir_path / "nextcloud_pwd.mdp","rt",encoding="utf-8") as pwd_file:
nextcloud_pwd=pwd_file.read()
else:
raise RuntimeError("NextCloud pwd file not found")
files = glob.glob(str(testdir_path / "*.json"))
files += glob.glob(str(testdir_path /"*.json"))
for file in files:
with open(file) as f:
data = json.load(f)
try:
data["Args"]["FSSync_NextCloud_Password"]["value"] = nextcloud_pwd
except Exception:
pass
os.remove(file)
with open(file, 'w') as f:
json.dump(data, f, sort_keys=True, indent=4)
def tearDown(self) -> None:
shutil.rmtree(testdir_path / "test_data", ignore_errors=True)
shutil.rmtree(testdir_path / "test_data2", ignore_errors=True)
@@ -47,6 +71,31 @@ class TestDabDataSync(unittest.TestCase):
print(capted_stdout.getvalue())
print(capted_stderr.getvalue())
def test_cli_verbosity(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
fct_main(["WipeRemoteData"])
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
fct_main(["PullData"])
self.assertEqual(capted_stdout.getvalue(), "")
self.assertEqual(capted_stderr.getvalue(), "")
fct_main(["WipeLocalData"])
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
fct_main(["-v", "PullData"])
self.assertTrue("WARNING" in capted_stdout.getvalue())
self.assertFalse("INFO" in capted_stdout.getvalue())
self.assertEqual(capted_stderr.getvalue(), "")
fct_main(["WipeLocalData"])
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
fct_main(["-vv", "PullData"])
self.assertTrue("INFO" in capted_stdout.getvalue())
self.assertTrue("WARNING" in capted_stdout.getvalue())
self.assertEqual(capted_stderr.getvalue(), "")
def test_cli_GetServices(self):
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:

View File

@@ -1,3 +1,4 @@
{
"APP_ID": "2a13dff2-1298-11ee-be56-0242ac120002",
"Args": {}}
"Args": {}
}

View File

@@ -1,2 +1,3 @@
{
"APP_ID": "2a13dff2-1298-11ee-be56-0242ac120002"}
"APP_ID": "2a13dff2-1298-11ee-be56-0242ac120002"
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,9 +1,25 @@
{
{
"APP_ID": "2a13dff2-1298-11ee-be56-0242ac120002",
"Args": {
"FSSync_NextCloud_Enabled": {"type": "BOOL", "value": false},
"FSSync_NextCloud_Address": {"type": "URL", "value": "https://chacha.ddns.net/nextcloud"},
"FSSync_NextCloud_User": {"type": "STRING", "value": "chacha-bot"},
"FSSync_NextCloud_Password": {"type": "STRING", "value": "F3P8m-nQHik-NSmb2-mnFEF-s85RE"},
"FSSync_NextCloud_Path": {"type": "STRING", "value": "pydabfactory"}
}}
"Args": {
"FSSync_NextCloud_Address": {
"type": "URL",
"value": "https://chacha.ddns.net/nextcloud"
},
"FSSync_NextCloud_Enabled": {
"type": "BOOL",
"value": false
},
"FSSync_NextCloud_Password": {
"type": "STRING",
"value": "F3P8m-nQHik-NSmb2-mnFEF-s85RE"
},
"FSSync_NextCloud_Path": {
"type": "STRING",
"value": "pydabfactory"
},
"FSSync_NextCloud_User": {
"type": "STRING",
"value": "chacha-bot"
}
}
}

View File

@@ -1,5 +1,9 @@
{
{
"APP_ID": "2a13dff2-1298-11ee-be56-0242ac120002",
"Args": {
"FSSync_NextCloud_Enabled": {"type": "BOOL", "value": true}
}}
"Args": {
"FSSync_NextCloud_Enabled": {
"type": "BOOL",
"value": true
}
}
}