RabbitMQ vs Redis
RabbitMQ is a popular message broker. I had been using Redis as messaging middleware, but RabbitMQ is more often recommended over Redis for production environments, so I looked into it. Compared with Redis, RabbitMQ has several advantages, such as:
- A message acknowledgement mechanism for consumers
- Queues and messages can each be made persistent or not, which gives finer granularity and more flexibility
- Load balancing across consumers is possible
RabbitMQ Application Scenarios
- Asynchronous processing: for example, confirmation emails and SMS messages during user registration are sent asynchronously through RabbitMQ.
- Application decoupling: the sending and receiving sides communicate only through the message queue, which also acts as a buffer between them.
- Traffic peak shaving: typically used for flash-sale events, where the queue can cap the number of users admitted and smooth out traffic bursts.
- Logging: info, warning, error, and other log levels can be stored separately.
The RabbitMQ Message Model
Here's how to use the Python pika library to implement the six common message models in RabbitMQ. If you don't have pika installed, install it first:
pip install pika
1. Single-producer, single-consumer model: basic one-to-one message delivery.
# Producer code
import pika

# RabbitMQ username and password; create the user first if it does not exist.
credentials = pika.PlainCredentials('chuan', '123')
# virtual_host selects the virtual host; it can be omitted when using the default '/'.
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', port=5672, virtual_host='/', credentials=credentials))
# Open a channel on the connection.
channel = connection.channel()
# Declare the queue that messages will be delivered to; it is created if it does not exist.
# durable specifies whether the queue is persistent.
channel.queue_declare(queue='python-test', durable=False)
# Messages cannot be sent to a queue directly; they must go through an exchange.
# Here the default exchange (the empty string) is used, and routing_key is the queue name.
channel.basic_publish(exchange='', routing_key='python-test', body='Hello world!2')
# Close the connection to the RabbitMQ server.
connection.close()
# Consumer code
import pika

credentials = pika.PlainCredentials('chuan', '123')
# BlockingConnection: synchronous mode
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', port=5672, virtual_host='/', credentials=credentials))
channel = connection.channel()
# Declare the queue. When it is unclear whether the producer or the consumer starts first,
# the queue can safely be declared on both sides.
channel.queue_declare(queue='python-test', durable=False)

# Callback that handles messages from the queue; here it just prints them.
def callback(ch, method, properties, body):
    # Acknowledge manually, telling RabbitMQ that the message has been received.
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(body.decode())

# Tell RabbitMQ to deliver messages from the queue to callback.
# auto_ack defaults to False, so messages must be acknowledged manually (as above)
# to prevent message loss. Passing auto_ack=True would acknowledge them automatically.
channel.basic_consume('python-test', on_message_callback=callback)
# Start consuming. This blocks and invokes callback whenever a message arrives.
channel.start_consuming()
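To see why manual acknowledgement protects against message loss, here is a minimal in-memory sketch. It does not use pika or a broker, and the AckQueue class and its method names are hypothetical, invented only for illustration. It assumes a simplified model in which a delivered message stays in an "unacked" set until the consumer acknowledges it, and is put back on the queue if the consumer goes away first.

```python
from collections import deque


class AckQueue:
    """Toy model of a queue with manual acknowledgements (not RabbitMQ itself)."""

    def __init__(self, messages):
        self.ready = deque(messages)   # messages waiting to be delivered
        self.unacked = {}              # delivery tag -> message awaiting an ack
        self._next_tag = 1

    def deliver(self):
        # Hand the next message to the consumer, but remember it until it is acked.
        body = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = body
        return tag, body

    def ack(self, tag):
        # The consumer confirmed processing, so the message can be forgotten.
        del self.unacked[tag]

    def consumer_died(self):
        # Unacknowledged messages return to the front of the queue in original order.
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag))


q = AckQueue(['a', 'b', 'c'])
tag_a, _ = q.deliver()
tag_b, _ = q.deliver()
q.ack(tag_a)          # 'a' was processed successfully
q.consumer_died()     # the consumer crashed before acknowledging 'b'
print(list(q.ready))  # ['b', 'c']
```

With automatic acknowledgement, 'b' would have been considered done as soon as it was delivered, and the crash would have lost it.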
2. Message distribution model (work queue): multiple consumers listen on the same queue.
# Producer code
import pika

credentials = pika.PlainCredentials('chuan', '123')  # RabbitMQ username and password
# virtual_host selects the virtual host; it can be omitted when using the default '/'.
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', port=5672, virtual_host='/', credentials=credentials))
# Open a channel on the connection.
channel = connection.channel()
# Declare the queue that messages will be delivered to; it is created if it does not exist.
# durable=True makes the queue persistent, so it survives a broker restart.
channel.queue_declare(queue='rabbitmqtest', durable=True)
# Messages go through the default exchange (the empty string); routing_key is the queue name,
# so it must match the queue declared above.
# The properties parameter sets message properties; delivery_mode=2 marks the message persistent.
for i in range(10):
    channel.basic_publish(exchange='', routing_key='rabbitmqtest',
                          body='Hello world!%s' % i,
                          properties=pika.BasicProperties(delivery_mode=2))
# Close the connection to the RabbitMQ server.
connection.close()
# Consumer code, used for both consume1 and consume2
import pika
import time

credentials = pika.PlainCredentials('chuan', '123')
# BlockingConnection: synchronous mode
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', port=5672, virtual_host='/', credentials=credentials))
channel = connection.channel()
# Declare the queue. When it is unclear whether the producer or the consumer starts first,
# the queue can safely be declared on both sides.
channel.queue_declare(queue='rabbitmqtest', durable=True)

# Callback that handles messages from the queue; here it just prints them.
def callback(ch, method, properties, body):
    time.sleep(10)  # simulate slow processing
    print(body.decode())
    # Acknowledge manually once processing is done, telling RabbitMQ the message was handled.
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Once the number of unacknowledged messages on this consumer's channel reaches
# prefetch_count, RabbitMQ stops sending it new messages.
channel.basic_qos(prefetch_count=1)
# Tell RabbitMQ to deliver messages from the queue to callback.
# auto_ack is left at its default (False), so the manual ack above prevents message loss.
channel.basic_consume('rabbitmqtest', on_message_callback=callback)
# Start consuming. This blocks and invokes callback whenever a message arrives.
channel.start_consuming()
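The effect of prefetch_count=1 can be illustrated without a broker. The sketch below is a simplified simulation, not RabbitMQ's actual scheduler: the function names and the task durations are made up for illustration. It compares plain round-robin dispatch with a "next message goes to whichever worker frees up first" policy, which approximates what prefetch_count=1 with manual acknowledgements achieves.

```python
import heapq


def round_robin(durations, n_workers):
    """Give message i to worker i % n_workers, regardless of how busy it is."""
    finish = [0] * n_workers
    for i, d in enumerate(durations):
        finish[i % n_workers] += d
    return finish


def fair_dispatch(durations, n_workers):
    """Approximate prefetch_count=1: each message goes to the worker that is free first."""
    heap = [(0, w) for w in range(n_workers)]  # (time the worker becomes free, worker id)
    heapq.heapify(heap)
    for d in durations:
        busy_until, w = heapq.heappop(heap)
        heapq.heappush(heap, (busy_until + d, w))
    finish = [0] * n_workers
    for busy_until, w in heap:
        finish[w] = busy_until
    return finish


# Alternating slow (10 s) and fast (1 s) tasks, two workers.
durations = [10, 1, 10, 1, 10, 1]
print(round_robin(durations, 2))    # [30, 3]
print(fair_dispatch(durations, 2))  # [21, 12]
```

Under round-robin, one worker receives every slow task while the other sits idle. With fair dispatch the load evens out and the whole batch finishes sooner.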
3. Publish/subscribe model: the producer sends messages to an exchange, which forwards them to every queue bound to it; each consumer then fetches messages from its own queue.
# Producer code
import pika

credentials = pika.PlainCredentials('chuan', '123')  # RabbitMQ username and password
# virtual_host selects the virtual host; it can be omitted when using the default '/'.
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', port=5672, virtual_host='/', credentials=credentials))
# Open a channel on the connection.
channel = connection.channel()
# fanout: every queue bound to this exchange receives the message (real-time broadcast)
# direct: only queues bound with a key equal to the routing key receive the message
# topic: queues bound with a pattern that matches the routing key receive the message
#        (finer-grained filtering)
channel.exchange_declare('logs', exchange_type='fanout')
# A fanout exchange broadcasts to all bound queues, so no routing_key is needed here.
for i in range(10):
    channel.basic_publish(exchange='logs', routing_key='', body='Hello world!%s' % i)
# Close the connection to the RabbitMQ server.
connection.close()
# Consumer code
import pika

credentials = pika.PlainCredentials('chuan', '123')
# BlockingConnection: synchronous mode
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost', port=5672, virtual_host='/', credentials=credentials))
channel = connection.channel()
# As a good practice, declare the exchange in both the producer and the consumer
# to make sure it exists.
channel.exchange_declare(exchange='logs', exchange_type='fanout')
# Let RabbitMQ generate a new, empty queue with a random name. exclusive=True means
# the queue is deleted once the consumer disconnects.
result = channel.queue_declare('', exclusive=True)
# Name of the temporary queue
queue_name = result.method.queue
# The relationship between an exchange and a queue is called a binding;
# it tells the exchange which queues to send messages to.
channel.queue_bind(exchange='logs', queue=queue_name)

# Callback that handles messages from the queue; here it just prints them.
def callback(ch, method, properties, body):
    print(body.decode())
    # No manual ack is needed because auto_ack=True is used below.
    # ch.basic_ack(delivery_tag=method.delivery_tag)

# prefetch_count limits unacknowledged messages per consumer;
# it has no effect while auto_ack=True.
channel.basic_qos(prefetch_count=1)
# auto_ack=True: messages are acknowledged automatically on delivery.
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
# Start consuming. This blocks and invokes callback whenever a message arrives.
channel.start_consuming()
4. Routing model: the producer specifies a routing key when publishing, and the exchange forwards the message only to the queues whose binding key matches that routing key.
# Producer code. Test with: python <script> error 404error
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Declare a direct exchange named direct_logs.
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
# Read the routing key (severity) and the message body from the command line.
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
# Publish to the direct_logs exchange with the routing key set.
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
# Consumer code. Test with: python <script> error
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Declare a direct exchange named direct_logs. It is good practice to declare the
# exchange or queue in both the producer and the consumer to make sure it exists.
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
# Read the routing keys to bind from the command line.
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
for severity in severities:
    # A binding between an exchange and a queue can carry a routing_key (the binding key).
    # A fanout exchange ignores it; a direct exchange delivers a message to the queue
    # only when the message's routing key equals the binding key exactly.
    # One consumer queue can be bound with several keys.
    channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
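The exact-match rule of a direct exchange can be sketched in a few lines of plain Python. This is a toy in-memory model for illustration only: the DirectExchange class, the queue names, and the 'started' and 'ignored' messages are hypothetical and have nothing to do with how RabbitMQ is implemented internally.

```python
from collections import defaultdict


class DirectExchange:
    """Toy in-memory stand-in for a direct exchange (not RabbitMQ's implementation)."""

    def __init__(self):
        self.bindings = defaultdict(set)   # binding key -> names of bound queues
        self.queues = defaultdict(list)    # queue name -> delivered message bodies

    def bind(self, queue, routing_key):
        self.bindings[routing_key].add(queue)

    def publish(self, routing_key, body):
        # Deliver a copy to every queue whose binding key equals the routing key.
        # In this sketch, a message with no matching binding is simply dropped.
        for queue in self.bindings.get(routing_key, ()):
            self.queues[queue].append(body)


exchange = DirectExchange()
exchange.bind('errors_only', 'error')
for key in ('info', 'warning', 'error'):
    exchange.bind('everything', key)   # one queue bound with several keys

exchange.publish('error', '404error')
exchange.publish('info', 'started')
exchange.publish('debug', 'ignored')   # no queue is bound with 'debug'

print(exchange.queues['errors_only'])  # ['404error']
print(exchange.queues['everything'])   # ['404error', 'started']
```

This mirrors the consumer above: starting it with the single argument error corresponds to the errors_only queue, and starting it with info warning error corresponds to the everything queue.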
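5. Topic (pattern matching) model: finer-grained routing in which the binding key may contain wildcards. A routing key is a list of words separated by dots, and the wildcards are:
- *: matches exactly one word
- #: matches zero or more words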
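The two wildcards can be expressed as a small matching function. This is a sketch of the matching rule only, written in plain Python; topic_matches is a hypothetical helper, not part of pika, and RabbitMQ performs this matching on the broker side.

```python
def topic_matches(pattern, routing_key):
    """Return True if a dot-separated routing key matches a topic binding pattern."""
    p = pattern.split('.')
    k = routing_key.split('.') if routing_key else []

    def match(i, j):
        if i == len(p):
            # Pattern exhausted: succeed only if the routing key is exhausted too.
            return j == len(k)
        if p[i] == '#':
            # '#' may absorb zero or more words, so try every possible split point.
            return any(match(i + 1, jj) for jj in range(j, len(k) + 1))
        if j < len(k) and (p[i] == '*' or p[i] == k[j]):
            # '*' consumes exactly one word; a literal must equal the word.
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


print(topic_matches('*.red', 'color.red'))      # True
print(topic_matches('*.red', 'red'))            # False: '*' needs exactly one word
print(topic_matches('*.red', 'dark.color.red')) # False: '*' cannot span two words
print(topic_matches('#.red', 'dark.color.red')) # True
```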
# Producer code, basically unchanged; only exchange_type changes to topic.
# Test with: python <script> <routing key> <message>
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Declare a topic exchange named topic_logs.
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
# Read the routing key and the message body from the command line.
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
# Publish to the topic_logs exchange with the routing key set.
channel.basic_publish(exchange='topic_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
# Consumer code. Test with: python <script> "*.red"
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Declare a topic exchange named topic_logs. It is good practice to declare the
# exchange or queue in both the producer and the consumer to make sure it exists.
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue
# Read the binding keys (patterns) from the command line.
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
for severity in severities:
    # A binding between an exchange and a queue can carry a routing_key (the binding key).
    # A topic exchange compares each message's routing key with the binding keys,
    # honoring the * and # wildcards, and delivers to the queues that match.
    # One consumer queue can be bound with several keys.
    channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=severity)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
6. Remote procedure call (RPC) model: the client and server are fully decoupled from each other, and each side is both a sender and a receiver of messages.
# Client (producer) code
import pika
import uuid

# Wraps connection setup, queue declaration, consumer configuration, and the
# callback in a single class.
class FibonacciRpcClient(object):
    def __init__(self):
        # Establish a connection to the RabbitMQ server.
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        # Declare a temporary callback queue.
        result = self.channel.queue_declare('', exclusive=True)
        self.callback_queue = result.method.queue
        # The client is both a producer and a consumer: it consumes replies from its
        # own temporary queue and handles them with on_response, without manual acks.
        self.channel.basic_consume(queue=self.callback_queue,
                                   on_message_callback=self.on_response,
                                   auto_ack=True)
        self.response = None
        self.corr_id = None

    # Callback: if the correlation_id in props equals our corr_id,
    # the body is the response to our request.
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        # Initialize the response and corr_id attributes.
        self.response = None
        self.corr_id = str(uuid.uuid4())
        # Send the request to rpc_queue (declared by the server) via the default exchange.
        # reply_to tells the server where to send the answer;
        # correlation_id is used to match the response to the request.
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   # The message body needs to be a string.
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

# Create an instance of the class.
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
# Call the instance's call method.
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
# Server (consumer) code, in this case generating Fibonacci numbers
import pika

# Establish a connection to the RabbitMQ server.
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Declare a queue named rpc_queue.
channel.queue_declare(queue='rpc_queue')

# Calculate the Fibonacci number for a given n.
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)

# Callback, invoked when a request arrives on the queue.
def on_request(ch, method, props, body):
    # Read n from the message body.
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)
    # With an empty exchange name, the message goes to the queue named by routing_key,
    # here the queue given in the request's reply_to property.
    # The reply carries the same correlation_id as the request so the client can match it.
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    # Finally, acknowledge the request.
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Only dispatch a new message to this consumer after it has processed and
# acknowledged the previous one.
channel.basic_qos(prefetch_count=1)
# Configure the consumer: which queue to read from and which function handles messages.
# auto_ack is left at its default (False), so on_request acknowledges manually.
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
# Start receiving and processing messages.
channel.start_consuming()
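The role of reply_to and correlation_id can be tried out without a broker. The sketch below is an in-process analogy, not RabbitMQ code: it uses Python's standard queue and threading modules in place of rpc_queue and the client's callback queue, the message dictionaries and function names are invented for illustration, and the server squares its input instead of computing Fibonacci numbers.

```python
import queue
import threading
import uuid

rpc_queue = queue.Queue()    # stands in for the server's 'rpc_queue'
reply_queue = queue.Queue()  # stands in for the client's exclusive callback queue


def server():
    # Take requests one at a time and send each result to the request's reply_to
    # queue, echoing the correlation_id so the client can match it.
    while True:
        msg = rpc_queue.get()
        if msg is None:  # sentinel used only by this sketch to stop the thread
            break
        msg['reply_to'].put({'correlation_id': msg['correlation_id'],
                             'body': msg['body'] ** 2})


def call(n):
    corr_id = str(uuid.uuid4())
    rpc_queue.put({'reply_to': reply_queue, 'correlation_id': corr_id, 'body': n})
    while True:
        reply = reply_queue.get(timeout=5)
        # Ignore replies that belong to some other (for example, earlier) request.
        if reply['correlation_id'] == corr_id:
            return reply['body']


worker = threading.Thread(target=server, daemon=True)
worker.start()

# A stale reply with an unknown correlation_id is already waiting in the queue.
reply_queue.put({'correlation_id': 'stale', 'body': -1})
answer = call(7)
print(answer)  # 49

rpc_queue.put(None)
worker.join()
```

The stale reply is skipped because its correlation_id does not match, which is the same check that on_response performs in the client above.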