GPT 操作 - Snowflake 中间件

简介

本页面为开发者构建特定应用程序的 GPT 操作提供说明和指南。在继续之前,请确保您已熟悉以下信息:

本指南详细介绍了如何将 ChatGPT 与 Snowflake 数据仓库连接,以便将 SQL 查询返回给 ChatGPT 以便与数据分析一起使用。GPT 需要一个与中间件(例如 Azure 函数)交互的操作,以便操作能够正确地将 Snowflake 的响应格式化,以供 Python notebook 环境使用。数据必须作为文件返回,因此中间件函数应将 SQL 响应转换为 CSV/Excel 文件,大小不超过 10MB。

本文档将概述中间件函数 GPT 操作。有关设置中间件函数本身,请参阅GPT 操作库(中间件)- Azure Functions。您可以将此 Snowflake 中间件操作与直接连接到 Snowflake 的操作结合使用,以启用一个 GPT,该 GPT 可以在执行查询之前形成和测试 SQL 查询。

价值 + 示例业务用例

现有的 Snowflake 客户可以利用这些指南从他们的数据仓库中查询数据,并将这些数据加载到数据分析 Python 环境中以获得更深入的见解。这使得 ChatGPT 驱动的分析成为可能,例如可视化数据集、识别模式/异常或识别数据清理的差距。此 GPT 可用于驱动相对较小数据集的业务决策,或通过 AI 探索数据子集以在您在 BI 工具中探索整体数据集时生成假设,从而节省时间和金钱,同时识别以前未见的模式。

应用程序信息

应用程序关键链接

在开始之前,请查看 Snowflake 和 Azure 的这些链接:

Snowflake 操作

Azure 函数

应用程序先决条件

在开始之前,请确保您已在应用程序环境中完成以下步骤:

  • 预配 Snowflake 数据仓库
  • 确保通过 ChatGPT 身份验证到 Snowflake 的用户具有对数据库、模式和表的访问权限以及必要的角色

此外,在 Azure Function App 中创建应用程序之前,您需要一种方法来处理用户身份验证。您需要设置一个 Azure Entra ID 中的 OAuth 应用注册,该注册可以与 Snowflake 外部 OAuth 安全集成相关联。Snowflake 的外部 OAuth 安全集成允许外部系统颁发 Snowflake 可以用来确定访问级别的访问令牌。在这种情况下,该外部令牌提供商是 Azure Entra ID。由于 ChatGPT 将连接到 Azure 而不是 Snowflake,因此 GPT 用户的 OAuth 令牌将由 Azure 与其在 Entra ID 中的用户相关联。因此,您需要一种方法将 Snowflake 中的用户映射到其在 Azure 中的相应用户。

下面列出了 Azure 端和 Snowflake 端的所有必要步骤。

在 Azure Entra ID 中配置 OAuth 资源

我们将设置一个新的应用注册,在 Azure 中配置必要的 Snowflake 范围,并检索将在 Snowflake 和 ChatGPT 中使用的所有 OAuth 配置参数。本节将在 Azure 中进行,以便在下一节中,您将拥有将此应用注册链接到 Snowflake 端进行配置的必要信息。

  1. 导航到Microsoft Azure 门户并进行身份验证。
  2. 导航到 Azure Entra ID(以前称为 Active Directory)。
  3. 在“管理”下单击应用注册
  4. 单击新建注册
  5. 输入 Snowflake GPT OAuth Client 或类似值作为名称
  6. 验证“支持的帐户类型”是否设置为“单个租户”。
  7. 暂时忽略重定向 URI。配置 GPT 时,您将返回到此处。
  8. 单击注册
  9. 在“必备信息”下记下目录(租户)IDTENANT_ID)。您将使用它来生成 AZURE_AD_ISSUERAZURE_AD_JWS_KEY_ENDPOINT
    • AZURE_AD_ISSUERhttps://sts.windows.net/TENANT_ID/
    • AZURE_AD_JWS_KEY_ENDPOINThttps://login.microsoftonline.com/TENANT_ID/discovery/v2.0/keys
  10. 单击“概述”界面中的终结点
  11. 在右侧,记下“OAuth 2.0 授权终结点 (v2)”作为 AZURE_AD_OAUTH_AUTHORIZATION_ENDPOINT 和“OAuth 2.0 令牌终结点 (v2)”作为 AZURE_AD_OAUTH_TOKEN_ENDPOINT
    • 终结点应类似于 https://login.microsoftonline.com/90288a9b-97df-4c6d-b025-95713f21cef9/oauth2/v2.0/authorizationhttps://login.microsoftonline.com/90288a9b-97df-4c6d-b025-95713f21cef9/oauth2/v2.0/token
  12. 在“管理”下单击公开 API
  13. 单击“应用程序 ID URI”旁边的设置链接以设置 Application ID URI
    • Application ID URI 在您的组织目录中必须是唯一的,例如 https://your.company.com/4d2a8c2b-a5f4-4b86-93ca-294185f45f2e。在后续配置步骤中,此值将被称为 <SNOWFLAKE_APPLICATION_ID_URI>
  14. 要将 Snowflake 角色添加为 OAuth 范围,以用于程序化客户端代表用户执行操作的 OAuth 流,请单击添加范围以添加代表 Snowflake 角色的范围。
    • 输入范围,名称为 Snowflake 角色的名称,并带有 session:scope: 前缀。例如,对于 Snowflake Analyst 角色,请输入 session:scope:analyst
    • 选择谁可以同意。
    • 为范围输入显示名称(例如:Account Admin)。
    • 输入范围的说明(例如:可以管理 Snowflake 帐户)。
    • 单击添加范围
    • 将范围保存为 AZURE_AD_SCOPE。它应该是您的 Application ID URI 和您的 Scope name 的串联。
  15. 在“概述”部分,从“应用程序(客户端)ID”字段复制 ClientID。在后续步骤中,这将被称为 OAUTH_CLIENT_ID
  16. 单击证书和密钥,然后单击新建客户端密钥
  17. 添加密钥的描述。
  18. 选择“730 天(24 个月)”。出于测试目的,请选择不会很快过期的密钥。
  19. 单击添加。复制密钥。在后续步骤中,这将被称为 OAUTH_CLIENT_SECRET
  20. 对于将代表用户请求访问令牌的程序化客户端,请按如下方式配置应用程序的委派权限。
    • 单击API 权限
    • 单击添加权限
    • 单击我的 API
    • 单击您在在 Azure AD 中配置 OAuth 资源中创建的Snowflake OAuth 资源
    • 单击委派的权限框。
    • 选中与您希望授予此客户端的范围相关的权限。
    • 单击添加权限
    • 单击授予管理员同意按钮以将权限授予客户端。请注意,出于测试目的,权限配置如下。但是,在生产环境中,不建议以这种方式授予权限。
    • 单击

在 Snowflake 中创建安全集成

在 Azure Entra ID 中完成应用注册后,下一步是通过外部 OAuth 安全集成将该应用注册链接到 Snowflake。安全集成的 external_oauth_audience_list 参数必须与您在配置 Azure Entra ID 时指定的应用程序 ID URI 匹配。

颁发者JWS 密钥端点也将来自上一步收集的值。“用户映射声明”可以设置为 EMAIL_ADDRESSLOGIN_NAME,这是 Microsoft 登录凭据如何映射到 Snowflake 中的用户,以确保 Snowflake 中的权限由颁发给 ChatGPT 的访问令牌强制执行。

CREATE OR REPLACE SECURITY INTEGRATION AZURE_OAUTH_INTEGRATION
  TYPE = EXTERNAL_OAUTH
  ENABLED = TRUE
  EXTERNAL_OAUTH_TYPE = 'AZURE'
  EXTERNAL_OAUTH_ISSUER = '<AZURE_AD_ISSUER>'
  EXTERNAL_OAUTH_JWS_KEYS_URL = '<AZURE_AD_JWS_KEY_ENDPOINT>'
  EXTERNAL_OAUTH_AUDIENCE_LIST = ('<SNOWFLAKE_APPLICATION_ID_URI>')
  EXTERNAL_OAUTH_TOKEN_USER_MAPPING_CLAIM = 'upn'
  EXTERNAL_OAUTH_SNOWFLAKE_USER_MAPPING_ATTRIBUTE = 'EMAIL_ADDRESS';

中间件信息:

请确保您已完成 Azure 环境中的以下步骤:

  • Azure 门户或 VS Code,可访问创建 Azure Function Apps 和 Azure Entra 应用注册
  • 本指南的Azure Function App 部分详细介绍了部署和设计包装 Snowflake 响应以将查询结果作为 CSV 返回给 ChatGPT 所需的函数。Azure Function App 允许您的 GPT 摄取更大的数据集,因为 ChatGPT 可以从文件响应中摄取比应用程序/JSON 负载更多的数据。此外,这些数据集将仅通过格式化为 CSV 文件的响应提供给数据分析(也称为 Code Interpreter)。

Azure Function App

现在我们已经创建了 GPT 并处理了 Azure/Snowflake 身份验证,我们可以创建 Azure Function App 本身来执行 SQL 查询并处理响应格式,使 GPT 能够下载 CSV 格式的结果以供数据分析使用。

请遵循此Azure Cookbook 指南以获取有关部署 Azure Function App 的更多详细信息。下面您将找到要添加到函数中的示例代码。

此代码旨在具有指导性 - 虽然它可以直接运行,但您应该根据您的 GPT 和 IT 设置的具体需求对其进行自定义。

应用程序代码

您需要为您的 Azure Function App 设置以下流程:

  • 从 HTTP 请求中提取令牌并使用它连接到 Snowflake
  • 执行 SQL 查询并将结果写入 CSV
  • 将该 CSV 临时存储在 Blob 存储中*
  • 生成预签名 URL 以安全地访问该 CSV*
  • 响应 openaiFileResponse

*如果使用文件流选项而不是返回文件的URL选项,则可能不需要这些步骤。下面将对此进行更多介绍。

确保您已安装必要的库并将其导入到您的脚本中。除了 Python 标准库之外,此示例脚本还利用了以下库:

import azure.functions as func
from azure.storage.blob import BlobServiceClient, generate_blob_sas, BlobSasPermissions, ContentSettings
import snowflake.connector
import jwt    # pyjwt 用于令牌解码

连接到 Snowflake

要连接到 Snowflake,您需要从授权标头中提取 Azure Entra ID 分配的访问令牌,并在连接到 Snowflake 服务器时使用该令牌。

在此示例中,Snowflake 用户名是电子邮件地址,这简化了从 HTTP 访问令牌中提取的 Entra ID 用户到连接所需的 Snowflake 用户 ID 的映射。如果您的组织不是这种情况,您可以在 Python 应用程序中将电子邮件地址映射到 Snowflake 用户 ID。

我的应用程序是为与单个 Snowflake 帐户(例如 ab12345.eastus2.azure)和仓库接口而构建的。如果您需要访问多个帐户或仓库,可以考虑在 GPT 操作参数中传递这些参数,以便您可以从 HTTP 请求中提取它们。

# 从授权标头中提取令牌
auth_header = req.headers.get('Authorization')
token_type, token = auth_header.split()

try:
    # 从令牌中提取电子邮件地址以用于 Snowflake 用户映射
    # 如果 Snowflake 用户名不是电子邮件,则相应地识别用户名
    decoded_token = jwt.decode(token, options={"verify_signature": False})
    email = decoded_token.get('upn') 

    conn = snowflake.connector.connect(
        user=email, # Snowflake 用户名,例如,我示例中的用户电子邮件
        account=SNOWFLAKE_ACCOUNT, # Snowflake 帐户,例如,ab12345.eastus2.azure
        authenticator="oauth",
        token=token,
        warehouse=SNOWFLAKE_WAREHOUSE # 替换为 Snowflake 仓库
    )
    logging.info("成功连接到 Snowflake。")
except Exception as e:
    logging.error(f"连接到 Snowflake 失败:{e}")

执行查询并保存 CSV

连接到 Snowflake 后,您需要执行查询并将结果存储在 CSV 中。虽然 Snowflake 中的角色应防止任何有害查询的可能性,但您可能希望在应用程序中(如下未包含)像处理任何其他程序化 SQL 查询执行一样对查询进行清理。

# 从请求参数或正文中提取 SQL 查询
sql_query = req.params.get('sql_query')

try:
    # 使用指定的仓库
    cursor = conn.cursor()

    # 执行查询
    cursor.execute(sql_query)
    results = cursor.fetchall()
    column_names = [desc[0] for desc in cursor.description]
    logger.info(f"查询成功执行:{sql_query}")

    # 将结果转换为 CSV
    csv_file_path = write_results_to_csv(results, column_names)
except Exception as e:
    logger.error(f"执行查询或处理数据时出错:{e}")


def write_results_to_csv(results, column_names):
    try:
        # 创建临时文件
        temp_file = tempfile.NamedTemporaryFile(delete=False, mode='w', newline='')
        csv_writer = csv.writer(temp_file)
        csv_writer.writerow(column_names)  # 写入列标题
        csv_writer.writerows(results)      # 写入数据行
        temp_file.close()  # 关闭文件以刷新内容
        return temp_file.name  # 返回文件路径
    except Exception as e:
        logger.error(f"写入 CSV 结果时出错:{e}")

将文件存储在 Blob 存储中

有两种方法可以将文件返回给 ChatGPT 进行处理。您可以将 base64 编码的数据以及 mimeType 和文件名一起在 openaiFileResponse 列表响应中流式传输,或者您可以返回URL 列表。在此解决方案中,我们将重点关注后者。

为此,您需要将 CSV 上传到 Azure Blob 存储,并返回一个预签名 URL,以便在 ChatGPT 中安全地访问该文件。需要注意的是,要在 ChatGPT 中下载 URL,您需要确保该 URL 包含 content_type 和 content_disposition,如下面的示例所示。如果您想检查 URL 是否具有必要的标头,可以从任何终端使用 curl -I <url>

您需要获取 Azure 存储桶的连接字符串,请参阅此处的说明。

def upload_csv_to_azure(file_path, container_name, blob_name, connect_str):
    try:
        # 创建 BlobServiceClient 对象,用于创建容器客户端
        blob_service_client = BlobServiceClient.from_connection_string(connect_str)

        # 使用本地文件名作为 Blob 名称创建 blob 客户端
        blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

        # 上传具有指定内容设置的文件
        with open(file_path, "rb") as data:
            blob_client.upload_blob(data, overwrite=True, content_settings=ContentSettings(
                content_type='text/csv',
                content_disposition=f'attachment; filename="{blob_name}"'
            ))
        logger.info(f"成功将 {file_path} 上传到 {container_name}/{blob_name}")

        # 为 Blob 生成 SAS 令牌
        sas_token = generate_blob_sas(
            account_name=blob_service_client.account_name,
            container_name=container_name,
            blob_name=blob_name,
            account_key=blob_service_client.credential.account_key,
            permission=BlobSasPermissions(read=True),
            expiry=datetime.datetime.utcnow() + datetime.timedelta(hours=1)  # 令牌有效期为 1 小时
        )

        # 使用 SAS 令牌生成预签名 URL
        url = f"https://{blob_service_client.account_name}.blob.core.windows.net/{container_name}/{blob_name}?{sas_token}"
        logger.info(f"生成的预签名 URL:{url}")

        return url
    except Exception as e:
        logger.error(f"上传文件到 Azure Blob 存储时出错:{e}")
        raise

格式化 openaiFileResponse

最后,您需要相应地格式化响应,以指示 ChatGPT 将该响应作为文件或一系列文件进行处理。openaiFileResponse 是一个列表,最多可以包含 10 个 URL(如果使用内联选项,则为 base64 编码)。

# 格式化响应,以便 ChatGPT 将其视为文件
response = {
    'openaiFileResponse': [csv_url]
}
cursor.close()
conn.close()
return func.HttpResponse(
    json.dumps(response), 
    status_code=200
)

此应用程序涉及许多活动部件,因此测试 Azure Function App 非常重要。鉴于请求和响应有时可能比调试所需的更不透明,因此 ChatGPT 可能是一个难以测试的平台。通过 cURL 或 Postman 调用 HTTP 请求,在更受控的环境中进行初始测试,将使您能够更轻松地调试和分类问题。一旦确定在这些工具中按预期返回了响应,您就可以构建 GPT 了。

ChatGPT 步骤

自定义 GPT 说明

创建自定义 GPT 后,请在“说明”面板中使用以下文本作为灵感。有疑问吗?请查看入门示例,了解此步骤的详细信息。

示例说明

让 ChatGPT 理解您的表架构以正确形成 SQL 查询非常重要。有不同的方法可以做到这一点,本说明集代表了最直接的方法。我们正在努力发布适用于不同 Snowflake GPT 版本的其他说明,以允许处理多个不同的表、模式和数据库,甚至动态学习随时间变化的模式。

以下是处理单个模式和表的几条基本说明。此 GPT 已针对单一用例(分析 2013 年 1 月纽约的航班数据)进行了优化,这使得最简单的说明能够提供最可靠的 GPT 性能。

您是编写 SQL 查询以从 Snowflake 获取数据的专家。您帮助用户将提示转换为 SQL 查询。任何关于航班数据的问题都将转换为 Snowflake SQL 查询,该查询将命中表 FLIGHTS.PUBLIC.JAN_2013_NYC。将任何查询传递到“sql_query”参数

表架构包括

ID  NUMBER  每个航班的唯一标识符
YEAR    NUMBER  航班年份
MONTH   NUMBER  航班月份
DAY     NUMBER  航班起飞的月份中的日期
DEP_TIME    NUMBER  航班的实际起飞时间
SCHED_DEP_TIME  NUMBER  航班的计划起飞时间
DEP_DELAY   NUMBER  出发延误(分钟)(负值表示提前出发)
ARR_TIME    NUMBER  航班的实际到达时间
SCHED_ARR_TIME  NUMBER  航班的计划到达时间
ARR_DELAY   NUMBER  到达延误(分钟)(负值表示提前到达)
CARRIER_CODE    TEXT    航空公司的承运人代码
FLIGHT  NUMBER  航班号
TAILNUM TEXT    飞机的尾号
ORIGIN_AIRPORT_CODE TEXT    出发机场代码
DEST_AIRPORT_CODE   TEXT    目的地机场代码
AIR_TIME    NUMBER  航班的空中飞行时间(分钟)
DISTANCE    NUMBER  航班的飞行距离(英里)
HOUR    NUMBER  计划起飞时间的时部分
MINUTE  NUMBER  计划起飞时间的分钟部分
TIME_HOUR   NUMBER  航班起飞的时间(四舍五入到最近的小时)
CARRIER_NAME    TEXT    航空公司承运人的全称
ORIGIN_AIRPORT_NAME TEXT    出发机场的全称
ORIGIN_REGION   TEXT    出发机场的地区代码
ORIGIN_MUNICIPALITY TEXT    出发机场所在的城市
ORIGIN_COORDINATES  TEXT    出发机场的地理坐标
DEST_AIRPORT_NAME   TEXT    目的地机场的全称
DEST_REGION TEXT    目的地机场的地区代码
DEST_MUNICIPALITY   TEXT    目的地机场所在的城市
DEST_COORDINATES    TEXT    目的地机场的地理坐标

当用户询问有关航班的数据时,请执行以下操作:

  1. 使用 executeSQL 操作向 Azure 函数终结点发送 POST 请求
  2. 接收作为操作响应返回的文件。将其显示为电子表格
  3. 对文件进行分析,并提供用户所请求的必要信息

用户希望在代码解释器中询问有关数据的问题,因此请使用代码解释器来获取从数据集中提取的任何数据分析见解。

OpenAPI 架构

创建自定义 GPT 后,请将以下文本复制到“操作”面板中,用您的特定函数详细信息替换占位符值,并根据您在 Azure Function App 中构建的任何其他输入更新参数。

有疑问吗?请查看入门示例,了解此步骤的详细信息。

openapi: 3.1.0
info:
  title: Snowflake GPT API
  description: API to execute SQL queries on Snowflake and get the results as a CSV file URL.
  version: 1.0.0
servers:

  - url: https://<server-name>.azurewebsites.net
    description: Azure Function App server running Snowflake integration application
paths:
  /api/<function_name>?code=<code>:
    post:
      operationId: executeSQL
      summary: Executes a SQL query on Snowflake and returns the result file URL as a CSV.
      requestBody:
        required: true
        content:
          application/json:
            schema:
              type: object
              properties:
                sql_query:
                  type: string
                  description: The SQL query to be executed on Snowflake.
              required:

                - sql_query
      responses:
        '200':
          description: Successfully executed the query.
          content:
            application/json:
              schema:
                type: object
                properties:
                  openaiFileResponse:
                    type: array
                    items:
                      type: string
                      format: uri
                    description: Array of URLs pointing to the result files.
        '401':
          description: Unauthorized. Missing or invalid authentication token.
        '400':
          description: Bad Request. The request was invalid or cannot be otherwise served.
        '500':
          description: Internal Server Error. An error occurred on the server.
components:
  schemas: {} 

常见问题解答和故障排除

  • 返回给 ChatGPT 的文件大小限制为 10MB。如果返回的文件过大,您的请求可能会失败。如果遇到这些限制,请确保在 SQL 命令中包含 LIMIT。
  • 为什么首先需要 Azure Function App? ChatGPT 的数据分析功能(也称为 Code Interpreter)依赖于一个独立于模型上下文窗口的安全 Python 环境。今天必须通过上传文件来传递给数据分析的数据。返回数据的 GPT 操作随后必须将该数据作为 CSV 或其他数据文件类型返回。为了通过 GPT 操作返回文件,响应必须包装在 openaiFileResponse 对象中。这需要自定义代码来正确格式化响应。
  • 我的公司使用不同于 Azure 的云提供商。 要通过 GPT 操作将其他中间件函数连接到 ChatGPT,请参阅其他AWSGCP 中间件食谱。您可以使用此食谱中讨论的概念来指导构建中间件应用程序的注意事项,但将该中间件连接到 Snowflake 可能因云提供商而异。例如,Snowflake 构建了一个专门用于与 Azure Entra ID 链接的外部 OAuth 集成。
  • 如何限制我的 GPT 可以访问的数据集? 限制 ChatGPT 在 Snowflake 中的访问范围可能很重要。有几种方法可以做到这一点:

    • Snowflake 角色可以限制谁可以访问哪些表,并且将由 Azure Entra ID 提供的 GPT 用户的访问令牌强制执行。
    • 在中间件函数中,您可以添加健全性检查以验证访问的表是否已由该应用程序批准。
    • 您可能希望专门为与 ChatGPT 集成生成新的数据库/仓库,该数据库/仓库已清除敏感信息(如 PII)。
  • 架构调用了错误的仓库或数据集: 如果 ChatGPT 调用了错误的仓库或数据库,请考虑更新您的说明,使其更明确地(a)应调用哪个仓库/数据库或(b)要求用户在运行查询之前提供这些确切的详细信息。

您希望我们优先处理哪些集成?我们的集成是否存在错误?在我们的 GitHub 上提交 PR 或 issue,我们将进行查看。