代理编排:例程和交接

在处理语言模型时,通常只需要一个好的提示和正确的工具就能获得可靠的性能。然而,当处理许多独特的流程时,事情可能会变得复杂。本指南将介绍一种解决此问题的方法。

我们将引入“例程”和“交接”的概念,然后介绍实现过程,并展示如何以一种简单、强大且可控的方式来编排多个代理。

最后,我们提供了一个名为 Swarm 的示例仓库,它实现了这些想法并包含了一些示例。

让我们从设置导入开始。

from openai import OpenAI
from pydantic import BaseModel
from typing import Optional
import json


client = OpenAI()

例程

“例程”的概念没有严格的定义,而是用来捕捉一系列步骤的想法。具体来说,我们将例程定义为一组自然语言指令(我们将用系统提示来表示),以及完成这些指令所需的工具。

让我们看一个例子。下面,我们为客服代理定义了一个例程,指示它对用户问题进行分类,然后提出修复建议或提供退款。我们还定义了必要的函数 execute_refundlook_up_item。我们可以称之为客服例程、代理、助手等——但其核心思想是相同的:一组步骤和执行它们的工具。

# 客服例程

system_message = (
    "你是 ACME 公司的客服代理。"
    "回答始终不超过一句话。"
    "请遵循以下例程与用户互动:"
    "1. 首先,提出探究性问题,深入了解用户的问题。\n"
    " - 除非用户已提供原因。\n"
    "2. 提出一个修复方案(可以自己编一个)。\n"
    "3. 只有在用户不满意时,才提供退款。\n"
    "4. 如果用户接受,则查找 ID 并执行退款。"
    ""
)

def look_up_item(search_query):
    """用于查找商品 ID。
    搜索查询可以是描述或关键词。"""

    # 返回硬编码的商品 ID - 实际上会进行查找
    return "item_132612938"


def execute_refund(item_id, reason="未提供"):

    print("摘要:", item_id, reason) # 懒惰摘要
    return "成功"

例程的主要优势在于其简洁性和健壮性。请注意,这些指令包含条件,就像状态机或代码中的分支一样。大型语言模型实际上可以相当稳健地处理这些小型和中型例程,并且具有“软”遵循的优点——大型语言模型可以自然地引导对话,而不会陷入死胡同。

执行例程

为了执行例程,我们实现一个简单的循环,该循环:

  1. 获取用户输入。
  2. 将用户消息附加到 messages
  3. 调用模型。
  4. 将模型响应附加到 messages
def run_full_turn(system_message, messages):
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "system", "content": system_message}] + messages,
    )
    message = response.choices[0].message
    messages.append(message)

    if message.content: print("Assistant:", message.content)

    return message


messages = []
while True:
    user = input("User: ")
    messages.append({"role": "user", "content": user})

    run_full_turn(system_message, messages)

如您所见,这目前会忽略函数调用,所以我们来添加它。

模型需要将函数格式化为函数模式。为了方便起见,我们可以定义一个辅助函数,将 Python 函数转换为相应的函数模式。

import inspect

def function_to_schema(func) -> dict:
    type_map = {
        str: "string",
        int: "integer",
        float: "number",
        bool: "boolean",
        list: "array",
        dict: "object",
        type(None): "null",
    }

    try:
        signature = inspect.signature(func)
    except ValueError as e:
        raise ValueError(
            f"Failed to get signature for function {func.__name__}: {str(e)}"
        )

    parameters = {}
    for param in signature.parameters.values():
        try:
            param_type = type_map.get(param.annotation, "string")
        except KeyError as e:
            raise KeyError(
                f"Unknown type annotation {param.annotation} for parameter {param.name}: {str(e)}"
            )
        parameters[param.name] = {"type": param_type}

    required = [
        param.name
        for param in signature.parameters.values()
        if param.default == inspect._empty
    ]

    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": (func.__doc__ or "").strip(),
            "parameters": {
                "type": "object",
                "properties": parameters,
                "required": required,
            },
        },
    }

例如:

def sample_function(param_1, param_2, the_third_one: int, some_optional="John Doe"):
    """
    这是我的文档字符串。当你想要的时候调用这个函数。
    """
    print("Hello, world")

schema =  function_to_schema(sample_function)
print(json.dumps(schema, indent=2))
    {
      "type": "function",
      "function": {
        "name": "sample_function",
        "description": "这是我的文档字符串。当你想要的时候调用这个函数。",
        "parameters": {
          "type": "object",
          "properties": {
            "param_1": {
              "type": "string"
            },
            "param_2": {
              "type": "string"
            },
            "the_third_one": {
              "type": "integer"
            },
            "some_optional": {
              "type": "string"
            }
          },
          "required": [
            "param_1",
            "param_2",
            "the_third_one"
          ]
        }
      }
    }

现在,我们可以在调用模型时使用此函数将工具传递给模型。

messages = []

tools = [execute_refund, look_up_item]
tool_schemas = [function_to_schema(tool) for tool in tools]

response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "查找黑色靴子。"}],
            tools=tool_schemas,
        )
message = response.choices[0].message

message.tool_calls[0].function
    Function(arguments='{"search_query":"black boot"}', name='look_up_item')

最后,当模型调用工具时,我们需要执行相应的函数并将结果返回给模型。

我们可以通过在 tool_map 中将工具名称映射到 Python 函数,然后在 execute_tool_call 中查找并调用它来实现。最后,我们将结果添加到对话中。

tools_map = {tool.__name__: tool for tool in tools}

def execute_tool_call(tool_call, tools_map):
    name = tool_call.function.name
    args = json.loads(tool_call.function.arguments)

    print(f"Assistant: {name}({args})")

    # 使用提供的参数调用相应的函数
    return tools_map[name](**args)

for tool_call in message.tool_calls:
            result = execute_tool_call(tool_call, tools_map)

            # 将结果添加回对话中
            result_message = {
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            }
            messages.append(result_message)
    Assistant: look_up_item({'search_query': 'black boot'})

在实践中,我们还希望让模型利用结果产生另一个响应。该响应可能也包含一个工具调用,因此我们可以将其放入一个循环中,直到不再有工具调用为止。

如果我们把所有东西放在一起,它看起来会是这样的:

tools = [execute_refund, look_up_item]


def run_full_turn(system_message, tools, messages):

    num_init_messages = len(messages)
    messages = messages.copy()

    while True:

        # 将 Python 函数转换为工具并保存反向映射
        tool_schemas = [function_to_schema(tool) for tool in tools]
        tools_map = {tool.__name__: tool for tool in tools}

        # === 1. 获取 OpenAI 的响应 ===
        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "system", "content": system_message}] + messages,
            tools=tool_schemas or None,
        )
        message = response.choices[0].message
        messages.append(message)

        if message.content:  # 打印助手响应
            print("Assistant:", message.content)

        if not message.tool_calls:  # 如果处理完工具调用,则中断
            break

        # === 2. 处理工具调用 ===

        for tool_call in message.tool_calls:
            result = execute_tool_call(tool_call, tools_map)

            result_message = {
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            }
            messages.append(result_message)

    # ==== 3. 返回新的消息 =====
    return messages[num_init_messages:]


def execute_tool_call(tool_call, tools_map):
    name = tool_call.function.name
    args = json.loads(tool_call.function.arguments)

    print(f"Assistant: {name}({args})")

    # 使用提供的参数调用相应的函数
    return tools_map[name](**args)


messages = []
while True:
    user = input("User: ")
    messages.append({"role": "user", "content": user})

    new_messages = run_full_turn(system_message, tools, messages)
    messages.extend(new_messages)

现在我们有了一个例程,假设我们想添加更多的步骤和更多的工具。我们可以做到,但最终如果我们尝试用太多的不同任务来扩展例程,它可能会开始遇到困难。这时我们可以利用多个例程的概念——给定一个用户请求,我们可以加载正确的例程和适当的步骤和工具来处理它。

动态交换系统指令和工具可能看起来令人生畏。但是,如果我们把“例程”看作是“代理”,那么这种“交接”的概念允许我们简单地表示这些交换——就像一个代理将对话转交给另一个代理一样。

交接

让我们将“交接”定义为代理(或例程)将活动对话转交给另一个代理,就像在电话通话中被转接给其他人一样。只不过在这种情况下,代理们完全了解您之前的对话!

为了看到交接的实际效果,让我们先定义一个基本的 Agent 类。

class Agent(BaseModel):
    name: str = "Agent"
    model: str = "gpt-4o-mini"
    instructions: str = "You are a helpful Agent"
    tools: list = []

现在为了让我们的代码支持它,我们可以修改 run_full_turn 以接受一个 Agent 而不是单独的 system_messagetools

def run_full_turn(agent, messages):

    num_init_messages = len(messages)
    messages = messages.copy()

    while True:

        # 将 Python 函数转换为工具并保存反向映射
        tool_schemas = [function_to_schema(tool) for tool in agent.tools]
        tools_map = {tool.__name__: tool for tool in agent.tools}

        # === 1. 获取 OpenAI 的响应 ===
        response = client.chat.completions.create(
            model=agent.model,
            messages=[{"role": "system", "content": agent.instructions}] + messages,
            tools=tool_schemas or None,
        )
        message = response.choices[0].message
        messages.append(message)

        if message.content:  # 打印代理响应
            print("Assistant:", message.content)

        if not message.tool_calls:  # 如果处理完工具调用,则中断
            break

        # === 2. 处理工具调用 ===

        for tool_call in message.tool_calls:
            result = execute_tool_call(tool_call, tools_map)

            result_message = {
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            }
            messages.append(result_message)

    # ==== 3. 返回最新的代理和新的消息 =====
    return messages[num_init_messages:]


def execute_tool_call(tool_call, tools_map):
    name = tool_call.function.name
    args = json.loads(tool_call.function.arguments)

    print("Assistant:", f"{name}({args})")

    # 使用提供的参数调用相应的函数
    return tools_map[name](**args)

我们现在可以轻松地运行多个代理:

def execute_refund(item_name):
    return "成功"

refund_agent = Agent(
    name="Refund Agent",
    instructions="你是退款代理。帮助用户处理退款。",
    tools=[execute_refund],
)

def place_order(item_name):
    return "成功"

sales_assistant = Agent(
    name="Sales Assistant",
    instructions="你是销售助理。向用户销售产品。",
    tools=[place_order],
)


messages = []
user_query = "下订单购买一双黑色靴子。"
print("User:", user_query)
messages.append({"role": "user", "content": user_query})

response = run_full_turn(sales_assistant, messages) # 销售助理
messages.extend(response)


user_query = "实际上,我想要退款。" # 暗示指最后一件商品
print("User:", user_query)
messages.append({"role": "user", "content": user_query})
response = run_full_turn(refund_agent, messages) # 退款代理
User: 下订单购买一双黑色靴子。
Assistant: place_order({'item_name': 'black boot'})
Assistant: 您的黑色靴子订单已成功下单!如果您还需要其他帮助,请随时提出!
User: 实际上,我想要退款。
Assistant: execute_refund({'item_name': 'black boot'})
Assistant: 您的黑色靴子退款已成功处理。如果您需要进一步的帮助,请告知!

太棒了!但我们在这里手动进行了交接——我们希望代理自己决定何时进行交接。一种简单但效果出奇地好的方法是给它们一个 transfer_to_XXX 函数,其中 XXX 是某个代理。模型足够智能,知道何时调用此函数以进行交接!

交接函数

现在代理可以表达进行交接的意图了,我们必须实际实现它。有很多方法可以做到这一点,但有一种方法特别干净。

对于我们迄今为止定义的代理函数,如 execute_refundplace_order,它们返回一个字符串,该字符串将被提供给模型。如果我们返回一个 Agent 对象来指示我们想要转到哪个代理,会怎么样?就像这样:

refund_agent = Agent(
    name="Refund Agent",
    instructions="你是退款代理。帮助用户处理退款。",
    tools=[execute_refund],
)

def transfer_to_refunds():
    return refund_agent

sales_assistant = Agent(
    name="Sales Assistant",
    instructions="你是销售助理。向用户销售产品。",
    tools=[place_order],
)

然后,我们可以更新代码以检查函数响应的返回类型,如果它是 Agent,则更新正在使用的代理!此外,现在 run_full_turn 需要返回最后使用的代理,以防发生交接。(我们可以将其放入一个 Response 类中以保持整洁。)

class Response(BaseModel):
    agent: Optional[Agent]
    messages: list

现在是更新后的 run_full_turn

def run_full_turn(agent, messages):

    current_agent = agent
    num_init_messages = len(messages)
    messages = messages.copy()

    while True:

        # 将 Python 函数转换为工具并保存反向映射
        tool_schemas = [function_to_schema(tool) for tool in current_agent.tools]
        tools = {tool.__name__: tool for tool in current_agent.tools}

        # === 1. 获取 OpenAI 的响应 ===
        response = client.chat.completions.create(
            model=agent.model,
            messages=[{"role": "system", "content": current_agent.instructions}]

            + messages,
            tools=tool_schemas or None,
        )
        message = response.choices[0].message
        messages.append(message)

        if message.content:  # 打印代理响应
            print(f"{current_agent.name}:", message.content)

        if not message.tool_calls:  # 如果处理完工具调用,则中断
            break

        # === 2. 处理工具调用 ===

        for tool_call in message.tool_calls:
            result = execute_tool_call(tool_call, tools, current_agent.name)

            if type(result) is Agent:  # 如果是代理转移,则更新当前代理
                current_agent = result
                result = (
                    f"已转接至 {current_agent.name}。请立即采用该角色。"
                )

            result_message = {
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": result,
            }
            messages.append(result_message)

    # ==== 3. 返回最后使用的代理和新的消息 =====
    return Response(agent=current_agent, messages=messages[num_init_messages:])


def execute_tool_call(tool_call, tools, agent_name):
    name = tool_call.function.name
    args = json.loads(tool_call.function.arguments)

    print(f"{agent_name}:", f"{name}({args})")

    return tools[name](**args)  # 使用提供的参数调用相应的函数

让我们看一个包含更多代理的示例。

def escalate_to_human(summary):
    """仅在明确要求时调用。"""
    print("正在升级至人工代理...")
    print("\n=== 升级报告 ===")
    print(f"摘要: {summary}")
    print("=================\n")
    exit()


def transfer_to_sales_agent():
    """用于任何与销售或购买相关的事务。"""
    return sales_agent


def transfer_to_issues_and_repairs():
    """用于处理问题、维修或退款。"""
    return issues_and_repairs_agent


def transfer_back_to_triage():
    """如果用户提出的主题超出了您的职责范围,包括升级给人工代理,请调用此函数。"""
    return triage_agent


triage_agent = Agent(
    name="Triage Agent",
    instructions=(
        "你是 ACME 公司的客服机器人。 "
        "介绍你自己。回答始终要非常简短。 "
        "收集信息以将客户引导至正确的部门。 "
        "但要让你的问题巧妙而自然。"
    ),
    tools=[transfer_to_sales_agent, transfer_to_issues_and_repairs, escalate_to_human],
)


def execute_order(product, price: int):
    """价格应以美元为单位。"""
    print("\n\n=== 订单摘要 ===")
    print(f"产品: {product}")
    print(f"价格: ${price}")
    print("=================\n")
    confirm = input("确认订单?y/n: ").strip().lower()
    if confirm == "y":
        print("订单执行成功!")
        return "成功"
    else:
        print("订单已取消!")
        return "用户取消订单。"


sales_agent = Agent(
    name="Sales Agent",
    instructions=(
        "你是 ACME 公司的销售代理。"
        "回答始终不超过一句话。"
        "请遵循以下例程与用户互动:"
        "1. 询问他们生活中与追捕roadrunner有关的任何问题。\n"
        "2. 随意提及 ACME 的某个虚构产品可以提供帮助。\n"
        " - 不要提及价格。\n"
        "3. 一旦用户被说服,就给出一个荒谬的价格。\n"
        "4. 只有在一切都完成后,并且用户同意,"
        "才告诉他们一个奇怪的注意事项并执行他们的订单。\n"
        ""
    ),
    tools=[execute_order, transfer_back_to_triage],
)


def look_up_item(search_query):
    """用于查找商品 ID。
    搜索查询可以是描述或关键词。"""
    item_id = "item_132612938"
    print("找到商品:", item_id)
    return item_id


def execute_refund(item_id, reason="未提供"):
    print("\n\n=== 退款摘要 ===")
    print(f"商品 ID: {item_id}")
    print(f"原因: {reason}")
    print("=================\n")
    print("退款执行成功!")
    return "成功"


issues_and_repairs_agent = Agent(
    name="Issues and Repairs Agent",
    instructions=(
        "你是 ACME 公司的客服代理。"
        "回答始终不超过一句话。"
        "请遵循以下例程与用户互动:"
        "1. 首先,提出探究性问题,深入了解用户的问题。\n"
        " - 除非用户已提供原因。\n"
        "2. 提出一个修复方案(可以自己编一个)。\n"
        "3. 只有在用户不满意时,才提供退款。\n"
        "4. 如果用户接受,则查找 ID 并执行退款。"
        ""
    ),
    tools=[execute_refund, look_up_item, transfer_back_to_triage],
)

最后,我们可以在一个循环中运行它(这在 Python Notebook 中无法运行,所以你可以在单独的 Python 文件中尝试):

agent = triage_agent
messages = []

while True:
    user = input("User: ")
    messages.append({"role": "user", "content": user})

    response = run_full_turn(agent, messages)
    agent = response.agent
    messages.extend(response.messages)

Swarm

作为概念验证,我们将这些想法打包成一个名为 Swarm 的示例库。它仅用作示例,不应直接用于生产。但是,您可以随意采用这些想法和代码来构建自己的!