完善资料让更多小伙伴认识你,还能领取20积分哦, 立即完善>
CompletableFuture为什么它在依赖于不同的异类异步执行任务的交互的编程系统中如此有用?以及它如何补充fork / join样式的并行性?
|
|
相关推荐
1个回答
|
|
java异步回调和同步回调
最近,在准备有关Java并行流的讨论时,我注意到经典文章“ 免费午餐结束 ”(TFLiO)已经过了十岁生日。 对于大多数程序员而言,本文及其伴随的宣传是他们的第一个警告,即处理器速度呈指数增长的40年趋势即将结束,实际上已经结束了。 用不同的趋势代替它,即增加每个芯片上的处理器数量,这意味着用赫伯·萨特(Herb Sutter)的话来说,程序员必须“从根本上转向并发”。 在TFLiO中,Sutter观察到绝大多数程序员“不依赖并发性”,但补充说“一旦抱怨了,基于锁的编程并不比OO难。” 第一个陈述无疑是正确的,但是基于锁的并发的十年经验却无法证实第二个陈述。 幸运的是,自从TFLiO发布时,Java 5才刚刚可用,并且它的高级并发实用程序就开始使用了,Java程序员基本上能够避免对其进行测试。 这些使Java开发人员几乎可以避免对同步和关键部分进行细粒度的推理。 从并发到并行,再到并行 Java 5并发库专注于异步任务处理,它基于生产者线程模型来创建任务,并通过阻塞队列将其移交给任务消费者。 在Java 7和8中,该模型得到了增强,它支持另一种任务执行样式,包括将任务数据集分解为子集,然后可以通过独立的同质子任务来处理每个子集。 这种样式的基本库是fork / join框架,它使程序员可以规定应如何拆分数据集,并支持将子任务提交到标准默认线程池。 “普通” ForkJoinPool 。 (在本文中,不合格的类和接口名称将引用程序包java.util.concurrent中的类型。)在Java 8中,通过并行流机制可以更轻松地访问fork / join并行性。 但是,并非所有问题都适用于这种并行处理方式:元素处理必须独立,数据集必须足够大,并且每个元素的处理成本要足够高才能进行并行加速,以补偿设置fork /加入框架。 同时,对Java 8中并行流的创新的关注已从对并发库的实质性添加(以CompletableFuture《T》类的形式)转移了注意力。 本文将探讨CompletableFuture ,以解释为什么它在依赖于不同的异类异步执行任务的交互的编程系统中如此有用,以及它如何补充fork / join样式的并行性,包括并行流。 页面渲染器 我们的起点将是从“ Java并发实践”(JCiP)中借用的示例,这是Java 5并发实用程序的经典说明。 在JCiP§6.3中,Brian Goetz探索了网页渲染器的开发,其每个页面的任务是渲染其文本以及下载和渲染其图像。 图像下载需要很长时间,在此期间,CPU除了等待外别无其他。 因此,显而易见的策略是通过首先启动其所有图像的下载,然后使用它们完成呈现页面文本之前的时间,最后呈现下载的图像来呈现页面。 页面渲染器的第一个JCiP版本使用Future的概念,该界面公开了允许客户端监视其他线程正在执行的任务进度的方法。 在清单1中,代表下载页面所有图像的任务的Callable提交给执行器,执行器返回一个Future,通过它可以查询下载任务的状态。 主线程完成页面文本的呈现后,将调用Future.get ,该方法将阻塞,直到所有下载的结果都可以作为List《ImageData》完全使用。 明显的缺点是下载任务的粗粒度性质。 在下载所有图像之前,没有图像可用于渲染。 接下来,我们将看到如何缓解该问题。 public void renderPage(CharSequence source) { List《ImageInfo》 info = scanForImageInfo(source); // create Callable representing download of all images final Callable《List《ImageData》》 task = () -》 info.stream() .map(ImageInfo::downloadImage) .collect(Collectors.toList()); // submit download task to the executor Future《List《ImageData》》 images = executor.submit(task); // renderText(source); try { // get all downloaded images (blocking until all are available) final List《ImageData》 imageDatas = images.get(); // render images imageDatas.forEach(this::renderImage); } catch (InterruptedException e) { // Re-assert the thread’s interrupted status Thread.currentThread().interrupt(); // We don’t need the result, so cancel the task too images.cancel(true); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } } 清单1.使用Future等待所有图像下载 一些简化的假设使此示例及其后续变型易于管理:我们假设存在类型ImageInfo (大致为URL)和ImageData (图像的二进制数据),以及方法scanForImageInfo, downloadImage, renderText, renderImage, launderThrowable,和ImageInfo.downloadImage 。 实例变量executor可以用ExecutorService类型声明并进行适当初始化。 原始JCiP示例的代码已在本文中使用Java 8 lambda和流进行了现代化。 该代码必须等待所有下载完成的原因是,作为异步执行任务的模型,它用来表示下载任务的Future接口受到很大限制。 Future允许客户查询任务的结果,是否有必要或状态(即任务是否已完成或被取消)。 但是Future本身不提供任何提供回调方法的方法,该方法允许在每个图像下载完成后通知页面渲染线程。 清单2通过将页面下载任务提交给CompletionService改进了上一示例的粗粒度,该清单的poll和take方法以其任务完成的顺序产生相应的Future实例,而清单1则在列表中处理了这些任务。他们被提交的顺序。 该接口的平台实现ExecutorCompletionService通过将每个任务包装在FutureTask Future实现FutureTask , Future实施还允许提供完成回调。 为ExecutorCompletionService创建的Future指定的回调操作将已完成任务的添加封装到队列中,以供客户端查询。 public void renderPage(CharSequence source) { List《ImageInfo》 info = scanForImageInfo(source); CompletionService《ImageData》 completionService = new ExecutorCompletionService《》(executor); // submit each download task to the completion service info.forEach(imageInfo -》 completionService.submit(imageInfo::downloadImage)); renderText(source); // retrieve each RunnableFuture as it becomes // available (and when we are ready to process it)。 for (int t = 0; t 《 info.size(); t++) { Future《ImageData》 imageFuture = completionService.take(); renderImage(imageFuture.get()); } } 清单2:使用CompletionService在图像可用时渲染图像(为简洁起见,省略了中断和错误处理) 介绍CompletableFuture 清单2代表了从Java 5到去年的最新威廉希尔官方网站 ,当时通过引入CompletableFuture (CF)类极大地提高了Java在编程异步系统方面的表现力。 此类是Future的实现,该实现将在与任务不同的线程中执行的回调与在同一线程中执行的同步延续函数放在相同的位置。 通过允许CF实例与回调方法组成新的CF ,它避免了常规回调的最大问题,即将控制流分离到不同的事件处理程序中。 一个例子是thenAccept方法,该方法接受一个Consumer (用户提供的空轴承功能)并返回一个新的CF 返回的CF具有将Consumer应用于完成原始CF的结果的效果。 像许多其他CF方法一样, thenAccept具有两个其他变体:在一个中, Consumer由公共fork / join池中的线程执行; 在第二个中,它由您随调用提供的Executor中的线程Executor 。 重载的这种三重作用-同步运行,在公共ForkJoinPool异步运行,以及在提供的线程池中异步运行-对于否则将导致近60种CompletableFuture方法令人生畏的情况,应承担大部分责任。 这是thenAccept的示例,用于重新实现页面渲染器: public void renderPage(CharSequence source) { List《ImageInfo》 info = scanForImageInfo(source); info.forEach(imageInfo -》 CompletableFuture .supplyAsync(imageInfo::downloadImage) .thenAccept(this::renderImage)); renderText(source); } 清单3:使用CompletableFuture实现页面渲染 尽管清单3比以前的版本明显更简洁,但是通过一些实践,它的样式就变得易于阅读。 工厂方法supplyAsync返回一个新的CF ,它将通过在公共ForkJoinPool运行指定的Supplier来完成,并将Supplier的结果作为CF的结果。 thenAccept方法thenAccept返回一个新的CF ,它将通过在supplyAsync生成的CF的结果上执行指定的Consumer在这种情况下渲染所提供的图像)来完成此supplyAsync 可以肯定的是, thenAccept并不是将CF与函数组合的唯一方法。 用函数组成CF的方法可以采用以下参数: 将应用于CF结果的函数。 接受这些方法的方法是: thenCompose ,用于返回CompletableFuture函数 thenApply ,用于返回其他类型的函数; thenAccept接受返回void的函数; 一个Runnable 。 这将被thenRun方法thenRun ; 可以处理正常终止和异常终止的功能。 CF分别通过以下两种方法组成: handle ,用于采用值和Throwable并返回值的函数; whenComplete ,用于采用一个值和Throwable并返回void的函数。 扩展页面渲染器 此示例的扩展可以说明CompletableFuture其他功能。 例如,假设我们要在图像下载超时或失败的地方使用图标作为视觉指示器。 如果CF尚未完成,则CF公开一个get(long, TimeUnit)方法get(long, TimeUnit)该方法在指定时间段后引发TimeoutException 。 它可以用来定义从ImageInfo到ImageData的功能(清单4)。 Function《ImageInfo, ImageData》 infoToData = imageInfo -》 { CompletableFuture《ImageData》 imageDataFuture = CompletableFuture.supplyAsync(imageInfo::downloadImage, executor); try { return imageDataFuture.get(5, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); imageDataFuture.cancel(true); return ImageData.createIcon(e); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } catch (TimeoutException e) { return ImageData.createIcon(e); } } 清单4:使用CompletableFuture.get实现超时 现在可以通过连续调用infoToData来呈现页面。 每个调用都会同步返回一个下载的映像,因此要并行下载它们,需要为每个映像创建一个新的异步任务。 合适的工厂方法是CompletableFuture.runAsync() ,类似于supplyAsync但是采用Runnable而不是Supplier : public void renderPage(CharSequence source) throws InterruptedException { List《ImageInfo》 info = scanForImageInfo(source); info.forEach(imageInfo -》 CompletableFuture.runAsync(() -》 renderImage(infoToData.apply(imageInfo)), executor)); } 现在考虑一个进一步的要求,即在所有请求都已完成或超时时在页面上放置一个指示器-这种情况可以通过所有相应CompletableFutures的join方法返回来表明。 在这种情况下提供了静态方法allOf ,以允许创建一个返回空值的CompletableFuture ,当其所有组件完成时就完成了此操作。 ( join方法通常可用于返回CF结果,但要查看由allOf组成的CF的结果,必须分别查询它们。) public void renderPage(CharSequence source) { List《ImageInfo》 info = scanForImageInfo(source); CompletableFuture[] cfs = info.stream() .map(ii -》 CompletableFuture.runAsync( () -》 renderImage(mapper.apply(ii)), executor)) .toArray(CompletableFuture[]::new); CompletableFuture.allOf(cfs).join(); renderImage(ImageData.createDoneIcon()); } 结合多个可完成的未来 另一组方法允许多个CF组合。 我们已经看到了静态方法allOf ,该方法在其所有组件都完成时完成。 当其任何组件完成时,其双重anyOf也将带有空隙)完成。 除了这两种方法外,该组中的所有其他方法都是实例方法,它们以某种方式与另一个CF组成接收器,然后将结果传递给提供的函数。 为了说明这些方法是如何工作的,让我们扩展JCiP中的下一个示例,即旅行预订门户的示例,以便将关联的一组预订的进度记录在TripPlan ,其中包含总价格以及服务供应商列表用过的: interface TripPlan { List《ServiceSupplier》 getSuppliers(); int getPrice(); TripPlan combine(TripPlan); } ServiceSupplier航空公司或酒店)可以创建TripPlan :(实际上,实际上, ServiceSupplier.createPlan将接受与目的地,旅行舱等相对应的参数。) interface ServiceSupplier { TripPlan createPlan(); String getAlliance(); // for use later } 要选择最佳旅行计划,我们需要向每个服务提供商查询他们的旅行计划,然后使用Comparator反映出我们选择标准的结果计划(在这种情况下,就是最低价): TripPlan selectBestTripPlan(List《ServiceSupplier》 serviceList) { List《CompletableFuture《TripPlan》》 tripPlanFutures = serviceList.stream() .map(svc -》 CompletableFuture.supplyAsync(svc::createPlan, executor)) .collect(toList()); return tripPlanFutures.stream() .min(Comparator.comparing(cf -》 cf.join().getPrice())) .get().join(); } 请注意中间collect操作,这是由于中间操作对流的惰性所必需的。 没有它,流终端操作将是min ,其执行首先需要为tripPlanFutures每个元素执行join 。 相反,在代码站立时,将collect终端操作,该操作将累积由map操作产生的CF值而不会发生阻塞,从而允许基础任务同时执行。 如果检索最佳航空公司和酒店旅行计划的任务是独立的,则我们希望同时启动它们,就像前面示例的图像下载一样。 要以这种方式组合两个CF ,请使用CompletableFuture.thenCombine ,它并行执行接收器和提供的CF ,然后使用提供的功能合并它们的结果(我们假设airlines , hotels和(后来的) cars变量具有被声明为List《TravelService》类型并适当初始化: CompletableFuture .supplyAsync(() -》 selectBestTripPlan(airlines)) .thenCombine( CompletableFuture.supplyAsync(() -》 selectBestTripPlan(hotels)), TripPlan::combine); 扩展示例很有指导意义。 假设服务提供者每个都隶属于以String财产alliance代表的旅行alliance 。 独立预订航空公司和酒店后,我们可能会决定,如果两者都属于同一个联盟,那么唯一值得考虑的租车服务就是那些也属于该联盟的租车服务: private TripPlan addCarHire(TripPlan p) { List《String》 alliances = p.getSuppliers().stream() .map(ServiceSupplier::getAlliance) .distinct() .collect(toList()); if (alliances.size() == 1) { return p.combine(selectBestTripPlan(cars, alliances.get(0))); } else { return p.combine(selectBestTripPlan(cars)); } } selectBestTripPlan的新重载将接受selectBestTripPlan联盟的String ,如果存在,则使用它来过滤要使用的服务流: private TripPlan selectBestTripPlan( List《ServiceSupplier》 serviceList, String favoredAlliance) { List《CompletableFuture《TripPlan》》 tripPlanFutures = serviceList.stream() .filter(ts -》 favoredAlliance == null || ts.getAlliance().equals(favoredAlliance)) .map(svc -》 CompletableFuture.supplyAsync(svc::createPlan, executor)) .collect(toList()); 。.. } 在这种情况下, CF选择汽车租赁服务是依赖于CF的综合飞行和酒店预订任务。 仅当同时预订了机票和酒店后,它才能完成。 然后,实现此关系的方法是thenCompose : CompletableFuture.supplyAsync(() -》 selectBestTripPlan(airlines)) .thenCombine( CompletableFuture.supplyAsync(() -》 selectBestTripPlan(hotels)), TripPlan::combine) .thenCompose(p -》 CompletableFuture.supplyAsync(() -》 addCarHire(p))); 由合并的航班和酒店预订产生的CF被执行,其结果(合并的TripPlan )成为TripPlan函数参数的thenCompose 。 生成的CF巧妙地封装了不同异步服务的依赖关系。 这段代码简洁的关键在于,尽管thenCompose组成了两个CF ,但是它没有返回您可能期望的CompletableFuture《CompletableFuture《TripPlan》》 ,而是返回了CompletableFuture《TripPlan》 。 因此,无论在创建CF应用了多少层次的合成,它都不会嵌套而是变平,并且只需要执行一项操作即可检索其结果。 这就是CF monad的“ bind”操作(名称来自Haskell)的特征,并解释了对monad的一些热情:例如,在这种情况下,我们能够以函数形式编写否则,在单独的回调中需要一系列尴尬的显式任务定义。 thenCombine方法只是可以构成两个CF的多种方式之一。 其他包括: thenAcceptBoth :类似于thenCombine但是采用返回void的函数; runAfterBoth ,它在两个CF完成后接受Runnable来执行; applyToEither ,它采用一元函数, applyToEither其提供以CF最先完成的结果为准; acceptEither :类似于applyToEither,但是采用一元函数,结果无效; runAfterEither :在任一CF完成后接受Runnable执行。 结论 在短篇文章中,不可能彻底探索像CompletableFuture这样的API,但是我希望这里的示例能够使人们对它所启用的并发编程风格有所了解。 将CompletableFutures与其他功能以及其他功能组合在一起,可以为多个任务构造类似管道的配方,并控制同步或异步执行以及它们之间的依赖关系。 您可能希望更详细地探讨的方面包括异常处理,选择和配置执行程序的实际方面以及异步API设计中出现的有趣挑战。 我希望弄清楚Java 8中提供的两种并发编程样式之间的关系。在应用fork / join并行性(包括并行流)的情况下,它可以非常有效地在多个内核之间分配工作。 但是使用它的标准非常狭窄:数据集应该很大并且可以有效拆分,对单个数据元素的操作应该(合理地)彼此独立,这些操作应该相当昂贵,并且它们会占用大量CPU。 如果不满足这些条件,特别是如果您的任务花费大量时间阻止I / O或网络请求,则CompletableFuture是更好的选择。 作为Java程序员,我们很幸运有一个集成了这些互补方法的平台库。 |
|
|
|
只有小组成员才能发言,加入小组>>
4600个成员聚集在这个小组
加入小组3357 浏览 0 评论
航顺(HK)联合电子发烧友推出“近距离体验高性能Cortex-M3,免费申请价值288元评估板
4279 浏览 1 评论
4306 浏览 0 评论
小黑屋| 手机版| Archiver| 电子发烧友 ( 湘ICP备2023018690号 )
GMT+8, 2025-1-8 06:12 , Processed in 0.600044 second(s), Total 73, Slave 57 queries .
Powered by 电子发烧友网
© 2015 bbs.elecfans.com
关注我们的微信
下载发烧友APP
电子发烧友观察
版权所有 © 湖南华秋数字科技有限公司
电子发烧友 (电路图) 湘公网安备 43011202000918 号 电信与信息服务业务经营许可证:合字B2-20210191 工商网监 湘ICP备2023018690号