From 2ea584ca915d0690ebc19de1a7ce28393a24f32d Mon Sep 17 00:00:00 2001 From: Michael Simacek Date: Dec 09 2014 10:17:19 +0000 Subject: Watchdog for watcher --- diff --git a/config.cfg.template b/config.cfg.template index 30bda7e..a5aa005 100644 --- a/config.cfg.template +++ b/config.cfg.template @@ -1,4 +1,5 @@ -# vim: set filetype=python -*- python -*- +# -*- python -*- +# vim: ft=python config = { "database_config": { "drivername": "postgres", @@ -40,6 +41,9 @@ config = { "t1": 7 * 24, }, "services": { + "watcher": { + "watchdog_interval": 20, # in seconds + }, "polling": { "interval": 20 * 60, }, diff --git a/koschei/watcher.py b/koschei/watcher.py index 754c704..e9f89cf 100644 --- a/koschei/watcher.py +++ b/koschei/watcher.py @@ -17,21 +17,30 @@ # Author: Michael Simacek # Author: Mikolaj Izdebski -import fedmsg import koji +from signal import signal, alarm, SIGALRM + from . import util from .backend import Backend -from .service import KojiService +from .service import KojiService, FedmsgService, Service from .models import Build, Package, RepoGenerationRequest +class WatchdogInterrupt(Exception): + pass + +class WatchdogService(Service): + def get_handled_exceptions(self): + return (list([WatchdogInterrupt]) + + super(WatchdogService, self).get_handled_exceptions()) -class Watcher(KojiService): +class Watcher(KojiService, FedmsgService, WatchdogService): topic_name = util.config['fedmsg']['topic'] tag = util.koji_config['build_tag'] instance = util.config['fedmsg']['instance'] build_tag = util.koji_config['target_tag'] + watchdog_interval = util.config['services']['watcher']['watchdog_interval'] def __init__(self, backend=None, *args, **kwargs): super(Watcher, self).__init__(*args, **kwargs) @@ -81,6 +90,11 @@ class Watcher(KojiService): self.db.commit() def main(self): - for _, _, topic, msg in fedmsg.tail_messages(): + def handler(n, s): + raise WatchdogInterrupt() + signal(SIGALRM, handler) + alarm(self.watchdog_interval) + for _, _, topic, msg in self.fedmsg.tail_messages(): if topic.startswith(self.topic_name + '.'): self.consume(topic, msg['msg']) + alarm(self.watchdog_interval) diff --git a/test/common.py b/test/common.py index a8f8037..ab1396a 100644 --- a/test/common.py +++ b/test/common.py @@ -1,5 +1,4 @@ import os -import sys import unittest import sqlalchemy import logging @@ -11,11 +10,6 @@ from datetime import datetime testdir = os.path.dirname(os.path.realpath(__file__)) datadir = os.path.join(testdir, 'data') os.chdir(testdir) -sys.path[:0] = [os.path.join(testdir, '..'), - os.path.join(testdir, 'mocks')] - -# our mock -import fedmsg use_postgres = os.environ.get('TEST_WITH_POSTGRES') @@ -54,7 +48,6 @@ class AbstractTest(unittest.TestCase): def __init__(self, *args, **kwargs): super(AbstractTest, self).__init__(*args, **kwargs) - self.fedmsg = fedmsg def _rm_workdir(self): try: @@ -66,10 +59,8 @@ class AbstractTest(unittest.TestCase): self._rm_workdir() os.mkdir(workdir) os.chdir(workdir) - self.fedmsg.mock_init() def tearDown(self): - self.fedmsg.mock_verify_empty() self._rm_workdir() @staticmethod diff --git a/test/mocks/fedmsg.py b/test/mocks/fedmsg.py deleted file mode 100644 index dbf76e2..0000000 --- a/test/mocks/fedmsg.py +++ /dev/null @@ -1,24 +0,0 @@ -_message_queue = None -mock_published = None - -def mock_init(): - global _message_queue - global mock_published - _message_queue = [] - mock_published = [] - -def mock_add_message(name='', topic='', endpoint='', msg=''): - _message_queue.append((name, endpoint, topic, msg)) - -def mock_verify_empty(): - if _message_queue: - raise AssertionError("Messages remaining in fedmsg queue") - -def tail_messages(): - global _message_queue - for item in _message_queue: - yield item - _message_queue = [] - -def publish(topic, modname, msg): - mock_published.append((topic, modname, msg)) diff --git a/test/watcher_test.py b/test/watcher_test.py index d9cb63e..b067baf 100644 --- a/test/watcher_test.py +++ b/test/watcher_test.py @@ -1,10 +1,6 @@ -import koji - -from datetime import datetime -from mock import Mock, patch +from mock import Mock from common import DBTest -from koschei import models as m from koschei.watcher import Watcher test_topic = 'org.fedoraproject.test.buildsys' @@ -20,20 +16,27 @@ def generate_state_change(instance='primary', task_id=666, old='OPEN', new='CLOS class WatcherTest(DBTest): def test_ignored_topic(self): - self.fedmsg.mock_add_message(topic='org.fedoraproject.prod.buildsys.task.state.change', - msg=generate_state_change()) - Watcher(db=Mock(), koji_session=Mock()).main() + class FedmsgMock(object): + def tail_messages(self): + yield ('', '', 'org.fedoraproject.prod.buildsys.task.state.change', + generate_state_change()) + Watcher(db=Mock(), koji_session=Mock(), fedmsg_context=FedmsgMock()).main() def test_ignored_instance(self): - self.fedmsg.mock_add_message(topic=test_topic, - msg=generate_state_change(instance='ppc')) - Watcher(db=Mock(), koji_session=Mock()).main() + class FedmsgMock(object): + def tail_messages(self): + yield ('', '', test_topic, + generate_state_change(instance='ppc')) + Watcher(db=Mock(), koji_session=Mock(), fedmsg_context=FedmsgMock()).main() def test_task_completed(self): + class FedmsgMock(object): + def tail_messages(self): + yield ('', '', test_topic + '.task.state.change', + generate_state_change()) _, build = self.prepare_basic_data() - self.fedmsg.mock_add_message(topic=test_topic + '.task.state.change', - msg=generate_state_change()) backend_mock = Mock() - watcher = Watcher(db=self.s, koji_session=Mock(), backend=backend_mock) + watcher = Watcher(db=self.s, koji_session=Mock(), backend=backend_mock, + fedmsg_context=FedmsgMock()) watcher.main() backend_mock.update_build_state.assert_called_once_with(build, 'CLOSED')