diff --git a/src/Admin/HostedServices/AzureQueueMailHostedService.cs b/src/Admin/HostedServices/AzureQueueMailHostedService.cs index 246879d555..858068071c 100644 --- a/src/Admin/HostedServices/AzureQueueMailHostedService.cs +++ b/src/Admin/HostedServices/AzureQueueMailHostedService.cs @@ -2,6 +2,7 @@ #nullable disable using System.Text.Json; +using System.Collections.Concurrent; using Azure.Storage.Queues; using Azure.Storage.Queues.Models; using Bit.Core.Models.Mail; @@ -11,6 +12,11 @@ using Bit.Core.Utilities; namespace Bit.Admin.HostedServices; +public record FailedMailMessage(MailQueueMessage Message, int RetryCount) +{ + public DateTime LastAttemptTime { get; init; } = default; +}; + public class AzureQueueMailHostedService : IHostedService { private readonly ILogger _logger; @@ -20,9 +26,9 @@ public class AzureQueueMailHostedService : IHostedService private Task _executingTask; private QueueClient _mailQueueClient; - - private const int MaxRetryAttempts = 3; - private readonly TimeSpan[] RetryDelays = { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(15) }; + private readonly ConcurrentQueue _failedMessages = new(); + private readonly TimeSpan[] RetryDelays = { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(9) }; + private Task _failedMessageProcessingTask; public AzureQueueMailHostedService( ILogger logger, @@ -38,7 +44,9 @@ public class AzureQueueMailHostedService : IHostedService { _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); _executingTask = ExecuteAsync(_cts.Token); - return _executingTask.IsCompleted ? _executingTask : Task.CompletedTask; + _failedMessageProcessingTask = ProcessFailedMessagesBackgroundAsync(_cts.Token); + return Task.WhenAny(_executingTask, _failedMessageProcessingTask).IsCompleted ? + Task.WhenAll(_executingTask, _failedMessageProcessingTask) : Task.CompletedTask; } public async Task StopAsync(CancellationToken cancellationToken) @@ -48,7 +56,14 @@ public class AzureQueueMailHostedService : IHostedService return; } _cts.Cancel(); - await Task.WhenAny(_executingTask, Task.Delay(-1, cancellationToken)); + + var tasksToWait = new List { _executingTask }; + if (_failedMessageProcessingTask != null) + { + tasksToWait.Add(_failedMessageProcessingTask); + } + + await Task.WhenAny(Task.WhenAll(tasksToWait), Task.Delay(-1, cancellationToken)); cancellationToken.ThrowIfCancellationRequested(); } @@ -56,80 +71,168 @@ public class AzureQueueMailHostedService : IHostedService { _mailQueueClient = new QueueClient(_globalSettings.Mail.ConnectionString, "mail"); - QueueMessage[] mailMessages; while (!cancellationToken.IsCancellationRequested) { - if (!(mailMessages = await RetrieveMessagesAsync()).Any()) + var mailMessages = await RetrieveMessagesAsync(); + + if (!mailMessages.Any()) { - await Task.Delay(TimeSpan.FromSeconds(15)); + await Task.Delay(TimeSpan.FromSeconds(15), cancellationToken); + continue; } - foreach (var message in mailMessages) + // Process all messages concurrently + var processingTasks = mailMessages.Select(message => ProcessMessageAsync(message, cancellationToken)); + await Task.WhenAll(processingTasks); + } + } + + private async Task ProcessMessageAsync(QueueMessage message, CancellationToken cancellationToken) + { + try + { + using var document = JsonDocument.Parse(message.DecodeMessageText()); + var root = document.RootElement; + + var mailMessages = new List(); + + if (root.ValueKind == JsonValueKind.Array) + { + mailMessages.AddRange(root.Deserialize>()); + } + else if (root.ValueKind == JsonValueKind.Object) { - var success = await ProcessMessageWithRetryAsync(message, cancellationToken); + mailMessages.Add(root.Deserialize()); + } - if (success || cancellationToken.IsCancellationRequested) + // Try to send each individual mail message + var failedMessages = new List(); + + foreach (var mailMessage in mailMessages) + { + try { - await _mailQueueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt); + await _mailService.SendEnqueuedMailMessageAsync(mailMessage); } + catch (Exception e) + { + _logger.LogWarning(e, "Failed to send individual email message. Will be re-enqueued for retry."); + failedMessages.Add(mailMessage); + } + } - if (cancellationToken.IsCancellationRequested) + // If all messages succeeded, delete the original message + if (!failedMessages.Any()) + { + await _mailQueueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt); + _logger.LogInformation("Successfully processed all email messages in batch"); + } + else + { + // Queue failed messages for re-enqueuing + foreach (var failedMessage in failedMessages) { - break; + _failedMessages.Enqueue(new FailedMailMessage(failedMessage, 0)); } + + // Delete the original message since we've extracted individual failures + await _mailQueueClient.DeleteMessageAsync(message.MessageId, message.PopReceipt); + _logger.LogInformation("Processed batch with {SuccessCount} successful and {FailedCount} failed messages", + mailMessages.Count - failedMessages.Count, failedMessages.Count); } } + catch (Exception e) + { + _logger.LogError(e, "Failed to parse or process queue message. Message will be left in queue for retry."); + } } - private async Task ProcessMessageWithRetryAsync(QueueMessage message, CancellationToken cancellationToken) + private async Task ProcessFailedMessagesBackgroundAsync(CancellationToken cancellationToken) { - for (int attempt = 0; attempt < MaxRetryAttempts; attempt++) + const int maxRetryAttempts = 3; + const int pollingIntervalSeconds = 5; + + while (!cancellationToken.IsCancellationRequested) { try { - using var document = JsonDocument.Parse(message.DecodeMessageText()); - var root = document.RootElement; + var messagesToRequeue = new List(); + var processedCount = 0; - if (root.ValueKind == JsonValueKind.Array) + // Process failed messages in batches to avoid overwhelming the system + while (_failedMessages.TryDequeue(out var failedMessage) && processedCount < 10) { - foreach (var mailQueueMessage in root.Deserialize>()) + processedCount++; + + // Calculate when this message should be retried based on its retry count + var nextRetryTime = CalculateNextRetryTime(failedMessage); + + if (DateTime.UtcNow < nextRetryTime) + { + // Message is not ready for retry yet, put it back for later + messagesToRequeue.Add(failedMessage); + continue; + } + + try + { + await _mailService.SendEnqueuedMailMessageAsync(failedMessage.Message); + _logger.LogInformation("Successfully sent previously failed email message on retry attempt {RetryCount}", + failedMessage.RetryCount + 1); + } + catch (Exception e) { - await _mailService.SendEnqueuedMailMessageAsync(mailQueueMessage); + var newRetryCount = failedMessage.RetryCount + 1; + + if (newRetryCount < maxRetryAttempts) + { + _logger.LogWarning(e, "Failed to send email on retry attempt {RetryCount}/{MaxRetryAttempts}. Will retry later.", + newRetryCount, maxRetryAttempts); + messagesToRequeue.Add(new FailedMailMessage(failedMessage.Message, newRetryCount) + { + LastAttemptTime = DateTime.UtcNow + }); + } + else + { + _logger.LogError(e, "Failed to send email after {MaxRetryAttempts} retry attempts. Message will be permanently discarded.", + maxRetryAttempts); + } } } - else if (root.ValueKind == JsonValueKind.Object) + + // Put messages that need more processing back into the queue + foreach (var messageToRequeue in messagesToRequeue) { - var mailQueueMessage = root.Deserialize(); - await _mailService.SendEnqueuedMailMessageAsync(mailQueueMessage); + _failedMessages.Enqueue(messageToRequeue); } - _logger.LogInformation("Successfully sent email message after {Attempt} attempts", attempt + 1); - return true; - } - catch (Exception e) - { - var isLastAttempt = attempt == MaxRetryAttempts - 1; - - if (isLastAttempt) + if (processedCount > 0) { - _logger.LogError(e, "Failed to send email after {MaxAttempts} attempts. Message will be deleted from queue.", MaxRetryAttempts); + _logger.LogInformation("Background processor handled {ProcessedCount} failed messages", processedCount); } - else - { - _logger.LogWarning(e, "Failed to send email on attempt {Attempt}/{MaxAttempts}. Retrying in {Delay}ms", - attempt + 1, MaxRetryAttempts, RetryDelays[attempt].TotalMilliseconds); - await Task.Delay(RetryDelays[attempt], cancellationToken); - - if (cancellationToken.IsCancellationRequested) - { - return false; - } - } + // Wait before next polling cycle + await Task.Delay(TimeSpan.FromSeconds(pollingIntervalSeconds), cancellationToken); } + catch (Exception e) when (!cancellationToken.IsCancellationRequested) + { + _logger.LogError(e, "Error in background failed message processor. Will retry in {IntervalSeconds}s.", pollingIntervalSeconds); + await Task.Delay(TimeSpan.FromSeconds(pollingIntervalSeconds), cancellationToken); + } + } + } + + private DateTime CalculateNextRetryTime(FailedMailMessage failedMessage) + { + if (failedMessage.RetryCount == 0 || failedMessage.LastAttemptTime == default) + { + return DateTime.UtcNow; // First attempt or no previous attempt time } - return false; + var delayIndex = Math.Min(failedMessage.RetryCount - 1, RetryDelays.Length - 1); + var delay = RetryDelays[delayIndex]; + return failedMessage.LastAttemptTime.Add(delay); } private async Task RetrieveMessagesAsync()