From e0ebc15075a5f4398aa974e3e6f019b87cba0d29 Mon Sep 17 00:00:00 2001 From: Patrick Uiterwijk Date: May 23 2017 23:05:46 +0000 Subject: Use python-redis instead of trollius-redis to correctly clean up when client leaves With trollius-redis, the connection is not released when the client disconnects unless someone comments on the issue/PR they had open, because unless something happened on the issue they had open, we did not actually send anything to the client. As a result, we were not aware whether the client was still connected. With python-redis, we just check every so often if there's a message, and if not we at least try to send a ping to the client every 5 seconds. Trying to send data will cause an BrokenPipeError if the client is gone, and thus we can correctly clean up. Signed-off-by: Patrick Uiterwijk --- diff --git a/ev-server/pagure_stream_server.py b/ev-server/pagure_stream_server.py index d2ed404..c7c0a36 100644 --- a/ev-server/pagure_stream_server.py +++ b/ev-server/pagure_stream_server.py @@ -23,8 +23,8 @@ import logging import os import urlparse +import redis import trollius -import trollius_redis log = logging.getLogger(__name__) @@ -40,6 +40,10 @@ import pagure.lib # noqa: E402 from pagure.exceptions import PagureEvException # noqa: E402 SERVER = None +POOL = redis.ConnectionPool( + host=pagure.APP.config['REDIS_HOST'], + port=pagure.APP.config['REDIS_PORT'], + db=pagure.APP.config['REDIS_DB']) def _get_issue(repo, objid): @@ -207,34 +211,42 @@ def handle_client(client_reader, client_writer): "Access-Control-Allow-Origin: %s\n\n" % origin ).encode()) - connection = yield trollius.From(trollius_redis.Connection.create( - host=pagure.APP.config['REDIS_HOST'], - port=pagure.APP.config['REDIS_PORT'], - db=pagure.APP.config['REDIS_DB'])) - try: - - # Create subscriber. - subscriber = yield trollius.From(connection.start_subscribe()) + conn = redis.Redis(connection_pool=POOL) + subscriber = conn.pubsub(ignore_subscribe_messages=True) - # Subscribe to channel. - yield trollius.From(subscriber.subscribe(['pagure.%s' % obj.uid])) + try: + subscriber.subscribe('pagure.%s' % obj.uid) # Inside a while loop, wait for incoming events. + oncall = 0 while True: - reply = yield trollius.From(subscriber.next_published()) - log.info(reply) - log.info("Sending %s", reply.value) - client_writer.write(('data: %s\n\n' % reply.value).encode()) - yield trollius.From(client_writer.drain()) - + msg = subscriber.get_message() + if msg is None: + # Send a ping to see if the client is still alive + if oncall >= 5: + # Only send a ping once every 5 seconds + client_writer.write(('event: ping\n\n').encode()) + oncall = 0 + oncall += 1 + yield trollius.From(client_writer.drain()) + yield trollius.From(trollius.sleep(1)) + else: + log.info("Sending %s", msg['data']) + client_writer.write(('data: %s\n\n' % msg['data']).encode()) + yield trollius.From(client_writer.drain()) + + except OSError: + log.info("Client closed connection") except trollius.ConnectionResetError as err: log.exception("ERROR: ConnectionResetError in handle_client") except Exception as err: log.exception("ERROR: Exception in handle_client") + log.info(type(err)) finally: # Wathever happens, close the connection. - connection.close() + log.info("Client left. Goodbye!") + subscriber.close() client_writer.close()