Concurrent Programming

Python has long supported several approaches to concurrent programming, including threads, launching subprocesses, and various tricks involving generator functions.
1 Starting and Stopping Threads
The threading library can execute any Python callable in its own thread. To do this, create a Thread object and supply the callable you want to execute as its target argument.

Creating a thread:
# Code to execute in an independent thread
import time

def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)

# Create and launch a thread
from threading import Thread
t = Thread(target=countdown, args=(10,))

# Creating the Thread object does not start it
t.start()
# The thread only begins executing once you call its start() method,
# which invokes the target function with the arguments you supplied
Python threads execute in their own system-level thread (e.g., a POSIX thread or a Windows thread) that is fully managed by the operating system. Once started, a thread runs independently until the target function returns.
Checking a thread object's state:
if t.is_alive():
    print('Still running')
else:
    print('Completed')

t.join()
join() joins a thread to the current (main) thread and waits for it to terminate. Whenever one thread must wait for another to finish before it can proceed, the Thread class provides join() for exactly this purpose. The typical pattern: the main thread spawns and starts a worker thread that performs a lengthy computation; after the main thread has handled its other business and needs the worker's result, it calls `join()`.
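A minimal sketch of that pattern (crunch() and the results list are illustrative names, not from the original):

import time
from threading import Thread

results = []

def crunch():
    # Stand-in for a lengthy computation
    time.sleep(2)
    results.append(42)

t = Thread(target=crunch)
t.start()
# ... the main thread handles other business here ...
t.join()                        # Block until the worker finishes
print('Worker result:', results[0])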
Daemon threads
The Python interpreter only exits after all non-daemon threads have terminated. For long-running threads or background tasks that should run forever, consider making them daemon threads:
t = Thread(target=countdown, args=(10,), daemon=True)
t.start()
A daemon thread cannot be joined; however, it is destroyed automatically when the main thread terminates.
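A sketch illustrating this behavior (a hypothetical example, not from the original):

import time
from threading import Thread

def background():
    while True:                 # Runs forever on its own...
        time.sleep(1)

t = Thread(target=background, daemon=True)
t.start()
time.sleep(3)
# The main thread ends here; the daemon is killed automatically,
# so the program exits instead of hanging on the infinite loop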
Terminating a thread

Poll an exit flag in the thread and signal termination manually:
import time
from threading import Thread

class CountdownTask:
    def __init__(self):
        self._running = True

    def terminate(self):
        self._running = False

    def run(self, n):
        while self._running and n > 0:
            print('T-minus', n)
            n -= 1
            time.sleep(5)

c = CountdownTask()
t = Thread(target=c.run, args=(10,))
t.start()

c.terminate()   # Signal termination
t.join()        # Wait for actual termination (if needed)
If the thread performs blocking I/O operations, add a timeout so the loop can check the exit flag periodically:
import socket

class IOTask:
    def __init__(self):
        self._running = True

    def terminate(self):
        self._running = False

    def run(self, sock):
        # sock is a socket
        sock.settimeout(5)      # Set timeout period
        while self._running:
            # Perform a blocking I/O operation w/ timeout
            try:
                data = sock.recv(8192)
                break
            except socket.timeout:
                continue
            # Continued processing
            ...
        # Terminated
        return
Python's GIL

Python threads are constrained by an execution model (the global interpreter lock) in which only one thread may execute at any given moment. For this reason, Python threads are best suited to I/O handling and other blocking operations that benefit from concurrency (waiting on I/O, waiting for results from a database, and so on), not to compute-intensive tasks that need parallelism across multiple processors.
A generic way to write a concurrent task class that works with both threading and multiprocessing:
import time
import threading
import multiprocessing

class CountdownTask:
    def __init__(self, n):
        self._running = True
        self.n = n

    def terminate(self):
        self._running = False

    def run(self):
        while self._running and self.n > 0:
            print('T-minus', self.n)
            self.n -= 1
            time.sleep(5)

c = CountdownTask(5)
t = threading.Thread(target=c.run)
t.start()

p = multiprocessing.Process(target=c.run)
p.start()
2 Determining Whether a Thread Has Started
A key feature of threads is that each one runs independently and its state is unpredictable. How can one thread learn another's state in order to decide its next step? This is a thread-synchronization problem.
Event objects

The threading library provides an Event object. An Event contains a signal flag that threads can set, and it allows threads to wait for an event to occur.
Initially, the event's signal flag is set to false. If a thread waits on an event whose flag is false, it blocks until the flag becomes true. A thread that sets the flag to true wakes up all threads waiting on that event. A thread that waits on an event whose flag is already true simply proceeds, ignoring the wait.
from threading import Thread, Event
import time

# Code to execute in an independent thread
def countdown(n, started_evt):
    print('countdown starting')
    started_evt.set()     # Set the event flag to true
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)

# Create the event object that will be used to signal startup
started_evt = Event()

# Launch the thread and pass the startup event
print('Launching countdown')
t = Thread(target=countdown, args=(10, started_evt))
t.start()

# Wait for the thread to start (blocks until the flag is true)
started_evt.wait()
print('countdown is running')
An important property of an Event is that when it is set to true, it wakes up all threads waiting on it.
- Setting the signal: event.set()
  set() sets the Event's internal flag to true. is_set() reports the state of the flag; after set() has been called, is_set() returns True.
- Clearing the signal: event.clear()
  clear() resets the Event's internal flag to false; after clear(), is_set() returns False.
- Waiting: event.wait()
  wait() returns promptly only when the internal flag is true. If the flag is false, wait() blocks until it becomes true, i.e., until some thread calls set(). (All three methods appear in the sketch after this list.)
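A quick hypothetical demonstration of these methods (not from the original):

from threading import Event

evt = Event()
print(evt.is_set())          # False: the flag starts out false
evt.set()                    # Flag becomes true; any waiters wake up
evt.wait()                   # Returns immediately since the flag is true
evt.clear()                  # Flag back to false
print(evt.wait(timeout=1))   # Blocks about 1s, then returns False (timed out)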
An Event is best used once: create it, let some thread wait on it, and discard it once it has been set. Although an event can be reset with clear(), safely clearing and reusing an event is hard; you risk missed events, deadlock, or other problems (in particular, there is no guarantee that the code resetting the event runs before a thread waits on it again).
Semaphores

Setting an Event wakes all threads waiting on it. If you only want to wake a single thread, a Semaphore is a better choice:
import threading

def worker(n, sema):
    # Wait to be signaled
    sema.acquire()
    # Do some work
    print('Working', n)

# Create some threads
sema = threading.Semaphore(0)
nworkers = 10
for n in range(nworkers):
    t = threading.Thread(target=worker, args=(n, sema,))
    t.start()

for n in range(nworkers):
    sema.release()
All of the threads block waiting to acquire the semaphore; each time it is released, exactly one thread wakes up and runs.
3 Communicating Between Threads
To exchange information or data safely between threads, create a Queue object that is shared by multiple threads; the threads then add and remove items with the put() and get() operations. Queue objects already contain all the necessary locking, so they can safely share data across any number of threads.
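A minimal producer/consumer sketch of the put()/get() pattern (the sentinel-based shutdown is a common convention, not part of the original example); a fuller, real-world-style example follows:

import queue
from threading import Thread

def producer(out_q):
    for i in range(5):
        out_q.put(i)          # Thread-safe: the Queue handles all locking
    out_q.put(None)           # Sentinel signalling 'no more data'

def consumer(in_q):
    while True:
        item = in_q.get()
        if item is None:      # Sentinel received -- shut down
            break
        print('Got:', item)

q = queue.Queue()
t1 = Thread(target=producer, args=(q,))
t2 = Thread(target=consumer, args=(q,))
t1.start(); t2.start()
t1.join(); t2.join()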
import queue
import threading
import time

class Service(object):
    def run(self):
        # info_logger = get_logger(log_name, "INFO")
        try:
            self.task_queue = queue.Queue()
            # account_token_list is assumed to be defined elsewhere
            for account_token in account_token_list:
                self.task_queue.put(account_token)
            try:
                working_list = []
                for _ in range(30):
                    working_thread = Working(self.task_queue)
                    working_list.append(working_thread)
                for working_thread in working_list:
                    working_thread.daemon = True
                    working_thread.start()
                for working_thread in working_list:
                    working_thread.join()
            except Exception as e:
                print(e)
        except Exception as e:
            print(e)

class Working(threading.Thread):
    def __init__(self, task_queue):
        threading.Thread.__init__(self)
        self.task_queue = task_queue

    def _handle(self, access_token):
        pass

    def run(self):
        while True:
            try:
                account_token = self.task_queue.get(False)
                self._handle(account_token)
            except queue.Empty:
                return
            except Exception as e:
                print(e)
            time.sleep(0.3)
4 Locking Critical Sections

Lock critical sections of a multithreaded program to avoid race conditions. To use mutable objects safely in a multithreaded program, use the Lock objects in the threading library:
import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    def __init__(self, initial_value=0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self, delta=1):
        '''
        Increment the counter with locking
        '''
        with self._value_lock:
            self._value += delta

    def decr(self, delta=1):
        '''
        Decrement the counter with locking
        '''
        with self._value_lock:
            self._value -= delta
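A hypothetical usage sketch: without the lock, the bare `+=` (which is not atomic at the bytecode level) could lose updates under contention:

counter = SharedCounter()

def worker():
    for _ in range(100000):
        counter.incr()

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter._value)        # Reliably 400000 thanks to the lock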
5 Preventing Deadlock with a Context Manager

Deadlocks largely arise when threads acquire multiple locks at the same time: a thread that holds one lock and then blocks while acquiring a second may in turn block other threads and freeze the entire program. One solution is to assign each lock a unique id and only allow multiple locks to be acquired in ascending order; this rule is easy to enforce with a context manager:
import threading
from contextlib import contextmanager

# Thread-local state to store information on locks already acquired
_local = threading.local()

@contextmanager
def acquire(*locks):
    # Sort locks by object identifier. No matter what order the caller
    # requests the locks in, they are always acquired in a fixed order.
    locks = sorted(locks, key=lambda x: id(x))

    # Make sure lock order of previously acquired locks is not violated
    acquired = getattr(_local, 'acquired', [])
    if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
        raise RuntimeError('Lock Order Violation')

    # Acquire all of the locks
    acquired.extend(locks)
    _local.acquired = acquired
    try:
        for lock in locks:
            lock.acquire()
        yield
    finally:
        # Release locks in reverse order of acquisition
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]
# Usage
x_lock = threading.Lock()
y_lock = threading.Lock()

def thread_1():
    while True:
        with acquire(x_lock, y_lock):
            print('Thread-1')

def thread_2():
    while True:
        with acquire(y_lock, x_lock):
            print('Thread-2')

t1 = threading.Thread(target=thread_1)
t1.daemon = True
t1.start()

t2 = threading.Thread(target=thread_2)
t2.daemon = True
t2.start()
Even though the two functions acquire the locks in different orders, no deadlock occurs.

The classic dining philosophers problem, a textbook deadlock scenario:
import threading

# The philosopher thread
def philosopher(left, right):
    while True:
        with acquire(left, right):
            print(threading.current_thread(), 'eating')

# The chopsticks (represented by locks)
NSTICKS = 5
chopsticks = [threading.Lock() for n in range(NSTICKS)]

# Create all of the philosophers
for n in range(NSTICKS):
    t = threading.Thread(target=philosopher,
                         args=(chopsticks[n], chopsticks[(n+1) % NSTICKS]))
    t.start()
6 Creating a Thread Pool

The concurrent.futures library has a ThreadPoolExecutor class that can be used for this; a pool can of course also be implemented by hand with a Queue. One advantage of ThreadPoolExecutor over a manual implementation is that it makes it easier for the submitter to retrieve return values from the called function:
from concurrent.futures import ThreadPoolExecutor
import urllib.request

def fetch_url(url):
    u = urllib.request.urlopen(url)
    data = u.read()
    return data

pool = ThreadPoolExecutor(10)

# Submit work to the pool
a = pool.submit(fetch_url, 'http://www.python.org')
b = pool.submit(fetch_url, 'http://www.pypy.org')

# Get the results back
x = a.result()
y = b.result()
Notably, the a.result() operation blocks until the corresponding function has finished executing and returned a result.
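If you submit many tasks and want results in completion order rather than submission order, the standard-library helper concurrent.futures.as_completed() can be used; a sketch reusing pool and fetch_url from above:

from concurrent.futures import as_completed

urls = ['http://www.python.org', 'http://www.pypy.org']
futs = [pool.submit(fetch_url, u) for u in urls]
for fut in as_completed(futs):
    print(len(fut.result()))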
7 Simplified Parallel Programming

1 CPU-bound work

The concurrent.futures library provides a ProcessPoolExecutor class that can be used to execute compute-intensive work in separately running instances of the Python interpreter.
Example: a directory of gzip-compressed Apache web server logs:
logs/
20120701.log.gz
20120702.log.gz
20120703.log.gz
20120704.log.gz
20120705.log.gz
20120706.log.gz
...
Suppose each log file contains lines like the following:
124.115.6.12 - - [10/Jul/2012:00:18:50 -0500] "GET /robots.txt ..." 200 71
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /ply/ ..." 200 11875
210.212.209.67 - - [10/Jul/2012:00:18:51 -0500] "GET /favicon.ico ..." 404 369
61.135.216.105 - - [10/Jul/2012:00:20:04 -0500] "GET /blog/atom.xml ..." 304 -
...
The task: find all hosts that have accessed the robots.txt file anywhere in these logs.

1 Written in map-reduce style:
import gzip
import io
import glob

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file
    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f, encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across an entire sequence of files
    '''
    files = glob.glob(logdir + '/*.log.gz')
    all_robots = set()
    for robots in map(find_robots, files):
        all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)
2 Using a process pool:
import gzip
import io
import glob
from concurrent import futures

def find_robots(filename):
    '''
    Find all of the hosts that access robots.txt in a single log file
    '''
    robots = set()
    with gzip.open(filename) as f:
        for line in io.TextIOWrapper(f, encoding='ascii'):
            fields = line.split()
            if fields[6] == '/robots.txt':
                robots.add(fields[0])
    return robots

def find_all_robots(logdir):
    '''
    Find all hosts across an entire sequence of files
    '''
    files = glob.glob(logdir + '/*.log.gz')
    all_robots = set()
    with futures.ProcessPoolExecutor() as pool:
        # Gather results from the workers as they complete
        for robots in pool.map(find_robots, files):
            all_robots.update(robots)
    return all_robots

if __name__ == '__main__':
    robots = find_all_robots('logs')
    for ipaddr in robots:
        print(ipaddr)
Running this script produces the same result, but roughly 3.5 times faster on a four-core machine. The actual speedup varies with the number of CPUs in your machine.
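By default, a ProcessPoolExecutor creates as many worker processes as there are CPUs; the max_workers argument caps this explicitly (a minimal sketch):

from concurrent import futures

with futures.ProcessPoolExecutor(max_workers=4) as pool:
    ...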
pool.submit()

Submits a single task manually; the result is a Future instance. Its result() method blocks until the result comes back:
from concurrent.futures import ProcessPoolExecutor

def work(x):
    ...
    return result

with ProcessPoolExecutor() as pool:
    ...
    # Example of submitting work to the pool
    future_result = pool.submit(work, arg)

    # Obtaining the result (blocks until done)
    r = future_result.result()
add_done_callback(fn)

Avoids blocking by registering a callback. The callback receives the Future instance and uses it to obtain the final result (for example, by calling its result() method):
def when_done(r):
    print('Got:', r.result())

with ProcessPoolExecutor() as pool:
    future_result = pool.submit(work, arg)
    future_result.add_done_callback(when_done)
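Putting the two together into a complete, runnable sketch (square() is a hypothetical stand-in for work()):

from concurrent.futures import ProcessPoolExecutor

def square(x):                   # Hypothetical stand-in for work()
    return x * x

def when_done(fut):
    print('Got:', fut.result())

if __name__ == '__main__':
    with ProcessPoolExecutor() as pool:
        fut = pool.submit(square, 7)
        fut.add_done_callback(when_done)   # Eventually prints 'Got: 49'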
Process pools are easy to use, but several points deserve attention when designing larger programs:
- This parallelization technique only works for problems that can be decomposed into independent parts.
- Submitted work must be in the form of simple functions; parallel execution of methods, closures, and other constructs is not supported.
- Function arguments and return values must be compatible with pickle, because inter-process communication is involved and all data exchanged between interpreters must be serialized (see the sketch after this list).
- Submitted task functions should not maintain persistent state or have side effects, beyond simple things such as logging.
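A sketch illustrating the pickle restriction (the exact exception raised may vary across Python versions):

from concurrent.futures import ProcessPoolExecutor

if __name__ == '__main__':
    with ProcessPoolExecutor() as pool:
        # A lambda cannot be pickled for transport to a worker process,
        # so retrieving the result raises an error
        fut = pool.submit(lambda x: 2 * x, 5)
        try:
            print(fut.result())
        except Exception as e:
            print('Failed:', e)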
8 Using Generators Instead of Threads

Generators (coroutines) can replace system threads as a way to implement concurrency; this is sometimes called user-level threading or green threading.
Generator functions and the yield statement

The fundamental behavior is that a yield statement causes a generator to suspend its execution. This makes it possible to write a scheduler that treats generators as a kind of "task" and alternates their execution using cooperative task switching:
# Two simple generator functions
def countdown(n):
    while n > 0:
        print('T-minus', n)
        yield
        n -= 1
    print('Blastoff!')

def countup(n):
    x = 0
    while x < n:
        print('Counting up', x)
        yield
        x += 1

from collections import deque

class TaskScheduler:
    def __init__(self):
        self._task_queue = deque()

    def new_task(self, task):
        '''
        Admit a newly started task to the scheduler
        '''
        self._task_queue.append(task)

    def run(self):
        '''
        Run until there are no more tasks
        '''
        while self._task_queue:
            task = self._task_queue.popleft()
            try:
                # Run until the next yield statement
                next(task)
                self._task_queue.append(task)
            except StopIteration:
                # Generator is no longer executing
                pass

# Example use
sched = TaskScheduler()
sched.new_task(countdown(10))
sched.new_task(countdown(5))
sched.new_task(countup(15))
sched.run()
Output:
T-minus 10
T-minus 5
Counting up 0
T-minus 9
T-minus 4
Counting up 1
T-minus 8
T-minus 3
Counting up 2
T-minus 7
T-minus 2
...
Here, the generator functions are the tasks, and the yield statement is how a task signals that it wants to suspend.

Using generators to implement an actor system with no reliance on threads:
from collections import deque

class ActorScheduler:
    def __init__(self):
        self._actors = {}          # Mapping of names to actors
        self._msg_queue = deque()  # Message queue

    def new_actor(self, name, actor):
        '''
        Admit a newly started actor to the scheduler and give it a name
        '''
        self._msg_queue.append((actor, None))
        self._actors[name] = actor

    def send(self, name, msg):
        '''
        Send a message to a named actor
        '''
        actor = self._actors.get(name)
        if actor:
            self._msg_queue.append((actor, msg))

    def run(self):
        '''
        Run as long as there are pending messages.
        '''
        while self._msg_queue:
            actor, msg = self._msg_queue.popleft()
            try:
                actor.send(msg)
            except StopIteration:
                pass
# Example use
if __name__ == '__main__':
    def printer():
        while True:
            msg = yield
            print('Got:', msg)

    def counter(sched):
        while True:
            # Receive the current count
            n = yield
            if n == 0:
                break
            # Send to the printer task
            sched.send('printer', n)
            # Send the next count to the counter task (recursive)
            sched.send('counter', n - 1)

    sched = ActorScheduler()
    # Create the initial actors
    sched.new_actor('printer', printer())
    sched.new_actor('counter', counter(sched))

    # Send an initial message to the counter to initiate
    sched.send('counter', 10000)
    sched.run()
Using generators to implement a concurrent network application:
from collections import deque
from select import select

# This class represents a generic yield event in the scheduler
class YieldEvent:
    def handle_yield(self, sched, task):
        pass
    def handle_resume(self, sched, task):
        pass

# Task Scheduler
class Scheduler:
    def __init__(self):
        self._numtasks = 0        # Total num of tasks
        self._ready = deque()     # Tasks ready to run
        self._read_waiting = {}   # Tasks waiting to read
        self._write_waiting = {}  # Tasks waiting to write

    # Poll for I/O events and restart waiting tasks
    def _iopoll(self):
        rset, wset, eset = select(self._read_waiting,
                                  self._write_waiting, [])
        for r in rset:
            evt, task = self._read_waiting.pop(r)
            evt.handle_resume(self, task)
        for w in wset:
            evt, task = self._write_waiting.pop(w)
            evt.handle_resume(self, task)

    def new(self, task):
        '''
        Add a newly started task to the scheduler
        '''
        self._ready.append((task, None))
        self._numtasks += 1

    def add_ready(self, task, msg=None):
        '''
        Append an already started task to the ready queue.
        msg is what to send into the task when it resumes.
        '''
        self._ready.append((task, msg))

    # Add a task to the reading set
    def _read_wait(self, fileno, evt, task):
        self._read_waiting[fileno] = (evt, task)

    # Add a task to the write set
    def _write_wait(self, fileno, evt, task):
        self._write_waiting[fileno] = (evt, task)

    def run(self):
        '''
        Run the task scheduler until there are no tasks
        '''
        while self._numtasks:
            if not self._ready:
                self._iopoll()
            task, msg = self._ready.popleft()
            try:
                # Run the coroutine to the next yield
                r = task.send(msg)
                if isinstance(r, YieldEvent):
                    r.handle_yield(self, task)
                else:
                    raise RuntimeError('unrecognized yield event')
            except StopIteration:
                self._numtasks -= 1

# Example implementation of coroutine-based socket I/O
class ReadSocket(YieldEvent):
    def __init__(self, sock, nbytes):
        self.sock = sock
        self.nbytes = nbytes
    def handle_yield(self, sched, task):
        sched._read_wait(self.sock.fileno(), self, task)
    def handle_resume(self, sched, task):
        data = self.sock.recv(self.nbytes)
        sched.add_ready(task, data)

class WriteSocket(YieldEvent):
    def __init__(self, sock, data):
        self.sock = sock
        self.data = data
    def handle_yield(self, sched, task):
        sched._write_wait(self.sock.fileno(), self, task)
    def handle_resume(self, sched, task):
        nsent = self.sock.send(self.data)
        sched.add_ready(task, nsent)

class AcceptSocket(YieldEvent):
    def __init__(self, sock):
        self.sock = sock
    def handle_yield(self, sched, task):
        sched._read_wait(self.sock.fileno(), self, task)
    def handle_resume(self, sched, task):
        r = self.sock.accept()
        sched.add_ready(task, r)

# Wrapper around a socket object for use with yield
class Socket(object):
    def __init__(self, sock):
        self._sock = sock
    def recv(self, maxbytes):
        return ReadSocket(self._sock, maxbytes)
    def send(self, data):
        return WriteSocket(self._sock, data)
    def accept(self):
        return AcceptSocket(self._sock)
    def __getattr__(self, name):
        return getattr(self._sock, name)

if __name__ == '__main__':
    from socket import socket, AF_INET, SOCK_STREAM

    # Example of a function involving generators. This should
    # be called using line = yield from readline(sock)
    def readline(sock):
        chars = []
        while True:
            c = yield sock.recv(1)
            if not c:
                break
            chars.append(c)
            if c == b'\n':
                break
        return b''.join(chars)

    # Echo server using generators
    class EchoServer:
        def __init__(self, addr, sched):
            self.sched = sched
            sched.new(self.server_loop(addr))

        def server_loop(self, addr):
            s = Socket(socket(AF_INET, SOCK_STREAM))
            s.bind(addr)
            s.listen(5)
            while True:
                c, a = yield s.accept()
                print('Got connection from ', a)
                self.sched.new(self.client_handler(Socket(c)))

        def client_handler(self, client):
            while True:
                line = yield from readline(client)
                if not line:
                    break
                line = b'GOT:' + line
                while line:
                    nsent = yield client.send(line)
                    line = line[nsent:]
            client.close()
            print('Client closed')

    sched = Scheduler()
    EchoServer(('', 16000), sched)
    sched.run()
The scheduler maintains a queue of ready-to-run tasks plus waiting areas for tasks sleeping on I/O; its job is to move tasks between the ready queue and the I/O waiting areas.
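A hypothetical manual test of the echo server (assumes the server above is running and listening on port 16000):

# Run in another process while the server is up
from socket import socket, AF_INET, SOCK_STREAM

s = socket(AF_INET, SOCK_STREAM)
s.connect(('localhost', 16000))
s.send(b'hello\n')
print(s.recv(8192))    # Expected: b'GOT:hello\n'
s.close()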