xxhash

xxHash: 快速的哈希函数

安装

meson 的 wrapDB 中有,所以只需要 meson wrap install xxhash

编译时使用 XXH_INLINE_ALL 选项提高性能,对于 meson 来说需要写 xxhash_dep = dependency('libxxhash', default_options: ['inline-all=true'])

基本使用

最快的哈希函数是 XXH3_64bits

   #include "xxhash.h"

   static inline
   XXH64_hash_t
   hash_func(int* key) {
       return XXH3_64bits(key, 2*sizeof(int));	
   }

klib

klib: 轻量级 c 头文件库

khash.h

快速轻量的哈希表库

   #include "khash.h"

   typedef int kh_myname_key_t;
   typedef int kh_myname_value_t;

   kh_int_t 
   kh_myname_hash_func(kh_myname_key_t key){
       return HASH(key);
   }

   int 
   kh_myname_hash_equal(kh_myname_key_t key_1, kh_myname_key_t key_2){
       return key_1 == key_2;
   }

   KHASH_INIT(myname, kh_myname_key_t, kh_myname_value_t, 1, kh_myname_hash_func, kh_myname_hash_equal)

   int 
   main(){

       khiter_t k;	
       khash_t(myname) *h = kh_init(myname);

       // 存入数据
       for (int i=0; i<10; ++i){
           kh_myname_key_t* key = calloc(1, sizeof(kh_myname_key_t));			
           *key = i;

           int ret;
           k = kh_put(myname, h, key, &ret);			
           kh_value(h, k) = i + 3;			
       }

       // 读取,修改,删除
       for (int i=0; i<10; ++i){
           int key = i;
           k = kh_get(myname, h, key);			

           if (!kh_exist(h,k) || k== kh_end(h))
               printf("not exits!\n");
           if (kh_value(h, k) != i+j)			
               printf("wrong value!\n");

           kh_del(myname, h, k);			
           free(kh_key(h, k));			
       }

       kh_destroy(myname, h);	

       return 0;
   }

cython

Cython

基本概念

文件

  1. .pyxcython 语法的源文件
  2. .pyd (windows) 是 windows 平台上编译后的文件,对应 linux 平台的 *.so
  3. .pyd (源文件) 是外部声明,即对 .h 头文件的重新封装

  1. Cython.Build python 端调用,用于编译
  2. cython.parallel cython 端调用,并行 prange

所有 c 库在 Cython/Includes

  1. libc cython 端调用, c 标准库
  2. libcpp cython 端调用, c++ stl 库
  3. numpy cython 端调用,numpy 支持
  4. posix cython 端调用,posix 库
  5. openmp cython 端调用,并行
  6. cpython cython 端调用, python 的 c 接口
  7. cython cython 端调用,控制特殊行为

语法

    from libc.math cimport log2 # 引入 c 函数
    cdef int x # 声明变量类型
    cdef int [:,:] x # 声明变量是 numpy 的 2d 向量
    ctypedef struct queue: # 声明类型
        pass

    ctypedef fused my_type: # 泛型
        int
        double
        long long

    # 类的定义,注意使用 __cinit__
    cdef class A:
        cdef int a
        def __cinit__(self):
            self.a = 1

    <void*>value # 类型转换成指针

    # 定义 c 函数,异常时返回 -1
    cdef int func(int x) execpt? -1:
        pass

文件注解

  1. # distutils: language=c++ 使用 c++ 编译
  2. # distutils: sources = c-algorithms/src/queue.c 指定静态链接的源文件
  3. # distutils: include_dirs = c-algorithms/src/ 指定头文件目录
  4. # distutils: extra_compile_args=-fopenmp 指定编译器参数
  5. # distutils: extra_link_args=-fopenmp 指定链接参数
  6. # cython: infer_types=True 自动推断类型,只作用于一级缩进的变量
  7. # cython: profile=True 开启 profile
  8. # cython: linetrace=True 开启 profile 行追踪

函数注解

  1. @cython.boundscheck(False) 关闭数组边界检查
  2. @cython.wraparound(False) 关闭负指标

编译

基本结构

setup.py

    from setuptools import setup
    from Cython.Build import cythonize

    setup(
        ext_modules = cythonize("helloworld.pyx")
    )

helloworld.pyx

    print("Hello World")

执行命令

    python setup.py build_ext --inplace

这编译得到的 helloworld.sohelloworld.pyd

动态链接

参考

需要指明 libraries 参数

    from setuptools import Extension, setup
    from Cython.Build import cythonize

    ext_modules = [
        Extension("demo",
                  sources=["demo.pyx"],
                  libraries=["m"]  # Unix-like specific
                  )
    ]

    setup(name="Demos",
          ext_modules=cythonize(ext_modules))

静态链接

使用注解

    # distutils: sources = c-algorithms/src/queue.c
    # distutils: include_dirs = c-algorithms/src/

    pass

使用 c 库

外部声明

使用头文件, 外部声明可以放在 .pyd 文件中

    cdef extern from "math.h":
        double sin(double x)

异常处理

  1. except? -1 语法表示当发生任何错误时,返回 -1
  2. execpt * 表示返回时调用 PyErr_Occurred() ,当函数返回 void 并且需要传递错误时使用这个
    cdef int pop(self) except? -1:
        if cqueue.queue_is_empty(self._c_queue):
            raise IndexError("Queue is empty")
        return <Py_ssize_t>cqueue.queue_pop_head(self._c_queue)

    cdef int spam() except *:
        ...

使用 numpy

numpy 的 ndarray 在 cython 中用做 typed memoryview

基本例子

    import numpy as np

    DTYPE = np.intc


    cdef int clip(int a, int min_value, int max_value):
        return min(max(a, min_value), max_value)


    def compute(int[:, :] array_1, int[:, :] array_2, int a, int b, int c):

        cdef Py_ssize_t x_max = array_1.shape[0]
        cdef Py_ssize_t y_max = array_1.shape[1]
        assert tuple(array_1.shape) == tuple(array_2.shape)

        result = np.zeros((x_max, y_max), dtype=DTYPE)
        cdef int[:, :] result_view = result

        cdef int tmp
        cdef Py_ssize_t x, y

        for x in range(x_max):
            for y in range(y_max):

                tmp = clip(array_1[x, y], 2, 10)
                tmp = tmp * a + array_2[x, y] * b
                result_view[x, y] = tmp + c

        return result

并行

  1. 使用 prange
  2. 使用 nogil

          from cython.parallel import prange
          cdef int func(int x) nogil:
              cdef int i
              cdef int y
              for i in prange(x, nogil=True):
                  y += i 
              return y
    

MPI

MPI: Message Passing Interface

MPI 是消息传递接口标准,主要的实现有 Intel MPI, OpenMPI 和 MPICH 等. MPI 标准目前有三版 MPI-1,MPI-2, MPI-3, 标准支持的语言是 c 和 fortran, c++ 支持在 MPI-3 中移除了。

MPI 的优势:

  • 标准:所有超级计算机都支持
  • 可移植:无需修改程序就能在所有支持 MPI 的平台上运行
  • 高性能:所有实现都会根据机器做优化
  • 功能齐全:MPI-3 中有超过 430 个函数,尽管大多数程序可能就用几十个函数
  • 可用:相当多的实现都可用,包括供应商提供的和开放获取的

基本结构

程序结构

所有 MPI 程序都分以下几步

  1. 引用头文件 "mpi.h"
  2. 执行顺序代码
  3. 初始化 MPI 环境
  4. 执行并行任务
  5. 终止 MPI 环境
  6. 执行顺序代码
  7. 程序结束

接口约定

所有 MPI 的函数都以 MPI_ 开头,并且下划线后的第一个字母大写,函数返回消息码 rc , 如果成功就是 MPI_SUCCESS, 即 0

基本概念

  1. communicator: 通信器,定义哪些进程之间需要通信
    1. MPI_COMM_WORLD 通信器表示所有进程都相互通信
  2. group: 进程的分组
  3. rank: 秩,指的是一个通信器中每个进程都有自己单独的编号,有时候也叫做 任务 ID ,秩是从 0 开始连续的整数
  4. 错误处理: 尽管大多数函数都返回一个错误码,但是 MPI 标准指出,当错误发生时程序应立刻终止,所以并不需要自己捕获和处理所有错误码

MPI 版本

MPI-1 (1994) :最早的版本,构建起了 MPI 的基本框架,包括

  1. 基本的环境管理
  2. 点到点通信
  3. 集体通信
  4. 组和通信器
  5. 虚拟拓扑

MPI-2 (1998) : 为 MPI-1 添加大量新函数

  1. 动态进程:可以在任务启动后创建新的进程
  2. 单边通信:提供单向通信功能,包括共享内存和远程累加操作
  3. 扩展集体通信:允许通信器间的集体通信
  4. 扩展接口:允许在 MPI 之上构造开发曾,包括 debugger 和 profiler 等
  5. 额外语言绑定:支持 fortran 90
  6. 并行 I/O

MPI-3 (2012) :对 MPI-1 和 MPI-2 更多的扩展

  1. 非阻塞集体操作
  2. 新的单边通信操作
  3. 近邻集体:扩展更多虚拟拓扑结构
  4. 支持 fortran 2008
  5. MPIT Tool Interface 允许 MPI 实现开放一些内部变量给用户,用于性能提升
  6. Matched Probe: 改进多线程中的 probe 功能

MPI-4 (2021) : TODO

  1. 增加大 count 参数支持
  2. 持久集体
  3. 部分通信
  4. 新的初始化方式
  5. 改进信息和错误处理

环境管理函数

  1. MPI_Init (&argc,&argv)
    1. 初始化 MPI 环境,应在所有 MPI 函数之前调用,并且只能调用一次,这个函数具体做的事情是实现依赖的
  2. MPI_Comm_size (comm,&size)
    1. 获得通信器 comm 中 MPI 进程的总数
    2. 如果通信器是 MPI_COMM_WORLD 那么就会得到程序的总进程数
  3. MPI_Comm_rank (comm,&rank)
    1. 获得调用这个函数的进程在通信器 comm 中的秩,
  4. MPI_Abort (comm,errorcode)
    1. 终止通信器 comm 中的所有进程
  5. MPI_Get_processor_name (&name,&resultlength)
    1. 获得进程的名字和名字的长度
    2. 名字的缓冲区大小至少是 MPI_MAX_PROCESSOR_NAMEsizeof(char)
    3. 具体返回的名字是什么是实现依赖的
  6. MPI_Get_version (&version,&subversion)
    1. 获得 MPI 标准的版本号和子版本号
  7. MPI_Initialized (&flag)
    1. 测试是否已经调用过 MPI_Init()
  8. MPI_Wtime ()
    1. 返回该进程所处的时间,以秒为单位
    2. 不同进程的时间可能不一样,但大多数实现会同步这个时间
  9. MPI_Wtick ()
    1. 返回 MPI_Wtime() 的精度
  10. MPI_Finalize ()
    1. 终止 MPI 环境,每个 MPI 程序只能调用一次
   #include "mpi.h"

   int main(int argc, char *argv[])
   {
       MPI_Init(&argc, &argv);

       int num_tasks;
       MPI_Comm_size(MPI_COMM_WORLD, &num_tasks);

       int rank;
       MPI_Comm_rank(MPI_COMM_WORLD, &rank);

       char hostname[MPI_MAX_PROCESSOR_NAME];
       int name_len;
       MPI_Get_processor_name(hostname, &name_len);

       MPI_Finalize();
   }

点到点通信

一般参数说明

类型 举例的函数
Blocking sends MPISend(buffer,count,type,dest,tag,comm)
Non-blocking sends MPIIsend(buffer,count,type,dest,tag,comm,request)
Blocking receive MPIRecv(buffer,count,type,source,tag,comm,status)
Non-blocking receive MPIIrecv(buffer,count,type,source,tag,comm,request)

参数说明:

  1. buffer: 要被收发的内容,内存地址,也就是指针
  2. count: 要收发的数据大小
  3. type: 数据类型,全大写的一个量 MPI_CHAR 等,也可以自定义
  4. destination: 发送的目标进程的 rank
  5. source: 接受的源进程的 rank,
    1. 如果是 MPI_ANY_SOURCE 那么就从所有进程接收
  6. tag: 一个任意的非负整数参数,用来标记不同的信息,接收和发送两方的 tag 要相同,
    1. MPI_ANY_TAG 会接收所有 tag
    2. MPI 标准指出 tag 应该在 0-32767, 不过大多数实现都支持更大的数
  7. comm: 通信器
  8. status: 是收到的消息的属性,类型是 MPI_Status
    1. MPI_Get_count() 可以获取收到的消息的大小
  9. request: 非阻塞收发的回调,类型是 MPI_Request

常用阻塞点到点通信函数

  1. MPI_Send() 基本的发送消息, 只有当发送信道再次可用时才返回
  2. MPI_Recv() 基本的接收消息,阻塞直到接收信道再次可用
  3. MPI_Ssend() 同步阻塞发送,阻塞直到发送信道可用并且接收方开始接收这个消息
  4. MPI_Sendrecv() 发送消息并发送回执,阻塞直到发送信道可用,并且接收方已经收到消息
  5. MPI_Wait()
  6. MPI_Waitany()
  7. MPI_Waitall()
  8. MPI_Waitsome()
    1. 阻塞直到某个非阻塞的收发完成
  9. MPI_Probe() 进行阻塞测试
  10. MPI_Get_count() 返回收到的消息的源、标签和大小
    1. 输出类型是 MPI_SOURCE

常用非阻塞点到点通信函数

  1. MPI_Isend() 非阻塞发送,所指定的发送 buffer 应该在 MPI_Probe() 确定已经发送完成之前不修改
  2. MPI_Irecv() 非阻塞接收,同样不应该在确定接收完成前修改 buffer
  3. MPI_Issend() 非阻塞同步发送,类似 MPI_Isend() 只是 MPI_Probe() 会确定接收方已经收到消息
  4. MPI_Test()
  5. MPI_Testany()
  6. MPI_Testall()
  7. MPI_Testsome()
    1. 检查非阻塞收发操作的状态,输出值 flag0 表示没完成, 1 表示已完成
  8. MPI_Iprobe() 进行非阻塞测试, 如果消息已经到了 flag1

集体通信

nil

集体通信操作的类型有

  • 同步 阻塞直到所有进程都到达同步点
  • 数据移动 broadcast, scatter, gather, all to all
  • 集体计算(归约) min, max, add, multiply 等

常用集体通信函数

  1. MPI_Barrier (comm) 同步所有进程
  2. MPI_Bcast (&buffer,count,datatype,root,comm) 将数据从 root 进程广播到所有进程
  3. MPI_Scatter (&sendbuf,sendcnt,sendtype,&recvbuf,recvcnt,recvtype,root,comm) 将数据分散到所有进程
  4. MPI_Gather (&sendbuf,sendcnt,sendtype,&recvbuf,recvcount,recvtype,root,comm) 从所有进程获取数据到 root
  5. MPI_Allgather (&sendbuf,sendcount,sendtype,&recvbuf,recvcount,recvtype,comm) 所有进程都获得总的数据
  6. MPI_Reduce (&sendbuf,&recvbuf,count,datatype,op,root,comm) 执行一个归约操作 op 到进程 root
  7. MPI_Allreduce (&sendbuf,&recvbuf,count,datatype,op,comm) 执行归约操作到所有进程
  8. MPI_Reduce_scatter (&sendbuf,&recvbuf,recvcount,datatype,op,comm) 先 reduce 再 scatter
  9. MPI_Alltoall (&sendbuf,sendcount,sendtype,&recvbuf,recvcnt,recvtype,comm) 每个进程都执行 scatter
  10. MPI_Scan (&sendbuf,&recvbuf,count,datatype,op,comm) 扫描操作

自定义数据类型

用户定义的数据类型,称作 derived data types

  1. MPI_Type_contiguous (count,oldtype,&newtype) 创建一个新的数据类型,是 count 个旧类型数据的组合
  2. MPI_Type_vector (count,blocklength,stride,oldtype,&newtype)MPI_Type_contiguous 相同,只不过每个数据间可以有间隔
  3. MPI_Type_indexed (count,blocklens[],offsets[],old_type,&newtype)
  4. MPI_Type_create_struct
  5. MPI_Type_get_extent
  6. MPI_Type_commit (&datatype) 向系统提交类型
  7. MPI_Type_free (&datatype) 释放指定类型的对象

自定义结构体

MPI_Type_create_struct(count, array_of_blocklengths, array_of_displacements, array_of_types, newtype)

  1. count 结构体中的元素数,也是下面三个数组的长度
  2. array_of_blocklengths 每个块中的元素个数
  3. array_of_displacements 每个块的偏移量
  4. array_of_types 每个块的类型
  5. newtype 输出
    struct object {
        char c;
        double x[2];
        int i;
    };

    MPI_Datatype newtype;
    int object_len = 3;
    int blocklengths[3];
    MPI_Datatype types[3];
    MPI_Aint displacements[3];

    MPI_Aint current_displacement = 0;

    blocklength[0] = 1;
    types[0] = MPI_CHAR;
    displacements[0] = offsetof(struct object, c);

    blocklength[1] = 2;
    types[1] = MPI_DOUBLE;
    displacements[1] = offsetof(struct object, x);

    blocklength[2] = 1;
    types[2] = MPI_INT;
    displacements[2] = offsetof(struct object, i);

    MPI_Type_create_struct(object_len, blocklengths, displacements, types, &newtype);
    MPI_Type_commit(&newtype);

    struct object o;

    if (proc_id == 0){
        MPI_Send(&o, 1, newtype, 1, 0, comm);
    }else if (proc_id == 1){
        MPI_Recv(&o, 1, newtype, 0, 0, comm, MPI_STATUS_IGNORE);
    }

    MPI_Type_free(&newtype);

组和通信器管理

group 组是一些进程的有序集合,每个进程编号从 0 到 N-1, communicator 通信器是一组需要相互通信的进程,每个组都有一个通信器。从编程的角度看,组是用来指定哪些进程相互通信,进而创建通信器的。

典型的过程:

  1. 使用 MPI_Comm_group()MPI_COMM_WORLD 中取出全局组的句柄
  2. 使用 MPI_Group_incl() 从全局组的子集中创建新的组
  3. 使用 MPI_Comm_create() 为新的组创建通信器
  4. 使用 MPI_Comm_rank() 确定新的通信器中的 rank
  5. 进行通信
  6. 使用 MPI_Comm_free()MPI_Group_free() 释放不用的组和通信器对象

基本通信器

  • MPI_COMM_WORLD 全部的进程
  • MPI_COMM_SELF 只包含当前进程
  • MPI_COMM_NULL 无效通信器,用于一些函数的错误码

通信器的类型是 MPI_Comm

复制通信器

复制一个一样的通信器,主要用于库函数编写时防止修改全局参数

  1. MPI_Comm_dup() 复制一个通信器
  2. MPI_Comm_idup() 复制通信器,非阻塞
  3. MPI_Comm_dup_with_info() 复制通信器并传递 info
  4. MPI_Comm_idup_with_info() 复制通信器并传递 info, 非阻塞

划分通信器

使用 MPI_Comm_split(comm, color, key, newcomm)comm 中有相同 color 的进程组成一个新的通信器 newcomm, 新通信器中的 rank 由 key 指定

注意

  1. 代码会在所有进程中执行,所以只要进程中生成自己的 colorkey ,返回的通信器就包括这些进程
  2. 一般新通信器中的 rank 跟全局的保存一致就行了, MPI_Comm_rank(MPI_COMM_WORLD, &my_key)
  3. MPI_Comm_spawn()
  4. MPI_Comm_free()

从组创建通信器

组的类型是 MPI_Group

  1. MPI_Comm_group()
  2. MPI_Group_incl(group, n, ranks, newgroup) 将组中 ranks 的进程添加到新组
  3. MPI_Group_excl(group, n, ranks, newgroup) 除了 ranks 以外的进程添加到新组
  4. MPI_Group_difference()
  5. MPI_Group_union()
  6. MPI_Group_intersection()
  7. MPI_Group_difference()
  8. MPI_Group_size()
  9. MPI_Group_rank()
  10. MPI_Comm_create_group()
  11. MPI_Comm_create(comm, group, newcomm) 从组创建通信器

通信器间的通信器

MPI_Intercomm_create(local_comm, local_leader, peer_comm, remote_leader, tag, newintercomm)

虚拟拓扑

MPI 的术语 virtual topologies 指的是一些进程相互连接构成一个几何形状,这个网络拓扑是虚拟的,与物理实际的连接无关。

当特殊的通信模式与一个拓扑结果相匹配时 MPI 虚拟拓扑就很有用了。

单边通信

基本概念

  • Remote Memory Access(RMA), Remote Direct Momory Access (RDMA), 远程内存访问
    • 指的是两个进程 origin 和 target, origin 发起动作 put/get, 访问 target 的内存
    • 之所以叫单边通信,是因为 target 进程完全不知道发生了什么
  • window
    • 单边通信只能访问 target 进程所指定的一块内存,称为 window
  • distributed shared memory, virtual shared memory
    • 是除了 window 之外的另一种实现远程内存访问的方法
    • 只有所谓的 Partitioned Global Address Space(PGAS)语言支持,例如 Unified Parallel C (UPC)
  • active RMA 和 passive RMA
    • active RMA 也叫做 active target synchronization, target 进程设置一个时间周期(epoch),在这其中 window 可以被访问,类似于同步数据转移
    • passive RMA 也叫做 passive target synchronization, target 进程不对 window 的访问做限制,这种方式很高效,但是非常难 debug 并且很容易死锁

窗口

用于单边通信的内存区域就是窗口,变量类型为 MPI_Win, 进程可以向其中存取任何东西

说明

  1. 窗口是定义在通信器上的,创建窗口操作是集体通信,即窗口的不同部分被不同的进程所拥有
  2. 每个进程的窗口大小独立设置,可以设为 0
  3. 窗口的内存分配和释放要显式进行

常用函数

  1. MPI_Win_allocate() 分配窗口的内存
  2. MPI_Win_free() 释放内存
  3. MPI_Win_create() 基于指定的 buffer 创建窗口
  4. MPI_Win_allocate_shared() 在共享内存的通信器上创建窗口
  5. MPI_Win_create_dynamic() 创建窗口,但不分配内存
    MPI_Info info;
    MPI_Win window;
    MPI_Win_allocate(size, disp_unit, info, comm, &memory, &window);

    /* 进行操作 */

    MPI_Win_free(&window);

动态分配内存

  1. MPI_Win_create_dynamic(MPI_Info info, MPI_Comm comm, MPI_Win *win)
    1. 创建窗口,可以向其中动态的加入内存
  2. int MPI_Win_free(MPI_Win *win)
    1. 释放窗口的内存
  3. int MPI_Alloc_mem(MPI_Aint size, MPI_Info info, void *baseptr)
    1. 分配内存
    2. size 是内存大小
    3. baseptr 是输出的指针
    4. info 可以是 MPI_INFO_NULL
  4. int MPI_Free_mem(void *base)
    1. 释放分配的内存
  5. MPI_Win_attach(MPI_Win win, void *base, MPI_Aint size)
    1. 向窗口中添加内存
    2. win 必须是 MPI_Win_create_dynamic() 创建的窗口
    3. 窗口中的内存就是这里指针的内存,没有复制
  6. MPI_Win_detach(MPI_Win win, void *base)
    1. 从窗口中去掉添加的内存
  7. int MPI_Win_lock(int lock_type, int rank, int assert, MPI_Win win)
    1. 对窗口整体加锁
    2. lock_typeMPI_LOCK_EXCLUSIVEMPI_LOCK_SHARED
    3. rank 是获得锁的进程
    4. assert 是用于做优化的参数,不需要就设 assert=0
  8. int MPI_Win_unlock(int rank, MPI_Win win)
    1. 解锁
  9. int MPI_Get_address(const void *location, MPI_Aint *address)
    1. 获得调用内存中位置的地址
    2. 应该用这个函数获得的 MPI_Alloc_mem() 分配的地址作为值广播给其它需要用到这个值的进程
    3. 对于在窗口中的指针,这里获得的地址就是偏移量
    4. 可以把偏移量看做是窗口中的指针
  10. MPI_Put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
    1. origin_addr 复制内存到 target_rank 进程的从 win 窗口的偏移 target_disp 开始的地址
  11. MPI_Get(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
    1. target_rank 进程的 win 窗口偏移 target_disp 复制内存到 origin_addr
  12. int MPI_Compare_and_swap(const void *origin_addr, const void *compare_addr, void *result_addr, MPI_Datatype datatype, int target_rank, MPI_Aint target_disp, MPI_Win win)
    1. 比较并交换,原子操作
    2. 比较当前内存中的旧值 *result_addr 和之前读取到的旧值 *compare_addr ,如果一样,则说明中间内存未被修改过,那么就用新值 *origin_addr 替换旧值 *result_addr
    3. 被修改的值属于 target_rank 进程
    4. target_disp 是从窗口的起点到 target_rank 进程中被替换的内存 result_addr 的起点之间的偏移量
  13. int MPI_Win_fence(int assert, MPI_Win win)
    1. 同步窗口在所有进程中

参考:

  1. 这里实现了一个链表 https://svn.mcs.anl.gov/repos/mpi/mpich2/trunk/test/mpi/rma/linked_list.c
  2. 指针的内存要在进程中自己释放,窗口中实际没有保存数据

例子

    /* 创建窗口 */
    MPI_Win win;
    MPI_Win_create_dynamic(MPI_INFO_NULL, MPI_COMM_WORLD, &win);

    /* 分配内存,并把它附加到窗口中,同时获得窗口中对应的偏移量,也就是窗口中的指针 */
    int* i_ptr;
    MPI_Alloc_mem(sizeof(int), MPI_INFO_NULL, &i_ptr);
    MPI_Win_attach(win, i_ptr, sizeof(int));
    MPI_Aint i_disp;
    MPI_Get_address(i_ptr, &i_disp);

    *i_ptr = proc_rank;
    printf("%d, i_ptr=%p, %d, i_disp=%ld\n",proc_rank, i_ptr, *i_ptr, i_disp);

    printf("i ptr dead\n");

    /* 读数据 */
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, proc_rank, 0, win);
    int get = 0;
    MPI_Get(&get, 1, MPI_INT, proc_rank, i_disp, 1, MPI_INT, win);
    printf("%d: get= %d\n", proc_rank, get);
    MPI_Win_unlock(proc_rank, win);

    /* 写数据 */
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, proc_rank, 0, win);
    int put = proc_rank+10;
    MPI_Put(&put, 1, MPI_INT, proc_rank, i_disp, 1, MPI_INT, win);
    MPI_Win_unlock(proc_rank, win);

    /* 同步 */
    MPI_Win_fence(0, win);

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, proc_rank, 0, win);
    get = 0;
    MPI_Get(&get, 1, MPI_INT, proc_rank, i_disp, 1, MPI_INT, win);
    printf("%d: get again= %d\n", proc_rank, get);
    MPI_Win_unlock(proc_rank, win);

    /* 释放内存时,要用本地指针释放 */
    MPI_Free_mem(i_ptr);
    MPI_Win_free(&win);

共享内存

  1. int MPI_Comm_split_type(MPI_Comm comm, int split_type, int key, MPI_Info info, MPI_Comm *newcomm)
    1. 获得同一节点上的进程的通信器
    2. split_type = MPI_COMM_TYPE_SHARED
    3. comm = MPI_COMM_WORLD
    4. key = proc_rank_world
  2. int MPI_Win_allocate_shared (MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, void *baseptr, MPI_Win *win)
    1. 创建共享内存数据
    2. size 是数据的比特大小,即 个数 * sizeof(type)
    3. disp_unit 是单个数据的比特大小,即 sizeof(type)
    4. baseptr 是本地数据的指针
    5. 这个内存会在 MPI_Win_free() 时释放掉
  3. int MPI_Win_shared_query (MPI_Win win, int rank, MPI_Aint *size, int *disp_unit, void *baseptr)
    1. 获得属于 rank 进程的内存在当前进程中的地址
    2. size win 大小
    3. disp_unit 单位数据大小
    4. baseptr 当前进程中用来访问这个内存的指针
    int proc_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &proc_rank);

    /* 获得同一节点上的通信器,以及对应的 rank */
    MPI_Comm comm_node;
    int proc_rank_node;
    MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, proc_rank, MPI_INFO_NULL, &comm_node);
    MPI_Comm_rank(comm_node, &proc_rank_node);

    /* 获得不同节点上编号为 0 的进程之间的通信器 */
    MPI_Comm comm_0;
    int proc_rank_0;
    MPI_Comm_split(MPI_COMM_WORLD, proc_rank_node, proc_rank, &comm_0);
    MPI_Comm_rank(comm_0, proc_rank_0);

    /* 分配内存, 只有编号为 0 的进程才分配,其它进程的大小是 0 */
    int* data;
    MPI_Win data_win;
    int win_size;
    if (proc_rank == 0){
        win_size = 1;
    }else {
        win_size = 0;
    }
    MPI_Win_allocate_shared(win_size, sizeof(int), MPI_INFO_NULL, comm_node, &data, &data_win);

    /* 在全局编号 0 的进程上创建数据 */
    if (proc_rank == 0){
        *data = 1;
    }

    MPI_Barrier(comm_node);
    MPI_Win_fence(0, data_win);

    /* 分享数据给所有节点上编号 0 的进程 */
    MPI_Bcast(&data, 1, MPI_INT, 0, comm_0);

    /* 其它进程获得节点编号 0 进程上的指针 */
    MPI_Aint size;
    int disp_uint;
    MPI_Win_shared_query(data_win, 0, &size, &disp_unit, &data_0);

    /* 释放内存 */
    MPI_Win_free(&data_win);

混合多线程

混合多线程的策略:

  1. 纯 MPI
  2. 每个节点一个 MPI 进程,全部的线程
  3. 每个 socket 一个 MPI 进程,和剩下的线程,比如20核的节点,每个节点两个进程,每个进程10线程

大量实践认为混合多线程并不会比纯 MPI 速度快。

线程初始化

使用 MPI_Init_thread() 初始化带有线程的 MPI,可用的选项有(注意并不是所有MPI实现都提供下面全部的模式)

  1. MPI_THREAD_SINGLE 每个进程一个线程
  2. MPI_THREAD_FUNNELED 每个进程有多个线程,但只有主线程可以调用 MPI 函数
  3. MPI_THREAD_SERIALIZED 使用多个线程,每个线程都可以调用 MPI,但是只有一个线程可以调用同步通信
  4. MPI_THREAD_MULTIPLE 使用多个线程,没有任何限制

使用 MPI_Query_thread() 来确定初始化之后提供的是哪种模式

使用 MPI_Is_thread_main() 来确定线程是否是主线程

设计模式

为每个节点创建一个通信器

用于创建节点内部的共享内存窗口

   /* 获得同一节点上的通信器,以及对应的 rank */
   MPI_Comm comm_node;
   int proc_rank_node;
   MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, proc_rank, MPI_INFO_NULL, &comm_node);
   MPI_Comm_rank(comm_node, &proc_rank_node);

   /* 获得不同节点上编号为 0 的进程之间的通信器 */
   MPI_Comm comm_0;
   int proc_rank_0;
   MPI_Comm_split(MPI_COMM_WORLD, proc_rank_node, proc_rank, &comm_0);
   MPI_Comm_rank(comm_0, &proc_rank_0);

   MPI_Comm_free(&comm_node);
   MPI_Comm_free(&comm_0);

非自旋等待

  1. 阻塞通信是自旋锁,会一直燃烧 CPU, 为了给某个进程上的多线程任务让出 CPU,需要让其它进程非自旋等待
  2. 使用 MPI_Iprobe() 非阻塞地探测信号,并在探测周期中 sleep()
   int proc_rank;
   MPI_Comm_rank(MPI_COMM_WORLD, &proc_rank);

   int finish = false;
   MPI_Request finish_request;

   if (proc_rank == 0) {

       /* do sth pthread or openmp */

       finish = true;
       int proc_total = 0;
       MPI_Comm_size(MPI_COMM_WORLD, &proc_total);
       for(int i=1; i< proc_total; ++i){
           MPI_Isend(&finish, 1, MPI_INT, i, 0, MPI_COMM_WORLD, &finish_request);
           MPI_Wait(&finish_request, MPI_STATUS_IGNORE);
       }

   }else{
       int flag = false;
       MPI_Status finish_status;

       while(!flag){
           MPI_Iprobe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &finish_status);
           if (flag){
               MPI_Irecv(&finish, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &finish_request);
               MPI_Wait(&finish_request, MPI_STATUS_IGNORE);
               break;
           }
           sleep(1);
       }
   }

模拟单线程

随便写一个变量,按照顺序一个接一个的发送下去

   from mpi4py import MPI

   comm = MPI.COMM_WORLD
   proc_rank = comm.Get_rank()
   proc_num = comm.Get_size()

   myturn = 0

   if proc_rank == 0:
       myturn = 1
   else:
       myturn = comm.recv(source=proc_rank-1, tag=proc_rank-1)

   print(f"{proc_rank=}, {myturn=}")

   if proc_rank < proc_num - 1:
       comm.send(myturn, dest=proc_rank+1, tag=proc_rank)

SLEPc

SLEPc: PETSc 的本征值求解器扩展

SLEPc 是用来求 PETSc 矩阵的扩展库。

基本结构

SLEPc 有以下基本部分构成

  1. EPS (Eigenvalue Problem Solver)
  2. SVD (Singular Value Decomposition)
  3. PEP (Polynomial Eigenvalue Problem)
  4. NEP (Nonlinear Eigenvalue Problem)
  5. MFN (Matrix Function)

还有一些辅助模块

  1. ST (Spectral Transformation)
  2. BV (Basis Vectors)
  3. DS (Dense System)
  4. RG (Region)
  5. FN (Mathematical Function)

安装

下载时注意,SLEPc 的版本必须和 PETSc 版本一样

编译选项

   ./configure --download-arpack --download-blopex --download-hpddm --download-primme  --with-slepc4py=1

最后别忘了把环境变量 SLEPC_DIR 加上

基本使用

程序开始和结束时分别调用 SlepcInitialize()SlepcFinalize(), 实际上就是 PetscInitialize()PetscFinalize()

矩阵的使用跟 PETSc 完全一致

EPS: 矩阵本征值求解器

基本使用

  1. EPSCreate(MPI_Comm comm,EPS *eps);
    1. 创建求解器
  2. EPSSetOperators(EPS eps,Mat A,Mat B);
    1. 设置要解的算符
  3. EPSSetFromOptions(EPS eps);
    1. 添加运行时设置
  4. EPSSolve(EPS eps);
    1. 运行求解器
  5. EPSDestroy(EPS *eps);
    1. 销毁求解器
  6. EPSSetUp(EPS eps);
    1. 某些更具体的设置, 如果有需要的话,要在运行求解器之前设置
  7. EPSGetST(EPS eps,ST *st);
    1. 获得谱变换器(ST), EPS 会默认自动创建一个 ST 对象,如果需要设置谱变换,就要这样获得它
  8. EPSView(EPS eps,PetscViewer viewer);
    1. 打印求解器配置
    #include <slepceps.h>


    Mat A; /* problem matrix */
    /* do something to fill A */

    PetscErrorCode ierr;
    /* Create eigensolver context */
    EPS eps; /* eigenproblem solver context */
    ierr = EPSCreate(PETSC_COMM_WORLD, &eps);
    CHKERRQ(ierr);

    /* Set operators. In this case, it is a standard eigenvalue problem */
    ierr = EPSSetOperators(eps, A, NULL);
    CHKERRQ(ierr);
    ierr = EPSSetProblemType(eps, EPS_HEP);
    CHKERRQ(ierr);

    /* show EPS setup */
    ierr = EPSView(eps, PETSC_VIEWER_STDOUT_WORLD) ;
    CHKERRQ(ierr);

    /* Set solver parameters at runtime */
    ierr = EPSSetFromOptions(eps);
    CHKERRQ(ierr);

    /* Solve the eigensystem */
    ierr = EPSSolve(eps);
    CHKERRQ(ierr);

    /* Optional: Get some information from the solver and display it */
    PetscInt its;
    ierr = EPSGetIterationNumber(eps, &its);
    CHKERRQ(ierr);
    ierr = PetscPrintf(PETSC_COMM_WORLD, " Number of iterations of the method: %D\n", its);
    CHKERRQ(ierr);

    EPSType type;
    ierr = EPSGetType(eps, &type);
    CHKERRQ(ierr);
    ierr = PetscPrintf(PETSC_COMM_WORLD, " Solution method: %s\n\n", type);
    CHKERRQ(ierr);

    PetscInt nev;
    ierr = EPSGetDimensions(eps, &nev, NULL, NULL);
    CHKERRQ(ierr);
    ierr = PetscPrintf(PETSC_COMM_WORLD, " Number of requested eigenvalues: %D\n", nev);
    CHKERRQ(ierr);

    PetscReal tol;
    PetscInt maxit;
    ierr = EPSGetTolerances(eps, &tol, &maxit);
    CHKERRQ(ierr);
    ierr = PetscPrintf(PETSC_COMM_WORLD, " Stopping condition: tol=%.4g, maxit=%D\n", (double)tol, maxit);
    CHKERRQ(ierr);

    /* Display solution and clean up */
    /* Get number of converged approximate eigenpairs */
    PetscInt nconv;
    ierr = EPSGetConverged(eps, &nconv);
    CHKERRQ(ierr);
    ierr = PetscPrintf(PETSC_COMM_WORLD, " Number of converged eigenpairs: %D\n\n", nconv);
    CHKERRQ(ierr);

    if (nconv > 0) {
    /* Display eigenvalues and relative errors */
    ierr = PetscPrintf(PETSC_COMM_WORLD,
                        "           k          ||Ax-kx||/||kx||\n"
                        "   ----------------- ------------------\n");
    CHKERRQ(ierr);

    for (i = 0; i < nconv; i++) {
        /* Get converged eigenpairs: i-th eigenvalue is stored in kr (real part) and ki (imaginary part) */
        PetscScalar kr, ki;
        Vec xr, xi;
        ierr = MatCreateVecs(A, NULL, &xr);
        CHKERRQ(ierr);
        ierr = MatCreateVecs(A, NULL, &xi);
        CHKERRQ(ierr);
        ierr = EPSGetEigenpair(eps, i, &kr, &ki, xr, xi);
        CHKERRQ(ierr);

        /* Compute the relative error associated to each eigenpair */
        PetscReal error;
        ierr = EPSComputeError(eps, i, EPS_ERROR_RELATIVE, &error);
        CHKERRQ(ierr);

        PetscReal re = PetscRealPart(kr);
        PetscReal im = PetscImaginaryPart(kr);
        if (im != 0.0) {
            ierr = PetscPrintf(PETSC_COMM_WORLD, " %9f%+9fi %12g\n", (double)re, (double)im, (double)error);
            CHKERRQ(ierr);
        } else {
            ierr = PetscPrintf(PETSC_COMM_WORLD, "   %12f       %12g\n", (double)re, (double)error);
            CHKERRQ(ierr);
        }
    }
    ierr = PetscPrintf(PETSC_COMM_WORLD, "\n");
    CHKERRQ(ierr);
    }


    /* Free work space */

    ierr = EPSDestroy(&eps);
    CHKERRQ(ierr);
    ierr = MatDestroy(&A);
    CHKERRQ(ierr);
    ierr = VecDestroy(&xr);
    CHKERRQ(ierr);
    ierr = VecDestroy(&xi);
    CHKERRQ(ierr);

求解器配置

  • 问题类型

    问题类型使用 EPSSetProblemType(EPS eps,EPSProblemType type); 进行设置

    可选的有

    问题类型 EPSProblemTyle 命令行选项
    Hermitian EPSHEP -epshermitian
    Non-Hermitian EPSNHEP -epsnonhermitian
    Generalized Hermitian EPSGHEP -epsgenhermitian
    Generalized Hermitian indefinite EPSGHIEP -epsgenindefinite
    Generalized Non-Hermitian EPSGNHEP -epsgennonhermitian
    GNHEP with positive (semi-)definite BEPSPGNHEP -epsposgennonhermitian

    还可以用一系列函数来判断设置的问题类型

    1. EPSIsGeneralized(EPS eps,PetscBool *gen);
    2. EPSIsHermitian(EPS eps,PetscBool *her);
    3. EPSIsPositive(EPS eps,PetscBool *pos);
  • 求解本征值个数

    使用 EPSSetDimensions(EPS eps,PetscInt nev,PetscInt ncv,PetscInt mpd); 设置要求的本征值个数

    1. PetscInt nev 本征值个数
    2. PetscInt ncv 最大工作空间的维数,指的是用多少个中间向量, ncv 至少和 nev 一样多,最好是它的两倍以上
    3. PetscInt mpd 最大投影空间维数,用来计算很多本征值个数的时候,设置它可以减少工作空间的需求
  • 求解本征值位置
    1. EPSSetWhichEigenpairs(EPS eps,EPSWhich which); 设置要求本征值的位置
    2. EPSSetTarget(EPS eps,PetscScalar target); 设置计算距离某个值最近的本征值
    3. EPSSetInterval(EPS eps,PetscScalar a,PetscScalar b); 计算 \(\lambda \in [a, b]\) 的所有本征值
    4. EPSSetTwoSided(EPS eps,PetscBool twosided); 计算非厄米问题的左本征矢

    可选的本征值位置有

    EPSWhich 命令行 说明
    EPSLARGESTMAGNITUDE -epslargestmagnitude 最大 \(\vert \lambda\vert\)
    EPSSMALLESTMAGNITUDE -epssmallestmagnitude 最小 \(\vert\lambda\vert\)
    EPSLARGESTREAL -epslargestreal 最大 \(\mathrm{Re}(\lambda)\)
    EPSSMALLESTREAL -epssmallestreal 最小 \(\mathrm{Re}(\lambda)\)
    EPSLARGESTIMAGINARY -epslargestimaginary 最大 \(\mathrm{Im}(\lambda)\)
    EPSSMALLESTIMAGINARY -epssmallestimaginary 最小 \(\mathrm{Im}(\lambda)\)
    EPSTARGETMAGNITUDE -epstargetmagnitude 最小 \(\vert\lambda - \tau\vert\)
    EPSTARGETREAL -epstargetreal 最小 \(\vert\mathrm{Re}(\lambda-\tau)\vert\)
    EPSTARGETIMAGINARY -epstargetimaginary 最小 \(\vert\mathrm{Im}(\lambda-\tau)\vert\)
    EPSALL -epsall 所有 \(\lambda \in [a,b]\)
    EPSWHICHUSER   用户定义
  • 选择算法

    通过 EPSSetType(EPS eps,EPSType method); 设置求解器使用的算法

    SLEPc 支持的算法有

    • 基本算法
      • Power Itration, Rayleigh Quotient iteration(RQI)
      • Subspace Iteration with Rayleigh-Ritz projection and locking
      • Arnoldi method with explicit restart and deflation
      • Lanczos with explicit restart
    • Krylov-Schur, thick-restart Lanczos method (默认)
    • Generalized Davidson
    • Jacobi-Davidson
    • RQCG
    • LOBPCG
    • CISS
    • Lyapunov inverse iteration

    注意:

    1. 由于实现支持有限,只有 arnoldi, krylov-schur, gd, jd, arpack 支持所有类型的问题求解

      方法 EPSType 选项名
      Power / Inverse / RQI EPSPOWER power
      Subspace Iteration EPSSUBSPACE subspace
      Arnoldi EPSARNOLDI arnoldi
      Lanczos EPSLANCZOS lanczos
      Krylov-Schur EPSKRYLOVSCHUR krylovschur
      Generalized Davidson EPSGD gd
      Jacobi-Davidson EPSJD jd
      Rayleigh quotient CG EPSRQCG rqcg
      LOBPCG EPSLOBPCG lobpcg
      Contour integral SS EPSCISS ciss
      Lyapunov Inverse Iteration EPSLYAPII lyapii
      LAPACK solver EPSLAPACK lapack
      Wrapper to arpack EPSARPACK arpack
      Wrapper to primme EPSPRIMME primme
      Wrapper to evsl EPSEVSL evsl
      Wrapper to trlan EPSTRLAN trlan
      Wrapper to blopex EPSBLOPEX blopex
      Wrapper to scalapack EPSSCALAPACK scalapack
      Wrapper to elpa EPSELPA elpa
      Wrapper to elemental EPSELEMENTAL elemental
      Wrapper to feast EPSFEAST feast
  • 获得结果
    1. EPSGetConverged(EPS eps,PetscInt *nconv); 获得收敛的解个数
    2. EPSGetEigenpair(EPS eps,PetscInt j,PetscScalar *kr,PetscScalar *ki,Vec xr,Vec xi); 获得本征值和本征矢
    3. EPSGetLeftEigenvector(EPS eps,PetscInt j,Vec yr,Vec yi); 获得左本征矢,如果设置求解器计算左本征矢的话
    4. EPSComputeError(EPS eps,PetscInt j,EPSErrorType type,PetscReal *error); 获得结果的误差
      1. 可选的误差类型有 \(||r||\) EPS_ERROR_ABSOLUTE, \(||r||/|\lambda|\) EPS_ERROR_RELATIVE, \(||r||/(||A||+|\lambda| ||B||)\) EPS_ERROR_BACKWARD
    5. EPSGetIterationNumber(EPS eps,PetscInt *its); 获得迭代次数
    6. EPSSetTolerances(EPS eps,PetscReal tol,PetscInt max_it); 设置误差和最大迭代次数
      1. 默认误差 \(10^{-8}\)
    7. EPSSetTrueResidual(EPS eps,PetscBool trueres); 设置使用真实残差计算收敛条件,而不是默认的简化形式
    8. EPSSetConvergenceTest(EPS eps,EPSConv conv); 设置收敛条件
      1. 可选的误差类型有 \(||r||\) EPS_CONV_ABS, \(||r||/|\lambda|\) EPS_CONV_REL, \(||r||/(||A||+|\lambda| ||B||)\) EPS_CONV_NORM
    9. EPSGetErrorEstimate(EPS eps,PetscInt j,PetscReal *errest); 获得误差估计

    注意:

    1. 如果编译时使用实数模式,那么 kr, ki 等分别是实部和虚部
    2. 如果编译时使用复数模式,那么 kr, xr 中就保存复数结果, ki,xi 不使用(设为全零)
    3. 如果设置命令行选项 -eps_monitor 那么会在每次迭代过程中打印计算过程
    4. 其它命令行选项甚至可以画图 ( -eps_monitor draw:draw_lg -draw_pause .2
  • 其它设置
    • 初始猜解

      EPSSetInitialSpace(EPS eps,PetscInt n,Vec is[]);

    • 处理简并
    • 选择正交化方法
    • 选择滤波算法
    • 计算大量本征值的方法

pthread

pthread: POSIX Threads

pthread 是 POSIX 标准的线程 API, 放在头文件 <pthread.h> 中,并由 libc 提供实现。

参考

pthread API

标准 pthread API 包括以下几部分

  1. 线程管理:创建、检测、合并等
  2. 互斥锁(mutex):创建、销毁、加锁、解锁等
  3. 条件变量:在线程间共享锁、创建、销毁、等待和信号
  4. 同步:管理读写锁和屏障

    API 命名结构如下:

Routine Prefix Functional Group
pthread_ Threads themselves and miscellaneous subroutines
pthread_attr_ Thread attributes objects
pthread_mutex_ Mutexes
pthread_mutexattr_ Mutex attributes objects.
pthread_cond_ Condition variables
pthread_condattr_ Condition attributes objects
pthread_key_ Thread-specific data keys
pthread_rwlock_ Read/write locks
pthread_barrier_ Synchronization barriers

线程管理

创建和终止线程

包括以下函数

  1. pthread_create(thread,attr,start_routine,arg) 创建线程
  2. pthread_exit(status)
  3. pthread_cancel(thread)
  4. pthread_attr_init(attr) 创建线程属性对象
  5. pthread_attr_destroy(attr)

注意:

  1. 每个线程结束时最后都调用 pthread_exit 返回
  2. 最好主函数也用 pthread_exit 返回

合并和检测线程

  1. pthread_join (threadid,status)
  2. pthread_detach (threadid)
  3. pthread_attr_setdetachstate (attr,detachstate)
  4. pthread_attr_getdetachstate (attr,detachstate)

注意:

  1. 如果线程需要被 join 最好显式设置 joinable 属性,因为不是所有实现都默认加这个属性

栈管理

  1. pthread_attr_getstacksize (attr, stacksize)
  2. pthread_attr_setstacksize (attr, stacksize)
  3. pthread_attr_getstackaddr (attr, stackaddr)
  4. pthread_attr_setstackaddr (attr, stackaddr)

注意:

  1. 线程的栈大小是实现相关的,所以为了程序可移植,最好自己设置栈大小

其它

  1. pthread_self ()
  2. pthread_equal (thread1,thread2)
  3. pthread_once (once_control, init_routine)

互斥锁

典型的使用锁的过程是

  1. 创建并初始化锁变量
  2. 几个线程尝试加锁
  3. 只有一个会成功并获得锁
  4. 获得锁的线程进行一些操作
  5. 释放锁
  6. 另一个线程尝试获得锁
  7. 最终锁被销毁

创建和销毁锁

  1. pthread_mutex_init (mutex,attr)
  2. pthread_mutex_destroy (mutex)
  3. pthread_mutexattr_init (attr)
  4. pthread_mutexattr_destroy (attr)

注意:

  1. 创建锁变量可以用 pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;, 也可以用 pthread_mutex_init()

加锁和解锁

  1. pthread_mutex_lock (mutex)
  2. pthread_mutex_trylock (mutex)
  3. pthread_mutex_unlock (mutex)

条件变量

条件变量也是用于线程间同步的机制,与加锁的共享内存不同,条件变量用于传递消息。

创建和销毁条件变量

  1. pthread_cond_init (condition,attr)
  2. pthread_cond_destroy (condition)
  3. pthread_condattr_init (attr)
  4. pthread_condattr_destroy (attr)

注意:

  1. 创建条件变量可以用 pthread_cond_t myconvar = PTHREAD_COND_INITIALIZER; 也可以用 pthread_cond_init()

等待和通信

  1. pthread_cond_wait (condition,mutex) 阻止线程直到某个条件被通信过来,应该在加锁时使用,会在等待时自动释放锁,当获得信号时又会把锁加回来
  2. pthread_cond_signal (condition) 用来向其它等待中的线程传递信号,应该在加锁时使用,在操作后传信号,最后解锁
  3. pthread_cond_broadcast (condition) 用来向多个线程传递信号

注意:

  1. 等待锁时应该使用 while 循环判断而不是 if

ck: Concurrency Kit

ck: Concurrency Kit

ck 是一个c语言的高并发数据结构库,内存安全并且无阻塞。

ckht.h: HashTable

哈希表包括在头文件 ck_ht.h 中,主要的数据类型有

  1. ck_ht_t 哈希表
  2. ck_ht_entry_t 哈希表中的键, 是用来读写哈希表的中间变量
  3. ck_ht_hash_t 键的哈希值
  4. ck_ht_iterator_t 用来遍历哈希表的迭代器

初始化哈希表

函数 ck_hs_init 参数说明:

  1. ck_hs_t * 哈希表
  2. unsigned int 哈希表的模式
  3. ck_hs_hash_cb_t * 哈希算法,传 NULL 使用默认算法
  4. struct ck_malloc * 内存分配器
  5. unsigned long 初始容量,只是建议值,内部实现可大可小
  6. unsigned long 哈希算法的随机数种子
    ck_ht_t ht;
    unsigned int mode = CK_HT_MODE_BYTESTRING;

    /* 设置内存分配器 */
    static void *
    ht_malloc(size_t r)
    {
        return malloc(r);
    }

    static void
    ht_free(void *p, size_t b, bool r)
    {

        (void)b;
        (void)r;
        free(p);
        return;
    }
    static struct ck_malloc my_allocator = {
        .malloc = ht_malloc,
        .free = ht_free
    };

    /* 初始化哈希表 */
    if (ck_ht_init(&ht, mode, ht_hash_wrapper, &my_allocator, 2, 6602834) == false) {
        perror("ck_ht_init");
        exit(EXIT_FAILURE);
    }

    /* 删除哈希表 */
    ck_ht_destroy(&ht);

读写哈希表

基本的逻辑是

  1. 使用 ck_ht_hash 计算键的哈希值
  2. 使用 ck_ht_entry_set 为写数据准备键值对,或使用 ck_ht_entry_key_set 为读数据准备键值对
  3. 使用 ck_ht_put_spmc 创建新键值对,使用 ck_ht_get_spmc 读取键值对,使用 ck_ht_set_spmc 更新值
  4. 使用 ck_ht_entry_value 获取读到的值
    ck_ht_t ht;
    ck_ht_entry_t entry;
    ck_ht_hash_t h;

    void *key;
    uint16_t key_len;
    void *val;

    /* 创建新值 */
    /* 注意这里的 key_len 是 bytes 长度,也就是数组长度*sizeof(数组类型) */
    ck_ht_hash(&h, &ht, &key, key_len);
    ck_ht_entry_set(&entry, h, &key, key_len, &val);
    ck_ht_put_spmc(&ht, h, &entry);

    /* 读取 */
    ck_ht_hash(&h, &ht, &key, key_len);
    ck_ht_entry_key_set(&entry, h, &key, key_len);
    ck_ht_put_spmc(&ht, h, &entry);
    void *v = ck_ht_entry_value(&entry);

    /* 更新 */
    ck_ht_hash(&h, &ht, &key, key_len);
    ck_ht_entry_set(&entry, h, &key, key_len, &val);
    ck_ht_put_spmc(&ht, h, &entry);
    void *val_old = ck_ht_entry_value(&entry);

遍历哈希表

基本步骤

  1. 使用 ck_ht_iterator_init 初始化迭代器
  2. 使用 ck_ht_next 迭代
  3. 使用 ck_ht_entry_keyck_ht_entry_value 读取键值

           ck_ht_t ht;
           ck_ht_iterator_t iterator = CK_HT_ITERATOR_INITIALIZER;
           ck_ht_entry_t *cursor;
    
           ck_ht_iterator_init(&iterator);
           while (ck_ht_next(&ht, &iterator, &cursor) == true) {
               void *val = ck_ht_entry_value(cursor);
               void *key = ck_ht_entry_key(cursor);
           }
    

greatest

greatest: c 的单元测试

greatest 是 c 的一个单个头文件库,用于写单元测试

基本使用

#include "greatest.h"

/* A test runs various assertions, then calls PASS(), FAIL(), or SKIP(). */
TEST x_should_equal_1(void) {
    int x = 1;
    /* Compare, with an automatic "1 != x" failure message */
    ASSERT_EQ(1, x);

    /* Compare, with a custom failure message */
    ASSERT_EQm("Yikes, x doesn't equal 1", 1, x);

    /* Compare, and if they differ, print both values,
     * formatted like `printf("Expected: %d\nGot: %d\n", 1, x);` */
    ASSERT_EQ_FMT(1, x, "%d");
    PASS();
}

/* Suites can group multiple tests with common setup. */
SUITE(the_suite) {
    RUN_TEST(x_should_equal_1);
}

/* Add definitions that need to be in the test runner's main file. */
GREATEST_MAIN_DEFS();

int main(int argc, char **argv) {
    GREATEST_MAIN_BEGIN();      /* command-line options, initialization. */

    /* Individual tests can be run directly in main, outside of suites. */
    /* RUN_TEST(x_should_equal_1); */

    /* Tests can also be gathered into test suites. */
    RUN_SUITE(the_suite);

    GREATEST_MAIN_END();        /* display results */
}