| |
@@ -70,7 +70,7 @@
|
| |
@staticmethod
|
| |
def match(event: Dict) -> bool:
|
| |
"""Return True when koji build status is COMPLETED"""
|
| |
- return event["msg"]["new"] == 1
|
| |
+ return event["new"] == 1
|
| |
|
| |
def trigger(self) -> Dict:
|
| |
def kojiBuildInfo(buildId: int) -> List[str]:
|
| |
@@ -84,7 +84,7 @@
|
| |
stdout.split("RPMs:")[1].strip().split('\n')))
|
| |
return []
|
| |
|
| |
- buildId = self.event["msg"]["build_id"]
|
| |
+ buildId = self.event["build_id"]
|
| |
rpms = kojiBuildInfo(buildId)
|
| |
if not rpms:
|
| |
raise RuntimeError("No rpms found in kojibuild %d" % buildId)
|
| |
@@ -97,7 +97,7 @@
|
| |
body=dict(
|
| |
type=["koji_build"],
|
| |
href=getBuildsetUrl("refs/pull/" + self.ref + "/head"),
|
| |
- id=self.event["msg"]["build_id"],
|
| |
+ id=self.event["build_id"],
|
| |
outcome=build["status"],
|
| |
message=build["comment"]))
|
| |
|
| |
@@ -121,15 +121,20 @@
|
| |
class Consumer(object):
|
| |
"""Record relevant AMQP message"""
|
| |
def __call__(self, message):
|
| |
- msgid = message.body.get("msg_id")
|
| |
-
|
| |
- if not msgid or not event2job(message.body):
|
| |
- # No job match the event
|
| |
- return
|
| |
+ log.debug("Receive event topic:%s id:%s" % (message.topic, message.id))
|
| |
+ message.body["msg_id"] = message.id
|
| |
+ message.body["topic"] = message.topic
|
| |
+ try:
|
| |
+ if not event2job(message.body):
|
| |
+ # No job match the event
|
| |
+ log.debug("Nothing to do with %s" % message.id)
|
| |
+ return
|
| |
|
| |
- log.info("%s: received %s", msgid, message.body.get("topic"))
|
| |
- with open("new/{}.json".format(msgid), "w") as of:
|
| |
- of.write(json.dumps(message.body))
|
| |
+ log.info("Keep matching event topic:%s id:%s" % (message.topic, message.id))
|
| |
+ with open("new/{}.json".format(message.id), "w") as of:
|
| |
+ of.write(json.dumps(message.body))
|
| |
+ except Exception as err:
|
| |
+ log.exception("Unable to process event %s" % message.id)
|
| |
|
| |
|
| |
#############################
|
| |
@@ -171,7 +176,7 @@
|
| |
return "done"
|
| |
elif build is None:
|
| |
trigger = job.trigger()
|
| |
- trigger['vars']['amqp'] = event['msg']
|
| |
+ trigger['vars']['amqp'] = event
|
| |
log.info("triggering %s for %s", trigger, job.ref)
|
| |
jobsDef = [{job: dict(vars=trigger['vars'],
|
| |
nodeset=trigger['nodeset'])}
|
| |
The patch fix the consumer and now process again messages.