gevent概述

» gevent-tutorial

基本内容

gevent是基于libev的并发库。它为各种并发网络相关任务提供了一个干净的API

Greenlets

gevent中使用的主要模式是Greenlet,这是作为C扩展模块提供给Python的轻量协程

Greenlets都在主程序的OS进程内运行,但是是协同安排

Only one greenlet is ever running at any given time.

The primary pattern used in gevent is the Greenlet, a lightweight coroutine provided to Python as a C extension module.
Greenlets all run inside of the OS process for the main program but are scheduled cooperatively

与由multiprocessingthreading提供的并行性构造不同, 它们执行spin processesPOSIX线程(真正由操作系统调度,是真正并行的)

This differs from any of the real parallelism constructs provided by multiprocessing or threading libraries 
which do spin processes and POSIX threads which are scheduled by the operating system and are truly parallel.

同步与异步执行

并发核心思想是可以将一个较大的任务分解为一组子任务,这些子任务计划同时或异步运行,而不是一次或同步运行。

两个子任务之间的切换称为上下文切换

在GEVENT上下文切换是通过yielding得到

实例:

import gevent

def foo():
    print('Running in foo')
    gevent.sleep(0)     # 调用
    print('Explicit context switch to foo again')

def bar():
    print('Explicit context to bar')
    gevent.sleep(0)   # 调用
    print('Implicit context switch back to bar')

gevent.joinall([
    gevent.spawn(foo),
    gevent.spawn(bar),
])

#  gevent.spawn(foo).join()
#  gevent.spawn(bar).join()



# Running in foo
# Explicit context to bar
# Explicit context switch to foo again
# Implicit context switch back to bar

当将gevent 用于可协作调度网络IO绑定功能时,gevent的真正力量就来了。

Gevent可以确保您的网络库在任何可能的情况下都隐式地产生其 greenlet上下文(greenlet contexts )

select() function is normally a blocking call that polls on various file descriptors.

select()函数 轮询各种文件描述符的阻塞调用

例子


import time
import gevent
from gevent import select

start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)

def gr1():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)
    print('Ended Polling: %s' % tic())

def gr2():
    # Busy waits for a second, but we don't want to stick around...
    print('Started Polling: %s' % tic())
    select.select([], [], [], 2)
    print('Ended Polling: %s' % tic())

def gr3():
    print("Hey lets do some stuff while the greenlets poll, %s" % tic())
    gevent.sleep(1)

gevent.joinall([
    gevent.spawn(gr1),
    gevent.spawn(gr2),
    gevent.spawn(gr3),
])

Started Polling: at 0.0 seconds
Started Polling: at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at 0.0 seconds
Ended Polling: at 2.0 seconds
Ended Polling: at 2.0 seconds

gevent.spawn将函数包装在Greenlet线程

例子:


# 同步 vs 异步

import gevent
import random

def task(pid):
    """
    Some non-deterministic task
    """
    gevent.sleep(random.randint(0,2)*0.001)
    print('Task %s done' % pid)

def synchronous():
    for i in range(1,10):
        task(i)

def asynchronous():
    threads = [gevent.spawn(task, i) for i in xrange(10)]
    # gevent.spawn 将给定功能包装在Greenlet线程中
    # 初始化的greenlets列表存储在数组中threads,该数组传递给gevent.joinall函数
    gevent.joinall(threads)
    # 该函数阻止当前程序运行所有给定的greenlets。
    # 仅当所有greenlet终止时,执行才会向前执行

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()


# Synchronous:
# Task 1 done
# Task 2 done
# Task 3 done
# Task 4 done
# Task 5 done
# Task 6 done
# Task 7 done
# Task 8 done
# Task 9 done
# Asynchronous:
# Task 1 done
# Task 5 done
# Task 6 done
# Task 2 done
# Task 4 done
# Task 7 done
# Task 8 done
# Task 9 done
# Task 0 done
# Task 3 done

网络请求的例子: (服务器异步获取数据)

from gevent import monkey
monkey.patch_all()

import gevent
import requests
import json


def fetch(pid):
    response = requests.get('https://www.python.org/')
    result = response.text[:200]

    print('Process %s: %s' % (pid, result))
    return result

def synchronous():
    for i in range(1,10):
        fetch(i)

def asynchronous():
    threads = []
    for i in range(1,10):
        threads.append(gevent.spawn(fetch, i))
    gevent.joinall(threads)

print('Synchronous:')
synchronous()

print('Asynchronous:')
asynchronous()


greenlets是确定性的 deterministic

给定相同的greenlet配置相同的输入,它们始终会产生相同的输出



import time

def echo(i):
    time.sleep(0.001)
    return i

# Non Deterministic Process Pool

from multiprocessing.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print(run1 == run2 == run3 == run4)

# False

# Deterministic Gevent Pool

from gevent.pool import Pool

p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]

print(run1 == run2 == run3 == run4)


# True

并发相关的问题 race condition

即使gevent通常是确定性的,但当开始与套接字文件之类的外部服务进行交互时, 不确定性的来源仍会渗入您的程序中

两个并发线程/进程依赖于某个共享资源但还试图修改该值时,就会发生竞争状态。
这导致资源的值随时间而变,取决于执行顺序

最好的方法是始终避免所有全局状态

Greenlets 的初始化包装 Spawning Greenlets



import gevent
from gevent import Greenlet

def foo(message, n):
    """
    Each thread will be passed the message, and n arguments
    in its initialization.
    """
    gevent.sleep(n)
    print(message)

# Initialize a new Greenlet instance running the named function
# foo
thread1 = Greenlet.spawn(foo, "Hello", 1)

# Wrapper for creating and running a new Greenlet from the named
# function foo, with the passed arguments
thread2 = gevent.spawn(foo, "I live!", 2)

# Lambda expressions
thread3 = gevent.spawn(lambda x: (x+1), 2)

threads = [thread1, thread2, thread3]

# Block until all threads complete.
gevent.joinall(threads)

类的方式初始化 Greenlet_run方法 (类似thread)


import gevent
from gevent import Greenlet

class MyGreenlet(Greenlet):

    def __init__(self, message, n):
        Greenlet.__init__(self)
        self.message = message
        self.n = n

    def _run(self):
        print(self.message)
        gevent.sleep(self.n)

g = MyGreenlet("Hi there!", 3)
g.start()
g.join()

Greenlet State 状态

started -- Boolean, indicates whether the Greenlet has been started

ready() -- Boolean, indicates whether the Greenlet has halted

successful() -- Boolean, indicates whether the Greenlet has halted and not thrown an exception

value -- arbitrary, the value returned by the Greenlet

exception -- exception, uncaught exception instance thrown inside the greenlet

import gevent

def win():
    return 'You win!'

def fail():
    raise Exception('You fail at failing.')

winner = gevent.spawn(win)
loser = gevent.spawn(fail)

print(winner.started) # True
print(loser.started)  # True

# Exceptions raised in the Greenlet, stay inside the Greenlet.
try:
    gevent.joinall([winner, loser])
except Exception as e:
    print('This will never be reached')

print(winner.value) # 'You win!'
print(loser.value)  # None

print(winner.ready()) # True
print(loser.ready())  # True

print(winner.successful()) # True
print(loser.successful())  # False

# The exception raised in fail, will not propagate outside the
# greenlet. A stack trace will be printed to stdout but it
# will not unwind the stack of the parent.

print(loser.exception)

# It is possible though to raise the exception again outside
# raise loser.exception
# or with
# loser.get()

程序关闭

主程序上侦听SIGQUIT事件,并gevent.shutdown在退出前调用

import gevent
import signal

def run_forever():
    gevent.sleep(1000)

if __name__ == '__main__':
    gevent.signal(signal.SIGQUIT, gevent.kill)
    thread = gevent.spawn(run_forever)
    thread.join()

超时处理

import gevent
from gevent import Timeout

seconds = 10

timeout = Timeout(seconds)
timeout.start()

def wait():
    gevent.sleep(10)

try:
    gevent.spawn(wait).join()
except Timeout:
    print('Could not complete')
    
    
# 上下文处理
import gevent
from gevent import Timeout

time_to_wait = 5 # seconds

class TooLong(Exception):
    pass

with Timeout(time_to_wait, TooLong):
    gevent.sleep(10)


# (3)

# --
import gevent
from gevent import Timeout

def wait():
    gevent.sleep(2)

timer = Timeout(1).start()
thread1 = gevent.spawn(wait)

try:
    thread1.join(timeout=timer)
except Timeout:
    print('Thread 1 timed out')

# --

timer = Timeout.start_new(1)
thread2 = gevent.spawn(wait)

try:
    thread2.get(timeout=timer)
except Timeout:
    print('Thread 2 timed out')

# --

try:
    gevent.with_timeout(1, wait)
except Timeout:
    print('Thread 3 timed out')

` Monkey patching`

把标准库中的thread/socket等给替换掉(grpc不行)

Python的运行时允许在运行时修改大多数对象,包括模块,类甚至函数

在极端情况下,如果一个库需要更改Python本身的基本行为,则可以使用猴子补丁

gevent.monkey.patch_all() ,其作用是把标准库中的thread/socket等给替换掉。这样我们在后面使用socket的时候可以跟平常一样使用,无需修改任何代码,但是它变成非阻塞的了

gevent.monkey.patch_all()

数据结构

Events

事件Events是Greenlets之间异步通信的一种形式


import gevent
from gevent.event import Event

'''
Illustrates the use of events
'''


evt = Event()

def setter():
    '''After 3 seconds, wake all threads waiting on the value of evt'''
    print('A: Hey wait for me, I have to do something')
    gevent.sleep(3)
    print("Ok, I'm done")
    evt.set()


def waiter():
    '''After 3 seconds the get call will unblock'''
    print("I'll wait for you")
    evt.wait()  # blocking
    print("It's about time")

def main():
    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter),
        gevent.spawn(waiter)
    ])

if __name__ == '__main__': main()

Event对象扩展AsyncResult,它允许您将值与唤醒调用一起发送。

send a value along with the wakeup call

有时将其称为“将来 future ”“递延 deferred”,它引用了可以在任意时间表上设置的将来值

import gevent
from gevent.event import AsyncResult
a = AsyncResult()

def setter():
    """
    After 3 seconds set the result of a.
    """
    print(f"sleep 3 seconds")
    gevent.sleep(3)
    print("set hello start")
    a.set('Hello!')
    print("setted ")

def waiter():
    """
    After 3 seconds the get call will unblock after the setter
    puts a value into the AsyncResult.
    """
    print("get hello")
    print(a.get())

gevent.joinall([
    gevent.spawn(setter),
    gevent.spawn(waiter),
])

Queues

队列是按顺序排列的数据集,它们具有通常的put / get操作,但以可以在Greenlets安全操作的方式编写

如果一个Greenlet队列中获取了一个项目,则同一项目不会被同时执行的另一个Greenlet获取


import gevent
from gevent.queue import Queue

tasks = Queue()

def worker(n):
    while not tasks.empty():
        task = tasks.get()
        print('Worker %s got task %s' % (n, task))
        gevent.sleep(0)

    print('Quitting time!')

def boss():
    for i in xrange(1,25):
        tasks.put_nowait(i)

gevent.spawn(boss).join()

gevent.joinall([
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'nancy'),
])

每个putget操作都有一个非阻塞的对应方法, put_nowaitget_nowait

它们不会阻塞,但如果无法执行操作,则引发gevent.queue.Emptygevent.queue.Full的异常

设置队列的大小


import gevent
from gevent.queue import Queue, Empty

tasks = Queue(maxsize=3)  # 队列进行了限制,防止其包含三个以上的元素

def worker(name):
    try:
        while True:
            task = tasks.get(timeout=1) # 在该时间范围内找不到任何 task, 队列会抛出gevent.queue.Empty异常, 并且退出
            print('Worker %s got task %s' % (name, task))
            gevent.sleep(0)
    except Empty:
        print('Quitting time!')

def boss():
    """
    Boss will wait to hand out work until a individual worker is
    free since the maxsize of the task queue is 3.
    """

    for i in xrange(1,10):
        tasks.put(i)
    print('Assigned all work in iteration 1')

    for i in xrange(10,20):
        tasks.put(i)
    print('Assigned all work in iteration 2')

gevent.joinall([
    gevent.spawn(boss),
    gevent.spawn(worker, 'steve'),
    gevent.spawn(worker, 'john'),
    gevent.spawn(worker, 'bob'),
])

Groups and Pools

Groups是运行中的Greenlet的集合,这些Greenlet一起作为一组进行管理和调度


(1)

import gevent

def talk(msg):
    for _ in range(3):
        print(msg)

g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')

gevent.joinall(
 [g1, g2, g3]
)



(2) 用group管理 异步任务 

import gevent
from gevent.pool import Group

def talk(msg):
    for _ in range(3):
        print(msg)

g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')

group = Group()

group.add(g1)
group.add(g2)
group.add(g3)

group.join()

Group还提供了API,用于以各种方式将任务分发到分组的greenlet, 并收集结果 result

import gevent
from gevent import getcurrent
from gevent.pool import Group

group = Group()

def hello_from(n):
    print('Size of group %s' % len(group))
    print('Hello from Greenlet %s' % id(getcurrent()))

group.map(hello_from, range(3))


# Size of group 3
# Hello from Greenlet 139670428683336
# Size of group 3
# Hello from Greenlet 139670428683592
# Size of group 3
# Hello from Greenlet 139670428683848



def intensive(n):
    gevent.sleep(3 - n)
    return 'task', n

print('Ordered')

ogroup = Group()
for i in ogroup.imap(intensive, range(3)):
    print(i)



# Ordered
# ('task', 0)
# ('task', 1)
# ('task', 2)


print('Unordered')

igroup = Group()
for i in igroup.imap_unordered(intensive, range(3)):
    print(i)

# Unordered
# ('task', 2)
# ('task', 1)
# ('task', 0)

` Pool 是一种设计用于处理, 限制并发的动态数量greenlet的结构`。

并行执行许多网络或IO绑定任务

import gevent
from gevent.pool import Pool

pool = Pool(2)

def hello_from(n):
    print('Size of pool %s' % len(pool))

pool.map(hello_from, xrange(3))

套接字上轮询的类 的实例


from gevent.pool import Pool

class SocketPool(object):

    def __init__(self):
        self.pool = Pool(1000)
        self.pool.start()      # 启动 pool

    def listen(self, socket):
        while True:
            socket.recv()

    def add_handler(self, socket):
        if self.pool.full():
            raise Exception("At maximum pool size")
        else:
            self.pool.spawn(self.listen, socket)
            # =增加不同的 socket 套接字

    def shutdown(self):
        self.pool.kill()       # 关闭 pool

Locks 锁 and Semaphores 信号量

信号量

信号量 Semaphores是一种更低级别的同步方法协调限制greenlet并发访问或执行

信号量公开两种方法,acquire and release

信号量的界限

获取和释放信号量被获取和释放的次数之间的差称为信号量的界限

The difference between the number of times a semaphore has been acquired and released is called 

the bound of the semaphore

如果信号量绑定达到0,它将阻塞,直到另一个greenlet释放其捕获。


from gevent import sleep
from gevent.pool import Pool
from gevent.lock import BoundedSemaphore

sem = BoundedSemaphore(2)

def worker1(n):
    sem.acquire()
    print('Worker %i acquired semaphore' % n)
    sleep(0)
    sem.release()
    print('Worker %i released semaphore' % n)

def worker2(n):
    with sem:
        print('Worker %i acquired semaphore' % n)
        sleep(0)
    print('Worker %i released semaphore' % n)

pool = Pool()
pool.map(worker1, xrange(0,2))
pool.map(worker2, xrange(3,6))

Lock 锁 (信号量绑定范围为1的信号量)

范围为1的信号量称为锁 Lock

它为一个greenlet提供独占执行

通常用于确保在程序上下文中 仅一次使用的资源。

resources are only in use at one time in the context of a program

Thread Locals

Gevent还允许您指定Greenlet上下文本地的数据

在内部,实现为以greenletgetcurrent()值私有名称空间全局查找

import gevent
from gevent.local import local

stash = local()

def f1():
    stash.x = 1
    print(stash.x)

def f2():
    stash.y = 2
    print(stash.y)

    try:
        stash.x
    except AttributeError:
        print("x is not local to f2")

g1 = gevent.spawn(f1) 
g2 = gevent.spawn(f2)

gevent.joinall([g1, g2])

# 1
# 2
# x is not local to f2

许多使用gevent的Web框架HTTP会话对象 session存储在gevent线程本地变量内

例如,使用Werkzeug实用程序库及其代理对象,我们可以创建Flask风格的请求对象


# Flask的系统比本示例要复杂一些,但是使用 线程本地变量  作为  本地会话存储   的想法仍然相同。

from gevent.local import local
from werkzeug.local import LocalProxy
from werkzeug.wrappers import Request
from contextlib import contextmanager

from gevent.wsgi import WSGIServer

_requests = local()
request = LocalProxy(lambda: _requests.request)

@contextmanager
def sessionmanager(environ):
    _requests.request = Request(environ)
    yield
    _requests.request = None

def logic():
    return "Hello " + request.remote_addr

def application(environ, start_response):
    status = '200 OK'

    with sessionmanager(environ):
        body = logic()

    headers = [
        ('Content-Type', 'text/html')
    ]

    start_response(status, headers)
    return [body]

WSGIServer(('', 8000), application).serve_forever()

Subprocess 子流程

gevent 1.0开始,已添加 gevent.subprocess-Python的subprocess模块的修补版本。

支持在子流程上的协同等待


import gevent
from gevent.subprocess import Popen, PIPE

def cron():
    while True:
        print("cron")
        gevent.sleep(0.2)

g = gevent.spawn(cron)
sub = Popen(['sleep 1; uname'], stdout=PIPE, shell=True)
out, err = sub.communicate()
g.kill()
print(out.rstrip())

由于基于multiprocessing.Connection的对象(例如Pipe)公开其底层文件描述符

因此gevent.socket.wait_readwait_write 可用于在实际读取/写入之前, 协同等待就绪/就绪事件

 cooperatively wait for ready-to-read/ready-to-write events before actually reading/writing


import gevent
from multiprocessing import Process, Pipe
from gevent.socket import wait_read, wait_write

# To Process
a, b = Pipe()

# From Process
c, d = Pipe()

def relay():
    for i in xrange(10):
        msg = b.recv()
        c.send(msg + " in " + str(i))

def put_msg():
    for i in xrange(10):
        wait_write(a.fileno())
        a.send('hi')

def get_msg():
    for i in xrange(10):
        wait_read(d.fileno())
        print(d.recv())

if __name__ == '__main__':
    proc = Process(target=relay)
    proc.start()

    g1 = gevent.spawn(get_msg)
    g2 = gevent.spawn(put_msg)
    gevent.joinall([g1, g2], timeout=1)

Actor模型

Actor模型是由Erlang语言推广的高级并发模型

一个独立的Actor集合,这些Actor具有一个收件箱,从中 可以接收其他Actor的消息

Actor内部的主循环 遍历其消息并根据所需的行为采取措施。

Gevent没有原始的Actor类型,但是我们可以使用子类Greenlet内部的Queue非常简单地定义一个


import gevent
from gevent.queue import Queue

class Actor(gevent.Greenlet):

    def __init__(self):
        self.inbox = Queue()
        Greenlet.__init__(self)

    def receive(self, message):
        """
        Define in your subclass.
        """
        raise NotImplemented()

    def _run(self):
        self.running = True

        while self.running:
            message = self.inbox.get()
            self.receive(message)
            
            
            
import gevent
from gevent.queue import Queue
from gevent import Greenlet

class Pinger(Actor):
    def receive(self, message):
        print(message)
        pong.inbox.put('ping')
        gevent.sleep(0)

class Ponger(Actor):
    def receive(self, message):
        print(message)
        ping.inbox.put('pong')
        gevent.sleep(0)

ping = Pinger()
pong = Ponger()

ping.start()
pong.start()

ping.inbox.put('start')
gevent.joinall([ping, pong])
            

实例运用

» 更多

Gevent ZeroMQ

Simple Servers

WSGI Servers

Streaming Servers

Long Polling 长轮询

Websockets

Chat Server

Buy me a 肥仔水!