当前位置: 首页 > news >正文

同步框架与底层消费机制解决方案梳理

同步框架与底层消费机制解决方案梳理

业务背景,在多集群分布式场景下。业务数据隔离的两个环境需要进行数据同步,如何设计同步方案,合理利用资源是本次的主题。

环境同步方案设计

整体架构概述

  • 分布式节点注册表(distribute_node_registry):记录所有可用的服务实例(IP和端口),并通过心跳机制维护其健康状态。metadata字段用于存储负载信息(如权重、当前负载系数等)。
  • 消息表(distribute_event、distribute_event_step):存储所有消息,通过状态机(status)和乐观锁(version)管理消息生命周期,确保幂等消费。
  • 调度器:一个独立的后台服务(可多实例部署,通过分布式锁协调),负责从消息表中获取待处理消息,并根据服务注册表中的实例负载情况,将消息分配给合适的消费者实例。
  • 消费者:实际处理消息的服务实例,定期从消息表中拉取分配给自己的消息,处理后更新消息状态。

服务注册与发现的“自制简化版”表结构设计

CREATE TABLE `distribute_node_registry` (`id` BIGINT AUTO_INCREMENT COMMENT '主键ID,自增长' PRIMARY KEY,`service_name` VARCHAR(128) NOT NULL COMMENT '服务名称,如:user-service, order-service',`node_address` VARCHAR(50) NOT NULL COMMENT '服务实例的IP+端口地址,如:192.168.1.10:10003',`status` TINYINT(1) NOT NULL DEFAULT '1' COMMENT '状态:0-离线,1-在线',`last_heartbeat_time` DATETIME NOT NULL COMMENT '最后一次心跳上报时间',`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`modifier_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',`metadata` TEXT COMMENT '元数据(JSON格式),用于存储额外信息,如:版本号、地区、权重等',`metadata_version` VARCHAR(20) NOT NULL DEFAULT '1' COMMENT '元数据版本',UNIQUE KEY `uk_service_instance` (`service_name`, `node_address`),KEY `idx_service_status` (`service_name`, `status`),KEY `idx_heartbeat_time` (`last_heartbeat_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='分布式节点注册表';

索引说明:

  1. UNIQUE uk_service_instance (service_name, instance_ip): 唯一索引。防止同一个服务的同一个IP被重复插入。这是保证数据一致性的关键。
  2. KEY idx_service_status (service_name, status): 复合索引。加速“查询某个服务下所有在线实例”这个最常用的查询场景。
  3. KEY idx_heartbeat_time (last_heartbeat_time): 普通索引。加速定时任务中“扫描过期心跳”的操作,大幅提升性能。

1.服务实例心跳如何上报

每个分布式服务实例,在启动后,需要以一个固定的频率(例如每30秒)执行一个“心跳上报”操作。ON DUPLICATE KEY UPDATE 语法,利用唯一索引 uk_service_instance 来实现“存在则更新,不存在则插入”的 “upsert” 操作,非常高效和简洁。

INSERT INTO distribute_node_registry (service_name, node_address, instance_port, last_heartbeat_time, metadata) 
VALUES ('user-service', '192.168.1.10:10003', 8080, NOW(), '{"version": "1.0.0", "weight": 50}')
ON DUPLICATE KEY UPDATE last_heartbeat_time = VALUES(last_heartbeat_time),instance_port = VALUES(instance_port),metadata = VALUES(metadata),status = 1; -- 一旦上报心跳,立即将状态置为1(在线)

2.状态检查与失效标记

  • 频率:比心跳上报间隔稍长,例如每60秒一次。
  • 逻辑:检查所有记录,如果 last_heartbeat_time 超过了预设的“超时时间”(例如 90秒),则认为该实例已下线,将 status 字段更新为 0
UPDATE distribute_node_registry 
SET status = 0 
WHERE last_heartbeat_time < DATE_SUB(NOW(), INTERVAL 90 SECOND);

3. 数据清理

  • 频率:较低,例如每天凌晨执行一次。
  • 逻辑:清理掉长时间(例如7天)没有心跳的陈旧记录,防止表无限增大。
DELETE FROM distribute_node_registry 
WHERE last_heartbeat_time < DATE_SUB(NOW(), INTERVAL 7 DAY);

消息表结构设计


CREATE TABLE `distribute_event` (`id` BIGINT AUTO_INCREMENT COMMENT '主键ID,自增长' PRIMARY KEY,`creator` VARCHAR(30) NOT NULL DEFAULT '0' COMMENT '创建人',`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`modifier` VARCHAR(30) NULL DEFAULT '0' COMMENT '修改人',`modifier_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',`sort` INT NOT NULL DEFAULT 0 COMMENT '排序',`tenant_id` VARCHAR(30) NOT NULL COMMENT '租户ID',`code` VARCHAR(64) NOT NULL COMMENT '任务编码',`data_id` VARCHAR(64) NOT NULL COMMENT '操作实体ID,如仓库ID',`topic` VARCHAR(50) NOT NULL COMMENT '任务类型,如 SPACE_ADD',`status` VARCHAR(30) NOT NULL DEFAULT 'PENDING' COMMENT '任务状态: PENDING, RUNNING, SUCCESS, FAILED, COMPENSATING, COMPENSATED',`process_log` LONGTEXT NULL COMMENT '处理过程日志',`payload` LONGTEXT NULL COMMENT '本步骤执行所需数据',`progress` INT NULL COMMENT '处理进度',`result` LONGTEXT NULL COMMENT '最终结果信息',`retry_count` INT NOT NULL DEFAULT 0 COMMENT '重试次数',`max_retry` INT NOT NULL DEFAULT 3 COMMENT '最大重试次数',`node_address` VARCHAR(50) NULL COMMENT '处理节点IP+端口',`version` INT NOT NULL DEFAULT 1 COMMENT '乐观锁版本,用于并发控制',UNIQUE KEY `uk_code` (`code`),KEY `idx_topic_status` (`topic`, `status`),KEY `idx_node_address` (`node_address`),KEY `idx_status_modifier_time` (`status`, `modifier_time`),KEY `idx_tenant_id` (`tenant_id`),KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='分布式事件表';

关键字段说明:

  • uk_code:确保任务编码唯一,用于幂等性控制。
  • idx_topic_status:加速按任务类型和状态查询任务。
  • idx_node_address:加速按处理节点查询任务。
  • idx_status_modifier_time:加速按状态和修改时间查询(用于超时检查)。
  • idx_tenant_id:支持按租户查询。
  • idx_create_time:支持按创建时间排序或查询。
CREATE TABLE `distribute_event_step` (`id` BIGINT AUTO_INCREMENT COMMENT '主键ID,自增长' PRIMARY KEY,`creator` VARCHAR(30) NOT NULL DEFAULT '0' COMMENT '创建人',`create_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`modifier` VARCHAR(30) NULL DEFAULT '0' COMMENT '修改人',`modifier_time` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',`sort` INT NOT NULL DEFAULT 0 COMMENT '排序',`tenant_id` VARCHAR(30) NOT NULL COMMENT '租户ID',`code` VARCHAR(64) NOT NULL COMMENT '步骤编码',`job_code` VARCHAR(64) NOT NULL COMMENT '所属顶层任务ID,关联 distribute_event.code',`parent_step_id` BIGINT NULL COMMENT '父步骤ID,用于构建层级树(NULL 表示根步骤)',`step_type` VARCHAR(50) NOT NULL COMMENT '步骤类型,如 DOC_SYNC, TAG_SYNC, PERMISSION_SYNC',`step_name` VARCHAR(50) NOT NULL COMMENT '步骤名称,如 "同步文档标签"',`status` VARCHAR(30) NOT NULL DEFAULT 'PENDING' COMMENT '步骤状态: PENDING, EXECUTING, SUCCESS, FAILED, COMPENSATING, COMPENSATED',`payload` LONGTEXT NULL COMMENT '本步骤执行所需数据',`payload_version` VARCHAR(20) NOT NULL DEFAULT '1' COMMENT '负载数据版本',`compensation_data` LONGTEXT NULL COMMENT '补偿所需快照数据,用于回滚',`error_details` LONGTEXT NULL COMMENT '本步骤执行失败的错误详情',`retry_count` INT NOT NULL DEFAULT 0 COMMENT '本步骤重试次数',`max_retry` INT NOT NULL DEFAULT 3 COMMENT '最大重试次数',`executed_at` DATETIME NULL COMMENT '正向操作执行完成时间',`compensated_at` DATETIME NULL COMMENT '补偿操作执行完成时间',UNIQUE KEY `uk_code` (`code`),KEY `idx_job_code` (`job_code`),KEY `idx_parent_step_id` (`parent_step_id`),KEY `idx_status` (`status`),KEY `idx_tenant_id` (`tenant_id`),KEY `idx_executed_at` (`executed_at`),KEY `idx_compensated_at` (`compensated_at`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='分布式事件步骤表';
  • uk_code:确保步骤编码唯一。
  • idx_job_code:加速按任务编码查询步骤。
  • idx_parent_step_id:加速查询子步骤。
  • idx_status:加速按状态查询步骤。
  • idx_tenant_id:支持按租户查询。
  • idx_executed_atidx_compensated_at:支持按执行时间查询。

如何确保消息不会重复消费?

1.消息生产时确保唯一性

  • 插入消息时,使用唯一code(如雪花算法生成的ID),避免重复消息。

示例SQL

INSERT INTO distribute_event (code, topic, payload, status, version)
VALUES ('unique_snowflake_id', 'ORDER_CREATED', '{"orderId": 123}', 'PENDING', 0);

2. 消息消费时原子性更新状态

  • 消费者不能直接获取消息,而是由调度器分配消息到具体消费者实例。调度器使用原子UPDATE操作将消息状态从PENDING更新为PROCESSING,并设置node_address

  • 调度器分配消息的SQL:

    UPDATE distribute_event
    SET status = 'PROCESSING', node_address = '目标IP:端口', version = version + 1, updated_at = NOW()
    WHERE status = 'PENDING'
    AND topic = 'ORDER_CREATED'
    where version = '查询到的版本'
    ORDER BY created_at ASC
    LIMIT 1; -- 每次分配一条消息,避免锁竞争
    

    这个UPDATE语句是原子的,确保一条消息只被分配一次。

3. 消费者处理消息后更新状态

  • 消费者处理消息后,必须更新状态为COMPLETEDFAILED,并增加retry_count(如果失败)。

  • 示例SQL:

    UPDATE distribute_event
    SET status = 'COMPLETED', consumed_at = NOW(), version = version + 1
    WHERE code = '消息code' AND node_address = '目标IP:端口';
    

    这里通过consumer_ip条件确保只有分配给的消费者才能更新状态,防止误操作。

4. 超时消息重置

  • 后台任务定期检查处理中超时的消息(可能由于消费者崩溃),将其重置为PENDING,以便重新分配。

  • 检查SQL:

    UPDATE distribute_event
    SET status = 'PENDING', node_address = NULL, version = version + 1
    WHERE status = 'PROCESSING'
    AND updated_at < DATE_SUB(NOW(), INTERVAL 5 MINUTE); -- 超时时间5分钟
    

    同时,结合服务注册表,如果node_address对应的实例已下线(心跳超时),则重置消息。

三、均匀分配消息到存活的机器服务

均匀分配依赖于服务注册表中的负载信息(metadata字段)和调度器的智能分配策略。

1. 服务实例上报负载信息

  • 每个服务实例(消费者)定期向服务注册表发送心跳,并在metadata中上报当前负载信息。例如:

    INSERT INTO distribute_node_registry (service_name, node_address, last_heartbeat_time, metadata) 
    VALUES ('order-service', '192.168.1.10:10003', NOW(), '{"load": 0.3, "capacity": 100, "processing_count": 5}')
    ON DUPLICATE KEY UPDATE last_heartbeat_time = VALUES(last_heartbeat_time),metadata = VALUES(metadata),status = 1;
    
    • load:当前负载系数(0-1),基于CPU、内存或处理中的消息数计算。
    • processing_count:当前正在处理的消息数(可以从消息表统计)。
    • capacity:最大处理能力,用于加权计算。

2. 调度器分配策略

调度器定期执行以下步骤:

a. 获取所有存活实例及其负载

SELECT node_address, metadata 
FROM distribute_node_registry 
WHERE service_name = 'kbs-service' AND status = 1 AND last_heartbeat_time > DATE_SUB(NOW(), INTERVAL 90 SECOND);

解析每个实例的metadata,获取loadcapacity

b. 选择目标实例
根据负载均衡策略选择实例:

  • 基于负载:选择负载最低的实例(load最小)。
  • 加权轮询:根据capacity计算权重,选择权重高的实例。
  • 最少连接:根据processing_count选择处理消息最少的实例。

示例代码(伪代码):

# 假设 instances 是查询到的实例列表
instances = [{'instance_ip': '192.168.1.10', 'metadata': {'load': 0.3, 'capacity': 100}},{'instance_ip': '192.168.1.11', 'metadata': {'load': 0.5, 'capacity': 200}}
]# 基于负载选择:选择负载最小的实例
target_instance = min(instances, key=lambda x: x['metadata']['load'])# 或者基于权重选择:计算权重(capacity / load),选择权重最高的
def get_weight(instance):metadata = instance['metadata']return metadata['capacity'] / (metadata['load'] + 0.001)  # 避免除零
target_instance = max(instances, key=get_weight)

c. 分配消息到目标实例
对于每个待分配的消息,执行UPDATE操作:

UPDATE distribute_event
SET status = 'PROCESSING', node_address = '目标IP:端口', version = version + 1, updated_at = NOW()
WHERE status = 'PENDING'
AND topic = 'ORDER_CREATED'
where version = '查询到的版本'
ORDER BY created_at ASC
LIMIT 1;

注意:为了减少数据库压力,可以一次分配多条消息(例如LIMIT 10),但需要确保分配均匀。

d. 调度器运行频率
调度器应每隔几秒(如5秒)运行一次,以适应负载变化。如果消息量大,可以增加调度频率。

3. 消费者拉取消息

  • 消费者实例定期查询消息表,获取分配给自己的消息(状态为PROCESSINGconsumer_ip为本机IP):

    SELECT * FROM distribute_event 
    WHERE node_address = '目标IP:端口' AND status = 'PROCESSING';
    
  • 消费者处理这些消息,并更新状态。

http://www.hskmm.com/?act=detail&tid=1241

相关文章:

  • QOJ1838 Intellectual Implementation 题解
  • P11967 [GESP202503 八级] 割裂 题解
  • 变量和运算符和类型
  • win 图片和视频文件无法显示缩略图如何解决?
  • 输入和输出
  • 三种语句
  • 力扣第5题最长回文子串
  • 用 Python 和 PaddleOCR 进行验证码识别
  • TASK 1 训练一个网络识别手写数字
  • 复杂背景验证码的识别思路与图像处理方法
  • Symfony学习笔记 - The Symfony Framework Best Practices
  • 大学军训
  • Vue Day3【综合案例2】vue小兔鲜儿
  • Java 基础知识解析
  • 力扣第3题 无重复字符的最长子串
  • UniApp 自定义导航栏
  • P3177 [HAOI2015] 树上染色
  • UniApp 自定义tabBar
  • NOIP2024复盘
  • Avalonia 学习笔记04. Page Navigation(页面导航) (转载)
  • 判断左手坐标系和右手坐标系的方法
  • 题解:P11894 「LAOI-9」Update
  • 题解:P2012 拯救世界2
  • 一键安装小雅Alist
  • 题解:AT_abc394_c [ABC394C] Debug
  • Lumion Pro 12.0 下载安装教程包含安装包下载、安装、激活超详细图文步骤
  • 题解:CF348C Subset Sums
  • 题解:CF351B Jeff and Furik
  • 题解:CF2118D1 Red Light, Green Light (Easy version)
  • js和vue的数据类型