Problem Description
- In a Flink streaming job, the same data stream needs to be written to multiple sinks.
Problem Analysis
Flink Side Outputs
- When processing a data stream, Flink provides a mechanism called Side Output that can split the main data stream into several additional output streams.
This mechanism is very useful when handling different kinds of data, because it avoids the performance cost of duplicating the stream multiple times.
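A side output is addressed through an `OutputTag`, which must carry its element type so that it survives Java type erasure. A minimal sketch (class name is illustrative) of the two common ways to create one:

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.OutputTag;

public class OutputTagDemo {
    public static void main(String[] args) {
        // Option 1: anonymous subclass, so the generic type is preserved at runtime
        OutputTag<String> tagA = new OutputTag<String>("late-data") { };

        // Option 2: pass the element's TypeInformation explicitly
        OutputTag<String> tagB = new OutputTag<>("late-data",
                TypeInformation.of(new TypeHint<String>() { }));

        // Both tags are identified by the same id string
        System.out.println(tagA.getId());
        System.out.println(tagB.getId());
    }
}
```

Tags with the same id and type refer to the same side output, so either construction style works; the example job below uses the explicit `TypeInformation` form.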
- Use cases
Side outputs serve two main purposes:
- Splitting and filtering: separate different kinds of records in the source data for independent processing. For example, orders can be routed out of the main stream by price tier.
- Late data handling: in windowed computations, capture records that arrive after their window has fired, so they are not silently dropped.
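For the second use case, the window API can divert late records to a side output via `sideOutputLateData`. A minimal sketch (class name and sample data are illustrative, not from the original job):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateDataDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (orderId, amount, eventTimestampMillis); the last record is out of order
        DataStream<Tuple3<String, Integer, Long>> orders = env
                .fromElements(
                        Tuple3.of("A", 10, 1_000L),
                        Tuple3.of("A", 20, 70_000L),
                        Tuple3.of("A", 30, 2_000L))  // arrives after records from a later window
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((t, ts) -> t.f2));

        OutputTag<Tuple3<String, Integer, Long>> lateTag =
                new OutputTag<Tuple3<String, Integer, Long>>("late-orders") { };

        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> sums = orders
                .keyBy(t -> t.f0)
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .sideOutputLateData(lateTag)  // records whose window already fired go here
                .sum(1);

        sums.print("on-time");
        sums.getSideOutput(lateTag).print("late");

        env.execute("LateDataDemo");
    }
}
```

Without `sideOutputLateData`, records that arrive after the watermark has passed their window (and any allowed lateness) are simply dropped; with it, they remain available on the tagged side output for reprocessing.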
Solution
Example
import com.alibaba.fastjson2.JSON;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @description Verifies sending the same data stream to multiple sinks (including via a side output).
 */
public class FlinkJobDemo {
    private static final Logger log = LoggerFactory.getLogger(FlinkJobDemo.class);

    public static final String JOB_NAME = "DemoJob";

    public static void main(String[] args) throws Exception {
        // `new StreamExecutionEnvironment()` would be wrong here; it fails at runtime with:
        // `NullPointerException: No execution.target specified in your configuration file.`
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set the runtime mode: STREAMING, BATCH, or AUTOMATIC
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // Optionally make the configuration available as global job parameters
        //environment.getConfig().setGlobalJobParameters(jobParameterTool);

        // Load the data source
        DataStreamSource<String> dataStreamSource =
                environment.fromElements("Hello", "World", "Bdp");

        OutputTag<String> thirdSideOutput = new OutputTag<>("ThirdSideOutput",
                TypeInformation.of(new TypeHint<String>() { }));

        // Sink 1 (main stream)
        dataStreamSource
                .keyBy(in -> in)
                .map(in -> in.toLowerCase())
                .addSink(new RichSinkFunction<String>() {
                    @Override
                    public void open(Configuration parameters) throws Exception { }

                    @Override
                    public void invoke(String input, Context context) throws Exception {
                        System.out.println(String.format("<sink:1> ts:%d, input:%s",
                                System.currentTimeMillis(), JSON.toJSONString(input)));
                    }
                });

        SingleOutputStreamOperator<String> dataStreamSource2 = dataStreamSource
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, ProcessFunction<String, String>.Context context,
                                               Collector<String> collector) throws Exception {
                        collector.collect(value);                // emit to the main stream
                        context.output(thirdSideOutput, value);  // emit to the side output
                    }
                });

        // Sink 2 (main stream)
        dataStreamSource2.addSink(new RichSinkFunction<String>() {
            @Override
            public void open(Configuration parameters) throws Exception { }

            @Override
            public void invoke(String input, Context context) throws Exception {
                System.out.println(String.format("<sink:2> ts:%d, input:%s",
                        System.currentTimeMillis(), JSON.toJSONString(input)));
            }
        });

        // Sink 3 (side output): getSideOutput must be called on dataStreamSource2,
        // the operator that emitted the tag — not on dataStreamSource
        dataStreamSource2
                .getSideOutput(thirdSideOutput)
                .keyBy(in -> in)
                .addSink(new RichSinkFunction<String>() {
                    @Override
                    public void open(Configuration parameters) throws Exception { }

                    @Override
                    public void invoke(String input, Context context) throws Exception {
                        System.out.println(String.format("<sink:3> ts:%d, input:%s",
                                System.currentTimeMillis(), JSON.toJSONString(input)));
                    }
                });

        environment.execute(JOB_NAME);
    }
}
Output:
<sink:2> ts:1757422085116, input:"Hello"
<sink:2> ts:1757422085116, input:"Bdp"
<sink:2> ts:1757422085116, input:"World"
<sink:1> ts:1757422085048, input:"hello"
<sink:1> ts:1757422085048, input:"bdp"
<sink:1> ts:1757422085048, input:"world"
<sink:3> ts:1757422085623, input:"Hello"
<sink:3> ts:1757422085650, input:"World"
<sink:3> ts:1757422085724, input:"Bdp"