#394 Use MongoDB to store the feed items
Merged 6 years ago by abompard. Opened 6 years ago by abompard.
abompard/fedora-hubs feature/feed-in-mongodb  into  develop

@@ -10,6 +10,7 @@ 

    with_items:

      - npm

      - redis

+     - mongodb-server

      - fedmsg-hub

      - python-virtualenv

      - python3-flask-oidc
@@ -152,6 +153,7 @@ 

    service: name={{ item }} state=started enabled=yes

    with_items:

      - redis

+     - mongod

  

  

  # Include mode-specific tasks

file modified
+27 -34
@@ -7,7 +7,7 @@ 

  

  import fedmsg.meta

  import flask

- import redis

+ import pymongo

  from fedmsg.encoding import loads, dumps

  

  import hubs.app
@@ -130,7 +130,6 @@ 

  class Feed(object):

  

      max_items = 100

-     connection_pool = None

      msgtype = None

  

      def __init__(self, owner):
@@ -145,35 +144,23 @@ 

          self.db = None

          fedmsg_config = hubs.app.fedmsg_config

          self.db_config = {

-             'host': fedmsg_config.get('hubs.redis.host', 'localhost'),

-             'port': fedmsg_config.get('hubs.redis.port', 6379),

-             'db': fedmsg_config.get('hubs.redis.db', 0),

-             'password': fedmsg_config.get('hubs.redis.password', None),

-             }

-         self.key = self._get_key()

- 

-     def _get_key(self):

-         parts = ["feed", self.msgtype, self.owner]

-         key_prefix = hubs.app.fedmsg_config.get('hubs.redis.feed-prefix')

-         if key_prefix:

-             parts.insert(0, key_prefix)

-         return "|".join(parts)

- 

-     def get(self, start=0, end=None):

-         end = end or -1

+             "url": fedmsg_config.get('hubs.mongodb.url'),

+             "db": fedmsg_config.get('hubs.mongodb.database', "hubs"),

+         }

+ 

+     def get(self):

          if self.db is None:

              self.connect()

          return [

-             loads(item.decode("utf-8")) for item in

-             self.db.lrange(self.key, start, end)

+             loads(item["msg"]) for item in

+             self.db.find(sort=[('$natural', pymongo.DESCENDING)])

This suggestion is likely overkill, but I might as well offer it as food for thought: you might consider using mongoengine, which is an "OM" (like an ORM, but Mongo isn't relational… ☺).

On the other hand, it looks like you are doing something extremely simple here so probably not worth it.

              ]

  

      def add(self, msg):

          if self.db is None:

              self.connect()

-         log.debug("Adding message %s to %s", msg["msg_id"], self.key)

-         self.db.lpush(self.key, dumps(self._preprocess_msg(msg)))

-         self.db.ltrim(self.key, 0, self.max_items)

+         log.debug("Adding message %s to %s", msg["msg_id"], self.db.name)

+         self.db.insert_one({"msg": dumps(self._preprocess_msg(msg))})

  

      def _preprocess_msg(self, msg):

          # Default implementation: no-op.
@@ -182,29 +169,35 @@ 

      def length(self):

          if self.db is None:

              self.connect()

-         return self.db.llen(self.key)

+         return self.db.count()

  

      __len__ = length

  

      def connect(self):

-         if self.connection_pool is None:

-             self.connection_pool = redis.ConnectionPool(**self.db_config)

-         self.db = redis.Redis(connection_pool=self.connection_pool)

          try:

-             self.db.info()

-         except redis.exceptions.ConnectionError:

-             log.warning("Could not connect to Redis")

+             client = pymongo.MongoClient(self.db_config["url"])

+             database = client[self.db_config["db"]]

+             collection_name = "|".join(["feed", self.msgtype, self.owner])

+             existing = database.collection_names(

+                 include_system_collections=False)

+             if collection_name in existing:

+                 self.db = database[collection_name]

+             else:

+                 self.db = database.create_collection(

+                     collection_name,

+                     capped=True, size=5242880, max=self.max_items)

With MongoDB, you don't usually need to create collections explicitly - they just come to exist the moment you try to store an object in one.

+         except pymongo.errors.ConnectionFailure:

+             log.warning("Could not connect to MongoDB")

              self.db = None

              raise  # XXX: do better?

  

      def close(self):

          try:

-             self.db.close()

-         except redis.exceptions.ConnectionError:

-             log.warning("Could not disconnect from Redis")

+             self.db.database.client.close()

+         except pymongo.errors.ConnectionFailure:

+             log.warning("Could not disconnect from MongoDB")

              raise  # XXX: do better?

          self.db = None

-         self.connection_pool = None

  

  

  class Notifications(Feed):

@@ -6,10 +6,7 @@ 

  import Modal from '../Modal';

  import { getAvailableWidgets } from '../../core/actions/widgets';

  import { addWidget } from '../../core/actions/widget';

- import {

-   makeLoadable,

-   makeLoadingComponent

- } from '../../core/utils';

+ import { makeLoadable } from '../../core/utils';

  

  

  class AddWidgetDialog extends React.Component {
@@ -122,10 +119,8 @@ 

        if (this.state.selectedWidget.isReact) {

          const AsyncComponent = makeLoadable(

            () => import(`../../widgets/${this.state.selectedWidget.name}/Config`),

-           makeLoadingComponent(

-             "Loading widget configuration...",

-             "Sorry, there was a problem loading the widget configuration."

-           )

+           "Loading widget configuration...",

+           "Sorry, there was a problem loading the widget configuration."

          );

          contents = (

            <AsyncComponent

@@ -1,17 +1,9 @@ 

  import React from 'react';

  import PropTypes from 'prop-types';

  import { connect } from 'react-redux';

- import {

-   makeLoadable,

-   makeLoadingComponent

- } from '../core/utils';

+ import { makeLoadable } from '../core/utils';

  import SimpleWidget from '../components/SimpleWidget';

  

- /*

- import HalpWidget from '../widgets/halp/Widget';

- import FeedWidget from '../widgets/feed/Widget';

- */

- 

  

  class Widget extends React.PureComponent {

  
@@ -30,17 +22,9 @@ 

      if (!this.props.editMode && this.props.widget.isReact) {

        const AsyncComponent = makeLoadable(

          () => import(`../widgets/${this.props.widget.component}/Widget`),

-         makeLoadingComponent(

-           "Loading widget...",

-           "Sorry, there was a problem loading the widget."

-         )

+         "Loading widget...",

+         "Sorry, there was a problem loading the widget."

        );

-       /*

-       console.log(this.props.widget);

-       let AsyncComponent = null;

-       if (this.props.widget.component === "halp") { AsyncComponent = HalpWidget; }

-       if (this.props.widget.component === "feed") { AsyncComponent = FeedWidget; }

-       */

        widgetComponent = (

          <AsyncComponent {...this.props} />

        );

@@ -6,10 +6,7 @@ 

    deleteWidget,

    closeConfigDialog,

    } from "../core/actions/widget";

- import {

-   makeLoadable,

-   makeLoadingComponent

- } from '../core/utils';

+ import { makeLoadable } from '../core/utils';

  import SimpleWidgetConfig from '../components/SimpleWidgetConfig';

  import Modal from '../components/Modal';

  
@@ -72,10 +69,8 @@ 

      if (this.props.widget.isReact) {

        const AsyncComponent = makeLoadable(

          () => import(`../widgets/${widgetName}/Config`),

-         makeLoadingComponent(

-           "Loading widget configuration...",

-           "Sorry, there was a problem loading the widget configuration."

-         )

+         "Loading widget configuration...",

+         "Sorry, there was a problem loading the widget configuration."

        );

        contents = (

          <AsyncComponent

@@ -1,26 +1,19 @@ 

  import React from 'react';

  import {

    makeLoadable,

-   makeLoadingComponent

  } from '../core/utils';

  

  

- const PageLoadingComponent = makeLoadingComponent(

-   "Loading...",

-   "Sorry, there was a problem loading the page."

- );

- 

- 

  let Hub = makeLoadable(

    () => import(/* webpackChunkName: "page-hub" */ '../components/HubPage'),

-   PageLoadingComponent

+   "Loading...",

+   "Sorry, there was a problem loading the page."

  );

  let Streams = makeLoadable(

    () => import(/* webpackChunkName: "page-streams" */ '../components/StreamsPage'),

-   PageLoadingComponent

+   "Loading...",

+   "Sorry, there was a problem loading the page."

  );

  

- //Hub = require('../components/HubPage').default;

- //Streams = require('../components/StreamsPage').default;

  

  export {Hub, Streams};

@@ -1,6 +1,6 @@ 

  import React from 'react';

  import fetch from 'isomorphic-fetch';

- import Loadable from 'react-loadable';

+ import universal from 'react-universal-component'

  

  

  export function backendCall(url, fetchConfig, json=false) {
@@ -38,51 +38,44 @@ 

  }

  

  

- export function makeLoadable(loader, loading) {

-   const renderDefault = (loaded, props) => {

-     const Component = loaded.default;

-     return <Component {...props} />;

-   }

-   return Loadable({

-     loader,

-     loading,

-     render: renderDefault,

-     delay: 200,

+ export function makeLoadable(loader, loadingText, errorText) {

+   //return makeSyncLoadable(loader);

+   const LoadingComponent = () => (

+     <div className="text-muted">

+       {loadingText}

+     </div>

+   );

+   const ErrorComponent = ({ error }) => (

+     <div>

+       {error ? error.message : errorText}

+     </div>

+   );

+   const AsyncComponent = universal(loader, {

+     loading: LoadingComponent,

+     error: ErrorComponent

    });

+   return AsyncComponent;

  }

  

- export function makeLoadingComponent(loadingText, errorText) {

-   class LoadingComponent extends React.Component {

+ function makeSyncLoadable(loader) {

+   class SyncLoadable extends React.Component {

+     constructor(props) {

+       super(props);

+       this.state = {component: null}

+     }

+     componentDidMount() {

+       loader().then((module) => {

+         this.setState({component: module.default});

+       }).catch((error) => {

+         console.error(error);

+       });

+     }

      render() {

-       // Handle the loading state

-       if (this.props.isLoading) {

-         if (this.props.timedOut) {

-           // In case we've timed out loading our other component.

-           return <div className="text-warning">Loader timed out!</div>;

-         } else if (this.props.pastDelay) {

-           // Display a loading screen after a set delay.

-           return (

-             <div className="text-muted">

-               {loadingText}

-             </div>

-           );

-         } else {

-           // Don't flash "Loading..." when we don't need to.

-           return null;

-         }

-       }

-       // Handle the error state

-       else if (this.props.error) {

-         return (

-           <div>

-             {errorText}

-           </div>

-         );

-       }

-       else {

+       if (!this.state.component) {

          return null;

        }

+       return <this.state.component {...this.props} />;

      }

    }

-   return LoadingComponent;

+   return SyncLoadable;

  }

@@ -8,7 +8,7 @@ 

  import WidgetChrome from '../../components/WidgetChrome';

  

  

- export default class Widget extends React.PureComponent {

+ export default class FeedWidget extends React.PureComponent {

  

    propTypes: {

      widget: PropTypes.object.isRequired,

@@ -37,7 +37,7 @@ 

  });

  

  

- export default class Widget extends React.Component {

+ export default class HalpWidget extends React.Component {

  

    propTypes: {

      widget: PropTypes.object.isRequired,

@@ -15,10 +15,10 @@ 

      "react-dom": "^15.6.1",

      "react-intl": "^2.3.0",

      "react-linkify": "^0.2.1",

-     "react-loadable": "^4.0.4",

      "react-redux": "^5.0.6",

      "react-sortable-hoc": "^0.6.7",

      "react-timeago": "^3.4.3",

+     "react-universal-component": "^2.5.5",

      "reconnecting-eventsource": "^1.0.1",

      "redux": "^3.7.2",

      "redux-thunk": "^2.2.0"

file modified
+33 -17
@@ -2,7 +2,8 @@ 

  

  import json

  

- from mock import Mock, patch

+ import pymongo

+ from mock import MagicMock, Mock, patch

  

  from hubs.app import app

  from hubs.models import User, Hub, Association
@@ -24,14 +25,33 @@ 

      def tearDown(self):

          super(FeedTest, self).tearDown()

  

+     @patch("hubs.feed.pymongo")

+     def test_connect(self, pymongo_mock):

+         client = MagicMock()

+         pymongo_mock.MongoClient.return_value = client

+         for feed_class, msgtype in self.feed_classes:

+             database = Mock()

+             database.collection_names.return_value = []

+             database.create_collection.return_value = db = object()

+             client.__getitem__.return_value = database

+             db_name = "feed|%s|testuser" % msgtype

+             feed = feed_class("testuser")

+             feed.connect()

+             pymongo_mock.MongoClient.assert_called_with(None)

+             client.__getitem__.assert_called_with("hubs")

+             database.create_collection.assert_called_with(

+                 db_name, capped=True, size=5242880, max=100

+                 )

+             self.assertIs(feed.db, db)

+ 

      def test_get(self):

          for feed_class, msgtype in self.feed_classes:

              feed = feed_class("testuser")

              feed.db = Mock()

-             feed.db.lrange.return_value = []

+             feed.db.find.return_value = []

              feed.get()

-             feed.db.lrange.assert_called_once_with(

-                 "feed|{}|testuser".format(msgtype), 0, -1)

+             feed.db.find.assert_called_once_with(

+                 sort=[('$natural', pymongo.DESCENDING)])

  

      def test_add(self):

          msg = {
@@ -43,34 +63,30 @@ 

          for feed_class, msgtype in self.feed_classes:

              feed = feed_class("testuser")

              feed.db = Mock()

-             key = "feed|{}|testuser".format(msgtype)

              feed.add(msg)

-             feed.db.lpush.assert_called_once()

-             lpush_call_args = feed.db.lpush.call_args_list[0][0]

-             self.assertEqual(lpush_call_args[0], key)

+             feed.db.insert_one.assert_called_once()

+             insert_call_args = feed.db.insert_one.call_args_list[0][0]

+             inserted_msg = json.loads(insert_call_args[0]["msg"])

              if feed_class == Notifications:

-                 self.assertIn("msg_ids", json.loads(lpush_call_args[1]))

+                 self.assertIn("msg_ids", inserted_msg)

              elif feed_class == Activity:

-                 self.assertEqual(json.loads(lpush_call_args[1]), msg)

-             feed.db.ltrim.assert_called_once_with(key, 0, 100)

+                 self.assertEqual(inserted_msg, msg)

  

      def test_length(self):

          for feed_class, msgtype in self.feed_classes:

              feed = feed_class("testuser")

              feed.db = Mock()

-             feed.db.llen.return_value = 42

+             feed.db.count.return_value = 42

              self.assertEqual(feed.length(), 42)

-             feed.db.llen.assert_called_once_with(

-                 "feed|{}|testuser".format(msgtype))

+             feed.db.count.assert_called_once_with()

  

      def test_close(self):

          for feed_class, msgtype in self.feed_classes:

              feed = feed_class("testuser")

-             redis_mock = feed.db = Mock()

+             db_mock = feed.db = Mock()

              feed.close()

-             redis_mock.close.assert_called_once()

+             db_mock.database.client.close.assert_called_once()

              self.assertIsNone(feed.db)

-             self.assertIsNone(feed.connection_pool)

  

      @patch("hubs.feed.Activity")

      def test_on_new_message(self, mock_activity):

file modified
+1
@@ -15,6 +15,7 @@ 

  html5lib==0.9999999

  munch

  psycopg2

+ pymongo

  pytz

  requests

  setuptools

The feed items are currently stored in Redis. However, Redis is an in-memory database, so the feed data may outgrow the available RAM as the number of Hubs users increases.
This commit switches the feed storage backend to MongoDB.

With MongoDB, you don't usually need to create collections explicitly - they just come to exist the moment you try to store an object in one.

This suggestion is likely overkill, but I might as well offer it as food for thought: you might consider using mongoengine, which is an "OM" (like an ORM, but Mongo isn't relational… ☺).

On the other hand, it looks like you are doing something extremely simple here so probably not worth it.

Looks like a reasonable replacement for Redis to me, but doesn't Redis also have the ability to persist its data to disk? I.e., couldn't you accomplish the goal by just configuring Redis to persist?

rebased onto 44b3b9f3eb7a9ddd3698133762e3fc3ea90af369

6 years ago

Thanks for reviewing, sorry for not seeing it earlier!

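About the collection creation: I create it explicitly because I want it to be a "capped" collection, i.e. a bounded queue (here limited to max_items documents and 5 MiB). This way MongoDB enforces the queue limits by itself, which replaces the manual ltrim call that was needed with Redis.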

Yeah, I thought about using a model library but I'm just inserting and pulling all items, so I'll just use the basic driver. Thanks for the pointer though :-)

About Redis, you can configure it to persist on disk, but the data will still be loaded entirely in RAM, and that's what I want to avoid here (using up all the RAM).

Hello @abompard!

In my experience, MongoDB does use all the RAM on your box too. It uses memory-mapped files, which tends to just eat up RAM until there is no more. It's usually best to give it a system-limited memory cap to prevent this, either by running it in a cgroup or by giving it its own VM. Either way, you don't want to run it on the same box as your application server without taking some action to ensure it doesn't consume all the resources.

Ah alright, thanks for pointing it out. I've read up a bit on the subject and ended up with this conclusion:

  • Redis can't store more data than fits in RAM. The only way to limit its memory use is to auto-evict data, and we don't want that.
  • MongoDB's RAM usage historically couldn't be limited (except with cgroups), but it can store more data than fits in RAM. However, since version 3.2, it seems to be possible to limit the RAM it uses (presumably via the WiredTiger cache size setting), and there's even a reasonable default. We will probably want to set the limit lower than 60% of the RAM, but it's not horrible.

So apparently moving to MongoDB for feed data would be useful. What do you think?

@abompard Whoa, I didn't know about that new MongoDB setting. I would recommend some testing to ensure that it works as expected, but I do think you are right that it would be useful either way. If we need to, we can fall back to cgroups. You are right that MongoDB can at least handle more data than fits in RAM, so I think that's enough of a reason right there.

Cool! Do you think the PR is ready to merge?

rebased onto dc3654f

6 years ago

Pull-Request has been merged by abompard

6 years ago