ArrayBlockingQueue

BlockingQueue的核心方法

boolean add(E e) // 把e添加到BlockingQueue里。如果BlockingQueue可以容纳,则返回true,否则抛出异常。
boolean offer(E e) // 表示如果可能的话,将e加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。
void put(E e) // 把e添加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续。
E poll(long timeout, TimeUnit unit)  // 取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。
E take() // 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,则调用此方法的线程被阻塞直到BlockingQueue有新的数据被加入。
int drainTo(Collection c)
int drainTo(Collection c, int maxElements)   // 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率,不需要多次分批加锁或释放锁。

ArrayBlockingQueue源码分析

ArrayBlockingQueue创建的时候需要指定容量capacity(可以存储的最大的元素个数,因为它不会自动扩容),其中一个构造方法为:

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = (E[]) new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

ArrayBlockingQueue类中定义的变量有:

    /** The queued items  */
    private final E[] items;
    /** items index for next take, poll or remove */
    private int takeIndex;
    /** items index for next put, offer, or add. */
    private int putIndex;
    /** Number of items in the queue */
    private int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

使用数组items来存储元素,由于是循环队列,使用takeIndex和putIndex来标记put和take的位置。可以看到,该类中只定义了一个锁ReentrantLock,定义两个Condition对象:notEmputy和notFull,分别用来对take和put操作进行所控制。

put(E e)方法的源码如下。进行put操作之前,必须获得锁并进行加锁操作,以保证线程安全性。加锁后,若发现队列已满,则调用notFull.await()方法,如当前线程陷入等待。直到其他线程take走某个元素后,会调用notFull.signal()方法来激活该线程。激活之后,继续下面的插入操作。

/**
  * Inserts the specified element at the tail of this queue, waiting
  * for space to become available if the queue is full.
  *
  */
public void put(E e) throws InterruptedException {
    //不能存放 null  元素
    if (e == null) throw new NullPointerException();
    final E[] items = this.items;	//数组队列
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lockInterruptibly();
    try {
        try {
            //当队列满时,调用notFull.await()方法,使该线程阻塞。
            //直到take掉某个元素后,调用notFull.signal()方法激活该线程。
            while (count == items.length)
                notFull.await();
        } catch (InterruptedException ie) {
            notFull.signal(); // propagate to non-interrupted thread
            throw ie;
        }
        //把元素 e 插入到队尾
        insert(e);
    } finally {
        //解锁
        lock.unlock();
    }
}

insert(E e) 方法如下:

/**
  * Inserts element at current put position, advances, and signals.
  * Call only when holding lock.
  */
private void insert(E x) {
    items[putIndex] = x;  
    //下标加1或者等于0
    putIndex = inc(putIndex);
    ++count;  //计数加1
    //若有take()线程陷入阻塞,则该操作激活take()线程,继续进行取元素操作。
    //若没有take()线程陷入阻塞,则该操作无意义。
    notEmpty.signal();
}
	
/**
  * Circularly increment i.
  */
final int inc(int i) {
    //此处可以看到使用了循环队列
    return (++i == items.length)? 0 : i;
}

This chapter requires login to view full content. You are viewing a preview.

Login to View Full Content

Course Curriculum

3

框架与 I/O:Spring、Netty 与 Web 容器

理解 Spring Boot 自动装配、AOP 与事务原理,掌握 Netty Reactor 模型及 Tomcat 连接处理机制,构建高内聚、易扩展的应用服务层。
4

高性能中间件:消息、缓存与存储

熟练运用 MySQL 索引/事务、Redis 缓存策略、Kafka/RocketMQ 消息可靠性,以及 ZooKeeper 分布式协调,搭建稳定、解耦的分布式数据底座。
6

云原生:容器化、可观测性与工程效能

通过 Docker/K8s 实现弹性部署,集成 Metrics/Logs/Traces 构建可观测体系,推动 DevOps 与自动化,让架构在云上持续交付与进化。