Monday, May 30, 2011

Dependency Injection Haskell Style

Today I was thinking about dependency injection and Haskell. If we think about how an IoC container in a language like C# works, there are several pieces:

  1. Services are described by types (usually interfaces).
  2. Components (classes) describe their dependencies with the types of their constructor arguments.
  3. The components in turn describe the services they provide by implementing service interfaces.
  4. Service types are registered against implementation types using some API provided by the IoC container.
  5. A clever piece of framework (the IoC container) that, when asked for a service (described by an interface), creates the entire dependency chain and then passes it to the caller.

The important point is that we don’t have to manually wire up the dependency chain. We simply build our software in terms of service contracts and implementations, register them with the IoC container which then magically does the wiring up for us.
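To make those five pieces concrete, here is a minimal sketch of a toy container in C#. It is not how Windsor, Autofac or any real container is written; ToyContainer and the IGetReport, ISendReport and ProcessReport types are names invented for this illustration. It only shows the basic idea: look up the implementation type, inspect its constructor, and resolve each argument recursively.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical service contracts (piece 1).
public interface IGetReport { string Get(int id); }
public interface ISendReport { void Send(string report); }

// Hypothetical components implementing the services (piece 3).
public class GetReport : IGetReport
{
    public string Get(int id) => "Report " + id;
}

public class SendReport : ISendReport
{
    public void Send(string report) => Console.WriteLine(report);
}

// A component that declares its dependencies as constructor arguments (piece 2).
public class ProcessReport
{
    private readonly IGetReport get;
    private readonly ISendReport send;

    public ProcessReport(IGetReport get, ISendReport send)
    {
        this.get = get;
        this.send = send;
    }

    public void Process(int id) => send.Send(get.Get(id));
}

// A toy container (pieces 4 and 5). Assumption: one public constructor per component.
public class ToyContainer
{
    private readonly Dictionary<Type, Type> registrations = new Dictionary<Type, Type>();

    public void Register<TService, TImplementation>() where TImplementation : TService
    {
        registrations[typeof(TService)] = typeof(TImplementation);
    }

    public T Resolve<T>() => (T)Resolve(typeof(T));

    private object Resolve(Type service)
    {
        // A missing registration is only discovered here, at run time.
        if (!registrations.TryGetValue(service, out var implementation))
            throw new InvalidOperationException("No registration for " + service.Name);

        // Reflection: read the constructor's parameter types and resolve each one recursively.
        var constructor = implementation.GetConstructors().First();
        var arguments = constructor.GetParameters()
            .Select(parameter => Resolve(parameter.ParameterType))
            .ToArray();
        return constructor.Invoke(arguments);
    }
}

public static class Program
{
    public static void Main()
    {
        var container = new ToyContainer();
        container.Register<IGetReport, GetReport>();
        container.Register<ISendReport, SendReport>();
        container.Register<ProcessReport, ProcessReport>();

        // The container builds GetReport and SendReport, then passes them to ProcessReport.
        container.Resolve<ProcessReport>().Process(23); // prints "Report 23"
    }
}
```

Nothing in Main mentions how ProcessReport gets its dependencies; the container works that out from the constructor signature.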

If you dig into the source code for any of the .NET IoC containers (such as Windsor, StructureMap, Autofac, Ninject etc) you’ll see they do an awful lot of reflection to work out the dependency graph. I remember reading (or hearing someone say) that reflection is often used to make up for inadequacies in C#’s type system, so I started experimenting to see if Haskell could provide the decoupling of registration and dependency graph building without the need for an IoC container. And indeed it can.

First let’s define some services:

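{-# LANGUAGE FlexibleInstances #-}  -- needed for the Resolvable instances on function types below

-- a report has an id and some text (definition assumed; it matches the output shown at the end)
data Report = Report Int String deriving Show

-- this could talk to a database
type GetReport = Int -> IO Report
-- this could talk to an email API
type SendReport = Report -> IO ()
-- this takes a report id and does something with it
type ProcessReport = Int -> IO ()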

Now let’s define some implementations:

-- getReport simply creates a new report with the given id
getReport :: GetReport
getReport id =
    return $ Report id "Hello"

-- sendReport simply prints the report
sendReport :: SendReport
sendReport report = putStr $ show report

-- processReport uses a GetReport and a SendReport to process a report
processReport :: GetReport -> SendReport -> ProcessReport
processReport get send id = do
    r <- get id
    send r

Partial function application is equivalent to dependency injection in OO. Here our processReport’s dependencies are given as the first two arguments of the processReport function.
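For readers more at home in C#, roughly the same shape can be sketched with delegates. This is only an illustration: the names mirror the Haskell ones, a report is simplified to a string, and C# has no built-in partial application, so ProcessReport is written to take its dependencies and return the function that does the work.

```csharp
using System;

public static class Program
{
    // Hypothetical C# counterparts of the GetReport and SendReport implementations.
    public static readonly Func<int, string> GetReport = id => "Report " + id;
    public static readonly Action<string> SendReport = report => Console.WriteLine(report);

    // The dependencies come first; supplying them returns the function that processes a report.
    public static Action<int> ProcessReport(Func<int, string> get, Action<string> send)
    {
        return id => send(get(id));
    }

    public static void Main()
    {
        // "Injecting" the dependencies is just supplying the first two arguments.
        Action<int> process = ProcessReport(GetReport, SendReport);
        process(23); // prints "Report 23"

        // Swapping in other implementations (for a test, say) needs no container either.
        Action<int> quiet = ProcessReport(id => "Test " + id, report => { });
        quiet(23);   // prints nothing
    }
}
```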

Now let’s define a type class with a ‘resolve’ member. The resolve member isn’t a function as such, it’s just a label for whatever ‘a’ happens to be when we define instances of the type class:

class Resolvable a where
    resolve :: a

Now let’s make each of our services an instance of ‘Resolvable’, and ‘register’ the implementation for each service:

instance Resolvable GetReport where
    resolve = getReport

instance Resolvable SendReport where
    resolve = sendReport

instance Resolvable ProcessReport where
    resolve = processReport resolve resolve

Note that we partially apply processReport with two resolve calls that will provide implementations of the service types.

The whole caboodle compiles and we can use resolve to grab a ProcessReport implementation with its dependencies provided:

> let p = resolve :: ProcessReport
> p 23
Report 23 "Hello"

All the functionality of an IoC container without an IoC container. Wonderful.

So, to sum up, we’ve simply registered implementations against services and let the Haskell type system build the dependency graph for us. The added benefit we get here over reflection-based IoC containers in C# and Java is that this is all type checked at compile time. No need to run it to find missing dependencies.

Please bear in mind that I’m a total Haskell novice and this is probably something that real Haskell programmers would never do. But it’s an interesting illustration of the power of the Haskell type system, and of how much of what we do with reflection in C# is simply there to work around the limitations of the language.

Thursday, May 19, 2011

FuturePublish with the EasyNetQ RabbitMQ API

If you’re a regular reader, you’ll remember that I published an initial version of EasyNetQ, my .NET friendly API for RabbitMQ, earlier this month. Today I want to show off a little addition that allows messages to be scheduled for publishing at a future date.

Many business scenarios require some kind of scheduling. For example, say I want to send a party invitation, but I know that it will be forgotten if I send it too early. Instead I want it to arrive two days before the party. I’d like to ‘future publish’ my invite at the time I’m planning my party, and let the messaging system worry about sending it two days beforehand.

I’ve added a FuturePublish method to EasyNetQ. You simply give it a message and the time that you want it sent.

var invitation = new PartyInvitation
{
    Text = "Please come to my party",
    Date = new DateTime(2011, 5, 24)
};

bus.FuturePublish(invitation.Date.AddDays(-2), invitation);

That’s cool, how does it work?

Internally the FuturePublish method wraps the given message in a special ‘Schedule Me’ message. This message is then published to Rabbit as normal. A Windows service, EasyNetQ.Scheduler, subscribes to the Schedule Me messages and writes them to its database. At a pre-defined interval, it polls its database looking for messages whose publish time has arrived, retrieves the wrapped message and publishes it.
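The flow can be sketched in a few lines of self-contained C#. This is not the EasyNetQ.Scheduler source: ScheduleMe, ToyScheduler and their members are hypothetical names, a List stands in for the database, and the sketch assumes a message is due once its wake time is at or before the poll time.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical envelope for a message that should be published later.
public class ScheduleMe
{
    public DateTime WakeTime { get; set; }
    public string WrappedMessage { get; set; } // the original message, serialized
}

// Stands in for the scheduler service; a List replaces its database.
public class ToyScheduler
{
    private readonly List<ScheduleMe> store = new List<ScheduleMe>();
    private readonly Action<string> publish;

    public ToyScheduler(Action<string> publish)
    {
        this.publish = publish;
    }

    // Called for each Schedule Me message received from the bus.
    public void OnScheduleMe(ScheduleMe message)
    {
        store.Add(message);
    }

    // Called at each polling interval: publish everything that is due, earliest first.
    public void Poll(DateTime now)
    {
        var due = store.Where(m => m.WakeTime <= now).OrderBy(m => m.WakeTime).ToList();
        foreach (var message in due)
        {
            publish(message.WrappedMessage);
            store.Remove(message);
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var scheduler = new ToyScheduler(body => Console.WriteLine("Publishing: " + body));

        var partyDate = new DateTime(2011, 5, 24);
        scheduler.OnScheduleMe(new ScheduleMe
        {
            WakeTime = partyDate.AddDays(-2),
            WrappedMessage = "Please come to my party"
        });

        scheduler.Poll(new DateTime(2011, 5, 21)); // too early: nothing is published
        scheduler.Poll(new DateTime(2011, 5, 22)); // due: prints "Publishing: Please come to my party"
        scheduler.Poll(new DateTime(2011, 5, 23)); // already sent: nothing is published
    }
}
```

Because the publisher only ever sends an ordinary message, all of the timing logic lives in the scheduler; the trade-off is that delivery is only as precise as the polling interval.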

Check out the source on GitHub here:

https://github.com/mikehadlow/EasyNetQ

Wednesday, May 04, 2011

EasyNetQ, a simple .NET API for RabbitMQ

After pondering the results of our message queue shootout, we decided to run with RabbitMQ. Rabbit ticks all of the boxes: it’s supported (by SpringSource, and ultimately by VMware), it scales, and it has the features and performance we need. The RabbitMQ.Client provided by SpringSource is a thin wrapper that quite faithfully exposes the AMQP protocol, so it expects messages as byte arrays.

For the shootout tests, spraying byte arrays around was fine, but in the real world we want our messages to be .NET types. I also wanted to provide developers with a very simple API that abstracted away the Exchange/Binding/Queue model of AMQP and instead provided a simple publish/subscribe and request/response model. My inspiration was the excellent work done by Dru Sellers and Chris Patterson with MassTransit (the new V2.0 beta is just out).

The code is on GitHub here:

https://github.com/mikehadlow/EasyNetQ

The API centres around an IBus interface that looks like this:

/// <summary>
/// Provides a simple Publish/Subscribe and Request/Response API for a message bus.
/// </summary>
public interface IBus : IDisposable
{
    /// <summary>
    /// Publishes a message.
    /// </summary>
    /// <typeparam name="T">The message type</typeparam>
    /// <param name="message">The message to publish</param>
    void Publish<T>(T message);

    /// <summary>
    /// Subscribes to a stream of messages that match a .NET type.
    /// </summary>
    /// <typeparam name="T">The type to subscribe to</typeparam>
    /// <param name="subscriptionId">
    /// A unique identifier for the subscription. Two subscriptions with the same subscriptionId
    /// and type will get messages delivered in turn. This is useful if you want multiple subscribers
    /// to load balance a subscription in a round-robin fashion.
    /// </param>
    /// <param name="onMessage">
    /// The action to run when a message arrives.
    /// </param>
    void Subscribe<T>(string subscriptionId, Action<T> onMessage);

    /// <summary>
    /// Makes an RPC style asynchronous request.
    /// </summary>
    /// <typeparam name="TRequest">The request type.</typeparam>
    /// <typeparam name="TResponse">The response type.</typeparam>
    /// <param name="request">The request message.</param>
    /// <param name="onResponse">The action to run when the response is received.</param>
    void Request<TRequest, TResponse>(TRequest request, Action<TResponse> onResponse);

    /// <summary>
    /// Responds to an RPC request.
    /// </summary>
    /// <typeparam name="TRequest">The request type.</typeparam>
    /// <typeparam name="TResponse">The response type.</typeparam>
    /// <param name="responder">
    /// A function to run when the request is received. It should return the response.
    /// </param>
    void Respond<TRequest, TResponse>(Func<TRequest, TResponse> responder);
}

To create a bus, just use a RabbitHutch (sorry, I couldn’t resist it :)

var bus = RabbitHutch.CreateRabbitBus("localhost");

You can just pass in the name of the server to use the default Rabbit virtual host ‘/’, or you can specify a named virtual host like this:

var bus = RabbitHutch.CreateRabbitBus("localhost/myVirtualHost");

The first messaging pattern I wanted to support was publish/subscribe. Once you’ve got a bus instance, you can publish a message like this:

var message = new MyMessage {Text = "Hello!"};
bus.Publish(message);

This publishes the message to an exchange named by the message type.

You subscribe to a message like this:

bus.Subscribe<MyMessage>("test", message => Console.WriteLine(message.Text));

This creates a queue named ‘test_<message type>’ and binds it to the message type’s exchange. When a message is received it is passed to the Action<T> delegate. If there is more than one subscriber to the same message type named ‘test’, Rabbit will hand out the messages in a round-robin fashion, so you get simple load balancing out of the box. Subscribers to the same message type but with different names will each get a copy of the message, as you’d expect.
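Those delivery rules are easy to model in memory. The sketch below is a toy stand-in, not EasyNetQ or RabbitMQ code: ToyExchange is a hypothetical class that keeps one queue per subscription id, gives every queue a copy of each message, and lets the consumers of a queue take turns.

```csharp
using System;
using System.Collections.Generic;

// A toy, in-memory stand-in for one message type's exchange.
public class ToyExchange<T>
{
    // One queue per subscription id; each queue holds its competing consumers.
    private readonly Dictionary<string, List<Action<T>>> queues =
        new Dictionary<string, List<Action<T>>>();
    // The position of the next consumer to use in each queue.
    private readonly Dictionary<string, int> next = new Dictionary<string, int>();

    public void Subscribe(string subscriptionId, Action<T> onMessage)
    {
        // Queue name of the form '<subscription id>_<message type>'.
        var queueName = subscriptionId + "_" + typeof(T).Name;
        if (!queues.ContainsKey(queueName))
        {
            queues[queueName] = new List<Action<T>>();
            next[queueName] = 0;
        }
        queues[queueName].Add(onMessage);
    }

    public void Publish(T message)
    {
        // Every queue gets a copy; within a queue, consumers take turns.
        foreach (var queue in queues)
        {
            var consumers = queue.Value;
            var index = next[queue.Key];
            consumers[index](message);
            next[queue.Key] = (index + 1) % consumers.Count;
        }
    }
}

public static class Program
{
    public static void Main()
    {
        var exchange = new ToyExchange<string>();
        exchange.Subscribe("test", m => Console.WriteLine("test A: " + m));
        exchange.Subscribe("test", m => Console.WriteLine("test B: " + m));
        exchange.Subscribe("audit", m => Console.WriteLine("audit: " + m));

        // "test A" receives one and three, "test B" receives two,
        // and "audit" receives all three.
        exchange.Publish("one");
        exchange.Publish("two");
        exchange.Publish("three");
    }
}
```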

The second messaging pattern is an asynchronous RPC. You can call a remote service like this:

var request = new TestRequestMessage {Text = "Hello from the client! "};

bus.Request<TestRequestMessage, TestResponseMessage>(request, response =>
    Console.WriteLine("Got response: '{0}'", response.Text));

This first creates a new temporary queue for the TestResponseMessage. It then publishes the TestRequestMessage with a return address to the temporary queue. When the TestResponseMessage is received, it passes it to the Action<T> delegate. RabbitMQ happily creates temporary queues and provides a return address header, so this was very easy to implement.

To write an RPC server, simply use the Respond method like this:

bus.Respond<TestRequestMessage, TestResponseMessage>(request =>
    new TestResponseMessage { Text = request.Text + " all done!" });

This creates a subscription for the TestRequestMessage. When a message is received, the Func<TRequest, TResponse> delegate is passed the request and returns the response. The response message is then published to the temporary client queue.
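The request/response round trip, with its temporary queue and return address, can be modelled with a toy broker. Again this is a sketch under assumptions rather than EasyNetQ’s implementation: ToyBroker and Envelope are hypothetical names, messages are plain strings, and everything runs synchronously in memory, whereas the real thing is asynchronous.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical envelope: a body plus the name of the queue to reply to.
public class Envelope
{
    public string Body { get; set; }
    public string ReplyTo { get; set; }
}

// A toy, synchronous, in-memory broker: each queue name maps to one consumer.
public class ToyBroker
{
    private readonly Dictionary<string, Action<Envelope>> queues =
        new Dictionary<string, Action<Envelope>>();

    public void Respond(string requestQueue, Func<string, string> responder)
    {
        // The server consumes requests and publishes each response to the return address.
        queues[requestQueue] = request =>
            Send(request.ReplyTo, new Envelope { Body = responder(request.Body) });
    }

    public void Request(string requestQueue, string body, Action<string> onResponse)
    {
        // The client first creates a temporary queue for the response...
        var replyQueue = "reply_" + Guid.NewGuid();
        queues[replyQueue] = response =>
        {
            queues.Remove(replyQueue); // temporary: removed once the response arrives
            onResponse(response.Body);
        };

        // ...then publishes the request with that queue as its return address.
        Send(requestQueue, new Envelope { Body = body, ReplyTo = replyQueue });
    }

    private void Send(string queue, Envelope message)
    {
        queues[queue](message);
    }
}

public static class Program
{
    public static void Main()
    {
        var broker = new ToyBroker();
        broker.Respond("requests", text => text + " all done!");

        // prints: Got response: 'Hello from the client! all done!'
        broker.Request("requests", "Hello from the client!",
            response => Console.WriteLine("Got response: '{0}'", response));
    }
}
```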

Once again, scaling RPC servers is simply a question of running up new instances. Rabbit will automatically distribute messages to them.

The features of AMQP (and Rabbit) make creating this kind of API a breeze. Check it out and let me know what you think.