Quick Tour of RabbitMQ - Part 1
Bhaskar S | 11/12/2017 |
Overview
RabbitMQ is an open source, language neutral Message-oriented Middleware that implements the open standard Advanced Message Queuing Protocol (AMQP) specification. AMQP is a wire-level application layer protocol, that specifies and mandates how messages should be queued, routed, and delivered in a reliable and secure way.
Terminology
The following is a high level architectural view of RabbitMQ:
In this section, we will list and briefly describe some of the terms referred to in this article.
Term | Description |
---|---|
Broker | server that receives messages from producers and routes the messages to the appropriate consumers |
Exchange | server component through which a producer connects to send messages. It is like a Mailbox |
Queue | server component from which consumers get messages. It is the storage buffer (memory and/or disk) for messages |
Binding | the logical relationship between Exchanges and Queues. It establishes the rules to route messages from Exchanges to appropriate Queues |
Channel | logical connection between a client (producer or consumer) and the message broker allowing for isolation of communication |
Routing Key | a string attribute of the message that an Exchange will look at to determine which Queue(s) to route the incoming message to |
Setup
The setup will be on a Ubuntu 17.04 LTS based Linux desktop.
Ensure Docker is installed and setup. Else, refer to the article Introduction to Docker.
Assume a hypothetical user alice with the home directory located at /home/alice.
Create a directory called rabbitmq under /home/alice by executing the following command:
mkdir rabbitmq
Create another directory called lib under /home/alice by executing the following command:
mkdir lib
Since we will be implementing code in Java (for demonstration), we need to download some dependent JARs. Download and copy the Slf4J API and Slf4J Simple JARs to /home/alice/lib. Also, download and copy the RabbitMQ Java Client JAR to /home/alice/lib.
For our exploration, we will be downloading and using the official docker image rabbitmq:management.
To pull and download the docker image for RabbitMQ with the management plugin enabled, execute the following command:
docker pull rabbitmq:management
The following should be the typical output:
management: Pulling from library/rabbitmq bc95e04b23c0: Pull complete 41a230aa3726: Pull complete 9190ffbd2271: Pull complete 69e829d8f0d2: Pull complete 2404945c98d3: Pull complete 629c315aa810: Pull complete cff2b1994ea2: Pull complete 3246485e4a5d: Pull complete dd004dbdde7c: Pull complete ef3f0c64cb0d: Pull complete f7d9dcfb8c20: Pull complete 30545194ec57: Pull complete Digest: sha256:3840069fcc704db533cd83ec7fad30daabdd2fda321df4fb788b81b6ed2323b2 Status: Downloaded newer image for rabbitmq:management
To launch the docker instance for RabbitMQ with the management plugin enabled, execute the following command:
docker run -d --hostname rabbit-dev --name rabbit-dev -v /home/alice/rabbitmq:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=rabbitusr -e RABBITMQ_DEFAULT_PASS=s3cr3t rabbitmq:management
The following should be the typical output:
f7cfd461ce68256b91e387cc37dde864f752b5c909e70e50c5f1b20c20bf0701
To check if the docker instance for RabbitMQ is up and running, execute the following command:
docker ps
The following should be the typical output:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES f7cfd461ce68 rabbitmq:management "docker-entrypoint.sh" 7 seconds ago Up 4 seconds 4369/tcp, 5671-5672/tcp, 15671/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp rabbit-dev
Fire up a web-browser and open the URL http://localhost:15672. The following should be the typical output:
Login to the management console with the user-id rabbitusr and password s3cr3t. The following should be the typical output:
Hands-on with RabbitMQ using Java
An Exchange type controls how incoming messages are routed to the appropriate Queue(s). The simplest of these is the Direct Exchange.
A Direct Exchange routes messages to Queues based on a Routing key. A Routing key is just a string to allow for directed routing. For example, if a payment processor accepts only a Mastercard or a Visa card, then the Routing key with "mastercard" will be routed to the Mastercard verifier, while the Routing key with "visa" will be routed to the Visa verifier.
The following picture illustrates the high level view of the Direct Exchange:
Now, to demonstrate the hypothetical Payments system with a payment processor (publisher) and two payment verifiers (consumers - Mastercard and Visa), we will implement the corresponding PaymentProcessor and PaymentVerifier classes.
The following is the common class that captures and exposes some constants:
The following is the common utility class that implements the commonly used methods between the producer and the consumer classes:
Let us explain and understand some of the classes/methods used in the RabbitMqUtils code shown above.
The class com.rabbitmq.client.ConnectionFactory is a factory class that is used to configure various options for creating a connection instance to the RabbitMQ message broker. In our case, we are configuring the host name on which the message broker is running, the user-id with which to connect, and the user credential to use.
The method setHost(String) on the factory class instance configures the host name of the RabbitMQ message broker.
The method setUsername(String) on the factory class instance configures the user name to use when connecting to the RabbitMQ message broker.
The method setPassword(String) on the factory class instance configures the user name credential to use when connecting to the RabbitMQ message broker.
The class com.rabbitmq.client.Connection represents a connection to the RabbitMQ message broker, which under-the-hood uses the lower level reliable TCP protocol.
The method newConnection() on the factory class instance creates and returns an instance of the connection to the RabbitMQ message broker.
The class com.rabbitmq.client.Channel represents a logical communication path that is multiplexed on top of an instance of a connection to the RabbitMQ message broker. In some use-cases, an application may need multiple communication paths to the RabbitMQ message broker. Rather than creating multiple com.rabbitmq.client.Connection instances (which is expensive as it uses system network resources), it is much more efficient to create multiple channels.
The method createChannel() on the connection class instance creates and returns an instance of the channel (a communication path to the RabbitMQ message broker).
The method close() on the connection class instance closes the connection path to the RabbitMQ message broker.
The method close() on the channel class instance closes the communication to the RabbitMQ message broker.
The following is the code for the payments processor, which acts as a publisher of payment messages. We declare two channels and corresponding queues and publish two sample messages (one for Mastercard and one for Visa):
Let us explain and understand some of the classes/methods used in the PaymentProcessor code shown above.
The method queueDeclare(String, boolean, boolean, boolean, Map) on the channel class instance creates (if it does not already exist) and declares a queue in the RabbitMQ message broker. The first parameter is the queue name. If the second parameter is true, then we are declaring a durable queue. Non-durable queues get cleaned up if the RabbitMQ message broker is shutdown or crashes. If the third parameter is true, then we are declaring an exclusive queue, meaning it is restricted to just this connection. If the fourth parameter is true, then we are declaring an auto delete queue, meaning the server will delete the queue if no longer in use. The last parameter can be used to specify additional properties, such as queue specify message TTL, etc.
The method basicPublish(String, String, AMQP.BasicProperties, byte[]) on the channel class instance allows one to publish a message (specified as a byte array in the last parameter) to the specified routing key (second parameter). The routing key should match the destined queue name. The first parameter specifies the exchange to use. In our case, we use the default RabbitMQ exchange that has no name. The third parameter can be used to specify additional message properties, such as the message priority, correlation id, etc.
The following is the code for the payments verifier, which acts as a consumer of payment messages:
Let us explain and understand some of the classes/methods used in the PaymentVerifier code shown above.
The interface com.rabbitmq.client.Consumer defines the application callbacks for messages received on a queue.
The class com.rabbitmq.client.DefaultConsumer is a convenience class that provides a default implementation of the interface com.rabbitmq.client.Consumer. The constructor takes an instance of com.rabbitmq.client.Channel that is associated with the corresponding queue. This class provides dummy (no-op) implementations of the callback methods.
The class com.rabbitmq.client.Envelope encapsulates some of the basic properties such as the exchange through which the message the message was published, the routing key, etc.
The class com.rabbitmq.client.AMQP.BasicProperties encapsulates some of the basic properties associated with the message such as the message id, priority, timestamp, message expiration, etc.
The method handleDelivery(String, Envelope, AMQP.BasicProperties, byte[]) on the class com.rabbitmq.client.DefaultConsumer is called every time a message is received on the corresponding message queue.
The method basicConsume(String, boolean, Consumer) on the channel class instance associates the specified com.rabbitmq.client.DefaultConsumer instance with the specified queue.
Now, for the demostration of messaging using Direct Exchange, open three terminal windows. Lets refer to them as the publisher, consumer-mc, and consumer-v respectively.
In the consumer-mc terminal, execute the following command:
java -cp ./build/classes:./lib/amqp-client-5.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.rabbitmq.direct.PaymentVerifier 172.17.0.2 rabbitusr s3cr3t mastercard
The following should be the typical output:
Nov 12, 2017 2:51:52 PM com.polarsparc.rabbitmq.direct.PaymentVerifier main INFO: Ready to create communication channel for mastercard Nov 12, 2017 2:51:52 PM com.polarsparc.rabbitmq.direct.PaymentVerifier main INFO: Ready to create a queue for mastercard Nov 12, 2017 2:51:52 PM com.polarsparc.rabbitmq.direct.PaymentVerifier main INFO: Ready to create a consumer for mastercard Nov 12, 2017 2:51:52 PM com.polarsparc.rabbitmq.direct.PaymentVerifier main INFO: Ready to consume test messages for mastercard
In the consumer-v terminal, execute the following command:
java -cp ./build/classes:./lib/amqp-client-5.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.rabbitmq.direct.PaymentVerifier 172.17.0.2 rabbitusr s3cr3t visa
The following should be the typical output:
Nov 12, 2017 2:52:32 PM com.polarsparc.rabbitmq.direct.PaymentVerifier main INFO: Ready to create communication channel for visa Nov 12, 2017 2:52:32 PM com.polarsparc.rabbitmq.direct.PaymentVerifier main INFO: Ready to create a queue for visa Nov 12, 2017 2:52:32 PM com.polarsparc.rabbitmq.direct.PaymentVerifier main INFO: Ready to create a consumer for visa Nov 12, 2017 2:52:32 PM com.polarsparc.rabbitmq.direct.PaymentVerifier main INFO: Ready to consume test messages for visa
In the publisher terminal, execute the following command:
java -cp ./build/classes:./lib/amqp-client-5.0.0.jar:./lib/slf4j-api-1.7.25.jar:./lib/slf4j-simple-1.7.25.jar com.polarsparc.rabbitmq.direct.PaymentProcessor 172.17.0.2 rabbitusr s3cr3t
The following should be the typical output:
Nov 12, 2017 2:53:33 PM com.polarsparc.rabbitmq.direct.PaymentProcessor main INFO: Ready to create communication channels ... Nov 12, 2017 2:53:33 PM com.polarsparc.rabbitmq.direct.PaymentProcessor main INFO: Ready to create queues for Mastercard and Visa Nov 12, 2017 2:53:33 PM com.polarsparc.rabbitmq.direct.PaymentProcessor main INFO: Ready to publish test messages for Mastercard and Visa Nov 12, 2017 2:53:33 PM com.polarsparc.rabbitmq.direct.PaymentProcessor main INFO: Ready to close the communication channels Nov 12, 2017 2:53:33 PM com.polarsparc.rabbitmq.direct.PaymentProcessor main INFO: Payment processing done !!!
In the consumer-mc terminal, we should see the following message pop-up:
Nov 12, 2017 2:55:41 PM com.polarsparc.rabbitmq.direct.PaymentVerifier$1 handleDelivery INFO: Received message: 12345,19.99
In the consumer-v terminal, we should see the following message pop-up:
Nov 12, 2017 2:55:41 PM com.polarsparc.rabbitmq.direct.PaymentVerifier$1 handleDelivery INFO: Received message: 98765,21.99
The following picture shows the screenshot of the RabbitMQ web management console:
We have successfully demonstrated RabbitMQ messaging using the Direct Exchange !!!
References