程序员社区

Java多线程实现-线程池

在《阿里巴巴Java手册》里有这样一条:
【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
说明:使用线程池的好处是减少在创建和销毁线程上所花的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

简单来说使用线程池有以下几个目的:

  • 线程是稀缺资源,不能频繁的创建
  • 解耦作用:线程创建时执行完全分开,方便维护
  • 可以将其放入一个池子中,方便给其他任务进行复用

一、线程池介绍

谈到线程池就会想到池化技术,其中最核心的思想就是把宝贵的资源放到一个池子中;每次使用都从里面获取,用完之后又放回池子供其他人使用,简单来说,池化技术就是提前保存大量的资源,以备不时之需。在机器资源有限的情况下,使用池化技术可以大大的提高资源的利用率,提升性能。

创建线程池并使用的简单例子:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class test {

    public static void main(String[] args) {
        ExecutorService executorService=new ThreadPoolExecutor(1,1,
                60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10));
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("test");
            }
        });
        
        executorService.shutdown();
    }

}

jdk提供给外部的接口很简单,直接调用ThreadPoolExecutor构造一个就可以了,也可以通过Executors静态工厂构建。

二、线程池源码分析

ThreadPoolExecutors构造函数的源码如下:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                          long keepAliveTime, TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

2.1 构造函数中各参数含义:

corePoolSize(线程池的核心线程数)

  • 当提交一个任务到线程池时,若创建的线程数量还未达到核心线程数,则线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程。
  • 当任务到来时,若创建的线程数大于等于核心线程数并且小于最大线程数,则只有当workQueue满时才创建新的线程去处理任务。
  • 当corePoolSize和maximumPoolSize相同,则创建的线程池大小固定,判断workQueue未满则加入workQueue中等待空闲线程去处理。
  • 当运行的线程数等于maximumPoolSize,且workQueue已经满了,则通过handler所指定的策略来处理任务
  • 如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
  • 注意到如果线程池中的corePoolSize的线程是会一直存活的,除非设置了allowCoreThreadTimeOut参数

maximumPoolSize(线程池最大大小)

  • 线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列则这个参数没有效果。

workQueue(阻塞队列)

  • 用于存放任务的阻塞队列,当任务提交时,线程池所有线程正忙,会将该任务加入workQueue队列中等待
  • 这个队列只会存储那些由execute方法提交的Runnable类型的任务

keepAliveTime(线程活动保持时间)

  • 线程池的工作线程空闲后,保持存活的时间。这个时间针对的是超过corePoolSize数量的线程,它们执行完线程后不会立即销毁,直到超过KeepAliveTime所指定的时间。
  • 如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

TimeUnit(线程活动保持时间的单位)

  • 可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

threadFactory:

  • 用来创建线程的类,其中只有一个newThread方法,默认创建的线程拥有相同的优先级并且是非守护线程,同时设置线程的名称

handler

  • 这个参数用来表示执行超过线程池执行任务限制的任务时,我们所采取的策略,有四种实现方式。
  • AbortPolicy:直接抛出异常,默认策略
  • CallerRunsPolicy:用调用者所在的线程来执行任务
  • DiscardOldesPolicy:丢弃阻塞队列中靠前的任务,并执行当前任务
  • DiscardPolicy:直接丢弃任务

2.2 线程池的使用

通常我们都是使用:

threadPool.execute(new Job());

这样的方式来提交一个任务到线程池中,所以核心的逻辑就是execute()函数了。

在具体分析之前先了解下线程池中所定义的状态,这些状态都和线程的执行密切相关:

// runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
  • RUNNING:指运行状态,指可以接收任务执行队列里的任务
  • SHUTDOWN:指调用了shutdown()方法,不再接收新任务了,但是队列里的任务得执行完毕
  • STOP:指调用了shutdownNow()方法,不再接受新任务,同时阻塞队列里的所有任务并中断所有正在执行的任务。
  • TIDYING:指所有任务都执行完毕,在调用shutdown()/shutdownNow()中都会尝试更新为这个状态
  • TERMINATED:终止状态,当执行terminated()后会更新为这个状态。

用图表示为:
在这里插入图片描述

execute()方法的内部实现:
在将来的某段时间内执行给定的任务。这个任务可以在新线程中执行也可以在已经存在的线程中执行。如果这个任务不能被提交执行,那么有两个原因:一个是这个executor已经被shutdown了,另一个是达到了最大容量。接下来这个任务会被提交给RejectedExecutionHandler处理。

/**
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

		//获取当前线程池的状态
        int c = ctl.get();
		
		//步骤1
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
		
		//步骤2
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
		
		//步骤3
        else if (!addWorker(command, false))
            reject(command);
    }

首先获取线程池的状态,接下来进行三个步骤:

  1. 如果当前线程数量小于corePoolSize,创建一个新的线程运行。(The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn’t, by returning false)
  2. 如果写入阻塞队列成功,此时我们仍需双重检查,看我们是否需要新添加一个线程 ,because existing ones died since last checking) 或者这个线程池在我们执行方法的时候已经被shutdown了。所以我们需要再次获取线程状态,如果线程状态改变了(非运行状态)就需要从阻塞队列移除任务,并尝试判断线程是否全部执行完毕,同时执行拒绝策略。如果当前线程池为空就创建一个新的线程并执行。
  3. 如果无法写入阻塞队列,那么我们尝试创建一个新的线程。如果创建新线程失败,那么我们可以知道这个线程池已经被shutdown了或者已经饱和了,那么执行拒绝策略。

整个流程如下所示:
在这里插入图片描述

2.3 添加worker线程

从方法execute的实现可以看出:
addWorker主要负责创建新的线程并执行任务,代码如下:

/**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
			
			//做是否能够添加工作线程条件过滤
            //判断线程池的状态,如果线程池的状态大于或等于SHUTDOWN,则不处理提交的任务,直接返回
            //或者处于SHUTDOWN状态并且工作队列非空
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
			
			//做自旋,更新创建线程数量
			//通过参数core判断当前需要创建的线程是否为核心线程,如果core为true,且当前线程数小于corePoolSize,则跳出循环,开始创建新的线程
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
        	//获取线程池主锁
        	//线程池的工作线程通过Worker类实现,通过ReentrantLock锁保证线程安全
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

					//添加线程到workers中(线程池中)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //启动新建的线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

2.4 什么是workers

 /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

可以看出workers是一个hashSet,所以,线程池底层的存储结构是一个HashSet

2.5 执行worker的函数

/**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread's
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
        	//是否是第一次执行任务,或者从队列中可以获取到任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                //获取到任务后,执行任务前开始操作钩子
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    //执行恩物
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                    	//执行任务后钩子
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

这两个钩子(beforeExecute,afterExecute)允许我们自己继承线程池,做任务执行前后处理。

2.6 如何配置线程池

接下来看下上文提到的几个核心参数应该怎么配置

有一点是肯定的,线程池肯定不是越大越好,通常我们是需要根据任务执行的性质来确定线程池的大小:

  • IO 密集型任务:由于线程并不是一直在运行,CPU的性能,消耗的时间少于请求内存、硬盘、网络的时间,这时大部分时间CPU处于空闲状态,所以可以尽可能的多配置线程,比如 CPU 个数 * 2
  • CPU 密集型任务主要时间花费在计算上面,内存、硬盘、网络占用的时间少于CPU本身计算的时间,这时应配置尽可能小的线程,避免线程之间频繁的切换消耗资源,如配置

    N

    c

    p

    u

    +

    1

    N_{cpu}+1

    Ncpu+1个线程的线程池

这些都只是经验,最好的方式还是根据实际情况测试得出最佳配置。

2.7 关闭线程池

关闭线程池是两个方法:shutdown()和shutdownNow()

注意这两个方法有着重要的区别:

  • shutdown()执行后停止接受新任务,会把队列的任务执行完毕
  • shutdownNow()也是停止接受新任务,但会终端所有的任务,将线程池状态变为stop

两个方法都会中断线程,用户可自行判断是否需要响应中断。

2.8 线程池的优点:

  • 降低资源消耗,重复利用已创建的线程降低线程创建和销毁造成的损耗
  • 提高响应速度,任务到达时不需要等待线程创建就能立即执行
  • 提高线程的可管理性,线程是稀缺资源不能无限制创建,通过线程池同一分配可以调优和监控

2.9 整个工作流程图

在这里插入图片描述

https://www.cnblogs.com/rinack/p/9888717.html

赞(0) 打赏
未经允许不得转载:IDEA激活码 » Java多线程实现-线程池

一个分享Java & Python知识的社区