Python3入门指南Python语言的特点和实际应用Python3环境搭建配置VSCode进行Python开发Python基础语法Python变量与数据类型Python数据类型转换Python解释器使用Python注释使用Python运算符Python数字类型Python字符串操作Python列表操作Python元组使用Python字典使用Python集合使用Python条件控制详解Python循环语句详解Python编程入门实践Python推导式详解Python迭代器和生成器Python with语句详解Python函数详解Python lambda(匿名函数)Python装饰器Python数据结构Python模块和包使用Python中__name__和__main__的用法Python输入输出:从基础到文件操作Python文件操作Python OS模块使用Python错误和异常处理Python面向对象编程Python命名空间和作用域Python虚拟环境:venv详细教程Python类型注解Python标准库常用模块Python正则表达式Python CGI编程Python MySQL(mysql-connector驱动)Python MySQL(PyMySQL驱动)Python网络编程Python发送邮件Python多线程编程Python XML解析Python JSON解析Python日期和时间处理Python操作MongoDBPython urllib库使用Python uWSGI 安装与配置Python pip包管理工具Python operator模块Python math模块Python requests模块HTTP请求Python random模块Python OpenAI库Python AI绘画制作Python statistics模块Python hashlib模块:哈希加密Python量化交易Python pyecharts数据可视化Python Selenium网页自动化Python BeautifulSoup网页数据提取Python Scrapy爬虫框架Python Markdown转HTMLPython sys模块Python Pickle模块:数据存储Python subprocess模块Python queue队列模块Python StringIO内存文件操作Python logging日志记录Python datetime日期时间处理Python re正则表达式Python csv表格数据处理Python threading 多线程编程Python asyncio 异步编程Python PyQt 图形界面开发Python 应用方向和常用库框架

Python subprocess模块

subprocess模块是Python中用来运行外部命令和程序的工具。通过这个模块,我们可以在Python代码中执行系统命令、启动其他程序,并获取它们的执行结果。


为什么需要subprocess

在Python程序中,有时需要调用系统命令或其他程序:

  • 运行系统工具(如ping、ls、dir等)

  • 调用其他编程语言的程序

  • 执行shell脚本或批处理文件

  • 自动化系统管理任务

相比旧的os.system()方法,subprocess更安全、功能更强大。


基本使用方法

运行简单命令

import subprocess

# 运行ls命令(Linux/Mac)或dir命令(Windows)
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)

print("命令输出:")
print(result.stdout)

if result.returncode == 0:
    print("命令执行成功")
else:
    print("命令执行失败")

在Windows系统中,可以这样使用:

import subprocess

# Windows下查看目录
result = subprocess.run(['dir'], capture_output=True, text=True, shell=True)
print(result.stdout)

获取命令执行结果

import subprocess

def run_command(command):
    """运行命令并返回结果"""
    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=30  # 30秒超时
        )
        
        return {
            'success': result.returncode == 0,
            'output': result.stdout,
            'error': result.stderr,
            'return_code': result.returncode
        }
    
    except subprocess.TimeoutExpired:
        return {'success': False, 'error': '命令执行超时'}
    except Exception as e:
        return {'success': False, 'error': str(e)}

# 测试命令
result = run_command(['python', '--version'])
print(f"Python版本: {result['output']}")
print(f"执行成功: {result['success']}")


处理输入和输出

向命令传递输入

import subprocess

# 向grep命令传递输入数据
text_data = """苹果
香蕉
橙子
西瓜
葡萄"""

# 查找包含"果"的行
result = subprocess.run(
    ['grep', '果'],
    input=text_data,
    capture_output=True,
    text=True
)

print("查找结果:")
print(result.stdout)

实时获取输出

import subprocess

def run_command_realtime(command):
    """实时显示命令输出"""
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
    
    print("命令开始执行...")
    
    # 实时读取输出
    while True:
        output = process.stdout.readline()
        if output == '' and process.poll() is not None:
            break
        if output:
            print(output.strip())
    
    # 获取错误信息
    stderr = process.stderr.read()
    if stderr:
        print("错误信息:")
        print(stderr)
    
    return process.poll()

# 测试实时输出
return_code = run_command_realtime(['ping', '-c', '4', 'google.com'])
print(f"命令结束,退出码: {return_code}")


错误处理

检查命令执行状态

import subprocess

def safe_command_execution(command):
    """安全执行命令,包含错误处理"""
    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True  # 如果返回码非零则抛出异常
        )
        return result.stdout
    
    except subprocess.CalledProcessError as e:
        print(f"命令执行失败,退出码: {e.returncode}")
        print(f"错误信息: {e.stderr}")
        return None
    
    except FileNotFoundError:
        print(f"命令不存在: {command[0]}")
        return None
    
    except Exception as e:
        print(f"执行命令时发生错误: {e}")
        return None

# 测试错误处理
output = safe_command_execution(['ls', '不存在的文件'])
if output is None:
    print("命令执行失败,已处理错误")


高级用法

使用管道连接多个命令

import subprocess

def pipe_commands():
    """使用管道连接多个命令"""
    
    # 第一个命令:列出文件
    list_files = subprocess.Popen(
        ['ls', '-l'],
        stdout=subprocess.PIPE,
        text=True
    )
    
    # 第二个命令:过滤包含.py的文件
    grep_py = subprocess.Popen(
        ['grep', '.py'],
        stdin=list_files.stdout,
        stdout=subprocess.PIPE,
        text=True
    )
    
    # 第三个命令:统计行数
    wc_lines = subprocess.Popen(
        ['wc', '-l'],
        stdin=grep_py.stdout,
        stdout=subprocess.PIPE,
        text=True
    )
    
    # 获取最终结果
    output = wc_lines.communicate()[0]
    print(f"找到 {output.strip()} 个Python文件")

pipe_commands()

与进程交互

import subprocess

def interactive_process():
    """与进程进行交互"""
    
    # 启动Python交互式环境
    process = subprocess.Popen(
        ['python'],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )
    
    # 要执行的Python代码
    commands = [
        "print('Hello from subprocess')",
        "x = 10 + 20",
        "print(f'计算结果: {x}')",
        "exit()"
    ]
    
    # 逐条发送命令
    for cmd in commands:
        print(f"发送命令: {cmd}")
        process.stdin.write(cmd + "\n")
        process.stdin.flush()
    
    # 获取所有输出
    output, errors = process.communicate()
    
    print("进程输出:")
    print(output)
    
    if errors:
        print("错误信息:")
        print(errors)

interactive_process()


实际应用场景

文件操作工具

import subprocess
import os

class FileManager:
    """文件管理工具类"""
    
    @staticmethod
    def find_files(directory, pattern):
        """查找匹配模式的文件"""
        try:
            result = subprocess.run(
                ['find', directory, '-name', pattern],
                capture_output=True,
                text=True,
                timeout=60
            )
            
            if result.returncode == 0:
                files = result.stdout.strip().split('\n')
                return [f for f in files if f]  # 过滤空行
            else:
                print(f"查找失败: {result.stderr}")
                return []
        
        except subprocess.TimeoutExpired:
            print("查找超时")
            return []
    
    @staticmethod
    def get_file_info(filename):
        """获取文件详细信息"""
        if not os.path.exists(filename):
            return None
        
        try:
            # 在Linux/Mac上使用stat命令
            result = subprocess.run(
                ['stat', filename],
                capture_output=True,
                text=True
            )
            
            if result.returncode == 0:
                return result.stdout
            else:
                # 在Windows上使用dir命令
                result = subprocess.run(
                    ['cmd', '/c', 'dir', filename],
                    capture_output=True,
                    text=True
                )
                return result.stdout if result.returncode == 0 else None
        
        except Exception as e:
            print(f"获取文件信息失败: {e}")
            return None

# 使用文件管理工具
files = FileManager.find_files('.', '*.py')
print(f"找到 {len(files)} 个Python文件")

if files:
    info = FileManager.get_file_info(files[0])
    if info:
        print("第一个文件信息:")
        print(info)

系统监控工具

import subprocess
import time

class SystemMonitor:
    """系统监控工具"""
    
    @staticmethod
    def get_system_info():
        """获取系统信息"""
        commands = {
            '系统时间': ['date'],
            '内存使用': ['free', '-h'],
            '磁盘空间': ['df', '-h'],
            '运行进程': ['ps', 'aux']
        }
        
        system_info = {}
        
        for name, cmd in commands.items():
            try:
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    timeout=10
                )
                
                if result.returncode == 0:
                    system_info[name] = result.stdout
                else:
                    system_info[name] = f"获取失败: {result.stderr}"
            
            except Exception as e:
                system_info[name] = f"执行错误: {e}"
        
        return system_info
    
    @staticmethod
    def monitor_process(process_name, duration=60):
        """监控特定进程"""
        print(f"开始监控进程: {process_name}, 持续时间: {duration}秒")
        
        start_time = time.time()
        
        while time.time() - start_time < duration:
            try:
                # 检查进程是否存在
                result = subprocess.run(
                    ['pgrep', '-f', process_name],
                    capture_output=True,
                    text=True
                )
                
                if result.returncode == 0:
                    print(f"{time.strftime('%H:%M:%S')} - 进程运行中")
                else:
                    print(f"{time.strftime('%H:%M:%S')} - 进程未运行")
                
                time.sleep(5)  # 每5秒检查一次
            
            except Exception as e:
                print(f"监控出错: {e}")
                break
        
        print("监控结束")

# 使用系统监控
info = SystemMonitor.get_system_info()
for name, data in info.items():
    print(f"=== {name} ===")
    print(data[:200] + "..." if len(data) > 200 else data)
    print()

# 监控Python进程(在另一个终端运行一个Python程序测试)
# SystemMonitor.monitor_process('python', 30)


安全注意事项

安全的命令执行

import subprocess
import shlex

def safe_shell_command(command_string):
    """安全地执行shell命令"""
    
    # 验证命令是否安全(简单示例)
    dangerous_commands = ['rm -rf', 'format', 'dd if=']
    for dangerous in dangerous_commands:
        if dangerous in command_string:
            raise ValueError(f"危险命令被阻止: {dangerous}")
    
    try:
        # 使用shlex分割命令参数,避免shell注入
        command_args = shlex.split(command_string)
        
        result = subprocess.run(
            command_args,
            capture_output=True,
            text=True,
            timeout=30,
            shell=False  # 重要:不使用shell,避免注入攻击
        )
        
        return {
            'success': result.returncode == 0,
            'output': result.stdout,
            'error': result.stderr
        }
    
    except Exception as e:
        return {'success': False, 'error': str(e)}

# 测试安全命令执行
result = safe_shell_command('echo "Hello World"')
if result['success']:
    print(f"命令输出: {result['output']}")
else:
    print(f"执行失败: {result['error']}")

# 尝试危险命令(会被阻止)
try:
    safe_shell_command('rm -rf /')
except ValueError as e:
    print(f"安全机制生效: {e}")


跨平台兼容性

处理不同操作系统的命令

import subprocess
import platform

class CrossPlatformCommand:
    """跨平台命令执行"""
    
    @staticmethod
    def list_files(directory='.'):
        """列出目录文件(跨平台)"""
        system = platform.system().lower()
        
        if system == 'windows':
            command = ['cmd', '/c', 'dir', directory]
        else:  # linux, darwin (mac), etc.
            command = ['ls', '-la', directory]
        
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                timeout=30
            )
            
            return result.stdout if result.returncode == 0 else result.stderr
        
        except Exception as e:
            return f"执行失败: {e}"
    
    @staticmethod
    def get_system_info():
        """获取系统信息(跨平台)"""
        system = platform.system().lower()
        
        if system == 'windows':
            commands = {
                '系统版本': ['cmd', '/c', 'ver'],
                '磁盘信息': ['cmd', '/c', 'wmic logicaldisk get size,freespace,caption']
            }
        else:
            commands = {
                '系统版本': ['uname', '-a'],
                '磁盘信息': ['df', '-h']
            }
        
        info = {}
        for name, cmd in commands.items():
            try:
                result = subprocess.run(cmd, capture_output=True, text=True)
                info[name] = result.stdout if result.returncode == 0 else result.stderr
            except Exception as e:
                info[name] = f"错误: {e}"
        
        return info

# 测试跨平台功能
print("当前系统:", platform.system())
print("文件列表:")
print(CrossPlatformCommand.list_files())

print("\n系统信息:")
info = CrossPlatformCommand.get_system_info()
for name, data in info.items():
    print(f"{name}: {data[:100]}...")


性能优化技巧

批量处理命令

import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor

def execute_single_command(command):
    """执行单个命令"""
    try:
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=10
        )
        return (command, result.returncode, result.stdout)
    except Exception as e:
        return (command, -1, str(e))

def batch_execute_commands(commands, max_workers=5):
    """批量执行命令"""
    results = []
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 提交所有任务
        future_to_command = {
            executor.submit(execute_single_command, cmd): cmd 
            for cmd in commands
        }
        
        # 收集结果
        for future in future_to_command:
            command = future_to_command[future]
            try:
                result = future.result(timeout=15)
                results.append(result)
            except Exception as e:
                results.append((command, -1, f"执行异常: {e}"))
    
    return results

# 测试批量执行
test_commands = [
    ['echo', '任务1'],
    ['sleep', '2'],
    ['python', '--version'],
    ['date']
]

print("开始批量执行命令...")
results = batch_execute_commands(test_commands)

print("\n执行结果:")
for command, returncode, output in results:
    status = "成功" if returncode == 0 else "失败"
    print(f"命令: {' '.join(command)} - {status}")
    if output.strip():
        print(f"  输出: {output.strip()}")


subprocess 模块的常用方法、类和参数

以下是 Python subprocess 模块的常用方法、类和参数的说明,包含功能描述及示例:

subprocess 模块核心方法

方法说明示例
subprocess.run()执行命令并等待完成(推荐)subprocess.run(["ls", "-l"], capture_output=True, text=True)
subprocess.Popen()创建子进程(底层控制)proc = subprocess.Popen(["ping", "google.com"], stdout=subprocess.PIPE)
subprocess.call()执行命令并返回退出码(旧版)exit_code = subprocess.call(["python", "--version"])
subprocess.check_call()执行命令,失败时抛出异常subprocess.check_call(["git", "commit"])
subprocess.check_output()执行命令并返回输出(旧版)output = subprocess.check_output(["date"], text=True)

subprocess.CompletedProcess 对象属性(run() 方法的返回对象)

属性说明
args执行的命令参数列表
returncode进程退出状态码(0表示成功)
stdout标准输出内容(若设置了capture_output)
stderr标准错误内容(若设置了capture_output)

subprocess.Popen 类常用方法/属性

方法/属性说明示例
poll()检查进程是否终止(返回None表示运行中)if proc.poll() is None: print("Running")
wait()阻塞等待进程结束proc.wait()
communicate()交互式输入/输出stdout, stderr = proc.communicate(input="data")
terminate()发送终止信号(SIGTERM)proc.terminate()
kill()强制终止进程(SIGKILL)proc.kill()
stdin进程的标准输入流proc.stdin.write("input")
stdout进程的标准输出流print(proc.stdout.read())
stderr进程的标准错误流errors = proc.stderr.read()

常用参数说明(适用于 run() 和 Popen())

参数说明示例值
args命令(列表或字符串)["ls", "-l"] 或 "ls -l"
stdin标准输入配置subprocess.PIPE(管道)、None(继承)
stdout标准输出配置subprocess.PIPE、open('log.txt', 'w')
stderr标准错误配置subprocess.STDOUT(合并到stdout)
shell是否通过Shell执行True(支持字符串命令)
cwd工作目录路径"/tmp"
env自定义环境变量{"PATH": "/usr/bin"}
timeout超时时间(秒)30
text输入/输出是否为字符串(非字节)True

总结

subprocess模块是Python中强大的外部命令执行工具:

  • 执行系统命令:运行各种系统命令和外部程序

  • 处理输入输出:可以向命令传递输入,获取输出结果

  • 错误处理:完善的错误处理和超时控制

  • 进程管理:启动、监控和管理子进程

  • 管道功能:连接多个命令,实现复杂操作

使用subprocess时要注意:

  • 避免使用shell=True,防止命令注入攻击

  • 设置合理的超时时间,避免程序卡死

  • 处理各种可能的异常情况

  • 考虑跨平台兼容性

记住在执行外部命令时要特别注意安全性,不要执行不可信的代码。

本文内容仅供个人学习/研究/参考使用,不构成任何决策建议或专业指导。分享/转载时请标明原文来源,同时请勿将内容用于商业售卖、虚假宣传等非学习用途哦~感谢您的理解与支持!

链接: https://fly63.com/course/36_2139

目录选择