From 3778a85b75a9fa02c1ddab81dc0d14748a4acab4 Mon Sep 17 00:00:00 2001 From: Aurélien Bompard Date: Oct 11 2017 14:29:58 +0000 Subject: Use MongoDB to store the feed items --- diff --git a/ansible/roles/hubs/tasks/main.yml b/ansible/roles/hubs/tasks/main.yml index 2ac3f8a..f26adaf 100644 --- a/ansible/roles/hubs/tasks/main.yml +++ b/ansible/roles/hubs/tasks/main.yml @@ -10,6 +10,7 @@ with_items: - npm - redis + - mongodb-server - fedmsg-hub - python-virtualenv - python3-flask-oidc @@ -152,6 +153,7 @@ service: name={{ item }} state=started enabled=yes with_items: - redis + - mongod # Include mode-specific tasks diff --git a/hubs/feed.py b/hubs/feed.py index 1cbe510..4c92454 100644 --- a/hubs/feed.py +++ b/hubs/feed.py @@ -7,7 +7,7 @@ import re import fedmsg.meta import flask -import redis +import pymongo from fedmsg.encoding import loads, dumps import hubs.app @@ -130,7 +130,6 @@ def _make_hub_links(msg, attr, existing_usernames): class Feed(object): max_items = 100 - connection_pool = None msgtype = None def __init__(self, owner): @@ -145,35 +144,23 @@ class Feed(object): self.db = None fedmsg_config = hubs.app.fedmsg_config self.db_config = { - 'host': fedmsg_config.get('hubs.redis.host', 'localhost'), - 'port': fedmsg_config.get('hubs.redis.port', 6379), - 'db': fedmsg_config.get('hubs.redis.db', 0), - 'password': fedmsg_config.get('hubs.redis.password', None), - } - self.key = self._get_key() - - def _get_key(self): - parts = ["feed", self.msgtype, self.owner] - key_prefix = hubs.app.fedmsg_config.get('hubs.redis.feed-prefix') - if key_prefix: - parts.insert(0, key_prefix) - return "|".join(parts) - - def get(self, start=0, end=None): - end = end or -1 + "url": fedmsg_config.get('hubs.mongodb.url'), + "db": fedmsg_config.get('hubs.mongodb.database', "hubs"), + } + + def get(self): if self.db is None: self.connect() return [ - loads(item.decode("utf-8")) for item in - self.db.lrange(self.key, start, end) + loads(item["msg"]) for item in + self.db.find(sort=[('$natural', pymongo.DESCENDING)]) ] def add(self, msg): if self.db is None: self.connect() - log.debug("Adding message %s to %s", msg["msg_id"], self.key) - self.db.lpush(self.key, dumps(self._preprocess_msg(msg))) - self.db.ltrim(self.key, 0, self.max_items) + log.debug("Adding message %s to %s", msg["msg_id"], self.db.name) + self.db.insert_one({"msg": dumps(self._preprocess_msg(msg))}) def _preprocess_msg(self, msg): # Default implementation: no-op. @@ -182,29 +169,35 @@ class Feed(object): def length(self): if self.db is None: self.connect() - return self.db.llen(self.key) + return self.db.count() __len__ = length def connect(self): - if self.connection_pool is None: - self.connection_pool = redis.ConnectionPool(**self.db_config) - self.db = redis.Redis(connection_pool=self.connection_pool) try: - self.db.info() - except redis.exceptions.ConnectionError: - log.warning("Could not connect to Redis") + client = pymongo.MongoClient(self.db_config["url"]) + database = client[self.db_config["db"]] + collection_name = "|".join(["feed", self.msgtype, self.owner]) + existing = database.collection_names( + include_system_collections=False) + if collection_name in existing: + self.db = database[collection_name] + else: + self.db = database.create_collection( + collection_name, + capped=True, size=5242880, max=self.max_items) + except pymongo.errors.ConnectionFailure: + log.warning("Could not connect to MongoDB") self.db = None raise # XXX: do better? def close(self): try: - self.db.close() - except redis.exceptions.ConnectionError: - log.warning("Could not disconnect from Redis") + self.db.database.client.close() + except pymongo.errors.ConnectionFailure: + log.warning("Could not disconnect from MongoDB") raise # XXX: do better? self.db = None - self.connection_pool = None class Notifications(Feed): diff --git a/hubs/tests/test_feed.py b/hubs/tests/test_feed.py index 4df4ca4..526aac7 100644 --- a/hubs/tests/test_feed.py +++ b/hubs/tests/test_feed.py @@ -2,7 +2,8 @@ from __future__ import unicode_literals import json -from mock import Mock, patch +import pymongo +from mock import MagicMock, Mock, patch from hubs.app import app from hubs.models import User, Hub, Association @@ -24,14 +25,33 @@ class FeedTest(APPTest): def tearDown(self): super(FeedTest, self).tearDown() + @patch("hubs.feed.pymongo") + def test_connect(self, pymongo_mock): + client = MagicMock() + pymongo_mock.MongoClient.return_value = client + for feed_class, msgtype in self.feed_classes: + database = Mock() + database.collection_names.return_value = [] + database.create_collection.return_value = db = object() + client.__getitem__.return_value = database + db_name = "feed|%s|testuser" % msgtype + feed = feed_class("testuser") + feed.connect() + pymongo_mock.MongoClient.assert_called_with(None) + client.__getitem__.assert_called_with("hubs") + database.create_collection.assert_called_with( + db_name, capped=True, size=5242880, max=100 + ) + self.assertIs(feed.db, db) + def test_get(self): for feed_class, msgtype in self.feed_classes: feed = feed_class("testuser") feed.db = Mock() - feed.db.lrange.return_value = [] + feed.db.find.return_value = [] feed.get() - feed.db.lrange.assert_called_once_with( - "feed|{}|testuser".format(msgtype), 0, -1) + feed.db.find.assert_called_once_with( + sort=[('$natural', pymongo.DESCENDING)]) def test_add(self): msg = { @@ -43,34 +63,30 @@ class FeedTest(APPTest): for feed_class, msgtype in self.feed_classes: feed = feed_class("testuser") feed.db = Mock() - key = "feed|{}|testuser".format(msgtype) feed.add(msg) - feed.db.lpush.assert_called_once() - lpush_call_args = feed.db.lpush.call_args_list[0][0] - self.assertEqual(lpush_call_args[0], key) + feed.db.insert_one.assert_called_once() + insert_call_args = feed.db.insert_one.call_args_list[0][0] + inserted_msg = json.loads(insert_call_args[0]["msg"]) if feed_class == Notifications: - self.assertIn("msg_ids", json.loads(lpush_call_args[1])) + self.assertIn("msg_ids", inserted_msg) elif feed_class == Activity: - self.assertEqual(json.loads(lpush_call_args[1]), msg) - feed.db.ltrim.assert_called_once_with(key, 0, 100) + self.assertEqual(inserted_msg, msg) def test_length(self): for feed_class, msgtype in self.feed_classes: feed = feed_class("testuser") feed.db = Mock() - feed.db.llen.return_value = 42 + feed.db.count.return_value = 42 self.assertEqual(feed.length(), 42) - feed.db.llen.assert_called_once_with( - "feed|{}|testuser".format(msgtype)) + feed.db.count.assert_called_once_with() def test_close(self): for feed_class, msgtype in self.feed_classes: feed = feed_class("testuser") - redis_mock = feed.db = Mock() + db_mock = feed.db = Mock() feed.close() - redis_mock.close.assert_called_once() + db_mock.database.client.close.assert_called_once() self.assertIsNone(feed.db) - self.assertIsNone(feed.connection_pool) @patch("hubs.feed.Activity") def test_on_new_message(self, mock_activity): diff --git a/requirements.txt b/requirements.txt index 07dab88..4a42cec 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,6 +15,7 @@ gunicorn html5lib==0.9999999 munch psycopg2 +pymongo pytz requests setuptools