1 /**************************************************************************/
2 /*                                                                        */
3 /* Copyright (c) 2001, 2011 NoMachine (http://www.nomachine.com)          */
4 /* Copyright (c) 2008-2014 Oleksandr Shneyder <o.shneyder@phoca-gmbh.de>  */
5 /* Copyright (c) 2014-2016 Ulrich Sibiller <uli42@gmx.de>                 */
6 /* Copyright (c) 2014-2016 Mihai Moldovan <ionic@ionic.de>                */
7 /* Copyright (c) 2011-2016 Mike Gabriel <mike.gabriel@das-netzwerkteam.de>*/
8 /* Copyright (c) 2015-2016 Qindel Group (http://www.qindel.com)           */
9 /*                                                                        */
10 /* NXCOMP, NX protocol compression and NX extensions to this software     */
11 /* are copyright of the aforementioned persons and companies.             */
12 /*                                                                        */
13 /* Redistribution and use of the present software is allowed according    */
14 /* to terms specified in the file LICENSE.nxcomp which comes in the       */
15 /* source distribution.                                                   */
16 /*                                                                        */
17 /* All rights reserved.                                                   */
18 /*                                                                        */
19 /* NOTE: This software has received contributions from various other      */
20 /* contributors, only the core maintainers and supporters are listed as   */
21 /* copyright holders. Please contact us, if you feel you should be listed */
22 /* as copyright holder, as well.                                          */
23 /*                                                                        */
24 /**************************************************************************/
25 
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29 
30 #include "ReadBuffer.h"
31 
32 #include "Transport.h"
33 
34 //
35 // Set the verbosity level.
36 //
37 
38 #define PANIC
39 #define WARNING
40 #undef  TEST
41 #undef  DEBUG
42 
ReadBuffer(Transport * transport)43 ReadBuffer::ReadBuffer(Transport *transport)
44 
45   : transport_(transport)
46 {
47   //
48   // The read buffer will grow until
49   // reaching the maximum buffer size
50   // and then will remain stable at
51   // that size.
52   //
53 
54   initialReadSize_   = READ_BUFFER_DEFAULT_SIZE;
55   maximumBufferSize_ = READ_BUFFER_DEFAULT_SIZE;
56 
57   size_   = 0;
58   buffer_ = NULL;
59 
60   owner_  = 1;
61   length_ = 0;
62   start_  = 0;
63 
64   remaining_ = 0;
65 }
66 
~ReadBuffer()67 ReadBuffer::~ReadBuffer()
68 {
69   if (owner_ == 1)
70   {
71     delete [] buffer_;
72   }
73 }
74 
readMessage(const unsigned char * message,unsigned int length)75 void ReadBuffer::readMessage(const unsigned char *message, unsigned int length)
76 {
77   //
78   // To be here we must be the real owner
79   // of the buffer and there must not be
80   // pending bytes in the transport.
81   //
82 
83   #ifdef TEST
84 
85   if (owner_ == 0)
86   {
87     *logofs << "ReadBuffer: PANIC! Class for FD#"
88             << transport_ -> fd() << " doesn't "
89             << "appear to be the owner of the buffer "
90             << "while borrowing from the caller.\n"
91             << logofs_flush;
92 
93     HandleCleanup();
94   }
95 
96   #endif
97 
98   //
99   // Be sure that any outstanding data from
100   // the transport is appended to our own
101   // byffer.
102   //
103 
104   if (transport_ -> pending() != 0)
105   {
106     #ifdef WARNING
107     *logofs << "ReadBuffer: WARNING! Class for FD#"
108             << transport_ -> fd() << " has pending "
109             << "data in the transport while "
110             << "borrowing from the caller.\n"
111             << logofs_flush;
112     #endif
113 
114     readMessage();
115 
116     if (owner_ == 0)
117     {
118       convertBuffer();
119     }
120   }
121 
122   //
123   // Can't borrow the buffer if there is data
124   // from a partial message. In this case add
125   // the new data to the end of our buffer.
126   //
127 
128   if (length_ == 0)
129   {
130     #ifdef TEST
131     *logofs << "ReadBuffer: Borrowing " << length
132             << " bytes from the caller for FD#"
133             << transport_ -> fd() << " with "
134             << length_ << " bytes in the buffer.\n"
135             << logofs_flush;
136     #endif
137 
138     delete [] buffer_;
139 
140     buffer_ = (unsigned char *) message;
141     size_   = length;
142 
143     length_ = length;
144 
145     owner_ = 0;
146     start_ = 0;
147   }
148   else
149   {
150     #ifdef TEST
151     *logofs << "ReadBuffer: Appending " << length
152             << " bytes from the caller for FD#"
153             << transport_ -> fd() << " with "
154             << length_ << " bytes in the buffer.\n"
155             << logofs_flush;
156     #endif
157 
158     appendBuffer(message, length);
159   }
160 }
161 
readMessage()162 int ReadBuffer::readMessage()
163 {
164   int pendingLength = transport_ -> pending();
165 
166   if (pendingLength > 0)
167   {
168     //
169     // Can't move the data in the borrowed buffer,
170     // so use the tansport buffer only if we don't
171     // have any partial message. This can happen
172     // with the proxy where we need to deflate the
173     // stream.
174     //
175 
176     if (length_ == 0)
177     {
178       unsigned char *newBuffer;
179 
180       length_ = transport_ -> getPending(newBuffer);
181 
182       if (newBuffer == NULL)
183       {
184         #ifdef PANIC
185         *logofs << "ReadBuffer: PANIC! Failed to borrow "
186                 << length_ << " bytes of memory for buffer "
187                 << "in context [A].\n" << logofs_flush;
188         #endif
189 
190         cerr << "Error" << ": Failed to borrow memory for "
191              << "read buffer in context [A].\n";
192 
193         HandleCleanup();
194       }
195 
196       delete [] buffer_;
197 
198       buffer_ = newBuffer;
199       size_   = length_;
200 
201       owner_ = 0;
202       start_ = 0;
203 
204       #ifdef TEST
205       *logofs << "ReadBuffer: Borrowed " << length_
206               << " pending bytes for FD#" << transport_ ->
207                  fd() << ".\n" << logofs_flush;
208       #endif
209 
210       return length_;
211     }
212     #ifdef TEST
213     else
214     {
215       *logofs << "ReadBuffer: WARNING! Cannot borrow "
216               << pendingLength << " bytes for FD#"
217               << transport_ -> fd() << " with "
218               << length_ << " bytes in the buffer.\n"
219               << logofs_flush;
220     }
221     #endif
222   }
223 
224   unsigned int readLength = suggestedLength(pendingLength);
225 
226   #ifdef DEBUG
227   *logofs << "ReadBuffer: Requested " << readLength
228           << " bytes for FD#" << transport_ -> fd()
229           << " with readable " << transport_ -> readable()
230           << " remaining " << remaining_ << " pending "
231           << transport_ -> pending() << ".\n"
232           << logofs_flush;
233   #endif
234 
235   if (readLength < initialReadSize_)
236   {
237     readLength = initialReadSize_;
238   }
239 
240   #ifdef DEBUG
241   *logofs << "ReadBuffer: Buffer size is " << size_
242           << " length " << length_ << " and start "
243           << start_  << ".\n" << logofs_flush;
244   #endif
245 
246   //
247   // We can't use the transport buffer
248   // to read our own data in it.
249   //
250 
251   #ifdef TEST
252 
253   if (owner_ == 0)
254   {
255     *logofs << "ReadBuffer: PANIC! Class for FD#"
256             << transport_ -> fd() << " doesn't "
257             << "appear to be the owner of the buffer "
258             << "while reading.\n" << logofs_flush;
259 
260     HandleCleanup();
261   }
262 
263   #endif
264 
265   //
266   // Be sure that we have enough space
267   // to store all the requested data.
268   //
269 
270   if (buffer_ == NULL || length_ + readLength > size_)
271   {
272     unsigned int newSize = length_ + readLength;
273 
274     #ifdef TEST
275     *logofs << "ReadBuffer: Resizing buffer for FD#"
276             << transport_ -> fd() << " in read from "
277             << size_ << " to " << newSize << " bytes.\n"
278             << logofs_flush;
279     #endif
280 
281     unsigned char *newBuffer = allocateBuffer(newSize);
282 
283     memcpy(newBuffer, buffer_ + start_, length_);
284 
285     delete [] buffer_;
286 
287     buffer_ = newBuffer;
288     size_   = newSize;
289 
290     transport_ -> pendingReset();
291 
292     owner_ = 1;
293   }
294   else if (start_ != 0 && length_ != 0)
295   {
296     //
297     // If any bytes are left due to a partial
298     // message, shift them to the beginning
299     // of the buffer.
300     //
301 
302     #ifdef TEST
303     *logofs << "ReadBuffer: Moving " << length_
304             << " bytes of data " << "at beginning of "
305             << "the buffer for FD#" << transport_ -> fd()
306             << ".\n" << logofs_flush;
307     #endif
308 
309     memmove(buffer_, buffer_ + start_, length_);
310   }
311 
312   start_ = 0;
313 
314   #ifdef DEBUG
315   *logofs << "ReadBuffer: Buffer size is now " << size_
316           << " length is " << length_ << " and start is "
317           << start_ << ".\n" << logofs_flush;
318   #endif
319 
320   unsigned char *readData = buffer_ + length_;
321 
322   #ifdef DEBUG
323   *logofs << "ReadBuffer: Going to read " << readLength
324           << " bytes from FD#" << transport_ -> fd() << ".\n"
325           << logofs_flush;
326   #endif
327 
328   int bytesRead = transport_ -> read(readData, readLength);
329 
330   if (bytesRead > 0)
331   {
332     #ifdef TEST
333     *logofs << "ReadBuffer: Read " << bytesRead
334             << " bytes from FD#" << transport_ -> fd()
335             << ".\n" << logofs_flush;
336     #endif
337 
338     length_ += bytesRead;
339   }
340   else if (bytesRead < 0)
341   {
342     //
343     // Check if there is more data pending than the
344     // size of the provided buffer. After reading
345     // the requested amount, in fact, the transport
346     // may have decompressed the data and produced
347     // more input. This trick allows us to always
348     // borrow the buffer from the transport, even
349     // when the partial read would have prevented
350     // that.
351     //
352 
353     if (transport_ -> pending() > 0)
354     {
355       #ifdef TEST
356       *logofs << "ReadBuffer: WARNING! Trying to read some "
357               << "more with " << transport_ -> pending()
358               << " bytes pending for FD#" << transport_ ->
359                  fd() << ".\n" << logofs_flush;
360       #endif
361 
362       return readMessage();
363     }
364 
365     #ifdef TEST
366     *logofs << "ReadBuffer: Error detected reading "
367             << "from FD#" << transport_ -> fd()
368             << ".\n" << logofs_flush;
369     #endif
370 
371     return -1;
372   }
373   #ifdef TEST
374   else
375   {
376     *logofs << "ReadBuffer: No data read from FD#"
377             << transport_ -> fd() << " with remaining "
378             << remaining_ << ".\n" << logofs_flush;
379   }
380   #endif
381 
382   return bytesRead;
383 }
384 
getMessage(unsigned int & controlLength,unsigned int & dataLength)385 const unsigned char *ReadBuffer::getMessage(unsigned int &controlLength,
386                                                 unsigned int &dataLength)
387 {
388   #ifdef TEST
389 
390   if (transport_ -> pending() > 0)
391   {
392     *logofs << "ReadBuffer: PANIC! The transport "
393             << "appears to have data pending.\n"
394             << logofs_flush;
395 
396     HandleCleanup();
397   }
398 
399   #endif
400 
401   if (length_ == 0)
402   {
403     #ifdef DEBUG
404     *logofs << "ReadBuffer: No message can be located "
405             << "for FD#" << transport_ -> fd() << ".\n"
406             << logofs_flush;
407     #endif
408 
409     if (owner_ == 0)
410     {
411       buffer_ = NULL;
412       size_   = 0;
413 
414       transport_ -> pendingReset();
415 
416       owner_ = 1;
417       start_ = 0;
418     }
419 
420     return NULL;
421   }
422 
423   unsigned int trailerLength;
424 
425   #ifdef DEBUG
426   *logofs << "ReadBuffer: Going to locate message with "
427           << "start at " << start_ << " and length "
428           << length_ << " for FD#" << transport_ -> fd()
429           << ".\n" << logofs_flush;
430   #endif
431 
432   int located = locateMessage(buffer_ + start_, buffer_ + start_ + length_,
433                                   controlLength, dataLength, trailerLength);
434 
435   if (located == 0)
436   {
437     //
438     // No more complete messages are in
439     // the buffer.
440     //
441 
442     #ifdef DEBUG
443     *logofs << "ReadBuffer: No message was located "
444             << "for FD#" << transport_ -> fd()
445             << ".\n" << logofs_flush;
446     #endif
447 
448     if (owner_ == 0)
449     {
450       //
451       // Must move the remaining bytes in
452       // our own buffer.
453       //
454 
455       convertBuffer();
456     }
457 
458     return NULL;
459   }
460   else
461   {
462     const unsigned char *result = buffer_ + start_;
463 
464     if (dataLength > 0)
465     {
466       //
467       // Message contains data, so go to the
468       // first byte of payload.
469       //
470 
471       result += trailerLength;
472 
473       start_  += (dataLength + trailerLength);
474       length_ -= (dataLength + trailerLength);
475     }
476     else
477     {
478       //
479       // It is a control message.
480       //
481 
482       start_  += (controlLength + trailerLength);
483       length_ -= (controlLength + trailerLength);
484     }
485 
486     #ifdef DEBUG
487     *logofs << "ReadBuffer: Located message for FD#"
488             << transport_ -> fd() << " with control length "
489             << controlLength << " and data length "
490             << dataLength << ".\n" << logofs_flush;
491     #endif
492 
493     remaining_ = 0;
494 
495     return result;
496   }
497 }
498 
setSize(int initialReadSize,int maximumBufferSize)499 int ReadBuffer::setSize(int initialReadSize, int maximumBufferSize)
500 {
501   initialReadSize_   = initialReadSize;
502   maximumBufferSize_ = maximumBufferSize;
503 
504   #ifdef TEST
505   *logofs << "ReadBuffer: WARNING! Set buffer parameters to "
506           << initialReadSize_ << "/" << maximumBufferSize_
507           << " for object at "<< this << ".\n"
508           << logofs_flush;
509   #endif
510 
511   return 1;
512 }
513 
fullReset()514 void ReadBuffer::fullReset()
515 {
516   #ifdef TEST
517 
518   if (owner_ == 0)
519   {
520     *logofs << "ReadBuffer: PANIC! Class for FD#"
521             << transport_ -> fd() << " doesn't "
522             << "appear to be the owner of the buffer "
523             << "in reset.\n" << logofs_flush;
524 
525     HandleCleanup();
526   }
527 
528   #endif
529 
530   if (length_ == 0 && size_ > maximumBufferSize_)
531   {
532     #ifdef TEST
533     *logofs << "ReadBuffer: Resizing buffer for FD#"
534             << transport_ -> fd() << " in reset from "
535             << size_ << " to " << maximumBufferSize_
536             << " bytes.\n" << logofs_flush;
537     #endif
538 
539     delete [] buffer_;
540 
541     int newSize = maximumBufferSize_;
542 
543     unsigned char *newBuffer = allocateBuffer(newSize);
544 
545     buffer_ = newBuffer;
546     size_   = newSize;
547 
548     transport_ -> pendingReset();
549 
550     owner_ = 1;
551     start_ = 0;
552   }
553 }
554 
allocateBuffer(unsigned int newSize)555 unsigned char *ReadBuffer::allocateBuffer(unsigned int newSize)
556 {
557   unsigned char *newBuffer = new unsigned char[newSize];
558 
559   if (newBuffer == NULL)
560   {
561     #ifdef PANIC
562     *logofs << "ReadBuffer: PANIC! Can't allocate "
563             << newSize << " bytes of memory for buffer "
564             << "in context [B].\n" << logofs_flush;
565     #endif
566 
567     cerr << "Error" << ": Can't allocate memory for "
568          << "read buffer in context [B].\n";
569 
570     HandleCleanup();
571   }
572 
573   #ifdef VALGRIND
574 
575   memset(newBuffer, '\0', newSize);
576 
577   #endif
578 
579   return newBuffer;
580 }
581 
appendBuffer(const unsigned char * message,unsigned int length)582 void ReadBuffer::appendBuffer(const unsigned char *message, unsigned int length)
583 {
584   if (start_ + length_ + length > size_)
585   {
586     unsigned int newSize = length_ + length + initialReadSize_;
587 
588     #ifdef TEST
589     *logofs << "ReadBuffer: WARNING! Resizing buffer "
590             << "for FD#" << transport_ -> fd()
591             << " from " << size_ << " to " << newSize
592             << " bytes.\n" << logofs_flush;
593     #endif
594 
595     unsigned char *newBuffer = allocateBuffer(newSize);
596 
597     memcpy(newBuffer, buffer_ + start_, length_);
598 
599     delete [] buffer_;
600 
601     buffer_ = newBuffer;
602     size_   = newSize;
603 
604     start_ = 0;
605   }
606 
607   memcpy(buffer_ + start_ + length_, message, length);
608 
609   length_ += length;
610 
611   transport_ -> pendingReset();
612 
613   owner_ = 1;
614 }
615 
convertBuffer()616 void ReadBuffer::convertBuffer()
617 {
618   unsigned int newSize = length_ + initialReadSize_;
619 
620   #ifdef TEST
621   *logofs << "ReadBuffer: WARNING! Converting "
622           << length_ << " bytes to own buffer "
623           << "for FD#" << transport_ -> fd()
624           << " with new size " << newSize
625           << " bytes.\n" << logofs_flush;
626   #endif
627 
628   unsigned char *newBuffer = allocateBuffer(newSize);
629 
630   memcpy(newBuffer, buffer_ + start_, length_);
631 
632   buffer_ = newBuffer;
633   size_   = newSize;
634 
635   transport_ -> pendingReset();
636 
637   owner_ = 1;
638   start_ = 0;
639 }
640