1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.metrics2.impl; 20 21 import java.util.ConcurrentModificationException; 22 23 /** 24 * A half-blocking (nonblocking for producers, blocking for consumers) queue 25 * for metrics sinks. 26 * 27 * New elements are dropped when the queue is full to preserve "interesting" 28 * elements at the onset of queue filling events 29 */ 30 class SinkQueue<T> { 31 // A fixed size circular buffer to minimize garbage 32 private final T[] data; 33 private int head; // head position 34 private int tail; // tail position 35 private int size; // number of elements 36 private Thread currentConsumer = null; 37 38 @SuppressWarnings("unchecked") SinkQueue(int capacity)39 SinkQueue(int capacity) { 40 this.data = (T[]) new Object[Math.max(1, capacity)]; 41 head = tail = size = 0; 42 } 43 enqueue(T e)44 synchronized boolean enqueue(T e) { 45 if (data.length == size) { 46 return false; 47 } 48 ++size; 49 tail = (tail + 1) % data.length; 50 data[tail] = e; 51 notify(); 52 return true; 53 } 54 55 /** 56 * Consume one element, will block if queue is empty 57 * Only one consumer at a time is allowed 58 * @param consumer the consumer callback object 59 */ consume(Consumer<T> consumer)60 void consume(Consumer<T> consumer) throws InterruptedException { 61 T e = waitForData(); 62 63 try { 64 consumer.consume(e); // can take forever 65 _dequeue(); 66 } 67 finally { 68 clearConsumer(); 69 } 70 } 71 72 /** 73 * Consume all the elements, will block if queue is empty 74 * @param consumer the consumer callback object 75 * @throws InterruptedException 76 */ consumeAll(Consumer<T> consumer)77 void consumeAll(Consumer<T> consumer) throws InterruptedException { 78 waitForData(); 79 80 try { 81 for (int i = size(); i-- > 0; ) { 82 consumer.consume(front()); // can take forever 83 _dequeue(); 84 } 85 } 86 finally { 87 clearConsumer(); 88 } 89 } 90 91 /** 92 * Dequeue one element from head of the queue, will block if queue is empty 93 * @return the first element 94 * @throws InterruptedException 95 */ dequeue()96 synchronized T dequeue() throws InterruptedException { 97 checkConsumer(); 98 99 while (0 == size) { 100 wait(); 101 } 102 return _dequeue(); 103 } 104 waitForData()105 private synchronized T waitForData() throws InterruptedException { 106 checkConsumer(); 107 108 while (0 == size) { 109 wait(); 110 } 111 currentConsumer = Thread.currentThread(); 112 return front(); 113 } 114 checkConsumer()115 private synchronized void checkConsumer() { 116 if (currentConsumer != null) { 117 throw new ConcurrentModificationException("The "+ 118 currentConsumer.getName() +" thread is consuming the queue."); 119 } 120 } 121 clearConsumer()122 private synchronized void clearConsumer() { 123 currentConsumer = null; 124 } 125 _dequeue()126 private synchronized T _dequeue() { 127 if (0 == size) { 128 throw new IllegalStateException("Size must > 0 here."); 129 } 130 --size; 131 head = (head + 1) % data.length; 132 T ret = data[head]; 133 data[head] = null; // hint to gc 134 return ret; 135 } 136 front()137 synchronized T front() { 138 return data[(head + 1) % data.length]; 139 } 140 back()141 synchronized T back() { 142 return data[tail]; 143 } 144 clear()145 synchronized void clear() { 146 checkConsumer(); 147 148 for (int i = data.length; i-- > 0; ) { 149 data[i] = null; 150 } 151 size = 0; 152 } 153 size()154 synchronized int size() { 155 return size; 156 } 157 capacity()158 int capacity() { 159 return data.length; 160 } 161 162 } 163