From d52122857925dcfefe29295ebe67a841779f9ec8 Mon Sep 17 00:00:00 2001 From: Pierre-Yves Chibon Date: Oct 21 2016 10:07:13 +0000 Subject: Port the SSE server to py3 and asyncio --- diff --git a/ev-server/pagure-stream-server.py b/ev-server/pagure-stream-server.py index 746de60..66a9be6 100644 --- a/ev-server/pagure-stream-server.py +++ b/ev-server/pagure-stream-server.py @@ -22,17 +22,17 @@ nc localhost 8080 import datetime import logging import os -import urlparse +import urllib.parse as urlparse -import trollius -import trollius_redis +import asyncio +import asyncio_redis log = logging.getLogger(__name__) if 'PAGURE_CONFIG' not in os.environ \ and os.path.exists('/etc/pagure/pagure.cfg'): - print 'Using configuration file `/etc/pagure/pagure.cfg`' + print('Using configuration file `/etc/pagure/pagure.cfg`') os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg' @@ -91,12 +91,12 @@ def get_obj_from_path(path): return output -@trollius.coroutine +@asyncio.coroutine def handle_client(client_reader, client_writer): data = None while True: # give client a chance to respond, timeout after 10 seconds - line = yield trollius.From(trollius.wait_for( + line = yield from(asyncio.wait_for( client_reader.readline(), timeout=10.0)) if not line.decode().strip(): @@ -109,7 +109,7 @@ def handle_client(client_reader, client_writer): log.warning("Expected ticket uid, received None") return - data = data.decode().rstrip().split() + data = data.rstrip().split() log.info("Received %s", data) if not data: log.warning("No URL provided: %s" % data) @@ -139,7 +139,7 @@ def handle_client(client_reader, client_writer): "Access-Control-Allow-Origin: %s\n\n" % origin ).encode()) - connection = yield trollius.From(trollius_redis.Connection.create( + connection = yield from(asyncio_redis.Connection.create( host=pagure.APP.config['REDIS_HOST'], port=pagure.APP.config['REDIS_PORT'], db=pagure.APP.config['REDIS_DB'])) @@ -147,21 +147,21 @@ def handle_client(client_reader, client_writer): try: # Create subscriber. - subscriber = yield trollius.From(connection.start_subscribe()) + subscriber = yield from(connection.start_subscribe()) # Subscribe to channel. - yield trollius.From(subscriber.subscribe(['pagure.%s' % obj.uid])) + yield from(subscriber.subscribe(['pagure.%s' % obj.uid])) # Inside a while loop, wait for incoming events. while True: - reply = yield trollius.From(subscriber.next_published()) + reply = yield from(subscriber.next_published()) #print(u'Received: ', repr(reply.value), u'on channel', reply.channel) log.info(reply) log.info("Sending %s", reply.value) client_writer.write(('data: %s\n\n' % reply.value).encode()) - yield trollius.From(client_writer.drain()) + yield from(client_writer.drain()) - except trollius.ConnectionResetError as err: + except asyncio.ConnectionResetError as err: log.exception("ERROR: ConnectionResetError in handle_client") except Exception as err: log.exception("ERROR: Exception in handle_client") @@ -171,21 +171,17 @@ def handle_client(client_reader, client_writer): client_writer.close() -@trollius.coroutine +@asyncio.coroutine def stats(client_reader, client_writer): try: - log.info('Clients: %s', SERVER.active_count) + log.info('Clients: %s', SERVER._active_count) client_writer.write(( "HTTP/1.0 200 OK\n" "Cache: nocache\n\n" ).encode()) - client_writer.write(('data: %s\n\n' % SERVER.active_count).encode()) - yield trollius.From(client_writer.drain()) - - except trollius.ConnectionResetError as err: - log.info(err) - pass + client_writer.write(('data: %s\n\n' % SERVER._active_count).encode()) + yield from(client_writer.drain()) finally: client_writer.close() return @@ -195,8 +191,8 @@ def main(): global SERVER try: - loop = trollius.get_event_loop() - coro = trollius.start_server( + loop = asyncio.get_event_loop() + coro = asyncio.start_server( handle_client, host=None, port=pagure.APP.config['EVENTSOURCE_PORT'], @@ -204,7 +200,7 @@ def main(): SERVER = loop.run_until_complete(coro) log.info('Serving server at {}'.format(SERVER.sockets[0].getsockname())) if pagure.APP.config.get('EV_STATS_PORT'): - stats_coro = trollius.start_server( + stats_coro = asyncio.start_server( stats, host=None, port=pagure.APP.config.get('EV_STATS_PORT'), @@ -215,7 +211,7 @@ def main(): loop.run_forever() except KeyboardInterrupt: pass - except trollius.ConnectionResetError as err: + except asyncio.ConnectionResetError as err: log.exception("ERROR: ConnectionResetError in main") except Exception as err: log.exception("ERROR: Exception in main")