/*
 * This File is part of Davix, The IO library for HTTP based protocols
 * Copyright (C) CERN 2017
 * Author: Georgios Bitzes <georgios.bitzes@cern.ch>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 *
*/

#include "S3IO.hpp"
#include <utils/davix_logger_internal.hpp>
#include <xml/S3MultiPartInitiationParser.hpp>

#define SSTR(message) static_cast<std::ostringstream&>(std::ostringstream().flush() << message).str()

namespace Davix{

static bool is_s3_operation(IOChainContext & context){
  if(context._reqparams->getProtocol() == RequestProtocol::AwsS3) {
    return true;
  }

  return false;
}

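// Decide whether a write should go through the S3 multi-part upload path:
// only for S3 requests, and only when the payload exceeds 512 MB or the
// caller forces it through the "forceMultiPart" fragment parameter on the URI.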
static bool should_use_s3_multipart(IOChainContext & context, dav_size_t size) {
  bool is_s3 = is_s3_operation(context);

  if(!is_s3) return false;

  if(context._uri.fragmentParamExists("forceMultiPart")) {
    return true;
  }

  return size > (1024 * 1024 * 512); // 512 MB
}

S3IO::S3IO() {

}

S3IO::~S3IO() {

}

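// Start a multi-part upload by POSTing to the object URI with an empty
// "uploads" query parameter (e.g. https://host/bucket/key?uploads). The XML
// reply is parsed to extract the upload id, which identifies this upload in
// every subsequent chunk write and in the final commit.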
std::string S3IO::initiateMultipart(IOChainContext & iocontext) {
  Uri url(iocontext._uri);
  url.addQueryParam("uploads", "");

  return initiateMultipart(iocontext, url);
}

std::string S3IO::initiateMultipart(IOChainContext & iocontext, const Uri &url) {
  DavixError * tmp_err=NULL;

  PostRequest req(iocontext._context, url, &tmp_err);
  checkDavixError(&tmp_err);

  req.setParameters(iocontext._reqparams);
  req.setRequestBody("");
  req.executeRequest(&tmp_err);
  if(!tmp_err && httpcodeIsValid(req.getRequestCode()) == false){
    httpcodeToDavixError(req.getRequestCode(), davix_scope_io_buff(),
      "write error: ", &tmp_err);
  }
  checkDavixError(&tmp_err);

  std::string response = req.getAnswerContent();
  S3MultiPartInitiationParser parser;
  if(parser.parseChunk(response) != 0) {
    DavixError::setupError(&tmp_err, "S3::MultiPart", StatusCode::InvalidServerResponse, "Unable to parse server response for multi-part initiation");
  }
  checkDavixError(&tmp_err);

  DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "Obtained multi-part upload id {} for {}", parser.getUploadId(), iocontext._uri);
  return parser.getUploadId();
}

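// Upload a single part: the chunk is PUT against the object URI with the
// "uploadId" and "partNumber" query parameters, and the ETag returned by the
// server is handed back to the caller, since it is needed when committing.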
std::string S3IO::writeChunk(IOChainContext & iocontext, const char* buff, dav_size_t size, const std::string &uploadId, int partNumber) {
  Uri url(iocontext._uri);
  url.addQueryParam("uploadId", uploadId);
  url.addQueryParam("partNumber", SSTR(partNumber));

  return writeChunk(iocontext, buff, size, url, partNumber);
}

std::string S3IO::writeChunk(IOChainContext & iocontext, const char* buff, dav_size_t size, const Uri &url, int partNumber) {
  DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "writing chunk #{} with size {}", partNumber, size);

  DavixError * tmp_err=NULL;
  PutRequest req(iocontext._context, url, &tmp_err);
  checkDavixError(&tmp_err);

  req.setParameters(iocontext._reqparams);
  req.setRequestBody(buff, size);
  req.executeRequest(&tmp_err);
  if(!tmp_err && httpcodeIsValid(req.getRequestCode()) == false){
      httpcodeToDavixError(req.getRequestCode(), davix_scope_io_buff(),
                           "write error: ", &tmp_err);
  }
  DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "write result size {}", size);
  checkDavixError(&tmp_err);

  std::string etag;
  if(!req.getAnswerHeader("Etag", etag)) {
    DavixError::setupError(&tmp_err, "S3::MultiPart", StatusCode::InvalidServerResponse, "Unable to retrieve chunk Etag, necessary when committing chunks");
  }
  checkDavixError(&tmp_err);

  DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "chunk #{} written successfully, etag: {}", partNumber, etag);
  return etag;
}

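// Complete the multi-part upload by POSTing a CompleteMultipartUpload XML
// document listing each part number together with the ETag recorded when the
// part was written. For a two-part upload the payload looks roughly like:
//
//   <CompleteMultipartUpload>
//     <Part><PartNumber>1</PartNumber><ETag>etag-of-part-1</ETag></Part>
//     <Part><PartNumber>2</PartNumber><ETag>etag-of-part-2</ETag></Part>
//   </CompleteMultipartUpload>
//
// (etag values are placeholders for illustration only)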
void S3IO::commitChunks(IOChainContext & iocontext, const std::string &uploadId, const std::vector<std::string> &etags) {
  Uri url(iocontext._uri);
  url.addQueryParam("uploadId", uploadId);

  return commitChunks(iocontext, url, etags);
}

void S3IO::commitChunks(IOChainContext & iocontext, const Uri &url, const std::vector<std::string> &etags) {
  DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "committing {} chunks", etags.size());

  std::ostringstream payload;
  payload << "<CompleteMultipartUpload>";
  for(size_t i = 1; i <= etags.size(); i++) {
    payload << "<Part>";
    payload << "<PartNumber>" << i << "</PartNumber>";
    payload << "<ETag>" << etags[i-1] << "</ETag>";
    payload << "</Part>";
  }
  payload << "</CompleteMultipartUpload>";

  DavixError * tmp_err=NULL;
  PostRequest req(iocontext._context, url, &tmp_err);
  checkDavixError(&tmp_err);

  req.setParameters(iocontext._reqparams);
  req.setRequestBody(payload.str());
  req.executeRequest(&tmp_err);

  if(!tmp_err && httpcodeIsValid(req.getRequestCode()) == false){
      httpcodeToDavixError(req.getRequestCode(), davix_scope_io_buff(),
                           "write error: ", &tmp_err);
  }
  checkDavixError(&tmp_err);
}

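// Adapter turning a plain file descriptor into a DataProviderFun-compatible
// callback: reads up to 'size' bytes from 'fd' into 'buffer', logging errors.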
static dav_ssize_t readFunction(int fd, void* buffer, dav_size_t size) {
  dav_ssize_t ret = ::read(fd, buffer, size);
  if(ret < 0) {
    int myerr = errno;
    DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "Error in readFunction when attempting to read from fd {}: Return code {}, errno: {}", fd, ret, myerr);
  }
  return ret;
}

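// Write from a file descriptor. Small uploads are forwarded down the IO
// chain; large ones bind the descriptor into a read callback and reuse the
// callback-based multi-part path below.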
dav_ssize_t S3IO::writeFromFd(IOChainContext & iocontext, int fd, dav_size_t size) {
  if(!should_use_s3_multipart(iocontext, size)) {
    CHAIN_FORWARD(writeFromFd(iocontext, fd, size));
  }

  using std::placeholders::_1;
  using std::placeholders::_2;

  DataProviderFun providerFunc = std::bind(readFunction, fd, _1, _2);
  return this->writeFromCb(iocontext, providerFunc, size);
}

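// Repeatedly call the data provider until 'maxChunkSize' bytes have been
// accumulated in 'buffer' or the provider signals EOF by returning 0.
// Returns the number of bytes actually placed in the buffer.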
static dav_size_t fillBufferWithProviderData(std::vector<char> &buffer, const dav_size_t maxChunkSize, const DataProviderFun &func) {
    dav_size_t written = 0u;
    dav_size_t remaining = maxChunkSize;

    while(true) {
      // Append after any data gathered by previous iterations, so partial
      // reads from the provider do not overwrite each other.
      dav_ssize_t bytesRead = func(buffer.data() + written, remaining);
      if(bytesRead < 0) {
        throw DavixException(davix_scope_io_buff(), StatusCode::InvalidFileHandle, fmt::format("Error when reading from callback: {}", bytesRead));
      }

      remaining -= bytesRead;
      written += bytesRead;

      if(bytesRead == 0) {
        DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "Reached data provider EOF, received 0 bytes, even though asked for {}", remaining);
        break; // EOF
      }

      if(remaining == 0) {
        DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "Data provider buffer has been filled");
        break; // buffer is full
      }
    }

    DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "Retrieved {} bytes from data provider", written);
    return written;
}

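// Callback-based multi-part upload: initiate the upload, stream the data in
// chunks of at most 256 MB, upload each chunk as a numbered part, then commit
// the collected ETags. Uploads below the multi-part threshold are forwarded
// down the IO chain unchanged.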
dav_ssize_t S3IO::writeFromCb(IOChainContext & iocontext, const DataProviderFun & func, dav_size_t size) {
  if(!should_use_s3_multipart(iocontext, size)) {
    CHAIN_FORWARD(writeFromCb(iocontext, func, size));
  }

  DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "Initiating multi-part upload towards {} to upload file with size {}", iocontext._uri, size);
  std::string uploadId = initiateMultipart(iocontext);

  dav_size_t remaining = size;
  const dav_size_t MAX_CHUNK_SIZE = 1024 * 1024 * 256; // 256 MB

  std::vector<char> buffer;
  buffer.resize(std::min(MAX_CHUNK_SIZE, size) + 10);

  std::vector<std::string> etags;

  size_t partNumber = 0;
  while(remaining > 0) {
    // Never ask the provider for more than what is left of the declared size.
    dav_size_t toRead = std::min(remaining, MAX_CHUNK_SIZE);
    DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "S3IO write: toRead from cb {}", toRead);

    dav_size_t bytesRead = fillBufferWithProviderData(buffer, toRead, func);
    if(bytesRead == 0) break; // EOF

    partNumber++;
    etags.emplace_back(writeChunk(iocontext, buffer.data(), bytesRead, uploadId, partNumber));
    remaining -= bytesRead;
  }

  commitChunks(iocontext, uploadId, etags);
  return size - remaining;
}

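// Dynafed/UGR support: ask the federation frontend for the per-chunk upload
// URIs by sending the upload id, plugin id and expected chunk count as custom
// headers. The response body is split on newlines; the last line is treated
// as the URI to which the final commit should be POSTed.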
DynafedUris S3IO::retrieveDynafedUris(IOChainContext & iocontext, const std::string &uploadId, const std::string &pluginId, size_t nchunks) {
  DynafedUris retval;

  DavixError *tmp_err = NULL;
  PutRequest req(iocontext._context, iocontext._uri, &tmp_err);
  checkDavixError(&tmp_err);

  req.setParameters(iocontext._reqparams);
  req.addHeaderField("x-s3-uploadid", uploadId);
  req.addHeaderField("x-ugrpluginid", pluginId);
  req.addHeaderField("x-s3-upload-nchunks", SSTR(nchunks));
  req.executeRequest(&tmp_err);

  if(!tmp_err && httpcodeIsValid(req.getRequestCode()) == false){
      httpcodeToDavixError(req.getRequestCode(), davix_scope_io_buff(),
                           "write error: ", &tmp_err);
  }
  checkDavixError(&tmp_err);

  // We have a response, parse it into lines
  retval.chunks = StrUtil::tokenSplit(req.getAnswerContent(), "\n");
  if(!retval.chunks.empty()) {
    retval.post = retval.chunks.back();
    retval.chunks.pop_back();
  }

  DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "retrieveDynafedUris: Obtained list with {} chunk URIs in total", retval.chunks.size());
  return retval;
}

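// Multi-part upload through a Dynafed/UGR frontend: initiate the upload at
// 'posturl', obtain the per-chunk URIs from the federation, write each chunk
// to its dedicated URI and commit through the URI returned as the last line
// of the Dynafed response. Any DavixException is converted into 'err' by
// CATCH_DAVIX.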
void S3IO::performUgrS3MultiPart(IOChainContext & iocontext, const std::string &posturl, const std::string &pluginId, const DataProviderFun &func, dav_size_t size, DavixError **err) {
    try {
        Uri uri(posturl);
        std::string uploadId = initiateMultipart(iocontext, uri);

        const dav_size_t MAX_CHUNK_SIZE = 1024 * 1024 * 256; // 256 MB
        std::vector<char> buffer;
        buffer.resize(std::min(MAX_CHUNK_SIZE, size) + 10);

        size_t nchunks = (size / MAX_CHUNK_SIZE) + 2;
        DynafedUris uris = retrieveDynafedUris(iocontext, uploadId, pluginId, nchunks);

        if(uris.chunks.size() != nchunks) {
          DAVIX_SLOG(DAVIX_LOG_WARNING, DAVIX_LOG_CHAIN, "Dynafed returned a different number of URIs than expected: {} vs {}", uris.chunks.size(), nchunks);
          throw DavixException("S3::MultiPart", StatusCode::InvalidServerResponse, "Dynafed returned a different number of URIs than expected");
        }

        std::vector<std::string> etags;
        size_t partNumber = 1;
        uint64_t remaining = size;

        while(remaining > 0) {
          dav_size_t bytesRetrieved = fillBufferWithProviderData(buffer, MAX_CHUNK_SIZE, func);
          if(bytesRetrieved == 0) {
            break; // EOF
          }

          etags.emplace_back(writeChunk(iocontext, buffer.data(), bytesRetrieved, Uri(uris.chunks[partNumber-1]), partNumber));
          partNumber++;
          remaining -= bytesRetrieved;
        }

        commitChunks(iocontext, Uri(uris.post), etags);
    }
    CATCH_DAVIX(err);
}

}