• 技术文章 >java >java基础

    java中SynchronousQueue的核心方法

    小妮浅浅小妮浅浅2021-02-08 19:22:09原创2947

    本教程操作环境:windows7系统、java10版,DELL G3电脑。

    1.transfer概念

    进行匹配交换数据,SynchronousQueue内部使用Transferer来交换元素。

    (1) 传入元素e,是生产者(put方法),

    (2) 传入null,是消费者(take方法)。

    2.使用场景

    (1)当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程)。这种情况下,将当前线程加入到等待队列即可。

    (2)如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然)。这种情况下,匹配等待队列的队头,出队,返回相应数据。

    3.实例

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    27

    28

    29

    30

    31

    32

    33

    34

    35

    36

    37

    38

    39

    40

    41

    42

    43

    44

    45

    46

    47

    48

    49

    50

    51

    52

    53

    54

    55

    56

    57

    58

    59

    60

    61

    62

    63

    64

    65

    66

    67

    68

    69

    70

    71

    72

    73

    74

    75

    76

    77

    78

    79

    80

    81

    82

    83

    84

    85

    86

    87

    88

    89

    90

    91

    92

    93

    94

    95

    96

    97

    98

    99

    100

    101

    102

    103

    104

    105

    106

    107

    108

    109

    110

    111

    112

    113

    114

    115

    116

    117

    118

    119

    120

    121

    122

    123

    124

    125

    126

    127

    128

    129

    130

    131

    132

    133

    134

    135

    136

    137

    138

    139

    140

    141

    142

    143

    144

    145

    146

    147

    148

    149

    150

    151

    152

    153

    154

    155

    156

    157

    158

    159

    160

    161

    162

    // TransferStack.transfer()方法

    E transfer(E e, boolean timed, long nanos) {

        SNode s = null; // constructed/reused as needed

        // 根据e是否为null决定是生产者还是消费者

        int mode = (e == null) ? REQUEST : DATA;

        // 自旋+CAS,熟悉的套路,熟悉的味道

        for (;;) {

            // 栈顶元素

            SNode h = head;

            // 栈顶没有元素,或者栈顶元素跟当前元素是一个模式的

            // 也就是都是生产者节点或者都是消费者节点

            if (h == null || h.mode == mode) {  // empty or same-mode

                // 如果有超时而且已到期

                if (timed && nanos <= 0) {      // can't wait

                    // 如果头节点不为空且是取消状态

                    if (h != null && h.isCancelled())

                        // 就把头节点弹出,并进入下一次循环

                        casHead(h, h.next);     // pop cancelled node

                    else

                        // 否则,直接返回null(超时返回null)

                        return null;

                } else if (casHead(h, s = snode(s, e, h, mode))) {

                    // 入栈成功(因为是模式相同的,所以只能入栈)

                    // 调用awaitFulfill()方法自旋+阻塞当前入栈的线程并等待被匹配到

                    SNode m = awaitFulfill(s, timed, nanos);

                    // 如果m等于s,说明取消了,那么就把它清除掉,并返回null

                    if (m == s) {               // wait was cancelled

                        clean(s);

                        // 被取消了返回null

                        return null;

                    }

      

                    // 到这里说明匹配到元素了

                    // 因为从awaitFulfill()里面出来要不被取消了要不就匹配到了

      

                    // 如果头节点不为空,并且头节点的下一个节点是s

                    // 就把头节点换成s的下一个节点

                    // 也就是把h和s都弹出了

                    // 也就是把栈顶两个元素都弹出了

                    if ((h = head) != null && h.next == s)

                        casHead(h, s.next);     // help s's fulfiller

                    // 根据当前节点的模式判断返回m还是s中的值

                    return (E) ((mode == REQUEST) ? m.item : s.item);

                }

            } else if (!isFulfilling(h.mode)) { // try to fulfill

                // 到这里说明头节点和当前节点模式不一样

                // 如果头节点不是正在撮合中

      

                // 如果头节点已经取消了,就把它弹出栈

                if (h.isCancelled())            // already cancelled

                    casHead(h, h.next);         // pop and retry

                else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {

                    // 头节点没有在撮合中,就让当前节点先入队,再让他们尝试匹配

                    // 且s成为了新的头节点,它的状态是正在撮合中

                    for (;;) { // loop until matched or waiters disappear

                        SNode m = s.next;       // m is s's match

                        // 如果m为null,说明除了s节点外的节点都被其它线程先一步撮合掉了

                        // 就清空栈并跳出内部循环,到外部循环再重新入栈判断

                        if (m == null) {        // all waiters are gone

                            casHead(s, null);   // pop fulfill node

                            s = null;           // use new node next time

                            break;              // restart main loop

                        }

                        SNode mn = m.next;

                        // 如果m和s尝试撮合成功,就弹出栈顶的两个元素m和s

                        if (m.tryMatch(s)) {

                            casHead(s, mn);     // pop both s and m

                            // 返回撮合结果

                            return (E) ((mode == REQUEST) ? m.item : s.item);

                        } else                  // lost match

                            // 尝试撮合失败,说明m已经先一步被其它线程撮合了

                            // 就协助清除它

                            s.casNext(m, mn);   // help unlink

                    }

                }

            } else {                            // help a fulfiller

                // 到这里说明当前节点和头节点模式不一样

                // 且头节点是正在撮合中

      

                SNode m = h.next;               // m is h's match

                if (m == null)                  // waiter is gone

                    // 如果m为null,说明m已经被其它线程先一步撮合了

                    casHead(h, null);           // pop fulfilling node

                else {

                    SNode mn = m.next;

                    // 协助匹配,如果m和s尝试撮合成功,就弹出栈顶的两个元素m和s

                    if (m.tryMatch(h))          // help match

                        // 将栈顶的两个元素弹出后,再让s重新入栈

                        casHead(h, mn);         // pop both h and m

                    else                        // lost match

                        // 尝试撮合失败,说明m已经先一步被其它线程撮合了

                        // 就协助清除它

                        h.casNext(m, mn);       // help unlink

                }

            }

        }

    }

      

    // 三个参数:需要等待的节点,是否需要超时,超时时间

    SNode awaitFulfill(SNode s, boolean timed, long nanos) {

        // 到期时间

        final long deadline = timed ? System.nanoTime() + nanos : 0L;

        // 当前线程

        Thread w = Thread.currentThread();

        // 自旋次数

        int spins = (shouldSpin(s) ?

                     (timed ? maxTimedSpins : maxUntimedSpins) : 0);

        for (;;) {

            // 当前线程中断了,尝试清除s

            if (w.isInterrupted())

                s.tryCancel();

      

            // 检查s是否匹配到了元素m(有可能是其它线程的m匹配到当前线程的s)

            SNode m = s.match;

            // 如果匹配到了,直接返回m

            if (m != null)

                return m;

      

            // 如果需要超时

            if (timed) {

                // 检查超时时间如果小于0了,尝试清除s

                nanos = deadline - System.nanoTime();

                if (nanos <= 0L) {

                    s.tryCancel();

                    continue;

                }

            }

            if (spins > 0)

                // 如果还有自旋次数,自旋次数减一,并进入下一次自旋

                spins = shouldSpin(s) ? (spins-1) : 0;

      

            // 后面的elseif都是自旋次数没有了

            else if (s.waiter == null)

                // 如果s的waiter为null,把当前线程注入进去,并进入下一次自旋

                s.waiter = w; // establish waiter so can park next iter

            else if (!timed)

                // 如果不允许超时,直接阻塞,并等待被其它线程唤醒,唤醒后继续自旋并查看是否匹配到了元素

                LockSupport.park(this);

            else if (nanos > spinForTimeoutThreshold)

                // 如果允许超时且还有剩余时间,就阻塞相应时间

                LockSupport.parkNanos(this, nanos);

        }

    }

      

        // SNode里面的方向,调用者m是s的下一个节点

        // 这时候m节点的线程应该是阻塞状态的

        boolean tryMatch(SNode s) {

            // 如果m还没有匹配者,就把s作为它的匹配者

            if (match == null &&

                UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {

                Thread w = waiter;

                if (w != null) {    // waiters need at most one unpark

                    waiter = null;

                    // 唤醒m中的线程,两者匹配完毕

                    LockSupport.unpark(w);

                }

                // 匹配到了返回true

                return true;

            }

            // 可能其它线程先一步匹配了m,返回其是否是s

            return match == s;

    }

    以上就是java中SynchronousQueue的核心方法,相信已经本篇对于transfer方法的学习,在有关入队和出队的操作上就会进行的比较顺利,学会后一定要加强这方面使用方法的记忆。

    专题推荐:java synchronousqueue核心方法
    上一篇:SynchronousQueue在java中的元素增减 下一篇:java PriorityBlockingQueue的使用

    相关文章推荐

    • java中SynchronousQueue是什么意思• java中SynchronousQueue的原理• SynchronousQueue在java中的元素增减

    全部评论我要评论

    © 2021 Python学习网 苏ICP备2021003149号-1

  • 取消发布评论
  • 

    Python学习网