几张图彻底搞懂几个好玩的并发辅助类和阻塞队列原创
去年问我怎么学Java的那个五年级小学生又来向我问问题了,说Java中线程同步有没有好用的工具类。幸亏没有问我什么算法,瑟瑟发抖。这个我倒是还挺在行的,了解到他喜欢熊出没,于是我就用熊作为主角,给他分享了几个Java并发框架中的辅助工具类。(呼~松了一口气)
看文本片文章,你将了解到:
-
CountDownLatch的工作原理和使用场景;
-
CyclicBarrier的工作原理和使用场景;
-
Semaphore的工作原理和使用场景;
-
对各种阻塞队列的实现原理都有一定的了解,以及了解他们的使用场景。
注意,如果您没有阅读过之前发的 一文读懂并发包中的读写锁和Condition实现原理 强烈建议您先阅读,有助于理解本文中的内容。
1、闭锁 CountDownLatch
一个同步工具类,允许一个或者多个线程一直等待,直到其他线程的操作都执行完成之后再继续往下执行。
使用场景:在一些应用场合中,需要等待某个条件达到要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作。这个时候就可以使用CountDownLatch。CountDownLatch最重要的方法是countDown()
和await()
,前者主要是计数减一,后者是等待计数到0,如果没有到达0,就继续阻塞等待。
方法详细介绍:CountDownLatch的介绍和使用
为了方便理解,这不,我又发挥了以下我的动画绘制功底,写了一个动图:
如上图,左边三只小熊,可以当成三个线程,每一只撞到栏杆,计数器就减1,这相当于执行了countDown方法;
右边有两只暴走小熊在等待计数器变为0,可以当成两个线程,执行了await方法;
最终左边三只暴走小熊抵达了栏杆处,计数器变为0,唤醒了右边的暴走小熊,暴走小熊就开始动起来了。
1.1、执行原理
CountDownLatch是基于AQS共享模式的使用。
如下图,我们通过给CountDownLatch构造函数传入state的值。
countDown方法本质是释放共享锁,核心实现逻辑是:state>0 && state-1,如果state>0,则state减一,否则执行失败;
await方法本质是获取共享锁,核心实现是:getState()==0,如果state==0,则表示获取成功,否则线程阻塞进入等待队列;
当state减到0的时候,会唤醒等待队列中的所有线程,尝试继续获取共享锁,这个时候正常是所有线程都能获取成功的。
1.2、使用案例
三个线程共同拉取一块数据,每个线程拉取数据块的一部分,等到所有线程的数据都拉取过来之后,另一个处理线程再开始这个数据块。
下载线程:
1class Downloader implements Runnable{
2
3 private CountDownLatch latch;
4
5 private String downloaderName;
6
7 /**
8 * 构造函数
9 * @param downLatch 注意, 所有需要协作的线程需要使用同一个闭锁
10 * @param downloaderName
11 */
12 public Downloader(CountDownLatch downLatch, String downloaderName){
13 this.latch = downLatch;
14 this.downloaderName = downloaderName;
15 }
16
17 public void run() {
18 this.download();
19
20 try {
21 TimeUnit.SECONDS.sleep(new Random().nextInt(10));
22 } catch (InterruptedException e) {
23 System.out.println("Thread interrupt status: " + Thread.currentThread().isInterrupted());
24 }
25 System.out.println(this.downloaderName + "下载完成...");
26 this.latch.countDown();
27 }
28
29 private void download(){
30 System.out.println(this.downloaderName + "正在下载文件...");
31 }
32
33}
数据处理线程:
1class DataProcessor implements Runnable {
2
3 private CountDownLatch latch;
4
5 /**
6 * 构造函数
7 * @param latch 注意, 所有需要协作的线程需要使用同一个闭锁
8 */
9 public DataProcessor(CountDownLatch latch){
10 this.latch = latch;
11 }
12
13 public void run() {
14 System.out.println("等待下载完数据...");
15 try {
16 this.latch.await();
17 } catch (InterruptedException e) {
18 System.out.println("Thread interrupt status: " + Thread.currentThread().isInterrupted());
19 }
20 System.out.println("数据下载完成, 开始处理数据...");
21 }
22
23}
运行代码:
1ExecutorService executor = Executors.newFixedThreadPool(4);
2
3CountDownLatch latch = new CountDownLatch(3);
4
5Downloader d1 = new Downloader(latch, "下载线程1");
6Downloader d2 = new Downloader(latch, "下载线程2");
7Downloader d3 = new Downloader(latch, "下载线程3");
8
9DataProcessor processor = new DataProcessor(latch);
10executor.execute(d1);
11executor.execute(d2);
12executor.execute(d3);
13executor.execute(processor);
14executor.shutdown();
执行结果:
1等待下载完数据...
2下载线程1正在下载文件...
3下载线程2正在下载文件...
4下载线程3正在下载文件...
5下载线程3下载完成...
6下载线程2下载完成...
7下载线程1下载完成...
8数据下载完成, 开始处理数据...
1.3、其他说明
类似的,我们也可以使用Thread.join
方法实现控制线程执行顺序,但是没有那么灵活。如果我们把任务都丢到线程池里面多线程执行,那么就不能手动的在一个线程里面调用另一个线程的join方法了。
2、栅栏 CyclicBarrier
屏障,或成为栅栏,我们在之前 一文带你彻底理解同步和锁的本质(干货) 一文有讨论到过,主要是达到这种目的:所有线程都准备就绪,就着手下一阶段的工作,否则不能进入下一阶段。
我们还是让上面的小熊来演示一下。
上面5只小熊,准备跑到起跑线,跑到起跑线等待,相当于执行了await方法,等到所有小熊准备就绪之后,然后一起开跑。这就很好的揭示了内存屏障的作用了。
2.1、执行原理
CyclicBarrier是基于ReentrantLock的Condition来实现的。
如下图,栅栏中有两个关键属性:
-
parties:栅栏计数器初始值
-
count:栅栏计数器
其中CyclicBarrier的await()方法封装了对ReentrantLock条件锁的使用,主要处理流程:
-
获取ReentrantLock锁;
-
count减1,如果此时count为0,那么唤醒等待队列中所有线程,并结束这一轮处理,重置屏障,否则进入下一步;
-
执行condition.await方法,把当前线程丢到条件队列;
-
当count减少到0的时候,执行condition.signalAll方法把条件队列中的所有线程节点都移动到等待队列;
-
最后唤醒同步队列中的线程节点,线程从condition.await阻塞处醒来继续执行:获取ReentrantLock锁,用当前线程节点替换旧的头节点,最终放ReentrantLock锁,继续让线程往下执行(每个线程依次获取、锁释放锁)。如下图:
await()
能够响应中断。除此之外,await还提供了带有超时的实现await(long timeout, TimeUnit unit)
,以及reset()
方法重新开启下一轮,具体大家可以看源码的实现。
2.2、使用案例
下面的案例模拟了赛跑,只有当所有运动员都在起跑线上准备好了,才允许他们开跑:
1public class CyclicBarrierTest {
2
3 public static void main(String[] args) {
4 CyclicBarrier barrier = new CyclicBarrier(3);
5 ExecutorService executor = Executors.newFixedThreadPool(3);
6 executor.submit(new Runner(barrier, "1号选手"));
7 executor.submit(new Runner(barrier, "2号选手"));
8 executor.submit(new Runner(barrier, "3号选手"));
9 barrier.reset();
10 executor.submit(new Runner(barrier, "4号选手"));
11 executor.submit(new Runner(barrier, "5号选手"));
12 executor.submit(new Runner(barrier, "6号选手"));
13 executor.shutdown();
14 }
15}
16
17class Runner implements Runnable {
18
19 /**
20 * 一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)
21 */
22 private CyclicBarrier barrier;
23
24 private String name;
25
26 public Runner(CyclicBarrier barrier, String name) {
27 super();
28 this.barrier = barrier;
29 this.name = name;
30 }
31
32 @Override
33 public void run() {
34 try {
35 Thread.sleep(1000 * (new Random()).nextInt(8));
36 System.out.println(name + " 准备好了...");
37 // barrier的await方法,在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
38 barrier.await();
39 } catch (InterruptedException | BrokenBarrierException e) {
40 e.printStackTrace();
41 }
42 System.out.println(name + " 起跑!");
43 for (int i = 0; i < 10; i++) {
44 System.out.println(this.name + "正在跑步" + i);
45 }
46 }
47}
2.3、其他说明
CountDownLatch是若干个线程等待另外n个线程完成某件事之后才能执行;而CyclicBarrier是若干个线程互相等待,只有等到所有线程都执行了await只会,这若干个线程才可以继续往下执行。
3、信号量 Semaphore
信号量通过一组许可证来控制对共享资源的访问。
如果需要,可以用acquire()方法获取许可,如果许可为0,那么会进行阻塞,通过使用release()方法释放许可,把许可归还给Semaphore,归还之后,阻塞的线程优惠醒来尝试获取许可。
Semaphore提供给了若干个api对应不同的功能:
-
Semaphore(int permits)
:非公平模式创建; -
Semaphore(int permits, boolean fair)
:可以指定是否公平模式创建; -
acquire()
:尝试获取1个许可,如果没有许可则阻塞,可以被中断停止等待; -
acquire(int permits)
:跟上一个方法类型,尝试获取permits个许可; -
acquireUninterruptibly()
:尝试获取一个许可,不可中断; -
acquireUninterruptibly(int permits)
:尝试获取permits个许可,不可中断; -
tryAcquire()
:尝试获取一个许可,获取不到则直接返回失败; -
tryAcquire(int permits)
:尝试获取permits个许可,获取不到则直接返回失败; -
tryAcquire(int permits, long timeout, TimeUnit unit)
:尝试在timeout时间内获取permits个许可,超时则返回false,可被中断; -
tryAcquire(long timeout, TimeUnit unit)
:尝试在timeout时间内获取1个许可,超时则返回false,可被中断; -
release()
:释放一个许可; -
release(int permits)
:释放n个许可;
下面演示基于公平锁的Semaphore,获取锁使用acquireUninterruptibly()
:
这里设置的许可为2,可以发现,同一时刻最多只能有两个线程获得许可。
3.1、执行原理
Semaphore的执行原理相对来说比较简单。下面描述了可中断非公平的信号量实现原理,ASQ中的state值就相当于许可的数量:
-
执行acquire的时候,会尝试让state - acquires,如果发现许可足够,则进行cas更新,扣减许可,否则线程进入等待队列;
-
执行release的时候,state + releases,把许可加回去。
3.2、使用案例
下面演示了使用semaphore实现限流的机制,模拟20个客户端线程尝试执行业务逻辑,同一时刻最多只有5个线程能够并发的执行。
1// 线程池
2ExecutorService exec = Executors.newCachedThreadPool();
3// 只能5个线程同时访问
4final Semaphore semp = new Semaphore(5);
5// 模拟20个客户端访问
6for (int index = 0; index < 20; index++) {
7 final int NO = index;
8 Runnable run = () -> {
9 try {
10 // 获取许可
11 if(semp.tryAcquire()) {
12 System.out.println("线程获得许可: " + NO);
13 Thread.sleep((long) (Math.random() * 10000));
14 // 访问完后,释放
15 semp.release();
16 } else {
17 System.out.println("达到并发上限,请求失败,请稍后再试");
18 }
19
20 } catch (InterruptedException e) {
21 System.out.println("执行异常");
22 }
23 };
24 exec.execute(run);
25}
26// 退出线程池
27exec.shutdown();
注意,这里使用的是
tryAcquire
失败之后直接返回,线程不会进入AQS等待队列。
4、阻塞队列 BlockingQueue
4.1、BlockingQueue的基本原理
我们先来解释一下阻塞队列:
如上图
-
生产线程1往阻塞队列里面添加新的数据,当阻塞队列满的时候(针对有界队列),生产线程1将会处于阻塞状态,直到消费线程2从队列中取走一个数据;
-
消费线程2从阻塞队列取数据,当阻塞队列空的时候,消费线程2将会处于阻塞状态,直到生产线程把一个数据放进去。
阻塞队列的基本原理就这样,至于队列是用什么数据结构进行存储的,这里并没有规定,所以后面我们可以看到很多阻塞队列的实现。
阻塞队列的常用方法
我们查阅BlockingQueue总结了以下阻塞队列的方法:
注意:
根据remove(Object o)方法签名可知,这个方法可以移除队列的特定对象,但是这个方法效率并不高。因为需要遍历队列匹配到特定的对象之后,再进行移除。
以上支持阻塞和超时的方法都是能够响应中断的。
4.2、BlockingQueue的实现
作为一个五年级的小学生,到这里就开始听不懂了,暗中嘚瑟,还好学的没那么快。
下图展示了主要的BlockingQueue的实现类:
其实我们在前面的文章:ReentrantLock介绍与使用#2.4、条件变量 章节已经使用ReentrantLock的Condition实现了一个阻塞队列,底层是使用LinkedList进行存储的。
BlockingQueue底层也是基于AQS实现的,队列的阻塞使用ReentrantLock的Condition实现的。
下面我们来看看各个实现类的原理。以下分析我都会基于支持阻塞的put和take方法来分析。
5、ArrayBlockingQueue
相信经过上面各个同步类的分析,大家已经对AQS比较熟悉了,下面我将不再画具体的内部结构图了。
ArrayBlockingQueue使用的数据结构是数组:
Object[capacity]
容量大小有构造函数的capacity参数决定。
5.1、put方法
1public void put(E e) throws InterruptedException {
2 checkNotNull(e);
3 // 获取ReentrantLock锁
4 final ReentrantLock lock = this.lock;
5 lock.lockInterruptibly();
6 try {
7 // 如果队列满了,则进入条件队列进行等待
8 while (count == items.length)
9 notFull.await();
10 // 队列不满,或者被取数线程唤醒了,那么会继续执行
11 // 这里会往阻塞队列添加一个数据,然后唤醒等待时间最长的取数线程
12 enqueue(e);
13 } finally {
14 // 释放ReentrantLock锁
15 lock.unlock();
16 }
17}
-
只有获取到了ReentrantLock锁之后,才可以操作队列;
-
队列满了会阻塞进入条件队列等待;
-
队列不满则添加数据,并且唤醒等待时间最长的取数线程。
5.2、take方法
获取小顶堆最小的元素,获取之后会重新构造小顶堆。
1public E take() throws InterruptedException {
2 // 获取ReentrantLock锁
3 final ReentrantLock lock = this.lock;
4 lock.lockInterruptibly();
5 try {
6 // 如果队列空了,则进入条件队列进行等待
7 while (count == 0)
8 notEmpty.await();
9 // 队列不空,或者被存数线程唤醒了,那么会继续执行
10 // 这里会从阻塞队列取一个数据,然后唤醒等待时间最长的存数线程
11 return dequeue();
12 } finally {
13 // 释放ReentrantLock锁
14 lock.unlock();
15 }
16}
-
只有获取到了ReentrantLock锁之后,才可以操作队列;
-
队列空了会阻塞进入条件队列等待;
-
队列不满则取数据,并且唤醒等待时间最长的存数线程。
注意:ArrayList中的数据取数和存数都是依次遍历一个一个取或者存,直到队尾之后,从头开始继续。代码如下:
1private void enqueue(E x) {
2 final Object[] items = this.items;
3 items[putIndex] = x;
4 if (++putIndex == items.length)
5 putIndex = 0;
6 count++;
7 notEmpty.signal();
8}
9
10private E dequeue() {
11 final Object[] items = this.items;
12 @SuppressWarnings("unchecked")
13 E x = (E) items[takeIndex];
14 items[takeIndex] = null;
15 if (++takeIndex == items.length)
16 takeIndex = 0;
17 count--;
18 if (itrs != null)
19 itrs.elementDequeued();
20 notFull.signal();
21 return x;
22}
如下图:
这里put和take使用了同一个ReentrantLock,不能并发执行。
有没有办法能够做到让put和take能够并发执行呢?接下来我们就来看看LinkedBlockingQueue。
6、LinkedBlockingQueue
LinkedBlockingQueue的put方法和take方法分别使用了不同的ReentrantLock,put和take可以并发执行,但是不能并发执行put或者take操作。
LinkedBlockingQueue底层使用的数据结构是单向链表:
transient Node head;
private transient Node last;
容量大小可以由构造函数的capacity设定,默认为:Integer.MAX_VALUE
6.1、put方法
1public void put(E e) throws InterruptedException {
2 if (e == null) throw new NullPointerException();
3 int c = -1;
4 Node<E> node = new Node<E>(e);
5 final ReentrantLock putLock = this.putLock;
6 // 使用AtomicInteger保证原子性
7 final AtomicInteger count = this.count;
8 // 获取put锁
9 putLock.lockInterruptibly();
10 try {
11 // 如果队列满了,则进入put条件队列等待
12 while (count.get() == capacity) {
13 notFull.await();
14 }
15 // 队列不满,或者被取数线程唤醒了,那么会继续执行
16 // 这里会往阻塞队列末尾添加一个数据
17 enqueue(node);
18 c = count.getAndIncrement();
19 // 如果队列不满,则唤醒等待时间最长的put线程
20 if (c + 1 < capacity)
21 notFull.signal();
22 } finally {
23 // 释放put锁
24 putLock.unlock();
25 }
26 // 如果队列为空,再次获取put锁,然后唤醒等待时间最长的put线程
27 if (c == 0)
28 signalNotEmpty();
29}
6.2、take方法
1public E take() throws InterruptedException {
2 E x;
3 int c = -1;
4 final AtomicInteger count = this.count;
5 final ReentrantLock takeLock = this.takeLock;
6 // 获取take锁
7 takeLock.lockInterruptibly();
8 try {
9 // 如果队列空了,则进入take条件队列等待
10 while (count.get() == 0) {
11 notEmpty.await();
12 }
13 // 获取到第一个节点,非哑节点
14 x = dequeue();
15 // 阻塞队列数量减1
16 c = count.getAndDecrement();
17 // 如果阻塞队列数量不为空,那么唤醒等待时间最长的take线程
18 if (c > 1)
19 notEmpty.signal();
20 } finally {
21 // 释放take锁
22 takeLock.unlock();
23 }
24 // 如果队列满了,再次获取take锁,然后唤醒等待时间最长的take线程
25 if (c == capacity)
26 signalNotFull();
27 return x;
28}
take和put操作如下图所示:
-
队列第一个节点为哑节点,占位用的;
-
put操作一直往链表后面追加节点;
-
take操作从链表头取节点;
ArrayBlockingQueue与LinkedBlockingQueue对比
ArrayBlockingQueue:
-
数据结构:数组,存储空间预先分配,无需动态申请空间,使用过程中内存开销较小;
LinkedBlockingQueue:
-
数据结构:单项链表,存储空间动态申请,会增加JVM垃圾回收负担;
-
两把锁,并发性能较好;
-
可设置为无界,吞吐量比较大,但是不稳定,入队速度太快有可能导致内存溢出。
7、LinkedBlockingDeque
与LinkedBlockingQueue类似,只不过底层的数据结构是双向链表,并且增加了可以从队列两端插入和移除元素的方法,支持FIFO和FILO。相关方法定义:
-
putFirst(E e)
-
putLast(E e)
-
E getFirst()
-
E getLast()
-
E takeFirst()
-
E takeLast()
-
…
LinkedBlockingQueue与LinkedBlockingDeque对比
LinkedBlockingQueue:
-
FIFO;
-
读写分开两个ReentrantLock;
LinkedBlockingDeque:
-
FIFO & FILO;
-
全局一把ReentrantLock;
8、PriorityBlockingQueue
是一个无界队列
存储结构:
private transient Object[] queue;
内部会构造为一颗平衡的二叉小顶堆,根据构造函数中传入的Comparator进行排序或者没有传的情况下使用自然的排序方法,数组的第一个元素为最小的元素。
全局一把ReentrantLock锁。
8.1、put方法
无界队列,一定可以添加成功,无需阻塞。容量不够则扩容,put完会重新构建小顶堆。关键代码如下:
1public boolean offer(E e) {
2 if (e == null)
3 throw new NullPointerException();
4 final ReentrantLock lock = this.lock;
5 // 尝试获取锁
6 lock.lock();
7 int n, cap;
8 Object[] array;
9 // 如果数组空间不够,尝试扩容:通常会扩大约50%
10 while ((n = size) >= (cap = (array = queue).length))
11 tryGrow(array, cap);
12 try {
13 // 往小顶堆插入元素
14 Comparator<? super E> cmp = comparator;
15 if (cmp == null)
16 siftUpComparable(n, e, array);
17 else
18 siftUpUsingComparator(n, e, array, cmp);
19 // 元素个数+1
20 size = n + 1;
21 // 唤醒等待最久的取数线程
22 notEmpty.signal();
23 } finally {
24 // 释放锁
25 lock.unlock();
26 }
27 return true;
28}
跟前面的各种阻塞队列实现思路基本一致,这里比较有意思的是数组的扩容和往小顶堆插入元素的处理逻辑,由于篇幅所限,这里不展开讲了,感兴趣的朋友可以前去了解下。
8.2、take方法
队列为空的时候进入条件等待,take完元素之后,立刻重新构建小顶堆。
1public E take() throws InterruptedException {
2 final ReentrantLock lock = this.lock;
3 // 获取锁
4 lock.lockInterruptibly();
5 E result;
6 try {
7 // 尝试获取最小元素,即小顶堆第一个元素,然后重新排序,如果不存在表示队列暂无元素,进行阻塞等待。
8 while ( (result = dequeue()) == null)
9 notEmpty.await();
10 } finally {
11 // 释放锁
12 lock.unlock();
13 }
14 return result;
15}
这里比较有趣的是dequeue()方法,涉及到取最小元素,然后重新排序,由于篇幅所限,这里不展开讲了,感兴趣的朋友可以前去了解下。
9、SynchronousQueue
通过使用SynchronousQueue,我们可以在线程之间安全的传递变量,A线程把需要传递的变量放入SynchronousQueue,B线程读取。该队列特点如下:
-
容量永远为0;
-
put操作阻塞,直到另一个线程取走了队列中的元素;
-
take操作阻塞,直到另一个线程put一个元素到队列中;
-
任何线程只能取得其他线程put进去的元素。
与其他阻塞队列不同的是,SynchronousQueue不依赖与AQS实现,而是直接使用CAS操作实现的,这导致代码中有大量的判断是否数据被并发改写了,并做相应的处理。
我们不推荐使用的无界线程池Executors.newCachedThreadPool()
底层就是用到了SynchronousQueue
来实现的。
SynchronousQueue具有公平模式和非公平模式的区别,两者的实现不太一样,接下来就介绍一下。
9.1、公平模式
公平模式下,底层的数据结构是一个单向链表,对应实现类为:TransferQueue。
底层数据结构与LinkedBlockingQueue类似,只不过阻塞的条件不同:
-
LinkedBlockingQueue在队列满的时候put线程会阻塞,在队列空的时候,take线程会阻塞;
-
SynchronousQueue put进去的元素没有被take的时候,put线程阻塞,take线程获取不到元素的时候,take线程阻塞;
如下图,刚开始有三个线程执行了put操作,都阻塞等待了:
然后有一个新的线程4执行了take操作,这里是FIFO队列,匹配上了线程1,于是线程1取了线程1节点的数据,然后同时唤醒了线程1,头节点向前推进:
这种FIFO的模式真是公平的体现。
大致执行流程就是这样子,比使用了AQS的简单,取而代之的是使用CAS,通过大量的检验节点是否变更和处理,以达到更高put和take的性能,不过代码就自然会变得很复杂了,感兴趣的朋友可以前往查看源码,这里就不做详细的解读了。
9.2、非公平模式
非公平模式,底层的数据结构是一个栈。代码也是比较复杂的,这里我直接用图来描述下其执行原理。
如下图,线程1、线程2、线程3依次执行put操作,入栈情况如下图,结果三个线程都阻塞了:
这个时候线程4执行take操作,会入栈,与栈顶的栈帧进行匹配:
匹配成功之后,唤醒匹配上的线程3,然后从栈中移除线程3和线程4
可以发现非公平模式下是LIFO的队列。
10、DelayQueue
延迟队列,提供给了在指定时间内才能获取队列元素的功能。
底层是通过PriorityQueue实现的:
-
put元素,触发Delayed接口的compareTo方法重新排序PriorityQueue小顶堆,让最小的元素(最快到期)排在最前面;一定会put成功,容量不够则扩容;
-
take元素,判断第一个元素是否到期,到期了则把原始poll出来(同时会重新构造小顶堆),否则会执行awaitNanos(delay),等待头节点元素到期之后,再重新获取元素。
11、各种阻塞锁对比
12、使用案例
12.1、案例一:生产者消费者
下面是一个使用案例,使用到了阻塞队列和CountDownLatch。这个程序模拟:
-
一个生产者线程,往阻塞队列中依次存入20条消息;
-
一个消费者线程,一直尝试从阻塞线程中取出消息进行消费。
您可以使用上一节中 4.2.1~4.2.4中的任何一个阻塞队列替换掉代码中的阻塞队列,尝试看看效果:
1public class BlockingQueueTest {
2
3 public static void main(String[] args) {
4 ExecutorService executor = Executors.newFixedThreadPool(2);
5 // 这里可以替换为你想试用的阻塞队列
6 BlockingQueue<String> queue = new PriorityBlockingQueue<>();
7 executor.submit(new Producer(queue));
8 executor.submit(new Cunsumer(queue));
9 executor.shutdown();
10 }
11}
12
13/**
14 * 生产者线程,往阻塞队列中依次存入20条消息
15 */
16class Producer implements Runnable{
17
18 private BlockingQueue<String> queue;
19
20 Producer(BlockingQueue<String> queue) {
21 this.queue = queue;
22 }
23
24 @Override
25 public void run() {
26 try {
27 for (int i = 0; i < 20; i++){
28 TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
29 String threadName = Thread.currentThread().getName();
30 String msg = threadName + " : " + (i + 1);
31 queue.put(msg);
32 System.out.println("producer: " + msg);
33 }
34 System.out.println("producer finished...");
35 } catch (InterruptedException e) {
36 e.printStackTrace();
37 }
38 }
39
40}
41
42/**
43 * 消费者线程,一直尝试从阻塞线程中取出消息进行消费
44 */
45class Cunsumer implements Runnable{
46
47 private BlockingQueue<String> queue;
48
49 Cunsumer(BlockingQueue<String> queue) {
50 this.queue = queue;
51 }
52
53 @Override
54 public void run() {
55 try {
56 while (true){
57 TimeUnit.MILLISECONDS.sleep(new Random().nextInt(5000));
58 String msg = queue.take();
59 System.out.println("consumer " + Thread.currentThread().getName() + ", msg : " + msg);
60 }
61 } catch (Exception e) {
62 e.printStackTrace();
63 }
64 }
65
66}
12.2、案例二:优先级阻塞队列
下面是一个PriorityBlockingQueue的使用例子,可以发现每次put一个元素之后会自动排序,小顶堆第一个元素总是最小的那个,每次take出来的元素也是最小的,take出来之后也会再次自动排序:
1public class PriorityBlockingQueueTest {
2
3 public static void main(String[] args) throws InterruptedException {
4
5 PriorityBlockingQueue<UserInfo> queue = new PriorityBlockingQueue<>();
6 queue.put(new UserInfo(10, "User1"));
7 System.out.println("priorityBlockingQueue: " + queue);
8
9 queue.put(new UserInfo(5, "User2"));
10 System.out.println("priorityBlockingQueue: " + queue);
11
12 queue.put(new UserInfo(2,"User3"));
13 System.out.println("priorityBlockingQueue: " + queue);
14
15 queue.put(new UserInfo(4,"User4"));
16 System.out.println("priorityBlockingQueue: " + queue);
17
18 System.out.println("take data: " + queue.take());
19 System.out.println("priorityBlockingQueue: " + queue);
20
21 System.out.println("take data: " + queue.take());
22 System.out.println("priorityBlockingQueue: " + queue);
23 }
24
25}
26
27@Data
28@NoArgsConstructor
29@AllArgsConstructor
30class UserInfo implements Comparable<UserInfo>{
31
32 private int id;
33
34 private String name;
35
36 @Override
37 public String toString() {
38 return this.id + "";
39 }
40 @Override
41 public int compareTo(UserInfo person) {
42 return this.id > person.getId() ? 1 : ( this.id < person.getId() ? -1 :0);
43 }
44}
输出结果:
1priorityBlockingQueue: [10]
2priorityBlockingQueue: [5, 10]
3priorityBlockingQueue: [2, 10, 5]
4priorityBlockingQueue: [2, 4, 5, 10]
5take data: 2
6priorityBlockingQueue: [4, 10, 5]
7take data: 4
8priorityBlockingQueue: [5, 10]
12.3、案例三:SynchronousQueue的使用
下面使用SynchronousQueue的使用案例。大家可以替换成公平模式或者非公平模式,来查看程序输出结果,看看是FIFO还是LIFO:
1ExecutorService executor = Executors.newFixedThreadPool(10);
2// 构造函数参数设置公平模式或者非公平模式
3SynchronousQueue<Integer> queue = new SynchronousQueue<>(false);
4
5Runnable producer = () -> {
6 Integer producedElement = ThreadLocalRandom
7 .current()
8 .nextInt();
9 try {
10 System.out.println(Thread.currentThread().getName() + " put " + producedElement);
11 queue.put(producedElement);
12 System.out.println(Thread.currentThread().getName() + " put finished");
13 } catch (InterruptedException ex) {
14 ex.printStackTrace();
15 }
16};
17
18Runnable consumer = () -> {
19 try {
20 TimeUnit.SECONDS.sleep(1);
21 Integer consumedElement = queue.take();
22 System.out.println(Thread.currentThread().getName() + " take " + consumedElement);
23 } catch (InterruptedException ex) {
24 ex.printStackTrace();
25 }
26};
27
28executor.execute(producer);
29executor.execute(producer);
30executor.execute(producer);
31executor.execute(consumer);
32
33executor.shutdown();
34System.out.println(queue.size());
References
A Guide to Java SynchronousQueue
这篇文章的内容就差不多介绍到这里了,能够阅读到这里的朋友真的是很有耐心,为你点个赞。
本文为arthinking基于相关技术资料和官方文档撰写而成,确保内容的准确性,如果你发现了有何错漏之处,烦请高抬贵手帮忙指正,万分感激。
大家可以关注我的博客:itzhai.com
获取更多文章,我将持续更新后端相关技术,涉及JVM、Java基础、架构设计、网络编程、数据结构、数据库、算法、并发编程、分布式系统等相关内容。