代码:
package com.yourcompany.flink;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
 * Minimal Flink job skeleton: all code lives in this single file.
 *
 * <p>The target environment is taken from the {@code env} JVM system property and selects the
 * classpath resource {@code config/application-<env>.properties}. Parallelism is resolved from
 * the {@code parallelism} JVM system property if set, otherwise from the same key in the loaded
 * configuration, otherwise {@code 1}.
 *
 * <p>Every configuration error is reported on stderr and terminates the JVM with exit status 1.
 */
public class MinimalFlinkJob {

    private static final String ENV_PROPERTY = "env";
    private static final String PARALLELISM_KEY = "parallelism";
    private static final String DEFAULT_PARALLELISM = "1";

    /**
     * Entry point: validates the environment, loads its configuration and submits the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if Flink fails to execute the job
     */
    public static void main(String[] args) throws Exception {
        // 1. Check the environment parameter (a blank value is treated like a missing one).
        String env = System.getProperty(ENV_PROPERTY);
        if (env == null || env.trim().isEmpty()) {
            System.err.println("错误: 使用-Denv=dev|test|prod");
            System.exit(1);
        }

        // 2. Load the environment-specific configuration.
        Properties config = loadConfig(env);

        // 3. Create the Flink execution environment.
        StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        flinkEnv.setParallelism(resolveParallelism(config));

        // NOTE(review): no source/transformation/sink is registered on flinkEnv in this file;
        // Flink is expected to reject an empty topology in execute() — define the pipeline here.

        // 4. Execute the job.
        System.out.println("启动Flink作业,环境: " + env);
        flinkEnv.execute("Job-" + env);
    }

    /**
     * Loads {@code config/application-<env>.properties} from the classpath.
     *
     * <p>Exits the JVM with status 1 if the resource is missing or cannot be parsed.
     *
     * @param env environment name used to build the resource path
     * @return the loaded properties
     */
    private static Properties loadConfig(String env) {
        String resource = "config/application-" + env + ".properties";
        Properties props = new Properties();
        // try-with-resources closes the stream; a null resource is simply skipped on close.
        try (InputStream in = MinimalFlinkJob.class.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) {
                // getResourceAsStream signals "not found" with null rather than an exception.
                System.err.println("加载配置失败: 找不到资源 " + resource);
                System.exit(1);
            }
            props.load(in);
        } catch (IOException | IllegalArgumentException e) {
            // IllegalArgumentException: Properties.load rejects malformed Unicode escapes.
            System.err.println("加载配置失败: " + resource + ": " + e.getMessage());
            System.exit(1);
        }
        return props;
    }

    /**
     * Resolves the job parallelism: the {@code parallelism} system property wins over the
     * configuration file, which wins over the default of {@code 1}.
     *
     * <p>Exits the JVM with status 1 if the value is not an integer. Range validation is left
     * to {@code setParallelism}.
     *
     * @param config configuration loaded for the current environment
     * @return the parsed parallelism value
     */
    private static int resolveParallelism(Properties config) {
        String fromConfig = config.getProperty(PARALLELISM_KEY, DEFAULT_PARALLELISM);
        String raw = System.getProperty(PARALLELISM_KEY, fromConfig).trim();
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            System.err.println("错误: parallelism 不是有效的整数: " + raw);
            System.exit(1);
            return Integer.parseInt(DEFAULT_PARALLELISM); // unreachable; satisfies the compiler
        }
    }
}
部署时使用参数:
# Submit to the Flink cluster.
# NOTE(review): `flink run -D<key>=<value>` is documented as a Flink configuration option, not a
# JVM system property, so System.getProperty("env") in the job may not see it — confirm, and if
# so pass the value through the client JVM options (e.g. env.java.opts.client) instead.
flink run -d \
-c com.yourcompany.flink.MinimalFlinkJob \
-Denv=prod \
-Dparallelism=8 \
your-job.jar