翻译结果:

自我管理的 LLM 记忆

目录

简介

有效管理记忆是构建处理长期任务的代理和代理工作流的关键部分。在本指南中,我们将演示几种“自我管理”(LLM 管理)记忆的策略。请将此笔记本作为您自己记忆实现的起点。我们不认为记忆工具是万能的,并且相信不同的领域/任务必然会倾向于或多或少严格的记忆脚手架。Claude 4 模型系列在利用记忆工具方面已被证明特别强大,我们很期待看到团队如何扩展以下想法。

为什么我们需要管理记忆?

LLM 具有有限的上下文窗口(Claude 4 Sonnet 和 Opus 为 200k 个 token)。这意味着对于任何请求,如果提示 token 和输出 token 的总和超过了模型的上下文窗口,系统将返回验证错误。正如许多使用 LLM 的团队很快了解到,识别并使用 LLM 的有效 上下文窗口存在额外的复杂性。请参阅我们关于长上下文提示的技巧,了解有关有效上下文窗口和最佳实践的更多信息。

除了上述原因,记忆也很重要,原因如下:

  • 长上下文窗口计算成本高昂:注意力机制的扩展是二次方的——上下文长度加倍,计算成本增加四倍。大多数任务只需要可用上下文的一小部分,处理数百万个不相关的 token 是浪费的。这就是为什么人类不会记住整本教科书;我们会做笔记并建立心智模型。
  • 更高效的处理:当 LLM 编写和维护自己的笔记时——保存成功的策略、关键见解和相关上下文——它们实际上是在实时更新其能力,而无需重新训练。在这些操作方面表现出色的模型可以在极长的时限内保持连贯的行为,同时只使用完全上下文窗口所需计算资源的一小部分。

成功构建基于 LLM 的系统是在丢弃不必要的 token 并有效地存储 + 检索与当前任务相关的 token 方面的实践。

入门

# 安装依赖
%pip install -q -U anthropic python-dotenv nest_asyncio PyPDF2
# 环境设置
from anthropic import Anthropic
from dotenv import load_dotenv
import os

# API 密钥必须在项目中的 .env 文件中
load_dotenv()
if os.getenv("ANTHROPIC_API_KEY") is None:
    raise ValueError("ANTHROPIC_API_KEY 未在 .env 文件中找到")

client = Anthropic()

克隆代理快速入门实现

我们将使用代理快速入门实现中的一些核心工作,可以在此处找到。

import sys
import os

# 检查仓库是否已存在
if not os.path.exists('/tmp/anthropic-quickstarts'):
    # 克隆代理快速入门实现
    !git clone https://github.com/anthropics/anthropic-quickstarts.git /tmp/anthropic-quickstarts
else:
    print("Repository already exists at /tmp/anthropic-quickstarts")

# 重要提示:插入到 sys.path 的开头,以覆盖任何现有的 'agents' 模块
if '/tmp/anthropic-quickstarts' not in sys.path:
    sys.path.insert(0, '/tmp/anthropic-quickstarts')

# 清除 'agents' 模块的任何缓存导入
if 'agents' in sys.modules:
    del sys.modules['agents']
if 'agents.agent' in sys.modules:
    del sys.modules['agents.agent']
Repository already exists at /tmp/anthropic-quickstarts

确认代理仓库导入按预期工作。

import nest_asyncio
nest_asyncio.apply()

from agents.agent import Agent

agent = Agent(
    name="MyAgent",
    system="You are an extremely cynical, snarky, and quick-witted customer support agent. Provide short responses to user queries.",
)

response = agent.run("I'm having issues with my laptop. Can you help me?")
print(response.content[0].text)
*翻白眼* 又一个笔记本电脑危机。它在做什么?唱走调的歌?除非你喂它饼干否则不工作?请提供细节。

实现 1:基本记忆工具

此实现是我们代理快速入门仓库此处的反映。有关工具使用的更多信息,请参阅 Anthropic API 工具文档

SimpleMemory() 工具为模型提供了一个用于管理记忆的便笺簿。这被维护为一个单一的字符串,可以读取或更新。

这里我们定义了 readwriteedit 操作。显式定义 read 意味着模型在每次转弯时都无法访问记忆的全部内容。我们建议如果您遵循此模式,请引入一个单独的、缩短的摘要或元数据对象来描述记忆的内容,并将其包含在每个请求中(最好是防止过度读取)。

何时使用此工具?

您想快速启动记忆实验或增强现有的长上下文任务。如果您对需要存储的项目类型没有明确的认识,或者代理必须支持多种交互类型,请从这里开始。

关于工具使用的通用说明:

  • 您的工具描述应清晰且足够详细。指导模型围绕工具行为的最佳方法是提供有关何时/在何种条件下使用工具的指导。
  • 如果您发现一项任务需要代理或工作流管理许多(约 20 多个)工具,您可能会通过引入更高级别的委派步骤,将任务路由到一个围绕较小、逻辑耦合的工具子集的专用 LLM 步骤,从而获得更好的性能。
# 基本记忆工具
from agents.tools.base import Tool

class SimpleMemory(Tool):
    """用于存储和修改持久文本的基于字符串的记忆工具。

    此工具维护一个单一的内存字符串,可以使用字符串替换来读取、替换或选择性地编辑。它在覆盖内容时或在编辑操作会影响多个匹配项时提供安全警告。
    """

    name = "simple_memory"

    # TODO:提供额外的领域上下文以指导 Claude 应该存储哪些类型的项目
    description = """用于管理具有读取、写入和编辑操作的持久文本记忆的工具。
        读取:检索完整的记忆内容作为字符串
        写入:替换整个记忆(覆盖现有数据时会发出警告)
        编辑:执行有针对性的字符串替换(多次匹配时会发出警告)"""

    # 暴露 3 种不同能力的单一工具
    input_schema = {
        "type": "object",
        "properties": {
            "action": {
                "type": "string",
                "enum": ["read", "write", "edit"],
                "description": "要执行的记忆操作:read 检索当前内容,write 替换所有内容,edit 执行字符串替换",
            },
            "content": {
                "type": "string",
                "description": "使用 write 操作时要存储的完整文本内容(read/edit 操作忽略)",
            },
            "old_string": {
                "type": "string",
                "description": "使用 edit 操作时要查找和替换的确切文本(必须在记忆中是唯一的)",
            },
            "new_string": {
                "type": "string",
                "description": "使用 edit 操作时要插入的替换文本",
            },
        },
        "required": ["action"],
    }

    def __init__(self):
        self.full_memory = ""
        self.compressed_memory = "" # 目前不做任何处理

    async def execute(self, **kwargs) -> str:
        """使用提供的参数执行记忆工具。"""
        action = kwargs.get("action")
        content = kwargs.get("content", "")
        old_string = kwargs.get("old_string", "")
        new_string = kwargs.get("new_string", "")

        if action == "read":
            return self._read_memory()
        elif action == "write":
            print("Writing to memory...")
            return self._write_memory(content)
        elif action == "edit":
            return self._edit_memory(old_string, new_string)
        else:
            return f"Error: Unknown action '{action}'. Valid actions are read, write, edit."

    def _read_memory(self) -> str:
        """读取当前记忆内容。"""
        return self.full_memory

    def _write_memory(self, content: str) -> str:
        """用新内容替换整个记忆。"""
        if self.full_memory:
            previous = self.full_memory
            self.full_memory = content
            return f"Warning: Overwriting existing content. Previous content was:\n{previous}\n\nMemory has been updated successfully."
        self.full_memory = content
        return "Memory updated successfully."

    def _edit_memory(self, old_string: str, new_string: str) -> str:
        """将旧字符串的出现替换为新字符串。"""
        if old_string not in self.full_memory:
            return f"Error: '{old_string}' not found in memory."

        old_memory = self.full_memory
        count = old_memory.count(old_string)

        if count > 1:
            return f"Warning: Found {count} occurrences of '{old_string}'. Please confirm which occurrence to replace or use more specific context."

        self.full_memory = self.full_memory.replace(old_string, new_string)
        return f"Edited memory: 1 occurrence replaced."

    def __str__(self) -> str:
        return self.full_memory

实现 2:紧凑记忆

在长交互过程中维护滚动摘要是您可能已集成到应用程序中的一种模式。通常,实现如下所示:

1) 设置一个 token_threshold。此阈值可以是模型的上下文窗口,但通常会设置得更低。 2) 跟踪当前 token 使用情况:system_prompt + rolling_summary(直到 step_n)+ message_history[](自 step_n 起) 3) 当 token 使用量超过阈值时,使用当前的 rolling_summary + message_history[] 进行摘要。清除 message_history[] 并重置 rolling_summary

我们相信上述模式效果很好。此工具引入的修改是允许模型自行决定调用摘要操作。您可以决定合并这些想法,允许模型确定何时进行摘要,但保留 token_threshold + 强制摘要作为备用方案,以防 Claude 没有及时进行内存压缩。

何时使用此工具?

与第一个实现类似,当您不清楚应该保存什么时,请测试此工具。从行为上讲,围绕何时压缩长期运行的对话的决策比第一个记忆工具的开放性更可靠。

# 紧凑记忆工具
from agents.utils.history_util import MessageHistory

class CompactifyMemory(Tool):
    """记忆摘要工具。

    摘要并替换现有的消息历史记录。
    期望能够访问与请求处理程序共享的消息历史记录对象。
    应修改描述以引入特定用例的指导。
    """

    name = "compactify_memory"
    description = """记忆紧凑器工具将压缩当前对话历史记录(完全替换消息历史记录)。 
    当有足够的信息需要摘要时应使用此工具。
    摘要应保留先前摘要中的相关信息。
    """

    input_schema = {
        "type": "object",
        "properties": {},
        "required": []
    }

    def __init__(self, client: Anthropic):
        self.client = client
        self.full_memory = ''
        self.compressed_memory = '' # 目前不做任何处理

    def run_compactify (self, message_history: MessageHistory):
        summary = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens = 10000, # 根据需要修改
            messages=[*message_history.messages, {
                "role": "user",
                "content": """Your task is to summarize the conversation using the previous summary as well as the messages since the last summary. Note that this will replace the previous summary entirely, so be sure to include the most relevant information that should be persisted."""
            }]
        )

        # 修改消息历史记录对象
        message_history.messages = [
            {
                "role": "assistant",
                "content": "Conversation Summary: " +  summary.content[0].text
            }
        ]

    async def execute(self, **kwargs) -> str:
        # 注意:我们在这里打破了工具的封装,将在代理循环之外执行函数(参见 agents.agent.py)
        # 我们这样做是因为我们还没有一种优雅的方式在代理和工具之间共享消息状态(...敬请期待)
        return "pending_compactify"

    def __str__(self):
        return self.full_memory

实现 3:“基于文件的”记忆

此实现使 Claude 能够与一个“记忆”系统进行交互,该系统在模型中表示为分层文件结构。下面的示例实现了一个基本目录,其中“文件”只是我们标记为纯文本文件的字符串(.txt 标签在功能上没有影响,但对于行为一致性可能很有用)。

分层目录结构易于人类和 LLM 阅读和理解,因此适合将它们用作更广泛地向 LLM 表示持久状态的机制。虽然您可以连接并定义任何外部存储系统的访问模式,但一个快速入门的方法是使用 Anthropic 新的文件 API。文件 API 支持对象存储和检索,供将来请求使用。

理想情况下,您(开发人员和领域专家)将构建一个初始目录结构状态,该状态能够充分表示您的领域上下文。拥有一些预定义的结构可以为模型提供有用的行为线索,但您也应该引入更明确的指导,以防止过多的读取/写入/新文件创建等。

import json
import re

# 辅助函数:解析 JSON 的 Markdown 字符串
def parse_markdown_json(markdown_string):
    """
    从 Markdown 字符串解析 JSON 字符串。

    参数:
        markdown_string (str):包含 JSON 的 Markdown 字符串。

    返回:
        dict 或 list 或 None:表示已解析 JSON 的 Python 对象,如果解析失败则为 None。
    """
    match = re.search(r"```(?:json)?\n(.*?)\n```", markdown_string, re.DOTALL)
    if match:
        json_string = match.group(1).strip()
    else:
        json_string = markdown_string.strip()
    try:
        parsed_json = json.loads(json_string)
        return parsed_json
    except json.JSONDecodeError:
        return None

# 辅助类:Memory Node
class MemoryNode:
    def __init__(self, name, is_directory=False, parent=None, content=None):
        self.name = name
        self.is_directory = is_directory
        self.parent = parent
        self.content = content if not is_directory else None
        self.children = {} if is_directory else None

    def add_child(self, name, is_directory=False, content=None):
        """向当前节点添加子节点。"""
        if not self.is_directory:
            raise ValueError(f"Cannot add child to file '{self.name}'")

        if name in self.children:
            raise ValueError(f"Child '{name}' already exists")

        child = MemoryNode(name, is_directory, parent=self, content=content)
        self.children[name] = child
        return child

    def remove_child(self, name):
        """从当前节点删除子节点。"""
        if not self.is_directory:
            raise ValueError(f"Cannot remove child from file '{self.name}'")

        if name not in self.children:
            raise ValueError(f"Child '{name}' not found")

        del self.children[name]

    def find(self, path):
        """按路径查找节点(例如:'folder1/folder2/file.txt')。"""
        if not path:
            return self

        parts = path.strip('/').split('/', 1)
        child_name = parts[0]

        if not self.is_directory or child_name not in self.children:
            return None

        child = self.children[child_name]

        if len(parts) == 1:
            return child
        else:
            return child.find(parts[1])

    def __repr__(self):
        return f"MemoryNode(name='{self.name}', is_directory={self.is_directory})"

# 辅助类:Memory Tree
class MemoryTree:
    def __init__(self):
        self.root = MemoryNode("memory", is_directory=True)

    def add(self, path, content):
        """将内容添加到给定路径的节点(例如:'folder1/folder2/file.txt')。"""
        node = self.root.find(path)
        if node:
            node.content = content
        else:
            raise ValueError(f"Path '{path}' not found")

    def get(self, path):
        """从给定路径的节点获取内容。"""
        node = self.root.find(path)
        if node:
            return node.content
        else:
            raise ValueError(f"Path '{path}' not found")

    def edit(self, path, content):
        node = self.root.find(path)
        if node:
            node.content = content
        else:
            raise ValueError(f"Path '{path}' not found")

    def _build_from_json_recursive(self, json_obj, parent_node):
        """从 JSON 对象递归构建树。"""

        # 处理根记忆(已初始化)
        if len(json_obj) == 1 and 'memory' in json_obj:
            json_obj = json_obj['memory']

        for name, value in json_obj.items():
            if isinstance(value, dict):
                # 创建目录节点
                child_node = parent_node.add_child(name, is_directory=True)
                self._build_from_json_recursive(value, child_node)
            else:
                # 创建带内容的节点
                parent_node.add_child(name, content=value)

    def build_from_json_string(self, str_json_obj):
        json_obj = parse_markdown_json(str_json_obj)
        self._build_from_json_recursive(json_obj, self.root)

    def print_tree(self, node=None, prefix=''):
        """以目录树结构打印。"""
        if node is None:
            node = self.root

        # 构建子节点列表以进行正确索引
        children = list(node.children.items()) if node.is_directory else []

        for index, (name, child) in enumerate(children):
            is_last = index == len(children) - 1

            # 创建适当的连接符
            if prefix == '' and node == self.root:
                # 对于根级别项(根的直接子项)
                connector = '└── ' if is_last else '├── '
                self.lines.append(f"{connector}{name}")

                # 如果是目录则递归
                if child.is_directory:
                    extension = '    ' if is_last else '│   '
                    self.print_tree(child, extension)
            else:
                # 对于非根级别项
                connector = '└── ' if is_last else '├── '
                self.lines.append(f"{prefix}{connector}{name}")

                # 如果是目录则递归
                if child.is_directory:
                    extension = '    ' if is_last else '│   '
                    self.print_tree(child, prefix + extension)

    def get_tree(self):
        """将树作为字符串返回。"""
        self.lines = []

        # 从根目录名称开始
        self.lines.append(self.root.name)

        # 打印树的其余部分
        self.print_tree()
        return '\n'.join(self.lines)

    def __str__(self):
        return self.get_tree()

    def __repr__(self):
        return str(self)
import requests
import mimetypes

# 文件存储的辅助类,使用新的文件 API!
class StorageManager:
    def __init__(self, api_key):
        if api_key is None:
            raise ValueError("ANTHROPIC_API_KEY 不可用。")
        self.api_key = api_key
        self.base_url = "https://api.anthropic.com/v1/files"
        self.headers = {
            "x-api-key": self.api_key,
            "anthropic-version": "2023-06-01",
            "anthropic-beta": "files-api-2025-04-14"
        }

    def _execute_request(self, method, endpoint, data=None, files=None):
        """执行 API 请求。"""
        url = f"{self.base_url}/{endpoint}"

        res = requests.request(method, url, headers=self.headers, data=data, files=files)
        if res.status_code == 200:
            return res.json()
        else:
            raise ValueError(f"Request failed: {res.status_code} - {res.text}")

    def list_files(self):
        """列出所有文件。直接 curl 请求到 API。"""
        res = requests.get(
            self.base_url,
            headers=self.headers
        )
        if res.status_code != 200:
            raise ValueError(f"Failed to retrieve files: {res.status_code} - {res.text}")
        res = res.json()
        return res['data']


    def get_file_metadata(self, file_id):
        """按 ID 获取文件。直接 curl 请求到 API。"""
        res = requests.get(
            f"{self.base_url}/{file_id}",
            headers=self.headers
        )
        if res.status_code != 200:
            raise ValueError(f"Failed to retrieve file: {res.status_code} - {res.text}")
        res = res.json()
        return res 

    def upload_file(self, file_path):
        """将文件上传到 API。"""        
        # 确定文件的 MIME 类型
        mime_type, _ = mimetypes.guess_type(file_path)
        if mime_type is None:
            mime_type = "application/octet-stream"  # 如果类型未知,则回退到二进制

        with open(file_path, "rb") as file_obj:
            files = {
                "file": (os.path.basename(file_path), file_obj, mime_type)
            }

            res = requests.post(
                self.base_url,
                headers=self.headers,
                files=files
            )

        if res.status_code == 200:
            return res.json()
        else:
            raise ValueError(f"Failed to upload file: {res.status_code} - {res.text}")

# 示例用法
#file_path = "/Users/user/Downloads/SB1029-ProjectUpdate-FINAL_020317-A11Y.pdf" # 替换
storage_manager = StorageManager(os.getenv("ANTHROPIC_API_KEY"))
#uploaded = storage_manager.upload_file(file_path)
#storage_manager.get_file_metadata(uploaded['id'])
storage_manager.list_files()[:2]
[{'type': 'file',
  'id': 'file_011CPaGpXxdBojQLTszA5LGp',
  'size_bytes': 544347,
  'created_at': '2025-05-28T16:51:06.716000Z',
  'filename': 'sample.pdf',
  'mime_type': 'application/pdf',
  'downloadable': False},
 {'type': 'file',
  'id': 'file_011CPYNG2Sf1cWjuCFhKJFV7',
  'size_bytes': 3,
  'created_at': '2025-05-27T16:41:15.335000Z',
  'filename': 'number.txt',
  'mime_type': 'text/plain',
  'downloadable': True}]

实际应用中是什么样的?

想象一下您想构建一个公司范围的聊天机器人,它需要访问有关进行中的项目、团队、客户等的信息。您可以构建一个检索管道,将公司文档分块、加载并刷新到向量数据库中,但调整此管道并非易事。构建基于文件的记忆脚手架来解决此问题的一个优点是,您可以像处理 LLM 管理的文件一样处理组织管理的文件(只是具有不同的读/写权限)。

想象一下,代理在每个回合都有权访问以下目录,并可以自行决定读取和更新这些对象。

claude_memories/
├── user_session_notes/
│   ├── cli_debuggin_session_2025_05_02.txt
│   ├── quarterly_planning_2025_05_01.txt
│   └── data_analysis_2025_05_01.txt
├── general_preferences/
│   ├── code_style.txt
│   └── all_preferences.txt
files/
├── projects/
│   ├── building_agi.txt
│   └── prompt_optimization.txt
├── documents/
│   ├── updated_risk_report.txt
│   ├── company_strategy.txt
│   └── 2024_annual_report.txt
├── teams/
│   ├── engineering.txt
│   └── marketing.txt
├── customers/
│   ├── acme.txt
│   └── widgets.txt

一个功能齐全的实现可能强制执行以下操作:

  • claude_memories/ 目录(LLM 管理)允许读取写入操作
  • user_session_notes用户存储和加载
  • files/ 目录(组织管理)是只读的,并连接到外部存储系统
  • 随着目录增长超过一定大小,您可能希望将遍历限制在深度n,然后允许模型仅在需要时调用更深的遍历

理论上,第 1 部分中提供的基本记忆工具可以表示为只有一个可用路径的文件系统。

# 示例用法
company_agent_memory = MemoryTree()

# LLM 可能生成的对象类型示例(如果您想让 LLM 构建自己的记忆结构)
example_str = """
```json
{"self_managed": {"user_session_notes":{"ongoing_projects.txt":"I should remember that the user is working on prompt optimization","preferences.txt":"I should remember that the user prefers to be called Jimbo"},"projects":{"building_agi.txt":"I should remember that the user is working on building AGI"}}, "files": {"projects":"building_agi.txt"}}

"""

company_agent_memory.build_from_json_string(example_str) company_agent_memory

测试下面的文件实用程序

print(company_agent_memory)

print("GET:", company_agent_memory.get('self_managed/user_session_notes/ongoing_projects.txt'))

company_agent_memory.edit('self_managed/user_session_notes/ongoing_projects.txt', 'The user gave up on prompt optimization')

print("UPDATED:", company_agent_memory.get('self_managed/user_session_notes/ongoing_projects.txt'))

```

memory
├──