1 /*
2  * Copyright (c) Facebook, Inc. and its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef THRIFT_ASYNC_TUNFRAMEDASYNCCHANNEL_TCC_
18 #define THRIFT_ASYNC_TUNFRAMEDASYNCCHANNEL_TCC_ 1
19 
20 #include <thrift/lib/cpp/async/TUnframedAsyncChannel.h>
21 
22 #include <thrift/lib/cpp/transport/TBufferTransports.h>
23 
24 namespace {
25 const uint32_t kInitialBufferSize = 4096;
26 }
27 
28 namespace apache {
29 namespace thrift {
30 namespace async {
31 namespace detail {
32 
33 template <typename ProtocolTraits_>
TUnframedACReadState()34 TUnframedACReadState<ProtocolTraits_>::TUnframedACReadState()
35     : maxMessageSize_(0x7fffffff),
36       memBuffer_(kInitialBufferSize),
37       callbackBuffer_(nullptr),
38       protocolTraits_() {}
39 
40 template <typename ProtocolTraits_>
~TUnframedACReadState()41 TUnframedACReadState<ProtocolTraits_>::~TUnframedACReadState() {}
42 
43 template <typename ProtocolTraits_>
getReadBuffer(void ** bufReturn,size_t * lenReturn)44 void TUnframedACReadState<ProtocolTraits_>::getReadBuffer(
45     void** bufReturn, size_t* lenReturn) {
46   uint32_t bytesAvailable = memBuffer_.available_write();
47   if (bytesAvailable > 0) {
48     // If there is room available in the buffer, just return it.
49     *lenReturn = bytesAvailable;
50     *bufReturn = memBuffer_.getWritePtr(bytesAvailable);
51     return;
52   }
53 
54   uint32_t bufferSize = memBuffer_.getBufferSize();
55   uint32_t available_read = memBuffer_.available_read();
56   // we get this much without growing the buffer capacity
57   uint32_t additionalSpace = bufferSize - available_read;
58   if (additionalSpace == 0) {
59     // We need more room.  memBuffer_ will at least double it's capacity when
60     // asked for even a single byte.
61     additionalSpace = kInitialBufferSize;
62   }
63 
64   // Don't allow more than maxMessageSize_.
65   // Be careful not to over- or underflow uint32_t when checking.
66   //
67   // readDataAvailable() fails the read when we've already read maxMessageSize_
68   // bytes, so available_read should always be less than maxMessageSize_ here.
69   // (Unless maxMessageSize_ is 0, but that's a programmer bug.)
70   assert(available_read < maxMessageSize_);
71   if (available_read > maxMessageSize_ - additionalSpace) {
72     // Don't ask for more than maxMessageSize_ total (but we might get more)
73     additionalSpace = maxMessageSize_ - available_read;
74   }
75 
76   try {
77     uint8_t* newBuffer = memBuffer_.getWritePtr(additionalSpace);
78     *lenReturn = memBuffer_.available_write();
79     *bufReturn = newBuffer;
80   } catch (std::exception& ex) {
81     T_ERROR(
82         "TUnframedAsyncChannel: failed to allocate larger read buffer: %s",
83         ex.what());
84     *lenReturn = 0;
85     *bufReturn = nullptr;
86   }
87 }
88 
89 template <typename ProtocolTraits_>
readDataAvailable(size_t len)90 bool TUnframedACReadState<ProtocolTraits_>::readDataAvailable(size_t len) {
91   assert(memBuffer_.available_read() + len <= memBuffer_.getBufferSize());
92   memBuffer_.wroteBytes(len);
93 
94   uint32_t messageLength = 0;
95   uint32_t bytesRead = memBuffer_.available_read();
96   uint8_t* buffer = (uint8_t*)memBuffer_.borrow(nullptr, &bytesRead);
97   if (!protocolTraits_.getMessageLength(buffer, bytesRead, &messageLength)) {
98     // We're not at the end of the message yet.
99     //
100     // If we've hit maxMessageSize_ already, fail now instead of waiting until
101     // getReadBuffer() is called again.
102     if (bytesRead >= maxMessageSize_) {
103       throw transport::TTransportException(
104           transport::TTransportException::CORRUPTED_DATA,
105           "TUnframedAsyncChannel: max message size exceeded");
106     }
107     return false;
108   }
109 
110   // We've read a full message.
111   // Swap the data into the callback's buffer.
112   // Note that we may have actually read more than one message,
113   // so we have to make sure to save any remaining data after the end of the
114   // message.
115   assert(messageLength <= bytesRead);
116 
117   callbackBuffer_->link(&memBuffer_, messageLength);
118   memBuffer_.consume(messageLength);
119 
120   // We've put a new message in callbackBuffer_
121   return true;
122 }
123 
124 } // namespace detail
125 } // namespace async
126 } // namespace thrift
127 } // namespace apache
128 
129 #endif // THRIFT_ASYNC_TUNFRAMEDASYNCCHANNEL_TCC_
130