1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "services/network/data_pipe_element_reader.h"
6
7 #include "base/bind.h"
8 #include "base/callback.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "mojo/public/c/system/types.h"
12 #include "net/base/io_buffer.h"
13 #include "net/base/net_errors.h"
14
15 namespace network {
16
DataPipeElementReader(scoped_refptr<ResourceRequestBody> resource_request_body,mojo::PendingRemote<mojom::DataPipeGetter> data_pipe_getter)17 DataPipeElementReader::DataPipeElementReader(
18 scoped_refptr<ResourceRequestBody> resource_request_body,
19 mojo::PendingRemote<mojom::DataPipeGetter> data_pipe_getter)
20 : resource_request_body_(std::move(resource_request_body)),
21 data_pipe_getter_(std::move(data_pipe_getter)),
22 handle_watcher_(FROM_HERE,
23 mojo::SimpleWatcher::ArmingPolicy::MANUAL,
24 base::SequencedTaskRunnerHandle::Get()) {}
25
~DataPipeElementReader()26 DataPipeElementReader::~DataPipeElementReader() {}
27
Init(net::CompletionOnceCallback callback)28 int DataPipeElementReader::Init(net::CompletionOnceCallback callback) {
29 DCHECK(callback);
30
31 // Init rewinds the stream. Throw away current state.
32 read_callback_.Reset();
33 buf_ = nullptr;
34 buf_length_ = 0;
35 handle_watcher_.Cancel();
36 size_ = 0;
37 bytes_read_ = 0;
38 // Need to do this to prevent any previously pending ReadCallback() invocation
39 // from running.
40 weak_factory_.InvalidateWeakPtrs();
41
42 // Get a new data pipe and start.
43 mojo::DataPipe data_pipe;
44 data_pipe_getter_->Read(std::move(data_pipe.producer_handle),
45 base::BindOnce(&DataPipeElementReader::ReadCallback,
46 weak_factory_.GetWeakPtr()));
47 data_pipe_ = std::move(data_pipe.consumer_handle);
48 handle_watcher_.Watch(
49 data_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
50 base::BindRepeating(&DataPipeElementReader::OnHandleReadable,
51 base::Unretained(this)));
52
53 init_callback_ = std::move(callback);
54 return net::ERR_IO_PENDING;
55 }
56
GetContentLength() const57 uint64_t DataPipeElementReader::GetContentLength() const {
58 return size_;
59 }
60
BytesRemaining() const61 uint64_t DataPipeElementReader::BytesRemaining() const {
62 return size_ - bytes_read_;
63 }
64
Read(net::IOBuffer * buf,int buf_length,net::CompletionOnceCallback callback)65 int DataPipeElementReader::Read(net::IOBuffer* buf,
66 int buf_length,
67 net::CompletionOnceCallback callback) {
68 DCHECK(callback);
69 DCHECK(!read_callback_);
70 DCHECK(!init_callback_);
71 DCHECK(!buf_);
72
73 int result = ReadInternal(buf, buf_length);
74 if (result == net::ERR_IO_PENDING) {
75 buf_ = buf;
76 buf_length_ = buf_length;
77 read_callback_ = std::move(callback);
78 }
79 return result;
80 }
81
ReadCallback(int32_t status,uint64_t size)82 void DataPipeElementReader::ReadCallback(int32_t status, uint64_t size) {
83 if (status == net::OK)
84 size_ = size;
85 if (init_callback_)
86 std::move(init_callback_).Run(status);
87 }
88
OnHandleReadable(MojoResult result)89 void DataPipeElementReader::OnHandleReadable(MojoResult result) {
90 DCHECK(read_callback_);
91 DCHECK(buf_);
92
93 // Final result of the Read() call, to be passed to the consumer.
94 int read_result;
95 if (result == MOJO_RESULT_OK) {
96 read_result = ReadInternal(buf_.get(), buf_length_);
97 } else {
98 read_result = net::ERR_FAILED;
99 }
100
101 buf_ = nullptr;
102 buf_length_ = 0;
103
104 if (read_result != net::ERR_IO_PENDING)
105 std::move(read_callback_).Run(read_result);
106 }
107
ReadInternal(net::IOBuffer * buf,int buf_length)108 int DataPipeElementReader::ReadInternal(net::IOBuffer* buf, int buf_length) {
109 DCHECK(buf);
110 DCHECK_GT(buf_length, 0);
111
112 if (BytesRemaining() == 0)
113 return net::OK;
114
115 uint32_t num_bytes = buf_length;
116 MojoResult rv =
117 data_pipe_->ReadData(buf->data(), &num_bytes, MOJO_READ_DATA_FLAG_NONE);
118 if (rv == MOJO_RESULT_OK) {
119 bytes_read_ += num_bytes;
120 return num_bytes;
121 }
122
123 if (rv == MOJO_RESULT_SHOULD_WAIT) {
124 handle_watcher_.ArmOrNotify();
125 return net::ERR_IO_PENDING;
126 }
127
128 return net::ERR_FAILED;
129 }
130
131 } // namespace network
132