1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.hbase.procedure2.util;
20 
21 import java.util.concurrent.locks.Condition;
22 import java.util.concurrent.locks.ReentrantLock;
23 import java.util.concurrent.TimeUnit;
24 
25 import org.apache.hadoop.hbase.classification.InterfaceAudience;
26 import org.apache.hadoop.hbase.classification.InterfaceStability;
27 
28 @InterfaceAudience.Private
29 @InterfaceStability.Evolving
30 public class TimeoutBlockingQueue<E> {
31   public static interface TimeoutRetriever<T> {
getTimeout(T object)32     long getTimeout(T object);
getTimeUnit(T object)33     TimeUnit getTimeUnit(T object);
34   }
35 
36   private final ReentrantLock lock = new ReentrantLock();
37   private final Condition waitCond = lock.newCondition();
38   private final TimeoutRetriever<? super E> timeoutRetriever;
39 
40   private E[] objects;
41   private int head = 0;
42   private int tail = 0;
43 
TimeoutBlockingQueue(TimeoutRetriever<? super E> timeoutRetriever)44   public TimeoutBlockingQueue(TimeoutRetriever<? super E> timeoutRetriever) {
45     this(32, timeoutRetriever);
46   }
47 
48   @SuppressWarnings("unchecked")
TimeoutBlockingQueue(int capacity, TimeoutRetriever<? super E> timeoutRetriever)49   public TimeoutBlockingQueue(int capacity, TimeoutRetriever<? super E> timeoutRetriever) {
50     this.objects = (E[])new Object[capacity];
51     this.timeoutRetriever = timeoutRetriever;
52   }
53 
dump()54   public void dump() {
55     for (int i = 0; i < objects.length; ++i) {
56       if (i == head) {
57         System.out.print("[" + objects[i] + "] ");
58       } else if (i == tail) {
59         System.out.print("]" + objects[i] + "[ ");
60       } else {
61         System.out.print(objects[i] + " ");
62       }
63     }
64     System.out.println();
65   }
66 
clear()67   public void clear() {
68     lock.lock();
69     try {
70       if (head != tail) {
71         for (int i = head; i < tail; ++i) {
72           objects[i] = null;
73         }
74         head = 0;
75         tail = 0;
76         waitCond.signal();
77       }
78     } finally {
79       lock.unlock();
80     }
81   }
82 
add(E e)83   public void add(E e) {
84     if (e == null) throw new NullPointerException();
85 
86     lock.lock();
87     try {
88       addElement(e);
89       waitCond.signal();
90     } finally {
91       lock.unlock();
92     }
93   }
94 
95   @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
poll()96   public E poll() {
97     lock.lock();
98     try {
99       if (isEmpty()) {
100         waitCond.await();
101         return null;
102       }
103 
104       E elem = objects[head];
105       long nanos = getNanosTimeout(elem);
106       nanos = waitCond.awaitNanos(nanos);
107       return nanos > 0 ? null : removeFirst();
108     } catch (InterruptedException e) {
109       Thread.currentThread().interrupt();
110       return null;
111     } finally {
112       lock.unlock();
113     }
114   }
115 
size()116   public int size() {
117     return tail - head;
118   }
119 
isEmpty()120   public boolean isEmpty() {
121     return (tail - head) == 0;
122   }
123 
signalAll()124   public void signalAll() {
125     lock.lock();
126     try {
127       waitCond.signalAll();
128     } finally {
129       lock.unlock();
130     }
131   }
132 
addElement(E elem)133   private void addElement(E elem) {
134     int size = (tail - head);
135     if ((objects.length - size) == 0) {
136       int capacity = size + ((size < 64) ? (size + 2) : (size >> 1));
137       E[] newObjects = (E[])new Object[capacity];
138 
139       if (compareTimeouts(objects[tail - 1], elem) <= 0) {
140         // Append
141         System.arraycopy(objects, head, newObjects, 0, tail);
142         tail -= head;
143         newObjects[tail++] = elem;
144       } else if (compareTimeouts(objects[head], elem) > 0) {
145         // Prepend
146         System.arraycopy(objects, head, newObjects, 1, tail);
147         newObjects[0] = elem;
148         tail -= (head - 1);
149       } else {
150         // Insert in the middle
151         int index = upperBound(head, tail - 1, elem);
152         int newIndex = (index - head);
153         System.arraycopy(objects, head, newObjects, 0, newIndex);
154         newObjects[newIndex] = elem;
155         System.arraycopy(objects, index, newObjects, newIndex + 1, tail - index);
156         tail -= (head - 1);
157       }
158       head = 0;
159       objects = newObjects;
160     } else {
161       if (tail == objects.length) {
162         // shift down |-----AAAAAAA|
163         tail -= head;
164         System.arraycopy(objects, head, objects, 0, tail);
165         head = 0;
166       }
167 
168       if (tail == head || compareTimeouts(objects[tail - 1], elem) <= 0) {
169         // Append
170         objects[tail++] = elem;
171       } else if (head > 0 && compareTimeouts(objects[head], elem) > 0) {
172         // Prepend
173         objects[--head] = elem;
174       } else {
175         // Insert in the middle
176         int index = upperBound(head, tail - 1, elem);
177         System.arraycopy(objects, index, objects, index + 1, tail - index);
178         objects[index] = elem;
179         tail++;
180       }
181     }
182   }
183 
removeFirst()184   private E removeFirst() {
185     E elem = objects[head];
186     objects[head] = null;
187     head = (head + 1) % objects.length;
188     if (head == 0) tail = 0;
189     return elem;
190   }
191 
upperBound(int start, int end, E key)192   private int upperBound(int start, int end, E key) {
193     while (start < end) {
194       int mid = (start + end) >>> 1;
195       E mitem = objects[mid];
196       int cmp = compareTimeouts(mitem, key);
197       if (cmp > 0) {
198         end = mid;
199       } else {
200         start = mid + 1;
201       }
202     }
203     return start;
204   }
205 
compareTimeouts(final E a, final E b)206   private int compareTimeouts(final E a, final E b) {
207     long t1 = getNanosTimeout(a);
208     long t2 = getNanosTimeout(b);
209     return (t1 < t2) ? -1 : (t1 > t2) ? 1 : 0;
210   }
211 
getNanosTimeout(final E obj)212   private long getNanosTimeout(final E obj) {
213     TimeUnit unit = timeoutRetriever.getTimeUnit(obj);
214     long timeout = timeoutRetriever.getTimeout(obj);
215     return unit.toNanos(timeout);
216   }
217 }
218