简述

项目地址

https://github.com/GeeeekExplorer/nano-vllmhttps://github.com/GeeeekExplorer/nano-vllm核心定位: 教学级轻量 vLLM 实现,约 1,200 行 Python 代码复现工业级推理引擎核心能力

维度

信息

⭐ Star

12k+

🔧 Fork

1.7k+

📦 代码量

~1,200 行 Python

🚀 性能

与 vLLM 相当(RTX 4070 上吞吐量 1434 tok/s vs 1361 tok/s)

🎯 适用场景

学习研究、小模型部署、边缘计算、快速原型验证

官方给出的测试结果 RTX 4070 Laptop, Qwen3-0.6B

指标

vLLM

nano-vllm

说明

🔹 吞吐量

1361.84 tok/s

1434.13 tok/s

nano 略优,精简流水线减少 overhead

🔹 显存占用

~4.2 GB

~3.8 GB

无冗余依赖,内存管理更紧凑

🔹 启动时间

~15s

~3s

无复杂初始化,适合快速迭代

核心架构分层

┌─────────────────────────────────┐
│  API Layer (用户接口层)          │
│  • LLM (入口类)                  │
│  • SamplingParams (采样配置)     │
└─────────┬───────────────────────┘
          │
┌─────────▼───────────────────────┐
│  Core Engine Layer (核心引擎)    │
│  • LLMEngine: 请求生命周期管理   │
│  • ModelRunner: 分布式模型执行   │
│  • Scheduler: 动态批处理调度     │
└─────────┬───────────────────────┘
          │
┌─────────▼───────────────────────┐
│  Memory Layer (内存管理)         │
│  • BlockManager: PagedAttention │
│  • Block: KV 缓存分块单元        │
│  • Sequence: 序列状态封装        │
└─────────┬───────────────────────┘
          │
┌─────────▼───────────────────────┐
│  Model Layer (模型层)            │
│  • models/: Qwen3 等架构实现     │
│  • layers/: Attention/MLP 等算子 │
└─────────┬───────────────────────┘
          │
┌─────────▼───────────────────────┐
│  Infrastructure (基础设施)       │
│  • Config: 系统参数配置          │
│  • Context: 执行上下文传递       │
└─────────────────────────────────┘

代码结构梳理

nano-vllm/
├── nanovllm/
│   ├── __init__.py          # 暴露 LLM, SamplingParams
│   ├── config.py            # Config 数据类,参数校验
│   ├── llm.py               # LLM 入口,继承 LLMEngine
│   ├── sampling_params.py   # 采样参数定义
│   ├── engine/
│   │   ├── llm_engine.py    # LLMEngine: 核心编排器
│   │   ├── model_runner.py  # ModelRunner: 分布式执行
│   │   ├── scheduler.py     # Scheduler: 动态批处理
│   │   └── sequence.py      # Sequence: 序列状态管理
│   ├── memory/
│   │   └── block_manager.py # BlockManager + Block 实现
│   ├── models/
│   │   └── qwen3.py         # Qwen3ForCausalLM 架构
│   ├── layers/
│   │   ├── attention.py     # FlashAttention 封装
│   │   ├── mlp.py           # MLP 层实现
│   │   └── rms_norm.py      # 归一化算子
│   └── utils/
│       ├── context.py       # 全局上下文传递
│       └── loader.py        # 模型权重加载
├── example.py               # 快速使用示例
├── bench.py                 # 性能基准测试
└── pyproject.toml           # 依赖与构建配置

PagedAttention:KV Cache

传统 LLM 推理中,KV Cache 采用连续内存分配

文档1: [████████████████████] 20 tokens
文档2: [██████] 6 tokens  
文档3: [████████████] 12 tokens

问题:
• 内存碎片:短文档浪费空间(6 token 占 20 token 槽位)
• 无法复用:相同 prompt 重复计算
• 动态扩缩困难:预分配过大浪费,过小 OOM

PagedAttention 核心设计,将 KV Cache 拆分为固定大小的 Block(页),类似操作系统虚拟内存:

Block 大小 = 256 tokens(可配置)

文档1: [B0][B1][B2]  (3 blocks, 实际用 20 slots)
文档2: [B3]          (1 block, 实际用 6 slots)  
文档3: [B4][B5]      (2 blocks, 实际用 12 slots)

空闲块池: [B6][B7][B8]...
参考Block的实现
# nanovllm/memory/block_manager.py

class Block:
    """KV Cache 的最小分配单元"""
    def __init__(self, block_id: int, block_size: int, num_layers: int, 
                 num_heads: int, head_dim: int, dtype: torch.dtype):
        self.block_id = block_id
        self.block_size = block_size  # 每块容纳的 token 数
        
        # 预分配 KV 缓存张量: [num_layers, 2, block_size, num_heads, head_dim]
        # 2 表示 key 和 value
        self.kv_cache = torch.empty(
            num_layers, 2, block_size, num_heads, head_dim, 
            dtype=dtype, device="cuda"
        )
        self.ref_count = 0  # 引用计数,支持多序列共享
        
    def write(self, layer_idx: int, key: torch.Tensor, value: torch.Tensor, 
              slot_offset: int, num_tokens: int):
        """写入 KV: 支持部分写入(最后一个 block 可能未满)"""
        self.kv_cache[layer_idx, 0, slot_offset:slot_offset+num_tokens] = key
        self.kv_cache[layer_idx, 1, slot_offset:slot_offset+num_tokens] = value
        
    def read(self, layer_idx: int, slot_offset: int, num_tokens: int):
        """读取 KV"""
        return (
            self.kv_cache[layer_idx, 0, slot_offset:slot_offset+num_tokens],
            self.kv_cache[layer_idx, 1, slot_offset:slot_offset+num_tokens]
        )


class BlockManager:
    """KV Cache 的分配器 + 缓存管理器"""
    def __init__(self, config: Config):
        self.block_size = config.block_size  # 默认 256
        self.num_blocks = config.num_gpu_blocks  # 根据显存计算
        
        # 1. 初始化所有 Block(预分配显存,避免运行时分配开销)
        self.blocks = [
            Block(i, self.block_size, ...) for i in range(self.num_blocks)
        ]
        self.free_block_ids = deque(range(self.num_blocks))  # 空闲块池
        
        # 2. Prefix Caching: 哈希映射 token 序列 → block_id
        self.hash_to_block_id: Dict[int, int] = {}
        
    def compute_hash(self, token_ids: List[int], prefix_hash: Optional[int]) -> int:
        """计算 token 序列的哈希,用于前缀缓存匹配"""
        # 使用滚动哈希(Rabin-Karp),支持 O(1) 增量计算
        h = prefix_hash if prefix_hash else HASH_BASE
        for token in token_ids:
            h = (h * HASH_PRIME + token) % HASH_MOD
        return h
    
    def allocate(self, seq: Sequence) -> bool:
        """为序列分配 KV Cache blocks"""
        num_tokens = len(seq.token_ids)
        num_blocks_needed = (num_tokens + self.block_size - 1) // self.block_size
        
        # 1. 检查是否有足够空闲块
        if len(self.free_block_ids) < num_blocks_needed:
            return False  # 显存不足,触发 eviction 或拒绝请求
            
        # 2. 尝试 Prefix Caching:查找已计算的相同前缀
        prefix_hash = self.compute_hash(seq.token_ids, seq.prefix_hash)
        if prefix_hash in self.hash_to_block_id:
            # 缓存命中!复用已有 block,避免重复计算
            cached_block_id = self.hash_to_block_id[prefix_hash]
            self.blocks[cached_block_id].ref_count += 1
            seq.block_table.append(cached_block_id)
            return True
        
        # 3. 分配新 blocks
        for _ in range(num_blocks_needed):
            block_id = self.free_block_ids.popleft()
            block = self.blocks[block_id]
            block.ref_count += 1
            seq.block_table.append(block_id)
            
        # 4. 注册前缀哈希(用于后续请求复用)
        self.hash_to_block_id[prefix_hash] = seq.block_table[0]
        seq.prefix_hash = prefix_hash
        return True
    
    def append_token(self, seq: Sequence, new_token: int) -> bool:
        """Decode 阶段:为序列追加一个新 token"""
        # 1. 定位当前写入位置
        current_len = len(seq.token_ids)  # 追加前的长度
        block_idx = current_len // self.block_size
        slot_offset = current_len % self.block_size
        
        # 2. 如果当前 block 已满,分配新 block
        if slot_offset == 0 and block_idx > 0:  # 新 block 起始
            if not self.free_block_ids:
                return False  # 需要触发 eviction
            new_block_id = self.free_block_ids.popleft()
            seq.block_table.append(new_block_id)
            self.blocks[new_block_id].ref_count = 1
            
        return True
    
    def free(self, seq: Sequence):
        """释放序列占用的 blocks(引用计数归零时回收)"""
        for block_id in seq.block_table:
            block = self.blocks[block_id]
            block.ref_count -= 1
            if block.ref_count == 0:
                # 1. 从哈希表中移除(如果该 block 是前缀缓存)
                for h, bid in list(self.hash_to_block_id.items()):
                    if bid == block_id:
                        del self.hash_to_block_id[h]
                # 2. 回收到空闲池
                self.free_block_ids.append(block_id)
        seq.block_table.clear()

内存布局可视化

GPU 显存布局(简化):
┌─────────────────────────────┐
│ 模型权重 (只读)              │
│ [Layer0] [Layer1] ...       │
├─────────────────────────────┤
│ KV Cache Pool (可写)         │
│ ┌─────┬─────┬─────┬─────┐   │
│ │ B0  │ B1  │ B2  │ B3  │   │ ← 每个 Block 固定大小
│ │[K/V]│[K/V]│[K/V]│[K/V]│   │    256 tokens × heads × dims
│ └─────┴─────┴─────┴─────┘   │
│ ...                         │
├─────────────────────────────┤
│ 中间激活值 (临时)            │
│ FlashAttention workspace    │
└─────────────────────────────┘

逻辑映射:
Sequence A: token[0:256] → Block0, token[256:512] → Block1
Sequence B: token[0:100] → Block2 (部分使用)
Shared Prefix: token[0:50] → Block0 (ref_count=2)

设计选择

优势

代价

固定 Block 大小

内存分配 O(1),无碎片

最后一个 block 可能有内部碎片(平均浪费 < 50%)

引用计数 + 哈希缓存

支持 Prefix Caching,相同 prompt 零重复计算

哈希计算 + 查表开销(约 1-2% 延迟)

预分配所有 Blocks

避免运行时 CUDA malloc 开销

启动时显存占用略高,但可预测

PagedAttention 的本质是用少量计算开销(哈希+查表)换取显存利用率和缓存命中率的大幅提升,在 batch size 大、prompt 重复率高的场景收益显著

两阶段调度:Prefill vs Decode

为什么需要区分阶段?

阶段

计算特征

优化目标

Prefill

处理整个 prompt,计算量 O(n²),但只需执行 1 次

最大化并行度,吃满 GPU

Decode

每次生成 1 个 token,计算量 O(n),但需执行多次

最小化延迟,减少 CPU-GPU 同步

Scheduler 核心逻辑

# nanovllm/engine/scheduler.py

class Scheduler:
    def __init__(self, config: Config):
        self.waiting = deque()    # 等待进入预处理的请求
        self.running = deque()    # 正在生成的请求
        self.swapped = deque()    # 被换出到 CPU 的请求(本简化版未实现)
        
        self.max_num_batched_tokens = config.max_num_batched_tokens  # 批处理上限
        self.max_num_seqs = config.max_num_seqs  # 最大并发序列数
        
    def add_request(self, seq: Sequence):
        """接收新请求"""
        self.waiting.append(seq)
        
    def schedule(self) -> Tuple[List[Sequence], bool]:
        """
        调度决策:选择下一批执行的序列
        Returns: (scheduled_seqs, is_prefill_phase)
        """
        scheduled_seqs = []
        
        # ── 策略 1: 优先处理 Prefill 阶段的新请求 ──
        # 原因: Prefill 计算量大,尽早完成可释放显存
        while self.waiting and self._can_schedule(scheduled_seqs, is_prefill=True):
            seq = self.waiting[0]
            
            # 1. 检查 KV Cache 是否足够
            if not self.block_manager.can_allocate(seq):
                break  # 显存不足,等待其他序列释放
                
            # 2. 分配资源并转移状态
            self.block_manager.allocate(seq)
            self.waiting.popleft()
            self.running.append(seq)
            scheduled_seqs.append(seq)
            seq.status = SequenceStatus.RUNNING_PREFILL
            
        if scheduled_seqs:
            return scheduled_seqs, True  # is_prefill=True
            
        # ── 策略 2: 处理 Decode 阶段的增量生成 ──
        # 原因: Decode 延迟敏感,需保证高优先级序列及时响应
        while self.running and self._can_schedule(scheduled_seqs, is_prefill=False):
            seq = self.running[0]
            
            # 1. 检查是否能追加新 token(block 是否满)
            if not self.block_manager.can_append(seq):
                # 尝试 evict 低优先级序列(简化版直接跳过)
                break
                
            scheduled_seqs.append(seq)
            seq.status = SequenceStatus.RUNNING_DECODE
            
        return scheduled_seqs, False  # is_prefill=False
    
    def _can_schedule(self, current_batch: List[Sequence], is_prefill: bool) -> bool:
        """检查是否还能添加更多序列到当前 batch"""
        if len(current_batch) >= self.max_num_seqs:
            return False
            
        # 计算当前 batch 的总 token 数
        total_tokens = sum(
            len(seq.token_ids) if is_prefill else 1  # Decode 阶段每序列只算 1 个新 token
            for seq in current_batch
        )
        
        # 预估新序列的 token 消耗
        new_seq = self.waiting[0] if is_prefill else self.running[0]
        new_tokens = len(new_seq.token_ids) if is_prefill else 1
        
        return total_tokens + new_tokens <= self.max_num_batched_tokens

动态批处理示例

时间线: t0 → t1 → t2 → t3

t0: 
  waiting: [ReqA(prompt=50t), ReqB(prompt=30t)]
  running: []
  → 调度: ReqA + ReqB 一起 Prefill (总 80t < max_16384)
  
t1:
  waiting: [ReqC(prompt=100t)]
  running: [ReqA(gen=1), ReqB(gen=1)]  # 进入 Decode
  → 调度: 优先 Decode(延迟敏感),ReqC 等待
  
t2:
  running: [ReqA(gen=10, done), ReqB(gen=10), ReqC(prompt=100t)]
  → ReqA 完成释放显存
  → 调度: ReqC 进入 Prefill + ReqB 继续 Decode (混合 batch)
  
t3:
  running: [ReqB(gen=50), ReqC(gen=1)]
  → 纯 Decode 阶段,小 batch 高频调度

关键优化:Continuous Batching

# ModelRunner 中处理混合 batch 的核心
def execute_model(self, seqs: List[Sequence], is_prefill: bool):
    # 1. 构建输入张量(处理变长序列)
    if is_prefill:
        # Prefill: 所有 token 一起处理 → 需 padding 或 varlen 支持
        input_ids = [seq.token_ids for seq in seqs]  # List[List[int]]
        positions = [list(range(len(ids))) for ids in input_ids]
        
        # 使用 cu_seqlens 支持变长序列 FlashAttention
        cu_seqlens = [0]
        for ids in input_ids:
            cu_seqlens.append(cu_seqlens[-1] + len(ids))
        context.cu_seqlens_q = torch.tensor(cu_seqlens, dtype=torch.int32, device="cuda")
        
    else:
        # Decode: 每个序列只取最后一个 token → 无需 padding
        input_ids = [[seq.token_ids[-1]] for seq in seqs]  # [[token], [token], ...]
        positions = [[len(seq.token_ids)-1] for seq in seqs]
        # 记录每个序列的 context 长度,用于 KV Cache 索引
        context.context_lens = torch.tensor(
            [len(seq.token_ids)-1 for seq in seqs], dtype=torch.int32, device="cuda"
        )
    
    # 2. 执行模型前向传播(FlashAttention 内部处理变长逻辑)
    output = self.model(input_ids, positions, is_prefill)
    
    # 3. 采样并更新序列状态
    for i, seq in enumerate(seqs):
        next_token = sample(output[i], seq.sampling_params)
        seq.append_token(next_token)
        
        # 检查是否生成结束
        if next_token == seq.sampling_params.eos_token_id:
            seq.status = SequenceStatus.FINISHED
            self.block_manager.free(seq)  # 立即释放 KV Cache

不等待所有序列完成,而是动态地将完成/新到达的请求插入执行流,最大化 GPU 利用率,降低平均延迟。

FlashAttention 集成:IO 感知的注意力计算

传统 Attention 的瓶颈

# 标准 Self-Attention (O(n²) 内存)
def naive_attention(q, k, v):
    scores = q @ k.transpose(-2, -1) / sqrt(d)  # [B, H, n, n] ← 显存爆炸!
    probs = softmax(scores, dim=-1)
    return probs @ v

问题:当序列长度 n=4096 时,score 矩阵需要 4096²×4bytes ≈ 64MB/头,多 head+batch 时显存不足。

FlashAttention 核心思想

分块计算 + 在线 softmax + 重计算,将内存复杂度从 O(n²) 降为 O(n):

传统: 计算完整 score 矩阵 → softmax → 加权求和
Flash: 
  for 每个 query 块:
    for 每个 key/value 块:
      1. 加载小块 K/V 到 SRAM
      2. 计算局部 score + softmax (在线更新全局统计量)
      3. 累加输出
      4. 丢弃小块 K/V (不保存完整 score 矩阵)

nano-vllm 中的 FlashAttention 封装

# nanovllm/layers/attention.py

class Attention(nn.Module):
    def __init__(self, config: Config):
        super().__init__()
        self.num_heads = config.num_attention_heads
        self.head_dim = config.hidden_size // self.num_heads
        self.block_size = config.block_size  # PagedAttention 的 block 大小
        
        # 线性层
        self.qkv_proj = nn.Linear(config.hidden_size, 3 * config.hidden_size)
        self.o_proj = nn.Linear(config.hidden_size, config.hidden_size)
        
    def forward(self, hidden_states: torch.Tensor, 
                context: ExecutionContext) -> torch.Tensor:
        # 1. 投影到 Q/K/V
        qkv = self.qkv_proj(hidden_states)  # [total_tokens, 3*hidden]
        q, k, v = qkv.chunk(3, dim=-1)
        q = q.view(-1, self.num_heads, self.head_dim)
        k = k.view(-1, self.num_heads, self.head_dim)
        v = v.view(-1, self.num_heads, self.head_dim)
        
        # 2. 根据阶段选择 FlashAttention 变体
        if context.is_prefill:
            # ── Prefill: 变长序列,使用 varlen FlashAttention ──
            # cu_seqlens: [0, len1, len1+len2, ...] 标记每个序列边界
            output = flash_attn_varlen_func(
                q, k, v,
                cu_seqlens_q=context.cu_seqlens_q,  # query 序列边界
                cu_seqlens_k=context.cu_seqlens_k,  # key 序列边界(通常相同)
                max_seqlen_q=context.max_seqlen_q,   # 最长序列长度
                max_seqlen_k=context.max_seqlen_k,
                causal=True,  # 自回归掩码
                softmax_scale=1.0 / (self.head_dim ** 0.5)
            )
        else:
            # ── Decode: 单 token 生成,使用 kvcache 优化版本 ──
            # 关键: k/v 不是传入的张量,而是从 PagedAttention 的 block 中读取
            output = flash_attn_with_kvcache(
                q.unsqueeze(1),  # [batch, 1, heads, dim]
                k_cache=self.k_cache,  # [num_blocks, 2, block_size, heads, dim]
                v_cache=self.v_cache,
                cache_seqlens=context.context_lens,  # 每个序列当前长度
                causal=True,
                softmax_scale=1.0 / (self.head_dim ** 0.5)
            )
            output = output.squeeze(1)  # [batch, heads, dim]
            
        # 3. 输出投影
        output = output.reshape(-1, self.num_heads * self.head_dim)
        return self.o_proj(output)

FlashAttention 与 PagedAttention 的协同

Decode 阶段数据流:
1. Scheduler 决定执行哪些序列
2. BlockManager 提供每个序列的 block_table + 当前写入位置
3. Attention.forward 调用 flash_attn_with_kvcache:
   a. 根据 cache_seqlens 计算每个 token 在哪个 block 的哪个 slot
   b. Triton kernel 按需加载对应的 K/V blocks 到 SRAM
   c. 分块计算 attention,避免加载整个 KV Cache
4. 输出新 token,更新 block_manager 的写入位置

关键协同点:
• PagedAttention 提供"逻辑地址 → 物理 block"的映射
• FlashAttention 提供"按需加载 + 分块计算"的执行引擎
• 两者结合实现: 大 context + 高并发 + 低显存占用

性能对比(理论)

方案

显存占用

计算效率

适用场景

标准 Attention

O(n²)

高(但 OOM 限制 n)

n < 2048

FlashAttention v1

O(n)

中(需重计算)

n ≤ 8192

FlashAttention v2 + Paged

O(n) + 分页开销

高(IO 优化 + 缓存复用)

n ≤ 32768+

nano-vllm 的选择:集成 flash-attn 库的 v2 版本,通过 flash_attn_varlen_funcflash_attn_with_kvcache 两个 API 分别覆盖 Prefill/Decode 场景,在保持代码简洁的同时获得工业级性能。

张量并行(Tensor Parallelism):多卡分布式推理

单卡显存限制(如 RTX 4090 24GB)无法加载大模型(如 Qwen-7B 需 ~14GB 权重 + KV Cache),需将模型切分到多卡

nano-vllm 的 TP 实现(简化版)

# nanovllm/engine/model_runner.py

class ModelRunner:
    def __init__(self, config: Config, rank: int, event: multiprocessing.Event):
        self.rank = rank
        self.world_size = config.tensor_parallel_size
        
        # 1. 初始化 NCCL 通信组
        if self.world_size > 1:
            dist.init_process_group(
                backend="nccl",
                init_method="tcp://localhost:2333",  # 简化:单机多卡
                world_size=self.world_size,
                rank=rank
            )
            torch.cuda.set_device(rank)  # 每张卡绑定一个 GPU
            
        # 2. 加载模型(按 rank 切分权重)
        self.model = self._load_sharded_model(config, rank)
        
        # 3. 等待所有进程就绪
        event.wait()
        
    def _load_sharded_model(self, config: Config, rank: int):
        """按列/行切分线性层权重"""
        model = Qwen3ForCausalLM(config)
        
        for name, module in model.named_modules():
            if isinstance(module, nn.Linear):
                weight = module.weight.data  # [out_features, in_features]
                
                # Column Parallel: 切分输出维度 (qkv_proj, o_proj 的输出)
                if self._is_column_parallel(name):
                    # 将 out_features 按 rank 切分
                    chunk_size = weight.shape[0] // self.world_size
                    start = rank * chunk_size
                    end = start + chunk_size if rank < self.world_size-1 else weight.shape[0]
                    module.weight.data = weight[start:end].clone()
                    module.out_features = chunk_size  # 更新元信息
                    
                # Row Parallel: 切分输入维度 (o_proj 的输入)
                elif self._is_row_parallel(name):
                    chunk_size = weight.shape[1] // self.world_size
                    start = rank * chunk_size
                    end = start + chunk_size if rank < self.world_size-1 else weight.shape[1]
                    module.weight.data = weight[:, start:end].clone()
                    
        return model.cuda()
    
    def forward(self, input_ids: torch.Tensor, positions: torch.Tensor, 
                is_prefill: bool) -> torch.Tensor:
        # 1. 本地前向传播(只计算本卡负责的部分)
        hidden = self.model(input_ids, positions, is_prefill)  # [tokens, local_hidden]
        
        # 2. 如果是 Row Parallel 层输出,需要 all-reduce 汇总
        if self._needs_all_reduce:
            dist.all_reduce(hidden, op=dist.ReduceOp.SUM)  # 多卡结果求和
            
        return hidden

张量并行切分示意图(以 QKV 投影为例)

原始 Linear: out=3072, in=2048
[████████████████████████] 3072 × 2048

TP=2 切分:
Rank 0: [████████████] 1536 × 2048  ← 计算 Q0,K0,V0
Rank 1: [████████████] 1536 × 2048  ← 计算 Q1,K1,V1

Attention 计算:
• 每个 rank 独立计算自己的 Q/K/V attention
• 输出拼接: [O0, O1] → 全连接层输入

Row Parallel (o_proj):
Rank 0: [████] 2048 × 1536  ← 处理 O0 部分
Rank 1: [████] 2048 × 1536  ← 处理 O1 部分
→ all-reduce 求和得到最终输出

nano-vllm 的 TP 实现是教学级简化,与工业版 vLLM 相比:

特性

nano-vllm

工业 vLLM

通信优化

基础 all-reduce

NCCL 异步 + 通信计算重叠

负载均衡

均匀切分

考虑层间差异的动态调度

容错

支持节点故障恢复

多节点

仅单机多卡

支持多机多卡(Ray 调度)

通过 50 行代码理解 TP 的核心思想——权重切分 + 通信同步

CUDA Graph:减少 CPU 调度开销

传统 PyTorch 执行流程:

CPU: 解析 Python 代码 → 构建计算图 → 启动 CUDA kernel → 等待完成 → 下一轮
     ↑ 每步都有 CPU-GPU 同步开销,Decode 阶段高频调用时成为瓶颈

UDA Graph 核心思想

将静态计算图捕获为 GPU 原生指令序列,后续执行时 CPU 只需触发一次 launch:

首次执行(Capture 阶段):
CPU: 记录所有 kernel launch + 内存操作 → 生成 CUDA Graph

后续执行(Replay 阶段):
CPU: graph.replay()  ← 单次系统调用
GPU: 按预录指令流执行,无 CPU 干预

nano-vllm 中的 CUDA Graph 集成

# nanovllm/engine/model_runner.py

class ModelRunner:
    def __init__(self, config: Config, ...):
        self.enforce_eager = config.enforce_eager
        self.cuda_graphs = {}  # batch_size → graph
        
    def capture_cudagraph(self, batch_size: int):
        """捕获指定 batch size 的静态计算图"""
        # 1. 准备静态输入(占位符,实际推理时替换)
        input_ids = torch.zeros(batch_size, 1, dtype=torch.long, device="cuda")
        positions = torch.zeros(batch_size, 1, dtype=torch.long, device="cuda")
        
        # 2. 创建 CUDA Graph
        self.cuda_graphs[batch_size] = torch.cuda.CUDAGraph()
        with torch.cuda.graph(self.cuda_graphs[batch_size]):
            static_output = self.model(input_ids, positions, is_prefill=False)
        
        # 3. 保存静态张量引用(用于 replay 时更新输入)
        self.static_inputs[batch_size] = {"input_ids": input_ids, "positions": positions}
        self.static_outputs[batch_size] = static_output
        
    def run_model(self, input_ids: torch.Tensor, positions: torch.Tensor, 
                  is_prefill: bool) -> torch.Tensor:
        batch_size = input_ids.shape[0]
        
        # 小 batch decode + 非 eager 模式 → 尝试复用 CUDA Graph
        if not is_prefill and not self.enforce_eager and batch_size in self.cuda_graphs:
            # 1. 更新静态输入张量的内容(零拷贝)
            self.static_inputs[batch_size]["input_ids"].copy_(input_ids)
            self.static_inputs[batch_size]["positions"].copy_(positions)
            
            # 2. 重放计算图(CPU 开销 ~10μs vs 传统 ~100μs)
            self.cuda_graphs[batch_size].replay()
            
            # 3. 返回静态输出(内容已更新)
            return self.static_outputs[batch_size].clone()  # 避免后续修改污染
        else:
            # Prefill 或大 batch → 动态执行(无法捕获静态图)
            return self.model(input_ids, positions, is_prefill)

适用条件与收益

条件

是否适用 CUDA Graph

原因

✅ batch size 固定

计算图结构不变

✅ Decode 阶段

每次输入 1 token,计算模式稳定

❌ Prefill 阶段

序列长度可变,图结构动态

❌ 动态 control flow

if/while 导致图结构变化

协同工作全景

Logo

昇腾计算产业是基于昇腾系列(HUAWEI Ascend)处理器和基础软件构建的全栈 AI计算基础设施、行业应用及服务,https://devpress.csdn.net/organization/setting/general/146749包括昇腾系列处理器、系列硬件、CANN、AI计算框架、应用使能、开发工具链、管理运维工具、行业应用及服务等全产业链

更多推荐