程序员社区

从浅入深掌握并发执行框架Executor

引言

从浅入深掌握并发执行框架Executor插图

 

任务的执行 

大多数并发应用程序都是围绕“任务执行(Task  Execution)”来构造的:任务通常是一些抽象的且离散的工作单元。

任务通常是一些抽象的且离散的工作单元。通过把应用程序的工作分解到多个任务中,可以简化程序的组织结构,提供一种自然的事务边界来优化错误恢复过程,以及提供一种自然的并行工作结构来提升并发性。

 


 

一、在线程中执行任务

当围绕“任务执行”来设计应用程序时,第一步是要找出清晰的任务边界。

在理想情况下,各个任务之间是相互独立的:任务不依赖其他任务的状态,结果或边界效应。

独立性有助于实现并发,因为如果存在足够的处理资源,那么这些独立的任务都可以并行的执行。

 

在正常的负载下,服务器应用程序应该同时表现出良好的吞吐量和快速的响应性。应用程序提供商希望程序尽支持可能多的用户,从而降低每个用户的服务成本,而用户则希望获得尽可能快的响应。而且,当负荷过载时,应用程序的性能应该是逐渐降低,而不是直接失败。要实现上述目标,应该选择清晰的任务边界以及明确的任务执行策略。

 

大多数服务器应用程序都提供了一种自然的任务边界选择方式:以独立的客户请求为边界。比如Web服务器,邮件服务器,文件服务器,EJB容器以及数据库服务器等。

 

1.1 串行执行任务

 

应用程序中可以通过多种策略来调度任务,其中最简单的策略就是在单个线程中串行执行各项任务。

这是最经典的一个最简单的Socket server的例子,服务器的资源利用率非常低,因为单线程在等待I/O操作完成时,CPU处于空闲状态。从而阻塞了当前请求的延迟,还彻底阻止了其他等待中的请求被处理。

代码 6-1 串行的Web服务器
public class SingleThreadWebServer {

    public static void main(String[] args) throws IOException {

        ServerSocket socket = new ServerSocket(80);

        while (true) {

            Socket connection = socket.accept();

            handleRequest(connection);

        }

    }

 

    private static void handleRequest(Socket connection) {

        // request-handling logic here

    }

}

串行的处理机制通常无法提供高吞吐率或者快速响应性。不过也有一种例外,例如当任务数量很少且执行时间很长时,或者当服务器只为单个用户提供服务,并且客户每次只发出一个请求时。但大多数服务器并不是按照这种方式来工作的。

 

1.2 显示的创建的线程任务

通过为每个请求创建一个新的线程来提供服务,从而实现更高的响应性。如下所示:

代码 6-2 Web服务器为每个请求启动一个线程

class ThreadPerTaskWebServer{

    public stati void main(String[] args) throws IOException{

        ServerSocket socket = new ServerSocket(80);

        while (true){

            final Socket connection = socket.accept();

            Runnable task = new Runnable(){

                public void run(){

                    handleRequest(connection);

                }

            };


            new Thread(task).start();
        }

    }

}

只要请求的到达速率不超出服务器的处理能力,这种方法可以同时带来更快的响应性和更高的吞吐率。

 

1.3 无限制创建线程的不足

在生产环境中,“为每个任务分配一个线程”的做法存在一定缺陷,尤其在需要大量线程的场景:

  • 线程生命周期的开销非常高。

线程的创建过程需要时间,延迟处理的请求,并且需要JVM和操作系统提供一些辅助帮助。如果请求的到达率非常高且请求的处理过程是轻量级的,那么为每个请求创建一个新线程会消耗大量的计算资源。

  • 资源消耗。

活跃的线程会消耗系统资源,尤其是内存。如果已经拥有足够的多的线程使CPU保持忙碌状态,那么在创建更多的线程反而会降低性能。同时会有些线程将闲置,大量的空闲线程,那么会占用许多内存,给垃圾回收器带来压力。

  • 稳定性。

可创建线程的数量存在一个限制。这个限制值将随着平台的不同而不同。如果超过了这些限制,那么很可能抛出OutOfMemoryError异常。

 

 

 


 

二、Executor框架

任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。为每个任务分配一个线程的执行策略,不仅存在诸多不足,同时资源的管理也比较复杂。线程池简化了线程的管理工作,并且java.util.concurrent提供了一种灵活的线程池作为Executor框架的一部分。在Java类库中,任务执行的主要抽象不是THread而是Executor。

代码 6-3 Executor接口

public interface Executor {

    /**

     * Executes the given command at some time in the future.  The command

     * may execute in a new thread, in a pooled thread, or in the calling

     * thread, at the discretion of the {@code Executor} implementation.

     *

     * @param command the runnable task

     * @throws RejectedExecutionException if this task cannot be

     * accepted for execution

     * @throws NullPointerException if command is null

     */
    void execute(Runnable command);

}

虽然 Executor 是个简单的接口,但是却为灵活且强大的异步任务执行框架提供了基础。该框架能支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。Executor的实现还支持对生命周期的支持,以及统计信息的收集、应用程序管理机制和性能监视等机制。

Executor基于生产者—消费者设计模式,提交任务的操作单元相当于生产者(生成待完成的工作单元),执行任务的线程相当于消费者(执行完这些工作单元)。

 

2.1 示例:基于ExecutorWeb服务器

基于Executor来构建Web服务器是非常容易的。ThreadPerTaskWebServer使用了一种标准的Executor实现,即一个固定长度的线程池,可以容纳100个线程。

代码 6-4 基于线程池的Web服务器

class ThreadPerTaskWebServer{

    //定义线程池大小

    private static final int NTHREAD = 100;

    //定义Executor

    private static final Executor exec = 

        Executors.newFixedThreadPool(NTHREAD);

    public static void main(String[] args) throws IOException{

        ServerSocket socket = new ServerSocket(80);

        while (true){

            final Socket connection = socket.accept();

            Runnable task = new Runnable(){

                public void run(){

                    handleRequest(connection);

                }

            };

            //将任务添加到线程池中

            exec.execute(task);

        }

    }

}

 

2.2 执行策略

Executor为任务提交和任务执行之间的解耦提供了标准的接口,你可以为某一类任务指定一个特定的执行策略。任务的执行策略定义了任务执行的“What、Where、When、How”,包括:

  • 任务在什么线程中执行?(what?)
  • 任务以什么顺序执行?(what?)
  • 可以有多少个任务并发执行?(how many?)
  • 可以有多少个任务进入等待执行队列?(how many?)
  • 如果系统过载,需要放弃一个任务,应该挑选哪一个任务?(which?)另外,如何通知应用程序知道这一切呢?(how?)
  • 在一个任务的执行前与结束后,应该做些什么?(what?)

执行策略是资源管理的工具。最佳的策略取决于你的计算资源和你对服务的要求。通过控制并发任务的数量,来保证你的任务不会因为计算资源不足而失败,也不会出现因为高并发带来争夺资源时的性能问题。

将任务的提交与执行解耦,还有助于在实现过程中选择一个与当前硬件最匹配的执行策略。

 

2.3 线程池

线程池,从字面含义卡,指的是管理一组同构工作线程的资源池。线程池与工作队列(Work Queue)密切相关有关,其中在工作队列中保存了所有等待执行的任务。工作者线程(Worker Theahd)的任务很简单:从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。

使用线程池有一下几个几个优点:

1.通过重用现有的线程而不是创建新线程,可以避免线程创建和销毁的开销。

2.当请求到达时,工作线程通常已经存在,减少了等待线程创建的时间,从而提高响应性。

3.通过调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌状态,同时防止过多线程相互竞争资源,使应用程序耗尽内存或者失败,提高了应用程序稳定性。

 

类库提供了一个灵活的线程池以及一些有用的默认配置。通过Executor中的静态工厂方法之一来创建线程池:

a. newFixedThreadPool

    创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程的最大数量,这是线程池的规模不再变化。如果某个线程由于发生了未预期的Exception而结束,那么线程池会补充一个新的线程。

b. newCachedThreadPool

    创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求,那么会回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。

c. newSingleThreadPool

    是一个蛋线程Executor,她创建单个工作者线程执行任务。如果这个任务异常结束,则会创建另一个线程来代替。newSingleThreadPool确保依照任务在工作队列中的顺序来串行执行。

d. newScheduledThreadPool

    创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer。

 

“为每个任务分配一个线程”策略变成基于线程池的策略,将对应用程序的稳定性产生重大的影响。Web服务器不会在高负载的情况下失败。也不会创建数千个线程来争夺有限的CPU和内存资源,因此服务器性能将平缓的降低。

使用Executor还可以实现各种调优,管理、监视、记录日志、错误报告和其他功能,如果不使用任务执行框架,那么要增加这些功能是非常困难的。

 

2.4 Executor的生命周期

我们已经知道如何创建一个Executor,但并没有讨论如何关闭它。Executor的实现通常会创建线程来执行任务,但JVM只有在所有(非守护线程)线程全部终止后才会退出。因此,如果无法正确地关闭Executor,那么JVM将无法结束。

既然Executor是为应用程序提供服务的,因而他们也是可关闭的,无论采用平缓的当时,还是粗暴的方式,并将在关闭操作中受影响的任务的状态反馈给应用程序。

ps:平缓的关闭形式:完成所有已经启动的任务,并且不再接受任何新的任务。粗暴的关闭形式:直接关掉机房的电源。

为了解决执行服务的生命周期问题,Executor扩展来ExecutorService接口,添加了一些用于生命周期管理的方法:

代码 6-7 ExecutorService中的生命周期管理方法
public interface ExecutorService extends Executor{

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTerminated(long timeout, TimeUtil unit)
        throws InterruptedException;

}

Executor有三种状态:运行(running)、关闭(shutting down) 终止(terminated)

ExecutorService最初创建的时候是运行状态的。调用shutdown方法后,会启动一个平缓的关闭过程:停止接收新的任务,同时等待已经提交的任务完成--包括还未开始执行的任务。调用shutdownNow方法会启动一个强制关闭的过程:尝试取消所有运行中的任务和排列在队列中尚未开始执行的任务。

 

在Executor关闭后提交的任务将由“拒绝执行处理器处理(Rejected Execution Handler)”来处理。

它会抛弃任务,或者是的execute方法抛出一个未检查的RejectedExecutionException

等所有任务都执行完成后,ExecutorService会进入终止状态;可以调用awaitTermination方法等待ExecutorService到达终止状态,也可以使用isTerminated方法轮询来ExecutorService是否已经终止。 

通常在调用 awaitTermination 方法之后会立即调用shutdown,从而产生同步地关闭ExecutorService效果。

 

代码 6-8 LifecycleWebServer通过增加生命周期支持来扩展web服务的功能。

 

代码 6-8 支持关闭操作的Web服务器
class LifecycleWebServer{

    private final ExecutorService exec = ...;

    public static void main(String[] args) throws IOException{
        ServerSocket socket = new ServerSocket(80);

        while (! exec.isShutdown()){
            try{

                final Socket connection = socket.accept();

                exec.execute(new Runnable(){
                    public void run() { handleRequest(connection); }
                })

            } catch (RejectedExecutionException e){
                if (!exec.isShutdown())
                    log("task submission rejected",e);

            }

        }

    }

    public void stop() { exec.shutdown(); }

    void handleRequest(Socket connection){

        Request req = readRequest(connection);

        //判断是否为请求关闭的指令

        if (isShutdownRequest(req))
            stop();

        else 
            dispatchRequest(req);

    }

}

 

2.5 延迟任务与周期任务

使用Timer的弊端在于

  • 如果某个任务执行时间过长,那么将破坏其他TimerTask的定时精确性(执行所有定时任务时只会创建一个线程),只支持基于绝对时间的调度机制,所以对系统时钟变化敏感
  • TimerTask抛出未检查异常后就会终止定时线程(不会捕获异常)

Java5.0或者更高版本的JDK中,将很少使用Timer。

如果要构建自己的调度服务,那么可以使用DelayQueue。它实现了BlockingQueue,并为ScheduledThreadPoolExecutor提供调度功能。在DelayQueue中,只有某个元素逾期后才能从DelayQueue中执行take操作。从DelayQueue中返回的对象将根据他们的延迟时间排序。

 

 


 

三、找出可利用的并行性

Executor框架帮助指定执行策略,但如果要使用Executor,必须将任务表述为一个Runnable。在大多数的服务器应用程序中都存在一个明显任务边界:单个客户请求。但有时候任务的边界并非是显而易见的,例如在很多桌面应用程序中,即使是服务器应用程序在单个客户请求中仍可能存在可发掘的并行性,例如数据库服务器。

本节中我们将开发一些不同版本的组件,并且每个版本都实现了不同程度的并发性。该示例组件实现浏览器程序中的页面渲染(Page-Rendering)功能,它的作用是将HTML页面绘制到图像缓存中。为了简便,假设HTML页面只包含标签文本,以及预定大小的图片和URL。

 

3.1 示例:串行的页面渲染器

最简单的方法就是对HTM L文档进行串行处理。先绘制文本元素,同时为图像预留出矩形的站位空间,在处理完了第一遍文本后,程序在开始下载图像并绘制到相应的占位空间中。如下:

 

代码 6-10 串行地渲染页面元素

public class SingleThreadRenderer{

    void renderPage(CharSequence source){

        //加载文本
        renderText(source);

        List<ImageData> ImageData = new ArrayList<ImageData>();

        //下载图片
        for (ImageData imageInfo : scanForImageInfo(source))
            ImageData.add(imageInfo.downloadImage());

        //加载图片
        for (ImageData data : ImageData)
            renderImage(data);

    }

}

图像下载过程的大部分时间都是在等待IO操作执行完成,在这期间CPU几乎不做任何工作。因此,在这种串行执行方法中没有充分地利用CPU,是的页面的家在时间较长。通过将问题分解为多个独立的任务并发执行,能够获得更高的CPU利用率和响应灵敏度。

 

 

3.2 携带结果的任务 CallableFuture

 

Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,虽然run能写入到日志文件或者将结果放入某个共享的数据结构,但它不能返回一个值或者抛出一个受检查的异常。

 

许多任务实际上都是存在延迟的计算:计算某个复杂的功能或者执行数据库查询,从网络上获取资源。对于这些任务,Callable是一种更好的抽象:它认为主入口点(即call)将返回一个值,并可能抛出一个异常。

Runnalbe和Callable描述的都是抽象的计算任务。这些任务通常是有范围的,即都有一个明确的起始点,并且最终会结束。Executor执行的任务有4个生命周期阶段:创建、提交、开始和完成。有时希望取消某些任务的执行。

代码 6-11 Callable与Future接口
public interface Callable<V> {

    V call() throws Exception;

}


public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException,ExecutionException,
            CancellationException;

    V get(long timeout, TimeUtil unit) 
        throws InterruptedException,ExecutionException, 
                  CancellationException, TimeoutException;

}

Future表示了一个任务的生命周期,并提供了相应的方法判断是否完成或被取消以及获取执行结果,以及获取任务的结构和取消任务等。

get方法的行为取决于任务的状态:尚未开始,正在执行,已完成。

  •     如果任务已经完成,那么将立即返回或抛出一个 Exception;
  •     如果任务没有完成,那么get将阻塞,直到任务完成;
  •     如果任务抛出了异常,那么个get将该异常封装为 ExecutionException并重新抛出;
  •     如果任务被取消,那么get将抛出CancellationException,同时还可以通过getCause来获得被封装的初始异常;

 

可以通过许多种方法创建一个Future来描述任务:

  • ExecutorService中的所有submit方法都将返回一个Future,从而将一个Runnable或Callable提交给Executor
  • 还可以显示的为某个指定的Runnable或者Callable实力化一个 FutureTask。FutureTask实现了Runnable,因此可以将它提交给Executor来执行,或者直接调用其run方法。
  • 从Java 6开始,ExecutorService实现可以改写AbstractExecutorService中的newTaskFor方法,从而根据已经提交的Runnable或Callable来控制Future的实例化过程。在默认实现中仅创建了一个新的FutureTask,代码:
代码 6-12 AbstractExecutorService中newTaskFor的默认实现
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {

    return new FutureTask<T>(runnable, value);

}

在将Runnable或Callable提交到Executor到Executor的过程中,包含了一个安全发布,即将任务从提交线程发布到最终的执行线程。类似的,在设置Future结果的过程中也包含一个安全的发布,即将这个结果从执行线程发布到任何通过get方法获得它的线程。

 

3.3 示例:使用Future 实现页面渲染器

为了使页面渲染器实现更高的并发性,将渲染的过程分解为两个任务,一个是渲染所有的文本(CPU密集型),另一个是下载所有的图像(IO密集型)。

代码 6-13 FutureRenderer
public class FutureRenderer {

    private final ExecutorService exec = ....;

    void renderPage(CharSequence source){

        final List<ImageInfo> imageInfos = scanForImageInfo(source);

        Callable<List<ImageData>> task = 

            new Callable<List<ImageData>>() {

                public List<ImageData> call(){

                    public List<ImageData> result = new ArrayList<ImageData>();

                    for (ImageInfo imageInfo : imageInfos)
                        result.add(imageInfo.downloadImage());

                    return result;

                }

            };

        Future<List<ImageData>> future = exec.submit(task);
        renderText(source);

        try{
            List<ImageData> imageData = future.get();

            for (ImageData data : imageData)
                renderImage(data);

        } catch (InterruptedException e){
            //重新设置线程的中断状态
            Thread.currentThread().interrupt();

            //由于不需要结构,因此取消任务
            future.cancel(true);

        } catch (ExecutionException e){
            throw launderThrowable(e.getCause());

        }

    }
}

Get方法拥有“状态依赖”的内在特性,因而调用者不需要知道任务的状态,此外在任务提交和获得结果中包含的安全发布属性也确保了这个方法是线程安全的。

 

3.4 在异构任务并行化中存在的局限

在上面的FutureRender中使用了两个任务,一个是负责渲染文本,一个是负责渲染图片。如果渲染文本的速度远远高于渲染图片的速度,那么程序的最终性能与串行执行的性能差别并不大,而代码却变复杂了。

然而,通过对异构任务进行并行化来获得很大的性能提升是困难的。只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正的性能提升。

 

3.5 CompletionService:Executor BlockingQueue

如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但却有些繁琐。幸运的是,还有一种更好的方法:CompletionService 完成服务。

 

CompletionService将Executor和BlockingQueue的功能融合在一起。可以通过将Callable任务提交给她来执行,然后使用类似于队列操作的take和poll等方法来获得已完成的结果,而这些结果也会在完成时将被封装为Future。ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor

 

3.6 示例:使用 CompletionService 实现页面渲染

通过CompletionService从两个方面来提高页面渲染器的性能:缩短总运行时间以及提高响应性。

为每一幅图像的下载都创建一个独立任务,并在线程池中执行他们,从而将串行的下载过程转换为并行的过程,这将减少下载所有图像的总时间。

 

代码 6-15 使用CompletionService,使页面在下载完成后立即显示出来:

代码 6-15 使用CompletionService,使页面在下载完成后立即显示出来

public class Renderer {

      private final ExecutorService executor;

      Renderer(ExecutorService executor){ this.executor=executor; }

      void renderPage(CharSequence source){

          List<ImageInfo> info=scanForImageInfo(source);

          //ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。

          CompletionService<ImageData> completionService=

                  new ExecutorCompletionService<ImageData>(executor);

          for(final ImageInfo imageInfo:info)

              completionService.submit(new Callable<ImageData>(){

                 public ImageData call(){

                     return imageInfo.downloadImage(); //CompletionService中有类似队列的操作

                 }

              });

          renderText(source);

          try{

              for(int t=0,n=info.size();t<n;t++){

                  Future<ImageData> f = completionService.take();

                  ImageData imageData=f.get();

                  renderImage(imageData);

              }

          }catch (InterruptedException e) {
            Thread.currentThread().interrupt();

        }catch (ExecutionException e) {
            throw LaunderThrowable.launderThrowable(e.getCause());

        }

      }

}

 

3.7 为任务设置时限

有时候,如果某个任务无法在指定时间内完成,那么将不再需要它的结果,此时可以放弃这个任务。

例如,某个Web应用程序从外部的广告服务器上获取广告信息,但如果应用程序在2秒内得不到响应,那么将显示一个默认的广告,这样即使不能获得广告信息,也不会降低站点的响应性能。

 

在有限时间内执行任务的主要困难在于,要确保得到的答案的时间不会超过限定的时间,或者在限定的时间内无法获得答案。在支持时间限制的Future.get中支持这种需求:当结果可用时,它立即返回,如果在指定时限内没有计算出结果,那么将抛出TimeoutException。

 

在使用时限任务时需要注意,当这些任务超时后应该立即停止,从而避免为继续计算一个不再使用的结果而浪费计算资源。因此在get方法抛出TimeoutException时,可以通过Future来取消任务。

 

代码 6-16   在指定时间内获取广告信息
public class RenderWithTimeBudget {

     private static final Ad DEFAULT_AD = new Ad();
     private static final long TIME_BUDGET = 1000;
     private static final ExecutorService exec = Executors.newCachedThreadPool();

     Page renderPageWithAd() throws InterruptedException{

         long endNanos=System.nanoTime()+TIME_BUDGET; //返回纳秒级的时间,再加上时限
         Future<Ad> f=exec.submit(new FetchAdTask());

         //在等待广告的同时显示页面
         Page page=renderPageBody();
         Ad ad;

         try{
            //只等待指定的时间长度
            long timeLeft=endNanos-System.nanoTime();
            ad = f.get(timeLeft, NANOSECONDS);//在指定时限内获取,NANOSECONDS为时间单位

         }catch (ExecutionException e) {
            ad = DEFAULT_AD;

        }catch (TimeoutException e) {  
            //如果超时了,广告转为默认广告,并取消获取任务
            ad = DEFAULT_AD;
            f.cancel(true);
        }

         page.setAd(ad);
         return page;

     }

}

 

3.8 示例:旅行预订门户网站

“预订时间”方法可以很容易地扩展到任意数量的任务上。考虑这样一个旅行预订门户网站:用户输入旅行的日期和其他要求,门户网站获取并显示来自多条航线、旅店和汽车租赁公司的报价。在获取不同公司报价的过程中,可能会调用Web服务、访问数据库、制定一个EDI事物或者其他机制。在这种情况下,不宜让页面的响应时间受限于最慢的响应时间,而应该只显示指定时间内收到的信息。对于没有及时响应的服务提供者,页面可以忽略他们,或者其他操作。

 

从一个公司获取报价的过程与其他公司获得报价的过程无关,因此可以将获取报价的过程当成一个任务,从而使获得报价的过程能并发执行。创建n个任务,将其提交到线程池,保留n个Futrue,并使用限时的get方法通过Future串行地获取每一个结果,这一切都很简单,但我们还可以使用一个更简单的方法——invokeAll。

代码 6-17     在预定时间内请求旅游报价
private class QuoteTask implements Callable<TravelQuote>{

    private final TravelCompany company;
    private final TravelInfo travelInfo;

    public TravelQuote call()throw Exception{
        return company.solicitQuote(travelInfo);
    }

}

 

public List<TravelQuote> getRankedTravelQuotes(
      TravelInfo travelInfo, Set<TravelCompany> companies,
      Comparator<TravelQuote> ranking(long time, TimeUnit unit)

      throws InterruptedException {
             List<QuoteTask> tasks = new ArrayList<QuoteTask>();

             //为每家公司添加报价任务
             for (TravelCompany company : companies)
                  tasks.add(new QuoteTask(company, travelInfo));

             //InvokeAll方法的参数为一组任务,并返回一组Future ,用时限来限制时间       
             List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);

 

          List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size());
          Iterator<QuoteTask> taskIter = tasks.iterator();

          for (Future<TravelQuote> f : futures) {
             QuoteTask task = taskIter.next();

             try {
                //invokeAll按照任务集合中迭代器额顺序肩所有的Future添加到返回的集合中
                quotes.add(f.get());

             } catch (ExecutionException e) {
                quotes.add(task.getFailureQuote(e.getCause()));

             } catch (CancellationException e) {
                quotes.add(task.getTimeoutQuote(e));

             }

         }

         Collections.sort(quotes, ranking);
          return quotes;

}

在程序6-17中使用了支持时限的inokAll,将多个任务提交到一个ExecutorService并获得结果。

invokeAll方法有以下几个特性:

  • invokeAll方法的参数为一组任务,并返回一组Future。这两个集合有着相同的结构。invokeAll按照任务集合中迭代器的顺讯将所有的Future添加到返回的集合中,从而使调用者能将各个Future与其表示的Callable关联起来。
  • 当所有任务都执行完毕时,或者调用线程被中断时,又或者超过指定时限时,ivokeAll将返回。
  • 当超过指定时限后,任何还未完成的任务都会取消。
  • invokeAll返回后,每个任务要么正常地完成,要么被取消,而客户端代码可以调用get或者isCancelled来判断究竟是何种情况。

 


 

四、小结

  • 通过围绕任务执行来设计应用程序,可以简化开发的过程,并有助于实现并发。
  • Executor框架将任务提交与执行策略解耦开来,同时还支持多种不同类型的执行策略。
  • 想要在将应用程序分别为不同的任务时获得最大的好处,必须定义清晰的任务边界。

 

 

赞(0) 打赏
未经允许不得转载:IDEA激活码 » 从浅入深掌握并发执行框架Executor

相关推荐

  • 暂无文章

一个分享Java & Python知识的社区