PyMySQL是Python中连接MySQL数据库的重要工具。通过PyMySQL,我们可以在Python程序中执行SQL语句,实现对数据库的增删改查操作。
PyMySQL是一个纯Python编写的MySQL客户端库,专门用于Python 3.x版本。如果你使用Python 2.x,则需要使用MySQLdb库。
PyMySQL完全遵循Python数据库api规范,使用起来简单直观。
使用pip命令安装PyMySQL:
pip install PyMySQL验证安装是否成功:
import pymysql
print("PyMySQL安装成功")如果没有报错,说明安装成功。
连接数据库需要提供数据库地址、用户名、密码和数据库名:
import pymysql
# 创建数据库连接
db = pymysql.connect(
host='localhost', # 数据库服务器地址
user='root', # 用户名
password='123456', # 密码
database='test_db', # 数据库名
charset='utf8mb4' # 字符编码
)
print("数据库连接成功")
# 创建游标对象
cursor = db.cursor()
# 关闭连接
db.close()import pymysql
def connect_database():
try:
# 连接数据库
db = pymysql.connect(
host='localhost',
user='root',
password='123456',
database='test_db',
charset='utf8mb4'
)
# 创建游标
cursor = db.cursor()
# 测试连接
cursor.execute("SELECT VERSION()")
version = cursor.fetchone()
print(f"MySQL版本: {version[0]}")
return db, cursor
except pymysql.Error as e:
print(f"数据库连接失败: {e}")
return None, None
# 使用示例
db, cursor = connect_database()
if db:
db.close()import pymysql
# 连接MySQL服务器(不指定数据库)
db = pymysql.connect(
host='localhost',
user='root',
password='123456'
)
cursor = db.cursor()
# 创建数据库
cursor.execute("CREATE DATABASE IF NOT EXISTS school_db")
print("数据库创建成功")
db.close()import pymysql
db = pymysql.connect(
host='localhost',
user='root',
password='123456',
database='school_db'
)
cursor = db.cursor()
# 创建学生表
create_table_sql = """
CREATE TABLE IF NOT EXISTS students (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(50) NOT NULL,
age INT,
grade VARCHAR(20),
email VARCHAR(100),
created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
cursor.execute(create_table_sql)
print("学生表创建成功")
db.close()import pymysql
db = pymysql.connect(
host='localhost',
user='root',
password='123456',
database='school_db'
)
cursor = db.cursor()
# 插入单条数据
sql = """
INSERT INTO students (name, age, grade, email)
VALUES (%s, %s, %s, %s)
"""
values = ('张三', 18, '高三', 'zhangsan@fly63.com')
try:
cursor.execute(sql, values)
db.commit() # 提交事务
print(f"插入成功,学生ID: {cursor.lastrowid}")
except Exception as e:
db.rollback() # 回滚事务
print(f"插入失败: {e}")
db.close()import pymysql
db = pymysql.connect(
host='localhost',
user='root',
password='123456',
database='school_db'
)
cursor = db.cursor()
# 批量插入数据
sql = """
INSERT INTO students (name, age, grade, email)
VALUES (%s, %s, %s, %s)
"""
students = [
('李四', 17, '高二', 'lisi@fly63.com'),
('王五', 16, '高一', 'wangwu@fly63.com'),
('赵六', 19, '高三', 'zhaoliu@fly63.com'),
('钱七', 18, '高三', 'qianqi@fly63.com')
]
try:
cursor.executemany(sql, students)
db.commit()
print(f"批量插入成功,共{cursor.rowcount}条记录")
except Exception as e:
db.rollback()
print(f"批量插入失败: {e}")
db.close()import pymysql
db = pymysql.connect(
host='localhost',
user='root',
password='123456',
database='school_db'
)
cursor = db.cursor()
# 查询所有学生
cursor.execute("SELECT * FROM students")
results = cursor.fetchall()
print("所有学生信息:")
print("ID\t姓名\t年龄\t年级\t邮箱")
print("-" * 50)
for row in results:
print(f"{row[0]}\t{row[1]}\t{row[2]}\t{row[3]}\t{row[4]}")
db.close()import pymysql
db = pymysql.connect(
host='localhost',
user='root',
password='123456',
database='school_db'
)
cursor = db.cursor()
# 查询高三学生
sql = "SELECT * FROM students WHERE grade = %s"
cursor.execute(sql, ('高三',))
results = cursor.fetchall()
print("高三学生:")
for student in results:
print(f"姓名: {student[1]}, 年龄: {student[2]}, 邮箱: {student[4]}")
# 查询年龄大于17的学生
sql = "SELECT * FROM students WHERE age > %s"
cursor.execute(sql, (17,))
results = cursor.fetchall()
print("\n年龄大于17的学生:")
for student in results:
print(f"姓名: {student[1]}, 年龄: {student[2]}")
db.close()import pymysql
db = pymysql.connect(
host='localhost',
user='root',
password='123456',
database='school_db'
)
cursor = db.cursor()
cursor.execute("SELECT * FROM students")
# 获取第一条记录
first_student = cursor.fetchone()
print("第一条记录:", first_student)
# 再获取两条记录
next_two_students = cursor.fetchmany(2)
print("接下来两条记录:", next_two_students)
# 获取剩余所有记录
remaining_students = cursor.fetchall()
print(f"剩余{len(remaining_students)}条记录")
db.close()import pymysql
db = pymysql.connect(
host='localhost',
user='root',
password='123456',
database='school_db'
)
cursor = db.cursor()
# 更新学生邮箱
sql = "UPDATE students SET email = %s WHERE name = %s"
values = ('new_email@fly63.com', '张三')
try:
cursor.execute(sql, values)
db.commit()
print(f"更新成功,影响{cursor.rowcount}条记录")
except Exception as e:
db.rollback()
print(f"更新失败: {e}")
db.close()import pymysql
db = pymysql.connect(
host='localhost',
user='root',
password='123456',
database='school_db'
)
cursor = db.cursor()
# 批量更新年龄
update_data = [
(19, '张三'),
(18, '李四'),
(17, '王五')
]
sql = "UPDATE students SET age = %s WHERE name = %s"
try:
cursor.executemany(sql, update_data)
db.commit()
print(f"批量更新成功,影响{cursor.rowcount}条记录")
except Exception as e:
db.rollback()
print(f"批量更新失败: {e}")
db.close()import pymysql
db = pymysql.connect(
host='localhost',
user='root',
password='123456',
database='school_db'
)
cursor = db.cursor()
# 删除指定学生
sql = "DELETE FROM students WHERE name = %s"
values = ('赵六',)
try:
cursor.execute(sql, values)
db.commit()
print(f"删除成功,影响{cursor.rowcount}条记录")
except Exception as e:
db.rollback()
print(f"删除失败: {e}")
db.close()import pymysql
db = pymysql.connect(
host='localhost',
user='root',
password='123456',
database='school_db'
)
cursor = db.cursor()
# 清空学生表(谨慎操作!)
try:
cursor.execute("DELETE FROM students")
db.commit()
print("所有学生记录已清空")
except Exception as e:
db.rollback()
print(f"清空失败: {e}")
db.close()事务确保数据库操作的原子性,要么全部成功,要么全部失败。
import pymysql
db = pymysql.connect(
host='localhost',
user='root',
password='123456',
database='school_db'
)
cursor = db.cursor()
try:
# 开始事务
db.begin()
# 插入新学生
sql1 = "INSERT INTO students (name, age, grade) VALUES (%s, %s, %s)"
cursor.execute(sql1, ('孙八', 17, '高二'))
# 更新现有学生
sql2 = "UPDATE students SET age = %s WHERE name = %s"
cursor.execute(sql2, (20, '张三'))
# 提交事务
db.commit()
print("事务执行成功")
except Exception as e:
# 回滚事务
db.rollback()
print(f"事务执行失败,已回滚: {e}")
db.close()使用with语句可以自动管理连接,确保资源被正确释放。
import pymysql
def get_students_by_grade(grade):
"""根据年级查询学生"""
with pymysql.connect(
host='localhost',
user='root',
password='123456',
database='school_db'
) as db:
with db.cursor() as cursor:
sql = "SELECT * FROM students WHERE grade = %s"
cursor.execute(sql, (grade,))
return cursor.fetchall()
# 使用示例
students = get_students_by_grade('高三')
for student in students:
print(f"姓名: {student[1]}, 年龄: {student[2]}")import pymysql
class StudentManager:
def __init__(self, host, user, password, database):
self.db_config = {
'host': host,
'user': user,
'password': password,
'database': database,
'charset': 'utf8mb4'
}
def add_student(self, name, age, grade, email):
"""添加学生"""
with pymysql.connect(**self.db_config) as db:
with db.cursor() as cursor:
sql = """
INSERT INTO students (name, age, grade, email)
VALUES (%s, %s, %s, %s)
"""
cursor.execute(sql, (name, age, grade, email))
db.commit()
return cursor.lastrowid
def get_all_students(self):
"""获取所有学生"""
with pymysql.connect(**self.db_config) as db:
with db.cursor() as cursor:
cursor.execute("SELECT * FROM students")
return cursor.fetchall()
def search_students(self, name=None, grade=None, min_age=None):
"""搜索学生"""
with pymysql.connect(**self.db_config) as db:
with db.cursor() as cursor:
sql = "SELECT * FROM students WHERE 1=1"
params = []
if name:
sql += " AND name LIKE %s"
params.append(f"%{name}%")
if grade:
sql += " AND grade = %s"
params.append(grade)
if min_age:
sql += " AND age >= %s"
params.append(min_age)
cursor.execute(sql, params)
return cursor.fetchall()
def update_student(self, student_id, **kwargs):
"""更新学生信息"""
if not kwargs:
return False
with pymysql.connect(**self.db_config) as db:
with db.cursor() as cursor:
set_clause = ", ".join([f"{key} = %s" for key in kwargs.keys()])
sql = f"UPDATE students SET {set_clause} WHERE id = %s"
params = list(kwargs.values())
params.append(student_id)
cursor.execute(sql, params)
db.commit()
return cursor.rowcount > 0
# 使用示例
manager = StudentManager('localhost', 'root', '123456', 'school_db')
# 添加学生
new_id = manager.add_student("周九", 16, "高一", "zhoujiu@fly63.com")
print(f"添加学生成功,ID: {new_id}")
# 搜索学生
students = manager.search_students(grade="高三", min_age=18)
print("高三且年龄≥18的学生:")
for student in students:
print(f"姓名: {student[1]}, 年龄: {student[2]}")import pymysql
import datetime
def backup_database(host, user, password, database, backup_file=None):
"""备份数据库到SQL文件"""
if not backup_file:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = f"backup_{database}_{timestamp}.sql"
try:
with pymysql.connect(
host=host,
user=user,
password=password,
database=database
) as db:
with db.cursor() as cursor:
# 获取所有表
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
with open(backup_file, 'w', encoding='utf-8') as f:
f.write(f"-- 数据库备份: {database}\n")
f.write(f"-- 备份时间: {datetime.datetime.now()}\n\n")
for table in tables:
# 备份表结构
cursor.execute(f"SHOW CREATE TABLE {table}")
create_sql = cursor.fetchone()[1]
f.write(f"{create_sql};\n\n")
# 备份表数据
cursor.execute(f"SELECT * FROM {table}")
rows = cursor.fetchall()
if rows:
f.write(f"-- 数据表: {table}\n")
for row in rows:
values = ", ".join([f"'{str(value)}'" for value in row])
f.write(f"INSERT INTO {table} VALUES ({values});\n")
f.write("\n")
print(f"备份成功: {backup_file}")
return True
except Exception as e:
print(f"备份失败: {e}")
return False
# 使用备份功能
backup_database('localhost', 'root', '123456', 'school_db')import pymysql
from pymysql import Error
def safe_database_operation():
try:
with pymysql.connect(
host='localhost',
user='root',
password='123456',
database='school_db'
) as db:
with db.cursor() as cursor:
# 执行数据库操作
cursor.execute("SELECT * FROM students")
results = cursor.fetchall()
return results
except Error as e:
print(f"数据库错误: {e}")
return None
except Exception as e:
print(f"其他错误: {e}")
return None
# 使用示例
students = safe_database_operation()
if students:
print(f"查询到{len(students)}名学生")掌握PyMySQL的使用,能够让你在Python程序中轻松操作MySQL数据库。无论是Web开发还是数据分析,这都是非常重要的技能。
本文内容仅供个人学习/研究/参考使用,不构成任何决策建议或专业指导。分享/转载时请标明原文来源,同时请勿将内容用于商业售卖、虚假宣传等非学习用途哦~感谢您的理解与支持!