1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 package org.apache.guacamole.tunnel;
21 
22 import java.io.Closeable;
23 import java.io.IOException;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ConcurrentMap;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28 
29 /**
30  * Map-like storage for intercepted Guacamole streams.
31  *
32  * @param <T>
33  *     The type of object which will produce or consume the data sent over the
34  *     intercepted Guacamole stream. Usually, this will be either InputStream
35  *     or OutputStream.
36  */
37 public class InterceptedStreamMap<T extends Closeable> {
38 
39     /**
40      * Logger for this class.
41      */
42     private static final Logger logger = LoggerFactory.getLogger(InterceptedStreamMap.class);
43 
44     /**
45      * The maximum number of milliseconds to wait for notification that a
46      * stream has closed before explicitly checking for closure ourselves.
47      */
48     private static final long STREAM_WAIT_TIMEOUT = 1000;
49 
50     /**
51      * Mapping of the indexes of all streams whose associated "blob" and "end"
52      * instructions should be intercepted.
53      */
54     private final ConcurrentMap<String, InterceptedStream<T>> streams =
55             new ConcurrentHashMap<String, InterceptedStream<T>>();
56 
57     /**
58      * Closes the given stream, logging any errors that occur during closure.
59      * The monitor of the stream is notified via a single call to notify() once
60      * the attempt to close has been made.
61      *
62      * @param stream
63      *     The stream to close and notify.
64      */
close(T stream)65     private void close(T stream) {
66 
67         // Attempt to close stream
68         try {
69             stream.close();
70         }
71         catch (IOException e) {
72             logger.warn("Unable to close intercepted stream: {}", e.getMessage());
73             logger.debug("I/O error prevented closure of intercepted stream.", e);
74         }
75 
76         // Notify waiting threads that the stream has ended
77         synchronized (stream) {
78             stream.notify();
79         }
80 
81     }
82 
83     /**
84      * Closes the stream object associated with the stream having the given
85      * index, if any, removing it from the map, logging any errors that occur
86      * during closure, and unblocking any in-progress calls to waitFor() for
87      * that stream. If no such stream exists within this map, then this
88      * function has no effect.
89      *
90      * @param index
91      *     The index of the stream whose associated stream object should be
92      *     closed.
93      *
94      * @return
95      *     The stream associated with the given index, if the stream was stored
96      *     within this map, or null if no such stream exists.
97      */
close(String index)98     public InterceptedStream<T> close(String index) {
99 
100         // Remove associated stream
101         InterceptedStream<T> stream = streams.remove(index);
102         if (stream == null)
103             return null;
104 
105         // Close stream if it exists
106         close(stream.getStream());
107         return stream;
108 
109     }
110 
111     /**
112      * Closes the given stream, logging any errors that occur during closure,
113      * and unblocking any in-progress calls to waitFor() for the given stream.
114      * If the given stream is stored within this map, it will also be removed.
115      *
116      * @param stream
117      *     The stream to close.
118      *
119      * @return
120      *     true if the given stream was stored within this map, false
121      *     otherwise.
122      */
close(InterceptedStream<T> stream)123     public boolean close(InterceptedStream<T> stream) {
124 
125         // Remove stream if present
126         boolean wasRemoved = streams.remove(stream.getIndex(), stream);
127 
128         // Close provided stream
129         close(stream.getStream());
130 
131         return wasRemoved;
132 
133     }
134 
135     /**
136      * Removes and closes all streams stored within this map, logging any errors
137      * that occur during closure, and unblocking any in-progress calls to
138      * waitFor().
139      */
closeAll()140     public void closeAll() {
141 
142         // Close any active streams
143         for (InterceptedStream<T> stream : streams.values())
144             close(stream.getStream());
145 
146         // Remove now-useless references
147         streams.clear();
148 
149     }
150 
151     /**
152      * Blocks until the given stream is closed, or until another stream with
153      * the same index replaces it.
154      *
155      * @param stream
156      *     The stream to wait for.
157      */
waitFor(InterceptedStream<T> stream)158     public void waitFor(InterceptedStream<T> stream) {
159 
160         T underlyingStream = stream.getStream();
161 
162         // Wait for stream to close
163         synchronized (underlyingStream) {
164             while (streams.get(stream.getIndex()) == stream) {
165                 try {
166                     underlyingStream.wait(STREAM_WAIT_TIMEOUT);
167                 }
168                 catch (InterruptedException e) {
169                     // Ignore
170                 }
171             }
172         }
173 
174     }
175 
176     /**
177      * Returns the stream stored in this map under the given index.
178      *
179      * @param index
180      *     The index of the stream to return.
181      *
182      * @return
183      *     The stream having the given index, or null if no such stream is
184      *     stored within this map.
185      */
get(String index)186     public InterceptedStream<T> get(String index) {
187         return streams.get(index);
188     }
189 
190     /**
191      * Adds the given stream to this map, storing it under its associated
192      * index. If another stream already exists within this map having the same
193      * index, that stream will be closed and replaced.
194      *
195      * @param stream
196      *     The stream to store within this map.
197      */
put(InterceptedStream<T> stream)198     public void put(InterceptedStream<T> stream) {
199 
200         // Add given stream to map
201         InterceptedStream<T> oldStream =
202                 streams.put(stream.getIndex(), stream);
203 
204         // If a previous stream DID exist, close it
205         if (oldStream != null)
206             close(oldStream.getStream());
207 
208     }
209 
210 }
211