PolarSPARC
Spring Integration Notes :: Part - 7
Bhaskar S | 05/22/2021 (UPDATED)
Overview
In Part-6, we covered basic examples of Spring Integration relating to chaining endpoints and header enrichment.
We will continue our journey by exploring some basic examples relating to the following concepts in Spring Integration:
Message Transformation
Message Router
Setup
To set up the Java directory structure for the demonstrations in this part, execute the following commands:
$ cd $HOME/java/SpringIntegration
$ mkdir -p src/main/java/com/polarsparc/si/p7
$ mkdir -p src/main/resources/p7
The following is the listing for the updated Maven project file pom.xml, which adds the dependency for JSON support:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.polarsparc.si</groupId>
  <artifactId>SpringIntegration</artifactId>
  <version>1.0</version>
  <name>SpringIntegration</name>
  <description>Spring Integration Examples</description>

  <properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
  </properties>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.1</version>
          <configuration>
            <fork>true</fork>
            <meminitial>128m</meminitial>
            <maxmem>512m</maxmem>
            <source>11</source>
            <target>11</target>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <dependencies>
    <dependency>
      <groupId>javax.annotation</groupId>
      <artifactId>javax.annotation-api</artifactId>
      <version>1.3.2</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>5.3.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>5.3.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-beans</artifactId>
      <version>5.3.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
      <version>5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-file</artifactId>
      <version>5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-sftp</artifactId>
      <version>5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-jdbc</artifactId>
      <version>5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-http</artifactId>
      <version>5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-amqp</artifactId>
      <version>5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-kafka</artifactId>
      <version>5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-stream</artifactId>
      <version>5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>2.3.6</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.30</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>log4j-over-slf4j</artifactId>
      <version>1.7.30</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.30</version>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>42.2.19</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.20</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.12.3</version>
    </dependency>
  </dependencies>
</project>
Hands-on Spring Integration
Message Transformation
In this hypothetical car service example, we will transform the output message to JSON format.
XML based Approach
The following is the XML based Spring Integration configuration that wires up the channels and the endpoints along with header enrichment and Car object transformation to JSON as a chain:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:stream="http://www.springframework.org/schema/integration/stream"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/integration
                           http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/integration/stream
                           http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

  <bean id="refNoHandler" class="com.polarsparc.si.p6.RefNoHandler" />
  <bean id="carServiceHandler" class="com.polarsparc.si.p6.CarServiceHandler" />

  <int:channel id="inChannel" />
  <int:channel id="outChannel" />

  <int:gateway id="gateway"
               service-interface="com.polarsparc.si.p6.CarServiceGateway"
               default-request-channel="inChannel" />

  <int:chain input-channel="inChannel" output-channel="outChannel">
    <int:service-activator ref="refNoHandler" method="assignRefNo" />
    <int:header-enricher>
      <int:header name="CAR_MAKE" expression="payload.getMake()"/>
    </int:header-enricher>
    <int:service-activator ref="carServiceHandler" method="recommendedServices" />
    <int:object-to-json-transformer />
  </int:chain>

  <stream:stdout-channel-adapter channel="outChannel" append-newline="true" />
</beans>
Some aspects of Listing.87 above need a little explanation.
The object-to-json-transformer element transforms the incoming message payload into JSON format.
In addition, Spring Integration provides the following ready-to-use transformers:
object-to-string-transformer :: transforms the incoming message payload to a string format
object-to-map-transformer :: transforms the incoming message payload to a java.util.Map format
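To make the idea of a payload transformer concrete, here is a minimal, dependency-free sketch in plain Java. It is not Spring Integration code: the Msg class, toJson, transform, and sampleCar are hypothetical names invented for this illustration, and the hand-rolled JSON rendering (no escaping) exists only to keep the example self-contained. In the real flow, the object-to-json-transformer element relies on the Jackson library added to pom.xml, and it may also add headers of its own, which this sketch ignores. The point it shows is that a transformer replaces the payload while the existing headers travel along with the message.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Plain-Java model of a payload transformer; NOT Spring Integration code.
final class JsonTransformSketch {

    // Hypothetical stand-in for a message: a payload plus its headers.
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // Renders nested maps, numbers and strings as JSON text.
    // Illustration only: there is no escaping of special characters.
    static String toJson(Object value) {
        if (value instanceof Map) {
            StringJoiner json = new StringJoiner(",", "{", "}");
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                json.add("\"" + entry.getKey() + "\":" + toJson(entry.getValue()));
            }
            return json.toString();
        }
        if (value instanceof Number) {
            return value.toString();
        }
        return "\"" + value + "\"";
    }

    // The transformer step: a new payload, the same headers.
    static Msg transform(Msg in) {
        return new Msg(toJson(in.payload), in.headers);
    }

    // Car-like sample data modeled as a map (field names taken from the sample output).
    static Map<String, Object> sampleCar() {
        Map<String, Object> services = new LinkedHashMap<>();
        services.put("AIR_FILTER", 19.95);
        services.put("OIL_CHANGE", 29.95);
        Map<String, Object> car = new LinkedHashMap<>();
        car.put("ref", "H-00002");
        car.put("make", "Honda");
        car.put("miles", 5000);
        car.put("services", services);
        return car;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("CAR_MAKE", "Honda");
        Msg out = transform(new Msg(sampleCar(), headers));
        System.out.println(out.payload);  // the JSON text
        System.out.println(out.headers);  // headers are unchanged
    }
}
```

For the sample data, the printed payload has the same shape as the JSON line for the Honda in the typical output of this part.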
To execute the code from Listing.78 with the argument from Listing.87, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p6.CarServiceMainXml" -Dexec.args="p7/CarService3.xml"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- 2021-05-22 14:20:19:884 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.xml.ChainParser - It is useful to provide an explicit 'id' attribute on 'chain' elements to simplify the identification of child elements in logs etc. 2021-05-22 14:20:19:971 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-05-22 14:20:19:973 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-05-22 14:20:19:980 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 
2021-05-22 14:20:20:152 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class) WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-05-22 14:20:20:368 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.MessageHandlerChain - started bean 'org.springframework.integration.handler.MessageHandlerChain#0'; defined in: 'class path resource [p7/CarService3.xml]'; from source: ''int:chain'' 2021-05-22 14:20:20:369 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {chain} as a subscriber to the 'inChannel' channel 2021-05-22 14:20:20:369 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@346f8b44.inChannel' has 1 subscriber(s). 
2021-05-22 14:20:20:370 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' 2021-05-22 14:20:20:371 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {stream:outbound-channel-adapter(character)} as a subscriber to the 'outChannel' channel 2021-05-22 14:20:20:371 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@346f8b44.outChannel' has 1 subscriber(s). 2021-05-22 14:20:20:372 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1'; defined in: 'class path resource [p7/CarService3.xml]'; from source: ''stream:stdout-channel-adapter'' 2021-05-22 14:20:20:373 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-05-22 14:20:20:374 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@346f8b44.errorChannel' has 1 subscriber(s). 
2021-05-22 14:20:20:374 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-05-22 14:20:20:375 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#serviceCar(Car)' 2021-05-22 14:20:20:376 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-05-22 14:20:20:387 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:20:20:407 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.RefNoHandler - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=275750ea-8253-182e-785d-ac0c5935fcc9, timestamp=1621707620387}] (in Xml) 2021-05-22 14:20:20:410 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 
2021-05-22 14:20:20:412 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.CarServiceHandler - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, id=50ea1561-35be-ed97-376d-2c4e3faa5b1d, CAR_MAKE=Toyota, timestamp=1621707620410}] (in Xml) {"ref":"T-00001","make":"Toyota","miles":30000,"services":{"AIR_FILTER":19.95,"OIL_CHANGE":29.95,"WHEEL_BALANCE":69.95}} 2021-05-22 14:20:20:446 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.RefNoHandler - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=bff5dbf9-8de3-19b0-a0d8-653c71d90a96, timestamp=1621707620446}] (in Xml) 2021-05-22 14:20:20:447 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.CarServiceHandler - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, id=9f426f34-579d-d755-ab62-3523a6b673a6, CAR_MAKE=Honda, timestamp=1621707620447}] (in Xml) {"ref":"H-00002","make":"Honda","miles":5000,"services":{"AIR_FILTER":19.95,"OIL_CHANGE":29.95}} [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.463 s [INFO] Finished at: 2021-05-22T14:20:20-04:00 [INFO] ------------------------------------------------------------------------
Java Config based Approach
The following is the Java Config based POJO that defines the channels and the endpoints along with header enrichment and Car object transformation to JSON as a chain:
/*
 * Name:   CarServiceConfig3
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p7;

// The handler POJOs live in the Part-6 package and must be imported here
import com.polarsparc.si.p6.CarServiceHandler2;
import com.polarsparc.si.p6.RefNoHandler2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.handler.MessageHandlerChain;
import org.springframework.integration.handler.MethodInvokingMessageProcessor;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.json.ObjectToJsonTransformer;
import org.springframework.integration.stream.CharacterStreamWritingMessageHandler;
import org.springframework.integration.transformer.HeaderEnricher;
import org.springframework.integration.transformer.MessageTransformingHandler;
import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableIntegration
public class CarServiceConfig3 {
    private RefNoHandler2 refNoHandler2;
    private CarServiceHandler2 carServiceHandler2;

    @Autowired
    public void setRefNoHandler2(RefNoHandler2 refNoHandler2) {
        this.refNoHandler2 = refNoHandler2;
    }

    @Autowired
    public void setCarServiceHandler2(CarServiceHandler2 carServiceHandler2) {
        this.carServiceHandler2 = carServiceHandler2;
    }

    @Bean
    public MessageChannel inChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel outChannel() {
        return new DirectChannel();
    }

    /* --- BEGIN: For RefNoHandler2 --- */

    @Bean
    public MethodInvokingMessageProcessor<?> refNoMessageHandler2() {
        return new MethodInvokingMessageProcessor<>(refNoHandler2, "assignRefNo");
    }

    @Bean
    public MessageHandler refNoEndpoint2() {
        return new ServiceActivatingHandler(refNoMessageHandler2());
    }

    /* --- END: For RefNoHandler2 --- */

    /* --- BEGIN: For Header Enricher --- */

    @Bean
    public HeaderEnricher carMakeHeader() {
        Map<String, HeaderValueMessageProcessor<?>> headers = new HashMap<>();
        Expression expression = new SpelExpressionParser().parseExpression("payload.getMake()");
        headers.put("CAR_MAKE", new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));
        return new HeaderEnricher(headers);
    }

    @Bean
    public MessageHandler carMakeTransformer() {
        return new MessageTransformingHandler(carMakeHeader());
    }

    /* --- END: For Header Enricher --- */

    /* --- BEGIN: For Object to JSON --- */

    @Bean
    public ObjectToJsonTransformer carJsonMapper() {
        return new ObjectToJsonTransformer();
    }

    @Bean
    public MessageHandler carJsonTransformer() {
        return new MessageTransformingHandler(carJsonMapper());
    }

    /* --- END: For Object to JSON --- */

    /* --- BEGIN: For CarServiceHandler2 --- */

    @Bean
    public MethodInvokingMessageProcessor<?> carServiceMessageHandler2() {
        return new MethodInvokingMessageProcessor<>(carServiceHandler2, "recommendedServices");
    }

    @Bean
    public MessageHandler carServiceEndpoint2() {
        return new ServiceActivatingHandler(carServiceMessageHandler2());
    }

    /* --- END: For CarServiceHandler2 --- */

    // Chain order: assign ref no, enrich header, recommend services, convert to JSON
    @Bean
    @ServiceActivator(inputChannel = "inChannel")
    public MessageHandler chainEndpoints() {
        MessageHandlerChain chain = new MessageHandlerChain();
        chain.setHandlers(Arrays.asList(refNoEndpoint2(),
                carMakeTransformer(),
                carServiceEndpoint2(),
                carJsonTransformer()));
        chain.setOutputChannel(outChannel());
        return chain;
    }

    @Bean
    @ServiceActivator(inputChannel = "outChannel")
    public MessageHandler stdoutAdapter() {
        CharacterStreamWritingMessageHandler handler = CharacterStreamWritingMessageHandler.stdout();
        handler.setShouldAppendNewLine(true);
        return handler;
    }
}
And finally, the following is the main application that uses the POJOs from Listing.79, Listing.80, Listing.81, and Listing.88 to test the hypothetical car service object transformation:
/*
 * Name:   CarServiceMainConfig3
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p7;

import com.polarsparc.si.p6.Car;
import com.polarsparc.si.p6.CarServiceGateway2;
import com.polarsparc.si.p6.CarServiceHandler2;
import com.polarsparc.si.p6.RefNoHandler2;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class CarServiceMainConfig3 {
    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(RefNoHandler2.class,
                CarServiceHandler2.class,
                CarServiceGateway2.class,
                CarServiceConfig3.class);

        CarServiceGateway2 gateway = (CarServiceGateway2) context.getBean("gateway");

        gateway.serviceCar(new Car("Toyota", 30000));
        gateway.serviceCar(new Car("Honda", 5000));
    }
}
To execute the code from Listing.89, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p7.CarServiceMainConfig3"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain) WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-05-22 14:23:03:584 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-05-22 14:23:03:590 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-05-22 14:23:03:598 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 
2021-05-22 14:23:03:645 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-05-22 14:23:03:676 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-05-22 14:23:03:805 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' 2021-05-22 14:23:04:060 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-05-22 14:23:04:061 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.errorChannel' has 1 subscriber(s). 
2021-05-22 14:23:04:062 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-05-22 14:23:04:062 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig3.chainEndpoints.serviceActivator} as a subscriber to the 'inChannel' channel 2021-05-22 14:23:04:063 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.inChannel' has 1 subscriber(s). 2021-05-22 14:23:04:064 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.handler.MessageHandlerChain - started bean 'chainEndpoints'; defined in: 'com.polarsparc.si.p7.CarServiceConfig3'; from source: 'org.springframework.core.type.StandardMethodMetadata@35bffb63' 2021-05-22 14:23:04:064 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig3.chainEndpoints.serviceActivator' 2021-05-22 14:23:04:065 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig3.stdoutAdapter.serviceActivator} as a subscriber to the 'outChannel' channel 2021-05-22 14:23:04:066 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.outChannel' has 1 subscriber(s). 
2021-05-22 14:23:04:067 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig3.stdoutAdapter.serviceActivator' 2021-05-22 14:23:04:068 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#serviceCar(Car)' 2021-05-22 14:23:04:069 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-05-22 14:23:04:081 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:23:04:099 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO com.polarsparc.si.p6.RefNoHandler2 - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=4cfbda3e-112b-ec1d-cde3-cfc1ef790fdf, timestamp=1621707784081}] (in Config) 2021-05-22 14:23:04:102 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 
2021-05-22 14:23:04:103 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO com.polarsparc.si.p6.CarServiceHandler2 - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, id=cc882337-3ddd-bc10-6b99-9f1b7d907051, CAR_MAKE=Toyota, timestamp=1621707784102}] (in Config) {"ref":"T-00001","make":"Toyota","miles":30000,"services":{"AIR_FILTER":19.95,"OIL_CHANGE":29.95,"WHEEL_BALANCE":69.95}} 2021-05-22 14:23:04:142 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO com.polarsparc.si.p6.RefNoHandler2 - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=67442bb3-5394-0458-4651-1df8727372f0, timestamp=1621707784141}] (in Config) 2021-05-22 14:23:04:143 [com.polarsparc.si.p7.CarServiceMainConfig3.main()] INFO com.polarsparc.si.p6.CarServiceHandler2 - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, id=a5ee27e3-c123-056d-18a0-43f98330c1b4, CAR_MAKE=Honda, timestamp=1621707784143}] (in Config) {"ref":"H-00002","make":"Honda","miles":5000,"services":{"AIR_FILTER":19.95,"OIL_CHANGE":29.95}} [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.450 s [INFO] Finished at: 2021-05-22T14:23:04-04:00 [INFO] ------------------------------------------------------------------------
Message Router
In this example, we want to route the incoming Cars to the appropriate specialist based on the make of the Car.
For example, a Toyota Car will be served by a Toyota specialist and a Honda Car will be served by a Honda specialist.
Each Car specialist will include the appropriate labor charges.
XML based Approach
The following POJO handles Cars whose make is Toyota:
/*
 * Name:   ToyotaSpecialist
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p7;

// Car is defined in the Part-6 package and must be imported here
import com.polarsparc.si.p6.Car;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

public class ToyotaSpecialist {
    private static final Logger LOGGER = LoggerFactory.getLogger(ToyotaSpecialist.class);

    private static final String WHEEL_BALANCE = "WHEEL_BALANCE";
    private static final String LABOR_CHARGES = "LABOR_CHARGES";

    // Adds the Toyota labor charges; higher when a wheel balance is included
    public Message<?> completeService(Message<Car> input) {
        if (input.getPayload().getServices().containsKey(WHEEL_BALANCE)) {
            input.getPayload().getServices().put(LABOR_CHARGES, 99.95f);
        } else {
            input.getPayload().getServices().put(LABOR_CHARGES, 39.95f);
        }

        LOGGER.info("Input: {}", input.toString());

        return input;
    }
}
The following POJO handles Cars whose make is Honda:
/*
 * Name:   HondaSpecialist
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p7;

// Car is defined in the Part-6 package and must be imported here
import com.polarsparc.si.p6.Car;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

public class HondaSpecialist {
    private static final Logger LOGGER = LoggerFactory.getLogger(HondaSpecialist.class);

    private static final String WHEEL_BALANCE = "WHEEL_BALANCE";
    private static final String LABOR_CHARGES = "LABOR_CHARGES";

    // Adds the Honda labor charges; higher when a wheel balance is included
    public Message<?> completeService(Message<Car> input) {
        if (input.getPayload().getServices().containsKey(WHEEL_BALANCE)) {
            input.getPayload().getServices().put(LABOR_CHARGES, 89.95f);
        } else {
            input.getPayload().getServices().put(LABOR_CHARGES, 29.95f);
        }

        LOGGER.info("Input: {}", input.toString());

        return input;
    }
}
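The two specialists differ only in the labor charge they add. As a quick way to check the expected amounts, the rule can be restated as a single dependency-free function. This is a sketch for illustration only: the class LaborChargeSketch and the method laborCharge are hypothetical names and are not part of the listings in this series.

```java
// Plain-Java restatement of the labor-charge rule used by the two specialists.
// The class and method names are hypothetical, chosen for this illustration.
final class LaborChargeSketch {

    // Returns the labor charge for a make, depending on whether
    // WHEEL_BALANCE is among the recommended services.
    static float laborCharge(String make, boolean hasWheelBalance) {
        switch (make) {
            case "Toyota":
                return hasWheelBalance ? 99.95f : 39.95f;
            case "Honda":
                return hasWheelBalance ? 89.95f : 29.95f;
            default:
                throw new IllegalArgumentException("no specialist for make: " + make);
        }
    }

    public static void main(String[] args) {
        // The Toyota at 30000 miles has WHEEL_BALANCE among its services
        System.out.println(laborCharge("Toyota", true));
        // The Honda at 5000 miles does not
        System.out.println(laborCharge("Honda", false));
    }
}
```

For the two cars used throughout this part, the rule gives labor charges of 99.95 for the Toyota and 29.95 for the Honda.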
The following is the XML based Spring Integration configuration that wires up the channels, the endpoints along with header enrichment in a chain, the header based router, and Car object transformation to JSON:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:stream="http://www.springframework.org/schema/integration/stream"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/integration
                           http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/integration/stream
                           http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

  <bean id="refNoHandler" class="com.polarsparc.si.p6.RefNoHandler" />
  <bean id="carServiceHandler" class="com.polarsparc.si.p6.CarServiceHandler" />
  <bean id="toyotaSpecialist" class="com.polarsparc.si.p7.ToyotaSpecialist" />
  <bean id="hondaSpecialist" class="com.polarsparc.si.p7.HondaSpecialist" />

  <int:channel id="inChannel" />
  <int:channel id="routerChannel" />
  <int:channel id="toyotaChannel" />
  <int:channel id="hondaChannel" />
  <int:channel id="xformChannel" />
  <int:channel id="outChannel" />

  <int:gateway id="gateway"
               service-interface="com.polarsparc.si.p6.CarServiceGateway"
               default-request-channel="inChannel" />

  <int:chain input-channel="inChannel" output-channel="routerChannel">
    <int:service-activator ref="refNoHandler" method="assignRefNo" />
    <int:header-enricher>
      <int:header name="CAR_MAKE" expression="payload.getMake()"/>
    </int:header-enricher>
    <int:service-activator ref="carServiceHandler" method="recommendedServices" />
  </int:chain>

  <int:header-value-router input-channel="routerChannel" header-name="CAR_MAKE">
    <int:mapping value="Toyota" channel="toyotaChannel" />
    <int:mapping value="Honda" channel="hondaChannel" />
  </int:header-value-router>

  <int:service-activator input-channel="toyotaChannel" output-channel="xformChannel"
                         ref="toyotaSpecialist" method="completeService" />

  <int:service-activator input-channel="hondaChannel" output-channel="xformChannel"
                         ref="hondaSpecialist" method="completeService" />

  <int:object-to-json-transformer input-channel="xformChannel" output-channel="outChannel" />

  <stream:stdout-channel-adapter channel="outChannel" append-newline="true" />
</beans>
Some aspects of Listing.92 above need a little explanation.
The header-value-router element allows one to route the incoming message based on a header property to the appropriate channel.
In our example, we use the value of the header property CAR_MAKE to route the message.
If the value of CAR_MAKE is Toyota, the message is routed to the channel toyotaChannel.
If the value of CAR_MAKE is Honda, the message is routed to the channel hondaChannel.
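The routing rule described above amounts to a lookup from a header value to a channel name. The following is a minimal plain-Java sketch of that idea only; it does not use or reproduce Spring Integration's HeaderValueRouter, and the class name HeaderRoutingSketch and the method resolveChannel are hypothetical names chosen for illustration. How the real router treats a missing header or an unmapped value depends on its configuration, which this sketch does not model; it simply returns an empty result in those cases.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration class; not part of Spring Integration
public class HeaderRoutingSketch {
    // Header value -> channel name, mirroring the two mapping entries in the XML
    private static final Map<String, String> MAPPINGS = new LinkedHashMap<>();

    static {
        MAPPINGS.put("Toyota", "toyotaChannel");
        MAPPINGS.put("Honda", "hondaChannel");
    }

    // Looks up the CAR_MAKE header and returns the mapped channel name.
    // Returns an empty Optional when the header is absent or has no mapping
    // (an assumption of this sketch, not the behavior of the real router).
    public static Optional<String> resolveChannel(Map<String, ?> headers) {
        Object make = headers.get("CAR_MAKE");
        if (make == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(MAPPINGS.get(make.toString()));
    }

    public static void main(String[] args) {
        System.out.println(resolveChannel(Map.of("CAR_MAKE", "Toyota")));
        System.out.println(resolveChannel(Map.of("CAR_MAKE", "Honda")));
        System.out.println(resolveChannel(Map.of("CAR_MAKE", "Ford")));
    }
}
```

Keeping the mappings in a table rather than in if/else branches is what makes adding another make a one-line change, which is the same convenience the mapping elements offer in the XML configuration.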
To execute the code from Listing.78 with the argument from Listing.92, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p6.CarServiceMainXml" -Dexec.args="p7/CarService4.xml"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- 2021-05-22 14:25:34:141 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.xml.ChainParser - It is useful to provide an explicit 'id' attribute on 'chain' elements to simplify the identification of child elements in logs etc. 2021-05-22 14:25:34:234 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-05-22 14:25:34:238 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-05-22 14:25:34:248 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 
2021-05-22 14:25:34:452 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class) WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-05-22 14:25:34:697 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.MessageHandlerChain - started bean 'org.springframework.integration.handler.MessageHandlerChain#0'; defined in: 'class path resource [p7/CarService4.xml]'; from source: ''int:chain'' 2021-05-22 14:25:34:698 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {chain} as a subscriber to the 'inChannel' channel 2021-05-22 14:25:34:699 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@346f8b44.inChannel' has 1 subscriber(s). 
2021-05-22 14:25:34:699 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' 2021-05-22 14:25:34:700 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {router} as a subscriber to the 'routerChannel' channel 2021-05-22 14:25:34:700 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@346f8b44.routerChannel' has 1 subscriber(s). 2021-05-22 14:25:34:701 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1' 2021-05-22 14:25:34:702 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'toyotaChannel' channel 2021-05-22 14:25:34:702 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@346f8b44.toyotaChannel' has 1 subscriber(s). 
2021-05-22 14:25:34:703 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2' 2021-05-22 14:25:34:703 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'hondaChannel' channel 2021-05-22 14:25:34:704 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@346f8b44.hondaChannel' has 1 subscriber(s). 2021-05-22 14:25:34:704 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3' 2021-05-22 14:25:34:705 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {object-to-json-transformer} as a subscriber to the 'xformChannel' channel 2021-05-22 14:25:34:706 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@346f8b44.xformChannel' has 1 subscriber(s). 
2021-05-22 14:25:34:707 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4' 2021-05-22 14:25:34:707 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {stream:outbound-channel-adapter(character)} as a subscriber to the 'outChannel' channel 2021-05-22 14:25:34:708 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@346f8b44.outChannel' has 1 subscriber(s). 2021-05-22 14:25:34:708 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5'; defined in: 'class path resource [p7/CarService4.xml]'; from source: ''stream:stdout-channel-adapter'' 2021-05-22 14:25:34:709 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-05-22 14:25:34:710 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@346f8b44.errorChannel' has 1 subscriber(s). 
2021-05-22 14:25:34:711 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-05-22 14:25:34:712 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#serviceCar(Car)' 2021-05-22 14:25:34:712 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-05-22 14:25:34:725 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:25:34:736 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.RefNoHandler - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=dd70f6b0-4283-c270-7e12-840b2fccb8f3, timestamp=1621707934724}] (in Xml) 2021-05-22 14:25:34:739 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:25:34:740 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.CarServiceHandler - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, id=fcbfc102-5e46-611e-5dd4-72aa3cea0093, CAR_MAKE=Toyota, timestamp=1621707934739}] (in Xml) 2021-05-22 14:25:34:741 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 
2021-05-22 14:25:34:742 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p7.ToyotaSpecialist - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, LABOR_CHARGES=99.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, id=fcbfc102-5e46-611e-5dd4-72aa3cea0093, CAR_MAKE=Toyota, timestamp=1621707934739}] (in Xml) {"ref":"T-00001","make":"Toyota","miles":30000,"services":{"AIR_FILTER":19.95,"LABOR_CHARGES":99.95,"OIL_CHANGE":29.95,"WHEEL_BALANCE":69.95}} 2021-05-22 14:25:34:788 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.RefNoHandler - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=55d43dcd-744c-81a7-b33d-a0b7148d83eb, timestamp=1621707934787}] (in Xml) 2021-05-22 14:25:34:789 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.CarServiceHandler - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, id=b71af17f-12bb-8d99-ce72-38e21063c233, CAR_MAKE=Honda, timestamp=1621707934789}] (in Xml) 2021-05-22 14:25:34:791 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 
2021-05-22 14:25:34:792 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p7.HondaSpecialist - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, LABOR_CHARGES=29.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, id=b71af17f-12bb-8d99-ce72-38e21063c233, CAR_MAKE=Honda, timestamp=1621707934789}] (in Xml) {"ref":"H-00002","make":"Honda","miles":5000,"services":{"AIR_FILTER":19.95,"LABOR_CHARGES":29.95,"OIL_CHANGE":29.95}} [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.551 s [INFO] Finished at: 2021-05-22T14:25:34-04:00 [INFO] ------------------------------------------------------------------------
Java Config based Approach
The following Java Config based POJO handles Cars whose make is Toyota:
/*
 * Name:   ToyotaSpecialist2
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p7;

import com.polarsparc.si.p6.Car;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;

@Configuration
@EnableIntegration
public class ToyotaSpecialist2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(ToyotaSpecialist2.class);

    private static final String WHEEL_BALANCE = "WHEEL_BALANCE";
    private static final String LABOR_CHARGES = "LABOR_CHARGES";

    // Adds the labor charges: higher when a wheel balance is among the services
    @ServiceActivator(inputChannel = "toyotaChannel", outputChannel = "xformChannel")
    public Message<?> completeService(Message<Car> input) {
        if (input.getPayload().getServices().containsKey(WHEEL_BALANCE)) {
            input.getPayload().getServices().put(LABOR_CHARGES, (float) 99.95);
        } else {
            input.getPayload().getServices().put(LABOR_CHARGES, (float) 39.95);
        }

        LOGGER.info("Input: {}", input.toString());

        return input;
    }
}
The following Java Config based POJO handles Cars whose make is Honda:
/*
 * Name:   HondaSpecialist2
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p7;

import com.polarsparc.si.p6.Car;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;

@Configuration
@EnableIntegration
public class HondaSpecialist2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(HondaSpecialist2.class);

    private static final String WHEEL_BALANCE = "WHEEL_BALANCE";
    private static final String LABOR_CHARGES = "LABOR_CHARGES";

    // Adds the labor charges: higher when a wheel balance is among the services
    @ServiceActivator(inputChannel = "hondaChannel", outputChannel = "xformChannel")
    public Message<?> completeService(Message<Car> input) {
        if (input.getPayload().getServices().containsKey(WHEEL_BALANCE)) {
            input.getPayload().getServices().put(LABOR_CHARGES, (float) 89.95);
        } else {
            input.getPayload().getServices().put(LABOR_CHARGES, (float) 29.95);
        }

        LOGGER.info("Input: {}", input.toString());

        return input;
    }
}
The following is the Java Config based POJO that defines the endpoints along with header enrichment in a chain, the header based router, and Car object transformation to JSON:
/*
 * Name:   CarServiceConfig4
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p7;

import com.polarsparc.si.p6.CarServiceHandler2;
import com.polarsparc.si.p6.RefNoHandler2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.handler.MessageHandlerChain;
import org.springframework.integration.handler.MethodInvokingMessageProcessor;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.json.ObjectToJsonTransformer;
import org.springframework.integration.router.HeaderValueRouter;
import org.springframework.integration.stream.CharacterStreamWritingMessageHandler;
import org.springframework.integration.transformer.HeaderEnricher;
import org.springframework.integration.transformer.MessageTransformingHandler;
import org.springframework.integration.transformer.support.ExpressionEvaluatingHeaderValueMessageProcessor;
import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableIntegration
public class CarServiceConfig4 {
    private RefNoHandler2 refNoHandler2;
    private CarServiceHandler2 carServiceHandler2;

    @Autowired
    public void setRefNoHandler2(RefNoHandler2 refNoHandler2) {
        this.refNoHandler2 = refNoHandler2;
    }

    @Autowired
    public void setCarServiceHandler2(CarServiceHandler2 carServiceHandler2) {
        this.carServiceHandler2 = carServiceHandler2;
    }

    @Bean
    public MessageChannel inChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel routerChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel toyotaChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel hondaChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel xformChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel outChannel() {
        return new DirectChannel();
    }

    /* --- BEGIN: For RefNoHandler2 --- */

    @Bean
    public MethodInvokingMessageProcessor<?> refNoMessageHandler2() {
        return new MethodInvokingMessageProcessor<>(refNoHandler2, "assignRefNo");
    }

    @Bean
    public MessageHandler refNoEndpoint2() {
        return new ServiceActivatingHandler(refNoMessageHandler2());
    }

    /* --- END: For RefNoHandler2 --- */

    /* --- BEGIN: For Header Enricher --- */

    @Bean
    public HeaderEnricher carMakeHeader() {
        Map<String, HeaderValueMessageProcessor<?>> headers = new HashMap<>();

        Expression expression = new SpelExpressionParser().parseExpression("payload.getMake()");

        headers.put("CAR_MAKE", new ExpressionEvaluatingHeaderValueMessageProcessor<>(expression, String.class));

        return new HeaderEnricher(headers);
    }

    @Bean
    public MessageHandler carMakeTransformer() {
        return new MessageTransformingHandler(carMakeHeader());
    }

    /* --- END: For Header Enricher --- */

    /* --- BEGIN: For CarServiceHandler2 --- */

    @Bean
    public MethodInvokingMessageProcessor<?> carServiceMessageHandler2() {
        return new MethodInvokingMessageProcessor<>(carServiceHandler2, "recommendedServices");
    }

    @Bean
    public MessageHandler carServiceEndpoint2() {
        return new ServiceActivatingHandler(carServiceMessageHandler2());
    }

    /* --- END: For CarServiceHandler2 --- */

    @Bean
    @ServiceActivator(inputChannel = "inChannel")
    public MessageHandler chainEndpoints() {
        MessageHandlerChain chain = new MessageHandlerChain();
        chain.setHandlers(Arrays.asList(refNoEndpoint2(), carMakeTransformer(), carServiceEndpoint2()));
        chain.setOutputChannel(routerChannel());
        return chain;
    }

    @Bean
    @ServiceActivator(inputChannel = "routerChannel", outputChannel = "xformChannel")
    public HeaderValueRouter router() {
        HeaderValueRouter router = new HeaderValueRouter("CAR_MAKE");
        router.setChannelMapping("Toyota", "toyotaChannel");
        router.setChannelMapping("Honda", "hondaChannel");
        return router;
    }

    /* --- BEGIN: For Object to JSON --- */

    @Bean
    public ObjectToJsonTransformer carJsonMapper() {
        return new ObjectToJsonTransformer();
    }

    @Bean
    @Transformer(inputChannel = "xformChannel")
    public MessageHandler carJsonTransformer() {
        MessageTransformingHandler transformer = new MessageTransformingHandler(carJsonMapper());
        transformer.setOutputChannel(outChannel());
        return transformer;
    }

    /* --- END: For Object to JSON --- */

    @Bean
    @ServiceActivator(inputChannel = "outChannel")
    public MessageHandler stdoutAdapter() {
        CharacterStreamWritingMessageHandler handler = CharacterStreamWritingMessageHandler.stdout();
        handler.setShouldAppendNewLine(true);
        return handler;
    }
}
And finally, the following is the main application that uses the POJOs from Listing.79, Listing.80, Listing.81, Listing.93, Listing.94, and Listing.95 to test the hypothetical car service header based routing:
/*
 * Name:   CarServiceMainConfig4
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p7;

import com.polarsparc.si.p6.Car;
import com.polarsparc.si.p6.CarServiceGateway2;
import com.polarsparc.si.p6.CarServiceHandler2;
import com.polarsparc.si.p6.RefNoHandler2;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class CarServiceMainConfig4 {
    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(RefNoHandler2.class,
                CarServiceHandler2.class,
                CarServiceGateway2.class,
                HondaSpecialist2.class,
                ToyotaSpecialist2.class,
                CarServiceConfig4.class);

        CarServiceGateway2 gateway = (CarServiceGateway2) context.getBean("gateway");

        gateway.serviceCar(new Car("Toyota", 30000));
        gateway.serviceCar(new Car("Honda", 5000));
    }
}
To execute the code from Listing.96, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p7.CarServiceMainConfig4"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain) WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-05-22 14:30:08:928 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-05-22 14:30:08:933 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-05-22 14:30:08:941 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 
2021-05-22 14:30:08:998 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-05-22 14:30:09:016 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-05-22 14:30:09:157 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' 2021-05-22 14:30:09:417 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-05-22 14:30:09:418 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.errorChannel' has 1 subscriber(s). 
2021-05-22 14:30:09:419 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-05-22 14:30:09:420 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:hondaSpecialist2.completeService.serviceActivator} as a subscriber to the 'hondaChannel' channel 2021-05-22 14:30:09:421 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.hondaChannel' has 1 subscriber(s). 2021-05-22 14:30:09:422 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'hondaSpecialist2.completeService.serviceActivator' 2021-05-22 14:30:09:422 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:toyotaSpecialist2.completeService.serviceActivator} as a subscriber to the 'toyotaChannel' channel 2021-05-22 14:30:09:423 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.toyotaChannel' has 1 subscriber(s). 
2021-05-22 14:30:09:424 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'toyotaSpecialist2.completeService.serviceActivator' 2021-05-22 14:30:09:425 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig4.chainEndpoints.serviceActivator} as a subscriber to the 'inChannel' channel 2021-05-22 14:30:09:426 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.inChannel' has 1 subscriber(s). 2021-05-22 14:30:09:426 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.handler.MessageHandlerChain - started bean 'chainEndpoints'; defined in: 'com.polarsparc.si.p7.CarServiceConfig4'; from source: 'org.springframework.core.type.StandardMethodMetadata@64794e81' 2021-05-22 14:30:09:427 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig4.chainEndpoints.serviceActivator' 2021-05-22 14:30:09:428 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig4.router.serviceActivator} as a subscriber to the 'routerChannel' channel 2021-05-22 14:30:09:429 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.routerChannel' has 1 subscriber(s). 
2021-05-22 14:30:09:430 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig4.router.serviceActivator' 2021-05-22 14:30:09:430 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {object-to-json-transformer:carServiceConfig4.carJsonTransformer.transformer} as a subscriber to the 'xformChannel' channel 2021-05-22 14:30:09:431 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.xformChannel' has 1 subscriber(s). 2021-05-22 14:30:09:432 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig4.carJsonTransformer.transformer' 2021-05-22 14:30:09:433 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig4.stdoutAdapter.serviceActivator} as a subscriber to the 'outChannel' channel 2021-05-22 14:30:09:434 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.outChannel' has 1 subscriber(s). 
2021-05-22 14:30:09:434 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig4.stdoutAdapter.serviceActivator' 2021-05-22 14:30:09:436 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#serviceCar(Car)' 2021-05-22 14:30:09:436 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-05-22 14:30:09:449 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:30:09:468 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO com.polarsparc.si.p6.RefNoHandler2 - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=dc36b439-efc6-d039-b20f-35297e3fecab, timestamp=1621708209449}] (in Config) 2021-05-22 14:30:09:472 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 
2021-05-22 14:30:09:473 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO com.polarsparc.si.p6.CarServiceHandler2 - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, id=69d7f754-b19c-99c7-1753-f5e729e94aed, CAR_MAKE=Toyota, timestamp=1621708209472}] (in Config) 2021-05-22 14:30:09:474 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:30:09:475 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO com.polarsparc.si.p7.ToyotaSpecialist2 - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={AIR_FILTER=19.95, LABOR_CHARGES=99.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, id=69d7f754-b19c-99c7-1753-f5e729e94aed, CAR_MAKE=Toyota, timestamp=1621708209472}] (in Config) {"ref":"T-00001","make":"Toyota","miles":30000,"services":{"AIR_FILTER":19.95,"LABOR_CHARGES":99.95,"OIL_CHANGE":29.95,"WHEEL_BALANCE":69.95}} 2021-05-22 14:30:09:513 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO com.polarsparc.si.p6.RefNoHandler2 - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=11a7bc95-28e7-b527-83ab-0f836714466a, timestamp=1621708209513}] (in Config) 2021-05-22 14:30:09:515 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO com.polarsparc.si.p6.CarServiceHandler2 - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, id=1c3aed39-a5b6-fc76-b41b-cc41a1a36d2e, CAR_MAKE=Honda, timestamp=1621708209514}] (in Config) 2021-05-22 14:30:09:516 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO 
org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:30:09:516 [com.polarsparc.si.p7.CarServiceMainConfig4.main()] INFO com.polarsparc.si.p7.HondaSpecialist2 - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={AIR_FILTER=19.95, LABOR_CHARGES=29.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, id=1c3aed39-a5b6-fc76-b41b-cc41a1a36d2e, CAR_MAKE=Honda, timestamp=1621708209514}] (in Config) {"ref":"H-00002","make":"Honda","miles":5000,"services":{"AIR_FILTER":19.95,"LABOR_CHARGES":29.95,"OIL_CHANGE":29.95}} [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.531 s [INFO] Finished at: 2021-05-22T14:30:09-04:00 [INFO] ------------------------------------------------------------------------
References
Spring Integration Notes :: Part - 6
Spring Integration Notes :: Part - 5
Spring Integration Notes :: Part - 4
Spring Integration Notes :: Part - 3
Spring Integration Notes :: Part - 2
Spring Integration Notes :: Part - 1