1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 21 #define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1 22 23 #include <cstdlib> 24 #include <cstring> 25 #include <limits> 26 #include <boost/scoped_array.hpp> 27 28 #include <thrift/transport/TTransport.h> 29 #include <thrift/transport/TVirtualTransport.h> 30 31 #ifdef __GNUC__ 32 #define TDB_LIKELY(val) (__builtin_expect((val), 1)) 33 #define TDB_UNLIKELY(val) (__builtin_expect((val), 0)) 34 #else 35 #define TDB_LIKELY(val) (val) 36 #define TDB_UNLIKELY(val) (val) 37 #endif 38 39 namespace apache { 40 namespace thrift { 41 namespace transport { 42 43 /** 44 * Base class for all transports that use read/write buffers for performance. 45 * 46 * TBufferBase is designed to implement the fast-path "memcpy" style 47 * operations that work in the common case. It does so with small and 48 * (eventually) nonvirtual, inlinable methods. TBufferBase is an abstract 49 * class. Subclasses are expected to define the "slow path" operations 50 * that have to be done when the buffers are full or empty. 51 * 52 */ 53 class TBufferBase : public TVirtualTransport<TBufferBase> { 54 55 public: 56 /** 57 * Fast-path read. 58 * 59 * When we have enough data buffered to fulfill the read, we can satisfy it 60 * with a single memcpy, then adjust our internal pointers. If the buffer 61 * is empty, we call out to our slow path, implemented by a subclass. 62 * This method is meant to eventually be nonvirtual and inlinable. 63 */ read(uint8_t * buf,uint32_t len)64 uint32_t read(uint8_t* buf, uint32_t len) { 65 checkReadBytesAvailable(len); 66 uint8_t* new_rBase = rBase_ + len; 67 if (TDB_LIKELY(new_rBase <= rBound_)) { 68 std::memcpy(buf, rBase_, len); 69 rBase_ = new_rBase; 70 return len; 71 } 72 return readSlow(buf, len); 73 } 74 75 /** 76 * Shortcutted version of readAll. 77 */ readAll(uint8_t * buf,uint32_t len)78 uint32_t readAll(uint8_t* buf, uint32_t len) { 79 uint8_t* new_rBase = rBase_ + len; 80 if (TDB_LIKELY(new_rBase <= rBound_)) { 81 std::memcpy(buf, rBase_, len); 82 rBase_ = new_rBase; 83 return len; 84 } 85 return apache::thrift::transport::readAll(*this, buf, len); 86 } 87 88 /** 89 * Fast-path write. 90 * 91 * When we have enough empty space in our buffer to accommodate the write, we 92 * can satisfy it with a single memcpy, then adjust our internal pointers. 93 * If the buffer is full, we call out to our slow path, implemented by a 94 * subclass. This method is meant to eventually be nonvirtual and 95 * inlinable. 96 */ write(const uint8_t * buf,uint32_t len)97 void write(const uint8_t* buf, uint32_t len) { 98 uint8_t* new_wBase = wBase_ + len; 99 if (TDB_LIKELY(new_wBase <= wBound_)) { 100 std::memcpy(wBase_, buf, len); 101 wBase_ = new_wBase; 102 return; 103 } 104 writeSlow(buf, len); 105 } 106 107 /** 108 * Fast-path borrow. A lot like the fast-path read. 109 */ borrow(uint8_t * buf,uint32_t * len)110 const uint8_t* borrow(uint8_t* buf, uint32_t* len) { 111 if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) { 112 // With strict aliasing, writing to len shouldn't force us to 113 // refetch rBase_ from memory. TODO(dreiss): Verify this. 114 *len = static_cast<uint32_t>(rBound_ - rBase_); 115 return rBase_; 116 } 117 return borrowSlow(buf, len); 118 } 119 120 /** 121 * Consume doesn't require a slow path. 122 */ consume(uint32_t len)123 void consume(uint32_t len) { 124 countConsumedMessageBytes(len); 125 if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) { 126 rBase_ += len; 127 } else { 128 throw TTransportException(TTransportException::BAD_ARGS, "consume did not follow a borrow."); 129 } 130 } 131 132 protected: 133 /// Slow path read. 134 virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0; 135 136 /// Slow path write. 137 virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0; 138 139 /** 140 * Slow path borrow. 141 * 142 * POSTCONDITION: return == nullptr || rBound_ - rBase_ >= *len 143 */ 144 virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0; 145 146 /** 147 * Trivial constructor. 148 * 149 * Initialize pointers safely. Constructing is not a very 150 * performance-sensitive operation, so it is okay to just leave it to 151 * the concrete class to set up pointers correctly. 152 */ 153 TBufferBase(std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)154 : TVirtualTransport(config), rBase_(nullptr), rBound_(nullptr), wBase_(nullptr), wBound_(nullptr) {} 155 156 /// Convenience mutator for setting the read buffer. setReadBuffer(uint8_t * buf,uint32_t len)157 void setReadBuffer(uint8_t* buf, uint32_t len) { 158 rBase_ = buf; 159 rBound_ = buf + len; 160 } 161 162 /// Convenience mutator for setting the write buffer. setWriteBuffer(uint8_t * buf,uint32_t len)163 void setWriteBuffer(uint8_t* buf, uint32_t len) { 164 wBase_ = buf; 165 wBound_ = buf + len; 166 } 167 168 ~TBufferBase() override = default; 169 170 /// Reads begin here. 171 uint8_t* rBase_; 172 /// Reads may extend to just before here. 173 uint8_t* rBound_; 174 175 /// Writes begin here. 176 uint8_t* wBase_; 177 /// Writes may extend to just before here. 178 uint8_t* wBound_; 179 }; 180 181 /** 182 * Buffered transport. For reads it will read more data than is requested 183 * and will serve future data out of a local buffer. For writes, data is 184 * stored to an in memory buffer before being written out. 185 * 186 */ 187 class TBufferedTransport : public TVirtualTransport<TBufferedTransport, TBufferBase> { 188 public: 189 static const int DEFAULT_BUFFER_SIZE = 512; 190 191 /// Use default buffer sizes. 192 TBufferedTransport(std::shared_ptr<TTransport> transport, std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)193 : TVirtualTransport(config), 194 transport_(transport), 195 rBufSize_(DEFAULT_BUFFER_SIZE), 196 wBufSize_(DEFAULT_BUFFER_SIZE), 197 rBuf_(new uint8_t[rBufSize_]), 198 wBuf_(new uint8_t[wBufSize_]) { 199 initPointers(); 200 } 201 202 /// Use specified buffer sizes. 203 TBufferedTransport(std::shared_ptr<TTransport> transport, uint32_t sz, std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)204 : TVirtualTransport(config), 205 transport_(transport), 206 rBufSize_(sz), 207 wBufSize_(sz), 208 rBuf_(new uint8_t[rBufSize_]), 209 wBuf_(new uint8_t[wBufSize_]) { 210 initPointers(); 211 } 212 213 /// Use specified read and write buffer sizes. 214 TBufferedTransport(std::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz, 215 std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)216 : TVirtualTransport(config), 217 transport_(transport), 218 rBufSize_(rsz), 219 wBufSize_(wsz), 220 rBuf_(new uint8_t[rBufSize_]), 221 wBuf_(new uint8_t[wBufSize_]) { 222 initPointers(); 223 } 224 open()225 void open() override { transport_->open(); } 226 isOpen()227 bool isOpen() const override { return transport_->isOpen(); } 228 peek()229 bool peek() override { 230 if (rBase_ == rBound_) { 231 setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_)); 232 } 233 return (rBound_ > rBase_); 234 } 235 close()236 void close() override { 237 flush(); 238 transport_->close(); 239 } 240 241 uint32_t readSlow(uint8_t* buf, uint32_t len) override; 242 243 void writeSlow(const uint8_t* buf, uint32_t len) override; 244 245 void flush() override; 246 247 /** 248 * Returns the origin of the underlying transport 249 */ getOrigin()250 const std::string getOrigin() const override { return transport_->getOrigin(); } 251 252 /** 253 * The following behavior is currently implemented by TBufferedTransport, 254 * but that may change in a future version: 255 * 1/ If len is at most rBufSize_, borrow will never return nullptr. 256 * Depending on the underlying transport, it could throw an exception 257 * or hang forever. 258 * 2/ Some borrow requests may copy bytes internally. However, 259 * if len is at most rBufSize_/2, none of the copied bytes 260 * will ever have to be copied again. For optimial performance, 261 * stay under this limit. 262 */ 263 const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override; 264 getUnderlyingTransport()265 std::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; } 266 267 /* 268 * TVirtualTransport provides a default implementation of readAll(). 269 * We want to use the TBufferBase version instead. 270 */ readAll(uint8_t * buf,uint32_t len)271 uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); } 272 273 protected: initPointers()274 void initPointers() { 275 setReadBuffer(rBuf_.get(), 0); 276 setWriteBuffer(wBuf_.get(), wBufSize_); 277 // Write size never changes. 278 } 279 280 std::shared_ptr<TTransport> transport_; 281 282 uint32_t rBufSize_; 283 uint32_t wBufSize_; 284 boost::scoped_array<uint8_t> rBuf_; 285 boost::scoped_array<uint8_t> wBuf_; 286 }; 287 288 /** 289 * Wraps a transport into a buffered one. 290 * 291 */ 292 class TBufferedTransportFactory : public TTransportFactory { 293 public: 294 TBufferedTransportFactory() = default; 295 296 ~TBufferedTransportFactory() override = default; 297 298 /** 299 * Wraps the transport into a buffered one. 300 */ getTransport(std::shared_ptr<TTransport> trans)301 std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override { 302 return std::shared_ptr<TTransport>(new TBufferedTransport(trans)); 303 } 304 }; 305 306 /** 307 * Framed transport. All writes go into an in-memory buffer until flush is 308 * called, at which point the transport writes the length of the entire 309 * binary chunk followed by the data payload. This allows the receiver on the 310 * other end to always do fixed-length reads. 311 * 312 */ 313 class TFramedTransport : public TVirtualTransport<TFramedTransport, TBufferBase> { 314 public: 315 static const int DEFAULT_BUFFER_SIZE = 512; 316 static const int DEFAULT_MAX_FRAME_SIZE = 256 * 1024 * 1024; 317 318 /// Use default buffer sizes. 319 TFramedTransport(std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)320 : TVirtualTransport(config), 321 transport_(), 322 rBufSize_(0), 323 wBufSize_(DEFAULT_BUFFER_SIZE), 324 rBuf_(), 325 wBuf_(new uint8_t[wBufSize_]), 326 bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()) { 327 initPointers(); 328 } 329 330 TFramedTransport(std::shared_ptr<TTransport> transport, std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)331 : TVirtualTransport(config), 332 transport_(transport), 333 rBufSize_(0), 334 wBufSize_(DEFAULT_BUFFER_SIZE), 335 rBuf_(), 336 wBuf_(new uint8_t[wBufSize_]), 337 bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()), 338 maxFrameSize_(configuration_->getMaxFrameSize()) { 339 initPointers(); 340 } 341 342 TFramedTransport(std::shared_ptr<TTransport> transport, 343 uint32_t sz, 344 uint32_t bufReclaimThresh = (std::numeric_limits<uint32_t>::max)(), 345 std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)346 : TVirtualTransport(config), 347 transport_(transport), 348 rBufSize_(0), 349 wBufSize_(sz), 350 rBuf_(), 351 wBuf_(new uint8_t[wBufSize_]), 352 bufReclaimThresh_(bufReclaimThresh), 353 maxFrameSize_(configuration_->getMaxFrameSize()) { 354 initPointers(); 355 } 356 open()357 void open() override { transport_->open(); } 358 isOpen()359 bool isOpen() const override { return transport_->isOpen(); } 360 peek()361 bool peek() override { return (rBase_ < rBound_) || transport_->peek(); } 362 close()363 void close() override { 364 flush(); 365 transport_->close(); 366 } 367 368 uint32_t readSlow(uint8_t* buf, uint32_t len) override; 369 370 void writeSlow(const uint8_t* buf, uint32_t len) override; 371 372 void flush() override; 373 374 uint32_t readEnd() override; 375 376 uint32_t writeEnd() override; 377 378 const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override; 379 getUnderlyingTransport()380 std::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; } 381 382 /* 383 * TVirtualTransport provides a default implementation of readAll(). 384 * We want to use the TBufferBase version instead. 385 */ 386 using TBufferBase::readAll; 387 388 /** 389 * Returns the origin of the underlying transport 390 */ getOrigin()391 const std::string getOrigin() const override { return transport_->getOrigin(); } 392 393 /** 394 * Set the maximum size of the frame at read 395 */ setMaxFrameSize(uint32_t maxFrameSize)396 void setMaxFrameSize(uint32_t maxFrameSize) { maxFrameSize_ = maxFrameSize; } 397 398 /** 399 * Get the maximum size of the frame at read 400 */ getMaxFrameSize()401 uint32_t getMaxFrameSize() { return maxFrameSize_; } 402 403 protected: 404 /** 405 * Reads a frame of input from the underlying stream. 406 * 407 * Returns true if a frame was read successfully, or false on EOF. 408 * (Raises a TTransportException if EOF occurs after a partial frame.) 409 */ 410 virtual bool readFrame(); 411 initPointers()412 void initPointers() { 413 setReadBuffer(nullptr, 0); 414 setWriteBuffer(wBuf_.get(), wBufSize_); 415 416 // Pad the buffer so we can insert the size later. 417 int32_t pad = 0; 418 this->write((uint8_t*)&pad, sizeof(pad)); 419 } 420 421 std::shared_ptr<TTransport> transport_; 422 423 uint32_t rBufSize_; 424 uint32_t wBufSize_; 425 boost::scoped_array<uint8_t> rBuf_; 426 boost::scoped_array<uint8_t> wBuf_; 427 uint32_t bufReclaimThresh_; 428 uint32_t maxFrameSize_; 429 }; 430 431 /** 432 * Wraps a transport into a framed one. 433 * 434 */ 435 class TFramedTransportFactory : public TTransportFactory { 436 public: 437 TFramedTransportFactory() = default; 438 439 ~TFramedTransportFactory() override = default; 440 441 /** 442 * Wraps the transport into a framed one. 443 */ getTransport(std::shared_ptr<TTransport> trans)444 std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override { 445 return std::shared_ptr<TTransport>(new TFramedTransport(trans)); 446 } 447 }; 448 449 /** 450 * A memory buffer is a tranpsort that simply reads from and writes to an 451 * in memory buffer. Anytime you call write on it, the data is simply placed 452 * into a buffer, and anytime you call read, data is read from that buffer. 453 * 454 * The buffers are allocated using C constructs malloc,realloc, and the size 455 * doubles as necessary. We've considered using scoped 456 * 457 */ 458 class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> { 459 private: 460 // Common initialization done by all constructors. initCommon(uint8_t * buf,uint32_t size,bool owner,uint32_t wPos)461 void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) { 462 463 maxBufferSize_ = (std::numeric_limits<uint32_t>::max)(); 464 465 if (buf == nullptr && size != 0) { 466 assert(owner); 467 buf = (uint8_t*)std::malloc(size); 468 if (buf == nullptr) { 469 throw std::bad_alloc(); 470 } 471 } 472 473 buffer_ = buf; 474 bufferSize_ = size; 475 476 rBase_ = buffer_; 477 rBound_ = buffer_ + wPos; 478 // TODO(dreiss): Investigate NULL-ing this if !owner. 479 wBase_ = buffer_ + wPos; 480 wBound_ = buffer_ + bufferSize_; 481 482 owner_ = owner; 483 484 // rBound_ is really an artifact. In principle, it should always be 485 // equal to wBase_. We update it in a few places (computeRead, etc.). 486 } 487 488 public: 489 static const uint32_t defaultSize = 1024; 490 491 /** 492 * This enum specifies how a TMemoryBuffer should treat 493 * memory passed to it via constructors or resetBuffer. 494 * 495 * OBSERVE: 496 * TMemoryBuffer will simply store a pointer to the memory. 497 * It is the callers responsibility to ensure that the pointer 498 * remains valid for the lifetime of the TMemoryBuffer, 499 * and that it is properly cleaned up. 500 * Note that no data can be written to observed buffers. 501 * 502 * COPY: 503 * TMemoryBuffer will make an internal copy of the buffer. 504 * The caller has no responsibilities. 505 * 506 * TAKE_OWNERSHIP: 507 * TMemoryBuffer will become the "owner" of the buffer, 508 * and will be responsible for freeing it. 509 * The membory must have been allocated with malloc. 510 */ 511 enum MemoryPolicy { OBSERVE = 1, COPY = 2, TAKE_OWNERSHIP = 3 }; 512 513 /** 514 * Construct a TMemoryBuffer with a default-sized buffer, 515 * owned by the TMemoryBuffer object. 516 */ 517 TMemoryBuffer(std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)518 : TVirtualTransport(config) { 519 initCommon(nullptr, defaultSize, true, 0); 520 } 521 522 /** 523 * Construct a TMemoryBuffer with a buffer of a specified size, 524 * owned by the TMemoryBuffer object. 525 * 526 * @param sz The initial size of the buffer. 527 */ 528 TMemoryBuffer(uint32_t sz, std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)529 : TVirtualTransport(config) { 530 initCommon(nullptr, sz, true, 0); 531 } 532 533 /** 534 * Construct a TMemoryBuffer with buf as its initial contents. 535 * 536 * @param buf The initial contents of the buffer. 537 * Note that, while buf is a non-const pointer, 538 * TMemoryBuffer will not write to it if policy == OBSERVE, 539 * so it is safe to const_cast<uint8_t*>(whatever). 540 * @param sz The size of @c buf. 541 * @param policy See @link MemoryPolicy @endlink . 542 */ 543 TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE, std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)544 : TVirtualTransport(config) { 545 if (buf == nullptr && sz != 0) { 546 throw TTransportException(TTransportException::BAD_ARGS, 547 "TMemoryBuffer given null buffer with non-zero size."); 548 } 549 550 switch (policy) { 551 case OBSERVE: 552 case TAKE_OWNERSHIP: 553 initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz); 554 break; 555 case COPY: 556 initCommon(nullptr, sz, true, 0); 557 this->write(buf, sz); 558 break; 559 default: 560 throw TTransportException(TTransportException::BAD_ARGS, 561 "Invalid MemoryPolicy for TMemoryBuffer"); 562 } 563 } 564 ~TMemoryBuffer()565 ~TMemoryBuffer() override { 566 if (owner_) { 567 std::free(buffer_); 568 } 569 } 570 isOpen()571 bool isOpen() const override { return true; } 572 peek()573 bool peek() override { return (rBase_ < wBase_); } 574 open()575 void open() override {} 576 close()577 void close() override {} 578 579 // TODO(dreiss): Make bufPtr const. getBuffer(uint8_t ** bufPtr,uint32_t * sz)580 void getBuffer(uint8_t** bufPtr, uint32_t* sz) { 581 *bufPtr = rBase_; 582 *sz = static_cast<uint32_t>(wBase_ - rBase_); 583 } 584 getBufferAsString()585 std::string getBufferAsString() { 586 if (buffer_ == nullptr) { 587 return ""; 588 } 589 uint8_t* buf; 590 uint32_t sz; 591 getBuffer(&buf, &sz); 592 return std::string((char*)buf, (std::string::size_type)sz); 593 } 594 appendBufferToString(std::string & str)595 void appendBufferToString(std::string& str) { 596 if (buffer_ == nullptr) { 597 return; 598 } 599 uint8_t* buf; 600 uint32_t sz; 601 getBuffer(&buf, &sz); 602 str.append((char*)buf, sz); 603 } 604 resetBuffer()605 void resetBuffer() { 606 rBase_ = buffer_; 607 rBound_ = buffer_; 608 wBase_ = buffer_; 609 // It isn't safe to write into a buffer we don't own. 610 if (!owner_) { 611 wBound_ = wBase_; 612 bufferSize_ = 0; 613 } 614 } 615 616 /// See constructor documentation. 617 void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { 618 // Use a variant of the copy-and-swap trick for assignment operators. 619 // This is sub-optimal in terms of performance for two reasons: 620 // 1/ The constructing and swapping of the (small) values 621 // in the temporary object takes some time, and is not necessary. 622 // 2/ If policy == COPY, we allocate the new buffer before 623 // freeing the old one, precluding the possibility of 624 // reusing that memory. 625 // I doubt that either of these problems could be optimized away, 626 // but the second is probably no a common case, and the first is minor. 627 // I don't expect resetBuffer to be a common operation, so I'm willing to 628 // bite the performance bullet to make the method this simple. 629 630 // Construct the new buffer. 631 TMemoryBuffer new_buffer(buf, sz, policy); 632 // Move it into ourself. 633 this->swap(new_buffer); 634 // Our old self gets destroyed. 635 } 636 637 /// See constructor documentation. resetBuffer(uint32_t sz)638 void resetBuffer(uint32_t sz) { 639 // Construct the new buffer. 640 TMemoryBuffer new_buffer(sz); 641 // Move it into ourself. 642 this->swap(new_buffer); 643 // Our old self gets destroyed. 644 } 645 readAsString(uint32_t len)646 std::string readAsString(uint32_t len) { 647 std::string str; 648 (void)readAppendToString(str, len); 649 return str; 650 } 651 652 uint32_t readAppendToString(std::string& str, uint32_t len); 653 654 // return number of bytes read readEnd()655 uint32_t readEnd() override { 656 // This cast should be safe, because buffer_'s size is a uint32_t 657 auto bytes = static_cast<uint32_t>(rBase_ - buffer_); 658 if (rBase_ == wBase_) { 659 resetBuffer(); 660 } 661 return bytes; 662 } 663 664 // Return number of bytes written writeEnd()665 uint32_t writeEnd() override { 666 // This cast should be safe, because buffer_'s size is a uint32_t 667 return static_cast<uint32_t>(wBase_ - buffer_); 668 } 669 available_read()670 uint32_t available_read() const { 671 // Remember, wBase_ is the real rBound_. 672 return static_cast<uint32_t>(wBase_ - rBase_); 673 } 674 available_write()675 uint32_t available_write() const { return static_cast<uint32_t>(wBound_ - wBase_); } 676 677 // Returns a pointer to where the client can write data to append to 678 // the TMemoryBuffer, and ensures the buffer is big enough to accommodate a 679 // write of the provided length. The returned pointer is very convenient for 680 // passing to read(), recv(), or similar. You must call wroteBytes() as soon 681 // as data is written or the buffer will not be aware that data has changed. getWritePtr(uint32_t len)682 uint8_t* getWritePtr(uint32_t len) { 683 ensureCanWrite(len); 684 return wBase_; 685 } 686 687 // Informs the buffer that the client has written 'len' bytes into storage 688 // that had been provided by getWritePtr(). 689 void wroteBytes(uint32_t len); 690 691 /* 692 * TVirtualTransport provides a default implementation of readAll(). 693 * We want to use the TBufferBase version instead. 694 */ readAll(uint8_t * buf,uint32_t len)695 uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); } 696 697 //! \brief Get the current buffer size 698 //! \returns the current buffer size getBufferSize()699 uint32_t getBufferSize() const { 700 return bufferSize_; 701 } 702 703 //! \brief Get the current maximum buffer size 704 //! \returns the current maximum buffer size getMaxBufferSize()705 uint32_t getMaxBufferSize() const { 706 return maxBufferSize_; 707 } 708 709 //! \brief Change the maximum buffer size 710 //! \param[in] maxSize the new maximum buffer size allowed to grow to 711 //! \throws TTransportException(BAD_ARGS) if maxSize is less than the current buffer size setMaxBufferSize(uint32_t maxSize)712 void setMaxBufferSize(uint32_t maxSize) { 713 if (maxSize < bufferSize_) { 714 throw TTransportException(TTransportException::BAD_ARGS, 715 "Maximum buffer size would be less than current buffer size"); 716 } 717 maxBufferSize_ = maxSize; 718 } 719 720 protected: swap(TMemoryBuffer & that)721 void swap(TMemoryBuffer& that) { 722 using std::swap; 723 swap(buffer_, that.buffer_); 724 swap(bufferSize_, that.bufferSize_); 725 726 swap(rBase_, that.rBase_); 727 swap(rBound_, that.rBound_); 728 swap(wBase_, that.wBase_); 729 swap(wBound_, that.wBound_); 730 731 swap(owner_, that.owner_); 732 } 733 734 // Make sure there's at least 'len' bytes available for writing. 735 void ensureCanWrite(uint32_t len); 736 737 // Compute the position and available data for reading. 738 void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give); 739 740 uint32_t readSlow(uint8_t* buf, uint32_t len) override; 741 742 void writeSlow(const uint8_t* buf, uint32_t len) override; 743 744 const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override; 745 746 // Data buffer 747 uint8_t* buffer_; 748 749 // Allocated buffer size 750 uint32_t bufferSize_; 751 752 // Maximum allowed size 753 uint32_t maxBufferSize_; 754 755 // Is this object the owner of the buffer? 756 bool owner_; 757 758 // Don't forget to update constrctors, initCommon, and swap if 759 // you add new members. 760 }; 761 } 762 } 763 } // apache::thrift::transport 764 765 #endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 766