1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "content/browser/indexed_db/file_stream_reader_to_data_pipe.h"
6 
7 #include "base/bind.h"
8 #include "net/base/net_errors.h"
9 
10 namespace content {
11 
FileStreamReaderToDataPipe(std::unique_ptr<storage::FileStreamReader> reader,mojo::ScopedDataPipeProducerHandle dest)12 FileStreamReaderToDataPipe::FileStreamReaderToDataPipe(
13     std::unique_ptr<storage::FileStreamReader> reader,
14     mojo::ScopedDataPipeProducerHandle dest)
15     : reader_(std::move(reader)), dest_(std::move(dest)) {}
16 
17 FileStreamReaderToDataPipe::~FileStreamReaderToDataPipe() = default;
18 
Start(base::OnceCallback<void (int)> completion_callback,uint64_t read_length)19 void FileStreamReaderToDataPipe::Start(
20     base::OnceCallback<void(int)> completion_callback,
21     uint64_t read_length) {
22   DCHECK(!writable_handle_watcher_.has_value());
23   writable_handle_watcher_.emplace(FROM_HERE,
24                                    mojo::SimpleWatcher::ArmingPolicy::MANUAL,
25                                    base::SequencedTaskRunnerHandle::Get());
26   writable_handle_watcher_->Watch(
27       dest_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
28       base::BindRepeating(&FileStreamReaderToDataPipe::OnDataPipeWritable,
29                           base::Unretained(this)));
30 
31   read_length_ = read_length;
32   completion_callback_ = std::move(completion_callback);
33   ReadMore();
34 }
35 
ReadMore()36 void FileStreamReaderToDataPipe::ReadMore() {
37   DCHECK(!pending_write_.get());
38 
39   uint32_t num_bytes;
40   MojoResult mojo_result = network::NetToMojoPendingBuffer::BeginWrite(
41       &dest_, &pending_write_, &num_bytes);
42   if (mojo_result == MOJO_RESULT_SHOULD_WAIT) {
43     // The pipe is full.  We need to wait for it to have more space.
44     writable_handle_watcher_->ArmOrNotify();
45     return;
46   } else if (mojo_result == MOJO_RESULT_FAILED_PRECONDITION) {
47     // The data pipe consumer handle has been closed.
48     OnComplete(net::ERR_ABORTED);
49     return;
50   } else if (mojo_result != MOJO_RESULT_OK) {
51     // The body stream is in a bad state. Bail out.
52     OnComplete(net::ERR_UNEXPECTED);
53     return;
54   }
55 
56   uint64_t read_bytes = std::min(static_cast<uint64_t>(num_bytes),
57                                  read_length_ - transferred_bytes_);
58 
59   scoped_refptr<net::IOBuffer> buffer(
60       new network::NetToMojoIOBuffer(pending_write_.get()));
61   int result =
62       reader_->Read(buffer.get(), base::checked_cast<int>(read_bytes),
63                     base::BindRepeating(&FileStreamReaderToDataPipe::DidRead,
64                                         base::Unretained(this)));
65 
66   if (result != net::ERR_IO_PENDING)
67     DidRead(result);
68 }
69 
DidRead(int result)70 void FileStreamReaderToDataPipe::DidRead(int result) {
71   DCHECK(pending_write_);
72   if (result <= 0) {
73     // An error, or end of the stream.
74     pending_write_->Complete(0);  // Closes the data pipe.
75     OnComplete(result);
76     return;
77   }
78 
79   dest_ = pending_write_->Complete(result);
80   transferred_bytes_ += result;
81 
82   if (transferred_bytes_ >= read_length_) {
83     OnComplete(net::OK);
84     return;
85   }
86 
87   pending_write_ = nullptr;
88 
89   base::SequencedTaskRunnerHandle::Get()->PostTask(
90       FROM_HERE, base::BindOnce(&FileStreamReaderToDataPipe::ReadMore,
91                                 weak_factory_.GetWeakPtr()));
92 }
93 
OnDataPipeWritable(MojoResult result)94 void FileStreamReaderToDataPipe::OnDataPipeWritable(MojoResult result) {
95   if (result == MOJO_RESULT_FAILED_PRECONDITION) {
96     OnComplete(net::ERR_ABORTED);
97     return;
98   }
99   DCHECK_EQ(result, MOJO_RESULT_OK) << result;
100 
101   ReadMore();
102 }
103 
OnComplete(int result)104 void FileStreamReaderToDataPipe::OnComplete(int result) {
105   // Resets the watchers, pipes and the exchange handler, so that
106   // we will never be called back.
107   writable_handle_watcher_->Cancel();
108   pending_write_ = nullptr;
109   dest_.reset();
110 
111   std::move(completion_callback_).Run(result);
112 }
113 
114 }  // namespace content
115