In the Python asynchronous programming ecosystem, `asyncio.gather` is the core tool for concurrent task scheduling. However, when faced with a massive number of tasks, uncontrolled concurrency can cause problems such as resource exhaustion and service degradation. This article shows, through real scenarios and code examples, how to combine `asyncio.gather` with the semaphore mechanism to achieve precise concurrency control that preserves throughput while avoiding system overload.
1. Analysis of the original behavior
The original design intention of `asyncio.gather` is to execute asynchronous tasks in batches, and its default behavior is a "full-speed sprint":
```python
import asyncio

async def task(n):
    print(f"Task {n} started")
    await asyncio.sleep(1)
    print(f"Task {n} completed")
    return n

async def main():
    tasks = [task(i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(f"Total results: {len(results)}")

asyncio.run(main())
```
In this example, all 10 tasks start immediately and complete almost simultaneously after about 1 second. This "full concurrency" mode carries hidden dangers in the following scenarios:
- Network requests: thousands of simultaneous HTTP requests may be throttled or blocked by the target server
- File IO: disk-IO-intensive operations slow down system response
- Database connections: exceeding the connection pool limit raises an error
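To make the connection-pool hazard concrete, here is a minimal sketch in which an uncontrolled `asyncio.gather` overruns a hypothetical limit of 5 simultaneous connections. The `query` coroutine, the `MAX_CONNECTIONS` value, and the `active` counter are all invented for this demo; they stand in for a real driver's pool.

```python
import asyncio

MAX_CONNECTIONS = 5   # hypothetical pool limit, made up for this demo
active = 0            # number of currently "open" connections

async def query(n):
    global active
    active += 1
    if active > MAX_CONNECTIONS:
        active -= 1
        raise RuntimeError(f"connection pool exhausted at task {n}")
    await asyncio.sleep(0.05)  # simulated query
    active -= 1
    return n

async def main():
    try:
        # Full-speed gather: all 10 "queries" open at once
        await asyncio.gather(*(query(i) for i in range(10)))
        return "ok"
    except RuntimeError as exc:
        return str(exc)

result = asyncio.run(main())
print(result)
```

Because all ten coroutines start before any of them finishes, the sixth one already exceeds the pretend pool limit and the whole `gather` fails.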
2. The semaphore approach: installing a "throttle valve" on concurrency
`asyncio.Semaphore` achieves precise concurrency control by limiting the number of tasks executing simultaneously. Its core mechanism:
- Set the maximum concurrency at initialization (e.g. 3, as in the example below)
- Each task must acquire the semaphore before executing
- The semaphore is released after the task completes
```python
import asyncio

async def controlled_task(sem, n):
    async with sem:  # Acquire the semaphore
        print(f"Task {n} acquired semaphore")
        await asyncio.sleep(1)
        print(f"Task {n} released semaphore")
        return n

async def main():
    sem = asyncio.Semaphore(3)  # Maximum concurrency: 3
    tasks = [controlled_task(sem, i) for i in range(10)]
    results = await asyncio.gather(*tasks)
    print(f"Total results: {len(results)}")

asyncio.run(main())
```
Execution effect:
- At most 3 tasks are ever executing at the same time
- As soon as one task completes, a new one starts immediately
- Total time ≈ 4 seconds (10 tasks / 3 at a time, rounded up)
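The ≈4-second figure follows from ⌈10/3⌉ = 4 rounds. A quick timing sketch confirms it; the sleep is shortened to 0.1 s here so the demo runs fast, but the round count is the same.

```python
import asyncio
import math
import time

async def limited_task(sem, duration):
    # Each task holds one semaphore slot for `duration` seconds
    async with sem:
        await asyncio.sleep(duration)

async def timed_run(num_tasks, limit, duration=0.1):
    sem = asyncio.Semaphore(limit)
    start = time.monotonic()
    await asyncio.gather(*(limited_task(sem, duration) for _ in range(num_tasks)))
    return time.monotonic() - start

elapsed = asyncio.run(timed_run(10, 3))
rounds = math.ceil(10 / 3)  # 4 rounds of 0.1 s each
print(f"elapsed ≈ {elapsed:.2f}s, expected ≈ {rounds * 0.1:.2f}s")
```

With a 1-second sleep, the same arithmetic gives the ≈4 seconds quoted above.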
3. Advanced control strategy
3.1 Dynamically adjust the concurrency number
Dynamically adjust the semaphore by monitoring the queue length:
```python
import asyncio
import threading
import time

async def dynamic_control():
    sem = asyncio.Semaphore(5)
    task_queue = asyncio.Queue()
    stop = threading.Event()

    # Producer
    async def producer():
        for i in range(20):
            await task_queue.put(i)

    # Consumer
    async def consumer():
        while True:
            item = await task_queue.get()
            async with sem:
                print(f"Processing {item}")
                await asyncio.sleep(1)
            task_queue.task_done()

    # Dynamic adjustment, run in a thread. Note: _value is a private
    # attribute of Semaphore; mutating it like this is a hack shown
    # only to illustrate the idea and is not thread-safe.
    def monitor(queue):
        while not stop.is_set():
            size = queue.qsize()
            if size > 10:
                sem._value = max(1, sem._value - 1)
            elif size < 5:
                sem._value = min(10, sem._value + 1)
            time.sleep(1)

    consumers = [asyncio.create_task(consumer()) for _ in range(3)]
    monitor_task = asyncio.create_task(asyncio.to_thread(monitor, task_queue))

    await producer()
    await task_queue.join()  # wait until every item is processed
    for c in consumers:
        c.cancel()           # consumers loop forever; stop them explicitly
    stop.set()               # tell the monitor thread to exit
    await monitor_task

asyncio.run(dynamic_control())
```
3.2 Batch execution strategy
For very large task sets, batch processing can be used:
```python
import asyncio

def chunked(iterable, chunk_size):
    for i in range(0, len(iterable), chunk_size):
        yield iterable[i:i + chunk_size]

async def batch_processing():
    # Reuses task() from the first example
    all_tasks = [task(i) for i in range(100)]
    for batch in chunked(all_tasks, 10):
        print(f"Processing batch: {len(batch)} tasks")
        await asyncio.gather(*batch)

asyncio.run(batch_processing())
```
Advantages:
- Avoids memory blow-ups
- Makes progress tracking straightforward
- Supports saving intermediate state
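A sketch of the progress-tracking and intermediate-state advantages. The `work` coroutine and the `completed` checkpoint list are illustrative additions, not part of the original example; in practice the checkpoint might be written to disk or a database between batches.

```python
import asyncio

async def work(n):
    await asyncio.sleep(0.01)  # simulated unit of work
    return n * n

def chunked(items, size):
    for i in range(0, len(items), size):
        yield items[i:i + size]

async def batch_with_progress(items, batch_size):
    completed = []  # intermediate state, kept between batches
    for batch_num, batch in enumerate(chunked(items, batch_size), start=1):
        results = await asyncio.gather(*(work(n) for n in batch))
        completed.extend(results)  # "checkpoint" after every batch
        print(f"batch {batch_num}: {len(completed)}/{len(items)} done")
    return completed

squares = asyncio.run(batch_with_progress(list(range(25)), 10))
```

If the process dies mid-run, everything up to the last finished batch is already safe, which is exactly the "intermediate state saving" advantage listed above.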
4. Performance comparison and best practices
| Control method | Throughput | Resource usage | Implementation complexity | Applicable scenarios |
| --- | --- | --- | --- | --- |
| No control | High | High | Low | Small task sets |
| Fixed semaphore | Medium | Medium | Medium | General scenarios |
| Dynamic semaphore | Medium-high | Medium-low | High | Scenarios requiring elastic control |
| Batch processing | Low | Low | Medium | Very large task sets |
Best Practice Recommendations:
- Network request tasks: keep concurrency between 5 and 20
- File IO operations: concurrency should not exceed 2× the number of logical CPU cores
- Database operations: respect the connection pool's maximum-connections limit
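These recommendations are easy to apply with a small reusable helper. `gather_with_limit` below is a common hand-rolled pattern, not a standard-library function; `fetch` is a stand-in for whatever IO-bound coroutine you actually call.

```python
import asyncio

async def gather_with_limit(limit, *coros):
    """Like asyncio.gather, but at most `limit` coroutines run at once."""
    sem = asyncio.Semaphore(limit)

    async def bounded(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(bounded(c) for c in coros))

async def fetch(n):
    await asyncio.sleep(0.01)  # stand-in for a real HTTP request
    return n

# Run 20 "requests" with at most 5 in flight at a time
results = asyncio.run(gather_with_limit(5, *(fetch(i) for i in range(20))))
print(results)
```

Like `asyncio.gather`, the helper returns results in the order the coroutines were passed in, regardless of completion order.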
Always set a reasonable timeout:
```python
try:
    await asyncio.wait_for(task(), timeout=10)
except asyncio.TimeoutError:
    print("Task timed out")
```
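Timeouts compose cleanly with semaphore control: if `wait_for` cancels a task, `async with` still releases the slot on the way out. A small sketch (the task durations and timeouts are arbitrary values chosen for the demo):

```python
import asyncio

async def main():
    sem = asyncio.Semaphore(2)

    async def slow_task(n, duration):
        async with sem:  # released even if wait_for cancels us mid-sleep
            await asyncio.sleep(duration)
            return n

    # One task finishes within its timeout, the other does not
    results = await asyncio.gather(
        asyncio.wait_for(slow_task(1, 0.01), timeout=1.0),
        asyncio.wait_for(slow_task(2, 10), timeout=0.1),
        return_exceptions=True,
    )
    return results

results = asyncio.run(main())
print(results)  # fast task returns 1; slow task yields a TimeoutError
```

With `return_exceptions=True`, the timeout shows up as a `TimeoutError` object in the results instead of aborting the whole batch.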
5. Common Errors and Solutions
Error 1: Semaphore is not released correctly
```python
# Error example: missing async with
sem = asyncio.Semaphore(3)
await sem.acquire()
await task()
sem.release()  # easy to forget this release
```
Solution:
```python
# Correct usage
async with sem:
    await task()  # acquire and release happen automatically
```
Error 2: Semaphore leakage caused by task exception
Note that `async with sem:` releases the semaphore even when the body raises. The leak occurs when the semaphore is acquired manually and an exception skips the release:

```python
async def risky_task():
    await sem.acquire()
    raise Exception("Oops!")  # the exception skips the release below
    sem.release()             # never reached, so the slot leaks
```
Solution:
```python
async def safe_task():
    await sem.acquire()
    try:
        await task()   # work that may raise
    finally:
        sem.release()  # always runs, even on exception
```

Or simply use `async with sem:`, which is shorthand for exactly this try/finally pattern.
Conclusion
Combining `asyncio.gather` with the semaphore mechanism is like installing an intelligent throttle valve in an asynchronous program. With sensible concurrency settings, a program can run efficiently while avoiding system overload. In real development, choose the most appropriate concurrency control strategy based on task type, resource limits, and SLA requirements. Remember: excellent concurrency control is not about pursuing maximum speed, but about finding the best balance between performance and stability.
This concludes the detailed explanation of concurrency control in Python asynchronous programming. For more related Python content, please search my previous articles. I hope you'll continue to support me!