#361 Retry sending stomp message after a delay
Merged 4 years ago by lholecek. Opened 4 years ago by lholecek.
lholecek/waiverdb retry-sending-stomp-message  into  master

file modified
+3 -10
@@ -10,18 +10,11 @@ 

  

  

  @pytest.fixture(scope='session')

- def app(request):

+ def app():

      os.environ['TEST'] = 'true'

      app = create_app()

-     # Establish an application context before running the tests.

-     ctx = app.app_context()

-     ctx.push()

- 

-     def teardown():

-         ctx.pop()

- 

-     request.addfinalizer(teardown)

-     return app

+     with app.app_context():

+         yield app

  

  

  @pytest.fixture(scope='session')

file modified
+74 -30
@@ -6,14 +6,38 @@ 

  import pytest

  from requests import ConnectionError, HTTPError

  from mock import patch, Mock

+ from stomp.exception import StompException

  

  from .utils import create_waiver

  from waiverdb import __version__

  from waiverdb.models import Waiver

  

  

- @patch('waiverdb.auth.get_user', return_value=('foo', {}))

- def test_create_waiver(mocked_get_user, client, session):

+ @pytest.fixture

+ def mocked_get_user(username):

+     with patch('waiverdb.auth.get_user', return_value=(username, {})):

+         yield username

+ 

+ 

+ @pytest.fixture

+ def mocked_user():

+     with patch('waiverdb.auth.get_user', return_value=('foo', {})):

+         yield 'foo'

+ 

+ 

+ @pytest.fixture

+ def mocked_bodhi_user():

+     with patch('waiverdb.auth.get_user', return_value=('bodhi', {})):

+         yield 'bodhi'

+ 

+ 

+ @pytest.fixture

+ def mocked_resultsdb():

+     with patch('waiverdb.api_v1.get_resultsdb_result') as mocked_resultsdb:

+         yield mocked_resultsdb

+ 

+ 

+ def test_create_waiver(mocked_user, client, session):

      data = {

          'subject_type': 'koji_build',

          'subject_identifier': 'glibc-2.26-27.fc27',
@@ -36,8 +60,7 @@ 

      assert res_data['comment'] == 'it broke'

  

  

- @patch('waiverdb.auth.get_user', return_value=('foo', {}))

- def test_create_waiver_with_subject(mocked_get_user, client, session):

+ def test_create_waiver_with_subject(mocked_user, client, session):

      # 'subject' key was the API in Waiverdb < 0.11

      data = {

          'subject': {'type': 'koji_build', 'item': 'glibc-2.26-27.fc27'},
@@ -61,9 +84,7 @@ 

      assert res_data['comment'] == 'it really broke'

  

  

- @patch('waiverdb.api_v1.get_resultsdb_result')

- @patch('waiverdb.auth.get_user', return_value=('foo', {}))

- def test_create_waiver_with_result_id(mocked_get_user, mocked_resultsdb, client, session):

+ def test_create_waiver_with_result_id(mocked_user, mocked_resultsdb, client, session):

      mocked_resultsdb.return_value = {

          'data': {

              'type': ['koji_build'],
@@ -93,10 +114,8 @@ 

      assert res_data['comment'] == 'it broke'

  

  

- @patch('waiverdb.api_v1.get_resultsdb_result')

- @patch('waiverdb.auth.get_user', return_value=('foo', {}))

  def test_create_waiver_with_result_for_original_spec_nvr(

-         mocked_get_user, mocked_resultsdb, client, session):

+         mocked_user, mocked_resultsdb, client, session):

      mocked_resultsdb.return_value = {

          'data': {

              'original_spec_nvr': ['somedata'],
@@ -124,8 +143,7 @@ 

      assert res_data['comment'] == 'it broke'

  

  

- @patch('waiverdb.auth.get_user', return_value=('foo', {}))

- def test_create_waiver_without_comment(mocked_get_user, client, session):

+ def test_create_waiver_without_comment(mocked_user, client, session):

      data = {

          'subject_type': 'koji_build',

          'subject_identifier': 'glibc-2.26-27.fc27',
@@ -140,9 +158,8 @@ 

      assert res_data['message']['comment'] == 'Missing required parameter in the JSON body'

  

  

- @patch('waiverdb.api_v1.get_resultsdb_result', side_effect=HTTPError(response=Mock(status=404)))

- @patch('waiverdb.auth.get_user', return_value=('foo', {}))

- def test_create_waiver_with_unknown_result_id(mocked_get_user, mocked_resultsdb, client, session):

+ def test_create_waiver_with_unknown_result_id(mocked_user, mocked_resultsdb, client, session):

+     mocked_resultsdb.side_effect = HTTPError(response=Mock(status=404))

      data = {

          'result_id': 123,

          'product_version': 'fool-1',
@@ -156,8 +173,7 @@ 

      assert res_data['message'].startswith('Failed looking up result in Resultsdb:')

  

  

- @patch('waiverdb.auth.get_user', return_value=('foo', {}))

- def test_create_waiver_with_no_testcase(mocked_get_user, client):

+ def test_create_waiver_with_no_testcase(mocked_user, client):

      data = {

          'subject_type': 'koji_build',

          'subject_identifier': 'glibc-2.26-27.fc27',
@@ -172,8 +188,7 @@ 

      assert 'Missing required parameter in the JSON body' in res_data['message']['testcase']

  

  

- @patch('waiverdb.auth.get_user', return_value=('foo', {}))

- def test_create_waiver_with_malformed_subject(mocked_get_user, client):

+ def test_create_waiver_with_malformed_subject(mocked_user, client):

      data = {

          'subject': 'asd',

          'testcase': 'qqq',
@@ -185,8 +200,7 @@ 

      assert 'Must be a valid dict' in res_data['message']['subject']

  

  

- @patch('waiverdb.auth.get_user', return_value=('foo', {}))

- def test_non_superuser_cannot_create_waiver_for_other_users(mocked_get_user, client):

+ def test_non_superuser_cannot_create_waiver_for_other_users(mocked_user, client):

      data = {

          'subject_type': 'koji_build',

          'subject_identifier': 'glibc-2.26-27.fc27',
@@ -203,8 +217,7 @@ 

      assert 'user foo does not have the proxyuser ability' == res_data['message']

  

  

- @patch('waiverdb.auth.get_user', return_value=('bodhi', {}))

- def test_superuser_can_create_waiver_for_other_users(mocked_get_user, client, session):

+ def test_superuser_can_create_waiver_for_other_users(mocked_bodhi_user, client, session):

      data = {

          'subject_type': 'koji_build',

          'subject_identifier': 'glibc-2.26-27.fc27',
@@ -220,7 +233,7 @@ 

      assert r.status_code == 201

      # a waiver should be created for bar by bodhi

      assert res_data['username'] == 'bar'

-     assert res_data['proxied_by'] == 'bodhi'

+     assert res_data['proxied_by'] == mocked_bodhi_user

  

  

  def test_get_waiver(client, session):
@@ -672,8 +685,7 @@ 

      assert 'Access-Control-Allow-Methods' not in r.headers

  

  

- @patch('waiverdb.auth.get_user', return_value=('foo', {}))

- def test_create_multiple_waivers(mocked_get_user, client, session):

+ def test_create_multiple_waivers(mocked_user, client, session):

      item1 = {

          'subject_type': 'koji_build',

          'subject_identifier': 'glibc-2.26-27.fc27',
@@ -709,8 +721,7 @@ 

      assert session.query(Waiver).count() == 2

  

  

- @patch('waiverdb.auth.get_user', return_value=('foo', {}))

- def test_create_multiple_waivers_rollback_on_error(mocked_get_user, client, session):

+ def test_create_multiple_waivers_rollback_on_error(mocked_user, client, session):

      item1 = {

          'subject_type': 'koji_build',

          'subject_identifier': 'glibc-2.26-27.fc27',
@@ -731,8 +742,7 @@ 

      assert session.query(Waiver).count() == 0

  

  

- @patch('waiverdb.auth.get_user', return_value=('foo', {}))

- def test_create_waiver_with_arbitrary_subject_type(mocked_get_user, client, session):

+ def test_create_waiver_with_arbitrary_subject_type(mocked_user, client, session):

      data = {

          'subject_type': 'kind-of-magic',

          'subject_identifier': 'glibc-2.26-27.fc27',
@@ -753,3 +763,37 @@ 

      assert res_data['product_version'] == 'fool-1'

      assert res_data['waived'] is True

      assert res_data['comment'] == 'it broke'

+ 

+ 

+ def test_create_waiver_failed_event_once(mocked_user, client, session, caplog):

+     config = dict(

+         MESSAGE_BUS_PUBLISH=True,

+         MESSAGE_PUBLISHER='stomp',

+         MAX_STOMP_RETRY=3,

+         STOMP_RETRY_DELAY_SECONDS=0,

+         STOMP_CONFIGS={

+             'destination': '/topic/VirtualTopic.eng.waiverdb.waiver.new',

+             'connection': {

+                 'host_and_ports': [('broker01', 61612)],

+             },

+         },

+     )

+ 

+     data = {

+         'subject_type': 'koji_build',

+         'subject_identifier': 'glibc-2.26-27.fc27',

+         'testcase': 'testcase1',

+         'product_version': 'fool-1',

+         'waived': True,

+         'comment': 'it broke',

+     }

+ 

+     with patch.dict(client.application.config, config):

+         with patch('waiverdb.events.stomp.Connection') as connection:

+             connection().connect.side_effect = (StompException, StompException, None)

+             r = client.post('/api/v1.0/waivers/', json=data)

+             assert r.status_code == 201

+             assert 'Failed to send message (try 1/3)' in caplog.text

+             assert 'Failed to send message (try 2/3)' in caplog.text

+             assert 'Failed to send message (try 3/3)' not in caplog.text

+             assert 'StompException' in caplog.text

file modified
+39 -18
@@ -10,6 +10,7 @@ 

  """

  

  import logging

+ import time

  

  from flask_restful import marshal

  import stomp
@@ -25,6 +26,41 @@ 

  

  _log = logging.getLogger(__name__)

  

+ MAX_STOMP_RETRY = 3

+ STOMP_RETRY_DELAY_SECONDS = 5

+ 

+ 

+ def _send_stomp_message(session):

+     with stomp_connection() as conn:

+         stomp_configs = current_app.config.get('STOMP_CONFIGS')

+         for row in session.identity_map.values():

+             monitor.messaging_tx_to_send_counter.inc()

+             if not isinstance(row, Waiver):

+                 continue

+             _log.debug('Publishing a message for %r', row)

+             msg = json.dumps(marshal(row, waiver_fields))

+             kwargs = dict(body=msg, headers={}, destination=stomp_configs['destination'])

+             if stomp.__version__[0] < 4:

+                 kwargs['message'] = kwargs.pop('body')  # On EL7, different sig.

+             try:

+                 conn.send(**kwargs)

+                 monitor.messaging_tx_sent_ok_counter.inc()

+             except Exception:

+                 _log.exception('Couldn\'t publish message via stomp')

+                 monitor.messaging_tx_failed_counter.inc()

+                 raise

+ 

+ 

+ def _send_stomp_message_with_retry(session, max_retry, retry_delay):

+     for i in range(max_retry):

+         time.sleep(i * retry_delay)

+         try:

+             _send_stomp_message(session)

+         except stomp.exception.StompException:

+             _log.exception('Failed to send message (try %s/%s)', i + 1, max_retry)

+         else:

+             break

+ 

  

  def publish_new_waiver(session):

      """
@@ -66,24 +102,9 @@ 

                 current_app.config['MESSAGE_PUBLISHER'])

  

      if current_app.config['MESSAGE_PUBLISHER'] == 'stomp':

-         with stomp_connection() as conn:

-             stomp_configs = current_app.config.get('STOMP_CONFIGS')

-             for row in session.identity_map.values():

-                 monitor.messaging_tx_to_send_counter.inc()

-                 if not isinstance(row, Waiver):

-                     continue

-                 _log.debug('Publishing a message for %r', row)

-                 msg = json.dumps(marshal(row, waiver_fields))

-                 kwargs = dict(body=msg, headers={}, destination=stomp_configs['destination'])

-                 if stomp.__version__[0] < 4:

-                     kwargs['message'] = kwargs.pop('body')  # On EL7, different sig.

-                 try:

-                     conn.send(**kwargs)

-                     monitor.messaging_tx_sent_ok_counter.inc()

-                 except Exception:

-                     _log.exception('Couldn\'t publish message via stomp')

-                     monitor.messaging_tx_failed_counter.inc()

-                     raise

+         max_retry = current_app.config.get('MAX_STOMP_RETRY', MAX_STOMP_RETRY)

+         retry_delay = current_app.config.get('STOMP_RETRY_DELAY_SECONDS', STOMP_RETRY_DELAY_SECONDS)

+         _send_stomp_message_with_retry(session, max_retry=max_retry, retry_delay=retry_delay)

  

      elif current_app.config['MESSAGE_PUBLISHER'] == 'fedmsg':

          for row in session.identity_map.values():

no initial comment

This is not true if it is the last run, but it is not a big issue.

2 new commits added

  • Retry sending stomp message after a delay
  • Tests: Simplify mocking and fixtures
4 years ago

This is not true if it is the last run, but it is not a big issue.

Oh, good point. Changed.

Build 7c256f63efd685b5391a8769ac22949f9eb3979f FAILED!
Rebase or make new commits to rebuild.

2 new commits added

  • Retry sending stomp message after a delay
  • Tests: Simplify mocking and fixtures
4 years ago

Pull-Request has been merged by lholecek

4 years ago