Message Bus In Action

It has been a long time since my last post, but I do have a partial excuse: I started a new job at pebble {code} the day after posting it. My two previous posts on the message bus have been cross-posted to the company blog: head over there for an eclectic mix of articles. If you like what you see, why not consider applying for a job?

In this post I want to summarise the advantages of the message bus approach, and also look at a potential limitation. The benefits as I see them are:

  1. (Very) loose coupling between services.
  2. Encourages narrow interfaces (what is narrower than an interface with one method e.g. IRequestHandler?)
  3. Provides not only Event Aggregator functionality, but also asynchronous Request-Response.
  4. The ability to enforce sequential message processing can make it simpler to develop stateful services.

If all the request processing is sequential then we have something akin to the Actor Model but without the concept of identity: a request can only have a single registered handler, so it is meaningless to talk about the handler’s identity. In general I think this is a good thing – having to know about the identity increases the coupling between the services.

However, there are situations where we really do need to distinguish message destinations based on the identity of the receiver. Is this a situation that we can’t handle? Or are there workarounds using the message bus?

The example I’ve chosen to use is the simplest two-person game I could think of – Noughts and Crosses. The design consists of four services: a Game service (responsible for controlling the game and ensuring legal play), two instances of a Player service (P1 and P2), and a UI service. In all of the sequence diagrams that follow, the messages are assumed to go via the bus (not shown), unless explicitly stated otherwise. Async/sync requests are shown with a solid line and an open/closed arrowhead; events are shown with a dashed line and an open arrowhead. First the players have to register to play the game:

Players send a RegistrationRequest to the Game (N.B. as long as the Game has been registered as a handler for RegistrationRequest using MessageProcessingMode.Sequential, we don’t need to worry about race conditions during registration). The Game returns a RegistrationResult indicating whether registration has been successful. Once two players have registered the game can begin. The idea is that the Game should be able to send a PlayerMoveRequest and receive a PlayerMoveResponse containing the player’s chosen move. However, both players are capable of handling a PlayerMoveRequest, and they can’t both register as handlers for this request. The options seem to be:

Unique Request Types

Use different request types for Player1 and Player2:

This is ugly because of the large amount of duplication (P1MoveRequest is identical to P2MoveRequest, and the Player service needs to implement both IRequestHandler<P1MoveRequest, P1MoveResponse> and IRequestHandler<P2MoveRequest, P2MoveResponse>).
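
To make the duplication concrete, here's a rough sketch of what this option forces on us (the type and interface names come from the discussion above; the member shapes are invented purely for illustration, and usings are omitted):

// Two request/response pairs with an identical shape - pure duplication
public class P1MoveRequest { public string Board { get; set; } }
public class P1MoveResponse { public int Square { get; set; } }
public class P2MoveRequest { public string Board { get; set; } }
public class P2MoveResponse { public int Square { get; set; } }

// ...and the Player service has to implement both interfaces with near-identical bodies
public class Player :
    IRequestHandler<P1MoveRequest, P1MoveResponse>,
    IRequestHandler<P2MoveRequest, P2MoveResponse>
{
    public Task<P1MoveResponse> HandleRequest(P1MoveRequest request)
    {
        return Task.FromResult(new P1MoveResponse { Square = ChooseSquare(request.Board) });
    }

    public Task<P2MoveResponse> HandleRequest(P2MoveRequest request)
    {
        // identical logic, duplicated only to satisfy the second interface
        return Task.FromResult(new P2MoveResponse { Square = ChooseSquare(request.Board) });
    }

    private static int ChooseSquare(string board)
    {
        return 0; // stubbed for the sketch
    }
}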

Two Message Bus Instances

Introduce two message bus instances, with communication between the game and each player happening on separate buses. Whilst this initially felt more natural – we are simply saying that the game has a dedicated communication channel with each player – it gets complicated once we consider the UI. Does each player use its dedicated bus to communicate with the UI? Then how does the Game communicate with the UI? Via a third bus? Or does it arbitrarily choose one of the two existing buses?

Player Message Router

Introduce a PlayerMessageRouter that receives the PlayerMoveRequest and routes it (using information present within the request) to the appropriate Player. This routing does not go via the bus – it will probably just be a direct method call. This initially felt like an unnatural abstraction – there is no concept of a player router in the Noughts and Crosses domain – introduced solely to get around a limitation in the message bus. However, on closer inspection the Game/Player/UI don’t need to know anything about the router, which can be implemented in a generic manner and just becomes another infrastructure component that we can choose to use if appropriate (like the load balancers):

namespace ItWorksOnMyMachine.MicroSoa
{
    using System;
    using System.Diagnostics.Contracts;
    using System.Linq;
    using System.Threading.Tasks;

    public class RequestRouter<TRequest, TResponse, THandler> : IRequestHandler<TRequest, TResponse> where THandler : IRequestHandler<TRequest, TResponse>
    {
        private readonly IMessageDispatcher messageDispatcher;

        private readonly Func<TRequest, THandler, bool> isCorrectHandlerForRequest;

        private readonly IMessageQueue[] requestQueues;

        public RequestRouter(IMessageDispatcher messageDispatcher, Func<TRequest, THandler, bool> isCorrectHandlerForRequest, params Tuple<THandler, MessageProcessingMode>[] handlerProcessingModePairs)
        {
            Contract.Requires<ArgumentNullException>(isCorrectHandlerForRequest != null);
            Contract.Requires<ArgumentOutOfRangeException>(handlerProcessingModePairs.Count() >= 2);

            this.messageDispatcher = messageDispatcher;
            this.isCorrectHandlerForRequest = isCorrectHandlerForRequest;
            this.requestQueues = handlerProcessingModePairs.Select(CreateQueue).ToArray();
        }

        private static IMessageQueue CreateQueue(Tuple<THandler, MessageProcessingMode> handlerProcessingModePair)
        {
            var handler = handlerProcessingModePair.Item1;
            var messageProcessingMode = handlerProcessingModePair.Item2;

            return MessageBusFactory.MessageQueueCache.GetOrAdd(handler, messageProcessingMode);
        }

        public Task<TResponse> HandleRequest(TRequest request)
        {
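            // Find the single queue whose wrapped handler matches this request (via the supplied delegate),
            // then forward the request to it; that queue enforces the handler's MessageProcessingMode.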
            var targetQueue = requestQueues.Single(queue => isCorrectHandlerForRequest(request, (THandler)queue.Service));

            return targetQueue.Send<TRequest, TResponse>(request, messageDispatcher);
        }
    }
}

The RequestRouter is also a good example of the first three of the SOLID design principles:

  • Single Responsibility: it simply forwards requests to the correct recipient. It does not even understand how the correct recipient is chosen; it just knows that the provided delegate will do the work for it.
  • Open/Closed: we’ve introduced routed messages into our system by adding a new component rather than modifying existing ones.
  • Liskov Substitution: it implements IRequestHandler<TRequest, TResponse> and hence is a drop-in replacement for the original request handler.

Bringing It All Together

I’ve implemented a solution using the “Player Message Router” approach. I’ve assumed that both players are humans, and they forward the move requests to a UI, of which there is both a console version and a WinForms version. The Game, HumanPlayer, ConsoleUI and WinFormUI classes are quick and dirty implementations (though take a peek at HumanPlayer.HandleRequest and WinFormUI.GetMoveFromButtonClick for some Async/Await and Rx tastiness). The real interest is in how they are wired up. The code below is for wiring up the ConsoleUI version:

class Program
{
    static void Main(string[] args)
    {
        var bus = MessageBusFactory.Create();

        var game = new Game(bus);

        bus.RegisterRequestHandler(game, MessageProcessingMode.Sequential);
        bus.RegisterRequestHandler(game, MessageProcessingMode.Sequential);

        var player1 = new HumanPlayer("Player 1", bus);
        var player2 = new HumanPlayer("Player 2", bus);

        var playerRouter = new RequestRouter<PlayerMoveRequest, PlayerMoveResponse, HumanPlayer>(
                bus,
                (request, player) => player.Name == request.Player,
                Tuple.Create(player1, MessageProcessingMode.Sequential),
                Tuple.Create(player2, MessageProcessingMode.Sequential));

        bus.RegisterRequestHandler(playerRouter, MessageProcessingMode.Sequential);

        var userInterface = new ConsoleUI(bus);
        bus.RegisterRequestHandler(userInterface, MessageProcessingMode.Sequential);
        bus.RegisterEventHandler(userInterface, MessageProcessingMode.Sequential);

        player1.RegisterForGame();
        player2.RegisterForGame();

        userInterface.Run();
    }
}

What I like about this is that all of the messages that flow through the system are clearly laid out in the application root. This provides a nice high level overview of the system. Also notice that the Game and the Players don’t know anything about the RequestRouter that has been introduced to route messages to the correct player. Now, if we compare this to the setup for the WinFormUI:

static class Program
{
    [STAThread]
    static void Main()
    {
        Application.EnableVisualStyles();
        Application.SetCompatibleTextRenderingDefault(false);

        var bus = MessageBusFactory.Create();

        var game = new Game(bus);

        bus.RegisterRequestHandler(game, MessageProcessingMode.Sequential);
        bus.RegisterRequestHandler(game, MessageProcessingMode.Sequential);

        var player1 = new HumanPlayer("Player 1", bus);
        var player2 = new HumanPlayer("Player 2", bus);

        var playerRouter = new RequestRouter<PlayerMoveRequest, PlayerMoveResponse, HumanPlayer>(
                bus,
                (request, player) => player.Name == request.Player,
                Tuple.Create(player1, MessageProcessingMode.Sequential),
                Tuple.Create(player2, MessageProcessingMode.Sequential));

        bus.RegisterRequestHandler(playerRouter, MessageProcessingMode.Sequential);

        var userInterface = new WinFormUI(bus);
        bus.RegisterRequestHandler(userInterface, MessageProcessingMode.Sequential);
        bus.RegisterEventHandler(userInterface, MessageProcessingMode.Sequential);

        player1.RegisterForGame();
        player2.RegisterForGame();

        Application.Run(userInterface);
    }
}

The only differences from the previous ConsoleUI example are the call to create the UI instance and the WinForms-specific calls to the Application class. Everything else is identical.

Summary

The lack of routed messages initially seems like a big limitation of the message bus approach. However, following the SOA analogy that started this series of posts, we can simply introduce a router to add the missing functionality in a way that respects the O in SOLID.

The Noughts and Crosses implementation then shows the router and the rest of the message bus in action in a real, albeit small, example.

In-Process Message Bus

In my last post I discussed the idea of designing single-process applications in a way that mimics a SOA, i.e. the application is decomposed into services that communicate solely via asynchronous messaging. As promised, here I’ll discuss the key concepts in the proof-of-concept implementation. If you want all the details then take a look at the source, which is a mixture of F# and C# code.

IMessageBus

Firstly, the message bus interface has been split into an IMessageDispatcher and an IMessageBus:

type IMessageDispatcher =
    abstract member Publish : 'TEvent -> unit
    abstract member Send : 'TRequest -> Task<'TResponse>

type MessageProcessingMode = 
    | Sequential = 0
    | Concurrent = 1

type IMessageBus =
    inherit IMessageDispatcher
    abstract member RegisterEventHandler: IEventHandler<'TEvent> -> MessageProcessingMode -> unit
    abstract member RegisterRequestHandler: IRequestHandler<'TRequest, 'TResponse> -> MessageProcessingMode -> unit

IMessageDispatcher is only concerned with dispatching messages to the correct recipients; IMessageBus augments this with the ability to register message handlers. The RegisterEventHandler and RegisterRequestHandler methods now take a MessageProcessingMode that specifies whether the handler can process messages concurrently, i.e. whether it is thread-safe.

Services will typically take a dependency on IMessageDispatcher, as they just need to publish/send messages rather than register new handlers.
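
As an illustration (the service and event names here are made up rather than taken from the actual solution), such a service only needs the dispatcher injected:

public class ScoreboardService
{
    private readonly IMessageDispatcher dispatcher;

    public ScoreboardService(IMessageDispatcher dispatcher)
    {
        this.dispatcher = dispatcher;
    }

    public void RecordWin(string playerName)
    {
        // publish only - this service never needs the registration methods on IMessageBus
        dispatcher.Publish(new PlayerWonEvent { Player = playerName });
    }
}

public class PlayerWonEvent { public string Player { get; set; } }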

IMessageQueue

Next we need to wrap each service in a message queue that can forward messages to the correct handler implementation whilst ensuring that the MessageProcessingMode is enforced:

type IMessageQueue = 
    abstract member Publish : 'TEvent -> IMessageDispatcher -> unit
    abstract member Send : 'TRequest -> IMessageDispatcher -> Task<'TResponse>
    abstract member Service : obj

The reason for IMessageDispatcher appearing in the method signatures is so that the queue can publish an ExceptionMessage if the handler throws an exception. This makes it possible for a global exception handler to deal with all exceptions.
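
For example, a global exception handler is just another service that subscribes to ExceptionMessage. A sketch (the exact shape of ExceptionMessage isn't shown here, so the Exception property is an assumption):

public class GlobalExceptionLogger : IEventHandler<ExceptionMessage>
{
    public void HandleEvent(ExceptionMessage message)
    {
        // message.Exception is assumed to expose the original exception thrown by the handler
        Console.WriteLine("A message handler threw: " + message.Exception);
    }
}

// wired up once at the application root:
// bus.RegisterEventHandler(new GlobalExceptionLogger(), MessageProcessingMode.Sequential);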

IMessageQueue Implementations

There are two implementations of IMessageQueue: MessageQueue and NullMessageQueue. MessageQueue, as expected, enforces sequential processing of messages via an F# agent; NullMessageQueue is a simple pass-through to the underlying handler (i.e. it allows concurrent message processing) that can be used as a performance optimisation in the case where the handler is known to be thread-safe (i.e. the handler was registered using MessageProcessingMode.Concurrent).

MessageQueue

type internal AgentMessage =
    | AsyncAction of (unit -> Async<unit>)

type internal AgentResponse<'T> = 
    | Response of 'T
    | Error of exn
                
type MessageQueue(service : obj) =
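    // 'Agent' is assumed to be the common F# alias for MailboxProcessor: type Agent<'T> = MailboxProcessor<'T>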
    let agent = 
        Agent.Start(fun inbox ->
            let rec loop () =
                async { 
                    let! AsyncAction(forwardMessageToService) = inbox.Receive()
                    do! forwardMessageToService()
                    return! loop() 
                }
            loop())
    
    let publishExceptionAndReply e (bus : IMessageDispatcher) (replyChannel : AsyncReplyChannel<AgentResponse<'TOut>>) =
        bus.Publish (ExceptionMessage(e))
        replyChannel.Reply (Error(e))

    let sendRequest (request : 'TIn) (replyChannel : AsyncReplyChannel<AgentResponse<'TOut>>) bus (service : obj) =
        async { 
            let handler = service :?> IRequestHandler<'TIn, 'TOut>
            try
                let! response = Async.AwaitTask (handler.HandleRequest request)
                replyChannel.Reply (Response(response))
            with
            | :? System.AggregateException as ae -> publishExceptionAndReply ae.InnerException bus replyChannel
            | e -> publishExceptionAndReply e bus replyChannel
        }
    
    let publishEvent (event : 'TIn) (bus : IMessageDispatcher) (service : obj) =
        async { 
            let handler = service :?> IEventHandler<'TIn>
            try
                handler.HandleEvent event
            with 
            | e -> bus.Publish (ExceptionMessage(e))
        }
    
    interface IMessageQueue with
        member x.Service = service

        member x.Send<'TIn, 'TOut> (request : 'TIn) bus = 
            let resp  = agent.PostAndAsyncReply<AgentResponse<'TOut>>(fun replyChannel -> AsyncAction(sendRequest request replyChannel bus service))
            Async.StartAsTask (
                async { 
                    let! res = resp
                    match res with
                    | Response r -> return r
                    | Error e -> return (raise e)
                }
            )

        member x.Publish<'TIn> (event : 'TIn) bus =
            agent.Post (AsyncAction(publishEvent event bus service))

MessageQueue’s agent receives messages of type AgentMessage and replies with messages of type AgentResponse. AgentMessage is just an async computation that the agent can execute. The exact computation is specified by the sendRequest and publishEvent functions, but both forward the original message to the correct message handler implementation and deal with responses/exceptions. AgentResponse contains either the successful response or the failure exception, and is a useful way to get around MailboxProcessor’s strange exception handling behaviour.

NullMessageQueue

type NullMessageQueue(service : obj) =
    let publishExceptionAndReply (e : exn) (bus : IMessageDispatcher) =
        bus.Publish (ExceptionMessage(e))
        raise e

    interface IMessageQueue with
        member x.Service = service

        member x.Send<'TIn, 'TOut> request bus =
            let handler = service :?> IRequestHandler<'TIn, 'TOut>
            let handleAsync = async { 
                                  try
                                      let! response = Async.AwaitTask (handler.HandleRequest request)
                                      return response
                                  with
                                  | :? System.AggregateException as ae -> return (publishExceptionAndReply ae.InnerException bus)
                                  | e -> return (publishExceptionAndReply e bus)
                              }
            Async.StartAsTask handleAsync
            
        member x.Publish<'TIn> event bus =
            let handler = service :?> IEventHandler<'TIn>
            let handleAsync = async { 
                                  try 
                                      handler.HandleEvent event 
                                  with
                                  | e -> bus.Publish (ExceptionMessage(e))
                              }
            Async.Start handleAsync

NullMessageQueue just forwards messages directly to the handlers without involving an F# agent. However, to ensure that exceptions are handled in the same manner as MessageQueue, the invocations need to be wrapped in async computations.

Architecture

So, the updated diagram looks like this:

When handlers are registered with the message bus, the MessageToQueuesMap is told to add the required message queue to the collection of queues that process the given message type. Then, when the message bus receives a message, it asks the map for the collection of queues that the message should be forwarded to. Events can be forwarded to multiple handlers (e.g. ServiceA and ServiceB both implement IEventHandler<EventA>), but requests must only have a single handler.
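
The MessageToQueuesMap itself isn't listed in this post, but conceptually it is little more than a lookup from message type to the interested queues. A rough C# sketch of the idea (the real implementation is F#, and usings are omitted):

public class MessageToQueuesMap
{
    private readonly Dictionary<Type, List<IMessageQueue>> queuesByMessageType =
        new Dictionary<Type, List<IMessageQueue>>();

    public void AddQueue(Type messageType, IMessageQueue queue)
    {
        List<IMessageQueue> queues;
        if (!queuesByMessageType.TryGetValue(messageType, out queues))
        {
            queues = new List<IMessageQueue>();
            queuesByMessageType[messageType] = queues;
        }
        queues.Add(queue);
    }

    public IEnumerable<IMessageQueue> GetQueuesFor(Type messageType)
    {
        List<IMessageQueue> queues;
        return queuesByMessageType.TryGetValue(messageType, out queues)
            ? queues
            : Enumerable.Empty<IMessageQueue>();
    }
}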

Note that each service can be wrapped in at most two IMessageQueue instances – one MessageQueue and one NullMessageQueue. All messages that need to be processed sequentially go through the MessageQueue, and all messages that can be processed concurrently go through the NullMessageQueue. The MessageQueueCache (not shown) is responsible for providing the correct queue(s) for each service, and is used by the MessageToQueuesMap during handler registration.
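
Similarly, the MessageQueueCache can be thought of as a cache keyed on the service instance and its processing mode. Again, a rough C# sketch of the idea rather than the actual (F#) implementation:

public class MessageQueueCache
{
    private readonly ConcurrentDictionary<Tuple<object, MessageProcessingMode>, IMessageQueue> queues =
        new ConcurrentDictionary<Tuple<object, MessageProcessingMode>, IMessageQueue>();

    public IMessageQueue GetOrAdd(object service, MessageProcessingMode mode)
    {
        // at most one sequential (MessageQueue) and one concurrent (NullMessageQueue)
        // wrapper is ever created for a given service instance
        return queues.GetOrAdd(
            Tuple.Create(service, mode),
            key => mode == MessageProcessingMode.Sequential
                       ? (IMessageQueue)new MessageQueue(service)
                       : new NullMessageQueue(service));
    }
}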

Load Balancers

If a message handler cannot handle concurrent messages, it may be swamped by high message volumes whilst not utilising the available cores. As mentioned in the previous post, this can be solved by using a load balancer. There are two versions, one for handling events and one for requests. Both make use of Tomas Petricek’s BlockingQueueAgent class, which enables Producer/Consumer scenarios.

Rather than listing the code here, I’ll point you to the EventLoadBalancer and RequestLoadBalancer.

Wrapping Up

This post is already long enough and I’ve delayed publishing it for too long, so I’m going to defer further discussion for the next post. There I’ll discuss the pros and cons of the current implementation and also compare the concept to a pure Actor Model.

Honey, I Shrunk the SOA

I’ve recently been thinking about what would happen if we applied the principles of SOA (done properly, using asynchronous messaging) to the design of code running within a single process. In other words, the code would consist of independent services that could only communicate asynchronously via some kind of message bus. What would this look like? Would it have any benefits?

Before I go any further, I should point out that I did some research and discovered that Svein Arne Ackenhausen had not only been thinking along these lines, but was also using them in production. His blog post summarises the benefits as he sees them, and he linked to his standard implementation. However I’ve not been able to find any other articles discussing these ideas, so I thought I’d write up my thoughts in the hope that I can get some useful feedback.

Svein’s implementation is a very nice, simple approach (a good thing!) which clearly demonstrates that there aren’t any great technological barriers to adopting this design pattern. However, I think it may benefit from some additions.

Firstly, as Svein mentions, stateful services have to be modelled very carefully because the service can be handling multiple messages concurrently. What if we sidestepped this problem by combining the message bus concept with the Actor Model so that each service behaves as an actor that processes messages sequentially? Now we can use stateful services where appropriate without having to worry about concurrency.

Secondly, it only really deals with publishing events: there is no concept of request/response. We could mimic request/response by convention (handlers of IFoo must themselves publish a message of type IFooResponse in return), but as Svein pointed out in his blog post, one of the benefits of this approach is that:

By making sure that your code feature produces an output in the shape of a message you have made that code feature a stand alone piece of code. It can now make sense beyond the contract that would have otherwise been passed to it. Being a stand alone piece of code it becomes a natural place for integration. Integration that would happen through consuming a message or producing a message that other participants can consume through the message dispatcher.

To maximise this benefit I feel we should be making explicit the difference between publishing an event and sending a command or querying for data. This is something that, for example, NServiceBus makes explicit via the Publish() and Send() methods on its service bus interface.

So what would such an API look like? First we need the message handling interfaces that services can implement:


public interface IEventHandler<TEvent>
{
    void HandleEvent(TEvent message);
}

public interface IRequestHandler<TRequest, TResponse>
{
    Task<TResponse> HandleRequest(TRequest message);
}

By returning a Task the IRequestHandler will integrate nicely with the async features of C# 5.
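
For example (a sketch with made-up message and service names, usings omitted), a handler can await other asynchronous work directly inside HandleRequest:

public class QuoteRequest { public string Symbol { get; set; } }
public class QuoteResponse { public decimal Price { get; set; } }

public class QuoteService : IRequestHandler<QuoteRequest, QuoteResponse>
{
    public async Task<QuoteResponse> HandleRequest(QuoteRequest message)
    {
        // await some asynchronous work without blocking a thread
        var price = await FetchPriceAsync(message.Symbol);
        return new QuoteResponse { Price = price };
    }

    private static Task<decimal> FetchPriceAsync(string symbol)
    {
        return Task.FromResult(42m); // stand-in for real asynchronous I/O
    }
}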

Next we need the message bus interface:

public interface IMessageBus
{
    void RegisterEventHandler<TEvent>(IEventHandler<TEvent> handler);
    void RegisterRequestHandler<TRequest, TResponse>(IRequestHandler<TRequest, TResponse> handler);
    void Publish<TEvent>(TEvent message);
    Task<TResponse> Send<TRequest, TResponse>(TRequest message);
}

Then at the application root we can configure the system by registering services with the message bus:

var serviceA = new ServiceA(); // implements IEventHandler
bus.RegisterEventHandler(serviceA);

var serviceB = new ServiceB(); // implements IRequestHandler
bus.RegisterRequestHandler(serviceB);

And services can interact with the bus like so:

public async void DoStuff()
{
    // some actions...

    // this will send the ReqA message and asynchronously await the response
    // (the response type cannot be inferred from the argument alone, so it is supplied explicitly;
    // RespA is an illustrative name for the response type paired with ReqA)
    var result = await bus.Send<ReqA, RespA>(new ReqA());

    // do other stuff with the result...
}

Whilst it is a nice idea to avoid concurrent message processing in the services, we need a way of handling the situation where the service cannot process the messages quickly enough. The solution comes from considering the SOA approach: simply add multiple service instances behind a load balancer! For example:

public class EventLoadBalancer<TEvent> : IEventHandler<TEvent>
{
    public EventLoadBalancer(Func<IEventHandler<TEvent>> slaveFactory)
    {
        // set up appropriate queues etc to hold slave instances
    }

    public void HandleEvent(TEvent message)
    {
        // forward message to chosen slave instance
    }
}

Because the load balancer is itself an IEventHandler<TEvent>, we can simply pass it to the RegisterEventHandler method on the message bus instead of passing the service instance.
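
For example, assuming ServiceA from the earlier snippet handles a hypothetical EventA event, the wiring might look like this:

// register a balanced pool of ServiceA instances instead of a single instance
bus.RegisterEventHandler(new EventLoadBalancer<EventA>(() => new ServiceA()));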

So far, so theoretical. How can we implement the actor functionality? Luckily, F# already supports agents (F# terminology for actors) via its MailboxProcessor class. So all we need to do is place an F# Agent between each service and the message bus (this can be done by the RegisterEventHandler and RegisterRequestHandler methods of the message bus):

So, the F# Agent is responsible for forwarding each message to the appropriate message handler interface of the service, and it also ensures that the service does not process multiple messages concurrently.

This approach could be the high level architecture, and would in no way constrain lower level design choices. So, individual services could be implemented in a traditional OO manner, a pure functional manner, or anything in-between. A service could even be decomposed into sub-services and use an internal message bus.

In the next post I’ll look in more detail at a proof-of-concept implementation. In the meantime, I’ll point out a couple of things I found out whilst writing this post:

  1. The next version of NServiceBus will likely have an in-process mode. It will be interesting to see what form this takes.
  2. Alan Kay, who coined the term “object oriented”, believes that the messaging between objects is far more important than the objects themselves. Maybe we are heading back closer to his original vision?