Python3入门指南Python语言的特点和实际应用Python3环境搭建配置VSCode进行Python开发Python基础语法Python变量与数据类型Python数据类型转换Python解释器使用Python注释使用Python运算符Python数字类型Python字符串操作Python列表操作Python元组使用Python字典使用Python集合使用Python条件控制详解Python循环语句详解Python编程入门实践Python推导式详解Python迭代器和生成器Python with语句详解Python函数详解Python lambda(匿名函数)Python装饰器Python数据结构Python模块和包使用Python中__name__和__main__的用法Python输入输出:从基础到文件操作Python文件操作Python OS模块使用Python错误和异常处理Python面向对象编程Python命名空间和作用域Python虚拟环境:venv详细教程Python类型注解Python标准库常用模块Python正则表达式Python CGI编程Python MySQL(mysql-connector驱动)Python MySQL(PyMySQL驱动)Python网络编程Python发送邮件Python多线程编程Python XML解析Python JSON解析Python日期和时间处理Python操作MongoDBPython urllib库使用Python uWSGI 安装与配置Python pip包管理工具Python operator模块Python math模块Python requests模块HTTP请求Python random模块Python OpenAI库Python AI绘画制作Python statistics模块Python hashlib模块:哈希加密Python量化交易Python pyecharts数据可视化Python Selenium网页自动化Python BeautifulSoup网页数据提取Python Scrapy爬虫框架Python Markdown转HTMLPython sys模块Python Pickle模块:数据存储Python subprocess模块Python queue队列模块Python StringIO内存文件操作Python logging日志记录Python datetime日期时间处理Python re正则表达式Python csv表格数据处理Python threading 多线程编程Python asyncio 异步编程Python PyQt 图形界面开发Python 应用方向和常用库框架

Python csv表格数据处理

csv模块是Python中处理表格数据的标准工具。CSV文件用纯文本存储表格数据,每行代表一行数据,列之间用逗号分隔。这种格式简单通用,适合数据交换和存储。


读取CSV文件

基本读取方法

import csv

# 读取CSV文件
with open('data.csv', 'r', encoding='utf-8') as file:
    reader = csv.reader(file)
    
    # 逐行读取数据
    for row_num, row in enumerate(reader, 1):
        print(f"第{row_num}行: {row}")

假设data.csv文件内容:

姓名,年龄,城市
张三,25,北京
李四,30,上海
王五,28,广州

输出结果:

第1行: ['姓名', '年龄', '城市']
第2行: ['张三', '25', '北京']
第3行: ['李四', '30', '上海']
第4行: ['王五', '28', '广州']

处理表头和数据

import csv

def read_csv_with_header(filename):
    """读取CSV文件并分离表头和数据"""
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.reader(file)
        
        # 读取表头
        header = next(reader)
        print("表头:", header)
        
        # 读取数据行
        data = []
        for row in reader:
            data.append(row)
            print(f"数据: {row}")
        
        return header, data

# 使用示例
header, data = read_csv_with_header('data.csv')
print(f"总共读取 {len(data)} 行数据")


写入CSV文件

基本写入方法

import csv

# 准备数据
employees = [
    ['姓名', '部门', '工资'],
    ['张三', '技术部', '8000'],
    ['李四', '销售部', '6000'],
    ['王五', '人事部', '5500']
]

# 写入CSV文件
with open('employees.csv', 'w', encoding='utf-8', newline='') as file:
    writer = csv.writer(file)
    
    # 写入所有行
    writer.writerows(employees)

print("数据写入完成")

逐行写入数据

import csv

# 准备数据
students = [
    ['学号', '姓名', '成绩'],
    ['001', '小明', '85'],
    ['002', '小红', '92'],
    ['003', '小刚', '78']
]

with open('students.csv', 'w', encoding='utf-8', newline='') as file:
    writer = csv.writer(file)
    
    # 逐行写入
    for student in students:
        writer.writerow(student)
        print(f"写入: {student}")

print("学生数据保存完成")


使用字典方式读写

DictReader读取

import csv

def read_csv_as_dict(filename):
    """以字典方式读取CSV文件"""
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        
        print("字段名:", reader.fieldnames)
        
        # 读取数据
        records = []
        for record in reader:
            records.append(record)
            print(f"记录: {record}")
        
        return records

# 使用示例
records = read_csv_as_dict('data.csv')

# 访问特定字段
for record in records:
    print(f"{record['姓名']} 来自 {record['城市']}")

DictWriter写入

import csv

# 准备字典数据
products = [
    {'名称': '笔记本电脑', '价格': '5999', '库存': '50'},
    {'名称': '智能手机', '价格': '3999', '库存': '100'},
    {'名称': '平板电脑', '价格': '2999', '库存': '30'}
]

# 写入CSV文件
with open('products.csv', 'w', encoding='utf-8', newline='') as file:
    # 定义字段名
    fieldnames = ['名称', '价格', '库存']
    
    writer = csv.DictWriter(file, fieldnames=fieldnames)
    
    # 写入表头
    writer.writeheader()
    
    # 写入数据
    for product in products:
        writer.writerow(product)

print("商品数据保存完成")


处理不同格式的CSV文件

自定义分隔符

import csv

# 处理以分号分隔的CSV文件
with open('semicolon_data.csv', 'r', encoding='utf-8') as file:
    reader = csv.reader(file, delimiter=';')
    
    for row in reader:
        print(row)

# 处理制表符分隔的TSV文件
with open('data.tsv', 'r', encoding='utf-8') as file:
    reader = csv.reader(file, delimiter='\t')
    
    for row in reader:
        print(row)

处理引号和空格

import csv

# 读取包含引号和空格的CSV
with open('quoted_data.csv', 'r', encoding='utf-8') as file:
    reader = csv.reader(file, 
                       skipinitialspace=True,  # 忽略分隔符后的空格
                       quotechar='"')          # 引号字符
    
    for row in reader:
        print(row)

# 写入时引用所有字段
data = [['包含,逗号的值', '正常值'], ['另一个,测试', '数据']]

with open('output.csv', 'w', encoding='utf-8', newline='') as file:
    writer = csv.writer(file, 
                       quoting=csv.QUOTE_ALL)  # 引用所有字段
    
    writer.writerows(data)


实际应用示例

数据统计和分析

import csv
from collections import defaultdict

def analyze_sales_data(filename):
    """分析销售数据"""
    sales_by_city = defaultdict(float)
    total_sales = 0
    
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        
        for row in reader:
            city = row['城市']
            amount = float(row['销售额'])
            
            sales_by_city[city] += amount
            total_sales += amount
    
    # 输出分析结果
    print(f"总销售额: {total_sales:.2f}")
    print("\n各城市销售额:")
    for city, sales in sales_by_city.items():
        percentage = (sales / total_sales) * 100
        print(f"  {city}: {sales:.2f} ({percentage:.1f}%)")

# 假设sales.csv文件包含城市和销售额数据
# analyze_sales_data('sales.csv')

数据清洗和转换

import csv

def clean_and_convert_csv(input_file, output_file):
    """清洗和转换CSV数据"""
    
    cleaned_data = []
    
    with open(input_file, 'r', encoding='utf-8') as infile:
        reader = csv.reader(infile)
        header = next(reader)  # 读取表头
        cleaned_data.append(header)
        
        for row_num, row in enumerate(reader, 2):
            # 清洗数据:去除空格,处理空值
            cleaned_row = []
            for value in row:
                cleaned_value = value.strip() if value.strip() else '未知'
                cleaned_row.append(cleaned_value)
            
            # 只保留有效行(非空行)
            if any(value != '未知' for value in cleaned_row):
                cleaned_data.append(cleaned_row)
            else:
                print(f"跳过第{row_num}行空数据")
    
    # 写入清洗后的数据
    with open(output_file, 'w', encoding='utf-8', newline='') as outfile:
        writer = csv.writer(outfile)
        writer.writerows(cleaned_data)
    
    print(f"数据清洗完成,原始{len(cleaned_data)-1}行数据已保存到{output_file}")

# 使用示例
# clean_and_convert_csv('dirty_data.csv', 'cleaned_data.csv')

合并多个CSV文件

import csv
import glob

def merge_csv_files(pattern, output_file):
    """合并多个CSV文件"""
    
    all_data = []
    headers_set = set()
    
    # 查找所有匹配的CSV文件
    csv_files = glob.glob(pattern)
    
    if not csv_files:
        print("没有找到匹配的CSV文件")
        return
    
    for filename in csv_files:
        print(f"处理文件: {filename}")
        
        with open(filename, 'r', encoding='utf-8') as file:
            reader = csv.reader(file)
            header = next(reader)
            headers_set.add(tuple(header))  # 记录表头结构
            
            # 读取数据行
            for row in reader:
                all_data.append(row)
    
    # 检查表头是否一致
    if len(headers_set) > 1:
        print("警告: 不同文件的表头不一致")
    
    # 写入合并后的文件
    with open(output_file, 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        
        # 使用第一个文件的表头
        first_header = next(iter(headers_set))
        writer.writerow(first_header)
        writer.writerows(all_data)
    
    print(f"合并完成,共处理 {len(csv_files)} 个文件,{len(all_data)} 行数据")

# 使用示例:合并所有以data_开头的CSV文件
# merge_csv_files('data_*.csv', 'merged_data.csv')


高级用法

自定义方言

import csv

# 注册自定义方言
csv.register_dialect('my_dialect',
                    delimiter='|',          # 使用竖线分隔
                    quotechar='"',          # 引号字符
                    quoting=csv.QUOTE_MINIMAL,  # 最小引用
                    skipinitialspace=True,  # 忽略起始空格
                    lineterminator='\n')    # 行终止符

# 使用自定义方言写入
data = [['字段1', '字段2'], ['值1', '值,2']]

with open('custom_format.csv', 'w', encoding='utf-8', newline='') as file:
    writer = csv.writer(file, dialect='my_dialect')
    writer.writerows(data)

# 使用自定义方言读取
with open('custom_format.csv', 'r', encoding='utf-8') as file:
    reader = csv.reader(file, dialect='my_dialect')
    for row in reader:
        print(row)

# 查看已注册的方言
print("已注册的方言:", csv.list_dialects())

处理大型CSV文件

import csv

def process_large_csv(filename, chunk_size=1000):
    """分批处理大型CSV文件"""
    
    processed_count = 0
    
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        
        current_chunk = []
        
        for row in reader:
            current_chunk.append(row)
            
            # 达到批处理大小时处理当前批次
            if len(current_chunk) >= chunk_size:
                process_chunk(current_chunk)
                processed_count += len(current_chunk)
                current_chunk = []
                print(f"已处理 {processed_count} 行")
        
        # 处理剩余数据
        if current_chunk:
            process_chunk(current_chunk)
            processed_count += len(current_chunk)
    
    print(f"处理完成,总共 {processed_count} 行数据")

def process_chunk(chunk):
    """处理数据批次"""
    # 这里可以添加实际的数据处理逻辑
    # 例如:数据验证、转换、保存到数据库等
    pass

# 使用示例
# process_large_csv('large_data.csv')


错误处理和调试

健壮的CSV处理

import csv
import sys

def safe_csv_operation(filename, operation='read'):
    """安全的CSV操作,包含错误处理"""
    
    try:
        if operation == 'read':
            with open(filename, 'r', encoding='utf-8') as file:
                reader = csv.reader(file)
                
                data = []
                for row_num, row in enumerate(reader, 1):
                    try:
                        data.append(row)
                    except csv.Error as e:
                        print(f"第{row_num}行解析错误: {e}")
                        continue
                
                return data
        
        elif operation == 'write':
            # 这里可以添加写入操作的错误处理
            pass
    
    except FileNotFoundError:
        print(f"文件不存在: {filename}")
        return None
    except PermissionError:
        print(f"没有权限访问文件: {filename}")
        return None
    except Exception as e:
        print(f"处理文件时发生错误: {e}")
        return None

# 使用安全函数
data = safe_csv_operation('data.csv')
if data is not None:
    print(f"成功读取 {len(data)} 行数据")


实用工具函数

CSV文件检查器

import csv
import os

class CSVInspector:
    """CSV文件检查工具"""
    
    @staticmethod
    def inspect_file(filename):
        """检查CSV文件基本信息"""
        
        if not os.path.exists(filename):
            print(f"文件不存在: {filename}")
            return
        
        file_size = os.path.getsize(filename)
        print(f"文件: {filename}")
        print(f"大小: {file_size} 字节")
        
        with open(filename, 'r', encoding='utf-8') as file:
            # 尝试检测分隔符
            first_line = file.readline()
            file.seek(0)  # 回到文件开头
            
            # 常见分隔符
            delimiters = [',', ';', '\t', '|']
            delimiter_counts = {delim: first_line.count(delimiters) for delim in delimiters}
            detected_delimiter = max(delimiter_counts, key=delimiter_counts.get)
            
            print(f"检测到的分隔符: '{detected_delimiter}'")
            
            # 读取数据统计
            reader = csv.reader(file, delimiter=detected_delimiter)
            rows = list(reader)
            
            print(f"总行数: {len(rows)}")
            if rows:
                print(f"列数: {len(rows[0])}")
                print("表头:", rows[0])
            
            # 显示前几行数据预览
            print("\n数据预览:")
            for i, row in enumerate(rows[:5]):
                print(f"行{i+1}: {row}")

# 使用检查器
CSVInspector.inspect_file('data.csv')


常用的属性和方法

csv 模块核心方法

方法说明示例
csv.reader()从文件对象读取 CSV 数据reader = csv.reader(file)
csv.writer()将数据写入 CSV 文件writer = csv.writer(file)
csv.DictReader()将 CSV 行读取为字典(带表头)dict_reader = csv.DictReader(file)
csv.DictWriter()将字典写入 CSV 文件(需指定字段名)dict_writer = csv.DictWriter(file, fieldnames)
csv.register_dialect()注册自定义 CSV 格式(如分隔符)csv.register_dialect('mydialect', delimiter='|')
csv.unregister_dialect()删除已注册的方言csv.unregister_dialect('mydialect')
csv.list_dialects()列出所有已注册的方言print(csv.list_dialects())

csv.reader 和 csv.writer 对象常用方法

方法说明适用对象
__next__()迭代读取下一行(或使用 for 循环)reader
writerow(row)写入单行数据writer
writerows(rows)写入多行数据(列表的列表)writer

csv.DictReader 和 csv.DictWriter 对象特性

特性/方法说明示例
fieldnames字段名列表(DictReader 自动从首行获取)dict_reader.fieldnames
writeheader()写入表头行(DictWriter 专用)dict_writer.writeheader()

常用参数说明

参数说明示例值适用方法
delimiter字段分隔符','(默认), '\t'reader/writer
quotechar引用字符(包围特殊字段)'"'(默认)reader/writer
quoting引用规则csv.QUOTE_ALL(全部引用)reader/writer
skipinitialspace忽略分隔符后的空格True/Falsereader
lineterminator行结束符'\r\n'(默认)writer
dialect预定义的方言名称'excel'(默认)所有方法

总结

csv模块提供了完整的CSV文件处理功能:

  • 读取数据:csv.reader和csv.DictReader

  • 写入数据:csv.writer和csv.DictWriter

  • 格式控制:支持自定义分隔符、引号等

  • 字典操作:可以用字段名访问数据,更直观

  • 错误处理:包含完善的异常处理机制

使用技巧:

  • 总是指定编码(如utf-8)避免乱码

  • 在Windows系统使用newline=''参数

  • 处理大型文件时考虑分批读取

  • 使用DictReader/DictWriter提高代码可读性

掌握csv模块对于处理表格数据非常重要,特别是在数据分析和机器学习项目中。

本文内容仅供个人学习/研究/参考使用,不构成任何决策建议或专业指导。分享/转载时请标明原文来源,同时请勿将内容用于商业售卖、虚假宣传等非学习用途哦~感谢您的理解与支持!

链接: https://fly63.com/course/36_2145

目录选择