From 5514d9f555d52fdb9a5f52acf2787e14824ec373 Mon Sep 17 00:00:00 2001 From: Nils Philippsen Date: Jun 16 2021 09:37:42 +0000 Subject: Move subcommand modules into their own package This is needed so that exposing process_distgit() (the function) doesn't make process_distgit (the module) inaccessible (and is better namespacing anyway). Signed-off-by: Nils Philippsen --- diff --git a/koji_plugins/rpmautospec_builder.py b/koji_plugins/rpmautospec_builder.py index 58c51a4..2f757d4 100644 --- a/koji_plugins/rpmautospec_builder.py +++ b/koji_plugins/rpmautospec_builder.py @@ -2,7 +2,7 @@ import logging from koji.plugin import callback -from rpmautospec.process_distgit import process_distgit +from rpmautospec.subcommands.process_distgit import process_distgit log = logging.getLogger(__name__) diff --git a/rpmautospec/changelog.py b/rpmautospec/changelog.py deleted file mode 100644 index 45b1f53..0000000 --- a/rpmautospec/changelog.py +++ /dev/null @@ -1,60 +0,0 @@ -import logging -from typing import Any, Dict, Optional, Union - -from .pkg_history import PkgHistoryProcessor - - -log = logging.getLogger(__name__) - - -def register_subcommand(subparsers): - subcmd_name = "generate-changelog" - - gen_changelog_parser = subparsers.add_parser( - subcmd_name, - help="Generate changelog entries from git commit logs", - ) - - gen_changelog_parser.add_argument( - "spec_or_path", - default=".", - nargs="?", - help="Path to package worktree or the spec file within", - ) - - return subcmd_name - - -def _coerce_to_str(str_or_bytes: Union[str, bytes]) -> str: - if isinstance(str_or_bytes, bytes): - str_or_bytes = str_or_bytes.decode("utf-8", errors="replace") - return str_or_bytes - - -def _coerce_to_bytes(str_or_bytes: Union[str, bytes]) -> str: - if isinstance(str_or_bytes, str): - str_or_bytes = str_or_bytes.encode("utf-8") - return str_or_bytes - - -def collate_changelog( - processor_results: Dict[str, Any], result_type: Optional[type] = str -) -> Union[str, bytes]: - changelog = processor_results["changelog"] - if result_type == str: - entry_strings = (_coerce_to_str(entry["data"]) for entry in changelog) - else: - entry_strings = (_coerce_to_bytes(entry["data"]) for entry in changelog) - return "\n\n".join(entry_strings) - - -def produce_changelog(spec_or_repo): - processor = PkgHistoryProcessor(spec_or_repo) - result = processor.run(visitors=(processor.release_number_visitor, processor.changelog_visitor)) - return collate_changelog(result) - - -def main(args): - """Main method.""" - changelog = produce_changelog(args.spec_or_path) - log.info(changelog) diff --git a/rpmautospec/cli.py b/rpmautospec/cli.py index b49f5e1..6bc28d5 100644 --- a/rpmautospec/cli.py +++ b/rpmautospec/cli.py @@ -3,7 +3,7 @@ import logging import sys import typing -from . import changelog, process_distgit, release +from .subcommands import changelog, process_distgit, release subcmd_modules_by_name = {} diff --git a/rpmautospec/process_distgit.py b/rpmautospec/process_distgit.py deleted file mode 100644 index d35c942..0000000 --- a/rpmautospec/process_distgit.py +++ /dev/null @@ -1,117 +0,0 @@ -import logging -import os -import shutil -import tempfile -from pathlib import Path -from typing import Union - -from .misc import check_specfile_features -from .pkg_history import PkgHistoryProcessor - - -log = logging.getLogger(__name__) -__here__ = os.path.dirname(__file__) - -autorelease_template = """## START: Set by rpmautospec -%define autorelease(e:s:pb:) %{{?-p:0.}}%{{lua: - release_number = {autorelease_number:d}; - base_release_number = tonumber(rpm.expand("%{{?-b*}}%{{!?-b:1}}")); - print(release_number + base_release_number - 1); -}}%{{?-e:.%{{-e*}}}}%{{?-s:.%{{-s*}}}}%{{?dist}} -## END: Set by rpmautospec -""" # noqa: E501 - - -def register_subcommand(subparsers): - subcmd_name = "process-distgit" - - process_distgit_parser = subparsers.add_parser( - subcmd_name, - help="Modify the contents of the specfile according to the repo", - ) - - process_distgit_parser.add_argument( - "spec_or_path", - help="Path to package worktree or the spec file within", - ) - - process_distgit_parser.add_argument( - "target", - help="Path where to write processed spec file", - ) - - return subcmd_name - - -def process_distgit(spec_or_path: Union[Path, str], target: Union[Path, str] = None) -> bool: - """Process an RPM spec file in a distgit repository. - - :param spec_or_path: the spec file or path of the repository - :return: whether or not the spec file needed processing - """ - processor = PkgHistoryProcessor(spec_or_path) - - if target is None: - target = processor.specfile - elif isinstance(target, Path): - target = Path(target) - - features = check_specfile_features(processor.specfile) - processing_necessary = ( - features.has_autorelease or features.has_autochangelog or not features.changelog_lineno - ) - if not processing_necessary: - return False - - needs_autochangelog = ( - features.changelog_lineno is None - and features.autochangelog_lineno is None - or features.has_autochangelog - ) - - visitors = [processor.release_number_visitor] - if needs_autochangelog: - visitors.append(processor.changelog_visitor) - result = processor.run(visitors=visitors) - - autorelease_number = result["release-number"] - - with processor.specfile.open("r") as specfile, tempfile.NamedTemporaryFile("w") as tmp_specfile: - # Process the spec file into a temporary file... - if features.has_autorelease: - # Write %autorelease macro header - print( - autorelease_template.format(autorelease_number=autorelease_number), - file=tmp_specfile, - ) - - for lineno, line in enumerate(specfile, start=1): - if features.changelog_lineno: - if features.has_autochangelog and lineno > features.changelog_lineno: - break - - else: - if features.has_autochangelog and lineno == features.autochangelog_lineno: - print("%changelog\n", file=tmp_specfile, end="") - break - print(line, file=tmp_specfile, end="") - - if not features.has_autochangelog and features.changelog_lineno is None: - print("\n%changelog\n", file=tmp_specfile, end="") - - if needs_autochangelog: - print( - "\n\n".join(entry["data"] for entry in result["changelog"]), - file=tmp_specfile, - ) - - tmp_specfile.flush() - - # ...and copy it back (potentially across device boundaries) - shutil.copy2(tmp_specfile.name, target) - - -def main(args): - """Main method.""" - spec_or_path = args.spec_or_path.rstrip(os.path.sep) - process_distgit(spec_or_path, args.target) diff --git a/rpmautospec/release.py b/rpmautospec/release.py deleted file mode 100644 index dee5b61..0000000 --- a/rpmautospec/release.py +++ /dev/null @@ -1,38 +0,0 @@ -import logging -from pathlib import Path -from typing import Union - -from .pkg_history import PkgHistoryProcessor - - -log = logging.getLogger(__name__) - - -def register_subcommand(subparsers): - subcmd_name = "calculate-release" - - calc_release_parser = subparsers.add_parser( - subcmd_name, - help="Calculate the next release tag for a package build", - ) - - calc_release_parser.add_argument( - "spec_or_path", - default=".", - nargs="?", - help="Path to package worktree or the spec file within", - ) - - return subcmd_name - - -def calculate_release(spec_or_path: Union[str, Path]) -> int: - processor = PkgHistoryProcessor(spec_or_path) - result = processor.run(visitors=(processor.release_number_visitor,)) - return result["release-complete"] - - -def main(args): - """Main method.""" - release = calculate_release(args.spec_or_path) - log.info("calculate_release release: %s", release) diff --git a/rpmautospec/subcommands/__init__.py b/rpmautospec/subcommands/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/rpmautospec/subcommands/__init__.py diff --git a/rpmautospec/subcommands/changelog.py b/rpmautospec/subcommands/changelog.py new file mode 100644 index 0000000..a1607ca --- /dev/null +++ b/rpmautospec/subcommands/changelog.py @@ -0,0 +1,60 @@ +import logging +from typing import Any, Dict, Optional, Union + +from ..pkg_history import PkgHistoryProcessor + + +log = logging.getLogger(__name__) + + +def register_subcommand(subparsers): + subcmd_name = "generate-changelog" + + gen_changelog_parser = subparsers.add_parser( + subcmd_name, + help="Generate changelog entries from git commit logs", + ) + + gen_changelog_parser.add_argument( + "spec_or_path", + default=".", + nargs="?", + help="Path to package worktree or the spec file within", + ) + + return subcmd_name + + +def _coerce_to_str(str_or_bytes: Union[str, bytes]) -> str: + if isinstance(str_or_bytes, bytes): + str_or_bytes = str_or_bytes.decode("utf-8", errors="replace") + return str_or_bytes + + +def _coerce_to_bytes(str_or_bytes: Union[str, bytes]) -> str: + if isinstance(str_or_bytes, str): + str_or_bytes = str_or_bytes.encode("utf-8") + return str_or_bytes + + +def collate_changelog( + processor_results: Dict[str, Any], result_type: Optional[type] = str +) -> Union[str, bytes]: + changelog = processor_results["changelog"] + if result_type == str: + entry_strings = (_coerce_to_str(entry["data"]) for entry in changelog) + else: + entry_strings = (_coerce_to_bytes(entry["data"]) for entry in changelog) + return "\n\n".join(entry_strings) + + +def produce_changelog(spec_or_repo): + processor = PkgHistoryProcessor(spec_or_repo) + result = processor.run(visitors=(processor.release_number_visitor, processor.changelog_visitor)) + return collate_changelog(result) + + +def main(args): + """Main method.""" + changelog = produce_changelog(args.spec_or_path) + log.info(changelog) diff --git a/rpmautospec/subcommands/process_distgit.py b/rpmautospec/subcommands/process_distgit.py new file mode 100644 index 0000000..c90cf8c --- /dev/null +++ b/rpmautospec/subcommands/process_distgit.py @@ -0,0 +1,117 @@ +import logging +import os +import shutil +import tempfile +from pathlib import Path +from typing import Union + +from ..misc import check_specfile_features +from ..pkg_history import PkgHistoryProcessor + + +log = logging.getLogger(__name__) +__here__ = os.path.dirname(__file__) + +autorelease_template = """## START: Set by rpmautospec +%define autorelease(e:s:pb:) %{{?-p:0.}}%{{lua: + release_number = {autorelease_number:d}; + base_release_number = tonumber(rpm.expand("%{{?-b*}}%{{!?-b:1}}")); + print(release_number + base_release_number - 1); +}}%{{?-e:.%{{-e*}}}}%{{?-s:.%{{-s*}}}}%{{?dist}} +## END: Set by rpmautospec +""" # noqa: E501 + + +def register_subcommand(subparsers): + subcmd_name = "process-distgit" + + process_distgit_parser = subparsers.add_parser( + subcmd_name, + help="Modify the contents of the specfile according to the repo", + ) + + process_distgit_parser.add_argument( + "spec_or_path", + help="Path to package worktree or the spec file within", + ) + + process_distgit_parser.add_argument( + "target", + help="Path where to write processed spec file", + ) + + return subcmd_name + + +def process_distgit(spec_or_path: Union[Path, str], target: Union[Path, str] = None) -> bool: + """Process an RPM spec file in a distgit repository. + + :param spec_or_path: the spec file or path of the repository + :return: whether or not the spec file needed processing + """ + processor = PkgHistoryProcessor(spec_or_path) + + if target is None: + target = processor.specfile + elif isinstance(target, Path): + target = Path(target) + + features = check_specfile_features(processor.specfile) + processing_necessary = ( + features.has_autorelease or features.has_autochangelog or not features.changelog_lineno + ) + if not processing_necessary: + return False + + needs_autochangelog = ( + features.changelog_lineno is None + and features.autochangelog_lineno is None + or features.has_autochangelog + ) + + visitors = [processor.release_number_visitor] + if needs_autochangelog: + visitors.append(processor.changelog_visitor) + result = processor.run(visitors=visitors) + + autorelease_number = result["release-number"] + + with processor.specfile.open("r") as specfile, tempfile.NamedTemporaryFile("w") as tmp_specfile: + # Process the spec file into a temporary file... + if features.has_autorelease: + # Write %autorelease macro header + print( + autorelease_template.format(autorelease_number=autorelease_number), + file=tmp_specfile, + ) + + for lineno, line in enumerate(specfile, start=1): + if features.changelog_lineno: + if features.has_autochangelog and lineno > features.changelog_lineno: + break + + else: + if features.has_autochangelog and lineno == features.autochangelog_lineno: + print("%changelog\n", file=tmp_specfile, end="") + break + print(line, file=tmp_specfile, end="") + + if not features.has_autochangelog and features.changelog_lineno is None: + print("\n%changelog\n", file=tmp_specfile, end="") + + if needs_autochangelog: + print( + "\n\n".join(entry["data"] for entry in result["changelog"]), + file=tmp_specfile, + ) + + tmp_specfile.flush() + + # ...and copy it back (potentially across device boundaries) + shutil.copy2(tmp_specfile.name, target) + + +def main(args): + """Main method.""" + spec_or_path = args.spec_or_path.rstrip(os.path.sep) + process_distgit(spec_or_path, args.target) diff --git a/rpmautospec/subcommands/release.py b/rpmautospec/subcommands/release.py new file mode 100644 index 0000000..8e5de13 --- /dev/null +++ b/rpmautospec/subcommands/release.py @@ -0,0 +1,38 @@ +import logging +from pathlib import Path +from typing import Union + +from ..pkg_history import PkgHistoryProcessor + + +log = logging.getLogger(__name__) + + +def register_subcommand(subparsers): + subcmd_name = "calculate-release" + + calc_release_parser = subparsers.add_parser( + subcmd_name, + help="Calculate the next release tag for a package build", + ) + + calc_release_parser.add_argument( + "spec_or_path", + default=".", + nargs="?", + help="Path to package worktree or the spec file within", + ) + + return subcmd_name + + +def calculate_release(spec_or_path: Union[str, Path]) -> int: + processor = PkgHistoryProcessor(spec_or_path) + result = processor.run(visitors=(processor.release_number_visitor,)) + return result["release-complete"] + + +def main(args): + """Main method.""" + release = calculate_release(args.spec_or_path) + log.info("calculate_release release: %s", release) diff --git a/tests/rpmautospec/subcommands/__init__.py b/tests/rpmautospec/subcommands/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/rpmautospec/subcommands/__init__.py diff --git a/tests/rpmautospec/subcommands/test_process_distgit.py b/tests/rpmautospec/subcommands/test_process_distgit.py new file mode 100644 index 0000000..0e4fc71 --- /dev/null +++ b/tests/rpmautospec/subcommands/test_process_distgit.py @@ -0,0 +1,240 @@ +import difflib +import os +import re +import shutil +from subprocess import run, check_output +import tarfile +import tempfile + +import pytest + +from rpmautospec.subcommands import process_distgit + + +__here__ = os.path.dirname(__file__) + + +class TestProcessDistgit: + """Test the rpmautospec.subcommands.process_distgit module""" + + autorelease_autochangelog_cases = [ + (autorelease_case, autochangelog_case) + for autorelease_case in ("unchanged", "with braces", "optional") + for autochangelog_case in ( + "unchanged", + "changelog case insensitive", + "changelog trailing garbage", + "line in between", + "trailing line", + "with braces", + "missing", + "optional", + ) + ] + + relnum_re = re.compile("^(?P[0-9]+)(?P.*)$") + + @classmethod + def relnum_split(cls, release): + match = cls.relnum_re.match(release) + # let this fail if the regex doesn't match + return int(match.group("relnum")), match.group("rest") + + @staticmethod + def fuzz_spec_file(spec_file_path, autorelease_case, autochangelog_case, run_git_amend): + """Fuzz a spec file in ways which shouldn't change the outcome""" + + with open(spec_file_path, "r") as orig, open(spec_file_path + ".new", "w") as new: + for line in orig: + if line.startswith("Release:") and autorelease_case != "unchanged": + if autorelease_case == "with braces": + print("Release: %{autorelease}", file=new) + elif autorelease_case == "optional": + print("Release: %{?autorelease}", file=new) + else: + raise ValueError(f"Unknown autorelease_case: {autorelease_case}") + elif line.strip() == "%changelog" and autochangelog_case != "unchanged": + if autochangelog_case == "changelog case insensitive": + print("%ChAnGeLoG", file=new) + elif autochangelog_case == "changelog trailing garbage": + print("%changelog with trailing garbage yes this works", file=new) + elif autochangelog_case == "line in between": + print("%changelog\n\n%autochangelog", file=new) + break + elif autochangelog_case == "trailing line": + print("%changelog\n%autochangelog\n", file=new) + break + elif autochangelog_case == "with braces": + print("%changelog\n%{autochangelog}", file=new) + break + elif autochangelog_case == "missing": + # do nothing, i.e. don't print a %changelog to file + break + elif autochangelog_case == "optional": + print("%changelog\n%{?autochangelog}", file=new) + break + else: + raise ValueError(f"Unknown autochangelog_case: {autochangelog_case}") + else: + print(line, file=new, end="") + + os.rename(spec_file_path + ".new", spec_file_path) + + if run_git_amend: + # Ensure worktree doesn't differ + workdir = os.path.dirname(spec_file_path) + commit_timestamp = check_output( + ["git", "log", "-1", "--pretty=format:%cI"], + cwd=workdir, + encoding="ascii", + ).strip() + env = os.environ.copy() + # Set name and email explicitly so CI doesn't trip over them being unset. + env.update( + { + "GIT_COMMITTER_NAME": "Test User", + "GIT_COMMITTER_EMAIL": "", + "GIT_COMMITTER_DATE": commit_timestamp, + } + ) + run( + ["git", "commit", "--all", "--allow-empty", "--amend", "--no-edit"], + cwd=workdir, + env=env, + ) + + @pytest.mark.parametrize("overwrite_specfile", (False, True)) + @pytest.mark.parametrize("dirty_worktree", (False, True)) + @pytest.mark.parametrize("autorelease_case", ("unchanged", "with braces", "optional")) + @pytest.mark.parametrize( + "autochangelog_case", + ( + "unchanged", + "changelog case insensitive", + "changelog trailing garbage", + "line in between", + "trailing line", + "with braces", + "missing", + "optional", + ), + ) + def test_process_distgit( + self, overwrite_specfile, dirty_worktree, autorelease_case, autochangelog_case + ): + """Test the process_distgit() function""" + with tempfile.TemporaryDirectory() as workdir: + with tarfile.open( + os.path.join( + __here__, + os.path.pardir, + os.path.pardir, + "test-data", + "repodata", + "dummy-test-package-gloster-git.tar.gz", + ) + ) as tar: + tar.extractall(path=workdir) + + unpacked_repo_dir = os.path.join(workdir, "dummy-test-package-gloster") + test_spec_file_path = os.path.join( + unpacked_repo_dir, + "dummy-test-package-gloster.spec", + ) + + if autorelease_case != "unchanged" or autochangelog_case != "unchanged": + self.fuzz_spec_file( + test_spec_file_path, + autorelease_case, + autochangelog_case, + run_git_amend=not dirty_worktree, + ) + + if overwrite_specfile: + target_spec_file_path = None + else: + target_spec_file_path = os.path.join(workdir, "test-this-specfile-please.spec") + + orig_test_spec_file_stat = os.stat(test_spec_file_path) + process_distgit.process_distgit(unpacked_repo_dir, target_spec_file_path) + if not overwrite_specfile: + test_spec_file_stat = os.stat(test_spec_file_path) + # we can't compare stat_results directly because st_atime has changed + for attr in ("mode", "ino", "dev", "uid", "gid", "size", "mtime", "ctime"): + assert getattr(test_spec_file_stat, "st_" + attr) == getattr( + orig_test_spec_file_stat, "st_" + attr + ) + + expected_spec_file_path = os.path.join( + __here__, + os.path.pardir, + os.path.pardir, + "test-data", + "repodata", + "dummy-test-package-gloster.spec.expected", + ) + + with tempfile.NamedTemporaryFile() as tmpspec: + shutil.copy2(expected_spec_file_path, tmpspec.name) + if autorelease_case != "unchanged" or autochangelog_case != "unchanged": + if autochangelog_case not in ( + "changelog case insensitive", + "changelog trailing garbage", + ): + # "%changelog", "%ChAnGeLoG", ... stay verbatim, trick fuzz_spec_file() to + # leave the rest of the cases as is, the %autorelease macro is expanded. + fuzz_autochangelog_case = "unchanged" + else: + fuzz_autochangelog_case = autochangelog_case + expected_spec_file_path = tmpspec.name + self.fuzz_spec_file( + expected_spec_file_path, + autorelease_case, + fuzz_autochangelog_case, + run_git_amend=False, + ) + + rpm_cmd = ["rpm", "--define", "dist .fc32", "--specfile"] + + if target_spec_file_path: + test_cmd = rpm_cmd + [target_spec_file_path] + else: + test_cmd = rpm_cmd + [test_spec_file_path] + expected_cmd = rpm_cmd + [expected_spec_file_path] + + q_release = ["--qf", "%{release}\n"] + test_output = check_output(test_cmd + q_release, encoding="utf-8").strip() + test_relnum, test_rest = self.relnum_split(test_output) + expected_output = check_output(expected_cmd + q_release, encoding="utf-8").strip() + expected_relnum, expected_rest = self.relnum_split(expected_output) + + if dirty_worktree and ( + autorelease_case != "unchanged" or autochangelog_case != "unchanged" + ): + assert test_relnum == expected_relnum + 1 + else: + assert test_relnum == expected_relnum + + assert test_rest == expected_rest + + q_changelog = ["--changelog"] + test_output = check_output(test_cmd + q_changelog, encoding="utf-8") + expected_output = check_output(expected_cmd + q_changelog, encoding="utf-8") + + if dirty_worktree and ( + autorelease_case != "unchanged" or autochangelog_case != "unchanged" + ): + diff = list( + difflib.ndiff(expected_output.splitlines(), test_output.splitlines()) + ) + # verify entry for uncommitted changes + assert all(line.startswith("+ ") for line in diff[:3]) + assert diff[0].endswith(f"-{expected_relnum + 1}") + assert diff[1] == "+ - Uncommitted changes" + assert diff[2] == "+ " + + # verify the rest is the expected changelog + assert all(line.startswith(" ") for line in diff[3:]) + assert expected_output.splitlines() == [line[2:] for line in diff[3:]] + else: + assert test_output == expected_output diff --git a/tests/rpmautospec/subcommands/test_release.py b/tests/rpmautospec/subcommands/test_release.py new file mode 100644 index 0000000..42727f1 --- /dev/null +++ b/tests/rpmautospec/subcommands/test_release.py @@ -0,0 +1,45 @@ +import logging +import os.path +import tarfile +import tempfile +from pathlib import Path +from unittest.mock import Mock + +import pytest + +from rpmautospec.subcommands import release + + +__here__ = os.path.dirname(__file__) + + +class TestRelease: + """Test the rpmautospec.subcommands.release module""" + + @pytest.mark.parametrize("method_to_test", ("calculate_release", "main")) + def test_calculate_release(self, method_to_test, caplog): + with tempfile.TemporaryDirectory() as workdir: + with tarfile.open( + os.path.join( + __here__, + os.path.pardir, + os.path.pardir, + "test-data", + "repodata", + "dummy-test-package-gloster-git.tar.gz", + ) + ) as tar: + tar.extractall(path=workdir) + + unpacked_repo_dir = Path(workdir) / "dummy-test-package-gloster" + + expected_release = "11" + + if method_to_test == "calculate_release": + assert release.calculate_release(unpacked_repo_dir) == expected_release + else: + with caplog.at_level(logging.INFO): + args = Mock() + args.spec_or_path = unpacked_repo_dir + release.main(args) + assert f"calculate_release release: {expected_release}" in caplog.text diff --git a/tests/rpmautospec/test_process_distgit.py b/tests/rpmautospec/test_process_distgit.py deleted file mode 100644 index 6ca2cc9..0000000 --- a/tests/rpmautospec/test_process_distgit.py +++ /dev/null @@ -1,238 +0,0 @@ -import difflib -import os -import re -import shutil -from subprocess import run, check_output -import tarfile -import tempfile - -import pytest - -from rpmautospec import process_distgit - - -__here__ = os.path.dirname(__file__) - - -class TestProcessDistgit: - """Test the rpmautospec.process_distgit module""" - - autorelease_autochangelog_cases = [ - (autorelease_case, autochangelog_case) - for autorelease_case in ("unchanged", "with braces", "optional") - for autochangelog_case in ( - "unchanged", - "changelog case insensitive", - "changelog trailing garbage", - "line in between", - "trailing line", - "with braces", - "missing", - "optional", - ) - ] - - relnum_re = re.compile("^(?P[0-9]+)(?P.*)$") - - @classmethod - def relnum_split(cls, release): - match = cls.relnum_re.match(release) - # let this fail if the regex doesn't match - return int(match.group("relnum")), match.group("rest") - - @staticmethod - def fuzz_spec_file(spec_file_path, autorelease_case, autochangelog_case, run_git_amend): - """Fuzz a spec file in ways which shouldn't change the outcome""" - - with open(spec_file_path, "r") as orig, open(spec_file_path + ".new", "w") as new: - for line in orig: - if line.startswith("Release:") and autorelease_case != "unchanged": - if autorelease_case == "with braces": - print("Release: %{autorelease}", file=new) - elif autorelease_case == "optional": - print("Release: %{?autorelease}", file=new) - else: - raise ValueError(f"Unknown autorelease_case: {autorelease_case}") - elif line.strip() == "%changelog" and autochangelog_case != "unchanged": - if autochangelog_case == "changelog case insensitive": - print("%ChAnGeLoG", file=new) - elif autochangelog_case == "changelog trailing garbage": - print("%changelog with trailing garbage yes this works", file=new) - elif autochangelog_case == "line in between": - print("%changelog\n\n%autochangelog", file=new) - break - elif autochangelog_case == "trailing line": - print("%changelog\n%autochangelog\n", file=new) - break - elif autochangelog_case == "with braces": - print("%changelog\n%{autochangelog}", file=new) - break - elif autochangelog_case == "missing": - # do nothing, i.e. don't print a %changelog to file - break - elif autochangelog_case == "optional": - print("%changelog\n%{?autochangelog}", file=new) - break - else: - raise ValueError(f"Unknown autochangelog_case: {autochangelog_case}") - else: - print(line, file=new, end="") - - os.rename(spec_file_path + ".new", spec_file_path) - - if run_git_amend: - # Ensure worktree doesn't differ - workdir = os.path.dirname(spec_file_path) - commit_timestamp = check_output( - ["git", "log", "-1", "--pretty=format:%cI"], - cwd=workdir, - encoding="ascii", - ).strip() - env = os.environ.copy() - # Set name and email explicitly so CI doesn't trip over them being unset. - env.update( - { - "GIT_COMMITTER_NAME": "Test User", - "GIT_COMMITTER_EMAIL": "", - "GIT_COMMITTER_DATE": commit_timestamp, - } - ) - run( - ["git", "commit", "--all", "--allow-empty", "--amend", "--no-edit"], - cwd=workdir, - env=env, - ) - - @pytest.mark.parametrize("overwrite_specfile", (False, True)) - @pytest.mark.parametrize("dirty_worktree", (False, True)) - @pytest.mark.parametrize("autorelease_case", ("unchanged", "with braces", "optional")) - @pytest.mark.parametrize( - "autochangelog_case", - ( - "unchanged", - "changelog case insensitive", - "changelog trailing garbage", - "line in between", - "trailing line", - "with braces", - "missing", - "optional", - ), - ) - def test_process_distgit( - self, overwrite_specfile, dirty_worktree, autorelease_case, autochangelog_case - ): - """Test the process_distgit() function""" - with tempfile.TemporaryDirectory() as workdir: - with tarfile.open( - os.path.join( - __here__, - os.path.pardir, - "test-data", - "repodata", - "dummy-test-package-gloster-git.tar.gz", - ) - ) as tar: - tar.extractall(path=workdir) - - unpacked_repo_dir = os.path.join(workdir, "dummy-test-package-gloster") - test_spec_file_path = os.path.join( - unpacked_repo_dir, - "dummy-test-package-gloster.spec", - ) - - if autorelease_case != "unchanged" or autochangelog_case != "unchanged": - self.fuzz_spec_file( - test_spec_file_path, - autorelease_case, - autochangelog_case, - run_git_amend=not dirty_worktree, - ) - - if overwrite_specfile: - target_spec_file_path = None - else: - target_spec_file_path = os.path.join(workdir, "test-this-specfile-please.spec") - - orig_test_spec_file_stat = os.stat(test_spec_file_path) - process_distgit.process_distgit(unpacked_repo_dir, target_spec_file_path) - if not overwrite_specfile: - test_spec_file_stat = os.stat(test_spec_file_path) - # we can't compare stat_results directly because st_atime has changed - for attr in ("mode", "ino", "dev", "uid", "gid", "size", "mtime", "ctime"): - assert getattr(test_spec_file_stat, "st_" + attr) == getattr( - orig_test_spec_file_stat, "st_" + attr - ) - - expected_spec_file_path = os.path.join( - __here__, - os.path.pardir, - "test-data", - "repodata", - "dummy-test-package-gloster.spec.expected", - ) - - with tempfile.NamedTemporaryFile() as tmpspec: - shutil.copy2(expected_spec_file_path, tmpspec.name) - if autorelease_case != "unchanged" or autochangelog_case != "unchanged": - if autochangelog_case not in ( - "changelog case insensitive", - "changelog trailing garbage", - ): - # "%changelog", "%ChAnGeLoG", ... stay verbatim, trick fuzz_spec_file() to - # leave the rest of the cases as is, the %autorelease macro is expanded. - fuzz_autochangelog_case = "unchanged" - else: - fuzz_autochangelog_case = autochangelog_case - expected_spec_file_path = tmpspec.name - self.fuzz_spec_file( - expected_spec_file_path, - autorelease_case, - fuzz_autochangelog_case, - run_git_amend=False, - ) - - rpm_cmd = ["rpm", "--define", "dist .fc32", "--specfile"] - - if target_spec_file_path: - test_cmd = rpm_cmd + [target_spec_file_path] - else: - test_cmd = rpm_cmd + [test_spec_file_path] - expected_cmd = rpm_cmd + [expected_spec_file_path] - - q_release = ["--qf", "%{release}\n"] - test_output = check_output(test_cmd + q_release, encoding="utf-8").strip() - test_relnum, test_rest = self.relnum_split(test_output) - expected_output = check_output(expected_cmd + q_release, encoding="utf-8").strip() - expected_relnum, expected_rest = self.relnum_split(expected_output) - - if dirty_worktree and ( - autorelease_case != "unchanged" or autochangelog_case != "unchanged" - ): - assert test_relnum == expected_relnum + 1 - else: - assert test_relnum == expected_relnum - - assert test_rest == expected_rest - - q_changelog = ["--changelog"] - test_output = check_output(test_cmd + q_changelog, encoding="utf-8") - expected_output = check_output(expected_cmd + q_changelog, encoding="utf-8") - - if dirty_worktree and ( - autorelease_case != "unchanged" or autochangelog_case != "unchanged" - ): - diff = list( - difflib.ndiff(expected_output.splitlines(), test_output.splitlines()) - ) - # verify entry for uncommitted changes - assert all(line.startswith("+ ") for line in diff[:3]) - assert diff[0].endswith(f"-{expected_relnum + 1}") - assert diff[1] == "+ - Uncommitted changes" - assert diff[2] == "+ " - - # verify the rest is the expected changelog - assert all(line.startswith(" ") for line in diff[3:]) - assert expected_output.splitlines() == [line[2:] for line in diff[3:]] - else: - assert test_output == expected_output diff --git a/tests/rpmautospec/test_release.py b/tests/rpmautospec/test_release.py deleted file mode 100644 index 7f65cc0..0000000 --- a/tests/rpmautospec/test_release.py +++ /dev/null @@ -1,44 +0,0 @@ -import logging -import os.path -import tarfile -import tempfile -from pathlib import Path -from unittest.mock import Mock - -import pytest - -from rpmautospec import release - - -__here__ = os.path.dirname(__file__) - - -class TestRelease: - """Test the rpmautospec.release module""" - - @pytest.mark.parametrize("method_to_test", ("calculate_release", "main")) - def test_calculate_release(self, method_to_test, caplog): - with tempfile.TemporaryDirectory() as workdir: - with tarfile.open( - os.path.join( - __here__, - os.path.pardir, - "test-data", - "repodata", - "dummy-test-package-gloster-git.tar.gz", - ) - ) as tar: - tar.extractall(path=workdir) - - unpacked_repo_dir = Path(workdir) / "dummy-test-package-gloster" - - expected_release = "11" - - if method_to_test == "calculate_release": - assert release.calculate_release(unpacked_repo_dir) == expected_release - else: - with caplog.at_level(logging.INFO): - args = Mock() - args.spec_or_path = unpacked_repo_dir - release.main(args) - assert f"calculate_release release: {expected_release}" in caplog.text