Matthias Nehlsen

Software, Data, and Stuff

Building a System in #Clojure Part 4 - Inter-process communication with Redis

Last week, I drew a picture of how I wanted to break apart a monolithic application and instead run different parts of the application in separate processes / separate JVMs. The idea was to have a single client for the connection to the Twitter Streaming API and the persistence of the received Tweets in ElasticSearch, plus multiple machines to serve WebSocket connections to the client. For the communication between the processes, I picked Redis Pub/Sub because its model of communication appears to suit the requirements really well. As cute as the drawing may be, I prefer code (plus drawing), so I took the previous monolith apart over the weekend and put Redis in between for communication. It worked really well and here's how.

Okay, it wasn't a total surprise to see how well it worked. After all, I started using the Component library together with core.async for exactly this reason a few weeks ago. I wanted the freedom to only ever having to put stuff on conveyor belts and not having to think about how a thing got where it needs to go, or even where it needs to go at all.

Redesigned Architecture - InterOp

Redis Pub/Sub with Carmine

I chose Pub/Sub over a queue because I wanted to fan-out messages to multiple clients. Any connected processes are only supposed to be fed with data during their uptime, with no need to store anything for when they aren't connected. For interfacing with Redis from Clojure, I then chose Peter Taoussanis's carmine client and it turned out to be a great choice.

Let's look at some code. First of all, I am using a component that provides a send channel and a receive channel. It can be reused on either side of the Pub/Sub connection (or for bidirectional communication, of course):

(defrecord Interop-Channels []
  component/Lifecycle
  (start [component] (log/info "Starting Interop Channels Component")
         (assoc component :send (chan) :receive (chan)))
  (stop  [component] (log/info "Stop Interop Channels Component")
         (assoc component :send nil :receive nil)))

Interop Channels Component

This channels component can now be wired into other components. Here's the component on the publisher side:

(defrecord Interop [conf channels]
  component/Lifecycle
  (start [component] (log/info "Starting Interop Component")
         (let [conn {:pool {} :spec {:host (:redis-host conf) :port (:redis-port conf)}}]
           (red/run-send-loop (:send channels) conn "matches")
           (assoc component :conn conn)))
  (stop  [component] (log/info "Stopping Interop Component")
         (assoc component :conn nil)))

Publishing Interop Component

Here, we are creating a configuration map and start a send loop with this configuration for the "matches" topic. Here's that loop:

(defn run-send-loop
  "loop for sending items by publishing them on a Redis pub topic"
  [send-chan conn topic]
  (go-loop [] (let [msg (<! send-chan)]
                (car/wcar conn (car/publish topic msg))
                (recur))))

Send Loop

This go-loop consumes all messages that come in on send-chan channel and publishes them on topic for the specified configuration conn.

Here's the other side of the communication with the component subscribing to the same topic. The channels component stays the same. The component itself looks a little different:

(defrecord Interop [conf channels listener]
  component/Lifecycle
  (start [component] (log/info "Starting Interop Component")
         (let [conn {:pool {} :spec {:host (:redis-host conf) :port (:redis-port conf)}}
               listener (red/subscribe-topic (:receive channels) conn "matches")]
           (assoc component :conn conn :listener listener)))
  (stop  [component] (log/info "Stopping Interop Component")
         (red/unsubscribe listener)
         (red/close listener)
         (assoc component :conn nil :listener nil)))

Subscribing Interop Component

Just like for the publisher side, there's the configuration map. Next, we subscribe to a topic and hold on to the returned listener so that we can unsubscribe from the topic and shut it down later when the component is shut down1.

(defn- msg-handler-fn
  "create handler function for messages from Redis Pub/Sub"
  [receive-chan]
  (fn [[msg-type topic payload]]
    (when (= msg-type "message")
      (put! receive-chan payload))))

(defn subscribe-topic
  "subscribe to topic, put items on specified channel"
  [receive-chan conn topic]
  (car/with-new-pubsub-listener
    (:spec conn)
    {"matches" (msg-handler-fn receive-chan)}
    (car/subscribe topic)))

(defn unsubscribe
  "unsubscribe listener from all topics"
  [listener]
  (car/with-open-listener listener (car/unsubscribe)))

(defn close
  "close listener"
  [listener]
  (car/close-listener listener))

Subscription-related Functions

Performance of Redis

Redis does a lot with very little CPU utilization. In a non-scientific test, I fired up 50 JVMs (on four machines) subscribing to the topic on which the TwitterClient publishes tweets with matched percolation queries. Then I changed the tracked term from the Twitter Streaming API to "love", which reliably maxes out the rate of tweets permitted. Typically, with this term I see around 60 to 70 tweets per second. With 50 connected processes, 3000 to 3500 tweets were delivered per second overall, yet the CPU utilization of Redis idled somewhere between 1.7% and 2.3%.

Conclusion

I'm glad I got around to the process separation last weekend. It was fun to do and gives me confidence to proceed with the design I have in mind. Very little had to change in order to break the application apart, thanks to Component and core.async. In one of my next articles, I will describe the Docker configuration for running a TwitterClient container, a couple of containers with the client-serving JVMs connecting over Redis, a container with Redis itself and another container with nginx for load-balancing, plus a few containers for running an ElasticSearch cluster. Subscribe to the newsletter or follow me on Twitter if you want to be informed once the next article is out. The code of the fully functioning application is on GitHub. Let me know if you run into any issues when trying it out.

Cheers and have a great weekend, Matthias

P.S. This series of articles is now continued in this book:


  1. The beauty of the component library is that during development, we can stop a component and restart it after reloading the code. This takes much less time than completely reloading the application. Watch Stuart Sierra's talk for more information on the component library. I also created a transcript of this talk if you prefer reading.

© 2022 Matthias Nehlsen