`

深入浅出Java并发包—锁机制(转)

阅读更多

前面我们看到了Locksynchronized都能正常的保证数据的一致性(上文例子中执行的结果都是20000000),也看到了Lock的优势,那究竟他们是什么原理来保障的呢?今天我们就来探讨下Java中的锁机制!

Synchronized是基于JVM来保证数据同步的,而Lock则是在硬件层面,依赖特殊的CPU指令实现数据同步的,那究竟是如何来实现的呢?我们一一看来!

一、synchronized的实现方案

synchronized比较简单,语义也比较明确,尽管Lock推出后性能有较大提升,但是基于其使用简单,语义清晰明了,使用还是比较广泛的,其应用层的含义是把任意一个非NULL的对象当作锁。当synchronized作用于方法时,锁住的是对象的实例(this),当作用于静态方法时,锁住的是Class实例,又因为Class的相关数据存储在永久带,因此静态方法锁相当于类的一个全局锁,当synchronized作用于一个对象实例时,锁住的是对应的代码块。在SunHotSpot JVM实现中,其实synchronized锁还有一个名字:对象监视器。

当多个线程一起访问某个对象监视器的时候,对象监视器会将这些请求存储在不同的容器中。

1、  Contention List:竞争队列,所有请求锁的线程首先被放在这个竞争队列中

2、  Entry ListContention List中那些有资格成为候选资源的线程被移动到Entry List

3、  Wait Set:哪些调用wait方法被阻塞的线程被放置在这里

4、  OnDeck:任意时刻,最多只有一个线程正在竞争锁资源,该线程被成为OnDeck

5、  Owner:当前已经获取到所资源的线程被称为Owner

6、  !Owner:当前释放锁的线程

下图展示了他们之前的关系

 

深入浅出Java并发包—锁机制(一) - 一线天色 天宇星辰 - 一线天色 天宇星辰

 ContentionList

并不是真正意义上的一个队列。仅仅是一个虚拟队列,它只有Node以及对应的Next指针构成,并没有Queue的数据结构。每次新加入Node会在队头进行,通过CAS改变第一个节点为新增节点,同时新增阶段的next指向后续节点,而取数据都在队列尾部进行。

 

深入浅出Java并发包—锁机制(一) - 一线天色 天宇星辰 - 一线天色 天宇星辰

 JVM

每次从队列的尾部取出一个数据用于锁竞争候选者(OnDeck),但是并发情况下,ContentionList会被大量的并发线程进行CAS访问,为了降低对尾部元素的竞争,JVM会将一部分线程移动到EntryList中作为候选竞争线程。Owner线程会在unlock时,将ContentionList中的部分线程迁移到EntryList中,并指定EntryList中的某个线程为OnDeck线程(一般是最先进去的那个线程)。Owner线程并不直接把锁传递给OnDeck线程,而是把锁竞争的权利交个OnDeckOnDeck需要重新竞争锁。这样虽然牺牲了一些公平性,但是能极大的提升系统的吞吐量,在JVM中,也把这种选择行为称之为“竞争切换”。

OnDeck线程获取到锁资源后会变为Owner线程,而没有得到锁资源的仍然停留在EntryList中。如果Owner线程被wait方法阻塞,则转移到WaitSet队列中,直到某个时刻通过notify或者notifyAll唤醒,会重新进去EntryList中。

处于ContentionListEntryListWaitSet中的线程都处于阻塞状态,该阻塞是由操作系统来完成的(Linux内核下采用pthread_mutex_lock内核函数实现的)。该线程被阻塞后则进入内核调度状态,会导致系统在用户和内核之间进行来回切换,严重影响锁的性能。为了缓解上述性能问题,JVM引入了自旋锁。原理非常简单,如果Owner线程能在很短时间内释放锁资源,那么哪些等待竞争锁的线程可以稍微等一等(自旋)而不是立即阻塞,当Owner线程释放锁后可立即获取锁,进而避免用户线程和内核的切换。但是Owner可能执行的时间会超过设定的阈值,争用线程在一定时间内还是获取不到锁,这是争用线程会停止自旋进入阻塞状态。基本思路就是先自旋等待一段时间看能否成功获取,如果不成功再执行阻塞,尽可能的减少阻塞的可能性,这对于占用锁时间比较短的代码块来说性能能大幅度的提升!

但是有个头大的问题,何为自旋?其实就是执行几个空方法,稍微等一等,也许是一段时间的循环,也许是几行空的汇编指令,其目的是为了占着CPU的资源不释放,等到获取到锁立即进行处理。但是如何去选择自旋的执行时间呢?如果自旋执行时间太长,会有大量的线程处于自旋状态占用CPU资源,进而会影响整体系统的性能。因此自旋的周期选的额外重要!

JVM对于自旋周期的选择,基本认为一个线程上下文切换的时间是最佳的一个时间,同时JVM还针对当前CPU的负荷情况做了较多的优化

1、  如果平均负载小于CPUs则一直自旋

2、  如果有超过(CPUs/2)个线程正在自旋,则后来线程直接阻塞

3、  如果正在自旋的线程发现Owner发生了变化则延迟自旋时间(自旋计数)或进入阻塞

4、  如果CPU处于节电模式则停止自旋

5、  自旋时间的最坏情况是CPU的存储延迟(CPU A存储了一个数据,到CPU B得知这个数据直接的时间差)

6、  自旋时会适当放弃线程优先级之间的差异

Synchronized在线程进入ContentionList时,等待的线程就通过自旋先获取锁,如果获取不到就进入ContentionList,这明显对于已经进入队列的线程是不公平的,还有一个不公平的事情就是自旋获取锁的线程还可能直接抢占OnDeck线程的锁资源。

JVM6以后还引入了一种偏向锁,主要用于解决无竞争下面锁的性能问题。我们首先来看没有这个会有什么样子的问题。

现在基本上所有的锁都是可重入的,即已经获取锁的线程可以多次锁定/解锁监视对象,但是按照之前JVM的设计,每次加锁解锁都采用CAS操作,而CAS会引发本地延迟(下面会讲原因),因此偏向锁希望线程一旦获取到监视对象后,之后让监视对象偏向这个锁,进而避免多次CAS操作,说白了就是设置了一个变量,发现是这个线程过来的就避免再走加锁解锁流程。

CAS为什么会引发本地延迟呢?这要从多核处(SMP)理架构说起(前面有提到过--JVM内存模型),下图基本上表明了多核处理的架构

 

深入浅出Java并发包—锁机制(一) - 一线天色 天宇星辰 - 一线天色 天宇星辰

 多核

CPU会共享一条系统总线,靠总线和主存通讯,但是每个CPU又有自己的一级缓存,而CAS是一条原子指令,其作用是让CPU比较,如果相同则进行数据更新,而这些是基于硬件实现的(JVM只是封装了硬件的汇编调用,AtomicInteger其实是通过调用这些封装后的接口实现的)。多核运算时,由于线程切换,很有可能第二次取值是在另外一核CPU上执行的。假设Core1Core2把对应的某个值加载到自己的一级缓存时,某个时刻,core1更新了这个数据并通过总线通知主存,此时core2的一级缓存中的数据就失效了,他需要从主存中重新加载一次到一级缓存中,大家通过总线通讯被称之为一致性流量,总线的通讯能力有限,当缓存一致性流量过大时,总线会成为瓶颈,而当Core1Core2的数据再次一致时,被称为缓存一致性!

CAS要保证数据的一致性,恰好会引发比较多的一致性流量,如果有很多线程共享一个对象,当某个线程成功执行一次CAS时会引发总线风暴,这就是本地延迟,而偏向锁就是为了消除CAS,降低Cache一致性流量!

当然并不是所有的CAS都会引发总线风暴,这和Cache一致性协议有关系的。但是偏向锁的引入却带来了另外一个问题,在很多线程竞争使用中,如果一个线程持有偏向锁,另外一个线程想争用偏向对象,拥有者想释放这个偏向锁,释放会带来额外的性能开销,但是总体来说偏向锁带来的好处还是大于CAS的代价的。

 

二、Lock的实现

synchronized不同的是,Lock书纯Java实现的,与底层的JVM无关。在java.util.concurrent.locks包中有很多Lock的实现类,常用的有ReentrantLockReadWriteLock(实现类ReentrantReadWriteLock),其实现都依赖java.util.concurrent.AbstractQueuedSynchronizer类(简称AQS),实现思路都大同小异,因此我们以ReentrantLock作为讲解切入点。

分析之前我们先来花点时间看下AQSAQS是我们后面将要提到的CountDownLatch/FutureTask/ReentrantLock/RenntrantReadWriteLock/Semaphore的基础,因此AQS也是LockExcutor实现的基础。它的基本思想就是一个同步器,支持获取锁和释放锁两个操作。

获取锁:首先判断当前状态是否允许获取锁,如果是就获取锁,否则就阻塞操作或者获取失败,也就是说如果是独占锁就可能阻塞,如果是共享锁就可能失败。另外如果是阻塞线程,那么线程就需要进入阻塞队列。当状态位允许获取锁时就修改状态,并且如果进了队列就从队列中移除。

while(synchronization state does not allow acquire){

    enqueue current thread if not already queued;

    possibly block current thread;

}

dequeue current thread if it was queued;

释放锁:这个过程就是修改状态位,如果有线程因为状态位阻塞的话,就唤醒队列中的一个或者更多线程。

update synchronization state;

if(state may permit a blocked thread to acquire)

    unlock one or more queued threads;

要支持上面两个操作就必须有下面的条件

1、  状态位必须是原子操作的

2、  阻塞和唤醒线程

3、  一个有序的队列,用于支持锁的公平性

怎么样才能满足这几个条件呢?

1、  原子操作状态位,前面我们已经提到了,实际JDK中也是通过一个32bit的整数位进行CAS操作来实现的。

2、  阻塞和唤醒,JDK1.5之前的API中并没有阻塞一个线程,然后在将来的某个时刻唤醒它(wait/notify是基于synchronized下才生效的,在这里不算),JDK5之后利用JNILockSupport 这个类中实现了相关的特性!

 

深入浅出Java并发包—锁机制(一) - 一线天色 天宇星辰 - 一线天色 天宇星辰

 

3、  有序队列:在AQS中采用CLH队列来解决队列的有序问题。

我们来看下ReentrantLock的调用过程

经过源码分析,我们看到ReentrantLock把所有的Lock都委托给Sync类进行处理,该类继承自AQS,其类关系图如下

 

深入浅出Java并发包—锁机制(一) - 一线天色 天宇星辰 - 一线天色 天宇星辰

 其中

Sync又有两个final static的子类NonfairSyncFairSync用于支持非公平锁和公平锁。我们先来挑一个看下对应Reentrant.lock()的调用过程(默认为非公平锁)

 

深入浅出Java并发包—锁机制(一) - 一线天色 天宇星辰 - 一线天色 天宇星辰

 这些模版很难让我们直观的看到整个调用过程,但是通过上面的过程图和

AbstractQueuedSynchronizer的注释可以看出,AbstractQueuedSynchronizer抽象了大多数Lock的功能,而只把tryAcquire(int)委托给子类进行多态实现。tryAcquire用于判断对应线程事都能够获取锁,无论成功与否,AbstractQueuedSynchronizer都将处理后面的流程。

简单来讲,AQS会把所有请求锁的线程组成一个CLH的队列,当一个线程执行完毕释放锁(Lock.unlock())的时候,AQS会激活其后继节点,正在执行的线程不在队列当中,而那些等待的线程全部处于阻塞状态,经过源码分析,我们可以清楚的看到最终是通过LockSupport.park()实现的,而底层是调用sun.misc.Unsafe.park()本地方法,再进一步,HotSpotLinux中中通过调用pthread_mutex_lock函数把线程交给系统内核进行阻塞。其运行示意图如下

 

深入浅出Java并发包—锁机制(一) - 一线天色 天宇星辰 - 一线天色 天宇星辰

 与

synchronized相同的是,这个也是一个虚拟队列,并不存在真正的队列示例,仅存在节点之前的前后关系。(注:原生的CLH队列用于自旋锁,JUC将其改造为阻塞锁)。和synchronized还有一点相同的是,就是当获取锁失败的时候,不是立即进行阻塞,而是先自旋一段时间看是否能获取锁,这对那些已经在阻塞队列里面的线程显然不公平(非公平锁的实现,公平锁通过有序队列强制线程顺序进行),但会极大的提升吞吐量。如果自旋还是获取失败了,则创建一个节点加入队列尾部,加入方法仍采用CAS操作,并发对队尾CAS操作有可能会发生失败,AQS是采用自旋循环的方法,知道CAS成功!下面我们来看下锁的实现细节!

锁的实现依赖与lock()方法,Lock()方法首先是调用acquire(int)方法,不管是公平锁还是非公平锁

 

public final void acquire(int arg) {

         if (!tryAcquire(arg) &&

             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

             selfInterrupt();

     }

 

Acquire()方法默认首先调用tryAcquire(int)方法,而此时公平锁和不公平锁的实现就不一样了。

1Sync.NonfairSync.TryAcquire(非公平锁)

nonfairTryAcquire方法是lock方法间接调用的第一个方法,每次调用都会首先调用这个方法,我们来看下对应的实现代码:

 

final boolean nonfairTryAcquire(int acquires) {

        final Thread current = Thread.currentThread();

        int c = getState();

        if (c == 0) {

            if (compareAndSetState(0, acquires)) {

                setExclusiveOwnerThread(current);

                return true;

            }

        }

        else if (current == getExclusiveOwnerThread()) {

            int nextc = c + acquires;

            if (nextc < 0) // overflow

                throw new Error("Maximum lock count exceeded");

            setState(nextc);

            return true;

        }

        return false;

    }

 

该方法首先会判断当前线程的状态,如果c==0 说明没有线程正在竞争锁。(反过来,如果c!=0则说明已经有其他线程已经拥有了锁)。如果c==0,则通过CAS将状态设置为acquires(独占锁的acquires1),后续每次重入该锁都会+1,每次unlock都会-1,当数据为0时则释放锁资源。其中精妙的部分在于:并发访问时,有可能多个线程同时检测到c0,此时执行compareAndSetState(0, acquires))设置,可以预见,如果当前线程CAS成功,则其他线程都不会再成功,也就默认当前线程获取了锁,直接作为running线程,很显然这个线程并没有进入等待队列。如果c!=0,首先判断获取锁的线程是不是当前线程,如果是当前线程,则表明为锁重入,继续+1,修改state的状态,此时并没有锁竞争,也非CAS,因此这段代码也非常漂亮的实现了偏向锁。

 

2Sync.FairSync.TryAcquire(公平锁)

我们直接来看代码

protected final boolean tryAcquire(int acquires) {

         final Thread current = Thread.currentThread();

         int c = getState();

         if (c == 0) {

             if (isFirst(current) &&

                 compareAndSetState(0, acquires)) {

                 setExclusiveOwnerThread(current);

                 return true;

             }

         }

         else if (current == getExclusiveOwnerThread()) {

             int nextc = c + acquires;

             if (nextc < 0)

                 throw new Error("Maximum lock count exceeded");

             setState(nextc);

             return true;

         }

         return false;

     }

和明细我们可以看出,公平锁就比不公平锁多了一个判断头结点的方法,就是采用此方法来保证锁的公平性。

3AbstractQueuedSynchronizer.addWaiter

tryAcquire失败就意味着入队列了。此时AQS的队列中节点Node就开始发挥作用了。一般情况下AQS支持独占锁和共享锁,而独占锁在Node中就意味着条件(Condition)队列为空。在java.util.concurrent.locks.AbstractQueuedSynchronizer.Node中有两个常量

static final Node EXCLUSIVE = null//独占节点模式

 

static final Node SHARED = new Node(); //共享节点模式

addWaiter(mode)中的mode就是节点模式,也就是共享锁还是独占锁模式。添加的节点是当前线程。(注:ReentrantLock是独占锁模式),我们来看下对应的实现代码:

    private Node addWaiter(Node mode) {

        Node node = new Node(Thread.currentThread(), mode);

        // Try the fast path of enq; backup to full enq on failure

        Node pred = tail;

        if (pred != null) {

            node.prev = pred;

            if (compareAndSetTail(pred, node)) {

                pred.next = node;

                return node;

            }

        }

        enq(node);

        return node;

    }

这块代码并不复杂,如果当前队尾存在元素(tail!=null),则通过CAS添加当前线程到队尾,如果队尾为空或者CAS失败,则通过enq方法设置tail。我们来看下enq的代码

private Node enq(final Node node) {

        for (;;) {

            Node t = tail;

            if (t == null) { // Must initialize

                Node h = new Node(); // Dummy header

                h.next = node;

                node.prev = h;

                if (compareAndSetHead(h)) {

                    tail = node;

                    return h;

                }

            }

            else {

                node.prev = t;

                if (compareAndSetTail(t, node)) {

                    t.next = node;

                    return t;

                }

            }

        }

    }

该方法就是循环调用CAS,即使有高并发的场景,无限循环将会最终成功把当前线程追加到队尾(或设置队头)。总而言之,addWaiter的目的就是通过CAS把当前现在追加到队尾,并返回包装后的Node实例。

把线程要包装为Node对象的主要原因,除了用Node构造供虚拟队列外,还用Node包装了各种线程状态,这些状态被精心设计为一些数字值:

1)、  SIGNAL(-1) :线程的后继线程正/已被阻塞,当该线程releasecancel时要重新这个后继线程(unpark)

2)、  CANCELLED(1):因为超时或中断,该线程已经被取消

3)、  CONDITION(-2):表明该线程被处于条件队列,就是因为调用了Condition.await而被阻塞

4)、  PROPAGATE(-3):传播共享锁

5)、  00代表无状态

3AbstractQueuedSynchronizer.acquireQueued(进行阻塞)

acquireQueued的主要作用是把已经追加到队列的线程节点(addWaiter方法返回值)进行阻塞,但阻塞前又通过tryAccquire重试是否能获得锁,如果重试成功能则无需阻塞,直接返回。下面我们来看以下它对应的源码信息

   final boolean acquireQueued(final Node node, int arg) {

        try {

            boolean interrupted = false;

            for (;;) {

                final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null// help GC

                    return interrupted;

                }

                if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                    interrupted = true;

            }

        } catch (RuntimeException ex) {

            cancelAcquire(node);

            throw ex;

        }

    }

仔细看看这个方法是个无限循环,感觉如果p == head && tryAcquire(arg)条件不满足循环将永远无法结束,当然不会出现死循环,奥秘在于parkAndCheckInterrupt会把当前线程挂起,从而阻塞住线程的调用栈。我们来看下他的实现方法:

private final boolean parkAndCheckInterrupt() {

        LockSupport.park(this);

        return Thread.interrupted();

    }

如前面所述,LockSupport.park最终把线程交给系统(Linux)内核进行阻塞。当然也不是马上把请求不到锁的线程进行阻塞,还要检查该线程的状态,比如如果该线程处于Cancel状态则没有必要,具体的检查在shouldParkAfterFailedAcquire中:

private static boolean shouldParkAfterFailedAcquire(Node pred, Nodenode) {

        int ws = pred.waitStatus;

        if (ws == Node.SIGNAL)

            /*

             * This node has already set status asking a release

             * to signal it, so it can safely park

             */

            return true;

        if (ws > 0) {

            /*

             * Predecessor was cancelled. Skip over predecessors and

             * indicate retry.

             */

        do {

       node.prev = pred = pred.prev;

        } while (pred.waitStatus > 0);

        pred.next = node;

        } else {

            /*

             * waitStatus must be 0 or PROPAGATE. Indicate that we

             * need a signal, but don't park yet. Caller will need to

             * retry to make sure it cannot acquire before parking.

             */

            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

        }

        return false;

    }

检查原则如下:

1、如果前继节点的waitStatussignal,则说明前面的节点都还灭有获取到锁,此时当前线程需要阻塞,直接返回true

2、如果前继节点waitStatus>0,说明前继节点已经被取消,则重新设置当前节点的前继节点,返回false,之后无限循环直到第一步状态返回true,导致线程阻塞

3、如果前继节点waitStatus小于0而且不等于-1(signal),则通过CAS设置前继节点额外isignal,并返回false,之后无限循环直到步骤1返回true,线程阻塞。

请求锁不成功的线程会被挂起在acquireQueued方法的第12行,12行以后的代码必须等线程被解锁锁才能执行,假如被阻塞的线程得到解锁,则执行第13行,即设置interrupted = true,之后又进入无限循环。

从无限循环的代码可以看出,并不是得到解锁的线程一定能获得锁,必须在第6行中调用tryAccquire重新竞争,非公平锁中有可能被新加入的线程获取到,从而导致刚刚被唤醒的线程再次阻塞;公平锁通过判断当前节点是否是头结点来保证锁的公平性。上面的代码我们还可以看到,因为每次第一个被解锁的是头结点,因此一般p==head的判断都会成功。解锁相对比较简单,主要体现在AbstractQueuedSynchronizer.releaseSync.tryRelease方法中:

   public final boolean release(int arg) {

        if (tryRelease(arg)) {

            Node h = head;

            if (h != null && h.waitStatus != 0)

                unparkSuccessor(h);

            return true;

        }

        return false;

}

    protected final boolean tryRelease(int releases) {

        int c = getState() - releases;

        if (Thread.currentThread() != getExclusiveOwnerThread())

            throw new IllegalMonitorStateException();

        boolean free = false;

        if (c == 0) {

            free = true;

            setExclusiveOwnerThread(null);

        }

        setState(c);

        return free;

    }

这个逻辑也比较简单:

1.判断持有锁的线程是否是当前线程,如果不是就抛出IllegalMonitorStateExeception(),因为一个线程是不能释放另一个线程持有的锁(否则锁就失去了意义)。否则进行2

2.AQS状态位减少要释放的次数(对于独占锁而言总是1),如果剩余的状态位0(也就是没有线程持有锁),那么当前线程就是最后一个持有锁的线程,清空AQS持有锁的独占线程。进行3

3.将剩余的状态位写回AQS,如果没有线程持有锁就返回true,否则就是false

从上面我们可以知道,这里c==0决定了是否完全释放了锁。由于ReentrantLock是可重入锁,因此同一个线程可能多重持有锁,那么当且仅当最后一个持有锁的线程释放锁是才能将AQS中持有锁的独占线程清空,这样接下来的操作才需要唤醒下一个需要锁的AQS节点(Node),否则就只是减少锁持有的计数器,并不能改变其他操作。

tryRelease操作成功后(也就是完全释放了锁),release操作才能检查是否需要唤醒下一个继任节点。这里的前提是AQS队列的头结点需要锁(waitStatus!=0),如果头结点需要锁,就开始检测下一个继任节点是否需要锁操作。

上文说道acquireQueued操作完成后(拿到了锁),会将当前持有锁的节点设为头结点,所以一旦头结点释放锁,那么就需要寻找头结点的下一个需要锁的继任节点,并唤醒它。我们来看下对应的实现代码:

private void unparkSuccessor(Node node) {

        /*

         * If status is negative (i.e., possibly needing signal) try

         * to clear in anticipation of signalling. It is OK if this

         * fails or if status is changed by waiting thread.

         */

        int ws = node.waitStatus;

        if (ws < 0)

            compareAndSetWaitStatus(node, ws, 0);

 

        /*

         * Thread to unpark is held in successor, which is normally

         * just the next node.  But if cancelled or apparently null,

         * traverse backwards from tail to find the actual

         * non-cancelled successor.

         */

        Node s = node.next;

        if (s == null || s.waitStatus > 0) {

            s = null;

            for (Node t = tail; t != null && t != node; t = t.prev)

                if (t.waitStatus <= 0)

                    s = t;

        }

        if (s != null)

            LockSupport.unpark(s.thread);

    }

对比对应的代码我们可以看出,一旦头结点的后继结点被唤醒,那么后继结点就尝试去获取锁,如果获取成功就将头结点设置为自身,并将头结点的前任节点清空。

final boolean acquireQueued(final Node node, int arg) {

        try {

            boolean interrupted = false;

            for (;;) {

                final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null// help GC

                    return interrupted;

                }

                if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                    interrupted = true;

            }

        } catch (RuntimeException ex) {

            cancelAcquire(node);

            throw ex;

        }

    }

对比lockunlock是相当比较简单的,主要是释放需要响应的资源,并唤醒AQS队列中有效的后继结点,这样就试图以请求的顺序获取锁资源了。

对比公平锁和不公平锁,其实就是在获取锁的时候有区别,释放锁的时候都是一样的。非公平锁总是尝试看当前有没有线程持有锁,如果没有则使用现有的线程去抢占锁资源,但是一旦抢占失败,也就和公平锁一样,进入阻塞队列老老实实排队去了,也就是说公平锁和非公平锁只有在进入AQSCLH队列之前有区别,后面都是按照队列的顺序请求锁资源的。

 

由锁衍生的下一个对象是条件变量,这个对象的存在很大程度上是为了解决Object.wait/notify/notifyAll难以使用的问题。

条件(也称为条件队列 或条件变量)为线程提供了一个含义,以便在某个状态条件现在可能为 true 的另一个线程通知它之前,一直挂起该线程(即让其“等待”)。因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,因此要将某种形式的锁与该条件相关联。等待提供一个条件的主要属性是:以原子方式 释放相关的锁,并挂起当前线程,就像Object.wait 做的那样。

上述API说明表明条件变量需要与锁绑定,而且多个Condition需要绑定到同一锁上。前面的Lock中提到,获取一个条件变量的方法是Lock.newCondition()

Condition  Object 监视器方法(waitnotify  notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 setwait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。await对应于Object.waitsignal对应于Object.notifysignalAll对应于Object.notifyAll。特别说明的是Condition的接口改变名称就是为了避免与Object中的wait/notify/notifyAll的语义和使用上混淆,因为Condition同样有wait/notify/notifyAll方法。

每一个Lock可以有任意数据的Condition对象,Condition是与Lock绑定的,所以就有Lock的公平性特性:如果是公平锁,线程为按照FIFO的顺序从Condition.await中释放,如果是非公平锁,那么后续的锁竞争就不保证FIFO顺序了。我们通过一个生产者消费者模型来看一下相关的实现!

package com.yhj.lock;

 

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.ReentrantLock;

/**

 * @Described 生产者消费者模型

 * @Author YHJ create at 2013-6-下午09:15:27

 */

public class ProductQueue<T> {

 

    private final T[] items//队列存储区

 

    private final Lock lock = new ReentrantLock(); //独占锁

 

    private Condition notFull = lock.newCondition(); //条件

 

    private Condition notEmpty = lock.newCondition();

 

    private int headtailcount//下标

 

    @SuppressWarnings("unchecked")

    public ProductQueue(int maxSize) {

         items = (T[]) new Object[maxSize];

     }

 

    /**

     * 默认10个元素

     * @Constructors

     * @Author YHJ create at 2013-6-下午09:15:21

     */

    public ProductQueue() {

         this(10);

     }

 

    /**

     * 放置数据

     * @param t

     * @throws InterruptedException

     * @Author YHJ create at 2013-6-下午09:15:27

     */

    public void put(T t) throws InterruptedException {

         lock.lock();

         try {

             while (count == getCapacity()) {

                 notFull.await();

             }

             items[tail] = t;

             if (++tail == getCapacity()) {

                 tail = 0;

             }

             ++count;

             notEmpty.signalAll();

         } finally {

             lock.unlock();

         }

     }

 

    /**

     * 取数据

     * @return

     * @throws InterruptedException

     * @Author YHJ create at 2013-6-下午09:18:36

     */

    public T take() throws InterruptedException {

         lock.lock();

         try {

             while (count == 0) {

                 notEmpty.await();

             }

             T ret = items[head];

             items[head] = null;//GC

             if (++head == getCapacity()) {

                 head = 0;

             }

             --count;

             notFull.signalAll();

             return ret;

         } finally {

             lock.unlock();

         }

     }

 

    /**

     * 获取容量(队列)

     * @return

     * @Author YHJ create at 2013-6-下午09:18:45

     */

    public int getCapacity() {

         return items.length;

     }

 

    /**

     * 获取元素数目

     * @return

     * @Author YHJ create at 2013-6-下午09:19:04

     */

    public int size() {

         lock.lock();

         try {

             return count;

         } finally {

             lock.unlock();

         }

     }

 

}

在这个例子中消费take()需要 队列不为空,如果为空就挂起(await()),直到收到notEmpty的信号;生产put()需要队列不满,如果满了就挂起(await()),直到收到notFull的信号。可能有人会问:如果一个线程lock()对象后被挂起还没有unlock,那么另外一个线程就拿不到锁了(lock()操作会挂起),那么就无法通知(notify)前一个线程,这样岂不是“死锁”了?

是这样子么?当然不是,如果是这样有这么大的问题,锁性能再好又有什么用呢?我们来看下await方法的代码:

        public final void await() throws InterruptedException {

            if (Thread.interrupted())

                throw new InterruptedException();

            Node node = addConditionWaiter();

            long savedState = fullyRelease(node);

            int interruptMode = 0;

            while (!isOnSyncQueue(node)) {

                LockSupport.park(this);

                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

                    break;

            }

            if (acquireQueued(node, savedState) && interruptMode !=THROW_IE)

                interruptMode = REINTERRUPT;

            if (node.nextWaiter != null)

                unlinkCancelledWaiters();

            if (interruptMode != 0)

                reportInterruptAfterWait(interruptMode);

        }

  final boolean isOnSyncQueue(Node node) {

        if (node.waitStatus == Node.CONDITION || node.prev == null)

            return false;

        if (node.next != null// If has successor, it must be on queue

            return true;

        return findNodeFromTail(node);

    }

很显然,执行await方法的时候,首先将当前节点加入Condition队列,然后会做一次锁的释放(如果不释放其他线程就会等待而无法获取锁,进而更没有办法notify此条件,引发死锁),然后自旋尝试挂起当前线程(LockSupport.park(this);),直到有线程conditionsignal来解除(被唤醒继续操作或被取消,如果被取消则直接剔除),如果被唤醒而且没有被取消的话,尝试重新进入锁获取的等待队列(acquireQueued(node, savedState)),尝试成功后从Condition队列中删除(再次拿到了之前的锁对象)!

这里再回头介绍Condition的数据结构。我们知道一个Condition可以在多个地方被await(),那么就需要一个FIFO的结构将这些Condition串联起来,然后根据需要唤醒一个或者多个(通常是所有)。所以在Condition内部就需要一个FIFO的队列。我们再结合前面提到的节点(Node)数据结构。我们就发现Node.nextWaiter就派上用场了!nextWaiter就是将一系列的Condition.await()串联起来组成一个FIFO的队列。所以当某一个节点被唤醒的时候,需要进行一次队列关系重建(unlinkCancelledWaiters())。

await()清楚了,现在再来看signal/signalAll就容易多了。按照signal/signalAll的需求,就是要将Condition.await()FIFO队列中第一个Node/全部Node唤醒。尽管所有Node可能都被唤醒,但是要知道的是仍然只有一个线程能够拿到锁,其它没有拿到锁的线程仍然需要自旋等待(acquireQueued)。我们来看下相关的代码实现:

public final void signal() {

            if (!isHeldExclusively())

                throw new IllegalMonitorStateException();

            Node first = firstWaiter;

            if (first != null)

                doSignal(first);

        }

private void doSignal(Node first) {

            do {

                if ( (firstWaiter = first.nextWaiter) == null)

                    lastWaiter = null;

                first.nextWaiter = null;

            } while (!transferForSignal(first) &&

                     (first = firstWaiter) != null);

        }

public final void signalAll() {

            if (!isHeldExclusively())

                throw new IllegalMonitorStateException();

            Node first = firstWaiter;

            if (first != null)

                doSignalAll(first);

        }

private void doSignalAll(Node first) {

            lastWaiter = firstWaiter  = null;

            do {

                Node next = first.nextWaiter;

                first.nextWaiter = null;

                transferForSignal(first);

                first = next;

            } while (first != null);

        }

上面的代码很容易看出来,signal就是唤醒Condition队列中的第一个非CANCELLED节点线程,而signalAll就是唤醒所有非CANCELLED节点线程。当然了遇到CANCELLED线程就需要将其从FIFO队列中剔除。

final boolean transferForSignal(Node node) {

        /*

         * If cannot change waitStatus, the node has been cancelled.

         */

        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

            return false;

        /*

         * Splice onto queue and try to set waitStatus of predecessor to

         * indicate that thread is (probably) waiting. If cancelled or

         * attempt to set waitStatus fails, wake up to resync (in which

         * case the waitStatus can be transiently and harmlessly wrong).

         */

        Node p = enq(node);

        int c = p.waitStatus;

        if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))

            LockSupport.unpark(node.thread);

        return true;

    }

 

上面就是唤醒一个await()线程的过程,根据前面介绍的,如果要unpark线程,并使线程拿到锁,那么就需要线程节点进入AQS的队列。所以可以看到在LockSupport.unpark之前调用了enq(node)操作,将当前节点加入到AQS队列。

分享到:
评论

相关推荐

    深入浅出_Java并发工具包原理讲解

    深入了解Java的并发开发工具包的基本原理,有兴趣的可以学习下。

    Java 并发核心编程

    自从java创建以来就已经支持并发的理念,如线程和锁。这篇指南主要是为帮助java多线程开发人员理解并发的核心概念以及如何应用这些理念。本文的主题是关于具有java语言风格的Thread、synchronized、volatile,以及...

    java并发编程实践笔记资料.pdf

    java并发编程实践笔记资料.pdf

    oracle并发和锁机制

    oracle并发和锁机制,oracle并发锁,oracle锁

    java并发工具包详解

    1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    JAVA并发编程艺术 高清pdf

    JAVA并发编程艺术 高清pdf : 1.并发变成的挑战 2. java并发机制的底层实现原理 3. java 内存模型 4. java并发编程基础 5.java中的锁。。。。。。。

    Java并发工具包

    Java并发工具包java_util_concurrent_user_guide_cn。

    java并发编程2

    java并发编程pdf文档第二部分:Java并发编程实战.pdf、Java多线程编程核心技术.pdf、实战Java高并发程序设计.pdf

    JAVA并发编程实践 .pdf

    《Java并发编程实战》深入浅出地介绍了Java线程和并发,是一本完美的Java并发参考手册。书中从并发性和线程安全性的基本概念出发,介绍了如何使用类库提供的基本并发构建块,用于避免并发危险、构造线程安全的类及验证...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    java并发编程实战源码,java并发编程实战pdf,Java

    java并发编程实战源码 附有本书所有源码,maven 导入 eclipse或idea

    Netty 高并发深入浅出学习高并发服务器

    Netty 高并发深入浅出学习高并发服务器Netty 高并发深入浅出学习高并发服务器

    《Java并发编程的艺术》

    《Java并发编程的艺术》内容涵盖Java并发编程机制的底层实现原理、Java内存模型、Java并发编程基础、Java中的锁、并发容器和框架、原子类、并发工具类、线程池、Executor框架等主题,每个主题都做了深入的讲解,同时...

    《java 并发编程实战高清PDF版》

    深入讲解java并发编程技术,多线程、锁以及java内存模型等

    java并发编程艺术

    java并发编程艺术java并发编程艺术java并发编程艺术java并发编程艺术java并发编程艺术

    java并发编程实战中文加英文版加源码

    JAVA并发编程实践中文版 英文版 原书源码 带书签 java_concurrency_in_practice.pdf 英文版还是不错的,但是中文版的译者典型的没有技术功底,介绍上说什么专家, 翻译的非常差劲,有些句子都不通顺,都不知道自己去...

    Java并发编程实战2019.zip

    Java并发编程实战,第1章 简介,第...第10章 避免活跃性危险 第11章 性能与可伸缩性 第12章 并发程序的测试 第13章 显式锁 第14章 构建自定义的同步工具 第15章 原子变量与非阻塞同步机制 第16章 Java内存模型

    JAVA并发编程实践 带书签

    本书深入浅出地介绍了Java线程和并发,是一本完美的Java并发参考手册。书中从并发性和线程安全性的基本概念出发,介绍了如何使用类库提供的基本并发构建块,用于避免并发危险、构造线程安全的类及验证线程安全的规则...

    Java并发编程实战

    1.1 并发简史 1.2 线程的优势 1.2.1 发挥多处理器的强大能力 1.2.2 建模的简单性 1.2.3 异步事件的简化处理 1.2.4 响应更灵敏的用户界面 1.3 线程带来的风险 1.3.1 安全性问题 1.3.2 活跃性问题 1.3.3 ...

    java并发锁面试知识

    java中的乐观锁与悲观锁,synchronized与ReentrantLock重入锁的说明与比较

Global site tag (gtag.js) - Google Analytics