SoFunction
Updated on 2024-10-29

Sample code for implementing the six RabbitMQ message models in Python

RabbitMQ vs Redis

RabbitMQ is a popular message broker. I used to use Redis as my messaging middleware, but RabbitMQ is generally the more recommended choice for production environments, so I looked into it. Compared with Redis, RabbitMQ has several advantages:

  • It has a message consumption acknowledgement mechanism.
  • Queues and messages can each be made persistent or not, which gives finer-grained control and more flexibility.
  • It supports load balancing across consumers.

RabbitMQ Application Scenarios

  • Asynchronous processing: for example, the confirmation email and SMS sent during user registration can be handled asynchronously through RabbitMQ.
  • Application decoupling: the sender and the receiver communicate only through the message queue, which also acts as a buffer between them.
  • Traffic peak shaving: typically used for flash-sale ("spike") events, where the queue lets you cap the number of users admitted and smooth out the flow of traffic.
  • Logging: info, warning, error, and other levels can be stored separately.

The RabbitMQ Message Model

The following shows how to use the Python library pika to implement the six common message models in RabbitMQ. If you don't have it installed, install it first:

pip install pika

1. Single-producer, single-consumer model: basic one-to-one message delivery.

# Producer code
import pika


credentials = pika.PlainCredentials('chuan', '123')  # MQ username and password; create the user first if it doesn't exist
# virtual_host selects the virtual host; it can be omitted when using the default '/'
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/',
    credentials=credentials))

# Open a channel on the connection
channel = connection.channel()
# Declare the queue that messages will be delivered to; it is created if it doesn't exist. durable specifies whether the queue is persistent.
channel.queue_declare(queue='python-test', durable=False)

# A message can't be sent directly to a queue; it has to go through an exchange. Here we use the default exchange, identified by the empty string
# With the default exchange, routing_key is the name of the target queue
channel.basic_publish(exchange='',
                      routing_key='python-test',
                      body='Hello world!2')
# Close the connection to the RabbitMQ server
connection.close()

# Consumer code
import pika

credentials = pika.PlainCredentials('chuan', '123')
# BlockingConnection: synchronous (blocking) mode
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/',
    credentials=credentials))
channel = connection.channel()
# Declare the queue. When you can't be sure whether the producer or the consumer starts first, declare the queue on both sides; declaring it again with the same arguments is harmless.
channel.queue_declare(queue='python-test', durable=False)
# Define a callback function to handle messages from the queue; here it just prints them
def callback(ch, method, properties, body):
  # Manually acknowledge the message, telling RabbitMQ that the consumer has received it
  ch.basic_ack(delivery_tag=method.delivery_tag)
  print(body.decode())

# Tell RabbitMQ to deliver messages from the queue to callback
# By default (auto_ack=False) messages must be acknowledged manually, which guards against message loss.
# Passing auto_ack=True instead would make RabbitMQ treat a message as acknowledged as soon as it is delivered.
channel.basic_consume('python-test',
                      on_message_callback=callback)
                      # auto_ack=True)  # automatic acknowledgement
# Start consuming; this call blocks, and callback is invoked whenever a message arrives in the queue
channel.start_consuming()
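
To make the manual acknowledgement in the consumer callback more concrete, here is a small in-memory sketch of the idea. It does not use pika or a real broker: the ToyQueue class and its method names are hypothetical, and it only mimics the general behaviour that a delivered message stays "unacked" until it is confirmed and is put back on the queue if the consumer disappears first.

```python
from collections import deque


class ToyQueue:
    """Hypothetical in-memory stand-in for a broker queue (not pika's API)."""

    def __init__(self):
        self.ready = deque()   # messages waiting for delivery
        self.unacked = {}      # delivery_tag -> message awaiting acknowledgement
        self._next_tag = 1

    def publish(self, body):
        self.ready.append(body)

    def deliver(self):
        """Hand the next ready message to a consumer and track it as unacked."""
        body = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = body
        return tag, body

    def ack(self, tag):
        """The consumer confirmed the message, so it can be forgotten."""
        del self.unacked[tag]

    def consumer_lost(self):
        """The consumer vanished: requeue everything it had not acknowledged."""
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag))


q = ToyQueue()
q.publish("Hello world!1")
q.publish("Hello world!2")

tag, body = q.deliver()
q.ack(tag)                 # first message processed and confirmed

tag, body = q.deliver()    # second message delivered but never acknowledged
q.consumer_lost()          # the connection drops before the ack

remaining = list(q.ready)  # the unacknowledged message is back in the queue
```

With auto_ack=True there would be no "unacked" stage at all in this picture: a message would be forgotten at delivery time, so a crash during processing would lose it.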

2. Message distribution (work queue) model: multiple consumers listen on the same queue.

# Producer code
import pika


credentials = pika.PlainCredentials('chuan', '123')  # MQ username and password
# virtual_host selects the virtual host; it can be omitted when using the default '/'
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/',
    credentials=credentials))

# Open a channel on the connection
channel = connection.channel()
# Declare the queue that messages will be delivered to; it is created if it doesn't exist. durable=True makes the queue itself persistent, so it survives a broker restart
channel.queue_declare(queue='rabbitmqtest', durable=True)

# A message can't be sent directly to a queue; it has to go through an exchange. Here we use the default exchange, identified by the empty string
# With the default exchange, routing_key is the name of the target queue
# The properties parameter of basic_publish sets the message properties. Here delivery_mode=2 marks the message as persistent.
for i in range(10):
  channel.basic_publish(exchange='',
                        routing_key='rabbitmqtest',
                        body='Hello world!%s' % i,
                        properties=pika.BasicProperties(delivery_mode=2))
# Close the connection to the RabbitMQ server
connection.close()

# Consumer code; run two copies of it (consume1 and consume2)
import pika
import time

credentials = pika.PlainCredentials('chuan', '123')
# BlockingConnection: synchronous (blocking) mode
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/',
    credentials=credentials))
channel = connection.channel()
# Declare the queue. When you can't be sure whether the producer or the consumer starts first, declare the queue on both sides.
channel.queue_declare(queue='rabbitmqtest', durable=True)
# Define a callback function to handle messages from the queue; here it just prints them
def callback(ch, method, properties, body):
  # Simulate a slow task
  time.sleep(10)
  print(body.decode())
  # Manually acknowledge the message, telling RabbitMQ that the consumer has finished with it
  ch.basic_ack(delivery_tag=method.delivery_tag)

# Once the number of unacknowledged messages on this consumer's channel reaches prefetch_count, RabbitMQ stops sending it new messages
channel.basic_qos(prefetch_count=1)
# Tell RabbitMQ to deliver messages from the queue to callback
# By default (auto_ack=False) messages must be acknowledged manually, which guards against message loss.
# Passing auto_ack=True instead would make RabbitMQ treat a message as acknowledged as soon as it is delivered.
channel.basic_consume('rabbitmqtest',
                      on_message_callback=callback)
                      # auto_ack=True)  # automatic acknowledgement
# Start consuming; this call blocks, and callback is invoked whenever a message arrives in the queue
channel.start_consuming()
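
The effect of basic_qos(prefetch_count=1) is easier to see in a toy simulation than on a live broker. The sketch below is pure Python with no pika involved; the dispatch function, the worker speeds, and the task costs are all made up for illustration. It compares handing out messages strictly in turn with handing the next message only to a worker that is idle, which is roughly what a prefetch count of 1 combined with manual acknowledgement achieves.

```python
import heapq


def dispatch(task_costs, worker_speeds, fair):
    """Simulate handing tasks to workers and return the task count per worker.

    fair=False: plain round-robin, tasks are handed out in turn.
    fair=True : a worker gets the next task only once it is idle,
                roughly what prefetch_count=1 with manual acks achieves.
    """
    counts = [0] * len(worker_speeds)
    if not fair:
        for i, _ in enumerate(task_costs):
            counts[i % len(worker_speeds)] += 1
        return counts

    # Heap of (time_when_idle, worker_index); the earliest idle worker wins.
    idle_at = [(0.0, w) for w in range(len(worker_speeds))]
    heapq.heapify(idle_at)
    for cost in task_costs:
        free_time, w = heapq.heappop(idle_at)
        counts[w] += 1
        heapq.heappush(idle_at, (free_time + cost / worker_speeds[w], w))
    return counts


tasks = [1] * 10          # ten equally sized messages
speeds = [1.0, 4.0]       # hypothetical: the second worker is four times faster

round_robin_counts = dispatch(tasks, speeds, fair=False)  # [5, 5]
fair_counts = dispatch(tasks, speeds, fair=True)          # [2, 8]
```

In the round-robin case the slow worker ends up holding half of the messages regardless of how long it takes; in the fair case the faster worker naturally picks up most of the load.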

3. Publish/subscribe model: the producer sends messages to an exchange, which forwards them to every queue bound to it; each consumer then fetches messages from its own queue.

# Producer code
import pika


credentials = pika.PlainCredentials('chuan', '123')  # MQ username and password
# virtual_host selects the virtual host; it can be omitted when using the default '/'
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/',
    credentials=credentials))
# Open a channel on the connection
channel = connection.channel()
# fanout: every queue bound to this exchange receives the message (broadcast)
# direct: only queues whose binding key exactly matches the message's routing key receive it (selective delivery)
# topic: queues whose binding key, which may contain wildcards, matches the routing key receive the message (finer-grained filtering)
channel.exchange_declare('logs', exchange_type='fanout')


# Because this is a fanout (broadcast) exchange, there is no need to specify a routing_key here.
for i in range(10):
  channel.basic_publish(exchange='logs',
             routing_key='',
             body='Hello world!%s' % i)

# Close the connection to the RabbitMQ server
connection.close()

# Consumer code
import pika

credentials = pika.PlainCredentials('chuan', '123')
# BlockingConnection: synchronous (blocking) mode
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost',
    port=5672,
    virtual_host='/',
    credentials=credentials))
channel = connection.channel()

# As a good practice, declare it once in the producer and once in the consumer to ensure that the exchange you want to use exists.
channel.exchange_declare(exchange='logs',
             exchange_type='fanout')

# Let RabbitMQ create a new, randomly named empty queue. With exclusive=True the queue can only be
# used by this connection and is deleted once the consumer disconnects from RabbitMQ.
result = channel.queue_declare('', exclusive=True)

# Get the name of the temporary queue
queue_name = result.method.queue

# The relationship between an exchange and a queue is called a binding
# A binding tells the exchange which queues to send messages to
channel.queue_bind(exchange='logs',
                   queue=queue_name)

# Define a callback function to handle messages from the queue; here it just prints them
def callback(ch, method, properties, body):
  print(body.decode())
  # No manual acknowledgement is needed here because auto_ack=True is set below
  #ch.basic_ack(delivery_tag=method.delivery_tag)

# Once the number of unacknowledged messages on this consumer's channel reaches prefetch_count, RabbitMQ stops sending it new messages
channel.basic_qos(prefetch_count=1)
# Tell RabbitMQ to deliver messages from the queue to callback
# Manual acknowledgement is the default and guards against message loss.
# Here auto_ack=True is specified explicitly, so messages count as acknowledged as soon as they are delivered.
channel.basic_consume(queue=queue_name,
                      on_message_callback=callback,
                      auto_ack=True)  # automatic acknowledgement
# Start consuming; this call blocks, and callback is invoked whenever a message arrives in the queue
channel.start_consuming()
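
As a rough mental model of what the fanout exchange does with the bindings declared above, the following pure-Python sketch copies every published message into every bound queue. The ToyFanoutExchange class and the queue names are hypothetical; this is an illustration of the broadcast behaviour, not how RabbitMQ or pika is implemented.

```python
class ToyFanoutExchange:
    """Hypothetical in-memory model of a fanout exchange (not pika's API)."""

    def __init__(self):
        self.queues = {}  # queue name -> list of delivered message bodies

    def bind(self, queue_name):
        """Bind a queue to the exchange, creating it if needed."""
        self.queues.setdefault(queue_name, [])

    def publish(self, body, routing_key=""):
        # A fanout exchange ignores the routing key and copies the
        # message into every bound queue.
        for messages in self.queues.values():
            messages.append(body)


logs = ToyFanoutExchange()
logs.bind("consumer-1-queue")   # stands in for one consumer's temporary queue
logs.bind("consumer-2-queue")   # stands in for another consumer's temporary queue

for i in range(3):
    logs.publish("Hello world!%s" % i, routing_key="ignored")

first = logs.queues["consumer-1-queue"]
second = logs.queues["consumer-2-queue"]
```

Both queues end up with the same three messages, which is why each subscriber needs its own queue in this model: consumers sharing one queue would split the messages between them instead.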

4. Routing model: the producer specifies a routing key when publishing; when the exchange receives the message, it forwards it to the queues whose binding key matches that routing key.

# Producer code; for testing, run: python <producer script> error 404error
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare an exchange named direct_logs of type direct
channel.exchange_declare(exchange='direct_logs',
             exchange_type='direct')

# Read the routing key (severity) and the message for basic_publish from the command line
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# Send the message to the exchange named direct_logs with routing_key set
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

print(" [x] Sent %r:%r" % (severity, message))
connection.close()

# Consumer code; for testing, run: python <consumer script> error
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare an exchange named direct_logs of type direct
# It's good practice to declare the exchange or queue in both the producer and the consumer to make sure it exists
channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

# Get the routing keys to bind from the command line
severities = sys.argv[1:]
if not severities:
  print("Usage: %s [info] [warning] [error]" % (sys.argv[0],), file=sys.stderr)
  sys.exit(1)

for severity in severities:
  # The binding between an exchange and a queue can take a routing_key parameter (the binding key).
  # A fanout exchange ignores this parameter; a direct exchange routes on an exact match of this key
  # A consumer can bind multiple routing keys
  # The exchange compares each message's routing key with all of its binding keys
  # and delivers the message to every queue whose binding key matches.
  channel.queue_bind(exchange='direct_logs',
            queue=queue_name,
            routing_key=severity)

def callback(ch, method, properties, body):
  print(" [x] %r:%r" % (method.routing_key, body,))


channel.basic_consume(queue=queue_name,
           on_message_callback=callback,
           auto_ack=True)

channel.start_consuming()
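
The exact-match routing described in the comments above can be sketched in a few lines of plain Python. The ToyDirectExchange class and the queue names are hypothetical and exist only to illustrate the rule: a message reaches a queue only if the queue was bound with a key equal to the message's routing key, and a message with no matching binding is simply dropped.

```python
from collections import defaultdict


class ToyDirectExchange:
    """Hypothetical in-memory model of a direct exchange (not pika's API)."""

    def __init__(self):
        self.bindings = defaultdict(set)  # binding key -> names of bound queues
        self.queues = defaultdict(list)   # queue name -> delivered message bodies

    def bind(self, queue_name, routing_key):
        """A queue may be bound several times with different keys."""
        self.bindings[routing_key].add(queue_name)

    def publish(self, routing_key, body):
        # Deliver only to queues bound with exactly this routing key.
        for queue_name in self.bindings.get(routing_key, ()):
            self.queues[queue_name].append(body)


direct_logs = ToyDirectExchange()
direct_logs.bind("errors-only", "error")
for severity in ("info", "warning", "error"):
    direct_logs.bind("everything", severity)

direct_logs.publish("error", "404error")
direct_logs.publish("info", "service started")
direct_logs.publish("debug", "nobody is bound to this key")  # dropped

errors_only = direct_logs.queues["errors-only"]
everything = direct_logs.queues["everything"]
```

Here the "errors-only" queue plays the role of a consumer started with the single argument error, while "everything" corresponds to a consumer started with info warning error.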

5. Topic (pattern-matching) model: finer-grained grouping that allows wildcards in the routing key used when binding a queue. Words in a key are separated by dots.

  • *: matches exactly one word
  • #: Match 0 or more words
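
The two wildcard rules above can be written down as a small matching function. This is a minimal sketch of the matching semantics only, assuming dot-separated words; the function name topic_matches is made up here, and RabbitMQ performs this matching inside the broker with its own implementation.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches the binding_key pattern.

    '*' stands for exactly one word, '#' for zero or more words.
    """
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p, w):
        if p == len(pattern):
            # Pattern exhausted: it matches only if all words were consumed.
            return w == len(words)
        if pattern[p] == "#":
            # '#' either matches nothing, or swallows one word and stays in place.
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w < len(words) and pattern[p] in ("*", words[w]):
            # '*' or a literal word consumes exactly one word.
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


one_word = topic_matches("*.red", "apple.red")           # True
too_short = topic_matches("*.red", "red")                # False: '*' needs a word
too_long = topic_matches("*.red", "big.apple.red")       # False: '*' is one word only
zero_words = topic_matches("logs.#", "logs")             # True: '#' may match nothing
many_words = topic_matches("logs.#", "logs.kernel.err")  # True
```

A binding key of just # therefore behaves like a fanout binding, and a binding key without any wildcard behaves like a direct binding.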

# Producer code; basically unchanged, just change exchange_type to topic
# Example message for testing: red color is my favorite
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare an exchange named topic_logs of type topic
channel.exchange_declare(exchange='topic_logs',
             exchange_type='topic')

# Read the routing key and the message for basic_publish from the command line
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

# Send the message to the exchange named topic_logs with routing_key set
channel.basic_publish(exchange='topic_logs',
                      routing_key=severity,
                      body=message)

print(" [x] Sent %r:%r" % (severity, message))
connection.close()

# Consumer code (test: python <consumer script> "*.red")
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare an exchange named topic_logs of type topic
# It's good practice to declare the exchange or queue in both the producer and the consumer to make sure it exists
channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

# Get the binding keys (which may contain wildcards) from the command line
severities = sys.argv[1:]
if not severities:
  print("Usage: %s [binding_key]..." % (sys.argv[0],), file=sys.stderr)
  sys.exit(1)

for severity in severities:
  # The binding between an exchange and a queue can take a routing_key parameter (the binding key).
  # A fanout exchange ignores this parameter; a topic exchange treats it as a pattern in which * and # are wildcards
  # A consumer can bind multiple routing keys
  # The exchange compares each message's routing key with all of its binding keys
  # and delivers the message to every queue whose binding key matches.
  channel.queue_bind(exchange='topic_logs',
            queue=queue_name,
            routing_key=severity)

def callback(ch, method, properties, body):
  print(" [x] %r:%r" % (method.routing_key, body,))


channel.basic_consume(queue=queue_name,
           on_message_callback=callback,
           auto_ack=True)

channel.start_consuming()

6. Remote procedure call (RPC): the client and the server are completely decoupled from each other, and each end is both a sender and a receiver of messages.

# Client code (sends the request and waits for the reply)
import pika
import uuid


# Encapsulates connection creation, queue declaration, consumer configuration, callback functions, etc. in a single class
class FibonacciRpcClient(object):
  def __init__(self):
    # Establish a connection to the RabbitMQ server
    self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

    self.channel = self.connection.channel()

    # Declare a temporary callback queue
    result = self.channel.queue_declare('', exclusive=True)
    self._queue = result.method.queue

    # The client is both a producer and a consumer here, so configure consumption as well:
    # receive messages from the temporary queue created by the client itself,
    # process them with the on_response function,
    # and acknowledge them automatically (auto_ack=True)
    self.channel.basic_consume(queue=self._queue,
                               on_message_callback=self.on_response,
                               auto_ack=True)
    self.response = None
    self.corr_id = None

  # Define the callback function
  # Compare the instance's corr_id attribute with the correlation_id in props
  # If they are equal, store the received message in the response attribute
  def on_response(self, ch, method, props, body):
    if self.corr_id == props.correlation_id:
      self.response = body

  def call(self, n):
    # Initialize the response and corr_id attributes
    self.response = None
    self.corr_id = str(uuid.uuid4())

    # Use the default exchange to send the message to the rpc_queue declared by the server
    # The reply_to property tells the server which queue to send the result to
    # The correlation_id property is used to match a response to its request
    self.channel.basic_publish(exchange='',
                               routing_key='rpc_queue',
                               properties=pika.BasicProperties(
                                 reply_to=self._queue,
                                 correlation_id=self.corr_id,
                               ),
                               # the message body is sent as a string
                               body=str(n))

    # Process network events until on_response has stored the reply
    while self.response is None:
      self.connection.process_data_events()

    return int(self.response)


# Create an instance of the class
fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
# Call the instance's call method
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

# Server code: computes Fibonacci numbers for incoming requests
import pika

# Establish a connection to the RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare a queue named rpc_queue
channel.queue_declare(queue='rpc_queue')

# Calculate the Fibonacci number for a given number
def fib(n):
  if n == 0:
    return 0
  elif n == 1:
    return 1
  else:
    return fib(n - 1) + fib(n - 2)

# Callback function, called when a message is received from the queue.
def on_request(ch, method, props, body):
  # Get the number to calculate the Fibonacci number from message
  n = int(body)
  print(" [.] fib(%s)" % n)
  # Call the fib function to get the result of a calculation
  response = fib(n)

  # With an empty-string exchange, the message goes to the queue named by routing_key
  # Here that is the queue given by reply_to in the callback's props parameter
  # The message body is the computed Fibonacci number
  # correlation_id in the properties is copied from props.correlation_id
  # Finally, the request message is acknowledged
  ch.basic_publish(exchange='',
                   routing_key=props.reply_to,
                   properties=pika.BasicProperties(correlation_id=props.correlation_id),
                   body=str(response))
  ch.basic_ack(delivery_tag=method.delivery_tag)


# The queue only dispatches a new message to the consumer if it has already processed and acknowledged the previous message.
channel.basic_qos(prefetch_count=1)

# Set the consumer parameter, i.e., which queue to get the message from and which function to use to process it, and whether or not to acknowledge the message
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")

# Start receiving and processing messages
channel.start_consuming()
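
The role of correlation_id is perhaps clearest when several requests are in flight at once and the replies come back in a different order. The sketch below shows only that bookkeeping, using plain dictionaries instead of real messages: there is no broker or pika involved, the message layout is made up, and fib is written iteratively here for brevity rather than recursively as in the server code.

```python
import uuid


def fib(n):
    """Iterative Fibonacci, equivalent in result to the recursive version."""
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


# Client side: remember which correlation_id belongs to which request.
pending = {}
requests = []
for n in (5, 10, 20):
    corr_id = str(uuid.uuid4())
    pending[corr_id] = n
    requests.append({"correlation_id": corr_id, "body": str(n)})

# Server side: answer each request, echoing back its correlation_id.
replies = [
    {"correlation_id": req["correlation_id"], "body": str(fib(int(req["body"])))}
    for req in requests
]
replies.reverse()  # pretend the replies arrive in a different order
replies.append({"correlation_id": "stale-id", "body": "999"})  # unrelated reply

# Client side again: accept only replies whose correlation_id is known.
results = {}
for reply in replies:
    n = pending.pop(reply["correlation_id"], None)
    if n is not None:
        results[n] = int(reply["body"])
```

The reply with the unknown correlation_id is ignored, which mirrors the check in on_response: a response is accepted only when its correlation_id matches the one the client is waiting for.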
