好的,我们来深入且完整地剖析 data-parallel-rpc-port 这个参数,并梳理它在代码中的完整流程。


https://docs.vllm.ai/en/latest/serving/data_parallel_deployment/#internal-load-balancing

在这里插入图片描述
您好!这两个问题都非常棒,直击这张架构图的核心。


1. 图中的 “Launcher” 是指什么?

图中的 “Launcher” 指的是用户在命令行中执行的那个初始进程,也就是 vllm serve ... 这个命令本身所启动的那个父进程

它之所以被称为“启动器”,是因为它的核心职责就是**“启动和协调”**整个 vLLM 服务的所有其他组件。

让我们看看它具体做了哪些“启动”工作(以 Node 0 为例):

  • 启动 DP Coordinator: Launcher 进程创建并启动了 DP Coordinator 这个独立的后台进程。
  • 启动 API Server: Launcher 进程创建并启动了所有 API Server 工作进程。在多 API Server 模式下,它会创建共享套接字,然后 fork 出多个子进程。
  • 启动 Engine Core: Launcher 进程创建并启动了在本地节点上运行的 Engine Core 进程。

图中的虚线箭头从 Launcher 指向这些组件,正是表示这种“创建”或“启动”的关系。

可以把 “Launcher” 理解为整个 vLLM 服务的“总司令”或“入口点进程”。 它负责解析配置、搭建好通信框架、把所有需要的“士兵”(API Server, Engine Core, DP Coordinator)都派到各自的岗位上,然后监控整个战场的运行。

在 Node 1 上,Launcher 的角色更简单,因为它是在 --headless 模式下运行的,所以它只需要负责启动 Node 1 本地的 Engine Core 进程即可。


2. 为什么 “API Server” 后面有 “AsyncLLM”?

这个标注是为了揭示 API Server 内部的核心逻辑和关键对象。API ServerAsyncLLM 是两个不同层面但紧密相关的概念:

  • API Server: 这是一个进程级别的概念。它指的是一个完整的、独立的 Web 服务器进程(通常由 uvicorn 运行)。它的职责是:

    • 监听网络端口,接收 HTTP 请求。
    • 解析 OpenAI 兼容的 API 请求(比如 /v1/chat/completions)。
    • 将 HTTP 响应返回给客户端。
  • AsyncLLM: 这是一个对象/类级别的概念。它是 vLLM 提供的一个核心异步引擎接口类。在每个 API Server 进程内部,都会有一个 AsyncLLM 的实例。这个实例是前端(Web 服务)和后端(模型推理)之间的桥梁。它的职责是:

    • 提供异步方法,如 generate()chat()
    • 将上层 API Server 传来的请求(如 prompt, sampling_params 等)打包成 vLLM 内部的 Request 对象。
    • 通过它内部的 EngineCoreClient(如 AsyncMPClient),使用 ZMQ 将请求发送给后端的 Engine Core
    • 异步地从 Engine Core 接收流式返回的 token,并将它们传递回 API Server

所以,“API Server | AsyncLLM” 这个标注的含义是:

“这是一个 API Server 进程,其内部的核心驱动逻辑是由一个 AsyncLLM 对象实例来完成的。”

API Server 负责“外壳”(处理 HTTP 协议),而 AsyncLLM 负责“内核”(与 vLLM 的分布式推理引擎进行实际的异步交互)。这个标注帮助我们理解,当一个 HTTP 请求进入 API Server 后,它最终会被转换成对 AsyncLLM 对象的调用,从而进入 vLLM 的推理流程。

简而言之:API Server 是房子的外观,AsyncLLM 是房子里的智能管家。

data-parallel-rpc-port 详解

简单来说,data-parallel-rpc-port 定义了在多节点数据并行部署中,所有计算节点(Worker Nodes)用来向主节点(Head Node)“报到”和进行初始同步的那个唯一的、指定的网络端口。

您可以把它想象成一个**“报到大厅的门牌号”**。

  • 主节点 (Head Node):它在启动时,会在自己的 IP 地址和这个指定的端口上开设这个“报到大厅”。它在这里等待所有其他节点的工作人员前来登记。
  • 计算节点 (Worker Node):它们在启动时,被告知“报到大厅”的完整地址(IP + 端口)。它们会主动前往这个地址,完成登记(握手),并领取后续工作的指令(获取 DP Coordinator 等组件的地址)。
它的核心作用
  1. 初始集结点 (Initial Rendezvous Point): 在一个分布式系统中,各个独立的进程需要一个共同的、预先知道的“地方”来首次建立联系。data-parallel-rpc-portdata-parallel-address 共同构成了这个集结点的地址。

  2. 握手通信通道 (Handshake Channel): 它是 wait_for_engine_startup 函数进行两阶段握手(HELLO/READY)所使用的网络通道。所有 Engine Core,无论本地还是远程,都通过这个端口与主节点的启动程序进行通信。

  3. 配置分发通道 (Configuration Distribution Channel): 在握手过程中,主节点需要将关键的配置信息(如 DP Coordinator 的地址)分发给所有 Engine Core。这个分发过程也是通过这个端口建立的 ZMQ 连接完成的。

为什么需要这个参数?
因为在多机环境下,进程无法像在单机上那样轻易地共享资源或发现彼此。必须有一个稳定、可预测的地址作为通信的起点。这个端口必须是在主节点上一个可用的、且在计算节点防火墙策略中允许访问的端口。


完整代码流程

下面我们来追踪当用户在命令行指定 --data-parallel-rpc-port 13345 后,这个值在代码中是如何被使用的。

第 1 步: 命令行输入与参数解析

用户在两个节点上分别执行命令:

  • 主节点 (Node 0):
    vllm serve ... --data-parallel-address 10.0.0.1 --data-parallel-rpc-port 13345
    
  • 计算节点 (Node 1):
    vllm serve --headless ... --data-parallel-address 10.0.0.1 --data-parallel-rpc-port 13345
    
  • 代码位置: vllm/entrypoints/cli/serve.py
  • 发生的事: ServeSubcommand 类中的 argparse 解析器会读取这些参数,并将 13345 存入 args.data_parallel_rpc_port
第 2 步: 创建配置对象

serve.pycmd 方法或其调用的函数中,VllmConfig 对象会被创建。

  • 代码位置: vllm.AsyncEngineArgs.from_cli_args(args).create_engine_config(...)
  • 发生的事: 命令行参数 args.data_parallel_rpc_port 的值被用来初始化 ParallelConfig,最终存储在 vllm_config.parallel_config.data_parallel_rpc_port。现在,这个端口号成为了整个 vLLM 实例配置的一部分。
第 3 步: 启动核心引擎 (launch_core_engines)

这是最关键的协调步骤,发生在主节点上(对于 headless 节点,流程类似但更简化)。

  • 代码位置: vllm/v1/engine/utils.py -> launch_core_engines 函数

  • 发生的事:

    1. 获取配置: 函数从传入的 vllm_config 中读取 host (data_parallel_address) 和 port (data_parallel_rpc_port)。
      parallel_config = vllm_config.parallel_config
      host = parallel_config.data_parallel_master_ip
      port = parallel_config.data_parallel_rpc_port
      
    2. 构建握手地址: 使用 hostport 构建一个完整的 ZMQ TCP 地址。
      # from vllm.utils.network_utils import get_tcp_uri
      handshake_address = get_tcp_uri(host, port) 
      # 结果为 "tcp://10.0.0.1:13345"
      
    3. 主节点绑定套接字 (开设“报到大厅”): launch_core_engines 函数内部使用一个上下文管理器来创建和管理握手套接字。
      with zmq_socket_ctx(
          local_handshake_address, zmq.ROUTER, bind=True
      ) as handshake_socket:
          # ...
      
      这里,主节点创建了一个 ROUTER 类型的 ZMQ 套接字,并绑定 (bind) 到了 "tcp://10.0.0.1:13345"。它现在开始在这个地址上监听传入的连接。
第 4 步: 启动工作进程并传递地址

主节点和计算节点都会执行这一步来启动它们各自的 Engine Core 进程。

  • 代码位置: vllm/v1/engine/utils.py -> CoreEngineProcManager

  • 发生的事:

    1. launch_core_engineswith 块内部,CoreEngineProcManager 被实例化。
    2. 在创建 CoreEngineProcManager 时,上一步构建的 handshake_address (“tcp://10.0.0.1:13345”) 被作为参数传递进去。
      local_engine_manager = CoreEngineProcManager(
          ...,
          handshake_address=handshake_address, # "tcp://10.0.0.1:13345"
          ...
      )
      
    3. CoreEngineProcManager 在其 __init__ 方法中,为每个要启动的 Engine Core 创建一个 multiprocessing.Processhandshake_address 作为 kwargs 的一部分,被准备好传递给子进程的目标函数。
第 5 步: 工作进程连接主节点 (前来“报到”)

每个新创建的 Engine Core 子进程会执行其目标函数 EngineCoreProc.run_engine_core

  • 代码位置: vllm/v1/engine/core.py -> EngineCoreProc.run_engine_core

  • 发生的事:

    1. 子进程从其启动参数中获取 handshake_address
    2. EngineCore 的初始化过程中,它会创建一个 ZMQ DEALER 套接字。
    3. 然后,它使用这个 DEALER 套接字连接 (connect)handshake_address
      # EngineCore 内部逻辑的简化表示
      handshake_client = context.socket(zmq.DEALER)
      handshake_client.connect(handshake_address) # 连接到 "tcp://10.0.0.1:13345"
      
    4. 连接成功后,它立刻通过这个连接发送 “HELLO” 消息,开始握手流程。
第 6 步: 握手和同步
  • 代码位置: vllm/v1/engine/utils.py -> wait_for_engine_startup
  • 发生的事:
    1. 主节点上的 wait_for_engine_startup 函数使用它在第 3 步创建的 handshake_socket(它监听在 13345 端口)接收来自所有 Engine Core(包括远程的)的 “HELLO” 和 “READY” 消息。
    2. 它通过同一个套接字将配置信息发回给每个 Engine Core
    3. 一旦所有 Engine Core 都完成了这个过程,wait_for_engine_startup 函数结束,整个集群的初始同步完成。

总结

data-parallel-rpc-port 是多机部署的“生命线端口”。它定义了一个众所周知的集合点,让分散在网络中的各个 vLLM 进程能够找到彼此,完成身份验证和初始配置交换。一旦这个引导(bootstrap)过程完成,这个端口的使命就基本结束了,后续的常规通信(如请求分发、结果返回、状态同步)会通过在握手过程中分发的其他 ZMQ 地址和端口来进行。

多机部署的后端选择

在您提到的配置中(--data-parallel-size 2 --data-parallel-size-local 1),默认不使用 Ray 后端,而是使用 multiprocessing 后端

后端选择逻辑

vLLM 的数据并行后端由 data_parallel_backend 参数控制: 1

data_parallel_backend: DataParallelBackend = "mp"
"""Backend to use for data parallel, either "mp" or "ray"."""

默认行为

在多节点部署中,系统默认选择 multiprocessing 后端: 2

elif current_platform.is_cuda() and self.nnodes > 1:
    backend = "mp"

如何使用 Ray 后端

要使用 Ray 后端,需要显式指定:

# 主节点
python -m vllm.entrypoints.openai.api_server \
    --model llama-2-7b \
    --api-server-count 2 \
    --data-parallel-size 2 \
    --data-parallel-size-local 1 \
    --data-parallel-address 192.168.1.100 \
    --data-parallel-backend ray

# 计算节点
python -m vllm.entrypoints.openai.api_server \
    --model llama-2-7b \
    --headless \
    --data-parallel-size 2 \
    --data-parallel-size-local 1 \
    --data-parallel-start-rank 1 \
    --data-parallel-address 192.168.1.100 \
    --data-parallel-backend ray

Ray 后端的优势

使用 Ray 后端时的优势: 3

  • 只需在任意节点运行单个启动命令
  • 无需指定 --data-parallel-address
  • 无需指定 --data-parallel-rpc-port
  • 自动分配远程 DP 副本

执行器选择

系统根据后端选择相应的执行器: 4

elif distributed_executor_backend == "ray":
    from vllm.v1.executor.ray_executor import RayDistributedExecutor
    executor_class = RayDistributedExecutor
elif distributed_executor_backend == "mp":
    from vllm.v1.executor.multiproc_executor import MultiprocExecutor
    executor_class = MultiprocExecutor

Notes

总结:您提到的配置默认使用 multiprocessing 后端,不是 Ray 后端。要使用 Ray 后端必须显式添加 --data-parallel-backend ray 参数。Ray 后端简化了多节点部署但需要 Ray 集群环境,而 multiprocessing 后端更适合传统集群环境。

Wiki pages you might want to explore:

Citations

File: vllm/config/parallel.py (L119-120)

    data_parallel_backend: DataParallelBackend = "mp"
    """Backend to use for data parallel, either "mp" or "ray"."""

File: vllm/engine/arg_utils.py (L610-612)

            self.attention_config = AttentionConfig(**self.attention_config)
        if isinstance(self.kernel_config, dict):
            self.kernel_config = KernelConfig(**self.kernel_config)

File: docs/serving/data_parallel_deployment.md (L67-73)

There are several notable differences when using Ray:

- A single launch command (on any node) is needed to start all local and remote DP ranks, therefore it is more convenient compared to launching on each node
- There is no need to specify `--data-parallel-address`, and the node where the command is run is used as `--data-parallel-address`
- There is no need to specify `--data-parallel-rpc-port`
- When a single DP group requires multiple nodes, *e.g.* in case a single model replica needs to run on at least two nodes, make sure to set `VLLM_RAY_DP_PACK_STRATEGY="span"` in which case `--data-parallel-size-local` is ignored and will be automatically determined
- Remote DP ranks will be allocated based on node resources of the Ray cluster

File: vllm/v1/executor/abstract.py (L59-66)

        elif distributed_executor_backend == "ray":
            from vllm.v1.executor.ray_executor import RayDistributedExecutor

            executor_class = RayDistributedExecutor
        elif distributed_executor_backend == "mp":
            from vllm.v1.executor.multiproc_executor import MultiprocExecutor

            executor_class = MultiprocExecutor

Coordinator 部署位置详解

这句注释解释了 DP Coordinator 进程的部署位置决策逻辑:在非外部或混合负载均衡模式下,假设协调器与前端进程共址部署1

核心逻辑

# Assume coordinator is colocated with front-end procs when not in
# either external or hybrid DP LB mode.
local_only = not parallel_config.local_engines_only

部署模式判断

local_engines_only 属性定义了不同负载均衡模式下的部署策略: 2

@property
def local_engines_only(self) -> bool:
    """
    Client manages local+remote EngineCores in pure internal LB case.
    Client manages local EngineCores in hybrid and external LB case.
    """
    return self.data_parallel_external_lb or self.data_parallel_hybrid_lb

三种负载均衡模式的部署差异

负载均衡模式 local_engines_only local_only Coordinator 位置 通信方式
内部 LB (默认) False True 与前端进程共址 IPC (本地)
外部 LB True False 与 DP rank 0 共址 TCP (跨节点)
混合 LB True False 与本地前端共址 TCP (跨节点)

地址生成影响

这个决策直接影响 ZMQ 地址的生成: 3

front_publish_address = get_engine_client_zmq_addr(
    local_only=local_only, host=host
)
  • local_only=True: 使用 IPC 地址 (ipc:///tmp/vllm_rpc/...)
  • local_only=False: 使用 TCP 地址 (tcp://host:port)

实际应用场景

  1. 内部负载均衡:单节点或多节点部署,API Server 管理所有引擎,Coordinator 与 API Server 在同一节点
  2. 外部负载均衡:每个 DP rank 独立部署,Coordinator 与 rank 0 的引擎在同一节点
  3. 混合负载均衡:每个节点有本地 API Server,Coordinator 与本地前端在同一节点

Notes

这种设计确保了在不同负载均衡模式下,Coordinator 能够以最优的通信方式与相关进程通信,同时简化了部署配置的复杂性。内部模式下的共址部署减少了网络开销,而外部/混合模式下的分布式部署支持了更大规模的扩展需求。

Wiki pages you might want to explore:

Citations

File: vllm/v1/engine/coordinator.py (L66-68)

        # Assume coordinator is colocated with front-end procs when not in
        # either external or hybrid DP LB mode.
        local_only = not parallel_config.local_engines_only

File: vllm/v1/engine/coordinator.py (L69-75)

        front_publish_address = get_engine_client_zmq_addr(
            local_only=local_only, host=host
        )

        local_only_eng = dp_size == parallel_config.data_parallel_size_local
        back_publish_address = get_engine_client_zmq_addr(local_only_eng, host)
        back_output_address = get_engine_client_zmq_addr(local_only_eng, host)

File: vllm/config/parallel.py (L375-380)

    def local_engines_only(self) -> bool:
        """
        Client manages local+remote EngineCores in pure internal LB case.
        Client manages local EngineCores in hybrid and external LB case.
        """
        return self.data_parallel_external_lb or self.data_parallel_hybrid_lb

Coordinator 部署位置示例

1. 内部负载均衡模式 (默认)

场景:单节点或多节点部署,API Server 管理所有引擎

# 单节点部署 - DP=2
vllm serve $MODEL --data-parallel-size 2

Coordinator 位置:与 API Server 共址部署

  • local_engines_only = False
  • local_only = True
  • 使用 IPC 地址通信 1
# 生成的地址示例
front_publish_address = "ipc:///tmp/vllm_rpc/abc123"

2. 外部负载均衡模式

场景:每个 DP rank 独立部署,外部负载均衡器分发请求

# Node 0
vllm serve $MODEL --data-parallel-size 2 --data-parallel-rank 0 \
                  --data-parallel-address 192.168.1.100 --port 8000

# Node 1  
vllm serve $MODEL --data-parallel-size 2 --data-parallel-rank 1 \
                  --data-parallel-address 192.168.1.100 --port 8001

Coordinator 位置:与 DP rank 0 引擎共址

  • local_engines_only = True
  • local_only = False
  • 使用 TCP 地址通信 2
# 生成的地址示例
front_publish_address = "tcp://192.168.1.100:5555"

3. 混合负载均衡模式

场景:每个节点有本地 API Server,上游负载均衡器分发请求

# Node 0
vllm serve $MODEL --data-parallel-size 4 --data-parallel-size-local 2 \
                  --data-parallel-start-rank 0 --data-parallel-hybrid-lb

# Node 1
vllm serve $MODEL --data-parallel-size 4 --data-parallel-size-local 2 \
                  --data-parallel-start-rank 2 --data-parallel-hybrid-lb

Coordinator 位置:与本地前端进程共址

  • local_engines_only = True
  • local_only = False
  • 使用 TCP 地址进行跨节点通信 3

部署架构对比

模式 Coordinator 位置 通信方式 典型用例
内部 LB 与 API Server 共址 IPC (本地) 简化部署,中小规模
外部 LB 与 DP rank 0 共址 TCP (跨节点) 大规模,Kubernetes
混合 LB 与本地前端共址 TCP (跨节点) 多节点,避免单点瓶颈

Notes

内部负载均衡模式下,Coordinator 与前端进程共址部署可以使用高效的 IPC 通信,减少网络开销。而外部和混合模式需要跨节点通信,因此使用 TCP 地址。 4

Wiki pages you might want to explore:

Wiki pages you might want to explore:

Citations

File: vllm/v1/engine/coordinator.py (L66-75)

        # Assume coordinator is colocated with front-end procs when not in
        # either external or hybrid DP LB mode.
        local_only = not parallel_config.local_engines_only
        front_publish_address = get_engine_client_zmq_addr(
            local_only=local_only, host=host
        )

        local_only_eng = dp_size == parallel_config.data_parallel_size_local
        back_publish_address = get_engine_client_zmq_addr(local_only_eng, host)
        back_output_address = get_engine_client_zmq_addr(local_only_eng, host)

File: vllm/config/parallel.py (L375-380)

    def local_engines_only(self) -> bool:
        """
        Client manages local+remote EngineCores in pure internal LB case.
        Client manages local EngineCores in hybrid and external LB case.
        """
        return self.data_parallel_external_lb or self.data_parallel_hybrid_lb

File: docs/serving/data_parallel_deployment.md (L83-94)

## Hybrid Load Balancing

Hybrid load balancing sits between the internal and external approaches. Each node runs its own API server(s) that only queue requests to the data-parallel engines colocated on that node. An upstream load balancer (for example, an ingress controller or traffic router) spreads user requests across those per-node endpoints.

Enable this mode with `--data-parallel-hybrid-lb` while still launching every node with the global data-parallel size. The key differences from internal load balancing are:

- You must provide `--data-parallel-size-local` and `--data-parallel-start-rank` so each node knows which ranks it owns.
- Not compatible with `--headless` since every node exposes an API endpoint.
- Scale `--api-server-count` per node based on the number of local ranks

In this configuration, each node keeps scheduling decisions local, which reduces cross-node traffic and avoids single node bottlenecks at larger DP sizes.

这是一个非常好的问题,选择 zmq.ROUTER 类型是 ZMQ 中一个经过深思熟虑的、非常经典的设计模式,尤其适用于这种“一个中心服务对多个客户端”的场景。

简单来说,选择 zmq.ROUTER 的核心原因是:它能够让服务端(在这里是主节点的 launch_core_engines 进程)准确地知道消息是哪个客户端(哪个 Engine Core)发来的,并且能够精确地将回复发送回那个特定的客户端。

让我们来详细分解为什么 ROUTER 是最佳选择,并与其他类型进行对比。


zmq.ROUTER 套接字的特性

ROUTER 套接字(曾被称为 XREP)有两个关键特性,使其非常适合作为中心服务器:

  1. 自动添加身份帧 (Identity Frame):

    • 当一个客户端(通常是 DEALERREQ 类型)连接到 ROUTER 并发送消息时,ROUTER 在接收这个消息时,并不会直接收到消息内容。它会在消息的最前面自动添加一个额外的**“身份帧 (identity frame)”**。
    • 这个身份帧包含了发送方客户端的唯一标识。如果客户端没有设置 identity,ZMQ 会自动为其生成一个 UUID。在 vLLM 中,Engine Core 会将自己的 rank 设置为 identity
    • 所以,当 ROUTER 调用 recv_multipart() 时,它收到的消息结构是:[<identity_frame>, <empty_frame>, <message_content>]
    • 效果: 服务端可以通过读取第一个帧,准确地知道“这是谁发来的消息”
  2. 精确路由回复:

    • ROUTER 服务端想要回复一个消息时,它必须使用 send_multipart(),并且消息的格式必须是:[<identity_frame_of_recipient>, <empty_frame>, <reply_content>]
    • ROUTER 会读取第一个帧(身份帧),并确保这条回复消息只会被发送给具有该身份的那个客户端
    • 效果: 服务端可以与多个客户端同时通信,并且能够进行精确的、点对点的回复,而不会发生混淆。

为什么在 vLLM 的握手场景中 ROUTER 是必需的?

wait_for_engine_startup 的握手过程中,主节点需要处理以下任务:

  1. 接收报到: 多个 Engine Core(客户端)会几乎同时发来 “HELLO” 消息。主节点需要区分哪个 “HELLO” 来自 rank 0,哪个来自 rank 1,哪个来自远程的 rank 2…
  2. 发送配置: 在收到一个 Engine Core 的 “HELLO” 后,主节点需要将包含 DP Coordinator 地址的配置信息只发送回给那个特定的 Engine Core。如果把配置信息广播给所有 Engine Core,虽然在这个场景下问题不大,但不是一个好的设计,而且在其他场景下会引发严重问题。
  3. 接收就绪信号: 主节点需要接收每个 Engine Core 发来的 “READY” 消息,并更新对应 Engine Core 的状态。

ROUTER 套接字完美地满足了这些需求:

  • Engine Core (使用 DEALER 套接字) 发送 “HELLO” 时,handshake_socket (ROUTER) 收到的第一帧就是 Engine Core 的 rank (eng_identity)。
  • wait_for_engine_startup 函数可以据此知道是哪个 Engine Core 报到了。
  • 当它需要回复配置时,它将 eng_identity 作为 send_multipart() 的第一个帧,确保了配置信息被准确路由回正确的 Engine Core
# wait_for_engine_startup 简化逻辑

# 1. 接收消息,第一帧就是身份
eng_identity, ready_msg_bytes = handshake_socket.recv_multipart()

# 2. 根据身份解析
eng_index = int.from_bytes(eng_identity, "little")

# ... 处理消息 ...

# 3. 精确回复,将身份帧放在最前面
handshake_socket.send_multipart((eng_identity, init_message), copy=False)

与其他套接字类型的对比

  • 为什么不用 REP (Reply)?

    • REP 套接字强制执行严格的“请求-响应”(recv-send-recv-send...)循环。
    • REP 服务器一次只能处理一个请求。在收到一个请求后,它必须发送一个回复,然后才能接收下一个请求。
    • 这不适用于 vLLM 的场景,因为多个 Engine Core 会并发地发送 “HELLO” 消息。如果使用 REP,当第一个 Engine Core 发来 “HELLO” 后,在主节点回复之前,其他所有 Engine Core 的 “HELLO” 都会被阻塞,导致启动变慢甚至死锁。ROUTER 是完全异步的,可以随时接收来自任何客户端的消息。
  • 为什么不用 PULL?

    • PULL 套接字用于从多个 PUSH 源接收数据,形成一个单向的管道。
    • PULL 套接字无法知道消息的来源,也无法回复消息。它只是一个纯粹的、匿名的消息汇集点。
    • 这显然不满足握手过程中需要身份识别和点对点回复的需求。
  • 为什么客户端用 DEALER 而不是 REQ?

    • DEALERROUTER 的天然搭档。它也是完全异步的,可以随时发送消息,而不用等待回复。
    • REQ 套接字和 REP 一样,也强制执行 send-recv-send-recv... 的循环。如果 Engine Core 使用 REQ,它在发送 “HELLO” 之后就必须阻塞等待回复,然后才能发送 “READY”,这会限制异步能力。使用 DEALEREngine Core 可以在发送 “HELLO” 后继续做其他事,异步地等待回复。

结论

在 ZMQ 中,ROUTER-DEALER 组合是构建健壮、异步、多客户端的服务器的黄金搭档

在 vLLM 的握手场景中,主节点作为中心服务器,需要同时处理来自多个 Engine Core 客户端的连接和消息,并能对每个客户端进行独立的、精确的响应。zmq.ROUTER 是唯一能完美满足“识别来源”和“精确路由”这两个核心需求的套接字类型。

好的,我们来逐段详细解释这段代码的含义。这段代码位于 launch_core_engines 函数中,它的核心作用是确定当前这个 vLLM 进程需要与哪些 Engine Core 进程进行握手(handshake)

这个决策逻辑根据不同的部署模式(离线模式、内部负载均衡、外部负载均衡等)而有所不同。让我们分解来看:


上下文理解

在进入代码细节之前,先理解几个关键变量:

  • offline_mode: 是否为离线推理模式。在这种模式下,通常没有 API 服务器,每个进程独立工作。
  • dp_rank: 当前这个 vllm serve 进程在数据并行(DP)全局中的 rank(索引)。从 0 开始。
  • dp_size: 数据并行的总大小,即 Engine Core 的总数量。
  • local_engine_count: 将要在这个节点上启动的 Engine Core 的数量。
  • local_engines_only: 一个布尔值,用于区分不同的负载均衡模式。

if offline_mode:

if offline_mode:
    assert local_engine_count == 1
    engines_to_handshake = [CoreEngine(index=dp_rank, local=True)]
  • 含义: 这是处理离线推理的场景。
  • 逻辑: 在离线模式下,每个 DP 进程通常只管理一个与自己 rank 相同的 Engine Core。它们之间没有复杂的协调关系。
  • 代码解释:
    • assert local_engine_count == 1: 确认在这种模式下,本地只会启动一个 Engine Core
    • engines_to_handshake = [CoreEngine(index=dp_rank, local=True)]: 这段代码的意思是:“我是一个离线推理进程,我的 rank 是 dp_rank。我只需要和我自己启动的那个、与我同 rank 的 Engine Core 进行握手,确认它已准备就绪就行了。我不需要关心其他任何 Engine Core。”

elif dp_rank == 0:

elif dp_rank == 0:
    # Rank 0 holds Coordinator, so it handshakes with all Cores
    # in both external dplb and internal dplb mode.
    # Note this also covers the case where we have zero local engines
    # and rank 0 is headless.
    engines_to_handshake = [
        CoreEngine(index=i, local=(i < local_engine_count)) for i in range(dp_size)
    ]
  • 含义: 这是处理**主节点(rank 0)的场景。在任何数据并行部署中,rank 0 进程都扮演着“总协调者”**的角色。
  • 逻辑:
    • rank 0 进程负责启动 DP Coordinator
    • 它需要确保所有Engine Core(无论是在本地还是在远程节点上)都已成功启动并连接到协调系统。
    • 因此,它必须与集群中的每一个 Engine Core 进行握手。
  • 代码解释:
    • engines_to_handshake = [...] for i in range(dp_size): 创建一个包含所有 Engine Core 的列表,从 rank 0 到 dp_size - 1。这表明它期望收到来自所有 Engine Core 的报到。
    • local=(i < local_engine_count): 这是一个巧妙的判断。rank 0 进程知道前 local_engine_countEngine Core 是在它自己的节点上启动的(本地的),而其余的都是远程的。它会用这个 local 标志来追踪本地和远程 Engine Core 的启动进度。
    • 一个特殊情况: 注释中提到,这也覆盖了 rank 0 自身是 headless(即 local_engine_count 为 0)的情况。此时,rank 0 进程只作为协调者存在,它仍然需要与所有(此时全都是远程的)Engine Core 进行握手。

else: (即 dp_rank > 0)

else:
    # Rank > 0 handshakes with just the local cores it is managing.
    assert local_engines_only, (
        "Attempting to launch core_engines from dp_rank > 0, but "
        "found internal DPLB, which is incompatible."
    )
    engines_to_handshake = [
        CoreEngine(index=i, local=True)
        for i in range(dp_rank, dp_rank + local_engine_count)
    ]
  • 含义: 这是处理非主节点(rank > 0),也就是计算节点的场景。
  • 逻辑: 一个非主节点的 vllm serve 进程,其职责是管理好在它自己节点上启动的那些 Engine Core。它不需要像 rank 0 那样关心整个集群的状态。
  • 代码解释:
    • assert local_engines_only, ...: 这是一个重要的断言。它检查是否处于 local_engines_only 模式(通常意味着外部或混合负载均衡)。在这种模式下,一个 rank > 0 的进程是允许存在的,它只管理本地引擎。如果不是这种模式(即是内部负载均衡),那么所有的协调都应该由 rank 0 完成,rank > 0 的进程根本不应该走到这一步,所以会报错。
    • engines_to_handshake = [...] for i in range(dp_rank, dp_rank + local_engine_count): 这段代码的意思是:“我是一个计算节点,我的起始 rank 是 dp_rank,我负责启动 local_engine_countEngine Core。因此,我只需要与我负责的这些 Engine Core(它们的 rank 范围是 [dp_rank, dp_rank + local_engine_count))进行握手,确保它们都已准备好就行了。至于集群里的其他 Engine Core,那是 rank 0 的事,我不管。”
    • local=True: 因为这些 Engine Core 都是在本节点上启动的,所以 local 标志自然是 True

总结

这段代码是一个职责划分的逻辑开关。它根据当前进程的角色(离线工作者、集群总指挥、普通计算节点)来精确地定义它的“握手范围”:

  • 离线工作者: 只关心自己。
  • 集群总指挥 (rank 0): 必须关心所有人(所有 Engine Core)。
  • 普通计算节点 (rank > 0): 只关心自己手下的人(在自己节点上启动的 Engine Core)。

通过这种方式,vLLM 确保了在复杂的分布式部署中,每个进程都有明确的职责,启动和同步过程既完整又高效。

这是一个非常好的问题,因为它触及了理解这段代码的关键!

这里的**“我”指的是当前正在执行 launch_core_engines 这个函数的那个 vllm serve 进程**。

让我们把这个概念具象化。


场景重现

回想一下我们在多节点部署时执行的命令:

在主节点 (Node 0) 上,你输入了:

vllm serve $MODEL --data-parallel-size 4 --data-parallel-size-local 2 \
                  --data-parallel-address 10.0.0.1 --data-parallel-rpc-port 13345
  • 这个命令启动了一个进程。在这个进程的上下文中,dp_rank0 (默认值)。
  • 当这个进程执行到我们讨论的代码块时,它会进入 elif dp_rank == 0: 这个分支。
  • 这里的“我”就是主节点上的 vllm serve 进程。它的任务是与所有 4 个 Engine Core 握手。

在计算节点 (Node 1) 上,你输入了:

vllm serve $MODEL --headless --data-parallel-size 4 --data-parallel-size-local 2 \
                  --data-parallel-start-rank 2 \
                  --data-parallel-address 10.0.0.1 --data-parallel-rpc-port 13345
  • 这个命令也启动了一个进程。在这个进程的上下文中,dp_rank2 (由 --data-parallel-start-rank 2 指定)。
  • 当这个进程执行到我们讨论的代码块时,它会进入 else: (dp_rank > 0) 这个分支。
  • 这里的“我”就是计算节点上的 vllm serve --headless 进程。它的任务是只与它自己负责的、从 rank 2 开始的 2 个 Engine Core(即 rank 2 和 rank 3)进行握手。

进程关系图

为了更清晰地理解,我们可以画一个简化的进程关系图:

+----------------------------------- Node 0 ------------------------------------+
|                                                                                |
|  +---------------------------+                                                 |
|  | vllm serve (dp_rank=0)    |  <--- "我" (主节点)                            |
|  | (The Coordinator Process) |                                                 |
|  +---------------------------+                                                 |
|          | (forks)                                                             |
|          +----------------------> +------------------+                          |
|          |                        | Engine Core (r=0)|  <--\                    |
|          | (forks)                +------------------+     |                    |
|          +----------------------> +------------------+     | 主节点的"我"      |
|                                   | Engine Core (r=1)|  <--+ 需要与这4个握手   |
|                                   +------------------+     |                    |
|                                                          <--/                    |
|                                                          <--\ (via network)      |
+--------------------------------------------------------------------------------+
                                                                                 |
+----------------------------------- Node 1 ------------------------------------+
|                                                                                |
|  +--------------------------------+                                            |
|  | vllm serve --headless (dp_rank=2)| <--- "我" (计算节点)                      |
|  | (The Worker Launcher Process)    |                                            |
|  +--------------------------------+                                            |
|          | (forks)                                                             |
|          +-------------------------> +------------------+                        |
|          |                         | Engine Core (r=2)| <--\ 计算节点的"我"   |
|          | (forks)                 +------------------+    | 只需要与这2个握手|
|          +-------------------------> +------------------+    |                      |
|                                    | Engine Core (r=3)| <--/                      |
|                                    +------------------+                          |
|                                                                                |
+--------------------------------------------------------------------------------+

总结一下:

  • vllm serve 命令本身会启动一个父进程/管理进程
  • 这个父进程会根据自己的 dp_rank 来确定自己的角色。
  • 这个父进程就是代码逻辑中的“我”。
  • 然后,“我”会创建(fork)它所负责的 Engine Core 子进程,并与它们进行握手,以确认它们已准备就绪。

所以,在 else: 分支中,“我”特指那个在计算节点上运行的、dp_rank > 0 的、通常是 --headlessvllm serve 进程。它的职责范围被限定在管理本节点的资源上。

bash vllm serve $MODEL --data-parallel-size 4 --data-parallel-size-local 2 \ --data-parallel-address 10.0.0.1 --data-parallel-rpc-port 13345 […Truncated 6 lines…] vllm serve $MODEL --headless --data-parallel-size 4 --data-parallel-size-local 2 \ --data-parallel-start-rank 2 \ --data-parallel-address 10.0.0.1 --data-parallel-rpc-port 13345 — 这里的“我”就是计算节点上的 vllm serve --headless 进程。它的任务是只与它自己负责的、从 rank 2 开始的 2 个 Engine Core(即 rank 2 和 rank 3)进行握手。 节点上的哪个进程会分别与 Engincore进程握手。 节点0 已经和所有Engincore 握手了, 节点1 还会和本地的握手吗,为什么,更新到文档中

这是local 场景下节点1 要与其他节点通信
在这里插入图片描述

节点握手机制详解

在您描述的跨节点部署场景中,握手机制如下:

Node 0 (主节点) 的握手行为

Node 0 上的 launch_core_engines 函数会与所有 Engine Core 进行握手: 1

elif dp_rank == 0:
    # Rank 0 holds Coordinator, so it handshakes with all Cores
    # in both external dplb and internal dplb mode.
    engines_to_handshake = [
        CoreEngine(index=i, local=(i < local_engine_count)) for i in range(dp_size)
    ]

Node 1 (计算节点) 的握手行为

Node 1 上的 run_headless 进程创建 CoreEngineProcManager,该管理器负责与本地的 Engine Core (rank 2 和 3) 握手: 2

engine_manager = CoreEngineProcManager(
    target_fn=EngineCoreProc.run_engine_core,
    local_engine_count=local_engine_count,
    start_index=vllm_config.parallel_config.data_parallel_rank,
    local_start_index=0,
    vllm_config=vllm_config,
    local_client=False,
    handshake_address=handshake_address,
    executor_class=Executor.get_class(vllm_config),
    log_stats=not engine_args.disable_log_stats,
)

为什么 Node 1 还需要与本地 Engine Core 握手?

  1. 进程管理需要:Node 1 上的 run_headless 进程需要启动和管理本地的 Engine Core 进程
  2. 状态同步:确保本地 Engine Core 正确初始化并返回就绪状态
  3. 资源分配:验证本地 GPU 资源和 KV 缓存配置

完整握手流程

Engine Core 3 Engine Core 2 Node 1 (计算节点) Node 0 (主节点) Engine Core 3 Engine Core 2 Node 1 (计算节点) Node 0 (主节点) 启动时创建handshake socket run_headless进程启动 等待本地引擎就绪 监听握手地址 启动Engine Core进程 启动Engine Core进程 发送HELLO消息 发送HELLO消息 返回初始化消息 返回初始化消息 发送READY消息 发送READY消息 本地状态同步 本地状态同步

关键代码位置

  1. Node 0 握手逻辑3
  2. Node 1 引擎管理2
  3. 握手等待处理4

Notes

总结:Node 1 上的 run_headless 进程会与本地 Engine Core 进行握手,这是为了确保本地进程正确启动和初始化。虽然 Node 0 已经与所有 Engine Core 完成了握手用于建立通信连接,但 Node 1 仍需要与本地引擎进行握手来管理进程生命周期和状态同步。

Wiki pages you might want to explore:

Wiki pages you might want to explore:

Citations

File: vllm/v1/engine/utils.py (L864-880)

        # Rank 0 holds Coordinator, so it handshakes with all Cores
        # in both external dplb and internal dplb mode.
        # Note this also covers the case where we have zero local engines
        # and rank 0 is headless.
        engines_to_handshake = [
            CoreEngine(index=i, local=(i < local_engine_count)) for i in range(dp_size)
        ]
    else:
        # Rank > 0 handshakes with just the local cores it is managing.
        assert local_engines_only, (
            "Attempting to launch core_engines from dp_rank > 0, but "
            "found internal DPLB, which is incompatible."
        )
        engines_to_handshake = [
            CoreEngine(index=i, local=True)
            for i in range(dp_rank, dp_rank + local_engine_count)
        ]

File: vllm/v1/engine/utils.py (L937-950)

def wait_for_engine_startup(
    handshake_socket: zmq.Socket,
    addresses: EngineZmqAddresses,
    core_engines: list[CoreEngine],
    parallel_config: ParallelConfig,
    coordinated_dp: bool,
    cache_config: CacheConfig,
    proc_manager: CoreEngineProcManager | None,
    coord_process: Process | None,
):
    # Wait for engine core process(es) to send ready messages.
    local_count = parallel_config.data_parallel_size_local
    remote_count = len(core_engines) - local_count
    # [local, remote] counts

File: vllm/entrypoints/cli/serve.py (L199-209)

    engine_manager = CoreEngineProcManager(
        target_fn=EngineCoreProc.run_engine_core,
        local_engine_count=local_engine_count,
        start_index=vllm_config.parallel_config.data_parallel_rank,
        local_start_index=0,
        vllm_config=vllm_config,
        local_client=False,
        handshake_address=handshake_address,
        executor_class=Executor.get_class(vllm_config),
        log_stats=not engine_args.disable_log_stats,
    )

您观察得非常敏锐,并且准确地抓住了问题的关键!是的,您说得完全正确。

rank > 0vllm serve 进程只与它自己本地的 Engine Core 进行握手,这种行为是特定于某些部署模式的,特别是那些 local_engines_onlyTrue 的场景。

让我们来深入解析 assert local_engines_only 这句代码,因为它正是这个逻辑的“守门员”。


local_engines_only 是什么?

这个布尔标志 local_engines_onlyParallelConfig 的一个属性,它用来区分两种主要的多节点 DP 部署模式:

  1. 内部负载均衡 (Internal Load Balancing)

    • 在这种模式下,只有一个地方API ServerDP Coordinator,那就是 rank 0 的主节点
    • 主节点上的 API Server 会直接与所有Engine Core(包括远程节点上的)进行通信和负载均衡。
    • 在这种架构下,计算节点(rank > 0)上运行的 vllm serve --headless 进程的角色非常纯粹:它只需要启动本地的 Engine Core 进程,然后这些 Engine Core 进程会自己去和主节点报到。计算节点上的父进程(vllm serve --headless)在启动完子进程后,就不再参与后续的协调工作了。
    • 在这种模式下,local_engines_only 通常是 False
  2. 外部/混合负载均衡 (External/Hybrid Load Balancing)

    • 在这种模式下,架构更加去中心化。每个节点(或每组节点)可能会有自己的“小前端”或 API 入口。
    • 一个 rank > 0 的进程可能不仅仅是启动 Engine Core,它可能还需要运行一个本地的 API Server,这个 API Server 只服务于它本地的 Engine Core
    • 在这种情况下,这个 rank > 0 的进程就需要自己确认本地的 Engine Core 是否准备就绪,因为它要直接使用它们。
    • 在这种模式下,local_engines_only 会被设置为 True

assert local_engines_only 的作用

现在我们来看 else: 分支中的那句断言:

else: # (即 dp_rank > 0)
    # Rank > 0 handshakes with just the local cores it is managing.
    assert local_engines_only, (
        "Attempting to launch core_engines from dp_rank > 0, but "
        "found internal DPLB, which is incompatible."
    )
    # ...

这句代码的意思是:

“如果我是一个 rank > 0 的进程,并且我正准备启动 core_engines,那么我必须处于一个 local_engines_only 的模式下。如果我发现当前是内部负载均衡模式(local_engines_onlyFalse),那肯定是哪里搞错了,因为在内部负载均衡模式下,所有的协调和握手都应该由 rank 0 的主节点统一完成,我作为一个 rank > 0 的进程根本不应该走到这一步来尝试自己进行握手。所以,我必须立即报错退出。”

换句话说,rank > 0 的进程自己去和本地 Engine Core 握手的行为,是为“外部/混合负载均衡”这种更复杂的、分片式的部署模式设计的。

在最常见的内部负载均衡多节点部署场景中:

  • 主节点 (rank 0): 执行 elif dp_rank == 0: 分支,准备与所有 Engine Core 握手。
  • 计算节点 (rank > 0): 它的 vllm serve --headless 进程在 launch_core_engines 函数中会因为 assert local_engines_only 失败而报错吗?不会。 因为在内部负载均衡的典型设置下,rank > 0vllm serve --headless 进程的职责更加简化,它可能根本就不会调用到 launch_core_engines 这个复杂的协调函数,或者在调用时 local_engines_only 会是 True。它的主要任务就是根据命令行参数启动 CoreEngineProcManager,然后由 CoreEngineProcManager 启动的子进程自己去连接主节点。

结论

您完全正确。rank > 0 进程与它本地的 Engine Core 进行握手,是特定于 local_engines_only 场景的。

在标准的、集中的内部负载均衡模式下,所有握手和协调的“重担”都压在了 rank 0 主节点身上,而 rank > 0 的计算节点则扮演着更纯粹的“启动器”角色,它启动完本地的 Engine Core 之后,这些 Engine Core 就像被放出巢的信鸽,会自己飞向主节点去报到。这个 assert 语句正是保证了这种职责划分的正确性。

coord_socket.send(b"\x01")

HELLO/READY 握手机制

HELLO/READY 握手是 EngineCore 与 Frontend 进程 之间的通信机制,不是直接与 API Server。 1

Frontend 与 API Server 的区别

Frontend 进程的定义

Frontend 是一个更广泛的概念,根据部署场景可以指代不同的进程: 2

"""
Here, "front-end" process can mean the process containing the engine
core client (which is the API server process in the case the API
server is not scaled out), OR the launcher process running the
run_multi_api_server() function in serve.py.
"""

具体场景

  1. 单 API Server 场景:Frontend = API Server 进程
  2. 多 API Server 场景:Frontend = Launcher 进程 (run_multi_api_server)

握手流程

APIServer Frontend EngineCore APIServer Frontend EngineCore HELLO/READY 握手 Frontend管理API Server 发送 HELLO 消息 返回初始化消息(包含ZMQ地址) 发送 READY 消息 启动API Server进程 通过ZMQ发送请求

关键代码位置

  1. EngineCore 发送 HELLO3
  2. Frontend 处理握手4
  3. EngineCore 发送 READY5

Notes

总结:HELLO/READY 是 EngineCore 与其管理进程(Frontend)之间的握手协议。在大多数情况下,Frontend 就是 API Server 进程或管理 API Server 的 Launcher 进程。这个握手机制用于建立通信连接和交换初始化配置信息。 6

Wiki pages you might want to explore:

Citations

File: vllm/v1/engine/core.py (L834-837)

        Here, "front-end" process can mean the process containing the engine
        core client (which is the API server process in the case the API
        server is not scaled out), OR the launcher process running the
        run_multi_api_server() function in serve.py.

File: vllm/v1/engine/core.py (L892-912)

            # Send ready message.
            num_gpu_blocks = vllm_config.cache_config.num_gpu_blocks
            # We pass back the coordinator stats update address here for the
            # external LB case for our colocated front-end to use (coordinator
            # only runs with rank 0).
            dp_stats_address = self.frontend_stats_publish_address

            # Include config hash for DP configuration validation
            ready_msg = {
                "status": "READY",
                "local": local_client,
                "headless": headless,
                "num_gpu_blocks": num_gpu_blocks,
                "dp_stats_address": dp_stats_address,
            }
            if vllm_config.parallel_config.data_parallel_size > 1:
                ready_msg["parallel_config_hash"] = (
                    vllm_config.parallel_config.compute_hash()
                )

            handshake_socket.send(msgspec.msgpack.encode(ready_msg))

File: vllm/v1/engine/core.py (L914-950)

    @staticmethod
    def startup_handshake(
        handshake_socket: zmq.Socket,
        local_client: bool,
        headless: bool,
        parallel_config: ParallelConfig | None = None,
    ) -> EngineZmqAddresses:
        # Send registration message.
        handshake_socket.send(
            msgspec.msgpack.encode(
                {
                    "status": "HELLO",
                    "local": local_client,
                    "headless": headless,
                }
            )
        )

        # Receive initialization message.
        logger.debug("Waiting for init message from front-end.")
        if not handshake_socket.poll(timeout=HANDSHAKE_TIMEOUT_MINS * 60_000):
            raise RuntimeError(
                "Did not receive response from front-end "
                f"process within {HANDSHAKE_TIMEOUT_MINS} "
                f"minutes"
            )
        init_bytes = handshake_socket.recv()
        init_message: EngineHandshakeMetadata = msgspec.msgpack.decode(
            init_bytes, type=EngineHandshakeMetadata
        )
        logger.debug("Received init message: %s", init_message)

        if parallel_config is not None:
            for key, value in init_message.parallel_config.items():
                setattr(parallel_config, key, value)

        return init_message.addresses

File: vllm/v1/engine/utils.py (L990-1007)

        # Receive HELLO and READY messages from the input socket.
        eng_identity, ready_msg_bytes = handshake_socket.recv_multipart()
        eng_index = int.from_bytes(eng_identity, "little")
        engine = next((e for e in core_engines if e.identity == eng_identity), None)
        if engine is None:
            raise RuntimeError(
                f"Message from engine with unexpected data parallel rank: {eng_index}"
            )
        msg = msgspec.msgpack.decode(ready_msg_bytes)
        status, local, headless = msg["status"], msg["local"], msg["headless"]
        if local != engine.local:
            raise RuntimeError(
                f"{status} message from "
                f"{'local' if local else 'remote'} "
                f"engine {eng_index}, expected it to be "
                f"{'local' if engine.local else 'remote'}"
            )

File: vllm/v1/engine/utils.py (L1023-1044)

        if status == "HELLO" and engine.state == CoreEngineState.NEW:
            # Send init message with DP config info.
            init_message = msgspec.msgpack.encode(
                EngineHandshakeMetadata(
                    addresses=addresses,
                    parallel_config={
                        k: getattr(parallel_config, k)
                        for k in (
                            "data_parallel_master_ip",
                            "data_parallel_master_port",
                            "_data_parallel_master_port_list",
                            "data_parallel_size",
                        )
                    }
                    if coordinated_dp
                    else {},
                )
            )
            handshake_socket.send_multipart((eng_identity, init_message), copy=False)
            conn_pending[0 if local else 1] -= 1
            start_pending[0 if local else 1] += 1
            engine.state = CoreEngineState.CONNECTED

DP Coordinator 与其他进程的关系

DP Coordinator 与其他进程的通信关系如下:

1. 与 Engine Core 的关系

DP Coordinator 不直接与 Engine Core 进行 HELLO/READY 握手,而是通过订阅机制建立连接:

# Coordinator 等待所有引擎订阅
for _ in self.engines:
    if publish_back.recv() != b"\x01":
        logger.error("DP Coordinator received unexpected message...")
        return
# 发送 READY 消息给引擎
publish_back.send(b"READY")
```[1](#18-0) 

Engine Core 通过 XSUB socket 订阅 Coordinator:
```python
coord_socket.send(b"\x01")  # 发送订阅消息
assert coord_socket.recv() == b"READY"  # 等待 READY 消息
```[2](#18-1) 

### 2. 与 API Server 的关系

DP Coordinator 作为**发布者**,API Server 作为**订阅者**,使用 XPUB/XSUB 模式:

- Coordinator 发布统计信息给 API Server
- API Server 订阅负载均衡更新
- 不使用 HELLO/READY 握手,而是发布-订阅模式

### 3. 握手机制对比

| 进程对 | 握手方式 | 消息格式 |
|--------|----------|----------|
| Engine Core ↔ Frontend | HELLO/READY | JSON 格式 |
| Engine Core ↔ Coordinator | 订阅机制 | `\x01` / `READY` |
| Coordinator ↔ API Server | 发布-订阅 | msgpack 编码 |

### 4. 完整通信流程

```mermaid
sequenceDiagram
    participant EngineCore
    participant DPCoordinator
    participant APIServer
    
    Note over EngineCore,DPCoordinator: 订阅建立
    EngineCore->>DPCoordinator: 发送 \x01 (订阅)
    DPCoordinator->>EngineCore: 发送 READY
    
    Note over EngineCore,Frontend: HELLO/READY 握手
    EngineCore->>Frontend: 发送 HELLO
    Frontend->>EngineCore: 返回初始化消息
    EngineCore->>Frontend: 发送 READY
    
    Note over DPCoordinator,APIServer: 发布-订阅
    DPCoordinator->>APIServer: 发布负载统计
    APIServer->>DPCoordinator: 订阅更新

关键区别

DP Coordinator 使用的是订阅确认机制而非传统的 HELLO/READY 握手:

  • Engine Core 发送 \x01 表示订阅
  • Coordinator 响应 READY 表示接受订阅
  • 这是一种轻量级的连接建立方式,专门用于发布-订阅模式

而 Engine Core 与 Frontend 进程之间使用完整的 HELLO/READY 握手机制来交换配置信息和初始化参数。 3

Wiki pages you might want to explore:

Citations

File: vllm/v1/engine/coordinator.py (L189-198)

            # Wait until all engines subscribe.
            for _ in self.engines:
                if publish_back.recv() != b"\x01":
                    logger.error(
                        "DP Coordinator received unexpected message while "
                        "waiting for engines to subscribe"
                    )
                    return
            # Send ready message to engines.
            publish_back.send(b"READY")

File: vllm/v1/engine/core.py (L914-950)

    @staticmethod
    def startup_handshake(
        handshake_socket: zmq.Socket,
        local_client: bool,
        headless: bool,
        parallel_config: ParallelConfig | None = None,
    ) -> EngineZmqAddresses:
        # Send registration message.
        handshake_socket.send(
            msgspec.msgpack.encode(
                {
                    "status": "HELLO",
                    "local": local_client,
                    "headless": headless,
                }
            )
        )

        # Receive initialization message.
        logger.debug("Waiting for init message from front-end.")
        if not handshake_socket.poll(timeout=HANDSHAKE_TIMEOUT_MINS * 60_000):
            raise RuntimeError(
                "Did not receive response from front-end "
                f"process within {HANDSHAKE_TIMEOUT_MINS} "
                f"minutes"
            )
        init_bytes = handshake_socket.recv()
        init_message: EngineHandshakeMetadata = msgspec.msgpack.decode(
            init_bytes, type=EngineHandshakeMetadata
        )
        logger.debug("Received init message: %s", init_message)

        if parallel_config is not None:
            for key, value in init_message.parallel_config.items():
                setattr(parallel_config, key, value)

        return init_message.addresses

File: vllm/v1/engine/core.py (L1220-1235)

                # Send subscription message to coordinator.
                coord_socket.send(b"\x01")

            # Register sockets with poller.
            poller = zmq.Poller()
            for input_socket in input_sockets:
                # Send initial message to each input socket - this is required
                # before the front-end ROUTER socket can send input messages
                # back to us.
                input_socket.send(b"")
                poller.register(input_socket, zmq.POLLIN)

            if coord_socket is not None:
                # Wait for ready message from coordinator.
                assert coord_socket.recv() == b"READY"
                poller.register(coord_socket, zmq.POLLIN)

DP coordinator 进程和其他进程的关系 发布订阅,结合代码分析流程?

Logo

昇腾计算产业是基于昇腾系列(HUAWEI Ascend)处理器和基础软件构建的全栈 AI计算基础设施、行业应用及服务,https://devpress.csdn.net/organization/setting/general/146749包括昇腾系列处理器、系列硬件、CANN、AI计算框架、应用使能、开发工具链、管理运维工具、行业应用及服务等全产业链

更多推荐