Python3入门指南Python语言的特点和实际应用Python3环境搭建配置VSCode进行Python开发Python基础语法Python变量与数据类型Python数据类型转换Python解释器使用Python注释使用Python运算符Python数字类型Python字符串操作Python列表操作Python元组使用Python字典使用Python集合使用Python条件控制详解Python循环语句详解Python编程入门实践Python推导式详解Python迭代器和生成器Python with语句详解Python函数详解Python lambda(匿名函数)Python装饰器Python数据结构Python模块和包使用Python中__name__和__main__的用法Python输入输出:从基础到文件操作Python文件操作Python OS模块使用Python错误和异常处理Python面向对象编程Python命名空间和作用域Python虚拟环境:venv详细教程Python类型注解Python标准库常用模块Python正则表达式Python CGI编程Python MySQL(mysql-connector驱动)Python MySQL(PyMySQL驱动)Python网络编程Python发送邮件Python多线程编程Python XML解析Python JSON解析Python日期和时间处理Python操作MongoDBPython urllib库使用Python uWSGI 安装与配置Python pip包管理工具Python operator模块Python math模块Python requests模块HTTP请求Python random模块Python OpenAI库Python AI绘画制作Python statistics模块Python hashlib模块:哈希加密Python量化交易Python pyecharts数据可视化Python Selenium网页自动化Python BeautifulSoup网页数据提取Python Scrapy爬虫框架Python Markdown转HTMLPython sys模块Python Pickle模块:数据存储Python subprocess模块Python queue队列模块Python StringIO内存文件操作Python logging日志记录Python datetime日期时间处理Python re正则表达式Python csv表格数据处理Python threading 多线程编程Python asyncio 异步编程Python PyQt 图形界面开发Python 应用方向和常用库框架

Python queue队列模块

queue模块是Python中线程安全的队列实现,用于在多线程环境下安全地传递数据。队列就像排队一样,先来的数据先处理,后来的数据后处理。


队列的基本类型

先进先出队列(Queue)

这是最常用的队列类型,先加入的数据先取出:

import queue

# 创建队列
q = queue.Queue()

# 添加数据
q.put("第一个任务")
q.put("第二个任务")
q.put("第三个任务")

# 取出数据
print(q.get())  # 输出: 第一个任务
print(q.get())  # 输出: 第二个任务
print(q.get())  # 输出: 第三个任务

后进先出队列(LifoQueue)

这种队列像栈一样,最后加入的数据先取出:

import queue

# 创建后进先出队列
q = queue.LifoQueue()

q.put("底层数据")
q.put("中间数据")
q.put("顶层数据")

print(q.get())  # 输出: 顶层数据
print(q.get())  # 输出: 中间数据
print(q.get())  # 输出: 底层数据

优先级队列(PriorityQueue)

数据按优先级顺序取出,优先级数字小的先出:

import queue

# 创建优先级队列
q = queue.PriorityQueue()

# 添加数据,格式为(优先级, 数据)
q.put((3, "普通任务"))
q.put((1, "紧急任务"))
q.put((2, "重要任务"))

print(q.get())  # 输出: (1, '紧急任务')
print(q.get())  # 输出: (2, '重要任务')
print(q.get())  # 输出: (3, '普通任务')


队列常用操作

基本方法使用

import queue

q = queue.Queue(maxsize=3)  # 设置队列最大容量

# 添加元素
q.put("任务A")
q.put("任务B")
q.put("任务C")

print(f"队列大小: {q.qsize()}")
print(f"队列是否满: {q.full()}")
print(f"队列是否空: {q.empty()}")

# 取出元素
while not q.empty():
    item = q.get()
    print(f"处理: {item}")
    q.task_done()  # 标记任务完成

print("所有任务处理完成")

超时和阻塞控制

import queue
import time

q = queue.Queue(maxsize=2)

def test_timeout():
    try:
        # 尝试在2秒内添加第三个元素(会超时)
        q.put("任务1")
        q.put("任务2")
        print("前两个任务添加成功")
        
        # 这个会超时,因为队列已满
        q.put("任务3", timeout=2)
        
    except queue.Full:
        print("队列已满,添加超时")

def test_non_blocking():
    try:
        # 非阻塞方式获取
        item = q.get(block=False)
        print(f"获取到: {item}")
    except queue.Empty:
        print("队列为空")

test_timeout()
test_non_blocking()


多线程应用

生产者-消费者模式

import queue
import threading
import time
import random

# 创建任务队列
task_queue = queue.Queue()

def producer(worker_id):
    """生产者函数,生成任务"""
    for i in range(5):
        task = f"任务_{worker_id}_{i}"
        task_queue.put(task)
        print(f"生产者{worker_id} 生成: {task}")
        time.sleep(random.uniform(0.1, 0.5))
    
    print(f"生产者{worker_id} 完成工作")

def consumer(worker_id):
    """消费者函数,处理任务"""
    while True:
        try:
            # 等待任务,最多等待2秒
            task = task_queue.get(timeout=2)
            print(f"消费者{worker_id} 处理: {task}")
            time.sleep(random.uniform(0.2, 1.0))  # 模拟处理时间
            task_queue.task_done()  # 标记任务完成
            
        except queue.Empty:
            print(f"消费者{worker_id} 没有任务,退出")
            break

# 启动多个生产者和消费者
producers = []
consumers = []

# 创建2个生产者
for i in range(2):
    p = threading.Thread(target=producer, args=(i,))
    producers.append(p)
    p.start()

# 创建3个消费者
for i in range(3):
    c = threading.Thread(target=consumer, args=(i,))
    consumers.append(c)
    c.start()

# 等待所有生产者完成
for p in producers:
    p.join()

# 等待所有任务处理完成
task_queue.join()

print("所有任务处理完毕")

工作线程池

import queue
import threading
import time

class WorkerPool:
    """工作线程池"""
    
    def __init__(self, num_workers=3):
        self.task_queue = queue.Queue()
        self.workers = []
        self.running = True
        
        # 创建工作线程
        for i in range(num_workers):
            worker = threading.Thread(target=self._worker_loop, args=(i,))
            worker.daemon = True
            worker.start()
            self.workers.append(worker)
    
    def _worker_loop(self, worker_id):
        """工作线程主循环"""
        while self.running:
            try:
                task = self.task_queue.get(timeout=1)
                if task is None:  # 停止信号
                    break
                
                print(f"工作者{worker_id} 开始处理: {task}")
                # 执行任务
                result = self._execute_task(task)
                print(f"工作者{worker_id} 完成: {task} -> {result}")
                
                self.task_queue.task_done()
                
            except queue.Empty:
                continue
    
    def _execute_task(self, task):
        """执行具体任务"""
        time.sleep(1)  # 模拟工作耗时
        return f"结果_{task}"
    
    def add_task(self, task):
        """添加任务"""
        self.task_queue.put(task)
    
    def wait_completion(self):
        """等待所有任务完成"""
        self.task_queue.join()
    
    def shutdown(self):
        """关闭线程池"""
        self.running = False
        for _ in self.workers:
            self.task_queue.put(None)  # 发送停止信号

# 使用工作线程池
pool = WorkerPool(2)

# 添加10个任务
for i in range(10):
    pool.add_task(f"工作{i}")

print("等待所有任务完成...")
pool.wait_completion()
print("所有任务已完成")

pool.shutdown()


实际应用场景

网页爬虫任务队列

import queue
import threading
import requests
import time
from urllib.parse import urljoin

class WebCrawler:
    """简单的网页爬虫"""
    
    def __init__(self, max_workers=3):
        self.url_queue = queue.Queue()
        self.visited = set()
        self.results = queue.Queue()
        self.max_workers = max_workers
    
    def crawl(self, start_url, max_pages=10):
        """开始爬取"""
        self.url_queue.put(start_url)
        self.visited.add(start_url)
        
        # 启动工作线程
        threads = []
        for i in range(self.max_workers):
            t = threading.Thread(target=self._crawl_worker, args=(i,))
            t.daemon = True
            t.start()
            threads.append(t)
        
        # 等待爬取完成或达到最大页面数
        pages_crawled = 0
        while pages_crawled < max_pages and not self.url_queue.empty():
            try:
                result = self.results.get(timeout=5)
                pages_crawled += 1
                print(f"已爬取 {pages_crawled}/{max_pages} 页面")
            except queue.Empty:
                break
        
        print(f"爬取完成,共处理 {pages_crawled} 个页面")
    
    def _crawl_worker(self, worker_id):
        """爬虫工作线程"""
        while True:
            try:
                url = self.url_queue.get(timeout=3)
                print(f"爬虫{worker_id} 处理: {url}")
                
                # 获取网页内容
                try:
                    response = requests.get(url, timeout=5)
                    if response.status_code == 200:
                        # 这里可以解析链接并添加到队列
                        # 简化处理,只是保存结果
                        self.results.put({
                            'url': url,
                            'status': response.status_code,
                            'size': len(response.content)
                        })
                
                except Exception as e:
                    print(f"爬取失败 {url}: {e}")
                
                self.url_queue.task_done()
                
            except queue.Empty:
                break

# 使用爬虫
crawler = WebCrawler(2)
crawler.crawl('http://httpbin.org/html', max_pages=5)

数据处理管道

import queue
import threading
import time
import random

class DataProcessingPipeline:
    """数据处理管道"""
    
    def __init__(self):
        # 创建三个阶段的队列
        self.raw_data_queue = queue.Queue()
        self.processed_data_queue = queue.Queue()
        self.final_result_queue = queue.Queue()
    
    def start_stage1(self):
        """第一阶段:数据收集"""
        def collector():
            for i in range(10):
                data = f"原始数据_{i}"
                self.raw_data_queue.put(data)
                print(f"收集: {data}")
                time.sleep(0.1)
            self.raw_data_queue.put(None)  # 结束信号
        
        threading.Thread(target=collector, daemon=True).start()
    
    def start_stage2(self):
        """第二阶段:数据处理"""
        def processor():
            while True:
                data = self.raw_data_queue.get()
                if data is None:
                    self.raw_data_queue.put(None)  # 传递结束信号
                    self.processed_data_queue.put(None)
                    break
                
                # 模拟数据处理
                processed = f"处理后的[{data}]"
                self.processed_data_queue.put(processed)
                print(f"处理: {data} -> {processed}")
                self.raw_data_queue.task_done()
        
        threading.Thread(target=processor, daemon=True).start()
    
    def start_stage3(self):
        """第三阶段:结果保存"""
        def saver():
            while True:
                data = self.processed_data_queue.get()
                if data is None:
                    break
                
                # 模拟保存操作
                print(f"保存: {data}")
                self.final_result_queue.put(data)
                self.processed_data_queue.task_done()
                time.sleep(0.2)
        
        threading.Thread(target=saver, daemon=True).start()
    
    def run_pipeline(self):
        """运行整个管道"""
        print("启动数据处理管道...")
        
        self.start_stage1()
        self.start_stage2()
        self.start_stage3()
        
        # 等待所有阶段完成
        self.raw_data_queue.join()
        self.processed_data_queue.join()
        
        print("数据处理管道完成")

# 运行管道
pipeline = DataProcessingPipeline()
pipeline.run_pipeline()


高级用法

带优先级的任务调度

import queue
import threading
import time

class TaskScheduler:
    """任务调度器"""
    
    def __init__(self):
        self.task_queue = queue.PriorityQueue()
        self.worker_thread = threading.Thread(target=self._worker)
        self.running = False
    
    def add_task(self, priority, task_name, task_func, *args):
        """添加任务"""
        task_data = (priority, time.time(), task_name, task_func, args)
        self.task_queue.put(task_data)
        print(f"添加任务: {task_name} (优先级: {priority})")
    
    def _worker(self):
        """工作线程"""
        while self.running or not self.task_queue.empty():
            try:
                priority, timestamp, name, func, args = self.task_queue.get(timeout=1)
                print(f"执行任务: {name}")
                
                # 执行任务函数
                try:
                    result = func(*args)
                    print(f"任务完成: {name} -> {result}")
                except Exception as e:
                    print(f"任务失败: {name} - {e}")
                
                self.task_queue.task_done()
                
            except queue.Empty:
                continue
    
    def start(self):
        """启动调度器"""
        self.running = True
        self.worker_thread.start()
        print("任务调度器启动")
    
    def stop(self):
        """停止调度器"""
        self.running = False
        self.worker_thread.join()
        print("任务调度器停止")

# 定义一些示例任务
def backup_data():
    time.sleep(2)
    return "备份完成"

def send_email():
    time.sleep(1)
    return "邮件发送完成"

def generate_report():
    time.sleep(3)
    return "报告生成完成"

# 使用任务调度器
scheduler = TaskScheduler()
scheduler.start()

# 添加不同优先级的任务
scheduler.add_task(1, "紧急备份", backup_data)
scheduler.add_task(3, "生成周报", generate_report)
scheduler.add_task(2, "发送通知邮件", send_email)
scheduler.add_task(1, "紧急安全检查", lambda: "安全检查完成")

# 等待所有任务完成
scheduler.task_queue.join()
scheduler.stop()

队列监控和统计

import queue
import threading
import time
from dataclasses import dataclass
from typing import Any

@dataclass
class QueueStats:
    """队列统计信息"""
    total_processed: int = 0
    average_wait_time: float = 0.0
    max_queue_size: int = 0
    error_count: int = 0

class MonitoredQueue:
    """带监控的队列"""
    
    def __init__(self, maxsize=0):
        self.queue = queue.Queue(maxsize=maxsize)
        self.stats = QueueStats()
        self._lock = threading.Lock()
        self._start_times = {}
    
    def put(self, item, task_id=None):
        """放入项目并记录开始时间"""
        if task_id is not None:
            self._start_times[task_id] = time.time()
        self.queue.put(item)
        
        with self._lock:
            current_size = self.queue.qsize()
            if current_size > self.stats.max_queue_size:
                self.stats.max_queue_size = current_size
    
    def get(self):
        """获取项目"""
        return self.queue.get()
    
    def task_done(self, task_id=None, success=True):
        """标记任务完成并更新统计"""
        self.queue.task_done()
        
        with self._lock:
            self.stats.total_processed += 1
            if not success:
                self.stats.error_count += 1
            
            if task_id and task_id in self._start_times:
                wait_time = time.time() - self._start_times[task_id]
                # 更新平均等待时间
                n = self.stats.total_processed
                self.stats.average_wait_time = (
                    (self.stats.average_wait_time * (n - 1) + wait_time) / n
                )
                del self._start_times[task_id]
    
    def get_stats(self):
        """获取统计信息"""
        with self._lock:
            return QueueStats(
                total_processed=self.stats.total_processed,
                average_wait_time=self.stats.average_wait_time,
                max_queue_size=self.stats.max_queue_size,
                error_count=self.stats.error_count
            )

# 测试监控队列
def test_monitored_queue():
    mq = MonitoredQueue()
    
    def worker():
        for i in range(5):
            task_id = f"task_{i}"
            mq.put(f"数据{i}", task_id)
            time.sleep(0.1)
    
    def processor():
        for i in range(5):
            item = mq.get()
            task_id = f"task_{i}"
            print(f"处理: {item}")
            time.sleep(0.2)  # 模拟处理时间
            mq.task_done(task_id, success=True)
    
    # 启动线程
    t1 = threading.Thread(target=worker)
    t2 = threading.Thread(target=processor)
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    # 输出统计信息
    stats = mq.get_stats()
    print(f"\n队列统计:")
    print(f"总处理数: {stats.total_processed}")
    print(f"平均等待时间: {stats.average_wait_time:.2f}秒")
    print(f"最大队列大小: {stats.max_queue_size}")
    print(f"错误数: {stats.error_count}")

test_monitored_queue()


常用的属性和方法

以下是 Python queue 模块(线程安全队列)的常用类、方法及属性的表格说明,包含功能描述和示例:

queue 模块核心类

说明适用场景
queue.Queue先进先出(FIFO)队列通用任务队列
queue.LifoQueue后进先出(LIFO)队列(类似栈)需要后进先出的场景
queue.PriorityQueue优先级队列(最小堆实现)按优先级处理任务
queue.SimpleQueue更简单的FIFO队列(Python 3.7+)不需要高级功能的场景

通用方法(所有队列类都支持)

方法说明示例返回值
put(item)放入元素q.put("task1")None
get()取出并移除元素item = q.get()队列元素
empty()判断队列是否为空if q.empty():True/False
full()判断队列是否已满if q.full():True/False
qsize()返回队列当前大小size = q.qsize()整数
task_done()标记任务完成(用于join())q.task_done()None
join()阻塞直到所有任务完成q.join()None

阻塞控制参数

参数说明默认值示例
block当队列为空/满时是否阻塞Trueq.get(block=False)
timeout阻塞超时时间(秒)Noneq.put(x, timeout=5)


总结

queue模块提供了线程安全的队列实现,主要特点:

  • 线程安全:多个线程可以安全地同时操作

  • 多种队列类型:先进先出、后进先出、优先级队列

  • 阻塞控制:可以设置超时和非阻塞操作

  • 任务跟踪:使用task_done()和join()跟踪任务完成情况

使用queue模块的常见场景:

  • 多线程任务分配

  • 生产者-消费者模式

  • 工作线程池

  • 数据处理管道

  • 任务调度系统

记住在使用队列时要合理设置队列大小,避免内存问题,同时要正确处理队列空和队列满的情况。

本文内容仅供个人学习/研究/参考使用,不构成任何决策建议或专业指导。分享/转载时请标明原文来源,同时请勿将内容用于商业售卖、虚假宣传等非学习用途哦~感谢您的理解与支持!

链接: https://fly63.com/course/36_2140

目录选择