Python3入门指南Python语言的特点和实际应用Python3环境搭建配置VSCode进行Python开发Python基础语法Python变量与数据类型Python数据类型转换Python解释器使用Python注释使用Python运算符Python数字类型Python字符串操作Python列表操作Python元组使用Python字典使用Python集合使用Python条件控制详解Python循环语句详解Python编程入门实践Python推导式详解Python迭代器和生成器Python with语句详解Python函数详解Python lambda(匿名函数)Python装饰器Python数据结构Python模块和包使用Python中__name__和__main__的用法Python输入输出:从基础到文件操作Python文件操作Python OS模块使用Python错误和异常处理Python面向对象编程Python命名空间和作用域Python虚拟环境:venv详细教程Python类型注解Python标准库常用模块Python正则表达式Python CGI编程Python MySQL(mysql-connector驱动)Python MySQL(PyMySQL驱动)Python网络编程Python发送邮件Python多线程编程Python XML解析Python JSON解析Python日期和时间处理Python操作MongoDBPython urllib库使用Python uWSGI 安装与配置Python pip包管理工具Python operator模块Python math模块Python requests模块HTTP请求Python random模块Python OpenAI库Python AI绘画制作Python statistics模块Python hashlib模块:哈希加密Python量化交易Python pyecharts数据可视化Python Selenium网页自动化Python BeautifulSoup网页数据提取Python Scrapy爬虫框架Python Markdown转HTMLPython sys模块Python Pickle模块:数据存储Python subprocess模块Python queue队列模块Python StringIO内存文件操作Python logging日志记录Python datetime日期时间处理Python re正则表达式Python csv表格数据处理Python threading 多线程编程Python asyncio 异步编程Python PyQt 图形界面开发Python 应用方向和常用库框架

Python MySQL(PyMySQL驱动)

PyMySQL是Python中连接MySQL数据库的重要工具。通过PyMySQL,我们可以在Python程序中执行SQL语句,实现对数据库的增删改查操作。


PyMySQL简介

PyMySQL是一个纯Python编写的MySQL客户端库,专门用于Python 3.x版本。如果你使用Python 2.x,则需要使用MySQLdb库。

PyMySQL完全遵循Python数据库api规范,使用起来简单直观。


安装PyMySQL

使用pip命令安装PyMySQL:

pip install PyMySQL

验证安装是否成功:

import pymysql
print("PyMySQL安装成功")

如果没有报错,说明安装成功。


连接数据库

基本连接

连接数据库需要提供数据库地址、用户名、密码和数据库名:

import pymysql

# 创建数据库连接
db = pymysql.connect(
    host='localhost',      # 数据库服务器地址
    user='root',          # 用户名
    password='123456',    # 密码
    database='test_db',   # 数据库名
    charset='utf8mb4'     # 字符编码
)

print("数据库连接成功")

# 创建游标对象
cursor = db.cursor()

# 关闭连接
db.close()

完整的连接示例

import pymysql

def connect_database():
    try:
        # 连接数据库
        db = pymysql.connect(
            host='localhost',
            user='root',
            password='123456',
            database='test_db',
            charset='utf8mb4'
        )
        
        # 创建游标
        cursor = db.cursor()
        
        # 测试连接
        cursor.execute("SELECT VERSION()")
        version = cursor.fetchone()
        print(f"MySQL版本: {version[0]}")
        
        return db, cursor
        
    except pymysql.Error as e:
        print(f"数据库连接失败: {e}")
        return None, None

# 使用示例
db, cursor = connect_database()
if db:
    db.close()


创建数据库和表

创建数据库

import pymysql

# 连接MySQL服务器(不指定数据库)
db = pymysql.connect(
    host='localhost',
    user='root',
    password='123456'
)

cursor = db.cursor()

# 创建数据库
cursor.execute("CREATE DATABASE IF NOT EXISTS school_db")
print("数据库创建成功")

db.close()

创建数据表

import pymysql

db = pymysql.connect(
    host='localhost',
    user='root',
    password='123456',
    database='school_db'
)

cursor = db.cursor()

# 创建学生表
create_table_sql = """
CREATE TABLE IF NOT EXISTS students (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(50) NOT NULL,
    age INT,
    grade VARCHAR(20),
    email VARCHAR(100),
    created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""

cursor.execute(create_table_sql)
print("学生表创建成功")

db.close()


插入数据

插入单条数据

import pymysql

db = pymysql.connect(
    host='localhost',
    user='root',
    password='123456',
    database='school_db'
)

cursor = db.cursor()

# 插入单条数据
sql = """
INSERT INTO students (name, age, grade, email) 
VALUES (%s, %s, %s, %s)
"""
values = ('张三', 18, '高三', 'zhangsan@fly63.com')

try:
    cursor.execute(sql, values)
    db.commit()  # 提交事务
    print(f"插入成功,学生ID: {cursor.lastrowid}")
except Exception as e:
    db.rollback()  # 回滚事务
    print(f"插入失败: {e}")

db.close()

批量插入数据

import pymysql

db = pymysql.connect(
    host='localhost',
    user='root',
    password='123456',
    database='school_db'
)

cursor = db.cursor()

# 批量插入数据
sql = """
INSERT INTO students (name, age, grade, email) 
VALUES (%s, %s, %s, %s)
"""

students = [
    ('李四', 17, '高二', 'lisi@fly63.com'),
    ('王五', 16, '高一', 'wangwu@fly63.com'),
    ('赵六', 19, '高三', 'zhaoliu@fly63.com'),
    ('钱七', 18, '高三', 'qianqi@fly63.com')
]

try:
    cursor.executemany(sql, students)
    db.commit()
    print(f"批量插入成功,共{cursor.rowcount}条记录")
except Exception as e:
    db.rollback()
    print(f"批量插入失败: {e}")

db.close()


查询数据

查询所有数据

import pymysql

db = pymysql.connect(
    host='localhost',
    user='root',
    password='123456',
    database='school_db'
)

cursor = db.cursor()

# 查询所有学生
cursor.execute("SELECT * FROM students")
results = cursor.fetchall()

print("所有学生信息:")
print("ID\t姓名\t年龄\t年级\t邮箱")
print("-" * 50)

for row in results:
    print(f"{row[0]}\t{row[1]}\t{row[2]}\t{row[3]}\t{row[4]}")

db.close()

条件查询

import pymysql

db = pymysql.connect(
    host='localhost',
    user='root',
    password='123456',
    database='school_db'
)

cursor = db.cursor()

# 查询高三学生
sql = "SELECT * FROM students WHERE grade = %s"
cursor.execute(sql, ('高三',))
results = cursor.fetchall()

print("高三学生:")
for student in results:
    print(f"姓名: {student[1]}, 年龄: {student[2]}, 邮箱: {student[4]}")

# 查询年龄大于17的学生
sql = "SELECT * FROM students WHERE age > %s"
cursor.execute(sql, (17,))
results = cursor.fetchall()

print("\n年龄大于17的学生:")
for student in results:
    print(f"姓名: {student[1]}, 年龄: {student[2]}")

db.close()

使用fetchone()和fetchmany()

import pymysql

db = pymysql.connect(
    host='localhost',
    user='root',
    password='123456',
    database='school_db'
)

cursor = db.cursor()

cursor.execute("SELECT * FROM students")

# 获取第一条记录
first_student = cursor.fetchone()
print("第一条记录:", first_student)

# 再获取两条记录
next_two_students = cursor.fetchmany(2)
print("接下来两条记录:", next_two_students)

# 获取剩余所有记录
remaining_students = cursor.fetchall()
print(f"剩余{len(remaining_students)}条记录")

db.close()


更新数据

更新单条记录

import pymysql

db = pymysql.connect(
    host='localhost',
    user='root',
    password='123456',
    database='school_db'
)

cursor = db.cursor()

# 更新学生邮箱
sql = "UPDATE students SET email = %s WHERE name = %s"
values = ('new_email@fly63.com', '张三')

try:
    cursor.execute(sql, values)
    db.commit()
    print(f"更新成功,影响{cursor.rowcount}条记录")
except Exception as e:
    db.rollback()
    print(f"更新失败: {e}")

db.close()

批量更新

import pymysql

db = pymysql.connect(
    host='localhost',
    user='root',
    password='123456',
    database='school_db'
)

cursor = db.cursor()

# 批量更新年龄
update_data = [
    (19, '张三'),
    (18, '李四'),
    (17, '王五')
]

sql = "UPDATE students SET age = %s WHERE name = %s"

try:
    cursor.executemany(sql, update_data)
    db.commit()
    print(f"批量更新成功,影响{cursor.rowcount}条记录")
except Exception as e:
    db.rollback()
    print(f"批量更新失败: {e}")

db.close()


删除数据

删除指定记录

import pymysql

db = pymysql.connect(
    host='localhost',
    user='root',
    password='123456',
    database='school_db'
)

cursor = db.cursor()

# 删除指定学生
sql = "DELETE FROM students WHERE name = %s"
values = ('赵六',)

try:
    cursor.execute(sql, values)
    db.commit()
    print(f"删除成功,影响{cursor.rowcount}条记录")
except Exception as e:
    db.rollback()
    print(f"删除失败: {e}")

db.close()

清空表数据

import pymysql

db = pymysql.connect(
    host='localhost',
    user='root',
    password='123456',
    database='school_db'
)

cursor = db.cursor()

# 清空学生表(谨慎操作!)
try:
    cursor.execute("DELETE FROM students")
    db.commit()
    print("所有学生记录已清空")
except Exception as e:
    db.rollback()
    print(f"清空失败: {e}")

db.close()


事务处理

事务确保数据库操作的原子性,要么全部成功,要么全部失败。

import pymysql

db = pymysql.connect(
    host='localhost',
    user='root',
    password='123456',
    database='school_db'
)

cursor = db.cursor()

try:
    # 开始事务
    db.begin()
    
    # 插入新学生
    sql1 = "INSERT INTO students (name, age, grade) VALUES (%s, %s, %s)"
    cursor.execute(sql1, ('孙八', 17, '高二'))
    
    # 更新现有学生
    sql2 = "UPDATE students SET age = %s WHERE name = %s"
    cursor.execute(sql2, (20, '张三'))
    
    # 提交事务
    db.commit()
    print("事务执行成功")
    
except Exception as e:
    # 回滚事务
    db.rollback()
    print(f"事务执行失败,已回滚: {e}")

db.close()


使用上下文管理器

使用with语句可以自动管理连接,确保资源被正确释放。

import pymysql

def get_students_by_grade(grade):
    """根据年级查询学生"""
    with pymysql.connect(
        host='localhost',
        user='root',
        password='123456',
        database='school_db'
    ) as db:
        with db.cursor() as cursor:
            sql = "SELECT * FROM students WHERE grade = %s"
            cursor.execute(sql, (grade,))
            return cursor.fetchall()

# 使用示例
students = get_students_by_grade('高三')
for student in students:
    print(f"姓名: {student[1]}, 年龄: {student[2]}")


实际应用示例

学生管理系统

import pymysql

class StudentManager:
    def __init__(self, host, user, password, database):
        self.db_config = {
            'host': host,
            'user': user,
            'password': password,
            'database': database,
            'charset': 'utf8mb4'
        }
    
    def add_student(self, name, age, grade, email):
        """添加学生"""
        with pymysql.connect(**self.db_config) as db:
            with db.cursor() as cursor:
                sql = """
                INSERT INTO students (name, age, grade, email) 
                VALUES (%s, %s, %s, %s)
                """
                cursor.execute(sql, (name, age, grade, email))
                db.commit()
                return cursor.lastrowid
    
    def get_all_students(self):
        """获取所有学生"""
        with pymysql.connect(**self.db_config) as db:
            with db.cursor() as cursor:
                cursor.execute("SELECT * FROM students")
                return cursor.fetchall()
    
    def search_students(self, name=None, grade=None, min_age=None):
        """搜索学生"""
        with pymysql.connect(**self.db_config) as db:
            with db.cursor() as cursor:
                sql = "SELECT * FROM students WHERE 1=1"
                params = []
                
                if name:
                    sql += " AND name LIKE %s"
                    params.append(f"%{name}%")
                
                if grade:
                    sql += " AND grade = %s"
                    params.append(grade)
                
                if min_age:
                    sql += " AND age >= %s"
                    params.append(min_age)
                
                cursor.execute(sql, params)
                return cursor.fetchall()
    
    def update_student(self, student_id, **kwargs):
        """更新学生信息"""
        if not kwargs:
            return False
            
        with pymysql.connect(**self.db_config) as db:
            with db.cursor() as cursor:
                set_clause = ", ".join([f"{key} = %s" for key in kwargs.keys()])
                sql = f"UPDATE students SET {set_clause} WHERE id = %s"
                
                params = list(kwargs.values())
                params.append(student_id)
                
                cursor.execute(sql, params)
                db.commit()
                return cursor.rowcount > 0

# 使用示例
manager = StudentManager('localhost', 'root', '123456', 'school_db')

# 添加学生
new_id = manager.add_student("周九", 16, "高一", "zhoujiu@fly63.com")
print(f"添加学生成功,ID: {new_id}")

# 搜索学生
students = manager.search_students(grade="高三", min_age=18)
print("高三且年龄≥18的学生:")
for student in students:
    print(f"姓名: {student[1]}, 年龄: {student[2]}")

数据库备份工具

import pymysql
import datetime

def backup_database(host, user, password, database, backup_file=None):
    """备份数据库到SQL文件"""
    if not backup_file:
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_file = f"backup_{database}_{timestamp}.sql"
    
    try:
        with pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database
        ) as db:
            with db.cursor() as cursor:
                # 获取所有表
                cursor.execute("SHOW TABLES")
                tables = [table[0] for table in cursor.fetchall()]
                
                with open(backup_file, 'w', encoding='utf-8') as f:
                    f.write(f"-- 数据库备份: {database}\n")
                    f.write(f"-- 备份时间: {datetime.datetime.now()}\n\n")
                    
                    for table in tables:
                        # 备份表结构
                        cursor.execute(f"SHOW CREATE TABLE {table}")
                        create_sql = cursor.fetchone()[1]
                        f.write(f"{create_sql};\n\n")
                        
                        # 备份表数据
                        cursor.execute(f"SELECT * FROM {table}")
                        rows = cursor.fetchall()
                        
                        if rows:
                            f.write(f"-- 数据表: {table}\n")
                            for row in rows:
                                values = ", ".join([f"'{str(value)}'" for value in row])
                                f.write(f"INSERT INTO {table} VALUES ({values});\n")
                            f.write("\n")
        
        print(f"备份成功: {backup_file}")
        return True
        
    except Exception as e:
        print(f"备份失败: {e}")
        return False

# 使用备份功能
backup_database('localhost', 'root', '123456', 'school_db')


错误处理最佳实践

import pymysql
from pymysql import Error

def safe_database_operation():
    try:
        with pymysql.connect(
            host='localhost',
            user='root',
            password='123456',
            database='school_db'
        ) as db:
            with db.cursor() as cursor:
                # 执行数据库操作
                cursor.execute("SELECT * FROM students")
                results = cursor.fetchall()
                return results
                
    except Error as e:
        print(f"数据库错误: {e}")
        return None
    except Exception as e:
        print(f"其他错误: {e}")
        return None

# 使用示例
students = safe_database_operation()
if students:
    print(f"查询到{len(students)}名学生")

掌握PyMySQL的使用,能够让你在Python程序中轻松操作MySQL数据库。无论是Web开发还是数据分析,这都是非常重要的技能。

本文内容仅供个人学习/研究/参考使用,不构成任何决策建议或专业指导。分享/转载时请标明原文来源,同时请勿将内容用于商业售卖、虚假宣传等非学习用途哦~感谢您的理解与支持!

链接: https://fly63.com/course/36_2112

目录选择