Qluent
Qluent is a Fluent Queue Client for Azure storage queues
Qluent is a simple fluent API and set of wrapper classes around the Microsoft Azure Storage SDK. It lets you interact with storage queues using strongly typed objects, as in the code snippet below. You can see lots of other ways to use it in the documentation on GitHub.
var queue = await Builder
    .CreateAQueueOf<Entity>()
    .UsingStorageQueue("my-entity-queue")
    .BuildAsync();

await queue.PushAsync(new Entity());
So why did I build this?
Back in March, some colleagues and I ran into an issue with a legacy project that we’d inherited at work. At random times a queue consumer would just get stuck and stop dequeuing messages. When we went to debug it, we discovered the code responsible for dequeuing, processing and deleting the message was buried in an assembly, and the source code was … unavailable.
After much hair pulling and assembly decompilation, we eventually tracked down the bug, but it got me thinking about a couple of things:
- Setting up Azure storage queues through the SDK is a little tedious. There’s quite a bit of ceremony involved to create a CloudStorageAccount, CloudQueueClient and CloudQueue, to ensure the queue exists and to deal with serialization/deserialization.
- There are some aspects of the SDK I dislike. Specifying the large majority of settings on the methods (such as message visibility timeouts), rather than as configuration on the CloudQueueClient itself, seems wrong. It leaves lots of sharp corners for developers to get caught on after they fetch a queue from DI and want to interact with it.
- There are lots of tricky scenarios to account for even in simple messaging use cases, such as idempotency issues, handling retries and dealing with poison messages.
- Developers shouldn’t need to worry about writing consumers/dispatchers. They should just need to worry about getting their message handled.
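For comparison, here is a rough sketch of the ceremony described in the first point, written directly against the raw SDK. It assumes the WindowsAzure.Storage and Newtonsoft.Json NuGet packages and the local storage emulator. The Entity class and the RawSdkExample helper are hypothetical names used only for illustration, and this is not how Qluent itself is implemented.

```csharp
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;        // assumption: WindowsAzure.Storage package
using Microsoft.WindowsAzure.Storage.Queue;
using Newtonsoft.Json;                       // assumption: JSON is the payload format

// Hypothetical payload type, for illustration only.
public class Entity
{
    public string Property { get; set; }
}

public static class RawSdkExample
{
    public static async Task PushEntityAsync(Entity entity)
    {
        // 1. Parse the connection string into an account object.
        CloudStorageAccount account = CloudStorageAccount.Parse("UseDevelopmentStorage=true");

        // 2. Create a queue client from the account.
        CloudQueueClient client = account.CreateCloudQueueClient();

        // 3. Get a reference to the queue and make sure it exists.
        CloudQueue queue = client.GetQueueReference("my-entity-queue");
        await queue.CreateIfNotExistsAsync();

        // 4. Serialize the payload by hand and wrap it in a queue message.
        string json = JsonConvert.SerializeObject(entity);
        await queue.AddMessageAsync(new CloudQueueMessage(json));
    }
}
```

Every producer and consumer ends up repeating some version of these four steps, which is the boilerplate a fluent builder can hide.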
The Goal: Keep it simple
What I really wanted to provide was a very simple fluent API for creating a CloudQueue and a message consumer around that CloudQueue. Creating a consumer is simply a matter of providing a type, a queue and a message handler, and starting it up.
var consumer = Builder
    .CreateAConsumerFor<Entity>()
    .UsingQueue(queue)
    .ThatHandlesMessagesUsing((msg) =>
    {
        Console.WriteLine($"Processing {msg.Value.Property}");
        return true;
    })
    .Build();

await consumer.Start();
The library is intentionally meant to simplify things. I often find myself having to scaffold something and spending way too long on the infrastructure code to support message queuing when I should be focusing on the actual problem I’m trying to solve. That’s what this is for. It is a simple wrapper around Azure storage queues to make working with them a little easier.
However, there are lots of complicated things you may find yourself needing to do in a distributed environment: complex retry policies; complicated routing paths; pub/sub models involving topics and queues; the list goes on. If that’s the case, then perhaps you should be looking at a different technology stack (Azure Service Bus, Event Hubs, Event Grid, Kafka, NServiceBus, MuleSoft etc.).
Below you can see some of the features that the library supports.
Features
Creating a Queue
Queues can be created by simply specifying a storage account, a queue name and a type for your message payload. You can purge the queue and obtain an approximate count of the messages on it. All operations are async/awaitable, and all support taking a CancellationToken.
var q = await Builder
    .CreateAQueueOf<Person>()
    .ConnectedToAccount("UseDevelopmentStorage=true")
    .UsingStorageQueue("my-test-queue")
    .BuildAsync();

await q.PurgeAsync();
var count = await q.CountAsync();
Basic Push/Pop Operations
Basic queue operations include push, pop and peek for one or multiple messages.
var person = new Person("Eoin");
await q.PushAsync(person);
var peekedPerson = await q.PeekAsync();
var poppedPerson = await q.PopAsync();
IEnumerable<Person> peekedPeople = await q.PeekAsync(5);
IEnumerable<Person> poppedPeople = await q.PopAsync(5);
Receipted Deletes
You can also control the deletion of messages from the CloudQueue using the Get and Delete methods. Under the hood this uses PopReceipts to subsequently remove the message. If the message is not deleted before its visibility timeout expires, it will reappear on the queue.
var wrappedMessage = await q.GetAsync();
try
{
    // attempt to process wrappedMessage.Value
    await q.DeleteAsync(wrappedMessage);
}
catch (Exception ex)
{
    // message will reappear on the queue after the visibility timeout
}
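To make the pop receipt mechanics a bit more concrete, here is a minimal sketch of the same get/process/delete cycle using the raw SDK. It assumes the WindowsAzure.Storage and Newtonsoft.Json packages, a JSON payload, and a 30 second visibility timeout chosen arbitrarily. The Person shape and the ReceiptedDeleteExample helper are hypothetical, and this is an illustration of the underlying SDK behaviour rather than Qluent’s actual internals.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage.Queue;  // assumption: WindowsAzure.Storage package
using Newtonsoft.Json;                       // assumption: JSON is the payload format

// Hypothetical payload type, for illustration only.
public class Person
{
    public string Name { get; set; }
}

public static class ReceiptedDeleteExample
{
    public static async Task ProcessOneAsync(CloudQueue queue)
    {
        // Dequeue one message and hide it from other consumers for 30 seconds.
        // The returned message carries the Id and PopReceipt needed to delete it.
        CloudQueueMessage message =
            await queue.GetMessageAsync(TimeSpan.FromSeconds(30), null, null);

        if (message == null)
        {
            return; // nothing on the queue right now
        }

        Person person = JsonConvert.DeserializeObject<Person>(message.AsString);
        Console.WriteLine($"Processing {person.Name}");

        // Only reached if processing did not throw. If an exception escapes
        // before this line, the message is never deleted and becomes visible
        // again once the 30 second timeout expires.
        await queue.DeleteMessageAsync(message);
    }
}
```

Because the delete only happens after successful processing, a crash mid-handler results in redelivery rather than a lost message, which is also why handlers need to tolerate seeing the same message more than once.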
Queues can also be configured to support:
- Delayed visibility of messages
- Message TTLs
- Visibility timeouts for dequeue events
- Automatic rerouting of poison messages after a number of dequeue & deserialize attempts
- Customized object serialization
The message consumer provides a simple way to asynchronously poll a queue. It supports:
- Message Handlers
- Failed Processing Handlers (Fallback)
- Exception Handlers
- Flow control for when exceptions occur (Exit or Continue)
- Custom Queue Polling Policies
- Integration with NLog for Logging
There’s more detailed info in the GitHub repo’s README.md.
Get It on NuGet
I’d really appreciate feedback on it, so if you want to try it out, you can get it on NuGet.
Eoin Campbell