性能文章>几张图彻底搞懂几个好玩的并发辅助类和阻塞队列>

几张图彻底搞懂几个好玩的并发辅助类和阻塞队列原创

1年前
274123

去年问我怎么学Java的那个五年级小学生又来向我问问题了,说Java中线程同步有没有好用的工具类。幸亏没有问我什么算法,瑟瑟发抖。这个我倒是还挺在行的,了解到他喜欢熊出没,于是我就用熊作为主角,给他分享了几个Java并发框架中的辅助工具类。(呼~松了一口气)

看文本片文章,你将了解到:

  1. CountDownLatch的工作原理和使用场景;

  2. CyclicBarrier的工作原理和使用场景;

  3. Semaphore的工作原理和使用场景;

  4. 对各种阻塞队列的实现原理都有一定的了解,以及了解他们的使用场景。

注意,如果您没有阅读过之前发的   一文读懂并发包中的读写锁和Condition实现原理 强烈建议您先阅读,有助于理解本文中的内容。

1、闭锁 CountDownLatch

一个同步工具类,允许一个或者多个线程一直等待,直到其他线程的操作都执行完成之后再继续往下执行。

使用场景:在一些应用场合中,需要等待某个条件达到要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作。这个时候就可以使用CountDownLatch。CountDownLatch最重要的方法是countDown()await(),前者主要是计数减一,后者是等待计数到0,如果没有到达0,就继续阻塞等待。

方法详细介绍:CountDownLatch的介绍和使用

为了方便理解,这不,我又发挥了以下我的动画绘制功底,写了一个动图:

countdownlatch

如上图,左边三只小熊,可以当成三个线程,每一只撞到栏杆,计数器就减1,这相当于执行了countDown方法;

右边有两只暴走小熊在等待计数器变为0,可以当成两个线程,执行了await方法;

最终左边三只暴走小熊抵达了栏杆处,计数器变为0,唤醒了右边的暴走小熊,暴走小熊就开始动起来了。

1.1、执行原理

CountDownLatch是基于AQS共享模式的使用。

如下图,我们通过给CountDownLatch构造函数传入state的值。

countDown方法本质是释放共享锁,核心实现逻辑是:state>0 && state-1,如果state>0,则state减一,否则执行失败;

await方法本质是获取共享锁,核心实现是:getState()==0,如果state==0,则表示获取成功,否则线程阻塞进入等待队列;

image-20200317233628586

当state减到0的时候,会唤醒等待队列中的所有线程,尝试继续获取共享锁,这个时候正常是所有线程都能获取成功的。

1.2、使用案例

三个线程共同拉取一块数据,每个线程拉取数据块的一部分,等到所有线程的数据都拉取过来之后,另一个处理线程再开始这个数据块。

下载线程:

 1class Downloader implements Runnable{
2
3    private CountDownLatch latch;
4
5    private String downloaderName;
6
7    /**
8     * 构造函数
9     * @param downLatch 注意, 所有需要协作的线程需要使用同一个闭锁
10     * @param downloaderName
11     */

12    public Downloader(CountDownLatch downLatch, String downloaderName){
13        this.latch = downLatch;
14        this.downloaderName = downloaderName;
15    }
16
17    public void run() {
18        this.download();
19
20        try {
21            TimeUnit.SECONDS.sleep(new Random().nextInt(10));
22        } catch (InterruptedException e) {
23            System.out.println("Thread interrupt status: " + Thread.currentThread().isInterrupted());
24        }
25        System.out.println(this.downloaderName + "下载完成...");
26        this.latch.countDown();
27    }
28
29    private void download(){
30        System.out.println(this.downloaderName + "正在下载文件...");
31    }
32
33}

数据处理线程:

 1class DataProcessor implements Runnable {
2
3    private CountDownLatch latch;
4
5    /**
6     * 构造函数
7     * @param latch 注意, 所有需要协作的线程需要使用同一个闭锁
8     */

9    public DataProcessor(CountDownLatch latch){
10        this.latch = latch;
11    }
12
13    public void run() {
14        System.out.println("等待下载完数据...");
15        try {
16            this.latch.await();
17        } catch (InterruptedException e) {
18            System.out.println("Thread interrupt status: " + Thread.currentThread().isInterrupted());
19        }
20        System.out.println("数据下载完成, 开始处理数据...");
21    }
22
23}

运行代码:

 1ExecutorService executor = Executors.newFixedThreadPool(4);
2
3CountDownLatch latch = new CountDownLatch(3);
4
5Downloader d1 = new Downloader(latch, "下载线程1");
6Downloader d2 = new Downloader(latch, "下载线程2");
7Downloader d3 = new Downloader(latch, "下载线程3");
8
9DataProcessor processor = new DataProcessor(latch);
10executor.execute(d1);
11executor.execute(d2);
12executor.execute(d3);
13executor.execute(processor);
14executor.shutdown();

执行结果:

1等待下载完数据...
2下载线程1正在下载文件...
3下载线程2正在下载文件...
4下载线程3正在下载文件...
5下载线程3下载完成...
6下载线程2下载完成...
7下载线程1下载完成...
8数据下载完成, 开始处理数据...

1.3、其他说明

类似的,我们也可以使用Thread.join方法实现控制线程执行顺序,但是没有那么灵活。如果我们把任务都丢到线程池里面多线程执行,那么就不能手动的在一个线程里面调用另一个线程的join方法了。

2、栅栏 CyclicBarrier

屏障,或成为栅栏,我们在之前 一文带你彻底理解同步和锁的本质(干货) 一文有讨论到过,主要是达到这种目的:所有线程都准备就绪,就着手下一阶段的工作,否则不能进入下一阶段。

我们还是让上面的小熊来演示一下。

cyclicbarrier

上面5只小熊,准备跑到起跑线,跑到起跑线等待,相当于执行了await方法,等到所有小熊准备就绪之后,然后一起开跑。这就很好的揭示了内存屏障的作用了。

2.1、执行原理

CyclicBarrier是基于ReentrantLock的Condition来实现的。

如下图,栅栏中有两个关键属性:

  • parties:栅栏计数器初始值

  • count:栅栏计数器

其中CyclicBarrier的await()方法封装了对ReentrantLock条件锁的使用,主要处理流程:

  • 获取ReentrantLock锁;

  • count减1,如果此时count为0,那么唤醒等待队列中所有线程,并结束这一轮处理,重置屏障,否则进入下一步;

  • 执行condition.await方法,把当前线程丢到条件队列;

  • 当count减少到0的时候,执行condition.signalAll方法把条件队列中的所有线程节点都移动到等待队列;

  • 最后唤醒同步队列中的线程节点,线程从condition.await阻塞处醒来继续执行:获取ReentrantLock锁,用当前线程节点替换旧的头节点,最终放ReentrantLock锁,继续让线程往下执行(每个线程依次获取、锁释放锁)。如下图:

image-20200319235023688

await()能够响应中断。除此之外,await还提供了带有超时的实现await(long timeout, TimeUnit unit),以及reset()方法重新开启下一轮,具体大家可以看源码的实现。

2.2、使用案例

下面的案例模拟了赛跑,只有当所有运动员都在起跑线上准备好了,才允许他们开跑:

 1public class CyclicBarrierTest {
2
3    public static void main(String[] args) {
4        CyclicBarrier barrier = new CyclicBarrier(3);
5        ExecutorService executor = Executors.newFixedThreadPool(3);
6        executor.submit(new Runner(barrier, "1号选手"));
7        executor.submit(new Runner(barrier, "2号选手"));
8        executor.submit(new Runner(barrier, "3号选手"));
9        barrier.reset();
10        executor.submit(new Runner(barrier, "4号选手"));
11        executor.submit(new Runner(barrier, "5号选手"));
12        executor.submit(new Runner(barrier, "6号选手"));
13        executor.shutdown();
14    }
15}
16
17class Runner implements Runnable {
18
19    /**
20     * 一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)
21     */

22    private CyclicBarrier barrier;
23
24    private String name;
25
26    public Runner(CyclicBarrier barrier, String name) {
27        super();
28        this.barrier = barrier;
29        this.name = name;
30    }
31
32    @Override
33    public void run() {
34        try {
35            Thread.sleep(1000 * (new Random()).nextInt(8));
36            System.out.println(name + " 准备好了...");
37            // barrier的await方法,在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
38            barrier.await();
39        } catch (InterruptedException | BrokenBarrierException e) {
40            e.printStackTrace();
41        }
42        System.out.println(name + " 起跑!");
43        for (int i = 0; i < 10; i++) {
44            System.out.println(this.name + "正在跑步" + i);
45        }
46    }
47}

2.3、其他说明

CountDownLatch是若干个线程等待另外n个线程完成某件事之后才能执行;而CyclicBarrier是若干个线程互相等待,只有等到所有线程都执行了await只会,这若干个线程才可以继续往下执行

3、信号量 Semaphore

信号量通过一组许可证来控制对共享资源的访问。

如果需要,可以用acquire()方法获取许可,如果许可为0,那么会进行阻塞,通过使用release()方法释放许可,把许可归还给Semaphore,归还之后,阻塞的线程优惠醒来尝试获取许可。

Semaphore提供给了若干个api对应不同的功能:

  • Semaphore(int permits):非公平模式创建;

  • Semaphore(int permits, boolean fair):可以指定是否公平模式创建;

  • acquire():尝试获取1个许可,如果没有许可则阻塞,可以被中断停止等待;

  • acquire(int permits):跟上一个方法类型,尝试获取permits个许可;

  • acquireUninterruptibly():尝试获取一个许可,不可中断;

  • acquireUninterruptibly(int permits):尝试获取permits个许可,不可中断;

  • tryAcquire():尝试获取一个许可,获取不到则直接返回失败;

  • tryAcquire(int permits):尝试获取permits个许可,获取不到则直接返回失败;

  • tryAcquire(int permits, long timeout, TimeUnit unit):尝试在timeout时间内获取permits个许可,超时则返回false,可被中断;

  • tryAcquire(long timeout, TimeUnit unit):尝试在timeout时间内获取1个许可,超时则返回false,可被中断;

  • release():释放一个许可;

  • release(int permits):释放n个许可;

下面演示基于公平锁的Semaphore,获取锁使用acquireUninterruptibly()

semaphore

这里设置的许可为2,可以发现,同一时刻最多只能有两个线程获得许可。

3.1、执行原理

Semaphore的执行原理相对来说比较简单。下面描述了可中断非公平的信号量实现原理,ASQ中的state值就相当于许可的数量:

  • 执行acquire的时候,会尝试让state - acquires,如果发现许可足够,则进行cas更新,扣减许可,否则线程进入等待队列;

  • 执行release的时候,state + releases,把许可加回去。

image-20200321181406717

3.2、使用案例

下面演示了使用semaphore实现限流的机制,模拟20个客户端线程尝试执行业务逻辑,同一时刻最多只有5个线程能够并发的执行。

 1// 线程池
2ExecutorService exec = Executors.newCachedThreadPool();
3// 只能5个线程同时访问
4final Semaphore semp = new Semaphore(5);
5// 模拟20个客户端访问
6for (int index = 0; index < 20; index++) {
7  final int NO = index;
8  Runnable run = () -> {
9    try {
10      // 获取许可
11      if(semp.tryAcquire()) {
12        System.out.println("线程获得许可: " + NO);
13        Thread.sleep((long) (Math.random() * 10000));
14        // 访问完后,释放
15        semp.release();
16      } else {
17        System.out.println("达到并发上限,请求失败,请稍后再试");
18      }
19
20    } catch (InterruptedException e) {
21      System.out.println("执行异常");
22    }
23  };
24  exec.execute(run);
25}
26// 退出线程池
27exec.shutdown();

注意,这里使用的是tryAcquire失败之后直接返回,线程不会进入AQS等待队列。

4、阻塞队列 BlockingQueue

4.1、BlockingQueue的基本原理

我们先来解释一下阻塞队列:

image-20200322114947811

如上图

  • 生产线程1往阻塞队列里面添加新的数据,当阻塞队列满的时候(针对有界队列),生产线程1将会处于阻塞状态,直到消费线程2从队列中取走一个数据;

  • 消费线程2从阻塞队列取数据,当阻塞队列空的时候,消费线程2将会处于阻塞状态,直到生产线程把一个数据放进去。

阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。

阻塞队列的常用方法

我们查阅BlockingQueue总结了以下阻塞队列的方法:

注意:

根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。

以上支持阻塞和超时的方法都是能够响应中断的。

4.2、BlockingQueue的实现

作为一个五年级的小学生,到这里就开始听不懂了,暗中嘚瑟,还好学的没那么快。

下图展示了主要的BlockingQueue的实现类:

image-20200323212959261

其实我们在前面的文章:ReentrantLock介绍与使用#2.4、条件变量 章节已经使用ReentrantLock的Condition实现了一个阻塞队列,底层是使用LinkedList进行存储的。

BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。

下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。

5、ArrayBlockingQueue

相信经过上面各个同步类的分析,大家已经对AQS比较熟悉了,下面我将不再画具体的内部结构图了。

ArrayBlockingQueue使用的数据结构是数组

Object[capacity]

容量大小有构造函数的capacity参数决定。

5.1、put方法

 1public void put(E e) throws InterruptedException {
2  checkNotNull(e);
3  // 获取ReentrantLock锁
4  final ReentrantLock lock = this.lock;
5  lock.lockInterruptibly();
6  try {
7    // 如果队列满了,则进入条件队列进行等待
8    while (count == items.length)
9      notFull.await();
10    // 队列不满,或者被取数线程唤醒了,那么会继续执行
11    // 这里会往阻塞队列添加一个数据,然后唤醒等待时间最长的取数线程
12    enqueue(e);
13  } finally {
14    // 释放ReentrantLock锁
15    lock.unlock();
16  }
17}
  1. 只有获取到了ReentrantLock锁之后,才可以操作队列;

  2. 队列满了会阻塞进入条件队列等待;

  3. 队列不满则添加数据,并且唤醒等待时间最长的取数线程。

5.2、take方法

获取小顶堆最小的元素,获取之后会重新构造小顶堆。

 1public E take() throws InterruptedException {
2  // 获取ReentrantLock锁
3  final ReentrantLock lock = this.lock;
4  lock.lockInterruptibly();
5  try {
6    // 如果队列空了,则进入条件队列进行等待
7    while (count == 0)
8      notEmpty.await();
9    // 队列不空,或者被存数线程唤醒了,那么会继续执行
10    // 这里会从阻塞队列取一个数据,然后唤醒等待时间最长的存数线程
11    return dequeue();
12  } finally {
13    // 释放ReentrantLock锁
14    lock.unlock();
15  }
16}
  1. 只有获取到了ReentrantLock锁之后,才可以操作队列;

  2. 队列空了会阻塞进入条件队列等待;

  3. 队列不满则取数据,并且唤醒等待时间最长的存数线程。

注意:ArrayList中的数据取数和存数都是依次遍历一个一个取或者存,直到队尾之后,从头开始继续。代码如下:

 1private void enqueue(E x) {
2  final Object[] items = this.items;
3  items[putIndex] = x;
4  if (++putIndex == items.length)
5    putIndex = 0;
6  count++;
7  notEmpty.signal();
8}
9
10private E dequeue() {
11  final Object[] items = this.items;
12  @SuppressWarnings("unchecked")
13  E x = (E) items[takeIndex];
14  items[takeIndex] = null;
15  if (++takeIndex == items.length)
16    takeIndex = 0;
17  count--;
18  if (itrs != null)
19    itrs.elementDequeued();
20  notFull.signal();
21  return x;
22}

如下图:

image-20200322153507868

这里put和take使用了同一个ReentrantLock,不能并发执行。

有没有办法能够做到让put和take能够并发执行呢?接下来我们就来看看LinkedBlockingQueue。

6、LinkedBlockingQueue

LinkedBlockingQueue的put方法和take方法分别使用了不同的ReentrantLock,put和take可以并发执行,但是不能并发执行put或者take操作。

LinkedBlockingQueue底层使用的数据结构是单向链表

transient Node head;

private transient Node last;

容量大小可以由构造函数的capacity设定,默认为:Integer.MAX_VALUE

6.1、put方法

 1public void put(E e) throws InterruptedException {
2  if (e == nullthrow new NullPointerException();
3  int c = -1;
4  Node<E> node = new Node<E>(e);
5  final ReentrantLock putLock = this.putLock;
6  // 使用AtomicInteger保证原子性
7  final AtomicInteger count = this.count;
8  // 获取put锁
9  putLock.lockInterruptibly();
10  try {
11    // 如果队列满了,则进入put条件队列等待
12    while (count.get() == capacity) {
13      notFull.await();
14    }
15    // 队列不满,或者被取数线程唤醒了,那么会继续执行
16    // 这里会往阻塞队列末尾添加一个数据
17    enqueue(node);
18    c = count.getAndIncrement();
19    // 如果队列不满,则唤醒等待时间最长的put线程
20    if (c + 1 < capacity)
21      notFull.signal();
22  } finally {
23    // 释放put锁
24    putLock.unlock();
25  }
26  // 如果队列为空,再次获取put锁,然后唤醒等待时间最长的put线程
27  if (c == 0)
28    signalNotEmpty();
29}

6.2、take方法

 1public E take() throws InterruptedException {
2  E x;
3  int c = -1;
4  final AtomicInteger count = this.count;
5  final ReentrantLock takeLock = this.takeLock;
6  // 获取take锁
7  takeLock.lockInterruptibly();
8  try {
9    // 如果队列空了,则进入take条件队列等待
10    while (count.get() == 0) {
11      notEmpty.await();
12    }
13    // 获取到第一个节点,非哑节点
14    x = dequeue();
15    // 阻塞队列数量减1
16    c = count.getAndDecrement();
17    // 如果阻塞队列数量不为空,那么唤醒等待时间最长的take线程
18    if (c > 1)
19      notEmpty.signal();
20  } finally {
21    // 释放take锁
22    takeLock.unlock();
23  }
24  // 如果队列满了,再次获取take锁,然后唤醒等待时间最长的take线程
25  if (c == capacity)
26    signalNotFull();
27  return x;
28}

take和put操作如下图所示:

image-20200322154519699
  1. 队列第一个节点为哑节点,占位用的;

  2. put操作一直往链表后面追加节点;

  3. take操作从链表头取节点;

ArrayBlockingQueue与LinkedBlockingQueue对比

ArrayBlockingQueue

  • 数据结构:数组,存储空间预先分配,无需动态申请空间,使用过程中内存开销较小;

LinkedBlockingQueue

  • 数据结构:单项链表,存储空间动态申请,会增加JVM垃圾回收负担;

  • 两把锁,并发性能较好;

  • 可设置为无界,吞吐量比较大,但是不稳定,入队速度太快有可能导致内存溢出。

7、LinkedBlockingDeque

与LinkedBlockingQueue类似,只不过底层的数据结构是双向链表,并且增加了可以从队列两端插入和移除元素的方法,支持FIFOFILO。相关方法定义:

  • putFirst(E e)

  • putLast(E e)

  • E getFirst()

  • E getLast()

  • E takeFirst()

  • E takeLast()

LinkedBlockingQueue与LinkedBlockingDeque对比

LinkedBlockingQueue:

  • FIFO;

  • 读写分开两个ReentrantLock;

LinkedBlockingDeque:

  • FIFO & FILO;

  • 全局一把ReentrantLock;

8、PriorityBlockingQueue

是一个无界队列

存储结构:

private transient Object[] queue;

内部会构造为一颗平衡的二叉小顶堆,根据构造函数中传入的Comparator进行排序或者没有传的情况下使用自然的排序方法,数组的第一个元素为最小的元素。

全局一把ReentrantLock锁。

8.1、put方法

无界队列,一定可以添加成功,无需阻塞。容量不够则扩容,put完会重新构建小顶堆。关键代码如下:

 1public boolean offer(E e) {
2  if (e == null)
3    throw new NullPointerException();
4  final ReentrantLock lock = this.lock;
5  // 尝试获取锁
6  lock.lock();
7  int n, cap;
8  Object[] array;
9  // 如果数组空间不够,尝试扩容:通常会扩大约50%
10  while ((n = size) >= (cap = (array = queue).length))
11    tryGrow(array, cap);
12  try {
13    // 往小顶堆插入元素
14    Comparator<? super E> cmp = comparator;
15    if (cmp == null)
16      siftUpComparable(n, e, array);
17    else
18      siftUpUsingComparator(n, e, array, cmp);
19    // 元素个数+1
20    size = n + 1;
21    // 唤醒等待最久的取数线程
22    notEmpty.signal();
23  } finally {
24    // 释放锁
25    lock.unlock();
26  }
27  return true;
28}

跟前面的各种阻塞队列实现思路基本一致,这里比较有意思的是数组的扩容和往小顶堆插入元素的处理逻辑,由于篇幅所限,这里不展开讲了,感兴趣的朋友可以前去了解下。

8.2、take方法

队列为空的时候进入条件等待,take完元素之后,立刻重新构建小顶堆。

 1public E take() throws InterruptedException {
2  final ReentrantLock lock = this.lock;
3  // 获取锁
4  lock.lockInterruptibly();
5  E result;
6  try {
7    // 尝试获取最小元素,即小顶堆第一个元素,然后重新排序,如果不存在表示队列暂无元素,进行阻塞等待。
8    while ( (result = dequeue()) == null)
9      notEmpty.await();
10  } finally {
11    // 释放锁
12    lock.unlock();
13  }
14  return result;
15}

这里比较有趣的是dequeue()方法,涉及到取最小元素,然后重新排序,由于篇幅所限,这里不展开讲了,感兴趣的朋友可以前去了解下。

9、SynchronousQueue

通过使用SynchronousQueue,我们可以在线程之间安全的传递变量,A线程把需要传递的变量放入SynchronousQueue,B线程读取。该队列特点如下:

  • 容量永远为0;

  • put操作阻塞,直到另一个线程取走了队列中的元素;

  • take操作阻塞,直到另一个线程put一个元素到队列中;

  • 任何线程只能取得其他线程put进去的元素。

与其他阻塞队列不同的是,SynchronousQueue不依赖与AQS实现,而是直接使用CAS操作实现的,这导致代码中有大量的判断是否数据被并发改写了,并做相应的处理。

我们不推荐使用的无界线程池Executors.newCachedThreadPool()底层就是用到了SynchronousQueue来实现的。

SynchronousQueue具有公平模式和非公平模式的区别,两者的实现不太一样,接下来就介绍一下。

9.1、公平模式

公平模式下,底层的数据结构是一个单向链表,对应实现类为:TransferQueue。

底层数据结构与LinkedBlockingQueue类似,只不过阻塞的条件不同

  • LinkedBlockingQueue在队列满的时候put线程会阻塞,在队列空的时候,take线程会阻塞;

  • SynchronousQueue put进去的元素没有被take的时候,put线程阻塞,take线程获取不到元素的时候,take线程阻塞;

如下图,刚开始有三个线程执行了put操作,都阻塞等待了:

image-20200323232118531

然后有一个新的线程4执行了take操作,这里是FIFO队列,匹配上了线程1,于是线程1取了线程1节点的数据,然后同时唤醒了线程1,头节点向前推进:

image-20200323232234349

这种FIFO的模式真是公平的体现。

大致执行流程就是这样子,比使用了AQS的简单,取而代之的是使用CAS,通过大量的检验节点是否变更和处理,以达到更高put和take的性能,不过代码就自然会变得很复杂了,感兴趣的朋友可以前往查看源码,这里就不做详细的解读了。

9.2、非公平模式

非公平模式,底层的数据结构是一个栈。代码也是比较复杂的,这里我直接用图来描述下其执行原理。

如下图,线程1、线程2、线程3依次执行put操作,入栈情况如下图,结果三个线程都阻塞了:

image-20200324000849491

这个时候线程4执行take操作,会入栈,与栈顶的栈帧进行匹配:

image-20200324001604302

匹配成功之后,唤醒匹配上的线程3,然后从栈中移除线程3和线程4

image-20200324001533658

可以发现非公平模式下是LIFO的队列。

10、DelayQueue

延迟队列,提供给了在指定时间内才能获取队列元素的功能。

底层是通过PriorityQueue实现的:

  • put元素,触发Delayed接口的compareTo方法重新排序PriorityQueue小顶堆,让最小的元素(最快到期)排在最前面;一定会put成功,容量不够则扩容;

  • take元素,判断第一个元素是否到期,到期了则把原始poll出来(同时会重新构造小顶堆),否则会执行awaitNanos(delay),等待头节点元素到期之后,再重新获取元素。

image-20200326231540134

11、各种阻塞锁对比

12、使用案例

12.1、案例一:生产者消费者

下面是一个使用案例,使用到了阻塞队列和CountDownLatch。这个程序模拟:

  • 一个生产者线程,往阻塞队列中依次存入20条消息;

  • 一个消费者线程,一直尝试从阻塞线程中取出消息进行消费。

您可以使用上一节中 4.2.1~4.2.4中的任何一个阻塞队列替换掉代码中的阻塞队列,尝试看看效果:

 1public class BlockingQueueTest {
2
3    public static void main(String[] args) {
4        ExecutorService executor = Executors.newFixedThreadPool(2);
5        // 这里可以替换为你想试用的阻塞队列
6        BlockingQueue<String> queue = new PriorityBlockingQueue<>();
7        executor.submit(new Producer(queue));
8        executor.submit(new Cunsumer(queue));
9        executor.shutdown();
10    }
11}
12
13/**
14 * 生产者线程,往阻塞队列中依次存入20条消息
15 */

16class Producer implements Runnable{
17
18    private BlockingQueue<String> queue;
19
20    Producer(BlockingQueue<String> queue) {
21        this.queue = queue;
22    }
23
24    @Override
25    public void run() {
26        try {
27            for (int i = 0; i < 20; i++){
28                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
29                String threadName = Thread.currentThread().getName();
30                String msg = threadName + " : " + (i + 1);
31                queue.put(msg);
32                System.out.println("producer: " + msg);
33            }
34            System.out.println("producer finished...");
35        } catch (InterruptedException e) {
36            e.printStackTrace();
37        }
38    }
39
40}
41
42/**
43 * 消费者线程,一直尝试从阻塞线程中取出消息进行消费
44 */

45class Cunsumer implements Runnable{
46
47    private BlockingQueue<String> queue;
48
49    Cunsumer(BlockingQueue<String> queue) {
50        this.queue = queue;
51    }
52
53    @Override
54    public void run() {
55        try {
56            while (true){
57                TimeUnit.MILLISECONDS.sleep(new Random().nextInt(5000));
58                String msg = queue.take();
59                System.out.println("consumer " + Thread.currentThread().getName() + ", msg : " + msg);
60            }
61        } catch (Exception e) {
62            e.printStackTrace();
63        }
64    }
65
66}

12.2、案例二:优先级阻塞队列

下面是一个PriorityBlockingQueue的使用例子,可以发现每次put一个元素之后会自动排序,小顶堆第一个元素总是最小的那个,每次take出来的元素也是最小的,take出来之后也会再次自动排序:

 1public class PriorityBlockingQueueTest {
2
3    public static void main(String[] args) throws InterruptedException {
4
5        PriorityBlockingQueue<UserInfo> queue = new PriorityBlockingQueue<>();
6        queue.put(new UserInfo(10"User1"));
7        System.out.println("priorityBlockingQueue: " + queue);
8
9        queue.put(new UserInfo(5"User2"));
10        System.out.println("priorityBlockingQueue: " + queue);
11
12        queue.put(new UserInfo(2,"User3"));
13        System.out.println("priorityBlockingQueue: " + queue);
14
15        queue.put(new UserInfo(4,"User4"));
16        System.out.println("priorityBlockingQueue: " + queue);
17
18        System.out.println("take data: " + queue.take());
19        System.out.println("priorityBlockingQueue: " + queue);
20
21        System.out.println("take data: " + queue.take());
22        System.out.println("priorityBlockingQueue: " + queue);
23    }
24
25}
26
27@Data
28@NoArgsConstructor
29@AllArgsConstructor
30class UserInfo implements Comparable<UserInfo>{
31
32    private int id;
33
34    private String name;
35
36    @Override
37    public String toString() {
38        return this.id + "";
39    }
40    @Override
41    public int compareTo(UserInfo person) {
42        return this.id > person.getId() ? 1 : ( this.id < person.getId() ? -1 :0);
43    }
44}

输出结果:

1priorityBlockingQueue: [10]
2priorityBlockingQueue: [510]
3priorityBlockingQueue: [2105]
4priorityBlockingQueue: [24510]
5take data: 2
6priorityBlockingQueue: [4105]
7take data: 4
8priorityBlockingQueue: [510]

12.3、案例三:SynchronousQueue的使用

下面使用SynchronousQueue的使用案例。大家可以替换成公平模式或者非公平模式,来查看程序输出结果,看看是FIFO还是LIFO:

 1ExecutorService executor = Executors.newFixedThreadPool(10);
2// 构造函数参数设置公平模式或者非公平模式
3SynchronousQueue<Integer> queue = new SynchronousQueue<>(false);
4
5Runnable producer = () -> {
6  Integer producedElement = ThreadLocalRandom
7    .current()
8    .nextInt();
9  try {
10    System.out.println(Thread.currentThread().getName() + " put " + producedElement);
11    queue.put(producedElement);
12    System.out.println(Thread.currentThread().getName() + " put finished");
13  } catch (InterruptedException ex) {
14    ex.printStackTrace();
15  }
16};
17
18Runnable consumer = () -> {
19  try {
20    TimeUnit.SECONDS.sleep(1);
21    Integer consumedElement = queue.take();
22    System.out.println(Thread.currentThread().getName() + " take " + consumedElement);
23  } catch (InterruptedException ex) {
24    ex.printStackTrace();
25  }
26};
27
28executor.execute(producer);
29executor.execute(producer);
30executor.execute(producer);
31executor.execute(consumer);
32
33executor.shutdown();
34System.out.println(queue.size());

References

A Guide to Java SynchronousQueue

这篇文章的内容就差不多介绍到这里了,能够阅读到这里的朋友真的是很有耐心,为你点个赞。

本文为arthinking基于相关技术资料和官方文档撰写而成,确保内容的准确性,如果你发现了有何错漏之处,烦请高抬贵手帮忙指正,万分感激。

大家可以关注我的博客:itzhai.com 获取更多文章,我将持续更新后端相关技术,涉及JVM、Java基础、架构设计、网络编程、数据结构、数据库、算法、并发编程、分布式系统等相关内容。

点赞收藏
arthinking
请先登录,查看2条精彩评论吧
快去登录吧,你将获得
  • 浏览更多精彩评论
  • 和开发者讨论交流,共同进步
3
2