Recently, I had to dive into the official RabbitMQ documentation and numerous articles of varying quality on the topic of different types of routing in this message broker.
It turned out that there is a lot of material available, but it either explains very basic cases or delves into depths that are quite distant and challenging for someone who simply wants to “get the hang of it.”
Simple, understandable Python examples are also scarce: most of them force you to dig into library code instead of focusing on RabbitMQ itself. For someone encountering something Pika-like for the first time, this can be quite daunting.
That’s why I decided to write this article. It is aimed at beginners, and you can easily share it with your juniors, as it should be sufficient to get started with RabbitMQ.
The examples in this article use the Propan framework to spare beginners the extra details of establishing connections, channels, and so on.
Copyright TechPlanet.today
Why do you need RabbitMQ?
This section contains a lot of introductory information about messaging, message brokers, and where RabbitMQ stands among them. If you are already familiar with these concepts, feel free to skip to the next section.
Since this article is targeted towards beginners, it would be good to understand what RabbitMQ is and why we need it.
Simply put, RabbitMQ is a message broker, one of many available options. Other popular alternatives include Kafka (not exactly a broker but serves a similar purpose), NATS, SQS, and even Redis (which also provides messaging capabilities in different ways). There are also more exotic choices like Pulsar, ActiveMQ, Tarantool, and so on.
So, what exactly is a message broker? It’s an external service that receives, stores, and distributes messages among consumers. While a messaging architecture can be implemented without an external broker by sending messages directly between services (e.g., with ZeroMQ), introducing a separate entity that manages the lifecycle and distribution of messages enhances the system’s fault tolerance and facilitates its scalability.
This architecture aims to build systems with asynchronous non-blocking operations. While HTTP requests imply receiving a response, sending a message only entails confirming that the message has been placed in the broker and, in some cases, confirming the delivery of the message to the consumer and even the successful processing of the message.
Moreover, some brokers allow you to utilize RPC requests without unnecessary complexity. In this scenario, sending a message implies receiving a response in another message. This response can be sent either through a temporary one-time queue or a permanent queue, enabling us to receive one or more responses to the sent message. It also allows us to separate the request and response in time for an indefinite period without blocking the application during the waiting time (unlike an HTTP request that would time out).
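The RPC flow described above can be sketched without any broker at all. The example below is only an illustration in plain asyncio: two in-memory queues stand in for the request queue and the permanent reply queue, and the names server, rpc_call, reply_listener and the correlation id are assumptions made for this sketch, not part of RabbitMQ or Propan.

```python
import asyncio
import uuid


async def server(requests, replies):
    """Consume requests and publish each answer as a separate message."""
    while True:
        correlation_id, number = await requests.get()
        await replies.put((correlation_id, number * 2))


async def reply_listener(replies, pending):
    """Hand every reply to the caller that is waiting for it."""
    while True:
        correlation_id, result = await replies.get()
        pending.pop(correlation_id).set_result(result)


async def rpc_call(requests, pending, number):
    """Send a request and wait for the reply with the same correlation id."""
    correlation_id = str(uuid.uuid4())
    future = asyncio.get_running_loop().create_future()
    pending[correlation_id] = future
    await requests.put((correlation_id, number))
    # Waiting here does not block other coroutines in the application
    return await future


async def main():
    requests, replies, pending = asyncio.Queue(), asyncio.Queue(), {}
    workers = [
        asyncio.create_task(server(requests, replies)),
        asyncio.create_task(reply_listener(replies, pending)),
    ]
    results = await asyncio.gather(
        rpc_call(requests, pending, 2),
        rpc_call(requests, pending, 21),
    )
    for worker in workers:
        worker.cancel()
    return results


results = asyncio.run(main())
print(results)
```

The correlation id is what lets several requests share one reply queue: each caller only receives the answer to its own request, no matter in which order the replies arrive.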
Thus, adopting the Messaging pattern allows us to build a much more flexible system of services capable of executing both synchronous and asynchronous tasks with various guarantees.
However, the primary use case for Messaging is performing asynchronous operations. In fact, a significant portion of actions we want to perform in our applications are inherently asynchronous: sending notifications, email distribution, scheduled task execution, and so on. In scenarios where you simply need to initiate further data processing without waiting for the results, Messaging is a perfect fit. With this pattern, you can easily construct distributed systems for stream processing (where results from each layer are passed to the next via messages), task scheduling, notifications, mass broadcasting, incident response, IoT services, and much more.
While RabbitMQ may not be the most performant among brokers, its distinctive feature is the ability to build a highly complex message routing system.
Key Concepts
In RabbitMQ, there are three main types of routing objects:
- Exchange: This is where messages are sent.
- Queue: This is where we retrieve messages from.
- Binding: It defines the rules for delivering messages from the Exchange to the Queue.
Essentially, these are all you need to know to get started with RabbitMQ: declare an Exchange to which messages will be sent, bind a Queue to it, and start listening to that Queue. Mission complete!
The message routing rules are determined by the Exchange type and the Queue binding parameters. The Queue itself encapsulates the logic of delivering messages to consumers, managing the lifecycle of messages, and more, but we won’t delve into that for now.
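To see how the three objects fit together, here is a tiny in-memory model. It is a sketch for intuition only: ToyBroker and its methods are hypothetical names, the matching rule is the simplest possible one (exact equality of keys), and nothing here talks to a real RabbitMQ server.

```python
from collections import defaultdict, deque


class ToyBroker:
    """In-memory stand-in for a broker; not a real RabbitMQ client."""

    def __init__(self):
        # queue name -> messages waiting for a consumer
        self.queues = defaultdict(deque)
        # exchange name -> list of (binding key, queue name)
        self.bindings = defaultdict(list)

    def bind(self, exchange, queue, binding_key):
        """Binding: a rule that connects an exchange to a queue."""
        self.bindings[exchange].append((binding_key, queue))

    def publish(self, exchange, routing_key, body):
        """Exchange: producers send messages here, never to a queue directly."""
        for binding_key, queue in self.bindings[exchange]:
            if binding_key == routing_key:
                self.queues[queue].append(body)

    def get(self, queue):
        """Queue: consumers read messages from here."""
        return self.queues[queue].popleft() if self.queues[queue] else None


toy = ToyBroker()
toy.bind("orders", "billing", binding_key="order.created")
toy.publish("orders", "order.created", "order #1")
toy.publish("orders", "order.cancelled", "order #2")  # no matching binding

first = toy.get("billing")
second = toy.get("billing")
print(first, second)
```

In this model the second message reaches no queue at all, because no binding matches its routing key.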
Let’s focus on understanding each type of routing one by one.
Direct Exchange
Direct Exchange is the basic message routing method in RabbitMQ. Its essence is straightforward: the Exchange sends a message to those Queues whose binding routing key exactly equals the routing key of the sent message.
When binding a Queue to an Exchange, a routing key is specified. In most cases, it matches the name of the Queue itself. A routing key is also specified when sending a message, and it determines which Queues the message will be forwarded to.
An example helps illustrate this clearly:
from propan import PropanApp, RabbitBroker
from propan.brokers.rabbit import RabbitExchange, RabbitQueue, ExchangeType

broker = RabbitBroker()
app = PropanApp(broker)

# Declare a Direct Exchange
exchange = RabbitExchange("test-exchange", type=ExchangeType.DIRECT)

# Declare a couple of different queues
queue_1 = RabbitQueue("test-q-1")
queue_2 = RabbitQueue("test-q-2")

# Listen to queue_1
@broker.handle(queue_1, exchange)
async def handler1():
    print("handler1")

# Listen to queue_2
@broker.handle(queue_2, exchange)
async def handler2():
    print("handler2")

@app.after_startup
async def send_messages():
    # Send a message to our exchange, routed to queue_1
    # It will be processed by handler1
    await broker.publish(exchange=exchange, routing_key="test-q-1")
    # Send a message to our exchange, routed to queue_2
    # It will be processed by handler2
    await broker.publish(exchange=exchange, routing_key="test-q-2")
The only thing I can add here is that RabbitMQ supports the ability to bind a single queue to multiple different exchanges and even to a single exchange with different routing keys. Therefore, messages sent to different exchanges with different parameters can accumulate in a single queue and be delivered to consumers.
Based on my experience, it is advisable to avoid such scenarios: it is better to declare a new queue instead of trying to bring different message flows into an old one.
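A small in-memory illustration shows why this gets confusing. The exchange names, keys, and the "audit" queue below are made up for the example, and the dictionaries only imitate what a broker does.

```python
from collections import defaultdict

# (exchange name, binding key) -> names of bound queues; illustration only
bindings = defaultdict(list)
queues = defaultdict(list)


def bind(exchange, binding_key, queue):
    bindings[(exchange, binding_key)].append(queue)


def publish(exchange, routing_key, body):
    for queue in bindings[(exchange, routing_key)]:
        queues[queue].append(body)


# One queue, three bindings: two keys on one exchange plus a second exchange
bind("orders", "created", "audit")
bind("orders", "cancelled", "audit")
bind("payments", "refunded", "audit")

publish("orders", "created", "order 1 created")
publish("payments", "refunded", "payment 7 refunded")
publish("orders", "cancelled", "order 1 cancelled")

audit_messages = queues["audit"]
print(audit_messages)
```

The consumer of "audit" now receives three unrelated kinds of messages interleaved in one stream and has to tell them apart itself, which is exactly the situation a separate queue per flow avoids.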
Fanout Exchange
The Fanout Exchange is an even simpler way of routing in RabbitMQ. This type of exchange sends a copy of every message to all queues bound to it, ignoring the routing key and any other attributes of the message itself.
I think the following example also does not require additional comments.
from propan import PropanApp, RabbitBroker
from propan.brokers.rabbit import RabbitExchange, RabbitQueue, ExchangeType

broker = RabbitBroker()
app = PropanApp(broker)

# Declare a Fanout Exchange
exchange = RabbitExchange("test-exchange", type=ExchangeType.FANOUT)

# Declare a couple of different queues
queue_1 = RabbitQueue("test-q-1")
queue_2 = RabbitQueue("test-q-2")

# Listen to queue_1
@broker.handle(queue_1, exchange)
async def handler1():
    print("handler1")

# Listen to queue_2
@broker.handle(queue_2, exchange)
async def handler2():
    print("handler2")

@app.after_startup
async def send_messages():
    # Send a message to our exchange
    # It will be processed by both consumers
    await broker.publish(exchange=exchange)
Topic Exchange
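Topic Exchange is a powerful routing mechanism in RabbitMQ. This type of exchange sends messages to queues by comparing the routing key of the message with the pattern specified when each queue was bound to the exchange. Keys and patterns consist of words separated by dots; in a pattern, "*" stands for exactly one word and "#" stands for zero or more words.
Topic Exchange can be used as both Direct and Fanout, as well as combined variations of both: a pattern without wildcards behaves like a Direct binding, and the pattern "#" receives every message, like Fanout. However, it works slightly slower (but faster than Headers, which we will discuss next). Overall, it is an excellent compromise between flexibility and performance, which is why it is often used in practice.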
Here’s a snippet of code:
from propan import PropanApp, RabbitBroker
from propan.brokers.rabbit import RabbitExchange, RabbitQueue, ExchangeType

broker = RabbitBroker()
app = PropanApp(broker)

# Declare a Topic Exchange
exchange = RabbitExchange("test-exchange", type=ExchangeType.TOPIC)

# Declare three different queues and their routing key patterns
queue_1 = RabbitQueue("test-queue-1", routing_key="*.info")
queue_2 = RabbitQueue("test-queue-2", routing_key="*.debug")
queue_3 = RabbitQueue("test-queue-3", routing_key="logs.*")

# Listen to queue_1
@broker.handle(queue_1, exchange)
async def handler1():
    print("handler1")

# Listen to queue_2
@broker.handle(queue_2, exchange)
async def handler2():
    print("handler2")

# Listen to queue_3
@broker.handle(queue_3, exchange)
async def handler3():
    print("handler3")

@app.after_startup
async def send_messages():
    # Send a message to our exchange
    # The message key matches *.info and logs.*
    # It will be processed by handler1 and handler3
    await broker.publish(routing_key="logs.info", exchange=exchange)
    # Send a message to our exchange
    # The message key matches *.debug and logs.*
    # It will be processed by handler2 and handler3
    await broker.publish(routing_key="logs.debug", exchange=exchange)
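If the pattern matching itself feels like magic, it can be reproduced in a few lines of plain Python. This is a simplified sketch, not RabbitMQ's actual implementation: topic_matches and _match are hypothetical helpers that split keys into dot-separated words and treat "*" as exactly one word and "#" as zero or more words.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a dot-separated routing key matches a binding pattern."""
    return _match(pattern.split("."), routing_key.split("."))


def _match(pattern_words, key_words):
    if not pattern_words:
        # The pattern is exhausted: it matches only if the key is too
        return not key_words
    head, rest = pattern_words[0], pattern_words[1:]
    if head == "#":
        # "#" may absorb zero or more words: try every possible split
        return any(_match(rest, key_words[i:]) for i in range(len(key_words) + 1))
    if not key_words:
        return False
    if head == "*" or head == key_words[0]:
        # "*" stands for exactly one word; a literal word must be equal
        return _match(rest, key_words[1:])
    return False


for pattern in ("*.info", "*.debug", "logs.*"):
    print(pattern, topic_matches(pattern, "logs.info"))
```

With the bindings from the example above, the key "logs.info" matches "*.info" and "logs.*" but not "*.debug", while a longer key such as "app.logs.info" matches none of the three, because "*" never spans more than one word.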
Headers Exchange
Headers Exchange is the most complex and flexible way of message routing in RabbitMQ. This type of exchange ignores the routing key and sends a message to the queues whose binding arguments match the headers of that message.
Despite this flexibility, it is rarely used in practice: most of the time, Topic Exchange is preferred over it. However, knowing that such a tool exists can be helpful when basic techniques are not sufficient.
You will hardly find any examples of its usage in Python (or in general). Therefore, I had to delve into it by studying the official RabbitMQ documentation and figuring out how to use it based on the source code of the pika library.
Here is a summary for you:
from propan import PropanApp, RabbitBroker
from propan.brokers.rabbit import RabbitExchange, RabbitQueue, ExchangeType

broker = RabbitBroker()
app = PropanApp(broker)

# Declare Headers Exchange
exch = RabbitExchange("exchange", type=ExchangeType.HEADERS)

# Declare a queue that expects messages
# with the header {"key": 1}
queue_1 = RabbitQueue(
    "test-queue-1",
    bind_arguments={"key": 1}
)

# Declare a queue that expects messages
# with headers {"key": 2}, {"key2": 2}, or {"key": 2, "key2": 2}
queue_2 = RabbitQueue(
    "test-queue-2",
    bind_arguments={
        "key": 2, "key2": 2,
        "x-match": "any"
    }
)

# Declare a queue that expects messages
# with the header {"key": 2, "key2": 2}
queue_3 = RabbitQueue(
    "test-queue-3",
    bind_arguments={
        "key": 2, "key2": 2,
        "x-match": "all"
    }
)

# Listen to queue_1
@broker.handle(queue_1, exch)
async def handler1():
    print("handler1")

# Listen to queue_2
@broker.handle(queue_2, exch)
async def handler2():
    print("handler2")

# Listen to queue_3
@broker.handle(queue_3, exch)
async def handler3():
    print("handler3")

@app.after_startup
async def send_messages():
    # Send a message with the header {"key": 1}
    # It will be processed by handler1
    await broker.publish(exchange=exch, headers={"key": 1})
    # Send a message with the header {"key": 2}
    # It will be processed by handler2
    await broker.publish(exchange=exch, headers={"key": 2})
    # Send a message with the header {"key2": 2}
    # It will also be processed by handler2
    await broker.publish(exchange=exch, headers={"key2": 2})
    # Send a message with the header {"key": 2, "key2": 2}
    # It will be processed by handler2 and handler3
    await broker.publish(exchange=exch, headers={
        "key": 2, "key2": 2
    })
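As you may have guessed, the binding argument “x-match” determines whether the message headers should match the declared headers partially (“any”: at least one declared header matches) or completely (“all”: every declared header matches). If “x-match” is not specified, “all” is used.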
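The matching rule can be written down as a short function. This is a simplified model rather than RabbitMQ's real code: headers_match is a hypothetical helper, and it assumes that binding arguments starting with "x-" are control options that are not compared with message headers, and that "all" is the mode when "x-match" is absent.

```python
def headers_match(bind_arguments: dict, message_headers: dict) -> bool:
    """Check message headers against the arguments of one binding."""
    mode = bind_arguments.get("x-match", "all")
    # Keep only the values to compare; "x-" arguments are treated as options
    expected = {
        key: value
        for key, value in bind_arguments.items()
        if not key.startswith("x-")
    }
    hits = [
        key in message_headers and message_headers[key] == value
        for key, value in expected.items()
    ]
    return any(hits) if mode == "any" else all(hits)


queue_1_args = {"key": 1}
queue_2_args = {"key": 2, "key2": 2, "x-match": "any"}
queue_3_args = {"key": 2, "key2": 2, "x-match": "all"}

for headers in ({"key": 1}, {"key": 2}, {"key": 2, "key2": 2}):
    matched = [
        name
        for name, args in (
            ("queue_1", queue_1_args),
            ("queue_2", queue_2_args),
            ("queue_3", queue_3_args),
        )
        if headers_match(args, headers)
    ]
    print(headers, matched)
```

Running the three messages from the example through this function gives the same routing as described in the comments above: queue_1 for {"key": 1}, queue_2 for {"key": 2}, and both queue_2 and queue_3 for {"key": 2, "key2": 2}.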
Additional Information
It is important to know a few more nuances. When multiple consumers subscribe to a single queue, each message from that queue is delivered to only one of them, and by default the consumers are served in turn (round-robin). That means if we have two consumers (1 and 2) on a single queue, they will receive messages in the following pattern:
1 - 2 - 1 - 2 - ...
RabbitMQ automatically manages the distribution of these messages. You simply connect additional consumers, and the load is distributed among them, which can be very useful for horizontal scaling.
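The alternation can be imitated with a plain iterator. This is only an idealized picture with made-up consumer and message names; in a real system the exact order also depends on how quickly each consumer acknowledges messages and on its prefetch settings.

```python
from itertools import cycle

consumers = ["consumer-1", "consumer-2"]
next_consumer = cycle(consumers)

# Each message goes to exactly one consumer, taken in turn
deliveries = [
    (message, next(next_consumer))
    for message in ["m1", "m2", "m3", "m4"]
]
print(deliveries)
```

Adding a third name to the consumers list spreads the same messages over three consumers, which is the horizontal scaling effect described above.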
RabbitMQ also supports even more complex routing options, where an Exchange is bound to another Exchange, just like binding to a queue. This allows you to combine, for example, Topic and Headers Exchange. However, I do not recommend doing this for three reasons:
- It is not part of the AMQP protocol and is a specific feature of RabbitMQ, so once you use it, it may be challenging to switch to another messaging system.
- It significantly increases the load on RabbitMQ, which is not the fastest message broker.
- It can lead to confusion. It becomes difficult to keep track of the bindings and relationships when one queue is subscribed to multiple exchanges, which are subscribed to other exchanges, and so on. You can easily get tangled in the complexity.
In conclusion
In this article, we have covered the main types of RabbitMQ Exchanges and how to work with them using the Python framework Propan. This basic knowledge is sufficient for writing and maintaining basic services using RabbitMQ.
If you find this topic interesting, next time we will explore the specifics of implementing RPC requests over RabbitMQ without creating temporary queues for receiving messages.