前言 多线程是 java 并发编程中的一个重要特性,最简单的用法是在需要使用多线程的地方 new 一个新的线程让其异步运行。但是,线程的创建是一个比较大的开销,如果有多个任务需要频繁间歇地异步执行,为每个任务单独创建一个线程就显得有点浪费。因此,可以考虑使用线程池的做法,在线程池中保持一定数量的线程,当有任务需要异步执行时,直接让线程池中的空闲线程获取该任务进行执行。这样就能避免每次创建线程的开销。
线程池的基本概念 当创建一个线程池的时候,需要传入以下参数
1 2 3 4 5 6 7 int corePoolSize int maximumPoolSize long keepAliveTime TimeUnit unit BlockingQueue<Runnable> workQueue ThreadFactory threadFactory RejectedExecutionHandler handler
线程池的创建 一般情况下,我们使用 Executors
类中的工厂类创建所需要的线程池,常用的两种线程池是 Executors.newCachedThreadPool()
、Executors.newFixedThreadPool()
,其构造函数如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); } public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); }
线程池的状态 线程池中定义了 5 个状态:RUNNING
、SHUTDOWN
、STOP
、TIDYING
、TERMINATED
。
RUNNING
:表示线程池处于运行状态
SHUTDOWN
:表示线程池处于关闭状态,可调用 shutdown
函数将线程池的状态由 RUNNING
变成此状态。当线程池处于此状态下时,线程池不再接受新的任务,但是对于已经提交的任务和正在运行的任务,会继续执行。
STOP
:表示线程池处于停止状态,可调用 shutdownNow
函数将线程池的状态由 RUNNING
或 SHUTDOWN
变成此状态。当线程处于此状态时,线程池不在接受新的任务,同时会中断正在执行的任务,而对于工作队列中等待执行的任务,将不再调用执行。
TIDYING
:表示线程池处于「纯净待结束」状态。此状态可由 SHUTDOWN
状态或 STOP
状态转换而来。对于状态 SHUTDOWN
,如果其工作队列中没有等待的线程,同时没有线程在运行时,将其状态变为 TIDYING
。对于状态 STOP
,当线程池中没有线程在运行时,将其状态转变为 TIDYING
。当线程池转换此状态时,会调用可扩展的terminated
函数。
TERMINATED
:表示线程池处于结束状态。此状态由 TIDYING
转换而来。当执行完 terminated
后线程池变为此状态。
其各个状态之间的转换关系大致如下图所示
线程池的运行过程 当我们执行 execute.submit(task)
向线程池中提交任务时,其会先判断当前运行线程是否小于核心线程数量,如果是,则新建一个线程运行,否则判断任务是否能放入工作队列(阻塞队列)中,如果可以,则放入阻塞队列中等待调度执行,否则继续判断当前运行线程数量是否大于最大线程数量,如果小于则新建线程运行任务,否则执行拒绝策略。
流程图 提交任务后线程池的处理过程,大致流程图如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 start=>start: start submit=>operation: 提交一个任务 lessThanCoreSize=>condition: 运行的线程数量 小于 corePoolSize newThread=>operation: 新建一个线程运行任务 isQueueFull=>condition: 阻塞队列是否已满 waitRun=>operation: 等待调度执行 lessThanMaxSize=>condition: 运行的线程数量小于 maximumPoolSize executeRejection=>operation: 执行拒绝策略 end=>end: end start->submit->lessThanCoreSize(no)->isQueueFull(yes)->lessThanMaxSize(no)->executeRejection->end lessThanCoreSize(yes, right)->newThread->end isQueueFull(no, right)->waitRun->end lessThanMaxSize(yes)->newThread->end
部分源码解析 当执行 submit
函数时,会调用 execute
函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public Future<?> submit(Runnable task) { if (task == null ) throw new NullPointerException (); RunnableFuture<Void> ftask = newTaskFor(task, null ); execute(ftask); return ftask; } public <T> Future<T> submit (Runnable task, T result) { if (task == null ) throw new NullPointerException (); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit (Callable<T> task) { if (task == null ) throw new NullPointerException (); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
在 execute
函数中,会依次对线程数量是否小于核心线程数量、是否能加入阻塞队列、线程数量是否小于最大线程数量进行判断。当线程数量小于核心线程数量时,会新建一个线程运行当前任务,否则加入阻塞队列,当加入阻塞队列失败时,会判断线程数量是否小于最大线程数量,如果小于则新建线程运行当前任务,否则执行拒绝策略。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
接下来看看 addWorker
函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException (); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
线程池中新建线程会执行 worker
的 run
方法,run
方法中只是简单的调用 runWorker
方法,因此,接下来看下 runWorker
的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public void run () { runWorker(this ); } final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error (x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
接下来看下 getTask
的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
至此,execute
的函数大致解析完毕。
线程池的拒绝策略 当线程池中的线程数量大于 maximumPoolSize
时,再次提交任务,会执行拒绝策略。线程池中提供了四种拒绝策略可以供我们直接使用。当然,也可以自定义拒绝策略,在创建线程池中传入即可。线程池中定义的四种拒绝策略如下
拒绝策略
处理方式
CallerRunsPolicy
如果线程池没有关闭,由提交任务的线程自己执行这个任务
AbortPolicy
抛出 RejectedExecutionException 异常,这是默认的拒绝策略,如果在创建线程池的时候不传入参数指定拒绝策略,会默认使用这个拒绝策略
DiscardPolicy
不做任何处理,直接丢弃这个任务
DiscardOldestPolicy
如果线程池没有关闭,从工作队列(阻塞队列)中取出队头的任务丢弃,然后重新尝试提交任务。此策略不建议与使用优先级队列(PriorityQueue)的工作队列结合使用
自定义线程池示例 本例自定义一个线程池,其中 corePoolSize 为 3,maximumPoolSize 为 5,工作队列(阻塞队列)是长度为 2 的有界阻塞队列,拒绝策略采用默认的抛 RejectedExecutionException 异常的方式 。在创建线程池后,向线程提交 10 个任务进行运行。刚开始,由于线程池中线程数量小于 3,所以会创建 3 个线程用于运行提交的任务,当任务继续提交时,会将任务放入阻塞队列中等待运行,当提交任务达到 5 个时,阻塞队列已满,此时再提交任务,会继续创建新的线程运行刚刚提交的任务。当提交任务达到 7 个时(3 个在核心线程中运行,2 个在阻塞队列中,另外 2 个是由于线程数小于 maximumPoolSize 而新创建的线程执行的),再提交任务,会执行拒绝策略,抛出 RejectedExecutionException 异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public class TestThreadPool { public static void main (String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor (3 , 5 , 10 , TimeUnit.SECONDS, new ArrayBlockingQueue <>(2 )); for (int i = 0 ; i < 10 ; i++) { System.out.println("Push task count: " + (i + 1 )); final String threadName = "TTT" + (i + 1 ); try { threadPoolExecutor.execute(new Runnable () { @Override public void run () { System.out.println("Run thread " + threadName); try { Thread.sleep(500 ); } catch (InterruptedException e) { e.printStackTrace(); } } }); } catch (Exception e) { System.out.println("Submit thread " + threadName + " error" ); e.printStackTrace(); } System.out.println("Pool size: " + threadPoolExecutor.getPoolSize()); System.out.println("Queue count: " + threadPoolExecutor.getQueue().size()); System.out.println("-----------------" ); } threadPoolExecutor.shutdown(); } }
输出结果如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 Push task count: 1 Pool size: 1 Queue count: 0 ----------------- Push task count: 2 Run thread TTT1 Pool size: 2 Queue count: 0 Run thread TTT2 ----------------- Push task count: 3 Pool size: 3 Queue count: 0 ----------------- Push task count: 4 Pool size: 3 Run thread TTT3 Queue count: 1 ----------------- Push task count: 5 Pool size: 3 Queue count: 2 ----------------- Push task count: 6 Pool size: 4 Queue count: 2 ----------------- Push task count: 7 Pool size: 5 Run thread TTT6 Queue count: 2 ----------------- Push task count: 8 Run thread TTT7 Submit thread TTT8 error java.util.concurrent.RejectedExecutionException: Task thread.TestThreadPool$1@55f96302 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 5, active threads = 5, queued tasks = 2, completed tasks = 0] Pool size: 5 Queue count: 2 at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source) at thread.TestThreadPool.main(TestThreadPool.java:18) ----------------- Push task count: 9 Submit thread TTT9 error java.util.concurrent.RejectedExecutionException: Task thread.TestThreadPool$1@75b84c92 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 5, active threads = 5, queued tasks = 2, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source) at thread.TestThreadPool.main(TestThreadPool.java:18) Pool size: 5 Queue count: 2 ----------------- Push task count: 10 Submit thread TTT10 error java.util.concurrent.RejectedExecutionException: Task thread.TestThreadPool$1@232204a1 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 5, active threads = 5, queued tasks = 2, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source) at thread.TestThreadPool.main(TestThreadPool.java:18) Pool size: 5 Queue count: 2 ----------------- Run thread TTT4 Run thread TTT5
从输出结果中可以看出
当提交任务数不超过 3 时,线程池立刻创建新线程执行任务。这从Run Thread TTT1
到 Run Thread TTT3
这一段的输出结果可以看出
当提交任务数是 (3, 5]
个时,提交的任务放如阻塞队列中等待执行。这从最后最后的两行输出 Run thread TTT4
、Run thread TTT5
以及这两行在 Run thread TTT7
后面可以看出。
当提交任务数是(5, 7]
个时,线程池立刻创建新的线程执行任务。这从输出 Run Thread TTT3
后直接输出 Run Thread TTT6
和 Run Thread TTT7
而后才输出 Run thread TTT4
、Run thread TTT5
可以看出
当提交任务数是 (7, 10]
个时,线程池执行拒绝策略抛出异常。这从 Submit thread TTT8 error
、Submit thread TTT9 error
、Submit thread TTT10 error
可以看出
扩展阅读 https://javadoop.com/post/java-thread-pool