SoFunction
Updated on 2025-03-03

Introduction to Python asynchronous programming to implement concurrent processing methods for file batch processing

introduction

In modern software development, it is very important to improve processing efficiency and concurrency when processing large amounts of files or data.

Python'sasyncioThe library provides a powerful way to implement asynchronous programming, thereby improving the concurrent processing capabilities of the program.

This article will be aimed at the Python junior programmers and introduce how to use itasyncioandloggingModule to implement a concurrent processing system for asynchronous batch files.

Code implementation

1. Log configuration

First, we need to configure the log system to record log information when processing files.

Log configuration includes setting log format and output location.

import logging
import os

# Get the absolute path to the current filecurrent_file = (__file__)

# Configure log formatlog_format = '%(asctime)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s'
(format=log_format, level=)

# Create a file processor and output the log to the filefile_handler = ('')
file_handler.setFormatter((log_format))
().addHandler(file_handler)

2. Asynchronous batch processing class

Next, we define aAsyncBatchProcessorClass, used to process batch files.

This class usesto control the number of concurrent tasks.

import asyncio
import random

DEFAULT_MAX_CONCURRENT_TASKS = 2  # Maximum number of concurrent tasksMAX_RETRIES = 3  # Maximum number of retries
class AsyncBatchProcessor:
    def __init__(self, max_concurrent: int = DEFAULT_MAX_CONCURRENT_TASKS):
        self.max_concurrent = max_concurrent
         = (max_concurrent)

    async def process_single_file(
            self,
            input_file: str,
            retry_count: int = 0
    ) -> None:
        """Asynchronous way to handle single files"""
        async with :  # Use semaphores to control concurrency            try:
                (f"Processing file: {input_file}")

                # simulate file processing                await ((0.5, 2.0))

                (f"Successfully processed {input_file}")

            except Exception as e:
                (f"Error processing {input_file} of Attempt {retry_count}: {str(e)}")
                if retry_count < MAX_RETRIES:
                    (f"Retrying {input_file} (Attempt {retry_count + 1})")
                    await (1)
                    await self.process_single_file(input_file, retry_count + 1)
                else:
                    (f"Failed to process {input_file} after {MAX_RETRIES} attempts")

    async def process_batch(
            self,
            file_list: list
    ) -> None:
        total_files = len(file_list)
        (f"Found {total_files} files to process")

        # Create a work queue        queue = ()

        # Put all files into the queue        for file_path in file_list:
            await (file_path)

        # Create a work coroutine        async def worker(worker_id: int):
            while True:
                try:
                    # Get tasks in non-blocking                    input_file_path = await ()
                    (f"Worker {worker_id} processing: {input_file_path}")

                    try:
                        await self.process_single_file(input_file_path)
                    except Exception as e:
                        (f"Error processing {input_file_path}: {str(e)}")
                    finally:
                        queue.task_done()

                except :
                    # Queue is empty, work is over                    break
                except Exception as e:
                    (f"Worker {worker_id} encountered error: {str(e)}")
                    break

        # Create a job task        workers = []
        for i in range(self.max_concurrent):
            worker_task = asyncio.create_task(worker(i))
            (worker_task)

        # Wait for queue processing to complete        await ()

        # Cancel all work tasks that are still running        for w in workers:
            ()

        # Wait for all work tasks to be completed        await (*workers, return_exceptions=True)

3. Asynchronous batch entry function

Finally, we define an asynchronous batch entry functionbatch_detect, used to start batch processing tasks.

async def batch_detect(
        file_list: list,
        max_concurrent: int = DEFAULT_MAX_CONCURRENT_TASKS
):
    """Async batch entry function"""
    processor = AsyncBatchProcessor(max_concurrent)
    await processor.process_batch(file_list)

# Sample callfile_list = ["", "", "", ""]
(batch_detect(file_list))

Code explanation

1. Log configuration

  • useloggingThe module records log information, including time, log level, file path and line number, and log messages.
  • Log output to filefor subsequent viewing and analysis.

2. Asynchronous batch processing classAsyncBatchProcessor

  • __init__Method initializes the maximum number of concurrent tasks and semaphores.
  • process_single_fileMethods process individual files, use semaphores to control concurrency, simulate file processing, and try again if it fails.
  • process_batchMethods process batch files, create work queues and coroutines, and control the execution of concurrent tasks.

3. Asynchronous batch entry functionbatch_detect

  • createAsyncBatchProcessorinstance and callprocess_batchMethod starts a batch task.

Summarize

By usingasyncioandloggingModule, we implement an efficient asynchronous batch file system.

The system is able to process a large number of files concurrently and automatically retry when processing fails until the maximum number of retries is reached.

The log system helps us record the processing process of each file, which facilitates subsequent debugging and analysis.

The above is personal experience. I hope you can give you a reference and I hope you can support me more.