微信号:importnew

介绍:伯乐在线旗下账号,专注Java技术分享,包括Java基础技术、进阶技能、架构设计和Java技术领域动态等.

Java 并发之 Condition 的实现分析

2018-10-10 12:00 ImportNew

(点击上方公众号,可快速关注)


来源:zy_lebron ,

youngforzy.top/2017/12/01/Java并发之Condition的实现分析/


一、Condition的概念


介绍


回忆 synchronized 关键字,它配合 Object 的 wait()、notify() 系列方法可以实现等待/通知模式。


对于 Lock,通过 Condition 也可以实现等待/通知模式。


Condition 是一个接口。

Condition 接口的实现类是 Lock(AQS)中的 ConditionObject。

Lock 接口中有个 newCondition() 方法,通过这个方法可以获得 Condition 对象(其实就是 ConditionObject)。

因此,通过 Lock 对象可以获得 Condition 对象。


Lock lock  = new ReentrantLock();

Condition c1 = lock.newCondition();

Condition c2 = lock.newCondition();


二、Condition的实现分析


实现


ConditionObject 类是 AQS 的内部类,实现了 Condition 接口。


public class ConditionObject implements Condition, java.io.Serializable {

        private transient Node firstWaiter;

        private transient Node lastWaiter;

        ...


可以看到,等待队列和同步队列一样,使用的都是同步器 AQS 中的节点类 Node。

同样拥有首节点和尾节点,

每个 Condition 对象都包含着一个 FIFO 队列。

结构图:



等待


调用 Condition 的 await() 方法会使线程进入等待队列,并释放锁,线程状态变为等待状态。


public final void await() throws InterruptedException {

    if (Thread.interrupted())

        throw new InterruptedException();

    Node node = addConditionWaiter();

    //释放同步状态(锁)

    int savedState = fullyRelease(node);

    int interruptMode = 0;

    //判断节点是否放入同步对列

    while (!isOnSyncQueue(node)) {

        //阻塞

        LockSupport.park(this);

        //如果已经中断了,则退出

        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

            break;

    }

    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

        interruptMode = REINTERRUPT;

            if (node.nextWaiter != null) // clean up if cancelled

            unlinkCancelledWaiters();

            if (interruptMode != 0)

                reportInterruptAfterWait(interruptMode);

}


分析上述方法的大概过程:


1. 将当前线程创建为节点,加入等待队列;

2. 释放锁,唤醒同步队列中的后继节点;

3. while循环判断节点是否放入同步队列:

  • 没有放入,则阻塞,继续 while 循环(如果已经中断了,则退出)

  • 放入,则退出 while 循环,执行后面的判断


4. 退出 while 说明节点已经在同步队列中,调用 acquireQueued() 方法加入同步状态竞争。

5. 竞争到锁后从 await() 方法返回,即退出该方法。



addConditionWaiter() 方法:


private Node addConditionWaiter() {

    Node t = lastWaiter;

    if (t != null && t.waitStatus != Node.CONDITION) {

        //清除条件队列中所有状态不为Condition的节点

        unlinkCancelledWaiters();

        t = lastWaiter;

    }

    //将该线程创建节点,放入等待队列

    Node node = new Node(Thread.currentThread(), Node.CONDITION);

    if (t == null)

        firstWaiter = node;

    else

        t.nextWaiter = node;

    lastWaiter = node;

    return node;

}


过程分析:同步队列的首节点移动到等待队列。加入尾节点之前会清除所有状态不为 Condition 的节点。


通知


调用 Condition 的 signal() 方法,可以唤醒等待队列的首节点(等待时间最长),唤醒之前会将该节点移动到同步队列中。


public final void signal() {

    //判断是否获取了锁

    if (!isHeldExclusively())

        throw new IllegalMonitorStateException();

    Node first = firstWaiter;

    if (first != null)

        doSignal(first);

}


过程:


  1. 先判断当前线程是否获取了锁;

  2. 然后对首节点调用 doSignal() 方法。


private void doSignal(Node first) {

    do {

        if ( (firstWaiter = first.nextWaiter) == null)

            lastWaiter = null;

        first.nextWaiter = null;

    } while (!transferForSignal(first) &&

       (first = firstWaiter) != null);

}


过程:


  1. 修改首节点;

  2. 调用 transferForSignal() 方法将节点移动到同步队列。


final boolean transferForSignal(Node node) {

    //将节点状态变为0   

    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

        return false;

    //将该节点加入同步队列

    Node p = enq(node);

    int ws = p.waitStatus;

    //如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒

    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

        LockSupport.unpark(node.thread);

    return true;

}


调用同步器的 enq 方法,将节点移动到同步队列,

满足条件后使用 LockSupport 唤醒该线程。



当 Condition 调用 signalAll() 方法:


public final void signalAll() {

    if (!isHeldExclusively())

        throw new IllegalMonitorStateException();

    Node first = firstWaiter;

    if (first != null)

        doSignalAll(first);

}

private void doSignalAll(Node first) {

    lastWaiter = firstWaiter = null;

    do {

        Node next = first.nextWaiter;

        first.nextWaiter = null;

        transferForSignal(first);

        first = next;

    } while (first != null);

}


可以看到 doSignalAll() 方法使用了 do-while 循环来唤醒每一个等待队列中的节点,直到 first 为 null 时,停止循环。


一句话总结 signalAll() 的作用:将等待队列中的全部节点移动到同步队列中,并唤醒每个节点的线程。


总结


整个过程可以分为三步:


第一步:一个线程获取锁后,通过调用 Condition 的 await() 方法,会将当前线程先加入到等待队列中,并释放锁。然后就在 await() 中的一个 while 循环中判断节点是否已经在同步队列,是则尝试获取锁,否则一直阻塞。


第二步:当线程调用 signal() 方法后,程序首先检查当前线程是否获取了锁,然后通过 doSignal(Node first) 方法将节点移动到同步队列,并唤醒节点中的线程。


第三步:被唤醒的线程,将从 await() 中的 while 循环中退出来,然后调用 acquireQueued() 方法竞争同步状态。竞争成功则退出 await() 方法,继续执行。


【关于投稿】


如果大家有原创好文投稿,请直接给公号发送留言。


① 留言格式:
【投稿】+《 文章标题》+ 文章链接

② 示例:
【投稿】《不要自称是程序员,我十多年的 IT 职场总结》:http://blog.jobbole.com/94148/

③ 最后请附上您的个人简介哈~



看完本文有收获?请转发分享给更多人

关注「ImportNew」,提升Java技能

 
ImportNew 更多文章 Java 中如何模拟真正的同时并发请求? 想转行人工智能?哈佛博士后有话说... Java 小应用日志级别异常处理最佳实践 SpringBoot | 番外:使用小技巧合集 Java 面试题问与答:编译时与运行时
猜您喜欢 「 C++挑战赛 」下周开始,最特别的编程题目等你挑战! 支付宝的性能测试 一封来自Alice的回信 C语言编程程序的内存如何布局 博赛网络一月份开班计划来啦~