CommonOptAsyncConfiguration.java

  1. /*
  2.  * Copyright 2005-2025 the original author or authors.
  3.  *
  4.  * Licensed under the Apache License, Version 2.0 (the "License");
  5.  * you may not use this file except in compliance with the License.
  6.  * You may obtain a copy of the License at
  7.  *
  8.  * http://www.apache.org/licenses/LICENSE-2.0
  9.  *
  10.  * Unless required by applicable law or agreed to in writing, software
  11.  * distributed under the License is distributed on an "AS IS" BASIS,
  12.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13.  * See the License for the specific language governing permissions and
  14.  * limitations under the License.
  15.  */
  16. package org.openwms.common.app;

  17. import org.ameba.amqp.RabbitTemplateConfigurable;
  18. import org.openwms.core.SpringProfiles;
  19. import org.slf4j.Logger;
  20. import org.slf4j.LoggerFactory;
  21. import org.springframework.amqp.core.Binding;
  22. import org.springframework.amqp.core.BindingBuilder;
  23. import org.springframework.amqp.core.DirectExchange;
  24. import org.springframework.amqp.core.Queue;
  25. import org.springframework.amqp.core.QueueBuilder;
  26. import org.springframework.amqp.core.TopicExchange;
  27. import org.springframework.amqp.rabbit.annotation.EnableRabbit;
  28. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
  29. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  30. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  31. import org.springframework.amqp.support.converter.MessageConverter;
  32. import org.springframework.amqp.support.converter.SerializerMessageConverter;
  33. import org.springframework.beans.factory.ObjectProvider;
  34. import org.springframework.beans.factory.annotation.Autowired;
  35. import org.springframework.beans.factory.annotation.Qualifier;
  36. import org.springframework.beans.factory.annotation.Value;
  37. import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
  38. import org.springframework.cloud.context.config.annotation.RefreshScope;
  39. import org.springframework.context.annotation.Bean;
  40. import org.springframework.context.annotation.Configuration;
  41. import org.springframework.context.annotation.Primary;
  42. import org.springframework.context.annotation.Profile;
  43. import org.springframework.retry.backoff.ExponentialBackOffPolicy;
  44. import org.springframework.retry.support.RetryTemplate;

  45. import java.util.Objects;

  46. import static org.ameba.LoggingCategories.BOOT;
  47. import static org.openwms.common.CommonProfiles.SHIPPING_SUPPORT;

  48. /**
  49.  * A CommonOptAsyncConfiguration contains the modules asynchronous configuration that is
  50.  * only active when Springs ASYNCHRONOUS profile is set.
  51.  *
  52.  * @author Heiko Scherrer
  53.  */
  54. @Profile(SpringProfiles.ASYNCHRONOUS_PROFILE)
  55. @Configuration
  56. @RefreshScope
  57. @EnableRabbit
  58. class CommonOptAsyncConfiguration {

  59.     private static final Logger BOOT_LOGGER = LoggerFactory.getLogger(BOOT);
  60.     public static final String POISON_MESSAGE = "poison-message";
  61.     public static final String DEAD_LETTER_EXCHANGE = "x-dead-letter-exchange";
  62.     public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";

  63.     @ConditionalOnExpression("'${owms.common.serialization}'=='json'")
  64.     @Bean
  65.     MessageConverter messageConverter() {
  66.         var messageConverter = new Jackson2JsonMessageConverter();
  67.         BOOT_LOGGER.info("Using JSON serialization over AMQP");
  68.         return messageConverter;
  69.     }

  70.     @ConditionalOnExpression("'${owms.common.serialization}'=='barray'")
  71.     @Bean
  72.     MessageConverter serializerMessageConverter() {
  73.         var messageConverter = new SerializerMessageConverter();
  74.         BOOT_LOGGER.info("Using byte array serialization over AMQP");
  75.         return messageConverter;
  76.     }

  77.     @Primary
  78.     @Bean(name = "amqpTemplate")
  79.     public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
  80.             ObjectProvider<MessageConverter> messageConverter,
  81.             @Autowired(required = false) RabbitTemplateConfigurable rabbitTemplateConfigurable) {
  82.         var rabbitTemplate = new RabbitTemplate(connectionFactory);
  83.         rabbitTemplate.setObservationEnabled(true);
  84.         var backOffPolicy = new ExponentialBackOffPolicy();
  85.         backOffPolicy.setMultiplier(2);
  86.         backOffPolicy.setMaxInterval(15000);
  87.         backOffPolicy.setInitialInterval(500);
  88.         var retryTemplate = new RetryTemplate();
  89.         retryTemplate.setBackOffPolicy(backOffPolicy);
  90.         rabbitTemplate.setRetryTemplate(retryTemplate);
  91.         rabbitTemplate.setMessageConverter(Objects.requireNonNull(messageConverter.getIfUnique()));
  92.         if (rabbitTemplateConfigurable != null) {
  93.             rabbitTemplateConfigurable.configure(rabbitTemplate);
  94.         }
  95.         return rabbitTemplate;
  96.     }

  97.     /*~ --------------- Exchanges --------------- */
  98.     @RefreshScope
  99.     @Bean DirectExchange dlExchange(
  100.             @Value("${spring.application.name}") String applicationName,
  101.             @Value("${owms.dead-letter.exchange-name:}") String exchangeName
  102.     ) {
  103.         if (exchangeName.isEmpty()) {
  104.             BOOT_LOGGER.warn("No dead letter exchange configured, falling back to: [{}]", applicationName);
  105.             return new DirectExchange("dle." + applicationName);
  106.         }
  107.         return new DirectExchange(exchangeName);
  108.     }
  109.     @RefreshScope
  110.     @Bean TopicExchange commonTuExchange(@Value("${owms.events.common.tu.exchange-name}") String exchangeName) {
  111.         return new TopicExchange(exchangeName, true, false);
  112.     }
  113.     @RefreshScope
  114.     @Bean TopicExchange commonTuTExchange(@Value("${owms.events.common.tut.exchange-name}") String exchangeName) {
  115.         return new TopicExchange(exchangeName, true, false);
  116.     }
  117.     @RefreshScope
  118.     @Bean TopicExchange commonTuCommandsExchange(@Value("${owms.commands.common.tu.exchange-name}") String exchangeName) {
  119.         return new TopicExchange(exchangeName, true, false);
  120.     }
  121.     @RefreshScope
  122.     @Bean TopicExchange commonLocCommandsExchange(@Value("${owms.commands.common.loc.exchange-name}") String exchangeName) {
  123.         return new TopicExchange(exchangeName, true, false);
  124.     }
  125.     @RefreshScope
  126.     @Bean TopicExchange commonLocRegistrationCommandsExchange(@Value("${owms.commands.common.registration.exchange-name}") String exchangeName) {
  127.         return new TopicExchange(exchangeName, true, false);
  128.     }
  129.     @Profile(SHIPPING_SUPPORT)
  130.     @RefreshScope
  131.     @Bean TopicExchange shippingExchange(@Value("${owms.events.shipping.exchange-name}") String exchangeName) {
  132.         return new TopicExchange(exchangeName, true, false);
  133.     }

  134.     /*~ ------------ Queues ------------- */
  135.     @RefreshScope
  136.     @Bean Queue dlQueue(
  137.             @Value("${spring.application.name}") String applicationName,
  138.             @Value("${owms.dead-letter.queue-name:}") String queueName
  139.     ) {
  140.         if (queueName.isEmpty()) {
  141.             return QueueBuilder.durable(applicationName + "-dl-queue").build();
  142.         }
  143.         return QueueBuilder.durable(queueName).build();
  144.     }
  145.     @RefreshScope
  146.     @Bean Queue commandsQueue(@Value("${owms.commands.common.tu.queue-name}") String queueName, DirectExchange dlExchange) {
  147.         return QueueBuilder.durable(queueName)
  148.                 .withArgument(DEAD_LETTER_EXCHANGE, dlExchange.getName())
  149.                 .withArgument(DEAD_LETTER_ROUTING_KEY, POISON_MESSAGE)
  150.                 .build();
  151.     }
  152.     @RefreshScope
  153.     @Bean Queue commonLocCommandsQueue(@Value("${owms.commands.common.loc.queue-name}") String queueName, DirectExchange dlExchange) {
  154.         return QueueBuilder.durable(queueName)
  155.                 .withArgument(DEAD_LETTER_EXCHANGE, dlExchange.getName())
  156.                 .withArgument(DEAD_LETTER_ROUTING_KEY, POISON_MESSAGE)
  157.                 .build();
  158.     }
  159.     @RefreshScope
  160.     @Bean Queue commonLocRegistrationCommandsQueue(@Value("${owms.commands.common.registration.queue-name}") String queueName, DirectExchange dlExchange) {
  161.         return QueueBuilder.durable(queueName)
  162.                 .withArgument(DEAD_LETTER_EXCHANGE, dlExchange.getName())
  163.                 .withArgument(DEAD_LETTER_ROUTING_KEY, POISON_MESSAGE)
  164.                 .build();
  165.     }
  166.     @Profile(SHIPPING_SUPPORT)
  167.     @RefreshScope
  168.     @Bean Queue shippingSplitQueue(
  169.             @Value("${owms.events.shipping.split.queue-name}") String queueName,
  170.             @Value("${owms.dead-letter.exchange-name}") String exchangeName
  171.     ) {
  172.         return QueueBuilder.durable(queueName)
  173.                 .withArgument(DEAD_LETTER_EXCHANGE, exchangeName)
  174.                 .withArgument(DEAD_LETTER_ROUTING_KEY, POISON_MESSAGE)
  175.                 .build();
  176.     }

  177.     /*~ ------------ Bindings ------------- */
  178.     @RefreshScope
  179.     @Bean Binding dlBinding(
  180.             @Value("${spring.application.name}") String applicationName,
  181.             @Value("${owms.dead-letter.queue-name:}") String queueName,
  182.             @Value("${owms.dead-letter.exchange-name:}") String exchangeName
  183.     ) {
  184.         return BindingBuilder
  185.                 .bind(dlQueue(applicationName, queueName))
  186.                 .to(dlExchange(applicationName, exchangeName))
  187.                 .with(POISON_MESSAGE);
  188.     }
  189.     @RefreshScope
  190.     @Bean Binding commandsBinding(
  191.             @Qualifier("commonTuCommandsExchange") TopicExchange commonTuCommandsExchange,
  192.             @Qualifier("commandsQueue") Queue commandsQueue,
  193.             @Value("${owms.commands.common.tu.routing-key}") String routingKey
  194.     ) {
  195.         return BindingBuilder
  196.                 .bind(commandsQueue)
  197.                 .to(commonTuCommandsExchange)
  198.                 .with(routingKey);
  199.     }
  200.     @RefreshScope
  201.     @Bean Binding locCommandsBinding(
  202.             @Qualifier("commonLocCommandsExchange") TopicExchange commonLocCommandsExchange,
  203.             @Qualifier("commonLocCommandsQueue") Queue commonLocCommandsQueue,
  204.             @Value("${owms.commands.common.loc.routing-key}") String routingKey
  205.     ) {
  206.         return BindingBuilder
  207.                 .bind(commonLocCommandsQueue)
  208.                 .to(commonLocCommandsExchange)
  209.                 .with(routingKey);
  210.     }
  211.     @RefreshScope
  212.     @Bean Binding locRegistrationCommandsBinding(
  213.             @Qualifier("commonLocRegistrationCommandsExchange") TopicExchange commonLocRegistrationCommandsExchange,
  214.             @Qualifier("commonLocRegistrationCommandsQueue") Queue commonLocRegistrationCommandsQueue,
  215.             @Value("${owms.commands.common.registration.routing-key}") String routingKey
  216.     ) {
  217.         return BindingBuilder
  218.                 .bind(commonLocRegistrationCommandsQueue)
  219.                 .to(commonLocRegistrationCommandsExchange)
  220.                 .with(routingKey);
  221.     }
  222.     @Profile(SHIPPING_SUPPORT)
  223.     @RefreshScope
  224.     @Bean Binding splitBinding(
  225.             @Qualifier("shippingExchange") TopicExchange shippingExchange,
  226.             @Qualifier("shippingSplitQueue") Queue shippingSplitQueue,
  227.             @Value("${owms.events.shipping.split.routing-key}") String routingKey
  228.     ) {
  229.         return BindingBuilder
  230.                 .bind(shippingSplitQueue)
  231.                 .to(shippingExchange)
  232.                 .with(routingKey);
  233.     }
  234. }