当GPU内存不足时,到底是将部分KV缓存块从GPU转移到CPU, 还是抢占其他请求的kv cahe 即 重计算与 kv cahce offload 到cpu 的关系

上面完整的数据结构和组件类之间的关系是什么,主要流程?

如果配置了kv_offloading_size,系统会先尝试将KV块转移到CPU , 对吗,
如果请求GPU显存不足? 会先驱逐运行中请求, 还是会将 KV off loading.

GPU内存不足时的处理策略

当GPU内存不足时,vLLM V1采用两种不同的机制来处理,它们是互补关系而非替代关系

1. RECOMPUTE抢占模式(默认机制)

作用:处理运行时内存压力,抢占低优先级请求的KV缓存 1

def _preempt_request(self, request: Request, timestamp: float) -> None:
    self.kv_cache_manager.free(request)  # 释放KV块
    request.num_computed_tokens = 0      # 重置计算进度
    request.status = RequestStatus.PREEMPTED
    self.waiting.prepend_request(request) # 重新加入等待队列

触发条件:KV缓存空间不足以分配新块时 2

2. KV Cache Offloading(可选机制)

作用:主动将KV缓存块转移到CPU内存,扩展有效存储空间 3

class CPUOffloadingSpec(OffloadingSpec):
    def __init__(self, vllm_config, kv_cache_config):
        cpu_bytes_to_use = self.extra_config.get("cpu_bytes_to_use")
        self.num_blocks = int(cpu_bytes_to_use) // kv_bytes_per_offloaded_block

配置方式:通过kv_offloading_size参数启用 4


两者关系与协同工作

架构关系

GPU内存不足

是否启用KV Offloading?

RECOMPUTE抢占模式

KV Offloading + RECOMPUTE

释放低优先级请求KV块

转移KV块到CPU

CPU空间不足时触发RECOMPUTE

请求重新计算

实际工作流程

  1. 优先使用Offloading:如果配置了kv_offloading_size,系统会先尝试将KV块转移到CPU 5

  2. Offloading失败时触发RECOMPUTE

    • CPU空间不足时
    • Offloading过程中出现错误时
    • 系统会回退到RECOMPUTE模式 2
  3. 配置示例

# 启用KV Offloading
kv_offloading_size = 4.0  # 4GB CPU缓冲区
kv_offloading_backend = "native"

# 系统会同时使用两种机制:
# 1. 优先将KV块offload到CPU
# 2. CPU空间不足时使用RECOMPUTE抢占

性能权衡

机制 优点 缺点 适用场景
**

Wiki pages you might want to explore:

Citations

File: vllm/v1/core/sched/scheduler.py (L434-480)

            # Schedule newly needed KV blocks for the request.
            with record_function_or_nullcontext("schedule: allocate_slots"):
                while True:
                    new_blocks = self.kv_cache_manager.allocate_slots(
                        request,
                        num_new_tokens,
                        num_lookahead_tokens=self.num_lookahead_tokens,
                    )

                    if new_blocks is not None:
                        # The request can be scheduled.
                        break

                    # The request cannot be scheduled.
                    # Preempt the lowest-priority request.
                    if self.policy == SchedulingPolicy.PRIORITY:
                        preempted_req = max(
                            self.running,
                            key=lambda r: (r.priority, r.arrival_time),
                        )
                        self.running.remove(preempted_req)
                        if preempted_req in scheduled_running_reqs:
                            preempted_req_id = preempted_req.request_id
                            scheduled_running_reqs.remove(preempted_req)
                            token_budget += num_scheduled_tokens.pop(preempted_req_id)
                            req_to_new_blocks.pop(preempted_req_id)
                            scheduled_spec_decode_tokens.pop(preempted_req_id, None)
                            preempted_encoder_inputs = scheduled_encoder_inputs.pop(
                                preempted_req_id, None
                            )
                            if preempted_encoder_inputs:
                                # Restore encoder compute budget if the preempted
                                # request had encoder inputs scheduled in this step.
                                num_embeds_to_restore = sum(
                                    preempted_req.get_num_encoder_embeds(i)
                                    for i in preempted_encoder_inputs
                                )
                                encoder_compute_budget += num_embeds_to_restore
                            req_index -= 1
                    else:
                        preempted_req = self.running.pop()

                    self._preempt_request(preempted_req, scheduled_timestamp)
                    preempted_reqs.append(preempted_req)
                    if preempted_req == request:
                        # No more request to preempt. Cannot schedule this request.
                        break

File: vllm/v1/core/sched/scheduler.py (L912-933)

    def _preempt_request(self, request: Request, timestamp: float) -> None:
        """Preempt a request and put it back to the waiting queue.

        NOTE: The request should be popped from the running queue outside of this
        method.
        """
        assert request.status == RequestStatus.RUNNING, (
            "Only running requests can be preempted"
        )
        self.kv_cache_manager.free(request)
        self.encoder_cache_manager.free(request)
        request.status = RequestStatus.PREEMPTED
        request.num_computed_tokens = 0
        if request.spec_token_ids:
            request.spec_token_ids = []
        request.num_preemptions += 1
        if self.log_stats:
            request.record_event(EngineCoreEventType.PREEMPTED, timestamp)

        # Put the request back to the waiting queue.
        self.waiting.prepend_request(request)

File: vllm/v1/kv_offload/cpu.py (L21-110)

class CPUOffloadingSpec(OffloadingSpec):
    def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig):
        super().__init__(vllm_config, kv_cache_config)

        cpu_bytes_to_use = self.extra_config.get("cpu_bytes_to_use")
        if not cpu_bytes_to_use:
            raise Exception(
                "cpu_bytes_to_use must be specified in kv_connector_extra_config"
            )

        # calculate kv_bytes_per_offloaded_block
        assert kv_cache_config is not None
        page_sizes = {
            kv_cache_group.kv_cache_spec.page_size_bytes
            for kv_cache_group in kv_cache_config.kv_cache_groups
        }
        assert len(page_sizes) == 1
        page_size_bytes = page_sizes.pop()
        kv_bytes_per_block = (
            page_size_bytes
            * len(kv_cache_config.kv_cache_tensors)
            * vllm_config.parallel_config.world_size
        )
        kv_bytes_per_offloaded_block = kv_bytes_per_block * (
            self.offloaded_block_size // self.gpu_block_size
        )

        self.num_blocks = (
            int(cpu_bytes_to_use) // kv_bytes_per_offloaded_block
            if kv_bytes_per_offloaded_block > 0
            else 0
        )

        # scheduler-side
        self._manager: OffloadingManager | None = None

        # worker-side
        self._handlers: CpuGpuOffloadingHandlers | None = None

        self.eviction_policy: str = self.extra_config.get("eviction_policy", "lru")

    def get_manager(self) -> OffloadingManager:
        if not self._manager:
            kv_events_config = self.vllm_config.kv_events_config
            enable_events = (
                kv_events_config is not None and kv_events_config.enable_kv_cache_events
            )

            backend = CPUBackend(
                block_size=self.offloaded_block_size, num_blocks=self.num_blocks
            )

            if self.eviction_policy == "lru":
                self._manager = LRUOffloadingManager(
                    backend=backend, enable_events=enable_events
                )
            elif self.eviction_policy == "arc":
                self._manager = ARCOffloadingManager(
                    backend=backend, enable_events=enable_events
                )
            else:
                raise ValueError(
                    f"Unknown eviction policy: {self.eviction_policy}. "
                    f"Supported policies: lru, arc"
                )
        return self._manager

    def get_handlers(
        self,
        kv_caches: dict[str, torch.Tensor],
        attn_backends: dict[str, type[AttentionBackend]],
    ) -> Iterator[tuple[type[LoadStoreSpec], type[LoadStoreSpec], OffloadingHandler]]:
        if not self._handlers:
            if not current_platform.is_cuda_alike():
                raise Exception(
                    "CPU Offloading is currently only supported on CUDA-alike GPUs"
                )

            self._handlers = CpuGpuOffloadingHandlers(
                attn_backends=attn_backends,
                gpu_block_size=self.gpu_block_size,
                cpu_block_size=self.offloaded_block_size,
                num_cpu_blocks=self.num_blocks,
                gpu_caches=kv_caches,
            )

        assert self._handlers is not None
        yield GPULoadStoreSpec, CPULoadStoreSpec, self._handlers.gpu_to_cpu_handler
        yield CPULoadStoreSpec, GPULoadStoreSpec, self._handlers.cpu_to_gpu_handler

File: vllm/config/cache.py (L169-173)

    kv_offloading_size: float | None = None
    """Size of the KV cache offloading buffer in GiB. When TP > 1, this is
    the total buffer size summed across all TP ranks. By default, this is set
    to None, which means no KV offloading is enabled. When set, vLLM will
    enable KV cache offloading to CPU using the kv_offloading_backend."""

File: vllm/config/vllm.py (L616-650)

    def _post_init_kv_transfer_config(self) -> None:
        """Update KVTransferConfig based on top-level configs in VllmConfig.

        Right now, this function reads the offloading settings from
        CacheConfig and configures the KVTransferConfig accordingly.
        """
        # KV offloading is only activated when kv_offloading_size is set.
        if (kv_offloading_size := self.cache_config.kv_offloading_size) is None:
            return

        kv_offloading_backend = self.cache_config.kv_offloading_backend

        # If no KVTransferConfig is provided, create a default one.
        if self.kv_transfer_config is None:
            self.kv_transfer_config = KVTransferConfig()
        num_kv_ranks = (
            self.parallel_config.tensor_parallel_size
            * self.parallel_config.pipeline_parallel_size
        )

        if kv_offloading_backend == "native":
            self.kv_transfer_config.kv_connector = "OffloadingConnector"
            self.kv_transfer_config.kv_connector_extra_config.update(
                {"cpu_bytes_to_use": kv_offloading_size * (1 << 30)}
            )
        elif kv_offloading_backend == "lmcache":
            self.kv_transfer_config.kv_connector = "LMCacheConnectorV1"
            kv_gb_per_rank = kv_offloading_size / num_kv_ranks
            self.kv_transfer_config.kv_connector_extra_config = {
                "lmcache.local_cpu": True,
                "lmcache.max_local_cpu_size": kv_gb_per_rank,
            }

        # This is the same for all backends
        self.kv_transfer_config.kv_role = "kv_both"

vLLM V1 CPU Offloading与RECOMPUTE数据结构与组件关系

核心组件关系图

Scheduler

-waiting: RequestQueue

-running: list[Request]

-kv_cache_manager: KVCacheManager

-connector: KVConnectorBase_V1

+schedule() : SchedulerOutput

+_preempt_request(request, timestamp)

CPUOffloadingSpec

-num_blocks: int

-eviction_policy: str

-_manager: OffloadingManager

-_handlers: CpuGpuOffloadingHandlers

+get_manager() : OffloadingManager

+get_handlers() : Iterator

«abstract»

OffloadingManager

+lookup(block_hashes) : int

+prepare_store(block_hashes) : PrepareStoreOutput

+prepare_load(block_hashes) : LoadStoreSpec

+complete_store(block_hashes)

+complete_load(block_hashes)

ARCOffloadingManager

-t1: OrderedDict

-t2: OrderedDict

-b1: OrderedDict

-b2: OrderedDict

+lookup(block_hashes) : int

+prepare_store(block_hashes) : PrepareStoreOutput

CpuGpuOffloadingHandlers

-gpu_to_cpu_handler: OffloadingHandler

-cpu_to_gpu_handler: OffloadingHandler

+start_kv_transfers(metadata)

+prepare_store_kv(metadata)

+get_finished(finished_req_ids)

KVCacheManager

-block_pool: BlockPool

+allocate_slots(request, num_tokens) : KVCacheBlocks

+free(request)

+get_computed_blocks(request) : KVCacheBlocks

Request

-request_id: str

-status: RequestStatus

-num_computed_tokens: int

-block_hashes: list[BlockHash]

OffloadingConnector

-connector_scheduler: OffloadingConnectorScheduler

-connector_worker: OffloadingConnectorWorker

+get_num_new_matched_tokens() : tuple

+update_state_after_alloc()

+build_connector_meta() : KVConnectorMetadata

主要数据结构

1. Scheduler队列管理 1

self.waiting = create_request_queue(self.policy)  # 等待队列
self.running: list[Request] = []                  # 运行队列
self.requests: dict[str, Request] = {}            # 所有请求字典

2. CPUOffloadingSpec配置 2

class CPUOffloadingSpec(OffloadingSpec):
    def __init__(self, vllm_config, kv_cache_config):
        self.num_blocks = int(cpu_bytes_to_use) // kv_bytes_per_offloaded_block
        self.eviction_policy = self.extra_config.get("eviction_policy", "lru")
        self._manager: OffloadingManager | None = None
        self._handlers: CpuGpuOffloadingHandlers | None = None

3. Request状态管理 3

class RequestStatus(Enum):
    WAITING = "waiting"
    RUNNING = "running"
    PREEMPTED = "preempted"
    FINISHED_STOPPED = "finished_stopped"

完整工作流程

Request CpuGpuOffloadingHandlers OffloadingManager OffloadingConnector KVCacheManager Scheduler Request CpuGpuOffloadingHandlers OffloadingManager OffloadingConnector KVCacheManager Scheduler 1. 请求调度阶段 alt [KV空间充足] [KV空间不足且启用Offloading] [KV空间不足且未启用Offloading] 2. 模型执行阶段 3. 输出更新阶段 alt [检测到无效块] 4. Offloading管理 alt [CPU空间不足- ] allocate_slots(request, num_tokens) new_blocks status = RUNNING get_num_new_matched_tokens() lookup(block_hashes) hit_count (num_tokens, async=True) status = WAITING_FOR_REMOTE_KVS _preempt_request(low_priority_req) free(preempted_req) status = PREEMPTED, num_computed_tokens=0 build_connector_meta() start_kv_transfers(metadata) GPU->>CPU异步传输 update_from_output() _handle_invalid_blocks() num_computed_tokens = truncated_value prepare_store(block_hashes) complete_store(block_hashes) ARC/LRU淘汰策略

关键交互点

1. 抢占触发点 4

while True:
    new_blocks = self.kv_cache_manager.allocate_slots(request, num_new_tokens)
    if new_blocks is not None:
        break
    # 触发抢占或offloading
    self._preempt_request(preempted_req, scheduled_timestamp)

2. Offloading配置集成 5

if kv_offloading_backend == "native":
    self.kv_transfer_config.kv_connector = "OffloadingConnector"
    self.kv_transfer_config.kv_connector_extra_config.update(
        {"cpu_bytes_to_use": kv_offloading_size * (1 << 30)}
    )

3. 无效块处理 6

def _handle_invalid_blocks(self, invalid_block_ids: set[int]) -> set[str]:
    # 处理异步KV加载
    async_failed_req_ids = self._update_requests_with_invalid_blocks(...)
    # 处理同步加载
    sync_failed_req_ids = self._update_requests_with_invalid_blocks(...)
    # 根据策略决定是否驱逐块
    if sync_blocks_to_evict and not self.recompute_kv_load_failures:
        self.kv_cache_manager.evict_blocks(sync_blocks_to_evict)

组件协作机制

  1. Scheduler作为中央协调器,管理请求队列和KV缓存分配
  2. KVCacheManager负责物理块的管理和分配
  3. OffloadingConnector作为桥梁,连接Scheduler和CPU offloading系统
  4. OffloadingManager实现具体的缓存策略(LRU/ARC)
  5. CpuGpuOffloadingHandlers执行实际的GPU-CPU数据传输

这种设计实现了分层解耦,使得RECOMPUTE和CPU offloading可以协同工作,提供了灵活的内存管理策略。

Notes

  • CPU offloading是可选功能,需要通过kv_offloading_size参数启用
  • RECOMPUTE是默认的内存压力处理机制
  • 两种机制可以同时工作,优先使用offloading,失败时回退到recompute
  • 前缀缓存与offloading协同工作,提供零开销的KV缓存共享

Wiki pages you might want to explore:

Citations

File: vllm/v1/core/sched/scheduler.py (L161-163)

        # Priority queues for requests.
        self.waiting = create_request_queue(self.policy)
        self.running: list[Request] = []

File: vllm/v1/core/sched/scheduler.py (L434-480)

            # Schedule newly needed KV blocks for the request.
            with record_function_or_nullcontext("schedule: allocate_slots"):
                while True:
                    new_blocks = self.kv_cache_manager.allocate_slots(
                        request,
                        num_new_tokens,
                        num_lookahead_tokens=self.num_lookahead_tokens,
                    )

                    if new_blocks is not None:
                        # The request can be scheduled.
                        break

                    # The request cannot be scheduled.
                    # Preempt the lowest-priority request.
                    if self.policy == SchedulingPolicy.PRIORITY:
                        preempted_req = max(
                            self.running,
                            key=lambda r: (r.priority, r.arrival_time),
                        )
                        self.running.remove(preempted_req)
                        if preempted_req in scheduled_running_reqs:
                            preempted_req_id = preempted_req.request_id
                            scheduled_running_reqs.remove(preempted_req)
                            token_budget += num_scheduled_tokens.pop(preempted_req_id)
                            req_to_new_blocks.pop(preempted_req_id)
                            scheduled_spec_decode_tokens.pop(preempted_req_id, None)
                            preempted_encoder_inputs = scheduled_encoder_inputs.pop(
                                preempted_req_id, None
                            )
                            if preempted_encoder_inputs:
                                # Restore encoder compute budget if the preempted
                                # request had encoder inputs scheduled in this step.
                                num_embeds_to_restore = sum(
                                    preempted_req.get_num_encoder_embeds(i)
                                    for i in preempted_encoder_inputs
                                )
                                encoder_compute_budget += num_embeds_to_restore
                            req_index -= 1
                    else:
                        preempted_req = self.running.pop()

                    self._preempt_request(preempted_req, scheduled_timestamp)
                    preempted_reqs.append(preempted_req)
                    if preempted_req == request:
                        # No more request to preempt. Cannot schedule this request.
                        break

File: vllm/v1/core/sched/scheduler.py (L2147-2211)

    def _handle_invalid_blocks(self, invalid_block_ids: set[int]) -> set[str]:
        """
        Handle requests affected by invalid KV cache blocks.

        Returns:
            Set of affected request IDs to skip in update_from_output main loop.
        """
        should_fail = not self.recompute_kv_load_failures

        # handle async KV loads (not cached yet, evict_blocks=False)
        async_load_reqs = (
            req
            for req in self.waiting
            if req.status == RequestStatus.WAITING_FOR_REMOTE_KVS
        )
        async_failed_req_ids, num_failed_tokens, _ = (
            self._update_requests_with_invalid_blocks(
                async_load_reqs, invalid_block_ids, evict_blocks=False
            )
        )

        total_failed_requests = len(async_failed_req_ids)
        total_failed_tokens = num_failed_tokens

        # handle sync loads (may be cached, collect blocks for eviction)
        sync_failed_req_ids, num_failed_tokens, sync_blocks_to_evict = (
            self._update_requests_with_invalid_blocks(
                self.running, invalid_block_ids, evict_blocks=True
            )
        )

        total_failed_requests += len(sync_failed_req_ids)
        total_failed_tokens += num_failed_tokens

        if not total_failed_requests:
            return set()

        # evict invalid blocks and downstream dependent blocks from cache
        # only when not using recompute policy (where blocks will be recomputed
        # and reused by other requests sharing them)
        if sync_blocks_to_evict and not self.recompute_kv_load_failures:
            self.kv_cache_manager.evict_blocks(sync_blocks_to_evict)

        if should_fail:
            all_failed_req_ids = async_failed_req_ids | sync_failed_req_ids
            logger.error(
                "Failing %d request(s) due to KV load failure "
                "(failure_policy=fail, %d tokens affected). Request IDs: %s",
                total_failed_requests,
                total_failed_tokens,
                all_failed_req_ids,
            )
            return all_failed_req_ids

        logger.warning(
            "Recovered from KV load failure: "
            "%d request(s) rescheduled (%d tokens affected).",
            total_failed_requests,
            total_failed_tokens,
        )

        # Mark async requests with KV load failures for retry once loading completes
        self.failed_recving_kv_req_ids |= async_failed_req_ids
        # Return sync affected IDs to skip in update_from_output
        return sync_failed_req_ids

File: vllm/v1/kv_offload/cpu.py (L21-110)

class CPUOffloadingSpec(OffloadingSpec):
    def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig):
        super().__init__(vllm_config, kv_cache_config)

        cpu_bytes_to_use = self.extra_config.get("cpu_bytes_to_use")
        if not cpu_bytes_to_use:
            raise Exception(
                "cpu_bytes_to_use must be specified in kv_connector_extra_config"
            )

        # calculate kv_bytes_per_offloaded_block
        assert kv_cache_config is not None
        page_sizes = {
            kv_cache_group.kv_cache_spec.page_size_bytes
            for kv_cache_group in kv_cache_config.kv_cache_groups
        }
        assert len(page_sizes) == 1
        page_size_bytes = page_sizes.pop()
        kv_bytes_per_block = (
            page_size_bytes
            * len(kv_cache_config.kv_cache_tensors)
            * vllm_config.parallel_config.world_size
        )
        kv_bytes_per_offloaded_block = kv_bytes_per_block * (
            self.offloaded_block_size // self.gpu_block_size
        )

        self.num_blocks = (
            int(cpu_bytes_to_use) // kv_bytes_per_offloaded_block
            if kv_bytes_per_offloaded_block > 0
            else 0
        )

        # scheduler-side
        self._manager: OffloadingManager | None = None

        # worker-side
        self._handlers: CpuGpuOffloadingHandlers | None = None

        self.eviction_policy: str = self.extra_config.get("eviction_policy", "lru")

    def get_manager(self) -> OffloadingManager:
        if not self._manager:
            kv_events_config = self.vllm_config.kv_events_config
            enable_events = (
                kv_events_config is not None and kv_events_config.enable_kv_cache_events
            )

            backend = CPUBackend(
                block_size=self.offloaded_block_size, num_blocks=self.num_blocks
            )

            if self.eviction_policy == "lru":
                self._manager = LRUOffloadingManager(
                    backend=backend, enable_events=enable_events
                )
            elif self.eviction_policy == "arc":
                self._manager = ARCOffloadingManager(
                    backend=backend, enable_events=enable_events
                )
            else:
                raise ValueError(
                    f"Unknown eviction policy: {self.eviction_policy}. "
                    f"Supported policies: lru, arc"
                )
        return self._manager

    def get_handlers(
        self,
        kv_caches: dict[str, torch.Tensor],
        attn_backends: dict[str, type[AttentionBackend]],
    ) -> Iterator[tuple[type[LoadStoreSpec], type[LoadStoreSpec], OffloadingHandler]]:
        if not self._handlers:
            if not current_platform.is_cuda_alike():
                raise Exception(
                    "CPU Offloading is currently only supported on CUDA-alike GPUs"
                )

            self._handlers = CpuGpuOffloadingHandlers(
                attn_backends=attn_backends,
                gpu_block_size=self.gpu_block_size,
                cpu_block_size=self.offloaded_block_size,
                num_cpu_blocks=self.num_blocks,
                gpu_caches=kv_caches,
            )

        assert self._handlers is not None
        yield GPULoadStoreSpec, CPULoadStoreSpec, self._handlers.gpu_to_cpu_handler
        yield CPULoadStoreSpec, GPULoadStoreSpec, self._handlers.cpu_to_gpu_handler

GPU显存不足时的处理策略

是的,如果配置了kv_offloading_size,系统会先尝试将KV块转移到CPU。当CPU offloading也失败或空间不足时,才会回退到RECOMPUTE抢占模式。


详细机制分析

1. KV Offloading的优先级

当配置了kv_offloading_size时,系统会自动创建OffloadingConnector 1

if kv_offloading_backend == "native":
    self.kv_transfer_config.kv_connector = "OffloadingConnector"
    self.kv_transfer_config.kv_connector_extra_config.update(
        {"cpu_bytes_to_use": kv_offloading_size * (1 << 30)}
    )

2. 调度器的处理流程

在调度器的schedule()方法中,当KV空间不足时的处理逻辑 2

while True:
    new_blocks = self.kv_cache_manager.allocate_slots(
        request, num_new_tokens, num_lookahead_tokens=self.num_lookahead_tokens
    )
    
    if new_blocks is not None:
        break  # 分配成功
    
    # 分配失败,触发抢占
    self._preempt_request(preempted_req, scheduled_timestamp)

3. 抢占与Offloading的关系

关键点:KV offloading和RECOMPUTE是互补机制,不是互斥的:

  1. Offloading优先:如果配置了kv_offloading_size,系统会通过OffloadingConnector主动将KV块转移到CPU
  2. RECOMPUTE兜底:当CPU空间不足或offloading失败时,系统会回退到RECOMPUTE模式 3

4. 实际工作流程

GPU内存不足

是否配置kv_offloading_size?

直接使用RECOMPUTE抢占

尝试KV Offloading

CPU空间充足?

成功转移到CPU

回退到RECOMPUTE抢占

释放低优先级请求KV块

继续正常调度

请求重新计算

5. 配置示例

# 启用KV Offloading (优先使用)
kv_offloading_size = 4.0  # 4GB CPU缓冲区
kv_offloading_backend = "native"

# 系统行为:
# 1. GPU内存不足时,先尝试offload到CPU
# 2. CPU也不足时,才使用RECOMPUTE抢占运行中请求

总结

  • 有KV offloading配置:先尝试CPU转移,失败后才抢占运行中请求
  • 无KV offloading配置:直接使用RECOMPUTE抢占运行中请求
  • RECOMPUTE始终可用:作为默认的兜底机制,确保系统稳定性

这种设计提供了最大的灵活性,充分利用CPU内存扩展GPU容量,同时保证在极端情况下仍能正常运行。

Notes

  • KV offloading需要显式配置,不是默认行为 4
  • RECOMPUTE是V1的默认抢占模式,无需额外配置
  • 两种机制可以协同工作,提供最优的内存管理策略

Wiki pages you might want to explore:

Citations

File: vllm/config/vllm.py (L616-650)

    def _post_init_kv_transfer_config(self) -> None:
        """Update KVTransferConfig based on top-level configs in VllmConfig.

        Right now, this function reads the offloading settings from
        CacheConfig and configures the KVTransferConfig accordingly.
        """
        # KV offloading is only activated when kv_offloading_size is set.
        if (kv_offloading_size := self.cache_config.kv_offloading_size) is None:
            return

        kv_offloading_backend = self.cache_config.kv_offloading_backend

        # If no KVTransferConfig is provided, create a default one.
        if self.kv_transfer_config is None:
            self.kv_transfer_config = KVTransferConfig()
        num_kv_ranks = (
            self.parallel_config.tensor_parallel_size
            * self.parallel_config.pipeline_parallel_size
        )

        if kv_offloading_backend == "native":
            self.kv_transfer_config.kv_connector = "OffloadingConnector"
            self.kv_transfer_config.kv_connector_extra_config.update(
                {"cpu_bytes_to_use": kv_offloading_size * (1 << 30)}
            )
        elif kv_offloading_backend == "lmcache":
            self.kv_transfer_config.kv_connector = "LMCacheConnectorV1"
            kv_gb_per_rank = kv_offloading_size / num_kv_ranks
            self.kv_transfer_config.kv_connector_extra_config = {
                "lmcache.local_cpu": True,
                "lmcache.max_local_cpu_size": kv_gb_per_rank,
            }

        # This is the same for all backends
        self.kv_transfer_config.kv_role = "kv_both"

File: vllm/v1/core/sched/scheduler.py (L434-480)

            # Schedule newly needed KV blocks for the request.
            with record_function_or_nullcontext("schedule: allocate_slots"):
                while True:
                    new_blocks = self.kv_cache_manager.allocate_slots(
                        request,
                        num_new_tokens,
                        num_lookahead_tokens=self.num_lookahead_tokens,
                    )

                    if new_blocks is not None:
                        # The request can be scheduled.
                        break

                    # The request cannot be scheduled.
                    # Preempt the lowest-priority request.
                    if self.policy == SchedulingPolicy.PRIORITY:
                        preempted_req = max(
                            self.running,
                            key=lambda r: (r.priority, r.arrival_time),
                        )
                        self.running.remove(preempted_req)
                        if preempted_req in scheduled_running_reqs:
                            preempted_req_id = preempted_req.request_id
                            scheduled_running_reqs.remove(preempted_req)
                            token_budget += num_scheduled_tokens.pop(preempted_req_id)
                            req_to_new_blocks.pop(preempted_req_id)
                            scheduled_spec_decode_tokens.pop(preempted_req_id, None)
                            preempted_encoder_inputs = scheduled_encoder_inputs.pop(
                                preempted_req_id, None
                            )
                            if preempted_encoder_inputs:
                                # Restore encoder compute budget if the preempted
                                # request had encoder inputs scheduled in this step.
                                num_embeds_to_restore = sum(
                                    preempted_req.get_num_encoder_embeds(i)
                                    for i in preempted_encoder_inputs
                                )
                                encoder_compute_budget += num_embeds_to_restore
                            req_index -= 1
                    else:
                        preempted_req = self.running.pop()

                    self._preempt_request(preempted_req, scheduled_timestamp)
                    preempted_reqs.append(preempted_req)
                    if preempted_req == request:
                        # No more request to preempt. Cannot schedule this request.
                        break

File: vllm/v1/core/sched/scheduler.py (L912-933)

    def _preempt_request(self, request: Request, timestamp: float) -> None:
        """Preempt a request and put it back to the waiting queue.

        NOTE: The request should be popped from the running queue outside of this
        method.
        """
        assert request.status == RequestStatus.RUNNING, (
            "Only running requests can be preempted"
        )
        self.kv_cache_manager.free(request)
        self.encoder_cache_manager.free(request)
        request.status = RequestStatus.PREEMPTED
        request.num_computed_tokens = 0
        if request.spec_token_ids:
            request.spec_token_ids = []
        request.num_preemptions += 1
        if self.log_stats:
            request.record_event(EngineCoreEventType.PREEMPTED, timestamp)

        # Put the request back to the waiting queue.
        self.waiting.prepend_request(request)

File: vllm/config/cache.py (L169-173)

    kv_offloading_size: float | None = None
    """Size of the KV cache offloading buffer in GiB. When TP > 1, this is
    the total buffer size summed across all TP ranks. By default, this is set
    to None, which means no KV offloading is enabled. When set, vLLM will
    enable KV cache offloading to CPU using the kv_offloading_backend."""

我注意到代码逻辑很清晰:当内存不足时,系统会直接触发抢占机制,而不是尝试将
KV 块转移到
CPU。抢占策略取决于调度策略(PRIORITY 或
FCFS),并且会不断尝试抢占最低优先级的请求,直到无法再调度为止。

关键点是抢占过程会持续,直到没有更多可抢占的请求或成功分配内存。这表明系统
优先保证当前请求的调度,而不是预先进行 KV
offloading。

我注意到 KV Offloading
是在调度完成后异步触发的,不直接解决当前内存不足问题。这意味着系统设计上更
倾向于动态内存管理,而不是预先释放内存。

用户可能期望通过 KV Offloading
来缓解内存压力,但实际上系统采取了更直接的抢占策略。这种设计优先保证系统稳
定性,而不是复杂的内存预管理机制。

关键在于理解系统内存调度的即时性和抢占策略的本质。

vLLM 抢占机制与 KV Offload 关系深度分析

你的观察非常准确。让我从代码层面系统性地梳理这两套机制的边界与协作关系。


一、核心架构定位

┌─────────────────────────────────────────────────────────────────────┐
│                     两套机制的职责边界                               │
│                                                                     │
│  抢占机制 (Preemption)          KV Offload                         │
│  ─────────────────────          ──────────                         │
│  调度器内部逻辑                  KV Connector外部组件               │
│  同步、即时                      异步、后台                         │
│  块的所有权转移                  块数据的物理搬移                    │
│  解决"谁能用GPU块"              解决"块数据去哪里"                  │
│                                                                     │
│  触发点: schedule()内部          触发点: pre/post_forward()         │
│  决策者: Scheduler               决策者: KVConnector + Manager      │
└─────────────────────────────────────────────────────────────────────┘

二、调度器核心逻辑还原

# vllm/v1/core/sched/scheduler.py

def schedule(self) -> SchedulerOutput:
    
    # ─────────────────────────────────────────
    # Phase 1: 处理已运行的请求(续跑/完成)
    # ─────────────────────────────────────────
    scheduler_output = self._schedule_running_requests()
    
    # ─────────────────────────────────────────  
    # Phase 2: 调度等待队列中的新请求
    # ─────────────────────────────────────────
    self._schedule_waiting_requests(scheduler_output)
    
    return scheduler_output


def _schedule_running_requests(self):
    """
    核心抢占逻辑在这里
    """
    preempted = []
    
    # 按优先级排序的运行中请求
    for request in self.running:
        
        # 尝试为请求分配新块(decode阶段需要新块)
        while not self._can_allocate_for_request(request):
            
            # ★ 内存不足 → 触发抢占 ★
            if not self.running:
                # 没有可抢占的对象,当前请求也无法运行
                break
            
            # 选择抢占目标(策略决定)
            victim = self._select_preemption_victim()
            
            # 执行抢占
            self._preempt(victim)
            preempted.append(victim)
        
        if self._can_allocate_for_request(request):
            self._allocate_for_request(request)
        else:
            # 当前请求也被迫等待
            preempted.append(request)
    
    return preempted

三、抢占选择策略

def _select_preemption_victim(self) -> Request:
    """
    抢占目标选择:取决于调度策略
    """
    if self.scheduler_config.policy == "priority":
        # PRIORITY: 抢占优先级最低的请求
        # 优先级相同则抢占最晚到达的
        return min(
            self.running,
            key=lambda r: (r.priority, -r.arrival_time)
        )
    
    elif self.scheduler_config.policy == "fcfs":
        # FCFS: 抢占最晚加入running队列的请求
        # 即:后来的让先来的
        return self.running[-1]  # running按到达时间排序


def _preempt(self, request: Request):
    """
    执行抢占:释放请求占用的所有GPU块
    """
    # 1. 从running移除
    self.running.remove(request)
    
    # 2. 释放GPU KV块
    self.kv_cache_manager.free(request)
    #    └── BlockAllocator.free_blocks()
    #        └── 块回到 free_list,可立即被其他请求使用
    
    # 3. 请求状态重置
    request.num_computed_tokens = 0
    #    └── ★ 关键:丢失所有计算进度 ★
    #        重新调度时需要完整重新计算(Recompute)
    
    # 4. 回到等待队列(按优先级/FCFS重新排序)
    self.waiting.add(request)
    
    # 5. 记录抢占(通知Worker清理状态)
    self.preempted_req_ids.add(request.request_id)

四、抢占后的 Recompute 流程

抢占发生后的完整生命周期:

Step T:   request_A 被抢占
          ├── GPU块被释放 → free_list
          ├── num_computed_tokens = 0
          └── 移回 waiting 队列

          ↓ 其他请求使用了这些GPU块

Step T+N: request_A 重新被调度 (等待队列→运行队列)
          │
          ├── 检查前缀缓存命中
          │   ├── 命中: 跳过已缓存部分,只算剩余
          │   └── 未命中: 从头计算全部prompt tokens
          │
          ├── 分配新的GPU KV块
          │   └── kv_cache_manager.allocate(request_A)
          │
          └── 构建 NewRequestData(当作新请求处理)
              ├── prompt_token_ids: 全部prompt
              ├── num_computed_tokens: 前缀命中数(或0)
              └── block_ids: 新分配的块

          ↓ 模型执行

Step T+N+1: 重新计算 Prefill
            ├── 从 num_computed_tokens 位置开始
            ├── 重新填充 KV Cache
            └── 生成第一个新 token(TTFT重新开始计时)

五、KV Offload 与抢占的真实关系

┌──────────────────────────────────────────────────────────────────────┐
│                  两套机制的触发时机对比                               │
│                                                                      │
│  调度循环 (schedule)           执行循环 (step)                       │
│  ─────────────────────         ─────────────────                    │
│  scheduler.schedule()          executor.execute_model()             │
│       │                              │                              │
│       ├── 内存不足?                   ├── pre_forward()             │
│       │   └── YES → 抢占             │   └── start_load_kv()       │
│       │           (同步)             │       └── CPU→GPU 异步传输   │
│       │                             │                              │
│       └── 生成 SchedulerOutput       ├── 模型前向计算               │
│                                     │                              │
│                                     └── post_forward()             │
│                                         └── wait_for_save()        │
│                                             └── GPU→CPU 异步传输   │
│                                                                      │
│  ★ 关键: 抢占在schedule()内完成,KV Offload在execute之后处理 ★       │
└──────────────────────────────────────────────────────────────────────┘

六、KV Offload 实际解决的场景

# KV Offload 不是用来避免抢占的
# 而是用于以下特定场景:

场景1: Disaggregated Prefill (分离式预填充)
──────────────────────────────────────────
  Prefill节点                    Decode节点
      │                              │
      ├── 计算 KV Cache              │
      ├── post_forward()             │
      │   └── GPU→CPU offload        │
      │       └── CPU→网络→CPU       │
      │                         ├── pre_forward()
      │                         │   └── CPU→GPU load
      │                         └── Decode推理

场景2: 长序列KV缓存持久化
──────────────────────────
  请求完成后 KV 不释放
  offload 到 CPU/磁盘
  相同前缀的新请求直接 load
  跳过 Prefill 计算(真正的KV复用)

场景3: CPU Offload 扩展KV容量
──────────────────────────────
  GPU: 活跃请求的 KV
  CPU: 暂时不活跃请求的 KV
  ★ 但调度器并不感知这个过程 ★
  调度器看到的仍然是"块已分配"
  实际数据在CPU,由Connector负责搬回

七、两套机制的根本差异

┌─────────────────────────────────────────────────────────────────────┐
│              抢占 vs KV Offload 根本差异                             │
├─────────────────┬───────────────────────┬───────────────────────────┤
│  维度           │  抢占 (Preemption)    │  KV Offload               │
├─────────────────┼───────────────────────┼───────────────────────────┤
│  触发条件       │  GPU块不足            │  主动/被动转移数据         │
│  执行时机       │  schedule()内,同步   │  forward前后,异步         │
│  对请求的影响   │  计算进度清零         │  透明,请求无感知          │
│  块的归属       │  释放,归还free_list  │  块仍属于请求              │
│  数据是否保留   │  ★ 不保留 (丢弃)     │  ★ 保留 (搬移)            │
│  恢复代价       │  全量重计算Prefill    │  一次DMA传输               │
│  实现位置       │  Scheduler内部        │  KVConnector组件           │
│  调度器是否感知 │  完全感知,主动决策   │  不感知具体位置            │
└─────────────────┴───────────────────────┴───────────────────────────┘

八、为什么不在抢占时做 Offload

直觉上: 抢占时把KV存到CPU,恢复时再Load回来
        这样不就避免了重计算的代价吗?

vLLM当前设计选择不这样做,原因:

1. 复杂性
   ────────
   调度器需要追踪"块在CPU还是GPU"
   BlockAllocator需要支持两级状态
   当前设计: 块要么free要么allocated(GPU)
   扩展到三态会大幅增加复杂度

2. DMA延迟不可预测
   ─────────────────
   GPU→CPU的DMA传输时间取决于数据量
   可能比重计算更慢(特别是短序列)
   重计算延迟是可预测的

3. 前缀缓存已经部分解决问题
   ─────────────────────────
   如果开启prefix caching:
   被抢占请求的公共前缀部分不会重算
   只有新生成的output tokens部分需要重算
   实际重计算代价远小于全量

4. 架构分层清晰
   ─────────────
   抢占 = 调度策略 (Scheduler职责)
   Offload = 数据传输 (Connector职责)
   混合会破坏职责边界

结论: 当前vLLM的CPU Offload是为跨节点KV传输
      和容量扩展设计的,不是为了替代抢占机制
      抢占+Recompute 是GPU内存不足时的兜底策略

九、完整决策流程图

schedule() 被调用
      │
      ▼
running队列中的请求需要新块?
      │
      ├── NO → 正常调度,继续
      │
      └── YES → GPU块是否足够?
                    │
                    ├── YES → 分配块,继续
                    │
                    └── NO → 有可抢占的请求?
                                  │
                                  ├── YES → 选择victim
                                  │         ├── PRIORITY: 最低优先级
                                  │         └── FCFS: 最新加入的
                                  │         │
                                  │         ├── 释放victim的GPU块
                                  │         ├── victim.num_computed_tokens=0
                                  │         ├── victim回到waiting队列
                                  │         └── 循环重试分配
                                  │
                                  └── NO → 当前请求也无法运行
                                            └── 等待下一个调度周期
                                                (yield,让已运行的请求完成释放块)
Logo

昇腾计算产业是基于昇腾系列(HUAWEI Ascend)处理器和基础软件构建的全栈 AI计算基础设施、行业应用及服务,https://devpress.csdn.net/organization/setting/general/146749包括昇腾系列处理器、系列硬件、CANN、AI计算框架、应用使能、开发工具链、管理运维工具、行业应用及服务等全产业链

更多推荐