asyncio的基本概念

参考–使用Python进行并发编程 使用Python进行并发编程-asyncio篇(二)

核心关键字 Eventloop + Coroutine + Future + Task

Eventloop

asyncio 应用的核心,是中央总控。Eventloop 实例提供了注册、取消和执行任务和回调的方法

把一些异步函数 (就是任务,Task) 注册到这个事件循环上,事件循环会循环执行这些函数 (但同时只能执行一个),当执行到某个函数时,如果它正在等待 I/O 返回,事件循环会暂停它的执行去执行其他的函数;当某个函数完成 I/O 后会恢复,下次循环到它的时候继续执行。

因此,这些异步函数可以协同 (Cooperative) 运行:这就是事件循环的目标

asyncio 根据你的操作系统信息会帮你选择默认的事件循环类

*nix 下使用的类继承于 BaseEventLoop

eventloop的基本实现


#  call_exception_handler 和 get_debug 是必须存在 

from collections import deque


def done_callback(fut):
    fut._loop.stop()


class Loop:
    def __init__(self):
        self._ready = deque()
        self._stopping = False

    def create_task(self, coro):
        Task = asyncio.tasks.Task
        task = Task(coro, loop=self)
        return task

    def run_until_complete(self, fut):
        tasks = asyncio.tasks
        # 获取任务
        fut = tasks.ensure_future(
                    fut, loop=self)
        # 增加任务到self._ready
        fut.add_done_callback(done_callback)
        # 跑全部任务
        self.run_forever()
        # 从self._ready中移除
        fut.remove_done_callback(done_callback)

    def run_forever(self):
        try:
            while 1:
                self._run_once()
                if self._stopping:
                    break
        finally:
            self._stopping = False

    def call_soon(self, cb, *args):
        self._ready.append((cb, args))

    def _run_once(self):
        ntodo = len(self._ready)
        for i in range(ntodo):
            t, a = self._ready.popleft()
            t(*a)

    def stop(self):
        self._stopping = True

    def close(self):
        self._ready.clear()

    def call_exception_handler(self, c):
        pass

    def get_debug(self):
        return False



async def foo():
    print('Hello Foo')


async def bar():
    print('Hello Bar')

loop = Loop()
tasks = [loop.create_task(foo()),
         loop.create_task(bar())]
loop.run_until_complete(
        asyncio.wait(tasks))
loop.close()

Coroutine

协程 (Coroutine) 本质上是一个函数,特点是在代码块中可以将执行权交给其他协程`:


# coding: utf-8

import asyncio


async def a():
    print("Suspending a")
    await asyncio.sleep(0)
    # 协程 a 被挂起,  sleep 0 并不会真的 sleep(因为时间为 0),但是却可以把控制权交出去
    print("Resuming a")


async def b():
    print("fn b")


async def main():
    await asyncio.gather(a(), b())
    # asyncio.gather 用来并发运行任务


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

    # asyncio.run(main())   Python 3.7+ 新加的接口

Future

代表了一个「未来」对象,异步操作结束后会把最终结果设置到这个 Future 对象上

Future 是对协程的封装,不过日常开发基本是不需要直接用这个底层 Future 类的

future 對象的方法 属性

    cancel
    cancelled
    done
    exception
    get_loop
    remove_done_callback
    result
    set_exception
    set_result
    Process
    finished

可以对这个 Future 实例添加完成后的回调 (add_done_callback)取消任务 (cancel)、 设置最终结果 (set_result)设置异常 (如果有的话,set_exception)

future 对象 在 await 之后状态成了 finished

因为用loop.run_in_executor创建的 Future 注册了一个回调(通过asyncio.futures.wrap_future,
加了一个_call_set_state回调
# 一个对象可以被 await 的条件

def __await_(self):
    if not self.done():
        self._asyncio_future_blocking = True
        yield self
    if not self.done():
        raise RuntimeError("await wasn't used with future")
    return self.result()

Task

Eventloop 除了支持协程,还支持注册 FutureTask 类型的对象

  • Future 是协程的封装

    Future 对象提供了很多任务方法 (如完成后的回调、取消、设置任务结果等等), 但是开发者并不需要直接操作 Future 这种底层对象

  • Task Future 的子类

    协同的调度协程以实现并发

Task 类用来管理协同程序运行的状态

Task类的基本实现

import asyncio


class Task(asyncio.futures.Future):
    def __init__(self, gen, *,loop):
        super().__init__(loop=loop)
        self._gen = gen
        self._loop.call_soon(self._step)

    def _step(self, val=None, exc=None):
        # 如果_step 方法没有让协程执行完成,就会添加回调,_wakeup 又会继续执行_step... 直到协程程序完成,并 set_result。
        # 写个使用它的例子:
        try:
            if exc:
                f = self._gen.throw(exc)
            else:
                f = self._gen.send(val)
        except StopIteration as e:
            self.set_result(e.value)
        except Exception as e:
            self.set_exception(e)
        else:
            f.add_done_callback(
                 self._wakeup)

    def _wakeup(self, fut):
        # _wakeup 又会继续执行_step... 直到协程程序完成,并 set_result
        try:
            res = fut.result()
        except Exception as e:
            self._step(None, e)
        else:
            self._step(res, None)
            



async def foo():
    await asyncio.sleep(2)
    print('Hello Foo')


async def bar():
    await asyncio.sleep(1)
    print('Hello Bar')


loop = asyncio.get_event_loop()
tasks = [Task(foo(), loop=loop),
         loop.create_task(bar())]
loop.run_until_complete(
        asyncio.wait(tasks))
loop.close()

asyncio 并发 正确的使用方式

create_task 是 AbstractEventLoop 的抽象方法,不同的 loop 可以实现不同的创建 Task 方法,这里用的是 BaseEventLoop 的实现。

ensure_future 是 asyncio 封装好的创建 Task 的函数,它还支持一些参数,甚至指定 loop。 一般应该使用它


import asyncio
import time

async def a():
    print('Suspending a')
    await asyncio.sleep(3)
    print('Resuming a')


async def b():
    print('Suspending b')
    await asyncio.sleep(1)
    print('Resuming b')

# 错误的用法1
async def e1():
    await a()
    await b()

# 错误的用法2
# 直接 await task 不会对并发有帮助
async def e2():
    await asyncio.create_task(a())
    await asyncio.create_task(b())



async def c1():
    await asyncio.gather(a(), b())


async def c2():
    await asyncio.wait((a(), b()))


async def c3():
    # asyncio.create_task 相当于把协程封装成 Task
    task1 = asyncio.create_task(a())
    task2 = asyncio.create_task(b())
    await task1
    await task2


async def c4():
    task1 = asyncio.create_task(a())
    await b()
    await task1

async def c5():
    task = asyncio.ensure_future(b())
    await a()
    # 挂起 a(), 
    await task


async def c6():
    loop = asyncio.get_event_loop()
    task = loop.create_task(b())
    await a()
    await task


#  time.perf_counter  是现在推荐的计算耗时的用法

def show_perf(func):
    print('*' * 20)
    start = time.perf_counter()
    asyncio.run(func())
    print(f'{func.__name__} Cost: {time.perf_counter() - start}')


if __name__ == '__main__':
    show_perf(c4)

不同并发请求方式的响应时间比较

requests + ThreadPoolExecutor

2s左右

import time
import requests
from concurrent.futures import ThreadPoolExecutor

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'

def fetch(a):
    r = requests.get(URL.format(a))
    return r.json()['args']['a']

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as executor:
    for num, result in zip(NUMBERS, executor.map(fetch, NUMBERS)):
        print('fetch({}) = {}'.format(num, result))

print(f'Use requests+ThreadPoolExecutor cost: {time.perf_counter() - start}')

asyncio + aiohttp

0.5s左右



import aiohttp
import asyncio
import time

NUMBERS = range(12)
URL = 'http://httpbin.org/get?a={}'

async def fetch_async(a):
    async with aiohttp.request("GET", URL.format(a)) as r:
        data = await r.json()
        
        # 使用 await 关键字进行协程切换  
        #  r.json 方法会等待 I/O(也就是正在做一个网络请求),这种就可以切换去做其他的时候,之后再切换回来
    return data

start = time.perf_counter()
event_loop = asyncio.get_event_loop()
tasks = [fetch_async(num) for num in NUMBERS]
results = event_loop.run_until_complete(asyncio.gather(*tasks))

for num, result in zip(NUMBERS, results):
    print(f"fetch{num} = {result}")


print(f'cost {time.perf_counter() - start}')

C10K 以及 I/O 多路復用

1 多进程/ 多线程 消耗过大
    进程 / 线程作为处理单元还是太厚重
    系统调度的代价太高

2 用同一进程 / 线程来同时处理若干连接

「用同一进程 / 线程来同时处理若干连接」 I/O 多路复用

  • select

每个连接对应一个描述符(socket),循环处理各个连接,先查下它的状态, ready 了就进行处理,不 ready 就不进行处理

缺點

    单个进程能够监视的文件描述符的数量存在最大限制
    
    对 socket 进行扫描时是线性扫描,即采用轮询的方法,效率较低。
    
    需要维护一个用来存放大量的数据结构,这样会使得用户空间和内核空间在传递该结构时复制开销大
  • poll

基于链表来存储的,没有最大连接数的限制

缺點

    大量的的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义。
    
    poll 的特点是「水平触发 (只要有数据可以读,不管怎样都会通知)」,如果报告后没有被处理,那么下次 poll 时会再次报告它。
  • epoll

使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的 copy 只需一次

epoll 支持水平触发和边缘触发,最大的特点在于「边缘触发」,它只告诉进程哪些刚刚变为就绪态,并且只会通知一次

没有最大并发连接的限制,能打开的 fd 的上限远大于 1024(1G 的内存上能监听约 10 万个端口)

效率提升,不是轮询的方式,不会随着 fd 数目的增加效率下降

内存拷贝,利用 mmap () 文件映射内存加速与内核空间的消息传递;即 epoll 使用 mmap 减少复制开销

epoll 成为 C10K killer、高并发、高性能、异步非阻塞这些技术的代名词

FreeBSD 推出了 kqueue,
Linux 推出了 epoll,
Windows 推出了 IOCP,
Solaris 推出了 /dev/poll

epoll 技术的编程模型就是异步非阻塞回调,也可以叫做 Reactor 事件驱动、事件轮循(EventLoop)

libevent、Tornado、Node.js 这些就是 epoll 时代的产物

协程

试图用一组少量的线程来实现多个任务,一旦某个任务阻塞,则可能用同一线程继续运行其他任务,避免大量上下文的切换

本质上也是异步非阻塞技术

# 实例
import time
import threading
import asyncio
import aiomysql
import numpy
import pandas as pd
from datetime import datetime
import os, sys
import django


BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')
django.setup()

from django.conf import settings
from os import path, makedirs, remove
from libs.model.account import MAccount
from libs.serializer.accounts import SAccountListItem, SAccountListItemSpendOnly, SAccountListItemDaily, SAccountListItemDailySpendOnly



DATABASE_SETTING = settings.DATABASES['default']

f_str = '%H:%M:%S'


class AccountDataMail():
    """
    账户数据邮件
    """
    CHUNK_SIZE = 100
    # CHUNK_SIZE = 2
    loop = None
    pool = None
    source_data = {}

    def __init__(self, info):
        self.user = info['user_id']
        data = info['data']
        self.level = data['level']
        if self.level == 0:
            self.date_range = data['date_range']
        else:
            self.month = data['month']
        self.spend_only = data['spend_only']
        try:
            self.queryset = MAccount.objects.raw(data.get('queryset_sql'))
        except:
            self.queryset = MAccount.objects.all().filter(active=True)
        self.create_thread()

    def create_thread(self):
        print('主线程 ID:{}'.format(threading.currentThread().ident))
        self.loop = asyncio.new_event_loop()
        self.thread_handler = threading.Thread(target=self.init_ioloop, args=(self.loop, ))
        self.thread_handler.start()

    def init_ioloop(self, lp: asyncio.AbstractEventLoop):
        print('初始化 Loop,并启动子线程,ID 为:{}'.format(threading.currentThread().ident))
        asyncio.set_event_loop(lp)

    async def init_mysql_pool(self):
        mysql_settings = {
            'host': DATABASE_SETTING['HOST'],
            'port': int(DATABASE_SETTING['PORT']),
            'db': DATABASE_SETTING['NAME'],
            'user': DATABASE_SETTING['USER'],
            'password': DATABASE_SETTING['PASSWORD'],
            'charset': DATABASE_SETTING['OPTIONS']['charset'],
            'maxsize': 100
        }
        print('初始化连接池')
        return await aiomysql.create_pool(**mysql_settings)

    async def close_mysql(self):
        self.pool.close()
        await self.pool.wait_closed()

    def mail(self):
        file_name = 'test.xlsx'
        total = len(self.queryset)
        chunks = round(total / self.CHUNK_SIZE)
        print(total)
        self.loop.run_until_complete(self.get_source_data(chunks))
        data_list = []
        print(datetime.now().strftime(f_str), '开始处理数据')
        for i in range(chunks):
            data_list = numpy.append(data_list, self.source_data.pop(i))
        df = pd.DataFrame.from_records(data_list)
        props = self.get_table_field_props()
        df = df[props.keys()]
        dsl = 'daily_spend_list'
        tds = 'total_daily_spend'
        if dsl in df.columns:
            cdata = df[dsl].apply(pd.Series).stack().reset_index(level=-1, drop=True)
            cdf = cdata.to_frame()[0].apply(pd.Series)
            ldf = cdf.pivot(values='system_value', columns='date')
            total = df.pop(tds)
            df = df.drop([dsl,], axis=1)
            df = pd.concat([df, ldf], axis=1)
            df.insert(len(df.columns), tds, total)
        print(datetime.now().strftime(f_str), '开始写文件')
        if path.exists(file_name):
            remove(file_name)
        df.rename(columns=props).set_index(props['account_id']).to_excel(file_name)
        print(datetime.now().strftime(f_str), '写文件完毕')

    async def get_source_data(self, chunks):
        print(datetime.now().strftime(f_str), '获取源数据')
        self.pool = await self.init_mysql_pool()
        takes = [self.get_chunk(i) for i in range(chunks)]
        await asyncio.gather(*takes)

    async def get_chunk(self, index):
        sql = self.queryset.query
        query_sql = '{} LIMIT {}, {}'.format(sql, index * self.CHUNK_SIZE, self.CHUNK_SIZE)
        arr = []
        async with self.pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(query_sql)
                row = await cur.fetchone()
                print(row)
                while row:
                    arr.append(MAccount(**row))
                    row = await cur.fetchone()
        try:
            print(datetime.now().strftime(f_str), '获取到源数据块【{}】'.format(index))
            serializer = self.get_serializer(arr)
            self.source_data[index] = serializer.data
            print(datetime.now().strftime(f_str), '处理源数据块【{}】成功'.format(index))
        except Exception as e:
            print(e)

    def get_serializer(self, instance=None):
        kwargs = {
            'instance': instance or self.queryset,
            'many': True,
            'context': {}
        }
        if self.level == 0:
            kwargs['context']['date_range'] = self.date_range
            if self.spend_only:
                return SAccountListItemDailySpendOnly(**kwargs)
            else:
                return SAccountListItemDaily(**kwargs)
        else:
            kwargs['context']['month'] = self.month
            if self.spend_only:
                return SAccountListItemSpendOnly(**kwargs)
            else:
                return SAccountListItem(**kwargs)

    def get_table_field_props(self):
        serializer = SAccountListItem
        if self.level == 0:
            if self.spend_only:
                serializer = SAccountListItemDailySpendOnly
            else:
                serializer = SAccountListItemDaily
        else:
            if self.spend_only:
                serializer = SAccountListItemSpendOnly
        return serializer.TABLE_FIELDS_PROPS

Buy me a 肥仔水!