吐槽

 相比cuda,网上关于AscendC的资料,数量少且质量不高。我来写一个。看完本篇,希望读者可以看懂昇腾平台上的算子代码。

Reference

[1] Ascend C保姆级教程:我的第一份Ascend C代码
[2] Ascend C 多核并行/流水计算/dobule buffer技术
[3] 一文看懂昇腾达芬奇架构计算单元
[4] Ascend C算子开发接口
[5] asc-devkit文档

前置知识

 华为昇腾npu,达芬奇架构。每张卡上有若干个AI Core。

在这里插入图片描述
          pic source:昇腾Ascend C编程入门教程(纯干货)
 每个Ai Core中,有1个Cube计算单元,负责矩阵计算,2个Vector计算单元,负责向量计算。MTE(Memory Transfer Engine)负责数据搬运。需要指出的是,AscendC编写的代码,被编译成二进制后,由Scalar单元负责指令执行和分发。在我理解,Scalar就是一个cpu的能力。for循环需要Scala计算单元执行,向量运算由Vector负责。理解算子代码,要点:指令的分发是串行的。不同的计算单元并行执行各自的指令。Vector计算单元,Cube计算单元,MTE搬运单元由各自的指令队列。每个单元串行执行指令。 在没有数据依赖的情况下,MTE在搬运数据的同时,Vector可以执行向量运算。在存在数据依赖的情况下,比如MTE搬入数据完成后,Vector负责计算,两个之前需要信号同步:SetFlag/WaitFlag。double buffer就是从UB申请了两块空间。

template <HardEvent event>
__aicore__ inline void SetFlag(int32_t eventID)
template <HardEvent event>
__aicore__ inline void WaitFlag(int32_t eventID)

 SetFlag<HardEvent::V_S>(0), 这条指令发送到Vector计算单元,由其设置对应的信号。
 WaitFlag<HardEvent::V_S>(0), 这条指令由Scalar计算单元执行,等待Vector计算单元设置对应的时间,然后程序才会向下执行。
 针对Atlas 推理系列产品AI Core,eventID的数据范围为:0-7。
 HardEvent的定义:

enum class HardEvent : uint8_t {
    // 名称(源流水_目标流水),例如MTE2_V,代表PIPE_MTE2为源流水,PIPE_V为目标流水。标识从PIPE_MTE2到PIPE_V的同步,PIPE_V等待PIPE_MTE2。
    MTE2_MTE1
    MTE1_MTE2
    MTE1_M
    M_MTE1
    MTE2_V
    V_MTE2
    MTE3_V
    V_MTE3
    M_V
    V_M
    V_V
    MTE3_MTE1
    MTE1_MTE3
    MTE1_V
    MTE2_M
    M_MTE2
    V_MTE1
    M_FIX // 当前版本暂不支持
    FIX_M // 当前版本暂不支持
    MTE3_MTE2
    MTE2_MTE3
    S_V
    V_S
    S_MTE2
    MTE2_S
    S_MTE3
    MTE3_S
    MTE2_FIX // 当前版本暂不支持
    FIX_MTE2 // 当前版本暂不支持
    FIX_S // 当前版本暂不支持
    M_S
    FIX_MTE3 // 当前版本暂不支持
}

 不同的型号,AI Core的数量不同。根据下图提供的线索,找到配置文件。
在这里插入图片描述
       pic source:叭一叭AscendC算子支持芯片的运算单元数和AscendC提供的API
 https://gitcode.com/cann/runtime/tree/master/src/platform/platform_config

910B1 910B2 910B3 910B4
ai_core_cnt 24 24 20 20
cube_core_cnt 24 24 20 20
vector_core_cnt 48 48 40 40

 在torch_npu中,torch.randn((2, 2), device=torch.device(‘npu’)),其数据存储在GM(Global Memory)。
 AIV中,MTE2负责将数据从GM搬运到UB。MTE3负载将数据从UB搬运到GM。Vector提供的函数接口(Add, Sub, Mul),输入和输出需要存储在UB上。LocalTensor的数据存储在UB上。
 每个AiCore上有2个Vector计算单元,2块UB。每块UB的大小:ub_size = 196608 bytes = 192 KB × 1024 bytes/KB。

template <typename T>
__aicore__ inline void Add(const LocalTensor<T>& dstLocal, const LocalTensor<T>& src0Local, const LocalTensor<T>& src1Local, const int32_t& calCount)

template <typename T>
__aicore__ inline void Mul(const LocalTensor<T>& dstLocal, const LocalTensor<T>& src0Local, const LocalTensor<T>& src1Local, const int32_t& calCount)

在这里插入图片描述

            pic source:昇腾处理器的硬件架构与关键技术

算子源码分析

 官方给出的算子样例add_custom.cpp
 BUFFER_NUM = 2,inQueueX有两块buffer。

#include "kernel_operator.h"

constexpr int32_t TOTAL_LENGTH = 8 * 2048;                            // total length of data
constexpr int32_t USE_CORE_NUM = 8;                                   // num of core used
constexpr int32_t BLOCK_LENGTH = TOTAL_LENGTH / USE_CORE_NUM;         // length computed of each core
constexpr int32_t TILE_NUM = 8;                                       // split data into 8 tiles for each core
constexpr int32_t BUFFER_NUM = 2;                                     // tensor num for each queue
constexpr int32_t TILE_LENGTH = BLOCK_LENGTH / TILE_NUM / BUFFER_NUM; // separate to 2 parts, due to double buffer

class KernelAdd {
public:
    __aicore__ inline KernelAdd() {}
    __aicore__ inline void Init(GM_ADDR x, GM_ADDR y, GM_ADDR z)
    {
        xGm.SetGlobalBuffer((__gm__ half *)x + BLOCK_LENGTH * AscendC::GetBlockIdx(), BLOCK_LENGTH);
        yGm.SetGlobalBuffer((__gm__ half *)y + BLOCK_LENGTH * AscendC::GetBlockIdx(), BLOCK_LENGTH);
        zGm.SetGlobalBuffer((__gm__ half *)z + BLOCK_LENGTH * AscendC::GetBlockIdx(), BLOCK_LENGTH);
        pipe.InitBuffer(inQueueX, BUFFER_NUM, TILE_LENGTH * sizeof(half));
        pipe.InitBuffer(inQueueY, BUFFER_NUM, TILE_LENGTH * sizeof(half));
        pipe.InitBuffer(outQueueZ, BUFFER_NUM, TILE_LENGTH * sizeof(half));
    }
    __aicore__ inline void Process()
    {
        int32_t loopCount = TILE_NUM * BUFFER_NUM;
        for (int32_t i = 0; i < loopCount; i++) {
            CopyIn(i);
            Compute(i);
            CopyOut(i);
        }
    }

private:
    __aicore__ inline void CopyIn(int32_t progress)
    {
        AscendC::LocalTensor<half> xLocal = inQueueX.AllocTensor<half>();
        AscendC::LocalTensor<half> yLocal = inQueueY.AllocTensor<half>();
        AscendC::DataCopy(xLocal, xGm[progress * TILE_LENGTH], TILE_LENGTH);
        AscendC::DataCopy(yLocal, yGm[progress * TILE_LENGTH], TILE_LENGTH);
        inQueueX.EnQue(xLocal);
        inQueueY.EnQue(yLocal);
    }
    __aicore__ inline void Compute(int32_t progress)
    {
        AscendC::LocalTensor<half> xLocal = inQueueX.DeQue<half>();
        AscendC::LocalTensor<half> yLocal = inQueueY.DeQue<half>();
        AscendC::LocalTensor<half> zLocal = outQueueZ.AllocTensor<half>();
        AscendC::Add(zLocal, xLocal, yLocal, TILE_LENGTH);
        outQueueZ.EnQue<half>(zLocal);
        inQueueX.FreeTensor(xLocal);
        inQueueY.FreeTensor(yLocal);
    }
    __aicore__ inline void CopyOut(int32_t progress)
    {
        AscendC::LocalTensor<half> zLocal = outQueueZ.DeQue<half>();
        AscendC::DataCopy(zGm[progress * TILE_LENGTH], zLocal, TILE_LENGTH);
        outQueueZ.FreeTensor(zLocal);
    }

private:
    AscendC::TPipe pipe;
    AscendC::TQue<AscendC::QuePosition::VECIN, BUFFER_NUM> inQueueX, inQueueY;
    AscendC::TQue<AscendC::QuePosition::VECOUT, BUFFER_NUM> outQueueZ;
    AscendC::GlobalTensor<half> xGm;
    AscendC::GlobalTensor<half> yGm;
    AscendC::GlobalTensor<half> zGm;
};

extern "C" __global__ __aicore__ void add_custom(GM_ADDR x, GM_ADDR y, GM_ADDR z)
{
    if (g_coreType == AIC) {
        return;
    }
    KernelAdd op;
    op.Init(x, y, z);
    op.Process();
}

#ifndef ASCENDC_CPU_DEBUG
void add_custom_do(uint32_t blockDim, void *stream, uint8_t *x, uint8_t *y, uint8_t *z)
{
    add_custom<<<blockDim, nullptr, stream>>>(x, y, z);
}
#endif

 从官网抄下一些文字解释。参考文档:Ascend C算子开发接口。
 TPipe用于管理内存和同步。
TQueBind简介,文档链接。TQueBind绑定源逻辑位置和目的逻辑位置,根据源位置和目的位置,来确定内存分配的位置 、插入对应的同步事件,帮助开发者解决内存分配和管理、同步等问题。Tque是TQueBind的简化模式。通常情况下开发者使用TQue进行编程,TQueBind对外提供一些特殊数据通路的内存管理和同步控制,涉及这些通路时可以直接使用TQueBind。

数据通路 TQueBind定义 TQue定义
GM->VECIN TQueBind<TPosition::GM, TPosition::VECIN, 1> TQue<TPosition::VECIN, 1>
VECOUT->GM TQueBind<TPosition::VECOUT, TPosition::GM, 1> TQue<TPosition::VECOUT, 1>

 我在autodl租用了一台昇腾910B,在/usr/local/Ascend/ascend-toolkit/8.1.RC1可以看到一些cann的头文件。这部分代码已经开源,在asc-devkit
 TQue类的定义:

/*
/usr/local/Ascend/ascend-toolkit/8.1.RC1/aarch64-linux/include/ascendc/include/basic_api/impl/kernel_event.h
*/
__aicore__ constexpr TPosition GetBufferLogicPos(TPosition pos, bool isSrc)
{
    } else if (pos == TPosition::VECIN) {
        return isSrc ? TPosition::GM : TPosition::VECIN;
    } else if (pos == TPosition::VECOUT) {
        return isSrc ? TPosition::VECOUT : TPosition::GM;
    }
}

/*
ascendc/include/basic_api/interface/kernel_tpipe.h
*/
template <TPosition pos, int32_t depth, auto mask = 0>
class TQue : public TQueBind<GetBufferLogicPos(pos, true), GetBufferLogicPos(pos, false), depth, mask>

 blockDim是逻辑核的概念,取值范围为[1,65535]。为了充分利用硬件资源,一般设置为物理核的核数或其倍数。对于耦合架构和分离架构,blockDim在运行时的意义和设置规则有一些区别,具体说明如下:

  • 耦合架构:由于其Vector、Cube单元是集成在一起的,blockDim用于设置启动多个AICore核实例执行,不区分Vector、Cube。
  • 分离架构
    • 针对仅包含Vector计算的算子,blockDim用于设置启动多少个Vector(AIV)实例执行。
    • 针对仅包含Cube计算的算子,blockDim用于设置启动多少个Cube(AIC)实例执行。
    • 针对Vector/Cube融合计算的算子,启动时,按照AIV和AIC组合启动,blockDim用于设置启动多少个组合执行,比如某款AI处理器上有40个Vector核和20个Cube核,一个组合是2个Vector核和1个Cube核。如果设置为20,此时会启动20个组合,即40个Vector核和20个Cube核。

文字来源source link
 测试文件main.cpp

int32_t main(int32_t argc, char *argv[])
{
    uint32_t blockDim = 8;
    size_t inputByteSize = 8 * 2048 * sizeof(uint16_t);
    size_t outputByteSize = 8 * 2048 * sizeof(uint16_t);

    CHECK_ACL(aclInit(nullptr));
    int32_t deviceId = 0;
    CHECK_ACL(aclrtSetDevice(deviceId));
    aclrtStream stream = nullptr;
    CHECK_ACL(aclrtCreateStream(&stream));

    uint8_t *xHost, *yHost, *zHost;
    uint8_t *xDevice, *yDevice, *zDevice;

    CHECK_ACL(aclrtMallocHost((void **)(&xHost), inputByteSize));
    CHECK_ACL(aclrtMallocHost((void **)(&yHost), inputByteSize));
    CHECK_ACL(aclrtMallocHost((void **)(&zHost), outputByteSize));
    CHECK_ACL(aclrtMalloc((void **)&xDevice, inputByteSize, ACL_MEM_MALLOC_HUGE_FIRST));
    CHECK_ACL(aclrtMalloc((void **)&yDevice, inputByteSize, ACL_MEM_MALLOC_HUGE_FIRST));
    CHECK_ACL(aclrtMalloc((void **)&zDevice, outputByteSize, ACL_MEM_MALLOC_HUGE_FIRST));

    ReadFile("./input/input_x.bin", inputByteSize, xHost, inputByteSize);
    ReadFile("./input/input_y.bin", inputByteSize, yHost, inputByteSize);

    CHECK_ACL(aclrtMemcpy(xDevice, inputByteSize, xHost, inputByteSize, ACL_MEMCPY_HOST_TO_DEVICE));
    CHECK_ACL(aclrtMemcpy(yDevice, inputByteSize, yHost, inputByteSize, ACL_MEMCPY_HOST_TO_DEVICE));

    add_custom_do(blockDim, stream, xDevice, yDevice, zDevice);
    CHECK_ACL(aclrtSynchronizeStream(stream));

    CHECK_ACL(aclrtMemcpy(zHost, outputByteSize, zDevice, outputByteSize, ACL_MEMCPY_DEVICE_TO_HOST));
    WriteFile("./output/output_z.bin", zHost, outputByteSize);

    CHECK_ACL(aclrtFree(xDevice));
    CHECK_ACL(aclrtFree(yDevice));
    CHECK_ACL(aclrtFree(zDevice));
    CHECK_ACL(aclrtFreeHost(xHost));
    CHECK_ACL(aclrtFreeHost(yHost));
    CHECK_ACL(aclrtFreeHost(zHost));

    CHECK_ACL(aclrtDestroyStream(stream));
    CHECK_ACL(aclrtResetDevice(deviceId));
    CHECK_ACL(aclFinalize());
    return 0;
}

 在main函数中,xDevice,yDevice,zDevice分配的内存空间位于GM。blockDim =8,程序使用4个AICore,8个Vector计算单元。
 add_custom_do(blockDim, stream, xDevice, yDevice, zDevice); xDevice,yDevice,zDevice地址分别传递给核函数中的x,y,z。
 add_custom中,我增加了三行代码。add_custom算子用不到Cube计算单元,增加return的逻辑。

    if (g_coreType == AIC) {
        return;
    }

 内部函数的调用关系示意图如下:
在这里插入图片描述
         pic source:Ascend C保姆级教程:我的第一份Ascend C代码
 初始化函数Init主要完成以下内容:设置输入输出Global Tensor的Global Memory内存地址,通过Pipe内存管理对象为输入输出Queue分配内存。

 本样例将数据切分成8块,平均分配到8个核上运行,每个核上处理的数据大小BLOCK_LENGTH为2048。那么我们是如何实现这种切分的呢?

 每个核上处理的数据地址需要在起始地址上增加GetBlockIdx()*BLOCK_LENGTH(每个block处理的数据长度)的偏移来获取。这样也就实现了多核并行计算的数据切分。

 以输入x为例,x + BLOCK_LENGTH * GetBlockIdx()即为单核处理程序中x在Global Memory上的内存偏移地址,获取偏移地址后,使用GlobalTensor类的SetGlobalBuffer接口设定该核上Global Memory的起始地址以及长度。具体示意图如下。
在这里插入图片描述
         pic source:Ascend C保姆级教程:我的第一份Ascend C代码

 Process函数中主要由三个步骤:CopyIn,Compute,CopyOut。
 CopyIn负责从GM搬运数据到UB。AscendC::DataCopy使用MTE2提供的数据搬入能力。
 Compute负责计算。AscendC::Add由Vector负责计算,计算结果存储到zLocal。
 CopyOut负责将计算结果从UB拷贝到GM。AscendC::DataCopy使用MTE3提供的数据搬出能力。

 CopyIn中数据搬入的指令下发MTE2。
 Compute中AscendC::Add指令下发给Vector单元。
 CopyOut中数据搬出的指令下发MTE3。
 三个器件需要协作,需要使用同步指令:

  • 在CopyIn,MTE2将GM中的数据搬运到xLocal,yLocal,需要调用SetFlag<MTE2_V>,通知Vector可以进行计算。SetFlag<MTE2_V>由inQueueX.EnQue(xLocal)函数触发。
  • 在Compute,Vector计算单元需要调用WaitFlag<MTE2_V>,等待数据拷贝完成,然后才能执行AscendC::Add指令。WaitFlag<MTE2_V>由inQueueX.DeQue触发。AscendC::Add计算完成之后,Vector计算单元会执行SetFlag<V_MTE3>,通知MTE3搬出数据。SetFlag<V_MTE3>由outQueueZ.EnQue触发。
  • 在CopyOut,outQueueZ.DeQue会触发WaitFlag<V_MTE3>,等待Vector计算完成,然后执行AscendC::DataCopy,将zLocal拷贝到zGM。

DataCopy函数说明文档。在使用DataCopy接口进行数据传输时,必须确保搬运的数据长度和操作数在UB上的起始地址为32字节对齐;在进行向量计算时,操作数的起始地址也需满足32字节对齐的要求。source。非32B对齐场景,需要使用DataCopyPad接口。
在这里插入图片描述
         pic source:Ascend C保姆级教程:我的第一份Ascend C代码

double buffer的原理分析

 在当前的硬件情况下,计算快于拷贝。double buffer主要是为了提高计算核心的利用率。在计算核心进行计算的同时,MTE可以拷贝另一部分数据。
 double buffer机制可以实现下图的结果。
 每个并行任务(Stage1、2、3)需要依次完成n个数据切片的处理。Stage1,Stage2,Stage3分别代表:数据搬入,计算,数据搬出。 Progress1、2、3代表处理的数据分片,对于同一片数据,Stage1、Stage2、Stage3之间的处理具有依赖关系,需要串行处理;不同的数据切片,同一时间点,可以有多个任务在并行处理,由此达到任务并行、提升性能的目的。
在这里插入图片描述
         pic source:Ascend C算子性能优化实用技巧01——流水优化

 在Init函数中, inQueueX有两块存储空间。inQueueY,outQueueZ同理。

        pipe.InitBuffer(inQueueX, BUFFER_NUM, TILE_LENGTH * sizeof(half));
        pipe.InitBuffer(inQueueY, BUFFER_NUM, TILE_LENGTH * sizeof(half));
        pipe.InitBuffer(outQueueZ, BUFFER_NUM, TILE_LENGTH * sizeof(half));

 TQue继承自TQueBind。源码
 TQueBind定义的几个函数:

  • AllocTensor
  • EnQue
  • DeQue
  • FreeTensor

 inQueueX中的两块buffer分别用inBuf0, inBuf1表示。
 在Process函数的for循环中,scalar按序下发CopyIn,Compute,CopyOut中包含的指令。
 i = 0

CopyIn(0);
Compute(0);
CopyOut(0);

 在CopyIn函数中,AscendC::LocalTensor xLocal = inQueueX.AllocTensor(); 使用inBuf0。inQueueX.EnQue(xLocal)中包含SetFlag<MTE2_V>。MTE2触发SetFlag<MTE2_V>,通知Vector计算单元可以计算。
 在Compute函数中,AscendC::LocalTensor xLocal = inQueueX.DeQue();包含有WaitFlag<MTE2_V>。Vector计算单元等待数据完成copy,才会执行AscendC::Add指令。
inQueueX.FreeTensor(xLocal); FreeTensor函数中包含SetFlag<V_MTE2>。Vector计算单元完成计算后,触发SetFlag,通知这块内存可以写入新的数据。在i = 2时候,程序向inBuf0拷贝新的数据。
 i = 1

CopyIn(1);
Compute(1);
CopyOut(1);

 在CopyIn函数中,AscendC::LocalTensor xLocal = inQueueX.AllocTensor(); 使用inBuf1。
 在Compute函数中,inQueueX.FreeTensor(xLocal); Vector计算单元完成计算后,触发SetFlag<V_MTE2>,通知这块内存可以写入新的数据。在i = 3时候,程序向inBuf1拷贝新的数据。
 i = 2

CopyIn(2);
Compute(2);
CopyOut(2);

 在CopyIn函数中,AscendC::LocalTensor xLocal = inQueueX.AllocTensor(); 使用inBuf0。
 在Compute函数中,inQueueX.FreeTensor(xLocal); Vector计算单元完成计算后,触发SetFlag<V_MTE2>,通知这块内存可以写入新的数据。在i = 4时候,程序向inBuf0拷贝新的数据。
 i = 3

CopyIn(3);
Compute(3);
CopyOut(3);

 在CopyIn函数中,AscendC::LocalTensor xLocal = inQueueX.AllocTensor(); 使用inBuf1。
 在Compute函数中,inQueueX.FreeTensor(xLocal); Vector计算单元完成计算后,触发SetFlag<V_MTE2>,通知这块内存可以写入新的数据。在i = 5时候,程序向inBuf1拷贝新的数据。

EnQue/DeQue代码实现片段

 EnQue代码实现片段

template <TPosition src, TPosition dst, int32_t depth, auto mask>
template <typename T>
__aicore__ inline __sync_alias__ bool TQueBind<src, dst, depth, mask>::EnQue(const LocalTensor<T>& tensor)
{
    auto buf = tensor.GetBufferHandle();
    return EnQue(reinterpret_cast<TBufHandle>(buf));
}

template <TPosition src, TPosition dst, int32_t depth, auto mask>
__aicore__ inline __sync_alias__ bool TQueBind<src, dst, depth, mask>::EnQue(TBufHandle buf)
{
    auto ptr = reinterpret_cast<TBufType*>(buf);
    if constexpr (depth == 1) {
        this->que_ = buf;
    } else {
        this->que_[this->tail] = buf;
    }
    this->usedCount++;

    auto enQueEvtID = GetTPipePtr()->AllocEventID<enQueEvt>();
    SetFlag<enQueEvt>(enQueEvtID);
    ptr->enQueEvtID = enQueEvtID;

    if constexpr (depth != 1) {
        if (++this->tail >= depth) {
            this->tail = 0;
        }
    }
}

 EnQue中包含SetFlag(enQueEvtID)。
 enQueEvt是根据srcHardType和dstHardType确定的。
 当从GM拷贝数据到UB,例如CopyIn中的AscendC::DataCopy(xLocal, xGm[progress * TILE_LENGTH], TILE_LENGTH); 此时src == Hardware::GM,dst == Hardware::UB,那么enQueEvt=HardEvent::MTE2_V。
 当从UB拷贝数据到GM,例如CopyOut中的AscendC::DataCopy(zGm[progress * TILE_LENGTH], zLocal, TILE_LENGTH); 此时src == Hardware::UB,dst == Hardware::GM,那么enQueEvt= HardEvent::V_MTE3。
 DeQue代码实现片段:

template <TPosition src, TPosition dst, int32_t depth, auto mask>
template <typename T>
__aicore__ inline __sync_alias__ LocalTensor<T> TQueBind<src, dst, depth, mask>::DeQue()
{
    auto buf = DeQue();
    auto ret = Buf2Tensor<T>(buf);
    return ret;
}

template <TPosition src, TPosition dst, int32_t depth, auto mask>
__aicore__ inline __sync_alias__ TBufHandle TQueBind<src, dst, depth, mask>::DeQue()
{
    TBufHandle buf;
    if constexpr (depth == 1) {
        buf = this->que_;
    } else {
        buf = this->que_[this->head];
    }
    if (ptr->enQueEvtID != INVALID_TEVENTID) {
        WaitFlag<enQueEvt>(ptr->enQueEvtID);
        GetTPipePtr()->ReleaseEventID<enQueEvt>(ptr->enQueEvtID);
        ptr->enQueEvtID = INVALID_TEVENTID;
    }
    if constexpr (depth != 1) {
        if (++this->head >= depth) {
            this->head = 0;
        }
    }
}

 enQueEvt参考上述说明。DeQue中包含WaitFlag。WaitFlag获取到事件之后,就释放了:
GetTPipePtr()->ReleaseEventID。

FreeTensor/AllocTensor

 FreeTensor代码实现片段

template <TPosition src, TPosition dst, int32_t depth, auto mask>
template <typename T>
__aicore__ inline void TQueBind<src, dst, depth, mask>::FreeTensor(LocalTensor<T>& tensor)
{
    FreeBuffer(tensor.GetBufferHandle());
    return;
}

template <TPosition src, TPosition dst, int32_t depth, auto mask>
__aicore__ inline void TQueBind<src, dst, depth, mask>::FreeBuffer(TBufHandle buf)
{
    auto ptr = reinterpret_cast<TBufType*>(buf);
    ptr->freeBufEvtID = GetTPipePtr()->AllocEventID<freeBufEvt>();
    SetFlag<freeBufEvt>(ptr->freeBufEvtID);
    ptr->state = TBufState::FREE;
    this->bufUsedCount--;
}

 当src == Hardware::GM,dst == Hardware::UB,那么freeBufEvt = HardEvent::V_MTE2。
 当src == Hardware::UB,dst == Hardware::GM,那么freeBufEvt = HardEvent::MTE3_V。
 SetFlag通知这块内存可以使用。
 以add_custom算子为例
 当i = 0时,在Compute函数中,inQueueX.FreeTensor(xLocal),SetFlag<V_MTE2>,通知MTE2拷贝数据。ptr->freeBufEvtID赋值为申请的event id。这个事件在i = 2时候,在CopyIn函数中被消费(inQueueX.AllocTensor<half>).
 AllocTensor代码实现片段

template <TPosition src, TPosition dst, int32_t depth, auto mask>
__aicore__ inline TBufHandle TQueBind<src, dst, depth, mask>::AllocBuffer()
{
    do {
        ret = this->bufStart + this->bufCursor;
        if constexpr (config.bufferNumber != 1) {
            this->bufCursor += 1;
            if (this->bufCursor == this->bufNum) {
                this->bufCursor = 0;
            }
        }
        if (ret->state == TBufState::FREE) {
            ret->state = TBufState::OCCUPIED;
            if (ret->freeBufEvtID != INVALID_TEVENTID) {
                WaitFlag<freeBufEvt>(ret->freeBufEvtID);
                GetTPipePtr()->ReleaseEventID<freeBufEvt>(ret->freeBufEvtID);
                ret->freeBufEvtID = INVALID_TEVENTID;
            }
            break;
        }
        ASCENDC_ASSERT((++size <= this->bufNum), {
            KERNEL_LOG(KERNEL_ERROR, "size is %d, which exceed limits %d", size, static_cast<int32_t>(this->bufNum));
        });
    } while (true);
    this->bufUsedCount++;
}

 当i = 0时,在CopyIn函数中,inQueueX.AllocTensor<half>,此时ret->freeBufEvtID = INVALID_TEVENTID。Vector计算单元完成AscendC::Add后,执行inQueueX.FreeTensor中封装的SetFlag<V_MTE2>。
 当i = 2时,在CopyIn函数中,inQueueX.AllocTensor<half>,此时ret->freeBufEvtID != INVALID_TEVENTID, 下发WaitFlag<freeBufEvt>指令。MTE2需要等待事件。这就需要等待inBuf0中的数据在 i = 0已经被使用,就是完成AscendC::Add计算。

SetFlag/WaitFlag例子

 当Scala需要用到Vector计算的数据,就需要使用HardEvent::V_S类型的同步事件。
 例子:moe_v2_fullload_dynamic_quant.h
 Process函数

template <typename T>
__aicore__ inline void MoeV2FullLoadDynamicQuant<T>::Process()
{
    if (this->blockIdx_ < this->needCoreNum_) {
        CopyIn();
        SortCompute();
        if (this->blockIdx_ == 0) {
            CopyOutIdx();
        }
        if (this->blockIdx_ == this->needCoreNum_ - 1 && this->expertTokensCountOrCumsumFlag > EXERPT_TOKENS_NONE) {
            ComputeExpertTokenCountOrCumsum();
        } else {
            CopyOutEmpty();
        }
        if (smoothType == 2) {
            CopyOutXQuantEH();
        } else {
            CopyOutXQuant1H();
        }
    }
}

 SortCompute函数, Cast, Muls, Duplicate, Concat, Sort, Extract均是Vector计算单元提供的能力。PipeBarrier: 阻塞相同流水,具有数据依赖的相同流水之间需要插此同步。

template <typename T>
__aicore__ inline void MoeV2FullLoadDynamicQuant<T>::SortCompute()
{
    LocalTensor<int32_t> inLocal = sortDataCopyInQueue.DeQue<int32_t>();
    LocalTensor<int32_t> expertIdxLocal = inLocal[0];
    LocalTensor<float> expertIdxLocalFp32 = expertIdxLocal.ReinterpretCast<float>();
    Cast(expertIdxLocalFp32, expertIdxLocal, RoundMode::CAST_ROUND, this->totalLength);
    PipeBarrier<PIPE_V>();
    Muls(expertIdxLocalFp32, expertIdxLocalFp32, (float)-1, this->totalLength);
    PipeBarrier<PIPE_V>();
    int64_t duplicateNum = this->totalLength % ONE_REPEAT_SORT_NUM;
    if (duplicateNum > 0) {
        int duplicateIndex = this->totalLength - duplicateNum;
        uint64_t mask0 = UINT64_MAX;
        mask0 = mask0 << duplicateNum;
        mask0 = mask0 & (UINT64_MAX >> ONE_REPEAT_SORT_NUM);
        uint64_t mask[2] = {mask0, 0};
        Duplicate(expertIdxLocalFp32[duplicateIndex], MIN_FP32, mask, 1, DST_BLK_STRIDE, DST_REP_STRIDE);
        PipeBarrier<PIPE_V>();
    }
    LocalTensor<float> concatLocal;
    LocalTensor<float> tempTensor = tempBuffer.Get<float>(GetSortLen<float>(this->sortNum_));
    Concat(concatLocal, expertIdxLocalFp32, tempTensor, this->sortNum_ / ONE_REPEAT_SORT_NUM);
    PipeBarrier<PIPE_V>();
    LocalTensor<uint32_t> rowIdxLocal = inLocal[this->sortNum_].template ReinterpretCast<uint32_t>();
    LocalTensor<float> sortedLocal = sortedBuffer.Get<float>(GetSortLen<float>(this->sortNum_));
    Sort<float, true>(sortedLocal, concatLocal, rowIdxLocal, tempTensor, this->sortNum_ / ONE_REPEAT_SORT_NUM);
    PipeBarrier<PIPE_V>();
    LocalTensor<float> expandedExpertIdxLocal = expandedExpertIdxCopyOutQueue_.AllocTensor<float>();
    expandDstToSrcRowLocal = expandDstToSrcRowQueue_.AllocTensor<uint32_t>();
    LocalTensor<float> expandDstToSrcRowLocalFp32 = expandDstToSrcRowLocal.ReinterpretCast<float>();
    Extract(expandedExpertIdxLocal, expandDstToSrcRowLocal, sortedLocal, this->sortNum_ / ONE_REPEAT_SORT_NUM);
    PipeBarrier<PIPE_V>();
    Cast(
        expandDstToSrcRowLocalFp32, expandDstToSrcRowLocal.ReinterpretCast<int32_t>(), RoundMode::CAST_ROUND,
        this->totalLength);
    PipeBarrier<PIPE_V>();
    Muls(expandedExpertIdxLocal, expandedExpertIdxLocal, (float)-1, this->totalLength);
    PipeBarrier<PIPE_V>();
    LocalTensor<int32_t> expandedExpertIdxLocalInt32;
    expandedExpertIdxLocalInt32 = expandedExpertIdxLocal.ReinterpretCast<int32_t>();
    Cast(expandedExpertIdxLocalInt32, expandedExpertIdxLocal, RoundMode::CAST_ROUND, this->totalLength);
    PipeBarrier<PIPE_V>();
    expandedExpertIdxCopyOutQueue_.EnQue<int32_t>(expandedExpertIdxLocalInt32);

    LocalTensor<uint32_t> expandedRowIdx = expandedRowIdxCopyOutQueue_.AllocTensor<uint32_t>();
    LocalTensor<uint32_t> expandedRowIdxU32 = expandedRowIdx.ReinterpretCast<uint32_t>();
    Muls(expandDstToSrcRowLocalFp32, expandDstToSrcRowLocalFp32, (float)-1, this->totalLength);
    PipeBarrier<PIPE_V>();
    ArithProgression<int32_t>(inLocal[this->sortNum_], 0, 1, this->totalLength);
    PipeBarrier<PIPE_V>();
    if (duplicateNum > 0) {
        int duplicateIndex = this->totalLength - duplicateNum;
        uint64_t mask0 = UINT64_MAX;
        mask0 = mask0 << duplicateNum;
        mask0 = mask0 & (UINT64_MAX >> ONE_REPEAT_SORT_NUM);
        uint64_t mask[2] = {mask0, 0};
        Duplicate(expandDstToSrcRowLocalFp32[duplicateIndex], MIN_FP32, mask, 1, DST_BLK_STRIDE, DST_REP_STRIDE);
        PipeBarrier<PIPE_V>();
    }
    Concat(concatLocal, expandDstToSrcRowLocalFp32, tempTensor, this->sortNum_ / ONE_REPEAT_SORT_NUM);
    PipeBarrier<PIPE_V>();
    Sort<float, true>(sortedLocal, concatLocal, rowIdxLocal, tempTensor, this->sortNum_ / ONE_REPEAT_SORT_NUM);
    PipeBarrier<PIPE_V>();
    Extract(tempTensor, expandedRowIdxU32, sortedLocal, this->sortNum_ / ONE_REPEAT_SORT_NUM);
    PipeBarrier<PIPE_V>();
    expandedRowIdxCopyOutQueue_.EnQue<uint32_t>(expandedRowIdx);
    sortDataCopyInQueue.FreeTensor(inLocal);
}

 ComputeExpertTokenCountOrCumsum函数

template <typename T>
__aicore__ inline void MoeV2FullLoadDynamicQuant<T>::ComputeExpertTokenCountOrCumsum()
{
    expandedExpertIdxLocal = expandedExpertIdxCopyOutQueue_.DeQue<int32_t>();
    LocalTensor<int32_t> expertTokensCount = expertTokensCopyOutQueue_.AllocTensor<int32_t>();

    int64_t expertNumAlign = Align(this->expertNum, sizeof(int32_t));
    Duplicate(expertTokensCount, 0, expertNumAlign);
    SetWaitFlag<HardEvent::V_S>(HardEvent::V_S);

    int32_t lastExpertId = expandedExpertIdxLocal.GetValue(0);
    int64_t tokenCount = 0;
    int64_t lastExpertCount = 0;
    for (int64_t i = 0; i < this->totalLength; i++) {
        int32_t curExpertId = expandedExpertIdxLocal.GetValue(i);
        tokenCount++;
        while (lastExpertId < curExpertId) {
            expertTokensCount.SetValue(lastExpertId, tokenCount - 1);
            if (this->expertTokensCountOrCumsumFlag == EXERPT_TOKENS_COUNT) {
                tokenCount = 1;
            }
            lastExpertId++;
        }
    }
    expertTokensCount.SetValue(lastExpertId, tokenCount);
    if (this->expertTokensCountOrCumsumFlag == EXERPT_TOKENS_CUMSUM) {
        lastExpertId++;
        while (lastExpertId < this->expertNum) {
            expertTokensCount.SetValue(lastExpertId, tokenCount);
            lastExpertId++;
        }
    }
    DataCopyExtParams copyParams{
        static_cast<uint16_t>(1), static_cast<uint32_t>(this->expertNum * sizeof(int32_t)), 0, 0, 0};
    if (this->expertTokensCountOrCumsumFlag > 0) {
        DataCopyPad(expertTokensCountOrCumsumGm, expertTokensCount, copyParams);
    }
    expertTokensCopyOutQueue_.FreeTensor(expertTokensCount);
}

 SetWaitFlag封装了SetFlag/WaitFlag。

template <HardEvent event>
__aicore__ inline void SetWaitFlag(HardEvent evt)
{
    event_t eventId = static_cast<event_t>(GetTPipePtr()->FetchEventID(evt));
    SetFlag<event>(eventId);
    WaitFlag<event>(eventId);
}

 在ComputeExpertTokenCountOrCumsum函数中,Scala计算单元需要等待Vector计算完成,设置相应的同步时间。这是因为for循环中的代码,需要读取expandedExpertIdxLocal中的数值(expandedExpertIdxLocal.GetValue)。
 在SortCompute函数中,expandedExpertIdxLocal的结果是由Vector计算单元写入数据的。

Logo

昇腾计算产业是基于昇腾系列(HUAWEI Ascend)处理器和基础软件构建的全栈 AI计算基础设施、行业应用及服务,https://devpress.csdn.net/organization/setting/general/146749包括昇腾系列处理器、系列硬件、CANN、AI计算框架、应用使能、开发工具链、管理运维工具、行业应用及服务等全产业链

更多推荐