如何使用函数自动化任务(S3 存储桶示例)

此代码演示了如何与 ChatGPT 函数进行交互,以执行与 Amazon S3 存储桶相关的任务。该笔记本涵盖了 S3 存储桶的关键功能,例如运行简单的列表命令、在所有存储桶中搜索特定文件、将文件上传到存储桶以及从存储桶下载文件。OpenAI Chat API 可理解用户指令,生成自然语言响应,并根据用户输入提取适当的函数调用。

要求: 运行笔记本需要生成具有 S3 存储桶写入权限的 AWS 访问密钥,并将其与 OpenAI 密钥一起存储在本地环境变量文件(.env)中。.env 文件格式:

AWS_ACCESS_KEY_ID=<your-key>
AWS_SECRET_ACCESS_KEY=<your-key>
OPENAI_API_KEY=<your-key>
! pip install openai
! pip install boto3
! pip install tenacity
! pip install python-dotenv
from openai import OpenAI
import json
import boto3
import os
import datetime
from urllib.request import urlretrieve

# 加载环境变量
from dotenv import load_dotenv
load_dotenv()
True

初始化

# NOTE(review): this assigns an attribute on the OpenAI class, while the client
# below is constructed with no explicit key — confirm the client picks up
# OPENAI_API_KEY from the environment and whether this line is still needed.
OpenAI.api_key = os.environ.get("OPENAI_API_KEY")
# Model name used as the default for every chat completion request.
GPT_MODEL = "gpt-3.5-turbo"
# Optional: if loading the environment variables fails, set the AWS values directly with the code below
# os.environ['AWS_ACCESS_KEY_ID'] = ''
# os.environ['AWS_SECRET_ACCESS_KEY'] = ''

# Create the S3 client
s3_client = boto3.client('s3')

# Create the OpenAI client
client = OpenAI()

实用工具

为了将用户的问题或命令与适当的函数关联起来,我们需要向 ChatGPT 提供必要的函数详细信息和预期的参数。

# Tool definitions describing each S3 operation, passed to the GPT model.
# This is a list with one entry per callable; each entry's "name" must match a
# key in the name-to-function dispatch dictionary used to execute the call.
functions = [
    {
        "type": "function",
        "function":{
            "name": "list_buckets",
            "description": "列出所有可用的 S3 存储桶",
            "parameters": {
                "type": "object",
                "properties": {}
            }
        }
    },
    {
        "type": "function",
        "function":{
            "name": "list_objects",
            "description": "列出给定 S3 存储桶中的对象或文件",
            "parameters": {
                "type": "object",
                "properties": {
                    "bucket": {"type": "string", "description": "S3 存储桶的名称"},
                    "prefix": {"type": "string", "description": "S3 存储桶中的文件夹路径"},
                },
                "required": ["bucket"],
            },
        }
    },
    {
        "type": "function",
        "function":{
            "name": "download_file",
            "description": "将特定文件从 S3 存储桶下载到本地分发文件夹。",
            "parameters": {
                "type": "object",
                "properties": {
                    "bucket": {"type": "string", "description": "S3 存储桶的名称"},
                    "key": {"type": "string", "description": "存储桶中文件的路径"},
                    "directory": {"type": "string", "description": "下载文件的本地目标目录,应由用户指定。"},
                },
                "required": ["bucket", "key", "directory"],
            }
        }
    },
    {
        "type": "function",
        "function":{
            "name": "upload_file",
            "description": "将文件上传到 S3 存储桶",
            "parameters": {
                "type": "object",
                "properties": {
                    "source": {"type": "string", "description": "本地源路径或远程 URL"},
                    "bucket": {"type": "string", "description": "S3 存储桶的名称"},
                    "key": {"type": "string", "description": "存储桶中文件的路径"},
                    "is_remote_url": {"type": "boolean", "description": "提供的源是 URL(True)还是本地路径(False)"},
                },
                "required": ["source", "bucket", "key", "is_remote_url"],
            }
        }
    },
    {
        "type": "function",
        "function":{
            "name": "search_s3_objects",
            "description": "在 S3 存储桶中搜索特定文件名",
            "parameters": {
                "type": "object",
                "properties": {
                    "search_name": {"type": "string", "description": "您要搜索的文件名"},
                    "bucket": {"type": "string", "description": "S3 存储桶的名称"},
                    "prefix": {"type": "string", "description": "S3 存储桶中的文件夹路径"},
                    "exact_match": {"type": "boolean", "description": "如果搜索应精确匹配文件名,则将 exact_match 设置为 True。将 exact_match 设置为 False 以比较部分文件名字符串(文件包含)"}
                },
                "required": ["search_name"],
            },
        }
    }
]

创建辅助函数以与 S3 服务进行交互,例如列出存储桶、列出对象、下载和上传文件以及搜索特定文件。

def datetime_converter(obj):
    """JSON ``default`` hook that renders ``datetime.datetime`` values as ISO-8601 text.

    Args:
        obj: The value ``json.dumps`` could not serialize on its own.

    Returns:
        The ``isoformat()`` string of ``obj``.

    Raises:
        TypeError: If ``obj`` is anything other than a ``datetime.datetime``.
    """
    # Guard first: anything that is not a datetime is rejected outright.
    if not isinstance(obj, datetime.datetime):
        raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")
    return obj.isoformat()
def list_buckets():
    """Return the ``Buckets`` entry of the S3 ``list_buckets`` response as a JSON string.

    Datetime values in the response are serialized through ``datetime_converter``.
    """
    buckets = s3_client.list_buckets()['Buckets']
    return json.dumps(buckets, default=datetime_converter)


def list_objects(bucket, prefix=''):
    """Return all objects in ``bucket`` whose key starts with ``prefix``, as a JSON string.

    Args:
        bucket: Name of the S3 bucket to list.
        prefix: Key prefix to restrict the listing to; empty lists the whole bucket.

    Returns:
        A JSON array string of the object records (empty array when nothing
        matches). Datetime fields are serialized through ``datetime_converter``.
    """
    # A single list_objects_v2 call returns at most one page of keys, so walk
    # every page; otherwise large buckets are silently truncated.
    paginator = s3_client.get_paginator('list_objects_v2')
    contents = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Pages for an empty listing carry no 'Contents' entry.
        contents.extend(page.get('Contents', []))
    return json.dumps(contents, default=datetime_converter)

def download_file(bucket, key, directory):
    """Download one S3 object into a local directory and report where it landed.

    Args:
        bucket: Name of the S3 bucket holding the object.
        key: Object key; its final path component becomes the local file name.
        directory: Local destination directory. Created if it does not exist.

    Returns:
        A JSON string with ``status``, ``bucket``, ``key`` and ``destination``.
    """
    filename = os.path.basename(key)

    # Make sure the target directory exists so the download does not fail on a
    # fresh path. An empty string means "current directory" for os.path.join,
    # and os.makedirs('') would raise, so skip it in that case.
    if directory:
        os.makedirs(directory, exist_ok=True)

    # Resolve the full local path of the downloaded file.
    destination = os.path.join(directory, filename)

    s3_client.download_file(bucket, key, destination)
    return json.dumps({"status": "success", "bucket": bucket, "key": key, "destination": destination})

def upload_file(source, bucket, key, is_remote_url=False):
    """Upload a local file, or the content behind a URL, to an S3 bucket.

    Args:
        source: Local file path, or a remote URL when ``is_remote_url`` is true.
        bucket: Name of the destination S3 bucket.
        key: Object key to store the file under.
        is_remote_url: Whether ``source`` is a URL that must be fetched first.

    Returns:
        A JSON string with ``status``, ``source``, ``bucket`` and ``key``. For a
        remote source, ``source`` is the final path component of the URL.
    """
    import tempfile

    if not is_remote_url:
        s3_client.upload_file(source, bucket, key)
        return json.dumps({"status": "success", "source": source, "bucket": bucket, "key": key})

    file_name = os.path.basename(source)
    # Stage the download in a throwaway directory instead of the working
    # directory: this avoids overwriting a same-named local file and removes
    # the staged copy once the upload has finished (or failed).
    with tempfile.TemporaryDirectory() as staging_dir:
        # A URL ending in '/' has an empty basename; fall back to a fixed name
        # for the staged file only.
        staged_path = os.path.join(staging_dir, file_name or "download")
        urlretrieve(source, staged_path)
        s3_client.upload_file(staged_path, bucket, key)

    return json.dumps({"status": "success", "source": file_name, "bucket": bucket, "key": key})

def search_s3_objects(search_name, bucket=None, prefix='', exact_match=True):
    """Search one bucket, or every bucket, for objects matching a file name.

    Matching is case-insensitive.

    Args:
        search_name: File name (or fragment) to look for.
        bucket: Bucket to search; when omitted or empty, all buckets returned by
            ``list_buckets`` are searched.
        prefix: Key prefix that limits the listing within each bucket.
        exact_match: When true, an object matches if its full key or the final
            path component of its key equals ``search_name``. When false, an
            object matches if ``search_name`` occurs anywhere in its key.

    Returns:
        A JSON array string of ``{"Bucket": ..., "Object": ...}`` entries.
    """
    needle = search_name.lower()

    if not bucket:
        buckets = [bucket_info["Name"] for bucket_info in json.loads(list_buckets())]
    else:
        buckets = [bucket]

    def _matches(key):
        lowered = key.lower()
        if not exact_match:
            return needle in lowered
        # Compare against the file name as well as the full key; comparing the
        # full key alone never matches a file stored under a folder prefix.
        return needle == lowered or needle == lowered.rsplit('/', 1)[-1]

    results = []
    for bucket_name in buckets:
        for obj in json.loads(list_objects(bucket_name, prefix)):
            if _matches(obj['Key']):
                results.append({"Bucket": bucket_name, "Object": obj})

    return json.dumps(results)

下面的字典将名称与函数关联起来,以便根据 ChatGPT 的响应进行执行。

# Dispatch table: tool name reported by the model -> local callable to execute.
# Keys are taken from each function's own name, which matches the tool names.
available_functions = {
    handler.__name__: handler
    for handler in (
        list_buckets,
        list_objects,
        download_file,
        upload_file,
        search_s3_objects,
    )
}

ChatGPT

def chat_completion_request(messages, functions=None, function_call='auto',
                            model_name=GPT_MODEL):
    """Send ``messages`` to the chat completions endpoint and return the response.

    Args:
        messages: Conversation so far, in chat-completions message format.
        functions: Optional tool definitions; when given they are sent as
            ``tools`` together with ``function_call`` as ``tool_choice``.
        function_call: Tool-choice setting, only used when ``functions`` is given.
        model_name: Model to query.

    Returns:
        The response object returned by ``client.chat.completions.create``.
    """
    request = {"model": model_name, "messages": messages}

    # Tool settings are only attached when tool definitions were supplied.
    if functions is not None:
        request["tools"] = functions
        request["tool_choice"] = function_call

    return client.chat.completions.create(**request)

对话流程

创建一个主函数供聊天机器人使用,该函数接收用户输入,将其发送到 OpenAI Chat API,接收响应,执行 API 生成的任何函数调用,并将最终响应返回给用户。

def run_conversation(user_input, topic="S3 bucket functions.", is_log=False):
    """Run one user turn: query the model, execute requested tools, return the reply.

    Args:
        user_input: The user's question or command.
        topic: Scope description interpolated into the system message.
        is_log: When true, print the raw choices of the first model response.

    Returns:
        The model's final text. If the model requested tool calls, this is the
        content of a second completion made after the tool results were added
        to the conversation; otherwise it is the content of the first response.

    Raises:
        KeyError: If the model names a function missing from ``available_functions``.
    """
    system_message=f"Don't make assumptions about what values to plug into functions. Ask for clarification if a user request is ambiguous. If the user ask question not related to {topic} response your scope is {topic} only."

    messages = [{"role": "system", "content": system_message},
                {"role": "user", "content": user_input}]

    # First call: let the model answer directly or request tool calls.
    response = chat_completion_request(messages, functions=functions)
    response_message = response.choices[0].message

    if is_log:
        print(response.choices)

    # No tool requested: the first response is the final answer.
    if not response_message.tool_calls:
        return response_message.content

    # The assistant message carrying the tool calls must precede their results.
    messages.append(response_message)

    # Answer every requested tool call, not just the first one: the follow-up
    # request needs a tool message for each tool_call_id in the assistant message.
    for tool_call in response_message.tool_calls:
        function_name = tool_call.function.name
        # Parameterless calls may arrive with an empty arguments string.
        function_args = json.loads(tool_call.function.arguments or "{}")

        function_response = available_functions[function_name](**function_args)

        messages.append({
            "role": "tool",
            "content": function_response,
            "tool_call_id": tool_call.id,
        })

    # Second call: have the model summarize the tool results for the user.
    second_response = chat_completion_request(messages)
    return second_response.choices[0].message.content

S3 存储桶机器人测试

在以下示例中,请确保在执行前将 <file_name>、<bucket_name>、<directory_path> 等占位符替换为您的具体值。

列出和搜索

让我们开始列出所有可用的存储桶。

# Ask the assistant to list the buckets and print its reply.
print(run_conversation('list my S3 buckets'))

您可以要求助手在所有存储桶或特定存储桶中搜索特定文件名。

# Search every bucket for a file name; replace the placeholder before running.
search_file = '<file_name>'
print(run_conversation(f'search for a file {search_file} in all buckets'))
# Search a single bucket for keys containing a partial name; replace both placeholders.
search_word = '<file_name_part>'
bucket_name = '<bucket_name>'
print(run_conversation(f'search for a file contains {search_word} in {bucket_name}'))

如系统消息中所述,当参数值不明确时,模型应请用户澄清其要求。

# Deliberately ambiguous request: no file name or bucket is given.
print(run_conversation('search for a file'))
Sure, to help me find what you're looking for, could you please provide the name of the file you want to search for and the name of the S3 bucket? Also, should the search match the file name exactly, or should it also consider partial matches?

验证边缘情况

我们还指示模型拒绝不相关的任务。让我们试一试,看看它是如何工作的。

# The model should not answer questions that are outside its scope
print(run_conversation('what is the weather today'))
Apologies for the misunderstanding, but I am only able to assist with S3 bucket functions. Can you please ask a question related to S3 bucket functions?

提供的函数不仅限于检索信息。它们还可以帮助用户上传或下载文件。

下载文件

# Download request; replace all three placeholders with real values before running.
search_file = '<file_name>'
bucket_name = '<bucket_name>'
local_directory = '<directory_path>'
print(run_conversation(f'download {search_file} from {bucket_name} bucket to {local_directory} directory'))

上传文件

# Upload request; replace both placeholders with real values before running.
local_file = '<file_name>'
bucket_name = '<bucket_name>'
print(run_conversation(f'upload {local_file} to {bucket_name} bucket'))