diff --git a/examples/json.cfg b/examples/json.cfg index 791c258..908ddb0 100644 --- a/examples/json.cfg +++ b/examples/json.cfg @@ -1,20 +1,23 @@ -[ - { - "Name": "BtrFS Backup Example", - "UUID": "12345678-1234-5678-1234-567812345678", - "DevicePassCmd": "echo device-password", - "BackupRepositoryFolder": "ButterBackupRepo", - "Compression": "zstd:3", - "Folders": { "/tmp": "temp-files" }, - "Files": [], - "FilesDest": "single-files" - }, - { - "Name": "Restic Backup Example", - "UUID": "87654321-4321-8765-4321-876543218765", - "DevicePassCmd": "echo device-password", - "BackupRepositoryFolder": "ResticRepo", - "RepositoryPassCmd": "echo repo-password", - "FilesAndFolders": ["/tmp"] - } -] +{ + "SudoPassCmd": "gpg --decrypt ~/.local/share/passwords/sudo-password.gpg", + "DeviceConfigurations": [ + { + "Name": "BtrFS Backup Example", + "UUID": "12345678-1234-5678-1234-567812345678", + "DevicePassCmd": "echo device-password", + "BackupRepositoryFolder": "ButterBackupRepo", + "Compression": "zstd:3", + "Folders": { "/tmp": "temp-files" }, + "Files": [], + "FilesDest": "single-files" + }, + { + "Name": "Restic Backup Example", + "UUID": "87654321-4321-8765-4321-876543218765", + "DevicePassCmd": "echo device-password", + "BackupRepositoryFolder": "ResticRepo", + "RepositoryPassCmd": "echo repo-password", + "FilesAndFolders": ["/tmp"] + } + ] +} diff --git a/examples/json5.cfg b/examples/json5.cfg index 9d9b21d..f8961cf 100644 --- a/examples/json5.cfg +++ b/examples/json5.cfg @@ -1,21 +1,24 @@ // JSON5 allows comments and trailing commas -[ - { - Name: "BtrFS Backup Example", - UUID: "12345678-1234-5678-1234-567812345678", - DevicePassCmd: "echo device-password", - BackupRepositoryFolder: "ButterBackupRepo", - Compression: "zstd:3", - Folders: { "/tmp": "temp-files" }, - Files: [], - FilesDest: "single-files", - }, - { - Name: "Restic Backup Example", - UUID: "87654321-4321-8765-4321-876543218765", - DevicePassCmd: "echo device-password", - BackupRepositoryFolder: "ResticRepo", - RepositoryPassCmd: "echo repo-password", - FilesAndFolders: ["/tmp"], - }, -] +{ + SudoPassCmd: "gpg --decrypt ~/.local/share/passwords/sudo-password.gpg", + DeviceConfigurations: [ + { + Name: "BtrFS Backup Example", + UUID: "12345678-1234-5678-1234-567812345678", + DevicePassCmd: "echo device-password", + BackupRepositoryFolder: "ButterBackupRepo", + Compression: "zstd:3", + Folders: { "/tmp": "temp-files" }, + Files: [], + FilesDest: "single-files", + }, + { + Name: "Restic Backup Example", + UUID: "87654321-4321-8765-4321-876543218765", + DevicePassCmd: "echo device-password", + BackupRepositoryFolder: "ResticRepo", + RepositoryPassCmd: "echo repo-password", + FilesAndFolders: ["/tmp"], + }, + ], +} diff --git a/examples/toml.cfg b/examples/toml.cfg index 9a92e10..f7e1ee1 100644 --- a/examples/toml.cfg +++ b/examples/toml.cfg @@ -1,7 +1,10 @@ # TOML configuration for butter-backup -# Each [[DEVICE_CONFIGURATION]] section defines one device configuration +# Each [[butter-backup.device-configurations]] section defines one device configuration -[[DEVICE_CONFIGURATION]] +[butter-backup] +SudoPassCmd = "gpg --decrypt ~/.local/share/passwords/sudo-password.gpg" + +[[butter-backup.device-configurations]] Name = "BtrFS Backup Example" UUID = "12345678-1234-5678-1234-567812345678" DevicePassCmd = "echo device-password" @@ -10,10 +13,10 @@ Compression = "zstd:3" Files = [] FilesDest = "single-files" -[DEVICE_CONFIGURATION.Folders] +[butter-backup.device-configurations.Folders] "/tmp" = "temp-files" -[[DEVICE_CONFIGURATION]] +[[butter-backup.device-configurations]] Name = "Restic Backup Example" UUID = "87654321-4321-8765-4321-876543218765" DevicePassCmd = "echo device-password" diff --git a/examples/yaml.cfg b/examples/yaml.cfg index f0299b5..628ba57 100644 --- a/examples/yaml.cfg +++ b/examples/yaml.cfg @@ -1,18 +1,20 @@ --- -- Name: "BtrFS Backup Example" - UUID: "12345678-1234-5678-1234-567812345678" - DevicePassCmd: "echo device-password" - BackupRepositoryFolder: "ButterBackupRepo" - Compression: "zstd:3" - Folders: - /tmp: "temp-files" - Files: [] - FilesDest: "single-files" +SudoPassCmd: "gpg --decrypt ~/.local/share/passwords/sudo-password.gpg" +DeviceConfigurations: + - Name: "BtrFS Backup Example" + UUID: "12345678-1234-5678-1234-567812345678" + DevicePassCmd: "echo device-password" + BackupRepositoryFolder: "ButterBackupRepo" + Compression: "zstd:3" + Folders: + /tmp: "temp-files" + Files: [] + FilesDest: "single-files" -- Name: "Restic Backup Example" - UUID: "87654321-4321-8765-4321-876543218765" - DevicePassCmd: "echo device-password" - BackupRepositoryFolder: "ResticRepo" - RepositoryPassCmd: "echo repo-password" - FilesAndFolders: - - /tmp + - Name: "Restic Backup Example" + UUID: "87654321-4321-8765-4321-876543218765" + DevicePassCmd: "echo device-password" + BackupRepositoryFolder: "ResticRepo" + RepositoryPassCmd: "echo repo-password" + FilesAndFolders: + - /tmp diff --git a/src/butter_backup/backup_backends.py b/src/butter_backup/backup_backends.py index 14a64e9..c26ee75 100644 --- a/src/butter_backup/backup_backends.py +++ b/src/butter_backup/backup_backends.py @@ -11,9 +11,16 @@ from . import config_parser as cp +def _refresh_sudo(sudo_pass_cmd: str | None) -> None: + if sudo_pass_cmd is not None: + sh.pipe_pass_cmd_to_real_cmd( + sudo_pass_cmd, ["sudo", "-Sv"], capture_output=True + ) + + class BackupBackend(abc.ABC): @abc.abstractmethod - def do_backup(self, mount_dir: Path) -> None: ... + def do_backup(self, mount_dir: Path, sudo_pass_cmd: str | None = None) -> None: ... @overload @staticmethod @@ -37,7 +44,7 @@ def from_config( class BtrFSRsyncBackend(BackupBackend): config: cp.BtrFSRsyncConfig - def do_backup(self, mount_dir: Path) -> None: + def do_backup(self, mount_dir: Path, sudo_pass_cmd: str | None = None) -> None: logger.info(f"Beginne mit BtrFS-Backup für Speichermedium {self.config.Name}.") backup_repository = mount_dir / self.config.BackupRepositoryFolder src_snapshot = self.get_source_snapshot(backup_repository) @@ -45,10 +52,12 @@ def do_backup(self, mount_dir: Path) -> None: backup_root = self.snapshot( src=src_snapshot, backup_repository=backup_repository ) + _refresh_sudo(sudo_pass_cmd) self.adapt_ownership(backup_root) for src, dest_name in self.config.Folders.items(): dest = backup_root / dest_name + _refresh_sudo(sudo_pass_cmd) self.rsync_folder(src, dest, self.config.ExcludePatternsFile) files_dest = backup_root / self.config.FilesDest @@ -56,6 +65,7 @@ def do_backup(self, mount_dir: Path) -> None: files_dest.unlink() files_dest.mkdir(parents=True, exist_ok=True) for src in self.config.Files: + _refresh_sudo(sudo_pass_cmd) self.rsync_file(src, files_dest) @staticmethod @@ -129,10 +139,12 @@ def rsync_folder( class ResticBackend(BackupBackend): config: cp.ResticConfig - def do_backup(self, mount_dir: Path) -> None: + def do_backup(self, mount_dir: Path, sudo_pass_cmd: str | None = None) -> None: logger.info(f"Beginne mit Restic-Backup für Speichermedium {self.config.Name}.") backup_repository = mount_dir / self.config.BackupRepositoryFolder + _refresh_sudo(sudo_pass_cmd) self.copy_files(backup_repository) + _refresh_sudo(sudo_pass_cmd) self.adapt_ownership(backup_repository) @staticmethod diff --git a/src/butter_backup/cli.py b/src/butter_backup/cli.py index 71dd2a4..0725dae 100644 --- a/src/butter_backup/cli.py +++ b/src/butter_backup/cli.py @@ -9,6 +9,7 @@ from tempfile import mkdtemp from typing import Any, Callable +import shell_interface as sh import storage_device_managers as sdm import typer from loguru import logger @@ -70,11 +71,18 @@ def _get_default_file_system(backend: ValidBackends) -> ValidFileSystems: t.assert_never(backend) +def _refresh_sudo(sudo_pass_cmd: str | None) -> None: + if sudo_pass_cmd is not None: + sh.pipe_pass_cmd_to_real_cmd( + sudo_pass_cmd, ["sudo", "-Sv"], capture_output=True + ) + + def _skip_device( - config: cp.Configuration, + config: cp.DeviceConfiguration, *, - log_missing: Callable[[cp.Configuration], None] | None = None, - log_opened: Callable[[cp.Configuration], None] | None = None, + log_missing: Callable[[cp.DeviceConfiguration], None] | None = None, + log_opened: Callable[[cp.DeviceConfiguration], None] | None = None, ) -> bool: """ Helper function to determine whether a device should be skipped. @@ -102,10 +110,13 @@ def _skip_device( VERBOSITY_OPTION = typer.Option(0, "--verbose", "-v", count=True) -def _open_device(cfg: cp.Configuration, base_dir: Path) -> None: +def _open_device( + cfg: cp.DeviceConfiguration, base_dir: Path, sudo_pass_cmd: str | None +) -> None: mount_dir = base_dir / cfg.Name mount_dir.mkdir(exist_ok=True) try: + _refresh_sudo(sudo_pass_cmd) decrypted = sdm.open_encrypted_device(cfg.device(), cfg.DevicePassCmd) sdm.mount_device(decrypted, mount_dir=mount_dir, compression=cfg.compression()) except: @@ -143,9 +154,9 @@ def open( # noqa: A001 Andernfalls wird ein temporäres Verzeichnis erstellt. """ setup_logging(verbose) - configurations = cp.parse_configuration(config.read_text()) + parsed_config = cp.parse_configuration(config.read_text()) base_dir = dest if dest is not None else Path(mkdtemp()) - for cfg in configurations: + for cfg in parsed_config.DeviceConfigurations: if _skip_device( cfg, log_opened=lambda cfg: logger.warning( @@ -153,7 +164,7 @@ def open( # noqa: A001 ), ): continue - _open_device(cfg, base_dir) + _open_device(cfg, base_dir, parsed_config.SudoPassCmd) @app.command() @@ -166,9 +177,9 @@ def close(config: Path = CONFIG_OPTION, verbose: int = VERBOSITY_OPTION) -> None `open`. Weitere Erklärungen finden sich dort. """ setup_logging(verbose) - configurations = cp.parse_configuration(config.read_text()) + parsed_config = cp.parse_configuration(config.read_text()) mounted_devices = sdm.get_mounted_devices() - for cfg in configurations: + for cfg in parsed_config.DeviceConfigurations: map_name = cfg.map_name() map_name_as_str = str(map_name) if cfg.device().exists() and map_name_as_str in mounted_devices: @@ -182,6 +193,7 @@ def close(config: Path = CONFIG_OPTION, verbose: int = VERBOSITY_OPTION) -> None device=cfg.Name, ) continue + _refresh_sudo(parsed_config.SudoPassCmd) sdm.unmount_device(map_name) sdm.close_decrypted_device(map_name) @@ -207,8 +219,8 @@ def backup(config: Path = CONFIG_OPTION, verbose: int = VERBOSITY_OPTION) -> Non weitere manuelle Schritte sind nicht nötig. """ setup_logging(verbose) - configurations = cp.parse_configuration(config.read_text()) - for cfg in configurations: + parsed_config = cp.parse_configuration(config.read_text()) + for cfg in parsed_config.DeviceConfigurations: if _skip_device( cfg, log_missing=lambda cfg: logger.info( @@ -220,9 +232,14 @@ def backup(config: Path = CONFIG_OPTION, verbose: int = VERBOSITY_OPTION) -> Non ): continue backend = bb.BackupBackend.from_config(cfg) + _refresh_sudo(parsed_config.SudoPassCmd) with sdm.decrypted_device(cfg.device(), cfg.DevicePassCmd) as decrypted: with sdm.mounted_device(decrypted, cfg.compression()) as mount_dir: - backend.do_backup(mount_dir) + backend.do_backup(mount_dir, parsed_config.SudoPassCmd) + # A backup could take so long that the sudo session expires. In this + # case the user would have to enter the password again to unmount and + # close the device. To prevent this, the sudo session is refreshed. + _refresh_sudo(parsed_config.SudoPassCmd) @app.command() @@ -278,7 +295,7 @@ def format_device( "Zieldatei für ButterBackup-Konfiguration existiert schon!" ) config_writer = config_to.write_text - config: cp.Configuration + config: cp.DeviceConfiguration match backend: case ValidBackends.btrfs_rsync: config = prepare_device_for_butterbackend(device) @@ -287,7 +304,8 @@ def format_device( case _: t.assert_never(backend) json_serialisable = json.loads(config.model_dump_json(exclude_none=True)) - config_writer(json.dumps([json_serialisable], indent=4, sort_keys=True)) + wrapper = {"DeviceConfigurations": [json_serialisable]} + config_writer(json.dumps(wrapper, indent=4, sort_keys=True)) @app.command() diff --git a/src/butter_backup/config_parser.py b/src/butter_backup/config_parser.py index 9ffe23b..8b079ff 100644 --- a/src/butter_backup/config_parser.py +++ b/src/butter_backup/config_parser.py @@ -17,7 +17,6 @@ DirectoryPath, Field, FilePath, - RootModel, field_validator, model_validator, ) @@ -178,15 +177,17 @@ def compression(self) -> None: return None -Configuration = BtrFSRsyncConfig | ResticConfig +DeviceConfiguration = BtrFSRsyncConfig | ResticConfig -class ConfigurationList(RootModel[list[Configuration]]): - root: list[Configuration] +class Configuration(BaseModel): + model_config = ConfigDict(frozen=True) + DeviceConfigurations: list[DeviceConfiguration] + SudoPassCmd: str | None = None @model_validator(mode="after") - def check_unique_names(self) -> "ConfigurationList": - name_counts = Counter(cfg.Name for cfg in self.root) + def check_unique_names(self) -> t.Self: + name_counts = Counter(cfg.Name for cfg in self.DeviceConfigurations) duplicates = [name for name, count in name_counts.items() if count > 1] if duplicates: raise ValueError( @@ -205,7 +206,8 @@ def _parse_as_json5(content: str) -> Any: def _parse_as_toml(content: str) -> Any: data = tomllib.loads(content) - return data["DEVICE_CONFIGURATION"] + bb = data["butter-backup"] + return {"DeviceConfigurations": bb["device-configurations"]} def _parse_as_yaml(content: str) -> Any: @@ -220,15 +222,15 @@ def _parse_as_yaml(content: str) -> Any: ] -def parse_configuration(content: str) -> list[Configuration]: +def parse_configuration(content: str) -> Configuration: for parse_fn, exc_type in _PARSERS: try: raw = parse_fn(content) except exc_type: continue - config_lst = ConfigurationList.model_validate(raw) - if len(config_lst.root) == 0: + config = Configuration.model_validate(raw) + if len(config.DeviceConfigurations) == 0: sys.exit("Leere Konfigurationsdateien sind nicht erlaubt.\n") - return config_lst.root + return config sys.exit("Konfigurationsdatei konnte nicht gelesen werden.\n") diff --git a/tests/__init__.py b/tests/__init__.py index dff53ca..078256d 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -23,8 +23,14 @@ def complement_configuration( def complement_configuration( - config: cp.Configuration, source_dir: Path -) -> cp.Configuration: + config: cp.DeviceConfiguration, source_dir: Path +) -> cp.DeviceConfiguration: + """ + Register to configuration some files and folders that should be backed up + + This is used to test the handling of both files and folders in the backup backends. + """ + folders_root = source_dir / "backup-root" single_files = { source_dir / "config" / "docker" / "daemon.json", diff --git a/tests/cli/__init__.py b/tests/cli/__init__.py new file mode 100644 index 0000000..e303a3f --- /dev/null +++ b/tests/cli/__init__.py @@ -0,0 +1,35 @@ +import typing as t +from pathlib import Path + +from butter_backup import config_parser as cp + + +def in_docker_container() -> bool: + return Path("/.dockerenv").exists() + + +def prepare_tmp_path(config: cp.DeviceConfiguration, parent: Path) -> None: + if isinstance(config, cp.BtrFSRsyncConfig): + _prepare_tmp_path_for_btrfs(config, parent) + elif isinstance(config, cp.ResticConfig): + _prepare_tmp_path_for_restic(config) + else: + t.assert_never(config) + + +def _prepare_tmp_path_for_btrfs(config: cp.BtrFSRsyncConfig, parent: Path) -> None: + for cur in config.Folders: + cur.mkdir(exist_ok=True) + for cur in config.Files: + cur.parent.mkdir(exist_ok=True, parents=True) + cur.touch() + (parent / config.FilesDest).mkdir(exist_ok=True) + + +def _prepare_tmp_path_for_restic(config: cp.ResticConfig) -> None: + for cur in config.FilesAndFolders: + # For the purpose of the test that uses this helper function, + # test_do_backup_refuses_backup_when_device_is_already_open, it does not matter + # what the content of tmp_path is as long as the configuration can be parsed. + # Therefore, all items will be folders, since this is much easier to achieve. + cur.mkdir(parents=True, exist_ok=True) diff --git a/tests/test_cli.py b/tests/cli/test_cli_commands.py similarity index 89% rename from tests/test_cli.py rename to tests/cli/test_cli_commands.py index f2876bd..b2fe186 100644 --- a/tests/test_cli.py +++ b/tests/cli/test_cli_commands.py @@ -1,7 +1,6 @@ import datetime as dt import re import time -import typing as t from pathlib import Path from tempfile import NamedTemporaryFile, TemporaryDirectory from unittest import mock @@ -17,36 +16,7 @@ from butter_backup.cli import app from tests import complement_configuration, get_random_filename - -def in_docker_container() -> bool: - return Path("/.dockerenv").exists() - - -def prepare_tmp_path(config: cp.Configuration, parent: Path) -> None: - if isinstance(config, cp.BtrFSRsyncConfig): - prepare_tmp_path_for_btrfs(config, parent) - elif isinstance(config, cp.ResticConfig): - prepare_tmp_path_for_restic(config) - else: - t.assert_never(config) - - -def prepare_tmp_path_for_btrfs(config: cp.BtrFSRsyncConfig, parent: Path) -> None: - for cur in config.Folders: - cur.mkdir(exist_ok=True) - for cur in config.Files: - cur.parent.mkdir(exist_ok=True, parents=True) - cur.touch() - (parent / config.FilesDest).mkdir(exist_ok=True) - - -def prepare_tmp_path_for_restic(config: cp.ResticConfig) -> None: - for cur in config.FilesAndFolders: - # For the purpose of the test that uses this helper function, - # test_do_backup_refuses_backup_when_device_is_already_open, it does not matter - # what the content of tmp_path is as long as the configuration can be parsed. - # Therefore, all items will be folders, since this is much easier to achieve. - cur.mkdir(parents=True, exist_ok=True) +from . import in_docker_container, prepare_tmp_path def wait_until_gone(p: Path, timeout: dt.timedelta = dt.timedelta(seconds=3)) -> None: @@ -184,7 +154,8 @@ def test_close_does_not_close_unopened_device(runner, encrypted_btrfs_device) -> config = encrypted_btrfs_device with NamedTemporaryFile() as tempf: config_file = Path(tempf.name) - config_file.write_text(f"[{config.model_dump_json()}]") + wrapped_config = cp.Configuration(DeviceConfigurations=[config]) + config_file.write_text(wrapped_config.model_dump_json()) close_result = runner.invoke(app, ["close", "--config", str(config_file)]) assert close_result.stdout == "" assert close_result.exit_code == 0 @@ -198,7 +169,8 @@ def test_open_close_roundtrip(runner, encrypted_device) -> None: expected_cryptsetup_map = Path(f"/dev/mapper/{config.UUID}") with NamedTemporaryFile() as tempf: config_file = Path(tempf.name) - config_file.write_text(f"[{config.model_dump_json()}]") + wrapped_config = cp.Configuration(DeviceConfigurations=[config]) + config_file.write_text(wrapped_config.model_dump_json()) open_result = runner.invoke(app, ["open", "--config", str(config_file)]) expected_msg = ( f"Speichermedium {config.Name} wurde in (?P/[^ ]+) geöffnet." @@ -227,7 +199,8 @@ def test_open_with_explicit_dest( config = encrypted_device expected_cryptsetup_map = Path(f"/dev/mapper/{config.UUID}") config_file = tmp_path / "config.json" - config_file.write_text(f"[{config.model_dump_json()}]") + wrapped_config = cp.Configuration(DeviceConfigurations=[config]) + config_file.write_text(wrapped_config.model_dump_json()) dest_dir = tmp_path / "mounts" dest_dir.mkdir() expected_mount_dir = dest_dir / config.Name @@ -256,7 +229,8 @@ def test_open_shows_error_on_failure(runner, encrypted_device, tmp_path: Path) - update={"DevicePassCmd": "echo wrong_password"} ) config_file = tmp_path / "config.json" - config_file.write_text(f"[{config.model_dump_json()}]") + wrapped_config = cp.Configuration(DeviceConfigurations=[config]) + config_file.write_text(wrapped_config.model_dump_json()) dest_dir = tmp_path / "mounts" dest_dir.mkdir() open_result = runner.invoke( @@ -335,7 +309,8 @@ def test_format_device_creates_expected_file_system( ) assert format_result.exit_code == 0 serialised_config = format_result.stdout - config_lst = list(cp.parse_configuration(serialised_config)) + parsed = cp.parse_configuration(serialised_config) + config_lst = parsed.DeviceConfigurations assert len(config_lst) == 1 config = config_lst[0] with sdm.decrypted_device(big_file, config.DevicePassCmd) as decrypted: @@ -347,7 +322,8 @@ def test_format_device_creates_expected_file_system( def test_format_device(runner, backend: str, big_file: Path) -> None: format_result = runner.invoke(app, ["format-device", backend, str(big_file)]) serialised_config = format_result.stdout - config_lst = list(cp.parse_configuration(serialised_config)) + parsed = cp.parse_configuration(serialised_config) + config_lst = parsed.DeviceConfigurations assert len(config_lst) == 1 device_uuid = config_lst[0].UUID device_name = config_lst[0].Name @@ -371,7 +347,8 @@ def test_format_device_chowns_filesystem_to_user( ) -> None: format_result = runner.invoke(app, ["format-device", backend, str(big_file)]) serialised_config = format_result.stdout - config_lst = list(cp.parse_configuration(serialised_config)) + parsed = cp.parse_configuration(serialised_config) + config_lst = parsed.DeviceConfigurations assert len(config_lst) == 1 config = config_lst[0] @@ -405,7 +382,8 @@ def test_do_backup_refuses_backup_when_device_is_already_open( config_file = tmp_path / "config.json" - config_file.write_text(f"[{config.model_dump_json()}]") + wrapped_config = cp.Configuration(DeviceConfigurations=[config]) + config_file.write_text(wrapped_config.model_dump_json()) runner.invoke(app, ["open", "--config", str(config_file)]) result = runner.invoke(app, [subprogram, "--config", str(config_file)]) expected_msg = ( @@ -417,7 +395,7 @@ def test_do_backup_refuses_backup_when_device_is_already_open( def test_unmount_error_does_not_cause_content_deletion( - runner: CliRunner, encrypted_device: cp.Configuration, tmp_path: Path, mocker + runner: CliRunner, encrypted_device: cp.DeviceConfiguration, tmp_path: Path, mocker ) -> None: # THIS IS A REGRESSION TEST! # @@ -439,7 +417,8 @@ def test_unmount_error_does_not_cause_content_deletion( config = complement_configuration(encrypted_device, tmp_path) prepare_tmp_path(config, tmp_path) config_file = tmp_path / "config.json" - config_file.write_text(f"[{config.model_dump_json()}]") + wrapped = cp.Configuration(DeviceConfigurations=[config]) + config_file.write_text(wrapped.model_dump_json()) result = runner.invoke(app, ["backup", "--config", str(config_file)]) assert result.exit_code == 1 diff --git a/tests/cli/test_sudo_pass_cmd.py b/tests/cli/test_sudo_pass_cmd.py new file mode 100644 index 0000000..1ae9d23 --- /dev/null +++ b/tests/cli/test_sudo_pass_cmd.py @@ -0,0 +1,175 @@ +import os +import typing as t +from pathlib import Path + +import pytest +import shell_interface as sh +from click.testing import Result +from typer.testing import CliRunner + +from butter_backup import config_parser as cp +from butter_backup.cli import app +from tests import complement_configuration + +from . import in_docker_container, prepare_tmp_path + +_SUDO_PASS_CMD = os.environ.get("BUTTERBACKUP_SUDO_PASSCMD") +_requires_sudo_pass_cmd = pytest.mark.skipif( + _SUDO_PASS_CMD is None, + reason="BUTTERBACKUP_SUDO_PASSCMD environment variable is not set", +) + + +@pytest.fixture +def runner(): + return CliRunner() + + +def _invalidate_sudo_session() -> None: + sh.run_cmd(cmd=["sudo", "-k"]) + + +def test_sudo_pass_cmd_is_used_in_open( + runner: CliRunner, + encrypted_device: cp.DeviceConfiguration, + mocker, + tmp_path: Path, +) -> None: + sudo_pass_cmd = "echo test_password" + wrapped_config = cp.Configuration( + DeviceConfigurations=[encrypted_device], SudoPassCmd=sudo_pass_cmd + ) + config_file = tmp_path / "config.json" + config_file.write_text(wrapped_config.model_dump_json()) + dest_dir = tmp_path / "mounts" + dest_dir.mkdir() + + mock_pipe = mocker.patch("shell_interface.pipe_pass_cmd_to_real_cmd") + mocker.patch( + "storage_device_managers.open_encrypted_device", + return_value=Path("/dev/mapper/test"), + ) + mocker.patch("storage_device_managers.mount_device") + + runner.invoke(app, ["open", str(dest_dir), "--config", str(config_file)]) + + mock_pipe.assert_called_once_with( + sudo_pass_cmd, ["sudo", "-Sv"], capture_output=True + ) + + +def test_sudo_pass_cmd_is_used_in_backup( + runner: CliRunner, + encrypted_device: cp.DeviceConfiguration, + mocker, + tmp_path: Path, +) -> None: + sudo_pass_cmd = "echo test_password" + wrapped_config = cp.Configuration( + DeviceConfigurations=[encrypted_device], SudoPassCmd=sudo_pass_cmd + ) + config_file = tmp_path / "config.json" + config_file.write_text(wrapped_config.model_dump_json()) + + mock_pipe = mocker.patch("shell_interface.pipe_pass_cmd_to_real_cmd") + mocker.patch("storage_device_managers.decrypted_device") + mocker.patch("storage_device_managers.mounted_device") + mocker.patch( + "butter_backup.backup_backends.BackupBackend.from_config", + return_value=mocker.MagicMock(), + ) + + runner.invoke(app, ["backup", "--config", str(config_file)]) + + expected_nof_calls = 2 # One before opening the device and one post backup + result_calls = mock_pipe.call_args_list + expected_calls = [ + ((sudo_pass_cmd, ["sudo", "-Sv"]), {"capture_output": True}) + ] * expected_nof_calls + assert result_calls == expected_calls + + +def test_sudo_pass_cmd_is_used_in_close( + runner: CliRunner, + encrypted_device: cp.DeviceConfiguration, + mocker, + tmp_path: Path, +) -> None: + sudo_pass_cmd = "echo test_password" + wrapped_config = cp.Configuration( + DeviceConfigurations=[encrypted_device], SudoPassCmd=sudo_pass_cmd + ) + config_file = tmp_path / "config.json" + config_file.write_text(wrapped_config.model_dump_json()) + map_name = str(encrypted_device.map_name()) + + mock_pipe = mocker.patch("shell_interface.pipe_pass_cmd_to_real_cmd") + mocker.patch( + "storage_device_managers.get_mounted_devices", + return_value={map_name: [tmp_path / "mnt"]}, + ) + mocker.patch("storage_device_managers.unmount_device") + mocker.patch("storage_device_managers.close_decrypted_device") + + runner.invoke(app, ["close", "--config", str(config_file)]) + + mock_pipe.assert_called_once_with( + sudo_pass_cmd, ["sudo", "-Sv"], capture_output=True + ) + + +@_requires_sudo_pass_cmd +@pytest.mark.skipif( + in_docker_container(), reason="Test is known to fail in Docker container" +) +@pytest.mark.parametrize( + "command,has_failed", + [ + ( + "open", + lambda config, result: ( + f"Speichermedium {config.Name} konnte nicht geöffnet werden." + in result.stdout + ), + ), + ("backup", lambda _, result: result.exit_code != 0), + ("close", lambda _, result: result.exit_code != 0), + ], +) +def test_open_requires_correct_sudo_pass_cmd( + runner: CliRunner, + encrypted_device: cp.DeviceConfiguration, + command: str, + has_failed: t.Callable[[cp.DeviceConfiguration, Result], bool], + tmp_path: Path, +) -> None: + assert _SUDO_PASS_CMD is not None + config = complement_configuration(encrypted_device, tmp_path) + prepare_tmp_path(config, tmp_path) + correct_config = cp.Configuration( + DeviceConfigurations=[config], SudoPassCmd=_SUDO_PASS_CMD + ) + correct_config_file = tmp_path / "correct_config.json" + correct_config_file.write_text(correct_config.model_dump_json()) + wrong_config = cp.Configuration(DeviceConfigurations=[config], SudoPassCmd="false") + wrong_config_file = tmp_path / "wrong_config.json" + wrong_config_file.write_text(wrong_config.model_dump_json()) + + # Wrong pass-cmd: pipe_pass_cmd_to_real_cmd raises PassCmdError since "false" + # exits 1. The open command catches it and prints the failure message. + _invalidate_sudo_session() + if command == "close": + # The device needs to be open for the close command to pick up the device at + # all. + runner.invoke(app, ["open", "--config", str(correct_config_file)]) + + wrong_result = runner.invoke(app, [command, "--config", str(wrong_config_file)]) + assert has_failed(config, wrong_result) + + # Correct password: open should succeed + _invalidate_sudo_session() + correct_result = runner.invoke(app, [command, "--config", str(correct_config_file)]) + assert correct_result.exit_code == 0 + + # Cleanup + runner.invoke(app, ["close", "--config", str(correct_config_file)]) diff --git a/tests/config_parser/test_btrfs_config.py b/tests/config_parser/test_btrfs_config.py index 1ee7971..b9492fe 100644 --- a/tests/config_parser/test_btrfs_config.py +++ b/tests/config_parser/test_btrfs_config.py @@ -1,4 +1,5 @@ import re +import typing as t from pathlib import Path from tempfile import NamedTemporaryFile, TemporaryDirectory @@ -15,25 +16,8 @@ EXCLUDE_FILE = TEST_RESOURCES / "exclude-file" -@st.composite -def valid_unparsed_empty_btrfs_config(draw): - config = draw( - st.fixed_dictionaries( - { - "BackupRepositoryFolder": st.text(), - "Compression": st.sampled_from( - [cur.value for cur in ValidCompressions] - ), - "ExcludePatternsFile": st.just(str(EXCLUDE_FILE)) | st.none(), - "DevicePassCmd": st.text(), - "Files": st.just([]), - "FilesDest": st.text(), - "Folders": st.just({}), - "UUID": st.uuids().map(str), - } - ) - ) - return config +def valid_unparsed_empty_btrfs_config() -> st.SearchStrategy[dict[str, t.Any]]: + return hu.valid_unparsed_empty_btrfs_config(EXCLUDE_FILE) @given(base_config=valid_unparsed_empty_btrfs_config(), dest_dir=hu.filenames()) @@ -51,7 +35,7 @@ def test_btrfs_config_rejects_file_dest_collision(base_config, dest_dir: str): @given(base_config=valid_unparsed_empty_btrfs_config(), file_name=hu.filenames()) -def test_btrfs_config_rejects_filename_collision(base_config, file_name): +def test_btrfs_config_rejects_filename_collision(base_config, file_name: str): base_config["Folders"] = {} with TemporaryDirectory() as td1: with TemporaryDirectory() as td2: diff --git a/tests/config_parser/test_parse_configuration.py b/tests/config_parser/test_parse_configuration.py index 57f00d3..343cab0 100644 --- a/tests/config_parser/test_parse_configuration.py +++ b/tests/config_parser/test_parse_configuration.py @@ -1,7 +1,7 @@ import json +import typing as t from pathlib import Path from tempfile import TemporaryDirectory -from uuid import UUID import pytest from hypothesis import given @@ -29,69 +29,38 @@ def test_example_files_can_be_parsed(example_file: Path) -> None: content = example_file.read_text() result = cp.parse_configuration(content) expected_names = ["BtrFS Backup Example", "Restic Backup Example"] - assert [cfg.Name for cfg in result] == expected_names + assert [cfg.Name for cfg in result.DeviceConfigurations] == expected_names @given( - backup_dest_dirs=st.lists(st.text(), min_size=2, max_size=2, unique=True), - backup_repository_folder=st.text(), - name=hu.valid_path_components(), - pass_cmd=st.text(), - uuid=st.uuids(), + first_config=hu.valid_empty_btrfs_config(None) | hu.valid_empty_restic_config(None), + second_config=hu.valid_empty_btrfs_config(None) + | hu.valid_empty_restic_config(None), + shared_name=hu.valid_path_components(), ) -def test_parse_configuration_rejects_duplicate_names_for_btrfs_rsync( - backup_dest_dirs: list[str], - backup_repository_folder: str, - name: str, - pass_cmd: str, - uuid: UUID, +def test_parse_configuration_rejects_duplicate_names( + first_config: cp.BtrFSRsyncConfig | cp.ResticConfig, + second_config: cp.BtrFSRsyncConfig | cp.ResticConfig, + shared_name: str, ) -> None: - with TemporaryDirectory() as source: - btrfs_cfg = cp.BtrFSRsyncConfig( - BackupRepositoryFolder=backup_repository_folder, - DevicePassCmd=pass_cmd, - Files=set(), - FilesDest=backup_dest_dirs[1], - Folders={Path(source): backup_dest_dirs[0]}, - Name=name, - UUID=uuid, - ) - btrfs_cfg_json = btrfs_cfg.model_dump_json() - with pytest.raises(ValidationError): - cp.parse_configuration(f"[{btrfs_cfg_json}, {btrfs_cfg_json}]") - - -@given( - backup_repository_folder=st.text(), - device_pass_cmd=st.text(), - name=hu.valid_path_components(), - repository_pass_cmd=st.text(), - uuid=st.uuids(), -) -def test_parse_configuration_rejects_duplicate_names_for_restic( - backup_repository_folder: str, - device_pass_cmd: str, - name: str, - repository_pass_cmd: str, - uuid: UUID, -) -> None: - with TemporaryDirectory() as source: - restic_cfg = cp.ResticConfig( - BackupRepositoryFolder=backup_repository_folder, - DevicePassCmd=device_pass_cmd, - FilesAndFolders={Path(source)}, - Name=name, - RepositoryPassCmd=repository_pass_cmd, - UUID=uuid, + first_config = first_config.model_copy(update={"Name": shared_name}) + second_config = second_config.model_copy(update={"Name": shared_name}) + with pytest.raises(ValidationError): + cp.parse_configuration( + json.dumps( + { + "DeviceConfigurations": [ + first_config.model_dump(mode="json"), + second_config.model_dump(mode="json"), + ] + } + ) ) - restic_cfg_json = restic_cfg.model_dump_json() - with pytest.raises(ValidationError): - cp.parse_configuration(f"[{restic_cfg_json}, {restic_cfg_json}]") def test_parse_configuration_rejects_empty_list() -> None: with pytest.raises(SystemExit) as sysexit: - cp.parse_configuration("[]") + cp.parse_configuration(json.dumps({"DeviceConfigurations": []})) assert sysexit.value.code not in SUCCESS_CODES @@ -102,64 +71,60 @@ def test_parse_configuration_rejects_empty_list() -> None: ) def test_parse_configuration_warns_on_non_lists(non_list) -> None: with pytest.raises(ValidationError): - cp.parse_configuration(json.dumps(non_list)) + cp.parse_configuration(json.dumps({"DeviceConfigurations": non_list})) def test_parse_configuration_warns_on_non_dict_item() -> None: with pytest.raises(ValidationError): - cp.parse_configuration(json.dumps([{}, 1337])) + cp.parse_configuration(json.dumps({"DeviceConfigurations": [{}, 1337]})) @given( - backup_dest_dirs=st.lists(st.text(), min_size=2, max_size=2, unique=True), - backup_repository_folder=st.text(), - name=hu.valid_path_components(), - pass_cmd=st.text(), - uuid=st.uuids(), + base_config=hu.valid_unparsed_empty_btrfs_config(None), + files_folders_dest=st.lists( + hu.valid_path_components(), min_size=2, max_size=2, unique=True + ), ) def test_parse_configuration_parses_btrfs_config( - backup_dest_dirs: list[str], - backup_repository_folder: str, - name: str, - pass_cmd: str, - uuid: UUID, + base_config: dict[str, t.Any], files_folders_dest: list[str] ) -> None: + files_dest, folders_dest = files_folders_dest with TemporaryDirectory() as source: - btrfs_cfg = cp.BtrFSRsyncConfig( - BackupRepositoryFolder=backup_repository_folder, - DevicePassCmd=pass_cmd, - Files=set(), - FilesDest=backup_dest_dirs[1], - Folders={Path(source): backup_dest_dirs[0]}, - Name=name, - UUID=uuid, - ) - cfg_lst = cp.parse_configuration(f"[{btrfs_cfg.model_dump_json()}]") - assert cfg_lst == [btrfs_cfg] + base_config.update({"Folders": {source: folders_dest}, "FilesDest": files_dest}) + btrfs_cfg = cp.BtrFSRsyncConfig.model_validate(base_config) + cfg = cp.Configuration(DeviceConfigurations=[btrfs_cfg]) + result = cp.parse_configuration(cfg.model_dump_json()) + assert result == cfg @given( - backup_repository_folder=st.text(), - device_pass_cmd=st.text(), - name=hu.valid_path_components(), - repository_pass_cmd=st.text(), - uuid=st.uuids(), + base_config=hu.valid_unparsed_empty_restic_config(None), ) -def test_load_configuration_parses_restic_config( - backup_repository_folder: str, - device_pass_cmd: str, - name: str, - repository_pass_cmd: str, - uuid: UUID, -) -> None: +def test_load_configuration_parses_restic_config(base_config: dict[str, t.Any]) -> None: with TemporaryDirectory() as source: - restic_cfg = cp.ResticConfig( - BackupRepositoryFolder=backup_repository_folder, - DevicePassCmd=device_pass_cmd, - FilesAndFolders={Path(source)}, - Name=name, - RepositoryPassCmd=repository_pass_cmd, - UUID=uuid, - ) - cfg_lst = cp.parse_configuration(f"[{restic_cfg.model_dump_json()}]") - assert cfg_lst == [restic_cfg] + base_config["FilesAndFolders"] = [source] + restic_cfg = cp.ResticConfig.model_validate(base_config) + cfg = cp.Configuration(DeviceConfigurations=[restic_cfg]) + result = cp.parse_configuration(cfg.model_dump_json()) + assert result == cfg + + +@given( + device_configurations=st.lists( + hu.valid_empty_restic_config(None) | hu.valid_empty_btrfs_config(None), + min_size=1, + unique_by=lambda cfg: cfg.Name, + ), + sudo_pass_cmd=st.text(), +) +def test_parse_configuration_with_sudo_pass_cmd( + device_configurations: list[cp.ResticConfig | cp.BtrFSRsyncConfig], + sudo_pass_cmd: str, +) -> None: + cfg = cp.Configuration( + DeviceConfigurations=device_configurations, + SudoPassCmd=sudo_pass_cmd, + ) + raw = cfg.model_dump_json() + result = cp.parse_configuration(raw) + assert result == cfg diff --git a/tests/config_parser/test_restic_config.py b/tests/config_parser/test_restic_config.py index e5b5faa..4f93a97 100644 --- a/tests/config_parser/test_restic_config.py +++ b/tests/config_parser/test_restic_config.py @@ -1,4 +1,3 @@ -import json from pathlib import Path from tempfile import NamedTemporaryFile, TemporaryDirectory @@ -15,21 +14,10 @@ EXCLUDE_FILE = TEST_RESOURCES / "exclude-file" -@st.composite -def valid_unparsed_empty_restic_config(draw): - config = draw( - st.builds( - cp.ResticConfig, - BackupRepositoryFolder=st.text(), - ExcludePatternsFile=st.just(str(EXCLUDE_FILE)) | st.none(), - DevicePassCmd=st.text(), - FilesAndFolders=st.just([]), - Name=hu.valid_path_components(), - RepositoryPassCmd=st.text(), - UUID=st.uuids(), - ) - ) - return json.loads(config.model_dump_json()) +def valid_unparsed_empty_restic_config() -> st.SearchStrategy[ + dict[str, str | Path | None] +]: + return hu.valid_unparsed_empty_restic_config(EXCLUDE_FILE) @given(base_config=valid_unparsed_empty_restic_config()) diff --git a/tests/conftest.py b/tests/conftest.py index 62f7ee8..3588ba1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -111,13 +111,13 @@ def encrypted_restic_device( @pytest.fixture(params=["encrypted_btrfs_device", "encrypted_restic_device"]) -def encrypted_device(request) -> cp.Configuration: - config: cp.Configuration = request.getfixturevalue(request.param) +def encrypted_device(request) -> cp.DeviceConfiguration: + config: cp.DeviceConfiguration = request.getfixturevalue(request.param) return config @pytest.fixture -def mounted_device(encrypted_device) -> t.Iterator[tuple[cp.Configuration, Path]]: +def mounted_device(encrypted_device) -> t.Iterator[tuple[cp.DeviceConfiguration, Path]]: config = encrypted_device with sdm.decrypted_device(config.device(), config.DevicePassCmd) as decrypted: with sdm.mounted_device(decrypted, config.compression()) as mounted_device: diff --git a/tests/hypothesis_utils.py b/tests/hypothesis_utils.py index 9b45dcf..dbdcb1b 100644 --- a/tests/hypothesis_utils.py +++ b/tests/hypothesis_utils.py @@ -1,25 +1,75 @@ +import typing as t +from pathlib import Path + from hypothesis import strategies as st +from storage_device_managers import ValidCompressions + +from butter_backup import config_parser as cp -@st.composite -def filenames(draw, min_size=1) -> str: +def filenames(min_size=1) -> st.SearchStrategy[str]: alpha = "abcdefghijklmnopqrstuvwxyzäöu" num = "01234567890" special = "_-.,() " permitted_chars = f"{alpha}{alpha.upper()}{num}{special}" - fname: str = draw( - st.text(permitted_chars, min_size=min_size).filter( - lambda fname: fname not in {".", ".."} - ) + return st.text(permitted_chars, min_size=min_size).filter( + lambda fname: fname not in {".", ".."} + ) + + +def valid_path_components(min_size=1) -> st.SearchStrategy[str]: + return st.text(min_size=min_size, max_size=128).filter( + lambda n: "/" not in n and "\x00" not in n and n not in {".", ".."} + ) + + +def valid_unparsed_empty_btrfs_config( + exclude_file: Path | None, +) -> st.SearchStrategy[dict[str, t.Any]]: + return valid_empty_btrfs_config(exclude_file).map( + lambda config: config.model_dump(mode="json") + ) + + +def valid_empty_btrfs_config( + exclude_file: Path | None, +) -> st.SearchStrategy[cp.BtrFSRsyncConfig]: + return st.builds( + cp.BtrFSRsyncConfig, + Name=st.none() | valid_path_components(), + BackupRepositoryFolder=valid_path_components(), + Compression=valid_compressions(), + ExcludePatternsFile=st.just(exclude_file), + DevicePassCmd=st.text(), + Files=st.just([]), + FilesDest=st.text(), + Folders=st.just({}), + UUID=st.uuids().map(str), + ) + + +def valid_compressions() -> st.SearchStrategy[ValidCompressions]: + return st.sampled_from(list(ValidCompressions)) + + +def valid_empty_restic_config( + exclude_file: Path | None, +) -> st.SearchStrategy[cp.ResticConfig]: + return st.builds( + cp.ResticConfig, + BackupRepositoryFolder=st.text(), + ExcludePatternsFile=st.just(exclude_file), + DevicePassCmd=st.text(), + FilesAndFolders=st.just([]), + Name=st.none() | valid_path_components(), + RepositoryPassCmd=st.text(), + UUID=st.uuids(), ) - return fname -@st.composite -def valid_path_components(draw, min_size=1) -> str: - name: str = draw( - st.text(min_size=min_size, max_size=128).filter( - lambda n: "/" not in n and "\x00" not in n and n not in {".", ".."} - ) +def valid_unparsed_empty_restic_config( + exclude_file: Path | None, +) -> st.SearchStrategy[dict[str, t.Any]]: + return valid_empty_restic_config(exclude_file).map( + lambda config: config.model_dump(mode="json") ) - return name diff --git a/tests/test_backup_backends.py b/tests/test_backup_backends.py index 2165e9f..1974f3b 100644 --- a/tests/test_backup_backends.py +++ b/tests/test_backup_backends.py @@ -6,6 +6,7 @@ from pathlib import Path from tempfile import TemporaryDirectory from typing import Iterable, overload +from uuid import uuid4 import pytest import shell_interface as sh @@ -28,11 +29,11 @@ def list_files_recursively(path: Path) -> Iterable[Path]: def run_backup_cycle( - base_config: cp.Configuration, + base_config: cp.DeviceConfiguration, source_dir: Path, device: Path, config_extension: dict[str, t.Any] | None = None, -) -> cp.Configuration: +) -> cp.DeviceConfiguration: config = complement_configuration(base_config, source_dir) if config_extension is not None: config = config.model_copy(update=config_extension) @@ -54,7 +55,7 @@ def get_expected_content( def get_expected_content( - config: cp.Configuration, + config: cp.DeviceConfiguration, exclude_to_ignore_file: bool, ) -> Counter[bytes] | dict[Path, bytes]: match config: @@ -116,7 +117,7 @@ def get_result_content(config: cp.ResticConfig, mounted: Path) -> Counter[bytes] def get_result_content( - config: cp.Configuration, mounted: Path + config: cp.DeviceConfiguration, mounted: Path ) -> Counter[bytes] | dict[Path, bytes]: match config: case cp.BtrFSRsyncConfig(): @@ -315,3 +316,55 @@ def test_do_backup_for_restic_adapts_ownership( } assert found_user == {expected_user} assert found_group == {expected_group} + + +def test_btrfs_backend_refreshes_sudo_session_in_do_backup( + mocker, tmp_path: Path +) -> None: + sudo_pass_cmd = "echo test_password" + config = cp.BtrFSRsyncConfig( + BackupRepositoryFolder="repo", + DevicePassCmd="echo pass", + Files=set(), + FilesDest="files", + Folders={}, + Name="test-device", + UUID=uuid4(), + ) + backend = bb.BtrFSRsyncBackend(config=config) + mock_refresh = mocker.patch("butter_backup.backup_backends._refresh_sudo") + mocker.patch.object( + bb.BtrFSRsyncBackend, "get_source_snapshot", return_value=tmp_path + ) + mocker.patch.object(bb.BtrFSRsyncBackend, "snapshot", return_value=tmp_path) + mocker.patch.object(bb.BtrFSRsyncBackend, "adapt_ownership") + + backend.do_backup(tmp_path, sudo_pass_cmd) + + # Only one call before adapting ownership. The other calls are not covered by this + # test to keep the setup simple. + mock_refresh.assert_called_once_with(sudo_pass_cmd) + + +def test_restic_backend_refreshes_sudo_session_in_do_backup( + mocker, tmp_path: Path +) -> None: + sudo_pass_cmd = "echo test_password" + config = cp.ResticConfig( + BackupRepositoryFolder="repo", + DevicePassCmd="echo pass", + FilesAndFolders=set(), + Name="test-device", + RepositoryPassCmd="echo repo_pass", + UUID=uuid4(), + ) + backend = bb.ResticBackend(config=config) + mock_refresh = mocker.patch("butter_backup.backup_backends._refresh_sudo") + mocker.patch.object(bb.ResticBackend, "copy_files") + mocker.patch.object(bb.ResticBackend, "adapt_ownership") + + backend.do_backup(tmp_path, sudo_pass_cmd) + + expected_nof_calls = 2 + result_calls = mock_refresh.call_args_list + assert len(result_calls) == expected_nof_calls