1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/transport/TTransportUtils.h>
21 
22 using std::string;
23 
24 namespace apache {
25 namespace thrift {
26 namespace transport {
27 
read(uint8_t * buf,uint32_t len)28 uint32_t TPipedTransport::read(uint8_t* buf, uint32_t len) {
29   checkReadBytesAvailable(len);
30   uint32_t need = len;
31 
32   // We don't have enough data yet
33   if (rLen_ - rPos_ < need) {
34     // Copy out whatever we have
35     if (rLen_ - rPos_ > 0) {
36       memcpy(buf, rBuf_ + rPos_, rLen_ - rPos_);
37       need -= rLen_ - rPos_;
38       buf += rLen_ - rPos_;
39       rPos_ = rLen_;
40     }
41 
42     // Double the size of the underlying buffer if it is full
43     if (rLen_ == rBufSize_) {
44       rBufSize_ *= 2;
45       auto *tmpBuf = (uint8_t*)std::realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
46       if (tmpBuf == nullptr) {
47        throw std::bad_alloc();
48       }
49       rBuf_ = tmpBuf;
50     }
51 
52     // try to fill up the buffer
53     rLen_ += srcTrans_->read(rBuf_ + rPos_, rBufSize_ - rPos_);
54   }
55 
56   // Hand over whatever we have
57   uint32_t give = need;
58   if (rLen_ - rPos_ < give) {
59     give = rLen_ - rPos_;
60   }
61   if (give > 0) {
62     memcpy(buf, rBuf_ + rPos_, give);
63     rPos_ += give;
64     need -= give;
65   }
66 
67   return (len - need);
68 }
69 
write(const uint8_t * buf,uint32_t len)70 void TPipedTransport::write(const uint8_t* buf, uint32_t len) {
71   if (len == 0) {
72     return;
73   }
74 
75   // Make the buffer as big as it needs to be
76   if ((len + wLen_) >= wBufSize_) {
77     uint32_t newBufSize = wBufSize_ * 2;
78     while ((len + wLen_) >= newBufSize) {
79       newBufSize *= 2;
80     }
81     auto *tmpBuf= (uint8_t*)std::realloc(wBuf_, sizeof(uint8_t) * newBufSize);
82     if (tmpBuf == nullptr) {
83       throw std::bad_alloc();
84     }
85     wBuf_ = tmpBuf;
86 
87     wBufSize_ = newBufSize;
88   }
89 
90   // Copy into the buffer
91   memcpy(wBuf_ + wLen_, buf, len);
92   wLen_ += len;
93 }
94 
flush()95 void TPipedTransport::flush() {
96   // Write out any data waiting in the write buffer
97   if (wLen_ > 0) {
98     srcTrans_->write(wBuf_, wLen_);
99     wLen_ = 0;
100   }
101 
102   // Flush the underlying transport
103   srcTrans_->flush();
104 }
105 
TPipedFileReaderTransport(std::shared_ptr<TFileReaderTransport> srcTrans,std::shared_ptr<TTransport> dstTrans,std::shared_ptr<TConfiguration> config)106 TPipedFileReaderTransport::TPipedFileReaderTransport(
107     std::shared_ptr<TFileReaderTransport> srcTrans,
108     std::shared_ptr<TTransport> dstTrans,
109     std::shared_ptr<TConfiguration> config)
110   : TPipedTransport(srcTrans, dstTrans, config), srcTrans_(srcTrans) {
111 }
112 
113 TPipedFileReaderTransport::~TPipedFileReaderTransport() = default;
114 
isOpen() const115 bool TPipedFileReaderTransport::isOpen() const {
116   return TPipedTransport::isOpen();
117 }
118 
peek()119 bool TPipedFileReaderTransport::peek() {
120   return TPipedTransport::peek();
121 }
122 
open()123 void TPipedFileReaderTransport::open() {
124   TPipedTransport::open();
125 }
126 
close()127 void TPipedFileReaderTransport::close() {
128   TPipedTransport::close();
129 }
130 
read(uint8_t * buf,uint32_t len)131 uint32_t TPipedFileReaderTransport::read(uint8_t* buf, uint32_t len) {
132   return TPipedTransport::read(buf, len);
133 }
134 
readAll(uint8_t * buf,uint32_t len)135 uint32_t TPipedFileReaderTransport::readAll(uint8_t* buf, uint32_t len) {
136   checkReadBytesAvailable(len);
137   uint32_t have = 0;
138   uint32_t get = 0;
139 
140   while (have < len) {
141     get = read(buf + have, len - have);
142     if (get <= 0) {
143       throw TEOFException();
144     }
145     have += get;
146   }
147 
148   return have;
149 }
150 
readEnd()151 uint32_t TPipedFileReaderTransport::readEnd() {
152   return TPipedTransport::readEnd();
153 }
154 
write(const uint8_t * buf,uint32_t len)155 void TPipedFileReaderTransport::write(const uint8_t* buf, uint32_t len) {
156   TPipedTransport::write(buf, len);
157 }
158 
writeEnd()159 uint32_t TPipedFileReaderTransport::writeEnd() {
160   return TPipedTransport::writeEnd();
161 }
162 
flush()163 void TPipedFileReaderTransport::flush() {
164   TPipedTransport::flush();
165 }
166 
getReadTimeout()167 int32_t TPipedFileReaderTransport::getReadTimeout() {
168   return srcTrans_->getReadTimeout();
169 }
170 
setReadTimeout(int32_t readTimeout)171 void TPipedFileReaderTransport::setReadTimeout(int32_t readTimeout) {
172   srcTrans_->setReadTimeout(readTimeout);
173 }
174 
getNumChunks()175 uint32_t TPipedFileReaderTransport::getNumChunks() {
176   return srcTrans_->getNumChunks();
177 }
178 
getCurChunk()179 uint32_t TPipedFileReaderTransport::getCurChunk() {
180   return srcTrans_->getCurChunk();
181 }
182 
seekToChunk(int32_t chunk)183 void TPipedFileReaderTransport::seekToChunk(int32_t chunk) {
184   srcTrans_->seekToChunk(chunk);
185 }
186 
seekToEnd()187 void TPipedFileReaderTransport::seekToEnd() {
188   srcTrans_->seekToEnd();
189 }
190 }
191 }
192 } // apache::thrift::transport
193