使用 PolarDB-PG 作为 OpenAI 嵌入的向量数据库
本笔记本将指导您逐步了解如何使用 PolarDB-PG 作为 OpenAI 嵌入的向量数据库。
本笔记本展示了一个端到端的过程:
- 使用 OpenAI API 创建的预计算嵌入。
- 将嵌入存储在 PolarDB-PG 的云实例中。
- 使用 OpenAI API 将原始文本查询转换为嵌入。
- 使用 PolarDB-PG 在创建的集合中执行最近邻搜索。
什么是 PolarDB-PG
PolarDB-PG 是一个高性能的向量数据库,采用读写分离架构。它是阿里云管理的原生云数据库,100% 兼容 PostgreSQL,并与 Oracle 语法高度兼容。它支持处理海量向量数据存储和查询,并通过优化底层执行算法大大提高了向量计算的效率,为用户提供快速、弹性、高性能、海量存储、安全可靠的向量数据库服务。此外,PolarDB-PG 还支持多维多模态时空信息引擎和地理信息引擎。同时,PolarDB-PG 还具备完整的 OLAP 功能和 SLA 服务,已获得众多用户的认可和使用;
部署选项
- 使用 PolarDB-PG 云向量数据库。单击此处可快速部署。
先决条件
为了完成此练习,我们需要准备几件事:
- PolarDB-PG 云服务器实例。
- 'psycopg2' 库,用于与向量数据库进行交互。任何其他 postgresql 客户端库都可以。
- 一个 OpenAI API 密钥。
我们可以通过运行简单的 curl 命令来验证服务器是否已成功启动:
安装要求
本笔记本显然需要 openai
和 psycopg2
包,但还有一些其他附加库我们将使用。以下命令将它们全部安装:
! pip install openai psycopg2 pandas wget
准备您的 OpenAI API 密钥 OpenAI API 密钥用于文档和查询的向量化。
如果您没有 OpenAI API 密钥,可以从 https://beta.openai.com/account/api-keys 获取。
获得密钥后,请将其添加为环境变量 OPENAI_API_KEY。
如果您对如何通过环境变量设置 API 密钥有任何疑问,请参阅 API 密钥安全最佳实践。
# 测试您的 OpenAI API 密钥是否已正确设置为环境变量
# 注意。如果您在本地运行此笔记本,则需要重新加载终端和笔记本才能使环境变量生效。
if os.getenv("OPENAI_API_KEY") is not None:
print("OPENAI_API_KEY is ready")
else:
print("OPENAI_API_KEY environment variable not found")
OPENAI_API_KEY is ready
连接到 PolarDB
首先将其添加到您的环境变量中。或者,您也可以直接更改下面的 "psycopg2.connect" 参数
使用官方 Python 库连接到正在运行的 PolarDB 服务器实例非常简单:
import os
import psycopg2
# 注意。或者,您可以设置一个临时环境变量,如下所示:
# os.environ["PGHOST"] = "your_host"
# os.environ["PGPORT"] "5432"),
# os.environ["PGDATABASE"] "postgres"),
# os.environ["PGUSER"] "user"),
# os.environ["PGPASSWORD"] "password"),
connection = psycopg2.connect(
host=os.environ.get("PGHOST", "localhost"),
port=os.environ.get("PGPORT", "5432"),
database=os.environ.get("PGDATABASE", "postgres"),
user=os.environ.get("PGUSER", "user"),
password=os.environ.get("PGPASSWORD", "password")
)
# 创建一个新的游标对象
cursor = connection.cursor()
我们可以通过运行任何可用方法来测试连接:
# 执行一个简单的查询来测试连接
cursor.execute("SELECT 1;")
result = cursor.fetchone()
# 检查查询结果
if result == (1,):
print("Connection successful!")
else:
print("Connection failed.")
Connection successful!
import wget
embeddings_url = "https://cdn.openai.com/API/examples/data/vector_database_wikipedia_articles_embedded.zip"
# 文件大约为 700 MB,因此这需要一些时间
wget.download(embeddings_url)
'vector_database_wikipedia_articles_embedded.zip'
下载的文件然后必须解压:
import zipfile
import os
import re
import tempfile
current_directory = os.getcwd()
zip_file_path = os.path.join(current_directory, "vector_database_wikipedia_articles_embedded.zip")
output_directory = os.path.join(current_directory, "../../data")
with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
zip_ref.extractall(output_directory)
# 检查 csv 文件是否存在
file_name = "vector_database_wikipedia_articles_embedded.csv"
data_directory = os.path.join(current_directory, "../../data")
file_path = os.path.join(data_directory, file_name)
if os.path.exists(file_path):
print(f"The file {file_name} exists in the data directory.")
else:
print(f"The file {file_name} does not exist in the data directory.")
The file vector_database_wikipedia_articles_embedded.csv exists in the data directory.
索引数据
PolarDB 将数据存储在 关系 中,其中每个对象至少由一个向量描述。我们的关系将被称为 articles,每个对象将由 title 和 content 向量描述。
我们将首先创建一个关系,并在 title 和 content 上创建一个向量索引,然后用我们预先计算的嵌入填充它。
create_table_sql = '''
CREATE TABLE IF NOT EXISTS public.articles (
id INTEGER NOT NULL,
url TEXT,
title TEXT,
content TEXT,
title_vector vector(1536),
content_vector vector(1536),
vector_id INTEGER
);
ALTER TABLE public.articles ADD PRIMARY KEY (id);
'''
# 用于创建索引的 SQL 语句
create_indexes_sql = '''
CREATE INDEX ON public.articles USING ivfflat (content_vector) WITH (lists = 1000);
CREATE INDEX ON public.articles USING ivfflat (title_vector) WITH (lists = 1000);
'''
# 执行 SQL 语句
cursor.execute(create_table_sql)
cursor.execute(create_indexes_sql)
# 提交更改
connection.commit()
加载数据
在本节中,我们将加载在此会话之前准备好的数据,这样您就不必花费自己的积分重新计算维基百科文章的嵌入。
import io
# 本地 CSV 文件的路径
csv_file_path = '../../data/vector_database_wikipedia_articles_embedded.csv'
# 定义一个生成器函数逐行处理文件
def process_file(file_path):
with open(file_path, 'r') as file:
for line in file:
yield line
# 创建一个 StringIO 对象来存储修改后的行
modified_lines = io.StringIO(''.join(list(process_file(csv_file_path))))
# 为 copy_expert 方法创建 COPY 命令
copy_command = '''
COPY public.articles (id, url, title, content, title_vector, content_vector, vector_id)
FROM STDIN WITH (FORMAT CSV, HEADER true, DELIMITER ',');
'''
# 使用 copy_expert 方法执行 COPY 命令
cursor.copy_expert(copy_command, modified_lines)
# 提交更改
connection.commit()
# 检查集合大小以确保所有点都已存储
count_sql = """select count(*) from public.articles;"""
cursor.execute(count_sql)
result = cursor.fetchone()
print(f"Count:{result[0]}")
Count:25000
搜索数据
将数据放入 Qdrant 后,我们将开始查询集合以查找最接近的向量。我们可以提供一个附加参数 vector_name
来从基于标题的搜索切换到基于内容的搜索。由于预计算的嵌入是使用 text-embedding-3-small
OpenAI 模型创建的,因此我们在搜索时也必须使用它。
def query_polardb(query, collection_name, vector_name="title_vector", top_k=20):
# 使用 OpenAI API 从用户查询创建嵌入向量
embedded_query = openai.Embedding.create(
input=query,
model="text-embedding-3-small",
)["data"][0]["embedding"]
# 将 embedded_query 转换为 PostgreSQL 兼容格式
embedded_query_pg = "[" + ",".join(map(str, embedded_query)) + "]"
# 创建 SQL 查询
query_sql = f"""
SELECT id, url, title, l2_distance({vector_name},'{embedded_query_pg}'::VECTOR(1536)) AS similarity
FROM {collection_name}
ORDER BY {vector_name} <-> '{embedded_query_pg}'::VECTOR(1536)
LIMIT {top_k};
"""
# 执行查询
cursor.execute(query_sql)
results = cursor.fetchall()
return results
import openai
query_results = query_polardb("modern art in Europe", "Articles")
for i, result in enumerate(query_results):
print(f"{i + 1}. {result[2]} (Score: {round(1 - result[3], 3)})")
1. Museum of Modern Art (Score: 0.5)
2. Western Europe (Score: 0.485)
3. Renaissance art (Score: 0.479)
4. Pop art (Score: 0.472)
5. Northern Europe (Score: 0.461)
6. Hellenistic art (Score: 0.457)
7. Modernist literature (Score: 0.447)
8. Art film (Score: 0.44)
9. Central Europe (Score: 0.439)
10. European (Score: 0.437)
11. Art (Score: 0.437)
12. Byzantine art (Score: 0.436)
13. Postmodernism (Score: 0.434)
14. Eastern Europe (Score: 0.433)
15. Europe (Score: 0.433)
16. Cubism (Score: 0.432)
17. Impressionism (Score: 0.432)
18. Bauhaus (Score: 0.431)
19. Surrealism (Score: 0.429)
20. Expressionism (Score: 0.429)
# This time we'll query using content vector
query_results = query_polardb("Famous battles in Scottish history", "Articles", "content_vector")
for i, result in enumerate(query_results):
print(f"{i + 1}. {result[2]} (Score: {round(1 - result[3], 3)})")
1. Battle of Bannockburn (Score: 0.489)
2. Wars of Scottish Independence (Score: 0.474)
3. 1651 (Score: 0.457)
4. First War of Scottish Independence (Score: 0.452)
5. Robert I of Scotland (Score: 0.445)
6. 841 (Score: 0.441)
7. 1716 (Score: 0.441)
8. 1314 (Score: 0.429)
9. 1263 (Score: 0.428)
10. William Wallace (Score: 0.426)
11. Stirling (Score: 0.419)
12. 1306 (Score: 0.419)
13. 1746 (Score: 0.418)
14. 1040s (Score: 0.414)
15. 1106 (Score: 0.412)
16. 1304 (Score: 0.411)
17. David II of Scotland (Score: 0.408)
18. Braveheart (Score: 0.407)
19. 1124 (Score: 0.406)
20. July 27 (Score: 0.405)