程序员社区

Java并发编程之并发工具类CountDownLatch,CyclicBarrier,Semaphore详解

1 前言

在JDK的并发包里提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和 Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数 据的一种手段。本文会对这些并发工具类进行介绍。

2 等待多线程完成的CountDownLatch

2.1 CountDownLatch案例演示

countdownlatch 是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完毕再执行。从命名可以解读到 countdown 是倒数的意思,类似于我们倒计时的概念。
countdownlatch 提供了两个方法,一个是 countDown,一个是 await,countdownlatch 初始化的时候需要传入一个整数,在这个整数倒数到 0 之前,调用了 await 方法的程序都必须要等待,然后通过 countDown 来倒数。
现在有一个需求就是:学校放学了,等学生们全部走完之后再关闭教室的门,如果不用CountDownLatch的话代码如下:

public class WithoutCountDownLatchDemo {
    public static void main(String[] args) {
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "同学离开了");
            }).start();
        }
        System.out.println(Thread.currentThread().getName() + "要关门了,此时教室已经没有人了~");
    }
}

输出结果如下:

Thread-0同学离开了
main要关门了,此时教室已经没有人了~
Thread-1同学离开了
Thread-2同学离开了
Thread-3同学离开了
Thread-4同学离开了
Thread-5同学离开了

同学还没有走光,但是门却先关了,这样显然是不对的,那么使用了CountDownLatch的话代码是怎样的呢?

public class CountDownLatchDemo {
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName()+"同学离开了");
                countDownLatch.countDown();
            }).start();
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+"要关门了,此时教室已经没人了~");
    }
}

运行结果如下:

Thread-0同学离开了
Thread-2同学离开了
Thread-1同学离开了
Thread-3同学离开了
Thread-5同学离开了
Thread-4同学离开了
main要关门了,此时教室已经没人了~

等到同学们全部走完之后,才开始关门,这样才是正确的!
从如下代码中:

public class CountDownLatchDemo2 {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch=new CountDownLatch(3);
        new Thread(() -> {
            System.out.println(""+Thread.currentThread().getName()+"-执行中");
            countDownLatch.countDown();
            System.out.println(""+Thread.currentThread().getName()+"-执行完毕");
        },"t1").start();
        new Thread(()->{
            System.out.println(""+Thread.currentThread().getName()+"-执行中");
            countDownLatch.countDown();
            System.out.println(""+Thread.currentThread().getName()+"-执行完毕");
        },"t2").start();
        new Thread(()->{
            System.out.println(""+Thread.currentThread().getName()+"-执行中");
            countDownLatch.countDown();
            System.out.println(""+Thread.currentThread().getName()+"-执行完毕");
        },"t3").start();
        countDownLatch.await();
        System.out.println("所有线程执行完毕");
    }
}

可以看出有点类似 join 的功能,但是比 join 更加灵活。CountDownLatch 构造函数会接收一个 int 类型的参数作为计数器的初始值,当调用 CountDownLatch 的countDown 方法时,这个计数器就会减一。通过 await 方法去阻塞去阻塞主流程。

Java并发编程之并发工具类CountDownLatch,CyclicBarrier,Semaphore详解插图
流程图

2.2 CountDownLatch源码分析

2.2.1 CountDownLatch类图

Java并发编程之并发工具类CountDownLatch,CyclicBarrier,Semaphore详解插图1
CountDownLatch继承图

对于 CountDownLatch,我们仅仅需要关心两个方法,一个是 countDown() 方法,另一个是 await() 方法。countDown() 方法每次调用都会将 state 减 1,直到state 的值为 0;而 await 是一个阻塞方法,当 state 减 为 0 的时候,await 方法才会返回。await 可以被多个线程调用,大家在这个时候脑子里要有个图:所有调用了await 方法的线程阻塞在 AQS 的阻塞队列中,等待条件满(state == 0),将线程从队列中一个个唤醒过来。

2.2.2 acquireSharedInterruptibly

countdownlatch 也用到了 AQS,在 CountDownLatch 内部写了一个 Sync 并且继承了 AQS 这个抽象类重写了 AQS中的共享锁方法。首先看到下面这个代码,这块代码主要是判断当前线程是否获取到了共享锁 ; ( 在CountDownLatch 中 , 使 用 的 是 共 享 锁 机 制 ,因为CountDownLatch 并不需要实现互斥的特性)

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //state 如果不等于 0,说明当前线程需要加入到共享锁队列中
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

2.2.3 doAcquireSharedInterruptibly

  1. addWaiter 设置为 shared 模式
  2. tryAcquire 和 tryAcquireShared 的返回值不同,因此会多出一个判断过程
  3. 在 判 断 前 驱 节 点 是 头 节 点 后 , 调 用 了setHeadAndPropagate 方法,而不是简单的更新一下头节点。
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        //创建一个共享模式的节点添加到队列中
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    // 就判断尝试获取锁
                    int r = tryAcquireShared(arg);
                    //r>=0 表示获取到了执行权限,这个时候因为 state!=0,所以不会执行这段代码
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //阻塞线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

2.2.4 图解分析

加入这个时候有 3 个线程调用了 await 方法,由于这个时候 state 的值还不为 0,所以这三个线程都会加入到 AQS队列中。并且三个线程都处于阻塞状态。

Java并发编程之并发工具类CountDownLatch,CyclicBarrier,Semaphore详解插图2
图解分析

2.2.5 CountDownLatch.countDown

由于线程被 await 方法阻塞了,所以只有等到countdown 方法使得 state=0 的时候才会被唤醒,我们来看看 countdown 做了什么

  1. 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true, 否则只是简单的 state = state - 1
  2. 如果 state=0, 则调用 doReleaseShared 唤醒处于 await 状态下的线程
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

CountDownLatch中的tryReleaseShared

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

AQS.doReleaseShared
共享锁的释放和独占锁的释放有一定的差别,前面唤醒锁的逻辑和独占锁是一样,先判断头结点是不是SIGNAL 状态,如果是,则修改为 0,并且唤醒头结点的下一个节点。
PROPAGATE: 标识为 PROPAGATE 状态的节点,是共享锁模式下的节点状态,处于这个状态下的节点,会对线程的唤醒进行传播。

    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                // 这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环
            // 通过检查头节点是否改变了,如果改变了就继续循环
            if (h == head)                   // loop if head changed
                break;
        }
    }

h == head:说明头节点还没有被刚刚用unparkSuccessor 唤醒的线程(这里可以理解为ThreadB)占有,此时 break 退出循环。
h != head:头节点被刚刚唤醒的线程(这里可以理解为ThreadB)占有,那么这里重新进入下一轮循环,唤醒下一个节点(这里是 ThreadB )。我们知道,等到ThreadB 被唤醒后,其实是会主动唤醒 ThreadC...
doAcquireSharedInterruptibly
一旦 ThreadA 被唤醒,代码又会继续回到doAcquireSharedInterruptibly 中来执行。如果当前 state满足=0 的条件,则会执行 setHeadAndPropagate 方法

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        //创建一个共享模式的节点添加到队列中
        boolean failed = true;
        try {
            for (;;) {//被唤醒的线程进入下一次循环继续判断
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

setHeadAndPropagate
这个方法的主要作用是把被唤醒的节点,设置成 head 节 点。 然后继续唤醒队列中的其他线程。由于现在队列中有 3 个线程处于阻塞状态,一旦 ThreadA被唤醒,并且设置为 head 之后,会继续唤醒后续的ThreadB

   private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

图解分析

Java并发编程之并发工具类CountDownLatch,CyclicBarrier,Semaphore详解插图3
图解分析

3 同步屏障CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一 组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会 开门,所有被屏障拦截的线程才会继续运行。

3.1 CyclicBarrier使用场景

使用场景:5个工程师一起来公司应聘,招聘方式分为笔试和面试。首先,要等人到齐后,开始笔试;笔试结束之后,再一起参加面试。把5个人看作5个线程,代码如下:
Main类:

public class Main {
    public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(5);
        for (int i = 0; i < 5; i++) {
            new MyThread("线程-" + (i + 1), barrier).start();
        }
    }
}

MyThread类:

public class MyThread extends Thread{

    private final CyclicBarrier barrier;
    private final Random random = new Random();
    public MyThread(String name, CyclicBarrier barrier) {
        super(name);
        this.barrier = barrier;
    }
    @Override public void run() {
        try {
            Thread.sleep(random.nextInt(2000));
            System.out.println(Thread.currentThread().getName() + " - 已经到达公司");
            barrier.await();
            Thread.sleep(random.nextInt(2000));
            System.out.println(Thread.currentThread().getName() + " - 已经笔试结束");
            barrier.await();
            Thread.sleep(random.nextInt(2000));
            System.out.println(Thread.currentThread().getName() + " - 已经面试结束");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        super.run();
    }
}

在整个过程中,有2个同步点:第1个同步点,要等所有应聘者都到达公司,再一起开始笔试;第2个同步点,要等所有应聘者都结束笔试,之后一起进入面试环节。

3.2 CyclicBarrier实现原理

CyclicBarrier基于ReentrantLock+Condition实现。

public class CyclicBarrier { 
    private final ReentrantLock lock = new ReentrantLock(); 
    // 用于线程之间相互唤醒 
    private final Condition trip = lock.newCondition(); 
    // 线程总数 
    private final int parties; 
    private int count; 
    private Generation generation = new Generation(); 
    // ... 
}

下面详细介绍 CyclicBarrier 的实现原理。先看构造方法:

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        // 参与方数量
        this.parties = parties;
        this.count = parties;
        // 当所有线程被唤醒时,执行barrierCommand表示的Runnable。
        this.barrierCommand = barrierAction;
    }

接下来看一下await()方法的实现过程。

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

 private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();
            // 响应中断
            if (Thread.interrupted()) {
                // 唤醒所有阻塞的线程
                breakBarrier();
                throw new InterruptedException();
            }
            // 每个线程调用一次await(),count都要减1
            int index = --count;
            // 当count减到0的时候,此线程唤醒其他所有线程
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

以上几点的说明:

  1. CyclicBarrier是可以被重用的。以上一节的应聘场景为例,来了5个线程,这5个线程互相等待,到齐后一起被唤醒,各自执行接下来的逻辑;然后,这5个线程继续互相等待,到齐后再一起被唤醒。每一轮被称为一个Generation,就是一次同步点。
  2. CyclicBarrier 会响应中断。5 个线程没有到齐,如果有线程收到了中断信号,所有阻塞的线程也会被唤醒,就是上面的breakBarrier()方法。然后count被重置为初始值(parties),重新开始。
  3. 上面的回调方法,barrierAction只会被第5个线程执行1次(在唤醒其他4个线程之前),而不是5个线程每个都执行1次。

3.3 CyclicBarrier与CountDownLatch 区别

CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;

4 控制并发线程数的Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以 保证合理的使用公共资源

4.1 Semaphore的使用场景

public class SemaphoreDemo {
    public static void main(String[] args) {
        Semaphore semaphore=new Semaphore(3);//此时海底捞有3个空桌
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println("第"+Thread.currentThread().getName()+"等待者抢到座位。");
                    //假设每桌客人吃饭时间为3S
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println("第"+Thread.currentThread().getName()+"客人吃完饭离开。");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            },String.valueOf(i)).start();
        }
    }
}

运行结果如下:

第0等待者抢到座位。
第1等待者抢到座位。
第2等待者抢到座位。
第1客人吃完饭离开。
第0客人吃完饭离开。
第2客人吃完饭离开。
第4等待者抢到座位。
第5等待者抢到座位。
第3等待者抢到座位。
第5客人吃完饭离开。
第3客人吃完饭离开。
第4客人吃完饭离开。

4.2 Semaphore源码分析

假设有n个线程来获取Semaphore里面的10份资源(n > 10),n个线程中只有10个线程能获取到,其他线程都会阻塞。直到有线程释放了资源,其他线程才能获取到。

Java并发编程之并发工具类CountDownLatch,CyclicBarrier,Semaphore详解插图4
示例

当初始的资源个数为1的时候,Semaphore退化为排他锁。正因为如此,Semaphone的实现原理和锁十分类似,是基于AQS,有公平和非公平之分。Semaphore相关类的继承体系如下图所示:

Java并发编程之并发工具类CountDownLatch,CyclicBarrier,Semaphore详解插图5
继承体系

创建 Semaphore 实例的时候,需要一个参数 permits,这个基本上可以确定是设置给 AQS 的 state 的,然后每个线程调用 acquire 的时候,执行 state = state - 1,release 的时候执行 state = state + 1,当然,acquire 的时候,如果 state = 0,说明没有资源了,需要等待其他线程 release。
Semaphore 分公平策略和非公平策略

4.2.1 FairSync

   /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

4.2.2 NofairSync

通过对比发现公平和非公平的区别就在于是否多了一个hasQueuedPredecessors 的判断

   /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

后面的代码和 CountDownLatch 的是完全一样,都是基于共享锁的实现。

5 线程间交换数据的Exchanger

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交 换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过 exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也 执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产 出来的数据传递给对方。

5.1 使用场景

Exchanger用于线程之间交换数据,代码如下:

public class ExchangerDemo {
    private static final Random random = new Random();

    public static void main(String[] args) {
        // 建一个多线程共用的exchange对象
        // 把exchange对象传给3个线程对象。每个线程在自己的run方法中调用exchange,把自 己的数据作为参数
        // 传递进去,返回值是另外一个线程调用exchange传进去的参数
        Exchanger<String> exchanger = new Exchanger<>();

        new Thread("线程1") {
            @Override
            public void run() {
                while (true) {
                    try {
                        // 如果没有其他线程调用exchange,线程阻塞,直到有其他线程调 用exchange为止。
                        String otherData = exchanger.exchange("交换数据1");
                        System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
                        Thread.sleep(random.nextInt(2000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();


        new Thread("线程2") {
            @Override
            public void run() {
                while (true) {
                    try {
                        String otherData = exchanger.exchange("交换数据2");
                        System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
                        Thread.sleep(random.nextInt(2000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();

        new Thread("线程3") {
            @Override
            public void run() {
                while (true) {
                    try {
                        String otherData = exchanger.exchange("交换数据3");
                        System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
                        Thread.sleep(random.nextInt(2000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }.start();
    }
}

在上面的例子中,3个线程并发地调用exchange(...),会两两交互数据,如1/2、1/3和2/3。

5.2 Exchanger实现原理

Exchanger的核心机制和Lock一样,也是CAS+park/unpark。

5.2.1 Exchanger内部代码

首先,在Exchanger内部,有两个内部类:Participant和Node,代码如下:

public class Exchanger<V> {
    //...
    // 添加了Contended注解,表示伪共享与缓存行填充
    @sun.misc.Contended static final class Node {
        int index;              // Arena index
        int bound;              // Last recorded value of Exchanger.bound
        int collides;           // 本次绑定中,CAS操作失败次数
        int hash;               // 自旋伪随机
        Object item;            // 本线程要交换的数据
        volatile Object match;  //  对方线程交换来的数据
        // 当前线程
        volatile Thread parked; // 当前线程阻塞的时候设置该属性,不阻塞为null。
    }

    /** The corresponding thread local class */
    static final class Participant extends ThreadLocal<java.util.concurrent.Exchanger.Node> {
        public java.util.concurrent.Exchanger.Node initialValue() { return new java.util.concurrent.Exchanger.Node(); }
    }
    //...
}

每个线程在调用exchange(...)方法交换数据的时候,会先创建一个Node对象。
这个Node对象就是对该线程的包装,里面包含了3个重要字段:第一个是该线程要交互的数据,第二个是对方线程交换来的数据,最后一个是该线程自身。
一个Node只能支持2个线程之间交换数据,要实现多个线程并行地交换数据,需要多个Node,因此在Exchanger里面定义了Node数组:

    /**
     * Elimination array; null until enabled (within slotExchange).
     * Element accesses use emulation of volatile gets and CAS.
     */
    private volatile Node[] arena;

5.2.2 exchange(V x)实现分析

明白了大致思路,下面来看exchange(V x)方法的详细实现:

    @SuppressWarnings("unchecked")
    public V exchange(V x) throws InterruptedException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x; // translate null args
        if ((arena != null ||
             (v = slotExchange(item, false, 0L)) == null) &&
            ((Thread.interrupted() || // disambiguates null return
              (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }

上面方法中,如果arena不是null,表示启用了arena方式交换数据。如果arena不是null,并且线程被中断,则抛异常
如果arena不是null,并且arenaExchange的返回值为null,则抛异常。对方线程交换来的null值是封装为NULL_ITEM对象的,而不是null。
如果slotExchange的返回值是null,并且线程被中断,则抛异常。
如果slotExchange的返回值是null,并且areaExchange的返回值是null,则抛异常。

slotExchange的实现:

    /**
     * Exchange function used until arenas enabled. See above for explanation.
     * 如果不启用arenas,则使用该方法进行线程间数据交换。
     * @param item 需要交换的数据
     * @param timed 是否是计时等待,true表示是计时等待
     * @param ns  如果是计时等待,该值表示最大等待的时长。
     * @return 对方线程交换来的数据;如果等待超时或线程中断,或者启用了arena,则返回 null。
     */
    private final Object slotExchange(Object item, boolean timed, long ns) {
        // participant在初始化的时候设置初始值为new Node()
        // 获取本线程要交换的数据节点
        Node p = participant.get();
        // 获取当前线程
        Thread t = Thread.currentThread();
        // 如果线程被中断,则返回null。
        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
            return null;

        for (Node q;;) {
            // 如果slot非空,表明有其他线程在等待该线程交换数据
            if ((q = slot) != null) {
                // CAS操作,将当前线程的slot由slot设置为null
                // 如果操作成功,则执行if中的语句
                if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    // 获取对方线程交换来的数据
                    Object v = q.item;
                    // 设置要交换的数据
                    q.match = item;
                    // 获取q中阻塞的线程对象
                    Thread w = q.parked;
                    if (w != null)
                        // 如果对方阻塞的线程非空,则唤醒阻塞的线程
                        U.unpark(w);
                    return v;
                }
                // create arena on contention, but continue until slot null
                // 创建arena用于处理多个线程需要交换数据的场合,防止slot冲突
                if (NCPU > 1 && bound == 0 &&
                    U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            // 如果arena不是null,需要调用者调用arenaExchange方法接着获取对方线程交 换来的数据
            else if (arena != null)
                return null; // caller must reroute to arenaExchange
            else {
                // 如果slot为null,表示对方没有线程等待该线程交换数据
                // 设置要交换的本方数据
                p.item = item;
                // 设置当前线程要交换的数据到slot
                // CAS操作,如果设置失败,则进入下一轮for循环
                if (U.compareAndSwapObject(this, SLOT, null, p))
                    break;
                p.item = null;
            }
        }

        // await release
        // 没有对方线程等待交换数据,将当前线程要交换的数据放到slot中,是一个Node对象
        // 然后阻塞,等待唤醒
        int h = p.hash;
        // 如果是计时等待交换,则计算超时时间;否则设置为0。
        long end = timed ? System.nanoTime() + ns : 0L;
        // 如果CPU核心数大于1,则使用SPINS数,自旋;否则为1,没必要自旋。
        int spins = (NCPU > 1) ? SPINS : 1;
        // 记录对方线程交换来的数据
        Object v;
        // 如果p.match==null,表示还没有线程交换来数据
        while ((v = p.match) == null) {
            // 如果自旋次数大于0,计算hash随机数
            if (spins > 0) {
                // 生成随机数,用于自旋次数控制
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    Thread.yield();
            }
            // p是ThreadLocal记录的当前线程的Node。
            // 如果slot不是p表示slot是别的线程放进去的
            else if (slot != p)
                spins = SPINS;
            else if (!t.isInterrupted() && arena == null &&
                     (!timed || (ns = end - System.nanoTime()) > 0L)) {
                U.putObject(t, BLOCKER, this);
                p.parked = t;
                if (slot == p)
                    U.park(false, ns);
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            }
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                // 没有被中断但是超时了,返回TIMED_OUT,否则返回null
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        // match设置为null值 CAS
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        // 返回获取的对方线程交换来的数据
        return v;
    }

arenaExchange的实现:

   /**
     * Exchange function when arenas enabled. See above for explanation.
     * 当启用arenas的时候,使用该方法进行线程间的数据交换。
     * @param item 本线程要交换的非null数据。
     * @param timed 如果需要计时等待,则设置为true。
     * @param ns 表示计时等待的最大时长。
     * @return 对方线程交换来的数据。如果线程被中断,或者等待超时,则返回null。
     */
    private final Object arenaExchange(Object item, boolean timed, long ns) {
        Node[] a = arena;
        Node p = participant.get();
        // 访问下标为i处的slot数据
        for (int i = p.index;;) {                      // access slot at i
            int b, m, c; long j;                       // j is raw array offset
            Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
            // 如果q不是null,则将数组的第j个元素由q设置为null
            if (q != null && U.compareAndSwapObject(a, j, q, null)) {
                // 获取对方线程交换来的数据
                Object v = q.item;                     // release
                // 设置本方线程交换的数据
                q.match = item;
                Thread w = q.parked;
                if (w != null)
                    // 如果对方线程非空,则唤醒对方线程
                    U.unpark(w);
                return v;
            }
            // 如果自旋次数没达到边界,且q为null
            else if (i <= (m = (b = bound) & MMASK) && q == null) {
                // 提供本方数据
                p.item = item;                         // offer
                // 将arena的第j个元素由null设置为p
                if (U.compareAndSwapObject(a, j, null, p)) {
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    Thread t = Thread.currentThread(); // wait
                    // 自旋等待
                    for (int h = p.hash, spins = SPINS;;) {
                        // 获取对方交换来的数据
                        Object v = p.match;
                        // 如果对方交换来的数据非空
                        if (v != null) {
                            // 将p设置为null,CAS操作
                            U.putOrderedObject(p, MATCH, null);
                            // 清空
                            p.item = null;             // clear for next use
                            p.hash = h;
                            // 返回交换来的数据
                            return v;
                        }
                        // 产生随机数,用于限制自旋次数
                        else if (spins > 0) {
                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            if (h == 0)                // initialize hash
                                h = SPINS | (int)t.getId();
                            else if (h < 0 &&          // approx 50% true
                                     (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield();        // two yields per wait
                        }
                        // 如果arena的第j个元素不是p
                        else if (U.getObjectVolatile(a, j) != p)
                            spins = SPINS;       // releaser hasn't set match yet
                        else if (!t.isInterrupted() && m == 0 &&
                                 (!timed ||
                                  (ns = end - System.nanoTime()) > 0L)) {
                            U.putObject(t, BLOCKER, this); // emulate LockSupport
                            p.parked = t;              // minimize window
                            if (U.getObjectVolatile(a, j) == p)
                                U.park(false, ns);
                            p.parked = null;
                            U.putObject(t, BLOCKER, null);
                        }
                        // arena的第j个元素是p并且CAS设置arena的第j个元素由p设置 为null成功
                        else if (U.getObjectVolatile(a, j) == p &&
                                 U.compareAndSwapObject(a, j, p, null)) {
                            if (m != 0)                // try to shrink
                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                            p.item = null;
                            p.hash = h;
                            i = p.index >>>= 1;        // descend
                            // 如果线程被中断,则返回null值
                            if (Thread.interrupted())
                                return null;
                            if (timed && m == 0 && ns <= 0L)
                                // 如果超时,返回TIMED_OUT。
                                return TIMED_OUT;
                            break;                     // expired; restart
                        }
                    }
                }
                else
                    p.item = null;                     // clear offer
            }
            else {
                if (p.bound != b) {                    // stale; reset
                    p.bound = b;
                    p.collides = 0;
                    i = (i != m || m == 0) ? m : m - 1;
                }
                else if ((c = p.collides) < m || m == FULL ||
                         !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                    p.collides = c + 1;
                    i = (i == 0) ? m : i - 1;          // cyclically traverse
                }
                else
                    i = m + 1;                         // grow
                p.index = i;
            }
        }
    }

赞(0) 打赏
未经允许不得转载:IDEA激活码 » Java并发编程之并发工具类CountDownLatch,CyclicBarrier,Semaphore详解

一个分享Java & Python知识的社区