MPI
MPI: Message Passing Interface
MPI 是消息传递接口标准,主要的实现有 Intel MPI, OpenMPI 和 MPICH 等. MPI 标准目前有三版 MPI-1,MPI-2, MPI-3, 标准支持的语言是 c 和 fortran, c++ 支持在 MPI-3 中移除了。
MPI 的优势:
- 标准:所有超级计算机都支持
- 可移植:无需修改程序就能在所有支持 MPI 的平台上运行
- 高性能:所有实现都会根据机器做优化
- 功能齐全:MPI-3 中有超过 430 个函数,尽管大多数程序可能就用几十个函数
- 可用:相当多的实现都可用,包括供应商提供的和开放获取的
基本结构
程序结构
所有 MPI 程序都分以下几步
- 引用头文件
"mpi.h"
- 执行顺序代码
- 初始化 MPI 环境
- 执行并行任务
- 终止 MPI 环境
- 执行顺序代码
- 程序结束
接口约定
所有 MPI 的函数都以 MPI_
开头,并且下划线后的第一个字母大写,函数返回消息码 rc
, 如果成功就是 MPI_SUCCESS
, 即 0
基本概念
-
communicator: 通信器,定义哪些进程之间需要通信
-
MPI_COMM_WORLD
通信器表示所有进程都相互通信
-
- group: 进程的分组
- rank: 秩,指的是一个通信器中每个进程都有自己单独的编号,有时候也叫做 任务 ID ,秩是从 0 开始连续的整数
- 错误处理: 尽管大多数函数都返回一个错误码,但是 MPI 标准指出,当错误发生时程序应立刻终止,所以并不需要自己捕获和处理所有错误码
MPI 版本
MPI-1 (1994) :最早的版本,构建起了 MPI 的基本框架,包括
- 基本的环境管理
- 点到点通信
- 集体通信
- 组和通信器
- 虚拟拓扑
MPI-2 (1998) : 为 MPI-1 添加大量新函数
- 动态进程:可以在任务启动后创建新的进程
- 单边通信:提供单向通信功能,包括共享内存和远程累加操作
- 扩展集体通信:允许通信器间的集体通信
- 扩展接口:允许在 MPI 之上构造开发曾,包括 debugger 和 profiler 等
- 额外语言绑定:支持 fortran 90
- 并行 I/O
MPI-3 (2012) :对 MPI-1 和 MPI-2 更多的扩展
- 非阻塞集体操作
- 新的单边通信操作
- 近邻集体:扩展更多虚拟拓扑结构
- 支持 fortran 2008
- MPIT Tool Interface 允许 MPI 实现开放一些内部变量给用户,用于性能提升
- Matched Probe: 改进多线程中的 probe 功能
MPI-4 (2021) : TODO
- 增加大 count 参数支持
- 持久集体
- 部分通信
- 新的初始化方式
- 改进信息和错误处理
环境管理函数
-
MPI_Init (&argc,&argv)
- 初始化 MPI 环境,应在所有 MPI 函数之前调用,并且只能调用一次,这个函数具体做的事情是实现依赖的
-
MPI_Comm_size (comm,&size)
- 获得通信器
comm
中 MPI 进程的总数 - 如果通信器是
MPI_COMM_WORLD
那么就会得到程序的总进程数
- 获得通信器
-
MPI_Comm_rank (comm,&rank)
- 获得调用这个函数的进程在通信器
comm
中的秩,
- 获得调用这个函数的进程在通信器
-
MPI_Abort (comm,errorcode)
- 终止通信器
comm
中的所有进程
- 终止通信器
-
MPI_Get_processor_name (&name,&resultlength)
- 获得进程的名字和名字的长度
- 名字的缓冲区大小至少是
MPI_MAX_PROCESSOR_NAME
个sizeof(char)
- 具体返回的名字是什么是实现依赖的
-
MPI_Get_version (&version,&subversion)
- 获得 MPI 标准的版本号和子版本号
-
MPI_Initialized (&flag)
- 测试是否已经调用过
MPI_Init()
- 测试是否已经调用过
-
MPI_Wtime ()
- 返回该进程所处的时间,以秒为单位
- 不同进程的时间可能不一样,但大多数实现会同步这个时间
-
MPI_Wtick ()
- 返回
MPI_Wtime()
的精度
- 返回
-
MPI_Finalize ()
- 终止 MPI 环境,每个 MPI 程序只能调用一次
#include "mpi.h"
int main(int argc, char *argv[])
{
MPI_Init(&argc, &argv);
int num_tasks;
MPI_Comm_size(MPI_COMM_WORLD, &num_tasks);
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
char hostname[MPI_MAX_PROCESSOR_NAME];
int name_len;
MPI_Get_processor_name(hostname, &name_len);
MPI_Finalize();
}
点到点通信
一般参数说明
类型 | 举例的函数 |
---|---|
Blocking sends | MPISend(buffer,count,type,dest,tag,comm) |
Non-blocking sends | MPIIsend(buffer,count,type,dest,tag,comm,request) |
Blocking receive | MPIRecv(buffer,count,type,source,tag,comm,status) |
Non-blocking receive | MPIIrecv(buffer,count,type,source,tag,comm,request) |
参数说明:
- buffer: 要被收发的内容,内存地址,也就是指针
- count: 要收发的数据大小
-
type: 数据类型,全大写的一个量
MPI_CHAR
等,也可以自定义 - destination: 发送的目标进程的 rank
-
source: 接受的源进程的 rank,
- 如果是
MPI_ANY_SOURCE
那么就从所有进程接收
- 如果是
-
tag: 一个任意的非负整数参数,用来标记不同的信息,接收和发送两方的
tag
要相同,-
MPI_ANY_TAG
会接收所有 tag - MPI 标准指出 tag 应该在 0-32767, 不过大多数实现都支持更大的数
-
- comm: 通信器
-
status: 是收到的消息的属性,类型是
MPI_Status
-
MPI_Get_count()
可以获取收到的消息的大小
-
-
request: 非阻塞收发的回调,类型是
MPI_Request
常用阻塞点到点通信函数
-
MPI_Send()
基本的发送消息, 只有当发送信道再次可用时才返回 -
MPI_Recv()
基本的接收消息,阻塞直到接收信道再次可用 -
MPI_Ssend()
同步阻塞发送,阻塞直到发送信道可用并且接收方开始接收这个消息 -
MPI_Sendrecv()
发送消息并发送回执,阻塞直到发送信道可用,并且接收方已经收到消息 MPI_Wait()
MPI_Waitany()
MPI_Waitall()
-
MPI_Waitsome()
- 阻塞直到某个非阻塞的收发完成
-
MPI_Probe()
进行阻塞测试 -
MPI_Get_count()
返回收到的消息的源、标签和大小- 输出类型是
MPI_SOURCE
- 输出类型是
常用非阻塞点到点通信函数
-
MPI_Isend()
非阻塞发送,所指定的发送 buffer 应该在MPI_Probe()
确定已经发送完成之前不修改 -
MPI_Irecv()
非阻塞接收,同样不应该在确定接收完成前修改 buffer -
MPI_Issend()
非阻塞同步发送,类似MPI_Isend()
只是MPI_Probe()
会确定接收方已经收到消息 MPI_Test()
MPI_Testany()
MPI_Testall()
-
MPI_Testsome()
- 检查非阻塞收发操作的状态,输出值
flag
为0
表示没完成,1
表示已完成
- 检查非阻塞收发操作的状态,输出值
-
MPI_Iprobe()
进行非阻塞测试, 如果消息已经到了flag
为1
集体通信
集体通信操作的类型有
- 同步 阻塞直到所有进程都到达同步点
- 数据移动 broadcast, scatter, gather, all to all
- 集体计算(归约) min, max, add, multiply 等
常用集体通信函数
-
MPI_Barrier (comm)
同步所有进程 -
MPI_Bcast (&buffer,count,datatype,root,comm)
将数据从root
进程广播到所有进程 -
MPI_Scatter (&sendbuf,sendcnt,sendtype,&recvbuf,recvcnt,recvtype,root,comm)
将数据分散到所有进程 -
MPI_Gather (&sendbuf,sendcnt,sendtype,&recvbuf,recvcount,recvtype,root,comm)
从所有进程获取数据到root
-
MPI_Allgather (&sendbuf,sendcount,sendtype,&recvbuf,recvcount,recvtype,comm)
所有进程都获得总的数据 -
MPI_Reduce (&sendbuf,&recvbuf,count,datatype,op,root,comm)
执行一个归约操作op
到进程root
-
MPI_Allreduce (&sendbuf,&recvbuf,count,datatype,op,comm)
执行归约操作到所有进程 -
MPI_Reduce_scatter (&sendbuf,&recvbuf,recvcount,datatype,op,comm)
先 reduce 再 scatter -
MPI_Alltoall (&sendbuf,sendcount,sendtype,&recvbuf,recvcnt,recvtype,comm)
每个进程都执行 scatter -
MPI_Scan (&sendbuf,&recvbuf,count,datatype,op,comm)
扫描操作
自定义数据类型
用户定义的数据类型,称作 derived data types
-
MPI_Type_contiguous (count,oldtype,&newtype)
创建一个新的数据类型,是count
个旧类型数据的组合 -
MPI_Type_vector (count,blocklength,stride,oldtype,&newtype)
与MPI_Type_contiguous
相同,只不过每个数据间可以有间隔 MPI_Type_indexed (count,blocklens[],offsets[],old_type,&newtype)
MPI_Type_create_struct
MPI_Type_get_extent
-
MPI_Type_commit (&datatype)
向系统提交类型 -
MPI_Type_free (&datatype)
释放指定类型的对象
自定义结构体
MPI_Type_create_struct(count, array_of_blocklengths, array_of_displacements, array_of_types, newtype)
-
count
结构体中的元素数,也是下面三个数组的长度 -
array_of_blocklengths
每个块中的元素个数 -
array_of_displacements
每个块的偏移量 -
array_of_types
每个块的类型 -
newtype
输出
struct object {
char c;
double x[2];
int i;
};
MPI_Datatype newtype;
int object_len = 3;
int blocklengths[3];
MPI_Datatype types[3];
MPI_Aint displacements[3];
MPI_Aint current_displacement = 0;
blocklength[0] = 1;
types[0] = MPI_CHAR;
displacements[0] = offsetof(struct object, c);
blocklength[1] = 2;
types[1] = MPI_DOUBLE;
displacements[1] = offsetof(struct object, x);
blocklength[2] = 1;
types[2] = MPI_INT;
displacements[2] = offsetof(struct object, i);
MPI_Type_create_struct(object_len, blocklengths, displacements, types, &newtype);
MPI_Type_commit(&newtype);
struct object o;
if (proc_id == 0){
MPI_Send(&o, 1, newtype, 1, 0, comm);
}else if (proc_id == 1){
MPI_Recv(&o, 1, newtype, 0, 0, comm, MPI_STATUS_IGNORE);
}
MPI_Type_free(&newtype);
组和通信器管理
group 组是一些进程的有序集合,每个进程编号从 0 到 N-1, communicator 通信器是一组需要相互通信的进程,每个组都有一个通信器。从编程的角度看,组是用来指定哪些进程相互通信,进而创建通信器的。
典型的过程:
- 使用
MPI_Comm_group()
从MPI_COMM_WORLD
中取出全局组的句柄 - 使用
MPI_Group_incl()
从全局组的子集中创建新的组 - 使用
MPI_Comm_create()
为新的组创建通信器 - 使用
MPI_Comm_rank()
确定新的通信器中的 rank - 进行通信
- 使用
MPI_Comm_free()
和MPI_Group_free()
释放不用的组和通信器对象
基本通信器
-
MPI_COMM_WORLD
全部的进程 -
MPI_COMM_SELF
只包含当前进程 -
MPI_COMM_NULL
无效通信器,用于一些函数的错误码
通信器的类型是 MPI_Comm
复制通信器
复制一个一样的通信器,主要用于库函数编写时防止修改全局参数
-
MPI_Comm_dup()
复制一个通信器 -
MPI_Comm_idup()
复制通信器,非阻塞 -
MPI_Comm_dup_with_info()
复制通信器并传递 info -
MPI_Comm_idup_with_info()
复制通信器并传递 info, 非阻塞
划分通信器
使用 MPI_Comm_split(comm, color, key, newcomm)
将 comm
中有相同 color
的进程组成一个新的通信器 newcomm
, 新通信器中的 rank 由 key
指定
注意
- 代码会在所有进程中执行,所以只要进程中生成自己的
color
和key
,返回的通信器就包括这些进程 - 一般新通信器中的
rank
跟全局的保存一致就行了,MPI_Comm_rank(MPI_COMM_WORLD, &my_key)
MPI_Comm_spawn()
MPI_Comm_free()
从组创建通信器
组的类型是 MPI_Group
MPI_Comm_group()
-
MPI_Group_incl(group, n, ranks, newgroup)
将组中ranks
的进程添加到新组 -
MPI_Group_excl(group, n, ranks, newgroup)
除了ranks
以外的进程添加到新组 MPI_Group_difference()
MPI_Group_union()
MPI_Group_intersection()
MPI_Group_difference()
MPI_Group_size()
MPI_Group_rank()
MPI_Comm_create_group()
-
MPI_Comm_create(comm, group, newcomm)
从组创建通信器
通信器间的通信器
MPI_Intercomm_create(local_comm, local_leader, peer_comm, remote_leader, tag, newintercomm)
虚拟拓扑
MPI 的术语 virtual topologies 指的是一些进程相互连接构成一个几何形状,这个网络拓扑是虚拟的,与物理实际的连接无关。
当特殊的通信模式与一个拓扑结果相匹配时 MPI 虚拟拓扑就很有用了。
单边通信
基本概念
- Remote Memory Access(RMA), Remote Direct Momory Access (RDMA), 远程内存访问
- 指的是两个进程 origin 和 target, origin 发起动作 put/get, 访问 target 的内存
- 之所以叫单边通信,是因为 target 进程完全不知道发生了什么
- window
- 单边通信只能访问 target 进程所指定的一块内存,称为 window
- distributed shared memory, virtual shared memory
- 是除了 window 之外的另一种实现远程内存访问的方法
- 只有所谓的 Partitioned Global Address Space(PGAS)语言支持,例如 Unified Parallel C (UPC)
- active RMA 和 passive RMA
- active RMA 也叫做 active target synchronization, target 进程设置一个时间周期(epoch),在这其中 window 可以被访问,类似于同步数据转移
- passive RMA 也叫做 passive target synchronization, target 进程不对 window 的访问做限制,这种方式很高效,但是非常难 debug 并且很容易死锁
窗口
用于单边通信的内存区域就是窗口,变量类型为 MPI_Win
, 进程可以向其中存取任何东西
说明
- 窗口是定义在通信器上的,创建窗口操作是集体通信,即窗口的不同部分被不同的进程所拥有
- 每个进程的窗口大小独立设置,可以设为 0
- 窗口的内存分配和释放要显式进行
常用函数
-
MPI_Win_allocate()
分配窗口的内存 -
MPI_Win_free()
释放内存 -
MPI_Win_create()
基于指定的 buffer 创建窗口 -
MPI_Win_allocate_shared()
在共享内存的通信器上创建窗口 -
MPI_Win_create_dynamic()
创建窗口,但不分配内存
MPI_Info info;
MPI_Win window;
MPI_Win_allocate(size, disp_unit, info, comm, &memory, &window);
/* 进行操作 */
MPI_Win_free(&window);
动态分配内存
-
MPI_Win_create_dynamic(MPI_Info info, MPI_Comm comm, MPI_Win *win)
- 创建窗口,可以向其中动态的加入内存
-
int MPI_Win_free(MPI_Win *win)
- 释放窗口的内存
-
int MPI_Alloc_mem(MPI_Aint size, MPI_Info info, void *baseptr)
- 分配内存
-
size
是内存大小 -
baseptr
是输出的指针 -
info
可以是MPI_INFO_NULL
-
int MPI_Free_mem(void *base)
- 释放分配的内存
-
MPI_Win_attach(MPI_Win win, void *base, MPI_Aint size)
- 向窗口中添加内存
-
win
必须是MPI_Win_create_dynamic()
创建的窗口 - 窗口中的内存就是这里指针的内存,没有复制
-
MPI_Win_detach(MPI_Win win, void *base)
- 从窗口中去掉添加的内存
-
int MPI_Win_lock(int lock_type, int rank, int assert, MPI_Win win)
- 对窗口整体加锁
-
lock_type
是MPI_LOCK_EXCLUSIVE
或MPI_LOCK_SHARED
-
rank
是获得锁的进程 -
assert
是用于做优化的参数,不需要就设assert=0
-
int MPI_Win_unlock(int rank, MPI_Win win)
- 解锁
-
int MPI_Get_address(const void *location, MPI_Aint *address)
- 获得调用内存中位置的地址
- 应该用这个函数获得的
MPI_Alloc_mem()
分配的地址作为值广播给其它需要用到这个值的进程 - 对于在窗口中的指针,这里获得的地址就是偏移量
- 可以把偏移量看做是窗口中的指针
-
MPI_Put(const void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
- 从
origin_addr
复制内存到target_rank
进程的从win
窗口的偏移target_disp
开始的地址
- 从
-
MPI_Get(void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, MPI_Win win)
- 从
target_rank
进程的win
窗口偏移target_disp
复制内存到origin_addr
- 从
-
int MPI_Compare_and_swap(const void *origin_addr, const void *compare_addr, void *result_addr, MPI_Datatype datatype, int target_rank, MPI_Aint target_disp, MPI_Win win)
- 比较并交换,原子操作
- 比较当前内存中的旧值
*result_addr
和之前读取到的旧值*compare_addr
,如果一样,则说明中间内存未被修改过,那么就用新值*origin_addr
替换旧值*result_addr
- 被修改的值属于
target_rank
进程 -
target_disp
是从窗口的起点到target_rank
进程中被替换的内存result_addr
的起点之间的偏移量
-
int MPI_Win_fence(int assert, MPI_Win win)
- 同步窗口在所有进程中
参考:
- 这里实现了一个链表 https://svn.mcs.anl.gov/repos/mpi/mpich2/trunk/test/mpi/rma/linked_list.c
- 指针的内存要在进程中自己释放,窗口中实际没有保存数据
例子
/* 创建窗口 */
MPI_Win win;
MPI_Win_create_dynamic(MPI_INFO_NULL, MPI_COMM_WORLD, &win);
/* 分配内存,并把它附加到窗口中,同时获得窗口中对应的偏移量,也就是窗口中的指针 */
int* i_ptr;
MPI_Alloc_mem(sizeof(int), MPI_INFO_NULL, &i_ptr);
MPI_Win_attach(win, i_ptr, sizeof(int));
MPI_Aint i_disp;
MPI_Get_address(i_ptr, &i_disp);
*i_ptr = proc_rank;
printf("%d, i_ptr=%p, %d, i_disp=%ld\n",proc_rank, i_ptr, *i_ptr, i_disp);
printf("i ptr dead\n");
/* 读数据 */
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, proc_rank, 0, win);
int get = 0;
MPI_Get(&get, 1, MPI_INT, proc_rank, i_disp, 1, MPI_INT, win);
printf("%d: get= %d\n", proc_rank, get);
MPI_Win_unlock(proc_rank, win);
/* 写数据 */
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, proc_rank, 0, win);
int put = proc_rank+10;
MPI_Put(&put, 1, MPI_INT, proc_rank, i_disp, 1, MPI_INT, win);
MPI_Win_unlock(proc_rank, win);
/* 同步 */
MPI_Win_fence(0, win);
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, proc_rank, 0, win);
get = 0;
MPI_Get(&get, 1, MPI_INT, proc_rank, i_disp, 1, MPI_INT, win);
printf("%d: get again= %d\n", proc_rank, get);
MPI_Win_unlock(proc_rank, win);
/* 释放内存时,要用本地指针释放 */
MPI_Free_mem(i_ptr);
MPI_Win_free(&win);
共享内存
-
int MPI_Comm_split_type(MPI_Comm comm, int split_type, int key, MPI_Info info, MPI_Comm *newcomm)
- 获得同一节点上的进程的通信器
split_type = MPI_COMM_TYPE_SHARED
comm = MPI_COMM_WORLD
key = proc_rank_world
-
int MPI_Win_allocate_shared (MPI_Aint size, int disp_unit, MPI_Info info, MPI_Comm comm, void *baseptr, MPI_Win *win)
- 创建共享内存数据
-
size
是数据的比特大小,即 个数 * sizeof(type) -
disp_unit
是单个数据的比特大小,即 sizeof(type) -
baseptr
是本地数据的指针 - 这个内存会在
MPI_Win_free()
时释放掉
-
int MPI_Win_shared_query (MPI_Win win, int rank, MPI_Aint *size, int *disp_unit, void *baseptr)
- 获得属于
rank
进程的内存在当前进程中的地址 -
size
win 大小 -
disp_unit
单位数据大小 -
baseptr
当前进程中用来访问这个内存的指针
- 获得属于
int proc_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &proc_rank);
/* 获得同一节点上的通信器,以及对应的 rank */
MPI_Comm comm_node;
int proc_rank_node;
MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, proc_rank, MPI_INFO_NULL, &comm_node);
MPI_Comm_rank(comm_node, &proc_rank_node);
/* 获得不同节点上编号为 0 的进程之间的通信器 */
MPI_Comm comm_0;
int proc_rank_0;
MPI_Comm_split(MPI_COMM_WORLD, proc_rank_node, proc_rank, &comm_0);
MPI_Comm_rank(comm_0, proc_rank_0);
/* 分配内存, 只有编号为 0 的进程才分配,其它进程的大小是 0 */
int* data;
MPI_Win data_win;
int win_size;
if (proc_rank == 0){
win_size = 1;
}else {
win_size = 0;
}
MPI_Win_allocate_shared(win_size, sizeof(int), MPI_INFO_NULL, comm_node, &data, &data_win);
/* 在全局编号 0 的进程上创建数据 */
if (proc_rank == 0){
*data = 1;
}
MPI_Barrier(comm_node);
MPI_Win_fence(0, data_win);
/* 分享数据给所有节点上编号 0 的进程 */
MPI_Bcast(&data, 1, MPI_INT, 0, comm_0);
/* 其它进程获得节点编号 0 进程上的指针 */
MPI_Aint size;
int disp_uint;
MPI_Win_shared_query(data_win, 0, &size, &disp_unit, &data_0);
/* 释放内存 */
MPI_Win_free(&data_win);
混合多线程
混合多线程的策略:
- 纯 MPI
- 每个节点一个 MPI 进程,全部的线程
- 每个 socket 一个 MPI 进程,和剩下的线程,比如20核的节点,每个节点两个进程,每个进程10线程
大量实践认为混合多线程并不会比纯 MPI 速度快。
线程初始化
使用 MPI_Init_thread()
初始化带有线程的 MPI,可用的选项有(注意并不是所有MPI实现都提供下面全部的模式)
-
MPI_THREAD_SINGLE
每个进程一个线程 -
MPI_THREAD_FUNNELED
每个进程有多个线程,但只有主线程可以调用 MPI 函数 -
MPI_THREAD_SERIALIZED
使用多个线程,每个线程都可以调用 MPI,但是只有一个线程可以调用同步通信 -
MPI_THREAD_MULTIPLE
使用多个线程,没有任何限制
使用 MPI_Query_thread()
来确定初始化之后提供的是哪种模式
使用 MPI_Is_thread_main()
来确定线程是否是主线程
设计模式
为每个节点创建一个通信器
用于创建节点内部的共享内存窗口
/* 获得同一节点上的通信器,以及对应的 rank */
MPI_Comm comm_node;
int proc_rank_node;
MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, proc_rank, MPI_INFO_NULL, &comm_node);
MPI_Comm_rank(comm_node, &proc_rank_node);
/* 获得不同节点上编号为 0 的进程之间的通信器 */
MPI_Comm comm_0;
int proc_rank_0;
MPI_Comm_split(MPI_COMM_WORLD, proc_rank_node, proc_rank, &comm_0);
MPI_Comm_rank(comm_0, &proc_rank_0);
MPI_Comm_free(&comm_node);
MPI_Comm_free(&comm_0);
非自旋等待
- 阻塞通信是自旋锁,会一直燃烧 CPU, 为了给某个进程上的多线程任务让出 CPU,需要让其它进程非自旋等待
- 使用
MPI_Iprobe()
非阻塞地探测信号,并在探测周期中sleep()
int proc_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &proc_rank);
int finish = false;
MPI_Request finish_request;
if (proc_rank == 0) {
/* do sth pthread or openmp */
finish = true;
int proc_total = 0;
MPI_Comm_size(MPI_COMM_WORLD, &proc_total);
for(int i=1; i< proc_total; ++i){
MPI_Isend(&finish, 1, MPI_INT, i, 0, MPI_COMM_WORLD, &finish_request);
MPI_Wait(&finish_request, MPI_STATUS_IGNORE);
}
}else{
int flag = false;
MPI_Status finish_status;
while(!flag){
MPI_Iprobe(0, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &finish_status);
if (flag){
MPI_Irecv(&finish, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &finish_request);
MPI_Wait(&finish_request, MPI_STATUS_IGNORE);
break;
}
sleep(1);
}
}
模拟单线程
随便写一个变量,按照顺序一个接一个的发送下去
from mpi4py import MPI
comm = MPI.COMM_WORLD
proc_rank = comm.Get_rank()
proc_num = comm.Get_size()
myturn = 0
if proc_rank == 0:
myturn = 1
else:
myturn = comm.recv(source=proc_rank-1, tag=proc_rank-1)
print(f"{proc_rank=}, {myturn=}")
if proc_rank < proc_num - 1:
comm.send(myturn, dest=proc_rank+1, tag=proc_rank)
评论
Comments powered by Disqus