Python exports all table data in a MySQL database as CSV files into one directory, compresses each file into a ZIP archive in a second directory, and then decompresses all ZIP files from that directory into a third directory. Since the Pandas library is not used, large SQL result sets must be exported in batches. Performance is improved through multi-threading and asynchronous operations. The program needs exception handling and logging: when an error occurs it should output the error information, and it should also record the running status of the export, the number of rows in each table, the timestamp and duration of each query and export, and a log entry for each file produced.
The script is designed with large data volumes, exception handling, and performance optimization in mind, and can handle most common scenarios. The batch size (batch_size) and thread count (max_workers) can be adjusted to suit your specific needs for the best performance.
import os
import csv
import glob
import time
import zipfile
import logging
import concurrent.futures

import mysql.connector

# Logging configuration: write to a file and to the terminal at the same time
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('data_export.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


def export_table_to_csv(table_name, csv_path, db_config, batch_size=1000):
    """Export data from a single table to a CSV file, processed in batches."""
    conn = None
    cursor = None
    total_rows = 0
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()

        # Fetch the data and write it to the CSV file
        with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)

            # Execute the query and write the column names as the header row
            cursor.execute(f"SELECT * FROM `{table_name}`")
            columns = [col[0] for col in cursor.description]
            writer.writerow(columns)

            # Fetch the result set in batches to keep memory usage low
            while True:
                rows = cursor.fetchmany(batch_size)
                if not rows:
                    break
                writer.writerows(rows)
                total_rows += len(rows)
                logger.info(f"{table_name}: exported {total_rows} rows so far")

        logger.info(f"{table_name}: CSV export complete, total rows: {total_rows}")
        return total_rows

    except Exception as e:
        logger.error(f"Failed to export table {table_name}: {str(e)}", exc_info=True)
        raise
    finally:
        if cursor:
            cursor.close()
        if conn and conn.is_connected():
            conn.close()


def compress_to_zip(source_path, zip_path):
    """Compress a file into ZIP format."""
    try:
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.write(source_path, arcname=os.path.basename(source_path))
        logger.info(f"Successfully compressed {source_path} to {zip_path}")
    except Exception as e:
        logger.error(f"Failed to compress {source_path}: {str(e)}", exc_info=True)
        raise


def process_table(table_name, db_config, csv_dir, zip_dir):
    """Handle the export and compression of a single table."""
    start_time = time.time()
    logger.info(f"Start processing table: {table_name}")
    status = "success"
    rows_exported = 0
    csv_path = None
    zip_path = None
    try:
        # Build the output file paths
        csv_filename = f"{table_name}.csv"
        zip_filename = f"{table_name}.zip"
        csv_path = os.path.join(csv_dir, csv_filename)
        zip_path = os.path.join(zip_dir, zip_filename)

        # Export to CSV
        rows_exported = export_table_to_csv(table_name, csv_path, db_config)

        # Compress the CSV file
        compress_to_zip(csv_path, zip_path)

    except Exception as e:
        status = f"failed: {str(e)}"
        # Clean up any intermediate files that may have been created
        for path in [csv_path, zip_path]:
            if path and os.path.exists(path):
                try:
                    os.remove(path)
                    logger.info(f"Cleaned up file: {path}")
                except Exception as clean_error:
                    logger.warning(f"Failed to clean up file: {clean_error}")
    finally:
        duration = time.time() - start_time
        log_message = (
            f"Table processing finished - table: {table_name}, "
            f"status: {status}, "
            f"rows exported: {rows_exported}, "
            f"time taken: {duration:.2f} seconds"
        )
        logger.info(log_message)


def unzip_files(zip_dir, unzip_dir):
    """Decompress all ZIP files in the specified directory."""
    zip_files = glob.glob(os.path.join(zip_dir, '*.zip'))
    if not zip_files:
        logger.info("No ZIP files found, skipping decompression")
        return

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for zip_path in zip_files:
            futures.append(executor.submit(extract_zip, zip_path, unzip_dir))
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logger.error(f"An error occurred during decompression: {str(e)}")


def extract_zip(zip_path, unzip_dir):
    """Decompress a single ZIP file."""
    try:
        start_time = time.time()
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(unzip_dir)
        duration = time.time() - start_time
        logger.info(f"Decompression complete: {zip_path} => {unzip_dir} (time taken: {duration:.2f} seconds)")
    except Exception as e:
        logger.error(f"Failed to decompress {zip_path}: {str(e)}", exc_info=True)
        raise


def main():
    # Database connection parameters
    db_config = {
        'host': 'localhost',
        'user': 'your_username',
        'password': 'your_password',
        'database': 'your_database'
    }

    # Directory configuration
    base_dir = os.path.dirname(os.path.abspath(__file__))
    csv_dir = os.path.join(base_dir, 'csv_exports')
    zip_dir = os.path.join(base_dir, 'zip_archives')
    unzip_dir = os.path.join(base_dir, 'unzipped_files')

    # Create the output directories
    for dir_path in [csv_dir, zip_dir, unzip_dir]:
        os.makedirs(dir_path, exist_ok=True)
        logger.info(f"Directory is ready: {dir_path}")

    # Get all table names
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()
        cursor.execute("SHOW TABLES")
        tables = [table[0] for table in cursor.fetchall()]
        logger.info(f"Found {len(tables)} tables to process")
    except Exception as e:
        logger.error(f"Failed to get database tables: {str(e)}", exc_info=True)
        return
    finally:
        if 'cursor' in locals():
            cursor.close()
        if 'conn' in locals() and conn.is_connected():
            conn.close()

    # Process all tables (multi-threaded export and compression)
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = []
        for table in tables:
            futures.append(executor.submit(
                process_table, table, db_config, csv_dir, zip_dir
            ))

        # Collect task results
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logger.error(f"Table processing exception: {str(e)}")

    # Unzip all ZIP files (multi-threaded decompression)
    logger.info("Start decompressing all ZIP files")
    unzip_files(zip_dir, unzip_dir)
    logger.info("All processing is complete")


if __name__ == "__main__":
    main()
Key Feature Description:
1. Process big data in batches:
- Use fetchmany(batch_size) to get data in batches (default 1000 rows per batch)
- Streaming reduces memory usage
2. Multithreaded processing:
- Use ThreadPoolExecutor to handle export and compression of different tables in parallel
- Independent database connections (each worker thread opens its own connection)
- Parallel decompression processing
3. Exception handling:
- Comprehensive try-except blocks cover all key operations
- Intermediate files are cleaned up automatically when a table fails
- Detailed error logging (including stack trace)
4. Logging:
- Output goes to both the log file and the terminal
- Record key information such as timestamps, operation type, status, time consumption, etc.
- Includes processing-result statistics for each table
5. File management:
- Automatically creates the required directories
- High-efficiency compression using ZIP_DEFLATED
- Secure file path processing
6. Performance optimization:
- Uses an unbuffered (streaming) cursor together with fetchmany() so the full result set is never loaded into memory at once (see the sketch after this list)
- Configurable batch size and number of threads
- I/O-bound work (export, compression, decompression) runs concurrently in the thread pool
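For reference, a minimal sketch of the batched streaming pattern the script relies on, assuming mysql-connector-python; the table name my_table is a placeholder. With an unbuffered cursor, fetchmany() pulls rows from the server on demand instead of buffering the whole result set on the client:

import mysql.connector

# Minimal sketch: stream a large result set in batches (assumes mysql-connector-python)
conn = mysql.connector.connect(host='localhost', user='your_username',
                               password='your_password', database='your_database')
cursor = conn.cursor(buffered=False)  # unbuffered: rows are fetched from the server on demand
cursor.execute("SELECT * FROM `my_table`")  # 'my_table' is a placeholder table name
while True:
    rows = cursor.fetchmany(1000)  # same default batch size as the script
    if not rows:
        break
    for row in rows:
        pass  # process each row here, e.g. write it to a CSV file
cursor.close()
conn.close()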
Instructions for use:
Install dependencies:
pip install mysql-connector-python
Modify the configuration:
Update database connection information in db_config
Adjust directory paths as needed (csv_dir, zip_dir, unzip_dir)
Run the script:
python your_script.py (use the filename you saved the script as)
View the log:
Real-time terminal output
Detailed log file data_export.log
Extension suggestions:
Accept database configuration and directory paths through command-line parameters (see the argparse sketch after this list)
Add email notification function (notify when processing is completed or failed)
Implement resumable exports (continue from where a failed run stopped)
Add file checksum verification, e.g. an MD5 checksum (see the hashlib sketch after this list)
Support configuration files (YAML/JSON format)
Add progress bar display
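As an illustration of the command-line suggestion, here is a minimal sketch using argparse; the option names and defaults below are assumptions, not part of the original script:

import argparse

# Hypothetical command-line interface for the export script (argument names are assumptions)
parser = argparse.ArgumentParser(description="Export MySQL tables to CSV and compress them")
parser.add_argument("--host", default="localhost", help="MySQL host")
parser.add_argument("--user", required=True, help="MySQL user")
parser.add_argument("--password", required=True, help="MySQL password")
parser.add_argument("--database", required=True, help="Database to export")
parser.add_argument("--csv-dir", default="csv_exports", help="Directory for CSV files")
parser.add_argument("--zip-dir", default="zip_archives", help="Directory for ZIP archives")
parser.add_argument("--unzip-dir", default="unzipped_files", help="Directory for extracted files")
parser.add_argument("--batch-size", type=int, default=1000, help="Rows fetched per batch")
parser.add_argument("--max-workers", type=int, default=4, help="Thread pool size")
args = parser.parse_args()

db_config = {
    'host': args.host,
    'user': args.user,
    'password': args.password,
    'database': args.database,
}

The resulting db_config, directory paths, batch size, and worker count would then be passed into main() instead of the hard-coded values.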
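For the checksum suggestion, a minimal sketch with hashlib; the helper name file_md5 is made up for illustration:

import hashlib

def file_md5(path, chunk_size=8 * 1024 * 1024):
    """Compute the MD5 checksum of a file, reading it in chunks to limit memory use."""
    md5 = hashlib.md5()
    with open(path, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            md5.update(chunk)
    return md5.hexdigest()

# Example: log a checksum right after compressing a table, e.g. inside process_table()
# logger.info(f"{zip_path} MD5: {file_md5(zip_path)}")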
This concludes the article about using Python to export all table data in a MySQL database to CSV files and compress them. For more related content on exporting MySQL data to CSV with Python, please search my previous articles or continue browsing the related articles below. I hope you will continue to support me!