1 /*
2 * Copyright (c) Facebook, Inc. and its affiliates.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #ifndef THRIFT_ASYNC_TUNFRAMEDASYNCCHANNEL_TCC_
18 #define THRIFT_ASYNC_TUNFRAMEDASYNCCHANNEL_TCC_ 1
19
20 #include <thrift/lib/cpp/async/TUnframedAsyncChannel.h>
21
22 #include <thrift/lib/cpp/transport/TBufferTransports.h>
23
24 namespace {
25 const uint32_t kInitialBufferSize = 4096;
26 }
27
28 namespace apache {
29 namespace thrift {
30 namespace async {
31 namespace detail {
32
33 template <typename ProtocolTraits_>
TUnframedACReadState()34 TUnframedACReadState<ProtocolTraits_>::TUnframedACReadState()
35 : maxMessageSize_(0x7fffffff),
36 memBuffer_(kInitialBufferSize),
37 callbackBuffer_(nullptr),
38 protocolTraits_() {}
39
40 template <typename ProtocolTraits_>
~TUnframedACReadState()41 TUnframedACReadState<ProtocolTraits_>::~TUnframedACReadState() {}
42
43 template <typename ProtocolTraits_>
getReadBuffer(void ** bufReturn,size_t * lenReturn)44 void TUnframedACReadState<ProtocolTraits_>::getReadBuffer(
45 void** bufReturn, size_t* lenReturn) {
46 uint32_t bytesAvailable = memBuffer_.available_write();
47 if (bytesAvailable > 0) {
48 // If there is room available in the buffer, just return it.
49 *lenReturn = bytesAvailable;
50 *bufReturn = memBuffer_.getWritePtr(bytesAvailable);
51 return;
52 }
53
54 uint32_t bufferSize = memBuffer_.getBufferSize();
55 uint32_t available_read = memBuffer_.available_read();
56 // we get this much without growing the buffer capacity
57 uint32_t additionalSpace = bufferSize - available_read;
58 if (additionalSpace == 0) {
59 // We need more room. memBuffer_ will at least double it's capacity when
60 // asked for even a single byte.
61 additionalSpace = kInitialBufferSize;
62 }
63
64 // Don't allow more than maxMessageSize_.
65 // Be careful not to over- or underflow uint32_t when checking.
66 //
67 // readDataAvailable() fails the read when we've already read maxMessageSize_
68 // bytes, so available_read should always be less than maxMessageSize_ here.
69 // (Unless maxMessageSize_ is 0, but that's a programmer bug.)
70 assert(available_read < maxMessageSize_);
71 if (available_read > maxMessageSize_ - additionalSpace) {
72 // Don't ask for more than maxMessageSize_ total (but we might get more)
73 additionalSpace = maxMessageSize_ - available_read;
74 }
75
76 try {
77 uint8_t* newBuffer = memBuffer_.getWritePtr(additionalSpace);
78 *lenReturn = memBuffer_.available_write();
79 *bufReturn = newBuffer;
80 } catch (std::exception& ex) {
81 T_ERROR(
82 "TUnframedAsyncChannel: failed to allocate larger read buffer: %s",
83 ex.what());
84 *lenReturn = 0;
85 *bufReturn = nullptr;
86 }
87 }
88
89 template <typename ProtocolTraits_>
readDataAvailable(size_t len)90 bool TUnframedACReadState<ProtocolTraits_>::readDataAvailable(size_t len) {
91 assert(memBuffer_.available_read() + len <= memBuffer_.getBufferSize());
92 memBuffer_.wroteBytes(len);
93
94 uint32_t messageLength = 0;
95 uint32_t bytesRead = memBuffer_.available_read();
96 uint8_t* buffer = (uint8_t*)memBuffer_.borrow(nullptr, &bytesRead);
97 if (!protocolTraits_.getMessageLength(buffer, bytesRead, &messageLength)) {
98 // We're not at the end of the message yet.
99 //
100 // If we've hit maxMessageSize_ already, fail now instead of waiting until
101 // getReadBuffer() is called again.
102 if (bytesRead >= maxMessageSize_) {
103 throw transport::TTransportException(
104 transport::TTransportException::CORRUPTED_DATA,
105 "TUnframedAsyncChannel: max message size exceeded");
106 }
107 return false;
108 }
109
110 // We've read a full message.
111 // Swap the data into the callback's buffer.
112 // Note that we may have actually read more than one message,
113 // so we have to make sure to save any remaining data after the end of the
114 // message.
115 assert(messageLength <= bytesRead);
116
117 callbackBuffer_->link(&memBuffer_, messageLength);
118 memBuffer_.consume(messageLength);
119
120 // We've put a new message in callbackBuffer_
121 return true;
122 }
123
124 } // namespace detail
125 } // namespace async
126 } // namespace thrift
127 } // namespace apache
128
129 #endif // THRIFT_ASYNC_TUNFRAMEDASYNCCHANNEL_TCC_
130