MPI

MPI: Message Passing Interface

MPI 是消息传递接口标准,主要的实现有 Intel MPI, OpenMPI 和 MPICH 等. MPI 标准目前有三版 MPI-1,MPI-2, MPI-3, 标准支持的语言是 c 和 fortran, c++ 支持在 MPI-3 中移除了。

MPI 的优势:

  • 标准:所有超级计算机都支持
  • 可移植:无需修改程序就能在所有支持 MPI 的平台上运行
  • 高性能:所有实现都会根据机器做优化
  • 功能齐全:MPI-3 中有超过 430 个函数,尽管大多数程序可能就用几十个函数
  • 可用:相当多的实现都可用,包括供应商提供的和开放获取的

基本结构

程序结构

所有 MPI 程序都分以下几步

  1. 引用头文件 "mpi.h"
  2. 执行顺序代码
  3. 初始化 MPI 环境
  4. 执行并行任务
  5. 终止 MPI 环境
  6. 执行顺序代码
  7. 程序结束

接口约定

所有 MPI 的函数都以 MPI_ 开头,并且下划线后的第一个字母大写,函数返回消息码 rc , 如果成功就是 MPI_SUCCESS, 即 0

基本概念

  1. communicator: 通信器,定义哪些进程之间需要通信
    1. MPI_COMM_WORLD 通信器表示所有进程都相互通信
  2. group: 进程的分组
  3. rank: 秩,指的是一个通信器中每个进程都有自己单独的编号,有时候也叫做 任务 ID ,秩是从 0 开始连续的整数
  4. 错误处理: 尽管大多数函数都返回一个错误码,但是 MPI 标准指出,当错误发生时程序应立刻终止,所以并不需要自己捕获和处理所有错误码

MPI 版本

MPI-1 (1994) :最早的版本,构建起了 MPI 的基本框架,包括

  1. 基本的环境管理
  2. 点到点通信
  3. 集体通信
  4. 组和通信器
  5. 虚拟拓扑

MPI-2 (1998) : 为 MPI-1 添加大量新函数

  1. 动态进程:可以在任务启动后创建新的进程
  2. 单边通信:提供单向通信功能,包括共享内存和远程累加操作
  3. 扩展集体通信:允许通信器间的集体通信
  4. 扩展接口:允许在 MPI 之上构造开发曾,包括 debugger 和 profiler 等
  5. 额外语言绑定:支持 fortran 90
  6. 并行 I/O

MPI-3 (2012) :对 MPI-1 和 MPI-2 更多的扩展

  1. 非阻塞集体操作
  2. 新的单边通信操作
  3. 近邻集体:扩展更多虚拟拓扑结构
  4. 支持 fortran 2008
  5. MPIT Tool Interface 允许 MPI 实现开放一些内部变量给用户,用于性能提升
  6. Matched Probe: 改进多线程中的 probe 功能

MPI-4 (2021) : TODO

  1. 增加大 count 参数支持
  2. 持久集体
  3. 部分通信
  4. 新的初始化方式
  5. 改进信息和错误处理

环境管理函数

  1. MPI_Init (&argc,&argv)
    1. 初始化 MPI 环境,应在所有 MPI 函数之前调用,并且只能调用一次,这个函数具体做的事情是实现依赖的
  2. MPI_Comm_size (comm,&size)
    1. 获得通信器 comm 中 MPI 进程的总数
    2. 如果通信器是 MPI_COMM_WORLD 那么就会得到程序的总进程数
  3. MPI_Comm_rank (comm,&rank)
    1. 获得调用这个函数的进程在通信器 comm 中的秩,
  4. MPI_Abort (comm,errorcode)
    1. 终止通信器 comm 中的所有进程
  5. MPI_Get_processor_name (&name,&resultlength)
    1. 获得进程的名字和名字的长度
    2. 名字的缓冲区大小至少是 MPI_MAX_PROCESSOR_NAMEsizeof(char)
    3. 具体返回的名字是什么是实现依赖的
  6. MPI_Get_version (&version,&subversion)
    1. 获得 MPI 标准的版本号和子版本号
  7. MPI_Initialized (&flag)
    1. 测试是否已经调用过 MPI_Init()
  8. MPI_Wtime ()
    1. 返回该进程所处的时间,以秒为单位
    2. 不同进程的时间可能不一样,但大多数实现会同步这个时间
  9. MPI_Wtick ()
    1. 返回 MPI_Wtime() 的精度
  10. MPI_Finalize ()
    1. 终止 MPI 环境,每个 MPI 程序只能调用一次
   #include "mpi.h"

   int main(int argc, char *argv[])
   {
       MPI_Init(&argc, &argv);

       int num_tasks;
       MPI_Comm_size(MPI_COMM_WORLD, &num_tasks);

       int rank;
       MPI_Comm_rank(MPI_COMM_WORLD, &rank);

       char hostname[MPI_MAX_PROCESSOR_NAME];
       int name_len;
       MPI_Get_processor_name(hostname, &name_len);

       MPI_Finalize();
   }

点到点通信

一般参数说明

类型 举例的函数
Blocking sends MPISend(buffer,count,type,dest,tag,comm)
Non-blocking sends MPIIsend(buffer,count,type,dest,tag,comm,request)
Blocking receive MPIRecv(buffer,count,type,source,tag,comm,status)
Non-blocking receive MPIIrecv(buffer,count,type,source,tag,comm,request)

参数说明:

  1. buffer: 要被收发的内容,内存地址,也就是指针
  2. count: 要收发的数据大小
  3. type: 数据类型,全大写的一个量 MPI_CHAR 等,也可以自定义
  4. destination: 发送的目标进程的 rank
  5. source: 接受的源进程的 rank,
    1. 如果是 MPI_ANY_SOURCE 那么就从所有进程接收
  6. tag: 一个任意的非负整数参数,用来标记不同的信息,接收和发送两方的 tag 要相同,
    1. MPI_ANY_TAG 会接收所有 tag
    2. MPI 标准指出 tag 应该在 0-32767, 不过大多数实现都支持更大的数
  7. comm: 通信器
  8. status: 是收到的消息的属性,类型是 MPI_Status
    1. MPI_Get_count() 可以获取收到的消息的大小
  9. request: 非阻塞收发的回调,类型是 MPI_Request

常用阻塞点到点通信函数

  1. MPI_Send() 基本的发送消息, 只有当发送信道再次可用时才返回
  2. MPI_Recv() 基本的接收消息,阻塞直到接收信道再次可用
  3. MPI_Ssend() 同步阻塞发送,阻塞直到发送信道可用并且接收方开始接收这个消息
  4. MPI_Sendrecv() 发送消息并发送回执,阻塞直到发送信道可用,并且接收方已经收到消息
  5. MPI_Wait()
  6. MPI_Waitany()
  7. MPI_Waitall()
  8. MPI_Waitsome()
    1. 阻塞直到某个非阻塞的收发完成
  9. MPI_Probe() 进行阻塞测试
  10. MPI_Get_count() 返回收到的消息的源、标签和大小
    1. 输出类型是 MPI_SOURCE

常用非阻塞点到点通信函数

  1. MPI_Isend() 非阻塞发送,所指定的发送 buffer 应该在 MPI_Probe() 确定已经发送完成之前不修改
  2. MPI_Irecv() 非阻塞接收,同样不应该在确定接收完成前修改 buffer
  3. MPI_Issend() 非阻塞同步发送,类似 MPI_Isend() 只是 MPI_Probe() 会确定接收方已经收到消息
  4. MPI_Test()
  5. MPI_Testany()
  6. MPI_Testall()
  7. MPI_Testsome()
    1. 检查非阻塞收发操作的状态,输出值 flag0 表示没完成, 1 表示已完成
  8. MPI_Iprobe() 进行非阻塞测试, 如果消息已经到了 flag1

集体通信

nil

集体通信操作的类型有

  • 同步 阻塞直到所有进程都到达同步点
  • 数据移动 broadcast, scatter, gather, all to all
  • 集体计算(归约) min, max, add, multiply 等

常用集体通信函数

  1. MPI_Barrier (comm) 同步所有进程
  2. MPI_Bcast (&buffer,count,datatype,root,comm) 将数据从 root 进程广播到所有进程
  3. MPI_Scatter (&sendbuf,sendcnt,sendtype,&recvbuf,recvcnt,recvtype,root,comm) 将数据分散到所有进程
  4. MPI_Gather (&sendbuf,sendcnt,sendtype,&recvbuf,recvcount,recvtype,root,comm) 从所有进程获取数据到 root
  5. MPI_Allgather (&sendbuf,sendcount,sendtype,&recvbuf,recvcount,recvtype,comm) 所有进程都获得总的数据
  6. MPI_Reduce (&sendbuf,&recvbuf,count,datatype,op,root,comm) 执行一个归约操作 op 到进程 root
  7. MPI_Allreduce (&sendbuf,&recvbuf,count,datatype,op,comm) 执行归约操作到所有进程
  8. MPI_Reduce_scatter (&sendbuf,&recvbuf,recvcount,datatype,op,comm) 先 reduce 再 scatter
  9. MPI_Alltoall (&sendbuf,sendcount,sendtype,&recvbuf,recvcnt,recvtype,comm) 每个进程都执行 scatter
  10. MPI_Scan (&sendbuf,&recvbuf,count,datatype,op,comm) 扫描操作

自定义数据类型

用户定义的数据类型,称作 derived data types

  1. MPI_Type_contiguous (count,oldtype,&newtype) 创建一个新的数据类型,是 count 个旧类型数据的组合
  2. MPI_Type_vector (count,blocklength,stride,oldtype,&newtype)MPI_Type_contiguous 相同,只不过每个数据间可以有间隔
  3. MPI_Type_indexed (count,blocklens[],offsets[],old_type,&newtype)
  4. MPI_Type_create_struct
  5. MPI_Type_get_extent
  6. MPI_Type_commit (&datatype) 向系统提交类型
  7. MPI_Type_free (&datatype) 释放指定类型的对象

自定义结构体

MPI_Type_create_struct(count, array_of_blocklengths, array_of_displacements, array_of_types, newtype)

  1. count 结构体中的元素数,也是下面三个数组的长度
  2. array_of_blocklengths 每个块中的元素个数
  3. array_of_displacements 每个块的偏移量
  4. array_of_types 每个块的类型
  5. newtype 输出
    struct object {
        char c;
        double x[2];
        int i;
    };

    MPI_Datatype newtype;
    int object_len = 3;
    int blocklengths[3];
    MPI_Datatype types[3];
    MPI_Aint displacements[3];

    MPI_Aint current_displacement = 0;

    blocklength[0] = 1;
    types[0] = MPI_CHAR;
    displacements[0] = offsetof(struct object, c);

    blocklength[1] = 2;
    types[1] = MPI_DOUBLE;
    displacements[1] = offsetof(struct object, x);

    blocklength[2] = 1;
    types[2] = MPI_INT;
    displacements[2] = offsetof(struct object, i);

    MPI_Type_create_struct(object_len, blocklengths, displacements, types, &newtype);
    MPI_Type_commit(&newtype);

    struct object o;

    if (proc_id == 0){
        MPI_Send(&o, 1, newtype, 1, 0, comm);
    }else if (proc_id == 1){
        MPI_Recv(&o, 1, newtype, 0, 0, comm, MPI_STATUS_IGNORE);
    }

    MPI_Type_free(&newtype);

组和通信器管理

group 组是一些进程的有序集合,每个进程编号从 0 到 N-1, communicator 通信器是一组需要相互通信的进程,每个组都有一个通信器。从编程的角度看,组是用来指定哪些进程相互通信,进而创建通信器的。

典型的过程:

  1. 使用 MPI_Comm_group()MPI_COMM_WORLD 中取出全局组的句柄
  2. 使用 MPI_Group_incl() 从全局组的子集中创建新的组
  3. 使用 MPI_Comm_create() 为新的组创建通信器
  4. 使用 MPI_Comm_rank() 确定新的通信器中的 rank
  5. 进行通信
  6. 使用 MPI_Comm_free()MPI_Group_free() 释放不用的组和通信器对象

基本通信器

  • MPI_COMM_WORLD 全部的进程
  • MPI_COMM_SELF 只包含当前进程
  • MPI_COMM_NULL 无效通信器,用于一些函数的错误码

通信器的类型是 MPI_Comm

复制通信器

复制一个一样的通信器,主要用于库函数编写时防止修改全局参数

  1. MPI_Comm_dup() 复制一个通信器
  2. MPI_Comm_idup() 复制通信器,非阻塞
  3. MPI_Comm_dup_with_info() 复制通信器并传递 info
  4. MPI_Comm_idup_with_info() 复制通信器并传递 info, 非阻塞

划分通信器

使用 MPI_Comm_split(comm, color, key, newcomm)comm 中有相同 color 的进程组成一个新的通信器 newcomm, 新通信器中的 rank 由 key 指定

注意

  1. 代码会在所有进程中执行,所以只要进程中生成自己的 colorkey ,返回的通信器就包括这些进程
  2. 一般新通信器中的 rank 跟全局的保存一致就行了, MPI_Comm_rank(MPI_COMM_WORLD, &my_key)
  3. MPI_Comm_spawn()
  4. MPI_Comm_free()

从组创建通信器

组的类型是 MPI_Group

  1. MPI_Comm_group()
  2. MPI_Group_incl(group, n, ranks, newgroup) 将组中 ranks 的进程添加到新组
  3. MPI_Group_excl(group, n, ranks, newgroup) 除了 ranks 以外的进程添加到新组
  4. MPI_Group_difference()
  5. MPI_Group_union()
  6. MPI_Group_intersection()
  7. MPI_Group_difference()
  8. MPI_Group_size()
  9. MPI_Group_rank()
  10. MPI_Comm_create_group()
  11. MPI_Comm_create(comm, group, newcomm) 从组创建通信器

通信器间的通信器

MPI_Intercomm_create(local_comm, local_leader, peer_comm, remote_leader, tag, newintercomm)

虚拟拓扑

MPI 的术语 virtual topologies 指的是一些进程相互连接构成一个几何形状,这个网络拓扑是虚拟的,与物理实际的连接无关。

当特殊的通信模式与一个拓扑结果相匹配时 MPI 虚拟拓扑就很有用了。

单边通信

基本概念

  • Remote Memory Access(RMA), Remote Direct Momory Access (RDMA), 远程内存访问
    • 指的是两个进程 origin 和 target, origin 发起动作 put/get, 访问 target 的内存
    • 之所以叫单边通信,是因为 target 进程完全不知道发生了什么
  • window
    • 单边通信只能访问 target 进程所指定的一块内存,称为 window
  • distributed shared memory, virtual shared memory
    • 是除了 window 之外的另一种实现远程内存访问的方法
    • 只有所谓的 Partitioned Global Address Space(PGAS)语言支持,例如 Unified Parallel C (UPC)
  • active RMA 和 passive RMA
    • active RMA 也叫做 active target synchronization, target 进程设置一个时间周期(epoch),在这其中 window 可以被访问,类似于同步数据转移
    • passive RMA 也叫做 passive target synchronization, target 进程不对 window 的访问做限制,这种方式很高效,但是非常难 debug 并且很容易死锁

窗口

用于单边通信的内存区域就是窗口,变量类型为 MPI_Win, 进程可以向其中存取任何东西

说明

  1. 窗口是定义在通信器上的,创建窗口操作是集体通信,即窗口的不同部分被不同的进程所拥有
  2. 每个进程的窗口大小独立设置,可以设为 0
  3. 窗口的内存分配和释放要显式进行

常用函数

  1. MPI_Win_allocate() 分配窗口的内存
  2. MPI_Win_free() 释放内存
  3. MPI_Win_create() 基于指定的 buffer 创建窗口
  4. MPI_Win_allocate_shared() 在共享内存的通信器上创建窗口
  5. MPI_Win_create_dynamic() 创建窗口,但不分配内存
    MPI_Info info;
    MPI_Win window;
    MPI_Win_allocate(size, disp_unit, info, comm, &memory, &window);

    /* 进行操作 */

    MPI_Win_free(&window);

动态分配内存

  1. MPI_Win_create_dynamic(MPI_Info info, MPI_Comm comm, MPI_Win *win)
    1. 创建窗口,可以向其中动态的加入内存
  2. int MPI_Win_free(MPI_Win *win)
    1. 释放窗口的内存
  3. int MPI_Alloc_mem(MPI_Aint size, MPI_Info info, void *baseptr)
    1. 分配内存
    2. size 是内存大小
    3. baseptr 是输出的指针
    4. info 可以是 MPI_INFO_NULL
  4. int MPI_Free_mem(void *base)
    1. 释放分配的内存
  5. MPI_Win_attach(MPI_Win win, void *base, MPI_Aint size)
    1. 向窗口中添加内存
    2. win 必须是 MPI_Win_create_dynamic() 创建的窗口
    3. 窗口中的内存就是这里指针的内存,没有复制
  6. MPI_Win_detach(MPI_Win win, void *base)
    1. 从窗口中去掉添加的内存
  7. int MPI_Win_lock(int lock_type, int rank, int assert, MPI_Win win)
    1. 对窗口整体加锁
    2. lock_typeMPI_LOCK_EXCLUSIVEMPI_LOCK_SHARED
    3. rank 是获得锁的进程
    4. assert 是用于做优化的参数,不需要就设 assert=0
  8. int MPI_Win_unlock(int rank, MPI_Win win)
    1. 解锁
  9. int MPI_Get_address(const void *location, MPI_Aint *address)
    1. 获得调用内存中位置的地址
    2. 应该用这个函数获得的 MPI_Alloc_mem() 分配的地址作为值广播给其它需要用到这个值的进程
    3. 对于在窗口中的指针,这里获得的地址就是偏移量
    4. 可以把偏移量看做是窗口中的指针
  10. MPI_Put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
    1. origin_addr 复制内存到 target_rank 进程的从 win 窗口的偏移 target_disp 开始的地址
  11. MPI_Get(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
    1. target_rank 进程的 win 窗口偏移 target_disp 复制内存到 origin_addr
  12. int MPI_Compare_and_swap(const void *origin_addr, const void *compare_addr, void *result_addr, MPI_Datatype datatype, int target_rank, MPI_Aint target_disp, MPI_Win win)
    1. 比较并交换,原子操作
    2. 比较当前内存中的旧值 *result_addr 和之前读取到的旧值 *compare_addr ,如果一样,则说明中间内存未被修改过,那么就用新值 *origin_addr 替换旧值 *result_addr
    3. 被修改的值属于 target_rank 进程
    4. target_disp 是从窗口的起点到 target_rank 进程中被替换的内存 result_addr 的起点之间的偏移量
  13. int MPI_Win_fence(int assert, MPI_Win win)
    1. 同步窗口在所有进程中

参考:

  1. 这里实现了一个链表 https://svn.mcs.anl.gov/repos/mpi/mpich2/trunk/test/mpi/rma/linked_list.c
  2. 指针的内存要在进程中自己释放,窗口中实际没有保存数据

例子

    /* 创建窗口 */
    MPI_Win win;
    MPI_Win_create_dynamic(MPI_INFO_NULL, MPI_COMM_WORLD, &win);

    /* 分配内存,并把它附加到窗口中,同时获得窗口中对应的偏移量,也就是窗口中的指针 */
    int* i_ptr;
    MPI_Alloc_mem(sizeof(int), MPI_INFO_NULL, &i_ptr);
    MPI_Win_attach(win, i_ptr, sizeof(int));
    MPI_Aint i_disp;
    MPI_Get_address(i_ptr, &i_disp);

    *i_ptr = proc_rank;
    printf("%d, i_ptr=%p, %d, i_disp=%ld\n",proc_rank, i_ptr, *i_ptr, i_disp);

    printf("i ptr dead\n");

    /* 读数据 */
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, proc_rank, 0, win);
    int get = 0;
    MPI_Get(&get, 1, MPI_INT, proc_rank, i_disp, 1, MPI_INT, win);
    printf("%d: get= %d\n", proc_rank, get);
    MPI_Win_unlock(proc_rank, win);

    /* 写数据 */
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, proc_rank, 0, win);
    int put = proc_rank+10;
    MPI_Put(&put, 1, MPI_INT, proc_rank, i_disp, 1, MPI_INT, win);
    MPI_Win_unlock(proc_rank, win);

    /* 同步 */
    MPI_Win_fence(0, win);

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, proc_rank, 0, win);
    get = 0;
    MPI_Get(&get, 1, MPI_INT, proc_rank, i_disp, 1, MPI_INT, win);
    printf("%d: get again= %d\n", proc_rank, get);
    MPI_Win_unlock(proc_rank, win);

    /* 释放内存时,要用本地指针释放 */
    MPI_Free_mem(i_ptr);
    MPI_Win_free(&win);

共享内存

  1. int MPI_Comm_split_type(MPI_Comm comm, int split_type, int key, MPI_Info info, MPI_Comm *newcomm)
    1. 获得同一节点上的进程的通信器
    2. split_type = MPI_COMM_TYPE_SHARED
    3. comm = MPI_COMM_WORLD
    4. key = proc_rank_world
  2. int MPI_Win_allocate_shared (MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, void *baseptr, MPI_Win *win)
    1. 创建共享内存数据
    2. size 是数据的比特大小,即 个数 * sizeof(type)
    3. disp_unit 是单个数据的比特大小,即 sizeof(type)
    4. baseptr 是本地数据的指针
    5. 这个内存会在 MPI_Win_free() 时释放掉
  3. int MPI_Win_shared_query (MPI_Win win, int rank, MPI_Aint *size, int *disp_unit, void *baseptr)
    1. 获得属于 rank 进程的内存在当前进程中的地址
    2. size win 大小
    3. disp_unit 单位数据大小
    4. baseptr 当前进程中用来访问这个内存的指针
    int proc_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &proc_rank);

    /* 获得同一节点上的通信器,以及对应的 rank */
    MPI_Comm comm_node;
    int proc_rank_node;
    MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, proc_rank, MPI_INFO_NULL, &comm_node);
    MPI_Comm_rank(comm_node, &proc_rank_node);

    /* 获得不同节点上编号为 0 的进程之间的通信器 */
    MPI_Comm comm_0;
    int proc_rank_0;
    MPI_Comm_split(MPI_COMM_WORLD, proc_rank_node, proc_rank, &comm_0);
    MPI_Comm_rank(comm_0, proc_rank_0);

    /* 分配内存, 只有编号为 0 的进程才分配,其它进程的大小是 0 */
    int* data;
    MPI_Win data_win;
    int win_size;
    if (proc_rank == 0){
        win_size = 1;
    }else {
        win_size = 0;
    }
    MPI_Win_allocate_shared(win_size, sizeof(int), MPI_INFO_NULL, comm_node, &data, &data_win);

    /* 在全局编号 0 的进程上创建数据 */
    if (proc_rank == 0){
        *data = 1;
    }

    MPI_Barrier(comm_node);
    MPI_Win_fence(0, data_win);

    /* 分享数据给所有节点上编号 0 的进程 */
    MPI_Bcast(&data, 1, MPI_INT, 0, comm_0);

    /* 其它进程获得节点编号 0 进程上的指针 */
    MPI_Aint size;
    int disp_uint;
    MPI_Win_shared_query(data_win, 0, &size, &disp_unit, &data_0);

    /* 释放内存 */
    MPI_Win_free(&data_win);

混合多线程

混合多线程的策略:

  1. 纯 MPI
  2. 每个节点一个 MPI 进程,全部的线程
  3. 每个 socket 一个 MPI 进程,和剩下的线程,比如20核的节点,每个节点两个进程,每个进程10线程

大量实践认为混合多线程并不会比纯 MPI 速度快。

线程初始化

使用 MPI_Init_thread() 初始化带有线程的 MPI,可用的选项有(注意并不是所有MPI实现都提供下面全部的模式)

  1. MPI_THREAD_SINGLE 每个进程一个线程
  2. MPI_THREAD_FUNNELED 每个进程有多个线程,但只有主线程可以调用 MPI 函数
  3. MPI_THREAD_SERIALIZED 使用多个线程,每个线程都可以调用 MPI,但是只有一个线程可以调用同步通信
  4. MPI_THREAD_MULTIPLE 使用多个线程,没有任何限制

使用 MPI_Query_thread() 来确定初始化之后提供的是哪种模式

使用 MPI_Is_thread_main() 来确定线程是否是主线程

设计模式

为每个节点创建一个通信器

用于创建节点内部的共享内存窗口

   /* 获得同一节点上的通信器,以及对应的 rank */
   MPI_Comm comm_node;
   int proc_rank_node;
   MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, proc_rank, MPI_INFO_NULL, &comm_node);
   MPI_Comm_rank(comm_node, &proc_rank_node);

   /* 获得不同节点上编号为 0 的进程之间的通信器 */
   MPI_Comm comm_0;
   int proc_rank_0;
   MPI_Comm_split(MPI_COMM_WORLD, proc_rank_node, proc_rank, &comm_0);
   MPI_Comm_rank(comm_0, &proc_rank_0);

   MPI_Comm_free(&comm_node);
   MPI_Comm_free(&comm_0);

非自旋等待

  1. 阻塞通信是自旋锁,会一直燃烧 CPU, 为了给某个进程上的多线程任务让出 CPU,需要让其它进程非自旋等待
  2. 使用 MPI_Iprobe() 非阻塞地探测信号,并在探测周期中 sleep()
   int proc_rank;
   MPI_Comm_rank(MPI_COMM_WORLD, &proc_rank);

   int finish = false;
   MPI_Request finish_request;

   if (proc_rank == 0) {

       /* do sth pthread or openmp */

       finish = true;
       int proc_total = 0;
       MPI_Comm_size(MPI_COMM_WORLD, &proc_total);
       for(int i=1; i< proc_total; ++i){
           MPI_Isend(&finish, 1, MPI_INT, i, 0, MPI_COMM_WORLD, &finish_request);
           MPI_Wait(&finish_request, MPI_STATUS_IGNORE);
       }

   }else{
       int flag = false;
       MPI_Status finish_status;

       while(!flag){
           MPI_Iprobe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &finish_status);
           if (flag){
               MPI_Irecv(&finish, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &finish_request);
               MPI_Wait(&finish_request, MPI_STATUS_IGNORE);
               break;
           }
           sleep(1);
       }
   }

模拟单线程

随便写一个变量,按照顺序一个接一个的发送下去

   from mpi4py import MPI

   comm = MPI.COMM_WORLD
   proc_rank = comm.Get_rank()
   proc_num = comm.Get_size()

   myturn = 0

   if proc_rank == 0:
       myturn = 1
   else:
       myturn = comm.recv(source=proc_rank-1, tag=proc_rank-1)

   print(f"{proc_rank=}, {myturn=}")

   if proc_rank < proc_num - 1:
       comm.send(myturn, dest=proc_rank+1, tag=proc_rank)

评论

Comments powered by Disqus