当前位置: 首页 > news >正文

图思维胜过链式思维:JGraphlet构建任务流水线的八大核心原则

图思维胜过链式思维:JGraphlet构建任务流水线 🚀

JGraphlet是一个轻量级、零依赖的Java库,用于构建任务流水线。它的强大之处不在于冗长的功能列表,而在于一小套协同工作的核心设计原则。

JGraphlet的核心是简洁性,基于图模型构建。向流水线添加任务并连接它们以创建图。
每个任务都有输入和输出,TaskPipeline构建并执行流水线,同时管理每个任务的I/O。例如,使用Map进行扇入操作,使用Record定义自定义数据模型等。TaskPipeline还包含PipelineContext在任务间共享数据,此外任务还可以被缓存,避免重复计算。

您可以自定义任务流水线的流程,并选择使用SyncTask或AsyncTask。默认情况下所有任务都是异步的。

1. 图优先执行模型

JGraphlet将工作流视为有向无环图。您将任务定义为节点,并显式绘制它们之间的连接。这使得扇出和扇入等复杂模式变得自然。

import dev.shaaf.jgraphlet.*;
import java.util.Map;
import java.util.concurrent.CompletableFuture;try (TaskPipeline pipeline = new TaskPipeline()) {Task<String, String> fetchInfo = (id, ctx) -> CompletableFuture.supplyAsync(() -> "Info for " + id);Task<String, String> fetchFeed = (id, ctx) -> CompletableFuture.supplyAsync(() -> "Feed for " + id);Task<Map<String, Object>, String> combine = (inputs, ctx) -> CompletableFuture.supplyAsync(() ->inputs.get("infoNode") + " | " + inputs.get("feedNode"));pipeline.addTask("infoNode", fetchInfo).addTask("feedNode", fetchFeed).addTask("summaryNode", combine);pipeline.connect("infoNode", "summaryNode").connect("feedNode", "summaryNode");String result = (String) pipeline.run("user123").join();System.out.println(result); // "Info for user123 | Feed for user123"
}

2. 两种任务风格:Task<I,O>和SyncTask<I,O>

JGraphlet提供两种可混合使用的任务类型:

  • Task<I, O>(异步):返回CompletableFuture,适合I/O操作或繁重计算
  • SyncTask<I, O>(同步):直接返回O,适合快速的CPU密集型操作
try (TaskPipeline pipeline = new TaskPipeline()) {Task<String, String> fetchName = (userId, ctx) ->CompletableFuture.supplyAsync(() -> "John Doe");SyncTask<String, String> toUpper = (name, ctx) -> name.toUpperCase();pipeline.add("fetch", fetchName).then("transform", toUpper);String result = (String) pipeline.run("user-42").join();System.out.println(result); // "JOHN DOE"
}

3. 简单显式的API

JGraphlet避免复杂的构建器或魔法配置,API简洁明了:

  • 创建流水线:new TaskPipeline()
  • 注册节点:addTask("uniqueId", task)
  • 连接节点:connect("fromId", "toId")
try (TaskPipeline pipeline = new TaskPipeline()) {SyncTask<String, Integer> lengthTask = (s, c) -> s.length();SyncTask<Integer, String> formatTask = (i, c) -> "Length is " + i;pipeline.addTask("calculateLength", lengthTask);pipeline.addTask("formatOutput", formatTask);pipeline.connect("calculateLength", "formatOutput");String result = (String) pipeline.run("Hello").join();System.out.println(result); // "Length is 5"
}

4. 清晰的扇入输入形状

扇入任务接收Map<String, Object>,其中键是父任务ID,值是它们的结果。

try (TaskPipeline pipeline = new TaskPipeline()) {SyncTask<String, String> fetchUser = (id, ctx) -> "User: " + id;SyncTask<String, String> fetchPerms = (id, ctx) -> "Role: admin";Task<Map<String, Object>, String> combine = (inputs, ctx) -> CompletableFuture.supplyAsync(() -> {String userData = (String) inputs.get("userNode");String permsData = (String) inputs.get("permsNode");return userData + " (" + permsData + ")";});pipeline.addTask("userNode", fetchUser).addTask("permsNode", fetchPerms).addTask("combiner", combine);pipeline.connect("userNode", "combiner").connect("permsNode", "combiner");String result = (String) pipeline.run("user-1").join();System.out.println(result); // "User: user-1 (Role: admin)"
}

5. 清晰的运行契约

执行流水线很简单:pipeline.run(input)返回最终结果的CompletableFuture。您可以使用.join()阻塞或使用异步链式调用。

String input = "my-data";// 阻塞方式
try {String result = (String) pipeline.run(input).join();System.out.println("Result (blocking): " + result);
} catch (Exception e) {System.err.println("Pipeline failed: " + e.getMessage());
}// 非阻塞方式
pipeline.run(input).thenAccept(result -> System.out.println("Result (non-blocking): " + result)).exceptionally(ex -> {System.err.println("Async pipeline failed: " + ex.getMessage());return null;});

6. 内置资源生命周期

JGraphlet实现AutoCloseable。使用try-with-resources保证内部资源的安全关闭。

try (TaskPipeline pipeline = new TaskPipeline()) {pipeline.add("taskA", new SyncTask<String, String>() {@Overridepublic String executeSync(String input, PipelineContext context) {if (input == null) {throw new IllegalArgumentException("Input cannot be null");}return "Processed: " + input;}});pipeline.run("data").join();} // pipeline.shutdown()自动调用
System.out.println("Pipeline resources have been released.");

7. 上下文

PipelineContext是线程安全的、每次运行的工作空间,用于存储元数据。

SyncTask<String, String> taskA = (input, ctx) -> {ctx.put("requestID", "xyz-123");return input;
};
SyncTask<String, String> taskB = (input, ctx) -> {String reqId = ctx.get("requestID", String.class).orElse("unknown");return "Processed input " + input + " for request: " + reqId;
};

8. 可选缓存

任务可以选择加入缓存以避免重复计算。

Task<String, String> expensiveApiCall = new Task<>() {@Overridepublic CompletableFuture<String> execute(String input, PipelineContext context) {System.out.println("Performing expensive call for: " + input);return CompletableFuture.completedFuture("Data for " + input);}@Overridepublic boolean isCacheable() { return true; }
};try (TaskPipeline pipeline = new TaskPipeline()) {pipeline.add("expensive", expensiveApiCall);System.out.println("First call...");pipeline.run("same-key").join();System.out.println("Second call...");pipeline.run("same-key").join(); // 结果来自缓存
}

最终结果是提供了一种清晰、可测试的方式来编排同步或异步任务,用于组合复杂流程,如并行检索、合并、判断和防护栏,而无需引入重量级的工作流引擎。

了解更多或尝试使用:

  • Maven中央仓库
  • Github仓库
    更多精彩内容 请关注我的个人公众号 公众号(办公AI智能小助手)
    公众号二维码
http://www.hskmm.com/?act=detail&tid=16640

相关文章:

  • 两月九城,纷享销客渠道携手伙伴共创CRM新纪元
  • markdown
  • mstsc带用户名密码自动登录
  • Sql Server Begin TRY sample
  • 基于数据平台构建供应链协同体系,实现业务全链路可视化与智能决策
  • 字节二面挂!面试官追问 Redis 内存淘汰策略 LRU 和传统 LRU 差异,我答懵了
  • UPX压缩工具的用法
  • NM:微生物组数据分析的规划与描述 - 详解
  • 300、金陵图
  • 山东布谷鸟科技:助力教育培训软件开发数字化转型与高效管理
  • 云边云科技4G路由器:连锁门店智慧联网的可靠基石 - 教程
  • PHP 8.5 升级指南 了解即将废弃的 11 个功能和完整迁移方案
  • 普科科技PKR26-3.5M3.5F-1M射频线缆在天线测试中的应用案例​
  • 基于Python+Vue开发的民宿客房预订管理系统源码+运行步骤
  • C#加解密:从入门到放弃?不,是到实战!
  • js react antd 实现页面低分变率和高分变率下字体大小自适用,主要是配置antd
  • C. Strange Function
  • 剑指offer-33、丑数
  • C#操作Excel核心要点:告别手动,拥抱自动化
  • 250925
  • 云平台qcow2镜像的制作
  • 介绍
  • 鸿蒙应用开发从入门到实战(十四):ArkUI组件ColumnRow线性布局
  • 【日记】被迫学习换锁(856 字)
  • 仿生视觉芯片迈向实用化:《Advanced Science》报道双极性宽谱光电晶体管,赋能自动驾驶与机器感知 - 教程
  • 详细介绍:2026毕设-基于Spring Boot的在线海鲜市场交易平台的设计与实现
  • 【源码解读之 Mybatis】【基础篇】-- 第3篇:SqlSession的创建与生命周期
  • AI智慧:于来路与关山之间,活在当下
  • 基于Qt和FFmpeg的安卓监控模拟器/手机摄像头模拟成onvif和28181设备
  • 详细介绍:Flink 2.x 独立集群(Standalone) 的部署