Skip to content

Commit

Permalink
Add common container factory interface (#829)
Browse files Browse the repository at this point in the history
This introduces a common interface for all message listener containers.
Prior to this commit, the reader and listener containers had no common
abstraction. This is needed to introduce a generic container factory
customizer in Spring Boot.
  • Loading branch information
onobc authored Sep 9, 2024
1 parent 0012125 commit 8dcff29
Show file tree
Hide file tree
Showing 18 changed files with 222 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ When using Spring Boot the `PulsarTopicBuilder` is now a registered bean that is
Therefore, if you are using Spring Boot, you can simply inject the builder where needed.
Otherwise, use one of the `PulsarTopicBuilder` constructors directly.

==== Listener/ReaderContainerFactory
The `PulsarContainerFactory` common interface was introduced to bridge the gap between listener and reader container factories.
As part of this, the following APIs were deprecated, copied, and renamed:

- `ListenerContainerFactory#createListenerContainer` replaced with `ListenerContainerFactory#createRegisteredContainer`

- `ReaderContainerFactory#createReaderContainer(E endpoint)` replaced with `ReaderContainerFactory#createRegisteredContainer`

- `ReaderContainerFactory#createReaderContainer(String... topics)` replaced with `ReaderContainerFactory#createContainer`


=== Breaking Changes

==== PulsarTopic#<init>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -139,25 +139,21 @@ public DefaultReactivePulsarMessageListenerContainer<T> createContainerInstance(
return new DefaultReactivePulsarMessageListenerContainer<>(this.getConsumerFactory(), containerProps);
}

@SuppressWarnings("rawtypes")
@Override
public DefaultReactivePulsarMessageListenerContainer<T> createListenerContainer(
public DefaultReactivePulsarMessageListenerContainer<T> createRegisteredContainer(
ReactivePulsarListenerEndpoint<T> endpoint) {
DefaultReactivePulsarMessageListenerContainer<T> instance = createContainerInstance(endpoint);
if (endpoint instanceof AbstractReactivePulsarListenerEndpoint) {
configureEndpoint((AbstractReactivePulsarListenerEndpoint<T>) endpoint);
var instance = createContainerInstance(endpoint);
if (endpoint instanceof AbstractReactivePulsarListenerEndpoint abstractReactiveEndpoint) {
if (abstractReactiveEndpoint.getFluxListener() == null) {
JavaUtils.INSTANCE.acceptIfNotNull(this.fluxListener, abstractReactiveEndpoint::setFluxListener);
}
}

endpoint.setupListenerContainer(instance, this.messageConverter);
initializeContainer(instance, endpoint);
return instance;
}

private void configureEndpoint(AbstractReactivePulsarListenerEndpoint<T> aplEndpoint) {
if (aplEndpoint.getFluxListener() == null) {
JavaUtils.INSTANCE.acceptIfNotNull(this.fluxListener, aplEndpoint::setFluxListener);
}
}

@Override
public DefaultReactivePulsarMessageListenerContainer<T> createContainer(String... topics) {
ReactivePulsarListenerEndpoint<T> endpoint = new ReactivePulsarListenerEndpoint<>() {
Expand All @@ -168,7 +164,7 @@ public List<String> getTopics() {
}

};
DefaultReactivePulsarMessageListenerContainer<T> container = createContainerInstance(endpoint);
var container = createContainerInstance(endpoint);
initializeContainer(container, endpoint);
return container;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.pulsar.client.api.SubscriptionType;
Expand All @@ -32,6 +34,18 @@
*/
class DefaultReactivePulsarListenerContainerFactoryTests {

@SuppressWarnings({ "removal", "unchecked" })
@Test
void deprecatedCreateListenerContainerCallsReplacementApi() {
var containerFactory = spy(new DefaultReactivePulsarListenerContainerFactory<String>(
mock(ReactivePulsarConsumerFactory.class), new ReactivePulsarContainerProperties<>()));
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
when(endpoint.getConcurrency()).thenReturn(1);
var createdContainer = containerFactory.createListenerContainer(endpoint);
assertThat(createdContainer).isNotNull();
verify(containerFactory).createRegisteredContainer(endpoint);
}

@SuppressWarnings("unchecked")
@Nested
class SubscriptionTypeFrom {
Expand All @@ -44,7 +58,7 @@ void factoryPropsUsedWhenNotSetOnEndpoint() {
mock(ReactivePulsarConsumerFactory.class), factoryProps);
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
when(endpoint.getConcurrency()).thenReturn(1);
var createdContainer = containerFactory.createListenerContainer(endpoint);
var createdContainer = containerFactory.createRegisteredContainer(endpoint);
assertThat(createdContainer.getContainerProperties().getSubscriptionType())
.isEqualTo(SubscriptionType.Shared);
}
Expand All @@ -58,7 +72,7 @@ void endpointTakesPrecedenceOverFactoryProps() {
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
when(endpoint.getConcurrency()).thenReturn(1);
when(endpoint.getSubscriptionType()).thenReturn(SubscriptionType.Failover);
var createdContainer = containerFactory.createListenerContainer(endpoint);
var createdContainer = containerFactory.createRegisteredContainer(endpoint);
assertThat(createdContainer.getContainerProperties().getSubscriptionType())
.isEqualTo(SubscriptionType.Failover);
}
Expand All @@ -70,7 +84,7 @@ void defaultUsedWhenNotSetOnEndpointNorFactoryProps() {
mock(ReactivePulsarConsumerFactory.class), factoryProps);
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
when(endpoint.getConcurrency()).thenReturn(1);
var createdContainer = containerFactory.createListenerContainer(endpoint);
var createdContainer = containerFactory.createRegisteredContainer(endpoint);
assertThat(createdContainer.getContainerProperties().getSubscriptionType())
.isEqualTo(SubscriptionType.Exclusive);

Expand All @@ -90,7 +104,7 @@ void factoryPropsUsedWhenNotSetOnEndpoint() {
mock(ReactivePulsarConsumerFactory.class), factoryProps);
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
when(endpoint.getConcurrency()).thenReturn(1);
var createdContainer = containerFactory.createListenerContainer(endpoint);
var createdContainer = containerFactory.createRegisteredContainer(endpoint);
assertThat(createdContainer.getContainerProperties().getSubscriptionName())
.isEqualTo("my-factory-subscription");
}
Expand All @@ -104,7 +118,7 @@ void endpointTakesPrecedenceOverFactoryProps() {
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
when(endpoint.getConcurrency()).thenReturn(1);
when(endpoint.getSubscriptionName()).thenReturn("my-endpoint-subscription");
var createdContainer = containerFactory.createListenerContainer(endpoint);
var createdContainer = containerFactory.createRegisteredContainer(endpoint);
assertThat(createdContainer.getContainerProperties().getSubscriptionName())
.isEqualTo("my-endpoint-subscription");
}
Expand All @@ -117,10 +131,10 @@ void defaultUsedWhenNotSetOnEndpointNorFactoryProps() {
var endpoint = mock(ReactivePulsarListenerEndpoint.class);
when(endpoint.getConcurrency()).thenReturn(1);

var container1 = containerFactory.createListenerContainer(endpoint);
var container1 = containerFactory.createRegisteredContainer(endpoint);
assertThat(container1.getContainerProperties().getSubscriptionName())
.startsWith("org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#");
var container2 = containerFactory.createListenerContainer(endpoint);
var container2 = containerFactory.createRegisteredContainer(endpoint);
assertThat(container2.getContainerProperties().getSubscriptionName())
.startsWith("org.springframework.Pulsar.ReactivePulsarListenerEndpointContainer#");
assertThat(container1.getContainerProperties().getSubscriptionName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ class ContainerFactoryAttribute {
@Test
void containerFactoryDerivedFromAttribute(
@Autowired ReactivePulsarListenerContainerFactory<String> containerFactory) {
verify(containerFactory).createListenerContainer(argThat(endpoint -> endpoint.getId().equals("foo")));
verify(containerFactory).createListenerContainer(argThat(endpoint -> endpoint.getId().equals("bar")));
verify(containerFactory).createListenerContainer(argThat(endpoint -> endpoint.getId().equals("zaa")));
verify(containerFactory).createRegisteredContainer(argThat(endpoint -> endpoint.getId().equals("foo")));
verify(containerFactory).createRegisteredContainer(argThat(endpoint -> endpoint.getId().equals("bar")));
verify(containerFactory).createRegisteredContainer(argThat(endpoint -> endpoint.getId().equals("zaa")));
}

@EnablePulsar
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -106,7 +106,7 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv

@SuppressWarnings("unchecked")
@Override
public C createListenerContainer(PulsarListenerEndpoint endpoint) {
public C createRegisteredContainer(PulsarListenerEndpoint endpoint) {
C instance = createContainerInstance(endpoint);
JavaUtils.INSTANCE.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
if (endpoint instanceof AbstractPulsarListenerEndpoint) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -99,7 +99,7 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv

@SuppressWarnings("unchecked")
@Override
public C createReaderContainer(PulsarReaderEndpoint<PulsarMessageReaderContainer> endpoint) {
public C createRegisteredContainer(PulsarReaderEndpoint<PulsarMessageReaderContainer> endpoint) {
C instance = createContainerInstance(endpoint);
JavaUtils.INSTANCE.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
if (endpoint instanceof AbstractPulsarReaderEndpoint) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,7 +56,7 @@ protected void initializeContainer(DefaultPulsarMessageReaderContainer<T> instan
}

@Override
public DefaultPulsarMessageReaderContainer<T> createReaderContainer(String... topics) {
public DefaultPulsarMessageReaderContainer<T> createContainer(String... topics) {
// TODO
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -138,7 +138,7 @@ public void registerListenerContainer(E endpoint, ListenerContainerFactory<? ext

protected C createListenerContainer(E endpoint, ListenerContainerFactory<? extends C, E> factory) {

C listenerContainer = factory.createListenerContainer(endpoint);
C listenerContainer = factory.createRegisteredContainer(endpoint);

if (listenerContainer instanceof InitializingBean) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -136,7 +136,7 @@ public void registerReaderContainer(E endpoint, ReaderContainerFactory<? extends
}

protected C createReaderContainer(E endpoint, ReaderContainerFactory<? extends C, E> factory) {
C readerContainer = factory.createReaderContainer(endpoint);
C readerContainer = factory.createRegisteredContainer(endpoint);
if (readerContainer instanceof InitializingBean) {
try {
((InitializingBean) readerContainer).afterPropertiesSet();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,6 @@

package org.springframework.pulsar.config;

import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.listener.MessageListenerContainer;

/**
Expand All @@ -26,24 +25,22 @@
* @param <E> listener endpoint type.
* @author Soby Chacko
* @author Christophe Bornet
* @author Chris Bono
*/
public interface ListenerContainerFactory<C extends MessageListenerContainer, E extends ListenerEndpoint<C>> {
public interface ListenerContainerFactory<C extends MessageListenerContainer, E extends ListenerEndpoint<C>>
extends PulsarContainerFactory<C, E> {

/**
* Create a {@link MessageListenerContainer} for the given {@link ListenerEndpoint}.
* Containers created using this method are added to the listener endpoint registry.
* @param endpoint the endpoint to configure
* @return the created container
* @deprecated since 1.2.0 for removal in 1.4.0 in favor of
* {@link PulsarContainerFactory#createRegisteredContainer}
*/
C createListenerContainer(E endpoint);

/**
* Create and configure a container without a listener; used to create containers that
* are not used for {@link PulsarListener} annotations. Containers created using this
* method are not added to the listener endpoint registry.
* @param topics the topics.
* @return the container.
*/
C createContainer(String... topics);
@Deprecated(since = "1.2.0", forRemoval = true)
default C createListenerContainer(E endpoint) {
return createRegisteredContainer(endpoint);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,7 +41,7 @@ public interface ListenerEndpoint<C extends MessageListenerContainer> {
* Return the id of this endpoint.
* @return the id of this endpoint. The id can be further qualified when the endpoint
* is resolved against its actual listener container.
* @see ListenerContainerFactory#createListenerContainer
* @see ListenerContainerFactory#createRegisteredContainer
*/
@Nullable
default String getId() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.pulsar.config;

/**
* Factory for Pulsar message listener containers.
*
* @param <C> message container
* @param <E> message listener endpoint
* @author Chris Bono
* @since 1.2.0
*/
public interface PulsarContainerFactory<C, E> {

/**
* Create a message listener container for the given endpoint. Containers created
* using this method are added to the listener endpoint registry.
* @param endpoint the endpoint to configure
* @return the created container
*/
C createRegisteredContainer(E endpoint);

/**
* Create and configure a container without a listener. Containers created using this
* method are not added to the listener endpoint registry.
* @param topics the topics.
* @return the container.
*/
C createContainer(String... topics);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023 the original author or authors.
* Copyright 2023-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -39,7 +39,7 @@ public interface PulsarReaderEndpoint<C extends PulsarMessageReaderContainer> {
* Return the id of this endpoint.
* @return the id of this endpoint. The id can be further qualified when the endpoint
* is resolved against its actual listener container.
* @see ListenerContainerFactory#createListenerContainer
* @see ListenerContainerFactory#createRegisteredContainer
*/
@Nullable
String getId();
Expand Down
Loading

0 comments on commit 8dcff29

Please sign in to comment.