PolarSPARC
Spring Integration Notes :: Part - 4
Bhaskar S | 05/08/2021 (UPDATED)
Overview
In Part-3, we covered basic examples of Spring Integration relating to the File and SFTP channel adapters.
We will continue our journey on channel adapters by exploring examples in Spring Integration relating to the following:
Polling a Database for Records
Invoking a REST Service using HTTP
Setup
To set up the Java directory structure for the demonstrations in this part, execute the following commands:
$ cd $HOME/java/SpringIntegration
$ mkdir -p src/main/java/com/polarsparc/si/p4
$ mkdir -p src/main/resources/p4
To set up the directory structure for the database server, execute the following command:
$ mkdir -p $HOME/Downloads/DATA/postgres
To download the required docker image for the PostgreSQL database server, execute the following command:
$ docker pull postgres:13.2
The following is the listing for the updated Maven project file pom.xml to support the jdbc and http channel adapters:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.polarsparc.si</groupId>
  <artifactId>SpringIntegration</artifactId>
  <version>1.0</version>
  <name>SpringIntegration</name>
  <description>Spring Integration Examples</description>

  <properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
  </properties>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.1</version>
          <configuration>
            <fork>true</fork>
            <meminitial>128m</meminitial>
            <maxmem>512m</maxmem>
            <source>11</source>
            <target>11</target>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <dependencies>
    <dependency>
      <groupId>javax.annotation</groupId>
      <artifactId>javax.annotation-api</artifactId>
      <version>1.3.2</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-core</artifactId>
      <version>5.3.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>5.3.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-beans</artifactId>
      <version>5.3.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
      <version>5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-file</artifactId>
      <version>5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-sftp</artifactId>
      <version>5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-jdbc</artifactId>
      <version>5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-http</artifactId>
      <version>5.4.5</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.30</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>log4j-over-slf4j</artifactId>
      <version>1.7.30</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.30</version>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>42.2.19</version>
    </dependency>
  </dependencies>
</project>
Hands-on Spring Integration
Polling a Database for Records
For this example, one needs access to a relational database, hence we will start a PostgreSQL database server on the localhost using docker. Open a terminal window and execute the following command:
$ docker run -d --rm --name postgres-13.2 -e POSTGRES_USER=polarsparc -e POSTGRES_PASSWORD=polarsparc\$123 -p 5432:5432 -v $HOME/Downloads/DATA/postgres:/var/lib/postgresql/data postgres:13.2
To create a database called my_test_db, first open a shell inside the running container by executing the following command in the terminal:
$ docker exec -it postgres-13.2 sh
The prompt changes to #. Continue by executing the following command:
# psql -U polarsparc
The prompt changes to polarsparc=#. Continue by executing the following commands:
polarsparc=# CREATE DATABASE my_test_db;
polarsparc=# GRANT ALL PRIVILEGES ON DATABASE my_test_db TO polarsparc;
polarsparc=# \q
The prompt changes back to #. Continue by executing the following command:
# psql my_test_db -U polarsparc
The prompt changes to my_test_db=>. Continue by executing the following commands:
my_test_db=> CREATE TABLE ORDERS_TBL (ORDER_NO VARCHAR(10) NOT NULL, ITEM VARCHAR(100) NOT NULL, SHIPPED CHAR(1) DEFAULT 'N', PRIMARY KEY (ORDER_NO));
my_test_db=> \q
The prompt changes back to #. Execute the following command to leave the container shell:
# exit
The following is the properties file jdbc.properties located in the resources/p4 directory:
#
# Properties for jdbc processing
#

jdbc.driver.class=org.postgresql.Driver
jdbc.url=jdbc:postgresql://localhost:5432/my_test_db
jdbc.username=polarsparc
jdbc.password=polarsparc$123

query.orders=SELECT * FROM ORDERS_TBL WHERE SHIPPED = 'N'
update.orders=UPDATE ORDERS_TBL SET SHIPPED = 'Y' WHERE ORDER_NO IN (:ORDER_NO)
XML based Approach
The following is the database handler POJO that displays the rows retrieved from the database:
/*
 * Name:   DbProcessHandler
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

public class DbProcessHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(DbProcessHandler.class);

    private static final String ORDER_NO = "ORDER_NO";
    private static final String ITEM = "ITEM";

    public void handler(List<Map<String, Object>> list) {
        LOGGER.info("No of records to process: {}", list.size());

        for (Map<String, Object> rec : list) {
            LOGGER.info("Processed order {} for item {}", rec.get(ORDER_NO), rec.get(ITEM));
        }
    }
}
The following is the XML based Spring Integration configuration that wires up the channels, the jdbc channel adapter, and the endpoint:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:jdbc="http://www.springframework.org/schema/integration/jdbc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/integration
                           http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/integration/jdbc
                           http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd">

    <context:property-placeholder location="classpath:p4/jdbc.properties" />

    <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="${jdbc.driver.class}" />
        <property name="url" value="${jdbc.url}" />
        <property name="username" value="${jdbc.username}" />
        <property name="password" value="${jdbc.password}" />
    </bean>

    <bean id="inDbHandler" class="com.polarsparc.si.p4.DbProcessHandler" />

    <jdbc:inbound-channel-adapter channel="inChannel"
                                  data-source="dataSource"
                                  query="${query.orders}"
                                  update="${update.orders}"
                                  max-rows-per-poll="5">
        <int:poller id="poller" fixed-delay="10000" />
    </jdbc:inbound-channel-adapter>

    <int:service-activator input-channel="inChannel" ref="inDbHandler" method="handler" />
</beans>
Some aspects of Listing.46 above need a little explanation.
The Spring Framework class org.springframework.jdbc.datasource.DriverManagerDataSource defines a simple JDBC data source. Under the hood, it opens a new connection to the specified database each time one is requested (there is no connection pooling).
The jdbc inbound-channel-adapter element uses the data source specified by the attribute data-source to execute the SELECT statement specified by the attribute query.
The result set from the SELECT query is converted into a message whose payload is a java.util.List (the list of rows). By default, each row from the SELECT query is mapped to a Java Map<String, Object> keyed by column name.
The attribute update specifies an UPDATE statement that is used to mark the rows as processed, so that they will not appear in the next poll. The UPDATE statement can be parameterized using the naming convention :column-name.
The attribute max-rows-per-poll specifies the maximum number of rows that can be returned on each poll.
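To make the poll cycle described above a little more concrete, the following is a minimal plain-Java sketch that imitates it with an in-memory list standing in for ORDERS_TBL. It is not how Spring Integration is implemented; the class PollCycleSketch and its methods insert and poll are hypothetical names used only for illustration. Each poll selects at most the given number of rows that have SHIPPED = 'N', returns them as a list of maps, and then marks those same rows with SHIPPED = 'Y':

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for ORDERS_TBL; no database or Spring classes are involved.
class PollCycleSketch {
    // Each row is a column-name -> value map, mirroring the default row mapping.
    private final List<Map<String, Object>> table = new ArrayList<>();

    void insert(String orderNo, String item) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("ORDER_NO", orderNo);
        row.put("ITEM", item);
        row.put("SHIPPED", "N");
        table.add(row);
    }

    // One poll: select up to maxRows unshipped rows, then mark exactly those rows as shipped.
    List<Map<String, Object>> poll(int maxRows) {
        List<Map<String, Object>> payload = new ArrayList<>();
        for (Map<String, Object> row : table) {
            if (payload.size() == maxRows) {
                break;
            }
            if ("N".equals(row.get("SHIPPED"))) {
                payload.add(new LinkedHashMap<>(row)); // snapshot of the row as read by the SELECT
            }
        }
        // Stand-in for: UPDATE ORDERS_TBL SET SHIPPED = 'Y' WHERE ORDER_NO IN (:ORDER_NO)
        for (Map<String, Object> selected : payload) {
            for (Map<String, Object> row : table) {
                if (row.get("ORDER_NO").equals(selected.get("ORDER_NO"))) {
                    row.put("SHIPPED", "Y");
                }
            }
        }
        return payload;
    }

    public static void main(String[] args) {
        PollCycleSketch db = new PollCycleSketch();
        for (int i = 1; i <= 7; i++) {
            db.insert(String.valueOf(i), "item-" + i);
        }
        System.out.println(db.poll(5).size()); // first poll is capped by maxRows
        System.out.println(db.poll(5).size()); // second poll sees only the remaining rows
        System.out.println(db.poll(5).size()); // nothing left to process
    }
}
```

With 7 unshipped rows and a limit of 5, the three polls in main return 5, 2 and 0 rows respectively, which is why a row is delivered to the handler only once.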
The following is our main application to test the jdbc channel adapter:
/*
 * Name:   JdbcProcessMainXml
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class JdbcProcessMainXml {
    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("p4/JdbcProcess.xml");
    }
}
To execute the code from Listing.47, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p4.JdbcProcessMainXml"
The following would be the typical output:
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-08 14:58:16:681 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-08 14:58:16:686 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-08 14:58:16:690 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-08 14:58:16:885 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-08 14:58:16:910 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inChannel' channel
2021-05-08 14:58:16:910 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@3b71ca90.inChannel' has 1 subscriber(s).
2021-05-08 14:58:16:911 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2021-05-08 14:58:16:912 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-08 14:58:16:913 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@3b71ca90.errorChannel' has 1 subscriber(s).
2021-05-08 14:58:16:913 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-08 14:58:16:916 [com.polarsparc.si.p4.JdbcProcessMainXml.main()] INFO org.springframework.integration.endpoint.SourcePollingChannelAdapter - started bean 'org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0'; defined in: 'class path resource [p4/JdbcProcess.xml]'; from source: ''jdbc:inbound-channel-adapter''
Execute the following SQL commands using a database client such as DBeaver:
INSERT INTO ORDERS_TBL (ORDER_NO, ITEM, SHIPPED) VALUES('1', 'iPad Air 128G WiFi', 'N');
INSERT INTO ORDERS_TBL (ORDER_NO, ITEM, SHIPPED) VALUES('2', 'Roku Express 4K+', 'N');
INSERT INTO ORDERS_TBL (ORDER_NO, ITEM, SHIPPED) VALUES('3', '128GB MicroSD Card', 'N');
We will see the following update in the terminal output:
2021-05-08 14:59:47:113 [task-scheduler-3] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-08 14:59:47:118 [task-scheduler-3] INFO com.polarsparc.si.p4.DbProcessHandler - No of records to process: 3
2021-05-08 14:59:47:118 [task-scheduler-3] INFO com.polarsparc.si.p4.DbProcessHandler - Processed order 1 for item iPad Air 128G WiFi
2021-05-08 14:59:47:119 [task-scheduler-3] INFO com.polarsparc.si.p4.DbProcessHandler - Processed order 2 for item Roku Express 4K+
2021-05-08 14:59:47:119 [task-scheduler-3] INFO com.polarsparc.si.p4.DbProcessHandler - Processed order 3 for item 128GB MicroSD Card
The rows in the database are automatically updated (the column SHIPPED is set to 'Y').
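Notice that the UPDATE statement in jdbc.properties has a single named parameter :ORDER_NO inside the IN clause, yet all three rows were updated. Spring's named-parameter JDBC support expands a collection value bound to a named parameter into one placeholder per element. The following is a deliberately simplified sketch of that idea and not Spring's actual implementation; the class NamedParamExpansionSketch and its method expand are hypothetical names:

```java
import java.util.Collections;
import java.util.List;

class NamedParamExpansionSketch {
    // Replaces ":name" with one "?" per value, e.g. IN (:ORDER_NO) -> IN (?, ?, ?).
    // Naive on purpose: it does not handle parameter names that share a prefix.
    static String expand(String sql, String name, List<?> values) {
        String placeholders = String.join(", ", Collections.nCopies(values.size(), "?"));
        return sql.replace(":" + name, placeholders);
    }

    public static void main(String[] args) {
        String sql = "UPDATE ORDERS_TBL SET SHIPPED = 'Y' WHERE ORDER_NO IN (:ORDER_NO)";
        // The three ORDER_NO values come from the rows returned by the SELECT.
        System.out.println(expand(sql, "ORDER_NO", List.of("1", "2", "3")));
    }
}
```

For the three orders inserted above, main prints the statement with IN (?, ?, ?), and the ORDER_NO values of the polled rows would then be bound to those placeholders.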
Java Config based Approach
The following is the Java Config based POJO that defines the handler to display the rows retrieved from the database:
/*
 * Name:   DbProcessHandler2
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;

import java.util.List;
import java.util.Map;

@Configuration
@EnableIntegration
public class DbProcessHandler2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(DbProcessHandler2.class);

    private static final String ORDER_NO = "ORDER_NO";
    private static final String ITEM = "ITEM";

    @ServiceActivator(inputChannel = "inChannel")
    public void handler(List<Map<String, Object>> list) {
        LOGGER.info("No of records to process: {}", list.size());

        for (Map<String, Object> rec : list) {
            LOGGER.info("Processed order {} for item {}", rec.get(ORDER_NO), rec.get(ITEM));
        }
    }
}
The following is the Java Config based POJO that refers to the external jdbc.properties file and defines the jdbc data source and the input jdbc channel adapter similar to the way defined in the XML configuration file of Listing.46 above:
/*
 * Name:   JdbcProcessConfig
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

import javax.sql.DataSource;

@Configuration
@EnableIntegration
@PropertySource("classpath:p4/jdbc.properties")
public class JdbcProcessConfig {
    @Value("${jdbc.driver.class}")
    private String jdbcDriver;

    @Value("${jdbc.url}")
    private String jdbcUrl;

    @Value("${jdbc.username}")
    private String jdbcUser;

    @Value("${jdbc.password}")
    private String jdbcPassword;

    @Value("${query.orders}")
    private String selectOrders;

    @Value("${update.orders}")
    private String updateOrders;

    @Bean
    public DataSource jdbcDataSource() {
        DriverManagerDataSource source = new DriverManagerDataSource();
        source.setDriverClassName(jdbcDriver);
        source.setUrl(jdbcUrl);
        source.setUsername(jdbcUser);
        source.setPassword(jdbcPassword);
        return source;
    }

    @Bean
    @InboundChannelAdapter(channel = "inChannel", poller = @Poller(fixedDelay = "10000"))
    public MessageSource<Object> jdbcChannelAdapter() {
        JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(jdbcDataSource(), selectOrders);
        adapter.setUpdateSql(updateOrders);
        adapter.setMaxRows(5);
        return adapter;
    }
}
Some aspects of Listing.49 above need a little explanation.
The class org.springframework.jdbc.datasource.DriverManagerDataSource provides a simple implementation of a jdbc based data source using the provided jdbc properties such as the driver class name, the database url, the connection credentials, etc.
The class org.springframework.integration.jdbc.JdbcPollingChannelAdapter implements a polling database channel adapter (using jdbc) that creates messages by querying the underlying database using the specified SELECT query. After retrieving the rows (up to the specified maximum), it executes the specified UPDATE statement.
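The poller in our configuration uses a fixed delay of 10000 milliseconds, which means the wait is measured from the end of one poll to the start of the next. The plain JDK offers the same scheduling semantics through ScheduledExecutorService.scheduleWithFixedDelay. The following is a minimal sketch of fixed-delay polling using only the JDK; the class FixedDelayPollerSketch and its method runPolls are hypothetical names, and the body of the task is only a stand-in for the real SELECT and UPDATE:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedDelayPollerSketch {
    // Runs a stand-in poll task 'polls' times with a fixed delay between runs
    // and returns the number of completed polls.
    static int runPolls(int polls, long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(polls);
        // The delay is measured from the end of one run to the start of the next.
        scheduler.scheduleWithFixedDelay(() -> {
            if (done.getCount() > 0) {
                // Stand-in for: run the SELECT, hand over the rows, run the UPDATE.
                completed.incrementAndGet();
                done.countDown();
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS); // give up after 5 seconds at the latest
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        // Three polls, 100 ms apart (our configuration uses 10000 ms).
        System.out.println(runPolls(3, 100));
    }
}
```

Because the delay starts only after a poll finishes, a slow query pushes the next poll out rather than causing polls to overlap.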
And finally, the following is the main application that uses the POJOs from Listing.48 and Listing.49 to test the jdbc channel adapter:
/*
 * Name:   JdbcProcessMainConfig
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class JdbcProcessMainConfig {
    public static void main(String[] args) {
        new AnnotationConfigApplicationContext(DbProcessHandler2.class, JdbcProcessConfig.class);
    }
}
To execute the code from Listing.50, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p4.JdbcProcessMainConfig"
The following would be the typical output:
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-08 15:45:03:965 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-08 15:45:03:970 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-08 15:45:03:975 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-08 15:45:04:014 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-08 15:45:04:019 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-08 15:45:04:235 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-08 15:45:04:313 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-08 15:45:04:314 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@4c4f6598.errorChannel' has 1 subscriber(s).
2021-05-08 15:45:04:315 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-08 15:45:04:315 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:dbProcessHandler2.handler.serviceActivator} as a subscriber to the 'inChannel' channel
2021-05-08 15:45:04:316 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@4c4f6598.inChannel' has 1 subscriber(s).
2021-05-08 15:45:04:317 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'dbProcessHandler2.handler.serviceActivator'
2021-05-08 15:45:04:320 [com.polarsparc.si.p4.JdbcProcessMainConfig.main()] INFO org.springframework.integration.endpoint.SourcePollingChannelAdapter - started bean 'jdbcProcessConfig.jdbcChannelAdapter.inboundChannelAdapter'
Execute the following SQL commands using a database client such as DBeaver:
DELETE FROM ORDERS_TBL;
INSERT INTO ORDERS_TBL (ORDER_NO, ITEM, SHIPPED) VALUES('1', 'iPad Air 128G WiFi', 'N');
INSERT INTO ORDERS_TBL (ORDER_NO, ITEM, SHIPPED) VALUES('2', 'Roku Express 4K+', 'N');
INSERT INTO ORDERS_TBL (ORDER_NO, ITEM, SHIPPED) VALUES('3', '128GB MicroSD Card', 'N');
We will see the following update in the terminal output:
2021-05-08 15:46:24:516 [task-scheduler-5] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-08 15:46:24:519 [task-scheduler-5] INFO com.polarsparc.si.p4.DbProcessHandler2 - No of records to process: 3
2021-05-08 15:46:24:520 [task-scheduler-5] INFO com.polarsparc.si.p4.DbProcessHandler2 - Processed order 1 for item iPad Air 128G WiFi
2021-05-08 15:46:24:520 [task-scheduler-5] INFO com.polarsparc.si.p4.DbProcessHandler2 - Processed order 2 for item Roku Express 4K+
2021-05-08 15:46:24:521 [task-scheduler-5] INFO com.polarsparc.si.p4.DbProcessHandler2 - Processed order 3 for item 128GB MicroSD Card
The rows in the database are automatically updated (the column SHIPPED is set to 'Y').
As can be inferred from Output.20 and Output.22 above, Spring Integration successfully processed the rows from the database.
Invoking a REST Service using HTTP
XML based Approach
For this example, we will invoke a free external web-service to get the spot prices on precious metals in a JSON format. The web-service endpoint to invoke is Spot Prices on Precious Metals.
The following is a simple interface to fetch the spot prices on precious metals:
/*
 * Name:   SpotPricesGateway
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

public interface SpotPricesGateway {
    public String preciousMetalPrices(String type);
}
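Note that the interface above has no implementation class; the gateway supplies one at run time. One way to picture this is a JDK dynamic proxy that turns each method call into a message and hands it to a handler. The following is only a conceptual sketch under that assumption and does not claim to mirror Spring Integration's internals; the names GatewayProxySketch, PricesGateway and create are hypothetical:

```java
import java.lang.reflect.Proxy;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

class GatewayProxySketch {
    // Same shape as SpotPricesGateway; redeclared here so the sketch compiles on its own.
    interface PricesGateway {
        String preciousMetalPrices(String type);
    }

    // Builds an implementation of the interface at run time. Every call is converted
    // into a simple "message" (a map) and passed to the given handler, whose return
    // value becomes the return value of the method.
    static PricesGateway create(Function<Map<String, Object>, String> handler) {
        return (PricesGateway) Proxy.newProxyInstance(
                PricesGateway.class.getClassLoader(),
                new Class<?>[] { PricesGateway.class },
                (proxy, method, args) -> {
                    if (!"preciousMetalPrices".equals(method.getName())) {
                        throw new UnsupportedOperationException(method.getName());
                    }
                    Map<String, Object> message = new LinkedHashMap<>();
                    message.put("method", method.getName());
                    message.put("payload", args[0]);
                    return handler.apply(message);
                });
    }

    public static void main(String[] args) {
        // The handler stands in for whatever sits behind the request channel.
        PricesGateway gateway = create(message -> "reply for " + message.get("payload"));
        System.out.println(gateway.preciousMetalPrices("json"));
    }
}
```

The caller only ever sees the interface, which is the point of the gateway: application code stays free of any messaging API.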
The following is the XML based Spring Integration configuration that wires up the channels, the http channel adapter, and the endpoint:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:http="http://www.springframework.org/schema/integration/http"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/integration
                           http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/integration/http
                           http://www.springframework.org/schema/integration/http/spring-integration-http.xsd">

    <int:logging-channel-adapter id="logger" level="INFO" log-full-message="true" />

    <int:channel id="inChannel">
        <int:interceptors>
            <int:wire-tap channel="logger" />
        </int:interceptors>
    </int:channel>

    <int:channel id="outChannel">
        <int:queue capacity="5" />
    </int:channel>

    <int:gateway id="gateway" service-interface="com.polarsparc.si.p4.SpotPricesGateway">
        <int:method name="preciousMetalPrices" request-channel="inChannel" reply-channel="outChannel">
            <int:header name="f" expression="#args[0]" />
            <int:header name="Content-Type" value="application/json" />
        </int:method>
    </int:gateway>

    <http:outbound-gateway id="httpGateway"
                           request-channel="inChannel"
                           url="http://services.packetizer.com/spotprices/?f={f}"
                           http-method="GET"
                           expected-response-type="java.lang.String"
                           reply-timeout="10000"
                           reply-channel="outChannel">
        <http:uri-variable name="f" expression="headers.f" />
    </http:outbound-gateway>
</beans>
Some aspects of Listing.52 above need a little explanation.
We have learnt about the gateway element in Part-1. It also allows one to specify one or more method sub-elements.
A method sub-element allows one to configure the behavior (such as adding name-value pairs to the message header) when the specified method is invoked.
To add a name-value pair to the message header, use the header sub-element under the method sub-element.
The header sub-element uses the name attribute to specify the header name.
The header sub-element uses the value attribute to specify a static header value.
The header sub-element uses the expression attribute to specify a dynamic header value. In the above, the expression #args[0] stands for the first parameter of the specified method.
The http outbound-gateway element allows one to invoke the specified URL endpoint.
If the specified URL contains a placeholder such as {f} (as in our case, for the query parameter f), one should use the uri-variable sub-element to supply its value.
In our case, we have one uri-variable sub-element for the query parameter f, which gets its value from the message header. The expression headers.f gets the value of the message header property with name f.
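The flow described above, from method argument to message header to URL, can be traced with a small plain-Java sketch. It uses only the JDK (including the java.net.http package from Java 11) and is an illustration, not the code Spring Integration runs; the class HeaderToUriSketch and its methods headersFor and expand are hypothetical names. The request is built but deliberately not sent, so the sketch runs without network access:

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

class HeaderToUriSketch {
    static final String URL_TEMPLATE = "http://services.packetizer.com/spotprices/?f={f}";

    // Mimics the two header sub-elements: "f" comes from the first method argument
    // (the expression #args[0]), while "Content-Type" is a static value.
    static Map<String, String> headersFor(Object... args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("f", String.valueOf(args[0]));
        headers.put("Content-Type", "application/json");
        return headers;
    }

    // Mimics the uri-variable sub-element: {f} is replaced by the value of headers.f.
    static URI expand(String template, Map<String, String> headers) {
        String value = URLEncoder.encode(headers.get("f"), StandardCharsets.UTF_8);
        return URI.create(template.replace("{f}", value));
    }

    public static void main(String[] args) {
        Map<String, String> headers = headersFor("json");
        URI uri = expand(URL_TEMPLATE, headers);
        // Build (but do not send) the GET request corresponding to http-method="GET".
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Calling the gateway method with the argument json therefore ends up as a GET request to the URL with f=json.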
The following is our main application to test the http channel adapter:
/*
 * Name:   SpotPricesMainXml
 * Author: Bhaskar S
 * Date:   05/08/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpotPricesMainXml {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpotPricesMainXml.class);

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("p4/SpotPrices.xml");

        SpotPricesGateway gateway = (SpotPricesGateway) context.getBean("gateway");

        LOGGER.info("Precious Metal Prices = {}", gateway.preciousMetalPrices("json"));
    }
}
To execute the code from Listing.53, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p4.SpotPricesMainXml"
The following would be the typical output:
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-08 16:26:37:579 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-08 16:26:37:582 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-08 16:26:37:588 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-08 16:26:37:807 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.springframework.util.ReflectionUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to constructor java.lang.invoke.MethodHandles$Lookup(java.lang.Class)
WARNING: Please consider reporting this to the maintainers of org.springframework.util.ReflectionUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-05-08 16:26:37:909 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:logger.adapter} as a subscriber to the 'logger' channel
2021-05-08 16:26:37:909 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@761cb475.logger' has 1 subscriber(s).
2021-05-08 16:26:37:910 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'logger.adapter'; defined in: 'class path resource [p4/SpotPrices.xml]'; from source: ''int:logging-channel-adapter' with id='logger''
2021-05-08 16:26:37:911 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {http:outbound-gateway:httpGateway} as a subscriber to the 'inChannel' channel
2021-05-08 16:26:37:911 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@761cb475.inChannel' has 1 subscriber(s).
2021-05-08 16:26:37:912 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'httpGateway'
2021-05-08 16:26:37:913 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-08 16:26:37:913 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@761cb475.errorChannel' has 1 subscriber(s).
2021-05-08 16:26:37:914 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-08 16:26:37:915 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#preciousMetalPrices(String)'
2021-05-08 16:26:37:915 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway'
2021-05-08 16:26:37:926 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.endpoint.PollingConsumer - started org.springframework.integration.endpoint.PollingConsumer@3ccf0884
2021-05-08 16:26:37:933 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO org.springframework.integration.handler.LoggingHandler - GenericMessage [payload=json, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4b153b1d, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@4b153b1d, id=2a42c117-1c9c-0b9a-0745-83dcc71aff6e, f=json, Content-Type=application/json, timestamp=1620505597932}]
2021-05-08 16:26:38:075 [com.polarsparc.si.p4.SpotPricesMainXml.main()] INFO com.polarsparc.si.p4.SpotPricesMainXml - Precious Metal Prices = { "date" : "2021-05-07", "gold" : "1831.10", "silver" : "27.46", "platinum" : "1256.75" }
--- CTRL-C ---
Java Config based Approach
The following is the Java Config based POJO that defines the gateway similar to the one defined in the XML configuration file of Listing.52 above:
/*
 * Name: SpotPricesGateway2
 * Author: Bhaskar S
 * Date: 05/08/2021
 * Blog: https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.GatewayHeader;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.config.EnableIntegration;

@Configuration
@EnableIntegration
@IntegrationComponentScan
@MessagingGateway(name = "gateway", defaultRequestChannel = "inChannel", defaultReplyChannel = "outChannel")
public interface SpotPricesGateway2 {
    @Gateway(requestChannel = "inChannel", replyChannel = "outChannel",
            headers = {
                @GatewayHeader(name = "f", expression = "#args[0]"),
                @GatewayHeader(name = "Content-Type", value = "application/json")
            })
    public String preciousMetalPrices(String type);
}
Some aspects of Listing.54 above need a little explanation.
The annotation class org.springframework.integration.annotation.GatewayHeader allows one to set a message header name with a value or an expression (that dynamically resolves to a value)
The method annotation class org.springframework.integration.annotation.Gateway allows for the mapping of the method parameter(s) to message header(s) via the headers attribute. Notice we have specified a list of two GatewayHeader annotations that is the equivalent of the int:header elements from the XML configuration file of Listing.52 above
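To make the header mapping more concrete, the following is a minimal plain Java sketch (no Spring involved) of the idea behind the two GatewayHeader entries: a method call on the gateway interface is turned into a payload plus a set of headers, where the header f comes from the first method argument (the role played by #args[0]) and the header Content-Type is a fixed value. The names PricesGateway, GatewayHeaderSketch, headersFor, and proxy are hypothetical and exist only for this illustration; this is not how Spring Integration implements the gateway proxy internally.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for the gateway interface (no Spring annotations)
interface PricesGateway {
    String preciousMetalPrices(String type);
}

class GatewayHeaderSketch {
    // Mirrors the two @GatewayHeader entries: "f" is taken from the first
    // method argument (like #args[0]) and "Content-Type" is a constant
    static Map<String, Object> headersFor(Object[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("f", args[0]);
        headers.put("Content-Type", "application/json");
        return headers;
    }

    // Creates a dynamic proxy for the interface. Each call is converted to
    // a payload (the first argument) plus headers and handed to the supplied
    // function, which stands in for the request channel (an assumption made
    // for this sketch). Only the gateway method is expected to be invoked.
    static PricesGateway proxy(BiFunction<Object, Map<String, Object>, String> channel) {
        InvocationHandler handler =
            (self, method, args) -> channel.apply(args[0], headersFor(args));
        return (PricesGateway) Proxy.newProxyInstance(
            PricesGateway.class.getClassLoader(),
            new Class<?>[] { PricesGateway.class },
            handler);
    }

    public static void main(String[] args) {
        PricesGateway gateway =
            proxy((payload, headers) -> "payload=" + payload + ", headers=" + headers);
        // Prints: payload=json, headers={f=json, Content-Type=application/json}
        System.out.println(gateway.preciousMetalPrices("json"));
    }
}
```

The printed payload and the two header values are the same ones that appear for f and Content-Type in the GenericMessage line logged by the wiretap.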
The following is the Java Config based POJO that defines the input and output channels, a logging wiretap, and the outbound http channel adapter similar to those defined in the XML configuration file of Listing.52 above:
/*
 * Name: SpotPricesConfig
 * Author: Bhaskar S
 * Date: 05/08/2021
 * Blog: https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.http.HttpMethod;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.interceptor.WireTap;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableIntegration
public class SpotPricesConfig {
    @Bean
    @ServiceActivator(inputChannel = "logger")
    public LoggingHandler logHandler() {
        LoggingHandler logger = new LoggingHandler(LoggingHandler.Level.INFO);
        logger.setShouldLogFullMessage(true);
        return logger;
    }

    @Bean
    public MessageChannel inChannel() {
        AbstractMessageChannel channel = new DirectChannel();
        channel.addInterceptor(wireTap());
        return channel;
    }

    @Bean
    public WireTap wireTap() {
        return new WireTap("logger");
    }

    @Bean
    public MessageChannel outChannel() {
        return new QueueChannel(5);
    }

    @Bean
    @ServiceActivator(inputChannel = "inChannel")
    public MessageHandler httpGateway() {
        HttpRequestExecutingMessageHandler handler =
            new HttpRequestExecutingMessageHandler("http://services.packetizer.com/spotprices/?f={f}");
        handler.setHttpMethod(HttpMethod.GET);
        handler.setExpectedResponseType(String.class);
        handler.setSendTimeout(10000);
        handler.setOutputChannel(outChannel());

        ExpressionParser parser = new SpelExpressionParser();
        Map<String, Expression> variables = new HashMap<>();
        variables.put("f", parser.parseExpression("headers.f"));
        handler.setUriVariableExpressions(variables);

        return handler;
    }
}
Some aspects of Listing.55 above need a little explanation.
The class org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler implements the outbound http channel adapter which makes http requests to the specified url. Under the hood it uses an instance of the class org.springframework.web.client.RestTemplate to make the outgoing http requests
The class org.springframework.integration.jdbc.JdbcPollingChannelAdapter implements a polling database channel adapter (using jdbc) that creates messages by querying the underlying database using the specified SELECT query. After querying for the specified number of rows, it executes the specified UPDATE statement
The interface org.springframework.expression.Expression provides a common abstraction for expression evaluation
The interface org.springframework.expression.ExpressionParser provides an abstraction for parsing expression strings into compiled Expression instances that can be later evaluated
The class org.springframework.expression.spel.standard.SpelExpressionParser is a concrete implementation of an ExpressionParser that supports the Spring Expression Language syntax
The method call handler.setUriVariableExpressions(variables) on the instance of the HttpRequestExecutingMessageHandler supplies a map from each uri variable name to an instance of Expression that is evaluated against the message prior to making the http request. This is the equivalent of the element http:uri-variable from the XML configuration file of Listing.52 above
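The uri variable step described above can be pictured with the following plain Java sketch. It is a simplified stand-in and not the Spring implementation: each Expression is modelled as a java.util.function.Function over the message headers, and the {f} placeholder in the url template is replaced with the resolved (and form-encoded) value. The names UriVariableSketch and expand are hypothetical and used only for illustration.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class UriVariableSketch {
    // Replaces every {name} placeholder in the template with the value that
    // the matching resolver produces from the message headers. URLEncoder
    // performs form encoding, which is a simplification of real uri encoding.
    static String expand(String template,
                         Map<String, Function<Map<String, Object>, Object>> variables,
                         Map<String, Object> headers) {
        String uri = template;
        for (Map.Entry<String, Function<Map<String, Object>, Object>> entry : variables.entrySet()) {
            Object value = entry.getValue().apply(headers);
            String encoded = URLEncoder.encode(String.valueOf(value), StandardCharsets.UTF_8);
            uri = uri.replace("{" + entry.getKey() + "}", encoded);
        }
        return uri;
    }

    public static void main(String[] args) {
        // The resolver for "f" plays the role of the expression "headers.f"
        Map<String, Function<Map<String, Object>, Object>> variables = new HashMap<>();
        variables.put("f", h -> h.get("f"));

        Map<String, Object> headers = Map.of("f", "json", "Content-Type", "application/json");

        // Prints: http://services.packetizer.com/spotprices/?f=json
        System.out.println(
            expand("http://services.packetizer.com/spotprices/?f={f}", variables, headers));
    }
}
```

In the actual configuration, the resolution is done with SpEL against the whole message, which is why the expression reads headers.f rather than just f.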
And finally, the following is the main application that uses the POJOs from Listing.54 and Listing.55 to test the http channel adapter:
/*
 * Name: SpotPricesMainConfig
 * Author: Bhaskar S
 * Date: 05/08/2021
 * Blog: https://www.polarsparc.com
 */

package com.polarsparc.si.p4;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class SpotPricesMainConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpotPricesMainConfig.class);

    public static void main(String[] args) {
        ApplicationContext context =
            new AnnotationConfigApplicationContext(SpotPricesGateway2.class, SpotPricesConfig.class);

        SpotPricesGateway2 gateway = (SpotPricesGateway2) context.getBean("gateway");

        LOGGER.info("Precious Metal Prices with Config = {}", gateway.preciousMetalPrices("json"));
    }
}
To execute the code from Listing.56, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p4.SpotPricesMainConfig"
The following would be the typical output:
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.springframework.cglib.core.ReflectUtils (file:/home/bswamina/.m2/repository/org/springframework/spring-core/5.3.5/spring-core-5.3.5.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain)
WARNING: Please consider reporting this to the maintainers of org.springframework.cglib.core.ReflectUtils
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2021-05-08 20:09:11:006 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-08 20:09:11:010 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-08 20:09:11:014 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-08 20:09:11:048 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-08 20:09:11:081 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-08 20:09:11:211 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-08 20:09:11:383 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-08 20:09:11:384 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@7bb01ac9.errorChannel' has 1 subscriber(s).
2021-05-08 20:09:11:385 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-08 20:09:11:385 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {message-handler:spotPricesConfig.logHandler.serviceActivator} as a subscriber to the 'logger' channel
2021-05-08 20:09:11:386 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@7bb01ac9.logger' has 1 subscriber(s).
2021-05-08 20:09:11:387 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'spotPricesConfig.logHandler.serviceActivator'
2021-05-08 20:09:11:387 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {http:outbound-gateway:spotPricesConfig.httpGateway.serviceActivator} as a subscriber to the 'inChannel' channel
2021-05-08 20:09:11:388 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@7bb01ac9.inChannel' has 1 subscriber(s).
2021-05-08 20:09:11:389 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'spotPricesConfig.httpGateway.serviceActivator'
2021-05-08 20:09:11:390 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway - started bean 'gateway#preciousMetalPrices(String)'
2021-05-08 20:09:11:390 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.gateway.GatewayProxyFactoryBean - started bean 'gateway'
2021-05-08 20:09:11:403 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.endpoint.PollingConsumer - started org.springframework.integration.endpoint.PollingConsumer@3d38adba
2021-05-08 20:09:11:410 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO org.springframework.integration.handler.LoggingHandler - GenericMessage [payload=json, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@bcdb17b, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@bcdb17b, id=22abb0b7-fccc-d9cd-649e-7f1741c4eaa7, f=json, Content-Type=application/json, timestamp=1620565391409}]
2021-05-08 20:09:11:564 [com.polarsparc.si.p4.SpotPricesMainConfig.main()] INFO com.polarsparc.si.p4.SpotPricesMainConfig - Precious Metal Prices with Config = { "date" : "2021-05-07", "gold" : "1831.10", "silver" : "27.46", "platinum" : "1256.75" }
--- CTRL-C ---
As can be inferred from the Output.23 and Output.24 above, Spring Integration successfully invoked the web-service and fetched the current spot prices of precious metals.
References
Spring Integration Notes :: Part - 3
Spring Integration Notes :: Part - 2
Spring Integration Notes :: Part - 1