diff --git a/extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs b/extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs
index 2668402fa..10ece1b13 100644
--- a/extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs
+++ b/extensions/RabbitMQ/RabbitMQ.TestApplication/Program.cs
@@ -45,7 +45,12 @@ public static async Task Main()
ListenToDeadLetterQueue(rabbitMQConfig);
- await pipeline.EnqueueAsync($"test {DateTimeOffset.Now:T}");
+ // Change ConcurrentThreads and PrefetchCount to 1 to see
+ // how they affect total execution time
+ for (int i = 1; i <= 3; i++)
+ {
+ await pipeline.EnqueueAsync($"test #{i} {DateTimeOffset.Now:T}");
+ }
while (true)
{
diff --git a/extensions/RabbitMQ/RabbitMQ.TestApplication/appsettings.json b/extensions/RabbitMQ/RabbitMQ.TestApplication/appsettings.json
index 55ee25231..2afea6593 100644
--- a/extensions/RabbitMQ/RabbitMQ.TestApplication/appsettings.json
+++ b/extensions/RabbitMQ/RabbitMQ.TestApplication/appsettings.json
@@ -14,8 +14,14 @@
"VirtualHost": "/",
"MessageTTLSecs": 3600,
"SslEnabled": false,
+ // How many messages to process asynchronously at a time, in each queue
+ "ConcurrentThreads": 3,
+ // How many messages to fetch at a time from each queue
+ // The value should be higher than ConcurrentThreads
+ "PrefetchCount": 6,
// How many times to dequeue a messages and process before moving it to a poison queue
"MaxRetriesBeforePoisonQueue": 5,
+ "DelayBeforeRetryingMsecs": 750,
// Suffix used for the poison queues.
"PoisonQueueSuffix": "-poison"
}
diff --git a/extensions/RabbitMQ/RabbitMQ/RabbitMQConfig.cs b/extensions/RabbitMQ/RabbitMQ/RabbitMQConfig.cs
index 37163dc07..cffb0cd38 100644
--- a/extensions/RabbitMQ/RabbitMQ/RabbitMQConfig.cs
+++ b/extensions/RabbitMQ/RabbitMQ/RabbitMQConfig.cs
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Text;
+using Microsoft.Extensions.Logging;
#pragma warning disable IDE0130 // reduce number of "using" statements
// ReSharper disable once CheckNamespace - reduce number of "using" statements
@@ -16,7 +17,7 @@ public class RabbitMQConfig
///
/// TCP port for the connection, e.g. 5672
///
- public int Port { get; set; } = 0;
+ public int Port { get; set; } = 5672;
///
/// Authentication username
@@ -46,6 +47,22 @@ public class RabbitMQConfig
///
public bool SslEnabled { get; set; } = false;
+ ///
+ /// How many messages to process asynchronously at a time, in each queue.
+ /// Note that this applies to each queue, and each queue is used
+ /// for a specific pipeline step.
+ ///
+ public ushort ConcurrentThreads { get; set; } = 2;
+
+ ///
+ /// How many messages to fetch at a time from each queue.
+ /// The value should be higher than ConcurrentThreads to make sure each
+ /// thread has some work to do.
+ /// Note that this applies to each queue, and each queue is used
+ /// for a specific pipeline step.
+ ///
+ public ushort PrefetchCount { get; set; } = 3;
+
///
/// How many times to retry processing a message before moving it to a poison queue.
/// Example: a value of 20 means that a message will be processed up to 21 times.
@@ -54,6 +71,14 @@ public class RabbitMQConfig
///
public int MaxRetriesBeforePoisonQueue { get; set; } = 20;
+ ///
+ /// How long to wait before putting a message back to the queue in case of failure.
+ /// Note: currently a basic strategy not based on RabbitMQ exchanges, potentially
+ /// affecting the pipeline concurrency performance: consumers hold
+ /// messages for N msecs, slowing down the delivery of other messages.
+ ///
+ public int DelayBeforeRetryingMsecs { get; set; } = 500;
+
///
/// Suffix used for the poison queues.
///
@@ -62,7 +87,7 @@ public class RabbitMQConfig
///
/// Verify that the current state is valid.
///
- public void Validate()
+ public void Validate(ILogger? log = null)
{
const int MinTTLSecs = 5;
@@ -81,6 +106,11 @@ public void Validate()
throw new ConfigurationException($"RabbitMQ: {nameof(this.MessageTTLSecs)} value {this.MessageTTLSecs} is too low, cannot be less than {MinTTLSecs}");
}
+ if (this.ConcurrentThreads < 1)
+ {
+ throw new ConfigurationException($"RabbitMQ: {nameof(this.ConcurrentThreads)} value cannot be less than 1");
+ }
+
if (string.IsNullOrWhiteSpace(this.PoisonQueueSuffix) || this.PoisonQueueSuffix != $"{this.PoisonQueueSuffix}".Trim())
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} cannot be empty or have leading or trailing spaces");
@@ -102,5 +132,12 @@ public void Validate()
{
throw new ConfigurationException($"RabbitMQ: {nameof(this.PoisonQueueSuffix)} can be up to 60 characters length");
}
+
+#pragma warning disable CA2254
+ if (this.PrefetchCount < this.ConcurrentThreads)
+ {
+ log?.LogWarning($"The value of {nameof(this.PrefetchCount)} ({this.PrefetchCount}) should not be lower than the value of {nameof(this.ConcurrentThreads)} ({this.ConcurrentThreads})");
+ }
+#pragma warning restore CA2254
}
}
diff --git a/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs b/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs
index e51aaeba6..6fcc816bb 100644
--- a/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs
+++ b/extensions/RabbitMQ/RabbitMQ/RabbitMQPipeline.cs
@@ -24,6 +24,8 @@ public sealed class RabbitMQPipeline : IQueue
private readonly AsyncEventingBasicConsumer _consumer;
private readonly RabbitMQConfig _config;
private readonly int _messageTTLMsecs;
+ private readonly int _delayBeforeRetryingMsecs;
+ private readonly int _maxAttempts;
private string _queueName = string.Empty;
private string _poisonQueueName = string.Empty;
@@ -32,20 +34,22 @@ public sealed class RabbitMQPipeline : IQueue
///
public RabbitMQPipeline(RabbitMQConfig config, ILoggerFactory? loggerFactory = null)
{
- this._config = config;
- this._config.Validate();
-
this._log = (loggerFactory ?? DefaultLogger.Factory).CreateLogger();
+ this._config = config;
+ this._config.Validate(this._log);
+
// see https://www.rabbitmq.com/dotnet-api-guide.html#consuming-async
var factory = new ConnectionFactory
{
+ ClientProvidedName = "KernelMemory",
HostName = config.Host,
Port = config.Port,
UserName = config.Username,
Password = config.Password,
VirtualHost = !string.IsNullOrWhiteSpace(config.VirtualHost) ? config.VirtualHost : "/",
DispatchConsumersAsync = true,
+ ConsumerDispatchConcurrency = config.ConcurrentThreads,
Ssl = new SslOption
{
Enabled = config.SslEnabled,
@@ -56,8 +60,11 @@ public RabbitMQPipeline(RabbitMQConfig config, ILoggerFactory? loggerFactory = n
this._messageTTLMsecs = config.MessageTTLSecs * 1000;
this._connection = factory.CreateConnection();
this._channel = this._connection.CreateModel();
- this._channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
+ this._channel.BasicQos(prefetchSize: 0, prefetchCount: config.PrefetchCount, global: false);
this._consumer = new AsyncEventingBasicConsumer(this._channel);
+
+ this._delayBeforeRetryingMsecs = Math.Max(0, this._config.DelayBeforeRetryingMsecs);
+ this._maxAttempts = Math.Max(0, this._config.MaxRetriesBeforePoisonQueue) + 1;
}
///
@@ -171,7 +178,6 @@ public void OnDequeue(Func> processMessageAction)
this._consumer.Received += async (object sender, BasicDeliverEventArgs args) =>
{
// Just for logging, extract the attempt number from the message headers
- var maxAttempts = this._config.MaxRetriesBeforePoisonQueue + 1;
var attemptNumber = 1;
if (args.BasicProperties?.Headers != null && args.BasicProperties.Headers.TryGetValue("x-delivery-count", out object? value))
{
@@ -181,7 +187,7 @@ public void OnDequeue(Func> processMessageAction)
try
{
this._log.LogDebug("Message '{0}' received, expires after {1}ms, attempt {2} of {3}",
- args.BasicProperties?.MessageId, args.BasicProperties?.Expiration, attemptNumber, maxAttempts);
+ args.BasicProperties?.MessageId, args.BasicProperties?.Expiration, attemptNumber, this._maxAttempts);
byte[] body = args.Body.ToArray();
string message = Encoding.UTF8.GetString(body);
@@ -194,15 +200,19 @@ public void OnDequeue(Func> processMessageAction)
}
else
{
- if (attemptNumber < maxAttempts)
+ if (attemptNumber < this._maxAttempts)
{
this._log.LogWarning("Message '{0}' failed to process (attempt {1} of {2}), putting message back in the queue",
- args.BasicProperties?.MessageId, attemptNumber, maxAttempts);
+ args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts);
+ if (this._delayBeforeRetryingMsecs > 0)
+ {
+ await Task.Delay(TimeSpan.FromMilliseconds(this._delayBeforeRetryingMsecs)).ConfigureAwait(false);
+ }
}
else
{
this._log.LogError("Message '{0}' failed to process (attempt {1} of {2}), moving message to dead letter queue",
- args.BasicProperties?.MessageId, attemptNumber, maxAttempts);
+ args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts);
}
// Note: if "requeue == false" the message would be moved to the dead letter exchange
@@ -217,8 +227,20 @@ public void OnDequeue(Func> processMessageAction)
// - failed to delete message from queue
// - failed to unlock message in the queue
- this._log.LogWarning(e, "Message '{0}' processing failed with exception (attempt {1} of {2}), putting message back in the queue",
- args.BasicProperties?.MessageId, attemptNumber, maxAttempts);
+ if (attemptNumber < this._maxAttempts)
+ {
+ this._log.LogWarning(e, "Message '{0}' processing failed with exception (attempt {1} of {2}), putting message back in the queue",
+ args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts);
+ if (this._delayBeforeRetryingMsecs > 0)
+ {
+ await Task.Delay(TimeSpan.FromMilliseconds(this._delayBeforeRetryingMsecs)).ConfigureAwait(false);
+ }
+ }
+ else
+ {
+ this._log.LogError(e, "Message '{0}' processing failed with exception (attempt {1} of {2}), putting message back in the queue",
+ args.BasicProperties?.MessageId, attemptNumber, this._maxAttempts);
+ }
// TODO: verify and document what happens if this fails. RabbitMQ should automatically unlock messages.
// Note: if "requeue == false" the message would be moved to the dead letter exchange
diff --git a/service/Service/appsettings.json b/service/Service/appsettings.json
index 100b25119..9347adfb1 100644
--- a/service/Service/appsettings.json
+++ b/service/Service/appsettings.json
@@ -539,10 +539,17 @@
"VirtualHost": "/",
"MessageTTLSecs": 3600,
"SslEnabled": false,
+ // How many messages to process asynchronously at a time, in each queue
+ "ConcurrentThreads": 4,
+ // How many messages to fetch at a time from each queue
+ // The value should be higher than ConcurrentThreads
+ "PrefetchCount": 8,
// How many times to dequeue a messages and process before moving it to a poison queue
// Note: this value cannot be changed after queues have been created. In such case
// you might need to drain all queues, delete them, and restart the ingestion service(s).
"MaxRetriesBeforePoisonQueue": 20,
+ // How long to wait before putting a message back to the queue in case of failure
+ "DelayBeforeRetryingMsecs": 500,
// Suffix used for the poison queues.
"PoisonQueueSuffix": "-poison"
},