1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <cassert>
21 #include <algorithm>
22 
23 #include <thrift/transport/TBufferTransports.h>
24 
25 using std::string;
26 
27 namespace apache
28 {
29 namespace thrift
30 {
31 namespace transport
32 {
33 
34 
readSlow(uint8_t * buf,uint32_t len)35 uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len)
36 {
37     uint32_t have = static_cast<uint32_t>(rBound_ - rBase_);
38 
39     // We should only take the slow path if we can't satisfy the read
40     // with the data already in the buffer.
41     assert(have < len);
42 
43     // If we have some data in the buffer, copy it out and return it.
44     // We have to return it without attempting to read more, since we aren't
45     // guaranteed that the underlying transport actually has more data, so
46     // attempting to read from it could block.
47     if (have > 0)
48     {
49         memcpy(buf, rBase_, have);
50         setReadBuffer(rBuf_.get(), 0);
51         return have;
52     }
53 
54     // No data is available in our buffer.
55     // Get more from underlying transport up to buffer size.
56     // Note that this makes a lot of sense if len < rBufSize_
57     // and almost no sense otherwise.  TODO(dreiss): Fix that
58     // case (possibly including some readv hotness).
59     setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
60 
61     // Hand over whatever we have.
62     uint32_t give = (std::min)(len, static_cast<uint32_t>(rBound_ - rBase_));
63     memcpy(buf, rBase_, give);
64     rBase_ += give;
65 
66     return give;
67 }
68 
writeSlow(const uint8_t * buf,uint32_t len)69 void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len)
70 {
71     uint32_t have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get());
72     uint32_t space = static_cast<uint32_t>(wBound_ - wBase_);
73     // We should only take the slow path if we can't accomodate the write
74     // with the free space already in the buffer.
75     assert(wBound_ - wBase_ < static_cast<ptrdiff_t>(len));
76 
77     // Now here's the tricky question: should we copy data from buf into our
78     // internal buffer and write it from there, or should we just write out
79     // the current internal buffer in one syscall and write out buf in another.
80     // If our currently buffered data plus buf is at least double our buffer
81     // size, we will have to do two syscalls no matter what (except in the
82     // degenerate case when our buffer is empty), so there is no use copying.
83     // Otherwise, there is sort of a sliding scale.  If we have N-1 bytes
84     // buffered and need to write 2, it would be crazy to do two syscalls.
85     // On the other hand, if we have 2 bytes buffered and are writing 2N-3,
86     // we can save a syscall in the short term by loading up our buffer, writing
87     // it out, and copying the rest of the bytes into our buffer.  Of course,
88     // if we get another 2-byte write, we haven't saved any syscalls at all,
89     // and have just copied nearly 2N bytes for nothing.  Finding a perfect
90     // policy would require predicting the size of future writes, so we're just
91     // going to always eschew syscalls if we have less than 2N bytes to write.
92 
93     // The case where we have to do two syscalls.
94     // This case also covers the case where the buffer is empty,
95     // but it is clearer (I think) to think of it as two separate cases.
96     if ((have_bytes + len >= 2 * wBufSize_) || (have_bytes == 0))
97     {
98         // TODO(dreiss): writev
99         if (have_bytes > 0)
100         {
101             transport_->write(wBuf_.get(), have_bytes);
102         }
103 
104         transport_->write(buf, len);
105         wBase_ = wBuf_.get();
106         return;
107     }
108 
109     // Fill up our internal buffer for a write.
110     memcpy(wBase_, buf, space);
111     buf += space;
112     len -= space;
113     transport_->write(wBuf_.get(), wBufSize_);
114 
115     // Copy the rest into our buffer.
116     assert(len < wBufSize_);
117     memcpy(wBuf_.get(), buf, len);
118     wBase_ = wBuf_.get() + len;
119     return;
120 }
121 
borrowSlow(uint8_t * buf,uint32_t * len)122 const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len)
123 {
124     (void) buf;
125     (void) len;
126     // Simply return NULL.  We don't know if there is actually data available on
127     // the underlying transport, so calling read() might block.
128     return NULL;
129 }
130 
flush()131 void TBufferedTransport::flush()
132 {
133     // Write out any data waiting in the write buffer.
134     uint32_t have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get());
135 
136     if (have_bytes > 0)
137     {
138         // Note that we reset wBase_ prior to the underlying write
139         // to ensure we're in a sane state (i.e. internal buffer cleaned)
140         // if the underlying write throws up an exception
141         wBase_ = wBuf_.get();
142         transport_->write(wBuf_.get(), have_bytes);
143     }
144 
145     // Flush the underlying transport.
146     transport_->flush();
147 }
148 
149 
readSlow(uint8_t * buf,uint32_t len)150 uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len)
151 {
152     uint32_t want = len;
153     uint32_t have = static_cast<uint32_t>(rBound_ - rBase_);
154 
155     // We should only take the slow path if we can't satisfy the read
156     // with the data already in the buffer.
157     assert(have < want);
158 
159     // If we have some data in the buffer, copy it out and return it.
160     // We have to return it without attempting to read more, since we aren't
161     // guaranteed that the underlying transport actually has more data, so
162     // attempting to read from it could block.
163     if (have > 0)
164     {
165         memcpy(buf, rBase_, have);
166         setReadBuffer(rBuf_.get(), 0);
167         return have;
168     }
169 
170     // Read another frame.
171     if (!readFrame())
172     {
173         // EOF.  No frame available.
174         return 0;
175     }
176 
177     // TODO(dreiss): Should we warn when reads cross frames?
178 
179     // Hand over whatever we have.
180     uint32_t give = (std::min)(want, static_cast<uint32_t>(rBound_ - rBase_));
181     memcpy(buf, rBase_, give);
182     rBase_ += give;
183     want -= give;
184 
185     return (len - want);
186 }
187 
readFrame()188 bool TFramedTransport::readFrame()
189 {
190     // TODO(dreiss): Think about using readv here, even though it would
191     // result in (gasp) read-ahead.
192 
193     // Read the size of the next frame.
194     // We can't use readAll(&sz, sizeof(sz)), since that always throws an
195     // exception on EOF.  We want to throw an exception only if EOF occurs after
196     // partial size data.
197     int32_t sz;
198     uint32_t size_bytes_read = 0;
199 
200     while (size_bytes_read < sizeof(sz))
201     {
202         uint8_t* szp = reinterpret_cast<uint8_t*>(&sz) + size_bytes_read;
203         uint32_t bytes_read = transport_->read(
204                                   szp,
205                                   static_cast<uint32_t>(sizeof(sz)) - size_bytes_read);
206 
207         if (bytes_read == 0)
208         {
209             if (size_bytes_read == 0)
210             {
211                 // EOF before any data was read.
212                 return false;
213             }
214             else
215             {
216                 // EOF after a partial frame header.  Raise an exception.
217                 throw TTransportException(TTransportException::END_OF_FILE,
218                                           "No more data to read after "
219                                           "partial frame header.");
220             }
221         }
222 
223         size_bytes_read += bytes_read;
224     }
225 
226     sz = ntohl(sz);
227 
228     if (sz < 0)
229     {
230         throw TTransportException("Frame size has negative value");
231     }
232 
233     // Read the frame payload, and reset markers.
234     if (sz > static_cast<int32_t>(rBufSize_))
235     {
236         rBuf_.reset(new uint8_t[sz]);
237         rBufSize_ = sz;
238     }
239 
240     transport_->readAll(rBuf_.get(), sz);
241     setReadBuffer(rBuf_.get(), sz);
242     return true;
243 }
244 
writeSlow(const uint8_t * buf,uint32_t len)245 void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len)
246 {
247     // Double buffer size until sufficient.
248     uint32_t have = static_cast<uint32_t>(wBase_ - wBuf_.get());
249     uint32_t new_size = wBufSize_;
250 
251     if (len + have < have /* overflow */ || len + have > 0x7fffffff)
252     {
253         throw TTransportException(TTransportException::BAD_ARGS,
254                                   "Attempted to write over 2 GB to TFramedTransport.");
255     }
256 
257     while (new_size < len + have)
258     {
259         new_size = new_size > 0 ? new_size * 2 : 1;
260     }
261 
262     // TODO(dreiss): Consider modifying this class to use malloc/free
263     // so we can use realloc here.
264 
265     // Allocate new buffer.
266     uint8_t* new_buf = new uint8_t[new_size];
267 
268     // Copy the old buffer to the new one.
269     memcpy(new_buf, wBuf_.get(), have);
270 
271     // Now point buf to the new one.
272     wBuf_.reset(new_buf);
273     wBufSize_ = new_size;
274     wBase_ = wBuf_.get() + have;
275     wBound_ = wBuf_.get() + wBufSize_;
276 
277     // Copy the data into the new buffer.
278     memcpy(wBase_, buf, len);
279     wBase_ += len;
280 }
281 
flush()282 void TFramedTransport::flush()
283 {
284     int32_t sz_hbo, sz_nbo;
285     assert(wBufSize_ > sizeof(sz_nbo));
286 
287     // Slip the frame size into the start of the buffer.
288     sz_hbo = static_cast<uint32_t>(wBase_ - (wBuf_.get() + sizeof(sz_nbo)));
289     sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo));
290     memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo));
291 
292     if (sz_hbo > 0)
293     {
294         // Note that we reset wBase_ (with a pad for the frame size)
295         // prior to the underlying write to ensure we're in a sane state
296         // (i.e. internal buffer cleaned) if the underlying write throws
297         // up an exception
298         wBase_ = wBuf_.get() + sizeof(sz_nbo);
299 
300         // Write size and frame body.
301         transport_->write(
302             wBuf_.get(),
303             static_cast<uint32_t>(sizeof(sz_nbo)) + sz_hbo);
304     }
305 
306     // Flush the underlying transport.
307     transport_->flush();
308 }
309 
writeEnd()310 uint32_t TFramedTransport::writeEnd()
311 {
312     return static_cast<uint32_t>(wBase_ - wBuf_.get());
313 }
314 
borrowSlow(uint8_t * buf,uint32_t * len)315 const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len)
316 {
317     (void) buf;
318     (void) len;
319     // Don't try to be clever with shifting buffers.
320     // If the fast path failed let the protocol use its slow path.
321     // Besides, who is going to try to borrow across messages?
322     return NULL;
323 }
324 
readEnd()325 uint32_t TFramedTransport::readEnd()
326 {
327     // include framing bytes
328     return static_cast<uint32_t>(rBound_ - rBuf_.get() + sizeof(uint32_t));
329 }
330 
computeRead(uint32_t len,uint8_t ** out_start,uint32_t * out_give)331 void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give)
332 {
333     // Correct rBound_ so we can use the fast path in the future.
334     rBound_ = wBase_;
335 
336     // Decide how much to give.
337     uint32_t give = (std::min)(len, available_read());
338 
339     *out_start = rBase_;
340     *out_give = give;
341 
342     // Preincrement rBase_ so the caller doesn't have to.
343     rBase_ += give;
344 }
345 
readSlow(uint8_t * buf,uint32_t len)346 uint32_t TMemoryBuffer::readSlow(uint8_t* buf, uint32_t len)
347 {
348     uint8_t* start;
349     uint32_t give;
350     computeRead(len, &start, &give);
351 
352     // Copy into the provided buffer.
353     memcpy(buf, start, give);
354 
355     return give;
356 }
357 
readAppendToString(std::string & str,uint32_t len)358 uint32_t TMemoryBuffer::readAppendToString(std::string& str, uint32_t len)
359 {
360     // Don't get some stupid assertion failure.
361     if (buffer_ == NULL)
362     {
363         return 0;
364     }
365 
366     uint8_t* start;
367     uint32_t give;
368     computeRead(len, &start, &give);
369 
370     // Append to the provided string.
371     str.append((char*)start, give);
372 
373     return give;
374 }
375 
ensureCanWrite(uint32_t len)376 void TMemoryBuffer::ensureCanWrite(uint32_t len)
377 {
378     // Check available space
379     uint32_t avail = available_write();
380 
381     if (len <= avail)
382     {
383         return;
384     }
385 
386     if (!owner_)
387     {
388         throw TTransportException("Insufficient space in external MemoryBuffer");
389     }
390 
391     // Grow the buffer as necessary.
392     uint32_t new_size = bufferSize_;
393 
394     while (len > avail)
395     {
396         new_size = new_size > 0 ? new_size * 2 : 1;
397         avail = available_write() + (new_size - bufferSize_);
398     }
399 
400     // Allocate into a new pointer so we don't bork ours if it fails.
401     void* new_buffer = std::realloc(buffer_, new_size);
402 
403     if (new_buffer == NULL)
404     {
405         throw std::bad_alloc();
406     }
407 
408     bufferSize_ = new_size;
409 
410     ptrdiff_t offset = (uint8_t*)new_buffer - buffer_;
411     buffer_ += offset;
412     rBase_ += offset;
413     rBound_ += offset;
414     wBase_ += offset;
415     wBound_ = buffer_ + bufferSize_;
416 }
417 
writeSlow(const uint8_t * buf,uint32_t len)418 void TMemoryBuffer::writeSlow(const uint8_t* buf, uint32_t len)
419 {
420     ensureCanWrite(len);
421 
422     // Copy into the buffer and increment wBase_.
423     memcpy(wBase_, buf, len);
424     wBase_ += len;
425 }
426 
wroteBytes(uint32_t len)427 void TMemoryBuffer::wroteBytes(uint32_t len)
428 {
429     uint32_t avail = available_write();
430 
431     if (len > avail)
432     {
433         throw TTransportException("Client wrote more bytes than size of buffer.");
434     }
435 
436     wBase_ += len;
437 }
438 
borrowSlow(uint8_t * buf,uint32_t * len)439 const uint8_t* TMemoryBuffer::borrowSlow(uint8_t* buf, uint32_t* len)
440 {
441     (void) buf;
442     rBound_ = wBase_;
443 
444     if (available_read() >= *len)
445     {
446         *len = available_read();
447         return rBase_;
448     }
449 
450     return NULL;
451 }
452 
453 }
454 }
455 } // apache::thrift::transport
456