1 /* <!-- copyright */
2 /*
3 * aria2 - The high speed download utility
4 *
5 * Copyright (C) 2006 Tatsuhiro Tsujikawa
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 *
21 * In addition, as a special exception, the copyright holders give
22 * permission to link the code of portions of this program with the
23 * OpenSSL library under certain conditions as described in each
24 * individual source file, and distribute linked combinations
25 * including the two.
26 * You must obey the GNU General Public License in all respects
27 * for all of the code used other than OpenSSL. If you modify
28 * file(s) with this exception, you may extend this exception to your
29 * version of the file(s), but you are not obligated to do so. If you
30 * do not wish to do so, delete this exception statement from your
31 * version. If you delete this exception statement from all source
32 * files in the program, then also delete it here.
33 */
34 /* copyright --> */
35 #include "RequestGroupMan.h"
36
37 #include <unistd.h>
38 #include <cstring>
39 #include <iomanip>
40 #include <sstream>
41 #include <numeric>
42 #include <algorithm>
43 #include <utility>
44
45 #include "BtProgressInfoFile.h"
46 #include "RecoverableException.h"
47 #include "RequestGroup.h"
48 #include "LogFactory.h"
49 #include "Logger.h"
50 #include "DownloadEngine.h"
51 #include "message.h"
52 #include "a2functional.h"
53 #include "DownloadResult.h"
54 #include "DownloadContext.h"
55 #include "ServerStatMan.h"
56 #include "ServerStat.h"
57 #include "SegmentMan.h"
58 #include "FeedbackURISelector.h"
59 #include "InorderURISelector.h"
60 #include "AdaptiveURISelector.h"
61 #include "Option.h"
62 #include "prefs.h"
63 #include "File.h"
64 #include "util.h"
65 #include "Command.h"
66 #include "FileEntry.h"
67 #include "fmt.h"
68 #include "FileAllocationEntry.h"
69 #include "CheckIntegrityEntry.h"
70 #include "Segment.h"
71 #include "DlAbortEx.h"
72 #include "uri.h"
73 #include "Signature.h"
74 #include "OutputFile.h"
75 #include "download_helper.h"
76 #include "UriListParser.h"
77 #include "SingletonHolder.h"
78 #include "Notifier.h"
79 #include "PeerStat.h"
80 #include "WrDiskCache.h"
81 #include "PieceStorage.h"
82 #include "DiskAdaptor.h"
83 #include "SimpleRandomizer.h"
84 #include "array_fun.h"
85 #include "OpenedFileCounter.h"
86 #include "wallclock.h"
87 #include "RpcMethodImpl.h"
88 #ifdef ENABLE_BITTORRENT
89 # include "bittorrent_helper.h"
90 #endif // ENABLE_BITTORRENT
91
92 namespace aria2 {
93
94 namespace {
95 template <typename InputIterator>
appendReservedGroup(RequestGroupList & list,InputIterator first,InputIterator last)96 void appendReservedGroup(RequestGroupList& list, InputIterator first,
97 InputIterator last)
98 {
99 for (; first != last; ++first) {
100 list.push_back((*first)->getGID(), *first);
101 }
102 }
103 } // namespace
104
// Constructs the manager.
//
// |requestGroups| are appended, in order, to the waiting (reserved)
// queue; none of them is started here. |maxConcurrentDownloads| is
// stored as the concurrency limit. |option| is stored (not copied) and
// is also read immediately for the overall speed limits, the size of
// the download result history, the keep-running flag (taken from
// PREF_ENABLE_RPC) and the PREF_BT_MAX_OPEN_FILES value handed to the
// OpenedFileCounter.
RequestGroupMan::RequestGroupMan(
    std::vector<std::shared_ptr<RequestGroup>> requestGroups,
    int maxConcurrentDownloads, const Option* option)
    : maxConcurrentDownloads_(maxConcurrentDownloads),
      optimizeConcurrentDownloads_(false),
      // Default coefficients; may be overwritten by
      // setupOptimizeConcurrentDownloads() below.
      optimizeConcurrentDownloadsCoeffA_(5.),
      optimizeConcurrentDownloadsCoeffB_(25.),
      optimizationSpeed_(0),
      numActive_(0),
      option_(option),
      serverStatMan_(std::make_shared<ServerStatMan>()),
      maxOverallDownloadSpeedLimit_(
          option->getAsInt(PREF_MAX_OVERALL_DOWNLOAD_LIMIT)),
      maxOverallUploadSpeedLimit_(
          option->getAsInt(PREF_MAX_OVERALL_UPLOAD_LIMIT)),
      // While this flag is set, downloadFinished() never reports true.
      keepRunning_(option->getAsBool(PREF_ENABLE_RPC)),
      queueCheck_(true),
      removedErrorResult_(0),
      removedLastErrorResult_(error_code::FINISHED),
      maxDownloadResult_(option->getAsInt(PREF_MAX_DOWNLOAD_RESULT)),
      openedFileCounter_(std::make_shared<OpenedFileCounter>(
          this, option->getAsInt(PREF_BT_MAX_OPEN_FILES))),
      numStoppedTotal_(0)
{
  // Reads the optimize-concurrent-downloads settings from option_.
  setupOptimizeConcurrentDownloads();
  appendReservedGroup(reservedGroups_, requestGroups.begin(),
                      requestGroups.end());
}
133
~RequestGroupMan()134 RequestGroupMan::~RequestGroupMan() { openedFileCounter_->deactivate(); }
135
setupOptimizeConcurrentDownloads(void)136 bool RequestGroupMan::setupOptimizeConcurrentDownloads(void)
137 {
138 optimizeConcurrentDownloads_ =
139 option_->getAsBool(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS);
140 if (optimizeConcurrentDownloads_) {
141 if (option_->defined(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA)) {
142 optimizeConcurrentDownloadsCoeffA_ = strtod(
143 option_->get(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFA).c_str(),
144 nullptr);
145 optimizeConcurrentDownloadsCoeffB_ = strtod(
146 option_->get(PREF_OPTIMIZE_CONCURRENT_DOWNLOADS_COEFFB).c_str(),
147 nullptr);
148 }
149 }
150 return optimizeConcurrentDownloads_;
151 }
152
downloadFinished()153 bool RequestGroupMan::downloadFinished()
154 {
155 if (keepRunning_) {
156 return false;
157 }
158 return requestGroups_.empty() && reservedGroups_.empty();
159 }
160
addRequestGroup(const std::shared_ptr<RequestGroup> & group)161 void RequestGroupMan::addRequestGroup(
162 const std::shared_ptr<RequestGroup>& group)
163 {
164 ++numActive_;
165 requestGroups_.push_back(group->getGID(), group);
166 }
167
addReservedGroup(const std::vector<std::shared_ptr<RequestGroup>> & groups)168 void RequestGroupMan::addReservedGroup(
169 const std::vector<std::shared_ptr<RequestGroup>>& groups)
170 {
171 requestQueueCheck();
172 appendReservedGroup(reservedGroups_, groups.begin(), groups.end());
173 }
174
addReservedGroup(const std::shared_ptr<RequestGroup> & group)175 void RequestGroupMan::addReservedGroup(
176 const std::shared_ptr<RequestGroup>& group)
177 {
178 requestQueueCheck();
179 reservedGroups_.push_back(group->getGID(), group);
180 }
181
182 namespace {
183 struct RequestGroupKeyFunc {
operator ()aria2::__anonbd76614a0211::RequestGroupKeyFunc184 a2_gid_t operator()(const std::shared_ptr<RequestGroup>& rg) const
185 {
186 return rg->getGID();
187 }
188 };
189 } // namespace
190
insertReservedGroup(size_t pos,const std::vector<std::shared_ptr<RequestGroup>> & groups)191 void RequestGroupMan::insertReservedGroup(
192 size_t pos, const std::vector<std::shared_ptr<RequestGroup>>& groups)
193 {
194 requestQueueCheck();
195 pos = std::min(reservedGroups_.size(), pos);
196 reservedGroups_.insert(pos, RequestGroupKeyFunc(), groups.begin(),
197 groups.end());
198 }
199
insertReservedGroup(size_t pos,const std::shared_ptr<RequestGroup> & group)200 void RequestGroupMan::insertReservedGroup(
201 size_t pos, const std::shared_ptr<RequestGroup>& group)
202 {
203 requestQueueCheck();
204 pos = std::min(reservedGroups_.size(), pos);
205 reservedGroups_.insert(pos, group->getGID(), group);
206 }
207
countRequestGroup() const208 size_t RequestGroupMan::countRequestGroup() const
209 {
210 return requestGroups_.size();
211 }
212
findGroup(a2_gid_t gid) const213 std::shared_ptr<RequestGroup> RequestGroupMan::findGroup(a2_gid_t gid) const
214 {
215 std::shared_ptr<RequestGroup> rg = requestGroups_.get(gid);
216 if (!rg) {
217 rg = reservedGroups_.get(gid);
218 }
219 return rg;
220 }
221
changeReservedGroupPosition(a2_gid_t gid,int pos,OffsetMode how)222 size_t RequestGroupMan::changeReservedGroupPosition(a2_gid_t gid, int pos,
223 OffsetMode how)
224 {
225 ssize_t dest = reservedGroups_.move(gid, pos, how);
226 if (dest == -1) {
227 throw DL_ABORT_EX(fmt("GID#%s not found in the waiting queue.",
228 GroupId::toHex(gid).c_str()));
229 }
230 else {
231 return dest;
232 }
233 }
234
removeReservedGroup(a2_gid_t gid)235 bool RequestGroupMan::removeReservedGroup(a2_gid_t gid)
236 {
237 return reservedGroups_.remove(gid);
238 }
239
240 namespace {
241
notifyDownloadEvent(DownloadEvent event,const std::shared_ptr<RequestGroup> & group)242 void notifyDownloadEvent(DownloadEvent event,
243 const std::shared_ptr<RequestGroup>& group)
244 {
245 // Check NULL to make unit test easier.
246 if (SingletonHolder<Notifier>::instance()) {
247 SingletonHolder<Notifier>::instance()->notifyDownloadEvent(event, group);
248 }
249 }
250
251 } // namespace
252
253 namespace {
254
executeStopHook(const std::shared_ptr<RequestGroup> & group,const Option * option,error_code::Value result)255 void executeStopHook(const std::shared_ptr<RequestGroup>& group,
256 const Option* option, error_code::Value result)
257 {
258 PrefPtr hookPref = nullptr;
259 if (!option->blank(PREF_ON_DOWNLOAD_STOP)) {
260 hookPref = PREF_ON_DOWNLOAD_STOP;
261 }
262 if (result == error_code::FINISHED) {
263 if (!option->blank(PREF_ON_DOWNLOAD_COMPLETE)) {
264 hookPref = PREF_ON_DOWNLOAD_COMPLETE;
265 }
266 }
267 else if (result != error_code::IN_PROGRESS && result != error_code::REMOVED) {
268 if (!option->blank(PREF_ON_DOWNLOAD_ERROR)) {
269 hookPref = PREF_ON_DOWNLOAD_ERROR;
270 }
271 }
272 if (hookPref) {
273 util::executeHookByOptName(group, option, hookPref);
274 }
275
276 if (result == error_code::FINISHED) {
277 notifyDownloadEvent(EVENT_ON_DOWNLOAD_COMPLETE, group);
278 }
279 else if (result != error_code::IN_PROGRESS && result != error_code::REMOVED) {
280 notifyDownloadEvent(EVENT_ON_DOWNLOAD_ERROR, group);
281 }
282 else {
283 notifyDownloadEvent(EVENT_ON_DOWNLOAD_STOP, group);
284 }
285 }
286
287 } // namespace
288
namespace {
// Predicate used with RequestGroupList::remove_if() on the active list.
//
// For a group that has no commands left (getNumCommand() == 0) it
// performs all end-of-download processing (statistics, control file,
// post-download handlers, hooks/events) and returns true so that the
// group is removed from the active list. A paused group is put back at
// the front of the waiting queue; any other group is converted into a
// DownloadResult. Groups that still have commands are left untouched
// and false is returned.
class ProcessStoppedRequestGroup {
private:
  DownloadEngine* e_;
  // Waiting queue that paused groups are pushed back onto.
  RequestGroupList& reservedGroups_;

  // Writes the signature attached to the download context, if any and
  // non-empty, next to the first downloaded file and logs the outcome.
  void saveSignature(const std::shared_ptr<RequestGroup>& group)
  {
    auto& sig = group->getDownloadContext()->getSignature();
    if (sig && !sig->getBody().empty()) {
      // filename of signature file is the path to download file followed by
      // ".sig".
      std::string signatureFile = group->getFirstFilePath() + ".sig";
      if (sig->save(signatureFile)) {
        A2_LOG_NOTICE(fmt(MSG_SIGNATURE_SAVED, signatureFile.c_str()));
      }
      else {
        A2_LOG_NOTICE(fmt(MSG_SIGNATURE_NOT_SAVED, signatureFile.c_str()));
      }
    }
  }

  // Collect statistics during download in PeerStats and update/register
  // ServerStatMan
  void collectStat(const std::shared_ptr<RequestGroup>& group)
  {
    if (group->getSegmentMan()) {
      // Exactly one PeerStat means the download used a single
      // connection; this decides which average is updated below.
      bool singleConnection =
          group->getSegmentMan()->getPeerStats().size() == 1;
      const std::vector<std::shared_ptr<PeerStat>>& peerStats =
          group->getSegmentMan()->getFastestPeerStats();
      for (auto& stat : peerStats) {
        // Skip entries that cannot be mapped to a ServerStat key.
        if (stat->getHostname().empty() || stat->getProtocol().empty()) {
          continue;
        }
        int speed = stat->getAvgDownloadSpeed();
        // A zero average speed is not recorded.
        if (speed == 0)
          continue;

        std::shared_ptr<ServerStat> ss =
            e_->getRequestGroupMan()->getOrCreateServerStat(
                stat->getHostname(), stat->getProtocol());
        ss->increaseCounter();
        ss->updateDownloadSpeed(speed);
        if (singleConnection) {
          ss->updateSingleConnectionAvgSpeed(speed);
        }
        else {
          ss->updateMultiConnectionAvgSpeed(speed);
        }
      }
    }
  }

public:
  // |e| is the engine whose RequestGroupMan and options are used;
  // |reservedGroups| is held by reference.
  ProcessStoppedRequestGroup(DownloadEngine* e,
                             RequestGroupList& reservedGroups)
      : e_(e), reservedGroups_(reservedGroups)
  {
  }

  // Returns true if |group| has stopped and was processed (the caller
  // should drop it from the active list), false otherwise.
  bool operator()(const RequestGroupList::value_type& group)
  {
    if (group->getNumCommand() == 0) {
      collectStat(group);
      const std::shared_ptr<DownloadContext>& dctx =
          group->getDownloadContext();

      // Seed-only groups are not counted down here.
      // NOTE(review): presumably they were already removed from the
      // active count when seeding started -- confirm against
      // RequestGroup.
      if (!group->isSeedOnlyEnabled()) {
        e_->getRequestGroupMan()->decreaseNumActive();
      }

      // DownloadContext::resetDownloadStopTime() is only called when
      // download completed. If
      // DownloadContext::getDownloadStopTime().isZero() is true, then
      // there is a possibility that the download is error or
      // in-progress and resetDownloadStopTime() is not called. So
      // call it here.
      if (dctx->getDownloadStopTime().isZero()) {
        dctx->resetDownloadStopTime();
      }
      // RecoverableException raised while closing or saving is logged
      // and otherwise ignored, so the group is still processed below.
      try {
        group->closeFile();
        if (group->isPauseRequested()) {
          // A restart is handled as a pause internally; only a real
          // pause is reported to the user.
          if (!group->isRestartRequested()) {
            A2_LOG_NOTICE(fmt(_("Download GID#%s paused"),
                              GroupId::toHex(group->getGID()).c_str()));
          }
          group->saveControlFile();
        }
        else if (group->downloadFinished() &&
                 !group->getDownloadContext()->isChecksumVerificationNeeded()) {
          group->applyLastModifiedTimeToLocalFiles();
          group->reportDownloadFinished();
          // The control file is dropped only when everything is
          // finished and PREF_FORCE_SAVE is off.
          if (group->allDownloadFinished() &&
              !group->getOption()->getAsBool(PREF_FORCE_SAVE)) {
            group->removeControlFile();
            saveSignature(group);
          }
          else {
            group->saveControlFile();
          }
          // Follow-up downloads produced by post-download handlers go
          // to the front of the waiting queue.
          std::vector<std::shared_ptr<RequestGroup>> nextGroups;
          group->postDownloadProcessing(nextGroups);
          if (!nextGroups.empty()) {
            A2_LOG_DEBUG(fmt("Adding %lu RequestGroups as a result of"
                             " PostDownloadHandler.",
                             static_cast<unsigned long>(nextGroups.size())));
            e_->getRequestGroupMan()->insertReservedGroup(0, nextGroups);
          }
#ifdef ENABLE_BITTORRENT
          // For in-memory download (e.g., Magnet URI), the
          // FileEntry::getPath() does not return actual file path, so
          // we don't remove it.
          if (group->getOption()->getAsBool(PREF_BT_REMOVE_UNSELECTED_FILE) &&
              !group->inMemoryDownload() && dctx->hasAttribute(CTX_ATTR_BT)) {
            A2_LOG_INFO(fmt(MSG_REMOVING_UNSELECTED_FILE,
                            GroupId::toHex(group->getGID()).c_str()));
            const std::vector<std::shared_ptr<FileEntry>>& files =
                dctx->getFileEntries();
            for (auto& file : files) {
              // Only files the user did not request are deleted.
              if (!file->isRequested()) {
                if (File(file->getPath()).remove()) {
                  A2_LOG_INFO(fmt(MSG_FILE_REMOVED, file->getPath().c_str()));
                }
                else {
                  A2_LOG_INFO(
                      fmt(MSG_FILE_COULD_NOT_REMOVED, file->getPath().c_str()));
                }
              }
            }
          }
#endif // ENABLE_BITTORRENT
        }
        else {
          // Stopped without finishing: keep the control file.
          A2_LOG_NOTICE(
              fmt(_("Download GID#%s not complete: %s"),
                  GroupId::toHex(group->getGID()).c_str(),
                  group->getDownloadContext()->getBasePath().c_str()));
          group->saveControlFile();
        }
      }
      catch (RecoverableException& ex) {
        A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, ex);
      }
      if (group->isPauseRequested()) {
        // Paused (or restarting) group: back to the front of the
        // waiting queue instead of becoming a DownloadResult.
        group->setState(RequestGroup::STATE_WAITING);
        reservedGroups_.push_front(group->getGID(), group);
        group->releaseRuntimeResource(e_);
        group->setForceHaltRequested(false);

        // Apply the pending option set, if any, now that the group is
        // no longer active.
        auto pendingOption = group->getPendingOption();
        if (pendingOption) {
          changeOption(group, *pendingOption, e_);
        }

        if (group->isRestartRequested()) {
          // Clear the pause flag; no pause hook or event is emitted for
          // a restart.
          group->setPauseRequested(false);
        }
        else {
          util::executeHookByOptName(group, e_->getOption(),
                                     PREF_ON_DOWNLOAD_PAUSE);
          notifyDownloadEvent(EVENT_ON_DOWNLOAD_PAUSE, group);
        }
        // TODO Should we prepend spent URIs to the remaining URIs
        // in case PREF_REUSE_URI is disabled?
      }
      else {
        // The result is created before releaseRuntimeResource() is
        // called on the group.
        std::shared_ptr<DownloadResult> dr = group->createDownloadResult();
        e_->getRequestGroupMan()->addDownloadResult(dr);
        executeStopHook(group, e_->getOption(), dr->result);
        group->releaseRuntimeResource(e_);
      }

      // One-shot requests are consumed in either branch.
      group->setRestartRequested(false);
      group->setPendingOption(nullptr);

      return true;
    }
    else {
      return false;
    }
  }
};
} // namespace
474
removeStoppedGroup(DownloadEngine * e)475 void RequestGroupMan::removeStoppedGroup(DownloadEngine* e)
476 {
477 size_t numPrev = requestGroups_.size();
478 requestGroups_.remove_if(ProcessStoppedRequestGroup(e, reservedGroups_));
479 size_t numRemoved = numPrev - requestGroups_.size();
480 if (numRemoved > 0) {
481 A2_LOG_DEBUG(fmt("%lu RequestGroup(s) deleted.",
482 static_cast<unsigned long>(numRemoved)));
483 }
484 }
485
configureRequestGroup(const std::shared_ptr<RequestGroup> & requestGroup) const486 void RequestGroupMan::configureRequestGroup(
487 const std::shared_ptr<RequestGroup>& requestGroup) const
488 {
489 const std::string& uriSelectorValue =
490 requestGroup->getOption()->get(PREF_URI_SELECTOR);
491 if (uriSelectorValue == V_FEEDBACK) {
492 requestGroup->setURISelector(
493 make_unique<FeedbackURISelector>(serverStatMan_));
494 }
495 else if (uriSelectorValue == V_INORDER) {
496 requestGroup->setURISelector(make_unique<InorderURISelector>());
497 }
498 else if (uriSelectorValue == V_ADAPTIVE) {
499 requestGroup->setURISelector(
500 make_unique<AdaptiveURISelector>(serverStatMan_, requestGroup.get()));
501 }
502 }
503
504 namespace {
505 std::vector<std::unique_ptr<Command>>
createInitialCommand(const std::shared_ptr<RequestGroup> & requestGroup,DownloadEngine * e)506 createInitialCommand(const std::shared_ptr<RequestGroup>& requestGroup,
507 DownloadEngine* e)
508 {
509 std::vector<std::unique_ptr<Command>> res;
510 requestGroup->createInitialCommand(res, e);
511 return res;
512 }
513 } // namespace
514
// Reaps stopped downloads and then starts waiting downloads until the
// concurrency limit is reached.
//
// Groups are taken from the front of the waiting queue; when the queue
// is empty and a URI list parser is set, more groups are read from it.
// Groups that must not start yet (paused while keepRunning_ is set, or
// with an unresolved dependency) are set aside and re-inserted at the
// front of the waiting queue afterwards. Exceptions thrown by
// createRequestGroupFromUriListParser() are not caught here.
void RequestGroupMan::fillRequestGroupFromReserver(DownloadEngine* e)
{
  removeStoppedGroup(e);

  // Effective limit: the dynamically computed one when optimization is
  // enabled, the configured one otherwise.
  int maxConcurrentDownloads = optimizeConcurrentDownloads_
                                   ? optimizeConcurrentDownloads()
                                   : maxConcurrentDownloads_;

  // No free slot.
  if (static_cast<size_t>(maxConcurrentDownloads) <= numActive_) {
    return;
  }
  // count: groups started in this call; num: free slots.
  int count = 0;
  int num = maxConcurrentDownloads - numActive_;
  // Groups popped from the queue that cannot be started right now.
  std::vector<std::shared_ptr<RequestGroup>> pending;

  while (count < num && (uriListParser_ || !reservedGroups_.empty())) {
    if (uriListParser_ && reservedGroups_.empty()) {
      std::vector<std::shared_ptr<RequestGroup>> groups;
      // May throw exception
      bool ok = createRequestGroupFromUriListParser(groups, option_,
                                                    uriListParser_.get());
      if (ok) {
        appendReservedGroup(reservedGroups_, groups.begin(), groups.end());
      }
      else {
        // The parser produced nothing more: drop it, and stop if there
        // is also nothing queued.
        uriListParser_.reset();
        if (reservedGroups_.empty()) {
          break;
        }
      }
    }
    std::shared_ptr<RequestGroup> groupToAdd = *reservedGroups_.begin();
    reservedGroups_.pop_front();
    if ((keepRunning_ && groupToAdd->isPauseRequested()) ||
        !groupToAdd->isDependencyResolved()) {
      pending.push_back(groupToAdd);
      continue;
    }
    // Drop pieceStorage here because paused download holds its
    // reference.
    groupToAdd->dropPieceStorage();
    configureRequestGroup(groupToAdd);
    groupToAdd->setRequestGroupMan(this);
    groupToAdd->setState(RequestGroup::STATE_ACTIVE);
    // The group becomes active before its commands are created, so it
    // stays in requestGroups_ even if command creation throws below.
    ++numActive_;
    requestGroups_.push_back(groupToAdd->getGID(), groupToAdd);
    try {
      auto res = createInitialCommand(groupToAdd, e);
      ++count;
      if (res.empty()) {
        // No command was created: ask for another queue check.
        requestQueueCheck();
      }
      else {
        e->addCommand(std::move(res));
      }
    }
    catch (RecoverableException& ex) {
      A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, ex);
      A2_LOG_DEBUG("Deleting temporal commands.");
      groupToAdd->setLastErrorCode(ex.getErrorCode(), ex.what());
      // groupToAdd was already added to requestGroups_ above; it is
      // left there so that removeStoppedGroup() processes it on a
      // later pass. Note that count is not incremented on this path.
      requestQueueCheck();
    }

    // The start hook and event fire even when command creation failed.
    util::executeHookByOptName(groupToAdd, e->getOption(),
                               PREF_ON_DOWNLOAD_START);
    notifyDownloadEvent(EVENT_ON_DOWNLOAD_START, groupToAdd);
  }
  // Put the set-aside groups back at the front, in their original
  // relative order.
  if (!pending.empty()) {
    reservedGroups_.insert(reservedGroups_.begin(), RequestGroupKeyFunc(),
                           pending.begin(), pending.end());
  }
  if (count > 0) {
    e->setNoWait(true);
    e->setRefreshInterval(std::chrono::milliseconds(0));
    A2_LOG_DEBUG(fmt("%d RequestGroup(s) added.", count));
  }
}
594
save()595 void RequestGroupMan::save()
596 {
597 for (auto& rg : requestGroups_) {
598 if (rg->allDownloadFinished() &&
599 !rg->getDownloadContext()->isChecksumVerificationNeeded() &&
600 !rg->getOption()->getAsBool(PREF_FORCE_SAVE)) {
601 rg->removeControlFile();
602 }
603 else {
604 try {
605 rg->saveControlFile();
606 }
607 catch (RecoverableException& e) {
608 A2_LOG_ERROR_EX(EX_EXCEPTION_CAUGHT, e);
609 }
610 }
611 }
612 }
613
closeFile()614 void RequestGroupMan::closeFile()
615 {
616 for (auto& elem : requestGroups_) {
617 elem->closeFile();
618 }
619 }
620
getDownloadStat() const621 RequestGroupMan::DownloadStat RequestGroupMan::getDownloadStat() const
622 {
623 int finished = 0;
624 int error = removedErrorResult_;
625 int inprogress = 0;
626 int removed = 0;
627 error_code::Value lastError = removedLastErrorResult_;
628 for (auto& dr : downloadResults_) {
629
630 if (dr->belongsTo != 0) {
631 continue;
632 }
633 if (dr->result == error_code::FINISHED) {
634 ++finished;
635 }
636 else if (dr->result == error_code::IN_PROGRESS) {
637 ++inprogress;
638 }
639 else if (dr->result == error_code::REMOVED) {
640 ++removed;
641 }
642 else {
643 ++error;
644 lastError = dr->result;
645 }
646 }
647 return DownloadStat(error, inprogress, reservedGroups_.size(), lastError);
648 }
649
// Status categories shown in the "stat" column of the download result
// table.
enum DownloadResultStatus {
  A2_STATUS_OK,   // download completed
  A2_STATUS_INPR, // download in progress
  A2_STATUS_RM,   // download removed
  A2_STATUS_ERR   // error occurred
};

namespace {
// Returns the label for |status|, padded with trailing spaces to
// exactly 4 visible columns so that it fills the "stat" column. When
// |useColor| is true the label is wrapped in ANSI SGR escape sequences;
// the padding stays outside the visible text, so coloured and plain
// rows line up. An unknown |status| yields the empty string.
const char* getStatusStr(DownloadResultStatus status, bool useColor)
{
  // status string is formatted in 4 characters wide.
  switch (status) {
  case A2_STATUS_OK:
    return useColor ? "\033[1;32mOK\033[0m  " : "OK  ";
  case A2_STATUS_INPR:
    return useColor ? "\033[1;34mINPR\033[0m" : "INPR";
  case A2_STATUS_RM:
    return useColor ? "\033[1mRM\033[0m  " : "RM  ";
  case A2_STATUS_ERR:
    return useColor ? "\033[1;31mERR\033[0m " : "ERR ";
  default:
    return "";
  }
}
} // namespace
695
showDownloadResults(OutputFile & o,bool full) const696 void RequestGroupMan::showDownloadResults(OutputFile& o, bool full) const
697 {
698 int pathRowSize = 55;
699 // Download Results:
700 // idx|stat|path/length
701 // ===+====+=======================================================================
702 o.printf("\n%s"
703 "\ngid |stat|avg speed |",
704 _("Download Results:"));
705 if (full) {
706 o.write(" %|path/URI"
707 "\n======+====+===========+===+");
708 pathRowSize -= 4;
709 }
710 else {
711 o.write("path/URI"
712 "\n======+====+===========+");
713 }
714 std::string line(pathRowSize, '=');
715 o.printf("%s\n", line.c_str());
716 bool useColor = o.supportsColor() && option_->getAsBool(PREF_ENABLE_COLOR);
717 int ok = 0;
718 int err = 0;
719 int inpr = 0;
720 int rm = 0;
721 for (auto& dr : downloadResults_) {
722
723 if (dr->belongsTo != 0) {
724 continue;
725 }
726 const char* status;
727 switch (dr->result) {
728 case error_code::FINISHED:
729 status = getStatusStr(A2_STATUS_OK, useColor);
730 ++ok;
731 break;
732 case error_code::IN_PROGRESS:
733 status = getStatusStr(A2_STATUS_INPR, useColor);
734 ++inpr;
735 break;
736 case error_code::REMOVED:
737 status = getStatusStr(A2_STATUS_RM, useColor);
738 ++rm;
739 break;
740 default:
741 status = getStatusStr(A2_STATUS_ERR, useColor);
742 ++err;
743 }
744 if (full) {
745 formatDownloadResultFull(o, status, dr);
746 }
747 else {
748 o.write(formatDownloadResult(status, dr).c_str());
749 o.write("\n");
750 }
751 }
752 if (ok > 0 || err > 0 || inpr > 0 || rm > 0) {
753 o.printf("\n%s\n", _("Status Legend:"));
754 if (ok > 0) {
755 o.write(_("(OK):download completed."));
756 }
757 if (err > 0) {
758 o.write(_("(ERR):error occurred."));
759 }
760 if (inpr > 0) {
761 o.write(_("(INPR):download in-progress."));
762 }
763 if (rm > 0) {
764 o.write(_("(RM):download removed."));
765 }
766 o.write("\n");
767 }
768 }
769
770 namespace {
formatDownloadResultCommon(std::ostream & o,const char * status,const std::shared_ptr<DownloadResult> & downloadResult)771 void formatDownloadResultCommon(
772 std::ostream& o, const char* status,
773 const std::shared_ptr<DownloadResult>& downloadResult)
774 {
775 o << std::setw(3) << downloadResult->gid->toAbbrevHex() << "|" << std::setw(4)
776 << status << "|";
777 if (downloadResult->sessionTime.count() > 0) {
778 o << std::setw(8)
779 << util::abbrevSize(downloadResult->sessionDownloadLength * 1000 /
780 downloadResult->sessionTime.count())
781 << "B/s";
782 }
783 else {
784 o << std::setw(11);
785 o << "n/a";
786 }
787 o << "|";
788 }
789 } // namespace
790
formatDownloadResultFull(OutputFile & out,const char * status,const std::shared_ptr<DownloadResult> & downloadResult) const791 void RequestGroupMan::formatDownloadResultFull(
792 OutputFile& out, const char* status,
793 const std::shared_ptr<DownloadResult>& downloadResult) const
794 {
795 BitfieldMan bt(downloadResult->pieceLength, downloadResult->totalLength);
796 bt.setBitfield(
797 reinterpret_cast<const unsigned char*>(downloadResult->bitfield.data()),
798 downloadResult->bitfield.size());
799 bool head = true;
800 const std::vector<std::shared_ptr<FileEntry>>& fileEntries =
801 downloadResult->fileEntries;
802 for (auto& f : fileEntries) {
803 if (!f->isRequested()) {
804 continue;
805 }
806 std::stringstream o;
807 if (head) {
808 formatDownloadResultCommon(o, status, downloadResult);
809 head = false;
810 }
811 else {
812 o << " | | |";
813 }
814 if (f->getLength() == 0 || downloadResult->bitfield.empty()) {
815 o << " -|";
816 }
817 else {
818 int64_t completedLength =
819 bt.getOffsetCompletedLength(f->getOffset(), f->getLength());
820 o << std::setw(3) << 100 * completedLength / f->getLength() << "|";
821 }
822 writeFilePath(o, f, downloadResult->inMemoryDownload);
823 o << "\n";
824 out.write(o.str().c_str());
825 }
826 if (head) {
827 std::stringstream o;
828 formatDownloadResultCommon(o, status, downloadResult);
829 o << " -|n/a\n";
830 out.write(o.str().c_str());
831 }
832 }
833
formatDownloadResult(const char * status,const std::shared_ptr<DownloadResult> & downloadResult) const834 std::string RequestGroupMan::formatDownloadResult(
835 const char* status,
836 const std::shared_ptr<DownloadResult>& downloadResult) const
837 {
838 std::stringstream o;
839 formatDownloadResultCommon(o, status, downloadResult);
840 const std::vector<std::shared_ptr<FileEntry>>& fileEntries =
841 downloadResult->fileEntries;
842 writeFilePath(fileEntries.begin(), fileEntries.end(), o,
843 downloadResult->inMemoryDownload);
844 return o.str();
845 }
846
namespace {
// Returns true if the path of any file entry in [ffirst, flast) occurs
// in the string range [sfirst, slast). The string range is searched
// with std::binary_search, so it must be sorted.
template <typename StringInputIterator, typename FileEntryInputIterator>
bool sameFilePathExists(StringInputIterator sfirst, StringInputIterator slast,
                        FileEntryInputIterator ffirst,
                        FileEntryInputIterator flast)
{
  typedef decltype(*ffirst) EntryRef;
  return std::any_of(ffirst, flast, [sfirst, slast](EntryRef entry) {
    return std::binary_search(sfirst, slast, entry->getPath());
  });
}
} // namespace
861
isSameFileBeingDownloaded(RequestGroup * requestGroup) const862 bool RequestGroupMan::isSameFileBeingDownloaded(
863 RequestGroup* requestGroup) const
864 {
865 // TODO it may be good to use dedicated method rather than use
866 // isPreLocalFileCheckEnabled
867 if (!requestGroup->isPreLocalFileCheckEnabled()) {
868 return false;
869 }
870 std::vector<std::string> files;
871 for (auto& rg : requestGroups_) {
872 if (rg.get() != requestGroup) {
873 const std::vector<std::shared_ptr<FileEntry>>& entries =
874 rg->getDownloadContext()->getFileEntries();
875 std::transform(entries.begin(), entries.end(), std::back_inserter(files),
876 std::mem_fn(&FileEntry::getPath));
877 }
878 }
879 std::sort(files.begin(), files.end());
880 const std::vector<std::shared_ptr<FileEntry>>& entries =
881 requestGroup->getDownloadContext()->getFileEntries();
882 return sameFilePathExists(files.begin(), files.end(), entries.begin(),
883 entries.end());
884 }
885
halt()886 void RequestGroupMan::halt()
887 {
888 for (auto& elem : requestGroups_) {
889 elem->setHaltRequested(true);
890 }
891 }
892
forceHalt()893 void RequestGroupMan::forceHalt()
894 {
895 for (auto& elem : requestGroups_) {
896 elem->setForceHaltRequested(true);
897 }
898 }
899
calculateStat()900 TransferStat RequestGroupMan::calculateStat()
901 {
902 // TODO Currently, all time upload length is not set.
903 return netStat_.toTransferStat();
904 }
905
906 std::shared_ptr<DownloadResult>
findDownloadResult(a2_gid_t gid) const907 RequestGroupMan::findDownloadResult(a2_gid_t gid) const
908 {
909 return downloadResults_.get(gid);
910 }
911
removeDownloadResult(a2_gid_t gid)912 bool RequestGroupMan::removeDownloadResult(a2_gid_t gid)
913 {
914 return downloadResults_.remove(gid);
915 }
916
addDownloadResult(const std::shared_ptr<DownloadResult> & dr)917 void RequestGroupMan::addDownloadResult(
918 const std::shared_ptr<DownloadResult>& dr)
919 {
920 ++numStoppedTotal_;
921 bool rv = downloadResults_.push_back(dr->gid->getNumericId(), dr);
922 assert(rv);
923 while (downloadResults_.size() > maxDownloadResult_) {
924 // Save last encountered error code so that we can report it
925 // later.
926 const auto& dr = downloadResults_[0];
927 if (dr->belongsTo == 0 && dr->result != error_code::FINISHED) {
928 removedLastErrorResult_ = dr->result;
929 ++removedErrorResult_;
930
931 // Keep unfinished download result, so that we can save them by
932 // SessionSerializer.
933 if (option_->getAsBool(PREF_KEEP_UNFINISHED_DOWNLOAD_RESULT)) {
934 if (dr->result != error_code::REMOVED ||
935 dr->option->getAsBool(PREF_FORCE_SAVE)) {
936 unfinishedDownloadResults_.push_back(dr);
937 }
938 }
939 }
940 downloadResults_.pop_front();
941 }
942 }
943
purgeDownloadResult()944 void RequestGroupMan::purgeDownloadResult() { downloadResults_.clear(); }
945
946 std::shared_ptr<ServerStat>
findServerStat(const std::string & hostname,const std::string & protocol) const947 RequestGroupMan::findServerStat(const std::string& hostname,
948 const std::string& protocol) const
949 {
950 return serverStatMan_->find(hostname, protocol);
951 }
952
953 std::shared_ptr<ServerStat>
getOrCreateServerStat(const std::string & hostname,const std::string & protocol)954 RequestGroupMan::getOrCreateServerStat(const std::string& hostname,
955 const std::string& protocol)
956 {
957 std::shared_ptr<ServerStat> ss = findServerStat(hostname, protocol);
958 if (!ss) {
959 ss = std::make_shared<ServerStat>(hostname, protocol);
960 addServerStat(ss);
961 }
962 return ss;
963 }
964
addServerStat(const std::shared_ptr<ServerStat> & serverStat)965 bool RequestGroupMan::addServerStat(
966 const std::shared_ptr<ServerStat>& serverStat)
967 {
968 return serverStatMan_->add(serverStat);
969 }
970
loadServerStat(const std::string & filename)971 bool RequestGroupMan::loadServerStat(const std::string& filename)
972 {
973 return serverStatMan_->load(filename);
974 }
975
saveServerStat(const std::string & filename) const976 bool RequestGroupMan::saveServerStat(const std::string& filename) const
977 {
978 return serverStatMan_->save(filename);
979 }
980
removeStaleServerStat(const std::chrono::seconds & timeout)981 void RequestGroupMan::removeStaleServerStat(const std::chrono::seconds& timeout)
982 {
983 serverStatMan_->removeStaleServerStat(timeout);
984 }
985
doesOverallDownloadSpeedExceed()986 bool RequestGroupMan::doesOverallDownloadSpeedExceed()
987 {
988 return maxOverallDownloadSpeedLimit_ > 0 &&
989 maxOverallDownloadSpeedLimit_ < netStat_.calculateDownloadSpeed();
990 }
991
doesOverallUploadSpeedExceed()992 bool RequestGroupMan::doesOverallUploadSpeedExceed()
993 {
994 return maxOverallUploadSpeedLimit_ > 0 &&
995 maxOverallUploadSpeedLimit_ < netStat_.calculateUploadSpeed();
996 }
997
getUsedHosts(std::vector<std::pair<size_t,std::string>> & usedHosts)998 void RequestGroupMan::getUsedHosts(
999 std::vector<std::pair<size_t, std::string>>& usedHosts)
1000 {
1001 // vector of tuple which consists of use count, -download speed,
1002 // hostname. We want to sort by least used and faster download
1003 // speed. We use -download speed so that we can sort them using
1004 // operator<().
1005 std::vector<std::tuple<size_t, int, std::string>> tempHosts;
1006 for (const auto& rg : requestGroups_) {
1007 const auto& inFlightReqs =
1008 rg->getDownloadContext()->getFirstFileEntry()->getInFlightRequests();
1009 for (const auto& req : inFlightReqs) {
1010 uri_split_result us;
1011 if (uri_split(&us, req->getUri().c_str()) == 0) {
1012 std::string host =
1013 uri::getFieldString(us, USR_HOST, req->getUri().c_str());
1014 auto k = tempHosts.begin();
1015 auto eok = tempHosts.end();
1016 for (; k != eok; ++k) {
1017 if (std::get<2>(*k) == host) {
1018 ++std::get<0>(*k);
1019 break;
1020 }
1021 }
1022 if (k == eok) {
1023 std::string protocol =
1024 uri::getFieldString(us, USR_SCHEME, req->getUri().c_str());
1025 auto ss = findServerStat(host, protocol);
1026 int invDlSpeed = (ss && ss->isOK())
1027 ? -(static_cast<int>(ss->getDownloadSpeed()))
1028 : 0;
1029 tempHosts.emplace_back(1, invDlSpeed, host);
1030 }
1031 }
1032 }
1033 }
1034 std::sort(tempHosts.begin(), tempHosts.end());
1035 std::transform(tempHosts.begin(), tempHosts.end(),
1036 std::back_inserter(usedHosts),
1037 [](const std::tuple<size_t, int, std::string>& x) {
1038 return std::make_pair(std::get<0>(x), std::get<2>(x));
1039 });
1040 }
1041
setUriListParser(const std::shared_ptr<UriListParser> & uriListParser)1042 void RequestGroupMan::setUriListParser(
1043 const std::shared_ptr<UriListParser>& uriListParser)
1044 {
1045 uriListParser_ = uriListParser;
1046 }
1047
initWrDiskCache()1048 void RequestGroupMan::initWrDiskCache()
1049 {
1050 assert(!wrDiskCache_);
1051 size_t limit = option_->getAsInt(PREF_DISK_CACHE);
1052 if (limit > 0) {
1053 wrDiskCache_ = make_unique<WrDiskCache>(limit);
1054 }
1055 }
1056
decreaseNumActive()1057 void RequestGroupMan::decreaseNumActive()
1058 {
1059 assert(numActive_ > 0);
1060 --numActive_;
1061 }
1062
optimizeConcurrentDownloads()1063 int RequestGroupMan::optimizeConcurrentDownloads()
1064 {
1065 // gauge the current speed
1066 int currentSpeed = getNetStat().calculateDownloadSpeed();
1067
1068 const auto& now = global::wallclock();
1069 if (currentSpeed >= optimizationSpeed_) {
1070 optimizationSpeed_ = currentSpeed;
1071 optimizationSpeedTimer_ = now;
1072 }
1073 else if (std::chrono::duration_cast<std::chrono::seconds>(
1074 optimizationSpeedTimer_.difference(now)) >= 5_s) {
1075 // we keep using the reference speed for minimum 5 seconds so reset the
1076 // timer
1077 optimizationSpeedTimer_ = now;
1078
1079 // keep the reference speed as long as the speed tends to augment or to
1080 // maintain itself within 10%
1081 if (currentSpeed >= 1.1 * getNetStat().calculateNewestDownloadSpeed(5)) {
1082 // else assume a possible congestion and record a new optimization speed
1083 // by dichotomy
1084 optimizationSpeed_ = (optimizationSpeed_ + currentSpeed) / 2.;
1085 }
1086 }
1087
1088 if (optimizationSpeed_ <= 0) {
1089 return optimizeConcurrentDownloadsCoeffA_;
1090 }
1091
1092 // apply the rule
1093 if ((maxOverallDownloadSpeedLimit_ > 0) &&
1094 (optimizationSpeed_ > maxOverallDownloadSpeedLimit_)) {
1095 optimizationSpeed_ = maxOverallDownloadSpeedLimit_;
1096 }
1097 int maxConcurrentDownloads =
1098 ceil(optimizeConcurrentDownloadsCoeffA_ +
1099 optimizeConcurrentDownloadsCoeffB_ *
1100 log10(optimizationSpeed_ * 8. / 1000000.));
1101
1102 // bring the value in bound between 1 and the defined maximum
1103 maxConcurrentDownloads =
1104 std::min(std::max(1, maxConcurrentDownloads), maxConcurrentDownloads_);
1105
1106 A2_LOG_DEBUG(
1107 fmt("Max concurrent downloads optimized at %d (%lu currently active) "
1108 "[optimization speed %sB/s, current speed %sB/s]",
1109 maxConcurrentDownloads, static_cast<unsigned long>(numActive_),
1110 util::abbrevSize(optimizationSpeed_).c_str(),
1111 util::abbrevSize(currentSpeed).c_str()));
1112
1113 return maxConcurrentDownloads;
1114 }
1115 } // namespace aria2
1116