RabbitMQ: Getting Started
Introduction
RabbitMQ is an open-source message broker software that implements the Advanced Message Queuing Protocol (AMQP). It is designed to handle large numbers of messages and to route them to the appropriate consumers. It is commonly used in distributed systems and microservices architectures to decouple and scale the different components of a system.
In RabbitMQ, messages are sent to "queues," which act like mailboxes that hold messages until a "consumer" picks them up. Producers (the senders of messages) do not send messages directly to consumers; they send them to a queue. This buffers messages so that consumers can process them at their own pace without being overwhelmed by a flood of incoming messages. It also lets multiple consumers read from the same queue, which makes it easy to scale a system.
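To make the mailbox idea concrete, here is a toy in-memory sketch of a queue shared by two consumers. It is not how RabbitMQ is implemented; the `ToyQueue` class and its methods are made up for illustration, and the round-robin hand-off is a simplified stand-in for how a broker spreads messages across consumers of the same queue.

```javascript
// Toy in-memory model of a queue with competing consumers.
// Hypothetical illustration only; this is not RabbitMQ code.
class ToyQueue {
  constructor() {
    this.messages = [];  // buffered messages waiting to be consumed
    this.consumers = []; // registered consumer callbacks
    this.next = 0;       // counter used to pick the next consumer
  }

  // Producers only know about the queue, never about the consumers.
  publish(message) {
    this.messages.push(message);
    this.deliver();
  }

  // Consumers can attach at any time; buffered messages are delivered then.
  subscribe(consumer) {
    this.consumers.push(consumer);
    this.deliver();
  }

  // Hand buffered messages to the consumers in round-robin order.
  deliver() {
    while (this.messages.length > 0 && this.consumers.length > 0) {
      const consumer = this.consumers[this.next % this.consumers.length];
      this.next += 1;
      consumer(this.messages.shift());
    }
  }
}

const queue = new ToyQueue();
const seenByA = [];
const seenByB = [];

queue.publish("job 1");                  // buffered: nobody is consuming yet
queue.subscribe((m) => seenByA.push(m)); // consumer A receives "job 1"
queue.subscribe((m) => seenByB.push(m)); // consumer B joins the same queue
queue.publish("job 2");
queue.publish("job 3");

console.log(seenByA, seenByB);
```

The first message waits in the queue until a consumer attaches, and later messages alternate between the two consumers.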
This guide covers the steps required to set up a producer and a consumer using Node.js and RabbitMQ.
Running RabbitMQ on Docker
Running RabbitMQ on Docker is a simple process that can be done using the official RabbitMQ Docker image. Here are the steps to run RabbitMQ on Docker:
1. Run the following command on the terminal to pull the official RabbitMQ image from Docker Hub:
docker pull rabbitmq:latest
2. Once the image is downloaded, you can run a container from it using the `docker run` command. To run RabbitMQ, execute the following command:
docker run --name rabbitmq-test -p 5672:5672 rabbitmq
- `rabbitmq-test` is the container name (you can change it to any name you prefer).
- `-p` maps a host port to a container port. The syntax is `-p host_port:container_port`. Port 5672 is the AMQP port that producers and consumers connect to. It is not a web interface, so opening http://localhost:5672 in a browser will not show the management interface. The management interface is only included in the `rabbitmq:management` image variants, where it listens on port 15672.
3. To check the running container, run `docker ps`. You should see a container named `rabbitmq-test`.
4. You can also use the `docker stop` and `docker start` commands to stop and start the container, and `docker rm` to remove the container once you are done with it.
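Before moving on, it can help to confirm that something is listening on the mapped port. The sketch below uses only Node's built-in `net` module; `isPortOpen` is a hypothetical helper name, and the check only proves that a TCP connection can be opened, not that the broker has finished starting up.

```javascript
const net = require("node:net");

// Hypothetical helper: resolves to true if a TCP connection to host:port
// can be opened within timeoutMs, and to false otherwise.
function isPortOpen(host, port, timeoutMs = 2000) {
  return new Promise((resolve) => {
    const socket = net.createConnection({ host, port });
    const finish = (open) => {
      socket.destroy();
      resolve(open);
    };
    socket.setTimeout(timeoutMs);
    socket.once("connect", () => finish(true));
    socket.once("timeout", () => finish(false));
    socket.on("error", () => finish(false));
  });
}

// Assumes the container was started with -p 5672:5672 as shown above.
isPortOpen("localhost", 5672).then((open) => {
  console.log(open ? "Port 5672 is accepting connections" : "Port 5672 is not reachable");
});
```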
Producers and Consumers
Let's start by initializing our project directory:
1. Create a directory named `rabbitmq`
2. Initialize this directory into a Node.js project by running the command below:
npm init -y
3. Install the required library, `amqplib`:
npm install amqplib
4. Create two files named `producer.js` and `consumer.js`.
Producer
Here is the code for `producer.js`:
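const amqp = require("amqplib");

sendMessage();

async function sendMessage() {
  let connection;
  try {
    connection = await amqp.connect("amqp://localhost:5672");
    const channel = await connection.createChannel();
    // assertQueue returns a promise, so wait until the queue exists.
    await channel.assertQueue("jobs");
    // The unary + converts the command-line string into a number.
    const msg = {
      number: +process.argv[2]
    };
    channel.sendToQueue("jobs", Buffer.from(JSON.stringify(msg)));
    console.log(`Job sent successfully ${msg.number}`);
    // Close the channel before the connection is closed.
    await channel.close();
  } catch (ex) {
    console.error(ex);
  } finally {
    // Close the connection so the process can exit.
    if (connection) await connection.close();
  }
}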
This code uses the amqplib library to connect to a RabbitMQ instance running on the local machine on port 5672.
- The `amqp.connect` function establishes a connection to the RabbitMQ server; "amqp://localhost:5672" is the URL of the server.
- Once the connection is established, the code creates a channel using the `connection.createChannel()` method. Channels are how the application communicates with the message broker; they are used to send and receive messages.
- The code then calls `channel.assertQueue("jobs")`, which creates a queue named "jobs" if it does not already exist. The call returns a promise that resolves to an object containing information about the queue, such as the number of messages in it.
- The message, `msg`, is prepared. Its value is taken from the command line. Note that the `+` symbol is used to convert the string input into a number.
- The message is then sent to the "jobs" queue using the `channel.sendToQueue("jobs", Buffer.from(JSON.stringify(msg)))` method. `Buffer.from(JSON.stringify(msg))` converts the object to a Buffer before it is sent to the queue.
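One thing the `+` conversion does not cover is a missing or non-numeric argument: `+undefined` is `NaN`, and `JSON.stringify` turns `NaN` into `null`. The sketch below shows one possible way to validate the input before building the Buffer. `parseJobNumber` and `encodeJob` are hypothetical helper names, not part of amqplib.

```javascript
// Hypothetical helpers (not part of amqplib) for preparing the job message.

// Convert the raw command-line argument into a finite number, or throw.
function parseJobNumber(arg) {
  const number = Number(arg);
  if (arg === undefined || String(arg).trim() === "" || !Number.isFinite(number)) {
    throw new Error(`Expected a numeric argument, got: ${arg}`);
  }
  return number;
}

// Serialize the job as JSON and wrap it in a Buffer, as sendToQueue expects.
function encodeJob(number) {
  return Buffer.from(JSON.stringify({ number }));
}

// What the unvalidated version would send when no argument is given.
console.log(JSON.stringify({ number: +undefined }));

// The validated path for the argument "23".
console.log(encodeJob(parseJobNumber("23")).toString());
```

In `producer.js`, the result of `encodeJob(parseJobNumber(process.argv[2]))` could be passed to `channel.sendToQueue` in place of the inline `Buffer.from(...)` call.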
Consumer
Here is the code for `consumer.js`:
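const amqp = require("amqplib");

receiveMessage();

async function receiveMessage() {
  try {
    const connection = await amqp.connect("amqp://localhost:5672");
    const channel = await connection.createChannel();
    // assertQueue returns a promise, so wait until the queue exists.
    await channel.assertQueue("jobs");
    // Register the callback; it runs once for every delivered message.
    await channel.consume("jobs", message => {
      // message is null if the broker cancels the consumer.
      if (message === null) return;
      const receivedMessage = JSON.parse(message.content.toString());
      console.log(" [x] Received '%s'", receivedMessage);
      // Acknowledge so RabbitMQ can remove the message from the queue.
      channel.ack(message);
    });
    console.log("Waiting for messages...");
  } catch (ex) {
    console.error(ex);
  }
}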
The consumer code does the following:
1. It connects to RabbitMQ, creates a channel, and asserts that the `jobs` queue exists.
2. It then starts consuming from the `jobs` queue. The `channel.consume` function takes a callback that is called every time a message is received. The `message` is passed to the callback as a parameter, and its content is converted back to a JavaScript object so that it can be logged to the console.
3. The `channel.ack(message)` method acknowledges the message, which tells the RabbitMQ server that the message has been successfully processed and can be removed from the queue.
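The steps above assume every message contains valid JSON. As a sketch of one way to handle a malformed payload, the callback below rejects it with `channel.nack(message, false, false)` (do not requeue) instead of letting `JSON.parse` throw. `makeJobHandler` is a hypothetical helper name, and the stub channel stands in for a real amqplib channel so the example runs without a broker.

```javascript
// Hypothetical helper: builds the callback passed to channel.consume.
// `channel` only needs ack and nack methods, so a stub works for a dry run.
function makeJobHandler(channel, onJob) {
  return (message) => {
    // amqplib passes null when the consumer is cancelled by the broker.
    if (message === null) return;
    let job;
    try {
      job = JSON.parse(message.content.toString());
    } catch (err) {
      // Malformed payload: reject without requeueing so it is not redelivered.
      channel.nack(message, false, false);
      return;
    }
    onJob(job);
    // Acknowledge only after the job was handled.
    channel.ack(message);
  };
}

// Exercise the handler with a stub channel instead of a live broker.
const calls = [];
const stubChannel = {
  ack: () => calls.push("ack"),
  nack: () => calls.push("nack"),
};
const jobs = [];
const handler = makeJobHandler(stubChannel, (job) => jobs.push(job));

handler({ content: Buffer.from('{"number":23}') }); // valid job
handler({ content: Buffer.from("not json") });      // malformed payload
handler(null);                                      // consumer cancelled

console.log(calls, jobs);
```

With a real channel, the handler would be registered as `channel.consume("jobs", makeJobHandler(channel, job => console.log(job)))`.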
Testing Our Code
Let's prepare two consoles.
1. On the first console (consumer console), run the following command:
node consumer.js
This prints `Waiting for messages...` and keeps running. Once we send a message from the producer, it displays the received message.
2. On the second console (producer console), run the following command:
node producer.js 23
This sends a message with the number `23`.
If everything is set up correctly, the consumer console should display the following:
[x] Received '{ number: 23 }'
Code Repository
Here is the link to the repository: