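Batch Processing with the Message Batches API
Message batches let you process large volumes of message requests asynchronously and cost-effectively. This guide shows how to use the Message Batches API for bulk operations while reducing costs by 50%.
In this guide, we will show how to:
- Create and submit a message batch
- Monitor batch processing status
- Retrieve and process batch results
- Apply best practices for effective batching
Setup
First, let's set up the environment with the necessary imports: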
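To make the 50% figure concrete, here is a minimal arithmetic sketch. The token counts and per-million-token prices below are hypothetical placeholders chosen for easy arithmetic, not real pricing; substitute the current rates for your model.

```python
def estimate_cost(input_tokens, output_tokens,
                  input_price_per_mtok, output_price_per_mtok,
                  batch_discount=0.5):
    """Return (standard_cost, batch_cost) for a workload.

    Prices are per million tokens. batch_discount=0.5 reflects the
    50% reduction stated in this guide.
    """
    standard = (input_tokens / 1_000_000) * input_price_per_mtok \
        + (output_tokens / 1_000_000) * output_price_per_mtok
    return standard, standard * (1 - batch_discount)


# Hypothetical workload and prices, for illustration only.
standard, batched = estimate_cost(
    input_tokens=2_000_000,
    output_tokens=500_000,
    input_price_per_mtok=3.00,
    output_price_per_mtok=15.00,
)
print(f"Standard: ${standard:.2f}, batch: ${batched:.2f}")
```

With these placeholder numbers the standard cost is 13.50 and the batch cost is 6.75.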
%pip install anthropic
import anthropic
import time
client = anthropic.Anthropic()
MODEL_NAME = "claude-3-5-sonnet-20241022"
Example 1: Basic Batch Processing
Let's start with a simple example that creates and monitors a batch of message requests.
# Prepare a list of questions to process as a batch
questions = [
    "How do solar panels convert sunlight into electricity?",
    "What's the difference between mutual funds and ETFs?",
    "What is a pick and roll in basketball?",
    "Why do leaves change color in the autumn?",
]
# Build one batch request per question; each custom_id identifies its result later
batch_requests = [
    {
        "custom_id": f"question-{i}",
        "params": {
            "model": MODEL_NAME,
            "max_tokens": 1024,
            "messages": [
                {"role": "user", "content": question}
            ],
        },
    }
    for i, question in enumerate(questions)
]
# Submit the batch
response = client.beta.messages.batches.create(
    requests=batch_requests
)
print(f"Batch ID: {response.id}")
print(f"Status: {response.processing_status}")
print(f"Created at: {response.created_at}")
Batch ID: msgbatch_01GgqTz9XzriGNHzTSGZsJJ8
Status: in_progress
Created at: 2024-10-08 00:46:30.694748+00:00
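Before submitting a larger workload, it can help to check that every `custom_id` is unique and to split the request list into batch-sized chunks. The sketch below is one way to do that. The helper names are hypothetical, and the 100,000-request ceiling is an assumption to verify against the current API limits (a batch may also be subject to a total size limit that this sketch does not check).

```python
from itertools import islice

# Assumed per-batch request limit; verify against the current documentation.
MAX_REQUESTS_PER_BATCH = 100_000


def check_unique_custom_ids(requests):
    """Raise ValueError if any custom_id appears more than once."""
    seen, duplicates = set(), set()
    for req in requests:
        cid = req["custom_id"]
        if cid in seen:
            duplicates.add(cid)
        seen.add(cid)
    if duplicates:
        raise ValueError(f"Duplicate custom_id values: {sorted(duplicates)}")


def chunk_requests(requests, max_per_batch=MAX_REQUESTS_PER_BATCH):
    """Yield successive lists of at most max_per_batch requests."""
    if max_per_batch < 1:
        raise ValueError("max_per_batch must be at least 1")
    iterator = iter(requests)
    while True:
        chunk = list(islice(iterator, max_per_batch))
        if not chunk:
            return
        yield chunk


# Small demonstration with placeholder requests and a tiny chunk size.
demo_requests = [{"custom_id": f"question-{i}", "params": {}} for i in range(5)]
check_unique_custom_ids(demo_requests)
chunk_sizes = [len(chunk) for chunk in chunk_requests(demo_requests, max_per_batch=2)]
print(chunk_sizes)
```

Each chunk could then be passed to `client.beta.messages.batches.create(requests=chunk)` as in the example above.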
Monitoring Batch Progress
Now let's monitor the batch's processing status:
def monitor_batch(batch_id, polling_interval=5):
    # Poll every polling_interval seconds until the batch status is "ended"
    while True:
        batch_update = client.beta.messages.batches.retrieve(batch_id)
        batch_update_status = batch_update.processing_status
        print(batch_update)
        print(f"Status: {batch_update_status}")
        if batch_update_status == "ended":
            return batch_update
        time.sleep(polling_interval)
# Monitor our batch
batch_result = monitor_batch(response.id)
print("\nBatch processing complete!")
print("\nRequest counts:")
print(f" Succeeded: {batch_result.request_counts.succeeded}")
print(f" Errored: {batch_result.request_counts.errored}")
print(f" Processing: {batch_result.request_counts.processing}")
print(f" Canceled: {batch_result.request_counts.canceled}")
print(f" Expired: {batch_result.request_counts.expired}")
BetaMessageBatch(id='msgbatch_01GgqTz9XzriGNHzTSGZsJJ8', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_01GgqTz9XzriGNHzTSGZsJJ8', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_01GgqTz9XzriGNHzTSGZsJJ8', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_01GgqTz9XzriGNHzTSGZsJJ8', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), ended_at=datetime.datetime(2024, 10, 8, 0, 46, 47, 283392, tzinfo=TzInfo(UTC)), expires_at=datetime.datetime(2024, 10, 9, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), processing_status='ended', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=0, succeeded=4), results_url='https://api.anthropic.com/v1/messages/batches/msgbatch_01GgqTz9XzriGNHzTSGZsJJ8/results', type='message_batch')
Status: ended
Batch processing complete!
Request counts:
Succeeded: 4
Errored: 0
Processing: 0
Canceled: 0
Expired: 0
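The fixed-interval loop above works well for small batches that finish in seconds. For batches that may run much longer, a variant with exponential backoff and an overall timeout avoids polling too often and avoids waiting forever. This is a sketch, not part of the SDK: `wait_for_batch` and its parameters are hypothetical, and the `retrieve` callable is injected so the function can be tried without network access.

```python
import time
from types import SimpleNamespace


def wait_for_batch(retrieve, batch_id, initial_interval=5.0, max_interval=60.0,
                   timeout=3600.0, sleep=time.sleep, clock=time.monotonic):
    """Poll retrieve(batch_id) until processing_status is "ended".

    The wait doubles after each poll, capped at max_interval. Raises
    TimeoutError if the next wait would run past the deadline.
    """
    interval = initial_interval
    deadline = clock() + timeout
    while True:
        batch = retrieve(batch_id)
        if batch.processing_status == "ended":
            return batch
        if clock() + interval > deadline:
            raise TimeoutError(f"Batch {batch_id} did not end within {timeout} seconds")
        sleep(interval)
        interval = min(interval * 2, max_interval)


# Demonstration with a fake retrieve function and a recorded (not real) sleep.
statuses = iter(["in_progress", "in_progress", "ended"])
waits = []
final = wait_for_batch(
    retrieve=lambda batch_id: SimpleNamespace(processing_status=next(statuses)),
    batch_id="msgbatch_example",
    initial_interval=1.0,
    max_interval=4.0,
    sleep=waits.append,
)
print(final.processing_status, waits)
```

With a real client, the call would look like `wait_for_batch(client.beta.messages.batches.retrieve, response.id)`.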
Retrieving Results
Once the batch has finished, we can retrieve and process the results:
def process_results(batch_id):
    # First, fetch the batch status
    batch = client.beta.messages.batches.retrieve(batch_id)
    print(f"\nBatch {batch.id} Summary:")
    print(f"Status: {batch.processing_status}")
    print(f"Created: {batch.created_at}")
    print(f"Ended: {batch.ended_at}")
    print(f"Expires: {batch.expires_at}")
    # Results are only available once the batch has ended
    if batch.processing_status == "ended":
        print("\nIndividual Results:")
        for result in client.beta.messages.batches.results(batch_id):
            print(f"\nResult for {result.custom_id}:")
            print(f"Status: {result.result.type}")
            if result.result.type == "succeeded":
                # Print only the first 200 characters of the reply
                print(f"Content: {result.result.message.content[0].text[:200]}...")
            elif result.result.type == "errored":
                print("Request errored")
            elif result.result.type == "canceled":
                print("Request was canceled")
            elif result.result.type == "expired":
                print("Request expired")
# Example usage:
batch_status = monitor_batch(response.id)
if batch_status.processing_status == "ended":
    process_results(batch_status.id)
BetaMessageBatch(id='msgbatch_01GgqTz9XzriGNHzTSGZsJJ8', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), ended_at=datetime.datetime(2024, 10, 8, 0, 46, 47, 283392, tzinfo=TzInfo(UTC)), expires_at=datetime.datetime(2024, 10, 9, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), processing_status='ended', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=0, succeeded=4), results_url='https://api.anthropic.com/v1/messages/batches/msgbatch_01GgqTz9XzriGNHzTSGZsJJ8/results', type='message_batch')
Status: ended
Batch msgbatch_01GgqTz9XzriGNHzTSGZsJJ8 Summary:
Status: ended
Created: 2024-10-08 00:46:30.694748+00:00
Ended: 2024-10-08 00:46:47.283392+00:00
Expires: 2024-10-09 00:46:30.694748+00:00
Individual Results:
Result for question-0:
Status: succeeded
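Content: Solar panels convert sunlight into electricity through a process called the photovoltaic effect. Here's a step-by-step explanation of how it works:
1. Solar panel composition:
Solar panels are made up of...
Result for question-1:
Status: succeeded
Content: Mutual funds and ETFs (exchange-traded funds) are both popular investment vehicles that allow investors to diversify their portfolios, but they have several key differences:
1. Trading:
- Mutual funds...
Result for question-2:
Status: succeeded
Content: A pick and roll, also known as a screen and roll, is a fundamental offensive play in basketball involving two players. Here's how it works:
1. The ball handler (usually a guard) has...
Result for question-3:
Status: succeeded
Content: Leaves change color in the autumn due to a combination of factors, mainly related to changes in temperature and daylight and to the tree's biological processes. Here's a breakdown of why this happens:
1. Chlorophyll breakdown: As days get shorter and temperatures drop, trees stop producing chlorophyll...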
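The results happened to print in request order here, but it is safer not to rely on that and to match each result to its request through `custom_id`. The sketch below collects successful replies and non-successful outcomes into two dictionaries. `collect_results` is a hypothetical helper. It assumes the result shape used above (`custom_id`, `result.type`, `result.message.content`) and that text blocks carry `type == "text"`. The sample data is fake so the code runs without an API call.

```python
from types import SimpleNamespace


def collect_results(results):
    """Split batch results into {custom_id: text} and {custom_id: outcome type}."""
    answers, failed = {}, {}
    for entry in results:
        outcome = entry.result
        if outcome.type == "succeeded":
            # Join every text block; a message may contain more than one.
            answers[entry.custom_id] = "".join(
                block.text
                for block in outcome.message.content
                if getattr(block, "type", None) == "text"
            )
        else:
            failed[entry.custom_id] = outcome.type
    return answers, failed


def fake_result(custom_id, kind, text=None):
    """Build a stand-in object shaped like one batch result (test data only)."""
    message = None
    if text is not None:
        message = SimpleNamespace(content=[SimpleNamespace(type="text", text=text)])
    return SimpleNamespace(custom_id=custom_id,
                           result=SimpleNamespace(type=kind, message=message))


# Deliberately out of order to show that lookup is by custom_id.
sample = [
    fake_result("question-1", "succeeded", "ETF answer"),
    fake_result("question-0", "succeeded", "Solar answer"),
    fake_result("question-2", "expired"),
]
answers, failed = collect_results(sample)
print(answers["question-0"])
print(failed)
```

With a real batch, the input would be `client.beta.messages.batches.results(batch_id)`.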
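Example 2: More Advanced Batching with Different Message Types
This example shows more advanced usage: basic error handling, plus several kinds of request in a single batch (a simple message, a message with a system prompt, a multi-turn conversation, and a message with an image).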
import base64
def create_complex_batch():
    # Read an image file and return its contents as a base64 string
    def get_base64_encoded_image(image_path):
        with open(image_path, "rb") as image_file:
            binary_data = image_file.read()
            base_64_encoded_data = base64.b64encode(binary_data)
            base_64_string = base_64_encoded_data.decode('utf-8')
            return base_64_string
    # A mix of different request types
    batch_requests = [
        {
            "custom_id": "simple-question",
            "params": {
                "model": MODEL_NAME,
                "max_tokens": 1024,
                "messages": [
                    {"role": "user", "content": "What is quantum computing?"}
                ]
            }
        },
        {
            "custom_id": "image-analysis",
            "params": {
                "model": MODEL_NAME,
                "max_tokens": 1024,
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": "image/jpeg",
                                    "data": get_base64_encoded_image("../images/sunset-dawn-nature-mountain-preview.jpg")
                                }
                            },
                            {
                                "type": "text",
                                "text": "Describe this mountain landscape. What time of day does it appear to be, and what weather conditions do you observe?"
                            }
                        ]
                    }
                ]
            }
        },
        {
            "custom_id": "system-prompt",
            "params": {
                "model": MODEL_NAME,
                "max_tokens": 1024,
                "system": "You are a helpful science teacher.",
                "messages": [
                    {"role": "user", "content": "Explain gravity to a 5-year-old."}
                ]
            }
        },
        {
            "custom_id": "multi-turn",
            "params": {
                "model": MODEL_NAME,
                "max_tokens": 1024,
                "messages": [
                    {"role": "user", "content": "What is DNA?"},
                    {"role": "assistant", "content": "DNA is like a blueprint for living organisms..."},
                    {"role": "user", "content": "How is DNA replicated?"}
                ]
            }
        }
    ]
    # Submit the batch; return None if creation fails
    try:
        response = client.beta.messages.batches.create(
            requests=batch_requests
        )
        return response.id
    except Exception as e:
        print(f"Error creating batch: {e}")
        return None
complex_batch_id = create_complex_batch()
print(f"Complex batch ID: {complex_batch_id}")
Complex batch ID: msgbatch_011FAkvqkL8pEskdyS3xdmNW
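The image request above hard-codes `"media_type": "image/jpeg"`. When a batch mixes image formats, one option is to derive the media type from the file extension. The sketch below does that with the standard library. `image_block` is a hypothetical helper, and the set of accepted media types is an assumption to check against the current vision documentation.

```python
import base64
import mimetypes
import tempfile
from pathlib import Path

# Assumed set of accepted image types; verify against the current documentation.
SUPPORTED_MEDIA_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}


def image_block(image_path):
    """Return a base64 image content block for use inside a message."""
    path = Path(image_path)
    # Guess the media type from the file extension only (contents are not inspected).
    media_type, _ = mimetypes.guess_type(path.name)
    if media_type not in SUPPORTED_MEDIA_TYPES:
        raise ValueError(f"Unsupported or unknown image type for {path.name}: {media_type}")
    data = base64.b64encode(path.read_bytes()).decode("utf-8")
    return {
        "type": "image",
        "source": {"type": "base64", "media_type": media_type, "data": data},
    }


# Demonstration with a throwaway file holding only the PNG signature bytes
# (enough to exercise the helper; it is not a valid image).
with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = Path(tmp_dir) / "pixel.png"
    sample_path.write_bytes(b"\x89PNG\r\n\x1a\n")
    block = image_block(sample_path)
print(block["source"]["media_type"])
```

The returned dictionary has the same shape as the `"type": "image"` entry in the `image-analysis` request.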
Now let's look at the results of the batch:
# Example usage:
batch_status = monitor_batch(complex_batch_id)
if batch_status.processing_status == "ended":
process_results(batch_status.id)
BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=datetime.datetime(2024, 10, 8, 0, 24, 27, 768229, tzinfo=TzInfo(UTC)), expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='ended', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=0, succeeded=4), results_url='https://api.anthropic.com/v1/messages/batches/msgbatch_011FAkvqkL8pEskdyS3xdmNW/results', type='message_batch')
Status: ended
Batch msgbatch_011FAkvqkL8pEskdyS3xdmNW Summary:
Status: ended
Created: 2024-10-08 00:23:58.507550+00:00
Ended: 2024-10-08 00:24:27.768229+00:00
Expires: 2024-10-09 00:23:58.507550+00:00
Individual Results:
Result for simple-question:
Status: succeeded
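Content: Quantum computing is an advanced form of computing that uses the principles of quantum mechanics to process information. Unlike classical computers, which use bits (0s and 1s) to store and process data...
Result for image-analysis:
Status: succeeded
Content: This image captures a stunning mountain landscape at sunset. The sun is a bright disc, just setting behind the distant mountain range, casting a warm golden glow across the entire scene...
Result for system-prompt:
Status: succeeded
Content: Sure! Here's how I would explain gravity to a 5-year-old:
Gravity is like a giant invisible hug that the Earth gives to everything on it. It keeps us on the ground instead of floating away...
Result for multi-turn:
Status: succeeded
Content: DNA replication is the process by which DNA makes a copy of itself during cell division. Here's a basic overview of how it works:
1. Unwinding: The DNA double helix unwinds, and the two strands...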
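Every request in both examples succeeded, but `process_results` also handles the `errored`, `canceled`, and `expired` outcomes. When some requests end in one of those states, one possible follow-up is to resubmit only those requests. The sketch below selects them from the original request list by `custom_id`. `build_retry_requests` is a hypothetical helper, and whether a retry makes sense depends on why the request failed (for example, a request that errored because it was invalid will fail again).

```python
def build_retry_requests(original_requests, failed_ids):
    """Return the original requests whose custom_id is in failed_ids, in original order."""
    wanted = set(failed_ids)
    return [req for req in original_requests if req["custom_id"] in wanted]


# Demonstration with placeholder requests shaped like the ones in this guide.
original = [
    {"custom_id": "simple-question", "params": {}},
    {"custom_id": "image-analysis", "params": {}},
    {"custom_id": "system-prompt", "params": {}},
    {"custom_id": "multi-turn", "params": {}},
]
retry = build_retry_requests(original, failed_ids=["multi-turn", "image-analysis"])
retry_ids = [req["custom_id"] for req in retry]
print(retry_ids)
```

The resulting list could be passed to `client.beta.messages.batches.create(requests=retry)` to create a new batch.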