PolarSPARC |
Spring Integration Notes :: Part - 8
Bhaskar S | 05/22/2021 (UPDATED) |
Overview
In Part-7, we covered basic examples of Spring Integration relating to message transformation and message routing.
We will wrap up our journey by exploring some basic examples relating to the following concepts in Spring Integration:
Message Splitter
Message Aggregator
Setup
To set up the Java directory structure for the demonstrations in this part, execute the following commands:
$ cd $HOME/java/SpringIntegration
$ mkdir -p src/main/java/com/polarsparc/si/p8
$ mkdir -p src/main/resources/p8
Hands-on Spring Integration
Message Splitter
In this hypothetical car service example, we will split the incoming message into two separate messages dispatched to two separate channels: one for selecting the recommended services and the other for additional services based on inspection.
XML based Approach
The following handler is a Java POJO that performs a hypothetical inspection to indicate an additional service on each arriving Car:
/*
 * Name:   CarInspectionHandler
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p8;

import com.polarsparc.si.p6.Car;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

import java.util.Random;

public class CarInspectionHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(CarInspectionHandler.class);

    private static String WIPERS = "WIPERS";
    private static String HEADLIGHT_LAMP = "HEADLIGHT_LAMP";
    private static String BRAKE_PADS = "BRAKE_PADS";

    private static Random rand = new Random();

    public Message<?> inspectionServices(Message<Car> input) {
        switch (rand.nextInt(3)) {
            case 0:
                input.getPayload().getServices().put(WIPERS, (float) 21.95);
                break;
            case 1:
                input.getPayload().getServices().put(HEADLIGHT_LAMP, (float) 37.95);
                break;
            case 2:
                input.getPayload().getServices().put(BRAKE_PADS, (float) 44.95);
                break;
        }

        LOGGER.info("Input: {} (in Xml)", input.toString());

        return input;
    }
}
The following is a Java POJO that splits the incoming message and adds a header with the name SERVICE_TYPE with two different values for appropriate routing:
/*
 * Name:   CarServiceSplitter
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p8;

import com.polarsparc.si.p6.Car;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import java.util.ArrayList;
import java.util.List;

public class CarServiceSplitter {
    private static final Logger LOGGER = LoggerFactory.getLogger(CarServiceSplitter.class);

    public List<Message<Car>> splitter(Message<Car> input) {
        List<Message<Car>> tasks = new ArrayList<>();

        Car p1 = Car.makeClone(input.getPayload());
        Message<Car> inspection = MessageBuilder.withPayload(p1)
                .setHeader("SERVICE_TYPE", "INSP")
                .build();
        tasks.add(inspection);

        LOGGER.info("Inspection message: {} (in Xml)", input.toString());

        Car p2 = Car.makeClone(input.getPayload());
        Message<Car> recommended = MessageBuilder.withPayload(p2)
                .setHeader("SERVICE_TYPE", "RECO")
                .build();
        tasks.add(recommended);

        LOGGER.info("Recommended message: {} (in Xml)", input.toString());

        return tasks;
    }
}
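The splitter above clones the payload once per child message. One plausible reason is that the downstream handlers add entries to the payload's services map, so two children sharing one payload object would see each other's changes. The following is a minimal plain-Java sketch of that effect. The SimpleCar class and its copy() method are hypothetical stand-ins for illustration only, not the Car class and makeClone method from Part-6:

```java
import java.util.HashMap;
import java.util.Map;

/* Plain-Java illustration only; SimpleCar is a hypothetical stand-in for the Car payload. */
final class PayloadCloneSketch {

    static final class SimpleCar {
        final String make;
        final Map<String, Float> services = new HashMap<>();

        SimpleCar(String make) {
            this.make = make;
        }

        /* Returns an independent copy with its own services map. */
        SimpleCar copy() {
            SimpleCar c = new SimpleCar(make);
            c.services.putAll(services);
            return c;
        }
    }

    /*
     * Simulates two branches working on the payload. The inspection branch adds
     * one service; the method returns how many services the recommended branch
     * then sees on its own payload.
     */
    public static int servicesSeenByOtherBranch(boolean clonePayload) {
        SimpleCar original = new SimpleCar("Toyota");
        SimpleCar inspection = clonePayload ? original.copy() : original;
        SimpleCar recommended = clonePayload ? original.copy() : original;

        inspection.services.put("BRAKE_PADS", 44.95f);

        return recommended.services.size();
    }

    public static void main(String[] args) {
        System.out.println("shared payload: " + servicesSeenByOtherBranch(false));
        System.out.println("cloned payload: " + servicesSeenByOtherBranch(true));
    }
}
```

With a shared payload the second branch sees the service added by the first branch, while with cloned payloads each branch starts from a clean copy.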
The following is the XML based Spring Integration configuration that wires up the channels, the splitter, the router, and the endpoints:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:stream="http://www.springframework.org/schema/integration/stream"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/integration
                           http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/integration/stream
                           http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

    <bean id="refNoHandler" class="com.polarsparc.si.p6.RefNoHandler" />
    <bean id="splitHandler" class="com.polarsparc.si.p8.CarServiceSplitter" />
    <bean id="carInspectHandler" class="com.polarsparc.si.p8.CarInspectionHandler" />
    <bean id="carRecommendHandler" class="com.polarsparc.si.p6.CarServiceHandler" />

    <int:channel id="inChannel" />
    <int:channel id="splitChannel" />
    <int:channel id="routerChannel" />
    <int:channel id="inspChannel" />
    <int:channel id="recoChannel" />
    <int:channel id="outChannel" />

    <int:gateway id="gateway"
                 service-interface="com.polarsparc.si.p6.CarServiceGateway"
                 default-request-channel="inChannel" />

    <int:service-activator input-channel="inChannel"
                           output-channel="splitChannel"
                           ref="refNoHandler"
                           method="assignRefNo" />

    <int:splitter input-channel="splitChannel"
                  ref="splitHandler"
                  method="splitter"
                  output-channel="routerChannel" />

    <int:header-value-router input-channel="routerChannel" header-name="SERVICE_TYPE">
        <int:mapping value="INSP" channel="inspChannel" />
        <int:mapping value="RECO" channel="recoChannel" />
    </int:header-value-router>

    <int:service-activator input-channel="inspChannel"
                           output-channel="outChannel"
                           ref="carInspectHandler"
                           method="inspectionServices" />

    <int:service-activator input-channel="recoChannel"
                           output-channel="outChannel"
                           ref="carRecommendHandler"
                           method="recommendedServices" />

    <stream:stdout-channel-adapter channel="outChannel" append-newline="true" />
</beans>
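In the configuration above, the header-value-router picks the target channel by looking up the value of the SERVICE_TYPE header in its list of mappings. The following is a minimal plain-Java sketch of that lookup. The class HeaderRoutingSketch and its methods are hypothetical names used only for illustration, and the sketch simply returns null for an unmapped value; the actual router in Spring Integration has further resolution rules that are not modelled here:

```java
import java.util.HashMap;
import java.util.Map;

/* Plain-Java model of routing on a header value; NOT the Spring Integration router. */
final class HeaderRoutingSketch {
    private final String headerName;
    private final Map<String, String> channelMappings = new HashMap<>();

    public HeaderRoutingSketch(String headerName) {
        this.headerName = headerName;
    }

    /* Registers the channel name to use for a given header value. */
    public void addMapping(String headerValue, String channelName) {
        channelMappings.put(headerValue, channelName);
    }

    /* Returns the mapped channel name, or null if the header is absent or unmapped. */
    public String resolveChannel(Map<String, Object> headers) {
        Object value = headers.get(headerName);
        return value == null ? null : channelMappings.get(value.toString());
    }

    public static void main(String[] args) {
        HeaderRoutingSketch router = new HeaderRoutingSketch("SERVICE_TYPE");
        router.addMapping("INSP", "inspChannel");
        router.addMapping("RECO", "recoChannel");

        System.out.println(router.resolveChannel(Map.of("SERVICE_TYPE", "INSP")));
        System.out.println(router.resolveChannel(Map.of("SERVICE_TYPE", "RECO")));
    }
}
```

The two mappings mirror the int:mapping elements of the XML: a message carrying SERVICE_TYPE=INSP resolves to inspChannel and one carrying SERVICE_TYPE=RECO resolves to recoChannel.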
Some aspects of Listing.98 above need a little explanation.
The splitter element invokes the method named by the method attribute on the bean referenced by the ref attribute to split the message arriving on the input channel, and then sends each of the resulting messages to the specified output channel.
When the splitter breaks the parent message into a number of child messages, each child message receives three additional headers: a correlationId that is the same for all the child messages (in the output below, it equals the id of the parent message), a sequenceSize that is also the same for all the child messages and indicates how many child messages the parent message was split into, and a sequenceNumber that starts at 1 for the first child message and is incremented by one for each subsequent child message.
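The behaviour described above can be modelled in a few lines of plain Java. The following is only a sketch under stated assumptions: SplitterSketch, Msg, and split are hypothetical names, the output channel is modelled as a Consumer, and none of this is Spring Integration code. It just shows one child message per part being sent to the output channel, each stamped with the three sequence headers:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/* Plain-Java model of the splitter behaviour; NOT Spring Integration code. */
final class SplitterSketch {

    /* Hypothetical stand-in for a message: a payload plus a map of headers. */
    public static final class Msg {
        public final String payload;
        public final Map<String, Object> headers;

        public Msg(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }

        @Override
        public String toString() {
            return "Msg[payload=" + payload + ", headers=" + headers + "]";
        }
    }

    /*
     * Sends one child message per part to the output channel (a Consumer here)
     * and stamps each child with the three sequence headers.
     */
    public static void split(String parentId, List<String> parts, Consumer<Msg> outputChannel) {
        int size = parts.size();
        for (int i = 0; i < size; i++) {
            Map<String, Object> headers = new LinkedHashMap<>();
            headers.put("correlationId", parentId); // same value on every child
            headers.put("sequenceSize", size);      // total number of children
            headers.put("sequenceNumber", i + 1);   // 1 for the first child, then 2, 3, ...
            outputChannel.accept(new Msg(parts.get(i), headers));
        }
    }

    public static void main(String[] args) {
        List<Msg> received = new ArrayList<>();
        split("parent-1", List.of("INSP", "RECO"), received::add);
        received.forEach(System.out::println);
    }
}
```

Running the sketch prints two child messages that share the same correlationId and sequenceSize and differ only in payload and sequenceNumber, which is the same pattern visible in the headers of the typical output.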
To execute the code from Listing.78 with the argument from Listing.99, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p6.CarServiceMainXml" -Dexec.args="p8/CarService5.xml"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- 2021-05-22 14:35:15:977 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-05-22 14:35:15:980 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-05-22 14:35:15:988 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 
2021-05-22 14:35:16:163 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class) WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-05-22 14:35:16:378 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inChannel' channel 2021-05-22 14:35:16:379 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.inChannel' has 1 subscriber(s). 2021-05-22 14:35:16:380 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' 2021-05-22 14:35:16:380 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {splitter} as a subscriber to the 'splitChannel' channel 2021-05-22 14:35:16:381 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.splitChannel' has 1 subscriber(s). 
2021-05-22 14:35:16:382 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1' 2021-05-22 14:35:16:382 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {router} as a subscriber to the 'routerChannel' channel 2021-05-22 14:35:16:383 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.routerChannel' has 1 subscriber(s). 2021-05-22 14:35:16:383 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2' 2021-05-22 14:35:16:384 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inspChannel' channel 2021-05-22 14:35:16:385 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.inspChannel' has 1 subscriber(s). 
2021-05-22 14:35:16:385 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3' 2021-05-22 14:35:16:386 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'recoChannel' channel 2021-05-22 14:35:16:386 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.recoChannel' has 1 subscriber(s). 2021-05-22 14:35:16:387 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4' 2021-05-22 14:35:16:388 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {stream:outbound-channel-adapter(character)} as a subscriber to the 'outChannel' channel 2021-05-22 14:35:16:388 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.outChannel' has 1 subscriber(s). 
2021-05-22 14:35:16:389 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5'; defined in: 'class path resource [p8/CarService5.xml]'; from source: ''stream:stdout-channel-adapter'' 2021-05-22 14:35:16:390 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-05-22 14:35:16:390 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.errorChannel' has 1 subscriber(s). 2021-05-22 14:35:16:391 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-05-22 14:35:16:392 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#serviceCar(Car)' 2021-05-22 14:35:16:393 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-05-22 14:35:16:405 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 
2021-05-22 14:35:16:420 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.RefNoHandler - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=66c65b57-ac3c-2c68-5b9e-3aa0d8a36499, timestamp=1621708516405}] (in Xml) 2021-05-22 14:35:16:422 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:35:16:423 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarServiceSplitter - Inspection message: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=66c65b57-ac3c-2c68-5b9e-3aa0d8a36499, timestamp=1621708516405}] (in Xml) 2021-05-22 14:35:16:424 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarServiceSplitter - Recommended message: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=66c65b57-ac3c-2c68-5b9e-3aa0d8a36499, timestamp=1621708516405}] (in Xml) 2021-05-22 14:35:16:427 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 
2021-05-22 14:35:16:428 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarInspectionHandler - Input: GenericMessage [payload=Car(ref=null, make=Toyota, miles=30000, services={BRAKE_PADS=44.95}), headers={replyChannel=nullChannel, sequenceNumber=1, sequenceSize=2, correlationId=66c65b57-ac3c-2c68-5b9e-3aa0d8a36499, id=129869b6-47eb-3c97-af43-e2f77d2290d7, SERVICE_TYPE=INSP, timestamp=1621708516427}] (in Xml) Car(ref=null, make=Toyota, miles=30000, services={BRAKE_PADS=44.95}) 2021-05-22 14:35:16:430 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:35:16:430 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.CarServiceHandler - Input: GenericMessage [payload=Car(ref=null, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, sequenceNumber=2, sequenceSize=2, correlationId=66c65b57-ac3c-2c68-5b9e-3aa0d8a36499, id=5b215d09-b907-3143-cb7d-a37a1b924615, SERVICE_TYPE=RECO, timestamp=1621708516430}] (in Xml) Car(ref=null, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}) 2021-05-22 14:35:16:432 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.RefNoHandler - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=dd1397e7-2bc8-9131-88d4-af35fa9d5bdc, timestamp=1621708516432}] (in Xml) 2021-05-22 14:35:16:433 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarServiceSplitter - Inspection message: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=dd1397e7-2bc8-9131-88d4-af35fa9d5bdc, timestamp=1621708516432}] (in Xml) 2021-05-22 14:35:16:434 
[com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarServiceSplitter - Recommended message: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=dd1397e7-2bc8-9131-88d4-af35fa9d5bdc, timestamp=1621708516432}] (in Xml) 2021-05-22 14:35:16:435 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarInspectionHandler - Input: GenericMessage [payload=Car(ref=null, make=Honda, miles=5000, services={WIPERS=21.95}), headers={replyChannel=nullChannel, sequenceNumber=1, sequenceSize=2, correlationId=dd1397e7-2bc8-9131-88d4-af35fa9d5bdc, id=04b4d718-d39a-d1ed-c6c2-2cadc49333bd, SERVICE_TYPE=INSP, timestamp=1621708516434}] (in Xml) Car(ref=null, make=Honda, miles=5000, services={WIPERS=21.95}) 2021-05-22 14:35:16:436 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.CarServiceHandler - Input: GenericMessage [payload=Car(ref=null, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, sequenceNumber=2, sequenceSize=2, correlationId=dd1397e7-2bc8-9131-88d4-af35fa9d5bdc, id=92dd7df2-de23-d1bb-22ff-0b090636f3a9, SERVICE_TYPE=RECO, timestamp=1621708516436}] (in Xml) Car(ref=null, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}) [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.413 s [INFO] Finished at: 2021-05-22T14:35:16-04:00 [INFO] ------------------------------------------------------------------------
Java Config based Approach
The following is the Java Config based POJO that performs a hypothetical inspection to indicate an additional service on each arriving Car:
/*
 * Name:   CarInspectionHandler2
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p8;

import com.polarsparc.si.p6.Car;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;

import java.util.Random;

@Configuration
@EnableIntegration
public class CarInspectionHandler2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(CarInspectionHandler2.class);

    private static String WIPERS = "WIPERS";
    private static String HEADLIGHT_LAMP = "HEADLIGHT_LAMP";
    private static String BRAKE_PADS = "BRAKE_PADS";

    private static Random rand = new Random();

    @ServiceActivator
    public Message<?> inspectionServices(Message<Car> input) {
        switch (rand.nextInt(3)) {
            case 0:
                input.getPayload().getServices().put(WIPERS, (float) 21.95);
                break;
            case 1:
                input.getPayload().getServices().put(HEADLIGHT_LAMP, (float) 37.95);
                break;
            case 2:
                input.getPayload().getServices().put(BRAKE_PADS, (float) 44.95);
                break;
        }

        LOGGER.info("Input: {} (in Config)", input.toString());

        return input;
    }
}
The following is the Java Config based POJO that splits the incoming message and adds a header with the name SERVICE_TYPE with two different values for appropriate routing:
/*
 * Name:   CarServiceSplitter2
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p8;

import com.polarsparc.si.p6.Car;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Splitter;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

import java.util.ArrayList;
import java.util.List;

@Configuration
@EnableIntegration
public class CarServiceSplitter2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(CarServiceSplitter2.class);

    @Splitter(inputChannel = "splitChannel", outputChannel = "routerChannel")
    public List<Message<Car>> splitter(Message<Car> input) {
        List<Message<Car>> tasks = new ArrayList<>();

        Car p1 = Car.makeClone(input.getPayload());
        Message<Car> inspection = MessageBuilder.withPayload(p1)
                .setHeader("SERVICE_TYPE", "INSP")
                .build();
        tasks.add(inspection);

        LOGGER.info("Inspection message: {} (in Config)", input.toString());

        Car p2 = Car.makeClone(input.getPayload());
        Message<Car> recommended = MessageBuilder.withPayload(p2)
                .setHeader("SERVICE_TYPE", "RECO")
                .build();
        tasks.add(recommended);

        LOGGER.info("Recommended message: {} (in Config)", input.toString());

        return tasks;
    }
}
The following is the Java Config based POJO that defines the channels, the splitter, the router, and the endpoints:
/*
 * Name:   CarServiceConfig5
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p8;

import com.polarsparc.si.p6.CarServiceHandler2;
import com.polarsparc.si.p6.RefNoHandler2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.handler.MethodInvokingMessageProcessor;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.router.HeaderValueRouter;
import org.springframework.integration.stream.CharacterStreamWritingMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
@EnableIntegration
public class CarServiceConfig5 {
    private RefNoHandler2 refNoHandler2;
    private CarInspectionHandler2 carInspectionHandler2;
    private CarServiceHandler2 carServiceHandler2;

    @Autowired
    public void setRefNoHandler2(RefNoHandler2 refNoHandler2) {
        this.refNoHandler2 = refNoHandler2;
    }

    @Autowired
    public void setCarInspectionHandler2(CarInspectionHandler2 carInspectionHandler2) {
        this.carInspectionHandler2 = carInspectionHandler2;
    }

    @Autowired
    public void setCarServiceHandler2(CarServiceHandler2 carServiceHandler2) {
        this.carServiceHandler2 = carServiceHandler2;
    }

    @Bean
    public MessageChannel inChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel splitChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel routerChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inspChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel recoChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel outChannel() {
        return new DirectChannel();
    }

    /* --- BEGIN: For RefNoHandler2 --- */

    @Bean
    public MethodInvokingMessageProcessor<?> refNoMessageHandler2() {
        return new MethodInvokingMessageProcessor<>(refNoHandler2, "assignRefNo");
    }

    @Bean
    @ServiceActivator(inputChannel = "inChannel")
    public MessageHandler refNoEndpoint2() {
        ServiceActivatingHandler handler = new ServiceActivatingHandler(refNoMessageHandler2());
        handler.setOutputChannel(splitChannel());
        return handler;
    }

    /* --- END: For RefNoHandler2 --- */

    /* --- BEGIN: For CarInspectionHandler2 --- */

    @Bean
    public MethodInvokingMessageProcessor<?> carInspectionMessageHandler2() {
        return new MethodInvokingMessageProcessor<>(carInspectionHandler2, "inspectionServices");
    }

    @Bean
    @ServiceActivator(inputChannel = "inspChannel")
    public MessageHandler carInspectionEndpoint2() {
        ServiceActivatingHandler handler = new ServiceActivatingHandler(carInspectionMessageHandler2());
        handler.setOutputChannel(outChannel());
        return handler;
    }

    /* --- END: For CarInspectionHandler2 --- */

    /* --- BEGIN: For CarServiceHandler2 --- */

    @Bean
    public MethodInvokingMessageProcessor<?> carServiceMessageHandler2() {
        return new MethodInvokingMessageProcessor<>(carServiceHandler2, "recommendedServices");
    }

    @Bean
    @ServiceActivator(inputChannel = "recoChannel")
    public MessageHandler carServiceEndpoint2() {
        ServiceActivatingHandler handler = new ServiceActivatingHandler(carServiceMessageHandler2());
        handler.setOutputChannel(outChannel());
        return handler;
    }

    /* --- END: For CarServiceHandler2 --- */

    @Bean
    @ServiceActivator(inputChannel = "routerChannel")
    public HeaderValueRouter router() {
        HeaderValueRouter router = new HeaderValueRouter("SERVICE_TYPE");
        router.setChannelMapping("INSP", "inspChannel");
        router.setChannelMapping("RECO", "recoChannel");
        return router;
    }

    @Bean
    @ServiceActivator(inputChannel = "outChannel")
    public MessageHandler stdoutAdapter() {
        CharacterStreamWritingMessageHandler handler = CharacterStreamWritingMessageHandler.stdout();
        handler.setShouldAppendNewLine(true);
        return handler;
    }
}
And finally, the following is the main application that uses the POJOs from Listing.79, Listing.80, Listing.81, Listing.100, Listing.101, and Listing.102 to test the hypothetical car service with the splitter:
/*
 * Name:   CarServiceMainConfig5
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p8;

import com.polarsparc.si.p6.Car;
import com.polarsparc.si.p6.CarServiceGateway2;
import com.polarsparc.si.p6.CarServiceHandler2;
import com.polarsparc.si.p6.RefNoHandler2;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class CarServiceMainConfig5 {
    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(RefNoHandler2.class,
                CarServiceSplitter2.class,
                CarInspectionHandler2.class,
                CarServiceHandler2.class,
                CarServiceGateway2.class,
                CarServiceConfig5.class);

        CarServiceGateway2 gateway = (CarServiceGateway2) context.getBean("gateway");

        gateway.serviceCar(new Car("Toyota", 30000));
        gateway.serviceCar(new Car("Honda", 5000));
    }
}
To execute the code from Listing.103, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p8.CarServiceMainConfig5"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain) WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-05-22 14:37:15:014 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-05-22 14:37:15:018 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-05-22 14:37:15:026 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 
2021-05-22 14:37:15:077 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-05-22 14:37:15:094 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-05-22 14:37:15:231 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' 2021-05-22 14:37:15:504 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-05-22 14:37:15:505 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.errorChannel' has 1 subscriber(s). 
2021-05-22 14:37:15:506 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-05-22 14:37:15:506 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {splitter:carServiceSplitter2.splitter.splitter} as a subscriber to the 'splitChannel' channel 2021-05-22 14:37:15:507 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.splitChannel' has 1 subscriber(s). 2021-05-22 14:37:15:508 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceSplitter2.splitter.splitter' 2021-05-22 14:37:15:508 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:carServiceConfig5.refNoEndpoint2.serviceActivator} as a subscriber to the 'inChannel' channel 2021-05-22 14:37:15:509 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.inChannel' has 1 subscriber(s). 
2021-05-22 14:37:15:510 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig5.refNoEndpoint2.serviceActivator' 2021-05-22 14:37:15:511 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:carServiceConfig5.carInspectionEndpoint2.serviceActivator} as a subscriber to the 'inspChannel' channel 2021-05-22 14:37:15:511 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.inspChannel' has 1 subscriber(s). 2021-05-22 14:37:15:512 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig5.carInspectionEndpoint2.serviceActivator' 2021-05-22 14:37:15:513 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:carServiceConfig5.carServiceEndpoint2.serviceActivator} as a subscriber to the 'recoChannel' channel 2021-05-22 14:37:15:514 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.recoChannel' has 1 subscriber(s). 
2021-05-22 14:37:15:515 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig5.carServiceEndpoint2.serviceActivator' 2021-05-22 14:37:15:515 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig5.router.serviceActivator} as a subscriber to the 'routerChannel' channel 2021-05-22 14:37:15:516 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.routerChannel' has 1 subscriber(s). 2021-05-22 14:37:15:517 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig5.router.serviceActivator' 2021-05-22 14:37:15:517 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig5.stdoutAdapter.serviceActivator} as a subscriber to the 'outChannel' channel 2021-05-22 14:37:15:518 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23c0f7ba.outChannel' has 1 subscriber(s). 
2021-05-22 14:37:15:519 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig5.stdoutAdapter.serviceActivator' 2021-05-22 14:37:15:520 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#serviceCar(Car)' 2021-05-22 14:37:15:521 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-05-22 14:37:15:533 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:37:15:552 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO com.polarsparc.si.p6.RefNoHandler2 - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=39a27449-d601-6a50-81eb-28df427daea5, timestamp=1621708635532}] (in Config) 2021-05-22 14:37:15:554 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 
2021-05-22 14:37:15:556 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO com.polarsparc.si.p8.CarServiceSplitter2 - Inspection message: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=39a27449-d601-6a50-81eb-28df427daea5, timestamp=1621708635532}] (in Config) 2021-05-22 14:37:15:557 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO com.polarsparc.si.p8.CarServiceSplitter2 - Recommended message: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=39a27449-d601-6a50-81eb-28df427daea5, timestamp=1621708635532}] (in Config) 2021-05-22 14:37:15:561 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:37:15:562 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO com.polarsparc.si.p8.CarInspectionHandler2 - Input: GenericMessage [payload=Car(ref=null, make=Toyota, miles=30000, services={BRAKE_PADS=44.95}), headers={replyChannel=nullChannel, sequenceNumber=1, sequenceSize=2, correlationId=39a27449-d601-6a50-81eb-28df427daea5, id=7c74ec9d-40e6-ff34-c570-6669549df340, SERVICE_TYPE=INSP, timestamp=1621708635561}] (in Config) Car(ref=null, make=Toyota, miles=30000, services={BRAKE_PADS=44.95}) 2021-05-22 14:37:15:564 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 
2021-05-22 14:37:15:566 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO com.polarsparc.si.p6.CarServiceHandler2 - Input: GenericMessage [payload=Car(ref=null, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, sequenceNumber=2, sequenceSize=2, correlationId=39a27449-d601-6a50-81eb-28df427daea5, id=92bb2797-4ce6-0f2e-7ff2-1d4b39606714, SERVICE_TYPE=RECO, timestamp=1621708635564}] (in Config) Car(ref=null, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}) 2021-05-22 14:37:15:569 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO com.polarsparc.si.p6.RefNoHandler2 - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=10824c5c-92ee-3fbf-5d47-ed6160e7cbc5, timestamp=1621708635568}] (in Config) 2021-05-22 14:37:15:570 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO com.polarsparc.si.p8.CarServiceSplitter2 - Inspection message: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=10824c5c-92ee-3fbf-5d47-ed6160e7cbc5, timestamp=1621708635568}] (in Config) 2021-05-22 14:37:15:571 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO com.polarsparc.si.p8.CarServiceSplitter2 - Recommended message: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=10824c5c-92ee-3fbf-5d47-ed6160e7cbc5, timestamp=1621708635568}] (in Config) 2021-05-22 14:37:15:572 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO com.polarsparc.si.p8.CarInspectionHandler2 - Input: GenericMessage [payload=Car(ref=null, make=Honda, miles=5000, services={BRAKE_PADS=44.95}), headers={replyChannel=nullChannel, sequenceNumber=1, sequenceSize=2, correlationId=10824c5c-92ee-3fbf-5d47-ed6160e7cbc5, id=02466d83-c490-bfb9-f0c7-75f62c0e7072, SERVICE_TYPE=INSP, 
timestamp=1621708635572}] (in Config) Car(ref=null, make=Honda, miles=5000, services={BRAKE_PADS=44.95}) 2021-05-22 14:37:15:574 [com.polarsparc.si.p8.CarServiceMainConfig5.main()] INFO com.polarsparc.si.p6.CarServiceHandler2 - Input: GenericMessage [payload=Car(ref=null, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, sequenceNumber=2, sequenceSize=2, correlationId=10824c5c-92ee-3fbf-5d47-ed6160e7cbc5, id=bad6cd20-6bc7-8a6e-9e2c-dee961c9a5d0, SERVICE_TYPE=RECO, timestamp=1621708635574}] (in Config) Car(ref=null, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}) [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.498 s [INFO] Finished at: 2021-05-22T14:37:15-04:00 [INFO] ------------------------------------------------------------------------
Message Aggregator
In this hypothetical car service example, we will aggregate multiple incoming messages into a single master message. This is the inverse of the splitter operation.
XML based Approach |
The following is a Java POJO that aggregates the incoming messages into a single message by merging the services from all the incoming messages into the first one:
/*
 * Name:   CarServiceAggregator
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p8;

import com.polarsparc.si.p6.Car;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

import java.util.List;

public class CarServiceAggregator {
    private static final Logger LOGGER = LoggerFactory.getLogger(CarServiceAggregator.class);

    public Message<Car> aggregator(List<Message<Car>> tasks) {
        if (tasks != null && tasks.size() > 0) {
            Message<Car> service = tasks.get(0);

            LOGGER.info("Task # 0: {} (in Xml)", service.toString());

            for (int i = 1; i < tasks.size(); i++) {
                Message<Car> next = tasks.get(i);

                LOGGER.info("Task # {}: {} (in Xml)", i, next.toString());

                service.getPayload()
                        .getServices()
                        .putAll(next.getPayload().getServices());
            }

            LOGGER.info("Final service: {} (in Xml)", service.toString());

            return service;
        }

        return null;
    }
}
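The aggregator above merges every later payload into the payload of the first message and returns that first message, so the inputs are modified in place. As a point of comparison, the following is a minimal, framework-free sketch of the same merge step that leaves its inputs untouched. The class name ServiceMergeSketch and the method mergeServices are hypothetical and are not part of the article's code; the Map<String, Float> type is assumed from how the handlers populate the services map.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the article's code base
final class ServiceMergeSketch {

    // Returns a NEW map holding the union of all the service maps.
    // On duplicate keys the later map wins, the same as with putAll().
    static Map<String, Float> mergeServices(List<Map<String, Float>> serviceMaps) {
        Map<String, Float> merged = new TreeMap<>();
        for (Map<String, Float> services : serviceMaps) {
            merged.putAll(services);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Stand-ins for the services found by the inspection branch
        Map<String, Float> inspection = new TreeMap<>();
        inspection.put("HEADLIGHT_LAMP", 37.95f);

        // Stand-ins for the services found by the recommendation branch
        Map<String, Float> recommended = new TreeMap<>();
        recommended.put("AIR_FILTER", 19.95f);
        recommended.put("OIL_CHANGE", 29.95f);

        // The merged map contains all three services ...
        System.out.println(mergeServices(Arrays.asList(inspection, recommended)));

        // ... while the input map is left unchanged
        System.out.println(inspection);
    }
}
```

Whether to merge in place or into a fresh map is a design choice; the in-place version is shorter, while the copying version avoids side effects on messages that other components may still hold.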
The following is the XML based Spring Integration configuration that wires up the channels, the splitter, the router, the aggregator, and the endpoints:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:stream="http://www.springframework.org/schema/integration/stream"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/integration
                           http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/integration/stream
                           http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">

    <bean id="refNoHandler" class="com.polarsparc.si.p6.RefNoHandler" />
    <bean id="splitHandler" class="com.polarsparc.si.p8.CarServiceSplitter" />
    <bean id="carInspectHandler" class="com.polarsparc.si.p8.CarInspectionHandler" />
    <bean id="carRecommendHandler" class="com.polarsparc.si.p6.CarServiceHandler" />
    <bean id="mergeHandler" class="com.polarsparc.si.p8.CarServiceAggregator" />

    <int:channel id="inChannel" />
    <int:channel id="splitChannel" />
    <int:channel id="routerChannel" />
    <int:channel id="inspChannel" />
    <int:channel id="recoChannel" />
    <int:channel id="aggChannel" />
    <int:channel id="outChannel" />

    <int:gateway id="gateway"
                 service-interface="com.polarsparc.si.p6.CarServiceGateway"
                 default-request-channel="inChannel" />

    <int:service-activator input-channel="inChannel"
                           output-channel="splitChannel"
                           ref="refNoHandler"
                           method="assignRefNo" />

    <int:splitter input-channel="splitChannel"
                  ref="splitHandler"
                  method="splitter"
                  output-channel="routerChannel" />

    <int:header-value-router input-channel="routerChannel" header-name="SERVICE_TYPE">
        <int:mapping value="INSP" channel="inspChannel" />
        <int:mapping value="RECO" channel="recoChannel" />
    </int:header-value-router>

    <int:service-activator input-channel="inspChannel"
                           output-channel="aggChannel"
                           ref="carInspectHandler"
                           method="inspectionServices" />

    <int:service-activator input-channel="recoChannel"
                           output-channel="aggChannel"
                           ref="carRecommendHandler"
                           method="recommendedServices" />

    <int:aggregator input-channel="aggChannel"
                    ref="mergeHandler"
                    method="aggregator"
                    output-channel="outChannel" />

    <stream:stdout-channel-adapter channel="outChannel" append-newline="true" />
</beans>
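Some aspects of Listing.105 above need a little explanation.
The aggregator element invokes the method named by the method attribute on the bean referenced by the ref attribute. That method combines the child messages arriving on the input channel into a single master message, which is then sent to the specified output channel.
The aggregator relies on three headers that are present on each child message produced by the splitter: the correlationId (which identifies the group a child belongs to), the sequenceSize (the number of children to expect), and the sequenceNumber (the position of the child within the sequence). Since CarServiceAggregator does not define a release strategy, the framework falls back to SimpleSequenceSizeReleaseStrategy (see the WARN line in the output below), which releases a group once the number of collected children equals the sequenceSize.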
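To make the role of these headers concrete, the following is a minimal, framework-free sketch of the idea: children are grouped by correlationId and a group is released once it holds sequenceSize children. It is only an illustration under those assumptions and not the actual Spring Integration implementation; the names SequenceAggregationSketch, Child, and accept are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of correlate-then-release; NOT the Spring Integration code
final class SequenceAggregationSketch {

    // Minimal stand-in for a child message: the three headers plus a payload
    static final class Child {
        final String correlationId;
        final int sequenceNumber;
        final int sequenceSize;
        final String payload;

        Child(String correlationId, int sequenceNumber, int sequenceSize, String payload) {
            this.correlationId = correlationId;
            this.sequenceNumber = sequenceNumber;
            this.sequenceSize = sequenceSize;
            this.payload = payload;
        }
    }

    // Children received so far, grouped by their correlationId
    private final Map<String, List<Child>> groups = new HashMap<>();

    // Adds the child to its group. Returns the complete group once its size
    // reaches the sequenceSize, or null while the group is still incomplete.
    List<Child> accept(Child child) {
        List<Child> group = groups.computeIfAbsent(child.correlationId, k -> new ArrayList<>());
        group.add(child);
        if (group.size() == child.sequenceSize) {
            groups.remove(child.correlationId);
            return group;
        }
        return null;
    }

    public static void main(String[] args) {
        SequenceAggregationSketch sketch = new SequenceAggregationSketch();

        // First of two children: the group is not yet complete
        List<Child> first = sketch.accept(new Child("car-1", 1, 2, "INSP"));
        System.out.println("Released after 1st child: " + (first != null));

        // Second of two children: the group is complete and gets released
        List<Child> second = sketch.accept(new Child("car-1", 2, 2, "RECO"));
        System.out.println("Released after 2nd child: " + (second != null));
    }
}
```

In the car service flow, the released group corresponds to the list of messages handed to the aggregator method, which then merges them into the master message.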
To execute the code from Listing.78 with the argument from Listing.105, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p6.CarServiceMainXml" -Dexec.args="p8/CarService6.xml"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- 2021-05-22 14:39:35:415 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-05-22 14:39:35:419 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-05-22 14:39:35:426 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 
2021-05-22 14:39:35:602 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class) WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release 2021-05-22 14:39:35:780 [com.polarsparc.si.p6.CarServiceMainXml.main()] WARN org.springframework.integration.config.ReleaseStrategyFactoryBean - No ReleaseStrategy annotated method found on CarServiceAggregator; falling back to SimpleSequenceSizeReleaseStrategy, target: com.polarsparc.si.p8.CarServiceAggregator@62681db5, methodName: null 2021-05-22 14:39:35:837 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inChannel' channel 2021-05-22 14:39:35:838 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.inChannel' has 1 subscriber(s). 
2021-05-22 14:39:35:838 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' 2021-05-22 14:39:35:839 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {splitter} as a subscriber to the 'splitChannel' channel 2021-05-22 14:39:35:840 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.splitChannel' has 1 subscriber(s). 2021-05-22 14:39:35:841 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1' 2021-05-22 14:39:35:842 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {router} as a subscriber to the 'routerChannel' channel 2021-05-22 14:39:35:842 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.routerChannel' has 1 subscriber(s). 
2021-05-22 14:39:35:843 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2' 2021-05-22 14:39:35:844 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inspChannel' channel 2021-05-22 14:39:35:844 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.inspChannel' has 1 subscriber(s). 2021-05-22 14:39:35:845 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3' 2021-05-22 14:39:35:846 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'recoChannel' channel 2021-05-22 14:39:35:846 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.recoChannel' has 1 subscriber(s). 
2021-05-22 14:39:35:847 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4' 2021-05-22 14:39:35:847 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {aggregator} as a subscriber to the 'aggChannel' channel 2021-05-22 14:39:35:848 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.aggChannel' has 1 subscriber(s). 2021-05-22 14:39:35:849 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5' 2021-05-22 14:39:35:849 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {stream:outbound-channel-adapter(character)} as a subscriber to the 'outChannel' channel 2021-05-22 14:39:35:850 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.outChannel' has 1 subscriber(s). 
2021-05-22 14:39:35:850 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#6'; defined in: 'class path resource [p8/CarService6.xml]'; from source: ''stream:stdout-channel-adapter'' 2021-05-22 14:39:35:851 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-05-22 14:39:35:852 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@7504efe4.errorChannel' has 1 subscriber(s). 2021-05-22 14:39:35:853 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-05-22 14:39:35:854 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#serviceCar(Car)' 2021-05-22 14:39:35:854 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway' 2021-05-22 14:39:35:867 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 
2021-05-22 14:39:35:879 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.RefNoHandler - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=0f21c6dc-5f59-90ea-cd26-3f777ec0033a, timestamp=1621708775866}] (in Xml) 2021-05-22 14:39:35:881 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:39:35:882 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarServiceSplitter - Inspection message: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=0f21c6dc-5f59-90ea-cd26-3f777ec0033a, timestamp=1621708775866}] (in Xml) 2021-05-22 14:39:35:883 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarServiceSplitter - Recommended message: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=0f21c6dc-5f59-90ea-cd26-3f777ec0033a, timestamp=1621708775866}] (in Xml) 2021-05-22 14:39:35:887 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 
2021-05-22 14:39:35:888 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarInspectionHandler - Input: GenericMessage [payload=Car(ref=null, make=Toyota, miles=30000, services={HEADLIGHT_LAMP=37.95}), headers={replyChannel=nullChannel, sequenceNumber=1, sequenceSize=2, correlationId=0f21c6dc-5f59-90ea-cd26-3f777ec0033a, id=f026e3a7-2490-8408-66c6-aff7f2386433, SERVICE_TYPE=INSP, timestamp=1621708775887}] (in Xml) 2021-05-22 14:39:35:899 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-22 14:39:35:900 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.CarServiceHandler - Input: GenericMessage [payload=Car(ref=null, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, sequenceNumber=2, sequenceSize=2, correlationId=0f21c6dc-5f59-90ea-cd26-3f777ec0033a, id=cfe22e83-706c-d77f-5716-33660c61ab8e, SERVICE_TYPE=RECO, timestamp=1621708775899}] (in Xml) 2021-05-22 14:39:35:901 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 
2021-05-22 14:39:35:904 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarServiceAggregator - Task # 0: GenericMessage [payload=Car(ref=null, make=Toyota, miles=30000, services={HEADLIGHT_LAMP=37.95}), headers={replyChannel=nullChannel, sequenceNumber=1, sequenceSize=2, correlationId=0f21c6dc-5f59-90ea-cd26-3f777ec0033a, id=f026e3a7-2490-8408-66c6-aff7f2386433, SERVICE_TYPE=INSP, timestamp=1621708775887}] (in Xml) 2021-05-22 14:39:35:905 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarServiceAggregator - Task # 1: GenericMessage [payload=Car(ref=null, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, sequenceNumber=2, sequenceSize=2, correlationId=0f21c6dc-5f59-90ea-cd26-3f777ec0033a, id=cfe22e83-706c-d77f-5716-33660c61ab8e, SERVICE_TYPE=RECO, timestamp=1621708775899}] (in Xml) 2021-05-22 14:39:35:906 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarServiceAggregator - Final service: GenericMessage [payload=Car(ref=null, make=Toyota, miles=30000, services={AIR_FILTER=19.95, HEADLIGHT_LAMP=37.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, sequenceNumber=1, sequenceSize=2, correlationId=0f21c6dc-5f59-90ea-cd26-3f777ec0033a, id=f026e3a7-2490-8408-66c6-aff7f2386433, SERVICE_TYPE=INSP, timestamp=1621708775887}] (in Xml) Car(ref=null, make=Toyota, miles=30000, services={AIR_FILTER=19.95, HEADLIGHT_LAMP=37.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}) 2021-05-22 14:39:35:908 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.RefNoHandler - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=691d4c13-8772-e209-1ff3-e70f23e328fb, timestamp=1621708775908}] (in Xml) 2021-05-22 14:39:35:909 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarServiceSplitter - 
Inspection message: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=691d4c13-8772-e209-1ff3-e70f23e328fb, timestamp=1621708775908}] (in Xml) 2021-05-22 14:39:35:910 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarServiceSplitter - Recommended message: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=691d4c13-8772-e209-1ff3-e70f23e328fb, timestamp=1621708775908}] (in Xml) 2021-05-22 14:39:35:911 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarInspectionHandler - Input: GenericMessage [payload=Car(ref=null, make=Honda, miles=5000, services={HEADLIGHT_LAMP=37.95}), headers={replyChannel=nullChannel, sequenceNumber=1, sequenceSize=2, correlationId=691d4c13-8772-e209-1ff3-e70f23e328fb, id=924e2da3-cac2-1ed5-4390-b5e5057ee842, SERVICE_TYPE=INSP, timestamp=1621708775911}] (in Xml) 2021-05-22 14:39:35:912 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p6.CarServiceHandler - Input: GenericMessage [payload=Car(ref=null, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, sequenceNumber=2, sequenceSize=2, correlationId=691d4c13-8772-e209-1ff3-e70f23e328fb, id=d356132b-ca9c-44c2-2ae1-14bbff3fb9b2, SERVICE_TYPE=RECO, timestamp=1621708775912}] (in Xml) 2021-05-22 14:39:35:914 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarServiceAggregator - Task # 0: GenericMessage [payload=Car(ref=null, make=Honda, miles=5000, services={HEADLIGHT_LAMP=37.95}), headers={replyChannel=nullChannel, sequenceNumber=1, sequenceSize=2, correlationId=691d4c13-8772-e209-1ff3-e70f23e328fb, id=924e2da3-cac2-1ed5-4390-b5e5057ee842, SERVICE_TYPE=INSP, timestamp=1621708775911}] (in Xml) 2021-05-22 14:39:35:915 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarServiceAggregator - Task 
# 1: GenericMessage [payload=Car(ref=null, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, sequenceNumber=2, sequenceSize=2, correlationId=691d4c13-8772-e209-1ff3-e70f23e328fb, id=d356132b-ca9c-44c2-2ae1-14bbff3fb9b2, SERVICE_TYPE=RECO, timestamp=1621708775912}] (in Xml) 2021-05-22 14:39:35:916 [com.polarsparc.si.p6.CarServiceMainXml.main()] INFO com.polarsparc.si.p8.CarServiceAggregator - Final service: GenericMessage [payload=Car(ref=null, make=Honda, miles=5000, services={AIR_FILTER=19.95, HEADLIGHT_LAMP=37.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, sequenceNumber=1, sequenceSize=2, correlationId=691d4c13-8772-e209-1ff3-e70f23e328fb, id=924e2da3-cac2-1ed5-4390-b5e5057ee842, SERVICE_TYPE=INSP, timestamp=1621708775911}] (in Xml) Car(ref=null, make=Honda, miles=5000, services={AIR_FILTER=19.95, HEADLIGHT_LAMP=37.95, OIL_CHANGE=29.95}) [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 1.452 s [INFO] Finished at: 2021-05-22T14:39:35-04:00 [INFO] ------------------------------------------------------------------------
Java Config based Approach |
The following is the Java Config based POJO that aggregates the incoming messages into a single message by merging the services from all the incoming messages into the first one:
/*
 * Name:   CarServiceAggregator2
 * Author: Bhaskar S
 * Date:   05/22/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p8;

import com.polarsparc.si.p6.Car;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Aggregator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;

import java.util.List;

@Configuration
@EnableIntegration
public class CarServiceAggregator2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(CarServiceAggregator2.class);

    @Aggregator(inputChannel = "aggChannel", outputChannel = "outChannel")
    public Message<Car> aggregator(List<Message<Car>> tasks) {
        if (tasks != null && tasks.size() > 0) {
            Message<Car> service = tasks.get(0);

            LOGGER.info("Task # 0: {} (in Xml)", service.toString());

            for (int i = 1; i < tasks.size(); i++) {
                Message<Car> next = tasks.get(i);

                LOGGER.info("Task # {}: {} (in Xml)", i, next.toString());

                service.getPayload()
                        .getServices()
                        .putAll(next.getPayload().getServices());
            }

            LOGGER.info("Final service: {} (in Xml)", service.toString());

            return service;
        }

        return null;
    }
}
The following is the Java Config based POJO that defines the channels, the splitter, the router, the aggregator, and the endpoints:
/*
 * Name: CarServiceConfig6
 * Author: Bhaskar S
 * Date: 05/22/2021
 * Blog: https://www.polarsparc.com
 */

package com.polarsparc.si.p8;

import com.polarsparc.si.p6.CarServiceHandler2;
import com.polarsparc.si.p6.RefNoHandler2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.handler.MethodInvokingMessageProcessor;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.router.HeaderValueRouter;
import org.springframework.integration.stream.CharacterStreamWritingMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
@EnableIntegration
public class CarServiceConfig6 {
    private RefNoHandler2 refNoHandler2;
    private CarInspectionHandler2 carInspectionHandler2;
    private CarServiceHandler2 carServiceHandler2;

    @Autowired
    public void setRefNoHandler2(RefNoHandler2 refNoHandler2) {
        this.refNoHandler2 = refNoHandler2;
    }

    @Autowired
    public void setCarInspectionHandler2(CarInspectionHandler2 carInspectionHandler2) {
        this.carInspectionHandler2 = carInspectionHandler2;
    }

    @Autowired
    public void setCarServiceHandler2(CarServiceHandler2 carServiceHandler2) {
        this.carServiceHandler2 = carServiceHandler2;
    }

    @Bean
    public MessageChannel inChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel splitChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel routerChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel inspChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel recoChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel aggChannel() {
        return new DirectChannel();
    }

    /* --- BEGIN: For RefNoHandler2 --- */

    @Bean
    public MethodInvokingMessageProcessor<?> refNoMessageHandler2() {
        return new MethodInvokingMessageProcessor<>(refNoHandler2, "assignRefNo");
    }

    @Bean
    @ServiceActivator(inputChannel = "inChannel")
    public MessageHandler refNoEndpoint2() {
        ServiceActivatingHandler handler = new ServiceActivatingHandler(refNoMessageHandler2());
        handler.setOutputChannel(splitChannel());
        return handler;
    }

    /* --- END: For RefNoHandler2 --- */

    /* --- BEGIN: For CarInspectionHandler2 --- */

    @Bean
    public MethodInvokingMessageProcessor<?> carInspectionMessageHandler2() {
        return new MethodInvokingMessageProcessor<>(carInspectionHandler2, "inspectionServices");
    }

    @Bean
    @ServiceActivator(inputChannel = "inspChannel")
    public MessageHandler carInspectionEndpoint2() {
        ServiceActivatingHandler handler = new ServiceActivatingHandler(carInspectionMessageHandler2());
        handler.setOutputChannel(aggChannel());
        return handler;
    }

    /* --- END: For CarInspectionHandler2 --- */

    /* --- BEGIN: For CarServiceHandler2 --- */

    @Bean
    public MethodInvokingMessageProcessor<?> carServiceMessageHandler2() {
        return new MethodInvokingMessageProcessor<>(carServiceHandler2, "recommendedServices");
    }

    @Bean
    @ServiceActivator(inputChannel = "recoChannel")
    public MessageHandler carServiceEndpoint2() {
        ServiceActivatingHandler handler = new ServiceActivatingHandler(carServiceMessageHandler2());
        handler.setOutputChannel(aggChannel());
        return handler;
    }

    /* --- END: For CarServiceHandler2 --- */

    @Bean
    @ServiceActivator(inputChannel = "routerChannel")
    public HeaderValueRouter router() {
        HeaderValueRouter router = new HeaderValueRouter("SERVICE_TYPE");
        router.setChannelMapping("INSP", "inspChannel");
        router.setChannelMapping("RECO", "recoChannel");
        return router;
    }

    @Bean
    @ServiceActivator(inputChannel = "outChannel")
    public MessageHandler stdoutAdapter() {
        CharacterStreamWritingMessageHandler handler = CharacterStreamWritingMessageHandler.stdout();
        handler.setShouldAppendNewLine(true);
        return handler;
    }
}
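The router bean in the configuration above maps the value of the SERVICE_TYPE header to a channel name. As a rough illustration of that lookup only, the following plain Java sketch (no Spring dependency) models the two mappings with an ordinary map. The class HeaderRoutingSketch and the method resolveChannel are hypothetical names invented for this illustration, and the sketch does not attempt to reproduce how the real HeaderValueRouter handles a missing or unmapped header value:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class HeaderRoutingSketch {
    // Hypothetical stand-in for the router's header-value to channel-name mappings
    private static final Map<String, String> CHANNEL_MAPPING = new HashMap<>();

    static {
        CHANNEL_MAPPING.put("INSP", "inspChannel");
        CHANNEL_MAPPING.put("RECO", "recoChannel");
    }

    // Looks up the SERVICE_TYPE header and returns the mapped channel name.
    // Returning empty for a missing or unmapped value is a simplification
    // made for this sketch, not a claim about Spring Integration behavior.
    public static Optional<String> resolveChannel(Map<String, Object> headers) {
        Object value = headers.get("SERVICE_TYPE");
        if (value == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(CHANNEL_MAPPING.get(value.toString()));
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("SERVICE_TYPE", "INSP");
        System.out.println("INSP routes to: " + resolveChannel(headers).orElse("(none)"));

        headers.put("SERVICE_TYPE", "RECO");
        System.out.println("RECO routes to: " + resolveChannel(headers).orElse("(none)"));
    }
}
```

In the actual flow, the splitter is what stamps each outgoing message with SERVICE_TYPE, so the router needs no knowledge of the payload to pick a channel.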
Finally, the following main application uses the POJOs from Listing.79, Listing.80, Listing.81, Listing.100, Listing.101, Listing.106, and Listing.107 to test the hypothetical car service with the splitter and the aggregator:
/*
 * Name: CarServiceMainConfig6
 * Author: Bhaskar S
 * Date: 05/22/2021
 * Blog: https://www.polarsparc.com
 */

package com.polarsparc.si.p8;

import com.polarsparc.si.p6.Car;
import com.polarsparc.si.p6.CarServiceGateway2;
import com.polarsparc.si.p6.CarServiceHandler2;
import com.polarsparc.si.p6.RefNoHandler2;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class CarServiceMainConfig6 {
    public static void main(String[] args) {
        ApplicationContext context = new AnnotationConfigApplicationContext(RefNoHandler2.class,
                CarServiceSplitter2.class,
                CarInspectionHandler2.class,
                CarServiceHandler2.class,
                CarServiceGateway2.class,
                CarServiceAggregator2.class,
                CarServiceConfig6.class);

        CarServiceGateway2 gateway = (CarServiceGateway2) context.getBean("gateway");

        gateway.serviceCar(new Car("Toyota", 30000));
        gateway.serviceCar(new Car("Honda", 5000));
    }
}
To execute the code from Listing.108, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p8.CarServiceMainConfig6"
The following would be the typical output:
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain)
WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-05-22 14:42:42:544 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-22 14:42:42:549 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-22 14:42:42:556 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-22 14:42:42:610 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-22 14:42:42:633 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-22 14:42:42:770 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-22 14:42:43:052 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-22 14:42:43:053 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@318d5f3a.errorChannel' has 1 subscriber(s).
2021-05-22 14:42:43:054 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-22 14:42:43:055 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {splitter:carServiceSplitter2.splitter.splitter} as a subscriber to the 'splitChannel' channel
2021-05-22 14:42:43:056 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@318d5f3a.splitChannel' has 1 subscriber(s).
2021-05-22 14:42:43:057 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceSplitter2.splitter.splitter'
2021-05-22 14:42:43:058 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {aggregator:carServiceAggregator2.aggregator.aggregator} as a subscriber to the 'aggChannel' channel
2021-05-22 14:42:43:059 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@318d5f3a.aggChannel' has 1 subscriber(s).
2021-05-22 14:42:43:060 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceAggregator2.aggregator.aggregator'
2021-05-22 14:42:43:061 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:carServiceConfig6.refNoEndpoint2.serviceActivator} as a subscriber to the 'inChannel' channel
2021-05-22 14:42:43:062 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@318d5f3a.inChannel' has 1 subscriber(s).
2021-05-22 14:42:43:063 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig6.refNoEndpoint2.serviceActivator'
2021-05-22 14:42:43:064 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:carServiceConfig6.carInspectionEndpoint2.serviceActivator} as a subscriber to the 'inspChannel' channel
2021-05-22 14:42:43:065 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@318d5f3a.inspChannel' has 1 subscriber(s).
2021-05-22 14:42:43:066 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig6.carInspectionEndpoint2.serviceActivator'
2021-05-22 14:42:43:067 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:carServiceConfig6.carServiceEndpoint2.serviceActivator} as a subscriber to the 'recoChannel' channel
2021-05-22 14:42:43:067 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@318d5f3a.recoChannel' has 1 subscriber(s).
2021-05-22 14:42:43:068 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig6.carServiceEndpoint2.serviceActivator'
2021-05-22 14:42:43:069 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig6.router.serviceActivator} as a subscriber to the 'routerChannel' channel
2021-05-22 14:42:43:069 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@318d5f3a.routerChannel' has 1 subscriber(s).
2021-05-22 14:42:43:070 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig6.router.serviceActivator'
2021-05-22 14:42:43:070 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:carServiceConfig6.stdoutAdapter.serviceActivator} as a subscriber to the 'outChannel' channel
2021-05-22 14:42:43:071 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@318d5f3a.outChannel' has 1 subscriber(s).
2021-05-22 14:42:43:072 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'carServiceConfig6.stdoutAdapter.serviceActivator'
2021-05-22 14:42:43:073 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#serviceCar(Car)'
2021-05-22 14:42:43:074 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway'
2021-05-22 14:42:43:087 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-22 14:42:43:107 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO com.polarsparc.si.p6.RefNoHandler2 - Input: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=b7017038-1f49-ba3e-e0c6-3c9e770546d2, timestamp=1621708963087}] (in Config)
2021-05-22 14:42:43:109 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-22 14:42:43:110 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO com.polarsparc.si.p8.CarServiceSplitter2 - Inspection message: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=b7017038-1f49-ba3e-e0c6-3c9e770546d2, timestamp=1621708963087}] (in Config)
2021-05-22 14:42:43:112 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO com.polarsparc.si.p8.CarServiceSplitter2 - Recommended message: GenericMessage [payload=Car(ref=T-00001, make=Toyota, miles=30000, services={}), headers={replyChannel=nullChannel, id=b7017038-1f49-ba3e-e0c6-3c9e770546d2, timestamp=1621708963087}] (in Config)
2021-05-22 14:42:43:116 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-22 14:42:43:118 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO com.polarsparc.si.p8.CarInspectionHandler2 - Input: GenericMessage [payload=Car(ref=null, make=Toyota, miles=30000, services={HEADLIGHT_LAMP=37.95}), headers={replyChannel=nullChannel, sequenceNumber=1, sequenceSize=2, correlationId=b7017038-1f49-ba3e-e0c6-3c9e770546d2, id=ed7d8dc4-6ed2-e7fb-a85c-9702a3420fb3, SERVICE_TYPE=INSP, timestamp=1621708963116}] (in Config)
2021-05-22 14:42:43:121 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-22 14:42:43:122 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO com.polarsparc.si.p6.CarServiceHandler2 - Input: GenericMessage [payload=Car(ref=null, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, sequenceNumber=2, sequenceSize=2, correlationId=b7017038-1f49-ba3e-e0c6-3c9e770546d2, id=ff4ed940-553e-5ad4-e6de-ae6270930565, SERVICE_TYPE=RECO, timestamp=1621708963121}] (in Config)
2021-05-22 14:42:43:123 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-22 14:42:43:126 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO com.polarsparc.si.p8.CarServiceAggregator2 - Task # 0: GenericMessage [payload=Car(ref=null, make=Toyota, miles=30000, services={HEADLIGHT_LAMP=37.95}), headers={replyChannel=nullChannel, sequenceNumber=1, sequenceSize=2, correlationId=b7017038-1f49-ba3e-e0c6-3c9e770546d2, id=ed7d8dc4-6ed2-e7fb-a85c-9702a3420fb3, SERVICE_TYPE=INSP, timestamp=1621708963116}] (in Xml)
2021-05-22 14:42:43:127 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO com.polarsparc.si.p8.CarServiceAggregator2 - Task # 1: GenericMessage [payload=Car(ref=null, make=Toyota, miles=30000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, sequenceNumber=2, sequenceSize=2, correlationId=b7017038-1f49-ba3e-e0c6-3c9e770546d2, id=ff4ed940-553e-5ad4-e6de-ae6270930565, SERVICE_TYPE=RECO, timestamp=1621708963121}] (in Xml)
2021-05-22 14:42:43:128 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO com.polarsparc.si.p8.CarServiceAggregator2 - Final service: GenericMessage [payload=Car(ref=null, make=Toyota, miles=30000, services={AIR_FILTER=19.95, HEADLIGHT_LAMP=37.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95}), headers={replyChannel=nullChannel, sequenceNumber=1, sequenceSize=2, correlationId=b7017038-1f49-ba3e-e0c6-3c9e770546d2, id=ed7d8dc4-6ed2-e7fb-a85c-9702a3420fb3, SERVICE_TYPE=INSP, timestamp=1621708963116}] (in Xml)
Car(ref=null, make=Toyota, miles=30000, services={AIR_FILTER=19.95, HEADLIGHT_LAMP=37.95, OIL_CHANGE=29.95, WHEEL_BALANCE=69.95})
2021-05-22 14:42:43:130 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO com.polarsparc.si.p6.RefNoHandler2 - Input: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=8a33f4e6-0740-2d88-ddde-fa98417c5fde, timestamp=1621708963130}] (in Config)
2021-05-22 14:42:43:131 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO com.polarsparc.si.p8.CarServiceSplitter2 - Inspection message: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=8a33f4e6-0740-2d88-ddde-fa98417c5fde, timestamp=1621708963130}] (in Config)
2021-05-22 14:42:43:132 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO com.polarsparc.si.p8.CarServiceSplitter2 - Recommended message: GenericMessage [payload=Car(ref=H-00002, make=Honda, miles=5000, services={}), headers={replyChannel=nullChannel, id=8a33f4e6-0740-2d88-ddde-fa98417c5fde, timestamp=1621708963130}] (in Config)
2021-05-22 14:42:43:133 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO com.polarsparc.si.p8.CarInspectionHandler2 - Input: GenericMessage [payload=Car(ref=null, make=Honda, miles=5000, services={WIPERS=21.95}), headers={replyChannel=nullChannel, sequenceNumber=1, sequenceSize=2, correlationId=8a33f4e6-0740-2d88-ddde-fa98417c5fde, id=80f3b962-c51f-6dfa-c1fb-73bb97f11369, SERVICE_TYPE=INSP, timestamp=1621708963133}] (in Config)
2021-05-22 14:42:43:135 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO com.polarsparc.si.p6.CarServiceHandler2 - Input: GenericMessage [payload=Car(ref=null, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, sequenceNumber=2, sequenceSize=2, correlationId=8a33f4e6-0740-2d88-ddde-fa98417c5fde, id=079461e7-9d64-7962-c72d-97bf147d19a7, SERVICE_TYPE=RECO, timestamp=1621708963134}] (in Config)
2021-05-22 14:42:43:136 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO com.polarsparc.si.p8.CarServiceAggregator2 - Task # 0: GenericMessage [payload=Car(ref=null, make=Honda, miles=5000, services={WIPERS=21.95}), headers={replyChannel=nullChannel, sequenceNumber=1, sequenceSize=2, correlationId=8a33f4e6-0740-2d88-ddde-fa98417c5fde, id=80f3b962-c51f-6dfa-c1fb-73bb97f11369, SERVICE_TYPE=INSP, timestamp=1621708963133}] (in Xml)
2021-05-22 14:42:43:137 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO com.polarsparc.si.p8.CarServiceAggregator2 - Task # 1: GenericMessage [payload=Car(ref=null, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95}), headers={replyChannel=nullChannel, sequenceNumber=2, sequenceSize=2, correlationId=8a33f4e6-0740-2d88-ddde-fa98417c5fde, id=079461e7-9d64-7962-c72d-97bf147d19a7, SERVICE_TYPE=RECO, timestamp=1621708963134}] (in Xml)
2021-05-22 14:42:43:138 [com.polarsparc.si.p8.CarServiceMainConfig6.main()] INFO com.polarsparc.si.p8.CarServiceAggregator2 - Final service: GenericMessage [payload=Car(ref=null, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WIPERS=21.95}), headers={replyChannel=nullChannel, sequenceNumber=1, sequenceSize=2, correlationId=8a33f4e6-0740-2d88-ddde-fa98417c5fde, id=80f3b962-c51f-6dfa-c1fb-73bb97f11369, SERVICE_TYPE=INSP, timestamp=1621708963133}] (in Xml)
Car(ref=null, make=Honda, miles=5000, services={AIR_FILTER=19.95, OIL_CHANGE=29.95, WIPERS=21.95})
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.532 s
[INFO] Finished at: 2021-05-22T14:42:43-04:00
[INFO] ------------------------------------------------------------------------
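In the output above, each car produces two messages that share a correlationId and carry sequenceSize=2, and the final message appears only after both have reached the aggregator. The following plain Java sketch (no Spring dependency) imitates that grouping and merging so the mechanics are easier to follow. The class SplitAggregateSketch, the nested Part type, and the accept method are hypothetical names invented for this illustration; the real aggregator relies on configurable correlation and release strategies, so treat this as a simplified model rather than the framework's implementation:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

public class SplitAggregateSketch {
    // Hypothetical stand-in for one split message: correlation headers plus the services payload
    public static final class Part {
        final String correlationId;
        final int sequenceSize;
        final Map<String, Float> services;

        public Part(String correlationId, int sequenceSize, Map<String, Float> services) {
            this.correlationId = correlationId;
            this.sequenceSize = sequenceSize;
            this.services = services;
        }
    }

    // Parts received so far, grouped by correlation id
    private final Map<String, List<Part>> groups = new HashMap<>();

    // Stores the part; returns the merged services once the whole group
    // has arrived, otherwise returns empty
    public Optional<Map<String, Float>> accept(Part part) {
        List<Part> group = groups.computeIfAbsent(part.correlationId, k -> new ArrayList<>());
        group.add(part);
        if (group.size() < part.sequenceSize) {
            return Optional.empty();
        }
        groups.remove(part.correlationId);
        // Sorted map chosen here only to give a stable key order when printing
        Map<String, Float> merged = new TreeMap<>();
        for (Part p : group) {
            merged.putAll(p.services);
        }
        return Optional.of(merged);
    }

    public static void main(String[] args) {
        SplitAggregateSketch aggregator = new SplitAggregateSketch();

        // Service values borrowed from the Toyota run in the output above
        Map<String, Float> inspection = new HashMap<>();
        inspection.put("HEADLIGHT_LAMP", 37.95f);

        Map<String, Float> recommended = new HashMap<>();
        recommended.put("AIR_FILTER", 19.95f);
        recommended.put("OIL_CHANGE", 29.95f);
        recommended.put("WHEEL_BALANCE", 69.95f);

        // The first part alone does not complete the group
        System.out.println("Released after part 1? "
                + aggregator.accept(new Part("toyota", 2, inspection)).isPresent());

        // The second part completes the group and releases the merged services
        System.out.println("Merged services: "
                + aggregator.accept(new Part("toyota", 2, recommended)).get());
    }
}
```

Keying the groups on the correlation identifier is what keeps the Toyota and Honda parts from being mixed even if their messages were to interleave.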
References
Spring Integration Notes :: Part - 7
Spring Integration Notes :: Part - 6
Spring Integration Notes :: Part - 5
Spring Integration Notes :: Part - 4
Spring Integration Notes :: Part - 3
Spring Integration Notes :: Part - 2
Spring Integration Notes :: Part - 1