1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.hbase.procedure2.util; 20 21 import java.util.concurrent.locks.Condition; 22 import java.util.concurrent.locks.ReentrantLock; 23 import java.util.concurrent.TimeUnit; 24 25 import org.apache.hadoop.hbase.classification.InterfaceAudience; 26 import org.apache.hadoop.hbase.classification.InterfaceStability; 27 28 @InterfaceAudience.Private 29 @InterfaceStability.Evolving 30 public class TimeoutBlockingQueue<E> { 31 public static interface TimeoutRetriever<T> { getTimeout(T object)32 long getTimeout(T object); getTimeUnit(T object)33 TimeUnit getTimeUnit(T object); 34 } 35 36 private final ReentrantLock lock = new ReentrantLock(); 37 private final Condition waitCond = lock.newCondition(); 38 private final TimeoutRetriever<? super E> timeoutRetriever; 39 40 private E[] objects; 41 private int head = 0; 42 private int tail = 0; 43 TimeoutBlockingQueue(TimeoutRetriever<? super E> timeoutRetriever)44 public TimeoutBlockingQueue(TimeoutRetriever<? super E> timeoutRetriever) { 45 this(32, timeoutRetriever); 46 } 47 48 @SuppressWarnings("unchecked") TimeoutBlockingQueue(int capacity, TimeoutRetriever<? super E> timeoutRetriever)49 public TimeoutBlockingQueue(int capacity, TimeoutRetriever<? super E> timeoutRetriever) { 50 this.objects = (E[])new Object[capacity]; 51 this.timeoutRetriever = timeoutRetriever; 52 } 53 dump()54 public void dump() { 55 for (int i = 0; i < objects.length; ++i) { 56 if (i == head) { 57 System.out.print("[" + objects[i] + "] "); 58 } else if (i == tail) { 59 System.out.print("]" + objects[i] + "[ "); 60 } else { 61 System.out.print(objects[i] + " "); 62 } 63 } 64 System.out.println(); 65 } 66 clear()67 public void clear() { 68 lock.lock(); 69 try { 70 if (head != tail) { 71 for (int i = head; i < tail; ++i) { 72 objects[i] = null; 73 } 74 head = 0; 75 tail = 0; 76 waitCond.signal(); 77 } 78 } finally { 79 lock.unlock(); 80 } 81 } 82 add(E e)83 public void add(E e) { 84 if (e == null) throw new NullPointerException(); 85 86 lock.lock(); 87 try { 88 addElement(e); 89 waitCond.signal(); 90 } finally { 91 lock.unlock(); 92 } 93 } 94 95 @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") poll()96 public E poll() { 97 lock.lock(); 98 try { 99 if (isEmpty()) { 100 waitCond.await(); 101 return null; 102 } 103 104 E elem = objects[head]; 105 long nanos = getNanosTimeout(elem); 106 nanos = waitCond.awaitNanos(nanos); 107 return nanos > 0 ? null : removeFirst(); 108 } catch (InterruptedException e) { 109 Thread.currentThread().interrupt(); 110 return null; 111 } finally { 112 lock.unlock(); 113 } 114 } 115 size()116 public int size() { 117 return tail - head; 118 } 119 isEmpty()120 public boolean isEmpty() { 121 return (tail - head) == 0; 122 } 123 signalAll()124 public void signalAll() { 125 lock.lock(); 126 try { 127 waitCond.signalAll(); 128 } finally { 129 lock.unlock(); 130 } 131 } 132 addElement(E elem)133 private void addElement(E elem) { 134 int size = (tail - head); 135 if ((objects.length - size) == 0) { 136 int capacity = size + ((size < 64) ? (size + 2) : (size >> 1)); 137 E[] newObjects = (E[])new Object[capacity]; 138 139 if (compareTimeouts(objects[tail - 1], elem) <= 0) { 140 // Append 141 System.arraycopy(objects, head, newObjects, 0, tail); 142 tail -= head; 143 newObjects[tail++] = elem; 144 } else if (compareTimeouts(objects[head], elem) > 0) { 145 // Prepend 146 System.arraycopy(objects, head, newObjects, 1, tail); 147 newObjects[0] = elem; 148 tail -= (head - 1); 149 } else { 150 // Insert in the middle 151 int index = upperBound(head, tail - 1, elem); 152 int newIndex = (index - head); 153 System.arraycopy(objects, head, newObjects, 0, newIndex); 154 newObjects[newIndex] = elem; 155 System.arraycopy(objects, index, newObjects, newIndex + 1, tail - index); 156 tail -= (head - 1); 157 } 158 head = 0; 159 objects = newObjects; 160 } else { 161 if (tail == objects.length) { 162 // shift down |-----AAAAAAA| 163 tail -= head; 164 System.arraycopy(objects, head, objects, 0, tail); 165 head = 0; 166 } 167 168 if (tail == head || compareTimeouts(objects[tail - 1], elem) <= 0) { 169 // Append 170 objects[tail++] = elem; 171 } else if (head > 0 && compareTimeouts(objects[head], elem) > 0) { 172 // Prepend 173 objects[--head] = elem; 174 } else { 175 // Insert in the middle 176 int index = upperBound(head, tail - 1, elem); 177 System.arraycopy(objects, index, objects, index + 1, tail - index); 178 objects[index] = elem; 179 tail++; 180 } 181 } 182 } 183 removeFirst()184 private E removeFirst() { 185 E elem = objects[head]; 186 objects[head] = null; 187 head = (head + 1) % objects.length; 188 if (head == 0) tail = 0; 189 return elem; 190 } 191 upperBound(int start, int end, E key)192 private int upperBound(int start, int end, E key) { 193 while (start < end) { 194 int mid = (start + end) >>> 1; 195 E mitem = objects[mid]; 196 int cmp = compareTimeouts(mitem, key); 197 if (cmp > 0) { 198 end = mid; 199 } else { 200 start = mid + 1; 201 } 202 } 203 return start; 204 } 205 compareTimeouts(final E a, final E b)206 private int compareTimeouts(final E a, final E b) { 207 long t1 = getNanosTimeout(a); 208 long t2 = getNanosTimeout(b); 209 return (t1 < t2) ? -1 : (t1 > t2) ? 1 : 0; 210 } 211 getNanosTimeout(final E obj)212 private long getNanosTimeout(final E obj) { 213 TimeUnit unit = timeoutRetriever.getTimeUnit(obj); 214 long timeout = timeoutRetriever.getTimeout(obj); 215 return unit.toNanos(timeout); 216 } 217 } 218