Python中可以使用多进程和多线程来实现并行处理。多进程和多线程的组合可能会增加复杂性,需要小心处理线程之间的同步和资源共享。确保开发的应用程序设计适合多进程和多线程的并行性需求。本文主要介绍Python 中多进程中使用多线程示例。

1、多线程和多进程

参考文档:

Python 异步编程 多线程

Python 异步编程 多进程

2、ProcessPoolExecutor 和 Process

ProcessPoolExecutorconcurrent.futures 模块中的一部分,提供了高级接口,用于创建和管理进程池,自动处理任务分发和结果收集。Process 是 Python 的 multiprocessing 模块中的一部分,它提供了更低级的接口,允许手动创建和管理单个进程,以及控制进程之间的通信和同步。ProcessPoolExecutor 更适合于 CPU 密集型任务,因为它能够在多个 CPU 核心上并行执行任务。Process 更适合于 I/O 密集型任务或需要更复杂进程间通信的情况。

3、多进程中使用多线程

1)Process 和 ThreadPoolExecutor

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import Process
from queue import Queue
from time import sleep


class Handler:
    def __init__(self):
        self.queue = Queue()  # this object is essential

    def put(self, item):
        self.queue.put(item)

    def run(self):
        while True:
            item = self.queue.get()
            if item == 'exit':
                break
            # do other things on the item ...
            print(item)
            sleep(1)


class Runner:
    def __init__(self, name):
        self.name = name
        self.a = Handler()
        self.b = Handler()

    def start(self):
        for _ in range(3):
            self.a.put(f'{self.name}: hello a')
            self.b.put(f'{self.name}: hello b')
        # 请求正常关闭
        self.a.put('exit')
        self.b.put('exit')
        with ThreadPoolExecutor() as exe:
            futures = [exe.submit(r.run) for r in [self.a, self.b]]
            for f in futures:
                f.result()


# 要求一切都可以被 Pickle 序列化
def run_in_process_pool():
    rA = Runner('A')
    rB = Runner('B')
    rC = Runner('C')
    with ProcessPoolExecutor() as exe:
        futures = [exe.submit(r.start) for r in [rA, rB, rC]]
        for future in futures:
            future.result()


# 不会对任何内容进行 Pickle 序列化
def run_in_processes():
    rA = Runner('A')
    rB = Runner('B')
    rC = Runner('C')
    procs = [Process(target=r.start) for r in [rA, rB, rC]]
    for p in procs:
        p.start()
    for p in procs:
        p.join()


if __name__ == '__main__':
    # run_in_process_pool()  # `TypeError: cannot pickle '_thread.lock' object`
    run_in_processes()  # 正常运行

2)ThreadPoolExecutor 和 ProcessPoolExecutor

import concurrent.futures

# 定义一个简单的函数,用于演示任务执行
def worker(task_id):
    print(f"Task {task_id} started")
    # 在这里执行任务的逻辑
    print(f"Task {task_id} completed")

def main():
    # 创建一个 ThreadPoolExecutor,指定线程数量
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as thread_executor:
        # 创建一个 ProcessPoolExecutor,指定进程数量
        with concurrent.futures.ProcessPoolExecutor(max_workers=2) as process_executor:
            # 提交多个任务给线程池执行
            thread_task_ids = [1, 2]
            thread_futures = [thread_executor.submit(worker, task_id) for task_id in thread_task_ids]

            # 提交多个任务给进程池执行
            process_task_ids = [3, 4]
            process_futures = [process_executor.submit(worker, task_id) for task_id in process_task_ids]

            # 等待所有任务完成
            concurrent.futures.wait(thread_futures + process_futures)

if __name__ == "__main__":
    main()

推荐文档