源码阅读:C语言epoll模型

@李彪  June 8, 2018

背景

epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

模型代码

今天给大家分享一下EPOLL的相关模型代码:

服务端代码

#include <sys/types.h>
#include <sys/signalfd.h>
#include <sys/epoll.h>
#include <errno.h>
#include <poll.h>
#include <signal.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <assert.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "utarray.h"

/*****************************************************************************
 * This program demonstrates epoll-based event notification. It monitors for
 * new client connections, input on existing connections or their closure, as
 * well as signals. The signals are also accepted via a file descriptor using
 * signalfd. This program uses blocking I/O in all cases. The epoll mechanism
 * tells us exactly which descriptor is ready, when it is ready, so there
 * is no need for speculative/non-blocking reads. This program is more
 * efficient than sigio-server.c.
 *
 * Troy D. Hanson
 ****************************************************************************/

/* Process-wide server state: command-line settings plus live descriptors. */
struct {
  in_addr_t addr;    /* IPv4 address the listener binds to  */
  int port;          /* TCP port the listener binds to      */
  int fd;            /* listening socket descriptor         */
  UT_array *fds;     /* descriptors of accepted clients     */
  int signal_fd;     /* signalfd descriptor                 */
  int epoll_fd;      /* epoll instance descriptor           */
  int verbose;       /* verbosity level for stderr logging  */
  int ticks;         /* uptime in seconds                   */
  int pid;           /* our own pid                         */
  char *prog;        /* program name for the usage message  */
} cfg = {
  .addr      = INADDR_ANY, /* unless overridden, listen on every local IP */
  .port      = 0,
  .fd        = -1,         /* -1 marks a descriptor that is not open yet  */
  .fds       = NULL,
  .signal_fd = -1,
  .epoll_fd  = -1,
  .verbose   = 0,
  .ticks     = 0,
  .pid       = 0,
  .prog      = NULL,
};

/*
 * Print the command-line synopsis to stderr and terminate the process
 * with exit status -1. This function does not return.
 */
void usage() {
  fprintf(stderr,"usage: %s [-v] [-a <ip>] -p <port>\n", cfg.prog);
  exit(-1);
}

/*
 * Hook for periodic work. Currently it only reports the uptime counter
 * on stderr, and only when verbose output was requested.
 */
void periodic() {
  if (!cfg.verbose) {
    return;
  }
  fprintf(stderr,"up %d seconds\n", cfg.ticks);
}

/*
 * Register `fd` with the epoll instance held in cfg.epoll_fd, asking to be
 * notified for the given `events` mask. The descriptor itself is stored as
 * the event's user data.
 *
 * Returns the epoll_ctl() result; on -1 the error is printed to stderr.
 */
int add_epoll(int events, int fd) {
  struct epoll_event reg;
  int status;

  /* zero every byte of the struct first (placates valgrind) */
  memset(&reg, 0, sizeof(reg));
  reg.data.fd = fd;
  reg.events  = events;

  if (cfg.verbose) {
    fprintf(stderr,"adding fd %d to epoll\n", fd);
  }

  status = epoll_ctl(cfg.epoll_fd, EPOLL_CTL_ADD, fd, &reg);
  if (status == -1) {
    fprintf(stderr,"epoll_ctl: %s\n", strerror(errno));
  }
  return status;
}

/*
 * Remove `fd` from the epoll instance held in cfg.epoll_fd.
 *
 * Returns the epoll_ctl() result; on -1 the error is printed to stderr.
 */
int del_epoll(int fd) {
  int rc;
  struct epoll_event ev;

  /* EPOLL_CTL_DEL ignores the event argument, but kernels before 2.6.9
   * insist on a non-NULL pointer. Pass a zeroed dummy rather than
   * indeterminate stack bytes, matching what add_epoll() does. */
  memset(&ev,0,sizeof(ev));
  rc = epoll_ctl(cfg.epoll_fd, EPOLL_CTL_DEL, fd, &ev);
  if (rc == -1) {
    fprintf(stderr,"epoll_ctl: %s\n", strerror(errno));
  }
  return rc;
}

/* the signals that are delivered through the signalfd and therefore
 * handled synchronously */
int sigs[] = {
  SIGIO,
  SIGHUP,
  SIGTERM,
  SIGINT,
  SIGQUIT,
  SIGALRM
};

/*
 * Create the TCP listening socket, bind it to cfg.addr:cfg.port and put it
 * into the listening state. On success the descriptor is stored in cfg.fd.
 *
 * Returns 0 on success, -1 on failure. On failure the error is printed to
 * stderr and the socket, if it was created, is closed again.
 */
int setup_listener() {
  int rc = -1, one=1;
  struct sockaddr_in sin;

  int fd = socket(AF_INET, SOCK_STREAM, 0);
  if (fd == -1) {
    fprintf(stderr,"socket: %s\n", strerror(errno));
    goto done;
  }

  /**********************************************************
   * internet socket address structure: our address and port
   *********************************************************/
  memset(&sin, 0, sizeof(sin)); /* no indeterminate bytes reach bind() */
  sin.sin_family = AF_INET;
  sin.sin_addr.s_addr = cfg.addr;
  sin.sin_port = htons(cfg.port);

  /**********************************************************
   * bind socket to address and port 
   *********************************************************/
  /* SO_REUSEADDR is best effort: report a failure but carry on */
  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) == -1) {
    fprintf(stderr,"setsockopt: %s\n", strerror(errno));
  }
  if (bind(fd, (struct sockaddr*)&sin, sizeof(sin)) == -1) {
    fprintf(stderr,"bind: %s\n", strerror(errno));
    goto done;
  }

  /**********************************************************
   * put socket into listening state
   *********************************************************/
  if (listen(fd,1) == -1) {
    fprintf(stderr,"listen: %s\n", strerror(errno));
    goto done;
  }

  cfg.fd = fd;
  rc=0;

 done:
  /* only a socket that never became the listener is closed here */
  if ((rc < 0) && (fd != -1)) close(fd);
  return rc;
}

/*
 * Accept one pending connection on the listening socket, register it with
 * epoll for EPOLLIN and record it in cfg.fds.
 *
 * Returns the new client descriptor, or -1 if accept() or the epoll
 * registration failed (the descriptor is closed in the latter case).
 */
int accept_client() {
  struct sockaddr_in peer;
  socklen_t peer_len = sizeof(peer);

  int client = accept(cfg.fd, (struct sockaddr*)&peer, &peer_len);
  if (client == -1) {
    fprintf(stderr,"accept: %s\n", strerror(errno)); 
    return -1;
  }

  /* log the peer only when accept() filled in a full sockaddr_in */
  if (cfg.verbose && (peer_len == sizeof(peer))) {
    fprintf(stderr,"connection fd %d from %s:%d\n", client,
            inet_ntoa(peer.sin_addr), (int)ntohs(peer.sin_port));
  }

  if (add_epoll(EPOLLIN, client) == -1) {
    close(client);
    return -1;
  }

  utarray_push_back(cfg.fds,&client);
  return client;
}

/*
 * Perform one read() of up to 1024 bytes on client `fd` and log the result.
 * The data itself is discarded. When read() returns 0 the client is
 * removed from epoll, closed, and erased from cfg.fds.
 */
void drain_client(int fd) {
  char buf[1024];
  int *slot = NULL;
  int nread = read(fd, buf, sizeof(buf));

  if (nread == -1) {
    fprintf(stderr, "recv: %s\n", strerror(errno));
    return;
  }
  if (nread != 0) {
    fprintf(stderr,"received %d bytes\n", nread);
    return;
  }

  /* read() returned 0: log it, tell epoll to forget the fd, close it */
  fprintf(stderr,"fd %d closed\n", fd);
  if (cfg.verbose) {
    fprintf(stderr,"client %d has closed\n", fd);
  }
  del_epoll(fd);
  close(fd);

  /* drop the bookkeeping entry; a linear scan, so a hash would be the
   * better structure if there were many descriptors */
  while ((slot = (int*)utarray_next(cfg.fds,slot)) != NULL) {
    if (*slot == fd) {
      int pos = utarray_eltidx(cfg.fds,slot);
      utarray_erase(cfg.fds,pos,1);
      break;
    }
  }
}

/*
 * Entry point: parse options, block all signals and route a chosen set
 * through a signalfd, create the listener and the epoll instance, then
 * serve events one at a time until a terminating signal arrives or
 * epoll_wait() fails.
 *
 * Options: -v (more verbose, repeatable), -p <port> (required, 1..65535),
 * -a <ip> (local address to bind), -h (usage).
 *
 * Always returns 0; setup failures are reported on stderr only.
 */
int main(int argc, char *argv[]) {
  int opt, *fd;
  size_t n;
  struct epoll_event ev;
  struct signalfd_siginfo info;

  cfg.prog = argv[0];
  cfg.pid = getpid();

  utarray_new(cfg.fds,&ut_int_icd);

  while ( (opt=getopt(argc,argv,"vp:a:h")) != -1) {
    switch(opt) {
      case 'v': cfg.verbose++; break;
      case 'p': cfg.port=atoi(optarg); break; 
      case 'a': cfg.addr=inet_addr(optarg); break; 
      case 'h': default: usage(); break;
    }
  }
  if (cfg.addr == INADDR_NONE) usage();
  /* a TCP port has to fit in 16 bits; htons() would silently truncate a
   * larger value. 0 also covers a missing or non-numeric -p argument. */
  if ((cfg.port <= 0) || (cfg.port > 65535)) usage();

  /* block all signals. we take signals synchronously via signalfd */
  sigset_t all;
  sigfillset(&all);
  sigprocmask(SIG_SETMASK,&all,NULL);

  /* a few signals we'll accept via our signalfd */
  sigset_t sw;
  sigemptyset(&sw);
  for(n=0; n < sizeof(sigs)/sizeof(*sigs); n++) sigaddset(&sw, sigs[n]);

  if (setup_listener()) goto done;

  /* create the signalfd for receiving signals */
  cfg.signal_fd = signalfd(-1, &sw, 0);
  if (cfg.signal_fd == -1) {
    fprintf(stderr,"signalfd: %s\n", strerror(errno));
    goto done;
  }

  /* set up the epoll instance */
  cfg.epoll_fd = epoll_create(1); 
  if (cfg.epoll_fd == -1) {
    fprintf(stderr,"epoll: %s\n", strerror(errno));
    goto done;
  }

  /* add descriptors of interest */
  if (add_epoll(EPOLLIN, cfg.fd))        goto done; // listening socket
  if (add_epoll(EPOLLIN, cfg.signal_fd)) goto done; // signal socket

  /*
   * This is our main loop. epoll for input or signals, one event per call.
   */
  alarm(1);
  for (;;) {
    int nready = epoll_wait(cfg.epoll_fd, &ev, 1, -1);

    /* Even with every signal blocked, epoll_wait() can fail with EINTR
     * after the process is stopped and later resumed with SIGCONT. That
     * is not an error, so wait again. */
    if ((nready == -1) && (errno == EINTR)) continue;
    if (nready <= 0) {
      fprintf(stderr, "epoll_wait: %s\n", strerror(errno));
      break;
    }

    /* if a signal was sent to us, read its signalfd_siginfo */
    if (ev.data.fd == cfg.signal_fd) { 
      if (read(cfg.signal_fd, &info, sizeof(info)) != sizeof(info)) {
        fprintf(stderr,"failed to read signal fd buffer\n");
        continue;
      }
      switch (info.ssi_signo) {
        case SIGALRM: 
          /* one tick per second; periodic work every tenth tick */
          if ((++cfg.ticks % 10) == 0) periodic(); 
          alarm(1); 
          continue;
        default:  /* exit */
          fprintf(stderr,"got signal %d\n", info.ssi_signo);  
          goto done;
      }
    }

    /* regular POLLIN. handle the particular descriptor that's ready */
    assert(ev.events & EPOLLIN);
    if (cfg.verbose) fprintf(stderr,"handle POLLIN on fd %d\n", ev.data.fd);
    if (ev.data.fd == cfg.fd) accept_client();
    else drain_client(ev.data.fd);

  }

 done:   /* we get here on a signal like Ctrl-C, or on any failure above */
  fd=NULL;
  while ( (fd=(int*)utarray_next(cfg.fds,fd))) {del_epoll(*fd); close(*fd);}
  utarray_free(cfg.fds);
  if (cfg.fd != -1) close(cfg.fd);
  if (cfg.epoll_fd != -1) close(cfg.epoll_fd);
  if (cfg.signal_fd != -1) close(cfg.signal_fd);
  return 0;
}

utarray头文件

/*
Copyright (c) 2008-2014, Troy D. Hanson   http://troydhanson.github.com/uthash/
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER
OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

/* a dynamic array implementation using macros 
 */
#ifndef UTARRAY_H
#define UTARRAY_H

#define UTARRAY_VERSION 1.9.9

#ifdef __GNUC__
#define _UNUSED_ __attribute__ ((__unused__)) 
#else
#define _UNUSED_ 
#endif

#include <stddef.h>  /* size_t */
#include <string.h>  /* memset, etc */
#include <stdlib.h>  /* exit */

/* out-of-memory policy: a failed allocation terminates the process */
#define oom() exit(-1)

/* per-element callbacks; each one may be NULL in a UT_icd */
typedef void (ctor_f)(void *dst, const void *src);
typedef void (dtor_f)(void *elt);
typedef void (init_f)(void *elt);
typedef struct {
    size_t sz;      /* size of one element in bytes                      */
    init_f *init;   /* initializes a new empty slot (NULL: zero-fill)    */
    ctor_f *copy;   /* copies an element into a slot (NULL: memcpy)      */
    dtor_f *dtor;   /* releases an element (NULL: nothing to release)    */
} UT_icd;

typedef struct {
    unsigned i,n;/* i: index of next available slot, n: num slots */
    UT_icd icd;  /* initializer, copy and destructor functions */
    char *d;     /* n slots of size icd->sz*/
} UT_array;

/* utarray_init(a,_icd): zero a caller-provided UT_array and copy *_icd in */
#define utarray_init(a,_icd) do {                                             \
  memset(a,0,sizeof(UT_array));                                               \
  (a)->icd=*(_icd);                                                           \
} while(0)

/* utarray_done(a): destroy all elements and free the storage of an array
 * set up with utarray_init. The fields are reset so that no dangling
 * storage pointer is left behind. */
#define utarray_done(a) do {                                                  \
  if ((a)->n) {                                                               \
    if ((a)->icd.dtor) {                                                      \
      size_t _ut_i;                                                           \
      for(_ut_i=0; _ut_i < (a)->i; _ut_i++) {                                 \
        (a)->icd.dtor(utarray_eltptr(a,_ut_i));                               \
      }                                                                       \
    }                                                                         \
    free((a)->d);                                                             \
  }                                                                           \
  (a)->d=NULL;                                                                \
  (a)->i=0;                                                                   \
  (a)->n=0;                                                                   \
} while(0)

/* utarray_new(a,_icd): heap-allocate and initialize an array; `a` receives
 * the pointer. Allocation failure goes through oom(). */
#define utarray_new(a,_icd) do {                                              \
  (a)=(UT_array*)malloc(sizeof(UT_array));                                    \
  if ((a) == NULL) oom();                                                     \
  utarray_init(a,_icd);                                                       \
} while(0)

/* utarray_free(a): counterpart of utarray_new */
#define utarray_free(a) do {                                                  \
  utarray_done(a);                                                            \
  free(a);                                                                    \
} while(0)

/* utarray_reserve(a,by): make room for `by` more elements. Capacity starts
 * at 8 slots and doubles until it is large enough. */
#define utarray_reserve(a,by) do {                                            \
  if (((a)->i+(by)) > ((a)->n)) {                                             \
    char *_ut_tmp;                                                            \
    while(((a)->i+(by)) > ((a)->n)) { (a)->n = ((a)->n ? (2*(a)->n) : 8); }   \
    _ut_tmp=(char*)realloc((a)->d, (a)->n*(a)->icd.sz);                       \
    if (_ut_tmp == NULL) oom();                                               \
    (a)->d=_ut_tmp;                                                           \
  }                                                                           \
} while(0)

/* utarray_push_back(a,p): append a copy of the element that p points to */
#define utarray_push_back(a,p) do {                                           \
  utarray_reserve(a,1);                                                       \
  if ((a)->icd.copy) { (a)->icd.copy( _utarray_eltptr(a,(a)->i++), p); }      \
  else { memcpy(_utarray_eltptr(a,(a)->i++), p, (a)->icd.sz); };              \
} while(0)

/* utarray_pop_back(a): drop the last element; the array must not be empty */
#define utarray_pop_back(a) do {                                              \
  if ((a)->icd.dtor) { (a)->icd.dtor( _utarray_eltptr(a,--((a)->i))); }       \
  else { (a)->i--; }                                                          \
} while(0)

/* utarray_extend_back(a): append one initialized (or zero-filled) element */
#define utarray_extend_back(a) do {                                           \
  utarray_reserve(a,1);                                                       \
  if ((a)->icd.init) { (a)->icd.init(_utarray_eltptr(a,(a)->i)); }            \
  else { memset(_utarray_eltptr(a,(a)->i),0,(a)->icd.sz); }                   \
  (a)->i++;                                                                   \
} while(0)

/* utarray_len(a): number of elements currently stored */
#define utarray_len(a) ((a)->i)

/* utarray_eltptr(a,j): pointer to element j, or NULL when j is out of
 * range. _utarray_eltptr is the unchecked form used internally for slots
 * at or beyond the current length. */
#define utarray_eltptr(a,j) (((j) < (a)->i) ? _utarray_eltptr(a,j) : NULL)
#define _utarray_eltptr(a,j) ((char*)((a)->d + ((a)->icd.sz*(j) )))

/* utarray_insert(a,p,j): insert a copy of *p at index j, shifting later
 * elements up. If j is past the end, the gap is filled by utarray_resize. */
#define utarray_insert(a,p,j) do {                                            \
  if ((j) > (a)->i) utarray_resize(a,j);                                      \
  utarray_reserve(a,1);                                                       \
  if ((j) < (a)->i) {                                                         \
    memmove( _utarray_eltptr(a,(j)+1), _utarray_eltptr(a,j),                  \
             ((a)->i - (j))*((a)->icd.sz));                                   \
  }                                                                           \
  if ((a)->icd.copy) { (a)->icd.copy( _utarray_eltptr(a,j), p); }             \
  else { memcpy(_utarray_eltptr(a,j), p, (a)->icd.sz); };                     \
  (a)->i++;                                                                   \
} while(0)

/* utarray_inserta(a,w,j): insert copies of all elements of array w into a
 * at index j. Does nothing when w is empty. */
#define utarray_inserta(a,w,j) do {                                           \
  if (utarray_len(w) == 0) break;                                             \
  if ((j) > (a)->i) utarray_resize(a,j);                                      \
  utarray_reserve(a,utarray_len(w));                                          \
  if ((j) < (a)->i) {                                                         \
    memmove(_utarray_eltptr(a,(j)+utarray_len(w)),                            \
            _utarray_eltptr(a,j),                                             \
            ((a)->i - (j))*((a)->icd.sz));                                    \
  }                                                                           \
  if ((a)->icd.copy) {                                                        \
    size_t _ut_i;                                                             \
    for(_ut_i=0;_ut_i<(w)->i;_ut_i++) {                                       \
      (a)->icd.copy(_utarray_eltptr(a,(j)+_ut_i), _utarray_eltptr(w,_ut_i));  \
    }                                                                         \
  } else {                                                                    \
    memcpy(_utarray_eltptr(a,j), _utarray_eltptr(w,0),                        \
           utarray_len(w)*((a)->icd.sz));                                     \
  }                                                                           \
  (a)->i += utarray_len(w);                                                   \
} while(0)

/* utarray_resize(dst,num): set the length to num. Shrinking destroys the
 * surplus elements; growing initializes (or zero-fills) the new ones. The
 * new slots lie beyond the current length, so they are addressed with the
 * unchecked _utarray_eltptr: the checked form yields NULL for them. */
#define utarray_resize(dst,num) do {                                          \
  size_t _ut_i;                                                               \
  if ((dst)->i > (size_t)(num)) {                                             \
    if ((dst)->icd.dtor) {                                                    \
      for(_ut_i=(num); _ut_i < (dst)->i; _ut_i++) {                           \
        (dst)->icd.dtor(_utarray_eltptr(dst,_ut_i));                          \
      }                                                                       \
    }                                                                         \
  } else if ((dst)->i < (size_t)(num)) {                                      \
    utarray_reserve(dst,(num)-(dst)->i);                                      \
    if ((dst)->icd.init) {                                                    \
      for(_ut_i=(dst)->i; _ut_i < (size_t)(num); _ut_i++) {                   \
        (dst)->icd.init(_utarray_eltptr(dst,_ut_i));                          \
      }                                                                       \
    } else {                                                                  \
      memset(_utarray_eltptr(dst,(dst)->i),0,                                 \
             (dst)->icd.sz*((num)-(dst)->i));                                 \
    }                                                                         \
  }                                                                           \
  (dst)->i = (num);                                                           \
} while(0)

/* utarray_concat(dst,src): append copies of all elements of src to dst */
#define utarray_concat(dst,src) do {                                          \
  utarray_inserta((dst),(src),utarray_len(dst));                              \
} while(0)

/* utarray_erase(a,pos,len): remove len elements starting at index pos and
 * close the gap */
#define utarray_erase(a,pos,len) do {                                         \
  if ((a)->icd.dtor) {                                                        \
    size_t _ut_i;                                                             \
    for(_ut_i=0; _ut_i < (len); _ut_i++) {                                    \
      (a)->icd.dtor(utarray_eltptr((a),(pos)+_ut_i));                         \
    }                                                                         \
  }                                                                           \
  if ((a)->i > ((pos)+(len))) {                                               \
    memmove( _utarray_eltptr((a),(pos)), _utarray_eltptr((a),(pos)+(len)),    \
            (((a)->i)-((pos)+(len)))*((a)->icd.sz));                          \
  }                                                                           \
  (a)->i -= (len);                                                            \
} while(0)

/* utarray_renew(a,u): clear an existing array, or create one if a is NULL */
#define utarray_renew(a,u) do {                                               \
  if (a) utarray_clear(a); \
  else utarray_new((a),(u));   \
} while(0) 

/* utarray_clear(a): destroy all elements but keep the allocated storage */
#define utarray_clear(a) do {                                                 \
  if ((a)->i > 0) {                                                           \
    if ((a)->icd.dtor) {                                                      \
      size_t _ut_i;                                                           \
      for(_ut_i=0; _ut_i < (a)->i; _ut_i++) {                                 \
        (a)->icd.dtor(utarray_eltptr(a,_ut_i));                               \
      }                                                                       \
    }                                                                         \
    (a)->i = 0;                                                               \
  }                                                                           \
} while(0)

/* utarray_sort(a,cmp): qsort the elements in place with comparator cmp */
#define utarray_sort(a,cmp) do {                                              \
  qsort((a)->d, (a)->i, (a)->icd.sz, cmp);                                    \
} while(0)

/* utarray_find(a,v,cmp): bsearch for *v; the array must already be sorted
 * consistently with cmp */
#define utarray_find(a,v,cmp) bsearch((v),(a)->d,(a)->i,(a)->icd.sz,cmp)

/* iteration helpers: front/back yield NULL on an empty array; next/prev
 * take the current element pointer (NULL to start) and yield NULL when the
 * end is reached; eltidx converts an element pointer back to its index */
#define utarray_front(a) (((a)->i) ? (_utarray_eltptr(a,0)) : NULL)
#define utarray_next(a,e) (((e)==NULL) ? utarray_front(a) : ((((a)->i) > (utarray_eltidx(a,e)+1)) ? _utarray_eltptr(a,utarray_eltidx(a,e)+1) : NULL))
#define utarray_prev(a,e) (((e)==NULL) ? utarray_back(a) : ((utarray_eltidx(a,e) > 0) ? _utarray_eltptr(a,utarray_eltidx(a,e)-1) : NULL))
#define utarray_back(a) (((a)->i) ? (_utarray_eltptr(a,(a)->i-1)) : NULL)
#define utarray_eltidx(a,e) (((char*)(e) >= (char*)((a)->d)) ? (((char*)(e) - (char*)((a)->d))/(size_t)(a)->icd.sz) : -1)

/* last we pre-define a few icd for common utarrays of ints and strings */

/* Copy callback for string arrays: stores a private heap copy of the source
 * string, or NULL if the source is NULL or the allocation fails. The copy
 * is made with malloc/memcpy because <string.h> does not declare strdup in
 * strict ISO C99/C11 mode. */
static void utarray_str_cpy(void *dst, const void *src) {
  char **_src = (char**)src, **_dst = (char**)dst;
  size_t _len;
  if (*_src == NULL) { *_dst = NULL; return; }
  _len = strlen(*_src) + 1;
  *_dst = (char*)malloc(_len);
  if (*_dst != NULL) memcpy(*_dst, *_src, _len);
}
/* Destructor callback for string arrays: frees the stored copy */
static void utarray_str_dtor(void *elt) {
  char **eltc = (char**)elt;
  if (*eltc) free(*eltc);
}
static const UT_icd ut_str_icd _UNUSED_ = {sizeof(char*),NULL,utarray_str_cpy,utarray_str_dtor};
static const UT_icd ut_int_icd _UNUSED_ = {sizeof(int),NULL,NULL,NULL};
static const UT_icd ut_ptr_icd _UNUSED_ = {sizeof(void*),NULL,NULL,NULL};


#endif /* UTARRAY_H */

源码来源:https://github.com/troydhanson/network/tree/master/tcp/server


评论已关闭