#19 Consumer was stuck - seems schema changed a bit
Merged 4 years ago by fbo. Opened 4 years ago by fbo.
fbo/fedora-project-config fix-consumer-msg-format  into  master

file modified
+17 -12
@@ -70,7 +70,7 @@ 

      @staticmethod

      def match(event: Dict) -> bool:

          """Return True when koji build status is COMPLETED"""

-         return event["msg"]["new"] == 1

+         return event["new"] == 1

  

      def trigger(self) -> Dict:

          def kojiBuildInfo(buildId: int) -> List[str]:
@@ -84,7 +84,7 @@ 

                      stdout.split("RPMs:")[1].strip().split('\n')))

              return []

  

-         buildId = self.event["msg"]["build_id"]

+         buildId = self.event["build_id"]

          rpms = kojiBuildInfo(buildId)

          if not rpms:

              raise RuntimeError("No rpms found in kojibuild %d" % buildId)
@@ -97,7 +97,7 @@ 

              body=dict(

                  type=["koji_build"],

                  href=getBuildsetUrl("refs/pull/" + self.ref + "/head"),

-                 id=self.event["msg"]["build_id"],

+                 id=self.event["build_id"],

                  outcome=build["status"],

                  message=build["comment"]))

  
@@ -121,15 +121,20 @@ 

  class Consumer(object):

      """Record relevant AMQP message"""

      def __call__(self, message):

-         msgid = message.body.get("msg_id")

- 

-         if not msgid or not event2job(message.body):

-             # No job match the event

-             return

+         log.debug("Receive event topic:%s id:%s" % (message.topic, message.id))

+         message.body["msg_id"] = message.id

+         message.body["topic"] = message.topic

+         try:

+             if not event2job(message.body):

+                 # No job match the event

+                 log.debug("Nothing to do with %s" % message.id)

+                 return

  

-         log.info("%s: received %s", msgid, message.body.get("topic"))

-         with open("new/{}.json".format(msgid), "w") as of:

-             of.write(json.dumps(message.body))

+             log.info("Keep matching event topic:%s id:%s" % (message.topic, message.id))

+             with open("new/{}.json".format(message.id), "w") as of:

+                 of.write(json.dumps(message.body))

+         except Exception as err:

+             log.exception("Unable to process event %s" % message.id)

  

  

  #############################
@@ -171,7 +176,7 @@ 

          return "done"

      elif build is None:

          trigger = job.trigger()

-         trigger['vars']['amqp'] = event['msg']

+         trigger['vars']['amqp'] = event

          log.info("triggering %s for %s", trigger, job.ref)

          jobsDef = [{job: dict(vars=trigger['vars'],

                                nodeset=trigger['nodeset'])}

The patch fixes the consumer so that it processes messages again.

Build succeeded.

Build succeeded (gate pipeline).

Pull-Request has been merged by fbo

4 years ago