From e74c632765d2d6525b047cbcdc89ea4fa62765ef Mon Sep 17 00:00:00 2001 From: Josef Skladanka Date: May 06 2014 07:47:56 +0000 Subject: Bodhi Comment directive This patch adds the AutoQA-like bodhi comment capability to Taskbot reporting. --- diff --git a/libtaskotron/config_defaults.py b/libtaskotron/config_defaults.py index d00066c..60ae648 100644 --- a/libtaskotron/config_defaults.py +++ b/libtaskotron/config_defaults.py @@ -42,6 +42,7 @@ class Config(object): 'http://resultsdb.qa.fedoraproject.org/resultsdb/api/v1.0/' self.bodhi_email_failed_span = 4320 + self.bodhi_posting_comments_span = 4320 self.tmpdir = '/var/tmp/taskotron' self.logdir = '/var/log/taskotron' diff --git a/libtaskotron/directives/bodhi_comment_directive.py b/libtaskotron/directives/bodhi_comment_directive.py new file mode 100644 index 0000000..b2741ff --- /dev/null +++ b/libtaskotron/directives/bodhi_comment_directive.py @@ -0,0 +1,440 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +# +# Authors: +# Tim Flink +# Josef Skladanka + +directive_class = 'BodhiCommentDirective' + + +from libtaskotron.directives import BaseDirective + +from libtaskotron.logger import log +from libtaskotron import check +from libtaskotron import config +from libtaskotron import bodhi_utils + +from libtaskotron.exceptions import TaskotronDirectiveError, TaskotronValueError +from libtaskotron.rpm_utils import basearch + +import datetime +import re + +RE_COMMENT = re.compile(r'Taskotron: (?P\w+) test (?P\w+)'\ + r' on (?P\w+)\. Result log:[\t \n]+'\ + r'(?P[/:\w]+).*') + + +# symbols used for consistant declaration of state +PASS = 'state_PASS'# <%s>' % __name__ +FAIL = 'state_FAIL'# <%s>'% __name__ +INCOMPLETE = 'state_INCOMPLETE'# <%s>' % __name__ +UNKNOWN = 'state_UNKNOWN'# <%s>' % __name__ + +class BodhiUpdateState: + def __init__(self, name, kojitag='updates'): + self.test_states = { 'depcheck_32':'NORUN', + 'depcheck_64':'NORUN', + 'upgradepath':'NORUN' + } + + # since upgradepath is only run on updates-pending, + # set it to passed if we're working on updates-testing + if kojitag == 'updates-testing': + self.test_states['upgradepath'] = 'PASSED' + + self.result_history = [] + self.update_name = name + self.test_change = False + + def num_results(self): + ''' Retrieve the number of results currently held ''' + return len(self.result_history) + + def add_result(self, new_result): + ''' Update the current state with a new result. + + Args: + new_result: a dictionary containing the following keys: + time (datetime object), testname, result, arch + ''' + + self.result_history.append(new_result) + + # copy the current test state so that we can compare later + old_state = self.test_states.copy() + + # for depcheck, we only care about i386 and x86_64 arches + # anything else will be ignored + if new_result['testname'] == 'depcheck': + arch = basearch(new_result['arch']) + + if arch == 'i386': + self.test_states['depcheck_32'] = new_result['result'] + + elif arch == 'x86_64': + self.test_states['depcheck_64'] = new_result['result'] + + else: + log.info("Arch %s is not valid for depcheck. Ignoring." \ + % new_result['arch']) + + # don't care about the arch for upgrade path. It should be noarch + # but the results aren't changed if it isn't + elif new_result['testname'] == 'upgradepath': + self.test_states['upgradepath'] = new_result['result'] + + # detect if any tests changed from the last result + if old_state != self.test_states: + self.test_change = True + else: + self.test_change = False + + def get_state(self): + ''' Determine state based on currently known results ''' + + # print out the current state for debugging purposes + state_str = "Current State of update %s :" % self.update_name + state_str += '\n %s' % self.test_states + for result in self.result_history: + state_str += '\n %s' % result + + log.info(state_str) + + test_values = set(self.test_states.values()) + if 'NORUN' in test_values: + return INCOMPLETE + if 'FAILED' in test_values: + return FAIL + if 'PASSED' in test_values and len(test_values) == 1: + return PASS + + return UNKNOWN + + def did_test_change(self): + ''' Determine whether the state of any tests changed with the addition + of the last result''' + return self.test_change + + +def _is_comment_email_needed(update_name, parsed_comments, new_result, + kojitag): + ''' + Determines whether or not to send an email with the comment posted to bodhi. + Uses previous comments on update in order to determine current state + + Args: + update_name -- name of the update to be tested + parsed_comments -- already existing AutoQA comments for the update + new_result -- the new result to be posted + kojitag -- the koji tag being tested (affects the expected tests) + ''' + + Config = config.get_config() + + log.info("checking email needed for %s (%s)", update_name, kojitag) + # compute current state + update_state = BodhiUpdateState(update_name, kojitag=kojitag) + for result in parsed_comments: + update_state.add_result(result) + + current_state = update_state.get_state() + update_state.add_result(new_result) + new_state = update_state.get_state() + + log.info('update state : %s to %s', current_state, new_state) + + # by default, we want to send email on state change, so start from there + send_email = True + + if new_state is PASS and current_state is not FAIL: + send_email = False + + return send_email + +def _parse_result_from_comment(comment): + ''' + Parses timestamp and results from bodhi comment + + Args: + comment -- the 'comment' part of bodhi update object + ''' + + comment_time = datetime.datetime.strptime(comment['timestamp'], '%Y-%m-%d %H:%M:%S') + + # comment form to match: + # 'Taskotron: %s test %s on %s. Result log: %s' + # note that it isn't looking for the autoqa user right now, that needs to + # be done in any code that calls this + comment_match = RE_COMMENT.match(comment['text']) + + test_name = '' + result = '' + arch = '' + + if comment_match: + test_name = comment_match.group('test_name') + result = comment_match.group('result') + arch = comment_match.group('arch') + else: + log.warning('Failed to parse bodhi comment: %r', comment['text']) + + return {'time':comment_time, 'testname':test_name, + 'result':result, 'arch':arch} + + +def _already_commented(bodhi_api, update, testname, arch): + '''Check if Taskotron comment is already posted. + + Args: + update -- Bodhi update object --or-- update title --or-- update ID --or-- package NVR + testname -- the name of the test + arch -- tested architecture + + Note: Only NVR allowed, not ENVR. See https://fedorahosted.org/bodhi/ticket/592. + + Returns: + Tuple containing old result and time when the last comment was posted. + If no comment is posted yet, or it is, but the update + has been modified since, tuple will contain two empty strings. + + Throws: + TaskotronValueError -- if no such update can be found + ''' + + Config = config.get_config() + + # if we received update title or ID, let's convert it to update object first + if isinstance(update, unicode) or isinstance(update, str): + u = bodhi_api.query_update(update) + if u: + update = u + else: + raise TaskotronValueError("No such update: %s" % update) + + comment_re = r'Taskotron:[\s]+%s[\s]+test[\s]+(\w+)[\s]+on[\s]+%s' % (testname, arch) + old_result = '' + comment_time = '' + + taskotron_comments = [comment for comment in update['comments'] + if comment['author'] == Config.fas_username] + for comment in taskotron_comments: + m = re.match(comment_re, comment['text']) + if m is None: + continue + old_result = m.group(1) + comment_time = comment['timestamp'] + # check whether update was modified after the last posted comment + if update['date_modified'] > comment_time: + return ('','') + return (old_result, comment_time) + +def _is_comment_needed(old_result, comment_time, result, time_span = None): + '''Check if the comment is meant to be posted. + + Args: + old_result -- the result of the last test + comment_time -- the comment time of the last test + result -- the result of the test + time_span -- waiting period (in minutes) before posting the same comment + + Returns: + True if the comment will be posted, False otherwise. + ''' + # the first comment or a comment with different result, post it + if not old_result or old_result != result: + return True + + # If we got here, it means that the comment with the same result has been + # already posted, we now need to determine whether we can post the + # comment again or not. + # If the previous result is *not* 'FAILED', we won't post it in order not to + # spam developers. + # If the previous result *is* 'FAILED', we will need to check whether given + # time span expired, if so, we will post the same comment again to remind + # a developer about the issue. + + if result != 'FAILED': + return False + + if time_span is None: + Config = config.get_config() + time_span = Config.bodhi_posting_comments_span + + + posted_datetime = datetime.datetime.strptime(comment_time, '%Y-%m-%d %H:%M:%S') + + delta = (datetime.datetime.utcnow() - posted_datetime) + # total_seconds() is introduced in python 2.7, until 2.7 is everywhere... + delta_minutes = (delta.seconds + delta.days * 24 * 3600) / 60 + + if delta_minutes < time_span: + return False + + return True + + +def _post_testresult(bodhi_api, update, testname, result, url, + arch = 'noarch', karma = 0, doreport='onchange'): + '''Post comment and karma to bodhi + + Args: + update -- the *title* of the update comment on + testname -- the name of the test + result -- the result of the test + url -- url of the result of the test + arch -- tested architecture (default 'noarch') + karma -- karma points (default 0) + doreport -- set to 'all' to force posting bodhi comment + + Returns: + True if comment was posted successfully or comment wasn't meant to be + posted (either posting is turned off or comment was already posted), + False otherwise. + ''' + + # TODO when new bodhi releases, add update identification by UPDATEID support + + Config = config.get_config() + + if not update or not testname or not result or url == None: + log.error("bodhi_post_testresults() requires non-empty update, "\ + "testname, result and url arguments.") + return False + + if not Config.fas_username or not Config.fas_password: + log.error('Conf file containing FAS credentials is incomplete!') + return False + + comment = 'Taskotron: %s test %s on %s. Result log: %s ' \ + '(results are informative only)' % (testname, result, arch, url) + try: + (old_result, comment_time) = _already_commented(bodhi_api, update, testname, arch) + + comment_needed = _is_comment_needed(old_result, comment_time, result) + + if not comment_needed and doreport != 'all': + log.info('Current test result is already present in bodhi.') + return True + + bodhi_update = bodhi_api.query_update(update) + parsed_results = [] + for found_comment in bodhi_update['comments']: + if found_comment['author'] == Config.fas_username: + parsed_results.append(_parse_result_from_comment(found_comment)) + + kojitag = 'updates-testing' + if bodhi_update['request'] == 'stable' or bodhi_update['status'] == 'stable': + kojitag = 'updates' + + new_result = {'time':datetime.datetime.utcnow(), 'testname':testname, + 'result':result, 'arch':arch} + + send_email = _is_comment_email_needed(update, parsed_results, + new_result, kojitag) + + + if not bodhi_api.client.comment(update, comment, karma, send_email): + log.error('Could not post a comment to Bodhi. %r, %r, %r, %r', + update, comment, karma, send_email) + return False + + log.info('The test result was sent to bodhi successfully.') + + except Exception as e: + log.exception(e) + return False + + return True + + + +class BodhiCommentDirective(BaseDirective): + """The bodhi_comment directive interfaces with Bodhi to create + a comment in Bodhi + + format: "bodhi_comment: + results: + doreport: [all, onchange]" + Also note, that 'checkname' needs to be present in env_data. + """ + def __init__(self, bodhi_api=None): + + if bodhi_api: + self.bodhi_api = bodhi_api + else: + self.bodhi_api = bodhi_utils.BodhiUtils() + + def process(self, input_data, env_data): + output_data = [] + + + Config = config.get_config() + if not (Config.reporting_enabled and Config.report_to_bodhi): + log.info("Reporting to Bodhi is disabled.") + return + + + if not ('doreport' in input_data and 'results' in input_data): + detected_args = ', '.join(input_data.keys()) + raise TaskotronDirectiveError("The bodhi_comment directive "\ + "requires 'doreport' and 'results' arguments. "\ + "Detected arguments: %s" % detected_args) + + if input_data['doreport'] not in ['all', 'onchange']: + raise TaskotronDirectiveError("The argument 'doreport' is"\ + "set to invalid value '%s'. Valid values are: "\ + "'all', 'onchange'.") + + if 'checkname' not in env_data: + raise TaskotronDirectiveError("The bodhi_comment directive "\ + "requires checkname.") + + try: + check_details = check.import_TAP(input_data['results']) + except TaskotronValueError as e: + log.exception(e) + raise TaskotronDirectiveError("Failed to load the 'results':"\ + "%s" % e) + + # Filter the results of "BODHI_UPDATE" type + check_details = [r for r in check_details + if r.report_type == check.ReportType.BODHI_UPDATE] + + # ? Log when no results of the type are found ? + + for detail in check_details: + #TODO: replace url with proper URL + outcome = _post_testresult(self.bodhi_api, detail.item, + env_data['checkname'], + detail.outcome, + url = "http://example.com", + doreport = input_data['doreport'], + arch = detail.keyvals.get('arch', 'noarch'), + ) + + if not outcome: + log.warning("Failed to post Bodhi Comment: `%s` `%s` `%s`", + detail.item, + env_data['checkname'], + detail.outcome) + + output_data.append(outcome) + + return output_data + diff --git a/testing/test_bodhi_comment_directive.py b/testing/test_bodhi_comment_directive.py new file mode 100644 index 0000000..a7e719a --- /dev/null +++ b/testing/test_bodhi_comment_directive.py @@ -0,0 +1,325 @@ +"""Unit tests for libtaskotron/directives/resultsdb_directive.py""" + +import pytest +import datetime +import itertools +from dingus import Dingus +from copy import deepcopy + +from libtaskotron.directives import bodhi_comment_directive +from libtaskotron.exceptions import TaskotronDirectiveError, TaskotronValueError + +from libtaskotron import check +from libtaskotron import config + + +class TestBodhiCommentReport(): + + def setup_method(self, method): + '''Run this before every test invocation''' + config._config = None + Config = config.get_config() + Config.reporting_enabled = True + Config.report_to_bodhi = True + + + self.comment = { + "author": Config.fas_username, + "timestamp": "2014-01-31 10:11:12", + "text": "Taskotron: depcheck test PASSED on x86_64. Result log: http://example.com", + } + + self.update = { + 'request': 'stable', + 'comments': [self.comment], + 'date_modified': "2014-01-31 10:00:00" + } + + self.stub_bodhi = Dingus(query_update__returns = self.update) + self.directive = bodhi_comment_directive.BodhiCommentDirective(self.stub_bodhi) + + self.cd = check.CheckDetail( + item = 'update_title', + report_type = check.ReportType.BODHI_UPDATE, + outcome = 'PASSED', + ) + + + #TODO: remove this ugly hack - it is here because of the bug in bayeux + tap = "TAP version 13\n1..1\n%s" % check.export_TAP(self.cd) + + + self.ref_input = { + 'results': tap, + 'doreport': 'onchange', + } + self.ref_envdata = { + 'checkname': 'depcheck', + } + + def test_config_reporting_disabled(self): + """Checks config options can disable reporting.""" + Config = config.get_config() + + for r_e, r_t_b in ((False, False), (False, True), (True, False)): + Config.reporting_enabled = r_e + Config.report_to_bodhi = r_t_b + + assert self.directive.process(self.ref_input, self.ref_envdata) is None + + def test_missing_arguments_in_inputdata(self): + """Check if arguments raise exception""" + with pytest.raises(TaskotronDirectiveError): + input_data = deepcopy(self.ref_input) + del(input_data['results']) + self.directive.process(input_data, self.ref_envdata) + + with pytest.raises(TaskotronDirectiveError): + input_data = deepcopy(self.ref_input) + del(input_data['doreport']) + self.directive.process(input_data, self.ref_envdata) + + def test_wrong_doreport_value(self): + """Checks whether unsupported doreport value""" + with pytest.raises(TaskotronDirectiveError): + input_data = deepcopy(self.ref_input) + input_data['doreport'] = 'FooBar' + self.directive.process(input_data, self.ref_envdata) + + def test_missing_checkname(self, monkeypatch): + """Checks if missing checkname raises exception""" + with pytest.raises(TaskotronDirectiveError): + envdata = deepcopy(self.ref_envdata) + del(envdata['checkname']) + + self.directive.process(self.ref_input, envdata) + + def test_failed_tap_import(self, monkeypatch): + """Checks if failed TAP import raises exception""" + def mock_raise(self, *args, **kwargs): + raise TaskotronValueError("Testing Error") + + monkeypatch.setattr(check, 'import_TAP', mock_raise) + + with pytest.raises(TaskotronDirectiveError): + self.directive.process(self.ref_input, self.ref_envdata) + + + def test_parse_result_from_comment(self): + """Checks parsing of the bodhi comments""" + output = bodhi_comment_directive._parse_result_from_comment(self.comment) + + assert output['time'] == datetime.datetime(2014, 1, 31, 10, 11, 12) + assert output['testname'] == 'depcheck' + assert output['result'] == 'PASSED' + assert output['arch'] == 'x86_64' + + def test_already_commented_nonexistent_update(self): + """Checks whether non-existing updates throws + an error in _already_commented.""" + stub_bodhi = Dingus(query_update__returns = None) + + with pytest.raises(TaskotronValueError): + bodhi_comment_directive._already_commented(stub_bodhi, '', '', '') + + + def test_already_commented(self): + """Tests general _already_commented behaviour.""" + + # Test comment found + outcome = bodhi_comment_directive._already_commented(None, self.update, 'depcheck', 'x86_64') + assert outcome == ('PASSED', self.comment['timestamp']) + + # Test no comment + update = deepcopy(self.update) + update['comments'] = [] + outcome = bodhi_comment_directive._already_commented(None, update, 'depcheck', 'x86_64') + assert outcome == ('', '') + + # Test modified after commented + update = deepcopy(self.update) + update['date_modified'] = "2014-01-31 15:00:00" + outcome = bodhi_comment_directive._already_commented(None, update, 'depcheck', 'x86_64') + assert outcome == ('', '') + + def test_is_comment_needed(self): + """Checks whether _is_comment_needed returns proper results in + various situations.""" + # Test first comment + outcome = bodhi_comment_directive._is_comment_needed(None, None, None, None) + assert outcome == True + + # Test different result + outcome = bodhi_comment_directive._is_comment_needed('PASSED', None, 'FAILED', None) + assert outcome == True + + # Test same result, but not FAILED + outcome = bodhi_comment_directive._is_comment_needed('PASSED', None, 'PASSED', None) + assert outcome == False + + def test_is_comment_needed_decide_repost(self, monkeypatch): + """Checks whether time_span has an impact on comment posting.""" + date_utcnow = datetime.datetime(2014, 01, 10, 12, 05) + date_posted = datetime.datetime(2014, 01, 10, 12, 00) + + class FakeDatetime(datetime.datetime): + @classmethod + def utcnow(cls): + return date_utcnow + + + monkeypatch.setattr(datetime, 'datetime', FakeDatetime) + + # Test same result, FAILED, delta < time_span + time_span = 1000 + comment_time = date_posted.strftime('%Y-%m-%d %H:%M:%S') + outcome = bodhi_comment_directive._is_comment_needed('FAILED', comment_time, 'FAILED', time_span) + assert outcome == False + + # Test same result, FAILED, delta > time_span + time_span = 0 + comment_time = date_posted.strftime('%Y-%m-%d %H:%M:%S') + outcome = bodhi_comment_directive._is_comment_needed('FAILED', comment_time, 'FAILED', time_span) + assert outcome == True + + def test_post_testresult_params_and_config(self): + """Checks whether missing parameters/config values terminate + _post_testresults.""" + # Test empty parameters + outcome = bodhi_comment_directive._post_testresult(Dingus(), + update = None, + testname = None, + result = None, + url = None) + assert outcome == False + + # Test configuration + Config = config.get_config() + Config.fas_username = None + Config.fas_password = None + outcome = bodhi_comment_directive._post_testresult(Dingus(), + update = True, + testname = True, + result = True, + url = True) + config._config = None + assert outcome == False + + @pytest.fixture + def prepare_for_bodhi_comment(self, monkeypatch): + """Sets up environment for fake bodhi comment calls""" + Config = config.get_config() + Config.fas_password = "secret!" + + stub_already_commented = lambda *args, **kwargs: ('', '') + stub_return_true = lambda *args, **kwargs: True + stub_return_false = lambda *args, **kwargs: False + + monkeypatch.setattr(bodhi_comment_directive, '_already_commented', stub_already_commented) + monkeypatch.setattr(bodhi_comment_directive, '_is_comment_needed', stub_return_true) + monkeypatch.setattr(bodhi_comment_directive, '_is_comment_email_needed', stub_return_false) + + + def test_post_testresult_comment_not_needed(self, monkeypatch, prepare_for_bodhi_comment): + """Checks the behaviour when comment is not needed.""" + self.stub_bodhi.client = Dingus('bodhi_api.bodhi') + + monkeypatch.setattr(bodhi_comment_directive, '_is_comment_needed', lambda *a, **k: False) + + outcome = bodhi_comment_directive._post_testresult(self.stub_bodhi, + update = True, + testname = True, + result = True, + url = True, + doreport = 'onchange') + assert outcome == True + assert len(self.stub_bodhi.client.calls()) == 0 + + def test_post_testresult(self, prepare_for_bodhi_comment): + """Checks whether _post_testresult maps arguments correctly to + the bodhi comment call.""" + self.stub_bodhi.client = Dingus('bodhi_api.bodhi') + + outcome = bodhi_comment_directive._post_testresult(self.stub_bodhi, + update = "update_title", + testname = "depcheck", + result = "PASSED", + url = "http://example.com") + + # Select the first call of "comment" method. + call = [call for call in self.stub_bodhi.client.calls() if call[0] == 'comment'][0] + # Select the positional arguments of that call + call_data = call[1] + + assert call_data[0] == "update_title" + assert call_data[1] == "Taskotron: depcheck test PASSED on noarch. Result log: http://example.com (results are informative only)" + assert call_data[2] == 0 + assert call_data[3] == False + + def test_tap_mapping(self, prepare_for_bodhi_comment): + """Checks whether TAP output is mapped correctly to + the bodhi comment call.""" + self.stub_bodhi.client = Dingus('bodhi_api.bodhi') + + self.directive.process(self.ref_input, self.ref_envdata) + + # Select the first call of "comment" method. + call = [call for call in self.stub_bodhi.client.calls() if call[0] == 'comment'][0] + # Select the positional arguments of that call + call_data = call[1] + + assert call_data[0] == "update_title" + assert call_data[1] == "Taskotron: depcheck test PASSED on noarch. Result log: http://example.com (results are informative only)" + assert call_data[2] == 0 + assert call_data[3] == False + + def test_is_comment_email_needed(self, monkeypatch): + def mock_BodhiUpdateState_factory(current_state, new_state): + + class MockBodhiUpdateState(Dingus): + def __init__(self, *args, **kwargs): + Dingus.__init__(self, *args, **kwargs) + self.state = current_state + + def add_result(self, *args, **kwargs): + self.state = new_state + + def get_state(self): + return self.state + + return MockBodhiUpdateState + + PASS = bodhi_comment_directive.PASS + FAIL = bodhi_comment_directive.FAIL + INCOMPLETE = bodhi_comment_directive.INCOMPLETE + UNKNOWN = bodhi_comment_directive.UNKNOWN + + + # if new_state is PASS and the current_state was not 'FAIL', do not send email + for current_state in (PASS, INCOMPLETE, UNKNOWN): + mock_bus = mock_BodhiUpdateState_factory(current_state, PASS) + monkeypatch.setattr(bodhi_comment_directive, 'BodhiUpdateState', mock_bus) + + outcome = bodhi_comment_directive._is_comment_email_needed(None, [], None, None) + assert outcome == False + + # If current_state is FAIL, send email + for new_state in (PASS, FAIL, INCOMPLETE, UNKNOWN): + mock_bus = mock_BodhiUpdateState_factory(FAIL, new_state) + monkeypatch.setattr(bodhi_comment_directive, 'BodhiUpdateState', mock_bus) + + outcome = bodhi_comment_directive._is_comment_email_needed(None, [], None, None) + assert outcome == True + + # If new_state is not PASS and current_state is not FAIL, send email + new_states = (FAIL, INCOMPLETE, UNKNOWN) + current_states = (PASS, INCOMPLETE, UNKNOWN) + + for current_state, new_state in itertools.product(current_states, new_states): + mock_bus = mock_BodhiUpdateState_factory(current_state, new_state) + monkeypatch.setattr(bodhi_comment_directive, 'BodhiUpdateState', mock_bus) + + outcome = bodhi_comment_directive._is_comment_email_needed(None, [], None, None) + assert outcome == True + +