5b963b25a7
- Add Hrynco.RabbitMq class library with RabbitMQ client abstractions - Add Hrynco.RabbitMq.Tests xunit project - ImplicitUsings disabled on all projects Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
69 lines
2.4 KiB
C#
69 lines
2.4 KiB
C#
namespace Hrynco.RabbitMq;
|
|
|
|
using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
|
|
|
|
/// <summary>
/// Publishes JSON-serialized messages to RabbitMQ queues through the default exchange.
/// </summary>
/// <remarks>
/// Connection settings are read from the named <see cref="RabbitMqSettings"/> instance on every
/// publish. Each publish opens its own connection and channel and disposes both before returning.
/// </remarks>
public sealed class RabbitMqPublisher : IRabbitMqPublisher
{
    private readonly IOptionsMonitor<RabbitMqSettings> _options;
    private readonly string _settingsName;
    private readonly ILogger<RabbitMqPublisher> _logger;

    /// <summary>
    /// Creates a publisher bound to one named <see cref="RabbitMqSettings"/> instance.
    /// </summary>
    /// <param name="options">Named options monitor for <see cref="RabbitMqSettings"/>.</param>
    /// <param name="logger">Logger.</param>
    /// <param name="settingsName">
    /// Name of the <see cref="RabbitMqSettings"/> instance to use.
    /// Defaults to <see cref="Options.DefaultName"/> (i.e. the unnamed instance).
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="options"/> or <paramref name="logger"/> is <see langword="null"/>.
    /// </exception>
    public RabbitMqPublisher(
        IOptionsMonitor<RabbitMqSettings> options,
        ILogger<RabbitMqPublisher> logger,
        string? settingsName = null)
    {
        // Fail at construction time instead of on the first publish.
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _settingsName = settingsName ?? Options.DefaultName;
    }

    /// <summary>Current settings for the configured name, resolved on each access.</summary>
    private RabbitMqSettings Settings => _options.Get(_settingsName);

    /// <summary>
    /// Declares <paramref name="queue"/> (durable, non-exclusive, not auto-deleted) and publishes
    /// <paramref name="message"/> to it as a persistent <c>application/json</c> message.
    /// </summary>
    /// <typeparam name="TData">Payload type carried by the message.</typeparam>
    /// <param name="queue">Queue name; also used as the routing key on the default exchange.</param>
    /// <param name="message">Message to serialize with <see cref="JsonSerializer"/> and publish.</param>
    /// <param name="cancellationToken">Token passed to every RabbitMQ client call.</param>
    /// <returns>A task that completes once the publish call has returned and the log entry is written.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="queue"/> or <paramref name="message"/> is <see langword="null"/>.
    /// </exception>
    /// <exception cref="ArgumentException"><paramref name="queue"/> is empty.</exception>
    public async Task PublishAsync<TData>(string queue, IRabbitMqMessage<TData> message, CancellationToken cancellationToken = default)
    {
        if (queue is null)
        {
            throw new ArgumentNullException(nameof(queue));
        }

        if (queue.Length == 0)
        {
            // The queue name doubles as the routing key, so an empty name cannot address a queue here.
            throw new ArgumentException("Queue name must not be empty.", nameof(queue));
        }

        if (message is null)
        {
            // Without this check a JSON "null" body would be published before the log call fails.
            throw new ArgumentNullException(nameof(message));
        }

        var factory = BuildFactory(Settings);

        await using var connection = await factory.CreateConnectionAsync(cancellationToken);
        await using var channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken);

        await channel.QueueDeclareAsync(queue, durable: true, exclusive: false, autoDelete: false,
            cancellationToken: cancellationToken);

        // Serialize straight to UTF-8 bytes; avoids the intermediate string.
        var body = JsonSerializer.SerializeToUtf8Bytes(message);

        var props = new BasicProperties
        {
            Persistent = true,
            ContentType = "application/json"
        };

        // An empty exchange name selects the default exchange, which routes by queue name.
        await channel.BasicPublishAsync(exchange: "", routingKey: queue, mandatory: false,
            basicProperties: props, body: body, cancellationToken: cancellationToken);

        _logger.LogDebug("Published to queue {Queue} [CorrelationId={CorrelationId}]",
            queue, message.CorrelationContext?.CorrelationId);
    }

    /// <summary>Builds a connection factory from the host, port, credentials and virtual host in <paramref name="s"/>.</summary>
    private static ConnectionFactory BuildFactory(RabbitMqSettings s) => new()
    {
        HostName = s.Host,
        Port = s.Port,
        UserName = s.User,
        Password = s.Password,
        VirtualHost = s.VirtualHost
    };
}
|