1 /* Copyright (C) 2002-2005 RealVNC Ltd.  All Rights Reserved.
2  * Copyright (C) 2012-2019 Brian P. Hinz
3  *
4  * This is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * This software is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this software; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301,
17  * USA.
18  */
19 
20 package com.tigervnc.rdr;
21 
22 import java.nio.*;
23 import java.nio.channels.Selector;
24 import java.nio.channels.SelectionKey;
25 import java.util.Set;
26 import java.util.Iterator;
27 
28 import com.tigervnc.network.*;
29 
30 public class FdInStream extends InStream {
31 
32   static final int DEFAULT_BUF_SIZE = 16384;
33   static final int minBulkSize = 1024;
34 
FdInStream(FileDescriptor fd_, int timeoutms_, int bufSize_, boolean closeWhenDone_)35   public FdInStream(FileDescriptor fd_, int timeoutms_, int bufSize_,
36                     boolean closeWhenDone_)
37   {
38     fd = fd_; closeWhenDone = closeWhenDone_;
39     timeoutms = timeoutms_; blockCallback = null;
40     timing = false; timeWaitedIn100us = 5; timedKbits = 0;
41     bufSize = ((bufSize_ > 0) ? bufSize_ : DEFAULT_BUF_SIZE);
42     b = new byte[bufSize];
43     ptr = end = offset = 0;
44   }
45 
FdInStream(FileDescriptor fd_)46   public FdInStream(FileDescriptor fd_) { this(fd_, -1, 0, false); }
47 
FdInStream(FileDescriptor fd_, FdInStreamBlockCallback blockCallback_, int bufSize_)48   public FdInStream(FileDescriptor fd_, FdInStreamBlockCallback blockCallback_,
49                     int bufSize_)
50   {
51     fd = fd_; timeoutms = 0; blockCallback = blockCallback_;
52     timing = false; timeWaitedIn100us = 5; timedKbits = 0;
53     bufSize = ((bufSize_ > 0) ? bufSize_ : DEFAULT_BUF_SIZE);
54     b = new byte[bufSize];
55     ptr = end = offset = 0;
56   }
57 
FdInStream(FileDescriptor fd_, FdInStreamBlockCallback blockCallback_)58   public FdInStream(FileDescriptor fd_,
59                     FdInStreamBlockCallback blockCallback_) {
60     this(fd_, blockCallback_, 0);
61   }
62 
readBytes(ByteBuffer data, int length)63   public final void readBytes(ByteBuffer data, int length) {
64     if (length < minBulkSize) {
65       super.readBytes(data, length);
66       return;
67     }
68 
69     int dataPtr = data.position();
70 
71     int n = end - ptr;
72     if (n > length) n = length;
73 
74     data.put(b, ptr, n);
75     dataPtr += n;
76     length -= n;
77     ptr += n;
78 
79     while (length > 0) {
80       n = readWithTimeoutOrCallback(data, length);
81       dataPtr += n;
82       length -= n;
83       offset += n;
84     }
85   }
86 
setTimeout(int timeoutms_)87   public void setTimeout(int timeoutms_) {
88     timeoutms = timeoutms_;
89   }
90 
setBlockCallback(FdInStreamBlockCallback blockCallback_)91   public void setBlockCallback(FdInStreamBlockCallback blockCallback_)
92   {
93     blockCallback = blockCallback_;
94     timeoutms = 0;
95   }
96 
pos()97   public final int pos() { return offset + ptr; }
98 
startTiming()99   public final void startTiming() {
100     timing = true;
101 
102     // Carry over up to 1s worth of previous rate for smoothing.
103 
104     if (timeWaitedIn100us > 10000) {
105       timedKbits = timedKbits * 10000 / timeWaitedIn100us;
106       timeWaitedIn100us = 10000;
107     }
108   }
109 
stopTiming()110   public final void stopTiming() {
111     timing = false;
112     if (timeWaitedIn100us < timedKbits/2)
113       timeWaitedIn100us = timedKbits/2; // upper limit 20Mbit/s
114   }
115 
kbitsPerSecond()116   public final long kbitsPerSecond() {
117     return timedKbits * 10000 / timeWaitedIn100us;
118   }
119 
timeWaited()120   public final long timeWaited() { return timeWaitedIn100us; }
121 
overrun(int itemSize, int nItems, boolean wait)122   protected int overrun(int itemSize, int nItems, boolean wait)
123   {
124     if (itemSize > bufSize)
125       throw new Exception("FdInStream overrun: max itemSize exceeded");
126 
127     if (end - ptr != 0)
128       System.arraycopy(b, ptr, b, 0, end - ptr);
129 
130     offset += ptr;
131     end -= ptr;
132     ptr = 0;
133 
134     int bytes_to_read;
135     while (end < itemSize) {
136       bytes_to_read = bufSize - end;
137       if (!timing) {
138         // When not timing, we must be careful not to read too much
139         // extra data into the buffer. Otherwise, the line speed
140         // estimation might stay at zero for a long time: All reads
141         // during timing=1 can be satisfied without calling
142         // readWithTimeoutOrCallback. However, reading only 1 or 2 bytes
143         // bytes is ineffecient.
144         bytes_to_read = Math.min(bytes_to_read, Math.max(itemSize*nItems, 8));
145       }
146       Buffer buf = ByteBuffer.wrap(b).position(end);
147       int n = readWithTimeoutOrCallback((ByteBuffer)buf, bytes_to_read, wait);
148       if (n == 0) return 0;
149       end += n;
150     }
151 
152     int nAvail;
153     nAvail = (end - ptr) / itemSize;
154     if (nAvail < nItems)
155       return nAvail;
156 
157     return nItems;
158   }
159 
readWithTimeoutOrCallback(ByteBuffer buf, int len, boolean wait)160   protected int readWithTimeoutOrCallback(ByteBuffer buf, int len, boolean wait) {
161     long before = 0;
162     if (timing)
163       before = System.nanoTime();
164 
165     int n;
166     while (true) {
167       do {
168         Integer tv;
169 
170         if (!wait) {
171           tv = new Integer(0);
172         } else if (timeoutms != -1) {
173           tv = new Integer(timeoutms);
174         } else {
175           tv = null;
176         }
177 
178         try {
179           n = fd.select(SelectionKey.OP_READ, tv);
180         } catch (Exception e) {
181           throw new SystemException("select:"+e.toString());
182         }
183       } while (n < 0);
184 
185 
186       if (n > 0) break;
187       if (!wait) return 0;
188       if (blockCallback == null) throw new TimedOut();
189 
190       blockCallback.blockCallback();
191     }
192 
193     try {
194       n = fd.read(buf, len);
195     } catch (Exception e) {
196       throw new SystemException("read:"+e.toString());
197     }
198 
199     if (n == 0) throw new EndOfStream();
200 
201     if (timing) {
202       long after = System.nanoTime();
203       long newTimeWaited = (after - before) / 100000;
204       int newKbits = n * 8 / 1000;
205 
206       // limit rate to between 10kbit/s and 40Mbit/s
207 
208       if (newTimeWaited > newKbits*1000) {
209         newTimeWaited = newKbits*1000;
210       } else if (newTimeWaited < newKbits/4) {
211         newTimeWaited = newKbits/4;
212       }
213 
214       timeWaitedIn100us += newTimeWaited;
215       timedKbits += newKbits;
216     }
217 
218     return n;
219   }
220 
readWithTimeoutOrCallback(ByteBuffer buf, int len)221   private int readWithTimeoutOrCallback(ByteBuffer buf, int len) {
222     return readWithTimeoutOrCallback(buf, len, true);
223   }
224 
getFd()225   public FileDescriptor getFd() {
226     return fd;
227   }
228 
setFd(FileDescriptor fd_)229   public void setFd(FileDescriptor fd_) {
230     fd = fd_;
231   }
232 
getBufSize()233   public int getBufSize() {
234     return bufSize;
235   }
236 
237   private FileDescriptor fd;
238   boolean closeWhenDone;
239   protected int timeoutms;
240   private FdInStreamBlockCallback blockCallback;
241   private int offset;
242   private int bufSize;
243 
244   protected boolean timing;
245   protected long timeWaitedIn100us;
246   protected long timedKbits;
247 }
248