#337 Add a requests session registry for Hubs to use
Closed 6 years ago by jcline. Opened 7 years ago by jcline.
jcline/fedora-hubs session-registry  into  develop

file modified
+7
@@ -0,0 +1,7 @@ 

+ from .requests_registry import RequestsSessionRegistry

+ 

+ 

+ #: A thread-local requests Session registry that provides TCP connection pools

+ #: for each thread or process. Consult :class:`RequestsSessionRegistry` for

+ #: detailed documentation and usage.

Why the :'s?

Sphinx will render this as a docblock for the module-level variable.

+ RequestsSession = RequestsSessionRegistry()
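Some background on the `#:` question. Sphinx autodoc accepts two ways to document a module-level variable: a comment starting with `#:` placed just before the assignment, or a docstring placed directly after it. Here is a minimal sketch of both forms, using hypothetical variable names:

```python
# Hypothetical module showing the two forms autodoc recognizes for
# documenting module-level variables.

#: Seconds to wait for a response (documented with a ``#:`` comment).
DEFAULT_TIMEOUT = 5

MAX_RETRIES = 3
"""Retries per request (documented with a docstring after the assignment)."""
```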

@@ -0,0 +1,101 @@ 

+ # -*- coding: utf-8 -*-

+ #

+ # This file is part of the Fedora Hubs project.

+ # Copyright (C) 2017 Red Hat, Inc.

+ #

+ # This program is free software: you can redistribute it and/or modify

+ # it under the terms of the GNU Affero General Public License as published by

+ # the Free Software Foundation, either version 3 of the License, or

+ # (at your option) any later version.

+ #

+ # This program is distributed in the hope that it will be useful,

+ # but WITHOUT ANY WARRANTY; without even the implied warranty of

+ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the

+ # GNU Affero General Public License for more details.

+ #

+ # You should have received a copy of the GNU Affero General Public License

+ # along with this program.  If not, see <http://www.gnu.org/licenses/>.

+ """

+ Provides a registry class to allow for the easy creation and sharing of

+ thread-local requests Sessions.

+ """

+ from __future__ import unicode_literals

+ 

+ import threading

+ 

+ import requests

+ 

+ 

+ class RequestsSessionRegistry(object):

+     """

+     Create a thread-local registry for requests Sessions.

+ 

+     Repeated calls to the registry from a single thread will return the same

+     session. For example:

+ 

+         >>> RequestsSession = RequestsSessionRegistry()

+         >>> session1 = RequestsSession()

+         >>> session1 is RequestsSession()

+         True

+         >>> RequestsSession.remove()

+         >>> session1 is RequestsSession()

+         False

+ 

+     To modify how the requests Session is created, create a function that

+     returns a Session object::

+ 

+         def my_session_factory(retries=3, max_backoff=5):

+             session = requests.Session()

+             retry_conf = retry.Retry(total=retries, backoff_factor=1)

+             retry_conf.BACKOFF_MAX = max_backoff

+             adapter = requests.adapters.HTTPAdapter(max_retries=retry_conf)

+             session.mount('http://', adapter)

+             session.mount('https://', adapter)

+             return session

+ 

+         Session = RequestsSessionRegistry(my_session_factory)

+         session = Session(retries=5)

+ 

+     Args:

+         session_factory (callable): Some callable that returns a configured

+             :class:`requests.Session` object. By default, this is just the

+             :class:`requests.Session` class itself.

+     """

+ 

+     def __init__(self, session_factory=None):

+         self.registry = threading.local()

+         if session_factory is None:

+             session_factory = requests.Session

+         self.session_factory = session_factory

+ 

+     def __call__(self, *args, **kwargs):

+         """

+         Create or get the current thread-local requests session

+ 

+         Args:

+             args (tuple): Any positional arguments to pass to the session

+                 factory.

+             kwargs (dict): Any keyword arguments to provide to the session

+                 factory.

+ 

+         Raises:

+             ValueError: If the session has already been created and

+                 positional or keyword arguments are provided. If this occurs,

+                 you should call ``remove`` first to remove the current session.

+         """

+         if not hasattr(self.registry, 'session'):

+             self.registry.session = self.session_factory(*args, **kwargs)

+         elif args or kwargs:

+             # args or kwargs can't be provided if there's already a session

+             raise ValueError('args and kwargs must be none if the registry has'

+                              ' a session already. Call `remove` first.')

+ 

+         return self.registry.session

+ 

+     def remove(self):

+         """

+         Close all open connections and remove the thread-local session.

+         """

+         if hasattr(self.registry, 'session'):

+             self.registry.session.close()

+             del self.registry.session
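You can exercise the `Raises` and `remove` behavior from the docstring without network access. The sketch below copies the registry logic from this diff. For illustration only, it defaults to a hypothetical `FakeSession` rather than `requests.Session`. `FakeSession` just records its arguments and whether `close()` ran.

```python
import threading


class FakeSession(object):
    """Hypothetical stand-in for requests.Session (no network needed)."""

    def __init__(self, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs
        self.closed = False

    def close(self):
        self.closed = True


class RequestsSessionRegistry(object):
    """Same logic as the registry in this diff; only the default differs."""

    def __init__(self, session_factory=FakeSession):
        self.registry = threading.local()
        self.session_factory = session_factory

    def __call__(self, *args, **kwargs):
        if not hasattr(self.registry, 'session'):
            self.registry.session = self.session_factory(*args, **kwargs)
        elif args or kwargs:
            raise ValueError('args and kwargs must be none if the registry has'
                             ' a session already. Call `remove` first.')
        return self.registry.session

    def remove(self):
        if hasattr(self.registry, 'session'):
            self.registry.session.close()
            del self.registry.session


Session = RequestsSessionRegistry()
first = Session(retries=3)       # creates this thread's session
try:
    Session(retries=5)           # a session exists, so arguments are rejected
    rejected = False
except ValueError:
    rejected = True
Session.remove()                 # closes and forgets the current session
second = Session(retries=5)      # the new arguments now take effect
```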

@@ -0,0 +1,136 @@ 

+ # -*- coding: utf-8 -*-

+ #

+ # This file is part of the Fedora Hubs project.

+ # Copyright (C) 2017 Red Hat, Inc.

+ #

+ # This program is free software: you can redistribute it and/or modify

+ # it under the terms of the GNU Affero General Public License as published by

+ # the Free Software Foundation, either version 3 of the License, or

+ # (at your option) any later version.

+ #

+ # This program is distributed in the hope that it will be useful,

+ # but WITHOUT ANY WARRANTY; without even the implied warranty of

+ # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the

+ # GNU Affero General Public License for more details.

+ #

+ # You should have received a copy of the GNU Affero General Public License

+ # along with this program.  If not, see <http://www.gnu.org/licenses/>.

+ """Tests for the :mod:`hubs.requests_registry` module."""

+ from __future__ import unicode_literals

+ 

+ import multiprocessing

+ import unittest

+ 

+ import requests

+ from requests.packages.urllib3.util import retry

+ 

+ from hubs.requests_registry import RequestsSessionRegistry

+ 

+ 

+ def process_func(Session, original_session):

+     """

+     Simple test function to be run within a thread or process Pool for testing.

+     """

+     session1 = Session()

+     session2 = Session()

+     return session1 is session2 and session1 is not original_session

+ 

+ 

+ class CallTests(unittest.TestCase):

+     """Tests for the __call__ method of :class:`RequestsSessionRegistry`"""

+ 

+     def setUp(self):

+         self.Session = RequestsSessionRegistry()

+ 

+     def test_repeated_calls_in_thread(self):

+         """Assert multiple calls within a thread result in the same Session"""

+         sesh1 = self.Session()

+         sesh2 = self.Session()

sportsesh

+         self.assertTrue(sesh1 is sesh2)

+ 

+     def test_different_threads(self):

+         """Assert using the registry across threads provides new Sessions"""

+         session = self.Session()

+ 

+         pool = multiprocessing.pool.ThreadPool(processes=4)

+         results = [

+             pool.apply_async(process_func, (self.Session, session))

+             for i in range(4)

+         ]

+         for result in results:

+             self.assertTrue(result.get())

Nice!

+ 

+     def test_different_processes(self):

+         """Assert using the registry across processes provides new Sessions"""

+         session = self.Session()

+ 

+         pool = multiprocessing.Pool(processes=4)

+         results = [

+             pool.apply_async(process_func, (self.Session, session))

+             for i in range(4)

+         ]

+         for result in results:

+             self.assertTrue(result.get())

+ 

+     def test_call_with_params_session_exists(self):

+         """

+         Assert a ValueError is raised when constructor arguments are provided

+         when a session already exists.

+         """

+         self.Session()

+         self.assertRaises(ValueError, self.Session, 'some argument')

+ 

+ 

+ class RemoveTests(unittest.TestCase):

+     """Tests for the :meth:`RequestsSessionRegistry.remove` method"""

+ 

+     def setUp(self):

+         self.Session = RequestsSessionRegistry()

+ 

+     def test_remove(self):

+         """Assert calling remove results in a new session next call"""

+         session = self.Session()

+         self.assertTrue(session is self.Session())

+         self.Session.remove()

+         self.assertFalse(session is self.Session())

+ 

+     def test_remove_no_session(self):

+         """Assert calling remove when there's no session is okay"""

+         self.Session.remove()

+ 

+ 

+ class CustomFactoryTests(unittest.TestCase):

+     """Tests for using a factory to create session objects"""

+ 

+     def setUp(self):

+         def my_session_factory(retries=3):

+             session = requests.Session()

+             retry_conf = retry.Retry(total=retries)

+             adapter = requests.adapters.HTTPAdapter(max_retries=retry_conf)

+             session.mount('http://', adapter)

+             session.mount('https://', adapter)

+             return session

+ 

+         self.my_session_factory = my_session_factory

+ 

+     def test_custom_factory(self):

+         """Assert factories can be used to create the Session"""

+         Session = RequestsSessionRegistry(self.my_session_factory)

+         session = Session()

+         http_adapter = session.adapters['http://']

+         https_adapter = session.adapters['https://']

+         self.assertEqual(3, http_adapter.max_retries.total)

+         self.assertEqual(3, https_adapter.max_retries.total)

+ 

+     def test_custom_factory_with_args(self):

+         """Assert factories with arguments can be used to create the Session"""

+         Session = RequestsSessionRegistry(self.my_session_factory)

+         session = Session(retries=5)

+         http_adapter = session.adapters['http://']

+         https_adapter = session.adapters['https://']

+         self.assertEqual(5, http_adapter.max_retries.total)

+         self.assertEqual(5, https_adapter.max_retries.total)

+ 

+ 

+ if __name__ == '__main__':

+     unittest.main(verbosity=2)

file modified
+4 -5
@@ -1,15 +1,13 @@ 

  from __future__ import unicode_literals

  

- import json

  import logging

  from hashlib import sha256

  

+ from six.moves.urllib_parse import urlencode

  import arrow

  import markdown

- import requests

- import six

  

- from six.moves.urllib_parse import urlencode

+ from hubs import RequestsSession

  

  log = logging.getLogger(__name__)

  
@@ -61,8 +59,9 @@ 

  

  def _github_results(url, auth):

      link = dict(next=url)

+     requests_session = RequestsSession()

      while 'next' in link:

-         response = requests.get(link['next'], params=auth)

+         response = requests_session.get(link['next'], params=auth)

  

          # And.. if we didn't get good results, just bail.

          if not bool(response):

@@ -1,8 +1,8 @@ 

  from __future__ import unicode_literals

  

  import operator

- import requests

  

+ from hubs import RequestsSession

  from hubs.widgets import validators

  from hubs.widgets.base import Widget, WidgetView

  from hubs.widgets.caching import CachedFunction
@@ -40,7 +40,8 @@ 

          username = self.instance.config["username"]

          url = "https://badges.fedoraproject.org/user/{username}/json"

          url = url.format(username=username)

-         response = requests.get(url)

+         requests_session = RequestsSession()

+         response = requests_session.get(url)

          assertions = response.json()['assertions']

          key = operator.itemgetter('issued')

          return dict(assertions=sorted(assertions, key=key, reverse=True))

@@ -1,8 +1,8 @@ 

  from __future__ import unicode_literals

  

- import requests

  import pkgwat.api

  

+ from hubs import RequestsSession

  from hubs.widgets import validators

  from hubs.widgets.base import Widget, WidgetView

  from hubs.widgets.caching import CachedFunction
@@ -48,7 +48,8 @@ 

      def execute(self):

          username = self.instance.config["username"]

          url = "/".join([PKGDB_URL, username])

-         response = requests.get(url)

+         requests_session = RequestsSession()

+         response = requests_session.get(url)

          data = response.json()

  

          issues = []
@@ -92,7 +93,8 @@ 

      def should_invalidate(self, message):

          username = self.instance.config["username"]

          url = "/".join([PKGDB_URL, username])

-         response = requests.get(url)

+         requests_session = RequestsSession()

+         response = requests_session.get(url)

          data = response.json()

  

          owned = data['point of contact'] + data['co-maintained']

@@ -3,8 +3,8 @@ 

  import flask

  import fedmsg.config

  import fedmsg.meta

- import requests

  

+ from hubs import RequestsSession

  from hubs.utils import commas

  from hubs.widgets import validators

  from hubs.widgets.base import Widget, WidgetView
@@ -53,7 +53,8 @@ 

          username = self.instance.config["username"]

          url = "https://apps.fedoraproject.org/datagrepper/raw?user={username}"

          url = url.format(username=username)

-         response = requests.get(url)

+         requests_session = RequestsSession()

+         response = requests_session.get(url)

          fedmsgs = response.json()['total']

          sub_list = []

          for assoc in self.instance.hub.associations:

@@ -1,7 +1,6 @@ 

  from __future__ import unicode_literals

  

- import requests

- 

+ from hubs import RequestsSession

  from hubs.widgets import validators

  from hubs.widgets.base import Widget, WidgetView

  from hubs.widgets.caching import CachedFunction
@@ -59,7 +58,8 @@ 

          repo = self.instance.config["repo"]

          display_number = int(self.instance.config["display_number"])

          url = '/'.join(['https://api.github.com/repos', org, repo, "issues"])

-         issues = requests.get(url).json()

+         requests_session = RequestsSession()

+         issues = requests_session.get(url).json()

  

          all_issues = []

          for issue in issues[:display_number]:

@@ -3,9 +3,8 @@ 

  import arrow

  import collections

  import datetime

- import requests

  

- from hubs import utils

+ from hubs import utils, RequestsSession

  from hubs.widgets import validators

  from hubs.widgets.base import Widget, WidgetView

  from hubs.widgets.caching import CachedFunction
@@ -60,7 +59,8 @@ 

          base = ('https://apps.fedoraproject.org/calendar/api/meetings/'

                  '?calendar=%s')

          url = base % calendar

-         response = requests.get(url).json()

+         requests_session = RequestsSession()

+         response = requests_session.get(url).json()

  

          tmp = collections.defaultdict(list)

          for meeting in response['meetings']:

@@ -1,10 +1,10 @@ 

  from __future__ import unicode_literals

  

+ from hubs import RequestsSession

  from hubs.widgets import validators

  from hubs.widgets.base import Widget, WidgetView

  from hubs.widgets.caching import CachedFunction

  

- import requests

  

  pagure_url = "https://pagure.io/api/0"

  
@@ -45,7 +45,8 @@ 

      def execute(self):

          repo = self.instance.config["repo"]

          url = '/'.join([pagure_url, repo, "pull-requests"])

-         response = requests.get(url)

+         requests_session = RequestsSession()

+         response = requests_session.get(url)

          data = response.json()

          total_req = data['total_requests']

          all_pr = list()

@@ -1,7 +1,6 @@ 

  from __future__ import unicode_literals

  

- import requests

- 

+ from hubs import RequestsSession

  from hubs.widgets import validators

  from hubs.widgets.base import Widget, WidgetView

  from hubs.widgets.caching import CachedFunction
@@ -48,7 +47,8 @@ 

          repo = self.instance.config["repo"]

  

          url = '/'.join([pagure_url, repo, "issues"])

-         issue_response = requests.get(url)

+         requests_session = RequestsSession()

+         issue_response = requests_session.get(url)

          data = issue_response.json()

          total = data['total_issues']

  

file modified
+7 -4
@@ -3,7 +3,8 @@ 

  import flask

  import hubs.models

  import kitchen.text.converters

- import requests

+ 

+ from hubs import RequestsSession

  

  

  class Validator(object):
@@ -133,7 +134,8 @@ 

  

      @classmethod

      def from_string(cls, value):

-         response = requests.get("https://pagure.io/%s" % value, timeout=5)

+         requests_session = RequestsSession()

+         response = requests_session.get("https://pagure.io/%s" % value, timeout=5)

          if response.status_code == 200:

              return value

          raise ValueError('Invalid pagure repo')
@@ -144,8 +146,9 @@ 

  

      @classmethod

      def from_string(cls, value):

-         response = requests.get("https://fedorahosted.org/%s/" % value,

-                                 timeout=5, allow_redirects=False)

+         requests_session = RequestsSession()

+         response = requests_session.get("https://fedorahosted.org/%s/" % value,

+                                         timeout=5, allow_redirects=False)

          if response.status_code == 200:

              return value

          raise ValueError('Invalid fedorahosted project')

@@ -2,6 +2,7 @@ 

  

  import requests

  

+ from hubs import RequestsSession

  from hubs.utils import username2avatar

  from hubs.widgets import validators

  from hubs.widgets.base import Widget, WidgetView
@@ -52,7 +53,8 @@ 

          baseurl = "https://admin.fedoraproject.org/pkgdb/api/pendingacls"

          query = "?username={username}&format=json".format(username=username)

          url = baseurl + query

-         response = requests.get(url)

+         requests_session = RequestsSession()

+         response = requests_session.get(url)

          data = response.json()

          for acl in data['pending_acls']:

              acl['avatar'] = username2avatar(acl['user'], s=32)

@@ -1,7 +1,6 @@ 

  from __future__ import unicode_literals

  

- import requests

- 

+ from hubs import RequestsSession

  from hubs.widgets import validators

  from hubs.widgets.base import Widget, WidgetView

  from hubs.widgets.caching import CachedFunction
@@ -59,7 +58,8 @@ 

          query = '?user={username}&status=testing'.format(username=username)

          url = bodhiurl + query

          headers = {'Accept': 'application/json'}

-         response = requests.get(url, headers=headers)

+         requests_session = RequestsSession()

+         response = requests_session.get(url, headers=headers)

          if response.status_code == 200:

              data = response.json()

  

All requests go through the session registry. This lets requests made from the same Python thread share TCP connections, which greatly improves the performance of HTTP requests. It also gives us a single place to configure sessions globally (for things like retries, global headers, etc.).
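The per-thread sharing relies on `threading.local`. The stdlib-only sketch below shows that repeated calls within one thread reuse a single object, while each pool thread gets its own. `make_session` is a hypothetical stand-in for `requests.Session`, which lets the sketch run without network access.

```python
import threading
from multiprocessing.pool import ThreadPool

_local = threading.local()


def make_session():
    """Hypothetical stand-in for requests.Session: any unique object works."""
    return object()


def get_session():
    """Return this thread's session, creating it on the first call."""
    if not hasattr(_local, 'session'):
        _local.session = make_session()
    return _local.session


def worker(_):
    # Two calls from the same thread should yield the very same object.
    first = get_session()
    second = get_session()
    return threading.get_ident(), first, first is second


main_session = get_session()

pool = ThreadPool(processes=4)
try:
    results = pool.map(worker, range(8))
finally:
    pool.close()
    pool.join()
```

The sketch imports `ThreadPool` from `multiprocessing.pool` explicitly on purpose. A bare `import multiprocessing` does not necessarily load the `pool` submodule.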

If other people think this is a good idea, I'll probably see whether requests-toolbelt would accept a PR with this. If they accept it and make a release with it, we can switch to that.

fixes #336

This example function doesn't return the session, but the sentence above sounds like it was meant to.

You could docblock the parameter to say that it is a function.

This should probably be mentioned in the docblock.

Just a few suggestions that you can take or leave. LGTM.

Sphinx will render this as a docblock for the module-level variable.

You're right. I wrote this as a docblock first, then copied it into a test, found that bug, and forgot to update the docblock.
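The registry stores whatever the factory returns, so a factory that forgets `return session` would silently hand out `None`. Below is a sketch of a working factory. It uses `urllib3`'s `Retry` (urllib3 is a requests dependency) and omits the backoff cap for brevity. `retrying_session` is a hypothetical name.

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def retrying_session(retries=3, backoff_factor=1):
    """Hypothetical factory: a Session whose adapters retry failed requests."""
    session = requests.Session()
    retry_conf = Retry(total=retries, backoff_factor=backoff_factor)
    adapter = HTTPAdapter(max_retries=retry_conf)
    # Mount the same adapter for both schemes so every request can retry.
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    # The registry stores and hands out exactly what this returns.
    return session


session = retrying_session(retries=5)
```

With the registry from this PR, the call would be `RequestsSessionRegistry(retrying_session)(retries=5)`.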

I did, as part of the class docblock (Args). The Napoleon style guide says this is fine, but that it's also possible to document it on the init method instead. If you do that, though, you need to add the :special-members: option when using autodoc. I'm fine with either approach. Which do you prefer?
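You can document the Args on the init method without adding :special-members: to each autodoc directive by using a project-wide setting instead. Here is a minimal `conf.py` sketch, assuming the docs already use `sphinx.ext.autodoc` and `sphinx.ext.napoleon`:

```python
# conf.py (sketch): render Args documented on __init__ without adding
# :special-members: to every autodoc directive.
extensions = [
    'sphinx.ext.autodoc',
    'sphinx.ext.napoleon',
]

# Napoleon: include __init__ docstrings in the generated documentation.
napoleon_include_init_with_doc = True

# Alternative (autodoc): concatenate the class and __init__ docstrings.
# autoclass_content = 'both'
```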

Ah that sounds fine. Carry on!

rebased

7 years ago

Is this PR still being worked on? If not, we should close it to clean up the queue a bit.

Pull-Request has been closed by jcline

6 years ago