Batch Processing with the Message Batches API

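Message Batches let you process large volumes of Messages requests asynchronously and cost-effectively. This guide shows how to use the Message Batches API for bulk operations while reducing costs by 50%.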
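
To make the 50% figure concrete, here is a minimal arithmetic sketch. The per-million-token prices and token counts are hypothetical placeholders chosen for illustration, and `estimate_cost` is a helper invented for this example; substitute the current prices for your model.

```python
def estimate_cost(input_tokens, output_tokens,
                  input_price_per_mtok, output_price_per_mtok,
                  batch_discount=0.5):
    """Return (standard_cost, batch_cost) for a workload.

    Prices are per million tokens; batch_discount=0.5 reflects the
    50% reduction mentioned in this guide.
    """
    standard = (input_tokens * input_price_per_mtok
                + output_tokens * output_price_per_mtok) / 1_000_000
    return standard, standard * (1 - batch_discount)


# Hypothetical workload and prices, for illustration only.
standard_cost, batch_cost = estimate_cost(
    input_tokens=2_000_000,
    output_tokens=500_000,
    input_price_per_mtok=3.0,
    output_price_per_mtok=15.0,
)
print(f"Standard: ${standard_cost:.2f}, batched: ${batch_cost:.2f}")
```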

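In this guide, we will show how to:

  1. Create and submit a message batch
  2. Monitor batch processing status
  3. Retrieve and process batch results
  4. Apply best practices for effective batching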

Setup

First, let's set up the environment with the necessary imports:

%pip install anthropic
import anthropic
import time

client = anthropic.Anthropic()
MODEL_NAME = "claude-3-5-sonnet-20241022"

Example 1: Basic Batch Processing

Let's start with a simple example that creates and monitors a batch of message requests.

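# Prepare a list of questions to process as a batch
questions = [
    "How do solar panels convert sunlight into electricity?",
    "What is the difference between mutual funds and ETFs?",
    "What is a pick and roll in basketball?",
    "Why do leaves change color in autumn?"
]

# Build one batch request per question, each with a unique custom_id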
batch_requests = [
    {
        "custom_id": f"question-{i}",
        "params": {
            "model": MODEL_NAME,
            "max_tokens": 1024,
            "messages": [
                {"role": "user", "content": question}
            ]
        }
    }
    for i, question in enumerate(questions)
]

# Submit the batch
response = client.beta.messages.batches.create(
    requests=batch_requests
)

print(f"Batch ID: {response.id}")
print(f"Status: {response.processing_status}")
print(f"Created at: {response.created_at}")
Batch ID: msgbatch_01GgqTz9XzriGNHzTSGZsJJ8
Status: in_progress
Created at: 2024-10-08 00:46:30.694748+00:00
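
Each request in a batch is identified by its `custom_id`, which is how results are matched back to requests later, so the IDs need to be unique within a batch. The sketch below checks for duplicates before submission. `find_duplicate_custom_ids` is a hypothetical helper written for this guide, not part of the SDK, and it makes no API calls.

```python
from collections import Counter


def find_duplicate_custom_ids(requests):
    """Return a sorted list of custom_id values that appear more than once."""
    counts = Counter(request["custom_id"] for request in requests)
    return sorted(cid for cid, n in counts.items() if n > 1)


# Illustrative requests; "params" is left empty because only the IDs matter here.
example_requests = [
    {"custom_id": "question-0", "params": {}},
    {"custom_id": "question-1", "params": {}},
    {"custom_id": "question-0", "params": {}},
]

duplicates = find_duplicate_custom_ids(example_requests)
if duplicates:
    print(f"Duplicate custom_id values: {duplicates}")
```

Running a check like this on `batch_requests` before calling `create` catches ID collisions locally instead of after a round trip to the API.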

Monitoring Batch Progress

Now let's monitor the batch processing status:

def monitor_batch(batch_id, polling_interval=5):
    """Poll the batch until its processing status is "ended", then return it."""
    while True:
        batch_update = client.beta.messages.batches.retrieve(batch_id)
        batch_update_status = batch_update.processing_status
        print(batch_update)
        print(f"Status: {batch_update_status}")
        if batch_update_status == "ended":
            return batch_update

        # Wait before polling again
        time.sleep(polling_interval)

# Monitor our batch
batch_result = monitor_batch(response.id)
print("\nBatch processing complete!")
print("\nRequest counts:")
print(f"  Succeeded: {batch_result.request_counts.succeeded}")
print(f"  Errored: {batch_result.request_counts.errored}")
print(f"  Processing: {batch_result.request_counts.processing}")
print(f"  Canceled: {batch_result.request_counts.canceled}")
print(f"  Expired: {batch_result.request_counts.expired}")
BetaMessageBatch(id='msgbatch_01GgqTz9XzriGNHzTSGZsJJ8', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_01GgqTz9XzriGNHzTSGZsJJ8', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_01GgqTz9XzriGNHzTSGZsJJ8', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_01GgqTz9XzriGNHzTSGZsJJ8', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), ended_at=datetime.datetime(2024, 10, 8, 0, 46, 47, 283392, tzinfo=TzInfo(UTC)), expires_at=datetime.datetime(2024, 10, 9, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), processing_status='ended', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=0, succeeded=4), results_url='https://api.anthropic.com/v1/messages/batches/msgbatch_01GgqTz9XzriGNHzTSGZsJJ8/results', type='message_batch')
Status: ended

Batch processing complete!

Request counts:
  Succeeded: 4
  Errored: 0
  Processing: 0
  Canceled: 0
  Expired: 0
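
The polling loop above runs until the batch ends, which can take much longer for large batches than it did here. One possible variation, sketched below, adds an overall timeout. `wait_until_ended` and its parameters are assumptions made for this example; it takes a status-fetching callable so that the loop can be exercised without network access.

```python
import time


def wait_until_ended(fetch_status, polling_interval=5.0, timeout=3600.0,
                     sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until it returns "ended" or the timeout elapses.

    Returns True if the batch ended, False if the timeout was reached first.
    """
    deadline = clock() + timeout
    while True:
        if fetch_status() == "ended":
            return True
        if clock() >= deadline:
            return False
        sleep(polling_interval)


# Demonstration with a fake status sequence instead of a real API call.
fake_statuses = iter(["in_progress", "in_progress", "ended"])
finished = wait_until_ended(lambda: next(fake_statuses),
                            polling_interval=0, timeout=10)
print(f"Finished: {finished}")
```

With the real client, the callable could be `lambda: client.beta.messages.batches.retrieve(batch_id).processing_status`, which uses the same `retrieve` call as `monitor_batch`.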

Retrieving Results

Once the batch has ended, we can retrieve and process the results:

def process_results(batch_id):
    # First, get the batch status
    batch = client.beta.messages.batches.retrieve(batch_id)

    print(f"\nBatch {batch.id} Summary:")
    print(f"Status: {batch.processing_status}")
    print(f"Created: {batch.created_at}")
    print(f"Ended: {batch.ended_at}")
    print(f"Expires: {batch.expires_at}")

    if batch.processing_status == "ended":
        print("\nIndividual Results:")
        for result in client.beta.messages.batches.results(batch_id):
            print(f"\nResult for {result.custom_id}:")
            print(f"Status: {result.result.type}")

            if result.result.type == "succeeded":
                print(f"Content: {result.result.message.content[0].text[:200]}...")
            elif result.result.type == "errored":
                print("Request errored")
            elif result.result.type == "canceled":
                print("Request was canceled")
            elif result.result.type == "expired":
                print("Request expired")

# Example usage:
batch_status = monitor_batch(response.id)
if batch_status.processing_status == "ended":
    process_results(batch_status.id)
BetaMessageBatch(id='msgbatch_01GgqTz9XzriGNHzTSGZsJJ8', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), ended_at=datetime.datetime(2024, 10, 8, 0, 46, 47, 283392, tzinfo=TzInfo(UTC)), expires_at=datetime.datetime(2024, 10, 9, 0, 46, 30, 694748, tzinfo=datetime.timezone.utc), processing_status='ended', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=0, succeeded=4), results_url='https://api.anthropic.com/v1/messages/batches/msgbatch_01GgqTz9XzriGNHzTSGZsJJ8/results', type='message_batch')
Status: ended

Batch msgbatch_01GgqTz9XzriGNHzTSGZsJJ8 Summary:
Status: ended
Created: 2024-10-08 00:46:30.694748+00:00
Ended: 2024-10-08 00:46:47.283392+00:00
Expires: 2024-10-09 00:46:30.694748+00:00

Individual Results:

Result for question-0:
Status: succeeded
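Content: Solar panels convert sunlight into electricity through a process called the photovoltaic effect. Here is a step-by-step explanation of how it works:

1. Solar panel composition:
Solar panels are made up of...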

Result for question-1:
Status: succeeded
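Content: Mutual funds and ETFs (exchange-traded funds) are both popular investment vehicles that allow investors to diversify their portfolios, but they have several key differences:

1. Trading:
- Mutual funds...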

Result for question-2:
Status: succeeded
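Content: A pick and roll, also known as a screen and roll, is a fundamental offensive play in basketball involving two players. Here is how it works:

1. The ball handler (usually a guard) has...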

Result for question-3:
Status: succeeded
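Content: Leaves change color in autumn due to a combination of factors, mainly related to changes in temperature and daylight as well as the trees' biological processes. Here is a breakdown of why this happens:

1. Chlorophyll breakdown: As days get shorter and temperatures drop, trees stop producing chlorophyll...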
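
Results are not guaranteed to come back in the order the requests were submitted, so it is safer to match them by `custom_id` than by position. The sketch below builds a lookup table from the result stream. `index_results` is a hypothetical helper, and the `SimpleNamespace` objects are stand-ins that mimic only the attributes used in `process_results` above; they are not the SDK's real result types.

```python
from types import SimpleNamespace


def index_results(results):
    """Map custom_id to the response text, or None if the request did not succeed."""
    indexed = {}
    for entry in results:
        if entry.result.type == "succeeded":
            indexed[entry.custom_id] = entry.result.message.content[0].text
        else:
            indexed[entry.custom_id] = None
    return indexed


def fake_result(custom_id, result_type, text=None):
    """Build a stand-in result exposing the attributes used above."""
    message = SimpleNamespace(content=[SimpleNamespace(text=text)])
    return SimpleNamespace(
        custom_id=custom_id,
        result=SimpleNamespace(type=result_type, message=message),
    )


# Results deliberately listed out of submission order.
fake_results = [
    fake_result("question-1", "succeeded", "Answer B"),
    fake_result("question-0", "succeeded", "Answer A"),
    fake_result("question-2", "errored"),
]
answers = index_results(fake_results)
print(answers["question-0"])
```

With the real client, the same function could be called as `index_results(client.beta.messages.batches.results(batch_id))`.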

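Example 2: Advanced Batch Processing with Different Message Types

This example demonstrates more advanced usage, including error handling and mixing different request types in a single batch: simple messages, messages with a system prompt, multi-turn conversations, and messages with images.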

import base64

def create_complex_batch():
    # Read an image file and return its contents as a base64-encoded string
    def get_base64_encoded_image(image_path):
        with open(image_path, "rb") as image_file:
            binary_data = image_file.read()
            base_64_encoded_data = base64.b64encode(binary_data)
            base_64_string = base_64_encoded_data.decode('utf-8')
            return base_64_string

    # A mix of different request types
    batch_requests = [
        {
            "custom_id": "simple-question",
            "params": {
                "model": MODEL_NAME,
                "max_tokens": 1024,
                "messages": [
                    {"role": "user", "content": "What is quantum computing?"}
                ]
            }
        },
        {
            "custom_id": "image-analysis",
            "params": {
                "model": MODEL_NAME,
                "max_tokens": 1024,
                "messages": [
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "image",
                                "source": {
                                    "type": "base64",
                                    "media_type": "image/jpeg",
                                    "data": get_base64_encoded_image("../images/sunset-dawn-nature-mountain-preview.jpg")
                                }
                            },
                            {
                                "type": "text",
                                "text": "Describe this mountain landscape. What time of day does it appear to be, and what weather conditions do you observe?"
                            }
                        ]
                    }
                ]
            }
        },
        {
            "custom_id": "system-prompt",
            "params": {
                "model": MODEL_NAME,
                "max_tokens": 1024,
                "system": "You are a helpful science teacher.",
                "messages": [
                    {"role": "user", "content": "Explain gravity to a 5-year-old."}
                ]
            }
        },
        {
            "custom_id": "multi-turn",
            "params": {
                "model": MODEL_NAME,
                "max_tokens": 1024,
                "messages": [
                    {"role": "user", "content": "What is DNA?"},
                    {"role": "assistant", "content": "DNA is like a blueprint for living things..."},
                    {"role": "user", "content": "How is DNA copied?"}
                ]
            }
        }
    ]

    try:
        response = client.beta.messages.batches.create(
            requests=batch_requests
        )
        return response.id
    except Exception as e:
        print(f"Error creating batch: {e}")
        return None

complex_batch_id = create_complex_batch()
print(f"Complex batch ID: {complex_batch_id}")
Complex batch ID: msgbatch_011FAkvqkL8pEskdyS3xdmNW
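
The image request above hardcodes `"media_type": "image/jpeg"`, which only holds for JPEG files. A possible refinement, sketched below, derives the media type from the file name with the standard library's `mimetypes` module. `build_image_block` and `SUPPORTED_MEDIA_TYPES` are names invented for this example; the helper takes raw bytes rather than a path so that it can be tried without an image file on disk.

```python
import base64
import mimetypes

# Image formats assumed to be accepted for image content blocks.
SUPPORTED_MEDIA_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}


def build_image_block(filename, raw_bytes):
    """Build an image content block, inferring media_type from the file name."""
    media_type, _ = mimetypes.guess_type(filename)
    if media_type not in SUPPORTED_MEDIA_TYPES:
        raise ValueError(f"Unsupported or unknown image type for {filename!r}")
    return {
        "type": "image",
        "source": {
            "type": "base64",
            "media_type": media_type,
            "data": base64.b64encode(raw_bytes).decode("utf-8"),
        },
    }


# Placeholder bytes stand in for real image data.
block = build_image_block("sunset.png", b"\x89PNG")
print(block["source"]["media_type"])
```

Guessing from the extension is a convenience, not validation: a mislabeled file would still be sent with the wrong media type.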

Now let's look at the results of the batch:

# Example usage:
batch_status = monitor_batch(complex_batch_id)
if batch_status.processing_status == "ended":
    process_results(batch_status.id)
BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=None, expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='in_progress', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=4, succeeded=0), results_url=None, type='message_batch')
Status: in_progress
BetaMessageBatch(id='msgbatch_011FAkvqkL8pEskdyS3xdmNW', cancel_initiated_at=None, created_at=datetime.datetime(2024, 10, 8, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), ended_at=datetime.datetime(2024, 10, 8, 0, 24, 27, 768229, tzinfo=TzInfo(UTC)), expires_at=datetime.datetime(2024, 10, 9, 0, 23, 58, 507550, tzinfo=datetime.timezone.utc), processing_status='ended', request_counts=RequestCounts(canceled=0, errored=0, expired=0, processing=0, succeeded=4), results_url='https://api.anthropic.com/v1/messages/batches/msgbatch_011FAkvqkL8pEskdyS3xdmNW/results', type='message_batch')
Status: ended

Batch msgbatch_011FAkvqkL8pEskdyS3xdmNW Summary:
Status: ended
Created: 2024-10-08 00:23:58.507550+00:00
Ended: 2024-10-08 00:24:27.768229+00:00
Expires: 2024-10-09 00:23:58.507550+00:00

Individual Results:

Result for simple-question:
Status: succeeded
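Content: Quantum computing is an advanced form of computing that uses the principles of quantum mechanics to process information. Unlike classical computers, which use bits (0 and 1) to store and process data...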

Result for image-analysis:
Status: succeeded
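Content: This image captures a stunning mountain landscape at sunset. The sun is a bright disc just setting behind the distant mountain range, casting a warm golden glow across the scene...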

Result for system-prompt:
Status: succeeded
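Content: Certainly! Here is how I would explain gravity to a 5-year-old:

Gravity is like a giant invisible hug that the Earth gives to everything on it. It keeps us on the ground instead of floating away...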

Result for multi-turn:
Status: succeeded
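Content: DNA replication is the process by which DNA makes a copy of itself during cell division. Here is a basic overview of how it works:

1. Unwinding: The double helix structure of DNA unwinds, and the two strands...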
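
All requests in both examples succeeded, but `process_results` also distinguishes `errored`, `canceled`, and `expired` outcomes. One way to act on them, sketched below, is to collect the original requests whose results were not successful so they can be submitted in a follow-up batch. `requests_to_retry` is a hypothetical helper, and the `SimpleNamespace` objects mimic only the `custom_id` and `result.type` attributes used earlier. Whether a retry makes sense depends on the failure: an invalid request will fail again unchanged.

```python
from types import SimpleNamespace


def requests_to_retry(original_requests, results,
                      retry_types=("errored", "expired")):
    """Return the original requests whose result type is in retry_types."""
    failed_ids = {
        entry.custom_id for entry in results
        if entry.result.type in retry_types
    }
    return [req for req in original_requests if req["custom_id"] in failed_ids]


# Stand-in requests and results for illustration.
submitted = [
    {"custom_id": "simple-question", "params": {}},
    {"custom_id": "image-analysis", "params": {}},
    {"custom_id": "multi-turn", "params": {}},
]
outcomes = [
    SimpleNamespace(custom_id="simple-question", result=SimpleNamespace(type="succeeded")),
    SimpleNamespace(custom_id="image-analysis", result=SimpleNamespace(type="expired")),
    SimpleNamespace(custom_id="multi-turn", result=SimpleNamespace(type="canceled")),
]

retry_batch = requests_to_retry(submitted, outcomes)
print([req["custom_id"] for req in retry_batch])
```

Canceled requests are left out by default because cancellation is usually deliberate; pass a different `retry_types` tuple to change that.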