From d53a3b0da331eb18d74476963427223106e19e8b Mon Sep 17 00:00:00 2001 From: Rotemy Yaari Date: Sun, 5 Oct 2025 20:25:20 +0300 Subject: [PATCH] Better overwrite detection for existing catalogs --- src/catalog.rs | 19 ++++-- src/main.rs | 36 +++++++++- tests/test_overwrite.py | 142 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 192 insertions(+), 5 deletions(-) create mode 100644 tests/test_overwrite.py diff --git a/src/catalog.rs b/src/catalog.rs index f9672f2..19c6602 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -123,6 +123,14 @@ impl Directory { self.path.as_path() } + pub fn get_catalog_file_path(&self, algo: Algorithm) -> CanonicalPath { + if let Some(custom_file) = &self.catalog_path { + custom_file.clone() + } else { + self.signature_file_path(algo) + } + } + pub(crate) fn catalog_metadata(&self, algo: Algorithm) -> CatalogMetadata { let signature_filename = self.signature_filename(algo); let signature_file_path = self @@ -243,10 +251,13 @@ impl Catalog { } pub fn write_signature_file(&self, allow_existing: bool) -> anyhow::Result<()> { - let mut sigfile = std::fs::OpenOptions::new() - .create_new(!allow_existing) - .write(true) - .truncate(false) + let mut open_options = std::fs::OpenOptions::new(); + if allow_existing { + open_options.create(true).write(true).truncate(true); + } else { + open_options.create_new(true).write(true); + }; + let mut sigfile = open_options .open(self.metadata.signature_file_path.as_path()) .with_context(|| { format!( diff --git a/src/main.rs b/src/main.rs index 3705612..1d0bd18 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,6 +27,9 @@ struct SignParams { /// path to the catalog file to create/use instead of the default location #[clap(long)] catalog_file: Option, + /// automatically overwrite existing catalog file without prompting + #[clap(long)] + overwrite: bool, #[arg(short)] recursive: bool, #[clap(default_value = ".")] @@ -192,11 +195,24 @@ fn create_catalog(params: SignParams, config: &config::Config) -> anyhow::Result ) })?; + let catalog_file_path = directory.get_catalog_file_path(algo); + + if catalog_file_path.as_path().exists() + && !params.overwrite + && !prompt_user_overwrite(catalog_file_path.as_path())? + { + anyhow::bail!( + "Catalog file {:?} already exists. Use --overwrite to overwrite automatically.", + catalog_file_path.as_path() + ); + } + let mut catalog = directory.empty_catalog(algo); catalog.populate()?; - catalog.write_signature_file(false) + let should_overwrite = catalog_file_path.as_path().exists(); + catalog.write_signature_file(should_overwrite) } fn test_catalog(params: TestParams) -> anyhow::Result<()> { @@ -293,6 +309,24 @@ fn read_user_choice(writer: &mut dyn WriteColor) -> anyhow::Result } } +fn prompt_user_overwrite(catalog_path: &std::path::Path) -> anyhow::Result { + let mut writer = StandardStream::stderr(termcolor::ColorChoice::Auto); + + writer.set_color(ColorSpec::new().set_fg(Some(Color::Yellow)))?; + println!("Catalog file {:?} already exists.", catalog_path); + writer.reset()?; + + writer.set_color(ColorSpec::new().set_fg(Some(Color::Cyan)))?; + print!("Overwrite existing catalog file? [y/N]: "); + writer.reset()?; + std::io::stdout().flush()?; + + let key = read_single_char()?.to_ascii_lowercase(); + println!("{key}"); + + Ok(key == 'y') +} + fn confirm_updates( paths: &HashSet<&std::path::Path>, writer: &mut dyn WriteColor, diff --git a/tests/test_overwrite.py b/tests/test_overwrite.py new file mode 100644 index 0000000..02647e1 --- /dev/null +++ b/tests/test_overwrite.py @@ -0,0 +1,142 @@ +# pylint: disable=redefined-outer-name +import pytest +import subprocess +import pathlib +import os + + +def test_sign_fails_when_catalog_exists_without_overwrite(directory, run, algorithm): + sigfile = algorithm.signature_file(directory) + + run(f"sign -a {algorithm} .", cwd=directory) + assert sigfile.path.exists() + + with pytest.raises(subprocess.CalledProcessError) as exc_info: + run(f"sign -a {algorithm} .", cwd=directory) + + assert exc_info.value.returncode != 0 + assert b"already exists" in exc_info.value.output + + +def test_sign_succeeds_with_overwrite_flag(directory, run, algorithm): + sigfile = algorithm.signature_file(directory) + + run(f"sign -a {algorithm} .", cwd=directory) + assert sigfile.path.exists() + + original_content = sigfile.path.read_text() + + run(f"sign -a {algorithm} --overwrite .", cwd=directory) + assert sigfile.path.exists() + + new_content = sigfile.path.read_text() + assert original_content == new_content + + +def test_sign_with_custom_catalog_file_fails_when_exists_without_overwrite(directory, run, algorithm): + custom_catalog = directory / f"custom.{algorithm}" + + run(f"sign -a {algorithm} --catalog-file {custom_catalog} .", cwd=directory) + assert custom_catalog.exists() + + with pytest.raises(subprocess.CalledProcessError) as exc_info: + run(f"sign -a {algorithm} --catalog-file {custom_catalog} .", cwd=directory) + + assert exc_info.value.returncode != 0 + assert b"already exists" in exc_info.value.output + + +def test_sign_with_custom_catalog_file_succeeds_with_overwrite_flag(directory, run, algorithm): + custom_catalog = directory / f"custom.{algorithm}" + + run(f"sign -a {algorithm} --catalog-file {custom_catalog} .", cwd=directory) + assert custom_catalog.exists() + + original_content = custom_catalog.read_text() + + run(f"sign -a {algorithm} --catalog-file {custom_catalog} --overwrite .", cwd=directory) + assert custom_catalog.exists() + + new_content = custom_catalog.read_text() + assert original_content == new_content + + +def test_sign_overwrite_with_different_content(directory, run, algorithm, random_data_gen): + sigfile = algorithm.signature_file(directory) + + run(f"sign -a {algorithm} .", cwd=directory) + assert sigfile.path.exists() + + original_content = sigfile.path.read_text() + + test_file = directory / "new_file" + with test_file.open("wb") as f: + f.write(random_data_gen()) + + run(f"sign -a {algorithm} --overwrite .", cwd=directory) + assert sigfile.path.exists() + + new_content = sigfile.path.read_text() + assert original_content != new_content + assert "new_file" in new_content + + +def test_sign_overwrite_truncates_file_properly(directory, run, algorithm): + sigfile = algorithm.signature_file(directory) + + run(f"sign -a {algorithm} .", cwd=directory) + assert sigfile.path.exists() + + with sigfile.path.open("a") as f: + f.write("\nextra line that should be removed") + + modified_content = sigfile.path.read_text() + assert "extra line" in modified_content + + run(f"sign -a {algorithm} --overwrite .", cwd=directory) + + final_content = sigfile.path.read_text() + assert "extra line" not in final_content + + from conftest import Sigfile + sigfile_obj = Sigfile(sigfile.path) + sigfile_obj.assert_all_files_contained(directory, allow_unknown=False) + + +def test_sign_interactive_overwrite_yes(directory, run, algorithm, monkeypatch): + import io + import sys + + sigfile = algorithm.signature_file(directory) + + run(f"sign -a {algorithm} .", cwd=directory) + assert sigfile.path.exists() + + monkeypatch.setattr('sys.stdin', io.StringIO('y\n')) + + try: + result = run(f"sign -a {algorithm} .", cwd=directory, input=b'y\n') + assert sigfile.path.exists() + except subprocess.CalledProcessError: + pytest.skip("Interactive test requires TTY support") + + +def test_sign_interactive_overwrite_no(directory, run, algorithm, monkeypatch): + import io + import sys + + sigfile = algorithm.signature_file(directory) + + run(f"sign -a {algorithm} .", cwd=directory) + assert sigfile.path.exists() + + monkeypatch.setattr('sys.stdin', io.StringIO('n\n')) + + try: + with pytest.raises(subprocess.CalledProcessError) as exc_info: + run(f"sign -a {algorithm} .", cwd=directory, input=b'n\n') + + assert exc_info.value.returncode != 0 + assert b"already exists" in exc_info.value.output + except subprocess.CalledProcessError: + pytest.skip("Interactive test requires TTY support")