From 14217cd1485862718cc669d2d83bb80a5d36fab0 Mon Sep 17 00:00:00 2001 From: Michal Konečný Date: Jul 28 2020 15:25:37 +0000 Subject: Use pytest Signed-off-by: Michal Konečný --- diff --git a/README.rst b/README.rst index 4c65ede..0504b3c 100644 --- a/README.rst +++ b/README.rst @@ -75,7 +75,7 @@ Tests The tests here require the *test suite* of pagure itself to work. You have to modify your PYTHONPATH to find them. Run with:: - $ PYTHONPATH=.:/path/to/pagure/checkout nosetests pagure_distgit_tests/ + $ PYTHONPATH=.:/path/to/pagure/checkout pytest pagure_distgit_tests/ You can use our requirements-testing.txt to install testing dependencies with pip: $ pip install -r /path/to/pagure/checkout/requirements.txt diff --git a/pagure_distgit_tests/bugzilla_overrides_tests.py b/pagure_distgit_tests/bugzilla_overrides_tests.py deleted file mode 100644 index a81cf64..0000000 --- a/pagure_distgit_tests/bugzilla_overrides_tests.py +++ /dev/null @@ -1,361 +0,0 @@ -from __future__ import print_function - -import os -import json - - -import pagure.lib.query - -import tests -from pagure_distgit import plugin - - -class PagureFlaskApiProjectBZOverrideTests(tests.Modeltests): - """ Tests the bugzilla override endpoints added in pagure-dist-git. """ - - def setUp(self): - """ Set up the environnment, ran before every tests. """ - super(PagureFlaskApiProjectBZOverrideTests, self).setUp() - self.session.flush() - tests.create_projects(self.session) - tests.create_projects_git(os.path.join(self.path, "repos"), bare=True) - tests.create_tokens(self.session) - tests.create_tokens_acl(self.session) - self._app.register_blueprint(plugin.DISTGIT_NS) - - def test_default_values(self): - """Test the default values returned by the bz overrides endpoint. """ - - expected_result = { - "epel_assignee": "pingou", - "fedora_assignee": "pingou", - } - output = self.app.get("/_dg/bzoverrides/somenamespace/test3") - self.assertEqual(output.status_code, 200) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual(data, expected_result) - - def test_token_missing_ACL(self): - """Test the bz endpoint with an API token missing the `modify_project` ACL. - """ - headers = {"Authorization": "token foo_token"} - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", headers=headers - ) - # invalid token - self.assertEqual(output.status_code, 401) - - def test_invalid_token(self): - """Test the bz endpoint with an invalid API token. """ - headers = {"Authorization": "token BBBZZZOOO"} - datainput = {} - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", - data=datainput, - headers=headers, - ) - self.assertEqual(output.status_code, 401) - - def test_change_both_assignee(self): - """Test the bz endpoint when changing both assignee at once. """ - headers = {"Authorization": "token aaabbbcccddd"} - datainput = {"epel_assignee": "foo", "fedora_assignee": "foo"} - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", - data=datainput, - headers=headers, - ) - self.assertEqual(output.status_code, 200) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual(data, datainput) - - def test_change_fedora_assignee(self): - """Test the bz endpoint when changing the Fedora assignee while keeping - the EPEL one. - """ - headers = {"Authorization": "token aaabbbcccddd"} - datainput = {"epel_assignee": "foo", "fedora_assignee": "pingou"} - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", - data=datainput, - headers=headers, - ) - self.assertEqual(output.status_code, 200) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual(data, datainput) - - def test_change_invalid_fedora_assignee(self): - """Test the bz endpoint when changing the Fedora assignee while keeping - the EPEL one. - """ - headers = {"Authorization": "token aaabbbcccddd"} - datainput = {"epel_assignee": "foo", "fedora_assignee": "invalid"} - expected_result = { - "error": "Invalid or incomplete input submitted", - "error_code": "EINVALIDREQ", - "errors": ["Invalid user or group name as fedora_assignee"], - } - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", - data=datainput, - headers=headers, - ) - self.assertEqual(output.status_code, 400) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual(data, expected_result) - - def test_change_invalid_fedora_group_assignee(self): - """Test the bz endpoint when changing the Fedora assignee while keeping - the EPEL one. - """ - headers = {"Authorization": "token aaabbbcccddd"} - datainput = {"epel_assignee": "foo", "fedora_assignee": "@invalid"} - expected_result = { - "error": "Invalid or incomplete input submitted", - "error_code": "EINVALIDREQ", - "errors": ["Invalid user or group name as fedora_assignee"], - } - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", - data=datainput, - headers=headers, - ) - self.assertEqual(output.status_code, 400) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual(data, expected_result) - - def test_change_epel_assignee(self): - """Test the bz endpoint when changing the EPEL assignee while keeping - the Fedora one. - """ - headers = {"Authorization": "token aaabbbcccddd"} - datainput = {"epel_assignee": "", "fedora_assignee": None} - expected_result = { - "epel_assignee": "pingou", - "fedora_assignee": "pingou", - } - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", - data=datainput, - headers=headers, - ) - self.assertEqual(output.status_code, 200) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual(data, expected_result) - - def test_change_invalid_epel_assignee(self): - """Test the bz endpoint when changing the EPEL assignee while keeping - the Fedora one. - """ - headers = {"Authorization": "token aaabbbcccddd"} - datainput = {"epel_assignee": "invalid", "fedora_assignee": None} - expected_result = { - "error": "Invalid or incomplete input submitted", - "error_code": "EINVALIDREQ", - "errors": ["Invalid user or group name as epel_assignee"], - } - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", - data=datainput, - headers=headers, - ) - self.assertEqual(output.status_code, 400) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual(data, expected_result) - - def test_change_invalid_epel_group_assignee(self): - """Test the bz endpoint when changing the EPEL assignee while keeping - the Fedora one. - """ - headers = {"Authorization": "token aaabbbcccddd"} - datainput = {"epel_assignee": "@invalid", "fedora_assignee": None} - expected_result = { - "error": "Invalid or incomplete input submitted", - "error_code": "EINVALIDREQ", - "errors": ["Invalid user or group name as epel_assignee"], - } - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", - data=datainput, - headers=headers, - ) - self.assertEqual(output.status_code, 400) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual(data, expected_result) - - def test_reset_fedora_assignees(self): - """Test the bz endpoint when resetting the Fedora assignee. - """ - headers = {"Authorization": "token aaabbbcccddd"} - datainput = {"fedora_assignee": "foo"} - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", - data=datainput, - headers=headers, - ) - self.assertEqual(output.status_code, 200) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual( - data, {"epel_assignee": "pingou", "fedora_assignee": "foo"} - ) - repo = pagure.lib.query.get_authorized_project( - self.session, "test3", namespace="somenamespace", - ) - self.assertIsNotNone(repo.bzoverride) - - datainput = {"fedora_assignee": ""} - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", - data=datainput, - headers=headers, - ) - self.assertEqual(output.status_code, 200) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual( - data, {"epel_assignee": "pingou", "fedora_assignee": "pingou"} - ) - repo = pagure.lib.query.get_authorized_project( - self.session, "test3", namespace="somenamespace", - ) - self.assertIsNotNone(repo.bzoverride) - - def test_reset_epel_assignees(self): - """Test the bz endpoint when resetting the EPEL assignee. - """ - headers = {"Authorization": "token aaabbbcccddd"} - datainput = {"epel_assignee": "foo", "fedora_assignee": "pingou"} - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", - data=datainput, - headers=headers, - ) - self.assertEqual(output.status_code, 200) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual(data, datainput) - - datainput = {"fedora_assignee": "pingou"} - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", - data=datainput, - headers=headers, - ) - self.assertEqual(output.status_code, 200) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual( - data, {"epel_assignee": "pingou", "fedora_assignee": "pingou"} - ) - - repo = pagure.lib.query.get_authorized_project( - self.session, "test3", namespace="somenamespace", - ) - self.assertIsNotNone(repo.bzoverride) - - def test_reset_both_assignees(self): - """Test the bz endpoint when resetting the both assignees. - """ - headers = {"Authorization": "token aaabbbcccddd"} - datainput = {} - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", - data=datainput, - headers=headers, - ) - self.assertEqual(output.status_code, 200) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual( - data, {"epel_assignee": "pingou", "fedora_assignee": "pingou"} - ) - - repo = pagure.lib.query.get_authorized_project( - self.session, "test3", namespace="somenamespace", - ) - self.assertIsNone(repo.bzoverride) - - def test_changing_assignees_logged_in_invalid_user(self): - """Test the bz endpoint when changing the both assignees when logged in - the UI but with an user that is not allowed. - """ - user = tests.FakeUser(username="foo") - with tests.user_set(self.app.application, user): - expected_result = { - "epel_assignee": "pingou", - "fedora_assignee": "pingou", - } - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", data=expected_result - ) - self.assertEqual(output.status_code, 401) - - def test_change_epel_assignee_logged_in(self): - """Test the bz endpoint when changing the EPEL assignee when logged in - the UI. - """ - user = tests.FakeUser(username="pingou") - with tests.user_set(self.app.application, user): - # change one assignee - datainput = {"epel_assignee": "foo"} - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", data=datainput - ) - self.assertEqual(output.status_code, 200) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual( - data, {"epel_assignee": "foo", "fedora_assignee": "pingou"} - ) - - def test_change_fedora_assignee_logged_in(self): - """Test the bz endpoint when changing the Fedora assignee when logged in - the UI. - """ - user = tests.FakeUser(username="pingou") - with tests.user_set(self.app.application, user): - # change one assignee - datainput = {"fedora_assignee": "foo"} - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", data=datainput - ) - self.assertEqual(output.status_code, 200) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual( - data, {"epel_assignee": "pingou", "fedora_assignee": "foo"} - ) - - def test_change_both_assignees_logged_in(self): - """Test the bz endpoint when changing the both assignees when logged in - the UI. - """ - user = tests.FakeUser(username="pingou") - with tests.user_set(self.app.application, user): - datainput = {"epel_assignee": "foo", "fedora_assignee": "foo"} - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", data=datainput - ) - self.assertEqual(output.status_code, 200) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual( - data, {"epel_assignee": "foo", "fedora_assignee": "foo"} - ) - - def test_resetting_assignees_logged_in(self): - """Test the bz endpoint when resetting assignees when logged in the UI. - """ - user = tests.FakeUser(username="pingou") - with tests.user_set(self.app.application, user): - # Changing one of them before the reset - datainput = {"epel_assignee": "foo"} - output = self.app.post( - "/_dg/bzoverrides/somenamespace/test3", data=datainput - ) - self.assertEqual(output.status_code, 200) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual( - data, {"epel_assignee": "foo", "fedora_assignee": "pingou"} - ) - - # Resetting - output = self.app.post("/_dg/bzoverrides/somenamespace/test3") - self.assertEqual(output.status_code, 200) - data = json.loads(output.get_data(as_text=True)) - self.assertDictEqual( - data, {"epel_assignee": "pingou", "fedora_assignee": "pingou"} - ) diff --git a/pagure_distgit_tests/dist_git_auth_tests.py b/pagure_distgit_tests/dist_git_auth_tests.py deleted file mode 100644 index 965ba0b..0000000 --- a/pagure_distgit_tests/dist_git_auth_tests.py +++ /dev/null @@ -1,776 +0,0 @@ -from __future__ import print_function - -from mock import Mock, patch - -import pagure.config -import pagure.exceptions -import pagure.lib.model - -try: - from pagure.lib import _get_project as get_project -except ImportError: - # From pagure 5.2, code has been moved to pagure.lib.query - from pagure.lib.query import _get_project as get_project - -import tests - -import dist_git_auth - - -def patch_pdc(values): - """ Decorator to patch the PDC calls to return values for this test. - - Args: - values (dict): A dictionary where the keys are project fullnames - (i.e. namespace/name), and the values are dicts with key branch - names, and values their supported status. - Note that as namespace, the PDC "type" is used, which lacks the "s" - for rpms and modules. - e.g.: {'rpm/test': {'f28': True, 'f27': False}} - """ - - def pdc_get_paged(_, global_component, type, name, fields): - """ Function that emulates the pdc.get_paged call - - Args as provided by dist_git_auth's calls to it. - - Args: - _ (anything): PDCClient internal - global_component (string): package name - type (string): The PDC "type": "rpm", "module", ... - name (string): The branch name - fields (list): Always ["active"] - """ - fullname = "%s/%s" % (type, global_component) - if fullname not in values: - return [] - val = values[fullname] - if name not in val: - return [] - val = val[name] - if val in (True, False): - return [{"active": val}] - # This case is used to emulate "weird" results - return val - - def decorator(func): - def test_wrapper(*args, **kwargs): - if dist_git_auth.PDCClient: - with patch.object(dist_git_auth.PDCClient, "__getitem__"): - with patch.object( - dist_git_auth.PDCClient, - "get_paged", - side_effect=pdc_get_paged, - ): - return func(*args, **kwargs) - - return test_wrapper - - return decorator - - -class DistGitAuthTests(tests.Modeltests): - """ Test DistGitAuth ACLs with Fedora config. """ - - maxDiff = None - - def setUp(self): - """ Set up the environment in which to run the tests. """ - super(DistGitAuthTests, self).setUp() - - pagure.config.config["ACL_DEBUG"] = True - pagure.config.config["EXTERNAL_COMMITTER"] = {"relenggroup": {}} - pagure.config.config.update(self.dga_config) - - self.dga = dist_git_auth.DistGitAuth() - self.dga.info = Mock(side_effect=print) - # We default to saying it's not a forced push - dist_git_auth.is_forced_push = Mock(return_value=False) - - # Create an RCM user/group - rcmuser = pagure.lib.model.User( - user="releng", - fullname="Release Engineering", - token="aaabbbcd", - default_email="rcm@local.local", - ) - self.session.add(rcmuser) - self.session.flush() - rcmgroup = pagure.lib.model.PagureGroup( - group_name="relenggroup", - group_type="user", - display_name="Releng group", - user_id=rcmuser.id, - ) - self.session.add(rcmgroup) - self.session.flush() - rcmuser.group_objs.append(rcmgroup) - self.session.commit() - - def tearDown(self): - """ Tear down the environment in which the tests ran. """ - self.dga = None - super(DistGitAuthTests, self).tearDown() - - def create_namespaced_project( - self, namespace, name, is_fork=False, settings={} - ): - item = pagure.lib.model.Project( - user_id=1, # pingou - name=name, - is_fork=is_fork, - parent_id=3 if is_fork else None, - description="namespaced test project", - hook_token="aaabbbeee", - namespace=namespace, - settings=settings, - ) - item.close_status = [ - "Invalid", - "Insufficient data", - "Fixed", - "Duplicate", - ] - self.session.add(item) - self.session.commit() - return get_project(self.session, name=name, namespace=namespace) - - def expect_info_msg(self, expect_msg): - found = False - for call in self.dga.info.call_args_list: - args = call[0] - msg = args[0] - if msg == expect_msg: - found = True - if not found: - raise AssertionError( - "Info message '%s' expected but not found" % expect_msg - ) - - -class DistGitAuthTestsGeneric(DistGitAuthTests): - dga_config = {} - - def test_unused_repotype(self): - self.assertFalse( - self.dga.check_acl( - self.session, - project=None, - username=None, - refname=None, - pull_request=None, - repodir=None, - repotype="tickets", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Repotype tickets not in use") - - def test_branch_deletion(self): - dist_git_auth.is_forced_push.return_value = True - - project = self.create_namespaced_project("rpms", "test") - - self.assertFalse( - self.dga.check_acl( - self.session, - project=project, - username=None, - refname=None, - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto="0000000000000000000000000000000000000000", - is_internal=False, - ) - ) - - self.expect_info_msg("Branch deletion is not allowed") - - def test_forced_push(self): - dist_git_auth.is_forced_push.return_value = True - - project = self.create_namespaced_project("rpms", "test") - - self.assertFalse( - self.dga.check_acl( - self.session, - project=project, - username=None, - refname=None, - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Forced pushes are not allowed") - - def test_internal(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertTrue( - self.dga.check_acl( - self.session, - project=project, - username=None, - refname=None, - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=True, - ) - ) - - self.expect_info_msg("Internal push allowed") - - def test_deploykey(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertFalse( - self.dga.check_acl( - self.session, - project=project, - username="deploykey_foobar", - refname=None, - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Deploy keys are disabled") - - def test_invalid_user(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertRaises( - pagure.exceptions.PagureException, - self.dga.check_acl, - self.session, - project=project, - username="nosuchuser", - refname=None, - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - - def test_unprotected_committer(self): - project = self.create_namespaced_project("unprotected", "test") - - self.assertTrue( - self.dga.check_acl( - self.session, - project=project, - username="pingou", - refname="refs/heads/mywip", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Committer push") - - def test_unprotected_non_committer(self): - project = self.create_namespaced_project("unprotected", "test") - - self.assertFalse( - self.dga.check_acl( - self.session, - project=project, - username="foo", - refname="refs/heads/mywip", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Fall-through deny") - - def test_unprotected_pr_required_pr(self): - project = self.create_namespaced_project("unprotected", "test") - self.dga.global_pr_only = True - - self.assertTrue( - self.dga.check_acl( - self.session, - project=project, - username="pingou", - refname="refs/heads/mywip", - pull_request=True, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Committer push") - - def test_unprotected_pr_required_no_pr(self): - project = self.create_namespaced_project("unprotected", "test") - self.dga.global_pr_only = True - - self.assertFalse( - self.dga.check_acl( - self.session, - project=project, - username="pingou", - refname="refs/heads/mywip", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("A pull request is required for this branch") - - def test_unprotected_pr_required_no_pr_cvsadmin(self): - project = self.create_namespaced_project("unprotected", "test") - self.dga.global_pr_only = True - - self.assertTrue( - self.dga.check_acl( - self.session, - project=project, - username="releng", - refname="refs/heads/mywip", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Committer push") - - def test_unprotected_pr_required_requests(self): - project = self.create_namespaced_project("unprotected", "test") - self.dga.global_pr_only = True - - self.assertTrue( - self.dga.check_acl( - self.session, - project=project, - username="pingou", - refname="refs/heads/mywip", - pull_request=None, - repodir=None, - repotype="requests", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Committer push") - - def test_unprotected_pr_required_repo_pr_only_no_pr(self): - settings = {"pull_request_access_only": True} - project = self.create_namespaced_project( - "unprotected", "test", settings=settings - ) - - self.assertFalse( - self.dga.check_acl( - self.session, - project=project, - username="pingou", - refname="refs/heads/mywip", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("A pull request is required for this branch") - - def test_unprotected_pr_required_repo_pr_only(self): - settings = {"pull_request_access_only": True} - project = self.create_namespaced_project( - "unprotected", "test", settings=settings - ) - self.assertTrue( - self.dga.check_acl( - self.session, - project=project, - username="pingou", - refname="refs/heads/mywip", - pull_request="pull_request", - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Committer push") - - -class DistGitAuthTestsFedora(DistGitAuthTests): - dga_config = { - "PR_ONLY": False, - "ACL_BLOCK_UNSPECIFIED": False, - "BLACKLIST_RES": ["refs/heads/c[0-9]+.*"], - "UNSPECIFIED_BLACKLIST_RES": ["refs/heads/f[0-9]+"], - "RCM_GROUP": "relenggroup", - "RCM_BRANCHES": ["refs/heads/f[0-9]+"], - "ACL_PROTECTED_NAMESPACES": ["rpms", "modules", "container"], - "PDC_URL": "invalid://", - } - - def test_protected_blacklisted_ref(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertFalse( - self.dga.check_acl( - self.session, - project=project, - username="pingou", - refname="refs/heads/c7", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Ref refs/heads/c7 is blocked") - - def test_protected_rcm(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertTrue( - self.dga.check_acl( - self.session, - project=project, - username="releng", - refname="refs/heads/f27", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("RCM push") - - @patch_pdc({"rpm/test": {"f26": False, "f27": True}}) - def test_protected_unsupported_branch(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertFalse( - self.dga.check_acl( - self.session, - project=project, - username="pingou", - refname="refs/heads/f26", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Branch refs/heads/f26 is unsupported") - - @patch_pdc({"rpm/test": {"f26": False, "f27": True}}) - def test_protected_supported_branch_committer(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertTrue( - self.dga.check_acl( - self.session, - project=project, - username="pingou", - refname="refs/heads/f27", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Branch refs/heads/f27 is supported") - - @patch_pdc({"rpm/test": {"f26": False, "f27": True}}) - def test_protected_supported_branch_non_committer(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertFalse( - self.dga.check_acl( - self.session, - project=project, - username="foo", - refname="refs/heads/f27", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Branch refs/heads/f27 is supported") - - @patch_pdc({"rpm/test": {"f26": False, "f27": True}}) - def test_protected_unspecified_branch_blacklisted(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertFalse( - self.dga.check_acl( - self.session, - project=project, - username="pingou", - refname="refs/heads/f28", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Unspecified ref refs/heads/f28 is blocked") - - @patch_pdc({"rpm/test": {"f26": False, "f27": True}}) - def test_protected_unspecified_branch_normal_committer(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertTrue( - self.dga.check_acl( - self.session, - project=project, - username="pingou", - refname="refs/heads/mywip", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Unspecified branch push") - - @patch_pdc({"rpm/test": {"f26": False, "f27": True}}) - def test_protected_unspecified_branch_normal_non_committer(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertFalse( - self.dga.check_acl( - self.session, - project=project, - username="foo", - refname="refs/heads/mywip", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Unspecified branch push") - - -class DistGitAuthTestsCentOS(DistGitAuthTests): - dga_config = { - "PR_ONLY": False, - "ACL_BLOCK_UNSPECIFIED": True, - "BLACKLIST_RES": ["refs/heads/f[0-9]+.*"], - "RCM_GROUP": "relenggroup", - "RCM_BRANCHES": ["refs/heads/c[0-9]+"], - "SUPPORTED_SIGS": ["sig-core"], - "SIG_PREFIXES": ["refs/heads/c7"], - "ACL_PROTECTED_NAMESPACES": ["rpms"], - } - - def setUp(self): - super(DistGitAuthTestsCentOS, self).setUp() - - # Create an RCM user/group - arrfab = pagure.lib.model.User( - user="arrfab", - fullname="Fabian Arriton", - token="aaabbbcd", - default_email="arrfab@local.local", - ) - self.session.add(arrfab) - self.session.flush() - sigcore = pagure.lib.model.PagureGroup( - group_name="sig-core", - group_type="user", - display_name="Core SIG group", - user_id=arrfab.id, - ) - self.session.add(sigcore) - self.session.flush() - arrfab.group_objs.append(sigcore) - self.session.commit() - - def test_protected_blacklisted_ref(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertFalse( - self.dga.check_acl( - self.session, - project=project, - username="pingou", - refname="refs/heads/f27", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Ref refs/heads/f27 is blocked") - - def test_protected_rcm(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertTrue( - self.dga.check_acl( - self.session, - project=project, - username="releng", - refname="refs/heads/c7", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("RCM push") - - def test_protected_sig_sig_member(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertTrue( - self.dga.check_acl( - self.session, - project=project, - username="arrfab", - refname="refs/heads/c7-sig-core-test", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("SIG push") - - def test_protected_sig_sig_member_precise(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertTrue( - self.dga.check_acl( - self.session, - project=project, - username="arrfab", - refname="refs/heads/c7-sig-core", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("SIG push") - - def test_protected_sig_no_sig_member(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertFalse( - self.dga.check_acl( - self.session, - project=project, - username="pingou", - refname="refs/heads/c7-sig-core-test", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Access to namespace rpms is restricted") - - def test_protected_sig_sig_member_no_sig_branch(self): - project = self.create_namespaced_project("rpms", "test") - - self.assertFalse( - self.dga.check_acl( - self.session, - project=project, - username="arrfab", - refname="refs/heads/c7", - pull_request=None, - repodir=None, - repotype="main", - revfrom=None, - revto=None, - is_internal=False, - ) - ) - - self.expect_info_msg("Access to namespace rpms is restricted") diff --git a/pagure_distgit_tests/test_bugzilla_overrides.py b/pagure_distgit_tests/test_bugzilla_overrides.py new file mode 100644 index 0000000..a81cf64 --- /dev/null +++ b/pagure_distgit_tests/test_bugzilla_overrides.py @@ -0,0 +1,361 @@ +from __future__ import print_function + +import os +import json + + +import pagure.lib.query + +import tests +from pagure_distgit import plugin + + +class PagureFlaskApiProjectBZOverrideTests(tests.Modeltests): + """ Tests the bugzilla override endpoints added in pagure-dist-git. """ + + def setUp(self): + """ Set up the environnment, ran before every tests. """ + super(PagureFlaskApiProjectBZOverrideTests, self).setUp() + self.session.flush() + tests.create_projects(self.session) + tests.create_projects_git(os.path.join(self.path, "repos"), bare=True) + tests.create_tokens(self.session) + tests.create_tokens_acl(self.session) + self._app.register_blueprint(plugin.DISTGIT_NS) + + def test_default_values(self): + """Test the default values returned by the bz overrides endpoint. """ + + expected_result = { + "epel_assignee": "pingou", + "fedora_assignee": "pingou", + } + output = self.app.get("/_dg/bzoverrides/somenamespace/test3") + self.assertEqual(output.status_code, 200) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual(data, expected_result) + + def test_token_missing_ACL(self): + """Test the bz endpoint with an API token missing the `modify_project` ACL. + """ + headers = {"Authorization": "token foo_token"} + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", headers=headers + ) + # invalid token + self.assertEqual(output.status_code, 401) + + def test_invalid_token(self): + """Test the bz endpoint with an invalid API token. """ + headers = {"Authorization": "token BBBZZZOOO"} + datainput = {} + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", + data=datainput, + headers=headers, + ) + self.assertEqual(output.status_code, 401) + + def test_change_both_assignee(self): + """Test the bz endpoint when changing both assignee at once. """ + headers = {"Authorization": "token aaabbbcccddd"} + datainput = {"epel_assignee": "foo", "fedora_assignee": "foo"} + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", + data=datainput, + headers=headers, + ) + self.assertEqual(output.status_code, 200) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual(data, datainput) + + def test_change_fedora_assignee(self): + """Test the bz endpoint when changing the Fedora assignee while keeping + the EPEL one. + """ + headers = {"Authorization": "token aaabbbcccddd"} + datainput = {"epel_assignee": "foo", "fedora_assignee": "pingou"} + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", + data=datainput, + headers=headers, + ) + self.assertEqual(output.status_code, 200) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual(data, datainput) + + def test_change_invalid_fedora_assignee(self): + """Test the bz endpoint when changing the Fedora assignee while keeping + the EPEL one. + """ + headers = {"Authorization": "token aaabbbcccddd"} + datainput = {"epel_assignee": "foo", "fedora_assignee": "invalid"} + expected_result = { + "error": "Invalid or incomplete input submitted", + "error_code": "EINVALIDREQ", + "errors": ["Invalid user or group name as fedora_assignee"], + } + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", + data=datainput, + headers=headers, + ) + self.assertEqual(output.status_code, 400) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual(data, expected_result) + + def test_change_invalid_fedora_group_assignee(self): + """Test the bz endpoint when changing the Fedora assignee while keeping + the EPEL one. + """ + headers = {"Authorization": "token aaabbbcccddd"} + datainput = {"epel_assignee": "foo", "fedora_assignee": "@invalid"} + expected_result = { + "error": "Invalid or incomplete input submitted", + "error_code": "EINVALIDREQ", + "errors": ["Invalid user or group name as fedora_assignee"], + } + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", + data=datainput, + headers=headers, + ) + self.assertEqual(output.status_code, 400) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual(data, expected_result) + + def test_change_epel_assignee(self): + """Test the bz endpoint when changing the EPEL assignee while keeping + the Fedora one. + """ + headers = {"Authorization": "token aaabbbcccddd"} + datainput = {"epel_assignee": "", "fedora_assignee": None} + expected_result = { + "epel_assignee": "pingou", + "fedora_assignee": "pingou", + } + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", + data=datainput, + headers=headers, + ) + self.assertEqual(output.status_code, 200) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual(data, expected_result) + + def test_change_invalid_epel_assignee(self): + """Test the bz endpoint when changing the EPEL assignee while keeping + the Fedora one. + """ + headers = {"Authorization": "token aaabbbcccddd"} + datainput = {"epel_assignee": "invalid", "fedora_assignee": None} + expected_result = { + "error": "Invalid or incomplete input submitted", + "error_code": "EINVALIDREQ", + "errors": ["Invalid user or group name as epel_assignee"], + } + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", + data=datainput, + headers=headers, + ) + self.assertEqual(output.status_code, 400) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual(data, expected_result) + + def test_change_invalid_epel_group_assignee(self): + """Test the bz endpoint when changing the EPEL assignee while keeping + the Fedora one. + """ + headers = {"Authorization": "token aaabbbcccddd"} + datainput = {"epel_assignee": "@invalid", "fedora_assignee": None} + expected_result = { + "error": "Invalid or incomplete input submitted", + "error_code": "EINVALIDREQ", + "errors": ["Invalid user or group name as epel_assignee"], + } + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", + data=datainput, + headers=headers, + ) + self.assertEqual(output.status_code, 400) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual(data, expected_result) + + def test_reset_fedora_assignees(self): + """Test the bz endpoint when resetting the Fedora assignee. + """ + headers = {"Authorization": "token aaabbbcccddd"} + datainput = {"fedora_assignee": "foo"} + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", + data=datainput, + headers=headers, + ) + self.assertEqual(output.status_code, 200) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual( + data, {"epel_assignee": "pingou", "fedora_assignee": "foo"} + ) + repo = pagure.lib.query.get_authorized_project( + self.session, "test3", namespace="somenamespace", + ) + self.assertIsNotNone(repo.bzoverride) + + datainput = {"fedora_assignee": ""} + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", + data=datainput, + headers=headers, + ) + self.assertEqual(output.status_code, 200) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual( + data, {"epel_assignee": "pingou", "fedora_assignee": "pingou"} + ) + repo = pagure.lib.query.get_authorized_project( + self.session, "test3", namespace="somenamespace", + ) + self.assertIsNotNone(repo.bzoverride) + + def test_reset_epel_assignees(self): + """Test the bz endpoint when resetting the EPEL assignee. + """ + headers = {"Authorization": "token aaabbbcccddd"} + datainput = {"epel_assignee": "foo", "fedora_assignee": "pingou"} + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", + data=datainput, + headers=headers, + ) + self.assertEqual(output.status_code, 200) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual(data, datainput) + + datainput = {"fedora_assignee": "pingou"} + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", + data=datainput, + headers=headers, + ) + self.assertEqual(output.status_code, 200) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual( + data, {"epel_assignee": "pingou", "fedora_assignee": "pingou"} + ) + + repo = pagure.lib.query.get_authorized_project( + self.session, "test3", namespace="somenamespace", + ) + self.assertIsNotNone(repo.bzoverride) + + def test_reset_both_assignees(self): + """Test the bz endpoint when resetting the both assignees. + """ + headers = {"Authorization": "token aaabbbcccddd"} + datainput = {} + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", + data=datainput, + headers=headers, + ) + self.assertEqual(output.status_code, 200) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual( + data, {"epel_assignee": "pingou", "fedora_assignee": "pingou"} + ) + + repo = pagure.lib.query.get_authorized_project( + self.session, "test3", namespace="somenamespace", + ) + self.assertIsNone(repo.bzoverride) + + def test_changing_assignees_logged_in_invalid_user(self): + """Test the bz endpoint when changing the both assignees when logged in + the UI but with an user that is not allowed. + """ + user = tests.FakeUser(username="foo") + with tests.user_set(self.app.application, user): + expected_result = { + "epel_assignee": "pingou", + "fedora_assignee": "pingou", + } + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", data=expected_result + ) + self.assertEqual(output.status_code, 401) + + def test_change_epel_assignee_logged_in(self): + """Test the bz endpoint when changing the EPEL assignee when logged in + the UI. + """ + user = tests.FakeUser(username="pingou") + with tests.user_set(self.app.application, user): + # change one assignee + datainput = {"epel_assignee": "foo"} + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", data=datainput + ) + self.assertEqual(output.status_code, 200) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual( + data, {"epel_assignee": "foo", "fedora_assignee": "pingou"} + ) + + def test_change_fedora_assignee_logged_in(self): + """Test the bz endpoint when changing the Fedora assignee when logged in + the UI. + """ + user = tests.FakeUser(username="pingou") + with tests.user_set(self.app.application, user): + # change one assignee + datainput = {"fedora_assignee": "foo"} + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", data=datainput + ) + self.assertEqual(output.status_code, 200) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual( + data, {"epel_assignee": "pingou", "fedora_assignee": "foo"} + ) + + def test_change_both_assignees_logged_in(self): + """Test the bz endpoint when changing the both assignees when logged in + the UI. + """ + user = tests.FakeUser(username="pingou") + with tests.user_set(self.app.application, user): + datainput = {"epel_assignee": "foo", "fedora_assignee": "foo"} + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", data=datainput + ) + self.assertEqual(output.status_code, 200) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual( + data, {"epel_assignee": "foo", "fedora_assignee": "foo"} + ) + + def test_resetting_assignees_logged_in(self): + """Test the bz endpoint when resetting assignees when logged in the UI. + """ + user = tests.FakeUser(username="pingou") + with tests.user_set(self.app.application, user): + # Changing one of them before the reset + datainput = {"epel_assignee": "foo"} + output = self.app.post( + "/_dg/bzoverrides/somenamespace/test3", data=datainput + ) + self.assertEqual(output.status_code, 200) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual( + data, {"epel_assignee": "foo", "fedora_assignee": "pingou"} + ) + + # Resetting + output = self.app.post("/_dg/bzoverrides/somenamespace/test3") + self.assertEqual(output.status_code, 200) + data = json.loads(output.get_data(as_text=True)) + self.assertDictEqual( + data, {"epel_assignee": "pingou", "fedora_assignee": "pingou"} + ) diff --git a/pagure_distgit_tests/test_dist_git_auth.py b/pagure_distgit_tests/test_dist_git_auth.py new file mode 100644 index 0000000..fe326d0 --- /dev/null +++ b/pagure_distgit_tests/test_dist_git_auth.py @@ -0,0 +1,766 @@ +from __future__ import print_function + +from mock import Mock, patch + +import pagure.config +import pagure.exceptions +import pagure.lib.model + +try: + from pagure.lib import _get_project as get_project +except ImportError: + # From pagure 5.2, code has been moved to pagure.lib.query + from pagure.lib.query import _get_project as get_project + +import tests + +import dist_git_auth + + +class DistGitAuthTests(tests.Modeltests): + """ Test DistGitAuth ACLs with Fedora config. """ + + maxDiff = None + + def setUp(self): + """ Set up the environment in which to run the tests. """ + super(DistGitAuthTests, self).setUp() + + pagure.config.config["ACL_DEBUG"] = True + pagure.config.config["EXTERNAL_COMMITTER"] = {"relenggroup": {}} + pagure.config.config.update(self.dga_config) + + self.dga = dist_git_auth.DistGitAuth() + self.dga.info = Mock(side_effect=print) + # We default to saying it's not a forced push + dist_git_auth.is_forced_push = Mock(return_value=False) + + # Create an RCM user/group + rcmuser = pagure.lib.model.User( + user="releng", + fullname="Release Engineering", + token="aaabbbcd", + default_email="rcm@local.local", + ) + self.session.add(rcmuser) + self.session.flush() + rcmgroup = pagure.lib.model.PagureGroup( + group_name="relenggroup", + group_type="user", + display_name="Releng group", + user_id=rcmuser.id, + ) + self.session.add(rcmgroup) + self.session.flush() + rcmuser.group_objs.append(rcmgroup) + self.session.commit() + + def tearDown(self): + """ Tear down the environment in which the tests ran. """ + self.dga = None + super(DistGitAuthTests, self).tearDown() + + def create_namespaced_project( + self, namespace, name, is_fork=False, settings={} + ): + item = pagure.lib.model.Project( + user_id=1, # pingou + name=name, + is_fork=is_fork, + parent_id=3 if is_fork else None, + description="namespaced test project", + hook_token="aaabbbeee", + namespace=namespace, + settings=settings, + ) + item.close_status = [ + "Invalid", + "Insufficient data", + "Fixed", + "Duplicate", + ] + self.session.add(item) + self.session.commit() + return get_project(self.session, name=name, namespace=namespace) + + def expect_info_msg(self, expect_msg): + found = False + for call in self.dga.info.call_args_list: + args = call[0] + msg = args[0] + if msg == expect_msg: + found = True + if not found: + raise AssertionError( + "Info message '%s' expected but not found" % expect_msg + ) + + +class DistGitAuthTestsGeneric(DistGitAuthTests): + dga_config = { + "BLACKLIST_RES": [], + "RCM_BRANCHES": [], + "BYPASS_PR_ONLY_GROUPS": ["relenggroup"], + } + + def test_unused_repotype(self): + self.assertFalse( + self.dga.check_acl( + self.session, + project=None, + username=None, + refname=None, + pull_request=None, + repodir=None, + repotype="tickets", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Repotype tickets not in use") + + def test_branch_deletion(self): + dist_git_auth.is_forced_push.return_value = True + + project = self.create_namespaced_project("rpms", "test") + + self.assertFalse( + self.dga.check_acl( + self.session, + project=project, + username=None, + refname=None, + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto="0000000000000000000000000000000000000000", + is_internal=False, + ) + ) + + self.expect_info_msg("Branch deletion is not allowed") + + def test_forced_push(self): + dist_git_auth.is_forced_push.return_value = True + + project = self.create_namespaced_project("rpms", "test") + + self.assertFalse( + self.dga.check_acl( + self.session, + project=project, + username=None, + refname=None, + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Forced pushes are not allowed") + + def test_internal(self): + project = self.create_namespaced_project("rpms", "test") + + self.assertTrue( + self.dga.check_acl( + self.session, + project=project, + username=None, + refname=None, + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=True, + ) + ) + + self.expect_info_msg("Internal push allowed") + + def test_deploykey(self): + project = self.create_namespaced_project("rpms", "test") + + self.assertFalse( + self.dga.check_acl( + self.session, + project=project, + username="deploykey_foobar", + refname=None, + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Deploy keys are disabled") + + def test_invalid_user(self): + project = self.create_namespaced_project("rpms", "test") + + self.assertRaises( + pagure.exceptions.PagureException, + self.dga.check_acl, + self.session, + project=project, + username="nosuchuser", + refname=None, + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + + def test_unprotected_committer(self): + project = self.create_namespaced_project("unprotected", "test") + + self.assertTrue( + self.dga.check_acl( + self.session, + project=project, + username="pingou", + refname="refs/heads/mywip", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Committer push") + + def test_unprotected_non_committer(self): + project = self.create_namespaced_project("unprotected", "test") + + self.assertFalse( + self.dga.check_acl( + self.session, + project=project, + username="foo", + refname="refs/heads/mywip", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Fall-through deny") + + def test_unprotected_pr_required_pr(self): + project = self.create_namespaced_project("unprotected", "test") + self.dga.global_pr_only = True + + self.assertTrue( + self.dga.check_acl( + self.session, + project=project, + username="pingou", + refname="refs/heads/mywip", + pull_request=True, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Committer push") + + def test_unprotected_pr_required_no_pr(self): + project = self.create_namespaced_project("unprotected", "test") + self.dga.global_pr_only = True + + self.assertFalse( + self.dga.check_acl( + self.session, + project=project, + username="pingou", + refname="refs/heads/mywip", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("A pull request is required for this branch") + + def test_unprotected_pr_required_no_pr_cvsadmin(self): + project = self.create_namespaced_project("unprotected", "test") + self.dga.global_pr_only = True + + self.assertTrue( + self.dga.check_acl( + self.session, + project=project, + username="releng", + refname="refs/heads/mywip", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Committer push") + + def test_unprotected_pr_required_requests(self): + project = self.create_namespaced_project("unprotected", "test") + self.dga.global_pr_only = True + + self.assertTrue( + self.dga.check_acl( + self.session, + project=project, + username="pingou", + refname="refs/heads/mywip", + pull_request=None, + repodir=None, + repotype="requests", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Committer push") + + def test_unprotected_pr_required_repo_pr_only_no_pr(self): + settings = {"pull_request_access_only": True} + project = self.create_namespaced_project( + "unprotected", "test", settings=settings + ) + + self.assertFalse( + self.dga.check_acl( + self.session, + project=project, + username="pingou", + refname="refs/heads/mywip", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("A pull request is required for this branch") + + def test_unprotected_pr_required_repo_pr_only(self): + settings = {"pull_request_access_only": True} + project = self.create_namespaced_project( + "unprotected", "test", settings=settings + ) + self.assertTrue( + self.dga.check_acl( + self.session, + project=project, + username="pingou", + refname="refs/heads/mywip", + pull_request="pull_request", + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Committer push") + + +class DistGitAuthTestsFedora(DistGitAuthTests): + dga_config = { + "PR_ONLY": False, + "ACL_BLOCK_UNSPECIFIED": False, + "BLACKLIST_RES": ["refs/heads/c[0-9]+.*"], + "UNSPECIFIED_BLACKLIST_RES": ["refs/heads/f[0-9]+"], + "RCM_GROUP": "relenggroup", + "RCM_BRANCHES": ["refs/heads/f[0-9]+"], + "ACL_PROTECTED_NAMESPACES": ["rpms", "modules", "container"], + "PDC_URL": "invalid://", + } + + def test_protected_blacklisted_ref(self): + project = self.create_namespaced_project("rpms", "test") + + self.assertFalse( + self.dga.check_acl( + self.session, + project=project, + username="pingou", + refname="refs/heads/c7", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Ref refs/heads/c7 is blocked") + + def test_protected_rcm(self): + project = self.create_namespaced_project("rpms", "test") + + self.assertTrue( + self.dga.check_acl( + self.session, + project=project, + username="releng", + refname="refs/heads/f27", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("RCM push") + + @patch("dist_git_auth.requests") + def test_protected_unsupported_branch(self, mock_requests): + res = Mock() + res.ok = True + res.json.return_value = {"results": [{"active": False}]} + mock_requests.get.return_value = res + project = self.create_namespaced_project("rpms", "test") + + self.assertFalse( + self.dga.check_acl( + self.session, + project=project, + username="pingou", + refname="refs/heads/f26", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Branch refs/heads/f26 is unsupported") + + @patch("dist_git_auth.requests") + def test_protected_supported_branch_committer(self, mock_requests): + project = self.create_namespaced_project("rpms", "test") + res = Mock() + res.ok = True + res.json.return_value = {"results": [{"active": True}]} + mock_requests.get.return_value = res + + self.assertTrue( + self.dga.check_acl( + self.session, + project=project, + username="pingou", + refname="refs/heads/f27", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Branch refs/heads/f27 is supported") + + @patch("dist_git_auth.requests") + def test_protected_supported_branch_non_committer(self, mock_requests): + project = self.create_namespaced_project("rpms", "test") + res = Mock() + res.ok = True + res.json.return_value = {"results": [{"active": True}]} + mock_requests.get.return_value = res + + self.assertFalse( + self.dga.check_acl( + self.session, + project=project, + username="foo", + refname="refs/heads/f27", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Branch refs/heads/f27 is supported") + + @patch("dist_git_auth.requests") + def test_protected_unspecified_branch_blacklisted(self, mock_requests): + project = self.create_namespaced_project("rpms", "test") + res = Mock() + res.ok = True + res.json.return_value = {"results": []} + mock_requests.get.return_value = res + + self.assertFalse( + self.dga.check_acl( + self.session, + project=project, + username="pingou", + refname="refs/heads/f28", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Unspecified ref refs/heads/f28 is blocked") + + @patch("dist_git_auth.requests") + def test_protected_unspecified_branch_normal_committer( + self, mock_requests + ): + project = self.create_namespaced_project("rpms", "test") + res = Mock() + res.ok = True + res.json.return_value = {"results": []} + mock_requests.get.return_value = res + + self.assertTrue( + self.dga.check_acl( + self.session, + project=project, + username="pingou", + refname="refs/heads/mywip", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Unspecified branch push") + + @patch("dist_git_auth.requests") + def test_protected_unspecified_branch_normal_non_committer( + self, mock_requests + ): + project = self.create_namespaced_project("rpms", "test") + res = Mock() + res.ok = True + res.json.return_value = {"results": []} + mock_requests.get.return_value = res + + self.assertFalse( + self.dga.check_acl( + self.session, + project=project, + username="foo", + refname="refs/heads/mywip", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Unspecified branch push") + + +class DistGitAuthTestsCentOS(DistGitAuthTests): + dga_config = { + "PR_ONLY": False, + "ACL_BLOCK_UNSPECIFIED": True, + "BLACKLIST_RES": ["refs/heads/f[0-9]+.*"], + "RCM_GROUP": "relenggroup", + "RCM_BRANCHES": ["refs/heads/c[0-9]+"], + "SUPPORTED_SIGS": ["sig-core"], + "SIG_PREFIXES": ["refs/heads/c7"], + "ACL_PROTECTED_NAMESPACES": ["rpms"], + } + + def setUp(self): + super(DistGitAuthTestsCentOS, self).setUp() + + # Create an RCM user/group + arrfab = pagure.lib.model.User( + user="arrfab", + fullname="Fabian Arriton", + token="aaabbbcd", + default_email="arrfab@local.local", + ) + self.session.add(arrfab) + self.session.flush() + sigcore = pagure.lib.model.PagureGroup( + group_name="sig-core", + group_type="user", + display_name="Core SIG group", + user_id=arrfab.id, + ) + self.session.add(sigcore) + self.session.flush() + arrfab.group_objs.append(sigcore) + self.session.commit() + + def test_protected_blacklisted_ref(self): + project = self.create_namespaced_project("rpms", "test") + + self.assertFalse( + self.dga.check_acl( + self.session, + project=project, + username="pingou", + refname="refs/heads/f27", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Ref refs/heads/f27 is blocked") + + def test_protected_rcm(self): + project = self.create_namespaced_project("rpms", "test") + + self.assertTrue( + self.dga.check_acl( + self.session, + project=project, + username="releng", + refname="refs/heads/c7", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("RCM push") + + def test_protected_sig_sig_member(self): + project = self.create_namespaced_project("rpms", "test") + + self.assertTrue( + self.dga.check_acl( + self.session, + project=project, + username="arrfab", + refname="refs/heads/c7-sig-core-test", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("SIG push") + + def test_protected_sig_sig_member_precise(self): + project = self.create_namespaced_project("rpms", "test") + + self.assertTrue( + self.dga.check_acl( + self.session, + project=project, + username="arrfab", + refname="refs/heads/c7-sig-core", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("SIG push") + + @patch("dist_git_auth.requests") + def test_protected_sig_no_sig_member(self, mock_requests): + project = self.create_namespaced_project("rpms", "test") + res = Mock() + res.ok = True + res.json.return_value = {"results": []} + mock_requests.get.return_value = res + + self.assertFalse( + self.dga.check_acl( + self.session, + project=project, + username="pingou", + refname="refs/heads/c7-sig-core-test", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Access to namespace rpms is restricted") + + @patch("dist_git_auth.requests") + def test_protected_sig_sig_member_no_sig_branch(self, mock_requests): + project = self.create_namespaced_project("rpms", "test") + res = Mock() + res.ok = True + res.json.return_value = {"results": []} + mock_requests.get.return_value = res + + self.assertFalse( + self.dga.check_acl( + self.session, + project=project, + username="arrfab", + refname="refs/heads/c7", + pull_request=None, + repodir=None, + repotype="main", + revfrom=None, + revto=None, + is_internal=False, + ) + ) + + self.expect_info_msg("Access to namespace rpms is restricted") diff --git a/pagure_distgit_tests/test_style.py b/pagure_distgit_tests/test_style.py new file mode 100644 index 0000000..c2f4b9a --- /dev/null +++ b/pagure_distgit_tests/test_style.py @@ -0,0 +1,109 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" + (c) 2019 - Copyright Red Hat Inc + + Authors: + Pierre-Yves Chibon + +Tests for flake8 and black compliance of the code + +""" + +from __future__ import unicode_literals, absolute_import + +import os +import subprocess +import sys +import unittest + +import six + +REPO_PATH = os.path.abspath( + os.path.join(os.path.dirname(__file__), "..", "pagure_distgit") +) +TESTS_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__))) + + +class TestStyle(unittest.TestCase): + """This test class contains tests pertaining to code style.""" + + def test_code_with_flake8(self): + """Enforce PEP-8 compliance on the codebase. + + This test runs flake8 on the code, and will fail if it returns a + non-zero exit code. + """ + # We ignore E712, which disallows non-identity comparisons with True and False + # We ignore W503, which disallows line break before binary operator + flake8_command = [ + sys.executable, + "-m", + "flake8", + "--ignore=E712,W503,E203", + REPO_PATH, + ] + + # check if we have an old flake8 or not + import flake8 + + flake8_v = flake8.__version__.split(".") + for idx, val in enumerate(flake8_v): + try: + val = int(val) + except ValueError: + pass + flake8_v[idx] = val + old_flake = tuple(flake8_v) < (3, 0) + + if old_flake: + raise unittest.SkipTest("Flake8 version too old to be useful") + + proc = subprocess.Popen( + flake8_command, stdout=subprocess.PIPE, cwd=REPO_PATH + ) + print(proc.communicate()) + + self.assertEqual(proc.returncode, 0) + + @unittest.skipIf( + not (six.PY3 and sys.version_info.minor >= 6), + "Black is only available in python 3.6+", + ) + def test_code_with_black(self): + """Enforce black compliance on the codebase. + + This test runs black on the code, and will fail if it returns a + non-zero exit code. + """ + black_command = [ + sys.executable, + "-m", + "black", + "-l", + "79", + "--check", + "--exclude", + '"/(\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|' + '_build|buck-out|build|dist)/"', # noqa + REPO_PATH, + TESTS_PATH, + ] + proc = subprocess.Popen( + black_command, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=REPO_PATH, + ) + stdout, stderr = proc.communicate() + print("stdout: ") + print(stdout.decode("utf-8")) + print("stderr: ") + print(stderr.decode("utf-8")) + + self.assertEqual(proc.returncode, 0) + + +if __name__ == "__main__": + unittest.main(verbosity=2) diff --git a/pagure_distgit_tests/tests_style.py b/pagure_distgit_tests/tests_style.py deleted file mode 100644 index c2f4b9a..0000000 --- a/pagure_distgit_tests/tests_style.py +++ /dev/null @@ -1,109 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" - (c) 2019 - Copyright Red Hat Inc - - Authors: - Pierre-Yves Chibon - -Tests for flake8 and black compliance of the code - -""" - -from __future__ import unicode_literals, absolute_import - -import os -import subprocess -import sys -import unittest - -import six - -REPO_PATH = os.path.abspath( - os.path.join(os.path.dirname(__file__), "..", "pagure_distgit") -) -TESTS_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__))) - - -class TestStyle(unittest.TestCase): - """This test class contains tests pertaining to code style.""" - - def test_code_with_flake8(self): - """Enforce PEP-8 compliance on the codebase. - - This test runs flake8 on the code, and will fail if it returns a - non-zero exit code. - """ - # We ignore E712, which disallows non-identity comparisons with True and False - # We ignore W503, which disallows line break before binary operator - flake8_command = [ - sys.executable, - "-m", - "flake8", - "--ignore=E712,W503,E203", - REPO_PATH, - ] - - # check if we have an old flake8 or not - import flake8 - - flake8_v = flake8.__version__.split(".") - for idx, val in enumerate(flake8_v): - try: - val = int(val) - except ValueError: - pass - flake8_v[idx] = val - old_flake = tuple(flake8_v) < (3, 0) - - if old_flake: - raise unittest.SkipTest("Flake8 version too old to be useful") - - proc = subprocess.Popen( - flake8_command, stdout=subprocess.PIPE, cwd=REPO_PATH - ) - print(proc.communicate()) - - self.assertEqual(proc.returncode, 0) - - @unittest.skipIf( - not (six.PY3 and sys.version_info.minor >= 6), - "Black is only available in python 3.6+", - ) - def test_code_with_black(self): - """Enforce black compliance on the codebase. - - This test runs black on the code, and will fail if it returns a - non-zero exit code. - """ - black_command = [ - sys.executable, - "-m", - "black", - "-l", - "79", - "--check", - "--exclude", - '"/(\.eggs|\.git|\.hg|\.mypy_cache|\.nox|\.tox|\.venv|' - '_build|buck-out|build|dist)/"', # noqa - REPO_PATH, - TESTS_PATH, - ] - proc = subprocess.Popen( - black_command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - cwd=REPO_PATH, - ) - stdout, stderr = proc.communicate() - print("stdout: ") - print(stdout.decode("utf-8")) - print("stderr: ") - print(stderr.decode("utf-8")) - - self.assertEqual(proc.returncode, 0) - - -if __name__ == "__main__": - unittest.main(verbosity=2) diff --git a/requirements-testing.txt b/requirements-testing.txt index 80bae14..865880d 100644 --- a/requirements-testing.txt +++ b/requirements-testing.txt @@ -1,4 +1,4 @@ flake8 flake8-import-order -nose pdc_client +pytest