diff --git a/src/dabdatasync/__main__.py b/src/dabdatasync/__main__.py index 45c4acb..a4dfb7e 100644 --- a/src/dabdatasync/__main__.py +++ b/src/dabdatasync/__main__.py @@ -17,7 +17,7 @@ import sys from tap import Tap from loguru import logger -from . import __Summuary__, __Name__,__version__ +from . import __Summuary__, __Name__, __version__ from . import datasync from . import exceptions from . import compressors @@ -37,7 +37,10 @@ class dabdatasync_args_service_abstract(Tap): service: Optional[str] = None def configure(self) -> None: - self.add_argument("--service",help=f"specify the service to use (availables: {datasync.DataSync_Factory.get_list()}, first found in priority if not set)") + self.add_argument( + "--service", + help=f"specify the service to use (availables: {datasync.DataSync_Factory.get_list()}, first found in priority if not set)", + ) class dabdatasync_args_service_compress_abstract(dabdatasync_args_service_abstract): @@ -47,7 +50,10 @@ class dabdatasync_args_service_compress_abstract(dabdatasync_args_service_abstra def configure(self) -> None: super().configure() - self.add_argument("--compressor",help=f"specify the compressor method (availables: {compressors.DataSync_Compressors.get_list()}, .tar.gz if not set)") + self.add_argument( + "--compressor", + help=f"specify the compressor method (availables: {compressors.DataSync_Compressors.get_list()}, .tar.gz if not set)", + ) class dabdatasync_args_PullData(dabdatasync_args_service_compress_abstract): @@ -65,14 +71,14 @@ class dabdatasync_args_WipeRemoteData(dabdatasync_args_service_abstract): class dabdatasync_args(Tap): """Main CLI arg parser""" - verbosity: int = 0 - help: bool = False - version: bool = False - + verbosity: int = 0 + help: bool = False + version: bool = False + def configure(self) -> None: - self.add_argument("--version", action="store_true", help="show version string") - self.add_argument("-h", "--help", action="store_true", help="full help") - self.add_argument("-v", "--verbosity", action="count", help="increase output verbosity") + self.add_argument("--version", action="store_true", help="show version string") + self.add_argument("-h", "--help", action="store_true", help="full help") + self.add_argument("-v", "--verbosity", action="count", help="increase output verbosity") self.add_subparsers(dest="command", help="command type") self.add_subparser("GetServices", dabdatasync_args_GetServices, help="Get registered services list") @@ -91,21 +97,21 @@ def fct_main(i_args: list[str]) -> None: # pylint: disable=too-many-branches,to parser: dabdatasync_args = dabdatasync_args(prog=__Name__, description=__Summuary__) args: dabdatasync_args = parser.parse_args(i_args) - + if args.help or args.version: print(f"{__Name__} version {__version__}") - + if args.help: print(parser.format_help()) - - for name, subparser in parser._subparsers.choices.items(): # type: ignore[union-attr] + + for name, subparser in parser._subparsers.choices.items(): # type: ignore[union-attr] print("=========================") print(f"Subparser {{{name}}}") print(subparser.format_help()) - + if args.help or args.version: exit(0) - + logger.remove() if args.verbosity: if args.verbosity == 1: diff --git a/src/dabdatasync/__metadata__.py b/src/dabdatasync/__metadata__.py index 8ba1eab..a2e9dd1 100644 --- a/src/dabdatasync/__metadata__.py +++ b/src/dabdatasync/__metadata__.py @@ -15,7 +15,7 @@ import warnings try: # pragma: no cover __version__ = version("dabdatasync") -except PackageNotFoundError: # pragma: no cover +except PackageNotFoundError: # pragma: no cover warnings.warn("can not read __version__, assuming local test context, setting it to ?.?.?") __version__ = "?.?.?" diff --git a/src/dabdatasync/compressors.py b/src/dabdatasync/compressors.py index 9ecbc45..ae8db6c 100644 --- a/src/dabdatasync/compressors.py +++ b/src/dabdatasync/compressors.py @@ -75,11 +75,12 @@ class DataSync_Compressors: if len(found) == 1: return found[0] raise DataSyncException_TooManyCompressorFound() - + @classmethod def get_list(cls) -> list[str]: """return the available compressor name list""" - return [ _.compressor_name for _ in cls._availables] + return [_.compressor_name for _ in cls._availables] + @DataSync_Compressors.register class DataSync_Compressor__tar_gz(A_DataSync_Compressor): diff --git a/src/dabdatasync/datasync.py b/src/dabdatasync/datasync.py index 15bcf72..56de217 100644 --- a/src/dabdatasync/datasync.py +++ b/src/dabdatasync/datasync.py @@ -183,4 +183,4 @@ class DataSync_Factory: @classmethod def get_list(cls) -> list[str]: """return the available DataSync concrete class name list""" - return [ _.service_name for _ in cls.ar_cls_DataSync] + return [_.service_name for _ in cls.ar_cls_DataSync] diff --git a/test/__init__.py b/test/__init__.py index a5f7a5a..d0d551c 100644 --- a/test/__init__.py +++ b/test/__init__.py @@ -4,4 +4,4 @@ # Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License. # # You should have received a copy of the license along with this -# work. If not, see . \ No newline at end of file +# work. If not, see . diff --git a/test/test_datasync.py b/test/test_datasync.py index 1a7bf2d..45ce3b1 100644 --- a/test/test_datasync.py +++ b/test/test_datasync.py @@ -35,28 +35,28 @@ class TestDabDataSync(unittest.TestCase): shutil.rmtree(testdir_path / "test_data2", ignore_errors=True) shutil.copytree(testdir_path / "test_data_origin", testdir_path / "test_data") shutil.copytree(testdir_path / "test_data_origin", testdir_path / "test_data2") - - nextcloud_pwd="" - if 'nextcloud_pwd' in os.environ: + + nextcloud_pwd = "" + if "nextcloud_pwd" in os.environ: nextcloud_pwd = os.environ.get("nextcloud_pwd") elif os.path.isfile(testdir_path / "nextcloud_pwd.mdp"): - with open(testdir_path / "nextcloud_pwd.mdp","rt",encoding="utf-8") as pwd_file: - nextcloud_pwd=pwd_file.read() + with open(testdir_path / "nextcloud_pwd.mdp", "rt", encoding="utf-8") as pwd_file: + nextcloud_pwd = pwd_file.read() else: raise RuntimeError("NextCloud pwd file not found") files = glob.glob(str(testdir_path / "*.json")) - files += glob.glob(str(testdir_path /"*.json")) + files += glob.glob(str(testdir_path / "*.json")) for file in files: with open(file) as f: - data = json.load(f) + data = json.load(f) try: data["Args"]["FSSync_NextCloud_Password"]["value"] = nextcloud_pwd except Exception: pass os.remove(file) - with open(file, 'w') as f: + with open(file, "w") as f: json.dump(data, f, sort_keys=True, indent=4) - + def tearDown(self) -> None: shutil.rmtree(testdir_path / "test_data", ignore_errors=True) shutil.rmtree(testdir_path / "test_data2", ignore_errors=True)