Python MySQL recovers data through Binlog
Getting the database change records through MySQL's binary log (Binlog) and using it to restore data is a relatively advanced operation.
This usually involves reading events in Binlog, parsing those events to learn more about data changes, and then restoring or rolling back the data based on this information.
In Python, you can use the pymysqlreplication library to read Binlog, but please note that this library itself does not provide direct data recovery capabilities. It can only help you parse events in Binlog. Recovering data requires you to write additional logic based on these events.
Here is an example of using the pymysqlreplication library to get MySQL operation records through Binlog:
1. Install pymysqlreplication
First, you need to install this library. You can use pip to install:
pip install pymysqlreplication
2. Configure MySQL
Make sure your MySQL server has Binlog enabled and you have a MySQL user with sufficient permissions to read Binlog.
3. Write Python scripts
Below is a simple Python script that uses to read Binlog events and print out information about insert, update, and delete operations.
import json import sys from datetime import datetime from pymysqlreplication import BinLogStreamReader from pymysqlreplication.row_event import ( DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent, ) import pandas as pd MYSQL_SETTINGS = { 'host': '127.0.0.1', 'port': 3306, 'user': 'root', 'password': '123456' } # Database and tables to be monitored ssc_sjzz2database_name = 'ssc_wfg' table_name = 't_sys_user' def default(o): if isinstance(o, datetime): return () raise TypeError("Unserializable object {}".format(o)) def main(): stream = BinLogStreamReader( connection_settings=MYSQL_SETTINGS, server_id=6, # Must be different from other replication clients on MySQL server only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent], only_tables=[table_name], only_schemas=[database_name] ) df = () for binlogevent in stream: if == table_name and == database_name: time = binlogevent.formatted_timestamp.replace('T', ' ') timestamp= for row in : event = { "schema": , "table": , "time": time, "timestamp": timestamp, } if isinstance(binlogevent, DeleteRowsEvent): event["action"] = "delete" event["value"] = (list(row["values"].items()), default=default) # event = dict(() + row["values"].items()) elif isinstance(binlogevent, UpdateRowsEvent): event["action"] = "update" event["value"] = (list(row["after_values"].items()), default=default) # event = dict(() + row["after_values"].items()) elif isinstance(binlogevent, WriteRowsEvent): event["action"] = "insert" event["value"] = (list(row["values"].items()), default=default) print((event, default=default)) df = ([df, (event, index=[0])], ignore_index=True) () df.to_excel('', index=False) if __name__ == "__main__": main()
In this script:
-
MYSQL_SETTINGS
:Contains the settings required to connect to the MySQL server. -
BinLogStreamReader
: Contains the settings required to read Binlog, including server_id (must be a unique identifier to distinguish different replication clients) and only_events (specify the event type we are interested in). -
stream
:The function prints out the corresponding SQL statement based on the type of event (delete, update, or insert). -
main
: The function sets up the Binlog stream reader and elegantly closes the stream when any exception is caught. -
pandas
: Output the results to an excel table for analysis and processing of data.
4. Run the script
Run this Python script, which will connect to your MySQL server and start reading events in Binlog.
Whenever a new event occurs (such as insert, update, or delete operations), it prints out the corresponding SQL statement.
Summarize
The above is personal experience. I hope you can give you a reference and I hope you can support me more.