Skip to main content

核心库:概念指南

图(Graphs)

Spring AI Alibaba Graph 将智能体工作流建模为图。您可以使用三个关键组件来定义智能体的行为:

  1. State:共享的数据结构,表示应用程序的当前快照。它由 OverAllState 对象表示。

  2. Nodes:一个函数式接口 (AsyncNodeAction),编码智能体的逻辑。它们接收当前的 State 作为输入,执行一些计算或副作用,并返回更新后的 State

  3. Edges:一个函数式接口 (AsyncEdgeAction),根据当前的 State 确定接下来执行哪个 Node。它们可以是条件分支或固定转换。

通过组合 NodesEdges,您可以创建复杂的循环工作流,随时间演化 State。真正的力量来自于 Spring AI Alibaba 如何管理 State。 需要强调的是:NodesEdges 就像函数一样 - 它们可以包含 LLM 调用或只是 Java 代码。

简而言之:节点完成工作,边决定下一步做什么

StateGraph

StateGraph 类是使用的主要图类。它通过用户定义的状态策略进行参数化。

编译图

要构建您的图,首先定义 state,然后添加 nodesedges,最后编译它。编译图是什么意思?为什么需要编译?

编译是一个非常简单的步骤。它提供了对图结构的一些基本检查(没有孤立节点等)。这也是您可以指定运行时参数(如检查点器和中断点)的地方。您只需调用 .compile() 方法来编译图:

import com.alibaba.cloud.ai.graph.StateGraph;
import com.alibaba.cloud.ai.graph.CompiledGraph;

// 编译您的图
CompiledGraph graph = stateGraph.compile();

在使用图之前,您必须编译它。

The first thing you do when you define a graph is define the State of the graph. The State consists of the schema of the graph as well as reducer functions which specify how to apply updates to the state. The schema of the State will be the input schema to all Nodes and Edges in the graph, and should be defined using a map of [Channel] object. All Nodes will emit updates to the State which are then applied using the specified reducer function.

Schema

State(状态)

The way to specify the schema of a graph is by defining map of [Channel] objects where each key is an item in the state. 定义图时首先要做的是定义图的 StateState图的 schema 以及 reducer 函数组成,reducer 函数指定如何将更新应用于状态。State 的 schema 将是图中所有 NodesEdges 的输入 schema,应使用 KeyStrategyFactory 定义。所有 Nodes 将发出对 State 的更新,然后使用指定的 reducer 函数应用这些更新。

Schema(模式)

在 Spring AI Alibaba 中,可以通过 KeyStrategyFactory 定义状态的更新策略。每个键都可以指定自己的更新策略(如替换或追加)。 如果没有为某个键指定策略,则默认假定该键的所有更新都应覆盖它。 Example A:

Reducers(归约器)

[Reducers][KeyStrategy] 是理解如何将节点的更新应用到 `State` 的关键。`State` 中的每个键都有自己独立的 reducer 函数。如果没有显式指定 reducer 函数,则假定该键的所有更新都应覆盖它。让我们看几个例子来更好地理解它们。

**示例 A:**
"messages", Channels.appender(ArrayList::new)
);
import com.alibaba.cloud.ai.graph.KeyStrategyFactory;
import com.alibaba.cloud.ai.graph.state.strategy.AppendStrategy;

private static KeyStrategyFactory createKeyStrategyFactory() {
return () -> {
Map<String, KeyStrategy> keyStrategyMap = new HashMap<>();
keyStrategyMap.put("messages", new AppendStrategy());
return keyStrategyMap;
};

### AppenderChannel
var graphBuilder = new StateGraph(createKeyStrategyFactory());

<a id="remove-messages"></a>
### AppendStrategy(追加策略)

#### 删除消息

[AppendStrategy] 支持通过 [RemoveByHash] 删除消息。

Langgraph4j provides a Built in [AppederChannel.RemoveIdentifier] named [RemoveByHash] that allow to remove messages comparing their `hashCode`, below an example of its usage:

Spring AI Alibaba 提供了内置的 [RemoveByHash],允许通过比较其 `hashCode` 来删除消息,下面是其用法示例:
var workflow = new StateGraph<>(MessagesState.SCHEMA, MessagesState::new)
.addNode("agent_1", node_async(state -> Map.of("messages", "message1")))
import static com.alibaba.cloud.ai.graph.StateGraph.END;
import static com.alibaba.cloud.ai.graph.StateGraph.START;
import static com.alibaba.cloud.ai.graph.action.AsyncNodeAction.node_async;
import com.alibaba.cloud.ai.graph.state.RemoveByHash;

var workflow = new StateGraph(createKeyStrategyFactory())
.addNode("agent_3", node_async(state ->
Map.of("messages", ReplaceAllWith.of( List.of("a1", "a2"))) // this replace current messages values with ["a1", "a2"]
))
Map.of("messages", RemoveByHash.of("message2.1")) // 从消息值中删除 "message2.1"
.addEdge("agent_2", "agent_3")
.addEdge(START, "agent_1")
.addEdge("agent_3", END);

Custom Reducer

You can also specify a custom reducer for a particular state property

Example B:

static class MyState extends AgentState {

static Map<String, Channel<?>> SCHEMA = Map.of(
"property", Channel.<String>of( ( oldValue, newValue ) -> newValue.toUpperCase() )
);
}

var graphBuilder = new StateGraph<>( MessagesState.SCHEMA, MyState::new)
### 自定义 Reducer

您也可以为特定的状态属性指定自定义的 reducer

Serializer

示例 B: During graph execution the state needs to be serialized (mostly for cloning purpose) also for providing ability to persist the state across different executions. To do this we have provided a new streighforward implementation based on [Serializer] interface.

import com.alibaba.cloud.ai.graph.KeyStrategy;

private static KeyStrategyFactory createCustomKeyStrategyFactory() {
return () -> {
Map<String, KeyStrategy> keyStrategyMap = new HashMap<>();
keyStrategyMap.put("property", (oldValue, newValue) ->
((String) newValue).toUpperCase()
);
return keyStrategyMap;
};
  1. Manage nullable value in serialization process

序列化器(Serializer)

  • Allow to plug also different serialization techniques 在图执行期间,状态需要被序列化(主要用于克隆目的),同时也为跨不同执行持久化状态提供能力。Spring AI Alibaba 提供了基于 [PlainTextStateSerializer] 接口的直接实现。 Currently the main class for state's serialization using built-in java stream is [ObjectStreamStateSerializer]. It is also available an abstraction allowing to plug serialization techniques text based like JSON and/or YAML that is [PlainTextStateSerializer].

为什么需要序列化框架?

There are several provided Serializers out-of-the-box:

  1. 不依赖不安全的标准序列化框架
  2. 允许为第三方(非可序列化)类实现序列化
  3. 尽可能避免类加载问题
  4. 在序列化过程中管理可空值 class | description

特性

MapSerializer | built-in Map<String,Object> serializer

  • 允许使用 Java 内置标准二进制序列化技术进行序列化
  • 允许插入不同的序列化技术 AiMessageSerializer | langchain4j AiMessage Serializer 当前,使用内置 Java 流进行状态序列化的主要类是 [PlainTextStateSerializer]。它支持基于文本的序列化技术,如 JSONYAML
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;

public class State extends AgentState {

public State(Map<String, Object> initData) {
super( initData );
}

Optional<String> input() { return value("input"); }
Optional<String> results() { return value("results"); }

}

AsyncNodeAction<State> myNode = node_async(state -> {
System.out.println( "In myNode: " );
return Map.of( results: "Hello " + state.input().orElse( "" ) );
});

AsyncNodeAction<State> myOtherNode = node_async(state -> state);

节点(Nodes)

var builder = new StateGraph( State::new ) 在 Spring AI Alibaba 中,节点通常是一个函数式接口 ([AsyncNodeAction]),其参数是 state,您可以使用 [addNode] 方法将这些节点添加到图中:

Since [AsyncNodeAction] is designed to work with [CompletableFuture], you can use node_async static method that adapt it to a simpler synchronous scenario.

START Node

import static com.alibaba.cloud.ai.graph.action.AsyncNodeAction.node_async;
import com.alibaba.cloud.ai.graph.StateGraph;
import java.util.Map;

var myNode = node_async(state -> {
System.out.println("In myNode: ");
String input = (String) state.value("input").orElse("");
return Map.of("results", "Hello " + input);
import static org.bsc.langgraph4j.StateGraph.END;

var myOtherNode = node_async(state -> Map.of());
var builder = new StateGraph()
.addNode("myOtherNode", myOtherNode);

Edges

  • Normal Edges: 由于 [AsyncNodeAction] 设计用于与 [CompletableFuture] 一起工作,您可以使用 node_async 静态方法将其适配为更简单的同步场景。
  • Conditional Edges:

    Call a function to determine which node(s) to go to next.

  • Entry Point:

    Which node to call first when user input arrives.

  • Conditional Entry Point:

    Call a function to determine which node(s) to call first when user input arrives.

START 节点

// add a normal edge START 节点是一个特殊节点,表示将用户输入发送到图的节点。引用此节点的主要目的是确定首先应该调用哪些节点。

import static com.alibaba.cloud.ai.graph.StateGraph.START;

Conditional Edges

If you want to optionally route to 1 or more edges (or optionally terminate), you can use the [addConditionalEdges] method. This method accepts the name of a node and a Functional Interface ([AsyncEdgeAction]) that will be used as " routing function" to call after that node is executed:

END 节点

END 节点是一个特殊节点,表示终端节点。当您想要表示哪些边在完成后没有任何操作时,会引用此节点。

边(Edges)

边定义了逻辑如何路由以及图如何决定停止。这是智能体工作方式和不同节点之间如何通信的重要部分。有几种关键类型的边:

  • 普通边(Normal Edges)

    直接从一个节点到下一个节点。

  • 条件边(Conditional Edges)

    调用函数来确定接下来要去哪个节点。

  • 入口点(Entry Point)

    当用户输入到达时首先调用哪个节点。

  • 条件入口点(Conditional Entry Point)

    调用函数来确定当用户输入到达时首先调用哪个节点。

普通边

如果您总是想从节点 A 到节点 B,可以直接使用 [addEdge] 方法。

import static org.bsc.langgraph4j.utils.CollectionsUtils.mapOf;
// 添加普通边
graph.addConditionalEdges(START, routingFunction, Map.of( "first": "nodeB", "second": "nodeC" ) );

You must provide an object that maps the routingFunction's output to the name of the next node.

workflows by allowing humans to inspect, interrupt, and approve steps. Checkpointers are needed for these workflows as the human has to be able to view the state of a graph at any point in time, and the graph has to be to resume execution after the human has made any updates to the state.

import static com.alibaba.cloud.ai.graph.StateGraph.START;

Threads

条件入口点

Threads enable the checkpointing of multiple different runs, making them essential for multi-tenant chat applications and other scenarios where maintaining separate states is necessary. A thread is a unique ID assigned to a series of checkpoints saved by a checkpointer. When using a checkpointer, you must specify a thread_id when running the graph. 条件入口点允许您根据自定义逻辑从不同的节点开始。您可以从虚拟 START 节点使用 [addConditionalEdges] 来实现此目的。 thread_id is simply the ID of a thread. This is always required

import static com.alibaba.cloud.ai.graph.StateGraph.START;
import static com.alibaba.cloud.ai.graph.action.AsyncEdgeAction.edge_async;

graph.addConditionalEdges(START, edge_async(state -> "nodeB"),
Map.of("nodeB", "nodeB", "nodeC", "nodeC"));
var config = RunnableConfig.builder()
.threadId("a")
您必须提供一个对象,将 `routingFunction` 的输出映射到下一个节点的名称。
graph.invoke(inputs, config);

Checkpointer(检查点)

Spring AI Alibaba 具有内置的持久化层,通过 Checkpointers 实现。当您将 checkpointer 与图一起使用时,可以与图的状态进行交互。checkpointer 在每一步保存图状态的_检查点_,实现几个强大的功能:

首先,checkpointers 通过允许人类检查、中断和批准步骤来促进人机协作工作流。这些工作流需要 checkpointers,因为人类必须能够在任何时间点查看图的状态,并且图必须能够在人类对状态进行任何更新后恢复执行。

其次,它允许在交互之间保持"记忆"。您可以使用 checkpointers 创建线程并在图执行后保存线程的状态。在重复的人类交互(如对话)的情况下,任何后续消息都可以发送到该检查点,它将保留之前的记忆。

有关如何向图添加 checkpointer 的信息,请参阅此指南

线程(Threads)

Update state

线程支持对多个不同运行进行检查点,这对于多租户聊天应用程序和其他需要维护独立状态的场景至关重要。线程是分配给 checkpointer 保存的一系列检查点的唯一 ID。使用 checkpointer 时,必须在运行图时指定 thread_id。 You can also interact with the state directly and update it using [graph.updateState(config,values,asNode)]. This takes three different components: thread_id 只是线程的 ID。这始终是必需的。

  • config 在调用图时,您必须将这些作为配置的可配置部分传递。
  • asNode
import com.alibaba.cloud.ai.graph.RunnableConfig;

config

.threadId("a") .build(); values

These are the values that will be used to update the state. Note that this update is treated exactly as any update from a node is treated. This means that these values will be passed to the reducer functions that are part of the state. So this does NOT automatically overwrite the state. 有关如何使用线程的信息,请参阅此指南asNode

Checkpointer 状态

与 checkpointer 状态交互时,必须指定线程标识符。checkpointer 保存的每个检查点都有两个属性: The final thing you specify when calling updateState is asNode. This update will be applied as if it came from node asNode. If asNode is null, it will be set to the last node that updated the state, if not ambiguous.

  • state:这是此时的状态值。
  • nextNodeId:这是图中接下来要执行的节点的标识符。

Configuration

获取状态

您可以通过调用 graph.getState(config) 来获取 checkpointer 的状态。配置应包含 thread_id,并将为该线程获取状态。

const config = { configurable: { llm: "anthropic" }};

获取状态历史

await graph.invoke(inputs, config);

您还可以调用 graph.getStateHistory(config) 来获取图的历史记录列表。配置应包含 thread_id,并将为该线程获取状态历史记录。

You can then access and use this configuration inside a node:

更新状态

您还可以直接与状态交互并使用 graph.updateState(config, values, asNode) 更新它。这需要三个不同的组件:

  const llmType = config?.configurable?.llm;
let llm: BaseChatModel;
if (llmType) {
const llm = getLlm(llmType);
}
...
};

配置应包含指定要更新哪个线程的 thread_id

这些是将用于更新状态的值。请注意,此更新的处理方式与节点的任何更新完全相同。这意味着这些值将传递给作为状态一部分的 reducer 函数。因此,这不会自动覆盖状态。

Breakpoints (AKA interruptions )

调用 updateState 时指定的最后一件事是 asNode。此更新将应用为好像它来自节点 asNode。如果 asNode 为 null,它将被设置为更新状态的最后一个节点。

Spring AI Alibaba 开源项目基于 Spring AI 构建,是阿里云通义系列模型及服务在 Java AI 应用开发领域的最佳实践,提供高层次的 AI API 抽象与云原生基础设施集成方案,帮助开发者快速构建 AI 应用。