1 /* <!-- copyright */
2 /*
3  * aria2 - The high speed download utility
4  *
5  * Copyright (C) 2006 Tatsuhiro Tsujikawa
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  *
21  * In addition, as a special exception, the copyright holders give
22  * permission to link the code of portions of this program with the
23  * OpenSSL library under certain conditions as described in each
24  * individual source file, and distribute linked combinations
25  * including the two.
26  * You must obey the GNU General Public License in all respects
27  * for all of the code used other than OpenSSL.  If you modify
28  * file(s) with this exception, you may extend this exception to your
29  * version of the file(s), but you are not obligated to do so.  If you
30  * do not wish to do so, delete this exception statement from your
31  * version.  If you delete this exception statement from all source
32  * files in the program, then also delete it here.
33  */
34 /* copyright --> */
35 #include "RequestGroupMan.h"
36 
37 #include <unistd.h>
38 #include <cstring>
39 #include <iomanip>
40 #include <sstream>
41 #include <numeric>
42 #include <algorithm>
43 #include <utility>
44 
45 #include "BtProgressInfoFile.h"
46 #include "RecoverableException.h"
47 #include "RequestGroup.h"
48 #include "LogFactory.h"
49 #include "Logger.h"
50 #include "DownloadEngine.h"
51 #include "message.h"
52 #include "a2functional.h"
53 #include "DownloadResult.h"
54 #include "DownloadContext.h"
55 #include "ServerStatMan.h"
56 #include "ServerStat.h"
57 #include "SegmentMan.h"
58 #include "FeedbackURISelector.h"
59 #include "InorderURISelector.h"
60 #include "AdaptiveURISelector.h"
61 #include "Option.h"
62 #include "prefs.h"
63 #include "File.h"
64 #include "util.h"
65 #include "Command.h"
66 #include "FileEntry.h"
67 #include "fmt.h"
68 #include "FileAllocationEntry.h"
69 #include "CheckIntegrityEntry.h"
70 #include "Segment.h"
71 #include "DlAbortEx.h"
72 #include "uri.h"
73 #include "Signature.h"
74 #include "OutputFile.h"
75 #include "download_helper.h"
76 #include "UriListParser.h"
77 #include "SingletonHolder.h"
78 #include "Notifier.h"
79 #include "PeerStat.h"
80 #include "WrDiskCache.h"
81 #include "PieceStorage.h"
82 #include "DiskAdaptor.h"
83 #include "SimpleRandomizer.h"
84 #include "array_fun.h"
85 #include "OpenedFileCounter.h"
86 #include "wallclock.h"
87 #include "RpcMethodImpl.h"
88 #ifdef ENABLE_BITTORRENT
89 #  include "bittorrent_helper.h"
90 #endif // ENABLE_BITTORRENT
91 
92 namespace aria2 {
93 
94 namespace {
95 template <typename InputIterator>
appendReservedGroup(RequestGroupList & list,InputIterator first,InputIterator last)96 void appendReservedGroup(RequestGroupList& list, InputIterator first,
97                          InputIterator last)
98 {
99   for (; first != last; ++first) {
100     list.push_back((*first)->getGID(), *first);
101   }
102 }
103 } // namespace
104 
RequestGroupMan(std::vector<std::shared_ptr<RequestGroup>> requestGroups,int maxConcurrentDownloads,const Option * option)105 RequestGroupMan::RequestGroupMan(
106     std::vector<std::shared_ptr<RequestGroup>> requestGroups,
107     int maxConcurrentDownloads, const Option* option)
108     : maxConcurrentDownloads_(maxConcurrentDownloads),
109       optimizeConcurrentDownloads_(false),
110       optimizeConcurrentDownloadsCoeffA_(5.),
111       optimizeConcurrentDownloadsCoeffB_(25.),
112       optimizationSpeed_(0),
113       numActive_(0),
114       option_(option),
115       serverStatMan_(std::make_shared<ServerStatMan>()),
116       maxOverallDownloadSpeedLimit_(
117           option->getAsInt(PREF_MAX_OVERALL_DOWNLOAD_LIMIT)),
118       maxOverallUploadSpeedLimit_(
119           option->getAsInt(PREF_MAX_OVERALL_UPLOAD_LIMIT)),
120       keepRunning_(option->getAsBool(PREF_ENABLE_RPC)),
121       queueCheck_(true),
122       removedErrorResult_(0),
123       removedLastErrorResult_(error_code::FINISHED),
124       maxDownloadResult_(option->getAsInt(PREF_MAX_DOWNLOAD_RESULT)),
125       openedFileCounter_(std::make_shared<OpenedFileCounter>(
126           this, option->getAsInt(PREF_BT_MAX_OPEN_FILES))),
127       numStoppedTotal_(0)
128 {
129   setupOptimizeConcurrentDownloads();
130   appendReservedGroup(reservedGroups_, requestGroups.begin(),
131                       requestGroups.end());
132 }
133 
~RequestGroupMan()134 RequestGroupMan::~RequestGroupMan() { openedFileCounter_->deactivate(); }
135 
setupOptimizeConcurrentDownloads(void)136 bool RequestGroupMan::setupOptimizeConcurrentDownloads(void)
137 {
138   optimizeConcurrentDownloads_ =
139       option_->getAsBool(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS);
140   if (optimizeConcurrentDownloads_) {
141     if (option_->defined(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA)) {
142       optimizeConcurrentDownloadsCoeffA_ = strtod(
143           option_->get(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA).c_str(),
144           nullptr);
145       optimizeConcurrentDownloadsCoeffB_ = strtod(
146           option_->get(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFB).c_str(),
147           nullptr);
148     }
149   }
150   return optimizeConcurrentDownloads_;
151 }
152 
downloadFinished()153 bool RequestGroupMan::downloadFinished()
154 {
155   if (keepRunning_) {
156     return false;
157   }
158   return requestGroups_.empty() && reservedGroups_.empty();
159 }
160 
addRequestGroup(const std::shared_ptr<RequestGroup> & group)161 void RequestGroupMan::addRequestGroup(
162     const std::shared_ptr<RequestGroup>& group)
163 {
164   ++numActive_;
165   requestGroups_.push_back(group->getGID(), group);
166 }
167 
addReservedGroup(const std::vector<std::shared_ptr<RequestGroup>> & groups)168 void RequestGroupMan::addReservedGroup(
169     const std::vector<std::shared_ptr<RequestGroup>>& groups)
170 {
171   requestQueueCheck();
172   appendReservedGroup(reservedGroups_, groups.begin(), groups.end());
173 }
174 
addReservedGroup(const std::shared_ptr<RequestGroup> & group)175 void RequestGroupMan::addReservedGroup(
176     const std::shared_ptr<RequestGroup>& group)
177 {
178   requestQueueCheck();
179   reservedGroups_.push_back(group->getGID(), group);
180 }
181 
182 namespace {
183 struct RequestGroupKeyFunc {
operator ()aria2::__anonbd76614a0211::RequestGroupKeyFunc184   a2_gid_t operator()(const std::shared_ptr<RequestGroup>& rg) const
185   {
186     return rg->getGID();
187   }
188 };
189 } // namespace
190 
insertReservedGroup(size_t pos,const std::vector<std::shared_ptr<RequestGroup>> & groups)191 void RequestGroupMan::insertReservedGroup(
192     size_t pos, const std::vector<std::shared_ptr<RequestGroup>>& groups)
193 {
194   requestQueueCheck();
195   pos = std::min(reservedGroups_.size(), pos);
196   reservedGroups_.insert(pos, RequestGroupKeyFunc(), groups.begin(),
197                          groups.end());
198 }
199 
insertReservedGroup(size_t pos,const std::shared_ptr<RequestGroup> & group)200 void RequestGroupMan::insertReservedGroup(
201     size_t pos, const std::shared_ptr<RequestGroup>& group)
202 {
203   requestQueueCheck();
204   pos = std::min(reservedGroups_.size(), pos);
205   reservedGroups_.insert(pos, group->getGID(), group);
206 }
207 
countRequestGroup() const208 size_t RequestGroupMan::countRequestGroup() const
209 {
210   return requestGroups_.size();
211 }
212 
findGroup(a2_gid_t gid) const213 std::shared_ptr<RequestGroup> RequestGroupMan::findGroup(a2_gid_t gid) const
214 {
215   std::shared_ptr<RequestGroup> rg = requestGroups_.get(gid);
216   if (!rg) {
217     rg = reservedGroups_.get(gid);
218   }
219   return rg;
220 }
221 
changeReservedGroupPosition(a2_gid_t gid,int pos,OffsetMode how)222 size_t RequestGroupMan::changeReservedGroupPosition(a2_gid_t gid, int pos,
223                                                     OffsetMode how)
224 {
225   ssize_t dest = reservedGroups_.move(gid, pos, how);
226   if (dest == -1) {
227     throw DL_ABORT_EX(fmt("GID#%s not found in the waiting queue.",
228                           GroupId::toHex(gid).c_str()));
229   }
230   else {
231     return dest;
232   }
233 }
234 
removeReservedGroup(a2_gid_t gid)235 bool RequestGroupMan::removeReservedGroup(a2_gid_t gid)
236 {
237   return reservedGroups_.remove(gid);
238 }
239 
240 namespace {
241 
notifyDownloadEvent(DownloadEvent event,const std::shared_ptr<RequestGroup> & group)242 void notifyDownloadEvent(DownloadEvent event,
243                          const std::shared_ptr<RequestGroup>& group)
244 {
245   // Check NULL to make unit test easier.
246   if (SingletonHolder<Notifier>::instance()) {
247     SingletonHolder<Notifier>::instance()->notifyDownloadEvent(event, group);
248   }
249 }
250 
251 } // namespace
252 
253 namespace {
254 
executeStopHook(const std::shared_ptr<RequestGroup> & group,const Option * option,error_code::Value result)255 void executeStopHook(const std::shared_ptr<RequestGroup>& group,
256                      const Option* option, error_code::Value result)
257 {
258   PrefPtr hookPref = nullptr;
259   if (!option->blank(PREF_ON_DOWNLOAD_STOP)) {
260     hookPref = PREF_ON_DOWNLOAD_STOP;
261   }
262   if (result == error_code::FINISHED) {
263     if (!option->blank(PREF_ON_DOWNLOAD_COMPLETE)) {
264       hookPref = PREF_ON_DOWNLOAD_COMPLETE;
265     }
266   }
267   else if (result != error_code::IN_PROGRESS && result != error_code::REMOVED) {
268     if (!option->blank(PREF_ON_DOWNLOAD_ERROR)) {
269       hookPref = PREF_ON_DOWNLOAD_ERROR;
270     }
271   }
272   if (hookPref) {
273     util::executeHookByOptName(group, option, hookPref);
274   }
275 
276   if (result == error_code::FINISHED) {
277     notifyDownloadEvent(EVENT_ON_DOWNLOAD_COMPLETE, group);
278   }
279   else if (result != error_code::IN_PROGRESS && result != error_code::REMOVED) {
280     notifyDownloadEvent(EVENT_ON_DOWNLOAD_ERROR, group);
281   }
282   else {
283     notifyDownloadEvent(EVENT_ON_DOWNLOAD_STOP, group);
284   }
285 }
286 
287 } // namespace
288 
289 namespace {
290 class ProcessStoppedRequestGroup {
291 private:
292   DownloadEngine* e_;
293   RequestGroupList& reservedGroups_;
294 
saveSignature(const std::shared_ptr<RequestGroup> & group)295   void saveSignature(const std::shared_ptr<RequestGroup>& group)
296   {
297     auto& sig = group->getDownloadContext()->getSignature();
298     if (sig && !sig->getBody().empty()) {
299       // filename of signature file is the path to download file followed by
300       // ".sig".
301       std::string signatureFile = group->getFirstFilePath() + ".sig";
302       if (sig->save(signatureFile)) {
303         A2_LOG_NOTICE(fmt(MSG_SIGNATURE_SAVED, signatureFile.c_str()));
304       }
305       else {
306         A2_LOG_NOTICE(fmt(MSG_SIGNATURE_NOT_SAVED, signatureFile.c_str()));
307       }
308     }
309   }
310 
311   // Collect statistics during download in PeerStats and update/register
312   // ServerStatMan
collectStat(const std::shared_ptr<RequestGroup> & group)313   void collectStat(const std::shared_ptr<RequestGroup>& group)
314   {
315     if (group->getSegmentMan()) {
316       bool singleConnection =
317           group->getSegmentMan()->getPeerStats().size() == 1;
318       const std::vector<std::shared_ptr<PeerStat>>& peerStats =
319           group->getSegmentMan()->getFastestPeerStats();
320       for (auto& stat : peerStats) {
321         if (stat->getHostname().empty() || stat->getProtocol().empty()) {
322           continue;
323         }
324         int speed = stat->getAvgDownloadSpeed();
325         if (speed == 0)
326           continue;
327 
328         std::shared_ptr<ServerStat> ss =
329             e_->getRequestGroupMan()->getOrCreateServerStat(
330                 stat->getHostname(), stat->getProtocol());
331         ss->increaseCounter();
332         ss->updateDownloadSpeed(speed);
333         if (singleConnection) {
334           ss->updateSingleConnectionAvgSpeed(speed);
335         }
336         else {
337           ss->updateMultiConnectionAvgSpeed(speed);
338         }
339       }
340     }
341   }
342 
343 public:
ProcessStoppedRequestGroup(DownloadEngine * e,RequestGroupList & reservedGroups)344   ProcessStoppedRequestGroup(DownloadEngine* e,
345                              RequestGroupList& reservedGroups)
346       : e_(e), reservedGroups_(reservedGroups)
347   {
348   }
349 
operator ()(const RequestGroupList::value_type & group)350   bool operator()(const RequestGroupList::value_type& group)
351   {
352     if (group->getNumCommand() == 0) {
353       collectStat(group);
354       const std::shared_ptr<DownloadContext>& dctx =
355           group->getDownloadContext();
356 
357       if (!group->isSeedOnlyEnabled()) {
358         e_->getRequestGroupMan()->decreaseNumActive();
359       }
360 
361       // DownloadContext::resetDownloadStopTime() is only called when
362       // download completed. If
363       // DownloadContext::getDownloadStopTime().isZero() is true, then
364       // there is a possibility that the download is error or
365       // in-progress and resetDownloadStopTime() is not called. So
366       // call it here.
367       if (dctx->getDownloadStopTime().isZero()) {
368         dctx->resetDownloadStopTime();
369       }
370       try {
371         group->closeFile();
372         if (group->isPauseRequested()) {
373           if (!group->isRestartRequested()) {
374             A2_LOG_NOTICE(fmt(_("Download GID#%s paused"),
375                               GroupId::toHex(group->getGID()).c_str()));
376           }
377           group->saveControlFile();
378         }
379         else if (group->downloadFinished() &&
380                  !group->getDownloadContext()->isChecksumVerificationNeeded()) {
381           group->applyLastModifiedTimeToLocalFiles();
382           group->reportDownloadFinished();
383           if (group->allDownloadFinished() &&
384               !group->getOption()->getAsBool(PREF_FORCE_SAVE)) {
385             group->removeControlFile();
386             saveSignature(group);
387           }
388           else {
389             group->saveControlFile();
390           }
391           std::vector<std::shared_ptr<RequestGroup>> nextGroups;
392           group->postDownloadProcessing(nextGroups);
393           if (!nextGroups.empty()) {
394             A2_LOG_DEBUG(fmt("Adding %lu RequestGroups as a result of"
395                              " PostDownloadHandler.",
396                              static_cast<unsigned long>(nextGroups.size())));
397             e_->getRequestGroupMan()->insertReservedGroup(0, nextGroups);
398           }
399 #ifdef ENABLE_BITTORRENT
400           // For in-memory download (e.g., Magnet URI), the
401           // FileEntry::getPath() does not return actual file path, so
402           // we don't remove it.
403           if (group->getOption()->getAsBool(PREF_BT_REMOVE_UNSELECTED_FILE) &&
404               !group->inMemoryDownload() && dctx->hasAttribute(CTX_ATTR_BT)) {
405             A2_LOG_INFO(fmt(MSG_REMOVING_UNSELECTED_FILE,
406                             GroupId::toHex(group->getGID()).c_str()));
407             const std::vector<std::shared_ptr<FileEntry>>& files =
408                 dctx->getFileEntries();
409             for (auto& file : files) {
410               if (!file->isRequested()) {
411                 if (File(file->getPath()).remove()) {
412                   A2_LOG_INFO(fmt(MSG_FILE_REMOVED, file->getPath().c_str()));
413                 }
414                 else {
415                   A2_LOG_INFO(
416                       fmt(MSG_FILE_COULD_NOT_REMOVED, file->getPath().c_str()));
417                 }
418               }
419             }
420           }
421 #endif // ENABLE_BITTORRENT
422         }
423         else {
424           A2_LOG_NOTICE(
425               fmt(_("Download GID#%s not complete: %s"),
426                   GroupId::toHex(group->getGID()).c_str(),
427                   group->getDownloadContext()->getBasePath().c_str()));
428           group->saveControlFile();
429         }
430       }
431       catch (RecoverableException& ex) {
432         A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, ex);
433       }
434       if (group->isPauseRequested()) {
435         group->setState(RequestGroup::STATE_WAITING);
436         reservedGroups_.push_front(group->getGID(), group);
437         group->releaseRuntimeResource(e_);
438         group->setForceHaltRequested(false);
439 
440         auto pendingOption = group->getPendingOption();
441         if (pendingOption) {
442           changeOption(group, *pendingOption, e_);
443         }
444 
445         if (group->isRestartRequested()) {
446           group->setPauseRequested(false);
447         }
448         else {
449           util::executeHookByOptName(group, e_->getOption(),
450                                      PREF_ON_DOWNLOAD_PAUSE);
451           notifyDownloadEvent(EVENT_ON_DOWNLOAD_PAUSE, group);
452         }
453         // TODO Should we have to prepend spend uris to remaining uris
454         // in case PREF_REUSE_URI is disabled?
455       }
456       else {
457         std::shared_ptr<DownloadResult> dr = group->createDownloadResult();
458         e_->getRequestGroupMan()->addDownloadResult(dr);
459         executeStopHook(group, e_->getOption(), dr->result);
460         group->releaseRuntimeResource(e_);
461       }
462 
463       group->setRestartRequested(false);
464       group->setPendingOption(nullptr);
465 
466       return true;
467     }
468     else {
469       return false;
470     }
471   }
472 };
473 } // namespace
474 
removeStoppedGroup(DownloadEngine * e)475 void RequestGroupMan::removeStoppedGroup(DownloadEngine* e)
476 {
477   size_t numPrev = requestGroups_.size();
478   requestGroups_.remove_if(ProcessStoppedRequestGroup(e, reservedGroups_));
479   size_t numRemoved = numPrev - requestGroups_.size();
480   if (numRemoved > 0) {
481     A2_LOG_DEBUG(fmt("%lu RequestGroup(s) deleted.",
482                      static_cast<unsigned long>(numRemoved)));
483   }
484 }
485 
configureRequestGroup(const std::shared_ptr<RequestGroup> & requestGroup) const486 void RequestGroupMan::configureRequestGroup(
487     const std::shared_ptr<RequestGroup>& requestGroup) const
488 {
489   const std::string& uriSelectorValue =
490       requestGroup->getOption()->get(PREF_URI_SELECTOR);
491   if (uriSelectorValue == V_FEEDBACK) {
492     requestGroup->setURISelector(
493         make_unique<FeedbackURISelector>(serverStatMan_));
494   }
495   else if (uriSelectorValue == V_INORDER) {
496     requestGroup->setURISelector(make_unique<InorderURISelector>());
497   }
498   else if (uriSelectorValue == V_ADAPTIVE) {
499     requestGroup->setURISelector(
500         make_unique<AdaptiveURISelector>(serverStatMan_, requestGroup.get()));
501   }
502 }
503 
504 namespace {
505 std::vector<std::unique_ptr<Command>>
createInitialCommand(const std::shared_ptr<RequestGroup> & requestGroup,DownloadEngine * e)506 createInitialCommand(const std::shared_ptr<RequestGroup>& requestGroup,
507                      DownloadEngine* e)
508 {
509   std::vector<std::unique_ptr<Command>> res;
510   requestGroup->createInitialCommand(res, e);
511   return res;
512 }
513 } // namespace
514 
fillRequestGroupFromReserver(DownloadEngine * e)515 void RequestGroupMan::fillRequestGroupFromReserver(DownloadEngine* e)
516 {
517   removeStoppedGroup(e);
518 
519   int maxConcurrentDownloads = optimizeConcurrentDownloads_
520                                    ? optimizeConcurrentDownloads()
521                                    : maxConcurrentDownloads_;
522 
523   if (static_cast<size_t>(maxConcurrentDownloads) <= numActive_) {
524     return;
525   }
526   int count = 0;
527   int num = maxConcurrentDownloads - numActive_;
528   std::vector<std::shared_ptr<RequestGroup>> pending;
529 
530   while (count < num && (uriListParser_ || !reservedGroups_.empty())) {
531     if (uriListParser_ && reservedGroups_.empty()) {
532       std::vector<std::shared_ptr<RequestGroup>> groups;
533       // May throw exception
534       bool ok = createRequestGroupFromUriListParser(groups, option_,
535                                                     uriListParser_.get());
536       if (ok) {
537         appendReservedGroup(reservedGroups_, groups.begin(), groups.end());
538       }
539       else {
540         uriListParser_.reset();
541         if (reservedGroups_.empty()) {
542           break;
543         }
544       }
545     }
546     std::shared_ptr<RequestGroup> groupToAdd = *reservedGroups_.begin();
547     reservedGroups_.pop_front();
548     if ((keepRunning_ && groupToAdd->isPauseRequested()) ||
549         !groupToAdd->isDependencyResolved()) {
550       pending.push_back(groupToAdd);
551       continue;
552     }
553     // Drop pieceStorage here because paused download holds its
554     // reference.
555     groupToAdd->dropPieceStorage();
556     configureRequestGroup(groupToAdd);
557     groupToAdd->setRequestGroupMan(this);
558     groupToAdd->setState(RequestGroup::STATE_ACTIVE);
559     ++numActive_;
560     requestGroups_.push_back(groupToAdd->getGID(), groupToAdd);
561     try {
562       auto res = createInitialCommand(groupToAdd, e);
563       ++count;
564       if (res.empty()) {
565         requestQueueCheck();
566       }
567       else {
568         e->addCommand(std::move(res));
569       }
570     }
571     catch (RecoverableException& ex) {
572       A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, ex);
573       A2_LOG_DEBUG("Deleting temporal commands.");
574       groupToAdd->setLastErrorCode(ex.getErrorCode(), ex.what());
575       // We add groupToAdd to e later in order to it is processed in
576       // removeStoppedGroup().
577       requestQueueCheck();
578     }
579 
580     util::executeHookByOptName(groupToAdd, e->getOption(),
581                                PREF_ON_DOWNLOAD_START);
582     notifyDownloadEvent(EVENT_ON_DOWNLOAD_START, groupToAdd);
583   }
584   if (!pending.empty()) {
585     reservedGroups_.insert(reservedGroups_.begin(), RequestGroupKeyFunc(),
586                            pending.begin(), pending.end());
587   }
588   if (count > 0) {
589     e->setNoWait(true);
590     e->setRefreshInterval(std::chrono::milliseconds(0));
591     A2_LOG_DEBUG(fmt("%d RequestGroup(s) added.", count));
592   }
593 }
594 
save()595 void RequestGroupMan::save()
596 {
597   for (auto& rg : requestGroups_) {
598     if (rg->allDownloadFinished() &&
599         !rg->getDownloadContext()->isChecksumVerificationNeeded() &&
600         !rg->getOption()->getAsBool(PREF_FORCE_SAVE)) {
601       rg->removeControlFile();
602     }
603     else {
604       try {
605         rg->saveControlFile();
606       }
607       catch (RecoverableException& e) {
608         A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, e);
609       }
610     }
611   }
612 }
613 
closeFile()614 void RequestGroupMan::closeFile()
615 {
616   for (auto& elem : requestGroups_) {
617     elem->closeFile();
618   }
619 }
620 
getDownloadStat() const621 RequestGroupMan::DownloadStat RequestGroupMan::getDownloadStat() const
622 {
623   int finished = 0;
624   int error = removedErrorResult_;
625   int inprogress = 0;
626   int removed = 0;
627   error_code::Value lastError = removedLastErrorResult_;
628   for (auto& dr : downloadResults_) {
629 
630     if (dr->belongsTo != 0) {
631       continue;
632     }
633     if (dr->result == error_code::FINISHED) {
634       ++finished;
635     }
636     else if (dr->result == error_code::IN_PROGRESS) {
637       ++inprogress;
638     }
639     else if (dr->result == error_code::REMOVED) {
640       ++removed;
641     }
642     else {
643       ++error;
644       lastError = dr->result;
645     }
646   }
647   return DownloadStat(error, inprogress, reservedGroups_.size(), lastError);
648 }
649 
// Display category of a download result in the result table.
enum DownloadResultStatus {
  A2_STATUS_OK,
  A2_STATUS_INPR,
  A2_STATUS_RM,
  A2_STATUS_ERR
};

namespace {
// Returns the label for |status|, padded with spaces to 4 visible
// characters. With |useColor| the label is wrapped in ANSI escape
// sequences; the padding stays outside of them. Unknown values yield
// an empty string.
const char* getStatusStr(DownloadResultStatus status, bool useColor)
{
  switch (status) {
  case A2_STATUS_OK:
    return useColor ? "\033[1;32mOK\033[0m  " : "OK  ";
  case A2_STATUS_INPR:
    return useColor ? "\033[1;34mINPR\033[0m" : "INPR";
  case A2_STATUS_RM:
    return useColor ? "\033[1mRM\033[0m  " : "RM  ";
  case A2_STATUS_ERR:
    return useColor ? "\033[1;31mERR\033[0m " : "ERR ";
  default:
    return "";
  }
}
} // namespace
695 
showDownloadResults(OutputFile & o,bool full) const696 void RequestGroupMan::showDownloadResults(OutputFile& o, bool full) const
697 {
698   int pathRowSize = 55;
699   // Download Results:
700   // idx|stat|path/length
701   // ===+====+=======================================================================
702   o.printf("\n%s"
703            "\ngid   |stat|avg speed  |",
704            _("Download Results:"));
705   if (full) {
706     o.write("  %|path/URI"
707             "\n======+====+===========+===+");
708     pathRowSize -= 4;
709   }
710   else {
711     o.write("path/URI"
712             "\n======+====+===========+");
713   }
714   std::string line(pathRowSize, '=');
715   o.printf("%s\n", line.c_str());
716   bool useColor = o.supportsColor() && option_->getAsBool(PREF_ENABLE_COLOR);
717   int ok = 0;
718   int err = 0;
719   int inpr = 0;
720   int rm = 0;
721   for (auto& dr : downloadResults_) {
722 
723     if (dr->belongsTo != 0) {
724       continue;
725     }
726     const char* status;
727     switch (dr->result) {
728     case error_code::FINISHED:
729       status = getStatusStr(A2_STATUS_OK, useColor);
730       ++ok;
731       break;
732     case error_code::IN_PROGRESS:
733       status = getStatusStr(A2_STATUS_INPR, useColor);
734       ++inpr;
735       break;
736     case error_code::REMOVED:
737       status = getStatusStr(A2_STATUS_RM, useColor);
738       ++rm;
739       break;
740     default:
741       status = getStatusStr(A2_STATUS_ERR, useColor);
742       ++err;
743     }
744     if (full) {
745       formatDownloadResultFull(o, status, dr);
746     }
747     else {
748       o.write(formatDownloadResult(status, dr).c_str());
749       o.write("\n");
750     }
751   }
752   if (ok > 0 || err > 0 || inpr > 0 || rm > 0) {
753     o.printf("\n%s\n", _("Status Legend:"));
754     if (ok > 0) {
755       o.write(_("(OK):download completed."));
756     }
757     if (err > 0) {
758       o.write(_("(ERR):error occurred."));
759     }
760     if (inpr > 0) {
761       o.write(_("(INPR):download in-progress."));
762     }
763     if (rm > 0) {
764       o.write(_("(RM):download removed."));
765     }
766     o.write("\n");
767   }
768 }
769 
770 namespace {
formatDownloadResultCommon(std::ostream & o,const char * status,const std::shared_ptr<DownloadResult> & downloadResult)771 void formatDownloadResultCommon(
772     std::ostream& o, const char* status,
773     const std::shared_ptr<DownloadResult>& downloadResult)
774 {
775   o << std::setw(3) << downloadResult->gid->toAbbrevHex() << "|" << std::setw(4)
776     << status << "|";
777   if (downloadResult->sessionTime.count() > 0) {
778     o << std::setw(8)
779       << util::abbrevSize(downloadResult->sessionDownloadLength * 1000 /
780                           downloadResult->sessionTime.count())
781       << "B/s";
782   }
783   else {
784     o << std::setw(11);
785     o << "n/a";
786   }
787   o << "|";
788 }
789 } // namespace
790 
formatDownloadResultFull(OutputFile & out,const char * status,const std::shared_ptr<DownloadResult> & downloadResult) const791 void RequestGroupMan::formatDownloadResultFull(
792     OutputFile& out, const char* status,
793     const std::shared_ptr<DownloadResult>& downloadResult) const
794 {
795   BitfieldMan bt(downloadResult->pieceLength, downloadResult->totalLength);
796   bt.setBitfield(
797       reinterpret_cast<const unsigned char*>(downloadResult->bitfield.data()),
798       downloadResult->bitfield.size());
799   bool head = true;
800   const std::vector<std::shared_ptr<FileEntry>>& fileEntries =
801       downloadResult->fileEntries;
802   for (auto& f : fileEntries) {
803     if (!f->isRequested()) {
804       continue;
805     }
806     std::stringstream o;
807     if (head) {
808       formatDownloadResultCommon(o, status, downloadResult);
809       head = false;
810     }
811     else {
812       o << "   |    |           |";
813     }
814     if (f->getLength() == 0 || downloadResult->bitfield.empty()) {
815       o << "  -|";
816     }
817     else {
818       int64_t completedLength =
819           bt.getOffsetCompletedLength(f->getOffset(), f->getLength());
820       o << std::setw(3) << 100 * completedLength / f->getLength() << "|";
821     }
822     writeFilePath(o, f, downloadResult->inMemoryDownload);
823     o << "\n";
824     out.write(o.str().c_str());
825   }
826   if (head) {
827     std::stringstream o;
828     formatDownloadResultCommon(o, status, downloadResult);
829     o << "  -|n/a\n";
830     out.write(o.str().c_str());
831   }
832 }
833 
formatDownloadResult(const char * status,const std::shared_ptr<DownloadResult> & downloadResult) const834 std::string RequestGroupMan::formatDownloadResult(
835     const char* status,
836     const std::shared_ptr<DownloadResult>& downloadResult) const
837 {
838   std::stringstream o;
839   formatDownloadResultCommon(o, status, downloadResult);
840   const std::vector<std::shared_ptr<FileEntry>>& fileEntries =
841       downloadResult->fileEntries;
842   writeFilePath(fileEntries.begin(), fileEntries.end(), o,
843                 downloadResult->inMemoryDownload);
844   return o.str();
845 }
846 
847 namespace {
// Returns true if the path of any file entry in [ffirst, flast) occurs
// in [sfirst, slast). The string range must be sorted in ascending
// order because it is searched by bisection.
template <typename StringInputIterator, typename FileEntryInputIterator>
bool sameFilePathExists(StringInputIterator sfirst, StringInputIterator slast,
                        FileEntryInputIterator ffirst,
                        FileEntryInputIterator flast)
{
  while (ffirst != flast) {
    const auto& path = (*ffirst)->getPath();
    auto pos = std::lower_bound(sfirst, slast, path);
    // lower_bound yields the first element not less than |path|; it is
    // a match when |path| is not less than that element either.
    if (pos != slast && !(path < *pos)) {
      return true;
    }
    ++ffirst;
  }
  return false;
}
860 } // namespace
861 
isSameFileBeingDownloaded(RequestGroup * requestGroup) const862 bool RequestGroupMan::isSameFileBeingDownloaded(
863     RequestGroup* requestGroup) const
864 {
865   // TODO it may be good to use dedicated method rather than use
866   // isPreLocalFileCheckEnabled
867   if (!requestGroup->isPreLocalFileCheckEnabled()) {
868     return false;
869   }
870   std::vector<std::string> files;
871   for (auto& rg : requestGroups_) {
872     if (rg.get() != requestGroup) {
873       const std::vector<std::shared_ptr<FileEntry>>& entries =
874           rg->getDownloadContext()->getFileEntries();
875       std::transform(entries.begin(), entries.end(), std::back_inserter(files),
876                      std::mem_fn(&FileEntry::getPath));
877     }
878   }
879   std::sort(files.begin(), files.end());
880   const std::vector<std::shared_ptr<FileEntry>>& entries =
881       requestGroup->getDownloadContext()->getFileEntries();
882   return sameFilePathExists(files.begin(), files.end(), entries.begin(),
883                             entries.end());
884 }
885 
halt()886 void RequestGroupMan::halt()
887 {
888   for (auto& elem : requestGroups_) {
889     elem->setHaltRequested(true);
890   }
891 }
892 
forceHalt()893 void RequestGroupMan::forceHalt()
894 {
895   for (auto& elem : requestGroups_) {
896     elem->setForceHaltRequested(true);
897   }
898 }
899 
calculateStat()900 TransferStat RequestGroupMan::calculateStat()
901 {
902   // TODO Currently, all time upload length is not set.
903   return netStat_.toTransferStat();
904 }
905 
906 std::shared_ptr<DownloadResult>
findDownloadResult(a2_gid_t gid) const907 RequestGroupMan::findDownloadResult(a2_gid_t gid) const
908 {
909   return downloadResults_.get(gid);
910 }
911 
removeDownloadResult(a2_gid_t gid)912 bool RequestGroupMan::removeDownloadResult(a2_gid_t gid)
913 {
914   return downloadResults_.remove(gid);
915 }
916 
addDownloadResult(const std::shared_ptr<DownloadResult> & dr)917 void RequestGroupMan::addDownloadResult(
918     const std::shared_ptr<DownloadResult>& dr)
919 {
920   ++numStoppedTotal_;
921   bool rv = downloadResults_.push_back(dr->gid->getNumericId(), dr);
922   assert(rv);
923   while (downloadResults_.size() > maxDownloadResult_) {
924     // Save last encountered error code so that we can report it
925     // later.
926     const auto& dr = downloadResults_[0];
927     if (dr->belongsTo == 0 && dr->result != error_code::FINISHED) {
928       removedLastErrorResult_ = dr->result;
929       ++removedErrorResult_;
930 
931       // Keep unfinished download result, so that we can save them by
932       // SessionSerializer.
933       if (option_->getAsBool(PREF_KEEP_UNFINISHED_DOWNLOAD_RESULT)) {
934         if (dr->result != error_code::REMOVED ||
935             dr->option->getAsBool(PREF_FORCE_SAVE)) {
936           unfinishedDownloadResults_.push_back(dr);
937         }
938       }
939     }
940     downloadResults_.pop_front();
941   }
942 }
943 
purgeDownloadResult()944 void RequestGroupMan::purgeDownloadResult() { downloadResults_.clear(); }
945 
946 std::shared_ptr<ServerStat>
findServerStat(const std::string & hostname,const std::string & protocol) const947 RequestGroupMan::findServerStat(const std::string& hostname,
948                                 const std::string& protocol) const
949 {
950   return serverStatMan_->find(hostname, protocol);
951 }
952 
953 std::shared_ptr<ServerStat>
getOrCreateServerStat(const std::string & hostname,const std::string & protocol)954 RequestGroupMan::getOrCreateServerStat(const std::string& hostname,
955                                        const std::string& protocol)
956 {
957   std::shared_ptr<ServerStat> ss = findServerStat(hostname, protocol);
958   if (!ss) {
959     ss = std::make_shared<ServerStat>(hostname, protocol);
960     addServerStat(ss);
961   }
962   return ss;
963 }
964 
addServerStat(const std::shared_ptr<ServerStat> & serverStat)965 bool RequestGroupMan::addServerStat(
966     const std::shared_ptr<ServerStat>& serverStat)
967 {
968   return serverStatMan_->add(serverStat);
969 }
970 
loadServerStat(const std::string & filename)971 bool RequestGroupMan::loadServerStat(const std::string& filename)
972 {
973   return serverStatMan_->load(filename);
974 }
975 
saveServerStat(const std::string & filename) const976 bool RequestGroupMan::saveServerStat(const std::string& filename) const
977 {
978   return serverStatMan_->save(filename);
979 }
980 
removeStaleServerStat(const std::chrono::seconds & timeout)981 void RequestGroupMan::removeStaleServerStat(const std::chrono::seconds& timeout)
982 {
983   serverStatMan_->removeStaleServerStat(timeout);
984 }
985 
doesOverallDownloadSpeedExceed()986 bool RequestGroupMan::doesOverallDownloadSpeedExceed()
987 {
988   return maxOverallDownloadSpeedLimit_ > 0 &&
989          maxOverallDownloadSpeedLimit_ < netStat_.calculateDownloadSpeed();
990 }
991 
doesOverallUploadSpeedExceed()992 bool RequestGroupMan::doesOverallUploadSpeedExceed()
993 {
994   return maxOverallUploadSpeedLimit_ > 0 &&
995          maxOverallUploadSpeedLimit_ < netStat_.calculateUploadSpeed();
996 }
997 
getUsedHosts(std::vector<std::pair<size_t,std::string>> & usedHosts)998 void RequestGroupMan::getUsedHosts(
999     std::vector<std::pair<size_t, std::string>>& usedHosts)
1000 {
1001   // vector of tuple which consists of use count, -download speed,
1002   // hostname. We want to sort by least used and faster download
1003   // speed. We use -download speed so that we can sort them using
1004   // operator<().
1005   std::vector<std::tuple<size_t, int, std::string>> tempHosts;
1006   for (const auto& rg : requestGroups_) {
1007     const auto& inFlightReqs =
1008         rg->getDownloadContext()->getFirstFileEntry()->getInFlightRequests();
1009     for (const auto& req : inFlightReqs) {
1010       uri_split_result us;
1011       if (uri_split(&us, req->getUri().c_str()) == 0) {
1012         std::string host =
1013             uri::getFieldString(us, USR_HOST, req->getUri().c_str());
1014         auto k = tempHosts.begin();
1015         auto eok = tempHosts.end();
1016         for (; k != eok; ++k) {
1017           if (std::get<2>(*k) == host) {
1018             ++std::get<0>(*k);
1019             break;
1020           }
1021         }
1022         if (k == eok) {
1023           std::string protocol =
1024               uri::getFieldString(us, USR_SCHEME, req->getUri().c_str());
1025           auto ss = findServerStat(host, protocol);
1026           int invDlSpeed = (ss && ss->isOK())
1027                                ? -(static_cast<int>(ss->getDownloadSpeed()))
1028                                : 0;
1029           tempHosts.emplace_back(1, invDlSpeed, host);
1030         }
1031       }
1032     }
1033   }
1034   std::sort(tempHosts.begin(), tempHosts.end());
1035   std::transform(tempHosts.begin(), tempHosts.end(),
1036                  std::back_inserter(usedHosts),
1037                  [](const std::tuple<size_t, int, std::string>& x) {
1038                    return std::make_pair(std::get<0>(x), std::get<2>(x));
1039                  });
1040 }
1041 
setUriListParser(const std::shared_ptr<UriListParser> & uriListParser)1042 void RequestGroupMan::setUriListParser(
1043     const std::shared_ptr<UriListParser>& uriListParser)
1044 {
1045   uriListParser_ = uriListParser;
1046 }
1047 
initWrDiskCache()1048 void RequestGroupMan::initWrDiskCache()
1049 {
1050   assert(!wrDiskCache_);
1051   size_t limit = option_->getAsInt(PREF_DISK_CACHE);
1052   if (limit > 0) {
1053     wrDiskCache_ = make_unique<WrDiskCache>(limit);
1054   }
1055 }
1056 
decreaseNumActive()1057 void RequestGroupMan::decreaseNumActive()
1058 {
1059   assert(numActive_ > 0);
1060   --numActive_;
1061 }
1062 
optimizeConcurrentDownloads()1063 int RequestGroupMan::optimizeConcurrentDownloads()
1064 {
1065   // gauge the current speed
1066   int currentSpeed = getNetStat().calculateDownloadSpeed();
1067 
1068   const auto& now = global::wallclock();
1069   if (currentSpeed >= optimizationSpeed_) {
1070     optimizationSpeed_ = currentSpeed;
1071     optimizationSpeedTimer_ = now;
1072   }
1073   else if (std::chrono::duration_cast<std::chrono::seconds>(
1074                optimizationSpeedTimer_.difference(now)) >= 5_s) {
1075     // we keep using the reference speed for minimum 5 seconds so reset the
1076     // timer
1077     optimizationSpeedTimer_ = now;
1078 
1079     // keep the reference speed as long as the speed tends to augment or to
1080     // maintain itself within 10%
1081     if (currentSpeed >= 1.1 * getNetStat().calculateNewestDownloadSpeed(5)) {
1082       // else assume a possible congestion and record a new optimization speed
1083       // by dichotomy
1084       optimizationSpeed_ = (optimizationSpeed_ + currentSpeed) / 2.;
1085     }
1086   }
1087 
1088   if (optimizationSpeed_ <= 0) {
1089     return optimizeConcurrentDownloadsCoeffA_;
1090   }
1091 
1092   // apply the rule
1093   if ((maxOverallDownloadSpeedLimit_ > 0) &&
1094       (optimizationSpeed_ > maxOverallDownloadSpeedLimit_)) {
1095     optimizationSpeed_ = maxOverallDownloadSpeedLimit_;
1096   }
1097   int maxConcurrentDownloads =
1098       ceil(optimizeConcurrentDownloadsCoeffA_ +
1099            optimizeConcurrentDownloadsCoeffB_ *
1100                log10(optimizationSpeed_ * 8. / 1000000.));
1101 
1102   // bring the value in bound between 1 and the defined maximum
1103   maxConcurrentDownloads =
1104       std::min(std::max(1, maxConcurrentDownloads), maxConcurrentDownloads_);
1105 
1106   A2_LOG_DEBUG(
1107       fmt("Max concurrent downloads optimized at %d (%lu currently active) "
1108           "[optimization speed %sB/s, current speed %sB/s]",
1109           maxConcurrentDownloads, static_cast<unsigned long>(numActive_),
1110           util::abbrevSize(optimizationSpeed_).c_str(),
1111           util::abbrevSize(currentSpeed).c_str()));
1112 
1113   return maxConcurrentDownloads;
1114 }
1115 } // namespace aria2
1116