1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 package org.apache.guacamole.tunnel; 21 22 import java.io.Closeable; 23 import java.io.IOException; 24 import java.util.concurrent.ConcurrentHashMap; 25 import java.util.concurrent.ConcurrentMap; 26 import org.slf4j.Logger; 27 import org.slf4j.LoggerFactory; 28 29 /** 30 * Map-like storage for intercepted Guacamole streams. 31 * 32 * @param <T> 33 * The type of object which will produce or consume the data sent over the 34 * intercepted Guacamole stream. Usually, this will be either InputStream 35 * or OutputStream. 36 */ 37 public class InterceptedStreamMap<T extends Closeable> { 38 39 /** 40 * Logger for this class. 41 */ 42 private static final Logger logger = LoggerFactory.getLogger(InterceptedStreamMap.class); 43 44 /** 45 * The maximum number of milliseconds to wait for notification that a 46 * stream has closed before explicitly checking for closure ourselves. 47 */ 48 private static final long STREAM_WAIT_TIMEOUT = 1000; 49 50 /** 51 * Mapping of the indexes of all streams whose associated "blob" and "end" 52 * instructions should be intercepted. 53 */ 54 private final ConcurrentMap<String, InterceptedStream<T>> streams = 55 new ConcurrentHashMap<String, InterceptedStream<T>>(); 56 57 /** 58 * Closes the given stream, logging any errors that occur during closure. 59 * The monitor of the stream is notified via a single call to notify() once 60 * the attempt to close has been made. 61 * 62 * @param stream 63 * The stream to close and notify. 64 */ close(T stream)65 private void close(T stream) { 66 67 // Attempt to close stream 68 try { 69 stream.close(); 70 } 71 catch (IOException e) { 72 logger.warn("Unable to close intercepted stream: {}", e.getMessage()); 73 logger.debug("I/O error prevented closure of intercepted stream.", e); 74 } 75 76 // Notify waiting threads that the stream has ended 77 synchronized (stream) { 78 stream.notify(); 79 } 80 81 } 82 83 /** 84 * Closes the stream object associated with the stream having the given 85 * index, if any, removing it from the map, logging any errors that occur 86 * during closure, and unblocking any in-progress calls to waitFor() for 87 * that stream. If no such stream exists within this map, then this 88 * function has no effect. 89 * 90 * @param index 91 * The index of the stream whose associated stream object should be 92 * closed. 93 * 94 * @return 95 * The stream associated with the given index, if the stream was stored 96 * within this map, or null if no such stream exists. 97 */ close(String index)98 public InterceptedStream<T> close(String index) { 99 100 // Remove associated stream 101 InterceptedStream<T> stream = streams.remove(index); 102 if (stream == null) 103 return null; 104 105 // Close stream if it exists 106 close(stream.getStream()); 107 return stream; 108 109 } 110 111 /** 112 * Closes the given stream, logging any errors that occur during closure, 113 * and unblocking any in-progress calls to waitFor() for the given stream. 114 * If the given stream is stored within this map, it will also be removed. 115 * 116 * @param stream 117 * The stream to close. 118 * 119 * @return 120 * true if the given stream was stored within this map, false 121 * otherwise. 122 */ close(InterceptedStream<T> stream)123 public boolean close(InterceptedStream<T> stream) { 124 125 // Remove stream if present 126 boolean wasRemoved = streams.remove(stream.getIndex(), stream); 127 128 // Close provided stream 129 close(stream.getStream()); 130 131 return wasRemoved; 132 133 } 134 135 /** 136 * Removes and closes all streams stored within this map, logging any errors 137 * that occur during closure, and unblocking any in-progress calls to 138 * waitFor(). 139 */ closeAll()140 public void closeAll() { 141 142 // Close any active streams 143 for (InterceptedStream<T> stream : streams.values()) 144 close(stream.getStream()); 145 146 // Remove now-useless references 147 streams.clear(); 148 149 } 150 151 /** 152 * Blocks until the given stream is closed, or until another stream with 153 * the same index replaces it. 154 * 155 * @param stream 156 * The stream to wait for. 157 */ waitFor(InterceptedStream<T> stream)158 public void waitFor(InterceptedStream<T> stream) { 159 160 T underlyingStream = stream.getStream(); 161 162 // Wait for stream to close 163 synchronized (underlyingStream) { 164 while (streams.get(stream.getIndex()) == stream) { 165 try { 166 underlyingStream.wait(STREAM_WAIT_TIMEOUT); 167 } 168 catch (InterruptedException e) { 169 // Ignore 170 } 171 } 172 } 173 174 } 175 176 /** 177 * Returns the stream stored in this map under the given index. 178 * 179 * @param index 180 * The index of the stream to return. 181 * 182 * @return 183 * The stream having the given index, or null if no such stream is 184 * stored within this map. 185 */ get(String index)186 public InterceptedStream<T> get(String index) { 187 return streams.get(index); 188 } 189 190 /** 191 * Adds the given stream to this map, storing it under its associated 192 * index. If another stream already exists within this map having the same 193 * index, that stream will be closed and replaced. 194 * 195 * @param stream 196 * The stream to store within this map. 197 */ put(InterceptedStream<T> stream)198 public void put(InterceptedStream<T> stream) { 199 200 // Add given stream to map 201 InterceptedStream<T> oldStream = 202 streams.put(stream.getIndex(), stream); 203 204 // If a previous stream DID exist, close it 205 if (oldStream != null) 206 close(oldStream.getStream()); 207 208 } 209 210 } 211