# dabdatasync (c) by chacha # # dabdatasync is licensed under a # Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International Unported License. # # You should have received a copy of the license along with this # work. If not, see . import unittest from os import chdir, path as os_path from pathlib import Path import pprint import os import glob from io import StringIO from contextlib import redirect_stdout, redirect_stderr import shutil import json from contexttimer import Timer print(__name__) print(__package__) from src import dabdatasync from src.dabdatasync.__main__ import fct_main testdir_path = Path(__file__).parent.resolve() chdir(testdir_path.parent.resolve()) class TestDabDataSync(unittest.TestCase): def setUp(self) -> None: chdir(testdir_path.parent.resolve()) shutil.rmtree(testdir_path / "test_data", ignore_errors=True) shutil.rmtree(testdir_path / "test_data2", ignore_errors=True) shutil.copytree(testdir_path / "test_data_origin", testdir_path / "test_data") shutil.copytree(testdir_path / "test_data_origin", testdir_path / "test_data2") nextcloud_pwd = "" if "nextcloud_pwd" in os.environ: nextcloud_pwd = os.environ.get("nextcloud_pwd") elif os.path.isfile(testdir_path / "nextcloud_pwd.mdp"): with open(testdir_path / "nextcloud_pwd.mdp", "rt", encoding="utf-8") as pwd_file: nextcloud_pwd = pwd_file.read() else: raise RuntimeError("NextCloud pwd file not found") files = glob.glob(str(testdir_path / "*.json")) files += glob.glob(str(testdir_path / "*.json")) for file in files: with open(file) as f: data = json.load(f) try: data["Args"]["FSSync_NextCloud_Password"]["value"] = nextcloud_pwd except Exception: pass os.remove(file) with open(file, "w") as f: json.dump(data, f, sort_keys=True, indent=4) def tearDown(self) -> None: shutil.rmtree(testdir_path / "test_data", ignore_errors=True) shutil.rmtree(testdir_path / "test_data2", ignore_errors=True) files = glob.glob(str(testdir_path / "*.json")) files += glob.glob(str(testdir_path / "*.json")) for file in files: with open(file) as f: data = json.load(f) try: data["Args"]["FSSync_NextCloud_Password"]["value"] = "" except Exception: pass os.remove(file) with open(file, "w") as f: json.dump(data, f, sort_keys=True, indent=4) def test_version(self): self.assertNotEqual(dabdatasync.__version__, "?.?.?") def test_cli_help(self): with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: with self.assertRaises(SystemExit): fct_main(["-h"]) print(capted_stdout.getvalue()) print(capted_stderr.getvalue()) def test_cli_verbosity(self): dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json" fct_main(["WipeRemoteData"]) with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: fct_main(["PullData"]) self.assertEqual(capted_stdout.getvalue(), "") self.assertEqual(capted_stderr.getvalue(), "") fct_main(["WipeLocalData"]) with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: fct_main(["-v", "PullData"]) self.assertTrue("WARNING" in capted_stdout.getvalue()) self.assertFalse("INFO" in capted_stdout.getvalue()) self.assertEqual(capted_stderr.getvalue(), "") fct_main(["WipeLocalData"]) with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: fct_main(["-vv", "PullData"]) self.assertTrue("INFO" in capted_stdout.getvalue()) self.assertTrue("WARNING" in capted_stdout.getvalue()) self.assertEqual(capted_stderr.getvalue(), "") def test_cli_GetServices(self): dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json" with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: fct_main(["GetServices"]) print(capted_stdout.getvalue()) print(capted_stderr.getvalue()) self.assertTrue("Nextcloud" in capted_stdout.getvalue()) self.assertEqual(capted_stderr.getvalue(), "") def test_cli_GetServices_noservice(self): dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json" with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: with self.assertRaises(dabdatasync.DataSyncException_NoValidServiceFound): fct_main(["GetServices"]) self.assertEqual(capted_stdout.getvalue(), "") self.assertEqual(capted_stderr.getvalue(), "") def test_cli_GetServices_invalid(self): dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_invalid.json" with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest): fct_main(["GetServices"]) self.assertEqual(capted_stdout.getvalue(), "") self.assertEqual(capted_stderr.getvalue(), "") def test_cli_GetServices_nextcloud_invalid(self): dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_invalid.json" with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest): fct_main(["GetServices"]) self.assertEqual(capted_stdout.getvalue(), "") self.assertEqual(capted_stderr.getvalue(), "") def test_cli_WipeLocalData(self): with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json" with redirect_stdout(StringIO()) as capted_stdout, redirect_stderr(StringIO()) as capted_stderr: fct_main(["WipeLocalData"]) self.assertEqual(capted_stdout.getvalue(), "") self.assertEqual(capted_stderr.getvalue(), "") self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt")) self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt")) def test_cli_simple(self): dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json" with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") fct_main(["PushData"]) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE2") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE2") fct_main(["PullData"]) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE3") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE32") fct_main(["PushData"]) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE3") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE32") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") fct_main(["PullData"]) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE3") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE32") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") fct_main(["WipeRemoteData"]) fct_main(["PullData"]) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") fct_main(["PushData"]) fct_main(["WipeLocalData"]) self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt")) self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt")) fct_main(["PullData"]) self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt")) self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt")) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") def test_defect_cli_select_wrong_service(self): dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json" with self.assertRaises(dabdatasync.DataSyncException_ServiceNotFound): fct_main(["PullData", "--service", "WRONGSERVICE"]) def test_cli_select_service(self): dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json" with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") fct_main(["PushData", "--service", "Nextcloud"]) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE2") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE2") fct_main(["PullData", "--service", "Nextcloud"]) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE3") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE32") fct_main(["PushData", "--service", "Nextcloud"]) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE3") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE32") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") fct_main(["PullData", "--service", "Nextcloud"]) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE3") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE32") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") fct_main(["WipeRemoteData", "--service", "Nextcloud"]) fct_main(["PullData", "--service", "Nextcloud"]) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") fct_main(["PushData", "--service", "Nextcloud"]) fct_main(["WipeLocalData"]) self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt")) self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt")) fct_main(["PullData", "--service", "Nextcloud"]) self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt")) self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt")) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") def test_load_remote_empty(self): dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json" datasync = dabdatasync.DataSync_Factory.get_DataSync() datasync[0].wipe_remote_data() with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") datasync[0].pull_data() with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") def load_nextcloud_gen(self, compressor): dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud.json" datasync = dabdatasync.DataSync_Factory.get_DataSync() datasync[0].set_compressor(compressor) self.assertIsInstance(datasync, list) self.assertEqual(len(datasync), 1) self.assertIsInstance(datasync[0], dabdatasync.A_DataSync) self.assertIsInstance(datasync[0], dabdatasync.C_DataSync_NextCloud) self.assertEqual(len(datasync[0].get_datasync_records()), 2) self.assertEqual(datasync[0].get_datasync_records()[0].name, "SOTF_map") self.assertEqual(datasync[0].get_datasync_records()[1].name, "SOTF_map2") self.assertEqual(datasync[0].get_datasync_records()[0].rec_type, "fs") self.assertEqual(datasync[0].get_datasync_records()[1].rec_type, "fs") self.assertEqual(datasync[0].get_datasync_records()[0].value, "test/test_data") self.assertEqual(datasync[0].get_datasync_records()[1].value, "test/test_data2/SAVE_FILE.txt") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") datasync[0].push_data() with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE2") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE2") datasync[0].pull_data() with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "SAVED_VALUE") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE3") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE32") datasync[0].push_data() with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE3") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE32") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") datasync[0].pull_data() with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE3") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE32") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "w", encoding="utf-8") as testfile: testfile.write("MODIFIED_VALUE") with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") datasync[0].wipe_remote_data() datasync[0].pull_data() with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") datasync[0].push_data() datasync[0].wipe_local_data() self.assertFalse(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt")) self.assertFalse(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt")) datasync[0].pull_data() self.assertTrue(os_path.isfile(testdir_path / "test_data" / "SAVE_FILE.txt")) self.assertTrue(os_path.isfile(testdir_path / "test_data2" / "SAVE_FILE.txt")) with open(testdir_path / "test_data" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") with open(testdir_path / "test_data2" / "SAVE_FILE.txt", "rt", encoding="utf-8") as testfile: self.assertEqual(testfile.read(), "MODIFIED_VALUE") def test_load_nextcloud__tar_gz(self): with Timer() as t: self.load_nextcloud_gen("tar_gz") print(t.elapsed) def test_load_nextcloud__tar_lz4(self): with Timer() as t: self.load_nextcloud_gen("tar_lz4") print(t.elapsed) def test_load_empty(self): dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_empty.json" datasync = dabdatasync.DataSync_Factory.get_DataSync() self.assertIsInstance(datasync, list) self.assertEqual(len(datasync), 0) def test_load_nextcloud_disabled(self): dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_disabled.json" datasync = dabdatasync.DataSync_Factory.get_DataSync() self.assertIsInstance(datasync, list) self.assertEqual(len(datasync), 0) def test_defect_load_invalid(self): dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_invalid.json" with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest): dabdatasync.DataSync_Factory.get_DataSync() def test_defect_load_nextcloud_invalid(self): dabdatasync.DataSync_Factory.manifest_path = testdir_path / "test_manifest_nextcloud_invalid.json" with self.assertRaises(dabdatasync.DataSyncException_InvalidManifest): dabdatasync.DataSync_Factory.get_DataSync()