Browse Source

update with background task for failed message processing

SRE-3019-using-claude-implement-the-azure-mail-queue-retry-feature
Opeyemi Alao 3 months ago
parent
commit
a1e613ca5a
No known key found for this signature in database
GPG Key ID: 8F3E63D26B40E836
  1. 193
      src/Admin/HostedServices/AzureQueueMailHostedService.cs

193
src/Admin/HostedServices/AzureQueueMailHostedService.cs

@ -2,6 +2,7 @@ @@ -2,6 +2,7 @@
#nullable disable
using System.Text.Json;
using System.Collections.Concurrent;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;
using Bit.Core.Models.Mail;
@ -11,6 +12,11 @@ using Bit.Core.Utilities; @@ -11,6 +12,11 @@ using Bit.Core.Utilities;
namespace Bit.Admin.HostedServices;
public record FailedMailMessage(MailQueueMessage Message, int RetryCount)
{
public DateTime LastAttemptTime { get; init; } = default;
};
public class AzureQueueMailHostedService : IHostedService
{
private readonly ILogger<AzureQueueMailHostedService> _logger;
@ -20,9 +26,9 @@ public class AzureQueueMailHostedService : IHostedService @@ -20,9 +26,9 @@ public class AzureQueueMailHostedService : IHostedService
private Task _executingTask;
private QueueClient _mailQueueClient;
private const int MaxRetryAttempts = 3;
private readonly TimeSpan[] RetryDelays = { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15) };
private readonly ConcurrentQueue<FailedMailMessage> _failedMessages = new();
private readonly TimeSpan[] RetryDelays = { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(9) };
private Task _failedMessageProcessingTask;
public AzureQueueMailHostedService(
ILogger<AzureQueueMailHostedService> logger,
@ -38,7 +44,9 @@ public class AzureQueueMailHostedService : IHostedService @@ -38,7 +44,9 @@ public class AzureQueueMailHostedService : IHostedService
{
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_executingTask = ExecuteAsync(_cts.Token);
return _executingTask.IsCompleted ? _executingTask : Task.CompletedTask;
_failedMessageProcessingTask = ProcessFailedMessagesBackgroundAsync(_cts.Token);
return Task.WhenAny(_executingTask, _failedMessageProcessingTask).IsCompleted ?
Task.WhenAll(_executingTask, _failedMessageProcessingTask) : Task.CompletedTask;
}
public async Task StopAsync(CancellationToken cancellationToken)
@ -48,7 +56,14 @@ public class AzureQueueMailHostedService : IHostedService @@ -48,7 +56,14 @@ public class AzureQueueMailHostedService : IHostedService
return;
}
_cts.Cancel();
await Task.WhenAny(_executingTask, Task.Delay(-1, cancellationToken));
var tasksToWait = new List<Task> { _executingTask };
if (_failedMessageProcessingTask != null)
{
tasksToWait.Add(_failedMessageProcessingTask);
}
await Task.WhenAny(Task.WhenAll(tasksToWait), Task.Delay(-1, cancellationToken));
cancellationToken.ThrowIfCancellationRequested();
}
@ -56,80 +71,168 @@ public class AzureQueueMailHostedService : IHostedService @@ -56,80 +71,168 @@ public class AzureQueueMailHostedService : IHostedService
{
_mailQueueClient = new QueueClient(_globalSettings.Mail.ConnectionString, "mail");
QueueMessage[] mailMessages;
while (!cancellationToken.IsCancellationRequested)
{
if (!(mailMessages = await RetrieveMessagesAsync()).Any())
var mailMessages = await RetrieveMessagesAsync();
if (!mailMessages.Any())
{
await Task.Delay(TimeSpan.FromSeconds(15));
await Task.Delay(TimeSpan.FromSeconds(15), cancellationToken);
continue;
}
foreach (var message in mailMessages)
// Process all messages concurrently
var processingTasks = mailMessages.Select(message => ProcessMessageAsync(message, cancellationToken));
await Task.WhenAll(processingTasks);
}
}
private async Task ProcessMessageAsync(QueueMessage message, CancellationToken cancellationToken)
{
try
{
using var document = JsonDocument.Parse(message.DecodeMessageText());
var root = document.RootElement;
var mailMessages = new List<MailQueueMessage>();
if (root.ValueKind == JsonValueKind.Array)
{
mailMessages.AddRange(root.Deserialize<List<MailQueueMessage>>());
}
else if (root.ValueKind == JsonValueKind.Object)
{
var success = await ProcessMessageWithRetryAsync(message, cancellationToken);
mailMessages.Add(root.Deserialize<MailQueueMessage>());
}
if (success || cancellationToken.IsCancellationRequested)
// Try to send each individual mail message
var failedMessages = new List<MailQueueMessage>();
foreach (var mailMessage in mailMessages)
{
try
{
await _mailQueueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
await _mailService.SendEnqueuedMailMessageAsync(mailMessage);
}
catch (Exception e)
{
_logger.LogWarning(e, "Failed to send individual email message. Will be re-enqueued for retry.");
failedMessages.Add(mailMessage);
}
}
if (cancellationToken.IsCancellationRequested)
// If all messages succeeded, delete the original message
if (!failedMessages.Any())
{
await _mailQueueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
_logger.LogInformation("Successfully processed all email messages in batch");
}
else
{
// Queue failed messages for re-enqueuing
foreach (var failedMessage in failedMessages)
{
break;
_failedMessages.Enqueue(new FailedMailMessage(failedMessage, 0));
}
// Delete the original message since we've extracted individual failures
await _mailQueueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt);
_logger.LogInformation("Processed batch with {SuccessCount} successful and {FailedCount} failed messages",
mailMessages.Count - failedMessages.Count, failedMessages.Count);
}
}
catch (Exception e)
{
_logger.LogError(e, "Failed to parse or process queue message. Message will be left in queue for retry.");
}
}
private async Task<bool> ProcessMessageWithRetryAsync(QueueMessage message, CancellationToken cancellationToken)
private async Task ProcessFailedMessagesBackgroundAsync(CancellationToken cancellationToken)
{
for (int attempt = 0; attempt < MaxRetryAttempts; attempt++)
const int maxRetryAttempts = 3;
const int pollingIntervalSeconds = 5;
while (!cancellationToken.IsCancellationRequested)
{
try
{
using var document = JsonDocument.Parse(message.DecodeMessageText());
var root = document.RootElement;
var messagesToRequeue = new List<FailedMailMessage>();
var processedCount = 0;
if (root.ValueKind == JsonValueKind.Array)
// Process failed messages in batches to avoid overwhelming the system
while (_failedMessages.TryDequeue(out var failedMessage) && processedCount < 10)
{
foreach (var mailQueueMessage in root.Deserialize<List<MailQueueMessage>>())
processedCount++;
// Calculate when this message should be retried based on its retry count
var nextRetryTime = CalculateNextRetryTime(failedMessage);
if (DateTime.UtcNow < nextRetryTime)
{
// Message is not ready for retry yet, put it back for later
messagesToRequeue.Add(failedMessage);
continue;
}
try
{
await _mailService.SendEnqueuedMailMessageAsync(failedMessage.Message);
_logger.LogInformation("Successfully sent previously failed email message on retry attempt {RetryCount}",
failedMessage.RetryCount + 1);
}
catch (Exception e)
{
await _mailService.SendEnqueuedMailMessageAsync(mailQueueMessage);
var newRetryCount = failedMessage.RetryCount + 1;
if (newRetryCount < maxRetryAttempts)
{
_logger.LogWarning(e, "Failed to send email on retry attempt {RetryCount}/{MaxRetryAttempts}. Will retry later.",
newRetryCount, maxRetryAttempts);
messagesToRequeue.Add(new FailedMailMessage(failedMessage.Message, newRetryCount)
{
LastAttemptTime = DateTime.UtcNow
});
}
else
{
_logger.LogError(e, "Failed to send email after {MaxRetryAttempts} retry attempts. Message will be permanently discarded.",
maxRetryAttempts);
}
}
}
else if (root.ValueKind == JsonValueKind.Object)
// Put messages that need more processing back into the queue
foreach (var messageToRequeue in messagesToRequeue)
{
var mailQueueMessage = root.Deserialize<MailQueueMessage>();
await _mailService.SendEnqueuedMailMessageAsync(mailQueueMessage);
_failedMessages.Enqueue(messageToRequeue);
}
_logger.LogInformation("Successfully sent email message after {Attempt} attempts", attempt + 1);
return true;
}
catch (Exception e)
{
var isLastAttempt = attempt == MaxRetryAttempts - 1;
if (isLastAttempt)
if (processedCount > 0)
{
_logger.LogError(e, "Failed to send email after {MaxAttempts} attempts. Message will be deleted from queue.", MaxRetryAttempts);
_logger.LogInformation("Background processor handled {ProcessedCount} failed messages", processedCount);
}
else
{
_logger.LogWarning(e, "Failed to send email on attempt {Attempt}/{MaxAttempts}. Retrying in {Delay}ms",
attempt + 1, MaxRetryAttempts, RetryDelays[attempt].TotalMilliseconds);
await Task.Delay(RetryDelays[attempt], cancellationToken);
if (cancellationToken.IsCancellationRequested)
{
return false;
}
}
// Wait before next polling cycle
await Task.Delay(TimeSpan.FromSeconds(pollingIntervalSeconds), cancellationToken);
}
catch (Exception e) when (!cancellationToken.IsCancellationRequested)
{
_logger.LogError(e, "Error in background failed message processor. Will retry in {IntervalSeconds}s.", pollingIntervalSeconds);
await Task.Delay(TimeSpan.FromSeconds(pollingIntervalSeconds), cancellationToken);
}
}
}
private DateTime CalculateNextRetryTime(FailedMailMessage failedMessage)
{
if (failedMessage.RetryCount == 0 || failedMessage.LastAttemptTime == default)
{
return DateTime.UtcNow; // First attempt or no previous attempt time
}
return false;
var delayIndex = Math.Min(failedMessage.RetryCount - 1, RetryDelays.Length - 1);
var delay = RetryDelays[delayIndex];
return failedMessage.LastAttemptTime.Add(delay);
}
private async Task<QueueMessage[]> RetrieveMessagesAsync()

Loading…
Cancel
Save