1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_
21 #define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1
22 
23 #include <cstdlib>
24 #include <cstring>
25 #include <limits>
26 #include <boost/scoped_array.hpp>
27 
28 #include <thrift/transport/TTransport.h>
29 #include <thrift/transport/TVirtualTransport.h>
30 
// Branch-prediction hints used by the buffer fast paths.
//
// Both macros evaluate `val` exactly once and yield its truth value.  The
// operand is normalized with `!!` so that any scalar usable in a condition
// (pointer, count, flag) is accepted, and so that every non-zero value agrees
// with the "expected true" hint given by TDB_LIKELY.
#ifdef __GNUC__
#define TDB_LIKELY(val) (__builtin_expect(!!(val), 1))
#define TDB_UNLIKELY(val) (__builtin_expect(!!(val), 0))
#else
// No hint available: just produce the truth value, as the GNU branch does.
#define TDB_LIKELY(val) (!!(val))
#define TDB_UNLIKELY(val) (!!(val))
#endif
38 
39 namespace apache {
40 namespace thrift {
41 namespace transport {
42 
43 /**
44  * Base class for all transports that use read/write buffers for performance.
45  *
46  * TBufferBase is designed to implement the fast-path "memcpy" style
47  * operations that work in the common case.  It does so with small and
48  * (eventually) nonvirtual, inlinable methods.  TBufferBase is an abstract
49  * class.  Subclasses are expected to define the "slow path" operations
50  * that have to be done when the buffers are full or empty.
51  *
52  */
53 class TBufferBase : public TVirtualTransport<TBufferBase> {
54 
55 public:
56   /**
57    * Fast-path read.
58    *
59    * When we have enough data buffered to fulfill the read, we can satisfy it
60    * with a single memcpy, then adjust our internal pointers.  If the buffer
61    * is empty, we call out to our slow path, implemented by a subclass.
62    * This method is meant to eventually be nonvirtual and inlinable.
63    */
read(uint8_t * buf,uint32_t len)64   uint32_t read(uint8_t* buf, uint32_t len) {
65     checkReadBytesAvailable(len);
66     uint8_t* new_rBase = rBase_ + len;
67     if (TDB_LIKELY(new_rBase <= rBound_)) {
68       std::memcpy(buf, rBase_, len);
69       rBase_ = new_rBase;
70       return len;
71     }
72     return readSlow(buf, len);
73   }
74 
75   /**
76    * Shortcutted version of readAll.
77    */
readAll(uint8_t * buf,uint32_t len)78   uint32_t readAll(uint8_t* buf, uint32_t len) {
79     uint8_t* new_rBase = rBase_ + len;
80     if (TDB_LIKELY(new_rBase <= rBound_)) {
81       std::memcpy(buf, rBase_, len);
82       rBase_ = new_rBase;
83       return len;
84     }
85     return apache::thrift::transport::readAll(*this, buf, len);
86   }
87 
88   /**
89    * Fast-path write.
90    *
91    * When we have enough empty space in our buffer to accommodate the write, we
92    * can satisfy it with a single memcpy, then adjust our internal pointers.
93    * If the buffer is full, we call out to our slow path, implemented by a
94    * subclass.  This method is meant to eventually be nonvirtual and
95    * inlinable.
96    */
write(const uint8_t * buf,uint32_t len)97   void write(const uint8_t* buf, uint32_t len) {
98     uint8_t* new_wBase = wBase_ + len;
99     if (TDB_LIKELY(new_wBase <= wBound_)) {
100       std::memcpy(wBase_, buf, len);
101       wBase_ = new_wBase;
102       return;
103     }
104     writeSlow(buf, len);
105   }
106 
107   /**
108    * Fast-path borrow.  A lot like the fast-path read.
109    */
borrow(uint8_t * buf,uint32_t * len)110   const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
111     if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) {
112       // With strict aliasing, writing to len shouldn't force us to
113       // refetch rBase_ from memory.  TODO(dreiss): Verify this.
114       *len = static_cast<uint32_t>(rBound_ - rBase_);
115       return rBase_;
116     }
117     return borrowSlow(buf, len);
118   }
119 
120   /**
121    * Consume doesn't require a slow path.
122    */
consume(uint32_t len)123   void consume(uint32_t len) {
124     countConsumedMessageBytes(len);
125     if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) {
126       rBase_ += len;
127     } else {
128       throw TTransportException(TTransportException::BAD_ARGS, "consume did not follow a borrow.");
129     }
130   }
131 
132 protected:
133   /// Slow path read.
134   virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0;
135 
136   /// Slow path write.
137   virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0;
138 
139   /**
140    * Slow path borrow.
141    *
142    * POSTCONDITION: return == nullptr || rBound_ - rBase_ >= *len
143    */
144   virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0;
145 
146   /**
147    * Trivial constructor.
148    *
149    * Initialize pointers safely.  Constructing is not a very
150    * performance-sensitive operation, so it is okay to just leave it to
151    * the concrete class to set up pointers correctly.
152    */
153   TBufferBase(std::shared_ptr<TConfiguration> config = nullptr)
TVirtualTransport(config)154     : TVirtualTransport(config), rBase_(nullptr), rBound_(nullptr), wBase_(nullptr), wBound_(nullptr) {}
155 
156   /// Convenience mutator for setting the read buffer.
setReadBuffer(uint8_t * buf,uint32_t len)157   void setReadBuffer(uint8_t* buf, uint32_t len) {
158     rBase_ = buf;
159     rBound_ = buf + len;
160   }
161 
162   /// Convenience mutator for setting the write buffer.
setWriteBuffer(uint8_t * buf,uint32_t len)163   void setWriteBuffer(uint8_t* buf, uint32_t len) {
164     wBase_ = buf;
165     wBound_ = buf + len;
166   }
167 
168   ~TBufferBase() override = default;
169 
170   /// Reads begin here.
171   uint8_t* rBase_;
172   /// Reads may extend to just before here.
173   uint8_t* rBound_;
174 
175   /// Writes begin here.
176   uint8_t* wBase_;
177   /// Writes may extend to just before here.
178   uint8_t* wBound_;
179 };
180 
181 /**
182  * Buffered transport. For reads it will read more data than is requested
183  * and will serve future data out of a local buffer. For writes, data is
184  * stored to an in memory buffer before being written out.
185  *
186  */
187 class TBufferedTransport : public TVirtualTransport<TBufferedTransport, TBufferBase> {
188 public:
189   static const int DEFAULT_BUFFER_SIZE = 512;
190 
191   /// Use default buffer sizes.
192   TBufferedTransport(std::shared_ptr<TTransport> transport, std::shared_ptr<TConfiguration> config = nullptr)
TVirtualTransport(config)193     : TVirtualTransport(config),
194       transport_(transport),
195       rBufSize_(DEFAULT_BUFFER_SIZE),
196       wBufSize_(DEFAULT_BUFFER_SIZE),
197       rBuf_(new uint8_t[rBufSize_]),
198       wBuf_(new uint8_t[wBufSize_]) {
199     initPointers();
200   }
201 
202   /// Use specified buffer sizes.
203   TBufferedTransport(std::shared_ptr<TTransport> transport, uint32_t sz, std::shared_ptr<TConfiguration> config = nullptr)
TVirtualTransport(config)204     : TVirtualTransport(config),
205       transport_(transport),
206       rBufSize_(sz),
207       wBufSize_(sz),
208       rBuf_(new uint8_t[rBufSize_]),
209       wBuf_(new uint8_t[wBufSize_]) {
210     initPointers();
211   }
212 
213   /// Use specified read and write buffer sizes.
214   TBufferedTransport(std::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz,
215                      std::shared_ptr<TConfiguration> config = nullptr)
TVirtualTransport(config)216     : TVirtualTransport(config),
217       transport_(transport),
218       rBufSize_(rsz),
219       wBufSize_(wsz),
220       rBuf_(new uint8_t[rBufSize_]),
221       wBuf_(new uint8_t[wBufSize_]) {
222     initPointers();
223   }
224 
open()225   void open() override { transport_->open(); }
226 
isOpen()227   bool isOpen() const override { return transport_->isOpen(); }
228 
peek()229   bool peek() override {
230     if (rBase_ == rBound_) {
231       setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
232     }
233     return (rBound_ > rBase_);
234   }
235 
close()236   void close() override {
237     flush();
238     transport_->close();
239   }
240 
241   uint32_t readSlow(uint8_t* buf, uint32_t len) override;
242 
243   void writeSlow(const uint8_t* buf, uint32_t len) override;
244 
245   void flush() override;
246 
247   /**
248    * Returns the origin of the underlying transport
249    */
getOrigin()250   const std::string getOrigin() const override { return transport_->getOrigin(); }
251 
252   /**
253    * The following behavior is currently implemented by TBufferedTransport,
254    * but that may change in a future version:
255    * 1/ If len is at most rBufSize_, borrow will never return nullptr.
256    *    Depending on the underlying transport, it could throw an exception
257    *    or hang forever.
258    * 2/ Some borrow requests may copy bytes internally.  However,
259    *    if len is at most rBufSize_/2, none of the copied bytes
260    *    will ever have to be copied again.  For optimial performance,
261    *    stay under this limit.
262    */
263   const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override;
264 
getUnderlyingTransport()265   std::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; }
266 
267   /*
268    * TVirtualTransport provides a default implementation of readAll().
269    * We want to use the TBufferBase version instead.
270    */
readAll(uint8_t * buf,uint32_t len)271   uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); }
272 
273 protected:
initPointers()274   void initPointers() {
275     setReadBuffer(rBuf_.get(), 0);
276     setWriteBuffer(wBuf_.get(), wBufSize_);
277     // Write size never changes.
278   }
279 
280   std::shared_ptr<TTransport> transport_;
281 
282   uint32_t rBufSize_;
283   uint32_t wBufSize_;
284   boost::scoped_array<uint8_t> rBuf_;
285   boost::scoped_array<uint8_t> wBuf_;
286 };
287 
288 /**
289  * Wraps a transport into a buffered one.
290  *
291  */
292 class TBufferedTransportFactory : public TTransportFactory {
293 public:
294   TBufferedTransportFactory() = default;
295 
296   ~TBufferedTransportFactory() override = default;
297 
298   /**
299    * Wraps the transport into a buffered one.
300    */
getTransport(std::shared_ptr<TTransport> trans)301   std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override {
302     return std::shared_ptr<TTransport>(new TBufferedTransport(trans));
303   }
304 };
305 
306 /**
307  * Framed transport. All writes go into an in-memory buffer until flush is
308  * called, at which point the transport writes the length of the entire
309  * binary chunk followed by the data payload. This allows the receiver on the
310  * other end to always do fixed-length reads.
311  *
312  */
313 class TFramedTransport : public TVirtualTransport<TFramedTransport, TBufferBase> {
314 public:
315   static const int DEFAULT_BUFFER_SIZE = 512;
316   static const int DEFAULT_MAX_FRAME_SIZE = 256 * 1024 * 1024;
317 
318   /// Use default buffer sizes.
319   TFramedTransport(std::shared_ptr<TConfiguration> config = nullptr)
TVirtualTransport(config)320     : TVirtualTransport(config),
321       transport_(),
322       rBufSize_(0),
323       wBufSize_(DEFAULT_BUFFER_SIZE),
324       rBuf_(),
325       wBuf_(new uint8_t[wBufSize_]),
326       bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()) {
327     initPointers();
328   }
329 
330   TFramedTransport(std::shared_ptr<TTransport> transport, std::shared_ptr<TConfiguration> config = nullptr)
TVirtualTransport(config)331     : TVirtualTransport(config),
332       transport_(transport),
333       rBufSize_(0),
334       wBufSize_(DEFAULT_BUFFER_SIZE),
335       rBuf_(),
336       wBuf_(new uint8_t[wBufSize_]),
337       bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()),
338       maxFrameSize_(configuration_->getMaxFrameSize()) {
339     initPointers();
340   }
341 
342   TFramedTransport(std::shared_ptr<TTransport> transport,
343                    uint32_t sz,
344                    uint32_t bufReclaimThresh = (std::numeric_limits<uint32_t>::max)(),
345                    std::shared_ptr<TConfiguration> config = nullptr)
TVirtualTransport(config)346     : TVirtualTransport(config),
347       transport_(transport),
348       rBufSize_(0),
349       wBufSize_(sz),
350       rBuf_(),
351       wBuf_(new uint8_t[wBufSize_]),
352       bufReclaimThresh_(bufReclaimThresh),
353       maxFrameSize_(configuration_->getMaxFrameSize()) {
354     initPointers();
355   }
356 
open()357   void open() override { transport_->open(); }
358 
isOpen()359   bool isOpen() const override { return transport_->isOpen(); }
360 
peek()361   bool peek() override { return (rBase_ < rBound_) || transport_->peek(); }
362 
close()363   void close() override {
364     flush();
365     transport_->close();
366   }
367 
368   uint32_t readSlow(uint8_t* buf, uint32_t len) override;
369 
370   void writeSlow(const uint8_t* buf, uint32_t len) override;
371 
372   void flush() override;
373 
374   uint32_t readEnd() override;
375 
376   uint32_t writeEnd() override;
377 
378   const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override;
379 
getUnderlyingTransport()380   std::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; }
381 
382   /*
383    * TVirtualTransport provides a default implementation of readAll().
384    * We want to use the TBufferBase version instead.
385    */
386   using TBufferBase::readAll;
387 
388   /**
389    * Returns the origin of the underlying transport
390    */
getOrigin()391   const std::string getOrigin() const override { return transport_->getOrigin(); }
392 
393   /**
394    * Set the maximum size of the frame at read
395    */
setMaxFrameSize(uint32_t maxFrameSize)396   void setMaxFrameSize(uint32_t maxFrameSize) { maxFrameSize_ = maxFrameSize; }
397 
398   /**
399    * Get the maximum size of the frame at read
400    */
getMaxFrameSize()401   uint32_t getMaxFrameSize() { return maxFrameSize_; }
402 
403 protected:
404   /**
405    * Reads a frame of input from the underlying stream.
406    *
407    * Returns true if a frame was read successfully, or false on EOF.
408    * (Raises a TTransportException if EOF occurs after a partial frame.)
409    */
410   virtual bool readFrame();
411 
initPointers()412   void initPointers() {
413     setReadBuffer(nullptr, 0);
414     setWriteBuffer(wBuf_.get(), wBufSize_);
415 
416     // Pad the buffer so we can insert the size later.
417     int32_t pad = 0;
418     this->write((uint8_t*)&pad, sizeof(pad));
419   }
420 
421   std::shared_ptr<TTransport> transport_;
422 
423   uint32_t rBufSize_;
424   uint32_t wBufSize_;
425   boost::scoped_array<uint8_t> rBuf_;
426   boost::scoped_array<uint8_t> wBuf_;
427   uint32_t bufReclaimThresh_;
428   uint32_t maxFrameSize_;
429 };
430 
431 /**
432  * Wraps a transport into a framed one.
433  *
434  */
435 class TFramedTransportFactory : public TTransportFactory {
436 public:
437   TFramedTransportFactory() = default;
438 
439   ~TFramedTransportFactory() override = default;
440 
441   /**
442    * Wraps the transport into a framed one.
443    */
getTransport(std::shared_ptr<TTransport> trans)444   std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override {
445     return std::shared_ptr<TTransport>(new TFramedTransport(trans));
446   }
447 };
448 
/**
 * A memory buffer is a transport that simply reads from and writes to an
 * in-memory buffer. Any time you call write on it, the data is simply placed
 * into a buffer, and any time you call read, data is read from that buffer.
 *
 * The buffers are allocated using the C constructs malloc and realloc, and
 * the size doubles as necessary.  We've considered using scoped
 *
 */
458 class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> {
459 private:
460   // Common initialization done by all constructors.
initCommon(uint8_t * buf,uint32_t size,bool owner,uint32_t wPos)461   void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) {
462 
463     maxBufferSize_ = (std::numeric_limits<uint32_t>::max)();
464 
465     if (buf == nullptr && size != 0) {
466       assert(owner);
467       buf = (uint8_t*)std::malloc(size);
468       if (buf == nullptr) {
469 	throw std::bad_alloc();
470       }
471     }
472 
473     buffer_ = buf;
474     bufferSize_ = size;
475 
476     rBase_ = buffer_;
477     rBound_ = buffer_ + wPos;
478     // TODO(dreiss): Investigate NULL-ing this if !owner.
479     wBase_ = buffer_ + wPos;
480     wBound_ = buffer_ + bufferSize_;
481 
482     owner_ = owner;
483 
484     // rBound_ is really an artifact.  In principle, it should always be
485     // equal to wBase_.  We update it in a few places (computeRead, etc.).
486   }
487 
488 public:
489   static const uint32_t defaultSize = 1024;
490 
491   /**
492    * This enum specifies how a TMemoryBuffer should treat
493    * memory passed to it via constructors or resetBuffer.
494    *
495    * OBSERVE:
496    *   TMemoryBuffer will simply store a pointer to the memory.
497    *   It is the callers responsibility to ensure that the pointer
498    *   remains valid for the lifetime of the TMemoryBuffer,
499    *   and that it is properly cleaned up.
500    *   Note that no data can be written to observed buffers.
501    *
502    * COPY:
503    *   TMemoryBuffer will make an internal copy of the buffer.
504    *   The caller has no responsibilities.
505    *
506    * TAKE_OWNERSHIP:
507    *   TMemoryBuffer will become the "owner" of the buffer,
508    *   and will be responsible for freeing it.
509    *   The membory must have been allocated with malloc.
510    */
511   enum MemoryPolicy { OBSERVE = 1, COPY = 2, TAKE_OWNERSHIP = 3 };
512 
513   /**
514    * Construct a TMemoryBuffer with a default-sized buffer,
515    * owned by the TMemoryBuffer object.
516    */
517   TMemoryBuffer(std::shared_ptr<TConfiguration> config = nullptr)
TVirtualTransport(config)518     : TVirtualTransport(config) {
519     initCommon(nullptr, defaultSize, true, 0);
520   }
521 
522   /**
523    * Construct a TMemoryBuffer with a buffer of a specified size,
524    * owned by the TMemoryBuffer object.
525    *
526    * @param sz  The initial size of the buffer.
527    */
528   TMemoryBuffer(uint32_t sz, std::shared_ptr<TConfiguration> config = nullptr)
TVirtualTransport(config)529     : TVirtualTransport(config) {
530     initCommon(nullptr, sz, true, 0);
531   }
532 
533   /**
534    * Construct a TMemoryBuffer with buf as its initial contents.
535    *
536    * @param buf    The initial contents of the buffer.
537    *               Note that, while buf is a non-const pointer,
538    *               TMemoryBuffer will not write to it if policy == OBSERVE,
539    *               so it is safe to const_cast<uint8_t*>(whatever).
540    * @param sz     The size of @c buf.
541    * @param policy See @link MemoryPolicy @endlink .
542    */
543   TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE, std::shared_ptr<TConfiguration> config = nullptr)
TVirtualTransport(config)544     : TVirtualTransport(config) {
545     if (buf == nullptr && sz != 0) {
546       throw TTransportException(TTransportException::BAD_ARGS,
547                                 "TMemoryBuffer given null buffer with non-zero size.");
548     }
549 
550     switch (policy) {
551     case OBSERVE:
552     case TAKE_OWNERSHIP:
553       initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz);
554       break;
555     case COPY:
556       initCommon(nullptr, sz, true, 0);
557       this->write(buf, sz);
558       break;
559     default:
560       throw TTransportException(TTransportException::BAD_ARGS,
561                                 "Invalid MemoryPolicy for TMemoryBuffer");
562     }
563   }
564 
~TMemoryBuffer()565   ~TMemoryBuffer() override {
566     if (owner_) {
567       std::free(buffer_);
568     }
569   }
570 
isOpen()571   bool isOpen() const override { return true; }
572 
peek()573   bool peek() override { return (rBase_ < wBase_); }
574 
open()575   void open() override {}
576 
close()577   void close() override {}
578 
579   // TODO(dreiss): Make bufPtr const.
getBuffer(uint8_t ** bufPtr,uint32_t * sz)580   void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
581     *bufPtr = rBase_;
582     *sz = static_cast<uint32_t>(wBase_ - rBase_);
583   }
584 
getBufferAsString()585   std::string getBufferAsString() {
586     if (buffer_ == nullptr) {
587       return "";
588     }
589     uint8_t* buf;
590     uint32_t sz;
591     getBuffer(&buf, &sz);
592     return std::string((char*)buf, (std::string::size_type)sz);
593   }
594 
appendBufferToString(std::string & str)595   void appendBufferToString(std::string& str) {
596     if (buffer_ == nullptr) {
597       return;
598     }
599     uint8_t* buf;
600     uint32_t sz;
601     getBuffer(&buf, &sz);
602     str.append((char*)buf, sz);
603   }
604 
resetBuffer()605   void resetBuffer() {
606     rBase_ = buffer_;
607     rBound_ = buffer_;
608     wBase_ = buffer_;
609     // It isn't safe to write into a buffer we don't own.
610     if (!owner_) {
611       wBound_ = wBase_;
612       bufferSize_ = 0;
613     }
614   }
615 
616   /// See constructor documentation.
617   void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) {
618     // Use a variant of the copy-and-swap trick for assignment operators.
619     // This is sub-optimal in terms of performance for two reasons:
620     //   1/ The constructing and swapping of the (small) values
621     //      in the temporary object takes some time, and is not necessary.
622     //   2/ If policy == COPY, we allocate the new buffer before
623     //      freeing the old one, precluding the possibility of
624     //      reusing that memory.
625     // I doubt that either of these problems could be optimized away,
626     // but the second is probably no a common case, and the first is minor.
627     // I don't expect resetBuffer to be a common operation, so I'm willing to
628     // bite the performance bullet to make the method this simple.
629 
630     // Construct the new buffer.
631     TMemoryBuffer new_buffer(buf, sz, policy);
632     // Move it into ourself.
633     this->swap(new_buffer);
634     // Our old self gets destroyed.
635   }
636 
637   /// See constructor documentation.
resetBuffer(uint32_t sz)638   void resetBuffer(uint32_t sz) {
639     // Construct the new buffer.
640     TMemoryBuffer new_buffer(sz);
641     // Move it into ourself.
642     this->swap(new_buffer);
643     // Our old self gets destroyed.
644   }
645 
readAsString(uint32_t len)646   std::string readAsString(uint32_t len) {
647     std::string str;
648     (void)readAppendToString(str, len);
649     return str;
650   }
651 
652   uint32_t readAppendToString(std::string& str, uint32_t len);
653 
654   // return number of bytes read
readEnd()655   uint32_t readEnd() override {
656     // This cast should be safe, because buffer_'s size is a uint32_t
657     auto bytes = static_cast<uint32_t>(rBase_ - buffer_);
658     if (rBase_ == wBase_) {
659       resetBuffer();
660     }
661     return bytes;
662   }
663 
664   // Return number of bytes written
writeEnd()665   uint32_t writeEnd() override {
666     // This cast should be safe, because buffer_'s size is a uint32_t
667     return static_cast<uint32_t>(wBase_ - buffer_);
668   }
669 
available_read()670   uint32_t available_read() const {
671     // Remember, wBase_ is the real rBound_.
672     return static_cast<uint32_t>(wBase_ - rBase_);
673   }
674 
available_write()675   uint32_t available_write() const { return static_cast<uint32_t>(wBound_ - wBase_); }
676 
677   // Returns a pointer to where the client can write data to append to
678   // the TMemoryBuffer, and ensures the buffer is big enough to accommodate a
679   // write of the provided length.  The returned pointer is very convenient for
680   // passing to read(), recv(), or similar. You must call wroteBytes() as soon
681   // as data is written or the buffer will not be aware that data has changed.
getWritePtr(uint32_t len)682   uint8_t* getWritePtr(uint32_t len) {
683     ensureCanWrite(len);
684     return wBase_;
685   }
686 
687   // Informs the buffer that the client has written 'len' bytes into storage
688   // that had been provided by getWritePtr().
689   void wroteBytes(uint32_t len);
690 
691   /*
692    * TVirtualTransport provides a default implementation of readAll().
693    * We want to use the TBufferBase version instead.
694    */
readAll(uint8_t * buf,uint32_t len)695   uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); }
696 
697   //! \brief Get the current buffer size
698   //! \returns the current buffer size
getBufferSize()699   uint32_t getBufferSize() const {
700     return bufferSize_;
701   }
702 
703   //! \brief Get the current maximum buffer size
704   //! \returns the current maximum buffer size
getMaxBufferSize()705   uint32_t getMaxBufferSize() const {
706     return maxBufferSize_;
707   }
708 
709   //! \brief Change the maximum buffer size
710   //! \param[in]  maxSize  the new maximum buffer size allowed to grow to
711   //! \throws  TTransportException(BAD_ARGS) if maxSize is less than the current buffer size
setMaxBufferSize(uint32_t maxSize)712   void setMaxBufferSize(uint32_t maxSize) {
713     if (maxSize < bufferSize_) {
714       throw TTransportException(TTransportException::BAD_ARGS,
715                                 "Maximum buffer size would be less than current buffer size");
716     }
717     maxBufferSize_ = maxSize;
718   }
719 
720 protected:
swap(TMemoryBuffer & that)721   void swap(TMemoryBuffer& that) {
722     using std::swap;
723     swap(buffer_, that.buffer_);
724     swap(bufferSize_, that.bufferSize_);
725 
726     swap(rBase_, that.rBase_);
727     swap(rBound_, that.rBound_);
728     swap(wBase_, that.wBase_);
729     swap(wBound_, that.wBound_);
730 
731     swap(owner_, that.owner_);
732   }
733 
734   // Make sure there's at least 'len' bytes available for writing.
735   void ensureCanWrite(uint32_t len);
736 
737   // Compute the position and available data for reading.
738   void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give);
739 
740   uint32_t readSlow(uint8_t* buf, uint32_t len) override;
741 
742   void writeSlow(const uint8_t* buf, uint32_t len) override;
743 
744   const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override;
745 
746   // Data buffer
747   uint8_t* buffer_;
748 
749   // Allocated buffer size
750   uint32_t bufferSize_;
751 
752   // Maximum allowed size
753   uint32_t maxBufferSize_;
754 
755   // Is this object the owner of the buffer?
756   bool owner_;
757 
758   // Don't forget to update constrctors, initCommon, and swap if
759   // you add new members.
760 };
761 }
762 }
763 } // apache::thrift::transport
764 
765 #endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_
766