一、问题背景
mysql分表之后,只想保留某张表(例如订单表)最近半年内的数据,然后历史数据放到ES中进行归档,所以要定期处理。像这种的话,要怎么设计呢
二、问题解析
把「近 6 个月留在 MySQL,历史数据归档到 ES 并定期清理」做成可运维、可回滚、可校验的流程需要兼顾性能(对在线库影响最小)、一致性(不丢数据/可校验)、查询体验(历史走 ES)与合规(备份/脱敏)。下面给出一套实战可落地的方案(含多种实现路径、SQL/脚本示例、注意事项与验证策略),可以根据当前 MySQL 版本/表结构/外键/运维能力选用。
方案总览(两条主线)
-
最佳实践(强烈推荐):使用 MySQL 分区 + EXCHANGE PARTITION 快速切分 + 批量导出到 ES + 校验 + 删除/丢弃分区表
优点:对主表影响最小、删除快速(即刻回收)、适合大表、工业级可运维。
条件/限制:表不能有外键约束(MySQL 分区/EXCHANGE 有限制),需要支持 partition 的 MySQL 版本(常见 5.6+、8.0 均支持)。 -
通用方案(没有分区或不能使用 EXCHANGE 的情况):分批批量导出(pt-archiver / 自研批次脚本)→ 写入 ES → 验证 → 批量删除(小批次)
优点:兼容性高(任何 MySQL),可在线化,风险可控。
缺点:删除可能比较慢(大量 DELETE 会产生 undo/iblogs/主从延迟),需 throttling 与删除策略。
下文先给分区方案的详细流程与代码,然后给非分区的批处理方案与工具选择,并列出校验、回滚、后续更新/同步(退款/补数据)处理等关键操作与注意点。
一、强烈推荐:分区 + EXCHANGE_PARTITION 流程(最佳稳定性与性能)
1. 设计前提
-
表:
orders
(主键id
或order_id
),建立在 InnoDB 上 -
有按时间字段(例如
created_at
或order_date
)可以按月分区 -
无外键(若有 FK,需先评估或去掉 FK,生产级电商通常不开 FK)
-
MySQL 支持
EXCHANGE PARTITION
(多数 5.6+ 都支持)
2. 分区建表示例(按月)
ALTER TABLE orders PARTITION BY RANGE (TO_DAYS(created_at)) (PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),...PARTITION pmax VALUES LESS THAN MAXVALUE );
也可以用
RANGE COLUMNS(created_at)
或PARTITION BY RANGE (UNIX_TIMESTAMP(created_at))
,按你偏好的表达式。
运维说明:需要每月
ALTER TABLE ... REORGANIZE PARTITION
或提前创建下月 partition,构建分区管理脚本。
3. 每月/每日归档作业(示例:按月归档早于 cutoff 的分区)
总体步骤(事务/停顿最小):
-
确定要归档的 partition(例如
p202401
),该 partition 的数据全部是某个月的订单;只选完全早于now() - 6 months
的 partition。 -
创建一个空表
orders_p202401_tmp
与orders
结构完全相同(包含索引),此表必须为空: -
交换分区(快速、无行拷贝):
说明:此命令把
p202401
里的数据置入orders_p202401_tmp
,而把orders_p202401_tmp
(空)放到orders
的 partition 位置。操作非常快(元数据交换)。
注意:orders_p202401_tmp
不能有外键,且表结构需严格一致。 -
导出/迁移数据到 ES:对
orders_p202401_tmp
执行批量导出到 ES(见下面 ES 导出方式)。因为这时主表orders
已经无该 partition 的数据,主库继续对其他数据提供服务,无锁等待。 -
校验:对比
COUNT(*)
、必要字段 checksum(例如MD5(CONCAT(...))
)或 sample 校验。记录在archive_batches
审计表。 -
确认无误 → 删除
orders_p202401_tmp
表来释放空间:(或保留一阵子并备份为 MySQL dump / S3 存档作为冷备份)
-
ES 上执行 index lifecycle(ILM)与冷/热分层策略(见下文)。
4. EXAMPLE:归档脚本伪代码(bash)
PARTITION=p202401 TMP_TABLE=orders_${PARTITION}_tmpmysql -e "CREATE TABLE ${TMP_TABLE} LIKE orders;" mysql -e "ALTER TABLE orders EXCHANGE PARTITION (${PARTITION}) WITH TABLE ${TMP_TABLE};"# 导出到 ES(调用 python 脚本或 logstash) python export_to_es.py --table ${TMP_TABLE} --index orders-${PARTITION}# 等待并验证(count/checksum) mysql -e "SELECT COUNT(*) FROM ${TMP_TABLE};" > cnt_tmp.txt # use ES query to count documents for index orders-${PARTITION} # 比对,如果一致: mysql -e "DROP TABLE ${TMP_TABLE};" # 记录批次到 archive_batches
5. 优点总结(为什么强烈推荐)
-
EXCHANGE PARTITION
几乎是 O(1) 的元数据操作,避免了在主表上做大量 DELETE / UPDATE 导致 undo/iblog/复制延迟。 -
主表对在线业务影响极小(短时间元数据操作),适合大流量电商场景。
-
支持对外工具的高效并发导出(因为导出是在独立表上做)。
6. 限制与注意事项
-
外键:如果该表有外键,EXCHANGE PARTITION 可能会失败。解决:先移除外键、或使用非 FK 的数据模型(推荐)。
-
唯一索引/自增:确保 tmp 表与原表结构一致(AUTO_INCREMENT 不影响 EXCHANGE)。
-
并发:在交换分区时会需要短锁(metadata lock),避免在大量 DDL 操作窗口并发执行。
-
变更:若归档过程中发现需回滚,确保保留 tmp 表直至校验通过。
二、通用方案(没有分区或不能 EXCHANGE 的情况)
如果不能使用分区/EXCHANGE(例如外键存在、MySQL 版本限制、无法停机更改表结构),用分批导出 + 校验 + 删除。关键点是小批量、有幂等、可重试和限速。
1. 推荐工具
-
Percona pt-archiver:非常成熟的在线归档工具,支持分批复制到目标表并删除原行,支持 --commit-each、--limit、--sleep、--txn-size 等参数控制速率。
-
自研脚本(Python/Go/Java):使用
ORDER BY id LIMIT N
分页导出并批量写入 ES(bulk API),并用DELETE ... WHERE id IN (...)
或DELETE LIMIT
分批删除。 -
Debezium / binlog CDC:如果你已经有 CDC + Kafka,可以在把历史数据推到 ES 之后,继续通过 CDC 同步后续变更(见后述)。
2. 批量导出示例(批量导出 MySQL → ES)
伪代码(更完整在下):
-
分页(主键 id 增量或按 created_at)
-
每批大小 e.g. 1k ~ 10k(按 MySQL 压力调整)
-
用 ES helpers.bulk 写入
-
写完一批后记录
archive_batches
表并删除 MySQL 的那批(或标记为已归档) -
使用重试、backoff、日志
Java 导出/导入示例(简化)
import java.sql.*; import java.util.*; import java.io.IOException;import org.apache.http.HttpHost; import org.elasticsearch.action.bulk.*; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.*; import org.elasticsearch.client.indices.*; import org.elasticsearch.common.xcontent.XContentType;public class MysqlToEsArchiver {private static final String MYSQL_URL = "jdbc:mysql://127.0.0.1:3306/mydb?useSSL=false&serverTimezone=UTC";private static final String MYSQL_USER = "root";private static final String MYSQL_PASS = "password";private static final String ES_HOST = "localhost";private static final int ES_PORT = 9200;private static final String ES_SCHEME = "http";private static final String ES_INDEX = "orders-archive";private static final int BATCH_SIZE = 1000;private static final String CUTOFF = "2024-03-01"; // 半年前日期public static void main(String[] args) throws Exception {try (Connection conn = DriverManager.getConnection(MYSQL_URL, MYSQL_USER, MYSQL_PASS);RestHighLevelClient esClient = new RestHighLevelClient(RestClient.builder(new HttpHost(ES_HOST, ES_PORT, ES_SCHEME)))) {long lastId = 0L;while (true) {List<Map<String, Object>> rows = fetchBatch(conn, lastId);if (rows.isEmpty()) break;bulkInsertToEs(esClient, rows);// 更新归档标记 markArchived(conn, rows);lastId = (long) rows.get(rows.size() - 1).get("id");}}}private static List<Map<String, Object>> fetchBatch(Connection conn, long lastId) throws SQLException {String sql = "SELECT id, user_id, created_at, status, total_amount " +"FROM orders WHERE created_at < ? AND id > ? ORDER BY id LIMIT ?";try (PreparedStatement ps = conn.prepareStatement(sql)) {ps.setString(1, CUTOFF);ps.setLong(2, lastId);ps.setInt(3, BATCH_SIZE);try (ResultSet rs = ps.executeQuery()) {List<Map<String, Object>> list = new ArrayList<>();while (rs.next()) {Map<String, Object> row = new HashMap<>();row.put("id", rs.getLong("id"));row.put("user_id", rs.getLong("user_id"));row.put("created_at", rs.getTimestamp("created_at"));row.put("status", rs.getString("status"));row.put("total_amount", rs.getBigDecimal("total_amount"));list.add(row);}return list;}}}private static void bulkInsertToEs(RestHighLevelClient client, List<Map<String, Object>> rows) throws IOException {BulkRequest bulkRequest = new BulkRequest();for (Map<String, Object> row : rows) {IndexRequest req = new IndexRequest(ES_INDEX).id(row.get("id").toString()) // 用订单ID做文档ID,保证幂等 .source(row, XContentType.JSON);bulkRequest.add(req);}BulkResponse resp = client.bulk(bulkRequest, RequestOptions.DEFAULT);if (resp.hasFailures()) {System.err.println("Bulk insert failures: " + resp.buildFailureMessage());// 可以加上重试逻辑 }}private static void markArchived(Connection conn, List<Map<String, Object>> rows) throws SQLException {String sql = "UPDATE orders SET archived = 1 WHERE id = ?";try (PreparedStatement ps = conn.prepareStatement(sql)) {for (Map<String, Object> row : rows) {ps.setLong(1, (long) row.get("id"));ps.addBatch();}ps.executeBatch();}} }
说明
-
依赖
Maven 需要加:
<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.17.15</version> <!-- 版本需与你 ES 对应 --> </dependency> <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.34</version> </dependency>
-
如果用 ES 8.x,可以换
elasticsearch-java
官方新客户端。 -
批量导出
每次从 MySQL 取 BATCH_SIZE 行(示例用 1000),按 ID 顺序递增分页,直到取不到数据。 -
写入 ES
使用 Bulk API,_id
设置为订单 ID,保证幂等性。 -
更新归档标记
用一个archived
字段标记已归档,避免重复迁移。也可以改成DELETE
,不过推荐先标记、再统一删除。 -
异常与重试
实际中要加 失败重试、断点续传(记录 lastId),以及 监控日志。
3. 使用 pt-archiver(Percona Toolkit)
示例把旧数据搬到另一库或表:
pt-archiver --source h=127.0.0.1,D=mydb,t=orders --where 'created_at < "2024-03-01"' \--dest h=127.0.0.1,D=archive_db,t=orders_archive --limit 1000 --commit-each --sleep 0.1
pt-archiver 会把数据安全搬走,支持断点与 resume;搬完后再 run DELETE
或直接在源表上执行批量删除(pt-archiver 也有 --delete 选项,但这会在源表上 delete 并影响 undo)。
三、ES 建模与索引管理(历史数据放 ES 的推荐做法)
把订单放 ES 不是简单“直接复制列”,通常建议把订单及其子表(items/payments/discounts)做成一个 denormalized 文档,便于搜索与聚合。
1. 推荐 Index 策略
-
按月索引:
orders-YYYY.MM
(便于 ILM 管理、分区式索引) -
字段 mapping 示例(简化):
CREATE TABLE archive_batches (batch_id BIGINT PRIMARY KEY AUTO_INCREMENT,partition_name VARCHAR(64),cutoff_date DATE,rows_exported BIGINT,es_index VARCHAR(128),es_count BIGINT,checksum VARCHAR(64),status ENUM('EXPORTING','VERIFIED','DELETED','FAILED') DEFAULT 'EXPORTING',created_at DATETIME DEFAULT CURRENT_TIMESTAMP );
每批次写入并更新状态,便于审计和人为回溯。
2. 校验策略
-
导出完成后:
SELECT COUNT(*) FROM tmp_table
与GET /orders-*-/_count
对比 -
对比 MD5 列(例如对每行做 md5(concat(fields)) 聚合,比较聚合值)
-
随机抽样多行字段对比
3. 备份与回滚
-
在 DROP TABLE 之前,把 tmp 表做 mysqldump 导出或导出 CSV 保存到 S3(保留 30 天),以便回滚。
-
若发现 ES 写入错误,恢复方法:从 S3 或 tmp 表重跑 bulk 写入;或从 audit log 做 selective reindex。
六、性能 / 运维细节(保证主库稳定)
-
批量大小:ES Bulk 建议 1k~5k(单请求大小 < 10–20MB),MySQL select batch 建议 1k~10k,根据 IO 与 RAM 调整。
-
并发 worker:并行度控制,典型 4~8 个并行线程写 ES(不要压垮 ES)
-
速率限制:根据主库负载自动调节(当主库 CPU/IO > threshold 时减速)
-
事务边界:避免长事务(会增加 undo),使用小批量并发快提交
-
监控指标:每批 rows exported, ES success count, export latency, MySQL load, replication lag, errors; 报警门槛
七、示例:完整实践脚本(分区 + EXCHANGE)总结步骤
-
计算 cutoff month (6 months ago) → 需要归档的 partition 列表
-
对每个 partition:
-
CREATE tmp table LIKE orders
-
ALTER TABLE orders EXCHANGE PARTITION (pYYYYMM) WITH TABLE tmp_table
-
launch exporter job (python/Logstash) to write tmp_table -> ES index orders-YYYY.MM
-
verify counts/checksum
-
BACKUP tmp_table (mysqldump -> S3) (可选,短期保留)
-
DROP TABLE tmp_table
-
insert archive_batches record with status=VERIFIED/DELETED
-
-
ES ILM 管理旧索引(move to warm/cold or delete per retention)
八、常见问题与 FAQ(快速回答)
-
Q:归档会丢失后续退款等变更怎么办?
A:使用 CDC(Debezium / Maxwell / Canal)或事件化链路把后续变更写入 ES;保持 archive consumer 对 archived orders 做 update。 -
Q:为什么不用直接在 MySQL 中 delete?
A:DELETE 大量行会产生大量 undo/redo、binlog 放大、主从复制延迟,并占用空间直到 purge;分区/EXCHANGE 或拆批删除能极大降低影响。 -
Q:ES 会成为新的“写源”吗?
A:ES 仅作为归档/查询/分析用。权威数据仍留在 MySQL(或冷备)。对账/结算仍需基于 DB 备份或独立账务库。 -
Q:如何处理敏感字段(PII)?
A:归档前先脱敏/加密/mask(例如只保留 last4 digits,或使用哈希),并记录解密/访问权限流程,符合 GDPR/法规。
九、附:实用 SQL / 工具片段汇总
建分区(示例)
CREATE TABLE orders_p202401_tmp LIKE orders; ALTER TABLE orders EXCHANGE PARTITION (p202401) WITH TABLE orders_p202401_tmp; -- now orders_p202401_tmp holds data for that month
DROP tmp table(释放空间)
DROP TABLE orders_p202401_tmp;
pt-archiver(替代)
pt-archiver --source h=127.0.0.1,D=mydb,t=orders --where 'created_at < "2024-03-01"' \--dest h=127.0.0.1,D=archive_db,t=orders_archive --limit 1000 --commit-each --sleep 0.1
ES mapping 示例(简化)
PUT /orders-2024.01 {"mappings": {"properties": {"order_id": {"type":"keyword"},"user_id": {"type":"keyword"},"created_at": {"type":"date"},"status": {"type":"keyword"},"total_amount": {"type":"double"},"items": {"type":"nested", "properties": { "sku_id":{"type":"keyword"}, "qty":{"type":"integer"} } }}} }
十、结论(一句话)
如果你能预先在表上做按时间分区并且没有外键,优先使用 EXCHANGE PARTITION
+ 批量导出到 ES + 校验 + DROP tmp 表 的流程。
如果不能分区,则使用 pt-archiver
或自研分批导出(小批量、限速、audit、备份),并通过 CDC/事件保证归档后仍能同步后续变更。