我们知道多进程的程序会比多线程的程序在稳定性上面好一些,因为资源是隔离的。但这也是的多进程程序在相互通信方面比多线程复杂一些,但Linux系统还是提供了很多进程通信的方式,统称为IPC(InterProcess Communication).常见的IPC有管道、FIFO、消息队列、信号量、共享内存以及套接字等。

一,管道(PIPE)

管道(有时也称为无名管道,和后面要介绍的FIFO相对)是UNIX/LINUX系统IPC里面最简单的一个,但是它的使用却是非常广泛的。这个名字很形象,其实简单理解他就是一个可以流通数据的管道——数据从一端流入,从另一端流出。比如下图,当我们创建一个Pipe后,我们从fd[1]写数据到该管道,然后数据经pipe从fd[0]端流出。

pipe-1

因为很简单,所以管道有两种局限性:

  • 管道一般是半双工的(有的系统可能实现为全双工),即数据只能在一个方向上流动。
  • 它们只能在具有公共祖先的进程间使用。一般管道最经典的使用方式是一个管道由一个进程创建,然后该进程调用fork,此后父子进程之间就可以利用该管道通信。
    我们解释一下上面的第二条:如下图,假设父进程打开了0和1号描述符,然后我们调用fork,我们知道子进程会复制父进程的文件描述符,并且父子进程共享同一个文件表项(见我之前的博客《Linux文件I/O》)。此时,父子进程的0号描述符都指向管道的读端,1号描述符都指向管道的写端。如果我们想构建从父进程到子进程的数据流向管道时,我们就可以关闭父进程的0号描述符,关闭子进程的1号描述符。

pipe-2

创建管道可以使用pipe函数:

#include <unistd.h>

int pipe(int pipefd[2]);

函数成功返回0,失败返回-1.如果调用成功,经由pipefd返回两个文件描述符:pipefd[0]是读端描述符,pipefd[1]是写端描述符。

下面我们看一个例子:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

int main()
{
    int     pfds[2];
    char    buf[20];

    pipe(pfds);

    if (fork() > 0)
    {
        close(pfds[0]);  /* parent close read end */
        printf ( "parent: writing to the pipe ...n" );
        write(pfds[1], "test", 5);

        wait(NULL); /* wait child process exit in case of a zombie process */
        exit(0);
    }
    else
    {
        close(pfds[1]); /* child close write end */
        printf ( "child: reading from pipe ...n" );
        read(pfds[0], buf, 20);
        printf ( "child read message: %sn", buf );
        exit(0);
    }

    return 0;
}

这个例子很简单(仅作为示例,没有出错处理),父进程创建管道,然后fork出子进程。父进程关闭读端,子进程关闭写端。父进程通过管道向子进程发送数据。程序运行结果:

allan@NYC:~/test$ ./a.out 
parent: writing to the pipe ...
child: reading from pipe ...
child read message: test

最后,我们再看两个问题:

1.  如果管道的一端关闭后,会怎样?

  • 当读一个写端已经关闭的管道时,在所有数据都被读取后,read返回0,以指示到达了文件结束处。
  • 如果写一个读端已关闭的管道,则会产生SIGPIPE。

2.  管道的缓冲区多大?

管道是内核提供的一种IPC,所以它的缓冲也是内核确定的。常量PIPE_BUF规定了内核中管道的缓冲区大小。如果对管道调用write,而且要求写的字节数小于等于PIPE_BUF,则此操作不会与其它进程对同一管道(或后面要介绍FIFO)的write穿插进行。但是,若多个进程同时写一个管道/FIFO,而且进程要求写的字节数超过PIPE_BUF字节数时,则写操作数据就可能穿插进行。

二,FIFO

FIFO(First in, First out)又被称为命名管道。Pipe只能是具有血缘关系的进程才可以相互通信,但是FIFO却可以在不相关的进程间交换数据(因为FIFO是有"名字"的)。我们可以使用mkfifo或者mknod创建FIFO:

#include <sys/types.h>
#include <sys/stat.h>

int mkfifo(const char *pathname, mode_t mode);
int mknod(const char *path, mode_t mode, dev_t dev);

一般mkfifo也是调用的mknod。两个函数都是成功返回0,失败返回-1。一般的I/O函数都可用于FIFO。

下面我们先看两个例子程序:speak.c和tick.c。speak.c会往FIFO里面写数据,tick.c会从FIFO读数据。

speak.c:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>

#define FIFO_NAME "fifo_test"

int main(void)
{
    char    s[300];
    int     status, num, fd;

    status = mkfifo(FIFO_NAME, S_IFIFO | 0666);
    if (status != 0)
    {
        if (errno == EEXIST)
        {
            printf ( "fifo already exists.n" );
        }
        else
        {
            printf ( "mkfifo failed.n" );
            return EXIT_FAILURE;
        }
    }

    printf ( "waiting for readers ...n" );
    fd = open(FIFO_NAME, O_WRONLY);
    printf ( "got a reader -- type some stuffn" );

    while (fgets(s, 300, stdin), !feof(stdin))
    {
        if ((num = write(fd, s, strlen(s))) == -1)
            perror("write");
        else
            printf ( "speak: wrote %d bytesn", num);
    }

    return EXIT_SUCCESS;
}               /* ----------  end of function main  ---------- */

tick.c:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>

#define FIFO_NAME "fifo_test"

int main(void)
{
    char    s[300];
    int     status, num, fd;

    status = mkfifo(FIFO_NAME, S_IFIFO | 0666);
    if (status < 0)
    {
        if (errno == EEXIST)
        {
            printf ( "fifo already existsn" );
        }
        else
        {
            printf ( "mkfifo failed, errno=%dn", errno );
            return EXIT_FAILURE;
        }
    }

    printf ( "waiting for writers ...n" );
    fd = open(FIFO_NAME, O_RDONLY);
    printf ( "got a writern" );

    do {
        if ((num = read(fd, s, 300)) == -1)
            perror("read");
        else
        {
            s[num] = '';
            printf ( "tick: read %d bytes: "%s"n", num, s );
        }

    }while (num > 0);

    return EXIT_SUCCESS;
}

我们编译之后运行时会发现:不管是先运行speak还是先运行tick,都会阻塞在open的地方,直到运行另外一个程序的时候阻塞的程序才会继续往下运行。原因是默认是以阻塞的方式打开FIFO的,在这种模式下,(1)以只读打开FIFO时进程会阻塞到有其他进程为写此FIFO而打开(2)以只写打开FIFO时,进程会阻塞到有其他进程为读此FIFO而打开。当然,我们可以在打开FIFO的时候指定为O_NOBLOCK,这样的话只读open立即返回。只写open的话如果没有进程为写而打开此FIFO,那么将出错返回-1,errno是ENXIO。

另外:

  1. 类似于Pipe,若用write写一个尚未读而打开的FIFO,则产生SIGPIPE。
  2. 若某个FIFO的最后一个写进程关闭了该FIFO,则将为该FIFO的读进程产生一个文件结束标志。
  3. 一般一个给定的FIFO有多个写进程是很常见的,这就意味着如果不希望多个进程所写的数据互相穿插,则需要考虑原子操作。和Pipe类似,厂里PIPE_BUF说明了可被原子的写到FIFO的最大数据量。

三,XSI IPC

1. XSI IPC共同点

XSI IPC是消息队列、信号量、共享内存的统称,因为他们有很多相似的地方。

每个内核中的IPC结构都用一个非负整数的标识符加以引用。例如,为了对一个消息队列发送或获取消息,只需要直到其队列标识符。与文件描述符不同,IPC标识符不是小的整数。当一个IPC结构被创建,以后又被删除时,与这种结构相关的标识符连续加1,直至达到一个整形数的最大正值,然后又转回0.不过标识符是IPC对象的内部名。为了使多个合作的进程能够在同一个IPC对象上会合,需要提供一个外部名方案。为此使用了键(key),每个IPC对象都与一个键相关联,于是键就用作该对象的外部名。我们可以使用ftok函数来获取一个key(key_t):

#include <sys/types.h>
#include <sys/ipc.h>

key_t ftok(const char *pathname, int proj_id);

函数成功返回key,失败返回-1.

关于该函数有以下几个要点:

  • pathname必须是存在且可访问的文件;
  • proj_id是非0整数,ftok会使用它的低八位;
  • ftok的内部机制一般是获取给定的文件的stat结构,从该结构中获取部分st_dev和st_ino字段,然后再与proj_id组合起来。所以,对于相同的参数,ftok返回的key肯定是一样的。

另外,XSI IPC还为每一个IPC结构设置了一个ipc_perm结构,该结构规定了IPC对象的权限和所有者,它至少包括下列成员:

struct ipc_perm {
uid_t uid; /* owner’s effective user ID */
gid_t gid; /* owner’s effective group ID */
uid_t cuid; /* creator’s effective user ID */
gid_t cgid; /* creator’s effective group ID */
mode_t mode; /* access modes */
.
.
.
};

在创建IPC结构时,对所有字段赋初值,以后可以调用msgctl、semctl或shmctl修改这些字段,调用进程必须是IPC结构的创建者或者超级用户。

另外,系统对于每种XSI IPC都有资源限制(使用ipcs -l命令可以查看),后面会看到。当然我们可以更改这些限制,比如使用sysctl命令。

2. 消息队列

消息队列是消息的链接表,存放在内核中并由消息队列标识符标识。每个消息队列都有一个msqid_ds结构与其相关联:

struct msqid_ds {
   struct ipc_perm msg_perm;     /* Ownership and permissions */
   time_t          msg_stime;    /* Time of last msgsnd(2) */
   time_t          msg_rtime;    /* Time of last msgrcv(2) */
   time_t          msg_ctime;    /* Time of last change */
   unsigned long   __msg_cbytes; /* Current number of bytes in
                                    queue (nonstandard) */
   msgqnum_t       msg_qnum;     /* Current number of messages
                                    in queue */
   msglen_t        msg_qbytes;   /* Maximum number of bytes
                                    allowed in queue */
   pid_t           msg_lspid;    /* PID of last msgsnd(2) */
   pid_t           msg_lrpid;    /* PID of last msgrcv(2) */
};

Linux内核对于消息队列的限制:

allan@NYC:~$ ipcs -ql

------ Messages Limits --------
系统最大队列数量 = 32000
max size of message (bytes) = 8192   # 可发送最长消息的字节数
default max size of queue (bytes) = 16384  # 一个特定队列的最大字节数(即队列中所有消息之和)

和消息队列相关的函数:

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg);

int msgctl(int msqid, int cmd, struct msqid_ds *buf);

int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

(1)msgget:用于打开一个现存队列或者创建一个新队列

满足下列两个条件之一就会创建一个新的消息队列:

  • key是IPC_PRIVATE
  • flag为IPC_CREATE,且没有当前队列为key
    如果我们要创建一个新的IPC结构,而且要确保不是引用具有同一标识符的一个现行IPC结构,那就必须在flag中同时指定IPC_CREATE和IPC_EXCL位。这样做了以后,如果IPC结构已经存在就会造成出错,返回EEXIST。后面要介绍的semget、sheget和msgget类似。另外,msg_qnum、msg_lspid、msg_lrpid、msg_stime、msg_rtime都被设置为0;msg_ctime设置为当前时间;msg_qbytes设置为系统限制值。

函数执行成功,msgget返回非负队列ID,此后,该值可被用于其他三个消息队列函数。

(2)msgctl:对队列执行多种操作

cmd参数说明对由msqid指定的队列要执行的命令:

IPC_STAT  取此队列的msqid_ds结构,并将它存放在buf指向的结构中。

IPC_SET  按由buf指向的结构的值,设置与此队列相关结构中的下列四个字段:msg_perm.uid、msg_perm.gid、msg_perm.mode和msg_qbytes。此命令只能由下列两种进程执行:一种是其有效用户ID等于msg_perm.cuid或msg_perm.uid;另一种是具有超级用户特权的进程。只有超级用户才能增加msg_qbytes的值。

IPC_RMID  从系统中删除该消息队列以及仍在队列中的所有数据。这种删除立即生效。仍在使用这一消息队列的其他进程在他们下一次试图对此队列进行操作时,将出错返回EIDRM。此命令只能由下列两种进程执行:一种是其有效用户ID等于msg_perm.cuid或msg_perm.uid;另一种是具有超级用户特权的进程。
这三个参数也适用于后面要介绍的信号量和共享内存。

(3)msgsnd:将数据放到消息队列中

消息队列的每个消息包含一个正长整型类型字段,一个非负长度以及实际数据字节(对应于长度)。消息总是放在队列尾端。msgp参数指向一个长整型数,它包含了正的整型消息类型,在其后紧跟着消息数据。若msgsz参数是0,则无消息数据。若发送的最长消息是512字节,则可定义下列结构:

struct mymesg
{
    long mtype;      /* positive message type */
    char mtext[512]; /* message data, of length nbytes */
};

必须要注意的是msgsz参数指的是消息体的长度,不包含正的整型消息类型。

参数msgflg的值可以指定为IPC_NOWAIT。这类似于文件I/O的非阻塞标志。若消息队列已满(或者是队列中的消息总数等于系统限制值,或队列中的字节总数等于系统限制值),则执行IPC_NOWAIT使得msgsnd立即出错返回EAGAIN。如果没有指定IPC_NOWAIT,则进程阻塞直到下述情况出现为止:有空间可以容纳要发送的消息;从系统中删除了此队列(msgsnd返回-1,且errno设置为EIDRM);或捕捉到一个信号,并从信号处理返回(msgsnd返回-1,errno设置为EINTR)。

当msgsnd成功返回,与消息队列相关的msqid_ds结构得到了更新,以标明发出该调用的进程ID(msg_lspid)、进行该调用的时间(msg_stime),并指示队列中增加了一条消息。

(4)msgrcv:从队列中取消息

参数msgp和msgsnd含义一样,有一个消息类型和一个存放实际消息的数据缓冲区组成。msgsz说明数据缓冲区的长度。若返回的消息大于msgsz,而且在msgflg中设置了MSG_NOERROR,则该消息被截断(在这种情况下,不通知我们消息截断了,消息的截去部分被丢弃)。如果没有设置这个标志,而消息又太长,则出错返回E2BIG(消息仍留在队列中)。

参数msgtyp使我们可以指定想要哪一种消息:

  • msgtyp == 0:返回队列中的第一个消息;
  • msgtyp >= 0:返回队列中消息类型为msgtyp的第一个消息;
  • msgtyp <= 0:返回队列中消息类型值小于或等于msgtyp绝对值的消息,如果这个消息有若干个,则取类型值最小的消息。
    参数msgflg如果为IPC_NOWAIT,则操作不阻塞,这使得如果没有指定类型的消息,则msgrcv返回-1,errno设置为ENOMSG。如果没有指定,则进程阻塞直到下列情况出现:有了指定类型的消息;从系统中删除了此队列(msgrcv返回-1,且errno设置为EIDRM);或捕捉到一个信号并从喜好处理程序返回(msgrcv返回-1,errno设置为EINTR)。

msgrcv成功执行时,内核更新与消息队列相关的msqid_ds结构。

最后我们看两个例子,send_msg.c创建队列,并往队列发送消息;recv_msg.c从队列读取消息:

send_msg.c:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

struct my_msgbuf
{
    long    mtype;
    char    mtext[200];
};

int main(void)
{
    struct my_msgbuf    buf;
    int                 msqid;
    key_t               key;

    if ((key = ftok("send_msg.c", 'B')) == -1)
    {
        perror("ftok");
        exit(1);
    }

    if ((msqid = msgget(key, 0644 | IPC_CREAT)) == -1)
    {
        perror("msgget");
        exit(1);
    }

    printf ( "Enter lines of text, ^D to quit:n" );

    buf.mtype = 1; /* we don't really care in this case */

    while (fgets(buf.mtext, sizeof buf.mtext, stdin) != NULL)
    {
        int len = strlen(buf.mtext);

        if (buf.mtext[len-1] == 'n')
            buf.mtext[len-1] = '';

        if (msgsnd(msqid, &buf, len+1, 0) == -1) /* +1 for '' */
            perror("msgsnd");
    }

    if (msgctl(msqid, IPC_RMID, NULL) == -1)
    {
        perror("msgctl");
        exit(1);
    }

    return 0;
}

recv_msg.c:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

struct my_msgbuf
{
    long    mtype;
    char    mtext[200];
};

int main(void)
{
    struct my_msgbuf    buf;
    int                 msqid;
    key_t               key;

    if ((key = ftok("send_msg.c", 'B')) == -1)
    {
        perror("ftok");
        exit(1);
    }

    if ((msqid = msgget(key, 0644)) == -1)
    {
        perror("msgget");
        exit(1);
    }

    printf ( "ready to receive message:n" );

    for (;;)
    {
        if (msgrcv(msqid, &buf, sizeof buf.mtext, 0, 0) == -1)
        {
            perror("msgrcv");
            exit(1);
        }
        printf ( ""%s"n", buf.mtext );
    }

    return 0;
}

3. 信号量

信号量不同于我们之前介绍的IPC机构,它是一个计数器,用于多进程对共享对象的访问。但是XSI的信号量集比较复杂,有三个特性造成了这种非必要的复杂性:

  • XSI提供的是信号量集,而不是单个信号量。我们只能创建信号量集,而不能创建单个信号量。当然如果我们创建信号量集时,指定集合中的信号量数目为0,就创建了一个信号量。
  • 创建信号量集(semget)和对其赋初值(semctl)是分开的。这是一个致命的弱点,因为不能原子的创建一个信号量集,并且对该集合中的信号量赋初值。
  • 即使没有进程正在使用各种形式的XSI IPC,他们仍然是存在的。有些程序在终止时并没有释放已经分配给它的信号量集,所以我们不得不为这种程序担心。后面要说明的undo功能就是假定要处理这种情况的。

内核为每个信号量集设置了一个semid_ds结构(定义在sys/sem.h中):

struct semid_ds {
   struct ipc_perm sem_perm;  /* Ownership and permissions */
   time_t          sem_otime; /* Last semop time */
   time_t          sem_ctime; /* Last change time */
   unsigned long   sem_nsems; /* No. of semaphores in set */
};

每个信号量由一个无名结构表示,它至少包含下列成员:

struct 
{
    unsigned short    semval;        /* semaphore value, always &gt;= 0 */
    pid_t            sempid;        /* pid for last operation */
    unsigned short    semncnt;    /* # processes awaiting semval &gt; curval */
    unsigned short    semzcnt;    /* # processes awaiting semval == 0 */
    .
    .
    .
};

和信号量相关的函数:

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

int semget(key_t key, int nsems, int semflg);
成功返回信号量ID,失败返回-1.

int semctl(int semid, int semnum, int cmd, ... /* union semun arg */);
失败返回-1,成功返回值依赖于cmd。

int semop(int semid, struct sembuf *sops, size_t nsops);
成功返回0,失败返回-1.

(1)semget:创建信号量

semget和msgget类似,多了一个nsems参数。这个参数是要创建的信号量集中的信号量数。如果是创建新集合(一般在服务器进程中),则必须指定nsems。如果是引用一个现存的集合,则讲nsems指定为0.

(2)semclt:对信号量进行各种操作

这个函数的第四个参数是可选的,如果使用该参数,则其类型是semun,他是多个特定命令参数的联合(union)(不是指向联合的指针):

union semun {
   int              val;    /* Value for SETVAL */
   struct semid_ds *buf;    /* Buffer for IPC_STAT, IPC_SET */
   unsigned short  *array;  /* Array for GETALL, SETALL */
   struct seminfo  *__buf;  /* Buffer for IPC_INFO
                               (Linux-specific) */
};

cmd参数指定下列10种命令中的一种,在semid指定的信号量集合上执行此命令。其中5条命令是针对一个特定的信号量的,它们用semnum指定该信号量集合中的一个信号量成员。semnum值在0和nsems-1之间(包括0和nsems-1)。

IPC_STAT     对此集合取semid_ds结构,并存放在arg.buf指向的结构中。

IPC_SET      按由arg.buf指向结构中的值设置与此集合相关结构中的下列三个字段值:sem_perm.uid、sem_perm.gid和sem_perm.mode。此命令只能由下列两种进程执行:一种是其有效用户ID等于sem_perm.cuid或sem_perm.uid的进程;另                    一种是超级用户特权进程。

IPC_RMID   与之前消息队列那里介绍的一样。

GETVAL      返回成员semnum的semval值。

SETVAL      设置成员semnum的semval值。该值由arg.val指定。

GETPID       返回成员semnum的sempid值。

GETNCNT   返回成员semnum的semncnt值。

GETZCNT    返回成员semnum的semzcnt值。

GETALL      取该集合中所有信号量的值,并将它们存放在存放在由arg.array指向的数组中。

SETALL      按arg.array指向的数组中的值,设置该集合中所有信号量的值。
对于除GETALL以外的所有get命令,semctl函数都返回相应的值。其他命令返回值为0.

(3)semop:操作信号量

semop函数自动执行信号量集合上的操作数组,这是个原子操作。参数sops是个指针,它指向一个信号量操作数组,信号量操作由sembuf结构表示:

struct sembuf
{
   unsigned short sem_num;  /* member # in set (0, 1, ..., nsems-1)  */
   short          sem_op;   /* semaphore operations(negative, 0, or positive) */
   short          sem_flg;  /* operation flags: IPC_NOWAIT, SEM_UNDO */
};

参数nsops规定该数组中操作的数量(元素数)。

对集合中每个成员的操作由相应的sem_op值规定。此值可以是负值、0或正值(下面的讨论将提到信号量的undo标志。此标志对应于相应sem_flg成员的SEM_UNDO位。)。

  1. 最易于处理的情况是sem_op为正。这对应于进程释放占用的资源数。sem_op值加到信号量的值上。如果指定了undo标志,则也从该进程的此信号量调整值中减去sem_op。
  2. 若sem_op为负,则表示要获得由该信号量控制的资源。如若该信号量大于或等于sem_op的绝对值,则从信号量值中减去sem_op的绝对值。这保证信号量的结果值大于等于0.如果指定了undo标志,则sem_op的绝对值也加到该进程的此信号量调整值上。如果信号值小于sem_op的绝对值(资源不能满足要求),则:

    1. 若指定了IPC_NOWAIT,则semop出错返回EAGAIN。
    2. 若未指定IPC_NOWAIT,则该信号量的semncnt值加1(因为调用进程将进入休眠状态),然后调用进程被挂起直至下列事件之一发生:

      1. 此信号量变成大于或等于sem_op的绝对值(即某个进程已释放了某些资源)。此信号量的semncnt值减1,并且从信号量值中减去sem_op的绝对值。如果指定了undo标志,则sem_op的绝对值也加到该进程的此信号量调整值上。
      2. 从系统中删除了此信号量。在此情况下,函数出错返回EIDRM。
      3. 进程捕捉到一个信号,并从信号处理程序返回。在此情况下,此信号量的senmcnt值减1,并且函数出错返回EINTR。
  3. 若sem_op为0,这表示调用进程希望等待到该信号量值变成0.如果信号量值当前是0,则此函数立即返回。否则:

    1. 若指定了IPC_NOWAIT,则出错返回EAGAIN。
    2. 若未指定IPC_NOWAIT,则该信号量的semncnt值加1(因为调用进程将进入休眠状态),然后调用进程被挂起直至下列事件之一发生:

      1. 此信号量值变成0。此信号量的semncnt值减1。
      2. 从系统中删除了此信号量。在此情况下,函数出错返回EIDRM。
      3. 进程捕捉到一个信号,并从信号处理程序返回。在此情况下,此信号量的senmcnt值减1,并且函数出错返回EINTR。
        semop函数具有原子性,它或者执行数组中的所有操作,或者什么也不做。

exit时的信号量调整:正如前面提到的,如果进程终止时,它占用了经由信号量分配的资源,那么就会成为一个问题。无论何时,只要为信号量操作指定了SEM_UNDO标志,然后分配资源(sem_op值小于0),那么内核就会记住对于该特定信号量,分配给调用进程多少资源。当该进程终止时,不论自愿或者不自愿,内核都将检验该进程是否还有尚未处理的信号量调整值,如果有,则按调整值对相应量值进行处理。如果用带SETVAL或SETALL命令的semctl设置一信号量的值,则在所有进程中,对于该信号量的调整值都被设置成0.

最后我们看两个例子:sem_demo.c创建信号量,并模拟文件锁操作,sem_rm.c删除信号量:

sem_demo.c:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

#define MAX_RETRIES 10

union semun
{
    int             val;
    struct semid_ds *buf;
    ushort          *array;
};

int initsem(key_t key, int nsems)
{
    int             i;
    union semun     arg;
    struct semid_ds buf;
    struct sembuf   sb;
    int             semid;

    semid = semget(key, nsems, IPC_CREAT | IPC_EXCL | 0666);
    if (semid >= 0)
    {
        sb.sem_op = 1;
        sb.sem_flg = 0;
        arg.val = 1;

        printf ( "press returnn" ); 
        getchar();

        for (sb.sem_num = 0; sb.sem_num < nsems; sb.sem_num++)
        {
            /* do a semop() to "free" the semaphores. */
            /* this sets the sem_otime filed, as needed below. */
            if (semop(semid, &sb, 1) == -1)
            {
                int e = errno;
                semctl(semid, 0, IPC_RMID); /* clean up */
                errno = e;
                return -1; /* error, check errno */
            }
        }
    } 
    else if (errno == EEXIST)
    {
        int     ready = 0;

        semid = semget(key, nsems, 0); /* get the id */
        if (semid < 0)
            return semid;

        /* wait for other process to initialize the semaphore: */
        arg.buf = &buf;
        for (i = 0; i < MAX_RETRIES && !ready; i++)
        {
            semctl(semid, nsems-1, IPC_STAT, arg);
            if (arg.buf->sem_otime != 0)
            {
                ready = 1;
            }
            else
            {
                sleep(1);
            }
        }

        if (!ready)
        {
            errno = ETIME;
            return -1;
        }
    }
    else
    {
        return semid;
    }

    return semid;
}

int main(void)
{
    key_t   key;
    int     semid;
    struct sembuf sb;

    sb.sem_num = 0;
    sb.sem_op = -1;     /* set to allocate resource */
    sb.sem_flg = SEM_UNDO;

    if ((key = ftok("sem_demo.c", 'J')) == -1)
    {
        perror("ftok");
        exit(1);
    }

    /* grab the semaphore set created by seminit.c */
    if ((semid = initsem(key, 1)) == -1)
    {
        perror("initsem");
        exit(1);
    }

    printf ( "Press return to lock:" );
    getchar();
    printf ( "Trying to lock...n" );

    if (semop(semid, &sb, 1) == -1)
    {
        perror("semop");
        exit(1);
    }

    printf ( "Locked.n" );
    printf ( "Press return to unlock:" );
    getchar();

    sb.sem_op = 1;  /* free resource */
    if (semop(semid, &sb, 1) == -1)
    {
        perror("semop");
        exit(1);
    }

    printf ( "Unlockedn" );

    return 0;
}

sem_rm.c:

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>

union semun
{
    int val;
    struct semid_ds *buf;
    ushort  *array;
};

int main(void)
{
    key_t       key;
    int         semid;
    union semun arg;

    if ((key = ftok("sem_demo.c", 'J')) == -1)
    {
        perror("ftok");
        exit(1);
    }

    if ((semid = semget(key, 1, 0)) == -1)
    {
        perror("semget");
        exit(1);
    }

    /* remove it */
    if (semctl(semid, 0, IPC_RMID, arg) == -1)
    {
        perror("semctl");
        exit(1);
    }

    return 0;
}

4. 共享内存

共享内存允许多个进程共享一给定的存储区,因为不需要在进程之间复制,所以这是最快的一种IPC。使用共享内存需要注意的就是多进程之间对于共享内存去的并发访问。内核为每个共享存储段设置了一个shmid_ds结构:

struct shmid_ds {
   struct ipc_perm shm_perm;    /* Ownership and permissions */
   size_t          shm_segsz;   /* Size of segment (bytes) */
   time_t          shm_atime;   /* Last attach time */
   time_t          shm_dtime;   /* Last detach time */
   time_t          shm_ctime;   /* Last change time */
   pid_t           shm_cpid;    /* PID of creator */
   pid_t           shm_lpid;    /* PID of last shmat(2)/shmdt(2) */
   shmatt_t        shm_nattch;  /* No. of current attaches */   /* shmatt_t一般定义为无符号整形 */
   ...
};

和共享内存相关的函数:

#include <sys/ipc.h>
#include <sys/shm.h>

int shmget(key_t key, size_t size, int shmflg);
成功返回共享存储ID,失败返回-1.

int shmctl(int shmid, int cmd, struct shmid_ds *buf);
成功返回0,失败返回-1.

void *shmat(int shmid, const void *shmaddr, int shmflg);
成功返回指向共享内存的指针,失败返回-1.

int shmdt(const void *shmaddr);
成功返回0,失败返回-1.

(1)shmget:创建/引用共享内存

和之前的msgget/semget类似,不过参数size指定要创建的共享内存大小。如果是引用一个现有的共享内存,则size置为0.

(2)shmctl:操作共享内存

使用与semctl类似,但是cmd仅支持IPC_STAT、IPC_SET、IPC_RMID。在有些系统上面还支持 SHM_LOCK(将共享内存段锁定在内存中,只有超级用户才能执行)和SHM_UNLOCK(解锁共享内存段,只有超级用户才可以执行)。

(3)shmat:将共享内存段链接到自己的地址空间中

将共享内存段链接到调用进程的哪个地址上面与addr参数以及在flag中是否指定SHM_RND位有关,但一般都将addr置为0,让内核自己选择地址。

如果在flag中指定了SHM_RDONLY位,则以只读方式连接此段。否则以读写方式连接此段。如果shmat执行成功,那么内核将使该共享内存段shmid_ds结构中的shm_nattch计数器加1.

(4)shmdt:脱接共享内存段

对共享内存操作结束时,调用shmdt可以脱接共享内存段。要注意的是,这并不从系统中删除共享内存。shmdt会使shmid_ds结构中的shm_nattch计数器减1。

下面看一个简单的例子shm_demo.c,该程序没有参数时,打印共享内存里面的内容,如果跟一个参数的话,将这个参数的值写到共享内存里面:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>

#define SHM_SIZE    1024

int main(int argc, char *argv[])
{
    key_t   key;
    int     shmid;
    char    *data;

    if (argc > 2)
    {
        fprintf(stderr, "usage: shmdemo [data_to_write]n");
        exit(1);
    }

    if ((key = ftok("shm_demo.c", 'R')) == -1)
    {
        perror("ftok");
        exit(1);
    }

    /* connect to (and possibly create) the segment */
    if ((shmid = shmget(key, SHM_SIZE, 0644 | IPC_CREAT)) == -1)
    {
        perror("shmget");
        exit(1);
    }

    /* attach to the segment to get a pointer to it: */
    data = shmat(shmid, (void*)0, 0);
    if (data == (char*)(-1))
    {
        perror("shmat");
        exit(1);
    }

    /* read or modify the segment, based on the command line: */
    if (argc == 2)
    {
        printf ( "writing to segment:"%s"n", argv[1] );
        strncpy(data, argv[1], SHM_SIZE);
    }
    else
        printf ( "segment contains:"%s"n", data );

    /* detach from the segment */
    if (shmdt(data) == -1)
    {
        perror("shmdt");
        exit(1);
    }

    return 0;
}

这里总结了PIPE、FIFO、消息队列、信号量、共享内存五种IPC,但其实还有很多其他方式也可以用于进程间通信,比如文件锁、STREAMS(全双工的PIPE)、SOCKET等。后续再介绍。

总结/参考自: