Matthias Nehlsen

Software, Data and Stuff

ReactiveMongo 0.9 and Lossless Persistence

Initially I parsed the Tweets in the BirdWatch application into instances of a Tweet case class upon ingestion and then used that case class representation throughout, including for database persistence. I have since realized that this was not a good idea. A case class is very convenient for passing information around in the application. For persistence, however, I argue that we cannot afford to be opinionated about what to keep and what to throw away. I fixed this in the latest commits, together with the planned migration to ReactiveMongo version 0.9, and now store each observable fact coming from the Twitter Streaming API in its entirety.

Any data model will almost invariably be wrong in the future, as we cannot predict what we will want to analyze later. We can always change the data model at a later point and from then on store a different interpretation of the observable fact, but then we would not have complete historic information when we want to test our hypotheses on retrospective data. The solution is to store the Tweets in their complete JSON representation. MongoDB is a great choice for this as it allows indexing our data while leaving the JSON structure intact. We get the best of both worlds: with this lossless persistence we can always reconstruct the observable fact from the database while still being able to quickly search through a potentially large dataset.
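To make the argument concrete, here is a minimal sketch in plain Scala. It is not BirdWatch code: the raw document is modelled as a simple Map instead of JSON, and the names TweetV1, TweetV2 and the "lang" field are assumptions chosen purely for illustration.

```scala
object LosslessSketch {
  // The observable fact as received; every field is kept (a Map stands in for JSON here)
  type RawDoc = Map[String, String]

  // A first interpretation of the data
  final case class TweetV1(screenName: String, text: String)

  // A later interpretation that needs a field the first one ignored ("lang" is hypothetical)
  final case class TweetV2(screenName: String, text: String, lang: String)

  // Both views are derived from the raw document on read
  def toV1(doc: RawDoc): Option[TweetV1] =
    for {
      s <- doc.get("screen_name")
      t <- doc.get("text")
    } yield TweetV1(s, t)

  def toV2(doc: RawDoc): Option[TweetV2] =
    for {
      s <- doc.get("screen_name")
      t <- doc.get("text")
      l <- doc.get("lang")
    } yield TweetV2(s, t, l)
}
```

A store that only kept TweetV1 could never answer a question about "lang" for old data, whereas the raw documents can be re-read with any later model.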

I also wanted to upgrade ReactiveMongo in order to fix a previous problem with KillCursors. Version 0.9 entails some changes in the API, so it made sense to tackle the upgrade and the Tweet persistence layer together. Let’s go through some of the changes:

Mongo Connection in Version 0.8 (Mongo.scala)
/** Mongo connection object */
object Mongo {
  val connection = MongoConnection(List("localhost:27017"))
  val db = connection("BirdWatch")
}

Mongo Connection in Version 0.9 (Mongo.scala)
/** Mongo connection object */
object Mongo {
  val driver = new MongoDriver
  val connection = driver.connection(List("localhost:27017"))
  val db = connection("BirdWatch")
}

ReactiveMongo now uses an instance of the MongoDriver class and its connection method. The MongoConnection class still exists, but I couldn’t get it to work for some reason.

I have moved the Tweet collection and basic query and insert methods into a Tweet companion object, with the intention of turning this into a lightweight DAO (Data Access Object) for Tweets:

Tweet Companion Object (Tweet.scala)
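// Imports assumed for ReactiveMongo 0.9 with the Play-ReactiveMongo plugin
import play.api.libs.json._
import play.modules.reactivemongo.json.collection.JSONCollection
import reactivemongo.api.Cursor
import reactivemongo.core.commands.Count
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

/** Data Access Object for Tweets */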
object Tweet {
  def rawTweets: JSONCollection = Mongo.db.collection[JSONCollection]("rawTweets")
  def insertJson(json: JsValue) = rawTweets.insert[JsValue](json)

  /** get collection size from MongoDB (fast) */
  def count: Future[Int] = Mongo.db.command(Count("rawTweets"))

  /** Query latest tweets (newest first, at most n results, collected into a List) */
  def jsonLatestN(n: Int): Future[List[JsObject]] = {
    val cursor: Cursor[JsObject] = rawTweets
      .find(Json.obj("text" -> Json.obj("$exists" -> true)))
      .sort(Json.obj("_id" -> -1))
      .cursor[JsObject]
    cursor.toList(n)
  }
}

Storing the JSON from Twitter not only prevents us from throwing away data we might need in the future, it is also much simpler than having to deal with implicit BSONReader and BSONWriter objects as was previously the case:

BSON implicits with 0.8 (TweetImplicits.scala)
  implicit object TweetBSONWriter extends BSONWriter[Tweet] {
    def toBSON(t: Tweet) = {
      BSONDocument(
        "_id" -> t.id.getOrElse(BSONObjectID.generate),
        "tweet_id" -> BSONLong(t.tweet_id),
        "screen_name" -> BSONString(t.screen_name),
        "text" -> BSONString(t.text),
        "wordCount" -> BSONInteger(t.wordCount),
        "charCount" -> BSONInteger(t.charCount),
        "location" -> BSONString(t.location),
        "profile_image_url" -> BSONString(t.profile_image_url),
        "geo" -> BSONString(t.geo.getOrElse("")),
        "created_at" -> BSONDateTime(t.created_at.getMillis)
      )
    }
  }

  implicit object TweetBSONReader extends BSONReader[Tweet] {
    def fromBSON(document: BSONDocument): Tweet = {
      val doc = document.toTraversable
      Tweet(
        doc.getAs[BSONLong]("tweet_id").get.value,
        doc.getAs[BSONString]("screen_name").get.value,
        doc.getAs[BSONString]("text").get.value,
        doc.getAs[BSONInteger]("wordCount").get.value,
        doc.getAs[BSONInteger]("charCount").get.value,
        doc.getAs[BSONString]("location").get.value,
        doc.getAs[BSONString]("profile_image_url").get.value,
        None,
        new DateTime(doc.getAs[BSONDateTime]("created_at").get.value),
        doc.getAs[BSONObjectID]("_id")
      )
    }
  }

Instead we just parse a JSON string from Twitter and insert the parsed JsValue into the JSONCollection:

Inserting into database (TwitterClient.scala)
  val json = Json.parse(chunkString)

  /** persist any valid JSON from Twitter Streaming API */
  Tweet.insertJson(json)

This is really all there is to storing JSON into MongoDB now. I don’t have to worry about additional fields or other changes in the Twitter Streaming API. If it is valid JSON, it will find its way into the database. Major changes to the API might break parsing into Tweets, but they would not break database persistence.
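The separation between persistence and parsing can be sketched in plain Scala. This is an illustration under assumptions, not the BirdWatch implementation: RawStore is a hypothetical in-memory stand-in for the rawTweets collection, and documents are flattened into a Map rather than real JSON.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

object IngestSketch {
  type RawDoc = Map[String, String]

  final case class Tweet(screenName: String, text: String)

  /** In-memory stand-in for the rawTweets collection (hypothetical). */
  final class RawStore {
    private val docs = ArrayBuffer.empty[RawDoc]
    def insert(doc: RawDoc): Unit = { docs += doc; () }
    def size: Int = docs.size
  }

  // Parsing fails when an expected field is missing; the failure is captured in the Try
  def parseTweet(doc: RawDoc): Try[Tweet] =
    Try(Tweet(doc("screen_name"), doc("text")))

  // Persist first, then try to interpret: a parse failure never prevents storage
  def ingest(store: RawStore, doc: RawDoc): Option[Tweet] = {
    store.insert(doc)
    parseTweet(doc).toOption
  }
}
```

Whatever arrives is stored, and only the interpretation step can fail. A message without "screen_name" or "text" yields None from ingest but still ends up in the store.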

Error and status messages from Twitter also come as JSON, so they are stored as well:

Stored disconnect message, as shown in the MongoDB shell
{ "_id" : ObjectId("…"), "disconnect" :
  { "code" : 7, "stream_name" : "_MNehlsen-statuses237381",
    "reason" : "admin logout" } }

Querying is more concise than before, making use of Json.obj instead of BSONDocuments:

OLD: Query for latest Tweets (Twitter.scala)
def latestTweetQuery: Future[List[Tweet]] = {
  val query = QueryBuilder().query(BSONDocument("created_at" ->
    BSONDocument("$lte" -> BSONDateTime(DateTime.now.getMillis))))
    .sort("created_at" -> SortOrder.Descending)

  val cursor = Mongo.tweets.find(query)
  cursor.toList
}

NEW: Query for latest Tweets (Tweet.scala)
def jsonLatestN(n: Int): Future[List[JsObject]] = {
  val cursor: Cursor[JsObject] = rawTweets
    .find(Json.obj("text" -> Json.obj("$exists" -> true)))
    .sort(Json.obj("_id" -> -1))
    .cursor[JsObject]
  cursor.toList(n)
}

This looks much neater and is also close to the syntax in the MongoDB JavaScript shell:

JavaScript query in MongoDB shell
db.rawTweets.find( { "text" : { "$exists" : true} } )
  .sort( { "_id": -1 } )

Curly braces get replaced by Json.obj() and the colon gets replaced by “->”. Other than that, the syntax is very close. Note the “$exists” part. This limits the results to only Tweets (and potentially error and status messages that have a “text” field, but I have not encountered those).

Generating a List from the cursor as above works fine for small n, but for larger result sets (say hundreds of thousands of items) it would be a bad idea to build the list in memory first. Luckily, ReactiveMongo allows us to stream the results. That in itself is not new, but since version 0.9 we can limit the number of results, which makes this much more useful for a latestN scenario:

Enumerating cursor of Tweets (Tweet.scala)
/** Enumerate latest Tweets (descending order) into specified Iteratee.
 * @param n number of results to enumerate over
 **/
def enumJsonLatestN(n: Int): Enumerator[JsObject] = {
  val cursor: Cursor[JsObject] = rawTweets
    .find(Json.obj("text" -> Json.obj("$exists" -> true)))
    .sort(Json.obj("_id" -> -1))
    .cursor[JsObject]
  cursor.enumerate(n)
}

With this we create an Enumerator of JsObjects that streams the results into an Iteratee. Using it is simple once we understand what this pattern means; my previous article on Iteratees may help with that.

The Iteratee can then do whatever we need with the streamed results, in this case just a simple foreach:

Attaching Iteratee to Enumerator
val dbTweetIteratee = Iteratee.foreach[JsObject] {
  json => TweetReads.reads(json) match {
    case JsSuccess(t: Tweet, _) =>
      tweetChannel.push(WordCount.wordsChars(t)) // word and char count for each t
    case JsError(msg) => println(json)
  }
}
Tweet.enumJsonLatestN(500)(dbTweetIteratee)
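
For readers without a MongoDB instance at hand, the benefit of limiting a streamed result can be illustrated with a plain Scala Iterator. This is only an analogy to cursor.enumerate(n), not ReactiveMongo code; fakeCursor and consumeLatest are made-up names.

```scala
object StreamingSketch {
  // Hypothetical stand-in for a database cursor: yields documents lazily, newest first
  def fakeCursor(total: Int): Iterator[String] =
    Iterator.range(total, 0, -1).map(i => s"tweet-$i")

  // Handles at most n items one at a time; the full result is never collected into a List
  def consumeLatest(total: Int, n: Int)(handle: String => Unit): Int = {
    var seen = 0
    fakeCursor(total).take(n).foreach { doc =>
      handle(doc)
      seen += 1
    }
    seen
  }
}
```

Even with a million items behind the cursor, only the first n are ever produced, which is what makes a limit so useful for a latestN scenario.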

I currently do not enumerate the results into an Iteratee because the Tweets would appear in the wrong order in the UI. I cannot easily reverse the direction in which the Tweets are enumerated: to enumerate in ascending order I would need to know where to start (position [collectionsize - n]), and determining that would require an auto-incrementing counter in MongoDB. This is more a problem of the UI, though, and the next versions will certainly make use of this pattern.
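
The ordering issue comes down to simple arithmetic, sketched here with an in-memory Vector standing in for the collection (items in insertion order). The names startOffset, latestAscending and latestDescending are assumptions for illustration, and n is assumed to be non-negative.

```scala
object LatestNOrder {
  // Position at which an ascending scan must start to cover exactly the latest n items
  def startOffset(collectionSize: Int, n: Int): Int =
    math.max(0, collectionSize - n)

  // Latest n items, oldest first (the order the UI wants)
  def latestAscending[A](all: Vector[A], n: Int): Vector[A] =
    all.drop(startOffset(all.size, n))

  // Latest n items, newest first (the order a descending sort delivers)
  def latestDescending[A](all: Vector[A], n: Int): Vector[A] =
    all.reverseIterator.take(n).toVector
}
```

For small n the two views are interchangeable, since reversing the descending result in memory yields the ascending one. It is only when streaming that the start position has to be known up front.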

The only thing still missing was an easy way to get the size of a collection. In the shell we would write:

JavaScript query in MongoDB shell
db.rawTweets.find( { "text" : { "$exists" : true} } ).count()

It turns out that in ReactiveMongo we can use the Count command for this, which returns a Future[Int] with the result (see Tweet.scala above). This allows us to act on the collection size in a non-blocking way once it is returned:

Using Count Command (TwitterClient.scala)
  Tweet.count.map(c => println("Tweets: " + c))
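
The non-blocking part is a property of Scala Futures in general and can be tried without a database. In this minimal sketch, count is a hypothetical stand-in for Tweet.count that simply returns a fixed number.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object CountSketch {
  // Hypothetical stand-in for Tweet.count; a real call would ask MongoDB
  def count: Future[Int] = Future(1234)

  // map registers the transformation and returns a new Future right away;
  // the function runs once the count is available
  def report: Future[String] = count.map(c => "Tweets: " + c)
}
```

The caller never waits: report is itself a Future that can be mapped further or handed to whatever consumes the result.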

Great stuff, I really like ReactiveMongo. The documentation has also gotten a lot better in 0.9, compared to previous versions. Nonetheless, it takes some source code reading to find some of the good stuff. I’d be more than happy to help out here and contribute to the project documentation.

-Matthias
