
Offloading #7017

Open
CyanScholar wants to merge 6 commits into PaddlePaddle:develop from CyanScholar:offloading

Conversation

@CyanScholar
Contributor

Motivation

💡 If this PR is a Cherry Pick, the PR title needs to follow the format by adding the [Cherry-Pick] label at the very beginning and appending the original PR ID at the end. For example, [Cherry-Pick][CI] Add check trigger and logic(#5191)


Modifications

Usage or Command

Accuracy Tests

Checklist

  • Add at least one tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code, run pre-commit before commit.
  • Add unit tests. Please write the reason in this PR if no unit tests.
  • Provide accuracy results.
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

Copilot AI review requested due to automatic review settings March 25, 2026 12:45
@paddle-bot

paddle-bot bot commented Mar 25, 2026

Thanks for your contribution!

Contributor

Copilot AI left a comment


Pull request overview

This PR introduces KV cache offload/restore for preempted requests on decode instances, to reduce request failures caused by insufficient GPU blocks, and tightens up the preempt synchronization and slot-reuse logic.

Changes:

  • Add OffloadManager and hook decode KV cache offload and resume into the preempt/restore paths of ResourceManagerV1.
  • Extend the cache transfer channel and task types (DECODE_OFFLOAD/RESUME/CLEANUP), and add transfer result handler support to PrefixCacheManager.
  • Add TokenProcessor/test coverage for "wait for the PREEMPTED_TOKEN_ID ack before rescheduling/releasing the slot".

Reviewed changes

Copilot reviewed 16 out of 16 changed files in this pull request and generated 8 comments.

Summary per file:

  • tests/output/test_token_processor.py — adds test coverage for reschedule behavior with a stopped slot + PREEMPTED_TOKEN_ID
  • tests/engine/test_resource_manager_v1.py — adds tests for slot reallocation when an offloaded request resumes, and for retaining a pending preempt slot
  • fastdeploy/worker/worker_process.py — adds --enable_decode_offload and passes it through to FDConfig
  • fastdeploy/utils.py — adds offload_logger for offload-related logging
  • fastdeploy/output/token_processor.py — adds pending-preempt lookup and a branch that reschedules only after PREEMPTED_TOKEN_ID is received
  • fastdeploy/envs.py — adds offload-related environment variables (enable switch, memory limit, paths, etc.)
  • fastdeploy/engine/sched/resource_manager_v1.py — hooks in OffloadManager; adds pending preempt slot retention; updates preempt/resume/slot reassignment logic
  • fastdeploy/engine/request.py — adds an is_offloaded state field to Request
  • fastdeploy/engine/offload_manager.py — new OffloadManager: dispatches transfer tasks, waits synchronously for results, maintains offloaded request state
  • fastdeploy/engine/common_engine.py — decode-side handling of PREEMPTED tasks gains a branch so offloaded requests are not immediately finished with an error
  • fastdeploy/engine/args_utils.py — EngineArgs/CLI add --enable-decode-offload and inject it into FDConfig
  • fastdeploy/config.py — FDConfig adds an enable_decode_offload field
  • fastdeploy/cache_manager/prefix_cache_manager.py — adds transfer result handler registration/dispatch; starts a transfer result receiver thread when enable_decode_offload is set
  • fastdeploy/cache_manager/cache_transfer_manager.py — adds execution logic and snapshot storage for the decode offload/resume/cleanup transfer task types
  • fastdeploy/cache_manager/cache_tasks.py — adds the DecodeOffloadTask/DecodeResumeTask/DecodeCleanupTask data structures
  • fastdeploy/cache_manager/cache_data.py — adds DECODE_OFFLOAD/RESUME/CLEANUP enum values to CacheStatus

batch_id_set.add(data.batch_id)
llm_logger.debug(f"_reschedule_preempt_task_use_zmq batch_id_set {batch_id_set}")
for request_id in need_to_be_reschedule_req_ids:
if request_id not in self.resource_manager.requests:

Copilot AI Mar 25, 2026


When request_id is not in resource_manager.requests, the code simply continues, so a dangling request_id left in to_be_rescheduled_request_id_set can never be cleaned up (every subsequent round re-iterates and skips it). When the request is confirmed gone, also remove the request_id from to_be_rescheduled_request_id_set (or clean it up centrally in the ResourceManagerV1 deletion path) to avoid unbounded set growth and inconsistent state.

Suggested change
if request_id not in self.resource_manager.requests:
if request_id not in self.resource_manager.requests:
# Request already removed; clean up dangling request_id from reschedule set
self.resource_manager.to_be_rescheduled_request_id_set.discard(request_id)

Comment on lines +1038 to +1048
self.llm_logger.error(msg)
self.scheduler.put_results(
[
RequestOutput(
request_id=task.request_id,
finished=True,
error_code=500,
error_msg=msg,
)
]
)

Copilot AI Mar 25, 2026


For RequestType.PREEMPTED on the decode side, this still sends a finished=True 500 error via scheduler.put_results() (skipped only when is_offloaded). But this PR also introduces logic that reschedules locally after a preempt and keeps emitting tokens; once the scheduler reclaims the request_id early because of finished=True, subsequent output is dropped as an expired response and the request can never be resumed. Treat PREEMPTED as a purely internal control task and do not send a finished=True error result to the scheduler (if needed, notify via a non-finished status event or a dedicated channel).

Suggested change
self.llm_logger.error(msg)
self.scheduler.put_results(
[
RequestOutput(
request_id=task.request_id,
finished=True,
error_code=500,
error_msg=msg,
)
]
)
# Treat PREEMPTED as an internal control signal on decode side.
# Do not send a finished=True error result to scheduler here,
# otherwise the request_id would be reclaimed and could not be resumed.
self.llm_logger.info(msg)

Comment on lines +1 to +35
"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""

import threading
import time
from typing import Dict, List, Optional, Tuple

from fastdeploy import envs
from fastdeploy.cache_manager.cache_data import CacheStatus
from fastdeploy.cache_manager.cache_tasks import DecodeCleanupTask, DecodeOffloadTask, DecodeResumeTask
from fastdeploy.engine.request import Request, RequestStatus
from fastdeploy.utils import offload_logger


class OffloadManager:
"""
Decode request KV cache offload orchestrator.

Real KV cache snapshot/restore is executed inside cache_transfer_manager.
This class only manages request-level state, retry policy and task/result
synchronization.
"""

Copilot AI Mar 25, 2026


The PR title is currently "Offloading", which does not follow the repository's required "[CLASS]Title" format (it needs at least one tag such as [Feature]/[Optimization]/[Engine]). The PR description is also still the raw template, missing Motivation/Modifications/Usage/Tests; please document how to enable and use the offload feature, and its impact on performance/stability along with verification results.

Comment on lines +371 to +378
if (
self.config.scheduler_config.splitwise_role == "decode"
and is_decode_phase
and self.offload_manager is not None
and self.offload_manager.can_offload(preempted_req)
):
if self.offload_manager.offload_req(preempted_req):
offloaded = True

Copilot AI Mar 25, 2026


offload_manager.offload_req() is called synchronously inside the ResourceManagerV1.schedule() lock, and offload_req blocks waiting for the cache_transfer_manager result (up to 30s). This stalls the entire scheduling loop, hurting throughput and risking a hang. Move the offload I/O and waiting out of the scheduling lock (or dispatch the offload task asynchronously and update state via later polling/callbacks) to avoid waiting while holding the lock.

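The suggested fix can be sketched as a small dispatcher: the scheduler enqueues the request id under its lock (cheap), while a background thread performs the slow blocking wait and records the result for later polling. This is a minimal illustration, not the PR's implementation; the class and method names (AsyncOffloadDispatcher, submit, poll) are hypothetical.

```python
import queue
import threading


class AsyncOffloadDispatcher:
    """Sketch: run slow offloads outside the scheduler lock.

    submit() is non-blocking and safe to call while holding the
    scheduler lock; a daemon thread executes the blocking offload
    call (which may wait up to 30s for cache_transfer_manager) and
    stores the outcome for poll() to observe on later schedule rounds.
    """

    def __init__(self, offload_fn):
        self._offload_fn = offload_fn  # slow, blocking call
        self._tasks = queue.Queue()
        self._results = {}
        self._lock = threading.Lock()
        threading.Thread(target=self._run, daemon=True).start()

    def submit(self, request_id):
        """Enqueue only; returns immediately."""
        self._tasks.put(request_id)

    def poll(self, request_id):
        """True/False once the offload finished, else None."""
        with self._lock:
            return self._results.get(request_id)

    def _run(self):
        while True:
            request_id = self._tasks.get()
            ok = self._offload_fn(request_id)  # blocking wait happens here
            with self._lock:
                self._results[request_id] = ok
```

The scheduler then treats a request with a pending offload as "in flight" and only finalizes the preempt once poll() returns a result, so no lock is ever held across the 30s wait.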
Comment on lines +1034 to +1035
offloaded_info = self.offload_manager._offloaded_requests.get(request.request_id)
num_blocks_for_resume = offloaded_info["num_blocks_needed"] if offloaded_info else 0

Copilot AI Mar 25, 2026


Reading self.offload_manager._offloaded_requests (a private member) directly to obtain offloaded_info tightly couples ResourceManagerV1 to OffloadManager's internal implementation and will easily break compatibility later. Add a public method on OffloadManager (e.g. get_offloaded_info()/get_num_blocks_needed(), or have can_resume return the needed block count) instead of reading the private field.

Suggested change
offloaded_info = self.offload_manager._offloaded_requests.get(request.request_id)
num_blocks_for_resume = offloaded_info["num_blocks_needed"] if offloaded_info else 0
if hasattr(self.offload_manager, "get_num_blocks_needed"):
# Prefer using public API of OffloadManager to avoid relying on its internal state
num_blocks_for_resume = self.offload_manager.get_num_blocks_needed(request.request_id)
else:
# Fallback: if no public API is available, assume no extra blocks are needed
num_blocks_for_resume = 0

Comment on lines 1512 to 1520
with self.lock:
for req_id in request_ids:
request = self.requests.get(req_id)
if request is None:
llm_logger.error(f"invalid request id: {req_id} self.requests: {self.requests}")
continue
if self.offload_manager is not None:
self.offload_manager.cleanup_offloaded_request(req_id)
if request in self.waiting:

Copilot AI Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finish_requests() calls offload_manager.cleanup_offloaded_request() while holding self.lock, and cleanup likewise waits synchronously for the transfer result (up to 30s), so it carries the same lock-holding blocking risk. The call also happens before checking whether the request is still in waiting; if the code later continues because the request is in waiting, the snapshot has already been cleaned up while the request object remains queued, an inconsistent state. Suggested: 1) handle the waiting/running state and remove the request from the queues first; 2) run cleanup after releasing the lock (or make cleanup asynchronous).

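The two-phase shape the comment suggests can be sketched as follows: collect cleanup targets while holding the lock, mutate the queues there, and only run the slow cleanup after the lock is released. This is an illustrative skeleton under assumed names (ResourceManagerSketch, cleanup_fn), not the actual ResourceManagerV1 code.

```python
import threading


class ResourceManagerSketch:
    """Sketch: defer slow snapshot cleanup until after the lock is released."""

    def __init__(self, cleanup_fn):
        self.lock = threading.Lock()
        self.requests = {}   # request_id -> request object
        self.waiting = []    # queued request objects
        self._cleanup_fn = cleanup_fn  # slow, blocking (up to 30s)

    def finish_requests(self, request_ids):
        to_cleanup = []
        # Phase 1: queue/state mutations under the lock, no slow I/O.
        with self.lock:
            for req_id in request_ids:
                request = self.requests.pop(req_id, None)
                if request is None:
                    continue  # unknown id: nothing to clean up
                if request in self.waiting:
                    self.waiting.remove(request)
                to_cleanup.append(req_id)
        # Phase 2: slow snapshot cleanup runs outside the lock.
        for req_id in to_cleanup:
            self._cleanup_fn(req_id)
        return to_cleanup
```

Because the queues are already consistent before cleanup starts, a 30s cleanup stall no longer blocks scheduling, and a request can never be left in waiting with its snapshot already gone.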
Comment on lines +45 to +49
self.enable_offload = getattr(config, "enable_decode_offload", False) if config else False
self.min_steps = 20
self.cpu_offloading_chunk_size = getattr(envs, "FD_CPU_OFFLOAD_CHUNK_SIZE", 8192)
self.cpu_memory_limit = getattr(envs, "FD_CPU_MEMORY_LIMIT", 50 * 1024 * 1024 * 1024)


Copilot AI Mar 25, 2026


OffloadManager reads FD_CPU_MEMORY_LIMIT/FD_CPU_OFFLOAD_CHUNK_SIZE, but the current implementation never uses them to limit or chunk snapshots (all block tensors stay resident in CPU memory). With offload enabled, this can grow host memory unboundedly until OOM. Estimate the snapshot size before offload_req and apply a cap with a rejection/eviction policy, or copy chunk by chunk on the cache_transfer_manager side with optional spill to disk (combined with FD_OFFLOAD_STORAGE_PATH).

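A cap-with-rejection policy of the kind suggested can be sketched as a small byte budget checked before each offload. The class and method names (SnapshotBudget, try_reserve, release) are hypothetical; the point is only that an offload whose estimated snapshot would exceed FD_CPU_MEMORY_LIMIT is refused up front rather than driving the host toward OOM.

```python
class SnapshotBudget:
    """Sketch: bound host memory used by KV cache snapshots."""

    def __init__(self, limit_bytes):
        self.limit_bytes = limit_bytes
        self.used_bytes = 0
        self._sizes = {}  # request_id -> reserved snapshot size

    def try_reserve(self, request_id, num_blocks, bytes_per_block):
        """Reserve space for a snapshot; False means refuse the offload."""
        size = num_blocks * bytes_per_block
        if self.used_bytes + size > self.limit_bytes:
            return False  # over budget: keep the request on GPU instead
        self.used_bytes += size
        self._sizes[request_id] = size
        return True

    def release(self, request_id):
        """Free the reservation after resume or cleanup."""
        self.used_bytes -= self._sizes.pop(request_id, 0)
```

offload_req would call try_reserve before dispatching the transfer task and release after resume/cleanup; chunked copies against FD_CPU_OFFLOAD_CHUNK_SIZE and disk spill via FD_OFFLOAD_STORAGE_PATH would layer on top of the same accounting.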
Comment on lines +872 to 876
"--enable_decode_offload",
action="store_true",
help="enable decode KV cache offload for preempted requests",
)
parser.add_argument(

Copilot AI Mar 25, 2026


worker_process.py adds an --enable_decode_offload argument, but judging from the current worker_store_true_flag list in common_engine._start_worker_service(), this flag is never passed through when the engine launches the worker, so args.enable_decode_offload on the worker side is always False (engine and worker configs diverge). Either add the flag to the worker launch argument construction, or remove this unused/ineffective worker-side argument to avoid confusion.

Suggested change
"--enable_decode_offload",
action="store_true",
help="enable decode KV cache offload for preempted requests",
)
parser.add_argument(
