在金融量化领域的工程实践中,最脏、最累但又最重要的一环,往往不是策略回测,而是原始数据的清洗与预处理。它直接决定了下游分析和交易系统的稳定性和效率。

最近我面临这样一个场景:我们需要处理海量的 UDP 抓包数据。
现状是:

  • 数据源:HDFS 上存储的压缩文件,按天存储,每天按时间切分为子目录,每个目录下又有多个不同 IP 和端口的文件。
  • 数据结构:UDP 包中混杂着成千上万个股票(Symbol)的报价和成交信息,这些数据本身并没有经过预分类。
  • 目标:将这些混在一起的二进制数据解析出来,按 Symbol 归类。最终,每个 Symbol 输出一个独立的文件。

这是一个典型的 Shuffle(洗牌) 问题:大量分散的输入数据需要根据某个键(Symbol)重新聚合到不同的输出目标。

起初,我写了一个简单的串行处理版本,跑完一天的数据大概需要 30 到 40 分钟。对于要求时效性或需要回溯几年历史数据的场景来说,这个速度是完全无法接受的。

这篇文章就来复盘一下,我是如何通过文件级并行、分片锁(Sharded Locking)和缓冲 I/O等一系列工程优化,将处理速度提升一个数量级,实现高性能数据清洗的。


一、串行处理的“慢动作”

最初,我尝试的逻辑是直观的串行处理:

# 伪代码:串行处理的朴素逻辑
for file in all_hdfs_files: # 遍历所有 HDFS 上的压缩 Pcap 文件
    packets = parse_pcap_and_moldudp(file) # 解压并解析所有 UDP 包
    for pkt in packets: # 遍历解析出的每个消息
        write_record_to_csv(pkt.symbol, pkt.data) # 将数据写入对应 Symbol 的 CSV 文件

代码写完跑起来,通过 htopiostat 等工具观察系统资源使用情况,我很快发现了性能瓶颈所在:

  1. CPU 利用率低:Gzip 解压和二进制协议解析(Protocol Parsing)虽然是计算密集型任务,但单个文件的数据量有限,且单线程处理时无法充分利用多核 CPU 资源,导致 CPU 大部分时间处于空闲状态。
  2. I/O 效率低下:由于是串行读取 HDFS 文件,网络 I/O 和本地磁盘 I/O 都在“读一点、停一下、写一点”的循环中浪费了大量时间。特别是写入阶段,频繁打开/关闭文件句柄,或频繁执行磁盘写入操作,严重拖慢了整体进度。
  3. 瓶颈在“写”不在“读”:从少量的大文件(压缩 Pcap)读入,但需要拆解成无数个小片段写入到 5000 多个 Symbol 对应的 CSV 文件里。这种将少量输入扇出到大量输出的操作,对 IOPS (每秒 I/O 操作数) 的要求极高,并且在串行模式下,频繁切换写入目标文件是主要瓶颈。

显然,为了显著提升处理速度,并行化处理是唯一的出路。


二、任务池与流式并行处理

为了榨干服务器的多核 CPU 性能,并将 HDFS 的网络带宽利用到极致,我们将整个数据清洗任务拆解为两个核心阶段,并采用了 流式处理(Streaming) + 任务窃取(Work Stealing) 的并行模式。

文件级并行 (File-Level Parallelism)

在并行化策略的选择上,我们没有选择复杂的“时间片切分”或“按 Symbol 预分配”,而是选择了最自然、实现难度最低且效率极高的策略——文件级并行。这是因为原始数据本身就是以文件为单位物理隔离的,每个文件都是一个独立的、可自包含的处理单元。

  1. 任务池(Task Pool)构建
    主线程(或一个独立的协调线程)负责扫描 HDFS 上的所有目录,收集所有待处理的文件路径,并将这些文件路径作为“任务”提交到一个共享的任务队列中。
  2. 工作线程池(Worker Pool)启动
    我们创建了一个固定大小的线程池(通常线程数设置为逻辑 CPU 核心数,或略多),每个线程都是一个独立的工作者。
  3. 任务抢占与独立执行
    线程池中的每个工作线程从任务队列中“抢”一个文件任务。拿到文件后,该线程将独立、完整地负责该文件的所有处理流程:从 HDFS 下载文件、解压、解析二进制协议,直到将解析出的数据写入内存缓冲。

这种文件级的并行方式,最大化了 CPU 利用率,并能有效利用 HDFS 的分布式读取能力。同时,由于 rayon 等现代并行库采用了 Work Stealing (工作窃取) 算法,即使文件大小不一,也能有效平衡线程负载,确保所有核心高效运转。

然而,文件级并行解决了数据解析的效率问题,却立刻带来了新的挑战:写入冲突

解决海量文件写入的竞态条件

当 100 个工作线程并行解析文件时,它们可能会同时解析到同一只股票(例如 AAPL)的行情数据。如果不加任何控制,多个线程同时尝试向同一个 AAPL.csv 文件写入数据,文件内容就会混乱不堪,甚至导致数据丢失或程序崩溃。

这就是典型的 竞态条件(Race Condition)

传统的解决方案及其局限性:

  • 方案 A:全局大锁(Global Lock)
    所有线程在写入任何文件之前都必须先获取一个全局锁。这会导致所有并行处理的线程在写入阶段又退化成串行,甚至因为频繁的锁竞争和上下文切换而导致性能比单线程更差。这种“一管就死”的方案是不可接受的。
  • 方案 B:每个 Symbol 一个独立的锁(Fine-grained Locking)
    我们可以为每个 Symbol(例如 AAPL)创建一个独立的锁。这样,线程 A 写 AAPL 和线程 B 写 MSFT 可以并行进行。
    虽然这种方式能够提供更细粒度的并发,但考虑到我们有 5000 多个 Symbol,这意味着系统需要管理 5000 多个独立的锁。维护如此大量的锁对象会带来额外的内存开销,并且在实现上也会增加复杂性。更重要的是,在并发访问时,查找和管理这些锁的开销也可能变得显著。

分片锁 (Sharded Locking)

为了在并发性能和管理开销之间找到最佳平衡点,我们引入了 分片锁(Sharded Locking) 机制。

核心思想:我们不给每个 Symbol 配一把独立的锁,而是设立 N 个(例如 512 个)“管理员”(或称 Shard),每个管理员负责管理一部分 Symbol。

  • 设定 SHARD_COUNT:我们定义一个常量 SHARD_COUNT (例如,512)。这个数值通常是 CPU 核心数的 4-8 倍,用于最大化并发同时避免过度细化。
  • Symbol 分配到 Shard:每个 Symbol 在需要写入时,会根据其哈希值(hash(Symbol) % SHARD_COUNT)被分配到一个特定的 Shard。
  • Shard 内锁保护:每个 Shard 内部维护一个 HashMap,存储归它管辖的所有 Symbol 对应的 Writer。这个 HashMap 本身由一个独立的 Mutex 保护。

新的写入流程变为:

// 伪代码示意:分片锁实现
const SHARD_COUNT: usize = 512; // 设定分片数量

// global_symbol_registry 是一个包含 SHARD_COUNT 个 Mutex<HashMap<String, Arc<Mutex<SymbolWriter>>>> 的数组
struct GlobalSymbolRegistry {
    shards: [Mutex<HashMap<String, Arc<Mutex<SymbolWriter>>>>; SHARD_COUNT],
    // ... 其他字段 ...
}

impl GlobalSymbolRegistry {
    // 根据 Symbol 计算它应该归哪个分片管
    fn shard_index(symbol: &str) -> usize {
        // ... 哈希计算 ...
        (hasher.finish() as usize) % SHARD_COUNT
    }

    // 获取 Symbol 对应的写入器
    fn get_writer(&self, symbol: &str) -> Result<Arc<Mutex<SymbolWriter>>> {
        let shard_idx = Self::shard_index(symbol);
        
        // 1. 获取该分片(Shard)的锁。
        //    此时,其他线程无法在该分片中查找或创建 SymbolWriter。
        let mut shard_guard = self.shards[shard_idx].lock().unwrap();

        // 2. 在该分片内部的 HashMap 中查找或创建 SymbolWriter。
        //    每个 SymbolWriter 自己也带有一个 Mutex,用于保护其内部状态。
        match shard_guard.entry(symbol.to_string()) {
            Entry::Occupied(entry) => Ok(entry.get().clone()),
            Entry::Vacant(entry) => {
                let new_writer = create_new_csv_writer(symbol);
                let symbol_writer = Arc::new(Mutex::new(new_writer));
                entry.insert(symbol_writer.clone());
                Ok(symbol_writer)
            }
        }
    }
}

// 在处理逻辑中:
// 获取到 SymbolWriter 后,还需要再获取其内部的锁,才能执行写入操作
let writer_arc = registry.get_writer(&symbol)?;
let mut symbol_writer_guard = writer_arc.lock().unwrap();
symbol_writer_guard.write_record(record)?;

分片锁的优势

  • 显著降低锁冲突:对于 5000 多个 Symbol 和 100 个并发线程,512 个分片使得两个不同的线程争抢同一个分片锁的概率大大降低。大部分线程可以无冲突地同时操作不同分片下的 Symbol。
  • 细粒度管理:每个 SymbolWriter 自身也带有一个锁,这确保了即使在同一个分片内,多个线程对不同 Symbol 的写入也是并行的。Shard 的锁主要用于管理 SymbolWriter 对象的生命周期和查找。
  • 可扩展性:通过调整 SHARD_COUNT,可以灵活适应不同的 CPU 核心数和 Symbol 数量,在并发性能和内存开销之间找到最佳平衡。

六、缓冲写入 (Buffered I/O)

解决了锁竞争问题后,我们发现系统调用(System Call)成了新的性能瓶颈。

原因在于:解析出来的一条行情数据可能只有几十字节。如果每解析一条数据,就立即调用一次底层文件系统的 write() 操作,那么程序将频繁地在用户态(User Mode)和内核态(Kernel Mode)之间切换。对于每天数亿行数据处理量来说,这种频繁的模式切换和上下文开销是极其巨大的。

优化方案:应用层缓冲(Application-Level Buffering)

我们在每个 Symbol 的 SymbolWriter 结构中内置了一个 内存缓冲区(Vec<StringRecord>

struct SymbolWriter {
    writer: CsvWriter<File>,
    buffer: Vec<StringRecord>, // 核心:每个 Symbol 独有的内存缓冲区
    buffer_size: usize,        // 例如配置为 1000 条记录
    seen: HashSet<RowKey>,     // 用于去重
}

impl SymbolWriter {
    fn write_record(&mut self, record: StringRecord) -> Result<()> {
        self.buffer.push(record); // 数据首先写入内存缓冲区

        // 只有当缓冲区满了,才一次性写入磁盘
        if self.buffer.len() >= self.buffer_size {
            self.flush_buffer_to_disk()?;
        }
        Ok(())
    }

    fn flush_buffer_to_disk(&mut self) -> Result<()> {
        for record in self.buffer.drain(..) { // 一次性取出所有缓冲数据
            self.writer.write_record(&record)?; // 批量写入 CSV 文件
        }
        self.writer.flush()?; // 确保数据写入物理介质
        Ok(())
    }
}

缓冲写入的优势

  • 减少系统调用:将多条小记录聚合为一次大写入操作,显著减少了用户态/内核态切换的次数。例如,1000 条记录可能只需要一次系统调用。
  • 优化磁盘 I/O:批量写入通常比零散写入更高效,尤其是对于机械硬盘,可以优化磁头寻道时间;对于 SSD 也能减少碎片化写入。
  • 缩短锁持有时间:由于 SymbolWriter 的锁只在 write_recordflush_buffer_to_disk 期间被持有,缓冲机制使得真正进行磁盘写入(可能较慢)的锁持有时间大大减少,从而进一步降低了对 SymbolWriter 内部锁的竞争。
Logo

昇腾计算产业是基于昇腾系列(HUAWEI Ascend)处理器和基础软件构建的全栈 AI计算基础设施、行业应用及服务,https://devpress.csdn.net/organization/setting/general/146749包括昇腾系列处理器、系列硬件、CANN、AI计算框架、应用使能、开发工具链、管理运维工具、行业应用及服务等全产业链

更多推荐