One of the difficulties of working with Azure Service Bus queues is deciding what to do with poisoned messages, that is, messages that simply cannot be processed or completed.
For example, suppose you have an Azure Cloud Services worker role that pulls messages off a Service Bus queue. The queue contains the names of XML files in an Azure Storage blob container. Your worker process retrieves those files, parses them, and loads the information into a SQL Server database for use by other processes.
How do you gracefully handle the presence of a corrupted or otherwise unusable XML file?
How do you handle temporary interruptions or faults when communicating with SQL Server, or other seemingly recoverable errors?
A corrupt or unusable XML file is never going to process properly. In that case, our worker process should log (if not raise some sort of alert) that the file is unreadable, then give up on that queue message by marking it complete.
In the case of an error talking to SQL Server, we will probably want to try again in a little bit. This is especially true if each XML file represents a brand-new record for our database.
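Broadly, that distinction might look like the following in the message handler (the full worker role code appears later in this post). This is only a minimal sketch: ProcessFile is a hypothetical helper that downloads, parses and saves one XML file, and I'm switching off auto-complete so the handler decides explicitly what happens to each message.

_client.OnMessage(receivedMessage =>
{
    try
    {
        ProcessFile(receivedMessage.Label); // hypothetical: download, parse and save one XML file
        receivedMessage.Complete();         // success: remove the message from the queue
    }
    catch (System.Xml.XmlException ex)
    {
        // the file will never parse; log it and complete the message so we stop retrying
        Log.Error("Unreadable XML in {0}: {1}", receivedMessage.Label, ex.Message);
        receivedMessage.Complete();
    }
    catch (System.Data.SqlClient.SqlException ex)
    {
        // quite possibly transient; abandon the message so it returns to the queue for another try
        Log.Warn("SQL error while processing {0}: {1}", receivedMessage.Label, ex.Message);
        receivedMessage.Abandon();
    }
}, new OnMessageOptions { AutoComplete = false });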

Beware Abandoned Message Concurrency
This brings up the issue of message concurrency, which can be problematic when working with Azure Service Bus queues.
It may be that these XML files update existing database entries; perhaps a single record can be updated many times by these XML files. In that case, processing the XML files in the right order is critical; we don't want old data overwriting new data. (That is, we want to avoid race conditions.)
Because we need to process the XML files in the right order, we almost certainly want to discard (complete) any message that doesn’t process correctly on its first try, since reprocessing it later could overwrite newer data.
For example, suppose it takes 5 seconds for our worker role to process an XML file. A file received at 5:00:00 p.m. fails to correctly process, and is abandoned at 5:00:05 p.m.
Suppose, as that file was being processed, a newer XML file showed up at 5:00:04 p.m.
If the original file had properly processed, that wouldn’t be an issue; the older file’s values would have been superseded by the newer file, which is probably the next message in the queue.

But because we abandoned the older file's message, it is returned to the queue to be picked up again on the worker process's next pass, and we've effectively inverted the processing order for these XML files.
Service Bus queues normally deliver messages in the order they were enqueued, but once a message has been abandoned you can no longer rely on that: with more than one worker instance, concurrent message pumps, or prefetching, the newer XML file may well be picked up and processed before the older file's message is redelivered. The older file then gets processed last, overwriting the newer data with the older!

This probably isn't a problem on a low-volume queue, where the chance that a newer file shows up while you're processing an older file is small.
But if the chances are good that in the time it takes to process a message, a newer message can arrive that supersedes the message you’re working on, you almost certainly don’t want to abandon a poison message.
You probably want to complete it, log the failure (and somehow report it), since the next record update will be along shortly.
using System;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;
using NLog;

namespace WorkerRoleWithSBQueue1
{
    public class WorkerRole : RoleEntryPoint
    {
        const string QueueName = "MY-QUEUE-NAME";

        QueueClient _client;
        readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);

        // I like to use NLog for logging; it has native support for Trace logging to Azure Diagnostics
        private static readonly Logger Log = LogManager.GetCurrentClassLogger();

        public override void Run()
        {
            Log.Trace("Starting processing of messages");

            _client.OnMessage(receivedMessage =>
            {
                try
                {
                    // message is OK if try succeeds
                    Log.Info("Processing Service Bus message: {0}", receivedMessage.Label);

                    // your processing code here

                    Log.Info("Processing of {0} complete.", receivedMessage.Label);
                }
                catch (Exception ex)
                {
                    // Handle any message processing specific exceptions here
                    Log.Error("Could not process {0}. Reason: {1}", receivedMessage.Label, ex.Message);
                }
                finally
                {
                    // complete the message, regardless of success or failure
                    receivedMessage.Complete();
                }
            });

            _completedEvent.WaitOne();
        }

        public override bool OnStart()
        {
            // Set the maximum number of concurrent connections
            ServicePointManager.DefaultConnectionLimit = 12;

            // Get the Service Bus connection string from the role configuration
            var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");

            // Initialize the connection to the Service Bus queue
            _client = QueueClient.CreateFromConnectionString(connectionString, QueueName);

            return base.OnStart();
        }

        public override void OnStop()
        {
            // Close the connection to the Service Bus queue
            _client.Close();
            _completedEvent.Set();

            base.OnStop();
        }
    }
}
Finally, let’s suppose you must process all messages in the order in which they were originally received. In other words, you need to avoid race conditions at all costs.
In that case, you’ll need to complete the original message, then put it back into the queue as a new message.
public override void Run()
{
    Log.Trace("Starting processing of messages");

    _client.OnMessage(receivedMessage =>
    {
        try
        {
            // message is OK if try succeeds
            Log.Info("Processing Service Bus message: {0}", receivedMessage.Label);

            // your processing code here

            Log.Info("Processing of {0} complete.", receivedMessage.Label);
        }
        catch (Exception ex)
        {
            // Handle any message processing specific exceptions here
            Log.Error("Could not process {0}. Reason: {1}", receivedMessage.Label, ex.Message);

            // recreate the message and send it to the back of the queue
            var newMsg = new BrokeredMessage { Label = receivedMessage.Label };
            _client.Send(newMsg);
        }
        finally
        {
            // complete the original message, regardless of success or failure
            receivedMessage.Complete();
        }
    });

    _completedEvent.WaitOne();
}
The problem with this, unfortunately, is that it becomes quite difficult to determine if the message you are recycling is truly poisoned.
In other words, do you only need to retry the message a handful of times, to get around a bottleneck? Or is your worker process on a fool's errand, trying to accomplish a task that will never complete?
If it's the latter, the code as it appears above would put us, quite quickly, into an infinite loop.
So if you go this route, you'll almost certainly need to maintain a persistent counter, perhaps in a SQL Server table, to track how many round trips the message has made, and stop trying after some fixed number of attempts.
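As a sketch of what that might look like, the catch block from the Run() method above could become something like this; GetAttemptCount and IncrementAttemptCount are hypothetical helpers backed by a SQL Server table keyed on the file name, and five attempts is an arbitrary cut-off:

catch (Exception ex)
{
    Log.Error("Could not process {0}. Reason: {1}", receivedMessage.Label, ex.Message);

    // hypothetical helpers backed by a SQL Server table keyed on the file name
    var attempts = GetAttemptCount(receivedMessage.Label);

    if (attempts < 5) // arbitrary cut-off for this sketch
    {
        IncrementAttemptCount(receivedMessage.Label);

        // recreate the message and send it again, as before
        var newMsg = new BrokeredMessage { Label = receivedMessage.Label };
        _client.Send(newMsg);
    }
    else
    {
        Log.Error("Giving up on {0} after {1} attempts.", receivedMessage.Label, attempts);
    }
}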
Of course, this defeats the purpose of having a message queue; you might as well just use SQL Server to hold the work to be done, and pull "messages" from a table.
Which leads us to the Service Bus dead-letter queue.

The Service Bus Dead Letter Queue
Let’s change gears a bit and assume that each XML file represents a unique record, so concurrency is not an issue. Or, at the very least, we’re not tied to a workflow where all messages must be processed successfully before moving on to the next message.
In other words, I want to keep trying to process each message for some reasonable number of tries, expecting that any trouble I run across is temporary.
Once those reasonable attempts are exhausted, I want to dump the message. But I also want to make note of the fact that I could not complete the message, so that I can audit the effect of missing it and/or do something about it, either manually or automatically.
This functionality comes built into Service Bus, in the form of the dead-letter queue.
I can set a "time to live" on each message (or a default for the whole queue). Provided the queue is configured to dead-letter expired messages, once that timespan expires the message is moved from the main queue to the dead-letter subqueue, where it will remain until such time as I complete it from that subqueue.
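Here is a minimal sketch of both settings, using the same Microsoft.ServiceBus client library as the worker role code; the queue name and the ten-minute time-to-live are placeholder values:

using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;

// ...

var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

// create the queue (if it doesn't already exist) so that expired messages are dead-lettered
// rather than silently dropped
if (!namespaceManager.QueueExists("MY-QUEUE-NAME"))
{
    namespaceManager.CreateQueue(new QueueDescription("MY-QUEUE-NAME")
    {
        EnableDeadLetteringOnMessageExpiration = true,
        DefaultMessageTimeToLive = TimeSpan.FromMinutes(10) // placeholder queue-wide default
    });
}

// a per-message TTL can also be set when sending (the shorter of the two wins)
var client = QueueClient.CreateFromConnectionString(connectionString, "MY-QUEUE-NAME");
client.Send(new BrokeredMessage { Label = "some-file.xml", TimeToLive = TimeSpan.FromMinutes(10) });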
Handling the dead-letter queue is exactly the same as handling the main queue; you can peek, complete and abandon messages, exactly as though they were on the primary queue.
The biggest difference is that there is no time-to-live setting for a dead letter queue. Once a message enters it, there it stays until you complete it with some other process.
Also, there is no way, via the management portal, to see details about the messages in a queue or its dead-letter queue. Therefore, if you want to know anything about a message in a queue or its dead-letter subqueue, you need to handle that in code.
Fortunately, as previously mentioned, getting messages off the dead letter queue is exactly the same as pulling them off the primary queue, save appending /$DeadLetterQueue to the name of your queue:
using System;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;
using NLog;

namespace WorkerRoleWithSBQueue1
{
    public class WorkerRole : RoleEntryPoint
    {
        const string QueueName = "MY-QUEUE-NAME/$DeadLetterQueue";

        QueueClient _client;
        readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);

        // I like to use NLog for logging; it has native support for Trace logging to Azure Diagnostics
        private static readonly Logger Log = LogManager.GetCurrentClassLogger();

        public override void Run()
        {
            Log.Trace("Starting processing of dead letter queue messages");

            _client.OnMessage(receivedMessage =>
            {
                try
                {
                    // message is OK if try succeeds
                    Log.Info("Dead letter {0} received", receivedMessage.Label);

                    // your processing code here

                    Log.Info("Processing of dead letter {0} complete.", receivedMessage.Label);
                }
                catch (Exception ex)
                {
                    // Handle any message processing specific exceptions here
                    Log.Error("Could not process dead letter {0}. Reason: {1}", receivedMessage.Label, ex.Message);
                }
                finally
                {
                    // complete the message, regardless of success or failure
                    receivedMessage.Complete();
                }
            });

            _completedEvent.WaitOne();
        }

        public override bool OnStart()
        {
            // Set the maximum number of concurrent connections
            ServicePointManager.DefaultConnectionLimit = 12;

            // Get the Service Bus connection string from the role configuration
            var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");

            // Initialize the connection to the dead-letter subqueue
            _client = QueueClient.CreateFromConnectionString(connectionString, QueueName);

            return base.OnStart();
        }

        public override void OnStop()
        {
            // Close the connection to the Service Bus queue
            _client.Close();
            _completedEvent.Set();

            base.OnStop();
        }
    }
}
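Incidentally, if you'd rather not hard-code the /$DeadLetterQueue suffix, the client library can build the path for you; a one-liner, assuming the same queue name as above:

// builds "MY-QUEUE-NAME/$DeadLetterQueue"
var deadLetterPath = QueueClient.FormatDeadLetterPath("MY-QUEUE-NAME");
_client = QueueClient.CreateFromConnectionString(connectionString, deadLetterPath);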
This code is on GitHub: https://github.com/dougvdotcom/azure-servicebus-poisoned-messages.
All links in this post on delicious: https://delicious.com/dougvdotcom/handling-poison-messages-in-an-azure-service-bus-queue