16、Java 集合:Queue 之 DelayQueue、PriorityQueue、PriorityBlockingQueue
16、Java 集合:Queue 之 DelayQueue、PriorityQueue、PriorityBlockingQueue
##DelayQueue
public class DelayQueue extends AbstractQueue implements BlockingQueue
1、 Delayed 元素的一个基于优先级的无界阻塞队列,只有在延迟期满时才能从中提取元素;
2、 如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null;
3、 不允许使用 null 元素;
成员变量
/**
* 可重入的互斥锁
*/
private final transient ReentrantLock lock = new ReentrantLock();
/**
* 一个基于优先级堆的无界优先级队列。可自然排序。不允许使用 null 元素。
*/
private final PriorityQueue\<E\> q = new PriorityQueue\<E\>();
/**
* 唤醒或等待take线程的条件
*/
private final Condition available = lock.newCondition();
构造方法
/**
* 创建一个最初为空的新 DelayQueue。
*/
public DelayQueue() {}
/**
* 创建一个最初包含 Delayed 实例的给定 collection 元素的 DelayQueue。
*/
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
常用方法
boolean add(E e):将指定元素插入此延迟队列中。
public boolean add(E e) {
return offer(e);
}
boolean offer(E e):将指定元素插入此延迟队列。
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞式获取锁
try {
E first = q.peek();//获取但不移除此队列的头部;如果此队列为空,则返回 null。
q.offer(e);//将指定的元素插入此优先级队列。
if (first == null || e.compareTo(first) < 0)
available.signalAll();//队列中无元素,唤醒take线程
return true;
} finally {
lock.unlock();//释放锁
}
}
Epeek():获取但不移除此队列的头部;如果此队列为空,则返回 null。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞式获取锁
try {
return q.peek();// 获取但不移除此队列的头;如果此队列为空,则返回 null。
} finally {
lock.unlock();//释放锁
}
}
Epoll(): 获取并移除此队列的头,如果此队列不包含具有已到期延迟时间的元素,则返回 null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();// 获取但不移除此队列的头;如果此队列为空,则返回 null。
//getDelay:返回与此对象相关的剩余延迟时间,以给定的时间单位表示。
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;//队列为空或元素未到期
else {
E x = q.poll();//获取并移除此队列的头,如果此队列为空,则返回 null。
assert x != null;//非空校验
if (q.size() != 0)//队列中还有元素
available.signalAll();//唤醒take线程
return x;
}
} finally {
lock.unlock();
}
}
void put(E e):将指定元素插入此延迟队列。
public void put(E e) {
offer(e);
}
Etake():获取并移除此队列的头部,在可从此队列获得到期延迟的元素之前一直等待(如有必要)。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//阻塞式获取锁,可响应线程中断
try {
for (;;) {
E first = q.peek();//获取但不移除此队列的头;如果此队列为空,则返回 null。
if (first == null) {
available.await();//队列为空,线程等待,释放锁
} else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);//获取元素剩余到期市场,并判断是否到期
if (delay > 0) {
long tl = available.awaitNanos(delay);//元素还未到期,线程等待指定时间,释放锁
} else {
E x = q.poll();//获取并移除此队列的头,如果此队列为空,则返回 null。
assert x != null;//非空校验
if (q.size() != 0)
available.signalAll(); // 线程非空,唤醒其它所有take线程 return x;//返回队头元素
}
}
}
} finally {
lock.unlock();//释放锁
}
}
由源码看出延迟队列 DelayQueue 操作元素是通过 PriorityQueue 实现的,PriorityQueue 是一个基于优先级堆的无界优先级队列。利用可重入的互斥锁 ReentrantLock 保证线程安全,同时利用 Condition 保证插入或获取元素是阻塞的。
##PriorityQueue
public class PriorityQueue extends AbstractQueue implements java.io.Serializable
1、 一个基于优先级堆的无界优先级队列;
2、 优先级队列不允许使用 null 元素;
3、 默认容量 11,当元素容量小于 64 时,扩容 double,否则扩容 50%;
4、 不是同步的;
成员变量
/**
* 初始容量
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11;
/**
* 元素以平衡二叉树形式存储
*/
private transient Object[] queue;
/**
* 优先级队列元素数
*/
private int size = 0;
/**
* 元素自然排序方式
*/
private final Comparator<? super E> comparator;
/**
* 优先级队列修改次数
*/
private transient int modCount = 0;
构造方法
/**
* 使用默认的初始容量(11),并根据其自然顺序对元素进行排序。
*/
public PriorityQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
/**
* 使用指定的初始容量创建Queue,并根据指定的比较器对元素进行排序。
*/
public PriorityQueue(int initialCapacity,
Comparator<? super E> comparator) {
// Note: This restriction of at least one is not actually needed,
// but continues for 1.5 compatibility
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.queue = new Object[initialCapacity];
this.comparator = comparator;
}
常用方法
boolean add(E e):将指定的元素插入此优先级队列。
public boolean add(E e) {
return offer(e);
}
boolean offer(E e): 将指定的元素插入此优先级队列。
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
modCount++;//修改次数+1
int i = size;
if (i >= queue.length)
grow(i + 1);//元素数可能不够,需要扩容
size = i + 1;
if (i == 0)//队列为空
queue[0] = e;
else
siftUp(i, e);
return true;
}
/**
* 队列扩容
*/
private void grow(int minCapacity) {
if (minCapacity < 0) // overflow
throw new OutOfMemoryError();
int oldCapacity = queue.length;//获取当前元素数
// <64 双倍扩容; >=64 扩容 50%
int newCapacity = ((oldCapacity < 64)?
((oldCapacity + 1) * 2):
((oldCapacity / 2) * 3));
if (newCapacity < 0) // overflow
newCapacity = Integer.MAX_VALUE;//最大边界
if (newCapacity < minCapacity)
newCapacity = minCapacity;
queue = Arrays.copyOf(queue, newCapacity);//数组复制
}
//选择排序方式并插入相应位置
private void siftUp(int k, E x) {
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
}
/**
* Comparable 方式排序
*/
private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (key.compareTo((E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = key;
}
/**
* Comparator 方式排序
*/
private void siftUpUsingComparator(int k, E x) {
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (comparator.compare(x, (E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = x;
}
Epeek(): 获取但不移除此队列的头;如果此队列为空,则返回 null。
public E peek() {
if (size == 0)
return null;
return (E) queue[0];
}
Epoll() :获取并移除此队列的头,如果此队列为空,则返回 null。
public E poll() {
if (size == 0)
return null;
int s = --size;//更新元素数
modCount++;//修改次数+1
E result = (E) queue[0];//获取队列头部元素
E x = (E) queue[s];//获取尾部元素
queue[s] = null;//末尾置空
if (s != 0)//队列中还有元素
siftDown(0, x);
return result;
}
/**
* 元素重新排序
*/
private void siftDown(int k, E x) {
if (comparator != null)
siftDownUsingComparator(k, x);
else
siftDownComparable(k, x);
}
/**
* Comparable方式重新排序
*/
private void siftDownComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>)x;
int half = size >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = queue[child];
int right = child + 1;
if (right < size &&
((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
c = queue[child = right];
if (key.compareTo((E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = key;
}
/**
* Comparator方式重新排序
*/
private void siftDownUsingComparator(int k, E x) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
Object c = queue[child];
int right = child + 1;
if (right < size &&
comparator.compare((E) c, (E) queue[right]) > 0)
c = queue[child = right];
if (comparator.compare(x, (E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = x;
}
由源码看出,PriorityQueue 是非线程安全的,利用 Comparator 或 Comparable 进行自然排序,从而实现有优先级的队列,以平衡二叉树的形式存储在 transient Object[] queue 中,虽然 api 中介绍说是无界队列,但从源码看出其实是有边界的 ,值为 Integer.MAX_VALUE;只是边界特别大,从某种程度上来说,可以理解为无边界。
##PriorityBlockingQueue
public class PriorityBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable
1、 一个无界阻塞队列,它使用与类 PriorityQueue 相同的顺序规则,并且提供了阻塞获取操作;
2、 此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败(导致 OutOfMemoryError);
3、 不允许使用 null 元素;
4、 不允许插入不可比较的对象;
成员变量
//元素操作均基于PriorityQueue
private final PriorityQueue\<E\> q;
//可重入的互斥锁,该处采用公平锁
private final ReentrantLock lock = new ReentrantLock(true);
//take条件
private final Condition notEmpty = lock.newCondition();
构造方法
/**
* 用默认的初始容量 (11) 创建一个 PriorityBlockingQueue,并根据元素的自然顺序对其元素进行排序。
*/
public PriorityBlockingQueue() {
q = new PriorityQueue\<E\>();
}
/**
* 使用指定的初始容量创建一个 PriorityBlockingQueue,并根据指定的比较器对其元素进行排序。
*/
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
q = new PriorityQueue\<E\>(initialCapacity, comparator);
}
/**
* 使用指定的初始容量创建一个 PriorityBlockingQueue,并根据元素的自然顺序对其元素进行排序。
*/
public PriorityBlockingQueue(int initialCapacity) {
q = new PriorityQueue\<E\>(initialCapacity, null);
}
/**
* 创建一个包含指定 collection 元素的 PriorityBlockingQueue。
*/
public PriorityBlockingQueue(Collection<? extends E> c) {
q = new PriorityQueue\<E\>(c);
}
常用方法
boolean add(E e):将指定元素插入此优先级队列。
public boolean add(E e) {
return offer(e);
}
boolean offer(E e):将指定元素插入此优先级队列。
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞式获取锁
try {
boolean ok = q.offer(e);//插入元素
assert ok;
notEmpty.signal();//唤醒take线程
return true;
} finally {
lock.unlock();//释放锁
}
}
Epeek():获取但不移除此队列的头;如果此队列为空,则返回 null。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞式获取锁
try {
return q.peek();//获取元素
} finally {
lock.unlock();//释放锁
}
}
Epoll():获取并移除此队列的头,如果此队列为空,则返回 null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();//阻塞式获取锁
try {
return q.poll();//获取并移除此队列的头
} finally {
lock.unlock();//释放锁
}
}
Etake():获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//阻塞式获取锁,可响应线程中断
try {
try {
while (q.size() == 0)
notEmpty.await();//队列中无元素,阻塞等待,释放锁
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = q.poll();//有元素,获取并移除元素
assert x != null;
return x;//返回元素
} finally {
lock.unlock();//释放锁
}
}
void put(E e):将指定元素插入此优先级队列。
public void put(E e) {
offer(e); // never need to block
}
有源码看出,PriorityBlockingQueue 是基于 PriorityQueue 实现具有优先级的无界阻塞队列,利用 ReentrantLock 实现线程安全,Condition 实现阻塞。
DelayQueue 及 PriorityBlockingQueue 实现具有优先级的无界阻塞队列都是基于 PriorityQueue 的,区别在于 DelayQueue 加入了延迟概念。虽说都是无界,但最大边界为:Integer.MAX_VALUE。