#11 fedora-messaging: bootstrap the services
Merged 4 years ago by fbo. Opened 4 years ago by tdecacqu.
tdecacqu/fedora-project-config fedora-messaging  into  master

@@ -0,0 +1,13 @@ 

+ # Fedora messaging gateway

+ 

+ ## Design

+ 

+ The fedora messaging gateway is composed of 3 components:

+ 

+ - [zuul gateway](https://github.com/TristanCacqueray/zuul-gateway): trigger job.

+ - Consumer: record AMQP message

+ - Processor: trigger job and report completed jobs

+ 

+ ## Setup

+ 

+ To setup or update the system, run the `deploy.yaml` playbook.

@@ -0,0 +1,248 @@ 

+ ---

+ - hosts: localhost

+   become: yes

+   gather_facts: no

+   vars:

+     ansible_python_interpreter: /usr/bin/python2

+     user: fedora-messaging-zuul

+     state_dir: /var/lib/fedora-messaging-zuul

+     lib_dir: /usr/libexec/fedora-messaging-zuul

+     gateway_version: a96f543cd02398ac582952a44563372442b13a41

+     gateway_sum: "sha256:3bbda75372c1cf54bf8a4dc810c0c5372256a28ef28b7412bf8014586f18382f"

+     container_image: "fedora-messaging:latest"

+     state: started

+     zuul_url: https://fedora.softwarefactory-project.io/zuul

+     zuul_tenant: fedora-staging

+     zuul_connection: amqp

+     zuul_gateway: https://fedora.softwarefactory-project.io/amqp

+ 

+   pre_tasks:

+     - name: Create service user

+       user:

+         name: "{{ user }}"

+         home: "{{ state_dir }}"

+       register: user_object

+     - name: Create state directories

+       file:

+         path: "{{ state_dir }}/{{ item }}"

+         state: directory

+         mode: 0755

+         owner: "{{ user }}"

+       with_items:

+         - ""

+         - new

+         - done

+         - trash

+         - error

+     - name: Create libexec directory

+       file:

+         path: "{{ lib_dir }}"

+         state: directory

+         mode: 0755

+     - name: Install dependencies

+       package:

+         name:

+           - rh-python35-python-gunicorn

+           - rh-python35-python-requests

+           - rh-python35-python-flask

+           - rh-python35-enable-py3

+           - buildah

+           - podman

+ 

+   handlers:

+     - name: Restart zuul-gateway

+       service:

+         name: fedora-messaging-gateway

+         state: restarted

+       when: state == "started"

+     - name: Restart fedora-messaging services

+       service:

+         name: "fedora-messaging-{{ item }}"

+         state: restarted

+       with_items:

+         - consumer

+         - processor

+       when: state == "started"

+     - name: Reload apache

+       service:

+         name: httpd

+         state: reloaded

+ 

+   tasks:

+     - name: Copy the script

+       copy:

+         src: script.py

+         dest: "{{ lib_dir }}/script.py"

+         mode: 0755

+       notify: Restart fedora-messaging services

+     - name: Setup fedora-messaging configuration

+       copy:

+         content: |

+           amqp_url = "amqps://fedora:@rabbitmq.fedoraproject.org/%2Fpublic_pubsub"

+ 

+           [tls]

+           ca_cert = "/etc/fedora-messaging/cacert.pem"

+           keyfile = "/etc/fedora-messaging/fedora-key.pem"

+           certfile = "/etc/fedora-messaging/fedora-cert.pem"

+ 

+           [client_properties]

+           app = "Fedora Messaging Zuul"

+ 

+           [exchanges."amq.topic"]

+           type = "topic"

+           durable = true

+           auto_delete = false

+           arguments = {}

+ 

+           [qos]

+           prefetch_size = 0

+           prefetch_count = 25

+ 

+           [log_config]

+           version = 1

+           disable_existing_loggers = true

+ 

+           [log_config.formatters.simple]

+           format = "[%(levelname)-7s %(name)s] - %(message)s"

+ 

+           [log_config.handlers.console]

+           class = "logging.StreamHandler"

+           formatter = "simple"

+           stream = "ext://sys.stdout"

+ 

+           [log_config.root]

+           level = "ERROR"

+           handlers = ["console"]

+         dest: "{{ lib_dir }}/fedora-messaging-zuul.toml"

+ 

+     - name: Deploy the zuul-gateway

+       block:

+         - name: Fetch the gateway.py script

+           get_url:

+             url: "https://raw.githubusercontent.com/TristanCacqueray/zuul-gateway/{{ gateway_version }}/gateway.py"

+             dest: "{{ lib_dir }}/gateway.py"

+             checksum: "{{ gateway_sum }}"

+           notify: Restart zuul-gateway

+         - name: Setup systemd service

+           copy:

+             content: |

+               [Unit]

+               Description=Zuul gateway API Service

+               After=syslog.target network.target

+ 

+               [Service]

+               Type=simple

+               User={{ user }}

+               Environment=ZUUL_URL={{ zuul_url }}

+               Environment=ZUUL_TENANT={{ zuul_tenant }}

+               Environment=ZUUL_CONNECTION={{ zuul_connection }}

+               SyslogIdentifier=fedora-messaging-gateway

+               EnvironmentFile=-/etc/opt/rh/rh-python35/sysconfig/enable-py3

+               ExecStart=/opt/rh/rh-python35/root/usr/bin/gunicorn \

+                 --chdir / -b 0.0.0.0:9042 --pythonpath {{ lib_dir }} gateway:app

+ 

+               [Install]

+               WantedBy=multi-user.target

+             dest: /etc/systemd/system/fedora-messaging-gateway.service

+           register: _gateway_service

+         - name: Start the service

+           systemd:

+             name: fedora-messaging-gateway.service

+             daemon-reload: "{% if _gateway_service is changed %}yes{% else %}no{% endif %}"

+             state: "{{ state }}"

+             enabled: yes

+         - name: Setup gateway rewrite

+           copy:

+             content: |

+               ProxyPass /amqp/ http://127.0.0.1:9042/ retry=0

+               ProxyPassReverse /amqp/ http://127.0.0.1:9042/

+             dest: /etc/httpd/conf.d/amqp.conf

+           notify: Reload apache

+ 

+     - name: Deploy the fedora-messaging-consumer

+       block:

+         - name: Copy image file

+           copy:

+             content: |

+               FROM fedora

+               RUN dnf install -y fedora-messaging python3-requests koji

+               RUN useradd --uid {{ user_object.uid }} -m fedora-messaging-zuul

+               USER fedora-messaging-zuul

+             dest: "{{ lib_dir }}/Buildahfile"

+           register: _buildahfile

+           notify: Restart fedora-messaging services

+         - name: Build the image

+           command: "buildah bud -f Buildahfile -t {{ container_image }} {{ lib_dir }}"

+           when: _buildahfile is changed

+         - name: Setup systemd service

+           copy:

+             content: |

+               [Unit]

+               Description=Zuul messaging consumer

+               After=syslog.target network.target

+ 

+               [Service]

+               Type=simple

+               SyslogIdentifier=fedora-messaging-consumer

+               ExecStart=/usr/bin/podman run --name fedora-messaging-consumer \

+                 -v {{ lib_dir }}:{{ lib_dir }} \

+                 -v {{ state_dir }}:{{ state_dir }} \

+                 --env PYTHONUNBUFFERED=1 \

+                 --env PYTHONPATH={{ lib_dir }} \

+                 --workdir {{ state_dir }} \

+                 --user {{ user_object.uid }} \

+                 --rm {{ container_image }} \

+                 fedora-messaging --conf {{ lib_dir }}/fedora-messaging-zuul.toml consume --callback script:Consumer

+               ExecStop=-/usr/bin/podman stop fedora-messaging-consumer

+               Restart=always

+               RestartSec=3s

+ 

+               [Install]

+               WantedBy=multi-user.target

+             dest: /etc/systemd/system/fedora-messaging-consumer.service

+           register: _consumer_service

+         - name: Start the service

+           systemd:

+             name: fedora-messaging-consumer.service

+             daemon-reload: "{% if _consumer_service is changed %}yes{% else %}no{% endif %}"

+             state: "{{ state }}"

+             enabled: yes

+ 

+     - name: Deploy the fedora-messaging-processor

+       block:

+         - name: Setup systemd service

+           copy:

+             content: |

+               [Unit]

+               Description=Zuul messaging processor

+               After=syslog.target network.target

+ 

+               [Service]

+               Type=simple

+               SyslogIdentifier=fedora-messaging-processor

+               ExecStart=/usr/bin/podman run --name fedora-messaging-processor \

+                 -v {{ lib_dir }}:{{ lib_dir }} \

+                 -v {{ state_dir }}:{{ state_dir }} \

+                 --env PYTHONUNBUFFERED=1 \

+                 --env FEDORA_MESSAGING_CONF={{ lib_dir }}/fedora-messaging-zuul.toml \

+                 --env "ZUUL_GATEWAY={{ zuul_gateway }}" \

+                 --env "ZUUL_URL={{ zuul_url }}" \

+                 --env "ZUUL_TENANT={{ zuul_tenant }}" \

+                 --workdir {{ state_dir }} \

+                 --user {{ user_object.uid }} \

+                 --rm {{ container_image }} \

+                 {{ lib_dir }}/script.py

+               ExecStop=-/usr/bin/podman stop fedora-messaging-processor

+               Restart=always

+               RestartSec=3s

+ 

+               [Install]

+               WantedBy=multi-user.target

+             dest: /etc/systemd/system/fedora-messaging-processor.service

+           register: _processor_service

+         - name: Start the service

+           systemd:

+             name: fedora-messaging-processor.service

+             daemon-reload: "{% if _processor_service is changed %}yes{% else %}no{% endif %}"

+             state: "{{ state }}"

+             enabled: yes

@@ -0,0 +1,211 @@ 

+ #!/bin/env python3

+ # Copyright 2019 Red Hat

+ #

+ # Licensed under the Apache License, Version 2.0 (the "License"); you may

+ # not use this file except in compliance with the License. You may obtain

+ # a copy of the License at

+ #

+ #      http://www.apache.org/licenses/LICENSE-2.0

+ #

+ # Unless required by applicable law or agreed to in writing, software

+ # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

+ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

+ # License for the specific language governing permissions and limitations

+ # under the License.

+ 

+ import json

+ import subprocess

+ import time

+ import logging

+ 

+ from os import environ

+ from typing import Dict, List, Optional, Type

+ from pathlib import Path

+ 

+ import requests

+ from fedora_messaging import api, message

+ 

+ 

+ #################

+ # Configuration #

+ #################

+ ZUUL_GATEWAY = environ.get(

+     "ZUUL_GATEWAY",

+     "http://fedora.softwarefactory-project.io:9042").rstrip('/')

+ ZUUL_URL = environ.get(

+     "ZUUL_URL", "https://fedora.softwarefactory-project.io/zuul/").rstrip('/')

+ ZUUL_TENANT = environ.get("ZUUL_TENANT", "fedora-staging")

+ ZUUL_API = ZUUL_URL + "/api/tenant/" + ZUUL_TENANT

+ ZUUL_WEB = ZUUL_URL + "/t/" + ZUUL_TENANT

+ 

+ DEBUG = True

+ logging.basicConfig(

+     format='[%(levelname)-7s %(name)s] - %(message)s',

+     level=logging.DEBUG if DEBUG else logging.INFO)

+ log = logging.getLogger("FedoraMessagingZuul")

+ 

+ 

+ ##########################

+ # AMQP Message Zuul Jobs #

+ ##########################

+ class Job(object):

+     def __init__(self, event: Dict) -> None:

+         self.event = event

+         self.ref = event["msg_id"]

+ 

+     @staticmethod

+     def match(event: Dict) -> bool:

+         """Returns True when the event matches the job"""

+ 

+     def trigger(self) -> Dict:

+         """Returns the list of Zuul jobs and custom variables"""

+ 

+     def report(self, build: Dict) -> Dict:

+         """Returns the AMQP message body"""

+ 

+ 

+ class RpmLint(Job):

+     topic = "org.fedoraproject.prod.buildsys.build.state.change"

+ 

+     @staticmethod

+     def match(event: Dict) -> bool:

+         """Return True when koji build status is COMPLETED"""

+         return event["msg"]["new"] == 1

+ 

+     def trigger(self) -> Dict:

+         def kojiBuildInfo(buildId: int) -> List[str]:

+             p = subprocess.Popen(["koji", "buildinfo", str(buildId)],

+                                  stdout=subprocess.PIPE)

+             stdout, _ = p.communicate()

+             stdout = stdout.decode('utf-8')

+             if "RPMs:" in stdout:

+                 return list(map(lambda x: x.replace(

+                     '/mnt/koji/', 'https://kojipkgs.fedoraproject.org/'),

+                     stdout.split("RPMs:")[1].strip().split('\n')))

+             return []

+ 

+         buildId = self.event["msg"]["build_id"]

+         rpms = kojiBuildInfo(buildId)

+         if not rpms:

+             raise RuntimeError("No rpms found in kojibuild %d" % buildId)

+         nodeset = dict(nodes=[{"name": "container", "label": "runc-centos"}])

+         return dict(jobs=["rpm-lint"], vars=dict(rpms=rpms), nodeset=nodeset)

+ 

+     def report(self, build: Dict) -> Dict:

+         return dict(

+             topic='org.fedoraproject.test.resultsdb.result.new',

+             body=dict(

+                 type=["koji_build"],

+                 href=getBuildsetUrl("refs/pull/" + self.ref + "/head"),

+                 id=self.event["msg"]["build_id"],

+                 outcome=build["status"],

+                 message=build["comment"]))

+ 

+ 

+ Jobs = [RpmLint]

+ 

+ 

+ def event2job(event: Dict) -> Optional[Type[Job]]:

+     jobs = [job for job in Jobs

+             if job.topic == event["topic"] and job.match(event)]

+     if not jobs:

+         return None

+     if len(jobs) > 1:

+         raise RuntimeError("Multiple job match event")

+     return jobs[0]

+ 

+ 

+ #############################

+ # Message Consumer Callback #

+ #############################

+ class Consumer(object):

+     """Record relevant AMQP message"""

+     def __call__(self, message):

+         msgid = message.body.get("msg_id")

+ 

+         if not msgid or not event2job(message.body):

+             # No job match the event

+             return

+ 

+         log.info("%s: received %s", msgid, message.body.get("topic"))

+         with open("new/{}.json".format(msgid), "w") as of:

+             of.write(json.dumps(message.body))

+ 

+ 

+ #############################

+ # Message Processor Service #

+ #############################

+ def getBuildsetUrl(ref: str) -> str:

+     # TODO: use buildset url when available (https://review.opendev.org/630079)

+     builds = requests.get(ZUUL_API + "/builds?ref=" + ref).json()

+     if not builds:

+         raise RuntimeError("Couldn't get buildset url of %s" % ref)

+     return ZUUL_WEB + "/build/" + builds[0]["uuid"]

+ 

+ 

+ def isBuildCompleted(build: Dict) -> bool:

+     return "comment" in build and "status" in build

+ 

+ 

+ def process(event: Dict, builds: Dict) -> str:

+     """Process an event and return the final directory name"""

+     jobtype = event2job(event)

+     if not jobtype:

+         return "trash"

+     job = jobtype(event)

+     # First check if event has a running build

+     build = builds.get(job.ref)

+     if build and isBuildCompleted(build):

+         msg = message.Message(**job.report(build))

+         log.info("%s: reporting %s", job.ref, msg.body)

+         try:

+             # TODO: need an AMQP account...

+             # api.publish(msg)

+             pass

+         except Exception:

+             log.exception("Couldn't publish message %s:", msg)

+             if DEBUG:

+                 raise

+             return "error"

+         requests.delete(ZUUL_GATEWAY + "/jobs/" + job.ref)

+         return "done"

+     elif build is None:

+         trigger = job.trigger()

+         trigger['vars']['amqp'] = event['msg']

+         log.info("triggering %s for %s", trigger, job.ref)

+         jobsDef = [{job: dict(vars=trigger['vars'],

+                               nodeset=trigger['nodeset'])}

+                    for job in trigger['jobs']]

+         zuul = json.dumps([dict(project=dict(check=dict(jobs=jobsDef)))])

+         log.debug("zuul configuration: %s", zuul)

+         requests.post(ZUUL_GATEWAY + "/jobs/" + job.ref, data=zuul)

+     return ""

+ 

+ 

+ def main():

+     """Loop over message in new directory and call the process method"""

+     while True:

+         events = [x for x in Path('new').iterdir() if x.suffix == '.json']

+         if events:

+             # Grab pending jobs

+             builds = requests.get(ZUUL_GATEWAY + "/jobs").json()

+             for event in events:

+                 try:

+                     result = process(json.loads(event.read_text()), builds)

+                     if result:

+                         event.rename(Path(result) / event.name)

+                 except RuntimeError as e:

+                     log.error("%s: %s", event.name, e)

+                     event.rename(Path('error') / event.name)

+                 except Exception:

+                     log.exception("%s: something went wrong:", event.name)

+                     event.rename(Path('error') / event.name)

+                     if DEBUG:

+                         raise

+         else:

+             log.debug("no events...")

+         time.sleep(5)

+ 

+ 

+ if __name__ == "__main__":

+     main()

@@ -0,0 +1,33 @@ 

+ # Copyright 2019 Red Hat

+ #

+ # Licensed under the Apache License, Version 2.0 (the "License"); you may

+ # not use this file except in compliance with the License. You may obtain

+ # a copy of the License at

+ #

+ #      http://www.apache.org/licenses/LICENSE-2.0

+ #

+ # Unless required by applicable law or agreed to in writing, software

+ # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT

+ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the

+ # License for the specific language governing permissions and limitations

+ # under the License.

+ 

+ import setuptools

+ 

+ setuptools.setup(

+     name="zuul_fedora_messaging",

+     version="0.0.1",

+     author="Tristan de Cacqueray",

+     author_email="tdecacqu@redhat.com",

+     description="Process fedora messaging event with Zuul",

+     long_description=open("README.md").read(),

+     long_description_content_type="text/markdown",

+     url="https://pagure.io/fedora-project-config",

+     py_modules=["script"],

+     entry_points=dict(console_scripts=["zuul-fedora-messaging=script:main"]),

+     classifiers=[

+         "Programming Language :: Python :: 3",

+         "License :: OSI Approved :: ASL 2.0 License",

+         "Operating System :: OS Independent",

+     ],

+ )

@@ -0,0 +1,11 @@ 

+ [tox]

+ envlist = py35

+ 

+ [testenv]

+ deps =

+     flake8

+     mypy

+ whitelist_externals = bash

+ commands =

+     flake8 script.py

+     mypy --ignore-missing-imports script.py

no initial comment

Build succeeded.

rebased onto 1e0215dec173bc9232804fb54c6076a28d1f26ae

4 years ago

Build succeeded.

rebased onto 8d418ce67fb6500e461915b395ab3a805525cce7

4 years ago

Build succeeded.

The gateway is restarted by the other handler (only needed when the gateway.py change)

rebased onto 47dbe8d5d73444c0e647e7c43719e790b7299d1b

4 years ago

Build succeeded.

rebased onto ad61609

4 years ago

Build succeeded.

Build succeeded (gate pipeline).

Pull-Request has been merged by fbo

4 years ago