Linux论坛's Archiver

《开源》旗舰电子杂志2008年11期发布,火热下载中!

legone2008 发表于 2008-7-24 09:45

socket编程中用select实现并发处理

环境:Linux C
看了几天用select函数处理并发的程序,有些地方还是百思不得其解,非常急,可能是基础薄弱,希望能得到基础的讲解。
    我目前有一种处理并发的方式,是采用accept阻塞,然后fork子进程来process,父进程继续阻塞,流程如下:
一.第一种处理方式:
client端:
socket->connect->连接成功->write/read|recv/send->结束

server端:
sockfd = socket()
bind()
listen()

while(1)
{
   newfd = accept()
   if ( 0 == fork() ) {
       close(sockfd)
       process( )  /* write/read|recv/send   */
       close(newfd)
       exit(0)   /* 子进程处理事件完成,退出 */
   }
   
   close(newfd)  /* 主进程重新去accept,等待新连接 */
}

close(sockfd)

二。第二处理方式
这也是我要请教的重点:对以上的流程我还是稍稍熟悉些,主要是用select处理并发就不知道怎么用了,不明白select的作用体现在了哪里?相对于单一使用accept的优点好在哪里?
  我从网上找到了一段代码,很多问题想请教下:
  #include   <stdlib.h>   
  #include   <stdio.h>   
  #include   <unistd.h>   
  #include   <sys/time.h>   
  #include   <sys/types.h>   
  #include   <string.h>   
  #include   <signal.h>   
  #include   <sys/socket.h>   
  #include   <netinet/in.h>   
  #include   <arpa/inet.h>   
  #include   <errno.h>   
   
  #define   MAXCLIENT   10   
  #define   PORT   4672   
  #define   BUF_SIZE   1024   
  #undef   max   
  #define   max(x,y)   ((x)   >   (y)   ?   (x)   :   (y))   
   
  int   main(int   argc,char   *argv[])   
  {   
         
        int   listen_id,accep_id;                     //监听socket,传输socket   
        int   n;   
        static   int   currentid   =   0;   
        int   fd[MAXCLIENT];   
        int   i,j;   
        char   buffer[BUF_SIZE];     
        struct   sockaddr_in   serveraddr,clientaddr;       //客户端地址   
         
        fd_set   rd;                                                 //在这儿定义一个保存可读的fd_set   
         
        for(i   =   0;i   <   MAXCLIENT;i++)   
        {   
              fd   =   -1;   
        }   
         
        //创建监听socket   
        listen_id   =   socket(AF_INET,SOCK_STREAM,0);   
        if(listen_id   <   0)   
        {   
              fprintf(stderr,"Create   listen   socket   failure.\n");   
              exit(1);   
        }   
   
        //设置监听socket   
        memset(&serveraddr,0,sizeof(serveraddr));   
        serveraddr.sin_family   =   AF_INET;                                       
        serveraddr.sin_addr.s_addr   =   htonl(INADDR_ANY);         
        serveraddr.sin_port   =   htons(PORT);     
   
        //绑定socket   
        if(bind(listen_id,(struct   sockaddr   *)&serveraddr,sizeof(serveraddr))<0)   
        {   
              fprintf(stderr,"bind   listen   socket   failure.\n");   
              exit(1);   
        }   
   
        //监听socket   
        if(listen(listen_id,MAXCLIENT)<0)   
        {   
              fprintf(stderr,"listen   the   socket   failure.\n");   
        }   
         
        //并发处理连接   
        FD_ZERO(&rd);   
   
        int   nfds   =   0;     //将该值放到循环外来定义   
        while(1)   
        {   
              int   ret;   
              nfds   =   max(nfds,listen_id);   
              FD_SET(listen_id,&rd);   
               
              ret   =   select(nfds+1,&rd,   NULL,   NULL,   NULL);    /* ???问题1  ------------*/
               
              //有中断,继续探测select   
              if   (   ret   <   0   )   
              {   
                      if   (   EINTR   ==   errno   )   
                      {   
                              continue;   
                      }   
                      else   
                      {   
                              fprintf(stderr,"begin   select   failure.\n");   
                      }   
              }   
               
              //假如信号来自监听socket   
              if(FD_ISSET(listen_id,&rd))     /* ???问题2 ----------------------*/
              {   
                    memset(&clientaddr,0,sizeof(clientaddr));   
                    fd[currentid]   =   accept(listen_id,(struct   sockaddr   *)&clientaddr,(unsigned   int   *)sizeof(clientaddr));   
                    if(fd[currentid]   <   0)   
                    {   
                            fprintf(stderr,"accept   a   new   connect   failure.\n");   
                            exit(1);   
                    }   
                    else   
                    {   
                            FD_SET(fd[currentid],&rd);   
                            currentid++;   
                              
                            if   (fd[currentid]   >   nfds)   
                            {   
                                    nfds   =   fd[currentid];   
                            }   
                              
                            if   (   --ret   <=   0   )     //当没有可读的描述字时   
                            {   
                                    break;   
                            }   
                    }   
              }   
              else   
              {   
                  for(j   =   0;   j   <   currentid;   j++)   
                  {   
                      //刚才这儿没有加   
                      if   (fd[j]   <   0   )   
                      {   
                              continue;   
                      }   
                        
                      if(FD_ISSET(fd[j],&rd))   
                      {   
                          int   rec;   
                          memset(&buffer,0,sizeof(buffer));   
                          rec   =   recv(fd[j],buffer,BUF_SIZE,0);   
                          if(rec   <   0)   
                          {   
                                  fprintf(stderr,"received   from   client:   %s   failure.\n",inet_ntoa(clientaddr.sin_addr));   
                                  exit(1);   
                          }   
                          else   if   (rec   ==   0)           //如果客户端终止   
                          {     
                                  close(fd[j]);                     //关闭该套接口                       
                                  FD_CLR(fd[j],   &rd);         //将该套接口描述字从rd中清除   
                                  fd[j]   =   -1;                         //将fd数组中相应的描述字置为-1   
                          }   
                          else   
                          {   
                              fprintf(stdout,"success   received   from   client:   %s   ,the   word   is:   %s.\n",inet_ntoa(clientaddr.sin_addr),buffer);   
                              if(send(fd[j],   buffer,   rec,   0)   !=   rec)   
                              {   
                                          fprintf(stderr,"sento   client:   %s   ,failure.   \n",inet_ntoa(clientaddr.sin_addr));   
                                          exit(1);   
                                  }   
                          }   
                           
                          //将fd[j]这个套接口,重新加入rd中   
                          FD_SET(fd[j],   &rd);   
                           
                          if   (   --ret   <=   0   )   //当没有描述字可读时   
                          {   
                                  break;   
                          }   
                      }   
                  }   
              }   
        }         
  }   
  
  问题1: 在" ret   =   select(nfds+1,&rd,   NULL,   NULL,   NULL);  ",在没有错误的情况下,ret应该是什么样的呢?是不是listen队列中请求的个数呢?
问题2:在上面程序中,select是如何收集描述符的呢?while大循环中,第一次循环应该执行:if(FD_ISSET(listen_id,&rd)) ,执行之后通过accept获得一个客户端的连接,并将描述符放入rd中,那第二次循环是不是应该执行else了呢?
  如果是的话,那岂不是每次只能处理一个描述符,因为accept每次只能linsten队列中取一个连接。
  但如果不是话,也就是第二次循环还会执行if(FD_ISSET(listen_id,&rd)),第三次也有可能执行if(FD_ISSET(listen_id,&rd)),那么什么时候终止呢?我觉得程序中if   (   --ret   <=   0   )  为终止条件好像不对吧。因为每一次循环都会执行ret   =   select(nfds+1,&rd,   NULL,   NULL,   NULL); ,ret值就会发生变化啊。
  总之问题归结于两点:select 是如何收集客户端连接描述符的呢,比如说有5个客户请求连接,而select在又是如何并发处理的呢?
  问题3:不明白select的作用体现在了哪里?相对于单一使用accept的优点好在哪里?

请求各位老师帮帮忙,我很急,如果可能话,就从while那里开始讲下程序。。

[[i] 本帖最后由 Flyinmorning 于 2008-7-24 14:03 编辑 [/i]]

Flyinmorning 发表于 2008-7-24 14:01

问题1答案:
select返回的是准备就绪的描述符个数,若超时则为0,若出错则为- 1。(参见《UNIX环境高级编程12.5.1节》)
(以下参见man select)
... ...
RETURN VALUE
       On success, select and pselect return the number of descriptors contained in the descriptor sets, which may be zero if the
       timeout  expires  before anything interesting happens.  On error, -1 is returned, and errno is set appropriately; the sets
       and timeout become undefined, so do not rely on their contents after an error.

问题1答案:
select()允许进程指示内核等待多个事件中的任一个发生,并仅在一个或多个事件发生或经过某指定的时间后才唤醒进程。也就是说通知内核我们对哪些描述字感兴趣,我们关心的描述字不受限于套接口,任何描述字都可以用select()来测试。select收集描述符,怎么实现的我不知道,有兴趣看看内核代码吧。
while大循环中,第一次循环应该执行:if(FD_ISSET(listen_id,&rd)) ,执行之后通过accept获得一个客户端的连接,并将描述符放入rd中,那第二次循环是不是应该执行else了呢?
不是!只有当有新的连接连上监听端口的时候FD_ISSET(listen_id,&rd)才会返回TRUE,否则,就去挨个检查那些已经连上来的客户端连接看有没有消息可以收(注意那个for循环,表明不是每次只能处理一个描述符)。
逻辑是这样的,帮你理顺:
把监听描述字设置到rd;
select()返回大于0的话
{
      if有新的连接连上来
    {
        添加新的连接到rd;
    }
    else
        {
            for  所有已有的连接
            {
                  收消息
                  发消息
            }
        }
}

问题3:
select的作用体现在了不需要你自己频繁的去测试哪些文件描述符可读可写或异常,select一次性把你关心的描述字的状态展现在你面前。
假如没有select函数,你就得自己这样写:
for (int i=0; i<maxfd; i++)
{
    if accept(...){...};
    if recv(...){...};
    if send(...){...};
}
这样的频繁的系统调用显然影响效率。
在应用进程中需要对多个I/O设备进行监听,当某个设备可读或可写时,进程能马上得知,并进行相关处理。这时若采用阻塞方式操作I/O,则进程会阻塞在某个设备的I/O读写操作上而不能适用于这种情况;若采用非阻塞方式,则往往需要定时或循环地探测所有设备,才作相应处理,这种作法相当耗费系统中央处理器的执行周期。可见,上述的两个I/O模型都不能满足这类应用,故此需要引入一种特别的I/O处理机制,即I/O多路转接,就是select()。

DUT_girl 发表于 2008-7-24 17:03

Flymorning 分析的真是很透彻阿!高高手阿!

[[i] 本帖最后由 DUT_girl 于 2008-7-24 17:28 编辑 [/i]]

DUT_girl 发表于 2008-7-24 17:26

我来说一下第二点把:
[color=Red]  总之问题归结于两点:select 是如何收集客户端连接描述符的呢,比如说有5个客户请求连接,而select在又是如何并发处理的呢?[/color]

因为不知道内核实如何实现select的,我就说一下我的想法。
1。首先在单处理器里是没有真正的并行处理的,所以就有了并发的概念。
2。就算是真正的同时有5个客户同时请求连接,那处理器也必须一个一个的来处理。
3。实际上select的真正优势是在于可以等待多个文件描述符(当然包括多个套接口)
4。我认为内核可能是让这多个客户连接请求进入队列,然后再一个一个通知select来处理。

[[i] 本帖最后由 DUT_girl 于 2008-7-24 17:27 编辑 [/i]]

页: [1]

Powered by Discuz! Archiver 6.1.0  © 2001-2007 Comsenz Inc.