public class ArrayBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable { /** 使用数组存储队列中的元素 */ final Object[] items; /** 使用独占锁ReetrantLock */ final ReentrantLock lock; /** 等待出队的条件 */ private final Condition notEmpty; /** 等待入队的条件 */ private final Condition notFull; /** 初始化时 , 需要指定队列大小 */ public ArrayBlockingQueue(int capacity) {this(capacity, false);}/** 初始化时,也指出指定是否为公平锁 , */public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull =lock.newCondition();}/**入队操作*/public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}/**出队操作*/public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)notEmpty.await();return dequeue();} finally {lock.unlock();}}}ArrayBlockingQueue采用ReentrantLock进行加锁,只有一个ReentrantLock对象 , 这意味着生产者和消费者无法并行运行 。
我们看一个简单的示例代码如下:
public class Container {/*** 初始化阻塞队列*/private final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);/*** 添加数据到阻塞队列* @param value*/public void add(Integer value) {try {queue.put(value);System.out.println("生产者:"+ Thread.currentThread().getName()+",add:" + value);} catch (InterruptedException e) {e.printStackTrace();}}/*** 从阻塞队列获取数据*/public void get() {try {Integer value = https://www.isolves.com/it/cxkf/jiagou/2023-12-15/queue.take();System.out.println("消费者:"+ Thread.currentThread().getName()+" , value:" + value);} catch (InterruptedException e) {e.printStackTrace();}}}/** * 生产者 */public class Producer extends Thread {private Container container;public Producer(Container container) {this.container = container;}@Overridepublic void run() {for (int i = 0; i < 6; i++) {container.add(i);}}}/** * 消费者 */public class Consumer extends Thread {private Container container;public Consumer(Container container) {this.container = container;}@Overridepublic void run() {for (int i = 0; i < 6; i++) {container.get();}}}/** * 测试类 */public class MyThreadTest {public static void main(String[] args) {Container container = new Container();Producer producer = new Producer(container);Consumer consumer = new Consumer(container);producer.start();consumer.start();}}运行结果如下:
生产者:Thread-0,add:0生产者:Thread-0,add:1生产者:Thread-0 , add:2生产者:Thread-0 , add:3生产者:Thread-0,add:4生产者:Thread-0 , add:5消费者:Thread-1,value:0消费者:Thread-1,value:1消费者:Thread-1 , value:2消费者:Thread-1,value:3消费者:Thread-1,value:4消费者:Thread-1,value:5可以很清晰的看到,生产者线程执行完毕之后,消费者线程才开始消费 。
3.2、LinkedBlockingQueueLinkedBlockingQueue是一个基于链表的阻塞队列,初始化的时候无须指定队列大小,默认队列长度为Integer.MAX_VALUE,也就是 int 型最大值 。
同样的,采用的是ReentrantLock和Condition实现生产者和消费者模型,不同的是它使用了两个lock,这意味着生产者和消费者可以并行运行,程序执行效率进一步得到提升 。
部分核心源码如下:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {/** 使用出队独占锁ReetrantLock */private final ReentrantLock takeLock = new ReentrantLock();/** 等待出队的条件 */private final Condition notEmpty = takeLock.newCondition();/** 使用入队独占锁ReetrantLock */private final ReentrantLock putLock = new ReentrantLock();/** 等待入队的条件 */private final Condition notFull = putLock.newCondition();/**入队操作*/public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {while (count.get() == capacity) {notFull.await();}enqueue(node);c = count.getAndIncrement();if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}if (c == 0)signalNotEmpty();}/**出队操作*/public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;takeLock.lockInterruptibly();try {while (count.get() == 0) {notEmpty.await();}x = dequeue();c = count.getAndDecrement();if (c > 1)notEmpty.signal();} finally {takeLock.unlock();}if (c == capacity)signalNotFull();return x;}}
推荐阅读
- 45岁刘涛彻底放开!喇叭裤直接开叉到大腿根,“玉女腿”同龄少见
- 林更新迎来大爆发,两部新剧均定档央视,演艺事业将彻底腾飞!
- 这一夜,张天爱用“假屁股”,彻底扯下了内娱女明星的遮羞布
- 善恶终有报!56岁“惨遭封杀、移民国外”的那英,彻底活成了笑话
- 赵丽颖的毛孔,宋轶的黑头,没了滤镜这些明星彻底“暴露”了
- 逆水寒官宣彻底放弃“多货币机制”,网友:真向梦幻西游看齐了?
- 这一回,“遭到舒淇翻白眼”的虞书欣,彻底见识了娱乐圈的“残酷”
- “勾栏从来扮高雅”,这一次左小青的“面具”,被刀郎彻底撕烂
- 一人毁了整部剧!万万没想到,36岁刘诗诗会以这种方式彻底翻车
- 一文读懂常用的 “生成式 AI 框架”
