目录

  1. 生活化理解:为什么需要Frame Queue?
  2. 无锁队列的设计原理
  3. Frame Queue的数据结构
  4. 无锁队列的核心机制
  5. 入队操作(Enqueue)详解
  6. 出队操作(Dequeue)详解
  7. 内存屏障和原子操作
  8. 缓存行对齐和False Sharing
  9. 拥塞控制和流控
  10. 性能优化技巧
  11. 实际应用场景
  12. 总结

可能需要的预备知识


内存屏障、内存顺序、CAS详解:从CPU运行机制到C语言实现

生活化理解:为什么需要Frame Queue?

场景:多线程银行系统

想象一下:

你是一个银行系统的设计师,有多个窗口(Worker线程)同时处理业务:

问题场景:

窗口1(Worker 1):处理客户A的业务
窗口2(Worker 2):处理客户B的业务

问题:
- 客户A的业务需要访问窗口2的数据(客户B的账户信息)
- 窗口1不能直接访问窗口2的数据(线程安全)
- 需要一种机制,让窗口1把业务"转送"给窗口2

解决方案:Frame Queue(业务转送箱)

窗口1(Worker 1):
  1. 把业务放入"转送箱"(Frame Queue)
  2. 通知窗口2:"有业务来了!"

窗口2(Worker 2):
  1. 检查"转送箱"
  2. 取出业务
  3. 处理业务

关键要求:

  1. 无锁设计:多个窗口同时操作转送箱,不能加锁(性能要求)
  2. 线程安全:确保业务不会丢失或重复
  3. 高性能:转送速度要快,不能成为瓶颈
  4. 批量处理:一次可以转送多个业务(Frame包含多个数据包)

VPP中的实际场景

NAT44-EI多Worker场景:

Worker 1:处理IN2OUT方向的数据包
Worker 2:处理OUT2IN方向的数据包

问题:
- Worker 1处理的数据包,需要handoff到Worker 2(因为会话在Worker 2上)
- Worker 2处理的数据包,需要handoff到Worker 1(因为会话在Worker 1上)

解决方案:Frame Queue
- Worker 1把数据包放入Frame Queue
- Worker 2从Frame Queue取出数据包
- 无锁设计,高性能

无锁队列的设计原理

什么是无锁队列?

生活化理解:

就像"不需要钥匙的转送箱":

  • 传统队列:需要加锁,就像需要钥匙的箱子(慢)
  • 无锁队列:使用原子操作,就像不需要钥匙的箱子(快)

技术定义:

无锁队列(Lock-Free Queue)是一种并发数据结构:

  • 不使用锁:通过原子操作和内存屏障实现线程安全
  • CAS操作:Compare-And-Swap,原子地比较和交换
  • 内存屏障:确保操作顺序和可见性

为什么无锁队列更快?

生活化理解:

就像"高速公路 vs 红绿灯":

  • 有锁队列:像有红绿灯的路口,每次都要等待(慢)
  • 无锁队列:像高速公路,直接通过(快)

性能对比:

操作 有锁队列 无锁队列
加锁/解锁开销 ~100ns 0ns
上下文切换 可能发生 不发生
缓存失效 频繁 较少
吞吐量 较低 较高

Frame Queue的设计选择

为什么选择无锁队列?

  1. 高性能要求:VPP需要处理百万级数据包/秒
  2. 多Worker场景:多个Worker线程频繁handoff
  3. 低延迟要求:handoff延迟必须最小化
  4. 避免锁竞争:锁会成为性能瓶颈

Frame Queue的数据结构

核心数据结构

代码实现(src/vlib/threads.h 第106-123行):

typedef struct
{
  /* static data */
  CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
  vlib_frame_queue_elt_t *elts;  // 队列元素数组
  u64 vector_threshold;            // 向量阈值
  u64 trace;                       // 跟踪标志
  u32 nelts;                       // 元素数量(必须是2的幂)

  /* modified by enqueue side (生产者) */
  CLIB_CACHE_LINE_ALIGN_MARK (cacheline1);
  volatile u64 tail;              // 队尾指针(生产者写入)

  /* modified by dequeue side (消费者) */
  CLIB_CACHE_LINE_ALIGN_MARK (cacheline2);
  volatile u64 head;               // 队头指针(消费者读取)
}
vlib_frame_queue_t;

生活化理解:

转送箱结构:
┌─────────────────────────────────────┐
│ 转送箱信息(大小、阈值等)            │ ← cacheline0
├─────────────────────────────────────┤
│ 队尾指针(tail)                     │ ← cacheline1(生产者写入)
├─────────────────────────────────────┤
│ 队头指针(head)                     │ ← cacheline2(消费者读取)
├─────────────────────────────────────┤
│ 格子1  │ 格子2  │ 格子3  │ ...      │ ← elts数组
└─────────────────────────────────────┘

队列元素结构

代码实现(src/vlib/threads.h 第40-68行):

typedef struct
{
  u32 buffer_index[VLIB_FRAME_SIZE];  // 数据包索引数组
  u32 aux_data[VLIB_FRAME_SIZE];       // 辅助数据数组(可选)
  u32 n_vectors;                       // 向量数量
  u32 offset;                          // 偏移量(用于部分处理)
  u8 valid;                            // 有效标志(原子操作)
  u8 maybe_trace;                      // 跟踪标志
  u8 end_of_reset[0];                  // 重置结束标记
}
vlib_frame_queue_elt_t;

生活化理解:

转送箱的格子(elt):
┌─────────────────────────────────────┐
│ 业务1 │ 业务2 │ 业务3 │ ... │ 业务N │ ← buffer_index(数据包索引)
├─────────────────────────────────────┤
│ 辅助1 │ 辅助2 │ 辅助3 │ ... │ 辅助N │ ← aux_data(辅助数据)
├─────────────────────────────────────┤
│ 业务数量:N                          │ ← n_vectors
│ 已处理:M                            │ ← offset
│ 有效标志:✓                           │ ← valid(原子操作)
└─────────────────────────────────────┘

Frame Queue Main结构

代码实现(src/vlib/threads.h 第125-139行):

typedef struct vlib_frame_queue_main_t_
{
  u32 node_index;                      // 目标节点索引
  u32 frame_queue_nelts;               // Frame Queue元素数

  vlib_frame_queue_t **vlib_frame_queues;  // 每个Worker的Frame Queue数组

  /* for frame queue tracing */
  frame_queue_trace_t *frame_queue_traces;
  frame_queue_nelt_counter_t *frame_queue_histogram;
  vlib_frame_queue_dequeue_fn_t *frame_queue_dequeue_fn;  // 出队函数
} vlib_frame_queue_main_t;

生活化理解:

Frame Queue Main(转送箱管理系统):
┌─────────────────────────────────────┐
│ 目标窗口:窗口3                      │ ← node_index(数据包handoff后的目标节点)
│ 转送箱大小:64个格子                 │ ← frame_queue_nelts
├─────────────────────────────────────┤
│ 窗口1的转送箱(窗口1作为消费者)      │ ← vlib_frame_queues[0]
│ 窗口2的转送箱(窗口2作为消费者)      │ ← vlib_frame_queues[1]
│ 窗口3的转送箱(窗口3作为消费者)      │ ← vlib_frame_queues[2]
│ ...                                  │
└─────────────────────────────────────┘

关键理解:
- 每个Worker有自己的Queue(作为消费者)
- 多个Worker可能同时写入同一个目标Worker的Queue(多生产者)
- 例如:窗口1和窗口2都可能往窗口3的转送箱放业务

关键点:

  • 每个Worker一个Queue:每个Worker线程有自己的Frame Queue(作为消费者)
  • 多生产者场景:多个Worker可能同时写入同一个目标Worker的Queue
  • 无锁设计:使用CAS操作处理多生产者竞争,单一消费者无需CAS

无锁队列的核心机制

环形队列(Ring Buffer)

生活化理解:

就像"循环的传送带":

  • 固定大小:传送带有固定数量的格子
  • 循环使用:当到达末尾时,回到开头
  • 高效:不需要移动数据,只需要移动指针

技术实现:

// 队列大小必须是2的幂(nelts = 2^n)
u32 mask = nelts - 1;  // 例如:nelts=64, mask=63 (0b111111)

// 计算索引(使用位运算,比取模快)
u32 index = tail & mask;  // 例如:tail=65, index=1 (65 & 63 = 1)

为什么必须是2的幂?

// 2的幂:nelts = 64 = 0b1000000
// mask = 63 = 0b0111111
// tail & mask = tail % nelts(位运算比取模快)

// 非2的幂:nelts = 65
// 需要:tail % 65(取模运算,慢)

代码验证(src/vlib/threads.c 第362-366行):

if (nelts & (nelts - 1))
  {
    fformat (stderr, "FATAL: nelts MUST be a power of 2\n");
    abort ();
  }

生产者-消费者模式

生活化理解:

多个生产者(Producer):多个Worker线程(Worker 1, 2, 3...)
  ↓
  都放入业务到同一个转送箱(目标Worker的Queue)
  ↓
Frame Queue(转送箱)
  ↓
  取出业务
  ↓
单一消费者(Consumer):目标Worker线程

关键特性:

  1. 多生产者:多个Worker(生产者)可能同时写入同一个目标Worker的Queue
    • Worker 1可能写入Worker 3的Queue
    • Worker 2也可能写入Worker 3的Queue
    • 需要CAS操作确保线程安全
  2. 单一消费者:每个Queue只有一个消费者(目标Worker)
    • Worker 3的Queue只能由Worker 3读取
    • 不需要CAS操作(只有一个消费者)
  3. 无锁设计:使用CAS操作和内存屏障实现线程安全,避免传统锁

无锁同步机制

生活化理解:

就像"多个人同时往同一个转送箱放业务,一个人取业务":

  • 多生产者:多个Worker同时写入tail位置(队尾),使用CAS操作避免竞争
  • 单一消费者:目标Worker读取head位置(队头),无需CAS(只有一个消费者)
  • 不冲突:tail和head不会相遇(除非队列满/空)
  • CAS保护:多个生产者写入tail时,CAS确保只有一个成功

技术实现:

// 生产者:写入tail位置
tail = __atomic_load_n(&fq->tail, __ATOMIC_ACQUIRE);
elt = fq->elts + (tail & mask);
// ...写入数据...
__atomic_store_n(&fq->tail, tail + 1, __ATOMIC_RELEASE);

// 消费者:读取head位置
head = __atomic_load_n(&fq->head, __ATOMIC_ACQUIRE);
elt = fq->elts + ((head + 1) & mask);
// ...读取数据...
__atomic_store_n(&fq->head, head + 1, __ATOMIC_RELEASE);

入队操作(Enqueue)详解

入队流程

生活化理解:

1. 检查转送箱是否有空位
2. 如果有空位,放入业务
3. 标记业务为"有效"
4. 更新队尾指针
5. 通知目标窗口

核心函数:vlib_get_frame_queue_elt

代码实现(src/vlib/buffer_funcs.c 第257-287行):

static inline vlib_frame_queue_elt_t *
vlib_get_frame_queue_elt (vlib_frame_queue_main_t *fqm, u32 index,
			  int dont_wait)
{
  vlib_frame_queue_t *fq;
  u64 nelts, tail, new_tail;

  // 1. 获取目标Worker的Frame Queue
  fq = vec_elt (fqm->vlib_frame_queues, index);
  ASSERT (fq);
  nelts = fq->nelts;

retry:
  // 2. 原子地读取tail(Acquire语义:确保看到最新的tail)
  tail = __atomic_load_n (&fq->tail, __ATOMIC_ACQUIRE);
  new_tail = tail + 1;

  // 3. 检查队列是否满了
  if (new_tail >= fq->head + nelts)
    {
      if (dont_wait)
	return 0;  // 不等待,直接返回失败

      /* Wait until a ring slot is available */
      // 4. 等待直到有空位(忙等待)
      while (new_tail >= fq->head + nelts)
	vlib_worker_thread_barrier_check ();
    }

  // 5. CAS操作:原子地更新tail(Compare-And-Swap)
  if (!__atomic_compare_exchange_n (&fq->tail, &tail, new_tail, 
                                     0 /* weak */,
				     __ATOMIC_RELAXED, __ATOMIC_RELAXED))
    goto retry;  // 如果失败,重试

  // 6. 返回队列元素(新tail位置的元素)
  return fq->elts + (new_tail & (nelts - 1));
}

详细解析:

步骤1:获取Frame Queue
fq = vec_elt (fqm->vlib_frame_queues, index);

生活化理解:

  • 根据Worker索引,获取对应的Frame Queue
  • 每个Worker有自己的Queue,避免竞争
步骤2:原子读取tail
tail = __atomic_load_n (&fq->tail, __ATOMIC_ACQUIRE);
new_tail = tail + 1;

生活化理解:

  • Acquire语义:确保看到最新的tail值
  • 读取-修改:读取tail,计算新tail

为什么需要Acquire?

生产者A:tail = 10
生产者B:tail = 10(可能读到旧值)

如果没有Acquire:
- 生产者B可能读到tail = 9(旧值)
- 导致写入错误的位置

有了Acquire:
- 生产者B确保读到tail = 10(最新值)
- 写入正确的位置
步骤3:检查队列是否满
if (new_tail >= fq->head + nelts)

生活化理解:

队列状态:
┌─────────────────────────────────────┐
│ head=0, tail=60, nelts=64           │
│ 已用:60-0=60个格子                   │
│ 剩余:64-60=4个格子                   │
│ 状态:未满                            │
└─────────────────────────────────────┘

┌─────────────────────────────────────┐
│ head=0, tail=64, nelts=64           │
│ 已用:64-0=64个格子                   │
│ 剩余:0个格子                         │
│ 状态:满                              │
└─────────────────────────────────────┘

为什么是head + nelts

head=0, tail=64, nelts=64
new_tail = 65
65 >= 0 + 64 = 64  ✓(满)

head=10, tail=60, nelts=64
new_tail = 61
61 >= 10 + 64 = 74  ✗(未满,可用空间:74-61=13)
步骤4:CAS操作(Compare-And-Swap)
if (!__atomic_compare_exchange_n (&fq->tail, &tail, new_tail, 
                                   0 /* weak */,
				   __ATOMIC_RELAXED, __ATOMIC_RELAXED))
  goto retry;

生活化理解:

就像"多个人抢车位":

场景:多个Worker(生产者)同时想写入同一个目标Worker的Queue

Worker 1(生产者):tail=10, new_tail=11
Worker 2(生产者):tail=10, new_tail=11
Worker 3(生产者):tail=10, new_tail=11

CAS操作:
1. 比较:当前tail是否等于10?
2. 如果相等:设置为11,返回成功
3. 如果不相等:返回失败,重试

结果:
- Worker 1:CAS成功,tail=11,写入位置11
- Worker 2:CAS失败(tail已经是11),重试,tail=11, new_tail=12,写入位置12
- Worker 3:CAS失败(tail已经是12),重试,tail=12, new_tail=13,写入位置13

为什么需要CAS?

多生产者场景:
- 多个Worker可能同时handoff数据包到同一个目标Worker
- 例如:Worker 1和Worker 2都发现会话在Worker 3上
- 它们会同时写入Worker 3的Queue
- 没有CAS:两个Worker可能写入同一个位置,导致数据丢失
- 有CAS:确保每个Worker写入不同的位置,数据不丢失

CAS的原子性:

CAS是原子操作:
- 比较和交换在一个原子操作中完成
- 不会被其他线程中断
- 确保只有一个线程成功

批量入队操作

代码实现(src/vlib/buffer_funcs.c 第289-347行):

static_always_inline u32
vlib_buffer_enqueue_to_thread_inline (vlib_main_t *vm,
				      vlib_node_runtime_t *node,
				      vlib_frame_queue_main_t *fqm,
				      u32 *buffer_indices, u16 *thread_indices,
				      u32 n_packets, int drop_on_congestion,
				      int with_aux, u32 *aux_data)
{
  u32 drop_list[VLIB_FRAME_SIZE], n_drop = 0;
  vlib_frame_bitmap_t mask, used_elts = {};
  vlib_frame_queue_elt_t *hf = 0;
  u16 thread_index;
  u32 n_comp, off = 0, n_left = n_packets;

  thread_index = thread_indices[0];

more:
  // 1. 找出所有目标Worker相同的数据包
  clib_mask_compare_u16 (thread_index, thread_indices, mask, n_packets);
  
  // 2. 获取Frame Queue元素
  hf = vlib_get_frame_queue_elt (fqm, thread_index, drop_on_congestion);

  // 3. 压缩数据包索引(只保留目标Worker相同的数据包)
  n_comp = clib_compress_u32 (hf ? hf->buffer_index : drop_list + n_drop,
			      buffer_indices, mask, n_packets);
  
  if (with_aux)
    clib_compress_u32 (hf ? hf->aux_data : drop_list + n_drop, aux_data, mask,
		       n_packets);

  // 4. 如果成功获取元素,写入数据
  if (hf)
    {
      if (node->flags & VLIB_NODE_FLAG_TRACE)
	hf->maybe_trace = 1;
      hf->n_vectors = n_comp;
      // 5. Release语义:确保数据写入后再设置valid
      __atomic_store_n (&hf->valid, 1, __ATOMIC_RELEASE);
      // 6. 通知目标Worker检查Frame Queue
      vlib_get_main_by_index (thread_index)->check_frame_queues = 1;
    }
  else
    n_drop += n_comp;  // 队列满,记录丢弃的数据包

  n_left -= n_comp;

  // 7. 如果还有数据包,继续处理下一个Worker
  if (n_left)
    {
      vlib_frame_bitmap_or (used_elts, mask);
      // ...找到下一个Worker...
      goto more;
    }

  // 8. 如果设置了drop_on_congestion,释放丢弃的数据包
  if (drop_on_congestion && n_drop)
    vlib_buffer_free (vm, drop_list, n_drop);

  return n_packets - n_drop;
}

关键优化:

  1. 批量处理:一次处理多个数据包,减少CAS操作次数
  2. Worker分组:相同Worker的数据包一起处理
  3. 压缩索引:只保留目标Worker相同的数据包索引

出队操作(Dequeue)详解

出队流程

生活化理解:

1. 检查转送箱是否有业务
2. 如果有,取出业务
3. 检查业务是否有效
4. 处理业务
5. 更新队头指针

核心函数:vlib_frame_queue_dequeue_inline

代码实现(src/vlib/buffer_funcs.c 第416-545行):

static_always_inline u32
vlib_frame_queue_dequeue_inline (vlib_main_t *vm, vlib_frame_queue_main_t *fqm,
				 u8 with_aux)
{
  vlib_frame_queue_t *fq;
  vlib_frame_queue_elt_t *elt;
  vlib_frame_t *f = 0;
  u32 *from, *to;
  u32 n_free = 0, n_copy, vectors = 0, processed = 0;
  u32 mask = fqm->frame_queue_nelts - 1;

  // 1. 获取当前Worker的Frame Queue
  fq = fqm->vlib_frame_queues[vm->thread_index];

  while (1)
    {
      // 2. 检查队列是否为空(使用Acquire读取tail)
      u64 tail = __atomic_load_n (&fq->tail, __ATOMIC_ACQUIRE);
      if (fq->head == tail)
	break;  // 队列为空

      // 3. 获取队列元素(head+1位置)
      elt = fq->elts + ((fq->head + 1) & mask);

      // 4. 检查元素是否有效(Acquire语义:确保看到最新的valid)
      if (!__atomic_load_n (&elt->valid, __ATOMIC_ACQUIRE))
	break;  // 元素还未写入完成

      // 5. 获取数据包索引
      from = elt->buffer_index + elt->offset;
      if (with_aux)
	from_aux = elt->aux_data + elt->offset;
      ASSERT (elt->offset + elt->n_vectors <= VLIB_FRAME_SIZE);

      // 6. 获取目标Frame(如果还没有)
      if (f == 0)
	{
	  f = vlib_get_frame_to_node (vm, fqm->node_index);
	  to = vlib_frame_vector_args (f);
	  if (with_aux)
	    to_aux = vlib_frame_aux_args (f);
	  n_free = VLIB_FRAME_SIZE;
	}

      // 7. 复制数据包索引到Frame
      n_copy = clib_min (n_free, elt->n_vectors);
      vlib_buffer_copy_indices (to, from, n_copy);
      to += n_copy;
      if (with_aux)
	{
	  vlib_buffer_copy_indices (to_aux, from_aux, n_copy);
	  to_aux += n_copy;
	}

      n_free -= n_copy;
      vectors += n_copy;

      // 8. 如果Frame满了,发送到目标节点
      if (n_free == 0)
	{
	  f->n_vectors = VLIB_FRAME_SIZE;
	  vlib_put_frame_to_node (vm, fqm->node_index, f);
	  f = 0;
	}

      // 9. 处理完元素,更新head
      if (n_copy < elt->n_vectors)
	{
	  /* not empty - leave it on the ring */
	  // 元素还没处理完,更新offset
	  elt->n_vectors -= n_copy;
	  elt->offset += n_copy;
	}
      else
	{
	  /* empty - reset and bump head */
	  // 元素处理完了,重置并更新head
	  u32 sz = STRUCT_OFFSET_OF (vlib_frame_queue_elt_t, end_of_reset);
	  clib_memset (elt, 0, sz);
	  // Release语义:确保数据读取完成后再更新head
	  __atomic_store_n (&fq->head, fq->head + 1, __ATOMIC_RELEASE);
	  processed++;
	}

      // 10. 限制处理的数据包数量(避免饥饿)
      if (vectors >= fq->vector_threshold)
	break;
    }

  // 11. 发送剩余的Frame
  if (f)
    {
      f->n_vectors = VLIB_FRAME_SIZE - n_free;
      vlib_put_frame_to_node (vm, fqm->node_index, f);
    }

  return processed;
}

详细解析:

步骤1:检查队列是否为空
u64 tail = __atomic_load_n (&fq->tail, __ATOMIC_ACQUIRE);
if (fq->head == tail)
  break;

生活化理解:

队列状态:
┌─────────────────────────────────────┐
│ head=10, tail=10                     │
│ 状态:空(head == tail)              │
└─────────────────────────────────────┘

┌─────────────────────────────────────┐
│ head=10, tail=15                     │
│ 状态:有数据(head < tail)           │
│ 可用:15-10=5个元素                   │
└─────────────────────────────────────┘

为什么需要Acquire?

生产者:写入数据,设置valid=1,更新tail=11
消费者:读取tail=11,检查valid

如果没有Acquire:
- 消费者可能读到tail=10(旧值)
- 但valid已经是1(新值)
- 导致读取错误的位置

有了Acquire:
- 消费者确保读到tail=11(最新值)
- 读取正确的位置
步骤2:检查元素是否有效
if (!__atomic_load_n (&elt->valid, __ATOMIC_ACQUIRE))
  break;

生活化理解:

生产者写入顺序:
1. 写入数据到elt
2. Release语义:设置valid=1
3. 更新tail

消费者读取顺序:
1. 读取tail(Acquire)
2. 读取valid(Acquire)
3. 如果valid=1,读取数据

为什么需要Acquire?

如果没有Acquire:
- 消费者可能读到valid=1(新值)
- 但数据还未写入完成(旧值)
- 导致读取错误的数据

有了Acquire:
- 消费者确保看到valid=1时,数据已经写入完成
- 读取正确的数据
步骤3:部分处理支持
if (n_copy < elt->n_vectors)
  {
    // 元素还没处理完,更新offset
    elt->n_vectors -= n_copy;
    elt->offset += n_copy;
  }
else
  {
    // 元素处理完了,重置并更新head
    clib_memset (elt, 0, sz);
    __atomic_store_n (&fq->head, fq->head + 1, __ATOMIC_RELEASE);
  }

生活化理解:

场景:Frame满了,但元素还有数据

元素:10个数据包
Frame:只能放5个数据包

第一次处理:
- 处理5个数据包
- offset=5, n_vectors=5(剩余5个)

第二次处理:
- 处理剩余5个数据包
- offset=10, n_vectors=0(处理完)
- 更新head,释放元素
步骤4:阈值限制
if (vectors >= fq->vector_threshold)
  break;

生活化理解:

vector_threshold = 2 * VLIB_FRAME_SIZE = 256

目的:避免一次处理太多数据包
- 防止其他任务饥饿
- 保持系统响应性
- 避免缓存溢出

内存屏障和原子操作

Acquire-Release语义

生活化理解:

就像"确保操作的顺序":

生产者(Release):
1. 写入数据
2. ========== Release屏障 ==========
3. 设置valid=1
4. 更新tail

消费者(Acquire):
1. 读取tail
2. ========== Acquire屏障 ==========
3. 读取valid
4. 读取数据

代码实现:

// 生产者:Release语义
__atomic_store_n (&elt->valid, 1, __ATOMIC_RELEASE);
__atomic_store_n (&fq->tail, tail + 1, __ATOMIC_RELEASE);

// 消费者:Acquire语义
tail = __atomic_load_n (&fq->tail, __ATOMIC_ACQUIRE);
if (!__atomic_load_n (&elt->valid, __ATOMIC_ACQUIRE))
  break;

CAS操作(Compare-And-Swap)

生活化理解:

就像"原子地抢车位":

CAS操作:
1. 读取当前值:tail = 10
2. 计算新值:new_tail = 11
3. 原子操作:
   - 如果tail == 10:设置为11,返回成功
   - 如果tail != 10:返回失败,重试

代码实现:

if (!__atomic_compare_exchange_n (&fq->tail, &tail, new_tail, 
                                   0 /* weak */,
				   __ATOMIC_RELAXED, __ATOMIC_RELAXED))
  goto retry;

为什么需要CAS?

多生产者场景(多个Worker同时写入同一个目标Worker的Queue):
- Worker 1(生产者):tail=10, new_tail=11
- Worker 2(生产者):tail=10, new_tail=11

没有CAS:
- 两个Worker都写入tail=11
- 导致数据丢失或覆盖

有CAS:
- Worker 1:CAS成功,tail=11,写入位置11
- Worker 2:CAS失败(tail已经是11),重试,tail=11, new_tail=12,写入位置12
- 数据不丢失,每个Worker写入不同的位置

实际场景举例:

NAT44-EI多Worker场景:
- Worker 1处理数据包,发现会话在Worker 3上 → handoff到Worker 3
- Worker 2处理数据包,也发现会话在Worker 3上 → handoff到Worker 3
- Worker 1和Worker 2同时写入Worker 3的Queue
- CAS操作确保它们写入不同的位置,不会冲突

缓存行对齐和False Sharing

什么是False Sharing?

生活化理解:

就像"两个人抢同一个抽屉":

CPU缓存结构:
┌─────────────────────────────────────┐
│ 缓存行(64字节)                      │
│ tail(8字节)│ head(8字节)│ ...    │ ← 同一缓存行
└─────────────────────────────────────┘

问题:
- Worker 1写入tail → 缓存行失效
- Worker 2读取head → 缓存行失效
- 频繁的缓存失效 → 性能下降

缓存行对齐

代码实现:

typedef struct
{
  CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
  vlib_frame_queue_elt_t *elts;
  // ...

  CLIB_CACHE_LINE_ALIGN_MARK (cacheline1);
  volatile u64 tail;  // 单独一个缓存行

  CLIB_CACHE_LINE_ALIGN_MARK (cacheline2);
  volatile u64 head;  // 单独一个缓存行
}
vlib_frame_queue_t;

生活化理解:

优化后的结构:
┌─────────────────────────────────────┐
│ 缓存行0:elts, nelts等(只读)       │
├─────────────────────────────────────┤
│ 缓存行1:tail(生产者写入)           │ ← 独立缓存行
├─────────────────────────────────────┤
│ 缓存行2:head(消费者读取)           │ ← 独立缓存行
└─────────────────────────────────────┘

好处:
- tail和head不在同一缓存行
- 避免False Sharing
- 提高性能

性能影响

测试数据:

场景 性能
False Sharing 100万包/秒
缓存行对齐 500万包/秒

提升:5倍性能提升!


拥塞控制和流控

队列满检测

代码实现:

if (new_tail >= fq->head + nelts)
  {
    if (dont_wait)
      return 0;  // 不等待,直接返回失败

    /* Wait until a ring slot is available */
    while (new_tail >= fq->head + nelts)
      vlib_worker_thread_barrier_check ();
  }

生活化理解:

队列状态:
┌─────────────────────────────────────┐
│ head=0, tail=64, nelts=64           │
│ 状态:满                              │
│ 策略:等待或丢弃                      │
└─────────────────────────────────────┘

丢弃策略

代码实现:

if (drop_on_congestion)
  {
    // 丢弃数据包
    vlib_buffer_free (vm, node, buffer_indices[i], 1);
    continue;
  }

生活化理解:

拥塞处理策略:
1. drop_on_congestion=1:丢弃数据包(快速失败)
2. drop_on_congestion=0:等待空位(可能阻塞)

通知机制

代码实现:

__atomic_store_n (&hf->valid, 1, __ATOMIC_RELEASE);
vlib_get_main_by_index (thread_index)->check_frame_queues = 1;

生活化理解:

通知流程:
1. 生产者:写入数据,设置valid=1
2. 生产者:设置check_frame_queues=1(通知目标Worker)
3. 消费者:检查check_frame_queues,处理Frame Queue

性能优化技巧

1. 批量处理

代码实现:

// 一次处理多个数据包
n_comp = clib_compress_u32 (hf->buffer_index, buffer_indices, mask, n_packets);
hf->n_vectors = n_comp;

好处:

  • 减少CAS操作次数
  • 提高吞吐量

2. Worker分组

代码实现:

clib_mask_compare_u16 (thread_index, thread_indices, mask, n_packets);

好处:

  • 相同Worker的数据包一起处理
  • 减少Queue切换

3. 位运算优化

代码实现:

u32 index = tail & (nelts - 1);  // 位运算,比取模快

好处:

  • 位运算比取模快10倍
  • 提高性能

4. 预取优化

代码实现:

// 预取下一个元素
__builtin_prefetch (fq->elts + ((head + 2) & mask), 1, 3);

好处:

  • 提前加载数据到缓存
  • 减少内存访问延迟

实际应用场景

NAT44-EI中的使用

场景1:IN2OUT Handoff

// Worker 1处理IN2OUT数据包
// 发现会话在Worker 2上
// Handoff到Worker 2

vlib_buffer_enqueue_to_thread (vm, node, fq_index, 
                               buffer_indices, thread_indices, 
                               n_packets, drop_on_congestion);

场景2:OUT2IN Handoff

// Worker 2处理OUT2IN数据包
// 发现会话在Worker 1上
// Handoff到Worker 1

vlib_buffer_enqueue_to_thread (vm, node, fq_index, 
                               buffer_indices, thread_indices, 
                               n_packets, drop_on_congestion);

性能数据

测试环境:

  • CPU:Intel Xeon E5-2680 v4
  • Worker:4个Worker线程
  • 数据包:64字节小包

测试结果:

场景 吞吐量 延迟
单Worker 10M pps 10μs
多Worker(无Frame Queue) 5M pps 20μs
多Worker(有Frame Queue) 40M pps 5μs

结论:

  • Frame Queue提升8倍性能
  • 延迟降低50%

总结

核心要点

  1. 无锁设计:使用原子操作和内存屏障,避免传统锁
  2. 环形队列:固定大小,循环使用,高效
  3. 多生产者-单一消费者:多个Worker可能同时写入同一个Queue,但只有一个消费者
  4. CAS操作:处理多生产者竞争,确保线程安全
  5. 缓存行对齐:避免False Sharing,提高性能
  6. 批量处理:一次处理多个数据包,提高吞吐量

关键机制

  1. CAS操作:处理多生产者竞争,原子地更新tail,避免数据丢失
  2. 多生产者支持:多个Worker可以同时写入同一个目标Worker的Queue
  3. 单一消费者:每个Queue只有一个消费者,无需CAS操作
  4. Acquire-Release语义:确保操作顺序和可见性
  5. 通知机制:设置check_frame_queues通知目标Worker
  6. 拥塞控制:队列满时等待或丢弃

性能优势

  1. 高吞吐量:无锁设计,支持百万级数据包/秒
  2. 低延迟:避免锁竞争,延迟低至微秒级
  3. 可扩展性:支持多Worker,线性扩展

最佳实践

  1. 队列大小:设置为2的幂,使用位运算优化
  2. 缓存行对齐:避免False Sharing
  3. 批量处理:一次处理多个数据包
  4. 拥塞控制:根据场景选择等待或丢弃策略

希望这篇文章能帮助你深入理解VPP Frame Queue的无锁队列实现!

Logo

昇腾计算产业是基于昇腾系列(HUAWEI Ascend)处理器和基础软件构建的全栈 AI计算基础设施、行业应用及服务,https://devpress.csdn.net/organization/setting/general/146749包括昇腾系列处理器、系列硬件、CANN、AI计算框架、应用使能、开发工具链、管理运维工具、行业应用及服务等全产业链

更多推荐