Conversation
Thanks for your contribution!
Pull request overview
This PR introduces KV cache offload/restore for preempted requests on decode instances, to reduce request failures caused by GPU block exhaustion, and refines the preempt synchronization and slot-reuse logic.
Changes:
- Add OffloadManager and hook decode KV cache offload and resume into the preempt/resume paths of ResourceManagerV1.
- Extend the cache transfer channel and task types (DECODE_OFFLOAD/RESUME/CLEANUP), and add transfer result handler support in PrefixCacheManager.
- Add coverage in TokenProcessor/tests for "wait for the PREEMPTED_TOKEN_ID ack before rescheduling/releasing the slot".
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| tests/output/test_token_processor.py | Add test coverage for reschedule behavior with a stopped slot + PREEMPTED_TOKEN_ID |
| tests/engine/test_resource_manager_v1.py | Add tests for slot reallocation when resuming an offloaded request and for retaining pending preempt slots |
| fastdeploy/worker/worker_process.py | Add --enable_decode_offload and pass it through to FDConfig |
| fastdeploy/utils.py | Add offload_logger for offload-related logging |
| fastdeploy/output/token_processor.py | Add pending-preempt lookup and a branch that reschedules only after PREEMPTED_TOKEN_ID is received |
| fastdeploy/envs.py | Add offload-related environment variables (switch/memory limit/path, etc.) |
| fastdeploy/engine/sched/resource_manager_v1.py | Integrate OffloadManager; retain pending preempt slots; update preempt/resume/slot reassignment logic |
| fastdeploy/engine/request.py | Add an is_offloaded status field to Request |
| fastdeploy/engine/offload_manager.py | New OffloadManager: dispatches transfer tasks, waits synchronously for results, tracks offloaded request state |
| fastdeploy/engine/common_engine.py | On the decode side, add a branch so PREEMPTED tasks for offloaded requests are not immediately finished with an error |
| fastdeploy/engine/args_utils.py | Add --enable-decode-offload to EngineArgs/CLI and inject it into FDConfig |
| fastdeploy/config.py | Add an enable_decode_offload field to FDConfig |
| fastdeploy/cache_manager/prefix_cache_manager.py | Add transfer result handler registration/dispatch; start a transfer result receiver thread when enable_decode_offload is set |
| fastdeploy/cache_manager/cache_transfer_manager.py | Add execution logic and snapshot storage for the decode offload/resume/cleanup transfer tasks |
| fastdeploy/cache_manager/cache_tasks.py | Add DecodeOffloadTask/DecodeResumeTask/DecodeCleanupTask data structures |
| fastdeploy/cache_manager/cache_data.py | Add DECODE_OFFLOAD/RESUME/CLEANUP enum values to CacheStatus |
```python
                batch_id_set.add(data.batch_id)
                llm_logger.debug(f"_reschedule_preempt_task_use_zmq batch_id_set {batch_id_set}")
                for request_id in need_to_be_reschedule_req_ids:
                    if request_id not in self.resource_manager.requests:
```
When request_id is not in resource_manager.requests, this simply continues, so the dangling request_id stays in to_be_rescheduled_request_id_set forever (it is re-traversed and skipped on every subsequent round). Once the request is confirmed gone, also remove the request_id from to_be_rescheduled_request_id_set (or clean it up uniformly in ResourceManagerV1's deletion path) to avoid unbounded set growth and inconsistent state.
Suggested change:
```python
                    if request_id not in self.resource_manager.requests:
                        # Request already removed; clean up dangling request_id from reschedule set
                        self.resource_manager.to_be_rescheduled_request_id_set.discard(request_id)
```
```python
                self.llm_logger.error(msg)
                self.scheduler.put_results(
                    [
                        RequestOutput(
                            request_id=task.request_id,
                            finished=True,
                            error_code=500,
                            error_msg=msg,
                        )
                    ]
                )
```
For RequestType.PREEMPTED on the decode side, this still sends a finished=True 500 error via scheduler.put_results() (only the is_offloaded case is skipped). But this PR also introduces logic that reschedules locally after preemption and keeps emitting tokens; once the scheduler reclaims the request_id because of finished=True, subsequent output is dropped as an expired response and the request can never be resumed. Treat PREEMPTED as a purely internal control task and do not send a finished=True error result to the scheduler (if notification is needed, use a non-finished status event or a dedicated channel).
Suggested change:
```python
                # Treat PREEMPTED as an internal control signal on decode side.
                # Do not send a finished=True error result to scheduler here,
                # otherwise the request_id would be reclaimed and could not be resumed.
                self.llm_logger.info(msg)
```
| """ | ||
| # Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License" | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
| """ | ||
|
|
||
| import threading | ||
| import time | ||
| from typing import Dict, List, Optional, Tuple | ||
|
|
||
| from fastdeploy import envs | ||
| from fastdeploy.cache_manager.cache_data import CacheStatus | ||
| from fastdeploy.cache_manager.cache_tasks import DecodeCleanupTask, DecodeOffloadTask, DecodeResumeTask | ||
| from fastdeploy.engine.request import Request, RequestStatus | ||
| from fastdeploy.utils import offload_logger | ||
|
|
||
|
|
||
| class OffloadManager: | ||
| """ | ||
| Decode request KV cache offload orchestrator. | ||
|
|
||
| Real KV cache snapshot/restore is executed inside cache_transfer_manager. | ||
| This class only manages request-level state, retry policy and task/result | ||
| synchronization. | ||
| """ |
The PR title is currently "Offloading", which does not follow the repository's required "[CLASS]Title" format (it needs at least one tag such as [Feature]/[Optimization]/[Engine]). The PR description is also still the unfilled template, missing key information such as Motivation/Modifications/Usage/Tests; please document how to use this offload feature, which switch enables it, and its performance/stability impact and validation results.
```python
                if (
                    self.config.scheduler_config.splitwise_role == "decode"
                    and is_decode_phase
                    and self.offload_manager is not None
                    and self.offload_manager.can_offload(preempted_req)
                ):
                    if self.offload_manager.offload_req(preempted_req):
                        offloaded = True
```
This calls offload_manager.offload_req() synchronously inside the lock in ResourceManagerV1.schedule(), and offload_req blocks waiting for the cache_transfer_manager result (up to 30s). That blocks the entire scheduling loop, risking throughput collapse or even a stall. Move the offload I/O and wait logic out of the scheduling lock (or dispatch the offload task asynchronously and update state via later polling/callbacks) so the lock is never held while waiting.
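A minimal sketch of the async-dispatch pattern suggested above. The class and method names (`AsyncOffloadDispatcher`, `submit`, `poll_done`) are hypothetical, not FastDeploy APIs: the scheduler only enqueues the request id under its lock, while a worker thread performs the blocking transfer wait.

```python
import queue
import threading


class AsyncOffloadDispatcher:
    """Hypothetical helper: run the blocking offload off the scheduler lock."""

    def __init__(self, do_offload):
        self._pending = queue.Queue()
        self._done = set()
        self._lock = threading.Lock()
        self._do_offload = do_offload  # the blocking call, e.g. offload_req
        threading.Thread(target=self._worker, daemon=True).start()

    def submit(self, request_id):
        # Called from schedule() while holding the scheduler lock: O(1), non-blocking.
        self._pending.put(request_id)

    def poll_done(self, request_id):
        # Checked on later scheduling rounds to see whether the offload finished.
        with self._lock:
            return request_id in self._done

    def _worker(self):
        while True:
            req_id = self._pending.get()
            self._do_offload(req_id)  # may block up to 30s, but not under the scheduler lock
            with self._lock:
                self._done.add(req_id)
```

The scheduler would mark the request as "offload pending" on submit and only free its blocks once `poll_done` reports completion.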
```python
                offloaded_info = self.offload_manager._offloaded_requests.get(request.request_id)
                num_blocks_for_resume = offloaded_info["num_blocks_needed"] if offloaded_info else 0
```
Accessing self.offload_manager._offloaded_requests (a private member) to obtain offloaded_info tightly couples ResourceManagerV1 to OffloadManager's internals, which is easy to break later. Add a public method on OffloadManager (e.g. get_offloaded_info()/get_num_blocks_needed(), or have can_resume return the block count) instead of reading the private field directly.
Suggested change:
```python
                if hasattr(self.offload_manager, "get_num_blocks_needed"):
                    # Prefer using public API of OffloadManager to avoid relying on its internal state
                    num_blocks_for_resume = self.offload_manager.get_num_blocks_needed(request.request_id)
                else:
                    # Fallback: if no public API is available, assume no extra blocks are needed
                    num_blocks_for_resume = 0
```
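The public accessors could look like the sketch below. The method names mirror the review suggestion and are assumptions, not the merged API; the real class in fastdeploy/engine/offload_manager.py carries more state.

```python
from typing import Optional


class OffloadManager:
    """Sketch of the suggested public surface over the private registry."""

    def __init__(self):
        # request_id -> {"num_blocks_needed": int, ...}
        self._offloaded_requests = {}

    def get_offloaded_info(self, request_id: str) -> Optional[dict]:
        # Return a copy so callers cannot mutate internal state.
        info = self._offloaded_requests.get(request_id)
        return dict(info) if info is not None else None

    def get_num_blocks_needed(self, request_id: str) -> int:
        # Encapsulates the "0 if unknown" fallback that callers currently inline.
        info = self._offloaded_requests.get(request_id)
        return info["num_blocks_needed"] if info else 0
```

With this, the caller reduces to `num_blocks_for_resume = self.offload_manager.get_num_blocks_needed(request.request_id)` and no hasattr guard is needed.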
```python
        with self.lock:
            for req_id in request_ids:
                request = self.requests.get(req_id)
                if request is None:
                    llm_logger.error(f"invalid request id: {req_id} self.requests: {self.requests}")
                    continue
                if self.offload_manager is not None:
                    self.offload_manager.cleanup_offloaded_request(req_id)
                if request in self.waiting:
```
finish_requests() calls offload_manager.cleanup_offloaded_request() while holding self.lock, and cleanup also waits synchronously for the transfer result (up to 30s), so it has the same hold-lock-while-blocking risk. Moreover, the call happens before checking whether the request is still in waiting; if the loop then continues because the request is in waiting, the snapshot has already been cleaned up while the request object remains in waiting, an inconsistent state. Suggestions: 1) handle the waiting/running state and remove the request from the queues first; 2) run cleanup after releasing the lock (or make cleanup asynchronous).
```python
        self.enable_offload = getattr(config, "enable_decode_offload", False) if config else False
        self.min_steps = 20
        self.cpu_offloading_chunk_size = getattr(envs, "FD_CPU_OFFLOAD_CHUNK_SIZE", 8192)
        self.cpu_memory_limit = getattr(envs, "FD_CPU_MEMORY_LIMIT", 50 * 1024 * 1024 * 1024)
```
OffloadManager reads FD_CPU_MEMORY_LIMIT/FD_CPU_OFFLOAD_CHUNK_SIZE, but the current implementation does not use them to limit or chunk snapshots (the CPU side still keeps all block tensors resident in memory). With offload enabled, host memory can grow unboundedly until OOM. Estimate the snapshot size before offload_req and apply a cap with rejection/eviction, or copy chunk by chunk on the cache_transfer_manager side with optional spill to disk (combined with FD_OFFLOAD_STORAGE_PATH).
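A sketch of the size check suggested above. FD_CPU_MEMORY_LIMIT comes from the PR; the `SnapshotBudget` class and its helpers are hypothetical names, and the size formula assumes a standard paged KV layout (key + value per layer, fp16 by default).

```python
class SnapshotBudget:
    """Hypothetical accounting object enforcing a host-memory cap on snapshots."""

    def __init__(self, limit_bytes):
        self.limit_bytes = limit_bytes
        self.used_bytes = 0

    @staticmethod
    def estimate_snapshot_bytes(num_blocks, block_size, num_layers,
                                num_kv_heads, head_dim, dtype_bytes=2):
        # Key + value tensors per layer, per block.
        return 2 * num_layers * num_blocks * block_size * num_kv_heads * head_dim * dtype_bytes

    def try_reserve(self, nbytes):
        # Reject the offload instead of letting host memory grow unbounded.
        if self.used_bytes + nbytes > self.limit_bytes:
            return False
        self.used_bytes += nbytes
        return True

    def release(self, nbytes):
        # Called when a snapshot is resumed or cleaned up.
        self.used_bytes = max(0, self.used_bytes - nbytes)
```

offload_req would call `try_reserve(estimate_snapshot_bytes(...))` first and fall back to plain preemption (or eviction of older snapshots) when it returns False.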
| "--enable_decode_offload", | ||
| action="store_true", | ||
| help="enable decode KV cache offload for preempted requests", | ||
| ) | ||
| parser.add_argument( |
worker_process.py adds the --enable_decode_offload argument, but judging from the current worker_store_true_flag in common_engine._start_worker_service(), this flag is not passed through when the engine launches the worker, so args.enable_decode_offload on the worker side is always False (config is inconsistent between engine and worker). Either pass the switch through where the worker launch arguments are built, or remove this unused/ineffective worker-side argument to avoid confusion.
| "--enable_decode_offload", | |
| action="store_true", | |
| help="enable decode KV cache offload for preempted requests", | |
| ) | |
| parser.add_argument( |
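The pass-through fix could look like the sketch below. `worker_store_true_flag` is the variable named in the review comment; the helper function and surrounding code are assumptions, shown only to illustrate how store_true flags would reach the worker command line.

```python
def build_worker_flags(cfg, store_true_flags):
    """Hypothetical helper: emit CLI flags for enabled boolean config fields."""
    argv = []
    for name in store_true_flags:
        # Only enabled booleans become flags; store_true args default to False.
        if getattr(cfg, name, False):
            argv.append("--" + name)
    return argv
```

Including "enable_decode_offload" in the engine's store_true flag list would make the worker process see --enable_decode_offload whenever FDConfig enables it.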
Motivation

Modifications

Usage or Command

Accuracy Tests

Checklist

- Add at least one tag in the PR title from: [FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]
- Run `pre-commit` before commit.
- For a `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.