📡 第四步:部署 Fluent Bit Kafka → Elasticsearch(消费端)
在第三步我们实现了日志采集端的 DaemonSet 部署,将日志采集并推送到 Apache Kafka。
这一阶段的目标是:
✅ 订阅 Kafka 中的日志 → 解析 payload → 清洗处理 → 写入 Elasticsearch。
🧾 1. ConfigMap 配置详解
📜 主配置 fluent-bit.conf
[SERVICE]Flush 1Log_Level debugDaemon OffParsers_File /fluent-bit/etc/parser-payload.conf@INCLUDE input-kafka.conf
@INCLUDE filter-unpack-payload.conf
@INCLUDE output-es.conf
⚠️ 注意:
Flush
控制数据刷新的时间(秒),1
表示实时性较高。Log_Level
建议调试期使用debug
,生产环境可改为info
。Parsers_File
指定了解析器配置路径。
📨 输入配置 input-kafka.conf
[INPUT]Name kafkaBrokers 172.22.162.152:31092Topics k8s-pod-logsGroup_Id fluentbit-esrdkafka.enable.auto.commit truerdkafka.socket.keepalive.enable truerdkafka.queued.min.messages 100000rdkafka.fetch.wait.max.ms 50
字段 | 说明 | 注意事项 |
---|---|---|
Name |
指定输入插件类型,这里为 kafka | 必须启用 Fluent Bit 的 Kafka 插件 |
Brokers |
Kafka 地址 | ⚠️ 确保网络连通性和防火墙开放 |
Topics |
订阅的 Kafka Topic | 必须与采集端推送的 Topic 一致 |
Group_Id |
消费组 | 用于 Kafka 消费管理 |
auto.commit |
自动提交 offset | 建议开启,防止重复消费 |
⚠️ 建议
Group_Id
合理命名并固定,避免重启后重新消费旧日志。
🧰 解析配置 parser-payload.conf
[PARSER]Name json_payloadFormat jsonTime_Key @timestampTime_Format %Y-%m-%dT%H:%M:%S.%LZ
用于解析日志中的 payload 字段,将其结构化,便于后续过滤和输出。
🧪 过滤配置 filter-unpack-payload.conf
[FILTER]Name parserMatch *Key_Name payloadParser json_payloadReserve_Data TruePreserve_Key False[FILTER]Name modifyMatch *Remove topicRemove partitionRemove offsetRemove key[FILTER]Name modifyMatch *Add source_file /workspace/stdout/stdout.log
功能 | 说明 |
---|---|
parser |
解包 payload JSON |
modify |
删除不必要字段,减少存储冗余 |
add |
增加字段,便于溯源 |
⚠️ 这一步非常关键,否则写入 Elasticsearch 时字段可能混乱或冗余。
📤 输出配置 output-es.conf
[OUTPUT]Name esMatch *Host 172.22.162.154Port 9200HTTP_User elasticHTTP_Passwd qhPR1mAZzcIndex assignment_logLogstash_Format OffReplace_Dots OnTime_Key @timestampSuppress_Type_Name OnRetry_Limit False
字段 | 说明 | 注意事项 |
---|---|---|
Name |
输出插件,这里为 es |
需要支持 Elasticsearch 输出插件 |
Host / Port |
ES 地址 | 确保连通性 |
HTTP_User / Passwd |
认证信息 | ⚠️ 建议使用安全凭证管理(Secret) |
Index |
写入的索引名称 | 可用动态变量构造(如 %Y.%m.%d ) |
Replace_Dots |
替换字段名中的 . |
避免 ES 索引报错 |
Suppress_Type_Name |
禁用 _type |
符合 ES 7+ 规范 |
📦 2. Deployment 部署配置
apiVersion: apps/v1
kind: Deployment
metadata:name: fluent-bit-kafkanamespace: logginglabels:app: fluent-bit-kafka
spec:replicas: 1selector:matchLabels:app: fluent-bit-kafkatemplate:metadata:labels:app: fluent-bit-kafkaspec:serviceAccountName: fluent-bitcontainers:- name: fluent-bitimage: 172.22.162.152:30020/library/fluent-bit-kylin:4.0.1imagePullPolicy: IfNotPresentvolumeMounts:- name: configmountPath: /fluent-bit/etc/volumes:- name: configconfigMap:name: fluent-bit-kafka-config
⚠️ 3. 部署与使用注意事项
🧭 命名空间与权限
- 确保
namespace
和serviceAccountName
与前面步骤一致。 - 必须有
ClusterRole
与ClusterRoleBinding
赋权,否则可能访问 API 出错。
🧰 ConfigMap 挂载
volumeMounts
路径必须与 Fluent Bit 镜像的配置路径一致。- 若你自定义了路径,需修改
fluent-bit.conf
的Parsers_File
路径。
🧠 Kafka 与 ES 连接
- 确保 Kafka broker 地址和 topic 配置正确。
- Elasticsearch 必须开放访问权限,账号密码正确。
- 如果 ES 启用了 HTTPS,需要额外配置 TLS。
🧪 调试建议
# 查看 Deployment 状态
kubectl get deploy -n logging# 查看 Pod 是否正常启动
kubectl get pods -n logging -o wide# 查看日志是否消费成功
kubectl logs -n logging -l app=fluent-bit-kafka# 查看索引是否写入
curl -u elastic:qhPR1mAZzc http://172.22.162.154:9200/_cat/indices?v
🧼 4. 常见问题排查
问题 | 可能原因 | 解决方案 |
---|---|---|
Pod CrashLoopBackOff | ConfigMap 路径错误 | 检查 volumeMounts 与文件名 |
日志无法消费 | Kafka 地址或 Topic 错误 | 使用 Kafka CLI 验证 |
写入 ES 报错 | 字段冲突或认证错误 | 检查 filter 配置和账号密码 |
@timestamp 显示不对 | Parser 时间格式错误 | 校对 Time_Format 与实际日志格式 |
📝 小结
✅ 至此,日志采集链路完整如下:
[容器日志] → [Fluent Bit DaemonSet] → [Kafka Topic] → [Fluent Bit Kafka Deployment] → [Elasticsearch 索引]
- 🧠 Kafka 解耦采集与存储
- 🔐 RBAC 确保访问安全
- 🪄 解析与过滤优化 ES 存储
- 🧪 通过
kubectl logs
+curl ES
可快速验证链路是否畅通。