Orchestrator-Workers Workflow
在此工作流程中,一个中心LLM会动态地分解任务,将其委托给工作LLM,并综合它们的结果。
何时使用此工作流程
此工作流程非常适合那些你无法预测所需子任务的复杂任务。与简单并行化的关键区别在于其灵活性——子任务不是预先定义的,而是由协调器根据特定输入确定的。
from typing import Dict, List, Optional
from util import llm_call, extract_xml
def parse_tasks(tasks_xml: str) -> List[Dict]:
"""将XML任务解析为任务字典列表。"""
tasks = []
current_task = {}
for line in tasks_xml.split('\n'):
line = line.strip()
if not line:
continue
if line.startswith("<task>"):
current_task = {}
elif line.startswith("<type>"):
current_task["type"] = line[6:-7].strip()
elif line.startswith("<description>"):
current_task["description"] = line[12:-13].strip()
elif line.startswith("</task>"):
if "description" in current_task:
if "type" not in current_task:
current_task["type"] = "default"
tasks.append(current_task)
return tasks
class FlexibleOrchestrator:
"""分解任务并使用工作LLM并行运行它们。"""
def __init__(
self,
orchestrator_prompt: str,
worker_prompt: str,
):
"""使用提示模板进行初始化。"""
self.orchestrator_prompt = orchestrator_prompt
self.worker_prompt = worker_prompt
def _format_prompt(self, template: str, **kwargs) -> str:
"""使用变量格式化提示模板。"""
try:
return template.format(**kwargs)
except KeyError as e:
raise ValueError(f"缺少必需的提示变量: {e}")
def process(self, task: str, context: Optional[Dict] = None) -> Dict:
"""通过分解任务并并行运行子任务来处理任务。"""
context = context or {}
# 步骤1:获取协调器响应
orchestrator_input = self._format_prompt(
self.orchestrator_prompt,
task=task,
**context
)
orchestrator_response = llm_call(orchestrator_input)
# 解析协调器响应
analysis = extract_xml(orchestrator_response, "analysis")
tasks_xml = extract_xml(orchestrator_response, "tasks")
tasks = parse_tasks(tasks_xml)
print("\n=== ORCHESTRATOR OUTPUT ===")
print(f"\nANALYSIS:\n{analysis}")
print(f"\nTASKS:\n{tasks}")
# 步骤2:处理每个任务
worker_results = []
for task_info in tasks:
worker_input = self._format_prompt(
self.worker_prompt,
original_task=task,
task_type=task_info['type'],
task_description=task_info['description'],
**context
)
worker_response = llm_call(worker_input)
result = extract_xml(worker_response, "response")
worker_results.append({
"type": task_info["type"],
"description": task_info["description"],
"result": result
})
print(f"\n=== WORKER RESULT ({task_info['type']}) ===\n{result}\n")
return {
"analysis": analysis,
"worker_results": worker_results,
}
示例用例:营销变体生成
ORCHESTRATOR_PROMPT = """
分析此任务并将其分解为2-3个不同的方法:
任务:{task}
以以下格式返回您的响应:
<analysis>
解释您对任务的理解以及哪些变体是有价值的。
重点关注每种方法如何服务于任务的不同方面。
</analysis>
<tasks>
<task>
<type>formal</type>
<description>撰写一个精确的技术版本,强调规格</description>
</task>
<task>
<type>conversational</type>
<description>撰写一个引人入胜、友好的版本,与读者建立联系</description>
</task>
</tasks>
"""
WORKER_PROMPT = """
根据以下内容生成内容:
任务:{original_task}
风格:{task_type}
指南:{task_description}
以以下格式返回您的响应:
<response>
您的内容在此,保持指定的风格并完全满足要求。
</response>
"""
orchestrator = FlexibleOrchestrator(
orchestrator_prompt=ORCHESTRATOR_PROMPT,
worker_prompt=WORKER_PROMPT,
)
results = orchestrator.process(
task="为一款新的环保水瓶撰写产品描述",
context={
"target_audience": "有环保意识的千禧一代",
"key_features": ["不含塑料", "隔热", "终身保修"]
}
)
=== ORCHESTRATOR OUTPUT ===
ANALYSIS:
此任务需要为环保水瓶创建营销文案,这提供了多个有效的沟通角度。关键挑战在于平衡环保效益与实用功能,同时保持对不同消费者群体的吸引力。
关键变体之所以有价值,是因为:
1. 技术型买家需要有关材料和环境影响的具体细节
2. 生活方式型消费者更能接受情感利益和故事叙述
3. 不同的语气可以针对不同的市场细分,同时推广相同的核心产品
技术方法服务于那些根据规格和可衡量的影响做出购买决策的人,而对话方法则与那些根据生活方式契合度和情感共鸣进行购买的人建立联系。
TASKS:
[{'type': 'formal', 'description': '>创建专注于规格的描述,突出材料成分、环保认证、容量测量和可量化的生态影响(例如,“每年节省的塑料瓶数量”)。包括制造工艺和回收能力的技术细节。<'}, {'type': 'conversational', 'description': '>开发一种叙事风格的描述,侧重于用户体验、生活方式效益和与环境保护的情感联系。使用相关的场景和休闲语言来帮助读者将水瓶视为他们日常生活的一部分。<'}, {'type': 'hybrid', 'description': '>通过将技术细节融入故事驱动的格式中,将情感吸引力与关键规格相结合。平衡环境影响统计数据与关于可持续生活方式选择的励志信息。<'}]
=== WORKER RESULT (formal) ===
隆重推出 EcoVessel Pro 系列:一款精密工程的水合解决方案,由全球回收标准 (GRS) 认证的 100% 消费后回收不锈钢制成。
技术规格:
• 容量:750毫升 / 25.4液量盎司
• 材料:18/8 食品级回收不锈钢(304级)
• 壁厚:2毫米双壁真空绝缘
• 温度保持:冷藏24小时 / 保温12小时
• 重量:340克 / 12盎司(空)
环境认证:
• Climate Partner 认证的碳中和产品
• NSF International 的不含 BPA 认证
• ISO 14001 环境管理体系认证
制造工艺:
在我们的碳中和工厂中使用水力发电制造,每个容器都经过专有的蒸汽灭菌环保工艺处理,无需化学清洁剂。粉末涂层通过零浪费静电工艺进行涂覆。
环境影响指标:
• 每个用户每年可消除约 167 个一次性塑料瓶
• 与传统瓶子制造相比,碳足迹降低 87%
• 通过我们的闭环回收计划,在生命周期结束时可 100% 回收
• 通过消除塑料瓶生产,每年可节约 2,920 升水
每个单元都包含一个数字跟踪代码,用于实时影响监控和真实性验证。设计使用寿命至少为 10 年(正常使用条件下)。
=== WORKER RESULT (conversational) ===
=== WORKER RESULT (hybrid) ===
认识 AquaVerde Elite - 让您的日常补水仪式成为对我们地球未来的有力宣言。
想象一下,您知道您喝的每一口都能帮助每年阻止多达 167 个一次性塑料瓶进入我们的海洋,从而开始您的一天。AquaVerde Elite 不仅仅是一个水瓶;它是您对抗塑料污染的个人大使,由航空级回收不锈钢制成,获得了第二次生命。
这款 24 盎司的伴侣旨在陪伴您度过生活的冒险之旅,它采用了我们创新的 ThermaLock™ 技术,可将冷饮保持凉爽 24 小时,或将热饮保温 12 小时。双壁真空绝缘不仅关乎性能——它经过精心设计,使用的材料比传统设计少 30%,同时提供卓越的保温性能。
水瓶时尚的轮廓融入了提升您日常体验的贴心细节:防漏的 AutoSeal 瓶盖,单手即可操作;由回收登山绳制成的内置提环;以及我们标志性的 CloudTouch™ 外层涂层,既防滑又美观。有四种自然启发的颜色(Ocean Deep、Forest Canopy、Desert Dawn 和 Mountain Mist),每个水瓶的涂层均采用水性、零 VOC 的涂装工艺制成。
但也许最美的特点是您看不见的——每个 AquaVerde Elite 都有助于资助发展中国家的清洁水项目,每次购买的 2% 都用于支持全球的水资源保护倡议。您选择携带 AquaVerde Elite 不仅仅是为了保持水分;更是为了成为全球可持续发展运动的一部分。
重要的规格:
• 容量:24盎司/710毫升
• 重量:12.8盎司
• 材料:90% 回收 18/8 不锈钢
• 不含 BPA、不含邻苯二甲酸盐
• 可洗碗
• 终身保修
加入日益壮大的 AquaVerde 携带者社区,他们共同阻止了超过 200 万个一次性瓶子进入我们的生态系统。因为每一滴都很重要,每一个选择都很重要。