// 公平锁会进行判断:判断为false才抢占锁 // 判断 当前阻塞队列 是否有值 publicfinalbooleanhasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Nodet= tail; // Read fields in reverse initialization order Nodeh= head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
ReentrantReadWriteLock
state:高16位,共享锁;低16位,独占锁
WriteLock
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
finalbooleantryWriteLock() { Threadcurrent= Thread.currentThread(); intc= getState(); if (c != 0) { intw= exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) returnfalse; if (w == MAX_COUNT) thrownewError("Maximum lock count exceeded"); } if (!compareAndSetState(c, c + 1)) returnfalse; setExclusiveOwnerThread(current); returntrue; }
protectedbooleantryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { intc= getState(); if (c == 0) returnfalse; intnextc= c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
// 获取共享锁 -- publicvoidawait()throws InterruptedException { sync.acquireSharedInterruptibly(1); /*public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 调用重写逻辑,当 state > 0 时,返回-1, 加入同步队列,并阻塞 // 当state == 0时,获取锁成功,继续执行,不阻塞 return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } */ }
/** * Thread state for a waiting thread. * A thread is in the waiting state due to calling one of the * following methods: * <ul> * <li>{@link Object#wait() Object.wait} with no timeout</li> * <li>{@link #join() Thread.join} with no timeout</li> * <li>{@link LockSupport#park() LockSupport.park}</li> * </ul> * * <p>A thread in the waiting state is waiting for another thread to * perform a particular action. 等待唤醒,wait,join,park ---释放锁 */ WAITING,
publicclassThreadimplementsRunnable { publicsynchronizedvoidstart() { /** * This method is not invoked for the main method thread or "system" * group threads created/set up by the VM. Any new functionality added * to this method in the future may have to also be added to the VM. * * A zero status value corresponds to state "NEW". */ if (threadStatus != 0) thrownewIllegalThreadStateException();
/* Notify the group that this thread is about to be started * so that it can be added to the group's list of threads * and the group's unstarted count can be decremented. */ group.add(this);
booleanstarted=false; try { start0(); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) { /* do nothing. If start0 threw a Throwable then it will be passed up the call stack */ } } }
// 异常处理类 @FunctionalInterface publicinterfaceUncaughtExceptionHandler { /** * Method invoked when the given thread terminates due to the * given uncaught exception. * <p>Any exception thrown by this method will be ignored by the * Java Virtual Machine. * @param t the thread * @param e the exception */ voiduncaughtException(Thread t, Throwable e); } }
/** * The entries in this hash map extend WeakReference, using * its main ref field as the key (which is always a * ThreadLocal object). Note that null keys (i.e. entry.get() * == null) mean that the key is no longer referenced, so the * entry can be expunged from table. Such entries are referred to * as "stale entries" in the code that follows. */ // key 为弱引用 staticclassEntryextendsWeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value;
V get()throws InterruptedException, ExecutionException; //获得任务计算结果
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;//可等待多少时间去获得任务计算结果 }
RunnableFuture
1 2 3 4 5 6 7
publicinterfaceRunnableFuture<V> extendsRunnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ voidrun(); }
privatevolatileint state; privatestaticfinalintNEW=0; privatestaticfinalintCOMPLETING=1; privatestaticfinalintNORMAL=2; privatestaticfinalintEXCEPTIONAL=3; privatestaticfinalintCANCELLED=4; privatestaticfinalintINTERRUPTING=5; privatestaticfinalintINTERRUPTED=6; /** The underlying callable; nulled out after running */ private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ privatevolatile Thread runner; /** Treiber stack of waiting threads */ privatevolatile WaitNode waiters;
/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * * <p>This method does not wait for previously submitted tasks to * complete execution. Use {@link #awaitTermination awaitTermination} * to do that. * * @throws SecurityException if a security manager exists and * shutting down this ExecutorService may manipulate * threads that the caller is not permitted to modify * because it does not hold {@link * java.lang.RuntimePermission}{@code ("modifyThread")}, * or the security manager's {@code checkAccess} method * denies access. */ voidshutdown();
/** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. * * <p>This method does not wait for actively executing tasks to * terminate. Use {@link #awaitTermination awaitTermination} to * do that. * * <p>There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. For example, typical * implementations will cancel via {@link Thread#interrupt}, so any * task that fails to respond to interrupts may never terminate. * * @return list of tasks that never commenced execution * @throws SecurityException if a security manager exists and * shutting down this ExecutorService may manipulate * threads that the caller is not permitted to modify * because it does not hold {@link * java.lang.RuntimePermission}{@code ("modifyThread")}, * or the security manager's {@code checkAccess} method * denies access. */ List<Runnable> shutdownNow();
/** * Returns {@code true} if this executor has been shut down. */ booleanisShutdown();
/** * @return {@code true} if all tasks have completed following shut down 返回是否为 Terminated状态(在shutdown的时候,所有的线程都已完成) 除非 首先shutdown/shutdownnow ---(没有一个线程提交,就关闭),否则返回的一直为false */ booleanisTerminated();
/** * Blocks until all tasks have completed execution after a shutdown * request, or the timeout occurs, or the current thread is * interrupted, whichever happens first. 在shutdown后,有限等待,所有的线程完成 */ booleanawaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
/** * <p> * If you would like to immediately block waiting * for a task, you can use constructions of the form * {@code result = exec.submit(aCallable).get();} * * <p>Note: The {@link Executors} class includes a set of methods * that can convert some other common closure-like objects, * for example, {@link java.security.PrivilegedAction} to * {@link Callable} form so they can be submitted.
*/ <T> Future<T> submit(Callable<T> task);
/** * Submits a Runnable task for execution and returns a Future * representing that task. The Future's {@code get} method will * return the given result upon successful completion. 返回future能够获取结果 */ <T> Future<T> submit(Runnable task, T result);
/** * Submits a Runnable task for execution and returns a Future * representing that task. The Future's {@code get} method will * return {@code null} upon <em>successful</em> completion. * * @param task the task to submit * @return a Future representing pending completion of the task * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if the task is null */ Future<?> submit(Runnable task);
/** * Executes the given tasks, returning a list of Futures holding * their status and results when all complete. * {@link Future#isDone} is {@code true} for each * element of the returned list. * Note that a <em>completed</em> task could have * terminated either normally or by throwing an exception. * The results of this method are undefined if the given * collection is modified while this operation is in progress. */ <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
/** * Executes the given tasks, returning a list of Futures holding * their status and results * when all complete or the timeout expires, whichever happens first. * {@link Future#isDone} is {@code true} for each * element of the returned list. * Upon return, tasks that have not completed are cancelled. * Note that a <em>completed</em> task could have * terminated either normally or by throwing an exception. * The results of this method are undefined if the given * collection is modified while this operation is in progress.
/** * Executes the given tasks, returning the result * of one that has completed successfully (i.e., without throwing * an exception), if any do. Upon normal or exceptional return, * tasks that have not completed are cancelled. * The results of this method are undefined if the given * collection is modified while this operation is in progress. 只要有一个完成,就不会抛异常 */ <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
/** * Executes the given tasks, returning the result */ <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
publicvoidexecute(Runnable command) { if (command == null) thrownewNullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ intc= ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { intrecheck= ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); elseif (workerCountOf(recheck) == 0) addWorker(null, false); } elseif (!addWorker(command, false)) reject(command); } }
/* The runState provides the main lifecycle control, taking on values: * * RUNNING: Accept new tasks and process queued tasks 可以接收新任务,处理消息队列任务 * SHUTDOWN: Don't accept new tasks, but process queued tasks 不可以接收新任务,处理消息队列任务 * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks 不可以接收新任务,不处理消息队列任务,暂停正在运行任务 * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method 所有的任务已经暂停,工作队列未0,过度到 TIDING,执行钩子方法terminated() * TERMINATED: terminated() has completed 钩子方法terminated()执行完毕 */ publicvoidshutdown(); /// 优雅关闭, SHUTDOWN: 不可以接收新任务,处理消息队列任务 public List<Runnable> shutdownNow(); // STOP: 不可以接收新任务,不处理消息队列任务,暂停正在运行任务
/** * Creates and executes a one-shot action that becomes enabled * after the given delay. */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); /** * Creates and executes a ScheduledFuture that becomes enabled after the * given delay */ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); /** * subsequently with the given * period; that is executions will commence after * {@code initialDelay} then {@code initialDelay+period}, then * {@code initialDelay + 2 * period}, and so on. 固定周期执行 */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); /** 间隔 执行 */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduleFromEventLoop(task); } else { finallongdeadlineNanos= task.deadlineNanos(); // task will add itself to scheduled task queue when run if not expired if (beforeScheduledTaskSubmitted(deadlineNanos)) { execute(task); } else { lazyExecute(task); // Second hook after scheduling to facilitate race-avoidance if (afterScheduledTaskSubmitted(deadlineNanos)) { execute(WAKEUP_TASK); } } }
/** * Constructs a new {@code Thread}. Implementations may also initialize * priority, name, daemon status, {@code ThreadGroup}, etc. * * @param r a runnable to be executed by new thread instance * @return constructed thread, or {@code null} if the request to * create a thread is rejected */ Thread newThread(Runnable r); }
/** * Thread factory capturing access control context and class loader */ staticclassPrivilegedThreadFactoryextendsDefaultThreadFactory { privatefinal AccessControlContext acc; privatefinal ClassLoader ccl;
PrivilegedThreadFactory() { super(); SecurityManagersm= System.getSecurityManager(); if (sm != null) { // Calls to getContextClassLoader from this class // never trigger a security check, but we check // whether our callers have this permission anyways. sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);