1 /*
2 * This File is part of Davix, The IO library for HTTP based protocols
3 * Copyright (C) CERN 2017
4 * Author: Georgios Bitzes <georgios.bitzes@cern.ch>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Lesser General Public
8 * License as published by the Free Software Foundation; either
9 * version 2.1 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Lesser General Public License for more details.
15 *
16 * You should have received a copy of the GNU Lesser General Public
17 * License along with this library; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
19 *
20 */
21
22 #include "S3IO.hpp"
23 #include <utils/davix_logger_internal.hpp>
24 #include <xml/S3MultiPartInitiationParser.hpp>
25
// SSTR(message): stream `message` (anything accepted by operator<<, including a
// chain such as `a << b`) into a temporary std::ostringstream and yield the
// resulting std::string. Calling the member .flush() first turns the temporary
// into an lvalue std::ostream& so that non-member operator<< overloads can bind
// to it; the static_cast back to std::ostringstream& is needed to call .str().
#define SSTR(message) static_cast<std::ostringstream&>(std::ostringstream().flush() << message).str()
27
28 namespace Davix{
29
is_s3_operation(IOChainContext & context)30 static bool is_s3_operation(IOChainContext & context){
31 if(context._reqparams->getProtocol() == RequestProtocol::AwsS3) {
32 return true;
33 }
34
35 return false;
36 }
37
should_use_s3_multipart(IOChainContext & context,dav_size_t size)38 static bool should_use_s3_multipart(IOChainContext & context, dav_size_t size) {
39 bool is_s3 = is_s3_operation(context);
40
41 if(!is_s3) return false;
42
43 if(context._uri.fragmentParamExists("forceMultiPart")) {
44
45 return true;
46 }
47
48 return size > (1024 * 1024 * 512); // 512 MB
49 }
50
S3IO()51 S3IO::S3IO() {
52
53 }
54
~S3IO()55 S3IO::~S3IO() {
56
57 }
58
initiateMultipart(IOChainContext & iocontext)59 std::string S3IO::initiateMultipart(IOChainContext & iocontext) {
60 Uri url(iocontext._uri);
61 url.addQueryParam("uploads", "");
62
63 return initiateMultipart(iocontext, url);
64 }
65
initiateMultipart(IOChainContext & iocontext,const Uri & url)66 std::string S3IO::initiateMultipart(IOChainContext & iocontext, const Uri &url) {
67 DavixError * tmp_err=NULL;
68
69 PostRequest req(iocontext._context, url, &tmp_err);
70 checkDavixError(&tmp_err);
71
72 req.setParameters(iocontext._reqparams);
73 req.setRequestBody("");
74 req.executeRequest(&tmp_err);
75 if(!tmp_err && httpcodeIsValid(req.getRequestCode()) == false){
76 httpcodeToDavixError(req.getRequestCode(), davix_scope_io_buff(),
77 "write error: ", &tmp_err);
78 }
79 checkDavixError(&tmp_err);
80
81 std::string response = req.getAnswerContent();
82 S3MultiPartInitiationParser parser;
83 if(parser.parseChunk(response) != 0) {
84 DavixError::setupError(&tmp_err, "S3::MultiPart", StatusCode::InvalidServerResponse, "Unable to parse server response for multi-part initiation");
85 }
86 checkDavixError(&tmp_err);
87
88
89 DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "Obtained multi-part upload id {} for {}", parser.getUploadId(), iocontext._uri);
90 return parser.getUploadId();
91 }
92
writeChunk(IOChainContext & iocontext,const char * buff,dav_size_t size,const std::string & uploadId,int partNumber)93 std::string S3IO::writeChunk(IOChainContext & iocontext, const char* buff, dav_size_t size, const std::string &uploadId, int partNumber) {
94 Uri url(iocontext._uri);
95 url.addQueryParam("uploadId", uploadId);
96 url.addQueryParam("partNumber", SSTR(partNumber));
97
98 return writeChunk(iocontext, buff, size, url, partNumber);
99 }
100
writeChunk(IOChainContext & iocontext,const char * buff,dav_size_t size,const Uri & url,int partNumber)101 std::string S3IO::writeChunk(IOChainContext & iocontext, const char* buff, dav_size_t size, const Uri &url, int partNumber) {
102 DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "writing chunk #{} with size {}", partNumber, size);
103
104 DavixError * tmp_err=NULL;
105 PutRequest req(iocontext._context, url, &tmp_err);
106 checkDavixError(&tmp_err);
107
108 req.setParameters(iocontext._reqparams);
109 req.setRequestBody(buff, size);
110 req.executeRequest(&tmp_err);
111 if(!tmp_err && httpcodeIsValid(req.getRequestCode()) == false){
112 httpcodeToDavixError(req.getRequestCode(), davix_scope_io_buff(),
113 "write error: ", &tmp_err);
114 }
115 DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "write result size {}", size);
116 checkDavixError(&tmp_err);
117
118 std::string etag;
119 if(!req.getAnswerHeader("Etag", etag)) {
120 DavixError::setupError(&tmp_err, "S3::MultiPart", StatusCode::InvalidServerResponse, "Unable to retrieve chunk Etag, necessary when committing chunks");
121 }
122
123 DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "chunk #{} written successfully, etag: {}", partNumber, etag);
124 return etag;
125 }
126
commitChunks(IOChainContext & iocontext,const std::string & uploadId,const std::vector<std::string> & etags)127 void S3IO::commitChunks(IOChainContext & iocontext, const std::string &uploadId, const std::vector<std::string> &etags) {
128 Uri url(iocontext._uri);
129 url.addQueryParam("uploadId", uploadId);
130
131 return commitChunks(iocontext, url, etags);
132 }
133
commitChunks(IOChainContext & iocontext,const Uri & url,const std::vector<std::string> & etags)134 void S3IO::commitChunks(IOChainContext & iocontext, const Uri &url, const std::vector<std::string> &etags) {
135 DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "committing {} chunks", etags.size());
136
137 std::ostringstream payload;
138 payload << "<CompleteMultipartUpload>";
139 for(size_t i = 1; i <= etags.size(); i++) {
140 payload << "<Part>";
141 payload << "<PartNumber>" << i << "</PartNumber>";
142 payload << "<ETag>" << etags[i-1] << "</ETag>";
143 payload << "</Part>";
144 }
145 payload << "</CompleteMultipartUpload>";
146
147 DavixError * tmp_err=NULL;
148 PostRequest req(iocontext._context, url, &tmp_err);
149 req.setParameters(iocontext._reqparams);
150 req.setRequestBody(payload.str());
151 req.executeRequest(&tmp_err);
152
153 if(!tmp_err && httpcodeIsValid(req.getRequestCode()) == false){
154 httpcodeToDavixError(req.getRequestCode(), davix_scope_io_buff(),
155 "write error: ", &tmp_err);
156 }
157 checkDavixError(&tmp_err);
158 }
159
readFunction(int fd,void * buffer,dav_size_t size)160 static dav_ssize_t readFunction(int fd, void* buffer, dav_size_t size) {
161 dav_ssize_t ret = ::read(fd, buffer, size);
162 if(ret < 0) {
163 int myerr = errno;
164 DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "Error in readFunction when attempting to read from fd {}: Return code {}, errno: {}", fd, ret, myerr);
165 }
166 return ret;
167 }
168
writeFromFd(IOChainContext & iocontext,int fd,dav_size_t size)169 dav_ssize_t S3IO::writeFromFd(IOChainContext & iocontext, int fd, dav_size_t size) {
170 if(!should_use_s3_multipart(iocontext, size)) {
171 CHAIN_FORWARD(writeFromFd(iocontext, fd, size));
172 }
173
174 using std::placeholders::_1;
175 using std::placeholders::_2;
176
177 DataProviderFun providerFunc = std::bind(readFunction, fd, _1, _2);
178 return this->writeFromCb(iocontext, providerFunc, size);
179 }
180
// Collect up to `maxChunkSize` bytes from the data provider `func` into the
// start of `buffer`. `func` is called repeatedly, each time appending after the
// bytes already collected. It stops once the target is reached or `func`
// returns 0 (EOF). The target is also capped at buffer.size(), so the provider
// can never write past the end of the buffer.
//
// Returns the number of bytes stored at the beginning of `buffer`.
// Throws DavixException if the provider returns a negative value, or reports
// more bytes than it was asked for.
static dav_size_t fillBufferWithProviderData(std::vector<char> &buffer, const dav_size_t maxChunkSize, const DataProviderFun &func) {
    // Callers may pass a chunk size larger than the buffer they allocated
    // (e.g. uploads smaller than one chunk), so never request more than fits.
    const dav_size_t capacity = std::min<dav_size_t>(maxChunkSize, buffer.size());
    dav_size_t written = 0u;

    while(true) {
        const dav_size_t remaining = capacity - written;
        if(remaining == 0) {
            DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "Data provider buffer has been filled");
            break; // buffer is full
        }

        // Append after the data already collected. Writing at buffer.data()
        // would overwrite earlier partial reads.
        const dav_ssize_t bytesRead = func(buffer.data() + written, remaining);
        if(bytesRead < 0) {
            throw DavixException(davix_scope_io_buff(), StatusCode::InvalidFileHandle, fmt::format("Error when reading from callback: {}", bytesRead));
        }

        if(bytesRead == 0) {
            DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "Reached data provider EOF, received 0 bytes, even though asked for {}", remaining);
            break; // EOF
        }

        // A provider claiming more than requested would make `remaining`
        // underflow and indicates it wrote out of bounds.
        if(static_cast<dav_size_t>(bytesRead) > remaining) {
            throw DavixException(davix_scope_io_buff(), StatusCode::InvalidFileHandle, fmt::format("Data provider returned {} bytes, more than the {} requested", bytesRead, remaining));
        }

        written += static_cast<dav_size_t>(bytesRead);
    }

    DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "Retrieved {} bytes from data provider", written);
    return written;
}
208
writeFromCb(IOChainContext & iocontext,const DataProviderFun & func,dav_size_t size)209 dav_ssize_t S3IO::writeFromCb(IOChainContext & iocontext, const DataProviderFun & func, dav_size_t size) {
210 if(!should_use_s3_multipart(iocontext, size)) {
211 CHAIN_FORWARD(writeFromCb(iocontext, func, size));
212 }
213
214 DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "Initiating multi-part upload towards {} to upload file with size {}", iocontext._uri, size);
215 std::string uploadId = initiateMultipart(iocontext);
216
217 size_t remaining = size;
218 const dav_size_t MAX_CHUNK_SIZE = 1024 * 1024 * 256; // 256 MB
219
220 std::vector<char> buffer;
221 buffer.resize(std::min(MAX_CHUNK_SIZE, size) + 10);
222
223 std::vector<std::string> etags;
224
225 size_t partNumber = 0;
226 while(remaining > 0) {
227 size_t toRead = std::min(size, MAX_CHUNK_SIZE);
228 DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "S3IO write: toRead from cb {}", toRead);
229
230 dav_size_t bytesRead = fillBufferWithProviderData(buffer, MAX_CHUNK_SIZE, func);
231 if(bytesRead == 0) break; // EOF
232
233 partNumber++;
234 etags.emplace_back(writeChunk(iocontext, buffer.data(), bytesRead, uploadId, partNumber));
235 }
236
237 commitChunks(iocontext, uploadId, etags);
238 }
239
retrieveDynafedUris(IOChainContext & iocontext,const std::string & uploadId,const std::string & pluginId,size_t nchunks)240 DynafedUris S3IO::retrieveDynafedUris(IOChainContext & iocontext, const std::string &uploadId, const std::string &pluginId, size_t nchunks) {
241 DynafedUris retval;
242
243 DavixError *tmp_err = NULL;
244 PutRequest req(iocontext._context, iocontext._uri, &tmp_err);
245 checkDavixError(&tmp_err);
246
247 req.setParameters(iocontext._reqparams);
248 req.addHeaderField("x-s3-uploadid", uploadId);
249 req.addHeaderField("x-ugrpluginid", pluginId);
250 req.addHeaderField("x-s3-upload-nchunks", SSTR(nchunks));
251 req.executeRequest(&tmp_err);
252
253 if(!tmp_err && httpcodeIsValid(req.getRequestCode()) == false){
254 httpcodeToDavixError(req.getRequestCode(), davix_scope_io_buff(),
255 "write error: ", &tmp_err);
256 }
257 checkDavixError(&tmp_err);
258
259 // We have a response, parse it into lines
260 retval.chunks = StrUtil::tokenSplit(req.getAnswerContent(), "\n");
261 if(!retval.chunks.empty()) {
262 retval.post = retval.chunks.back();
263 retval.chunks.pop_back();
264 }
265
266 DAVIX_SLOG(DAVIX_LOG_DEBUG, DAVIX_LOG_CHAIN, "retrieveDynafedUris: Obtained list with {} chunk URIs in total", retval.chunks.size());
267 return retval;
268 }
269
performUgrS3MultiPart(IOChainContext & iocontext,const std::string & posturl,const std::string & pluginId,const DataProviderFun & func,dav_size_t size,DavixError ** err)270 void S3IO::performUgrS3MultiPart(IOChainContext & iocontext, const std::string &posturl, const std::string &pluginId, const DataProviderFun &func, dav_size_t size, DavixError **err) {
271 try {
272 Uri uri(posturl);
273 std::string uploadId = initiateMultipart(iocontext, posturl);
274
275 const dav_size_t MAX_CHUNK_SIZE = 1024 * 1024 * 256; // 256 MB
276 std::vector<char> buffer;
277 buffer.resize(std::min(MAX_CHUNK_SIZE, size) + 10);
278
279 size_t nchunks = (size / MAX_CHUNK_SIZE) + 2;
280 DynafedUris uris = retrieveDynafedUris(iocontext, uploadId, pluginId, nchunks);
281
282 if(uris.chunks.size() != nchunks) {
283 DAVIX_SLOG(DAVIX_LOG_WARNING, DAVIX_LOG_CHAIN, "Dynafed returned different number of URIs than expected: {} vs {]", "} retrieveDynafedUris: Obtained list with {} chunk URIs in total", uris.chunks.size(), nchunks);
284 throw DavixException("S3::MultiPart", StatusCode::InvalidServerResponse, "Dynafed returned different number of URIs than expected");
285 }
286
287 std::vector<std::string> etags;
288 size_t partNumber = 1;
289 uint64_t remaining = size;
290
291 while(remaining > 0) {
292 dav_size_t bytesRetrieved = fillBufferWithProviderData(buffer, MAX_CHUNK_SIZE, func);
293 if(bytesRetrieved == 0) {
294 break; // EOF
295 }
296
297 etags.emplace_back(writeChunk(iocontext, buffer.data(), bytesRetrieved, Uri(uris.chunks[partNumber-1]), partNumber));
298 partNumber++;
299 remaining -= bytesRetrieved;
300 }
301
302 commitChunks(iocontext, Uri(uris.post), etags);
303 }
304 CATCH_DAVIX(err);
305 }
306
307 }
308