python进阶的小知识
进程之间通讯
Manager 多进程间内存共享
from multiprocessing import Manager, Process
# from multiprocessing import
def add_data(p_dict, key, value):
    """Store ``value`` under ``key`` in ``p_dict`` and print the result.

    Args:
        p_dict: Mutable mapping to update. The demo script passes a
            ``Manager().dict()`` proxy.
        key: Key to set.
        value: Value to store under ``key``.
    """
    import os

    p_dict[key] = value
    # Print the id of the process that did the write, then the mapping
    # as seen from that process.
    print(os.getpid())
    print(p_dict)
if __name__ == '__main__':
    # NOTE(review): PriorityQueue is not used by this example; kept in case
    # code outside this snippet relies on the name.
    from queue import PriorityQueue

    # Using the manager as a context manager shuts its server process down
    # when the block exits, so every use of the proxy stays inside it.
    with Manager() as manager:
        process_dict = manager.dict()

        first_progress = Process(target=add_data, args=(process_dict, "amyk1", "amyv1"))
        second_progress = Process(target=add_data, args=(process_dict, "amyk2", "amyv2"))

        first_progress.start()
        second_progress.start()

        first_progress.join()
        second_progress.join()

        # Printed after both children are joined, so it shows the parent's
        # view of the dict once both add_data calls have finished.
        print(process_dict)
Pipe 用于两个进程之间的通信
from multiprocessing import Pipe, Process
# from multiprocessing import
def producer(pipe):
    """Send the string ``"amy"`` through ``pipe``.

    Args:
        pipe: Object exposing ``send(obj)``; the demo script passes one end
            of a ``multiprocessing.Pipe()``.
    """
    pipe.send("amy")
def consumer(pipe):
    """Receive one object from ``pipe`` and print it.

    Args:
        pipe: Object exposing ``recv()``; the demo script passes one end of
            a ``multiprocessing.Pipe()``.
    """
    print(pipe.recv())
if __name__ == '__main__':
    # Pipe() returns two connection objects, one for each end of the pipe.
    receive_pipe, send_pipe = Pipe()

    my_producer = Process(target=producer, args=(send_pipe,))
    my_consumer = Process(target=consumer, args=(receive_pipe, ))

    my_producer.start()
    my_consumer.start()

    # Join each child exactly once so both are waited for before the
    # script continues.
    my_producer.join()
    my_consumer.join()
Queue 队列
multiprocessing.Queue 不能用于进程池(Pool);进程池中的进程间通信需要使用 Manager().Queue()
from multiprocessing import Queue, Process
import time
def producer(queue):
    """Put six ``"amy"`` items on ``queue``, then a ``None`` end marker.

    Args:
        queue: Object exposing ``put(item)``; the demo script passes a
            ``multiprocessing.Queue``.
    """
    for _ in range(6):
        queue.put("amy")
    # None is the end-of-stream marker the consumer looks for.
    queue.put(None)
def consumer(queue):
    """Print each item taken from ``queue`` until ``None`` is received.

    Only ``None`` ends the loop, so falsy payloads such as ``""`` or ``0``
    are printed like any other item, and there is no fixed cap on how many
    items are consumed.

    Args:
        queue: Object exposing a blocking ``get()``; the demo script passes
            a ``multiprocessing.Queue``.
    """
    while True:
        data = queue.get()
        if data is None:
            # End-of-stream marker: stop without printing it.
            break
        print(data)
if __name__ == '__main__':
    # Bounded queue: put() blocks once five items are waiting, so the
    # producer depends on the consumer draining it concurrently.
    queue = Queue(5)

    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))

    my_producer.start()
    my_consumer.start()

    my_producer.join()
    my_consumer.join()
进程池中进程之间的通信:Manager().Queue()
from multiprocessing import Manager, Process, Pool
def producer(queue):
    """Put four ``"a"`` items on ``queue``, then a ``None`` end marker.

    Args:
        queue: Object exposing ``put(item)``; the demo script passes a
            ``Manager().Queue()`` proxy.
    """
    for _ in range(4):
        queue.put("a")
    # None is the end-of-stream marker the consumer looks for.
    queue.put(None)
def consumer(queue):
    """Print each item taken from ``queue`` until ``None``, then return ``'1'``.

    Only ``None`` ends the loop, so falsy payloads such as ``""`` or ``0``
    are printed rather than treated as the end marker.

    Args:
        queue: Object exposing a blocking ``get()``; the demo script passes
            a ``Manager().Queue()`` proxy.

    Returns:
        The string ``'1'``, which the demo script hands to its ``callback``.
    """
    while True:
        data = queue.get()
        if data is None:
            break
        print(data)
    # NOTE(review): the source had lost its indentation; the return is
    # placed after the loop so every queued item is consumed first.
    return '1'
def error_callback(error):
    """Print ``error`` followed by a placeholder notification line.

    Args:
        error: The value to report; the demo script registers this function
            as the ``error_callback`` of ``Pool.apply_async``.
    """
    print(error)
    # Placeholder only: no email is actually sent here.
    print('send error email ')
def callback(r):
    """Print ``r``.

    Args:
        r: The value to print; the demo script registers this function as
            the ``callback`` of ``Pool.apply_async``.
    """
    print(r)
if __name__ == '__main__':
    # Using the manager as a context manager shuts its server process down
    # on exit, so the pool is joined inside the block while the queue proxy
    # is still valid.
    with Manager() as manager:
        queue = manager.Queue(10)
        pool = Pool(2)

        pool.apply_async(producer, args=(queue, ), error_callback=error_callback)
        # Report consumer failures too; without an error_callback an
        # exception raised in the worker would go unnoticed here.
        pool.apply_async(
            consumer,
            args=(queue, ),
            callback=callback,
            error_callback=error_callback,
        )

        # No more tasks will be submitted; wait for both to finish.
        pool.close()
        pool.join()