select poll epoll 源码阅读

pid=99230581

本文主要基于源码介绍 select / poll / epoll 技术(实际上并没有 poll )

回顾socket编程

推荐阅读[1]
服务端 socket bind listen accept | read/write
客户端 soccket connect | read/write
客户端 close(一般是客户端先关闭)
服务端 close(read return 0)

listen 时创建半连接、全连接队列,半连接队列对应SYN_RCVD状态的连接,全连接队列对应 ESTABLISHED 状态的队列,accept 从全连接队列消费

  • 半连接队列满的场景:SYN Flood 攻击,[2]中做实验模拟了此场景
  • 全连接队列满的场景:accept 的消费速度跟不上第三次握手收到后新连接加入全连接队列的速度

ref: https://www.jianshu.com/p/3c7a0771b67e

ref https://www.jianshu.com/p/3c7a0771b67e 实线代表服务端行为导致状态变化,虚线代表客户端行为导致状态变化

TODO

  1. 为什么一般不多线程读写单个 socket,怎么理解
    socket套接字在多线程发送数据时要加锁吗? - 陈硕的回答 - 知乎
    https://www.zhihu.com/question/56899596/answer/150926723
    https://blog.csdn.net/weixin_34357887/article/details/93720482

参考文章(回顾socket编程)

  1. Linux Socket编程(不限Linux)
  2. 从一次 Connection Reset 说起,TCP 半连接队列与全连接队列

select

select基本使用

select 的使用例子可以参考[5],为避免每次都打开网页,摘录一部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
fd_set rset; //创建一个描述符集rset  
FD_ZERO(&rset); //对描述符集rset清零
FD_SET(0, &rset); //将描述符0加入到描述符集rset中
FD_SET(4, &rset); //将描述符4加入到描述符集rset中
FD_SET(5, &rset); //将描述符5加入到描述符集rset中

// 这里没用上select的返回值
if(select(5+1, &rset, NULL, NULL, NULL) > 0)
{
// 例子有点挫,实际上这里应该是一个循环
if(FD_ISSET(0, &rset))
//描述符0可读及相应的处理代码

if(FD_ISSET(4, &rset))
//描述符4可读及相应的处理代码

if(FD_ISSET(5, &rset))
//描述符5可读及相应的处理代码
}

源码阅读

select主要逻辑

2.6.39 内核源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
fd_set __user *, exp, struct timeval __user *, tvp)
// 不看 tvp > 0 有timeout的
struct timespec end_time, *to = NULL;
ret = core_sys_select(n, inp, outp, exp, to);
long stack_fds[SELECT_STACK_ALLOC/sizeof(long)]; // SELECT_STACK_ALLOC = 256
bits = stack_fds;
fd_set_bits fds;
fds.in = bits;
fds.out = bits + size;
fds.ex = bits + 2*size;
fds.res_in = bits + 3*size;
fds.res_out = bits + 4*size;
fds.res_ex = bits + 5*size;

if ((ret = get_fd_set(n, inp, fds.in)) ||
(ret = get_fd_set(n, outp, fds.out)) ||
(ret = get_fd_set(n, exp, fds.ex)))
// get_fd_set(unsigned long nr, void __user *ufdset, unsigned long *fdset)
nr = FDS_BYTES(nr);
if (ufdset)
// to from n
return copy_from_user(fdset, ufdset, nr) ? -EFAULT : 0; // 这就是为什么说select每次都涉及拷贝fdset
goto out;
zero_fd_set(n, fds.res_in);
zero_fd_set(n, fds.res_out);
zero_fd_set(n, fds.res_ex);

// do select也可以看下方分析
ret = do_select(n, &fds, end_time); // n fds NULL
struct poll_wqueues table;
retval = max_select_fd(n, fds);
n = retval;
poll_initwait(&table); // table就是下方的pwq,类型是 poll_wqueues
init_poll_funcptr(&pwq->pt, __pollwait);
pt->qproc = qproc; // event_poll 会调用这里设置的 qproc,即 __pollwait
pt->key = ~0UL; /* all events enabled */
pwq->polling_task = current; // NOTE: 在wait中加入了current,我的理解是这里关联了"要唤醒谁"的信息
// current 存的是当前进程 ref: https://stackoverflow.com/questions/12434651/what-is-the-current-in-linux-kernel-source
wait = &table.pt; // NOTE: 注意这个wait
retval = 0;
for (;;)
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
for (i = 0; i < n; ++rinp, ++routp, ++rexp)
in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;
if (all_bits == 0) {
i += __NFDBITS; // (8 * sizeof(unsigned long))
continue;
}

for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1)
int fput_needed;
if (i >= n)
break;
if (!(bit & all_bits))
continue;
file = fget_light(i, &fput_needed); // 从 fd 获得 file struct
f_op = file->f_op;
wait_key_set(wait, in, out, bit);
mask = (*f_op->poll)(file, wait); // NOTE: 重点!后续分析见下面的代码块
fput_light(file, fput_needed); // 似乎是关闭文件
if ((mask & POLLIN_SET) && (in & bit))
// NOTE: 重要!在这里向用户反馈是哪个fd有事件(通过下标),是什么事件(通过res_in还是res_out、res_ex)
res_in |= bit;
retval++;
// NOTE:wait=NULL 会使得后续 poll 中的 poll_wait 被跳过,因为已经确定要返回了,但已经传进去的还是生效的
wait = NULL;
// NOTE: 同理 POLLOUT_SET POLLEX_SET 对应 res_out res_ex
cond_resched() // 抢占点,迷惑
wait = NULL;
if (retval || timed_out || signal_pending(current)) // 三种返回条件 有事件、超时、信号(?)
break;
poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack) // (struct poll_wqueues *pwq, int state, ktime_t *expires, unsigned long slack)
set_current_state(state);
// sleep 直到timeout(或interrupt)
// 在epoll一节也有分析,内部(如果没有timeout的话)会调用 schedule 放弃 CPU,这也是为什么要套外层的 for(;;)
// NOTE: 阻塞点
rc = schedule_hrtimeout_range(expires, slack, HRTIMER_MODE_ABS);
__set_current_state(TASK_RUNNING);
// NOTE:返回时,清理注册到socket wait_address的entry
poll_freewait(&table);
// 把内核内存中的fd_set(bitmap)拷贝回用户空间
if (set_fd_set(n, inp, fds.res_in) ||
set_fd_set(n, outp, fds.res_out) ||
set_fd_set(n, exp, fds.res_ex))
ret = -EFAULT;

return ret;
ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);

ref [3]

注意:select中没有清理bitmap的操作,返回的bitmap再次传入前要 FD_ZERO 一下

file侧逻辑

以网络编程常见的socket场景为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
struct socket {
// ...
const struct proto_ops *ops; // protocol specific socket operations
// [2] socket数据结构中的const struct proto_ops对应的是协议的方法集合。每个协议都会实现不同的方法集,对于IPv4 Internet协议族来说,每种协议都有对应的处理方法,如下。对于udp来说,是通过inet_dgram_ops来定义的,其中注册了inet_recvmsg方法。
}

// \net\ipv4\af_inet.c
const struct proto_ops inet_dgram_ops = {
// ...
.bind = inet_bind,
.connect = inet_dgram_connect,
.socketpair = sock_no_socketpair,
.accept = sock_no_accept,
.poll = udp_poll,
.ioctl = inet_ioctl,
.sendmsg = inet_sendmsg,
.recvmsg = inet_recvmsg,
}

// TODO:作为扩展可以看下eventfd_poll

static const struct file_operations socket_file_ops = {
// ...
.poll = sock_poll,
}

// 简化版本:select侧
do_select
struct poll_wqueues table;
poll_initwait(&table);
init_poll_funcptr(&pwq->pt, __pollwait);
pt->qproc = qproc; // 在下面的wait中注册__pollwait函数
wait = &table.pt;
for ;;
for fd
mask = (*f_op->poll)(file, wait); // 调用 fd 实现的 file_operations 接口的 poll 方法
udp_poll
poll_wait(filp, wait_address, p); // do_select 中初始化的 poll_table* wait 一路传到这里(参数 p),wait 中
p->qproc(filp, wait_address, p); // 调用到了上面几行注册的 __pollwait
__pollwait
if (p && wait_address)
struct poll_table_entry *entry = poll_get_entry(pwq); // 获得一个空的entry
init_waitqueue_func_entry(&entry->wait, pollwake); // 向entry.wait注册pollwake函数
add_wait_queue(wait_address, &entry->wait); // entry的wait实际上是个list
// 如果任何fd mask有事件,break
// 否则睡觉
poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack))
// ...
// 根据mask,将inp/oup/exp的bitmap的fd对应位置置位,稍后拷贝回用户空间

// 简化版本:唤醒侧
udp_rcv
sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable); // 根据报文找对应socket
udp_queue_rcv_skb(sk, skb);
ip_queue_rcv_skb // Queue an skb into sock receive queue
sock_queue_rcv_skb
sk->sk_data_ready(sk, skb_len); // NOTE:走到上面的 sock_def_readable
// socket初始化时将sk_data_ready赋值为sock_def_readable函数,下面跟进此函数

select callback分析

从do_select出发一路追踪callback的执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// 完整版本
do_select
struct poll_wqueues table;
poll_table *wait;
poll_initwait(&table);
// table->pt = qproc
// pwq->polling_task = current;
wait = &table.pt;
// NOTE: 这里省略了,还套了一层for,遍历0到最大fd
mask = (*f_op->poll)(file, wait); // file struct 下有 f_op 就是 file_operations 接口的实现
// 等价于调用 sock_poll
sock_poll
struct socket *sock;
sock = file->private_data; // 看起来c语言用void*管理子类特有数据?
// ops 应该是对应 socket_file_ops,返回mask
return sock->ops->poll(file, sock, wait);
udp_poll(struct file *file, struct socket *sock, poll_table *wait)
// 返回 mask,有简化
// NOTE: 重要:socket_poll_wait的逻辑是
// NOTE: 1. 如果有注册 poll_table* wait 那么注册 pollwake 在收到报文后会有回调去调用 pollwake 唤醒 select 重新调用datagram_poll
// NOTE: 2. 检查skb队列是否为空,根据结果向select函数返回mask
return datagram_poll(struct file *file, struct socket *sock, poll_table *wait) // net/core/datagram.c
struct sock *sk = sock->sk;
sock_poll_wait(file, sk_sleep(sk), wait); // include/net/sock.h
// 看一眼sk_sleep
sk_sleep
return &rcu_dereference_raw(sk->sk_wq)->wait; // // 类型 wait_queue_head_t

// NOTE: 注意 sk->sk_wq->wait 是socket的等待队列,也是下方的wait_address
// NOTE: 下方的flip就是上方的file,就是socket文件
// NOTE: p就是上面的 wait,从select构造后一路传下来
poll_wait(filp, wait_address, p);
// NOTE:如果 wait 有效(说明select目前还没有发现事件,可能是会睡眠的),向我们维护的、但是select了解的容器
if (p && wait_address)
p->qproc(filp, wait_address, p);
// 假设是 select 的场景
__pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p)
struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt); // 找到外层包围的 poll_wqueue,p在其中的字段名为pt
// NOTE:从数组中获得下一个位置,或者说申请一个 poll_table_entry
struct poll_table_entry *entry = poll_get_entry(pwq);
struct poll_table_page *table = p->table; //
if (p->inline_index < N_INLINE_POLL_ENTRIES)
return p->inline_entries + p->inline_index++;
get_file(filp); // get_file的作用是原子的增加f_count,也就是该文件的引用计数(在close的时候会减这个值)。 ref: https://www.cnblogs.com/lit10050528/p/6206235.html
entry->filp = filp;
// NOTE:将来从wait_address中将注册的entry清理时需要引用wait_address
entry->wait_address = wait_address;
entry->key = p->key;
init_waitqueue_func_entry(&entry->wait, pollwake); // entry->wait 是 wait_queue_t 类型
q->func = func; // q 是第一个参数
// NOTE: 下面尝试跟进pollwake,但是太深了不看了
// NOTE: 上游是 sk_data_ready -> wake_up_interruptible_sync_poll
pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key)
struct poll_table_entry entry = container_of(wait, struct poll_table_entry, wait);
__pollwake(wait, mode, sync, key);
// NOTE: 注意,这里的entry拿到的内容在下面 entry->wait.private = pwq
// NOTE: pwq就是调用select是创建的table,其polling_task 存了 current,也就是select调用方进程
struct poll_wqueues *pwq = wait->private; // wait是wait_queue_t类型
DECLARE_WAITQUEUE(dummy_wait, pwq->polling_task);
// NOTE:展开后
// wait_queue_t dummy_wait = __WAITQUEUE_INITIALIZER(dummy_wait, pwq->polling_task)
// #define __WAITQUEUE_INITIALIZER(name, tsk) { \
// .private = tsk, \
// .func = default_wake_function, \
// .task_list = { NULL, NULL } }
default_wake_function(&dummy_wait, mode, sync, key); // kernel/sched.c
// try_to_wake_up(struct task_struct *p, unsigned int state, int wake_flags)
// NOTE: 这里的curr->private拿到了之前在pwq中存放的current
// 第一个参数是 task_struct
try_to_wake_up(curr->private, mode, wake_flags); // NOTE: 太深了不看了
// NOTE: 重要
entry->wait.private = pwq;
// TODO:其实还存在唤醒几个的问题,entry->wait可能有多个wait_address

// 把entry->wait添加到wait_address中,即将pollwake函数包装为entry,添加到socket维护的等待队列中
// NOTE:重点!将后者加入前者 前者类型是 wait_queue_head_t 后者是 wait_queue_t
add_wait_queue(wait_address, &entry->wait);
// void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)
// 把entry添加到head的头部
__add_wait_queue(wq_head, wq_entry); // __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new)
list_add(&new->task_list, &head->task_list); // list_add(struct list_head *new, struct list_head *head)
// typedef struct __wait_queue_head wait_queue_head_t

mask = 0;

// POLLIN There is data to read.
// POLLRDNORM Equivalent to POLLIN.

// NOTE: 根据 buffer 情况,通过约定好的宏(POLLIN、POLLOUT等)返回 fd 状态相关信息(可读、可写等)
/* exceptional events? */
if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
mask |= POLLERR;
if (sk->sk_shutdown & RCV_SHUTDOWN)
mask |= POLLRDHUP | POLLIN | POLLRDNORM;
if (sk->sk_shutdown == SHUTDOWN_MASK)
mask |= POLLHUP;
/* readable? */
if (!skb_queue_empty(&sk->sk_receive_queue))
mask |= POLLIN | POLLRfdw2DNORM;
// ...
if (sock_writeable(sk))
mask |= POLLOUT | POLLWRNORM | POLLWRBAND;
return mask;

ref [3]

select callback调用时机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 看下socket何时唤醒它
sock_init_data // socket初始化
sk->sk_state_change = sock_def_wakeup;
wake_up_interruptible_all(&wq->wait);
sk->sk_data_ready = sock_def_readable;
wq = rcu_dereference(sk->sk_wq);
wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI | POLLRDNORM | POLLRDBAND); // NOTE: 重点
// #define wake_up_interruptible_sync_poll(x, m) \
// __wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))
__wake_up_sync_key(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, void *key)
// __wake_up_common(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, int wake_flags, void *key)
__wake_up_common(q, mode, nr_exclusive, wake_flags, key); // 详细分析可以看 http://abcdxyzk.github.io/blog/2015/06/12/kernel-net-socket-io/
// NOTE: 遍历socket wait_queue 调用通过 等待队列机制 向 wait_queue 中注册的 callback
list_for_each_entry_safe(curr, next, &q->task_list, task_list) // q 就是 wait_queue_head_t*
// curr类型 wait_queue_t
curr->func(curr, mode, wake_flags, key) // 这个func 就是 pollwake
sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
sock_wake_async(sk->sk_socket, how, band);

ip_local_deliver
ip_local_deliver_finish // IP层结束
udp_rcv
__udp4_lib_rcv
sk = __udp4_lib_lookup_skb(skb, uh->source, uh->dest, udptable); // 根据报文找对应socket
udp_queue_rcv_skb(sk, skb);
__udp_queue_rcv_skb
ip_queue_rcv_skb // Queue an skb into sock receive queue
sock_queue_rcv_skb
sk->sk_data_ready(sk, skb_len); // NOTE:走到上面的 sock_def_readable
// 寻找 sock_queue_rcv_skb 被调过程中参考了
// 1. http://kerneltravel.net/blog/2020/network_ljr10/
// 2. [4]


typedef struct __wait_queue_head wait_queue_head_t;
struct __wait_queue_head {
spinlock_t lock;
struct list_head task_list;
};

typedef struct __wait_queue wait_queue_t;
struct __wait_queue {
unsigned int flags;
void *private;
wait_queue_func_t func;
struct list_head task_list;
};

ref [3]

如何实现一个支持select的设备

参考了[3],其中还有代码实现,实现起来并不止下面这些,但下面的是核心

  1. 定义一个等待队列头 wait_queue_head_t ,用于收留等待队列任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 比如socket中的实现
struct socket {
// ...
struct sock *sk;
}

struct sock {
// ...
struct socket_wq __rcu *sk_wq;
}

struct socket_wq {
wait_queue_head_t wait;
// ...
} ____cacheline_aligned_in_smp;

typedef struct __wait_queue_head wait_queue_head_t;
struct __wait_queue_head {
spinlock_t lock;
struct list_head task_list;
};
  1. 实现 fd 的接口 file_operations (中的一部分?),可以确定的是 poll 函数是一定需要实现的,通常命名为 xxx_poll()
  2. xxx_poll() 函数中
    1. 需要对 poll_wait 进行调用,将 select 传来的 poll_table* wait 与 第一步中提到的 wait_queue_head_t 传入
    2. 该函数的返回值 mask 需要返回当前 fd 可读状态之类的信息,比如 EPOLLIN / EPOLL / EPOLLERR 等,这个返回值在 do_select() 函数中会去判断处理
  3. 条件满足的时候(比如有数据可读了),通过 wake_up_interruptible 系列API唤醒任务,传入第一步中提到的 wait_queue_head_t

Select缺陷

经典八股

  1. fd 两次拷贝
  2. FD_SETSIZE=1024,而文档中提到当 fd 值超过 1024 时,FD_SET 系列宏的行为是 undefined 的

https://pubs.opengroup.org/onlinepubs/7908799/xsh/select.html
The behaviour of these macros is undefined if the fd argument is less than 0 or greater than or equal to FD_SETSIZE, or if any of the arguments are expressions with side effects.

https://elixir.bootlin.com/linux/latest/source/include/uapi/linux/posix_types.h#L27

1
2
3
4
5
#define __FD_SETSIZE	1024
typedef struct {
unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))];
} __kernel_fd_set;
typedef __kernel_fd_set fd_set;

通常直接在栈上申请 fd_set,按照上面的类型定义,会有1024的问题,而下方链接的博主尝试在堆上分配时可以超过1024的
https://blog.csdn.net/dog250/article/details/105896693

  1. 只知道有事情发生了,仍然需要O(n)遍历,使用 FD_ISSET 去检查是否是这个 FD 有事件

Select是ET还是LT?

根据源码分析中socket实现的poll可知,poll返回的是接受队列是否为空,所以至少对于select常见的socket编程场景下是LT的

总结

其实看上面怎么自己实现一个支持select的设备就够了,select本质就是对监听的每个fd尝试poll,若无消息(暂不可读)则向其注册一个wake函数,其中写了当前进程,若全部file无消息则调用进程睡眠。file内部需要在有变化时主动调用 wake_up_interruptible 之类的函数,并将注册有 wake函数的容器传入,实现解除select的阻塞

参考文章(select)

  1. 如果这篇文章说不清epoll的本质,那就过来掐死我吧! (2)
  2. 图解Linux网络包接收过程
  3. 【原创】Linux select/poll机制原理分析
  4. TODO 没看完 但是强推 Linux网络 - 数据包的接收过程
    TODO 也强推[4]中最后引用的几篇英文文章
  5. Linux网络编程——I/O复用之select详解

TODO 如果多个进程阻塞使用相同的fdset去调用select?

poll

TODO 可以简单介绍一下

epoll

epoll基本使用

可运行的例子见[7],下面摘录网上的另一个例子,出处找不到了……
此外例[7]处理了关闭连接,也值得借鉴
TODO 上面的例子只有read,听说写socket也可以加入epoll,有空看下 https://github.com/abbshr/C10K-web-server/blob/master/event-driven-model/epoll-server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
struct epoll_event ev, event[MAX_EVENT];
listenfd = socket(AF_INET, SOCK_STREAM, 0);
// 打开 socket 端口复用, 防止测试的时候出现 Address already in use
result = setsockopt( listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on) );
result = bind(listenfd, (const struct sockaddr *)&server_addr, sizeof (server_addr));
result = make_socket_non_blocking(listenfd);
result = listen(listenfd, 200);
epfd = epoll_create1(0);
// NOTE:ev.data中的数据只是为了收到通知后你能得知这个event的一些信息(比如这里我们就能知道是哪个fd上有事件)
ev.data.fd = listenfd;
ev.events = EPOLLIN | EPOLLET /* 边缘触发选项。 */;
// 设置epoll的事件
result = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
int wait_count;
for (;;)
wait_count = epoll_wait(epfd, event, MAX_EVENT, -1);
for (int i = 0 ; i < wait_count; i++)
uint32_t events = event[i].events;
// 判断epoll是否发生错误
if ( events & EPOLLERR || events & EPOLLHUP || (! events & EPOLLIN))
// ...
else if (listenfd == event[i].data.fd)
// listenfd的事件,有新连接
for (;;) // 由于采用了边缘触发模式,这里需要使用循环
int accp_fd = accept(listenfd, &in_addr, &in_addr_len);
// 获得socket的一些信息,打印
__result = getnameinfo(&in_addr, sizeof (in_addr),
host_buf, sizeof (host_buf) / sizeof (host_buf[0]),
port_buf, sizeof (port_buf) / sizeof (port_buf[0]),
NI_NUMERICHOST | NI_NUMERICSERV);
printf("New connection: host = %s, port = %s\n", host_buf, port_buf);
__result = make_socket_non_blocking(accp_fd);
// 由于会拷贝到内核空间,所以可以复用同一个
ev.data.fd = accp_fd;
ev.events = EPOLLIN | EPOLLET;
__result = epoll_ctl(epfd, EPOLL_CTL_ADD, accp_fd, &ev);
else
// 文件可读
// 因为采用边缘触发,所以这里需要使用循环。如果不使用循环,程序并不能完全读取到缓存区里面的数据。
int done = 0;
for (;;)
result_len = read(event[i].data.fd, buf, sizeof (buf) / sizeof (buf[0]));
if (-1 == result_len)
// ref: stackoverflow
// EAGAIN is often raised when performing non-blocking I/O. It means "there is no data available right now, try again later".
if (EAGAIN != errno)
// 出错,break+关闭连接
perror ("Read data");
done = 1;
break;
else if (! result_len)
done = 1;
break;
write(STDOUT_FILENO, buf, result_len);
if (done)
printf("Closed connection\n");
close (event[i].data.fd);
close (epfd);
return 0;

epoll源码阅读

ref [3]

结构定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

/*
* This structure is stored inside the "private_data" member of the file
* structure and represents the main data structure for the eventpoll
* interface.
*/
struct eventpoll {
/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq;
/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;
/* List of ready file descriptors */
struct list_head rdllist;
/* RB tree root used to store monitored fd structs */
struct rb_root rbr;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
}

struct rb_root {
struct rb_node *rb_node; // 注意是指针,而下方epitem的是对象,此指针通过container_of可以找到epitem
};

struct epitem {
/* RB tree node used to link this structure to the eventpoll RB tree */
struct rb_node rbn;
/* List header used to link this structure to the eventpoll ready list */
struct list_head rdllink;
/* The file descriptor information this item refers to */
struct epoll_filefd ffd;
struct file *file;
int fd;
/* Number of active wait queue attached to poll operations */
int nwait;
/* List containing poll wait queues */
struct list_head pwqlist;
/* The "container" of this item */
struct eventpoll *ep;
/* The structure that describe the interested events and the source fd */
struct epoll_event event;
}

struct epoll_event {
__u32 events;
__u64 data;
} EPOLL_PACKED;

// NOTE:奇怪的是 https://man7.org/linux/man-pages/man2/epoll_wait.2.html 中,以及网上的应用中,是这样定义的
// 当然也无所谓,__u64 data看做指针就好
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

初始化

第一次关注 fs_initcall,介绍可以参考 https://cloud.tencent.com/developer/article/1554770

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
fs_initcall(eventpoll_init);
struct sysinfo si;
// totalhigh: high zone部分的物理内存大小,在64位系统中由于不存在high zone故位0,32位系统中根据实际情况
// ref: https://blog.csdn.net/weixin_42730667/article/details/117629859
unsigned long totalhigh; /* Total high memory size */
unsigned long totalram; /* Total usable main memory size */
unsigned long freeram; /* Available memory size */
// mem_unit: 页面大小
unsigned int mem_unit; /* Memory unit size in bytes */
// ...
si_meminfo(&si); // 获得系统信息
// 页表大小4KB,对应12位
// #define PAGE_SHIFT 12 // arch/x86/include/asm/page_types.h
// 所以这里的totalram是以页表为单位的? 迷惑,先按给的注释来吧
/*
* Allows top 4% of lomem to be allocated for epoll watches (per user).
*/
max_user_watches = (((si.totalram - si.totalhigh) / 25) << PAGE_SHIFT) /
EP_ITEM_COST; // #define EP_ITEM_COST (sizeof(struct epitem) + sizeof(struct eppoll_entry))
// ... 其他的用到了再来看,略

epoll_create

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
SYSCALL_DEFINE1(epoll_create, int, size)
SYSCALL_DEFINE1(epoll_create1, int, flags)
struct eventpoll *ep = NULL;
error = ep_alloc(&ep);
user = get_current_user();
ep = kzalloc(sizeof(*ep), GFP_KERNEL); // GFP_KERNEL 是内存分配时的选项
init_waitqueue_head(&ep->wq); // wait_queue_head_t 类型,select中也有用到
init_waitqueue_head(&ep->poll_wait);
INIT_LIST_HEAD(&ep->rdllist);
// #define RB_ROOT (struct rb_root) { NULL, }
ep->rbr = RB_ROOT;
ep->ovflist = EP_UNACTIVE_PTR;
ep->user = user;
// Creates a new file by hooking it on a single inode. This is useful for files
// * that do not need to have a full-fledged inode in order to operate correctly.
// * All the files created with anon_inode_getfd() will share a single inode,
// 传入 name、file_opearations、private_data、flags
error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
return error; // error实际上是fd,无语子

// NOTE: epfd作为一个fd也实现了file_operations,我理解poll对应epoll套epoll的情况,先不看了
static const struct file_operations eventpoll_fops = {
.release = ep_eventpoll_release,
.poll = ep_eventpoll_poll,
.llseek = noop_llseek,
};

epoll_ctl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
struct epoll_event epds;
// 如果op是DEL,那么不用将event拷贝到内核空间,这部分略
copy_from_user(&epds, event, sizeof(struct epoll_event))

file = fget(epfd);
tfile = fget(fd); // target file
// 不能自己监听自己、且epfd应对应一个eventepoll file,后者通过检查file的op来确定
if (file == tfile || !is_file_epoll(file))
goto error_tgt_fput;
ep = file->private_data; // 获得 eventpoll 对象
// epoll下挂的fd不能成环,如ep1监听ep2 ep2监听ep1,代码略
mutex_lock(&ep->mtx);
// NOTE: 唯一的一处红黑树查找
epi = ep_find(ep, tfile, fd); // 在ep的rbt中搜索tfile,(tfile, fd)作为搜索的key (大概,未确认)
struct rb_node *rbp;
ep_set_ffd(&ffd, file, fd);
ffd->file = file;
ffd->fd = fd;
for (rbp = ep->rbr.rb_node; rbp; )
// #define rb_entry(ptr, type, member) container_of(ptr, type, member)
// include/linux/rbtree.h
epi = rb_entry(rbp, struct epitem, rbn); // ep的rbr的rb_node指针,通过container_of找到其外围包裹的
kcmp = ep_cmp_ffd(&ffd, &epi->ffd); // 先对比file,后对比fd
if (kcmp > 0)
rbp = rbp->rb_right;
else if (kcmp < 0)
rbp = rbp->rb_left;
else
epir = epi;
break;
// 简单的二叉树查找
switch (op)
case EPOLL_CTL_ADD:
if (!epi) // 如果节点不存在
// NOTE:添加其他需要唤醒的状态
epds.events |= POLLERR | POLLHUP;
// 唯一的一处红黑树插入
ep_insert(ep, &epds, tfile, fd);
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
case EPOLL_CTL_MOD:
if (epi)
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
mutex_unlock(&ep->mtx);

追踪一下 ep_insert

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
ep_insert(struct eventpoll *ep, struct epoll_event *event, // event是用户传来的 epoll_event(如 EPOLLIN | EPOLLET)
struct file *tfile, int fd)
struct epitem *epi;
// NOTE: 注意epq是一个短时的栈上对象,仅用于调用file的poll,它存了epi和polltable,后者就是 ep_ptable_queue_proc callback的包装
struct ep_pqueue epq;
poll_table pt; // select中也在用的一个struct
// include/linux/poll.h
typedef struct poll_table_struct {
poll_queue_proc _qproc;
__poll_t _key;
} poll_table;
struct epitem *epi;
// 不能注册太多watch,限制了内存消耗
user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches))
return -ENOSPC;

epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd); // 用于在红黑树中比较
epi->event = *event; // 在读ready list返回结果时会再次用到
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;

epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); // 有理由相信这里和之前在select内的逻辑会很像,与select不同,这里的
pt->qproc = qproc;

// NOTE: 调用target file的poll,传入 poll_table
revents = tfile->f_op->poll(tfile, &epq.pt);
// 这里放一下select中 sock->ops->poll(file, sock, wait); 的简化版本
udp_poll
datagram_poll
// 分别是socket文件、socket等待队列、poll_table
sock_poll_wait(file, sk_sleep(sk), wait);
poll_wait(filp, wait_address, p);
if (p && wait_address)
p->qproc(filp, wait_address, p);
/*
* This is the callback that is used to add our wait queue to the
* target file wakeup lists.
*/
ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt)
struct epitem *epi = ep_item_from_epqueue(pt); // poll_table 找到外部的 ep_pqueue 再访问 eq_queue->epi:epitem,epitem包含红黑树比较的key、用户传来的epoll选项(EPOLLIN | EPOLLET 等)
return container_of(p, struct ep_pqueue, pt)->epi;
struct eppoll_entry *pwq;
// 申请 eppoll_entry ,类似于select中的poll_table_entry(?)
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL)))
// NOTE: 向entry中注入 ep_poll_callback
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
q->func = func;
pwq->whead = whead; // whead就是socket的等待队列,EPOLL_CTL_DEL是需要通过它将whead中注册的epi删除
pwq->base = epi;
// NOTE: 将新申请的 entry 加入 socket等待队列,wait的类型是 wait_queue_t
add_wait_queue(whead, &pwq->wait);
// 将申请的entry pwq加入epoll_ctl insert是创建的 epitem
// Question: 这一行的意义是什么
list_add_tail(&pwq->llink, &epi->pwqlist); // 将 param1 添加到 param2 的尾部,这里双向链表的头尾是连接起来的
epi->nwait++;
// =================== 跟踪 ep_poll_callback ======================
// NOTE: ep_poll_callback 的被调时机是 sk_data_ready -> wake_up_interruptible_sync_poll
// 遍历 socket等待队列,拿到上面添加的 pwq->wait,将其传入 ep_poll_callback
ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) // wait是socket等待队列
struct epitem *epi = ep_item_from_wait(wait); // 从 wait 找到 epi
container_of(p, struct eppoll_entry, wait)->base; // base指向epi
struct eventpoll *ep = epi->ep; // 从epi拿到ep

// NOTE:key是file主动report状态变化时传来的状态位,如果不符合用户的兴趣,过滤,避免唤醒用户
if (key && !((unsigned long) key & epi->event.events))
goto out_unlock;

// NOTE: ep_poll_callback作用1:添加ready list
if (!ep_is_linked(&epi->rdllink))
// NOTE: 重要的是epi中记录了事件源(也就是此事件属于哪个FD),加入ep的ready list
// NOTE: 我理解epi->rdllink 实际上并不代表链表,只是代表单个节点,而ep->rdllist才真正是链表
list_add_tail(&epi->rdllink, &ep->rdllist);
__list_add(new, head->prev, head);
// new添加到head->prev与head中间,也就是添加到尾部
// NOTE: ep_poll_callback作用2:唤醒阻塞在epoll_wait的元素
// NOTE:如果没有进程阻塞在epoll_wait上,这里不会调用,但不影响上面将epi添加到ready list
if (waitqueue_active(&ep->wq)) // wq中的list是否empty
// NOTE: 唤醒所有阻塞在 epoll_wait 的
// NOTE: ep->wq 是 wait_queue_head_t 类型,其 task_list 字段中存的是 wait_queue_t 类型
// NOTE: select中 wake_up 系列是留给监听的file调用的,这里的context是 sk_data_ready -> wake_up_interruptible_sync_poll 好吧也是监听的file

// 相比于select是拿到select调用时创建的table,利用其中的current调用default_wake_function
// 这里是拿到了ep->wq,对其中的元素调用func,
// 相当于select只有一层queue,在sk_data_ready对应的socket
// 而ep除了socket有queue以外,epfd自身也维护了ep->wq
// 在epoll调用时,会申请一个 wait_queue_t 加入这第二层的 ep->wq
// NOTE:猜想原因是可能对这个epfd有多个线程调用了epoll_wait,而select则没有fd,也就不会有这种情况
wake_up_locked(&ep->wq);
// #define wake_up_locked(x) __wake_up_locked((x), TASK_NORMAL)
__wake_up_locked(wait_queue_head_t *q, unsigned int mode)
__wake_up_common(q, mode, 1, 0, NULL);
// 这些遍历时的元素在epoll_wait时添加
list_for_each_entry_safe(curr, next, &q->task_list, task_list)
// // curr类型 wait_queue_t
// NOTE:但是select的 pollwake 确实是调用了 try_to_wake_up 的
curr->func(curr, mode, wake_flags, key)
default_wake_function
try_to_wake_up
// NOTE: epoll套epoll的情况
if (waitqueue_active(&ep->poll_wait))
pwake++;
if (pwake)
ep_poll_safewake(&ep->poll_wait);
// 本质是调用wake_up,唤醒所有等待我们的,也就是epfd被等待的情况,可以是select套epfd,也可以是epoll套epfd
// 根据socket队列是否为空等条件返回mask
return mask
list_add_tail(&epi->fllink, &tfile->f_ep_links); // 不明
ep_rbtree_insert(ep, epi); // 将上面初始化的epi添加到红黑树
// TODO 有空学习下红黑树
/* If the file is already "ready" we drop it inside the ready list */
if ((revents & event->events) && !ep_is_linked(&epi->rdllink))
// NOTE:如果已经ready,加入ready list,接着epoll_wait的时候就会再次poll了
list_add_tail(&epi->rdllink, &ep->rdllist);
/* Notify waiting tasks that events are available */
/* 如果进程正在睡眠等待,唤醒它去处理就绪事件 */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
atomic_long_inc(&ep->user->epoll_watches);

if (pwake)
ep_poll_safewake(&ep->poll_wait);
ep_call_nested(&poll_safewake_ncalls, EP_MAX_NESTS,
ep_poll_wakeup_proc, NULL, wq, (void *) (long) this_cpu);
// priv对应NULL,cookie对应wq: wait_queue_head_t
ep_poll_wakeup_proc(void *priv, void *cookie, int call_nests)
ep_wake_up_nested((wait_queue_head_t *) cookie, POLLIN, 1 + call_nests);
// #define wake_up_poll(x, m) \
// __wake_up(x, TASK_NORMAL, 1, (void *) (m))
__wake_up
__wake_up_common(q, mode, nr_exclusive, 0, key); // kernel/sched.c
list_for_each_entry_safe(curr, next, &q->task_list, task_list)
curr->func(curr, mode, wake_flags, key)
return 0;
1
2
3
4
5
6
7
8
9
/**
* __wake_up - wake up threads blocked on a waitqueue.
* @q: the waitqueue
* @mode: which threads
* @nr_exclusive: how many wake-one or wake-many threads to wake up
* @key: is directly passed to the wakeup function
*/
void __wake_up(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, void *key)

epoll_wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
/* Verify that the area passed by the user is writeable */
access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))
file = fget(epfd);
if (!is_file_epoll(file))
// ...
error = ep_poll(ep, events, maxevents, timeout);
return error

// NOTE:尾部调用不再缩进
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
int res = 0, eavail, timed_out = 0; // 注意这里有两个timeout,我们关注的无超时情况下 timed_out == 0
wait_queue_t wait;
if (timeout > 0)
// ... 不看超时的部分
else if (timeout == 0) // 非阻塞
// ... 只看阻塞的
fetch_events:
if (!ep_events_available(ep)) // 根据rdllist是否为空判断
// NOTE: 注入current
init_waitqueue_entry(&wait, current);
// q 就是 wait
q->flags = 0;
q->private = p; // 将current写到p
q->func = default_wake_function;
__add_wait_queue_exclusive(&ep->wq, &wait); // 将 wait_queue_t 加入 ep->wq (wait_queue_head_t 类型)
wait->flags |= WQ_FLAG_EXCLUSIVE;
__add_wait_queue(q, wait); // __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new)
list_add(&new->task_list, &head->task_list); // list_add(struct list_head *new, struct list_head *head)
for (;;)
set_current_state(TASK_INTERRUPTIBLE);
// NOTE: 通过检查队列是否为空判断,而非通过poll判断,这使得ET不会基于fd的poll返回值,但是这样不就永远走不到后面的poll了吗?
// NOTE:错误的,sk_data_ready -> wake_up_interruptible_sync_poll 路径仍然会从socket wait_queue中拿到之前注册的epi,将
if (ep_events_available(ep) || timed_out)
break;
if (signal_pending(current))
res = -EINTR;
break;
// ref: https://hackmd.io/@hankluo6/2021q3_quiz4
// 當目前沒有事件要處理時,schedule_hrtimeout_range 會主動讓出 CPU 給其他 process,而當 schedule_hrtimeout_range 回傳時表示被喚醒或 timeout
// ===
// 函数文档中有说将task state设置为 TASK_INTERRUPTIBLE 的话也可以因为 interrupt 醒来
schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS) // Returns 0 when the timer has expired otherwise -EINTR 想看timeout分支的话可以参考下
schedule_hrtimeout_range_clock
if (!expires)
// NOTE: 阻塞点
// NOTE: 所以看起来epoll是依赖interrupt,否则就一直主动放弃CPU?
schedule(); // 这是无超时情况的分支,expires就是传进来的to,是NULL
__set_current_state(TASK_RUNNING);
return -EINTR;
__remove_wait_queue(&ep->wq, &wait); // 取消等待
set_current_state(TASK_RUNNING);
eavail = ep_events_available(ep);
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events; // 用户传来的
return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
LIST_HEAD(txlist);
list_splice_init(&ep->rdllist, &txlist); // 合并rdllist与txlist,重置rdllist为空
error = (*sproc)(ep, &txlist, priv)
ep_send_events_proc(ep, txlist, esed) // (ep, head, priv)
struct ep_send_events_data *esed = priv;
for (eventcnt = 0, uevent = esed->events; !list_empty(head) && eventcnt < esed->maxevents;)
epi = list_first_entry(head, struct epitem, rdllink); // 从txlist取item,原本来自于ep的rdllist
// fd在insert时申请epi
list_del_init(&epi->rdllink); // 从队头移除
// 位运算,如果和用户关心的事件有重叠
// NOTE: 重要,调用poll检查其是否有我们关心的状态,注意做了与运算
revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
epi->event.events;
// NOTE: EPOLLIN 和 POLLIN 是一样的,事实上 EPOLL 的一系列宏(也许除了 EPOLLET)都和select系列是一样的
// ref: https://stackoverflow.com/questions/30685515/are-poll-and-epoll-event-flags-compatible
if (revents)
// uevent指向用户传来的 events 数组item
__put_user(revents, &uevent->events) // 拷贝回用户空间,revents指出有哪些事件
__put_user(epi->event.data, &uevent->data) // 是哪个fd触发的
eventcnt++;
uevent++; // 指向下一个位置

// 如果非ET(也就是LT),将epi放回ep的ready list,下次wait时如果没有数据就不会通过 if (revents) 判断,也就不会再在ready list中出现了
// 而ET的情况需要一次读完,否则只有下次 sk_data_ready 才能再次知道这个fd可读
// NOTE: 下面这么一小段代码实现了LT和ET的差别
if (!(epi->event.events & EPOLLET))
list_add_tail(&epi->rdllink, &ep->rdllist);
return eventcnt;
return error;
// NOTE: 有可能LT不清空ready list,再次进入这里但没有poll到用户关心的事件,这种情况下ready list被清空后应该重新阻塞
goto fetch_events;
return res;

总结(epoll)

TODO 看一下[2]的源码说明

与 select 类似的是都用到了 Linux 的等待队列机制,有代码复用,所以读起来轻松不少

流程分析

TODO pollwake 的深入分析请见 http://gityuan.com/2018/12/02/linux-wait-queue/

context 为 poll 时:向 socket 的等待队列添加包装后的函数指针
init_waitqueue_func_entry(&entry->wait, pollwake);
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);

context为 sk_data_ready -> wake_up_interruptible_sync_poll -> __wake_up_sync_key ->__wake_up_common -> list_for_each_entry_safe -> curr->func
遍历 file 的等待队列,调用上面包装后的函数指针,curr->func(curr …),注意等待队列中的元素curr自身也被传入了,这个curr中是包含了select/epoll_wait调用时记录的当前进程的,因此可以知道要唤醒谁(epoll 的不完全正确,但可以这么理解)

epoll_ctl分析

从 EPOLL_CTL_ADD 开始分析:

epi 对应一个加入的 fd,会加入红黑树(与 ready list),其中包含一个 qproc,随后调用 file 的 poll 时,file 的 poll 应当会调用poll_wait,并将poll_table传入(通过container_of可以找到包裹poll_table的epi),qproc 调用时,会申请一个 eppoll_entry 对象,它的主要作用是包装callback ep_poll_callback,加入file的等待队列(ctl_add时加入,ctl_del时删除)。file在状态变化时应主动调用callback,这个callback做三件事:

  1. 过滤用户不关心的状态变化,避免走到下面导致唤醒
  2. 将 epi 添加到 readylist(如果没有已经添加的话)
  3. 如果ep的等待队列不为空(有进程阻塞在当前 epfd 的 epoll_wait),遍历它们 (wait_queue_t),在其中能找到阻塞进程的 task_struct,唤醒它们
epoll_wait分析
  1. 如果 ep 的 ready list 为空,那么自己将要阻塞,申请一个 wait_queue_t 对象,填入当前进程的task_struct,添加到ep的等待队列,不断调用schedule让出CPU,每次被唤醒(或被调度)时检查ready list是否为空。
  2. ready list不为空的状态下,遍历ready list调用poll,过滤取得的 mask(与运算),将事件消息(哪个 fd 触发、是什么事件类型)拷贝回用户态传入的 buffer。如果是LT,有用户关心事件的fd会放回ready list,下次epoll_wait时会略过步骤1,重新poll,因此实现了LT。ET则不会放回ready list,需要file主动汇报状态触发install的callback才能重新使ready list非空,因此实现了ET

与select的比较

  • select 每次调用全量拷贝了用户感兴趣的fd到内核状态,而epoll是增量的
  • select 使用bitmap维护感兴趣的fd,O1插入删除、On遍历(这里不讨论用户态与内核态的拷贝),而epoll平衡了插入删除与遍历,均为红黑树复杂度
  • select 每次调用与唤醒时需要On poll一遍,epoll 在 epoll_ctl 需要 poll 一次,后续 epoll_wait 阻塞前不 poll,唤醒后 poll 一次,注意 epoll 只 poll 有事件的 fd

配一张图便于理解
ref [6]

ET与LT的区别

LT不会清理 wait_list,下次调用 epoll_wait 时会重新对 file 进行 poll,若无关心的事件才会移出队列,而 wait_list 不会保留在队列中,意味着这是监听的 fd 下次主动汇报状态之前(以 socket 为例,等价于有新数据)仅有的一次通知机会。或者先CTL DEL后ADD也可以重新触发 poll

(我的理解)一个必须使用ET的场景:如果消息处理与 epoll_wait 的调用不在同一个线程,那么为了 epoll_wait 返回后再次调用时不立即再次返回(如果其他线程还未来得及从 buffer 中拿走数据),需要使用 ET

为什么有人说ET更快

我的理解,LT每次调用 epoll_wait 如果使用阻塞 fd 只能读一次来避免阻塞,但如果使用非阻塞fd倒是和ET没什么区别

[5] 在 eventloop 类型(包括各类 fiber / coroutine)的程序中, 处理逻辑和 epoll_wait 都在一个线程, ET相比LT没有太大的差别. 反而由于LT醒的更频繁, 可能时效性更好些. 在老式的多线程RPC实现中, 消息的读取分割和 epoll_wait 在同一个线程中运行, 类似上面的原因, ET和LT的区别不大.但在更高并发的 RPC 实现中, 为了对大消息的反序列化也可以并行, 消息的读取和分割可能运行和epoll_wait不同的线程中, 这时ET是必须的, 否则在读完数据前, epoll_wait会不停地无谓醒来.

不当的使用可能造成饥饿

Question:我看网上有人说ET需要配合robin list,这个和ET/LT没什么关系吧

[1] 简单来说就是对 epoll_wait 返回的 fd 不要一直读,如果这个 fd 有大量数据,就会导致其他 fd 的饥饿,正确做法是搞一个类似 round_robin 的机制去循环读所有fd

惊群问题

如果多线程阻塞在同一个fd上是会有惊群问题的,一种想法是,每个线程对应单独的一个epfd,其中一个线程专门负责accept,accept后的连接epoll_ctl到其他线程的epfd,可以避免此问题

[4]中提到的另一种做法是每个线程对应一个epfd,都监听listenfd,根据上文的源码分析,listenfd事件到来后是会发生惊群的。对于大量连接到来的情况,为避免全连接队列单线程通过accept消费的速度跟不上,使用多线程监听listenfd并消费是有理由的,但还是,要考虑惊群问题,Linux 4.5开始epoll_ctl支持 EPOLLEXCLUSIVE 选项解决此问题,摘录一部分(但是为什么是at least one……)

[9] If the same file descriptor is in multiple epoll
instances, some with the EPOLLEXCLUSIVE flag, and others
without, then events will be provided toall epoll
instances that did not specify EPOLLEXCLUSIVE, and at
least one
of the epoll instances that did specify
EPOLLEXCLUSIVE.

TODO 看到文档上说 EPOLLEXCLUSIVE 可以和 ET 选项一起用,epoll默认是LT,意思是LT也是可以一起用的吧?
TODO EPOLLEXCLUSIVE详细分析 https://wenfh2020.com/2021/11/20/question-epollexclusive/
TODO 另一种方案是用 SO_REUSEPORT,这里不深入了

Question: accept是线程安全的吗?
ref: Is accept() thread-safe? https://stackoverflow.com/a/5124450/8454039
省流:是的

为什么使用红黑树?

  1. 需要判断fd是否已经正在监听,需要较高的查询效率,select以空间换时间,
  2. 若fd不存在则需要插入,也需要考虑插入复杂度

红黑树兼顾了插入、查询效率与树的旋转次数
TODO 我记得旋转次数是它和AVL的区别,但是知乎上也有人放数据显示差不多…… AVL由于最平衡查询快,红黑树旋转少则插入删除快

如果读完一个ET返回的socket,但在调用epoll_wait前,来了新的数据,且调用epoll_wait后不再来新数据(假设必须要我们根据这条数据回复后对方才会发新数据),是否会有liveness问题?

不会的,没有人在 epoll_wait 只是影响 socket 回调不会调用 wakeup 通知阻塞方,而 ctl poll 时添加的 epi 还是在的,并不影响将epi添加到 ready list 的过程。因此即使中间有较短的时间不阻塞在 epoll_wait 但是来了数据,调用 epoll_wait 的时候由于 ready list 非空,还是会走到 poll,拷贝回用户空间,正常返回的,并不会hang住

为什么在边缘触发的epoll函数中要使用非阻塞fd?

对于 connfd
LT的话是每次 epoll_wait 返回后只读一次,无论使用阻塞非阻塞都是一样的,因为只读一次,一定不阻塞
ET的话是每次 epoll_wait 返回后为了读完数据,需要循环读,阻塞fd会导致卡住,就算单独开了个线程去读,卡住也是不好的状态;而非阻塞connfd根据EAGAIN返回码即可判断是否读完

对于listenfd
Nginx 用的是LT(其他read write用的是ET),socketfd 也是可选 block / nonblock 的[8],所以感觉ET其实也行?

LT与ET的epoll例子
https://gist.github.com/liumuqi/04dc92629e322a4613a4610afe786818

TODO 关于非阻塞的想法,推荐扩展阅读
高性能网络通信库中为何要将侦听 socket 设置成非阻塞的?
one thread one loop 思想

拓展阅读

其实还没来得及读
[知乎回答] Nginx为啥使用ET模式Epoll?

参考文章(epoll)

  1. epoll边缘触发模式
  2. [内核源码] epoll 实现原理
  3. [内核源码] epoll lt / et 模式区别
  4. [知乎回答] socket的任意event都会导致epoll_wait的惊群效应吗?
  5. epoll的边沿触发模式(ET)真的比水平触发模式(LT)快吗?(当然LT模式也使用非阻塞IO,重点是要求ET模式下的代码不能造成饥饿)
  6. 如果这篇文章说不清epoll的本质,那就过来掐死我吧! (2)
  7. epoll的使用实例
  8. Is it possible (and safe) to make an accepting socket non-blocking?
  9. epoll_ctl(2) — Linux manual page

Reactor 与 Proactor

Reactor

篇幅较短,就不新写一篇了

单Reactor单线程,同一个线程既负责阻塞在epoll_wait上,也负责事件到来后的处理

1
2
3
4
5
6
7
8
9
10
while True:
epoll_wait()
for fd in fds: # fds that has event
if fd == listenfd:
acceptfd = accept(listenfd)
epoll_ctl(epfd, ADD, acceptfd)
else:
data = read(fd)
resp = handlerDict[fd].handleFunc(data)
write(fd, resp)

单Reactor多线程:Reactor线程只处理连接事件和读写事件,业务处理交给线程池处理[2]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
while true:
epoll_wait()
for fd in fds: # fds that has event
if fd == listenfd:
acceptfd = accept(listenfd)
epoll_ctl(epfd, ADD, acceptfd)
else:
data = read(fd)
threadPool.submit(Task(fd, data))

class Task:
@Override
def run(Runnable r):
result = handlerDict[self.fd].handleFunc(self.data)
# 我的理解,通过某种机制通知reactor,通过epoll_ctl监听fd可写事件(EPOLLOUT),将resp返回给client

主从Reactor多线程:认为accept连接比处理数据ready(准备好被读)要重要,因此单独一个线程只处理listenfd,accept后加入其他线程的epfd,其余逻辑同 单Reactor多线程

Proactor

TODO 听说是基于AIO的

参考文章(Reactor与Proactor)

  1. Reactor模式
  2. Reactor线程模型 - 每天晒白牙的文章 - 知乎

select poll epoll 源码阅读
https://vicety.github.io/2022/08/18/select-poll-epoll/
作者
vicety
发布于
2022年8月18日
许可协议