多线程执行任务

在 Python 中,通过多线程方式执行耗时任务。

import threading
import queue

import random
import time

'''
loguru 是第三方库
使用前,需要安装 loguru
!pip instsall loguru
'''
from loguru import logger

cost = time.time()

# 创建一个队列来存储任务
task_queue = queue.Queue()

# 模拟一个任务的执行
def simulate_task():
    delay = random.randint(0, 99)/10  # 模拟任务延迟
    logger.debug('任务开始 > ', end='')
    time.sleep(delay)  # 模拟任务耗时
    logger.debug(f"任务耗时 {delay:.2f} 秒")

# 定义一个任务函数
def process_task(task):
    print(f"执行任务: {task}")
    simulate_task()  # 模拟任务执行

# 定义一个工作线程,用来执行任务
def worker():
    while True:
        task = task_queue.get()  # 从队列中获取任务
        process_task(task)  # 处理任务
        task_queue.task_done()  # 标记任务完成

# 创建并启动多个工作线程
num_worker_threads = 4
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.daemon = True
    t.start()

# 添加任务到队列
max_tasks = 100
for i in range(max_tasks):  # 外层循环
    level_tasks = random.randint(1, 10)  # 随机生成任务数量
    logger.debug(f"添加 {level_tasks} 个任务到队列 {i}")
    for j in range(level_tasks):  # 内层循环
        task_queue.put((i, j))  # 添加任务到队列

# 等待队列中的任务全部完成
task_queue.join()
cost = time.time() - cost
logger.success("所有任务执行完成! 总共耗时 {cost:.2f} 秒")

任务执行前,先定义一个任务执行的方法

# 定义一个工作线程,用来执行任务
def worker():
    while True:
        task = task_queue.get()  # 从队列中获取任务
        process_task(task)  # 处理任务
        task_queue.task_done()  # 标记任务完成

创建指定数量的线程,并指定线程执行方法 worker

# 创建并启动多个工作线程
num_worker_threads = 4
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.daemon = True
    t.start()

先创建队列 task_queue

task_queue = queue.Queue()

将待执行的任务添加到队列中,由队列调度任务执行

# 添加任务到队列
task_queue.put((i, j)) # 以 Tuple 传参
# 或
task_queue.put({'a':1,'b':2}) # 以 Dict 传参

在执行任务方法 worker 中,可以通过以下方法获取传参信息

# 从队列中获取任务(参数)
task = task_queue.get()
# 比如上面例子中的
# (i, j)
# 或
# {'a':1,'b':2}

# ... 执行真实的任务处理过程

# 当任务成功完成后,标记任务完成
task_queue.task_done()

回到主流程中,在将待处理的任务都添加完成之后,需要做以下的设定

# 等待队列中的任务全部完成
task_queue.join()

通过线程进行任务执行,与顺序执行比较,效果并不是太好。

赞赏