PolarSPARC

Spring Integration Notes :: Part - 5


Bhaskar S 05/15/2021 (UPDATED)


Overview

In Part-4, we covered basic examples of Spring Integration relating to the JDBC and HTTP channel adapters.

We will continue our journey on channel adapters by exploring examples in Spring Integration relating to AMQP (using RabbitMQ) and Apache Kafka.

Setup

To set up the Java directory structure for the demonstrations in this part, execute the following commands:

$ cd $HOME/java/SpringIntegration

$ mkdir -p src/main/java/com/polarsparc/si/p5

$ mkdir -p src/main/resources/p5

To set up the directory structure(s) for RabbitMQ and Apache Kafka, execute the following commands:

$ mkdir -p $HOME/Downloads/rabbitmq

$ mkdir -p $HOME/Downloads/zookeeper/data $HOME/Downloads/zookeeper/logs

$ mkdir -p $HOME/Downloads/kafka/data $HOME/Downloads/kafka/logs

To download the required docker images for RabbitMQ, Apache Zookeeper, and Apache Kafka, execute the following commands:

$ docker pull rabbitmq:3.8.16-management

$ docker pull confluentinc/cp-zookeeper:6.1.1

$ docker pull confluentinc/cp-kafka:6.1.1

The following is the listing for the updated Maven project file pom.xml to support the AMQP and Apache Kafka channel adapters:


pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.polarsparc.si</groupId>
    <artifactId>SpringIntegration</artifactId>
    <version>1.0</version>
    <name>SpringIntegration</name>
    <description>Spring Integration Examples</description>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.1</version>
                    <configuration>
                        <fork>true</fork>
                        <meminitial>128m</meminitial>
                        <maxmem>512m</maxmem>
                        <source>11</source>
                        <target>11</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <dependencies>
        <dependency>
            <groupId>javax.annotation</groupId>
            <artifactId>javax.annotation-api</artifactId>
            <version>1.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>5.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-file</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-sftp</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-amqp</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-kafka</artifactId>
            <version>5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.3.6</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.19</version>
        </dependency>
    </dependencies>

</project>

Hands-on Spring Integration

Receiving Message(s) from AMQP (using RabbitMQ)

RabbitMQ is an open source, language-neutral, message-oriented middleware that implements the AMQP messaging standard. One can refer to the 3-part tutorial series on RabbitMQ to quickly get started - Part - 1, Part - 2, and Part - 3.

To start the RabbitMQ server on the localhost using docker, open a terminal window and execute the following command:

$ docker run -d --rm --hostname rabbitmq-dev --name rabbitmq-dev -v $HOME/Downloads/rabbitmq:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=polarsparc -e RABBITMQ_DEFAULT_PASS=polarsparc\$123 rabbitmq:3.8.16-management
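Before running the examples, it can help to confirm that the two ports published by the docker command above (5672 for AMQP and 15672 for the management console) are accepting connections. The following is a minimal sketch using only the JDK; the class name BrokerPortCheck and the one second timeout are assumptions made for illustration and are not part of the original project:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {
    // Returns true if a TCP connection to host:port succeeds within timeoutMillis
    public static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException ex) {
            // Connection refused, timed out, or host not resolvable
            return false;
        }
    }

    public static void main(String[] args) {
        // 5672 and 15672 are the ports published with -p in the docker command
        System.out.println("AMQP port 5672 open: " + isPortOpen("localhost", 5672, 1000));
        System.out.println("Management port 15672 open: " + isPortOpen("localhost", 15672, 1000));
    }
}
```

An open port only shows that something is listening; it does not verify the credentials, which are exercised when the examples below connect to the broker.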

The following is the properties file amqp.properties located in the resources/p5 directory:


Listing.57
#
# Properties for amqp processing
#

amqp.host=localhost
amqp.port=5672
amqp.username=polarsparc
amqp.password=polarsparc$123
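The values in this file are referenced later through placeholders such as ${amqp.host}. The following is a simplified, JDK-only sketch of that idea: it parses text in the same format and substitutes ${key} placeholders. It is not Spring's actual resolver; the class name AmqpPropertiesSketch and its methods are hypothetical. It also illustrates that the lone $ in the password is kept literally, since it is not followed by an opening brace:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AmqpPropertiesSketch {
    // Matches placeholders of the form ${key}
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Parses text in the same key=value format as amqp.properties
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException ex) {
            throw new IllegalStateException(ex);
        }
        return props;
    }

    // Replaces each ${key} with its value; unknown keys are left unchanged (a simplification)
    public static String resolve(String template, Properties props) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder result = new StringBuilder();
        while (matcher.find()) {
            String value = props.getProperty(matcher.group(1), matcher.group());
            matcher.appendReplacement(result, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(result);
        return result.toString();
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "amqp.host=localhost",
                "amqp.port=5672",
                "amqp.username=polarsparc",
                "amqp.password=polarsparc$123");
        Properties props = parse(text);
        System.out.println(resolve("${amqp.host}:${amqp.port}", props)); // localhost:5672
        System.out.println(props.getProperty("amqp.password"));          // polarsparc$123
    }
}
```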

XML based Approach

The following is the XML based Spring Framework application context that defines the various RabbitMQ connection related resources:


Listing.58
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/context
                            http://www.springframework.org/schema/context/spring-context.xsd
                            http://www.springframework.org/schema/rabbit
                            http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <context:property-placeholder location="classpath:p5/amqp.properties" />

    <rabbit:connection-factory id="connectionFactory"
                                host="${amqp.host}"
                                port="${amqp.port}"
                                username="${amqp.username}"
                                password="${amqp.password}" />

    <rabbit:admin connection-factory="connectionFactory" />

    <rabbit:queue name="si_queue" />

    <rabbit:direct-exchange name="si_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="si_queue"
                            key="si_queue" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:template id="amqpTemplate"
                      connection-factory="connectionFactory" />

</beans>

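Some aspects of Listing.58 above need a little explanation. The <rabbit:connection-factory> element defines the connection factory for the RabbitMQ broker using the host, port, and credentials from amqp.properties. The <rabbit:admin> element defines a RabbitAdmin that declares the queue, the exchange, and the binding found in the application context on the broker when a connection is opened. The <rabbit:queue> element defines the queue si_queue, and the <rabbit:direct-exchange> element defines the direct exchange si_exchange and binds si_queue to it with the routing key si_queue. The <rabbit:template> element defines the template (amqpTemplate) used to publish messages.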
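To make the binding in Listing.58 concrete, recall that a direct exchange delivers a message to every queue whose binding key exactly equals the routing key of the message. The following is a plain-Java model of that rule only; it does not talk to RabbitMQ, and the class DirectExchangeSketch with its methods is hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DirectExchangeSketch {
    // Routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // Queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    // Models <rabbit:binding queue="..." key="..." />
    public void bind(String queue, String routingKey) {
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // Delivers to every queue whose binding key equals routingKey; returns the number of queues reached
    public int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, List.of());
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return targets.size();
    }

    public List<String> messages(String queue) {
        return queues.getOrDefault(queue, List.of());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("si_queue", "si_queue");
        System.out.println(exchange.publish("si_queue", "hello"));    // 1
        System.out.println(exchange.publish("other_key", "dropped")); // 0
        System.out.println(exchange.messages("si_queue"));            // [hello]
    }
}
```

This is why the publisher in Listing.59 sends to si_exchange with the routing key si_queue: a different routing key would match no binding and the message would not reach the queue.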

The following is the main application to publish amqp messages to the amqp queue on the RabbitMQ broker:


Listing.59
/*
 * Name:   AmqpPublisher
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.*;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class AmqpPublisher {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpPublisher.class);

    private static final String MESSAGE_TEXT = "AMQP with Spring Integration - %d";

    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("p5/AmqpPublisher.xml");

        AmqpTemplate template = (AmqpTemplate) context.getBean("amqpTemplate");

        for (int i = 1; i <= 10; i++) {
            String text = String.format(MESSAGE_TEXT, i);
            try {
                template.convertAndSend("si_exchange", "si_queue", text);

                LOGGER.info("Published message - {}", text);
            }
            catch (AmqpException ex) {
                LOGGER.error(ex.getLocalizedMessage());
            }
        }
    }
}

The following is the amqp message handler POJO that displays the messages received from the specific amqp queue in the RabbitMQ broker:


Listing.60
/*
 * Name:   AmqpProcessHandler
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

public class AmqpProcessHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpProcessHandler.class);

    public void handler(Message<?> msg) {
        LOGGER.info("AMQP message payload (in Xml): {}", msg.getPayload().toString());
    }
}

The following is the XML based Spring Integration configuration that wires up the channel, the various RabbitMQ connection related resources, and the endpoint:


Listing.61
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:amqp="http://www.springframework.org/schema/integration/amqp"
        xmlns:rabbit="http://www.springframework.org/schema/rabbit"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
                            http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/context
                            http://www.springframework.org/schema/context/spring-context.xsd
                            http://www.springframework.org/schema/integration
                            http://www.springframework.org/schema/integration/spring-integration.xsd
                            http://www.springframework.org/schema/integration/amqp
                            http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
                            http://www.springframework.org/schema/rabbit
                            http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <context:property-placeholder location="classpath:p5/amqp.properties" />

    <rabbit:connection-factory id="connectionFactory"
                                host="${amqp.host}"
                                port="${amqp.port}"
                                username="${amqp.username}"
                                password="${amqp.password}" />

    <rabbit:admin connection-factory="connectionFactory" />

    <rabbit:queue name="si_queue" />

    <rabbit:direct-exchange name="si_exchange">
        <rabbit:bindings>
            <rabbit:binding queue="si_queue"
                            key="si_queue" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <rabbit:template id="amqpTemplate"
                      connection-factory="connectionFactory" />

    <int:channel id="amqpMsgsChannel" />

    <amqp:inbound-channel-adapter id="inChannel"
                                  queue-names="si_queue"
                                  channel="amqpMsgsChannel"
                                  connection-factory="connectionFactory" />

    <bean id="inAmqpHandler" class="com.polarsparc.si.p5.AmqpProcessHandler" />

    <int:service-activator input-channel="amqpMsgsChannel"
                            ref="inAmqpHandler"
                            method="handler" />

</beans>

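Some aspects of Listing.61 above need a little explanation. In addition to the RabbitMQ resources from Listing.58, the <int:channel> element defines the channel amqpMsgsChannel. The <amqp:inbound-channel-adapter> element defines the endpoint that consumes messages from the queue si_queue (queue-names) over the given connection factory and sends them to amqpMsgsChannel. The <int:service-activator> element invokes the handler method of the AmqpProcessHandler bean for each message arriving on amqpMsgsChannel.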
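The flow wired in Listing.61 is: inbound channel adapter, then channel, then service activator. The channel declared with <int:channel> is a direct channel, which hands each message to its subscriber on the thread of the sender. The following is a conceptual, JDK-only sketch of that hand-off; DirectChannelSketch is a hypothetical class that supports a single subscriber, which is a simplification of the real channel:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DirectChannelSketch {
    private Consumer<String> subscriber;

    // Plays the role of <int:service-activator input-channel="amqpMsgsChannel" ... />
    public void subscribe(Consumer<String> handler) {
        this.subscriber = handler;
    }

    // Plays the role of the inbound channel adapter; the handler runs synchronously on the caller's thread
    public void send(String payload) {
        if (subscriber == null) {
            throw new IllegalStateException("No subscriber on channel");
        }
        subscriber.accept(payload);
    }

    public static void main(String[] args) {
        DirectChannelSketch channel = new DirectChannelSketch();
        List<String> handled = new ArrayList<>();
        channel.subscribe(payload -> handled.add(Thread.currentThread().getName() + " -> " + payload));
        channel.send("AMQP with Spring Integration - 1");
        System.out.println(handled);
    }
}
```

This matches what the handler log lines show when messages arrive: they carry the thread name of the message listener rather than main, because the handler is invoked on the thread that delivers the message to the channel.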

The following is our main application to test the amqp channel adapter:


Listing.62
/*
 * Name:   AmqpProcessMainXml
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class AmqpProcessMainXml {
    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("p5/AmqpProcess.xml");
    }
}

To execute the code from Listing.62, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p5.AmqpProcessMainXml"

The following would be the typical output:

Output.25

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-15 08:37:53:282 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-15 08:37:53:286 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-15 08:37:53:290 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-15 08:37:53:590 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-15 08:37:53:621 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'amqpMsgsChannel' channel
2021-05-15 08:37:53:622 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@e98a152.amqpMsgsChannel' has 1 subscriber(s).
2021-05-15 08:37:53:623 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2021-05-15 08:37:53:624 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-15 08:37:53:625 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@e98a152.errorChannel' has 1 subscriber(s).
2021-05-15 08:37:53:626 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-15 08:37:53:627 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Attempting to connect to: localhost:5672
2021-05-15 08:37:53:683 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Created new connection: connectionFactory#3059cdde:0/SimpleConnection@689c9ac [delegate=amqp://polarsparc@127.0.0.1:5672/, localPort= 35422]
2021-05-15 08:37:53:729 [com.polarsparc.si.p5.AmqpProcessMainXml.main()] INFO org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter - started bean 'inChannel'; defined in: 'class path resource [p5/AmqpProcess.xml]'; from source: ''amqp:inbound-channel-adapter' with id='inChannel''

To execute the amqp publisher code from Listing.59, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p5.AmqpPublisher"

The following would be the typical output:

Output.26

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-15 08:39:10:414 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Attempting to connect to: localhost:5672
2021-05-15 08:39:10:470 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Created new connection: connectionFactory#3600308e:0/SimpleConnection@682ccde5 [delegate=amqp://polarsparc@127.0.0.1:5672/, localPort= 35428]
2021-05-15 08:39:10:504 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 1
2021-05-15 08:39:10:505 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 2
2021-05-15 08:39:10:506 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 3
2021-05-15 08:39:10:507 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 4
2021-05-15 08:39:10:507 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 5
2021-05-15 08:39:10:510 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 6
2021-05-15 08:39:10:511 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 7
2021-05-15 08:39:10:512 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 8
2021-05-15 08:39:10:514 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 9
2021-05-15 08:39:10:514 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 10
--- CTRL-C ---

We will see the following update in the terminal of Output.25:

Output.27

2021-05-15 08:39:10:524 [(inner bean)#3d4053d9-1] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-15 08:39:10:528 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 1
2021-05-15 08:39:10:529 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 2
2021-05-15 08:39:10:529 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 3
2021-05-15 08:39:10:530 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 4
2021-05-15 08:39:10:531 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 5
2021-05-15 08:39:10:532 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 6
2021-05-15 08:39:10:532 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 7
2021-05-15 08:39:10:533 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 8
2021-05-15 08:39:10:534 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 9
2021-05-15 08:39:10:535 [(inner bean)#3d4053d9-1] INFO com.polarsparc.si.p5.AmqpProcessHandler - AMQP message payload (in Xml): AMQP with Spring Integration - 10
--- CTRL-C ---

Java Config based Approach

The following is the Java Config based POJO that defines the amqp message handler that displays the messages received from the RabbitMQ broker:


Listing.63
/*
 * Name:   AmqpProcessHandler2
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;

@Configuration
@EnableIntegration
public class AmqpProcessHandler2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpProcessHandler2.class);

    @ServiceActivator(inputChannel = "amqpMsgsChannel")
    public void handler(Message<?> msg) {
        LOGGER.info("AMQP message payload (in Config): {}", msg.getPayload().toString());
    }
}

The following is the Java Config based POJO that refers to the external amqp.properties file and defines the channel, the various RabbitMQ connection related resources, and the endpoint similar to the way defined in the XML configuration file of Listing.61 above:


Listing.64
/*
 * Name:   AmqpProcessConfig
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration
@PropertySource("classpath:p5/amqp.properties")
public class AmqpProcessConfig {
    @Value("${amqp.host}")
    private String amqpHost;

    @Value("${amqp.port}")
    private int amqpPort;

    @Value("${amqp.username}")
    private String amqpUsername;

    @Value("${amqp.password}")
    private String amqpPassword;

    @Bean
    public ConnectionFactory connectionFactory() {
        // Host and port are taken from amqp.properties rather than hard-coded
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost(amqpHost);
        factory.setPort(amqpPort);
        factory.setUsername(amqpUsername);
        factory.setPassword(amqpPassword);
        return factory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    DirectExchange amqpDirectExchange() {
        return new DirectExchange("si_exchange");
    }

    @Bean
    Binding amqpBinding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("si_queue");
    }

    @Bean
    public SimpleMessageListenerContainer amqpMessageListenerContainer(ConnectionFactory factory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(factory);
        container.setQueueNames("si_queue");
        return container;
    }

    @Bean
    public AmqpInboundChannelAdapter amqpInboundChannelAdapter(SimpleMessageListenerContainer container,
                                                              @Qualifier("amqpMsgsChannel") MessageChannel channel) {
        AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(container);
        adapter.setOutputChannel(channel);
        return adapter;
    }

    @Bean
    public Queue amqpQueue() {
        return new Queue("si_queue");
    }

    @Bean
    public RabbitTemplate amqpTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    public MessageChannel amqpMsgsChannel() {
        return new DirectChannel();
    }
}

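Some aspects of Listing.64 above need a little explanation. The @PropertySource annotation loads amqp.properties, and the @Value annotated fields receive the connection settings from it. The SimpleMessageListenerContainer bean consumes messages from the queue si_queue, and the AmqpInboundChannelAdapter bean wraps that container and sends each received message to the channel amqpMsgsChannel, which mirrors the <amqp:inbound-channel-adapter> element of Listing.61.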

And finally, the following is the main application that uses the POJOs from Listing.63 and Listing.64 to test the amqp channel adapter:


Listing.65
/*
 * Name:   AmqpProcessMainConfig
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class AmqpProcessMainConfig {
    public static void main(String[] args) {
        new AnnotationConfigApplicationContext(AmqpProcessHandler2.class, AmqpProcessConfig.class);
    }
}

To execute the code from Listing.65, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p5.AmqpProcessMainConfig"

The following would be the typical output:

Output.28

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-15 08:44:50:733 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-15 08:44:50:740 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-15 08:44:50:744 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-15 08:44:50:780 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-15 08:44:50:786 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-15 08:44:51:089 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-15 08:44:51:164 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-15 08:44:51:165 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23df8bde.errorChannel' has 1 subscriber(s).
2021-05-15 08:44:51:165 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-15 08:44:51:166 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:amqpProcessHandler2.handler.serviceActivator} as a subscriber to the 'amqpMsgsChannel' channel
2021-05-15 08:44:51:167 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@23df8bde.amqpMsgsChannel' has 1 subscriber(s).
2021-05-15 08:44:51:167 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'amqpProcessHandler2.handler.serviceActivator'
2021-05-15 08:44:51:168 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Attempting to connect to: localhost:5672
2021-05-15 08:44:51:223 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Created new connection: connectionFactory#67fa9137:0/SimpleConnection@33593621 [delegate=amqp://polarsparc@127.0.0.1:5672/, localPort= 35566]
2021-05-15 08:44:51:264 [com.polarsparc.si.p5.AmqpProcessMainConfig.main()] INFO org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter - started bean 'amqpInboundChannelAdapter'; defined in: 'com.polarsparc.si.p5.AmqpProcessConfig'; from source: 'org.springframework.core.type.StandardMethodMetadata@7c5de007'

To execute the amqp publisher code from Listing.59, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p5.AmqpPublisher"

The following would be the typical output:

Output.29

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-15 08:45:41:123 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Attempting to connect to: localhost:5672
2021-05-15 08:45:41:168 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO org.springframework.amqp.rabbit.connection.CachingConnectionFactory - Created new connection: connectionFactory#32774e22:0/SimpleConnection@65c5fc89 [delegate=amqp://polarsparc@127.0.0.1:5672/, localPort= 35570]
2021-05-15 08:45:41:202 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 1
2021-05-15 08:45:41:203 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 2
2021-05-15 08:45:41:204 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 3
2021-05-15 08:45:41:205 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 4
2021-05-15 08:45:41:206 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 5
2021-05-15 08:45:41:207 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 6
2021-05-15 08:45:41:208 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 7
2021-05-15 08:45:41:209 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 8
2021-05-15 08:45:41:209 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 9
2021-05-15 08:45:41:210 [com.polarsparc.si.p5.AmqpPublisher.main()] INFO com.polarsparc.si.p5.AmqpPublisher - Published message - AMQP with Spring Integration - 10
--- CTRL-C ---

We will see the following update in the terminal of Output.28:

Output.30

2021-05-15 08:45:41:218 [amqpMessageListenerContainer-1] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-15 08:45:41:221 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 1
2021-05-15 08:45:41:222 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 2
2021-05-15 08:45:41:223 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 3
2021-05-15 08:45:41:224 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 4
2021-05-15 08:45:41:225 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 5
2021-05-15 08:45:41:225 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 6
2021-05-15 08:45:41:226 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 7
2021-05-15 08:45:41:227 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 8
2021-05-15 08:45:41:228 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 9
2021-05-15 08:45:41:228 [amqpMessageListenerContainer-1] INFO com.polarsparc.si.p5.AmqpProcessHandler2 - AMQP message payload (in Config): AMQP with Spring Integration - 10
--- CTRL-C ---

As can be inferred from Output.27 and Output.30 above, Spring Integration successfully processed the amqp messages from the RabbitMQ broker.

Receiving Message(s) from Apache Kafka

Apache Kafka is an open source, low-latency, high-throughput, fault-tolerant, distributed, durable data streaming platform. To get started quickly, one can refer to the 2-part tutorial series on Apache Kafka: Part - 1 and Part - 2.

To start the Apache Kafka broker on the localhost using docker, open a terminal window and execute the following commands (in that order):

$ docker run -d --rm --name cp-zk-dev --net=host -u $(id -u $USER):$(id -g $USER) -v $HOME/Downloads/zookeeper/data:/var/lib/zookeeper/data -v $HOME/Downloads/zookeeper/logs:/var/log/kafka -e ZOOKEEPER_CLIENT_PORT=10001 -e ZOOKEEPER_TICK_TIME=2000 confluentinc/cp-zookeeper:6.1.1

$ docker run -d --rm --name cp-kafka-dev --net=host -u $(id -u $USER):$(id -g $USER) -v $HOME/Downloads/kafka/data:/var/lib/kafka/data -v $HOME/Downloads/kafka/logs:/var/log/kafka -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=localhost:10001 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:20001 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka:6.1.1
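
Before moving on, it can help to confirm that both containers are accepting connections on the ports used above (10001 for Apache Zookeeper and 20001 for Apache Kafka). The following is a minimal, JDK-only sketch of such a check; the class name BrokerPortCheckSketch and its helper methods are hypothetical and not part of the project. Note that an open TCP port only suggests the process is up and does not guarantee the broker is fully ready:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of the project): checks if a host:port endpoint accepts TCP connections
class BrokerPortCheckSketch {
    // Extracts the host part from an endpoint of the form host:port
    static String host(String endpoint) {
        return endpoint.substring(0, endpoint.lastIndexOf(':'));
    }

    // Extracts the port part from an endpoint of the form host:port
    static int port(String endpoint) {
        return Integer.parseInt(endpoint.substring(endpoint.lastIndexOf(':') + 1));
    }

    // Returns true if a TCP connection could be opened within the timeout
    static boolean isListening(String endpoint, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host(endpoint), port(endpoint)), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the docker commands above
        for (String endpoint : new String[] {"localhost:10001", "localhost:20001"}) {
            System.out.println(endpoint + " listening: " + isListening(endpoint, 1000));
        }
    }
}
```

If either endpoint is reported as not listening, inspecting the container logs with docker logs is a reasonable next step.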

To create a kafka topic called si_topic in the Apache Kafka broker, open a terminal window and execute the following command:

$ docker run --rm --net=host confluentinc/cp-kafka:6.1.1 kafka-topics --create --topic si_topic --partitions 1 --replication-factor 1 --if-not-exists --zookeeper localhost:10001

The following is the properties file kafka.properties located in the resources/p5 directory:


Listing.66
#
# Properties for kafka processing
#

bootstrap.servers=localhost:20001
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
consumer.topic=si_topic
group.id=si_group
auto.offset.reset=earliest
enable.auto.commit=true

XML based Approach

The following is the kafka message handler POJO that displays the messages received from the specific kafka topic in the Apache Kafka broker:


Listing.67
/*
 * Name:   KafkaProcessHandler
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

public class KafkaProcessHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProcessHandler.class);

    public void handler(Message<?> msg) {
        LOGGER.info("Kafka message payload (in Xml): {}", msg.getPayload().toString());
    }
}

The following is the XML based Spring Integration configuration that wires up the channel, the Apache Kafka message consumer and listener, and the endpoint:


Listing.68
<?xml version="1.0" encoding="UTF-8"?>

  <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:context="http://www.springframework.org/schema/context"
         xmlns:int="http://www.springframework.org/schema/integration"
         xmlns:kafka="http://www.springframework.org/schema/integration/kafka"
         xsi:schemaLocation="http://www.springframework.org/schema/beans
                             http://www.springframework.org/schema/beans/spring-beans.xsd
                             http://www.springframework.org/schema/context
                             http://www.springframework.org/schema/context/spring-context.xsd
                             http://www.springframework.org/schema/integration
                             http://www.springframework.org/schema/integration/spring-integration.xsd
                             http://www.springframework.org/schema/integration/kafka
                             http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">
  
      <context:property-placeholder location="classpath:p5/kafka.properties" />
  
      <bean id="kafkaContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
          <constructor-arg>
              <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
                  <constructor-arg>
                      <map>
                          <entry key="bootstrap.servers" value="${bootstrap.servers}" />
                          <entry key="key.deserializer" value="${key.deserializer}" />
                          <entry key="value.deserializer" value="${value.deserializer}" />
                          <entry key="group.id" value="${group.id}" />
                          <entry key="auto.offset.reset" value="${auto.offset.reset}" />
                          <entry key="enable.auto.commit" value="${enable.auto.commit}" />
                      </map>
                  </constructor-arg>
              </bean>
          </constructor-arg>
          <constructor-arg>
              <bean class="org.springframework.kafka.listener.ContainerProperties">
                  <constructor-arg name="topics" value="${consumer.topic}" />
              </bean>
          </constructor-arg>
      </bean>
  
      <int:channel id="inKafkaChannel" />
  
      <kafka:message-driven-channel-adapter id="kafkaListener"
                                            listener-container="kafkaContainer"
                                            mode="record"
                                            channel="inKafkaChannel" />
  
      <bean id="inKafkaHandler" class="com.polarsparc.si.p5.KafkaProcessHandler" />
  
      <int:service-activator input-channel="inKafkaChannel"
                             ref="inKafkaHandler"
                             method="handler" />
  
  </beans>

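Some aspects of Listing.68 above need a little explanation:

The class org.springframework.kafka.core.DefaultKafkaConsumerFactory creates the underlying Apache Kafka consumer from the map of configuration entries passed to its constructor.

The class org.springframework.kafka.listener.ContainerProperties holds the runtime settings of the listener container, including the topic(s) to subscribe to (here, the value of consumer.topic).

The class org.springframework.kafka.listener.KafkaMessageListenerContainer is a single-threaded container that uses a consumer from the factory to poll the subscribed topic(s) for records.

The message-driven-channel-adapter element uses the container referenced by listener-container to receive the records, converts them to messages, and sends them to the channel inKafkaChannel.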
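
The mode attribute of the message-driven-channel-adapter in Listing.68 controls how the records returned by a consumer poll turn into messages: with record, each record becomes its own message; with batch, all the records from one poll are delivered as a single message whose payload is a list. The following is a JDK-only sketch of that difference; it does not use the Spring Integration API, and the names ListenerModeSketch, Mode, and toPayloads are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in (not the Spring Integration API): shows how one
// poll result maps to message payloads in each listener mode
class ListenerModeSketch {
    enum Mode { RECORD, BATCH }

    // Converts the values returned by a single poll into message payloads
    static List<Object> toPayloads(List<String> polled, Mode mode) {
        List<Object> payloads = new ArrayList<>();
        if (mode == Mode.RECORD) {
            // One message per record; each payload is a single value
            payloads.addAll(polled);
        } else if (!polled.isEmpty()) {
            // One message per poll; the payload is the list of all values
            payloads.add(List.copyOf(polled));
        }
        return payloads;
    }

    public static void main(String[] args) {
        List<String> polled = List.of("A", "B", "C");
        System.out.println(toPayloads(polled, Mode.RECORD)); // prints [A, B, C]
        System.out.println(toPayloads(polled, Mode.BATCH));  // prints [[A, B, C]]
    }
}
```

Since Listing.68 uses record, the handler method is invoked once for every message published to the topic.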

The following is our main application to test the kafka channel adapter:


Listing.69
/*
 * Name:   KafkaProcessMainXml
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class KafkaProcessMainXml {
    public static void main(String[] args) {
        new ClassPathXmlApplicationContext("p5/KafkaProcess.xml");
    }
}

To execute the code from Listing.69, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p5.KafkaProcessMainXml"

The following would be the typical output:

Output.31

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-15 16:07:25:294 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-15 16:07:25:298 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-15 16:07:25:301 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-15 16:07:25:552 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-15 16:07:25:582 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator} as a subscriber to the 'inKafkaChannel' channel
2021-05-15 16:07:25:583 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@27ab83e2.inKafkaChannel' has 1 subscriber(s).
2021-05-15 16:07:25:584 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2021-05-15 16:07:25:584 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-15 16:07:25:585 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.support.ClassPathXmlApplicationContext@27ab83e2.errorChannel' has 1 subscriber(s).
2021-05-15 16:07:25:586 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-15 16:07:25:600 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
  allow.auto.create.topics = true
  auto.commit.interval.ms = 5000
  auto.offset.reset = earliest
  bootstrap.servers = [localhost:20001]
  check.crcs = true
  client.dns.lookup = use_all_dns_ips
  client.id = consumer-si_group-1
  client.rack = 
  connections.max.idle.ms = 540000
  default.api.timeout.ms = 60000
  enable.auto.commit = true
  exclude.internal.topics = true
  fetch.max.bytes = 52428800
  fetch.max.wait.ms = 500
  fetch.min.bytes = 1
  group.id = si_group
  group.instance.id = null
  heartbeat.interval.ms = 3000
  interceptor.classes = []
  internal.leave.group.on.close = true
  internal.throw.on.fetch.stable.offset.unsupported = false
  isolation.level = read_uncommitted
  key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
  max.partition.fetch.bytes = 1048576
  max.poll.interval.ms = 300000
  max.poll.records = 500
  metadata.max.age.ms = 300000
  metric.reporters = []
  metrics.num.samples = 2
  metrics.recording.level = INFO
  metrics.sample.window.ms = 30000
  partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
  receive.buffer.bytes = 65536
  reconnect.backoff.max.ms = 1000
  reconnect.backoff.ms = 50
  request.timeout.ms = 30000
  retry.backoff.ms = 100
  sasl.client.callback.handler.class = null
  sasl.jaas.config = null
  sasl.kerberos.kinit.cmd = /usr/bin/kinit
  sasl.kerberos.min.time.before.relogin = 60000
  sasl.kerberos.service.name = null
  sasl.kerberos.ticket.renew.jitter = 0.05
  sasl.kerberos.ticket.renew.window.factor = 0.8
  sasl.login.callback.handler.class = null
  sasl.login.class = null
  sasl.login.refresh.buffer.seconds = 300
  sasl.login.refresh.min.period.seconds = 60
  sasl.login.refresh.window.factor = 0.8
  sasl.login.refresh.window.jitter = 0.05
  sasl.mechanism = GSSAPI
  security.protocol = PLAINTEXT
  security.providers = null
  send.buffer.bytes = 131072
  session.timeout.ms = 10000
  ssl.cipher.suites = null
  ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
  ssl.endpoint.identification.algorithm = https
  ssl.engine.factory.class = null
  ssl.key.password = null
  ssl.keymanager.algorithm = SunX509
  ssl.keystore.location = null
  ssl.keystore.password = null
  ssl.keystore.type = JKS
  ssl.protocol = TLSv1.3
  ssl.provider = null
  ssl.secure.random.implementation = null
  ssl.trustmanager.algorithm = PKIX
  ssl.truststore.location = null
  ssl.truststore.password = null
  ssl.truststore.type = JKS
  value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2021-05-15 16:07:25:742 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.6.1
2021-05-15 16:07:25:743 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 6b2021cd52659cef
2021-05-15 16:07:25:743 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1621109245740
2021-05-15 16:07:25:745 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-si_group-1, groupId=si_group] Subscribed to topic(s): si_topic
2021-05-15 16:07:25:746 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService
2021-05-15 16:07:25:751 [com.polarsparc.si.p5.KafkaProcessMainXml.main()] INFO org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter - started bean 'kafkaListener'; defined in: 'class path resource [p5/KafkaProcess.xml]'; from source: ''kafka:message-driven-channel-adapter' with id='kafkaListener''
2021-05-15 16:07:25:945 [kafkaContainer-C-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-si_group-1, groupId=si_group] Cluster ID: DUwH0zVlRhK0-xbCh5qioQ
2021-05-15 16:07:25:946 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Discovered group coordinator localhost:20001 (id: 2147483646 rack: null)
2021-05-15 16:07:25:949 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] (Re-)joining group
2021-05-15 16:07:25:958 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] (Re-)joining group
2021-05-15 16:07:28:962 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Successfully joined group with generation Generation{generationId=13, memberId='consumer-si_group-1-fc33be3d-bce6-47a6-8da6-1d34833df588', protocol='range'}
2021-05-15 16:07:28:965 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Finished assignment for group at generation 13: {consumer-si_group-1-fc33be3d-bce6-47a6-8da6-1d34833df588=Assignment(partitions=[si_topic-0])}
2021-05-15 16:07:28:971 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Successfully synced group in generation Generation{generationId=13, memberId='consumer-si_group-1-fc33be3d-bce6-47a6-8da6-1d34833df588', protocol='range'}
2021-05-15 16:07:28:973 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Notifying assignor about the new Assignment(partitions=[si_topic-0])
2021-05-15 16:07:28:975 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Adding newly assigned partitions: si_topic-0
2021-05-15 16:07:28:984 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Setting offset for partition si_topic-0 to the committed offset FetchPosition{offset=2, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:20001 (id: 1 rack: null)], epoch=0}}
2021-05-15 16:07:28:994 [kafkaContainer-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer - si_group: partitions assigned: [si_topic-0]

To execute the kafka publisher using the docker image, open a terminal window and run the following command:

$ docker run -it --rm --net=host confluentinc/cp-kafka:6.1.1 kafka-console-producer --request-required-acks 1 --broker-list localhost:20001 --topic si_topic

The prompt will change to '>'. Type each of the following 3 text messages, pressing ENTER after each one, and then exit by pressing CTRL-C:

>Kafka with Spring Integration - A

>Kafka with Spring Integration - B

>Kafka with Spring Integration - C

>CTRL-C

We will see the following update in the terminal of Output.31:

Output.32

2021-05-15 16:09:12:210 [kafkaContainer-C-1] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-15 16:09:12:213 [kafkaContainer-C-1] INFO com.polarsparc.si.p5.KafkaProcessHandler - Kafka message payload (in Xml): Kafka with Spring Integration - A
2021-05-15 16:09:17:230 [kafkaContainer-C-1] INFO com.polarsparc.si.p5.KafkaProcessHandler - Kafka message payload (in Xml): Kafka with Spring Integration - B
2021-05-15 16:09:22:271 [kafkaContainer-C-1] INFO com.polarsparc.si.p5.KafkaProcessHandler - Kafka message payload (in Xml): Kafka with Spring Integration - C
--- CTRL-C ---

Java Config based Approach

The following is the Java Config based POJO that displays the messages received from the specific kafka topic in the Apache Kafka broker:


Listing.70
/*
 * Name:   KafkaProcessHandler2
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;

@Configuration
@EnableIntegration
public class KafkaProcessHandler2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProcessHandler2.class);

    @ServiceActivator(inputChannel = "inKafkaChannel")
    public void handler(Message<?> msg) {
        LOGGER.info("Kafka message payload (in Config): {}", msg.getPayload().toString());
    }
}

The following is the Java Config based POJO that refers to the external kafka.properties file and defines the channel, the Apache Kafka message consumer and listener, and the endpoint similar to the way defined in the XML configuration file of Listing.68 above:


Listing.71
/*
 * Name:   KafkaProcessConfig
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.messaging.MessageChannel;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableIntegration
@PropertySource("classpath:p5/kafka.properties")
public class KafkaProcessConfig {
    @Value("${bootstrap.servers}")
    private String bootstrapServer;

    @Value("${key.deserializer}")
    private String keyDeserializer;

    @Value("${value.deserializer}")
    private String valueDeserializer;

    @Value("${group.id}")
    private String groupId;

    @Value("${auto.offset.reset}")
    private String autoOffsetReset;

    @Value("${enable.auto.commit}")
    private String enableAutoCommit;

    @Value("${consumer.topic}")
    private String consumerTopic;

    @Bean
    public MessageChannel inKafkaChannel() {
        return new DirectChannel();
    }

    @Bean
    public ConsumerFactory<String, String> kafkaConsumerFactory() {
        Map<String, Object> map = new HashMap<>();
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        map.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        return new DefaultKafkaConsumerFactory<>(map);
    }

    @Bean
    public KafkaMessageListenerContainer<String, String> kafkaContainer() {
        // Topic name comes from consumer.topic in kafka.properties, as in Listing.68
        ContainerProperties properties = new ContainerProperties(consumerTopic);
        return new KafkaMessageListenerContainer<>(kafkaConsumerFactory(), properties);
    }

    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> kafkaListener() {
        KafkaMessageDrivenChannelAdapter<String, String> adapter =
                new KafkaMessageDrivenChannelAdapter<>(kafkaContainer(),
                        KafkaMessageDrivenChannelAdapter.ListenerMode.record);
        adapter.setOutputChannel(inKafkaChannel());
        return adapter;
    }
}

And finally, the following is the main application that uses the POJOs from Listing.70 and Listing.71 to test the kafka channel adapter:


Listing.72
/*
 * Name:   KafkaProcessMainConfig
 * Author: Bhaskar S
 * Date:   05/15/2021
 * Blog:   https://www.polarsparc.com
 */

package com.polarsparc.si.p5;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class KafkaProcessMainConfig {
    public static void main(String[] args) {
        new AnnotationConfigApplicationContext(KafkaProcessHandler2.class, KafkaProcessConfig.class);
    }
}

To execute the code from Listing.72, open a terminal window and run the following commands:

$ cd $HOME/java/SpringIntegration

$ mvn exec:java -Dexec.mainClass="com.polarsparc.si.p5.KafkaProcessMainConfig"

The following would be the typical output:

Output.33

[INFO] Scanning for projects...
[INFO] 
[INFO] ----------------< com.polarsparc.si:SpringIntegration >-----------------
[INFO] Building SpringIntegration 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.0.0:java (default-cli) @ SpringIntegration ---
2021-05-15 16:11:29:781 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2021-05-15 16:11:29:787 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2021-05-15 16:11:29:791 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.config.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2021-05-15 16:11:29:827 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationChannelResolver' of type [org.springframework.integration.support.channel.BeanFactoryChannelResolver] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-15 16:11:29:832 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.context.support.PostProcessorRegistrationDelegate$BeanPostProcessorChecker - Bean 'integrationDisposableAutoCreatedBeans' of type [org.springframework.integration.config.annotation.Disposables] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2021-05-15 16:11:30:093 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService 'taskScheduler'
2021-05-15 16:11:30:162 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2021-05-15 16:11:30:163 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.channel.PublishSubscribeChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@52d54aa9.errorChannel' has 1 subscriber(s).
2021-05-15 16:11:30:164 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean '_org.springframework.integration.errorLogger'
2021-05-15 16:11:30:165 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - Adding {service-activator:kafkaProcessHandler2.handler.serviceActivator} as a subscriber to the 'inKafkaChannel' channel
2021-05-15 16:11:30:166 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.channel.DirectChannel - Channel 'org.springframework.context.annotation.AnnotationConfigApplicationContext@52d54aa9.inKafkaChannel' has 1 subscriber(s).
2021-05-15 16:11:30:167 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.endpoint.EventDrivenConsumer - started bean 'kafkaProcessHandler2.handler.serviceActivator'
2021-05-15 16:11:30:180 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
  allow.auto.create.topics = true
  auto.commit.interval.ms = 5000
  auto.offset.reset = earliest
  bootstrap.servers = [localhost:20001]
  check.crcs = true
  client.dns.lookup = use_all_dns_ips
  client.id = consumer-si_group-1
  client.rack = 
  connections.max.idle.ms = 540000
  default.api.timeout.ms = 60000
  enable.auto.commit = true
  exclude.internal.topics = true
  fetch.max.bytes = 52428800
  fetch.max.wait.ms = 500
  fetch.min.bytes = 1
  group.id = si_group
  group.instance.id = null
  heartbeat.interval.ms = 3000
  interceptor.classes = []
  internal.leave.group.on.close = true
  internal.throw.on.fetch.stable.offset.unsupported = false
  isolation.level = read_uncommitted
  key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
  max.partition.fetch.bytes = 1048576
  max.poll.interval.ms = 300000
  max.poll.records = 500
  metadata.max.age.ms = 300000
  metric.reporters = []
  metrics.num.samples = 2
  metrics.recording.level = INFO
  metrics.sample.window.ms = 30000
  partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
  receive.buffer.bytes = 65536
  reconnect.backoff.max.ms = 1000
  reconnect.backoff.ms = 50
  request.timeout.ms = 30000
  retry.backoff.ms = 100
  sasl.client.callback.handler.class = null
  sasl.jaas.config = null
  sasl.kerberos.kinit.cmd = /usr/bin/kinit
  sasl.kerberos.min.time.before.relogin = 60000
  sasl.kerberos.service.name = null
  sasl.kerberos.ticket.renew.jitter = 0.05
  sasl.kerberos.ticket.renew.window.factor = 0.8
  sasl.login.callback.handler.class = null
  sasl.login.class = null
  sasl.login.refresh.buffer.seconds = 300
  sasl.login.refresh.min.period.seconds = 60
  sasl.login.refresh.window.factor = 0.8
  sasl.login.refresh.window.jitter = 0.05
  sasl.mechanism = GSSAPI
  security.protocol = PLAINTEXT
  security.providers = null
  send.buffer.bytes = 131072
  session.timeout.ms = 10000
  ssl.cipher.suites = null
  ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
  ssl.endpoint.identification.algorithm = https
  ssl.engine.factory.class = null
  ssl.key.password = null
  ssl.keymanager.algorithm = SunX509
  ssl.keystore.location = null
  ssl.keystore.password = null
  ssl.keystore.type = JKS
  ssl.protocol = TLSv1.3
  ssl.provider = null
  ssl.secure.random.implementation = null
  ssl.trustmanager.algorithm = PKIX
  ssl.truststore.location = null
  ssl.truststore.password = null
  ssl.truststore.type = JKS
  value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2021-05-15 16:11:30:343 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.6.1
2021-05-15 16:11:30:344 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 6b2021cd52659cef
2021-05-15 16:11:30:344 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1621109490341
2021-05-15 16:11:30:347 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-si_group-1, groupId=si_group] Subscribed to topic(s): si_topic
2021-05-15 16:11:30:348 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler - Initializing ExecutorService
2021-05-15 16:11:30:353 [com.polarsparc.si.p5.KafkaProcessMainConfig.main()] INFO org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter - started bean 'kafkaListener'; defined in: 'com.polarsparc.si.p5.KafkaProcessConfig'; from source: 'org.springframework.core.type.StandardMethodMetadata@2c228b2a'
2021-05-15 16:11:30:553 [kafkaContainer-C-1] INFO org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-si_group-1, groupId=si_group] Cluster ID: DUwH0zVlRhK0-xbCh5qioQ
2021-05-15 16:11:30:554 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Discovered group coordinator localhost:20001 (id: 2147483646 rack: null)
2021-05-15 16:11:30:557 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] (Re-)joining group
2021-05-15 16:11:30:566 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] (Re-)joining group
2021-05-15 16:11:33:569 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Successfully joined group with generation Generation{generationId=15, memberId='consumer-si_group-1-c91ae21c-11b9-4160-9a4c-206adebabd49', protocol='range'}
2021-05-15 16:11:33:573 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Finished assignment for group at generation 15: {consumer-si_group-1-c91ae21c-11b9-4160-9a4c-206adebabd49=Assignment(partitions=[si_topic-0])}
2021-05-15 16:11:33:578 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Successfully synced group in generation Generation{generationId=15, memberId='consumer-si_group-1-c91ae21c-11b9-4160-9a4c-206adebabd49', protocol='range'}
2021-05-15 16:11:33:580 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Notifying assignor about the new Assignment(partitions=[si_topic-0])
2021-05-15 16:11:33:583 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Adding newly assigned partitions: si_topic-0
2021-05-15 16:11:33:592 [kafkaContainer-C-1] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-si_group-1, groupId=si_group] Setting offset for partition si_topic-0 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:20001 (id: 1 rack: null)], epoch=0}}
2021-05-15 16:11:33:602 [kafkaContainer-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer - si_group: partitions assigned: [si_topic-0]
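
Notice from the log above that the consumer resumes at the committed offset 5 for si_topic-0, even though auto.offset.reset is set to earliest. The reset policy only applies when the consumer group has no usable committed offset for the partition. The following is a minimal, plain Java sketch of that rule. The class, enum, and method names are hypothetical (they are not part of the Kafka client API), and the end offset of 8 is an assumed value for illustration:

```java
import java.util.OptionalLong;

// Hypothetical illustration only: mimics how a consumer picks its starting offset.
// Simplification: a committed offset that is out of range (which would also
// trigger the reset policy in Kafka) is not modeled here.
public class StartOffsetSketch {
    // Mirrors the three values accepted by auto.offset.reset
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    static long resolveStartOffset(OptionalLong committed, ResetPolicy policy,
                                   long beginningOffset, long endOffset) {
        // A committed offset for the group always wins over the reset policy
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (policy) {
            case EARLIEST:
                return beginningOffset;
            case LATEST:
                return endOffset;
            default:
                throw new IllegalStateException("No committed offset and auto.offset.reset=none");
        }
    }

    public static void main(String[] args) {
        // Group si_group already committed offset 5 (as in the log above)
        System.out.println(resolveStartOffset(OptionalLong.of(5), ResetPolicy.EARLIEST, 0L, 8L));
        // A brand new group with no committed offset starts from the beginning
        System.out.println(resolveStartOffset(OptionalLong.empty(), ResetPolicy.EARLIEST, 0L, 8L));
    }
}
```

Under these assumptions, the first call models our si_group consumer and the second call models a consumer group that has never committed an offset.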

To publish messages using the Kafka console producer from the docker image, open a new terminal window and run the following command:

$ docker run -it --rm --net=host confluentinc/cp-kafka:6.1.1 kafka-console-producer --request-required-acks 1 --broker-list localhost:20001 --topic si_topic

The prompt will change to a '>'. Type each of the following 3 text messages, pressing ENTER after each one, and then press CTRL-C to exit:

>Kafka with Spring Integration - A

>Kafka with Spring Integration - B

>Kafka with Spring Integration - C

>CTRL-C

The terminal showing Output.33 will be updated with the following:

Output.34

2021-05-15 16:13:25:261 [kafkaContainer-C-1] INFO org.springframework.integration.handler.support.MessagingMethodInvokerHelper - Overriding default instance of MessageHandlerMethodFactory with provided one.
2021-05-15 16:13:25:263 [kafkaContainer-C-1] INFO com.polarsparc.si.p5.KafkaProcessHandler2 - Kafka message payload (in Config): Kafka with Spring Integration - A
2021-05-15 16:13:33:113 [kafkaContainer-C-1] INFO com.polarsparc.si.p5.KafkaProcessHandler2 - Kafka message payload (in Config): Kafka with Spring Integration - B
2021-05-15 16:13:39:001 [kafkaContainer-C-1] INFO com.polarsparc.si.p5.KafkaProcessHandler2 - Kafka message payload (in Config): Kafka with Spring Integration - C
--- CTRL-C ---

As can be inferred from Output.32 and Output.34 above, Spring Integration successfully consumed and processed the messages published to the Apache Kafka broker.

References

Spring Integration Notes :: Part - 4

Spring Integration Notes :: Part - 3

Spring Integration Notes :: Part - 2

Spring Integration Notes :: Part - 1

Spring Integration - AMQP Support

Spring Integration - Apache Kafka Support



© PolarSPARC