
Detailed explanation of concurrent control in Python asynchronous programming

In the Python asynchronous programming ecosystem, asyncio.gather is the core tool for scheduling tasks concurrently. When faced with a massive number of tasks, however, uncontrolled concurrency can exhaust resources and degrade service. This article shows, through practical scenarios and code examples, how to combine asyncio.gather with the semaphore mechanism to achieve precise concurrency control that preserves throughput while avoiding system overload.

1. Analysis of asyncio.gather's default behavior

asyncio.gather was designed to run a batch of asynchronous tasks together, and its default behavior is a "full-speed sprint":

import asyncio

async def task(n):
    print(f"Task {n} started")
    await asyncio.sleep(1)  # Simulate one second of work
    print(f"Task {n} completed")
    return n

async def main():
    tasks = [task(i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(f"Total results: {len(results)}")

asyncio.run(main())

In this example, all 10 tasks start immediately and finish almost simultaneously after about 1 second. This "full concurrency" mode carries hidden dangers in the following scenarios:

Network requests: thousands of simultaneous HTTP requests may be throttled or blocked by the target server

File IO: disk-IO-intensive operations can slow the whole system's response

Database connections: exceeding the connection pool's limit raises errors

2. The semaphore approach: installing a "throttle valve" on concurrency

asyncio.Semaphore achieves precise concurrency control by limiting the number of tasks that execute simultaneously. Its core mechanism:

Set the maximum number of concurrent permits at initialization (such as 10)

Each task must acquire the semaphore before it executes

The semaphore is released after the task completes

import asyncio

async def controlled_task(sem, n):
    async with sem:  # Acquire the semaphore; released automatically on exit
        print(f"Task {n} acquired semaphore")
        await asyncio.sleep(1)
        print(f"Task {n} released semaphore")
        return n

async def main():
    sem = asyncio.Semaphore(3)  # Maximum concurrency: 3
    tasks = [controlled_task(sem, i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(f"Total results: {len(results)}")

asyncio.run(main())

Execution effect:

Only 3 tasks are ever running at the same time
A new task starts as soon as one completes
Total time ≈ 4 seconds (10 tasks / 3 at a time, rounded up; see the quick check below)
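
You can verify the timing empirically (a quick sketch that reuses the semaphore-limited main() defined above):

import asyncio
import time

start = time.monotonic()
asyncio.run(main())  # main() from the example above
print(f"Elapsed: {time.monotonic() - start:.1f}s")  # ~4.0 seconds with a limit of 3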

3. Advanced control strategies

3.1 Dynamically adjusting the concurrency limit

The semaphore's limit can be tuned at runtime by monitoring the queue length:

import asyncio
import threading
import time

async def dynamic_control():
    sem = asyncio.Semaphore(5)
    task_queue = asyncio.Queue()
    stop = threading.Event()

    # Producer: enqueue 20 work items
    async def producer():
        for i in range(20):
            await task_queue.put(i)

    # Consumer: process items under the semaphore
    async def consumer():
        while True:
            item = await task_queue.get()
            async with sem:
                print(f"Processing {item}")
                await asyncio.sleep(1)
            task_queue.task_done()

    # Dynamic adjustment: shrink the limit when the queue backs up,
    # grow it when the queue drains. Note: sem._value is a private
    # attribute; this is a quick hack for illustration, not a public API.
    def monitor(queue):
        while not stop.is_set():
            size = queue.qsize()
            if size > 10:
                sem._value = max(1, sem._value - 1)
            elif size < 5:
                sem._value = min(10, sem._value + 1)
            time.sleep(1)

    consumers = [asyncio.create_task(consumer()) for _ in range(3)]
    monitor_runner = asyncio.create_task(asyncio.to_thread(monitor, task_queue))
    await producer()
    await task_queue.join()  # Wait until every item has been processed
    stop.set()               # Stop the monitor thread
    for c in consumers:
        c.cancel()
    await monitor_runner

asyncio.run(dynamic_control())
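
Writing to sem._value works, but it depends on a private attribute. A safer pattern (a sketch, not from the original article) adjusts capacity through the public API: shrinking acquires a permit and parks it out of circulation, growing releases a parked one:

import asyncio

class AdjustableLimit:
    # Runtime-tunable concurrency limit built on the public Semaphore API
    def __init__(self, initial):
        self._sem = asyncio.Semaphore(initial)
        self._parked = 0  # permits deliberately held back

    async def shrink(self):
        await self._sem.acquire()  # take one permit out of circulation
        self._parked += 1

    def grow(self):
        if self._parked > 0:
            self._parked -= 1
            self._sem.release()  # put a parked permit back

    async def __aenter__(self):
        await self._sem.acquire()

    async def __aexit__(self, exc_type, exc, tb):
        self._sem.release()

Consumers would use async with limit: exactly as they used the raw semaphore, while the monitor coroutine calls await limit.shrink() or limit.grow() instead of poking internal state.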

3.2 Batch execution strategy

For very large task sets, processing in batches keeps resource usage bounded:

import asyncio

def chunked(iterable, chunk_size):
    # Yield successive chunk_size-sized slices of iterable
    for i in range(0, len(iterable), chunk_size):
        yield iterable[i:i + chunk_size]

async def batch_processing():
    all_tasks = [task(i) for i in range(100)]  # task() as defined in section 1

    for batch in chunked(all_tasks, 10):
        print(f"Processing batch: {len(batch)} tasks")
        await asyncio.gather(*batch)

asyncio.run(batch_processing())

Advantages:

  • Avoids memory blow-up from scheduling everything at once (see the lazy variant below)
  • Makes progress tracking straightforward
  • Allows intermediate state to be saved between batches
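
One caveat: the list comprehension above still builds all 100 coroutine objects up front. A lazily batched variant (a sketch, reusing the task() coroutine from section 1) defers creation until each batch actually runs:

import asyncio

async def lazy_batch_processing(total=100, chunk_size=10):
    for start in range(0, total, chunk_size):
        # Create coroutines only for the current batch
        batch = [task(i) for i in range(start, min(start + chunk_size, total))]
        print(f"Processing batch starting at {start}")
        await asyncio.gather(*batch)

asyncio.run(lazy_batch_processing())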

4. Performance comparison and best practices

Control method      Throughput    Resource usage   Complexity   Applicable scenarios
No control          High          High             Low          Small task sets
Fixed semaphore     Medium        Medium           Medium       General scenarios
Dynamic semaphore   Medium-high   Medium-low       High         Scenarios needing elastic control
Batch processing    Low           Low              Medium       Very large task sets

Best practice recommendations:

Network request tasks: keep the concurrency between 5 and 20

File IO operations: keep the concurrency at or below 2x the number of logical CPU cores (see the snippet after this list)

Database operations: respect the connection pool's maximum connection limit
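
For the file-IO rule of thumb, the limit can be computed at runtime (a small sketch; os.cpu_count() may return None, hence the fallback):

import asyncio
import os

# Cap file-IO concurrency at 2x the logical core count
io_limit = (os.cpu_count() or 4) * 2
io_sem = asyncio.Semaphore(io_limit)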

Always set a reasonable timeout:

try:
    await asyncio.wait_for(task(1), timeout=10)  # task() as defined in section 1
except asyncio.TimeoutError:
    print("Task timed out")

5. Common Errors and Solutions

Error 1: Semaphore is not released correctly

# Error example: manual acquire/release instead of async with
sem = asyncio.Semaphore(3)
await sem.acquire()
await task()
sem.release()  # Easy to forget, and skipped entirely if task() raises

Solution:

# Correct usage
async with sem:
    await task()  # Acquired on entry, released automatically on exit

Error 2: Semaphore leak caused by a task exception

Note that async with releases the semaphore even when an exception is raised; the leak occurs when the semaphore is acquired manually:

async def risky_task():
    await sem.acquire()
    raise Exception("Oops!")  # The exception skips the release below
    sem.release()             # Never reached: the permit leaks

Solution:

async def safe_task():
    await sem.acquire()
    try:
        raise Exception("Oops!")  # Work that may fail
    finally:
        sem.release()  # Always released, even on error

Or simply use async with sem:, which wraps the acquire and release in exactly this try/finally for you.

Conclusion

Combining asyncio.gather with the semaphore mechanism is like fitting an intelligent throttle valve to an asynchronous program. With sensibly chosen concurrency parameters, a program can run efficiently without overloading the system. In real development, choose the concurrency control strategy that fits the task type, resource limits, and SLA requirements. Remember: good concurrency control is not about chasing maximum speed, but about finding the best balance between performance and stability.
