当前位置: 首页 > news >正文

从零开始学Flink:数据转换的艺术

在实时数据处理流程中,数据转换(Transformation)是连接数据源与输出结果的桥梁,也是体现计算逻辑的核心环节。Flink提供了丰富的数据转换操作,让开发者能够灵活地对数据流进行各种处理和分析。本文将以Flink DataStream API为核心,带你探索Flink数据转换的精妙世界,并结合之前文章中的Kafka Source实现一个完整的数据处理流程。

一、数据转换概览

数据转换是指将原始输入数据通过一系列操作转换为所需输出结果的过程。在Flink中,这些操作主要分为以下几类:

  • 基本转换:如映射(Map)、过滤(Filter)、扁平映射(FlatMap)等
  • 键控转换:如分组(KeyBy)、聚合(Reduce、Aggregate)等
  • 多流转换:如联合(Union)、连接(Join)、拆分(Split)等
  • 状态转换:如键控状态(Keyed State)、算子状态(Operator State)等

这些转换操作就像数据的"加工厂",让原始数据经过一系列"工序"后,变成有价值的信息产品。

二、环境准备与依赖配置

为了演示数据转换,我们将继续使用之前文章中的Kafka Source环境。如果您已经完成了《从零开始学Flink:数据源》中的环境搭建,可以直接使用现有配置;如果还没有,请先参考该文章完成环境准备。

1. 版本说明

  • Flink:1.20.1
  • Kafka:3.4.0
  • JDK:17+
  • gradle 8.3+

2. 核心依赖

除了基础的Flink和Kafka依赖外,我们在本文中将引入一些额外的依赖来支持更丰富的数据处理场景:

dependencies {// Flink核心依赖implementation 'org.apache.flink:flink-java:1.20.1'implementation 'org.apache.flink:flink-streaming-java_2.12:1.20.1'// Flink Kafka Connectorimplementation 'org.apache.flink:flink-connector-kafka_2.12:1.20.1'// 日志依赖implementation 'org.apache.logging.log4j:log4j-api:2.17.1'implementation 'org.apache.logging.log4j:log4j-core:2.17.1'implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.1'// JSON处理库(用于处理JSON格式数据)implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2'implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2'
}

三、基本转换操作

基本转换是Flink中最常用、最简单的数据转换操作,它们对数据流中的每个元素进行独立处理,不涉及状态管理。

1. 映射(Map)

Map操作将输入流中的每个元素转换为另一个元素。例如,将字符串转换为大写:

// 从Kafka读取字符串数据
DataStream<String> kafkaStream = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(), "Kafka Source"
);// 使用Map将字符串转换为大写
DataStream<String> upperCaseStream = kafkaStream.map(s -> s.toUpperCase());upperCaseStream.print("UppercaseData");

2. 过滤(Filter)

Filter操作根据条件过滤掉不需要的元素,只保留满足条件的元素:

// 过滤出包含"flink"关键词的消息
DataStream<String> filteredStream = kafkaStream.filter(s -> s.toLowerCase().contains("flink"));filteredStream.print("FilteredData");

3. 扁平映射(FlatMap)

FlatMap操作类似于Map,但它可以将一个元素转换为零个、一个或多个元素,常用于数据拆分场景:

// 将每行文本拆分为单词
DataStream<String> wordStream = kafkaStream.flatMap((String value, Collector<String> out) -> {// 按空格拆分字符串String[] words = value.split(" ");// 将每个单词发送到输出流for (String word : words) {out.collect(word);}
});wordStream.print("WordData");

四、键控转换操作

键控转换是基于键(Key)对数据进行分组和聚合的操作,是实现复杂业务逻辑的基础。

1. 分组(KeyBy)

KeyBy操作根据指定的键将数据流划分为不同的分区,具有相同键的元素将被发送到同一个分区进行处理:

// 假设我们的Kafka消息格式为"userId:message"
// 先将消息拆分为用户ID和消息内容
DataStream<Tuple2<String, String>> userMessageStream = kafkaStream.flatMap((String value, Collector<Tuple2<String, String>> out) -> {if (value.contains(":")) {String[] parts = value.split(":", 2);if (parts.length == 2) {out.collect(new Tuple2<>(parts[0], parts[1]));}}
});// 按键分组(这里以用户ID为键)
KeyedStream<Tuple2<String, String>, String> keyedStream = userMessageStream.keyBy(tuple -> tuple.f0);

2. 聚合(Reduce)

Reduce操作对KeyedStream进行聚合,常用于计算总和、最大值等:

// 假设我们的消息格式为"userId:count",其中count是数字
// 先将消息转换为(userId, count)元组
DataStream<Tuple2<String, Integer>> userCountStream = kafkaStream.flatMap((String value, Collector<Tuple2<String, Integer>> out) -> {if (value.contains(":")) {String[] parts = value.split(":");if (parts.length == 2) {try {int count = Integer.parseInt(parts[1]);out.collect(new Tuple2<>(parts[0], count));} catch (NumberFormatException e) {// 处理格式错误LOG.warn("Invalid number format: {}", parts[1]);}}}
});// 按键分组
KeyedStream<Tuple2<String, Integer>, String> keyedCountStream = userCountStream.keyBy(tuple -> tuple.f0);// 使用Reduce计算每个用户的总计数
DataStream<Tuple2<String, Integer>> sumStream = keyedCountStream.reduce((value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1)
);sumStream.print("SumData");

3. 自定义聚合(Aggregate)

对于更复杂的聚合需求,可以使用Aggregate操作,它提供了更灵活的聚合方式:

// 计算每个用户消息的平均值长度
DataStream<Tuple2<String, Double>> avgLengthStream = keyedStream.aggregate(new AggregateFunction<Tuple2<String, String>, Tuple2<Integer, Integer>, Double>() {// 创建初始累加器@Overridepublic Tuple2<Integer, Integer> createAccumulator() {return new Tuple2<>(0, 0); // (总长度, 消息数量)}// 将元素添加到累加器@Overridepublic Tuple2<Integer, Integer> add(Tuple2<String, String> value, Tuple2<Integer, Integer> accumulator) {return new Tuple2<>(accumulator.f0 + value.f1.length(), accumulator.f1 + 1);}// 获取聚合结果@Overridepublic Double getResult(Tuple2<Integer, Integer> accumulator) {return accumulator.f1 > 0 ? (double) accumulator.f0 / accumulator.f1 : 0;}// 合并累加器(用于并行计算)@Overridepublic Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);}
});avgLengthStream.print("AvgLengthData");

五、多流转换操作

在实际应用中,我们经常需要处理多个数据流。Flink提供了多种多流转换操作,让我们能够灵活地处理复杂的数据场景。

1. 联合(Union)

Union操作可以将多个同类型的数据流合并为一个数据流:

// 假设我们有两个Kafka主题,都产生字符串数据
KafkaSource<String> kafkaSource1 = KafkaSource.<String>builder().setBootstrapServers(kafkaBootstrapServers).setTopics("topic1").setGroupId(consumerGroup).setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)).setStartingOffsets(OffsetsInitializer.earliest()).build();KafkaSource<String> kafkaSource2 = KafkaSource.<String>builder().setBootstrapServers(kafkaBootstrapServers).setTopics("topic2").setGroupId(consumerGroup).setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)).setStartingOffsets(OffsetsInitializer.earliest()).build();// 创建两个数据流
DataStream<String> stream1 = env.fromSource(kafkaSource1, WatermarkStrategy.noWatermarks(), "Kafka Source 1");
DataStream<String> stream2 = env.fromSource(kafkaSource2, WatermarkStrategy.noWatermarks(), "Kafka Source 2");// 合并两个数据流
DataStream<String> unionStream = stream1.union(stream2);unionStream.print("UnionData");

2. 连接(Connect)

Connect操作可以连接两个不同类型的数据流,保留各自的数据类型,适用于需要对不同类型数据进行协同处理的场景:

// 假设我们有一个用户数据流和一个订单数据流
// 用户数据流格式:userId:username
DataStream<Tuple2<String, String>> userStream = kafkaStream1.flatMap((String value, Collector<Tuple2<String, String>> out) -> {if (value.contains(":")) {String[] parts = value.split(":");if (parts.length == 2) {out.collect(new Tuple2<>(parts[0], parts[1]));}}
});// 订单数据流格式:orderId:userId:amount
DataStream<Tuple3<String, String, Double>> orderStream = kafkaStream2.flatMap((String value, Collector<Tuple3<String, String, Double>> out) -> {if (value.contains(":")) {String[] parts = value.split(":");if (parts.length == 3) {try {double amount = Double.parseDouble(parts[2]);out.collect(new Tuple3<>(parts[0], parts[1], amount));} catch (NumberFormatException e) {LOG.warn("Invalid number format: {}", parts[2]);}}}
});// 按键连接两个数据流(这里以用户ID为键)
ConnectedStreams<Tuple2<String, String>, Tuple3<String, String, Double>> connectedStreams = userStream.keyBy(tuple -> tuple.f0).connect(orderStream.keyBy(tuple -> tuple.f1));// 处理连接后的数据流
DataStream<String> resultStream = connectedStreams.map(// 处理用户数据user -> "User: " + user.f1,// 处理订单数据order -> "Order from user " + order.f1 + ", amount: " + order.f2
);resultStream.print("ConnectedData");

六、实战案例:实时日志分析系统

现在,让我们结合之前学到的Kafka Source和本文介绍的数据转换操作,实现一个简单的实时日志分析系统。

1. 需求分析

我们需要从Kafka读取应用程序日志,实时分析日志级别分布、错误日志数量以及按小时统计的日志量。

2. 数据模型

假设我们的日志格式为:timestamp|logLevel|message,例如:2025-09-22 12:30:45|ERROR|Failed to connect to database

3. 完整代码实现

创建一个主类LogAnalysisDemo,用于实现实时日志分析系统的逻辑。

package com.cn.daimajiangxin.flink.transformation;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;public class LogAnalysisDemo {private static final Logger LOG = LoggerFactory.getLogger(LogAnalysisDemo.class);private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");public static void main(String[] args) throws Exception {// 1. 创建Flink流执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 启用检查点env.enableCheckpointing(10000); // 每10秒做一次检查点env.getCheckpointConfig().setCheckpointTimeout(60000); // 检查点超时时间60秒env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); // 检查点之间最小暂停时间env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 最大并发检查点数量// 2. 配置Kafka参数String kafkaBootstrapServers = "172.30.244.152:9092";String topic = "app_logs";String consumerGroup = "flink-log-analysis";// 3. 定义Kafka SourceKafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers(kafkaBootstrapServers).setTopics(topic).setGroupId(consumerGroup).setDeserializer(new KafkaRecordDeserializationSchema<String>() {@Overridepublic void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) throws IOException {String value = new String(record.value(), StandardCharsets.UTF_8);out.collect(value);}@Overridepublic TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);}}).setStartingOffsets(OffsetsInitializer.earliest())// 添加Kafka客户端属性以提高稳定性.setProperty("enable.auto.commit", "false") // 由Flink管理偏移量提交.setProperty("session.timeout.ms", "45000").setProperty("max.poll.interval.ms", "300000").setProperty("heartbeat.interval.ms", "10000").setProperty("retry.backoff.ms", "1000").setProperty("reconnect.backoff.max.ms", "10000").setProperty("reconnect.backoff.ms", "1000").build();// 4. 从Kafka读取数据DataStream<String> logStream = env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"Kafka Log Source");// 5. 解析日志数据DataStream<LogEntry> parsedLogStream = logStream.flatMap(new FlatMapFunction<String, LogEntry>() {@Overridepublic void flatMap(String value, Collector<LogEntry> out) throws Exception {try {String[] parts = value.split("\\|", 3);if (parts.length == 3) {Date timestamp = DATE_FORMAT.parse(parts[0]);String logLevel = parts[1];String message = parts[2];LogEntry entry = new LogEntry(timestamp, logLevel, message);LOG.info("Parsed log entry: {}", entry);out.collect(entry);} else {LOG.warn("Failed to parse log entry (wrong part count): {}", value);}} catch (ParseException e) {LOG.warn("Failed to parse log entry: {}", value, e);} catch (Exception e) {LOG.error("Unexpected error while parsing log entry: {}", value, e);}}});// 6. 统计日志级别分布KeyedStream<LogEntry, String> levelKeyedStream = parsedLogStream.keyBy(entry -> entry.getLogLevel());DataStream<Tuple2<String, Long>> levelCountStream = levelKeyedStream.window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(1))) // 每1分钟统计一次.aggregate(new AggregateFunction<LogEntry, Long, Long>() {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(LogEntry value, Long accumulator) {return accumulator + 1;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long a, Long b) {return a + b;}},new ProcessWindowFunction<Long, Tuple2<String, Long>, String, TimeWindow>() {@Overridepublic void process(String level, Context context, Iterable<Long> elements, Collector<Tuple2<String, Long>> out) throws Exception {long count = elements.iterator().next();out.collect(new Tuple2<>(level, count));}});levelCountStream.print("LogLevelCount");// 7. 统计错误日志数量DataStream<LogEntry> errorLogStream = parsedLogStream.filter(entry -> entry.getLogLevel().equals("ERROR"));KeyedStream<LogEntry, String> errorKeyedStream = errorLogStream.keyBy(entry -> "ERROR"); // 所有错误日志为同一个键DataStream<Tuple2<String, Long>> errorCountStream = errorKeyedStream.window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(1))).aggregate(new AggregateFunction<LogEntry, Long, Long>() {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(LogEntry value, Long accumulator) {return accumulator + 1;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long a, Long b) {return a + b;}},new ProcessWindowFunction<Long, Tuple2<String, Long>, String, TimeWindow>() {@Overridepublic void process(String key, Context context, Iterable<Long> elements, Collector<Tuple2<String, Long>> out) {long count = elements.iterator().next();out.collect(new Tuple2<>("ERROR_COUNT", count));}});errorCountStream.print("ErrorCount");// 8. 按小时统计日志量DataStream<Tuple2<String, LogEntry>> hourlyLogStream = parsedLogStream.map(new MapFunction<LogEntry, Tuple2<String, LogEntry>>() {@Overridepublic Tuple2<String, LogEntry> map(LogEntry entry) throws Exception {String hourKey = new SimpleDateFormat("yyyy-MM-dd HH").format(entry.getTimestamp());return new Tuple2<>(hourKey, entry);}}).returns(new TypeHint<Tuple2<String, LogEntry>>() {});KeyedStream<Tuple2<String, LogEntry>, String> hourlyKeyedStream = hourlyLogStream.keyBy(tuple -> tuple.f0);DataStream<Tuple3<String, Long, Long>> hourlyCountStream = hourlyKeyedStream.window(TumblingProcessingTimeWindows.of(Duration.ofHours(1))).aggregate(new AggregateFunction<Tuple2<String, LogEntry>, Long, Long>() {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(Tuple2<String, LogEntry> value, Long accumulator) {return accumulator + 1;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long a, Long b) {return a + b;}},new ProcessWindowFunction<Long, Tuple3<String, Long, Long>, String, TimeWindow>() {@Overridepublic void process(String hour, Context context, Iterable<Long> elements, Collector<Tuple3<String, Long, Long>> out) {long count = elements.iterator().next();out.collect(new Tuple3<>(hour, count, context.window().getEnd()));}});hourlyCountStream.print("HourlyLogCount");// 9. 启动任务env.execute("Log Analysis Demo");}
}

七、测试与验证

1. 创建测试主题

在Kafka中创建日志主题:

# 创建日志主题
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic app_logs

2. 发送测试数据

使用Kafka生产者发送测试日志数据:

# 启动Kafka生产者
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic app_logs# 输入以下测试数据(每行一条)
2025-10-05 12:30:45|INFO|Application started
2025-10-05 12:31:10|DEBUG|Connecting to database
2025-10-05 12:31:15|ERROR|Failed to connect to database
2025-10-05 12:32:00|INFO|Retry connection to database
2025-10-05 12:32:05|INFO|Database connected successfully
2025-10-05 12:33:20|WARN|Low disk space warning
2025-10-05 12:34:00|ERROR|Out of memory error
2025-10-05 13:00:00|INFO|Daily report generated

3. 运行程序并验证结果

在IDE中运行LogAnalysisDemo类的main方法,观察控制台输出。您应该能看到类似以下的输出:

LogLevelCount: INFO, 4
LogLevelCount: ERROR, 2
LogLevelCount: DEBUG, 1
LogLevelCount: WARN, 1
ErrorCount: ERROR_COUNT, 2
HourlyLogCount: 2025-10-05 12, 7, 1730793600000
HourlyLogCount: 2025-10-05 13, 1, 1730797200000

八、性能优化与最佳实践

1. 并行度调优

合理设置并行度可以充分利用集群资源,提高处理性能:

// 设置全局并行度
env.setParallelism(4);

2. 避免数据倾斜

数据倾斜会导致部分任务处理速度慢,整体性能下降。可以通过以下方式避免:

  • 合理设计键(Key),避免热点键
  • 使用自定义分区器
  • 对倾斜的数据进行预聚合

3. 状态管理

对于有状态的操作,合理管理状态可以提高程序的可靠性和性能:

  • 使用Checkpoint确保状态一致性
  • 对于大状态,考虑使用RocksDB后端
  • 定期清理不需要的状态

九、总结与展望

本文详细介绍了Flink的数据转换操作,包括基本转换、键控转换和多流转换,并结合Kafka Source实现了一个实时日志分析系统。通过这些转换操作,我们可以灵活地处理和分析实时数据流,实现各种复杂的业务需求。

在后续文章中,我们将继续深入探讨Flink的窗口计算、状态管理以及数据输出(Sink)等核心概念,包括各种Sink连接器的使用、输出格式配置、可靠性保证机制等内容,帮助您更全面地掌握Flink的端到端数据处理能力。敬请关注!


源文来自:http://blog.daimajiangxin.com.cn

源码地址:https://gitee.com/daimajiangxin/flink-learning

http://www.hskmm.com/?act=detail&tid=14468

相关文章:

  • java创建线程池去实现某个任务(多线程)
  • 20250827_黔西南网信杯_丢失的数据
  • 敏捷已死?2025年项目管理软件支持的混合管理模式正成为新主流!
  • 螺旋矩阵-leetcode
  • 【第十一章】Python 调用 MySQL 全面指南:从基础到实践​ - 实践
  • 开源中国社区:AI驱动下的开发者生态革命
  • 日志清理脚本模板 - 一叶舟
  • 11.备库出现gap处理方法
  • [原创]《C#高级GDI+实战:从零开发一个流程图》第10章:鼠标拖动完成连线、拖动时实时显示半透明虚线连线效果、自定义连接点样式
  • 修改Abp中Auto API Controllers中 默认生成的 Put、Delete请求
  • 电阻-温度数据拟合工具(最小二乘法)
  • delphi clientdataset 中文过滤问题
  • 基于 systemd 的 Go 应用自动化部署完整指南
  • 马来西亚股票数据API对接文档
  • [OpenGL]相机环境
  • 指令流水线的影响因素
  • Gitee本土化创新实践:中国企业研发效能提升的新引擎
  • 画面拼接后推流/64路画面同时拼接到一路流/指定程序窗口采集推流/另一种解决方案
  • Markdown的基本语法
  • 工业级CAD数据优化工具:PiXYZ Studio 2025 图文安装指南
  • BIM建模利器 Tekla Structures 2025 全流程安装指南
  • containerd离线安装
  • (转)使用 Embarcadero Delphi FMX 应用程序实现多点触控
  • 百度云服务ubtuntu安装docker
  • ubuntu安装mysql8并切换数据存储目录
  • WCF-双工通讯
  • 跨网文件安全交换系统:打破数据壁垒的高效之选!
  • 【F#学习】可区分联合 Discriminated Unions
  • Midscene.js - 开源的 AI 操作助手 - 广东靓仔
  • 详细介绍:【Datawhale25年9月组队学习:llm-preview+Task1:大模型介绍与环境配置】