一文带你彻底掌握阻塞队列!( 三 )

把最上面的样例Container中的阻塞队列实现类换成LinkedBlockingQueue,调整如下:
/** * 初始化阻塞队列 */private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();再次运行结果如下:
生产者:Thread-0,add:0消费者:Thread-1,value:0生产者:Thread-0 , add:1消费者:Thread-1,value:1生产者:Thread-0,add:2消费者:Thread-1,value:2生产者:Thread-0,add:3生产者:Thread-0,add:4生产者:Thread-0,add:5消费者:Thread-1 , value:3消费者:Thread-1 , value:4消费者:Thread-1,value:5可以很清晰的看到,生产者线程和消费者线程,交替并行执行 。
3.3、SynchronousQueueSynchronousQueue是一个没有缓冲的队列 , 生产者产生的数据直接会被消费者获取并且立刻消费,相当于传统的一个请求对应一个应答模式 。
相比ArrayBlockingQueue和LinkedBlockingQueue,SynchronousQueue实现机制也不同,它主要采用队列和栈来实现数据的传递,中间不存储任何数据 , 生产的数据必须得消费者处理,线程阻塞方式采用 JDK 提供的LockSupport park/unpark函数来完成,也支持公平和非公平两种模式 。

  • 当采用公平模式时:使用一个 FIFO 队列来管理多余的生产者和消费者
  • 当采用非公平模式时:使用一个 LIFO 栈来管理多余的生产者和消费者,这也是SynchronousQueue默认的模式
部分核心源码如下:
public class SynchronousQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/**不同的策略实现*/private transient volatile Transferer<E> transferer; /**默认非公平模式*/public SynchronousQueue() {this(false);}/**可以选策略,也可以采用公平模式*/public SynchronousQueue(boolean fair) {transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();} /**入队操作*/public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();if (transferer.transfer(e, false, 0) == null) {Thread.interrupted();throw new InterruptedException();}}/**出队操作*/public E take() throws InterruptedException {E e = transferer.transfer(null, false, 0);if (e != null)return e;Thread.interrupted();throw new InterruptedException();}}同样的,把最上面的样例Container中的阻塞队列实现类换成SynchronousQueue , 代码如下:
public class Container {/*** 初始化阻塞队列*/private final BlockingQueue<Integer> queue = new SynchronousQueue<>();/*** 添加数据到阻塞队列* @param value*/public void add(Integer value) {try {queue.put(value);Thread.sleep(100);System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);} catch (InterruptedException e) {e.printStackTrace();}}/*** 从阻塞队列获取数据*/public void get() {try {Integer value = https://www.isolves.com/it/cxkf/jiagou/2023-12-15/queue.take();Thread.sleep(200);System.out.println("消费者:"+ Thread.currentThread().getName()+",value:" + value);} catch (InterruptedException e) {e.printStackTrace();}}}再次运行结果如下:
生产者:Thread-0,add:0消费者:Thread-1,value:0生产者:Thread-0 , add:1消费者:Thread-1,value:1生产者:Thread-0,add:2消费者:Thread-1 , value:2生产者:Thread-0 , add:3消费者:Thread-1 , value:3生产者:Thread-0,add:4消费者:Thread-1,value:4生产者:Thread-0,add:5消费者:Thread-1,value:5可以很清晰的看到,生产者线程和消费者线程,交替串行执行 , 生产者每投递一条数据,消费者处理一条数据 。
3.4、PriorityBlockingQueuePriorityBlockingQueue是一个基于优先级别的阻塞队列 , 底层基于数组实现 , 可以认为是一个无界队列 。
PriorityBlockingQueue与ArrayBlockingQueue的实现逻辑,基本相似,也是采用ReentrantLock来实现加锁的操作 。
最大不同点在于:
  • 1.PriorityBlockingQueue内部基于数组实现的最小二叉堆算法 , 可以对队列中的元素进行排序,插入队列的元素需要实现Comparator或者Comparable接口,以便对元素进行排序
  • 2.其次,队列的长度是可扩展的 , 不需要显式指定长度,上限为Integer.MAX_VALUE - 8
部分核心源码如下:
public class PriorityBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/**队列元素*/private transient Object[] queue;/**比较器*/private transient Comparator<? super E> comparator;/**采用ReentrantLock进行加锁*/private final ReentrantLock lock;/**条件等待与通知*/private final Condition notEmpty;/**入队操作*/public boolean offer(E e) {if (e == null)throw new NullPointerException();final ReentrantLock lock = this.lock;lock.lock();int n, cap;Object[] array;while ((n = size) >= (cap = (array = queue).length))tryGrow(array, cap);try {Comparator<? super E> cmp = comparator;if (cmp == null)siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp);size = n + 1;notEmpty.signal();} finally {lock.unlock();}return true;}/**出队操作*/public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();E result;try {while ( (result = dequeue()) == null)notEmpty.await();} finally {lock.unlock();}return result;}}


推荐阅读