From 2069325d16711ad1930a974e787cbf4836365439 Mon Sep 17 00:00:00 2001 From: Pierre-Yves Chibon Date: Jan 16 2023 10:12:20 +0000 Subject: Port the EventSource server to be asyncio only Signed-off-by: Pierre-Yves Chibon --- diff --git a/pagure-ev/pagure_stream_server.py b/pagure-ev/pagure_stream_server.py index 6908ea7..fa213f8 100644 --- a/pagure-ev/pagure_stream_server.py +++ b/pagure-ev/pagure_stream_server.py @@ -26,7 +26,7 @@ import os import redis -import trololio +import asyncio from six.moves.urllib.parse import urlparse @@ -137,15 +137,14 @@ def get_obj_from_path(path): return getfunc(repo, objid) -@trololio.coroutine +@asyncio.coroutine def handle_client(client_reader, client_writer): data = None while True: # give client a chance to respond, timeout after 10 seconds - line = yield trololio.From( - trololio.asyncio.wait_for(client_reader.readline(), timeout=10.0) - ) - if not line.decode().strip(): + line = yield from asyncio.wait_for(client_reader.readline(), timeout=10.0) + + if not line or not line.decode().strip(): break line = line.decode().rstrip() if data is None: @@ -155,7 +154,7 @@ def handle_client(client_reader, client_writer): log.warning("Expected ticket uid, received None") return - data = data.decode().rstrip().split() + data = data.rstrip().split() log.info("Received %s", data) if not data: log.warning("No URL provided: %s" % data) @@ -204,16 +203,16 @@ def handle_client(client_reader, client_writer): client_writer.write(("event: ping\n\n").encode()) oncall = 0 oncall += 1 - yield trololio.From(client_writer.drain()) - yield trololio.From(trololio.asyncio.sleep(1)) + yield from client_writer.drain() + yield from asyncio.sleep(1) else: - log.info("Sending %s", msg["data"]) - client_writer.write(("data: %s\n\n" % msg["data"]).encode()) - yield trololio.From(client_writer.drain()) + log.info("Sending %s", msg["data"].decode()) + client_writer.write(("data: %s\n\n" % msg["data"].decode()).encode()) + yield from client_writer.drain() except OSError: log.info("Client closed connection") - except trololio.ConnectionResetError as err: + except ConnectionResetError as err: log.exception("ERROR: ConnectionResetError in handle_client") except Exception as err: log.exception("ERROR: Exception in handle_client") @@ -225,18 +224,18 @@ def handle_client(client_reader, client_writer): client_writer.close() -@trololio.coroutine +@asyncio.coroutine def stats(client_reader, client_writer): try: - log.info("Clients: %s", SERVER.active_count) + log.info("Clients: %s", SERVER._active_count) client_writer.write( ("HTTP/1.0 200 OK\n" "Cache: nocache\n\n").encode() ) - client_writer.write(("data: %s\n\n" % SERVER.active_count).encode()) - yield trololio.From(client_writer.drain()) + client_writer.write(("data: %s\n\n" % SERVER._active_count).encode()) + yield from client_writer.drain() - except trololio.ConnectionResetError as err: + except ConnectionResetError as err: log.info(err) finally: client_writer.close() @@ -248,8 +247,8 @@ def main(): _get_session() try: - loop = trololio.asyncio.get_event_loop() - coro = trololio.asyncio.start_server( + loop = asyncio.get_event_loop() + coro = asyncio.start_server( handle_client, host=None, port=pagure.config.config["EVENTSOURCE_PORT"], @@ -259,7 +258,7 @@ def main(): "Serving server at {}".format(SERVER.sockets[0].getsockname()) ) if pagure.config.config.get("EV_STATS_PORT"): - stats_coro = trololio.asyncio.start_server( + stats_coro = asyncio.start_server( stats, host=None, port=pagure.config.config.get("EV_STATS_PORT"), @@ -273,7 +272,7 @@ def main(): loop.run_forever() except KeyboardInterrupt: pass - except trololio.ConnectionResetError as err: + except ConnectionResetError as err: log.exception("ERROR: ConnectionResetError in main") except Exception: log.exception("ERROR: Exception in main")