基本概述 线程池作用:线程复用
降低资源消耗:减少了创建和销毁线程的次数 提高响应速度:任务到达,如果有线程可以直接用 提高可管理性:使用线程池可以进行统一的分配,管理 阻塞队列 基本介绍 阻塞队列的实现(java.util.concurrent.BlockingQueue
接口):
ArrayBlockQueue
:数组结构,有界阻塞队列LinkedBlockingQueue
:链表结构,无界阻塞队列(有界,默认大小Integer.MAX_VALUE
)PriorityBlockQueue
:支持优先级排序的,无界阻塞队列DelayedWorkQueue
:使用优先级队列实现的,延迟无界阻塞队列SynchronousQueue
:不存储元素的阻塞队列,每一个生产线程会阻塞到有一个put的线程放入元素为止LinkedTransferQueue
:链表结构,无界阻塞队列LinkedBlockingDeque
:链表结构,双向阻塞队列阻塞队列:
阻塞添加put()
:队列已满时,添加元素的线程会被阻塞 阻塞删除take()
:队列为空时,删除元素的线程会被阻塞 核心方法 方法类型 抛出异常 特殊值(boolean或者null) 阻塞 超时(抛出错误) 插入(尾) add(e) offer(e) put(e) offer(e,time,unit) 移除(头) remove() poll() take() poll(time,unit) 检查(队首元素) element() peek() 不可用 不可用
链表阻塞队列 1 2 3 4 5 6 7 8 9 10 11 12 13 public class LinkedBlockingQueue <E> extends AbstractQueue <E> implements BlockingQueue <E>, java.io.Serializable { static class Node <E> { E item; Node<E> next; Node(E x) { item = x; } } }
入队出队
初始化链表,last = head = new Node<E>(null)
1 2 3 4 5 public LinkedBlockingQueue (int capacity) { if (capacity <= 0 ) throw new IllegalArgumentException (); this .capacity = capacity; last = head = new Node <E>(null ); }
节点入队(尾节点入队),last = last.next = node
1 2 3 private void enqueue (Node<E> node) { last = last.next = node; }
1 2 3 4 5 6 7 8 9 private E dequeue () { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null ; return x; }
总结:无论如何,head节点总是Dummy节点(item为null)
加锁分析 用了两把锁 + dummy节点
线程安全分析:
节点总数大于2时(包括dummy节点),putLock
保证last节点的线程安全,takeLock
保证head节点的安全 节点总数等于2时(一个dummy节点,一个正常节点),仍然是两把锁,不会竞争 节点总数等于1时(一个dummy节点),take线程被noEmpty
条件阻塞,有竞争,会阻塞 1 2 3 4 5 6 7 private final ReentrantLock putLock = new ReentrantLock ();private final Condition notFull = putLock.newCondition();private final ReentrantLock takeLock = new ReentrantLock ();private final Condition notEmpty = takeLock.newCondition();
put入队 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void put (E e) throws InterruptedException { if (e == null ) throw new NullPointerException (); int c = -1 ; Node<E> node = new Node <E>(e); final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); }
1 2 3 4 5 6 7 8 9 10 11 private void signalNotEmpty () { final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
take出队 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public E take () throws InterruptedException { E x; int c = -1 ; final AtomicInteger count = this .count; final ReentrantLock takeLock = this .takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0 ) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
1 2 3 4 5 6 7 8 9 private void signalNotFull () { final ReentrantLock putLock = this .putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
同步阻塞队列 SynchronousQueue
是一个不存储元素的BlockingQueue
,每一个生产者必须阻塞匹配到一个消费者
成员属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class SynchronousQueue <E> extends AbstractQueue <E> { static final int NCPUS = Runtime.getRuntime().availableProcessors(); static final int maxTimedSpins = (NCPUS < 2 ) ? 0 : 32 ; static final int maxUntimedSpins = maxTimedSpins * 16 ; static final long spinForTimeoutThreshold = 1000L ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 private transient volatile Transferer<E> transferer;abstract static class Transferer <E> { abstract E transfer (E e, boolean timed, long nanos) ; }
1 2 3 4 5 public SynchronousQueue (boolean fair) { transferer = fair ? new TransferQueue <E>() : new TransferStack <E>(); }
1 2 3 4 5 6 7 8 public boolean offer (E e) { if (e == null ) throw new NullPointerException (); return transferer.transfer(e, true , 0 ) != null ; } public E poll () { return transferer.transfer(null , true , 0 ); }
非公平(TransferStack) TransferStack
是非公平同步队列
所有请求都被压入栈中,栈顶元素最先匹配,栈底元素存在饥饿的问题
TransferStack类成员变量 1 2 3 4 5 static final int REQUEST = 0 ; static final int DATA = 1 ; static final int FULFILLING = 2 ;
SNode内部类 1 2 3 4 5 6 7 static final class SNode { volatile SNode next; volatile SNode match; volatile Thread waiter; Object item; int mode; }
1 2 3 SNode(Object item) { this .item = item; }
设置方法:设置Node对象的next字段,此处使用了CAS 1 2 3 4 5 boolean casNext (SNode cmp, SNode val) { return cmp == next && UNSAFE.compareAndSwapObject(this , nextOffset, cmp, val); }
1 2 3 4 5 6 7 8 9 10 11 12 13 boolean tryMatch (SNode s) { if (match == null && UNSAFE.compareAndSwapObject(this , matchOffset, null , s)) { Thread w = waiter; if (w != null ) { waiter = null ; LockSupport.unpark(w); } return true ; } return match == s; }
1 2 3 4 5 6 7 void tryCancel () { UNSAFE.compareAndSwapObject(this , matchOffset, null , this ); } boolean isCancelled () { return match == this ; }
TransferStack类成员方法 1 2 3 4 5 6 static SNode snode (SNode s, Object e, SNode next, int mode) { if (s == null ) s = new SNode (e); s.mode = mode; s.next = next; return s; }
transfer方法:核心方法,请求匹配出栈,不匹配则阻塞 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 E transfer (E e, boolean timed, long nanos) { SNode s = null ; int mode = (e == null ) ? REQUEST : DATA; for (;;) { SNode h = head; if (h == null || h.mode == mode) { if (timed && nanos <= 0 ) { if (h != null && h.isCancelled()) casHead(h, h.next); else return null ; } else if (casHead(h, s = snode(s, e, h, mode))) { SNode m = awaitFulfill(s, timed, nanos); if (m == s) { clean(s); return null ; } if ((h = head) != null && h.next == s) casHead(h, s.next); return (E) ((mode == REQUEST) ? m.item : s.item); } } else if (!isFulfilling(h.mode)) { if (h.isCancelled()) casHead(h, h.next); else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { SNode m = s.next; if (m == null ) { casHead(s, null ); s = null ; break ; } SNode mn = m.next; if (m.tryMatch(s)) { casHead(s, mn); return (E) ((mode == REQUEST) ? m.item : s.item); } else s.casNext(m, mn); } } } else { SNode m = h.next; if (m == null ) casHead(h, null ); else { SNode mn = m.next; if (m.tryMatch(h)) casHead(h, mn); else h.casNext(m, mn); } } } }
awaitFulfill
方法:阻塞当前线程,等待被匹配,或者取消当前节点1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 SNode awaitFulfill (SNode s, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L ; Thread w = Thread.currentThread(); int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0 ); for (;;) { if (w.isInterrupted()) s.tryCancel(); SNode m = s.match; if (m != null ) return m; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L ) { s.tryCancel(); continue ; } } if (spins > 0 ) spins = shouldSpin(s) ? (spins-1 ) : 0 ; else if (s.waiter == null ) s.waiter = w; else if (!timed) LockSupport.park(this ); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this , nanos); } }
公平(TransferQueue) TransferQueue是公平的同步队列,采用FIFO的队列实现,请求节点与队尾模式不同,需要与队头发生匹配
TransferQueue类成员变量 1 2 3 transient volatile QNode head; transient volatile QNode tail; transient volatile QNode cleanMe;
QNode内部类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 static final class QNode { volatile QNode next; volatile Object item; volatile Thread waiter; final boolean isData; QNode(Object item, boolean isData) { this .item = item; this .isData = isData; } void tryCancel (Object cmp) { UNSAFE.compareAndSwapObject(this , itemOffset, cmp, this ); } boolean isCancelled () { return item == this ; } boolean isOffList () { return next == this ; } }
TransferQueue类成员方法 1 2 3 4 5 6 7 8 9 10 11 void advanceHead (QNode h, QNode nh) { if (h == head && UNSAFE.compareAndSwapObject(this , headOffset, h, nh)) h.next = h; } void advanceTail (QNode t, QNode nt) { if (tail == t) UNSAFE.compareAndSwapObject(this , tailOffset, t, nt); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 E transfer (E e, boolean timed, long nanos) { QNode s = null ; boolean isData = (e != null ); for (;;) { QNode t = tail; QNode h = head; if (t == null || h == null ) continue ; if (h == t || t.isData == isData) { QNode tn = t.next; if (t != tail) continue ; if (tn != null ) { advanceTail(t, tn); continue ; } if (timed && nanos <= 0 ) return null ; if (s == null ) s = new QNode (e, isData); if (!t.casNext(null , s)) continue ; advanceTail(t, s); Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { clean(t, s); return null ; } if (!s.isOffList()) { advanceHead(t, s); if (x != null ) s.item = s; s.waiter = null ; } return (x != null ) ? (E)x : e; } else { QNode m = h.next; if (t != tail || m == null || h != head) continue ; Object x = m.item; if (isData == (x != null ) || x == m || !m.casItem(x, e)) { advanceHead(h, m); continue ; } advanceHead(h, m); LockSupport.unpark(m.waiter); return (x != null ) ? (E)x : e; } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 Object awaitFulfill (QNode s, E e, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L ; Thread w = Thread.currentThread(); int spins = ((head.next == s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0 ); for (;;) { if (w.isInterrupted()) s.tryCancel(e); Object x = s.item; if (x != e) return x; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L ) { s.tryCancel(e); continue ; } } if (spins > 0 ) --spins; else if (s.waiter == null ) s.waiter = w; else if (!timed) LockSupport.park(this ); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this , nanos); } }
操作Pool 创建方法 Executor Executor构造方法:
1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, // 核心线程数,即最小线程数 int maximumPoolSize, // 最大线程数 long keepAliveTime, // 存活时间,核心线程数外的线程超时销毁 TimeUnit unit, BlockingQueue<Runnable> workQueue, // 阻塞队列,存放提交的任务 ThreadFactory threadFactory, // 线程工厂,创建线程时用到 RejectedExecutionHandler handler)
拒绝策略有四个实现类:
ThreadPoolExecutor.AbortPolicy
:让调用者抛出RejectedExecutionException
异常(默认策略)ThreadPoolExecutor.CallerRunsPolicy
:将某些任务回退到调用者,让调用者运行ThreadPoolExecutor.DiscardPolicy
:直接丢弃任务ThreadPoolExecutor.DiscardOldestPolicy
:放弃队列中最早的任务,把当前任务加入队列中尝试再次提交其他框架的拒绝策略:
Dubbo:抛出异常前记录日志,并dump线程栈信息,方便定位问题 Netty:创建一个新线程来执行任务 ActiveMQ:超时等待(60s),尝试放入队列 PinPoint:使用一个拒绝链,逐一尝试每种拒绝策略 工作原理:
创建线程池(此时还没有创建线程)
当调用execute()方法添加一个任务时:
如果当前线程数少于corePoolSize,马上创建线程运行任务
如果当前线程数大于等于corePoolSize,将这个任务放入队列
如果队列满了,但运行线程数还小于maximumPoolSize,创建非核心线程 立即运行这个任务(对阻塞队列的任务来说不公平)
如果队列满了,且运行线程数大于等于maximumPoolSize,线程池会启动拒绝策略
当一个线程完成任务,会从队列中取下一个任务来执行
当线程空闲时间超过keepAliveTime,会销毁线程(令线程数量收缩到corePoolSize大小)
Executors 提供了四种线程池的创建:
newFixedThreadPool
:创建具有n个线程的线程池核心线程数 = 最大线程数(所以无需超时时间) 单向链表实现阻塞队列,默认大小Integer.MAX_VALUE
(无界的),任务较多可能OOM 适用于任务量已知,相对耗时的长期任务 1 2 3 4 5 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); }
newCachedThreadPool
:创建一个可扩容的线程池核心线程数是0,最大线程数是29个1,全是救急线程(60s后可以回收) SynchronousQueue
作为阻塞队列,没有容量适合任务数较密集,但每个任务执行时间较短的情况 1 2 3 4 5 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
newSingleThreadExecutor
:创建只有一个线程的线程池1 2 3 4 5 6 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); }
开发要求 阿里巴巴开发手册:
线程必须通过线程池提供 线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor
的方式 线程池多大合适:
总线程数是核心线程池数量的两倍 核心线程数常用公式CPU密集型任务:N+1 IO密集型任务:2N 或者 N/(1-阻塞系数),阻塞系数在0.8-0.9之间 提交方法 ExecutorService
类API:
方法 说明 void execute(Runnable command)
执行任务(Executor 类 API) Future<?> submit(Runnable task)
提交任务 task() Future submit(Callable<T> task)
提交任务 task,用返回值 Future 获得任务执行结果 List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
提交 tasks 中所有任务 List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
提交 tasks 中所有任务,超时时间针对所有task,超时会取消没有执行完的任务,并抛出超时异常 T invokeAny(Collection<? extends Callable<T>> tasks)
提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
execute只能执行Runnable类型的任务,没有返回值 submit既能提交Runnable类型任务,也能提交Callable类型任务底层封装成FutureTask,然后调用execute执行 execute直接抛出任务执行时的异常 submit会吞掉异常,可通过Future的get方法将任务执行时的异常重新抛出 关闭方法 ExecutorService
类API:
方法 说明 void shutdown()
线程池状态变为SHUTDOWN,等待任务执行完后关闭线程池,不会接收新任务,但已提交任务会执行完,而且也可以添加线程(不绑定任务) List<Runnable> shutdownNow()
线程池状态变为 STOP,用 interrupt 中断正在执行的任务,直接关闭线程池,不会接收新任务,会将队列中的任务返回 boolean isShutdown()
不在RUNNING状态的线程池,此执行者已被关闭,方法返回 true boolean isTerminated()
线程池状态是否是TERMINATED,如果所有任务在关闭后完成,返回 true boolean awaitTermination(long timeout, TimeUnit unit)
调用shutdown后,由于调用线程不会等待所有任务运行结束,如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
处理异常 execute会直接抛出任务执行时的异常,submit会吞掉异常
方法一:主动捕获异常
1 2 3 4 5 6 7 8 9 ExecutorService executorService = Executors.newSingleThreadExecutor();executorService.submit(() -> { try { int i = 1 / 0 ; } catch (Exception e) { e.printStackTrace(); } });
方法二:使用Future对象
1 2 3 4 5 6 7 8 9 10 11 ExecutorService executorService = Executors.newSingleThreadExecutor();Future<?> future = executorService.submit(() -> { int i = 1 / 0 ; }); try { future.get(); } catch (ExecutionException e) { e.printStackTrace(); }
工作原理 状态信息 ThreadPoolExecutor
使用int的高3位来表示线程池状态,低29位表示线程数量 。
存储在原子变量ctl
中,可以用一次CAS原子操作修改
1 2 3 private final AtomicInteger ctl = new AtomicInteger (ctlOf(RUNNING, 0 ));private static final int COUNT_BITS = Integer.SIZE - 3 ; private static final int CAPACITY = (1 << COUNT_BITS) - 1 ;
1 2 3 4 5 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
状态 高3位 接收新任务 处理阻塞任务队列 说明 RUNNING 111 Y Y SHUTDOWN 000 N Y 不接收新任务,但处理阻塞队列剩余任务 STOP 001 N N 中断正在执行的任务,并抛弃阻塞队列任务 TIDYING 010 - - 任务全执行完毕,活动线程为 0 即将进入终结 TERMINATED 011 - - 终止状态
1 private static int workerCountOf (int c) { return c & CAPACITY; }
1 2 private static int ctlOf (int rs, int wc) { return rs | wc; }
1 2 3 4 5 6 7 8 9 10 11 12 13 private static boolean runStateLessThan (int c, int s) { return c < s; } private static boolean runStateAtLeast (int c, int s) { return c >= s; } private static boolean isRunning (int c) { return c < SHUTDOWN; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private boolean compareAndIncrementWorkerCount (int expect) { return ctl.compareAndSet(expect, expect + 1 ); } private boolean compareAndDecrementWorkerCount (int expect) { return ctl.compareAndSet(expect, expect - 1 ); } private void decrementWorkerCount () { do {} while (! compareAndDecrementWorkerCount(ctl.get())); }
成员属性 成员变量:
1 private final HashSet<Worker> workers = new HashSet <Worker>();
1 2 private final ReentrantLock mainLock = new ReentrantLock ();
1 2 private final Condition termination = mainLock.newCondition();
1 2 3 4 5 6 7 8 9 10 11 private volatile int corePoolSize;private volatile int maximumPoolSize;private volatile long keepAliveTime;private volatile ThreadFactory threadFactory; private final BlockingQueue<Runnable> workQueue; private volatile RejectedExecutionHandler handler; private static final RejectedExecutionHandler defaultHandler = new AbortPolicy (); private int largestPoolSize; private long completedTaskCount;
1 private volatile boolean allowCoreThreadTimeOut;
内部类:
Worker类每个Worker会绑定一个初始任务,启动Worker时优先执行。 Worker继承自AQS,本身具有锁的特性,采用独占锁模式(state=0未占用,state>0被占用,state<0表示初始状态,不能被抢锁) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { setState(-1 ); this .firstTask = firstTask; this .thread = getThreadFactory().newThread(this ); } protected boolean tryAcquire (int unused) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } }
1 2 3 4 5 6 7 8 9 10 11 12 public Thread newThread (Runnable r) { Thread t = new Thread (group, r, namePrefix + threadNumber.getAndIncrement(), 0 ); if (t.isDaemon()) t.setDaemon(false ); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }
1 2 3 4 CallerRunsPolicy AbortPolicy DiscardPolicy DiscardOldestPolicy
成员方法 提交方法 ThreadPoolExecutor
=>AbstractExecutorService
=>ExecutorService
=>Executor
submit()
方法,将提交的Runnable或者Callable封装为FutureTask执行。会返回任务对象,通过get阻塞获取结果1 2 3 4 5 6 7 8 9 10 11 12 13 14 public Future<?> submit(Runnable task) { if (task == null ) throw new NullPointerException (); RunnableFuture<Void> ftask = newTaskFor(task, null ); execute(ftask); return ftask; } public <T> Future<T> submit (Callable<T> task) { if (task == null ) throw new NullPointerException (); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
1 2 3 4 5 6 7 8 protected <T> RunnableFuture<T> newTaskFor (Runnable runnable, T value) { return new FutureTask <T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor (Callable<T> callable) { return new FutureTask <T>(callable); }
execute()
方法,执行任务,但是没有返回值。出现异常会直接抛出只能提交Runnable,或者FutureTask 有担保机制,保证线程池至少有一个worker 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public void execute (Runnable command) { if (command == null ) throw new NullPointerException (); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true )) return ; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get() if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } else if (!addWorker(command, false )) reject(command); }
添加线程 prestartAllCoreThreads()
方法,提前预热,创建所有的核心线程1 2 3 4 5 6 7 8 public int prestartAllCoreThreads () { int n = 0 ; while (addWorker(null , true )) ++n; return n; }
addWorker()
方法:添加线程到线程池,返回true表示成功,且线程启动。首先判断线程池是否允许添加线程,允许线程数量+1,然后创建Worker加入线程池 SHUTDOWN状态也能添加线程,但是要求新的Worker没有firstTask
,且当前queue不为空,来帮助执行队列中的任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false ; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { w = new Worker (firstTask); final Thread t = w.thread; if (t != null ) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { if (t.isAlive()) throw new IllegalThreadStateException (); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true ; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private void addWorkerFailed (Worker w) { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (w != null ) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
运行方法 Worker
=>Runnable & AQS
Worker实现了Runnable接口,线程启动时会调用Worker的run方法 1 2 3 4 public void run () { runWorker(this ); }
runWorker方法,会一直while循环获取任务并执行 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; w.unlock(); boolean completedAbruptly = true ; try { while (task != null || (task = getTask()) != null ) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error (x); } finally { afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { processWorkerExit(w, completedAbruptly); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public void unlock () { release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; }
getTask()
:获取任务,线程空闲时间超过keepAliveTime就会回收,判断依据时当前线程阻塞获取任务超过保活时间 方法返回null,代表当前线程要被回收了,进入退出逻辑 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 private Runnable getTask () { boolean timedOut = false ; for (;;) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
processWorkerExit()
方法:线程退出线程池,也有担保机制1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private void processWorkerExit (Worker w, boolean completedAbruptly) { if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; } addWorker(null , false ); } }
停止方法 shutdown()
方法:停止线程池,会等待执行完成1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
shutdownNow()
方法:直接关闭线程池,不会等待执行完成1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
tryTerminate()
方法:设置为TERMINATED状态1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 final void tryTerminate () { for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) return ; if (workerCountOf(c) != 0 ) { interruptIdleWorkers(ONLY_ONE); return ; } final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0 ))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0 )); termination.signalAll(); } return ; } } finally { mainLock.unlock(); } } }
FutureTask 基本使用 FutureTask是未来任务对象,继承Runnable、Future接口,用于包装Callable对象,实现任务的提交
1 2 3 4 5 Callable<String> stringCallable = () -> "Hello" ; FutureTask<String> task = new FutureTask <>(stringCallable); new Thread (task).start();String s = task.get();
构造方法:
1 2 3 4 5 6 public FutureTask (Callable<V> callable) { if (callable == null ) throw new NullPointerException (); this .callable = callable; this .state = NEW; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public FutureTask (Runnable runnable, V result) { this .callable = Executors.callable(runnable, result); this .state = NEW; } public static <T> Callable<T> callable (Runnable task, T result) { if (task == null ) throw new NullPointerException (); return new RunnableAdapter <T>(task, result); } static final class RunnableAdapter <T> implements Callable <T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this .task = task; this .result = result; } public T call () { task.run(); return result; } }
成员属性 FutureTask类的成员属性
1 2 3 4 5 6 7 8 9 10 11 private volatile int state; private static final int NEW = 0 ; private static final int COMPLETING = 1 ; private static final int NORMAL = 2 ; private static final int EXCEPTIONAL = 3 ; private static final int CANCELLED = 4 ; private static final int INTERRUPTING = 5 ; private static final int INTERRUPTED = 6 ;
1 private Callable<V> callable;
1 private volatile Thread runner;
1 private volatile WaitNode waiters;
1 2 3 4 5 static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
成员方法 任务执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public void run () { if (state != NEW || !UNSAFE.compareAndSwapObject(this , runnerOffset, null , Thread.currentThread())) return ; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true ; } catch (Throwable ex) { result = null ; ran = false ; setException(ex); } if (ran) set(result); } } finally { runner = null ; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
FutureTask::run()
方法,设置返回值1 2 3 4 5 6 7 8 protected void set (V v) { if (UNSAFE.compareAndSwapInt(this , stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this , stateOffset, NORMAL); finishCompletion(); } }
FutureTask::setException()
方法,设置异常返回值1 2 3 4 5 6 7 8 protected void setException (Throwable t) { if (UNSAFE.compareAndSwapInt(this , stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this , stateOffset, EXCEPTIONAL); finishCompletion(); } }
FutureTask::finishCompletion()
方法,唤醒get阻塞线程1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private void finishCompletion () { for (WaitNode q; (q = waiters) != null ;) { if (UNSAFE.compareAndSwapObject(this , waitersOffset, q, null )) { for (;;) { Thread t = q.thread; if (t != null ) { q.thread = null ; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null ) break ; q.next = null ; q = next; } break ; } } done(); callable = null ; }
FutureTask::handlePossibleCancellationInterrupt()
方法,任务中断处理1 2 3 4 5 private void handlePossibleCancellationInterrupt (int s) { if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield (); }
结果获取:
FutureTask::get()
方法,获取线程执行的返回值。可以有多个线程get1 2 3 4 5 6 public V get () throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false , 0L ); return report(s); }
FutureTask::awaitDone()
方法,get线程封装成WaitNode
对象,进入阻塞队列等待1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 private int awaitDone (boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L ; WaitNode q = null ; boolean queued = false ; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException (); } int s = state; if (s > COMPLETING) { if (q != null ) q.thread = null ; return s; } else if (s == COMPLETING) Thread.yield (); else if (q == null ) q = new WaitNode (); else if (!queued) queued = UNSAFE.compareAndSwapObject(this , waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L ) { removeWaiter(q); return state; } LockSupport.parkNanos(this , nanos); } else LockSupport.park(this ); } }
FutureTask::report()
方法,封装运行结果(成员变量outcome)1 2 3 4 5 6 7 8 private V report (int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException (); throw new ExecutionException ((Throwable)x); }
FutureTask::cancel()
方法,任务取消,打断正在执行该任务的线程1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public boolean cancel (boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this , stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false ; try { if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null ) t.interrupt(); } finally { UNSAFE.putOrderedInt(this , stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true ; }
任务调度 Timer Timer实现定时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Timer timer = new Timer ();TimerTask task1 = new TimerTask () { @Override public void run () { System.out.println("task1" ); Thread.sleep(2000 ); } }; TimerTask task2 = new TimerTask () { @Override public void run () { System.out.println("task2" ); Thread.sleep(1000 ); } }; timer.schedule(task1, 1000 ); timer.schedule(task2, 1000 );
Scheduled 任务调度线程池ScheduledThreadPoolExecutor
,继承ThreadPoolExecutor
使用内部类ScheduledFutureTask
,封装任务 使用内部类DelayedWorkQueue
,作为线程池队列 重写onShutdown
方法,去处理shutdown后的任务 提供decorateTask
方法,作为ScheduledFutureTask
的修饰方法,以便开发者进行扩展 构造方法:
1 ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10 );
1 2 3 4 5 6 7 8 9 10 11 public static ScheduledExecutorService newScheduledThreadPool (int corePoolSize) { return new ScheduledThreadPoolExecutor (corePoolSize); } public ScheduledThreadPoolExecutor (int corePoolSize) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue ()); }
常用API:
API 参数 说明 schedule
Runnable command, long delay, TimeUnit unit
执行延时任务 schedule
Callable<V> callable, long delay, TimeUnit unit
执行延时任务 scheduleAtFixedRate
Runnable command, long initialDelay, long period, TimeUnit unit
定时执行周期任务(不考虑执行的耗时) scheduleWithFixedDelay
Runnable command, long initialDelay, long delay, TimeUnit unit
定时执行周期任务(要考虑执行的耗时)
基本使用:
1 2 3 4 5 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2 );executor.schedule(() -> { System.out.println("task1" ); }, 1000 , TimeUnit.MILLISECONDS);
周期任务scheduleAtFixedRate
,一次任务启动到下一次任务启动之间 ,只要大于间隔时间,抢占到CPU就会立即执行 1 2 3 4 5 6 7 8 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2 );executor.scheduleAtFixedRate(() -> { System.out.println(new Date ()); Thread.sleep(2000 ); }, 1000 , 1000 , TimeUnit.MILLISECONDS);
周期任务scheduleWithFixedDelay
,一次任务结束到下一次任务开始间隔 ,严格等于initialDelay
1 2 3 4 5 6 7 8 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2 );executor.scheduleWithFixedDelay(() -> { System.out.println(new Date ()); Thread.sleep(2000 ); }, 1000 , 1000 , TimeUnit.MILLISECONDS);
ForkJoin ForkJoin:线程池的实现,体现的是分治思想,用于并行计算。
任务拆分:将一个大任务拆分为算法上相同的小任务,直至不能拆分,可以直接求解。
跟递归相关的一些计算,比如归并排序,斐波那契数列都可以用分治思想。
ForkJoin在分治基础上,加入了多线程,把每个任务的分解和合并交给不同线程来完成,提高了运算效率 ForkJoin使用ForkJoinPool来启动,是一个特殊的线程池,默认创建与CPU核心数大小相同的线程池 任务有返回值继承RecursiveTask
,没有返回值继承RecursiveAction
1 2 3 ForkJoinPool pool = new ForkJoinPool ();MyTask task = new MyTask (100 );Integer result = pool.invoke(task);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class MyTask extends RecursiveTask <Integer> { private int n; public MyTask (int n) { this .n = n; } @Override protected Integer compute () { if (n == 1 ) { return n; } MyTask t1 = new MyTask (n - 1 ); t1.fork(); return n + t1.join(); } }
1 2 3 ForkJoinPool pool = new ForkJoinPool ();MyTask task = new MyTask (1 , 100 );Integer result = pool.invoke(task);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class MyTask extends RecursiveTask <Integer> { private int begin; private int end; public MyTask (int begin, int end) { this .begin = begin; this .end = end; } @Override protected Integer compute () { if (begin == end) { return begin; } if (end - begin == 1 ) { return begin + end; } int mid = (begin + end) / 2 ; MyTask t1 = new MyTask (begin, mid); MyTask t2 = new MyTask (mid + 1 , end); t1.fork(); t2.fork(); return t1.join() + t2.join(); } }
ForkJoinPool实现了工作窃取算法,来提高CPU利用率
每个线程都维护了一个双端队列,用来存储需要执行的任务 工作窃取算法:允许空闲的线程,从其他线程双端队列中窃取任务来执行窃取的必须是最晚的任务,避免和队列所属线程发生竞争(但是队列中只有一个任务时,还是会发生竞争) 享元模式 享元模式 :用于减少创建对象的数量,以减少内存占用和提高性能。
异步模式 :让有限的工作线程,轮流异步处理无限多的任务(典型实现:线程池)
工作机制 :享元模式,尝试重用现有的同类对象,如果未匹配才创建新对象