Introduction
In modern software development, processing large numbers of files or large amounts of data efficiently and concurrently is a common requirement.
Python's `asyncio` library provides a powerful way to write asynchronous programs and thereby improve a program's concurrent processing capabilities.
This article is aimed at junior Python programmers and shows how to use the `asyncio` and `logging` modules to implement a concurrent, asynchronous batch file processing system.
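If you have not used `asyncio` before, here is a minimal sketch of the core idea (it is not part of the system built below; the `fetch` coroutine and its delays are invented for illustration): coroutines that would otherwise wait one after another can run concurrently under `asyncio.gather`.

```python
import asyncio

async def fetch(name: str, delay: float) -> str:
    # Each coroutine yields control to the event loop while it "waits"
    await asyncio.sleep(delay)
    return f"{name} done"

async def main():
    # The three coroutines run concurrently, so the total time is
    # roughly the longest delay (about 2 seconds), not the sum
    results = await asyncio.gather(
        fetch("a", 1.0), fetch("b", 2.0), fetch("c", 1.5)
    )
    print(results)

asyncio.run(main())
```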
Code implementation
1. Log configuration
First, we need to configure the logging system so that log messages are recorded while files are processed.
The configuration sets the log format and the output destination.
```python
import logging
import os

# Get the absolute path of the current file
current_file = os.path.abspath(__file__)

# Configure the log format
log_format = '%(asctime)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s'
logging.basicConfig(format=log_format, level=logging.INFO)

# Create a file handler so the log is also written to a file
# (the file name was missing in the original; 'batch_process.log' is a placeholder)
file_handler = logging.FileHandler('batch_process.log')
file_handler.setFormatter(logging.Formatter(log_format))
logging.getLogger().addHandler(file_handler)
```
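With this configuration, a call such as the one below emits a line of the following shape to both the console and the log file (the timestamp and path shown are only illustrative):

```python
logging.info("Processing started")
# Example output (illustrative):
# 2024-05-01 12:00:00,000 - INFO - /path/to/script.py:15 - Processing started
```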
2. Asynchronous batch processing class
Next, we define an `AsyncBatchProcessor` class that processes files in batches.
This class uses an `asyncio.Semaphore` to control the number of concurrent tasks.
```python
import asyncio
import logging
import random

DEFAULT_MAX_CONCURRENT_TASKS = 2  # Maximum number of concurrent tasks
MAX_RETRIES = 3                   # Maximum number of retries

class AsyncBatchProcessor:
    def __init__(self, max_concurrent: int = DEFAULT_MAX_CONCURRENT_TASKS):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def process_single_file(
        self,
        input_file: str,
        retry_count: int = 0
    ) -> None:
        """Process a single file asynchronously."""
        needs_retry = False
        async with self.semaphore:  # Use the semaphore to control concurrency
            try:
                logging.info(f"Processing file: {input_file}")

                # Simulate file processing
                await asyncio.sleep(random.uniform(0.5, 2.0))

                logging.info(f"Successfully processed {input_file}")
            except Exception as e:
                logging.error(f"Error processing {input_file} on attempt {retry_count}: {e}")
                needs_retry = True

        # Retry outside the "async with" block so the semaphore slot is
        # released first; retrying while still holding the slot could
        # deadlock once every slot is occupied by a retrying task
        if needs_retry:
            if retry_count < MAX_RETRIES:
                logging.info(f"Retrying {input_file} (attempt {retry_count + 1})")
                await asyncio.sleep(1)
                await self.process_single_file(input_file, retry_count + 1)
            else:
                logging.error(f"Failed to process {input_file} after {MAX_RETRIES} attempts")

    async def process_batch(self, file_list: list) -> None:
        total_files = len(file_list)
        logging.info(f"Found {total_files} files to process")

        # Create a work queue
        queue = asyncio.Queue()

        # Put all files into the queue
        for file_path in file_list:
            await queue.put(file_path)

        # Worker coroutine
        async def worker(worker_id: int):
            while True:
                try:
                    # Get a task without blocking
                    input_file_path = queue.get_nowait()
                    logging.info(f"Worker {worker_id} processing: {input_file_path}")
                    try:
                        await self.process_single_file(input_file_path)
                    except Exception as e:
                        logging.error(f"Error processing {input_file_path}: {e}")
                    finally:
                        queue.task_done()
                except asyncio.QueueEmpty:
                    # The queue is empty, so this worker is done
                    break
                except Exception as e:
                    logging.error(f"Worker {worker_id} encountered error: {e}")
                    break

        # Create the worker tasks
        workers = []
        for i in range(self.max_concurrent):
            worker_task = asyncio.create_task(worker(i))
            workers.append(worker_task)

        # Wait until every queued file has been processed
        await queue.join()

        # Cancel any worker tasks that are still running
        for w in workers:
            w.cancel()

        # Wait for all worker tasks to finish
        await asyncio.gather(*workers, return_exceptions=True)
```
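To make the role of the semaphore concrete, here is a small standalone sketch (separate from the class above; the `job` coroutine and its timings are invented for the demo) showing how `asyncio.Semaphore(2)` caps ten jobs at two at a time:

```python
import asyncio

async def job(sem: asyncio.Semaphore, n: int) -> None:
    async with sem:  # At most two jobs run inside this block at once
        print(f"job {n} started")
        await asyncio.sleep(1)
        print(f"job {n} finished")

async def main():
    sem = asyncio.Semaphore(2)
    # Ten jobs, but only two run concurrently, so this takes
    # about 5 seconds instead of about 1
    await asyncio.gather(*(job(sem, n) for n in range(10)))

asyncio.run(main())
```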
3. Asynchronous batch entry function
Finally, we define an asynchronous batch entry function, `batch_detect`, which is used to start the batch processing task.
```python
async def batch_detect(
    file_list: list,
    max_concurrent: int = DEFAULT_MAX_CONCURRENT_TASKS
):
    """Asynchronous batch entry function."""
    processor = AsyncBatchProcessor(max_concurrent)
    await processor.process_batch(file_list)

# Sample call (the file names were missing in the original; these are placeholders)
file_list = ["file1.txt", "file2.txt", "file3.txt", "file4.txt"]
asyncio.run(batch_detect(file_list))
```
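In practice you would normally build `file_list` from a directory rather than hard-coding it. A minimal sketch, assuming a hypothetical `data/` directory of `.txt` files:

```python
from pathlib import Path

# Hypothetical input directory; adjust to your own layout
input_dir = Path("data")
file_list = [str(p) for p in input_dir.glob("*.txt")]

asyncio.run(batch_detect(file_list, max_concurrent=4))
```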
Code explanation
1. Log configuration:
- The `logging` module records log messages that include the time, the log level, the file path and line number, and the message itself.
- The log is also written to a file for later viewing and analysis.
2. Asynchronous batch processing class `AsyncBatchProcessor`:
- The `__init__` method initializes the maximum number of concurrent tasks and the semaphore.
- The `process_single_file` method processes a single file, uses the semaphore to control concurrency, simulates the file processing, and retries on failure (see the retry sketch after this list).
- The `process_batch` method processes the batch: it creates the work queue and the worker coroutines and controls how the concurrent tasks execute.
3. Asynchronous batch entry function `batch_detect`:
- Creates an `AsyncBatchProcessor` instance and calls its `process_batch` method to start the batch job.
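The retry behaviour described above can also be expressed iteratively instead of recursively. A minimal sketch of that variant, with a hypothetical `do_work` coroutine standing in for the real processing:

```python
import asyncio
import logging

async def process_with_retries(input_file: str, max_retries: int = 3) -> None:
    # Hypothetical stand-in for the real file processing
    async def do_work() -> None:
        await asyncio.sleep(0.5)

    # One initial attempt plus up to max_retries retries
    for attempt in range(max_retries + 1):
        try:
            await do_work()
            logging.info(f"Successfully processed {input_file}")
            return
        except Exception as e:
            logging.error(f"Attempt {attempt} failed for {input_file}: {e}")
            await asyncio.sleep(1)  # Brief pause before the next attempt
    logging.error(f"Failed to process {input_file} after {max_retries} retries")
```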
Summary
By combining the `asyncio` and `logging` modules, we implemented an efficient asynchronous batch file processing system.
The system can process a large number of files concurrently and automatically retries failed files until the maximum number of retries is reached.
The logging setup records how each file was handled, which makes later debugging and analysis easier.