1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "components/download/public/common/download_file_impl.h"
6
7 #include <algorithm>
8 #include <string>
9 #include <utility>
10
11 #include "base/bind.h"
12 #include "base/files/file_util.h"
13 #include "base/strings/stringprintf.h"
14 #include "base/task/post_task.h"
15 #include "base/threading/sequenced_task_runner_handle.h"
16 #include "base/threading/thread_task_runner_handle.h"
17 #include "base/time/time.h"
18 #include "base/timer/timer.h"
19 #include "base/values.h"
20 #include "build/build_config.h"
21 #include "components/download/internal/common/parallel_download_utils.h"
22 #include "components/download/public/common/download_create_info.h"
23 #include "components/download/public/common/download_destination_observer.h"
24 #include "components/download/public/common/download_features.h"
25 #include "components/download/public/common/download_interrupt_reasons_utils.h"
26 #include "components/download/public/common/download_stats.h"
27 #include "crypto/secure_hash.h"
28 #include "crypto/sha2.h"
29 #include "mojo/public/c/system/types.h"
30 #include "net/base/io_buffer.h"
31
32 #if defined(OS_ANDROID)
33 #include "base/android/content_uri_utils.h"
34 #include "components/download/internal/common/android/download_collection_bridge.h"
35 #endif // defined(OS_ANDROID)
36
37 namespace download {
38
39 namespace {
40
41 const int kUpdatePeriodMs = 500;
42 const int kMaxTimeBlockingFileThreadMs = 1000;
43
44 // These constants control the default retry behavior for failing renames. Each
45 // retry is performed after a delay that is twice the previous delay. The
46 // initial delay is specified by kInitialRenameRetryDelayMs.
47 const int kInitialRenameRetryDelayMs = 200;
48
49 // Number of times a failing rename is retried before giving up.
50 const int kMaxRenameRetries = 3;
51
52 // Because DownloadSaveInfo::kLengthFullContent is 0, we should avoid using
53 // 0 for length if we found that a stream can no longer write any data.
54 const int kNoBytesToWrite = -1;
55
56 // Default content length when the potential file size is not yet determined.
57 const int kUnknownContentLength = -1;
58
59 } // namespace
60
SourceStream(int64_t offset,int64_t starting_file_write_offset,std::unique_ptr<InputStream> stream)61 DownloadFileImpl::SourceStream::SourceStream(
62 int64_t offset,
63 int64_t starting_file_write_offset,
64 std::unique_ptr<InputStream> stream)
65 : offset_(offset),
66 length_(DownloadSaveInfo::kLengthFullContent),
67 starting_file_write_offset_(starting_file_write_offset),
68 bytes_read_(0),
69 bytes_written_(0),
70 finished_(false),
71 index_(0u),
72 input_stream_(std::move(stream)) {
73 CHECK_LE(offset_, starting_file_write_offset_);
74 CHECK_GE(offset_, 0);
75 }
76
77 DownloadFileImpl::SourceStream::~SourceStream() = default;
78
Initialize()79 void DownloadFileImpl::SourceStream::Initialize() {
80 input_stream_->Initialize();
81 }
82
OnBytesConsumed(int64_t bytes_read,int64_t bytes_written)83 void DownloadFileImpl::SourceStream::OnBytesConsumed(int64_t bytes_read,
84 int64_t bytes_written) {
85 CHECK_GE(bytes_read, bytes_written);
86 bytes_read_ += bytes_read;
87 bytes_written_ += bytes_written;
88 }
89
TruncateLengthWithWrittenDataBlock(int64_t received_slice_offset,int64_t bytes_written)90 void DownloadFileImpl::SourceStream::TruncateLengthWithWrittenDataBlock(
91 int64_t received_slice_offset,
92 int64_t bytes_written) {
93 DCHECK_GT(bytes_written, 0);
94 if (length_ == kNoBytesToWrite)
95 return;
96
97 if (received_slice_offset <= starting_file_write_offset_) {
98 // If validation has completed, mark the stream as finished if the file
99 // write position already has data.
100 if (received_slice_offset + bytes_written > starting_file_write_offset_ &&
101 GetRemainingBytesToValidate() == 0) {
102 length_ = kNoBytesToWrite;
103 finished_ = true;
104 }
105 return;
106 }
107
108 if (length_ == DownloadSaveInfo::kLengthFullContent ||
109 (length_ > received_slice_offset - offset_ &&
110 length_ > starting_file_write_offset_ - offset_)) {
111 // Stream length should always include the validation data, unless the
112 // response is too short.
113 length_ =
114 std::max(received_slice_offset, starting_file_write_offset_) - offset_;
115 }
116 }
117
RegisterDataReadyCallback(const mojo::SimpleWatcher::ReadyCallback & callback)118 void DownloadFileImpl::SourceStream::RegisterDataReadyCallback(
119 const mojo::SimpleWatcher::ReadyCallback& callback) {
120 input_stream_->RegisterDataReadyCallback(callback);
121 }
122
ClearDataReadyCallback()123 void DownloadFileImpl::SourceStream::ClearDataReadyCallback() {
124 read_stream_callback_.Cancel();
125 input_stream_->ClearDataReadyCallback();
126 }
127
GetCompletionStatus() const128 DownloadInterruptReason DownloadFileImpl::SourceStream::GetCompletionStatus()
129 const {
130 return input_stream_->GetCompletionStatus();
131 }
132
RegisterCompletionCallback(DownloadFileImpl::SourceStream::CompletionCallback callback)133 void DownloadFileImpl::SourceStream::RegisterCompletionCallback(
134 DownloadFileImpl::SourceStream::CompletionCallback callback) {
135 input_stream_->RegisterCompletionCallback(
136 base::BindOnce(std::move(callback), base::Unretained(this)));
137 }
138
Read(scoped_refptr<net::IOBuffer> * data,size_t * length)139 InputStream::StreamState DownloadFileImpl::SourceStream::Read(
140 scoped_refptr<net::IOBuffer>* data,
141 size_t* length) {
142 return input_stream_->Read(data, length);
143 }
144
GetRemainingBytesToValidate()145 size_t DownloadFileImpl::SourceStream::GetRemainingBytesToValidate() {
146 int64_t bytes_remaining = starting_file_write_offset_ - offset_ - bytes_read_;
147 return bytes_remaining < 0 ? 0 : bytes_remaining;
148 }
149
DownloadFileImpl(std::unique_ptr<DownloadSaveInfo> save_info,const base::FilePath & default_download_directory,std::unique_ptr<InputStream> stream,uint32_t download_id,base::WeakPtr<DownloadDestinationObserver> observer)150 DownloadFileImpl::DownloadFileImpl(
151 std::unique_ptr<DownloadSaveInfo> save_info,
152 const base::FilePath& default_download_directory,
153 std::unique_ptr<InputStream> stream,
154 uint32_t download_id,
155 base::WeakPtr<DownloadDestinationObserver> observer)
156 : file_(download_id),
157 save_info_(std::move(save_info)),
158 default_download_directory_(default_download_directory),
159 potential_file_length_(kUnknownContentLength),
160 bytes_seen_(0),
161 num_active_streams_(0),
162 record_stream_bandwidth_(false),
163 bytes_seen_with_parallel_streams_(0),
164 bytes_seen_without_parallel_streams_(0),
165 is_paused_(false),
166 download_id_(download_id),
167 main_task_runner_(base::ThreadTaskRunnerHandle::Get()),
168 observer_(observer) {
169 TRACE_EVENT_INSTANT0("download", "DownloadFileCreated",
170 TRACE_EVENT_SCOPE_THREAD);
171 TRACE_EVENT_NESTABLE_ASYNC_BEGIN0("download", "DownloadFileActive",
172 download_id);
173
174 source_streams_[save_info_->offset] = std::make_unique<SourceStream>(
175 save_info_->offset, save_info_->GetStartingFileWriteOffset(),
176 std::move(stream));
177
178 DETACH_FROM_SEQUENCE(sequence_checker_);
179 }
180
~DownloadFileImpl()181 DownloadFileImpl::~DownloadFileImpl() {
182 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
183
184 TRACE_EVENT_NESTABLE_ASYNC_END0("download", "DownloadFileActive",
185 download_id_);
186 }
187
Initialize(InitializeCallback initialize_callback,CancelRequestCallback cancel_request_callback,const DownloadItem::ReceivedSlices & received_slices,bool is_parallelizable)188 void DownloadFileImpl::Initialize(
189 InitializeCallback initialize_callback,
190 CancelRequestCallback cancel_request_callback,
191 const DownloadItem::ReceivedSlices& received_slices,
192 bool is_parallelizable) {
193 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
194
195 update_timer_.reset(new base::RepeatingTimer());
196 int64_t bytes_so_far = 0;
197 cancel_request_callback_ = cancel_request_callback;
198 received_slices_ = received_slices;
199 if (!task_runner_)
200 task_runner_ = base::SequencedTaskRunnerHandle::Get();
201
202 // If the last slice is finished, then we know the actual content size.
203 if (!received_slices_.empty() && received_slices_.back().finished) {
204 SetPotentialFileLength(received_slices_.back().offset +
205 received_slices_.back().received_bytes);
206 }
207
208 if (IsSparseFile()) {
209 for (const auto& received_slice : received_slices_)
210 bytes_so_far += received_slice.received_bytes;
211 slice_to_download_ = FindSlicesToDownload(received_slices_);
212
213 } else {
214 bytes_so_far = save_info_->GetStartingFileWriteOffset();
215 }
216 int64_t bytes_wasted = 0;
217 DownloadInterruptReason reason = file_.Initialize(
218 save_info_->file_path, default_download_directory_,
219 std::move(save_info_->file), bytes_so_far,
220 save_info_->hash_of_partial_file, std::move(save_info_->hash_state),
221 IsSparseFile(), &bytes_wasted);
222 if (reason != DOWNLOAD_INTERRUPT_REASON_NONE) {
223 main_task_runner_->PostTask(
224 FROM_HERE,
225 base::BindOnce(std::move(initialize_callback), reason, bytes_wasted));
226 return;
227 }
228 download_start_ = base::TimeTicks::Now();
229 last_update_time_ = download_start_;
230 record_stream_bandwidth_ = is_parallelizable;
231
232 // Primarily to make reset to zero in restart visible to owner.
233 SendUpdate();
234
235 main_task_runner_->PostTask(
236 FROM_HERE, base::BindOnce(std::move(initialize_callback),
237 DOWNLOAD_INTERRUPT_REASON_NONE, bytes_wasted));
238
239 // Initial pull from the straw from all source streams.
240 for (auto& source_stream : source_streams_)
241 RegisterAndActivateStream(source_stream.second.get());
242 }
243
AddInputStream(std::unique_ptr<InputStream> stream,int64_t offset)244 void DownloadFileImpl::AddInputStream(std::unique_ptr<InputStream> stream,
245 int64_t offset) {
246 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
247
248 // UI thread may not be notified about completion and detach download file,
249 // clear up the network request.
250 if (IsDownloadCompleted()) {
251 CancelRequest(offset);
252 return;
253 }
254 DCHECK(source_streams_.find(offset) == source_streams_.end());
255 source_streams_[offset] =
256 std::make_unique<SourceStream>(offset, offset, std::move(stream));
257 OnSourceStreamAdded(source_streams_[offset].get());
258 }
259
OnSourceStreamAdded(SourceStream * source_stream)260 void DownloadFileImpl::OnSourceStreamAdded(SourceStream* source_stream) {
261 // There are writers at different offsets now, create the received slices
262 // vector if necessary.
263 if (received_slices_.empty() && TotalBytesReceived() > 0) {
264 size_t index = AddOrMergeReceivedSliceIntoSortedArray(
265 DownloadItem::ReceivedSlice(0, TotalBytesReceived()), received_slices_);
266 DCHECK_EQ(index, 0u);
267 }
268 // If the file is initialized, start to write data, or wait until file opened.
269 if (file_.in_progress())
270 RegisterAndActivateStream(source_stream);
271 }
272
ValidateAndWriteDataToFile(int64_t offset,const char * data,size_t bytes_to_validate,size_t bytes_to_write)273 DownloadInterruptReason DownloadFileImpl::ValidateAndWriteDataToFile(
274 int64_t offset,
275 const char* data,
276 size_t bytes_to_validate,
277 size_t bytes_to_write) {
278 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
279 // Check if some of the data is for validation purpose.
280 if (bytes_to_validate > 0 &&
281 !file_.ValidateDataInFile(offset, data, bytes_to_validate)) {
282 return DOWNLOAD_INTERRUPT_REASON_FILE_HASH_MISMATCH;
283 }
284 // If there is no data to write, just return DOWNLOAD_INTERRUPT_REASON_NONE
285 // and read the next chunk.
286 if (bytes_to_write <= 0)
287 return DOWNLOAD_INTERRUPT_REASON_NONE;
288 // Write the remaining data to disk.
289 WillWriteToDisk(bytes_to_write);
290 return file_.WriteDataToFile(offset + bytes_to_validate,
291 data + bytes_to_validate, bytes_to_write);
292 }
293
CalculateBytesToWrite(SourceStream * source_stream,size_t bytes_available_to_write,size_t * bytes_to_validate,size_t * bytes_to_write)294 bool DownloadFileImpl::CalculateBytesToWrite(SourceStream* source_stream,
295 size_t bytes_available_to_write,
296 size_t* bytes_to_validate,
297 size_t* bytes_to_write) {
298 *bytes_to_validate = 0;
299 if (source_stream->length() == kNoBytesToWrite) {
300 *bytes_to_write = 0;
301 return true;
302 }
303
304 // First calculate the number of bytes to validate.
305 *bytes_to_write = bytes_available_to_write;
306 size_t remaining_bytes_to_validate =
307 source_stream->GetRemainingBytesToValidate();
308 if (remaining_bytes_to_validate > 0) {
309 *bytes_to_validate =
310 std::min(remaining_bytes_to_validate, bytes_available_to_write);
311 *bytes_to_write -= *bytes_to_validate;
312 }
313 if (source_stream->length() != DownloadSaveInfo::kLengthFullContent &&
314 source_stream->bytes_read() +
315 static_cast<int64_t>(bytes_available_to_write) >
316 source_stream->length()) {
317 // Total bytes to consume is capped by the length of the stream.
318 int64_t bytes_to_consume =
319 source_stream->length() - source_stream->bytes_read();
320 // The validation data should always be streamed.
321 DCHECK_GE(bytes_to_consume, static_cast<int64_t>(*bytes_to_validate));
322 *bytes_to_write = bytes_to_consume - *bytes_to_validate;
323 return true;
324 }
325
326 // If a new slice finds that its target position has already been written,
327 // terminate the stream if there are no bytes to validate.
328 if (source_stream->bytes_written() == 0 && *bytes_to_write > 0) {
329 for (const auto& received_slice : received_slices_) {
330 if (received_slice.offset <=
331 source_stream->starting_file_write_offset() &&
332 received_slice.offset + received_slice.received_bytes >
333 source_stream->starting_file_write_offset()) {
334 *bytes_to_write = 0;
335 return true;
336 }
337 }
338 }
339
340 return false;
341 }
342
RenameAndUniquify(const base::FilePath & full_path,RenameCompletionCallback callback)343 void DownloadFileImpl::RenameAndUniquify(const base::FilePath& full_path,
344 RenameCompletionCallback callback) {
345 std::unique_ptr<RenameParameters> parameters(
346 new RenameParameters(UNIQUIFY, full_path, std::move(callback)));
347 RenameWithRetryInternal(std::move(parameters));
348 }
349
RenameAndAnnotate(const base::FilePath & full_path,const std::string & client_guid,const GURL & source_url,const GURL & referrer_url,mojo::PendingRemote<quarantine::mojom::Quarantine> remote_quarantine,RenameCompletionCallback callback)350 void DownloadFileImpl::RenameAndAnnotate(
351 const base::FilePath& full_path,
352 const std::string& client_guid,
353 const GURL& source_url,
354 const GURL& referrer_url,
355 mojo::PendingRemote<quarantine::mojom::Quarantine> remote_quarantine,
356 RenameCompletionCallback callback) {
357 std::unique_ptr<RenameParameters> parameters(new RenameParameters(
358 ANNOTATE_WITH_SOURCE_INFORMATION, full_path, std::move(callback)));
359 parameters->client_guid = client_guid;
360 parameters->source_url = source_url;
361 parameters->referrer_url = referrer_url;
362 parameters->remote_quarantine = std::move(remote_quarantine);
363 RenameWithRetryInternal(std::move(parameters));
364 }
365
366 #if defined(OS_ANDROID)
RenameToIntermediateUri(const GURL & original_url,const GURL & referrer_url,const base::FilePath & file_name,const std::string & mime_type,const base::FilePath & current_path,RenameCompletionCallback callback)367 void DownloadFileImpl::RenameToIntermediateUri(
368 const GURL& original_url,
369 const GURL& referrer_url,
370 const base::FilePath& file_name,
371 const std::string& mime_type,
372 const base::FilePath& current_path,
373 RenameCompletionCallback callback) {
374 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
375 // Create new content URI if |current_path| is not content URI
376 // or if it is already deleted.
377 base::FilePath content_path =
378 current_path.IsContentUri() && base::ContentUriExists(current_path)
379 ? current_path
380 : DownloadCollectionBridge::CreateIntermediateUriForPublish(
381 original_url, referrer_url, file_name, mime_type);
382 DownloadInterruptReason reason = DOWNLOAD_INTERRUPT_REASON_FILE_FAILED;
383 if (!content_path.empty()) {
384 reason = file_.Rename(content_path);
385 display_name_ = DownloadCollectionBridge::GetDisplayName(content_path);
386 }
387 if (display_name_.empty())
388 display_name_ = file_name;
389 OnRenameComplete(content_path, std::move(callback), reason);
390 }
391
PublishDownload(RenameCompletionCallback callback)392 void DownloadFileImpl::PublishDownload(RenameCompletionCallback callback) {
393 DownloadInterruptReason reason = file_.PublishDownload();
394 OnRenameComplete(file_.full_path(), std::move(callback), reason);
395 }
396
GetDisplayName()397 base::FilePath DownloadFileImpl::GetDisplayName() {
398 return display_name_;
399 }
400 #endif // defined(OS_ANDROID)
401
GetRetryDelayForFailedRename(int attempt_number)402 base::TimeDelta DownloadFileImpl::GetRetryDelayForFailedRename(
403 int attempt_number) {
404 DCHECK_GE(attempt_number, 0);
405 // |delay| starts at kInitialRenameRetryDelayMs and increases by a factor of
406 // 2 at each subsequent retry. Assumes that |retries_left| starts at
407 // kMaxRenameRetries. Also assumes that kMaxRenameRetries is less than the
408 // number of bits in an int.
409 return base::TimeDelta::FromMilliseconds(kInitialRenameRetryDelayMs) *
410 (1 << attempt_number);
411 }
412
ShouldRetryFailedRename(DownloadInterruptReason reason)413 bool DownloadFileImpl::ShouldRetryFailedRename(DownloadInterruptReason reason) {
414 return reason == DOWNLOAD_INTERRUPT_REASON_FILE_TRANSIENT_ERROR;
415 }
416
HandleStreamCompletionStatus(SourceStream * source_stream)417 DownloadInterruptReason DownloadFileImpl::HandleStreamCompletionStatus(
418 SourceStream* source_stream) {
419 DownloadInterruptReason reason = source_stream->GetCompletionStatus();
420 if (source_stream->length() == DownloadSaveInfo::kLengthFullContent &&
421 !received_slices_.empty() &&
422 (source_stream->starting_file_write_offset() ==
423 received_slices_.back().offset +
424 received_slices_.back().received_bytes) &&
425 reason == DOWNLOAD_INTERRUPT_REASON_SERVER_NO_RANGE) {
426 // We are probably reaching the end of the stream, don't treat this
427 // as an error.
428 return DOWNLOAD_INTERRUPT_REASON_NONE;
429 }
430 return reason;
431 }
432
RenameWithRetryInternal(std::unique_ptr<RenameParameters> parameters)433 void DownloadFileImpl::RenameWithRetryInternal(
434 std::unique_ptr<RenameParameters> parameters) {
435 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
436
437 base::FilePath new_path = parameters->new_path;
438
439 if ((parameters->option & UNIQUIFY) && new_path != file_.full_path()) {
440 new_path = base::GetUniquePath(new_path);
441 }
442
443 DownloadInterruptReason reason = file_.Rename(new_path);
444
445 // Attempt to retry the rename if possible. If the rename failed and the
446 // subsequent open also failed, then in_progress() would be false. We don't
447 // try to retry renames if the in_progress() was false to begin with since we
448 // have less assurance that the file at file_.full_path() was the one we were
449 // working with.
450 if (ShouldRetryFailedRename(reason) && file_.in_progress() &&
451 parameters->retries_left > 0) {
452 int attempt_number = kMaxRenameRetries - parameters->retries_left;
453 --parameters->retries_left;
454 if (parameters->time_of_first_failure.is_null())
455 parameters->time_of_first_failure = base::TimeTicks::Now();
456 task_runner_->PostDelayedTask(
457 FROM_HERE,
458 base::BindOnce(&DownloadFileImpl::RenameWithRetryInternal,
459 weak_factory_.GetWeakPtr(), std::move(parameters)),
460 GetRetryDelayForFailedRename(attempt_number));
461 return;
462 }
463
464 if (reason == DOWNLOAD_INTERRUPT_REASON_NONE &&
465 (parameters->option & ANNOTATE_WITH_SOURCE_INFORMATION)) {
466 // Doing the annotation after the rename rather than before leaves
467 // a very small window during which the file has the final name but
468 // hasn't been marked with the Mark Of The Web. However, it allows
469 // anti-virus scanners on Windows to actually see the data
470 // (http://crbug.com/127999) under the correct name (which is information
471 // it uses).
472 //
473 // If concurrent downloads with the same target path are allowed, an
474 // asynchronous quarantine file may cause a file to be stamped with
475 // incorrect mark-of-the-web data. Therefore, fall back to non-service
476 // QuarantineFile when kPreventDownloadsWithSamePath is disabled.
477 file_.AnnotateWithSourceInformation(
478 parameters->client_guid, parameters->source_url,
479 parameters->referrer_url, std::move(parameters->remote_quarantine),
480 base::BindOnce(&DownloadFileImpl::OnRenameComplete,
481 weak_factory_.GetWeakPtr(), new_path,
482 std::move(parameters->completion_callback)));
483 return;
484 }
485
486 OnRenameComplete(new_path, std::move(parameters->completion_callback),
487 reason);
488 }
489
OnRenameComplete(const base::FilePath & new_path,RenameCompletionCallback callback,DownloadInterruptReason reason)490 void DownloadFileImpl::OnRenameComplete(const base::FilePath& new_path,
491 RenameCompletionCallback callback,
492 DownloadInterruptReason reason) {
493 if (reason != DOWNLOAD_INTERRUPT_REASON_NONE) {
494 // Make sure our information is updated, since we're about to
495 // error out.
496 SendUpdate();
497
498 // Null out callback so that we don't do any more stream processing.
499 // The request that writes to the pipe should be canceled after
500 // the download being interrupted.
501 for (auto& stream : source_streams_)
502 stream.second->ClearDataReadyCallback();
503 }
504
505 main_task_runner_->PostTask(
506 FROM_HERE, base::BindOnce(std::move(callback), reason,
507 reason == DOWNLOAD_INTERRUPT_REASON_NONE
508 ? new_path
509 : base::FilePath()));
510 }
511
Detach()512 void DownloadFileImpl::Detach() {
513 file_.Detach();
514 }
515
Cancel()516 void DownloadFileImpl::Cancel() {
517 file_.Cancel();
518 }
519
SetPotentialFileLength(int64_t length)520 void DownloadFileImpl::SetPotentialFileLength(int64_t length) {
521 DCHECK(potential_file_length_ == length ||
522 potential_file_length_ == kUnknownContentLength)
523 << "Potential file length changed, the download might have updated.";
524
525 if (length < potential_file_length_ ||
526 potential_file_length_ == kUnknownContentLength) {
527 potential_file_length_ = length;
528 }
529
530 // TODO(qinmin): interrupt the download if the received bytes are larger
531 // than content length limit.
532 LOG_IF(ERROR, TotalBytesReceived() > potential_file_length_)
533 << "Received data is larger than the content length limit.";
534 }
535
FullPath() const536 const base::FilePath& DownloadFileImpl::FullPath() const {
537 return file_.full_path();
538 }
539
InProgress() const540 bool DownloadFileImpl::InProgress() const {
541 return file_.in_progress();
542 }
543
Pause()544 void DownloadFileImpl::Pause() {
545 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
546 is_paused_ = true;
547 record_stream_bandwidth_ = false;
548 for (auto& stream : source_streams_)
549 stream.second->ClearDataReadyCallback();
550 }
551
Resume()552 void DownloadFileImpl::Resume() {
553 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
554 DCHECK(is_paused_);
555 is_paused_ = false;
556
557 for (auto& stream : source_streams_) {
558 SourceStream* source_stream = stream.second.get();
559 if (!source_stream->is_finished())
560 StreamActive(source_stream, MOJO_RESULT_OK);
561 }
562 }
563
StreamActive(SourceStream * source_stream,MojoResult result)564 void DownloadFileImpl::StreamActive(SourceStream* source_stream,
565 MojoResult result) {
566 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
567 if (is_paused_)
568 return;
569
570 base::TimeTicks start(base::TimeTicks::Now());
571 base::TimeTicks now;
572 scoped_refptr<net::IOBuffer> incoming_data;
573 size_t incoming_data_size = 0;
574 size_t total_incoming_data_size = 0;
575 size_t num_buffers = 0;
576 size_t bytes_to_validate = 0;
577 size_t bytes_to_write = 0;
578 bool should_terminate = false;
579 InputStream::StreamState state(InputStream::EMPTY);
580 DownloadInterruptReason reason = DOWNLOAD_INTERRUPT_REASON_NONE;
581 base::TimeDelta delta(
582 base::TimeDelta::FromMilliseconds(kMaxTimeBlockingFileThreadMs));
583 // Take care of any file local activity required.
584 do {
585 state = source_stream->Read(&incoming_data, &incoming_data_size);
586 switch (state) {
587 case InputStream::EMPTY:
588 should_terminate = (source_stream->length() == kNoBytesToWrite);
589 break;
590 case InputStream::HAS_DATA: {
591 ++num_buffers;
592 should_terminate =
593 CalculateBytesToWrite(source_stream, incoming_data_size,
594 &bytes_to_validate, &bytes_to_write);
595 DCHECK_GE(incoming_data_size, bytes_to_write);
596 reason = ValidateAndWriteDataToFile(
597 source_stream->offset() + source_stream->bytes_read(),
598 incoming_data->data(), bytes_to_validate, bytes_to_write);
599 bytes_seen_ += bytes_to_write;
600 total_incoming_data_size += incoming_data_size;
601 if (reason == DOWNLOAD_INTERRUPT_REASON_NONE) {
602 int64_t prev_bytes_written = source_stream->bytes_written();
603 source_stream->OnBytesConsumed(incoming_data_size, bytes_to_write);
604 if (!IsSparseFile())
605 break;
606 // If the write operation creates a new slice, add it to the
607 // |received_slices_| and update all the entries in
608 // |source_streams_|.
609 if (bytes_to_write > 0 && prev_bytes_written == 0) {
610 AddNewSlice(source_stream->starting_file_write_offset(),
611 bytes_to_write);
612 } else {
613 received_slices_[source_stream->index()].received_bytes +=
614 bytes_to_write;
615 }
616 }
617 } break;
618 case InputStream::WAIT_FOR_COMPLETION:
619 source_stream->RegisterCompletionCallback(base::BindOnce(
620 &DownloadFileImpl::OnStreamCompleted, weak_factory_.GetWeakPtr()));
621 break;
622 case InputStream::COMPLETE:
623 break;
624 default:
625 NOTREACHED();
626 break;
627 }
628 now = base::TimeTicks::Now();
629 } while (state == InputStream::HAS_DATA &&
630 reason == DOWNLOAD_INTERRUPT_REASON_NONE && now - start <= delta &&
631 !should_terminate);
632
633 // If we're stopping to yield the thread, post a task so we come back.
634 if (state == InputStream::HAS_DATA && now - start > delta &&
635 !should_terminate) {
636 source_stream->read_stream_callback()->Reset(base::BindOnce(
637 &DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(),
638 source_stream, MOJO_RESULT_OK));
639 task_runner_->PostTask(FROM_HERE,
640 source_stream->read_stream_callback()->callback());
641 } else if (state == InputStream::EMPTY && !should_terminate) {
642 source_stream->RegisterDataReadyCallback(
643 base::BindRepeating(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(),
644 source_stream));
645 }
646
647 if (state == InputStream::COMPLETE)
648 OnStreamCompleted(source_stream);
649 else
650 NotifyObserver(source_stream, reason, state, should_terminate);
651
652 TRACE_EVENT_INSTANT2("download", "DownloadStreamDrained",
653 TRACE_EVENT_SCOPE_THREAD, "stream_size",
654 total_incoming_data_size, "num_buffers", num_buffers);
655 }
656
OnStreamCompleted(SourceStream * source_stream)657 void DownloadFileImpl::OnStreamCompleted(SourceStream* source_stream) {
658 DownloadInterruptReason reason = HandleStreamCompletionStatus(source_stream);
659 SendUpdate();
660
661 NotifyObserver(source_stream, reason, InputStream::COMPLETE, false);
662 }
663
NotifyObserver(SourceStream * source_stream,DownloadInterruptReason reason,InputStream::StreamState stream_state,bool should_terminate)664 void DownloadFileImpl::NotifyObserver(SourceStream* source_stream,
665 DownloadInterruptReason reason,
666 InputStream::StreamState stream_state,
667 bool should_terminate) {
668 if (reason != DOWNLOAD_INTERRUPT_REASON_NONE) {
669 HandleStreamError(source_stream, reason);
670 } else if (stream_state == InputStream::COMPLETE || should_terminate) {
671 // Signal successful completion or termination of the current stream.
672 source_stream->ClearDataReadyCallback();
673 source_stream->set_finished(true);
674
675 if (should_terminate)
676 CancelRequest(source_stream->offset());
677 if (source_stream->length() == DownloadSaveInfo::kLengthFullContent) {
678 // Mark received slice as finished.
679 if (IsSparseFile() && source_stream->bytes_written() > 0) {
680 DCHECK_GT(received_slices_.size(), source_stream->index())
681 << "Received slice index out of bound!";
682 received_slices_[source_stream->index()].finished = true;
683 }
684 if (!should_terminate) {
685 SetPotentialFileLength(source_stream->offset() +
686 source_stream->bytes_read());
687 }
688 }
689 num_active_streams_--;
690
691 // Inform observers.
692 SendUpdate();
693
694 // All the stream reader are completed, shut down file IO processing.
695 if (IsDownloadCompleted()) {
696 OnDownloadCompleted();
697 } else {
698 // If all the stream completes and we still not able to complete, trigger
699 // a content length mismatch error so auto resumption will be performed.
700 SendErrorUpdateIfFinished(
701 DOWNLOAD_INTERRUPT_REASON_SERVER_CONTENT_LENGTH_MISMATCH);
702 }
703 }
704 }
705
OnDownloadCompleted()706 void DownloadFileImpl::OnDownloadCompleted() {
707 RecordFileBandwidth(bytes_seen_, base::TimeTicks::Now() - download_start_);
708 if (record_stream_bandwidth_) {
709 RecordParallelizableDownloadStats(
710 bytes_seen_with_parallel_streams_, download_time_with_parallel_streams_,
711 bytes_seen_without_parallel_streams_,
712 download_time_without_parallel_streams_, IsSparseFile());
713 }
714 weak_factory_.InvalidateWeakPtrs();
715 std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
716 update_timer_.reset();
717 main_task_runner_->PostTask(
718 FROM_HERE,
719 base::BindOnce(&DownloadDestinationObserver::DestinationCompleted,
720 observer_, TotalBytesReceived(), std::move(hash_state)));
721 }
722
RegisterAndActivateStream(SourceStream * source_stream)723 void DownloadFileImpl::RegisterAndActivateStream(SourceStream* source_stream) {
724 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
725
726 source_stream->Initialize();
727 // Truncate |source_stream|'s length if necessary.
728 for (const auto& received_slice : received_slices_) {
729 source_stream->TruncateLengthWithWrittenDataBlock(
730 received_slice.offset, received_slice.received_bytes);
731 }
732 num_active_streams_++;
733 StreamActive(source_stream, MOJO_RESULT_OK);
734 }
735
TotalBytesReceived() const736 int64_t DownloadFileImpl::TotalBytesReceived() const {
737 return file_.bytes_so_far();
738 }
739
SendUpdate()740 void DownloadFileImpl::SendUpdate() {
741 // TODO(qinmin): For each active stream, add the slice it has written so
742 // far along with received_slices_.
743 main_task_runner_->PostTask(
744 FROM_HERE,
745 base::BindOnce(&DownloadDestinationObserver::DestinationUpdate, observer_,
746 TotalBytesReceived(), rate_estimator_.GetCountPerSecond(),
747 received_slices_));
748 }
749
WillWriteToDisk(size_t data_len)750 void DownloadFileImpl::WillWriteToDisk(size_t data_len) {
751 if (!update_timer_->IsRunning()) {
752 update_timer_->Start(FROM_HERE,
753 base::TimeDelta::FromMilliseconds(kUpdatePeriodMs),
754 this, &DownloadFileImpl::SendUpdate);
755 }
756 rate_estimator_.Increment(data_len);
757 base::TimeTicks now = base::TimeTicks::Now();
758 base::TimeDelta time_elapsed = (now - last_update_time_);
759 last_update_time_ = now;
760 if (num_active_streams_ > 1) {
761 download_time_with_parallel_streams_ += time_elapsed;
762 bytes_seen_with_parallel_streams_ += data_len;
763 } else {
764 download_time_without_parallel_streams_ += time_elapsed;
765 bytes_seen_without_parallel_streams_ += data_len;
766 }
767 }
768
AddNewSlice(int64_t offset,int64_t length)769 void DownloadFileImpl::AddNewSlice(int64_t offset, int64_t length) {
770 size_t index = AddOrMergeReceivedSliceIntoSortedArray(
771 DownloadItem::ReceivedSlice(offset, length), received_slices_);
772 // Check if the slice is added as a new slice, or merged with an existing one.
773 bool slice_added = (offset == received_slices_[index].offset);
774 // Update the index of exising SourceStreams.
775 for (auto& stream : source_streams_) {
776 SourceStream* source_stream = stream.second.get();
777 if (source_stream->starting_file_write_offset() > offset) {
778 if (slice_added && source_stream->bytes_written() > 0)
779 source_stream->set_index(source_stream->index() + 1);
780 } else if (source_stream->starting_file_write_offset() == offset) {
781 source_stream->set_index(index);
782 } else {
783 source_stream->TruncateLengthWithWrittenDataBlock(offset, length);
784 }
785 }
786 }
787
IsDownloadCompleted()788 bool DownloadFileImpl::IsDownloadCompleted() {
789 for (auto& stream : source_streams_) {
790 if (!stream.second->is_finished())
791 return false;
792 }
793
794 if (!IsSparseFile())
795 return true;
796
797 // Verify that all the file slices have been downloaded.
798 std::vector<DownloadItem::ReceivedSlice> slices_to_download =
799 FindSlicesToDownload(received_slices_);
800 if (slices_to_download.size() > 1) {
801 // If there are 1 or more holes in the file, download is not finished.
802 // Some streams might not have been added to |source_streams_| yet.
803 return false;
804 }
805 return TotalBytesReceived() == potential_file_length_;
806 }
807
HandleStreamError(SourceStream * source_stream,DownloadInterruptReason reason)808 void DownloadFileImpl::HandleStreamError(SourceStream* source_stream,
809 DownloadInterruptReason reason) {
810 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
811 source_stream->ClearDataReadyCallback();
812 source_stream->set_finished(true);
813 num_active_streams_--;
814
815 bool can_recover_from_error = false;
816 if (reason != DOWNLOAD_INTERRUPT_REASON_FILE_HASH_MISMATCH) {
817 // If previous stream has already written data at the starting offset of
818 // the error stream. The download can complete.
819 can_recover_from_error = (source_stream->length() == kNoBytesToWrite);
820
821 // See if the previous stream can download the full content.
822 // If the current stream has written some data, length of all preceding
823 // streams will be truncated.
824 if (IsSparseFile() && !can_recover_from_error) {
825 SourceStream* preceding_neighbor = FindPrecedingNeighbor(source_stream);
826 while (preceding_neighbor) {
827 if (CanRecoverFromError(source_stream, preceding_neighbor)) {
828 can_recover_from_error = true;
829 break;
830 }
831
832 // If the neighbor cannot recover the error and it has already created
833 // a slice, just interrupt the download.
834 if (preceding_neighbor->bytes_written() > 0)
835 break;
836 preceding_neighbor = FindPrecedingNeighbor(preceding_neighbor);
837 }
838 }
839 }
840
841 SendUpdate(); // Make info up to date before error.
842
843 // If the download can recover from error, check if it already finishes.
844 // Otherwise, send an error update when all streams are finished.
845 if (!can_recover_from_error)
846 SendErrorUpdateIfFinished(reason);
847 else if (IsDownloadCompleted())
848 OnDownloadCompleted();
849 }
850
SendErrorUpdateIfFinished(DownloadInterruptReason reason)851 void DownloadFileImpl::SendErrorUpdateIfFinished(
852 DownloadInterruptReason reason) {
853 // If there are still active streams, let them finish before
854 // sending out the error to the observer.
855 if (num_active_streams_ > 0)
856 return;
857
858 if (IsSparseFile()) {
859 for (const auto& slice : slice_to_download_) {
860 // Ignore last slice beyond file length.
861 if (slice.offset > 0 && slice.offset == potential_file_length_)
862 continue;
863 if (source_streams_.find(slice.offset) == source_streams_.end())
864 return;
865 }
866 }
867
868 // Error case for both upstream source and file write.
869 // Shut down processing and signal an error to our observer.
870 // Our observer will clean us up.
871 weak_factory_.InvalidateWeakPtrs();
872 std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
873 main_task_runner_->PostTask(
874 FROM_HERE,
875 base::BindOnce(&DownloadDestinationObserver::DestinationError, observer_,
876 reason, TotalBytesReceived(), std::move(hash_state)));
877 }
878
IsSparseFile() const879 bool DownloadFileImpl::IsSparseFile() const {
880 return source_streams_.size() > 1 || !received_slices_.empty();
881 }
882
FindPrecedingNeighbor(SourceStream * source_stream)883 DownloadFileImpl::SourceStream* DownloadFileImpl::FindPrecedingNeighbor(
884 SourceStream* source_stream) {
885 int64_t max_preceding_offset = 0;
886 SourceStream* ret = nullptr;
887 for (auto& stream : source_streams_) {
888 int64_t offset = stream.second->starting_file_write_offset();
889 if (offset < source_stream->starting_file_write_offset() &&
890 offset >= max_preceding_offset) {
891 ret = stream.second.get();
892 max_preceding_offset = offset;
893 }
894 }
895 return ret;
896 }
897
CancelRequest(int64_t offset)898 void DownloadFileImpl::CancelRequest(int64_t offset) {
899 if (!cancel_request_callback_.is_null()) {
900 main_task_runner_->PostTask(
901 FROM_HERE, base::BindOnce(cancel_request_callback_, offset));
902 }
903 }
904
DebugStates() const905 void DownloadFileImpl::DebugStates() const {
906 DVLOG(1) << "### Debugging DownloadFile states:";
907 DVLOG(1) << "Total source stream count = " << source_streams_.size();
908 for (const auto& stream : source_streams_) {
909 DVLOG(1) << "Source stream, offset = " << stream.second->offset()
910 << " , bytes_read = " << stream.second->bytes_read()
911 << " , starting_file_write_offset = "
912 << stream.second->starting_file_write_offset()
913 << " , bytes_written = " << stream.second->bytes_written()
914 << " , is_finished = " << stream.second->is_finished()
915 << " , length = " << stream.second->length()
916 << ", index = " << stream.second->index();
917 }
918
919 DebugSlicesInfo(received_slices_);
920 }
921
SetTaskRunnerForTesting(scoped_refptr<base::SequencedTaskRunner> task_runner)922 void DownloadFileImpl::SetTaskRunnerForTesting(
923 scoped_refptr<base::SequencedTaskRunner> task_runner) {
924 task_runner_ = std::move(task_runner);
925 }
926
RenameParameters(RenameOption option,const base::FilePath & new_path,RenameCompletionCallback completion_callback)927 DownloadFileImpl::RenameParameters::RenameParameters(
928 RenameOption option,
929 const base::FilePath& new_path,
930 RenameCompletionCallback completion_callback)
931 : option(option),
932 new_path(new_path),
933 retries_left(kMaxRenameRetries),
934 completion_callback(std::move(completion_callback)) {}
935
~RenameParameters()936 DownloadFileImpl::RenameParameters::~RenameParameters() {}
937
938 } // namespace download
939