1 /**************************************************************************/
2 /* */
3 /* Copyright (c) 2001, 2011 NoMachine (http://www.nomachine.com) */
4 /* Copyright (c) 2008-2014 Oleksandr Shneyder <o.shneyder@phoca-gmbh.de> */
5 /* Copyright (c) 2014-2016 Ulrich Sibiller <uli42@gmx.de> */
6 /* Copyright (c) 2014-2016 Mihai Moldovan <ionic@ionic.de> */
7 /* Copyright (c) 2011-2016 Mike Gabriel <mike.gabriel@das-netzwerkteam.de>*/
8 /* Copyright (c) 2015-2016 Qindel Group (http://www.qindel.com) */
9 /* */
10 /* NXCOMP, NX protocol compression and NX extensions to this software */
11 /* are copyright of the aforementioned persons and companies. */
12 /* */
13 /* Redistribution and use of the present software is allowed according */
14 /* to terms specified in the file LICENSE.nxcomp which comes in the */
15 /* source distribution. */
16 /* */
17 /* All rights reserved. */
18 /* */
19 /* NOTE: This software has received contributions from various other */
20 /* contributors, only the core maintainers and supporters are listed as */
21 /* copyright holders. Please contact us, if you feel you should be listed */
22 /* as copyright holder, as well. */
23 /* */
24 /**************************************************************************/
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29
30 #include "ReadBuffer.h"
31
32 #include "Transport.h"
33
34 //
35 // Set the verbosity level.
36 //
37
38 #define PANIC
39 #define WARNING
40 #undef TEST
41 #undef DEBUG
42
ReadBuffer(Transport * transport)43 ReadBuffer::ReadBuffer(Transport *transport)
44
45 : transport_(transport)
46 {
47 //
48 // The read buffer will grow until
49 // reaching the maximum buffer size
50 // and then will remain stable at
51 // that size.
52 //
53
54 initialReadSize_ = READ_BUFFER_DEFAULT_SIZE;
55 maximumBufferSize_ = READ_BUFFER_DEFAULT_SIZE;
56
57 size_ = 0;
58 buffer_ = NULL;
59
60 owner_ = 1;
61 length_ = 0;
62 start_ = 0;
63
64 remaining_ = 0;
65 }
66
~ReadBuffer()67 ReadBuffer::~ReadBuffer()
68 {
69 if (owner_ == 1)
70 {
71 delete [] buffer_;
72 }
73 }
74
readMessage(const unsigned char * message,unsigned int length)75 void ReadBuffer::readMessage(const unsigned char *message, unsigned int length)
76 {
77 //
78 // To be here we must be the real owner
79 // of the buffer and there must not be
80 // pending bytes in the transport.
81 //
82
83 #ifdef TEST
84
85 if (owner_ == 0)
86 {
87 *logofs << "ReadBuffer: PANIC! Class for FD#"
88 << transport_ -> fd() << " doesn't "
89 << "appear to be the owner of the buffer "
90 << "while borrowing from the caller.\n"
91 << logofs_flush;
92
93 HandleCleanup();
94 }
95
96 #endif
97
98 //
99 // Be sure that any outstanding data from
100 // the transport is appended to our own
101 // byffer.
102 //
103
104 if (transport_ -> pending() != 0)
105 {
106 #ifdef WARNING
107 *logofs << "ReadBuffer: WARNING! Class for FD#"
108 << transport_ -> fd() << " has pending "
109 << "data in the transport while "
110 << "borrowing from the caller.\n"
111 << logofs_flush;
112 #endif
113
114 readMessage();
115
116 if (owner_ == 0)
117 {
118 convertBuffer();
119 }
120 }
121
122 //
123 // Can't borrow the buffer if there is data
124 // from a partial message. In this case add
125 // the new data to the end of our buffer.
126 //
127
128 if (length_ == 0)
129 {
130 #ifdef TEST
131 *logofs << "ReadBuffer: Borrowing " << length
132 << " bytes from the caller for FD#"
133 << transport_ -> fd() << " with "
134 << length_ << " bytes in the buffer.\n"
135 << logofs_flush;
136 #endif
137
138 delete [] buffer_;
139
140 buffer_ = (unsigned char *) message;
141 size_ = length;
142
143 length_ = length;
144
145 owner_ = 0;
146 start_ = 0;
147 }
148 else
149 {
150 #ifdef TEST
151 *logofs << "ReadBuffer: Appending " << length
152 << " bytes from the caller for FD#"
153 << transport_ -> fd() << " with "
154 << length_ << " bytes in the buffer.\n"
155 << logofs_flush;
156 #endif
157
158 appendBuffer(message, length);
159 }
160 }
161
readMessage()162 int ReadBuffer::readMessage()
163 {
164 int pendingLength = transport_ -> pending();
165
166 if (pendingLength > 0)
167 {
168 //
169 // Can't move the data in the borrowed buffer,
170 // so use the tansport buffer only if we don't
171 // have any partial message. This can happen
172 // with the proxy where we need to deflate the
173 // stream.
174 //
175
176 if (length_ == 0)
177 {
178 unsigned char *newBuffer;
179
180 length_ = transport_ -> getPending(newBuffer);
181
182 if (newBuffer == NULL)
183 {
184 #ifdef PANIC
185 *logofs << "ReadBuffer: PANIC! Failed to borrow "
186 << length_ << " bytes of memory for buffer "
187 << "in context [A].\n" << logofs_flush;
188 #endif
189
190 cerr << "Error" << ": Failed to borrow memory for "
191 << "read buffer in context [A].\n";
192
193 HandleCleanup();
194 }
195
196 delete [] buffer_;
197
198 buffer_ = newBuffer;
199 size_ = length_;
200
201 owner_ = 0;
202 start_ = 0;
203
204 #ifdef TEST
205 *logofs << "ReadBuffer: Borrowed " << length_
206 << " pending bytes for FD#" << transport_ ->
207 fd() << ".\n" << logofs_flush;
208 #endif
209
210 return length_;
211 }
212 #ifdef TEST
213 else
214 {
215 *logofs << "ReadBuffer: WARNING! Cannot borrow "
216 << pendingLength << " bytes for FD#"
217 << transport_ -> fd() << " with "
218 << length_ << " bytes in the buffer.\n"
219 << logofs_flush;
220 }
221 #endif
222 }
223
224 unsigned int readLength = suggestedLength(pendingLength);
225
226 #ifdef DEBUG
227 *logofs << "ReadBuffer: Requested " << readLength
228 << " bytes for FD#" << transport_ -> fd()
229 << " with readable " << transport_ -> readable()
230 << " remaining " << remaining_ << " pending "
231 << transport_ -> pending() << ".\n"
232 << logofs_flush;
233 #endif
234
235 if (readLength < initialReadSize_)
236 {
237 readLength = initialReadSize_;
238 }
239
240 #ifdef DEBUG
241 *logofs << "ReadBuffer: Buffer size is " << size_
242 << " length " << length_ << " and start "
243 << start_ << ".\n" << logofs_flush;
244 #endif
245
246 //
247 // We can't use the transport buffer
248 // to read our own data in it.
249 //
250
251 #ifdef TEST
252
253 if (owner_ == 0)
254 {
255 *logofs << "ReadBuffer: PANIC! Class for FD#"
256 << transport_ -> fd() << " doesn't "
257 << "appear to be the owner of the buffer "
258 << "while reading.\n" << logofs_flush;
259
260 HandleCleanup();
261 }
262
263 #endif
264
265 //
266 // Be sure that we have enough space
267 // to store all the requested data.
268 //
269
270 if (buffer_ == NULL || length_ + readLength > size_)
271 {
272 unsigned int newSize = length_ + readLength;
273
274 #ifdef TEST
275 *logofs << "ReadBuffer: Resizing buffer for FD#"
276 << transport_ -> fd() << " in read from "
277 << size_ << " to " << newSize << " bytes.\n"
278 << logofs_flush;
279 #endif
280
281 unsigned char *newBuffer = allocateBuffer(newSize);
282
283 memcpy(newBuffer, buffer_ + start_, length_);
284
285 delete [] buffer_;
286
287 buffer_ = newBuffer;
288 size_ = newSize;
289
290 transport_ -> pendingReset();
291
292 owner_ = 1;
293 }
294 else if (start_ != 0 && length_ != 0)
295 {
296 //
297 // If any bytes are left due to a partial
298 // message, shift them to the beginning
299 // of the buffer.
300 //
301
302 #ifdef TEST
303 *logofs << "ReadBuffer: Moving " << length_
304 << " bytes of data " << "at beginning of "
305 << "the buffer for FD#" << transport_ -> fd()
306 << ".\n" << logofs_flush;
307 #endif
308
309 memmove(buffer_, buffer_ + start_, length_);
310 }
311
312 start_ = 0;
313
314 #ifdef DEBUG
315 *logofs << "ReadBuffer: Buffer size is now " << size_
316 << " length is " << length_ << " and start is "
317 << start_ << ".\n" << logofs_flush;
318 #endif
319
320 unsigned char *readData = buffer_ + length_;
321
322 #ifdef DEBUG
323 *logofs << "ReadBuffer: Going to read " << readLength
324 << " bytes from FD#" << transport_ -> fd() << ".\n"
325 << logofs_flush;
326 #endif
327
328 int bytesRead = transport_ -> read(readData, readLength);
329
330 if (bytesRead > 0)
331 {
332 #ifdef TEST
333 *logofs << "ReadBuffer: Read " << bytesRead
334 << " bytes from FD#" << transport_ -> fd()
335 << ".\n" << logofs_flush;
336 #endif
337
338 length_ += bytesRead;
339 }
340 else if (bytesRead < 0)
341 {
342 //
343 // Check if there is more data pending than the
344 // size of the provided buffer. After reading
345 // the requested amount, in fact, the transport
346 // may have decompressed the data and produced
347 // more input. This trick allows us to always
348 // borrow the buffer from the transport, even
349 // when the partial read would have prevented
350 // that.
351 //
352
353 if (transport_ -> pending() > 0)
354 {
355 #ifdef TEST
356 *logofs << "ReadBuffer: WARNING! Trying to read some "
357 << "more with " << transport_ -> pending()
358 << " bytes pending for FD#" << transport_ ->
359 fd() << ".\n" << logofs_flush;
360 #endif
361
362 return readMessage();
363 }
364
365 #ifdef TEST
366 *logofs << "ReadBuffer: Error detected reading "
367 << "from FD#" << transport_ -> fd()
368 << ".\n" << logofs_flush;
369 #endif
370
371 return -1;
372 }
373 #ifdef TEST
374 else
375 {
376 *logofs << "ReadBuffer: No data read from FD#"
377 << transport_ -> fd() << " with remaining "
378 << remaining_ << ".\n" << logofs_flush;
379 }
380 #endif
381
382 return bytesRead;
383 }
384
getMessage(unsigned int & controlLength,unsigned int & dataLength)385 const unsigned char *ReadBuffer::getMessage(unsigned int &controlLength,
386 unsigned int &dataLength)
387 {
388 #ifdef TEST
389
390 if (transport_ -> pending() > 0)
391 {
392 *logofs << "ReadBuffer: PANIC! The transport "
393 << "appears to have data pending.\n"
394 << logofs_flush;
395
396 HandleCleanup();
397 }
398
399 #endif
400
401 if (length_ == 0)
402 {
403 #ifdef DEBUG
404 *logofs << "ReadBuffer: No message can be located "
405 << "for FD#" << transport_ -> fd() << ".\n"
406 << logofs_flush;
407 #endif
408
409 if (owner_ == 0)
410 {
411 buffer_ = NULL;
412 size_ = 0;
413
414 transport_ -> pendingReset();
415
416 owner_ = 1;
417 start_ = 0;
418 }
419
420 return NULL;
421 }
422
423 unsigned int trailerLength;
424
425 #ifdef DEBUG
426 *logofs << "ReadBuffer: Going to locate message with "
427 << "start at " << start_ << " and length "
428 << length_ << " for FD#" << transport_ -> fd()
429 << ".\n" << logofs_flush;
430 #endif
431
432 int located = locateMessage(buffer_ + start_, buffer_ + start_ + length_,
433 controlLength, dataLength, trailerLength);
434
435 if (located == 0)
436 {
437 //
438 // No more complete messages are in
439 // the buffer.
440 //
441
442 #ifdef DEBUG
443 *logofs << "ReadBuffer: No message was located "
444 << "for FD#" << transport_ -> fd()
445 << ".\n" << logofs_flush;
446 #endif
447
448 if (owner_ == 0)
449 {
450 //
451 // Must move the remaining bytes in
452 // our own buffer.
453 //
454
455 convertBuffer();
456 }
457
458 return NULL;
459 }
460 else
461 {
462 const unsigned char *result = buffer_ + start_;
463
464 if (dataLength > 0)
465 {
466 //
467 // Message contains data, so go to the
468 // first byte of payload.
469 //
470
471 result += trailerLength;
472
473 start_ += (dataLength + trailerLength);
474 length_ -= (dataLength + trailerLength);
475 }
476 else
477 {
478 //
479 // It is a control message.
480 //
481
482 start_ += (controlLength + trailerLength);
483 length_ -= (controlLength + trailerLength);
484 }
485
486 #ifdef DEBUG
487 *logofs << "ReadBuffer: Located message for FD#"
488 << transport_ -> fd() << " with control length "
489 << controlLength << " and data length "
490 << dataLength << ".\n" << logofs_flush;
491 #endif
492
493 remaining_ = 0;
494
495 return result;
496 }
497 }
498
setSize(int initialReadSize,int maximumBufferSize)499 int ReadBuffer::setSize(int initialReadSize, int maximumBufferSize)
500 {
501 initialReadSize_ = initialReadSize;
502 maximumBufferSize_ = maximumBufferSize;
503
504 #ifdef TEST
505 *logofs << "ReadBuffer: WARNING! Set buffer parameters to "
506 << initialReadSize_ << "/" << maximumBufferSize_
507 << " for object at "<< this << ".\n"
508 << logofs_flush;
509 #endif
510
511 return 1;
512 }
513
fullReset()514 void ReadBuffer::fullReset()
515 {
516 #ifdef TEST
517
518 if (owner_ == 0)
519 {
520 *logofs << "ReadBuffer: PANIC! Class for FD#"
521 << transport_ -> fd() << " doesn't "
522 << "appear to be the owner of the buffer "
523 << "in reset.\n" << logofs_flush;
524
525 HandleCleanup();
526 }
527
528 #endif
529
530 if (length_ == 0 && size_ > maximumBufferSize_)
531 {
532 #ifdef TEST
533 *logofs << "ReadBuffer: Resizing buffer for FD#"
534 << transport_ -> fd() << " in reset from "
535 << size_ << " to " << maximumBufferSize_
536 << " bytes.\n" << logofs_flush;
537 #endif
538
539 delete [] buffer_;
540
541 int newSize = maximumBufferSize_;
542
543 unsigned char *newBuffer = allocateBuffer(newSize);
544
545 buffer_ = newBuffer;
546 size_ = newSize;
547
548 transport_ -> pendingReset();
549
550 owner_ = 1;
551 start_ = 0;
552 }
553 }
554
allocateBuffer(unsigned int newSize)555 unsigned char *ReadBuffer::allocateBuffer(unsigned int newSize)
556 {
557 unsigned char *newBuffer = new unsigned char[newSize];
558
559 if (newBuffer == NULL)
560 {
561 #ifdef PANIC
562 *logofs << "ReadBuffer: PANIC! Can't allocate "
563 << newSize << " bytes of memory for buffer "
564 << "in context [B].\n" << logofs_flush;
565 #endif
566
567 cerr << "Error" << ": Can't allocate memory for "
568 << "read buffer in context [B].\n";
569
570 HandleCleanup();
571 }
572
573 #ifdef VALGRIND
574
575 memset(newBuffer, '\0', newSize);
576
577 #endif
578
579 return newBuffer;
580 }
581
appendBuffer(const unsigned char * message,unsigned int length)582 void ReadBuffer::appendBuffer(const unsigned char *message, unsigned int length)
583 {
584 if (start_ + length_ + length > size_)
585 {
586 unsigned int newSize = length_ + length + initialReadSize_;
587
588 #ifdef TEST
589 *logofs << "ReadBuffer: WARNING! Resizing buffer "
590 << "for FD#" << transport_ -> fd()
591 << " from " << size_ << " to " << newSize
592 << " bytes.\n" << logofs_flush;
593 #endif
594
595 unsigned char *newBuffer = allocateBuffer(newSize);
596
597 memcpy(newBuffer, buffer_ + start_, length_);
598
599 delete [] buffer_;
600
601 buffer_ = newBuffer;
602 size_ = newSize;
603
604 start_ = 0;
605 }
606
607 memcpy(buffer_ + start_ + length_, message, length);
608
609 length_ += length;
610
611 transport_ -> pendingReset();
612
613 owner_ = 1;
614 }
615
convertBuffer()616 void ReadBuffer::convertBuffer()
617 {
618 unsigned int newSize = length_ + initialReadSize_;
619
620 #ifdef TEST
621 *logofs << "ReadBuffer: WARNING! Converting "
622 << length_ << " bytes to own buffer "
623 << "for FD#" << transport_ -> fd()
624 << " with new size " << newSize
625 << " bytes.\n" << logofs_flush;
626 #endif
627
628 unsigned char *newBuffer = allocateBuffer(newSize);
629
630 memcpy(newBuffer, buffer_ + start_, length_);
631
632 buffer_ = newBuffer;
633 size_ = newSize;
634
635 transport_ -> pendingReset();
636
637 owner_ = 1;
638 start_ = 0;
639 }
640