python进阶的小知识


python进阶的小知识


进程之间通讯

Manager 多进程间内存共享


from multiprocessing import Manager, Process
# from multiprocessing import

def add_data(p_dict, key, value):
    p_dict[key] = value
    import os
    print(os.getpid())
    print(p_dict)


if __name__ == '__main__':
    process_dict = Manager().dict()
    
    from queue import PriorityQueue
    
    first_progress = Process(target=add_data, args=(process_dict, "amyk1", "amyv1"))
    second_progress = Process(target=add_data, args=(process_dict, "amyk2", "amyv2"))

    first_progress.start()
    second_progress.start()
    first_progress.join()
    second_progress.join()
    print(process_dict)

Pipe 用于两个进程


from multiprocessing import Pipe, Process
# from multiprocessing import

def producer(pipe):
    pipe.send("amy")

def consumer(pipe):
    print(pipe.recv())



if __name__ == '__main__':
    receive_pipe, send_pipe = Pipe()

    my_producer = Process(target=producer, args=(send_pipe,))
    my_consumer = Process(target=consumer, args=(receive_pipe, ))

    my_producer.start()
    my_consumer.start()
    my_consumer.join()
    my_consumer.join()

Queue 队列 不能用于进程池,进程池间通信需要使用Manager().Queue()

from multiprocessing import Queue, Process
import time

def producer(queue):
    queue.put("amy")
    queue.put("amy")
    queue.put("amy")
    queue.put("amy")
    queue.put("amy")
    queue.put("amy")
    queue.put(None)


def consumer(queue):
    for _ in range(10):
        data = queue.get()
        if data:
            print(data)
        else:
            break


if __name__ == '__main__':
    queue = Queue(5)
    my_producer = Process(target=producer, args=(queue,))
    my_consumer = Process(target=consumer, args=(queue,))
    my_producer.start()
    my_consumer.start()
    my_producer.join()
    my_consumer.join()

进程池之间的通讯 Manager().Queue()


from multiprocessing import Manager, Process, Pool

def producer(queue):
    queue.put("a")
    queue.put("a")
    queue.put("a")
    queue.put("a")
    queue.put(None)

def consumer(queue):
    while 1:
        data = queue.get()
        if not data: break
        print(data)
    return '1'


def error_callback(error):
    print(error)
    print('send error email ')


def callback(r):
    print(r)

if __name__ == '__main__':
    queue = Manager().Queue(10)

    pool = Pool(2)
    pool.apply_async(producer, args=(queue, ), error_callback=error_callback)
    pool.apply_async(consumer, args=(queue, ), callback=callback)

    pool.close()
    pool.join()


Buy me a 肥仔水!