Conversation

@liyonghua0910 (Collaborator) commented on Dec 29, 2025

Motivation

Support using AttentionStore as an external storage backend for the KV Cache.

Modifications

  • cache_tasks.py: abstracts out the CacheTask class as the shared structure for cache-transfer tasks (a hedged sketch follows this list)
  • cache_transfer_manager.py: adds the handling logic for the attention_store branch
  • prefix_cache_manager.py: constructs a CacheTask with the required parameters before launching a transfer task
  • kvcache_storage.py: adds a query() interface that returns the number of matched blocks
  • attention_store.py: core implementation, including SDK initialization and the query/read/write interfaces
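
For orientation, here is a minimal sketch of what these shared task structures could look like. The task_id/keys/token_ids fields are taken from the ReadStorageTask call quoted in the review below; the defaults, the block-index fields, and WriteStorageTask's shape are assumptions, not the exact definitions in cache_tasks.py:

from dataclasses import dataclass, field
from typing import List


@dataclass
class CacheTask:
    # Shared structure for cache-transfer tasks (field set partly assumed).
    task_id: str
    keys: List[str] = field(default_factory=list)
    token_ids: List[int] = field(default_factory=list)


@dataclass
class ReadStorageTask(CacheTask):
    # Prefetch matched blocks from the storage backend into the local cache.
    start_read_block_idx: int = 0


@dataclass
class WriteStorageTask(CacheTask):
    # Offload cached blocks to the storage backend (hypothetical field).
    start_write_block_idx: int = 0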

Usage or Command

python -m fastdeploy.entrypoints.openai.api_server \
    --tensor-parallel-size 1 \
    --port 8580 \
    --quantization wint8 \
    --max-model-len 32768 \
    --max-num-seqs 128 \
    --num-gpu-blocks-override 1000 \
    --model /models/ERNIE-4.5-0.3B-Paddle \
    --kvcache-storage-backend attention_store
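
Once the server is up, a request like the following can exercise it. This is a hypothetical smoke test; the endpoint is the standard OpenAI-compatible chat-completions route that api_server exposes:

import requests

resp = requests.post(
    "http://localhost:8580/v1/chat/completions",
    json={
        "model": "/models/ERNIE-4.5-0.3B-Paddle",
        "messages": [{"role": "user", "content": "Hello"}],
    },
    timeout=60,
)
print(resp.json())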

Accuracy Tests

(Accuracy test results were attached as a screenshot in the original PR.)

Checklist

  • Add at least one tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but their semantics must be clear.
  • Format your code and run pre-commit before committing.
  • Add unit tests. If no unit tests are added, please explain the reason in this PR.
  • Provide accuracy results.
  • If the current PR targets a release branch, make sure it has first been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

paddle-bot (bot) commented on Dec 29, 2025

Thanks for your contribution!

codecov-commenter commented on Jan 4, 2026

Codecov Report

❌ Patch coverage is 27.93103% with 209 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (develop@309c7d9).

Files with missing lines                                | Patch %  | Lines
fastdeploy/cache_manager/cache_transfer_manager.py      | 13.97%   | 117 missing
...transfer_factory/mooncake_store/attention_store.py   | 40.00%   | 57 missing
fastdeploy/cache_manager/prefix_cache_manager.py        | 16.00%   | 21 missing
.../transfer_factory/mooncake_store/mooncake_store.py   | 12.50%   | 7 missing
fastdeploy/engine/sched/resource_manager_v1.py          | 0.00%    | 5 missing
fastdeploy/cache_manager/cache_messager.py              | 33.33%   | 1 missing, 1 partial
Additional details and impacted files
@@            Coverage Diff             @@
##             develop    #5823   +/-   ##
==========================================
  Coverage           ?   67.80%           
==========================================
  Files              ?      383           
  Lines              ?    50105           
  Branches           ?     7840           
==========================================
  Hits               ?    33976           
  Misses             ?    13649           
  Partials           ?     2480           
Flag Coverage Δ
GPU 67.80% <27.93%> (?)

Flags with carried forward coverage won't be shown.

@liyonghua0910 changed the title from "[Feature] support attention_store kv cache backend" to "[Feature] [KVCache] support attention_store kv cache backend" on Jan 5, 2026
if len(k_cache_keys) == 0:
    logger.info(f"No uncached keys found for task {task_id}")
cpu_block_ids = cpu_block_ids[match_block_num:]
if match_block_num >= len(k_cache_keys):
A collaborator commented on the lines above:

k_cache_keys has already been sliced here, so this check looks wrong.
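
To illustrate the concern with hypothetical names: if k_cache_keys was already sliced by match_block_num, the later length check should compare against the pre-slice length, e.g.:

# Sketch of the reviewer's point (names are illustrative, not the actual fix):
total_key_num = len(k_cache_keys)              # record the length before slicing
k_cache_keys = k_cache_keys[match_block_num:]
cpu_block_ids = cpu_block_ids[match_block_num:]
if match_block_num >= total_key_num:           # compare against the original length
    logger.info(f"No uncached keys found for task {task_id}")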

read_storage_task = ReadStorageTask(
    task_id=req_id,
    keys=no_match_block_keys,
    token_ids=input_token_ids,
A collaborator commented:

For long inputs, carrying token_ids makes the cross-process communication fairly heavy. We could add a check so that mooncake does not carry token_ids; looking at transfer_manager.py, only attention_store needs token_ids.

The author replied:

Sure, that works.
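
A minimal sketch of the suggested change, reusing the identifiers from the quoted diff; the backend-selection variable is an assumption:

# Only the attention_store backend consumes token_ids in transfer_manager.py,
# so skip them for mooncake to keep the cross-process payload small.
needs_token_ids = kvcache_storage_backend == "attention_store"
read_storage_task = ReadStorageTask(
    task_id=req_id,
    keys=no_match_block_keys,
    token_ids=input_token_ids if needs_token_ids else [],
)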

Copilot AI (Contributor) left a comment

Pull request overview

This PR adds support for using AttentionStore as an external KV Cache storage backend in FastDeploy. The main goal is to extend the cache-storage options so the system can read and write the KV Cache through the AttentionStore SDK.

Changes:

  • Adds a CacheTask abstract data structure to unify the parameters of cache-transfer tasks
  • Implements the AttentionStore class, integrating attentionstore_sdk to provide the query/read/write interfaces
  • Refactors the cache transfer manager to support multiple storage backends (mooncake and attention_store)
  • Fixes a GPU block usage accounting issue in the resource manager
  • Improves log file naming and error messages

Reviewed changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated 20 comments.

Show a summary per file

  • cache_tasks.py: defines the CacheTask, ReadStorageTask, and WriteStorageTask dataclasses as the unified data structures for cache tasks
  • attention_store.py: core implementation of the AttentionStore backend, including SDK initialization and the query/read/write methods
  • cache_transfer_manager.py: adds the attention_store branch handling logic and refactors the read/write storage method signatures
  • prefix_cache_manager.py: constructs a CacheTask object before launching a transfer task, passing token_ids and other required parameters
  • kvcache_storage.py: adds an abstract query() method to the storage interface
  • mooncake_store.py: implements query() for MooncakeStore
  • args_utils.py: adds the "attention_store" option to the CLI arguments
  • resource_manager_v1.py: fixes the GPU block usage accounting logic
  • engine.py / common_engine.py: removes unused cache manager signal setup code
  • test_*.py: updates tests for the new required parameters (model_id, model)
  • cache_messager.py: improves log file naming
  • metrics.md: corrects an erroneous description in the docs

Comment on lines 1 to 210
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""

import time
import traceback
from dataclasses import dataclass
from typing import List

import paddle

from fastdeploy.cache_manager.transfer_factory.kvcache_storage import (
KVCacheStorage,
logger,
)

try:
from attentionstore_sdk.sdk import AttentionStoreSDK, Tokens
from attentionstore_sdk.utils.err import AttentionStoreSDKError

_ATTENTIONSTORE_AVAILABLE = True
except Exception:
AttentionStoreSDK = None
Tokens = None
AttentionStoreSDKError = None
_ATTENTIONSTORE_AVAILABLE = False


@dataclass
class AttentionStoreConfig:
namespace: str = "default_ns"
pod_name: str = "default_pod"
model_version: str = "v0"
shard_id: int = 0
shard_num: int = 1
layer_num: int = 1
block_token_size: int = 64
bytes_per_shard_layer_per_block: int = 1024
device_id: int = 0
dp_id: int = 0


class AttentionStore(KVCacheStorage):
def __init__(self, **args):

if not _ATTENTIONSTORE_AVAILABLE:
raise ImportError("Please install attentionstore_sdk to run Fastdeploy with attentionstore_sdk.")

self.config = AttentionStoreConfig(**args)

try:
logger.info(f"[INIT] Start initializing AttentionStoreSDK with config: {self.config}")
self.sdk = AttentionStoreSDK(
self.config.namespace,
self.config.pod_name,
self.config.model_version,
self.config.shard_id,
self.config.shard_num,
self.config.layer_num,
self.config.block_token_size,
self.config.bytes_per_shard_layer_per_block,
self.config.device_id,
self.config.dp_id,
)
self.wait_for_sdk_ready(timeout=300, delta_t=5)
logger.info("[INIT] ✅ AttentionStore is initialized successfully!")
except Exception as e:
logger.error(
f"[INIT] ❌ AttentionStore initialization failed, error: {e}, traceback:\n{traceback.format_exc()}"
)

def wait_for_sdk_ready(self, timeout: float, delta_t: float):
t = 0
while t < timeout:
try:
tokens = Tokens(list(range(self.config.block_token_size + 1)), self.config.block_token_size)
self.sdk.match(tokens, 0, delta_t)
return
except AttentionStoreSDKError as e:
if "cuda memory not ready" in str(e):
logger.debug("[INIT] cuda memory not ready, try again..")
time.sleep(delta_t)
continue
else:
raise RuntimeError(
f"Unexpected exception during AttentionStoreSDK initialization: {e}\n{traceback.format_exc()}"
)
finally:
t += delta_t
raise TimeoutError(f"AttentionStoreSDK initialization timed out after {timeout} seconds")

def read(
self,
task_id: str,
key_cache: List[paddle.Tensor],
val_cache: List[paddle.Tensor],
token_ids: List[int],
gpu_block_ids: List[int],
start_read_block_idx: int,
timeout: float = 30.0,
):
logger.debug(
f"[READ BEGIN] task_id: {task_id} token_ids: {token_ids} gpu_block_ids: {gpu_block_ids} start_read_block_idx: {start_read_block_idx} timeout: {timeout}"
)
tokens = Tokens(token_ids, self.config.block_token_size)
k_data_ptrs = [k.data_ptr() for k in key_cache]
v_data_ptrs = [v.data_ptr() for v in val_cache]
num = 0
try:
num = self.sdk.read(
list(range(self.config.layer_num)),
tokens,
start_read_block_idx,
k_data_ptrs,
v_data_ptrs,
gpu_block_ids,
timeout,
)
logger.debug(f"[READ END] task_id: {task_id} read_blocks: {num}")
except AttentionStoreSDKError:
logger.error(
f"[READ ERROR] failed to execute sdk read, task_id: {task_id}, traceback:\n{traceback.format_exc()}"
)
return num

def write(
self,
task_id: str,
key_cache: List[paddle.Tensor],
val_cache: List[paddle.Tensor],
token_ids: List[int],
gpu_block_ids: List[int],
start_write_block_idx: int,
timeout: float = 30.0,
) -> int:
logger.debug(
f"[WRITE BEGIN] task_id: {task_id} token_ids: {token_ids} gpu_block_ids: {gpu_block_ids} start_write_block_idx: {start_write_block_idx} timeout: {timeout}"
)
tokens = Tokens(token_ids, self.config.block_token_size)
k_data_ptrs = [k.data_ptr() for k in key_cache]
v_data_ptrs = [v.data_ptr() for v in val_cache]
num = 0
try:
num = self.sdk.write(
list(range(self.config.layer_num)),
tokens,
start_write_block_idx,
k_data_ptrs,
v_data_ptrs,
gpu_block_ids,
timeout,
)
logger.debug(f"[WRITE END] task_id: {task_id} written_blocks: {num}")
except AttentionStoreSDKError:
logger.error(
f"[WRITE ERROR] failed to execute sdk write, task_id: {task_id}, traceback:\n{traceback.format_exc()}"
)
return num

def query(self, task_id: str, token_ids: List[int], start_match_block_idx: int, timeout: float = 10.0):
"""
Given the input ids and starting index to match, get the valid blocks number that
can be prefetched from storage backend.
"""
logger.debug(
f"[QUERY BEGIN] task_id: {task_id} token_ids: {token_ids} start_match_block_idx: {start_match_block_idx} timeout: {timeout}"
)
tokens = Tokens(token_ids, self.config.block_token_size)
num = 0
try:
num = self.sdk.match(tokens, start_match_block_idx, timeout)
logger.debug(f"[QUERY END] task_id: {task_id} matched_blocks: {num}")
except AttentionStoreSDKError:
logger.error(
f"[QUERY ERROR] Failed to execute sdk match, task_id: {task_id}, traceback:\n{traceback.format_exc()}"
)
return num

def get(self, **kwargs):
raise NotImplementedError("AttentionStore does not support this method")

def batch_get(self, **kwargs):
raise NotImplementedError("AttentionStore does not support this method")

def set(self, **kwargs) -> bool:
raise NotImplementedError("AttentionStore does not support this method")

def batch_set(self, **kwargs) -> bool:
raise NotImplementedError("AttentionStore does not support this method")

def exists(self, keys: List[str]) -> bool:
raise NotImplementedError("AttentionStore does not support this method")

def clear(self) -> bool:
raise NotImplementedError("AttentionStore does not support this method")

def register_buffer(self, buffer_ptr, buffer_size, buffer_type="none_type") -> None:
raise NotImplementedError("AttentionStore does not support this method")
Copilot AI commented on Jan 22, 2026

The new AttentionStore implementation and the CacheTask data structures lack unit tests. Although the PR description mentions accuracy tests, unit tests should be added to cover the following scenarios:

  1. AttentionStore initialization and its read, write, and query methods
  2. Creation and use of CacheTask/ReadStorageTask/WriteStorageTask
  3. The attention_store branch logic in cache_transfer_manager.py

Given that the project already has a complete test suite for the cache_manager components, corresponding unit tests should be added for the new functionality.

@liyonghua0910 (Collaborator, author) replied:

Unit tests will be added in a follow-up.
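
For illustration, a first unit test along these lines could cover the query path by stubbing out the SDK. The module path and config kwargs follow the file shown above; the test itself is a sketch, not part of this PR:

from unittest import mock

from fastdeploy.cache_manager.transfer_factory.mooncake_store import attention_store


def test_query_returns_matched_blocks():
    # Stub the SDK symbols so the test runs without a GPU or a live AttentionStore.
    with mock.patch.object(attention_store, "_ATTENTIONSTORE_AVAILABLE", True), mock.patch.object(
        attention_store, "Tokens"
    ), mock.patch.object(attention_store, "AttentionStoreSDKError", Exception), mock.patch.object(
        attention_store, "AttentionStoreSDK"
    ) as sdk_cls:
        sdk_cls.return_value.match.return_value = 3  # pretend 3 blocks match
        store = attention_store.AttentionStore(layer_num=2, block_token_size=64)
        assert store.query("task-0", token_ids=list(range(128)), start_match_block_idx=0) == 3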

@juncaipeng (Collaborator) left a comment:

LGTM

@Jiang-Jia-Jun merged commit 8d27a52 into PaddlePaddle:develop on Jan 22, 2026
21 of 24 checks passed