PolarSPARC |
Spring Integration Notes :: Part - 1
Bhaskar S | 04/17/2021 (UPDATED) |
Overview
Any Enterprise (small or large) typically has different application systems (some vendor and some custom built) and often times there is a need to integrate two or more of these systems. As we all know, application systems integration in an enterprise is always complex and challenging.
The book Enterprise Integration Patterns (EIP) captures and catalogs various enterprise integration patterns.
Spring Integration implements the various components and patterns described in the book and attempts to simplify enterprise integration.
Setup
The setup will be on a Ubuntu 20.04 LTS based Linux desktop. Ensure at least Java 11 or above is installed and setup. Also, ensure Apache Maven is installed and setup.
We will be using Spring Integration along with Spring Framework for all the demonstrations. The following are the versions at the time of this article:
Spring Framework ⇨ 5.3.5
Spring Integration ⇨ 5.4.5
To setup the Java directory structure for the demonstrations in this part, execute the following commands:
$ cd $HOME
$ mkdir -p $HOME/java/SpringIntegration
$ cd $HOME/java/SpringIntegration
$ mkdir -p src/main/java src/main/resources target
$ mkdir -p src/main/java/com/polarsparc/si
$ mkdir -p src/main/java/com/polarsparc/si/p1
$ mkdir -p src/main/resources/p1
The following is the listing for the Maven project file pom.xml that will be used:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.polarsparc.si</groupId> <artifactId>SpringIntegration</artifactId> <version>1.0</version> <name>SpringIntegration</name> <description>Spring Integration Examples</description> <properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> <build> <pluginManagement> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <fork>true</fork> <meminitial>128m</meminitial> <maxmem>512m</maxmem> <source>11</source> <target>11</target> </configuration> </plugin> </plugins> </pluginManagement> </build> <dependencies> <dependency> <groupId>javax.annotation</groupId> <artifactId>javax.annotation-api</artifactId> <version>1.3.2</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>5.3.5</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.3.5</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>5.3.5</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> <version>5.4.5</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.30</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> <version>1.7.30</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.30</version> </dependency> </dependencies> </project>
The following is the listing for the slf4j-simple logger properties file simplelogger.properties:
# ### SLF4J Simple Logger properties # org.slf4j.simpleLogger.defaultLogLevel=info org.slf4j.simpleLogger.showDateTime=true org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS org.slf4j.simpleLogger.showThreadName=true
In Spring Integration, one can provide the integration flow configuration information using one of the following 3 approaches:
XML based ⇨ via XML configuration files
Java Config based ⇨ using POJO classes and some annotations
Java DSL based ⇨ using convenient builders and a fluent API and some annotations
We will demonstrate examples using the 2 approaches - XML based and Java Config based.
Concepts
Spring Integration uses the pipes-and-filter based messaging paradigm (similar to two or more commands separated by pipes in Unix/Linux) for integration, resulting in a loosely-coupled, modular and extensible design.
In Spring Integration parlance, the pipes are referred to as the Channels and the filters are referred to as the Endpoints. The data that passes through the channels and the endpoints is referred to as the Messages.
In other words, the following are the three core components in Spring Integration:
Message ⇨ encapsulates the data that is exchanged during the integration of two components. Under the hood, it consists of two parts - the payload, which wraps the actual data, and the header properties, which wraps additional metadata, such as the message-id, time-stamp, priority, correlation-id, etc
Channel ⇨ is a logical destination where a message is delivered. They can either be consumed from (input) or published to (output).
A consumer can operate on a channel in either a point-to-point mode or a publish-subscribe mode.
A channel can also buffer incoming messages in memory. This allows for throttling of the input messages and preventing the consumer from being overwhelmed
Endpoint ⇨ is the component that processes messages by consuming from an input channel and producing response messages to an output channel. They are the connection point(s) of an application for integration
Hands-on Spring Integration
Now, let us dive into a simple hello-world like example using Spring Integration. In this example:
The main application will publish a message containing a text payload to an input channel
The message is consumed from the input channel by a POJO and the text is converted to upper-case
The resulting upper-case text wrapped in a message and published to an output channel
The main application consumes the message from the output channel and displays the result
XML based Approach |
The following is the POJO that converts the input text to upper-case. This will act as an endpoint:
/* * Name: StrUpCaseHandler * Author: Bhaskar S * Date: 04/16/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class StrUpCaseHandler { private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCaseHandler.class); public String handler(String msg) { LOGGER.info("INPUT: msg = {}", msg); return msg.toUpperCase(); } }
Next, the following is the XML based Spring Integration configuration that wires up the channels and the endpoint:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"> <bean id="inMsgHandler" class="com.polarsparc.si.p1.StrUpCaseHandler" /> <int:channel id="inChannel" /> <int:channel id="outChannel"> <int:queue capacity="5" /> </int:channel> <int:service-activator input-channel="inChannel" output-channel="outChannel" ref="inMsgHandler" method="handler" /> </beans>
Some aspects of the Listing.2 from the above needs a little explanation.
Notice the XML namespace definition for Spring Integration with the prefix int - xmlns:int="http://www.springframework.org/schema/integration"
We define a bean with an id of inMsgHandler to be an instance of our POJO com.polarsparc.si.p1.StrUpCaseHandler from Listing.1
We define an input channel with an id of inChannel.
Under the hood, Spring Integration will use an instance of org.springframework.integration.channel.DirectChannel which implements the interface org.springframework.messaging.SubscribableChannel. This type of a channel is one of the simplest implementations with a point-to-point semantics and the message is directly delivered to the subscribed consumer
We define an output channel with an id of outChannel. This is a queue backed channel with a maximum capacity of 5 messages. The queue is used for buffering messages and to throttle the consumer.
Under the hood, Spring Integration will use an instance of org.springframework.integration.channel.QueueChannel which implements the interface org.springframework.messaging.PollableChannel. This type of a channel is point-to-point with only one consumer processing a message at a time
Finally, we define the service-activator, which is a type of endpoint that invokes a specified method on a specified bean. In this case, the endpoint will invoke the method handler on the bean with the id of inChannel
In other words, a Service Activator invokes an operation on some service object to process the incoming message, extracting the message payload and converting it to the type of the operations input parameter type. Once the service operation completes and returns a value, that return value is wrapped in a message and sent to an output channel
And finally, the following is the main application that publishes and consumes data:
/* * Name: StrUpCaseMainXml * Author: Bhaskar S * Date: 04/16/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.PollableChannel; import org.springframework.integration.support.MessageBuilder; public class StrUpCaseMainXml { private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCaseMainXml.class); public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("p1/StrUpCase.xml"); Message<?> in = MessageBuilder.withPayload("Hello Spring Integration with Xml").build(); MessageChannel inMsg = (MessageChannel) context.getBean("inChannel"); PollableChannel outMsg = (PollableChannel) context.getBean("outChannel"); inMsg.send(in); Message<?> out = outMsg.receive(); LOGGER.info("OUTPUT: {}", out); } }
Some aspects of the Listing.3 from the above needs a little explanation.
We first create a Spring ApplicationContext using the XML configuration from Listing.2
A message in Spring Integration is defined by the interface org.springframework.messaging.Message.
We create an instance of message using the builder class org.springframework.integration.support.MessageBuilder
A channel in Spring Integration is defined by the interface org.springframework.messaging.MessageChannel.
We fetch the input channel with the id of inChannel from the Spring context
We fetch the queue backed output channel with the id of outChannel from the Spring context
We publish the created message to the input channel
Finally, we poll for a response message from the output channel and display the response
To execute the code from Listing.3, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p1.StrUpCaseMainXml"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- 2021-04-17 20:46:56:309 [com.polarsparc.si.p1.StrUpCaseMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-04-17 20:46:56:313 [com.polarsparc.si.p1.StrUpCaseMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-04-17 20:46:56:316 [com.polarsparc.si.p1.StrUpCaseMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2021-04-17 20:46:56:534 [com.polarsparc.si.p1.StrUpCaseMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' 2021-04-17 20:46:56:567 [com.polarsparc.si.p1.StrUpCaseMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inChannel' channel 2021-04-17 20:46:56:568 [com.polarsparc.si.p1.StrUpCaseMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@42062658.inChannel' has 1 subscriber(s). 2021-04-17 20:46:56:569 [com.polarsparc.si.p1.StrUpCaseMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' 2021-04-17 20:46:56:569 [com.polarsparc.si.p1.StrUpCaseMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-04-17 20:46:56:570 [com.polarsparc.si.p1.StrUpCaseMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@42062658.errorChannel' has 1 subscriber(s). 2021-04-17 20:46:56:571 [com.polarsparc.si.p1.StrUpCaseMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-04-17 20:46:56:581 [com.polarsparc.si.p1.StrUpCaseMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-04-17 20:46:56:586 [com.polarsparc.si.p1.StrUpCaseMainXml.main()] INFO com.polarsparc.si.p1.StrUpCaseHandler - INPUT: msg = Hello Spring Integration with Xml 2021-04-17 20:46:56:588 [com.polarsparc.si.p1.StrUpCaseMainXml.main()] INFO com.polarsparc.si.p1.StrUpCaseMainXml - OUTPUT: GenericMessage [payload=HELLO SPRING INTEGRATION WITH XML, headers={id=d8ec9e77-b73a-b38f-f4c3-bade5efbb55a, timestamp=1619138816588}] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.246 s [INFO] Finished at: 2021-04-17T20:46:56-04:00 [INFO] ------------------------------------------------------------------------
Java Config based Approach |
The following is the Java Config based POJO that defines the input and output channels similar to the one defined in the XML configuration file of Listing.2 above:
/* * Name: StrUpCaseConfig * Author: Bhaskar S * Date: 04/16/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p1; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.messaging.MessageChannel; @Configuration @EnableIntegration public class StrUpCaseConfig { @Bean public MessageChannel inChannel() { return new DirectChannel(); } @Bean public MessageChannel outChannel() { return new QueueChannel(5); } }
Notice the use of 3 annotations @Configuration, @EnableIntegration, and @Bean in the Listing.4 above.
@Configuration is a class level annotation and is used to tag a class as the source of bean definitions. It is similar to the XML element <beans> from the XML based configuration file.
@EnableIntegration is a class level annotation that is used to register the core components of the Spring Integration framework.
@Bean is a method level annotation and corresponds to the XML element <bean> from the XML based configuration file. By default, the method name is used as the name of the bean.
One will encounter the following exception (at runtime) if the annotation @EnableIntegration is not used:
Exception in thread "main" org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel
The following is the Java Config based POJO that acts as an endpoint that converts the input text to upper-case:
/* * Name: StrUpCaseHandler2 * Author: Bhaskar S * Date: 04/16/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; @Configuration @EnableIntegration public class StrUpCaseHandler2 { private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCaseHandler2.class); @ServiceActivator(inputChannel = "inChannel", outputChannel = "outChannel") public String handler(String msg) { LOGGER.info("INPUT: msg = {}", msg); return msg.toUpperCase(); } }
Notice the use of a new annotation @ServiceActivator in the Listing.5 above.
@ServiceActivator is a method level annotation and corresponds to the XML element <bean> with the id of inMsgHandler from the XML based configuration file in Listing.2 above. In addition, it specifies the names of the associated input and output channels for this endpoint.
And finally, the following is the main application that uses the POJOs from Listing.4 and Listing.5 to publish and consume data:
/* * Name: StrUpCaseMainConfig * Author: Bhaskar S * Date: 04/16/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.PollableChannel; public class StrUpCaseMainConfig { private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCaseMainConfig.class); public static void main(String[] args) { ApplicationContext context = new AnnotationConfigApplicationContext(StrUpCaseConfig.class, StrUpCaseHandler2.class); Message<?> in = MessageBuilder.withPayload("Hello Spring Integration with Config").build(); MessageChannel inMsg = (MessageChannel) context.getBean("inChannel"); PollableChannel outMsg = (PollableChannel) context.getBean("outChannel"); inMsg.send(in); Message<?> out = outMsg.receive(); LOGGER.info("OUTPUT: {}", out); } }
To execute the code from Listing.6, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p1.StrUpCaseMainConfig"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- 2021-04-17 20:49:11:718 [com.polarsparc.si.p1.StrUpCaseMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-04-17 20:49:11:724 [com.polarsparc.si.p1.StrUpCaseMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-04-17 20:49:11:728 [com.polarsparc.si.p1.StrUpCaseMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2021-04-17 20:49:11:770 [com.polarsparc.si.p1.StrUpCaseMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-04-17 20:49:11:774 [com.polarsparc.si.p1.StrUpCaseMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-04-17 20:49:11:971 [com.polarsparc.si.p1.StrUpCaseMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' 2021-04-17 20:49:12:046 [com.polarsparc.si.p1.StrUpCaseMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-04-17 20:49:12:047 [com.polarsparc.si.p1.StrUpCaseMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.errorChannel' has 1 subscriber(s). 2021-04-17 20:49:12:047 [com.polarsparc.si.p1.StrUpCaseMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-04-17 20:49:12:048 [com.polarsparc.si.p1.StrUpCaseMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:strUpCaseHandler2.handler.serviceActivator} as a subscriber to the 'inChannel' channel 2021-04-17 20:49:12:049 [com.polarsparc.si.p1.StrUpCaseMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.inChannel' has 1 subscriber(s). 2021-04-17 20:49:12:049 [com.polarsparc.si.p1.StrUpCaseMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'strUpCaseHandler2.handler.serviceActivator' 2021-04-17 20:49:12:056 [com.polarsparc.si.p1.StrUpCaseMainConfig.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-04-17 20:49:12:060 [com.polarsparc.si.p1.StrUpCaseMainConfig.main()] INFO com.polarsparc.si.p1.StrUpCaseHandler2 - INPUT: msg = Hello Spring Integration with Config 2021-04-17 20:49:12:062 [com.polarsparc.si.p1.StrUpCaseMainConfig.main()] INFO com.polarsparc.si.p1.StrUpCaseMainConfig - OUTPUT: GenericMessage [payload=HELLO SPRING INTEGRATION WITH CONFIG, headers={id=55d5c657-3b9a-a929-af78-ffc51a82de1e, timestamp=1619138952062}] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.103 s [INFO] Finished at: 2021-04-17T20:49:12-04:00 [INFO] ------------------------------------------------------------------------
HOORAY !!! We have successfully demonstrated a simple application using Spring Integration.
One observation from the code in Listing.3 (OR Listing.6) is that we have a tight coupling between the core application logic and Spring Integration. In other words, the application logic directly references and uses some of the Spring Integration components such as Message, MessageBuilder, MessageChannel and PollableChannel.
It is desirable to have an application built with no direct and tight coupling with the core Spring Integration components.
Let us go ahead and eliminate the direct references to Spring Integration components from the application logic in Listing.3 (AND Listing.6).
XML based Approach |
The following is a simple interface to send and receive text data:
/* * Name: StrUpCaseGateway * Author: Bhaskar S * Date: 04/16/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p1; public interface StrUpCaseGateway { public String receive(); public void send(String msg); }
Next, the following is the modified XML based Spring Integration configuration that wires up the channels and the endpoint:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"> <bean id="inMsgHandler" class="com.polarsparc.si.p1.StrUpCaseHandler" /> <int:channel id="inChannel" /> <int:channel id="outChannel"> <int:queue capacity="5" /> </int:channel> <!-- The primary purpose of a Gateway is to hide the messaging API provided by Spring Integration --> <int:gateway id="gateway" service-interface="com.polarsparc.si.p1.StrUpCaseGateway" default-request-channel="inChannel" default-reply-channel="outChannel" /> <int:service-activator input-channel="inChannel" output-channel="outChannel" ref="inMsgHandler" method="handler" /> </beans>
Most of the XML configuration is similar to that of Listing.2 except for one aspect that needs a little explanation.
A gateway is a type of proxy that hides the underlying messaging components of Spring Integration from the core application.
In order to use the gateway pattern, we need to define a simple interface that exposes the desired functionality (sending and receiving data in this example). This is exactly what we did with our custom interface in Listing.7.
Under the hood, Spring Integration makes uses of org.springframework.integration.gateway.GatewayProxyFactoryBean to dynamically generate a proxy for the specified custom interface and hides all the interaction with messaging components. The application will only invoke the methods on the custom interface
The following is our modified main application that uses the methods in our custom interface from Listing.7:
/* * Name: StrUpCase2MainXml * Author: Bhaskar S * Date: 04/16/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class StrUpCase2MainXml { private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCase2MainXml.class); public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext("p1/StrUpCase2.xml"); StrUpCaseGateway gateway = (StrUpCaseGateway) context.getBean("gateway"); gateway.send("Spring Integration using Gateway with Xml"); String msg = gateway.receive(); LOGGER.info("OUTPUT: {}", msg); } }
As can be seen from the code in Listing.9, the core application logic does not use any of the Spring Integration components.
To execute the code from Listing.9, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p1.StrUpCase2MainXml"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- 2021-04-17 20:51:11:137 [com.polarsparc.si.p1.StrUpCase2MainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-04-17 20:51:11:141 [com.polarsparc.si.p1.StrUpCase2MainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-04-17 20:51:11:145 [com.polarsparc.si.p1.StrUpCase2MainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2021-04-17 20:51:11:357 [com.polarsparc.si.p1.StrUpCase2MainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class) WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-04-17 20:51:11:450 [com.polarsparc.si.p1.StrUpCase2MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inChannel' channel 2021-04-17 20:51:11:451 [com.polarsparc.si.p1.StrUpCase2MainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@42062658.inChannel' has 1 subscriber(s). 2021-04-17 20:51:11:451 [com.polarsparc.si.p1.StrUpCase2MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' 2021-04-17 20:51:11:452 [com.polarsparc.si.p1.StrUpCase2MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-04-17 20:51:11:453 [com.polarsparc.si.p1.StrUpCase2MainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@42062658.errorChannel' has 1 subscriber(s). 2021-04-17 20:51:11:453 [com.polarsparc.si.p1.StrUpCase2MainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-04-17 20:51:11:454 [com.polarsparc.si.p1.StrUpCase2MainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#send(String)' 2021-04-17 20:51:11:455 [com.polarsparc.si.p1.StrUpCase2MainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#receive()' 2021-04-17 20:51:11:456 [com.polarsparc.si.p1.StrUpCase2MainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-04-17 20:51:11:466 [com.polarsparc.si.p1.StrUpCase2MainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-04-17 20:51:11:470 [com.polarsparc.si.p1.StrUpCase2MainXml.main()] INFO com.polarsparc.si.p1.StrUpCaseHandler - INPUT: msg = Spring Integration using Gateway with Xml 2021-04-17 20:51:11:472 [com.polarsparc.si.p1.StrUpCase2MainXml.main()] INFO com.polarsparc.si.p1.StrUpCase2MainXml - OUTPUT: SPRING INTEGRATION USING GATEWAY WITH XML [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.238 s [INFO] Finished at: 2021-04-17T20:51:11-04:00 [INFO] ------------------------------------------------------------------------
Java Config based Approach |
The following is the Java Config based POJO that defines the gateway similar to the one defined in the XML configuration file of Listing.8 above:
/* * Name: StrUpCaseGateway2 * Author: Bhaskar S * Date: 04/16/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p1; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.config.EnableIntegration; @Configuration @EnableIntegration @IntegrationComponentScan @MessagingGateway(name = "gateway", defaultRequestChannel = "inChannel", defaultReplyChannel = "outChannel") public interface StrUpCaseGateway2 { public String receive(); public void send(String msg); }
Notice the use of 2 new annotations @IntegrationComponentScan and @MessagingGateway in the Listing.10 above.
@MessagingGateway is a method level annotation and corresponds to the XML element <gateway> with the id of gateway from the XML based configuration file in Listing.8 above. In addition, it also specifies the names of the associated request and reply channels.
@IntegrationComponentScan is a class level annotation and scans Java POJOs for the annotation @MessagingGateway to dynamically generate a proxy for the specified custom gateway interface.
One will encounter the following exception (at runtime) if the annotation @IntegrationComponentScan is not used:
Exception in thread "main" org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'gateway' available
And finally, the following is the main application that uses the POJOs from Listing.4, Listing.5, and Listing.10 to publish and consume data:
/* * Name: StrUpCase2MainConfig * Author: Bhaskar S * Date: 04/16/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p1; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; public class StrUpCase2MainConfig { private static final Logger LOGGER = LoggerFactory.getLogger(StrUpCase2MainConfig.class); public static void main(String[] args) { ApplicationContext context = new AnnotationConfigApplicationContext(StrUpCaseConfig.class, StrUpCaseHandler2.class, StrUpCaseGateway2.class); StrUpCaseGateway2 gateway = (StrUpCaseGateway2) context.getBean("gateway"); gateway.send("Spring Integration using Gateway with Config"); String msg = gateway.receive(); LOGGER.info("OUTPUT: {}", msg); } }
To execute the code from Listing.11, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p1.StrUpCase2MainConfig"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain) WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-04-17 20:55:31:638 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-04-17 20:55:31:644 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-04-17 20:55:31:648 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2021-04-17 20:55:31:685 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-04-17 20:55:31:721 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-04-17 20:55:31:875 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' 2021-04-17 20:55:31:992 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-04-17 20:55:31:993 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.errorChannel' has 1 subscriber(s). 2021-04-17 20:55:31:994 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-04-17 20:55:31:995 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:strUpCaseHandler2.handler.serviceActivator} as a subscriber to the 'inChannel' channel 2021-04-17 20:55:31:996 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.inChannel' has 1 subscriber(s). 2021-04-17 20:55:31:997 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'strUpCaseHandler2.handler.serviceActivator' 2021-04-17 20:55:31:998 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#send(String)' 2021-04-17 20:55:31:999 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#receive()' 2021-04-17 20:55:32:000 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-04-17 20:55:32:014 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-04-17 20:55:32:017 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO com.polarsparc.si.p1.StrUpCaseHandler2 - INPUT: msg = Spring Integration using Gateway with Config 2021-04-17 20:55:32:019 [com.polarsparc.si.p1.StrUpCase2MainConfig.main()] INFO com.polarsparc.si.p1.StrUpCase2MainConfig - OUTPUT: SPRING INTEGRATION USING GATEWAY WITH CONFIG [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.290 s [INFO] Finished at: 2021-04-17T20:55:32-04:00 [INFO] ------------------------------------------------------------------------
BINGO !!! We have successfully demonstrated a simple application using Spring Integration without the tight coupling.
References