1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5 #include "nsStreamTransportService.h"
6 #include "nsXPCOMCIDInternal.h"
7 #include "nsNetSegmentUtils.h"
8 #include "nsTransportUtils.h"
9 #include "nsStreamUtils.h"
10 #include "nsError.h"
11 #include "nsNetCID.h"
12
13 #include "nsIAsyncInputStream.h"
14 #include "nsIAsyncOutputStream.h"
15 #include "nsISeekableStream.h"
16 #include "nsIPipe.h"
17 #include "nsITransport.h"
18 #include "nsIObserverService.h"
19 #include "nsIThreadPool.h"
20 #include "mozilla/Services.h"
21
22 namespace mozilla {
23 namespace net {
24
//-----------------------------------------------------------------------------
// nsInputStreamTransport
//
// Implements nsIInputStream as a wrapper around the real input stream. This
// allows the transport to support progress reporting and close-when-done
// semantics while utilizing NS_AsyncCopy.
//-----------------------------------------------------------------------------
32
33 class nsInputStreamTransport : public nsITransport, public nsIInputStream {
34 public:
35 NS_DECL_THREADSAFE_ISUPPORTS
36 NS_DECL_NSITRANSPORT
37 NS_DECL_NSIINPUTSTREAM
38
nsInputStreamTransport(nsIInputStream * source,bool closeWhenDone)39 nsInputStreamTransport(nsIInputStream *source, bool closeWhenDone)
40 : mSource(source),
41 mOffset(0),
42 mCloseWhenDone(closeWhenDone),
43 mInProgress(false) {}
44
45 private:
~nsInputStreamTransport()46 virtual ~nsInputStreamTransport() {}
47
48 nsCOMPtr<nsIAsyncInputStream> mPipeIn;
49
50 // while the copy is active, these members may only be accessed from the
51 // nsIInputStream implementation.
52 nsCOMPtr<nsITransportEventSink> mEventSink;
53 nsCOMPtr<nsIInputStream> mSource;
54 int64_t mOffset;
55 bool mCloseWhenDone;
56
57 // this variable serves as a lock to prevent the state of the transport
58 // from being modified once the copy is in progress.
59 bool mInProgress;
60 };
61
NS_IMPL_ISUPPORTS(nsInputStreamTransport,nsITransport,nsIInputStream)62 NS_IMPL_ISUPPORTS(nsInputStreamTransport, nsITransport, nsIInputStream)
63
64 /** nsITransport **/
65
66 NS_IMETHODIMP
67 nsInputStreamTransport::OpenInputStream(uint32_t flags, uint32_t segsize,
68 uint32_t segcount,
69 nsIInputStream **result) {
70 NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
71
72 nsresult rv;
73 nsCOMPtr<nsIEventTarget> target =
74 do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
75 if (NS_FAILED(rv)) return rv;
76
77 // XXX if the caller requests an unbuffered stream, then perhaps
78 // we'd want to simply return mSource; however, then we would
79 // not be reading mSource on a background thread. is this ok?
80
81 bool nonblocking = !(flags & OPEN_BLOCKING);
82
83 net_ResolveSegmentParams(segsize, segcount);
84
85 nsCOMPtr<nsIAsyncOutputStream> pipeOut;
86 rv = NS_NewPipe2(getter_AddRefs(mPipeIn), getter_AddRefs(pipeOut),
87 nonblocking, true, segsize, segcount);
88 if (NS_FAILED(rv)) return rv;
89
90 mInProgress = true;
91
92 // startup async copy process...
93 rv = NS_AsyncCopy(this, pipeOut, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS,
94 segsize);
95 if (NS_SUCCEEDED(rv)) NS_ADDREF(*result = mPipeIn);
96
97 return rv;
98 }
99
100 NS_IMETHODIMP
OpenOutputStream(uint32_t flags,uint32_t segsize,uint32_t segcount,nsIOutputStream ** result)101 nsInputStreamTransport::OpenOutputStream(uint32_t flags, uint32_t segsize,
102 uint32_t segcount,
103 nsIOutputStream **result) {
104 // this transport only supports reading!
105 NS_NOTREACHED("nsInputStreamTransport::OpenOutputStream");
106 return NS_ERROR_UNEXPECTED;
107 }
108
109 NS_IMETHODIMP
Close(nsresult reason)110 nsInputStreamTransport::Close(nsresult reason) {
111 if (NS_SUCCEEDED(reason)) reason = NS_BASE_STREAM_CLOSED;
112
113 return mPipeIn->CloseWithStatus(reason);
114 }
115
116 NS_IMETHODIMP
SetEventSink(nsITransportEventSink * sink,nsIEventTarget * target)117 nsInputStreamTransport::SetEventSink(nsITransportEventSink *sink,
118 nsIEventTarget *target) {
119 NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
120
121 if (target)
122 return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink), sink,
123 target);
124
125 mEventSink = sink;
126 return NS_OK;
127 }
128
129 /** nsIInputStream **/
130
131 NS_IMETHODIMP
Close()132 nsInputStreamTransport::Close() {
133 if (mCloseWhenDone) mSource->Close();
134
135 // make additional reads return early...
136 mOffset = 0;
137 return NS_OK;
138 }
139
140 NS_IMETHODIMP
Available(uint64_t * result)141 nsInputStreamTransport::Available(uint64_t *result) {
142 return NS_ERROR_NOT_IMPLEMENTED;
143 }
144
145 NS_IMETHODIMP
Read(char * buf,uint32_t count,uint32_t * result)146 nsInputStreamTransport::Read(char *buf, uint32_t count, uint32_t *result) {
147 nsresult rv = mSource->Read(buf, count, result);
148
149 if (NS_SUCCEEDED(rv)) {
150 mOffset += *result;
151 if (mEventSink)
152 mEventSink->OnTransportStatus(this, NS_NET_STATUS_READING, mOffset, -1);
153 }
154 return rv;
155 }
156
157 NS_IMETHODIMP
ReadSegments(nsWriteSegmentFun writer,void * closure,uint32_t count,uint32_t * result)158 nsInputStreamTransport::ReadSegments(nsWriteSegmentFun writer, void *closure,
159 uint32_t count, uint32_t *result) {
160 return NS_ERROR_NOT_IMPLEMENTED;
161 }
162
163 NS_IMETHODIMP
IsNonBlocking(bool * result)164 nsInputStreamTransport::IsNonBlocking(bool *result) {
165 *result = false;
166 return NS_OK;
167 }
168
169 //-----------------------------------------------------------------------------
170 // nsStreamTransportService
171 //-----------------------------------------------------------------------------
172
~nsStreamTransportService()173 nsStreamTransportService::~nsStreamTransportService() {
174 NS_ASSERTION(!mPool, "thread pool wasn't shutdown");
175 }
176
Init()177 nsresult nsStreamTransportService::Init() {
178 mPool = do_CreateInstance(NS_THREADPOOL_CONTRACTID);
179 NS_ENSURE_STATE(mPool);
180
181 // Configure the pool
182 mPool->SetName(NS_LITERAL_CSTRING("StreamTrans"));
183 mPool->SetThreadLimit(25);
184 mPool->SetIdleThreadLimit(1);
185 mPool->SetIdleThreadTimeout(PR_SecondsToInterval(30));
186
187 nsCOMPtr<nsIObserverService> obsSvc = mozilla::services::GetObserverService();
188 if (obsSvc) obsSvc->AddObserver(this, "xpcom-shutdown-threads", false);
189 return NS_OK;
190 }
191
NS_IMPL_ISUPPORTS(nsStreamTransportService,nsIStreamTransportService,nsIEventTarget,nsIObserver)192 NS_IMPL_ISUPPORTS(nsStreamTransportService, nsIStreamTransportService,
193 nsIEventTarget, nsIObserver)
194
195 NS_IMETHODIMP
196 nsStreamTransportService::DispatchFromScript(nsIRunnable *task,
197 uint32_t flags) {
198 nsCOMPtr<nsIRunnable> event(task);
199 return Dispatch(event.forget(), flags);
200 }
201
202 NS_IMETHODIMP
Dispatch(already_AddRefed<nsIRunnable> task,uint32_t flags)203 nsStreamTransportService::Dispatch(already_AddRefed<nsIRunnable> task,
204 uint32_t flags) {
205 nsCOMPtr<nsIRunnable> event(task); // so it gets released on failure paths
206 nsCOMPtr<nsIThreadPool> pool;
207 {
208 mozilla::MutexAutoLock lock(mShutdownLock);
209 if (mIsShutdown) {
210 return NS_ERROR_NOT_INITIALIZED;
211 }
212 pool = mPool;
213 }
214 NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED);
215 return pool->Dispatch(event.forget(), flags);
216 }
217
218 NS_IMETHODIMP
DelayedDispatch(already_AddRefed<nsIRunnable>,uint32_t)219 nsStreamTransportService::DelayedDispatch(already_AddRefed<nsIRunnable>,
220 uint32_t) {
221 return NS_ERROR_NOT_IMPLEMENTED;
222 }
223
NS_IMETHODIMP_(bool)224 NS_IMETHODIMP_(bool)
225 nsStreamTransportService::IsOnCurrentThreadInfallible() {
226 nsCOMPtr<nsIThreadPool> pool;
227 {
228 mozilla::MutexAutoLock lock(mShutdownLock);
229 pool = mPool;
230 }
231 if (!pool) {
232 return false;
233 }
234 return pool->IsOnCurrentThread();
235 }
236
237 NS_IMETHODIMP
IsOnCurrentThread(bool * result)238 nsStreamTransportService::IsOnCurrentThread(bool *result) {
239 nsCOMPtr<nsIThreadPool> pool;
240 {
241 mozilla::MutexAutoLock lock(mShutdownLock);
242 if (mIsShutdown) {
243 return NS_ERROR_NOT_INITIALIZED;
244 }
245 pool = mPool;
246 }
247 NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED);
248 return pool->IsOnCurrentThread(result);
249 }
250
251 NS_IMETHODIMP
CreateInputTransport(nsIInputStream * stream,bool closeWhenDone,nsITransport ** result)252 nsStreamTransportService::CreateInputTransport(nsIInputStream *stream,
253 bool closeWhenDone,
254 nsITransport **result) {
255 nsInputStreamTransport *trans =
256 new nsInputStreamTransport(stream, closeWhenDone);
257 if (!trans) return NS_ERROR_OUT_OF_MEMORY;
258 NS_ADDREF(*result = trans);
259 return NS_OK;
260 }
261
262 NS_IMETHODIMP
Observe(nsISupports * subject,const char * topic,const char16_t * data)263 nsStreamTransportService::Observe(nsISupports *subject, const char *topic,
264 const char16_t *data) {
265 NS_ASSERTION(strcmp(topic, "xpcom-shutdown-threads") == 0, "oops");
266
267 {
268 mozilla::MutexAutoLock lock(mShutdownLock);
269 mIsShutdown = true;
270 }
271
272 if (mPool) {
273 mPool->Shutdown();
274 mPool = nullptr;
275 }
276 return NS_OK;
277 }
278
279 class AvailableEvent final : public Runnable {
280 public:
AvailableEvent(nsIInputStream * stream,nsIInputAvailableCallback * callback)281 AvailableEvent(nsIInputStream *stream, nsIInputAvailableCallback *callback)
282 : Runnable("net::AvailableEvent"),
283 mStream(stream),
284 mCallback(callback),
285 mDoingCallback(false),
286 mSize(0),
287 mResultForCallback(NS_OK) {
288 mCallbackTarget = GetCurrentThreadEventTarget();
289 }
290
Run()291 NS_IMETHOD Run() override {
292 if (mDoingCallback) {
293 // pong
294 mCallback->OnInputAvailableComplete(mSize, mResultForCallback);
295 mCallback = nullptr;
296 } else {
297 // ping
298 mResultForCallback = mStream->Available(&mSize);
299 mStream = nullptr;
300 mDoingCallback = true;
301
302 nsCOMPtr<nsIRunnable> event(this); // overly cute
303 mCallbackTarget->Dispatch(event.forget(), NS_DISPATCH_NORMAL);
304 mCallbackTarget = nullptr;
305 }
306 return NS_OK;
307 }
308
309 private:
~AvailableEvent()310 virtual ~AvailableEvent() {}
311
312 nsCOMPtr<nsIInputStream> mStream;
313 nsCOMPtr<nsIInputAvailableCallback> mCallback;
314 nsCOMPtr<nsIEventTarget> mCallbackTarget;
315 bool mDoingCallback;
316 uint64_t mSize;
317 nsresult mResultForCallback;
318 };
319
320 NS_IMETHODIMP
InputAvailable(nsIInputStream * stream,nsIInputAvailableCallback * callback)321 nsStreamTransportService::InputAvailable(nsIInputStream *stream,
322 nsIInputAvailableCallback *callback) {
323 nsCOMPtr<nsIThreadPool> pool;
324 {
325 mozilla::MutexAutoLock lock(mShutdownLock);
326 if (mIsShutdown) {
327 return NS_ERROR_NOT_INITIALIZED;
328 }
329 pool = mPool;
330 }
331 nsCOMPtr<nsIRunnable> event = new AvailableEvent(stream, callback);
332 return pool->Dispatch(event.forget(), NS_DISPATCH_NORMAL);
333 }
334
335 } // namespace net
336 } // namespace mozilla
337