AQS

概述

AQS,全称Abstract Queued Synchronizer,抽象队列同步器,这个类在 java.util.concurrent.locks 包下面。Java并发包下面很多API都是基于AQS来实现加锁解锁等功能的,AQSJava并发包的基础类。AQS为构建锁和同步器提供了一些通用功能的实现,因此,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 ReentrantLockSemaphore,其他的诸如 ReentrantReadWriteLockSynchronousQueue等等皆是基于 AQS 的。

AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制

1
2
3
4
5
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable{

}

子类主要实现这样一些方法(默认抛出UnsupportedOperationException

1
2
3
4
5
6
7
8
9
10
//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryAcquire(int)
//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int)
//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int)
//共享方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryReleaseShared(int)
//该线程是否正在独占资源。只有用到condition才需要去实现它。
protected boolean isHeldExclusively()

关键变量

AQS内部有一个核心变量叫state,是int类型,代表同步状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 共享变量,使用volatile修饰保证线程可见性
private volatile int state;

//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

State含义千变万化

它的含义不是一成不变的,而是根据具体实现类的不同而表示不同的含义,例如:

  • Semaphore里,state表示许可证的数量,如果他的值为10,就代表有10张许可证,某个线程拿走一个许可证后,state=9
  • CountDownLatch里,state表示“需要倒数”的数量,一开始我们假设它设置为5,当每次调用CountDown方法时,state就会减1,一直减到0的时候就代表这个门闩被打开
  • ReentrantLock里,state表示的是锁的占有情况。最开始是0,表示没有任何线程占有锁;如果state变成1,则就代表这个锁已经被某个线程所持有了

ReentrantLock是可重入的,同一个线程可以再次拥有这把锁就叫重入。如果这个锁被同一个线程多次获取,那么state就会逐渐的往上加,state的值表示重入的次数,释放的时候会递减。所以,state=0表示锁不被任何线程所占有,代表这个锁当前释放状态的,其他线程此时就可以来尝试获取了。

如何安全修改State的值

因为state是会被多个线程共享的,会被并发地修改,所以所有去修改state的方法都必须要保证state是线程安全的。可是state本身它仅仅是被volatile本身并不足以保证线程安全,在AQS内使用compareAndSetStatesetState来完成修改

1
2
3
protected final boolean compareAndSetState(int except,int update){
return unsafe.compareAndSwapInt(this,stateOffset,except,update);
}

其利用了Unsafe里面的CAS操作,利用CPU指令的原子性保证了这个操作的原子性。同时对于volatile变量的直接更新,可以做到无锁操作

FIFO队列

FIFO队列,即先进先出队列,这个队列最主要的作用是存储等待的线程。假设很多线程都要同时抢锁,那么大部分的线程是抢不到的,那怎么处理这些抢不到锁的线程呢?需要有一个队列来存放、管理它们。所以AQS的一大功能就是充当线程的“排队管理器”

队列内部为双向链表,在队列中,分别用headtail来表示头节点和尾节点,两者在初始化的时候指向了一个空节点。头节点可以理解为“当前持有锁的线程”,而在头节点之后的线程就会被阻塞了,它们等待被唤醒,唤醒也是AQS负责操作的

注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才成为阻塞队列

获取方法

比如ReentrantLock中的lock方法就是其中一个“获取方法”,在执行时,如果发现state!=0且当前线程不是持有锁的的线程,那么就代表这个锁已经被其他线程所持有。这个是由就获取不了锁,于是就让该线程进入阻塞状态

再比如,Semaphore中的acquire方法就是其中一个获取方法,作用是获取许可证,此时能不能获取到这个许可证也取决于state的值。如果state值是正数,那么代表还有剩余的许可证,数量足够的话就可以成功获取;

再比如,CountDownLatch获取方法就是await方法,作用就是“等待,直到倒数结束”。执行await的时候会判断state的值,如果state!=0,线程就陷入阻塞状态,直到其他线程执行倒数方法把state减为0,此时就代表这个门闩放开了,所以之前阻塞的线程就会被唤醒

获取方法在不同的类中代表不同的含义,但往往和state值相关,也经常会让线程进入阻塞状态,这个也同样证明了state状态在AQS类中的重要地位

释放方法

释放方法是站在获取方法的对立面的,通常和刚才的获取方法配合使用。我们刚才讲的获取方法可能会让线程阻塞,比如说获取不到所锁就会让线程进入阻塞状态,但是释放方法通常是不会阻塞线程的

比如在Semaphore信号量里面,释放锁是release方法,release()方法的作用就是去释放一个许可证,会让state+1

再比如,在CountDownLatch里面,释放就是countDown方法,作用是倒数一个数,让state-1.所以也可以看出,在不同的实现类里面,对于state的操作是截然不同的,需要由每一个写作类根据自己的逻辑去具体实现

常见同步工具类

Semaphore

Semaphore有两种模式

  • 公平模式:调用aquire()方法的顺序就是获取许可证的顺序,遵循FIFO
  • 非公平模式:抢占式的

构造方法:

1
2
3
4
5
6
7
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

原理

  • Semaphore 是共享锁的一种实现,它默认构造 AQSstate 值为 permits,你可以将 permits 的值理解为许可证的数量,只有拿到许可证的线程才能执行。

  • 如果可以获取成功的话(state > 0 ),会尝试使用 CAS 操作去修改 state 的值 state=state-1。如果获取失败则会创建一个 Node 节点加入等待队列,挂起当前线程。

  • 以无参 release 方法为例,调用semaphore.release(); ,线程尝试释放许可证,并使用 CAS 操作去修改 state 的值 state=state+1。释放许可证成功之后,同时会唤醒等待队列中的一个线程。被唤醒的线程会重新尝试去修改 state 的值 state=state-1 ,如果 state > 0 则获取令牌成功,否则重新进入等待队列,挂起线程。

案例

线程1和2交替打印abc

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class testSemaphore {
public static void main(String[] args) {
Semaphore semaphoreA = new Semaphore(1);
Semaphore semaphoreB=new Semaphore(0);
ExecutorService service = Executors.newFixedThreadPool(2);
service.submit(()->{
for (int i=0;i<3;i++){
try {
semaphoreA.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("线程1:"+(char)('a'+i));
semaphoreB.release();

}
});
service.submit(()->{
for (int i=0;i<3;i++){
try {
semaphoreB.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("线程2:"+(char)('a'+i));
semaphoreA.release();
}
});
service.shutdown();
}

/* 输出
线程1:a
线程2:a
线程1:b
线程2:b
线程1:c
线程2:c
*/

CountDownLatch

  • CountDownLatch 允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。

  • CountDownLatch 是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当 CountDownLatch 使用完毕后,它不能再次被使用。

CycliBarrier

  • CyclicBarrierCountDownLatch 非常类似,它也可以实现线程间的技术等待,但是它的功能比 CountDownLatch 更加复杂和强大。主要应用场景和 CountDownLatch 类似。
  • 可以再次被使用