need
In modern distributed systems, real-time acquisition of database change information is a common requirement. For example, in an e-commerce system, when an order table is updated, it may be necessary to synchronize these changes to a search service, a cache service, or notify other microservices. Traditional solutions include methods such as timed polling the database or writing changes to a message queue through triggers, but these solutions are either inefficient or complex. Using Canal + RabbitMQ provides an efficient and reliable way to capture changes in MySQL databases and send them to RabbitMQ for other services to consume.
Canal is an open source tool for incremental subscription and consumption of MySQL database Binlog, which simulates the MySQL master-slave replication mechanism and captures database changes without intrusion into business logic. RabbitMQ is a popular open source messaging broker that supports multiple protocols and provides rich features to ensure the reliability of messaging. Combining these two, a powerful real-time data change listening and processing system can be built.
step
Environment construction
Integrate SpringBoot and Canal to implement clients
Canal integrates RabbitMQ
SpringBoot integrates RabbitMQ
Environment construction
1. Install MySQL
Make sure you have a running instance of MySQL and enable binlog logging. This is the basis for Canal to capture database changes.
# Modify MySQL configuration file or[mysqld] server-id=1 log-bin=mysql-bin binlog-format=ROW
Restart the MySQL service to enable the configuration.
2. Install Canal Server
Download the latest version of Canal Server and unzip it to the right place. Make necessary configurations according to the official documentation, especially the database connection information in the file.
3. Install RabbitMQ
RabbitMQ can be quickly installed through Docker:
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
Visit http://localhost:15672 to log in to the management interface, with the default username/password being guest/guest.
Integrate SpringBoot and Canal to implement clients
Create SpringBoot Project
Use Spring Initializr to create a new Spring Boot project and add Web, JPA, and AMQP (for subsequent integration of RabbitMQ) dependencies.
Introducing Canal dependencies
Add the dependencies of the Canal Client in :
<dependency> <groupId></groupId> <artifactId></artifactId> <version>1.1.5</version> </dependency>
Writing Canal client code
Create a Canal client class that listens for changes in the MySQL database and forwards the change event to RabbitMQ.
import ; import ; import ; public class CanalClient { private final CanalConnector connector; private final Channel channel; public CanalClient(CanalConnector connector, Channel channel) { = connector; = channel; } public void start() { // Canal connection configuration (); (".*\\..*"); // Subscribe to all databases and tables (); while (true) { int batchSize = 1000; EntryBatch batch = (batchSize); // Get a batch of data long batchId = (); int size = ().size(); if (batchId == -1 || size == 0) { try { (1000); } catch (InterruptedException e) { (); } } else { printEntry(()); (batchId); // Submit confirmation } if (().isInterrupted()) { break; } } (); } private void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (() == || () == ) { continue; } RowChange rowChage = null; try { rowChage = (()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + (), e); } EventType eventType = (); (("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s", ().getLogfileName(), ().getLogfileOffset(), ().getSchemaName(), ().getTableName(), eventType)); for (RowData rowData : ()) { if (eventType == ) { sendToRabbitMQ(()); } else if (eventType == ) { sendToRabbitMQ(()); } else { ("-------> before"); sendToRabbitMQ(()); ("-------> after"); sendToRabbitMQ(()); } } } } private void sendToRabbitMQ(List<Column> columns) { StringBuilder message = new StringBuilder(); for (Column column : columns) { (()).append("=").append(()).append(","); } try { ("", "canal_exchange", null, ().getBytes()); } catch (IOException e) { (); } } }
Canal integrates RabbitMQ
Configuring Canal Server
Make sure Canal Server is correctly configured and started to listen to MySQL's binlog logs. Modify the Canal Server configuration file to point to your MySQL instance and set appropriate filtering rules.
Configuring RabbitMQ Exchange
Create an exchange named canal_exchange in RabbitMQ, the type can be selected according to your needs, such as fanout, direct, topic or headers.
rabbitmqadmin declare exchange name=canal_exchange type=fanout
SpringBoot integrates RabbitMQ
Add dependencies
Make sure that the Spring AMQP dependency of RabbitMQ is already included in .
<dependency> <groupId></groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
Configure RabbitMQ connection information
Configure the connection parameters of RabbitMQ in or .
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
Create a consumer
Write a consumer class to receive messages from RabbitMQ.
import ; import ; @Component public class CanalMessageConsumer { @RabbitListener(queues = "canal_queue") public void receive(String message) { ("Received message: " + message); } }
Configure queues and bindings
Make sure that the required queues are automatically created when the application starts and bind them to the exchange you created earlier.
import ; import ; import ; import ; import ; @Configuration public class RabbitConfig { @Bean public Queue canalQueue() { return new Queue("canal_queue", false); } @Bean public TopicExchange canalExchange() { return new TopicExchange("canal_exchange"); } @Bean public Binding binding(Queue canalQueue, TopicExchange canalExchange) { return (canalQueue).to(canalExchange).with("#"); } }
Summarize
Through the above steps, we successfully integrated Canal and RabbitMQ into our Spring Boot application. This allows us to listen for changes to the MySQL database in real time and publish these changes as messages to RabbitMQ for other microservices to consume. This method not only improves the system's response speed, but also simplifies the process of data synchronization and reduces development and maintenance costs.
This is the end of this article about the detailed explanation of SpringBoot's integrated Canal+RabbitMQ monitoring data changes. For more related SpringBoot monitoring data changes, please search for my previous articles or continue browsing the related articles below. I hope everyone will support me in the future!