哲学与向量嵌入、OpenAI 和 Cassandra / Astra DB

CQL 版本

在本快速入门教程中,您将学习如何使用 OpenAI 的向量嵌入和 Apache Cassandra®(或等效的 DataStax 通过 CQL 的 Astra DB)作为数据持久化的向量存储来构建一个“哲学名言查找器和生成器”。

本笔记本的基本工作流程概述如下。您将评估并存储大量名言的向量嵌入,利用它们构建一个强大的搜索引擎,然后甚至可以生成新的名言!

本笔记本例举了一些向量搜索的标准用法模式——同时展示了如何轻松开始使用 Cassandra / 通过 CQL 的 Astra DB 的向量功能。

有关使用向量搜索和文本嵌入构建问答系统的背景知识,请参阅这篇出色的实践笔记本:使用嵌入进行问答

选择您的框架

请注意,本笔记本使用 Cassandra 驱动程序 并直接运行 CQL(Cassandra 查询语言)语句,但我们也涵盖了完成相同任务的其他技术选择。请查看此文件夹的 README 以了解其他选项。本笔记本可以作为 Colab 笔记本或常规 Jupyter 笔记本运行。

目录:

  • 设置
  • 获取数据库连接
  • 连接到 OpenAI
  • 将名言加载到向量存储中
  • 用例 1:名言搜索引擎
  • 用例 2:名言生成器
  • (可选)利用向量存储中的分区

工作原理

索引

每条名言都通过 OpenAI 的 Embedding 转换为嵌入向量。这些向量将保存在向量存储中,供以后搜索使用。一些元数据,包括作者姓名和一些其他预先计算的标签,将与向量一起存储,以允许搜索定制。

1_vector_indexing_cql

搜索

要查找与提供的搜索名言相似的名言,后者将被即时转换为嵌入向量,然后该向量将用于查询存储以查找相似向量……即,先前索引的相似名言。搜索可以选择性地受其他元数据约束(“找到一些斯宾诺莎的名言,与这句相似……”)。

2_vector_search_cql

关键点在于,“内容相似的名言”在向量空间中转化为彼此距离接近的向量:因此,向量相似性搜索有效地实现了语义相似性。这是向量嵌入如此强大的关键原因。

下图试图传达这个想法。每条名言一旦被制成向量,就成为空间中的一个点。在这个例子中,它在一个球体上,因为 OpenAI 的嵌入向量(像大多数其他向量一样)被归一化为_单位长度_。哦,而且这个球体实际上不是三维的,而是 1536 维的!

所以,本质上,向量空间中的相似性搜索会返回最接近查询向量的向量:

3_vector_space

设置

安装并导入必要的依赖项:

!pip install --quiet "cassandra-driver>=0.28.0" "openai>=1.0.0" datasets
import os
from uuid import uuid4
from getpass import getpass
from collections import Counter

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

import openai
from datasets import load_dataset

不用太在意下一个单元格,我们需要它来检测 Colab 并让您上传 SCB 文件(见下文):

try:
    from google.colab import files
    IS_COLAB = True
except ModuleNotFoundError:
    IS_COLAB = False

获取数据库连接

创建 Session 对象(连接到您的 Astra DB 实例)需要一些秘密信息。

(注意:在 Google Colab 和本地 Jupyter 上,一些步骤会略有不同,这就是为什么笔记本会检测运行时类型。)

# 您数据库的安全连接包 zip 文件是必需的:
if IS_COLAB:
    print('请上传您的安全连接包 zip 文件: ')
    uploaded = files.upload()
    if uploaded:
        astraBundleFileTitle = list(uploaded.keys())[0]
        ASTRA_DB_SECURE_BUNDLE_PATH = os.path.join(os.getcwd(), astraBundleFileTitle)
    else:
        raise ValueError(
            '没有安全连接包无法继续。请重新运行该单元格。'
        )
else:
    # 您正在运行本地 Jupyter 笔记本:
    ASTRA_DB_SECURE_BUNDLE_PATH = input("请输入您的数据库令牌(“AstraCS:...”字符串)的完整路径: ")

ASTRA_DB_APPLICATION_TOKEN = getpass("请输入您的数据库令牌(“AstraCS:...”字符串): ")
ASTRA_DB_KEYSPACE = input("请输入您的数据库的 Keyspace 名称: ")
请输入您的数据库令牌(“AstraCS:...”字符串)的完整路径:  /path/to/secure-connect-DatabaseName.zip
请输入您的数据库令牌(“AstraCS:...”字符串):  ········
请输入您的数据库的 Keyspace 名称:  my_keyspace

创建数据库连接

这是创建 Astra DB 连接的方法:

(顺便说一句,您也可以使用任何提供向量功能的 Cassandra 集群,只需通过 更改参数 来实例化 Cluster 即可。)

# 不用太在意“正在降级协议...”消息之后的“正在关闭连接”错误,
# 它实际上只是一个警告:连接将顺利工作。
cluster = Cluster(
    cloud={
        "secure_connect_bundle": ASTRA_DB_SECURE_BUNDLE_PATH,
    },
    auth_provider=PlainTextAuthProvider(
        "token",
        ASTRA_DB_APPLICATION_TOKEN,
    ),
)

session = cluster.connect()
keyspace = ASTRA_DB_KEYSPACE

在 CQL 中创建向量表

您需要一个支持向量的表,并配备元数据。称之为“philosophers_cql”。

每行将存储:名言、其向量嵌入、作者姓名以及一组“标签”。您还需要一个主键来确保行的唯一性。

以下是创建表的完整 CQL 命令(请查看 此页面 以了解此及后续语句的 CQL 语法):

create_table_statement = f"""CREATE TABLE IF NOT EXISTS {keyspace}.philosophers_cql (
    quote_id UUID PRIMARY KEY,
    body TEXT,
    embedding_vector VECTOR<FLOAT, 1536>,
    author TEXT,
    tags SET<TEXT>
);"""

将此语句传递给您的数据库会话以执行它:

session.execute(create_table_statement)

添加用于 ANN 搜索的向量索引

为了在表中的向量上运行 ANN(近似最近邻)搜索,您需要为 embedding_vector 列创建一个特定的索引。

创建索引时,您可以选择性地选择 用于计算向量距离的“相似性函数”:由于 OpenAI 的单位长度向量(如大多数其他向量)的“余弦差”与“点积”相同,因此您将使用后者,它在计算上成本较低。

运行此 CQL 语句:

create_vector_index_statement = f"""CREATE CUSTOM INDEX IF NOT EXISTS idx_embedding_vector
    ON {keyspace}.philosophers_cql (embedding_vector)
    USING 'org.apache.cassandra.index.sai.StorageAttachedIndex'
    WITH OPTIONS = {{'similarity_function' : 'dot_product'}};
"""
# 注意:双重 '{{' 和 '}}' 只是 '{' 和 '}' 的 F-string 转义序列

session.execute(create_vector_index_statement)

添加作者和标签过滤索引

这足以在表上运行向量搜索……但您希望能够选择性地指定作者和/或一些标签来限制名言搜索。创建另外两个索引来支持这一点:

create_author_index_statement = f"""CREATE CUSTOM INDEX IF NOT EXISTS idx_author
    ON {keyspace}.philosophers_cql (author)
    USING 'org.apache.cassandra.index.sai.StorageAttachedIndex';
"""
session.execute(create_author_index_statement)

create_tags_index_statement = f"""CREATE CUSTOM INDEX IF NOT EXISTS idx_tags
    ON {keyspace}.philosophers_cql (VALUES(tags))
    USING 'org.apache.cassandra.index.sai.StorageAttachedIndex';
"""
session.execute(create_tags_index_statement)

连接到 OpenAI

设置您的密钥

OPENAI_API_KEY = getpass("请输入您的 OpenAI API 密钥: ")
请输入您的 OpenAI API 密钥:  ········

嵌入的测试调用

快速检查如何获取输入文本列表的嵌入向量:

client = openai.OpenAI(api_key=OPENAI_API_KEY)
embedding_model_name = "text-embedding-3-small"

result = client.embeddings.create(
    input=[
        "This is a sentence",
        "A second sentence"
    ],
    model=embedding_model_name,
)

注意:以上是 OpenAI v1.0+ 的语法。如果使用早期版本,获取嵌入的代码将有所不同。

print(f"len(result.data)              = {len(result.data)}")
print(f"result.data[1].embedding      = {str(result.data[1].embedding)[:55]}...")
print(f"len(result.data[1].embedding) = {len(result.data[1].embedding)}")

将名言加载到向量存储中

获取包含名言的数据集。(我们改编并扩充了来自 此 Kaggle 数据集 的数据,已准备好在此演示中使用。)

philo_dataset = load_dataset("datastax/philosopher-quotes")["train"]

快速检查:

print("一个示例条目:")
print(philo_dataset[16])

检查数据集大小:

author_count = Counter(entry["author"] for entry in philo_dataset)
print(f"总计:{len(philo_dataset)} 条名言。按作者统计:")
for author, count in author_count.most_common():
    print(f"    {author:<20}: {count} 条名言")

将名言插入向量存储

您将为名言计算嵌入,并将它们与文本本身和计划供以后使用的元数据一起保存到向量存储中。

为了优化速度和减少调用次数,您将执行批量调用 OpenAI 嵌入服务。

数据库写入是通过 CQL 语句完成的。但由于您将运行此特定插入多次(尽管使用不同的值),因此最好_准备_该语句,然后一遍又一遍地运行它。

(注意:为了加快插入速度,Cassandra 驱动程序允许您进行并发插入,但在这里我们没有这样做,以使演示代码更简单。)

prepared_insertion = session.prepare(
    f"INSERT INTO {keyspace}.philosophers_cql (quote_id, author, body, embedding_vector, tags) VALUES (?, ?, ?, ?, ?);"
)

BATCH_SIZE = 20

num_batches = ((len(philo_dataset) + BATCH_SIZE - 1) // BATCH_SIZE)

quotes_list = philo_dataset["quote"]
authors_list = philo_dataset["author"]
tags_list = philo_dataset["tags"]

print("开始存储条目:")
for batch_i in range(num_batches):
    b_start = batch_i * BATCH_SIZE
    b_end = (batch_i + 1) * BATCH_SIZE
    # 为此批次计算嵌入向量
    b_emb_results = client.embeddings.create(
        input=quotes_list[b_start : b_end],
        model=embedding_model_name,
    )
    # 准备要插入的行
    print("B ", end="")
    for entry_idx, emb_result in zip(range(b_start, b_end), b_emb_results.data):
        if tags_list[entry_idx]:
            tags = {
                tag
                for tag in tags_list[entry_idx].split(";")
            }
        else:
            tags = set()
        author = authors_list[entry_idx]
        quote = quotes_list[entry_idx]
        quote_id = uuid4()  # 为每条名言生成一个新的随机 ID。在生产应用程序中,您需要更好地控制……
        session.execute(
            prepared_insertion,
            (quote_id, author, quote, emb_result.embedding, tags),
        )
        print("*", end="")
    print(f" 完成 ({len(b_emb_results.data)})")

print("\n已完成存储条目。")

用例 1:名言搜索引擎

对于名言搜索功能,您首先需要将输入名言转换为向量,然后使用它来查询存储(除了处理可选的元数据到搜索调用中)。

将搜索引擎功能封装到一个函数中以便于重用:

def find_quote_and_author(query_quote, n, author=None, tags=None):
    query_vector = client.embeddings.create(
        input=[query_quote],
        model=embedding_model_name,
    ).data[0].embedding
    # 根据传递的条件,语句中的 WHERE 子句可能会有所不同。
    where_clauses = []
    where_values = []
    if author:
        where_clauses += ["author = %s"]
        where_values += [author]
    if tags:
        for tag in tags:
            where_clauses += ["tags CONTAINS %s"]
            where_values += [tag]
    # 这两个列表的原因是,在运行 CQL 搜索语句时,传递的值
    # 必须与语句中的 "?" 标记序列匹配。
    if where_clauses:
        search_statement = f"""SELECT body, author FROM {keyspace}.philosophers_cql
            WHERE {' AND '.join(where_clauses)}
            ORDER BY embedding_vector ANN OF %s
            LIMIT %s;
        """
    else:
        search_statement = f"""SELECT body, author FROM {keyspace}.philosophers_cql
            ORDER BY embedding_vector ANN OF %s
            LIMIT %s;
        """
    # 为了获得最佳性能,您应该缓存准备好的语句(参见上面的插入代码)
    # 用于此处各种可能的语句。
    # (我们将它留给读者作为练习,以避免使此代码过长。
    # 记住:要准备一个语句,请使用 '?' 而不是 '%s'。)
    query_values = tuple(where_values + [query_vector] + [n])
    result_rows = session.execute(search_statement, query_values)
    return [
        (result_row.body, result_row.author)
        for result_row in result_rows
    ]

测试搜索

仅传递名言:

find_quote_and_author("We struggle all our life for nothing", 3)

按作者限制搜索:

find_quote_and_author("We struggle all our life for nothing", 2, author="nietzsche")

按标签限制搜索(从之前与名言一起保存的标签中):

find_quote_and_author("We struggle all our life for nothing", 2, tags=["politics"])

剔除不相关结果

向量相似性搜索通常会返回最接近查询的向量,即使这意味着结果可能有些不相关,如果没有任何更好的结果的话。

为了控制这个问题,您可以获取查询与每个结果之间的实际“相似性”,然后设置一个阈值,从而有效地丢弃超出该阈值的结果。 正确调整此阈值并非易事:在这里,我们只向您展示方法。

为了感受它是如何工作的,请尝试以下查询,并调整名言和阈值的选择以比较结果:

注意(对于有数学头脑的人):此值是向量之间_余弦差(即标量积除以两个向量范数的乘积)的_零到一的重缩放_。换句话说,对于方向相反的向量,此值为 0,对于平行向量,此值为 +1。有关其他相似性度量,请参阅文档——并请记住,为了获得有意义的、有序的结果,SELECT 查询中的度量应与之前创建索引时使用的度量相匹配。_

quote = "Animals are our equals."
# quote = "Be good."
# quote = "This teapot is strange."

similarity_threshold = 0.92

quote_vector = client.embeddings.create(
    input=[quote],
    model=embedding_model_name,
).data[0].embedding

# 再说一遍:请记住在生产环境中准备您的语句以获得更高的性能……

search_statement = f"""SELECT body, similarity_dot_product(embedding_vector, %s) as similarity
    FROM {keyspace}.philosophers_cql
    ORDER BY embedding_vector ANN OF %s
    LIMIT %s;
"""
query_values = (quote_vector, quote_vector, 8)

result_rows = session.execute(search_statement, query_values)
results = [
    (result_row.body, result_row.similarity)
    for result_row in result_rows
    if result_row.similarity >= similarity_threshold
]

print(f"{len(results)} 条名言在阈值内:")
for idx, (r_body, r_similarity) in enumerate(results):
    print(f"    {idx}. [similarity={r_similarity:.3f}] \"{r_body[:70]}...\"")

用例 2:名言生成器

对于此任务,您需要 OpenAI 的另一个组件,即 LLM,来为我们生成名言(基于通过查询向量存储获得的输入)。

您还需要一个模板来填充用于生成名言的 LLM 完成任务的提示。

completion_model_name = "gpt-3.5-turbo"

generation_prompt_template = """"生成一条关于给定主题的简短哲学名言,
其精神和形式要与提供的实际示例名言相似。
您的名言不要超过 20-30 个字。

参考主题:“{topic}”

实际示例:
{examples}
"""

与搜索一样,最好将此功能封装到一个方便的函数中(该函数内部使用搜索):

def generate_quote(topic, n=2, author=None, tags=None):
    quotes = find_quote_and_author(query_quote=topic, n=n, author=author, tags=tags)
    if quotes:
        prompt = generation_prompt_template.format(
            topic=topic,
            examples="\n".join(f"  - {quote[0]}" for quote in quotes),
        )
        # 一点日志记录:
        print("** 找到的名言:")
        for q, a in quotes:
            print(f"**    - {q} ({a})")
        print("** 日志记录结束")
        #
        response = client.chat.completions.create(
            model=completion_model_name,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=320,
        )
        return response.choices[0].message.content.replace('"', '').strip()
    else:
        print("** 未找到名言。")
        return None

注意:与嵌入计算一样,对于 OpenAI 1.0 之前的版本,Chat Completion API 的代码会略有不同。

测试名言生成

仅传递文本(一个“名言”,但实际上可以只建议一个主题,因为它的向量嵌入仍然会放在向量空间中的正确位置):

q_topic = generate_quote("politics and virtue")
print("\n一条新生成的名言:")
print(q_topic)

从一位哲学家的灵感来源:

q_topic = generate_quote("animals", author="schopenhauer")
print("\n一条新生成的名言:")
print(q_topic)

(可选)分区

在完成此快速入门之前,有一个有趣的主题需要探讨。虽然通常情况下,标签和名言可以有任何关系(例如,一条名言有多个标签),但_作者_实际上是一个精确的分组(它们定义了名言集上的“不相交分区”):每条名言只有一个作者(至少对我们来说是这样)。

现在,假设您预先知道您的应用程序通常(或总是)会针对_单个作者_运行查询。那么您可以充分利用底层数据库结构:如果您将名言分组到分区中(每个作者一个分区),那么针对单个作者的向量查询将消耗更少的资源并返回得更快。

我们在这里不深入细节,这些细节与 Cassandra 存储的内部结构有关:重要的信息是如果您的查询是在一个组内运行的,请考虑相应地进行分区以提高性能

您现在将看到这个选择在实践中的应用。

按作者进行分区需要新的表架构:创建一个名为“philosophers_cql_partitioned”的新表以及必要的索引:

create_table_p_statement = f"""CREATE TABLE IF NOT EXISTS {keyspace}.philosophers_cql_partitioned (
    author TEXT,
    quote_id UUID,
    body TEXT,
    embedding_vector VECTOR<FLOAT, 1536>,
    tags SET<TEXT>,
    PRIMARY KEY ( (author), quote_id )
) WITH CLUSTERING ORDER BY (quote_id ASC);"""

session.execute(create_table_p_statement)

create_vector_index_p_statement = f"""CREATE CUSTOM INDEX IF NOT EXISTS idx_embedding_vector_p
    ON {keyspace}.philosophers_cql_partitioned (embedding_vector)
    USING 'org.apache.cassandra.index.sai.StorageAttachedIndex'
    WITH OPTIONS = {{'similarity_function' : 'dot_product'}};
"""

session.execute(create_vector_index_p_statement)

create_tags_index_p_statement = f"""CREATE CUSTOM INDEX IF NOT EXISTS idx_tags_p
    ON {keyspace}.philosophers_cql_partitioned (VALUES(tags))
    USING 'org.apache.cassandra.index.sai.StorageAttachedIndex';
"""
session.execute(create_tags_index_p_statement)

现在,在表上重复计算嵌入并插入的步骤。

您可以使用与之前相同的插入代码,因为差异隐藏在“幕后”:数据库将根据此新表的 असतात方案以不同的方式存储插入的行。

但是,为了演示,您将利用 Cassandra 驱动程序提供的一个便捷功能,可以轻松地并发运行多个查询(在本例中为 INSERT)。这是 Cassandra / 通过 CQL 的 Astra DB 非常支持的一项功能,并且可以带来显著的加速,而客户端代码的更改却很少。

(注意:可以另外缓存先前计算的嵌入以节省一些 API 令牌——但在这里,我们希望代码更容易检查。)

from cassandra.concurrent import execute_concurrent_with_args
prepared_insertion = session.prepare(
    f"INSERT INTO {keyspace}.philosophers_cql_partitioned (quote_id, author, body, embedding_vector, tags) VALUES (?, ?, ?, ?, ?);"
)

BATCH_SIZE = 50

num_batches = ((len(philo_dataset) + BATCH_SIZE - 1) // BATCH_SIZE)

quotes_list = philo_dataset["quote"]
authors_list = philo_dataset["author"]
tags_list = philo_dataset["tags"]

print("开始存储条目:")
for batch_i in range(num_batches):
    print("[...", end="")
    b_start = batch_i * BATCH_SIZE
    b_end = (batch_i + 1) * BATCH_SIZE
    # 为此批次计算嵌入向量
    b_emb_results = client.embeddings.create(
        input=quotes_list[b_start : b_end],
        model=embedding_model_name,
    )
    # 准备此批次的插入条目
    tuples_to_insert = []
    for entry_idx, emb_result in zip(range(b_start, b_end), b_emb_results.data):
        if tags_list[entry_idx]:
            tags = {
                tag
                for tag in tags_list[entry_idx].split(";")
            }
        else:
            tags = set()
        author = authors_list[entry_idx]
        quote = quotes_list[entry_idx]
        quote_id = uuid4()  # 为每条名言生成一个新的随机 ID。在生产应用程序中,您需要更好地控制……
        # 将一个*元组*附加到列表中,并在元组中,值按顺序排列以匹配准备好的语句中的“?”:
        tuples_to_insert.append((quote_id, author, quote, emb_result.embedding, tags))
    # 通过驱动程序的并发原语一次性插入批次
    conc_results = execute_concurrent_with_args(
        session,
        prepared_insertion,
        tuples_to_insert,
    )
    # 检查所有插入是否成功(最好始终这样做):
    if any([not success for success, _ in conc_results]):
        print("插入过程中出现问题!")
    else:
        print(f"{len(b_emb_results.data)}] ", end="")

print("\n已完成存储条目。")

尽管表架构不同,但向量搜索背后的数据库查询基本相同:

def find_quote_and_author_p(query_quote, n, author=None, tags=None):
    query_vector = client.embeddings.create(
        input=[query_quote],
        model=embedding_model_name,
    ).data[0].embedding
    # 根据传递的条件,语句中的 WHERE 子句可能会有所不同。
    # 相应地构建它:
    where_clauses = []
    where_values = []
    if author:
        where_clauses += ["author = %s"]
        where_values += [author]
    if tags:
        for tag in tags:
            where_clauses += ["tags CONTAINS %s"]
            where_values += [tag]
    if where_clauses:
        search_statement = f"""SELECT body, author FROM {keyspace}.philosophers_cql_partitioned
            WHERE {' AND '.join(where_clauses)}
            ORDER BY embedding_vector ANN OF %s
            LIMIT %s;
        """
    else:
        search_statement = f"""SELECT body, author FROM {keyspace}.philosophers_cql_partitioned
            ORDER BY embedding_vector ANN OF %s
            LIMIT %s;
        """
    query_values = tuple(where_values + [query_vector] + [n])
    result_rows = session.execute(search_statement, query_values)
    return [
        (result_row.body, result_row.author)
        for result_row in result_rows
    ]

这样就可以了:新表仍然支持“通用”相似性搜索……

find_quote_and_author_p("We struggle all our life for nothing", 3)

……但当指定作者时,您会注意到巨大的性能优势:

find_quote_and_author_p("We struggle all our life for nothing", 2, author="nietzsche")

嗯,如果您有一个真实规模的数据集,您确实会注意到性能上的提升:在这个演示中,只有几十个条目,没有明显的区别——但您明白了。

结论

恭喜!您已经学会了如何使用 OpenAI 进行向量嵌入,以及使用 Astra DB / Cassandra 进行存储,从而构建一个复杂的哲学搜索引擎和名言生成器。

本示例使用了 Cassandra 驱动程序 并直接运行 CQL(Cassandra 查询语言)语句来与向量存储进行交互——但这并非唯一的选择。请查看 README 以了解其他选项以及与流行框架的集成。

要了解有关 Astra DB 的向量搜索功能如何成为您机器学习/生成式人工智能应用程序的关键要素的更多信息,请访问 Astra DB 关于该主题的网页。

清理

如果您想删除为此演示使用的所有资源,请运行此单元格(警告:这将删除表及其插入的数据!):

session.execute(f"DROP TABLE IF EXISTS {keyspace}.philosophers_cql;")
session.execute(f"DROP TABLE IF EXISTS {keyspace}.philosophers_cql_partitioned;")