PolarSPARC |
Spring Integration Notes :: Part - 2
Bhaskar S | 04/24/2021 (UPDATED) |
Overview
In Part-1, we covered the basics of Spring Integration and got our hands dirty with two simple examples.
We will continue our journey to explore the following capabilities in Spring Integration:
Logging Messages
Multi-Threading
Exception Handling
Setup
To setup the Java directory structure for the demonstrations in this part, execute the following commands:
$ cd $HOME/java/SpringIntegration
$ mkdir -p src/main/java/com/polarsparc/si/p2
$ mkdir -p src/main/resources/p2
Hands-on Spring Integration
Logging Messages
Spring Integration provides facilities for intercepting and logging messages that flow through a channel. This can be useful for audit or debugging purposes.
XML based Approach |
For demonstrating the logging capability, we will leverage the handler class defined in Listing.1 and the custom gateway interface defined in Listing.7 from Part-1.
The following is the XML based Spring Integration configuration that wires up the channels and the endpoint:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"> <bean id="inMsgHandler" class="com.polarsparc.si.p1.StrUpCaseHandler" /> <int:channel id="inChannel"> <int:interceptors> <int:wire-tap channel="logger1" /> </int:interceptors> </int:channel> <int:channel id="outChannel"> <int:queue capacity="5" /> <int:interceptors> <int:wire-tap channel="logger2" /> </int:interceptors> </int:channel> <int:logging-channel-adapter id="logger1" level="INFO" /> <int:logging-channel-adapter id="logger2" level="INFO" log-full-message="true" /> <!-- The primary purpose of a Gateway is to hide the messaging API provided by Spring Integration --> <int:gateway id="gateway" service-interface="com.polarsparc.si.p1.StrUpCaseGateway" default-request-channel="inChannel" default-reply-channel="outChannel" /> <int:service-activator input-channel="inChannel" output-channel="outChannel" ref="inMsgHandler" method="handler" /> </beans>
Some aspects of the Listing.12 from the above needs a little explanation.
A logging-channel-adapter sub-element allows one to log messages.
By default, it only logs the payload.
To log the full message (that includes the header as well as the payload), set the log-full-message attribute to true.
Under-the-hood, Spring Integration uses org.springframework.integration.handler.LoggingHandler for logging messages
In our example, we have defined two logging-channel-adapters - one with an id of logger1 which logs only the payload and the other with an id of logger2 which logs the full message
An interceptors sub-element allows one to specify an interceptor for a channel. An interceptor allows us to peek at the messages flowing through the channel.
A wire-tap is a simple out-of-the-box interceptor provided by Spring Integration. It intercepts a message following into a channel, makes a copy and forks it into another channel as specified in the channel attribute.
In our example, we have defined two wire-taps - one for the channel named inChannel to go to the logging-channel-adapter named logger1 and the other for the channel named outChannel to go to the logging-channel-adapter named logger2
The following is our main application to test logging of messages:
/* * Name: StrUpCase3MainXml * Author: Bhaskar S * Date: 04/24/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.polarsparc.si.p1.StrUpCaseGateway; public class StrUpCase3MainXml { private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCase3MainXml.class); public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("p2/StrUpCase3.xml"); StrUpCaseGateway gateway = (StrUpCaseGateway) context.getBean("gateway"); gateway.send("Spring Integration using WireTap with Xml"); String msg = gateway.receive(); LOGGER.info("OUTPUT: {}", msg); } }
To execute the code from Listing.13, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p2.StrUpCase3MainXml"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- 2021-04-24 11:31:27:533 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-04-24 11:31:27:537 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-04-24 11:31:27:544 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2021-04-24 11:31:27:790 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class) WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-04-24 11:31:27:881 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:logger1.adapter} as a subscriber to the 'logger1' channel 2021-04-24 11:31:27:882 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7a15dd75.logger1' has 1 subscriber(s). 2021-04-24 11:31:27:883 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'logger1.adapter'; defined in: 'class path resource [p2/StrUpCase3.xml]'; from source: ''int:logging-channel-adapter' with id='logger1'' 2021-04-24 11:31:27:884 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:logger2.adapter} as a subscriber to the 'logger2' channel 2021-04-24 11:31:27:884 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7a15dd75.logger2' has 1 subscriber(s). 2021-04-24 11:31:27:885 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'logger2.adapter'; defined in: 'class path resource [p2/StrUpCase3.xml]'; from source: ''int:logging-channel-adapter' with id='logger2'' 2021-04-24 11:31:27:886 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inChannel' channel 2021-04-24 11:31:27:887 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7a15dd75.inChannel' has 1 subscriber(s). 2021-04-24 11:31:27:887 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' 2021-04-24 11:31:27:888 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-04-24 11:31:27:889 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7a15dd75.errorChannel' has 1 subscriber(s). 2021-04-24 11:31:27:890 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-04-24 11:31:27:891 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#send(String)' 2021-04-24 11:31:27:891 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#receive()' 2021-04-24 11:31:27:892 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-04-24 11:31:27:905 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.handler.LoggingHandler - Spring Integration using WireTap with Xml 2021-04-24 11:31:27:906 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-04-24 11:31:27:911 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO com.polarsparc.si.p1.StrUpCaseHandler - INPUT: msg = Spring Integration using WireTap with Xml 2021-04-24 11:31:27:913 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO org.springframework.integration.handler.LoggingHandler - GenericMessage [payload=SPRING INTEGRATION USING WIRETAP WITH XML, headers={replyChannel=nullChannel, id=3423703a-f024-185c-d469-bc49e1750f1c, timestamp=1619278287913}] 2021-04-24 11:31:27:914 [com.polarsparc.si.p2.StrUpCase3MainXml.main()] INFO com.polarsparc.si.p2.StrUpCase3MainXml - OUTPUT: SPRING INTEGRATION USING WIRETAP WITH XML [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.552 s [INFO] Finished at: 2021-04-24T11:31:27-04:00 [INFO] ------------------------------------------------------------------------
Java Config based Approach |
The following is the Java Config based POJO that defines the input and output channels, the two logging-channel-adapters, and the two wire-taps similar to the one defined in the XML configuration file of Listing.12 above:
/* * Name: StrUpCase3Config * Author: Bhaskar S * Date: 04/24/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p2; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.AbstractMessageChannel; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.channel.interceptor.WireTap; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.handler.LoggingHandler; import org.springframework.messaging.MessageChannel; @Configuration @EnableIntegration public class StrUpCase3Config { @Bean public MessageChannel inChannel() { AbstractMessageChannel channel = new DirectChannel(); channel.addInterceptor(wireTapOne()); return channel; } @Bean public MessageChannel outChannel() { AbstractMessageChannel channel = new QueueChannel(5); channel.addInterceptor(wireTapTwo()); return channel; } @Bean @ServiceActivator(inputChannel = "logger1") public LoggingHandler loggerOne() { return new LoggingHandler(LoggingHandler.Level.INFO); } @Bean @ServiceActivator(inputChannel = "logger2") public LoggingHandler loggerTwo() { LoggingHandler logger = new LoggingHandler(LoggingHandler.Level.INFO); logger.setShouldLogFullMessage(true); return logger; } @Bean public WireTap wireTapOne() { return new WireTap("logger1"); } @Bean public WireTap wireTapTwo() { return new WireTap("logger2"); } }
Some aspects of the Listing.14 from the above needs a little explanation.
The class org.springframework.integration.channel.interceptor.WireTap implements the equivalent wire-tap functionality similar to the one in XML based configuration
The abstract base class org.springframework.integration.channel.AbstractMessageChannel implements the interface org.springframework.messaging.MessageChannel and allows one to add a wire-tap interceptor via the method addInterceptor(...)
The class org.springframework.integration.handler.LoggingHandler implements the equivalent logging-channel-adapter functionality similar to the one in XML based configuration
And finally, the following is the main application that uses the POJOs from Listing.5, Listing.10 and Listing.14 to publish and consume data:
/* * Name: StrUpCase3MainConfig * Author: Bhaskar S * Date: 04/24/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import com.polarsparc.si.p1.StrUpCaseGateway2; import com.polarsparc.si.p1.StrUpCaseHandler2; public class StrUpCase3MainConfig { private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCase3MainConfig.class); public static void main(String[] args) { ApplicationContext context = new AnnotationConfigApplicationContext(StrUpCase3Config.class, StrUpCaseHandler2.class, StrUpCaseGateway2.class); StrUpCaseGateway2 gateway = (StrUpCaseGateway2) context.getBean("gateway"); gateway.send("Spring Integration using WireTap with Config"); String msg = gateway.receive(); LOGGER.info("OUTPUT: {}", msg); } }
To execute the code from Listing.15, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p2.StrUpCase3MainConfig"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain) WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-04-24 11:33:51:221 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-04-24 11:33:51:225 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-04-24 11:33:51:230 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2021-04-24 11:33:51:268 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-04-24 11:33:51:307 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-04-24 11:33:51:470 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' 2021-04-24 11:33:51:568 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-04-24 11:33:51:569 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.errorChannel' has 1 subscriber(s). 2021-04-24 11:33:51:570 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-04-24 11:33:51:571 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:strUpCase3Config.loggerOne.serviceActivator} as a subscriber to the 'logger1' channel 2021-04-24 11:33:51:571 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.logger1' has 1 subscriber(s). 2021-04-24 11:33:51:572 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'strUpCase3Config.loggerOne.serviceActivator' 2021-04-24 11:33:51:573 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:strUpCase3Config.loggerTwo.serviceActivator} as a subscriber to the 'logger2' channel 2021-04-24 11:33:51:574 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.logger2' has 1 subscriber(s). 2021-04-24 11:33:51:574 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'strUpCase3Config.loggerTwo.serviceActivator' 2021-04-24 11:33:51:575 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:strUpCaseHandler2.handler.serviceActivator} as a subscriber to the 'inChannel' channel 2021-04-24 11:33:51:576 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.inChannel' has 1 subscriber(s). 2021-04-24 11:33:51:577 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'strUpCaseHandler2.handler.serviceActivator' 2021-04-24 11:33:51:578 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#send(String)' 2021-04-24 11:33:51:579 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#receive()' 2021-04-24 11:33:51:579 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-04-24 11:33:51:592 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.handler.LoggingHandler - Spring Integration using WireTap with Config 2021-04-24 11:33:51:593 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-04-24 11:33:51:597 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO com.polarsparc.si.p1.StrUpCaseHandler2 - INPUT: msg = Spring Integration using WireTap with Config 2021-04-24 11:33:51:598 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO org.springframework.integration.handler.LoggingHandler - GenericMessage [payload=SPRING INTEGRATION USING WIRETAP WITH CONFIG, headers={replyChannel=nullChannel, id=ef5b1978-35d8-7064-e500-bdf5da953e8f, timestamp=1619278431598}] 2021-04-24 11:33:51:600 [com.polarsparc.si.p2.StrUpCase3MainConfig.main()] INFO com.polarsparc.si.p2.StrUpCase3MainConfig - OUTPUT: SPRING INTEGRATION USING WIRETAP WITH CONFIG [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.258 s [INFO] Finished at: 2021-04-24T11:33:51-04:00 [INFO] ------------------------------------------------------------------------
As can be inferred from the Output.5 and Output.6 above, Spring Integration is using LoggingHandler to log messages intercepted by the wire-tap.
Multi-Threading
By default, Spring Integration uses the caller thread to process messages from end-to-end. There are situations when one needs to use a pool of thread(s) to process a stream of messages.
XML based Approach |
For demonstrating the multi-threading capability, we will leverage the custom gateway interface defined in Listing.7 from Part-1.
The following is the POJO that converts the input text to upper-case and introduces an artificial delay of 1000ms. This will act as an endpoint:
/* * Name: StrUpCaseHandler3 * Author: Bhaskar S * Date: 04/16/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StrUpCaseHandler3 { private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCaseHandler3.class); public String handler(String msg) { LOGGER.info("INPUT: msg = {}", msg); try { Thread.sleep(1000); } catch (Exception ignored) { } return msg.toUpperCase(); } }
The following is the XML based Spring Integration configuration that wires up the channels and the endpoint:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task" xmlns:int="http://www.springframework.org/schema/integration" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"> <bean id="inMsgHandler" class="com.polarsparc.si.p2.StrUpCaseHandler3" /> <task:executor id="executor" queue-capacity="5" pool-size="3" /> <int:channel id="inChannel1"> <int:dispatcher task-executor="executor" /> </int:channel> <int:channel id="outChannel1"> <int:queue capacity="5" /> </int:channel> <!-- The primary purpose of a Gateway is to hide the messaging API provided by Spring Integration --> <int:gateway id="gateway" service-interface="com.polarsparc.si.p1.StrUpCaseGateway" default-request-channel="inChannel1" default-reply-channel="outChannel1" /> <int:service-activator input-channel="inChannel1" output-channel="outChannel1" ref="inMsgHandler" method="handler" /> </beans>
Some aspects of the Listing.17 from the above needs a little explanation.
The executor element allows one to define a thread pool.
The value of the id attribute is used as the prefix for the thread names in the thread pool.
The value of the pool-size attribute specifies the maximum number of threads in the thread pool.
The value of the queue-capacity attribute specifies the maximum queue size for the tasks being submitted for execution. The executor will first try to use a free thread from the thread pool. If the thread pool is empty and all the threads are active, then the task will be added to the queue as long as its capacity has not yet been reached. If the queue becomes full, then the executor will reject the task.
Under-the-hood, Spring Integration uses org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor for the thread pool
The dispatcher sub-element creates and uses an instance of org.springframework.integration.channel.ExecutorChannel under-the-hood. This channel is a point-to-point channel similar to the default simple channel except that it uses a thread pool executor.
Every message coming into the channel is dispatched to the specified thread pool executor for further processing
The following is our main application to test the multi-threaded execution:
/* * Name: StrUpCase4MainXml * Author: Bhaskar S * Date: 04/24/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.polarsparc.si.p1.StrUpCaseGateway; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; public class StrUpCase4MainXml { private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCase4MainXml.class); public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("p2/StrUpCase4.xml"); StrUpCaseGateway gateway = (StrUpCaseGateway) context.getBean("gateway"); for (int i = 1; i <= 5; i++) { gateway.send("Spring Integration using Executor - " + i + " with Xml"); } for (int i = 1; i <= 5; i++) { String msg = gateway.receive(); LOGGER.info("OUTPUT: {}", msg); } ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) context.getBean("executor"); while (executor.getActiveCount() != 0) { try { Thread.sleep(1000); } catch (Exception ignored) { } } executor.shutdown(); } }
To execute the code from Listing.18, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p2.StrUpCase4MainXml"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- 2021-04-24 12:22:50:277 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-04-24 12:22:50:280 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-04-24 12:22:50:284 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2021-04-24 12:22:50:322 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 2021-04-24 12:22:50:465 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class) WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-04-24 12:22:50:558 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inChannel1' channel 2021-04-24 12:22:50:558 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.channel.ExecutorChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7a15dd75.inChannel1' has 1 subscriber(s). 2021-04-24 12:22:50:559 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' 2021-04-24 12:22:50:560 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-04-24 12:22:50:560 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7a15dd75.errorChannel' has 1 subscriber(s). 2021-04-24 12:22:50:561 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-04-24 12:22:50:562 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#send(String)' 2021-04-24 12:22:50:563 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#receive()' 2021-04-24 12:22:50:563 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-04-24 12:22:50:578 [executor-1] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-04-24 12:22:50:580 [executor-3] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-04-24 12:22:50:581 [executor-2] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-04-24 12:22:50:583 [executor-1] INFO com.polarsparc.si.p2.StrUpCaseHandler3 - INPUT: msg = Spring Integration using Executor - 1 with Xml 2021-04-24 12:22:50:583 [executor-3] INFO com.polarsparc.si.p2.StrUpCaseHandler3 - INPUT: msg = Spring Integration using Executor - 3 with Xml 2021-04-24 12:22:50:583 [executor-2] INFO com.polarsparc.si.p2.StrUpCaseHandler3 - INPUT: msg = Spring Integration using Executor - 2 with Xml 2021-04-24 12:22:51:586 [executor-3] INFO com.polarsparc.si.p2.StrUpCaseHandler3 - INPUT: msg = Spring Integration using Executor - 4 with Xml 2021-04-24 12:22:51:586 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO com.polarsparc.si.p2.StrUpCase4MainXml - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 1 WITH XML 2021-04-24 12:22:51:586 [executor-2] INFO com.polarsparc.si.p2.StrUpCaseHandler3 - INPUT: msg = Spring Integration using Executor - 5 with Xml 2021-04-24 12:22:51:588 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO com.polarsparc.si.p2.StrUpCase4MainXml - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 3 WITH XML 2021-04-24 12:22:51:590 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO com.polarsparc.si.p2.StrUpCase4MainXml - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 2 WITH XML 2021-04-24 12:22:52:588 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO com.polarsparc.si.p2.StrUpCase4MainXml - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 4 WITH XML 2021-04-24 12:22:52:589 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO com.polarsparc.si.p2.StrUpCase4MainXml - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 5 WITH XML 2021-04-24 12:22:52:590 [com.polarsparc.si.p2.StrUpCase4MainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Shutting down ExecutorService [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 3.221 s [INFO] Finished at: 2021-04-24T12:22:52-04:00 [INFO] ------------------------------------------------------------------------
Java Config based Approach |
The following is the Java Config based POJO that acts as an endpoint that converts the input text to upper-case (with an artificially introduced delay):
/* * Name: StrUpCaseHandler4 * Author: Bhaskar S * Date: 04/24/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; @Configuration @EnableIntegration public class StrUpCaseHandler4 { private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCaseHandler4.class); @ServiceActivator(inputChannel = "inChannel", outputChannel = "outChannel") public String handler(String msg) { LOGGER.info("INPUT: msg = {}", msg); try { Thread.sleep(1000); } catch (Exception ignored) { } return msg.toUpperCase(); } }
The following is the Java Config based POJO that defines the input and output channels and the executor thread pool similar to the one defined in the XML configuration file of Listing.17 above:
/* * Name: StrUpCase4Config * Author: Bhaskar S * Date: 04/24/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p2; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.channel.ExecutorChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.messaging.MessageChannel; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration @EnableIntegration public class StrUpCase4Config { @Bean public MessageChannel inChannel() { return new ExecutorChannel(threadPoolTaskExecutor()); } @Bean public MessageChannel outChannel() { return new QueueChannel(5); } @Bean(name = "executor") public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setQueueCapacity(5); executor.setCorePoolSize(3); executor.setBeanName("executor"); return executor; } }
Some aspects of the Listing.20 from the above needs a little explanation.
The class org.springframework.integration.channel.ExecutorChannel is an implementation of the channel that delegates the dispatching of messages to an underlying thread pool for further processing
The class org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor implements a thread pool is similar to the executor defined in XML based configuration
And finally, the following is the main application that uses the POJOs from Listing.10, Listing.19, and Listing.20 to publish data that is consumed and processed by a pool of threads:
/* * Name: StrUpCase4MainConfig * Author: Bhaskar S * Date: 04/24/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import com.polarsparc.si.p1.StrUpCaseGateway2; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; public class StrUpCase4MainConfig { private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCase4MainConfig.class); public static void main(String[] args) { ApplicationContext context = new AnnotationConfigApplicationContext(StrUpCase4Config.class, StrUpCaseHandler4.class, StrUpCaseGateway2.class); StrUpCaseGateway2 gateway = (StrUpCaseGateway2) context.getBean("gateway"); for (int i = 1; i <= 5; i++) { gateway.send("Spring Integration using Executor - " + i + " with Config"); } for (int i = 1; i <= 5; i++) { String msg = gateway.receive(); LOGGER.info("OUTPUT: {}", msg); } ThreadPoolTaskExecutor executor = (ThreadPoolTaskExecutor) context.getBean("executor"); while (executor.getActiveCount() != 0) { try { Thread.sleep(1000); } catch (Exception ignored) { } } executor.shutdown(); } }
To execute the code from Listing.21, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p2.StrUpCase4MainConfig"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain) WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-04-24 12:27:25:339 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-04-24 12:27:25:345 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-04-24 12:27:25:348 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2021-04-24 12:27:25:384 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-04-24 12:27:25:423 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-04-24 12:27:25:465 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 'executor' 2021-04-24 12:27:25:589 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' 2021-04-24 12:27:25:688 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-04-24 12:27:25:690 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.errorChannel' has 1 subscriber(s). 2021-04-24 12:27:25:691 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-04-24 12:27:25:691 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:strUpCaseHandler4.handler.serviceActivator} as a subscriber to the 'inChannel' channel 2021-04-24 12:27:25:692 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.channel.ExecutorChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.inChannel' has 1 subscriber(s). 2021-04-24 12:27:25:693 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'strUpCaseHandler4.handler.serviceActivator' 2021-04-24 12:27:25:695 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#send(String)' 2021-04-24 12:27:25:695 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#receive()' 2021-04-24 12:27:25:696 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-04-24 12:27:25:715 [executor-1] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-04-24 12:27:25:718 [executor-3] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-04-24 12:27:25:718 [executor-2] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-04-24 12:27:25:721 [executor-3] INFO com.polarsparc.si.p2.StrUpCaseHandler4 - INPUT: msg = Spring Integration using Executor - 3 with Config 2021-04-24 12:27:25:721 [executor-2] INFO com.polarsparc.si.p2.StrUpCaseHandler4 - INPUT: msg = Spring Integration using Executor - 2 with Config 2021-04-24 12:27:25:721 [executor-1] INFO com.polarsparc.si.p2.StrUpCaseHandler4 - INPUT: msg = Spring Integration using Executor - 1 with Config 2021-04-24 12:27:26:723 [executor-1] INFO com.polarsparc.si.p2.StrUpCaseHandler4 - INPUT: msg = Spring Integration using Executor - 4 with Config 2021-04-24 12:27:26:724 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO com.polarsparc.si.p2.StrUpCase4MainConfig - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 3 WITH CONFIG 2021-04-24 12:27:26:724 [executor-2] INFO com.polarsparc.si.p2.StrUpCaseHandler4 - INPUT: msg = Spring Integration using Executor - 5 with Config 2021-04-24 12:27:26:726 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO com.polarsparc.si.p2.StrUpCase4MainConfig - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 1 WITH CONFIG 2021-04-24 12:27:26:727 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO com.polarsparc.si.p2.StrUpCase4MainConfig - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 2 WITH CONFIG 2021-04-24 12:27:27:725 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO com.polarsparc.si.p2.StrUpCase4MainConfig - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 4 WITH CONFIG 2021-04-24 12:27:27:726 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO com.polarsparc.si.p2.StrUpCase4MainConfig - OUTPUT: SPRING INTEGRATION USING EXECUTOR - 5 WITH CONFIG 2021-04-24 12:27:27:728 [com.polarsparc.si.p2.StrUpCase4MainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Shutting down ExecutorService 'executor' [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 3.220 s [INFO] Finished at: 2021-04-24T12:27:27-04:00 [INFO] ------------------------------------------------------------------------
As can be inferred from the Output.7 and Output.8 above, Spring Integration is using the configured thread pool executor to process messages.
Exception Handling
In Spring Integration, when a message is processed by an endpoint via a synchronous channel (without any queue or executor), any exception(s) thrown by the endpoint is sent back to the caller in the callers thread. In the case of an asynchronous channel, the message is processed by the endpoint in a thread that is different from the callers thread and the only way to handle the exception(s) is to configure an error-channel, where the exception(s) can be logged.
XML based Approach |
For demonstrating the error handling capability, we will use a different example - the use-case of checking if a user-id is valid and throwing an exception if not valid.
The following is the POJO that checks if the specified user-id is valid. Else, it throws a run-time exception:
/* * Name: CheckUserHandler * Author: Bhaskar S * Date: 04/24/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.List; public class CheckUserHandler { private static final Logger LOGGER = LoggerFactory.getLogger(CheckUserHandler.class); Listusers = Arrays.asList("alice", "bob", "charlie"); public boolean checkUser(String id) { LOGGER.info("Check user = {}", id); if (!users.contains(id)) { throw new RuntimeException("User " + id + " is invalid"); } return true; } }
The following is the POJO that handles the run-time exception:
/* * Name: CheckUserErrorHandler * Author: Bhaskar S * Date: 04/24/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandlingException; public class CheckUserErrorHandler { private static final Logger LOGGER = LoggerFactory.getLogger(CheckUserErrorHandler.class); @ServiceActivator(inputChannel = "errChannel") public void handleError(Message<MessageHandlingException> msg) { LOGGER.error("Exception message = {}", msg); } }
Spring Integration wraps all exceptions into the class org.springframework.messaging.MessageHandlingException.
The following is the simple interface to check the validity of an user-id:
/* * Name: CheckUserGateway * Author: Bhaskar S * Date: 04/24/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p2; public interface CheckUserGateway { public boolean checkUser(String id); }
The following is the XML based Spring Integration configuration that wires up the channels, a gateway with with the appropriate error-channel, and the endpoints:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:task="http://www.springframework.org/schema/task" xmlns:int="http://www.springframework.org/schema/integration" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"> <bean id="inMsgHandler" class="com.polarsparc.si.p2.CheckUserHandler" /> <bean id="errMsgHandler" class="com.polarsparc.si.p2.CheckUserErrorHandler" /> <task:executor id="executor" queue-capacity="5" pool-size="3" /> <int:channel id="inChannel"> <int:dispatcher task-executor="executor" /> </int:channel> <!-- The primary purpose of a Gateway is to hide the messaging API provided by Spring Integration --> <int:gateway id="gateway" service-interface="com.polarsparc.si.p2.CheckUserGateway" default-request-channel="inChannel" error-channel="errChannel" /> <int:service-activator input-channel="inChannel" ref="inMsgHandler" method="checkUser" /> <int:service-activator input-channel="errChannel" ref="errMsgHandler" method="handleError" /> </beans>
Only one aspect of the Listing.25 from the above needs a little explanation.
The error-channel attribute of the gateway element specifies the channel to which exception messages need to be routed. In our example, we have defined a service-activator to handle exceptions
The following is our main application to test exception handling in Spring Integration:
/* * Name: CheckUserMainXml * Author: Bhaskar S * Date: 04/24/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class CheckUserMainXml { private static final Logger LOGGER = LoggerFactory.getLogger(CheckUserMainXml.class); public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("p2/CheckUser.xml"); CheckUserGateway gateway = (CheckUserGateway) context.getBean("gateway"); LOGGER.info("CheckUser for alice: {} with Xml", Boolean.toString(gateway.checkUser("alice"))); LOGGER.info("CheckUser for john: {} with Xml", Boolean.toString(gateway.checkUser("john"))); } }
To execute the code from Listing.26, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p2.CheckUserMainXml"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- 2021-04-24 12:48:16:705 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-04-24 12:48:16:710 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-04-24 12:48:16:714 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2021-04-24 12:48:16:759 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor - Initializing ExecutorService 2021-04-24 12:48:16:908 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class) WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-04-24 12:48:17:012 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inChannel' channel 2021-04-24 12:48:17:013 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.channel.ExecutorChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@4278a583.inChannel' has 1 subscriber(s). 2021-04-24 12:48:17:014 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' 2021-04-24 12:48:17:014 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'errChannel' channel 2021-04-24 12:48:17:015 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@4278a583.errChannel' has 1 subscriber(s). 2021-04-24 12:48:17:016 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1' 2021-04-24 12:48:17:016 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-04-24 12:48:17:017 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@4278a583.errorChannel' has 1 subscriber(s). 2021-04-24 12:48:17:018 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-04-24 12:48:17:020 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#checkUser(String)' 2021-04-24 12:48:17:021 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-04-24 12:48:17:036 [executor-1] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-04-24 12:48:17:043 [executor-1] INFO com.polarsparc.si.p2.CheckUserHandler - Check user = alice 2021-04-24 12:48:17:045 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO com.polarsparc.si.p2.CheckUserMainXml - CheckUser for alice: true with Xml 2021-04-24 12:48:17:047 [executor-2] INFO com.polarsparc.si.p2.CheckUserHandler - Check user = john 2021-04-24 12:48:17:054 [com.polarsparc.si.p2.CheckUserMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-04-24 12:48:17:056 [com.polarsparc.si.p2.CheckUserMainXml.main()] ERROR com.polarsparc.si.p2.CheckUserErrorHandler - Exception message = ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@62b3a0b]; nested exception is java.lang.RuntimeException: User john is invalid, failedMessage=GenericMessage [payload=john, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@d242962, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@d242962, id=4d37bdf5-2f87-461b-9ca3-069fd7b3b9bc, timestamp=1619282897047}], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e640abb, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@5e640abb, id=7486dc01-1677-14eb-688a-93632a1aafa5, timestamp=1619282897054}] --- CTRL-C ---
Java Config based Approach |
The following is the Java Config based POJO that acts as an endpoint that checks if the specified user-id is valid. Else, it throws a run-time exception:
/* * Name: CheckUserHandler2 * Author: Bhaskar S * Date: 04/24/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; import java.util.Arrays; import java.util.List; @Configuration @EnableIntegration public class CheckUserHandler2 { private static final Logger LOGGER = LoggerFactory.getLogger(CheckUserHandler2.class); Listusers = Arrays.asList("alice", "bob", "charlie"); @ServiceActivator(inputChannel = "inChannel") public boolean checkUser(String id) { LOGGER.info("Check user = {}", id); if (!users.contains(id)) { throw new RuntimeException("User " + id + " is invalid"); } return true; } }
The following is the Java Config based POJO that acts as an endpoint that handles the run-time exception:
/* * Name: CheckUserErrorHandler2 * Author: Bhaskar S * Date: 04/24/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandlingException; @Configuration @EnableIntegration public class CheckUserErrorHandler2 { private static final Logger LOGGER = LoggerFactory.getLogger(CheckUserErrorHandler2.class); @ServiceActivator(inputChannel = "errChannel") public void handleError(Message<MessageHandlingException> msg) { LOGGER.error("Exception message = {}", msg); } }
The following is the Java Config based POJO that acts as a custom gateway interface that checks the validity of an user-id and defines the request and error channels:
/* * Name: CheckUserGateway2 * Author: Bhaskar S * Date: 04/24/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p2; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.config.EnableIntegration; @Configuration @EnableIntegration @IntegrationComponentScan @MessagingGateway(name = "gateway", defaultRequestChannel = "inChannel", errorChannel = "errChannel") public interface CheckUserGateway2 { public boolean checkUser(String id); }
And finally, the following is the main application that uses the POJOs from Listing.27, Listing.28, and Listing.29 to test exception handling:
/* * Name: CheckUserMainConfig * Author: Bhaskar S * Date: 04/24/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; public class CheckUserMainConfig { private static final Logger LOGGER = LoggerFactory.getLogger(CheckUserMainConfig.class); public static void main(String[] args) { ApplicationContext context = new AnnotationConfigApplicationContext(CheckUserHandler2.class, CheckUserErrorHandler2.class, CheckUserGateway2.class); CheckUserGateway2 gateway = (CheckUserGateway2) context.getBean("gateway"); LOGGER.info("CheckUser for alice: {} with Config", Boolean.toString(gateway.checkUser("alice"))); LOGGER.info("CheckUser for john: {} with Config", Boolean.toString(gateway.checkUser("john"))); } }
To execute the code from Listing.30, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p2.CheckUserMainConfig"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain) WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-04-24 12:55:26:520 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-04-24 12:55:26:526 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-04-24 12:55:26:530 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2021-04-24 12:55:26:571 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-04-24 12:55:26:607 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-04-24 12:55:26:735 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' 2021-04-24 12:55:26:846 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-04-24 12:55:26:847 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@b1f9496.errorChannel' has 1 subscriber(s). 2021-04-24 12:55:26:848 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-04-24 12:55:26:848 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:checkUserHandler2.checkUser.serviceActivator} as a subscriber to the 'inChannel' channel 2021-04-24 12:55:26:849 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@b1f9496.inChannel' has 1 subscriber(s). 2021-04-24 12:55:26:850 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'checkUserHandler2.checkUser.serviceActivator' 2021-04-24 12:55:26:850 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:checkUserErrorHandler2.handleError.serviceActivator} as a subscriber to the 'errChannel' channel 2021-04-24 12:55:26:851 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@b1f9496.errChannel' has 1 subscriber(s). 2021-04-24 12:55:26:851 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'checkUserErrorHandler2.handleError.serviceActivator' 2021-04-24 12:55:26:852 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#checkUser(String)' 2021-04-24 12:55:26:853 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-04-24 12:55:26:865 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-04-24 12:55:26:868 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO com.polarsparc.si.p2.CheckUserHandler2 - Check user = alice 2021-04-24 12:55:26:870 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO com.polarsparc.si.p2.CheckUserMainConfig - CheckUser for alice: true with Config 2021-04-24 12:55:26:871 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO com.polarsparc.si.p2.CheckUserHandler2 - Check user = john 2021-04-24 12:55:26:878 [com.polarsparc.si.p2.CheckUserMainConfig.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-04-24 12:55:26:879 [com.polarsparc.si.p2.CheckUserMainConfig.main()] ERROR com.polarsparc.si.p2.CheckUserErrorHandler2 - Exception message = ErrorMessage [payload=org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@1ee88cfc]; nested exception is java.lang.RuntimeException: User john is invalid, failedMessage=GenericMessage [payload=john, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@183d1409, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@183d1409, id=b174d92e-3559-9d82-7ff6-b75a2c389034, timestamp=1619283326871}], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6d477d5a, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@6d477d5a, id=9317d5f2-87f8-5add-bb8a-533050d57d32, timestamp=1619283326878}] --- CTRL-C ---
As can be inferred from the Output.9 and Output.10 above, Spring Integration is wrapping the generated exception into MessageHandlingException and routing it to our exception handler.
References