namespace Hrynco.RabbitMq;

using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

/// <summary>
/// Base class for RabbitMQ consumers. Opens the connection and channel, consumes
/// <see cref="QueueName"/> with manual acknowledgements, and retries a failing handler
/// with a fixed delay before rejecting the message without requeue.
/// Override <see cref="SettingsName"/> to use a named <see cref="RabbitMqSettings"/> instance.
/// </summary>
/// <typeparam name="TMessage">Message contract deserialized from the JSON body of each delivery.</typeparam>
public abstract class RabbitMqConsumerBase<TMessage> : BackgroundService
    where TMessage : class, IRabbitMqMessage
{
    private readonly IOptionsMonitor<RabbitMqSettings> _options;
    private readonly ILogger _logger;
    private IConnection? _connection;
    private IChannel? _channel;

    /// <summary>Initializes the consumer with its settings source and logger.</summary>
    /// <param name="options">Monitor used to resolve the <see cref="RabbitMqSettings"/> named by <see cref="SettingsName"/>.</param>
    /// <param name="logger">Logger used for connection, retry and failure diagnostics.</param>
    protected RabbitMqConsumerBase(IOptionsMonitor<RabbitMqSettings> options, ILogger logger)
    {
        _options = options;
        _logger = logger;
    }

    /// <summary>Name of the queue that is declared (durable) and consumed.</summary>
    protected abstract string QueueName { get; }

    /// <summary>
    /// Name of the <see cref="RabbitMqSettings"/> instance to use.
    /// Override to use a named instance when multiple RabbitMQ connections are configured.
    /// Defaults to <see cref="Options.DefaultName"/> (the unnamed instance).
    /// </summary>
    protected virtual string SettingsName => Options.DefaultName;

    /// <summary>
    /// Total number of times the handler is invoked for one delivery before it is rejected.
    /// Values below 1 are treated as 1 so that every delivery is always acked or nacked.
    /// </summary>
    protected virtual int MaxRetries => 3;

    /// <summary>Fixed pause between two handler attempts for the same delivery.</summary>
    protected virtual TimeSpan RetryDelay => TimeSpan.FromSeconds(5);

    private RabbitMqSettings Settings => _options.Get(SettingsName);

    /// <summary>Processes one successfully deserialized message.</summary>
    /// <param name="message">The deserialized message; never <c>null</c>.</param>
    /// <param name="cancellationToken">The service's stopping token.</param>
    /// <returns>A task that completes when handling is done; an exception counts as a failed attempt.</returns>
    protected abstract Task HandleMessageAsync(TMessage message, CancellationToken cancellationToken);

    /// <summary>
    /// Connects, registers the consumer on <see cref="QueueName"/> with manual acks,
    /// and then stays alive until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await EnsureConnectionAsync(stoppingToken);

        var channel = _channel
            ?? throw new InvalidOperationException("RabbitMQ channel was not created.");

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.ReceivedAsync += async (_, args) =>
        {
            await ProcessMessageAsync(channel, args, stoppingToken);
        };

        await channel.BasicConsumeAsync(
            queue: QueueName,
            autoAck: false,
            consumer: consumer,
            cancellationToken: stoppingToken);

        // Hold until cancellation; deliveries are dispatched to the handler above by the client.
        // SuppressThrowing makes the cancelled delay complete normally instead of throwing.
        await Task.Delay(Timeout.Infinite, stoppingToken)
            .ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
    }

    /// <summary>
    /// Deserializes one delivery, runs the handler with retries, and acks or nacks it.
    /// </summary>
    /// <remarks>
    /// Undeserializable or <c>null</c> payloads are nacked without requeue immediately.
    /// When the stopping token is cancelled the delivery is left unacknowledged rather than
    /// rejected, so a shutdown does not discard a message that never really failed.
    /// NOTE(review): this relies on the broker requeueing unacked deliveries once the
    /// channel is closed — confirm against the deployed broker configuration.
    /// </remarks>
    /// <param name="channel">Channel the delivery arrived on; used for ack/nack.</param>
    /// <param name="args">The delivery (body and delivery tag).</param>
    /// <param name="cancellationToken">The service's stopping token.</param>
    private async Task ProcessMessageAsync(IChannel channel, BasicDeliverEventArgs args, CancellationToken cancellationToken)
    {
        TMessage? message;
        try
        {
            // Decode straight from the delivery buffer; no intermediate byte[] copy is needed.
            var json = Encoding.UTF8.GetString(args.Body.Span);
            message = JsonSerializer.Deserialize<TMessage>(json);
        }
        catch (JsonException ex)
        {
            _logger.LogError(ex, "Failed to deserialize message on queue {Queue} — nacking without requeue", QueueName);
            await channel.BasicNackAsync(args.DeliveryTag, multiple: false, requeue: false, cancellationToken: cancellationToken);
            return;
        }

        if (message is null)
        {
            _logger.LogWarning("Received null message on queue {Queue} — nacking without requeue", QueueName);
            await channel.BasicNackAsync(args.DeliveryTag, multiple: false, requeue: false, cancellationToken: cancellationToken);
            return;
        }

        // Read the virtual property once and guarantee at least one attempt; otherwise a
        // non-positive MaxRetries would skip the loop and leave the delivery unacked forever.
        var maxAttempts = Math.Max(1, MaxRetries);

        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                await HandleMessageAsync(message, cancellationToken);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Shutdown, not a handler failure: do not burn retries or reject the message.
                _logger.LogInformation(
                    "Processing cancelled for message on queue {Queue} [CorrelationId={CorrelationId}] — leaving it unacknowledged",
                    QueueName, message.CorrelationContext?.CorrelationId);
                return;
            }
            catch (Exception ex) when (attempt < maxAttempts)
            {
                _logger.LogWarning(ex,
                    "Attempt {Attempt}/{Max} failed for message on queue {Queue} [CorrelationId={CorrelationId}] — retrying in {Delay}s",
                    attempt, maxAttempts, QueueName, message.CorrelationContext?.CorrelationId, RetryDelay.TotalSeconds);

                await Task.Delay(RetryDelay, cancellationToken)
                    .ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);

                if (cancellationToken.IsCancellationRequested)
                {
                    _logger.LogInformation(
                        "Processing cancelled for message on queue {Queue} [CorrelationId={CorrelationId}] — leaving it unacknowledged",
                        QueueName, message.CorrelationContext?.CorrelationId);
                    return;
                }

                continue;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex,
                    "All {Max} attempts exhausted for message on queue {Queue} [CorrelationId={CorrelationId}] — nacking without requeue",
                    maxAttempts, QueueName, message.CorrelationContext?.CorrelationId);
                await channel.BasicNackAsync(args.DeliveryTag, multiple: false, requeue: false, cancellationToken: cancellationToken);
                return;
            }

            // Ack outside the try: a failing ack must not be mistaken for a handler failure,
            // which would re-run a handler that already succeeded.
            await channel.BasicAckAsync(args.DeliveryTag, multiple: false, cancellationToken: cancellationToken);
            return;
        }
    }

    /// <summary>
    /// Creates the connection and channel from the current settings, declares the durable
    /// queue, and limits the consumer to one unacknowledged delivery at a time.
    /// </summary>
    /// <param name="cancellationToken">Cancels the connection attempt.</param>
    private async Task EnsureConnectionAsync(CancellationToken cancellationToken)
    {
        var settings = Settings;
        var factory = new ConnectionFactory
        {
            HostName = settings.Host,
            Port = settings.Port,
            UserName = settings.User,
            Password = settings.Password,
            VirtualHost = settings.VirtualHost
        };

        _connection = await factory.CreateConnectionAsync(cancellationToken);
        _channel = await _connection.CreateChannelAsync(cancellationToken: cancellationToken);

        await _channel.QueueDeclareAsync(QueueName, durable: true, exclusive: false, autoDelete: false, cancellationToken: cancellationToken);

        // prefetchCount 1: the broker sends the next message only after the current one is acked/nacked.
        await _channel.BasicQosAsync(prefetchSize: 0, prefetchCount: 1, global: false, cancellationToken: cancellationToken);

        _logger.LogInformation("RabbitMQ consumer connected to queue {Queue} (settings: {SettingsName})", QueueName, SettingsName);
    }

    /// <summary>
    /// Disposes the channel, then the connection, then the base service. Each step is
    /// guarded so one failing disposal cannot leak the remaining resources or throw
    /// out of <c>Dispose</c>.
    /// </summary>
    public override void Dispose()
    {
        // The override is synchronous, so the asynchronous disposals are awaited by blocking.
        try
        {
            _channel?.DisposeAsync().AsTask().GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Failed to dispose RabbitMQ channel for queue {Queue}", QueueName);
        }

        try
        {
            _connection?.DisposeAsync().AsTask().GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Failed to dispose RabbitMQ connection for queue {Queue}", QueueName);
        }

        base.Dispose();
    }
}