一、存在则忽略(只插入全新用户)
from clickhouse_driver import Client import pandas as pd client = Client(host='localhost', port=9000, database='default')# 0) 待写入的新数据 df_new = pd.DataFrame({ 'user_id': [1, 2, 3, 4], 'balance': [100, 200, 300, 400], 'snap_date': pd.to_datetime(['2025-10-17'] * 4) })# 1) 正式表 CREATE TABLE IF NOT EXISTS balance_ignore ( user_id UInt32, balance Float64, snap_date Date ) ENGINE = MergeTree() ORDER BY user_id; -- 主键# 2) 落地临时表 client.execute('DROP TABLE IF EXISTS tmp_balance') client.execute('CREATE TABLE tmp_balance AS balance_ignore ENGINE = Memory') client.insert_dataframe('INSERT INTO tmp_balance VALUES', df_new)# 3) + 4) 反连接后插入 client.execute(''' INSERT INTO balance_ignore SELECT t.* FROM tmp_balance AS t LEFT JOIN balance_ignore AS u USING (user_id) WHERE u.user_id = 0 -- 0 表示“找不到”,ClickHouse 反连接特征值 ''')# 5) 验证 print(client.query_dataframe('SELECT * FROM balance_ignore ORDER BY user_id'))
二、存在则覆盖(Insert-or-replace)
思路
① 把表改成 ReplacingMergeTree
(相同主键只保留最后一条)
② 无脑 INSERT
即可,无需任何过滤
③ 查询时加 FINAL
或再 GROUP BY
拿到“最新版本”
# -- 1) 覆盖型表 CREATE TABLE IF NOT EXISTS balance_replace (user_id UInt32,balance Float64,snap_date Date ) ENGINE = ReplacingMergeTree() -- 关键:按 ORDER BY 去重 ORDER BY user_id; # 2) 第一次写入 client.insert_dataframe('INSERT INTO balance_replace VALUES', df_new)# 3) 模拟“余额变更”再写一次(user_id=1,2 余额变了) df_update = pd.DataFrame({'user_id': [1, 2],'balance': [999, 888],'snap_date': pd.to_datetime(['2025-10-18'] * 2) }) client.insert_dataframe('INSERT INTO balance_replace VALUES', df_update)