POSIX 多进程编程

并行计算方案包括多线程并行、多进程并行、异构并行和分布式并行。 这篇文章主要讲如何利用 POSIX 标准库(C语言)进行多进程并行计算,包括如何启动和终止子进程和进程间通信的两种方法:管道(pipe)与信号量(semaphore)。 至于其它的进程间通信方式,例如:共享内存(mmap)、网络(sockets)、信号(signal)和消息队列等,都是适用性更广也更复杂的技术,在这篇短文中无法涉及。

启动和终止子进程

使用 fork 创建子进程,使用 waitwaitpid 等待运行结束。

fork 创建子进程

#include <unistd.h>

pid_t fork(void);

返回值

  • 执行成功:在父进程中返回子进程的 pid_t, 在子进程中返回 0;
  • 执行失败:返回-1,并设置 errno
  • 在 GNU C 中 pid_t 就是 int

说明

  • fork 启动的子进程会获得与此时主进程完全一样的内存上下文,这意味着主进程此时可以访问的一切内容在子进程中都可用,甚至连指针地址都一样。 如果连续启动了两个子进程,在两次 fork 中有任何变量在主进程被修改了,前面的子进程看到的还是原来的值,后面的子进程看到的是新的值。 同时由于进程之间内存是隔离的,子进程修改任何内存都不会影响到其它子进程。 fork 对内存使用 copy-on-write 机制,没有修改的内存就访问同样的内容,但如果修改就会复制所有内存。

wait 等待子进程退出

#include <sys/wait.h>

pid_t wait(int *stat_loc);

参数

  • int *stat_loc 捕获子进程的退出值,设为 NULL 则忽略退出值。具体的返回参数参看 man 2 wait

返回值

  • 返回捕获到的子进程ID,失败返回-1,并设置 errno

waitpid 等待特定的子进程退出

#include <sys/wait.h>

pid_t waitpid(pid_t pid, int *stat_loc, int options);

参数

  • pid_t pid 等待特定的子进程
  • int *stat_loc 捕获子进程的退出值,设为 NULL 则忽略退出值, 与 wait 的参数相同
  • int options 等待的行为可以通过一系列选项来设置,具体的返回参数参看 man 2 waitpid,设成 0 则与 wait 行为相同。

返回值

  • 返回捕获到的子进程ID,失败返回-1

例子:创建一系列子进程并等待

在这个例子中创建了 10 个子进程,按照创建顺序给它们标号,让这些进程按照创建顺序的逆序打印自己的序号,主进程则会等待到所有子进程退出再结束。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

int main()
{

  int n_process = 10;
  pid_t *pids = calloc(n_process, sizeof(pid_t));

  for (int i = 0; i < n_process; i++) {
    pids[i] = fork();
    if (pids[i] == 0) {
      /* in child process */
      sleep(n_process - i);
      printf("child %d\n", i);
      printf("child %d, pid_t: %d\n", i, pids[i]);
      /* pointer address is the same */
      printf("child %d, pids ptr: %p\n", i, pids);
      /* but value is different */
      printf("child %d, all pids: ", i);
      for (int i = 0; i < n_process; i++) {
        printf("%d ", pids[i]);
      }
      printf("\n");

      exit(0);
    }
  }

  /* in parent process */
  printf("all pids: \n");
  for (int i = 0; i < n_process; i++) {
    printf("%d ", pids[i]);
  }
  printf("\n");

  /* get parent pid */
  printf("pid: %d\n", getpid());

  /* wait subprocess to finish */
  for (int i = 0; i < n_process; i++) {
    /* wait(NULL); */
    waitpid(pids[i], NULL, 0);
  }

  free(pids);

  return 0;
}

管道 Pipe

POSIX 提供了单向管道,可以从一个进程向另一个进程单向地传输信息。

pipe 创建管道

#include <unistd.h>

int pipe(int fildes[2]);

参数

  • int fildes[2] 创建一个管道的两端,值是整数文件标识, fildes[0] 是管道的出口, fildes[1] 是管道的入口
  • 发送信息的进程应该先把管道出口关闭,而接收信息的进程应该先把管道入口关闭
  • 如果管道的两端在同一个进程里都关闭了,再向这个管道发送信息会产生 SIGPIPE 信号

返回值

  • 成功创建管道返回 0, 失败返回 -1 并设置 errno

例子:pingpong

两个进程之间互相发送数据,一个数据在两个管道之间来回传递,类似打乒乓球,这是 MPI 中最简单的例子

#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

#define TOTAL_NUMBER 10

void child(int pipe_in, int pipe_out) {
  pid_t pid = getpid();
  printf("start pid: %d\n", pid);

  int recv_count = 0;
  int send_count = 0;
  char buf[100] = {0};
  int number = 0;

  while (recv_count < TOTAL_NUMBER || send_count < TOTAL_NUMBER) {

    /* clear buf */
    for (int i = 0; i < 100; i++) {
      buf[i] = 0;
    }

    /* read from pipe_in into buf */
    read(pipe_in, buf, 100);
    recv_count += 1;
    number = atoi(buf);
    printf("pid=%d: recv %d\n", pid, number);

    /* write to pipe_out */
    number += 1;
    sprintf(buf, "%d", number);
    write(pipe_out, buf, 100);
    printf("pid=%d: send %d\n", pid, number);
    send_count += 1;

  }

}

int main (void)
{
  int pipe_tochild[2];
  int pipe_fromchild[2];

  /* Create the pipes. */
  if (pipe (pipe_tochild) || pipe(pipe_fromchild))
    {
      return EXIT_FAILURE;
    }

  if (fork() == 0) {
    /* child process */
    close(pipe_tochild[1]);
    close(pipe_fromchild[0]);
    child(pipe_tochild[0], pipe_fromchild[1]);
    close(pipe_tochild[0]);
    close(pipe_fromchild[1]);
    exit(0);
  }
  /* parent process */
  close(pipe_tochild[0]);
  close(pipe_fromchild[1]);

  /* send first number */
  char buf[100];
  sprintf(buf, "%d", 1);
  write(pipe_tochild[1], buf, 100);

  /* loop */
  child(pipe_fromchild[0], pipe_tochild[1]);
  close(pipe_fromchild[0]);
  close(pipe_tochild[1]);

  wait(NULL);

  return 0;
}

信号量 System V Semaphores

信号量有两种,一个是旧的 System V 一个是标准 POSIX。System V 的支持更广,POSIX 标准在许多系统上都没有实现,所以这里只解释 System V 信号量。

semget 获得信号量集

#include <sys/sem.h>

int semget(key_t key, int nsems, int semflg);

新建或访问已有的信号量集。

参数

  • key_t key 信号量集的键。每个信号量集都有唯一的键,自定义键名通过 ftok 创建,也可以设成 IPC_PRIVATE 来新建一个只有当前进程和子进程可见的信号量集
  • int nsems 信号量集中信号的个数
  • int semflg 控制创建和访问权限。
    • IPC_CREAT 表示创建新的信号量集
    • IPC_CREAT | IPC_EXCL 表示创建新的信号量集,并且当之前已经存在信号量集时失败
    • IPC_CREAT | 0666 创建时可以设置权限,与文件的权限规则相同

返回值

  • 成功时返回信号量集的ID semid ,失败时返回 -1 并设置 errno

semctl 配置信号量集

#include <sys/sem.h>

int semctl(int semid, int semnum, int cmd, ...); /* union semun sem_perm */

union semun {
    int     val;            /* value for SETVAL */
    struct  semid_ds *buf;  /* buffer for IPC_STAT & IPC_SET */
    u_short *array;         /* array for GETALL & SETALL */
};

semctl 的作用配置信号量集 semid 中的第 semnum 个参数的值。 配置行为由 cmd 指定,配置的值是可选的第四个参数 sem_union 。 配置的值很多,参看 man semctl.

参数

  • int semid 信号量集的 ID
  • int semnum 信号量集中的第几个信号
  • int cmd 要执行的操作,下面是常用的一些选项
    • SETVAL 把信号量的值设成 sem_union.val
    • SETALL 把所有的信号量的值都设成 sem_union.val
    • IPC_RMID 清除信号量
  • 可选的第四个参数 union semun sem_union 设置的值

返回值

  • 成功时返回非负值,与 cmd 设置有关
  • 失败时返回 -1,并设置 errno

semop 操作信号

#include <sys/sem.h>

int semop(int semid, struct sembuf *sops, size_t nsops);

struct sembuf {
    u_short sem_num;        /* semaphore */
    short   sem_op;         /* semaphore operation */
    short   sem_flg;        /* operation flags */
};

semop 的作用是在信号量集 semid 上执行 nsops 个原子操作,每个操作由数组 sops 定义。 配置行为由 cmd 指定,配置的值是可选的第四个参数 sem_union 。 配置的值很多,参看 man semctl.

参数

  • int semid 信号量集的 ID
  • struct sembuf *sops 对信号的操作
    • u_short sem_num 对第几个信号操作
    • short sem_op 具体的操作
    • short sem_flg 控制操作的行为
  • size_t nsops 数组 sops 的长度

sem_op 的几种情况

  • >0 新的信号量值 = 旧的信号量值 + sem_op, 立即执行
  • =0 阻塞直到信号量的值变成 0
  • <0 阻塞直到 旧信号量 + sem_op >= 0

返回值

  • 成功时返回 0
  • 失败时返回 -1,并设置 errno

例子:互斥锁

这个例子里子进程和主进程一起对共享的变量 shared_int 数数,如果不加锁,最后输出的数字会比 2000000 少,也就是出现了数据竞争,而加锁之后就不会出现这种问题了。 可以注释掉 lockunlock 来自己观察一下。

#include <sys/types.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/sem.h>
#include <sys/mman.h>


void lock(int semid) {
  struct sembuf sb = {0, -1, 0};
  semop(semid, &sb, 1);
}

void unlock(int semid) {
  struct sembuf sb = {0, 1, 0};
  semop(semid, &sb, 1);
}

int main() {

  int *shared_int = mmap(NULL, sizeof(int), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, -1, 0);
  if (shared_int == MAP_FAILED) {
    perror("mmap");
    exit(EXIT_FAILURE);
  }

  int semid = semget(IPC_PRIVATE, 1, 0666 | IPC_CREAT);
  union semun sem_union;
  sem_union.val = 1;
  semctl(semid, 0, SETVAL, sem_union);

  if (fork() == 0) {
    for (int i=0; i<1000000; ++i) {
      lock(semid);
      (*shared_int)++;
      unlock(semid);
    }
    exit(0);
  }


  for (int i=0; i<1000000; ++i) {
    lock(semid);
    (*shared_int)++;
    unlock(semid);
  }

  wait(NULL);

  printf("final, %d\n", *shared_int);

  semctl(semid, 0, IPC_RMID, sem_union);
  munmap(shared_int, sizeof(int));

  return 0;
}

评论

Comments powered by Disqus