python异步协程使用
基本概念
阻塞
阻塞状态
指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续干别的事情,则称该程序在该操作上是阻塞的。
常见的阻塞形式有:网络 I/O 阻塞、磁盘 I/O 阻塞、用户输入阻塞等。
阻塞是无处不在的,包括 CPU 切换上下文时,所有的进程都无法真正干事情,它们也会被阻塞。
如果是多核 CPU 则正在执行上下文切换操作的核不可被利用。
非阻塞
程序在等待某操作过程中,自身不被阻塞,可以继续运行干别的事情,则称该程序在该操作上是非阻塞
的。
非阻塞并不是在任何程序级别、任何情况下都可以存在的。
仅当程序封装的级别可以囊括独立的子程序单元时,它才可能存在非阻塞状态。
非阻塞的存在是因为阻塞存在,正因为某个操作阻塞导致的耗时与效率低下,我们才要把它变成非阻塞的。
同步
不同程序单元为了完成某个任务,在执行过程中需靠某种通信方式以协调一致,称这些程序单元是同步执行的。
例如购物系统中更新商品库存,需要用“行锁”作为通信信号,让不同的更新请求强制排队顺序执行,那更新库存的操作是同步的。
同步意味着有序
异步
为完成某个任务,不同程序单元之间过程中无需通信协调,也能完成任务的方式,不相关的程序单元之间可以是异步的。
例如,爬虫下载网页。调度程序调用下载程序后,即可调度其他任务,而无需与该下载任务保持通信以协调行为。
不同网页的下载、保存等操作都是无关的,也无需相互通知协调。这些异步操作的完成时刻并不确定。
异步意味着无序。
多进程
多进程就是利用 CPU 的多核优势,在同一时间并行地执行多个任务,可以大大提高执行效率。
协程
协程,英文叫做 Coroutine,又称微线程,纤程,协程是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。
协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。
因此协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入时,就相当于进入上一次调用的状态。
协程本质上是个单进程,协程相对于多进程来说,无需线程上下文切换的开销,无需原子操作锁定及同步的开销,编程模型也非常简单。
Python 3.4 开始,Python 中加入了协程的概念,但这个版本的协程还是以生成器对象为基础的
Python 3.5 则增加了 async/await,使得协程的实现更加方便。
python 异步协程用法
event_loop
:事件循环,相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件发生的时候,就会调用对应的处理方法。
coroutine
:中文翻译叫协程,在 Python 中常指代为协程对象类型,我们可以将协程对象注册到时间循环中,它会被事件循环调用。我们可以使用 async 关键字来定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。
task
:任务,它是对协程对象的进一步封装,包含了任务的各个状态。
future
:代表将来执行或没有执行的任务的结果,实际上和 task 没有本质区别。
async
定义一个协程
await
用来挂起阻塞方法的执行。
async 定义的方法就会变成一个无法直接执行的 coroutine 对象
,必须将其注册到事件循环中才可以执行
# 创建 coroutine 对象
import asyncio
async def execute(x):
print('Number:', x)
coroutine = execute(1) # 无法直接执行的 coroutine 对象
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop() # 定义了 loop 对象
loop.run_until_complete(coroutine) # 注册到事件循环中
print('After calling loop')
将 coroutine 对象
转化为了 task 对象
# 创建 task 对象
import asyncio
async def execute(x):
print('Number:', x)
return x
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task) # 状态是 pending Task: <Task pending coro=<execute() running at demo.py:4>
loop.run_until_complete(task)
print('Task:', task) # 状态是done <Task finished coro=<execute() done, defined at demo.py:4> result=1>
print('After calling loop')
直接通过 asyncio 的 ensure_future() 方法 创建 task 对象, 不需要 loop创建
import asyncio
async def execute(x):
print('Number:', x)
return x
coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')
task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')
给task 对象绑定回调函数
import asyncio
import requests
async def request():
url = 'https://www.baidu.com'
status = requests.get(url)
return status
def callback(task):
print('Status:', task.result()) # 调用 task 对象的 result() 方法 获取 task 函数的返回值
coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback) # task 执行完毕之后就可以调用 callback() 方法
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
当然不用回调方法,直接在 loop.run_until_complete(task)
后, task 运行完毕之后也可以直接调用 result() 方法获取结果
同时执行多任务协程
使用 asyncio.wait(task_list)
loop.run_until_complete(asyncio.wait(tasks))
支持异步请求的库 aiohttp
与 asyncio
配合使用 可以非常方便地实现异步请求操作
客户端请求 多任务协程
import asyncio
import aiohttp
import requests
import time
start = time.time()
async def get(url):
session = aiohttp.ClientSession()
response = await session.get(url)
result = await response.text()
await session.close()
return result
async def request():
url = "http://127.0.0.1:5000/"
print("Waiting for ", url)
response = await get(url)
print("get response fromm ", url, "result", response)
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time is ', end - start )
服务端
from flask import Flask
import time
app = Flask(__name__)
@app.route('/')
def index():
time.sleep(3)
return "Hello!"
if __name__ == '__main__':
app.run(threaded=True) # 支持多线程
遇到阻塞式操作
时,任务被挂起
,程序接着去执行其他的任务,
可以充分利用 CPU 时间,而不必把时间浪费在等待 IO 上
前提
服务器无限抗压
服务器在同一时刻接受无限次请求都能保证正常返回结果,也就是服务器无限抗压
IO 时延
还要忽略 IO 传输时延,确实可以做到无限 task 一起执行且在预想时间内得到结果。
同时使用 多进程和多线程 aiomultiprocess
库 需要 python3.6+
import asyncio
import aiohttp
import time
from aiomultiprocess import Pool
start = time.time()
async def get(url):
session = aiohttp.ClientSession()
response = await session.get(url)
result = await response.text()
session.close()
return result
async def request():
url = 'http://127.0.0.1:5000'
urls = [url for _ in range(100)]
async with Pool() as pool:
result = await pool.map(get, urls)
return result
coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
end = time.time()
print('Cost time:', end - start)