程序员社区

Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等

前言

JUC 高并发容器是基于非阻塞算法(或者无锁编程算法)实现的容器类,无锁编程(Lock Free)算法主要通过 CAS(Compare And Swap)+volatile 组合实现,通过 CAS 保障操作的原子性,通过volatile 保障变量的内存的可见性。无锁编程(Lock Free)算法的主要优点:
(1)开销较小:不需要在内核态和用户态之间切换进程。
(2)读写不互斥:只有写操作需要使用基于 CAS 机制的乐观锁,读读操作之间可以不用互斥。

1.高并发容器分类

JUC 包中提供了 List、Set、Queue、Map 各种类型的高并发容器,如 ConcurrentHashMap、ConcurrentSkipListMap、ConcurrentSkipListSet、CopyOnWriteArrayList 和 CopyOnWriteArraySet。
在性能上,ConcurrentHashMap 通常优于同步的 HashMap,ConcurrentSkipListMap 通常优于同步
的 TreeMap。当读取和遍历操作远远大于列表的更新操作时,CopyOnWriteArrayList 优于同步的ArrayList。

1.1 List

JUC 包中高并发 List 主要有 CopyOnWriteArrayList,对应的基础容器为 ArrayList。
CopyOnWriteArrayList 相当于线程安全的 ArrayList,它实现了 List 接口。在读多写少的场景中,其性能远远高于 ArrayList 的同步包装容器。

1.2 Set

JUC 包中 Set 主要有 CopyOnWriteArraySet、ConcurrentSkipListSet。

  • CopyOnWriteArraySet 继承于 AbstractSet 类,对应的基础容器为 HashSet。其内部组合了一个 CopyOnWriteArrayList 对象,它是核心操作是基于 CopyOnWriteArrayList 实现的。
  • ConcurrentSkipListSet 是线程安全的有序的集合,对应的基础容器为 TreeSet。它继承于
    AbstractSet,并实现了NavigableSet接口。ConcurrentSkipListSet是通过ConcurrentSkipListMap
    实现的。

1.3 Map

JUC 包中 Map 主要有 ConcurrentHashMap、ConcurrentSkipListMap。

  • ConcurrentHashMap 对应的基础容器为 HashMap。JDK7 中 ConcurrentHashMap 采用一种更加细粒度的“分段锁(Segment)”加锁机制,JDK8 中采用 CAS 无锁算法。
  • ConcurrentSkipListMap 对应的基础容器为 TreeMap。其内部的 Skip List(跳表)结构是一种可以代替平衡树的数据结构,默认是按照 Key 值升序的。

1.4 Queue

JUC 包中 Queue 的实现类包括三类:单向队列、双向队列、阻塞队列。

  • ConcurrentLinkedQueue 是一个基于列表实现的单向队列,按照 FIFO(先进先出)原则对元素进行排序。新元素从队列尾部插入,而获取队列元素,则需要从队列头部获取。
  • ConcurrentLinkedDeque 是基于链表的双向队列,但是该队列不允许 null 元素。作为双端队列,ConcurrentLinkedDeque 可以当作“栈”来使用,并且高效地支持并发环境。
    除了提供普通的单向、双向队列,JUC 拓展了 Queue,增加了可阻塞的插入和获取等操作,提供了一组阻塞队列,具体如下:
  • ArrayBlockingQueue:基于数组实现的可阻塞的 FIFO 队列
  • LinkedBlockingQueue:基于链表实现的可阻塞的 FIFO 队列
  • PriorityBlockingQueue:按优先级排序的队列
  • DelayQueue:按照元素的 delay 时间进行排序的队列
  • SynchronousQueue:无缓冲等待队列

2. CopyOnWriteArrayList分析

在很多应用场景中,读操作可能会远远大于写操作。由于读操作根本不会修改原有的数据,因此如果每次读取都进行加锁操作,其实是一种资源浪费。我们应该允许多个线程同时访问 List 的内部数据,毕竟读操作是线程安全的。
写时复制(CopyOnWrite,简称 COW)思想是计算机程序设计领域中的一种优化策略。其核心思想是,如果有多个 Accessor(访问器)访问一个资源(如内存或者是磁盘上的数据存储)时,他们会共同获取相同的指针指向相同的资源,只要有一个(修改器)需要修改该资源,系统会复制一份专用 Private Copy(副本)给该 Mutator,而其他 Accessor 所见到的最初的资源仍然保持不变,修改的过程对其他的 Accessor 都是透明的(transparently)。COW 主要的优点是如果没有修改器(mutator)去修改资源,就不会有副本被创建,因此多个 Accessor 可以共享同一份资源。

2.1 CopyOnWriteArrayList 的使用

在不使用CopyOnWriteArrayList 的情况下代码如下:

public class WithoutCopyOnWriteArrayListTest {

    public static class ConcurrentTarget implements Runnable {
        //并发操作的目标队列
        List<String> targetList = null;
        public ConcurrentTarget(List<String> targetList) {
            this.targetList = targetList;
        }

        @Override
        public void run() {
            Iterator<String> iterator = targetList.iterator();
            //迭代操作
            while (iterator.hasNext()) {
                // 在迭代操作时,进行列表的修改
                String threadName = Thread.currentThread().getName();
                System.out.println("开始往同步队列加入线程名称:" + threadName);
                targetList.add(threadName);
            }
        }

        //测试同步队列:在迭代操作时,进行列表的修改
        public static void main(String[] args) {
            List<String> notSafeList = Arrays.asList("a", "b", "c");
            List<String> synList = Collections.synchronizedList(notSafeList);
            //创建一个执行目标
            ConcurrentTarget synchronizedListListDemo =
                    new ConcurrentTarget(synList);
            //10 个线程并发
            for (int i = 0; i < 10; i++) {
                new Thread(synchronizedListListDemo , "线程" + i).start();
            }
            //主线程等待
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

运行代码会报如下错误:

java.lang.UnsupportedOperationException
    at java.util.AbstractList.add(AbstractList.java:148)
    at java.util.AbstractList.add(AbstractList.java:108)
    at java.util.Collections$SynchronizedCollection.add(Collections.java:2035)
    at com.ymj.study.code10_juc_container.CopyOnWriteArrayListTest$ConcurrentTarget.run(CopyOnWriteArrayListTest.java:33)
    at java.lang.Thread.run(Thread.java:748)

这个时候可使用 CopyOnWriteArrayList 替代 Collections.synchronizedList同步包装实例,具体的代码如下:

public class CopyOnWriteArrayListTest {
    public static class ConcurrentTarget implements Runnable {
        //并发操作的目标队列
        List<String> targetList = null;

        public ConcurrentTarget(List<String> targetList) {
            this.targetList = targetList;
        }

        @Override
        public void run() {
            Iterator<String> iterator = targetList.iterator();
            //迭代操作
            while (iterator.hasNext()) {
                // 在迭代操作时,进行列表的修改
                String threadName = Thread.currentThread().getName();
                System.out.println("开始往同步队列加入线程名称:" + threadName);
                targetList.add(threadName);
            }
        }
    }

    public static void main(String[] args) {
        List<String> notSafeList = Arrays.asList("a", "b", "c");
        //创建一个 CopyOnWriteArrayList 队列
        List<String> copyOnWriteArrayList = new CopyOnWriteArrayList();
        copyOnWriteArrayList.addAll(notSafeList);

        //并发执行目标
        ConcurrentTarget copyOnWriteArrayListDemo =
                new ConcurrentTarget(copyOnWriteArrayList);
        for (int i = 0; i < 10; i++) {
            new Thread(copyOnWriteArrayListDemo, "线程" + i).start();
        }
        //主线程等待
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

运行之后发现UnsupportedOperationException 异常没有了。也就是说,使用CopyOnWriteArrayList 容器,可以在进行元素迭代的同时,又要进行元素添加操作。

2.2 CopyOnWriteArrayList 原理

所谓 CopyOnWrite(写时复制):就是在修改器(mutator)对一块内存进行修改时,不直接在原有内存块上进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后,再将原来的指针(或者引用)指向新的内存,原来的内存被回收。
CopyOnWriteArrayList 是写时复制思想的一种典型实现: 其含有一个指向操作内存的内部指针 array,而可变操作(add、set 等)是在 array 数组的副本上进行的。当元素需要被修改或者增加的时候,并不直接在 array 指向的原有数组上操作,而是首先对 array 进行一次拷贝,将修改的内容写入拷贝副本中。写完之后,再将内部指针 array 指向新的副本,这样就可以确保修改操作不会影响访问器(accessor)的读取操作了。CopyOnWriteArrayList 的原理,如图所示:

Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等插图
CopyOnWriteArrayList 的原理

CopyOnWriteArrayList 核心成员如下:

public class CopyOnWriteArrayList<E>
        implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    private static final long serialVersionUID = 8673264195747942595L;

    /** The lock protecting all mutators */
    /**
     * 对所有的修改器(mutator)方法进行保护,访问器(accessor)方法并不需要保护
     */
    final transient ReentrantLock lock = new ReentrantLock();

    /** The array, accessed only via getArray/setArray. */
    /**
     * 内部对象数组,通过 getArray/setArray 方法去访问
     */
    private transient volatile Object[] array;

    /**
     * Gets the array.  Non-private so as to also be accessible
     * from CopyOnWriteArraySet class.
     */
    /**
     * 获取内部对象数组
     */
    final Object[] getArray() {
        return array;
    }

    /**
     * Sets the array.
     */
    /**
     * 设置内部对象数组
     */
    final void setArray(Object[] a) {
        array = a;
    }
}

2.3 CopyOnWriteArrayList 读取操作

访问器(accessor)的读取操作没有任何同步控制和锁操作,理由就是内部数组 array 不会发生修改,只会被另外一个 array 替换,因此可以保证数据安全。

   /** 操作内存的引用*/
    private transient volatile Object[] array;
    public E get(int index) {
        return get(getArray(), index);
    }
    //获取元素
    @SuppressWarnings("unchecked")
    private E get(Object[] a, int index) {
        return (E) a[index];
    }
    //返回操作内存
    final Object[] getArray() {
        return array;
    }

2.4 CopyOnWriteArrayList 写入操作

CopyOnWriteArrayList 的写入操作 add( )方法在执行的时候加了独占锁以确保只能有一个线程进行写入操作,避免多线程写的时候会 copy 出多个副本。

   /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            // 拷贝新数组
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

从 add 操作可以看出,在每次进行添加操作的时候,CopyOnWriteArrayList 底层都是重新 copy了一份数组,再往新的数组中添加数组,待添加完了,再将新的 array 引用指向新的数组。当 add操作完成后,array 的引用就已经指向另一个存储空间了。
那么既然每次添加元素的时候,都会重新复制一份新的数组,那就带来了一个问题,就是增加了内存的开销,如果容器的写操作比较频繁,那么其开销就比较大。所以,在实际应用的时候,CopyOnWriteArrayList 并不适合做添加操作。但是如果在并发场景下,迭代操作比较频繁,那CopyOnWriteArrayList 是个不错的选择。

2.5 CopyOnWriteArrayList 的迭代器实现

CopyOnWriteArray 有自己的迭代器,该迭代器不会检查修改状态,也无需检查状态。为什么呢?因为被迭代的 array 数组是可以说是只读的,不会有其他线程能够修改它。

   static final class COWIterator<E> implements ListIterator<E> {
        /** Snapshot of the array */
        /**对象数组的快照(snapshot)*/
        private final Object[] snapshot;
        /** Index of element to be returned by subsequent call to next.  */
        private int cursor;

        private COWIterator(Object[] elements, int initialCursor) {
            cursor = initialCursor;
            snapshot = elements;
        }

        public boolean hasNext() {
            return cursor < snapshot.length;
        }

        public boolean hasPrevious() {
            return cursor > 0;
        }

        @SuppressWarnings("unchecked")
        //下一个元素
        public E next() {
            if (! hasNext())
                throw new NoSuchElementException();
            return (E) snapshot[cursor++];
        }

        @SuppressWarnings("unchecked")
        public E previous() {
            if (! hasPrevious())
                throw new NoSuchElementException();
            return (E) snapshot[--cursor];
        }

        public int nextIndex() {
            return cursor;
        }

        public int previousIndex() {
            return cursor-1;
        }

        /**
         * Not supported. Always throws UnsupportedOperationException.
         * @throws UnsupportedOperationException always; {@code remove}
         *         is not supported by this iterator.
         */
        public void remove() {
            throw new UnsupportedOperationException();
        }

        /**
         * Not supported. Always throws UnsupportedOperationException.
         * @throws UnsupportedOperationException always; {@code set}
         *         is not supported by this iterator.
         */
        public void set(E e) {
            throw new UnsupportedOperationException();
        }

        /**
         * Not supported. Always throws UnsupportedOperationException.
         * @throws UnsupportedOperationException always; {@code add}
         *         is not supported by this iterator.
         */
        public void add(E e) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void forEachRemaining(Consumer<? super E> action) {
            Objects.requireNonNull(action);
            Object[] elements = snapshot;
            final int size = elements.length;
            for (int i = cursor; i < size; i++) {
                @SuppressWarnings("unchecked") E e = (E) elements[i];
                action.accept(e);
            }
            cursor = size;
        }
    }

迭代器的 snapshot(快照)成员,会在构造迭代器的时候,使用 CopyOnWriteArrayList 的 array成员去初始化,具体如下:

//获取迭代器
 public Iterator<E> iterator() {
 return new COWIterator<E>(getArray(), 0);
 }

 //返回操作内存
 final Object[] getArray() {
      return array;
 }

2.6 CopyOnWriteArrayList总结

CopyOnWriteArrayList 的优点
CopyOnWriteArrayList 有一个显著的优点,那就是读取、遍历操作不需要同步,速度会非常快。所以,CopyOnWriteArrayList 适用于读操作多、写操作相对较少的场景("读多写少"),比如可以在进行“黑名单”拦截时使用 CopyOnWriteArrayList。
CopyOnWriteArrayList 和 ReentrantReadWriteLock 的比较
CopyOnWriteArrayList 和 ReentrantReadWriteLock 读写锁的思想非常类似,读写锁的思想是读读共享、写写互斥、读写互斥、写读互斥。但是 CopyOnWriteArrayList 相比读写锁的又更进一步:为了将读取的性能发挥到极致,CopyOnWriteArrayList 读取是完全不用加锁的,而且写入也不会阻塞读取操作,只有写入和写入之间需要进行同步等待,读操作的性能得到大幅度提升。

3 BlockingQueue分析

在 Java8 中,提供了 7 个阻塞队列

阻塞队列 介绍
ArrayBlockingQueue 数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则对元素进行排序。
LinkedBlockingQueue 链表实现的有界阻塞队列, 此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序
PriorityBlockingQueue 支持优先级排序的无界阻塞队列, 默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序
DelayQueue 优先级队列实现的无界阻塞队列
SynchronousQueue 不存储元素的阻塞队列, 每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。
LinkedTransferQueue 链表实现的无界阻塞队列
LinkedBlockingDeque 链表实现的双向阻塞队列

3.1 阻塞队列的操作方法

在阻塞队列中,提供了四种处理方式:
1. 插入操作

  • add(e) :添加元素到队列中,如果队列满了,继续插入元素会报错,IllegalStateException。
  • offer(e): 添加元素到队列,同时会返回元素是否插入成功的状态,如果成功则返回 true
  • put(e) :当阻塞队列满了以后,生产者继续通过 put添加元素,队列会一直阻塞生产者线程,知道队列可用
  • offer(e,time,unit) :当阻塞队列满了以后继续添加元素,生产者线程会被阻塞指定时间,如果超时,则线程直接退出
    2. 移除操作
  • remove():当队列为空时,调用 remove 会返回 false,如果元素移除成功,则返回 true
  • poll(): 当队列中存在元素,则从队列中取出一个元素,如果队列为空,则直接返回 null
  • take():基于阻塞的方式获取队列中的元素,如果队列为空,则 take 方法会一直阻塞,直到队列中有新的数据可以消费
  • poll(time,unit):带超时机制的获取数据,如果队列为空,则会等待指定的时间再去获取元素返回

3.2 ArrayBlockingQueue 原理分析

3.2.1 构造方法

ArrayBlockingQueue 提供了三个构造方法,分别如下:
capacity: 表示数组的长度,也就是队列的长度。
fair:表示是否为公平的阻塞队列,默认情况下构造的是非公平的阻塞队列。

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

  public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        //重入锁,出队和入队持有这一把锁
        lock = new ReentrantLock(fair);
        //初始化非空等待队列
        notEmpty = lock.newCondition();
        //初始化非满等待队列
        notFull =  lock.newCondition();
    }

   public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

3.2.2 Add方法

以 add 方法作为入口,在 add 方法中会调用父类的 add 方法,也就是 AbstractQueue.如果看源码看得比较多的话,一般这种写法都是调用父类的模版方法来解决通用性问题

public boolean add(E e) {
 return super.add(e);
}
   // 从父类的 add 方法可以看到,这里做了一个队列是否满了的判断,如果队列满了直接抛出一个异常
    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
3.2.2.1 offer 方法

add 方法最终还是调用 offer 方法来添加数据,返回一个添加成功或者失败的布尔值反馈。
这段代码做了几个事情:

  1. 判断添加的数据是否为空
  2. 添加重入锁
  3. 判断队列长度,如果队列长度等于数组长度,表示满了直接返回 false
  4. 否则,直接调用 enqueue 将元素添加到队列中
    public boolean offer(E e) {
        //对请求数据做判断
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }
3.2.2.2 enqueue方法

这个是最核心的逻辑,方法内部通过 putIndex 索引直接将元素添加到数组 items

   /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        //通过 putIndex 对数据赋值
        items[putIndex] = x;
        // 当putIndex 等于数组长度时,将 putIndex 重置为 0
        if (++putIndex == items.length)
            putIndex = 0;
        count++; //记录队列元素的个数
        //唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在消费者线程阻塞,就可以开始取出元素
        notEmpty.signal();
    }

putIndex 为什么会在等于数组长度的时候重新设置为 0?
因为 ArrayBlockingQueue 是一个 FIFO 的队列,队列添加元素时,是从队尾获取 putIndex 来存储元素,当 putIndex等于数组长度时,下次就需要从数组头部开始添加了。
下面这个图模拟了添加到不同长度的元素时,putIndex 的变化,当 putIndex 等于数组长度时,不可能让 putIndex 继续累加,否则会超出数组初始化的容量大小。同时还需要思考两个问题:

  1. 当元素满了以后是无法继续添加的,因为会报错
  2. 其次,队列中的元素肯定会有一个消费者线程通过 take或者其他方法来获取数据,而获取数据的同时元素也会从队列中移除

    Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等插图1
    image.png

3.2.3 put方法

put 方法和 add 方法功能一样,差异是 put 方法如果队列满了,会阻塞。这个在最开始的时候说过。接下来看一下
它的实现逻辑:

   public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        /**
         * 这个也是获得锁,但是和 lock 的区别是,这个方法优先允许在等待时由其他线程调
         *  用等待线程的 interrupt 方法来中断等待直接返回。而 lock
         *  方法是尝试获得锁成功后才响应中断
         */
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                //队列满了的情况下,当前线程将会被 notFull 条件对象挂起加到等待队列中
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等插图2
put

3.2.4 take方法

take 方法是一种阻塞获取队列中元素的方法它的实现原理很简单,有就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入 notEmpty条件队列等待(有数据就直接取走,方法结束),如果有新的put 线程添加了数据,那么 put 操作将会唤醒 take 线程,执行 take 操作。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                //如果队列为空的情况下,直接通过 await 方法阻塞
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等插图3
take

如果队列中添加了元素,那么这个时候,会在 enqueue 中调用 notempty.signal 唤醒 take 线程来获得元素

Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等插图4
image.png
3.2.4.1 dequeue 方法

这个是出队列的方法,主要是删除队列头部的元素并发返回给客户端,takeIndex,是用来记录拿数据的索引值

  /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        //默认获取 0 位置的元素
        E x = (E) items[takeIndex];
        //将该位置的元素设置为空
        items[takeIndex] = null;
        //这里的作用也是一样,如果拿到数组的最大值,那么重置为 0,继续从头部位置开始获取数据
        if (++takeIndex == items.length)
            takeIndex = 0;
        //记录 元素个数递减
        count--;
        if (itrs != null)
            //同时更新迭代器中的元素数据
            itrs.elementDequeued();
        //触发 因为队列满了以后导致的被阻塞的线程
        notFull.signal();
        return x;
    }

3.2.4.2 itrs.elementDequeued();

ArrayBlockingQueue 中,实现了迭代器的功能,也就是可以通过迭代器来遍历阻塞队列中的元素

      /**
         * Called whenever an element has been dequeued (at takeIndex).
         */
        void elementDequeued() {
            // assert lock.getHoldCount() == 1;
            if (count == 0)
                queueIsEmpty();
            else if (takeIndex == 0)
                takeIndexWrapped();
        }
    }

itrs.elementDequeued() 是用来更新迭代器中的元素数据的

3.2.4 remove方法

remove 方法是移除一个指定元素。看看它的实现代码

   public boolean remove(Object o) {
        if (o == null) return false;
        //获取数组元素
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        //获得锁
        lock.lock();
        try {
            //如果队列不为空
            if (count > 0) {
                //获取下一个要添加元素时的索引
                final int putIndex = this.putIndex;
                //获取当前要被移除的元素的索引
                int i = takeIndex;
                do {
                    //从takeIndex 下标开始,找到要被删除的元素
                    if (o.equals(items[i])) {
                        //移除指定元素
                        removeAt(i);
                        //返回执行结果
                        return true;
                    }
                    //当前删除索引执行加 1 后判断是否与数组长度相等
                    //若为 true,说明索引已到数组尽头,将 i 设置为 0
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex); //继续查找,直到找到最后一个元素
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

4 BlockingDeque分析

BlockingDeque定义了一个阻塞的双端队列接口,如下所示

public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> { 
    void putFirst(E e) throws InterruptedException; 
    void putLast(E e) throws InterruptedException; 
    E takeFirst() throws InterruptedException; 
    E takeLast() throws InterruptedException; 
    // ... 
}

该接口继承了BlockingQueue接口,同时增加了对应的双端队列操作接口。该接口只有一个实现,就是LinkedBlockingDeque。
其核心数据结构如下所示,是一个双向链表。

public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements
        BlockingDeque<E>, java.io.Serializable {
    static final class Node<E> {
        E item; Node<E> prev; // 双向链表的Node Node<E> next;
        Node(E x) {
            item = x;
        }
    }
    transient Node<E> first; // 队列的头和尾
    transient Node<E> last;
    private transient int count; // 元素个数
    private final int capacity; // 容量
    // 一把锁+两个条件
    final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.netCondition();
    private final Condition notFull = lock.newCondition();
    // ...
}

对应的实现原理,和LinkedBlockingQueue基本一样,只是LinkedBlockingQueue是单向链表,而LinkedBlockingDeque是双向链表。

5 ConcurrentLinkedQueue/Deque

AQS内部的阻塞队列实现原理:基于双向链表,通过对head/tail进行CAS操作,实现入队和出队。ConcurrentLinkedQueue 的实现原理和AQS 内部的阻塞队列类似:同样是基于 CAS,同样是通过head/tail指针记录队列头部和尾部,但还是有稍许差别。
首先,它是一个单向链表,定义如下:

public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements
        Queue<E>, java.io.Serializable {
    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
        //...
    }
    private transient volatile Node<E> head;
    private transient volatile Node<E> tail;
    //...
}

其次,在AQS的阻塞队列中,每次入队后,tail一定后移一个位置;每次出队,head一定后移一个位置,以保证head指向队列头部,tail指向链表尾部。但在ConcurrentLinkedQueue中,head/tail的更新可能落后于节点的入队和出队,因为它不是直接对 head/tail指针进行 CAS操作的,而是对 Node中的 item进行操作。下面进行详细分析:

5.1 初始化

初始的时候, head 和 tail 都指向一个 null 节点。对应的代码如下。

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}
Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等插图5
image.png

5.2 入队列

代码如下所示:

   public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                // 对tail的next指针而不是对tail指针执行CAS操作
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        // 每入队两个节点后移一次tail指针
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                // 已经到达队列尾部
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                // 后移p指针
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

上面的入队其实是每次在队尾追加2个节点时,才移动一次tail节点。如下图所示:
初始的时候,队列中有1个节点item1,tail指向该节点,假设线程1要入队item2节点:
step1: p=tail,q=p.next=NULL.
step2:对p的next执行CAS操作,追加item2,成功之后,p=tail。所以上面的casTail方法不会执
行,直接返回。此时tail指针没有变化。

Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等插图6
image.png

之后,假设线程2要入队item3节点,如下图所示:
step3: p=tail,q=p.next.
step4:q!=NULL,因此不会入队新节点。p,q都后移1位。
step5:q=NULL,对p的next执行CAS操作,入队item3节点。
step6:p!=t,满足条件,执行上面的casTail操作,tail后移2个位置,到达队列尾部。

Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等插图7
image.png

总结出以下关键点:

  1. 即使tail指针没有移动,只要对p的next指针成功进行CAS操作,就算成功入队列。
  2. 只有当 p != tail的时候,才会后移tail指针。也就是说,每连续追加2个节点,才后移1次tail指针。即使CAS失败也没关系,可以由下1个线程来移动tail指针。

5.3 出队列

上面说了入队列之后,tail指针不变化,那是否会出现入队列之后,要出队列却没有元素可出的情况呢?

 public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                // 出队列的时候,并没有移动head指针,而是把item设置为null
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        // 每产生2个null节点,才把head指针后移两位
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

出队列的代码和入队列类似,也有p、q2个指针,整个变化过程如图5-8所示。假设初始的时候head指向空节点,队列中有item1、item2、item3 三个节点。
step1:p=head,q=p.next.p!=q.
step2:后移p指针,使得p=q。
step3:出队列。关键点:此处并没有直接删除item1节点,只是把该节点的item通过CAS操作置为了NULL。
step4:p!=head,此时队列中有了2个 NULL 节点,再前移1次head指针,对其执行updateHead操作。

Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等插图8
image.png

总结:

  1. 出队列的判断并非观察 tail 指针的位置,而是依赖于 head 指针后续的节点是否为NULL这一条件。
  2. 只要对节点的item执行CAS操作,置为NULL成功,则出队列成功。即使head指针没有成功移动,也可以由下1个线程继续完成。

5.4 队列判空

因为head/tail 并不是精确地指向队列头部和尾部,所以不能简单地通过比较 head/tail 指针来判断队列是否为空,而是需要从head指针开始遍历,找第1个不为NULL的节点。如果找到,则队列不为空;如果找不到,则队列为空。代码如下所示:

   public boolean isEmpty() {
        // 寻找第一个不是null的节点
        return first() == null;
    }
    Node<E> first() {
        restartFromHead:
        for (;;) {
            // 从head指针开始遍历,查找第一个不是null的节点
            for (Node<E> h = head, p = h, q;;) {
                boolean hasItem = (p.item != null);
                if (hasItem || (q = p.next) == null) {
                    updateHead(h, p);
                    return hasItem ? p : null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

6. ConcurrentHashMap解析

Java并发编程之并发容器ConcurrentHashMap详解

7. ConcurrentSkipListMap/Set

ConcurrentHashMap 是一种 key 无序的 HashMap,ConcurrentSkipListMap则是 key 有序的,实现了NavigableMap接口,此接口又继承了SortedMap接口。

7.1 ConcurrentSkipListMap

7.1.1 为什么要使用SkipList实现Map?

在Java的util包中,有一个非线程安全的HashMap,也就是TreeMap,是key有序的,基于红黑树实现。
而在Concurrent包中,提供的key有序的HashMap,也就是ConcurrentSkipListMap,是基于SkipList(跳查表)来实现的。这里为什么不用红黑树,而用跳查表来实现呢?
借用Doug Lea的原话:

The reason is that there are no known efficient lock0free insertion and deletion algorithms for search trees.

也就是目前计算机领域还未找到一种高效的、作用在树上的、无锁的、增加和删除节点的办法。
那为什么SkipList可以无锁地实现节点的增加、删除呢?这要从无锁链表的实现说起。

7.1.2 无锁链表

之前讲的无锁队列、栈,都是只在队头、队尾进行CAS操作,通常不会有问题。如果在链表的中间进行插入或删除操作,按照通常的CAS做法,就会出现问题!
操作1:在节点10后面插入节点20。如下图所示,首先把节点20的next指针指向节点30,然后对节点10的next指针执行CAS操作,使其指向节点20即可。

Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等插图9
操作1

操作2:删除节点10。如下图所示,只需把头节点的next指针,进行CAS操作到节点30即可。

Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等插图10
操作2

但是,如果两个线程同时操作,一个删除节点10,一个要在节点10后面插入节点20。并且这两个操作都各自是CAS的,此时就会出现问题。如下图所示,删除节点10,会同时把新插入的节点20也删除掉!这个问题超出了CAS的解决范围。

Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等插图11
image.png

为什么会出现这个问题呢
原因: 在删除节点10的时候,实际受到操作的是节点10的前驱,也就是头节点。节点10本身没有任何变化。故而,再往节点10后插入节点20的线程,并不知道节点10已经被删除了!
针对这个问题,在论文中提出了如下的解决办法,如下图所示,把节点 10 的删除分为两2步:
第一步,把节点10的next指针,mark成删除,即软删除;
第二步,找机会,物理删除。
做标记之后,当线程再往节点10后面插入节点20的时候,便可以先进行判断,节点10是否已经被删除,从而避免在一个删除的节点10后面插入节点20。这个解决方法有一个关键点:“把节点10的next指针指向节点20(插入操作)”和“判断节点10本身是否已经删除(判断操作)”,必须是原子的,必须在1 个CAS操作里面完成!

Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等插图12
image.png

具体的实现有两个办法:
办法一:AtomicMarkableReference
保证每个 next 是 AtomicMarkableReference 类型。但这个办法不够高效,Doug Lea 在ConcurrentSkipListMap的实现中用了另一种办法。
办法2:Mark节点
我们的目的是标记节点10已经删除,也就是标记它的next字段。那么可以新造一个marker节点,使节点10的next指针指向该Marker节点。这样,当向节点10的后面插入节点20的时候,就可以在插入的同时判断节点10的next指针是否指向了一个Marker节点,这两个操作可以在一个CAS操作里面完成。

7.1.3 跳查表

解决了无锁链表的插入或删除问题,也就解决了跳查表的一个关键问题。因为跳查表就是多层链表叠起来的
下面先看一下跳查表的数据结构:

static final class Node<K,V> {
        final K key;
        volatile Object value;
        volatile Node<K, V> next;

        /**
         * Creates a new regular node.
         */
        Node(K key, Object value, Node<K, V> next) {
            this.key = key;
            this.value = value;
            this.next = next;
        }
        //...
    }

上图中的Node就是跳查表底层节点类型。所有的<K, V>对都是由这个单向链表串起来的。
上面的Index层的节点:

   static class Index<K,V> {
        final Node<K, V> node;
        final Index<K, V> down;
        volatile Index<K, V> right;

        /**
         * Creates index node with given values.
         */
        Index(Node<K, V> node, Index<K, V> down, Index<K, V> right) {
            this.node = node;
            this.down = down;
            this.right = right;
        }
        //...
    }

上图中的node属性不存储实际数据,指向Node节点。
down属性:每个Index节点,必须有一个指针,指向其下一个Level对应的节点。
right属性:Index也组成单向链表。
整个ConcurrentSkipListMap就只需要记录顶层的head节点即可:

public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
        implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable {
    // ... 
    private transient Index<K,V> head; 
    // ... 
}
Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等插图13
image.png

下面详细分析如何从跳查表上查找、插入和删除元素。

7.1.4 put实现分析

  private V doPut(K key, V value, boolean onlyIfAbsent) {
        Node<K,V> z;             // added node
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                if (n != null) {
                    Object v; int c;
                    Node<K,V> f = n.next;
                    if (n != b.next)               // inconsistent read
                        break;
                    if ((v = n.value) == null) {   // n is deleted
                        n.helpDelete(b, f);
                        break;
                    }
                    if (b.value == null || v == n) // b is deleted
                        break;
                    if ((c = cpr(cmp, key, n.key)) > 0) {
                        b = n;
                        n = f;
                        continue;
                    }
                    if (c == 0) {
                        if (onlyIfAbsent || n.casValue(v, value)) {
                            @SuppressWarnings("unchecked") V vv = (V)v;
                            return vv;
                        }
                        break; // restart if lost race to replace value
                    }
                    // else c < 0; fall through
                }

                z = new Node<K,V>(key, value, n);
                if (!b.casNext(n, z))
                    break;         // restart if lost race to append to b
                break outer;
            }
        }

        int rnd = ThreadLocalRandom.nextSecondarySeed();
        if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
            int level = 1, max;
            while (((rnd >>>= 1) & 1) != 0)
                ++level;
            Index<K,V> idx = null;
            HeadIndex<K,V> h = head;
            if (level <= (max = h.level)) {
                for (int i = 1; i <= level; ++i)
                    idx = new Index<K,V>(z, idx, null);
            }
            else { // try to grow by one level
                level = max + 1; // hold in array and later pick the one to use
                @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                    (Index<K,V>[])new Index<?,?>[level+1];
                for (int i = 1; i <= level; ++i)
                    idxs[i] = idx = new Index<K,V>(z, idx, null);
                for (;;) {
                    h = head;
                    int oldLevel = h.level;
                    if (level <= oldLevel) // lost race to add level
                        break;
                    HeadIndex<K,V> newh = h;
                    Node<K,V> oldbase = h.node;
                    for (int j = oldLevel+1; j <= level; ++j)
                        newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                    if (casHead(h, newh)) {
                        h = newh;
                        idx = idxs[level = oldLevel];
                        break;
                    }
                }
            }
            // find insertion points and splice in
            splice: for (int insertionLevel = level;;) {
                int j = h.level;
                for (Index<K,V> q = h, r = q.right, t = idx;;) {
                    if (q == null || t == null)
                        break splice;
                    if (r != null) {
                        Node<K,V> n = r.node;
                        // compare before deletion check avoids needing recheck
                        int c = cpr(cmp, key, n.key);
                        if (n.value == null) {
                            if (!q.unlink(r))
                                break;
                            r = q.right;
                            continue;
                        }
                        if (c > 0) {
                            q = r;
                            r = r.right;
                            continue;
                        }
                    }

                    if (j == insertionLevel) {
                        if (!q.link(r, t))
                            break; // restart
                        if (t.node.value == null) {
                            findNode(key);
                            break splice;
                        }
                        if (--insertionLevel == 0)
                            break splice;
                    }

                    if (--j >= insertionLevel && j < level)
                        t = t.down;
                    q = q.down;
                    r = q.right;
                }
            }
        }
        return null;
    }

在底层,节点按照从小到大的顺序排列,上面的index层间隔地串在一起,因为从小到大排列。查找的时候,从顶层index开始,自左往右、自上往下,形成图示的遍历曲线。假设要查找的元素是32,遍历过程如下:
先遍历第2层Index,发现在21的后面;
从21下降到第1层Index,从21往后遍历,发现在21和35之间;
从21下降到底层,从21往后遍历,最终发现在29和35之间。
在整个的查找过程中,范围不断缩小,最终定位到底层的两个元素之间。

Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等插图14
image.png

在put代码中,通过findPredecessor找到了待插入的元素在[b,n]之间之后,并不能马上插入。因为其他线程也在操作这个链表,b、n都有可能被删除,所以在插入之前执行了一系列的检查逻辑,而这也正是无锁链表的复杂之处。

7.1.5 remove实现分析

   // 若找到了(key, value)就删除,并返回value;找不到就返回null
    final V doRemove(Object key, Object value) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                if (n != b.next)                    // inconsistent read
                    break;
                if ((v = n.value) == null) {        // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)      // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) < 0)
                    break outer;
                if (c > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                if (value != null && !value.equals(v))
                    break outer;
                if (!n.casValue(v, null))
                    break;
                if (!n.appendMarker(f) || !b.casNext(n, f))
                    findNode(key);                  // retry via findNode
                else {
                    findPredecessor(key, cmp);      // clean index
                    if (head.right == null)
                        tryReduceLevel();
                }
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
        }
        return null;
    }

上面的删除方法和插入方法的逻辑非常类似,因为无论是插入,还是删除,都要先找到元素的前驱,也就是定位到元素所在的区间[b,n]。在定位之后,执行下面几个步骤:

  1. 如果发现b、n已经被删除了,则执行对应的删除清理逻辑;
  2. 否则,如果没有找到待删除的(k, v),返回null;
  3. 如果找到了待删除的元素,也就是节点n,则把n的value置为null,同时在n的后面加上Marker节点,同时检查是否需要降低Index的层次。

7.1.6 get实现分析

private V doGet(Object key) {
        if (key == null)
            throw new NullPointerException();
        Comparator<? super K> cmp = comparator;
        outer: for (;;) {
            for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
                Object v; int c;
                if (n == null)
                    break outer;
                Node<K,V> f = n.next;
                if (n != b.next)                // inconsistent read
                    break;
                if ((v = n.value) == null) {    // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n)  // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) == 0) {
                    @SuppressWarnings("unchecked") V vv = (V)v;
                    return vv;
                }
                if (c < 0)
                    break outer;
                b = n;
                n = f;
            }
        }
        return null;
    }

无论是插入、删除,还是查找,都有相似的逻辑,都需要先定位到元素位置[b,n],然后判断b、n是否已经被删除,如果是,则需要执行相应的删除清理逻辑。这也正是无锁链表复杂的地方。

7.2 ConcurrentSkipListSet

如下面代码所示,ConcurrentSkipListSet只是对ConcurrentSkipListMap的简单封装。

public class ConcurrentSkipListSet<E>
        extends AbstractSet<E>
        implements NavigableSet<E>, Cloneable, java.io.Serializable {
    // 封装了一个ConcurrentSkipListMap
    private final ConcurrentNavigableMap<E,Object> m;
    public ConcurrentSkipListSet() {
        m = new ConcurrentSkipListMap<E,Object>();
    }
    public boolean add(E e) {
        return m.putIfAbsent(e, Boolean.TRUE) == null;
    }
    // ... 
}

以上内容就是对Java并发编程中并发容器的一些介绍,其中阻塞队列中还有很多并没有一一赘述了。

赞(0) 打赏
未经允许不得转载:IDEA激活码 » Java并发编程之并发容器 CopyOnWrite,ConcurrentSkipListMap/Set,阻塞队列等

一个分享Java & Python知识的社区