SoFunction
Updated on 2025-04-13

Exporting data from all MySQL tables as CSV files and compressing them with Python

The task: with Python, export the data of every table in a MySQL database as CSV files into one directory, compress each CSV into a zip file in a second directory, and then decompress all zip files from that directory into a third directory. Because the Pandas library is not used, large SQL result sets must be exported in batches. Performance is improved through multi-threading and concurrent execution. The program also needs thorough exception handling and logging: it should report error details when something fails, the running status of the export, the number of rows in each table, a timestamp and duration for each query, and a log record for every file produced.
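
For comparison, if the Pandas library were acceptable, the batching could be delegated to it. The sketch below is only an illustration under that assumption: it streams one table to CSV with pandas.read_sql and chunksize, requires SQLAlchemy to be installed, and uses placeholder credentials and table names.

# Hypothetical pandas-based alternative: stream one table to CSV in chunks.
# Assumes pandas and SQLAlchemy are installed; credentials are placeholders.
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("mysql+mysqlconnector://user:password@localhost/your_database")

first_chunk = True
for chunk in pd.read_sql("SELECT * FROM `your_table`", engine, chunksize=1000):
    # Write the header only for the first chunk, then append the rest
    chunk.to_csv("your_table.csv", mode="w" if first_chunk else "a",
                 header=first_chunk, index=False)
    first_chunk = False

The script itself avoids this dependency and batches the result set manually with fetchmany().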

The script below is designed with large data volumes, exception handling, and performance optimization in mind, and can handle most common scenarios. The batch size (batch_size) and thread count (max_workers) can be adjusted to suit specific needs for the best performance.

import os
import csv
import zipfile
import logging
import mysql.connector
from datetime import datetime
import time
import concurrent.futures
import glob

# Configure logging: write to both a file and the console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('data_export.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

def export_table_to_csv(table_name, csv_path, db_config, batch_size=1000):
    """Export data from a single table to a CSV file, in batches"""
    conn = None
    cursor = None
    total_rows = 0
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()

        # Open the CSV file and stream the table into it
        with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)

            # Execute the query and write the column names as the header row
            cursor.execute(f"SELECT * FROM `{table_name}`")
            columns = [col[0] for col in cursor.description]
            writer.writerow(columns)

            # Fetch and write the data in batches
            while True:
                rows = cursor.fetchmany(batch_size)
                if not rows:
                    break
                writer.writerows(rows)
                total_rows += len(rows)
                logger.info(f"{table_name}: exported {total_rows} rows so far")

        logger.info(f"{table_name}: CSV export complete, total rows: {total_rows}")
        return total_rows

    except Exception as e:
        logger.error(f"Failed to export table {table_name}: {str(e)}", exc_info=True)
        raise
    finally:
        if cursor:
            cursor.close()
        if conn and conn.is_connected():
            conn.close()

def compress_to_zip(source_path, zip_path):
    """Compress a single file into a ZIP archive"""
    try:
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.write(source_path, arcname=os.path.basename(source_path))
        logger.info(f"Successfully compressed {source_path} to {zip_path}")
    except Exception as e:
        logger.error(f"Failed to compress {source_path}: {str(e)}", exc_info=True)
        raise

def process_table(table_name, db_config, csv_dir, zip_dir):
    """Handle the export and compression of a single table"""
    start_time = time.time()
    logger.info(f"Start processing table: {table_name}")
    status = "success"
    rows_exported = 0
    csv_path = None
    zip_path = None

    try:
        # Define the file paths
        csv_filename = f"{table_name}.csv"
        zip_filename = f"{table_name}.zip"
        csv_path = os.path.join(csv_dir, csv_filename)
        zip_path = os.path.join(zip_dir, zip_filename)

        # Export to CSV
        rows_exported = export_table_to_csv(table_name, csv_path, db_config)

        # Compress the CSV file
        compress_to_zip(csv_path, zip_path)

    except Exception as e:
        status = f"failed: {str(e)}"
        # Clean up any intermediate files that may have been left behind
        for path in [csv_path, zip_path]:
            if path and os.path.exists(path):
                try:
                    os.remove(path)
                    logger.info(f"Cleaned up file: {path}")
                except Exception as clean_error:
                    logger.warning(f"Failed to clean up file: {clean_error}")

    finally:
        duration = time.time() - start_time
        log_message = (
            f"Table processing finished - table: {table_name}, "
            f"status: {status}, "
            f"rows exported: {rows_exported}, "
            f"elapsed: {duration:.2f}s"
        )
        logger.info(log_message)

def unzip_files(zip_dir, unzip_dir):
    """Decompress all ZIP files in the specified directory"""
    zip_files = glob.glob(os.path.join(zip_dir, '*.zip'))
    if not zip_files:
        logger.info("No ZIP files found, skipping decompression")
        return

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for zip_path in zip_files:
            # Pass the arguments to submit() directly, which avoids the
            # late-binding pitfall of capturing zip_path in a lambda
            futures.append(executor.submit(extract_zip, zip_path, unzip_dir))
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logger.error(f"An error occurred during decompression: {str(e)}")

def extract_zip(zip_path, unzip_dir):
    """Decompress a single ZIP file"""
    try:
        start_time = time.time()
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(unzip_dir)
        duration = time.time() - start_time
        logger.info(f"Decompression complete: {zip_path} => {unzip_dir} (elapsed: {duration:.2f}s)")
    except Exception as e:
        logger.error(f"Failed to decompress {zip_path}: {str(e)}", exc_info=True)
        raise

def main():
    # Database connection parameters
    db_config = {
        'host': 'localhost',
        'user': 'your_username',
        'password': 'your_password',
        'database': 'your_database'
    }

    # Directory configuration
    base_dir = os.path.dirname(os.path.abspath(__file__))
    csv_dir = os.path.join(base_dir, 'csv_exports')
    zip_dir = os.path.join(base_dir, 'zip_archives')
    unzip_dir = os.path.join(base_dir, 'unzipped_files')

    # Create the directories
    for dir_path in [csv_dir, zip_dir, unzip_dir]:
        os.makedirs(dir_path, exist_ok=True)
        logger.info(f"Directory is ready: {dir_path}")

    # Get all table names
    try:
        conn = mysql.connector.connect(**db_config)
        cursor = conn.cursor()
        cursor.execute("SHOW TABLES")
        tables = [table[0] for table in cursor.fetchall()]
        logger.info(f"Found {len(tables)} tables to process")
    except Exception as e:
        logger.error(f"Failed to fetch the database table list: {str(e)}", exc_info=True)
        return
    finally:
        if 'cursor' in locals():
            cursor.close()
        if 'conn' in locals() and conn.is_connected():
            conn.close()

    # Process all tables (multi-threaded export and compression)
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = []
        for table in tables:
            futures.append(executor.submit(
                process_table,
                table,
                db_config,
                csv_dir,
                zip_dir
            ))

        # Collect task results and surface any errors
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logger.error(f"Table handling exception: {str(e)}")

    # Unzip all ZIP files (multi-threaded decompression)
    logger.info("Start decompressing all ZIP files")
    unzip_files(zip_dir, unzip_dir)
    logger.info("All processing complete")

if __name__ == "__main__":
    main()

Key Feature Description:

1. Process big data in batches:

  • Use fetchmany(batch_size) to get data in batches (default 1000 rows per batch)
  • Streaming reduces memory usage

2. Multithreaded processing:

  • Use ThreadPoolExecutor to handle export and compression of different tables in parallel
  • Independent database connections (each table export opens its own connection)
  • Parallel decompression processing

3. Exception handling:

  • Comprehensive try-except blocks cover all key operations
  • Automatically cleans up intermediate files left behind on failure
  • Detailed error logging (including stack trace)

4. Logging:

  • Output to both the log file and the terminal
  • Record key information such as timestamps, operation type, status, time consumption, etc.
  • Contains the processing results statistics for each table

5. File management:

  • Automatically create the required directory
  • High-efficiency compression using ZIP_DEFLATED
  • Secure file path processing

6. Performance optimization:

  • Unbuffered cursors and fetchmany() keep whole tables out of memory (see the sketch after this list)
  • Configurable batch size and number of threads
  • Concurrent I/O through thread pools
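
On the cursor point above: with mysql-connector-python the default cursor is unbuffered, so fetchmany() pulls rows from the server in batches instead of materializing the whole result set. A minimal sketch that makes this explicit (connection details are placeholders):

# Minimal sketch: an explicitly unbuffered cursor so fetchmany() streams rows.
# Connection parameters and table name are placeholders.
import mysql.connector

conn = mysql.connector.connect(host="localhost", user="user",
                               password="password", database="your_database")
cursor = conn.cursor(buffered=False)  # rows stay on the server until fetched
cursor.execute("SELECT * FROM `big_table`")
while True:
    rows = cursor.fetchmany(1000)
    if not rows:
        break
    # process the batch here, e.g. write it to a CSV file
cursor.close()
conn.close()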

Instructions for use:

Installation dependencies:

pip install mysql-connector-python

Modify the configuration:

Update database connection information in db_config

Adjust directory paths as needed (csv_dir, zip_dir, unzip_dir)

Run the script:

python 

View the log:

Real-time terminal output

Detailed log file data_export.log
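
With the log format configured above ('%(asctime)s - %(levelname)s - %(message)s'), entries look roughly like the following; the timestamps, table name, and row counts are purely illustrative:

2025-04-13 09:30:12,345 - INFO - Start processing table: orders
2025-04-13 09:30:15,872 - INFO - orders: CSV export complete, total rows: 15230
2025-04-13 09:30:16,104 - INFO - Table processing finished - table: orders, status: success, rows exported: 15230, elapsed: 3.76s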

Extension suggestions:

Accept database configuration and directory paths through command line parameters (see the argparse sketch at the end of this list)

Add email notification function (notify when processing is completed or failed)

Support resuming an interrupted export (skip tables that were already exported successfully)

Add file checksums (MD5 verification; see the hashlib sketch at the end of this list)

Support configuration files (YAML/JSON format)

Add progress bar display
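
As a starting point for the command-line suggestion, a minimal argparse sketch (argument names are illustrative, not part of the original script):

# Minimal sketch: pass connection details and directories on the command line.
# Argument names are illustrative.
import argparse

parser = argparse.ArgumentParser(description="Export MySQL tables to CSV and zip them")
parser.add_argument("--host", default="localhost")
parser.add_argument("--user", required=True)
parser.add_argument("--password", required=True)
parser.add_argument("--database", required=True)
parser.add_argument("--csv-dir", default="csv_exports")
parser.add_argument("--zip-dir", default="zip_archives")
parser.add_argument("--unzip-dir", default="unzipped_files")
args = parser.parse_args()

db_config = {
    "host": args.host,
    "user": args.user,
    "password": args.password,
    "database": args.database,
}

And for the checksum suggestion, a small helper that computes the MD5 digest of a finished file with the standard-library hashlib module (the function name is illustrative):

# Minimal sketch: MD5 checksum of an exported file, read in chunks.
import hashlib

def md5_of_file(path, chunk_size=8192):
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(chunk_size), b""):
            digest.update(block)
    return digest.hexdigest()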

This is the article about exporting all MySQL table data to CSV files and compressing them with Python. For more related content on exporting MySQL data to CSV with Python, please search my previous articles or continue browsing the related articles below. I hope you will continue to support me!