13518219792

建站动态

根据您的个性需求进行定制 先人一步 抢占小程序红利时代

鸟瞰Java并发框架

1. 为什么要写这篇文章

创新互联是一家专注于成都网站设计、成都网站建设与策划设计,青原网站建设哪家好?创新互联做网站,专注于网站建设10年,网设计领域的专业建站公司;建站业务涵盖:青原等地区。青原做网站价格咨询:18982081108

几年前 NoSQL 开始流行的时候,像其他团队一样,我们的团队也热衷于令人兴奋的新东西,并且计划替换一个应用程序的数据库。但是,当深入实现细节时,我们想起了一位智者曾经说过的话:“细节决定成败”。最终我们意识到 NoSQL 不是解决所有问题的银弹,而 NoSQL vs RDMS 的答案是:“视情况而定”。类似地,去年RxJava 和 Spring Reactor 这样的并发库加入了让人充满激情的语句,如异步非阻塞方法等。为了避免再犯同样的错误,我们尝试评估诸如 ExecutorService、 RxJava、Disruptor 和 Akka 这些并发框架彼此之间的差异,以及如何确定各自框架的正确用法。

本文中用到的术语在这里有更详细的描述。

2. 分析并发框架的示例用例

3. 快速更新线程配置

在开始比较并发框架的之前,让我们快速复习一下如何配置最优线程数以提高并行任务的性能。这个理论适用于所有框架,并且在所有框架中使用相同的线程配置来度量性能。

参考: http://baddotrobot.com/blog/2013/06/01/optimum-number-of-threads/

4. 性能测试结果

性能测试配置 GCP -> 处理器:Intel(R) Xeon(R) CPU @ 2.30GHz;架构:x86_64;CPU 内核:8个(注意:这些结果仅对该配置有意义,并不表示一个框架比另一个框架更好)。

5. 使用执行器服务并行化 IO 任务

5.1 何时使用?

如果一个应用程序部署在多个节点上,并且每个节点的 req/sec 小于可用的核心数量,那么 ExecutorService 可用于并行化任务,更快地执行代码。

5.2 什么时候适用?

如果一个应用程序部署在多个节点上,并且每个节点的 req/sec 远远高于可用的核心数量,那么使用 ExecutorService 进一步并行化只会使情况变得更糟。

当外部服务延迟增加到 400ms 时,性能测试结果如下(请求速率 @50 req/sec,8核)。

5.3 所有任务按顺序执行示例

 
 
 
 
  1. // I/O 任务:调用外部服务 
  2. String posts = JsonService.getPosts(); 
  3. String comments = JsonService.getComments(); 
  4. String albums = JsonService.getAlbums(); 
  5. String photos = JsonService.getPhotos(); 
  6.  
  7. // 合并来自外部服务的响应 
  8. // (内存中的任务将作为此操作的一部分执行) 
  9. int userId = new Random().nextInt(10) + 1; 
  10. String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments); 
  11. String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos); 
  12.  
  13. // 构建最终响应并将其发送回客户端 
  14. String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser; 
  15. return response; 

5.4 I/O 任务与 ExecutorService 并行执行代码示例

 
 
 
 
  1. // 添加 I/O 任务 
  2. List> ioCallableTasks = new ArrayList<>(); 
  3. ioCallableTasks.add(JsonService::getPosts); 
  4. ioCallableTasks.add(JsonService::getComments); 
  5. ioCallableTasks.add(JsonService::getAlbums); 
  6. ioCallableTasks.add(JsonService::getPhotos); 
  7.  
  8. // 调用所有并行任务 
  9. ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize); 
  10. List> futuresOfIOTasks = ioExecutorService.invokeAll(ioCallableTasks); 
  11.  
  12. // 获取 I/O  操作(阻塞调用)结果 
  13. String posts = futuresOfIOTasks.get(0).get(); 
  14. String comments = futuresOfIOTasks.get(1).get(); 
  15. String albums = futuresOfIOTasks.get(2).get(); 
  16. String photos = futuresOfIOTasks.get(3).get(); 
  17.  
  18. // 合并响应(内存中的任务是此操作的一部分) 
  19. String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments); 
  20. String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos); 
  21.  
  22. // 构建最终响应并将其发送回客户端 
  23. return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser; 

6. 使用执行器服务并行化 IO 任务(CompletableFuture)

与上述情况类似:处理传入请求的 HTTP 线程被阻塞,而 CompletableFuture 用于处理并行任务。

6.1 何时使用?

如果没有 AsyncResponse,性能与 ExecutorService 相同。如果多个 API 调用必须异步并且链接起来,那么这种方法更好(类似 Node 中的 Promises)。

 
 
 
 
  1. ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize); 
  2.  
  3. // I/O 任务 
  4. CompletableFuture postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService); 
  5. CompletableFuture commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments, 
  6.     ioExecutorService); 
  7. CompletableFuture albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums, 
  8.     ioExecutorService); 
  9. CompletableFuture photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos, 
  10.     ioExecutorService); 
  11. CompletableFuture.allOf(postsFuture, commentsFuture, albumsFuture, photosFuture).get(); 
  12.  
  13. // 从 I/O 任务(阻塞调用)获得响应 
  14. String posts = postsFuture.get(); 
  15. String comments = commentsFuture.get(); 
  16. String albums = albumsFuture.get(); 
  17. String photos = photosFuture.get(); 
  18.  
  19. // 合并响应(内存中的任务将是此操作的一部分) 
  20. String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments); 
  21. String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos); 
  22.  
  23. // 构建最终响应并将其发送回客户端 
  24. return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser; 

7. 使用 ExecutorService 并行处理所有任务

使用 ExecutorService 并行处理所有任务,并使用 @suspended AsyncResponse response 以非阻塞方式发送响应。

图片来自 http://tutorials.jenkov.com/java-nio/nio-vs-io.html

7.1 何时使用?

如果用例类似于服务器端聊天应用程序,在客户端响应之前,线程不需要保持连接,那么异步、非阻塞方法比同步通信更受欢迎。在这些用例中,系统资源可以通过异步、非阻塞方法得到更好的利用,而不仅仅是等待。

 
 
 
 
  1. // 为异步执行提交并行任务 
  2. ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize); 
  3. CompletableFuture postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService); 
  4. CompletableFuture commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments, 
  5. ioExecutorService); 
  6. CompletableFuture albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums, 
  7. ioExecutorService); 
  8. CompletableFuture photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos, 
  9. ioExecutorService); 
  10.  
  11. // 当 /posts API 返回响应时,它将与来自 /comments API 的响应结合在一起 
  12. // 作为这个操作的一部分,将执行内存中的一些任务 
  13. CompletableFuture postsAndCommentsFuture = postsFuture.thenCombineAsync(commentsFuture, 
  14. (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments), 
  15. ioExecutorService); 
  16.  
  17. // 当 /albums API 返回响应时,它将与来自 /photos API 的响应结合在一起 
  18. // 作为这个操作的一部分,将执行内存中的一些任务 
  19. CompletableFuture albumsAndPhotosFuture = albumsFuture.thenCombineAsync(photosFuture, 
  20. (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos), 
  21. ioExecutorService); 
  22.  
  23. // 构建最终响应并恢复 http 连接,把响应发送回客户端 
  24. postsAndCommentsFuture.thenAcceptBothAsync(albumsAndPhotosFuture, (s1, s2) -> { 
  25. LOG.info("Building Async Response in Thread " + Thread.currentThread().getName()); 
  26. String response = s1 + s2; 
  27. asyncHttpResponse.resume(response); 
  28. }, ioExecutorService); 

8. RxJava

8.1 何时使用?

如果编码的场景适合异步非阻塞方式,那么可以*** RxJava 或任何响应式开发库。还具有诸如 back-pressure 之类的附加功能,可以在生产者和消费者之间平衡负载。

 
 
 
 
  1. int userId = new Random().nextInt(10) + 1; 
  2. ExecutorService executor = CustomThreads.getExecutorService(8); 
  3.  
  4. // I/O 任务 
  5. Observable postsObservable = Observable.just(userId).map(o -> JsonService.getPosts()) 
  6. .subscribeOn(Schedulers.from(executor)); 
  7. Observable commentsObservable = Observable.just(userId).map(o -> JsonService.getComments()) 
  8. .subscribeOn(Schedulers.from(executor)); 
  9. Observable albumsObservable = Observable.just(userId).map(o -> JsonService.getAlbums()) 
  10. .subscribeOn(Schedulers.from(executor)); 
  11. Observable photosObservable = Observable.just(userId).map(o -> JsonService.getPhotos()) 
  12. .subscribeOn(Schedulers.from(executor)); 
  13.  
  14. // 合并来自 /posts 和 /comments API 的响应 
  15. // 作为这个操作的一部分,将执行内存中的一些任务 
  16. Observable postsAndCommentsObservable = Observable 
  17. .zip(postsObservable, commentsObservable, 
  18. (posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments)) 
  19. .subscribeOn(Schedulers.from(executor)); 
  20.  
  21. // 合并来自 /albums 和 /photos API 的响应 
  22. // 作为这个操作的一部分,将执行内存中的一些任务 
  23. Observable albumsAndPhotosObservable = Observable 
  24. .zip(albumsObservable, photosObservable, 
  25. (albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos)) 
  26. .subscribeOn(Schedulers.from(executor)); 
  27.  
  28. // 构建最终响应 
  29. Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2) 
  30. .subscribeOn(Schedulers.from(executor)) 
  31. .subscribe((response) -> asyncResponse.resume(response), e -> asyncResponse.resume("error")); 

9. Disruptor

[Queue vs RingBuffer]

图片1:http://tutorials.jenkov.com/java-concurrency/blocking-queues.html

图片2:https://www.baeldung.com/lmax-disruptor-concurrency

9.1 何时使用?

Disruptor 框架在下列场合性能更好:与事件驱动的体系结构一起使用,或主要关注内存任务的单个生产者和多个消费者。

 
 
 
 
  1. static { 
  2.     int userId = new Random().nextInt(10) + 1; 
  3.  
  4.     // 示例 Event-Handler; count down latch 用于使线程与 http 线程同步 
  5.     EventHandler postsApiHandler = (event, sequence, endOfBatch) -> { 
  6.         event.posts = JsonService.getPosts(); 
  7.         event.countDownLatch.countDown(); 
  8.     }; 
  9.  
  10.     // 配置 Disputor 用于处理事件 
  11.     DISRUPTOR.handleEventsWith(postsApiHandler, commentsApiHandler, albumsApiHandler) 
  12.     .handleEventsWithWorkerPool(photosApiHandler1, photosApiHandler2) 
  13.     .thenHandleEventsWithWorkerPool(postsAndCommentsResponseHandler1, postsAndCommentsResponseHandler2) 
  14.     .handleEventsWithWorkerPool(albumsAndPhotosResponseHandler1, albumsAndPhotosResponseHandler2); 
  15.     DISRUPTOR.start(); 
  16.  
  17. // 对于每个请求,在 RingBuffer 中发布一个事件: 
  18. Event event = null; 
  19. RingBuffer ringBuffer = DISRUPTOR.getRingBuffer(); 
  20. long sequence = ringBuffer.next(); 
  21. CountDownLatch countDownLatch = new CountDownLatch(6); 
  22. try { 
  23.     event = ringBuffer.get(sequence); 
  24.     event.countDownLatch = countDownLatch; 
  25.     event.startTime = System.currentTimeMillis(); 
  26. } finally { 
  27.     ringBuffer.publish(sequence); 
  28. try { 
  29.     event.countDownLatch.await(); 
  30. } catch (InterruptedException e) { 
  31.     e.printStackTrace(); 

10. Akka

图片来自:https://blog.codecentric.de/en/2015/08/introduction-to-akka-actors/

10.1 示例代码

 
 
 
 
  1. // 来自 controller : 
  2. Actors.masterActor.tell(new Master.Request("Get Response", event, Actors.workerActor), ActorRef.noSender()); 
  3.  
  4. // handler : 
  5. public Receive createReceive() { 
  6.     return receiveBuilder().match(Request.class, request -> { 
  7.     Event event = request.event; // Ideally, immutable data structures should be used here. 
  8.     request.worker.tell(new JsonServiceWorker.Request("posts", event), getSelf()); 
  9.     request.worker.tell(new JsonServiceWorker.Request("comments", event), getSelf()); 
  10.     request.worker.tell(new JsonServiceWorker.Request("albums", event), getSelf()); 
  11.     request.worker.tell(new JsonServiceWorker.Request("photos", event), getSelf()); 
  12.     }).match(Event.class, e -> { 
  13.     if (e.posts != null && e.comments != null & e.albums != null & e.photos != null) { 
  14.     int userId = new Random().nextInt(10) + 1; 
  15.     String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, e.posts, 
  16.     e.comments); 
  17.     String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, e.albums, 
  18.     e.photos); 
  19.     String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser; 
  20.     e.response = response; 
  21.     e.countDownLatch.countDown(); 
  22.     } 
  23.     }).build(); 

11. 总结


分享名称:鸟瞰Java并发框架
网页地址:http://cdbrznjsb.com/article/dhchhoj.html

其他资讯

让你的专属顾问为你服务