Handling Poison Messages In An Azure Service Bus Queue

One of the difficulties in working with Azure Service Bus queues is what to do with poisoned messages — that is, a message that simply cannot be processed / completed.

For example, suppose you have an Azure Cloud Services worker role that pulls messages off a Service Bus queue. The queue contains the names of XML files in an Azure Storage blob container. Your worker processes retrieves those files, parses them and populates the information into a SQL Server database for use by other processes.

How do you gracefully handle the presence of a corrupted or otherwise unusable XML file?

How do you handle temporary interruptions or faults when communicating with SQL Server, or other seemingly recoverable errors?

A corrupt or unusable XML file is never going to properly process. In that case, our worker process should log (if not issue some sort of alert) that the file is unreadable, and discard any further attempt to process that queue message, by marking it complete.

In the case of an error talking to SQL Server, we probably will want to try again in a little bit. This is especially true if each XML file represents a brand-new record for our database.

Azure Service Bus
Not exactly the kind of Azure Service Bus I am talking about. “A Stagecoach A1 Service Transbus Trident bus in Ardrossan, Ayrshire. Taken by Ayrshire–77 20:58, 29 Apr 2005 (UTC).” Via Wikimedia Commons, released to public domain by its author.

Beware Abandoned Message Concurrency

This brings up the issue of message concurrency, which can be problematic when working on Azure Service Bus queues.

It may be that these XML files update existing database entries; perhaps, a single record can be updated many times by these XML files. In that case, processing the XML files in the right order is critical; we don’t want old data overwriting new data. (That is, we want to avoid race conditions.)

Because we need to process the XML files in the right order, we almost certainly want to discard (complete) any message that doesn’t process correctly on its first try, since reprocessing it later could overwrite newer data.

For example, suppose it takes 5 seconds for our worker role to process an XML file. A file received at 5:00:00 p.m. fails to correctly process, and is abandoned at 5:00:05 p.m.

Suppose, as that file was being processed, a newer XML file showed up at 5:00:04 p.m.

If the original file had properly processed, that wouldn’t be an issue; the older file’s values would have been superseded by the newer file, which is probably the next message in the queue.

Service bus message concurrency: proper processing
What proper message concurrency looks like if a previous message is properly processed.

But because we abandoned the older file’s message, and it is thus returned to the queue for processing on the worker processes’ next swing through, we’ve effectively inverted the processing order for these XML files.

Because Azure Service Bus messages are enqueued as last-in, first-out, once we abandon the older file’s message, our worker process will grab the newer XML file and process it … then grab the older file, because its message is older than the newer file’s message, and thus is processed after the newer file!

Race condition when service bus messages don't complete properly
Because Azure Service Bus queue messages are processed last-in, first-out, a race condition can occur if we don’t pop a poisoned message off our Service Bus Queue.

This probably wouldn’t be a problem if you’re not on a high-volume queue, and the chance that a newer file shows up while you’re processing an older file is low.

But if the chances are good that in the time it takes to process a message, a newer message can arrive that supersedes the message you’re working on, you almost certainly don’t want to abandon a poison message.

You probably want to complete it, log the failure (and somehow report it), since the next record update will be along shortly.

using System;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;
using NLog;

namespace WorkerRoleWithSBQueue1
{
	public class WorkerRole : RoleEntryPoint
	{
		const string QueueName = "MY-QUEUE-NAME";

		QueueClient _client;
		readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);

		// I like to use NLog for logging; it has native support for Trace logging to Azure Diagnostics
		private static readonly Logger Log = LogManager.GetCurrentClassLogger();

		public override void Run()
		{
			Log.Trace("Starting processing of messages");
			_client.OnMessage(receivedMessage =>
			{
				try
				{
					// message is OK if try succeeds
					Log.Info("Processing Service Bus message: {0}", receivedMessage.Label);
					// your processing code here
					Log.Info("Processing of {0} complete.", receivedMessage.Label);
				}
				catch(Exception ex)
				{
					// Handle any message processing specific exceptions here
					Log.Error("Could not process {0}. Reason: {1}", receivedMessage.Label, ex.Message);
				}
				finally {
					// complete the message, regardless of success or failure
					receivedMessage.Complete();
				}
			});

			_completedEvent.WaitOne();
		}

		public override bool OnStart()
		{
			// Set the maximum number of concurrent connections 
			ServicePointManager.DefaultConnectionLimit = 12;

			// Create the queue if it does not exist already
			var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");

			// Initialize the connection to Service Bus Queue
			_client = QueueClient.CreateFromConnectionString(connectionString, QueueName);
			return base.OnStart();
		}

		public override void OnStop()
		{
			// Close the connection to Service Bus Queue
			_client.Close();
			_completedEvent.Set();
			base.OnStop();
		}
	}
}

Finally, let’s suppose you must process all messages in the order in which they were originally received. In other words, you need to avoid race conditions at all costs.

In that case, you’ll need to complete the original message, then put it back into the queue as a new message.

public override void Run()
{
	Log.Trace("Starting processing of messages");
	_client.OnMessage(receivedMessage =>
	{
		try
		{
			// message is OK if try succeeds
			Log.Info("Processing Service Bus message: {0}", receivedMessage.Label);
			// your processing code here
			Log.Info("Processing of {0} complete.", receivedMessage.Label);
		}
		catch(Exception ex)
		{
			// Handle any message processing specific exceptions here
			Log.Error("Could not process {0}. Reason: {1}", receivedMessage.Label, ex.Message);
			
			// recreate message and send it again
			var newMsg = new BrokeredMessage {
				Label = receivedMessage.Label;
			};
			_client.Send(newMsg);
		}
		finally {
			// complete the message, regardless of success or failure
			receivedMessage.Complete();
		}
	});

	_completedEvent.WaitOne();
}

The problem with this, unfortunately, is that it becomes quite difficult to determine if the message you are recycling is truly poisoned.

In other words, are you only having to try the message a handful of times, to get around a bottleneck? Or is your worker process on a fool’s errand, trying to accomplish a task that will never complete?

If so, the code as it appears above would put us, quite quickly, into an infinite loop.

So if you went with this route, you’ll almost certainly need to install a persistent counter — maybe, in a SQL Server table — to see if the message has made a lot of round trips, and stop trying after X number of tries.

Of course, this defeats the purpose of having a message queue; you might as well just use the SQL Server to hold onto the work to be done, and pull “messages” from a table.

Which leads us to the Service Bus dead-letter queue.

Dead letter queue settings
In the Azure management portal, you can set the default time to live for each Service Bus message in a given queue (red highlight), and optionally send expired messages to a dead letter subqueue (green hightlight).

The Service Bus Dead Letter Queue

Let’s change gears a bit and assume that each XML file represents a unique record, so concurrency is not an issue. Or, at the very least, we’re not tied to a workflow where all messages must be processed successfully before moving on to the next message.

In other words, I want to keep trying to process each message for some reasonable number of tries, expecting that trouble I run across is temporary.

After that period of reasonable attempts expires, I want to dump the message. But I also want to make note of the fact that I could not complete the message, so that I can audit the effect of missing that message and / or do something about it, either manually or automatically.

This functionality comes built-in through Service Bus, in the form of the dead-letter queue.

I can set a period of time on each message that I want as the “time to live.” Once that timespan has expired, the message is transferred from the main queue to the dead-letter subqueue, where it will remain until such time as I complete it from that subqueue.

Optionally, I can simply let messages expire. In some cases, it may make sense to simply let old messages exit the queue without notice, but I haven’t found one yet. Generally speaking, for audit purposes alone, I want to know if a message has expired, even if it’s not something I need to handle.

Handling the dead-letter queue is exactly the same as handling the master queue; you can peek, complete and abandon messages, exactly as though they were on the primary queue.

The biggest difference is that there is no time-to-live setting for a dead letter queue. Once a message enters it, there it stays until you complete it with some other process.

Also, there is no way, via the management portal, to see details about the messages in a queue or its dead-letter queue. Therefore, if you want to know anything about a message in a queue or its dead-letter subqueue, you need to handle that in code.

Fortunately, as previously mentioned, getting messages off the dead letter queue is exactly the same as pulling them off the primary queue, save appending /$DeadLetterQueue to the name of your queue:

using System;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;
using NLog;

namespace WorkerRoleWithSBQueue1
{
	public class WorkerRole : RoleEntryPoint
	{
		const string QueueName = "MY-QUEUE-NAME/$DeadLetterQueue";

		QueueClient _client;
		readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);

		// I like to use NLog for logging; it has native support for Trace logging to Azure Diagnostics
		private static readonly Logger Log = LogManager.GetCurrentClassLogger();

		public override void Run()
		{
			Log.Trace("Starting processing of dead letter queue messages");
			_client.OnMessage(receivedMessage =>
			{
				try
				{
					// message is OK if try succeeds
					Log.Info("Dead letter {0} received", receivedMessage.Label);
					// your processing code here
					Log.Info("Processing of dead letter {0} complete.", receivedMessage.Label);
				}
				catch(Exception ex)
				{
					// Handle any message processing specific exceptions here
					Log.Error("Could not process dead letter {0}. Reason: {1}", receivedMessage.Label, ex.Message);
				}
				finally {
					// complete the message, regardless of success or failure
					receivedMessage.Complete();
				}
			});

			_completedEvent.WaitOne();
		}

		public override bool OnStart()
		{
			// Set the maximum number of concurrent connections 
			ServicePointManager.DefaultConnectionLimit = 12;

			// Create the queue if it does not exist already
			var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");

			// Initialize the connection to Service Bus Queue
			_client = QueueClient.CreateFromConnectionString(connectionString, QueueName);
			return base.OnStart();
		}

		public override void OnStop()
		{
			// Close the connection to Service Bus Queue
			_client.Close();
			_completedEvent.Set();
			base.OnStop();
		}
	}
}

This code on github: https://github.com/dougvdotcom/azure-servicebus-poisoned-messages.

All links in this post on delicious: https://delicious.com/dougvdotcom/handling-poison-messages-in-an-azure-service-bus-queue

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.

  • Check out the Commenting Guidelines before commenting, please!
  • Want to share code? Please put it into a GitHub Gist, CodePen or pastebin and link to that in your comment.
  • Just have a line or two of markup? Wrap them in an appropriate SyntaxHighlighter Evolved shortcode for your programming language, please!