[Feature] [KVCache] support attention_store kv cache backend #5823
Conversation
Thanks for your contribution!
Codecov Report ❌

Additional details and impacted files:

```
@@            Coverage Diff             @@
##             develop    #5823   +/-   ##
==========================================
  Coverage           ?   67.80%
==========================================
  Files              ?      383
  Lines              ?    50105
  Branches           ?     7840
==========================================
  Hits               ?    33976
  Misses             ?    13649
  Partials           ?     2480
```

Flags with carried forward coverage won't be shown. View full report in Codecov by Sentry.
```python
if len(k_cache_keys) == 0:
    logger.info(f"No uncached keys found for task {task_id}")
cpu_block_ids = cpu_block_ids[match_block_num:]
if match_block_num >= len(k_cache_keys):
```
k_cache_keys has already been sliced at this point, so this check looks wrong.
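To make the concern concrete, a minimal sketch; the values and the earlier slice are assumed for illustration, not taken from the PR:

```python
# Hypothetical illustration: names mirror the diff, data is made up.
k_cache_keys = ["k0", "k1", "k2", "k3"]  # keys for 4 blocks
match_block_num = 2                      # 2 blocks already matched

k_cache_keys = k_cache_keys[match_block_num:]  # earlier slice -> ["k2", "k3"]

# This now compares against the *remaining* keys (len == 2), not the
# original 4, so it fires once match_block_num reaches half the blocks:
if match_block_num >= len(k_cache_keys):
    pass  # treated as "everything already cached", possibly too early
```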
```python
read_storage_task = ReadStorageTask(
    task_id=req_id,
    keys=no_match_block_keys,
    token_ids=input_token_ids,
```
For long inputs, carrying token_ids across processes could be heavy. Consider checking the backend and skipping token_ids for mooncake — looking at transfer_manager.py, only the attention_store path actually needs token_ids.
Yes, that works.
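One shape the suggested change could take — a sketch only; the `backend` variable and the `None` convention are assumptions, not what the PR ships:

```python
# Skip the token payload for backends that don't need it (hypothetical).
token_ids = input_token_ids if backend == "attention_store" else None

read_storage_task = ReadStorageTask(
    task_id=req_id,
    keys=no_match_block_keys,
    token_ids=token_ids,  # omitted for mooncake to keep cross-process messages small
)
```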
Pull request overview
This PR adds support for using AttentionStore as an external KV Cache storage backend in FastDeploy. The main goal is to broaden the cache-storage options so the system can read and write the KV Cache through the AttentionStore SDK.
Changes:
- Added the CacheTask abstract data structure to unify the parameters of cache-transfer tasks
- Implemented the AttentionStore class, integrating attentionstore_sdk to provide query/read/write interfaces
- Refactored the cache transfer manager to support multiple storage backends (mooncake and attention_store)
- Fixed the GPU block usage accounting in the resource manager
- Improved log file naming and error messages
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 20 comments.
| File | Description |
|---|---|
| cache_tasks.py | Defines the CacheTask, ReadStorageTask, and WriteStorageTask dataclasses as the unified data structures for cache tasks |
| attention_store.py | Core implementation of the AttentionStore backend, including SDK initialization and the query/read/write methods |
| cache_transfer_manager.py | Adds the attention_store branch and refactors the read/write storage method signatures |
| prefix_cache_manager.py | Builds a CacheTask object before launching a transfer task, passing token_ids and other required parameters |
| kvcache_storage.py | Adds an abstract query() method to the storage interface |
| mooncake_store.py | Implements query() for MooncakeStore |
| args_utils.py | Adds the "attention_store" option to the CLI arguments |
| resource_manager_v1.py | Fixes the GPU block usage accounting logic |
| engine.py / common_engine.py | Removes unused cache manager signal setup code |
| test_*.py | Updates tests for the new parameter requirements (model_id, model) |
| cache_messager.py | Improves log file naming |
| metrics.md | Corrects an inaccurate description in the docs |
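cache_tasks.py itself is not expanded in this view; as a rough orientation, the dataclasses named above might be shaped like the following sketch. Only task_id, keys, token_ids, and gpu_block_ids appear in the diff excerpts — everything else here is an assumption:

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class CacheTask:
    """Shared parameters of a cache-transfer task (sketch)."""
    task_id: str
    keys: List[str]                         # storage keys, one per KV block
    token_ids: Optional[List[int]] = None   # only needed by attention_store


@dataclass
class ReadStorageTask(CacheTask):
    """Read KV blocks from the external backend into GPU cache (sketch)."""
    gpu_block_ids: List[int] = field(default_factory=list)


@dataclass
class WriteStorageTask(CacheTask):
    """Write KV blocks from GPU cache out to the external backend (sketch)."""
    gpu_block_ids: List[int] = field(default_factory=list)
```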
fastdeploy/cache_manager/transfer_factory/mooncake_store/attention_store.py
| """ | ||
| # Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License" | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| """ | ||
|
|
||
| import time | ||
| import traceback | ||
| from dataclasses import dataclass | ||
| from typing import List | ||
|
|
||
| import paddle | ||
|
|
||
| from fastdeploy.cache_manager.transfer_factory.kvcache_storage import ( | ||
| KVCacheStorage, | ||
| logger, | ||
| ) | ||
|
|
||
| try: | ||
| from attentionstore_sdk.sdk import AttentionStoreSDK, Tokens | ||
| from attentionstore_sdk.utils.err import AttentionStoreSDKError | ||
|
|
||
| _ATTENTIONSTORE_AVAILABLE = True | ||
| except Exception: | ||
| AttentionStoreSDK = None | ||
| Tokens = None | ||
| AttentionStoreSDKError = None | ||
| _ATTENTIONSTORE_AVAILABLE = False | ||
|
|
||
|
|
||
| @dataclass | ||
| class AttentionStoreConfig: | ||
| namespace: str = "default_ns" | ||
| pod_name: str = "default_pod" | ||
| model_version: str = "v0" | ||
| shard_id: int = 0 | ||
| shard_num: int = 1 | ||
| layer_num: int = 1 | ||
| block_token_size: int = 64 | ||
| bytes_per_shard_layer_per_block: int = 1024 | ||
| device_id: int = 0 | ||
| dp_id: int = 0 | ||
|
|
||
|
|
||
| class AttentionStore(KVCacheStorage): | ||
| def __init__(self, **args): | ||
|
|
||
| if not _ATTENTIONSTORE_AVAILABLE: | ||
| raise ImportError("Please install attentionstore_sdk to run Fastdeploy with attentionstore_sdk.") | ||
|
|
||
| self.config = AttentionStoreConfig(**args) | ||
|
|
||
| try: | ||
| logger.info(f"[INIT] Start initializing AttentionStoreSDK with config: {self.config}") | ||
| self.sdk = AttentionStoreSDK( | ||
| self.config.namespace, | ||
| self.config.pod_name, | ||
| self.config.model_version, | ||
| self.config.shard_id, | ||
| self.config.shard_num, | ||
| self.config.layer_num, | ||
| self.config.block_token_size, | ||
| self.config.bytes_per_shard_layer_per_block, | ||
| self.config.device_id, | ||
| self.config.dp_id, | ||
| ) | ||
| self.wait_for_sdk_ready(timeout=300, delta_t=5) | ||
| logger.info("[INIT] ✅ AttentionStore is initialized successfully!") | ||
| except Exception as e: | ||
| logger.error( | ||
| f"[INIT] ❌ AttentionStore initialization failed, error: {e}, traceback:\n{traceback.format_exc()}" | ||
| ) | ||
|
|
||
| def wait_for_sdk_ready(self, timeout: float, delta_t: float): | ||
| t = 0 | ||
| while t < timeout: | ||
| try: | ||
| tokens = Tokens(list(range(self.config.block_token_size + 1)), self.config.block_token_size) | ||
| self.sdk.match(tokens, 0, delta_t) | ||
| return | ||
| except AttentionStoreSDKError as e: | ||
| if "cuda memory not ready" in str(e): | ||
| logger.debug("[INIT] cuda memory not ready, try again..") | ||
| time.sleep(delta_t) | ||
| continue | ||
| else: | ||
| raise RuntimeError( | ||
| f"Unexpected exception during AttentionStoreSDK initialization: {e}\n{traceback.format_exc()}" | ||
| ) | ||
| finally: | ||
| t += delta_t | ||
| raise TimeoutError(f"AttentionStoreSDK initialization timed out after {timeout} seconds") | ||
|
|
||
| def read( | ||
| self, | ||
| task_id: str, | ||
| key_cache: List[paddle.Tensor], | ||
| val_cache: List[paddle.Tensor], | ||
| token_ids: List[int], | ||
| gpu_block_ids: List[int], | ||
| start_read_block_idx: int, | ||
| timeout: float = 30.0, | ||
| ): | ||
| logger.debug( | ||
| f"[READ BEGIN] task_id: {task_id} token_ids: {token_ids} gpu_block_ids: {gpu_block_ids} start_read_block_idx: {start_read_block_idx} timeout: {timeout}" | ||
| ) | ||
| tokens = Tokens(token_ids, self.config.block_token_size) | ||
| k_data_ptrs = [k.data_ptr() for k in key_cache] | ||
| v_data_ptrs = [v.data_ptr() for v in val_cache] | ||
| num = 0 | ||
| try: | ||
| num = self.sdk.read( | ||
| list(range(self.config.layer_num)), | ||
| tokens, | ||
| start_read_block_idx, | ||
| k_data_ptrs, | ||
| v_data_ptrs, | ||
| gpu_block_ids, | ||
| timeout, | ||
| ) | ||
| logger.debug(f"[READ END] task_id: {task_id} read_blocks: {num}") | ||
| except AttentionStoreSDKError: | ||
| logger.error( | ||
| f"[READ ERROR] failed to execute sdk read, task_id: {task_id}, traceback:\n{traceback.format_exc()}" | ||
| ) | ||
| return num | ||
|
|
||
| def write( | ||
| self, | ||
| task_id: str, | ||
| key_cache: List[paddle.Tensor], | ||
| val_cache: List[paddle.Tensor], | ||
| token_ids: List[int], | ||
| gpu_block_ids: List[int], | ||
| start_write_block_idx: int, | ||
| timeout: float = 30.0, | ||
| ) -> int: | ||
| logger.debug( | ||
| f"[WRITE BEGIN] task_id: {task_id} token_ids: {token_ids} gpu_block_ids: {gpu_block_ids} start_write_block_idx: {start_write_block_idx} timeout: {timeout}" | ||
| ) | ||
| tokens = Tokens(token_ids, self.config.block_token_size) | ||
| k_data_ptrs = [k.data_ptr() for k in key_cache] | ||
| v_data_ptrs = [v.data_ptr() for v in val_cache] | ||
| num = 0 | ||
| try: | ||
| num = self.sdk.write( | ||
| list(range(self.config.layer_num)), | ||
| tokens, | ||
| start_write_block_idx, | ||
| k_data_ptrs, | ||
| v_data_ptrs, | ||
| gpu_block_ids, | ||
| timeout, | ||
| ) | ||
| logger.debug(f"[WRITE END] task_id: {task_id} written_blocks: {num}") | ||
| except AttentionStoreSDKError: | ||
| logger.error( | ||
| f"[WRITE ERROR] failed to execute sdk write, task_id: {task_id}, traceback:\n{traceback.format_exc()}" | ||
| ) | ||
| return num | ||
|
|
||
| def query(self, task_id: str, token_ids: List[int], start_match_block_idx: int, timeout: float = 10.0): | ||
| """ | ||
| Given the input ids and starting index to match, get the valid blocks number that | ||
| can be prefetched from storage backend. | ||
| """ | ||
| logger.debug( | ||
| f"[QUERY BEGIN] task_id: {task_id} token_ids: {token_ids} start_match_block_idx: {start_match_block_idx} timeout: {timeout}" | ||
| ) | ||
| tokens = Tokens(token_ids, self.config.block_token_size) | ||
| num = 0 | ||
| try: | ||
| num = self.sdk.match(tokens, start_match_block_idx, timeout) | ||
| logger.debug(f"[QUERY END] task_id: {task_id} matched_blocks: {num}") | ||
| except AttentionStoreSDKError: | ||
| logger.error( | ||
| f"[QUERY ERROR] Failed to execute sdk match, task_id: {task_id}, traceback:\n{traceback.format_exc()}" | ||
| ) | ||
| return num | ||
|
|
||
| def get(self, **kwargs): | ||
| raise NotImplementedError("AttentionStore does not support this method") | ||
|
|
||
| def batch_get(self, **kwargs): | ||
| raise NotImplementedError("AttentionStore does not support this method") | ||
|
|
||
| def set(self, **kwargs) -> bool: | ||
| raise NotImplementedError("AttentionStore does not support this method") | ||
|
|
||
| def batch_set(self, **kwargs) -> bool: | ||
| raise NotImplementedError("AttentionStore does not support this method") | ||
|
|
||
| def exists(self, keys: List[str]) -> bool: | ||
| raise NotImplementedError("AttentionStore does not support this method") | ||
|
|
||
| def clear(self) -> bool: | ||
| raise NotImplementedError("AttentionStore does not support this method") | ||
|
|
||
| def register_buffer(self, buffer_ptr, buffer_size, buffer_type="none_type") -> None: | ||
| raise NotImplementedError("AttentionStore does not support this method") |
Copilot AI commented on Jan 22, 2026:
The new AttentionStore implementation and the CacheTask data structures lack unit tests. Although the PR description mentions accuracy testing, consider adding unit-test coverage for the following scenarios (see the sketch after this list):
- AttentionStore initialization and the read, write, and query methods
- Creation and use of CacheTask/ReadStorageTask/WriteStorageTask
- The attention_store branch logic in cache_transfer_manager.py

Given that the project already has a full test suite for the cache_manager components, corresponding unit tests for the new functionality are recommended.
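As a starting point, a sketch of one such test, mocking the SDK so it runs without attentionstore_sdk (it still assumes a FastDeploy dev environment; the module path is taken from the review header above and the patching strategy is an assumption):

```python
from unittest import mock

import fastdeploy.cache_manager.transfer_factory.mooncake_store.attention_store as as_mod


def test_query_returns_matched_block_count():
    # Build the object without running __init__, so no real SDK or GPU is needed.
    store = as_mod.AttentionStore.__new__(as_mod.AttentionStore)
    store.config = mock.Mock(block_token_size=64)
    store.sdk = mock.Mock()
    store.sdk.match.return_value = 3

    # Patch the module-level Tokens symbol, which is None when the SDK
    # is not installed.
    with mock.patch.object(as_mod, "Tokens"):
        assert store.query("t0", list(range(128)), start_match_block_idx=0) == 3

    store.sdk.match.assert_called_once()
```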
Unit tests will be added in a follow-up.
juncaipeng
left a comment
LGTM
Motivation
Support using AttentionStore as external storage for the KV Cache.
Modifications
Usage or Command
Accuracy Tests
Checklist
- PR title tag selected from: [FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]
- Run `pre-commit` before commit.
- For a PR targeting the `release` branch, make sure the PR has been submitted to the `develop` branch first, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.