1 /*
2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.
8  *
9  * This code is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * version 2 for more details (a copy is included in the LICENSE file that
13  * accompanied this code).
14  *
15  * You should have received a copy of the GNU General Public License version
16  * 2 along with this work; if not, write to the Free Software Foundation,
17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
18  *
19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
20  * or visit www.oracle.com if you need additional information or have any
21  * questions.
22  */
23 
24 import java.io.IOException;
25 import java.util.LinkedList;
26 import java.util.Objects;
27 import java.util.stream.Stream;
28 
29 // Each stream has one of these for input. Each Http2Connection has one
30 // for output. Can be used blocking or asynchronously.
31 
32 public class Queue<T> implements ExceptionallyCloseable {
33 
34     private final LinkedList<T> q = new LinkedList<>();
35     private boolean closed = false;
36     private boolean closing = false;
37     private Throwable exception = null;
38     private int waiters; // true if someone waiting
39     private final T closeSentinel;
40 
Queue(T closeSentinel)41     Queue(T closeSentinel) {
42         this.closeSentinel = Objects.requireNonNull(closeSentinel);
43     }
44 
size()45     public synchronized int size() {
46         return q.size();
47     }
48 
isClosed()49     public synchronized boolean isClosed() {
50         return closed;
51     }
52 
isClosing()53     public synchronized boolean isClosing() {
54         return closing;
55     }
56 
put(T obj)57     public synchronized void put(T obj) throws IOException {
58         Objects.requireNonNull(obj);
59         if (closed || closing) {
60             throw new IOException("stream closed");
61         }
62 
63         q.add(obj);
64 
65         if (waiters > 0) {
66             notifyAll();
67         }
68     }
69 
70     // Other close() variants are immediate and abortive
71     // This allows whatever is on Q to be processed first.
72 
orderlyClose()73     public synchronized void orderlyClose() {
74         if (closing || closed)
75             return;
76 
77         try {
78             put(closeSentinel);
79         } catch (IOException e) {
80             e.printStackTrace();
81         }
82         closing = true;
83     }
84 
85     @Override
close()86     public synchronized void close() {
87         if (closed)
88             return;
89         closed = true;
90         notifyAll();
91     }
92 
93     @Override
closeExceptionally(Throwable t)94     public synchronized void closeExceptionally(Throwable t) {
95         if (exception == null) exception = t;
96         else if (t != null && t != exception) {
97             if (!Stream.of(exception.getSuppressed())
98                 .filter(x -> x == t)
99                 .findFirst()
100                 .isPresent())
101             {
102                 exception.addSuppressed(t);
103             }
104         }
105         close();
106     }
107 
take()108     public synchronized T take() throws IOException {
109         if (closed) {
110             throw newIOException("stream closed");
111         }
112         try {
113             while (q.size() == 0) {
114                 waiters++;
115                 wait();
116                 if (closed) {
117                     throw newIOException("Queue closed");
118                 }
119                 waiters--;
120             }
121             T item = q.removeFirst();
122             if (item.equals(closeSentinel)) {
123                 closed = true;
124                 assert q.isEmpty();
125                 return null;
126             }
127             return item;
128         } catch (InterruptedException ex) {
129             throw new IOException(ex);
130         }
131     }
132 
poll()133     public synchronized T poll() throws IOException {
134         if (closed) {
135             throw newIOException("stream closed");
136         }
137 
138         if (q.isEmpty()) {
139             return null;
140         }
141         return take();
142     }
143 
newIOException(String msg)144     private IOException newIOException(String msg) {
145         if (exception == null) {
146             return new IOException(msg);
147         } else {
148             return new IOException(msg, exception);
149         }
150     }
151 }
152