namespace Hrynco.RabbitMq;
using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
///
/// Base class for RabbitMQ consumers. Handles connection management, manual ack,
/// and retry with backoff before dead-lettering.
/// Override to use a named instance.
///
public abstract class RabbitMqConsumerBase : BackgroundService
where TMessage : class, IRabbitMqMessage
{
private readonly IOptionsMonitor _options;
private readonly ILogger _logger;
private IConnection? _connection;
private IChannel? _channel;
protected RabbitMqConsumerBase(IOptionsMonitor options, ILogger logger)
{
_options = options;
_logger = logger;
}
protected abstract string QueueName { get; }
///
/// Name of the instance to use.
/// Override to use a named instance when multiple RabbitMQ connections are configured.
/// Defaults to (the unnamed instance).
///
protected virtual string SettingsName => Options.DefaultName;
protected virtual int MaxRetries => 3;
protected virtual TimeSpan RetryDelay => TimeSpan.FromSeconds(5);
private RabbitMqSettings Settings => _options.Get(SettingsName);
protected abstract Task HandleMessageAsync(TMessage message, CancellationToken cancellationToken);
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await EnsureConnectionAsync(stoppingToken);
var consumer = new AsyncEventingBasicConsumer(_channel!);
consumer.ReceivedAsync += async (_, args) =>
{
await ProcessMessageAsync(args, stoppingToken);
};
await _channel!.BasicConsumeAsync(queue: QueueName, autoAck: false, consumer: consumer,
cancellationToken: stoppingToken);
// Hold until cancellation — consumer events fire on the channel thread
await Task.Delay(Timeout.Infinite, stoppingToken).ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
}
private async Task ProcessMessageAsync(BasicDeliverEventArgs args, CancellationToken cancellationToken)
{
TMessage? message = null;
try
{
var json = Encoding.UTF8.GetString(args.Body.ToArray());
message = JsonSerializer.Deserialize(json);
if (message is null)
{
_logger.LogWarning("Received null message on queue {Queue} — nacking without requeue", QueueName);
await _channel!.BasicNackAsync(args.DeliveryTag, multiple: false, requeue: false, cancellationToken: cancellationToken);
return;
}
}
catch (JsonException ex)
{
_logger.LogError(ex, "Failed to deserialize message on queue {Queue} — nacking without requeue", QueueName);
await _channel!.BasicNackAsync(args.DeliveryTag, multiple: false, requeue: false, cancellationToken: cancellationToken);
return;
}
for (int attempt = 1; attempt <= MaxRetries; attempt++)
{
try
{
await HandleMessageAsync(message, cancellationToken);
await _channel!.BasicAckAsync(args.DeliveryTag, multiple: false, cancellationToken: cancellationToken);
return;
}
catch (Exception ex) when (attempt < MaxRetries)
{
_logger.LogWarning(ex,
"Attempt {Attempt}/{Max} failed for message on queue {Queue} [CorrelationId={CorrelationId}] — retrying in {Delay}s",
attempt, MaxRetries, QueueName, message.CorrelationContext?.CorrelationId, RetryDelay.TotalSeconds);
await Task.Delay(RetryDelay, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex,
"All {Max} attempts exhausted for message on queue {Queue} [CorrelationId={CorrelationId}] — nacking without requeue",
MaxRetries, QueueName, message.CorrelationContext?.CorrelationId);
await _channel!.BasicNackAsync(args.DeliveryTag, multiple: false, requeue: false, cancellationToken: cancellationToken);
}
}
}
private async Task EnsureConnectionAsync(CancellationToken cancellationToken)
{
var s = Settings;
var factory = new ConnectionFactory
{
HostName = s.Host,
Port = s.Port,
UserName = s.User,
Password = s.Password,
VirtualHost = s.VirtualHost
};
_connection = await factory.CreateConnectionAsync(cancellationToken);
_channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);
await _channel.QueueDeclareAsync(QueueName, durable: true, exclusive: false, autoDelete: false,
cancellationToken: cancellationToken);
await _channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false,
cancellationToken: cancellationToken);
_logger.LogInformation("RabbitMQ consumer connected to queue {Queue} (settings: {SettingsName})",
QueueName, SettingsName);
}
public override void Dispose()
{
_channel?.DisposeAsync().AsTask().GetAwaiter().GetResult();
_connection?.DisposeAsync().AsTask().GetAwaiter().GetResult();
base.Dispose();
}
}