1 /**
2 * @file mega/raid.cpp
3 * @brief helper classes for managing cloudraid downloads
4 *
5 * (c) 2013-2019 by Mega Limited, Auckland, New Zealand
6 *
7 * This file is part of the MEGA SDK - Client Access Engine.
8 *
9 * Applications using the MEGA API must present a valid application key
10 * and comply with the the rules set forth in the Terms of Service.
11 *
12 * The MEGA SDK is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 *
16 * @copyright Simplified (2-clause) BSD License.
17 *
18 * You should have received a copy of the license along with this
19 * program.
20 */
21
22 #include "mega/raid.h"
23
24 #include "mega/transfer.h"
25 #include "mega/testhooks.h"
26 #include "mega.h" // for thread definitions
27
28 #undef min //avoids issues with std::min
29
30 namespace mega
31 {
32
33 const unsigned RAID_ACTIVE_CHANNEL_FAIL_THRESHOLD = 5;
34
35 struct FaultyServers
36 {
37 // Records URLs that had recent problems, so we can start the next raid download with URLs that can work first try.
38 // In particular this is useful when one server in a raid set is unavailable for an extended period
39 // This class may be shared amongst many megaclients, so thread safety is needed
40 typedef map<string, m_time_t> Map;
41 Map recentFails;
42 std::mutex m_mutex;
43
servermega::FaultyServers44 string server(const string& url)
45 {
46 size_t n = url.find("://");
47 if (n != string::npos)
48 {
49 n += 3;
50 size_t m = url.find("/", n);
51 if (m != string::npos)
52 {
53 return url.substr(n, m - n);
54 }
55 }
56 return "";
57 }
58
addmega::FaultyServers59 void add(const string& url)
60 {
61 std::lock_guard<std::mutex> g(m_mutex);
62 recentFails[server(url)] = m_time();
63 }
64
65 /**
66 * @brief Select the worst server based on records of recent failures
67 * @param urls The set of URLs to check against previosly failing servers
68 * @return The index from 0 to 5, or 6 (RAIDPARTS) if none of the URLs have failed recently.
69 */
selectWorstServermega::FaultyServers70 unsigned selectWorstServer(vector<string> urls)
71 {
72 // start with 6 connections and drop the slowest to respond, build the file from the other 5.
73 // (unless we recently had problems with the server of one of the 6 URLs, in which case start with the other 5 right away)
74 unsigned worstindex = RAIDPARTS;
75
76 std::lock_guard<std::mutex> g(m_mutex);
77 if (!recentFails.empty())
78 {
79 m_time_t now = m_time();
80 m_time_t worsttime = now - 10 * 3600; // 10 hours
81 for (auto i = unsigned(urls.size()); i--; )
82 {
83 Map::iterator j = recentFails.find(server(urls[i]));
84 if (j != recentFails.end() && j->second > worsttime)
85 {
86 // select URL that failed less than 10 hours ago
87 worstindex = i;
88 worsttime = j->second;
89 }
90 }
91
92 // cleanup recentFails from URLs older than 10 hours
93 bool cleanup = false;
94 Map::iterator jj;
95 for (Map::iterator j = recentFails.begin(); j != recentFails.end(); cleanup ? (jj = j, ++j, (void)recentFails.erase(jj)) : (void)++j)
96 {
97 cleanup = j->second < (now - 10 * 3600);
98 }
99 }
100
101 return worstindex;
102 }
103
104 };
105
106 FaultyServers g_faultyServers;
107
108
FilePiece()109 RaidBufferManager::FilePiece::FilePiece()
110 : pos(0)
111 , buf(NULL, 0, 0)
112 {
113 }
114
FilePiece(m_off_t p,size_t len)115 RaidBufferManager::FilePiece::FilePiece(m_off_t p, size_t len)
116 : pos(p)
117 , buf(new byte[len + std::min<size_t>(SymmCipher::BLOCKSIZE, RAIDSECTOR)], 0, len) // SymmCipher::ctr_crypt requirement: decryption: data must be padded to BLOCKSIZE. Also make sure we can xor up to RAIDSECTOR more for convenience
118 {
119 }
120
121
FilePiece(m_off_t p,HttpReq::http_buf_t * b)122 RaidBufferManager::FilePiece::FilePiece(m_off_t p, HttpReq::http_buf_t* b) // taking ownership
123 : pos(p)
124 , buf(NULL, 0, 0)
125 {
126 buf.swap(*b); // take its buffer and copy other members
127 delete b; // client no longer owns it so we must delete. Similar to move semantics where we would just assign
128 }
129
swap(FilePiece & other)130 void RaidBufferManager::FilePiece::swap(FilePiece& other)
131 {
132 m_off_t tp = pos; pos = other.pos; other.pos = tp;
133 chunkmacs.swap(other.chunkmacs);
134 buf.swap(other.buf);
135 }
136
RaidBufferManager()137 RaidBufferManager::RaidBufferManager()
138 : is_raid(false)
139 , raidKnown(false)
140 , raidLinesPerChunk(16 * 1024)
141 , unusedRaidConnection(0)
142 , raidpartspos(0)
143 , outputfilepos(0)
144 , startfilepos(0)
145 , resumewastedbytes(0)
146 {
147 for (int i = RAIDPARTS; i--; )
148 {
149 raidrequestpartpos[i] = 0;
150 connectionPaused[i] = false;
151 raidHttpGetErrorCount[i] = 0;
152 connectionStarted[i] = false;
153 }
154 }
155
clearOwningFilePieces(std::deque<RaidBufferManager::FilePiece * > & q)156 static void clearOwningFilePieces(std::deque<RaidBufferManager::FilePiece*>& q)
157 {
158 for (std::deque<RaidBufferManager::FilePiece*>::iterator i = q.begin(); i != q.end(); ++i)
159 {
160 delete *i;
161 }
162 q.clear();
163 }
164
~RaidBufferManager()165 RaidBufferManager::~RaidBufferManager()
166 {
167 for (int i = RAIDPARTS; i--; )
168 {
169 clearOwningFilePieces(raidinputparts[i]);
170 }
171 }
172
setIsRaid(const std::vector<std::string> & tempUrls,m_off_t resumepos,m_off_t readtopos,m_off_t filesize,m_off_t maxRequestSize)173 void RaidBufferManager::setIsRaid(const std::vector<std::string>& tempUrls, m_off_t resumepos, m_off_t readtopos, m_off_t filesize, m_off_t maxRequestSize)
174 {
175 assert(tempUrls.size() == RAIDPARTS || tempUrls.size() == 1);
176 assert(0 <= resumepos && resumepos <= readtopos && readtopos <= filesize);
177 assert(!raidKnown);
178
179 tempurls = tempUrls;
180
181 is_raid = tempurls.size() == RAIDPARTS;
182 raidKnown = true;
183 fullfilesize = filesize;
184 deliverlimitpos = readtopos;
185 acquirelimitpos = deliverlimitpos + RAIDLINE - 1;
186 acquirelimitpos -= acquirelimitpos % RAIDLINE;
187 acquirelimitpos = std::min<m_off_t>(acquirelimitpos, fullfilesize);
188 outputfilepos = resumepos;
189 startfilepos = resumepos;
190 if (is_raid)
191 {
192 raidpartspos = resumepos / (RAIDPARTS - 1);
193 raidpartspos -= raidpartspos % RAIDSECTOR;
194 resumewastedbytes = size_t(outputfilepos - raidpartspos * (RAIDPARTS - 1));
195 outputfilepos -= resumewastedbytes; // we'll skip over these bytes on the first output
196 for (int i = RAIDPARTS; i--; )
197 {
198 raidrequestpartpos[i] = raidpartspos;
199 }
200
201 // How much buffer space can we use. Assuming two chunk sets incoming, one outgoing
202 raidLinesPerChunk = unsigned(maxRequestSize / (RAIDPARTS * 3 * RAIDSECTOR));
203 raidLinesPerChunk -= raidLinesPerChunk % 1024;
204 raidLinesPerChunk = std::min<unsigned>(raidLinesPerChunk, 64 * 1024);
205 raidLinesPerChunk = std::max<unsigned>(raidLinesPerChunk, 8 * 1024);
206
207 unusedRaidConnection = g_faultyServers.selectWorstServer(tempurls);
208 }
209
210 DEBUG_TEST_HOOK_RAIDBUFFERMANAGER_SETISRAID(this)
211 }
212
updateUrlsAndResetPos(const std::vector<std::string> & tempUrls)213 void RaidBufferManager::updateUrlsAndResetPos(const std::vector<std::string>& tempUrls)
214 {
215 // A request to restart from whereever we got to, with new URLs.
216 // the old requested-to pos is not valid anymore, as one or more Http requests failed or were abandoned
217 assert(tempurls.size() == tempUrls.size());
218 if (tempurls.size() == tempUrls.size())
219 {
220 tempurls = tempUrls;
221 if (isRaid())
222 {
223 for (unsigned i = RAIDPARTS; i--; )
224 {
225 std::deque<FilePiece*>& connectionpieces = raidinputparts[i];
226 transferPos(i) = connectionpieces.empty() ? raidpartspos : connectionpieces.back()->pos + connectionpieces.back()->buf.datalen();
227 }
228 }
229 else
230 {
231 transferPos(0) = outputfilepos; // if there is any data waiting in asyncoutputbuffers this value is alreday ahead of it
232 }
233 }
234 }
235
isRaid() const236 bool RaidBufferManager::isRaid() const
237 {
238 assert(raidKnown);
239 return is_raid;
240 }
241
tempURL(unsigned connectionNum)242 const std::string& RaidBufferManager::tempURL(unsigned connectionNum)
243 {
244 if (isRaid())
245 {
246 assert(connectionNum < tempurls.size());
247 return tempurls[connectionNum];
248 }
249 else if (!tempurls.empty())
250 {
251 return tempurls[0];
252 }
253 else
254 {
255 assert(false); // this class shouldn't be used until we have the URLs, but don't crash
256 return emptyReturnString;
257 }
258 }
259
tempUrlVector() const260 const std::vector<std::string>& RaidBufferManager::tempUrlVector() const
261 {
262 return tempurls;
263 }
264
265 // takes ownership of the buffer
submitBuffer(unsigned connectionNum,FilePiece * piece)266 void RaidBufferManager::submitBuffer(unsigned connectionNum, FilePiece* piece)
267 {
268 if (isRaid())
269 {
270 assert(connectionNum < RAIDPARTS);
271 assert(piece->buf.datalen() % RAIDSECTOR == 0 || piece->pos + m_off_t(piece->buf.datalen()) == raidPartSize(connectionNum, acquirelimitpos));
272 if (!piece->buf.isNull())
273 {
274 raidHttpGetErrorCount[connectionNum] = 0;
275 }
276
277 std::deque<FilePiece*>& connectionpieces = raidinputparts[connectionNum];
278 m_off_t contiguouspos = connectionpieces.empty() ? raidpartspos : connectionpieces.back()->pos + connectionpieces.back()->buf.datalen();
279
280 assert(piece->pos == contiguouspos);
281 if (piece->pos == contiguouspos)
282 {
283 transferPos(connectionNum) = piece->pos + piece->buf.datalen(); // in case of download piece arriving after connection failure recovery
284 raidinputparts[connectionNum].push_back(piece);
285 }
286 }
287 else
288 {
289 finalize(*piece);
290 assert(asyncoutputbuffers.find(connectionNum) == asyncoutputbuffers.end() || !asyncoutputbuffers[connectionNum]);
291 asyncoutputbuffers[connectionNum].reset(piece);
292 }
293 }
294
getAsyncOutputBufferPointer(unsigned connectionNum)295 std::shared_ptr<RaidBufferManager::FilePiece> RaidBufferManager::getAsyncOutputBufferPointer(unsigned connectionNum)
296 {
297 auto i = asyncoutputbuffers.find(connectionNum);
298 if (isRaid() && (i == asyncoutputbuffers.end() || !i->second))
299 {
300 combineRaidParts(connectionNum);
301 i = asyncoutputbuffers.find(connectionNum);
302 }
303 return (i == asyncoutputbuffers.end()) ? NULL : i->second;
304 }
305
306
bufferWriteCompleted(unsigned connectionNum,bool success)307 void RaidBufferManager::bufferWriteCompleted(unsigned connectionNum, bool success)
308 {
309 auto aob = asyncoutputbuffers.find(connectionNum);
310 if (aob != asyncoutputbuffers.end())
311 {
312 assert(aob->second);
313 if (aob->second)
314 {
315 if (success)
316 {
317 bufferWriteCompletedAction(*aob->second);
318 }
319
320 aob->second.reset();
321 }
322 }
323 }
324
bufferWriteCompletedAction(FilePiece &)325 void RaidBufferManager::bufferWriteCompletedAction(FilePiece&)
326 {
327 // overridden for Transfers
328 }
329
transferPos(unsigned connectionNum)330 m_off_t& RaidBufferManager::transferPos(unsigned connectionNum)
331 {
332 assert(isRaid());
333 return raidrequestpartpos[connectionNum];
334 }
335
336
nextNPosForConnection(unsigned connectionNum,bool & newInputBufferSupplied,bool & pauseConnectionForRaid)337 std::pair<m_off_t, m_off_t> RaidBufferManager::nextNPosForConnection(unsigned connectionNum, bool& newInputBufferSupplied, bool& pauseConnectionForRaid)
338 {
339 // returning a pair for clarity - specifying the beginning and end position of the next data block, as the 'current pos' may be updated during this function
340 newInputBufferSupplied = false;
341 pauseConnectionForRaid = false;
342
343 if (!isRaid())
344 {
345 return std::make_pair(transferPos(connectionNum), deliverlimitpos); // simple case for non-raid direct streaming, get the entire portion of the file requested in one http get
346 }
347 else // raid
348 {
349 m_off_t curpos = transferPos(connectionNum); // if we use submitBuffer, transferPos() may be updated to protect against single connection failure recovery
350 m_off_t maxpos = transferSize(connectionNum);
351
352 // if this connection gets too far ahead of the others, pause it until the others catch up a bit
353 if ((curpos >= raidpartspos + RaidReadAheadChunksPausePoint * raidLinesPerChunk * RAIDSECTOR) ||
354 (curpos > raidpartspos + RaidReadAheadChunksUnpausePoint * raidLinesPerChunk * RAIDSECTOR && connectionPaused[connectionNum]))
355 {
356 connectionPaused[connectionNum] = true;
357 pauseConnectionForRaid = true;
358 return std::make_pair(curpos, curpos);
359 }
360 else
361 {
362 connectionPaused[connectionNum] = false;
363 }
364
365 m_off_t npos = std::min<m_off_t>(curpos + raidLinesPerChunk * RAIDSECTOR * RaidMaxChunksPerRead, maxpos);
366 if (unusedRaidConnection == connectionNum && npos > curpos)
367 {
368 submitBuffer(connectionNum, new RaidBufferManager::FilePiece(curpos, new HttpReq::http_buf_t(NULL, 0, size_t(npos - curpos))));
369 transferPos(connectionNum) = npos;
370 newInputBufferSupplied = true;
371 }
372 return std::make_pair(curpos, std::min<m_off_t>(npos, maxpos));
373 }
374 }
375
resetPart(unsigned connectionNum)376 void RaidBufferManager::resetPart(unsigned connectionNum)
377 {
378 assert(isRaid());
379 transferPos(connectionNum) = raidpartspos;
380
381 // if we are downloading many files at once, eg. initial sync, or large manual folder, it's better to just use 5 connections immediately after the first
382 g_faultyServers.add(tempurls[connectionNum]);
383 }
384
385
transferSize(unsigned connectionNum)386 m_off_t RaidBufferManager::transferSize(unsigned connectionNum)
387 {
388 if (isRaid())
389 {
390 return raidPartSize(connectionNum, acquirelimitpos);
391 }
392 else
393 {
394 return fullfilesize;
395 }
396 }
397
raidPartSize(unsigned part,m_off_t filesize)398 m_off_t RaidBufferManager::raidPartSize(unsigned part, m_off_t filesize)
399 {
400 // compute the size of this raid part based on the original file size len
401 m_off_t r = filesize % RAIDLINE; // residual part
402
403 m_off_t t = r - (part - !!part)*RAIDSECTOR; // parts 0 (parity) & 1 (largest data) are the same size
404
405 // (excess length will be found in the following sectors,
406 // negative length in the preceding sectors)
407 if (t < 0)
408 {
409 t = 0;
410 }
411 else if (t > RAIDSECTOR)
412 {
413 t = RAIDSECTOR;
414 }
415
416 return (filesize - r) / (RAIDPARTS - 1) + t;
417 }
418
419
combineRaidParts(unsigned connectionNum)420 void RaidBufferManager::combineRaidParts(unsigned connectionNum)
421 {
422 assert(asyncoutputbuffers.find(connectionNum) == asyncoutputbuffers.end() || !asyncoutputbuffers[connectionNum]);
423 assert(raidpartspos * (RAIDPARTS - 1) == outputfilepos + m_off_t(leftoverchunk.buf.datalen()));
424
425 size_t partslen = 0x10000000, sumdatalen = 0, xorlen = 0;
426 for (unsigned i = RAIDPARTS; i--; )
427 {
428 if (raidinputparts[i].empty())
429 {
430 partslen = 0;
431 }
432 else
433 {
434 FilePiece& r = *raidinputparts[i].front();
435 assert(r.pos == raidpartspos); // check all are in sync at the front
436 partslen = std::min<size_t>(partslen, r.buf.datalen());
437 (i > 0 ? sumdatalen : xorlen) += r.buf.datalen();
438 }
439 }
440 partslen -= partslen % RAIDSECTOR; // restrict to raidline boundary
441
442 // for correct mac processing, we need to process the output file in pieces delimited by the chunkfloor / chunkceil algorithm
443 m_off_t newdatafilepos = outputfilepos + leftoverchunk.buf.datalen();
444 assert(newdatafilepos + m_off_t(sumdatalen) <= acquirelimitpos);
445 bool processToEnd = (newdatafilepos + m_off_t(sumdatalen) == acquirelimitpos) // data to the end
446 && (newdatafilepos / (RAIDPARTS - 1) + m_off_t(xorlen) == raidPartSize(0, acquirelimitpos)); // parity to the end
447
448 assert(!partslen || !processToEnd || sumdatalen - partslen * (RAIDPARTS - 1) <= RAIDLINE);
449
450 if (partslen > 0 || processToEnd)
451 {
452 m_off_t macchunkpos = calcOutputChunkPos(newdatafilepos + partslen * (RAIDPARTS - 1));
453
454 size_t buflen = static_cast<size_t>(processToEnd ? sumdatalen : partslen * (RAIDPARTS - 1));
455 FilePiece* outputrec = combineRaidParts(partslen, buflen, outputfilepos, leftoverchunk); // includes a bit of extra space for non-full sectors if we are at the end of the file
456 rollInputBuffers(partslen);
457 raidpartspos += partslen;
458 sumdatalen -= partslen * (RAIDPARTS - 1);
459 outputfilepos += partslen * (RAIDPARTS - 1) + leftoverchunk.buf.datalen();
460 byte* dest = outputrec->buf.datastart() + partslen * (RAIDPARTS - 1) + leftoverchunk.buf.datalen();
461 FilePiece emptyFilePiece;
462 leftoverchunk.swap(emptyFilePiece); // this data is entirely included in the outputrec now, so discard and reset
463
464 if (processToEnd && sumdatalen > 0)
465 {
466 // fill in the last of the buffer with non-full sectors from the end of the file
467 assert(outputfilepos + m_off_t(sumdatalen) == acquirelimitpos);
468 combineLastRaidLine(dest, sumdatalen);
469 rollInputBuffers(RAIDSECTOR);
470 }
471 else if (!processToEnd && outputfilepos > macchunkpos)
472 {
473 // for transfers we do mac processing which must be done in chunks, delimited by chunkfloor and chunkceil. If we don't have the right amount then hold the remainder over for next time.
474 size_t excessdata = static_cast<size_t>(outputfilepos - macchunkpos);
475 FilePiece newleftover(outputfilepos - excessdata, excessdata);
476 leftoverchunk.swap(newleftover);
477 memcpy(leftoverchunk.buf.datastart(), outputrec->buf.datastart() + outputrec->buf.datalen() - excessdata, excessdata);
478 outputrec->buf.end -= excessdata;
479 outputfilepos -= excessdata;
480 assert(raidpartspos * (RAIDPARTS - 1) == outputfilepos + m_off_t(leftoverchunk.buf.datalen()));
481 }
482
483 // discard any excess data that we had to fetch when resuming a file (to align the parts appropriately)
484 size_t n = std::min<size_t>(outputrec->buf.datalen(), resumewastedbytes);
485 if (n > 0)
486 {
487 outputrec->pos += n;
488 outputrec->buf.start += n;
489 resumewastedbytes -= n;
490 }
491
492 // don't deliver any excess data that we needed for parity calculations in the last raid line
493 if (outputrec->pos + m_off_t(outputrec->buf.datalen()) > deliverlimitpos)
494 {
495 size_t excess = size_t(outputrec->pos + outputrec->buf.datalen() - deliverlimitpos);
496 excess = std::min<size_t>(excess, outputrec->buf.datalen());
497 outputrec->buf.end -= excess;
498 }
499
500 // store the result in a place that can be read out async
501 if (outputrec->buf.datalen() > 0)
502 {
503 finalize(*outputrec);
504 asyncoutputbuffers[connectionNum].reset(outputrec);
505 }
506 else
507 {
508 delete outputrec; // this would happen if we got some data to process on all connections, but not enough to reach the next chunk boundary yet (and combined data is in leftoverchunk)
509 }
510 }
511 }
512
combineRaidParts(size_t partslen,size_t bufflen,m_off_t filepos,FilePiece & prevleftoverchunk)513 RaidBufferManager::FilePiece* RaidBufferManager::combineRaidParts(size_t partslen, size_t bufflen, m_off_t filepos, FilePiece& prevleftoverchunk)
514 {
515 assert(prevleftoverchunk.buf.datalen() == 0 || prevleftoverchunk.pos == filepos);
516
517 // add a bit of extra space and copy prev chunk to the front
518 FilePiece* result = new FilePiece(filepos, bufflen + prevleftoverchunk.buf.datalen());
519 if (prevleftoverchunk.buf.datalen() > 0)
520 {
521 memcpy(result->buf.datastart(), prevleftoverchunk.buf.datastart(), prevleftoverchunk.buf.datalen());
522 }
523
524 // usual case, for simple and fast processing: all input buffers are the same size, and aligned, and a multiple of raidsector
525 if (partslen > 0)
526 {
527 byte* inputbufs[RAIDPARTS];
528 for (unsigned i = RAIDPARTS; i--; )
529 {
530 FilePiece* inputPiece = raidinputparts[i].front();
531 inputbufs[i] = inputPiece->buf.isNull() ? NULL : inputPiece->buf.datastart();
532 }
533
534 byte* b = result->buf.datastart() + prevleftoverchunk.buf.datalen();
535 byte* endpos = b + partslen * (RAIDPARTS-1);
536
537 for (unsigned i = 0; b < endpos; i += RAIDSECTOR)
538 {
539 for (unsigned j = 1; j < RAIDPARTS; ++j)
540 {
541 assert(b + RAIDSECTOR <= result->buf.datastart() + result->buf.datalen());
542 if (inputbufs[j])
543 {
544 memcpy(b, inputbufs[j] + i, RAIDSECTOR);
545 }
546 else
547 {
548 recoverSectorFromParity(b, inputbufs, i);
549 }
550 b += RAIDSECTOR;
551 }
552 }
553 assert(b == endpos);
554 }
555 return result;
556 }
557
recoverSectorFromParity(byte * dest,byte * inputbufs[],unsigned offset)558 void RaidBufferManager::recoverSectorFromParity(byte* dest, byte* inputbufs[], unsigned offset)
559 {
560 assert(sizeof(m_off_t)*2 == RAIDSECTOR);
561 bool set = false;
562 for (unsigned i = RAIDPARTS; i--; )
563 {
564 if (inputbufs[i])
565 {
566 if (!set)
567 {
568 memcpy(dest, inputbufs[i] + offset, RAIDSECTOR);
569 set = true;
570 }
571 else
572 {
573 *(m_off_t*)dest ^= *(m_off_t*)(inputbufs[i] + offset);
574 *(m_off_t*)(dest + sizeof(m_off_t)) ^= *(m_off_t*)(inputbufs[i] + offset + sizeof(m_off_t));
575 }
576 }
577 }
578 }
579
combineLastRaidLine(byte * dest,size_t remainingbytes)580 void RaidBufferManager::combineLastRaidLine(byte* dest, size_t remainingbytes)
581 {
582 // we have to be careful to use the right number of bytes from each sector
583 for (unsigned i = 1; i < RAIDPARTS && remainingbytes > 0; ++i)
584 {
585 if (!raidinputparts[i].empty())
586 {
587 FilePiece* sector = raidinputparts[i].front();
588 size_t n = std::min(remainingbytes, sector->buf.datalen());
589 if (!sector->buf.isNull())
590 {
591 memcpy(dest, sector->buf.datastart(), n);
592 }
593 else
594 {
595 memset(dest, 0, n);
596 for (unsigned j = RAIDPARTS; j--; )
597 {
598 if (!raidinputparts[j].empty() && !raidinputparts[j].front()->buf.isNull())
599 {
600 FilePiece* xs = raidinputparts[j].front();
601 for (size_t x = std::min(n, xs->buf.datalen()); x--; )
602 {
603 dest[x] ^= xs->buf.datastart()[x];
604 }
605 }
606 }
607 }
608 dest += n;
609 remainingbytes -= n;
610 }
611 }
612 }
613
rollInputBuffers(size_t dataToDiscard)614 void RaidBufferManager::rollInputBuffers(size_t dataToDiscard)
615 {
616 // remove finished input buffers
617 for (unsigned i = RAIDPARTS; i--; )
618 {
619 if (!raidinputparts[i].empty())
620 {
621 FilePiece& ip = *raidinputparts[i].front();
622 ip.buf.start += dataToDiscard;
623 ip.pos += dataToDiscard;
624 if (ip.buf.start >= ip.buf.end)
625 {
626 delete raidinputparts[i].front();
627 raidinputparts[i].pop_front();
628 }
629 }
630 }
631 }
632
calcOutputChunkPos(m_off_t acquiredpos)633 m_off_t TransferBufferManager::calcOutputChunkPos(m_off_t acquiredpos)
634 {
635 return ChunkedHash::chunkfloor(acquiredpos); // we can only mac to the chunk boundary, hold the rest over
636 }
637
638 // decrypt, mac downloaded chunk
finalize(bool parallel,m_off_t filesize,int64_t ctriv,SymmCipher * cipher,chunkmac_map * source_chunkmacs)639 bool RaidBufferManager::FilePiece::finalize(bool parallel, m_off_t filesize, int64_t ctriv, SymmCipher *cipher, chunkmac_map* source_chunkmacs)
640 {
641 assert(!finalized);
642 bool queueParallel = false;
643
644 byte *chunkstart = buf.datastart();
645 m_off_t startpos = pos;
646 m_off_t finalpos = startpos + buf.datalen();
647 assert(finalpos <= filesize);
648 if (finalpos != filesize)
649 {
650 finalpos &= -SymmCipher::BLOCKSIZE;
651 }
652
653 m_off_t endpos = ChunkedHash::chunkceil(startpos, finalpos);
654 unsigned chunksize = static_cast<unsigned>(endpos - startpos);
655
656 while (chunksize)
657 {
658 m_off_t chunkid = ChunkedHash::chunkfloor(startpos);
659 ChunkMAC &chunkmac = chunkmacs[chunkid];
660 if (!chunkmac.finished)
661 {
662 if (source_chunkmacs)
663 {
664 chunkmac = (*source_chunkmacs)[chunkid];
665 }
666 if (endpos == ChunkedHash::chunkceil(chunkid, filesize))
667 {
668 if (parallel)
669 {
670 // these parts can be done on a thread - they are independent chunks, or the earlier part of the chunk is already done.
671 cipher->ctr_crypt(chunkstart, chunksize, startpos, ctriv, chunkmac.mac, false, !chunkmac.finished && !chunkmac.offset);
672 LOG_debug << "Finished chunk: " << startpos << " - " << endpos << " Size: " << chunksize;
673 chunkmac.finished = true;
674 chunkmac.offset = 0;
675 }
676 else
677 {
678 queueParallel = true;
679 }
680 }
681 else if (!parallel)
682 {
683 // these part chunks must be done serially (and first), since later parts of a chunk need the mac of earlier parts as input.
684 cipher->ctr_crypt(chunkstart, chunksize, startpos, ctriv, chunkmac.mac, false, !chunkmac.finished && !chunkmac.offset);
685 LOG_debug << "Decrypted partial chunk: " << startpos << " - " << endpos << " Size: " << chunksize;
686 chunkmac.finished = false;
687 chunkmac.offset += chunksize;
688 }
689 }
690 chunkstart += chunksize;
691 startpos = endpos;
692 endpos = ChunkedHash::chunkceil(startpos, finalpos);
693 chunksize = static_cast<unsigned>(endpos - startpos);
694 }
695
696 finalized = !queueParallel;
697 if (finalized)
698 finalizedCV.notify_one();
699
700 return queueParallel;
701 }
702
finalize(FilePiece & r)703 void TransferBufferManager::finalize(FilePiece& r)
704 {
705 // for transfers (as opposed to DirectRead), decrypt/mac is now done on threads
706 }
707
708
tryRaidHttpGetErrorRecovery(unsigned errorConnectionNum)709 bool RaidBufferManager::tryRaidHttpGetErrorRecovery(unsigned errorConnectionNum)
710 {
711 assert(isRaid());
712
713 raidHttpGetErrorCount[errorConnectionNum] += 1;
714
715 g_faultyServers.add(tempurls[errorConnectionNum]);
716
717 unsigned errorSum = 0;
718 unsigned highestErrors = 0;
719 for (unsigned i = RAIDPARTS; i--; )
720 {
721 errorSum += raidHttpGetErrorCount[i];
722 highestErrors = std::max<unsigned>(highestErrors, raidHttpGetErrorCount[i]);
723 }
724
725 // Allow for one nonfunctional channel and one glitchy channel. We can still make progress swapping back and forth
726 if ((errorSum - highestErrors) < RAID_ACTIVE_CHANNEL_FAIL_THRESHOLD)
727 {
728 if (unusedRaidConnection < RAIDPARTS)
729 {
730 LOG_warn << "5 connection cloudraid shutting down connection " << errorConnectionNum << " due to error, and starting " << unusedRaidConnection << " instead";
731
732 // start up the old unused connection, and cancel this one. Other connections all have real data since we were already in 5 connection mode
733 clearOwningFilePieces(raidinputparts[unusedRaidConnection]);
734 clearOwningFilePieces(raidinputparts[errorConnectionNum]);
735 raidrequestpartpos[unusedRaidConnection] = raidpartspos;
736 raidrequestpartpos[errorConnectionNum] = raidpartspos;
737 }
738 else
739 {
740 LOG_warn << "6 connection cloudraid shutting down connection " << errorConnectionNum << " due to error";
741 clearOwningFilePieces(raidinputparts[errorConnectionNum]);
742 raidrequestpartpos[errorConnectionNum] = raidpartspos;
743 }
744
745 unusedRaidConnection = errorConnectionNum;
746 return true;
747 }
748 else
749 {
750 return false;
751 }
752 }
753
connectionRaidPeersAreAllPaused(unsigned slowConnection)754 bool RaidBufferManager::connectionRaidPeersAreAllPaused(unsigned slowConnection)
755 {
756 if (!isRaid())
757 {
758 return false;
759 }
760
761 // see if one connection is stalled or running much slower than the others, in which case try the other 5 instead
762 // (if already using 5 connections and all of them are paused, except slowConnection, the unusedRaidConnection will
763 // be started again and the slowConnection will become the new unusedRaidConnection)
764 for (unsigned j = RAIDPARTS; j--; )
765 {
766 if (j != slowConnection && j != unusedRaidConnection && !connectionPaused[j])
767 {
768 return false;
769 }
770 }
771 return true;
772 }
773
detectSlowestRaidConnection(unsigned thisConnection,unsigned & slowestConnection)774 bool RaidBufferManager::detectSlowestRaidConnection(unsigned thisConnection, unsigned& slowestConnection)
775 {
776 if (isRaid() && unusedRaidConnection == RAIDPARTS)
777 {
778 connectionStarted[thisConnection] = true;
779 int count = 0;
780 for (unsigned j = RAIDPARTS; j--; )
781 {
782 if (!connectionStarted[j])
783 {
784 slowestConnection = j;
785 ++count;
786 }
787 }
788 if (count == 1)
789 {
790 unusedRaidConnection = slowestConnection;
791 raidrequestpartpos[unusedRaidConnection] = raidpartspos;
792 return true;
793 }
794 }
795 return false;
796 }
797
798
progress() const799 m_off_t RaidBufferManager::progress() const
800 {
801 assert(isRaid());
802 m_off_t reportPos = 0;
803
804 for (unsigned j = RAIDPARTS; j--; )
805 {
806 for (FilePiece* p : raidinputparts[j])
807 {
808 if (!p->buf.isNull())
809 {
810 reportPos += p->buf.datalen();
811 }
812 }
813 }
814
815 return reportPos;
816 }
817
818
TransferBufferManager()819 TransferBufferManager::TransferBufferManager()
820 : transfer(NULL)
821 {
822 }
823
setIsRaid(Transfer * t,std::vector<std::string> & tempUrls,m_off_t resumepos,m_off_t maxRequestSize)824 void TransferBufferManager::setIsRaid(Transfer* t, std::vector<std::string>& tempUrls, m_off_t resumepos, m_off_t maxRequestSize)
825 {
826 RaidBufferManager::setIsRaid(tempUrls, resumepos, t->size, t->size, maxRequestSize);
827
828 transfer = t;
829 }
830
transferPos(unsigned connectionNum)831 m_off_t& TransferBufferManager::transferPos(unsigned connectionNum)
832 {
833 return isRaid() ? RaidBufferManager::transferPos(connectionNum) : transfer->pos;
834 }
nextNPosForConnection(unsigned connectionNum,m_off_t maxRequestSize,unsigned connectionCount,bool & newInputBufferSupplied,bool & pauseConnectionForRaid,m_off_t uploadSpeed)835 std::pair<m_off_t, m_off_t> TransferBufferManager::nextNPosForConnection(unsigned connectionNum, m_off_t maxRequestSize, unsigned connectionCount, bool& newInputBufferSupplied, bool& pauseConnectionForRaid, m_off_t uploadSpeed)
836 {
837 // returning a pair for clarity - specifying the beginning and end position of the next data block, as the 'current pos' may be updated during this function
838 newInputBufferSupplied = false;
839 pauseConnectionForRaid = false;
840
841 if (isRaid())
842 {
843 return RaidBufferManager::nextNPosForConnection(connectionNum, newInputBufferSupplied, pauseConnectionForRaid);
844 }
845 else
846 {
847 transfer->pos = transfer->chunkmacs.nextUnprocessedPosFrom(transfer->pos);
848 m_off_t npos = ChunkedHash::chunkceil(transfer->pos, transfer->size);
849 if (!transfer->size)
850 {
851 transfer->pos = 0;
852 }
853
854 if (transfer->type == PUT)
855 {
856 if (transfer->pos < 1024 * 1024)
857 {
858 npos = ChunkedHash::chunkceil(npos, transfer->size);
859 }
860
861 // choose upload chunks that are big enough to saturate the connection, so we don't start HTTP PUT request too frequently
862 // make them smaller at the end of the file so we still have the last parts delivered in parallel
863 m_off_t maxsize = 32 * 1024 * 1024;
864 if (npos + 2 * maxsize > transfer->size) maxsize /= 2;
865 if (npos + maxsize > transfer->size) maxsize /= 2;
866 if (npos + maxsize > transfer->size) maxsize /= 2;
867 m_off_t speedsize = std::min<m_off_t>(maxsize, uploadSpeed * 2 / 3); // two seconds of data over 3 connections
868 m_off_t sizesize = transfer->size > 32 * 1024 * 1024 ? 8 * 1024 * 1024 : 0; // start with large-ish portions for large files.
869 m_off_t targetsize = std::max<m_off_t>(sizesize, speedsize);
870
871 while (npos < transfer->pos + targetsize && npos < transfer->size)
872 {
873 npos = ChunkedHash::chunkceil(npos, transfer->size);
874 }
875 }
876
877 if (transfer->type == GET && transfer->size && npos > transfer->pos)
878 {
879 m_off_t maxReqSize = (transfer->size - transfer->progresscompleted) / connectionCount / 2;
880 if (maxReqSize > maxRequestSize)
881 {
882 maxReqSize = maxRequestSize;
883 }
884
885 if (maxReqSize > 0x100000)
886 {
887 m_off_t val = 0x100000;
888 while (val <= maxReqSize)
889 {
890 val <<= 1;
891 }
892 maxReqSize = val >> 1;
893 maxReqSize -= 0x100000;
894 }
895 else
896 {
897 maxReqSize = 0;
898 }
899
900 npos = transfer->chunkmacs.expandUnprocessedPiece(transfer->pos, npos, transfer->size, maxReqSize);
901 LOG_debug << "Downloading chunk of size " << npos - transfer->pos;
902 assert(npos > transfer->pos);
903 }
904 return std::make_pair(transfer->pos, npos);
905 }
906 }
907
bufferWriteCompletedAction(FilePiece & r)908 void TransferBufferManager::bufferWriteCompletedAction(FilePiece& r)
909 {
910 for (chunkmac_map::iterator it = r.chunkmacs.begin(); it != r.chunkmacs.end(); it++)
911 {
912 transfer->chunkmacs[it->first] = it->second;
913 }
914
915 r.chunkmacs.clear();
916 transfer->progresscompleted += r.buf.datalen();
917 LOG_debug << "Cached data at: " << r.pos << " Size: " << r.buf.datalen();
918 }
919
920
DirectReadBufferManager(DirectRead * dr)921 DirectReadBufferManager::DirectReadBufferManager(DirectRead* dr)
922 {
923 directRead = dr;
924 }
925
transferPos(unsigned connectionNum)926 m_off_t& DirectReadBufferManager::transferPos(unsigned connectionNum)
927 {
928 return isRaid() ? RaidBufferManager::transferPos(connectionNum) : directRead->nextrequestpos;
929 }
930
calcOutputChunkPos(m_off_t acquiredpos)931 m_off_t DirectReadBufferManager::calcOutputChunkPos(m_off_t acquiredpos)
932 {
933 return acquiredpos; // give all the data straight away for streaming, no need to hold any over for mac boundaries
934 }
935
finalize(FilePiece & fp)936 void DirectReadBufferManager::finalize(FilePiece& fp)
937 {
938 int r, l, t;
939
940 // decrypt, pass to app and erase
941 r = fp.pos & (SymmCipher::BLOCKSIZE - 1);
942 t = int(fp.buf.datalen());
943
944 if (r)
945 {
946 byte buf[SymmCipher::BLOCKSIZE];
947 l = sizeof buf - r;
948
949 if (l > t)
950 {
951 l = t;
952 }
953
954 memcpy(buf + r, fp.buf.datastart(), l);
955 directRead->drn->symmcipher.ctr_crypt(buf, sizeof buf, fp.pos - r, directRead->drn->ctriv, NULL, false);
956 memcpy(fp.buf.datastart(), buf + r, l);
957 }
958 else
959 {
960 l = 0;
961 }
962
963 if (t > l)
964 {
965 // the buffer has some extra at the end to allow full blocksize decrypt at the end
966 directRead->drn->symmcipher.ctr_crypt(fp.buf.datastart() + l, t - l, fp.pos + l, directRead->drn->ctriv, NULL, false);
967 }
968 }
969
970 }; // namespace
971