publicvoidrun(){ if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable;// 全局变量,构造函数赋值 if (c != null && state == NEW) { V result; boolean ran; try { result = c.call();// callable的返回值 ran = true;// Callable计算已经结束了 } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
privatevoidfinishCompletion(){ // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { //唤醒队列中等待的线程 Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t);//唤醒线程,加锁在get里面,如果没有加锁,正常向下执行 } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } }
done();// protected方法,钩子函数,留给程序员自己实现
callable = null; // to reduce footprint }
四、 get()方法
1 2 3 4 5 6 7
public V get()throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) //计算没完成,进入awaitDone s = awaitDone(false, 0L); return report(s); }
int s = state; if (s > COMPLETING) { // 计算完成了就返回 if (q != null) q.thread = null; return s; } // COMPLETING是一个很短暂的状态,调用Thread.yield期望让出时间片,之后重试循环 elseif (s == COMPLETING) // cannot time out yet Thread.yield(); elseif (q == null) q = new WaitNode(); elseif (!queued) /* 当前节点未入栈 * 这是Treiber Stack算法入栈的逻辑。 * Treiber Stack是一个基于CAS的无锁并发栈实现, * 更多可以参考https://en.wikipedia.org/wiki/Treiber_Stack */ queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); elseif (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { // 超时,移除节点 removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
4.2 report()
report把结果赋值回去
1 2 3 4 5 6 7 8
private V report(int s)throws ExecutionException { Object x = outcome; if (s == NORMAL)//正常得到了结果 return (V)x; if (s >= CANCELLED) thrownew CancellationException(); thrownew ExecutionException((Throwable)x); }