1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "services/network/data_pipe_element_reader.h"
6 
7 #include "base/bind.h"
8 #include "base/callback.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "mojo/public/c/system/types.h"
12 #include "net/base/io_buffer.h"
13 #include "net/base/net_errors.h"
14 
15 namespace network {
16 
DataPipeElementReader(scoped_refptr<ResourceRequestBody> resource_request_body,mojo::PendingRemote<mojom::DataPipeGetter> data_pipe_getter)17 DataPipeElementReader::DataPipeElementReader(
18     scoped_refptr<ResourceRequestBody> resource_request_body,
19     mojo::PendingRemote<mojom::DataPipeGetter> data_pipe_getter)
20     : resource_request_body_(std::move(resource_request_body)),
21       data_pipe_getter_(std::move(data_pipe_getter)),
22       handle_watcher_(FROM_HERE,
23                       mojo::SimpleWatcher::ArmingPolicy::MANUAL,
24                       base::SequencedTaskRunnerHandle::Get()) {}
25 
~DataPipeElementReader()26 DataPipeElementReader::~DataPipeElementReader() {}
27 
Init(net::CompletionOnceCallback callback)28 int DataPipeElementReader::Init(net::CompletionOnceCallback callback) {
29   DCHECK(callback);
30 
31   // Init rewinds the stream. Throw away current state.
32   read_callback_.Reset();
33   buf_ = nullptr;
34   buf_length_ = 0;
35   handle_watcher_.Cancel();
36   size_ = 0;
37   bytes_read_ = 0;
38   // Need to do this to prevent any previously pending ReadCallback() invocation
39   // from running.
40   weak_factory_.InvalidateWeakPtrs();
41 
42   // Get a new data pipe and start.
43   mojo::DataPipe data_pipe;
44   data_pipe_getter_->Read(std::move(data_pipe.producer_handle),
45                           base::BindOnce(&DataPipeElementReader::ReadCallback,
46                                          weak_factory_.GetWeakPtr()));
47   data_pipe_ = std::move(data_pipe.consumer_handle);
48   handle_watcher_.Watch(
49       data_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
50       base::BindRepeating(&DataPipeElementReader::OnHandleReadable,
51                           base::Unretained(this)));
52 
53   init_callback_ = std::move(callback);
54   return net::ERR_IO_PENDING;
55 }
56 
GetContentLength() const57 uint64_t DataPipeElementReader::GetContentLength() const {
58   return size_;
59 }
60 
BytesRemaining() const61 uint64_t DataPipeElementReader::BytesRemaining() const {
62   return size_ - bytes_read_;
63 }
64 
Read(net::IOBuffer * buf,int buf_length,net::CompletionOnceCallback callback)65 int DataPipeElementReader::Read(net::IOBuffer* buf,
66                                 int buf_length,
67                                 net::CompletionOnceCallback callback) {
68   DCHECK(callback);
69   DCHECK(!read_callback_);
70   DCHECK(!init_callback_);
71   DCHECK(!buf_);
72 
73   int result = ReadInternal(buf, buf_length);
74   if (result == net::ERR_IO_PENDING) {
75     buf_ = buf;
76     buf_length_ = buf_length;
77     read_callback_ = std::move(callback);
78   }
79   return result;
80 }
81 
ReadCallback(int32_t status,uint64_t size)82 void DataPipeElementReader::ReadCallback(int32_t status, uint64_t size) {
83   if (status == net::OK)
84     size_ = size;
85   if (init_callback_)
86     std::move(init_callback_).Run(status);
87 }
88 
OnHandleReadable(MojoResult result)89 void DataPipeElementReader::OnHandleReadable(MojoResult result) {
90   DCHECK(read_callback_);
91   DCHECK(buf_);
92 
93   // Final result of the Read() call, to be passed to the consumer.
94   int read_result;
95   if (result == MOJO_RESULT_OK) {
96     read_result = ReadInternal(buf_.get(), buf_length_);
97   } else {
98     read_result = net::ERR_FAILED;
99   }
100 
101   buf_ = nullptr;
102   buf_length_ = 0;
103 
104   if (read_result != net::ERR_IO_PENDING)
105     std::move(read_callback_).Run(read_result);
106 }
107 
ReadInternal(net::IOBuffer * buf,int buf_length)108 int DataPipeElementReader::ReadInternal(net::IOBuffer* buf, int buf_length) {
109   DCHECK(buf);
110   DCHECK_GT(buf_length, 0);
111 
112   if (BytesRemaining() == 0)
113     return net::OK;
114 
115   uint32_t num_bytes = buf_length;
116   MojoResult rv =
117       data_pipe_->ReadData(buf->data(), &num_bytes, MOJO_READ_DATA_FLAG_NONE);
118   if (rv == MOJO_RESULT_OK) {
119     bytes_read_ += num_bytes;
120     return num_bytes;
121   }
122 
123   if (rv == MOJO_RESULT_SHOULD_WAIT) {
124     handle_watcher_.ArmOrNotify();
125     return net::ERR_IO_PENDING;
126   }
127 
128   return net::ERR_FAILED;
129 }
130 
131 }  // namespace network
132