subprocess模块是Python中用来运行外部命令和程序的工具。通过这个模块,我们可以在Python代码中执行系统命令、启动其他程序,并获取它们的执行结果。
在Python程序中,有时需要调用系统命令或其他程序:
运行系统工具(如ping、ls、dir等)
调用其他编程语言的程序
执行shell脚本或批处理文件
自动化系统管理任务
相比旧的os.system()方法,subprocess更安全、功能更强大。
import subprocess
# 运行ls命令(Linux/Mac)或dir命令(Windows)
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
print("命令输出:")
print(result.stdout)
if result.returncode == 0:
print("命令执行成功")
else:
print("命令执行失败")在Windows系统中,可以这样使用:
import subprocess
# Windows下查看目录
result = subprocess.run(['dir'], capture_output=True, text=True, shell=True)
print(result.stdout)import subprocess
def run_command(command):
"""运行命令并返回结果"""
try:
result = subprocess.run(
command,
capture_output=True,
text=True,
timeout=30 # 30秒超时
)
return {
'success': result.returncode == 0,
'output': result.stdout,
'error': result.stderr,
'return_code': result.returncode
}
except subprocess.TimeoutExpired:
return {'success': False, 'error': '命令执行超时'}
except Exception as e:
return {'success': False, 'error': str(e)}
# 测试命令
result = run_command(['python', '--version'])
print(f"Python版本: {result['output']}")
print(f"执行成功: {result['success']}")import subprocess
# 向grep命令传递输入数据
text_data = """苹果
香蕉
橙子
西瓜
葡萄"""
# 查找包含"果"的行
result = subprocess.run(
['grep', '果'],
input=text_data,
capture_output=True,
text=True
)
print("查找结果:")
print(result.stdout)import subprocess
def run_command_realtime(command):
"""实时显示命令输出"""
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
print("命令开始执行...")
# 实时读取输出
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
print(output.strip())
# 获取错误信息
stderr = process.stderr.read()
if stderr:
print("错误信息:")
print(stderr)
return process.poll()
# 测试实时输出
return_code = run_command_realtime(['ping', '-c', '4', 'google.com'])
print(f"命令结束,退出码: {return_code}")import subprocess
def safe_command_execution(command):
"""安全执行命令,包含错误处理"""
try:
result = subprocess.run(
command,
capture_output=True,
text=True,
check=True # 如果返回码非零则抛出异常
)
return result.stdout
except subprocess.CalledProcessError as e:
print(f"命令执行失败,退出码: {e.returncode}")
print(f"错误信息: {e.stderr}")
return None
except FileNotFoundError:
print(f"命令不存在: {command[0]}")
return None
except Exception as e:
print(f"执行命令时发生错误: {e}")
return None
# 测试错误处理
output = safe_command_execution(['ls', '不存在的文件'])
if output is None:
print("命令执行失败,已处理错误")import subprocess
def pipe_commands():
"""使用管道连接多个命令"""
# 第一个命令:列出文件
list_files = subprocess.Popen(
['ls', '-l'],
stdout=subprocess.PIPE,
text=True
)
# 第二个命令:过滤包含.py的文件
grep_py = subprocess.Popen(
['grep', '.py'],
stdin=list_files.stdout,
stdout=subprocess.PIPE,
text=True
)
# 第三个命令:统计行数
wc_lines = subprocess.Popen(
['wc', '-l'],
stdin=grep_py.stdout,
stdout=subprocess.PIPE,
text=True
)
# 获取最终结果
output = wc_lines.communicate()[0]
print(f"找到 {output.strip()} 个Python文件")
pipe_commands()import subprocess
def interactive_process():
"""与进程进行交互"""
# 启动Python交互式环境
process = subprocess.Popen(
['python'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# 要执行的Python代码
commands = [
"print('Hello from subprocess')",
"x = 10 + 20",
"print(f'计算结果: {x}')",
"exit()"
]
# 逐条发送命令
for cmd in commands:
print(f"发送命令: {cmd}")
process.stdin.write(cmd + "\n")
process.stdin.flush()
# 获取所有输出
output, errors = process.communicate()
print("进程输出:")
print(output)
if errors:
print("错误信息:")
print(errors)
interactive_process()import subprocess
import os
class FileManager:
"""文件管理工具类"""
@staticmethod
def find_files(directory, pattern):
"""查找匹配模式的文件"""
try:
result = subprocess.run(
['find', directory, '-name', pattern],
capture_output=True,
text=True,
timeout=60
)
if result.returncode == 0:
files = result.stdout.strip().split('\n')
return [f for f in files if f] # 过滤空行
else:
print(f"查找失败: {result.stderr}")
return []
except subprocess.TimeoutExpired:
print("查找超时")
return []
@staticmethod
def get_file_info(filename):
"""获取文件详细信息"""
if not os.path.exists(filename):
return None
try:
# 在Linux/Mac上使用stat命令
result = subprocess.run(
['stat', filename],
capture_output=True,
text=True
)
if result.returncode == 0:
return result.stdout
else:
# 在Windows上使用dir命令
result = subprocess.run(
['cmd', '/c', 'dir', filename],
capture_output=True,
text=True
)
return result.stdout if result.returncode == 0 else None
except Exception as e:
print(f"获取文件信息失败: {e}")
return None
# 使用文件管理工具
files = FileManager.find_files('.', '*.py')
print(f"找到 {len(files)} 个Python文件")
if files:
info = FileManager.get_file_info(files[0])
if info:
print("第一个文件信息:")
print(info)import subprocess
import time
class SystemMonitor:
"""系统监控工具"""
@staticmethod
def get_system_info():
"""获取系统信息"""
commands = {
'系统时间': ['date'],
'内存使用': ['free', '-h'],
'磁盘空间': ['df', '-h'],
'运行进程': ['ps', 'aux']
}
system_info = {}
for name, cmd in commands.items():
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
system_info[name] = result.stdout
else:
system_info[name] = f"获取失败: {result.stderr}"
except Exception as e:
system_info[name] = f"执行错误: {e}"
return system_info
@staticmethod
def monitor_process(process_name, duration=60):
"""监控特定进程"""
print(f"开始监控进程: {process_name}, 持续时间: {duration}秒")
start_time = time.time()
while time.time() - start_time < duration:
try:
# 检查进程是否存在
result = subprocess.run(
['pgrep', '-f', process_name],
capture_output=True,
text=True
)
if result.returncode == 0:
print(f"{time.strftime('%H:%M:%S')} - 进程运行中")
else:
print(f"{time.strftime('%H:%M:%S')} - 进程未运行")
time.sleep(5) # 每5秒检查一次
except Exception as e:
print(f"监控出错: {e}")
break
print("监控结束")
# 使用系统监控
info = SystemMonitor.get_system_info()
for name, data in info.items():
print(f"=== {name} ===")
print(data[:200] + "..." if len(data) > 200 else data)
print()
# 监控Python进程(在另一个终端运行一个Python程序测试)
# SystemMonitor.monitor_process('python', 30)import subprocess
import shlex
def safe_shell_command(command_string):
"""安全地执行shell命令"""
# 验证命令是否安全(简单示例)
dangerous_commands = ['rm -rf', 'format', 'dd if=']
for dangerous in dangerous_commands:
if dangerous in command_string:
raise ValueError(f"危险命令被阻止: {dangerous}")
try:
# 使用shlex分割命令参数,避免shell注入
command_args = shlex.split(command_string)
result = subprocess.run(
command_args,
capture_output=True,
text=True,
timeout=30,
shell=False # 重要:不使用shell,避免注入攻击
)
return {
'success': result.returncode == 0,
'output': result.stdout,
'error': result.stderr
}
except Exception as e:
return {'success': False, 'error': str(e)}
# 测试安全命令执行
result = safe_shell_command('echo "Hello World"')
if result['success']:
print(f"命令输出: {result['output']}")
else:
print(f"执行失败: {result['error']}")
# 尝试危险命令(会被阻止)
try:
safe_shell_command('rm -rf /')
except ValueError as e:
print(f"安全机制生效: {e}")import subprocess
import platform
class CrossPlatformCommand:
"""跨平台命令执行"""
@staticmethod
def list_files(directory='.'):
"""列出目录文件(跨平台)"""
system = platform.system().lower()
if system == 'windows':
command = ['cmd', '/c', 'dir', directory]
else: # linux, darwin (mac), etc.
command = ['ls', '-la', directory]
try:
result = subprocess.run(
command,
capture_output=True,
text=True,
timeout=30
)
return result.stdout if result.returncode == 0 else result.stderr
except Exception as e:
return f"执行失败: {e}"
@staticmethod
def get_system_info():
"""获取系统信息(跨平台)"""
system = platform.system().lower()
if system == 'windows':
commands = {
'系统版本': ['cmd', '/c', 'ver'],
'磁盘信息': ['cmd', '/c', 'wmic logicaldisk get size,freespace,caption']
}
else:
commands = {
'系统版本': ['uname', '-a'],
'磁盘信息': ['df', '-h']
}
info = {}
for name, cmd in commands.items():
try:
result = subprocess.run(cmd, capture_output=True, text=True)
info[name] = result.stdout if result.returncode == 0 else result.stderr
except Exception as e:
info[name] = f"错误: {e}"
return info
# 测试跨平台功能
print("当前系统:", platform.system())
print("文件列表:")
print(CrossPlatformCommand.list_files())
print("\n系统信息:")
info = CrossPlatformCommand.get_system_info()
for name, data in info.items():
print(f"{name}: {data[:100]}...")import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor
def execute_single_command(command):
"""执行单个命令"""
try:
result = subprocess.run(
command,
capture_output=True,
text=True,
timeout=10
)
return (command, result.returncode, result.stdout)
except Exception as e:
return (command, -1, str(e))
def batch_execute_commands(commands, max_workers=5):
"""批量执行命令"""
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_to_command = {
executor.submit(execute_single_command, cmd): cmd
for cmd in commands
}
# 收集结果
for future in future_to_command:
command = future_to_command[future]
try:
result = future.result(timeout=15)
results.append(result)
except Exception as e:
results.append((command, -1, f"执行异常: {e}"))
return results
# 测试批量执行
test_commands = [
['echo', '任务1'],
['sleep', '2'],
['python', '--version'],
['date']
]
print("开始批量执行命令...")
results = batch_execute_commands(test_commands)
print("\n执行结果:")
for command, returncode, output in results:
status = "成功" if returncode == 0 else "失败"
print(f"命令: {' '.join(command)} - {status}")
if output.strip():
print(f" 输出: {output.strip()}")以下是 Python subprocess 模块的常用方法、类和参数的说明,包含功能描述及示例:
| 方法 | 说明 | 示例 |
|---|---|---|
| subprocess.run() | 执行命令并等待完成(推荐) | subprocess.run(["ls", "-l"], capture_output=True, text=True) |
| subprocess.Popen() | 创建子进程(底层控制) | proc = subprocess.Popen(["ping", "google.com"], stdout=subprocess.PIPE) |
| subprocess.call() | 执行命令并返回退出码(旧版) | exit_code = subprocess.call(["python", "--version"]) |
| subprocess.check_call() | 执行命令,失败时抛出异常 | subprocess.check_call(["git", "commit"]) |
| subprocess.check_output() | 执行命令并返回输出(旧版) | output = subprocess.check_output(["date"], text=True) |
| 属性 | 说明 |
|---|---|
| args | 执行的命令参数列表 |
| returncode | 进程退出状态码(0表示成功) |
| stdout | 标准输出内容(若设置了capture_output) |
| stderr | 标准错误内容(若设置了capture_output) |
| 方法/属性 | 说明 | 示例 |
|---|---|---|
| poll() | 检查进程是否终止(返回None表示运行中) | if proc.poll() is None: print("Running") |
| wait() | 阻塞等待进程结束 | proc.wait() |
| communicate() | 交互式输入/输出 | stdout, stderr = proc.communicate(input="data") |
| terminate() | 发送终止信号(SIGTERM) | proc.terminate() |
| kill() | 强制终止进程(SIGKILL) | proc.kill() |
| stdin | 进程的标准输入流 | proc.stdin.write("input") |
| stdout | 进程的标准输出流 | print(proc.stdout.read()) |
| stderr | 进程的标准错误流 | errors = proc.stderr.read() |
| 参数 | 说明 | 示例值 |
|---|---|---|
| args | 命令(列表或字符串) | ["ls", "-l"] 或 "ls -l" |
| stdin | 标准输入配置 | subprocess.PIPE(管道)、None(继承) |
| stdout | 标准输出配置 | subprocess.PIPE、open('log.txt', 'w') |
| stderr | 标准错误配置 | subprocess.STDOUT(合并到stdout) |
| shell | 是否通过Shell执行 | True(支持字符串命令) |
| cwd | 工作目录路径 | "/tmp" |
| env | 自定义环境变量 | {"PATH": "/usr/bin"} |
| timeout | 超时时间(秒) | 30 |
| text | 输入/输出是否为字符串(非字节) | True |
subprocess模块是Python中强大的外部命令执行工具:
执行系统命令:运行各种系统命令和外部程序
处理输入输出:可以向命令传递输入,获取输出结果
错误处理:完善的错误处理和超时控制
进程管理:启动、监控和管理子进程
管道功能:连接多个命令,实现复杂操作
使用subprocess时要注意:
避免使用shell=True,防止命令注入攻击
设置合理的超时时间,避免程序卡死
处理各种可能的异常情况
考虑跨平台兼容性
记住在执行外部命令时要特别注意安全性,不要执行不可信的代码。
本文内容仅供个人学习/研究/参考使用,不构成任何决策建议或专业指导。分享/转载时请标明原文来源,同时请勿将内容用于商业售卖、虚假宣传等非学习用途哦~感谢您的理解与支持!