Tuesday, September 24, 2013

EasyNetQ: IConsumerDispatcher

logo_design_150

EasyNetQ has always had a single dispatcher thread that runs user message handlers. This means that a slow handler on one queue can cause other handlers on other queues to wait. The intention is that one shouldn’t write long running consumers. If you are doing long running IO you should use the SubsribeAsync method and return a task that completes when the long running IO completes.

A recent change to EasyNetQ has been to make the dispatcher a separate abstraction. This means you can replace it with your own implementation if desired.

IConsumerDispatcher

Inside EasyNetQ the dispatcher receives deliveries from the RabbitMQ C# Client library and places the delivery information on an internal concurrent queue. By default, all consumers share a single internal queue. A single dispatcher thread pulls deliveries from the queue and then asks the consumer to invoke the user message handler.

The consumer dispatcher implementation is very simple, it simply maintains a queue of Action and a thread which takes those actions from the end of the queue and runs them. You could use the same pattern whenever you need to marshal an action onto a single thread. I wrote about this more general terms here.

public class ConsumerDispatcher : IConsumerDispatcher
{
    private readonly Thread dispatchThread;
    private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>();
    private bool disposed;

    public ConsumerDispatcher(IEasyNetQLogger logger)
    {
        Preconditions.CheckNotNull(logger, "logger");

        dispatchThread = new Thread(_ =>
            {
                try
                {
                    while (true)
                    {
                        if (disposed) break;

                        queue.Take()();
                    }
                }
                catch (InvalidOperationException)
                {
                    // InvalidOperationException is thrown when Take is called after 
                    // queue.CompleteAdding(), this is signals that this class is being
                    // disposed, so we allow the thread to complete.
                }
                catch (Exception exception)
                {
                    logger.ErrorWrite(exception);
                }
            }) { Name = "EasyNetQ consumer dispatch thread" };
        dispatchThread.Start();
    }

    public void QueueAction(Action action)
    {
        Preconditions.CheckNotNull(action, "action");
        queue.Add(action);
    }

    public void Dispose()
    {
        queue.CompleteAdding();
        disposed = true;
    }
}

An implementation of IConsumerDispatcherFactory maintains a single instance of IConsumerDispatcher:

public class ConsumerDispatcherFactory : IConsumerDispatcherFactory
{
    private readonly Lazy<IConsumerDispatcher> dispatcher;

    public ConsumerDispatcherFactory(IEasyNetQLogger logger)
    {
        Preconditions.CheckNotNull(logger, "logger");
        
        dispatcher = new Lazy<IConsumerDispatcher>(() => new ConsumerDispatcher(logger));
    }

    public IConsumerDispatcher GetConsumerDispatcher()
    {
        return dispatcher.Value;
    }
    
    public void Dispose()
    {
        if (dispatcher.IsValueCreated)
        {
            dispatcher.Value.Dispose();
        }
    }
}

If you wanted to use an alternative dispatch strategy; say for example you (quite reasonably) wanted a new dispatch thread for each consumer, you would simply implement an alternative IConsumerDispatcherFactory and register it with EasyNetQ like this:

var bus = RabbitHutch.CreateBus("host=localhost", 
    x => x.Register<IConsumerDispatcherFactory>(_ => new MyConsumerDispatcherFactory()));

Happy messaging!

Monday, September 23, 2013

RabbitMQ: AMQP Channel Best Practices

I’ve long been confused with best practices around AMQP channel handling. This post is my attempt to explain my current thoughts, partly in an attempt to elicit feedback.

The conversation between a message broker and a client is two-way. Both the client and the server can initiate ‘communication events’; the client can invoke a method on the server: ‘publish’ or ‘declare’, for example; and the server can invoke a method on the client such as ‘deliver’ or ‘reject’. Because the client needs to receive methods from the server, it needs to keep a connection open for its lifetime. This means that broker connections may last long periods; hours, days, or weeks. Maintaining these connections is expensive, both for the client and the server. In order to have many logical connections without the overhead of many physical TCP/IP connections, AMQP has the concept of ‘channel’ (confusingly represented by the IModel interface in the Java and .NET clients). You can create multiple channels on a single connection and it’s relatively cheap to create and dispose of them.

But what are the recommended rules for handling these channels? Should I create a new one for every operation? Or should I just keep a single one around and execute every method on it?

First some hard and fast rules (note I’m using the RabbitMQ .NET client for reference here, other clients might have different behaviour):

Channels are not thread safe. You should create and use a channel on a single thread. From the .NET client documentation (section 2.10):

“In general, IModel instances should not be used by more than one thread simultaneously: application code should maintain a clear notion of thread ownership for IModel instances.”

Apparently this is not a hard and fast rule with the Java client, just good advice. The java client serializes all calls to a channel.

An ACK should be invoked on the same channel on which the delivery was received. The delivery tag is scoped to the channel and an ACK sent to a different channel from which the delivery was received will cause a channel error. This somewhat contradicts the ‘Channels are not thread safe’ directive above since you will probably ACK on a different thread from one where you invoked basic.consume, but this seems to be acceptable.

Now some softer suggestions:

It seems neater to create a channel for each consumer. I like thinking of the consumer and channel as a unit: when I create a consumer, I also create a channel for it to consume from. When the consumer goes away, so does the channel and vice-versa.

Do not mix publishing and consuming on the same channel. If you follow the suggestion above, it implies that you have dedicated channels for each consumer. It follows that you should create separate channels for server and client originated methods.

Maintain a long running publish channel. My current implementation of EasyNetQ makes creating channels for publishing the responsibility of the user. I now think this is mistake. It encourages the pattern of: create a channel, publish, dispose the channel. This pattern doesn’t work with publisher confirms where you need to keep the channel open at least until you receive an ACK or NACK. And although creating channels is relatively lightweight, there is still some overhead. I now favour the approach of maintaining a single publish channel on a dedicated thread and marshalling publish (and declare) calls to it. This is potentially a bottleneck, but the impressive performance of non-transactional publishing means that I’m not overly concerned about it.

Thursday, September 12, 2013

EasyNetQ: Topic Confusion!

This is a quick post to highlight a common cause of confusion when people play with topics in EasyNetQ.

You can subscribe to a message type with a topic like this:

bus.Subscribe<MyMessage>("id1", myHandler, x => x.WithTopic("X.*"));

Topics are dot separated strings that match with the routing key attached to a message at publication. In the code above I’ve said, give me any message of type MyMessage who’s topic matches “x.*”. The ‘*’ character is a wildcard, so “X.A” would match, as would “X.Z”, but “Y.A” wouldn’t.

You publish with a topic like this:

using (var publishChannel = bus.OpenPublishChannel())
{
    publishChannel.Publish(myMessage, x => x.WithTopic("X.A"));
}

The confusion occurs when the topic in the subscribe call changes. Maybe you are experimenting with topics by changing the string in the WithTopic( … ) method, or perhaps you are hoping to dynamically change the topic at runtime? Maybe you’ve done several subscribes, each with a different handler and a different topic, but the same subscription Id. Either way, you’ll probably be surprised to find that you still get all the messages matched by previously set topics as well as those matched by the current topic.

In order to explain why this happens, let’s look at how EasyNetQ creates exchanges, queues and bindings when you call these API methods.

If I call the subscribe method above, EasyNetQ will declare a topic-exchange named after the type, a queue named after the type and the subscription id, and a bind them with the given topic:

queue_binding_with_topic

If I change the topic and run the code again like this:

bus.Subscribe<MyMessage>("id1", myHandler, x => x.WithTopic("Y.*"));

EasyNetQ will declare the same queue and exchange. The word ‘declare’ is the key here, it means, “if the object doesn’t exist, create it, otherwise do nothing.” The queue and exchange already exist, so declaring them has no effect. EasyNetQ then binds them with the given routing key. The original binding still exists – we haven’t done anything to remove it – so we now have two bindings with two different topics between the exchange and the queue:

queue_binding_with_topic2

This means that any message matching ‘X.*’ or ‘Y.*’ will be routed to the queue, and thus to our consumer.

So, beware when playing with topics!

As an aside, the ability to create multiple bindings is a very powerful feature. It allows you to implement ‘OR’ semantics for routing messages. If this is what you want, you should concatenate multiple WithTopic methods rather than make multiple calls to Subscribe. For example, say we wanted to implement ‘*.B OR Y.*’:

bus.Subscribe<MyMessage>("id4", myHandler, x => x.WithTopic("Y.*").WithTopic("*.B"));

Which would give us the desired result:

queue_binding_with_topic3

Happy routing!

Tuesday, September 10, 2013

EasyNetQ: Big Breaking Changes in the Advanced Bus

logo_design_240
EasyNetQ is my little, easy to use, client API for RabbitMQ. It’s been doing really well recently. As I write this it has 24,653 downloads on NuGet making it by far the most popular high-level RabbitMQ API.
The goal of EasyNetQ is to make working with RabbitMQ as easy as possible. I wanted junior developers to be able to use basic messaging patterns out-of-the-box with just a few lines of code and have EasyNetQ do all the heavy lifting: exchange-binding-queue configuration, error management, connection management, serialization, thread handling; all the things that make working against the low level AMQP C# API, provided by RabbitMQ, such a steep learning curve.
To meet this goal, EasyNetQ has to be a very opinionated library. It has a set way of configuring exchanges, bindings and queues based the .NET type of your messages. However, right from the first release, many users said that they liked the connection management, thread handling, and error management, but wanted to be able to set up their own broker topology. To support this we introduced the advanced API, an idea stolen shamelessly from Ayende’s RavenDb client.
You access the advanced bus (IAdvancedBus) via the Advanced property on IBus:
var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced;

Sometimes something can seem like a good idea at the time, and then later you think, “WTF! Why on earth did I do that?” It happens to me all the time. I thought it would be cool if one created the exchange-binding-queue topology and then passed it to the publish and subscribe methods, which would then internally declare the exchanges and queues and do the binding. I implemented a tasty little visitor pattern in my ITopologyVisitor. I optimised for (my) programming fun rather than an a simple, obvious, easy to understand API.
I realised a while ago that a more straightforward set of declares on IAdvancedBus would be a far more obvious and intentional design. To this end, I’ve refactored the advanced bus to separate declares from publishing and consuming. I just pushed the changes to NuGet and have also updated the Advanced Bus documentation. Note these are breaking changes, so please be careful if you are upgrading to the latest version, 0.12, and upwards.
Here are some tasters of how it works:
Declare a queue, exchange and binding, and consume raw message bytes:
var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced;

var queue = advancedBus.QueueDeclare("my_queue");
var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Direct);
advancedBus.Bind(exchange, queue, "routing_key");

advancedBus.Consume(queue, (body, properties, info) => Task.Factory.StartNew(() =>
    {
        var message = Encoding.UTF8.GetString(body);
        Console.Out.WriteLine("Got message: '{0}'", message);
    }));

Note I’ve renamed ‘Subscribe’ to ‘Consume’ to better reflect the underlying AMQP method.
Declare an exchange and publish a message:
var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced;

var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Direct);

using (var channel = advancedBus.OpenPublishChannel())
{
    var body = Encoding.UTF8.GetBytes("Hello World!");
    channel.Publish(exchange, "routing_key", new MessageProperties(), body);
}

You can also delete exchanges, queues and bindings:
var advancedBus = RabbitHutch.CreateBus("host=localhost").Advanced;

// declare some objects
var queue = advancedBus.QueueDeclare("my_queue");
var exchange = advancedBus.ExchangeDeclare("my_exchange", ExchangeType.Direct);
var binding = advancedBus.Bind(exchange, queue, "routing_key");

// and then delete them
advancedBus.BindingDelete(binding);
advancedBus.ExchangeDelete(exchange);
advancedBus.QueueDelete(queue);

advancedBus.Dispose();

I think these changes make for a much better advanced API. Have a look at the documentation for the details.