Python MySQL(mysql-connector驱动)

MySQL是广泛使用的关系型数据库,Python通过mysql-connector驱动可以方便地操作MySQL数据库。本文将详细介绍如何使用Python连接和操作MySQL。


安装mysql-connector

首先需要安装MySQL官方提供的Python驱动:

pip install mysql-connector-python

验证安装是否成功:

import mysql.connector
print("mysql-connector安装成功")

如果没有报错,说明安装成功。


连接MySQL数据库

基本连接

import mysql.connector

# 创建数据库连接
mydb = mysql.connector.connect(
    host="localhost",      # 数据库地址
    user="root",           # 用户名
    password="123456",     # 密码
    database="test_db"     # 数据库名
)

print("数据库连接成功")

处理连接异常

在实际应用中,应该处理连接可能出现的异常:

import mysql.connector
from mysql.connector import Error

try:
    mydb = mysql.connector.connect(
        host="localhost",
        user="root",
        password="123456",
        database="test_db"
    )
    
    if mydb.is_connected():
        print("成功连接到MySQL数据库")
        
except Error as e:
    print(f"连接错误: {e}")
finally:
    if mydb.is_connected():
        mydb.close()
        print("数据库连接已关闭")


数据库操作

创建数据库

import mysql.connector

mydb = mysql.connector.connect(
    host="localhost",
    user="root",
    password="123456"
)

mycursor = mydb.cursor()

# 创建数据库
mycursor.execute("CREATE DATABASE IF NOT EXISTS school_db")
print("数据库创建成功")

查看所有数据库

mycursor.execute("SHOW DATABASES")

print("所有数据库:")
for db in mycursor:
    print(f"- {db[0]}")


数据表操作

创建数据表

import mysql.connector

mydb = mysql.connector.connect(
    host="localhost",
    user="root",
    password="123456",
    database="school_db"
)

mycursor = mydb.cursor()

# 创建学生表
mycursor.execute("""
CREATE TABLE IF NOT EXISTS students (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    age INT,
    grade VARCHAR(10),
    email VARCHAR(100)
)
""")

print("学生表创建成功")

查看数据表

mycursor.execute("SHOW TABLES")

print("所有数据表:")
for table in mycursor:
    print(f"- {table[0]}")


数据操作

插入单条数据

# 插入一条学生记录
sql = "INSERT INTO students (name, age, grade, email) VALUES (%s, %s, %s, %s)"
val = ("张三", 18, "高三", "zhangsan@fly63.com")

mycursor.execute(sql, val)
mydb.commit()  # 提交事务

print(f"插入成功,ID: {mycursor.lastrowid}")

批量插入数据

# 批量插入学生记录
sql = "INSERT INTO students (name, age, grade, email) VALUES (%s, %s, %s, %s)"

students = [
    ("李四", 17, "高二", "lisi@fly63.com"),
    ("王五", 19, "高三", "wangwu@fly63.com"),
    ("赵六", 16, "高一", "zhaoliu@fly63.com"),
    ("钱七", 18, "高三", "qianqi@fly63.com")
]

mycursor.executemany(sql, students)
mydb.commit()

print(f"批量插入成功,共{mycursor.rowcount}条记录")


查询数据

查询所有数据

# 查询所有学生
mycursor.execute("SELECT * FROM students")

results = mycursor.fetchall()

print("所有学生信息:")
for row in results:
    print(f"ID: {row[0]}, 姓名: {row[1]}, 年龄: {row[2]}, 年级: {row[3]}, 邮箱: {row[4]}")

查询指定字段

# 只查询姓名和年龄
mycursor.execute("SELECT name, age FROM students")

results = mycursor.fetchall()

print("学生姓名和年龄:")
for name, age in results:
    print(f"{name} - {age}岁")

条件查询

# 查询高三学生
sql = "SELECT * FROM students WHERE grade = %s"
val = ("高三",)

mycursor.execute(sql, val)
results = mycursor.fetchall()

print("高三学生:")
for row in results:
    print(f"姓名: {row[1]}, 年龄: {row[2]}")

使用LIKE模糊查询

# 查询姓名包含"张"的学生
sql = "SELECT * FROM students WHERE name LIKE %s"
val = ("%张%",)

mycursor.execute(sql, val)
results = mycursor.fetchall()

print("姓张的学生:")
for row in results:
    print(f"姓名: {row[1]}")


数据排序和限制

排序查询

# 按年龄升序排列
mycursor.execute("SELECT * FROM students ORDER BY age ASC")
results = mycursor.fetchall()

print("按年龄升序:")
for row in results:
    print(f"姓名: {row[1]}, 年龄: {row[2]}")

限制查询结果数量

# 查询前3条记录
mycursor.execute("SELECT * FROM students LIMIT 3")
results = mycursor.fetchall()

print("前3名学生:")
for row in results:
    print(f"姓名: {row[1]}")

分页查询

# 分页查询,每页2条记录
page = 1
page_size = 2
offset = (page - 1) * page_size

sql = "SELECT * FROM students LIMIT %s OFFSET %s"
val = (page_size, offset)

mycursor.execute(sql, val)
results = mycursor.fetchall()

print(f"第{page}页数据:")
for row in results:
    print(f"姓名: {row[1]}")


更新数据

更新单条记录

# 更新学生年龄
sql = "UPDATE students SET age = %s WHERE name = %s"
val = (19, "张三")

mycursor.execute(sql, val)
mydb.commit()

print(f"更新成功,影响{mycursor.rowcount}条记录")

批量更新

# 批量更新年级
updates = [
    (20, "张三"),
    (18, "李四")
]

sql = "UPDATE students SET age = %s WHERE name = %s"
mycursor.executemany(sql, updates)
mydb.commit()

print(f"批量更新成功,影响{mycursor.rowcount}条记录")


删除数据

删除单条记录

# 删除指定学生
sql = "DELETE FROM students WHERE name = %s"
val = ("赵六",)

mycursor.execute(sql, val)
mydb.commit()

print(f"删除成功,影响{mycursor.rowcount}条记录")

清空表数据

# 清空学生表(谨慎使用!)
mycursor.execute("DELETE FROM students")
mydb.commit()

print("所有学生记录已清空")


高级操作

使用事务

try:
    # 开始事务
    mydb.start_transaction()
    
    # 插入新学生
    sql1 = "INSERT INTO students (name, age, grade) VALUES (%s, %s, %s)"
    val1 = ("孙八", 17, "高二")
    mycursor.execute(sql1, val1)
    
    # 更新另一个学生
    sql2 = "UPDATE students SET age = %s WHERE name = %s"
    val2 = (20, "张三")
    mycursor.execute(sql2, val2)
    
    # 提交事务
    mydb.commit()
    print("事务执行成功")
    
except Exception as e:
    # 回滚事务
    mydb.rollback()
    print(f"事务执行失败,已回滚: {e}")

创建带外键的表

# 创建课程表
mycursor.execute("""
CREATE TABLE IF NOT EXISTS courses (
    course_id INT AUTO_INCREMENT PRIMARY KEY,
    course_name VARCHAR(100) NOT NULL,
    teacher VARCHAR(100)
)
""")

# 创建选课表(带外键)
mycursor.execute("""
CREATE TABLE IF NOT EXISTS student_courses (
    id INT AUTO_INCREMENT PRIMARY KEY,
    student_id INT,
    course_id INT,
    score DECIMAL(4,1),
    FOREIGN KEY (student_id) REFERENCES students(id),
    FOREIGN KEY (course_id) REFERENCES courses(course_id)
)
""")

print("带外键的表创建成功")


实际应用示例

学生管理系统

class StudentManager:
    def __init__(self):
        self.connection = mysql.connector.connect(
            host="localhost",
            user="root",
            password="123456",
            database="school_db"
        )
        self.cursor = self.connection.cursor()
    
    def add_student(self, name, age, grade, email):
        """添加学生"""
        sql = "INSERT INTO students (name, age, grade, email) VALUES (%s, %s, %s, %s)"
        val = (name, age, grade, email)
        
        self.cursor.execute(sql, val)
        self.connection.commit()
        return self.cursor.lastrowid
    
    def get_students_by_grade(self, grade):
        """按年级查询学生"""
        sql = "SELECT * FROM students WHERE grade = %s"
        self.cursor.execute(sql, (grade,))
        return self.cursor.fetchall()
    
    def update_student_email(self, student_id, new_email):
        """更新学生邮箱"""
        sql = "UPDATE students SET email = %s WHERE id = %s"
        self.cursor.execute(sql, (new_email, student_id))
        self.connection.commit()
        return self.cursor.rowcount
    
    def delete_student(self, student_id):
        """删除学生"""
        sql = "DELETE FROM students WHERE id = %s"
        self.cursor.execute(sql, (student_id,))
        self.connection.commit()
        return self.cursor.rowcount
    
    def close(self):
        """关闭连接"""
        self.cursor.close()
        self.connection.close()

# 使用示例
manager = StudentManager()

# 添加学生
new_id = manager.add_student("周九", 16, "高一", "zhoujiu@fly63.com")
print(f"添加学生成功,ID: {new_id}")

# 查询高二学生
students = manager.get_students_by_grade("高二")
print("高二学生:")
for student in students:
    print(f"姓名: {student[1]}")

manager.close()

数据库备份工具

import mysql.connector
import datetime

def backup_database(host, user, password, database):
    """备份数据库到文件"""
    try:
        # 连接数据库
        conn = mysql.connector.connect(
            host=host,
            user=user,
            password=password,
            database=database
        )
        cursor = conn.cursor()
        
        # 获取所有表
        cursor.execute("SHOW TABLES")
        tables = [table[0] for table in cursor.fetchall()]
        
        # 生成备份文件名
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_file = f"backup_{database}_{timestamp}.sql"
        
        with open(backup_file, 'w', encoding='utf-8') as f:
            f.write(f"-- 数据库备份: {database}\n")
            f.write(f"-- 备份时间: {timestamp}\n\n")
            
            for table in tables:
                # 备份表结构
                cursor.execute(f"SHOW CREATE TABLE {table}")
                create_table_sql = cursor.fetchone()[1]
                f.write(f"{create_table_sql};\n\n")
                
                # 备份表数据
                cursor.execute(f"SELECT * FROM {table}")
                rows = cursor.fetchall()
                
                if rows:
                    f.write(f"-- 数据表: {table}\n")
                    for row in rows:
                        # 简化处理,实际应该更复杂
                        values = ", ".join([f"'{str(col)}'" for col in row])
                        f.write(f"INSERT INTO {table} VALUES ({values});\n")
                    f.write("\n")
        
        print(f"备份成功: {backup_file}")
        
    except Exception as e:
        print(f"备份失败: {e}")
    finally:
        if conn.is_connected():
            cursor.close()
            conn.close()

# 使用备份功能
backup_database("localhost", "root", "123456", "school_db")


性能优化建议

  1. 使用连接池:对于Web应用,使用连接池管理数据库连接

  2. 批量操作:尽量使用executemany进行批量插入和更新

  3. 索引优化:为经常查询的字段创建索引

  4. 适时关闭连接:用完数据库连接后及时关闭

  5. 使用预处理语句:防止SQL注入,提高性能


错误处理最佳实践

import mysql.connector
from mysql.connector import Error

def safe_database_operation():
    try:
        conn = mysql.connector.connect(
            host="localhost",
            user="root",
            password="123456",
            database="school_db"
        )
        
        if conn.is_connected():
            cursor = conn.cursor()
            
            # 执行数据库操作
            cursor.execute("SELECT * FROM students")
            results = cursor.fetchall()
            
            return results
            
    except Error as e:
        print(f"数据库错误: {e}")
        return None
        
    finally:
        # 确保连接被关闭
        if 'conn' in locals() and conn.is_connected():
            cursor.close()
            conn.close()

掌握Python操作MySQL数据库是Web开发和数据分析的重要技能。通过本文的介绍,你应该能够完成基本的数据库操作,为实际项目开发打下基础。

本文内容仅供个人学习/研究/参考使用,不构成任何决策建议或专业指导。分享/转载时请标明原文来源,同时请勿将内容用于商业售卖、虚假宣传等非学习用途哦~感谢您的理解与支持!

链接: https://fly63.com/course/36_2111

目录选择