在 Python 中,通过多线程方式执行耗时任务。
import threading
import queue
import random
import time
'''
loguru 是第三方库
使用前,需要安装 loguru
!pip instsall loguru
'''
from loguru import logger
cost = time.time()
# 创建一个队列来存储任务
task_queue = queue.Queue()
# 模拟一个任务的执行
def simulate_task():
delay = random.randint(0, 99)/10 # 模拟任务延迟
logger.debug('任务开始 > ', end='')
time.sleep(delay) # 模拟任务耗时
logger.debug(f"任务耗时 {delay:.2f} 秒")
# 定义一个任务函数
def process_task(task):
print(f"执行任务: {task}")
simulate_task() # 模拟任务执行
# 定义一个工作线程,用来执行任务
def worker():
while True:
task = task_queue.get() # 从队列中获取任务
process_task(task) # 处理任务
task_queue.task_done() # 标记任务完成
# 创建并启动多个工作线程
num_worker_threads = 4
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.daemon = True
t.start()
# 添加任务到队列
max_tasks = 100
for i in range(max_tasks): # 外层循环
level_tasks = random.randint(1, 10) # 随机生成任务数量
logger.debug(f"添加 {level_tasks} 个任务到队列 {i}")
for j in range(level_tasks): # 内层循环
task_queue.put((i, j)) # 添加任务到队列
# 等待队列中的任务全部完成
task_queue.join()
cost = time.time() - cost
logger.success("所有任务执行完成! 总共耗时 {cost:.2f} 秒")
任务执行前,先定义一个任务执行的方法
# 定义一个工作线程,用来执行任务
def worker():
while True:
task = task_queue.get() # 从队列中获取任务
process_task(task) # 处理任务
task_queue.task_done() # 标记任务完成
创建指定数量的线程,并指定线程执行方法 worker
# 创建并启动多个工作线程
num_worker_threads = 4
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.daemon = True
t.start()
先创建队列 task_queue
task_queue = queue.Queue()
将待执行的任务添加到队列中,由队列调度任务执行
# 添加任务到队列
task_queue.put((i, j)) # 以 Tuple 传参
# 或
task_queue.put({'a':1,'b':2}) # 以 Dict 传参
在执行任务方法 worker 中,可以通过以下方法获取传参信息
# 从队列中获取任务(参数)
task = task_queue.get()
# 比如上面例子中的
# (i, j)
# 或
# {'a':1,'b':2}
# ... 执行真实的任务处理过程
# 当任务成功完成后,标记任务完成
task_queue.task_done()
回到主流程中,在将待处理的任务都添加完成之后,需要做以下的设定
# 等待队列中的任务全部完成
task_queue.join()
通过线程进行任务执行,与顺序执行比较,效果并不是太好。