MySQL迁移指定表的历史数据有多种方案,根据数据量、停机时间要求和系统复杂性等因素,可以选择不同的方法。以下是最常见的几种方案:
# 导出指定表的数据(可加时间条件)
mysqldump -h 源主机 -u 用户 -p密码 数据库名 表名 \
--where="create_time < '2024-01-01'" \
--no-create-info \
--skip-triggers \
--skip-lock-tables > data.sql
# 导入到目标数据库
mysql -h 目标主机 -u 用户 -p密码 目标数据库 < data.sql
-- 源库导出
SELECT * INTO OUTFILE '/tmp/data.csv'
FIELDS TERMINATED BY ','
OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '\n'
FROM 表名
WHERE create_time < '2024-01-01';
-- 目标库导入
LOAD DATA INFILE '/tmp/data.csv'
INTO TABLE 表名
FIELDS TERMINATED BY ','
OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '\n';
import mysql.connector
import pandas as pd
from datetime import datetime
# 配置连接
source_config = {
'host': 'source_host',
'user': 'user',
'password': 'password',
'database': 'db_name'
}
target_config = {
'host': 'target_host',
'user': 'user',
'password': 'password',
'database': 'db_name'
}
def migrate_historical_data(table_name, cutoff_date, batch_size=1000):
"""分批次迁移历史数据"""
source_conn = mysql.connector.connect(**source_config)
target_conn = mysql.connector.connect(**target_config)
source_cursor = source_conn.cursor(dictionary=True)
target_cursor = target_conn.cursor()
# 获取总记录数
count_query = f"""
SELECT COUNT(*) as total
FROM {table_name}
WHERE create_time < '{cutoff_date}'
"""
source_cursor.execute(count_query)
total = source_cursor.fetchone()['total']
print(f"需要迁移 {total} 条记录")
offset = 0
while offset < total:
# 分批查询
query = f"""
SELECT * FROM {table_name}
WHERE create_time < '{cutoff_date}'
ORDER BY id
LIMIT {batch_size} OFFSET {offset}
"""
source_cursor.execute(query)
rows = source_cursor.fetchall()
if not rows:
break
# 构建插入语句
columns = list(rows[0].keys())
placeholders = ', '.join(['%s'] * len(columns))
insert_query = f"""
INSERT INTO {table_name} ({', '.join(columns)})
VALUES ({placeholders})
ON DUPLICATE KEY UPDATE ... -- 根据需求添加
"""
# 批量插入
for row in rows:
values = [row[col] for col in columns]
target_cursor.execute(insert_query, values)
target_conn.commit()
offset += len(rows)
print(f"已迁移 {offset}/{total} 条记录")
source_cursor.close()
target_cursor.close()
source_conn.close()
target_conn.close()
# 使用
migrate_historical_data('orders', '2024-01-01')
# 迁移并删除源数据
pt-archiver \
--source h=源主机,D=数据库,t=表名,u=用户,p=密码 \
--dest h=目标主机,D=数据库,t=表名,u=用户,p=密码 \
--where "create_time < '2024-01-01'" \
--limit 1000 \
--commit-each \
--statistics
# 只迁移不删除
pt-archiver \
--source ... \
--dest ... \
--where ... \
--no-delete \
--limit 1000
# debezium配置示例
connector.class: io.debezium.connector.mysql.MySqlConnector
database.hostname: source_host
database.user: user
database.password: password
database.server.id: 184054
database.server.name: source_db
table.include.list: db_name.table_name
snapshot.mode: initial
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| mysqldump | 简单、内置工具 | 锁表、停机时间长 | 小数据量、可停机 |
| SELECT OUTFILE | 性能好、CSV格式 | 需要文件传输 | 大数据量迁移 |
| pt-archiver | 专业、可分批、可删除 | 需要安装工具 | 生产环境推荐 |
| Python脚本 | 灵活可控 | 开发成本高 | 复杂业务逻辑 |
| Debezium | 实时同步 | 架构复杂 | 需要持续同步 |
前期准备
-- 1. 确认数据结构一致
SHOW CREATE TABLE 表名;
-- 2. 检查数据量
SELECT COUNT(*) FROM 表名 WHERE create_time < '2024-01-01';
-- 3. 创建索引优化查询
CREATE INDEX idx_time ON 表名(create_time);
分阶段迁移
# 按时间分片迁移
date_ranges = [
('2020-01-01', '2020-12-31'),
('2021-01-01', '2021-12-31'),
('2022-01-01', '2022-12-31')
]
for start_date, end_date in date_ranges:
migrate_by_date_range(start_date, end_date)
验证数据一致性
-- 数据量对比
SELECT COUNT(*) FROM 源表 WHERE create_time < '2024-01-01';
SELECT COUNT(*) FROM 目标表;
-- 抽样验证
SELECT * FROM 源表 WHERE id IN (1, 100, 1000);
SELECT * FROM 目标表 WHERE id IN (1, 100, 1000);
监控指标
mysqldump 或 pt-archiverDebezium 或 CanalPython脚本 或 ETL工具pt-archiver + 分批次迁移根据您的具体场景选择合适的方案,建议先在测试环境验证迁移流程。