Semaphore(信号量)
控制并发数量,减少无效的请求
import aiohttp
import asyncio
import time
from functools import wraps
# Query values; one request is issued per value.
NUMBERS = range(12)
# Request URL template; ``{}`` is filled with the query value.
URL = 'http://httpbin.org/get?a={}'
# Semaphore with six slots, held around each fetch to cap concurrency.
sema = asyncio.Semaphore(6)
def timer(func):
    """Decorate coroutine function *func* to print how long each call takes.

    The wrapper awaits ``func(*args, **kw)``, prints
    ``"<name> cost: <seconds>"`` measured with ``time.perf_counter()``, and
    returns whatever *func* returned.

    Args:
        func: The coroutine function to time.

    Returns:
        A coroutine function with the same name and signature behaviour.
    """
    @wraps(func)
    async def inner(*args, **kw):
        start = time.perf_counter()
        # Keep the awaited value: dropping it made every decorated coroutine
        # evaluate to None for its callers.
        result = await func(*args, **kw)
        print(f'{func.__name__} cost: {time.perf_counter() - start}')
        return result
    return inner
@timer
async def fetch_async(a):
    """Request ``URL`` formatted with *a* and return ``['args']['a']``.

    The decoded JSON body is printed before the value is returned.
    """
    target = URL.format(a)
    async with aiohttp.request('GET', target) as response:
        payload = await response.json()
        print(payload)
        echoed = payload['args']['a']
        return echoed
@timer
async def print_result(a):
    """Fetch the result for *a* while holding ``sema`` and print it."""
    async with sema:
        fetched = await fetch_async(a)
        message = 'fetch({}) = {}'.format(a, fetched)
        print(message)
# ``sema`` is created at import time, so keep using the default loop (older
# Python versions may already have associated the semaphore with it).
loop = asyncio.get_event_loop()
try:
    # asyncio.wait() only accepts tasks/futures; bare coroutine objects are
    # rejected since Python 3.11, so schedule each one on the loop first.
    tasks = [loop.create_task(print_result(num)) for num in NUMBERS]
    f = asyncio.wait(tasks)
    loop.run_until_complete(f)
finally:
    # Release the loop's resources even if the run is interrupted.
    loop.close()
Lock 锁
1 await lock.acquire() # 使用 acquire 加锁
2 loop.call_later(0.1, functools.partial(unlock, lock)) 通过 call_later 方法添加一个 0.1 秒后释放锁的函数
3 async with lock: 依次获取锁
import asyncio
import functools
def unlock(lock):
    """Announce the release on stdout, then release *lock*.

    This is a plain (non-async) function, so it can be handed to the event
    loop as a callback.
    """
    print("callback releasing lock")
    lock.release()
async def test(locker, lock):
    """Wait for *lock*, hold it for one print, and log each step for *locker*."""
    print('{} waiting for the lock'.format(locker))
    # Explicit acquire/release pair, equivalent to ``async with lock``.
    await lock.acquire()
    try:
        print("{} acquired lock".format(locker))
    finally:
        lock.release()
    print('{} released lock'.format(locker))
async def main(loop):
    """Show two waiters queueing on a lock that a timed callback releases.

    The lock is acquired here first, ``unlock`` is scheduled on *loop* to run
    0.1 seconds later, and two ``test`` coroutines then wait for the lock.

    Args:
        loop: The running event loop, used to schedule the callback and the
            waiter tasks.
    """
    lock = asyncio.Lock()
    # Take the lock up front so both waiters have to block.
    await lock.acquire()
    # Schedule a callback that releases the lock 0.1 seconds from now.
    loop.call_later(0.1, functools.partial(unlock, lock))
    # asyncio.wait() requires tasks/futures; bare coroutine objects are
    # rejected since Python 3.11.
    waiters = [loop.create_task(test(name, lock)) for name in ('l1', 'l2')]
    await asyncio.wait(waiters)
# Use a dedicated loop: asyncio.get_event_loop() is deprecated outside a
# running loop and can hand back a loop that has already been closed.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main(loop))
finally:
    # Close the loop even when main() raises.
    loop.close()
Condition(条件)
producer 和 producer2 是异步的函数,所以不能使用之前 call_later 方法,需要用 create_task 把它创建成一个任务(Task)
最后要把任务取消掉
import asyncio
import functools
async def consumer(cond, name, second):
    """Sleep *second* seconds, wait to be notified on *cond*, then report.

    The report line is printed while the condition's lock is still held.
    """
    await asyncio.sleep(second)
    async with cond:
        await cond.wait()
        announcement = '{}: Resource is available to consumer'.format(name)
        print(announcement)
async def producer(cond):
    """After a 2 second delay, notify waiters on *cond* in two rounds.

    Round ``n`` (1, then 2) calls ``cond.notify(n=n)`` and is followed by a
    0.1 second pause.
    """
    await asyncio.sleep(2)
    for batch in (1, 2):
        async with cond:
            print('notifying consumer {}'.format(batch))
            # notify() wakes at most ``n`` waiting consumers per call.
            cond.notify(n=batch)
        await asyncio.sleep(0.1)
async def producer2(cond):
    """After a 2 second delay, wake every consumer waiting on *cond*."""
    delay = 2
    await asyncio.sleep(delay)
    async with cond:
        print('Making resource available')
        # notify_all() wakes all waiting consumers in a single call.
        cond.notify_all()
async def main(loop):
    """Run consumers ``c1`` and ``c2`` once against each producer variant.

    For ``producer`` and then ``producer2``: the producer is started as a
    task on *loop*, both consumers are awaited, and the producer task is
    cancelled afterwards.

    Args:
        loop: The running event loop used to create the tasks.
    """
    condition = asyncio.Condition()
    for make_producer in (producer, producer2):
        # Producers are coroutines, so they are scheduled as tasks rather
        # than through call_later.
        task = loop.create_task(make_producer(condition))
        # asyncio.wait() requires tasks/futures; bare coroutine objects are
        # rejected since Python 3.11.
        consumers = [loop.create_task(consumer(condition, name, index))
                     for index, name in enumerate(('c1', 'c2'))]
        await asyncio.wait(consumers)
        # Cancel the producer task once both consumers have finished.
        task.cancel()
# Use a dedicated loop: asyncio.get_event_loop() is deprecated outside a
# running loop and can hand back a loop that has already been closed.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main(loop))
finally:
    # Close the loop even when main() raises.
    loop.close()
Event(事件)
和锁很像,不同的是:事件被触发后,所有等待的消费者不需要获取锁,就可以立即继续执行
import asyncio
import functools
def set_event(event):
    """Announce on stdout that the event is being set, then set *event*.

    This is a plain (non-async) function, so it can be handed to the event
    loop as a callback.
    """
    print('setting event in callback')
    event.set()
async def test(name, event):
    """Block until *event* is set, logging before and after for *name*."""
    waiting_line = '{} waiting for event'.format(name)
    print(waiting_line)
    await event.wait()
    triggered_line = '{} triggered'.format(name)
    print(triggered_line)
async def main(loop):
    """Show two waiters released together when a timed callback sets an event.

    Prints the event state before and after; ``set_event`` is scheduled on
    *loop* to run 0.1 seconds after the start.

    Args:
        loop: The running event loop, used to schedule the callback and the
            waiter tasks.
    """
    event = asyncio.Event()
    print('event start state: {}'.format(event.is_set()))
    loop.call_later(
        0.1, functools.partial(set_event, event)
    )
    # asyncio.wait() requires tasks/futures; bare coroutine objects are
    # rejected since Python 3.11.
    waiters = [loop.create_task(test(name, event)) for name in ('e1', 'e2')]
    await asyncio.wait(waiters)
    print('event end state: {}'.format(event.is_set()))
# Use a dedicated loop: asyncio.get_event_loop() is deprecated outside a
# running loop and can hand back a loop that has already been closed.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main(loop))
finally:
    # Close the loop even when main() raises.
    loop.close()
Queue 队列 (优先级队列)
消费者 consumer 按照优先级消耗
import asyncio
import random
import aiohttp
# Seven distinct values sampled at random from 0..99.
NUMBERS = random.sample(range(100), 7)
# Request URL template; ``{}`` is filled with the query value.
URL = 'http://httpbin.org/get?a={}'
# Semaphore with six slots, held around each fetch to cap concurrency.
sema = asyncio.Semaphore(6)
async def fetch_async(a):
    """Request ``URL`` formatted with *a* and return ``['args']['a']``."""
    target = URL.format(a)
    async with aiohttp.request('GET', target) as response:
        payload = await response.json()
        return payload['args']['a']
async def collect_result(a):
    """Return ``fetch_async(a)``, fetched while holding a slot of ``sema``."""
    async with sema:
        fetched = await fetch_async(a)
        return fetched
async def produce(queue):
    """Put a ``(num, num)`` tuple on *queue* for every value in ``NUMBERS``.

    Each value is announced on stdout before it is queued.
    """
    for value in NUMBERS:
        print('producing {}'.format(value))
        await queue.put((value, value))
async def consume(queue):
    """Take items from *queue* forever and process each one.

    For every item, its first element is passed to ``collect_result`` and
    the outcome is printed. ``queue.task_done()`` is called once per item
    whether processing succeeds or fails, so ``queue.join()`` cannot hang on
    a failed item. The coroutine ends only when it is cancelled.

    Args:
        queue: Queue whose items are indexable; ``item[0]`` is the value to
            fetch.
    """
    while True:
        item = await queue.get()
        try:
            num = item[0]
            rs = await collect_result(num)
            print('consuming {}...'.format(rs))
        except asyncio.CancelledError:
            # Never swallow cancellation.
            raise
        except Exception as exc:
            # Report the failure and keep consuming; letting it escape would
            # end the only consumer and leave the remaining items unprocessed.
            print('failed to consume {}: {!r}'.format(item, exc))
        finally:
            # Balance every get() so queue.join() can complete.
            queue.task_done()
async def run():
    """Queue every item, wait until all are processed, then stop the consumer.

    A priority queue is used, so the consumer receives the smallest queued
    item first.
    """
    queue = asyncio.PriorityQueue()
    # Start the consumer in the background before producing anything.
    worker = asyncio.ensure_future(consume(queue))
    await produce(queue)
    # Returns once task_done() has been called for every queued item.
    await queue.join()
    worker.cancel()
# ``sema`` is created at import time, so keep using the default loop (older
# Python versions may already have associated the semaphore with it).
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(run())
finally:
    # Close the loop even when run() raises.
    loop.close()
run_in_executor
让同步函数在一个执行器 (executor) 里面运行
run_in_executor
可以把同步函数逻辑转化成一个协程,且实现了并发
import asyncio
import time
def a():
    """Block the calling thread for one second, then return ``'A'``."""
    time.sleep(1)
    return 'A'
async def b():
    """Sleep for one second without blocking the loop, then return ``'B'``."""
    await asyncio.sleep(1)
    return 'B'
def show_perf(func):
    """Run coroutine function *func* with ``asyncio.run`` and print its duration.

    A separator line of 20 asterisks is printed first, then
    ``"<name> Cost: <seconds>"`` once the coroutine has finished.
    """
    separator = '*' * 20
    print(separator)
    began = time.perf_counter()
    asyncio.run(func())
    print(f'{func.__name__} Cost: {time.perf_counter() - began}')
async def c1():
    """Run blocking ``a`` in the default executor alongside coroutine ``b``."""
    loop = asyncio.get_running_loop()
    # run_in_executor wraps the plain function ``a`` in an awaitable. Its
    # first argument is a concurrent.futures.Executor instance; passing None
    # selects the default executor.
    blocking_call = loop.run_in_executor(None, a)
    await asyncio.gather(blocking_call, b())
# Time c1, which runs the blocking a() and the coroutine b() together.
show_perf(c1)
run_in_executor
可以用进程池 或者是 线程池
concurrent.futures.ProcessPoolExecutor
concurrent.futures.ThreadPoolExecutor
run_in_executor(executor, func)
async def c3():
    """Run blocking ``a`` in a process pool concurrently with coroutine ``b``.

    Prints the list of both results collected by ``asyncio.gather``.
    """
    # Imported locally: without this import the name ``concurrent`` is
    # undefined here and the call fails with NameError.
    import concurrent.futures

    loop = asyncio.get_running_loop()
    # The ``with`` block shuts the pool down once both results are in.
    with concurrent.futures.ProcessPoolExecutor() as e:
        print(await asyncio.gather(
            loop.run_in_executor(e, a),
            b()
        ))
在其他线程中执行协程 (线程安全的)
import asyncio
from threading import Thread
from functools import partial
import time
async def a():
    """Wait one second, print ``"a in "``, and return ``"a"``.

    The wait uses ``asyncio.sleep`` so the event loop running this coroutine
    stays responsive; a blocking ``time.sleep`` would stall every other
    callback on that loop for the whole second.
    """
    await asyncio.sleep(1)
    print("a in ")
    return "a"
def start_loop(loop):
    """Make *loop* the current event loop of this thread and run it.

    The call returns only after the loop has been stopped.
    """
    asyncio.set_event_loop(loop)
    loop.run_forever()
def shutdown(loop):
    """Ask *loop* to stop; meant to run as a callback inside that loop."""
    loop.stop()
async def b1():
    """Run coroutine ``a`` on an event loop owned by a helper thread.

    A new loop is started in a separate thread, ``a()`` is submitted to it
    with ``asyncio.run_coroutine_threadsafe``, and the pending future and its
    result are printed. The helper loop is always stopped, its thread joined
    and the loop closed, even if waiting for the result fails.

    Raises:
        concurrent.futures.TimeoutError: If ``a()`` does not finish within
            two seconds.
    """
    # run_coroutine_threadsafe must be called from a thread other than the
    # one running the target loop, so a separate loop is created for it.
    new_loop = asyncio.new_event_loop()
    t = Thread(target=start_loop, args=(new_loop,))
    t.start()
    try:
        # Submitting the coroutine returns a concurrent.futures.Future.
        future = asyncio.run_coroutine_threadsafe(a(), new_loop)
        print(future)
        # The timeout has to be longer than the time a() needs to finish.
        print(f'Result: {future.result(timeout=2)}')
    finally:
        # run_forever() would keep the thread (and the program) alive, so
        # ask the loop to stop via the thread-safe scheduling call.
        new_loop.call_soon_threadsafe(partial(shutdown, new_loop))
        # Wait until the thread has left run_forever() before closing.
        t.join()
        new_loop.close()
# Entry point: run b1 to completion with asyncio.run.
asyncio.run(b1())