/**
 * @file mega/raid.cpp
 * @brief helper classes for managing cloudraid downloads
 *
 * (c) 2013-2019 by Mega Limited, Auckland, New Zealand
 *
 * This file is part of the MEGA SDK - Client Access Engine.
 *
 * Applications using the MEGA API must present a valid application key
 * and comply with the rules set forth in the Terms of Service.
 *
 * The MEGA SDK is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
 *
 * @copyright Simplified (2-clause) BSD License.
 *
 * You should have received a copy of the license along with this
 * program.
 */

#include "mega/raid.h"

#include "mega/transfer.h"
#include "mega/testhooks.h"
#include "mega.h" // for thread definitions

#undef min // avoids issues with std::min

namespace mega
{

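// Recovery budget for raid HTTP GET errors (see tryRaidHttpGetErrorRecovery below): while the total
// error count across the channels, excluding the single worst channel, stays below this threshold we
// keep swapping the faulty connection for the unused one instead of failing the transfer.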
const unsigned RAID_ACTIVE_CHANNEL_FAIL_THRESHOLD = 5;

struct FaultyServers
{
    // Records URLs that had recent problems, so we can start the next raid download with URLs that can work first try.
    // In particular this is useful when one server in a raid set is unavailable for an extended period.
    // This class may be shared amongst many megaclients, so thread safety is needed.
    typedef map<string, m_time_t> Map;
    Map recentFails;
    std::mutex m_mutex;

    string server(const string& url)
    {
        size_t n = url.find("://");
        if (n != string::npos)
        {
            n += 3;
            size_t m = url.find("/", n);
            if (m != string::npos)
            {
                return url.substr(n, m - n);
            }
        }
        return "";
    }
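
    // For illustration (hypothetical URL): server("https://gfs270n123.userstorage.mega.co.nz/dl/abc")
    // returns "gfs270n123.userstorage.mega.co.nz", i.e. just the host part between "://" and the next '/'.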

    void add(const string& url)
    {
        std::lock_guard<std::mutex> g(m_mutex);
        recentFails[server(url)] = m_time();
    }

    /**
     * @brief Select the worst server based on records of recent failures
     * @param urls The set of URLs to check against previously failing servers
     * @return The index from 0 to 5, or 6 (RAIDPARTS) if none of the URLs have failed recently.
     */
    unsigned selectWorstServer(vector<string> urls)
    {
        // start with 6 connections and drop the slowest to respond, build the file from the other 5.
        // (unless we recently had problems with the server of one of the 6 URLs, in which case start with the other 5 right away)
        unsigned worstindex = RAIDPARTS;

        std::lock_guard<std::mutex> g(m_mutex);
        if (!recentFails.empty())
        {
            m_time_t now = m_time();
            m_time_t worsttime = now - 10 * 3600;   // 10 hours
            for (auto i = unsigned(urls.size()); i--; )
            {
                Map::iterator j = recentFails.find(server(urls[i]));
                if (j != recentFails.end() && j->second > worsttime)
                {
                    // select URL that failed less than 10 hours ago
                    worstindex = i;
                    worsttime = j->second;
                }
            }

            // cleanup recentFails from URLs older than 10 hours
            bool cleanup = false;
            Map::iterator jj;
            for (Map::iterator j = recentFails.begin(); j != recentFails.end(); cleanup ? (jj = j, ++j, (void)recentFails.erase(jj)) : (void)++j)
            {
                cleanup = j->second < (now - 10 * 3600);
            }
        }

        return worstindex;
    }

};

FaultyServers g_faultyServers;


RaidBufferManager::FilePiece::FilePiece()
    : pos(0)
    , buf(NULL, 0, 0)
{
}

RaidBufferManager::FilePiece::FilePiece(m_off_t p, size_t len)
    : pos(p)
    , buf(new byte[len + std::min<size_t>(SymmCipher::BLOCKSIZE, RAIDSECTOR)], 0, len)   // SymmCipher::ctr_crypt requirement: decryption: data must be padded to BLOCKSIZE.  Also make sure we can xor up to RAIDSECTOR more for convenience
{
}


RaidBufferManager::FilePiece::FilePiece(m_off_t p, HttpReq::http_buf_t* b) // taking ownership
    : pos(p)
    , buf(NULL, 0, 0)
{
    buf.swap(*b);  // take its buffer and copy other members
    delete b;  // client no longer owns it so we must delete.  Similar to move semantics where we would just assign
}

void RaidBufferManager::FilePiece::swap(FilePiece& other)
{
    m_off_t tp = pos; pos = other.pos; other.pos = tp;
    chunkmacs.swap(other.chunkmacs);
    buf.swap(other.buf);
}

RaidBufferManager::RaidBufferManager()
    : is_raid(false)
    , raidKnown(false)
    , raidLinesPerChunk(16 * 1024)
    , unusedRaidConnection(0)
    , raidpartspos(0)
    , outputfilepos(0)
    , startfilepos(0)
    , resumewastedbytes(0)
{
    for (int i = RAIDPARTS; i--; )
    {
        raidrequestpartpos[i] = 0;
        connectionPaused[i] = false;
        raidHttpGetErrorCount[i] = 0;
        connectionStarted[i] = false;
    }
}

static void clearOwningFilePieces(std::deque<RaidBufferManager::FilePiece*>& q)
{
    for (std::deque<RaidBufferManager::FilePiece*>::iterator i = q.begin(); i != q.end(); ++i)
    {
        delete *i;
    }
    q.clear();
}

RaidBufferManager::~RaidBufferManager()
{
    for (int i = RAIDPARTS; i--; )
    {
        clearOwningFilePieces(raidinputparts[i]);
    }
}

void RaidBufferManager::setIsRaid(const std::vector<std::string>& tempUrls, m_off_t resumepos, m_off_t readtopos, m_off_t filesize, m_off_t maxRequestSize)
{
    assert(tempUrls.size() == RAIDPARTS || tempUrls.size() == 1);
    assert(0 <= resumepos && resumepos <= readtopos && readtopos <= filesize);
    assert(!raidKnown);

    tempurls = tempUrls;

    is_raid = tempurls.size() == RAIDPARTS;
    raidKnown = true;
    fullfilesize = filesize;
    deliverlimitpos = readtopos;
    acquirelimitpos = deliverlimitpos + RAIDLINE - 1;
    acquirelimitpos -= acquirelimitpos % RAIDLINE;
    acquirelimitpos = std::min<m_off_t>(acquirelimitpos, fullfilesize);
    outputfilepos = resumepos;
    startfilepos = resumepos;
    if (is_raid)
    {
        raidpartspos = resumepos / (RAIDPARTS - 1);
        raidpartspos -= raidpartspos % RAIDSECTOR;
        resumewastedbytes = size_t(outputfilepos - raidpartspos * (RAIDPARTS - 1));
        outputfilepos -= resumewastedbytes;  // we'll skip over these bytes on the first output
        for (int i = RAIDPARTS; i--; )
        {
            raidrequestpartpos[i] = raidpartspos;
        }

        // How much buffer space can we use.  Assuming two chunk sets incoming, one outgoing
        raidLinesPerChunk = unsigned(maxRequestSize / (RAIDPARTS * 3 * RAIDSECTOR));
        raidLinesPerChunk -= raidLinesPerChunk % 1024;
        raidLinesPerChunk = std::min<unsigned>(raidLinesPerChunk, 64 * 1024);
        raidLinesPerChunk = std::max<unsigned>(raidLinesPerChunk, 8 * 1024);

        unusedRaidConnection = g_faultyServers.selectWorstServer(tempurls);
    }

    DEBUG_TEST_HOOK_RAIDBUFFERMANAGER_SETISRAID(this)
}
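
// A rough worked example of the resume alignment above (assuming the usual raid.h values
// RAIDPARTS = 6 and RAIDSECTOR = 16, so one raid line carries 5 * 16 = 80 data bytes):
// resuming at resumepos = 1000 gives raidpartspos = 1000 / 5 = 200, rounded down to 192 (a
// multiple of RAIDSECTOR), so each part restarts at offset 192 and the first
// 1000 - 192 * 5 = 40 reassembled bytes are counted in resumewastedbytes and skipped on output.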

void RaidBufferManager::updateUrlsAndResetPos(const std::vector<std::string>& tempUrls)
{
    // A request to restart from wherever we got to, with new URLs.
    // The old requested-to pos is not valid anymore, as one or more HTTP requests failed or were abandoned
    assert(tempurls.size() == tempUrls.size());
    if (tempurls.size() == tempUrls.size())
    {
        tempurls = tempUrls;
        if (isRaid())
        {
            for (unsigned i = RAIDPARTS; i--; )
            {
                std::deque<FilePiece*>& connectionpieces = raidinputparts[i];
                transferPos(i) = connectionpieces.empty() ? raidpartspos : connectionpieces.back()->pos + connectionpieces.back()->buf.datalen();
            }
        }
        else
        {
            transferPos(0) = outputfilepos;  // if there is any data waiting in asyncoutputbuffers this value is already ahead of it
        }
    }
}

bool RaidBufferManager::isRaid() const
{
    assert(raidKnown);
    return is_raid;
}

const std::string& RaidBufferManager::tempURL(unsigned connectionNum)
{
    if (isRaid())
    {
        assert(connectionNum < tempurls.size());
        return tempurls[connectionNum];
    }
    else if (!tempurls.empty())
    {
        return tempurls[0];
    }
    else
    {
        assert(false); // this class shouldn't be used until we have the URLs, but don't crash
        return emptyReturnString;
    }
}

const std::vector<std::string>& RaidBufferManager::tempUrlVector() const
{
    return tempurls;
}

// takes ownership of the buffer
void RaidBufferManager::submitBuffer(unsigned connectionNum, FilePiece* piece)
{
    if (isRaid())
    {
        assert(connectionNum < RAIDPARTS);
        assert(piece->buf.datalen() % RAIDSECTOR == 0 || piece->pos + m_off_t(piece->buf.datalen()) == raidPartSize(connectionNum, acquirelimitpos));
        if (!piece->buf.isNull())
        {
            raidHttpGetErrorCount[connectionNum] = 0;
        }

        std::deque<FilePiece*>& connectionpieces = raidinputparts[connectionNum];
        m_off_t contiguouspos = connectionpieces.empty() ? raidpartspos : connectionpieces.back()->pos + connectionpieces.back()->buf.datalen();

        assert(piece->pos == contiguouspos);
        if (piece->pos == contiguouspos)
        {
            transferPos(connectionNum) = piece->pos + piece->buf.datalen();  // in case of download piece arriving after connection failure recovery
            raidinputparts[connectionNum].push_back(piece);
        }
    }
    else
    {
        finalize(*piece);
        assert(asyncoutputbuffers.find(connectionNum) == asyncoutputbuffers.end() || !asyncoutputbuffers[connectionNum]);
        asyncoutputbuffers[connectionNum].reset(piece);
    }
}

std::shared_ptr<RaidBufferManager::FilePiece> RaidBufferManager::getAsyncOutputBufferPointer(unsigned connectionNum)
{
    auto i = asyncoutputbuffers.find(connectionNum);
    if (isRaid() && (i == asyncoutputbuffers.end() || !i->second))
    {
        combineRaidParts(connectionNum);
        i = asyncoutputbuffers.find(connectionNum);
    }
    return (i == asyncoutputbuffers.end()) ? NULL : i->second;
}


void RaidBufferManager::bufferWriteCompleted(unsigned connectionNum, bool success)
{
    auto aob = asyncoutputbuffers.find(connectionNum);
    if (aob != asyncoutputbuffers.end())
    {
        assert(aob->second);
        if (aob->second)
        {
            if (success)
            {
                bufferWriteCompletedAction(*aob->second);
            }

            aob->second.reset();
        }
    }
}

void RaidBufferManager::bufferWriteCompletedAction(FilePiece&)
{
    // overridden for Transfers
}

m_off_t& RaidBufferManager::transferPos(unsigned connectionNum)
{
    assert(isRaid());
    return raidrequestpartpos[connectionNum];
}


std::pair<m_off_t, m_off_t> RaidBufferManager::nextNPosForConnection(unsigned connectionNum, bool& newInputBufferSupplied, bool& pauseConnectionForRaid)
{
    // returning a pair for clarity - specifying the beginning and end position of the next data block, as the 'current pos' may be updated during this function
    newInputBufferSupplied = false;
    pauseConnectionForRaid = false;

    if (!isRaid())
    {
        return std::make_pair(transferPos(connectionNum), deliverlimitpos);  // simple case for non-raid direct streaming, get the entire portion of the file requested in one http get
    }
    else  // raid
    {
        m_off_t curpos = transferPos(connectionNum);  // if we use submitBuffer, transferPos() may be updated to protect against single connection failure recovery
        m_off_t maxpos = transferSize(connectionNum);

        // if this connection gets too far ahead of the others, pause it until the others catch up a bit
        if ((curpos >= raidpartspos + RaidReadAheadChunksPausePoint * raidLinesPerChunk * RAIDSECTOR) ||
            (curpos > raidpartspos + RaidReadAheadChunksUnpausePoint * raidLinesPerChunk * RAIDSECTOR && connectionPaused[connectionNum]))
        {
            connectionPaused[connectionNum] = true;
            pauseConnectionForRaid = true;
            return std::make_pair(curpos, curpos);
        }
        else
        {
            connectionPaused[connectionNum] = false;
        }

        m_off_t npos = std::min<m_off_t>(curpos + raidLinesPerChunk * RAIDSECTOR * RaidMaxChunksPerRead, maxpos);
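        // For the deliberately unused (skipped) connection no HTTP request is issued at all: a
        // FilePiece wrapping a null http_buf_t of the right length stands in for its data, and
        // combineRaidParts() later reconstructs that part from the XOR parity of the other five.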
        if (unusedRaidConnection == connectionNum && npos > curpos)
        {
            submitBuffer(connectionNum, new RaidBufferManager::FilePiece(curpos, new HttpReq::http_buf_t(NULL, 0, size_t(npos - curpos))));
            transferPos(connectionNum) = npos;
            newInputBufferSupplied = true;
        }
        return std::make_pair(curpos, std::min<m_off_t>(npos, maxpos));
    }
}

void RaidBufferManager::resetPart(unsigned connectionNum)
{
    assert(isRaid());
    transferPos(connectionNum) = raidpartspos;

    // if we are downloading many files at once, eg. initial sync, or large manual folder, it's better to just use 5 connections immediately after the first
    g_faultyServers.add(tempurls[connectionNum]);
}


m_off_t RaidBufferManager::transferSize(unsigned connectionNum)
{
    if (isRaid())
    {
        return raidPartSize(connectionNum, acquirelimitpos);
    }
    else
    {
        return fullfilesize;
    }
}

m_off_t RaidBufferManager::raidPartSize(unsigned part, m_off_t filesize)
{
    // compute the size of this raid part based on the original file size len
    m_off_t r = filesize % RAIDLINE;   // residual part

    m_off_t t = r - (part - !!part)*RAIDSECTOR; // parts 0 (parity) & 1 (largest data) are the same size

    // (excess length will be found in the following sectors,
    // negative length in the preceding sectors)
    if (t < 0)
    {
        t = 0;
    }
    else if (t > RAIDSECTOR)
    {
        t = RAIDSECTOR;
    }

    return (filesize - r) / (RAIDPARTS - 1) + t;
}
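
// Worked example (assuming the usual raid.h values RAIDSECTOR = 16, RAIDPARTS = 6, RAIDLINE = 80):
// for filesize = 200 the residue r is 200 % 80 = 40 and the per-part base is (200 - 40) / 5 = 32;
// the residue is spread as 16/16/8/0/0 bytes over data parts 1..5, so raidPartSize() returns
// 48, 48, 40, 32, 32 for parts 1..5 and 48 for the parity part 0 (always the same size as part 1,
// the largest data part).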


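// Reassembly sketch (assuming the usual raid.h geometry): each raid line of the output file is built
// by taking one RAIDSECTOR-sized sector from each of the data parts 1..5 in order; part 0 carries the
// XOR parity of those five sectors and is only consulted when one data part is missing.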
void RaidBufferManager::combineRaidParts(unsigned connectionNum)
{
    assert(asyncoutputbuffers.find(connectionNum) == asyncoutputbuffers.end() || !asyncoutputbuffers[connectionNum]);
    assert(raidpartspos * (RAIDPARTS - 1) == outputfilepos + m_off_t(leftoverchunk.buf.datalen()));

    size_t partslen = 0x10000000, sumdatalen = 0, xorlen = 0;
    for (unsigned i = RAIDPARTS; i--; )
    {
        if (raidinputparts[i].empty())
        {
            partslen = 0;
        }
        else
        {
            FilePiece& r = *raidinputparts[i].front();
            assert(r.pos == raidpartspos);  // check all are in sync at the front
            partslen = std::min<size_t>(partslen, r.buf.datalen());
            (i > 0 ? sumdatalen : xorlen) += r.buf.datalen();
        }
    }
    partslen -= partslen % RAIDSECTOR; // restrict to raidline boundary

    // for correct mac processing, we need to process the output file in pieces delimited by the chunkfloor / chunkceil algorithm
    m_off_t newdatafilepos = outputfilepos + leftoverchunk.buf.datalen();
    assert(newdatafilepos + m_off_t(sumdatalen) <= acquirelimitpos);
    bool processToEnd =  (newdatafilepos + m_off_t(sumdatalen) == acquirelimitpos)   // data to the end
              &&  (newdatafilepos / (RAIDPARTS - 1) + m_off_t(xorlen) == raidPartSize(0, acquirelimitpos));  // parity to the end

    assert(!partslen || !processToEnd || sumdatalen - partslen * (RAIDPARTS - 1) <= RAIDLINE);

    if (partslen > 0 || processToEnd)
    {
        m_off_t macchunkpos = calcOutputChunkPos(newdatafilepos + partslen * (RAIDPARTS - 1));

        size_t buflen = static_cast<size_t>(processToEnd ? sumdatalen : partslen * (RAIDPARTS - 1));
        FilePiece* outputrec = combineRaidParts(partslen, buflen, outputfilepos, leftoverchunk);  // includes a bit of extra space for non-full sectors if we are at the end of the file
        rollInputBuffers(partslen);
        raidpartspos += partslen;
        sumdatalen -= partslen * (RAIDPARTS - 1);
        outputfilepos += partslen * (RAIDPARTS - 1) + leftoverchunk.buf.datalen();
        byte* dest = outputrec->buf.datastart() + partslen * (RAIDPARTS - 1) + leftoverchunk.buf.datalen();
        FilePiece emptyFilePiece;
        leftoverchunk.swap(emptyFilePiece);  // this data is entirely included in the outputrec now, so discard and reset

        if (processToEnd && sumdatalen > 0)
        {
            // fill in the last of the buffer with non-full sectors from the end of the file
            assert(outputfilepos + m_off_t(sumdatalen) == acquirelimitpos);
            combineLastRaidLine(dest, sumdatalen);
            rollInputBuffers(RAIDSECTOR);
        }
        else if (!processToEnd && outputfilepos > macchunkpos)
        {
            // for transfers we do mac processing which must be done in chunks, delimited by chunkfloor and chunkceil.  If we don't have the right amount then hold the remainder over for next time.
            size_t excessdata = static_cast<size_t>(outputfilepos - macchunkpos);
            FilePiece newleftover(outputfilepos - excessdata, excessdata);
            leftoverchunk.swap(newleftover);
            memcpy(leftoverchunk.buf.datastart(), outputrec->buf.datastart() + outputrec->buf.datalen() - excessdata, excessdata);
            outputrec->buf.end -= excessdata;
            outputfilepos -= excessdata;
            assert(raidpartspos * (RAIDPARTS - 1) == outputfilepos + m_off_t(leftoverchunk.buf.datalen()));
        }

        // discard any excess data that we had to fetch when resuming a file (to align the parts appropriately)
        size_t n = std::min<size_t>(outputrec->buf.datalen(), resumewastedbytes);
        if (n > 0)
        {
            outputrec->pos += n;
            outputrec->buf.start += n;
            resumewastedbytes -= n;
        }

        // don't deliver any excess data that we needed for parity calculations in the last raid line
        if (outputrec->pos + m_off_t(outputrec->buf.datalen()) > deliverlimitpos)
        {
            size_t excess = size_t(outputrec->pos + outputrec->buf.datalen() - deliverlimitpos);
            excess = std::min<size_t>(excess, outputrec->buf.datalen());
            outputrec->buf.end -= excess;
        }

        // store the result in a place that can be read out async
        if (outputrec->buf.datalen() > 0)
        {
            finalize(*outputrec);
            asyncoutputbuffers[connectionNum].reset(outputrec);
        }
        else
        {
            delete outputrec;  // this would happen if we got some data to process on all connections, but not enough to reach the next chunk boundary yet (and combined data is in leftoverchunk)
        }
    }
}

RaidBufferManager::FilePiece* RaidBufferManager::combineRaidParts(size_t partslen, size_t bufflen, m_off_t filepos, FilePiece& prevleftoverchunk)
{
    assert(prevleftoverchunk.buf.datalen() == 0 || prevleftoverchunk.pos == filepos);

    // add a bit of extra space and copy prev chunk to the front
    FilePiece* result = new FilePiece(filepos, bufflen + prevleftoverchunk.buf.datalen());
    if (prevleftoverchunk.buf.datalen() > 0)
    {
        memcpy(result->buf.datastart(), prevleftoverchunk.buf.datastart(), prevleftoverchunk.buf.datalen());
    }

    // usual case, for simple and fast processing: all input buffers are the same size, and aligned, and a multiple of raidsector
    if (partslen > 0)
    {
        byte* inputbufs[RAIDPARTS];
        for (unsigned i = RAIDPARTS; i--; )
        {
            FilePiece* inputPiece = raidinputparts[i].front();
            inputbufs[i] = inputPiece->buf.isNull() ? NULL : inputPiece->buf.datastart();
        }

        byte* b = result->buf.datastart() + prevleftoverchunk.buf.datalen();
        byte* endpos = b + partslen * (RAIDPARTS-1);

        for (unsigned i = 0; b < endpos; i += RAIDSECTOR)
        {
            for (unsigned j = 1; j < RAIDPARTS; ++j)
            {
                assert(b + RAIDSECTOR <= result->buf.datastart() + result->buf.datalen());
                if (inputbufs[j])
                {
                    memcpy(b, inputbufs[j] + i, RAIDSECTOR);
                }
                else
                {
                    recoverSectorFromParity(b, inputbufs, i);
                }
                b += RAIDSECTOR;
            }
        }
        assert(b == endpos);
    }
    return result;
}

void RaidBufferManager::recoverSectorFromParity(byte* dest, byte* inputbufs[], unsigned offset)
{
    assert(sizeof(m_off_t)*2 == RAIDSECTOR);
    bool set = false;
    for (unsigned i = RAIDPARTS; i--; )
    {
        if (inputbufs[i])
        {
            if (!set)
            {
                memcpy(dest, inputbufs[i] + offset, RAIDSECTOR);
                set = true;
            }
            else
            {
                *(m_off_t*)dest ^= *(m_off_t*)(inputbufs[i] + offset);
                *(m_off_t*)(dest + sizeof(m_off_t)) ^= *(m_off_t*)(inputbufs[i] + offset + sizeof(m_off_t));
            }
        }
    }
}
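
// The recovery above relies on the XOR parity relation (a sketch, not tied to specific sector
// values): parity = d1 ^ d2 ^ d3 ^ d4 ^ d5, so any single missing sector equals the XOR of the
// parity sector with the four data sectors that are present, processed here as two m_off_t-sized
// words per RAIDSECTOR.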

void RaidBufferManager::combineLastRaidLine(byte* dest, size_t remainingbytes)
{
    // we have to be careful to use the right number of bytes from each sector
    for (unsigned i = 1; i < RAIDPARTS && remainingbytes > 0; ++i)
    {
        if (!raidinputparts[i].empty())
        {
            FilePiece* sector = raidinputparts[i].front();
            size_t n = std::min(remainingbytes, sector->buf.datalen());
            if (!sector->buf.isNull())
            {
                memcpy(dest, sector->buf.datastart(), n);
            }
            else
            {
                memset(dest, 0, n);
                for (unsigned j = RAIDPARTS; j--; )
                {
                    if (!raidinputparts[j].empty() && !raidinputparts[j].front()->buf.isNull())
                    {
                        FilePiece* xs = raidinputparts[j].front();
                        for (size_t x = std::min(n, xs->buf.datalen()); x--; )
                        {
                            dest[x] ^= xs->buf.datastart()[x];
                        }
                    }
                }
            }
            dest += n;
            remainingbytes -= n;
        }
    }
}

void RaidBufferManager::rollInputBuffers(size_t dataToDiscard)
{
    // remove finished input buffers
    for (unsigned i = RAIDPARTS; i--; )
    {
        if (!raidinputparts[i].empty())
        {
            FilePiece& ip = *raidinputparts[i].front();
            ip.buf.start += dataToDiscard;
            ip.pos += dataToDiscard;
            if (ip.buf.start >= ip.buf.end)
            {
                delete raidinputparts[i].front();
                raidinputparts[i].pop_front();
            }
        }
    }
}

m_off_t TransferBufferManager::calcOutputChunkPos(m_off_t acquiredpos)
{
    return ChunkedHash::chunkfloor(acquiredpos);  // we can only mac to the chunk boundary, hold the rest over
}

// decrypt, mac downloaded chunk
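// A note on how this is driven (derived from the code below): finalize() is first called with
// parallel == false, which decrypts the leading/trailing partial chunks serially (later parts of a
// chunk need the MAC state of earlier parts) and returns true if whole chunks remain; those whole
// chunks can then be handled by a second call with parallel == true, typically on a worker thread.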
bool RaidBufferManager::FilePiece::finalize(bool parallel, m_off_t filesize, int64_t ctriv, SymmCipher *cipher, chunkmac_map* source_chunkmacs)
{
    assert(!finalized);
    bool queueParallel = false;

    byte *chunkstart = buf.datastart();
    m_off_t startpos = pos;
    m_off_t finalpos = startpos + buf.datalen();
    assert(finalpos <= filesize);
    if (finalpos != filesize)
    {
        finalpos &= -SymmCipher::BLOCKSIZE;
    }

    m_off_t endpos = ChunkedHash::chunkceil(startpos, finalpos);
    unsigned chunksize = static_cast<unsigned>(endpos - startpos);

    while (chunksize)
    {
        m_off_t chunkid = ChunkedHash::chunkfloor(startpos);
        ChunkMAC &chunkmac = chunkmacs[chunkid];
        if (!chunkmac.finished)
        {
            if (source_chunkmacs)
            {
                chunkmac = (*source_chunkmacs)[chunkid];
            }
            if (endpos == ChunkedHash::chunkceil(chunkid, filesize))
            {
                if (parallel)
                {
                    // these parts can be done on a thread - they are independent chunks, or the earlier part of the chunk is already done.
                    cipher->ctr_crypt(chunkstart, chunksize, startpos, ctriv, chunkmac.mac, false, !chunkmac.finished && !chunkmac.offset);
                    LOG_debug << "Finished chunk: " << startpos << " - " << endpos << "   Size: " << chunksize;
                    chunkmac.finished = true;
                    chunkmac.offset = 0;
                }
                else
                {
                    queueParallel = true;
                }
            }
            else if (!parallel)
            {
                // these part chunks must be done serially (and first), since later parts of a chunk need the mac of earlier parts as input.
                cipher->ctr_crypt(chunkstart, chunksize, startpos, ctriv, chunkmac.mac, false, !chunkmac.finished && !chunkmac.offset);
                LOG_debug << "Decrypted partial chunk: " << startpos << " - " << endpos << "   Size: " << chunksize;
                chunkmac.finished = false;
                chunkmac.offset += chunksize;
            }
        }
        chunkstart += chunksize;
        startpos = endpos;
        endpos = ChunkedHash::chunkceil(startpos, finalpos);
        chunksize = static_cast<unsigned>(endpos - startpos);
    }

    finalized = !queueParallel;
    if (finalized)
        finalizedCV.notify_one();

    return queueParallel;
}

void TransferBufferManager::finalize(FilePiece& r)
{
    // for transfers (as opposed to DirectRead), decrypt/mac is now done on threads
}


bool RaidBufferManager::tryRaidHttpGetErrorRecovery(unsigned errorConnectionNum)
{
    assert(isRaid());

    raidHttpGetErrorCount[errorConnectionNum] += 1;

    g_faultyServers.add(tempurls[errorConnectionNum]);

    unsigned errorSum = 0;
    unsigned highestErrors = 0;
    for (unsigned i = RAIDPARTS; i--; )
    {
        errorSum += raidHttpGetErrorCount[i];
        highestErrors = std::max<unsigned>(highestErrors, raidHttpGetErrorCount[i]);
    }

    // Allow for one nonfunctional channel and one glitchy channel.  We can still make progress swapping back and forth
    if ((errorSum - highestErrors) < RAID_ACTIVE_CHANNEL_FAIL_THRESHOLD)
    {
        if (unusedRaidConnection < RAIDPARTS)
        {
            LOG_warn << "5 connection cloudraid shutting down connection " << errorConnectionNum << " due to error, and starting " << unusedRaidConnection << " instead";

            // start up the old unused connection, and cancel this one.  Other connections all have real data since we were already in 5 connection mode
            clearOwningFilePieces(raidinputparts[unusedRaidConnection]);
            clearOwningFilePieces(raidinputparts[errorConnectionNum]);
            raidrequestpartpos[unusedRaidConnection] = raidpartspos;
            raidrequestpartpos[errorConnectionNum] = raidpartspos;
        }
        else
        {
            LOG_warn << "6 connection cloudraid shutting down connection " << errorConnectionNum << " due to error";
            clearOwningFilePieces(raidinputparts[errorConnectionNum]);
            raidrequestpartpos[errorConnectionNum] = raidpartspos;
        }

        unusedRaidConnection = errorConnectionNum;
        return true;
    }
    else
    {
        return false;
    }
}

bool RaidBufferManager::connectionRaidPeersAreAllPaused(unsigned slowConnection)
{
    if (!isRaid())
    {
        return false;
    }

    // see if one connection is stalled or running much slower than the others, in which case try the other 5 instead
    // (if already using 5 connections and all of them are paused, except slowConnection, the unusedRaidConnection will
    // be started again and the slowConnection will become the new unusedRaidConnection)
    for (unsigned j = RAIDPARTS; j--; )
    {
        if (j != slowConnection && j != unusedRaidConnection && !connectionPaused[j])
        {
            return false;
        }
    }
    return true;
}

bool RaidBufferManager::detectSlowestRaidConnection(unsigned thisConnection, unsigned& slowestConnection)
{
    if (isRaid() && unusedRaidConnection == RAIDPARTS)
    {
        connectionStarted[thisConnection] = true;
        int count = 0;
        for (unsigned j = RAIDPARTS; j--; )
        {
            if (!connectionStarted[j])
            {
                slowestConnection = j;
                ++count;
            }
        }
        if (count == 1)
        {
            unusedRaidConnection = slowestConnection;
            raidrequestpartpos[unusedRaidConnection] = raidpartspos;
            return true;
        }
    }
    return false;
}


m_off_t RaidBufferManager::progress() const
{
    assert(isRaid());
    m_off_t reportPos = 0;

    for (unsigned j = RAIDPARTS; j--; )
    {
        for (FilePiece* p : raidinputparts[j])
        {
            if (!p->buf.isNull())
            {
                reportPos += p->buf.datalen();
            }
        }
    }

    return reportPos;
}


TransferBufferManager::TransferBufferManager()
    : transfer(NULL)
{
}

void TransferBufferManager::setIsRaid(Transfer* t, std::vector<std::string>& tempUrls, m_off_t resumepos, m_off_t maxRequestSize)
{
    RaidBufferManager::setIsRaid(tempUrls, resumepos, t->size, t->size, maxRequestSize);

    transfer = t;
}

m_off_t& TransferBufferManager::transferPos(unsigned connectionNum)
{
    return isRaid() ? RaidBufferManager::transferPos(connectionNum) : transfer->pos;
}

std::pair<m_off_t, m_off_t> TransferBufferManager::nextNPosForConnection(unsigned connectionNum, m_off_t maxRequestSize, unsigned connectionCount, bool& newInputBufferSupplied, bool& pauseConnectionForRaid, m_off_t uploadSpeed)
{
    // returning a pair for clarity - specifying the beginning and end position of the next data block, as the 'current pos' may be updated during this function
    newInputBufferSupplied = false;
    pauseConnectionForRaid = false;

    if (isRaid())
    {
        return RaidBufferManager::nextNPosForConnection(connectionNum, newInputBufferSupplied, pauseConnectionForRaid);
    }
    else
    {
        transfer->pos = transfer->chunkmacs.nextUnprocessedPosFrom(transfer->pos);
        m_off_t npos = ChunkedHash::chunkceil(transfer->pos, transfer->size);
        if (!transfer->size)
        {
            transfer->pos = 0;
        }

        if (transfer->type == PUT)
        {
            if (transfer->pos < 1024 * 1024)
            {
                npos = ChunkedHash::chunkceil(npos, transfer->size);
            }

            // choose upload chunks that are big enough to saturate the connection, so we don't start HTTP PUT requests too frequently
            // make them smaller at the end of the file so we still have the last parts delivered in parallel
            m_off_t maxsize = 32 * 1024 * 1024;
            if (npos + 2 * maxsize > transfer->size) maxsize /= 2;
            if (npos + maxsize > transfer->size) maxsize /= 2;
            if (npos + maxsize > transfer->size) maxsize /= 2;
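            // e.g. (illustrative numbers only): with ~20 MB of file left the tests above leave maxsize
            // at 16 MB, while with ~6 MB left it drops to 4 MB, so several smaller PUTs can still run
            // in parallel near the end of the upload.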
            m_off_t speedsize = std::min<m_off_t>(maxsize, uploadSpeed * 2 / 3);    // two seconds of data over 3 connections
            m_off_t sizesize = transfer->size > 32 * 1024 * 1024 ? 8 * 1024 * 1024 : 0;  // start with large-ish portions for large files.
            m_off_t targetsize = std::max<m_off_t>(sizesize, speedsize);

            while (npos < transfer->pos + targetsize && npos < transfer->size)
            {
                npos = ChunkedHash::chunkceil(npos, transfer->size);
            }
        }

        if (transfer->type == GET && transfer->size && npos > transfer->pos)
        {
            m_off_t maxReqSize = (transfer->size - transfer->progresscompleted) / connectionCount / 2;
            if (maxReqSize > maxRequestSize)
            {
                maxReqSize = maxRequestSize;
            }

            if (maxReqSize > 0x100000)
            {
                m_off_t val = 0x100000;
                while (val <= maxReqSize)
                {
                    val <<= 1;
                }
                maxReqSize = val >> 1;
                maxReqSize -= 0x100000;
            }
            else
            {
                maxReqSize = 0;
            }
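
            // An informal summary of the rounding above: maxReqSize is cut to the largest power-of-two
            // multiple of 1 MB that fits, minus a further 1 MB (e.g. a 5 MB budget becomes 4 MB, then
            // 3 MB); budgets of 1 MB or less are zeroed.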

            npos = transfer->chunkmacs.expandUnprocessedPiece(transfer->pos, npos, transfer->size, maxReqSize);
            LOG_debug << "Downloading chunk of size " << npos - transfer->pos;
            assert(npos > transfer->pos);
        }
        return std::make_pair(transfer->pos, npos);
    }
}

void TransferBufferManager::bufferWriteCompletedAction(FilePiece& r)
{
    for (chunkmac_map::iterator it = r.chunkmacs.begin(); it != r.chunkmacs.end(); it++)
    {
        transfer->chunkmacs[it->first] = it->second;
    }

    r.chunkmacs.clear();
    transfer->progresscompleted += r.buf.datalen();
    LOG_debug << "Cached data at: " << r.pos << "   Size: " << r.buf.datalen();
}


DirectReadBufferManager::DirectReadBufferManager(DirectRead* dr)
{
    directRead = dr;
}

m_off_t& DirectReadBufferManager::transferPos(unsigned connectionNum)
{
    return isRaid() ? RaidBufferManager::transferPos(connectionNum) : directRead->nextrequestpos;
}

m_off_t DirectReadBufferManager::calcOutputChunkPos(m_off_t acquiredpos)
{
    return acquiredpos;  // give all the data straight away for streaming, no need to hold any over for mac boundaries
}

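// CTR decryption operates on whole SymmCipher::BLOCKSIZE blocks, but a streamed piece may start
// mid-block. The code below handles that by copying the leading partial block into a temporary
// aligned buffer, decrypting it from the preceding block boundary, copying it back, and then
// decrypting the remaining bytes in place; the buffer carries spare room at the end so the final
// partial block can also be decrypted at full block size.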
void DirectReadBufferManager::finalize(FilePiece& fp)
{
    int r, l, t;

    // decrypt, pass to app and erase
    r = fp.pos & (SymmCipher::BLOCKSIZE - 1);
    t = int(fp.buf.datalen());

    if (r)
    {
        byte buf[SymmCipher::BLOCKSIZE];
        l = sizeof buf - r;

        if (l > t)
        {
            l = t;
        }

        memcpy(buf + r, fp.buf.datastart(), l);
        directRead->drn->symmcipher.ctr_crypt(buf, sizeof buf, fp.pos - r, directRead->drn->ctriv, NULL, false);
        memcpy(fp.buf.datastart(), buf + r, l);
    }
    else
    {
        l = 0;
    }

    if (t > l)
    {
        // the buffer has some extra at the end to allow full blocksize decrypt at the end
        directRead->drn->symmcipher.ctr_crypt(fp.buf.datastart() + l, t - l, fp.pos + l, directRead->drn->ctriv, NULL, false);
    }
}

}; // namespace