当前位置: 首页 > news >正文

ClickHouse UPDATE 机制详解 - 若

ClickHouse UPDATE 机制详解

问题现象

在使用ClickHouse进行UPDATE操作时,经常会遇到这样的现象:

UPDATE ethereum.block_tasks SETstatus = 'pending', owner = 'consumer-1_1758676754070328000', assigned_at = '2025-09-24 09:19:14.07', updated_at = '2025-09-24 09:19:14.07'
WHERE start_block = 12345;

执行结果:

  • RowsAffected = 0
  • 但通过SELECT查询却能查到更新后的数据 ✅

这种看似矛盾的现象让很多开发者困惑,实际上是ClickHouse UPDATE机制的正常行为。

ClickHouse UPDATE机制原理

1. 异步Mutations机制

ClickHouse的UPDATE操作不是传统的就地更新,而是通过mutations机制异步处理:

传统数据库UPDATE:
[数据] → [直接修改] → [立即生效]ClickHouse UPDATE:
[数据] → [创建mutation] → [后台异步处理] → [最终生效]

2. 执行流程

UPDATE table SET column = 'value' WHERE condition;

执行步骤:

  1. 提交mutation:ClickHouse立即返回,但实际更新在后台进行
  2. 异步处理:后台进程处理mutation,重写相关数据块
  3. 最终一致性:查询时总是返回最新数据

3. 为什么RowsAffected = 0

  • RowsAffected = 0 表示mutation已成功提交到队列
  • 不表示实际影响的行数
  • 实际更新在后台异步进行
  • 这是ClickHouse的设计特性,不是错误

监控Mutation状态

1. 查看Mutation队列

-- 查看所有mutations
SELECT mutation_id,table,command,create_time,is_done,latest_failed_part,latest_fail_reason
FROM system.mutations 
WHERE table = 'block_tasks' 
ORDER BY create_time DESC 
LIMIT 10;

2. 监控执行进度

-- 查看未完成的mutations
SELECT count() as pending_mutations
FROM system.mutations 
WHERE table = 'block_tasks' 
AND is_done = 0;-- 查看最近的mutation详情
SELECT mutation_id,create_time,is_done,elapsed_time
FROM system.mutations 
WHERE table = 'block_tasks' 
ORDER BY create_time DESC 
LIMIT 1;

3. 检查Mutation性能

-- 查看mutation性能统计
SELECT table,count() as total_mutations,sum(is_done) as completed_mutations,avg(elapsed_time) as avg_elapsed_time
FROM system.mutations 
WHERE table = 'block_tasks'
GROUP BY table;

等待时间估算

1. 影响因子

因素 影响程度 说明
数据量 数据越多,处理时间越长
数据块数量 每个块需要单独处理
系统负载 CPU、内存、磁盘I/O
UPDATE复杂度 子查询、批量更新
并发度 其他mutation的竞争

2. 典型等待时间

小表(<10万行):     5-30秒
中等表(10万-1000万行):  1-10分钟
大表(>1000万行):   10-60分钟

3. 实际测试数据

-- 测试不同规模的UPDATE时间
-- 100万行数据,简单UPDATE:约2-5分钟
-- 1000万行数据,批量UPDATE:约10-30分钟
-- 复杂子查询UPDATE:时间增加2-5倍

代码实现方案

1. Go语言等待实现

package mainimport ("fmt""time""gorm.io/gorm"
)// 等待mutation完成的通用函数
func waitForMutation(db *gorm.DB, tableName string, timeout time.Duration) error {start := time.Now()ticker := time.NewTicker(1 * time.Second)defer ticker.Stop()for {select {case <-ticker.C:var count int64err := db.Raw("SELECT count() FROM system.mutations WHERE table = ? AND is_done = 0", tableName).Scan(&count).Errorif err != nil {return err}if count == 0 {fmt.Printf("Mutation completed in %v\n", time.Since(start))return nil}// 检查超时if time.Since(start) > timeout {return fmt.Errorf("mutation timeout after %v", timeout)}case <-time.After(timeout):return fmt.Errorf("mutation timeout after %v", timeout)}}
}// 使用示例
func updateBlockTask(db *gorm.DB, taskID int64) error {// 执行UPDATEerr := db.Exec("UPDATE block_tasks SET status = 'finished' WHERE id = ?", taskID).Errorif err != nil {return err}// 等待mutation完成return waitForMutation(db, "block_tasks", 5*time.Minute)
}

2. 带进度显示的等待

// 显示mutation进度的等待函数
func waitForMutationWithProgress(db *gorm.DB, tableName string, timeout time.Duration) error {start := time.Now()ticker := time.NewTicker(2 * time.Second)defer ticker.Stop()for {select {case <-ticker.C:var mutations []struct {MutationID string    `gorm:"column:mutation_id"`CreateTime time.Time `gorm:"column:create_time"`IsDone     bool      `gorm:"column:is_done"`}err := db.Raw(`SELECT mutation_id, create_time, is_done FROM system.mutations WHERE table = ? AND is_done = 0 ORDER BY create_time DESC LIMIT 5`, tableName).Scan(&mutations).Errorif err != nil {return err}if len(mutations) == 0 {fmt.Printf("All mutations completed in %v\n", time.Since(start))return nil}// 显示进度elapsed := time.Since(start)fmt.Printf("Mutation in progress for %v, %d pending...\n", elapsed, len(mutations))// 检查超时if elapsed > timeout {return fmt.Errorf("mutation timeout after %v", timeout)}case <-time.After(timeout):return fmt.Errorf("mutation timeout after %v", timeout)}}
}

3. 批量更新优化

// 批量更新,减少mutation数量
func batchUpdateTasks(db *gorm.DB, tasks []*model.BlockTask) error {return db.Transaction(func(tx *gorm.DB) error {for _, task := range tasks {err := tx.Table(model.BlockTaskTableName).Where("start_block = ? AND end_block = ?", task.StartBlock, task.EndBlock).Updates(map[string]interface{}{"status":     task.Status,"updated_at": time.Now(),}).Errorif err != nil {return err}}return nil})
}

最佳实践

1. 设置合理的超时时间

// 根据表大小动态设置超时时间
func getMutationTimeout(tableSize int64) time.Duration {switch {case tableSize < 100000:return 1 * time.Minutecase tableSize < 1000000:return 5 * time.Minutecase tableSize < 10000000:return 15 * time.Minutedefault:return 30 * time.Minute}
}

2. 异步处理策略

// 对于非关键更新,使用异步处理
func asyncUpdate(db *gorm.DB, taskID int64) {go func() {err := db.Exec("UPDATE block_tasks SET status = 'finished' WHERE id = ?", taskID).Errorif err != nil {log.Printf("Async update failed: %v", err)}}()
}

3. 错误处理和重试

// 带重试的更新函数
func updateWithRetry(db *gorm.DB, taskID int64, maxRetries int) error {for i := 0; i < maxRetries; i++ {err := db.Exec("UPDATE block_tasks SET status = 'finished' WHERE id = ?", taskID).Errorif err == nil {// 等待mutation完成err = waitForMutation(db, "block_tasks", 5*time.Minute)if err == nil {return nil}}if i < maxRetries-1 {time.Sleep(time.Duration(i+1) * time.Second) // 指数退避}}return fmt.Errorf("update failed after %d retries", maxRetries)
}

常见问题解决

1. Mutation卡住不动

-- 检查是否有失败的mutations
SELECT mutation_id,latest_failed_part,latest_fail_reason
FROM system.mutations 
WHERE table = 'block_tasks' 
AND is_done = 0 
AND latest_failed_part != '';

2. 性能优化

-- 优化mutation性能的设置
ALTER TABLE block_tasks MODIFY SETTING number_of_mutations_to_throw = 100,number_of_mutations_to_delay = 50;

3. 监控和告警

// 监控mutation积压
func monitorMutationBacklog(db *gorm.DB) {ticker := time.NewTicker(30 * time.Second)go func() {for range ticker.C {var count int64db.Raw("SELECT count() FROM system.mutations WHERE is_done = 0").Scan(&count)if count > 10 {log.Printf("Warning: %d mutations pending", count)}}}()
}

总结

ClickHouse的UPDATE机制具有以下特点:

  1. 异步处理:UPDATE立即返回,实际更新在后台进行
  2. 最终一致性:查询时总是返回最新数据
  3. RowsAffected不可靠:不能依赖此值判断更新是否成功
  4. 需要等待机制:通过监控system.mutations表等待完成
  5. 性能考虑:大表更新可能需要较长时间

关键要点:

  • 理解异步机制,不要被RowsAffected = 0误导
  • 实现等待机制,确保更新完成
  • 设置合理的超时时间
  • 监控mutation状态,及时发现问题
  • 考虑异步处理,提高系统响应性

这种机制虽然增加了复杂性,但提供了更好的并发性能和最终一致性保证。

http://www.hskmm.com/?act=detail&tid=15639

相关文章:

  • ClickHouse index_granularity 详解 - 若
  • P13885 [蓝桥杯 2023 省 Java/Python A] 反异或 01 串
  • clickhouse轻量级更新 - 若
  • 西电PCB设计指南第3章学习笔记
  • Vitrualbox、kali、metaspolitable2下载安装
  • LazyLLM端到端实战:用RAG+Agent实现自动出题与学习计划的个性化学习助手智能体
  • 补充图
  • 【阿里云事件总线】域名+邮件推送+事件总线=实现每天定时邮件!
  • llm入门环境
  • FLASH空间划分/存储数据至指定CODEFLASH位置
  • SOOMAL 降噪数据表
  • 案例分享|借助IronPDF IronOCR,打造医疗等行业的智能化解决方案
  • ClickHouse UPDATE 操作问题解决方案 - 若
  • 利用 Milvus + RustFS,快速打造一个 RAG!
  • Docker 私有镜像仓库 Harbor 安装部署带签名认证
  • ARC180 做题记
  • 借助Aspose.HTML控件,使用 Python 编辑 HTML
  • 微前端 micro-app 在vue 中的路由跳转问题
  • 1. 设计模式--工厂办法模式
  • 汽车视频总线采集过程中,如何兼顾响应速度和可靠性?
  • P8865 [NOIP2022] 种花
  • traefik 反向代理 + IdentityServer4
  • 麦角硫因制备关键技术和设备
  • 2025年十大好用网盘推荐:功能、口碑与性价比大对比
  • 卡特兰数
  • Word-通过宏格式化文档中的表格和图片
  • 反向代理 traefik - 健康检查
  • 一些想法 - CelestialZ
  • 使用 Ansible 批量安装 Docker
  • 编程规范---日志规范