python异步协程使用


python异步协程使用


基本概念

阻塞

阻塞状态指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续干别的事情,则称该程序在该操作上是阻塞的。

常见的阻塞形式有:网络 I/O 阻塞、磁盘 I/O 阻塞、用户输入阻塞等。

阻塞是无处不在的,包括 CPU 切换上下文时,所有的进程都无法真正干事情,它们也会被阻塞。
如果是多核 CPU 则正在执行上下文切换操作的核不可被利用。

非阻塞

程序在等待某操作过程中,自身不被阻塞,可以继续运行干别的事情,则称该程序在该操作上是非阻塞的。

非阻塞并不是在任何程序级别、任何情况下都可以存在的。
仅当程序封装的级别可以囊括独立的子程序单元时,它才可能存在非阻塞状态。

非阻塞的存在是因为阻塞存在,正因为某个操作阻塞导致的耗时与效率低下,我们才要把它变成非阻塞的。

同步

不同程序单元为了完成某个任务,在执行过程中需靠某种通信方式以协调一致,称这些程序单元是同步执行的。

例如购物系统中更新商品库存,需要用“行锁”作为通信信号,让不同的更新请求强制排队顺序执行,那更新库存的操作是同步的。

同步意味着有序

异步

为完成某个任务,不同程序单元之间过程中无需通信协调,也能完成任务的方式,不相关的程序单元之间可以是异步的。

例如,爬虫下载网页。调度程序调用下载程序后,即可调度其他任务,而无需与该下载任务保持通信以协调行为。

不同网页的下载、保存等操作都是无关的,也无需相互通知协调。这些异步操作的完成时刻并不确定。

异步意味着无序。

多进程

多进程就是利用 CPU 的多核优势,在同一时间并行地执行多个任务,可以大大提高执行效率。

协程

协程,英文叫做 Coroutine,又称微线程,纤程,协程是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。

协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。

因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入时,就相当于进入上一次调用的状态。

协程本质上是个单进程,协程相对于多进程来说,无需线程上下文切换的开销,无需原子操作锁定及同步的开销,编程模型也非常简单。


Python 3.4 开始,Python 中加入了协程的概念,但这个版本的协程还是以生成器对象为基础的

Python 3.5 则增加了 async/await,使得协程的实现更加方便。


python 异步协程用法

event_loop:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件发生的时候,就会调用对应的处理方法。

coroutine:中文翻译叫协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到时间循环中,它会被事件循环调用。我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。

task:任务,它是对协程对象的进一步封装,包含了任务的各个状态。

future:代表将来执行或没有执行的任务的结果,实际上和 task 没有本质区别。

async 定义一个协程

await 用来挂起阻塞方法的执行。


async 定义的方法就会变成一个无法直接执行的 coroutine 对象,必须将其注册到事件循环中才可以执行


# 创建 coroutine 对象

import asyncio

async def execute(x):
    print('Number:', x)

coroutine = execute(1)   # 无法直接执行的 coroutine 对象
print('Coroutine:', coroutine)
print('After calling execute')

loop = asyncio.get_event_loop()   # 定义了 loop 对象
loop.run_until_complete(coroutine)  #  注册到事件循环中
print('After calling loop')

coroutine 对象转化为了 task 对象


# 创建 task 对象

import asyncio
 
async def execute(x):
    print('Number:', x)
    return x
 
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
 
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task)     # 状态是 pending Task: <Task pending coro=<execute() running at demo.py:4>
loop.run_until_complete(task)
print('Task:', task)     # 状态是done  <Task finished coro=<execute() done, defined at demo.py:4> result=1>
print('After calling loop')


直接通过 asyncio  ensure_future() 方法 创建 task 对象 不需要 loop创建


import asyncio
 
async def execute(x):
    print('Number:', x)
    return x
 
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
 
task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')


给task 对象绑定回调函数

import asyncio
import requests
 
async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url)
    return status
 
def callback(task):
    print('Status:', task.result())  # 调用 task 对象的 result() 方法 获取 task 函数的返回值
 
coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)  # task 执行完毕之后就可以调用 callback() 方法
print('Task:', task) 
 
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)

当然不用回调方法,直接在 loop.run_until_complete(task) 后, task 运行完毕之后也可以直接调用 result() 方法获取结果


同时执行多任务协程 使用 asyncio.wait(task_list)

loop.run_until_complete(asyncio.wait(tasks))


支持异步请求的库 aiohttp

asyncio 配合使用 可以非常方便地实现异步请求操作

客户端请求 多任务协程

import asyncio
import aiohttp
import requests
import time

start = time.time()

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    await session.close()
    return result

async def request():
    url = "http://127.0.0.1:5000/"
    print("Waiting for ", url)
    response = await get(url)
    print("get response fromm ", url, "result", response)

tasks = [asyncio.ensure_future(request()) for _ in range(5)]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()

print('Cost time is ', end - start )


服务端

from flask import Flask

import time

app = Flask(__name__)
@app.route('/')
def index():
    time.sleep(3)
    return "Hello!"

if __name__ == '__main__':
    app.run(threaded=True)  # 支持多线程


遇到阻塞式操作时,任务被挂起,程序接着去执行其他的任务,

可以充分利用 CPU 时间,而不必把时间浪费在等待 IO 上

前提

服务器无限抗压 
    
    服务器在同一时刻接受无限次请求都能保证正常返回结果,也就是服务器无限抗压

 IO 时延
 
    还要忽略 IO 传输时延,确实可以做到无限 task 一起执行且在预想时间内得到结果。


同时使用 多进程和多线程 aiomultiprocess库 需要 python3.6+


import asyncio
import aiohttp
import time
from aiomultiprocess import Pool

start = time.time()

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    session.close()
    return result

async def request():
    url = 'http://127.0.0.1:5000'
    urls = [url for _ in range(100)]
    async with Pool() as pool:
        result = await pool.map(get, urls)
        return result

coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)

end = time.time()
print('Cost time:', end - start)

Buy me a 肥仔水!