asyncio的同步(并发量控制)

参考:《深入理解 asyncio》

Semaphore(信号量)

控制数量 减少无效的请求


import aiohttp
import asyncio
import time
from functools import wraps

# Values substituted into URL; print_result is scheduled once per value.
NUMBERS = range(12)
# Request template; `{}` is filled with the value via str.format.  The JSON
# response is read as data['args']['a'] by fetch_async.
URL = 'http://httpbin.org/get?a={}'
# Semaphore with 6 permits; print_result holds one permit per fetch, which
# caps the number of requests in flight at the same time.
sema = asyncio.Semaphore(6)



def timer(func):
    """Decorate coroutine function *func* so each call reports its duration.

    The wrapper awaits ``func(*args, **kw)``, prints
    ``"<func name> cost: <seconds>"`` measured with ``time.perf_counter``,
    and returns whatever *func* returned.  Nothing is printed when *func*
    raises; the exception propagates unchanged.
    """
    @wraps(func)
    async def inner(*args, **kw):
        start = time.perf_counter()
        # Keep the awaited value: dropping it would make every decorated
        # coroutine evaluate to None for its callers.
        result = await func(*args, **kw)
        print(f'{func.__name__} cost: {time.perf_counter() - start}')
        return result
    return inner


@timer
async def fetch_async(a):
    """GET ``URL`` formatted with *a*, print the decoded JSON body and
    return its ``['args']['a']`` entry.
    """
    target = URL.format(a)
    async with aiohttp.request('GET', target) as response:
        payload = await response.json()
        print(payload)
    args_section = payload['args']
    return args_section['a']

@timer
async def print_result(a):
    """Fetch *a* while holding one permit of the module-level ``sema`` and
    print the value that ``fetch_async`` produced for it.
    """
    async with sema:
        fetched = await fetch_async(a)
        line = 'fetch({}) = {}'.format(a, fetched)
        print(line)


# Run one print_result() per number and wait for all of them.
# The default loop is used on purpose: on Python < 3.10 the module-level
# ``sema`` attaches itself to that loop when it is constructed.
loop = asyncio.get_event_loop()
try:
    # asyncio.wait() no longer accepts bare coroutine objects (TypeError on
    # Python 3.11+), so every coroutine is wrapped in a Task first.
    f = asyncio.wait([loop.create_task(print_result(num)) for num in NUMBERS])
    loop.run_until_complete(f)
finally:
    # Close the loop even when the run is interrupted by an exception.
    loop.close()

Lock 锁

1  await lock.acquire()  # 使用 acquire 加锁
2  loop.call_later(0.1, functools.partial(unlock, lock))  # 通过 call_later 方法添加一个 0.1 秒后释放锁的回调函数
3  async with lock:  # 各协程依次获取锁

import asyncio
import functools

def unlock(lock):
    """Callback: announce the release on stdout, then release *lock*.

    *lock* only needs a ``release()`` method; the caller is expected to
    have acquired it beforehand.
    """
    message = "callback  releasing lock"
    print(message)
    lock.release()


async def test(locker, lock):
    """Acquire and immediately release *lock*, logging each stage.

    *locker* is the label used in the three progress messages
    (waiting / acquired / released).
    """
    print('{} waiting for the lock'.format(locker))
    # Explicit acquire/release pair, equivalent to ``async with lock``.
    await lock.acquire()
    try:
        print("{} acquired lock".format(locker))
    finally:
        lock.release()
    print('{} released lock'.format(locker))

async def main(loop):
    """Show two coroutines queueing for a lock that a callback releases.

    Args:
        loop: the event loop running this coroutine; it schedules the
            delayed release and creates the waiter tasks.
    """
    lock = asyncio.Lock()
    # Take the lock up front so both waiters have to queue for it.
    await lock.acquire()
    # Schedule a plain callback that releases the lock 0.1 s from now.
    loop.call_later(0.1, functools.partial(unlock, lock))
    # asyncio.wait() rejects bare coroutine objects on Python 3.11+, so the
    # waiters are wrapped in Tasks before being handed over.
    waiters = [loop.create_task(test(name, lock)) for name in ('l1', 'l2')]
    await asyncio.wait(waiters)


# Use a fresh loop: asyncio.get_event_loop() without a running loop is
# deprecated, and it would hand back a loop that an earlier snippet in this
# file has already closed.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main(loop))
finally:
    # Always release the loop's resources, even if main() raised.
    loop.close()



Condition(条件)

producer 和 producer2 是异步函数,所以不能使用之前的 call_later 方法,需要用 create_task 把它创建成一个任务(Task)

最后要把任务取消掉

import asyncio
import functools


async def consumer(cond, name, second):
    """Sleep *second* seconds, then wait on *cond* until notified.

    Once woken up, prints that the resource is available, prefixed with
    *name*.
    """
    await asyncio.sleep(second)
    # Explicit acquire/release pair, equivalent to ``async with cond``.
    await cond.acquire()
    try:
        await cond.wait()
        print('{}: Resource is available to consumer'.format(name))
    finally:
        cond.release()


async def producer(cond):
    """After a 2 s delay, notify waiters on *cond* in two rounds.

    Round one calls ``notify(n=1)`` and round two ``notify(n=2)``; each
    round is followed by a 0.1 s pause.
    """
    await asyncio.sleep(2)
    for count in (1, 2):
        await cond.acquire()
        try:
            print('notifying consumer {}'.format(count))
            # notify() wakes waiters a few at a time rather than all at once.
            cond.notify(n=count)
        finally:
            cond.release()
        await asyncio.sleep(0.1)

async def producer2(cond):
    """After a 2 s delay, wake every coroutine waiting on *cond* at once."""
    await asyncio.sleep(2)
    await cond.acquire()
    try:
        print('Making resource available')
        # notify_all() releases all current waiters in a single call.
        cond.notify_all()
    finally:
        cond.release()

async def main(loop):
    """Run the two consumers once against each producer variant.

    Round one uses ``producer`` (step-by-step ``notify``), round two uses
    ``producer2`` (``notify_all``).  Both rounds share one Condition.

    Args:
        loop: the event loop running this coroutine; used to create the
            producer and consumer tasks.
    """
    condition = asyncio.Condition()

    for make_producer in (producer, producer2):
        # The producers are coroutines, so they must run as Tasks; a
        # call_later callback cannot await.
        task = loop.create_task(make_producer(condition))
        # asyncio.wait() rejects bare coroutine objects on Python 3.11+,
        # hence the consumers are wrapped in Tasks as well.
        consumers = [loop.create_task(consumer(condition, name, index))
                     for index, name in enumerate(('c1', 'c2'))]
        await asyncio.wait(consumers)
        # The producer may still be in its trailing sleep; stop it.
        task.cancel()


# Use a fresh loop: asyncio.get_event_loop() without a running loop is
# deprecated, and it would hand back a loop that an earlier snippet in this
# file has already closed.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main(loop))
finally:
    # Always release the loop's resources, even if main() raised.
    loop.close()

Event(事件)

和锁的意思很像,不同的是:事件被触发时,消费者不需要获取锁,就可以尽快地继续执行下去


import asyncio
import functools


def set_event(event):
    """Callback: announce on stdout that the event is being set, then set it.

    *event* only needs a ``set()`` method.
    """
    notice = 'setting event in callback'
    print(notice)
    event.set()


async def test(name, event):
    """Wait until *event* is set, printing a line before and after the wait.

    *name* is the label used in both messages.
    """
    waiting_line = '{} waiting for event'.format(name)
    print(waiting_line)
    await event.wait()
    triggered_line = '{} triggered'.format(name)
    print(triggered_line)


async def main(loop):
    """Show two coroutines being released together by one Event.

    Prints the event state before and after; a callback scheduled on
    *loop* sets the event 0.1 s after the start.

    Args:
        loop: the event loop running this coroutine; used for the delayed
            callback and for creating the waiter tasks.
    """
    event = asyncio.Event()
    print('event start state: {}'.format(event.is_set()))
    loop.call_later(
        0.1, functools.partial(set_event, event)
    )
    # asyncio.wait() rejects bare coroutine objects on Python 3.11+, so the
    # waiters are wrapped in Tasks before being handed over.
    waiters = [loop.create_task(test(name, event)) for name in ('e1', 'e2')]
    await asyncio.wait(waiters)
    print('event end state: {}'.format(event.is_set()))

# Use a fresh loop: asyncio.get_event_loop() without a running loop is
# deprecated, and it would hand back a loop that an earlier snippet in this
# file has already closed.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(main(loop))
finally:
    # Always release the loop's resources, even if main() raised.
    loop.close()

Queue 队列 (优先级队列)

消费者 consumer 按照优先级消耗


import asyncio
import random
import aiohttp

# Seven distinct integers sampled at random from 0..99; produce() enqueues
# each one as both priority and payload.
NUMBERS = random.sample(range(100), 7)
# Request template; `{}` is filled with the number via str.format.  The JSON
# response is read as data['args']['a'] by fetch_async.
URL = 'http://httpbin.org/get?a={}'
# Semaphore with 6 permits; collect_result holds one permit per fetch.
sema = asyncio.Semaphore(6)


async def fetch_async(a):
    """GET ``URL`` formatted with *a* and return the ``['args']['a']`` entry
    of the decoded JSON body.
    """
    target = URL.format(a)
    async with aiohttp.request('GET', target) as response:
        payload = await response.json()
    args_section = payload['args']
    return args_section['a']


async def collect_result(a):
    """Return ``fetch_async(a)``, holding one permit of the module-level
    ``sema`` for the duration of the fetch.
    """
    async with sema:
        fetched = await fetch_async(a)
    return fetched


async def produce(queue):
    """Put one ``(num, num)`` tuple on *queue* for every entry of NUMBERS,
    printing a line before each put.
    """
    for value in NUMBERS:
        print('producing {}'.format(value))
        # The first tuple element is what consume() reads back; with a
        # PriorityQueue the tuple also determines the retrieval order.
        await queue.put((value, value))


async def consume(queue):
    """Process items from *queue* forever, one at a time.

    Each item is a tuple whose first element is passed to
    ``collect_result``; the result is printed.  The coroutine never returns
    on its own and is meant to be cancelled by its owner.

    Every retrieved item is acknowledged with ``task_done()`` even when
    processing fails, so a failed request cannot leave ``queue.join()``
    waiting forever.
    """
    while True:
        item = await queue.get()
        try:
            num = item[0]
            rs = await collect_result(num)
            print('consuming {}...'.format(rs))
        except asyncio.CancelledError:
            # Cancellation must propagate (on Python 3.7 it is an Exception
            # subclass and would otherwise be swallowed below).
            raise
        except Exception as exc:
            # Worker-loop boundary: report the failure and keep serving the
            # remaining items instead of dying silently.
            print('failed to consume {}: {!r}'.format(item, exc))
        finally:
            queue.task_done()


async def run():
    """Feed a PriorityQueue and wait until the consumer has drained it.

    A single background consumer task is started, all items are produced,
    and the coroutine returns once every item has been acknowledged.  The
    consumer is cancelled and awaited on the way out, including when
    producing or joining fails.
    """
    queue = asyncio.PriorityQueue()
    consumer = asyncio.ensure_future(consume(queue))
    try:
        await produce(queue)
        await queue.join()
    finally:
        consumer.cancel()
        # Wait for the cancellation to take effect; return_exceptions=True
        # absorbs the resulting CancelledError instead of re-raising it.
        await asyncio.gather(consumer, return_exceptions=True)


# Use a fresh loop: asyncio.get_event_loop() without a running loop is
# deprecated, and it would hand back a loop that an earlier snippet in this
# file has already closed.
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(run())
finally:
    # Always release the loop's resources, even if run() raised.
    loop.close()

run_in_executor

让同步函数在一个执行器 (executor) 里面运行

run_in_executor 可以把同步函数逻辑转化成一个协程,且实现了并发

import asyncio
import time

def a():
    """Block the calling thread for one second, then return ``'A'``."""
    delay_seconds = 1
    time.sleep(delay_seconds)
    return 'A'


async def b():
    """Suspend for one second without blocking the loop, then return ``'B'``."""
    delay_seconds = 1
    await asyncio.sleep(delay_seconds)
    return 'B'


def show_perf(func):
    """Run coroutine function *func* with ``asyncio.run`` and print its duration.

    A separator line of asterisks is printed first, then
    ``"<func name> Cost: <seconds>"`` once the coroutine has finished.
    """
    separator = '*' * 20
    print(separator)
    began = time.perf_counter()
    asyncio.run(func())
    elapsed = time.perf_counter() - began
    print(f'{func.__name__} Cost: {elapsed}')


async def c1():
    """Run the blocking function ``a`` and the coroutine ``b`` concurrently."""
    running_loop = asyncio.get_running_loop()
    # run_in_executor wraps the plain function ``a`` in an awaitable.  Its
    # first argument is a concurrent.futures.Executor instance; passing None
    # selects the loop's default executor.
    blocking_call = running_loop.run_in_executor(None, a)
    await asyncio.gather(blocking_call, b())

# Time one run of c1, which overlaps the blocking a() with the coroutine b().
show_perf(c1)


run_in_executor 可以用进程池 或者是 线程池

concurrent.futures.ProcessPoolExecutor
concurrent.futures.ThreadPoolExecutor

run_in_executor(executor, func)

async def c3():
    """Run ``a`` in a process pool concurrently with ``b`` and print both results.

    The results are printed as the list returned by ``asyncio.gather``, in
    argument order (``a`` first, then ``b``).
    """
    # Imported here because this snippet never imports concurrent.futures
    # at module level; without it the function fails with NameError.
    import concurrent.futures

    loop = asyncio.get_running_loop()
    # The ``with`` block shuts the pool down once both results are in.
    with concurrent.futures.ProcessPoolExecutor() as e:
        print(await asyncio.gather(
            loop.run_in_executor(e, a),
            b()
        ))

在其他线程中执行协程 (线程安全的)


import asyncio
from threading import Thread
from  functools import partial
import time

async def a():
    """Wait one second, print ``"a in "`` and return ``"a"``.

    The wait uses ``asyncio.sleep`` so the event loop running this
    coroutine stays responsive; a blocking ``time.sleep`` here would stall
    every other callback on that loop for the whole second.
    """
    await asyncio.sleep(1)
    print("a in ")
    return "a"

def start_loop(loop):
    """Thread target: make *loop* the current event loop of the calling
    thread and run it until it is stopped.
    """
    asyncio.set_event_loop(loop)
    # Blocks here until something calls loop.stop().
    loop.run_forever()


def shutdown(loop):
    """Ask *loop* to stop; intended to be scheduled on the loop's own thread."""
    loop.stop()


async def b1():
    """Run coroutine ``a`` on an event loop owned by another thread.

    A new loop is started in a helper thread, ``a()`` is submitted to it
    with ``asyncio.run_coroutine_threadsafe``, and its result is printed.
    The helper loop is always stopped, its thread joined and the loop
    closed, even when fetching the result fails or times out.
    """
    # run_coroutine_threadsafe must be called from a thread other than the
    # one running the target loop, so a dedicated loop is created for the
    # helper thread.
    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=(new_loop,))
    t.start()

    try:
        # Submitting the coroutine returns a concurrent.futures.Future.
        future = asyncio.run_coroutine_threadsafe(a(), new_loop)

        print(future)
        # result(timeout=2) blocks until the value is ready; the timeout must
        # exceed the run time of a().  NOTE: this call blocks the loop that
        # is running b1 while it waits.
        print(f'Result: {future.result(timeout=2)}')
    finally:
        # run_forever() would otherwise keep the non-daemon thread (and the
        # process) alive, so stop the loop from its own thread, then clean up.
        new_loop.call_soon_threadsafe(partial(shutdown, new_loop))
        t.join()
        new_loop.close()


# Entry point of this snippet: run b1() to completion with asyncio.run.
asyncio.run(b1())

Buy me a 肥仔水!