BirdWatch is an open-source reactive web application that consumes the Twitter Streaming API for a selection of terms. It processes the matching tweets in a server-side application based on Play Framework. The tweets are then stored in ElasticSearch, where they are available for complex full-text searches. On the client side, a Single Page Application based on AngularJS allows the user to perform a live search for tweets containing certain keywords and to run some analysis on them, such as word count statistics, activity over time, and sorting results by follower and retweet counts.
Searches are conducted in real time thanks to so-called percolation queries within ElasticSearch. Besides being used to retrieve previous matches, each search is also registered with ElasticSearch. New tweets are then matched against the registered queries and delivered to the client via Server Sent Events (SSE). This will be explained in more detail in the ElasticSearch section towards the end of this article. The client-side visualizations based on D3.js are then updated with those new search results.
Here is an architectural overview with a focus on the Twitter client:
This server-side component establishes the communication with Twitter, and a supervisor actor monitors the connection. If the connection is disrupted, the supervisor notices the inactivity and establishes a new one.
So what is an Actor?
Actors are very lightweight concurrent entities. They process messages asynchronously using an event-driven receive loop. Pattern matching against messages is a convenient way to express an actor's behavior. They raise the abstraction level and make it much easier to write, test, understand and maintain concurrent and/or distributed systems. You focus on the workflow, on how messages flow through the system, instead of on low-level primitives like threads, locks and socket IO.
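BirdWatch's actors are written in Scala with Akka, but the core idea is language-neutral: an actor owns a private mailbox, processes one message at a time, and matches on each message to decide what to do. Here is a minimal, framework-free JavaScript sketch of that idea; all names are invented for illustration, and a real actor runtime would drain the mailbox asynchronously rather than on demand.

```javascript
// Minimal sketch of the actor idea (illustrative only, not Akka):
// an actor owns a private mailbox and processes one message at a time.
function makeActor(initialState, behavior) {
  const mailbox = [];
  let state = initialState;
  return {
    send(msg) { mailbox.push(msg); },
    // Real actor runtimes schedule this on an event loop or thread
    // pool; we drain synchronously to keep the sketch short.
    drain() {
      while (mailbox.length > 0) {
        state = behavior(state, mailbox.shift());
      }
      return state;
    }
  };
}

// A counter actor whose behavior "pattern matches" on the message type.
const counter = makeActor({ count: 0 }, (state, msg) => {
  switch (msg.type) {
    case 'increment': return { count: state.count + msg.by };
    case 'reset':     return { count: 0 };
    default:          return state; // unknown messages are ignored
  }
});

counter.send({ type: 'increment', by: 2 });
counter.send({ type: 'increment', by: 3 });
```

Note how nothing outside the actor can touch its state directly; all interaction goes through messages, which is what makes the model attractive for concurrency.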
The underlying Actor Model as a model of concurrent computation was first described in a 1973 paper by Carl Hewitt, Peter Bishop and Richard Steiger. I can recommend this video in which Carl Hewitt explains the Actor Model 39 years after its inception. Be warned of Erik Meijer's vibrant shirt; you may want to dial down the color saturation of your screen ;-) Other than that, I found the video really helpful for getting a better understanding of the subject.
Let's have a look at the source code. The Twitter client establishes a connection to the Twitter streaming endpoint using the Play WS API. This connection stays open indefinitely, and the remote side delivers new tweets in byte array chunks whenever a match for the specified set of topics has been tweeted. The set of topics is passed to the streaming URL as a query string parameter (see the start() function).
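The shape of that URL is easy to show: the v1.1 Streaming API filter endpoint takes the tracked terms as a comma-separated track parameter. A small sketch (the topic list here is made up):

```javascript
// Sketch of building the Twitter Streaming API filter URL.
// The tracked topics end up in the comma-separated "track" parameter.
function streamingUrl(topics) {
  const base = 'https://stream.twitter.com/1.1/statuses/filter.json';
  return base + '?track=' + encodeURIComponent(topics.join(','));
}

const url = streamingUrl(['play framework', 'angularjs', 'elasticsearch']);
// spaces become %20 and commas %2C in the encoded query string
```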
The supervisor monitors the connection through TweetReceived messages, one for each incoming tweet, which let it track when the last tweet arrived. CheckStatus messages are sent to the supervisor at regular intervals and prompt it to check how long ago that was. If the time span is too long, the supervisor treats the connection as dead and establishes a new one.
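The original supervisor is an Akka actor in Scala; purely to illustrate the timing logic it implements, here is a small JavaScript sketch. The threshold value and names are made up for illustration.

```javascript
// Sketch of the supervisor's liveness check: if no tweet has arrived
// within the threshold, the connection is considered dead.
const tweetInactivityThreshold = 60 * 1000; // 60 seconds, made-up value

function connectionIsDead(lastTweetReceivedMillis, nowMillis) {
  return nowMillis - lastTweetReceivedMillis > tweetInactivityThreshold;
}

// A CheckStatus tick would then do something like:
// if (connectionIsDead(lastTweetReceived, Date.now())) reconnect();
```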
The WS client receives tweets as byte array chunks and passes them to the TweetIteratee function.
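The original function is Scala, but what consuming such a chunk involves can be sketched in JavaScript: decode the bytes and parse the tweet JSON they contain. Field names follow the Twitter tweet format; the function name is invented.

```javascript
// Sketch of processing one byte-array chunk from the stream:
// decode it to a UTF-8 string and parse the tweet JSON it contains.
function parseTweetChunk(chunkBytes) {
  const json = Buffer.from(chunkBytes).toString('utf8').trim();
  if (json.length === 0) return undefined; // keep-alive newline
  return JSON.parse(json);
}

const chunk = Buffer.from(JSON.stringify({
  text: 'hello #birdwatch',
  user: { screen_name: 'someone', followers_count: 42 }
}));
const tweet = parseTweetChunk(chunk);
```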
The TweetIteratee function once again uses an asynchronous WS client, this time to insert the JSON representation of the tweet into the ElasticSearch index. It then calls the matchAndPush function with the tweet as a JsValue. It also checks whether Twitter thinks you have been calling the Streaming API too often, which happened to me during development, most likely due to some mistakes on my part. In that case the chunk coming in through the open connection to Twitter contains an "Easy there, Turbo…" string. The best way I found to deal with that was to implement a back-off strategy, initiated by sending a BackOff message to the supervisor actor. The receive method of the actor pattern-matches on incoming messages; on receiving the BackOff case object it sets the lastBackOff timestamp, which keeps it from reconnecting until the backOffInterval has passed (see the CheckStatus handling described above).
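Again paraphrasing the Scala actor logic in JavaScript, the back-off handling boils down to two small checks. The interval value is made up; the "Easy there, Turbo" marker is the string Twitter actually sends in its rate-limit notice.

```javascript
// Sketch of the back-off handling: when Twitter sends its rate-limit
// notice, remember the time and refuse to reconnect until the
// back-off interval has passed. The interval value is made up.
const backOffInterval = 15 * 60 * 1000; // 15 minutes

function isRateLimitNotice(chunkText) {
  return chunkText.indexOf('Easy there, Turbo') !== -1;
}

function mayReconnect(lastBackOffMillis, nowMillis) {
  return nowMillis - lastBackOffMillis > backOffInterval;
}
```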
MatchAndPush then matches the tweet against the pre-registered queries by POSTing it to the percolation query endpoint of ElasticSearch, which returns a list of the matching query IDs. The query IDs are hashes of the query string itself; that way each query is only inserted once instead of once per client. The tweet is then combined with the query IDs of the matching searches and pushed into the tweets channel of Concurrent.broadcast. The controller action responsible for streaming tweets to web clients then attaches an Enumeratee / Iteratee chain which determines whether the tweet should be relayed to a particular client, depending on the hash of the search string.
Now let’s have a look at the controller of the application which serves:
- the main page
- previous tweets that match a search
- a Server Sent Events (SSE) stream with future matches for the query.
The endpoints for these actions are defined in the Play routes file.
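For the three actions described above, such a routes file would look roughly like this. The /tweets/search path is mentioned later in the article; the other path and the controller name are illustrative guesses.

```
GET   /                  controllers.BirdWatch.index
POST  /tweets/search     controllers.BirdWatch.search
GET   /tweets/feed       controllers.BirdWatch.tweetFeed
```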
Here is an overview of the controller actions:
Let's start with the code for the index action, which serves the main page. The HTML comes from a rendered view that is almost entirely plain HTML, except that some configuration parameters for Google Analytics are inserted here. This has the advantage that instance-specific configuration can be kept in the application.conf file.
The search action serves search results from ElasticSearch. The search itself is POSTed in JSON format and passed straight through to ElasticSearch: the WS client makes a request to a local ElasticSearch instance, and the future response to this request is mapped into the response of the search action. The search controller action is really only a proxy for development purposes. My deployed instance of the application has nginx running in front of it, which for this route talks to ElasticSearch directly instead of keeping the JVM's garbage collector busy with unprocessed data. We will have a look at the nginx configuration further down in this article.
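The essence of that proxy is just forwarding the request body to ElasticSearch. A JavaScript sketch of the mapping; the host and port are the ElasticSearch defaults, while the index and type names are assumptions for illustration:

```javascript
// Sketch of what the search proxy does: take the incoming JSON query
// and turn it into a request against the local ElasticSearch instance.
// Index and type names are made-up examples.
function toElasticSearchRequest(searchBody) {
  return {
    method: 'POST',
    url: 'http://localhost:9200/birdwatch/tweets/_search',
    body: JSON.stringify(searchBody)
  };
}

const req = toElasticSearchRequest({
  query: { query_string: { query: 'play framework' } }
});
```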
Now we'll get to the most complicated part: serving the live stream for a search in the tweetFeed action. This controller makes use of the Iteratee library from Play Framework. I wrote an article about Iteratees a while back; I haven't read it in a while and it may need some revision, but you might still find it useful. It's rather long, but then this article isn't exactly what you would call short either.
The client establishes a connection to the streaming endpoint served by the tweetFeed action, which then delivers the results, not all at once but in chunks whenever new data is available for this request. This data originates from the Enumerator of the Concurrent.broadcast object (provided by Play Framework) which we have seen above. Iteratees can attach to this Enumerator. In essence, Iteratees are functions that define what to do with each new piece of information, and Enumeratees are transformer functions that can be placed between the Enumerator as the source and the Iteratee as the final sink of that information. In the streaming action, the Ok.feed itself represents the Iteratee, doing nothing more than delivering each chunk to the connected client. Iteratees can also hold an accumulator for the current state of an ongoing computation, in which case an individual Iteratee represents one step of that computation, but that feature is not used here.
Enumeratees are then placed between the source and the sink, forming a processing chain. This is the most interesting part of the code.
Inside the action we establish a connection to ElasticSearch by posting the query as a percolation query (see ElasticSearch section below). The ID of the query is determined by hashing the entire query using SHA-256. That way repeated queries always have the same ID within ElasticSearch. Once that request is complete, we respond to the client with a feed that contains the following processing chain:
- Tweets with matched query IDs originate from the TwitterClient.jsonTweetsOut Enumerator.
- The matchesFilter Enumeratee checks if the matches set contains the query hash. If not, no further actions will take place.
- A buffer ensures that the application is not held up if the sink is too slow, for example, when a client connection suffers from network congestion. Tweets will be dropped when the buffer is full, which won’t be much of an issue because if your connection is so slow, you probably don’t want to use this application in the first place.
- Matches are converted to JSON.
- The connection uptime is monitored. In this Enumeratee the duration of the connection will be logged.
- The data is converted to comply with the Server Sent Events (SSE) specifications.
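The last step in that chain, the SSE formatting, is simple enough to sketch concretely. Per the Server Sent Events wire format, each message is a "data:" line terminated by a blank line:

```javascript
// Sketch of formatting a JSON payload as a Server Sent Event.
// Per the SSE specification, each message is a "data:" line
// terminated by a blank line.
function toServerSentEvent(jsonString) {
  return 'data: ' + jsonString + '\n\n';
}

const event = toServerSentEvent('{"text":"hello"}');
```

On the client side, the browser's EventSource API reassembles these frames into message events.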
The Enumeratees of the streaming endpoint are adapters between the Enumerator from the TwitterClient, where the tweets originate, and the chunked response we pass back to the client. They can transform elements passing through the chain from one type to another, filter them based on a predicate function, or buffer them.
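The real chain is built from Play Enumeratees in Scala; as a rough functional analogue, the filter and transform steps can be sketched over a plain array in JavaScript (all names here are invented):

```javascript
// Rough analogue of the Enumeratee chain: keep only tweets whose
// match set contains the query hash, then serialize each one.
function buildChain(queryHash) {
  return tweetsWithMatches =>
    tweetsWithMatches
      .filter(t => t.matches.indexOf(queryHash) !== -1) // matchesFilter step
      .map(t => JSON.stringify(t.tweet));               // toJson step
}

const chain = buildChain('abc123');
const out = chain([
  { matches: ['abc123'], tweet: { text: 'first' } },
  { matches: ['zzz999'], tweet: { text: 'second' } }
]);
```

The important difference is that Enumeratees do this incrementally over a potentially infinite stream, whereas this sketch works on a finite array.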
AngularJS is a modern approach to Single Page Applications. It is said to teach the browser new tricks. It also brings the fun back to developing Single Page Applications. Seriously. So what is so special about it? Its approach is a declarative one: we declare what UI elements are supposed to look like depending on the application state, but we do not have to concern ourselves with how exactly this is achieved. This might not sound like much at first, but it really does make all the difference. No more direct DOM manipulation with jQuery or the like. Instead we create new elements as so-called directives that know how to lay themselves out on the page. These elements are then used in the page markup as if they had existed in HTML all along. We will look at this in more detail with the TweetCard directive, a simple custom directive.
Here’s the overall architecture of the AngularJS application:
There are singleton services in the application that are only instantiated once for the lifecycle of the application. First there is the tweets service, which takes care of the communication with the server side. It pre-loads existing tweets and also establishes a Server Sent Events (SSE) connection for future search results. This service allows the registration of a callback function which is called with search results, whether they come from previous tweets or from the SSE connection.
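The service itself is AngularJS code; stripped of the framework, the callback-registration pattern it implements can be sketched like this (all names invented):

```javascript
// Framework-free sketch of the tweets service: one callback is
// registered and then invoked both for previously loaded tweets and
// for tweets arriving over the SSE connection.
function makeTweetsService() {
  let onTweets = () => {};
  return {
    registerCallback(cb) { onTweets = cb; },
    // called with an array of previous matches after the initial search
    deliverPrevious(tweets) { onTweets(tweets); },
    // called with a single tweet for each SSE message
    deliverLive(tweet) { onTweets([tweet]); }
  };
}

const service = makeTweetsService();
const received = [];
service.registerCallback(ts => received.push(...ts));
service.deliverPrevious([{ text: 'old tweet' }]);
service.deliverLive({ text: 'new tweet' });
```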
The only other component that knows anything about this service is the controller, which provides the callback function that specifies what needs to happen with each new tweet or array of tweets. This allows for a proper decoupling of the services.
The controller provides the $scope for the associated view, which is written as HTML with some custom AngularJS markup. The $scope variables are fairly straightforward: AngularJS two-way binds items in the view to the $scope, so that when the value of either changes, the other updates as well. An example of this two-way data binding is the search text field. The binding to $scope.searchText is defined in the view.
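In the view, that binding comes down to a single ng-model attribute; the markup would look something like this (attributes other than data-ng-model are guesses):

```html
<input type="text" data-ng-model="searchText" placeholder="enter search terms">
```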
Now with this binding in place, modifying the content of the search field mutates $scope.searchText and vice versa. Changing the $scope.searchText programmatically would update the content of the search field as well. There is no need to concern ourselves with complicated ways of manipulating the DOM directly. This is probably the main reason why code in AngularJS tends to be much shorter than in more traditional approaches.
We briefly talked about directives above. Let's have a look at one simple directive to get a better understanding: the TweetCard directive. A directive can either define an entirely new element or apply to a class; in this case we are using the class approach. Any element on the page that has the class tweetCard will be rendered by AngularJS according to the code in the directive. In this particular case the code is very simple.
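The registration code itself is not reproduced here, but the definition object such a class-restricted AngularJS directive returns can be sketched; the template path is made up.

```javascript
// Sketch of a class-restricted AngularJS directive definition.
// restrict: 'C' means the directive applies to elements carrying the
// matching CSS class; '=' sets up two-way binding for the attribute.
function tweetCardDirective() {
  return {
    restrict: 'C',                      // match class="tweetCard"
    scope: { tweet: '=' },              // two-way bind data-tweet="..."
    templateUrl: '/assets/templates/tweetCard.html' // made-up path
  };
}

const definition = tweetCardDirective();
```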
All that happens here is that a $scope variable named tweet is assigned, which then becomes available for two-way data binding inside the template code.
Now whenever the underlying data representation in the model changes, the rendering of the tweetCard changes as well, thanks to two-way data binding. Because the more complicated markup of the tweetCard is encapsulated in the template, using the directive from the view becomes simple and concise.
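Using the directive in the view then comes down to a single element, roughly like this (the exact ng-repeat expression is a guess based on the description):

```html
<div class="tweetCard" data-ng-repeat="tweet in cf.tweetPage()" data-tweet="tweet"></div>
```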
The div of class tweetCard is declared with data-ng-repeat, which means that the element is repeated for each element in the result of the cf.tweetPage function. For each individual item (tweet in cf.tweetPage), data-tweet is assigned the item. It could also have been data-ng-repeat="item in …" together with data-tweet="item"; the two names just have to correspond.
The $scope of an individual TweetCard element then contains the previously assigned tweet object, which becomes available to the template code for two-way data binding. The two-way data binding can be seen in action when sorting the tweets by retweet count: for popular tweets that get retweeted a lot, we can grab some popcorn and watch the visual representation of the tweet change in the browser as the data model changes.
Visualizations using D3.js
What we see above on the left side is that rects (rectangles) get rendered depending on the data that is provided to the D3 code on the right side. This is why the library is said to be data-driven: the data drives what gets rendered on the page.
Data Analysis using crossfilter.js
A simple example is the followersDim dimension. The function provided does nothing but return the number of followers of the author of the Tweet. The dimension then provides access to the data set sorted by the followers count.
Dimensions can also be grouped, as can be seen with the different timing dimensions. In order to get all tweets for a particular time span, say the hour between 4pm and 5pm of a particular day, the creation time of each tweet is rounded down to the nearest hour and the dimension is then grouped by these hours. This powers the 'Activity by Time Unit' chart, in which the number of tweets for the current search is broken down into time units of varying length, depending on the total time span.
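The crossfilter code is not reproduced here, but the rounding-and-grouping idea is easy to sketch in plain JavaScript. Crossfilter would perform the grouping itself on a dimension; this sketch just shows the bucketing step on bare timestamps.

```javascript
// Sketch of the time-unit grouping: round each tweet's creation time
// down to the nearest hour and count tweets per hour bucket.
const hourMillis = 60 * 60 * 1000;

function roundToHour(timestampMillis) {
  return Math.floor(timestampMillis / hourMillis) * hourMillis;
}

function tweetsPerHour(timestamps) {
  const counts = {};
  for (const t of timestamps) {
    const bucket = roundToHour(t);
    counts[bucket] = (counts[bucket] || 0) + 1;
  }
  return counts;
}

// two timestamps within the first hour, one in the second hour
const counts = tweetsPerHour([0, 30 * 60 * 1000, 90 * 60 * 1000]);
```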
ElasticSearch is a distributed open-source search engine. The more obvious feature is that it provides full-text search over our entire dataset, which at the time of writing consists of about ten million tweets.
The less obvious but very useful feature is the so-called percolation query. Percolation queries are a kind of reverse search: they allow the registration of an observing real-time search in the percolation index. Each new tweet is then presented to the percolation endpoint of ElasticSearch to determine which of the registered searches match the new item. This allows us to notify the web clients of each new match for the current search (also see the controller description in the server-side section above). The IDs of the queries could be generated randomly; I have chosen a different approach here and use SHA-256 hashes of the search text instead. That way each unique query (e.g. "shutdown beer") only ever gets inserted (and matched against) once.
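Mechanically, registering and matching percolation queries happens over HTTP. In the ElasticSearch versions of that era it looked roughly like this; the index, type and endpoint details vary between ElasticSearch versions, so treat this purely as an illustration:

```
# register a query under its SHA-256 hash as the ID
PUT /_percolator/birdwatch/<sha256-of-query>
{ "query": { "query_string": { "query": "shutdown beer" } } }

# present a new tweet; the response lists the IDs of matching queries
GET /birdwatch/tweets/_percolate
{ "doc": { "text": "shutdown beer anyone?" } }
```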
In a production environment it can make sense not to expose applications to the outside world directly, but instead to put a reverse proxy in between, which responds to all requests and routes them internally to the proper IP address and port.
This can be useful for the following reasons:
- Load Balancing. The reverse proxy can talk to multiple server backends and distribute the load among them.
- Static file serving. Some implementations can serve static files much faster with less overhead than a solution based on the JVM.
- SSL encryption. Not all application servers support SSL themselves, but all we need then is a proxy that does.
- Running multiple server backends on different ports.
- Serving multiple domain names.
I am using nginx as a reverse proxy for two instances of the application on the same server: one for tech-related topics at birdwatch.matthiasnehlsen.com and the other for things related to US politics at beltway.matthiasnehlsen.com. That works really well; I have found nginx to be rock solid and very fast.
The configuration contains one server block for each of the two domains.
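A sketch of what such a configuration looks like; the ports, upstream addresses and index names are illustrative, not the deployed values:

```nginx
# one server block per domain, each proxying to its own Play instance
server {
    listen 80;
    server_name birdwatch.matthiasnehlsen.com;

    location / {
        proxy_pass http://127.0.0.1:9000;   # first Play instance
    }

    location /tweets/search {
        # talk to ElasticSearch directly, bypassing the JVM
        proxy_pass http://127.0.0.1:9200/birdwatch/tweets/_search;
    }
}

server {
    listen 80;
    server_name beltway.matthiasnehlsen.com;

    location / {
        proxy_pass http://127.0.0.1:9001;   # second Play instance
    }
}
```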
Note that there are two server blocks in the configuration, one for each of the two separate domains, each mapping to one backend Play application. It would also be possible to have multiple backends for the same domain name and let nginx balance the load between them. There is only one shared ElasticSearch backend for the two domains, but /tweets/search maps to different indices depending on the domain name. In a development configuration this endpoint would be handled directly by the Play application, but in production I let nginx handle it transparently.
Okay, this concludes the explanation of my BirdWatch toy project. Hope you enjoyed this rather long article. Please let me know if there is any need for clarification.