diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs index 897e30f9a..36b0774b4 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentAvro/ConsumerConfigurationBuilderExtensions.cs @@ -19,7 +19,7 @@ public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryAvroDeser this IConsumerMiddlewareConfigurationBuilder middlewares) { return middlewares.Add( - resolver => new SerializerConsumerMiddleware( + resolver => new DeserializerConsumerMiddleware( new ConfluentAvroDeserializer(resolver), new SchemaRegistryTypeResolver(new ConfluentAvroTypeNameResolver(resolver.Resolve())))); } diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufDeserializer.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufDeserializer.cs index 51f628ccb..3722dc35d 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufDeserializer.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConfluentProtobufDeserializer.cs @@ -4,7 +4,6 @@ using System.Collections.Generic; using System.IO; using System.Threading.Tasks; - using Confluent.SchemaRegistry; using Confluent.SchemaRegistry.Serdes; /// @@ -12,37 +11,6 @@ /// public class ConfluentProtobufDeserializer : IDeserializer { - private readonly ISchemaRegistryClient schemaRegistryClient; - private readonly ProtobufSerializerConfig serializerConfig; - - /// - /// Initializes a new instance of the class. - /// - /// An instance of - /// An instance of - public ConfluentProtobufDeserializer(IDependencyResolver resolver, ProtobufSerializerConfig serializerConfig = null) - { - this.schemaRegistryClient = - resolver.Resolve() ?? - throw new InvalidOperationException( - $"No schema registry configuration was found. Set it using {nameof(ClusterConfigurationBuilderExtensions.WithSchemaRegistry)} on cluster configuration"); - - this.serializerConfig = serializerConfig; - } - - /// - public Task SerializeAsync(object message, Stream output, ISerializerContext context) - { - return ConfluentSerializerWrapper - .GetOrCreateSerializer( - message.GetType(), - () => Activator.CreateInstance( - typeof(ProtobufSerializer<>).MakeGenericType(message.GetType()), - this.schemaRegistryClient, - this.serializerConfig)) - .SerializeAsync(message, output, context); - } - /// public Task DeserializeAsync(Stream input, Type type, ISerializerContext context) { diff --git a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs index 69fed7c13..a7cb9e449 100644 --- a/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs +++ b/src/KafkaFlow.Serializer.SchemaRegistry.ConfluentProtobuf/ConsumerConfigurationBuilderExtensions.cs @@ -19,8 +19,8 @@ public static IConsumerMiddlewareConfigurationBuilder AddSchemaRegistryProtobufD this IConsumerMiddlewareConfigurationBuilder middlewares) { return middlewares.Add( - resolver => new SerializerConsumerMiddleware( - new ConfluentProtobufDeserializer(resolver), + resolver => new DeserializerConsumerMiddleware( + new ConfluentProtobufDeserializer(), new SchemaRegistryTypeResolver(new ConfluentProtobufTypeNameResolver(resolver.Resolve())))); } } diff --git a/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs b/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs index efff17244..1757da6db 100644 --- a/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/Serializers/SerializerConsumerMiddlewareTests.cs @@ -19,7 +19,7 @@ public class SerializerConsumerMiddlewareTests private bool nextCalled; - private SerializerConsumerMiddleware target; + private DeserializerConsumerMiddleware target; [TestInitialize] public void Setup() @@ -28,7 +28,7 @@ public void Setup() this.deserializerMock = new Mock(); this.typeResolverMock = new Mock(); - this.target = new SerializerConsumerMiddleware( + this.target = new DeserializerConsumerMiddleware( this.deserializerMock.Object, this.typeResolverMock.Object); } diff --git a/src/KafkaFlow/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs b/src/KafkaFlow/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs index 487731d1f..938cedfa3 100644 --- a/src/KafkaFlow/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs +++ b/src/KafkaFlow/Serializer/Configuration/ConsumerMiddlewareConfigurationBuilder.cs @@ -47,7 +47,7 @@ public static IConsumerMiddlewareConfigurationBuilder AddDeserializer new SerializerConsumerMiddleware( + resolver => new DeserializerConsumerMiddleware( serializerFactory(resolver), resolverFactory(resolver))); } diff --git a/src/KafkaFlow/Serializer/Middlewares/SerializerConsumerMiddleware.cs b/src/KafkaFlow/Serializer/Middlewares/DeserializerConsumerMiddleware.cs similarity index 92% rename from src/KafkaFlow/Serializer/Middlewares/SerializerConsumerMiddleware.cs rename to src/KafkaFlow/Serializer/Middlewares/DeserializerConsumerMiddleware.cs index 21de06428..863dcebfe 100644 --- a/src/KafkaFlow/Serializer/Middlewares/SerializerConsumerMiddleware.cs +++ b/src/KafkaFlow/Serializer/Middlewares/DeserializerConsumerMiddleware.cs @@ -8,17 +8,17 @@ /// /// Middleware to deserialize messages when consuming /// - public class SerializerConsumerMiddleware : IMessageMiddleware + public class DeserializerConsumerMiddleware : IMessageMiddleware { private readonly IDeserializer deserializer; private readonly IMessageTypeResolver typeResolver; /// - /// Initializes a new instance of the class. + /// Initializes a new instance of the class. /// /// Instance of /// Instance of - public SerializerConsumerMiddleware( + public DeserializerConsumerMiddleware( IDeserializer deserializer, IMessageTypeResolver typeResolver) {