使用OpenAI Agent SDK和Databricks MCP服务器构建供应链Copilot
解决方案概述
在供应链运营中,代理可以解决直接影响服务水平和收入的问题:我们是否有库存和能力来满足当前需求?制造延迟将在何处发生,以及这些延迟将如何向下游传播?哪些工作流程调整可以最大程度地减少中断?
本指南概述了使用OpenAI Agent SDK和Databricks Managed MCP构建供应链Copilot的过程。MCP使代理能够查询结构化和非结构化的企业数据,例如库存、销售、供应商提要、本地事件等,以实现实时可见性、早期检测物料短缺和主动建议。一个编排层支撑着该系统,统一了:
- 对结构化库存、需求和供应商数据的查询
- 每个批发商的时间序列预测
- 基于图的原材料需求和运输优化
- 支持跨非结构化通信进行语义搜索的向量索引电子邮件存档
- 收入风险计算
在本指南结束时,您将部署一个查询分布式数据源、预测模型、突出新兴瓶颈并推荐主动措施的模板。它可以解决以下问题:
- 哪些产品依赖于L6HUK材料?
- 如果我们无法生产预测量的autoclave_1产品,将有多少收入面临风险?
- 目前哪些产品有延迟?
- 注射器_1是否有任何延迟?
- 生产注射器_1需要哪些原材料?
- 以下原材料中是否有任何短缺:O4GRQ、Q5U3A、OAIFB或58RJD?
- 与批发商9相关的延迟是什么?
利益相关者可以提交自然语言提示并立即获得答案。 本指南将引导您完成在自己的环境中实现此解决方案的每个步骤。
架构
本指南中提出的架构将OpenAI Agent置于Databricks现有分析工作负载的顶部。您可以将Databricks组件公开为可调用的Unity Catalog函数。该代理使用OpenAI Agent SDK实现,并连接到Databricks Managed MCP服务器。
结果是一个单一的、近乎实时的对话界面,可为整个供应链提供精细的预测、动态库存建议和数据驱动的决策。该架构产生了一个代理层,该层利用您现有的企业数据(结构化和非结构化)、经典ML模型和图分析功能。
设置Databricks身份验证
您可以通过向~/.databrickscfg
添加配置文件来设置Databricks身份验证。一个Databricks配置配置文件包含Databricks进行身份验证所需的信息和其他信息。
代码片段的WorkspaceClient(profile=...)
调用将拾取该信息。它告诉SDK加载存储的凭据,因此您的代码永远不需要嵌入令牌。另一种选择是创建环境变量,例如DATABRICKS_HOST
和DATABRICKS_TOKEN
,但建议使用~/.databrickscfg
。
通过“设置”->“开发人员”->“访问令牌”->“生成新令牌”生成工作区个人访问令牌(PAT),然后将其记录在~/.databrickscfg
中。
要创建此Databricks配置配置文件,请运行Databricks CLI databricks configure
命令,或按照以下步骤操作:
- 如果
~/.databrickscfg
丢失,请创建它:touch ~/.databrickscfg
- 打开文件:
nano ~/.databrickscfg
- 插入一个配置文件部分,其中列出了工作区URL和个人访问令牌(PAT)(可以随时添加其他配置文件):
[DEFAULT]
host = https://dbc-a1b2345c-d6e7.cloud.databricks.com #在此处添加您的工作区URL
token = dapi123... #在此处添加您的PAT
然后,您可以使用Databricks CLI或SDK运行此健全性检查命令databricks clusters list
。如果它在不提示输入凭据的情况下返回数据,则表示主机正确且令牌有效。
作为先决条件,必须在Databricks工作区中启用Serverless计算和Unity Catalog。
(可选)Databricks供应链设置
本指南可用于处理您自己的Databricks供应链数据集和分析工作负载。
或者,您可以使用定制版本的Databricks的供应链优化解决方案加速器来加快设置。为此,您可以将此GitHub存储库克隆到Databricks工作区中,并按照README文件中的说明进行操作。运行该解决方案将建立代理稍后通过MCP访问的每个资产,从原始企业表和非结构化电子邮件到经典ML模型和图工作负载。
如果您希望使用自己的数据集和模型,请确保将相关组件包装为Unity Catalog函数,并定义向量搜索索引,如加速器所示。您也可以公开Genie Spaces。
示例数据镜像了一个现实的制药网络:三个工厂生产30种产品,将它们运往五个配送中心,每个配送中心服务30-60个批发商。该存储库提供每个产品-批发商对的时间序列需求、配送中心到批发商的映射、工厂到配送中心的成本矩阵、工厂产量上限以及标记发货延迟的电子邮件存档。
回答供应链运营问题需要对上游瓶颈如何通过生产、物流和履行进行级联建模,以便利益相关者可以缩短交货时间、避免过多的库存并控制成本。这些笔记本将这些原始提要转换为受管的、可调用的工件:
- 需求预测和汇总(笔记本2):使用Holt-Winters季节性模型(或任何首选的时间序列方法)为每个批发商和配送中心生成提前一周的SKU需求。它利用Spark的并行化能力来处理大规模预测任务,方法是使用Pandas UDF(将您的单节点数据科学代码分发到多个节点)。然后,将预测汇总到每个产品的DC级别总数。输出是一个表
product_demand_forecasted
,其中包含配送中心级别的聚合预测。 - 原材料计划(笔记本3):通过图处理构建产品到材料的映射,将需求沿着物料清单层次结构传播,以大规模计算组件需求。我们将物料清单转换为图,以便将产品预测转换为精确的原材料需求,从而得到两个表:
raw_material_demand
和raw_material_supply
。 - 运输优化(笔记本4):在容量和需求约束下最小化工厂到配送中心的运输成本,利用Pandas UDF,在
shipment_recommendations
中输出建议。 - 语义电子邮件搜索(笔记本6):使用OpenAI嵌入模型将供应链经理的电子邮件嵌入向量索引中,从而实现语义查询,以显示延迟和风险信号。
每个见解都包装在笔记本5和笔记本7中的Unity Catalog(UC)函数中,例如product_from_raw
、raw_from_product
、revenue_risk
、lookup_product_demand
、query_unstructured_emails
。由于UC同时管理表、模型和向量索引,因此代理可以在运行时决定是进行预测、跟踪BOM依赖关系、评估收入影响、获取历史记录还是搜索电子邮件,始终在调用者的数据访问权限之内。
结果是一个端到端的管道,该管道预测需求、识别原材料差距、优化物流、显示隐藏风险,并允许分析师提出临时问题和显示延迟警告。
在执行完所有笔记本(通过运行笔记本1)后,Databricks环境已准备就绪,您可以继续构建代理并将其连接到Databricks。
连接到Databricks MCP服务器
目前,MCP规范定义了三种服务器,基于它们使用的传输机制:
- stdio服务器作为应用程序的子进程运行。您可以将其视为在“本地”运行。
- HTTP over SSE服务器远程运行。您通过URL连接到它们。
- Streamable HTTP服务器使用MCP规范中定义的Streamable HTTP传输远程运行。
Databricks托管的MCP端点(向量搜索、Unity Catalog函数、Genie)位于标准HTTPS URL之后,并实现MCP规范中定义的Streamable HTTP传输。确保您的工作区已启用Serverless,以便您可以连接到Databricks托管的MCP。
将Databricks MCP服务器集成到OpenAI Agent中
OpenAI Agent可在此处获得。首先安装所需的依赖项:
pip install -r requirements.txt
您需要一个OpenAI API密钥才能安全地访问API。如果您是OpenAI API的新手,请注册一个帐户。您可以按照这些步骤创建密钥并将其存储在安全的位置。
本指南展示了如何使用FastAPI服务此Agent并通过React UI进行聊天。但是,main.py
设置为一个独立的REPL,因此在安装了必需的依赖项并设置了必要的凭据(包括上面所述的Databricks主机和个人访问令牌)之后,您可以通过单个命令直接从命令行运行Agent:
python main.py
main.py文件编排了代理逻辑,使用OpenAI Agent SDK并公开Databricks MCP向量搜索端点和Unity Catalog函数作为可调用的工具。它首先读取指向目标目录、模式和Unity Catalog(UC)函数路径的环境变量,然后公开两个工具:vector_search
,它查询Databricks向量搜索索引;以及uc_function
,它通过MCP执行Unity Catalog函数。这两个工具都通过httpx进行身份验证的POST请求,并返回Databricks REST API的原始JSON。这两个助手都通过_databricks_ctx()
实用程序(由DatabricksOAuthClientProvider支持)获取工作区主机和个人访问令牌,并使用httpx发出身份验证的POST请求,返回原始JSON响应。
在run_agent()
内部,脚本实例化一个名为“Assistant”的Agent,该Agent硬范围限定为供应链主题。每个响应都必须调用已注册的两个工具之一,并且保护程序会强制Agent拒绝任何超出物流、库存、采购或预测范围的内容。每个用户提示都在SDK跟踪上下文内处理。一个简单的REPL驱动交互:用户输入包装在OpenTelemetry风格的跟踪中,通过Runner.run
分派,并打印最终答案(或保护程序道歉)。程序通过main()
中的asyncio.run
调用启动,使整个流程完全异步且非阻塞。
"""
使用Databricks MCP Vector Search和UC Functions通过OpenAI Agents SDK的CLI助手。
"""
import asyncio
import os
import httpx
from typing import Dict, Any
from agents import Agent, Runner, function_tool, gen_trace_id, trace
from agents.exceptions import (
InputGuardrailTripwireTriggered,
OutputGuardrailTripwireTriggered,
)
from agents.model_settings import ModelSettings
from databricks_mcp import DatabricksOAuthClientProvider
from databricks.sdk import WorkspaceClient
from supply_chain_guardrails import supply_chain_guardrail
CATALOG = os.getenv("MCP_VECTOR_CATALOG", "main") # 如果您的数据资产位于不同的位置,请覆盖目录、模式、functions_path名称
SCHEMA = os.getenv("MCP_VECTOR_SCHEMA", "supply_chain_db")
FUNCTIONS_PATH = os.getenv("MCP_FUNCTIONS_PATH", "main/supply_chain_db")
DATABRICKS_PROFILE = os.getenv("DATABRICKS_PROFILE", "DEFAULT") # 如果使用不同的配置文件名,请覆盖
HTTP_TIMEOUT = 30.0 # 秒
async def _databricks_ctx():
"""返回 (workspace, PAT token, base_url)。"""
ws = WorkspaceClient(profile=DATABRICKS_PROFILE)
token = DatabricksOAuthClientProvider(ws).get_token()
return ws, token, ws.config.host
@function_tool
async def vector_search(query: str) -> Dict[str, Any]:
"""查询Databricks MCP Vector Search索引。"""
ws, token, base_url = await _databricks_ctx()
url = f"{base_url}/api/2.0/mcp/vector-search/{CATALOG}/{SCHEMA}"
headers = {"Authorization": f"Bearer {token}"}
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
resp = await client.post(url, json={"query": query}, headers=headers)
resp.raise_for_status()
return resp.json()
@function_tool
async def uc_function(function_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""使用参数调用Databricks Unity Catalog函数。"""
ws, token, base_url = await _databricks_ctx()
url = f"{base_url}/api/2.0/mcp/functions/{FUNCTIONS_PATH}"
headers = {"Authorization": f"Bearer {token}"}
payload = {"function": function_name, "params": params}
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
resp = await client.post(url, json=payload, headers=headers)
resp.raise_for_status()
return resp.json()
async def run_agent():
agent = Agent(
name="Assistant",
instructions="您是Databricks MCP的供应链助手;您**只能**回答**严格**关于供应链数据、物流、库存、采购、需求预测等方面的问题;对于每个答案,您都必须调用一个已注册的工具;如果用户询问任何与供应链无关的内容,请**精确地**回复‘抱歉,我只能帮助解决供应链问题’。",
tools=[vector_search, uc_function],
model_settings=ModelSettings(model="gpt-4o", tool_choice="required"),
output_guardrails=[supply_chain_guardrail],
)
print("Databricks MCP助手已准备就绪。请输入问题或输入“exit”退出。")
while True:
user_input = input("您: ").strip()
if user_input.lower() in {"exit", "quit"}:
break
trace_id = gen_trace_id()
with trace(workflow_name="Databricks MCP Agent", trace_id=trace_id):
try:
result = await Runner.run(starting_agent=agent, input=user_input)
print("助手:", result.final_output)
except InputGuardrailTripwireTriggered:
print("助手: 抱歉,我只能帮助解决供应链问题。")
except OutputGuardrailTripwireTriggered:
print("助手: 抱歉,我只能帮助解决供应链问题。")
def main():
asyncio.run(run_agent())
if __name__ == "__main__":
main()
databricks_mcp.py作为集中的身份验证抽象:它从给定的WorkspaceClient(ws.config.token)获取我们之前创建的个人访问令牌,并保护应用程序的其余部分免受Databricks特定OAuth逻辑的影响。通过将所有令牌处理细节限制在此单个模块中,可以通过更新此文件来适应Databricks身份验证方案的任何未来更改。
"""
Databricks OAuth客户端提供商,用于MCP服务器。
"""
class DatabricksOAuthClientProvider:
def __init__(self, ws):
self.ws = ws
def get_token(self):
# 对于Databricks SDK >=0.57.0,令牌可作为ws.config.token获得
return self.ws.config.token
supply_chain_guardrails.py通过启动第二个代理(“供应链检查”)来实现轻量级的输出保护程序,该代理对候选答案进行分类。主代理将其草稿回复交给此检查程序,后者返回一个具有布尔值is_supply_chain
的Pydantic对象。如果该标志为false,则保护程序会引发一个触发器,调用者会替换为拒绝。
"""
输出保护程序,阻止与供应链主题无关的答案。
"""
from __future__ import annotations
from pydantic import BaseModel
from agents import Agent, Runner, GuardrailFunctionOutput
from agents import output_guardrail
from agents.run_context import RunContextWrapper
class SupplyChainCheckOutput(BaseModel):
reasoning: str
is_supply_chain: bool
guardrail_agent = Agent(
name="Supply-chain check",
instructions=(
"检查文本是否属于供应链分析和运营领域 "
"严格返回匹配SupplyChainCheckOutput模式的JSON"
),
output_type=SupplyChainCheckOutput,
)
@output_guardrail
async def supply_chain_guardrail(
ctx: RunContextWrapper, agent: Agent, output
) -> GuardrailFunctionOutput:
"""输出保护程序,阻止非供应链答案"""
text = output if isinstance(output, str) else getattr(output, "response", str(output))
result = await Runner.run(guardrail_agent, text, context=ctx.context)
return GuardrailFunctionOutput(
output_info=result.final_output,
tripwire_triggered=not result.final_output.is_supply_chain,
)
使用FastAPI服务代理
要启动后端(Fast API),请运行以下命令:
python -m uvicorn api_server:app --reload --port 8000
API将在http://localhost:8000可用(有关FastAPI文档,请访问:http://localhost:8000/docs)。
api_server.py是一个FastAPI后端,它将您的代理公开为流式/chat
API端点。启动时,它配置CORS以便本地前端可以与其通信,然后定义build_mcp_servers()
,该函数对调用者的Databricks工作区进行身份验证,构建两个HTTP“服务器工具”(一个用于向量搜索,一个用于Unity-Catalog函数),并预先连接它们以实现低延迟使用。每个传入到/chat
的POST请求都包含一个用户消息。处理程序启动一个全新的Agent,其mcp_servers
列表由这些流式工具填充,并且其模型被强制要求在每个回合调用一个工具。
"""
FastAPI包装器,将代理公开为流式`/chat`端点。
"""
import os
import asyncio
import logging
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from agents.exceptions import (
InputGuardrailTripwireTriggered,
OutputGuardrailTripwireTriggered,
)
from agents import Agent, Runner, gen_trace_id, trace
from agents.mcp import MCPServerStreamableHttp, MCPServerStreamableHttpParams
from agents.model_settings import ModelSettings
from databricks_mcp import DatabricksOAuthClientProvider
from databricks.sdk import WorkspaceClient
from supply_chain_guardrails import supply_chain_guardrail
CATALOG = os.getenv("MCP_VECTOR_CATALOG", "main")
SCHEMA = os.getenv("MCP_VECTOR_SCHEMA", "supply_chain_db")
FUNCTIONS_PATH = os.getenv("MCP_FUNCTIONS_PATH", "main/supply_chain_db")
DATABRICKS_PROFILE = os.getenv("DATABRICKS_PROFILE", "DEFAULT")
HTTP_TIMEOUT = 30.0 # 秒
app = FastAPI()
# 允许本地开发前端
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class ChatRequest(BaseModel):
message: str
async def build_mcp_servers():
"""初始化Databricks MCP向量和UC函数服务器。"""
ws = WorkspaceClient(profile=DATABRICKS_PROFILE)
token = DatabricksOAuthClientProvider(ws).get_token()
base = ws.config.host
vector_url = f"{base}/api/2.0/mcp/vector-search/{CATALOG}/{SCHEMA}"
fn_url = f"{base}/api/2.0/mcp/functions/{FUNCTIONS_PATH}"
async def _proxy_tool(request_json: dict, url: str):
import httpx
headers = {"Authorization": f"Bearer {token}"}
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
resp = await client.post(url, json=request_json, headers=headers)
resp.raise_for_status()
return resp.json()
headers = {"Authorization": f"Bearer {token}"}
servers = [
MCPServerStreamableHttp(
MCPServerStreamableHttpParams(
url=vector_url,
headers=headers,
timeout=HTTP_TIMEOUT,
),
name="vector_search",
client_session_timeout_seconds=60,
),
MCPServerStreamableHttp(
MCPServerStreamableHttpParams(
url=fn_url,
headers=headers,
timeout=HTTP_TIMEOUT,
),
name="uc_functions",
client_session_timeout_seconds=60,
),
]
# 确保在之前初始化服务器
await asyncio.gather(*(s.connect() for s in servers))
return servers
@app.post("/chat")
async def chat_endpoint(req: ChatRequest):
try:
servers = await build_mcp_servers()
agent = Agent(
name="Assistant",
instructions="使用工具回答问题。",
mcp_servers=servers,
model_settings=ModelSettings(tool_choice="required"),
output_guardrails=[supply_chain_guardrail],
)
trace_id = gen_trace_id()
async def agent_stream():
logging.info(f"[AGENT_STREAM] Input message: {req.message}")
try:
with trace(workflow_name="Databricks MCP Example", trace_id=trace_id):
result = await Runner.run(starting_agent=agent, input=req.message)
logging.info(f"[AGENT_STREAM] Raw agent result: {result}")
try:
logging.info(
f"[AGENT_STREAM] RunResult __dict__: {getattr(result, '__dict__', str(result))}"
)
raw_responses = getattr(result, "raw_responses", None)
logging.info(f"[AGENT_STREAM] RunResult raw_responses: {raw_responses}")
except Exception as log_exc:
logging.warning(f"[AGENT_STREAM] Could not log RunResult details: {log_exc}")
yield result.final_output
except InputGuardrailTripwireTriggered:
# 话题不符的问题被保护程序拒绝
yield "抱歉,我只能帮助解决供应链问题。"
except OutputGuardrailTripwireTriggered:
# 范围外答案被保护程序阻止
yield "抱歉,我只能帮助解决供应链问题。"
except Exception:
logging.exception("[AGENT_STREAM] Exception during agent run")
yield "[ERROR] Agent运行期间发生异常。请检查后端日志以获取详细信息。"
return StreamingResponse(agent_stream(), media_type="text/plain")
except Exception:
logging.exception("chat_endpoint failed")
return StreamingResponse(
(line.encode() for line in ["Internal server error 🙈"]),
media_type="text/plain",
status_code=500,
)
该端点在代理进行推理和调用MCP工具时,将令牌流式传输回浏览器。
通过React聊天界面与用户互动
在另一个终端中,运行以下命令来启动前端(React UI):
cd ui
npm install
npm run dev
该应用程序将在http://localhost:5173上可用
/ui文件夹中的React聊天UI提供了一个用户友好的Web界面,用于与后端代理进行交互。它包含用于显示对话历史记录的组件以及用于发送消息的文本输入。
当用户提交消息时,UI会将其发送到后端/chat
端点,并实时流式传输代理的响应,在新的内容到达时更新聊天窗口。设计强调对话体验,使用户能够轻松地提出问题并从Databricks支持的代理那里获得答案,所有这些都在一个响应迅速且交互式的Web应用程序中完成。
特别是,ChatUI.jsx文件包含聊天界面的核心逻辑,包括如何将用户消息发送到后端以及如何处理和实时显示代理的流式响应。
"""
处理来自FastAPI /chat端点的令牌流的代码片段。
"""
const reader = response.body.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
assistantMsg.text += new TextDecoder().decode(value);
setMessages(m => {
const copy = [...m];
copy[copy.length - 1] = { ...assistantMsg };
return copy;
});
}
UI在代理响应到达时对其进行流式传输和显示,从而创建流畅的实时聊天体验。突出显示这一点将清楚地向读者展示UI如何从您的后端代理实现交互式对话反馈。
提示应用程序
导航到http://localhost:5173并尝试以下提示:
- 哪些产品依赖于L6HUK材料?
- 如果我们无法生产预测量的autoclave_1产品,将有多少收入面临风险?
- 目前哪些产品有延迟?
- 注射器_1是否有任何延迟?
- 生产注射器_1需要哪些原材料?
- 以下原材料中是否有任何短缺:O4GRQ、Q5U3A、OAIFB或58RJD?
- 与批发商9相关的延迟是什么? 代理将调用相关工具并为用户格式化一个有根据的答案。
在OpenAI API仪表板中跟踪代理调用
在OpenAI API仪表板中,您可以打开“跟踪”视图,查看代理调用的每个函数。在下面的示例中,代理首先调用raw_from_product
来获取与特定产品关联的材料,然后调用revenue_risk
来估算短缺的收入影响。
后续步骤
您可以考虑添加多轮功能 如果您想将此设置改编到自己的工作区,还可以添加Genie Space MCP服务器