欢迎光临百色市中国丧葬服务网
详情描述

MySQL迁移指定表的历史数据有多种方案,根据数据量、停机时间要求和系统复杂性等因素,可以选择不同的方法。以下是最常见的几种方案:

方案一:使用原生工具导出导入(简单直接)

1. mysqldump + mysql

# 导出指定表的数据(可加时间条件)
mysqldump -h 源主机 -u 用户 -p密码 数据库名 表名 \
  --where="create_time < '2024-01-01'" \
  --no-create-info \
  --skip-triggers \
  --skip-lock-tables > data.sql

# 导入到目标数据库
mysql -h 目标主机 -u 用户 -p密码 目标数据库 < data.sql

2. SELECT INTO OUTFILE + LOAD DATA

-- 源库导出
SELECT * INTO OUTFILE '/tmp/data.csv'
FIELDS TERMINATED BY ',' 
OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '\n'
FROM 表名 
WHERE create_time < '2024-01-01';

-- 目标库导入
LOAD DATA INFILE '/tmp/data.csv'
INTO TABLE 表名
FIELDS TERMINATED BY ',' 
OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '\n';

方案二:ETL工具(适合复杂场景)

1. 使用Apache NiFi

  • 图形化界面,支持数据转换
  • 可增量同步,监控数据流
  • 适合大数据量场景

2. 使用Talend/Kettle

  • 强大的ETL功能
  • 支持数据清洗和转换
  • 可视化作业设计

方案三:程序化迁移(灵活可控)

Python脚本示例

import mysql.connector
import pandas as pd
from datetime import datetime

# 配置连接
source_config = {
    'host': 'source_host',
    'user': 'user',
    'password': 'password',
    'database': 'db_name'
}

target_config = {
    'host': 'target_host',
    'user': 'user',
    'password': 'password',
    'database': 'db_name'
}

def migrate_historical_data(table_name, cutoff_date, batch_size=1000):
    """分批次迁移历史数据"""

    source_conn = mysql.connector.connect(**source_config)
    target_conn = mysql.connector.connect(**target_config)

    source_cursor = source_conn.cursor(dictionary=True)
    target_cursor = target_conn.cursor()

    # 获取总记录数
    count_query = f"""
        SELECT COUNT(*) as total 
        FROM {table_name} 
        WHERE create_time < '{cutoff_date}'
    """
    source_cursor.execute(count_query)
    total = source_cursor.fetchone()['total']

    print(f"需要迁移 {total} 条记录")

    offset = 0
    while offset < total:
        # 分批查询
        query = f"""
            SELECT * FROM {table_name} 
            WHERE create_time < '{cutoff_date}'
            ORDER BY id
            LIMIT {batch_size} OFFSET {offset}
        """

        source_cursor.execute(query)
        rows = source_cursor.fetchall()

        if not rows:
            break

        # 构建插入语句
        columns = list(rows[0].keys())
        placeholders = ', '.join(['%s'] * len(columns))
        insert_query = f"""
            INSERT INTO {table_name} ({', '.join(columns)})
            VALUES ({placeholders})
            ON DUPLICATE KEY UPDATE ...  -- 根据需求添加
        """

        # 批量插入
        for row in rows:
            values = [row[col] for col in columns]
            target_cursor.execute(insert_query, values)

        target_conn.commit()
        offset += len(rows)
        print(f"已迁移 {offset}/{total} 条记录")

    source_cursor.close()
    target_cursor.close()
    source_conn.close()
    target_conn.close()

# 使用
migrate_historical_data('orders', '2024-01-01')

方案四:数据库同步工具

1. 使用pt-archiver(推荐)

# 迁移并删除源数据
pt-archiver \
  --source h=源主机,D=数据库,t=表名,u=用户,p=密码 \
  --dest h=目标主机,D=数据库,t=表名,u=用户,p=密码 \
  --where "create_time < '2024-01-01'" \
  --limit 1000 \
  --commit-each \
  --statistics

# 只迁移不删除
pt-archiver \
  --source ... \
  --dest ... \
  --where ... \
  --no-delete \
  --limit 1000

2. 使用gh-ost(在线DDL工具衍生)

  • 适合大表迁移
  • 对线上业务影响小
  • 可暂停、可监控

方案五:CDC(Change Data Capture)方案

使用Debezium + Kafka

# debezium配置示例
connector.class: io.debezium.connector.mysql.MySqlConnector
database.hostname: source_host
database.user: user
database.password: password
database.server.id: 184054
database.server.name: source_db
table.include.list: db_name.table_name
snapshot.mode: initial

方案对比

方案 优点 缺点 适用场景
mysqldump 简单、内置工具 锁表、停机时间长 小数据量、可停机
SELECT OUTFILE 性能好、CSV格式 需要文件传输 大数据量迁移
pt-archiver 专业、可分批、可删除 需要安装工具 生产环境推荐
Python脚本 灵活可控 开发成本高 复杂业务逻辑
Debezium 实时同步 架构复杂 需要持续同步

迁移最佳实践

前期准备

-- 1. 确认数据结构一致
SHOW CREATE TABLE 表名;

-- 2. 检查数据量
SELECT COUNT(*) FROM 表名 WHERE create_time < '2024-01-01';

-- 3. 创建索引优化查询
CREATE INDEX idx_time ON 表名(create_time);

分阶段迁移

# 按时间分片迁移
date_ranges = [
    ('2020-01-01', '2020-12-31'),
    ('2021-01-01', '2021-12-31'),
    ('2022-01-01', '2022-12-31')
]

for start_date, end_date in date_ranges:
    migrate_by_date_range(start_date, end_date)

验证数据一致性

-- 数据量对比
SELECT COUNT(*) FROM 源表 WHERE create_time < '2024-01-01';
SELECT COUNT(*) FROM 目标表;

-- 抽样验证
SELECT * FROM 源表 WHERE id IN (1, 100, 1000);
SELECT * FROM 目标表 WHERE id IN (1, 100, 1000);

监控指标

  • 迁移速率(行/秒)
  • 网络带宽使用
  • 源库和目标库负载
  • 错误率

选择建议

  • 数据量 < 1000万行:使用 mysqldumppt-archiver
  • 需要持续同步:使用 DebeziumCanal
  • 需要数据转换:使用 Python脚本ETL工具
  • 生产环境推荐pt-archiver + 分批次迁移
  • 最小化停机时间:先迁移历史数据,再用CDC同步增量

根据您的具体场景选择合适的方案,建议先在测试环境验证迁移流程。