1 /* 2 * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. 8 * 9 * This code is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 12 * version 2 for more details (a copy is included in the LICENSE file that 13 * accompanied this code). 14 * 15 * You should have received a copy of the GNU General Public License version 16 * 2 along with this work; if not, write to the Free Software Foundation, 17 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 18 * 19 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 20 * or visit www.oracle.com if you need additional information or have any 21 * questions. 22 */ 23 24 import java.io.IOException; 25 import java.util.LinkedList; 26 import java.util.Objects; 27 import java.util.stream.Stream; 28 29 // Each stream has one of these for input. Each Http2Connection has one 30 // for output. Can be used blocking or asynchronously. 31 32 public class Queue<T> implements ExceptionallyCloseable { 33 34 private final LinkedList<T> q = new LinkedList<>(); 35 private boolean closed = false; 36 private boolean closing = false; 37 private Throwable exception = null; 38 private int waiters; // true if someone waiting 39 private final T closeSentinel; 40 Queue(T closeSentinel)41 Queue(T closeSentinel) { 42 this.closeSentinel = Objects.requireNonNull(closeSentinel); 43 } 44 size()45 public synchronized int size() { 46 return q.size(); 47 } 48 isClosed()49 public synchronized boolean isClosed() { 50 return closed; 51 } 52 isClosing()53 public synchronized boolean isClosing() { 54 return closing; 55 } 56 put(T obj)57 public synchronized void put(T obj) throws IOException { 58 Objects.requireNonNull(obj); 59 if (closed || closing) { 60 throw new IOException("stream closed"); 61 } 62 63 q.add(obj); 64 65 if (waiters > 0) { 66 notifyAll(); 67 } 68 } 69 70 // Other close() variants are immediate and abortive 71 // This allows whatever is on Q to be processed first. 72 orderlyClose()73 public synchronized void orderlyClose() { 74 if (closing || closed) 75 return; 76 77 try { 78 put(closeSentinel); 79 } catch (IOException e) { 80 e.printStackTrace(); 81 } 82 closing = true; 83 } 84 85 @Override close()86 public synchronized void close() { 87 if (closed) 88 return; 89 closed = true; 90 notifyAll(); 91 } 92 93 @Override closeExceptionally(Throwable t)94 public synchronized void closeExceptionally(Throwable t) { 95 if (exception == null) exception = t; 96 else if (t != null && t != exception) { 97 if (!Stream.of(exception.getSuppressed()) 98 .filter(x -> x == t) 99 .findFirst() 100 .isPresent()) 101 { 102 exception.addSuppressed(t); 103 } 104 } 105 close(); 106 } 107 take()108 public synchronized T take() throws IOException { 109 if (closed) { 110 throw newIOException("stream closed"); 111 } 112 try { 113 while (q.size() == 0) { 114 waiters++; 115 wait(); 116 if (closed) { 117 throw newIOException("Queue closed"); 118 } 119 waiters--; 120 } 121 T item = q.removeFirst(); 122 if (item.equals(closeSentinel)) { 123 closed = true; 124 assert q.isEmpty(); 125 return null; 126 } 127 return item; 128 } catch (InterruptedException ex) { 129 throw new IOException(ex); 130 } 131 } 132 poll()133 public synchronized T poll() throws IOException { 134 if (closed) { 135 throw newIOException("stream closed"); 136 } 137 138 if (q.isEmpty()) { 139 return null; 140 } 141 return take(); 142 } 143 newIOException(String msg)144 private IOException newIOException(String msg) { 145 if (exception == null) { 146 return new IOException(msg); 147 } else { 148 return new IOException(msg, exception); 149 } 150 } 151 } 152