当前位置: 首页 > news >正文

mysql的单表如何仅保留半年的数据

一、问题背景

mysql分表之后,只想保留某张表(例如订单表)最近半年内的数据,然后历史数据放到ES中进行归档,所以要定期处理。像这种的话,要怎么设计呢

二、问题解析

把「近 6 个月留在 MySQL,历史数据归档到 ES 并定期清理」做成可运维、可回滚、可校验的流程需要兼顾性能(对在线库影响最小)一致性(不丢数据/可校验)查询体验(历史走 ES)合规(备份/脱敏)。下面给出一套实战可落地的方案(含多种实现路径、SQL/脚本示例、注意事项与验证策略),可以根据当前 MySQL 版本/表结构/外键/运维能力选用。


方案总览(两条主线)

  1. 最佳实践(强烈推荐):使用 MySQL 分区 + EXCHANGE PARTITION 快速切分 + 批量导出到 ES + 校验 + 删除/丢弃分区表
    优点:对主表影响最小、删除快速(即刻回收)、适合大表、工业级可运维。
    条件/限制:表不能有外键约束(MySQL 分区/EXCHANGE 有限制),需要支持 partition 的 MySQL 版本(常见 5.6+、8.0 均支持)。

  2. 通用方案(没有分区或不能使用 EXCHANGE 的情况):分批批量导出(pt-archiver / 自研批次脚本)→ 写入 ES → 验证 → 批量删除(小批次)
    优点:兼容性高(任何 MySQL),可在线化,风险可控。
    缺点:删除可能比较慢(大量 DELETE 会产生 undo/iblogs/主从延迟),需 throttling 与删除策略。

下文先给分区方案的详细流程与代码,然后给非分区的批处理方案与工具选择,并列出校验、回滚、后续更新/同步(退款/补数据)处理等关键操作与注意点。


一、强烈推荐:分区 + EXCHANGE_PARTITION 流程(最佳稳定性与性能)

1. 设计前提

  • 表:orders(主键 idorder_id),建立在 InnoDB 上

  • 有按时间字段(例如 created_atorder_date)可以按月分区

  • 无外键(若有 FK,需先评估或去掉 FK,生产级电商通常不开 FK)

  • MySQL 支持 EXCHANGE PARTITION(多数 5.6+ 都支持)

2. 分区建表示例(按月)

ALTER TABLE orders
PARTITION BY RANGE (TO_DAYS(created_at)) (PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),...PARTITION pmax VALUES LESS THAN MAXVALUE
);

也可以用 RANGE COLUMNS(created_at)PARTITION BY RANGE (UNIX_TIMESTAMP(created_at)),按你偏好的表达式。

运维说明:需要每月 ALTER TABLE ... REORGANIZE PARTITION 或提前创建下月 partition,构建分区管理脚本。

3. 每月/每日归档作业(示例:按月归档早于 cutoff 的分区)

总体步骤(事务/停顿最小):

  1. 确定要归档的 partition(例如 p202401),该 partition 的数据全部是某个月的订单;只选完全早于 now() - 6 months 的 partition。

  2. 创建一个空表 orders_p202401_tmporders 结构完全相同(包含索引),此表必须为空: 

    CREATE TABLE orders_p202401_tmp LIKE orders;
  3. 交换分区(快速、无行拷贝):

    ALTER TABLE orders EXCHANGE PARTITION (p202401) WITH TABLE orders_p202401_tmp;

    说明:此命令把 p202401 里的数据置入 orders_p202401_tmp,而把 orders_p202401_tmp(空)放到 orders 的 partition 位置。操作非常快(元数据交换)。
    注意:orders_p202401_tmp 不能有外键,且表结构需严格一致。

  4. 导出/迁移数据到 ES:对 orders_p202401_tmp 执行批量导出到 ES(见下面 ES 导出方式)。因为这时主表 orders 已经无该 partition 的数据,主库继续对其他数据提供服务,无锁等待。

  5. 校验:对比 COUNT(*)、必要字段 checksum(例如 MD5(CONCAT(...)))或 sample 校验。记录在 archive_batches 审计表。

  6. 确认无误 → 删除 orders_p202401_tmp 表来释放空间:

    DROP TABLE orders_p202401_tmp;

    (或保留一阵子并备份为 MySQL dump / S3 存档作为冷备份)

  7. ES 上执行 index lifecycle(ILM)与冷/热分层策略(见下文)。

4. EXAMPLE:归档脚本伪代码(bash)

PARTITION=p202401
TMP_TABLE=orders_${PARTITION}_tmpmysql -e "CREATE TABLE ${TMP_TABLE} LIKE orders;"
mysql -e "ALTER TABLE orders EXCHANGE PARTITION (${PARTITION}) WITH TABLE ${TMP_TABLE};"# 导出到 ES(调用 python 脚本或 logstash)
python export_to_es.py --table ${TMP_TABLE} --index orders-${PARTITION}# 等待并验证(count/checksum)
mysql -e "SELECT COUNT(*) FROM ${TMP_TABLE};"  > cnt_tmp.txt
# use ES query to count documents for index orders-${PARTITION}
# 比对,如果一致:
mysql -e "DROP TABLE ${TMP_TABLE};"
# 记录批次到 archive_batches

5. 优点总结(为什么强烈推荐)

  • EXCHANGE PARTITION 几乎是 O(1) 的元数据操作,避免了在主表上做大量 DELETE / UPDATE 导致 undo/iblog/复制延迟。

  • 主表对在线业务影响极小(短时间元数据操作),适合大流量电商场景。

  • 支持对外工具的高效并发导出(因为导出是在独立表上做)。

6. 限制与注意事项

  • 外键:如果该表有外键,EXCHANGE PARTITION 可能会失败。解决:先移除外键、或使用非 FK 的数据模型(推荐)。

  • 唯一索引/自增:确保 tmp 表与原表结构一致(AUTO_INCREMENT 不影响 EXCHANGE)。

  • 并发:在交换分区时会需要短锁(metadata lock),避免在大量 DDL 操作窗口并发执行。

  • 变更:若归档过程中发现需回滚,确保保留 tmp 表直至校验通过。


二、通用方案(没有分区或不能 EXCHANGE 的情况)

如果不能使用分区/EXCHANGE(例如外键存在、MySQL 版本限制、无法停机更改表结构),用分批导出 + 校验 + 删除。关键点是小批量、有幂等、可重试和限速

1. 推荐工具

  • Percona pt-archiver:非常成熟的在线归档工具,支持分批复制到目标表并删除原行,支持 --commit-each、--limit、--sleep、--txn-size 等参数控制速率。

  • 自研脚本(Python/Go/Java):使用 ORDER BY id LIMIT N 分页导出并批量写入 ES(bulk API),并用 DELETE ... WHERE id IN (...)DELETE LIMIT 分批删除。

  • Debezium / binlog CDC:如果你已经有 CDC + Kafka,可以在把历史数据推到 ES 之后,继续通过 CDC 同步后续变更(见后述)。

2. 批量导出示例(批量导出 MySQL → ES)

伪代码(更完整在下):

  • 分页(主键 id 增量或按 created_at)

  • 每批大小 e.g. 1k ~ 10k(按 MySQL 压力调整)

  • 用 ES helpers.bulk 写入

  • 写完一批后记录 archive_batches 表并删除 MySQL 的那批(或标记为已归档)

  • 使用重试、backoff、日志

Java 导出/导入示例(简化)

import java.sql.*;
import java.util.*;
import java.io.IOException;import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.*;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.common.xcontent.XContentType;public class MysqlToEsArchiver {private static final String MYSQL_URL = "jdbc:mysql://127.0.0.1:3306/mydb?useSSL=false&serverTimezone=UTC";private static final String MYSQL_USER = "root";private static final String MYSQL_PASS = "password";private static final String ES_HOST = "localhost";private static final int ES_PORT = 9200;private static final String ES_SCHEME = "http";private static final String ES_INDEX = "orders-archive";private static final int BATCH_SIZE = 1000;private static final String CUTOFF = "2024-03-01";  // 半年前日期public static void main(String[] args) throws Exception {try (Connection conn = DriverManager.getConnection(MYSQL_URL, MYSQL_USER, MYSQL_PASS);RestHighLevelClient esClient = new RestHighLevelClient(RestClient.builder(new HttpHost(ES_HOST, ES_PORT, ES_SCHEME)))) {long lastId = 0L;while (true) {List<Map<String, Object>> rows = fetchBatch(conn, lastId);if (rows.isEmpty()) break;bulkInsertToEs(esClient, rows);// 更新归档标记
                markArchived(conn, rows);lastId = (long) rows.get(rows.size() - 1).get("id");}}}private static List<Map<String, Object>> fetchBatch(Connection conn, long lastId) throws SQLException {String sql = "SELECT id, user_id, created_at, status, total_amount " +"FROM orders WHERE created_at < ? AND id > ? ORDER BY id LIMIT ?";try (PreparedStatement ps = conn.prepareStatement(sql)) {ps.setString(1, CUTOFF);ps.setLong(2, lastId);ps.setInt(3, BATCH_SIZE);try (ResultSet rs = ps.executeQuery()) {List<Map<String, Object>> list = new ArrayList<>();while (rs.next()) {Map<String, Object> row = new HashMap<>();row.put("id", rs.getLong("id"));row.put("user_id", rs.getLong("user_id"));row.put("created_at", rs.getTimestamp("created_at"));row.put("status", rs.getString("status"));row.put("total_amount", rs.getBigDecimal("total_amount"));list.add(row);}return list;}}}private static void bulkInsertToEs(RestHighLevelClient client, List<Map<String, Object>> rows) throws IOException {BulkRequest bulkRequest = new BulkRequest();for (Map<String, Object> row : rows) {IndexRequest req = new IndexRequest(ES_INDEX).id(row.get("id").toString()) // 用订单ID做文档ID,保证幂等
                .source(row, XContentType.JSON);bulkRequest.add(req);}BulkResponse resp = client.bulk(bulkRequest, RequestOptions.DEFAULT);if (resp.hasFailures()) {System.err.println("Bulk insert failures: " + resp.buildFailureMessage());// 可以加上重试逻辑
        }}private static void markArchived(Connection conn, List<Map<String, Object>> rows) throws SQLException {String sql = "UPDATE orders SET archived = 1 WHERE id = ?";try (PreparedStatement ps = conn.prepareStatement(sql)) {for (Map<String, Object> row : rows) {ps.setLong(1, (long) row.get("id"));ps.addBatch();}ps.executeBatch();}}
}

说明

    1. 依赖
      Maven 需要加:

<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.17.15</version> <!-- 版本需与你 ES 对应 -->
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.34</version>
</dependency>
    • 如果用 ES 8.x,可以换 elasticsearch-java 官方新客户端。

    • 批量导出
      每次从 MySQL 取 BATCH_SIZE 行(示例用 1000),按 ID 顺序递增分页,直到取不到数据。

    • 写入 ES
      使用 Bulk API,_id 设置为订单 ID,保证幂等性。

    • 更新归档标记
      用一个 archived 字段标记已归档,避免重复迁移。也可以改成 DELETE,不过推荐先标记、再统一删除。

    • 异常与重试
      实际中要加 失败重试断点续传(记录 lastId),以及 监控日志

3. 使用 pt-archiver(Percona Toolkit)

示例把旧数据搬到另一库或表:

pt-archiver --source h=127.0.0.1,D=mydb,t=orders --where 'created_at < "2024-03-01"' \--dest h=127.0.0.1,D=archive_db,t=orders_archive --limit 1000 --commit-each --sleep 0.1

pt-archiver 会把数据安全搬走,支持断点与 resume;搬完后再 run DELETE 或直接在源表上执行批量删除(pt-archiver 也有 --delete 选项,但这会在源表上 delete 并影响 undo)。


三、ES 建模与索引管理(历史数据放 ES 的推荐做法)

把订单放 ES 不是简单“直接复制列”,通常建议把订单及其子表(items/payments/discounts)做成一个 denormalized 文档,便于搜索与聚合。

1. 推荐 Index 策略

  • 按月索引orders-YYYY.MM(便于 ILM 管理、分区式索引)

  • 字段 mapping 示例(简化):

 
PUT /orders-2024.01
{"mappings": {"properties": {"order_id": {"type":"keyword"},"user_id": {"type":"keyword"},"created_at": {"type":"date", "format":"strict_date_optional_time||epoch_millis"},"status": {"type":"keyword"},"total_amount": {"type":"double"},"items": {"type": "nested","properties": {"sku_id": {"type":"keyword"},"qty": {"type":"integer"},"price": {"type":"double"},"title": {"type":"text"}}},"payments": {"properties": {"method": {"type":"keyword"},"paid_at": {"type":"date"}}}}}
}
  • LM(Index Lifecycle Management):设置 hot → warm → cold → delete 策略。例如:hot 30d,warm 365d,冷存储 7 年或根据法规。

2. 写入策略

  • 用 ES Bulk API 批量写入(batch 1000~5000,视网路与 ES 集群性能调节)

  • _id = order_id 保证幂等(如果后续有订单状态变化,做 upsert)

  • 若还要支持在线写入历史区(比如退款后更改),确保有 CDC 或消息机制把更新写入 ES(见下一节)

3. 查询策略

  • 近 6 个月的数据在 MySQL / 主库查询(事务/强一致)

  • 超过 6 个月的历史通过 ES 查询(全文、聚合)

  • 在应用层实现透明路由:先按时间判断去哪个数据源查询,或者先查询 MySQL,再 fallback ES(或并行查询合并结果)。


四、一致性 / 后续变更处理(退款 / 更新在归档后如何同步)

订单归档后仍可能发生的事:退款/拒付/订单状态修正。必须设计事件同步策略:

推荐做法(两种组合)

  1. CDC 驱动(推荐):

    • 使用 Debezium / Maxwell / Canal 把 MySQL binlog 变更发送到 Kafka,然后用 sink connector 写回 ES(针对归档后的数据也会被更新)。

    • 优点:可靠、可重放、可处理后续修改。

    • 在分区方案中:在交换分区并删除源表后,binlog 仍然可以捕捉绝大部分变更(注意:若你移除源表中的分区且在其他系统对订单做变更,必须保证这些变更仍被写到某个可捕捉的变更源,或在业务上禁止对历史订单做写操作,只允许通过退款服务触发更新并同时写 ES)。

  2. 事件驱动 + 应用层写 ES:

    • 业务中任何对订单的状态更新,同时写入 MySQL(如果涉及近期数据)并写一条“order_event” 到 Kafka;消费者订阅并把该变更写入 ES。

    • 对于已经归档的订单,消费者会在 ES 执行 update/upsert。

  3. 补偿任务(每日/每小时):

    • 定期扫描最近归档范围内(例如归档后 30 天)有变更的订单(通过 audit log 或变更表),并把它们补发到 ES。

    • 适合没有 CDC 的遗留系统,但需要维护变更日志。

注意:如果归档后不能接受更改(比如业务保证 6 个月后订单不再变更),那业务逻辑需要强制化(不允许更新),这在支付/退款场景通常不可接受,因此 CDC 或事件驱动是更健壮的选择。


五、可观测 / 审计 / 回滚策略

1. 审计表(示例)

CREATE TABLE archive_batches (batch_id BIGINT PRIMARY KEY AUTO_INCREMENT,partition_name VARCHAR(64),cutoff_date DATE,rows_exported BIGINT,es_index VARCHAR(128),es_count BIGINT,checksum VARCHAR(64),status ENUM('EXPORTING','VERIFIED','DELETED','FAILED') DEFAULT 'EXPORTING',created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);

每批次写入并更新状态,便于审计和人为回溯。

2. 校验策略

  • 导出完成后:SELECT COUNT(*) FROM tmp_tableGET /orders-*-/_count 对比

  • 对比 MD5 列(例如对每行做 md5(concat(fields)) 聚合,比较聚合值)

  • 随机抽样多行字段对比

3. 备份与回滚

  • 在 DROP TABLE 之前,把 tmp 表做 mysqldump 导出或导出 CSV 保存到 S3(保留 30 天),以便回滚。

  • 若发现 ES 写入错误,恢复方法:从 S3 或 tmp 表重跑 bulk 写入;或从 audit log 做 selective reindex。


六、性能 / 运维细节(保证主库稳定)

  • 批量大小:ES Bulk 建议 1k~5k(单请求大小 < 10–20MB),MySQL select batch 建议 1k~10k,根据 IO 与 RAM 调整。

  • 并发 worker:并行度控制,典型 4~8 个并行线程写 ES(不要压垮 ES)

  • 速率限制:根据主库负载自动调节(当主库 CPU/IO > threshold 时减速)

  • 事务边界:避免长事务(会增加 undo),使用小批量并发快提交

  • 监控指标:每批 rows exported, ES success count, export latency, MySQL load, replication lag, errors; 报警门槛


七、示例:完整实践脚本(分区 + EXCHANGE)总结步骤

  1. 计算 cutoff month (6 months ago) → 需要归档的 partition 列表

  2. 对每个 partition:

    • CREATE tmp table LIKE orders

    • ALTER TABLE orders EXCHANGE PARTITION (pYYYYMM) WITH TABLE tmp_table

    • launch exporter job (python/Logstash) to write tmp_table -> ES index orders-YYYY.MM

    • verify counts/checksum

    • BACKUP tmp_table (mysqldump -> S3) (可选,短期保留)

    • DROP TABLE tmp_table

    • insert archive_batches record with status=VERIFIED/DELETED

  3. ES ILM 管理旧索引(move to warm/cold or delete per retention)


八、常见问题与 FAQ(快速回答)

  • Q:归档会丢失后续退款等变更怎么办?
    A:使用 CDC(Debezium / Maxwell / Canal)或事件化链路把后续变更写入 ES;保持 archive consumer 对 archived orders 做 update。

  • Q:为什么不用直接在 MySQL 中 delete?
    A:DELETE 大量行会产生大量 undo/redo、binlog 放大、主从复制延迟,并占用空间直到 purge;分区/EXCHANGE 或拆批删除能极大降低影响。

  • Q:ES 会成为新的“写源”吗?
    A:ES 仅作为归档/查询/分析用。权威数据仍留在 MySQL(或冷备)。对账/结算仍需基于 DB 备份或独立账务库。

  • Q:如何处理敏感字段(PII)?
    A:归档前先脱敏/加密/mask(例如只保留 last4 digits,或使用哈希),并记录解密/访问权限流程,符合 GDPR/法规。


九、附:实用 SQL / 工具片段汇总

建分区(示例) 

ALTER TABLE orders
PARTITION BY RANGE (TO_DAYS(created_at)) (PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),PARTITION pmax VALUES LESS THAN MAXVALUE
);

EXCHANGE PARTITION

CREATE TABLE orders_p202401_tmp LIKE orders;
ALTER TABLE orders EXCHANGE PARTITION (p202401) WITH TABLE orders_p202401_tmp;
-- now orders_p202401_tmp holds data for that month

DROP tmp table(释放空间)

DROP TABLE orders_p202401_tmp;

pt-archiver(替代)

pt-archiver --source h=127.0.0.1,D=mydb,t=orders --where 'created_at < "2024-03-01"' \--dest h=127.0.0.1,D=archive_db,t=orders_archive --limit 1000 --commit-each --sleep 0.1

ES mapping 示例(简化)

PUT /orders-2024.01
{"mappings": {"properties": {"order_id": {"type":"keyword"},"user_id": {"type":"keyword"},"created_at": {"type":"date"},"status": {"type":"keyword"},"total_amount": {"type":"double"},"items": {"type":"nested", "properties": { "sku_id":{"type":"keyword"}, "qty":{"type":"integer"} } }}}
}

十、结论(一句话)

如果你能预先在表上做按时间分区并且没有外键,优先使用 EXCHANGE PARTITION + 批量导出到 ES + 校验 + DROP tmp 表 的流程。
如果不能分区,则使用 pt-archiver 或自研分批导出(小批量、限速、audit、备份),并通过 CDC/事件保证归档后仍能同步后续变更。


 

 

http://www.hskmm.com/?act=detail&tid=19423

相关文章:

  • Java基础核心问题 链接版
  • java作业
  • Insightly存储型XSS漏洞分析:通过链接名称注入恶意脚本
  • H3C交换机的配置学习-01
  • Python脚本生成包含标准的#ifndef保护宏的头文件
  • java实验作业和动手动脑
  • (第三次)Numpy Pandas
  • sg.帮我写一个类似于vb6窗体设计的PySimpleGUI布局设计助手
  • ABC325EF 题解
  • Win11 安装 Python
  • mysql的单表多大要考虑分库分表
  • 2025 采购传感器不踩坑!国内传感器优秀厂家清单:解决精度,防爆,极端环境难题
  • sg.有没有一个可视化辅助设计pysimplegui布局的小工具?
  • 无刷电机速度闭环控制
  • sg.如何使用PySimpleGUI调试器实时监控变量
  • 微信小程序云开发 授权手机号快捷登陆
  • newDay05
  • AtCoder Beginner Contest 425 ABCDEF 题目解析
  • sg.如何使用PySimpleGUI调试器窗口
  • 对话汇总:从东方哲学到可计算架构的演进
  • 25.9.27 继续MyBatis
  • MoeCTF 2025 二进制漏洞审计:boomboom_revenge
  • 集训总结(九)
  • 完整教程:操作系统之初识Linux
  • XJSOJ优化(Stylus脚本)
  • 使用mpm-itk让Apache以不同用户身份运行的完整指南
  • sg.如何打开PySimpleGUI调试器窗口?
  • 第6篇、Flask 表单处理与用户认证完全指南:从零到实战
  • 威联通 NAS Docker 容器更新详解:从备份、推送到重建的全流程指南
  • parameter和defparam的简单用法