From 6c611bcbc18d767ac65eb13478e2a1f3cec8cdba Mon Sep 17 00:00:00 2001 From: Vaishnavi <41518119+VaishnaviNandakumar@users.noreply.github.com> Date: Sun, 8 Oct 2023 02:14:32 +0530 Subject: [PATCH] feat: update aqmp config (#310) * Updated config file for amqp * Added publisher service * Added listeners and publisher code * Updated AMQP config and added sample config files to test * Updated amqp config * Segregated declarables for exchanges and queues * Updated test snapshots * fix double dependency * fix snapshots --------- Co-authored-by: Semen --- partials/AmqpConfig.java | 78 ++++++----------- partials/AmqpPublisher.java | 43 ++++++++++ .../service/CommandLinePublisher.java | 3 +- .../service/MessageHandlerService.java | 24 +++++- .../asyncapi/service/PublisherService.java | 3 + template/src/main/resources/application.yml | 22 ++--- tests/__snapshots__/kafka.test.js.snap | 2 + tests/__snapshots__/mqtt.test.js.snap | 14 ++-- tests/__snapshots__/oneOf.test.js.snap | 2 + tests/__snapshots__/parameters.test.js.snap | 2 + .../aqmp_config_multiple-channels.yaml | 83 +++++++++++++++++++ .../aqmp_config_single-channel.yaml | 63 ++++++++++++++ 12 files changed, 265 insertions(+), 74 deletions(-) create mode 100644 partials/AmqpPublisher.java create mode 100644 tests/user_examples/aqmp_config_multiple-channels.yaml create mode 100644 tests/user_examples/aqmp_config_single-channel.yaml diff --git a/partials/AmqpConfig.java b/partials/AmqpConfig.java index 03865566b..7b18ccfc6 100644 --- a/partials/AmqpConfig.java +++ b/partials/AmqpConfig.java @@ -6,17 +6,11 @@ import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.integration.amqp.dsl.Amqp; -import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint; -import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.dsl.IntegrationFlow; -import org.springframework.integration.dsl.IntegrationFlows; -import org.springframework.messaging.MessageChannel; import javax.annotation.processing.Generated; @@ -36,16 +30,22 @@ public class Config { @Value("${amqp.broker.password}") private String password; - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %} - @Value("${amqp.exchange.{{- channelName -}}}") + + {% for channelName, channel in asyncapi.channels() %} + {% if channel.hasSubscribe() %} + @Value("${amqp.{{- channelName -}}.exchange}") private String {{channelName}}Exchange; - {% endif %}{% endfor %} - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} - @Value("${amqp.queue.{{- channelName -}}}") + @Value("${amqp.{{- channelName -}}.routingKey}") + private String {{channelName}}RoutingKey; + {% endif %} + + {% if channel.hasPublish() %} + @Value("${amqp.{{- channelName -}}.queue}") private String {{channelName}}Queue; + {% endif %} - {% endif %}{% endfor %} + {% endfor %} @Bean public ConnectionFactory connectionFactory() { @@ -56,17 +56,12 @@ public ConnectionFactory connectionFactory() { return connectionFactory; } - @Bean - public AmqpAdmin amqpAdmin() { - return new RabbitAdmin(connectionFactory()); - } - @Bean public Declarables exchanges() { return new Declarables( {% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %} - new TopicExchange({{channelName}}Exchange, true, false){% if not loop.last %},{% endif %} - {% endif %}{% endfor %} + new TopicExchange({{channelName}}Exchange, true, false){% if not loop.last %},{% endif %} + {% endif %}{% endfor %} ); } @@ -74,47 +69,26 @@ public Declarables exchanges() { public Declarables queues() { return new Declarables( {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} - new Queue({{channelName}}Queue, true, false, false){% if not loop.last %},{% endif %} - {% endif %}{% endfor %} + new Queue({{channelName}}Queue, true, false, false){% if not loop.last %},{% endif %} + {% endif %}{% endfor %} ); - } - // consumer - - @Autowired - MessageHandlerService messageHandlerService; - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasPublish() %} @Bean - public IntegrationFlow {{channelName | camelCase}}Flow() { - return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory(), {{channelName}}Queue)) - .handle(messageHandlerService::handle{{channelName | upperFirst}}) - .get(); + public MessageConverter converter() { + return new Jackson2JsonMessageConverter(); } - {% endif %}{% endfor %} - - // publisher @Bean - public RabbitTemplate rabbitTemplate() { - RabbitTemplate template = new RabbitTemplate(connectionFactory()); - return template; - } - {% for channelName, channel in asyncapi.channels() %}{% if channel.hasSubscribe() %} - - @Bean - public MessageChannel {{channel.subscribe().id() | camelCase}}OutboundChannel() { - return new DirectChannel(); + public AmqpAdmin amqpAdmin() { + return new RabbitAdmin(connectionFactory()); } @Bean - @ServiceActivator(inputChannel = "{{channel.subscribe().id() | camelCase}}OutboundChannel") - public AmqpOutboundEndpoint {{channelName | camelCase}}Outbound(AmqpTemplate amqpTemplate) { - AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate); - outbound.setExchangeName({{channelName}}Exchange); - outbound.setRoutingKey("#"); - return outbound; + public AmqpTemplate template() { + final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); + rabbitTemplate.setMessageConverter(converter()); + return rabbitTemplate; } - {% endif %}{% endfor %} } {% endmacro %} diff --git a/partials/AmqpPublisher.java b/partials/AmqpPublisher.java new file mode 100644 index 000000000..411ce9735 --- /dev/null +++ b/partials/AmqpPublisher.java @@ -0,0 +1,43 @@ +{% macro amqpPublisher(asyncapi, params) %} + +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; +{% for channelName, channel in asyncapi.channels() %} + {%- if channel.hasSubscribe() %} + {%- for message in channel.subscribe().messages() %} +import {{params['userJavaPackage']}}.model.{{message.payload().uid() | camelCase | upperFirst}}; + {%- endfor -%} + {% endif -%} +{% endfor %} + + +@Service +public class PublisherService { + @Autowired + private RabbitTemplate template; + + {% for channelName, channel in asyncapi.channels() %} + {% if channel.hasSubscribe() %} + @Value("${amqp.{{- channelName -}}.exchange}") + private String {{channelName}}Exchange; + @Value("${amqp.{{- channelName -}}.routingKey}") + private String {{channelName}}RoutingKey; + {% endif %} + {% endfor %} + + {% for channelName, channel in asyncapi.channels() %} + {% if channel.hasSubscribe() %} + {%- set schemaName = channel.subscribe().message().payload().uid() | camelCase | upperFirst %} + public void {{channel.subscribe().id() | camelCase}}(){ + {{schemaName}} {{channelName}}Payload = new {{schemaName}}(); + template.convertAndSend({{channelName}}Exchange, {{channelName}}RoutingKey, {{channelName}}Payload); + } + + {% endif %} + {% endfor %} + +} + +{% endmacro %} \ No newline at end of file diff --git a/template/src/main/java/com/asyncapi/service/CommandLinePublisher.java b/template/src/main/java/com/asyncapi/service/CommandLinePublisher.java index 5a8ad0f9e..867b4a4a1 100644 --- a/template/src/main/java/com/asyncapi/service/CommandLinePublisher.java +++ b/template/src/main/java/com/asyncapi/service/CommandLinePublisher.java @@ -21,7 +21,8 @@ public void run(String... args) { {%- for channelName, channel in asyncapi.channels() %} {%- if channel.hasSubscribe() %} {%- for message in channel.subscribe().messages() %} - publisherService.{{channel.subscribe().id() | camelCase}}({% if asyncapi | isProtocol('kafka') %}(new Random()).nextInt(), new {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}(){% else %}"Hello World from {{channelName}}"{% endif %}); + publisherService.{{channel.subscribe().id() | camelCase}}({% if asyncapi | isProtocol('kafka') %}(new Random()).nextInt(), new {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}() + {% elif asyncapi | isProtocol('amqp') %}{% else %}"Hello World from {{channelName}}"{% endif %}); {% endfor -%} {% endif -%} {%- endfor %} diff --git a/template/src/main/java/com/asyncapi/service/MessageHandlerService.java b/template/src/main/java/com/asyncapi/service/MessageHandlerService.java index ab054224c..21dd43052 100644 --- a/template/src/main/java/com/asyncapi/service/MessageHandlerService.java +++ b/template/src/main/java/com/asyncapi/service/MessageHandlerService.java @@ -27,6 +27,16 @@ {%- endif %} {%- endfor %} {% endif %} +{% if asyncapi | isProtocol('amqp') and hasPublish %} +import org.springframework.amqp.rabbit.annotation.RabbitListener; + {% for channelName, channel in asyncapi.channels() %} + {%- if channel.hasPublish() %} + {%- for message in channel.publish().messages() %} +import {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}}; + {%- endfor %} + {%- endif %} + {%- endfor %} + {% endif %} import javax.annotation.processing.Generated; @Generated(value="com.asyncapi.generator.template.spring", date="{{''|currentTime }}") @@ -55,9 +65,21 @@ public class MessageHandlerService { } {%- endif %} {% endfor %} + + {% elif asyncapi | isProtocol('amqp') %} + {% for channelName, channel in asyncapi.channels() %} + {% if channel.hasPublish() %} + {%- set schemaName = channel.publish().message().payload().uid() | camelCase | upperFirst %} + @RabbitListener(queues = "${amqp.{{- channelName -}}.queue}") + public void {{channel.publish().id() | camelCase}}({{schemaName}} {{channelName}}Payload ){ + LOGGER.info("Message received from {{- channelName -}} : " + {{channelName}}Payload); + } + {% endif %} + {% endfor %} + {% else %} {% for channelName, channel in asyncapi.channels() %} - {% if channel.hasPublish() %} + {% if channel.hasPublish() %} {% if channel.description() or channel.publish().description() %}/**{% for line in channel.description() | splitByLines %} * {{line | safe}}{% endfor %}{% for line in channel.publish().description() | splitByLines %} * {{line | safe}}{% endfor %} diff --git a/template/src/main/java/com/asyncapi/service/PublisherService.java b/template/src/main/java/com/asyncapi/service/PublisherService.java index 8041ab432..044148da5 100644 --- a/template/src/main/java/com/asyncapi/service/PublisherService.java +++ b/template/src/main/java/com/asyncapi/service/PublisherService.java @@ -1,8 +1,11 @@ package {{ params['userJavaPackage'] }}.service; {%- from "partials/CommonPublisher.java" import commonPublisher -%} {%- from "partials/KafkaPublisher.java" import kafkaPublisher -%} +{%- from "partials/AmqpPublisher.java" import amqpPublisher -%} {%- if asyncapi | isProtocol('kafka') -%} {{- kafkaPublisher(asyncapi, params) -}} +{%- elif asyncapi | isProtocol('amqp') -%} +{{- amqpPublisher(asyncapi, params) -}} {%- else -%} {{- commonPublisher(asyncapi) -}} {%- endif -%} \ No newline at end of file diff --git a/template/src/main/resources/application.yml b/template/src/main/resources/application.yml index 148633a24..b9575fcf1 100644 --- a/template/src/main/resources/application.yml +++ b/template/src/main/resources/application.yml @@ -9,27 +9,21 @@ {%- endif -%} {%- endfor -%} -{%- for serverName, server in asyncapi.servers() %}{% if server.protocol() == 'amqp' %} + {%- for serverName, server in asyncapi.servers() %}{% if server.protocol() == 'amqp' %} amqp: broker: {% for line in server.description() | splitByLines %} # {{line | safe}}{% endfor %} - host: {{server.url() | replace(':{port}', '') }} + host: {% if server.variable('port') %}{{server.url() | replace('{port}', server.variable('port').defaultValue())}}{% else %}{{server.url()}}{% endif %} port: {% if server.variable('port') %}{{server.variable('port').defaultValue()}}{% endif %} - username: + username: {% if server.variable('username') %}{{server.variable('username').defaultValue()}}{% endif %} password: - exchange: - {% for channelName, channel in asyncapi.channels() %} - {% if channel.hasSubscribe() and channel.subscribe().binding('amqp') %} - {{channelName}}: {{channel.subscribe().binding('amqp').exchange.name}} - {% endif %} - {% endfor %} - queue: {% for channelName, channel in asyncapi.channels() %} - {% if channel.hasPublish() and channel.publish().binding('amqp') %} - {{channelName}}: {{channel.publish().binding('amqp').queue.name}} - {% endif %} + {{channelName}}: + {% if channel.hasSubscribe() %} exchange: {{channel.subscribe().binding('amqp').exchange.name}} {% endif %} + {% if channel.hasSubscribe() %} routingKey: {{channel.subscribe().binding('amqp').routingKey}}{% endif %} + {% if channel.hasPublish() %} queue: {{channel.publish().binding('amqp').queue.name}}{% endif %} {% endfor %} -{% endif %} + {% endif %} {% if server.protocol() == 'mqtt' %} mqtt: diff --git a/tests/__snapshots__/kafka.test.js.snap b/tests/__snapshots__/kafka.test.js.snap index 9abd9f95d..b8e1a8642 100644 --- a/tests/__snapshots__/kafka.test.js.snap +++ b/tests/__snapshots__/kafka.test.js.snap @@ -142,6 +142,7 @@ import org.springframework.messaging.handler.annotation.Payload; import com.asyncapi.model.LightMeasuredPayload; + import javax.annotation.processing.Generated; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @@ -161,6 +162,7 @@ public class MessageHandlerService { } + } " `; diff --git a/tests/__snapshots__/mqtt.test.js.snap b/tests/__snapshots__/mqtt.test.js.snap index 228cf14b4..07f23a8b8 100644 --- a/tests/__snapshots__/mqtt.test.js.snap +++ b/tests/__snapshots__/mqtt.test.js.snap @@ -213,6 +213,7 @@ import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; + import javax.annotation.processing.Generated; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @@ -222,7 +223,7 @@ public class MessageHandlerService { private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class); - + /** * The topic on which measured values may be produced and consumed. */ @@ -232,11 +233,11 @@ public class MessageHandlerService { } - - - + + + } @@ -1087,6 +1088,7 @@ import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.stereotype.Service; + import javax.annotation.processing.Generated; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @@ -1096,7 +1098,7 @@ public class MessageHandlerService { private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class); - + /** * The topic on which measured values may be produced and consumed. */ @@ -1106,7 +1108,7 @@ public class MessageHandlerService { } - + } diff --git a/tests/__snapshots__/oneOf.test.js.snap b/tests/__snapshots__/oneOf.test.js.snap index 42c7e4f90..e5fc6efde 100644 --- a/tests/__snapshots__/oneOf.test.js.snap +++ b/tests/__snapshots__/oneOf.test.js.snap @@ -16,6 +16,7 @@ import org.springframework.messaging.handler.annotation.Payload; import com.asyncapi.model.AnonymousSchema1; import com.asyncapi.model.AnonymousSchema7; + import javax.annotation.processing.Generated; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @@ -35,6 +36,7 @@ public class MessageHandlerService { } + } " `; diff --git a/tests/__snapshots__/parameters.test.js.snap b/tests/__snapshots__/parameters.test.js.snap index 2f87b50a1..c9b001b22 100644 --- a/tests/__snapshots__/parameters.test.js.snap +++ b/tests/__snapshots__/parameters.test.js.snap @@ -149,6 +149,7 @@ import org.springframework.messaging.handler.annotation.Payload; import com.asyncapi.model.LightMeasuredPayload; + import javax.annotation.processing.Generated; @Generated(value="com.asyncapi.generator.template.spring", date="AnyDate") @@ -168,6 +169,7 @@ public class MessageHandlerService { } + } " `; diff --git a/tests/user_examples/aqmp_config_multiple-channels.yaml b/tests/user_examples/aqmp_config_multiple-channels.yaml new file mode 100644 index 000000000..4ee1ed9de --- /dev/null +++ b/tests/user_examples/aqmp_config_multiple-channels.yaml @@ -0,0 +1,83 @@ +asyncapi: 2.5.0 +info: + title: Streetlights API Simplified + version: 1.0.0 + description: | + The Smartylighting Streetlights API allows you to remotely manage the city lights. + This is a simplified version of the Streetlights API from other examples. This version is used in AsyncAPI documentation. + license: + name: Apache 2.0 + url: https://www.apache.org/licenses/LICENSE-2.0 +servers: + production: + url: localhost + protocol: amqp + description: RabbitMQ + variables: + port: + default: '5672' + username: + default: guest + +channels: + lightMeasured_Streetlight1: + publish: + summary: Inform about environmental lighting conditions for Streetlight 1. + operationId: readLightMeasurement_Streetlight1 + bindings: + amqp: + is: queue + queue: + name: lightMeasurementQueue_Streetlight1 + message: + $ref: '#/components/messages/lightMeasured' + subscribe: + operationId: updateLightMeasurement_Streetlight1 + message: + $ref: '#/components/messages/lightMeasured' + bindings: + amqp: + is: routingKey + exchange: + name: lightMeasurementExchange_Streetlight1 + routingKey: lightMeasurementRoutingKey_Streetlight1 + lightMeasured_Streetlight2: + publish: + summary: Inform about environmental lighting conditions for Streetlight 2. + operationId: readLightMeasurement_Streetlight2 + bindings: + amqp: + is: queue + queue: + name: lightMeasurementQueue_Streetlight2 + message: + $ref: '#/components/messages/lightMeasured' + subscribe: + operationId: updateLightMeasurement_Streetlight2 + message: + $ref: '#/components/messages/lightMeasured' + bindings: + amqp: + is: routingKey + exchange: + name: lightMeasurementExchange_Streetlight2 + routingKey: lightMeasurementRoutingKey_Streetlight2 +components: + messages: + lightMeasured: + summary: Inform about environmental lighting conditions for a particular streetlight. + payload: + $ref: "#/components/schemas/lightMeasuredPayload" + schemas: + lightMeasuredPayload: + type: object + properties: + id: + type: integer + minimum: 0 + description: Id of the streetlight. + lumens: + type: integer + minimum: 0 + description: Light intensity measured in lumens. + diff --git a/tests/user_examples/aqmp_config_single-channel.yaml b/tests/user_examples/aqmp_config_single-channel.yaml new file mode 100644 index 000000000..b76d23df5 --- /dev/null +++ b/tests/user_examples/aqmp_config_single-channel.yaml @@ -0,0 +1,63 @@ +asyncapi: 2.5.0 +info: + title: Streetlights API Simplified + version: 1.0.0 + description: | + The Smartylighting Streetlights API allows you to remotely manage the city lights. + This is a simplified version of the Streetlights API from other examples. This version is used in AsyncAPI documentation. + license: + name: Apache 2.0 + url: https://www.apache.org/licenses/LICENSE-2.0 +servers: + production: + url: localhost + protocol: amqp + description: RabbitMQ + variables: + port: + default: '5672' + username: + default: guest + +channels: + lightMeasured: + publish: + summary: Inform about environmental lighting conditions for a particular streetlight. + operationId: readLightMeasurement + message: + $ref: '#/components/messages/lightMeasured' + bindings: + amqp: + is: queue + queue: + name: lightMeasurementQueue + subscribe: + operationId: updateLightMeasurement + message: + $ref: '#/components/messages/lightMeasured' + bindings: + amqp: + is: routingKey + exchange: + name: lightMeasurementExchange + routingKey: lightMeasurementRoutingKey + +components: + messages: + lightMeasured: + summary: Inform about environmental lighting conditions for a particular streetlight. + payload: + $ref: "#/components/schemas/lightMeasuredPayload" + schemas: + lightMeasuredPayload: + type: object + properties: + id: + type: integer + minimum: 0 + description: Id of the streetlight. + lumens: + type: integer + minimum: 0 + description: Light intensity measured in lumens. +