queue模块是Python中线程安全的队列实现,用于在多线程环境下安全地传递数据。队列就像排队一样,先来的数据先处理,后来的数据后处理。
这是最常用的队列类型,先加入的数据先取出:
import queue
# 创建队列
q = queue.Queue()
# 添加数据
q.put("第一个任务")
q.put("第二个任务")
q.put("第三个任务")
# 取出数据
print(q.get()) # 输出: 第一个任务
print(q.get()) # 输出: 第二个任务
print(q.get()) # 输出: 第三个任务这种队列像栈一样,最后加入的数据先取出:
import queue
# 创建后进先出队列
q = queue.LifoQueue()
q.put("底层数据")
q.put("中间数据")
q.put("顶层数据")
print(q.get()) # 输出: 顶层数据
print(q.get()) # 输出: 中间数据
print(q.get()) # 输出: 底层数据数据按优先级顺序取出,优先级数字小的先出:
import queue
# 创建优先级队列
q = queue.PriorityQueue()
# 添加数据,格式为(优先级, 数据)
q.put((3, "普通任务"))
q.put((1, "紧急任务"))
q.put((2, "重要任务"))
print(q.get()) # 输出: (1, '紧急任务')
print(q.get()) # 输出: (2, '重要任务')
print(q.get()) # 输出: (3, '普通任务')import queue
q = queue.Queue(maxsize=3) # 设置队列最大容量
# 添加元素
q.put("任务A")
q.put("任务B")
q.put("任务C")
print(f"队列大小: {q.qsize()}")
print(f"队列是否满: {q.full()}")
print(f"队列是否空: {q.empty()}")
# 取出元素
while not q.empty():
item = q.get()
print(f"处理: {item}")
q.task_done() # 标记任务完成
print("所有任务处理完成")import queue
import time
q = queue.Queue(maxsize=2)
def test_timeout():
try:
# 尝试在2秒内添加第三个元素(会超时)
q.put("任务1")
q.put("任务2")
print("前两个任务添加成功")
# 这个会超时,因为队列已满
q.put("任务3", timeout=2)
except queue.Full:
print("队列已满,添加超时")
def test_non_blocking():
try:
# 非阻塞方式获取
item = q.get(block=False)
print(f"获取到: {item}")
except queue.Empty:
print("队列为空")
test_timeout()
test_non_blocking()import queue
import threading
import time
import random
# 创建任务队列
task_queue = queue.Queue()
def producer(worker_id):
"""生产者函数,生成任务"""
for i in range(5):
task = f"任务_{worker_id}_{i}"
task_queue.put(task)
print(f"生产者{worker_id} 生成: {task}")
time.sleep(random.uniform(0.1, 0.5))
print(f"生产者{worker_id} 完成工作")
def consumer(worker_id):
"""消费者函数,处理任务"""
while True:
try:
# 等待任务,最多等待2秒
task = task_queue.get(timeout=2)
print(f"消费者{worker_id} 处理: {task}")
time.sleep(random.uniform(0.2, 1.0)) # 模拟处理时间
task_queue.task_done() # 标记任务完成
except queue.Empty:
print(f"消费者{worker_id} 没有任务,退出")
break
# 启动多个生产者和消费者
producers = []
consumers = []
# 创建2个生产者
for i in range(2):
p = threading.Thread(target=producer, args=(i,))
producers.append(p)
p.start()
# 创建3个消费者
for i in range(3):
c = threading.Thread(target=consumer, args=(i,))
consumers.append(c)
c.start()
# 等待所有生产者完成
for p in producers:
p.join()
# 等待所有任务处理完成
task_queue.join()
print("所有任务处理完毕")import queue
import threading
import time
class WorkerPool:
"""工作线程池"""
def __init__(self, num_workers=3):
self.task_queue = queue.Queue()
self.workers = []
self.running = True
# 创建工作线程
for i in range(num_workers):
worker = threading.Thread(target=self._worker_loop, args=(i,))
worker.daemon = True
worker.start()
self.workers.append(worker)
def _worker_loop(self, worker_id):
"""工作线程主循环"""
while self.running:
try:
task = self.task_queue.get(timeout=1)
if task is None: # 停止信号
break
print(f"工作者{worker_id} 开始处理: {task}")
# 执行任务
result = self._execute_task(task)
print(f"工作者{worker_id} 完成: {task} -> {result}")
self.task_queue.task_done()
except queue.Empty:
continue
def _execute_task(self, task):
"""执行具体任务"""
time.sleep(1) # 模拟工作耗时
return f"结果_{task}"
def add_task(self, task):
"""添加任务"""
self.task_queue.put(task)
def wait_completion(self):
"""等待所有任务完成"""
self.task_queue.join()
def shutdown(self):
"""关闭线程池"""
self.running = False
for _ in self.workers:
self.task_queue.put(None) # 发送停止信号
# 使用工作线程池
pool = WorkerPool(2)
# 添加10个任务
for i in range(10):
pool.add_task(f"工作{i}")
print("等待所有任务完成...")
pool.wait_completion()
print("所有任务已完成")
pool.shutdown()import queue
import threading
import requests
import time
from urllib.parse import urljoin
class WebCrawler:
"""简单的网页爬虫"""
def __init__(self, max_workers=3):
self.url_queue = queue.Queue()
self.visited = set()
self.results = queue.Queue()
self.max_workers = max_workers
def crawl(self, start_url, max_pages=10):
"""开始爬取"""
self.url_queue.put(start_url)
self.visited.add(start_url)
# 启动工作线程
threads = []
for i in range(self.max_workers):
t = threading.Thread(target=self._crawl_worker, args=(i,))
t.daemon = True
t.start()
threads.append(t)
# 等待爬取完成或达到最大页面数
pages_crawled = 0
while pages_crawled < max_pages and not self.url_queue.empty():
try:
result = self.results.get(timeout=5)
pages_crawled += 1
print(f"已爬取 {pages_crawled}/{max_pages} 页面")
except queue.Empty:
break
print(f"爬取完成,共处理 {pages_crawled} 个页面")
def _crawl_worker(self, worker_id):
"""爬虫工作线程"""
while True:
try:
url = self.url_queue.get(timeout=3)
print(f"爬虫{worker_id} 处理: {url}")
# 获取网页内容
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
# 这里可以解析链接并添加到队列
# 简化处理,只是保存结果
self.results.put({
'url': url,
'status': response.status_code,
'size': len(response.content)
})
except Exception as e:
print(f"爬取失败 {url}: {e}")
self.url_queue.task_done()
except queue.Empty:
break
# 使用爬虫
crawler = WebCrawler(2)
crawler.crawl('http://httpbin.org/html', max_pages=5)import queue
import threading
import time
import random
class DataProcessingPipeline:
"""数据处理管道"""
def __init__(self):
# 创建三个阶段的队列
self.raw_data_queue = queue.Queue()
self.processed_data_queue = queue.Queue()
self.final_result_queue = queue.Queue()
def start_stage1(self):
"""第一阶段:数据收集"""
def collector():
for i in range(10):
data = f"原始数据_{i}"
self.raw_data_queue.put(data)
print(f"收集: {data}")
time.sleep(0.1)
self.raw_data_queue.put(None) # 结束信号
threading.Thread(target=collector, daemon=True).start()
def start_stage2(self):
"""第二阶段:数据处理"""
def processor():
while True:
data = self.raw_data_queue.get()
if data is None:
self.raw_data_queue.put(None) # 传递结束信号
self.processed_data_queue.put(None)
break
# 模拟数据处理
processed = f"处理后的[{data}]"
self.processed_data_queue.put(processed)
print(f"处理: {data} -> {processed}")
self.raw_data_queue.task_done()
threading.Thread(target=processor, daemon=True).start()
def start_stage3(self):
"""第三阶段:结果保存"""
def saver():
while True:
data = self.processed_data_queue.get()
if data is None:
break
# 模拟保存操作
print(f"保存: {data}")
self.final_result_queue.put(data)
self.processed_data_queue.task_done()
time.sleep(0.2)
threading.Thread(target=saver, daemon=True).start()
def run_pipeline(self):
"""运行整个管道"""
print("启动数据处理管道...")
self.start_stage1()
self.start_stage2()
self.start_stage3()
# 等待所有阶段完成
self.raw_data_queue.join()
self.processed_data_queue.join()
print("数据处理管道完成")
# 运行管道
pipeline = DataProcessingPipeline()
pipeline.run_pipeline()import queue
import threading
import time
class TaskScheduler:
"""任务调度器"""
def __init__(self):
self.task_queue = queue.PriorityQueue()
self.worker_thread = threading.Thread(target=self._worker)
self.running = False
def add_task(self, priority, task_name, task_func, *args):
"""添加任务"""
task_data = (priority, time.time(), task_name, task_func, args)
self.task_queue.put(task_data)
print(f"添加任务: {task_name} (优先级: {priority})")
def _worker(self):
"""工作线程"""
while self.running or not self.task_queue.empty():
try:
priority, timestamp, name, func, args = self.task_queue.get(timeout=1)
print(f"执行任务: {name}")
# 执行任务函数
try:
result = func(*args)
print(f"任务完成: {name} -> {result}")
except Exception as e:
print(f"任务失败: {name} - {e}")
self.task_queue.task_done()
except queue.Empty:
continue
def start(self):
"""启动调度器"""
self.running = True
self.worker_thread.start()
print("任务调度器启动")
def stop(self):
"""停止调度器"""
self.running = False
self.worker_thread.join()
print("任务调度器停止")
# 定义一些示例任务
def backup_data():
time.sleep(2)
return "备份完成"
def send_email():
time.sleep(1)
return "邮件发送完成"
def generate_report():
time.sleep(3)
return "报告生成完成"
# 使用任务调度器
scheduler = TaskScheduler()
scheduler.start()
# 添加不同优先级的任务
scheduler.add_task(1, "紧急备份", backup_data)
scheduler.add_task(3, "生成周报", generate_report)
scheduler.add_task(2, "发送通知邮件", send_email)
scheduler.add_task(1, "紧急安全检查", lambda: "安全检查完成")
# 等待所有任务完成
scheduler.task_queue.join()
scheduler.stop()import queue
import threading
import time
from dataclasses import dataclass
from typing import Any
@dataclass
class QueueStats:
"""队列统计信息"""
total_processed: int = 0
average_wait_time: float = 0.0
max_queue_size: int = 0
error_count: int = 0
class MonitoredQueue:
"""带监控的队列"""
def __init__(self, maxsize=0):
self.queue = queue.Queue(maxsize=maxsize)
self.stats = QueueStats()
self._lock = threading.Lock()
self._start_times = {}
def put(self, item, task_id=None):
"""放入项目并记录开始时间"""
if task_id is not None:
self._start_times[task_id] = time.time()
self.queue.put(item)
with self._lock:
current_size = self.queue.qsize()
if current_size > self.stats.max_queue_size:
self.stats.max_queue_size = current_size
def get(self):
"""获取项目"""
return self.queue.get()
def task_done(self, task_id=None, success=True):
"""标记任务完成并更新统计"""
self.queue.task_done()
with self._lock:
self.stats.total_processed += 1
if not success:
self.stats.error_count += 1
if task_id and task_id in self._start_times:
wait_time = time.time() - self._start_times[task_id]
# 更新平均等待时间
n = self.stats.total_processed
self.stats.average_wait_time = (
(self.stats.average_wait_time * (n - 1) + wait_time) / n
)
del self._start_times[task_id]
def get_stats(self):
"""获取统计信息"""
with self._lock:
return QueueStats(
total_processed=self.stats.total_processed,
average_wait_time=self.stats.average_wait_time,
max_queue_size=self.stats.max_queue_size,
error_count=self.stats.error_count
)
# 测试监控队列
def test_monitored_queue():
mq = MonitoredQueue()
def worker():
for i in range(5):
task_id = f"task_{i}"
mq.put(f"数据{i}", task_id)
time.sleep(0.1)
def processor():
for i in range(5):
item = mq.get()
task_id = f"task_{i}"
print(f"处理: {item}")
time.sleep(0.2) # 模拟处理时间
mq.task_done(task_id, success=True)
# 启动线程
t1 = threading.Thread(target=worker)
t2 = threading.Thread(target=processor)
t1.start()
t2.start()
t1.join()
t2.join()
# 输出统计信息
stats = mq.get_stats()
print(f"\n队列统计:")
print(f"总处理数: {stats.total_processed}")
print(f"平均等待时间: {stats.average_wait_time:.2f}秒")
print(f"最大队列大小: {stats.max_queue_size}")
print(f"错误数: {stats.error_count}")
test_monitored_queue()以下是 Python queue 模块(线程安全队列)的常用类、方法及属性的表格说明,包含功能描述和示例:
| 类 | 说明 | 适用场景 |
|---|---|---|
| queue.Queue | 先进先出(FIFO)队列 | 通用任务队列 |
| queue.LifoQueue | 后进先出(LIFO)队列(类似栈) | 需要后进先出的场景 |
| queue.PriorityQueue | 优先级队列(最小堆实现) | 按优先级处理任务 |
| queue.SimpleQueue | 更简单的FIFO队列(Python 3.7+) | 不需要高级功能的场景 |
| 方法 | 说明 | 示例 | 返回值 |
|---|---|---|---|
| put(item) | 放入元素 | q.put("task1") | None |
| get() | 取出并移除元素 | item = q.get() | 队列元素 |
| empty() | 判断队列是否为空 | if q.empty(): | True/False |
| full() | 判断队列是否已满 | if q.full(): | True/False |
| qsize() | 返回队列当前大小 | size = q.qsize() | 整数 |
| task_done() | 标记任务完成(用于join()) | q.task_done() | None |
| join() | 阻塞直到所有任务完成 | q.join() | None |
| 参数 | 说明 | 默认值 | 示例 |
|---|---|---|---|
| block | 当队列为空/满时是否阻塞 | True | q.get(block=False) |
| timeout | 阻塞超时时间(秒) | None | q.put(x, timeout=5) |
queue模块提供了线程安全的队列实现,主要特点:
线程安全:多个线程可以安全地同时操作
多种队列类型:先进先出、后进先出、优先级队列
阻塞控制:可以设置超时和非阻塞操作
任务跟踪:使用task_done()和join()跟踪任务完成情况
使用queue模块的常见场景:
多线程任务分配
生产者-消费者模式
工作线程池
数据处理管道
任务调度系统
记住在使用队列时要合理设置队列大小,避免内存问题,同时要正确处理队列空和队列满的情况。
本文内容仅供个人学习/研究/参考使用,不构成任何决策建议或专业指导。分享/转载时请标明原文来源,同时请勿将内容用于商业售卖、虚假宣传等非学习用途哦~感谢您的理解与支持!