From 5423b32fd94d4222c53e6ad1c683f90fd9273bec Mon Sep 17 00:00:00 2001 From: Guilherme Ferreira Date: Mon, 20 Nov 2023 12:56:29 +0000 Subject: [PATCH] docs: sample using wildcard for a topic name --- .../KafkaFlow.Sample.WildcardConsumer.csproj | 27 +++++++ .../PrintConsoleMiddleware.cs | 18 +++++ .../Program.cs | 70 +++++++++++++++++++ .../IConsumerConfigurationBuilder.cs | 3 +- src/KafkaFlow.sln | 7 ++ website/docs/getting-started/samples.md | 6 ++ .../docs/guides/consumers/add-consumers.md | 5 ++ 7 files changed, 135 insertions(+), 1 deletion(-) create mode 100644 samples/KafkaFlow.Sample.WildcardConsumer/KafkaFlow.Sample.WildcardConsumer.csproj create mode 100644 samples/KafkaFlow.Sample.WildcardConsumer/PrintConsoleMiddleware.cs create mode 100644 samples/KafkaFlow.Sample.WildcardConsumer/Program.cs diff --git a/samples/KafkaFlow.Sample.WildcardConsumer/KafkaFlow.Sample.WildcardConsumer.csproj b/samples/KafkaFlow.Sample.WildcardConsumer/KafkaFlow.Sample.WildcardConsumer.csproj new file mode 100644 index 000000000..05d9e4f29 --- /dev/null +++ b/samples/KafkaFlow.Sample.WildcardConsumer/KafkaFlow.Sample.WildcardConsumer.csproj @@ -0,0 +1,27 @@ + + + + Exe + net6.0 + enable + enable + + + + + + + + + + + + + + + + + + + + diff --git a/samples/KafkaFlow.Sample.WildcardConsumer/PrintConsoleMiddleware.cs b/samples/KafkaFlow.Sample.WildcardConsumer/PrintConsoleMiddleware.cs new file mode 100644 index 000000000..d33c68d80 --- /dev/null +++ b/samples/KafkaFlow.Sample.WildcardConsumer/PrintConsoleMiddleware.cs @@ -0,0 +1,18 @@ +using System.Text; +using KafkaFlow; + +public class PrintConsoleMiddleware : IMessageMiddleware +{ + public Task Invoke(IMessageContext context, MiddlewareDelegate next) + { + Console.WriteLine( + "Topic: {0} | Partition: {1} | Offset: {2} | Message: {3}", + context.ConsumerContext.Topic, + context.ConsumerContext.Partition, + context.ConsumerContext.Offset, + Encoding.UTF8.GetString( + (byte[])context.Message.Value)); + + return next(context); + } +} \ No newline at end of file diff --git a/samples/KafkaFlow.Sample.WildcardConsumer/Program.cs b/samples/KafkaFlow.Sample.WildcardConsumer/Program.cs new file mode 100644 index 000000000..32d13df22 --- /dev/null +++ b/samples/KafkaFlow.Sample.WildcardConsumer/Program.cs @@ -0,0 +1,70 @@ +using System.Text; +using Confluent.Kafka; +using KafkaFlow; +using KafkaFlow.Producers; +using Microsoft.Extensions.DependencyInjection; +using AutoOffsetReset = KafkaFlow.AutoOffsetReset; + +var services = new ServiceCollection(); + +const string producerName = "RandomProducer"; +const string topicPrefix = "random-topic-"; + +services.AddKafka( + kafka => kafka + .UseConsoleLog() + .AddCluster( + cluster => cluster + .WithBrokers(new[] { "localhost:9092" }) + .AddProducer( + producerName, _ => { }) + .AddConsumer( + consumer => consumer + .Topic($"^{topicPrefix}*") // Any topic starting with `random-topic-*` + .WithGroupId("random-topic-handler") + .WithBufferSize(5) + .WithWorkersCount(3) + .WithAutoOffsetReset(AutoOffsetReset.Earliest) + .WithConsumerConfig(new ConsumerConfig() + { + TopicMetadataRefreshIntervalMs = 5000 // discover new topics every 5 seconds + }) + .AddMiddlewares( + middlewares => middlewares + .Add() + ) + ) + ) +); + +var provider = services.BuildServiceProvider(); + +var bus = provider.CreateKafkaBus(); + +await bus.StartAsync(); + +var producer = provider + .GetRequiredService() + .GetProducer(producerName); + +while (true) +{ + Console.WriteLine("Type the name of a topic to send a message or 'exit' to quit:"); + + var input = Console.ReadLine(); + + if (input is null) + continue; + + if (input.Equals("exit", StringComparison.OrdinalIgnoreCase)) + { + await bus.StopAsync(); + break; + } + + await producer.ProduceAsync( + $"{topicPrefix}{input}", + Guid.NewGuid().ToString(), + Encoding.UTF8.GetBytes( + $"Message to {input}: {Guid.NewGuid()}")); +} \ No newline at end of file diff --git a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs index 0e85e8654..00c04e422 100644 --- a/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs +++ b/src/KafkaFlow.Abstractions/Configuration/IConsumerConfigurationBuilder.cs @@ -14,7 +14,8 @@ public interface IConsumerConfigurationBuilder IDependencyConfigurator DependencyConfigurator { get; } /// - /// Sets the topic that will be used to read the messages, the partitions will be automatically assigned + /// Sets the topic that will be used to read the messages, the partitions will be automatically assigned. + /// librdkafka patterns are accepted. /// /// Topic name /// diff --git a/src/KafkaFlow.sln b/src/KafkaFlow.sln index 0b4f56ea7..73f4d6c43 100644 --- a/src/KafkaFlow.sln +++ b/src/KafkaFlow.sln @@ -95,6 +95,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Telemetry", "Telemetry", "{ EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "KafkaFlow.OpenTelemetry", "KafkaFlow.OpenTelemetry\KafkaFlow.OpenTelemetry.csproj", "{1557B135-4925-4FA2-80DA-8AD13155F3BD}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaFlow.Sample.WildcardConsumer", "..\samples\KafkaFlow.Sample.WildcardConsumer\KafkaFlow.Sample.WildcardConsumer.csproj", "{E3A02BB4-6881-4568-B92F-CC0878BD8175}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -233,6 +235,10 @@ Global {1557B135-4925-4FA2-80DA-8AD13155F3BD}.Debug|Any CPU.Build.0 = Debug|Any CPU {1557B135-4925-4FA2-80DA-8AD13155F3BD}.Release|Any CPU.ActiveCfg = Release|Any CPU {1557B135-4925-4FA2-80DA-8AD13155F3BD}.Release|Any CPU.Build.0 = Release|Any CPU + {E3A02BB4-6881-4568-B92F-CC0878BD8175}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E3A02BB4-6881-4568-B92F-CC0878BD8175}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E3A02BB4-6881-4568-B92F-CC0878BD8175}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E3A02BB4-6881-4568-B92F-CC0878BD8175}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -273,6 +279,7 @@ Global {B4A9E7CE-7A37-411E-967E-D9B5FD1A3992} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} {4A16F519-FAF8-432C-AD0A-CC44F7BD392D} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} {1557B135-4925-4FA2-80DA-8AD13155F3BD} = {96F5D441-B8DE-4ABC-BEF2-F758D1B2BA39} + {E3A02BB4-6881-4568-B92F-CC0878BD8175} = {303AE78F-6C96-4DF4-AC89-5C4FD53AFF0B} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {6AE955B5-16B0-41CF-9F12-66D15B3DD1AB} diff --git a/website/docs/getting-started/samples.md b/website/docs/getting-started/samples.md index aba90fd4a..fa5d87b5a 100644 --- a/website/docs/getting-started/samples.md +++ b/website/docs/getting-started/samples.md @@ -53,3 +53,9 @@ You can find the code here: [/samples/KafkaFlow.Sample.PauseConsumerOnError](htt This is a sample that shows how to throttle a consumer based on others consumers lag You can find the code here: [/samples/KafkaFlow.Sample.ConsumerThrottling](https://github.com/Farfetch/kafkaflow/tree/master/samples/KafkaFlow.Sample.ConsumerThrottling) + +## Wildcard Consumers + +This sample shows how to use a consumer to handle all the topics according to a naming convention. This is not a feature of KafkaFlow, but a demonstration of how to use the pattern conventions exposed by [librdkafka](https://github.com/confluentinc/librdkafka/tree/95a542c87c61d2c45b445f91c73dd5442eb04f3c) ([here](https://github.com/confluentinc/librdkafka/blob/95a542c87c61d2c45b445f91c73dd5442eb04f3c/src-cpp/rdkafkacpp.h#L2681)). + +You can find the code here: [/samples/KafkaFlow.Sample.WildcardConsumer](https://github.com/Farfetch/kafkaflow/tree/master/samples/KafkaFlow.Sample.WildcardConsumer) diff --git a/website/docs/guides/consumers/add-consumers.md b/website/docs/guides/consumers/add-consumers.md index 0d2ae78bd..50f1aadf1 100644 --- a/website/docs/guides/consumers/add-consumers.md +++ b/website/docs/guides/consumers/add-consumers.md @@ -31,6 +31,11 @@ services.AddKafka(kafka => kafka On a Consumer, you can configure the Middlewares that will be invoked. You can find more information on how to configure Middlewares [here](../middlewares). +:::tip +You can use a naming pattern such as a wildcard to connect to any topic that matches a naming convention. +You can find a sample on [here](https://github.com/Farfetch/kafkaflow/tree/master/samples/KafkaFlow.Sample.WildcardConsumer). +::: + ## Automatic Partitions Assignment Using the `Topic()` or `Topics()` methods, the consumer will trigger the automatic partition assignment that will distribute the topic partitions across the application instances.