create a sql server queue processor in csharp

main.cs
using System;
using System.Data.SqlClient;
using System.Threading.Tasks;

public class QueueProcessor
{
    private readonly string _connectionString;
    private readonly string _queueName;

    public QueueProcessor(string connectionString, string queueName)
    {
        _connectionString = connectionString;
        _queueName = queueName;
    }

    public async Task StartAsync(Func<string, Task> messageHandler)
    {
        while (true)
        {
            using (var connection = new SqlConnection(_connectionString))
            {
                await connection.OpenAsync();

                using (var command = new SqlCommand($"RECEIVE TOP(1) * FROM {_queueName} WITH (UPDLOCK, READPAST)", connection))
                {
                    var message = await GetMessageAsync(command);

                    if (message != null)
                    {
                        try
                        {
                            await messageHandler(message);
                            await CompleteMessageAsync(connection, message);
                        }
                        catch
                        {
                            await AbandonMessageAsync(connection, message);
                            throw;
                        }
                    }
                }
            }
        }
    }

    private async Task<Message> GetMessageAsync(SqlCommand command)
    {
        using (var reader = await command.ExecuteReaderAsync())
        {
            if (!reader.Read())
            {
                return null;
            }

            return new Message
            {
                Id = reader.GetInt64(0),
                Body = reader.GetString(1),
                LockedUntil = reader.GetDateTime(2)
            };
        }
    }

    private async Task CompleteMessageAsync(SqlConnection connection, Message message)
    {
        using (var command = new SqlCommand($"DELETE FROM {_queueName} WHERE Id = @id", connection))
        {
            command.Parameters.AddWithValue("@id", message.Id);
            await command.ExecuteNonQueryAsync();
        }
    }

    private async Task AbandonMessageAsync(SqlConnection connection, Message message)
    {
        using (var command = new SqlCommand($"UPDATE {_queueName} SET LockedUntil = DATEADD(SECOND, 30, GETUTCDATE()) WHERE Id = @id", connection))
        {
            command.Parameters.AddWithValue("@id", message.Id);
            await command.ExecuteNonQueryAsync();
        }
    }

    private class Message
    {
        public long Id { get; set; }
        public string Body { get; set; }
        public DateTime LockedUntil { get; set; }
    }
}
2685 chars
89 lines

To use this code, create a new QueueProcessor instance and pass in the connection string to your SQL Server instance and the name of the queue you want to process. Then call the StartAsync method, passing in a function that will be called for each message in the queue. The message will be passed to the function as a string. The QueueProcessor class will handle locking and deleting the message from the queue after it has been successfully processed. If the function throws an exception, the message will be unlocked and returned to the head of the queue.

gistlibby LogSnag