Internal Load Balancing

Internal load balancing is a vLLM data parallel deployment mode in which the system automatically distributes requests across multiple data parallel (DP) engines and exposes a single API endpoint to clients. 1

How It Works

In internal load balancing mode, vLLM implements load balancing through the following components:

  1. DP Coordinator process: collects statistics from each DP engine and publishes them to the front-end processes for load-balancing decisions 2

  2. Statistics collection: each DP engine tracks its running and waiting request counts and publishes them to the coordinator over ZMQ sockets 3

  3. Load-balancing decisions: requests are distributed intelligently based on each engine's running and waiting queue state (KV-cache-aware logic is a possible future refinement) 4

Configuration

Internal load balancing is enabled with configuration like the following:

```shell
vllm serve $MODEL --data-parallel-size 4 --tensor-parallel-size 2
```

The system decides automatically whether to use internal load balancing: 5

```python
internal_dp_balancing = (
    self.has_coordinator
    and not vllm_config.parallel_config.data_parallel_external_lb
)
```
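The flag is a simple conjunction, but it encodes the mode table: only when a coordinator exists and external LB is not requested are queue stats published. A tiny standalone sketch of that logic (the function name is ours, not vLLM's, and the internal/hybrid labeling follows the source comment's hint that those modes leave `data_parallel_external_lb` unset):

```python
def publishes_dp_lb_stats(has_coordinator: bool, external_lb: bool) -> bool:
    # Mirrors: internal_dp_balancing = has_coordinator and not external_lb
    return has_coordinator and not external_lb

# Internal/hybrid LB: coordinator present, no external LB -> stats published
print(publishes_dp_lb_stats(True, False))   # True
# External LB mode: front-ends don't need the stats
print(publishes_dp_lb_stats(True, True))    # False
# No coordinator (e.g. DP=1): nothing to publish to
print(publishes_dp_lb_stats(False, False))  # False
```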

When To Use It

Internal load balancing is a good fit for:

  • Single-node or multi-node data parallel deployments
  • Scenarios that call for a simplified deployment architecture
  • Small to medium scale parallel deployments

For large-scale deployments, where the API server process can become a bottleneck, consider scaling out with `--api-server-count` or switching to external load balancing mode. 6

Notes

Internal load balancing, external load balancing, and hybrid load balancing are the three data parallel load-balancing modes vLLM provides. The internal mode offers the simplest deployment experience, while the external mode better suits complex load-balancing needs in large-scale production environments.


Citations

File: docs/serving/data_parallel_deployment.md (L17-18)

In all cases, it is beneficial to load-balance requests between DP ranks. For online deployments, this balancing can be optimized by taking into account the state of each DP engine - in particular its currently scheduled and waiting (queued) requests, and KV cache state. Each DP engine has an independent KV cache, and the benefit of prefix caching can be maximized by directing prompts intelligently.

File: docs/serving/data_parallel_deployment.md (L23-27)

## Internal Load Balancing

vLLM supports "self-contained" data parallel deployments that expose a single API endpoint.

It can be configured by simply including e.g. `--data-parallel-size=4` in the vllm serve command line arguments. This will require 4 GPUs. It can be combined with tensor parallel, for example `--data-parallel-size=4 --tensor-parallel-size=2`, which would require 8 GPUs. When sizing DP deployments, remember that `--max-num-seqs` applies per DP rank.

File: docs/serving/data_parallel_deployment.md (L75-78)

Currently, the internal DP load balancing is done within the API server process(es) and is based on the running and waiting queues in each of the engines. This could be made more sophisticated in future by incorporating KV cache aware logic.

When deploying large DP sizes using this method, the API server process can become a bottleneck. In this case, the orthogonal `--api-server-count` command line option can be used to scale this out (for example `--api-server-count=4`). This is transparent to users - a single HTTP endpoint / port is still exposed. Note that this API server scale-out is "internal" and still confined to the "head" node.

File: vllm/v1/engine/coordinator.py (L22-31)

class DPCoordinator:
    """Coordinator process used for data-parallel deployments (DP>1).

    Intermediates between multiple DP engine rank processes and one or more
    front-end API server processes.

    * Collects stats from each DP engine (currently just waiting and running
      queue lengths), and publishes these to all front-ends for use in
      load-balancing decisions.

File: vllm/v1/engine/core.py (L753-759)

            internal_dp_balancing = (
                self.has_coordinator
                and not vllm_config.parallel_config.data_parallel_external_lb
            )
            # Only publish request queue stats to coordinator for "internal"
            # and "hybrid" LB modes.
            self.publish_dp_lb_stats = internal_dp_balancing

"Expose a single API endpoint" - what is a single API endpoint, what is an API endpoint in general, and how is it implemented in the code?

What "Single API Endpoint" Means

A single API endpoint means that in internal load balancing mode, even though there may be multiple data parallel engines and multiple API server processes underneath, vLLM exposes only one unified HTTP address and port (e.g. http://localhost:8000) to clients; a client connects to that one endpoint to use the entire service. 1

What an API Endpoint Is

An API endpoint is the network address an HTTP server listens on, typically an IP address plus a port. In vLLM, the API endpoint serves an OpenAI-compatible REST API, including:

  • /v1/completions - text completion
  • /v1/chat/completions - chat completion
  • /v1/models - model listing
  • /health - health check 2
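For illustration, a client needs only that one base URL no matter how many DP engines serve it. A minimal sketch (host, port, and model name are placeholder assumptions; actually sending the request requires a running server):

```python
import json

BASE_URL = "http://localhost:8000"  # the single endpoint (assumed)

def completion_request(prompt: str, model: str = "my-model") -> tuple[str, bytes]:
    """Build a /v1/completions request URL and body for the single endpoint."""
    url = f"{BASE_URL}/v1/completions"
    body = json.dumps({"model": model, "prompt": prompt, "max_tokens": 16})
    return url, body.encode()

url, body = completion_request("Hello")
# The client never addresses an individual DP engine:
print(url)  # http://localhost:8000/v1/completions
```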

Code Implementation

1. Server startup and listening

The API server is created with FastAPI and listens on the configured port:

```python
# vllm/entrypoints/openai/api_server.py
app = FastAPI()
# On startup the server listens on the configured port (default 8000)
```

2. Managing multiple API server processes

When --api-server-count > 1 is used, multiple API server processes are started, but they share the same listening port:

```python
# vllm/entrypoints/cli/serve.py:218-287
def run_multi_api_server(args: argparse.Namespace):
    num_api_servers: int = args.api_server_count

    # Set up the shared listen address and socket
    listen_address, sock = setup_server(args)

    # Start the API server process manager
    api_server_manager = APIServerProcessManager(
        target_server_fn=run_api_server_worker_proc,
        listen_address=listen_address,
        sock=sock,  # the same socket is shared
        num_servers=num_api_servers,
        # ...
    )
```

3. Load-balancing implementation

Multiple API server processes accept requests through the shared socket, with the operating system kernel spreading incoming connections across them. Each API server process connects to all data parallel engines:

```python
# vllm/v1/engine/utils.py:810-820
addresses = EngineZmqAddresses(
    inputs=[
        get_engine_client_zmq_addr(client_local_only, host)
        for _ in range(num_api_servers)  # one input address per API server
    ],
    outputs=[
        get_engine_client_zmq_addr(client_local_only, host)
        for _ in range(num_api_servers)  # one output address per API server
    ],
)
```

4. Process architecture

A typical internal load balancing deployment comprises the following processes: 3

| Process Type | Count | Role |
|---|---|---|
| API Server | `A` (default = DP size) | Handles HTTP requests, exposes the single endpoint |
| Engine Core | `DP` | Scheduling and KV cache management |
| GPU Worker | `N` (= `DP × TP`) | Executes model forward passes |
| DP Coordinator | 1 (if DP > 1) | Load balancing across DP ranks |
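As a worked example of the table above: for the earlier `--data-parallel-size 4 --tensor-parallel-size 2` deployment with the default API server count, the process totals come out as:

```python
# Illustrative process-count arithmetic for DP=4, TP=2
DP, TP = 4, 2
A = DP                      # default API server count equals the DP size
N = DP * TP                 # one GPU worker per GPU -> 8 GPUs
coordinator = 1 if DP > 1 else 0

total = A + DP + N + coordinator
print(A, DP, N, coordinator, total)  # 4 4 8 1 17
```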

Notes

The advantage of a single API endpoint is simplified client configuration: clients need no knowledge of the backend architecture. Even with multiple API server processes and data parallel engines inside, a client still connects to just one address. The design is transparent to users and provides a simple deployment experience. 4


Citations

File: docs/serving/data_parallel_deployment.md (L23-27)

## Internal Load Balancing

vLLM supports "self-contained" data parallel deployments that expose a single API endpoint.

It can be configured by simply including e.g. `--data-parallel-size=4` in the vllm serve command line arguments. This will require 4 GPUs. It can be combined with tensor parallel, for example `--data-parallel-size=4 --tensor-parallel-size=2`, which would require 8 GPUs. When sizing DP deployments, remember that `--max-num-seqs` applies per DP rank.

File: docs/serving/data_parallel_deployment.md (L75-78)

Currently, the internal DP load balancing is done within the API server process(es) and is based on the running and waiting queues in each of the engines. This could be made more sophisticated in future by incorporating KV cache aware logic.

When deploying large DP sizes using this method, the API server process can become a bottleneck. In this case, the orthogonal `--api-server-count` command line option can be used to scale this out (for example `--api-server-count=4`). This is transparent to users - a single HTTP endpoint / port is still exposed. Note that this API server scale-out is "internal" and still confined to the "head" node.

File: docs/serving/openai_compatible_server.md (L43-60)

## Supported APIs

We currently support the following OpenAI APIs:

- [Completions API](#completions-api) (`/v1/completions`)
    - Only applicable to [text generation models](../models/generative_models.md).
    - *Note: `suffix` parameter is not supported.*
- [Responses API](#responses-api) (`/v1/responses`)
    - Only applicable to [text generation models](../models/generative_models.md).
- [Chat Completions API](#chat-api) (`/v1/chat/completions`)
    - Only applicable to [text generation models](../models/generative_models.md) with a [chat template](../serving/openai_compatible_server.md#chat-template).
    - *Note: `user` parameter is ignored.*
    - *Note:* Setting the `parallel_tool_calls` parameter to `false` ensures vLLM only returns zero or one tool call per request. Setting it to `true` (the default) allows returning more than one tool call per request. There is no guarantee more than one tool call will be returned if this is set to `true`, as that behavior is model dependent and not all models are designed to support parallel tool calls.
- [Embeddings API](#embeddings-api) (`/v1/embeddings`)
    - Only applicable to [embedding models](../models/pooling_models.md).
- [Transcriptions API](#transcriptions-api) (`/v1/audio/transcriptions`)
    - Only applicable to [Automatic Speech Recognition (ASR) models](../models/supported_models.md#transcription).
- [Translation API](#translations-api) (`/v1/audio/translations`)

File: docs/design/arch_overview.md (L117-127)

### Process Count Summary

For a deployment with `N` GPUs, `TP` tensor parallel size, `DP` data parallel size, and `A` API server count:

| Process Type | Count | Notes |
|---|---|---|
| API Server | `A` (default `DP`) | Handles HTTP requests and input processing |
| Engine Core | `DP` (default 1) | Scheduler and KV cache management |
| GPU Worker | `N` (= `DP x TP`) | One per GPU, executes model forward passes |
| DP Coordinator | 1 if `DP > 1`, else 0 | Load balancing across DP ranks |
| **Total** | **`A + DP + N` (+ 1 if DP > 1)** | |

The following is a detailed walkthrough of the vLLM data parallel deployment architecture diagram, which shows a deployment spanning two nodes with internal load balancing.

Core Components

The diagram contains several key components, distributed across two physical nodes (Node 0 and Node 1):

  • Launcher: the launcher process, responsible for starting the vLLM service on each node.
  • API Server (AsyncLLM): the front-end process that receives external HTTP requests and communicates with the backend Engine Cores. Multiple API Server instances are shown, indicating the API layer itself can scale horizontally.
  • Engine Core: the backend compute core that performs the actual model inference. Each Engine Core is an independent data parallel (DP) unit.
  • DP Coordinator: a standalone data parallel coordinator process that synchronizes state and coordinates work across all Engine Cores and API Servers.
  • Bind to shared socket: an operating-system-level mechanism that lets multiple API Server processes listen on the same network port.

Workflow and Interactions

Reading the diagram along its data-flow and control-flow paths:

1. Startup (Launcher)

  • On Node 0 and Node 1, a Launcher process (the user's vllm serve ... command) is started on each node.
  • The Launcher on Node 0 (the head node) starts:
    • the DP Coordinator process,
    • multiple API Server processes,
    • the Engine Core processes on Node 0 (two in this diagram).
  • The Launcher on Node 1 starts:
    • the Engine Core processes on Node 1 (also two in this diagram). It runs in --headless mode, meaning no API Server is started.
  • Dashed arrows from the Launcher to the other components denote this "create"/"start" relationship.

2. External HTTP request handling

  • External HTTP requests arrive at Node 0.
  • The "bind to shared socket" mechanism lets multiple API Server processes accept connections on the same port (e.g. 8000), either via the Linux kernel's SO_REUSEPORT feature or, as in vLLM's run_multi_api_server, by creating one listening socket and handing it to every API Server process.
  • The kernel then distributes incoming HTTP connections across the API Server processes, providing a first layer of load balancing.
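The shared-port idea can be demonstrated in a few lines. A sketch (POSIX-specific; it uses SO_REUSEPORT for brevity, whereas vLLM actually creates one listening socket and passes it to each API Server process):

```python
import socket

def make_listener(port: int) -> socket.socket:
    # Each "process" in this sketch gets its own socket bound to the
    # same port; the kernel spreads incoming connections among them.
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    s.bind(("127.0.0.1", port))
    s.listen()
    return s

a = make_listener(0)             # let the OS pick a free port
port = a.getsockname()[1]
b = make_listener(port)          # second listener binds the SAME port
shared = a.getsockname()[1] == b.getsockname()[1]
print(shared)  # True
a.close()
b.close()
```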

3. API Server ↔ Engine Core interaction (the main data flow)

  • When an API Server receives an HTTP request, its internal AsyncLLM instance must send the request to some Engine Core for processing.
  • Load-balancing decision: before sending, the API Server decides which Engine Core the request goes to, based on information obtained from the DP Coordinator (see point 4).
  • Communication: API Servers and Engine Cores communicate over ZMQ sockets; the solid black arrows in the diagram represent these links.
  • Fully connected topology: every API Server can communicate directly with all Engine Cores, on both Node 0 and Node 1. This fully connected topology is the basis for dynamic load balancing: a request accepted by an API Server on Node 0 can be sent to an Engine Core on Node 1 for processing.

4. The DP Coordinator's role (the main control flow)

The DP Coordinator is the "brain" of the data parallel architecture. It has two responsibilities, shown as dashed arrows in the diagram:

  • A. Publishing request counts for load balancing (publish request counts for LB)

    • Each Engine Core periodically publishes its load state to the DP Coordinator, chiefly its running and waiting request counts.
    • The DP Coordinator collects and aggregates the state of all Engine Cores.
    • It then publishes this global load view to every API Server subscribed to it.
    • With these fresh load counts, an API Server can make informed load-balancing decisions, for example sending a new request to the Engine Core with the lowest current load (shortest waiting queue).
  • B. Coordinating request waves (co-ordinate starting request waves)

    • This mainly serves MoE models: to keep all DP ranks synchronized, Engine Cores with no requests must still execute "dummy" forward passes.
    • The DP Coordinator maintains a global running/paused state and a "request wave" number.
    • When every Engine Core is idle, the DP Coordinator marks the global state as paused.
    • When an API Server in the paused state receives a new request, it notifies the DP Coordinator.
    • The DP Coordinator then broadcasts a "start new wave" (START_DP_WAVE) command to all Engine Cores, waking them into a synchronized running state.
    • Engine Cores also notify the DP Coordinator in certain situations (for example on receiving a stale request) to handle race conditions.
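The pause/wave behavior in point B can be modeled as a toy state machine. This is only an illustration of the idea; the class, method names, and transitions here are ours, not vLLM's actual protocol:

```python
class ToyWaveCoordinator:
    """Toy model of the pause/wave idea: all engines idle -> paused;
    a new request while paused starts the next wave for every engine."""

    def __init__(self, num_engines: int):
        self.wave = 0
        self.running = False
        self.busy = [False] * num_engines

    def first_request(self) -> int:
        # An API server saw a request while paused: start a new wave.
        if not self.running:
            self.wave += 1
            self.running = True
            self.busy = [True] * len(self.busy)  # wake every engine
        return self.wave

    def engine_idle(self, rank: int) -> None:
        self.busy[rank] = False
        if not any(self.busy):
            self.running = False  # all idle -> paused

c = ToyWaveCoordinator(2)
c.first_request()                    # wave 1 starts, both engines wake
c.engine_idle(0)
c.engine_idle(1)                     # all idle -> paused again
print(c.first_request())  # 2 (a new wave starts)
```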

Summary

The architecture diagram depicts a highly scalable and robust vLLM deployment:

  • Horizontal scaling:
    • API layer: via the shared-socket binding and more API Server processes (--api-server-count).
    • Compute layer: via more Engine Cores (--data-parallel-size) and more nodes.
  • Decoupling: the front end (API Server) and back end (Engine Core) are fully decoupled, independent processes that can be deployed flexibly across the network.
  • Intelligent load balancing: the DP Coordinator acts as the central coordinator, collecting global state and guiding the API Servers' load balancing, which optimizes resource utilization and throughput.
  • Synchronization: for MoE models that require lockstep execution, the DP Coordinator's "request wave" mechanism ensures all DP ranks execute correctly.

In short, this is a classic controller-worker pattern: the API Servers and Engine Cores are the workers, the DP Coordinator is the controller, and ZMQ is the high-performance message bus that ties them together.

## Internal Load Balancing: Implementation Details

### Key Data Structures

#### 1. EngineState

Each engine's state is tracked as a pair of queue counts: 1

```python
class EngineState:
    def __init__(self):
        self.request_counts = [0, 0]  # [waiting, running]
```

#### 2. EngineZmqAddresses

Manages the ZMQ communication addresses, including the input, output, and coordinator addresses: 2

```python
addresses = EngineZmqAddresses(
    inputs=[
        get_engine_client_zmq_addr(client_local_only, host)
        for _ in range(num_api_servers)
    ],
    outputs=[
        get_engine_client_zmq_addr(client_local_only, host)
        for _ in range(num_api_servers)
    ],
)
```

#### 3. Load-balancing statistics

Each engine maintains its waiting and running queue lengths, which feed the load-balancing decision: 3

```python
waiting, running = current_counts[idx]
score = waiting * 4 + running
```

### Process Architecture

| Process Type | Count | Role |
|---------|------|------|
| API Server | `A` (default = DP size) | Handles HTTP requests, load-balancing decisions |
| Engine Core | `DP` | Scheduling and KV cache management |
| GPU Worker | `N` (= `DP × TP`) | Executes model forward passes |
| DP Coordinator | 1 (if DP > 1) | Collects statistics, coordinates request waves | 4

### End-to-End Flow

#### 1. Initialization

```mermaid
sequenceDiagram
    participant Main
    participant DPCoordinator
    participant EngineCore
    participant APIServer

    Main->>DPCoordinator: start coordinator process
    Main->>EngineCore: start DP engine cores
    Main->>APIServer: start A API servers
    DPCoordinator->>APIServer: publish stats address
    APIServer->>EngineCore: establish ZMQ connections
```
#### 2. Statistics collection

Each engine core decides whether to publish its queue state based on the LB mode: 5

```python
internal_dp_balancing = (
    self.has_coordinator
    and not vllm_config.parallel_config.data_parallel_external_lb
)
self.publish_dp_lb_stats = internal_dp_balancing
```

#### 3. Load-balancing decision

The API server uses `DPLBAsyncMPClient` to perform the load balancing: 6

```python
def get_core_engine_for_request(self, request: EngineCoreRequest) -> EngineIdentity:
    if (eng_index := request.data_parallel_rank) is None:
        current_counts = self.lb_engines
        num_engines = len(current_counts)
        min_score = sys.maxsize
        eng_index = 0
        for i in range(num_engines):
            idx = (self.eng_start_index + i) % num_engines
            waiting, running = current_counts[idx]
            score = waiting * 4 + running
            if score < min_score:
                min_score = score
                eng_index = idx
```
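The selection loop above can be exercised in isolation. A self-contained sketch with made-up counts (the real method additionally bumps the chosen engine's local waiting count afterwards):

```python
import sys

def pick_engine(lb_engines: list[list[int]], start_index: int = 0) -> int:
    """Return the index with the lowest waiting*4 + running score,
    scanning from start_index as get_core_engine_for_request does."""
    min_score, best = sys.maxsize, 0
    n = len(lb_engines)
    for i in range(n):
        idx = (start_index + i) % n
        waiting, running = lb_engines[idx]
        score = waiting * 4 + running  # queued requests weigh 4x
        if score < min_score:
            min_score, best = score, idx
    return best

# [waiting, running] per engine; engine 2 has the lowest score (0*4 + 3 = 3)
counts = [[2, 1], [0, 9], [0, 3], [1, 0]]
print(pick_engine(counts))  # 2
```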

#### 4. Request processing flow

```mermaid
sequenceDiagram
    participant Client
    participant APIServer
    participant DPCoordinator
    participant EngineCore
    participant GPUWorker

    Client->>APIServer: HTTP request
    APIServer->>APIServer: pick the best engine
    APIServer->>EngineCore: send request
    EngineCore->>DPCoordinator: update queue stats
    EngineCore->>GPUWorker: execute model
    GPUWorker-->>EngineCore: return results
    EngineCore-->>APIServer: return output
    APIServer-->>Client: HTTP response
```

### Key Implementation Details

#### 1. Coordinator startup

Wave coordination is enabled only for MoE models: 7

```python
coordinator = DPCoordinator(
    parallel_config,
    enable_wave_coordination=vllm_config.model_config.is_moe,
)
```

#### 2. Request wave coordination

For MoE models, the coordinator also synchronizes request waves (from its docstring): 8

```python
* Keeps track of the current DP "request wave" number and running state
  of the engines. This is received from the DP rank 0 engine and published
  to the front-end processes along with the current load stats.
```

#### 3. Statistics publishing

The coordinator collects and publishes engine statistics (from its docstring): 9

```python
* Collects stats from each DP engine (currently just waiting and running
  queue lengths), and publishes these to all front-ends for use in
  load-balancing decisions.
```

## Notes

At its core, internal load balancing has the DP Coordinator collect each engine's queue state, while the API servers use those statistics to make informed request-distribution decisions. This design yields a single API endpoint that is transparent to users while fully utilizing the capacity of multiple data parallel engines. 10


### Citations

File: vllm/v1/engine/coordinator.py (L28-30)

    * Collects stats from each DP engine (currently just waiting and running
      queue lengths), and publishes these to all front-ends for use in
      load-balancing decisions.

File: vllm/v1/engine/coordinator.py (L32-34)

    * Keeps track of the current DP "request wave" number and running state
      of the engines. This is received from the DP rank 0 engine and published
      to the front-end processes along with the current load stats.

File: vllm/v1/engine/coordinator.py (L108-110)

class EngineState:
    def __init__(self):
        self.request_counts = [0, 0]  # [waiting, running]

File: vllm/v1/engine/utils.py (L810-820)

    # Set up input and output addresses.
    addresses = EngineZmqAddresses(
        inputs=[
            get_engine_client_zmq_addr(client_local_only, host)
            for _ in range(num_api_servers)
        ],
        outputs=[
            get_engine_client_zmq_addr(client_local_only, host)
            for _ in range(num_api_servers)
        ],
    )

File: vllm/v1/engine/utils.py (L830-834)

    if run_coordinator:
        coordinator = DPCoordinator(
            parallel_config,
            enable_wave_coordination=vllm_config.model_config.is_moe,
        )

File: vllm/v1/engine/core_client.py (L1242-1262)

    def get_core_engine_for_request(self, request: EngineCoreRequest) -> EngineIdentity:
        # Engines are in rank order.
        if (eng_index := request.data_parallel_rank) is None:
            current_counts = self.lb_engines
            # TODO use P2C alg for larger DP sizes
            num_engines = len(current_counts)
            min_score = sys.maxsize
            eng_index = 0
            for i in range(num_engines):
                # Start from client_index to help with balancing when engines
                # are empty.
                idx = (self.eng_start_index + i) % num_engines
                waiting, running = current_counts[idx]
                score = waiting * 4 + running
                if score < min_score:
                    min_score = score
                    eng_index = idx
            # Increment local waiting count for better balancing between stats
            # updates from the coordinator (which happen every 100ms).
            current_counts[eng_index][0] += self.client_count

File: docs/design/arch_overview.md (L117-127)

### Process Count Summary

For a deployment with `N` GPUs, `TP` tensor parallel size, `DP` data parallel size, and `A` API server count:

| Process Type | Count | Notes |
|---|---|---|
| API Server | `A` (default `DP`) | Handles HTTP requests and input processing |
| Engine Core | `DP` (default 1) | Scheduler and KV cache management |
| GPU Worker | `N` (= `DP x TP`) | One per GPU, executes model forward passes |
| DP Coordinator | 1 if `DP > 1`, else 0 | Load balancing across DP ranks |
| **Total** | **`A + DP + N` (+ 1 if DP > 1)** | |

File: vllm/v1/engine/core.py (L753-759)

            internal_dp_balancing = (
                self.has_coordinator
                and not vllm_config.parallel_config.data_parallel_external_lb
            )
            # Only publish request queue stats to coordinator for "internal"
            # and "hybrid" LB modes.
            self.publish_dp_lb_stats = internal_dp_balancing

File: docs/serving/data_parallel_deployment.md (L75-78)

Currently, the internal DP load balancing is done within the API server process(es) and is based on the running and waiting queues in each of the engines. This could be made more sophisticated in future by incorporating KV cache aware logic.

When deploying large DP sizes using this method, the API server process can become a bottleneck. In this case, the orthogonal `--api-server-count` command line option can be used to scale this out (for example `--api-server-count=4`). This is transparent to users - a single HTTP endpoint / port is still exposed. Note that this API server scale-out is "internal" and still confined to the "head" node.




Frontend Process (API Server) Overview

The **frontend process (API Server)** is the vLLM process that handles HTTP requests: it receives client requests, performs input processing (e.g. tokenization), makes load-balancing decisions, and forwards requests to the backend engine core processes. 1

Tracing the Request Flow Through the Code

1. API Server startup

The API server is started via the run_multi_api_server function: 2

```python
def run_multi_api_server(args: argparse.Namespace):
    # Set up the shared listen address and socket
    listen_address, sock = setup_server(args)

    # Start the API server process manager
    api_server_manager = APIServerProcessManager(
        target_server_fn=run_api_server_worker_proc,
        listen_address=listen_address,
        sock=sock,  # the same socket is shared
        num_servers=num_api_servers,
        # ...
    )
```

2. Request receiving and processing

Once an HTTP request arrives, the API Server processes it with AsyncLLM: 3

3. Load-balancing decision

The API Server uses DPLBAsyncMPClient to select the best engine: 4

```python
def get_core_engine_for_request(self, request: EngineCoreRequest) -> EngineIdentity:
    # If the request does not pin a DP rank, load-balance it
    if (eng_index := request.data_parallel_rank) is None:
        current_counts = self.lb_engines  # per-engine load statistics
        num_engines = len(current_counts)
        min_score = sys.maxsize
        eng_index = 0

        # Scan all engines and compute a load score for each
        for i in range(num_engines):
            idx = (self.eng_start_index + i) % num_engines
            waiting, running = current_counts[idx]
            score = waiting * 4 + running  # waiting requests weigh more

            if score < min_score:
                min_score = score
                eng_index = idx

        # Bump the local waiting count for better balancing
        current_counts[eng_index][0] += self.client_count

    chosen_engine = self.core_engines[eng_index]
    return chosen_engine
```
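One subtlety in the code above: coordinator stats arrive only periodically (the source comments mention roughly every 100ms), so between updates the client bumps its local waiting count for the engine it just picked. A toy simulation with made-up numbers of why that helps:

```python
def pick(counts):
    # Index of the minimum score, score = waiting*4 + running
    return min(range(len(counts)), key=lambda i: counts[i][0] * 4 + counts[i][1])

counts = [[0, 2], [3, 1]]  # stale snapshot: engine 0 looks least loaded

# Without local bumping, every request between stats updates piles onto engine 0:
naive = [pick(counts) for _ in range(4)]

# With bumping (client_count assumed to be 1), picks spread out as the
# local view of engine 0's waiting queue grows:
bumped = []
for _ in range(4):
    i = pick(counts)
    bumped.append(i)
    counts[i][0] += 1  # emulates current_counts[eng_index][0] += self.client_count

print(naive, bumped)  # [0, 0, 0, 0] [0, 0, 0, 1]
```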

4. Request forwarding

After an engine is selected, the request is sent to it via add_request_async: 5

```python
async def add_request_async(self, request: EngineCoreRequest) -> None:
    self._ensure_stats_update_task()

    request.current_wave = self.current_wave
    request.client_index = self.client_index

    # Select the best engine
    chosen_engine = self.get_core_engine_for_request(request)

    # Send the request to the chosen engine
    to_await = self._send_input(EngineCoreRequestType.ADD, request, chosen_engine)

    if not self.engines_running:
        # Notify the coordinator that a new request has arrived
        req_msg = msgspec.msgpack.encode(("FIRST_REQ", chosen_engine))
        await self.first_req_send_socket.send(req_msg)

    await to_await
```

5. Statistics updates

The API Server periodically receives per-engine statistics from the DP Coordinator: 6

```python
async def run_engine_stats_update_task():
    # Subscribe to the coordinator's stats publications
    await socket.send(b"\x01")  # subscription message

    while True:
        # Receive a stats update
        buf = socket.recv(flags=zmq.NOBLOCK).result()
        counts, wave, running = msgspec.msgpack.decode(buf)

        self.current_wave = wave
        self.engines_running = running

        if counts is not None:
            # Update the local load-balancing state
            sliced_counts = counts[count_slice]
            self.lb_engines = sliced_counts
```

Full Flow Diagram

```mermaid
sequenceDiagram
    participant Client
    participant APIServer
    participant DPLBAsyncMPClient
    participant DPCoordinator
    participant EngineCore

    Client->>APIServer: HTTP request (/v1/completions)
    APIServer->>DPLBAsyncMPClient: add_request_async()
    DPLBAsyncMPClient->>DPLBAsyncMPClient: get_core_engine_for_request() (pick best engine from lb_engines)
    DPLBAsyncMPClient->>EngineCore: _send_input() via ZMQ
    DPLBAsyncMPClient->>DPCoordinator: FIRST_REQ notification (if needed)
    EngineCore->>DPCoordinator: update queue stats
    DPCoordinator-->>DPLBAsyncMPClient: publish stats
    EngineCore-->>APIServer: processed output
    APIServer-->>Client: HTTP response
```

Key Data Structure Relationships

  1. EngineState: stores each engine's [waiting, running] counts 7
  2. lb_engines: the per-engine load statistics the API Server maintains locally
  3. EngineZmqAddresses: manages the ZMQ communication addresses 8

Notes

As the frontend process, the API Server implements intelligent internal load balancing through DPLBAsyncMPClient. It periodically pulls each engine's live load from the DP Coordinator, computes a load score from the waiting and running queue lengths, and routes each request to the best engine. The whole process is transparent to users, providing efficient load distribution behind a single API endpoint. 9


Citations

File: docs/design/arch_overview.md (L85-91)

### API Server Process

The API server process handles HTTP requests (e.g., the OpenAI-compatible API), performs input processing (tokenization, multi-modal data loading), and streams results back to clients. It communicates with the engine core process(es) via ZMQ sockets.

By default, there is **1 API server process**, but when data parallelism is used, the API server count automatically scales to match the data parallel size. This can also be manually configured with the `--api-server-count` flag. Each API server connects to **all** engine cores via ZMQ in a many-to-many topology, enabling any API server to route requests to any engine core. Each API server process uses multiple CPU threads for media loading (controlled by `VLLM_MEDIA_LOADING_THREAD_COUNT`, default 8).

The code can be found in [vllm/entrypoints/openai/api_server.py](../../vllm/entrypoints/openai/api_server.py) and [vllm/v1/utils.py](../../vllm/v1/utils.py).

File: vllm/entrypoints/cli/serve.py (L218-288)

def run_multi_api_server(args: argparse.Namespace):
    assert not args.headless
    num_api_servers: int = args.api_server_count
    assert num_api_servers > 0

    if num_api_servers > 1:
        setup_multiprocess_prometheus()

    listen_address, sock = setup_server(args)

    engine_args = vllm.AsyncEngineArgs.from_cli_args(args)
    engine_args._api_process_count = num_api_servers
    engine_args._api_process_rank = -1

    usage_context = UsageContext.OPENAI_API_SERVER
    vllm_config = engine_args.create_engine_config(usage_context=usage_context)

    if num_api_servers > 1 and envs.VLLM_ALLOW_RUNTIME_LORA_UPDATING:
        raise ValueError(
            "VLLM_ALLOW_RUNTIME_LORA_UPDATING cannot be used with api_server_count > 1"
        )

    executor_class = Executor.get_class(vllm_config)
    log_stats = not engine_args.disable_log_stats

    parallel_config = vllm_config.parallel_config
    dp_rank = parallel_config.data_parallel_rank
    assert parallel_config.local_engines_only or dp_rank == 0

    api_server_manager: APIServerProcessManager | None = None

    with launch_core_engines(
        vllm_config, executor_class, log_stats, num_api_servers
    ) as (local_engine_manager, coordinator, addresses):
        # Construct common args for the APIServerProcessManager up-front.
        api_server_manager_kwargs = dict(
            target_server_fn=run_api_server_worker_proc,
            listen_address=listen_address,
            sock=sock,
            args=args,
            num_servers=num_api_servers,
            input_addresses=addresses.inputs,
            output_addresses=addresses.outputs,
            stats_update_address=coordinator.get_stats_publish_address()
            if coordinator
            else None,
        )

        # For dp ranks > 0 in external/hybrid DP LB modes, we must delay the
        # start of the API servers until the local engine is started
        # (after the launcher context manager exits),
        # since we get the front-end stats update address from the coordinator
        # via the handshake with the local engine.
        if dp_rank == 0 or not parallel_config.local_engines_only:
            # Start API servers using the manager.
            api_server_manager = APIServerProcessManager(**api_server_manager_kwargs)

    # Start API servers now if they weren't already started.
    if api_server_manager is None:
        api_server_manager_kwargs["stats_update_address"] = (
            addresses.frontend_stats_publish_address
        )
        api_server_manager = APIServerProcessManager(**api_server_manager_kwargs)

    # Wait for API servers
    wait_for_completion_or_failure(
        api_server_manager=api_server_manager,
        engine_manager=local_engine_manager,
        coordinator=coordinator,
    )

File: vllm/v1/engine/async_llm.py (L1-54)

# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
import asyncio
import os
import socket
import time
import warnings
from collections.abc import AsyncGenerator, Iterable, Mapping
from copy import copy
from typing import Any

import torch

import vllm.envs as envs
from vllm import TokensPrompt
from vllm.config import VllmConfig
from vllm.distributed.weight_transfer.base import (
    WeightTransferInitRequest,
    WeightTransferUpdateRequest,
)
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.protocol import EngineClient, StreamingInput
from vllm.inputs import ProcessorInputs, PromptType
from vllm.logger import init_logger
from vllm.lora.request import LoRARequest
from vllm.multimodal import MULTIMODAL_REGISTRY, MultiModalRegistry
from vllm.outputs import STREAM_FINISHED, PoolingRequestOutput, RequestOutput
from vllm.plugins.io_processors import get_io_processor
from vllm.pooling_params import PoolingParams
from vllm.renderers import renderer_from_config
from vllm.renderers.inputs.preprocess import extract_prompt_components
from vllm.sampling_params import RequestOutputKind, SamplingParams
from vllm.tasks import SupportedTask
from vllm.tokenizers import TokenizerLike
from vllm.tracing import init_tracer
from vllm.transformers_utils.config import maybe_register_config_serialize_by_value
from vllm.usage.usage_lib import UsageContext
from vllm.utils.async_utils import cancel_task_threadsafe
from vllm.utils.collection_utils import as_list
from vllm.v1.engine import EngineCoreRequest, PauseMode
from vllm.v1.engine.core_client import EngineCoreClient
from vllm.v1.engine.exceptions import EngineDeadError, EngineGenerateError
from vllm.v1.engine.input_processor import InputProcessor
from vllm.v1.engine.output_processor import OutputProcessor, RequestOutputCollector
from vllm.v1.engine.parallel_sampling import ParentRequest
from vllm.v1.executor import Executor
from vllm.v1.metrics.loggers import (
    StatLoggerFactory,
    StatLoggerManager,
    load_stat_logger_plugin_factories,
)
from vllm.v1.metrics.prometheus import shutdown_prometheus
from vllm.v1.metrics.stats import IterationStats

File: vllm/v1/engine/core_client.py (L1093-1186)

    def _ensure_stats_update_task(self):
        resources = self.resources
        if resources.stats_update_task is not None:
            return

        assert self.stats_update_address is not None
        stats_addr: str = self.stats_update_address
        assert len(self.engine_ranks_managed) > 0
        # NOTE: running and waiting counts are all global from
        # the Coordinator include all global EngineCores. This
        # slice includes just the cores managed by this client.
        count_slice = slice(
            self.engine_ranks_managed[0], self.engine_ranks_managed[-1] + 1
        )

        async def run_engine_stats_update_task():
            with (
                make_zmq_socket(self.ctx, stats_addr, zmq.XSUB, linger=0) as socket,
                make_zmq_socket(
                    self.ctx, self.first_req_sock_addr, zmq.PAIR, bind=False, linger=0
                ) as first_req_rcv_socket,
            ):
                assert isinstance(socket, zmq.asyncio.Socket)
                assert isinstance(first_req_rcv_socket, zmq.asyncio.Socket)
                self.resources.stats_update_socket = socket
                self.resources.first_req_rcv_socket = first_req_rcv_socket
                # Send subscription message.
                await socket.send(b"\x01")

                poller = zmq.asyncio.Poller()
                poller.register(socket, zmq.POLLIN)
                poller.register(first_req_rcv_socket, zmq.POLLIN)

                while True:
                    events = await poller.poll()
                    if (
                        not self.engines_running
                        and len(events) == 2
                        or (events[0][0] == first_req_rcv_socket)
                    ):
                        # Check if this is a regular request notification or
                        # scale up notification
                        buf = first_req_rcv_socket.recv(flags=zmq.NOBLOCK).result()

                        decoded = msgspec.msgpack.decode(buf)
                        if (
                            isinstance(decoded, (list, tuple))
                            and len(decoded) == 2
                            and decoded[0] == "SCALE_ELASTIC_EP"
                        ):
                            # Extract new engine count from the decoded message
                            new_engine_count = decoded[1]
                            # Send scale up notification to coordinator
                            scale_msg = msgspec.msgpack.encode(
                                ("SCALE_ELASTIC_EP", new_engine_count)
                            )
                            await socket.send(scale_msg)
                            continue

                        # we're sending a request while the engines are
                        # paused, so that it can wake the others up
                        # (to run dummy EP loop).
                        assert decoded[0] == "FIRST_REQ"
                        target_eng_index = decoded[1]
                        self.engines_running = True
                        msg = msgspec.msgpack.encode(
                            (target_eng_index, self.current_wave)
                        )
                        await socket.send(msg)

                    buf = None
                    while True:
                        # Drain all stats events (we only care about latest).
                        future: asyncio.Future[bytes] = socket.recv(flags=zmq.NOBLOCK)
                        if isinstance(future.exception(), zmq.Again):
                            break
                        buf = future.result()
                    if buf is None:
                        continue

                    # Update local load-balancing state.
                    counts, wave, running = msgspec.msgpack.decode(buf)
                    self.current_wave = wave
                    self.engines_running = running
                    if counts is not None:
                        sliced_counts = counts[count_slice]
                        self.lb_engines = sliced_counts
                        logger.debug(
                            "Received counts: %s (%s)", sliced_counts, count_slice
                        )

        resources.stats_update_task = asyncio.create_task(
            run_engine_stats_update_task()
        )

File: vllm/v1/engine/core_client.py (L1188-1204)

    async def add_request_async(self, request: EngineCoreRequest) -> None:
        self._ensure_stats_update_task()

        request.current_wave = self.current_wave
        request.client_index = self.client_index

        chosen_engine = self.get_core_engine_for_request(request)
        to_await = self._send_input(EngineCoreRequestType.ADD, request, chosen_engine)
        if not self.engines_running:
            # Notify coordinator that we're sending a request
            req_msg = msgspec.msgpack.encode(("FIRST_REQ", chosen_engine))
            await self.first_req_send_socket.send(req_msg)

        await to_await

        self._ensure_output_queue_task()

File: vllm/v1/engine/core_client.py (L1242-1266)

    def get_core_engine_for_request(self, request: EngineCoreRequest) -> EngineIdentity:
        # Engines are in rank order.
        if (eng_index := request.data_parallel_rank) is None:
            current_counts = self.lb_engines
            # TODO use P2C alg for larger DP sizes
            num_engines = len(current_counts)
            min_score = sys.maxsize
            eng_index = 0
            for i in range(num_engines):
                # Start from client_index to help with balancing when engines
                # are empty.
                idx = (self.eng_start_index + i) % num_engines
                waiting, running = current_counts[idx]
                score = waiting * 4 + running
                if score < min_score:
                    min_score = score
                    eng_index = idx
            # Increment local waiting count for better balancing between stats
            # updates from the coordinator (which happen every 100ms).
            current_counts[eng_index][0] += self.client_count

        chosen_engine = self.core_engines[eng_index]
        # Record which engine is chosen for this request, to handle aborts.
        self.reqs_in_flight[request.request_id] = chosen_engine
        return chosen_engine

File: vllm/v1/engine/utils.py (L810-820)

    # Set up input and output addresses.
    addresses = EngineZmqAddresses(
        inputs=[
            get_engine_client_zmq_addr(client_local_only, host)
            for _ in range(num_api_servers)
        ],
        outputs=[
            get_engine_client_zmq_addr(client_local_only, host)
            for _ in range(num_api_servers)
        ],
    )

File: docs/serving/data_parallel_deployment.md (L75-78)

Currently, the internal DP load balancing is done within the API server process(es) and is based on the running and waiting queues in each of the engines. This could be made more sophisticated in future by incorporating KV cache aware logic.

When deploying large DP sizes using this method, the API server process can become a bottleneck. In this case, the orthogonal `--api-server-count` command line option can be used to scale this out (for example `--api-server-count=4`). This is transparent to users - a single HTTP endpoint / port is still exposed. Note that this API server scale-out is "internal" and still confined to the "head" node.

How Multiple API Servers Communicate

Shared Listening Socket for Incoming Requests

Multiple API server processes receive HTTP requests by sharing a single listening socket. The OS kernel distributes incoming connections across the processes, providing connection-level load balancing. 1

# Set up the shared listen address and socket
listen_address, sock = setup_server(args)
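The kernel-level connection distribution can be illustrated with a stdlib-only sketch. Threads stand in for the spawned API server processes, and `serve_worker`/`run_demo` are illustrative names, not vLLM code:

```python
import socket
import threading

def serve_worker(listen_sock: socket.socket, worker_id: int) -> None:
    # Each worker blocks in accept() on the SAME listening socket; the
    # kernel hands every incoming connection to exactly one waiting worker.
    conn, _ = listen_sock.accept()
    with conn:
        conn.sendall(f"worker-{worker_id}".encode())

def run_demo(num_workers: int = 2) -> list:
    listen_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listen_sock.bind(("127.0.0.1", 0))  # ephemeral port for the demo
    listen_sock.listen()
    port = listen_sock.getsockname()[1]

    workers = [
        threading.Thread(target=serve_worker, args=(listen_sock, i))
        for i in range(num_workers)
    ]
    for w in workers:
        w.start()

    # Each client connects to the single shared port; the kernel decides
    # which worker's accept() call receives the connection.
    replies = []
    for _ in range(num_workers):
        with socket.create_connection(("127.0.0.1", port)) as c:
            replies.append(c.recv(64).decode())

    for w in workers:
        w.join()
    listen_sock.close()
    return replies
```

In vLLM the socket is created once in the parent and handed to each spawned process, but the accept-on-a-shared-socket mechanism is the same.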

API Server Process Management

APIServerProcessManager starts and manages the API server worker processes: 2

class APIServerProcessManager:
    def __init__(self, target_server_fn, listen_address, sock, args, num_servers,
                 input_addresses, output_addresses, stats_update_address=None):
        # Launch one worker process per API server (abridged)
        spawn_context = multiprocessing.get_context("spawn")
        self.processes = []
        for i, in_addr, out_addr in zip(
            range(num_servers), input_addresses, output_addresses
        ):
            client_config = {
                "input_address": in_addr,
                "output_address": out_addr,
                "client_count": num_servers,
                "client_index": i,
            }
            proc = spawn_context.Process(
                target=target_server_fn,
                name=f"ApiServer_{i}",
                args=(listen_address, sock, args, client_config),
            )
            self.processes.append(proc)
            proc.start()

ZMQ Communication with the Engine Cores

Each API server establishes many-to-many connections to all engine core processes over ZMQ sockets: 3

# Set up input and output addresses for each API server
addresses = EngineZmqAddresses(
    inputs=[
        get_engine_client_zmq_addr(client_local_only, host)
        for _ in range(num_api_servers)
    ],
    outputs=[
        get_engine_client_zmq_addr(client_local_only, host)
        for _ in range(num_api_servers)
    ],
)

Subscribing to DP Coordinator Statistics

Each API server subscribes to the load statistics published by the DP coordinator: 4

async def run_engine_stats_update_task():
    # Subscribe to the coordinator's stats stream (abridged)
    await socket.send(b"\x01")  # XSUB subscription message

    while True:
        # Receive a stats update
        buf = await socket.recv()
        counts, wave, running = msgspec.msgpack.decode(buf)
        if counts is not None:
            # Update local load-balancing state
            sliced_counts = counts[count_slice]
            self.lb_engines = sliced_counts
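Using these counts, the engine-selection loop from `get_core_engine_for_request` can be sketched as a standalone function (`choose_engine` is an illustrative name; the scoring and the optimistic local bump mirror the cited source):

```python
import sys

def choose_engine(counts, start_index=0, client_count=1):
    """Pick the engine with the lowest (waiting * 4 + running) score.

    `counts` is a list of [waiting, running] pairs, one per DP engine,
    as published by the coordinator. Iteration starts at `start_index`
    so that multiple API servers don't all pick engine 0 when idle.
    """
    min_score = sys.maxsize
    chosen = 0
    for i in range(len(counts)):
        idx = (start_index + i) % len(counts)
        waiting, running = counts[idx]
        score = waiting * 4 + running  # queued requests weigh more
        if score < min_score:
            min_score = score
            chosen = idx
    # Optimistically bump the local waiting count so back-to-back requests
    # spread out between coordinator updates (published roughly every 100 ms).
    counts[chosen][0] += client_count
    return chosen
```

For example, with `counts = [[2, 1], [0, 5], [1, 0]]` the scores are 9, 5 and 4, so engine 2 is chosen and its waiting count is bumped locally.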

End-to-End Communication Flow

(Sequence diagram: Client → OS → APIServer1/APIServer2 → DPCoordinator → EngineCore1/EngineCore2.)

1. HTTP request received: the client sends a request to the shared port, and the kernel distributes the connection to one of the API server processes.
2. Load-balancing decision: the API servers subscribe to the DP coordinator, which publishes engine load statistics; based on the latest stats, the API server selects the least-loaded engine.
3. Request forwarding: the request is sent to the chosen engine core over ZMQ.
4. Processing and response: the engine core runs model inference and returns results to the API server over ZMQ.
5. HTTP response: the API server streams the response back to the client.

Key Communication Mechanisms

1. Socket sharing

  • Multiple API server processes share a single listening socket
  • The OS kernel distributes incoming connections automatically
  • Transparent to clients, which connect to a single port

2. ZMQ many-to-many topology

  • Each API server connects to all engine cores
  • Requests are sent using DEALER/PUSH sockets
  • Responses are received using ROUTER/PULL sockets

3. Publish-subscribe pattern

  • The DP coordinator acts as the publisher
  • The API servers act as subscribers
  • Engine load statistics are distributed in near real time
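On the subscriber side, only the newest statistics snapshot matters: the stats update task drains every pending message and keeps the last one. That drain-and-apply step can be sketched without ZMQ, using a plain deque in place of the non-blocking socket (`apply_latest_stats` is an illustrative name):

```python
from collections import deque

def apply_latest_stats(pending: deque, state: dict) -> dict:
    """Drain all queued coordinator messages and apply only the newest.

    Each message is a (counts, wave, running) tuple, like the
    msgpack-decoded payload in the stats update task. Stale snapshots
    are discarded: for load balancing only the latest counts matter.
    """
    latest = None
    while pending:  # non-blocking drain, like recv(flags=zmq.NOBLOCK)
        latest = pending.popleft()
    if latest is None:
        return state  # no update arrived; keep current state
    counts, wave, running = latest
    state["current_wave"] = wave
    state["engines_running"] = running
    if counts is not None:
        state["lb_engines"] = counts
    return state
```

If two snapshots arrive between polls, only the second one updates the local load-balancing state, matching the "we only care about latest" comment in the source.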

Notes

Multiple API servers achieve efficient parallel request handling through the shared listening socket and the ZMQ communication fabric. This design preserves the simplicity of a single API endpoint while making full use of multiple CPU cores for input processing and load-balancing decisions. 5

Citations

File: vllm/entrypoints/cli/serve.py (L226-227)

    listen_address, sock = setup_server(args)

File: vllm/v1/utils.py (L159-218)

class APIServerProcessManager:
    """Manages a group of API server processes.

    Handles creation, monitoring, and termination of API server worker
    processes. Also monitors extra processes to check if they are healthy.
    """

    def __init__(
        self,
        target_server_fn: Callable,
        listen_address: str,
        sock: Any,
        args: argparse.Namespace,
        num_servers: int,
        input_addresses: list[str],
        output_addresses: list[str],
        stats_update_address: str | None = None,
    ):
        """Initialize and start API server worker processes.

        Args:
            target_server_fn: Function to call for each API server process
            listen_address: Address to listen for client connections
            sock: Socket for client connections
            args: Command line arguments
            num_servers: Number of API server processes to start
            input_addresses: Input addresses for each API server
            output_addresses: Output addresses for each API server
            stats_update_address: Optional stats update address
        """
        self.listen_address = listen_address
        self.sock = sock
        self.args = args

        # Start API servers
        spawn_context = multiprocessing.get_context("spawn")
        self.processes: list[BaseProcess] = []

        for i, in_addr, out_addr in zip(
            range(num_servers), input_addresses, output_addresses
        ):
            client_config = {
                "input_address": in_addr,
                "output_address": out_addr,
                "client_count": num_servers,
                "client_index": i,
            }
            if stats_update_address is not None:
                client_config["stats_update_address"] = stats_update_address

            proc = spawn_context.Process(
                target=target_server_fn,
                name=f"ApiServer_{i}",
                args=(listen_address, sock, args, client_config),
            )
            self.processes.append(proc)
            proc.start()

        logger.info("Started %d API server processes", len(self.processes))

File: vllm/v1/engine/utils.py (L810-820)

    # Set up input and output addresses.
    addresses = EngineZmqAddresses(
        inputs=[
            get_engine_client_zmq_addr(client_local_only, host)
            for _ in range(num_api_servers)
        ],
        outputs=[
            get_engine_client_zmq_addr(client_local_only, host)
            for _ in range(num_api_servers)
        ],
    )

File: vllm/v1/engine/core_client.py (L1093-1186)

    def _ensure_stats_update_task(self):
        resources = self.resources
        if resources.stats_update_task is not None:
            return

        assert self.stats_update_address is not None
        stats_addr: str = self.stats_update_address
        assert len(self.engine_ranks_managed) > 0
        # NOTE: running and waiting counts are all global from
        # the Coordinator include all global EngineCores. This
        # slice includes just the cores managed by this client.
        count_slice = slice(
            self.engine_ranks_managed[0], self.engine_ranks_managed[-1] + 1
        )

        async def run_engine_stats_update_task():
            with (
                make_zmq_socket(self.ctx, stats_addr, zmq.XSUB, linger=0) as socket,
                make_zmq_socket(
                    self.ctx, self.first_req_sock_addr, zmq.PAIR, bind=False, linger=0
                ) as first_req_rcv_socket,
            ):
                assert isinstance(socket, zmq.asyncio.Socket)
                assert isinstance(first_req_rcv_socket, zmq.asyncio.Socket)
                self.resources.stats_update_socket = socket
                self.resources.first_req_rcv_socket = first_req_rcv_socket
                # Send subscription message.
                await socket.send(b"\x01")

                poller = zmq.asyncio.Poller()
                poller.register(socket, zmq.POLLIN)
                poller.register(first_req_rcv_socket, zmq.POLLIN)

                while True:
                    events = await poller.poll()
                    if (
                        not self.engines_running
                        and len(events) == 2
                        or (events[0][0] == first_req_rcv_socket)
                    ):
                        # Check if this is a regular request notification or
                        # scale up notification
                        buf = first_req_rcv_socket.recv(flags=zmq.NOBLOCK).result()

                        decoded = msgspec.msgpack.decode(buf)
                        if (
                            isinstance(decoded, (list, tuple))
                            and len(decoded) == 2
                            and decoded[0] == "SCALE_ELASTIC_EP"
                        ):
                            # Extract new engine count from the decoded message
                            new_engine_count = decoded[1]
                            # Send scale up notification to coordinator
                            scale_msg = msgspec.msgpack.encode(
                                ("SCALE_ELASTIC_EP", new_engine_count)
                            )
                            await socket.send(scale_msg)
                            continue

                        # we're sending a request while the engines are
                        # paused, so that it can wake the others up
                        # (to run dummy EP loop).
                        assert decoded[0] == "FIRST_REQ"
                        target_eng_index = decoded[1]
                        self.engines_running = True
                        msg = msgspec.msgpack.encode(
                            (target_eng_index, self.current_wave)
                        )
                        await socket.send(msg)

                    buf = None
                    while True:
                        # Drain all stats events (we only care about latest).
                        future: asyncio.Future[bytes] = socket.recv(flags=zmq.NOBLOCK)
                        if isinstance(future.exception(), zmq.Again):
                            break
                        buf = future.result()
                    if buf is None:
                        continue

                    # Update local load-balancing state.
                    counts, wave, running = msgspec.msgpack.decode(buf)
                    self.current_wave = wave
                    self.engines_running = running
                    if counts is not None:
                        sliced_counts = counts[count_slice]
                        self.lb_engines = sliced_counts
                        logger.debug(
                            "Received counts: %s (%s)", sliced_counts, count_slice
                        )

        resources.stats_update_task = asyncio.create_task(
            run_engine_stats_update_task()
        )

File: docs/design/arch_overview.md (L85-91)

### API Server Process

The API server process handles HTTP requests (e.g., the OpenAI-compatible API), performs input processing (tokenization, multi-modal data loading), and streams results back to clients. It communicates with the engine core process(es) via ZMQ sockets.

By default, there is **1 API server process**, but when data parallelism is used, the API server count automatically scales to match the data parallel size. This can also be manually configured with the `--api-server-count` flag. Each API server connects to **all** engine cores via ZMQ in a many-to-many topology, enabling any API server to route requests to any engine core. Each API server process uses multiple CPU threads for media loading (controlled by `VLLM_MEDIA_LOADING_THREAD_COUNT`, default 8).

The code can be found in [vllm/entrypoints/openai/api_server.py](../../vllm/entrypoints/openai/api_server.py) and [vllm/v1/utils.py](../../vllm/v1/utils.py).



