1 // Copyright 2019 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "content/browser/indexed_db/file_stream_reader_to_data_pipe.h"
6
7 #include "base/bind.h"
8 #include "net/base/net_errors.h"
9
10 namespace content {
11
FileStreamReaderToDataPipe(std::unique_ptr<storage::FileStreamReader> reader,mojo::ScopedDataPipeProducerHandle dest)12 FileStreamReaderToDataPipe::FileStreamReaderToDataPipe(
13 std::unique_ptr<storage::FileStreamReader> reader,
14 mojo::ScopedDataPipeProducerHandle dest)
15 : reader_(std::move(reader)), dest_(std::move(dest)) {}
16
17 FileStreamReaderToDataPipe::~FileStreamReaderToDataPipe() = default;
18
Start(base::OnceCallback<void (int)> completion_callback,uint64_t read_length)19 void FileStreamReaderToDataPipe::Start(
20 base::OnceCallback<void(int)> completion_callback,
21 uint64_t read_length) {
22 DCHECK(!writable_handle_watcher_.has_value());
23 writable_handle_watcher_.emplace(FROM_HERE,
24 mojo::SimpleWatcher::ArmingPolicy::MANUAL,
25 base::SequencedTaskRunnerHandle::Get());
26 writable_handle_watcher_->Watch(
27 dest_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
28 base::BindRepeating(&FileStreamReaderToDataPipe::OnDataPipeWritable,
29 base::Unretained(this)));
30
31 read_length_ = read_length;
32 completion_callback_ = std::move(completion_callback);
33 ReadMore();
34 }
35
ReadMore()36 void FileStreamReaderToDataPipe::ReadMore() {
37 DCHECK(!pending_write_.get());
38
39 uint32_t num_bytes;
40 MojoResult mojo_result = network::NetToMojoPendingBuffer::BeginWrite(
41 &dest_, &pending_write_, &num_bytes);
42 if (mojo_result == MOJO_RESULT_SHOULD_WAIT) {
43 // The pipe is full. We need to wait for it to have more space.
44 writable_handle_watcher_->ArmOrNotify();
45 return;
46 } else if (mojo_result == MOJO_RESULT_FAILED_PRECONDITION) {
47 // The data pipe consumer handle has been closed.
48 OnComplete(net::ERR_ABORTED);
49 return;
50 } else if (mojo_result != MOJO_RESULT_OK) {
51 // The body stream is in a bad state. Bail out.
52 OnComplete(net::ERR_UNEXPECTED);
53 return;
54 }
55
56 uint64_t read_bytes = std::min(static_cast<uint64_t>(num_bytes),
57 read_length_ - transferred_bytes_);
58
59 scoped_refptr<net::IOBuffer> buffer(
60 new network::NetToMojoIOBuffer(pending_write_.get()));
61 int result =
62 reader_->Read(buffer.get(), base::checked_cast<int>(read_bytes),
63 base::BindRepeating(&FileStreamReaderToDataPipe::DidRead,
64 base::Unretained(this)));
65
66 if (result != net::ERR_IO_PENDING)
67 DidRead(result);
68 }
69
DidRead(int result)70 void FileStreamReaderToDataPipe::DidRead(int result) {
71 DCHECK(pending_write_);
72 if (result <= 0) {
73 // An error, or end of the stream.
74 pending_write_->Complete(0); // Closes the data pipe.
75 OnComplete(result);
76 return;
77 }
78
79 dest_ = pending_write_->Complete(result);
80 transferred_bytes_ += result;
81
82 if (transferred_bytes_ >= read_length_) {
83 OnComplete(net::OK);
84 return;
85 }
86
87 pending_write_ = nullptr;
88
89 base::SequencedTaskRunnerHandle::Get()->PostTask(
90 FROM_HERE, base::BindOnce(&FileStreamReaderToDataPipe::ReadMore,
91 weak_factory_.GetWeakPtr()));
92 }
93
OnDataPipeWritable(MojoResult result)94 void FileStreamReaderToDataPipe::OnDataPipeWritable(MojoResult result) {
95 if (result == MOJO_RESULT_FAILED_PRECONDITION) {
96 OnComplete(net::ERR_ABORTED);
97 return;
98 }
99 DCHECK_EQ(result, MOJO_RESULT_OK) << result;
100
101 ReadMore();
102 }
103
OnComplete(int result)104 void FileStreamReaderToDataPipe::OnComplete(int result) {
105 // Resets the watchers, pipes and the exchange handler, so that
106 // we will never be called back.
107 writable_handle_watcher_->Cancel();
108 pending_write_ = nullptr;
109 dest_.reset();
110
111 std::move(completion_callback_).Run(result);
112 }
113
114 } // namespace content
115