1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this
3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 
5 #include "nsStreamTransportService.h"
6 #include "nsXPCOMCIDInternal.h"
7 #include "nsNetSegmentUtils.h"
8 #include "nsTransportUtils.h"
9 #include "nsStreamUtils.h"
10 #include "nsError.h"
11 #include "nsNetCID.h"
12 
13 #include "nsIAsyncInputStream.h"
14 #include "nsIAsyncOutputStream.h"
15 #include "nsISeekableStream.h"
16 #include "nsIPipe.h"
17 #include "nsITransport.h"
18 #include "nsIObserverService.h"
19 #include "nsIThreadPool.h"
20 #include "mozilla/Services.h"
21 
22 namespace mozilla {
23 namespace net {
24 
25 //-----------------------------------------------------------------------------
26 // nsInputStreamTransport
27 //
28 // Implements nsIInputStream as a wrapper around the real input stream.  This
29 // allows the transport to support seeking, range-limiting, progress reporting,
30 // and close-when-done semantics while utilizing NS_AsyncCopy.
31 //-----------------------------------------------------------------------------
32 
33 class nsInputStreamTransport : public nsITransport, public nsIInputStream {
34  public:
35   NS_DECL_THREADSAFE_ISUPPORTS
36   NS_DECL_NSITRANSPORT
37   NS_DECL_NSIINPUTSTREAM
38 
nsInputStreamTransport(nsIInputStream * source,bool closeWhenDone)39   nsInputStreamTransport(nsIInputStream *source, bool closeWhenDone)
40       : mSource(source),
41         mOffset(0),
42         mCloseWhenDone(closeWhenDone),
43         mInProgress(false) {}
44 
45  private:
~nsInputStreamTransport()46   virtual ~nsInputStreamTransport() {}
47 
48   nsCOMPtr<nsIAsyncInputStream> mPipeIn;
49 
50   // while the copy is active, these members may only be accessed from the
51   // nsIInputStream implementation.
52   nsCOMPtr<nsITransportEventSink> mEventSink;
53   nsCOMPtr<nsIInputStream> mSource;
54   int64_t mOffset;
55   bool mCloseWhenDone;
56 
57   // this variable serves as a lock to prevent the state of the transport
58   // from being modified once the copy is in progress.
59   bool mInProgress;
60 };
61 
NS_IMPL_ISUPPORTS(nsInputStreamTransport,nsITransport,nsIInputStream)62 NS_IMPL_ISUPPORTS(nsInputStreamTransport, nsITransport, nsIInputStream)
63 
64 /** nsITransport **/
65 
66 NS_IMETHODIMP
67 nsInputStreamTransport::OpenInputStream(uint32_t flags, uint32_t segsize,
68                                         uint32_t segcount,
69                                         nsIInputStream **result) {
70   NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
71 
72   nsresult rv;
73   nsCOMPtr<nsIEventTarget> target =
74       do_GetService(NS_STREAMTRANSPORTSERVICE_CONTRACTID, &rv);
75   if (NS_FAILED(rv)) return rv;
76 
77   // XXX if the caller requests an unbuffered stream, then perhaps
78   //     we'd want to simply return mSource; however, then we would
79   //     not be reading mSource on a background thread.  is this ok?
80 
81   bool nonblocking = !(flags & OPEN_BLOCKING);
82 
83   net_ResolveSegmentParams(segsize, segcount);
84 
85   nsCOMPtr<nsIAsyncOutputStream> pipeOut;
86   rv = NS_NewPipe2(getter_AddRefs(mPipeIn), getter_AddRefs(pipeOut),
87                    nonblocking, true, segsize, segcount);
88   if (NS_FAILED(rv)) return rv;
89 
90   mInProgress = true;
91 
92   // startup async copy process...
93   rv = NS_AsyncCopy(this, pipeOut, target, NS_ASYNCCOPY_VIA_WRITESEGMENTS,
94                     segsize);
95   if (NS_SUCCEEDED(rv)) NS_ADDREF(*result = mPipeIn);
96 
97   return rv;
98 }
99 
100 NS_IMETHODIMP
OpenOutputStream(uint32_t flags,uint32_t segsize,uint32_t segcount,nsIOutputStream ** result)101 nsInputStreamTransport::OpenOutputStream(uint32_t flags, uint32_t segsize,
102                                          uint32_t segcount,
103                                          nsIOutputStream **result) {
104   // this transport only supports reading!
105   NS_NOTREACHED("nsInputStreamTransport::OpenOutputStream");
106   return NS_ERROR_UNEXPECTED;
107 }
108 
109 NS_IMETHODIMP
Close(nsresult reason)110 nsInputStreamTransport::Close(nsresult reason) {
111   if (NS_SUCCEEDED(reason)) reason = NS_BASE_STREAM_CLOSED;
112 
113   return mPipeIn->CloseWithStatus(reason);
114 }
115 
116 NS_IMETHODIMP
SetEventSink(nsITransportEventSink * sink,nsIEventTarget * target)117 nsInputStreamTransport::SetEventSink(nsITransportEventSink *sink,
118                                      nsIEventTarget *target) {
119   NS_ENSURE_TRUE(!mInProgress, NS_ERROR_IN_PROGRESS);
120 
121   if (target)
122     return net_NewTransportEventSinkProxy(getter_AddRefs(mEventSink), sink,
123                                           target);
124 
125   mEventSink = sink;
126   return NS_OK;
127 }
128 
129 /** nsIInputStream **/
130 
131 NS_IMETHODIMP
Close()132 nsInputStreamTransport::Close() {
133   if (mCloseWhenDone) mSource->Close();
134 
135   // make additional reads return early...
136   mOffset = 0;
137   return NS_OK;
138 }
139 
140 NS_IMETHODIMP
Available(uint64_t * result)141 nsInputStreamTransport::Available(uint64_t *result) {
142   return NS_ERROR_NOT_IMPLEMENTED;
143 }
144 
145 NS_IMETHODIMP
Read(char * buf,uint32_t count,uint32_t * result)146 nsInputStreamTransport::Read(char *buf, uint32_t count, uint32_t *result) {
147   nsresult rv = mSource->Read(buf, count, result);
148 
149   if (NS_SUCCEEDED(rv)) {
150     mOffset += *result;
151     if (mEventSink)
152       mEventSink->OnTransportStatus(this, NS_NET_STATUS_READING, mOffset, -1);
153   }
154   return rv;
155 }
156 
157 NS_IMETHODIMP
ReadSegments(nsWriteSegmentFun writer,void * closure,uint32_t count,uint32_t * result)158 nsInputStreamTransport::ReadSegments(nsWriteSegmentFun writer, void *closure,
159                                      uint32_t count, uint32_t *result) {
160   return NS_ERROR_NOT_IMPLEMENTED;
161 }
162 
163 NS_IMETHODIMP
IsNonBlocking(bool * result)164 nsInputStreamTransport::IsNonBlocking(bool *result) {
165   *result = false;
166   return NS_OK;
167 }
168 
169 //-----------------------------------------------------------------------------
170 // nsStreamTransportService
171 //-----------------------------------------------------------------------------
172 
~nsStreamTransportService()173 nsStreamTransportService::~nsStreamTransportService() {
174   NS_ASSERTION(!mPool, "thread pool wasn't shutdown");
175 }
176 
Init()177 nsresult nsStreamTransportService::Init() {
178   mPool = do_CreateInstance(NS_THREADPOOL_CONTRACTID);
179   NS_ENSURE_STATE(mPool);
180 
181   // Configure the pool
182   mPool->SetName(NS_LITERAL_CSTRING("StreamTrans"));
183   mPool->SetThreadLimit(25);
184   mPool->SetIdleThreadLimit(1);
185   mPool->SetIdleThreadTimeout(PR_SecondsToInterval(30));
186 
187   nsCOMPtr<nsIObserverService> obsSvc = mozilla::services::GetObserverService();
188   if (obsSvc) obsSvc->AddObserver(this, "xpcom-shutdown-threads", false);
189   return NS_OK;
190 }
191 
NS_IMPL_ISUPPORTS(nsStreamTransportService,nsIStreamTransportService,nsIEventTarget,nsIObserver)192 NS_IMPL_ISUPPORTS(nsStreamTransportService, nsIStreamTransportService,
193                   nsIEventTarget, nsIObserver)
194 
195 NS_IMETHODIMP
196 nsStreamTransportService::DispatchFromScript(nsIRunnable *task,
197                                              uint32_t flags) {
198   nsCOMPtr<nsIRunnable> event(task);
199   return Dispatch(event.forget(), flags);
200 }
201 
202 NS_IMETHODIMP
Dispatch(already_AddRefed<nsIRunnable> task,uint32_t flags)203 nsStreamTransportService::Dispatch(already_AddRefed<nsIRunnable> task,
204                                    uint32_t flags) {
205   nsCOMPtr<nsIRunnable> event(task);  // so it gets released on failure paths
206   nsCOMPtr<nsIThreadPool> pool;
207   {
208     mozilla::MutexAutoLock lock(mShutdownLock);
209     if (mIsShutdown) {
210       return NS_ERROR_NOT_INITIALIZED;
211     }
212     pool = mPool;
213   }
214   NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED);
215   return pool->Dispatch(event.forget(), flags);
216 }
217 
218 NS_IMETHODIMP
DelayedDispatch(already_AddRefed<nsIRunnable>,uint32_t)219 nsStreamTransportService::DelayedDispatch(already_AddRefed<nsIRunnable>,
220                                           uint32_t) {
221   return NS_ERROR_NOT_IMPLEMENTED;
222 }
223 
NS_IMETHODIMP_(bool)224 NS_IMETHODIMP_(bool)
225 nsStreamTransportService::IsOnCurrentThreadInfallible() {
226   nsCOMPtr<nsIThreadPool> pool;
227   {
228     mozilla::MutexAutoLock lock(mShutdownLock);
229     pool = mPool;
230   }
231   if (!pool) {
232     return false;
233   }
234   return pool->IsOnCurrentThread();
235 }
236 
237 NS_IMETHODIMP
IsOnCurrentThread(bool * result)238 nsStreamTransportService::IsOnCurrentThread(bool *result) {
239   nsCOMPtr<nsIThreadPool> pool;
240   {
241     mozilla::MutexAutoLock lock(mShutdownLock);
242     if (mIsShutdown) {
243       return NS_ERROR_NOT_INITIALIZED;
244     }
245     pool = mPool;
246   }
247   NS_ENSURE_TRUE(pool, NS_ERROR_NOT_INITIALIZED);
248   return pool->IsOnCurrentThread(result);
249 }
250 
251 NS_IMETHODIMP
CreateInputTransport(nsIInputStream * stream,bool closeWhenDone,nsITransport ** result)252 nsStreamTransportService::CreateInputTransport(nsIInputStream *stream,
253                                                bool closeWhenDone,
254                                                nsITransport **result) {
255   nsInputStreamTransport *trans =
256       new nsInputStreamTransport(stream, closeWhenDone);
257   if (!trans) return NS_ERROR_OUT_OF_MEMORY;
258   NS_ADDREF(*result = trans);
259   return NS_OK;
260 }
261 
262 NS_IMETHODIMP
Observe(nsISupports * subject,const char * topic,const char16_t * data)263 nsStreamTransportService::Observe(nsISupports *subject, const char *topic,
264                                   const char16_t *data) {
265   NS_ASSERTION(strcmp(topic, "xpcom-shutdown-threads") == 0, "oops");
266 
267   {
268     mozilla::MutexAutoLock lock(mShutdownLock);
269     mIsShutdown = true;
270   }
271 
272   if (mPool) {
273     mPool->Shutdown();
274     mPool = nullptr;
275   }
276   return NS_OK;
277 }
278 
279 class AvailableEvent final : public Runnable {
280  public:
AvailableEvent(nsIInputStream * stream,nsIInputAvailableCallback * callback)281   AvailableEvent(nsIInputStream *stream, nsIInputAvailableCallback *callback)
282       : Runnable("net::AvailableEvent"),
283         mStream(stream),
284         mCallback(callback),
285         mDoingCallback(false),
286         mSize(0),
287         mResultForCallback(NS_OK) {
288     mCallbackTarget = GetCurrentThreadEventTarget();
289   }
290 
Run()291   NS_IMETHOD Run() override {
292     if (mDoingCallback) {
293       // pong
294       mCallback->OnInputAvailableComplete(mSize, mResultForCallback);
295       mCallback = nullptr;
296     } else {
297       // ping
298       mResultForCallback = mStream->Available(&mSize);
299       mStream = nullptr;
300       mDoingCallback = true;
301 
302       nsCOMPtr<nsIRunnable> event(this);  // overly cute
303       mCallbackTarget->Dispatch(event.forget(), NS_DISPATCH_NORMAL);
304       mCallbackTarget = nullptr;
305     }
306     return NS_OK;
307   }
308 
309  private:
~AvailableEvent()310   virtual ~AvailableEvent() {}
311 
312   nsCOMPtr<nsIInputStream> mStream;
313   nsCOMPtr<nsIInputAvailableCallback> mCallback;
314   nsCOMPtr<nsIEventTarget> mCallbackTarget;
315   bool mDoingCallback;
316   uint64_t mSize;
317   nsresult mResultForCallback;
318 };
319 
320 NS_IMETHODIMP
InputAvailable(nsIInputStream * stream,nsIInputAvailableCallback * callback)321 nsStreamTransportService::InputAvailable(nsIInputStream *stream,
322                                          nsIInputAvailableCallback *callback) {
323   nsCOMPtr<nsIThreadPool> pool;
324   {
325     mozilla::MutexAutoLock lock(mShutdownLock);
326     if (mIsShutdown) {
327       return NS_ERROR_NOT_INITIALIZED;
328     }
329     pool = mPool;
330   }
331   nsCOMPtr<nsIRunnable> event = new AvailableEvent(stream, callback);
332   return pool->Dispatch(event.forget(), NS_DISPATCH_NORMAL);
333 }
334 
335 }  // namespace net
336 }  // namespace mozilla
337