目录
- 一、核心同步策略设计
- 二、常用ETL工具实现方案
- 方案1:使用开源工具Apache NiFi(适合技术团队自主部署)
- 方案2:使用商用工具DataPipeline(适合企业级自动化同步)
- 方案3:轻量方案(Python+SQL脚本,适合简单场景)
- 三、关键注意事项
- 四、工具选择建议
使用ETL工具实现老系统MySQL到新系统MySQL的数据同步,是一种高效且可靠的方案。以下是具体实现步骤和常用工具的操作指南,适用于全量迁移+增量同步的典型场景:
一、核心同步策略设计
-
同步模式:
- 全量迁移:首次将老库历史数据一次性迁移到新库
- 增量同步:实时/定时同步后续新增或变更的数据(基于
update_time
字段或binlog日志)
-
数据处理流程:
抽取(Extract)→ 转换(Transform)→ 加载(Load)- 抽取:从老MySQL读取数据
- 转换:字段映射、格式调整、清洗(去重/补空值等)
- 加载:写入新MySQL(支持插入/更新/删除同步)
二、常用ETL工具实现方案
方案1:使用开源工具Apache NiFi(适合技术团队自主部署)
优势:可视化流程设计、支持实时同步、内置MySQL连接器
步骤:
- 部署NiFi:在私有化服务器安装NiFi(支持Docker部署)
- 配置数据源:
- 拖拽
ExecuteSQL
处理器,配置老MySQL连接(JDBC URL、账号密码),编写全量查询SQL(如SELECT * FROM old_user
) - 拖拽
PutDatabaseRecord
处理器,配置新MySQL连接
- 拖拽
- 设计数据流转:
- 用
ConvertJSONToSQL
处理器转换数据格式(适配新库表结构) - 用
UpdateAttribute
处理器添加字段映射规则(如老库user_name
→新库username
)
- 用
- 增量同步配置:
- 新增
QueryDatabaseTable
处理器,设置增量字段(如update_time > ?
),定期轮询变更数据 - 连接到
PutDatabaseRecord
实现增量写入
- 新增
示例流程:
QueryDatabaseTable
(抽取增量)→ ConvertAvroToJSON
(转换)→ PutDatabaseRecord
(加载到新库)
方案2:使用商用工具DataPipeline(适合企业级自动化同步)
优势:零代码配置、自动字段映射、支持数据校验和监控
步骤:
- 添加数据源:
- 在控制台分别添加老MySQL和新MySQL的连接信息(主机、端口、库名、账号),测试连接通过
- 创建同步任务:
- 选择“全量+增量”模式,指定源表(老库
user
)和目标表(新库user_info
) - 自动匹配字段(或手动调整映射关系,如老库
reg_time
→新库register_time
,并设置类型转换varchar→datetime
)
- 选择“全量+增量”模式,指定源表(老库
- 配置转换规则:
- 在“转换”环节添加清洗规则(如过滤
status=0
的无效数据、用COALESCE(age, 0)
填充空值)
- 在“转换”环节添加清洗规则(如过滤
- 启动并监控:
- 执行全量迁移,完成后自动切换到增量同步(基于binlog实时捕获变更)
- 在控制台查看同步进度、成功率,设置异常告警(邮件/短信通知)
方案3:轻量方案(Python+SQL脚本,适合简单场景)
优势:开发灵活、无需部署复杂工具
步骤:
-
全量迁移脚本:
import pymysql# 连接老库和新库 old_conn = pymysql.connect(host='old_mysql', user='root', password='pwd', db='old_db') new_conn = pymysql.connect(host='new_mysql', user='root', password='pwd', db='new_db')# 全量读取老库数据 with old_conn.cursor() as old_cursor:old_cursor.execute("SELECT id, name, create_time FROM old_user")data = old_cursor.fetchall()# 写入新库(字段映射:create_time→ctime) with new_conn.cursor() as new_cursor:new_cursor.executemany("INSERT INTO new_user (id, username, ctime) VALUES (%s, %s, %s)",[(row[0], row[1], row[2]) for row in data])new_conn.commit()
-
增量同步脚本(配合Crontab定时执行):
# 读取上次同步时间(从文件或数据库记录) last_sync_time = "2024-09-01 00:00:00"# 增量抽取(只同步更新时间晚于上次的记录) with old_conn.cursor() as old_cursor:old_cursor.execute("SELECT id, name, create_time FROM old_user WHERE update_time > %s",(last_sync_time,))incremental_data = old_cursor.fetchall()# 写入新库(存在则更新,不存在则插入) with new_conn.cursor() as new_cursor:for row in incremental_data:new_cursor.execute("""INSERT INTO new_user (id, username, ctime) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE username=%s, ctime=%s""",(row[0], row[1], row[2], row[1], row[2]))new_conn.commit()
三、关键注意事项
-
数据一致性保障:
- 全量迁移时锁表(避免迁移中数据变更),或迁移后执行校验(如对比新旧库表的记录数、关键字段哈希值)
- 增量同步使用
ON DUPLICATE KEY UPDATE
确保幂等性(避免重复写入)
-
性能优化:
- 大表迁移分批次读取(如
LIMIT 1000 OFFSET ?
),避免内存溢出 - 新库添加临时索引,加速写入和后续查询
- 大表迁移分批次读取(如
-
异常处理:
- 脚本或工具需支持失败重试(如网络中断后继续同步)
- 记录同步日志(成功/失败的记录ID、时间),便于排查问题
四、工具选择建议
- 中小团队/简单场景:优先用Python脚本+Crond(开发快、无额外部署成本)
- 复杂业务转换/实时同步:选择Apache NiFi(开源免费,灵活扩展)
- 企业级需求/少运维:选择DataPipeline(商用工具,提供技术支持和监控告警)
通过上述方案,可实现老MySQL到新MySQL的自动化、可靠同步,满足系统迁移中的数据一致性要求。