Message Bus In Action

It has been a long time since my last post, but I do have a partial excuse: I started a new job at pebble {code} the day after posting it. My two previous posts on the message bus have been cross posted to the company blog: head over there for an eclectic mix of articles. If you like what you see, why not consider applying for a job?

In this post I want to summarise the advantages of the message bus approach, and also look at a potential limitation. The benefits as I see them are:

  1. (Very) loose coupling between services.
  2. Encourages narrow interfaces (what is narrower than an interface with one method e.g. IRequestHandler?)
  3. Provides not only Event Aggregator functionality, but also asynchronous Request-Response.
  4. The ability to enforce sequential message processing can make it simpler to develop stateful services.

If all the request processing is sequential then we have something akin to the Actor Model but without the concept of identity: a request can only have a single registered handler, so it is meaningless to talk about the handler’s identity. In general I think this is a good thing – having to know about the identity increases the coupling between the services.

However, there are situations where we really do need to distinguish message destinations based on the identity of the receiver. Is this a situation that we can’t handle? Or are there workarounds using the message bus?

The example I’ve chosen to use is the simplest two person game I could think of – Noughts and Crosses. The design consists of 4 services: a Game service (responsible for controlling the game and ensuring legal play), two instances of a Player service (P1 and P2) and a UI service. In all of the sequence diagrams that follow the messages are assumed to go via the bus (not shown), unless explicitly stated otherwise. Async/sync requests are shown with a solid line and an open/closed arrowhead; events are shown with a dashed line and an open arrowhead. First the players have to register to play the game:

Players send a RegistrationRequest to the Game (N.B. as long as the Game has been registered as a handler for RegistrationRequest using MessageProcessingMode.Sequential, we don’t need to worry about race conditions during registration). The Game returns a RegistrationResult indicating whether registration has been successful. Once two players have registered the game can begin. The idea is that the Game should be able to send a PlayerMoveRequest and receive a PlayerMoveResponse containing the player’s chosen move. However, both players are capable of handling a PlayerMoveRequest, and they can’t both register as handlers for this request. The options seem to be:

Unique Request Types

Use different request types for Player1 and Player2:

This is ugly because of the large amounts of duplication (P1MoveRequest = P2MoveRequest; Player service needs to implement IRequestHandler<P1MoveRequest, P1MoveResponse> and IRequestHandler<P2MoveRequest, P2MoveResponse>).

Two Message Bus Instances

Introduce two message bus instances, with communication between the game and each player happening on separate buses: Whilst this initially felt more natural – we are simply saying that the game has a dedicated communication channel with each player – it gets complicated once we consider the UI. Does each player use its dedicated bus to communicate with the UI? Then how does the Game communicate with the UI? Via a third bus? Or does it arbitrarily choose one of the two existing buses?

Player Message Router

Introduce a PlayerMessageRouter that receives the PlayerMoveRequest and routes it (using information present within the request) to the appropriate Player. This routing does not go via the bus – it will probably just be a direct method call: This initially felt like an unnatural abstraction – there is no concept of a player router in the Noughts and Crosses domain – introduced to get around a limitation in the message bus. However, on closer inspection the Game/Player/UI don’t need to know anything about the router, which can be implemented in a generic manner and just becomes another infrastructure component that we can choose to use if appropriate (like the load balancers):

namespace ItWorksOnMyMachine.MicroSoa
{
    using System;
    using System.Diagnostics.Contracts;
    using System.Linq;
    using System.Threading.Tasks;

    public class RequestRouter<TRequest, TResponse, THandler> : IRequestHandler<TRequest, TResponse> where THandler : IRequestHandler<TRequest, TResponse>
    {
        private readonly IMessageDispatcher messageDispatcher;

        private readonly Func<TRequest, THandler, bool> isCorrectHandlerForRequest;

        private readonly IMessageQueue[] requestQueues;

        public RequestRouter(IMessageDispatcher messageDispatcher, Func<TRequest, THandler, bool> isCorrectHandlerForRequest, params Tuple<THandler, MessageProcessingMode>[] handlerProcessingModePairs)
        {
            Contract.Requires<ArgumentNullException>(isCorrectHandlerForRequest != null);
            Contract.Requires<ArgumentOutOfRangeException>(handlerProcessingModePairs.Count() >= 2);

            this.messageDispatcher = messageDispatcher;
            this.isCorrectHandlerForRequest = isCorrectHandlerForRequest;
            this.requestQueues = handlerProcessingModePairs.Select(CreateQueue).ToArray();
        }

        private static IMessageQueue CreateQueue(Tuple<THandler, MessageProcessingMode> handlerProcessingModePair)
        {
            var handler = handlerProcessingModePair.Item1;
            var messageProcessingMode = handlerProcessingModePair.Item2;

            return MessageBusFactory.MessageQueueCache.GetOrAdd(handler, messageProcessingMode);
        }

        public Task<TResponse> HandleRequest(TRequest request)
        {
            var targetQueue = requestQueues.Single(queue => isCorrectHandlerForRequest(request, (THandler)queue.Service));

            return targetQueue.Send<TRequest, TResponse>(request, messageDispatcher);
        }
    }
}

The RequestRouter is also a good example of the first three of the SOLID design principles:

  • Single Responsibility: it simply forwards requests to the correct recipient. It does not even understand how the correct recipient is chosen, it simply knows that the provided delegate will do the work for it.
  • Open/Closed: we’ve introduced routed messages into our system by adding a new component rather than modifying existing ones.
  • Liskov Substitution: it implements IRequestHandler<TRequest, TResponse> and hence is a drop-in replacement for the original request handler.

Bringing It All Together

I’ve implemented a solution using the “Player Message Router” approach. I’ve assumed that both players are humans, and they forward the move requests to a UI, of which there is both a console version and a WinForms version. The Game, HumanPlayer, ConsoleUI and WinFormUI classes are quick and dirty implementations (though take a peek at HumanPlayer.HandleRequest and WinFormUI.GetMoveFromButtonClick for some Async/Await and Rx tastiness). The real interest is in how they are wired up. The code below is for wiring up the ConsoleUI version:

class Program
{
    static void Main(string[] args)
    {
        var bus = MessageBusFactory.Create();

        var game = new Game(bus);

        bus.RegisterRequestHandler(game, MessageProcessingMode.Sequential);
        bus.RegisterRequestHandler(game, MessageProcessingMode.Sequential);

        var player1 = new HumanPlayer("Player 1", bus);
        var player2 = new HumanPlayer("Player 2", bus);

        var playerRouter = new RequestRouter(
                bus,
                (request, player) => player.Name == request.Player,
                Tuple.Create(player1, MessageProcessingMode.Sequential),
                Tuple.Create(player2, MessageProcessingMode.Sequential));

        bus.RegisterRequestHandler(playerRouter, MessageProcessingMode.Sequential);

        var userInterface = new ConsoleUI(bus);
        bus.RegisterRequestHandler(userInterface, MessageProcessingMode.Sequential);
        bus.RegisterEventHandler(userInterface, MessageProcessingMode.Sequential);

        player1.RegisterForGame();
        player2.RegisterForGame();

        userInterface.Run();
    }
}

What I like about this is that all of the messages that flow through the system are clearly laid out in the application root. This provides a nice high level overview of the system. Also notice that the Game and the Players don’t know anything about the RequestRouter that has been introduced to route messages to the correct player. Now, if we compare this to the setup for the WinFormUI:

static class Program
{
    [STAThread]
    static void Main()
    {
        Application.EnableVisualStyles();
        Application.SetCompatibleTextRenderingDefault(false);

        var bus = MessageBusFactory.Create();

        var game = new Game(bus);

        bus.RegisterRequestHandler(game, MessageProcessingMode.Sequential);
        bus.RegisterRequestHandler(game, MessageProcessingMode.Sequential);

        var player1 = new HumanPlayer("Player 1", bus);
        var player2 = new HumanPlayer("Player 2", bus);

        var playerRouter = new RequestRouter(
                bus,
                (request, player) => player.Name == request.Player,
                Tuple.Create(player1, MessageProcessingMode.Sequential),
                Tuple.Create(player2, MessageProcessingMode.Sequential));

        bus.RegisterRequestHandler(playerRouter, MessageProcessingMode.Sequential);

        var userInterface = new WinFormUI(bus);
        bus.RegisterRequestHandler(userInterface, MessageProcessingMode.Sequential);
        bus.RegisterEventHandler(userInterface, MessageProcessingMode.Sequential);

        player1.RegisterForGame();
        player2.RegisterForGame();

        Application.Run(userInterface);
    }
}

The only differences from the previous ConsoleUI example are the call to create the UI instance and the WinForms-specific calls to the Application class. Everything else is identical.

Summary

The lack of routed messages initially seems like a big limitation of the message bus approach. However, following the SOA analogy that started this series of posts, we can simply introduce a router to add the missing functionality in a way that respects the O in SOLID.

The Nought and Crosses implementation then shows the router and the rest of the message bus in action in a real, albeit small, example.

Leave a Reply