Definition of Concurrent Programs
concurrent program(Coroutine
), also known as microthreads and fibers. (A co-thread is a user-state lightweight thread)
corresponds English -ity, -ism, -ization: while executing function A, you can interrupt at any time to execute function B, and then interrupt function B to continue executing function A (which can be switched automatically), but this process is not a function call (there is no call statement), the process is very much like multi-threading, however, there is only one thread executing the concurrent program
colloquial understanding: a function in a thread, can be anywhere in the current function to save some temporary variables and other information, and then switch to another function to execute, note that it is not done by calling the function, and the number of times to switch and when to switch back to the original function are determined by the developer themselves
Differences between concatenation and threading
In multitasking, thread switching is much more than just saving and restoring the CPU context at the system level. The operating system has its own cache and other data for each thread to run the program efficiently, and the operating system will also help you to do these data recovery operations. So thread switching is very performance intensive. But the switching of concurrent threads is simply a matter ofContext of the operating CPU
So the system is resistant to switching a million times a second.
Criteria for Concurrent Programs
Concurrency must be realized in only one single thread.
- Modifying shared data without locks
- Context stack for keeping multiple control flows in the user program itself
- One thread automatically switches to another thread when it encounters an IO operation.
Advantages of Concurrent Programs
- Because it has its own context and stack, there is no need for the overhead of thread context switching, which belongs to program-level switching and is completely unnoticeable by the operating system, thus making it more lightweight.
- No need for the locking and synchronization overhead of atomic operations
- Easy switching of control flow and simplified programming model
- The effect of concurrency can be achieved within a single thread, maximizing the use of cpu, and high scalability, low cost
Disadvantages of Concurrent Programs
- Inability to utilize multi-core resources: the essence of the concatenation is a single thread, it can not be a single CPU at the same time on multiple cores, the concatenation needs to work with the process in order to run on multiple CPUs.
- Blocking operations (e.g. IO) block the entire program.
- It doesn't make any sense to switch back and forth between computational operations, and switching back and forth and saving state can in turn degrade performance.
Ways to implement concatenation in python
-
greenlet
, is a third-party module for implementing concurrent code (Gevent concurrent is based on the greenlet implementation) -
yield
, generators, with the help of generator features can also implement concurrent code. -
asyncio
, a module introduced in Python 3.4 for writing concurrent code. -
async & awiat
, two keywords introduced in Python 3.5, in combination with theasyncio
module makes it easier to write concurrent code (recommended).
async&await keyword
There are a variety of ways to implement a concatenation, the most popular way at the moment is theasync&await
If you want to know about other ways of doing things, this article is about one of the most popular ones.
There are two things you need to know about using co-programming, the event loop and defining co-programmed functions.
event loop
The event loop is an effective way to deal with multiple concurrencies, it can be understood as a dead loop, the loop process to detect and execute certain code, let's look at the following pseudo-code
Task List = [mandates1, mandates2, mandates3............] while True: 可执行的Task List,已完成的Task List = 去Task List中检查所有的mandates,commander-in-chief (military)'Executable'cap (a poem)'Completed'的mandates返回 for 就绪mandates in 可执行的Task List: 执行已就绪的mandates for 已完成的mandates in 已完成的Task List: 在Task List中移除 已完成的mandates 如果Task List中的mandates都已完成,then the loop is terminated
The above pseudo-code means: get in the event loop, and then constantly listen to the task list, execute tasks as they are available, and remove tasks until all tasks in the task list are completed, terminating the loop.
Benefits of using the event loop: allows the programmer not to control the addition and deletion of tasks and events
The code is written as follows:
import asyncio # Get the event loop loop = asyncio.get_event_loop() # Put tasks in `task list', listen to event loop loop.run_until_complete(mandates) # Close the event ()
Concurrent Functions and Concurrent Objects
Want to define a concurrent function, format:async def function name
Concurrent object: the concurrent object obtained by executing the concurrent function ().
# Define a concurrent function async def func(): pass # Create a concurrent object result = func()
Note: Execute the concurrent function, create the concurrent object, the function code will not run, if you want to run the internal code of the concurrent function, you must give the concurrent object to the event loop to handle, see the following code
import asyncio async def func(): print("Hello.") result = func() # Mode 1 loop = asyncio.get_event_loop() loop.run_until_complete(result) # Mode 2 (result) # python3.7style of writing (literary style)
await
wait is a keyword that can only be used in a concurrent function. It is used to hang the current concurrent program (task) when encountering an IO operation. During the hanging process of the current concurrent program (task), the event loop can go to execute other concurrent programs (tasks), and the current concurrent program can switch back to executing the code after the completion of the IO processing.
Example: we have created two tasks, a download image, a download video, we first execute the download image task, when encountered io operation, under normal circumstances will wait for the picture to finish downloading, but await can be suspended first download image task, and then automatically switch to download video tasks
Usage:await + waitable objects (coprogram objects, Future objects, Task objects)
Case 1
import asyncio async def func(): print("Execute the code inside the coprocessor function.") # Hang the current thread (task) when it encounters an IO operation, and wait for the IO operation to complete before proceeding further. # The event loop can go to other threads (tasks) when the current thread hangs. response = await (2) print("The IO request ended with the following result:", response) result = func() (result)
Case 2
import asyncio async def others(): print("start") # ④Print start await (2) # ⑤ Wait for 2 seconds, during which you can switch to another program. print("end") # ⑥ Print end return 'Return Value' async def func(): print("Execute the code inside the coprocessor function.") # ② Execute the coprocessor function and print the print code. response = await others() # ③ Wait for the co-programming function others print(f"ioRequest closed,prove{response}") # ⑦ Wait for others to finish and print the print statement. if __name__ == '__main__': (func()) # ①Concurrent functions run in an event loop
All of the above examples only create one task, i.e.: there is only one task in the task list of the event loop, so it is not possible to demonstrate the switch to other task effect while IO is waiting. In the program want to create multiple task objects, you need to use the Task object to achieve.
Task object
Tasks is used to concurrently schedule co-programs via theasyncio.create_task(concatenation object)
This allows the concurrent program to join the event loop and wait to be scheduled for execution. In addition to using theasyncio.create_task()
Functions other than the low-levelloop.create_task()
maybeensure_future()
function. It is not recommended to manually instantiate theTask
Object.
Essentially, it encapsulates the concatenation object into a task object and adds the concatenation to the event loop immediately, while tracking the concatenation's state.
take note of:asyncio.create_task()
functions were added in Python 3.7. Prior to Python 3.7, it was possible to instead use the low-levelasyncio.ensure_future()
function.
Case 1
import asyncio async def func(): print(1) await (2) print(2) return "Return Value" async def main(): print("Main start") # Create a concurrent program, encapsulate the concurrent program in a Task object and immediately add it to the list of tasks in the event loop, waiting for the event loop to execute it (the default is the ready state). task1 = asyncio.create_task(func()) # Create a concurrent program, encapsulate the concurrent program in a Task object and immediately add it to the list of tasks in the event loop, waiting for the event loop to execute it (the default is the ready state). task2 = asyncio.create_task(func()) print("End of main") # Automatically switch to execute other tasks when the execution of a particular concatenation encounters an IO operation. # wait here is to wait for the corresponding concurrent program to finish executing and get the result ret1 = await task1 ret2 = await task2 print(ret1, ret2) (main())
Case await + task list (most used)
import asyncio async def func(): print(1) await (2) print(2) return "Return Value" async def main(): print("Main start") # Create a concurrent program, encapsulate the concurrent program in a Task object and add it to the list of tasks in the event loop, and wait for the event loop to execute it (the default is the ready state). # In the call to task_list = [asyncio.create_task(func()), asyncio.create_task(func())] print("End of main") # Automatically switch to execute other tasks when the execution of a particular concatenation encounters an IO operation. # wait here is to wait for the execution of all concatenations to complete and save the return values of all concatenations to done # If the timeout value is set, it means the maximum number of seconds to wait here, the return value of the completed concatenation is written to done, and the unfinished one is written to pending. done, pending = await (task_list) print(done) (main())
take note of: The source code internally executes each concatenation in the list of
ensure_future
thus encapsulatingTask
object, so there is no need to use it in conjunction with thewait
when used in combinationtask_list
The value of the[func(),func()]
It's also possible.
boyfriend
The Future object in asyncio is a relatively low-level object, usually we don't use this object directly, but use the Task object directly to complete the task and state tracking. (Task is a subclass of Futrue.)
Future provides us with handling of the end result in asynchronous programming (the Task class also has state handling).
Case 1
async def main(): # Get the current event loop loop = asyncio.get_running_loop() # Create a task (Future object) that does nothing. fut = loop.create_future() # Wait for the final result of the task (Future object), without which it will wait forever. await fut (main())
The result is that the program keeps waiting and won't end
Case 2
import asyncio async def set_after(fut): await (2) fut.set_result("666") async def main(): # Get the current event loop loop = asyncio.get_running_loop() # Creating a task (Future object) with no behavior bound to it means that the task will never know when to end. fut = loop.create_future() # Create a task (Task object), bound to the set_after function, which internally assigns a value to fut after 2s. # i.e. manually set the final result of the future task, then the fut can end. await loop.create_task(set_after(fut)) # Wait for the Future object to get the final result, otherwise it waits forever. data = await fut print(data) (main())
The Future object itself is bound to a function, so if you want the event loop to get the result of the Future, you need to set it manually. The Task object inherits the Future object, in fact, it extends Future, he can realize that after the corresponding bound function execution is completed, automatically execute set_result, so as to realize the automatic end.
Although, the usual use of Task objects, but the processing of the results of the essence of the implementation is based on the Future object.
boyfriend
There is also a Future object in Python's module that is used when implementing asynchronous operations based on thread pools and process pools.
import time from import Future from import ThreadPoolExecutor from import ProcessPoolExecutor def func(value): (1) print(value) pool = ThreadPoolExecutor(max_workers=5) # or pool = ProcessPoolExecutor(max_workers=5) for i in range(10): fut = (func, i) print(fut)
The two Future objects are different, they are designed for different application scenarios, for example:unsupported
await
Grammar, etc.
In Python provides a way to convert the object wrapped in
Functions for objects
asynic.wrap_future
。
The next question you must be asking is: why does python provide this functionality?
In fact, generally in program development we either uniformly use theasycio
The asynchronous operation is implemented using either the process pool or the thread pool, but if the asynchrony of the protocols is mixed with the asynchrony of the process pool/thread pool, then this feature will be used. However, if the asynchrony of a concatenation is mixed with the asynchrony of a process/thread pool, then this feature is used.
import time import asyncio import def func1(): # A time-consuming operation (2) return "OK" async def main(): loop = asyncio.get_running_loop() # way (of life)1. Run in the default loop's executor ( default (setting)ThreadPoolExecutor ) # The first step: the internal will first call the ThreadPoolExecutor submit method to apply for a thread in the thread pool to execute func1 function, and return an object # Step 2: Call asyncio.wrap_future to wrap the object. # Because objects don't support the await syntax, they need to be wrapped as objects in order to be used. fut = loop.run_in_executor(None, func1) result = await fut print('default thread pool', result) # way (of life)2. Run in a custom thread pool: # with () as pool: # result = await loop.run_in_executor( # pool, func1) # print('custom thread pool', result) # way (of life)3. Run in a custom process pool: # with () as pool: # result = await loop.run_in_executor( # pool, func1) # print('custom process pool', result) (main())
application scenario: When the project is developed with asynchronous programming in a concurrent manner, if you want to use a third-party module that does not support asynchronous programming in a concurrent manner, you will need to use this feature, such as the requests module:
import asyncio import requests async def download_image(url): # Send network requests to download images (automated switching to other tasks when encountering network IO requests to download images) print("Starting download:", url) loop = asyncio.get_event_loop() The # requests module doesn't support asynchronous operations by default, so it's implemented using a thread pool to go along with it. future = loop.run_in_executor(None, , url) response = await future print('Download complete') # Pictures saved to local files file_name = ('_')[-1] with open(file_name, mode='wb') as file_object: file_object.write() if __name__ == '__main__': url_list = [ '/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg', '/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg', '/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg' ] tasks = [download_image(url) for url in url_list] loop = asyncio.get_event_loop() loop.run_until_complete( (tasks) )
asynchronous iterator
What is an asynchronous iterator?
Realized__aiter__()
cap (a poem)__anext__()
method of the object.__anext__
must return aawaitable
Object.async for
will handle asynchronous iterators of__anext__()
method returns a waitable object until it raises aStopAsyncIteration
Exception.
What is an asynchronous iterable object?
available atasync for
object being used in the statement. It must be passed through its__aiter__()
method returns aasynchronous iterator
。
import asyncio class Reader(object): """ Customizing asynchronous iterators (also asynchronous iterable objects) """ def __init__(self): = 0 async def readline(self): # await (1) += 1 if == 100: return None return def __aiter__(self): return self async def __anext__(self): val = await () if val == None: raise StopAsyncIteration return val async def func(): # Create asynchronous iterable objects async_iter = Reader() # async for must be placed inside an async def function, otherwise it is syntactically incorrect. async for item in async_iter: print(item) (func())
Asynchronous iterators don't really do much, they just support the async for syntax.
Asynchronous Context Manager
This type of object is defined by defining the__aenter__()
cap (a poem)__aexit__()
method for theasync with
Control the environment in the statement
import asyncio class AsyncContextManager: def __init__(self): = None async def do_something(self): # Asynchronous operation of databases return 666 async def __aenter__(self): # Asynchronous linking of databases = await (1) return self async def __aexit__(self, exc_type, exc, tb): # Asynchronous closure of database links await (1) async def func(): async with AsyncContextManager() as f: result = await f.do_something() print(result) (func())
This asynchronous context manager is useful for opening, processing, and closing operations during development.
uvloop
uvloop
beasyncio
The replacement of the event loop in asyncio can make asyncio performance improve. In fact, uvloop is at least 2x faster than other python asynchronous frameworks such as nodejs, gevent, etc., and the performance can be comparable to Go.
Install uvloop
pip3 install uvloop
It is also very easy to replace asyncio's event loop with uvloop in your project, just do it in your code.
import asyncio import uvloop asyncio.set_event_loop_policy(()) # Write asyncio code that matches what you wrote earlier. # Internal event loop automation becomes uvloop (...)
take note of: The well-known asgi uvicorn uses the uvloop event loop internally.
Asynchronous redis
When operating redis through python, linking, setting values, and getting values all involve network IO requests, and using asycio asynchronously allows you to do other tasks while IO waits, thus improving performance.
Installing the Python asynchronous operations redis module
pip3 install aioredis
case (law):: Connect multiple redis to do operations (IO encountered will switch to other tasks, providing performance)
import asyncio import aioredis async def execute(address, password): print("Commencement of execution", address) # Network IO operation: first go to connect 77.95.4.197:6379, when encountering IO then automatically switch tasks to connect 77.95.4.198:6379 redis = await aioredis.create_redis_pool(address, password=password) # Network IO operations: automatically switch tasks when IO is encountered await redis.hmset_dict('car', key1=1, key2=2, key3=3) # Network IO operations: automatically switch tasks when IO is encountered result = await ('car', encoding='utf-8') print(result) () # Network IO operations: automatically switch tasks when IO is encountered await redis.wait_closed() print("The end.", address) task_list = [ execute('redis://77.95.4.197:6379', "123456"), execute('redis://77.95.4.198:6379', "123456") ] ((task_list))
Asynchronous MySQL
When operating MySQL via python, connecting, executing SQL, and shutting down all involve network IO requests, and using asycio asynchronously allows you to do some other tasks while the IO waits, thus improving performance.
Installing the Python asynchronous operations redis module
pip3 install aiomysql
case (law)
import asyncio import aiomysql async def execute(host, password): print("Begin.", host) # Network IO operation: first go to connect to 77.95.40.197, when encountering IO then automatically switch tasks to connect to 77.95.40.198:6379 conn = await (host=host, port=3306, user='root', password=password, db='mysql') # Network IO operations: automatically switch tasks when IO is encountered cur = await () # Network IO operations: automatically switch tasks when IO is encountered await ("SELECT Host,User FROM user") # Network IO operations: automatically switch tasks when IO is encountered result = await () print(result) # Network IO operations: automatically switch tasks when IO is encountered await () () print("The end.", host) task_list = [ execute('77.95.40.197', "123456"), execute('77.95.40.198', "123456") ] ((task_list))
reptile
When writing a crawler application, you need to request the target data through network IO, this situation is suitable for the use of asynchronous programming to improve performance, next we use the aiohttp module that supports asynchronous programming to achieve.
Install the aiohttp module
pip3 install aiohttp
case (law)
import aiohttp import asyncio async def fetch(session, url): print(f"Send Request:{url}") async with (url, verify_ssl=False) as response: text = await () print("Getting results:", url, len(text)) async def main(): async with () as session: url_list = ["", "", ""] tasks = [asyncio.create_task(fetch(session, url)) for url in url_list] await (tasks) if __name__ == '__main__': (main())
summarize
That's all for this post, I hope it was helpful and I hope you'll check back for more from me!