PolarSPARC
Spring Integration Notes :: Part - 3
Bhaskar S | 05/01/2021 (UPDATED)
Overview
In Part-2, we covered basic examples of Spring Integration relating to Logging of Messages, Multi-Threading, and Exception Handling.
We will continue our journey to explore some basic examples in Spring Integration relating to the following:
Polling for Files in a Directory
Transfer Files from a Remote Directory using SFTP
In Spring Integration, to connect to an external system such as a filesystem, an SFTP server, a database, or a messaging system, one needs to use the appropriate Channel Adapter.
A Channel Adapter is an endpoint that connects a channel to the external system(s) or transport(s).
A Channel Adapter can be:
Inbound ⇨ in which case the external system acts as the source of the messages
Outbound ⇨ in which case the external system is the target of the messages
Spring Integration comes with a number of out-of-the-box Channel Adapters.
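To make the inbound/outbound distinction concrete, the following is a minimal plain-JDK sketch. It is not Spring Integration code: the class ChannelAdapterSketch and its methods are hypothetical names chosen for illustration, a BlockingQueue stands in for the channel, a Supplier for the external source, and a Consumer for the external target.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Plain-JDK illustration only; none of these types are Spring Integration classes.
class ChannelAdapterSketch {

    // Inbound: pull one item from the external source (null when nothing is
    // available) and place it on the channel. Returns true if a message was produced.
    static <T> boolean pollInbound(Supplier<T> externalSource, BlockingQueue<T> channel) {
        T item = externalSource.get();
        if (item == null) {
            return false;
        }
        return channel.offer(item);
    }

    // Outbound: take every message currently on the channel and hand it to the
    // external target. Returns the number of messages delivered.
    static <T> int drainOutbound(BlockingQueue<T> channel, Consumer<T> externalTarget) {
        int count = 0;
        T message;
        while ((message = channel.poll()) != null) {
            externalTarget.accept(message);
            count++;
        }
        return count;
    }

    // Wires a source holding the given items to a target list through one channel.
    static List<String> transfer(List<String> sourceItems) {
        Iterator<String> it = sourceItems.iterator();
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        List<String> target = new ArrayList<>();
        while (pollInbound(() -> it.hasNext() ? it.next() : null, channel)) {
            // keep polling until the source is exhausted
        }
        drainOutbound(channel, target::add);
        return target;
    }

    public static void main(String[] args) {
        System.out.println(transfer(List.of("abc.dat", "xyz.dat")));
    }
}
```

In the examples that follow, the filesystem and the SFTP server play the role of the external source, and the output directory plays the role of the external target.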
Setup
Ensure Docker is set up on the Linux system, as we will be demonstrating the channel adapter relating to SFTP using the appropriate docker image.
To set up the Java directory structure for the demonstrations in this part, execute the following commands:
$ cd $HOME/java/SpringIntegration
$ mkdir -p src/main/java/com/polarsparc/si/p3
$ mkdir -p src/main/resources/p3
$ mkdir -p src/main/resources/META-INF/keys
To set up the other directory structure(s), execute the following commands:
$ mkdir -p /tmp/in-dir
$ mkdir -p /tmp/out-dir
To download the required docker image for the sftp server, execute the following command:
$ docker pull atmoz/sftp
To create the asymmetric public-private keys for sftp, execute the following commands:
$ mkdir -p $HOME/Downloads/sftp/share
$ cd $HOME/Downloads/sftp
$ ssh-keygen -m pem -t rsa -b 2048 -f sftp_rsa_key -C noname < /dev/null
The private key is stored in the file $HOME/Downloads/sftp/sftp_rsa_key and the public key is stored in the file $HOME/Downloads/sftp/sftp_rsa_key.pub.
To set up the private key for the sftp client, execute the following commands:
$ cd $HOME/Downloads/sftp
$ mv sftp_rsa_key $HOME/java/SpringIntegration/src/main/resources/META-INF/keys
The following is the listing for the updated Maven project file pom.xml to support the file and sftp channel adapters:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.polarsparc.si</groupId>
    <artifactId>SpringIntegration</artifactId>
    <version>1.0</version>
    <name>SpringIntegration</name>
    <description>Spring Integration Examples</description>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.1</version>
                    <configuration>
                        <fork>true</fork>
                        <meminitial>128m</meminitial>
                        <maxmem>512m</maxmem>
                        <source>11</source>
                        <target>11</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <dependencies>
        <dependency>
            <groupId>javax.annotation</groupId>
            <artifactId>javax.annotation-api</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-file</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-sftp</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>
</project>
Hands-on Spring Integration
Polling for Files in a Directory
In this example, we poll for file(s) matching a regular expression file name pattern in a specified input directory. On finding new file(s), they are processed by a handler, moved to a specified output directory, and deleted from the input directory.
We will use an external properties file to configure the locations of the source (input) and target (output) directories versus hardcoding the locations in Spring Integration configuration.
The following is the properties file file.properties located in the resources/p3 directory:
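#
# Properties for file processing
#

input.files.directory=/tmp/in-dir
output.files.directory=/tmp/out-dir
# No surrounding quotes (they would become part of the value); \\. yields the regex \. (a literal dot)
files.regex.pattern=[a-z]+\\.dat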
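A point worth keeping in mind with properties files: values are taken literally, so double quotes around a value become part of the value, and a backslash must be doubled to survive parsing. The following plain-JDK sketch illustrates this with java.util.Properties. The class PropertiesQuotingSketch and its load helper are hypothetical names, and we assume the value reaches the regular expression filter unchanged after placeholder resolution.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

// Plain-JDK illustration of how a properties value is parsed.
class PropertiesQuotingSketch {

    // Parses a single "key=value" line the way java.util.Properties would
    // and returns the value stored under the given key.
    static String load(String line, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(line));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        String key = "files.regex.pattern";
        // File text: files.regex.pattern="[a-z]+.dat"  (quotes are kept in the value)
        String quoted = load(key + "=\"[a-z]+.dat\"", key);
        // File text: files.regex.pattern=[a-z]+\\.dat  (value becomes [a-z]+\.dat)
        String escaped = load(key + "=[a-z]+\\\\.dat", key);

        System.out.println(quoted + " matches abc.dat: " + Pattern.matches(quoted, "abc.dat"));
        System.out.println(escaped + " matches abc.dat: " + Pattern.matches(escaped, "abc.dat"));
    }
}
```

With the quoted value the regular expression would also require literal quote characters in the file name, so abc.dat would not match.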
XML based Approach
The following is the file handler POJO that displays the file name and file size:
/*
 * Name:   FileProcessHandler
 * Author: Bhaskar S
 * Date:   05/01/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;

public class FileProcessHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(FileProcessHandler.class);

    // Logs the file name and size, then passes the file on to the output channel
    public File handler(File input) {
        LOGGER.info("Processed input file: {}, size: {} (using Xml)", input.getAbsolutePath(), input.length());

        return input;
    }
}
The following is the XML based Spring Integration configuration that refers to the external file.properties file and wires up the channels, the input and output file channel adapters, and the endpoint:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:file="http://www.springframework.org/schema/integration/file"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/integration
                           http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/integration/file
                           http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">

    <context:property-placeholder location="classpath:p3/file.properties" />

    <bean id="inFileHandler" class="com.polarsparc.si.p3.FileProcessHandler" />

    <file:inbound-channel-adapter id="inChannel"
                                  directory="file:${input.files.directory}"
                                  filename-regex="${files.regex.pattern}"
                                  prevent-duplicates="true">
        <int:poller id="poller" fixed-delay="5000" />
    </file:inbound-channel-adapter>

    <file:outbound-channel-adapter id="outChannel"
                                   directory="file:${output.files.directory}"
                                   delete-source-files="true" />

    <int:service-activator input-channel="inChannel"
                           output-channel="outChannel"
                           ref="inFileHandler"
                           method="handler" />
</beans>
Some aspects of Listing.33 above need a little explanation.
The property-placeholder element allows one to read and load the externally stored properties file via the classpath into the spring context.
Under-the-hood, Spring uses org.springframework.context.support.PropertySourcesPlaceholderConfigurer to load the properties file from the classpath and resolve the ${...} placeholders.
The file inbound-channel-adapter element allows one to scan for file(s) matching the regular expression pattern as specified by the attribute filename-regex in the filesystem location as specified by the attribute directory.
The prevent-duplicates attribute, when set to true, tells the underlying file reader to remember the file(s) it has already picked up and to skip them in subsequent polls, so that each file is processed only once while the application is running.
Under-the-hood, Spring Integration uses org.springframework.integration.file.FileReadingMessageSource to fetch the file(s) matching the specified pattern from the specified filesystem location.
The file outbound-channel-adapter element allows one to write file(s) to the filesystem at the location as specified by the attribute directory.
The delete-source-files attribute, when set to true, will delete the original source file(s) from the input directory once they have been written to the output directory.
Under-the-hood, Spring Integration uses org.springframework.integration.file.FileWritingMessageHandler to write file(s) at the specified filesystem location.
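The combined effect of the three attributes discussed above (filename-regex, prevent-duplicates, and delete-source-files) can be sketched with plain JDK classes. This is only an illustration of the behavior and not how Spring Integration implements it. The class FilePollSketch, its pollOnce method, and the helpers tempDir and write are hypothetical names, and duplicates are tracked by file name for simplicity.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;

// Plain-JDK illustration; this is not how Spring Integration is implemented.
class FilePollSketch {
    private final Path inDir;
    private final Path outDir;
    private final Pattern namePattern;
    // Names already picked up (the role played by prevent-duplicates)
    private final Set<String> seen = new HashSet<>();

    FilePollSketch(Path inDir, Path outDir, String regex) {
        this.inDir = inDir;
        this.outDir = outDir;
        this.namePattern = Pattern.compile(regex);
    }

    // One polling cycle: returns the names of the files moved to outDir
    List<String> pollOnce() {
        List<Path> candidates = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(inDir)) {
            for (Path file : files) {
                candidates.add(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> moved = new ArrayList<>();
        for (Path file : candidates) {
            String name = file.getFileName().toString();
            boolean accepted = Files.isRegularFile(file)
                    && namePattern.matcher(name).matches()  // filename-regex
                    && seen.add(name);                      // prevent-duplicates
            if (!accepted) {
                continue;
            }
            try {
                // Write to the output directory, then delete the source (delete-source-files)
                Files.copy(file, outDir.resolve(name), StandardCopyOption.REPLACE_EXISTING);
                Files.delete(file);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            moved.add(name);
        }
        return moved;
    }

    static Path tempDir(String prefix) {
        try {
            return Files.createTempDirectory(prefix);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static void write(Path file, String content) {
        try {
            Files.writeString(file, content);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path in = tempDir("in-dir");
        Path out = tempDir("out-dir");
        write(in.resolve("abc.dat"), "Spring Integration\n");
        write(in.resolve("def.txt"), "Ignored\n");
        FilePollSketch poller = new FilePollSketch(in, out, "[a-z]+\\.dat");
        System.out.println("first poll:  " + poller.pollOnce());
        System.out.println("second poll: " + poller.pollOnce());
    }
}
```

The first poll moves abc.dat and leaves def.txt in place; the second poll finds nothing new to process.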
The following is our main application to test the file channel adapter:
/*
 * Name:   FileProcessMainXml
 * Author: Bhaskar S
 * Date:   05/01/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class FileProcessMainXml {
    public static void main(String[] args) {
        // Loading the context starts the poller; the application runs until it is stopped
        new ClassPathXmlApplicationContext("p3/FileProcess.xml");
    }
}
To execute the code from Listing.34, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p3.FileProcessMainXml"
The following would be the typical output:
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-01 11:56:37:290 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-01 11:56:37:293 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-01 11:56:37:298 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-01 11:56:37:518 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-01 11:56:37:543 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {file:outbound-channel-adapter:outChannel.adapter} as a subscriber to the 'outChannel' channel
2021-05-01 11:56:37:544 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@42062658.outChannel' has 1 subscriber(s).
2021-05-01 11:56:37:545 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'outChannel.adapter'; defined in: 'class path resource [p3/FileProcess.xml]'; from source: ''file:outbound-channel-adapter' with id='outChannel''
2021-05-01 11:56:37:546 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inChannel' channel
2021-05-01 11:56:37:547 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@42062658.inChannel' has 1 subscriber(s).
2021-05-01 11:56:37:547 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2021-05-01 11:56:37:548 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-01 11:56:37:549 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@42062658.errorChannel' has 1 subscriber(s).
2021-05-01 11:56:37:550 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-01 11:56:37:554 [com.polarsparc.si.p3.FileProcessMainXml.main()] INFO org.springframework.integration.endpoint.SourcePollingChannelAdapter - started bean 'inChannel.adapter'; defined in: 'class path resource [p3/FileProcess.xml]'; from source: ''file:inbound-channel-adapter' with id='inChannel''
Create a file called abc.dat in the directory /tmp/in-dir by executing the following command:
$ echo 'Spring Integration' > /tmp/in-dir/abc.dat
We will see the following update in the terminal output:
2021-05-01 11:58:02:569 [task-scheduler-6] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-01 11:58:02:575 [task-scheduler-6] INFO com.polarsparc.si.p3.FileProcessHandler - Processed input file: /tmp/in-dir/abc.dat, size: 19 (using Xml)
Now, create another file called def.txt in the directory /tmp/in-dir by executing the following command:
$ echo 'Ignored' > /tmp/in-dir/def.txt
Nothing will happen as the file name does not match the pattern.
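The matching behavior can be checked directly with java.util.regex. The following is a small sketch, assuming the adapter requires the whole file name to match the pattern; the class FilenameRegexSketch is a hypothetical name. It also shows that an unescaped dot matches any character, which is why a pattern such as [a-z]+.dat accepts a name like abcxdat, whereas [a-z]+\.dat does not.

```java
import java.util.regex.Pattern;

// Plain-JDK check of file names against a regular expression (full match).
class FilenameRegexSketch {

    // Returns true only if the entire file name matches the regular expression
    static boolean matches(String regex, String fileName) {
        return Pattern.compile(regex).matcher(fileName).matches();
    }

    public static void main(String[] args) {
        System.out.println(matches("[a-z]+.dat", "abc.dat"));    // true
        System.out.println(matches("[a-z]+.dat", "def.txt"));    // false
        System.out.println(matches("[a-z]+.dat", "abcxdat"));    // true, the dot matches 'x'
        System.out.println(matches("[a-z]+\\.dat", "abcxdat"));  // false, the dot is literal
    }
}
```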
Java Config based Approach
The following is the Java Config based POJO that defines the file handler endpoint that displays the file name and file size:
/*
 * Name:   FileProcessHandler2
 * Author: Bhaskar S
 * Date:   05/01/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;

import java.io.File;

@Configuration
@EnableIntegration
public class FileProcessHandler2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(FileProcessHandler2.class);

    // Logs the file name and size, then passes the file on to outChannel
    @ServiceActivator(inputChannel = "inChannel", outputChannel = "outChannel")
    public File handler(File input) {
        LOGGER.info("Processed input file: {}, size: {} (using Config)", input.getAbsolutePath(), input.length());

        return input;
    }
}
The following is the Java Config based POJO that refers to the external file.properties file and defines the input and output file channel adapters as well as the endpoint similar to the way defined in the XML configuration file of Listing.33 above:
/*
 * Name:   FileProcessConfig
 * Author: Bhaskar S
 * Date:   05/01/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.CompositeFileListFilter;
import org.springframework.integration.file.filters.RegexPatternFileListFilter;
import org.springframework.messaging.MessageHandler;

import java.io.File;

@Configuration
@EnableIntegration
@PropertySource("classpath:p3/file.properties")
public class FileProcessConfig {
    @Value("${input.files.directory}")
    private String filesInDir;

    @Value("${output.files.directory}")
    private String filesOutDir;

    // Inbound file adapter: polls the input directory every 5 seconds
    @Bean
    @InboundChannelAdapter(value = "inChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> fileInboundChannelAdapter() {
        FileReadingMessageSource adapter = new FileReadingMessageSource();
        adapter.setDirectory(new File(filesInDir));

        // A file must pass both filters: not seen before and matching the name pattern
        CompositeFileListFilter<File> filter = new CompositeFileListFilter<>();
        filter.addFilter(new AcceptOnceFileListFilter<>());
        // The dot is escaped so that it matches a literal '.' only
        filter.addFilter(new RegexPatternFileListFilter("[a-z]+\\.dat"));
        adapter.setFilter(filter);

        return adapter;
    }

    // Outbound file adapter: writes to the output directory and deletes the source file
    @Bean
    @ServiceActivator(inputChannel = "outChannel")
    public MessageHandler fileOutboundChannelAdapter() {
        FileWritingMessageHandler adapter = new FileWritingMessageHandler(new File(filesOutDir));
        adapter.setDeleteSourceFiles(true);
        adapter.setExpectReply(false);

        return adapter;
    }
}
Some aspects of Listing.36 above need a little explanation.
The annotation class org.springframework.context.annotation.PropertySource provides a convenient and declarative way of loading external properties file(s) into the Spring environment
The annotation class org.springframework.beans.factory.annotation.Value allows one to access any application specific configuration property defined in external properties file(s) (that are sourced via the annotation PropertySource)
The file filter class org.springframework.integration.file.filters.AcceptOnceFileListFilter prevents duplicate processing of the source file(s)
The file filter class org.springframework.integration.file.filters.RegexPatternFileListFilter encapsulates the regular expression pattern for the source file(s)
The class org.springframework.integration.file.filters.CompositeFileListFilter evaluates a collection of file filter(s)
The annotation class org.springframework.integration.annotation.Poller allows one to define a polled endpoint similar to the element <int:poller> from the XML configuration file of Listing.33 above
The annotation class org.springframework.integration.annotation.InboundChannelAdapter allows one to configure the inbound channel adapter
The class org.springframework.integration.file.FileReadingMessageSource is used to implement the input file source for the inbound channel adapter
The class org.springframework.integration.file.FileWritingMessageHandler is used to implement the outbound channel adapter
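The fixed delay semantics of the poller (wait for the given interval after one poll completes before starting the next) can be sketched with a ScheduledExecutorService from the JDK. This is an analogy only, not the scheduler wiring Spring Integration uses. The class FixedDelayPollerSketch and its poll method are hypothetical names, and the sketch stops after a fixed number of cycles so that it terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK analogy for a fixed delay poller.
class FixedDelayPollerSketch {

    // Runs pollTask repeatedly, waiting delayMillis between the end of one run and
    // the start of the next, until it has run 'times' times. Returns the number of runs.
    // Note: if pollTask throws, the scheduler suppresses further runs and this call blocks.
    static int poll(Runnable pollTask, long delayMillis, int times) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch done = new CountDownLatch(times);
        AtomicInteger runs = new AtomicInteger();
        scheduler.scheduleWithFixedDelay(() -> {
            // Guard against an extra run sneaking in before shutdown
            if (done.getCount() > 0) {
                pollTask.run();
                runs.incrementAndGet();
                done.countDown();
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        int runs = poll(() -> System.out.println("scanning input directory"), 50, 3);
        System.out.println("poll cycles: " + runs);
    }
}
```

The JDK counterpart of a fixed rate poller would be scheduleAtFixedRate, which measures the interval from the start of each run rather than from its end.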
And finally, the following is the main application that uses the POJOs from Listing.35 and Listing.36 to test the file channel adapter:
/*
 * Name:   FileProcessMainConfig
 * Author: Bhaskar S
 * Date:   05/01/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class FileProcessMainConfig {
    public static void main(String[] args) {
        // Registers both configuration classes; the poller starts with the context
        new AnnotationConfigApplicationContext(FileProcessHandler2.class, FileProcessConfig.class);
    }
}
To execute the code from Listing.37, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p3.FileProcessMainConfig"
The following would be the typical output:
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-01 12:11:33:439 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-01 12:11:33:444 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-01 12:11:33:448 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-01 12:11:33:485 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-01 12:11:33:489 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-01 12:11:33:702 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-01 12:11:33:792 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-01 12:11:33:793 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.errorChannel' has 1 subscriber(s).
2021-05-01 12:11:33:794 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-01 12:11:33:794 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:fileProcessHandler2.handler.serviceActivator} as a subscriber to the 'inChannel' channel
2021-05-01 12:11:33:795 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.inChannel' has 1 subscriber(s).
2021-05-01 12:11:33:796 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'fileProcessHandler2.handler.serviceActivator'
2021-05-01 12:11:33:796 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {file:outbound-channel-adapter:fileProcessConfig.fileOutboundChannelAdapter.serviceActivator} as a subscriber to the 'outChannel' channel
2021-05-01 12:11:33:797 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.outChannel' has 1 subscriber(s).
2021-05-01 12:11:33:798 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'fileProcessConfig.fileOutboundChannelAdapter.serviceActivator'
2021-05-01 12:11:33:802 [com.polarsparc.si.p3.FileProcessMainConfig.main()] INFO org.springframework.integration.endpoint.SourcePollingChannelAdapter - started bean 'fileProcessConfig.fileInboundChannelAdapter.inboundChannelAdapter'
Once again, create a file called abc.dat in the directory /tmp/in-dir by executing the following command:
$ echo 'Spring Integration' > /tmp/in-dir/abc.dat
We will see the following update in the terminal output:
2021-05-01 12:13:33:818 [task-scheduler-3] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-01 12:13:33:824 [task-scheduler-3] INFO com.polarsparc.si.p3.FileProcessHandler2 - Processed input file: /tmp/in-dir/abc.dat, size: 19 (using Config)
Now, create another file called def.txt in the directory /tmp/in-dir by executing the following command:
$ echo 'Ignored' > /tmp/in-dir/def.txt
Nothing will happen as the file name does not match the pattern.
As can be inferred from the Output.12 and Output.14 above, Spring Integration successfully processed the input file abc.dat using the file channel adapter in both the XML based and the Java Config based approaches, while the file def.txt was ignored.
Transfer Files from a Remote Directory using SFTP
The SSH File Transfer Protocol (SFTP), also referred to as the Secure File Transfer Protocol, is a network protocol that allows one to transfer file(s) between two computers over a secure and reliable connection, typically an SSH session.
In order to transfer file(s) using SFTP, a client (consumer) initiates a connection to a remote host running the SFTP server. For this demonstration, we will run the SFTP server using docker.
Assuming the currently logged in user-id is polarsparc, to start the SFTP server, open a terminal window and execute the following command:
$ docker run --rm --name atmoz-sftp -v $HOME/Downloads/sftp/sftp_rsa_key.pub:/home/polarsparc/.ssh/keys/sftp_rsa_key.pub:ro -v $HOME/Downloads/sftp/share:/home/polarsparc/ -p 2222:22 -d atmoz/sftp polarsparc:polarsparc:1000
The following are the contents of the properties file sftp.properties located in the resources/p3 directory:
#
# Properties for sftp processing
#

sftp.host=localhost
sftp.port=2222
sftp.username=polarsparc
sftp.privateKey=classpath:META-INF/keys/sftp_rsa_key
sftp.allowUnknownKeys=true
sftp.file.pattern=*.dat
sftp.remote.dir=.
sftp.local.dir=/tmp/out-dir
XML based Approach
For processing the files from the SFTP site, we will leverage the same POJO defined in Listing.32 above.
The following is the XML based Spring Integration configuration that wires up a SFTP session factory, the necessary channels, the sftp channel adapter, and the endpoint:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:sftp="http://www.springframework.org/schema/integration/sftp"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/integration
                           http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/integration/sftp
                           http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd">

    <context:property-placeholder location="classpath:p3/sftp.properties" />

    <bean id="inFileHandler" class="com.polarsparc.si.p3.FileProcessHandler" />

    <bean id="sftpSessionFactory"
          class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
        <property name="host" value="${sftp.host}" />
        <property name="port" value="${sftp.port}" />
        <property name="user" value="${sftp.username}" />
        <property name="privateKey" value="${sftp.privateKey}" />
        <property name="allowUnknownKeys" value="${sftp.allowUnknownKeys}" />
    </bean>

    <sftp:inbound-channel-adapter id="sftpInBound"
                                  channel="inChannel"
                                  session-factory="sftpSessionFactory"
                                  filename-pattern="${sftp.file.pattern}"
                                  delete-remote-files="false"
                                  remote-directory="${sftp.remote.dir}"
                                  local-directory="${sftp.local.dir}">
        <int:poller id="poller" fixed-rate="5000" />
    </sftp:inbound-channel-adapter>

    <int:service-activator input-channel="inChannel"
                           output-channel="nullChannel"
                           ref="inFileHandler"
                           method="handler" />
</beans>
Some aspects of Listing.39 above need a little explanation.
The Spring Integration class org.springframework.integration.sftp.session.DefaultSftpSessionFactory is the default SFTP session factory implementation that allows one to open an SFTP network connection with the SFTP server. The default Spring Integration session factory uses a separate physical connection for each channel. The property allowUnknownKeys, when set to true, allows connections to hosts with unknown (or changed) keys. By default, it is set to false.
The sftp inbound-channel-adapter element allows one to connect to an SFTP server using the session factory specified via the attribute session-factory and scan for file(s) with the pattern as specified by the attribute filename-pattern in the remote directory location as specified by the attribute remote-directory. The remote file(s) from the SFTP server are downloaded to the local directory as specified by the attribute local-directory. The delete-remote-files attribute when set to true, will delete the file(s) from the remote directory after the successful download
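The download step described above can be sketched with plain JDK classes by treating a local directory as a stand-in for the remote SFTP directory. This is an illustration under stated assumptions, not the Spring Integration implementation: the class RemoteSyncSketch and its methods are hypothetical names, the file name pattern is treated as a simple wildcard (glob) rather than a regular expression, and skipping files already present in the local directory is a simplification made by this sketch.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-JDK illustration; a local directory stands in for the remote SFTP directory.
class RemoteSyncSketch {

    // Copies the files in remoteDir whose names match the glob into localDir and
    // returns the sorted names of the newly copied files. When deleteRemote is true,
    // each remote file is deleted after a successful copy (delete-remote-files).
    static List<String> sync(Path remoteDir, Path localDir, String glob, boolean deleteRemote) {
        List<Path> matches = new ArrayList<>();
        try (DirectoryStream<Path> remoteFiles = Files.newDirectoryStream(remoteDir, glob)) {
            for (Path remote : remoteFiles) {
                matches.add(remote);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> copied = new ArrayList<>();
        for (Path remote : matches) {
            Path local = localDir.resolve(remote.getFileName().toString());
            if (!Files.isRegularFile(remote) || Files.exists(local)) {
                continue; // not a file, or already downloaded (simplification)
            }
            try {
                Files.copy(remote, local);
                if (deleteRemote) {
                    Files.delete(remote);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            copied.add(remote.getFileName().toString());
        }
        Collections.sort(copied);
        return copied;
    }

    static Path tempDir(String prefix) {
        try {
            return Files.createTempDirectory(prefix);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static void write(Path file, String content) {
        try {
            Files.writeString(file, content);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path remote = tempDir("remote");
        Path local = tempDir("local");
        write(remote.resolve("abc.dat"), "Spring Integration\n");
        write(remote.resolve("def.txt"), "Ignored\n");
        System.out.println("first sync:  " + sync(remote, local, "*.dat", false));
        System.out.println("second sync: " + sync(remote, local, "*.dat", false));
    }
}
```

Only abc.dat is copied on the first pass and, since remote deletion is disabled here, it remains in the stand-in remote directory; the second pass finds nothing new.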
The following is our main application to test the sftp channel adapter:
/*
 * Name:   SftpProcessMainXml
 * Author: Bhaskar S
 * Date:   05/01/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SftpProcessMainXml {
    public static void main(String[] args) {
        // Loading the context starts the sftp poller; the application runs until it is stopped
        new ClassPathXmlApplicationContext("p3/SftpProcess.xml");
    }
}
To execute the code from Listing.40, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p3.SftpProcessMainXml"
The following would be the typical output:
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-01 12:51:32:146 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-01 12:51:32:152 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-01 12:51:32:156 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-01 12:51:32:384 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-01 12:51:32:411 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inChannel' channel
2021-05-01 12:51:32:412 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@4278a583.inChannel' has 1 subscriber(s).
2021-05-01 12:51:32:413 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2021-05-01 12:51:32:413 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-01 12:51:32:414 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@4278a583.errorChannel' has 1 subscriber(s).
2021-05-01 12:51:32:415 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-01 12:51:32:418 [com.polarsparc.si.p3.SftpProcessMainXml.main()] INFO org.springframework.integration.endpoint.SourcePollingChannelAdapter - started bean 'sftpInBound'; defined in: 'class path resource [p3/SftpProcess.xml]'; from source: ''sftp:inbound-channel-adapter' with id='sftpInBound''
2021-05-01 12:51:32:665 [task-scheduler-1] INFO org.springframework.integration.sftp.session.DefaultSftpSessionFactory - The authenticity of host 'localhost' can't be established. RSA key fingerprint is f6:24:d1:19:a9:4e:8d:6b:47:15:36:54:55:31:44:21. Are you sure you want to continue connecting?
2021-05-01 12:51:32:666 [task-scheduler-1] WARN com.jcraft.jsch - Permanently added 'localhost' (RSA) to the list of known hosts.
Create a file called abc.dat in the directory /tmp/in-dir and transfer it to the sftp remote directory by executing the following commands:
$ echo 'Spring Integration' > /tmp/in-dir/abc.dat
$ sudo mv /tmp/in-dir/abc.dat $HOME/Downloads/DATA/sftp/share
We will see the following update in the terminal output:
2021-05-01 12:55:32:801 [task-scheduler-3] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-01 12:55:32:805 [task-scheduler-3] INFO com.polarsparc.si.p3.FileProcessHandler - Processed input file: /tmp/out-dir/abc.dat, size: 19 (using Xml)
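The reported size of 19 may look off by one, since the text 'Spring Integration' has only 18 characters. The extra byte is the trailing newline that echo appends. The following is a quick stand-alone check of that arithmetic; the class name SizeCheck and the method echoedSize are hypothetical and not part of the demonstration code:

```java
import java.nio.charset.StandardCharsets;

public class SizeCheck {
    // Size in bytes of the text once a trailing newline (as added by echo) is appended
    public static int echoedSize(String text) {
        return (text + "\n").getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        // 18 characters of text plus 1 newline byte
        System.out.println(echoedSize("Spring Integration"));
    }
}
```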
Now, create another file called def.txt in the directory /tmp/in-dir and transfer it to the sftp remote directory by executing the following commands:
$ echo 'Ignored' > /tmp/in-dir/def.txt
$ sudo mv /tmp/in-dir/def.txt $HOME/Downloads/DATA/sftp/share
Nothing will happen as the file name does not match the pattern.
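To see why def.txt is skipped, the following is a minimal stand-alone sketch that mimics a file name pattern check using the glob matcher from the JDK. It does not use the Spring Integration filter itself. The pattern *.dat is an assumption made for illustration (the real value is whatever the channel adapter has been configured with), and the class name PatternCheck is hypothetical:

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class PatternCheck {
    // Returns true when the bare file name matches the given glob pattern
    public static boolean matches(String pattern, String fileName) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pattern);
        return matcher.matches(Paths.get(fileName));
    }

    public static void main(String[] args) {
        String pattern = "*.dat"; // assumed pattern, for illustration only
        for (String name : new String[] { "abc.dat", "def.txt" }) {
            System.out.println(name + " -> " + matches(pattern, name));
        }
    }
}
```

With the assumed pattern, abc.dat is accepted and def.txt is rejected, which mirrors the behavior observed above.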
Java Config based Approach
The following is the Java Config based POJO that defines the file handler endpoint that displays the file name and file size:
/*
 * Name:   FileProcessHandler3
 * Author: Bhaskar S
 * Date:   05/01/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;

import java.io.File;

@Configuration
@EnableIntegration
public class FileProcessHandler3 {
    private static final Logger LOGGER = LoggerFactory.getLogger(FileProcessHandler3.class);

    @ServiceActivator(inputChannel = "inChannel", outputChannel = "nullChannel")
    public File handler(File input) {
        LOGGER.info("Processed input file: {}, size: {} (using Config)", input.getAbsolutePath(), input.length());

        return input;
    }
}
The following is the Java Config based POJO that refers to the external sftp.properties file and defines the SFTP session factory, the sftp channel adapter, and the endpoint, similar to the XML configuration file of Listing.39 above:
/*
 * Name:   SftpProcessConfig
 * Author: Bhaskar S
 * Date:   04/24/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.io.Resource;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer;
import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

import java.io.File;

@Configuration
@EnableIntegration
@PropertySource("classpath:p3/sftp.properties")
public class SftpProcessConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(SftpProcessConfig.class);

    @Value("${sftp.host}")
    private String sftpHost;

    @Value("${sftp.port:22}")
    private int sftpPort;

    @Value("${sftp.username}")
    private String sftpUser;

    @Value("${sftp.privateKey}")
    private Resource sftpPrivateKey;

    @Value("${sftp.allowUnknownKeys}")
    private boolean sftpAllowUnknownKeys;

    @Value("${sftp.remote.dir}")
    private String sftpRemoteDir;

    @Value("${sftp.local.dir}")
    private String sftpLocalDir;

    @Value("${sftp.file.pattern}")
    private String sftpFilePattern;

    @Bean
    public DefaultSftpSessionFactory sftpSessionFactory() {
        LOGGER.info(String.format("Host: %s, Port: %d, User: %s", sftpHost, sftpPort, sftpUser));

        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
        factory.setHost(sftpHost);
        factory.setPort(sftpPort);
        factory.setUser(sftpUser);
        factory.setPrivateKey(sftpPrivateKey);
        factory.setAllowUnknownKeys(sftpAllowUnknownKeys);

        return factory;
    }

    @Bean
    public SftpInboundFileSynchronizer sftpFileSynchronizer() {
        LOGGER.info(String.format("Remote Dir: %s", sftpRemoteDir));

        SftpInboundFileSynchronizer synchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        synchronizer.setDeleteRemoteFiles(false);
        synchronizer.setRemoteDirectory(sftpRemoteDir);
        synchronizer.setFilter(new SftpSimplePatternFileListFilter(sftpFilePattern));

        return synchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "inChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> sftpChannelAdapter() {
        LOGGER.info(String.format("Local Dir: %s", sftpLocalDir));

        SftpInboundFileSynchronizingMessageSource adapter =
            new SftpInboundFileSynchronizingMessageSource(sftpFileSynchronizer());
        adapter.setLocalDirectory(new File(sftpLocalDir));
        adapter.setMaxFetchSize(1);
        adapter.setLocalFilter(new AcceptOnceFileListFilter<File>());

        return adapter;
    }
}
Some aspects of Listing.42 above need a little explanation.
The class org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer uses the SFTP session factory to manage the SFTP connection and handle the synchronization between a remote SFTP directory and a local directory
The file filter class org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter supports ant style patterns for the source file(s)
The class org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource implements the inbound channel adapter for SFTP file synchronization
The file filter class org.springframework.integration.file.filters.AcceptOnceFileListFilter prevents duplicate processing of the source file(s)
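To make the idea behind the accept-once filter concrete, the following is a minimal stand-alone sketch of the concept in plain Java. It is not the Spring Integration implementation; the class AcceptOnceSketch and its filter method are hypothetical names used only for illustration. The sketch remembers every file name it has already passed through and drops repeats on later polls:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AcceptOnceSketch {
    // Names that have already been accepted in an earlier call
    private final Set<String> seen = new HashSet<>();

    // Returns only those names that were not accepted before
    public List<String> filter(List<String> names) {
        List<String> accepted = new ArrayList<>();
        for (String name : names) {
            if (seen.add(name)) { // add() returns false for a name already seen
                accepted.add(name);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        AcceptOnceSketch filter = new AcceptOnceSketch();
        // First poll: abc.dat is new and is accepted
        System.out.println(filter.filter(Arrays.asList("abc.dat")));
        // Second poll: abc.dat is a repeat, only xyz.dat is accepted
        System.out.println(filter.filter(Arrays.asList("abc.dat", "xyz.dat")));
    }
}
```

Since a sketch like this keeps its state in memory only, restarting the application would make previously seen names eligible again.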
And finally, the following is the main application that uses the POJOs from Listing.41 and Listing.42 to test the sftp channel adapter:
/*
 * Name:   SftpProcessMainConfig
 * Author: Bhaskar S
 * Date:   05/01/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p3;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class SftpProcessMainConfig {
    public static void main(String[] args) {
        new AnnotationConfigApplicationContext(FileProcessHandler3.class, SftpProcessConfig.class);
    }
}
To execute the code from Listing.43, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p3.SftpProcessMainConfig"
The following would be the typical output:
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-01 13:04:51:311 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-01 13:04:51:317 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-01 13:04:51:321 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-01 13:04:51:358 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-01 13:04:51:362 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-01 13:04:51:488 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO com.polarsparc.si.p3.SftpProcessConfig - Host: localhost, Port: 2222, User: bswamina
2021-05-01 13:04:51:495 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO com.polarsparc.si.p3.SftpProcessConfig - Remote Dir: .
2021-05-01 13:04:51:509 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO com.polarsparc.si.p3.SftpProcessConfig - Local Dir: /tmp/out-dir
2021-05-01 13:04:51:575 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-01 13:04:51:653 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-01 13:04:51:654 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.errorChannel' has 1 subscriber(s).
2021-05-01 13:04:51:655 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-01 13:04:51:655 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:fileProcessHandler3.handler.serviceActivator} as a subscriber to the 'inChannel' channel
2021-05-01 13:04:51:656 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@396abb96.inChannel' has 1 subscriber(s).
2021-05-01 13:04:51:657 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'fileProcessHandler3.handler.serviceActivator'
2021-05-01 13:04:51:660 [com.polarsparc.si.p3.SftpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.SourcePollingChannelAdapter - started bean 'sftpProcessConfig.sftpChannelAdapter.inboundChannelAdapter'
2021-05-01 13:04:51:885 [task-scheduler-1] INFO org.springframework.integration.sftp.session.DefaultSftpSessionFactory - The authenticity of host 'localhost' can't be established. RSA key fingerprint is f6:24:d1:19:a9:4e:8d:6b:47:15:36:54:55:31:44:21. Are you sure you want to continue connecting?
2021-05-01 13:04:51:886 [task-scheduler-1] WARN com.jcraft.jsch - Permanently added 'localhost' (RSA) to the list of known hosts.
Once again, create a file called abc.dat in the directory /tmp/in-dir and transfer it to the sftp remote directory by executing the following commands:
$ echo 'Spring Integration' > /tmp/in-dir/abc.dat
$ sudo mv /tmp/in-dir/abc.dat $HOME/Downloads/DATA/sftp/share
We will see the following update in the terminal output:
2021-05-01 13:07:11:763 [task-scheduler-2] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-01 13:07:11:766 [task-scheduler-2] INFO com.polarsparc.si.p3.FileProcessHandler3 - Processed input file: /tmp/out-dir/abc.dat, size: 19 (using Config)
Now, create another file called def.txt in the directory /tmp/in-dir and transfer it to the sftp remote directory by executing the following commands:
$ echo 'Ignored' > /tmp/in-dir/def.txt
$ sudo mv /tmp/in-dir/def.txt $HOME/Downloads/DATA/sftp/share
Nothing will happen as the file name does not match the pattern.
As can be inferred from Output.16 and Output.18 above, Spring Integration successfully transferred and processed the file abc.dat in both the XML based and the Java Config based approaches.
References
Spring Integration Notes :: Part - 2
Spring Integration Notes :: Part - 1