Compare commits
2 Commits
0.2.0.post
...
0.2.0.post
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2d753205ca | ||
|
|
872dab1ed9 |
@@ -15,6 +15,7 @@ from typing import cast, Union, Optional
|
||||
|
||||
import sys
|
||||
from tap import Tap
|
||||
from loguru import logger
|
||||
|
||||
from . import __Summuary__, __Name__
|
||||
from . import datasync
|
||||
@@ -86,8 +87,17 @@ def fct_main(i_args: list[str]) -> None: # pylint: disable=too-many-branches,to
|
||||
|
||||
args: dabdatasync_args = parser.parse_args(i_args)
|
||||
|
||||
logger.remove()
|
||||
if args.verbosity:
|
||||
pass
|
||||
if args.verbosity == 1:
|
||||
logger.add(sys.stdout, level="WARNING")
|
||||
elif args.verbosity == 2:
|
||||
logger.add(sys.stdout, level="INFO")
|
||||
else:
|
||||
logger.add(sys.stdout, level="DEBUG")
|
||||
else:
|
||||
logger.add(sys.stdout, level="ERROR")
|
||||
logger.add(sys.stderr, level="ERROR")
|
||||
|
||||
dabdatasync = datasync.DataSync_Factory.get_DataSync()
|
||||
if len(dabdatasync) == 0:
|
||||
|
||||
@@ -69,8 +69,6 @@ class DataSync_Compressors:
|
||||
@classmethod
|
||||
def get(cls, compressor_name: str) -> type[A_DataSync_Compressor]:
|
||||
"""get a specific compressor"""
|
||||
print([_.compressor_name for _ in cls._availables])
|
||||
print(compressor_name)
|
||||
found = [_ for _ in cls._availables if _.compressor_name == compressor_name]
|
||||
if len(found) == 0:
|
||||
raise DataSyncException_CompressorNotFound()
|
||||
|
||||
@@ -21,11 +21,12 @@ from typing import final, TYPE_CHECKING
|
||||
from loguru import logger
|
||||
|
||||
from .records import A_DataSync_Record, DataSync_Record_Factory
|
||||
from .compressors import A_DataSync_Compressor, DataSync_Compressor__tar_gz, DataSync_Compressors
|
||||
from .compressors import DataSync_Compressor__tar_gz, DataSync_Compressors
|
||||
from .exceptions import DataSyncException_RemoteDataNotFound
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Optional, IO, Any, Self
|
||||
from .compressors import A_DataSync_Compressor
|
||||
|
||||
|
||||
class A_DataSync(ABC):
|
||||
|
||||
@@ -19,12 +19,12 @@ from webdav3.client import Client as webdav3_Client
|
||||
from webdav3.exceptions import RemoteResourceNotFound as webdav3_RemoteResourceNotFound
|
||||
|
||||
from .datasync import A_DataSync, DataSync_Factory
|
||||
from .compressors import A_DataSync_Compressor
|
||||
from .exceptions import DataSyncException_InvalidManifest, DataSyncException_RemoteDataNotFound
|
||||
from .utils import urljoin
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from typing import Any, IO
|
||||
from .compressors import A_DataSync_Compressor
|
||||
|
||||
|
||||
@DataSync_Factory.register
|
||||
|
||||
@@ -20,10 +20,10 @@ from typing import TYPE_CHECKING, Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from .compressors import A_DataSync_Compressor
|
||||
from .exceptions import DataSyncException_NoConcreteRecordClassFound
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .compressors import A_DataSync_Compressor
|
||||
from typing import IO
|
||||
|
||||
|
||||
|
||||
@@ -47,6 +47,31 @@ class TestDabDataSync(unittest.TestCase):
|
||||
print(capted_stdout.getvalue())
|
||||
print(capted_stderr.getvalue())
|
||||
|
||||
def test_cli_verbosity(self):
|
||||
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
|
||||
fct_main(["WipeRemoteData"])
|
||||
|
||||
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
|
||||
fct_main(["PullData"])
|
||||
self.assertEqual(capted_stdout.getvalue(), "")
|
||||
self.assertEqual(capted_stderr.getvalue(), "")
|
||||
|
||||
fct_main(["WipeLocalData"])
|
||||
|
||||
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
|
||||
fct_main(["-v", "PullData"])
|
||||
self.assertTrue("WARNING" in capted_stdout.getvalue())
|
||||
self.assertFalse("INFO" in capted_stdout.getvalue())
|
||||
self.assertEqual(capted_stderr.getvalue(), "")
|
||||
|
||||
fct_main(["WipeLocalData"])
|
||||
|
||||
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
|
||||
fct_main(["-vv", "PullData"])
|
||||
self.assertTrue("INFO" in capted_stdout.getvalue())
|
||||
self.assertTrue("WARNING" in capted_stdout.getvalue())
|
||||
self.assertEqual(capted_stderr.getvalue(), "")
|
||||
|
||||
def test_cli_GetServices(self):
|
||||
dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json"
|
||||
with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr:
|
||||
|
||||
Reference in New Issue
Block a user