From 3346c23c7460eff9e1714088e6127cdf8ae877b6 Mon Sep 17 00:00:00 2001 From: Jeremy Cline Date: Mar 15 2017 13:33:49 +0000 Subject: [PATCH 1/2] Add a requests session registry for Hubs to use Signed-off-by: Jeremy Cline --- diff --git a/hubs/__init__.py b/hubs/__init__.py old mode 100755 new mode 100644 index e69de29..aa430e9 --- a/hubs/__init__.py +++ b/hubs/__init__.py @@ -0,0 +1,7 @@ +from .requests_registry import RequestsSessionRegistry + + +#: A thread-local requests Session registry that provides TCP connection pools +#: for each thread or process. Consult :class:`RequestsSessionRegistry` for +#: detailed documentation and usage. +RequestsSession = RequestsSessionRegistry() diff --git a/hubs/requests_registry.py b/hubs/requests_registry.py new file mode 100644 index 0000000..87d5ab2 --- /dev/null +++ b/hubs/requests_registry.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the Fedora Hubs project. +# Copyright (C) 2017 Red Hat, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +""" +Provides a registry class to allow for the easy creation and sharing of +thread-local requests Sessions. +""" +from __future__ import unicode_literals + +import threading + +import requests + + +class RequestsSessionRegistry(object): + """ + Create a thread-local registry for requests Sessions. 
+ + Repeated calls to the registry from a single thread will return the same + session. For example: + + >>> RequestsSession = RequestsSessionRegistry() + >>> session1 = RequestsSession() + >>> session1 is RequestsSession() + True + >>> RequestsSession.remove() + >>> session1 is RequestsSession() + False + + To modify how the requests Session is created, create a function that + returns a Session object:: + + def my_session_factory(retries=3, max_backoff=5): + session = requests.Session() + retry_conf = retry.Retry(total=retries, backoff_factor=1) + retry_conf.BACKOFF_MAX = max_backoff + adapter = requests.adapters.HTTPAdapter(max_retries=retry_conf) + session.mount('http://', adapter) + session.mount('https://', adapter) + + Session = RequestsSessionRegistry(my_session_factory) + session = Session(retries=5) + + Args: + session_factory (callable): Some callable that returns a configured + :class:`requests.Session` object. By default, this is just the + :class:`requests.Session` class itself. + """ + + def __init__(self, session_factory=None): + self.registry = threading.local() + if session_factory is None: + session_factory = requests.Session + self.session_factory = session_factory + + def __call__(self, *args, **kwargs): + """ + Create or get the current thread-local requests session + + Args: + args (tuple): Any positional arguments to pass to the session + factory. + kwargs (dict): Any keyword arguments to provide to the session + factory. + + Raises: + ValueError: If the session has already been created and + positional or keyword arguments are provided. + """ + if not hasattr(self.registry, 'session'): + self.registry.session = self.session_factory(*args, **kwargs) + elif args or kwargs: + # args or kwargs can't be provided if there's already a session + raise ValueError('args and kwargs must be none if the registry has' + ' a session already. 
Call `remove` first.') + + return self.registry.session + + def remove(self): + """ + Close all open connections and remove the thread-local session. + """ + if hasattr(self.registry, 'session'): + self.registry.session.close() + del self.registry.session diff --git a/hubs/tests/test_requests_registry.py b/hubs/tests/test_requests_registry.py new file mode 100644 index 0000000..f0012b9 --- /dev/null +++ b/hubs/tests/test_requests_registry.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the Fedora Hubs project. +# Copyright (C) 2017 Red Hat, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +"""Tests for the :mod:`hubs.requests_registry` module.""" +from __future__ import unicode_literals + +import multiprocessing +import unittest + +import requests +from requests.packages.urllib3.util import retry + +from hubs.requests_registry import RequestsSessionRegistry + + +def process_func(Session, original_session): + """ + Simple test function to be run within a thread or process Pool for testing. 
+ """ + session1 = Session() + session2 = Session() + return session1 is session2 and session1 is not original_session + + +class CallTests(unittest.TestCase): + """Tests for the __call__ method of :class:`RequestsSessionRegistry`""" + + def setUp(self): + self.Session = RequestsSessionRegistry() + + def test_repeated_calls_in_thread(self): + """Assert multiple calls within a thread result in the same Session""" + sesh1 = self.Session() + sesh2 = self.Session() + self.assertTrue(sesh1 is sesh2) + + def test_different_threads(self): + """Assert using the registry across threads provides new Sessions""" + session = self.Session() + + pool = multiprocessing.pool.ThreadPool(processes=4) + results = [ + pool.apply_async(process_func, (self.Session, session)) + for i in range(4) + ] + for result in results: + self.assertTrue(result.get()) + + def test_different_processes(self): + """Assert using the registry across processes provides new Sessions""" + session = self.Session() + + pool = multiprocessing.Pool(processes=4) + results = [ + pool.apply_async(process_func, (self.Session, session)) + for i in range(4) + ] + for result in results: + self.assertTrue(result.get()) + + def test_call_with_params_session_exists(self): + """ + Assert a ValueError is raised when constructor arguments are provided + when a session already exists. 
+ """ + self.Session() + self.assertRaises(ValueError, self.Session, 'some argument') + + +class RemoveTests(unittest.TestCase): + """Tests for the :meth:`RequestsSessionRegistry.remove` method""" + + def setUp(self): + self.Session = RequestsSessionRegistry() + + def test_remove(self): + """Assert calling remove results in a new session next call""" + session = self.Session() + self.assertTrue(session is self.Session()) + self.Session.remove() + self.assertFalse(session is self.Session()) + + def test_remove_no_session(self): + """Assert calling remove when there's no session is okay""" + self.Session.remove() + + +class CustomFactoryTests(unittest.TestCase): + """Tests for using a factory to create session objects""" + + def setUp(self): + def my_session_factory(retries=3): + session = requests.Session() + retry_conf = retry.Retry(total=retries) + adapter = requests.adapters.HTTPAdapter(max_retries=retry_conf) + session.mount('http://', adapter) + session.mount('https://', adapter) + return session + + self.my_session_factory = my_session_factory + + def test_custom_factory(self): + """Assert factories can be used to create the Session""" + Session = RequestsSessionRegistry(self.my_session_factory) + session = Session() + http_adapter = session.adapters['http://'] + https_adapter = session.adapters['https://'] + self.assertEqual(3, http_adapter.max_retries.total) + self.assertEqual(3, https_adapter.max_retries.total) + + def test_custom_factory_with_args(self): + """Assert factories with arguments can be used to create the Session""" + Session = RequestsSessionRegistry(self.my_session_factory) + session = Session(retries=5) + http_adapter = session.adapters['http://'] + https_adapter = session.adapters['https://'] + self.assertEqual(5, http_adapter.max_retries.total) + self.assertEqual(5, https_adapter.max_retries.total) + + +if __name__ == '__main__': + unittest.main(verbosity=2) From 92d9af309cf26a5fa9d2116fe3bf03516720db94 Mon Sep 17 00:00:00 2001 From: Jeremy 
Cline Date: Mar 15 2017 14:04:05 +0000 Subject: [PATCH 2/2] All requests go through the session registry This allows requests made from the same Python thread to share TCP connections which greatly improves the performance of HTTP requests. Signed-off-by: Jeremy Cline --- diff --git a/hubs/requests_registry.py b/hubs/requests_registry.py index 87d5ab2..b340b1c 100644 --- a/hubs/requests_registry.py +++ b/hubs/requests_registry.py @@ -51,6 +51,7 @@ class RequestsSessionRegistry(object): adapter = requests.adapters.HTTPAdapter(max_retries=retry_conf) session.mount('http://', adapter) session.mount('https://', adapter) + return session Session = RequestsSessionRegistry(my_session_factory) session = Session(retries=5) @@ -79,7 +80,8 @@ class RequestsSessionRegistry(object): Raises: ValueError: If the session has already been created and - positional or keyword arguments are provided. + positional or keyword arguments are provided. If this occurs, + you should call ``remove`` first to remove the current session. """ if not hasattr(self.registry, 'session'): self.registry.session = self.session_factory(*args, **kwargs) diff --git a/hubs/utils.py b/hubs/utils.py index f1ecd92..b8c18fc 100755 --- a/hubs/utils.py +++ b/hubs/utils.py @@ -1,15 +1,13 @@ from __future__ import unicode_literals -import json import logging from hashlib import sha256 +from six.moves.urllib_parse import urlencode import arrow import markdown -import requests -import six -from six.moves.urllib_parse import urlencode +from hubs import RequestsSession log = logging.getLogger(__name__) @@ -61,8 +59,9 @@ def github_pulls(token, username, repo): def _github_results(url, auth): link = dict(next=url) + requests_session = RequestsSession() while 'next' in link: - response = requests.get(link['next'], params=auth) + response = requests_session.get(link['next'], params=auth) # And.. if we didn't get good results, just bail. 
if not bool(response): diff --git a/hubs/widgets/badges/__init__.py b/hubs/widgets/badges/__init__.py index 00eae24..7fd4946 100644 --- a/hubs/widgets/badges/__init__.py +++ b/hubs/widgets/badges/__init__.py @@ -1,8 +1,8 @@ from __future__ import unicode_literals import operator -import requests +from hubs import RequestsSession from hubs.widgets import validators from hubs.widgets.base import Widget, WidgetView from hubs.widgets.caching import CachedFunction @@ -40,7 +40,8 @@ class GetBadges(CachedFunction): username = self.instance.config["username"] url = "https://badges.fedoraproject.org/user/{username}/json" url = url.format(username=username) - response = requests.get(url) + requests_session = RequestsSession() + response = requests_session.get(url) assertions = response.json()['assertions'] key = operator.itemgetter('issued') return dict(assertions=sorted(assertions, key=key, reverse=True)) diff --git a/hubs/widgets/bugzilla/__init__.py b/hubs/widgets/bugzilla/__init__.py index a9e0c4f..c4e5518 100644 --- a/hubs/widgets/bugzilla/__init__.py +++ b/hubs/widgets/bugzilla/__init__.py @@ -1,8 +1,8 @@ from __future__ import unicode_literals -import requests import pkgwat.api +from hubs import RequestsSession from hubs.widgets import validators from hubs.widgets.base import Widget, WidgetView from hubs.widgets.caching import CachedFunction @@ -48,7 +48,8 @@ class GetIssues(CachedFunction): def execute(self): username = self.instance.config["username"] url = "/".join([PKGDB_URL, username]) - response = requests.get(url) + requests_session = RequestsSession() + response = requests_session.get(url) data = response.json() issues = [] @@ -92,7 +93,8 @@ class GetIssues(CachedFunction): def should_invalidate(self, message): username = self.instance.config["username"] url = "/".join([PKGDB_URL, username]) - response = requests.get(url) + requests_session = RequestsSession() + response = requests_session.get(url) data = response.json() owned = data['point of contact'] + 
data['co-maintained'] diff --git a/hubs/widgets/fedmsgstats/__init__.py b/hubs/widgets/fedmsgstats/__init__.py index ac1ba24..cad524e 100644 --- a/hubs/widgets/fedmsgstats/__init__.py +++ b/hubs/widgets/fedmsgstats/__init__.py @@ -3,8 +3,8 @@ from __future__ import unicode_literals import flask import fedmsg.config import fedmsg.meta -import requests +from hubs import RequestsSession from hubs.utils import commas from hubs.widgets import validators from hubs.widgets.base import Widget, WidgetView @@ -53,7 +53,8 @@ class GetStats(CachedFunction): username = self.instance.config["username"] url = "https://apps.fedoraproject.org/datagrepper/raw?user={username}" url = url.format(username=username) - response = requests.get(url) + requests_session = RequestsSession() + response = requests_session.get(url) fedmsgs = response.json()['total'] sub_list = [] for assoc in self.instance.hub.associations: diff --git a/hubs/widgets/githubissues/__init__.py b/hubs/widgets/githubissues/__init__.py index 4da6f9f..684da14 100644 --- a/hubs/widgets/githubissues/__init__.py +++ b/hubs/widgets/githubissues/__init__.py @@ -1,7 +1,6 @@ from __future__ import unicode_literals -import requests - +from hubs import RequestsSession from hubs.widgets import validators from hubs.widgets.base import Widget, WidgetView from hubs.widgets.caching import CachedFunction @@ -59,7 +58,8 @@ class GetIssues(CachedFunction): repo = self.instance.config["repo"] display_number = int(self.instance.config["display_number"]) url = '/'.join(['https://api.github.com/repos', org, repo, "issues"]) - issues = requests.get(url).json() + requests_session = RequestsSession() + issues = requests_session.get(url).json() all_issues = [] for issue in issues[:display_number]: diff --git a/hubs/widgets/meetings/__init__.py b/hubs/widgets/meetings/__init__.py index b737f0a..3764a3a 100644 --- a/hubs/widgets/meetings/__init__.py +++ b/hubs/widgets/meetings/__init__.py @@ -3,9 +3,8 @@ from __future__ import unicode_literals 
import arrow import collections import datetime -import requests -from hubs import utils +from hubs import utils, RequestsSession from hubs.widgets import validators from hubs.widgets.base import Widget, WidgetView from hubs.widgets.caching import CachedFunction @@ -60,7 +59,8 @@ class GetMeetings(CachedFunction): base = ('https://apps.fedoraproject.org/calendar/api/meetings/' '?calendar=%s') url = base % calendar - response = requests.get(url).json() + requests_session = RequestsSession() + response = requests_session.get(url).json() tmp = collections.defaultdict(list) for meeting in response['meetings']: diff --git a/hubs/widgets/pagure_pr/__init__.py b/hubs/widgets/pagure_pr/__init__.py index cce7322..f83153d 100644 --- a/hubs/widgets/pagure_pr/__init__.py +++ b/hubs/widgets/pagure_pr/__init__.py @@ -1,10 +1,10 @@ from __future__ import unicode_literals +from hubs import RequestsSession from hubs.widgets import validators from hubs.widgets.base import Widget, WidgetView from hubs.widgets.caching import CachedFunction -import requests pagure_url = "https://pagure.io/api/0" @@ -45,7 +45,8 @@ class GetPRs(CachedFunction): def execute(self): repo = self.instance.config["repo"] url = '/'.join([pagure_url, repo, "pull-requests"]) - response = requests.get(url) + requests_session = RequestsSession() + response = requests_session.get(url) data = response.json() total_req = data['total_requests'] all_pr = list() diff --git a/hubs/widgets/pagureissues/__init__.py b/hubs/widgets/pagureissues/__init__.py index 85f7110..ee53872 100644 --- a/hubs/widgets/pagureissues/__init__.py +++ b/hubs/widgets/pagureissues/__init__.py @@ -1,7 +1,6 @@ from __future__ import unicode_literals -import requests - +from hubs import RequestsSession from hubs.widgets import validators from hubs.widgets.base import Widget, WidgetView from hubs.widgets.caching import CachedFunction @@ -48,7 +47,8 @@ class GetIssues(CachedFunction): repo = self.instance.config["repo"] url = '/'.join([pagure_url, 
repo, "issues"]) - issue_response = requests.get(url) + requests_session = RequestsSession() + issue_response = requests_session.get(url) data = issue_response.json() total = data['total_issues'] diff --git a/hubs/widgets/validators.py b/hubs/widgets/validators.py index 9c96320..8e187cc 100644 --- a/hubs/widgets/validators.py +++ b/hubs/widgets/validators.py @@ -3,7 +3,8 @@ from __future__ import unicode_literals import flask import hubs.models import kitchen.text.converters -import requests + +from hubs import RequestsSession class Validator(object): @@ -133,7 +134,8 @@ class PagureRepo(Validator): @classmethod def from_string(cls, value): - response = requests.get("https://pagure.io/%s" % value, timeout=5) + requests_session = RequestsSession() + response = requests_session.get("https://pagure.io/%s" % value, timeout=5) if response.status_code == 200: return value raise ValueError('Invalid pagure repo') @@ -144,8 +146,9 @@ class FedorahostedProject(Validator): @classmethod def from_string(cls, value): - response = requests.get("https://fedorahosted.org/%s/" % value, - timeout=5, allow_redirects=False) + requests_session = RequestsSession() + response = requests_session.get("https://fedorahosted.org/%s/" % value, + timeout=5, allow_redirects=False) if response.status_code == 200: return value raise ValueError('Invalid fedorahosted project') diff --git a/hubs/widgets/workflow/pendingacls.py b/hubs/widgets/workflow/pendingacls.py index b30a208..487436b 100644 --- a/hubs/widgets/workflow/pendingacls.py +++ b/hubs/widgets/workflow/pendingacls.py @@ -2,6 +2,7 @@ from __future__ import unicode_literals import requests +from hubs import RequestsSession from hubs.utils import username2avatar from hubs.widgets import validators from hubs.widgets.base import Widget, WidgetView @@ -52,7 +53,8 @@ class GetPending(CachedFunction): baseurl = "https://admin.fedoraproject.org/pkgdb/api/pendingacls" query = "?username={username}&format=json".format(username=username) url = baseurl 
+ query - response = requests.get(url) + requests_session = RequestsSession() + response = requests_session.get(url) data = response.json() for acl in data['pending_acls']: acl['avatar'] = username2avatar(acl['user'], s=32) diff --git a/hubs/widgets/workflow/updates2stable.py b/hubs/widgets/workflow/updates2stable.py index 6f36260..592f101 100644 --- a/hubs/widgets/workflow/updates2stable.py +++ b/hubs/widgets/workflow/updates2stable.py @@ -1,7 +1,6 @@ from __future__ import unicode_literals -import requests - +from hubs import RequestsSession from hubs.widgets import validators from hubs.widgets.base import Widget, WidgetView from hubs.widgets.caching import CachedFunction @@ -59,7 +58,8 @@ class GetPending(CachedFunction): query = '?user={username}&status=testing'.format(username=username) url = bodhiurl + query headers = {'Accept': 'application/json'} - response = requests.get(url, headers=headers) + requests_session = RequestsSession() + response = requests_session.get(url, headers=headers) if response.status_code == 200: data = response.json()