feat: add RabbitMQ worker, contracts, usage UI in channels screen
- Add HrynCo.NotificationService.Contracts project with SendEmailMessage and NotificationResultMessage - Add SendEmailConsumer (RabbitMQ worker) with reply-to pattern via CorrelationContext.ReplyTo - Add SendEmailHandler owning SMTP send + usage increment as business logic - Add GetChannelUsageSummaryHandler with single DB query via navigation property - Merge usage stats inline into channels list (daily/monthly with progress bars) - Refactor AdminChannelsController.Index to use GetChannelUsageSummaryQuery - Add RabbitMQ service to docker-compose files - Remove dead AdminChannelUsageController, ChannelUsageViewModel, ChannelUsageSummary Ref: IT-628 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
using Hrynco.RabbitMq;
|
||||
|
||||
namespace HrynCo.NotificationService.Worker;
|
||||
|
||||
public sealed class AppSettings
|
||||
@@ -5,4 +7,5 @@ public sealed class AppSettings
|
||||
public const string SectionName = "App";
|
||||
|
||||
public string ConnectionString { get; init; } = string.Empty;
|
||||
public RabbitMqSettings RabbitMq { get; init; } = null!;
|
||||
}
|
||||
@@ -9,6 +9,8 @@
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Extensions.Hosting" />
|
||||
<PackageReference Include="HrynCo.RabbitMq" />
|
||||
<PackageReference Include="MediatR" />
|
||||
<PackageReference Include="Serilog.Extensions.Hosting" />
|
||||
<PackageReference Include="Serilog.Settings.Configuration" />
|
||||
<PackageReference Include="Serilog.Sinks.Console" />
|
||||
@@ -16,6 +18,7 @@
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\HrynCo.NotificationService.Contracts\HrynCo.NotificationService.Contracts.csproj" />
|
||||
<ProjectReference Include="..\HrynCo.NotificationService.Services\HrynCo.NotificationService.Services.csproj" />
|
||||
<ProjectReference Include="..\HrynCo.NotificationService.DAL.EF\HrynCo.NotificationService.DAL.EF.csproj" />
|
||||
</ItemGroup>
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
using HrynCo.NotificationService.DAL.EF;
|
||||
using HrynCo.NotificationService.Services;
|
||||
using HrynCo.NotificationService.Worker;
|
||||
using Hrynco.RabbitMq;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
var builder = Host.CreateApplicationBuilder(args);
|
||||
|
||||
@@ -13,7 +15,11 @@ var appSettings = builder.Configuration
|
||||
builder.Services.AddSingleton(appSettings);
|
||||
builder.Services.AddNotificationDataAccess(appSettings.ConnectionString);
|
||||
builder.Services.AddNotificationServices();
|
||||
builder.Services.AddHostedService<Worker>();
|
||||
|
||||
builder.Services.Configure<RabbitMqSettings>(
|
||||
builder.Configuration.GetSection($"{AppSettings.SectionName}:RabbitMq"));
|
||||
|
||||
builder.Services.AddHostedService<SendEmailConsumer>();
|
||||
|
||||
var host = builder.Build();
|
||||
host.Run();
|
||||
@@ -0,0 +1,3 @@
|
||||
namespace HrynCo.NotificationService.Worker;
|
||||
|
||||
public record RenderedEmail(string Subject, string HtmlBody, string TextBody);
|
||||
@@ -0,0 +1,174 @@
|
||||
namespace HrynCo.NotificationService.Worker;
|
||||
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using HrynCo.NotificationService.Contracts.Messages;
|
||||
using HrynCo.NotificationService.DAL.Abstract.Providers;
|
||||
using HrynCo.NotificationService.DAL.Abstract.Repositories;
|
||||
using HrynCo.NotificationService.DAL.Abstract.Templates;
|
||||
using HrynCo.NotificationService.Services.EmailChannels.Send;
|
||||
using Hrynco.RabbitMq;
|
||||
using MediatR;
|
||||
using Microsoft.Extensions.Options;
|
||||
using RabbitMQ.Client;
|
||||
|
||||
public sealed class SendEmailConsumer(
|
||||
IOptionsMonitor<RabbitMqSettings> options,
|
||||
IEmailChannelRepository channelRepository,
|
||||
IEmailTemplateRepository templateRepository,
|
||||
IEmailChannelUsageRepository usageRepository,
|
||||
IMediator mediator,
|
||||
AppSettings appSettings,
|
||||
ILogger<SendEmailConsumer> logger)
|
||||
: RabbitMqConsumerBase<SendEmailMessage, SendEmailMessageData>(options, logger)
|
||||
{
|
||||
private const string IncomingQueue = "notification.send-email";
|
||||
|
||||
protected override string QueueName => IncomingQueue;
|
||||
|
||||
protected override async Task HandleMessageAsync(SendEmailMessage message, CancellationToken cancellationToken)
|
||||
{
|
||||
var data = message.Data;
|
||||
|
||||
logger.LogInformation(
|
||||
"Processing SendEmail for service={Service} template={Template} recipient={Recipient} [CorrelationId={CorrelationId}]",
|
||||
data.ServiceName, data.TemplateKey, data.RecipientEmail, message.CorrelationContext?.CorrelationId);
|
||||
|
||||
var channel = await ResolveChannelAsync(data.ServiceName, cancellationToken);
|
||||
var template = await ResolveTemplateAsync(data.ServiceName, data.TemplateKey, data.LanguageCode, cancellationToken);
|
||||
|
||||
await EnforceLimitsAsync(channel, cancellationToken);
|
||||
|
||||
var rendered = RenderTemplate(template, data);
|
||||
|
||||
var sendResult = await mediator.Send(
|
||||
new SendEmailCommand(channel.Id, data.RecipientEmail, data.RecipientName,
|
||||
rendered.Subject, rendered.HtmlBody, rendered.TextBody),
|
||||
cancellationToken);
|
||||
|
||||
if (!sendResult.IsSuccess)
|
||||
throw new InvalidOperationException(sendResult.Error?.Message ?? "Send failed.");
|
||||
|
||||
logger.LogInformation(
|
||||
"Email sent successfully service={Service} template={Template} recipient={Recipient}",
|
||||
data.ServiceName, data.TemplateKey, data.RecipientEmail);
|
||||
|
||||
await PublishResultAsync(message.CorrelationContext, data, errorMessage: null, cancellationToken);
|
||||
}
|
||||
|
||||
private async Task<EmailChannel> ResolveChannelAsync(string serviceName, CancellationToken ct)
|
||||
{
|
||||
var channels = await channelRepository.GetByServiceAsync(serviceName, ct);
|
||||
|
||||
return channels
|
||||
.Where(c => c.IsActive)
|
||||
.OrderBy(c => c.Priority)
|
||||
.FirstOrDefault()
|
||||
?? throw new InvalidOperationException(
|
||||
$"No active email channel found for service '{serviceName}'.");
|
||||
}
|
||||
|
||||
private async Task<EmailTemplate> ResolveTemplateAsync(
|
||||
string serviceName, string templateKey, string? languageCode, CancellationToken ct)
|
||||
{
|
||||
var lang = string.IsNullOrWhiteSpace(languageCode) ? "en" : languageCode;
|
||||
var template = await templateRepository.GetAsync(serviceName, templateKey, lang, ct);
|
||||
|
||||
if (template is null && lang != "en")
|
||||
template = await templateRepository.GetAsync(serviceName, templateKey, "en", ct);
|
||||
|
||||
return template
|
||||
?? throw new InvalidOperationException(
|
||||
$"Template not found: service='{serviceName}' key='{templateKey}' language='{lang}'.");
|
||||
}
|
||||
|
||||
private async Task EnforceLimitsAsync(EmailChannel channel, CancellationToken ct)
|
||||
{
|
||||
var today = DateOnly.FromDateTime(DateTime.UtcNow);
|
||||
|
||||
if (channel.DailyLimit.HasValue)
|
||||
{
|
||||
var daily = await usageRepository.GetDailyCountAsync(channel.Id, today, ct);
|
||||
if (daily >= channel.DailyLimit.Value)
|
||||
throw new InvalidOperationException(
|
||||
$"Channel '{channel.Id}' daily limit of {channel.DailyLimit.Value} reached ({daily} sent today).");
|
||||
}
|
||||
|
||||
if (channel.MonthlyLimit.HasValue)
|
||||
{
|
||||
var monthly = await usageRepository.GetMonthlyCountAsync(channel.Id, today.Year, today.Month, ct);
|
||||
if (monthly >= channel.MonthlyLimit.Value)
|
||||
throw new InvalidOperationException(
|
||||
$"Channel '{channel.Id}' monthly limit of {channel.MonthlyLimit.Value} reached ({monthly} sent this month).");
|
||||
}
|
||||
}
|
||||
|
||||
private static RenderedEmail RenderTemplate(EmailTemplate template, SendEmailMessageData data)
|
||||
{
|
||||
return new RenderedEmail(
|
||||
Interpolate(template.Subject, data.Variables),
|
||||
Interpolate(template.HtmlBody, data.Variables),
|
||||
Interpolate(template.TextBody, data.Variables));
|
||||
}
|
||||
|
||||
private static string Interpolate(string text, IReadOnlyDictionary<string, string> variables)
|
||||
{
|
||||
var sb = new StringBuilder(text);
|
||||
foreach (var (key, value) in variables)
|
||||
sb.Replace($"{{{{{key}}}}}", value);
|
||||
return sb.ToString();
|
||||
}
|
||||
|
||||
private async Task PublishResultAsync(
|
||||
CorrelationContext? correlationContext,
|
||||
SendEmailMessageData data,
|
||||
string? errorMessage,
|
||||
CancellationToken ct)
|
||||
{
|
||||
var replyTo = correlationContext?.ReplyTo;
|
||||
if (string.IsNullOrWhiteSpace(replyTo))
|
||||
return;
|
||||
|
||||
try
|
||||
{
|
||||
var result = new NotificationResultMessage
|
||||
{
|
||||
CorrelationContext = (correlationContext ?? new CorrelationContext { CorrelationId = Guid.NewGuid().ToString() }) with { ReplyTo = null },
|
||||
Data = new NotificationResultData
|
||||
{
|
||||
ServiceName = data.ServiceName,
|
||||
RecipientEmail = data.RecipientEmail,
|
||||
TemplateKey = data.TemplateKey,
|
||||
Timestamp = DateTimeOffset.UtcNow,
|
||||
ErrorMessage = errorMessage
|
||||
}
|
||||
};
|
||||
|
||||
byte[] body = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(result));
|
||||
|
||||
var factory = new ConnectionFactory
|
||||
{
|
||||
HostName = appSettings.RabbitMq.Host,
|
||||
Port = appSettings.RabbitMq.Port,
|
||||
UserName = appSettings.RabbitMq.User,
|
||||
Password = appSettings.RabbitMq.Password,
|
||||
VirtualHost = appSettings.RabbitMq.VirtualHost
|
||||
};
|
||||
|
||||
await using var conn = await factory.CreateConnectionAsync(ct);
|
||||
await using var ch = await conn.CreateChannelAsync(cancellationToken: ct);
|
||||
|
||||
await ch.QueueDeclareAsync(replyTo, durable: true, exclusive: false, autoDelete: false,
|
||||
cancellationToken: ct);
|
||||
await ch.BasicPublishAsync(exchange: string.Empty, routingKey: replyTo, body: body,
|
||||
cancellationToken: ct);
|
||||
|
||||
logger.LogDebug("Result published to reply queue '{Queue}' [CorrelationId={CorrelationId}]",
|
||||
replyTo, correlationContext?.CorrelationId);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogWarning(ex, "Failed to publish notification result to reply queue '{Queue}'", replyTo);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,16 +0,0 @@
|
||||
namespace HrynCo.NotificationService.Worker;
|
||||
|
||||
public class Worker(ILogger<Worker> logger) : BackgroundService
|
||||
{
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
if (logger.IsEnabled(LogLevel.Information))
|
||||
{
|
||||
logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
|
||||
}
|
||||
await Task.Delay(1000, stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,13 @@
|
||||
{
|
||||
"App": {
|
||||
"ConnectionString": "Host=localhost;Port=5432;Database=notification_service;Username=postgres;Password=postgres"
|
||||
"ConnectionString": "Host=localhost;Port=5432;Database=notification_service;Username=postgres;Password=postgres",
|
||||
"RabbitMq": {
|
||||
"Host": "localhost",
|
||||
"Port": 5672,
|
||||
"User": "guest",
|
||||
"Password": "guest",
|
||||
"VirtualHost": "/"
|
||||
}
|
||||
},
|
||||
"Serilog": {
|
||||
"MinimumLevel": {
|
||||
|
||||
Reference in New Issue
Block a user