本教程操作环境:windows7系统、java10版,DELL G3电脑。
1.transfer概念
进行匹配交换数据,SynchronousQueue内部使用Transferer来交换元素。
(1) 传入元素e,是生产者(put方法),
(2) 传入null,是消费者(take方法)。
2.使用场景
(1)当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程)。这种情况下,将当前线程加入到等待队列即可。
(2)如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然)。这种情况下,匹配等待队列的队头,出队,返回相应数据。
3.实例
// TransferStack.transfer()方法 E transfer(E e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed // 根据e是否为null决定是生产者还是消费者 int mode = (e == null) ? REQUEST : DATA; // 自旋+CAS,熟悉的套路,熟悉的味道 for (;;) { // 栈顶元素 SNode h = head; // 栈顶没有元素,或者栈顶元素跟当前元素是一个模式的 // 也就是都是生产者节点或者都是消费者节点 if (h == null || h.mode == mode) { // empty or same-mode // 如果有超时而且已到期 if (timed && nanos <= 0) { // can't wait // 如果头节点不为空且是取消状态 if (h != null && h.isCancelled()) // 就把头节点弹出,并进入下一次循环 casHead(h, h.next); // pop cancelled node else // 否则,直接返回null(超时返回null) return null; } else if (casHead(h, s = snode(s, e, h, mode))) { // 入栈成功(因为是模式相同的,所以只能入栈) // 调用awaitFulfill()方法自旋+阻塞当前入栈的线程并等待被匹配到 SNode m = awaitFulfill(s, timed, nanos); // 如果m等于s,说明取消了,那么就把它清除掉,并返回null if (m == s) { // wait was cancelled clean(s); // 被取消了返回null return null; } // 到这里说明匹配到元素了 // 因为从awaitFulfill()里面出来要不被取消了要不就匹配到了 // 如果头节点不为空,并且头节点的下一个节点是s // 就把头节点换成s的下一个节点 // 也就是把h和s都弹出了 // 也就是把栈顶两个元素都弹出了 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller // 根据当前节点的模式判断返回m还是s中的值 return (E) ((mode == REQUEST) ? m.item : s.item); } } else if (!isFulfilling(h.mode)) { // try to fulfill // 到这里说明头节点和当前节点模式不一样 // 如果头节点不是正在撮合中 // 如果头节点已经取消了,就把它弹出栈 if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 头节点没有在撮合中,就让当前节点先入队,再让他们尝试匹配 // 且s成为了新的头节点,它的状态是正在撮合中 for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match // 如果m为null,说明除了s节点外的节点都被其它线程先一步撮合掉了 // 就清空栈并跳出内部循环,到外部循环再重新入栈判断 if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; // 如果m和s尝试撮合成功,就弹出栈顶的两个元素m和s if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m // 返回撮合结果 return (E) ((mode == REQUEST) ? m.item : s.item); } else // lost match // 尝试撮合失败,说明m已经先一步被其它线程撮合了 // 就协助清除它 s.casNext(m, mn); // help unlink } } } else { // help a fulfiller // 到这里说明当前节点和头节点模式不一样 // 且头节点是正在撮合中 SNode m = h.next; // m is h's match if (m == null) // waiter is gone // 如果m为null,说明m已经被其它线程先一步撮合了 casHead(h, null); // pop fulfilling node else { SNode mn = m.next; // 协助匹配,如果m和s尝试撮合成功,就弹出栈顶的两个元素m和s if (m.tryMatch(h)) // help match // 将栈顶的两个元素弹出后,再让s重新入栈 casHead(h, mn); // pop both h and m else // lost match // 尝试撮合失败,说明m已经先一步被其它线程撮合了 // 就协助清除它 h.casNext(m, mn); // help unlink } } } } // 三个参数:需要等待的节点,是否需要超时,超时时间 SNode awaitFulfill(SNode s, boolean timed, long nanos) { // 到期时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 当前线程 Thread w = Thread.currentThread(); // 自旋次数 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { // 当前线程中断了,尝试清除s if (w.isInterrupted()) s.tryCancel(); // 检查s是否匹配到了元素m(有可能是其它线程的m匹配到当前线程的s) SNode m = s.match; // 如果匹配到了,直接返回m if (m != null) return m; // 如果需要超时 if (timed) { // 检查超时时间如果小于0了,尝试清除s nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } if (spins > 0) // 如果还有自旋次数,自旋次数减一,并进入下一次自旋 spins = shouldSpin(s) ? (spins-1) : 0; // 后面的elseif都是自旋次数没有了 else if (s.waiter == null) // 如果s的waiter为null,把当前线程注入进去,并进入下一次自旋 s.waiter = w; // establish waiter so can park next iter else if (!timed) // 如果不允许超时,直接阻塞,并等待被其它线程唤醒,唤醒后继续自旋并查看是否匹配到了元素 LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) // 如果允许超时且还有剩余时间,就阻塞相应时间 LockSupport.parkNanos(this, nanos); } } // SNode里面的方向,调用者m是s的下一个节点 // 这时候m节点的线程应该是阻塞状态的 boolean tryMatch(SNode s) { // 如果m还没有匹配者,就把s作为它的匹配者 if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { // waiters need at most one unpark waiter = null; // 唤醒m中的线程,两者匹配完毕 LockSupport.unpark(w); } // 匹配到了返回true return true; } // 可能其它线程先一步匹配了m,返回其是否是s return match == s; }
以上就是java中SynchronousQueue的核心方法,相信已经本篇对于transfer方法的学习,在有关入队和出队的操作上就会进行的比较顺利,学会后一定要加强这方面使用方法的记忆。