Planet X

NCCL源码阅读笔记

NCCL 源码阅读笔记

最近简单读了一下NVIDIA Collective Communications Library (NCCL, pronounced “Nickel”) 的源代码。现将初步的一些收获总结一下。

代码框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
nccl-master-src
├── Makefile
├── include
├── collectives // 集合通信原语的实现
├── graph // 检测网络拓扑结构
├── transport // 与数据传输相关的函数实现
├── bootstrap.cc // (我的理解是)很多内建的helper function
├── channel.cc
├── debug.cc
├── enqueue.cc // 关于队列的操作
├── group.cc // NCCL group API的实现
├── init.cc // 初始化的代码
├── proxy.cc // Proxy线程相关的代码
└── transport.cc // 数据传输相关

目前我读到的一些代码以及相应的功能已经标注在文件树中。关于底层的数据传输、怎样建立socket通信的代码,主要集中在transport.cc proxy.cc以及transport文件夹中。关于上层的集合通信原语的实现,例如Allreduce, Allgather等,主要集中在collectives文件夹中。

Start from NCCL example

拿到源码有点无从下手,先看一个NCCL Doc中给出的example。根据example调用的API入手阅读源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
int main(int argc, char* argv[])
{
int size = 32*1024*1024;
int myRank, nRanks, localRank = 0;
//initializing MPI
MPICHECK(MPI_Init(&argc, &argv));
MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank));
MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks));
//calculating localRank based on hostname which is used in selecting a GPU
uint64_t hostHashs[nRanks];
char hostname[1024];
getHostName(hostname, 1024);
hostHashs[myRank] = getHostHash(hostname);
MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD));
for (int p=0; p<nRanks; p++) {
if (p == myRank) break;
if (hostHashs[p] == hostHashs[myRank]) localRank++;
}
ncclUniqueId id;
ncclComm_t comm;
float *sendbuff, *recvbuff;
cudaStream_t s;
//get NCCL unique ID at rank 0 and broadcast it to all others
if (myRank == 0) ncclGetUniqueId(&id);
MPICHECK(MPI_Bcast((void *)&id, sizeof(id), MPI_BYTE, 0, MPI_COMM_WORLD));
//picking a GPU based on localRank, allocate device buffers
CUDACHECK(cudaSetDevice(localRank));
CUDACHECK(cudaMalloc(&sendbuff, size * sizeof(float)));
CUDACHECK(cudaMalloc(&recvbuff, size * sizeof(float)));
CUDACHECK(cudaStreamCreate(&s));
//initializing NCCL
NCCLCHECK(ncclCommInitRank(&comm, nRanks, id, myRank));
//communicating using NCCL
NCCLCHECK(ncclAllReduce((const void*)sendbuff, (void*)recvbuff, size, ncclFloat, ncclSum,
comm, s));
//completing NCCL operation by synchronizing on the CUDA stream
CUDACHECK(cudaStreamSynchronize(s));
//free device buffers
CUDACHECK(cudaFree(sendbuff));
CUDACHECK(cudaFree(recvbuff));
//finalizing NCCL
ncclCommDestroy(comm);
//finalizing MPI
MPICHECK(MPI_Finalize());
printf("[MPI Rank %d] Success \n", myRank);
return 0;
}

可以看到,这段代码先是执行了MPI相关的一些操作。调用MPI主要是为了在NCCL连接还未建立时,在设备之间传输一些额外信息,比如ncclUniqueId。然后是CUDA相关的一些操作,分配内存、创建stream等等。真正涉及到NCCL的代码,在上述example中只有两个API:ncclCommInitRankncclAllReduce。(除去ncclUniqueIdncclCommDestroy)。

因此,我们就来看看这两个API是如何调用的好了。

初始化Communicator: ncclCommInitRank

1
ncclResult_t ncclCommInitRank(ncclComm_t comm, int nranks, ncclUniqueId commId, int rank)

要使用NCCL进行通信,每个设备上都要有一个NCCL Communicator object。属于同一个Communicator的各个设备具有相同的ncclUniqueId以及不同的rank。这个API用于在一个设备上初始化Communicator object。在设置不同设备上的communicator时,这个API必须被不同的线程/进程调用。或者使用ncclGroupStart/ncclGroupEnd 来通过一个线程/进程设置多个设备的Communicator。

  • (init.cc) ncclCommInitRank-> (init.cc) ncclCommInitRankDev

    1
    2
    3
    4
    5
    6
    7
    ncclCommInitRankDev(){
    // rank == 0 时,执行bootstrapCreateRoot
    if (env && myrank == 0)
    bootstrapCreateRoot();
    // 每个rank都要执行
    ncclCommInitRankSync();
    }
  • (init.cc) ncclCommInitRankDev ->bootstrapCreateRoot

    1
    2
    3
    4
    5
    6
    bootstrapCreateRoot(){
    // 此处用来create listen socket的Addr是Communicator 的UniqID
    createListenSocket(&listenFd, connectAddr);
    // 创建监听线程
    pthread_create(&thread, NULL, bootstrapRoot, (void*)(uint64_t)listenFd);
    }
  • (init.cc) ncclCommInitRankDev ->ncclCommInitRankSync

    1
    2
    3
    4
    ncclResult_t ncclCommInitRankSync(ncclComm_t* newcomm, int nranks, ncclUniqueId commId, int myrank, int cudaDev) {
    initTransportsRank(*newcomm, &commId);
    devCommSetup(*newcomm);
    }
  • initTransportsRank 这个函数对数据传输做了大量的初始化工作。包括:

    • 建立设备之间的socket连接

    • 检测系统里的设备以及设备之间的拓扑结构

    • 计算当前系统中的RING、TREE、COLLNET结构

      CollNet is a new algorithm in NCCL that allows GPUs on multiple nodes to do in-network reductions.

    • 建立每个设备之间的连接。peer之间的通信方式有三种:

      • p2p transport (uses CUDA direct access between GPUs, using NVLink or PCI.)
      • shared memory transport (using host memory.)
      • net transport (InfiniBand or IP sockets.)

      ncclTransportP2pSetup() -> selectTransport()中,会选择各个peer之间适用的通信方式。对于p2p和shm通信方式,在建立连接后,可以直接进行数据传输(通过GPU peer-to-peer或者host memory),而通过network连接的peer,还需要proxy线程来通过socket进行数据传输。

集合通信原语的实现

在完成设备的Communicator初始化后,就可以调用集合通信的相关原语。在这里我们以Allreduce为例,分析集合通信原语的实现逻辑。

1
2
3
4
5
6
7
8
9
10
NCCL_API(ncclResult_t, ncclAllReduce, const void* sendbuff, void* recvbuff, size_t count,
ncclDataType_t datatype, ncclRedOp_t op, ncclComm* comm, cudaStream_t stream);
ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t count,
ncclDataType_t datatype, ncclRedOp_t op, ncclComm* comm, cudaStream_t stream) {
NVTX3_FUNC_RANGE_IN(nccl_domain);
struct ncclInfo info = { ncclFuncAllReduce, "AllReduce",
sendbuff, recvbuff, count, datatype, op, 0, comm, stream, /* Args */
ALLREDUCE_CHUNKSTEPS, ALLREDUCE_SLICESTEPS };
return ncclEnqueueCheck(&info);
}

可以看到,调用ncclAllReduce时,所有的变量被存到一个ncclInfo 结构体中,然后通过ncclEnqueueCheck 将这个结构体插入到队列中。

ncclEnqueueCheck 执行了以下操作:

  • ncclSaveKernel

    对将要执行的操作进行一些准备工作。主要是调用computeColl()计算ncclProxyArgs 这个结构体中的信息。这个结构体将被用于初始化Proxy。并且这个函数也计算了将要launch的 CUDA kernel的参数。

  • ncclBarrierEnqueue ncclBarrierEnqueueWait

    launch所有的 CUDA kernel,通过插入barrier的方式设置kernel之间的依赖关系。最后,通过ncclProxyStart() 启动Proxy线程。

AllReduce的具体执行逻辑:all_reduce.h 中,有9种实现。

1
2
三种算法:NCCL_ALGO_RING, NCCL_ALGO_TREE, NCCL_ALGO_COLLNET
三种协议:NCCL_PROTO_SIMPLE, NCCL_PROTO_LL, NCCL_PROTO_LL128
1
2
3
4
// 以NCCL_ALGO_RING, NCCL_PROTO_SIMPLE的实现为例
// 首先从args这个结构体中获取当前操作所需要的参数,例如当前的channel,数据传输的chunk size等。
// 实例化一个ncclPrimitives类
ncclPrimitives<UNROLL, ALLREDUCE_CHUNKSTEPS/ALLREDUCE_SLICESTEPS, ALLREDUCE_SLICESTEPS, T, 1, 1, 1, FUNC> prims(tid, nthreads, &ring->prev, &ring->next, thisOutput, stepSize, channel, comm, ncclShmem->ptrs, 0);
  • 一个非常重要的类:ncclPrimitives 。这个类实现了各类通信原语。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    #define SYNC_GROUP 8
    static_assert(NSEND < SYNC_GROUP && NRECV < SYNC_GROUP, "Not enough threads to cover all peers");
    int g = tid / SYNC_GROUP;
    int ng = nthreads / SYNC_GROUP;
    index = tid % SYNC_GROUP;
    if (g == 0) {
    if (index < nrecv) role |= ROLE_WAIT_RECV;
    if (index == nrecv) role |= ROLE_SRC;
    } else if (g == 1) {
    if (index < nsend) role |= ROLE_WAIT_SEND;
    if (index == nsend) role |= ROLE_DST;
    } else if (g == ng - 2) {
    if (index < nrecv) role |= ROLE_POST_RECV;
    } else if (g == ng - 1) {
    if (index < nsend) role |= ROLE_POST_SEND;
    }

    根据tid(thread ID)nthreads 以及SYNC_GROUP 就能确定当前设备的角色。这些角色分别是什么意思?

    1
    2
    3
    4
    5
    6
    ROLE_WAIT_RECV
    ROLE_SRC
    ROLE_WAIT_SEND
    ROLE_DST
    ROLE_POST_RECV
    ROLE_POST_SEND

    比较巧妙的设计是,通过设计一个GenericOp,改变其6个input的值就能使用相同的代码实现多种不同的功能。

    1
    2
    3
    template <int DIRECTRECV, int DIRECTSEND, int RECV, int SEND, int SRC, int DST>
    inline __device__ void
    GenericOp(const T* srcPtr, T* dstPtr, int nelem, ssize_t directOffset)

    在GenericOp中,调用send和recv之类的操作时,会修改ncclConnInfo 这个结构体中的void* *ptrsFifo; 以及int *sizesFifo; 这两个数据结构,这两个结构中存储着要传输的数据地址以及对应的数据大小。在 netTransport的proxy相关的函数中,会检查这个队列,进行传输。

其他

  • 异步模式是如何被启用的?

    1
    2
    3
    bool ncclAsyncMode() {
    return ncclGroupMode > 0;
    }

    可以看到,异步模式仅在 ncclGroupMode>0时启用。而这个变量是被group操作相关的函数所修改的。

    1
    2
    3
    4
    5
    6
    7
    8
    ncclResult_t ncclGroupStart() {
    NVTX3_FUNC_RANGE_IN(nccl_domain);
    if (ncclGroupMode == 0) {
    memset(ncclGroupArgs, 0, sizeof(struct ncclAsyncArgs)*MAX_ASYNC_OPS);
    }
    ncclGroupMode++;
    return ncclSuccess;
    }

    可以看出,每次使用group操作时,都会启用异步模式。异步模式下,很多操作不再阻塞等待结果返回。因此同一个函数的同步实现和异步实现会有些许不同。

  • 关于NCCL call在GPU上执行的方式:

    The NCCL call returns when the operation has been effectively enqueued to the given stream, or returns an error. The collective operation is then executed asynchronously on the CUDA device.

    类似于TensorFlow中,各个operator的执行流程。先是enqueue到一个给定的stream。然后在GPU上异步地执行,The operation status can be queried using standard CUDA semantics, for example, calling cudaStreamSynchronize or using CUDA events。

  • 以broadcast为例分析集合通信原语的具体逻辑:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
    int realChunkSize = min(chunkSize, DIVUP(size-gridOffset,nChannels));
    ALIGN_SIZE(realChunkSize, nthreads*sizeof(uint64_t)/sizeof(T));
    ssize_t offset = gridOffset + bid*realChunkSize;
    int nelem = min(realChunkSize, size-offset);
    if (rank == root) {
    // 如果是root,就要向其他设备广播当前设备buffer中的数据
    if (thisInput == thisOutput) {
    prims.send(thisInput+offset, nelem);
    } else {
    prims.copySend(thisInput+offset, thisOutput+offset, nelem);
    }
    }
    // 如果不是root,就要从其他节点接收root传来的数据。因为是ring结构,所以root发来的数据可能在环上依次传递,并不一定直接接收来自root的数据。
    else if (nextRank == root) {
    //如果是root之前的最后一个设备,接收即可,不再发送。
    prims.recv(thisOutput+offset, nelem);
    } else {
    // 否则,不仅要receive,还要copy,send.
    prims.recvCopySend(thisOutput+offset, nelem);
    }
    }
  • proxy可以看做是在CPU端运行了一个当前GPU跟其他GPU通信的代理,由proxy来决定通过何种方式传输数据。(采用netTransport的)Proxy会运行一个驻留线程。循环检查队列中有无op。op->progress则是每个op对应的proxy操作。

  • 跨CPU的数据通信可以采用Socket、Infiniband等机制。

  • GPU上的kernel内部也有一个(待传输数据的)队列。这个队列是怎样维护的?

    这个队列不需要维护。在launch kernel之后,kernel会计算要传输的数据的地址和大小,修改ncclConnInfo 这个结构体中的数据队列指针*tail,然后调用同步函数,等待proxy线程传输刚刚添加到*tail中的数据。

    因此,数据传输过程其实是GPU上的NCCL kernel跟CPU上的Proxy线程协同完成的。

    Proxy线程中与数据传输有关的三个函数是

    ncclNetIrecv:Proxy从网络收到数据

    ncclNetIsend:Proxy发送数据到网络

    ncclNetIflush: Proxy将数据从CPU传到GPU

    在NCCL中没有这三个函数的实现,应该是调用了外部API。

  • 数据怎样从一个GPU传输到同一个节点的另一个GPU?

    Peer-to-peer, PCI+host memory

  • 数据怎样从一个GPU传输到另一个节点的GPU?

    Socket, InfiniBand

总结

总的来说,NCCL的工作流程如下:

  1. 首先,每个要参与数据传输的GPU都要调用ncclCommInitRank创建一个与其rank对应的Communicator,同一个communication group中的每个communicator具有相同的unique ID。

  2. 当每个设备调用ncclCommInitRank时,设备之间会交换一些信息,例如各自的IP,bus ID等。然后检测整个系统中的网络拓扑结构。

  3. 有了网络拓扑结构,NCCL会进一步搜索当前网络中最佳的RING、TREE、COLLNET图结构。

  4. 有了设备之间的图结构信息,就可以在存在通路的设备之间建立点对点的连接。主要有三种连接方式:p2p,shared memory以及network。采用哪种方式取决于这两个节点之间支持怎样的连接方式。

    以上就是初始化阶段的所有准备工作。

  5. 初始化完成后,就可以调用集合通信原语。例如ncclAllReduce。集合通信函数会被enqueue到一个CUDA stream上,在GPU上异步执行。

  6. 接下来在CPU上启动Proxy线程,作为GPU上集合通信kernel的代理,与GPU kernel协同完成与其他设备之间的数据传输。GPU kernel负责计算所需传输的数据的地址以及数据量大小,而Proxy线程负责完成实际的数据传输。对于采用p2pTransport 以及shmTransport的设备,在建立连接后可以直接传输数据,对于采用netTransport的设备,则需要通过socket进行数据传输。