JUC源码分析-集合篇(八):SynchronousQueue

SynchronousQueue 是一个同步阻塞队列,它的每个插入操作都要等待其他线程相应的移除操作,反之亦然。SynchronousQueue 像是生产者和消费者的会合通道,它比较适合“切换”或“传递”这种场景:一个线程必须同步等待另外一个线程把相关信息/时间/任务传递给它。在之后的线程池源码分析中我们也会见到它,所以理解本章对我们之后的线程池讲解也会有很大帮助。

概述

SynchronousQueue(后面称SQ)内部没有容量,所以不能通过peek方法获取头部元素;也不能单独插入元素,可以简单理解为它的插入和移除是“一对”对称的操作。为了兼容 Collection 的某些操作(例如contains),SQ 扮演了一个空集合的角色。
SQ 的一个典型应用场景是在线程池中,Executors.newCachedThreadPool() 就使用了它,这个构造使线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

SQ 为等待过程中的生产者或消费者线程提供可选的公平策略(默认非公平模式)。非公平模式通过栈(LIFO)实现,公平模式通过队列(FIFO)实现。使用的数据结构是双重队列(Dual queue)双重栈(Dual stack)(后面详细讲解)。FIFO通常用于支持更高的吞吐量,LIFO则支持更高的线程局部存储(TLS)。

SQ 的阻塞算法可以归结为以下几点:

  • 使用了双重队列(Dual queue)双重栈(Dual stack)存储数据,队列中的每个节点都可以是一个生产者或是消费者。(有关双重队列,请参考笔者另外一篇文章:JUC源码分析-集合篇(四):LinkedTransferQueue,双重栈与它原理一致)
  • 已取消节点引用指向自身,避免垃圾保留和内存损耗
  • 通过自旋和 LockSupport 的 park/unpark 实现阻塞,在高争用环境下,自旋可以显著提高吞吐量。

数据结构

《JUC源码分析-集合篇(八):SynchronousQueue》

SynchronousQueue 继承关系

SQ 有三个内部类:

  1. Transferer:内部抽象类,只有一个transfer方法。SQ的puttake被统一为一个方法(就是这个transfer方法),因为在双重队列/栈数据结构中,puttake操作是对称的,所以几乎所有代码都可以合并。
  2. TransferStack:继承了内部抽象类 Transferer,实现了transfer方法,用于非公平模式下的队列操作,数据按照LIFO的顺序。内部通过单向链表 SNode 实现的双重栈。
  3. TransferQueue:继承了内部抽象类 Transferer,实现了transfer方法,用于公平模式下的队列操作,数据按照FIFO的顺序。内部通过单向链表 QNode 实现的双重队列。

SNode & QNode

SNode 是双重栈的实现,内部除了基础的链表指针和数据外,还维护了一个int型变量mode,它是实现双重栈的关键字段,有三个取值:0代表消费者节点(take),1代表生产者节点(put),2 | mode(mode为当前操作者模式:put or take)代表节点已被匹配。此外还有一个match引用,用于匹配时标识匹配的节点,节点取消等待后match引用指向自身。

QNode 是双重队列的实现,通过isData实现双重队列。这个在JUC源码分析-集合篇(四):LinkedTransferQueue 一篇中有讲解,在此就不再赘述。

源码解析

SQ 的 put/take 操作完全是由transfer方法实现,以put方法为例,

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

可以看到调用了内部变量 transferer 的transfer的方法。其它例如offer、take、poll都与之类似,所以接下来我们主要针对transfer方法,来分析 SQ 公平模式和非公平模式的不同实现。

TransferStack.transfer()

E transfer(E e, boolean timed, long nanos) {
    
    SNode s = null; // constructed/reused as needed
    //根据所传元素判断为生产or消费
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        if (h == null || h.mode == mode) {  // empty or same-mode
            if (timed && nanos <= 0) {      // can't wait
                if (h != null && h.isCancelled())//head已经被匹配,修改head继续循环
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {//构建新的节点s,放到栈顶
                //等待s节点被匹配,返回s.match节点m
                SNode m = awaitFulfill(s, timed, nanos);
                //s.match==s(等待被取消)
                if (m == s) {               // wait was cancelled
                    clean(s);//清除s节点
                    return null;
                }
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        } else if (!isFulfilling(h.mode)) { //head节点还没有被匹配,尝试匹配 try to fulfill
            if (h.isCancelled())            // already cancelled
                //head已经被匹配,修改head继续循环
                casHead(h, h.next);         // pop and retry

            //构建新节点,放到栈顶
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    //cas成功后s的match节点就是s.next,即m
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;

                    if (m.tryMatch(s)) {//尝试匹配,唤醒m节点的线程
                        casHead(s, mn);     //弹出匹配成功的两个节点,替换head pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   //匹配失败,删除m节点,重新循环 help unlink
                }
            }
        } else {                            //头节点正在匹配 help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {//帮助头节点匹配
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

说明:基本算法是循环尝试以下三种行为之一:

  1. 如果栈为空或者已经包含了一个相同的 mode,此时分两种情况:如果是非计时操作(offer、poll)或者已经超时,直接返回null;其他情况下就把当前节点压进栈顶等待匹配(通过awaitFulfill方法),匹配成功后返回匹配节点的 item,如果节点取消等待就调用clean方法(后面单独讲解)清除取消等待的节点,并返回 null。

  2. 如果栈顶节点(head)还没有被匹配(通过isFulfilling方法判断),则把当前节点压入栈顶,并尝试与head节点进行匹配,匹配成功后从栈中弹出这两个节点,并返回匹配节点的数据。isFulfilling源码如下:

/** Returns true if m has fulfilling bit set. */
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
  1. 如果栈顶节点(head)已经持有另外一个数据节点,说明栈顶节点正在匹配,则帮助此节点进行匹配操作,然后继续从第一步开始循环。

TransferStack. awaitFulfill()

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    //计算截止时间
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    //计算自旋次数
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())//当前线程被中断
            //取消对给定节点s的匹配节点的等待
            s.tryCancel();
        SNode m = s.match;//获取给定节点s的match节点
        if (m != null)//已经匹配到,返回匹配节点
            return m;
        if (timed) {
            //超时处理
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();//超时,取消s节点的匹配,match指向自身
                continue;
            }
        }
        if (spins > 0)
            //spins-1
            spins = shouldSpin(s) ? (spins-1) : 0;
        else if (s.waiter == null)
            //设置给定节点s的waiter为当前线程
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)//没有设定超时,直接阻塞
            LockSupport.park(this);
        else if (nanos > spinForTimeoutThreshold)//阻塞指定超时时间
            LockSupport.parkNanos(this, nanos);
    }
}

说明:如果当前操作是一个不计时操作,或者是一个还未到超时时间的操作,就构建新的节点压入栈顶。然后调用此方法自旋/阻塞等待给定节点s被匹配。
当调用此方法时,所传参数节点s一定是在栈顶,节点真正阻塞前会先自旋,以防生产者和消费者到达的时间点非常接近时也被 park

当节点/线程需要阻塞时,首先设置waiter字段为当前线程,然后在真正阻塞之前重新检查一下waiter的状态,因为在线程竞争中,需要确认waiter没有被其他线程占用。
从主循环返回的检查顺序可以反映出中断优先于正常返回。除了不计时操作(poll/offer)不会检查中断,而是直接在transfer方法中入栈等待匹配。

TransferQueue.transfer()

E transfer(E e, boolean timed, long nanos) {
    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);//判断put or take

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            if (t != tail)                  // inconsistent read
                continue;
            if (tn != null) {               //尾节点滞后,更新尾节点 lagging tail
                advanceTail(t, tn);
                continue;
            }
            if (timed && nanos <= 0)        // can't wait
                return null;
            //为当前操作构造新节点,并放到队尾
            if (s == null)
                s = new QNode(e, isData);
            if (!t.casNext(null, s))        // failed to link in
                continue;

            //推进tail
            advanceTail(t, s);              // swing tail and wait
            //等待匹配,并返回匹配节点的item,如果取消等待则返回该节点s
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // wait was cancelled
                clean(t, s); //等待被取消,清除s节点
                return null;
            }

            if (!s.isOffList()) {           // s节点尚未出列 not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;//item指向自身
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

            //take
        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   //m.item=m, m cancelled
                !m.casItem(x, e)) {         // 匹配,CAS修改item为给定元素e lost CAS
                advanceHead(h, m);          // 推进head,继续向后查找 dequeue and retry
                continue;
            }

            advanceHead(h, m);              //匹配成功,head出列 successfully fulfilled
            LockSupport.unpark(m.waiter);   //唤醒被匹配节点m的线程
            return (x != null) ? (E)x : e;
        }
    }
}

说明:基本算法是循环尝试以下两个动作中的其中一个:

  1. 若队列为空或者队列中的尾节点(tail)和自己的模式相同,则把当前节点添加到队列尾,调用awaitFulfill等待节点被匹配。匹配成功后返回匹配节点的 item,如果等待节点被中断或等待超时返回null。在此期间会不断检查tail节点,如果tail节点被其他线程修改,则向后推进tail继续循环尝试。
    注:TransferQueue 的 awaitFulfill方法与 TransferStack.awaitFulfill算法一致,后面就不再讲解了。

  2. 如果当前操作模式与尾节点(tail)不同,说明可以进行匹配,则从队列头节点head开始向后查找一个互补节点进行匹配,尝试通过CAS修改互补节点的item字段为给定元素e,匹配成功后向后推进head,并唤醒被匹配节点的waiter线程,最后返回匹配节点的item

栈/队列节点清除的对比(clean方法)

在队列和栈中进行清理的方式不同:
对于队列来说,如果节点被取消,我们几乎总是可以以 O1 的时间复杂度移除节点。但是如果节点在队尾,它必须等待后面节点的取消。
对于栈来说,我们可能需要 O(n) 的时间复杂度去遍历整个栈,然后确定节点可被移除,但这可以与访问栈的其他线程并行运行。

下面我们来看一下 TransferStack 和 TransferQueue 对节点清除方法的优化:

TransferStack.clean(SNode s)

void clean(SNode s) {
    s.item = null;   // forget item
    s.waiter = null; // forget thread

    SNode past = s.next;
    if (past != null && past.isCancelled())
        past = past.next;

    // Absorb cancelled nodes at head
    //找到有效head
    SNode p;
    while ((p = head) != null && p != past && p.isCancelled())
        casHead(p, p.next);

    // Unsplice embedded nodes
    //移除head到past中已取消节点的链接
    while (p != null && p != past) {
        SNode n = p.next;
        if (n != null && n.isCancelled())
            p.casNext(n, n.next);
        else
            p = n;
    }
}

说明:在最坏的情况下可能需要遍历整个栈来解除给定节点s的链接(例如给定节点在栈底)。在并发情况下,如果有其他线程已经移除给定节点s,当前线程可能无法看到,但是我们可以使用这样一种算法:
使用s.next作为past节点,如果past节点已经取消,则使用past.next节点,然后依次解除从headpast中已取消节点的链接。在这里不会做更深的检查,因为为了找到失效节点而进行两次遍历是不值得的。

TransferQueue.clean(QNode pred, QNode s)

void clean(QNode pred, QNode s) {
    s.waiter = null; // forget thread
    
    while (pred.next == s) { // Return early if already unlinked
        QNode h = head;
        QNode hn = h.next;   // Absorb cancelled first node as head
        //找到有效head节点
        if (hn != null && hn.isCancelled()) {
            advanceHead(h, hn);
            continue;
        }
        QNode t = tail;      // Ensure consistent read for tail
        if (t == h)//队列为空,直接返回
            return;
        QNode tn = t.next;
        if (t != tail)//tail节点被其他线程修改,重新循环
            continue;
        //找到tail节点
        if (tn != null) {
            advanceTail(t, tn);
            continue;
        }
        if (s != t) {        // If not tail, try to unsplice
            QNode sn = s.next;
            if (sn == s || pred.casNext(s, sn))//cas解除s的链接
                return;
        }
        //s是队列尾节点,此时无法删除s,只能去清除cleanMe节点
        QNode dp = cleanMe;
        if (dp != null) {    // Try unlinking previous cancelled node
            QNode d = dp.next;
            QNode dn;
            if (d == null ||               // d is gone or
                d == dp ||                 // d is off list or
                !d.isCancelled() ||        // d not cancelled or
                (d != t &&                 // d not tail and
                 (dn = d.next) != null &&  //   has successor
                 dn != d &&                //   that is on list
                 dp.casNext(d, dn)))       // d unspliced
                casCleanMe(dp, null);
            if (dp == pred)
                return;      // s is already saved node
        } else if (casCleanMe(null, pred))//原cleanMe为空,标记pred为cleanMe,延迟清除s节点
            return;          // Postpone cleaning s
    }
}

说明:方法参数中s为已经取消的节点,preds的前继节点。
任何时候在队列中都存在一个不能删除的节点,也就是最后被插入的那个节点(tail节点)。为了满足这一点,在 TransferQueue 中维护了一个cleanMe节点引用。当给定s节点为tail节点时,首先删除cleanMe节点引用;然后保存s的前继节点作为cleanMe节点,在下次清除操作时再清除节点。这样保证了在s节点和cleanMe节点中至少有一个是可以删除的。

小结

本章重点:理解 SynchronousQueue 中双重栈和双重队列的实现;理解 SynchronousQueue 的阻塞算法

作者:泰迪的bagwell
链接:https://www.jianshu.com/p/c4855acb57ec
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

    原文作者:JUC
    原文地址: https://blog.csdn.net/yongchao940/article/details/83026882
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞