Event sourcing with RabbitMQ in Rails


How usually you implement communication layer between your microservices?

Let’s check, how it can be done via message broker.

In our example - RabbitMQ.

As described in RabbitMQ site:

“RabbitMQ is the most widely deployed open source message broker”.

Sounds great, yes?

In this article I prefer to skip installation of RabbitMQ on your machine, you can find a lot of tutorials about it with Google.

Today I want to implement simple Admin Dashboard panel, which will monitor last 10 published jobs.

So, we should have two Rails apps - our Remootee Job Board app and Dashboard application.

How it works?

  • Job is created in Remoote application and will go to Producer (JobPublisher)
  • Job will then send a message to RabbitMQ exchange
  • RabbitMQ exchange will put the message into a queue
  • Then the Consumer, connected to this queue, takes the message and update Dashboard’s Redis-based datastore.

Let’s add bunny gem.

gem "bunny"

And implement our JobPublisher:

class JobPublisher
  def self.publish(exchange, message = {})
    x = channel.fanout("remoote.#{exchange}")
    x.publish(message.to_json)
  end

  def self.channel
    @channel ||= connection.create_channel
  end

  def self.connection
    @connection ||= Bunny.new.tap do |c|
      c.start
    end
  end
end

Great, now let’s call our JobPublisher in our controller:

  def create
    @job = Job.new(job_params)
    if @job.save
      # simple call our JobPublisher
      JobPublisher.publish("jobs", @job.attributes)
      redirect_to @job, notice: 'Job was successfully created.'
    else
      render :new
    end
  end

Let’s go to our second app - Job Dashboard Admin panel:

gem 'redis-rails'
gem 'redis-namespace'
gem 'sneakers'

Interesting thing here - Sneakers gem , it’s super fast background processing framework for Ruby and RabbitMQ.

Then setup Redis:

$redis = Redis::Namespace.new("dashboard:#{Rails.env}", redis: Redis.new)

And add line into Rakefile:

require_relative 'config/application'
require 'sneakers/tasks'
Rails.application.load_tasks

So, now we need to fetch recently added jobs into our dashboard from Redis.

class RecentJobs
  KEY = "recent_jobs"
  STORE_LIMIT = 10

  def self.list(limit = STORE_LIMIT)
    $redis.lrange(KEY, 0, limit-1).map do |raw_job|
      JSON.parse(raw_job).with_indifferent_access
    end
  end

  def self.push(raw_job)
    $redis.lpush(KEY, raw_job)
    $redis.ltrim(KEY, 0, STORE_LIMIT-1)
  end
end

Let’s get our data from Redis Store in HomeController:

class HomeController < ApplicationController
  def index
    @jobs = RecentJobs.list
  end
end

Now we need JobsWorker, which will push job JSON into Redis via RecentJobs:

class JobsWorker
  include Sneakers::Worker
  from_queue "dashboard.jobs", env: nil

  def work(raw_job)
    RecentJobs.push(raw_job)
    ack!
  end
end

Then we need to start our JobsWorker:

WORKERS=JobsWorker rake sneakers:run

And the last step. We need to connect our remoote.jobs exchange with dashboard.jobs queue.

We can use simple rake task for it.

namespace :rabbitmq do
  desc "Setup routing"
  task :setup do
    require "bunny"
    conn = Bunny.new
    conn.start
    ch = conn.create_channel
    x = ch.fanout("remoote.jobs")
    queue = ch.queue("dashboard.jobs", durable: true)
    queue.bind("remoote.jobs")
    conn.close
  end
end

Let’s start our applications, and take a look, how it works:

First create new job in Remoote application:

rm1

Then go into Dashboard application and you see last added jobs:

rm2