参考–使用Python进行并发编程 使用Python进行并发编程-asyncio篇(二)
核心关键字
Eventloop
+Coroutine
+Future
+Task
Eventloop
asyncio 应用的核心
,是中央总控。Eventloop 实例提供了注册、取消和执行任务和回调的方法
。
把一些异步函数 (就是任务,Task
) 注册到这个事件循环上,事件循环会循环执行这些函数 (但同时只能执行一个),当执行到某个函数时,如果它正在等待 I/O 返回,事件循环会暂停它的执行去执行其他的函数;当某个函数完成 I/O 后会恢复,下次循环到它的时候继续执行。
因此,这些异步函数可以协同 (Cooperative) 运行
:这就是事件循环的目标
asyncio 根据你的操作系统信息会帮你选择默认的事件循环类
在*nix
下使用的类继承于 BaseEventLoop
eventloop的基本实现
# call_exception_handler 和 get_debug 是必须存在
from collections import deque
def done_callback(fut):
fut._loop.stop()
class Loop:
def __init__(self):
self._ready = deque()
self._stopping = False
def create_task(self, coro):
Task = asyncio.tasks.Task
task = Task(coro, loop=self)
return task
def run_until_complete(self, fut):
tasks = asyncio.tasks
# 获取任务
fut = tasks.ensure_future(
fut, loop=self)
# 增加任务到self._ready
fut.add_done_callback(done_callback)
# 跑全部任务
self.run_forever()
# 从self._ready中移除
fut.remove_done_callback(done_callback)
def run_forever(self):
try:
while 1:
self._run_once()
if self._stopping:
break
finally:
self._stopping = False
def call_soon(self, cb, *args):
self._ready.append((cb, args))
def _run_once(self):
ntodo = len(self._ready)
for i in range(ntodo):
t, a = self._ready.popleft()
t(*a)
def stop(self):
self._stopping = True
def close(self):
self._ready.clear()
def call_exception_handler(self, c):
pass
def get_debug(self):
return False
async def foo():
print('Hello Foo')
async def bar():
print('Hello Bar')
loop = Loop()
tasks = [loop.create_task(foo()),
loop.create_task(bar())]
loop.run_until_complete(
asyncio.wait(tasks))
loop.close()
Coroutine
协程 (Coroutine) 本质上是一个函数,特点是在代码块中
可以将执行权交给其他协程`:
# coding: utf-8
import asyncio
async def a():
print("Suspending a")
await asyncio.sleep(0)
# 协程 a 被挂起, sleep 0 并不会真的 sleep(因为时间为 0),但是却可以把控制权交出去
print("Resuming a")
async def b():
print("fn b")
async def main():
await asyncio.gather(a(), b())
# asyncio.gather 用来并发运行任务
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
# asyncio.run(main()) Python 3.7+ 新加的接口
Future
代表了一个「未来」对象
,异步操作结束后会把最终结果设置到这个 Future 对象上
Future 是对协程的封装
,不过日常开发基本是不需要直接用这个底层 Future 类的
future 對象的方法 属性
cancel
cancelled
done
exception
get_loop
remove_done_callback
result
set_exception
set_result
Process
finished
可以对这个 Future 实例添加完成后的回调 (add_done_callback)
、取消任务 (cancel)
、 设置最终结果 (set_result)
、设置异常 (如果有的话,set_exception)
等
future 对象 在 await
之后状态成了 finished
因为用loop.run_in_executor创建的 Future 注册了一个回调(通过asyncio.futures.wrap_future,
加了一个_call_set_state回调
# 一个对象可以被 await 的条件
def __await_(self):
if not self.done():
self._asyncio_future_blocking = True
yield self
if not self.done():
raise RuntimeError("await wasn't used with future")
return self.result()
Task
Eventloop
除了支持协程
,还支持注册 Future
和 Task
类型的对象
-
Future 是协程的封装
Future 对象提供了很多任务方法 (如完成后的回调、取消、设置任务结果等等), 但是开发者并不需要直接操作 Future 这种底层对象
-
Task
Future 的子类
协同的调度协程以实现并发
Task 类用来管理协同程序运行的状态
Task类的基本实现
import asyncio
class Task(asyncio.futures.Future):
def __init__(self, gen, *,loop):
super().__init__(loop=loop)
self._gen = gen
self._loop.call_soon(self._step)
def _step(self, val=None, exc=None):
# 如果_step 方法没有让协程执行完成,就会添加回调,_wakeup 又会继续执行_step... 直到协程程序完成,并 set_result。
# 写个使用它的例子:
try:
if exc:
f = self._gen.throw(exc)
else:
f = self._gen.send(val)
except StopIteration as e:
self.set_result(e.value)
except Exception as e:
self.set_exception(e)
else:
f.add_done_callback(
self._wakeup)
def _wakeup(self, fut):
# _wakeup 又会继续执行_step... 直到协程程序完成,并 set_result
try:
res = fut.result()
except Exception as e:
self._step(None, e)
else:
self._step(res, None)
async def foo():
await asyncio.sleep(2)
print('Hello Foo')
async def bar():
await asyncio.sleep(1)
print('Hello Bar')
loop = asyncio.get_event_loop()
tasks = [Task(foo(), loop=loop),
loop.create_task(bar())]
loop.run_until_complete(
asyncio.wait(tasks))
loop.close()
asyncio 并发
正确的使用方式
create_task
是 AbstractEventLoop 的抽象方法,不同的 loop 可以实现不同的创建 Task 方法,这里用的是 BaseEventLoop 的实现。
ensure_future
是 asyncio 封装好的创建 Task 的函数,它还支持一些参数,甚至指定 loop。
一般应该使用它
import asyncio
import time
async def a():
print('Suspending a')
await asyncio.sleep(3)
print('Resuming a')
async def b():
print('Suspending b')
await asyncio.sleep(1)
print('Resuming b')
# 错误的用法1
async def e1():
await a()
await b()
# 错误的用法2
# 直接 await task 不会对并发有帮助
async def e2():
await asyncio.create_task(a())
await asyncio.create_task(b())
async def c1():
await asyncio.gather(a(), b())
async def c2():
await asyncio.wait((a(), b()))
async def c3():
# asyncio.create_task 相当于把协程封装成 Task
task1 = asyncio.create_task(a())
task2 = asyncio.create_task(b())
await task1
await task2
async def c4():
task1 = asyncio.create_task(a())
await b()
await task1
async def c5():
task = asyncio.ensure_future(b())
await a()
# 挂起 a(),
await task
async def c6():
loop = asyncio.get_event_loop()
task = loop.create_task(b())
await a()
await task
# time.perf_counter 是现在推荐的计算耗时的用法
def show_perf(func):
print('*' * 20)
start = time.perf_counter()
asyncio.run(func())
print(f'{func.__name__} Cost: {time.perf_counter() - start}')
if __name__ == '__main__':
show_perf(c4)
不同并发请求方式的响应时间比较
requests
+ThreadPoolExecutor
2s左右
import time
import requests
from concurrent.futures import ThreadPoolExecutor
NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'
def fetch(a):
r = requests.get(URL.format(a))
return r.json()['args']['a']
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as executor:
for num, result in zip(NUMBERS, executor.map(fetch, NUMBERS)):
print('fetch({}) = {}'.format(num, result))
print(f'Use requests+ThreadPoolExecutor cost: {time.perf_counter() - start}')
asyncio
+aiohttp
0.5s左右
import aiohttp
import asyncio
import time
NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'
async def fetch_async(a):
async with aiohttp.request("GET", URL.format(a)) as r:
data = await r.json()
# 使用 await 关键字进行协程切换
# r.json 方法会等待 I/O(也就是正在做一个网络请求),这种就可以切换去做其他的时候,之后再切换回来
return data
start = time.perf_counter()
event_loop = asyncio.get_event_loop()
tasks = [fetch_async(num) for num in NUMBERS]
results = event_loop.run_until_complete(asyncio.gather(*tasks))
for num, result in zip(NUMBERS, results):
print(f"fetch{num} = {result}")
print(f'cost {time.perf_counter() - start}')
C10K
以及 I/O 多路復用
1 多进程/ 多线程 消耗过大
进程 / 线程作为处理单元还是太厚重
系统调度的代价太高
2 用同一进程 / 线程来同时处理若干连接
「用同一进程 / 线程来同时处理若干连接」
I/O 多路复用
select
每个连接对应一个描述符(socket)
,循环处理各个连接,先查下它的状态,
ready 了就进行处理,不 ready 就不进行处理
缺點
单个进程能够监视的文件描述符的数量存在最大限制
对 socket 进行扫描时是线性扫描,即采用轮询的方法,效率较低。
需要维护一个用来存放大量的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大
poll
基于链表
来存储的,没有最大连接数的限制
缺點
大量的的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。
poll 的特点是「水平触发 (只要有数据可以读,不管怎样都会通知)」,如果报告后没有被处理,那么下次 poll 时会再次报告它。
epoll
使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的 copy 只需一次
epoll 支持水平触发和边缘触发,最大的特点在于「边缘触发」,它只告诉进程哪些刚刚变为就绪态,并且只会通知一次
没有最大并发连接的限制,能打开的 fd 的上限远大于 1024(1G 的内存上能监听约 10 万个端口)
效率提升,不是轮询的方式,不会随着 fd 数目的增加效率下降
内存拷贝,利用 mmap () 文件映射内存加速与内核空间的消息传递;即 epoll 使用 mmap 减少复制开销
epoll 成为 C10K killer、高并发、高性能、异步非阻塞这些技术的代名词
FreeBSD 推出了 kqueue,
Linux 推出了 epoll,
Windows 推出了 IOCP,
Solaris 推出了 /dev/poll
epoll 技术的编程模型就是异步非阻塞回调
,也可以叫做 Reactor 事件驱动、事件轮循(EventLoop)
libevent、Tornado、Node.js 这些就是 epoll 时代的产物
协程
试图用一组少量的线程来实现多个任务,一旦某个任务阻塞,则可能用同一线程继续运行其他任务
,避免大量上下文的切换
本质上也是异步非阻塞技术
# 实例
import time
import threading
import asyncio
import aiomysql
import numpy
import pandas as pd
from datetime import datetime
import os, sys
import django
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')
django.setup()
from django.conf import settings
from os import path, makedirs, remove
from libs.model.account import MAccount
from libs.serializer.accounts import SAccountListItem, SAccountListItemSpendOnly, SAccountListItemDaily, SAccountListItemDailySpendOnly
DATABASE_SETTING = settings.DATABASES['default']
f_str = '%H:%M:%S'
class AccountDataMail():
"""
账户数据邮件
"""
CHUNK_SIZE = 100
# CHUNK_SIZE = 2
loop = None
pool = None
source_data = {}
def __init__(self, info):
self.user = info['user_id']
data = info['data']
self.level = data['level']
if self.level == 0:
self.date_range = data['date_range']
else:
self.month = data['month']
self.spend_only = data['spend_only']
try:
self.queryset = MAccount.objects.raw(data.get('queryset_sql'))
except:
self.queryset = MAccount.objects.all().filter(active=True)
self.create_thread()
def create_thread(self):
print('主线程 ID:{}'.format(threading.currentThread().ident))
self.loop = asyncio.new_event_loop()
self.thread_handler = threading.Thread(target=self.init_ioloop, args=(self.loop, ))
self.thread_handler.start()
def init_ioloop(self, lp: asyncio.AbstractEventLoop):
print('初始化 Loop,并启动子线程,ID 为:{}'.format(threading.currentThread().ident))
asyncio.set_event_loop(lp)
async def init_mysql_pool(self):
mysql_settings = {
'host': DATABASE_SETTING['HOST'],
'port': int(DATABASE_SETTING['PORT']),
'db': DATABASE_SETTING['NAME'],
'user': DATABASE_SETTING['USER'],
'password': DATABASE_SETTING['PASSWORD'],
'charset': DATABASE_SETTING['OPTIONS']['charset'],
'maxsize': 100
}
print('初始化连接池')
return await aiomysql.create_pool(**mysql_settings)
async def close_mysql(self):
self.pool.close()
await self.pool.wait_closed()
def mail(self):
file_name = 'test.xlsx'
total = len(self.queryset)
chunks = round(total / self.CHUNK_SIZE)
print(total)
self.loop.run_until_complete(self.get_source_data(chunks))
data_list = []
print(datetime.now().strftime(f_str), '开始处理数据')
for i in range(chunks):
data_list = numpy.append(data_list, self.source_data.pop(i))
df = pd.DataFrame.from_records(data_list)
props = self.get_table_field_props()
df = df[props.keys()]
dsl = 'daily_spend_list'
tds = 'total_daily_spend'
if dsl in df.columns:
cdata = df[dsl].apply(pd.Series).stack().reset_index(level=-1, drop=True)
cdf = cdata.to_frame()[0].apply(pd.Series)
ldf = cdf.pivot(values='system_value', columns='date')
total = df.pop(tds)
df = df.drop([dsl,], axis=1)
df = pd.concat([df, ldf], axis=1)
df.insert(len(df.columns), tds, total)
print(datetime.now().strftime(f_str), '开始写文件')
if path.exists(file_name):
remove(file_name)
df.rename(columns=props).set_index(props['account_id']).to_excel(file_name)
print(datetime.now().strftime(f_str), '写文件完毕')
async def get_source_data(self, chunks):
print(datetime.now().strftime(f_str), '获取源数据')
self.pool = await self.init_mysql_pool()
takes = [self.get_chunk(i) for i in range(chunks)]
await asyncio.gather(*takes)
async def get_chunk(self, index):
sql = self.queryset.query
query_sql = '{} LIMIT {}, {}'.format(sql, index * self.CHUNK_SIZE, self.CHUNK_SIZE)
arr = []
async with self.pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(query_sql)
row = await cur.fetchone()
print(row)
while row:
arr.append(MAccount(**row))
row = await cur.fetchone()
try:
print(datetime.now().strftime(f_str), '获取到源数据块【{}】'.format(index))
serializer = self.get_serializer(arr)
self.source_data[index] = serializer.data
print(datetime.now().strftime(f_str), '处理源数据块【{}】成功'.format(index))
except Exception as e:
print(e)
def get_serializer(self, instance=None):
kwargs = {
'instance': instance or self.queryset,
'many': True,
'context': {}
}
if self.level == 0:
kwargs['context']['date_range'] = self.date_range
if self.spend_only:
return SAccountListItemDailySpendOnly(**kwargs)
else:
return SAccountListItemDaily(**kwargs)
else:
kwargs['context']['month'] = self.month
if self.spend_only:
return SAccountListItemSpendOnly(**kwargs)
else:
return SAccountListItem(**kwargs)
def get_table_field_props(self):
serializer = SAccountListItem
if self.level == 0:
if self.spend_only:
serializer = SAccountListItemDailySpendOnly
else:
serializer = SAccountListItemDaily
else:
if self.spend_only:
serializer = SAccountListItemSpendOnly
return serializer.TABLE_FIELDS_PROPS