PolarSPARC
Spring Integration Notes :: Part - 5
Bhaskar S | 05/15/2021 (UPDATED)
Overview
In Part-4, we covered basic examples of Spring Integration relating to the JDBC and HTTP channel adapters.
We will continue our journey through the channel adapters by exploring Spring Integration examples of the following:
Receiving Message(s) from AMQP (using RabbitMQ)
Receiving Message(s) from Apache Kafka
Setup
To set up the Java directory structure for the demonstrations in this part, execute the following commands:
$ cd $HOME/java/SpringIntegration
$ mkdir -p src/main/java/com/polarsparc/si/p5
$ mkdir -p src/main/resources/p5
To set up the directory structure(s) for RabbitMQ and Apache Kafka, execute the following commands:
$ mkdir -p $HOME/Downloads/rabbitmq
$ mkdir -p $HOME/Downloads/zookeeper/data $HOME/Downloads/zookeeper/logs
$ mkdir -p $HOME/Downloads/kafka/data $HOME/Downloads/kafka/logs
To download the required docker images for RabbitMQ, Apache ZooKeeper, and Apache Kafka, execute the following commands:
$ docker pull rabbitmq:3.8.16-management
$ docker pull confluentinc/cp-zookeeper:6.1.1
$ docker pull confluentinc/cp-kafka:6.1.1
The following is the listing for the updated Maven project file pom.xml to support the AMQP and Apache Kafka channel adapters:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.polarsparc.si</groupId>
    <artifactId>SpringIntegration</artifactId>
    <version>1.0</version>
    <name>SpringIntegration</name>
    <description>Spring Integration Examples</description>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.1</version>
                    <configuration>
                        <fork>true</fork>
                        <meminitial>128m</meminitial>
                        <maxmem>512m</maxmem>
                        <source>11</source>
                        <target>11</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <dependencies>
        <dependency>
            <groupId>javax.annotation</groupId>
            <artifactId>javax.annotation-api</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-file</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-sftp</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-amqp</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.3.6</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.19</version>
        </dependency>
    </dependencies>
</project>
Hands-on Spring Integration
Receiving Message(s) from AMQP (using RabbitMQ)
RabbitMQ is an open-source, language-neutral, message-oriented middleware that implements the AMQP messaging standard. One can refer to the 3-part tutorial series on RabbitMQ to quickly get started - Part - 1, Part - 2, and Part - 3.
To start the RabbitMQ server on localhost using docker, open a terminal window and execute the following command:
$ docker run -d --rm --hostname rabbitmq-dev --name rabbitmq-dev -v $HOME/Downloads/rabbitmq:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=polarsparc -e RABBITMQ_DEFAULT_PASS=polarsparc\$123 rabbitmq:3.8.16-management
The following is the properties file amqp.properties located in the resources/p5 directory:
#
# Properties for amqp processing
#
amqp.host=localhost
amqp.port=5672
amqp.username=polarsparc
amqp.password=polarsparc$123
XML based Approach
The following is the XML based Spring Framework application context that defines the various RabbitMQ connection related resources:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <context:property-placeholder location="classpath:p5/amqp.properties" />

    <rabbit:connection-factory id="connectionFactory"
                               host="${amqp.host}"
                               port="${amqp.port}"
                               username="${amqp.username}"
                               password="${amqp.password}" />

    <rabbit:admin connection-factory="connectionFactory" />

    <rabbit:queue name="si_queue" />

    <rabbit:direct-exchange name="si_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="si_queue" key="si_queue" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
</beans>
Some aspects of Listing.58 above need a little explanation:
The element rabbit:connection-factory defines the central component for managing connections to the RabbitMQ broker. Under the hood, it uses the Spring Framework class org.springframework.amqp.rabbit.connection.CachingConnectionFactory to establish the connection with the broker
The element rabbit:queue defines the message queue on the RabbitMQ broker. Under the hood, it uses the Spring Framework class org.springframework.amqp.core.Queue to represent a queue
The element rabbit:direct-exchange defines how messages are routed to a message queue on the RabbitMQ broker. Under the hood, it uses the Spring Framework class org.springframework.amqp.core.DirectExchange to represent the exchange
The element rabbit:binding is important - it defines the association between an AMQP exchange and the corresponding AMQP queue. Under the hood, it uses the Spring Framework class org.springframework.amqp.core.Binding to implement the binding
The element rabbit:admin is *CRITICAL* - it automatically creates the AMQP resources, such as the queue(s), the exchange(s), and the binding(s), in the RabbitMQ broker on behalf of the user. Under the hood, it uses the Spring Framework class org.springframework.amqp.rabbit.core.RabbitAdmin to implement the administrative functionality
The element rabbit:template defines the AMQP abstraction for sending messages to (or receiving messages from) the RabbitMQ broker, using the connection factory specified by the attribute connection-factory. Under the hood, it uses the Spring Framework class org.springframework.amqp.rabbit.core.RabbitTemplate as the concrete implementation
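To make the direct-exchange routing semantics above concrete, the following is a minimal, stdlib-only Java sketch - hypothetical code, not the Spring AMQP or RabbitMQ implementation - that simulates how a direct exchange delivers a message only to the queue(s) whose binding key exactly matches the routing key:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of AMQP direct-exchange routing (NOT Spring AMQP classes)
class DirectExchangeSim {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    void bind(String queue, String key) {
        bindings.computeIfAbsent(key, k -> new ArrayList<>()).add(queue);
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
    }

    // A direct exchange delivers only on an EXACT routing-key match;
    // messages with an unmatched key are simply dropped
    void publish(String routingKey, String message) {
        for (String queue : bindings.getOrDefault(routingKey, List.of())) {
            queues.get(queue).add(message);
        }
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, List.of());
    }

    public static void main(String[] args) {
        DirectExchangeSim exchange = new DirectExchangeSim();
        exchange.bind("si_queue", "si_queue");    // mirrors the rabbit:binding above
        exchange.publish("si_queue", "hello");    // exact match -> delivered
        exchange.publish("other_key", "dropped"); // no binding -> dropped
        System.out.println(exchange.messagesIn("si_queue")); // prints [hello]
    }
}
```

With the single binding `queue="si_queue" key="si_queue"` from Listing.58, publishing with routing key si_queue lands the message in si_queue, while any other routing key delivers nowhere.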
The following is the main application that publishes AMQP messages to the AMQP queue on the RabbitMQ broker:
/*
 * Name:   AmqpPublisher
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class AmqpPublisher {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpPublisher.class);

    private static final String MESSAGE_TEXT = "AMQP with Spring Integration - %d";

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("p5/AmqpPublisher.xml");

        AmqpTemplate template = (AmqpTemplate) context.getBean("amqpTemplate");

        for (int i = 1; i <= 10; i++) {
            String text = String.format(MESSAGE_TEXT, i);
            try {
                template.convertAndSend("si_exchange", "si_queue", text);
                LOGGER.info("Published message - {}", text);
            } catch (AmqpException ex) {
                LOGGER.error(ex.getLocalizedMessage());
            }
        }
    }
}
The following is the AMQP message handler POJO that displays the messages received from the specified AMQP queue on the RabbitMQ broker:
/*
 * Name:   AmqpProcessHandler
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

public class AmqpProcessHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpProcessHandler.class);

    public void handler(Message<?> msg) {
        LOGGER.info("AMQP message payload (in Xml): {}", msg.getPayload().toString());
    }
}
The following is the XML based Spring Integration configuration that wires up the channel, the various RabbitMQ connection related resources, and the endpoint:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:amqp="http://www.springframework.org/schema/integration/amqp"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
    <context:property-placeholder location="classpath:p5/amqp.properties" />

    <rabbit:connection-factory id="connectionFactory"
                               host="${amqp.host}"
                               port="${amqp.port}"
                               username="${amqp.username}"
                               password="${amqp.password}" />

    <rabbit:admin connection-factory="connectionFactory" />

    <rabbit:queue name="si_queue" />

    <rabbit:direct-exchange name="si_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="si_queue" key="si_queue" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

    <int:channel id="amqpMsgsChannel" />

    <amqp:inbound-channel-adapter id="inChannel"
                                  queue-names="si_queue"
                                  channel="amqpMsgsChannel"
                                  connection-factory="connectionFactory" />

    <bean id="inAmqpHandler" class="com.polarsparc.si.p5.AmqpProcessHandler" />

    <int:service-activator input-channel="amqpMsgsChannel"
                           ref="inAmqpHandler"
                           method="handler" />
</beans>
Some aspects of Listing.61 above need a little explanation:
The amqp:inbound-channel-adapter element uses the connection factory specified by the attribute connection-factory to consume AMQP messages from the comma-separated list of queues specified by the attribute queue-names, and passes them as messages to the channel specified by the attribute channel. Under the hood, it uses the Spring Framework class org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter to represent the input channel adapter
The rabbit:* elements define the same RabbitMQ connection resources (connection factory, admin, queue, exchange, binding, and template) described for Listing.58 above
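The flow in Listing.61 - the inbound adapter sending each AMQP message into a channel whose subscribed service-activator method is invoked synchronously - can be illustrated with a tiny stdlib-only Java sketch (conceptual only; the names here are hypothetical and this is not Spring's actual implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Conceptual sketch (NOT Spring's implementation): with a single subscriber,
// a direct channel synchronously invokes that handler for every message sent
class MiniDirectChannel {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> handler) {   // service-activator registration
        subscribers.add(handler);
    }

    void send(String payload) {                  // inbound adapter pushes here
        for (Consumer<String> handler : subscribers) {
            handler.accept(payload);             // same-thread dispatch
        }
    }

    public static void main(String[] args) {
        MiniDirectChannel amqpMsgsChannel = new MiniDirectChannel();
        List<String> handled = new ArrayList<>();
        amqpMsgsChannel.subscribe(handled::add); // plays the role of AmqpProcessHandler
        amqpMsgsChannel.send("AMQP with Spring Integration - 1");
        System.out.println(handled); // prints [AMQP with Spring Integration - 1]
    }
}
```

Note that Spring's real DirectChannel round-robins among multiple subscribers; with the single service-activator of Listing.61, every message simply reaches the handler method on the sender's thread.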
The following is our main application to test the amqp channel adapter:
/*
 * Name:   AmqpProcessMainXml
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class AmqpProcessMainXml {
    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("p5/AmqpProcess.xml");
    }
}
To execute the code from Listing.62, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p5.AmqpProcessMainXml"
The following would be the typical output:
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-15 08:37:53:282 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-15 08:37:53:286 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-15 08:37:53:290 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-15 08:37:53:590 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-15 08:37:53:621 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'amqpMsgsChannel' channel
2021-05-15 08:37:53:622 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@e98a152.amqpMsgsChannel' has 1 subscriber(s).
2021-05-15 08:37:53:623 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2021-05-15 08:37:53:624 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-15 08:37:53:625 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@e98a152.errorChannel' has 1 subscriber(s).
2021-05-15 08:37:53:626 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-15 08:37:53:627 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Attempting to connect to: localhost:5672
2021-05-15 08:37:53:683 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Created new connection: connectionFactory#3059cdde:0/SimpleConnection@689c9ac [delegate=amqp://polarsparc@127.0.0.1:5672/, localPort= 35422]
2021-05-15 08:37:53:729 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter - started bean 'inChannel'; defined in: 'class path resource [p5/AmqpProcess.xml]'; from source: ''amqp:inbound-channel-adapter' with id='inChannel''
To execute the amqp publisher code from Listing.59, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p5.AmqpPublisher"
The following would be the typical output:
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-15 08:39:10:414 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Attempting to connect to: localhost:5672
2021-05-15 08:39:10:470 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Created new connection: connectionFactory#3600308e:0/SimpleConnection@682ccde5 [delegate=amqp://polarsparc@127.0.0.1:5672/, localPort= 35428]
2021-05-15 08:39:10:504 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 1
2021-05-15 08:39:10:505 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 2
2021-05-15 08:39:10:506 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 3
2021-05-15 08:39:10:507 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 4
2021-05-15 08:39:10:507 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 5
2021-05-15 08:39:10:510 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 6
2021-05-15 08:39:10:511 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 7
2021-05-15 08:39:10:512 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 8
2021-05-15 08:39:10:514 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 9
2021-05-15 08:39:10:514 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 10
--- CTRL-C ---
We will see the following update in the terminal of Output.25:
2021-05-15 08:39:10:524 [(inner bean)#3d4053d9-1] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-15 08:39:10:528 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 1
2021-05-15 08:39:10:529 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 2
2021-05-15 08:39:10:529 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 3
2021-05-15 08:39:10:530 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 4
2021-05-15 08:39:10:531 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 5
2021-05-15 08:39:10:532 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 6
2021-05-15 08:39:10:532 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 7
2021-05-15 08:39:10:533 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 8
2021-05-15 08:39:10:534 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 9
2021-05-15 08:39:10:535 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 10
--- CTRL-C ---
Java Config based Approach
The following is the Java Config based POJO that defines the AMQP message handler, which displays the messages received from the RabbitMQ broker:
/*
 * Name:   AmqpProcessHandler2
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;

@Configuration
@EnableIntegration
public class AmqpProcessHandler2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpProcessHandler2.class);

    @ServiceActivator(inputChannel = "amqpMsgsChannel")
    public void handler(Message<?> msg) {
        LOGGER.info("AMQP message payload (in Config): {}", msg.getPayload().toString());
    }
}
The following is the Java Config based POJO that loads the external amqp.properties file and defines the channel, the various RabbitMQ connection related resources, and the endpoint, similar to the XML configuration of Listing.61 above:
/*
 * Name:   AmqpProcessConfig
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration
@PropertySource("classpath:p5/amqp.properties")
public class AmqpProcessConfig {
    @Value("${amqp.host}")
    private String amqpHost;

    @Value("${amqp.port}")
    private int amqpPort;

    @Value("${amqp.username}")
    private String amqpUsername;

    @Value("${amqp.password}")
    private String amqpPassword;

    @Bean
    public ConnectionFactory connectionFactory() {
        // The host comes from the amqp.host property; no need to hardcode it in the constructor
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(amqpHost);
        factory.setPort(amqpPort);
        factory.setUsername(amqpUsername);
        factory.setPassword(amqpPassword);
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    DirectExchange amqpDirectExchange() {
        return new DirectExchange("si_exchange");
    }

    @Bean
    Binding amqpBinding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("si_queue");
    }

    @Bean
    public SimpleMessageListenerContainer amqpMessageListenerContainer(ConnectionFactory factory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory);
        container.setQueueNames("si_queue");
        return container;
    }

    @Bean
    public AmqpInboundChannelAdapter amqpInboundChannelAdapter(SimpleMessageListenerContainer container,
                                                               @Qualifier("amqpMsgsChannel") MessageChannel channel) {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
        adapter.setOutputChannel(channel);
        return adapter;
    }

    @Bean
    public Queue amqpQueue() {
        return new Queue("si_queue");
    }

    @Bean
    public RabbitTemplate amqpTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    public MessageChannel amqpMsgsChannel() {
        return new DirectChannel();
    }
}
Some aspects of Listing.64 above need a little explanation:
The class org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer is the implementation of a message listener container that acts as the bridge between the AMQP message queue(s) and the AMQP message listener. It uses the connection factory to establish the connection with the RabbitMQ broker
The class org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter is the AMQP channel adapter that receives the AMQP messages from an AMQP queue defined in the RabbitMQ broker and sends them as messages to the specified channel
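The bridge role of the message listener container can be illustrated with a small, stdlib-only Java sketch - hypothetical code, not the Spring AMQP implementation - in which a background thread polls a queue and hands each message to a listener callback, just as SimpleMessageListenerContainer consumes from the broker and hands messages to the channel adapter:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Conceptual sketch (NOT the Spring AMQP implementation): a listener container
// bridges a broker queue and a listener by polling on its own thread and
// invoking the callback once per received message
class MiniListenerContainer {
    private final BlockingQueue<String> queue;   // stands in for the broker queue
    private final Consumer<String> listener;     // stands in for the channel adapter
    private volatile boolean running;
    private Thread worker;

    MiniListenerContainer(BlockingQueue<String> queue, Consumer<String> listener) {
        this.queue = queue;
        this.listener = listener;
    }

    void start() {
        running = true;
        worker = new Thread(() -> {
            while (running) {
                try {
                    // Poll with a timeout so stop() can be honored promptly
                    String msg = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        listener.accept(msg);    // hand the message to the listener
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        worker.start();
    }

    void stop() throws InterruptedException {
        running = false;
        worker.join();
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> brokerQueue = new LinkedBlockingQueue<>();
        MiniListenerContainer container =
            new MiniListenerContainer(brokerQueue, msg -> System.out.println("Received - " + msg));
        container.start();
        brokerQueue.put("AMQP with Spring Integration - 1");
        Thread.sleep(300);
        container.stop();
    }
}
```

The real container additionally manages consumer threads, acknowledgements, and reconnection, but the shape is the same: the application never pulls messages itself; the container pushes them into the adapter, which in turn sends them to amqpMsgsChannel.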
And finally, the following is the main application that uses the POJOs from Listing.63 and Listing.64 to test the amqp channel adapter:
/*
 * Name:   AmqpProcessMainConfig
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class AmqpProcessMainConfig {
    public static void main(String[] args) {
        new AnnotationConfigApplicationContext(AmqpProcessHandler2.class, AmqpProcessConfig.class);
    }
}
To execute the code from Listing.65, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p5.AmqpProcessMainConfig"
The following would be the typical output:
[INFO] Scanning for projects...
[INFO]
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-15 08:44:50:733 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-15 08:44:50:740 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-15 08:44:50:744 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-15 08:44:50:780 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-15 08:44:50:786 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-15 08:44:51:089 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-15 08:44:51:164 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-15 08:44:51:165 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23df8bde.errorChannel' has 1 subscriber(s).
2021-05-15 08:44:51:165 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-15 08:44:51:166 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:amqpProcessHandler2.handler.serviceActivator} as a subscriber to the 'amqpMsgsChannel' channel
2021-05-15 08:44:51:167 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23df8bde.amqpMsgsChannel' has 1 subscriber(s).
2021-05-15 08:44:51:167 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'amqpProcessHandler2.handler.serviceActivator'
2021-05-15 08:44:51:168 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Attempting to connect to: localhost:5672
2021-05-15 08:44:51:223 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Created new connection: connectionFactory#67fa9137:0/SimpleConnection@33593621 [delegate=amqp://polarsparc@127.0.0.1:5672/, localPort= 35566]
2021-05-15 08:44:51:264 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter - started bean 'amqpInboundChannelAdapter'; defined in: 'com.polarsparc.si.p5.AmqpProcessConfig'; from source: 'org.springframework.core.type.StandardMethodMetadata@7c5de007'
To execute the AMQP publisher code from Listing.59, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p5.AmqpPublisher"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- 2021-05-15 08:45:41:123 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Attempting to connect to: localhost:5672 2021-05-15 08:45:41:168 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Created new connection: connectionFactory#32774e22:0/SimpleConnection@65c5fc89 [delegate=amqp://polarsparc@127.0.0.1:5672/, localPort= 35570] 2021-05-15 08:45:41:202 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 1 2021-05-15 08:45:41:203 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 2 2021-05-15 08:45:41:204 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 3 2021-05-15 08:45:41:205 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 4 2021-05-15 08:45:41:206 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 5 2021-05-15 08:45:41:207 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 6 2021-05-15 08:45:41:208 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 7 2021-05-15 08:45:41:209 [com.polarsparc.si.p5.AmqpPublisher.main()] 
INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 8 2021-05-15 08:45:41:209 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 9 2021-05-15 08:45:41:210 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 10 --- CTRL-C ---
We will see the following update in the terminal of Output.28:
2021-05-15 08:45:41:218 [amqpMessageListenerContainer-1] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-15 08:45:41:221 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 1 2021-05-15 08:45:41:222 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 2 2021-05-15 08:45:41:223 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 3 2021-05-15 08:45:41:224 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 4 2021-05-15 08:45:41:225 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 5 2021-05-15 08:45:41:225 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 6 2021-05-15 08:45:41:226 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 7 2021-05-15 08:45:41:227 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 8 2021-05-15 08:45:41:228 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 9 2021-05-15 08:45:41:228 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 10 --- CTRL-C ---
As can be inferred from Output.27 and Output.30 above, Spring Integration successfully processed the AMQP messages from the RabbitMQ broker.
Receiving Message(s) from Apache Kafka
Apache Kafka is an open source, low-latency, high-throughput, fault-tolerant, distributed, persistent, and durable data streaming platform. One can refer to the 2-part tutorial series on Apache Kafka to quickly get started - Part - 1 and Part - 2.
To start the Apache Kafka broker on the localhost using docker, open a terminal window and execute the following commands (in that order):
$ docker run -d --rm --name cp-zk-dev --net=host -u $(id -u $USER):$(id -g $USER) -v $HOME/Downloads/zookeeper/data:/var/lib/zookeeper/data -v $HOME/Downloads/zookeeper/logs:/var/log/kafka -e ZOOKEEPER_CLIENT_PORT=10001 -e ZOOKEEPER_TICK_TIME=2000 confluentinc/cp-zookeeper:6.1.1
$ docker run -d --rm --name cp-kafka-dev --net=host -u $(id -u $USER):$(id -g $USER) -v $HOME/Downloads/kafka/data:/var/lib/kafka/data -v $HOME/Downloads/kafka/logs:/var/log/kafka -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=localhost:10001 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:20001 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka:6.1.1
To create a kafka topic called si_topic in the Apache Kafka broker, open a terminal window and execute the following command:
$ docker run --rm --net=host confluentinc/cp-kafka:6.1.1 kafka-topics --create --topic si_topic --partitions 1 --replication-factor 1 --if-not-exists --zookeeper localhost:10001
The following is the properties file kafka.properties located in the resources/p5 directory:
#
# Properties for kafka processing
#
bootstrap.servers=localhost:20001
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
consumer.topic=si_topic
group.id=si_group
auto.offset.reset=earliest
enable.auto.commit=true
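Note that both the XML element context:property-placeholder and the annotation @PropertySource (used later in this part) build on the standard java.util.Properties format, so the file above can be verified with the standard library alone. The following is a minimal, self-contained sketch (the file contents are inlined here purely for illustration):

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class KafkaPropsCheck {
    // Mirrors resources/p5/kafka.properties; inlined so the sketch is self-contained
    static final String KAFKA_PROPS =
            "bootstrap.servers=localhost:20001\n"
          + "key.deserializer=org.apache.kafka.common.serialization.StringDeserializer\n"
          + "value.deserializer=org.apache.kafka.common.serialization.StringDeserializer\n"
          + "consumer.topic=si_topic\n"
          + "group.id=si_group\n"
          + "auto.offset.reset=earliest\n"
          + "enable.auto.commit=true\n";

    // Parse the properties text the same way Spring's placeholder support would
    public static Properties load() {
        Properties props = new Properties();
        try {
            props.load(new StringReader(KAFKA_PROPS));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load();
        // The channel adapter will consume from this topic as part of this consumer group
        System.out.println(props.getProperty("consumer.topic") + " / " + props.getProperty("group.id"));
    }
}
```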
XML based Approach |
The following is the kafka message handler POJO that displays the messages received from the specific kafka topic in the Apache Kafka broker:
/*
 * Name:   KafkaProcessHandler
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

public class KafkaProcessHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProcessHandler.class);

    public void handler(Message<?> msg) {
        LOGGER.info("Kafka message payload (in Xml): {}", msg.getPayload().toString());
    }
}
The following is the XML based Spring Integration configuration that wires up the channel, the Apache Kafka message consumer and listener, and the endpoint:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:kafka="http://www.springframework.org/schema/integration/kafka"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/kafka
           http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">

    <context:property-placeholder location="classpath:p5/kafka.properties" />

    <bean id="kafkaContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="${bootstrap.servers}" />
                        <entry key="key.deserializer" value="${key.deserializer}" />
                        <entry key="value.deserializer" value="${value.deserializer}" />
                        <entry key="group.id" value="${group.id}" />
                        <entry key="auto.offset.reset" value="${auto.offset.reset}" />
                        <entry key="enable.auto.commit" value="${enable.auto.commit}" />
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
        <constructor-arg>
            <bean class="org.springframework.kafka.listener.ContainerProperties">
                <constructor-arg name="topics" value="${consumer.topic}" />
            </bean>
        </constructor-arg>
    </bean>

    <int:channel id="inKafkaChannel" />

    <kafka:message-driven-channel-adapter id="kafkaListener"
                                          listener-container="kafkaContainer"
                                          mode="record"
                                          channel="inKafkaChannel" />

    <bean id="inKafkaHandler" class="com.polarsparc.si.p5.KafkaProcessHandler" />

    <int:service-activator input-channel="inKafkaChannel" ref="inKafkaHandler" method="handler" />
</beans>
Some aspects of Listing.68 above need a little explanation.
The class org.springframework.kafka.core.DefaultKafkaConsumerFactory is the Spring Framework factory class that uses the specified configuration parameters to create the appropriate kafka consumer instance
The class org.springframework.kafka.listener.KafkaMessageListenerContainer is the Spring Framework kafka message listener container that consumes messages from all the assigned partition(s) of the subscribed topic(s) on a single thread
The class org.springframework.kafka.listener.ContainerProperties is the Spring Framework container class for storing the kafka message listener runtime properties, such as the name(s) of the kafka topic(s) to consume message(s) from
The element kafka:message-driven-channel-adapter is responsible for consuming messages from the Apache Kafka broker using the message listener container instance, which is specified using the attribute listener-container. The attribute mode with the value of record indicates that each kafka record be delivered as an individual message. Each message from kafka is sent to the specified channel
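To make the effect of the mode attribute concrete, the following standard-library-only sketch (a toy stand-in, NOT the Spring API) contrasts record mode, where each kafka record becomes its own message, with the alternative batch mode (ListenerMode.batch in the Spring Integration Kafka adapter), where a whole poll of records is delivered as a single message payload:

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerModeSketch {
    // mode="record": one message per record in the poll
    public static List<String> recordMode(List<String> poll) {
        List<String> messages = new ArrayList<>();
        for (String record : poll) {
            messages.add(record); // each record becomes its own message payload
        }
        return messages;
    }

    // mode="batch": the entire poll of records is a single message payload
    public static List<List<String>> batchMode(List<String> poll) {
        List<List<String>> messages = new ArrayList<>();
        messages.add(poll); // one message whose payload is the whole list
        return messages;
    }

    public static void main(String[] args) {
        List<String> poll = List.of("msg-1", "msg-2", "msg-3");
        System.out.println("record mode messages: " + recordMode(poll).size());
        System.out.println("batch mode messages:  " + batchMode(poll).size());
    }
}
```

With record mode, the service activator's handler is invoked once per record, which is why the outputs later in this part log each published text message individually.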
The following is our main application to test the kafka channel adapter:
/*
 * Name:   KafkaProcessMainXml
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class KafkaProcessMainXml {
    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("p5/KafkaProcess.xml");
    }
}
To execute the code from Listing.69, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p5.KafkaProcessMainXml"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- 2021-05-15 16:07:25:294 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-05-15 16:07:25:298 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-05-15 16:07:25:301 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 2021-05-15 16:07:25:552 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' 2021-05-15 16:07:25:582 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inKafkaChannel' channel 2021-05-15 16:07:25:583 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@27ab83e2.inKafkaChannel' has 1 subscriber(s). 
2021-05-15 16:07:25:584 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' 2021-05-15 16:07:25:584 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-05-15 16:07:25:585 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@27ab83e2.errorChannel' has 1 subscriber(s). 2021-05-15 16:07:25:586 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger' 2021-05-15 16:07:25:600 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: allow.auto.create.topics = true auto.commit.interval.ms = 5000 auto.offset.reset = earliest bootstrap.servers = [localhost:20001] check.crcs = true client.dns.lookup = use_all_dns_ips client.id = consumer-si_group-1 client.rack = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = si_group group.instance.id = null heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true internal.throw.on.fetch.stable.offset.unsupported = false isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] 
metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT security.providers = null send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.3] ssl.endpoint.identification.algorithm = https ssl.engine.factory.class = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLSv1.3 ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 2021-05-15 16:07:25:742 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.6.1 2021-05-15 16:07:25:743 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 6b2021cd52659cef 2021-05-15 16:07:25:743 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1621109245740 2021-05-15 
16:07:25:745 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-si_group-1, groupId=si_group] Subscribed to topic(s): si_topic 2021-05-15 16:07:25:746 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 2021-05-15 16:07:25:751 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter - started bean 'kafkaListener'; defined in: 'class path resource [p5/KafkaProcess.xml]'; from source: ''kafka:message-driven-channel-adapter' with id='kafkaListener'' 2021-05-15 16:07:25:945 [kafkaContainer-C-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-si_group-1, groupId=si_group] Cluster ID: DUwH0zVlRhK0-xbCh5qioQ 2021-05-15 16:07:25:946 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Discovered group coordinator localhost:20001 (id: 2147483646 rack: null) 2021-05-15 16:07:25:949 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] (Re-)joining group 2021-05-15 16:07:25:958 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] (Re-)joining group 2021-05-15 16:07:28:962 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Successfully joined group with generation Generation{generationId=13, memberId='consumer-si_group-1-fc33be3d-bce6-47a6-8da6-1d34833df588', protocol='range'} 2021-05-15 16:07:28:965 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer 
clientId=consumer-si_group-1, groupId=si_group] Finished assignment for group at generation 13: {consumer-si_group-1-fc33be3d-bce6-47a6-8da6-1d34833df588=Assignment(partitions=[si_topic-0])} 2021-05-15 16:07:28:971 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Successfully synced group in generation Generation{generationId=13, memberId='consumer-si_group-1-fc33be3d-bce6-47a6-8da6-1d34833df588', protocol='range'} 2021-05-15 16:07:28:973 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Notifying assignor about the new Assignment(partitions=[si_topic-0]) 2021-05-15 16:07:28:975 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Adding newly assigned partitions: si_topic-0 2021-05-15 16:07:28:984 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Setting offset for partition si_topic-0 to the committed offset FetchPosition{offset=2, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:20001 (id: 1 rack: null)], epoch=0}} 2021-05-15 16:07:28:994 [kafkaContainer-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer - si_group: partitions assigned: [si_topic-0]
To publish messages using the kafka console producer from the docker image, open a terminal window and run the following command:
$ docker run -it --rm --net=host confluentinc/cp-kafka:6.1.1 kafka-console-producer --request-required-acks 1 --broker-list localhost:20001 --topic si_topic
The prompt will change to '>'. Enter each of the following 3 text messages, pressing ENTER after each one, and then exit by pressing CTRL-C:
>Kafka with Spring Integration - A
>Kafka with Spring Integration - B
>Kafka with Spring Integration - C
>CTRL-C
We will see the following update in the terminal of Output.31:
2021-05-15 16:09:12:210 [kafkaContainer-C-1] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one. 2021-05-15 16:09:12:213 [kafkaContainer-C-1] INFO com.polarsparc.si.p5.KafkaProcessHandler - Kafka message payload (in Xml): Kafka with Spring Integration - A 2021-05-15 16:09:17:230 [kafkaContainer-C-1] INFO com.polarsparc.si.p5.KafkaProcessHandler - Kafka message payload (in Xml): Kafka with Spring Integration - B 2021-05-15 16:09:22:271 [kafkaContainer-C-1] INFO com.polarsparc.si.p5.KafkaProcessHandler - Kafka message payload (in Xml): Kafka with Spring Integration - C --- CTRL-C ---
Java Config based Approach |
The following is the Java Config based POJO that displays the messages received from the specific kafka topic in the Apache Kafka broker:
/*
 * Name:   KafkaProcessHandler2
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;

@Configuration
@EnableIntegration
public class KafkaProcessHandler2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProcessHandler2.class);

    @ServiceActivator(inputChannel = "inKafkaChannel")
    public void handler(Message<?> msg) {
        LOGGER.info("Kafka message payload (in Config): {}", msg.getPayload().toString());
    }
}
The following is the Java Config based POJO that refers to the external kafka.properties file and defines the channel, the Apache Kafka message consumer and listener, and the endpoint similar to the way defined in the XML configuration file of Listing.68 above:
/*
 * Name:   KafkaProcessConfig
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.messaging.MessageChannel;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableIntegration
@PropertySource("classpath:p5/kafka.properties")
public class KafkaProcessConfig {
    @Value("${bootstrap.servers}")
    private String bootstrapServer;

    @Value("${key.deserializer}")
    private String keyDeserializer;

    @Value("${value.deserializer}")
    private String valueDeserializer;

    @Value("${group.id}")
    private String groupId;

    @Value("${auto.offset.reset}")
    private String autoOffsetReset;

    @Value("${enable.auto.commit}")
    private String enableAutoCommit;

    @Bean
    public MessageChannel inKafkaChannel() {
        return new DirectChannel();
    }

    @Bean
    public ConsumerFactory<String, String> kafkaConsumerFactory() {
        Map<String, Object> map = new HashMap<>();
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        map.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        return new DefaultKafkaConsumerFactory<>(map);
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> kafkaContainer() {
        ContainerProperties properties = new ContainerProperties("si_topic");
        return new KafkaMessageListenerContainer<>(kafkaConsumerFactory(), properties);
    }

    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> kafkaListener() {
        KafkaMessageDrivenChannelAdapter<String, String> adapter =
                new KafkaMessageDrivenChannelAdapter<>(kafkaContainer(),
                        KafkaMessageDrivenChannelAdapter.ListenerMode.record);
        adapter.setOutputChannel(inKafkaChannel());
        return adapter;
    }
}
And finally, the following is the main application that uses the POJOs from Listing.70 and Listing.71 to test the kafka channel adapter:
/*
 * Name:   KafkaProcessMainConfig
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class KafkaProcessMainConfig {
    public static void main(String[] args) {
        new AnnotationConfigApplicationContext(KafkaProcessHandler2.class, KafkaProcessConfig.class);
    }
}
To execute the code from Listing.72, open a terminal window and run the following commands:
$ cd $HOME/java/SpringIntegration
$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p5.KafkaProcessMainConfig"
The following would be the typical output:
[INFO] Scanning for projects... [INFO] [INFO] ----------------< com.polarsparc.si:SpringIntegration >----------------- [INFO] Building SpringIntegration 1.0 [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration --- 2021-05-15 16:11:29:781 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 2021-05-15 16:11:29:787 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 2021-05-15 16:11:29:791 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 
2021-05-15 16:11:29:827 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-05-15 16:11:29:832 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 2021-05-15 16:11:30:093 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler' 2021-05-15 16:11:30:162 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 2021-05-15 16:11:30:163 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@52d54aa9.errorChannel' has 1 subscriber(s). 
2021-05-15 16:11:30:164 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-15 16:11:30:165 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:kafkaProcessHandler2.handler.serviceActivator} as a subscriber to the 'inKafkaChannel' channel
2021-05-15 16:11:30:166 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@52d54aa9.inKafkaChannel' has 1 subscriber(s).
2021-05-15 16:11:30:167 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'kafkaProcessHandler2.handler.serviceActivator'
2021-05-15 16:11:30:180 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:20001]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = consumer-si_group-1
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = si_group
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2021-05-15 16:11:30:343 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.6.1
2021-05-15 16:11:30:344 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 6b2021cd52659cef
2021-05-15 16:11:30:344 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1621109490341
2021-05-15 16:11:30:347 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-si_group-1, groupId=si_group] Subscribed to topic(s): si_topic
2021-05-15 16:11:30:348 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService
2021-05-15 16:11:30:353 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter - started bean 'kafkaListener'; defined in: 'com.polarsparc.si.p5.KafkaProcessConfig'; from source: 'org.springframework.core.type.StandardMethodMetadata@2c228b2a'
2021-05-15 16:11:30:553 [kafkaContainer-C-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-si_group-1, groupId=si_group] Cluster ID: DUwH0zVlRhK0-xbCh5qioQ
2021-05-15 16:11:30:554 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Discovered group coordinator localhost:20001 (id: 2147483646 rack: null)
2021-05-15 16:11:30:557 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] (Re-)joining group
2021-05-15 16:11:30:566 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] (Re-)joining group
2021-05-15 16:11:33:569 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Successfully joined group with generation Generation{generationId=15, memberId='consumer-si_group-1-c91ae21c-11b9-4160-9a4c-206adebabd49', protocol='range'}
2021-05-15 16:11:33:573 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Finished assignment for group at generation 15: {consumer-si_group-1-c91ae21c-11b9-4160-9a4c-206adebabd49=Assignment(partitions=[si_topic-0])}
2021-05-15 16:11:33:578 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Successfully synced group in generation Generation{generationId=15, memberId='consumer-si_group-1-c91ae21c-11b9-4160-9a4c-206adebabd49', protocol='range'}
2021-05-15 16:11:33:580 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Notifying assignor about the new Assignment(partitions=[si_topic-0])
2021-05-15 16:11:33:583 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Adding newly assigned partitions: si_topic-0
2021-05-15 16:11:33:592 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Setting offset for partition si_topic-0 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:20001 (id: 1 rack: null)], epoch=0}}
2021-05-15 16:11:33:602 [kafkaContainer-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer - si_group: partitions assigned: [si_topic-0]
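Notice that only a handful of the settings in the ConsumerConfig dump above are set explicitly by the demo configuration (bootstrap.servers pointing at localhost:20001, group.id of si_group, auto.offset.reset of earliest, and the String key/value deserializers); everything else is a Kafka client default. The following is a minimal sketch of how those explicit settings could be assembled; plain string keys are used here (instead of the ConsumerConfig constants from kafka-clients) purely to keep the snippet self-contained:

```java
import java.util.Properties;

// Sketch: the consumer settings set explicitly for the demo; all the other
// values in the ConsumerConfig dump are Kafka client defaults
public class ConsumerProps {
    public static Properties create() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:20001"); // the dockerized Kafka broker
        props.setProperty("group.id", "si_group");                 // consumer group seen in the logs
        props.setProperty("auto.offset.reset", "earliest");        // start from the oldest uncommitted offset
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        create().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

These are the same values one would pass to a DefaultKafkaConsumerFactory when wiring the message-driven channel adapter.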
To run a Kafka console producer (to publish the test messages) using the docker image, open a terminal window and execute the following command:
$ docker run -it --rm --net=host confluentinc/cp-kafka:6.1.1 kafka-console-producer --request-required-acks 1 --broker-list localhost:20001 --topic si_topic
The prompt will change to a '>'. Enter each of the following 3 text messages, pressing ENTER after each, and then exit by pressing CTRL-C:
>Kafka with Spring Integration - A
>Kafka with Spring Integration - B
>Kafka with Spring Integration - C
>CTRL-C
We will see the following additional output in the terminal of Output.33:
2021-05-15 16:13:25:261 [kafkaContainer-C-1] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-15 16:13:25:263 [kafkaContainer-C-1] INFO com.polarsparc.si.p5.KafkaProcessHandler2 - Kafka message payload (in Config): Kafka with Spring Integration - A
2021-05-15 16:13:33:113 [kafkaContainer-C-1] INFO com.polarsparc.si.p5.KafkaProcessHandler2 - Kafka message payload (in Config): Kafka with Spring Integration - B
2021-05-15 16:13:39:001 [kafkaContainer-C-1] INFO com.polarsparc.si.p5.KafkaProcessHandler2 - Kafka message payload (in Config): Kafka with Spring Integration - C
--- CTRL-C ---
As can be inferred from Output.32 and Output.34 above, Spring Integration successfully processed the Kafka messages from the Apache Kafka broker.
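For reference, the 'Kafka message payload (in Config)' log lines are emitted by the service activator subscribed to inKafkaChannel. The sketch below only reproduces that message-formatting behavior; the actual KafkaProcessHandler2 is a Spring bean with the @ServiceActivator wiring, which is omitted here so the snippet stays self-contained:

```java
// Sketch: the formatting behind the service activator's log lines; the real
// KafkaProcessHandler2 logs this string via SLF4J from a @ServiceActivator method
public class KafkaPayloadFormatter {
    public static String format(String payload) {
        return "Kafka message payload (in Config): " + payload;
    }

    public static void main(String[] args) {
        // prints: Kafka message payload (in Config): Kafka with Spring Integration - A
        System.out.println(format("Kafka with Spring Integration - A"));
    }
}
```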
References
Spring Integration Notes :: Part - 4
Spring Integration Notes :: Part - 3
Spring Integration Notes :: Part - 2
Spring Integration Notes :: Part - 1