| |
@@ -148,6 +148,8 @@
|
| |
# Fix bus_id soon enough.
|
| |
self.opts.bus_id = getattr(self.opts, 'bus_id', type(self).__name__)
|
| |
|
| |
+ self.opts.bus_publish_retries = getattr(self.opts, 'bus_publish_retries', 5)
|
| |
+
|
| |
if not log:
|
| |
log = logging
|
| |
logging.basicConfig(level=logging.DEBUG)
|
| |
@@ -172,10 +174,16 @@
|
| |
"""
|
| |
try:
|
| |
message.validate()
|
| |
- self._send_message(message)
|
| |
- # pylint: disable=W0703
|
| |
- except Exception:
|
| |
- self.log.exception("Failed to publish message.")
|
| |
+ except Exception: # pylint: disable=W0703
|
| |
+ self.log.exception("Failed to validate a message")
|
| |
+ return
|
| |
+
|
| |
+ for attempt in range(1, self.opts.bus_publish_retries + 1):
|
| |
+ try:
|
| |
+ self._send_message(message)
|
| |
+ except Exception: # pylint: disable=W0703
|
| |
+ # We don't want to halt the worker because of messaging.
|
| |
+ self.log.exception("Attempt %s to publish a message failed", attempt)
|
| |
|
| |
def announce_job(self, msg_type, job, who, ip, pid):
|
| |
"""
|
| |
@@ -315,5 +323,5 @@
|
| |
os.environ['FEDORA_MESSAGING_CONF'] = opts.toml_config
|
| |
|
| |
def _send_message(self, message):
|
| |
- from fedora_messaging import api
|
| |
- api.publish(message)
|
| |
+ from fedora_messaging import api as fm_api, exceptions as fm_ex
|
| |
+ fm_api.publish(message)
|
| |
With fedora-messaging-1.7.1-1.fc30.noarch we had an issue where
api.publish() never returned, so I updated the package to the latest
Fedora version, 2.0.0-1.fc30.noarch, and the problem disappeared.
But as it turns out, the logs started to contain messages like:
This means that some of the messages were lost because of the documented
PublishTimeout exception. Per the documentation, it is up to the caller to
decide what to do about this, so let's retry for now (5 times by default).