【译】ExecutorService、RxJava、Disruptor 和Akka等并发框架的对比转载
几年前NoSQL 流行时,就像其他团队一样,我们的团队也对新的东西充满热情,于是计划在其中一个程序中试一试。但当深入到实现的细节时,意识到了一句话——“魔鬼在细节”。事实证明NoSQL 并不是解决所有问题的灵丹妙药。
对于 NoSQL VS RDMS : “适合最重要!”
类似地这几年,RxJava 和 Spring Reactor 等并发框架也成为趋势,还有异步、非阻塞的方法也一样。为了不再犯同样的错误,我们常识评估ExecutorService、RxJava、Disruptor 和 Akka 等并发框架有何不同,以及如何为各自的框架确定正确的用例。
本文中使用的术语在 此处进行了更详细的描述。
分析并发框架的示例用例
简单回顾线程配置
在比较并发框架之前,让我们快速回顾一下如何配置最佳线程数以提高并行任务的性能。该理论适用于所有框架,并且所有框架都使用相同的线程配置来衡量性能。
- 对于内存中的任务,线程数大约等于具有最佳性能的内核数,尽管它可能会根据各自处理器中的超线程特性而有所变化。
- 例如,在 8 核机器中,如果对应用程序的每个请求必须并行执行四个内存中任务,那么这台机器上的负载应该保持在 2 req/sec 和 8 个线程
ThreadPool
。
- 例如,在 8 核机器中,如果对应用程序的每个请求必须并行执行四个内存中任务,那么这台机器上的负载应该保持在 2 req/sec 和 8 个线程
- 对于 I/O 任务,配置的线程数
ExecutorService
应该基于外部服务的延迟。- 与内存中的任务不同的是,I/O 任务中涉及的线程会被阻塞并处于等待状态,直到外部服务响应或超时。因此,当涉及 I/O 任务时,由于线程被阻塞,应增加线程数以处理来自并发请求的额外负载。
- I/O 任务的线程数应该保守地增加,因为许多线程处于 Active 状态会带来上下文切换的成本,这会影响应用程序的性能。为了避免这种情况,这台机器的确切线程数和负载应该与 I/O 任务中涉及的线程的等待时间成比例地增加。
参考:http ://baddotrobot.com/blog/2013/06/01/optimum-number-of-threads/
性能结果对比
性能测试运行在 GCP -> 处理器型号名称:Intel(R) Xeon(R) CPU @ 2.30GHz;
架构:x86_64;
核心数:8
(注意:这些结果对这个用例是主观的,并不意味着一个框架比另一个更好)。
标签 | # 请求 | I/O 任务的线程池大小 | 以毫秒为单位的平均延迟(50 个请求/秒) |
所有操作都按顺序排列 | ~10000 | 不适用 | ~2100 |
使用 Executor Service 并行化 IO 任务并使用 HTTP 线程执行内存中的任务 | ~10000 | 16 | ~1800 |
使用 Executor Service (Completable Future) 并行化 IO 任务,并将 HTTP 线程用于内存中的任务 | ~10000 | 16 | ~1800 |
使用 ExecutorService 并行化所有任务,并用于@Suspended AsyncResponse response 以非阻塞方式发送响应 |
~10000 | 16 | ~3500 |
使用 Rx-Java 执行所有任务并使用@Suspended AsyncResponse response 以非阻塞方式发送响应 |
~10000 | 不适用 | ~2300 |
使用 Disruptor 框架并行化所有任务(Http 线程将被阻塞) | ~10000 | 11 | ~3000 |
使用 Disruptor 框架并行化所有任务,并用于@Suspended AsyncResponse response 以非阻塞方式发送响应 |
~10000 | 12 | ~3500 |
使用 Akka 框架并行化所有任务(Http 线程将被阻塞) | ~10000 | ~3000 |
使用 Executor Service 并行化 IO 任务
什么时候使用?
如果应用程序部署在多个节点中,并且每个节点中的 req/sec 小于可用内核数, ExecutorService
则可用于并行化任务并更快地执行代码。
什么时候不使用?
如果一个应用程序部署在多个节点中,并且每个节点中的 req/sec 远远高于可用的核心数,那么使用 ExecutorService
进一步并行化只会让事情变得更糟。
外部服务延迟增加到 400 毫秒时的性能结果(8 核机器中的请求速率@50 req/sec)。
标签 | # 请求 | I/O 任务的线程池大小 | 以毫秒为单位的平均延迟(50 个请求/秒) |
所有操作都按顺序排列 | ~3000 | 不适用 | ~2600 |
使用 Executor Service 并行化 IO 任务并使用 HTTP 线程执行内存中的任务 | ~3000 | 24 | ~3000 |
所有任务按顺序执行时的示例:
// I/O tasks : invoke external services
String posts = JsonService.getPosts();
String comments = JsonService.getComments();
String albums = JsonService.getAlbums();
String photos = JsonService.getPhotos();
// merge the response from external service
// (in-memory tasks will be performed as part this operation)
int userId = new Random().nextInt(10) + 1;
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
// build the final response to send it back to client
String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
return response;
I/O 任务与 ExecutorService 并行执行时的代码示例
使用 Executor Service 并行化 IO 任务(CompletableFuture)
这与上述情况类似:处理传入请求的 HTTP 线程将被阻塞,并 CompletableFuture
用于处理并行任务
什么时候使用?
如果没有 AsyncResponse
, 性能与 ExecutorService.
If multiple API calls has been async and if it has to be chained, 这种方法更好(类似于 Node.js 中的 Promises)。
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
// I/O tasks
CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);
CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,
ioExecutorService);
CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,
ioExecutorService);
CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,
ioExecutorService);
CompletableFuture.allOf(postsFuture, commentsFuture, albumsFuture, photosFuture).get();
// get response from I/O tasks (blocking call)
String posts = postsFuture.get();
String comments = commentsFuture.get();
String albums = albumsFuture.get();
String photos = photosFuture.get();
// merge response (in-memory tasks will be part of this operation)
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);
// Build final response to send it back to client
return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
并行化所有任务 ExecutorService
ExecutorService
使用 和并行化所有任务,以 @Suspended AsyncResponse response
非阻塞方式发送响应。
[io 与 nio]
- 传入的请求将通过事件池进行处理,并将请求传递到执行器池进行进一步处理,当所有任务完成后,来自事件池的另一个 HTTP 线程会将响应发送回客户端。(异步和非阻塞)。
- 性能下降的原因:
- 在同步通信中,虽然参与 I/O 任务的线程被阻塞了,但只要进程有额外的线程来承担并发请求的负载,进程仍然会处于运行状态。
- 因此,以非阻塞方式保持线程所带来的好处非常少,并且以这种模式处理请求所涉及的成本似乎很高。
- 通常情况下,对我们这里讨论的用例使用异步非阻塞方法会降低应用程序性能。
什么时候使用?
如果用例类似于服务器端聊天应用程序,其中线程不需要在客户端响应之前保持连接,那么异步、非阻塞方法可以优于同步通信;在这些用例中,可以通过异步、非阻塞方法更好地利用系统资源,而不仅仅是等待。
// submit parallel tasks for Async execution
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);
CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,
ioExecutorService);
CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,
ioExecutorService);
CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,
ioExecutorService);
// When /posts API returns a response, it will be combined with the response from /comments API
// and as part of this operation, some in-memory tasks will be performed
CompletableFuture<String> postsAndCommentsFuture = postsFuture.thenCombineAsync(commentsFuture,
(posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments),
ioExecutorService);
// When /albums API returns a response, it will be combined with the response from /photos API
// and as part of this operation, some in-memory tasks will be performed
CompletableFuture<String> albumsAndPhotosFuture = albumsFuture.thenCombineAsync(photosFuture,
(albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos),
ioExecutorService);
// Build the final response and resume the http-connection to send the response back to client.
postsAndCommentsFuture.thenAcceptBothAsync(albumsAndPhotosFuture, (s1, s2) -> {
LOG.info("Building Async Response in Thread " + Thread.currentThread().getName());
String response = s1 + s2;
asyncHttpResponse.resume(response);
}, ioExecutorService);
RxJava/RxNetty
- RxJava/RxNetty 组合的主要区别在于,它可以通过使 I/O 任务完全非阻塞来处理带有事件池的传入和传出请求。
- 此外,RxJava 提供了更好的 DSL 来以流畅的方式编写代码,这在本示例中可能不可见。
- 性能优于处理并行任务
CompletableFuture
什么时候使用?
如果异步、非阻塞方法适合用例,则可以首选 RxJava 或任何响应式库。它具有诸如背压之类的附加功能,可以平衡生产者和消费者之间的负载。
// non blocking API call from Application - getPosts()
HttpClientRequest<ByteBuf, ByteBuf> request = HttpClient.newClient(MOCKY_IO_SERVICE, 80)
.createGet(POSTS_API).addHeader("content-type", "application/json; charset=utf-8");
rx.Observable<String> rx1ObservableResponse = request.flatMap(HttpClientResponse::getContent)
.map(buf -> getBytesFromResponse(buf))
.reduce(new byte[0], (acc, bytes) -> reduceAndAccumulateBytes(acc, bytes))
.map(bytes -> getStringResponse(bytes, "getPosts", startTime));
int userId = new Random().nextInt(10) + 1;
// Submit parallel I/O tasks for each incoming request.
Observable<String> postsObservable = Observable.just(userId).flatMap(o -> NonBlockingJsonService.getPosts());
Observable<String> commentsObservable = Observable.just(userId)
.flatMap(o -> NonBlockingJsonService.getComments());
Observable<String> albumsObservable = Observable.just(userId).flatMap(o -> NonBlockingJsonService.getAlbums());
Observable<String> photosObservable = Observable.just(userId).flatMap(o -> NonBlockingJsonService.getPhotos());
// When /posts API returns a response, it will be combined with the response from /comments API
// and as part of this operation, some in-memory tasks will be performed
Observable<String> postsAndCommentsObservable = Observable.zip(postsObservable, commentsObservable,
(posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments));
// When /albums API returns a response, it will be combined with the response from /photos API
// and as part of this operation, some in-memory tasks will be performed
Observable<String> albumsAndPhotosObservable = Observable.zip(albumsObservable, photosObservable,
(albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos));
// build final response
Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2)
.subscribe((response) -> asyncResponse.resume(response), e -> {
LOG.error("Error", e);
asyncResponse.resume("Error");
});
Disruptor
[队列与 RingBuffer]
- 在此示例中,HTTP 线程将被阻塞,直到中断器完成任务并且
CountDownLatch
已使用 a 将 HTTP 线程与来自ExecutorService
. - 该框架的主要特点是在没有任何锁的情况下处理线程间通信;在 中
ExecutorService
,生产者和消费者之间的数据将通过队列传递,并且在Lock
生产者和消费者之间的数据传输过程中涉及到一个。Disruptor 框架Locks
借助称为 Ring Buffer 的数据结构(它是循环数组队列的扩展版本)来处理这种 Producer-Consumer 通信而无需任何处理。 - 这个库不适用于我们在这里讨论的用例。它只是出于好奇而添加的。
什么时候使用?
Disruptor 框架在与事件驱动的架构模式一起使用时以及当有一个生产者和多个消费者主要关注内存中的任务时表现更好。
static {
int userId = new Random().nextInt(10) + 1;
// Sample Event-Handler; count down latch is used to synchronize the thread with http-thread
EventHandler<Event> postsApiHandler = (event, sequence, endOfBatch) -> {
event.posts = JsonService.getPosts();
event.countDownLatch.countDown();
};
// Disruptor set-up to handle events
DISRUPTOR.handleEventsWith(postsApiHandler, commentsApiHandler, albumsApiHandler)
.handleEventsWithWorkerPool(photosApiHandler1, photosApiHandler2)
.thenHandleEventsWithWorkerPool(postsAndCommentsResponseHandler1, postsAndCommentsResponseHandler2)
.handleEventsWithWorkerPool(albumsAndPhotosResponseHandler1, albumsAndPhotosResponseHandler2);
DISRUPTOR.start();
}
// for each request, publish an event in RingBuffer:
Event event = null;
RingBuffer<Event> ringBuffer = DISRUPTOR.getRingBuffer();
long sequence = ringBuffer.next();
CountDownLatch countDownLatch = new CountDownLatch(6);
try {
event = ringBuffer.get(sequence);
event.countDownLatch = countDownLatch;
event.startTime = System.currentTimeMillis();
} finally {
ringBuffer.publish(sequence);
}
try {
event.countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
Akka
- Akka 库的主要优点是它具有构建分布式系统的原生支持。
- 它运行在一个叫做 Actor System 的系统上,它抽象了线程的概念,Actor System 中的 Actor 通过异步消息进行通信,类似于 Producer 和 Consumer 之间的通信。
- 这种额外的抽象级别有助于 Actor 系统提供容错、位置透明度等功能。
- 使用正确的 Actor-to-Thread 策略,可以优化此框架,使其性能优于上表中显示的结果。尽管它无法在单个节点上与传统方法的性能相媲美,但它仍然可以因其构建分布式和弹性系统的能力而受到青睐。
示例代码
// from controller :
Actors.masterActor.tell(new Master.Request("Get Response", event, Actors.workerActor), ActorRef.noSender());
// handler :
public Receive createReceive() {
return receiveBuilder().match(Request.class, request -> {
Event event = request.event; // Ideally, immutable data structures should be used here.
request.worker.tell(new JsonServiceWorker.Request("posts", event), getSelf());
request.worker.tell(new JsonServiceWorker.Request("comments", event), getSelf());
request.worker.tell(new JsonServiceWorker.Request("albums", event), getSelf());
request.worker.tell(new JsonServiceWorker.Request("photos", event), getSelf());
}).match(Event.class, e -> {
if (e.posts != null && e.comments != null & e.albums != null & e.photos != null) {
int userId = new Random().nextInt(10) + 1;
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, e.posts,
e.comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, e.albums,
e.photos);
String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
e.response = response;
e.countDownLatch.countDown();
}
}).build();
}
概括总结
- 根据机器负载决定 Executor 框架的配置,并根据应用程序中并行任务的数量检查是否可以进行负载均衡。如果 I/O 任务的最佳线程数计算正确完成,那么这种方法通常会在性能结果中获胜。
- 使用反应式或任何异步库会降低大多数传统应用程序的性能。仅当用例类似于服务器端聊天应用程序时,此模式才有用,其中线程无需保留连接,直到客户端响应。
- 与事件驱动架构模式一起使用时,Disruptor 框架的性能很好;但是当 Disruptor 模式与传统架构和我们在这里讨论的用例混合时,它就达不到标准了。需要注意的是,Akka 和 Disruptor 库值得单独发布一篇文章,介绍如何将它们与事件驱动的架构模式一起使用。
这篇文章的源代码可以在GitHub上找到。