namespace Hrynco.RabbitMq;

using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;

/// <summary>
/// Publishes <see cref="IRabbitMqMessage"/> instances as persistent JSON messages to a
/// durable RabbitMQ queue through the default exchange.
/// </summary>
/// <remarks>
/// Every call to <see cref="PublishAsync"/> opens its own connection and channel and
/// disposes both before returning.
/// NOTE(review): RabbitMQ guidance favours long-lived connections; consider pooling if
/// this publisher ends up on a hot path.
/// </remarks>
public sealed class RabbitMqPublisher : IRabbitMqPublisher
{
    private readonly IOptionsMonitor<RabbitMqSettings> _options;
    private readonly string _settingsName;
    private readonly ILogger _logger;

    /// <summary>
    /// Creates a publisher bound to one named <see cref="RabbitMqSettings"/> instance.
    /// </summary>
    /// <param name="options">Named options monitor for <see cref="RabbitMqSettings"/>.</param>
    /// <param name="logger">Logger.</param>
    /// <param name="settingsName">
    /// Name of the <see cref="RabbitMqSettings"/> instance to use.
    /// Defaults to <see cref="Options.DefaultName"/> (i.e. the unnamed instance).
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> or <paramref name="logger"/> is <see langword="null"/>.
    /// </exception>
    public RabbitMqPublisher(
        IOptionsMonitor<RabbitMqSettings> options,
        ILogger logger,
        string? settingsName = null)
    {
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _settingsName = settingsName ?? Options.DefaultName;
    }

    /// <summary>
    /// Settings for the configured name, fetched from the options monitor on every access
    /// rather than cached in a field.
    /// </summary>
    private RabbitMqSettings Settings => _options.Get(_settingsName);

    /// <summary>
    /// Declares <paramref name="queue"/> (durable, non-exclusive, no auto-delete) and
    /// publishes <paramref name="message"/> to it as a persistent JSON message.
    /// </summary>
    /// <param name="queue">Queue name; also used as the routing key on the default exchange.</param>
    /// <param name="message">Message to serialize and publish.</param>
    /// <param name="cancellationToken">Token passed to every broker operation.</param>
    /// <returns>A task that completes once the publish call has returned and the log entry is written.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="queue"/> or <paramref name="message"/> is <see langword="null"/>.
    /// </exception>
    /// <exception cref="ArgumentException"><paramref name="queue"/> is empty.</exception>
    public async Task PublishAsync(
        string queue,
        IRabbitMqMessage message,
        CancellationToken cancellationToken = default)
    {
        // Validate before any broker I/O so a bad call never opens a connection or
        // publishes a literal "null" payload.
        if (queue is null)
        {
            throw new ArgumentNullException(nameof(queue));
        }

        if (queue.Length == 0)
        {
            throw new ArgumentException("Queue name must not be empty.", nameof(queue));
        }

        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        var factory = BuildFactory(Settings);

        await using var connection = await factory.CreateConnectionAsync(cancellationToken);
        await using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);

        await channel.QueueDeclareAsync(
            queue,
            durable: true,
            exclusive: false,
            autoDelete: false,
            cancellationToken: cancellationToken);

        // Serialize by runtime type: the generic overload would infer IRabbitMqMessage and
        // emit only the members declared on the interface, dropping the concrete payload.
        var json = JsonSerializer.Serialize(message, message.GetType());
        var body = Encoding.UTF8.GetBytes(json);

        var props = new BasicProperties
        {
            Persistent = true,
            ContentType = "application/json",
        };

        // An empty exchange name is the default exchange, which routes by queue name.
        await channel.BasicPublishAsync(
            exchange: "",
            routingKey: queue,
            mandatory: false,
            basicProperties: props,
            body: body,
            cancellationToken: cancellationToken);

        _logger.LogDebug(
            "Published to queue {Queue} [CorrelationId={CorrelationId}]",
            queue,
            message.CorrelationContext?.CorrelationId);
    }

    /// <summary>
    /// Maps the host, port, credentials and virtual host from <paramref name="s"/> onto a
    /// new <see cref="ConnectionFactory"/>.
    /// </summary>
    /// <param name="s">Settings to copy connection parameters from.</param>
    /// <returns>A factory configured with the given settings.</returns>
    private static ConnectionFactory BuildFactory(RabbitMqSettings s) => new()
    {
        HostName = s.Host,
        Port = s.Port,
        UserName = s.User,
        Password = s.Password,
        VirtualHost = s.VirtualHost,
    };
}