1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.metrics2.impl;
20 
21 import java.util.ConcurrentModificationException;
22 
23 /**
24  * A half-blocking (nonblocking for producers, blocking for consumers) queue
25  * for metrics sinks.
26  *
27  * New elements are dropped when the queue is full to preserve "interesting"
28  * elements at the onset of queue filling events
29  */
30 class SinkQueue<T> {
31   // A fixed size circular buffer to minimize garbage
32   private final T[] data;
33   private int head; // head position
34   private int tail; // tail position
35   private int size; // number of elements
36   private Thread currentConsumer = null;
37 
38   @SuppressWarnings("unchecked")
SinkQueue(int capacity)39   SinkQueue(int capacity) {
40     this.data = (T[]) new Object[Math.max(1, capacity)];
41     head = tail = size = 0;
42   }
43 
enqueue(T e)44   synchronized boolean enqueue(T e) {
45     if (data.length == size) {
46       return false;
47     }
48     ++size;
49     tail = (tail + 1) % data.length;
50     data[tail] = e;
51     notify();
52     return true;
53   }
54 
55   /**
56    * Consume one element, will block if queue is empty
57    * Only one consumer at a time is allowed
58    * @param consumer  the consumer callback object
59    */
consume(Consumer<T> consumer)60   void consume(Consumer<T> consumer) throws InterruptedException {
61     T e = waitForData();
62 
63     try {
64       consumer.consume(e);  // can take forever
65       _dequeue();
66     }
67     finally {
68       clearConsumer();
69     }
70   }
71 
72   /**
73    * Consume all the elements, will block if queue is empty
74    * @param consumer  the consumer callback object
75    * @throws InterruptedException
76    */
consumeAll(Consumer<T> consumer)77   void consumeAll(Consumer<T> consumer) throws InterruptedException {
78     waitForData();
79 
80     try {
81       for (int i = size(); i-- > 0; ) {
82         consumer.consume(front()); // can take forever
83         _dequeue();
84       }
85     }
86     finally {
87       clearConsumer();
88     }
89   }
90 
91   /**
92    * Dequeue one element from head of the queue, will block if queue is empty
93    * @return  the first element
94    * @throws InterruptedException
95    */
dequeue()96   synchronized T dequeue() throws InterruptedException {
97     checkConsumer();
98 
99     while (0 == size) {
100       wait();
101     }
102     return _dequeue();
103   }
104 
waitForData()105   private synchronized T waitForData() throws InterruptedException {
106     checkConsumer();
107 
108     while (0 == size) {
109       wait();
110     }
111     currentConsumer = Thread.currentThread();
112     return front();
113   }
114 
checkConsumer()115   private synchronized void checkConsumer() {
116     if (currentConsumer != null) {
117       throw new ConcurrentModificationException("The "+
118           currentConsumer.getName() +" thread is consuming the queue.");
119     }
120   }
121 
clearConsumer()122   private synchronized void clearConsumer() {
123     currentConsumer = null;
124   }
125 
_dequeue()126   private synchronized T _dequeue() {
127     if (0 == size) {
128       throw new IllegalStateException("Size must > 0 here.");
129     }
130     --size;
131     head = (head + 1) % data.length;
132     T ret = data[head];
133     data[head] = null;  // hint to gc
134     return ret;
135   }
136 
front()137   synchronized T front() {
138     return data[(head + 1) % data.length];
139   }
140 
back()141   synchronized T back() {
142     return data[tail];
143   }
144 
clear()145   synchronized void clear() {
146     checkConsumer();
147 
148     for (int i = data.length; i-- > 0; ) {
149       data[i] = null;
150     }
151     size = 0;
152   }
153 
size()154   synchronized int size() {
155     return size;
156   }
157 
capacity()158   int capacity() {
159     return data.length;
160   }
161 
162 }
163