csv模块是Python中处理表格数据的标准工具。CSV文件用纯文本存储表格数据,每行代表一行数据,列之间用逗号分隔。这种格式简单通用,适合数据交换和存储。
import csv


def print_csv_rows(filename='data.csv'):
    """Read a CSV file and print every row with a 1-based row number.

    Args:
        filename: path of the CSV file to read (UTF-8 encoded).

    Returns:
        The list of rows, each row being a list of string fields.
    """
    rows = []
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.reader(file)
        # Read the file line by line; enumerate starts counting at 1
        for row_num, row in enumerate(reader, 1):
            print(f"第{row_num}行: {row}")
            rows.append(row)
    return rows


if __name__ == "__main__":
    # data.csv is assumed to contain the sample rows shown below.
    print_csv_rows()
姓名,年龄,城市
张三,25,北京
李四,30,上海
王五,28,广州

输出结果:

第1行: ['姓名', '年龄', '城市']
第2行: ['张三', '25', '北京']
第3行: ['李四', '30', '上海']
第4行: ['王五', '28', '广州']

import csv
def read_csv_with_header(filename):
    """Read a CSV file, separating the header row from the data rows.

    Args:
        filename: path of the CSV file to read (UTF-8 encoded).

    Returns:
        (header, data): header is the list of column names; data is a
        list of row lists.
    """
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.reader(file)
        # First line is the header; default to [] so an empty file
        # does not raise StopIteration
        header = next(reader, [])
        print("表头:", header)
        data = []
        for row in reader:
            data.append(row)
            print(f"数据: {row}")
    return header, data


if __name__ == "__main__":
    # Demo only runs when executed as a script, not on import
    header, data = read_csv_with_header('data.csv')
    print(f"总共读取 {len(data)} 行数据")
import csv

# Sample employee data: header row followed by data rows
EMPLOYEES = [
    ['姓名', '部门', '工资'],
    ['张三', '技术部', '8000'],
    ['李四', '销售部', '6000'],
    ['王五', '人事部', '5500']
]


def write_employee_csv(filename='employees.csv', rows=None):
    """Write rows of employee data to a CSV file in one call.

    Args:
        filename: output path; newline='' avoids blank lines on Windows.
        rows: list of row lists (header included); defaults to EMPLOYEES.

    Returns:
        The number of rows written.
    """
    if rows is None:
        rows = EMPLOYEES
    with open(filename, 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        # writerows writes the whole table at once
        writer.writerows(rows)
    print("数据写入完成")
    return len(rows)


if __name__ == "__main__":
    write_employee_csv()
import csv

# Sample student data: header row followed by data rows
STUDENTS = [
    ['学号', '姓名', '成绩'],
    ['001', '小明', '85'],
    ['002', '小红', '92'],
    ['003', '小刚', '78']
]


def write_student_csv(filename='students.csv', rows=None):
    """Write student rows to a CSV file one row at a time.

    Args:
        filename: output path; newline='' avoids blank lines on Windows.
        rows: list of row lists (header included); defaults to STUDENTS.

    Returns:
        The number of rows written.
    """
    if rows is None:
        rows = STUDENTS
    with open(filename, 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        # writerow writes a single row per call, so progress can be logged
        for student in rows:
            writer.writerow(student)
            print(f"写入: {student}")
    print("学生数据保存完成")
    return len(rows)


if __name__ == "__main__":
    write_student_csv()
import csv


def read_csv_as_dict(filename):
    """Read a CSV file as dictionaries keyed by the header row.

    Args:
        filename: path of the CSV file to read (UTF-8 encoded).

    Returns:
        A list of dicts, one per data row, keyed by column name.
    """
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        # DictReader takes the field names from the first row automatically
        print("字段名:", reader.fieldnames)
        records = []
        for record in reader:
            records.append(record)
            print(f"记录: {record}")
    return records


if __name__ == "__main__":
    # Demo only runs when executed as a script, not on import
    records = read_csv_as_dict('data.csv')
    # Fields can be accessed by name, which is clearer than indexing
    for record in records:
        print(f"{record['姓名']} 来自 {record['城市']}")
import csv

# Sample product records, keyed by column name
PRODUCTS = [
    {'名称': '笔记本电脑', '价格': '5999', '库存': '50'},
    {'名称': '智能手机', '价格': '3999', '库存': '100'},
    {'名称': '平板电脑', '价格': '2999', '库存': '30'}
]


def write_product_csv(filename='products.csv', records=None, fieldnames=None):
    """Write dict records to a CSV file with DictWriter.

    Args:
        filename: output path; newline='' avoids blank lines on Windows.
        records: list of dicts to write; defaults to PRODUCTS.
        fieldnames: column order; defaults to the keys of the first record.

    Returns:
        The number of records written.
    """
    if records is None:
        records = PRODUCTS
    if fieldnames is None:
        # Derive column order from the first record (dicts keep order)
        fieldnames = list(records[0]) if records else []
    with open(filename, 'w', encoding='utf-8', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        # Header row first, then one row per record
        writer.writeheader()
        for product in records:
            writer.writerow(product)
    print("商品数据保存完成")
    return len(records)


if __name__ == "__main__":
    write_product_csv()
import csv


def read_delimited(filename, delimiter=','):
    """Read a delimited text file and return its rows.

    Args:
        filename: path of the file to read (UTF-8 encoded).
        delimiter: field separator, e.g. ';' for semicolon files or
            '\t' for TSV files.

    Returns:
        A list of rows, each row a list of string fields.
    """
    rows = []
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.reader(file, delimiter=delimiter)
        for row in reader:
            print(row)
            rows.append(row)
    return rows


if __name__ == "__main__":
    # Semicolon-separated CSV
    read_delimited('semicolon_data.csv', delimiter=';')
    # Tab-separated TSV
    read_delimited('data.tsv', delimiter='\t')
import csv


def read_quoted_csv(filename):
    """Read a CSV file that contains quoted fields and stray spaces.

    skipinitialspace drops spaces that follow a delimiter; quotechar
    tells the parser which character wraps special fields.

    Returns:
        A list of rows, each row a list of string fields.
    """
    rows = []
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.reader(file,
                            skipinitialspace=True,  # ignore spaces after delimiters
                            quotechar='"')          # quoting character
        for row in reader:
            print(row)
            rows.append(row)
    return rows


def write_all_quoted(filename, data):
    """Write rows with every field quoted (csv.QUOTE_ALL)."""
    with open(filename, 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file,
                            quoting=csv.QUOTE_ALL)  # quote every field
        writer.writerows(data)


if __name__ == "__main__":
    read_quoted_csv('quoted_data.csv')
    write_all_quoted('output.csv',
                     [['包含,逗号的值', '正常值'], ['另一个,测试', '数据']])
import csv
from collections import defaultdict


def analyze_sales_data(filename):
    """Aggregate sales per city from a CSV with 城市/销售额 columns.

    Args:
        filename: path of the sales CSV file (UTF-8 encoded).

    Returns:
        (total_sales, sales_by_city): the grand total and a plain dict
        mapping each city to its summed sales.
    """
    sales_by_city = defaultdict(float)
    total_sales = 0.0
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        for row in reader:
            city = row['城市']
            amount = float(row['销售额'])
            sales_by_city[city] += amount
            total_sales += amount
    # Report results
    print(f"总销售额: {total_sales:.2f}")
    print("\n各城市销售额:")
    for city, sales in sales_by_city.items():
        # Guard against division by zero on an empty/zero-total file
        percentage = (sales / total_sales) * 100 if total_sales else 0.0
        print(f"  {city}: {sales:.2f} ({percentage:.1f}%)")
    return total_sales, dict(sales_by_city)


# Assuming sales.csv contains city and sales-amount columns:
# analyze_sales_data('sales.csv')
import csv


def clean_and_convert_csv(input_file, output_file):
    """Clean a CSV file and write the result to output_file.

    Cleaning rules: strip surrounding whitespace from every cell,
    replace empty cells with '未知', and drop rows whose cells are all
    empty.

    Args:
        input_file: path of the raw CSV file (UTF-8 encoded).
        output_file: path the cleaned CSV is written to.

    Returns:
        The number of data rows kept (header excluded).
    """
    cleaned_data = []
    with open(input_file, 'r', encoding='utf-8') as infile:
        reader = csv.reader(infile)
        # Robustness: an empty input file has no header row
        header = next(reader, None)
        if header is not None:
            cleaned_data.append(header)
        # Data rows are numbered from 2 because row 1 is the header
        for row_num, row in enumerate(reader, 2):
            # Strip whitespace; blank cells become the placeholder '未知'
            cleaned_row = [cell.strip() if cell.strip() else '未知'
                           for cell in row]
            # Keep only rows holding at least one real value
            if any(cell != '未知' for cell in cleaned_row):
                cleaned_data.append(cleaned_row)
            else:
                print(f"跳过第{row_num}行空数据")
    with open(output_file, 'w', encoding='utf-8', newline='') as outfile:
        csv.writer(outfile).writerows(cleaned_data)
    print(f"数据清洗完成,原始{len(cleaned_data)-1}行数据已保存到{output_file}")
    return max(len(cleaned_data) - 1, 0)


# Usage example:
# clean_and_convert_csv('dirty_data.csv', 'cleaned_data.csv')
import csv
import glob


def merge_csv_files(pattern, output_file):
    """Merge every CSV file matching a glob pattern into one file.

    The header of the first file read is used for the merged output;
    a warning is printed when the files' headers differ.

    Args:
        pattern: glob pattern, e.g. 'data_*.csv'.
        output_file: path of the merged CSV file.

    Returns:
        The number of data rows written, or None when no file matched.
    """
    all_data = []
    # Headers in encounter order — a set would lose which came first
    headers = []
    csv_files = glob.glob(pattern)
    if not csv_files:
        print("没有找到匹配的CSV文件")
        return None
    for filename in csv_files:
        # BUG FIX: the f-string had lost its {filename} placeholder
        print(f"处理文件: {filename}")
        with open(filename, 'r', encoding='utf-8') as file:
            reader = csv.reader(file)
            # Skip files that are completely empty
            header = next(reader, None)
            if header is None:
                continue
            if header not in headers:
                headers.append(header)
            all_data.extend(reader)
    if len(headers) > 1:
        print("警告: 不同文件的表头不一致")
    with open(output_file, 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        # Use the first file's header (encounter order is preserved)
        if headers:
            writer.writerow(headers[0])
        writer.writerows(all_data)
    print(f"合并完成,共处理 {len(csv_files)} 个文件,{len(all_data)} 行数据")
    return len(all_data)


# Usage example: merge every CSV file whose name starts with data_
# merge_csv_files('data_*.csv', 'merged_data.csv')
import csv

# Register a reusable custom dialect (kept at module level, as before,
# so the dialect is available as soon as this snippet is imported)
csv.register_dialect('my_dialect',
                     delimiter='|',                # pipe-separated fields
                     quotechar='"',                # quoting character
                     quoting=csv.QUOTE_MINIMAL,    # quote only when needed
                     skipinitialspace=True,        # ignore spaces after delimiters
                     lineterminator='\n')          # line terminator


def write_with_dialect(filename, rows, dialect='my_dialect'):
    """Write rows to a file using a registered csv dialect."""
    with open(filename, 'w', encoding='utf-8', newline='') as file:
        csv.writer(file, dialect=dialect).writerows(rows)


def read_with_dialect(filename, dialect='my_dialect'):
    """Read rows back from a file using the same registered dialect."""
    with open(filename, 'r', encoding='utf-8') as file:
        return list(csv.reader(file, dialect=dialect))


if __name__ == "__main__":
    data = [['字段1', '字段2'], ['值1', '值,2']]
    write_with_dialect('custom_format.csv', data)
    for row in read_with_dialect('custom_format.csv'):
        print(row)
    # List every registered dialect
    print("已注册的方言:", csv.list_dialects())
import csv


def process_large_csv(filename, chunk_size=1000, handler=None):
    """Process a large CSV file in fixed-size chunks.

    Args:
        filename: path of the CSV file (UTF-8 encoded, header row first).
        chunk_size: number of rows handed to the handler at a time.
        handler: callable taking a list of row dicts; defaults to
            process_chunk (backward compatible).

    Returns:
        The total number of rows processed.
    """
    if handler is None:
        handler = process_chunk
    processed_count = 0
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        current_chunk = []
        for row in reader:
            current_chunk.append(row)
            # Flush the chunk once it reaches the requested size
            if len(current_chunk) >= chunk_size:
                handler(current_chunk)
                processed_count += len(current_chunk)
                current_chunk = []
                print(f"已处理 {processed_count} 行")
        # Flush whatever is left over
        if current_chunk:
            handler(current_chunk)
            processed_count += len(current_chunk)
    print(f"处理完成,总共 {processed_count} 行数据")
    return processed_count


def process_chunk(chunk):
    """Default chunk handler: add real processing here.

    For example: data validation, transformation, saving to a database.
    """
    pass


# Usage example:
# process_large_csv('large_data.csv')
import csv


def safe_csv_operation(filename, operation='read'):
    """CSV operation wrapped in explicit error handling.

    Args:
        filename: path of the CSV file.
        operation: 'read' to load rows; 'write' is a placeholder.

    Returns:
        The list of rows on a successful read, otherwise None.
    """
    try:
        if operation == 'read':
            with open(filename, 'r', encoding='utf-8') as file:
                reader = csv.reader(file)
                data = []
                # BUG FIX: csv.Error is raised while *iterating* the
                # reader, not by list.append — catch it around next()
                while True:
                    try:
                        row = next(reader)
                    except StopIteration:
                        break
                    except csv.Error as e:
                        print(f"第{reader.line_num}行解析错误: {e}")
                        continue
                    data.append(row)
                return data
        elif operation == 'write':
            # Error handling for write operations can be added here
            pass
    except FileNotFoundError:
        print(f"文件不存在: {filename}")
        return None
    except PermissionError:
        print(f"没有权限访问文件: {filename}")
        return None
    except Exception as e:
        print(f"处理文件时发生错误: {e}")
        return None


if __name__ == "__main__":
    # Demo using the safe wrapper
    data = safe_csv_operation('data.csv')
    if data is not None:
        print(f"成功读取 {len(data)} 行数据")
import csv
import os


class CSVInspector:
    """CSV file inspection utility."""

    @staticmethod
    def inspect_file(filename):
        """Inspect and print basic information about a CSV file.

        Args:
            filename: path of the CSV file (UTF-8 encoded).

        Returns:
            A dict with keys 'size', 'delimiter', 'row_count' and
            'rows', or None when the file does not exist.
        """
        if not os.path.exists(filename):
            print(f"文件不存在: {filename}")
            return None
        file_size = os.path.getsize(filename)
        print(f"文件: {filename}")
        print(f"大小: {file_size} 字节")
        with open(filename, 'r', encoding='utf-8') as file:
            # Heuristic delimiter detection from the first line
            first_line = file.readline()
            file.seek(0)  # back to the start of the file
            delimiters = [',', ';', '\t', '|']
            # BUG FIX: count each candidate delimiter, not the list itself
            delimiter_counts = {delim: first_line.count(delim)
                                for delim in delimiters}
            detected_delimiter = max(delimiter_counts,
                                     key=delimiter_counts.get)
            print(f"检测到的分隔符: '{detected_delimiter}'")
            # Row/column statistics
            reader = csv.reader(file, delimiter=detected_delimiter)
            rows = list(reader)
        print(f"总行数: {len(rows)}")
        if rows:
            print(f"列数: {len(rows[0])}")
            print("表头:", rows[0])
            # Preview the first few rows
            print("\n数据预览:")
            for i, row in enumerate(rows[:5]):
                print(f"行{i+1}: {row}")
        return {
            'size': file_size,
            'delimiter': detected_delimiter,
            'row_count': len(rows),
            'rows': rows,
        }
CSVInspector.inspect_file('data.csv')

| 方法 | 说明 | 示例 |
|---|---|---|
| csv.reader() | 从文件对象读取 CSV 数据 | reader = csv.reader(file) |
| csv.writer() | 将数据写入 CSV 文件 | writer = csv.writer(file) |
| csv.DictReader() | 将 CSV 行读取为字典(带表头) | dict_reader = csv.DictReader(file) |
| csv.DictWriter() | 将字典写入 CSV 文件(需指定字段名) | dict_writer = csv.DictWriter(file, fieldnames) |
| csv.register_dialect() | 注册自定义 CSV 格式(如分隔符) | csv.register_dialect('mydialect', delimiter='|') |
| csv.unregister_dialect() | 删除已注册的方言 | csv.unregister_dialect('mydialect') |
| csv.list_dialects() | 列出所有已注册的方言 | print(csv.list_dialects()) |

| 方法 | 说明 | 适用对象 |
|---|---|---|
| __next__() | 迭代读取下一行(或使用 for 循环) | reader |
| writerow(row) | 写入单行数据 | writer |
| writerows(rows) | 写入多行数据(列表的列表) | writer |

| 特性/方法 | 说明 | 示例 |
|---|---|---|
| fieldnames | 字段名列表(DictReader 自动从首行获取) | dict_reader.fieldnames |
| writeheader() | 写入表头行(DictWriter 专用) | dict_writer.writeheader() |

| 参数 | 说明 | 示例值 | 适用方法 |
|---|---|---|---|
| delimiter | 字段分隔符 | ','(默认), '\t' | reader/writer |
| quotechar | 引用字符(包围特殊字段) | '"'(默认) | reader/writer |
| quoting | 引用规则 | csv.QUOTE_ALL(全部引用) | reader/writer |
| skipinitialspace | 忽略分隔符后的空格 | True/False | reader |
| lineterminator | 行结束符 | '\r\n'(默认) | writer |
| dialect | 预定义的方言名称 | 'excel'(默认) | 所有方法 |
csv模块提供了完整的CSV文件处理功能:
读取数据:csv.reader和csv.DictReader
写入数据:csv.writer和csv.DictWriter
格式控制:支持自定义分隔符、引号等
字典操作:可以用字段名访问数据,更直观
错误处理:包含完善的异常处理机制
使用技巧:
总是指定编码(如utf-8)避免乱码
在Windows系统使用newline=''参数
处理大型文件时考虑分批读取
使用DictReader/DictWriter提高代码可读性
掌握csv模块对于处理表格数据非常重要,特别是在数据分析和机器学习项目中。
本文内容仅供个人学习/研究/参考使用,不构成任何决策建议或专业指导。分享/转载时请标明原文来源,同时请勿将内容用于商业售卖、虚假宣传等非学习用途哦~感谢您的理解与支持!