MySQL是广泛使用的关系型数据库,Python通过mysql-connector驱动可以方便地操作MySQL数据库。本文将详细介绍如何使用Python连接和操作MySQL。
首先需要安装MySQL官方提供的Python驱动:
pip install mysql-connector-python验证安装是否成功:
import mysql.connector
print("mysql-connector安装成功")如果没有报错,说明安装成功。
import mysql.connector
# 创建数据库连接
mydb = mysql.connector.connect(
host="localhost", # 数据库地址
user="root", # 用户名
password="123456", # 密码
database="test_db" # 数据库名
)
print("数据库连接成功")在实际应用中,应该处理连接可能出现的异常:
import mysql.connector
from mysql.connector import Error
try:
mydb = mysql.connector.connect(
host="localhost",
user="root",
password="123456",
database="test_db"
)
if mydb.is_connected():
print("成功连接到MySQL数据库")
except Error as e:
print(f"连接错误: {e}")
finally:
if mydb.is_connected():
mydb.close()
print("数据库连接已关闭")import mysql.connector
mydb = mysql.connector.connect(
host="localhost",
user="root",
password="123456"
)
mycursor = mydb.cursor()
# 创建数据库
mycursor.execute("CREATE DATABASE IF NOT EXISTS school_db")
print("数据库创建成功")mycursor.execute("SHOW DATABASES")
print("所有数据库:")
for db in mycursor:
print(f"- {db[0]}")import mysql.connector
mydb = mysql.connector.connect(
host="localhost",
user="root",
password="123456",
database="school_db"
)
mycursor = mydb.cursor()
# 创建学生表
mycursor.execute("""
CREATE TABLE IF NOT EXISTS students (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
age INT,
grade VARCHAR(10),
email VARCHAR(100)
)
""")
print("学生表创建成功")mycursor.execute("SHOW TABLES")
print("所有数据表:")
for table in mycursor:
print(f"- {table[0]}")# 插入一条学生记录
sql = "INSERT INTO students (name, age, grade, email) VALUES (%s, %s, %s, %s)"
val = ("张三", 18, "高三", "zhangsan@fly63.com")
mycursor.execute(sql, val)
mydb.commit() # 提交事务
print(f"插入成功,ID: {mycursor.lastrowid}")# 批量插入学生记录
sql = "INSERT INTO students (name, age, grade, email) VALUES (%s, %s, %s, %s)"
students = [
("李四", 17, "高二", "lisi@fly63.com"),
("王五", 19, "高三", "wangwu@fly63.com"),
("赵六", 16, "高一", "zhaoliu@fly63.com"),
("钱七", 18, "高三", "qianqi@fly63.com")
]
mycursor.executemany(sql, students)
mydb.commit()
print(f"批量插入成功,共{mycursor.rowcount}条记录")# 查询所有学生
mycursor.execute("SELECT * FROM students")
results = mycursor.fetchall()
print("所有学生信息:")
for row in results:
print(f"ID: {row[0]}, 姓名: {row[1]}, 年龄: {row[2]}, 年级: {row[3]}, 邮箱: {row[4]}")# 只查询姓名和年龄
mycursor.execute("SELECT name, age FROM students")
results = mycursor.fetchall()
print("学生姓名和年龄:")
for name, age in results:
print(f"{name} - {age}岁")# 查询高三学生
sql = "SELECT * FROM students WHERE grade = %s"
val = ("高三",)
mycursor.execute(sql, val)
results = mycursor.fetchall()
print("高三学生:")
for row in results:
print(f"姓名: {row[1]}, 年龄: {row[2]}")# 查询姓名包含"张"的学生
sql = "SELECT * FROM students WHERE name LIKE %s"
val = ("%张%",)
mycursor.execute(sql, val)
results = mycursor.fetchall()
print("姓张的学生:")
for row in results:
print(f"姓名: {row[1]}")# 按年龄升序排列
mycursor.execute("SELECT * FROM students ORDER BY age ASC")
results = mycursor.fetchall()
print("按年龄升序:")
for row in results:
print(f"姓名: {row[1]}, 年龄: {row[2]}")# 查询前3条记录
mycursor.execute("SELECT * FROM students LIMIT 3")
results = mycursor.fetchall()
print("前3名学生:")
for row in results:
print(f"姓名: {row[1]}")# 分页查询,每页2条记录
page = 1
page_size = 2
offset = (page - 1) * page_size
sql = "SELECT * FROM students LIMIT %s OFFSET %s"
val = (page_size, offset)
mycursor.execute(sql, val)
results = mycursor.fetchall()
print(f"第{page}页数据:")
for row in results:
print(f"姓名: {row[1]}")# 更新学生年龄
sql = "UPDATE students SET age = %s WHERE name = %s"
val = (19, "张三")
mycursor.execute(sql, val)
mydb.commit()
print(f"更新成功,影响{mycursor.rowcount}条记录")# 批量更新年级
updates = [
(20, "张三"),
(18, "李四")
]
sql = "UPDATE students SET age = %s WHERE name = %s"
mycursor.executemany(sql, updates)
mydb.commit()
print(f"批量更新成功,影响{mycursor.rowcount}条记录")# 删除指定学生
sql = "DELETE FROM students WHERE name = %s"
val = ("赵六",)
mycursor.execute(sql, val)
mydb.commit()
print(f"删除成功,影响{mycursor.rowcount}条记录")# 清空学生表(谨慎使用!)
mycursor.execute("DELETE FROM students")
mydb.commit()
print("所有学生记录已清空")try:
# 开始事务
mydb.start_transaction()
# 插入新学生
sql1 = "INSERT INTO students (name, age, grade) VALUES (%s, %s, %s)"
val1 = ("孙八", 17, "高二")
mycursor.execute(sql1, val1)
# 更新另一个学生
sql2 = "UPDATE students SET age = %s WHERE name = %s"
val2 = (20, "张三")
mycursor.execute(sql2, val2)
# 提交事务
mydb.commit()
print("事务执行成功")
except Exception as e:
# 回滚事务
mydb.rollback()
print(f"事务执行失败,已回滚: {e}")# 创建课程表
mycursor.execute("""
CREATE TABLE IF NOT EXISTS courses (
course_id INT AUTO_INCREMENT PRIMARY KEY,
course_name VARCHAR(100) NOT NULL,
teacher VARCHAR(100)
)
""")
# 创建选课表(带外键)
mycursor.execute("""
CREATE TABLE IF NOT EXISTS student_courses (
id INT AUTO_INCREMENT PRIMARY KEY,
student_id INT,
course_id INT,
score DECIMAL(4,1),
FOREIGN KEY (student_id) REFERENCES students(id),
FOREIGN KEY (course_id) REFERENCES courses(course_id)
)
""")
print("带外键的表创建成功")class StudentManager:
def __init__(self):
self.connection = mysql.connector.connect(
host="localhost",
user="root",
password="123456",
database="school_db"
)
self.cursor = self.connection.cursor()
def add_student(self, name, age, grade, email):
"""添加学生"""
sql = "INSERT INTO students (name, age, grade, email) VALUES (%s, %s, %s, %s)"
val = (name, age, grade, email)
self.cursor.execute(sql, val)
self.connection.commit()
return self.cursor.lastrowid
def get_students_by_grade(self, grade):
"""按年级查询学生"""
sql = "SELECT * FROM students WHERE grade = %s"
self.cursor.execute(sql, (grade,))
return self.cursor.fetchall()
def update_student_email(self, student_id, new_email):
"""更新学生邮箱"""
sql = "UPDATE students SET email = %s WHERE id = %s"
self.cursor.execute(sql, (new_email, student_id))
self.connection.commit()
return self.cursor.rowcount
def delete_student(self, student_id):
"""删除学生"""
sql = "DELETE FROM students WHERE id = %s"
self.cursor.execute(sql, (student_id,))
self.connection.commit()
return self.cursor.rowcount
def close(self):
"""关闭连接"""
self.cursor.close()
self.connection.close()
# 使用示例
manager = StudentManager()
# 添加学生
new_id = manager.add_student("周九", 16, "高一", "zhoujiu@fly63.com")
print(f"添加学生成功,ID: {new_id}")
# 查询高二学生
students = manager.get_students_by_grade("高二")
print("高二学生:")
for student in students:
print(f"姓名: {student[1]}")
manager.close()import mysql.connector
import datetime
def backup_database(host, user, password, database):
"""备份数据库到文件"""
try:
# 连接数据库
conn = mysql.connector.connect(
host=host,
user=user,
password=password,
database=database
)
cursor = conn.cursor()
# 获取所有表
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
# 生成备份文件名
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = f"backup_{database}_{timestamp}.sql"
with open(backup_file, 'w', encoding='utf-8') as f:
f.write(f"-- 数据库备份: {database}\n")
f.write(f"-- 备份时间: {timestamp}\n\n")
for table in tables:
# 备份表结构
cursor.execute(f"SHOW CREATE TABLE {table}")
create_table_sql = cursor.fetchone()[1]
f.write(f"{create_table_sql};\n\n")
# 备份表数据
cursor.execute(f"SELECT * FROM {table}")
rows = cursor.fetchall()
if rows:
f.write(f"-- 数据表: {table}\n")
for row in rows:
# 简化处理,实际应该更复杂
values = ", ".join([f"'{str(col)}'" for col in row])
f.write(f"INSERT INTO {table} VALUES ({values});\n")
f.write("\n")
print(f"备份成功: {backup_file}")
except Exception as e:
print(f"备份失败: {e}")
finally:
if conn.is_connected():
cursor.close()
conn.close()
# 使用备份功能
backup_database("localhost", "root", "123456", "school_db")使用连接池:对于Web应用,使用连接池管理数据库连接
批量操作:尽量使用executemany进行批量插入和更新
索引优化:为经常查询的字段创建索引
适时关闭连接:用完数据库连接后及时关闭
使用预处理语句:防止SQL注入,提高性能
import mysql.connector
from mysql.connector import Error
def safe_database_operation():
try:
conn = mysql.connector.connect(
host="localhost",
user="root",
password="123456",
database="school_db"
)
if conn.is_connected():
cursor = conn.cursor()
# 执行数据库操作
cursor.execute("SELECT * FROM students")
results = cursor.fetchall()
return results
except Error as e:
print(f"数据库错误: {e}")
return None
finally:
# 确保连接被关闭
if 'conn' in locals() and conn.is_connected():
cursor.close()
conn.close()掌握Python操作MySQL数据库是Web开发和数据分析的重要技能。通过本文的介绍,你应该能够完成基本的数据库操作,为实际项目开发打下基础。
本文内容仅供个人学习/研究/参考使用,不构成任何决策建议或专业指导。分享/转载时请标明原文来源,同时请勿将内容用于商业售卖、虚假宣传等非学习用途哦~感谢您的理解与支持!