pid=99230581
本文主要基于源码介绍 select / poll / epoll 技术(实际上并没有 poll )
回顾socket编程 推荐阅读[1] 服务端 socket bind listen accept | read/write 客户端 socket connect | read/write 客户端 close(一般是客户端先关闭) 服务端 close(read return 0)
listen 时创建半连接、全连接队列,半连接队列对应SYN_RCVD状态的连接,全连接队列对应 ESTABLISHED 状态的连接,accept
从全连接队列消费
半连接队列满的场景:SYN Flood 攻击,[2]中做实验模拟了此场景
全连接队列满的场景:accept
的消费速度跟不上第三次握手收到后新连接加入全连接队列的速度
TODO
为什么一般不多线程读写单个 socket,怎么理解 socket套接字在多线程发送数据时要加锁吗? - 陈硕的回答 - 知乎https://www.zhihu.com/question/56899596/answer/150926723 https://blog.csdn.net/weixin_34357887/article/details/93720482
参考文章(回顾socket编程)
Linux Socket编程(不限Linux)
从一次 Connection Reset 说起,TCP 半连接队列与全连接队列
select select基本使用 select 的使用例子可以参考[5],为避免每次都打开网页,摘录一部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 fd_set rset; FD_ZERO(&rset); FD_SET(0 , &rset); FD_SET(4 , &rset); FD_SET(5 , &rset); if (select(5 +1 , &rset, NULL , NULL , NULL ) > 0 ) { if (FD_ISSET(0 , &rset)) if (FD_ISSET(4 , &rset)) if (FD_ISSET(5 , &rset)) }
源码阅读 select主要逻辑 2.6.39 内核源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 SYSCALL_DEFINE5(select, int , n, fd_set __user *, inp, fd_set __user *, outp, fd_set __user *, exp , struct timeval __user *, tvp) struct timespec end_time , *to = NULL ; ret = core_sys_select(n, inp, outp, exp , to); long stack_fds[SELECT_STACK_ALLOC/sizeof (long )]; bits = stack_fds; fd_set_bits fds; fds.in = bits; fds.out = bits + size; fds.ex = bits + 2 *size; fds.res_in = bits + 3 *size; fds.res_out = bits + 4 *size; fds.res_ex = bits + 5 *size; if ((ret = get_fd_set(n, inp, fds.in)) || (ret = get_fd_set(n, outp, fds.out)) || (ret = get_fd_set(n, exp , fds.ex))) nr = FDS_BYTES(nr); if (ufdset) return copy_from_user(fdset, ufdset, nr) ? -EFAULT : 0 ; goto out; zero_fd_set(n, fds.res_in); zero_fd_set(n, fds.res_out); zero_fd_set(n, fds.res_ex); ret = do_select(n, &fds, end_time); struct poll_wqueues table ; retval = max_select_fd(n, fds); n = retval; poll_initwait(&table); init_poll_funcptr(&pwq->pt, __pollwait); pt->qproc = qproc; pt->key = ~0UL ; pwq->polling_task = current; wait = &table.pt; retval = 0 ; for (;;) unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp ; inp = fds->in; outp = fds->out; exp = fds->ex; rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex; for (i = 0 ; i < n; ++rinp, ++routp, ++rexp) in = *inp++; out = *outp++; ex = *exp ++; all_bits = in | out | ex; if (all_bits == 0 ) { i += __NFDBITS; continue ; } for (j = 0 ; j < __NFDBITS; ++j, ++i, bit <<= 1 ) int fput_needed; if (i >= n) break ; if (!(bit & all_bits)) continue ; file = fget_light(i, &fput_needed); f_op = file->f_op; wait_key_set(wait, in, out, bit); mask = (*f_op->poll)(file, wait); fput_light(file, fput_needed); if ((mask & POLLIN_SET) && (in & bit)) res_in |= bit; retval++; wait = NULL ; cond_resched() 
wait = NULL ; if (retval || timed_out || signal_pending(current)) break ; poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack) set_current_state(state); rc = schedule_hrtimeout_range(expires, slack, HRTIMER_MODE_ABS); __set_current_state(TASK_RUNNING); poll_freewait(&table); if (set_fd_set(n, inp, fds.res_in) || set_fd_set(n, outp, fds.res_out) || set_fd_set(n, exp , fds.res_ex)) ret = -EFAULT; return ret; ret = poll_select_copy_remaining(&end_time, tvp, 1 , ret);
注意:select中没有清理bitmap的操作,返回的bitmap再次传入前要 FD_ZERO 一下
file侧逻辑 以网络编程常见的socket场景为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 struct socket { const struct proto_ops *ops ; }const struct proto_ops inet_dgram_ops = { .bind = inet_bind, .connect = inet_dgram_connect, .socketpair = sock_no_socketpair, .accept = sock_no_accept, .poll = udp_poll, .ioctl = inet_ioctl, .sendmsg = inet_sendmsg, .recvmsg = inet_recvmsg, }static const struct file_operations socket_file_ops = { .poll = sock_poll, } do_select struct poll_wqueues table; poll_initwait(&table); init_poll_funcptr(&pwq->pt, __pollwait); pt->qproc = qproc; wait = &table.pt; for ;; for fd mask = (*f_op->poll)(file, wait); udp_poll poll_wait (filp, wait_address, p) ; p->qproc(filp, wait_address, p); __pollwait if (p && wait_address) struct poll_table_entry *entry = poll_get_entry(pwq); init_waitqueue_func_entry(&entry->wait, pollwake); add_wait_queue(wait_address, &entry->wait); poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack)) udp_rcv sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable); udp_queue_rcv_skb(sk, skb); ip_queue_rcv_skb sock_queue_rcv_skb sk->sk_data_ready(sk, skb_len);
select callback分析 从do_select出发一路追踪callback的执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 do_select struct poll_wqueues table ; poll_table *wait; poll_initwait(&table); wait = &table.pt; mask = (*f_op->poll)(file, wait); sock_poll struct socket *sock ; sock = file->private_data; return sock->ops->poll(file, sock, wait); udp_poll(struct file *file, struct socket *sock, poll_table *wait) return datagram_poll(struct file *file, struct socket *sock, poll_table *wait) struct sock *sk = sock->sk; sock_poll_wait(file, sk_sleep(sk), wait); sk_sleep return &rcu_dereference_raw (sk->sk_wq) ->wait; poll_wait(filp, wait_address, p); if (p && wait_address) p->qproc(filp, wait_address, p); __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p) struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt); struct poll_table_entry *entry = poll_get_entry(pwq); struct poll_table_page *table = p->table; if (p->inline_index < N_INLINE_POLL_ENTRIES) return p->inline_entries + p->inline_index++; get_file(filp); entry->filp = filp; entry->wait_address = wait_address; entry->key = p->key; init_waitqueue_func_entry(&entry->wait, pollwake); q->func = func; pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key) struct poll_table_entry entry = container_of(wait, struct poll_table_entry, wait); __pollwake(wait, mode, sync, key); struct poll_wqueues *pwq = wait->private; DECLARE_WAITQUEUE(dummy_wait, pwq->polling_task); default_wake_function(&dummy_wait, mode, sync, key); try_to_wake_up(curr->private, mode, wake_flags); entry->wait.private = pwq; add_wait_queue(wait_address, &entry->wait); __add_wait_queue(wq_head, wq_entry); list_add(&new->task_list, &head->task_list); mask = 0 ; if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue)) mask |= 
POLLERR; if (sk->sk_shutdown & RCV_SHUTDOWN) mask |= POLLRDHUP | POLLIN | POLLRDNORM; if (sk->sk_shutdown == SHUTDOWN_MASK) mask |= POLLHUP; if (!skb_queue_empty(&sk->sk_receive_queue)) mask |= POLLIN | POLLRDNORM; if (sock_writeable(sk)) mask |= POLLOUT | POLLWRNORM | POLLWRBAND; return mask;
select callback调用时机 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 sock_init_data sk->sk_state_change = sock_def_wakeup; wake_up_interruptible_all(&wq->wait); sk->sk_data_ready = sock_def_readable; wq = rcu_dereference(sk->sk_wq); wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI | POLLRDNORM | POLLRDBAND); __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, void *key) __wake_up_common(q, mode, nr_exclusive, wake_flags, key); list_for_each_entry_safe(curr, next, &q->task_list, task_list) curr->func(curr, mode, wake_flags, key) sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN); sock_wake_async(sk->sk_socket, how, band); ip_local_deliver ip_local_deliver_finish udp_rcv __udp4_lib_rcv sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable); udp_queue_rcv_skb(sk, skb); __udp_queue_rcv_skb ip_queue_rcv_skb sock_queue_rcv_skb sk->sk_data_ready(sk, skb_len); typedef struct __wait_queue_head wait_queue_head_t ;struct __wait_queue_head { spinlock_t lock; struct list_head task_list ; };typedef struct __wait_queue wait_queue_t ;struct __wait_queue { unsigned int flags; void *private; wait_queue_func_t func; struct list_head task_list ; };
如何实现一个支持select的设备 参考了[3],其中还有代码实现,实现起来并不止下面这些,但下面的是核心
定义一个等待队列头 wait_queue_head_t
,用于收留等待队列任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 struct socket { struct sock *sk ; }struct sock { struct socket_wq __rcu *sk_wq ; }struct socket_wq { wait_queue_head_t wait; } ____cacheline_aligned_in_smp;typedef struct __wait_queue_head wait_queue_head_t ;struct __wait_queue_head { spinlock_t lock; struct list_head task_list ; };
实现 fd 的接口 file_operations
(中的一部分?),可以确定的是 poll 函数是一定需要实现的,通常命名为 xxx_poll()
xxx_poll() 函数中
需要对 poll_wait
进行调用,将 select 传来的 poll_table* wait
与 第一步中提到的 wait_queue_head_t
传入
该函数的返回值 mask 需要返回当前 fd 可读状态之类的信息,比如 EPOLLIN / EPOLLOUT / EPOLLERR 等,这个返回值在 do_select()
函数中会去判断处理
条件满足的时候(比如有数据可读了),通过 wake_up_interruptible
系列API唤醒任务,传入第一步中提到的 wait_queue_head_t
Select缺陷 经典八股
fd 两次拷贝
FD_SETSIZE=1024,而文档中提到当 fd 值大于等于 1024(即 FD_SETSIZE)时,FD_SET 系列宏的行为是 undefined 的
https://pubs.opengroup.org/onlinepubs/7908799/xsh/select.html The behaviour of these macros is undefined if the fd argument is less than 0 or greater than or equal to FD_SETSIZE, or if any of the arguments are expressions with side effects.
https://elixir.bootlin.com/linux/latest/source/include/uapi/linux/posix_types.h#L27
1 2 3 4 5 #define __FD_SETSIZE 1024 typedef struct { unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof (long ))]; } __kernel_fd_set;typedef __kernel_fd_set fd_set;
通常直接在栈上申请 fd_set,按照上面的类型定义,会有1024的问题,而下方链接的博主尝试在堆上分配时可以超过1024的https://blog.csdn.net/dog250/article/details/105896693
只知道有事情发生了,仍然需要O(n)遍历,使用 FD_ISSET 去检查是否是这个 FD 有事件
Select是ET还是LT? 根据源码分析中socket实现的poll可知,poll返回的是接收队列是否为空,所以至少对于select常见的socket编程场景下是LT的
总结 其实看上面怎么自己实现一个支持select的设备就够了,select本质就是对监听的每个fd尝试poll,若无消息(暂不可读)则向其注册一个wake函数,其中写了当前进程,若全部file无消息则调用进程睡眠。file内部需要在有变化时主动调用 wake_up_interruptible
之类的函数,并将注册有 wake函数的容器传入,实现解除select的阻塞
参考文章(select)
如果这篇文章说不清epoll的本质,那就过来掐死我吧! (2)
图解Linux网络包接收过程
【原创】Linux select/poll机制原理分析
TODO 没看完 但是强推 Linux网络 - 数据包的接收过程 TODO 也强推[4]中最后引用的几篇英文文章
Linux网络编程——I/O复用之select详解
TODO 如果多个进程阻塞使用相同的fdset去调用select?
poll TODO 可以简单介绍一下
epoll epoll基本使用 可运行的例子见[7],下面摘录网上的另一个例子,出处找不到了…… 此外例[7]处理了关闭连接,也值得借鉴 TODO 上面的例子只有read,听说写socket也可以加入epoll,有空看下 https://github.com/abbshr/C10K-web-server/blob/master/event-driven-model/epoll-server.c
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 struct epoll_event ev , event [MAX_EVENT ]; listenfd = socket(AF_INET, SOCK_STREAM, 0 ); result = setsockopt( listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof (on) ); result = bind(listenfd, (const struct sockaddr *)&server_addr, sizeof (server_addr)); result = make_socket_non_blocking(listenfd); result = listen(listenfd, 200 ); epfd = epoll_create1(0 ); ev.data.fd = listenfd; ev.events = EPOLLIN | EPOLLET ; result = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);int wait_count;for (;;) wait_count = epoll_wait(epfd, event, MAX_EVENT, -1 ); for (int i = 0 ; i < wait_count; i++) uint32_t events = event[i].events; if ( events & EPOLLERR || events & EPOLLHUP || (!(events & EPOLLIN))) else if (listenfd == event[i].data.fd) for (;;) int accp_fd = accept(listenfd, &in_addr, &in_addr_len); __result = getnameinfo(&in_addr, sizeof (in_addr), host_buf, sizeof (host_buf) / sizeof (host_buf[0 ]), port_buf, sizeof (port_buf) / sizeof (port_buf[0 ]), NI_NUMERICHOST | NI_NUMERICSERV); printf ("New connection: host = %s, port = %s\n" , host_buf, port_buf); __result = make_socket_non_blocking(accp_fd); ev.data.fd = accp_fd; ev.events = EPOLLIN | EPOLLET; __result = epoll_ctl(epfd, EPOLL_CTL_ADD, accp_fd, &ev); else int done = 0 ; for (;;) result_len = read(event[i].data.fd, buf, sizeof (buf) / sizeof (buf[0 ])); if (-1 == result_len) if (EAGAIN != errno) perror ("Read data" ); done = 1 ; break ; else if (! result_len) done = 1 ; break ; write(STDOUT_FILENO, buf, result_len); if (done) printf ("Closed connection\n" ); close (event[i].data.fd); close (epfd);return 0 ;
epoll源码阅读
结构定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 struct eventpoll { wait_queue_head_t wq; wait_queue_head_t poll_wait; struct list_head rdllist ; struct rb_root rbr ; struct user_struct *user ; }struct rb_root { struct rb_node *rb_node ; };struct epitem { struct rb_node rbn ; struct list_head rdllink ; struct epoll_filefd ffd ; struct file *file ; int fd; int nwait; struct list_head pwqlist ; struct eventpoll *ep ; struct epoll_event event ; }struct epoll_event { __u32 events; __u64 data; } EPOLL_PACKED;struct epoll_event { uint32_t events; epoll_data_t data; };
初始化 第一次关注 fs_initcall,介绍可以参考 https://cloud.tencent.com/developer/article/1554770
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 fs_initcall(eventpoll_init); struct sysinfo si ; unsigned long totalhigh; unsigned long totalram; unsigned long freeram; unsigned int mem_unit; si_meminfo(&si); max_user_watches = (((si.totalram - si.totalhigh) / 25 ) << PAGE_SHIFT) / EP_ITEM_COST;
epoll_create 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 SYSCALL_DEFINE1(epoll_create, int , size) SYSCALL_DEFINE1(epoll_create1, int , flags) struct eventpoll *ep = NULL ; error = ep_alloc(&ep); user = get_current_user(); ep = kzalloc(sizeof (*ep), GFP_KERNEL); init_waitqueue_head(&ep->wq); init_waitqueue_head(&ep->poll_wait); INIT_LIST_HEAD(&ep->rdllist); ep->rbr = RB_ROOT; ep->ovflist = EP_UNACTIVE_PTR; ep->user = user; error = anon_inode_getfd("[eventpoll]" , &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC)); return error; static const struct file_operations eventpoll_fops = { .release = ep_eventpoll_release, .poll = ep_eventpoll_poll, .llseek = noop_llseek, };
epoll_ctl 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 SYSCALL_DEFINE4(epoll_ctl, int , epfd, int , op, int , fd, struct epoll_event __user *, event) struct epoll_event epds ; copy_from_user(&epds, event, sizeof (struct epoll_event)) file = fget(epfd); tfile = fget(fd); if (file == tfile || !is_file_epoll(file)) goto error_tgt_fput; ep = file->private_data; mutex_lock(&ep->mtx); epi = ep_find(ep, tfile, fd); struct rb_node *rbp ; ep_set_ffd(&ffd, file, fd); ffd->file = file; ffd->fd = fd; for (rbp = ep->rbr.rb_node; rbp; ) epi = rb_entry(rbp, struct epitem, rbn); kcmp = ep_cmp_ffd(&ffd, &epi->ffd); if (kcmp > 0 ) rbp = rbp->rb_right; else if (kcmp < 0 ) rbp = rbp->rb_left; else epir = epi; break ; switch (op) case EPOLL_CTL_ADD: if (!epi) epds.events |= POLLERR | POLLHUP; ep_insert(ep, &epds, tfile, fd); case EPOLL_CTL_DEL: if (epi) error = ep_remove(ep, epi); case EPOLL_CTL_MOD: if (epi) epds.events |= POLLERR | POLLHUP; error = ep_modify(ep, epi, &epds); mutex_unlock(&ep->mtx);
追踪一下 ep_insert
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd) struct epitem *epi ; struct ep_pqueue epq ; poll_table pt; typedef struct poll_table_struct { poll_queue_proc _qproc; __poll_t _key; } poll_table; struct epitem *epi ; user_watches = atomic_long_read(&ep->user->epoll_watches); if (unlikely(user_watches >= max_user_watches)) return -ENOSPC; epi = kmem_cache_alloc(epi_cache, GFP_KERNEL) INIT_LIST_HEAD(&epi->rdllink); INIT_LIST_HEAD(&epi->fllink); INIT_LIST_HEAD(&epi->pwqlist); epi->ep = ep; ep_set_ffd(&epi->ffd, tfile, fd); epi->event = *event; epi->nwait = 0 ; epi->next = EP_UNACTIVE_PTR; epq.epi = epi; init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); pt->qproc = qproc; revents = tfile->f_op->poll(tfile, &epq.pt); udp_poll datagram_poll sock_poll_wait(file, sk_sleep(sk), wait); poll_wait(filp, wait_address, p); if (p && wait_address) p->qproc(filp, wait_address, p); ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt) struct epitem *epi = ep_item_from_epqueue(pt); return container_of(p, struct ep_pqueue, pt)->epi; struct eppoll_entry *pwq ; if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); q->func = func; pwq->whead = whead; pwq->base = epi; add_wait_queue(whead, &pwq->wait); list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++; ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) struct epitem *epi = ep_item_from_wait(wait); container_of(p, struct 
eppoll_entry, wait)->base; struct eventpoll *ep = epi->ep; if (key && !((unsigned long ) key & epi->event.events)) goto out_unlock; if (!ep_is_linked(&epi->rdllink)) list_add_tail(&epi->rdllink, &ep->rdllist); __list_add(new, head->prev, head); if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); __wake_up_locked(wait_queue_head_t *q, unsigned int mode) __wake_up_common(q, mode, 1 , 0 , NULL ); list_for_each_entry_safe(curr, next, &q->task_list, task_list) curr->func(curr, mode, wake_flags, key) default_wake_function try_to_wake_up if (waitqueue_active(&ep->poll_wait)) pwake++; if (pwake) ep_poll_safewake(&ep->poll_wait); return mask list_add_tail(&epi->fllink, &tfile->f_ep_links); ep_rbtree_insert(ep, epi); if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) list_add_tail(&epi->rdllink, &ep->rdllist); if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq); if (waitqueue_active(&ep->poll_wait)) pwake++; atomic_long_inc(&ep->user->epoll_watches); if (pwake) ep_poll_safewake(&ep->poll_wait); ep_call_nested(&poll_safewake_ncalls, EP_MAX_NESTS, ep_poll_wakeup_proc, NULL , wq, (void *) (long ) this_cpu); ep_poll_wakeup_proc(void *priv, void *cookie, int call_nests) ep_wake_up_nested((wait_queue_head_t *) cookie, POLLIN, 1 + call_nests); __wake_up __wake_up_common(q, mode, nr_exclusive, 0 , key); list_for_each_entry_safe(curr, next, &q->task_list, task_list) curr->func(curr, mode, wake_flags, key) return 0 ;
1 2 3 4 5 6 7 8 9 void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, void *key)
epoll_wait 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 SYSCALL_DEFINE4(epoll_wait, int , epfd, struct epoll_event __user *, events, int , maxevents, int , timeout) access_ok(VERIFY_WRITE, events, maxevents * sizeof (struct epoll_event)) file = fget(epfd); if (!is_file_epoll(file)) error = ep_poll(ep, events, maxevents, timeout); return errorstatic int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout) int res = 0 , eavail, timed_out = 0 ; wait_queue_t wait; if (timeout > 0 ) else if (timeout == 0 ) fetch_events: if (!ep_events_available(ep)) init_waitqueue_entry(&wait, current); q->flags = 0 ; q->private = p; q->func = default_wake_function; __add_wait_queue_exclusive(&ep->wq, &wait); wait->flags |= WQ_FLAG_EXCLUSIVE; __add_wait_queue(q, wait); list_add(&new->task_list, &head->task_list); for (;;) set_current_state(TASK_INTERRUPTIBLE); if (ep_events_available(ep) || timed_out) break ; if (signal_pending(current)) res = -EINTR; break ; schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS) schedule_hrtimeout_range_clock if (!expires) schedule () ; __set_current_state(TASK_RUNNING); return -EINTR; __remove_wait_queue(&ep->wq, &wait); set_current_state(TASK_RUNNING); eavail = ep_events_available(ep); if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && !timed_out) struct ep_send_events_data esed ; esed.maxevents = maxevents; esed.events = events; return ep_scan_ready_list(ep, ep_send_events_proc, &esed); LIST_HEAD(txlist); list_splice_init(&ep->rdllist, &txlist); error = (*sproc)(ep, &txlist, priv) ep_send_events_proc(ep, txlist, esed) struct ep_send_events_data *esed = priv; for (eventcnt = 0 , uevent = esed->events; !list_empty(head) && eventcnt < esed->maxevents;) epi 
= list_first_entry(head, struct epitem, rdllink); list_del_init(&epi->rdllink); revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL ) & epi->event.events; if (revents) __put_user(revents, &uevent->events) __put_user(epi->event.data, &uevent->data) eventcnt++; uevent++; if (!(epi->event.events & EPOLLET)) list_add_tail(&epi->rdllink, &ep->rdllist); return eventcnt; return error; goto fetch_events; return res;
总结(epoll) TODO 看一下[2]的源码说明
与 select 类似的是都用到了 Linux 的等待队列机制,有代码复用,所以读起来轻松不少
流程分析 TODO pollwake 的深入分析请见 http://gityuan.com/2018/12/02/linux-wait-queue/
context 为 poll 时:向 socket 的等待队列添加包装后的函数指针 init_waitqueue_func_entry(&entry->wait, pollwake); init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
context为 sk_data_ready -> wake_up_interruptible_sync_poll -> __wake_up_sync_key ->__wake_up_common -> list_for_each_entry_safe -> curr->func 遍历 file 的等待队列,调用上面包装后的函数指针,curr->func(curr …),注意等待队列中的元素curr自身也被传入了,这个curr中是包含了select/epoll_wait调用时记录的当前进程的,因此可以知道要唤醒谁 (epoll 的不完全正确,但可以这么理解)
epoll_ctl分析 从 EPOLL_CTL_ADD 开始分析:
epi 对应一个加入的 fd,会加入红黑树(与 ready list),其中包含一个 qproc,随后调用 file 的 poll 时,file 的 poll 应当会调用poll_wait,并将poll_table传入(通过container_of可以找到包裹poll_table的epi),qproc 调用时,会申请一个 eppoll_entry 对象,它的主要作用是包装callback ep_poll_callback,加入file的等待队列(ctl_add时加入,ctl_del时删除)。file在状态变化时应主动调用callback,这个callback做三件事:
过滤用户不关心的状态变化,避免走到下面导致唤醒
将 epi 添加到 readylist(如果没有已经添加的话)
如果ep的等待队列不为空(有进程阻塞在当前 epfd 的 epoll_wait),遍历它们 (wait_queue_t),在其中能找到阻塞进程的 task_struct,唤醒它们
epoll_wait分析
如果 ep 的 ready list 为空,那么自己将要阻塞,申请一个 wait_queue_t 对象,填入当前进程的task_struct,添加到ep的等待队列,不断调用schedule让出CPU,每次被唤醒(或被调度)时检查ready list是否为空。
ready list不为空的状态下,遍历ready list调用poll ,过滤取得的 mask(与运算),将事件消息(哪个 fd 触发、是什么事件类型)拷贝回用户态传入的 buffer。如果是LT,有用户关心事件的fd会放回ready list,下次epoll_wait时会略过步骤1,重新poll,因此实现了LT。ET则不会放回ready list,需要file主动汇报状态触发install的callback才能重新使ready list非空,因此实现了ET 。
与select的比较
select 每次调用全量拷贝了用户感兴趣的fd到内核状态,而epoll是增量的
select 使用bitmap维护感兴趣的fd,O1插入删除、On遍历(这里不讨论用户态与内核态的拷贝),而epoll平衡了插入删除与遍历,均为红黑树复杂度
select 每次调用与唤醒时需要On poll一遍,epoll 在 epoll_ctl 需要 poll 一次,后续 epoll_wait 阻塞前不 poll,唤醒后 poll 一次,注意 epoll 只 poll 有事件的 fd
配一张图便于理解
ET与LT的区别 LT不会清理 ready list,下次调用 epoll_wait 时会重新对 file 进行 poll,若无关心的事件才会移出队列;而 ET 不会将 epi 保留在 ready list 中,意味着这是监听的 fd 下次主动汇报状态之前(以 socket 为例,等价于有新数据)仅有的一次通知机会。或者先CTL DEL后ADD也可以重新触发 poll
(我的理解)一个必须使用ET的场景:如果消息处理与 epoll_wait 的调用不在同一个线程,那么为了 epoll_wait 返回后再次调用时不立即再次返回(如果其他线程还未来得及从 buffer 中拿走数据),需要使用 ET
为什么有人说ET更快 我的理解,LT每次调用 epoll_wait 如果使用阻塞 fd 只能读一次来避免阻塞,但如果使用非阻塞fd倒是和ET没什么区别
[5] 在 eventloop 类型(包括各类 fiber / coroutine)的程序中, 处理逻辑和 epoll_wait 都在一个线程, ET相比LT没有太大的差别. 反而由于LT醒的更频繁, 可能时效性更好些. 在老式的多线程RPC实现中, 消息的读取分割和 epoll_wait 在同一个线程中运行, 类似上面的原因, ET和LT的区别不大.但在更高并发的 RPC 实现中, 为了对大消息的反序列化也可以并行, 消息的读取和分割可能运行和epoll_wait不同的线程中, 这时ET是必须的, 否则在读完数据前, epoll_wait会不停地无谓醒来.
不当的使用可能造成饥饿 Question:我看网上有人说ET需要配合robin list,这个和ET/LT没什么关系吧
[1] 简单来说就是对 epoll_wait 返回的 fd 不要一直读,如果这个 fd 有大量数据,就会导致其他 fd 的饥饿,正确做法是搞一个类似 round_robin 的机制去循环读所有fd
惊群问题 如果多线程阻塞在同一个fd上是会有惊群问题的,一种想法是,每个线程对应单独的一个epfd,其中一个线程专门负责accept,accept后的连接epoll_ctl到其他线程的epfd,可以避免此问题
[4]中提到的另一种做法是每个线程对应一个epfd,都监听listenfd,根据上文的源码分析,listenfd事件到来后是会发生惊群的。对于大量连接到来的情况,为避免全连接队列单线程通过accept消费的速度跟不上,使用多线程监听listenfd并消费是有理由的,但还是,要考虑惊群问题,Linux 4.5开始epoll_ctl支持 EPOLLEXCLUSIVE
选项解决此问题,摘录一部分(但是为什么是at least one……)
[9] If the same file descriptor is in multiple epoll instances, some with the EPOLLEXCLUSIVE flag, and others without, then events will be provided to all epoll instances that did not specify EPOLLEXCLUSIVE, and at least one of the epoll instances that did specify EPOLLEXCLUSIVE.
TODO 看到文档上说 EPOLLEXCLUSIVE 可以和 ET 选项一起用,epoll默认是LT,意思是LT也是可以一起用的吧? TODO EPOLLEXCLUSIVE详细分析 https://wenfh2020.com/2021/11/20/question-epollexclusive/ TODO 另一种方案是用 SO_REUSEPORT,这里不深入了
Question: accept是线程安全的吗? ref: Is accept() thread-safe? https://stackoverflow.com/a/5124450/8454039 省流:是的
为什么使用红黑树?
需要判断fd是否已经正在监听,需要较高的查询效率,select以空间换时间,
若fd不存在则需要插入,也需要考虑插入复杂度
红黑树兼顾了插入、查询效率与树的旋转次数 TODO 我记得旋转次数是它和AVL的区别,但是知乎上也有人放数据显示差不多…… AVL由于最平衡查询快,红黑树旋转少则插入删除快
如果读完一个ET返回的socket,但在调用epoll_wait前,来了新的数据,且调用epoll_wait后不再来新数据(假设必须要我们根据这条数据回复后对方才会发新数据),是否会有liveness问题? 不会的,没有人在 epoll_wait 只是影响 socket 回调不会调用 wakeup 通知阻塞方,而 ctl poll 时添加的 epi 还是在的,并不影响将epi添加到 ready list 的过程。因此即使中间有较短的时间不阻塞在 epoll_wait 但是来了数据,调用 epoll_wait 的时候由于 ready list 非空,还是会走到 poll,拷贝回用户空间,正常返回的,并不会hang住
为什么在边缘触发的epoll函数中要使用非阻塞fd? 对于 connfd LT的话是每次 epoll_wait 返回后只读一次,无论使用阻塞非阻塞都是一样的,因为只读一次,一定不阻塞 ET的话是每次 epoll_wait 返回后为了读完数据,需要循环读,阻塞fd会导致卡住,就算单独开了个线程去读,卡住也是不好的状态;而非阻塞connfd根据EAGAIN返回码即可判断是否读完
对于listenfd Nginx 用的是LT(其他read write用的是ET),socketfd 也是可选 block / nonblock 的[8],所以感觉ET其实也行?
LT与ET的epoll例子https://gist.github.com/liumuqi/04dc92629e322a4613a4610afe786818
TODO 关于非阻塞的想法,推荐扩展阅读高性能网络通信库中为何要将侦听 socket 设置成非阻塞的? one thread one loop 思想
拓展阅读 其实还没来得及读[知乎回答] Nginx为啥使用ET模式Epoll?
参考文章(epoll)
epoll边缘触发模式
[内核源码] epoll 实现原理
[内核源码] epoll lt / et 模式区别
[知乎回答] socket的任意event都会导致epoll_wait的惊群效应吗?
epoll的边沿触发模式(ET)真的比水平触发模式(LT)快吗?(当然LT模式也使用非阻塞IO,重点是要求ET模式下的代码不能造成饥饿)
如果这篇文章说不清epoll的本质,那就过来掐死我吧! (2)
epoll的使用实例
Is it possible (and safe) to make an accepting socket non-blocking?
epoll_ctl(2) — Linux manual page
Reactor 与 Proactor Reactor 篇幅较短,就不新写一篇了
单Reactor单线程,同一个线程既负责阻塞在epoll_wait上,也负责事件到来后的处理
1 2 3 4 5 6 7 8 9 10 while True : epoll_wait() for fd in fds: if fd == listenfd: acceptfd = accept(listenfd) epoll_ctl(epfd, ADD, acceptfd) else : data = read(fd) resp = handlerDict[fd].handleFunc(data) write(fd, resp)
单Reactor多线程:Reactor线程只处理连接事件和读写事件,业务处理交给线程池处理[2]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 while true: epoll_wait() for fd in fds: if fd == listenfd: acceptfd = accept(listenfd) epoll_ctl(epfd, ADD, acceptfd) else : data = read(fd) threadPool.submit(Task(fd, data))class Task : @Override def run (Runnable r ): result = handlerDict[self.fd].handleFunc(self.data)
主从Reactor多线程:认为accept连接比处理数据ready(准备好被读)要重要,因此单独一个线程只处理listenfd,accept后加入其他线程的epfd,其余逻辑同 单Reactor多线程
Proactor TODO 听说是基于AIO的
参考文章(Reactor与Proactor)
Reactor模式
Reactor线程模型 - 每天晒白牙的文章 - 知乎