Spring AI 源码解析:MCP链路调用流程及示例
· 阅读需 80 分钟
MCP官方文档:https://modelcontextprotocol.io/introduction
java版的MCP源码:https://github.com/modelcontextprotocol/java-sdk
- 本版源码解析,取自mcp/java-sdk(20250322),等正式发版后会再度更新
理论部分
MCP调用链路(核心)
以client-webflux、server-webflux为例
初始化连接链路...

客户端咨询问题,调用服务端工具链路

McpClient(客户端)
用于创建MCP客户端的工厂接口,提供了创建同步、异步客户端的静态方法,并支持多种配置,提供如下核心功能
- 配置选项:允许设置请求超时、客户端能力、客户端信息、根URI等
- 工具和资源管理:支持工具发现、资源访问、提示模版处理等
- 实时更新:通过变更消费者接收工具、资源和提示的实时更新
- 日志记录:支持结构化日志记录,提供多种日志级别和日志消费者配置
内部类
- AsyncSpec:配置异步 MCP Client 的构建器类
- SyncSpec:配置同步 MCP Client 的构建器类
/**
 * Factory for MCP client builders.
 *
 * <p>{@link #sync(McpClientTransport)} and {@link #async(McpClientTransport)} each return a
 * fluent builder ({@link SyncSpec} / {@link AsyncSpec}) that collects timeouts, client
 * capabilities, client info, roots, change consumers, logging consumers and a sampling
 * handler, and finally creates the client in {@code build()}.
 */
public interface McpClient {

    /**
     * Starts configuring a synchronous client that uses the given transport.
     *
     * @param transport the transport the client will be built on; must not be null
     * @return a new {@link SyncSpec}
     */
    static SyncSpec sync(McpClientTransport transport) {
        return new SyncSpec(transport);
    }

    /**
     * Starts configuring an asynchronous client that uses the given transport.
     *
     * @param transport the transport the client will be built on; must not be null
     * @return a new {@link AsyncSpec}
     */
    static AsyncSpec async(McpClientTransport transport) {
        return new AsyncSpec(transport);
    }

    /**
     * Fluent builder for {@link McpSyncClient}.
     *
     * <p>Every setter validates its argument with {@code Assert.notNull} and returns
     * {@code this} so calls can be chained.
     */
    class SyncSpec {

        private final McpClientTransport transport;

        /** Request timeout; 20 seconds unless overridden. */
        private Duration requestTimeout = Duration.ofSeconds(20);

        /** Initialization timeout; 20 seconds unless overridden. */
        private Duration initializationTimeout = Duration.ofSeconds(20);

        /** Client capabilities; stays {@code null} unless {@link #capabilities} is called. */
        private ClientCapabilities capabilities;

        /** Client info used when the caller does not supply one. */
        private Implementation clientInfo = new Implementation("Java SDK MCP Client", "1.0.0");

        /** Registered roots, keyed by {@code Root.uri()}. */
        private final Map<String, Root> roots = new HashMap<>();

        private final List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers = new ArrayList<>();

        private final List<Consumer<List<McpSchema.Resource>>> resourcesChangeConsumers = new ArrayList<>();

        private final List<Consumer<List<McpSchema.Prompt>>> promptsChangeConsumers = new ArrayList<>();

        private final List<Consumer<McpSchema.LoggingMessageNotification>> loggingConsumers = new ArrayList<>();

        /** Sampling handler; stays {@code null} unless {@link #sampling} is called. */
        private Function<CreateMessageRequest, CreateMessageResult> samplingHandler;

        private SyncSpec(McpClientTransport transport) {
            Assert.notNull(transport, "Transport must not be null");
            this.transport = transport;
        }

        /**
         * Replaces the request timeout.
         *
         * @param requestTimeout the new timeout; must not be null
         * @return this builder
         */
        public SyncSpec requestTimeout(Duration requestTimeout) {
            Assert.notNull(requestTimeout, "Request timeout must not be null");
            this.requestTimeout = requestTimeout;
            return this;
        }

        /**
         * Replaces the initialization timeout.
         *
         * @param initializationTimeout the new timeout; must not be null
         * @return this builder
         */
        public SyncSpec initializationTimeout(Duration initializationTimeout) {
            Assert.notNull(initializationTimeout, "Initialization timeout must not be null");
            this.initializationTimeout = initializationTimeout;
            return this;
        }

        /**
         * Sets the client capabilities passed to the client features.
         *
         * @param capabilities the capabilities; must not be null
         * @return this builder
         */
        public SyncSpec capabilities(ClientCapabilities capabilities) {
            Assert.notNull(capabilities, "Capabilities must not be null");
            this.capabilities = capabilities;
            return this;
        }

        /**
         * Replaces the default client info.
         *
         * @param clientInfo the client info; must not be null
         * @return this builder
         */
        public SyncSpec clientInfo(Implementation clientInfo) {
            Assert.notNull(clientInfo, "Client info must not be null");
            this.clientInfo = clientInfo;
            return this;
        }

        /**
         * Adds the given roots, keyed by their URI. A root whose URI is already registered
         * replaces the earlier entry.
         *
         * @param roots the roots to add; must not be null
         * @return this builder
         */
        public SyncSpec roots(List<Root> roots) {
            Assert.notNull(roots, "Roots must not be null");
            for (Root root : roots) {
                this.roots.put(root.uri(), root);
            }
            return this;
        }

        /**
         * Varargs variant of {@link #roots(List)}.
         *
         * @param roots the roots to add; must not be null
         * @return this builder
         */
        public SyncSpec roots(Root... roots) {
            Assert.notNull(roots, "Roots must not be null");
            for (Root root : roots) {
                this.roots.put(root.uri(), root);
            }
            return this;
        }

        /**
         * Sets the handler that maps a {@code CreateMessageRequest} to a
         * {@code CreateMessageResult}.
         *
         * @param samplingHandler the handler; must not be null
         * @return this builder
         */
        public SyncSpec sampling(Function<CreateMessageRequest, CreateMessageResult> samplingHandler) {
            Assert.notNull(samplingHandler, "Sampling handler must not be null");
            this.samplingHandler = samplingHandler;
            return this;
        }

        /**
         * Registers an additional consumer for tool lists.
         *
         * @param toolsChangeConsumer the consumer to append; must not be null
         * @return this builder
         */
        public SyncSpec toolsChangeConsumer(Consumer<List<McpSchema.Tool>> toolsChangeConsumer) {
            Assert.notNull(toolsChangeConsumer, "Tools change consumer must not be null");
            this.toolsChangeConsumers.add(toolsChangeConsumer);
            return this;
        }

        /**
         * Registers an additional consumer for resource lists.
         *
         * @param resourcesChangeConsumer the consumer to append; must not be null
         * @return this builder
         */
        public SyncSpec resourcesChangeConsumer(Consumer<List<McpSchema.Resource>> resourcesChangeConsumer) {
            Assert.notNull(resourcesChangeConsumer, "Resources change consumer must not be null");
            this.resourcesChangeConsumers.add(resourcesChangeConsumer);
            return this;
        }

        /**
         * Registers an additional consumer for prompt lists.
         *
         * @param promptsChangeConsumer the consumer to append; must not be null
         * @return this builder
         */
        public SyncSpec promptsChangeConsumer(Consumer<List<McpSchema.Prompt>> promptsChangeConsumer) {
            Assert.notNull(promptsChangeConsumer, "Prompts change consumer must not be null");
            this.promptsChangeConsumers.add(promptsChangeConsumer);
            return this;
        }

        /**
         * Registers an additional consumer for logging message notifications.
         *
         * @param loggingConsumer the consumer to append; must not be null
         * @return this builder
         */
        public SyncSpec loggingConsumer(Consumer<McpSchema.LoggingMessageNotification> loggingConsumer) {
            Assert.notNull(loggingConsumer, "Logging consumer must not be null");
            this.loggingConsumers.add(loggingConsumer);
            return this;
        }

        /**
         * Registers several logging consumers at once; they are appended to those already
         * registered.
         *
         * @param loggingConsumers the consumers to append; must not be null
         * @return this builder
         */
        public SyncSpec loggingConsumers(List<Consumer<McpSchema.LoggingMessageNotification>> loggingConsumers) {
            Assert.notNull(loggingConsumers, "Logging consumers must not be null");
            this.loggingConsumers.addAll(loggingConsumers);
            return this;
        }

        /**
         * Creates the synchronous client.
         *
         * <p>The collected settings are packed into {@code McpClientFeatures.Sync}, converted
         * with {@code McpClientFeatures.Async.fromSync}, used to construct a
         * {@link McpAsyncClient}, and that client is wrapped in a {@link McpSyncClient}.
         *
         * @return the new {@link McpSyncClient}
         */
        public McpSyncClient build() {
            McpClientFeatures.Sync syncFeatures = new McpClientFeatures.Sync(this.clientInfo, this.capabilities,
                    this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers,
                    this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler);

            McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);

            return new McpSyncClient(new McpAsyncClient(this.transport, this.requestTimeout,
                    this.initializationTimeout, asyncFeatures));
        }

    }

    /**
     * Fluent builder for {@link McpAsyncClient}.
     *
     * <p>Mirrors {@link SyncSpec}, except that consumers and the sampling handler are
     * {@code Function}s returning {@code Mono}. Every setter validates its argument with
     * {@code Assert.notNull} and returns {@code this} so calls can be chained.
     */
    class AsyncSpec {

        private final McpClientTransport transport;

        /** Request timeout; 20 seconds unless overridden. */
        private Duration requestTimeout = Duration.ofSeconds(20);

        /** Initialization timeout; 20 seconds unless overridden. */
        private Duration initializationTimeout = Duration.ofSeconds(20);

        /** Client capabilities; stays {@code null} unless {@link #capabilities} is called. */
        private ClientCapabilities capabilities;

        /**
         * Client info used when the caller does not supply one. Kept identical to the
         * SyncSpec default so a default client reports the same identity regardless of
         * which builder created it (previously "Spring AI MCP Client" / "0.3.1").
         */
        private Implementation clientInfo = new Implementation("Java SDK MCP Client", "1.0.0");

        /** Registered roots, keyed by {@code Root.uri()}. */
        private final Map<String, Root> roots = new HashMap<>();

        private final List<Function<List<McpSchema.Tool>, Mono<Void>>> toolsChangeConsumers = new ArrayList<>();

        private final List<Function<List<McpSchema.Resource>, Mono<Void>>> resourcesChangeConsumers = new ArrayList<>();

        private final List<Function<List<McpSchema.Prompt>, Mono<Void>>> promptsChangeConsumers = new ArrayList<>();

        private final List<Function<McpSchema.LoggingMessageNotification, Mono<Void>>> loggingConsumers = new ArrayList<>();

        /** Sampling handler; stays {@code null} unless {@link #sampling} is called. */
        private Function<CreateMessageRequest, Mono<CreateMessageResult>> samplingHandler;

        private AsyncSpec(McpClientTransport transport) {
            Assert.notNull(transport, "Transport must not be null");
            this.transport = transport;
        }

        /**
         * Replaces the request timeout.
         *
         * @param requestTimeout the new timeout; must not be null
         * @return this builder
         */
        public AsyncSpec requestTimeout(Duration requestTimeout) {
            Assert.notNull(requestTimeout, "Request timeout must not be null");
            this.requestTimeout = requestTimeout;
            return this;
        }

        /**
         * Replaces the initialization timeout.
         *
         * @param initializationTimeout the new timeout; must not be null
         * @return this builder
         */
        public AsyncSpec initializationTimeout(Duration initializationTimeout) {
            Assert.notNull(initializationTimeout, "Initialization timeout must not be null");
            this.initializationTimeout = initializationTimeout;
            return this;
        }

        /**
         * Sets the client capabilities passed to the client features.
         *
         * @param capabilities the capabilities; must not be null
         * @return this builder
         */
        public AsyncSpec capabilities(ClientCapabilities capabilities) {
            Assert.notNull(capabilities, "Capabilities must not be null");
            this.capabilities = capabilities;
            return this;
        }

        /**
         * Replaces the default client info.
         *
         * @param clientInfo the client info; must not be null
         * @return this builder
         */
        public AsyncSpec clientInfo(Implementation clientInfo) {
            Assert.notNull(clientInfo, "Client info must not be null");
            this.clientInfo = clientInfo;
            return this;
        }

        /**
         * Adds the given roots, keyed by their URI. A root whose URI is already registered
         * replaces the earlier entry.
         *
         * @param roots the roots to add; must not be null
         * @return this builder
         */
        public AsyncSpec roots(List<Root> roots) {
            Assert.notNull(roots, "Roots must not be null");
            for (Root root : roots) {
                this.roots.put(root.uri(), root);
            }
            return this;
        }

        /**
         * Varargs variant of {@link #roots(List)}.
         *
         * @param roots the roots to add; must not be null
         * @return this builder
         */
        public AsyncSpec roots(Root... roots) {
            Assert.notNull(roots, "Roots must not be null");
            for (Root root : roots) {
                this.roots.put(root.uri(), root);
            }
            return this;
        }

        /**
         * Sets the handler that maps a {@code CreateMessageRequest} to a
         * {@code Mono<CreateMessageResult>}.
         *
         * @param samplingHandler the handler; must not be null
         * @return this builder
         */
        public AsyncSpec sampling(Function<CreateMessageRequest, Mono<CreateMessageResult>> samplingHandler) {
            Assert.notNull(samplingHandler, "Sampling handler must not be null");
            this.samplingHandler = samplingHandler;
            return this;
        }

        /**
         * Registers an additional consumer for tool lists.
         *
         * @param toolsChangeConsumer the consumer to append; must not be null
         * @return this builder
         */
        public AsyncSpec toolsChangeConsumer(Function<List<McpSchema.Tool>, Mono<Void>> toolsChangeConsumer) {
            Assert.notNull(toolsChangeConsumer, "Tools change consumer must not be null");
            this.toolsChangeConsumers.add(toolsChangeConsumer);
            return this;
        }

        /**
         * Registers an additional consumer for resource lists.
         *
         * @param resourcesChangeConsumer the consumer to append; must not be null
         * @return this builder
         */
        public AsyncSpec resourcesChangeConsumer(
                Function<List<McpSchema.Resource>, Mono<Void>> resourcesChangeConsumer) {
            Assert.notNull(resourcesChangeConsumer, "Resources change consumer must not be null");
            this.resourcesChangeConsumers.add(resourcesChangeConsumer);
            return this;
        }

        /**
         * Registers an additional consumer for prompt lists.
         *
         * @param promptsChangeConsumer the consumer to append; must not be null
         * @return this builder
         */
        public AsyncSpec promptsChangeConsumer(Function<List<McpSchema.Prompt>, Mono<Void>> promptsChangeConsumer) {
            Assert.notNull(promptsChangeConsumer, "Prompts change consumer must not be null");
            this.promptsChangeConsumers.add(promptsChangeConsumer);
            return this;
        }

        /**
         * Registers an additional consumer for logging message notifications.
         *
         * @param loggingConsumer the consumer to append; must not be null
         * @return this builder
         */
        public AsyncSpec loggingConsumer(Function<McpSchema.LoggingMessageNotification, Mono<Void>> loggingConsumer) {
            Assert.notNull(loggingConsumer, "Logging consumer must not be null");
            this.loggingConsumers.add(loggingConsumer);
            return this;
        }

        /**
         * Registers several logging consumers at once; they are appended to those already
         * registered.
         *
         * @param loggingConsumers the consumers to append; must not be null
         * @return this builder
         */
        public AsyncSpec loggingConsumers(
                List<Function<McpSchema.LoggingMessageNotification, Mono<Void>>> loggingConsumers) {
            Assert.notNull(loggingConsumers, "Logging consumers must not be null");
            this.loggingConsumers.addAll(loggingConsumers);
            return this;
        }

        /**
         * Creates the asynchronous client from the collected settings, which are packed into
         * a {@code McpClientFeatures.Async} instance.
         *
         * @return the new {@link McpAsyncClient}
         */
        public McpAsyncClient build() {
            return new McpAsyncClient(this.transport, this.requestTimeout, this.initializationTimeout,
                    new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
                            this.toolsChangeConsumers, this.resourcesChangeConsumers,
                            this.promptsChangeConsumers, this.loggingConsumers, this.samplingHandler));
        }

    }

}
McpAsyncClient
异步客户端实现,基于Project Reactor的Mono与Flux类型,支持非阻塞操作
初始化与关闭
- initialize:初始化客户端与服务端的连接,协商协议版本、交换能力并共享实现信息
- close:立即关闭客户端连接
- closeGracefully:优雅关闭客户端连接
工具操作
- callTool:调用服务器提供的工具
- listTools:获取服务器提供的工具列表
