当前位置: 首页 > news >正文

[Flink] Flink 经典场景:数据流输出到多个Sink

需求描述

  • Flink 数据流的处理过程中,需要将同一数据流输出到多个输出器(Sink)。

需求分析

  • 在处理数据流时,Flink 提供了一种称为侧输出流(Side Output)的机制,可以将主数据流分割成多个不同的侧输出流。

这种机制在处理不同类型的数据时非常有用,避免了多次复制数据流带来的性能浪费。

  • 使用场景

侧输出流主要有2个作用:

  • 分隔过滤:将源数据中的不同类型的数据进行分割处理。例如,可以将不同价格类型的订单从主流中分开处理。
  • 延时数据处理:在处理延时窗口计算时,对延时到达的数据进行处理,避免数据丢失。

解决方案

案例示范

import com.alibaba.fastjson2.JSON;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** @description 验证同一数据流,输出到 多个 sink (含: 侧流的方式)*/
public class FlinkJobDemo {private final static Logger log = LoggerFactory.getLogger(FlinkJobDemo.class);public static final String JOB_NAME = "DemoJob";public static void main(String[] args) throws Exception {StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(); // new StreamExecutionEnvironment();————> 错误示范,会报错: `NullPointerException: No execution.target specified in your configuration file.` )// 设置运行模式 | STREAMING, BATCH , AUTOMATICenvironment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//将配置设置成全局变量//environment.getConfig().setGlobalJobParameters(jobParameterTool);//加载数据源DataStreamSource<String> dataStreamSource = environment.fromElements( new String [] { "Hello", "World", "Bdp"});OutputTag< String > thirdSideOutput = new OutputTag<String>("ThirdSideOutput", TypeInformation.of(new TypeHint< String >() {  }));//第1个输出流(主流)dataStreamSource.keyBy( in -> in ).map( in -> { return in.toLowerCase(); } ).addSink(new RichSinkFunction<String>() {@Overridepublic void open(Configuration parameters) throws Exception {}public void invoke(String input, Context context) throws Exception {System.out.println(String.format("<sink:1> ts:%d, input:%s", System.currentTimeMillis(), JSON.toJSONString( input ) ));}});SingleOutputStreamOperator< String > dataStreamSource2 =dataStreamSource.process(new ProcessFunction< String , String>() {@Overridepublic void processElement(String value, ProcessFunction<String, String>.Context context, Collector<String> collector) throws Exception {collector.collect( value );//发送到主流context.output( thirdSideOutput, value );//发送到侧流}});//第2个输出流 (主流)dataStreamSource2.addSink(new RichSinkFunction<String>() {@Overridepublic void open(Configuration parameters) throws Exception {}public void invoke(String input, Context context) throws Exception {System.out.println(String.format("<sink:2> ts:%d, input:%s", System.currentTimeMillis(), JSON.toJSONString( input ) ));}});//第3个输出流 (侧流)dataStreamSource2 //dataStreamSource (x).getSideOutput(thirdSideOutput).keyBy( in -> in ).addSink(new RichSinkFunction<String>() {@Overridepublic void open(Configuration parameters) throws Exception {}public void invoke(String input, Context context) throws Exception {System.out.println(String.format("<sink:3> ts:%d, input:%s", System.currentTimeMillis(), JSON.toJSONString( input ) ));}});environment.execute(JOB_NAME);}
}

输出:

<sink:2> ts:1757422085116, input:"Hello"
<sink:2> ts:1757422085116, input:"Bdp"
<sink:2> ts:1757422085116, input:"World"
<sink:1> ts:1757422085048, input:"hello"
<sink:1> ts:1757422085048, input:"bdp"
<sink:1> ts:1757422085048, input:"world"<sink:3> ts:1757422085623, input:"Hello"
<sink:3> ts:1757422085650, input:"World"
<sink:3> ts:1757422085724, input:"Bdp"

X 参考文献

http://www.hskmm.com/?act=detail&tid=475

相关文章:

  • 都江堰操作系统
  • [OLAP/Doris] Doris 之表设计
  • cmov用法一例
  • 20250909 之所思 - 人生如梦
  • 认识人工智能-基础认知
  • Codeforces Round 1049 (Div. 2)(A~D)
  • 苹果im虚拟机协议群发系统,苹果imessage推信软件,苹果iMessage自动群发协议–持续更新中...
  • 【ChipIntelli 系列】SDK详解4——Makefile 设置 单SDK多工程文件夹实现方法
  • Codeforces Round 1049 (Div. 2)
  • 课前问题思考1
  • huggingface
  • 安全不是一个功能-而是一个地基
  • Python基础-27 match-case 使用教程
  • 从0到1实现Transformer模型-CS336作业1
  • 准备工作之结构体[基于郝斌课程]
  • 软工课程第一次作业
  • java学习起航喽
  • 初始化树莓派(Raspberry Pi)系统并以 ssh 连接教程(只需读卡器、手机开热点,无需显示器) - tsunchi
  • 从windows 自动进入BIOS
  • 软件工程第一次作业
  • Morpheus 审计报告分享:AAVE 项目 Pool 合约地址更新导致的组合性风险
  • Offer发放革命:Moka软件如何将平均入职转化率提升25%
  • U3D动作游戏开发读书笔记--2.1一些通用的预备知识
  • 常见的一些Dos命令
  • AUC和ROC
  • CF
  • Word中VBA提取人名所在的页码
  • Ubuntu 安装 VSCode
  • A
  • ARC