Updated on 2024-10-30

Python Advanced: Do You Understand Coroutines?

Definition of Coroutines

A coroutine, also known as a microthread or fiber, is a lightweight thread that lives in user space.

What it does: while function A is executing, it can be interrupted at any point to execute function B, and function B can then be interrupted to resume function A (the switching can happen automatically). This is not a function call (there is no call statement). The behavior looks a lot like multithreading, but only one thread is executing the coroutines.

In plain terms: a function running in a thread can, at any point, save its temporary variables and other state and then switch to another function. Note that the switch is not made by calling the function, and the developer decides how many times to switch and when to switch back to the original function.
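
The idea of pausing a function while keeping its local variables can be sketched with plain generators. This is only an illustration of the switching behavior, not how asyncio is implemented; the names `worker` and `log` are made up for the example.

```python
# Illustration only: two generator functions that the developer switches
# between by hand. `worker` and `log` are hypothetical names.
log = []

def worker(name, steps):
    # `i` is a local variable; it survives every pause at `yield`.
    for i in range(steps):
        log.append(f"{name}-{i}")
        yield  # pause here and hand control back to the caller

a = worker("A", 2)
b = worker("B", 2)

# The developer decides when to switch: A, then B, then back to A, then B.
next(a)
next(b)
next(a)
next(b)

print(log)
```

Each `next()` call resumes the function exactly where it paused, which is the "switch without a function call" described above.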

Differences between Coroutines and Threads

In multitasking, a thread switch involves much more than saving and restoring the CPU context at the system level. To run programs efficiently, the operating system keeps its own caches and other data for each thread, and it has to save and restore that data as well, so thread switching is relatively expensive. Switching between coroutines only involves manipulating the CPU context, so the system can cope with on the order of a million switches per second.

Criteria for Coroutines

Concurrency must be achieved within a single thread.

  • Shared data can be modified without locks
  • The user program itself keeps the context stacks of multiple control flows
  • A coroutine automatically switches to another coroutine when it encounters an IO operation
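
A minimal sketch of the "no locks" criterion, assuming asyncio as the coroutine implementation. The names `state` and `add` are hypothetical. Two coroutines update the same counter, and because a switch can only happen at an `await`, the read-modify-write step is never interrupted.

```python
import asyncio

# Shared state, modified by two coroutines without any lock.
state = {"counter": 0}

async def add(times):
    for _ in range(times):
        current = state["counter"]
        # No await between the read and the write, so no other
        # coroutine can run in between.
        state["counter"] = current + 1
        await asyncio.sleep(0)  # explicit switch point

async def main():
    await asyncio.gather(add(1000), add(1000))

asyncio.run(main())
print(state["counter"])
```

If an `await` were placed between the read and the write, another coroutine could run in that gap and a lock (for example `asyncio.Lock`) would be needed again.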

Advantages of Coroutines

  • A coroutine keeps its own context and stack in user space, so it avoids the overhead of thread context switches. Switching happens at the program level and is invisible to the operating system, which makes coroutines more lightweight.
  • No locking or synchronization overhead for atomic operations
  • Switching control flow is easy, which simplifies the programming model
  • Concurrency is achieved within a single thread, making full use of the CPU, with high scalability and low cost

Disadvantages of Coroutines

  • Cannot use multiple cores: a coroutine is essentially single-threaded, so it cannot use several cores of a CPU at the same time. Coroutines need to be combined with processes in order to run on multiple CPUs.
  • A blocking operation (e.g. blocking IO) blocks the entire program.
  • Switching back and forth between purely computational operations gains nothing; the switching and state saving actually reduce performance.
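
The second disadvantage can be made visible with a small sketch, assuming asyncio. The names `job` and `run_pair` are hypothetical, and the sleeps stand in for IO. With the blocking `time.sleep`, the second job cannot even start until the first one has finished; with `await asyncio.sleep`, both jobs start before either one ends.

```python
import asyncio
import time

async def job(name, log, blocking):
    log.append(f"{name} start")
    if blocking:
        time.sleep(0.05)           # blocks the whole thread, no switch possible
    else:
        await asyncio.sleep(0.05)  # yields to the event loop while waiting
    log.append(f"{name} end")

async def run_pair(blocking):
    log = []
    await asyncio.gather(job("a", log, blocking), job("b", log, blocking))
    return log

blocking_log = asyncio.run(run_pair(True))
cooperative_log = asyncio.run(run_pair(False))
print(blocking_log)
print(cooperative_log)
```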

Ways to implement coroutines in Python

  • greenlet, a third-party module for implementing coroutine code (gevent coroutines are built on greenlet)
  • yield, generators: coroutine code can also be implemented with the help of generator features
  • asyncio, a module introduced in Python 3.4 for writing coroutine code
  • async & await, two keywords introduced in Python 3.5; combined with the asyncio module they make it easier to write coroutine code (recommended)

async & await keywords

There are several ways to implement coroutines; the most popular at the moment is async & await. The other approaches are not covered further here; this article focuses on the most popular one.

Two things are needed to use coroutines: the event loop and coroutine functions.

Event loop

The event loop is an effective way to handle many concurrent operations. It can be thought of as an infinite loop that keeps checking for code that is ready and executing it. Consider the following pseudo-code:

task_list = [task1, task2, task3, ...]
while True:
    runnable_tasks, finished_tasks = check every task in task_list and return the 'runnable' and 'finished' ones
    for ready_task in runnable_tasks:
        run ready_task
    for finished_task in finished_tasks:
        remove finished_task from task_list
    if every task in task_list is finished, terminate the loop

The pseudo-code above means: obtain an event loop, keep watching the task list, run tasks as soon as they are runnable, and remove tasks that have finished, until every task in the task list is complete and the loop terminates.

Benefit of using the event loop: the programmer does not have to manage adding and removing tasks or dispatching events by hand.
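
The pseudo-code can be turned into a toy, runnable loop. This is a sketch under simplifying assumptions (generators stand in for tasks, every task is always runnable, and there is no real IO); it is not how asyncio's event loop is implemented. The names `task`, `run_loop`, and `log` are hypothetical.

```python
from collections import deque

def task(name, steps, log):
    for i in range(steps):
        log.append(f"{name}:{i}")
        yield  # give control back to the loop

def run_loop(tasks):
    ready = deque(tasks)
    while ready:                   # loop until the task list is empty
        current = ready.popleft()
        try:
            next(current)          # run the task until its next pause
            ready.append(current)  # not finished: keep it in the list
        except StopIteration:
            pass                   # finished: drop it from the list

log = []
run_loop([task("t1", 2, log), task("t2", 1, log)])
print(log)
```

The loop interleaves the two tasks and exits once both are exhausted, mirroring the "check, run, remove, terminate" steps of the pseudo-code.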

The code is written as follows:

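import asyncio
async def task():
    # Placeholder coroutine standing in for real work
    await asyncio.sleep(0)
# Create an event loop
loop = asyncio.new_event_loop()
# Hand the coroutine to the event loop and run it until it completes
loop.run_until_complete(task())
# Close the event loop
loop.close()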

Coroutine Functions and Coroutine Objects

A coroutine function is defined with the format async def function_name. A coroutine object is the object obtained by calling a coroutine function.

# Define a coroutine function
async def func():
    pass
# Create a coroutine object
result = func()

Note: calling a coroutine function only creates the coroutine object; the code inside the function does not run. To run the body of the coroutine function, the coroutine object must be handed to the event loop, as in the following code.

import asyncio
async def func():
    print("Hello.")
result = func()
# Option 1 (before Python 3.7); use one option or the other, not both
# loop = asyncio.get_event_loop()
# loop.run_until_complete(result)
# Option 2 (Python 3.7 and later)
asyncio.run(result)
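
A small check of the statement that calling a coroutine function does not run its body. This is a sketch; the `calls` list is a hypothetical helper used only to record whether the body has run.

```python
import asyncio
import inspect

calls = []

async def func():
    calls.append("ran")
    return "done"

coro = func()                        # creates a coroutine object only
is_coro = inspect.iscoroutine(coro)
ran_before = list(calls)             # still empty: the body has not started

result = asyncio.run(coro)           # the event loop actually runs the body
print(is_coro, ran_before, calls, result)
```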

await

await is a keyword that can only be used inside a coroutine function. It suspends the current coroutine (task) when an IO operation is encountered. While the current coroutine is suspended, the event loop can run other coroutines (tasks); once the IO operation completes, the loop switches back and the current coroutine continues with the code after the await.

Example: suppose we create two tasks, one downloading an image and one downloading a video. The image download runs first. Normally, when it reaches the IO operation, the program would wait for the image to finish downloading. With await, the image task is suspended and the event loop automatically switches to the video task.

Usage: await + awaitable object (coroutine object, Future object, or Task object)
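
As a quick illustration, the three kinds of awaitable objects can each be awaited in the same way. This is a minimal sketch; `compute` is a hypothetical coroutine function, and the Future gets its result from `loop.call_soon` only so that the example terminates.

```python
import asyncio

async def compute(x):
    await asyncio.sleep(0)
    return x * 2

async def main():
    loop = asyncio.get_running_loop()

    from_coro = await compute(1)                       # coroutine object
    from_task = await asyncio.create_task(compute(2))  # Task object

    fut = loop.create_future()                         # Future object
    loop.call_soon(fut.set_result, 6)                  # something must set its result
    from_future = await fut

    return from_coro, from_task, from_future

results = asyncio.run(main())
print(results)
```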

Case 1

import asyncio
async def func():
    print("Executing the code inside the coroutine function.")
    # Suspend the current coroutine (task) at the IO operation and continue only after the IO completes.
    # While the current coroutine is suspended, the event loop can run other coroutines (tasks).
    response = await asyncio.sleep(2)
    print("The IO request ended with the following result:", response)
result = func()
asyncio.run(result)

Case 2

import asyncio
async def others():
    print("start")  # ④ Print start
    await asyncio.sleep(2)  # ⑤ Wait 2 seconds; the event loop could switch to another task meanwhile
    print("end")  # ⑥ Print end
    return 'Return Value'
async def func():
    print("Executing the code inside the coroutine function.")  # ② Run the body of the coroutine function
    response = await others()  # ③ Wait for the coroutine function others
    print(f"IO request finished, result: {response}")  # ⑦ Runs once others has finished
if __name__ == '__main__':
    asyncio.run(func())  # ① Run the coroutine in an event loop

All of the examples above create only one task, i.e. the event loop's task list contains a single task, so they cannot show the switch to another task while IO is pending. To create multiple tasks in a program, use Task objects.

Task object

Tasks are used to schedule coroutines concurrently. Wrapping a coroutine with asyncio.create_task(coroutine_object) adds it to the event loop, where it waits to be scheduled. Besides asyncio.create_task(), you can also use the lower-level loop.create_task() or ensure_future() functions. Manually instantiating Task objects is not recommended.

In essence, the coroutine object is wrapped in a Task object and immediately added to the event loop, and the state of the coroutine is tracked.

Note: asyncio.create_task() was added in Python 3.7. Before Python 3.7, use the lower-level asyncio.ensure_future() function instead.
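
The state tracking mentioned above can be observed through the Task's `done()` and `result()` methods. A minimal sketch, assuming Python 3.7 or later; `work` is a hypothetical coroutine function.

```python
import asyncio

async def work():
    await asyncio.sleep(0.01)
    return "Return Value"

async def main():
    task = asyncio.create_task(work())  # scheduled right away, not yet run
    state_before = task.done()          # False: still pending
    value = await task                  # wait for it to finish
    state_after = task.done()           # True: finished
    return state_before, value, state_after, task.result()

snapshot = asyncio.run(main())
print(snapshot)
```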

Case 1

import asyncio
async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "Return Value"
async def main():
    print("Main start")
    # Wrap the coroutine in a Task object and immediately add it to the event loop's task list, where it waits to be run (it starts in the ready state).
    task1 = asyncio.create_task(func())
    # Same for the second coroutine.
    task2 = asyncio.create_task(func())
    print("End of main")
    # When a coroutine hits an IO operation, the event loop automatically switches to another task.
    # await here waits for the corresponding coroutine to finish and fetches its result.
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)
asyncio.run(main())

Case 2: await + task list (most common)

import asyncio
async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "Return Value"
async def main():
    print("Main start")
    # Wrap each coroutine in a Task object and add it to the event loop's task list, where it waits to be run (it starts in the ready state).
    task_list = [asyncio.create_task(func()), asyncio.create_task(func())]
    print("End of main")
    # When a coroutine hits an IO operation, the event loop automatically switches to another task.
    # await here waits for all tasks to finish; the finished tasks (which carry the return values) are collected in done.
    # If timeout is set, it is the maximum number of seconds to wait: finished tasks go to done, unfinished ones to pending.
    done, pending = await asyncio.wait(task_list)
    print(done)
asyncio.run(main())

Note: in older Python versions, asyncio.wait internally called ensure_future on each coroutine in the list to wrap it in a Task object, so task_list could also be written as [func(), func()]. Passing bare coroutine objects to asyncio.wait was deprecated in Python 3.8 and removed in Python 3.11, so on current versions create the Task objects explicitly.
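
The timeout behavior of asyncio.wait can be sketched as follows. The delays are arbitrary values chosen for the illustration, and `func` here takes a hypothetical `delay` parameter. The fast task ends up in `done`, while the slow one is still in `pending` when the timeout expires.

```python
import asyncio

async def func(delay):
    await asyncio.sleep(delay)
    return delay

async def main():
    fast = asyncio.create_task(func(0.01))
    slow = asyncio.create_task(func(5))
    # Wait at most 0.5 seconds; unfinished tasks land in `pending`.
    done, pending = await asyncio.wait([fast, slow], timeout=0.5)
    results = [t.result() for t in done]
    for t in pending:
        t.cancel()  # avoid leaving the slow task running
    return results, len(pending)

outcome = asyncio.run(main())
print(outcome)
```

Note that asyncio.wait does not cancel pending tasks itself when the timeout is reached, which is why the sketch cancels them explicitly.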

asyncio.Future object

The Future object in asyncio is a relatively low-level object. We usually do not use it directly; instead we use Task objects to run tasks and track their state. (Task is a subclass of Future.)

Future provides handling of the final result in asynchronous programming (the Task class also has state handling).

Case 1

import asyncio
async def main():
    # Get the current event loop
    loop = asyncio.get_running_loop()
    # Create a Future object that does nothing.
    fut = loop.create_future()
    # Wait for the final result of the Future object; since nothing ever sets it, this waits forever.
    await fut
asyncio.run(main())

The result is that the program keeps waiting and never ends.

Case 2

import asyncio
async def set_after(fut):
    await asyncio.sleep(2)
    fut.set_result("666")
async def main():
    # Get the current event loop
    loop = asyncio.get_running_loop()
    # Create a Future object with no behavior bound to it, so it cannot know by itself when to finish.
    fut = loop.create_future()
    # Create a Task bound to the set_after function, which sets the result of fut after 2 seconds.
    # Setting the final result by hand is what lets fut finish.
    await loop.create_task(set_after(fut))
    # Wait for the Future object to get its final result; otherwise this would wait forever.
    data = await fut
    print(data)
asyncio.run(main())

A Future object is not bound to any function by itself, so if you want the event loop to obtain the Future's result, you have to set it manually. Task inherits from Future and extends it: once the function bound to the Task has finished executing, set_result is called automatically, so the Task ends on its own.

Although Task objects are what we normally use, their result handling is built on the Future object.
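
The relationship between the two classes can be checked directly. A minimal sketch; `answer` is a hypothetical coroutine function. The bare Future needs `set_result` to be called by hand, while the Task stores the coroutine's return value as its result automatically.

```python
import asyncio

async def answer():
    return "666"

async def main():
    loop = asyncio.get_running_loop()

    fut = loop.create_future()
    fut.set_result("666")   # a bare Future needs its result set by hand
    manual = await fut

    task = asyncio.create_task(answer())
    automatic = await task  # the Task stores the return value as its result

    return manual, automatic, isinstance(task, asyncio.Future)

summary = asyncio.run(main())
print(summary)
```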

concurrent.futures.Future object

There is also a Future object in Python's concurrent.futures module. It is used when implementing asynchronous operations based on thread pools and process pools.

import time
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
def func(value):
    time.sleep(1)
    print(value)
pool = ThreadPoolExecutor(max_workers=5)
# or pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
    # submit returns a concurrent.futures.Future object
    fut = pool.submit(func, i)
    print(fut)

The two Future objects are different; they are designed for different scenarios. For example, concurrent.futures.Future does not support the await syntax.

Python provides a function for wrapping a concurrent.futures.Future object as an asyncio.Future object: asyncio.wrap_future.

The next question you are probably asking is: why does Python provide this functionality?

In practice, a program usually implements its asynchronous operations consistently, either with asyncio coroutines or with thread pools and process pools. This feature is needed when coroutine-based asynchrony is mixed with thread-pool or process-pool asynchrony.
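
A minimal sketch of calling asyncio.wrap_future directly. The function `blocking_job` is hypothetical and the short sleep stands in for a slow blocking call. The object returned by `submit` cannot be awaited, while the wrapped one can.

```python
import asyncio
import concurrent.futures
import time

def blocking_job():
    time.sleep(0.05)  # stands in for a slow, blocking call
    return "OK"

async def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        cf_future = pool.submit(blocking_job)        # concurrent.futures.Future
        aio_future = asyncio.wrap_future(cf_future)  # asyncio.Future, awaitable
        value = await aio_future
    return type(cf_future).__module__, isinstance(aio_future, asyncio.Future), value

info = asyncio.run(main())
print(info)
```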

import time
import asyncio
import concurrent.futures
def func1():
    # A time-consuming operation
    time.sleep(2)
    return "OK"
async def main():
    loop = asyncio.get_running_loop()
    # Option 1. Run in the loop's default executor (a ThreadPoolExecutor by default)
    # Step 1: internally, the ThreadPoolExecutor's submit method is called to get a thread from the pool to run func1; it returns a concurrent.futures.Future object.
    # Step 2: asyncio.wrap_future is called to wrap the concurrent.futures.Future object as an asyncio.Future object,
    # because concurrent.futures.Future does not support the await syntax and must be wrapped before it can be awaited.
    fut = loop.run_in_executor(None, func1)
    result = await fut
    print('default thread pool', result)
    # Option 2. Run in a custom thread pool:
    # with concurrent.futures.ThreadPoolExecutor() as pool:
    #     result = await loop.run_in_executor(
    #         pool, func1)
    #     print('custom thread pool', result)
    # Option 3. Run in a custom process pool:
    # with concurrent.futures.ProcessPoolExecutor() as pool:
    #     result = await loop.run_in_executor(
    #         pool, func1)
    #     print('custom process pool', result)
asyncio.run(main())

Use case: when a project is built on coroutine-based asynchronous programming and you need a third-party module that does not support it, you need this feature. The requests module is one example:

import asyncio
import requests
async def download_image(url):
    # Send a network request to download the image (the event loop switches to other tasks while the network IO is pending)
    print("Starting download:", url)
    loop = asyncio.get_running_loop()
    # The requests module does not support asynchronous operations, so it is run in a thread pool instead.
    future = loop.run_in_executor(None, requests.get, url)
    response = await future
    print('Download complete')
    # Save the image to a local file
    file_name = url.rsplit('_')[-1]
    with open(file_name, mode='wb') as file_object:
        file_object.write(response.content)
if __name__ == '__main__':
    url_list = [
        '/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg',
        '/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg',
        '/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg'
    ]
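    tasks = [download_image(url) for url in url_list]
    async def main():
        # asyncio.wait requires Task objects on Python 3.11+, so wrap each coroutine first
        await asyncio.wait([asyncio.create_task(t) for t in tasks])
    asyncio.run(main())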

Asynchronous Iterators

What is an asynchronous iterator?

An object that implements the __aiter__() and __anext__() methods. __anext__() must return an awaitable object. async for resolves the awaitables returned by the asynchronous iterator's __anext__() method until it raises a StopAsyncIteration exception.

What is an asynchronous iterable object?

An object that can be used in an async for statement. Its __aiter__() method must return an asynchronous iterator.

import asyncio
class Reader(object):
    """ A custom asynchronous iterator (which is also an asynchronous iterable) """
    def __init__(self):
        self.count = 0
    async def readline(self):
        # await asyncio.sleep(1)
        self.count += 1
        if self.count == 100:
            return None
        return self.count
    def __aiter__(self):
        return self
    async def __anext__(self):
        val = await self.readline()
        if val is None:
            raise StopAsyncIteration
        return val
async def func():
    # Create an asynchronous iterable object
    async_iter = Reader()
    # async for must be placed inside an async def function, otherwise it is a syntax error.
    async for item in async_iter:
        print(item)
asyncio.run(func())

Asynchronous iterators do not do much by themselves; their role is to support the async for syntax.
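
To make the protocol concrete, the sketch below drives an asynchronous iterator twice: once with async for and once by calling `__aiter__()` and `__anext__()` by hand, which is roughly what async for does. `Countdown` is a hypothetical class written for this illustration.

```python
import asyncio

class Countdown:
    """Hypothetical async iterator that yields n, n-1, ..., 1."""
    def __init__(self, n):
        self.n = n
    def __aiter__(self):
        return self
    async def __anext__(self):
        if self.n <= 0:
            raise StopAsyncIteration
        self.n -= 1
        return self.n + 1

async def with_async_for():
    return [item async for item in Countdown(3)]

async def by_hand():
    items = []
    iterator = Countdown(3).__aiter__()
    while True:
        try:
            items.append(await iterator.__anext__())
        except StopAsyncIteration:
            break
    return items

auto_items = asyncio.run(with_async_for())
manual_items = asyncio.run(by_hand())
print(auto_items, manual_items)
```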

Asynchronous Context Manager

An object of this type defines the __aenter__() and __aexit__() methods, which control the environment inside an async with statement.

import asyncio
class AsyncContextManager:
    def __init__(self):
        self.conn = None
    async def do_something(self):
        # Asynchronous database operation
        return 666
    async def __aenter__(self):
        # Asynchronously open the database connection (simulated with sleep)
        self.conn = await asyncio.sleep(1)
        return self
    async def __aexit__(self, exc_type, exc, tb):
        # Asynchronously close the database connection (simulated with sleep)
        await asyncio.sleep(1)
async def func():
    async with AsyncContextManager() as f:
        result = await f.do_something()
        print(result)
asyncio.run(func())

This asynchronous context manager is useful for opening, processing, and closing operations during development.
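
One reason the open/process/close pattern is convenient is that `__aexit__` runs even when the body raises. A minimal sketch; the `Connection` class and the `events` list are hypothetical, and the zero-length sleeps stand in for real network IO.

```python
import asyncio

events = []

class Connection:
    """Hypothetical resource; the sleeps stand in for real network IO."""
    async def __aenter__(self):
        await asyncio.sleep(0)
        events.append("open")
        return self
    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(0)
        events.append("close")
        return False  # do not swallow exceptions

async def main():
    try:
        async with Connection():
            events.append("work")
            raise RuntimeError("query failed")
    except RuntimeError:
        events.append("handled")

asyncio.run(main())
print(events)
```

The "close" step is recorded before the exception reaches the caller, so the resource is released on both the success path and the error path.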

uvloop

uvloop is a replacement for asyncio's event loop that improves asyncio performance. According to the uvloop project, it is at least 2x faster than nodejs, gevent, and other Python asynchronous frameworks, and its performance is close to that of Go programs.

Install uvloop

pip3 install uvloop

Replacing asyncio's event loop with uvloop in a project is also very easy; it only takes the following code.

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
# Write your asyncio code exactly as before.
# The event loop used internally is now uvloop.
asyncio.run(...)

Note: the well-known ASGI server uvicorn uses the uvloop event loop internally when uvloop is installed.

Asynchronous redis

When working with redis from Python, connecting, setting values, and getting values all involve network IO. Using asyncio lets the program do other work while waiting for IO, which improves performance.

Installing the Python asynchronous redis module

pip3 install aioredis

Case: connect to several redis servers and run operations on them (the event loop switches to other tasks on IO, which improves performance)

import asyncio
import aioredis  # this example uses the aioredis 1.x API
async def execute(address, password):
    print("Start executing", address)
    # Network IO: connect to 77.95.4.197:6379 first; while waiting on IO, switch to connecting to 77.95.4.198:6379
    redis = await aioredis.create_redis_pool(address, password=password)
    # Network IO: switch tasks automatically while waiting
    await redis.hmset_dict('car', key1=1, key2=2, key3=3)
    # Network IO: switch tasks automatically while waiting
    result = await redis.hgetall('car', encoding='utf-8')
    print(result)
    redis.close()
    # Network IO: switch tasks automatically while waiting
    await redis.wait_closed()
    print("Finished", address)
task_list = [
    execute('redis://77.95.4.197:6379', "123456"),
    execute('redis://77.95.4.198:6379', "123456")
]
async def main():
    # asyncio.wait requires Task objects on Python 3.11+, so wrap each coroutine first
    await asyncio.wait([asyncio.create_task(coro) for coro in task_list])
asyncio.run(main())

Asynchronous MySQL

When working with MySQL from Python, connecting, executing SQL, and closing the connection all involve network IO. Using asyncio lets the program do other work while waiting for IO, which improves performance.

Installing the Python asynchronous MySQL module

pip3 install aiomysql

Case

import asyncio
import aiomysql
async def execute(host, password):
    print("Start executing", host)
    # Network IO: connect to 77.95.40.197 first; while waiting on IO, switch to connecting to 77.95.40.198
    conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
    # Network IO: switch tasks automatically while waiting
    cur = await conn.cursor()
    # Network IO: switch tasks automatically while waiting
    await cur.execute("SELECT Host,User FROM user")
    # Network IO: switch tasks automatically while waiting
    result = await cur.fetchall()
    print(result)
    # Network IO: switch tasks automatically while waiting
    await cur.close()
    conn.close()
    print("Finished", host)
task_list = [
    execute('77.95.40.197', "123456"),
    execute('77.95.40.198', "123456")
]
async def main():
    # asyncio.wait requires Task objects on Python 3.11+, so wrap each coroutine first
    await asyncio.wait([asyncio.create_task(coro) for coro in task_list])
asyncio.run(main())

Crawler

A crawler has to request its target data over network IO, which makes it a good fit for asynchronous programming. Next we implement one with the aiohttp module, which supports asynchronous programming.

Install the aiohttp module

pip3 install aiohttp

Case

import aiohttp
import asyncio
async def fetch(session, url):
    print(f"Sending request: {url}")
    # ssl=False disables certificate verification (verify_ssl=False is the older, deprecated spelling)
    async with session.get(url, ssl=False) as response:
        text = await response.text()
        print("Got result:", url, len(text))
async def main():
    async with aiohttp.ClientSession() as session:
        # Put the URLs to crawl here
        url_list = ["", "", ""]
        tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
        await asyncio.wait(tasks)
if __name__ == '__main__':
    asyncio.run(main())

Summary

That's all for this post. I hope it was helpful, and I hope you'll check back for more.