Concurrent Programming in Python


Python has long supported several approaches to concurrent programming, including threads, launching subprocesses, and an assortment of tricks involving generator functions.

1 Starting and Stopping Threads

The threading library can be used to execute any Python callable in its own thread.

To do this, create a Thread object and supply the callable you want to execute as its target argument.

Creating a thread:

# Code to execute in an independent thread

import time
def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)

# Create and launch a thread

from threading import Thread
t = Thread(target=countdown, args=(10,))
# A newly created Thread object does not start executing right away
t.start()
# It only runs once you call its start() method, which invokes the
# target function with the arguments you supplied

A Python thread executes in its own system-level thread (for example, a POSIX thread or a Windows thread) that is fully managed by the host operating system.

Once started, a thread runs independently until the target function returns.

Querying a thread object's state:

if t.is_alive():
    print('Still running')
else:
    print('Completed')

t.join()

This joins the thread to the current (main) thread and waits for it to terminate.

When one thread must wait for another to finish before it can proceed, the Thread class provides the join() method for exactly this purpose.

A typical scenario: the main thread spawns a worker thread that performs a long, expensive computation; once the main thread has dealt with its other work and needs the worker's result, it calls `join()` to wait for the worker to finish.
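
A minimal sketch of that pattern; the worker function and the plain list used as a result container are illustrative:

import threading

results = []

def worker():
    # Stand-in for a long, expensive computation
    results.append(sum(range(1000000)))

t = threading.Thread(target=worker)
t.start()
# ... the main thread does its other work here ...
t.join()            # Wait for the worker to finish
print(results[0])   # Now it is safe to use the worker's result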

Daemon threads

The Python interpreter keeps running until all (non-daemon) threads have terminated.

For long-running threads or background tasks that should run forever, consider making them daemon threads.

t = Thread(target=countdown, args=(10,), daemon=True)
t.start()

Daemon threads cannot be joined; however, they are destroyed automatically when the main thread terminates.

Terminating a thread

Threads cannot be killed directly; instead, have the thread poll a flag that you set manually to signal it to exit:

import time
from threading import Thread

class CountdownTask:
    def __init__(self):
        self._running = True

    def terminate(self):
        self._running = False

    def run(self, n):
        while self._running and n > 0:
            print('T-minus', n)
            n -= 1
            time.sleep(5)

c = CountdownTask()
t = Thread(target=c.run, args=(10,))
t.start()
c.terminate() # Signal termination
t.join()      # Wait for actual termination (if needed)

If the thread is blocked on an I/O operation, set a timeout on the operation so the loop can periodically re-check the termination flag (the fragment below is wrapped in a class so the _running flag has a home):


import socket

class IOTask:
    def __init__(self):
        self._running = True

    def terminate(self):
        self._running = False

    def run(self, sock):
        # sock is a socket
        sock.settimeout(5)        # Set timeout period
        while self._running:
            # Perform a blocking I/O operation w/ timeout
            try:
                data = sock.recv(8192)
                break
            except socket.timeout:
                continue
            # Continued processing
            ...
        # Terminated
        return
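
A usage sketch, assuming sock is an already connected socket:

from threading import Thread

task = IOTask()
t = Thread(target=task.run, args=(sock,))   # sock: assumed connected socket
t.start()
# ... later ...
task.terminate()   # Signal the loop to stop
t.join()           # Returns within the 5-second timeout window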

Python's GIL

Python's execution model is constrained by the Global Interpreter Lock (GIL): only one thread is allowed to execute at any given moment.

Python threads are therefore better suited to I/O handling and other blocking operations that run concurrently (waiting on I/O, waiting for data from a database, and so on) than to compute-intensive tasks that need true multi-processor parallelism.

A generic way to write a concurrent class that works with both threading and multiprocessing:

import time
import threading
import multiprocessing

class CountdownTask:
    def __init__(self, n):
        self._running = True
        self.n = n

    def terminate(self):
        self._running = False

    def run(self):
        while self._running and self.n > 0:
            print('T-minus', self.n)
            self.n -= 1
            time.sleep(5)


c = CountdownTask(5)

# Run in a thread
t = threading.Thread(target=c.run)
t.start()

# Run in a separate process. Note that the child process gets its own
# copy of the task object, so calling c.terminate() in the parent will
# not stop the child; use p.terminate() for that.
p = multiprocessing.Process(target=c.run)
p.start()

2 Determining If a Thread Has Started

A key feature of threads is that each one runs independently, with state and timing that are unpredictable.

Determining a thread's state so that other threads can decide their next step is a thread-synchronization problem.

Event objects

The threading library provides the Event object for exactly this.

An Event object contains a signal flag that can be set by threads, and it allows threads to wait for something to happen.

Initially, the event's flag is false. A thread that waits on an event whose flag is false blocks until the flag becomes true.

A thread that sets the flag to true wakes up all threads waiting on that event. A thread that waits on an event whose flag is already true notices this immediately and simply keeps running.

from threading import Thread, Event
import time

# Code to execute in an independent thread
def countdown(n, started_evt):
    print('countdown starting')
    started_evt.set()
    # Set the event flag to true, signalling that the countdown has started
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)

# Create the event object that will be used to signal startup
started_evt = Event()

# Launch the thread and pass the startup event
print('Launching countdown')
t = Thread(target=countdown, args=(10,started_evt))
t.start()

# Wait for the thread to start
started_evt.wait()
# Only print once the event flag has been set
print('countdown is running')

An important property of an Event: when it is set to true, it wakes up all threads waiting on it.

  • Setting the flag: event.set()

    Event's set() method sets the object's internal flag to true. Event also provides is_set() to query the flag; after set() has been called, is_set() returns True.

  • Clearing the flag: event.clear()

    Event's clear() method resets the internal flag to false; after clear() has been called, is_set() returns False.

  • Waiting: event.wait()

    Event's wait() method returns promptly only while the internal flag is true. While the flag is false, wait() blocks until some thread calls set(). (See the short demonstration after this list.)
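
A short demonstration of all three methods:

from threading import Event

evt = Event()
print(evt.is_set())   # False: the flag starts out false
evt.set()             # Flag is now true; any waiters wake up
evt.wait()            # Returns immediately because the flag is true
evt.clear()           # Flag is back to false
print(evt.is_set())   # False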

An Event object is best used once: create it, have a thread wait on it, and discard it once it has been set. Although you can reset an event with clear(), it is difficult to do so safely without risking missed events, deadlock, or other problems (in particular, there is no guarantee that the code resetting the event runs before a thread waits on it again).

Semaphores (Semaphore)

Since setting an Event wakes every waiting thread, use a semaphore instead if you only want to wake a single thread at a time:


import threading

def worker(n, sema):
    # Wait to be signaled
    sema.acquire()

    # Do some work
    print('Working', n)

# Create some threads
sema = threading.Semaphore(0)
nworkers = 10
for n in range(nworkers):
    t = threading.Thread(target=worker, args=(n, sema,))
    t.start()

for n in range(nworkers):
    sema.release()

All of the threads block waiting to acquire the semaphore. Each time the semaphore is released, exactly one thread wakes up and runs.
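
Semaphores are also useful for throttling, that is, capping how many threads may run a code section at once. A hedged sketch; the fetch helper and the limit of five are illustrative:

import threading
import urllib.request

# Allow at most five simultaneous downloads (illustrative limit)
_fetch_sema = threading.Semaphore(5)

def fetch(url):
    with _fetch_sema:          # Blocks while five fetches are already in flight
        return urllib.request.urlopen(url).read()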

3 Communicating Between Threads

How to exchange information or data safely between threads.

Queues

Create a Queue object that is shared by multiple threads; the threads add and remove items with put() and get().

Queue objects already contain all of the required locking, so they can be safely shared by as many threads as you wish.
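
A minimal producer/consumer sketch; using a sentinel object to signal shutdown is one common convention:

from queue import Queue
from threading import Thread

_sentinel = object()   # Unique value that tells the consumer to stop

def producer(out_q):
    for i in range(5):
        out_q.put(i)           # Add items to the queue
    out_q.put(_sentinel)       # Signal completion

def consumer(in_q):
    while True:
        data = in_q.get()      # Blocks until an item is available
        if data is _sentinel:
            break
        print('Got:', data)

q = Queue()
Thread(target=consumer, args=(q,)).start()
Thread(target=producer, args=(q,)).start()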


A fuller example in the same spirit: a Service object fills a queue with tasks and drains it with a pool of worker threads (account_token_list is assumed to be supplied elsewhere):

import queue
import threading
import time

class Service(object):
    def run(self):
        try:
            self.task_queue = queue.Queue()
            # account_token_list is assumed to be defined elsewhere
            for account_token in account_token_list:
                self.task_queue.put(account_token)
            working_list = []
            for _ in range(30):
                working_thread = Working(self.task_queue)
                working_list.append(working_thread)
            for working_thread in working_list:
                working_thread.daemon = True
                working_thread.start()
            for working_thread in working_list:
                working_thread.join()
        except Exception as e:
            print(e)


class Working(threading.Thread):
    def __init__(self, task_queue):
        threading.Thread.__init__(self)
        self.task_queue = task_queue

    def _handle(self, account_token):
        pass

    def run(self):
        while True:
            try:
                # Non-blocking get; raises queue.Empty once the queue is drained
                account_token = self.task_queue.get(False)
                self._handle(account_token)
            except queue.Empty:
                return
            except Exception as e:
                print(e)
            time.sleep(0.3)


4 Locking Critical Sections

Critical sections in a multithreaded program need locking to avoid race conditions.

To safely use mutable objects in a multithreaded program, use the Lock object from the threading library.


import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    def __init__(self, initial_value = 0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self,delta=1):
        '''
        Increment the counter with locking
        '''
        with self._value_lock:
             self._value += delta

    def decr(self,delta=1):
        '''
        Decrement the counter with locking
        '''
        with self._value_lock:
             self._value -= delta
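
A quick usage sketch: hammering the counter from several threads shows that the lock preserves every update.

counter = SharedCounter()

def hammer():
    for _ in range(100000):
        counter.incr()

threads = [threading.Thread(target=hammer) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter._value)   # Always 400000; without the lock, updates could be lost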

5 Preventing Deadlock – Using a Context Manager

A large share of deadlocks are caused by threads that try to acquire multiple locks at the same time.

If a thread acquires the first lock but then blocks while acquiring the second, it can stall other threads and bring the whole program to a standstill.

One solution is to assign each lock in the program a unique id and enforce the rule that multiple locks are only ever acquired in ascending order. This rule is easy to implement with a context manager:


import threading
from contextlib import contextmanager

# Thread-local state to store information on locks already acquired
_local = threading.local()

@contextmanager
def acquire(*locks):
    # Sort locks by object identifier
    locks = sorted(locks, key=lambda x: id(x))
    
    # Sorting guarantees that the locks are always acquired in one fixed
    # global order, no matter what order the caller requested them in
    # Make sure lock order of previously acquired locks is not violated
    acquired = getattr(_local,'acquired',[])
    if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
        raise RuntimeError('Lock Order Violation')

    # Acquire all of the locks
    acquired.extend(locks)
    _local.acquired = acquired

    try:
        for lock in locks:
            lock.acquire()
        yield
    finally:
        # Release locks in reverse order of acquisition
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]


# Example usage

x_lock = threading.Lock()
y_lock = threading.Lock()

def thread_1():
    while True:
        with acquire(x_lock, y_lock):
            print('Thread-1')

def thread_2():
    while True:
        with acquire(y_lock, x_lock):
            print('Thread-2')

t1 = threading.Thread(target=thread_1)
t1.daemon = True
t1.start()

t2 = threading.Thread(target=thread_2)
t2.daemon = True
t2.start()

Even though each function acquires the locks in a different order, no deadlock occurs.
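
The thread-local bookkeeping also covers nested acquisitions: if an inner acquire() asks for a lock whose id is not greater than one already held, it raises RuntimeError rather than risk a deadlock. A sketch (which nesting order fails depends on the arbitrary id() values):

def thread_3():
    while True:
        with acquire(x_lock):
            with acquire(y_lock):   # OK only if id(y_lock) > id(x_lock);
                print('Thread-3')   # otherwise: RuntimeError('Lock Order Violation')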


The classic dining philosophers problem, made deadlock-free with acquire():

import threading

# The philosopher thread (uses the acquire() context manager defined above)
def philosopher(left, right):
    while True:
        with acquire(left, right):
            print(threading.current_thread(), 'eating')

# The chopsticks (represented by locks)
NSTICKS = 5
chopsticks = [threading.Lock() for n in range(NSTICKS)]

# Create all of the philosophers
for n in range(NSTICKS):
    t = threading.Thread(target=philosopher,
                         args=(chopsticks[n],chopsticks[(n+1) % NSTICKS]))
    t.start()

6 Creating a Thread Pool

The concurrent.futures library provides a ThreadPoolExecutor class that can be used for exactly this.

You could, of course, build one by hand with a Queue.

An advantage of ThreadPoolExecutor over a hand-rolled pool is that it makes it easy for the submitter to retrieve return values from the called functions.

from concurrent.futures import ThreadPoolExecutor
import urllib.request

def fetch_url(url):
    u = urllib.request.urlopen(url)
    data = u.read()
    return data

pool = ThreadPoolExecutor(10)
# Submit work to the pool
a = pool.submit(fetch_url, 'http://www.python.org')
b = pool.submit(fetch_url, 'http://www.pypy.org')

# Get the results back
x = a.result()
y = b.result()

In particular, a.result() blocks until the corresponding function has completed and returned its result.
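
ThreadPoolExecutor also provides a map() method, which applies a function across an iterable and yields the results in order; a small sketch reusing fetch_url from above:

urls = ['http://www.python.org', 'http://www.pypy.org']
with ThreadPoolExecutor(10) as pool:
    # Results come back in the same order as the input URLs
    for data in pool.map(fetch_url, urls):
        print(len(data))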

7 Simple Parallel Programming

1 CPU-intensive work

The concurrent.futures library provides a ProcessPoolExecutor class that can be used to execute computationally intensive tasks in separately running instances of the Python interpreter.

Example:

A directory of gzip-compressed Apache web server logs:

logs/
   20120701.log.gz
   20120702.log.gz
   20120703.log.gz
   20120704.log.gz
   20120705.log.gz
   20120706.log.gz
   ...

Each log file contains lines like the following:

124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...

The task: find every host that has accessed robots.txt across all of these log files.

1 A map-reduce style solution:

import gzip
import io
import glob

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file
    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f,encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across an entire sequence of files
    '''
    files = glob.glob(logdir+'/*.log.gz')
    all_robots = set()
    for robots in map(find_robots, files):
        all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)

2 The same job with a process pool (ProcessPoolExecutor):


import gzip
import io
import glob
from concurrent import futures

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file

    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f,encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across an entire sequence of files
    '''
    files = glob.glob(logdir+'/*.log.gz')
    all_robots = set()
    with futures.ProcessPoolExecutor() as pool:
        # map() collects the results from the worker processes
        for robots in pool.map(find_robots, files):
            all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)

Running this version produces the same results, but roughly 3.5x faster on a four-core machine.

The actual speedup depends on how many CPU cores your machine has.

pool.submit(): submitting a single task by hand

When you submit a task manually, the return value is a Future instance.

To obtain the result, call the Future's result() method, which blocks until the result is available.


from concurrent.futures import ProcessPoolExecutor

def work(x):
    ...
    return result

with ProcessPoolExecutor() as pool:
    ...
    # Example of submitting work to the pool
    future_result = pool.submit(work, arg)

    # Obtaining the result (blocks until done)
    r = future_result.result()

add_done_callback(fn): non-blocking, via a callback function

The callback receives a Future instance, which it uses to obtain the final result (for example, by calling its result() method).


def when_done(r):
    print('Got:', r.result())

with ProcessPoolExecutor() as pool:
     future_result = pool.submit(work, arg)
     future_result.add_done_callback(when_done)
     

Process pools are easy to use, but there are several points to keep in mind when designing larger programs:

This parallelization technique only works for problems that can be decomposed into independent parts.

Work must be submitted in the form of simple functions; parallel execution of instance methods, closures, and other kinds of constructs is not supported.

Function arguments and return values must be compatible with pickle, since data exchanged between interpreters has to be serialized for interprocess communication.

Functions submitted for work should not maintain persistent state or have side effects, apart from simple things such as logging.

8 Using Generators Instead of Threads

Using generators (coroutines) in place of system threads to implement concurrency.

This is sometimes called user-level threading or green threading.

Generator functions and the yield statement

A yield statement makes a generator suspend its execution. This makes it possible to write a scheduler that treats generators as a kind of "task" and alternates their execution using cooperative task switching.


# Two simple generator functions
def countdown(n):
    while n > 0:
        print('T-minus', n)
        yield
        n -= 1
    print('Blastoff!')

def countup(n):
    x = 0
    while x < n:
        print('Counting up', x)
        yield
        x += 1


from collections import deque

class TaskScheduler:
    def __init__(self):
        self._task_queue = deque()

    def new_task(self, task):
        '''
        Admit a newly started task to the scheduler

        '''
        self._task_queue.append(task)

    def run(self):
        '''
        Run until there are no more tasks
        '''
        while self._task_queue:
            task = self._task_queue.popleft()
            try:
                # Run until the next yield statement
                next(task)
                self._task_queue.append(task)
            except StopIteration:
                # Generator is no longer executing
                pass

# Example use
sched = TaskScheduler()
sched.new_task(countdown(10))
sched.new_task(countdown(5))
sched.new_task(countup(15))
sched.run()



Output:

T-minus 10
T-minus 5
Counting up 0
T-minus 9
T-minus 4
Counting up 1
T-minus 8
T-minus 3
Counting up 2
T-minus 7
T-minus 2
...

The generator functions are the tasks, and the yield statement is how a task signals that it is ready to suspend.

Using generators to implement an actor system that does not depend on threads:


from collections import deque

class ActorScheduler:
    def __init__(self):
        self._actors = { }          # Mapping of names to actors
        self._msg_queue = deque()   # Message queue

    def new_actor(self, name, actor):
        '''
        Admit a newly started actor to the scheduler and give it a name
        '''
        self._msg_queue.append((actor,None))
        self._actors[name] = actor

    def send(self, name, msg):
        '''
        Send a message to a named actor
        '''
        actor = self._actors.get(name)
        if actor:
            self._msg_queue.append((actor,msg))

    def run(self):
        '''
        Run as long as there are pending messages.
        '''
        while self._msg_queue:
            actor, msg = self._msg_queue.popleft()
            try:
                 actor.send(msg)
            except StopIteration:
                 pass

# Example use
if __name__ == '__main__':
    def printer():
        while True:
            msg = yield
            print('Got:', msg)

    def counter(sched):
        while True:
            # Receive the current count
            n = yield
            if n == 0:
                break
            # Send to the printer task
            sched.send('printer', n)
            # Send the next count to the counter task (recursive)

            sched.send('counter', n-1)

    sched = ActorScheduler()
    # Create the initial actors
    sched.new_actor('printer', printer())
    sched.new_actor('counter', counter(sched))

    # Send an initial message to the counter to initiate
    sched.send('counter', 10000)
    sched.run()

Using generators to implement a concurrent network application:


from collections import deque
from select import select

# This class represents a generic yield event in the scheduler
class YieldEvent:
    def handle_yield(self, sched, task):
        pass
    def handle_resume(self, sched, task):
        pass

# Task Scheduler
class Scheduler:
    def __init__(self):
        self._numtasks = 0       # Total num of tasks
        self._ready = deque()    # Tasks ready to run
        self._read_waiting = {}  # Tasks waiting to read
        self._write_waiting = {} # Tasks waiting to write

    # Poll for I/O events and restart waiting tasks
    def _iopoll(self):
        rset,wset,eset = select(self._read_waiting,
                                self._write_waiting,[])
        for r in rset:
            evt, task = self._read_waiting.pop(r)
            evt.handle_resume(self, task)
        for w in wset:
            evt, task = self._write_waiting.pop(w)
            evt.handle_resume(self, task)

    def new(self,task):
        '''
        Add a newly started task to the scheduler
        '''

        self._ready.append((task, None))
        self._numtasks += 1

    def add_ready(self, task, msg=None):
        '''
        Append an already started task to the ready queue.
        msg is what to send into the task when it resumes.
        '''
        self._ready.append((task, msg))

    # Add a task to the reading set
    def _read_wait(self, fileno, evt, task):
        self._read_waiting[fileno] = (evt, task)

    # Add a task to the write set
    def _write_wait(self, fileno, evt, task):
        self._write_waiting[fileno] = (evt, task)

    def run(self):
        '''
        Run the task scheduler until there are no tasks
        '''
        while self._numtasks:
             if not self._ready:
                  self._iopoll()
             task, msg = self._ready.popleft()
             try:
                 # Run the coroutine to the next yield
                 r = task.send(msg)
                 if isinstance(r, YieldEvent):
                     r.handle_yield(self, task)
                 else:
                     raise RuntimeError('unrecognized yield event')
             except StopIteration:
                 self._numtasks -= 1

# Example implementation of coroutine-based socket I/O
class ReadSocket(YieldEvent):
    def __init__(self, sock, nbytes):
        self.sock = sock
        self.nbytes = nbytes
    def handle_yield(self, sched, task):
        sched._read_wait(self.sock.fileno(), self, task)
    def handle_resume(self, sched, task):
        data = self.sock.recv(self.nbytes)
        sched.add_ready(task, data)

class WriteSocket(YieldEvent):
    def __init__(self, sock, data):
        self.sock = sock
        self.data = data
    def handle_yield(self, sched, task):

        sched._write_wait(self.sock.fileno(), self, task)
    def handle_resume(self, sched, task):
        nsent = self.sock.send(self.data)
        sched.add_ready(task, nsent)

class AcceptSocket(YieldEvent):
    def __init__(self, sock):
        self.sock = sock
    def handle_yield(self, sched, task):
        sched._read_wait(self.sock.fileno(), self, task)
    def handle_resume(self, sched, task):
        r = self.sock.accept()
        sched.add_ready(task, r)

# Wrapper around a socket object for use with yield
class Socket(object):
    def __init__(self, sock):
        self._sock = sock
    def recv(self, maxbytes):
        return ReadSocket(self._sock, maxbytes)
    def send(self, data):
        return WriteSocket(self._sock, data)
    def accept(self):
        return AcceptSocket(self._sock)
    def __getattr__(self, name):
        return getattr(self._sock, name)

if __name__ == '__main__':
    from socket import socket, AF_INET, SOCK_STREAM
    import time

    # Example of a function involving generators.  This should
    # be called using line = yield from readline(sock)
    def readline(sock):
        chars = []
        while True:
            c = yield sock.recv(1)
            if not c:
                break
            chars.append(c)
            if c == b'\n':
                break
        return b''.join(chars)

    # Echo server using generators
    class EchoServer:
        def __init__(self,addr,sched):
            self.sched = sched
            sched.new(self.server_loop(addr))

        def server_loop(self,addr):
            s = Socket(socket(AF_INET,SOCK_STREAM))

            s.bind(addr)
            s.listen(5)
            while True:
                c,a = yield s.accept()
                print('Got connection from ', a)
                self.sched.new(self.client_handler(Socket(c)))

        def client_handler(self,client):
            while True:
                line = yield from readline(client)
                if not line:
                    break
                line = b'GOT:' + line
                while line:
                    nsent = yield client.send(line)
                    line = line[nsent:]
            client.close()
            print('Client closed')

    sched = Scheduler()
    EchoServer(('',16000),sched)
    sched.run()

There is a queue of tasks that are ready to run, plus holding areas for tasks that are asleep waiting on I/O. The scheduler's whole job is to move tasks between the ready queue and the I/O waiting areas.