In-Process Message Bus

In my last post I discussed the idea of designing single-process applications in a way that mimics an SOA, i.e. the application is decomposed into services that communicate solely via asynchronous messaging. As promised, here I’ll discuss the key concepts in the proof-of-concept implementation. If you want all the details then take a look at the source, which is a mixture of F# and C# code.

IMessageBus

Firstly, the message bus interface has been split into an IMessageDispatcher and an IMessageBus:

type IMessageDispatcher =
    abstract member Publish : 'TEvent -> unit
    abstract member Send : 'TRequest -> Task<'TResponse>

type MessageProcessingMode = 
    | Sequential = 0
    | Concurrent = 1

type IMessageBus =
    inherit IMessageDispatcher
    abstract member RegisterEventHandler: IEventHandler<'TEvent> -> MessageProcessingMode -> unit
    abstract member RegisterRequestHandler: IRequestHandler<'TRequest, 'TResponse> -> MessageProcessingMode -> unit

IMessageDispatcher is only concerned with dispatching messages to the correct recipients; IMessageBus augments this with the ability to register message handlers. The RegisterEventHandler and RegisterRequestHandler methods now take a MessageProcessingMode that specifies whether the handler can process messages concurrently, i.e. whether it is thread-safe.

Services will typically take a dependency on IMessageDispatcher, as they just need to publish/send messages rather than register new handlers.
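To make that concrete, here is a minimal sketch of a service that only publishes. It is not taken from the actual source: the shape of IEventHandler is an assumption (its definition isn't listed in this post), and OrderPlaced, InvoiceRaised, InvoicingService and RecordingDispatcher are hypothetical names used purely for illustration.

```fsharp
open System.Threading.Tasks

// Same shape as the interface shown above.
type IMessageDispatcher =
    abstract member Publish : 'TEvent -> unit
    abstract member Send : 'TRequest -> Task<'TResponse>

// Assumed shape: the post does not list IEventHandler's definition.
type IEventHandler<'TEvent> =
    abstract member HandleEvent : 'TEvent -> unit

// Hypothetical messages, for illustration only.
type OrderPlaced = { OrderId : int }
type InvoiceRaised = { InvoicedOrderId : int }

// The service only publishes, so it depends on IMessageDispatcher
// rather than on the full IMessageBus.
type InvoicingService(dispatcher : IMessageDispatcher) =
    interface IEventHandler<OrderPlaced> with
        member x.HandleEvent event =
            dispatcher.Publish ({ InvoicedOrderId = event.OrderId })

// Hypothetical test double that records whatever is published.
type RecordingDispatcher() =
    let published = ResizeArray<obj>()
    member x.Published = published
    interface IMessageDispatcher with
        member x.Publish<'TEvent> (event : 'TEvent) =
            published.Add (box event)
        member x.Send<'TRequest, 'TResponse> (request : 'TRequest) : Task<'TResponse> =
            Task.FromException<'TResponse>(System.NotSupportedException("Send is not used in this sketch"))
```

Because the service never sees the registration methods, it can be exercised with a simple recording dispatcher like the one above instead of a full bus.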

IMessageQueue

Next we need to wrap each service in a message queue that can forward messages to the correct handler implementation whilst ensuring that the MessageProcessingMode is enforced:

type IMessageQueue = 
    abstract member Publish : 'TEvent -> IMessageDispatcher -> unit
    abstract member Send : 'TRequest -> IMessageDispatcher -> Task<'TResponse>
    abstract member Service : obj

IMessageDispatcher appears in the method signatures so that the queue can publish an ExceptionMessage if the handler throws an exception. This makes it possible for a single global exception handler to deal with all exceptions.
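As a rough illustration, a global exception handler could be just another event handler that subscribes to ExceptionMessage. The sketch below makes several assumptions: the shapes of ExceptionMessage and IEventHandler are guessed from how they are used in this post, and GlobalExceptionHandler and its log function are hypothetical.

```fsharp
// Assumed shape: the post only shows ExceptionMessage being constructed from an exception.
type ExceptionMessage(ex : exn) =
    member x.Exception = ex

// Assumed shape: the post does not list IEventHandler's definition.
type IEventHandler<'TEvent> =
    abstract member HandleEvent : 'TEvent -> unit

// Hypothetical handler: every exception published by any queue ends up here.
type GlobalExceptionHandler(log : string -> unit) =
    interface IEventHandler<ExceptionMessage> with
        member x.HandleEvent message =
            let ex = message.Exception
            log (sprintf "Unhandled %s: %s" (ex.GetType().Name) ex.Message)
```

Such a handler would be registered with the bus like any other event handler, so logging or alerting policy lives in one place rather than in each service.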

IMessageQueue Implementations

There are two implementations of IMessageQueue: MessageQueue and NullMessageQueue. MessageQueue, as expected, enforces sequential processing of messages via an F# agent; NullMessageQueue is a simple pass-through to the underlying handler (i.e. it allows concurrent message processing) that can be used as a performance optimisation in the case where the handler is known to be thread-safe (i.e. the handler was registered using MessageProcessingMode.Concurrent).

MessageQueue

type internal AgentMessage =
    | AsyncAction of (unit-> Async<unit>)

type internal AgentResponse<'T> = 
    | Response of 'T
    | Error of exn
                
type MessageQueue(service : obj) =
    let agent = 
        Agent.Start(fun inbox ->
            let rec loop () =
                async { 
                    let! AsyncAction(forwardMessageToService) = inbox.Receive()
                    do! forwardMessageToService()
                    return! loop() 
                }
            loop())
    
    let publishExceptionAndReply e (bus : IMessageDispatcher) (replyChannel : AsyncReplyChannel<AgentResponse<'TOut>>) =
        bus.Publish (ExceptionMessage(e))
        replyChannel.Reply (Error(e))

    let sendRequest (request : 'TIn) (replyChannel : AsyncReplyChannel<AgentResponse<'TOut>>) bus (service : obj) =
        async { 
            let handler = service :?> IRequestHandler<'TIn, 'TOut>
            try
                let! response = Async.AwaitTask (handler.HandleRequest request)
                replyChannel.Reply (Response(response))
            with
            | :? System.AggregateException as ae -> publishExceptionAndReply ae.InnerException bus replyChannel
            | e -> publishExceptionAndReply e bus replyChannel
        }
    
    let publishEvent (event : 'TIn) (bus : IMessageDispatcher) (service : obj) =
        async { 
            let handler = service :?> IEventHandler<'TIn>
            try
                handler.HandleEvent event
            with 
            | e -> bus.Publish (ExceptionMessage(e))
        }
    
    interface IMessageQueue with
        member x.Service = service

        member x.Send<'TIn, 'TOut> (request : 'TIn) bus = 
            let resp  = agent.PostAndAsyncReply<AgentResponse<'TOut>>(fun replyChannel -> AsyncAction(sendRequest request replyChannel bus service))
            Async.StartAsTask (
                async { 
                    let! res = resp
                    match res with
                    | Response r -> return r
                    | Error e -> return (raise e)
                }
            )

        member x.Publish<'TIn> (event : 'TIn) bus =
            agent.Post (AsyncAction(publishEvent event bus service))

MessageQueue’s agent receives messages of type AgentMessage and replies with messages of type AgentResponse. AgentMessage is just an async computation that the agent can execute. The exact computation is specified by the sendRequest and publishEvent functions, but they both forward the original message to the correct message handler implementation and deal with responses/exceptions. AgentResponse contains either the successful response or the failure exception, and is a useful way to get around MailboxProcessor’s strange exception handling behaviour: an unhandled exception inside the agent’s loop stops the agent and is reported through its Error event rather than reaching the caller, who is left waiting for a reply.
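The reply pattern can be seen in isolation in the following stripped-down sketch. It is not the MessageQueue code: Work and run are hypothetical names, the jobs are plain synchronous functions returning int, and MailboxProcessor is used directly rather than through the Agent alias.

```fsharp
type AgentResponse<'T> =
    | Response of 'T
    | Error of exn

// Hypothetical message type: a job plus the channel to reply on.
type Work = Work of (unit -> int) * AsyncReplyChannel<AgentResponse<int>>

let agent =
    MailboxProcessor.Start(fun inbox ->
        let rec loop () =
            async {
                let! (Work (job, replyChannel)) = inbox.Receive()
                // Catch inside the loop so that a failing job cannot stop the agent;
                // the exception travels back to the caller as data instead.
                let reply =
                    try
                        Response (job ())
                    with
                    | e -> Error e
                replyChannel.Reply reply
                return! loop ()
            }
        loop ())

// Posts a job and re-raises on the caller's side if the job failed.
let run (job : unit -> int) : int =
    match agent.PostAndReply(fun replyChannel -> Work (job, replyChannel)) with
    | Response result -> result
    | Error e -> raise e
```

A failing job surfaces as an ordinary exception at the call site, and the agent carries on with the next message.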

NullMessageQueue

type NullMessageQueue(service : obj) =
    let publishExceptionAndReply (e : exn) (bus : IMessageDispatcher) =
        bus.Publish (ExceptionMessage(e))
        raise e

    interface IMessageQueue with
        member x.Service = service

        member x.Send<'TIn, 'TOut> request bus =
            let handler = service :?> IRequestHandler<'TIn, 'TOut>
            let handleAsync = async { 
                                  try
                                      let! response = Async.AwaitTask (handler.HandleRequest request)
                                      return response
                                  with
                                  | :? System.AggregateException as ae -> return (publishExceptionAndReply ae.InnerException bus)
                                  | e -> return (publishExceptionAndReply e bus)
                              }
            Async.StartAsTask handleAsync
            
        member x.Publish<'TIn> event bus =
            let handler = service :?> IEventHandler<'TIn>
            let handleAsync = async { 
                                  try 
                                      handler.HandleEvent event 
                                  with
                                  | e -> bus.Publish (ExceptionMessage(e))
                              }
            Async.Start handleAsync

NullMessageQueue simply forwards messages directly to handlers without involving an F# agent. However, to ensure that exceptions are handled in the same manner as MessageQueue, the invocations need to be wrapped in async computations.

Architecture

So, the updated diagram looks like this:

When handlers are registered with the message bus, the MessageToQueuesMap is told to add the required message queue to the collection of queues that process the given message type. Then, when the message bus receives a message it asks the map for the collection of queues that the message should be forwarded to. Events can be forwarded to multiple handlers (e.g. ServiceA and ServiceB both implement IEventHandler<EventA>), but requests must only have a single handler.

Note that each service can be wrapped in at most two IMessageQueue instances – one MessageQueue and one NullMessageQueue. All messages that need to be processed sequentially go through the MessageQueue, and all messages that can be processed concurrently go through the NullMessageQueue. The MessageQueueCache (not shown) is responsible for providing the correct queue(s) for each service, and is used by the MessageToQueuesMap during handler registration.
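The lookup described above can be sketched as a dictionary keyed by message type. This is only an approximation of the idea, not the real MessageToQueuesMap: the member names are hypothetical, the queue type is left as a generic placeholder, and no attempt is made at thread safety.

```fsharp
open System
open System.Collections.Generic

// Hypothetical sketch: 'TQueue stands in for IMessageQueue.
type MessageToQueuesMap<'TQueue>() =
    let queues = Dictionary<Type, 'TQueue list>()

    // Queues registered for a message type, in registration order.
    member x.QueuesFor (messageType : Type) : 'TQueue list =
        match queues.TryGetValue messageType with
        | true, existing -> existing
        | false, _ -> []

    // Events may fan out to any number of queues.
    member x.AddEventQueue (messageType : Type, queue : 'TQueue) =
        queues.[messageType] <- (x.QueuesFor messageType) @ [ queue ]

    // Requests must have exactly one handler, so a second registration is rejected.
    member x.SetRequestQueue (messageType : Type, queue : 'TQueue) =
        if queues.ContainsKey messageType then
            invalidOp (sprintf "A handler is already registered for request %s" messageType.Name)
        queues.[messageType] <- [ queue ]
```

With something like this in place, dispatching an event amounts to looking up the queues for the event's type and calling Publish on each, while a request goes to the single queue found for its type.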

Load Balancers

If a message handler cannot handle concurrent messages, it may be swamped by high message volumes whilst not utilising the available cores. As mentioned in the previous post, this can be solved by using a load balancer. There are two versions, one for handling events and one for requests. Both make use of Tomas Petricek’s BlockingQueueAgent class, which enables Producer/Consumer scenarios.

Rather than listing the code here, I’ll point you to the EventLoadBalancer and RequestLoadBalancer.

Wrapping Up

This post is already long enough and I’ve delayed publishing it for too long, so I’m going to defer further discussion for the next post. There I’ll discuss the pros and cons of the current implementation and also compare the concept to a pure Actor Model.
