从jdk的nio到epoll源码与实现内幕全面解析原创
最近笔者在研究java的nio部分代码时,看到在java的nio中关于事件类型的设计有四个分别是:
- OP_ACCEPT
- OP_READ
- OP_WRITE
- OP_CONNECT
然而nio的底层在Linux的实现是epoll,在epoll的模型中事件类型相对比较多分别是:
- POLLIN
- POLLOUT
- POLLERR
- POLLHUP
- POLLNVAL
- POLLREMOVE
当时笔者就有一个疑惑,为什么在epoll中没有关于OP_ACCEPT相关的事件,那么OP_ACCEPT在epoll中又意味着什么
怀着这样的疑问,笔者开始对源码进行解析,想一探究竟,本篇博客,我们就一起学习下关于java的nio到epoll具体实现的源码。
我们先从java服务端nio代码,一般在java中使用nio编程时都会使用这样的代码:
//当我们在java中使用nio编程时,一般会使用这几行代码在服务端开启一个selector和channel对特定端口进行监听
//笔者省略了一些关于selectionKey的循环处理逻辑
Selector selector = Selector.open();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress(bindIp, port), 1024);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
selector.select(1000L)
Set<SelectionKey> keys = selector.selectedKeys()
1.selector.open():
我们来一行一行分析,先看selector.open(),这一行会创建一个selector,我们一起看下selector的本质是什么:
//这个方法是开启一个Selector
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
//默认情况下使用DefaultSelectorProvider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
//linux环境下DefaultSelectorProvider的create方法
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
//可以看出在linux环境下使用的是EPollSelectorProvider
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}
//EPollSelectorProvider的openSelector()方法
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
EPollSelectorImpl从名字可以看出,nio的在Linux系统上的实现果然是epoll,我们看下EPollSelectorImpl类的构造方法:
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
//创建一个pipe通道,返回fd文件句柄,用于实现超时机制
long pipeFds = IOUtil.makePipe(false);
//分别保存输入和输出的句柄
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
//创建epoll包装类
pollWrapper = new EPollArrayWrapper();
//这里是初始化中断,后面用于实现超时机制
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
}
这里我们看到在selector中实际是创建了一个epoll的包装类,我们先着看这个包装类构造方法做了什么:
//EPollArrayWrapper构造方法的
EPollArrayWrapper() throws IOException {
//创建epoll文件描述符 epollCreate()是native方法
epfd = epollCreate();
//申请一个数组,用于保存epoll_event的数组
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
//这里保存了这个数组的地址用于将其传给epoll,epoll会将准备好的事件复制到数组中
pollArrayAddress = pollArray.address();
//用于保存高位channel绑定的socket文件句柄和感兴趣事件的映射
//EPollArrayWrapper中还有用于保存映射关系的低位数组,两者作用是一样的
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}
//epollCreate的native方法
private native int epollCreate();
我们看下创建epoll的的native方法,这个方法的实现在EPollArrayWrapper.c中,是由c语言写的:
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
//可以看到简单干脆,调用了epoll_create方法,并直接返回epoll的文件句柄
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}
到这里我们可以看出java中的selector在linux上其实其本质就是一个epoll,当然以笔者看源码的习惯是不会满足于这里,我们还要继续往下看。
而epoll_create()方法的实现就需要从linux源码中寻找了,它在eventpoll.c中:
//来源于linux 5.14.7
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;
return do_epoll_create(0);
}
//调用这个方法
static int do_epoll_create(int flags)
{
int error, fd;
//声明一个eventpoll结构体
struct eventpoll *ep = NULL;
struct file *file;
......
//这个方法申请一个epoll
error = ep_alloc(&ep);
//之后会将epoll绑定一个文件并返回其文件句柄
......
}
这里比较关键的是eventpoll这个结构体,后面我们还会讲到,这里可以先注意下:
//这个结构体有两个比较重要的数据结构
struct eventpoll {
.....
//1.就绪队列,本质是一个链表
struct list_head rdllist;
//2.一个红黑树的root节点,用于保存需要监听的socket文件句柄
struct rb_root_cached rbr;
......
};
刚刚还有一个epoll包装类的初始化中断方法——pollWrapper.initInterrupt(fd0, fd1);,这个方法比较简单,我们来看下:
//传入的是申请的pipe的输入和输出文件句柄
void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1;
incomingInterruptFD = fd0;
//这个方法会往epoll中注册事件,这里我们先跳过后面还会讲到
//这里将中断pipe的可读事件(EPOLLIN)注册到epoll中,用于进行对epoll中断
//原理是当pipe有可读事件时会进行中断,从而实现超时机制
epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
2.ServerSocketChannel.open():
让我们继续分析java的Nio编程步骤,这一行将会创建一个serverSocketChannel,我们一起看下chanel的本质是什么:
public static ServerSocketChannel open() throws IOException {
//SelectorProvider.provider()前面提到了会返回DefaultSelectorProvider
return SelectorProvider.provider().openServerSocketChannel();
}
//我们看下linux下DefaultSelectorProvider的openServerSocketChannel()方法
public SocketChannel openSocketChannel() throws IOException {
return new SocketChannelImpl(this);
}
//这里比较简单我们直接看SocketChannelImpl的构造方法
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
//这个方法最后会创建一个socket套接字并返回fd
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
//设置状态,正在使用
this.state = ST_INUSE;
}
我们可以看到channel的本质其实是封装了一个socket套接字绑定的文件句柄。
3.serverChannel.bind():
接着往下看,serverChannel.bind()这个方法是用来绑定端口号的,其内容也比较简单:
//ServerSocketChannelImpl.bind()方法实现
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
synchronized (lock) {
......
SecurityManager sm = System.getSecurityManager();
if (sm != null)
sm.checkListen(isa.getPort());
//绑定端口号钩子,默认是个空方法
NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
//将之前绑定的文件句柄绑定端口号,并开始监听
Net.bind(fd, isa.getAddress(), isa.getPort());
Net.listen(fd, backlog < 1 ? 50 : backlog);
synchronized (stateLock) {
localAddress = Net.localAddress(fd);
}
}
return this;
}
4.serverChannel.register()
继续往下看,这个方法比较重要,是进行注册事件的方法,最终会调用其父类AbstractSelectableChannel的方法:
//最终会调用这个方法,第三个参数是额外对象,在这里传入的为Null
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {
//我们跳过前面一些判断
if (!isOpen())
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (blocking)
throw new IllegalBlockingModeException();
//先查找是否已经在selector上注册过并有key
SelectionKey k = findKey(sel);
//如果key不为空直接注册感兴趣的事件
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
//直接看这里进行事件注册,并返回一个selectionKey
k = ((AbstractSelector)sel).register(this, ops, att);
//将key加入key数组
addKey(k);
}
}
return k;
}
}
//register会调用SelectorImpl的实现方法
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
//创建一个SelectionKey
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {
//调用实现类的register方法
implRegister(k);
}
//注册感兴趣事件
k.interestOps(ops);
return k;
}
这里出现两个方法 implRegister()和k.interestOps(),我们按顺序看下:
//在linux下会调用前面提到的EPollSelectorImpl的实现方法
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
//获取刚刚的ServerSocketChannel
SelChImpl ch = ski.channel;
//这里获取的fd是chnnal关联的socket的fd
int fd = Integer.valueOf(ch.getFDVal());
//添加fd与key的映射
fdToKey.put(fd, ski);
//将文件句柄加入epoll包装类
pollWrapper.add(fd);
//加入到selector上注册的keys集合
//解释下keys:一个SelectionKey的Set集合,即selector.selectedKeys()方法返回的集合
//一般用于遍历并判断key感兴趣的事件是否可以处理
keys.add(ski);
}
//看下pollWrapper.add(fd)的方法
void add(int fd) {
synchronized (updateLock) {
//先更新事件强制为0,防止其被之前的注册kill掉
setUpdateEvents(fd, (byte)0, true);
}
}
//更新事件,方法就是将fd和感兴趣的事件进行映射,保存在之前我们提到的eventsLow(低位数组)和eventsHigh中
private void setUpdateEvents(int fd, byte events, boolean force) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if (!isEventsHighKilled(key) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}
然后我们看看k.interestOps()的方法:
//SelectionKeyImpl的interestOps方法
public SelectionKey interestOps(int ops) {
//验证是否合法
ensureValid();
return nioInterestOps(ops);
}
public SelectionKey nioInterestOps(int ops) {
//这个判断是判断是否是OP_ACCEPT事件,不是则抛异常
//channel().validOps()是ServerSocketChannel只会返回OP_ACCEPT事件
// & ~是位运算的符号
if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException();
//翻译并对事件进行注册
channel.translateAndSetInterestOps(ops, this);
interestOps = ops;
return this;
}
看到这里大家可能会有一个疑问,为什么要翻译事件呢?其实就像本文开头所讲的,epoll的事件和nio中设计的事件并不是一一对应的,所以需要加上这一步将nio的事件进行翻译处理。
来看看这个翻译方法是如何处理的:
//ServerSocketChannelImpl的方法
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
int newOps = 0;
// 翻译 ops 如果为OP_ACCEPT则替换成POLLIN
if ((ops & SelectionKey.OP_ACCEPT) != 0)
newOps |= PollArrayWrapper.POLLIN;
// 将ops添加到selector的epoll_event数组中
sk.selector.putEventOps(sk, newOps);
}
//调用EpollSelectorImpl的方法
public void putEventOps(SelectionKeyImpl ski, int ops) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
//注册感兴趣事件
//第一个参数是channel关联的socket的文件描述符
//第二个参数是刚刚翻译的POLLIN
pollWrapper.setInterest(ch.getFDVal(), ops);
}
//将事件加入注册数组
void setInterest(int fd, int mask) {
synchronized (updateLock) {
......
//添加文件描述符到数组updateDescriptors
//这里同时将updateCount更新数量增加
updateDescriptors[updateCount++] = fd;
//转换为byte
byte b = (byte)mask;
assert (b == mask) && (b != KILLED);
//更新事件,这个方法刚刚提到了,之前强制更新为0,现在添加fd 和event的映射关系到数组
setUpdateEvents(fd, b, false);
}
}
5.selector.select():
继续往下走,我们看这个select()方法:
//selector.select方法最后调用
public int select(long timeout)
throws IOException
{
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect((timeout == 0) ? -1 : timeout);
}
private int lockAndDoSelect(long timeout) throws IOException {
synchronized (this) {
if (!isOpen())
throw new ClosedSelectorException();
synchronized (publicKeys) {
synchronized (publicSelectedKeys) {
return doSelect(timeout);
}
}
}
}
//linux环境下调EPollSelectorImpl的doSelect方法
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
//这个方法会执行将取消队列中的取消key的请求
//本文这里就不展开
processDeregisterQueue();
try {
//这个方法会在当前线程上注册一个钩子,保证线程在执行的时候如果被中断则进行唤醒
begin();
//涉及epoll实现的方法,我们进去看下
pollWrapper.poll(timeout);
} finally {
//这个方法会删除begin()方法注册的钩子
end();
}
processDeregisterQueue();
//这个方法会将pollArray数组中的已经完成的事件取出,找到对应的Key将其
//更新前面提到的SelectionKey的Set集合,即selector.selectedKeys()方法返回的集合
int numKeysUpdated = updateSelectedKeys();
//如果是被中断则清除中断标记,这里的中断是之前提到的利用pipe实现的在中断
if (pollWrapper.interrupted()) {
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
pollWrapper.poll()这个方法内容比较多:
//调用EpollArrayWrapper的poll()方法
int poll(long timeout) throws IOException {
//更新注册信息
updateRegistrations();
//native方法epollWait()
//pollArrayAddress是发生事件的数组地址,epoll会把发生的事件复制到数组中
//updated设置为要处理的事件数
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
//这里判断是否是中断pipe的fd,如果是则直接break跳出循环
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
这里又是两个分支:updateRegistrations()和epollWait(),我们按顺序分析,先看updateRegistrations():
//更新注册信息
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
//从更新文件描述符数组中获取文件描述符(刚刚我们已经将fd添加进去)
int fd = updateDescriptors[j];
//从事件数组中根据描述符获取事件集合
short events = getUpdateEvents(fd);
//registered是一个bitMap跟踪是否文件描述符被epoll注册
boolean isRegistered = registered.get(fd);
int opcode = 0;
if (events != KILLED) {
//如果已经注册过且不为0则为修改,如果为0则为删除
if (isRegistered) {
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
//如果没注册过且不为0则为添加
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
//调用native方法注册epoll,我们又碰到这个方法,这次我们直接来看下
epollCtl(epfd, opcode, fd, events);
//更新registered位图
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
}
关于epollCtl()刚刚笔者已经提到了,现在我们稍微介绍下,epollCtl()底层调用了epoll_ctl方法是用来在epoll上注册事件的,其有四个参数其意义分别是:
第一个参数是epoll_create()的返回值,epoll绑定的文件句柄
第二个参数表示要操作的fd在epoll监听的动作,一共有三种类型:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的fd(可以是socket套接字的句柄,也可以是pipe通道的句柄,之前的中断pipe就是通道句柄)
第四个参数是告诉内核需要监听fd的事件类型,一共有以下几种类型(即本文开头笔者提到的事件类型):
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
这里代码简单贴了出来,大家有兴趣的话可以去eventpoll.c中自行搜索查看:
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
struct epoll_event epds;
if (ep_op_has_event(op) &&
//将epoll_event事件从用户态copy到内核态
copy_from_user(&epds, event, sizeof(struct epoll_event)))
return -EFAULT;
return do_epoll_ctl(epfd, op, fd, &epds, false);
}
//这个方法比较复杂,笔者忽略了部分代码
int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,
bool nonblock)
{
.....
error = -EINVAL;
//判断不同的epoll_ctl动作
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds->events |= EPOLLERR | EPOLLHUP;
//这个方法会将要监听的fd(之前绑定socket)加入epoll的红黑树中
error = ep_insert(ep, epds, tf.file, fd, full_check);
} else
error = -EEXIST;
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
if (!(epi->event.events & EPOLLEXCLUSIVE)) {
epds->events |= EPOLLERR | EPOLLHUP;
error = ep_modify(ep, epi, epds);
}
} else
error = -ENOENT;
break;
}
.....
}
注:这里关于ep_insert()这个方法笔者就不继续展开,这个方法将对应的fd添加到epoll的红黑树中,并在添加一个回调(叫做ep_poll_callback)绑定在epoll事件发生后,这个回调会在epoll事件发生后将对应的事件放到epoll的准备就绪链表上,并唤醒epoll中的所有等待队列。
代码执行到这里,我们需要监听的所有socoket的fd已经被添加到epoll的红黑树中了,并且当产生对应感兴趣的事件后,会调用我们添加的回调方法,将对应事件复制到epoll的就绪链表中,所以接下来epoll要做的就是监听这个就绪链表,这也就是epollWait()方法的内容,我们一起来看下:
//实现一样是在linux源码中,最后会调用这个方法
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, struct timespec64 *to)
{
......
//这个方法会先将一个临时空节点加入就绪队列rdllist中
//之后将就绪队列从eventpoll中卸载,只从临时空节点开始遍历获取所有的就绪事件,
//并将其从内核态copy到用户态,copy到之前传入的事件数组pollArray中(之前传入了pollArray的地址)
//我们就不继续展开
error = ep_poll(ep, events, maxevents, to);
......
}
到这里pollArray中就有已经就绪的地址了,之后会调用updateSelectedKeys()这个方法(之前提到过的方法)将其复制到key的集合中,以便于我们获取:
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
//这个方法会将pollArray数组中的已经完成的事件取出,找到对应的Key将其
//更新前面提到的SelectionKey的Set集合,即selector.selectedKeys()方法返回的集合
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
private int updateSelectedKeys() {
//获取已经就绪的事件数
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
//这个方法是从epollArray中获取就绪的事件关联的fd句柄
int nextFD = pollWrapper.getDescriptor(i);
//根据fd句柄获取映射的key
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
if (ski != null) {
//这个方法是从epollArray中获取就绪的事件类型
int rOps = pollWrapper.getEventOps(i);
//翻译事件并将其加入selectedkeys集合中
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}
6.selector.selectedKeys():
这个方法就比较简单了,单纯的返回selectedKeys的集合:
public Set<SelectionKey> selectedKeys() {
if (!this.isOpen() && !Util.atBugLevel("1.4")) {
throw new ClosedSelectorException();
} else {
return this.publicSelectedKeys;
}
}
7.总结:
本篇博客我们从java的nio开始一起学习了epoll的实现和执行原理,笔者在这里再根据一次网络请求画一张图,方便大家更好理解epoll的模型:
最后,由于笔者水平有限,如有错误欢迎大家指出,互相学习交流。