跳转到内容
ForeverYoung
返回

Numba: 通过python快速学习cuda编程

复杂的编译环境配置,指针操作和debug过程,CUDA编程对于初学者来说总是望而生畏。本文旨在通过使用python的numba库,快速学习和了解CUDA多线程高并发的编程思路。

环境配置

为了简化环境配置,本文中的编程环境将全部由conda配置,并在conda的虚拟环境中测试。所涉及的依赖库分别是cudatoolkit, numba, numpy。相关环境可以通过以下语令进行配置。

conda create -n numba-cuda python=3.7
conda activate numba-cuda
conda install cudatoolkit numba numpy

当安装完成后,可以通过nvidia-smi去检查gpu状态,如下表所示。通过该表可以看到运行在每个gpu上的程序以及它们显存的使用占用,也可以通过memory-usage检查每个显卡总体显存占用。

+-----------------------------------------------------------------------------+
| NVIDIA-SMI 440.82       Driver Version: 440.82       CUDA Version: 10.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|===============================+======================+======================|
|   0  GeForce GTX 108...  Off  | 00000000:01:00.0  On |                  N/A |
| 27%   36C    P8   112W / 250W |   6338MiB / 11175MiB |     38%      Default |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
|=============================================================================|
|    0     32597      G   /home/yang/anaconda3/envs/ssds/bin/python   6338MiB |
+-----------------------------------------------------------------------------+

并且也可以通过python -c "from numba import cuda; print(cuda.gpus)"检查是否可以成功在numba库中访问到gpu设备。当其返回为<Managed Device 0>...时则证明可在python环境下成功访问显卡。

当主机没有gpu设备时,依然可以通过numba提供的gpu模拟器去运行python的cuda代码,只需设置相关环境变量即可:export NUMBA_ENABLE_CUDASIM=1。需要注意的是该模拟器通过cpu进行模拟调试,物理上的计算单元个数远小于gpu个数。所以通过模拟器运行的程序运行速度略慢于等同cpu多线程。而且在模拟器能够运行的程序,不一定能够在gpu设备上运行。因此只能做学习使用。

当环境搭建成功后,接下来就开始CUDA编程之旅吧~

“Hello, World!”: 初窥GPU中的线程网格(Grid),线程块(Block)和线程(Thread)

环境配置成功后,本小节将通过在gpu设备上执行打印”Hello, World!”程序,来初步了解gpu编程中的基本概念以及启动gpu函数的基本流程。具体程序如下。

代码:"Hello, World!"
from numba import cuda

def cpu_print():
    print('cpu: "hello world!"')

@cuda.jit
def gpu_print():
    print(cuda.blockIdx.x, cuda.threadIdx.x, 'gpu: "hello world!"')

if __name__ == "__main__":
    gpu_print[2, 4]()
    cuda.synchronize()
    cpu_print()

输出:

0 0 gpu: "hello world!"
0 1 gpu: "hello world!"
0 2 gpu: "hello world!"
0 3 gpu: "hello world!"
1 0 gpu: "hello world!"
1 1 gpu: "hello world!"
1 2 gpu: "hello world!"
1 3 gpu: "hello world!"
cpu: "hello world!"

在numba中,cuda相关的函数被封装在numba.cuda中,可通过from numba import cuda引入。如果需要将代码在gpu上执行,需要将代码封装到函数中,并为函数加装饰器@cuda.jit。添加@cuda.jit装饰器的函数常备称为核函数(kernal),由cpu调用,但只在gpu设备上执行。

具体而言,核函数运行在gpu的最小计算单元(core)上,每次运行由单个线程(Thread)所调用,多个线程组成一个线程块(Block),多个线程块组成线程网格(Grid)。粗略的讲,通常执行单个核函数的所有线程在一个线程网络中。该线程网络控制着线程块的数量和生存周期,而每个线程块又控制着其所属的线程的个数和生存周期。运行线程的计算单元被称为流处理器(SP: Streaming Processor),多个线程组成的线程块运行在多流处理器上(SM: Streaming Multiprocessor),多个线程块组成的线程网格运行在一个gpu显卡上。

因此在核函数被cpu调用时,需要传入该核函数所在的线程网络中线程块的总体个数,以及每个线程块的线程个数。因此在执行numba.cuda中的核函数时需要定义线程块和线程的个数,如kernal[num_block_per_grid, num_thread_per_block]()。在本例中,gpu_print[2, 4]()表示使用2个线程块,每个线程块中使用4个线程去并行执行该核函数。因此共有2*4=8个线程并行执行该核函数,也就输出了8次"hello world!"

核函数的启动方式是异步的:启动gpu核函数后,cpu不会等待gpu核函数执行完毕才执行下一行代码。必要时,需要调用cuda.synchronize(),告知cpu等待gpu执行完核函数后,再进行cpu端后续计算。这个过程被称为同步。如果不调用cuda.synchronize()函数,执行结果也将改变,cpu: "hello world!"将先被打印。虽然gpu核函数调用在前,但是程序并没有等待核函数执行完,而是继续执行后面的cpu_print函数。由于cpu调用gpu有一定的延迟,反而后面的cpu_print先被执行,因此cpu_print的结果先被打印了出来。

需要注意的是每个执行核函数的线程都知道其所在的线程块索引(block index)和线程索引(thread index),以及线程块和线程的个数(grid dimension and block dimension)。由于索引值可以通过计算等价于循环中的索引值,因此可以较为简单的通过使用cuda并行计算来取代程序中的顺序循环。线程(块)索引和个数可以有最多三个维度,可分别通过x,y,z进行访问。比如,在本例中我们通过cuda.blockIdx.xcuda.threadIdx.x来得到该线程的线程块索引和线程索引,并且将其打印。

下文会通过并行计算矩阵加法来初步了解cuda多线程并行计算。

矩阵加法: 小试并行计算

上一小节通过”Hello, World!”程序,对gpu核函数的运行流程以及cuda编程中的线程网格(Grid),线程块(Block)和线程(Thread)有了一个基本了解。本节将通过二维矩阵加法来进一步阐述cuda编程中的线程设置以及数据拷贝。具体程序如下。

代码:矩阵加法
import time
import random
import math
import numpy as np
from numba import cuda, jit


def matrix_add_cpu(matrix_A, matrix_B, n_rows, n_cols):
    return [[matrix_A[ridx][cidx] + matrix_B[ridx][cidx] for cidx in range(n_cols)] for ridx in range(n_rows)]

def matrix_add_numpy(matrix_A, matrix_B, n_rows, n_cols):
    return matrix_A + matrix_B

def matrix_add_cpu_v2(matrix_A, matrix_B, matrix_C, n_rows, n_cols):
    for ridx in range(n_rows):
        for cidx in range(n_cols):
            matrix_C[ridx][cidx] = matrix_A[ridx][cidx] + matrix_B[ridx][cidx]
    return matrix_C

@jit
def matrix_add_numba_jit(matrix_A, matrix_B, matrix_C, n_rows, n_cols):
    for ridx in range(n_rows):
        for cidx in range(n_cols):
            matrix_C[ridx][cidx] = matrix_A[ridx][cidx] + matrix_B[ridx][cidx]
    return matrix_C

@cuda.jit
def matrix_add_numba_cuda(matrix_A, matrix_B, matrix_C, n_rows, n_cols):
    ridx = cuda.threadIdx.y + cuda.blockDim.y * cuda.blockIdx.y
    cidx = cuda.threadIdx.x + cuda.blockDim.x * cuda.blockIdx.x

    if ridx < n_rows and cidx < n_cols:
        matrix_C[ridx][cidx] = matrix_A[ridx][cidx] + matrix_B[ridx][cidx]

@cuda.jit
def matrix_add_numba_cuda_copy(matrix_A, matrix_B, matrix_C, n_rows, n_cols):
    ridx = cuda.threadIdx.y + cuda.blockDim.y * cuda.blockIdx.y
    cidx = cuda.threadIdx.x + cuda.blockDim.x * cuda.blockIdx.x

    if ridx < n_rows and cidx < n_cols:
        matrix_C[ridx][cidx] = matrix_A[ridx][cidx] + matrix_B[ridx][cidx]

@cuda.jit
def matrix_add_numba_cuda_v2(matrix_A, matrix_B, matrix_C, n_rows, n_cols):
    idx = cuda.threadIdx.x + cuda.blockDim.x * cuda.blockIdx.x
    ridx = idx // n_cols
    cidx = idx %  n_cols

    if ridx < n_rows and cidx < n_cols:
        matrix_C[ridx][cidx] = matrix_A[ridx][cidx] + matrix_B[ridx][cidx]


if __name__ == "__main__":
    n_row, n_col = 1080, 1920

    arr_A, arr_B = [[[random.random() for _ in range(n_col)] for _ in range(n_row)] for _ in range(2)]
    elapsed_time = time.time()
    arr_cpu = matrix_add_cpu(arr_A, arr_B, n_row, n_col)
    elapsed_time = time.time() - elapsed_time
    print("process time on cpu : {:.03f}ms".format(elapsed_time * 1000))

    arr_A, arr_B = np.array(arr_A), np.array(arr_B)
    elapsed_time = time.time()
    arr_numpy = matrix_add_numpy(arr_A, arr_B, n_row, n_col)
    elapsed_time = time.time() - elapsed_time
    if (arr_numpy - np.array(arr_cpu)).max() < 1e-8:
        print("process time on numpy : {:.03f}ms".format(elapsed_time * 1000))

    arr_C = arr_B.copy()
    elapsed_time = time.time()
    arr_cpu_v2 = matrix_add_cpu_v2(arr_A, arr_B, arr_C, n_row, n_col)
    elapsed_time = time.time() - elapsed_time
    if (arr_numpy - arr_cpu_v2).max() < 1e-8:
        print("process time on cpu v2 : {:.03f}ms".format(elapsed_time * 1000))

    elapsed_time = time.time()
    arr_n_jit = matrix_add_numba_jit(arr_A, arr_B, arr_C, n_row, n_col)
    elapsed_time = time.time() - elapsed_time
    if (arr_numpy - arr_n_jit).max() < 1e-8:
        print("process time on numba jit : {:.03f}ms".format(elapsed_time * 1000))

    threadsperblock = (16, 16)
    blockspergrid_x = math.ceil(n_col / threadsperblock[0])
    blockspergrid_y = math.ceil(n_row / threadsperblock[1])
    blockspergrid = (blockspergrid_x, blockspergrid_y)
    elapsed_time = time.time()
    matrix_add_numba_cuda[blockspergrid, threadsperblock](arr_A, arr_B, arr_C, n_row, n_col)
    cuda.synchronize()
    elapsed_time = time.time() - elapsed_time
    if (arr_numpy - arr_C).max() < 1e-8:
        print("process time on numba cuda : {:.03f}ms".format(elapsed_time * 1000))

    d_arr_A, d_arr_B, d_arr_C = cuda.to_device(arr_A), cuda.to_device(arr_B), cuda.to_device(arr_C)
    cuda.synchronize()
    elapsed_time = time.time()
    matrix_add_numba_cuda_copy[blockspergrid, threadsperblock](d_arr_A, d_arr_B, d_arr_C, n_row, n_col)
    cuda.synchronize()
    elapsed_time = time.time() - elapsed_time
    arr_C = d_arr_C.copy_to_host()
    cuda.synchronize()
    if (arr_numpy - arr_C).max() < 1e-8:
        print("process time on numba cuda with copied data : {:.03f}ms".format(elapsed_time * 1000))

    threadsperblock = 256
    blockspergrid   = math.ceil((n_col*n_row)/threadsperblock)
    elapsed_time = time.time()
    matrix_add_numba_cuda_v2[blockspergrid, threadsperblock](d_arr_A, d_arr_B, d_arr_C, n_row, n_col)
    cuda.synchronize()
    elapsed_time = time.time() - elapsed_time
    arr_C = d_arr_C.copy_to_host()
    cuda.synchronize()
    if (arr_numpy - arr_C).max() < 1e-8:
        print("process time on numba cuda with copied data : {:.03f}ms".format(elapsed_time * 1000))

输出1:

process time on cpu : 203.750ms
process time on numpy : 2.630ms
process time on cpu v2 : 1334.258ms
process time on numba jit : 235.476ms
process time on numba cuda : 217.982ms
process time on numba cuda with copied data : 76.042ms
process time on numba cuda v2 with copied data : 75.180ms

输出2:

process time on cpu : 245.238ms
process time on numpy : 2.766ms
process time on cpu v2 : 1410.334ms
process time on numba jit (1st): 234.586ms
process time on numba jit (2nd): 3.086ms
process time on numba cuda (1st): 234.172ms
process time on numba cuda (2nd): 15.431ms
process time on numba cuda with copied data (1st): 80.449ms
process time on numba cuda with copied data (2nd): 0.642ms
process time on numba cuda v2 with copied data (1st): 78.265ms
process time on numba cuda v2 with copied data (2nd): 0.538ms

当计算独立时,并行的cuda核函数可以简单取代程序中的循环并实现巨大的提速。在本例中,纯python编程想要计算两矩阵相加,必须通过循环来遍历矩阵上的所有点来进行计算。正如上一小节所讲,在cuda核函数中的索引值等价于循环中的索引值,python程序中的通过顺序循环遍历矩阵就可以被cuda中的多线程核函数并行遍历矩阵所取代。在核函数中,线程可以通过计算索引来访问到对应的点的位置。比如,循环中的行和列的索引值ridx,cidx 在核函数可通过线程(块)索引和个数来计算得到ridx = cuda.threadIdx.y + cuda.blockDim.y * cuda.blockIdx.ycidx = cuda.threadIdx.x + cuda.blockDim.x * cuda.blockIdx.x。如程序所示,该核函数中只计算了矩阵中一个点计算加法。因此在执行该gpu函数时,每个线程只为矩阵中一个点计算加法,多个线程并行运行,进而实现了快速高并发的并行计算。可以看到,相较于纯python编程,cuda并行计算可提速3倍到18倍。但有一点值得奇怪,如果该函数在运行第二次时,其速度为0.8ms左右,也就是存在264~1654倍的提速。所以可能在numba中第一遍速度较慢是由于numba的动态编译所导致的,具体原因尚需细查。

上小节提到线程块运行在多流处理器上,线程运行在流处理器上。在物理显卡中,多流处理器所包含的流处理器的数量是固定的,对目前市场上的主流显卡,多流处理器一般包含1024或512个流处理器。因此,在设置线程个数时,必须小于等于每个多流处理器包含流处理器的个数,也就是要小于或等于1024或512。所以一般线程个数在程序中常作为常量存在,如在本例线程的个数被固定为(16, 16),也就是256个线程。 并且,多流处理器在物理上通过线程束(warp)来分组调度流处理器,每个线程束调度的线程个数也是固定的,一般为32个。每个线程束中所有流处理器(32个)是一起工作的,执行相同的指令。如果没有这么多的流处理器需要工作,那么这个线程束中的一些流处理器是不工作的。为了使每个线程束中所有的线程都在工作,当进行并行计算时,线程数尽量为32的倍数。如在本例中,线程的个数是256((16, 16)),也就是32的8倍。如果线程数设置为1的话,线程束会生成掩码,使得32个线程中只有一个线程在真正执行,其他31个线程会进入静默状态。这样计算资源就会浪费,在计算量较大时,执行效率就明显变低了。

由于线程的个数在程序中常为常量,为了能够取代所有循环,一般线程块的个数可以由总循环次数除以线程个数,并向上取整。如在本例中,为了能够遍历到矩阵中所有点,线程块个数被设置为blockspergrid_x = math.ceil(n_col / threadsperblock[0]), blockspergrid_y = math.ceil(n_row / threadsperblock[1]),在本例中也就是(120,68),也就是8160个线程块。但向上取整存在一个问题,通过该设置所生成的线程个数大于本身的循环次数,即(8160*256 > 1920*1080),所以在核函数中一般需要进行判断来防止索引溢出。如本例中的if ridx < n_rows and cidx < n_cols:。上小节将到了线程(块)个数最多可定义为三个维度,但每个维度都有可能向上取整,维度越多造成资源浪费的可能性越大。这也就导致了虽然多维线程(块)个数在编程中更容易理解,但为了减少资源浪费的可能,在cuda编程中更建议用单个维度(一维)来定义线程(块)个数。

上小节说,核函数被cpu调用,被gpu执行。再延伸一点,核函数被gpu执行所调用的数据需存储在gpu显存中而非cpu内存中。所以在gpu编程中,常常需要先将输入数据从cpu内存拷贝到gpu显存中,也需要将gpu输出结果从gpu显存拷贝回cpu内存中。在numba.cuda中,如果发现传入数据在cpu内存上时,在执行核函数前,会自动将所有输入数据从cpu内存拷贝到gpu显存中,并在核函数结束后,将所有输入数据从gpu显存同步回cpu内存。但这样就导致了不必要的拷贝和额外的时间开销,如在本例中matrix_Amatrix_B的是不需要拷贝回cpu内存的。所以在cuda编程中,建议通过cuda.to_device()将输入数据从cpu内存拷贝到gpu显存中,结果并将结果通过arr.copy_to_host()从gpu显存拷贝回到cpu内存中。如果想要避免结果数组从cpu内存拷贝到gpu显存造成的额外开销,也可以通过cuda.device_array()在gpu显存中一个空向量来实现。在本例中,在执行核函数时,手动拷贝数据的执行效率要比自动拷贝数据效率要高的多。

此外,因为gpu的传输带宽较大,一般多次传输小数据的速率要低于单次传输同等数据的速率。所以在cpu与gpu相互拷贝数据时,尽量采用单次传输。但当数据较大时,gpu无法同时计算所有的数据,这时分批拷贝并计算就可以进一步提速cuda核函数。

在下一小节,将通过分批处理来进一步优化矩阵加法,并阐述了gpu编程中流(stream)的概念。

分批处理矩阵:流(stream)

上节讲到,当数据计算量较大时,gpu无法同时计算所有的数据,这时候gpu可以将计算任务分批放在一个队列中,排队顺序执行。这种按照队列顺序流水线处理的操作叫做流(stream)。这一小节依然进行矩阵加法,但将计算量是上一小节的100倍,并采用了流进行分批处理。具体代码如下。

代码:矩阵加法-多流
import time
import random
import math
import numpy as np
from numba import cuda

@cuda.jit
def matrix_add_numba_cuda(matrix_A, matrix_B, matrix_C, n_rows, n_cols):
    idx = cuda.threadIdx.x + cuda.blockDim.x * cuda.blockIdx.x
    ridx = idx // n_cols
    cidx = idx %  n_cols

    if ridx < n_rows and cidx < n_cols:
        matrix_C[ridx][cidx] = matrix_A[ridx][cidx] + matrix_B[ridx][cidx]

@cuda.jit
def matrix_add_numba_cuda_queue(matrix_A, matrix_B, matrix_C, n_rows, n_cols):
    idx = cuda.threadIdx.x + cuda.blockDim.x * cuda.blockIdx.x
    ridx = idx // n_cols
    cidx = idx %  n_cols

    if ridx < n_rows and cidx < n_cols:
        matrix_C[ridx][cidx] = matrix_A[ridx][cidx] + matrix_B[ridx][cidx]

@cuda.jit
def matrix_add_numba_cuda_stream(matrix_A, matrix_B, matrix_C, n_rows, n_cols):
    idx = cuda.threadIdx.x + cuda.blockDim.x * cuda.blockIdx.x
    ridx = idx // n_cols
    cidx = idx %  n_cols

    if ridx < n_rows and cidx < n_cols:
        matrix_C[ridx][cidx] = matrix_A[ridx][cidx] + matrix_B[ridx][cidx]


if __name__ == "__main__":
    n_row, n_col = 10800, 19200

    arr_A, arr_B, arr_C = np.random.random((n_row, n_col)), np.random.random((n_row, n_col)), np.random.random((n_row, n_col))

    # basic one time
    threadsperblock = 256
    blockspergrid   = math.ceil((n_col*n_row)/threadsperblock)

    elapsed_time = time.time()
    d_arr_A, d_arr_B, d_arr_C = cuda.to_device(arr_A), cuda.to_device(arr_B), cuda.to_device(arr_C)
    matrix_add_numba_cuda[blockspergrid, threadsperblock](d_arr_A, d_arr_B, d_arr_C, n_row, n_col)
    cuda.synchronize()
    arr_result = d_arr_C.copy_to_host()
    cuda.synchronize()
    elapsed_time = time.time() - elapsed_time
    print("process time on numba cuda : {:.03f}ms".format(elapsed_time * 1000))

    # queue
    n_streams     = 4
    seg_row       = n_row // n_streams
    blockspergrid = math.ceil((seg_row* n_col) / threadsperblock)

    elapsed_time = time.time()
    for i in range(0, n_streams):
        d_arr_A = cuda.to_device(arr_A[i * seg_row : (i + 1) * seg_row])
        d_arr_B = cuda.to_device(arr_B[i * seg_row : (i + 1) * seg_row])
        cuda.synchronize()
        matrix_add_numba_cuda_queue[blockspergrid, threadsperblock](
                d_arr_A,
                d_arr_B,
                d_arr_C[i * seg_row : (i + 1) * seg_row],
                seg_row,
                n_col)
        cuda.synchronize()
        arr_C[i * seg_row : (i + 1) * seg_row] = d_arr_C[i * seg_row : (i + 1) * seg_row].copy_to_host()
        cuda.synchronize()
    elapsed_time = time.time() - elapsed_time
    if (arr_result - arr_C).max() < 1e-8:
        print("process time on numba cuda queue: {:.03f}ms".format(elapsed_time * 1000))

    # stream
    stream_list   = list()
    for i in range (0, n_streams):
        stream = cuda.stream()
        stream_list.append(stream)

    elapsed_time = time.time()
    for i in range(0, n_streams):
        d_arr_A = cuda.to_device(arr_A[i * seg_row : (i + 1) * seg_row], stream=stream_list[i])
        d_arr_B = cuda.to_device(arr_B[i * seg_row : (i + 1) * seg_row], stream=stream_list[i])
        matrix_add_numba_cuda_stream[blockspergrid, threadsperblock, stream_list[i]](
                d_arr_A,
                d_arr_B,
                d_arr_C[i * seg_row : (i + 1) * seg_row],
                seg_row,
                n_col)
        arr_C[i * seg_row : (i + 1) * seg_row] = d_arr_C[i * seg_row : (i + 1) * seg_row].copy_to_host(stream=stream_list[i])
    cuda.synchronize()
    elapsed_time = time.time() - elapsed_time
    if (arr_result - arr_C).max() < 1e-8:
        print("process time on numba cuda stream: {:.03f}ms".format(elapsed_time * 1000))

输出1:

process time on numba cuda : 1180.036ms
process time on numba cuda queue: 1019.919ms
process time on numba cuda stream: 973.997ms

输出2:

process time on numba cuda (1st): 1187.603ms
process time on numba cuda (2nd): 883.670ms
process time on numba cuda queue (1st): 1064.817ms
process time on numba cuda queue (2nd): 932.134ms
process time on numba cuda stream (1st): 959.764ms
process time on numba cuda stream (2nd): 887.833ms
gantt
	title Default vs Queue vs Stream
	dateFormat  ss
    axisFormat  %S
	section Default
	Host2Device      : a1, 0s, 3s
	Kernal           : a2, after a1 , 3s
    Device2Host      : after a2 , 3s

	section Queue
	Host2Device1     : 0s, 1s
	Kernal1          : 1s
    Device2Host1     : 1s
	Host2Device2     : 1s
	Kernal2          : 1s
    Device2Host2     : 1s
	Host2Device3     : 1s
	Kernal3          : 1s
    Device2Host3     : 1s

	section Stream
	Host2Device1     : b1, 0s, 1s
	Kernal1          : b2, after b1 , 1s
    Device2Host1     : after b2 , 1s
	Host2Device2     : after b1,  1s
	Kernal2          : b3, after b2, 1s
    Device2Host2     : after b3 , 1s
	Host2Device3     : after b2,  1s
	Kernal3          : b4, after b3, 1s
    Device2Host3     : after b4 , 1s

由于gpu的硬件特性,cuda中的很多操作是相互独立的,比如核函数的计算,cpu内存与gpu显存间的相互拷贝等。针对这种互相独立的硬件架构,cuda使用多流(multistream)作为一种高并发的方案:把一个大任务拆分开放到多个流中,每次只对一部分数据进行拷贝、计算和回写,并把这个流程做成流水线。这样数据拷贝和核函数计算重叠的时间是重叠的,进而获得性能的提升。如在本例中,通过cuda流处理比简单的队列处理要略快一些。

既然要形成多流的队列,那么队列中的每一步操作都需要知道自己在流。因此在cuda编程中,需要先创建每个流的对象,再把流对象赋值给每一步操作。在numba中,流对象可通过cuda.stream()来创建;执行核函数时,需要与线程块和线程个数一起定义,如kernal[num_block_per_grid, num_thread_per_block, stream]();而在拷贝数据时,也需要将其加入拷贝函数,如cuda.to_device(stream=stream)arr.copy_to_host(stream=stream)。这样,每个流水线可以知道自己所需要执行的步骤了。当不指定具体流对象时,这些操作会在默认的流上执行。

每个流水线是顺序执行的,但非默认的流水线是异步操作的,也就是说先创建的流水线可能后完成。默认流有阻塞的作用。如果调用默认流,那么默认流会等非默认流都执行完才能执行;同样,默认流执行完,才能再次执行其他非默认流。另外,流不宜分配过多,当流过多时,有可能在每个流水线上的核函数计算时间代价会小于单次数据拷贝的时间(如内存到显存或显存到内存)。由于每种操作自身时不独立的,所以其必须等待其他的流完成数据拷贝操作才可以开始自己的数据拷贝操作。这样反而会导致流水线处理的执行效率变低。在实践中,可能需要硬件设备的执行效率和核函数的计算量来微调流的个数,进而达到相对好的表现。

关于矩阵加法的例子先到这里,下面的小节将通过矩阵求和,来阐述并行归约(Reduction)算法及其优化过程中的线程束分化和内存访问。

矩阵求和:并行归约(Reduction)

在之前的矩阵加法中,每个点的计算是相互独立的,各个线程不需要考虑其他线程的计算结果,所以其循环替代和核函数计算相对简单直观。但在很多任务中,循环需要依赖上一步的执行结果,替换此类核函数相对复杂了。本小节将引入矩阵求和,来简单介绍通过并行归约算法解决二元操作问题。其代码如下:

代码:矩阵求和
import time
import random
import math
import numpy as np
from numba import cuda, jit


def matrix_sum_numpy(matrix, n_rows, n_cols):
    return matrix.sum()

def matrix_sum_cpu(matrix, n_rows, n_cols):
    res = 0
    for ridx in range(n_rows):
        for cidx in range(n_cols):
            res += matrix[ridx][cidx]
    return res

@jit
def matrix_sum_numba_jit(matrix, n_rows, n_cols):
    res = 0
    for ridx in range(n_rows):
        for cidx in range(n_cols):
            res += matrix[ridx][cidx]
    return res

@cuda.jit
def matrix_sum_reduce_numba_cuda(matrix, res, max_stride_iter, n_rows, n_cols):
    idx = cuda.threadIdx.x + cuda.blockDim.x * cuda.blockIdx.x
    volume = n_rows * n_cols
    
    if idx < volume:
        ridx = idx // n_cols
        cidx = idx %  n_cols

        for s in range(max_stride_iter):
            stride = 2**s
            if (cuda.threadIdx.x % (2 * stride)) == 0:
                idx_s = idx + stride
                if idx_s < volume:
                    ridx_s = idx_s // n_cols
                    cidx_s = idx_s %  n_cols
                    matrix[ridx][cidx] += matrix[ridx_s][cidx_s]
            cuda.syncthreads()

    if cuda.threadIdx.x == 0:
        res[cuda.blockIdx.x] = matrix[ridx][cidx]

@cuda.jit
def matrix_sum_reduce_numba_cuda_v2(matrix, res, max_stride_iter, n_rows, n_cols):
    blockIdx = cuda.blockDim.x * cuda.blockIdx.x
    idx = cuda.threadIdx.x + blockIdx
    volume = n_rows * n_cols
    
    if idx < volume:
        for s in range(max_stride_iter):
            stride = 2**s
            idx =  2 * stride * cuda.threadIdx.x
            if idx < cuda.blockDim.x:
                idx = idx + blockIdx
                idx_s = idx + stride
                if idx < volume and idx_s < volume:
                    ridx = idx // n_cols
                    cidx = idx %  n_cols
                    ridx_s = idx_s // n_cols
                    cidx_s = idx_s %  n_cols
                    matrix[ridx][cidx] += matrix[ridx_s][cidx_s]
            cuda.syncthreads()

    if cuda.threadIdx.x == 0:
        res[cuda.blockIdx.x] = matrix[ridx][cidx]

@cuda.jit
def matrix_sum_reduce_numba_cuda_v3(matrix, res, max_stride_iter, n_rows, n_cols):
    idx = cuda.threadIdx.x + cuda.blockDim.x * cuda.blockIdx.x
    volume = n_rows * n_cols
    
    if idx < volume:
        ridx = idx // n_cols
        cidx = idx %  n_cols
        for _ in range(max_stride_iter):
            stride = cuda.blockDim.x // 2
            if cuda.threadIdx.x < stride:
                idx_s = idx + stride
                if idx_s < volume:
                    ridx_s = idx_s // n_cols
                    cidx_s = idx_s %  n_cols
                    matrix[ridx][cidx] += matrix[ridx_s][cidx_s]
            cuda.syncthreads()

    if cuda.threadIdx.x == 0:
        res[cuda.blockIdx.x] = matrix[ridx][cidx]

@cuda.reduce
def matrix_sum_reduce_numba_cuda_reduce(a, b):
    return a + b


if __name__ == "__main__":
    n_row, n_col = 1080, 1920
    arr = np.random.random((n_row, n_col))

    elapsed_time = time.time()
    res_numpy = matrix_sum_numpy(arr, n_row, n_col)
    elapsed_time = time.time() - elapsed_time
    print("process time on numpy : {:.03f}ms".format(elapsed_time * 1000))

    elapsed_time = time.time()
    res_cpu = matrix_sum_cpu(arr, n_row, n_col)
    elapsed_time = time.time() - elapsed_time
    if (res_numpy - res_cpu) < 1e-6:
        print("process time on cpu : {:.03f}ms".format(elapsed_time * 1000))

    elapsed_time = time.time()
    res_n_jit = matrix_sum_numba_jit(arr, n_row, n_col)
    elapsed_time = time.time() - elapsed_time
    if (res_numpy - res_n_jit) < 1e-6:
        print("process time on numba jit : {:.03f}ms".format(elapsed_time * 1000))

    # numba cuda v1, v2, v3 and reduce version
    threadsperblock = 1024
    blockspergrid   = math.ceil((n_col*n_row)/threadsperblock)
    max_stride_iter = math.ceil(math.log2(threadsperblock))
    block_res_n_cuda = np.zeros(blockspergrid)
    d_arr = cuda.to_device(arr)
    d_block_res_n_cuda = cuda.to_device(block_res_n_cuda)
    cuda.synchronize()

    elapsed_time = time.time()
    matrix_sum_reduce_numba_cuda[blockspergrid, threadsperblock](d_arr, d_block_res_n_cuda, max_stride_iter, n_row, n_col)
    cuda.synchronize()
    block_res_n_cuda = d_block_res_n_cuda.copy_to_host()
    cuda.synchronize()
    res_n_cuda = 0
    for res in block_res_n_cuda:
        res_n_cuda += res
    elapsed_time = time.time() - elapsed_time
    if (res_numpy - res_n_cuda) < 1e-6:
        print("process time on numba cuda: {:.03f}ms".format(elapsed_time * 1000))

    elapsed_time = time.time()
    matrix_sum_reduce_numba_cuda_v2[blockspergrid, threadsperblock](d_arr, d_block_res_n_cuda, max_stride_iter, n_row, n_col)
    cuda.synchronize()
    block_res_n_cuda = d_block_res_n_cuda.copy_to_host()
    cuda.synchronize()
    res_n_cuda = 0
    for res in block_res_n_cuda:
        res_n_cuda += res
    elapsed_time = time.time() - elapsed_time
    if (res_numpy - res_n_cuda) < 1e-6:
        print("process time on numba cuda v2: {:.03f}ms".format(elapsed_time * 1000))

    elapsed_time = time.time()
    matrix_sum_reduce_numba_cuda_v3[blockspergrid, threadsperblock](d_arr, d_block_res_n_cuda, max_stride_iter, n_row, n_col)
    cuda.synchronize()
    block_res_n_cuda = d_block_res_n_cuda.copy_to_host()
    cuda.synchronize()
    res_n_cuda = 0
    for res in block_res_n_cuda:
        res_n_cuda += res
    elapsed_time = time.time() - elapsed_time
    if (res_numpy - res_n_cuda) < 1e-6:
        print("process time on numba cuda v3: {:.03f}ms".format(elapsed_time * 1000))

    arr_1d = arr.reshape(-1)
    d_arr_1d = cuda.to_device(arr_1d)
    elapsed_time = time.time()
    res_n_cuda_reduce = matrix_sum_reduce_numba_cuda_reduce(d_arr_1d)
    elapsed_time = time.time() - elapsed_time
    if (res_numpy - res_n_cuda_reduce) < 1e-6:
        print("process time on numba reduce: {:.03f}ms".format(elapsed_time * 1000))

输出1:

process time on numpy : 0.820ms
process time on cpu : 566.166ms
process time on numba jit : 187.324ms
process time on numba cuda: 208.259ms
process time on numba cuda v2: 137.212ms
process time on numba cuda v3: 120.333ms
process time on numba reduce: 376.588ms

输出2:

process time on numpy : 0.871ms
process time on cpu : 567.084ms
process time on numba jit (1st): 236.218ms
process time on numba jit (2nd): 2.202ms
process time on numba cuda (1st): 252.695ms
process time on numba cuda (2nd): 1.662ms
process time on numba cuda v2 (1st): 158.547ms
process time on numba cuda v2 (2nd): 1.468ms
process time on numba cuda v3 (1st): 130.770ms
process time on numba cuda v3 (2nd): 1.101ms
process time on numba reduce (1st): 434.946ms
process time on numba reduce (2nd): 0.686ms

并行归约(Reduction)是一种基础的并行算法。该算法常处理如下问题:假设有N个输入数据,使用一个符合结合律的二元操作符作用其上,最终生成1个结果。这个二元操作符可以是求和、取最大、取最小、平方、逻辑与或等等。本例以加法为例。这种问题的最基本的解决思路就是串行遍历,如示例代码中的matrix_sum_cpu函数。当将问题转化为并行计算时,由于加法的交换律和结合律,矩阵可以以任意顺序求和,其解决思路可以为:首先把输入数组划分为更小的数据块,之后用一个线程计算一个数据块的部分和,最后把所有部分和再求和得出最终结果。这种思路就被称为并行归约。相较于串行计算,并行归约的时间复杂度由O(N)变为O(logN)。由于并行归约非常经典,又在cuda编程中常常使用,numba.cuda本身就通过cuda.reduce实现了为1维数组的并行归约算法,其相关调用可参考本例中的matrix_sum_reduce_numba_cuda_reduce函数。

在本例中,并行归约在numba.cuda上的具体实现可参考matrix_sum_reduce_numba_cuda函数。具体而言,核函数循环stride,stride的上限为线程块中的线程个数,每次循环stride*2。当线程所索引到的数据对步长取余为0时,将线程所索引到的数据和与该数据距离stride的数据进行求和。最后返回该线程块的求和结果,并在cpu为所有线程块的结果求和。之所以只求得线程块的和,是因为核函数只能在每一个线程块中实现同步线程进度。具体而言,在并行归约中,循环中的每一步都需要上一步全部完成才能得到正确结果。因此如果想实现全局求和,需要同步每一个线程块的每一个线程来确认都已完成该步骤。而基于上面小节得知,不同的线程块的开始和结束是不一致的,所以在cuda编程中核函数无法同步不同线程块,甚至需要分批计算。假如某核函数需要10个线程块,但gpu只能并行计算5个线程块,这10个线程块将分成两批进行计算,也就无法为这10个线程块的所有线程达到同一时间的同步。因此核函数只能在每一个线程块中实现同步,无法跨线程块同步线程进度。在numba.cuda中,线程块内线程进度的同步通过cuda.syncthreads()来完成。

在本例的matrix_sum_reduce_numba_cuda函数中,每次循环只有线程索引为是stride的倍数的线程在进行执行,其他线程则保持静默。这样就导致在每一个线程束中只有稀疏的线程在执行,这种情况被称为线程束分化。但由于硬件设计,调度会以一整个线程束为单位进行,所以影响了程序的效率。线程束分化可以通过重新组织线程索引来解决。如本例matrix_sum_reduce_numba_cuda_v2函数所示,通过重新组织线程索引,可以保证在每一个block中一部分线程束的所有线程都在活跃,而另一部分线程束的所有线程都在静默。如在第一轮迭代,前16个线程束执行计算,后16个线程束什么都不做。通过简单整理线程索引,可以提高程序效率,如本例v2比v1执行速度提升了1.5倍。

此外,对全局内存的访问尽量进行合并访问与存储,能够达到尽量最大的带宽,也能够提升程序效率。比如在本例matrix_sum_reduce_numba_cuda_v2函数,每一循环所访问的数据索引不仅由线程索引决定,也由循环的跨度决定,即idx = 2 * stride * cuda.threadIdx.x。这就导致每次循环依据索引所访问到的数据并不连续,其跨度为stride。为了缓解这种现象,在并行归约中可以重新组织配对方法,进而让对内存的访问更加集中。如本例中matrix_sum_reduce_numba_cuda_v3所示,通过交错配对的方法,使得结果每次循环中只访问和存储到统一地址,进而提升了访问效率。如本例v3比v2执行速度提升了1.14倍。

(未完待续)

参考

在撰写本文时,大量的参考了官方文档和其他博客,收益良多。观点表述如有雷同,可视为出自原作者。相关参考链接如下:

眼过千遍,不如手过一遍。希望在阅读本文之后能够自己复现出来上述cuda代码。共勉。


分享这篇文章:

上一篇
通过NPP加速TensorRT部署时图片数据预处理