PolarSPARC |
Spring Integration Notes :: Part - 6
Bhaskar S | 05/22/2021 (UPDATED) |
Overview
In Part-5, we covered basic examples of Spring Integration relating to the AMQP (using RabbitMQ) and Apache Kafka channel adapters.
We will continue our journey by exploring some basic examples relating to the following concepts in Spring Integration:
Chaining Endpoints
Header Enrichment
Setup
To setup the Java directory structure for the demonstrations in this part, execute the following commands:
$ cd $HOME/java/SpringIntegration
$ mkdir -p src/main/java/com/polarsparc/si/p6
$ mkdir -p src/main/resources/p6
The following is the listing for the updated Maven project file pom.xml to add the dependencies for the stream and Lombok support:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.polarsparc.si</groupId> <artifactId>SpringIntegration</artifactId> <version>1.0</version> <name>SpringIntegration</name> <description>Spring Integration Examples</description> <properties> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> <build> <pluginManagement> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <fork>true</fork> <meminitial>128m</meminitial> <maxmem>512m</maxmem> <source>11</source> <target>11</target> </configuration> </plugin> </plugins> </pluginManagement> </build> <dependencies> <dependency> <groupId>javax.annotation</groupId> <artifactId>javax.annotation-api</artifactId> <version>1.3.2</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>5.3.5</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>5.3.5</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-beans</artifactId> <version>5.3.5</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> <version>5.4.5</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-file</artifactId> <version>5.4.5</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-sftp</artifactId> <version>5.4.5</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-jdbc</artifactId> <version>5.4.5</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-http</artifactId> <version>5.4.5</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-amqp</artifactId> <version>5.4.5</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-kafka</artifactId> <version>5.4.5</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> <version>5.4.5</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.3.6</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.30</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>log4j-over-slf4j</artifactId> <version>1.7.30</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.30</version> </dependency> <dependency> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <version>42.2.19</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.20</version> <scope>provided</scope> </dependency> </dependencies> </project>
Hands-on Spring Integration
We will consider a hypothetical car service use-case to demonstrate the various concepts in this part.
Chaining Endpoints
In this hypothetical example, as cars come in for servicing, we want to assign each car, a reference number on arrival, and then based on the current miles on the car, assign recommended services.
XML based Approach |
The following is the Car POJO that encapsulates the car make, the current miles, the reference number, and a java.util.Map of services:
/* * Name: Car * Author: Bhaskar S * Date: 05/22/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p6; import lombok.Getter; import lombok.Setter; import lombok.ToString; import java.util.Map; import java.util.TreeMap; @Getter @Setter @ToString public class Car { private String ref; private String make; private long miles; private Map<String, Float> services = new TreeMap<>(); public Car(String make, long miles) { this.make = make; this.miles = miles; } public static Car makeClone(Car car) { return new Car(car.make, car.miles); } }
The following handler is a Java POJO that assigns a reference number to an arriving Car:
/* * Name: RefNoHandler * Author: Bhaskar S * Date: 05/22/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p6; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; public class RefNoHandler { private static final Logger LOGGER = LoggerFactory.getLogger(RefNoHandler.class); private static int nextNo = 1; private static String REF_NO_FMT = "%s-%05d"; public Message<Car> assignRefNo(Message<Car> input) { input.getPayload().setRef(String.format(REF_NO_FMT, input.getPayload().getMake().substring(0, 1), nextNo++)); LOGGER.info("Input: {} (in Xml)", input.toString()); return input; } }
The following handler is a Java POJO that assigns the recommended services for an arriving Car based on the current miles:
/* * Name: CarServiceHandler * Author: Bhaskar S * Date: 05/22/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p6; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; public class CarServiceHandler { private static final Logger LOGGER = LoggerFactory.getLogger(CarServiceHandler.class); private static String OIL_CHANGE = "OIL_CHANGE"; private static String AIR_FILTER = "AIR_FILTER"; private static String WHEEL_BALANCE = "WHEEL_BALANCE"; public Message<?> recommendedServices(Message<Car> input) { input.getPayload().getServices().put(OIL_CHANGE, (float) 29.95); input.getPayload().getServices().put(AIR_FILTER, (float) 19.95); if (input.getPayload().getMiles() % 15000 == 0) { input.getPayload().getServices().put(WHEEL_BALANCE, (float) 69.95); } LOGGER.info("Input: {} (in Xml)", input.toString()); return input; } }
The following is a simple interface to service a Car:
/* * Name: CarServiceGateway * Author: Bhaskar S * Date: 05/22/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p6; public interface CarServiceGateway { public void serviceCar(Car car); }
The following is the XML based Spring Integration configuration that wires up the channels and the endpoints as a chain:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:stream="http://www.springframework.org/schema/integration/stream" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd"> <bean id="refNoHandler" class="com.polarsparc.si.p6.RefNoHandler" /> <bean id="carServiceHandler" class="com.polarsparc.si.p6.CarServiceHandler" /> <int:channel id="inChannel" /> <int:channel id="outChannel" /> <int:gateway id="gateway" service-interface="com.polarsparc.si.p6.CarServiceGateway" default-request-channel="inChannel" /> <int:chain input-channel="inChannel" output-channel="outChannel"> <int:service-activator ref="refNoHandler" method="assignRefNo" /> <int:service-activator ref="carServiceHandler" method="recommendedServices" /> </int:chain> <stream:stdout-channel-adapter channel="outChannel" append-newline="true" /> </beans>
Some aspects of the Listing.77 from the above needs a little explanation.
The element chain defines a single logical endpoint, which under-the-hood actually delegates to a chain of endpoints. Internally, the chain creates a linear list of the specified endpoints connected by direct channels. Under-the-hood it uses the Spring Framework class org.springframework.integration.handler.MessageHandlerChain to implement the chain
The stream stdout-channel-adapter element allows one to stream the message payload to the standard output (stdout).
Under-the-hood, Spring Integration uses org.springframework.integration.stream.CharacterStreamWritingMessageHandler to stream the message payload.
For application POJOs in the message payload, ensure that the toString() method is implemented
The following is our main application to test the hypothetical car service:
/* * Name: CarServiceMainXml * Author: Bhaskar S * Date: 05/22/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p6; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; public class CarServiceMainXml { public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext(args[0]); CarServiceGateway gateway = (CarServiceGateway) context.getBean("gateway"); gateway.serviceCar(new Car("Toyota", 30000)); gateway.serviceCar(new Car("Honda", 5000)); } }
To execute the code from Listing.78 with the argument from Listing.77, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p6.CarServiceMainXml" -Dexec.args="p6/CarService.xml"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- 2021-05-22 14:01:38:134 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.xml.ChainParser - It is useful to provide an explicit 'id' attribute on 'chain' elements to simplify the identification of child elements in logs etc. 2021-05-22 14:01:38:211 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-05-22 14:01:38:214 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-05-22 14:01:38:222 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2021-05-22 14:01:38:411 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class) WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-05-22 14:01:38:623 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.MessageHandlerChain - started bean 'org.springframework.integration.handler.MessageHandlerChain#0'; defined in: 'class path resource [p6/CarService.xml]'; from source: ''int:chain'' 2021-05-22 14:01:38:624 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {chain} as a subscriber to the 'inChannel' channel 2021-05-22 14:01:38:625 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@346f8b44.inChannel' has 1 subscriber(s). 2021-05-22 14:01:38:626 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' 2021-05-22 14:01:38:626 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {stream:outbound-channel-adapter(character)} as a subscriber to the 'outChannel' channel 2021-05-22 14:01:38:627 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@346f8b44.outChannel' has 1 subscriber(s). 2021-05-22 14:01:38:628 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [p6/CarService.xml]'; from source: ''stream:stdout-channel-adapter'' 2021-05-22 14:01:38:628 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-05-22 14:01:38:629 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@346f8b44.errorChannel' has 1 subscriber(s). 2021-05-22 14:01:38:630 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-05-22 14:01:38:631 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#serviceCar(Car)' 2021-05-22 14:01:38:631 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-05-22 14:01:38:643 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:01:38:654 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.RefNoHandler - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=25ea080f-0449-8eb3-5a08-e42712779f76, timestamp=1621706498643}] (in Xml) 2021-05-22 14:01:38:656 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:01:38:657 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.CarServiceHandler - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, id=25ea080f-0449-8eb3-5a08-e42712779f76, timestamp=1621706498643}] (in Xml) Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}) 2021-05-22 14:01:38:660 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.RefNoHandler - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=9b7c4304-639a-4189-2596-2152694af8e2, timestamp=1621706498659}] (in Xml) 2021-05-22 14:01:38:661 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.CarServiceHandler - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, id=9b7c4304-639a-4189-2596-2152694af8e2, timestamp=1621706498659}] (in Xml) Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}) [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.429 s [INFO] Finished at: 2021-05-22T14:01:38-04:00 [INFO] ------------------------------------------------------------------------
Java Config based Approach |
The following is the Java Config based POJO handler that assigns a reference number to an arriving Car:
/* * Name: RefNoHandler2 * Author: Bhaskar S * Date: 05/22/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p6; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; import org.springframework.messaging.Message; @Configuration @EnableIntegration public class RefNoHandler2 { private static final Logger LOGGER = LoggerFactory.getLogger(RefNoHandler2.class); private static int nextNo = 1; private static String REF_NO_FMT = "%s-%05d"; @ServiceActivator public Message<Car> assignRefNo(Message<Car> input) { input.getPayload().setRef(String.format(REF_NO_FMT, input.getPayload().getMake().substring(0, 1), nextNo++)); LOGGER.info("Input: {} (in Config)", input.toString()); return input; } }
The following is the Java Config based POJO handler that assigns the recommended services for an arriving Car based on the current miles:
/* * Name: CarServiceHandler2 * Author: Bhaskar S * Date: 05/22/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p6; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.config.EnableIntegration; import org.springframework.messaging.Message; @Configuration @EnableIntegration public class CarServiceHandler2 { private static final Logger LOGGER = LoggerFactory.getLogger(CarServiceHandler2.class); private static String OIL_CHANGE = "OIL_CHANGE"; private static String AIR_FILTER = "AIR_FILTER"; private static String WHEEL_BALANCE = "WHEEL_BALANCE"; @ServiceActivator public Message<?> recommendedServices(Message<Car> input) { input.getPayload().getServices().put(OIL_CHANGE, (float) 29.95); input.getPayload().getServices().put(AIR_FILTER, (float) 19.95); if (input.getPayload().getMiles() % 15000 == 0) { input.getPayload().getServices().put(WHEEL_BALANCE, (float) 69.95); } LOGGER.info("Input: {} (in Config)", input.toString()); return input; } }
The following is the Java Config based simple interface to service a Car:
/* * Name: CarServiceGateway2 * Author: Bhaskar S * Date: 05/22/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p6; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.config.EnableIntegration; @Configuration @EnableIntegration @IntegrationComponentScan @MessagingGateway(name = "gateway", defaultRequestChannel = "inChannel") public interface CarServiceGateway2 { public void serviceCar(Car car); }
The following is the Java Config based POJO that defines the channels and the endpoints as a chain:
/* * Name: CarServiceConfig * Author: Bhaskar S * Date: 05/22/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p6; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.handler.MessageHandlerChain; import org.springframework.integration.handler.MethodInvokingMessageProcessor; import org.springframework.integration.handler.ServiceActivatingHandler; import org.springframework.integration.stream.CharacterStreamWritingMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import java.util.Arrays; @Configuration @EnableIntegration public class CarServiceConfig { private RefNoHandler2 refNoHandler2; private CarServiceHandler2 carServiceHandler2; @Autowired public void setRefNoHandler2(RefNoHandler2 refNoHandler2) { this.refNoHandler2 = refNoHandler2; } @Autowired public void setCarServiceHandler2(CarServiceHandler2 carServiceHandler2) { this.carServiceHandler2 = carServiceHandler2; } @Bean public MessageChannel inChannel() { return new DirectChannel(); } @Bean public MessageChannel outChannel() { return new DirectChannel(); } /* --- BEGIN: For RefNoHandler2 --- */ @Bean public MethodInvokingMessageProcessor<?> refNoMessageHandler2() { return new MethodInvokingMessageProcessor<>(refNoHandler2, "assignRefNo"); } @Bean public MessageHandler refNoEndpoint2() { return new ServiceActivatingHandler(refNoMessageHandler2()); } /* --- END: For RefNoHandler2 --- */ /* --- BEGIN: For CarServiceHandler2 --- */ @Bean public MethodInvokingMessageProcessor<?> carServiceMessageHandler2() { return new MethodInvokingMessageProcessor<>(carServiceHandler2, "recommendedServices"); } @Bean public MessageHandler carServiceEndpoint2() { return new ServiceActivatingHandler(carServiceMessageHandler2()); } /* --- END: For CarServiceHandler2 --- */ @Bean @ServiceActivator(inputChannel = "inChannel") public MessageHandler chainEndpoints() { MessageHandlerChain chain = new MessageHandlerChain(); chain.setHandlers(Arrays.asList(refNoEndpoint2(), carServiceEndpoint2())); chain.setOutputChannel(outChannel()); return chain; } @Bean @ServiceActivator(inputChannel = "outChannel") public MessageHandler stdoutAdapter() { CharacterStreamWritingMessageHandler handler = CharacterStreamWritingMessageHandler.stdout(); handler.setShouldAppendNewLine(true); return handler; } }
Some aspects of the Listing.82 from the above needs a little explanation.
The class org.springframework.integration.handler.MessageHandlerChain is a composite endpoint that invokes a chain of the specified endpoints in the order. This is equivalent to the chain from the XML configuration from Listing.77 above
The class org.springframework.integration.handler.MethodInvokingMessageProcessor is an implementation of a message processor that invokes a method on the specified target Object
The class org.springframework.integration.handler.ServiceActivatingHandler wraps and exposes a message processor as an endpoint
The class org.springframework.integration.stream.CharacterStreamWritingMessageHandler is an implementation of a message handler that writes the character stream to a java.io.Writer. The static method stdout() is a factory method that creates an instance of this class to write to the standard output
And finally, the following is the main application that uses the POJOs from Listing.79, Listing.80, Listing.81, and Listing.82 to test the hypothetical car service:
/* * Name: CarServiceMainConfig * Author: Bhaskar S * Date: 05/22/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p6; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; public class CarServiceMainConfig { public static void main(String[] args) { ApplicationContext context = new AnnotationConfigApplicationContext(RefNoHandler2.class, CarServiceHandler2.class, CarServiceGateway2.class, CarServiceConfig.class); CarServiceGateway2 gateway = (CarServiceGateway2) context.getBean("gateway"); gateway.serviceCar(new Car("Toyota", 30000)); gateway.serviceCar(new Car("Honda", 5000)); } }
To execute the code from Listing.83, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p6.CarServiceMainConfig"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain) WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-05-22 14:10:06:603 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-05-22 14:10:06:609 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-05-22 14:10:06:618 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2021-05-22 14:10:06:674 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-05-22 14:10:06:690 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-05-22 14:10:06:824 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' 2021-05-22 14:10:07:063 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-05-22 14:10:07:064 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.errorChannel' has 1 subscriber(s). 2021-05-22 14:10:07:065 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-05-22 14:10:07:066 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig.chainEndpoints.serviceActivator} as a subscriber to the 'inChannel' channel 2021-05-22 14:10:07:067 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.inChannel' has 1 subscriber(s). 2021-05-22 14:10:07:068 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.handler.MessageHandlerChain - started bean 'chainEndpoints'; defined in: 'com.polarsparc.si.p6.CarServiceConfig'; from source: 'org.springframework.core.type.StandardMethodMetadata@6826ec1a' 2021-05-22 14:10:07:069 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig.chainEndpoints.serviceActivator' 2021-05-22 14:10:07:070 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig.stdoutAdapter.serviceActivator} as a subscriber to the 'outChannel' channel 2021-05-22 14:10:07:071 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.outChannel' has 1 subscriber(s). 2021-05-22 14:10:07:071 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig.stdoutAdapter.serviceActivator' 2021-05-22 14:10:07:073 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#serviceCar(Car)' 2021-05-22 14:10:07:073 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-05-22 14:10:07:088 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:10:07:107 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO com.polarsparc.si.p6.RefNoHandler2 - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=0db165fe-48ac-bbcb-a94b-d4a9f631b1b2, timestamp=1621707007088}] (in Config) 2021-05-22 14:10:07:109 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:10:07:110 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO com.polarsparc.si.p6.CarServiceHandler2 - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, id=0db165fe-48ac-bbcb-a94b-d4a9f631b1b2, timestamp=1621707007088}] (in Config) Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}) 2021-05-22 14:10:07:112 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO com.polarsparc.si.p6.RefNoHandler2 - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=e99e996f-22a4-023b-52cd-7516a8774c1a, timestamp=1621707007112}] (in Config) 2021-05-22 14:10:07:113 [com.polarsparc.si.p6.CarServiceMainConfig.main()] INFO com.polarsparc.si.p6.CarServiceHandler2 - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, id=e99e996f-22a4-023b-52cd-7516a8774c1a, timestamp=1621707007112}] (in Config) Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}) [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.363 s [INFO] Finished at: 2021-05-22T14:10:07-04:00 [INFO] ------------------------------------------------------------------------
Header Enrichment
In this hypothetical car service example, we will add an additional header property to the incoming message.
XML based Approach |
The following is the XML based Spring Integration configuration that wires up the channels and the endpoints along with header enrichment (add a header property called CAR_MAKE with the value of the Car make from the message payload) as a chain:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:stream="http://www.springframework.org/schema/integration/stream" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd"> <bean id="refNoHandler" class="com.polarsparc.si.p6.RefNoHandler" /> <bean id="carServiceHandler" class="com.polarsparc.si.p6.CarServiceHandler" /> <int:channel id="inChannel" /> <int:channel id="outChannel" /> <int:gateway id="gateway" service-interface="com.polarsparc.si.p6.CarServiceGateway" default-request-channel="inChannel" /> <int:chain input-channel="inChannel" output-channel="outChannel"> <int:service-activator ref="refNoHandler" method="assignRefNo" /> <int:header-enricher> <int:header name="CAR_MAKE" expression="payload.getMake()"/> </int:header-enricher> <int:service-activator ref="carServiceHandler" method="recommendedServices" /> </int:chain> <stream:stdout-channel-adapter channel="outChannel" append-newline="true" /> </beans>
Some aspects of the Listing.84 from the above needs a little explanation.
The header-enricher element allows one to add additional headers properties to the incoming message.
To add a header property, use the header sub-element under the header-enricher element.
To add a header property with a name SOME_HEADER with a static value 12345, specify the following header sub-element:
<int:header name="SOME_HEADER" value="12345" />
To add a header property with a name SOME_HEADER2 with a dynamic value by calling a method func() on the message payload, specify the following header sub-element:
<int:header name="SOME_HEADER2" expression="payload.func()" />
To execute the code from Listing.78 with the argument from Listing.84, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p6.CarServiceMainXml" -Dexec.args="p6/CarService2.xml"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- 2021-05-22 14:11:53:956 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.xml.ChainParser - It is useful to provide an explicit 'id' attribute on 'chain' elements to simplify the identification of child elements in logs etc. 2021-05-22 14:11:54:035 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-05-22 14:11:54:038 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-05-22 14:11:54:046 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2021-05-22 14:11:54:240 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class) WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-05-22 14:11:54:453 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.MessageHandlerChain - started bean 'org.springframework.integration.handler.MessageHandlerChain#0'; defined in: 'class path resource [p6/CarService2.xml]'; from source: ''int:chain'' 2021-05-22 14:11:54:454 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {chain} as a subscriber to the 'inChannel' channel 2021-05-22 14:11:54:454 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.inChannel' has 1 subscriber(s). 2021-05-22 14:11:54:455 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' 2021-05-22 14:11:54:456 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {stream:outbound-channel-adapter(character)} as a subscriber to the 'outChannel' channel 2021-05-22 14:11:54:456 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.outChannel' has 1 subscriber(s). 2021-05-22 14:11:54:457 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [p6/CarService2.xml]'; from source: ''stream:stdout-channel-adapter'' 2021-05-22 14:11:54:457 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-05-22 14:11:54:458 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.errorChannel' has 1 subscriber(s). 2021-05-22 14:11:54:459 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-05-22 14:11:54:460 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#serviceCar(Car)' 2021-05-22 14:11:54:460 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-05-22 14:11:54:478 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:11:54:488 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.RefNoHandler - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=ab6f3ea0-bcd1-f4e1-9057-2e64507766de, timestamp=1621707114477}] (in Xml) 2021-05-22 14:11:54:490 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:11:54:492 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.CarServiceHandler - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, id=2da4a1d9-a7b3-82af-c9c6-7b5d115cad0a, CAR_MAKE=Toyota, timestamp=1621707114490}] (in Xml) Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}) 2021-05-22 14:11:54:494 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.RefNoHandler - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=273bfdf0-3290-24ff-c604-fc4733ab13d2, timestamp=1621707114494}] (in Xml) 2021-05-22 14:11:54:496 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.CarServiceHandler - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, id=80c35f67-1b02-c039-fb2b-20974bb5ae02, CAR_MAKE=Honda, timestamp=1621707114496}] (in Xml) Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}) [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.446 s [INFO] Finished at: 2021-05-22T14:11:54-04:00 [INFO] ------------------------------------------------------------------------
Java Config based Approach |
The following is the Java Config based POJO that defines the channels and the endpoints along with header enrichment (add a header property called CAR_MAKE with the value of the Car make from the message payload) as a chain:
/* * Name: CarServiceConfig2 * Author: Bhaskar S * Date: 05/22/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p6; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.expression.Expression; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.handler.MessageHandlerChain; import org.springframework.integration.handler.MethodInvokingMessageProcessor; import org.springframework.integration.handler.ServiceActivatingHandler; import org.springframework.integration.stream.CharacterStreamWritingMessageHandler; import org.springframework.integration.transformer.HeaderEnricher; import org.springframework.integration.transformer.MessageTransformingHandler; import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor; import org.springframework.integration.transformer.support.HeaderValueMessageProcessor; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @Configuration @EnableIntegration public class CarServiceConfig2 { private RefNoHandler2 refNoHandler2; private CarServiceHandler2 carServiceHandler2; @Autowired public void setRefNoHandler2(RefNoHandler2 refNoHandler2) { this.refNoHandler2 = refNoHandler2; } @Autowired public void setCarServiceHandler2(CarServiceHandler2 carServiceHandler2) { this.carServiceHandler2 = carServiceHandler2; } @Bean public MessageChannel inChannel() { return new DirectChannel(); } @Bean public MessageChannel outChannel() { return new DirectChannel(); } /* --- BEGIN: For RefNoHandler2 --- */ @Bean public MethodInvokingMessageProcessor<?> refNoMessageHandler2() { return new MethodInvokingMessageProcessor<>(refNoHandler2, "assignRefNo"); } @Bean public MessageHandler refNoEndpoint2() { return new ServiceActivatingHandler(refNoMessageHandler2()); } /* --- END: For RefNoHandler2 --- */ /* --- BEGIN: For Header Enricher --- */ @Bean public HeaderEnricher carMakeHeader() { Map<String, HeaderValueMessageProcessor<?>> headers = new HashMap<>(); Expression expression = new SpelExpressionParser().parseExpression("payload.getMake()"); headers.put("CAR_MAKE", new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class)); return new HeaderEnricher(headers); } @Bean public MessageHandler carMakeTransformer() { return new MessageTransformingHandler(carMakeHeader()); } /* --- END: For Header Enricher --- */ /* --- BEGIN: For CarServiceHandler2 --- */ @Bean public MethodInvokingMessageProcessor<?> carServiceMessageHandler2() { return new MethodInvokingMessageProcessor<>(carServiceHandler2, "recommendedServices"); } @Bean public MessageHandler carServiceEndpoint2() { return new ServiceActivatingHandler(carServiceMessageHandler2()); } /* --- END: For CarServiceHandler2 --- */ @Bean @ServiceActivator(inputChannel = "inChannel") public MessageHandler chainEndpoints() { MessageHandlerChain chain = new MessageHandlerChain(); chain.setHandlers(Arrays.asList(refNoEndpoint2(), carMakeTransformer(), carServiceEndpoint2())); chain.setOutputChannel(outChannel()); return chain; } @Bean @ServiceActivator(inputChannel = "outChannel") public MessageHandler stdoutAdapter() { CharacterStreamWritingMessageHandler handler = CharacterStreamWritingMessageHandler.stdout(); handler.setShouldAppendNewLine(true); return handler; } }
Some aspects of the Listing.85 from the above needs a little explanation.
The class org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor allows one to create a header value processor for the given expression and the expected type of the expression evaluation result
The class org.springframework.integration.transformer.HeaderEnricher implements a transformer that adds statically configured header values to the message header. This is equivalent to the header-enricher from the XML configuration from Listing.84 above
And finally, the following is the main application that uses the POJOs from Listing.79, Listing.80, Listing.81, and Listing.85 to test the hypothetical car service:
/* * Name: CarServiceMainConfig2 * Author: Bhaskar S * Date: 05/22/2021 * Blog: https://www.polarsparc.com */ package com.polarsparc.si.p6; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.AnnotationConfigApplicationContext; public class CarServiceMainConfig2 { public static void main(String[] args) { ApplicationContext context = new AnnotationConfigApplicationContext(RefNoHandler2.class, CarServiceHandler2.class, CarServiceGateway2.class, CarServiceConfig2.class); CarServiceGateway2 gateway = (CarServiceGateway2) context.getBean("gateway"); gateway.serviceCar(new Car("Toyota", 30000)); gateway.serviceCar(new Car("Honda", 5000)); } }
To execute the code from Listing.86, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p6.CarServiceMainConfig2"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain) WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-05-22 14:14:46:222 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-05-22 14:14:46:228 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-05-22 14:14:46:235 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2021-05-22 14:14:46:285 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-05-22 14:14:46:302 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-05-22 14:14:46:437 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' 2021-05-22 14:14:46:689 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-05-22 14:14:46:690 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.errorChannel' has 1 subscriber(s). 2021-05-22 14:14:46:691 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-05-22 14:14:46:691 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig2.chainEndpoints.serviceActivator} as a subscriber to the 'inChannel' channel 2021-05-22 14:14:46:692 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.inChannel' has 1 subscriber(s). 2021-05-22 14:14:46:692 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.handler.MessageHandlerChain - started bean 'chainEndpoints'; defined in: 'com.polarsparc.si.p6.CarServiceConfig2'; from source: 'org.springframework.core.type.StandardMethodMetadata@4d9800b2' 2021-05-22 14:14:46:693 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig2.chainEndpoints.serviceActivator' 2021-05-22 14:14:46:694 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig2.stdoutAdapter.serviceActivator} as a subscriber to the 'outChannel' channel 2021-05-22 14:14:46:694 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.outChannel' has 1 subscriber(s). 2021-05-22 14:14:46:695 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig2.stdoutAdapter.serviceActivator' 2021-05-22 14:14:46:696 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#serviceCar(Car)' 2021-05-22 14:14:46:697 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-05-22 14:14:46:709 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:14:46:729 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO com.polarsparc.si.p6.RefNoHandler2 - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=abf81eba-d821-6df1-29d2-0024f0f92e94, timestamp=1621707286709}] (in Config) 2021-05-22 14:14:46:732 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:14:46:733 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO com.polarsparc.si.p6.CarServiceHandler2 - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, id=4660e666-25c1-abb7-b134-be652b126bd3, CAR_MAKE=Toyota, timestamp=1621707286732}] (in Config) Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}) 2021-05-22 14:14:46:734 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO com.polarsparc.si.p6.RefNoHandler2 - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=ec7fefc1-0e62-2d3d-467a-16e3dc843dcf, timestamp=1621707286734}] (in Config) 2021-05-22 14:14:46:736 [com.polarsparc.si.p6.CarServiceMainConfig2.main()] INFO com.polarsparc.si.p6.CarServiceHandler2 - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, id=39f97081-c90a-2bf6-807f-29f9527a593c, CAR_MAKE=Honda, timestamp=1621707286735}] (in Config) Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}) [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.427 s [INFO] Finished at: 2021-05-22T14:14:46-04:00 [INFO] ------------------------------------------------------------------------
References
Spring Integration Notes :: Part - 5
Spring Integration Notes :: Part - 4
Spring Integration Notes :: Part - 3
Spring Integration Notes :: Part - 2
Spring Integration Notes :: Part - 1