POSIX 多进程编程
Table of Contents
并行计算方案包括多线程并行、多进程并行、异构并行和分布式并行。 这篇文章主要讲如何利用 POSIX 标准库(C语言)进行多进程并行计算,包括如何启动和终止子进程和进程间通信的两种方法:管道(pipe)与信号量(semaphore)。 至于其它的进程间通信方式,例如:共享内存(mmap)、网络(sockets)、信号(signal)和消息队列等,都是适用性更广也更复杂的技术,在这篇短文中无法涉及。
启动和终止子进程
使用 fork
创建子进程,使用 wait
或 waitpid
等待运行结束。
fork
创建子进程
#include <unistd.h>
pid_t fork(void);
返回值
- 执行成功:在父进程中返回子进程的
pid_t
, 在子进程中返回 0; - 执行失败:返回-1,并设置
errno
- 在 GNU C 中
pid_t
就是int
说明
-
fork
启动的子进程会获得与此时主进程完全一样的内存上下文,这意味着主进程此时可以访问的一切内容在子进程中都可用,甚至连指针地址都一样。 如果连续启动了两个子进程,在两次fork
中有任何变量在主进程被修改了,前面的子进程看到的还是原来的值,后面的子进程看到的是新的值。 同时由于进程之间内存是隔离的,子进程修改任何内存都不会影响到其它子进程。fork
对内存使用 copy-on-write 机制,没有修改的内存就访问同样的内容,但如果修改就会复制所有内存。
wait
等待子进程退出
#include <sys/wait.h>
pid_t wait(int *stat_loc);
参数
-
int *stat_loc
捕获子进程的退出值,设为NULL
则忽略退出值。具体的返回参数参看 man 2 wait
返回值
- 返回捕获到的子进程ID,失败返回-1,并设置
errno
waitpid
等待特定的子进程退出
#include <sys/wait.h>
pid_t waitpid(pid_t pid, int *stat_loc, int options);
参数
-
pid_t pid
等待特定的子进程 -
int *stat_loc
捕获子进程的退出值,设为NULL
则忽略退出值, 与wait
的参数相同 -
int options
等待的行为可以通过一系列选项来设置,具体的返回参数参看 man 2 waitpid,设成0
则与wait
行为相同。
返回值
- 返回捕获到的子进程ID,失败返回-1
例子:创建一系列子进程并等待
在这个例子中创建了 10 个子进程,按照创建顺序给它们标号,让这些进程按照创建顺序的逆序打印自己的序号,主进程则会等待到所有子进程退出再结束。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>
int main()
{
int n_process = 10;
pid_t *pids = calloc(n_process, sizeof(pid_t));
for (int i = 0; i < n_process; i++) {
pids[i] = fork();
if (pids[i] == 0) {
/* in child process */
sleep(n_process - i);
printf("child %d\n", i);
printf("child %d, pid_t: %d\n", i, pids[i]);
/* pointer address is the same */
printf("child %d, pids ptr: %p\n", i, pids);
/* but value is different */
printf("child %d, all pids: ", i);
for (int i = 0; i < n_process; i++) {
printf("%d ", pids[i]);
}
printf("\n");
exit(0);
}
}
/* in parent process */
printf("all pids: \n");
for (int i = 0; i < n_process; i++) {
printf("%d ", pids[i]);
}
printf("\n");
/* get parent pid */
printf("pid: %d\n", getpid());
/* wait subprocess to finish */
for (int i = 0; i < n_process; i++) {
/* wait(NULL); */
waitpid(pids[i], NULL, 0);
}
free(pids);
return 0;
}
管道 Pipe
POSIX 提供了单向管道,可以从一个进程向另一个进程单向地传输信息。
pipe
创建管道
#include <unistd.h>
int pipe(int fildes[2]);
参数
-
int fildes[2]
创建一个管道的两端,值是整数文件标识,fildes[0]
是管道的出口,fildes[1]
是管道的入口 - 发送信息的进程应该先把管道出口关闭,而接收信息的进程应该先把管道入口关闭
- 如果管道的两端在同一个进程里都关闭了,再向这个管道发送信息会产生
SIGPIPE
信号
返回值
- 成功创建管道返回
0
, 失败返回-1
并设置errno
例子:pingpong
两个进程之间互相发送数据,一个数据在两个管道之间来回传递,类似打乒乓球,这是 MPI 中最简单的例子
#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#define TOTAL_NUMBER 10
void child(int pipe_in, int pipe_out) {
pid_t pid = getpid();
printf("start pid: %d\n", pid);
int recv_count = 0;
int send_count = 0;
char buf[100] = {0};
int number = 0;
while (recv_count < TOTAL_NUMBER || send_count < TOTAL_NUMBER) {
/* clear buf */
for (int i = 0; i < 100; i++) {
buf[i] = 0;
}
/* read from pipe_in into buf */
read(pipe_in, buf, 100);
recv_count += 1;
number = atoi(buf);
printf("pid=%d: recv %d\n", pid, number);
/* write to pipe_out */
number += 1;
sprintf(buf, "%d", number);
write(pipe_out, buf, 100);
printf("pid=%d: send %d\n", pid, number);
send_count += 1;
}
}
int main (void)
{
int pipe_tochild[2];
int pipe_fromchild[2];
/* Create the pipes. */
if (pipe (pipe_tochild) || pipe(pipe_fromchild))
{
return EXIT_FAILURE;
}
if (fork() == 0) {
/* child process */
close(pipe_tochild[1]);
close(pipe_fromchild[0]);
child(pipe_tochild[0], pipe_fromchild[1]);
close(pipe_tochild[0]);
close(pipe_fromchild[1]);
exit(0);
}
/* parent process */
close(pipe_tochild[0]);
close(pipe_fromchild[1]);
/* send first number */
char buf[100];
sprintf(buf, "%d", 1);
write(pipe_tochild[1], buf, 100);
/* loop */
child(pipe_fromchild[0], pipe_tochild[1]);
close(pipe_fromchild[0]);
close(pipe_tochild[1]);
wait(NULL);
return 0;
}
信号量 System V Semaphores
信号量有两种,一个是旧的 System V 一个是标准 POSIX。System V 的支持更广,POSIX 标准在许多系统上都没有实现,所以这里只解释 System V 信号量。
semget
获得信号量集
#include <sys/sem.h>
int semget(key_t key, int nsems, int semflg);
新建或访问已有的信号量集。
参数
-
key_t key
信号量集的键。每个信号量集都有唯一的键,自定义键名通过 ftok 创建,也可以设成IPC_PRIVATE
来新建一个只有当前进程和子进程可见的信号量集 -
int nsems
信号量集中信号的个数 -
int semflg
控制创建和访问权限。-
IPC_CREAT
表示创建新的信号量集 -
IPC_CREAT | IPC_EXCL
表示创建新的信号量集,并且当之前已经存在信号量集时失败 -
IPC_CREAT | 0666
创建时可以设置权限,与文件的权限规则相同
-
返回值
- 成功时返回信号量集的ID
semid
,失败时返回 -1 并设置 errno
semctl
配置信号量集
#include <sys/sem.h>
int semctl(int semid, int semnum, int cmd, ...); /* union semun sem_perm */
union semun {
int val; /* value for SETVAL */
struct semid_ds *buf; /* buffer for IPC_STAT & IPC_SET */
u_short *array; /* array for GETALL & SETALL */
};
semctl
的作用配置信号量集 semid
中的第 semnum
个参数的值。
配置行为由 cmd
指定,配置的值是可选的第四个参数 sem_union
。
配置的值很多,参看 man semctl.
参数
-
int semid
信号量集的 ID -
int semnum
信号量集中的第几个信号 -
int cmd
要执行的操作,下面是常用的一些选项-
SETVAL
把信号量的值设成sem_union.val
-
SETALL
把所有的信号量的值都设成sem_union.val
-
IPC_RMID
清除信号量
-
- 可选的第四个参数
union semun sem_union
设置的值
返回值
- 成功时返回非负值,与
cmd
设置有关 - 失败时返回 -1,并设置 errno
semop
操作信号
#include <sys/sem.h>
int semop(int semid, struct sembuf *sops, size_t nsops);
struct sembuf {
u_short sem_num; /* semaphore */
short sem_op; /* semaphore operation */
short sem_flg; /* operation flags */
};
semop
的作用是在信号量集 semid
上执行 nsops
个原子操作,每个操作由数组 sops
定义。
配置行为由 cmd
指定,配置的值是可选的第四个参数 sem_union
。
配置的值很多,参看 man semctl.
参数
-
int semid
信号量集的 ID -
struct sembuf *sops
对信号的操作-
u_short sem_num
对第几个信号操作 -
short sem_op
具体的操作 -
short sem_flg
控制操作的行为
-
-
size_t nsops
数组sops
的长度
sem_op
的几种情况
-
>0
新的信号量值 = 旧的信号量值 +sem_op
, 立即执行 -
=0
阻塞直到信号量的值变成 0 -
<0
阻塞直到旧信号量 + sem_op >= 0
返回值
- 成功时返回 0
- 失败时返回 -1,并设置 errno
例子:互斥锁
这个例子里子进程和主进程一起对共享的变量 shared_int
数数,如果不加锁,最后输出的数字会比 2000000 少,也就是出现了数据竞争,而加锁之后就不会出现这种问题了。
可以注释掉 lock
和 unlock
来自己观察一下。
#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/sem.h>
#include <sys/mman.h>
void lock(int semid) {
struct sembuf sb = {0, -1, 0};
semop(semid, &sb, 1);
}
void unlock(int semid) {
struct sembuf sb = {0, 1, 0};
semop(semid, &sb, 1);
}
int main() {
int *shared_int = mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, -1, 0);
if (shared_int == MAP_FAILED) {
perror("mmap");
exit(EXIT_FAILURE);
}
int semid = semget(IPC_PRIVATE, 1, 0666 | IPC_CREAT);
union semun sem_union;
sem_union.val = 1;
semctl(semid, 0, SETVAL, sem_union);
if (fork() == 0) {
for (int i=0; i<1000000; ++i) {
lock(semid);
(*shared_int)++;
unlock(semid);
}
exit(0);
}
for (int i=0; i<1000000; ++i) {
lock(semid);
(*shared_int)++;
unlock(semid);
}
wait(NULL);
printf("final, %d\n", *shared_int);
semctl(semid, 0, IPC_RMID, sem_union);
munmap(shared_int, sizeof(int));
return 0;
}