基本内容
gevent
是基于libev
的并发库。它为各种并发
和网络相关任务
提供了一个干净的API
Greenlets
gevent
中使用的主要模式是Greenlet
,这是作为C扩展模块
提供给Python的轻量协程
。
Greenlets
都在主程序的OS进程内运行
,但是是协同安排
的
Only one greenlet is ever running at any given time.
The primary pattern used in gevent is the Greenlet, a lightweight coroutine provided to Python as a C extension module.
Greenlets all run inside of the OS process for the main program but are scheduled cooperatively
与由multiprocessing
或threading
提供的并行性构造不同,
它们执行spin processes
和POSIX线程
(真正由操作系统调度
,是真正并行的)
This differs from any of the real parallelism constructs provided by multiprocessing or threading libraries
which do spin processes and POSIX threads which are scheduled by the operating system and are truly parallel.
同步与异步执行
并发
的核心思想
是可以将一个较大的任务分解为一组子任务
,这些子任务计划同时或异步运行,而不是一次或同步运行。
两个子任务之间的切换称为上下文切换
在GEVENT上下文切换是通过yielding
得到
实例:
import gevent
def foo():
print('Running in foo')
gevent.sleep(0) # 调用
print('Explicit context switch to foo again')
def bar():
print('Explicit context to bar')
gevent.sleep(0) # 调用
print('Implicit context switch back to bar')
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])
# gevent.spawn(foo).join()
# gevent.spawn(bar).join()
# Running in foo
# Explicit context to bar
# Explicit context switch to foo again
# Implicit context switch back to bar
当将gevent
用于可协作调度
的网络
和IO绑定功能
时,gevent的真正力量就来了。
Gevent
可以确保您的网络库在任何可能的情况下都隐式地
产生其 greenlet上下文(greenlet contexts )
。
select()
function is normally a blocking call that polls on various file descriptors.
select()函数
轮询各种文件描述符的阻塞调用
例子
import time
import gevent
from gevent import select
start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)
def gr1():
# Busy waits for a second, but we don't want to stick around...
print('Started Polling: %s' % tic())
select.select([], [], [], 2)
print('Ended Polling: %s' % tic())
def gr2():
# Busy waits for a second, but we don't want to stick around...
print('Started Polling: %s' % tic())
select.select([], [], [], 2)
print('Ended Polling: %s' % tic())
def gr3():
print("Hey lets do some stuff while the greenlets poll, %s" % tic())
gevent.sleep(1)
gevent.joinall([
gevent.spawn(gr1),
gevent.spawn(gr2),
gevent.spawn(gr3),
])
Started Polling: at 0.0 seconds
Started Polling: at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at 0.0 seconds
Ended Polling: at 2.0 seconds
Ended Polling: at 2.0 seconds
gevent.spawn
将函数包装在Greenlet线程
中
例子:
# 同步 vs 异步
import gevent
import random
def task(pid):
"""
Some non-deterministic task
"""
gevent.sleep(random.randint(0,2)*0.001)
print('Task %s done' % pid)
def synchronous():
for i in range(1,10):
task(i)
def asynchronous():
threads = [gevent.spawn(task, i) for i in xrange(10)]
# gevent.spawn 将给定功能包装在Greenlet线程中
# 初始化的greenlets列表存储在数组中threads,该数组传递给gevent.joinall函数
gevent.joinall(threads)
# 该函数阻止当前程序运行所有给定的greenlets。
# 仅当所有greenlet终止时,执行才会向前执行
print('Synchronous:')
synchronous()
print('Asynchronous:')
asynchronous()
# Synchronous:
# Task 1 done
# Task 2 done
# Task 3 done
# Task 4 done
# Task 5 done
# Task 6 done
# Task 7 done
# Task 8 done
# Task 9 done
# Asynchronous:
# Task 1 done
# Task 5 done
# Task 6 done
# Task 2 done
# Task 4 done
# Task 7 done
# Task 8 done
# Task 9 done
# Task 0 done
# Task 3 done
网络请求的例子: (服务器异步获取数据)
from gevent import monkey
monkey.patch_all()
import gevent
import requests
import json
def fetch(pid):
response = requests.get('https://www.python.org/')
result = response.text[:200]
print('Process %s: %s' % (pid, result))
return result
def synchronous():
for i in range(1,10):
fetch(i)
def asynchronous():
threads = []
for i in range(1,10):
threads.append(gevent.spawn(fetch, i))
gevent.joinall(threads)
print('Synchronous:')
synchronous()
print('Asynchronous:')
asynchronous()
greenlets
是确定性的 deterministic
给定相同的greenlet配置
和相同的输入
,它们始终会产生相同的输出
import time
def echo(i):
time.sleep(0.001)
return i
# Non Deterministic Process Pool
from multiprocessing.pool import Pool
p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]
print(run1 == run2 == run3 == run4)
# False
# Deterministic Gevent Pool
from gevent.pool import Pool
p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]
print(run1 == run2 == run3 == run4)
# True
并发相关的问题
race condition
即使gevent
通常是确定性
的,但当开始与套接字
和文件之类
的外部服务进行交互时,
不确定性
的来源仍会渗入您的程序中
两个并发线程/进程依赖于某个共享资源但还试图修改该值时,就会发生竞争状态。
这导致资源的值随时间而变,取决于执行顺序
最好的方法是始终避免所有全局状态
Greenlets 的初始化包装 Spawning Greenlets
import gevent
from gevent import Greenlet
def foo(message, n):
"""
Each thread will be passed the message, and n arguments
in its initialization.
"""
gevent.sleep(n)
print(message)
# Initialize a new Greenlet instance running the named function
# foo
thread1 = Greenlet.spawn(foo, "Hello", 1)
# Wrapper for creating and running a new Greenlet from the named
# function foo, with the passed arguments
thread2 = gevent.spawn(foo, "I live!", 2)
# Lambda expressions
thread3 = gevent.spawn(lambda x: (x+1), 2)
threads = [thread1, thread2, thread3]
# Block until all threads complete.
gevent.joinall(threads)
类的方式初始化
Greenlet
的_run
方法 (类似thread)
import gevent
from gevent import Greenlet
class MyGreenlet(Greenlet):
def __init__(self, message, n):
Greenlet.__init__(self)
self.message = message
self.n = n
def _run(self):
print(self.message)
gevent.sleep(self.n)
g = MyGreenlet("Hi there!", 3)
g.start()
g.join()
Greenlet State 状态
started -- Boolean, indicates whether the Greenlet has been started
ready() -- Boolean, indicates whether the Greenlet has halted
successful() -- Boolean, indicates whether the Greenlet has halted and not thrown an exception
value -- arbitrary, the value returned by the Greenlet
exception -- exception, uncaught exception instance thrown inside the greenlet
import gevent
def win():
return 'You win!'
def fail():
raise Exception('You fail at failing.')
winner = gevent.spawn(win)
loser = gevent.spawn(fail)
print(winner.started) # True
print(loser.started) # True
# Exceptions raised in the Greenlet, stay inside the Greenlet.
try:
gevent.joinall([winner, loser])
except Exception as e:
print('This will never be reached')
print(winner.value) # 'You win!'
print(loser.value) # None
print(winner.ready()) # True
print(loser.ready()) # True
print(winner.successful()) # True
print(loser.successful()) # False
# The exception raised in fail, will not propagate outside the
# greenlet. A stack trace will be printed to stdout but it
# will not unwind the stack of the parent.
print(loser.exception)
# It is possible though to raise the exception again outside
# raise loser.exception
# or with
# loser.get()
程序关闭
主程序上侦听SIGQUIT
事件,并gevent.shutdown
在退出前调用
import gevent
import signal
def run_forever():
gevent.sleep(1000)
if __name__ == '__main__':
gevent.signal(signal.SIGQUIT, gevent.kill)
thread = gevent.spawn(run_forever)
thread.join()
超时处理
import gevent
from gevent import Timeout
seconds = 10
timeout = Timeout(seconds)
timeout.start()
def wait():
gevent.sleep(10)
try:
gevent.spawn(wait).join()
except Timeout:
print('Could not complete')
# 上下文处理
import gevent
from gevent import Timeout
time_to_wait = 5 # seconds
class TooLong(Exception):
pass
with Timeout(time_to_wait, TooLong):
gevent.sleep(10)
# (3)
# --
import gevent
from gevent import Timeout
def wait():
gevent.sleep(2)
timer = Timeout(1).start()
thread1 = gevent.spawn(wait)
try:
thread1.join(timeout=timer)
except Timeout:
print('Thread 1 timed out')
# --
timer = Timeout.start_new(1)
thread2 = gevent.spawn(wait)
try:
thread2.get(timeout=timer)
except Timeout:
print('Thread 2 timed out')
# --
try:
gevent.with_timeout(1, wait)
except Timeout:
print('Thread 3 timed out')
` Monkey patching`
把标准库中的thread/socket
等给替换掉(grpc不行)
Python的运行时允许在运行时修改大多数对象,包括模块,类甚至函数
在极端情况下,如果一个库需要更改Python本身的基本行为
,则可以使用猴子补丁
gevent.monkey.patch_all() ,其作用是把标准库中的thread/socket等给替换掉。这样我们在后面使用socket的时候可以跟平常一样使用,无需修改任何代码,但是它变成非阻塞的了
gevent.monkey.patch_all()
数据结构
Events
事件Events
是Greenlets之间异步通信的一种形式
import gevent
from gevent.event import Event
'''
Illustrates the use of events
'''
evt = Event()
def setter():
'''After 3 seconds, wake all threads waiting on the value of evt'''
print('A: Hey wait for me, I have to do something')
gevent.sleep(3)
print("Ok, I'm done")
evt.set()
def waiter():
'''After 3 seconds the get call will unblock'''
print("I'll wait for you")
evt.wait() # blocking
print("It's about time")
def main():
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter)
])
if __name__ == '__main__': main()
Event对象
的扩展
是AsyncResult
,它允许您将值与唤醒调用一起发送。
send a value along with the wakeup call
有时将其称为“将来 future ”
或“递延 deferred”
,它引用了可以在任意时间表上设置的将来值
import gevent
from gevent.event import AsyncResult
a = AsyncResult()
def setter():
"""
After 3 seconds set the result of a.
"""
print(f"sleep 3 seconds")
gevent.sleep(3)
print("set hello start")
a.set('Hello!')
print("setted ")
def waiter():
"""
After 3 seconds the get call will unblock after the setter
puts a value into the AsyncResult.
"""
print("get hello")
print(a.get())
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
])
Queues
队列是按顺序排列的数据集,它们具有通常的put / get
操作,但以可以在Greenlets
上安全操作的方式编写
。
如果一个Greenlet
从队列
中获取了一个项目,则同一项目不会被同时执行的另一个Greenlet
获取
import gevent
from gevent.queue import Queue
tasks = Queue()
def worker(n):
while not tasks.empty():
task = tasks.get()
print('Worker %s got task %s' % (n, task))
gevent.sleep(0)
print('Quitting time!')
def boss():
for i in xrange(1,25):
tasks.put_nowait(i)
gevent.spawn(boss).join()
gevent.joinall([
gevent.spawn(worker, 'steve'),
gevent.spawn(worker, 'john'),
gevent.spawn(worker, 'nancy'),
])
每个put
和get
操作都有一个非阻塞的对应方法
, put_nowait
和get_nowait
它们不会阻塞,但如果无法执行操作,则引发gevent.queue.Empty
或gevent.queue.Full
的异常
设置队列的大小
import gevent
from gevent.queue import Queue, Empty
tasks = Queue(maxsize=3) # 队列进行了限制,防止其包含三个以上的元素
def worker(name):
try:
while True:
task = tasks.get(timeout=1) # 在该时间范围内找不到任何 task, 队列会抛出gevent.queue.Empty异常, 并且退出
print('Worker %s got task %s' % (name, task))
gevent.sleep(0)
except Empty:
print('Quitting time!')
def boss():
"""
Boss will wait to hand out work until a individual worker is
free since the maxsize of the task queue is 3.
"""
for i in xrange(1,10):
tasks.put(i)
print('Assigned all work in iteration 1')
for i in xrange(10,20):
tasks.put(i)
print('Assigned all work in iteration 2')
gevent.joinall([
gevent.spawn(boss),
gevent.spawn(worker, 'steve'),
gevent.spawn(worker, 'john'),
gevent.spawn(worker, 'bob'),
])
Groups and Pools
Groups
是运行中的Greenlet的集合
,这些Greenlet
一起作为一组
进行管理和调度
(1)
import gevent
def talk(msg):
for _ in range(3):
print(msg)
g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')
gevent.joinall(
[g1, g2, g3]
)
(2) 用group管理 异步任务
import gevent
from gevent.pool import Group
def talk(msg):
for _ in range(3):
print(msg)
g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')
group = Group()
group.add(g1)
group.add(g2)
group.add(g3)
group.join()
Group
还提供了API,用于以各种方式将任务分发到分组的greenlet
, 并收集结果 result
import gevent
from gevent import getcurrent
from gevent.pool import Group
group = Group()
def hello_from(n):
print('Size of group %s' % len(group))
print('Hello from Greenlet %s' % id(getcurrent()))
group.map(hello_from, range(3))
# Size of group 3
# Hello from Greenlet 139670428683336
# Size of group 3
# Hello from Greenlet 139670428683592
# Size of group 3
# Hello from Greenlet 139670428683848
def intensive(n):
gevent.sleep(3 - n)
return 'task', n
print('Ordered')
ogroup = Group()
for i in ogroup.imap(intensive, range(3)):
print(i)
# Ordered
# ('task', 0)
# ('task', 1)
# ('task', 2)
print('Unordered')
igroup = Group()
for i in igroup.imap_unordered(intensive, range(3)):
print(i)
# Unordered
# ('task', 2)
# ('task', 1)
# ('task', 0)
` Pool
是一种设计用于处理,
限制并发的动态数量的
greenlet的结构`。
并行执行许多网络或IO绑定任务
import gevent
from gevent.pool import Pool
pool = Pool(2)
def hello_from(n):
print('Size of pool %s' % len(pool))
pool.map(hello_from, xrange(3))
套接字上轮询
的类 的实例
from gevent.pool import Pool
class SocketPool(object):
def __init__(self):
self.pool = Pool(1000)
self.pool.start() # 启动 pool
def listen(self, socket):
while True:
socket.recv()
def add_handler(self, socket):
if self.pool.full():
raise Exception("At maximum pool size")
else:
self.pool.spawn(self.listen, socket)
# =增加不同的 socket 套接字
def shutdown(self):
self.pool.kill() # 关闭 pool
Locks 锁 and Semaphores 信号量
信号量
信号量 Semaphores
是一种更低级别的同步方法
,协调
并限制greenlet并发访问或执行
。
信号量
公开两种方法,acquire
and release
信号量的界限
获取和释放信号量被获取和释放的次数之间的差称为信号量的界限
。
The difference between the number of times a semaphore has been acquired and released is called
the bound of the semaphore
如果信号量绑定
达到0,它将阻塞,直到另一个greenlet
释放其捕获。
from gevent import sleep
from gevent.pool import Pool
from gevent.lock import BoundedSemaphore
sem = BoundedSemaphore(2)
def worker1(n):
sem.acquire()
print('Worker %i acquired semaphore' % n)
sleep(0)
sem.release()
print('Worker %i released semaphore' % n)
def worker2(n):
with sem:
print('Worker %i acquired semaphore' % n)
sleep(0)
print('Worker %i released semaphore' % n)
pool = Pool()
pool.map(worker1, xrange(0,2))
pool.map(worker2, xrange(3,6))
Lock
锁 (信号量绑定范围为1的信号量)
范围为1的信号量
称为锁 Lock
。
它为一个greenlet
提供独占执行
。
通常用于确保在程序上下文中
仅一次使用的资源。
resources are only in use at one time in the context of a program
Thread Locals
Gevent
还允许您指定Greenlet上下文本地的数据
。
在内部,实现为以greenlet
的getcurrent()值
为键
的私有名称空间
的 全局查找
import gevent
from gevent.local import local
stash = local()
def f1():
stash.x = 1
print(stash.x)
def f2():
stash.y = 2
print(stash.y)
try:
stash.x
except AttributeError:
print("x is not local to f2")
g1 = gevent.spawn(f1)
g2 = gevent.spawn(f2)
gevent.joinall([g1, g2])
# 1
# 2
# x is not local to f2
许多使用gevent的Web框架
将HTTP会话对象 session
存储在gevent线程本地变量内
。
例如,使用Werkzeug
实用程序库及其代理对象,我们可以创建Flask风格的请求对象
# Flask的系统比本示例要复杂一些,但是使用 线程本地变量 作为 本地会话存储 的想法仍然相同。
from gevent.local import local
from werkzeug.local import LocalProxy
from werkzeug.wrappers import Request
from contextlib import contextmanager
from gevent.wsgi import WSGIServer
_requests = local()
request = LocalProxy(lambda: _requests.request)
@contextmanager
def sessionmanager(environ):
_requests.request = Request(environ)
yield
_requests.request = None
def logic():
return "Hello " + request.remote_addr
def application(environ, start_response):
status = '200 OK'
with sessionmanager(environ):
body = logic()
headers = [
('Content-Type', 'text/html')
]
start_response(status, headers)
return [body]
WSGIServer(('', 8000), application).serve_forever()
Subprocess 子流程
从gevent 1.0
开始,已添加 gevent.subprocess
-Python的subprocess模块
的修补版本。
支持在子流程上的协同等待
import gevent
from gevent.subprocess import Popen, PIPE
def cron():
while True:
print("cron")
gevent.sleep(0.2)
g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print(out.rstrip())
由于基于multiprocessing.Connection
的对象(例如Pipe
)公开其底层文件描述符
,
因此gevent.socket.wait_read
和wait_write
可用于在实际读取/写入之前, 协同等待
就绪/就绪事件
cooperatively wait for ready-to-read/ready-to-write events before actually reading/writing
import gevent
from multiprocessing import Process, Pipe
from gevent.socket import wait_read, wait_write
# To Process
a, b = Pipe()
# From Process
c, d = Pipe()
def relay():
for i in xrange(10):
msg = b.recv()
c.send(msg + " in " + str(i))
def put_msg():
for i in xrange(10):
wait_write(a.fileno())
a.send('hi')
def get_msg():
for i in xrange(10):
wait_read(d.fileno())
print(d.recv())
if __name__ == '__main__':
proc = Process(target=relay)
proc.start()
g1 = gevent.spawn(get_msg)
g2 = gevent.spawn(put_msg)
gevent.joinall([g1, g2], timeout=1)
Actor模型
Actor模型
是由Erlang语言
推广的高级并发模型
一个独立的Actor集合
,这些Actor具有一个收件箱
,从中 可以接收其他Actor的消息
Actor内部的主循环
遍历其消息并根据所需的行为采取措施。
Gevent
没有原始的Actor类型,但是我们可以使用子类Greenlet
内部的Queue
非常简单地定义一个
import gevent
from gevent.queue import Queue
class Actor(gevent.Greenlet):
def __init__(self):
self.inbox = Queue()
Greenlet.__init__(self)
def receive(self, message):
"""
Define in your subclass.
"""
raise NotImplemented()
def _run(self):
self.running = True
while self.running:
message = self.inbox.get()
self.receive(message)
import gevent
from gevent.queue import Queue
from gevent import Greenlet
class Pinger(Actor):
def receive(self, message):
print(message)
pong.inbox.put('ping')
gevent.sleep(0)
class Ponger(Actor):
def receive(self, message):
print(message)
ping.inbox.put('pong')
gevent.sleep(0)
ping = Pinger()
pong = Ponger()
ping.start()
pong.start()
ping.inbox.put('start')
gevent.joinall([ping, pong])
实例运用
Gevent ZeroMQ
Simple Servers
WSGI Servers
Streaming Servers
Long Polling 长轮询
Websockets
Chat Server