1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <cassert>
21 #include <algorithm>
22
23 #include <thrift/transport/TBufferTransports.h>
24
25 using std::string;
26
27 namespace apache
28 {
29 namespace thrift
30 {
31 namespace transport
32 {
33
34
readSlow(uint8_t * buf,uint32_t len)35 uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len)
36 {
37 uint32_t have = static_cast<uint32_t>(rBound_ - rBase_);
38
39 // We should only take the slow path if we can't satisfy the read
40 // with the data already in the buffer.
41 assert(have < len);
42
43 // If we have some data in the buffer, copy it out and return it.
44 // We have to return it without attempting to read more, since we aren't
45 // guaranteed that the underlying transport actually has more data, so
46 // attempting to read from it could block.
47 if (have > 0)
48 {
49 memcpy(buf, rBase_, have);
50 setReadBuffer(rBuf_.get(), 0);
51 return have;
52 }
53
54 // No data is available in our buffer.
55 // Get more from underlying transport up to buffer size.
56 // Note that this makes a lot of sense if len < rBufSize_
57 // and almost no sense otherwise. TODO(dreiss): Fix that
58 // case (possibly including some readv hotness).
59 setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
60
61 // Hand over whatever we have.
62 uint32_t give = (std::min)(len, static_cast<uint32_t>(rBound_ - rBase_));
63 memcpy(buf, rBase_, give);
64 rBase_ += give;
65
66 return give;
67 }
68
writeSlow(const uint8_t * buf,uint32_t len)69 void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len)
70 {
71 uint32_t have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get());
72 uint32_t space = static_cast<uint32_t>(wBound_ - wBase_);
73 // We should only take the slow path if we can't accomodate the write
74 // with the free space already in the buffer.
75 assert(wBound_ - wBase_ < static_cast<ptrdiff_t>(len));
76
77 // Now here's the tricky question: should we copy data from buf into our
78 // internal buffer and write it from there, or should we just write out
79 // the current internal buffer in one syscall and write out buf in another.
80 // If our currently buffered data plus buf is at least double our buffer
81 // size, we will have to do two syscalls no matter what (except in the
82 // degenerate case when our buffer is empty), so there is no use copying.
83 // Otherwise, there is sort of a sliding scale. If we have N-1 bytes
84 // buffered and need to write 2, it would be crazy to do two syscalls.
85 // On the other hand, if we have 2 bytes buffered and are writing 2N-3,
86 // we can save a syscall in the short term by loading up our buffer, writing
87 // it out, and copying the rest of the bytes into our buffer. Of course,
88 // if we get another 2-byte write, we haven't saved any syscalls at all,
89 // and have just copied nearly 2N bytes for nothing. Finding a perfect
90 // policy would require predicting the size of future writes, so we're just
91 // going to always eschew syscalls if we have less than 2N bytes to write.
92
93 // The case where we have to do two syscalls.
94 // This case also covers the case where the buffer is empty,
95 // but it is clearer (I think) to think of it as two separate cases.
96 if ((have_bytes + len >= 2 * wBufSize_) || (have_bytes == 0))
97 {
98 // TODO(dreiss): writev
99 if (have_bytes > 0)
100 {
101 transport_->write(wBuf_.get(), have_bytes);
102 }
103
104 transport_->write(buf, len);
105 wBase_ = wBuf_.get();
106 return;
107 }
108
109 // Fill up our internal buffer for a write.
110 memcpy(wBase_, buf, space);
111 buf += space;
112 len -= space;
113 transport_->write(wBuf_.get(), wBufSize_);
114
115 // Copy the rest into our buffer.
116 assert(len < wBufSize_);
117 memcpy(wBuf_.get(), buf, len);
118 wBase_ = wBuf_.get() + len;
119 return;
120 }
121
borrowSlow(uint8_t * buf,uint32_t * len)122 const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len)
123 {
124 (void) buf;
125 (void) len;
126 // Simply return NULL. We don't know if there is actually data available on
127 // the underlying transport, so calling read() might block.
128 return NULL;
129 }
130
flush()131 void TBufferedTransport::flush()
132 {
133 // Write out any data waiting in the write buffer.
134 uint32_t have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get());
135
136 if (have_bytes > 0)
137 {
138 // Note that we reset wBase_ prior to the underlying write
139 // to ensure we're in a sane state (i.e. internal buffer cleaned)
140 // if the underlying write throws up an exception
141 wBase_ = wBuf_.get();
142 transport_->write(wBuf_.get(), have_bytes);
143 }
144
145 // Flush the underlying transport.
146 transport_->flush();
147 }
148
149
readSlow(uint8_t * buf,uint32_t len)150 uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len)
151 {
152 uint32_t want = len;
153 uint32_t have = static_cast<uint32_t>(rBound_ - rBase_);
154
155 // We should only take the slow path if we can't satisfy the read
156 // with the data already in the buffer.
157 assert(have < want);
158
159 // If we have some data in the buffer, copy it out and return it.
160 // We have to return it without attempting to read more, since we aren't
161 // guaranteed that the underlying transport actually has more data, so
162 // attempting to read from it could block.
163 if (have > 0)
164 {
165 memcpy(buf, rBase_, have);
166 setReadBuffer(rBuf_.get(), 0);
167 return have;
168 }
169
170 // Read another frame.
171 if (!readFrame())
172 {
173 // EOF. No frame available.
174 return 0;
175 }
176
177 // TODO(dreiss): Should we warn when reads cross frames?
178
179 // Hand over whatever we have.
180 uint32_t give = (std::min)(want, static_cast<uint32_t>(rBound_ - rBase_));
181 memcpy(buf, rBase_, give);
182 rBase_ += give;
183 want -= give;
184
185 return (len - want);
186 }
187
readFrame()188 bool TFramedTransport::readFrame()
189 {
190 // TODO(dreiss): Think about using readv here, even though it would
191 // result in (gasp) read-ahead.
192
193 // Read the size of the next frame.
194 // We can't use readAll(&sz, sizeof(sz)), since that always throws an
195 // exception on EOF. We want to throw an exception only if EOF occurs after
196 // partial size data.
197 int32_t sz;
198 uint32_t size_bytes_read = 0;
199
200 while (size_bytes_read < sizeof(sz))
201 {
202 uint8_t* szp = reinterpret_cast<uint8_t*>(&sz) + size_bytes_read;
203 uint32_t bytes_read = transport_->read(
204 szp,
205 static_cast<uint32_t>(sizeof(sz)) - size_bytes_read);
206
207 if (bytes_read == 0)
208 {
209 if (size_bytes_read == 0)
210 {
211 // EOF before any data was read.
212 return false;
213 }
214 else
215 {
216 // EOF after a partial frame header. Raise an exception.
217 throw TTransportException(TTransportException::END_OF_FILE,
218 "No more data to read after "
219 "partial frame header.");
220 }
221 }
222
223 size_bytes_read += bytes_read;
224 }
225
226 sz = ntohl(sz);
227
228 if (sz < 0)
229 {
230 throw TTransportException("Frame size has negative value");
231 }
232
233 // Read the frame payload, and reset markers.
234 if (sz > static_cast<int32_t>(rBufSize_))
235 {
236 rBuf_.reset(new uint8_t[sz]);
237 rBufSize_ = sz;
238 }
239
240 transport_->readAll(rBuf_.get(), sz);
241 setReadBuffer(rBuf_.get(), sz);
242 return true;
243 }
244
writeSlow(const uint8_t * buf,uint32_t len)245 void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len)
246 {
247 // Double buffer size until sufficient.
248 uint32_t have = static_cast<uint32_t>(wBase_ - wBuf_.get());
249 uint32_t new_size = wBufSize_;
250
251 if (len + have < have /* overflow */ || len + have > 0x7fffffff)
252 {
253 throw TTransportException(TTransportException::BAD_ARGS,
254 "Attempted to write over 2 GB to TFramedTransport.");
255 }
256
257 while (new_size < len + have)
258 {
259 new_size = new_size > 0 ? new_size * 2 : 1;
260 }
261
262 // TODO(dreiss): Consider modifying this class to use malloc/free
263 // so we can use realloc here.
264
265 // Allocate new buffer.
266 uint8_t* new_buf = new uint8_t[new_size];
267
268 // Copy the old buffer to the new one.
269 memcpy(new_buf, wBuf_.get(), have);
270
271 // Now point buf to the new one.
272 wBuf_.reset(new_buf);
273 wBufSize_ = new_size;
274 wBase_ = wBuf_.get() + have;
275 wBound_ = wBuf_.get() + wBufSize_;
276
277 // Copy the data into the new buffer.
278 memcpy(wBase_, buf, len);
279 wBase_ += len;
280 }
281
flush()282 void TFramedTransport::flush()
283 {
284 int32_t sz_hbo, sz_nbo;
285 assert(wBufSize_ > sizeof(sz_nbo));
286
287 // Slip the frame size into the start of the buffer.
288 sz_hbo = static_cast<uint32_t>(wBase_ - (wBuf_.get() + sizeof(sz_nbo)));
289 sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo));
290 memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo));
291
292 if (sz_hbo > 0)
293 {
294 // Note that we reset wBase_ (with a pad for the frame size)
295 // prior to the underlying write to ensure we're in a sane state
296 // (i.e. internal buffer cleaned) if the underlying write throws
297 // up an exception
298 wBase_ = wBuf_.get() + sizeof(sz_nbo);
299
300 // Write size and frame body.
301 transport_->write(
302 wBuf_.get(),
303 static_cast<uint32_t>(sizeof(sz_nbo)) + sz_hbo);
304 }
305
306 // Flush the underlying transport.
307 transport_->flush();
308 }
309
writeEnd()310 uint32_t TFramedTransport::writeEnd()
311 {
312 return static_cast<uint32_t>(wBase_ - wBuf_.get());
313 }
314
borrowSlow(uint8_t * buf,uint32_t * len)315 const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len)
316 {
317 (void) buf;
318 (void) len;
319 // Don't try to be clever with shifting buffers.
320 // If the fast path failed let the protocol use its slow path.
321 // Besides, who is going to try to borrow across messages?
322 return NULL;
323 }
324
readEnd()325 uint32_t TFramedTransport::readEnd()
326 {
327 // include framing bytes
328 return static_cast<uint32_t>(rBound_ - rBuf_.get() + sizeof(uint32_t));
329 }
330
computeRead(uint32_t len,uint8_t ** out_start,uint32_t * out_give)331 void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give)
332 {
333 // Correct rBound_ so we can use the fast path in the future.
334 rBound_ = wBase_;
335
336 // Decide how much to give.
337 uint32_t give = (std::min)(len, available_read());
338
339 *out_start = rBase_;
340 *out_give = give;
341
342 // Preincrement rBase_ so the caller doesn't have to.
343 rBase_ += give;
344 }
345
readSlow(uint8_t * buf,uint32_t len)346 uint32_t TMemoryBuffer::readSlow(uint8_t* buf, uint32_t len)
347 {
348 uint8_t* start;
349 uint32_t give;
350 computeRead(len, &start, &give);
351
352 // Copy into the provided buffer.
353 memcpy(buf, start, give);
354
355 return give;
356 }
357
readAppendToString(std::string & str,uint32_t len)358 uint32_t TMemoryBuffer::readAppendToString(std::string& str, uint32_t len)
359 {
360 // Don't get some stupid assertion failure.
361 if (buffer_ == NULL)
362 {
363 return 0;
364 }
365
366 uint8_t* start;
367 uint32_t give;
368 computeRead(len, &start, &give);
369
370 // Append to the provided string.
371 str.append((char*)start, give);
372
373 return give;
374 }
375
ensureCanWrite(uint32_t len)376 void TMemoryBuffer::ensureCanWrite(uint32_t len)
377 {
378 // Check available space
379 uint32_t avail = available_write();
380
381 if (len <= avail)
382 {
383 return;
384 }
385
386 if (!owner_)
387 {
388 throw TTransportException("Insufficient space in external MemoryBuffer");
389 }
390
391 // Grow the buffer as necessary.
392 uint32_t new_size = bufferSize_;
393
394 while (len > avail)
395 {
396 new_size = new_size > 0 ? new_size * 2 : 1;
397 avail = available_write() + (new_size - bufferSize_);
398 }
399
400 // Allocate into a new pointer so we don't bork ours if it fails.
401 void* new_buffer = std::realloc(buffer_, new_size);
402
403 if (new_buffer == NULL)
404 {
405 throw std::bad_alloc();
406 }
407
408 bufferSize_ = new_size;
409
410 ptrdiff_t offset = (uint8_t*)new_buffer - buffer_;
411 buffer_ += offset;
412 rBase_ += offset;
413 rBound_ += offset;
414 wBase_ += offset;
415 wBound_ = buffer_ + bufferSize_;
416 }
417
writeSlow(const uint8_t * buf,uint32_t len)418 void TMemoryBuffer::writeSlow(const uint8_t* buf, uint32_t len)
419 {
420 ensureCanWrite(len);
421
422 // Copy into the buffer and increment wBase_.
423 memcpy(wBase_, buf, len);
424 wBase_ += len;
425 }
426
wroteBytes(uint32_t len)427 void TMemoryBuffer::wroteBytes(uint32_t len)
428 {
429 uint32_t avail = available_write();
430
431 if (len > avail)
432 {
433 throw TTransportException("Client wrote more bytes than size of buffer.");
434 }
435
436 wBase_ += len;
437 }
438
borrowSlow(uint8_t * buf,uint32_t * len)439 const uint8_t* TMemoryBuffer::borrowSlow(uint8_t* buf, uint32_t* len)
440 {
441 (void) buf;
442 rBound_ = wBase_;
443
444 if (available_read() >= *len)
445 {
446 *len = available_read();
447 return rBase_;
448 }
449
450 return NULL;
451 }
452
453 }
454 }
455 } // apache::thrift::transport
456