| |
@@ -0,0 +1,211 @@
|
| |
+ #!/bin/env python3
|
| |
+ # Copyright 2019 Red Hat
|
| |
+ #
|
| |
+ # Licensed under the Apache License, Version 2.0 (the "License"); you may
|
| |
+ # not use this file except in compliance with the License. You may obtain
|
| |
+ # a copy of the License at
|
| |
+ #
|
| |
+ # http://www.apache.org/licenses/LICENSE-2.0
|
| |
+ #
|
| |
+ # Unless required by applicable law or agreed to in writing, software
|
| |
+ # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
| |
+ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
| |
+ # License for the specific language governing permissions and limitations
|
| |
+ # under the License.
|
| |
+
|
| |
+ import json
|
| |
+ import subprocess
|
| |
+ import time
|
| |
+ import logging
|
| |
+
|
| |
+ from os import environ
|
| |
+ from typing import Dict, List, Optional, Type
|
| |
+ from pathlib import Path
|
| |
+
|
| |
+ import requests
|
| |
+ from fedora_messaging import api, message
|
| |
+
|
| |
+
|
| |
+ #################
|
| |
+ # Configuration #
|
| |
+ #################
|
| |
+ ZUUL_GATEWAY = environ.get(
|
| |
+ "ZUUL_GATEWAY",
|
| |
+ "http://fedora.softwarefactory-project.io:9042").rstrip('/')
|
| |
+ ZUUL_URL = environ.get(
|
| |
+ "ZUUL_URL", "https://fedora.softwarefactory-project.io/zuul/").rstrip('/')
|
| |
+ ZUUL_TENANT = environ.get("ZUUL_TENANT", "fedora-staging")
|
| |
+ ZUUL_API = ZUUL_URL + "/api/tenant/" + ZUUL_TENANT
|
| |
+ ZUUL_WEB = ZUUL_URL + "/t/" + ZUUL_TENANT
|
| |
+
|
| |
+ DEBUG = True
|
| |
+ logging.basicConfig(
|
| |
+ format='[%(levelname)-7s %(name)s] - %(message)s',
|
| |
+ level=logging.DEBUG if DEBUG else logging.INFO)
|
| |
+ log = logging.getLogger("FedoraMessagingZuul")
|
| |
+
|
| |
+
|
| |
+ ##########################
|
| |
+ # AMQP Message Zuul Jobs #
|
| |
+ ##########################
|
| |
+ class Job(object):
|
| |
+ def __init__(self, event: Dict) -> None:
|
| |
+ self.event = event
|
| |
+ self.ref = event["msg_id"]
|
| |
+
|
| |
+ @staticmethod
|
| |
+ def match(event: Dict) -> bool:
|
| |
+ """Returns True when the event matches the job"""
|
| |
+
|
| |
+ def trigger(self) -> Dict:
|
| |
+ """Returns the list of Zuul jobs and custom variables"""
|
| |
+
|
| |
+ def report(self, build: Dict) -> Dict:
|
| |
+ """Returns the AMQP message body"""
|
| |
+
|
| |
+
|
| |
+ class RpmLint(Job):
|
| |
+ topic = "org.fedoraproject.prod.buildsys.build.state.change"
|
| |
+
|
| |
+ @staticmethod
|
| |
+ def match(event: Dict) -> bool:
|
| |
+ """Return True when koji build status is COMPLETED"""
|
| |
+ return event["msg"]["new"] == 1
|
| |
+
|
| |
+ def trigger(self) -> Dict:
|
| |
+ def kojiBuildInfo(buildId: int) -> List[str]:
|
| |
+ p = subprocess.Popen(["koji", "buildinfo", str(buildId)],
|
| |
+ stdout=subprocess.PIPE)
|
| |
+ stdout, _ = p.communicate()
|
| |
+ stdout = stdout.decode('utf-8')
|
| |
+ if "RPMs:" in stdout:
|
| |
+ return list(map(lambda x: x.replace(
|
| |
+ '/mnt/koji/', 'https://kojipkgs.fedoraproject.org/'),
|
| |
+ stdout.split("RPMs:")[1].strip().split('\n')))
|
| |
+ return []
|
| |
+
|
| |
+ buildId = self.event["msg"]["build_id"]
|
| |
+ rpms = kojiBuildInfo(buildId)
|
| |
+ if not rpms:
|
| |
+ raise RuntimeError("No rpms found in kojibuild %d" % buildId)
|
| |
+ nodeset = dict(nodes=[{"name": "container", "label": "runc-centos"}])
|
| |
+ return dict(jobs=["rpm-lint"], vars=dict(rpms=rpms), nodeset=nodeset)
|
| |
+
|
| |
+ def report(self, build: Dict) -> Dict:
|
| |
+ return dict(
|
| |
+ topic='org.fedoraproject.test.resultsdb.result.new',
|
| |
+ body=dict(
|
| |
+ type=["koji_build"],
|
| |
+ href=getBuildsetUrl("refs/pull/" + self.ref + "/head"),
|
| |
+ id=self.event["msg"]["build_id"],
|
| |
+ outcome=build["status"],
|
| |
+ message=build["comment"]))
|
| |
+
|
| |
+
|
| |
+ Jobs = [RpmLint]
|
| |
+
|
| |
+
|
| |
+ def event2job(event: Dict) -> Optional[Type[Job]]:
|
| |
+ jobs = [job for job in Jobs
|
| |
+ if job.topic == event["topic"] and job.match(event)]
|
| |
+ if not jobs:
|
| |
+ return None
|
| |
+ if len(jobs) > 1:
|
| |
+ raise RuntimeError("Multiple job match event")
|
| |
+ return jobs[0]
|
| |
+
|
| |
+
|
| |
+ #############################
|
| |
+ # Message Consumer Callback #
|
| |
+ #############################
|
| |
+ class Consumer(object):
|
| |
+ """Record relevant AMQP message"""
|
| |
+ def __call__(self, message):
|
| |
+ msgid = message.body.get("msg_id")
|
| |
+
|
| |
+ if not msgid or not event2job(message.body):
|
| |
+ # No job match the event
|
| |
+ return
|
| |
+
|
| |
+ log.info("%s: received %s", msgid, message.body.get("topic"))
|
| |
+ with open("new/{}.json".format(msgid), "w") as of:
|
| |
+ of.write(json.dumps(message.body))
|
| |
+
|
| |
+
|
| |
+ #############################
|
| |
+ # Message Processor Service #
|
| |
+ #############################
|
| |
+ def getBuildsetUrl(ref: str) -> str:
|
| |
+ # TODO: use buildset url when available (https://review.opendev.org/630079)
|
| |
+ builds = requests.get(ZUUL_API + "/builds?ref=" + ref).json()
|
| |
+ if not builds:
|
| |
+ raise RuntimeError("Couldn't get buildset url of %s" % ref)
|
| |
+ return ZUUL_WEB + "/build/" + builds[0]["uuid"]
|
| |
+
|
| |
+
|
| |
+ def isBuildCompleted(build: Dict) -> bool:
|
| |
+ return "comment" in build and "status" in build
|
| |
+
|
| |
+
|
| |
+ def process(event: Dict, builds: Dict) -> str:
|
| |
+ """Process an event and return the final directory name"""
|
| |
+ jobtype = event2job(event)
|
| |
+ if not jobtype:
|
| |
+ return "trash"
|
| |
+ job = jobtype(event)
|
| |
+ # First check if event has a running build
|
| |
+ build = builds.get(job.ref)
|
| |
+ if build and isBuildCompleted(build):
|
| |
+ msg = message.Message(**job.report(build))
|
| |
+ log.info("%s: reporting %s", job.ref, msg.body)
|
| |
+ try:
|
| |
+ # TODO: need an AMQP account...
|
| |
+ # api.publish(msg)
|
| |
+ pass
|
| |
+ except Exception:
|
| |
+ log.exception("Couldn't publish message %s:", msg)
|
| |
+ if DEBUG:
|
| |
+ raise
|
| |
+ return "error"
|
| |
+ requests.delete(ZUUL_GATEWAY + "/jobs/" + job.ref)
|
| |
+ return "done"
|
| |
+ elif build is None:
|
| |
+ trigger = job.trigger()
|
| |
+ trigger['vars']['amqp'] = event['msg']
|
| |
+ log.info("triggering %s for %s", trigger, job.ref)
|
| |
+ jobsDef = [{job: dict(vars=trigger['vars'],
|
| |
+ nodeset=trigger['nodeset'])}
|
| |
+ for job in trigger['jobs']]
|
| |
+ zuul = json.dumps([dict(project=dict(check=dict(jobs=jobsDef)))])
|
| |
+ log.debug("zuul configuration: %s", zuul)
|
| |
+ requests.post(ZUUL_GATEWAY + "/jobs/" + job.ref, data=zuul)
|
| |
+ return ""
|
| |
+
|
| |
+
|
| |
+ def main():
|
| |
+ """Loop over message in new directory and call the process method"""
|
| |
+ while True:
|
| |
+ events = [x for x in Path('new').iterdir() if x.suffix == '.json']
|
| |
+ if events:
|
| |
+ # Grab pending jobs
|
| |
+ builds = requests.get(ZUUL_GATEWAY + "/jobs").json()
|
| |
+ for event in events:
|
| |
+ try:
|
| |
+ result = process(json.loads(event.read_text()), builds)
|
| |
+ if result:
|
| |
+ event.rename(Path(result) / event.name)
|
| |
+ except RuntimeError as e:
|
| |
+ log.error("%s: %s", event.name, e)
|
| |
+ event.rename(Path('error') / event.name)
|
| |
+ except Exception:
|
| |
+ log.exception("%s: something went wrong:", event.name)
|
| |
+ event.rename(Path('error') / event.name)
|
| |
+ if DEBUG:
|
| |
+ raise
|
| |
+ else:
|
| |
+ log.debug("no events...")
|
| |
+ time.sleep(5)
|
| |
+
|
| |
+
|
| |
+ if __name__ == "__main__":
|
| |
+ main()
|
| |