JUC

打通 Java 任督二脉 —— 并发数据结构的基石 - 掘金 (juejin.cn)


—————《同步锁》 —————

AQS

image-20230419165020363

image-20230411230408908

LockSupport.park() / unpark() — the primitive behind the Condition wait queue; park() does not release any lock the thread holds.

Object.wait() / notify() / notifyAll() — must be called while holding the object's monitor; wait() releases the monitor lock.

Condition.await() / signal() / signalAll() — bound to a Lock; await() releases the lock while waiting.
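
A minimal sketch of the park/unpark behaviour (my own example; the class name ParkDemo is made up): park() blocks the calling thread without it holding or releasing any lock, and unpark() from another thread releases it.

import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            System.out.println("worker: parking, no lock is held");
            LockSupport.park();               // blocks until a permit is available
            System.out.println("worker: unparked, continuing");
        });
        worker.start();
        Thread.sleep(500);
        LockSupport.unpark(worker);           // hands the worker a permit
        worker.join();
    }
}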

AbstractOwnableSynchronizer

Exclusive ownership – records the thread that currently owns the synchronizer:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
// 设置独占线程
private transient Thread exclusiveOwnerThread;
}

image-20230411221327532

AbstractQueuedSynchronizer

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

private transient volatile Node head;

/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;

// 状态 -- 可重入锁,不断累加 / 读写锁,分为读高位 写低位
private volatile int state;

// 尝试获取锁 --》 失败后;加入队列
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}


// 可打断获取锁
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
}

Node

static final class Node {
// 共享锁
static final Node SHARED = new Node();
// 排他锁
static final Node EXCLUSIVE = null;
// 被取消 ---不执行,移除同步队列
static final int CANCELLED = 1;
// 当前持有锁线程,需要唤醒后面的线程
static final int SIGNAL = -1;
// 任务处于等待队列,可以被唤醒
static final int CONDITION = -2;
// 下一个 请求共享锁的应当被无条件的传播 ---用于读锁传递
static final int PROPAGATE = -3;
// 等待状态 上面的int
volatile int waitStatus;

volatile Node prev;
volatile Node next;
// 当前线程
volatile Thread thread;
// 下一个等待者
Node nextWaiter;
}

ConditionObject

维持等待队列,利用Lock

// 继承 Condition接口,实现 条件等待队列
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}

Condition

public interface Condition {
// releases the lock and waits until signalled or interrupted
void await() throws InterruptedException;
// waits without responding to interruption (the interrupt status, if set, is preserved)
void awaitUninterruptibly();

long awaitNanos(long nanosTimeout) throws InterruptedException;

boolean await(long time, TimeUnit unit) throws InterruptedException;

boolean awaitUntil(Date deadline) throws InterruptedException;
// wakes up the first node in the condition wait queue
void signal();
// moves every waiter to the sync queue and unparks each thread
void signalAll();
}

持有同步队列

Sync

Sync extends AbstractQueuedSynchronizer, so it reuses the AQS sync queue (a doubly linked list of Nodes)

image-20230411223018624

image-20230411223456375

Lock

实现锁逻辑

public interface Lock {
// 获取锁
void lock();
// 获取可打断的锁
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
// 创建条件类
Condition newCondition();
}

image-20230411222741169

ReentrantLock

Sync同步队列

调用逻辑

ReentrantLock#lock() ==> AQS#acquire() ==> ReentrantLock.Sync#tryAcquire()

Fair lock (FairSync) vs. non-fair lock (NonfairSync, the default):

Non-fair: a newly arriving thread immediately competes for the lock (CAS on state) before it is ever queued.

Fair: a newly arriving thread first checks whether the AQS queue already has a predecessor node; only if there is none does it compete for the lock.
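
A small usage sketch (not from the source; FairnessDemo is a made-up name) showing how the fairness policy is chosen at construction time:

import java.util.concurrent.locks.ReentrantLock;

public class FairnessDemo {
    // default constructor -> non-fair: an arriving thread may CAS the state before queued threads
    private final ReentrantLock unfair = new ReentrantLock();
    // true -> fair: tryAcquire first calls hasQueuedPredecessors()
    private final ReentrantLock fair = new ReentrantLock(true);

    public void doWork() {
        fair.lock();
        try {
            // critical section
        } finally {
            fair.unlock();   // always release in finally
        }
    }
}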

Fair1:

// fair  尝试请求锁逻辑
final void lock() {
acquire(1);
}

// unfair
// 尝试直接获取锁逻辑
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

Fair2:

hasQueuedPredecessors()

// fair 
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 当前state== 0 ,无人 抢占锁
if (c == 0) {
// fair2 : 当前队列没有其他节点,才尝试获取锁
// 尝试获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} // 当前锁对象 为 当前线程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 否则加入队列
return false;
}



// unfair
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

hasQueuedPredecessors()

// used by the fair lock: the thread only tries to grab the lock when this returns false
// i.e. it checks whether any other thread is queued ahead of the current thread
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

image-20230411223306110

ReentrantReadWriteLock

state:高16位,共享锁;低16位,独占锁
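
A hedged usage sketch (RWDemo is a made-up name); the state-layout comment mirrors the high-16/low-16 split described above:

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RWDemo {
    private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private int value;

    // state layout in Sync: sharedCount(c) = c >>> 16 (read count), exclusiveCount(c) = c & 0xFFFF (write count)

    public int read() {
        rw.readLock().lock();          // many readers may hold this at once
        try {
            return value;
        } finally {
            rw.readLock().unlock();
        }
    }

    public void write(int v) {
        rw.writeLock().lock();         // exclusive; blocks both readers and writers
        try {
            value = v;
        } finally {
            rw.writeLock().unlock();
        }
    }
}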

WriteLock

final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}

ReadLock

Shared lock – acquired simply by CAS-updating state (the read count); no exclusive owner thread is recorded

final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}

StampedLock

WriteLockView

ReadLockview

CountDownLatch

One-shot: the constructor sets the Sync state to the given int count.

How it works: CountDownLatch overrides AQS tryAcquireShared() and tryReleaseShared(); await() blocks while state != 0 and returns immediately once state == 0.

In other words, threads that call await() block until the count has been counted down to zero, and then all of them proceed.

Purpose: CountDownLatch coordinates threads. A job is typically split into sub-tasks; the main thread waits for the sub-tasks to finish and then combines their results (see the sketch below).
All waiting threads continue only after the counter reaches zero --- CountDownLatch is one-shot and cannot be reset.
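
A minimal usage sketch (my own example): the main thread splits a job into three sub-tasks and waits for all of them on the latch.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int tasks = 3;
        CountDownLatch latch = new CountDownLatch(tasks);   // state = 3
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            pool.execute(() -> {
                System.out.println("sub-task " + id + " done");
                latch.countDown();                          // state--; releases waiters at 0
            });
        }
        latch.await();                                      // blocks until state == 0
        System.out.println("all sub-tasks finished, combine results");
        pool.shutdown();
    }
}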

private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

// 设置state 共享锁的数量
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
// 重写共享锁的逻辑 -- 实现自定逻辑
//
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}


// 获取共享锁 --
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
/*public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 调用重写逻辑,当 state > 0 时,返回-1, 加入同步队列,并阻塞
// 当state == 0时,获取锁成功,继续执行,不阻塞
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
*/
}

// 释放共享锁
public void countDown() {
sync.releaseShared(1);
/*public final boolean releaseShared(int arg) {
// 调用重写逻辑,当state == 0时,释放共享锁失败,
// 当state > 0时,state 减一;减完为0,则释放共享锁,否则释放锁失败
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
*/
}

image-20230419161517357

Semaphore

A thread runs once it has acquired a permit; this is the classic shared lock, where permits == state (the number of available permits).

Purpose: Semaphore controls how many threads may access a resource concurrently.

final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
// if remaining >= 0 the CAS updates state and the non-negative value is returned: acquire succeeds without blocking
// if remaining < 0 the negative value is returned and the caller puts the thread into the sync queue
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
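
A short usage sketch (my own example; three permits standing in for, say, database connections):

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    private static final Semaphore PERMITS = new Semaphore(3);  // state = 3 permits

    static void useResource(int id) {
        try {
            PERMITS.acquire();                 // state--; blocks when no permit is left
            System.out.println("thread " + id + " got a permit");
            Thread.sleep(100);                 // simulate work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            PERMITS.release();                 // state++; wakes a queued thread
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            final int id = i;
            new Thread(() -> useResource(id)).start();
        }
    }
}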

LimitLatch

package org.apache.tomcat.util.threads;
当count <= limit时,继续运行,否则多出来的部分阻塞; 用于控制连接数

image-20230419161149002

CyclicBarrier

Java多线程实战|CyclicBarrier原理介绍及使用场景 - 掘金 (juejin.cn)

When the number of threads that have called await() reaches count, all of them are released and run.

Purpose: wait until the given number of threads have all reached the barrier, then let them continue together.
A Runnable barrier action can be supplied; when the last thread reaches the barrier the action runs first, and only then are the waiting threads released (see the sketch below).
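
A small sketch (not from the source): three threads meet at the barrier, the barrier action runs once before they are released, and the barrier could be reused afterwards.

import java.util.concurrent.CyclicBarrier;

public class BarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3,
                () -> System.out.println("barrier action: all 3 arrived"));
        for (int i = 0; i < 3; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    System.out.println("thread " + id + " waiting");
                    barrier.await();           // count--, blocks until it reaches 0
                    System.out.println("thread " + id + " released");
                } catch (Exception e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}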
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
//检查当前栅栏是否被打翻
if (g.broken) {
throw new BrokenBarrierException();
}
//检查当前线程是否被中断
if (Thread.interrupted()) {
//如果当前线程被中断会做以下三件事
//1.打翻当前栅栏
//2.唤醒拦截的所有线程
//3.抛出中断异常
breakBarrier();
throw new InterruptedException();
}
//每次都将计数器的值减1
int index = --count;
//计数器的值减为0则需唤醒所有线程并转换到下一代
if (index == 0) {
boolean ranAction = false;
try {
//唤醒所有线程前先执行指定的任务
final Runnable command = barrierCommand;
if (command != null) {
command.run();
}
ranAction = true;
//唤醒所有线程并转到下一代
nextGeneration();
return 0;
} finally {
//确保在任务未成功执行时能将所有线程唤醒
if (!ranAction) {
breakBarrier();
}
}
}

//如果计数器不为0则执行此循环
for (;;) {
try {
//根据传入的参数来决定是定时等待还是非定时等待
if (!timed) {
trip.await();
}else if (nanos > 0L) {
nanos = trip.awaitNanos(nanos);
}
} catch (InterruptedException ie) {
//若当前线程在等待期间被中断则打翻栅栏唤醒其他线程
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
//若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作
Thread.currentThread().interrupt();
}
}
//如果线程因为打翻栅栏操作而被唤醒则抛出异常
if (g.broken) {
throw new BrokenBarrierException();
}
//如果线程因为换代操作而被唤醒则返回计数器的值
if (g != generation) {
return index;
}
//如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

image-20230419161109632

image-20230419161354065

锁原理实现

LockSupport

LockSupport是Java并发工具包(java.util.concurrent)中的一个类,它提供了线程阻塞和唤醒的能力。下面是LockSupport的主要方法介绍:

  1. park(): 当前线程阻塞,直到被其他线程唤醒或被中断。
  2. parkNanos(long nanos): 当前线程阻塞,最多阻塞指定的纳秒数,直到被其他线程唤醒、被中断或超时。
  3. parkUntil(long deadline): 当前线程阻塞,直到被其他线程唤醒、被中断或达到指定的时间戳。
  4. unpark(Thread thread): 唤醒指定线程,如果该线程之前被阻塞的话。

LockSupport的实现原理是基于操作系统提供的底层线程阻塞和唤醒机制。在调用park()方法时,当前线程会被阻塞,直到被其他线程调用unpark()方法唤醒。unpark()方法可以在park()方法调用之前或之后调用,因此可以先唤醒线程再阻塞线程。park()方法也可以被中断,即其他线程调用当前线程的interrupt()方法来中断阻塞状态。

LockSupport的优点是它比传统的wait()/notify()方法更加灵活和可靠。它不依赖于对象的监视器(monitor),因此可以在任何时候使用,而不需要获取对象的锁。此外,LockSupport的阻塞和唤醒操作是一对一的,不会出现信号丢失的情况。

采用LockSupport来实现锁的原因是它提供了更底层的线程阻塞和唤醒机制,可以更灵活地控制线程的阻塞和唤醒。相比传统的锁机制,LockSupport的使用更加简单,不需要在同步块中调用wait()/notify()方法,减少了出错的可能性。此外,LockSupport还可以避免线程因为等待锁而进入阻塞状态时,被其他线程调用interrupt()方法中断而抛出InterruptedException异常的情况。因此,LockSupport是实现锁的一种更可靠和高效的方式。
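
A tiny sketch of the permit semantics mentioned above: an unpark() issued before park() is not lost (my own example).

import java.util.concurrent.locks.LockSupport;

public class PermitDemo {
    public static void main(String[] args) {
        Thread t = Thread.currentThread();
        LockSupport.unpark(t);   // grants the (single) permit in advance
        LockSupport.park();      // consumes the permit and returns immediately
        System.out.println("park() did not block because a permit was already available");
    }
}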


public class LockSupport {
private LockSupport() {} // Cannot be instantiated.

// 1、
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
//
UNSAFE.park(false, 0L);
setBlocker(t, null);
}

// 2、
private static void setBlocker(Thread t, Object arg) {
// Even though volatile, hotspot doesn't need a write barrier here.
UNSAFE.putObject(t, parkBlockerOffset, arg);
}

///
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread);
}

}

关键字

JMM 内存模型

面试官问我什么是JMM - 知乎 (zhihu.com)

  1. 原子性,简单说就是相关操作不会中途被其他线程干扰,一般通过同步机制实现。
  2. 可见性,是一个线程修改了某个共享变量,其状态能够立即被其他线程知晓,通常被解释为将 线程本地状态反映到主内存上,volatile 就是负责保证可见性的。
  3. 有序性,是保证线程内串行语义,避免指令重排等。

Synchronized

偏向锁

自旋锁

重量锁

Monitor

image-20230411222154076

Volatile

内存屏障

不能保证原子性

内存屏障

面试官问我什么是JMM - 知乎 (zhihu.com)

  • 在每个volatile写操作的前面插入一个StoreStore屏障。
  • 在每个volatile写操作的后面插入一个StoreLoad屏障。
  • 在每个volatile读操作的后面插入一个LoadLoad屏障。
  • 在每个volatile读操作的后面插入一个LoadStore屏障。

image-20231003153813913

img

img

Concurrent Collection(并发集合)

Why is LongAdder faster than AtomicLong?
LongAdder spreads the hot spot: the value is split across a Cell array, different threads hash to different Cells, and each thread only performs CAS on its own slot, so contention becomes far less likely. To read the real long value, the slot values are simply summed.

sum() returns base plus the values of all Cells.

The core idea is to spread the update pressure on AtomicLong's single value across multiple cells.
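
A hedged comparison sketch (thread count and iteration count are arbitrary assumptions):

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class CounterDemo {
    public static void main(String[] args) throws InterruptedException {
        LongAdder adder = new LongAdder();
        AtomicLong atomic = new AtomicLong();

        Runnable task = () -> {
            for (int i = 0; i < 100_000; i++) {
                adder.increment();          // CAS on this thread's Cell slot
                atomic.incrementAndGet();   // all threads CAS the same value
            }
        };
        Thread[] ts = new Thread[8];
        for (int i = 0; i < ts.length; i++) { ts[i] = new Thread(task); ts[i].start(); }
        for (Thread t : ts) t.join();

        System.out.println(adder.sum());    // base + sum of all Cells
        System.out.println(atomic.get());
    }
}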

———–《线程 - Thread》 ———–

Thread-Object的源码 和 方法

Object 和 Thread

在Java中,Object 和 Thread 类提供了一些操作和方法,用于处理对象、线程、并发以及更改线程状态。下面是它们的一些常用方法和功能的介绍:

Object类的常用操作和方法:

  1. wait(): 使当前线程进入等待状态,直到其他线程调用该对象的notify()或notifyAll()方法唤醒它。
  2. notify(): 唤醒在该对象上等待的单个线程。
  3. notifyAll(): 唤醒在该对象上等待的所有线程。
  4. synchronized: 用于实现对象的同步,确保在同一时间只有一个线程可以访问被同步的代码块或方法。

Thread类的常用操作和方法:

  1. start(): 启动线程,使其进入可运行状态,并自动调用线程的run()方法。
  2. run(): 线程的执行逻辑,需要在子类中重写。
  3. sleep(): 使当前线程暂停执行指定的时间。
  4. join(): 等待该线程终止。
  5. interrupt(): 中断线程,给线程发送中断信号。
  6. isInterrupted(): 判断线程是否被中断。
  7. yield(): 暂停当前线程的执行,让其他线程有机会执行。

并发和更改线程状态的方法:

  1. wait(), notify(), notifyAll(): 这些方法可以用于实现线程之间的协作和同步。
  2. synchronized: 用于实现线程的互斥访问,确保在同一时间只有一个线程可以访问被同步的代码块或方法。
  3. volatile: 用于保证线程之间的可见性,即一个线程对volatile变量的修改对其他线程是可见的。
  4. Lock和Condition接口:提供了更灵活的线程同步和条件等待的机制。
  5. Thread.getState(): 获取线程的状态,如NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING、TERMINATED。

这些操作和方法可以帮助我们实现线程的同步、协作和控制,以及处理并发编程中的各种情况和需求。

状态

public enum State {
// 未start的thread
NEW,
// 可运行,但是未运行 Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”
// 在jvm中运行,但是在等待资源 或 调度
RUNNABLE,

/**
没有锁
* 想要获取monitor的锁,阻塞在monitor中 -- Object的monitor锁
synchronized锁 就是通过锁对象的 Object底层monitor对象实现
* {@link Object#wait() Object.wait}.
*/
BLOCKED,

/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
等待唤醒,wait,join,park ---释放锁
*/
WAITING,

/**
* 有时间限制的等待(注意:sleep 不释放锁,带超时的 wait 会释放锁)
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING,

/**
* 线程已执行完毕(run 方法结束)
*/
TERMINATED;
}

1、初始(NEW):新创建了一个线程对象,但还没有调用start()方法。
2、运行(RUNNABLE):Java线程中将就绪(ready)和运行中(running)两种状态笼统的称为“运行”。线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法。此时处于就绪状态(ready)。就绪状态的线程在获得CPU时间片后变为运行中状态(running)。
3、阻塞(BLOCKED):表示线程阻塞于锁。
4、等待(WAITING):进入该状态的线程需要等待其他线程做出一些特定动作(通知或中断)。
5、超时等待(TIMED_WAITING):该状态不同于WAITING,它可以在指定的时间后自行返回。
6、终止(TERMINATED):表示该线程已经执行完毕

Thread

public class Thread implements Runnable {
public synchronized void start() {
/**
* This method is not invoked for the main method thread or "system"
* group threads created/set up by the VM. Any new functionality added
* to this method in the future may have to also be added to the VM.
*
* A zero status value corresponds to state "NEW".
*/
if (threadStatus != 0)
throw new IllegalThreadStateException();

/* Notify the group that this thread is about to be started
* so that it can be added to the group's list of threads
* and the group's unstarted count can be decremented. */
group.add(this);

boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}

@Override
public void run() {
if (target != null) {
target.run();
}
}

// 异常处理类
@FunctionalInterface
public interface UncaughtExceptionHandler {
/**
* Method invoked when the given thread terminates due to the
* given uncaught exception.
* <p>Any exception thrown by this method will be ignored by the
* Java Virtual Machine.
* @param t the thread
* @param e the exception
*/
void uncaughtException(Thread t, Throwable e);
}
}

ThreadLocal

ThreadLocal,一篇文章就够了 - 知乎 (zhihu.com)

public T get() {
// 1、获取当前thread 的内部属性:threadloaclmap
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
// 2、从 map 中获取,当前threadLocal 为 key 的value
// (一个threadLocal 一个 key)
// (内存泄漏,当key 被回收,value 不会被回收)
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

ThreadLocalMap

  • thread 的内部属性
static class ThreadLocalMap {

/**
* The entries in this hash map extend WeakReference, using
* its main ref field as the key (which is always a
* ThreadLocal object). Note that null keys (i.e. entry.get()
* == null) mean that the key is no longer referenced, so the
* entry can be expunged from table. Such entries are referred to
* as "stale entries" in the code that follows.
*/
// key 为弱引用
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
}

内存泄露问题

重点来了,突然我们ThreadLocal是null了,也就是要被垃圾回收器回收了,但是此时我们的ThreadLocalMap(thread 的内部属性)生命周期和Thread的一样,它不会回收,这时候就出现了一个现象。那就是ThreadLocalMap的key没了,但是value还在,这就造成了内存泄漏。

为什么key使用弱引用?

如果使用强引用,当ThreadLocal 对象的引用(强引用)被回收了,ThreadLocalMap本身依然还持有ThreadLocal的强引用,如果没有手动删除这个key ,则ThreadLocal不会被回收,所以只要当前线程不消亡,ThreadLocalMap引用的那些对象就不会被回收, 可以认为这导致Entry内存泄漏。
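
A usage sketch of the usual remedy for the leak described above: always call remove() in a finally block (my own example; the class and field names are made up).

public class ContextHolder {
    private static final ThreadLocal<String> USER = new ThreadLocal<>();

    public static void handleRequest(String userId) {
        USER.set(userId);                 // Entry(key = weak ref to USER, value = userId)
        try {
            doBusiness();
        } finally {
            USER.remove();                // clears the Entry so the value cannot leak
        }
    }

    private static void doBusiness() {
        System.out.println("current user: " + USER.get());
    }
}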

img

Runnable

Callable

Future

Future 的主要缺点如下

(1)不支持手动完成

(2)Future 的结果在非阻塞的情况下,不能执行更进一步的操作

(3)不能够支持链式调用

  • 对于 Future的执行结果,我们想继续传到下一个 Future处理使用,从而形成一个链式的调用,这在 Future 中是没法实现的。
  • 链式调用就是将这一个执行结果,继续传递给下一个继续使用,形成一条链。即职责链模式,例如Web中的Filter。

(4)不支持多个 Future 合并

  • For example, one Future computes the square of 10 and another computes the square of 100; there is no way to combine the two results directly.

(5)不支持异常处理

Future 的 API 没有任何的异常处理的 api,所以运行时,很有可能无法定位到错误。

public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning); //尝试取消此任务的执行。

boolean isCancelled();//如果此任务在正常完成之前被取消,则返回true

boolean isDone(); //如果此任务完成,则返回true 。 完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法将返回true

V get() throws InterruptedException, ExecutionException; //获得任务计算结果

V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;//可等待多少时间去获得任务计算结果
}

RunnableFuture

public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

FutureTask

public class FutureTask<V> implements RunnableFuture<V> {

private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

// 初始化
public FutureTask(Runnable runnable, V result) {
// 封装为RunnableAdapter --> implement(Callable)
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
// 等待
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
}
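
A brief usage sketch (not from the source): running a FutureTask on a plain Thread and reading its result.

import java.util.concurrent.FutureTask;

public class FutureTaskDemo {
    public static void main(String[] args) throws Exception {
        FutureTask<Integer> task = new FutureTask<>(() -> {
            Thread.sleep(200);            // simulate a computation
            return 42;
        });
        new Thread(task).start();         // FutureTask is a Runnable, so a Thread can run it
        System.out.println(task.get());   // blocks until the task reaches NORMAL state, prints 42
    }
}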

image-20230412090031552

ForkJoinTask

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
// fork
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
// pushTask 方法把当前任务存放在 ForkJoinTask 数组队列里。然后再调用ForkJoinPool 的 signalWork()方法 //唤醒或创建一个工作线程来执行任务。代码如下:
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);//执行
}
else if (n >= m)
growArray();
}
}



public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}

}

CompletionStage

public interface CompletionStage<T> {
}

CompletableFuture

JUC系列(十一) | Java 8 CompletableFuture 异步编程 - 腾讯云开发者社区-腾讯云 (tencent.com)

概述

CompletableFuture是Java 8引入的一个用于处理异步编程的工具类。它提供了一种简洁而强大的方式来处理异步操作和并发任务。

CompletableFuture可以看作是一个可编程的Future,它可以用于表示一个异步计算的结果。与传统的Future相比,CompletableFuture提供了更多的操作和组合方式,使得异步编程更加灵活和易于管理。

以下是我对CompletableFuture的理解:

  1. 异步执行:CompletableFuture可以用于执行异步操作,它可以在后台线程中执行任务,而不会阻塞主线程。我们可以使用CompletableFuture.runAsync()或CompletableFuture.supplyAsync()方法来创建一个CompletableFuture,并指定要执行的任务。
  2. 链式操作:CompletableFuture提供了一系列方法来对异步操作进行链式组合。我们可以使用thenApply()、thenAccept()、thenCompose()等方法来定义在异步操作完成后要执行的操作。这些方法返回一个新的CompletableFuture,可以继续进行后续的操作。
  3. 异常处理:CompletableFuture提供了异常处理的机制。我们可以使用exceptionally()、handle()等方法来处理异步操作中的异常情况,并返回一个默认值或执行其他操作。
  4. 组合操作:CompletableFuture支持多个CompletableFuture的组合操作。我们可以使用thenCombine()、thenAcceptBoth()、allOf()、anyOf()等方法来组合多个CompletableFuture的结果。
  5. 等待结果:CompletableFuture提供了一些方法来等待异步操作的结果。我们可以使用get()方法来阻塞当前线程,直到异步操作完成并返回结果。也可以使用join()方法等待完成,join()不会抛出受检异常(异常会被包装成CompletionException)。

CompletableFuture的设计使得异步编程更加简洁和灵活。它可以帮助我们处理并发任务、异步IO、事件驱动编程等场景,提高程序的性能和响应能力。


  1. 异步任务结束时,会自动回调某个对象的方法;
  2. 异步任务出错时,会自动回调某个对象的方法;
  3. 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行;

Future的增强版

//runAsync 返回void 函数第二个参数表示是用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
//supplyAsync 异步返回一个结果 函数第二个参数表示是用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()
//Supplier 是一个函数式接口,代表是一个生成者的意思
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
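
A chaining sketch covering the points above (supplyAsync, thenApply, exceptionally); passing an Executor is optional and otherwise ForkJoinPool.commonPool() is used:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CfDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);   // own pool instead of commonPool

        CompletableFuture<Integer> cf = CompletableFuture
                .supplyAsync(() -> 10, pool)          // async producer
                .thenApply(x -> x * x)                // chained transformation: 100
                .exceptionally(ex -> -1);             // fallback value on error

        System.out.println(cf.join());                // 100
        pool.shutdown();
    }
}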

image-20230412090753813

image-20230412091903787

线程中断

public boolean isInterrupted() {
// does NOT clear the interrupt status -- can be read repeatedly
return isInterrupted(false);
}

public static boolean interrupted() {
// clears the interrupt status -- a second call returns false
return currentThread().isInterrupted(true);
}
// both call the same native method, only the ClearInterrupted argument differs
private native boolean isInterrupted(boolean ClearInterrupted);
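
A short sketch of the usual cooperative-interruption pattern (my own example):

public class InterruptDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {   // status is not cleared here
                // do a slice of work
            }
            System.out.println("interrupt flag seen, exiting loop");
        });
        worker.start();
        Thread.sleep(100);
        worker.interrupt();     // sets the flag; the loop observes it and stops
        worker.join();
    }
}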

————《线程池》 —————

Executor

public interface Executor {

/**
Runs the Runnable with no return value --- submit(), by contrast, returns a Future from which the result can be obtained
*/
void execute(Runnable command);
}

image-20230419230000631

ExecutorService

public interface ExecutorService extends Executor {

/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate
* threads that the caller is not permitted to modify
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")},
* or the security manager's {@code checkAccess} method
* denies access.
*/
void shutdown();

/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. For example, typical
* implementations will cancel via {@link Thread#interrupt}, so any
* task that fails to respond to interrupts may never terminate.
*
* @return list of tasks that never commenced execution
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate
* threads that the caller is not permitted to modify
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")},
* or the security manager's {@code checkAccess} method
* denies access.
*/
List<Runnable> shutdownNow();

/**
* Returns {@code true} if this executor has been shut down.
*/
boolean isShutdown();

/**
* @return {@code true} if all tasks have completed following shut down
返回是否为 Terminated状态(在shutdown的时候,所有的线程都已完成)
除非 首先shutdown/shutdownnow ---(没有一个线程提交,就关闭),否则返回的一直为false
*/
boolean isTerminated();

/**
* Blocks until all tasks have completed execution after a shutdown
* request, or the timeout occurs, or the current thread is
* interrupted, whichever happens first.
在shutdown后,有限等待,所有的线程完成
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

/**
* <p>
* If you would like to immediately block waiting
* for a task, you can use constructions of the form
* {@code result = exec.submit(aCallable).get();}
*
* <p>Note: The {@link Executors} class includes a set of methods
* that can convert some other common closure-like objects,
* for example, {@link java.security.PrivilegedAction} to
* {@link Callable} form so they can be submitted.

*/
<T> Future<T> submit(Callable<T> task);

/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return the given result upon successful completion.
返回future能够获取结果
*/
<T> Future<T> submit(Runnable task, T result);

/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<?> submit(Runnable task);

/**
* Executes the given tasks, returning a list of Futures holding
* their status and results when all complete.
* {@link Future#isDone} is {@code true} for each
* element of the returned list.
* Note that a <em>completed</em> task could have
* terminated either normally or by throwing an exception.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.

*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

/**
* Executes the given tasks, returning a list of Futures holding
* their status and results
* when all complete or the timeout expires, whichever happens first.
* {@link Future#isDone} is {@code true} for each
* element of the returned list.
* Upon return, tasks that have not completed are cancelled.
* Note that a <em>completed</em> task could have
* terminated either normally or by throwing an exception.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.

*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

/**
* Executes the given tasks, returning the result
* of one that has completed successfully (i.e., without throwing
* an exception), if any do. Upon normal or exceptional return,
* tasks that have not completed are cancelled.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
只要有一个完成,就不会抛异常
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

/**
* Executes the given tasks, returning the result
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService

上面两个接口方法的默认实现

public abstract class AbstractExecutorService implements ExecutorService {
// 提交任务的三种方式
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}

ThreadPoolExecutor

线程池的一种

public class ThreadPoolExecutor extends AbstractExecutorService {
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 高三位 存放5个 线程池状态
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;


public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
}

Worker

每次执行任务前都要检查线程池的状态,如果已处于 STOP 或 TERMINATED 状态,则放弃处理。

// runWorker() 
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}


// getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//队列为空的时候会阻塞线程
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

核心线程阻塞

对于Java线程池中的`getTask()`方法,确实并没有直接阻塞核心线程。核心线程的挂起是通过`workQueue.take()`方法实现的。

在`getTask()`方法中,当没有任务可执行时,会调用`workQueue.take()`方法从任务队列中获取任务。这个方法会阻塞当前线程,直到有新的任务到达队列。在任务队列为空时,核心线程会被挂起,不会消耗CPU资源。

`workQueue.take()`方法是一个阻塞方法,它会一直等待,直到有新的任务到达队列。当有新的任务到达时,它会唤醒被阻塞的线程,使其继续执行。

通过这种方式,核心线程在没有任务可执行时会被挂起,不会占用CPU资源。只有当有新的任务到达时,核心线程才会被唤醒并继续执行任务。


状态

/* The runState provides the main lifecycle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
可以接收新任务,处理消息队列任务
* SHUTDOWN: Don't accept new tasks, but process queued tasks
不可以接收新任务,处理消息队列任务
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
不可以接收新任务,不处理队列中的任务,并中断正在执行的任务
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
所有任务已终止,workerCount 为 0,线程池过渡到 TIDYING 状态,并执行钩子方法terminated()
* TERMINATED: terminated() has completed
钩子方法terminated()执行完毕
*/
public void shutdown(); // 优雅关闭, SHUTDOWN: 不再接收新任务,但会处理完队列中的任务
public List<Runnable> shutdownNow(); // STOP: 不接收新任务,不处理队列任务,并中断正在执行的任务

参数

corePoolSize:核心线程数。默认情况下,线程池创建后,每当有新任务到来,如果当前线程数小于核心线程数,就会新建一个线程来执行(即使有空闲线程也不复用);当线程数达到核心线程数后,再有任务进来就放入任务缓存队列;当缓存队列也满了,才会继续创建线程,直到达到最大线程数;达到最大线程数之后再有任务到来,就执行拒绝策略。

maximumPoolSize:线程池中最多可以创建的线程数。

keepAliveTime:线程空闲多久后被终止。默认情况下,只有当线程池中的线程数大于 corePoolSize 时才起作用,直到线程数不大于 corePoolSize。

workQueue:阻塞队列,用来存放等待执行的任务。

rejectedExecutionHandler:任务拒绝处理器(需要注意),共有四种。
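
A construction sketch tying the parameters together (the sizes, the bounded queue and the rejection policy are illustrative assumptions, not recommendations from the source):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                   // corePoolSize
                4,                                   // maximumPoolSize
                60L, TimeUnit.SECONDS,               // keepAliveTime for threads above core size
                new ArrayBlockingQueue<>(100),       // bounded workQueue
                new ThreadPoolExecutor.CallerRunsPolicy());  // rejection policy

        pool.execute(() -> System.out.println("task running in " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}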

Worker

继承自AQS保证线程独占执行

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 轮询调用task
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

BlockQueue / 线程池类型

用于将多余的任务(runnable)暂存

队列

线程池名称 使用阻塞队列 特点
newFixedThreadPool LinkedBlockingQueue() 1、核心线程数和最大线程数相同 2、由于keepAliveTime设置为0,当线程创建后会一直存在3、由于用的是无界队列所以可能会导致OOM
newSingleThreadExecutor LinkedBlockingQueue() 1、核心线程数和最大线程数都为1单线程 2、无界队列可能导致OOM
newCachedThreadPool SynchronousQueue() 1、核心线程数为0,最大线程数为Integer.MAX_VALUE 2、当没任务时线程存活时间为60秒3、使用的是0大小的队列,所以不存储任务,只做任务转发
newScheduledThreadPool new DelayedWorkQueue() 1、执行周期任务 2、无界队列,可能会导致OOM

image-20230412084310877

拒绝策略

主要有4种拒绝策略:

AbortPolicy:直接丢弃任务,抛出异常,这是默认策略
CallerRunsPolicy:只用调用者所在的线程来处理任务
DiscardOldestPolicy:丢弃等待队列中最旧的任务,并执行当前任务
DiscardPolicy:直接丢弃任务,也不抛出异常
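
If none of the built-in policies fit, a custom RejectedExecutionHandler can be supplied; a minimal sketch (a hypothetical log-and-drop handler):

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

// hypothetical example: log and drop the task instead of throwing
public class LogAndDiscardPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.err.println("task rejected, pool size = " + executor.getPoolSize());
    }
}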

ForkJoinPool

工作原理

ForkJoinPool 的工作原理是,每次提交一个 ForkJoinTask 到 ForkJoinPool 中时,ForkJoinPool 将该任务拆分成更小的子任务,并将这些子任务提交到其内部的工作队列中。如果 ForkJoinPool 中的某个线程正在等待任务,那么该线程将获取工作队列中的任务并执行它。如果没有等待线程,则 ForkJoinPool 将创建新的工作线程来执行任务。

在任务很多且任务执行时间很短的情况下,线程之间可能会发生竞争问题,例如争夺同步锁、竞争资源等。这种情况下,使用传统的线程池可能会导致线程之间频繁地切换,造成额外的开销,影响性能。而 ForkJoinPool 采用了任务窃取算法,每个线程都有自己的任务队列,线程在处理完自己队列中的任务后,可以从其他线程的任务队列中窃取任务来执行,从该线程队列的末尾开始窃取,以保持任务的顺序性,而不会影响原线程处理任务的顺序。这种方式可以充分利用空闲线程的资源,减少线程之间的竞争,从而提高效率,同时也可以保证任务的正确性。
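
A classic fork/join sketch (summing an array with RecursiveTask; the threshold value is an arbitrary assumption):

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;   // assumed cut-off for splitting
    private final long[] data;
    private final int from, to;

    public SumTask(long[] data, int from, int to) {
        this.data = data; this.from = from; this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        int mid = (from + to) / 2;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        left.fork();                               // push onto this worker's deque
        return right.compute() + left.join();      // idle workers may steal 'left'
    }

    public static void main(String[] args) {
        long[] data = new long[1_000_000];
        java.util.Arrays.fill(data, 1L);
        Long total = ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
        System.out.println(total);                 // 1000000
    }
}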

@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {

}

Executors ## DelegatedExecutorService

代理调用ExecutorService的方法

/**
* A wrapper class that exposes only the ExecutorService methods
* of an ExecutorService implementation.
*/
static class DelegatedExecutorService extends AbstractExecutorService {
// 重点类,代理调用
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}
}

ScheduledExecutorService

定时任务线程池

public interface ScheduledExecutorService extends ExecutorService {

/**
* Creates and executes a one-shot action that becomes enabled
* after the given delay.
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
/**
* Creates and executes a ScheduledFuture that becomes enabled after the
* given delay
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
/**
* subsequently with the given
* period; that is executions will commence after
* {@code initialDelay} then {@code initialDelay+period}, then
* {@code initialDelay + 2 * period}, and so on.
固定周期执行
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
/**
间隔 执行
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);

/* scheduleAtFixedRate:以上一个任务开始的时间计时,period 时间过去后,检测上一个任务是否执行完毕,如果执行完毕则当前任务立即执行,否则等上一个任务执行完毕后立即执行。
scheduleWithFixedDelay:以上一个任务结束的时间计时,delay 时间过去后,立即执行。*/
}
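
A usage sketch of the two periodic modes explained in the comment above (the delays are illustrative):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduleDemo {
    public static void main(String[] args) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);

        // fixed rate: the next run is measured from the *start* of the previous run
        ses.scheduleAtFixedRate(() -> System.out.println("rate tick"), 0, 1, TimeUnit.SECONDS);

        // fixed delay: the next run starts 1s after the previous run *finished*
        ses.scheduleWithFixedDelay(() -> System.out.println("delay tick"), 0, 1, TimeUnit.SECONDS);
    }
}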

AbstractScheduledEventExecutor

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
new Comparator<ScheduledFutureTask<?>>() {
@Override
public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
return o1.compareTo(o2);
}
};

private static final long START_TIME = System.nanoTime();

static final Runnable WAKEUP_TASK = new Runnable() {
@Override
public void run() { } // Do nothing
};

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

long nextTaskId;

private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduleFromEventLoop(task);
} else {
final long deadlineNanos = task.deadlineNanos();
// task will add itself to scheduled task queue when run if not expired
if (beforeScheduledTaskSubmitted(deadlineNanos)) {
execute(task);
} else {
lazyExecute(task);
// Second hook after scheduling to facilitate race-avoidance
if (afterScheduledTaskSubmitted(deadlineNanos)) {
execute(WAKEUP_TASK);
}
}
}

return task;
}
}

image-20230419230034673

image-20230328150530763

image-20230328150613875

AbstractEventExecutor — EventExecutorGroup — Netty

EventExecutorGroup

EventExecutor

AbstractEventExecutor

三者都是 Netty 自己实现的工作线程抽象,用于 IO 任务的调度

image-20230723152419543

public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class);

static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;

private final EventExecutorGroup parent;
private final Collection<EventExecutor> selfCollection = Collections.<EventExecutor>singleton(this);

Executors – 线程池创建

public class Executors {	
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// ForkJoinPool
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool
(parallelism,
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
//ThreadPoolExecutor
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}


public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}


public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
//DelegatedScheduledExecutorService
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
//ScheduledThreadPoolExecutor
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

image-20230723151844691

ThreadFactory

线程创建工厂

public interface ThreadFactory {

/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}

DefaultThreadFactory

/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

PrivilegedThreadFactory

/**
* Thread factory capturing access control context and class loader
*/
static class PrivilegedThreadFactory extends DefaultThreadFactory {
private final AccessControlContext acc;
private final ClassLoader ccl;

PrivilegedThreadFactory() {
super();
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
// Calls to getContextClassLoader from this class
// never trigger a security check, but we check
// whether our callers have this permission anyways.
sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);

// Fail fast
sm.checkPermission(new RuntimePermission("setContextClassLoader"));
}
this.acc = AccessController.getContext();
this.ccl = Thread.currentThread().getContextClassLoader();
}

public Thread newThread(final Runnable r) {
return super.newThread(new Runnable() {
public void run() {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
Thread.currentThread().setContextClassLoader(ccl);
r.run();
return null;
}
}, acc);
}
});
}
}

《其他笔记》

《Java并发编程实战》学习笔记

什么是并发编程

跳出来看全局就是3部分:分工、协作、互斥
钻进去看本质:技术的本质就是背后的理论模型

在这里插入图片描述

并发理论基础

并发编程产生bug的原因

  • 可见性:cpu缓存导致共享变量在多核cpu的情况下,不可见,所以计算会出错
  • 原子性:高级语言中的一个操作可能是操作系统中多个操作,如count += 1,其实对应操作系统3步操作「1、count从内存加载到cpu寄存器;2、寄存器执行+1操作;3、将结果写回内存(缓存机制可能回写回cpu缓存而不是内存)」
  • 有序性:编译优化后,代码执行顺序可能发生变化
  • 在这里插入图片描述

Java内存模型,如何解决可见性和有序性问题

方案:按需禁用缓存以及编译优化
方法:volatile,synchronzied,final关键字,六项Happens-Before规则

  • volatile:告诉编译器禁用cpu缓存,必须从内存中读写变量(解决可见性)
  • final: 告诉编译器这个变量不会变,随便进行编译优化(解决编译优化顺序性和原子性)
  • 六项Happens-Before规则:前面一个操作的结果对后续是可见的。(解决可见性问题和编译优化顺序性)

原子性问题

源头:线程切换
解决方案:锁,互斥锁,一把锁可以保护多个资源
加锁本质就是在锁对象的对象头中写入当前线程id
用不同的锁对受保护资源进行精细化管理,能够提升性能。这种锁还有个名字,叫细粒度锁,会有死锁问题
锁的资源:无关联资源可用各自对象,有关联资源可用class对象,class对象是Java虚拟机加载类时创建的,具有唯一性

如何避免死锁

产生的条件:互斥、占有资源A等待资源B且不释放资源A、其他资源不能强行抢占资源、循环等待
解决办法:破坏其中一个条件
破坏占有且等待条件:由一个单例类来分配资源
破坏不可抢占条件:主动释放资源可用Lock,synchroized做不到
破坏循环等待条件:资源进行排序,等待-通知机制

活锁

原因:线程永久阻塞,类似两个人过同一个门,都走左边,然后互相谦让又都走右边,一直循环下去
方案:添加一个随机的等待时间

线程饥饿

原因:优先级低的线程获取锁的机会小,持有锁的线程执行时间长
方案:公平锁,线程等待有先来后到,获取锁的机会平等

性能问题

阿姆达尔定律:

S = 1 / ((1 − p) + p / n)

公式里的 n 可以理解为 CPU 的核数,p 可以理解为并行百分比,那(1-p)就是串行百分比了,也就是我们假设的 5%。我们再假设 CPU 的核数(也就是 n)无穷大,那加速比 S 的极限就是 20。

方案:

  1. 无锁算法和数据结构,例如线程本地存储 (Thread Local Storage, TLS)、写入时复制 (Copy-on-write)、乐观锁等;Java 并发包里面的原子类也是一种无锁的数据结构;Disruptor 则是一个无锁的内存队列,性能都非常好……
  2. 减少锁持有时间,增加并行度,如:细粒度锁,分段锁,读写锁

管程

解决互斥问题思路:将共享变量及其对共享变量的操作封装起来。

在这里插入图片描述

解决同步问题思路:条件变量等待队列

在这里插入图片描述

线程生命周期

在这里插入图片描述

可通过jstack或Java VisualVm可视化工具查看线程栈信息

创建多少线程合适

I/O密集型:最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]
CPU密集型:线程的数量 =CPU 核数

并发工具类

Lock和Condition

Lock和synchronized的区别:
sync申请资源如果申请不到,线程直接阻塞,无法释放
Lock能够响应中断、支持超时、非阻塞的获取锁

// 支持中断的API
void lockInterruptibly() throws InterruptedException;
// 支持超时的API
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 支持非阻塞获取锁的API
boolean tryLock();

Lock如何保证可见性:内部维护了用volatile的state字段
可重入锁:线程可以重复获取同一把锁
公平锁和非公平锁:ReentrantLock 这个类有两个构造函数,一个是无参构造函数,一个是传入 fair 参数的构造函数。fair 参数代表的是锁的公平策略,如果传入 true 就表示需要构造一个公平锁,反之则表示要构造一个非公平锁。公平锁从等待队列里唤醒等待时间最长的线程,非公平锁则不保证。
用锁最佳实践

  1. 永远只在更新对象的成员变量时加锁
  2. 永远只在访问可变的成员变量时加锁
  3. 永远不在调用其他对象的方法时加锁

如何快速实现一个限流器

方案:Semaphore 信号量,像生活中的红绿灯
关键方法:acquire(),release(),new Semaphore(size)

如何快速实现一个完备的缓存

方案:读写锁 ReadWriteLock
什么是读写锁?1、允许多个线程同时读共享变量;2、只允许一个线程写共享变量;3、执行写操作时禁止读。
按需加载:先加读锁,读取,释放读锁,加写锁,写值,释放写锁。
注意:不允许锁升级(读锁变为写锁,导致写锁永久等待,线程阻塞),允许降级(写锁变为读锁)
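
A per-need-loading cache sketch following the steps above — read lock, miss, release, then write lock with a re-check (my own example):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

public class SimpleCache<K, V> {
    private final Map<K, V> map = new HashMap<>();
    private final ReadWriteLock rwl = new ReentrantReadWriteLock();

    public V get(K key, Function<K, V> loader) {
        rwl.readLock().lock();
        try {
            V v = map.get(key);
            if (v != null) return v;
        } finally {
            rwl.readLock().unlock();     // cannot upgrade: release before taking the write lock
        }
        rwl.writeLock().lock();
        try {
            // re-check: another thread may have loaded the value while we waited
            return map.computeIfAbsent(key, loader);
        } finally {
            rwl.writeLock().unlock();
        }
    }
}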

比读写锁更快的锁

StampedLock:写锁、悲观读锁、乐观读(性能好的原因,无锁算法CAS,不是所有写操作都被阻塞)
注意:使用时一定不要调用中断操作,只有使用悲观读锁和写锁时可以中断
StampedLock读模版

final StampedLock sl = 
new StampedLock();

// 乐观读
long stamp =
sl.tryOptimisticRead();
// 读入方法局部变量
......
// 校验stamp
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
// 读入方法局部变量
.....
} finally {
//释放悲观读锁
sl.unlockRead(stamp);
}
}
//使用方法局部变量执行业务操作
......

StampedLock写模版

long stamp = sl.writeLock();
try {
// 写共享变量
......
} finally {
sl.unlockWrite(stamp);
}

如何让线程步调一致

CountDownLatch:设置要同步的线程数量,线程执行完后可调用latch.countDown()来给计数器减一
CyclicBarrier:可以传入回调函数,线程都执行完后可通知,计数器清零后可重置,可循环利用
CompletableFuture:默认使用ForkJoinPool线程池,建议根据不同业务创建线程池,避免阻塞
CompletionStage接口:40个方法,AND聚合关系,OR聚合关系

并发容器

在这里插入图片描述

在这里插入图片描述

原子类

原理:CAS(比较并交换,3个变量,共享变量A,比较值B,新值C)

在这里插入图片描述

如何创建正确的线程池?

线程池是生产者-消费者模型
注意事项:尽量使用有界队列,默认拒绝策略慎用,若处理任务非常重要建议自定义拒绝策略。
创建线程池参数:

在这里插入图片描述

CompletionService 批量执行异步任务

将Executor和BlockingQueue的功能融合,让异步任务的执行结果有序化。快速实现Forking Cluster这样的需求。
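
A sketch of taking results in completion order with ExecutorCompletionService (my own example):

import java.util.concurrent.*;

public class CompletionServiceDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CompletionService<Integer> cs = new ExecutorCompletionService<>(pool);

        for (int i = 1; i <= 3; i++) {
            final int n = i;
            cs.submit(() -> { Thread.sleep(100L * n); return n * n; });
        }
        for (int i = 0; i < 3; i++) {
            System.out.println(cs.take().get());   // results arrive in completion order
        }
        pool.shutdown();
    }
}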

Fork/Join

该计算框架核心组件是ForkJoinPool,支持任务窃取,让所有线程工作量基本均衡。Java8的Stream API 的并行流也是以此为基础。需要注意,所有程序共享一个线程池,所以建议使用不同的ForkJoinPool执行不同类型的计算任务。

