Chapter 3 — Kleisli Arrow:把“带 IO 的函数”当作可组合箭头
1. 开篇段落:Agent 开发中的“胶水危机”
在构建 LLM Agent 时,我们本质上是在编排一系列不确定且有副作用的操作:
- 用户输入 检索向量库(可能失败、网络延迟)
- 构建 Prompt 调用 LLM(耗时、可能超时、返回乱码)
- 解析结果 执行工具(文件读写、API 调用)
在传统的命令式编程(Imperative Programming)中,这种流程通常长这样:
# 典型的 "Spaghetti Code" Agent
async def run_agent(user_input):
# 1. 检索
try:
context = await vector_db.search(user_input)
except DbError:
context = "" # 容错逻辑混在主流程里
# 2. 构造 Prompt (纯逻辑)
prompt = f"Context: {context}\nQuestion: {user_input}"
# 3. LLM 调用
response = await llm_client.chat(prompt)
if not response.content:
return "Error: Empty response" # 错误检查
# 4. 解析与执行
if "<tool>" in response.content:
cmd = parse_tool(response.content)
result = await exec_tool(cmd) # 又是 IO
return result
else:
return response.content
这段代码的问题在于:业务逻辑(做什么)与控制流(错误处理、等待、重试)紧密耦合。如果你想给每一步都加上“Tracing 追踪”或“Retry 重试”,代码体积会爆炸。
Kleisli Arrow(克莱斯利箭头) 是函数式编程提供的一把手术刀。它将形式为 a -> m b 的函数(即“输入 a,产生带 m 上下文的 m b”)视为基本的组合单元。通过 Kleisli 组合,我们可以像搭积木一样, Agent 的思考、行动、观察串联成一条线性的、类型安全的、自带错误处理的管道。
本章学习目标:
- 彻底理解 Kleisli Arrow 的定义 与普通函数的区别。
- 掌握核心操作符 Fish Operator (
>=>) 及其在不同语言中的实现。 - 利用 Kleisli Category 重构 Agent:将 Planner、Executor、Memory 变成可组合模块。
- 中间件模式:如何利用 Kleisli 组合在不修改业务代码的前提下,透明地注入 Log、Trace 和 Retry。
- 辨析 Kleisli 与 Arrow 的区别:何时我们需要并行能力。
2. 文字论述
2.1 什么是 Kleisli Arrow?
在数学范畴论中,“箭头”(Morphism)通常指纯函数 。我们可以直接组合它们:。 但在 Agent 工程中,绝大多数函数是不纯的(Effectful)。我们称这类函数为 Kleisli Arrow。
定义:一个 Kleisli Arrow 是一个具有如下类型签名的函数:
其中:
- a : 输入类型(Input。
- m : Monad 上下文(Context/Effect)。在 Agent 中,这通常是
IO、Promise、Future,或者是包含错误处理的IO[Either[Error, ?]]。 - b : 输出值类型(Result)。
Agent 组件作为 Kleisli Arrow 的映射:
| Agent 组件 | 输入 () | Effect () | 输出 () | 函数签名示例 |
| Agent 组件 | 输入 () | Effect () | 输出 () | 函数签名示例 |
|---|---|---|---|---|
| Retriever | Query |
IO (网络/DB) |
List[Doc] |
search: Query -> IO [Doc] |
| LLM Model | Prompt |
IO (网络/延迟) |
String |
chat: Prompt -> IO String |
| Parser | String |
Either Error (解析可能失败) |
Action |
parse: String -> Either Err Action |
| Tool | Action |
IO (副作用) |
Observation |
run: Action -> IO Observation |
2.2 组合的几何学:为什么不能直接调用?
如果我们有两个 Kleisli Arrow:
- (例如:
getUserInput) - (例如:
searchGoogle)
我们不能写 。
因为 返回的是一个“盒子” IO B,而 需要的是盒子里的裸值” B。即类型不匹配:
在命令式语言里,我们用 await 或 .then() 强行拆包。但在抽象层面,我们需要一种操作符,能够自动完成“拆包 -> 传递 -> 封包”的过程。
2.3 The Fish Operator (>=>)
这种组合操作符被称为 Kleisli Composition,符号通常写作 >=>(形状像一条鱼),也称为“鱼操作符”。
定义如下(以 Haskell 风格为例):
它的执行逻辑是:
- 拿到输入 ,传给 。
- 运行,产生 (比如一个 Promise)。
- 利用 Monad 的
bind() 能力,等待 完成并取出 。 - 将 传给 。
- 运行,产生 。
- 返回这个 。
ASCII 流程图:Kleisli Pipeline
Pipeline: agent = step1 >=> step2 >=> step3
[ Input A ]
|
v
+-----------+
| Step 1 | f: A -> m B
+-----------+
| outputs (m B)
v
[ Bind/FlatMap Magic ] <--- 自动拆包,如果是 Error 则跳过后续
| passes (B)
v
+-----------+
| Step 2 | g: B -> m C
+-----------+
| outputs (m C)
v
[ Bind/FlatMap Magic ]
| passes (C)
v
+-----------+
| Step 3 | h: C -> m D
+-----------+
|
v
[ Output m D ]
2.4 工程实战:重构 Agent Pipeline
假设我们有以下三个原子能力(Atomic Capabilities):
-
promptTemplate: 将用户问题转换为 Prompt。这是一个纯函数,但为了组合,我们可以用pure提升它,或者使用map。 * 类型:String -> Prompt -
llmInference: 调用模型。 * 类型:Prompt -> IO String -
extractTool: 解析 JSON。 * 类型:String -> IO ToolCall(假设解析失败抛出 IO 错误)
使用 Kleisli 组合的代码(伪代码):
-- 定义 pipeline
thinkingProcess :: String -> IO ToolCall
thinkingProcess =
(pure . promptTemplate) -- 1. 纯函数提升为 Kleisli
>=> llmInference -- 2. IO 操作
>=> extractTool -- 3. IO 操作 (含解析逻辑)
-- 运行
result = thinkingProcess("帮我查一下天气")
-- result 是 IO ToolCall
这一行的价值在于: 我们定义了控制流的形状,而没有执行它。这让我们可以对 thinkingProcess 进行整体的测试、超时控制或修饰,而不需要深入到函数内部。
2.5 强大的中间件能力 (Middleware)
Kleisli Arrow 最迷人的地方在于它非常适合做 AOP (面向切面编程)。我们可以编写高阶函数来“修饰”任何一个 Kleisli Arrow。
场景 1:自动重试 (Retry)
定义一个修饰器 withRetry:
它接收一个箭头,返回一个新的箭头:如果原箭头失败,新箭头会重试 N 次。
场景 2:链路追踪 (Tracing)
定义 withTrace(name):
它会在执行前后记录开始和结束时间,并向分布式追踪系统发送 Span。
组合后的 Agent:
robustAgent =
(pure . template)
>=> withTrace("LLM_Call", withRetry(3, llmInference)) // 带追踪和重试的 LLM
>=> withTrace("Parser", extractTool) // 带追踪的解析
这种声明式的写法,让“功能需求”(业务逻辑)和“非功能需求”(稳定性、观测性)完全解耦。
2.6 Kleisli 的局限性:串行 vs 并行
Kleisli Arrow 本质上是串行的(Sequential)。
f >=> g 意味着 必须等待 完成才能获得输入。
如果你需要并行(例如:同时调用 3 个不同的 LLM 模型投票),Kleisli Arrow 就不够用了。这时我们需要引入 Arrow (更通用的接口) 或 Applicative Functor (parMap, parZip)。
- Kleisli: (链式依赖)
- Applicative: (无依赖并行)
在第 5 章(Runtime)中,我们会详细讨论如何在 Kleisli 管道的某个节点内部嵌入并行计算。
3. 本章小结
- Kleisli Arrow 是类型为 的函数,它是 Agent 系统中“带副作用步骤”的标准抽象。
- Fish Operator (
>=>) 是 Kleisli 的组合工具,它自动处理了 Monad 上下文的解包与传递,消除了回调地狱。 - 短路特性:基于
IO或EitherMonad 的 Kleisli 组合天生具有短路能力。上一步报错,下一步自动跳过。 - 中间件架构:Kleisli Arrow 极易被装饰。我们可以通过高阶函数无侵入地为 Agent 的任意步骤添加 Retry、Timeout、Tracing 和 Logging。
- 设计原则:将 Agent 拆解为细粒度的 Kleisli Arrow,然后通过组合子(Combinators)将它们拼装成完整的运行时。
4. 练习题
提示:除了思考类型签名,尝试用你熟悉的语言(JS/TS/Python)构思其实现逻辑。
基础题 (50%)
Q1. 类型体操:拼接管道 已知:
fetchUser : UserID -> IO UserProfilevectorSearch : UserProfile -> IO [Document]summarize : [Document] -> IO String
请写出将 UserID 转换为 String (Summary) 的 Kleisli 组合表达式。
(Hint: 只需要关注输入输出类型的首尾相接)
参考答案 (折叠)
表达式:
fetchUser >=> vectorSearch >=> summarize
类型推导过程:
UserID -> IO UserProfileUserProfile -> IO [Document](输入匹配上一步的输出)[Document] -> IO String(输入匹配上一步的输出) 最终得到:UserID -> IO String
Q2. 纯函数的介入
假设我们有一个纯函数 filterDocs : [Document] -> [Document],它不过滤网络,只在内存里操作。
如何将它插入到 Q1 的管道中 vectorSearch 和 summarize 之间?
(Hint: 纯函数不能直接用 >=>,需要提升)
参考答案 (折叠)
纯函数 A -> B 需要提升为 A -> IO B 才能参与 Kleisli 组合。
方法是使用 pure (或 return / async (x) => x)。
表达式:
fetchUser >=> vectorSearch >=> (pure . filterDocs) >=> summarize
或者利用 Functor 的 map 语义,在组合外部操作:
fetchUser >=> (vectorSearch.map(filterDocs)) >=> summarize (这种写法取决于具体语言库的实现,第一种更通用)
Q3. TypeScript 实现
在 TypeScript 中实现 composeK (即 >=>)。
类型定义参考:type KFunc<A, B> = (a: A) => Promise<B>。
参考答案 (折叠)
// 这是一个泛型的高阶函数
const composeK = <A, B, C>(
f: (a: A) => Promise<B>,
g: (b: B) => Promise<C>
) => {
// 返回一个新的函数
return async (a: A): Promise<C> => {
const b = await f(a); // 1. 执行 f 并等待解包
return g(b); // 2. 将结果传给 g 并返回
};
};
// 使用示例
// const pipeline = composeK(step1, step2);
挑战题 (50%)
Q4. "Circuit Breaker" (熔断器) 中间件
设计一个高阶函数 circuitBreaker,它包装一个 Kleisli Arrow。
逻辑:如果最近 5 次调用中有 3 次失败,则在接下来的 1 分钟内直返回 Error,不再实际执行被包装的函数。
问题:实现这个逻辑需要什么额外的“副作用”?这是否破坏了 Kleisli 的纯粹性?
(Hint: IO Monad 内部可以包含 State 或 Ref)
参考答案 (折叠)
所需能力:需要一个可变的、跨请求持久化的状态(State/Ref),用来记录失败计数和上次失败时间。
类型签名设计:
circuitBreaker : StateRef -> (A -> IO B) -> (A -> IO B)
纯粹性分析:
这没有破坏纯粹性,前提是“读取/写入状态”这个动作本身被封装在了 IO Effect 中。
当我们组合出这个 Agent 时,我们只是构建了一个“描述”。只有当 Agent 运行时,这个状态才会被修改。
在 Haskell/Scala Cats 中,通常使用 Ref[IO, BreakerState] 来实现这种并发安全的内部状态。
Q5. 分支逻辑 (Switching)
有时候我们需要根据上一步的结果决定下一步走哪条路(例如:如果意图是 "Search" 走搜索管道,如果是 "Chat" 走闲聊管道)。
请定义一个函数 branch:
输入:
check : A -> Boolean(判断条件)ifTrue : A -> IO B(Kleisli Arrow 1)ifFalse : A -> IO B(Kleisli Arrow 2) 输出:A -> IO B
请问这个 branch 函数本身是 Kleisli Arrow 吗?它能被组合进管道吗?
参考答案 (折叠)
实现逻辑:
const branch = (check, ifTrue, ifFalse) => (input) => {
return check(input) ? ifTrue(input) : ifFalse(input);
}
回答:
是的,branch 返回的结果依然是一个 A -> IO B 类型的函数。
因此,它完美符合 Kleisli Arrow 的定义。
你可以把它像普通步骤一样组合进管道:
preprocess >=> branch(isSearch, searchPipe, chatPipe) >=> postprocess
这就是 Kleisli 的强大之处:控制流也是管道的一部分。
Q6. 调试 Kleisli
在一个长长的 f >=> g >=> h >=> ... 链条中,如果中间某一步数据不对,如何调试?
能不能写一个 tap 函数,允许我们在管道中间 console.log 数据,但不改变数据流向?
参考答案 (折叠)
可以。这个模式通常叫 tap 或 inspect。
定义:
tap : (A -> IO Unit) -> (A -> IO A)
或者更简单版本(只打印):
logResult : String -> (A -> IO A)
实现 (TS):
const inspect = <A>(tag: string) => async (x: A): Promise<A> => {
console.log(`[${tag}]`, x);
return x; // 原样返回,保持管道流动
};
使用:
step1 >=> inspect("After Step1") >=> step2
这让调试变得像在管道上打孔一样简单。
5. 常见陷阱与错误 (Gotchas)
1. 隐式状态丢失
- 现象:在管道的一开始获取了
UserID,但在执行了 3 步之后,第 4 步又需要UserID。 - 错误:因为 Kleisli 是
A -> m B,B -> m C,中间的信息如果没有显式传递,就会丢失。 - 解决:
- Payload 传递:让每一步返回
(Result, Context)元组。 - Reader Monad:将 Monad 栈升级为
ReaderT Context IO,这样任何步骤都可以随时ask获取环境信息。
2. 这里的 Error 到底是谁的 Error?
- 现象:LLM 返回了
"I don't know"(业务层面的失败),但 IO Monad 认为这是成功的(网络请求成功了)。 - 陷阱:直接用
IO的错误机制处理业务逻辑错误。 - 最佳实践:
- IO Error (Exception):保留给网络断连、超时、API 500 等基础设施错误。
- Domain Error (Either):LLM 拒绝回答、工具参数解析错误等,应该作为返回值的一部分(
IO (Either Refusal Answer)),而不是抛出异常。
3. "Promise Hell" 的变体
- 现象:虽然用了 Kleisli 概念,但实现时还是手动写
step1().then(res => step2(res))。 - 建议:一定要封装通用的
compose或pipe函数。如果语言支持(如 F#>=>, Haskell>=>, Scala CatsandThen),请直接使用库函数。在 TS/JS 中,使用fp-ts或简单的 utility function 来保持代码的平铺。
4. 忽略了 pure 的开销
- 现象:把大量的纯计算逻辑(如复杂的字符串正则匹配)强行拆成微小的 Kleisli Arrow。
- 权衡:虽然组合性好了,但每次
pure提升并在 Monad 中 bind 都会带来微小的运行时开销(Event Loop tick)。对于极度密集的 CPU 计算,直接写成纯函数块,只在最后提升一次即可。