性能文章>【全网首发】Tomcat连接之KeepAlive逻辑分析>

【全网首发】Tomcat连接之KeepAlive逻辑分析原创

416156

背景介绍

我们的系统运行在阿里云上,负载均衡使用的SLB,应用运行环境使用的EDAS,Servlet容器使用的是taobao-tomcat-7.0.59。我们期望在停止应用之前,能够将已经与tomcat建立的连接安全的关闭掉,然后结合SLB的健康检查机制,实现停止应用前优雅的摘流能力,这样在应用停止的时候就不会有任何流量损失了。
但是,在将SLB配置为TCP监听,客户端通过HTTP KeepAlive的方式访问Tomcat的场景下,即使SLB已经检测到后端服务不健康,SLB依然会将已建立连接上的请求转发到不健康的后端服务上,导致无法优雅的摘流。
下面从Tomcat的角度,分析一下Tomcat如何管理HTTP KeepAlive连接的。

分析过程

两个参数

对于一个HTTP连接的管理,有以下两个方面需要考虑:

  • 如果一个HTTP连接建立之后,很少有报文进行交互,长时间不销毁是对资源的浪费
  • 如果一个HTTP连接建立之后,有大量的报文进行交互,可能会造成负载不均或其他问题

在Tomcat中控制以上两个行为的参数分别是keepAliveTimeout和maxKeepAliveRequests。
keepAliveTimeout

The number of milliseconds this Connector will wait for another HTTP request before closing the connection. The default value is to use the value that has been set for the connectionTimeout attribute. Use a value of -1 to indicate no (i.e. infinite) timeout.
此连接器在关闭连接之前等待另一个HTTP请求的毫秒数。默认值是使用为connectionTimeout属性设置的值。使用-1值表示没有(即无限)超时。

maxKeepAliveRequests

The maximum number of HTTP requests which can be pipelined until the connection is closed by the server. Setting this attribute to 1 will disable HTTP/1.0 keep-alive, as well as HTTP/1.1 keep-alive and pipelining. Setting this to -1 will allow an unlimited amount of pipelined or keep-alive HTTP requests. If not specified, this attribute is set to 100.
在服务器关闭连接之前可以流水线处理的HTTP请求的最大数量。将此属性设置为1将禁用HTTP/1.0的keep-alive,以及HTTP/1.1的keep-alive和pipelining。将其设置为-1将允许无限数量的管道或保持活动的HTTP请求。如果未指定,则将此属性设置为100。

通过arthas mbean命令查看Tomcat配置:
image.png
通过以上分析可知,keepAliveTimeout配置为15000毫秒,maxKeepAliveRequests配置为100,也就是说当HTTP连接空闲15000毫秒或者HTTP连接收到请求数量大于等于100的时候,Tomcat会关闭该HTTP连接。
下面分析Tomcat中HTTP KeepAlive的相关逻辑。

KeepAlive逻辑分析

以下从【建立HTTP连接】—>【解析请求】—>【准备响应】—>【关闭连接】进行分析。

建立HTTP连接

Socket acceptor thread,org.apache.tomcat.util.net.NioEndpoint$Acceptor,相关逻辑如下:

protected class Acceptor extends AbstractEndpoint.Acceptor {
    @Override
    public void run() {
        while (running) {
            ......
            SocketChannel socket = serverSock.accept();
            ......
            if (running && !paused) {
                if (!setSocketOptions(socket)) {
                    ......
                }
            } else {
               ......
            }
        }
    }
}

protected boolean setSocketOptions(SocketChannel socket) {
	......
	NioChannel channel = nioChannels.poll();
	......
	//将NioChannel交给Socket poller thread处理
	getPoller0().register(channel);
}

Socket poller thread主要逻辑如下:

public class Poller implements Runnable {
	public void register(final NioChannel socket) {
		socket.setPoller(this);
		// KeyAttachment继承了SocketWrapper
		KeyAttachment key = keyCache.poll();
		final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
		... ...
		// 将MaxKeepAliveRequests关联到socket上
		ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
		... ...
		PollerEvent r = eventCache.poll();
		... ...
		addEvent(r);
	}
	@Override
	public void run() {
		while (true) {
			... ...
			Iterator<SelectionKey> iterator =
			keyCount > 0 ? selector.selectedKeys().iterator() : null;
			while (iterator != null && iterator.hasNext()) {
				SelectionKey sk = iterator.next();
				KeyAttachment attachment = (KeyAttachment)sk.attachment();
				// Attachment may be null if another thread has called
				// cancelledKey()
				if (attachment == null) {
					iterator.remove();
				} else {
					attachment.access();
					iterator.remove();
					// 处理Socket上的事件
					processKey(sk, attachment);
				}
			}
			... ...
		}
	}
	protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
		... ...
		if (sk.isReadable()) {
			if (!processSocket(channel, SocketStatus.OPEN_READ, true)) {
				closeSocket = true;
			}
		}
		if (!closeSocket && sk.isWritable()) {
			if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) {
				closeSocket = true;
			}
		}
		... ...
	}
	public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
		... ...
		KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
		... ...
		SocketProcessor sc = processorCache.poll();
        if ( sc == null ) sc = new SocketProcessor(socket,status);
        else sc.reset(socket,status);
		// 从Socket poller thread切换到Worker Threads pool
        if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
        else sc.run();
		... ...
	}
}

流程由Socket poller thread切换到了Worker threads pool SocketProcessor,Worker threads pool是用来处理业务逻辑的线程池,接下来分析Tomcat如何解析请求的。

解析请求

SocketProcessor会流转到org.apache.coyote.http11.AbstractHttp11Processor.process中,解析请求相关逻辑如下:

public SocketState process(SocketWrapper<S> socketWrapper) throws IOException {
    ... ...
    keepAlive = true;
    // 如果当前处理业务的线程占总线程的比例超过disableKeepAlivePercentage(默认:75),
    // 则将socket可接收的请求数设置为0
    if (disableKeepAlive()) {
        socketWrapper.setKeepAliveLeft(0);
    }
    ... ...
    prepareRequest();
    ... ...
    if (maxKeepAliveRequests == 1) {
        keepAlive = false;
    } else if (maxKeepAliveRequests > 0 && socketWrapper.decrementKeepAlive() <= 0) {
        // 关键逻辑,socket已接收的请求数量是否超过配置值
        keepAlive = false;
    }
	... ...
	// 调用Servlet service方法
	adapter.service(request, response);
	... ...
}
@Override
protected boolean disableKeepAlive() {
    int threadRatio = -1;   
    // These may return zero or negative values
    // Only calculate a thread ratio when both are >0 to ensure we get a
    // sensible result
    int maxThreads, threadsBusy;
    if ((maxThreads = endpoint.getMaxThreads()) > 0
        && (threadsBusy = endpoint.getCurrentThreadsBusy()) > 0) {
        threadRatio = (threadsBusy * 100) / maxThreads;
    }
    // Disable keep-alive if we are running low on threads      
    if (threadRatio > getDisableKeepAlivePercentage()) {     
        return true;
    }

    return false;
}
protected void prepareRequest() {
    ... ...
    // 通过HTTP协议版本进行判断
    MessageBytes protocolMB = request.protocol();
    if (protocolMB.equals(Constants.HTTP_11)) {
        // HTTP 1.1为keepalive
        ......
    } else if (protocolMB.equals(Constants.HTTP_10)) {
        // HTTP 1.0为非keepalive
        keepAlive = false;
        ......
    } else if (protocolMB.equals("")) {
        // HTTP 0.9为非keepalive
        ......
        keepAlive = false;
    } else {
        // Unsupported protocol
        http11 = false;
        // Send 505; Unsupported HTTP version
        response.setStatus(505);
        ......
    }
	// 通过HTTP Header Connection进行判断
    // 当Connection : close,keepAlive为false
    // 当Connection : keep-alive,keepAlive为true
    MessageBytes connectionValueMB = headers.getValue(Constants.CONNECTION);
    if (connectionValueMB != null) {
        ByteChunk connectionValueBC = connectionValueMB.getByteChunk();
        if (findBytes(connectionValueBC, Constants.CLOSE_BYTES) != -1) {
            keepAlive = false;
        } else if (findBytes(connectionValueBC,
                             Constants.KEEPALIVE_BYTES) != -1) {
            keepAlive = true;
        }
    }
	... ...
}

准备响应

不同的响应也会影响到keepAlive的设置,下面是相关逻辑:

public SocketState process(SocketWrapper<S> socketWrapper) throws IOException {
	... ...
	// 调用Servlet service方法,其中会调用到prepareResponse()
	adapter.service(request, response);
	... ...
	if (breakKeepAliveLoop(socketWrapper)) {
       break;
    }
	... ...
}

private void prepareResponse() {
	boolean entityBody = true;
    contentDelimitation = false;
    int statusCode = response.getStatus();
    if (statusCode < 200 || statusCode == 204 || statusCode == 205 || statusCode == 304){
        // No entity body
        getOutputBuffer().addActiveFilter(outputFilters[Constants.VOID_FILTER]);
        entityBody = false;
        contentDelimitation = true;
    }

    MessageBytes methodMB = request.method();
    if (methodMB.equals("HEAD")) {
        // No entity body
        getOutputBuffer().addActiveFilter(outputFilters[Constants.VOID_FILTER]);
        contentDelimitation = true;
    }
    ... ...
    if ((entityBody) && (!contentDelimitation)) {
        // Mark as close the connection after the request, and add the
        // connection: close header
        keepAlive = false;
    }
    // This may disabled keep-alive to check before working out the
    // Connection header.
    checkExpectationAndResponseStatus();
    // If we know that the request is bad this early, add the
    // Connection: close header.
    keepAlive = keepAlive && !statusDropsConnection(statusCode);
    if (!keepAlive) {
        // Avoid adding the close header twice
        if (!connectionClosePresent) {
            headers.addValue(Constants.CONNECTION).setString(Constants.CLOSE);
        }
    } else if (!http11 && !getErrorState().isError()) {
        headers.addValue(Constants.CONNECTION).setString(Constants.KEEPALIVE);
    }
    ... ...
}
/**
 * Determine if we must drop the connection because of the HTTP status
 * code.  Use the same list of codes as Apache/httpd.
 */
protected boolean statusDropsConnection(int status) {
    return status == 400 /* SC_BAD_REQUEST */ ||
           status == 408 /* SC_REQUEST_TIMEOUT */ ||
           status == 411 /* SC_LENGTH_REQUIRED */ ||
           status == 413 /* SC_REQUEST_ENTITY_TOO_LARGE */ ||
           status == 414 /* SC_REQUEST_URI_TOO_LONG */ ||
           status == 500 /* SC_INTERNAL_SERVER_ERROR */ ||
           status == 503 /* SC_SERVICE_UNAVAILABLE */ ||
           status == 501 /* SC_NOT_IMPLEMENTED */;
}
@Override
protected boolean breakKeepAliveLoop(SocketWrapper<Socket> socketWrapper) {
    openSocket = keepAlive;
    // If we don't have a pipe-lined request allow this thread to be
    // used by another connection
    if (inputBuffer.lastValid == 0) {
        return true;
    }
    return false;
}

关闭连接

给客户端发送完响应就是判断是否关闭Socket的逻辑了:

public SocketState process(SocketWrapper<S> socketWrapper) throws IOException {
	... ...
	if (breakKeepAliveLoop(socketWrapper)) {
       break;
    }
	... ...
    if (getErrorState().isError() || endpoint.isPaused()) {
        return SocketState.CLOSED;
    } else if (isAsync() || comet) {
        return SocketState.LONG;
    } else if (isUpgrade()) {
        return SocketState.UPGRADING;
    } else if (getUpgradeInbound() != null) {
        return SocketState.UPGRADING_TOMCAT;
    } else {
        if (sendfileInProgress) {
            return SocketState.SENDFILE;
        } else {
            // 从上面分析可知,openSocket==keepAlive,
            // 所以当openSocket=false的时候,keepAlive=false,此时SocketState为CLOSED
            if (openSocket) {
                if (readComplete) {
                    return SocketState.OPEN;
                } else {
                    return SocketState.LONG;
                }
            } else {
                return SocketState.CLOSED;
            }
        }
    }
}

以上process方法执行完后,最终会返回到SocketProcessor.run中,逻辑如下:

public void run() {
    boolean launch = false;
    synchronized (socket) {
        try {
            SocketState state = SocketState.OPEN;
            ... ...
            state = handler.process(socket,status);
            // 当keepAlive=false的时候,关闭Socket
            if (state == SocketState.CLOSED) {
                // Close socket
                ... ...
                try {
                    socket.getSocket().close();
                } catch (IOException e) {
                    // Ignore
                }
            } else if (state == SocketState.OPEN ||
                    state == SocketState.UPGRADING ||
                    state == SocketState.UPGRADING_TOMCAT  ||
                    state == SocketState.UPGRADED){
                ... ...
                launch = true;
            } else if (state == SocketState.LONG) {
                socket.access();
                waitingRequests.add(socket);
            }
        }finally {
            if (launch) {
                try {
                    getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
                } catch (RejectedExecutionException x) {
                    ... ...
                }
            }
    }
}

总结

从以上分析可知,keepAlive既与请求有关,也与响应有关。
解析请求的时候,会根据系统繁忙程度、使用的HTTP协议版本,HTTP Header Conncetion及Socket接收的请求数量有关;
准备响应的时候,会根据业务返回的状态码,系统异常等情况进行判断,当Socket状态为SocketState.CLOSED的时候则会关闭连接。
通过以上分析,想必大家已经知道怎么优雅的关闭HTTP连接了。

参考资料

GitHub - apache/tomcat: Apache Tomcat

点赞收藏
分类:标签:
大禹的足迹

在阿里搬了几年砖的大龄码农,头条号:大禹的足迹

请先登录,查看5条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步

为你推荐

Redis stream 用做消息队列完美吗?

Redis stream 用做消息队列完美吗?

Netty源码解析:writeAndFlush

Netty源码解析:writeAndFlush

6
5