1 // Copyright 2017 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 #include "services/network/chunked_data_pipe_upload_data_stream.h"
6 
7 #include "base/bind.h"
8 #include "base/callback.h"
9 #include "base/location.h"
10 #include "base/logging.h"
11 #include "mojo/public/c/system/types.h"
12 #include "net/base/io_buffer.h"
13 
14 namespace network {
15 
ChunkedDataPipeUploadDataStream(scoped_refptr<ResourceRequestBody> resource_request_body,mojo::PendingRemote<mojom::ChunkedDataPipeGetter> chunked_data_pipe_getter)16 ChunkedDataPipeUploadDataStream::ChunkedDataPipeUploadDataStream(
17     scoped_refptr<ResourceRequestBody> resource_request_body,
18     mojo::PendingRemote<mojom::ChunkedDataPipeGetter> chunked_data_pipe_getter)
19     : net::UploadDataStream(true /* is_chunked */,
20                             resource_request_body->identifier()),
21       resource_request_body_(std::move(resource_request_body)),
22       chunked_data_pipe_getter_(std::move(chunked_data_pipe_getter)),
23       handle_watcher_(FROM_HERE,
24                       mojo::SimpleWatcher::ArmingPolicy::MANUAL,
25                       base::SequencedTaskRunnerHandle::Get()) {
26   chunked_data_pipe_getter_.set_disconnect_handler(
27       base::BindOnce(&ChunkedDataPipeUploadDataStream::OnDataPipeGetterClosed,
28                      base::Unretained(this)));
29   chunked_data_pipe_getter_->GetSize(
30       base::BindOnce(&ChunkedDataPipeUploadDataStream::OnSizeReceived,
31                      base::Unretained(this)));
32 }
33 
~ChunkedDataPipeUploadDataStream()34 ChunkedDataPipeUploadDataStream::~ChunkedDataPipeUploadDataStream() {}
35 
InitInternal(const net::NetLogWithSource & net_log)36 int ChunkedDataPipeUploadDataStream::InitInternal(
37     const net::NetLogWithSource& net_log) {
38   // If there was an error either passed to the ReadCallback or as a result of
39   // closing the DataPipeGetter pipe, fail the read.
40   if (status_ != net::OK)
41     return status_;
42 
43   // If the data pipe was closed, just fail initialization.
44   if (!chunked_data_pipe_getter_.is_connected())
45     return net::ERR_FAILED;
46 
47   // Get a new data pipe and start.
48   mojo::DataPipe data_pipe;
49   chunked_data_pipe_getter_->StartReading(std::move(data_pipe.producer_handle));
50   data_pipe_ = std::move(data_pipe.consumer_handle);
51 
52   return net::OK;
53 }
54 
ReadInternal(net::IOBuffer * buf,int buf_len)55 int ChunkedDataPipeUploadDataStream::ReadInternal(net::IOBuffer* buf,
56                                                   int buf_len) {
57   DCHECK(!buf_);
58   DCHECK(buf);
59   DCHECK_GT(buf_len, 0);
60 
61   // If there was an error either passed to the ReadCallback or as a result of
62   // closing the DataPipeGetter pipe, fail the read.
63   if (status_ != net::OK)
64     return status_;
65 
66   // Nothing else to do, if the entire body was read.
67   if (size_ && bytes_read_ == *size_) {
68     // This shouldn't be called if the stream was already completed.
69     DCHECK(!IsEOF());
70 
71     SetIsFinalChunk();
72     return net::OK;
73   }
74 
75   // Only start watching once a read starts. This is because OnHandleReadable()
76   // uses |buf_| implicitly assuming that this method has already been called.
77   if (!handle_watcher_.IsWatching()) {
78     handle_watcher_.Watch(
79         data_pipe_.get(),
80         MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
81         base::BindRepeating(&ChunkedDataPipeUploadDataStream::OnHandleReadable,
82                             base::Unretained(this)));
83   }
84 
85   uint32_t num_bytes = buf_len;
86   if (size_ && num_bytes > *size_ - bytes_read_)
87     num_bytes = *size_ - bytes_read_;
88   MojoResult rv =
89       data_pipe_->ReadData(buf->data(), &num_bytes, MOJO_READ_DATA_FLAG_NONE);
90   if (rv == MOJO_RESULT_OK) {
91     bytes_read_ += num_bytes;
92     // Not needed for correctness, but this allows the consumer to send the
93     // final chunk and the end of stream message together, for protocols that
94     // allow it.
95     if (size_ && *size_ == bytes_read_)
96       SetIsFinalChunk();
97     return num_bytes;
98   }
99 
100   if (rv == MOJO_RESULT_SHOULD_WAIT) {
101     handle_watcher_.ArmOrNotify();
102     buf_ = buf;
103     buf_len_ = buf_len;
104     return net::ERR_IO_PENDING;
105   }
106 
107   // The pipe was closed. If the size isn't known yet, could be a success or a
108   // failure.
109   if (!size_) {
110     // Need to keep the buffer around because its presence is used to indicate
111     // that there's a pending UploadDataStream read.
112     buf_ = buf;
113     buf_len_ = buf_len;
114 
115     handle_watcher_.Cancel();
116     data_pipe_.reset();
117     return net::ERR_IO_PENDING;
118   }
119 
120   // |size_| was checked earlier, so if this point is reached, the pipe was
121   // closed before receiving all bytes.
122   DCHECK_LT(bytes_read_, *size_);
123 
124   return net::ERR_FAILED;
125 }
126 
ResetInternal()127 void ChunkedDataPipeUploadDataStream::ResetInternal() {
128   // Init rewinds the stream. Throw away current state, other than |size_| and
129   // |status_|.
130   buf_ = nullptr;
131   buf_len_ = 0;
132   handle_watcher_.Cancel();
133   bytes_read_ = 0;
134   data_pipe_.reset();
135 }
136 
OnSizeReceived(int32_t status,uint64_t size)137 void ChunkedDataPipeUploadDataStream::OnSizeReceived(int32_t status,
138                                                      uint64_t size) {
139   DCHECK(!size_);
140   DCHECK_EQ(net::OK, status_);
141 
142   status_ = status;
143   if (status == net::OK) {
144     size_ = size;
145     if (size == bytes_read_) {
146       // Only set this as a final chunk if there's a read in progress. Setting
147       // it asynchronously could result in confusing consumers.
148       if (buf_)
149         SetIsFinalChunk();
150     } else if (size < bytes_read_ || (buf_ && !data_pipe_.is_valid())) {
151       // If more data was received than was expected, or there's a pending read
152       // and data pipe was closed without passing in as many bytes as expected,
153       // the upload can't continue.  If there's no pending read but the pipe was
154       // closed, the closure and size difference will be noticed on the next
155       // read attempt.
156       status_ = net::ERR_FAILED;
157     }
158   }
159 
160   // If this is done, and there's a pending read, complete the pending read.
161   // If there's not a pending read, either |status_| will be reported on the
162   // next read, the file will be marked as done, so ReadInternal() won't be
163   // called again.
164   if (buf_ && (IsEOF() || status_ != net::OK)) {
165     // |data_pipe_| isn't needed any more, and if it's still open, a close pipe
166     // message would cause issues, since this class normally only watches the
167     // pipe when there's a pending read.
168     handle_watcher_.Cancel();
169     data_pipe_.reset();
170     // Clear |buf_| as well, so it's only non-null while there's a pending read.
171     buf_ = nullptr;
172     buf_len_ = 0;
173 
174     OnReadCompleted(status_);
175 
176     // |this| may have been deleted at this point.
177   }
178 }
179 
OnHandleReadable(MojoResult result)180 void ChunkedDataPipeUploadDataStream::OnHandleReadable(MojoResult result) {
181   DCHECK(buf_);
182 
183   // Final result of the Read() call, to be passed to the consumer.
184   // Swap out |buf_| and |buf_len_|
185   scoped_refptr<net::IOBuffer> buf(std::move(buf_));
186   int buf_len = buf_len_;
187   buf_len_ = 0;
188 
189   int rv = ReadInternal(buf.get(), buf_len);
190 
191   if (rv != net::ERR_IO_PENDING)
192     OnReadCompleted(rv);
193 
194   // |this| may have been deleted at this point.
195 }
196 
OnDataPipeGetterClosed()197 void ChunkedDataPipeUploadDataStream::OnDataPipeGetterClosed() {
198   // If the size hasn't been received yet, treat this as receiving an error.
199   // Otherwise, this will only be a problem if/when InitInternal() tries to
200   // start reading again, so do nothing.
201   if (!size_)
202     OnSizeReceived(net::ERR_FAILED, 0);
203 }
204 
205 }  // namespace network
206