1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "components/download/public/common/download_file_impl.h"
6 
7 #include <algorithm>
8 #include <string>
9 #include <utility>
10 
11 #include "base/bind.h"
12 #include "base/files/file_util.h"
13 #include "base/strings/stringprintf.h"
14 #include "base/task/post_task.h"
15 #include "base/threading/sequenced_task_runner_handle.h"
16 #include "base/threading/thread_task_runner_handle.h"
17 #include "base/time/time.h"
18 #include "base/timer/timer.h"
19 #include "base/values.h"
20 #include "build/build_config.h"
21 #include "components/download/internal/common/parallel_download_utils.h"
22 #include "components/download/public/common/download_create_info.h"
23 #include "components/download/public/common/download_destination_observer.h"
24 #include "components/download/public/common/download_features.h"
25 #include "components/download/public/common/download_interrupt_reasons_utils.h"
26 #include "components/download/public/common/download_stats.h"
27 #include "crypto/secure_hash.h"
28 #include "crypto/sha2.h"
29 #include "mojo/public/c/system/types.h"
30 #include "net/base/io_buffer.h"
31 
32 #if defined(OS_ANDROID)
33 #include "base/android/content_uri_utils.h"
34 #include "components/download/internal/common/android/download_collection_bridge.h"
35 #endif  // defined(OS_ANDROID)
36 
37 namespace download {
38 
namespace {

// Period, in milliseconds, associated with progress updates.
// NOTE(review): its consumer is outside this part of the file; confirm.
constexpr int kUpdatePeriodMs = 500;

// Upper bound, in milliseconds, on how long StreamActive() keeps draining a
// single stream before it yields and reposts itself.
constexpr int kMaxTimeBlockingFileThreadMs = 1000;

// Default retry behavior for failing renames: each retry waits twice as long
// as the previous one, starting from kInitialRenameRetryDelayMs.
constexpr int kInitialRenameRetryDelayMs = 200;

// How many times a failing rename is retried before giving up.
constexpr int kMaxRenameRetries = 3;

// DownloadSaveInfo::kLengthFullContent is 0, so a stream that can no longer
// write any data is tagged with this distinct sentinel length instead.
constexpr int kNoBytesToWrite = -1;

// Placeholder for the potential file length before the size is determined.
constexpr int kUnknownContentLength = -1;

}  // namespace
60 
SourceStream(int64_t offset,int64_t starting_file_write_offset,std::unique_ptr<InputStream> stream)61 DownloadFileImpl::SourceStream::SourceStream(
62     int64_t offset,
63     int64_t starting_file_write_offset,
64     std::unique_ptr<InputStream> stream)
65     : offset_(offset),
66       length_(DownloadSaveInfo::kLengthFullContent),
67       starting_file_write_offset_(starting_file_write_offset),
68       bytes_read_(0),
69       bytes_written_(0),
70       finished_(false),
71       index_(0u),
72       input_stream_(std::move(stream)) {
73   CHECK_LE(offset_, starting_file_write_offset_);
74   CHECK_GE(offset_, 0);
75 }
76 
// Defaulted out-of-line destructor; members (including |input_stream_|) are
// released by their own destructors.
DownloadFileImpl::SourceStream::~SourceStream() = default;
78 
Initialize()79 void DownloadFileImpl::SourceStream::Initialize() {
80   input_stream_->Initialize();
81 }
82 
OnBytesConsumed(int64_t bytes_read,int64_t bytes_written)83 void DownloadFileImpl::SourceStream::OnBytesConsumed(int64_t bytes_read,
84                                                      int64_t bytes_written) {
85   CHECK_GE(bytes_read, bytes_written);
86   bytes_read_ += bytes_read;
87   bytes_written_ += bytes_written;
88 }
89 
// Clips this stream against a block of file data that is already written,
// [received_slice_offset, received_slice_offset + bytes_written).
//
// - If the block starts at or before this stream's write position and covers
//   it, and no validation bytes remain, the stream has nothing left to write:
//   |length_| becomes kNoBytesToWrite and the stream is marked finished.
// - If the block starts after the write position, |length_| is shortened so
//   the stream ends where the block begins. The length is only ever reduced,
//   never grown.
void DownloadFileImpl::SourceStream::TruncateLengthWithWrittenDataBlock(
    int64_t received_slice_offset,
    int64_t bytes_written) {
  DCHECK_GT(bytes_written, 0);
  // Already known to have nothing to write; nothing more to clip.
  if (length_ == kNoBytesToWrite)
    return;

  if (received_slice_offset <= starting_file_write_offset_) {
    // If validation has completed, mark the stream as finished if the file
    // write position already has data.
    if (received_slice_offset + bytes_written > starting_file_write_offset_ &&
        GetRemainingBytesToValidate() == 0) {
      length_ = kNoBytesToWrite;
      finished_ = true;
    }
    return;
  }

  // Apply only when the stream is unbounded, or when its current length
  // reaches past both the block start and the write position (i.e. this
  // would shrink it).
  if (length_ == DownloadSaveInfo::kLengthFullContent ||
      (length_ > received_slice_offset - offset_ &&
       length_ > starting_file_write_offset_ - offset_)) {
    // Stream length should always include the validation data, unless the
    // response is too short.
    length_ =
        std::max(received_slice_offset, starting_file_write_offset_) - offset_;
  }
}
117 
RegisterDataReadyCallback(const mojo::SimpleWatcher::ReadyCallback & callback)118 void DownloadFileImpl::SourceStream::RegisterDataReadyCallback(
119     const mojo::SimpleWatcher::ReadyCallback& callback) {
120   input_stream_->RegisterDataReadyCallback(callback);
121 }
122 
ClearDataReadyCallback()123 void DownloadFileImpl::SourceStream::ClearDataReadyCallback() {
124   read_stream_callback_.Cancel();
125   input_stream_->ClearDataReadyCallback();
126 }
127 
GetCompletionStatus() const128 DownloadInterruptReason DownloadFileImpl::SourceStream::GetCompletionStatus()
129     const {
130   return input_stream_->GetCompletionStatus();
131 }
132 
RegisterCompletionCallback(DownloadFileImpl::SourceStream::CompletionCallback callback)133 void DownloadFileImpl::SourceStream::RegisterCompletionCallback(
134     DownloadFileImpl::SourceStream::CompletionCallback callback) {
135   input_stream_->RegisterCompletionCallback(
136       base::BindOnce(std::move(callback), base::Unretained(this)));
137 }
138 
Read(scoped_refptr<net::IOBuffer> * data,size_t * length)139 InputStream::StreamState DownloadFileImpl::SourceStream::Read(
140     scoped_refptr<net::IOBuffer>* data,
141     size_t* length) {
142   return input_stream_->Read(data, length);
143 }
144 
GetRemainingBytesToValidate()145 size_t DownloadFileImpl::SourceStream::GetRemainingBytesToValidate() {
146   int64_t bytes_remaining = starting_file_write_offset_ - offset_ - bytes_read_;
147   return bytes_remaining < 0 ? 0 : bytes_remaining;
148 }
149 
DownloadFileImpl(std::unique_ptr<DownloadSaveInfo> save_info,const base::FilePath & default_download_directory,std::unique_ptr<InputStream> stream,uint32_t download_id,base::WeakPtr<DownloadDestinationObserver> observer)150 DownloadFileImpl::DownloadFileImpl(
151     std::unique_ptr<DownloadSaveInfo> save_info,
152     const base::FilePath& default_download_directory,
153     std::unique_ptr<InputStream> stream,
154     uint32_t download_id,
155     base::WeakPtr<DownloadDestinationObserver> observer)
156     : file_(download_id),
157       save_info_(std::move(save_info)),
158       default_download_directory_(default_download_directory),
159       potential_file_length_(kUnknownContentLength),
160       bytes_seen_(0),
161       num_active_streams_(0),
162       record_stream_bandwidth_(false),
163       bytes_seen_with_parallel_streams_(0),
164       bytes_seen_without_parallel_streams_(0),
165       is_paused_(false),
166       download_id_(download_id),
167       main_task_runner_(base::ThreadTaskRunnerHandle::Get()),
168       observer_(observer) {
169   TRACE_EVENT_INSTANT0("download", "DownloadFileCreated",
170                        TRACE_EVENT_SCOPE_THREAD);
171   TRACE_EVENT_NESTABLE_ASYNC_BEGIN0("download", "DownloadFileActive",
172                                     download_id);
173 
174   source_streams_[save_info_->offset] = std::make_unique<SourceStream>(
175       save_info_->offset, save_info_->GetStartingFileWriteOffset(),
176       std::move(stream));
177 
178   DETACH_FROM_SEQUENCE(sequence_checker_);
179 }
180 
// Must run on the sequence this object is bound to. Closes the
// "DownloadFileActive" async trace event opened in the constructor.
DownloadFileImpl::~DownloadFileImpl() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  TRACE_EVENT_NESTABLE_ASYNC_END0("download", "DownloadFileActive",
                                  download_id_);
}
187 
Initialize(InitializeCallback initialize_callback,CancelRequestCallback cancel_request_callback,const DownloadItem::ReceivedSlices & received_slices,bool is_parallelizable)188 void DownloadFileImpl::Initialize(
189     InitializeCallback initialize_callback,
190     CancelRequestCallback cancel_request_callback,
191     const DownloadItem::ReceivedSlices& received_slices,
192     bool is_parallelizable) {
193   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
194 
195   update_timer_.reset(new base::RepeatingTimer());
196   int64_t bytes_so_far = 0;
197   cancel_request_callback_ = cancel_request_callback;
198   received_slices_ = received_slices;
199   if (!task_runner_)
200     task_runner_ = base::SequencedTaskRunnerHandle::Get();
201 
202   // If the last slice is finished, then we know the actual content size.
203   if (!received_slices_.empty() && received_slices_.back().finished) {
204     SetPotentialFileLength(received_slices_.back().offset +
205                            received_slices_.back().received_bytes);
206   }
207 
208   if (IsSparseFile()) {
209     for (const auto& received_slice : received_slices_)
210       bytes_so_far += received_slice.received_bytes;
211     slice_to_download_ = FindSlicesToDownload(received_slices_);
212 
213   } else {
214     bytes_so_far = save_info_->GetStartingFileWriteOffset();
215   }
216   int64_t bytes_wasted = 0;
217   DownloadInterruptReason reason = file_.Initialize(
218       save_info_->file_path, default_download_directory_,
219       std::move(save_info_->file), bytes_so_far,
220       save_info_->hash_of_partial_file, std::move(save_info_->hash_state),
221       IsSparseFile(), &bytes_wasted);
222   if (reason != DOWNLOAD_INTERRUPT_REASON_NONE) {
223     main_task_runner_->PostTask(
224         FROM_HERE,
225         base::BindOnce(std::move(initialize_callback), reason, bytes_wasted));
226     return;
227   }
228   download_start_ = base::TimeTicks::Now();
229   last_update_time_ = download_start_;
230   record_stream_bandwidth_ = is_parallelizable;
231 
232   // Primarily to make reset to zero in restart visible to owner.
233   SendUpdate();
234 
235   main_task_runner_->PostTask(
236       FROM_HERE, base::BindOnce(std::move(initialize_callback),
237                                 DOWNLOAD_INTERRUPT_REASON_NONE, bytes_wasted));
238 
239   // Initial pull from the straw from all source streams.
240   for (auto& source_stream : source_streams_)
241     RegisterAndActivateStream(source_stream.second.get());
242 }
243 
AddInputStream(std::unique_ptr<InputStream> stream,int64_t offset)244 void DownloadFileImpl::AddInputStream(std::unique_ptr<InputStream> stream,
245                                       int64_t offset) {
246   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
247 
248   // UI thread may not be notified about completion and detach download file,
249   // clear up the network request.
250   if (IsDownloadCompleted()) {
251     CancelRequest(offset);
252     return;
253   }
254   DCHECK(source_streams_.find(offset) == source_streams_.end());
255   source_streams_[offset] =
256       std::make_unique<SourceStream>(offset, offset, std::move(stream));
257   OnSourceStreamAdded(source_streams_[offset].get());
258 }
259 
OnSourceStreamAdded(SourceStream * source_stream)260 void DownloadFileImpl::OnSourceStreamAdded(SourceStream* source_stream) {
261   // There are writers at different offsets now, create the received slices
262   // vector if necessary.
263   if (received_slices_.empty() && TotalBytesReceived() > 0) {
264     size_t index = AddOrMergeReceivedSliceIntoSortedArray(
265         DownloadItem::ReceivedSlice(0, TotalBytesReceived()), received_slices_);
266     DCHECK_EQ(index, 0u);
267   }
268   // If the file is initialized, start to write data, or wait until file opened.
269   if (file_.in_progress())
270     RegisterAndActivateStream(source_stream);
271 }
272 
ValidateAndWriteDataToFile(int64_t offset,const char * data,size_t bytes_to_validate,size_t bytes_to_write)273 DownloadInterruptReason DownloadFileImpl::ValidateAndWriteDataToFile(
274     int64_t offset,
275     const char* data,
276     size_t bytes_to_validate,
277     size_t bytes_to_write) {
278   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
279   // Check if some of the data is for validation purpose.
280   if (bytes_to_validate > 0 &&
281       !file_.ValidateDataInFile(offset, data, bytes_to_validate)) {
282     return DOWNLOAD_INTERRUPT_REASON_FILE_HASH_MISMATCH;
283   }
284   // If there is no data to write, just return DOWNLOAD_INTERRUPT_REASON_NONE
285   // and read the next chunk.
286   if (bytes_to_write <= 0)
287     return DOWNLOAD_INTERRUPT_REASON_NONE;
288   // Write the remaining data to disk.
289   WillWriteToDisk(bytes_to_write);
290   return file_.WriteDataToFile(offset + bytes_to_validate,
291                                data + bytes_to_validate, bytes_to_write);
292 }
293 
// Splits |bytes_available_to_write| freshly read bytes from |source_stream|
// into |*bytes_to_validate| (bytes still inside the stream's validation
// range) and |*bytes_to_write| (bytes to write to the file).
//
// Returns true when the stream should be terminated after this chunk, which
// happens when:
//  - the stream has length kNoBytesToWrite (nothing to write),
//  - this chunk would run past the stream's bounded length (the write is
//    capped at that length), or
//  - the stream has written nothing yet and its write position already lies
//    inside a received slice.
bool DownloadFileImpl::CalculateBytesToWrite(SourceStream* source_stream,
                                             size_t bytes_available_to_write,
                                             size_t* bytes_to_validate,
                                             size_t* bytes_to_write) {
  *bytes_to_validate = 0;
  if (source_stream->length() == kNoBytesToWrite) {
    *bytes_to_write = 0;
    return true;
  }

  // First calculate the number of bytes to validate.
  *bytes_to_write = bytes_available_to_write;
  size_t remaining_bytes_to_validate =
      source_stream->GetRemainingBytesToValidate();
  if (remaining_bytes_to_validate > 0) {
    *bytes_to_validate =
        std::min(remaining_bytes_to_validate, bytes_available_to_write);
    *bytes_to_write -= *bytes_to_validate;
  }
  if (source_stream->length() != DownloadSaveInfo::kLengthFullContent &&
      source_stream->bytes_read() +
              static_cast<int64_t>(bytes_available_to_write) >
          source_stream->length()) {
    // Total bytes to consume is capped by the length of the stream.
    int64_t bytes_to_consume =
        source_stream->length() - source_stream->bytes_read();
    // The validation data should always be streamed.
    DCHECK_GE(bytes_to_consume, static_cast<int64_t>(*bytes_to_validate));
    *bytes_to_write = bytes_to_consume - *bytes_to_validate;
    return true;
  }

  // If a new slice finds that its target position has already been written,
  // terminate the stream if there are no bytes to validate.
  if (source_stream->bytes_written() == 0 && *bytes_to_write > 0) {
    for (const auto& received_slice : received_slices_) {
      if (received_slice.offset <=
              source_stream->starting_file_write_offset() &&
          received_slice.offset + received_slice.received_bytes >
              source_stream->starting_file_write_offset()) {
        *bytes_to_write = 0;
        return true;
      }
    }
  }

  return false;
}
342 
RenameAndUniquify(const base::FilePath & full_path,RenameCompletionCallback callback)343 void DownloadFileImpl::RenameAndUniquify(const base::FilePath& full_path,
344                                          RenameCompletionCallback callback) {
345   std::unique_ptr<RenameParameters> parameters(
346       new RenameParameters(UNIQUIFY, full_path, std::move(callback)));
347   RenameWithRetryInternal(std::move(parameters));
348 }
349 
RenameAndAnnotate(const base::FilePath & full_path,const std::string & client_guid,const GURL & source_url,const GURL & referrer_url,mojo::PendingRemote<quarantine::mojom::Quarantine> remote_quarantine,RenameCompletionCallback callback)350 void DownloadFileImpl::RenameAndAnnotate(
351     const base::FilePath& full_path,
352     const std::string& client_guid,
353     const GURL& source_url,
354     const GURL& referrer_url,
355     mojo::PendingRemote<quarantine::mojom::Quarantine> remote_quarantine,
356     RenameCompletionCallback callback) {
357   std::unique_ptr<RenameParameters> parameters(new RenameParameters(
358       ANNOTATE_WITH_SOURCE_INFORMATION, full_path, std::move(callback)));
359   parameters->client_guid = client_guid;
360   parameters->source_url = source_url;
361   parameters->referrer_url = referrer_url;
362   parameters->remote_quarantine = std::move(remote_quarantine);
363   RenameWithRetryInternal(std::move(parameters));
364 }
365 
366 #if defined(OS_ANDROID)
// Android only: moves the file to an intermediate content URI.
//
// |current_path| is reused when it is a content URI that still exists.
// Otherwise DownloadCollectionBridge creates a new intermediate URI from the
// URLs, |file_name| and |mime_type|. |display_name_| is refreshed from the
// URI and falls back to |file_name|. The result goes through
// OnRenameComplete(), with FILE_FAILED if no URI could be obtained.
void DownloadFileImpl::RenameToIntermediateUri(
    const GURL& original_url,
    const GURL& referrer_url,
    const base::FilePath& file_name,
    const std::string& mime_type,
    const base::FilePath& current_path,
    RenameCompletionCallback callback) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  // Create new content URI if |current_path| is not content URI
  // or if it is already deleted.
  base::FilePath content_path =
      current_path.IsContentUri() && base::ContentUriExists(current_path)
          ? current_path
          : DownloadCollectionBridge::CreateIntermediateUriForPublish(
                original_url, referrer_url, file_name, mime_type);
  // Stays FILE_FAILED when no content URI is available.
  DownloadInterruptReason reason = DOWNLOAD_INTERRUPT_REASON_FILE_FAILED;
  if (!content_path.empty()) {
    reason = file_.Rename(content_path);
    display_name_ = DownloadCollectionBridge::GetDisplayName(content_path);
  }
  if (display_name_.empty())
    display_name_ = file_name;
  OnRenameComplete(content_path, std::move(callback), reason);
}
391 
PublishDownload(RenameCompletionCallback callback)392 void DownloadFileImpl::PublishDownload(RenameCompletionCallback callback) {
393   DownloadInterruptReason reason = file_.PublishDownload();
394   OnRenameComplete(file_.full_path(), std::move(callback), reason);
395 }
396 
GetDisplayName()397 base::FilePath DownloadFileImpl::GetDisplayName() {
398   return display_name_;
399 }
400 #endif  // defined(OS_ANDROID)
401 
GetRetryDelayForFailedRename(int attempt_number)402 base::TimeDelta DownloadFileImpl::GetRetryDelayForFailedRename(
403     int attempt_number) {
404   DCHECK_GE(attempt_number, 0);
405   // |delay| starts at kInitialRenameRetryDelayMs and increases by a factor of
406   // 2 at each subsequent retry. Assumes that |retries_left| starts at
407   // kMaxRenameRetries. Also assumes that kMaxRenameRetries is less than the
408   // number of bits in an int.
409   return base::TimeDelta::FromMilliseconds(kInitialRenameRetryDelayMs) *
410          (1 << attempt_number);
411 }
412 
ShouldRetryFailedRename(DownloadInterruptReason reason)413 bool DownloadFileImpl::ShouldRetryFailedRename(DownloadInterruptReason reason) {
414   return reason == DOWNLOAD_INTERRUPT_REASON_FILE_TRANSIENT_ERROR;
415 }
416 
HandleStreamCompletionStatus(SourceStream * source_stream)417 DownloadInterruptReason DownloadFileImpl::HandleStreamCompletionStatus(
418     SourceStream* source_stream) {
419   DownloadInterruptReason reason = source_stream->GetCompletionStatus();
420   if (source_stream->length() == DownloadSaveInfo::kLengthFullContent &&
421       !received_slices_.empty() &&
422       (source_stream->starting_file_write_offset() ==
423        received_slices_.back().offset +
424            received_slices_.back().received_bytes) &&
425       reason == DOWNLOAD_INTERRUPT_REASON_SERVER_NO_RANGE) {
426     // We are probably reaching the end of the stream, don't treat this
427     // as an error.
428     return DOWNLOAD_INTERRUPT_REASON_NONE;
429   }
430   return reason;
431 }
432 
// Performs the rename described by |parameters|.
//
// With UNIQUIFY, the target is replaced by base::GetUniquePath() unless it
// already equals the current path. A transient failure is retried after
// GetRetryDelayForFailedRename() as long as the file is still in progress
// and retries remain. On success with ANNOTATE_WITH_SOURCE_INFORMATION,
// completion is deferred until the annotation finishes. Otherwise
// OnRenameComplete() runs immediately.
void DownloadFileImpl::RenameWithRetryInternal(
    std::unique_ptr<RenameParameters> parameters) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  base::FilePath new_path = parameters->new_path;

  // NOTE(review): base::GetUniquePath() is documented to return an empty
  // path when no unique name is available; the Rename() below would then
  // fail. Confirm that is the intended outcome.
  if ((parameters->option & UNIQUIFY) && new_path != file_.full_path()) {
    new_path = base::GetUniquePath(new_path);
  }

  DownloadInterruptReason reason = file_.Rename(new_path);

  // Attempt to retry the rename if possible. If the rename failed and the
  // subsequent open also failed, then in_progress() would be false. We don't
  // try to retry renames if the in_progress() was false to begin with since we
  // have less assurance that the file at file_.full_path() was the one we were
  // working with.
  if (ShouldRetryFailedRename(reason) && file_.in_progress() &&
      parameters->retries_left > 0) {
    // 0-based index of this retry, assuming |retries_left| started at
    // kMaxRenameRetries.
    int attempt_number = kMaxRenameRetries - parameters->retries_left;
    --parameters->retries_left;
    // NOTE(review): |time_of_first_failure| is set here but not read in this
    // part of the file; confirm it is still consumed elsewhere.
    if (parameters->time_of_first_failure.is_null())
      parameters->time_of_first_failure = base::TimeTicks::Now();
    // |parameters| moves into the delayed task; it must not be used after.
    task_runner_->PostDelayedTask(
        FROM_HERE,
        base::BindOnce(&DownloadFileImpl::RenameWithRetryInternal,
                       weak_factory_.GetWeakPtr(), std::move(parameters)),
        GetRetryDelayForFailedRename(attempt_number));
    return;
  }

  if (reason == DOWNLOAD_INTERRUPT_REASON_NONE &&
      (parameters->option & ANNOTATE_WITH_SOURCE_INFORMATION)) {
    // Doing the annotation after the rename rather than before leaves
    // a very small window during which the file has the final name but
    // hasn't been marked with the Mark Of The Web.  However, it allows
    // anti-virus scanners on Windows to actually see the data
    // (http://crbug.com/127999) under the correct name (which is information
    // it uses).
    //
    // If concurrent downloads with the same target path are allowed, an
    // asynchronous quarantine file may cause a file to be stamped with
    // incorrect mark-of-the-web data. Therefore, fall back to non-service
    // QuarantineFile when kPreventDownloadsWithSamePath is disabled.
    file_.AnnotateWithSourceInformation(
        parameters->client_guid, parameters->source_url,
        parameters->referrer_url, std::move(parameters->remote_quarantine),
        base::BindOnce(&DownloadFileImpl::OnRenameComplete,
                       weak_factory_.GetWeakPtr(), new_path,
                       std::move(parameters->completion_callback)));
    return;
  }

  OnRenameComplete(new_path, std::move(parameters->completion_callback),
                   reason);
}
489 
OnRenameComplete(const base::FilePath & new_path,RenameCompletionCallback callback,DownloadInterruptReason reason)490 void DownloadFileImpl::OnRenameComplete(const base::FilePath& new_path,
491                                         RenameCompletionCallback callback,
492                                         DownloadInterruptReason reason) {
493   if (reason != DOWNLOAD_INTERRUPT_REASON_NONE) {
494     // Make sure our information is updated, since we're about to
495     // error out.
496     SendUpdate();
497 
498     // Null out callback so that we don't do any more stream processing.
499     // The request that writes to the pipe should be canceled after
500     // the download being interrupted.
501     for (auto& stream : source_streams_)
502       stream.second->ClearDataReadyCallback();
503   }
504 
505   main_task_runner_->PostTask(
506       FROM_HERE, base::BindOnce(std::move(callback), reason,
507                                 reason == DOWNLOAD_INTERRUPT_REASON_NONE
508                                     ? new_path
509                                     : base::FilePath()));
510 }
511 
Detach()512 void DownloadFileImpl::Detach() {
513   file_.Detach();
514 }
515 
Cancel()516 void DownloadFileImpl::Cancel() {
517   file_.Cancel();
518 }
519 
SetPotentialFileLength(int64_t length)520 void DownloadFileImpl::SetPotentialFileLength(int64_t length) {
521   DCHECK(potential_file_length_ == length ||
522          potential_file_length_ == kUnknownContentLength)
523       << "Potential file length changed, the download might have updated.";
524 
525   if (length < potential_file_length_ ||
526       potential_file_length_ == kUnknownContentLength) {
527     potential_file_length_ = length;
528   }
529 
530   // TODO(qinmin): interrupt the download if the received bytes are larger
531   // than content length limit.
532   LOG_IF(ERROR, TotalBytesReceived() > potential_file_length_)
533       << "Received data is larger than the content length limit.";
534 }
535 
FullPath() const536 const base::FilePath& DownloadFileImpl::FullPath() const {
537   return file_.full_path();
538 }
539 
InProgress() const540 bool DownloadFileImpl::InProgress() const {
541   return file_.in_progress();
542 }
543 
Pause()544 void DownloadFileImpl::Pause() {
545   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
546   is_paused_ = true;
547   record_stream_bandwidth_ = false;
548   for (auto& stream : source_streams_)
549     stream.second->ClearDataReadyCallback();
550 }
551 
Resume()552 void DownloadFileImpl::Resume() {
553   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
554   DCHECK(is_paused_);
555   is_paused_ = false;
556 
557   for (auto& stream : source_streams_) {
558     SourceStream* source_stream = stream.second.get();
559     if (!source_stream->is_finished())
560       StreamActive(source_stream, MOJO_RESULT_OK);
561   }
562 }
563 
StreamActive(SourceStream * source_stream,MojoResult result)564 void DownloadFileImpl::StreamActive(SourceStream* source_stream,
565                                     MojoResult result) {
566   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
567   if (is_paused_)
568     return;
569 
570   base::TimeTicks start(base::TimeTicks::Now());
571   base::TimeTicks now;
572   scoped_refptr<net::IOBuffer> incoming_data;
573   size_t incoming_data_size = 0;
574   size_t total_incoming_data_size = 0;
575   size_t num_buffers = 0;
576   size_t bytes_to_validate = 0;
577   size_t bytes_to_write = 0;
578   bool should_terminate = false;
579   InputStream::StreamState state(InputStream::EMPTY);
580   DownloadInterruptReason reason = DOWNLOAD_INTERRUPT_REASON_NONE;
581   base::TimeDelta delta(
582       base::TimeDelta::FromMilliseconds(kMaxTimeBlockingFileThreadMs));
583   // Take care of any file local activity required.
584   do {
585     state = source_stream->Read(&incoming_data, &incoming_data_size);
586     switch (state) {
587       case InputStream::EMPTY:
588         should_terminate = (source_stream->length() == kNoBytesToWrite);
589         break;
590       case InputStream::HAS_DATA: {
591         ++num_buffers;
592         should_terminate =
593             CalculateBytesToWrite(source_stream, incoming_data_size,
594                                   &bytes_to_validate, &bytes_to_write);
595         DCHECK_GE(incoming_data_size, bytes_to_write);
596         reason = ValidateAndWriteDataToFile(
597             source_stream->offset() + source_stream->bytes_read(),
598             incoming_data->data(), bytes_to_validate, bytes_to_write);
599         bytes_seen_ += bytes_to_write;
600         total_incoming_data_size += incoming_data_size;
601         if (reason == DOWNLOAD_INTERRUPT_REASON_NONE) {
602           int64_t prev_bytes_written = source_stream->bytes_written();
603           source_stream->OnBytesConsumed(incoming_data_size, bytes_to_write);
604           if (!IsSparseFile())
605             break;
606           // If the write operation creates a new slice, add it to the
607           // |received_slices_| and update all the entries in
608           // |source_streams_|.
609           if (bytes_to_write > 0 && prev_bytes_written == 0) {
610             AddNewSlice(source_stream->starting_file_write_offset(),
611                         bytes_to_write);
612           } else {
613             received_slices_[source_stream->index()].received_bytes +=
614                 bytes_to_write;
615           }
616         }
617       } break;
618       case InputStream::WAIT_FOR_COMPLETION:
619         source_stream->RegisterCompletionCallback(base::BindOnce(
620             &DownloadFileImpl::OnStreamCompleted, weak_factory_.GetWeakPtr()));
621         break;
622       case InputStream::COMPLETE:
623         break;
624       default:
625         NOTREACHED();
626         break;
627     }
628     now = base::TimeTicks::Now();
629   } while (state == InputStream::HAS_DATA &&
630            reason == DOWNLOAD_INTERRUPT_REASON_NONE && now - start <= delta &&
631            !should_terminate);
632 
633   // If we're stopping to yield the thread, post a task so we come back.
634   if (state == InputStream::HAS_DATA && now - start > delta &&
635       !should_terminate) {
636     source_stream->read_stream_callback()->Reset(base::BindOnce(
637         &DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(),
638         source_stream, MOJO_RESULT_OK));
639     task_runner_->PostTask(FROM_HERE,
640                            source_stream->read_stream_callback()->callback());
641   } else if (state == InputStream::EMPTY && !should_terminate) {
642     source_stream->RegisterDataReadyCallback(
643         base::BindRepeating(&DownloadFileImpl::StreamActive, weak_factory_.GetWeakPtr(),
644                    source_stream));
645   }
646 
647   if (state == InputStream::COMPLETE)
648     OnStreamCompleted(source_stream);
649   else
650     NotifyObserver(source_stream, reason, state, should_terminate);
651 
652   TRACE_EVENT_INSTANT2("download", "DownloadStreamDrained",
653                        TRACE_EVENT_SCOPE_THREAD, "stream_size",
654                        total_incoming_data_size, "num_buffers", num_buffers);
655 }
656 
OnStreamCompleted(SourceStream * source_stream)657 void DownloadFileImpl::OnStreamCompleted(SourceStream* source_stream) {
658   DownloadInterruptReason reason = HandleStreamCompletionStatus(source_stream);
659   SendUpdate();
660 
661   NotifyObserver(source_stream, reason, InputStream::COMPLETE, false);
662 }
663 
// Reacts to the outcome of a read pass on |source_stream|.
//
// - A non-NONE |reason| is forwarded to HandleStreamError().
// - If the stream completed or |should_terminate| is set, the stream is
//   marked finished, and its request is cancelled when terminating. For an
//   unbounded stream, its slice is marked finished and, unless terminating,
//   the potential file length is set to where the stream ended. The
//   download then either completes, or SendErrorUpdateIfFinished() is asked
//   to raise a content-length-mismatch error.
// - Any other state is ignored.
void DownloadFileImpl::NotifyObserver(SourceStream* source_stream,
                                      DownloadInterruptReason reason,
                                      InputStream::StreamState stream_state,
                                      bool should_terminate) {
  if (reason != DOWNLOAD_INTERRUPT_REASON_NONE) {
    HandleStreamError(source_stream, reason);
  } else if (stream_state == InputStream::COMPLETE || should_terminate) {
    // Signal successful completion or termination of the current stream.
    source_stream->ClearDataReadyCallback();
    source_stream->set_finished(true);

    if (should_terminate)
      CancelRequest(source_stream->offset());
    if (source_stream->length() == DownloadSaveInfo::kLengthFullContent) {
      // Mark received slice as finished.
      if (IsSparseFile() && source_stream->bytes_written() > 0) {
        DCHECK_GT(received_slices_.size(), source_stream->index())
            << "Received slice index out of bound!";
        received_slices_[source_stream->index()].finished = true;
      }
      // A terminated stream did not reach the end of the content, so its end
      // position says nothing about the file length.
      if (!should_terminate) {
        SetPotentialFileLength(source_stream->offset() +
                               source_stream->bytes_read());
      }
    }
    num_active_streams_--;

    // Inform observers.
    SendUpdate();

    // All the stream reader are completed, shut down file IO processing.
    if (IsDownloadCompleted()) {
      OnDownloadCompleted();
    } else {
      // If all the stream completes and we still not able to complete, trigger
      // a content length mismatch error so auto resumption will be performed.
      SendErrorUpdateIfFinished(
          DOWNLOAD_INTERRUPT_REASON_SERVER_CONTENT_LENGTH_MISMATCH);
    }
  }
}
705 
OnDownloadCompleted()706 void DownloadFileImpl::OnDownloadCompleted() {
707   RecordFileBandwidth(bytes_seen_, base::TimeTicks::Now() - download_start_);
708   if (record_stream_bandwidth_) {
709     RecordParallelizableDownloadStats(
710         bytes_seen_with_parallel_streams_, download_time_with_parallel_streams_,
711         bytes_seen_without_parallel_streams_,
712         download_time_without_parallel_streams_, IsSparseFile());
713   }
714   weak_factory_.InvalidateWeakPtrs();
715   std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
716   update_timer_.reset();
717   main_task_runner_->PostTask(
718       FROM_HERE,
719       base::BindOnce(&DownloadDestinationObserver::DestinationCompleted,
720                      observer_, TotalBytesReceived(), std::move(hash_state)));
721 }
722 
RegisterAndActivateStream(SourceStream * source_stream)723 void DownloadFileImpl::RegisterAndActivateStream(SourceStream* source_stream) {
724   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
725 
726   source_stream->Initialize();
727   // Truncate |source_stream|'s length if necessary.
728   for (const auto& received_slice : received_slices_) {
729     source_stream->TruncateLengthWithWrittenDataBlock(
730         received_slice.offset, received_slice.received_bytes);
731   }
732   num_active_streams_++;
733   StreamActive(source_stream, MOJO_RESULT_OK);
734 }
735 
TotalBytesReceived() const736 int64_t DownloadFileImpl::TotalBytesReceived() const {
737   return file_.bytes_so_far();
738 }
739 
SendUpdate()740 void DownloadFileImpl::SendUpdate() {
741   // TODO(qinmin): For each active stream, add the slice it has written so
742   // far along with received_slices_.
743   main_task_runner_->PostTask(
744       FROM_HERE,
745       base::BindOnce(&DownloadDestinationObserver::DestinationUpdate, observer_,
746                      TotalBytesReceived(), rate_estimator_.GetCountPerSecond(),
747                      received_slices_));
748 }
749 
WillWriteToDisk(size_t data_len)750 void DownloadFileImpl::WillWriteToDisk(size_t data_len) {
751   if (!update_timer_->IsRunning()) {
752     update_timer_->Start(FROM_HERE,
753                          base::TimeDelta::FromMilliseconds(kUpdatePeriodMs),
754                          this, &DownloadFileImpl::SendUpdate);
755   }
756   rate_estimator_.Increment(data_len);
757   base::TimeTicks now = base::TimeTicks::Now();
758   base::TimeDelta time_elapsed = (now - last_update_time_);
759   last_update_time_ = now;
760   if (num_active_streams_ > 1) {
761     download_time_with_parallel_streams_ += time_elapsed;
762     bytes_seen_with_parallel_streams_ += data_len;
763   } else {
764     download_time_without_parallel_streams_ += time_elapsed;
765     bytes_seen_without_parallel_streams_ += data_len;
766   }
767 }
768 
AddNewSlice(int64_t offset,int64_t length)769 void DownloadFileImpl::AddNewSlice(int64_t offset, int64_t length) {
770   size_t index = AddOrMergeReceivedSliceIntoSortedArray(
771       DownloadItem::ReceivedSlice(offset, length), received_slices_);
772   // Check if the slice is added as a new slice, or merged with an existing one.
773   bool slice_added = (offset == received_slices_[index].offset);
774   // Update the index of exising SourceStreams.
775   for (auto& stream : source_streams_) {
776     SourceStream* source_stream = stream.second.get();
777     if (source_stream->starting_file_write_offset() > offset) {
778       if (slice_added && source_stream->bytes_written() > 0)
779         source_stream->set_index(source_stream->index() + 1);
780     } else if (source_stream->starting_file_write_offset() == offset) {
781       source_stream->set_index(index);
782     } else {
783       source_stream->TruncateLengthWithWrittenDataBlock(offset, length);
784     }
785   }
786 }
787 
IsDownloadCompleted()788 bool DownloadFileImpl::IsDownloadCompleted() {
789   for (auto& stream : source_streams_) {
790     if (!stream.second->is_finished())
791       return false;
792   }
793 
794   if (!IsSparseFile())
795     return true;
796 
797   // Verify that all the file slices have been downloaded.
798   std::vector<DownloadItem::ReceivedSlice> slices_to_download =
799       FindSlicesToDownload(received_slices_);
800   if (slices_to_download.size() > 1) {
801     // If there are 1 or more holes in the file, download is not finished.
802     // Some streams might not have been added to |source_streams_| yet.
803     return false;
804   }
805   return TotalBytesReceived() == potential_file_length_;
806 }
807 
HandleStreamError(SourceStream * source_stream,DownloadInterruptReason reason)808 void DownloadFileImpl::HandleStreamError(SourceStream* source_stream,
809                                          DownloadInterruptReason reason) {
810   DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
811   source_stream->ClearDataReadyCallback();
812   source_stream->set_finished(true);
813   num_active_streams_--;
814 
815   bool can_recover_from_error = false;
816   if (reason != DOWNLOAD_INTERRUPT_REASON_FILE_HASH_MISMATCH) {
817     // If previous stream has already written data at the starting offset of
818     // the error stream. The download can complete.
819     can_recover_from_error = (source_stream->length() == kNoBytesToWrite);
820 
821     // See if the previous stream can download the full content.
822     // If the current stream has written some data, length of all preceding
823     // streams will be truncated.
824     if (IsSparseFile() && !can_recover_from_error) {
825       SourceStream* preceding_neighbor = FindPrecedingNeighbor(source_stream);
826       while (preceding_neighbor) {
827         if (CanRecoverFromError(source_stream, preceding_neighbor)) {
828           can_recover_from_error = true;
829           break;
830         }
831 
832         // If the neighbor cannot recover the error and it has already created
833         // a slice, just interrupt the download.
834         if (preceding_neighbor->bytes_written() > 0)
835           break;
836         preceding_neighbor = FindPrecedingNeighbor(preceding_neighbor);
837       }
838     }
839   }
840 
841   SendUpdate();  // Make info up to date before error.
842 
843   // If the download can recover from error, check if it already finishes.
844   // Otherwise, send an error update when all streams are finished.
845   if (!can_recover_from_error)
846     SendErrorUpdateIfFinished(reason);
847   else if (IsDownloadCompleted())
848     OnDownloadCompleted();
849 }
850 
SendErrorUpdateIfFinished(DownloadInterruptReason reason)851 void DownloadFileImpl::SendErrorUpdateIfFinished(
852     DownloadInterruptReason reason) {
853   // If there are still active streams, let them finish before
854   // sending out the error to the observer.
855   if (num_active_streams_ > 0)
856     return;
857 
858   if (IsSparseFile()) {
859     for (const auto& slice : slice_to_download_) {
860       // Ignore last slice beyond file length.
861       if (slice.offset > 0 && slice.offset == potential_file_length_)
862         continue;
863       if (source_streams_.find(slice.offset) == source_streams_.end())
864         return;
865     }
866   }
867 
868   // Error case for both upstream source and file write.
869   // Shut down processing and signal an error to our observer.
870   // Our observer will clean us up.
871   weak_factory_.InvalidateWeakPtrs();
872   std::unique_ptr<crypto::SecureHash> hash_state = file_.Finish();
873   main_task_runner_->PostTask(
874       FROM_HERE,
875       base::BindOnce(&DownloadDestinationObserver::DestinationError, observer_,
876                      reason, TotalBytesReceived(), std::move(hash_state)));
877 }
878 
IsSparseFile() const879 bool DownloadFileImpl::IsSparseFile() const {
880   return source_streams_.size() > 1 || !received_slices_.empty();
881 }
882 
FindPrecedingNeighbor(SourceStream * source_stream)883 DownloadFileImpl::SourceStream* DownloadFileImpl::FindPrecedingNeighbor(
884     SourceStream* source_stream) {
885   int64_t max_preceding_offset = 0;
886   SourceStream* ret = nullptr;
887   for (auto& stream : source_streams_) {
888     int64_t offset = stream.second->starting_file_write_offset();
889     if (offset < source_stream->starting_file_write_offset() &&
890         offset >= max_preceding_offset) {
891       ret = stream.second.get();
892       max_preceding_offset = offset;
893     }
894   }
895   return ret;
896 }
897 
CancelRequest(int64_t offset)898 void DownloadFileImpl::CancelRequest(int64_t offset) {
899   if (!cancel_request_callback_.is_null()) {
900     main_task_runner_->PostTask(
901         FROM_HERE, base::BindOnce(cancel_request_callback_, offset));
902   }
903 }
904 
DebugStates() const905 void DownloadFileImpl::DebugStates() const {
906   DVLOG(1) << "### Debugging DownloadFile states:";
907   DVLOG(1) << "Total source stream count = " << source_streams_.size();
908   for (const auto& stream : source_streams_) {
909     DVLOG(1) << "Source stream, offset = " << stream.second->offset()
910              << " , bytes_read = " << stream.second->bytes_read()
911              << " , starting_file_write_offset = "
912              << stream.second->starting_file_write_offset()
913              << " , bytes_written = " << stream.second->bytes_written()
914              << " , is_finished = " << stream.second->is_finished()
915              << " , length = " << stream.second->length()
916              << ", index = " << stream.second->index();
917   }
918 
919   DebugSlicesInfo(received_slices_);
920 }
921 
SetTaskRunnerForTesting(scoped_refptr<base::SequencedTaskRunner> task_runner)922 void DownloadFileImpl::SetTaskRunnerForTesting(
923     scoped_refptr<base::SequencedTaskRunner> task_runner) {
924   task_runner_ = std::move(task_runner);
925 }
926 
RenameParameters(RenameOption option,const base::FilePath & new_path,RenameCompletionCallback completion_callback)927 DownloadFileImpl::RenameParameters::RenameParameters(
928     RenameOption option,
929     const base::FilePath& new_path,
930     RenameCompletionCallback completion_callback)
931     : option(option),
932       new_path(new_path),
933       retries_left(kMaxRenameRetries),
934       completion_callback(std::move(completion_callback)) {}
935 
~RenameParameters()936 DownloadFileImpl::RenameParameters::~RenameParameters() {}
937 
938 }  // namespace download
939