【译】处理阻塞调用的两种方法:线程并发与网络异步转载
问题描述
当有客户端连接到 HTTP 服务器(或任何 TCP 服务器)并发送需要大量计算的请求时,每个请求都需要执行一些可能需要 “任意时间才能完成”代码。如果我们将这个耗时的代码隔离在一个函数中,我们就可以将此函数称为 阻塞调用。
也就是说,调用结果返回之前,当前线程会被挂起。
简单的例子是查询数据库的函数或操作大图像文件的函数。在一个连接将由其自己的专用线程处理的旧模型中,这不是问题。但是在单个线程将处理数千个连接的新反应器模型中,只需要一个连接执行阻塞调用,就会影响和阻塞所有其他连接。
那么我们如何在不回到专用线程模型的情况下解决这个问题呢?
解决方案 #1:线程并发
你基本上使用CoralQueue将请求的工作(而不是请求本身)分配给固定数量的线程,这些线程同时执行(即并行)。假设您有 1000 个同时连接。除了拥有 1000 个并发线程(即不切实际 的每个连接一个线程 模型),您可以分析您的机器有多少可用的 CPU 内核并选择更少的线程数,比如说 4。这种架构将为您提供以下优点:
- 处理 HTTP 服务器请求的关键反应器线程永远不会阻塞,因为每个请求所需的工作将简单地添加到队列中,从而释放反应器线程以处理额外的传入 HTTP 请求。
- 即使一两个线程收到一个需要很长时间才能完成的请求,其他线程也可以继续排空队列中的请求。
如果你能提前猜到哪些请求会花费很长时间执行,你甚至可以将队列划分为通道,并为高优先级/快速请求设置一个快速通道,这样它们总能找到一个空闲线程来执行。
解决方案#2:分布式系统
你可以使用分布式系统架构并利用异步网络调用,而不是在一台机器上使用有限的 CPU 内核做所有事情。这简化了处理请求的 HTTP 服务器,现在不需要任何额外的线程和并发队列。它可以在单个非阻塞反应器线程中完成所有操作。它是这样工作的:
- 您可以将此任务移动到另一个 节点 (即另一个进程或机器),而不是在 HTTP 服务器本身上进行繁重的计算。
- 您可以简单地对负责繁重计算任务的节点进行异步网络调用,而不是使用 CoralQueue 跨线程分配工作。
- HTTP 服务器将 异步等待 来自繁重计算节点的响应。响应可能需要尽可能长的时间才能通过网络到达,因为 HTTP 服务器永远不会阻塞。
- HTTP 服务器只能使用一个线程来处理来自外部客户端的传入 HTTP 连接以及到执行繁重计算工作的内部节点的传出 HTTP 连接。
- 它的美妙之处在于,您可以通过根据需要简单地添加更多节点来进行扩展。故障转移和负载平衡也变得微不足道。
现在你可能会问:我们如何为这个负责繁重计算工作的新节点实现架构?我们不只是将问题从一台机器转移到另一台机器上吗?当然不是!
HTTP 服务器不关心或不需要知道该节点将如何选择执行繁重的计算任务。它所需要的只是一对带有流水线支持的 HTTP keep-alive 连接,通过它可以发送多个异步请求。就 HTTP 服务器而言,繁重的计算节点可以使用最好或最差的架构来完成其工作。服务器将发出请求并异步等待答案。
举个例子
假设我们有一个 HTTP 服务器,它接收来自客户的股票价格请求。它知道股票价格的方式是向 GoogleFinance 发出 HTTP 请求以发现价格。如果向 Google 发出请求是一个阻塞调用(这是因为您怎么能提前知道需要多长时间才能得到响应?)我们可以使用解决方案 #1。请求将分布在并行处理它们的线程中,必要时阻塞以等待 Google 以价格响应。但是等一下, 为什么我们不能将 Google 视为分布式系统中的一个单独节点,并对其 HTTP 服务器进行异步调用呢? 那是解决方案#2,下面的代码显示了它是如何实现的:
package com.coralblocks.coralreactor.client.bench.google;
import java.nio.ByteBuffer;
import java.util.Iterator;
import com.coralblocks.coralbits.ds.IdentityMap;
import com.coralblocks.coralbits.ds.PooledLinkedList;
import com.coralblocks.coralbits.util.ByteBufferUtils;
import com.coralblocks.coralreactor.client.Client;
import com.coralblocks.coralreactor.nio.NioReactor;
import com.coralblocks.coralreactor.server.Server;
import com.coralblocks.coralreactor.server.http.HttpServer;
import com.coralblocks.coralreactor.util.Configuration;
import com.coralblocks.coralreactor.util.MapConfiguration;
public class AsyncHttpServer extends HttpServer implements GoogleFinanceListener {
// number of http clients used to connect to google
private final int numberOfHttpClients;
// the clients used to connect to google
private final GoogleFinanceClient[] googleClients;
// a list of clients waiting for responses from google (for each google http connection)
private final IdentityMap<GoogleFinanceClient, PooledLinkedList<Client>> pendingRequests;
private final StringBuilder response = new StringBuilder(1024);
private final StringBuilder symbol = new StringBuilder(32);
private final StringBuilder price = new StringBuilder(32);
public AsyncHttpServer(NioReactor nio, int port, Configuration config) {
super(nio, port, config);
this.numberOfHttpClients = config.getInt("numberOfHttpClients");
this.googleClients = new GoogleFinanceClient[numberOfHttpClients];
this.pendingRequests = new IdentityMap<GoogleFinanceClient, PooledLinkedList<Client>>(numberOfHttpClients);
MapConfiguration googleFinanceConfig = new MapConfiguration();
googleFinanceConfig.add("readBufferSize", 512 * 1024); // the html page is big...
for(int i = 0; i < googleClients.length; i++) {
googleClients[i] = new GoogleFinanceClient(nio, "www.google.com", 80, googleFinanceConfig);
googleClients[i].addListener(this);
googleClients[i].open();
pendingRequests.put(googleClients[i], new PooledLinkedList<Client>());
}
}
private CharSequence parseSymbolFromRequest(ByteBuffer request) {
// for simplicity we assume that the symbol is the request
// Ex: GET /GOOG HTTP/1.1 => the symbol is GOOG
int pos = ByteBufferUtils.positionOf(request, '/');
if (pos == -1) return null;
request.position(pos + 1);
pos = ByteBufferUtils.positionOf(request, ' ');
if (pos == -1) return null;
request.limit(pos);
symbol.setLength(0);
ByteBufferUtils.parseString(request, symbol);
return symbol;
}
@Override
protected void handleMessage(Client client, ByteBuffer msg) {
HttpAttachment a = (HttpAttachment) getAttachment(client);
ByteBuffer request = a.getRequest();
CharSequence symbol = parseSymbolFromRequest(request);
if (symbol == null) return;
long clientId = getClientId(client);
// distribute requests across our Google http clients...
int index = (int) (clientId % numberOfHttpClients);
GoogleFinanceClient googleClient = googleClients[index];
if (!googleClient.isConnectionOpen()) {
client.close();
return;
}
// send the request to google (it fully supports http pipelining)
googleClient.sendPriceRequest(symbol);
// add this client to the line of clients waiting for a response from the google http client
pendingRequests.get(googleClient).add(client);
}
@Override // from GoogleFinanceListener interface
public void onSymbolPrice(GoogleFinanceClient googleClient, CharSequence symbol, ByteBuffer priceBuffer) {
// Got a response from google, respond to the client waiting for the price...
PooledLinkedList<Client> clients = pendingRequests.get(googleClient);
Client client = clients.removeFirst();
price.setLength(0);
ByteBufferUtils.parseString(priceBuffer, price);
response.setLength(0);
response.append("HTTP/1.1 200 OK\n");
response.append("Content-Type: text/plain\n");
response.append("Server: CoralReactor\n");
response.append("Date: ").append(getDateTime()).append("\n");
response.append("Content-length: ").append(price.length()).append("\n");
response.append("\n");
response.append(price);
client.send(response);
}
@Override // from GoogleFinanceListener interface
public void onConnectionOpened(GoogleFinanceClient client) {
// NOOP
}
@Override // from GoogleFinanceListener interface
public void onConnectionTerminated(GoogleFinanceClient googleClient) {
// Our connection to google was broken... close all clients waiting on this connection...
PooledLinkedList<Client> clients = pendingRequests.get(googleClient);
Iterator<Client> iter = clients.iterator();
while(iter.hasNext()) {
Client c = iter.next();
if (c.isOpen()) c.close();
}
clients.clear();
}
public static void main(String[] args) {
int httpConnections = Integer.parseInt(args[0]);
int port = Integer.parseInt(args[1]);
NioReactor nio = NioReactor.create();
MapConfiguration config = new MapConfiguration();
config.add("numberOfHttpClients", httpConnections);
Server server = new AsyncHttpServer(nio, port, config);
server.open();
nio.start();
}
}
上面代码的优点是:
- 简单明朗。
- 它只使用一个线程,即关键反应器线程,用于所有网络活动。
- 没有多线程编程,没有阻塞,也没有并发队列。
- 您可以通过启动更多固定到另一个 CPU 内核的 HTTP 服务器来进行扩展。或者通过添加更多机器。
numberOfHttpClients
您可以通过向 google 添加更多 HTTP 客户端(即上面)来增加吞吐量 。
异步消息
如果你已经了解分布式系统的概念,那么下一步就是深入了解 基于异步消息的真正分布式系统。
不是向单个节点发出异步网络请求,而是将消息发送到分布式系统,因此任何节点都可以在必要时采取行动。并且由于异步消息通常通过 UDP 实现,因此可以构建一个真正的分布式系统。
这时候系统就能提供:
并行性(节点可以真正并行运行);
紧密集成(所有节点以相同的顺序看到相同的消息);
解耦(节点可以独立进化);
故障转移/冗余(当一个节点发生故障时,另一个节点可以运行并建立状态以立即接管);
可扩展性/负载平衡(只需添加更多节点);
弹性1(节点可以在活动高峰期滞后而不影响整个系统);
弹性2(节点可以在不关闭整个系统的情况下失败/停止工作)
结论
每个系统最终都必须执行某种需要任意时间才能完成的操作。当然多线程程序变得非常流行时,专用线程模型自然跟不上。
这时候通过使用并发队列,就能创建一个没有多线程复杂性的多线程系统,最重要的是它可以轻松扩展到数千个同时连接。
另一种解决方案,分布式系统不是使用内存中的并发队列来跨线程分配工作,而是使用网络跨节点分配工作,对这些节点进行异步网络调用。
下一个架构步骤是使用异步消息而不是网络请求来设计分布式系统。这样你就可以创建不仅易于扩展而且松耦合的程序,还能提供并行性、紧密集成、故障转移、冗余、负载平衡、弹性等优势。