Quick Tour of RabbitMQ - Part 2
Bhaskar S | 11/18/2017 |
Overview
Continuing from Part-1 of this series, we will demonstrate the Fanout Exchange type in this part.
Hands-on with RabbitMQ using Java
A Fanout Exchange routes a copy of each message to every Queue bound to it, ignoring the Routing key. In other words, it broadcasts incoming messages to all of its bound Queues. For example, when a credit-card processor processes an incoming transaction message, it publishes the transaction once to a Fanout Exchange, which delivers a copy to both the fraud verifier and the analytics engine.
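To make the broadcast behavior concrete, here is a minimal in-memory sketch of fanout routing. It is a toy model and does not use the RabbitMQ client library; the class FanoutSketch and its methods are hypothetical names chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of fanout routing. This is NOT the RabbitMQ client API;
// the class and method names here are hypothetical.
class FanoutSketch {
    // Queue name -> messages delivered to that queue so far.
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Bind a queue to the exchange; binding the same name again changes nothing.
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Copy the message into every bound queue. The routing key is accepted
    // but never consulted, which is the defining trait of a fanout exchange.
    // Returns the number of queues that received a copy.
    int publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    // Messages currently held by the named queue (empty if it was never bound).
    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch credit = new FanoutSketch();
        credit.bind("fraud");
        credit.bind("analytics");
        credit.publish("", "txn-0000000001");
        credit.publish("some.ignored.key", "txn-0000000002");
        System.out.println("fraud     -> " + credit.messagesIn("fraud"));
        System.out.println("analytics -> " + credit.messagesIn("analytics"));
    }
}
```

Both queues end up holding the same two messages, regardless of the routing key passed to publish.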
The following picture illustrates the high level view of the Fanout Exchange:
To demonstrate a hypothetical credit-card processing system with one publisher (a credit-card processor) and two consumers (a fraud verifier and an analytics engine), we will implement the corresponding CreditCardProcessor, FraudVerifier, and AnalyticsEngine classes.
The following is the common class that captures and exposes some constants:
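The original listing is not reproduced in this text, so the following is only a rough sketch of what such a constants class might look like. The class name CreditConstants and the field names are assumptions; the values credit, fraud, and analytics are taken from the sample output later in this article, and the empty routing key is an assumption.

```java
// Hypothetical constants holder. The class and field names are assumptions;
// the exchange and queue values come from the sample output in this article.
final class CreditConstants {
    // Name of the fanout exchange shared by the publisher and both consumers.
    static final String CREDIT_EXCHANGE = "credit";

    // Queue consumed by the fraud verifier.
    static final String FRAUD_QUEUE = "fraud";

    // Queue consumed by the analytics engine.
    static final String ANALYTICS_QUEUE = "analytics";

    // Assumed value: a fanout exchange ignores the routing key, so an
    // empty string is a common placeholder.
    static final String ROUTING_KEY = "";

    // Not meant to be instantiated.
    private CreditConstants() {
    }
}
```

Keeping these names in one place means the publisher and the consumers cannot drift apart on the exchange or queue names.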
The following is the code for the credit-card processor, which acts as a publisher of credit transaction messages:
Let us explain and understand some of the classes/methods used in the CreditCardProcessor code shown above.
The enum com.rabbitmq.client.BuiltinExchangeType defines the exchange types supported in RabbitMQ.
The method exchangeDeclare(String, BuiltinExchangeType) on the channel class instance declares a non-auto-delete, non-durable Exchange with the specified name and type.
The class com.rabbitmq.client.AMQP.BasicProperties.Builder is a builder class; an instance of it allows one to create an instance of com.rabbitmq.client.AMQP.BasicProperties with message properties such as the app id, content type, priority, etc.
The following is the code for the fraud verifier, which acts as a consumer of credit-card transaction messages:
Let us explain and understand some of the classes/methods used in the FraudVerifier code shown above.
The method queueBind(String, String, String) binds the specified Queue (first parameter) to the specified Exchange (second parameter) using the given Routing key (third parameter). A Fanout Exchange ignores the Routing key.
The method basicAck(long, boolean) explicitly acknowledges one or more messages. If the second parameter is true, all messages up to and including the one with the specified delivery tag are acknowledged. If it is false, only the message with the specified delivery tag is acknowledged. In some cases, we may want to acknowledge receipt of a message only after it has been processed successfully. In such scenarios, we disable auto-acknowledgement by passing false as the second parameter to the basicConsume method and call basicAck explicitly after processing the message(s).
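The difference between the two values of the second basicAck parameter can be sketched with a small in-memory model of outstanding delivery tags. This is an illustration only, not the RabbitMQ client or broker implementation; the class AckSketch and its methods are hypothetical.

```java
import java.util.TreeSet;

// Toy model of the "multiple" flag on basicAck(deliveryTag, multiple).
// This is NOT the RabbitMQ client API; all names here are hypothetical.
class AckSketch {
    // Delivery tags handed to the consumer and not yet acknowledged.
    private final TreeSet<Long> unacked = new TreeSet<>();

    // Record that a message with this delivery tag was delivered.
    void delivered(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    // multiple == true : acknowledge every tag up to and including deliveryTag.
    // multiple == false: acknowledge only deliveryTag itself.
    void ack(long deliveryTag, boolean multiple) {
        if (multiple) {
            unacked.headSet(deliveryTag, true).clear();
        } else {
            unacked.remove(deliveryTag);
        }
    }

    // Snapshot of the tags still awaiting acknowledgement.
    TreeSet<Long> unacked() {
        return new TreeSet<>(unacked);
    }

    public static void main(String[] args) {
        AckSketch channel = new AckSketch();
        for (long tag = 1; tag <= 5; tag++) {
            channel.delivered(tag);
        }
        channel.ack(3, true);   // acknowledges tags 1, 2 and 3
        System.out.println(channel.unacked());  // prints [4, 5]
        channel.ack(5, false);  // acknowledges tag 5 only
        System.out.println(channel.unacked());  // prints [4]
    }
}
```

Acknowledging with true after processing a batch saves calls, while false gives per-message control when messages may be processed out of order.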
The following is the code for the analytics engine, which acts as another consumer of credit-card transaction messages:
Now, for the demonstration of messaging using the Fanout Exchange, open three terminal windows. Let's refer to them as the publisher, consumer-fv, and consumer-ae respectively.
In the consumer-fv terminal, execute the following command:
java -cp ./build/classes:./lib/amqp-client-5.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.rabbitmq.fanout.FraudVerifier 172.17.0.2 rabbitusr s3cr3t
The following should be the typical output:
Nov 18, 2017 8:01:45 PM com.polarsparc.rabbitmq.fanout.FraudVerifier main
INFO: Ready to create communication channel for fraud
Nov 18, 2017 8:01:45 PM com.polarsparc.rabbitmq.fanout.FraudVerifier main
INFO: Ready to create a fanout exchange credit
Nov 18, 2017 8:01:45 PM com.polarsparc.rabbitmq.fanout.FraudVerifier main
INFO: Ready to bind the queue fraud to exchange credit
Nov 18, 2017 8:01:45 PM com.polarsparc.rabbitmq.fanout.FraudVerifier main
INFO: Ready to create a consumer for fraud
Nov 18, 2017 8:01:45 PM com.polarsparc.rabbitmq.fanout.FraudVerifier main
INFO: Ready to consume test credit-card messages for fraud
In the consumer-ae terminal, execute the following command:
java -cp ./build/classes:./lib/amqp-client-5.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.rabbitmq.fanout.AnalyticsEngine 172.17.0.2 rabbitusr s3cr3t
The following should be the typical output:
Nov 18, 2017 8:01:50 PM com.polarsparc.rabbitmq.fanout.AnalyticsEngine main
INFO: Ready to create communication channel for analytics
Nov 18, 2017 8:01:50 PM com.polarsparc.rabbitmq.fanout.AnalyticsEngine main
INFO: Ready to create a fanout exchange credit
Nov 18, 2017 8:01:50 PM com.polarsparc.rabbitmq.fanout.AnalyticsEngine main
INFO: Ready to bind the queue analytics to exchange credit
Nov 18, 2017 8:01:50 PM com.polarsparc.rabbitmq.fanout.AnalyticsEngine main
INFO: Ready to create a consumer for analytics
In the publisher terminal, execute the following command:
java -cp ./build/classes:./lib/amqp-client-5.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.rabbitmq.fanout.CreditCardProcessor 172.17.0.2 rabbitusr s3cr3t
The following should be the typical output:
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.CreditCardProcessor main
INFO: Ready to create a communication channel ...
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.CreditCardProcessor main
INFO: Ready to create a fanout exchange credit
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.CreditCardProcessor main
INFO: Ready to publish test credit transaction messages
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.CreditCardProcessor main
INFO: Ready to close the communication channel
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.CreditCardProcessor main
INFO: Payment processing done !!!
In the consumer-fv terminal, we should see the following messages appear:
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.FraudVerifier$1 handleDelivery
INFO: Received credit-card message (properties): #contentHeader(content-type=text/json, content-encoding=null, headers=null, delivery-mode=null, priority=1, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=CreditCardProcessor, cluster-id=null)
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.FraudVerifier$1 handleDelivery
INFO: Received credit-card message (body): { 'cc_no': '1234-5678-9012-3456' 'txn_id': '0000000001' 'txn_dt': '11/01/2017 10:25:34' 'txn_amt': '112.75' 'merchant_id': '123' }
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.FraudVerifier$1 handleDelivery
INFO: Received credit-card message (properties): #contentHeader(content-type=text/json, content-encoding=null, headers=null, delivery-mode=null, priority=1, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=CreditCardProcessor, cluster-id=null)
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.FraudVerifier$1 handleDelivery
INFO: Received credit-card message (body): { 'cc_no': '9876-5432-2109-8765' 'txn_id': '0000000002' 'txn_dt': '11/02/2017 16:44:21' 'txn_amt': '33.09' 'merchant_id': '456' }
In the consumer-ae terminal, we should see the following messages appear:
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.AnalyticsEngine$1 handleDelivery
INFO: Received credit-card message (body): { 'cc_no': '1234-5678-9012-3456' 'txn_id': '0000000001' 'txn_dt': '11/01/2017 10:25:34' 'txn_amt': '112.75' 'merchant_id': '123' }
Nov 18, 2017 8:01:56 PM com.polarsparc.rabbitmq.fanout.AnalyticsEngine$1 handleDelivery
INFO: Received credit-card message (body): { 'cc_no': '9876-5432-2109-8765' 'txn_id': '0000000002' 'txn_dt': '11/02/2017 16:44:21' 'txn_amt': '33.09' 'merchant_id': '456' }
From the output in the consumer-fv and consumer-ae terminals, we can see that both the FraudVerifier and the AnalyticsEngine consumers received the two credit-card transaction messages published by the CreditCardProcessor.
The following picture shows the screenshot of the Exchange tab from the RabbitMQ web management console with our Exchange named credit highlighted:
We have successfully demonstrated RabbitMQ messaging using the Fanout Exchange !!!
References