#68 Add support for sending messages
Merged 9 years ago by ausil. Opened 9 years ago by lsedlar.
lsedlar/pungi fedmsg  into  master

@@ -0,0 +1,20 @@ 

+ #!/usr/bin/env python

+ # -*- coding: utf-8 -*-

+ 

+ import argparse

+ import fedmsg

+ import json

+ import sys

+ 

+ 

+ def send(cmd, data):

+     topic = 'compose.%s' % cmd.replace('-', '.').lower()

+     fedmsg.publish(topic=topic, modname='pungi', msg=data)

+ 

+ if __name__ == '__main__':

+     parser = argparse.ArgumentParser()

+     parser.add_argument('cmd')

+ 

+     opts = parser.parse_args()

+     data = json.load(sys.stdin)

+     send(opts.cmd, data)

file modified
+16 -1
@@ -26,6 +26,7 @@ 

  

  

  COMPOSE = None

+ NOTIFIER = None

  

  

  def main():
@@ -187,8 +188,19 @@ 

  def run_compose(compose):

      import pungi.phases

      import pungi.metadata

+     import pungi.notifier

+ 

+     errors = []

+ 

+     # initializer notifier

+     compose.notifier = pungi.notifier.PungiNotifier(compose)

+     try:

+         compose.notifier.validate()

+     except ValueError as ex:

+         errors.extend(["NOTIFIER: %s" % m for m in ex.message.split('\n')])

  

      compose.write_status("STARTED")

+     compose.notifier.send('start')

      compose.log_info("Host: %s" % socket.gethostname())

      compose.log_info("User name: %s" % getpass.getuser())

      compose.log_info("Working directory: %s" % os.getcwd())
@@ -215,7 +227,6 @@ 

      test_phase = pungi.phases.TestPhase(compose)

  

      # check if all config options are set

-     errors = []

      for phase in (init_phase, pkgset_phase, createrepo_phase,

                    buildinstall_phase, productimg_phase, gather_phase,

                    extrafiles_phase, createiso_phase, liveimages_phase,
@@ -229,6 +240,7 @@ 

                  errors.append("%s: %s" % (phase.name.upper(), i))

      if errors:

          for i in errors:

+             compose.notifier.send('abort')

              compose.log_error(i)

              print(i)

          sys.exit(1)
@@ -329,6 +341,7 @@ 

  

      compose.log_info("Compose finished: %s" % compose.topdir)

      compose.write_status("FINISHED")

+     compose.notifier.send('finish')

  

  

  if __name__ == "__main__":
@@ -343,6 +356,8 @@ 

              COMPOSE.write_status("DOOMED")

              import kobo.tback

              open(tb_path, "w").write(kobo.tback.Traceback().get_traceback())

+             if COMPOSE.notifier:

+                 COMPOSE.notifier.send('doomed')

          else:

              print("Exception: %s" % ex)

              raise

file modified
+36
@@ -548,3 +548,39 @@ 

      >>> from pungi.paths import translate_paths

      >>> print translate_paths(compose_object_with_mapping, "/mnt/a/c/somefile")

      http://b/dir/c/somefile

+ 

+ 

+ Progress notification

+ =====================

+ 

+ *Pungi* has the ability to emit notification messages about progress and

+ status. These can be used to e.g. send messages to *fedmsg*. This is

+ implemented by actually calling a separate script.

+ 

+ The script will be called with one argument describing action that just

+ happened. A JSON-encoded object will be passed to standard input to provide

+ more information about the event. At least, the object will contain a

+ ``compose_id`` key.

+ 

+ Currently these messages are sent:

+ 

+  * ``start`` -- when composing starts

+  * ``abort`` -- when compose is aborted due to incorrect configuration

+  * ``finish`` -- on successful finish of compose

+  * ``doomed`` -- when an error happens

+  * ``phase-start`` -- on start of a phase

+  * ``phase-stop`` -- when phase is finished

+ 

+ For phase related messages ``phase_name`` key is provided as well.

+ 

+ The script is invoked in compose directory and can read other information

+ there.

+ 

+ A ``pungi-fedmsg-notification`` script is provided and understands this

+ interface.

+ 

+ Config options

+ --------------

+ 

+ **notification_script**

+     (*str*) -- executable to be invoked to send the message

file modified
+1
@@ -102,6 +102,7 @@ 

          self.just_phases = just_phases or []

          self.old_composes = old_composes or []

          self.koji_event = koji_event

+         self.notifier = None

  

          # intentionally upper-case (visible in the code)

          self.DEBUG = debug

file added
+74
@@ -0,0 +1,74 @@ 

+ # -*- coding: utf-8 -*-

+ 

+ # This program is free software; you can redistribute it and/or modify

+ # it under the terms of the GNU General Public License as published by

+ # the Free Software Foundation; version 2 of the License.

+ #

+ # This program is distributed in the hope that it will be useful,

+ # but WITHOUT ANY WARRANTY; without even the implied warranty of

+ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the

+ # GNU Library General Public License for more details.

+ #

+ # You should have received a copy of the GNU General Public License

+ # along with this program; if not, write to the Free Software

+ # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.

+ 

+ import json

+ import threading

+ 

+ from kobo import shortcuts

+ from pungi.checks import validate_options

+ 

+ 

+ class PungiNotifier(object):

+     """Wrapper around an external script for sending messages.

+ 

+     If no script is configured, the messages are just silently ignored. If the

+     script fails, a warning will be logged, but the compose process will not be

+     interrupted.

+     """

+     config_options = (

+         {

+             "name": "notification_script",

+             "expected_types": [str],

+             "optional": True

+         },

+     )

+ 

+     def __init__(self, compose):

+         self.compose = compose

+         self.lock = threading.Lock()

+ 

+     def validate(self):

+         errors = validate_options(self.compose.conf, self.config_options)

+         if errors:

+             raise ValueError("\n".join(errors))

+ 

+     def _update_args(self, data):

+         """Add compose related information to the data."""

+         data.setdefault('compose_id', self.compose.compose_id)

+ 

+     def send(self, msg, **kwargs):

+         """Send a message.

+ 

+         The actual meaning of ``msg`` depends on what the notification script

+         will be doing. The keyword arguments will be JSON-encoded and passed on

+         to standard input of the notification process.

+ 

+         Unless you specify it manually, a ``compose_id`` key with appropriate

+         value will be automatically added.

+         """

+         script = self.compose.conf.get('notification_script', None)

+         if not script:

+             return

+ 

+         self._update_args(kwargs)

+ 

+         with self.lock:

+             ret, _ = shortcuts.run((script, msg),

+                                    stdin_data=json.dumps(kwargs),

+                                    can_fail=True,

+                                    workdir=self.compose.paths.compose.topdir(),

+                                    return_stdout=False)

+             if ret != 0:

+                 self.compose.log_warning('Failed to invoke notification script.')

file modified
+2 -1
@@ -14,7 +14,6 @@ 

  # along with this program; if not, write to the Free Software

  # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.

  

- 

  from pungi.checks import validate_options

  

  
@@ -59,6 +58,7 @@ 

              self.finished = True

              return

          self.compose.log_info("[BEGIN] %s" % self.msg)

+         self.compose.notifier.send('phase-start', phase_name=self.name)

          self.run()

  

      def stop(self):
@@ -68,6 +68,7 @@ 

              self.pool.stop()

          self.finished = True

          self.compose.log_info("[DONE ] %s" % self.msg)

+         self.compose.notifier.send('phase-stop', phase_name=self.name)

  

      def run(self):

          raise NotImplementedError

file modified
+12
@@ -55,6 +55,7 @@ 

      def run(self):

          iso = IsoWrapper(logger=self.compose._logger)

          symlink_isos_to = self.compose.conf.get("symlink_isos_to", None)

+         deliverables = []

  

          commands = []

          for variant in self.compose.get_variants(types=["variant", "layered-product", "optional"], recursive=True):
@@ -97,6 +98,7 @@ 

                          self.compose.log_warning("Skipping mkisofs, image already exists: %s" % iso_path)

                          continue

                      iso_name = os.path.basename(iso_path)

+                     deliverables.append(iso_path)

  

                      graft_points = prepare_iso(self.compose, arch, variant, disc_num=disc_num, disc_count=disc_count, split_iso_data=iso_data)

  
@@ -180,6 +182,8 @@ 

                          cmd["cmd"] = " && ".join(cmd["cmd"])

                          commands.append(cmd)

  

+         self.compose.notifier.send('createiso-targets', deliverables=deliverables)

+ 

          for cmd in commands:

              self.pool.add(CreateIsoThread(self.pool))

              self.pool.queue_put((self.compose, cmd))
@@ -201,6 +205,10 @@ 

              # TODO: remove jigdo & template & checksums

          except OSError:

              pass

+         compose.notifier.send('createiso-imagefail',

+                               file=cmd['iso_path'],

+                               arch=cmd['arch'],

+                               variant=str(cmd['variant']))

  

      def process(self, item, num):

          compose, cmd = item
@@ -294,6 +302,10 @@ 

          # add: boot.iso

  

          self.pool.log_info("[DONE ] %s" % msg)

+         compose.notifier.send('createiso-imagedone',

+                               file=cmd['iso_path'],

+                               arch=cmd['arch'],

+                               variant=str(cmd['variant']))

  

  

  def split_iso(compose, arch, variant):

file modified
+1
@@ -37,6 +37,7 @@ 

          'bin/pungi',

          'bin/pungi-koji',

          'bin/comps_filter',

+         'bin/pungi-fedmsg-notifier',

      ],

      data_files      = [

          ('/usr/share/pungi', glob.glob('share/*.xsl')),

@@ -0,0 +1,82 @@ 

+ #!/usr/bin/python

+ # -*- coding: utf-8 -*-

+ 

+ import json

+ import mock

+ import os

+ import sys

+ import unittest

+ 

+ sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

+ 

+ from pungi.notifier import PungiNotifier

+ 

+ 

+ class TestNotifier(unittest.TestCase):

+     def test_incorrect_config(self):

+         compose = mock.Mock(

+             conf={'notification_script': [1, 2]}

+         )

+ 

+         n = PungiNotifier(compose)

+         with self.assertRaises(ValueError) as err:

+             n.validate()

+             self.assertIn('notification_script', err.message)

+ 

+     @mock.patch('kobo.shortcuts.run')

+     def test_invokes_script(self, run):

+         compose = mock.Mock(

+             compose_id='COMPOSE_ID',

+             conf={'notification_script': 'run-notify'},

+             paths=mock.Mock(

+                 compose=mock.Mock(

+                     topdir=mock.Mock(return_value='/a/b')

+                 )

+             )

+         )

+ 

+         run.return_value = (0, None)

+ 

+         n = PungiNotifier(compose)

+         data = {'foo': 'bar', 'baz': 'quux'}

+         n.send('cmd', **data)

+ 

+         data['compose_id'] = 'COMPOSE_ID'

+         run.assert_called_once_with(('run-notify', 'cmd'),

+                                     stdin_data=json.dumps(data),

+                                     can_fail=True, return_stdout=False, workdir='/a/b')

+ 

+     @mock.patch('kobo.shortcuts.run')

+     def test_does_not_run_without_config(self, run):

+         compose = mock.Mock(conf={})

+ 

+         n = PungiNotifier(compose)

+         n.send('cmd', foo='bar', baz='quux')

+         self.assertFalse(run.called)

+ 

+     @mock.patch('kobo.shortcuts.run')

+     def test_logs_warning_on_failure(self, run):

+         compose = mock.Mock(

+             compose_id='COMPOSE_ID',

+             log_warning=mock.Mock(),

+             conf={'notification_script': 'run-notify'},

+             paths=mock.Mock(

+                 compose=mock.Mock(

+                     topdir=mock.Mock(return_value='/a/b')

+                 )

+             )

+         )

+ 

+         run.return_value = (1, None)

+ 

+         n = PungiNotifier(compose)

+         n.send('cmd')

+ 

+         run.assert_called_once_with(('run-notify', 'cmd'),

+                                     stdin_data=json.dumps({'compose_id': 'COMPOSE_ID'}),

+                                     can_fail=True, return_stdout=False, workdir='/a/b')

+         self.assertTrue(compose.log_warning.called)

+ 

+ 

+ if __name__ == "__main__":

+     unittest.main()