1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4  * License, v. 2.0. If a copy of the MPL was not distributed with this file,
5  * You can obtain one at http://mozilla.org/MPL/2.0/. */
6 
7 #include <algorithm>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #if !defined(__Userspace_os_Windows)
11 #include <arpa/inet.h>
12 #endif
13 // usrsctp.h expects to have errno definitions prior to its inclusion.
14 #include <errno.h>
15 
16 #define SCTP_DEBUG 1
17 #define SCTP_STDINT_INCLUDE <stdint.h>
18 
19 #ifdef _MSC_VER
20 // Disable "warning C4200: nonstandard extension used : zero-sized array in
21 //          struct/union"
22 // ...which the third-party file usrsctp.h runs afoul of.
23 #pragma warning(push)
24 #pragma warning(disable:4200)
25 #endif
26 
27 #include "usrsctp.h"
28 
29 #ifdef _MSC_VER
30 #pragma warning(pop)
31 #endif
32 
33 #include "DataChannelLog.h"
34 
35 #include "nsServiceManagerUtils.h"
36 #include "nsIObserverService.h"
37 #include "nsIObserver.h"
38 #include "mozilla/Services.h"
39 #include "mozilla/Sprintf.h"
40 #include "nsProxyRelease.h"
41 #include "nsThread.h"
42 #include "nsThreadUtils.h"
43 #include "nsAutoPtr.h"
44 #include "nsNetUtil.h"
45 #include "nsNetCID.h"
46 #include "mozilla/StaticPtr.h"
47 #include "mozilla/Unused.h"
48 #ifdef MOZ_PEERCONNECTION
49 #include "mtransport/runnable_utils.h"
50 #endif
51 
52 #define DATACHANNEL_LOG(args) LOG(args)
53 #include "DataChannel.h"
54 #include "DataChannelProtocol.h"
55 
56 // Let us turn on and off important assertions in non-debug builds
57 #ifdef DEBUG
58 #define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
59 #elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
60 #define ASSERT_WEBRTC(x) do { if (!(x)) { MOZ_CRASH(); } } while (0)
61 #endif
62 
63 static bool sctp_initialized;
64 
65 namespace mozilla {
66 
67 LazyLogModule gDataChannelLog("DataChannel");
68 static LazyLogModule gSCTPLog("SCTP");
69 
70 class DataChannelShutdown : public nsIObserver
71 {
72 public:
73   // This needs to be tied to some form object that is guaranteed to be
74   // around (singleton likely) unless we want to shutdown sctp whenever
75   // we're not using it (and in which case we'd keep a refcnt'd object
76   // ref'd by each DataChannelConnection to release the SCTP usrlib via
77   // sctp_finish). Right now, the single instance of this class is
78   // owned by the observer service.
79 
80   NS_DECL_ISUPPORTS
81 
DataChannelShutdown()82   DataChannelShutdown() {}
83 
Init()84   void Init()
85     {
86       nsCOMPtr<nsIObserverService> observerService =
87         mozilla::services::GetObserverService();
88       if (!observerService)
89         return;
90 
91       nsresult rv = observerService->AddObserver(this,
92                                                  "xpcom-will-shutdown",
93                                                  false);
94       MOZ_ASSERT(rv == NS_OK);
95       (void) rv;
96     }
97 
98 private:
99   // The only instance of DataChannelShutdown is owned by the observer
100   // service, so there is no need to call RemoveObserver here.
101   virtual ~DataChannelShutdown() = default;
102 
103 public:
Observe(nsISupports * aSubject,const char * aTopic,const char16_t * aData)104   NS_IMETHOD Observe(nsISupports* aSubject, const char* aTopic,
105                      const char16_t* aData) override {
106     if (strcmp(aTopic, "xpcom-will-shutdown") == 0) {
107       LOG(("Shutting down SCTP"));
108       if (sctp_initialized) {
109         usrsctp_finish();
110         sctp_initialized = false;
111       }
112       nsCOMPtr<nsIObserverService> observerService =
113         mozilla::services::GetObserverService();
114       if (!observerService)
115         return NS_ERROR_FAILURE;
116 
117       nsresult rv = observerService->RemoveObserver(this,
118                                                     "xpcom-will-shutdown");
119       MOZ_ASSERT(rv == NS_OK);
120       (void) rv;
121     }
122     return NS_OK;
123   }
124 };
125 
126 NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
127 
BufferedMsg(struct sctp_sendv_spa & spa,const char * data,size_t length)128 BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data,
129                          size_t length) : mLength(length)
130 {
131   mSpa = new sctp_sendv_spa;
132   *mSpa = spa;
133   auto *tmp = new char[length]; // infallible malloc!
134   memcpy(tmp, data, length);
135   mData = tmp;
136 }
137 
~BufferedMsg()138 BufferedMsg::~BufferedMsg()
139 {
140   delete mSpa;
141   delete mData;
142 }
143 
144 static int
receive_cb(struct socket * sock,union sctp_sockstore addr,void * data,size_t datalen,struct sctp_rcvinfo rcv,int flags,void * ulp_info)145 receive_cb(struct socket* sock, union sctp_sockstore addr,
146            void *data, size_t datalen,
147            struct sctp_rcvinfo rcv, int flags, void *ulp_info)
148 {
149   DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info);
150   return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
151 }
152 
153 static
154 DataChannelConnection *
GetConnectionFromSocket(struct socket * sock)155 GetConnectionFromSocket(struct socket* sock)
156 {
157   struct sockaddr *addrs = nullptr;
158   int naddrs = usrsctp_getladdrs(sock, 0, &addrs);
159   if (naddrs <= 0 || addrs[0].sa_family != AF_CONN) {
160     return nullptr;
161   }
162   // usrsctp_getladdrs() returns the addresses bound to this socket, which
163   // contains the SctpDataMediaChannel* as sconn_addr.  Read the pointer,
164   // then free the list of addresses once we have the pointer.  We only open
165   // AF_CONN sockets, and they should all have the sconn_addr set to the
166   // pointer that created them, so [0] is as good as any other.
167   struct sockaddr_conn *sconn = reinterpret_cast<struct sockaddr_conn *>(&addrs[0]);
168   DataChannelConnection *connection =
169     reinterpret_cast<DataChannelConnection *>(sconn->sconn_addr);
170   usrsctp_freeladdrs(addrs);
171 
172   return connection;
173 }
174 
175 // called when the buffer empties to the threshold value
176 static int
threshold_event(struct socket * sock,uint32_t sb_free)177 threshold_event(struct socket* sock, uint32_t sb_free)
178 {
179   DataChannelConnection *connection = GetConnectionFromSocket(sock);
180   if (connection) {
181     LOG(("SendDeferred()"));
182     connection->SendDeferredMessages();
183   } else {
184     LOG(("Can't find connection for socket %p", sock));
185   }
186   return 0;
187 }
188 
189 static void
debug_printf(const char * format,...)190 debug_printf(const char *format, ...)
191 {
192   va_list ap;
193   char buffer[1024];
194 
195   if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
196     va_start(ap, format);
197 #ifdef _WIN32
198     if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) {
199 #else
200     if (VsprintfLiteral(buffer, format, ap) > 0) {
201 #endif
202       PR_LogPrint("%s", buffer);
203     }
204     va_end(ap);
205   }
206 }
207 
208 DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) :
209    mLock("netwerk::sctp::DataChannelConnection")
210 {
211   mState = CLOSED;
212   mSocket = nullptr;
213   mMasterSocket = nullptr;
214   mListener = listener;
215   mLocalPort = 0;
216   mRemotePort = 0;
217   LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
218   mInternalIOThread = nullptr;
219 }
220 
221 DataChannelConnection::~DataChannelConnection()
222 {
223   LOG(("Deleting DataChannelConnection %p", (void *) this));
224   // This may die on the MainThread, or on the STS thread
225   ASSERT_WEBRTC(mState == CLOSED);
226   MOZ_ASSERT(!mMasterSocket);
227   MOZ_ASSERT(mPending.GetSize() == 0);
228 
229   // Already disconnected from sigslot/mTransportFlow
230   // TransportFlows must be released from the STS thread
231   if (!IsSTSThread()) {
232     ASSERT_WEBRTC(NS_IsMainThread());
233     if (mTransportFlow) {
234       ASSERT_WEBRTC(mSTS);
235       NS_ProxyRelease(mSTS, mTransportFlow.forget());
236     }
237 
238     if (mInternalIOThread) {
239       // Avoid spinning the event thread from here (which if we're mainthread
240       // is in the event loop already)
241       NS_DispatchToMainThread(WrapRunnable(nsCOMPtr<nsIThread>(mInternalIOThread),
242                                            &nsIThread::Shutdown),
243                               NS_DISPATCH_NORMAL);
244     }
245   } else {
246     // on STS, safe to call shutdown
247     if (mInternalIOThread) {
248       mInternalIOThread->Shutdown();
249     }
250   }
251 }
252 
253 void
254 DataChannelConnection::Destroy()
255 {
256   // Though it's probably ok to do this and close the sockets;
257   // if we really want it to do true clean shutdowns it can
258   // create a dependant Internal object that would remain around
259   // until the network shut down the association or timed out.
260   LOG(("Destroying DataChannelConnection %p", (void *) this));
261   ASSERT_WEBRTC(NS_IsMainThread());
262   CloseAll();
263 
264   MutexAutoLock lock(mLock);
265   // If we had a pending reset, we aren't waiting for it - clear the list so
266   // we can deregister this DataChannelConnection without leaking.
267   ClearResets();
268 
269   MOZ_ASSERT(mSTS);
270   ASSERT_WEBRTC(NS_IsMainThread());
271   // Must do this in Destroy() since we may then delete this object.
272   // Do this before dispatching to create a consistent ordering of calls to
273   // the SCTP stack.
274   if (mUsingDtls) {
275     usrsctp_deregister_address(static_cast<void *>(this));
276     LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
277   }
278 
279   // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
280   // the usrsctp_close() calls can move back here (and just proxy the
281   // disconnect_all())
282   RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
283                                    &DataChannelConnection::DestroyOnSTS,
284                                    mSocket, mMasterSocket),
285                 NS_DISPATCH_NORMAL);
286 
287   // These will be released on STS
288   mSocket = nullptr;
289   mMasterSocket = nullptr; // also a flag that we've Destroyed this connection
290 
291   // We can't get any more new callbacks from the SCTP library
292   // All existing callbacks have refs to DataChannelConnection
293 
294   // nsDOMDataChannel objects have refs to DataChannels that have refs to us
295 }
296 
297 void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket,
298                                          struct socket *aSocket)
299 {
300   if (aSocket && aSocket != aMasterSocket)
301     usrsctp_close(aSocket);
302   if (aMasterSocket)
303     usrsctp_close(aMasterSocket);
304 
305   disconnect_all();
306 }
307 
308 bool
309 DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls)
310 {
311   struct sctp_initmsg initmsg;
312   struct sctp_udpencaps encaps;
313   struct sctp_assoc_value av;
314   struct sctp_event event;
315   socklen_t len;
316 
317   uint16_t event_types[] = {SCTP_ASSOC_CHANGE,
318                             SCTP_PEER_ADDR_CHANGE,
319                             SCTP_REMOTE_ERROR,
320                             SCTP_SHUTDOWN_EVENT,
321                             SCTP_ADAPTATION_INDICATION,
322                             SCTP_SEND_FAILED_EVENT,
323                             SCTP_STREAM_RESET_EVENT,
324                             SCTP_STREAM_CHANGE_EVENT};
325   {
326     ASSERT_WEBRTC(NS_IsMainThread());
327 
328     // MutexAutoLock lock(mLock); Not needed since we're on mainthread always
329     if (!sctp_initialized) {
330       if (aUsingDtls) {
331         LOG(("sctp_init(DTLS)"));
332 #ifdef MOZ_PEERCONNECTION
333         usrsctp_init(0,
334                      DataChannelConnection::SctpDtlsOutput,
335                      debug_printf
336                     );
337 #else
338         NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport");
339 #endif
340       } else {
341         LOG(("sctp_init(%u)", aPort));
342         usrsctp_init(aPort,
343                      nullptr,
344                      debug_printf
345                     );
346       }
347 
348       // Set logging to SCTP:LogLevel::Debug to get SCTP debugs
349       if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
350         usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
351       }
352 
353       usrsctp_sysctl_set_sctp_blackhole(2);
354       // ECN is currently not supported by the Firefox code
355       usrsctp_sysctl_set_sctp_ecn_enable(0);
356       sctp_initialized = true;
357 
358       RefPtr<DataChannelShutdown> shutdown = new DataChannelShutdown();
359       shutdown->Init();
360     }
361   }
362 
363   // XXX FIX! make this a global we get once
364   // Find the STS thread
365   nsresult rv;
366   mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
367   MOZ_ASSERT(NS_SUCCEEDED(rv));
368 
369   // Open sctp with a callback
370   if ((mMasterSocket = usrsctp_socket(
371          aUsingDtls ? AF_CONN : AF_INET,
372          SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event,
373          usrsctp_sysctl_get_sctp_sendspace() / 2, this)) == nullptr) {
374     return false;
375   }
376 
377   // Make non-blocking for bind/connect.  SCTP over UDP defaults to non-blocking
378   // in associations for normal IO
379   if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
380     LOG(("Couldn't set non_blocking on SCTP socket"));
381     // We can't handle connect() safely if it will block, not that this will
382     // even happen.
383     goto error_cleanup;
384   }
385 
386   // Make sure when we close the socket, make sure it doesn't call us back again!
387   // This would cause it try to use an invalid DataChannelConnection pointer
388   struct linger l;
389   l.l_onoff = 1;
390   l.l_linger = 0;
391   if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER,
392                          (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
393     LOG(("Couldn't set SO_LINGER on SCTP socket"));
394     // unsafe to allow it to continue if this fails
395     goto error_cleanup;
396   }
397 
398   // XXX Consider disabling this when we add proper SDP negotiation.
399   // We may want to leave enabled for supporting 'cloning' of SDP offers, which
400   // implies re-use of the same pseudo-port number, or forcing a renegotiation.
401   {
402     uint32_t on = 1;
403     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
404                            (const void *)&on, (socklen_t)sizeof(on)) < 0) {
405       LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
406     }
407     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
408                            (const void *)&on, (socklen_t)sizeof(on)) < 0) {
409       LOG(("Couldn't set SCTP_NODELAY on SCTP socket"));
410     }
411   }
412 
413   if (!aUsingDtls) {
414     memset(&encaps, 0, sizeof(encaps));
415     encaps.sue_address.ss_family = AF_INET;
416     encaps.sue_port = htons(aPort);
417     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT,
418                            (const void*)&encaps,
419                            (socklen_t)sizeof(struct sctp_udpencaps)) < 0) {
420       LOG(("*** failed encaps errno %d", errno));
421       goto error_cleanup;
422     }
423     LOG(("SCTP encapsulation local port %d", aPort));
424   }
425 
426   av.assoc_id = SCTP_ALL_ASSOC;
427   av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
428   if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av,
429                          (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
430     LOG(("*** failed enable stream reset errno %d", errno));
431     goto error_cleanup;
432   }
433 
434   /* Enable the events of interest. */
435   memset(&event, 0, sizeof(event));
436   event.se_assoc_id = SCTP_ALL_ASSOC;
437   event.se_on = 1;
438   for (unsigned short event_type : event_types) {
439     event.se_type = event_type;
440     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) {
441       LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno));
442       goto error_cleanup;
443     }
444   }
445 
446   // Update number of streams
447   mStreams.AppendElements(aNumStreams);
448   for (uint32_t i = 0; i < aNumStreams; ++i) {
449     mStreams[i] = nullptr;
450   }
451   memset(&initmsg, 0, sizeof(initmsg));
452   len = sizeof(initmsg);
453   if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
454     LOG(("*** failed getsockopt SCTP_INITMSG"));
455     goto error_cleanup;
456   }
457   LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
458        initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
459   initmsg.sinit_num_ostreams  = aNumStreams;
460   initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
461   if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
462                          (socklen_t)sizeof(initmsg)) < 0) {
463     LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
464     goto error_cleanup;
465   }
466 
467   mSocket = nullptr;
468   if (aUsingDtls) {
469     mUsingDtls = true;
470     usrsctp_register_address(static_cast<void *>(this));
471     LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
472   } else {
473     mUsingDtls = false;
474   }
475   return true;
476 
477 error_cleanup:
478   usrsctp_close(mMasterSocket);
479   mMasterSocket = nullptr;
480   mUsingDtls = false;
481   return false;
482 }
483 
484 #ifdef MOZ_PEERCONNECTION
485 void
486 DataChannelConnection::SetEvenOdd()
487 {
488   ASSERT_WEBRTC(IsSTSThread());
489 
490   TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
491       mTransportFlow->GetLayer(TransportLayerDtls::ID()));
492   MOZ_ASSERT(dtls);  // DTLS is mandatory
493   mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT);
494 }
495 
496 bool
497 DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
498 {
499   LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
500 
501   NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!");
502   NS_ENSURE_TRUE(aFlow, false);
503 
504   mTransportFlow = aFlow;
505   mLocalPort = localport;
506   mRemotePort = remoteport;
507   mState = CONNECTING;
508 
509   RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
510                                    &DataChannelConnection::SetSignals),
511                 NS_DISPATCH_NORMAL);
512   return true;
513 }
514 
515 void
516 DataChannelConnection::SetSignals()
517 {
518   ASSERT_WEBRTC(IsSTSThread());
519   ASSERT_WEBRTC(mTransportFlow);
520   LOG(("Setting transport signals, state: %d", mTransportFlow->state()));
521   mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput);
522   // SignalStateChange() doesn't call you with the initial state
523   mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect);
524   CompleteConnect(mTransportFlow, mTransportFlow->state());
525 }
526 
527 void
528 DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state)
529 {
530   LOG(("Data transport state: %d", state));
531   MutexAutoLock lock(mLock);
532   ASSERT_WEBRTC(IsSTSThread());
533   // We should abort connection on TS_ERROR.
534   // Note however that the association will also fail (perhaps with a delay) and
535   // notify us in that way
536   if (state != TransportLayer::TS_OPEN || !mMasterSocket)
537     return;
538 
539   struct sockaddr_conn addr;
540   memset(&addr, 0, sizeof(addr));
541   addr.sconn_family = AF_CONN;
542 #if defined(__Userspace_os_Darwin)
543   addr.sconn_len = sizeof(addr);
544 #endif
545   addr.sconn_port = htons(mLocalPort);
546   addr.sconn_addr = static_cast<void *>(this);
547 
548   LOG(("Calling usrsctp_bind"));
549   int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
550                        sizeof(addr));
551   if (r < 0) {
552     LOG(("usrsctp_bind failed: %d", r));
553   } else {
554     // This is the remote addr
555     addr.sconn_port = htons(mRemotePort);
556     LOG(("Calling usrsctp_connect"));
557     r = usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
558                         sizeof(addr));
559     if (r >= 0 || errno == EINPROGRESS) {
560       struct sctp_paddrparams paddrparams;
561       socklen_t opt_len;
562 
563       memset(&paddrparams, 0, sizeof(struct sctp_paddrparams));
564       memcpy(&paddrparams.spp_address, &addr, sizeof(struct sockaddr_conn));
565       opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
566       r = usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
567                              &paddrparams, &opt_len);
568       if (r < 0) {
569         LOG(("usrsctp_getsockopt failed: %d", r));
570       } else {
571         // draft-ietf-rtcweb-data-channel-13 section 5: max initial MTU IPV4 1200, IPV6 1280
572         paddrparams.spp_pathmtu = 1200; // safe for either
573         paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE;
574         paddrparams.spp_flags |= SPP_PMTUD_DISABLE;
575         opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
576         r = usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
577                                &paddrparams, opt_len);
578         if (r < 0) {
579           LOG(("usrsctp_getsockopt failed: %d", r));
580         } else {
581           LOG(("usrsctp: PMTUD disabled, MTU set to %u", paddrparams.spp_pathmtu));
582         }
583       }
584     }
585     if (r < 0) {
586       if (errno == EINPROGRESS) {
587         // non-blocking
588         return;
589       } else {
590         LOG(("usrsctp_connect failed: %d", errno));
591         mState = CLOSED;
592       }
593     } else {
594       // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that
595       // This also avoids issues with calling TransportFlow stuff on Mainthread
596       return;
597     }
598   }
599   // Note: currently this doesn't actually notify the application
600   NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
601                                       DataChannelOnMessageAvailable::ON_CONNECTION,
602                                       this)));
603   return;
604 }
605 
606 // Process any pending Opens
607 void
608 DataChannelConnection::ProcessQueuedOpens()
609 {
610   // The nsDeque holds channels with an AddRef applied.  Another reference
611   // (may) be held by the DOMDataChannel, unless it's been GC'd.  No other
612   // references should exist.
613 
614   // Can't copy nsDeque's.  Move into temp array since any that fail will
615   // go back to mPending
616   nsDeque temp;
617   DataChannel *temp_channel; // really already_AddRefed<>
618   while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) {
619     temp.Push(static_cast<void *>(temp_channel));
620   }
621 
622   RefPtr<DataChannel> channel;
623   // All these entries have an AddRef(); make that explicit now via the dont_AddRef()
624   while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
625     if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
626       LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream));
627       channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
628       // OpenFinish returns a reference itself, so we need to take it can Release it
629       channel = OpenFinish(channel.forget()); // may reset the flag and re-push
630     } else {
631       NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?");
632     }
633   }
634 
635 }
636 void
637 DataChannelConnection::SctpDtlsInput(TransportFlow *flow,
638                                      const unsigned char *data, size_t len)
639 {
640   if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
641     char *buf;
642 
643     if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) {
644       PR_LogPrint("%s", buf);
645       usrsctp_freedumpbuffer(buf);
646     }
647   }
648   // Pass the data to SCTP
649   usrsctp_conninput(static_cast<void *>(this), data, len, 0);
650 }
651 
652 int
653 DataChannelConnection::SendPacket(unsigned char data[], size_t len, bool release)
654 {
655   //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
656   int res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0;
657   if (release)
658     delete [] data;
659   return res;
660 }
661 
662 /* static */
663 int
664 DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length,
665                                       uint8_t tos, uint8_t set_df)
666 {
667   DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr);
668   int res;
669 
670   if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
671     char *buf;
672 
673     if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != nullptr) {
674       PR_LogPrint("%s", buf);
675       usrsctp_freedumpbuffer(buf);
676     }
677   }
678   // We're async proxying even if on the STSThread because this is called
679   // with internal SCTP locks held in some cases (such as in usrsctp_connect()).
680   // SCTP has an option for Apple, on IP connections only, to release at least
681   // one of the locks before calling a packet output routine; with changes to
682   // the underlying SCTP stack this might remove the need to use an async proxy.
683   if ((false /*peer->IsSTSThread()*/)) {
684     res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false);
685   } else {
686     auto *data = new unsigned char[length];
687     memcpy(data, buffer, length);
688     // Commented out since we have to Dispatch SendPacket to avoid deadlock"
689     // res = -1;
690 
691     // XXX It might be worthwhile to add an assertion against the thread
692     // somehow getting into the DataChannel/SCTP code again, as
693     // DISPATCH_SYNC is not fully blocking.  This may be tricky, as it
694     // needs to be a per-thread check, not a global.
695     peer->mSTS->Dispatch(WrapRunnable(
696                            RefPtr<DataChannelConnection>(peer),
697                            &DataChannelConnection::SendPacket, data, length, true),
698                                    NS_DISPATCH_NORMAL);
699     res = 0; // cheat!  Packets can always be dropped later anyways
700   }
701   return res;
702 }
703 #endif
704 
705 #ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
706 // listen for incoming associations
707 // Blocks! - Don't call this from main thread!
708 
709 #error This code will not work as-is since SetEvenOdd() runs on Mainthread
710 
711 bool
712 DataChannelConnection::Listen(unsigned short port)
713 {
714   struct sockaddr_in addr;
715   socklen_t addr_len;
716 
717   NS_WARNING_ASSERTION(!NS_IsMainThread(),
718                        "Blocks, do not call from main thread!!!");
719 
720   /* Acting as the 'server' */
721   memset((void *)&addr, 0, sizeof(addr));
722 #ifdef HAVE_SIN_LEN
723   addr.sin_len = sizeof(struct sockaddr_in);
724 #endif
725   addr.sin_family = AF_INET;
726   addr.sin_port = htons(port);
727   addr.sin_addr.s_addr = htonl(INADDR_ANY);
728   LOG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
729   mState = CONNECTING;
730   if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) {
731     LOG(("***Failed userspace_bind"));
732     return false;
733   }
734   if (usrsctp_listen(mMasterSocket, 1) < 0) {
735     LOG(("***Failed userspace_listen"));
736     return false;
737   }
738 
739   LOG(("Accepting connection"));
740   addr_len = 0;
741   if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) {
742     LOG(("***Failed accept"));
743     return false;
744   }
745   mState = OPEN;
746 
747   struct linger l;
748   l.l_onoff = 1;
749   l.l_linger = 0;
750   if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER,
751                          (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
752     LOG(("Couldn't set SO_LINGER on SCTP socket"));
753   }
754 
755   SetEvenOdd();
756 
757   // Notify Connection open
758   // XXX We need to make sure connection sticks around until the message is delivered
759   LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
760   NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
761                             DataChannelOnMessageAvailable::ON_CONNECTION,
762                             this, (DataChannel *) nullptr)));
763   return true;
764 }
765 
766 // Blocks! - Don't call this from main thread!
767 bool
768 DataChannelConnection::Connect(const char *addr, unsigned short port)
769 {
770   struct sockaddr_in addr4;
771   struct sockaddr_in6 addr6;
772 
773   NS_WARNING_ASSERTION(!NS_IsMainThread(),
774                        "Blocks, do not call from main thread!!!");
775 
776   /* Acting as the connector */
777   LOG(("Connecting to %s, port %u", addr, port));
778   memset((void *)&addr4, 0, sizeof(struct sockaddr_in));
779   memset((void *)&addr6, 0, sizeof(struct sockaddr_in6));
780 #ifdef HAVE_SIN_LEN
781   addr4.sin_len = sizeof(struct sockaddr_in);
782 #endif
783 #ifdef HAVE_SIN6_LEN
784   addr6.sin6_len = sizeof(struct sockaddr_in6);
785 #endif
786   addr4.sin_family = AF_INET;
787   addr6.sin6_family = AF_INET6;
788   addr4.sin_port = htons(port);
789   addr6.sin6_port = htons(port);
790   mState = CONNECTING;
791 
792 #if !defined(__Userspace_os_Windows)
793   if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
794     if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
795       LOG(("*** Failed userspace_connect"));
796       return false;
797     }
798   } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
799     if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
800       LOG(("*** Failed userspace_connect"));
801       return false;
802     }
803   } else {
804     LOG(("*** Illegal destination address."));
805   }
806 #else
807   {
808     struct sockaddr_storage ss;
809     int sslen = sizeof(ss);
810 
811     if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) {
812       addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr;
813       if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
814         LOG(("*** Failed userspace_connect"));
815         return false;
816       }
817     } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) {
818       addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr;
819       if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
820         LOG(("*** Failed userspace_connect"));
821         return false;
822       }
823     } else {
824       LOG(("*** Illegal destination address."));
825     }
826   }
827 #endif
828 
829   mSocket = mMasterSocket;
830 
831   LOG(("connect() succeeded!  Entering connected mode"));
832   mState = OPEN;
833 
834   SetEvenOdd();
835 
836   // Notify Connection open
837   // XXX We need to make sure connection sticks around until the message is delivered
838   LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
839   NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
840                             DataChannelOnMessageAvailable::ON_CONNECTION,
841                             this, (DataChannel *) nullptr)));
842   return true;
843 }
844 #endif
845 
846 DataChannel *
847 DataChannelConnection::FindChannelByStream(uint16_t stream)
848 {
849   return mStreams.SafeElementAt(stream);
850 }
851 
852 uint16_t
853 DataChannelConnection::FindFreeStream()
854 {
855   uint32_t i, j, limit;
856 
857   limit = mStreams.Length();
858   if (limit > MAX_NUM_STREAMS)
859     limit = MAX_NUM_STREAMS;
860 
861   for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
862     if (!mStreams[i]) {
863       // Verify it's not still in the process of closing
864       for (j = 0; j < mStreamsResetting.Length(); ++j) {
865         if (mStreamsResetting[j] == i) {
866           break;
867         }
868       }
869       if (j == mStreamsResetting.Length())
870         break;
871     }
872   }
873   if (i >= limit) {
874     return INVALID_STREAM;
875   }
876   return i;
877 }
878 
879 bool
880 DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
881 {
882   struct sctp_status status;
883   struct sctp_add_streams sas;
884   uint32_t outStreamsNeeded;
885   socklen_t len;
886 
887   if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
888     aNeeded = MAX_NUM_STREAMS - mStreams.Length();
889   }
890   if (aNeeded <= 0) {
891     return false;
892   }
893 
894   len = (socklen_t)sizeof(struct sctp_status);
895   if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) {
896     LOG(("***failed: getsockopt SCTP_STATUS"));
897     return false;
898   }
899   outStreamsNeeded = aNeeded; // number to add
900 
901   // Note: if multiple channel opens happen when we don't have enough space,
902   // we'll call RequestMoreStreams() multiple times
903   memset(&sas, 0, sizeof(sas));
904   sas.sas_instrms = 0;
905   sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
906   // Doesn't block, we get an event when it succeeds or fails
907   if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
908                          (socklen_t) sizeof(struct sctp_add_streams)) < 0) {
909     if (errno == EALREADY) {
910       LOG(("Already have %u output streams", outStreamsNeeded));
911       return true;
912     }
913 
914     LOG(("***failed: setsockopt ADD errno=%d", errno));
915     return false;
916   }
917   LOG(("Requested %u more streams", outStreamsNeeded));
918   // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
919   // values are larger than mStreams.Length()
920   return true;
921 }
922 
923 int32_t
924 DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream)
925 {
926   struct sctp_sndinfo sndinfo;
927 
928   // Note: Main-thread IO, but doesn't block
929   memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
930   sndinfo.snd_sid = stream;
931   sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
932   if (usrsctp_sendv(mSocket, msg, len, nullptr, 0,
933                     &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
934                     SCTP_SENDV_SNDINFO, 0) < 0) {
935     //LOG(("***failed: sctp_sendv")); don't log because errno is a return!
936     return (0);
937   }
938   return (1);
939 }
940 
941 int32_t
942 DataChannelConnection::SendOpenAckMessage(uint16_t stream)
943 {
944   struct rtcweb_datachannel_ack ack;
945 
946   memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
947   ack.msg_type = DATA_CHANNEL_ACK;
948 
949   return SendControlMessage(&ack, sizeof(ack), stream);
950 }
951 
952 int32_t
953 DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
954                                               const nsACString& protocol,
955                                               uint16_t stream, bool unordered,
956                                               uint16_t prPolicy, uint32_t prValue)
957 {
958   const int label_len = label.Length(); // not including nul
959   const int proto_len = protocol.Length(); // not including nul
960   // careful - request struct include one char for the label
961   const int req_size = sizeof(struct rtcweb_datachannel_open_request) - 1 +
962                         label_len + proto_len;
963   struct rtcweb_datachannel_open_request *req =
964     (struct rtcweb_datachannel_open_request*) moz_xmalloc(req_size);
965 
966   memset(req, 0, req_size);
967   req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
968   switch (prPolicy) {
969   case SCTP_PR_SCTP_NONE:
970     req->channel_type = DATA_CHANNEL_RELIABLE;
971     break;
972   case SCTP_PR_SCTP_TTL:
973     req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
974     break;
975   case SCTP_PR_SCTP_RTX:
976     req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
977     break;
978   default:
979     // FIX! need to set errno!  Or make all these SendXxxx() funcs return 0 or errno!
980     free(req);
981     return (0);
982   }
983   if (unordered) {
984     // Per the current types, all differ by 0x80 between ordered and unordered
985     req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future
986   }
987 
988   req->reliability_param = htonl(prValue);
989   req->priority = htons(0); /* XXX: add support */
990   req->label_length = htons(label_len);
991   req->protocol_length = htons(proto_len);
992   memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
993   memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
994 
995   int32_t result = SendControlMessage(req, req_size, stream);
996 
997   free(req);
998   return result;
999 }
1000 
1001 // XXX This should use a separate thread (outbound queue) which should
1002 // select() to know when to *try* to send data to the socket again.
1003 // Alternatively, it can use a timeout, but that's guaranteed to be wrong
1004 // (just not sure in what direction).  We could re-implement NSPR's
1005 // PR_POLL_WRITE/etc handling... with a lot of work.
1006 
1007 // Better yet, use the SCTP stack's notifications on buffer state to avoid
1008 // filling the SCTP's buffers.
1009 
// Returns true if we are still blocked (something remains to be sent),
// false otherwise.
1011 bool
1012 DataChannelConnection::SendDeferredMessages()
1013 {
1014   uint32_t i;
1015   RefPtr<DataChannel> channel; // we may null out the refs to this
1016   bool still_blocked = false;
1017 
1018   // This may block while something is modifying channels, but should not block for IO
1019   MutexAutoLock lock(mLock);
1020 
1021   // XXX For total fairness, on a still_blocked we'd start next time at the
1022   // same index.  Sorry, not going to bother for now.
1023   for (i = 0; i < mStreams.Length(); ++i) {
1024     channel = mStreams[i];
1025     if (!channel)
1026       continue;
1027 
1028     // Only one of these should be set....
1029     if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) {
1030       if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
1031                                  channel->mStream,
1032                                  channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED,
1033                                  channel->mPrPolicy, channel->mPrValue)) {
1034         channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;
1035 
1036         channel->mState = OPEN;
1037         channel->mReady = true;
1038         LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
1039         NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1040                                   DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
1041                                   channel)));
1042       } else {
1043         if (errno == EAGAIN || errno == EWOULDBLOCK) {
1044           still_blocked = true;
1045         } else {
1046           // Close the channel, inform the user
1047           mStreams[channel->mStream] = nullptr;
1048           channel->mState = CLOSED;
1049           // Don't need to reset; we didn't open it
1050           NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1051                                     DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
1052                                     channel)));
1053         }
1054       }
1055     }
1056     if (still_blocked)
1057       break;
1058 
1059     if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
1060       if (SendOpenAckMessage(channel->mStream)) {
1061         channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
1062       } else {
1063         if (errno == EAGAIN || errno == EWOULDBLOCK) {
1064           still_blocked = true;
1065         } else {
1066           // Close the channel, inform the user
1067           CloseInt(channel);
1068           // XXX send error via DataChannelOnMessageAvailable (bug 843625)
1069         }
1070       }
1071     }
1072     if (still_blocked)
1073       break;
1074 
1075     if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
1076       bool failed_send = false;
1077       int32_t result;
1078 
1079       if (channel->mState == CLOSED || channel->mState == CLOSING) {
1080         channel->mBufferedData.Clear();
1081       }
1082 
1083       uint32_t buffered_amount = channel->GetBufferedAmountLocked();
1084       uint32_t threshold = channel->GetBufferedAmountLowThreshold();
1085       bool was_over_threshold = buffered_amount >= threshold;
1086 
1087       while (!channel->mBufferedData.IsEmpty() &&
1088              !failed_send) {
1089         struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa;
1090         const char *data           = channel->mBufferedData[0]->mData;
1091         size_t len                 = channel->mBufferedData[0]->mLength;
1092 
1093         // SCTP will return EMSGSIZE if the message is bigger than the buffer
1094         // size (or EAGAIN if there isn't space)
1095         if ((result = usrsctp_sendv(mSocket, data, len,
1096                                     nullptr, 0,
1097                                     (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa),
1098                                     SCTP_SENDV_SPA,
1099                                     0)) < 0) {
1100           if (errno == EAGAIN || errno == EWOULDBLOCK) {
1101             // leave queued for resend
1102             failed_send = true;
1103             LOG(("queue full again when resending %d bytes (%d)", len, result));
1104           } else {
1105             LOG(("error %d re-sending string", errno));
1106             failed_send = true;
1107           }
1108         } else {
1109           LOG(("Resent buffer of %d bytes (%d)", len, result));
1110           // In theory this could underflow if >4GB was buffered and re
1111           // truncated in GetBufferedAmount(), but this won't cause any problems.
1112           buffered_amount -= channel->mBufferedData[0]->mLength;
1113           channel->mBufferedData.RemoveElementAt(0);
1114           // can never fire with default threshold of 0
1115           if (was_over_threshold && buffered_amount < threshold) {
1116             LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
1117                  channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
1118             NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1119                                                 DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD,
1120                                                 this, channel)));
1121             was_over_threshold = false;
1122           }
1123           if (buffered_amount == 0) {
1124             // buffered-to-not-buffered transition; tell the DOM code in case this makes it
1125             // available for GC
1126             LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
1127                  channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
1128             NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1129                                                 DataChannelOnMessageAvailable::NO_LONGER_BUFFERED,
1130                                                 this, channel)));
1131           }
1132         }
1133       }
1134       if (channel->mBufferedData.IsEmpty())
1135         channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA;
1136       else
1137         still_blocked = true;
1138     }
1139     if (still_blocked)
1140       break;
1141   }
1142 
1143   return still_blocked;
1144 }
1145 
1146 void
1147 DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
1148                                                 size_t length,
1149                                                 uint16_t stream)
1150 {
1151   RefPtr<DataChannel> channel;
1152   uint32_t prValue;
1153   uint16_t prPolicy;
1154   uint32_t flags;
1155 
1156   mLock.AssertCurrentThreadOwns();
1157 
1158   if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) {
1159     LOG(("%s: Inconsistent length: %u, should be %u", __FUNCTION__, length,
1160          (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)));
1161     if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))
1162       return;
1163   }
1164 
1165   LOG(("%s: length %u, sizeof(*req) = %u", __FUNCTION__, length, sizeof(*req)));
1166 
1167   switch (req->channel_type) {
1168     case DATA_CHANNEL_RELIABLE:
1169     case DATA_CHANNEL_RELIABLE_UNORDERED:
1170       prPolicy = SCTP_PR_SCTP_NONE;
1171       break;
1172     case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
1173     case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
1174       prPolicy = SCTP_PR_SCTP_RTX;
1175       break;
1176     case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
1177     case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
1178       prPolicy = SCTP_PR_SCTP_TTL;
1179       break;
1180     default:
1181       LOG(("Unknown channel type", req->channel_type));
1182       /* XXX error handling */
1183       return;
1184   }
1185   prValue = ntohl(req->reliability_param);
1186   flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
1187 
1188   if ((channel = FindChannelByStream(stream))) {
1189     if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
1190       LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.",
1191            stream, channel->mState));
1192      /* XXX: some error handling */
1193     } else {
1194       LOG(("Open for externally negotiated channel %u", stream));
1195       // XXX should also check protocol, maybe label
1196       if (prPolicy != channel->mPrPolicy ||
1197           prValue != channel->mPrValue ||
1198           flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED))
1199       {
1200         LOG(("WARNING: external negotiation mismatch with OpenRequest:"
1201              "channel %u, policy %u/%u, value %u/%u, flags %x/%x",
1202              stream, prPolicy, channel->mPrPolicy,
1203              prValue, channel->mPrValue, flags, channel->mFlags));
1204       }
1205     }
1206     return;
1207   }
1208   if (stream >= mStreams.Length()) {
1209     LOG(("%s: stream %u out of bounds (%u)", __FUNCTION__, stream, mStreams.Length()));
1210     return;
1211   }
1212 
1213   nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
1214   nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)],
1215                                            ntohs(req->protocol_length)));
1216 
1217   channel = new DataChannel(this,
1218                             stream,
1219                             DataChannel::CONNECTING,
1220                             label,
1221                             protocol,
1222                             prPolicy, prValue,
1223                             flags,
1224                             nullptr, nullptr);
1225   mStreams[stream] = channel;
1226 
1227   channel->mState = DataChannel::WAITING_TO_OPEN;
1228 
1229   LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
1230        channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState));
1231   NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1232                             DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
1233                             this, channel)));
1234 
1235   LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
1236 
1237   if (!SendOpenAckMessage(stream)) {
1238     // XXX Only on EAGAIN!?  And if not, then close the channel??
1239     channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
1240     // Note: we're locked, so there's no danger of a race with the
1241     // buffer-threshold callback
1242   }
1243 
1244   // Now process any queued data messages for the channel (which will
1245   // themselves likely get queued until we leave WAITING_TO_OPEN, plus any
1246   // more that come in before that happens)
1247   DeliverQueuedData(stream);
1248 }
1249 
1250 // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
1251 // That would make this code moot.  Keep it for now for backwards compatibility.
1252 void
1253 DataChannelConnection::DeliverQueuedData(uint16_t stream)
1254 {
1255   mLock.AssertCurrentThreadOwns();
1256 
1257   uint32_t i = 0;
1258   while (i < mQueuedData.Length()) {
1259     // Careful! we may modify the array length from within the loop!
1260     if (mQueuedData[i]->mStream == stream) {
1261       LOG(("Delivering queued data for stream %u, length %u",
1262            stream, (unsigned int) mQueuedData[i]->mLength));
1263       // Deliver the queued data
1264       HandleDataMessage(mQueuedData[i]->mPpid,
1265                         mQueuedData[i]->mData, mQueuedData[i]->mLength,
1266                         mQueuedData[i]->mStream);
1267       mQueuedData.RemoveElementAt(i);
1268       continue; // don't bump index since we removed the element
1269     }
1270     i++;
1271   }
1272 }
1273 
1274 void
1275 DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
1276                                             size_t length, uint16_t stream)
1277 {
1278   DataChannel *channel;
1279 
1280   mLock.AssertCurrentThreadOwns();
1281 
1282   channel = FindChannelByStream(stream);
1283   NS_ENSURE_TRUE_VOID(channel);
1284 
1285   LOG(("OpenAck received for stream %u, waiting=%d", stream,
1286        (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
1287 
1288   channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
1289 }
1290 
1291 void
1292 DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream)
1293 {
1294   /* XXX: Send an error message? */
1295   LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, stream));
1296   // XXX Log to JS error console if possible
1297 }
1298 
1299 void
1300 DataChannelConnection::HandleDataMessage(uint32_t ppid,
1301                                          const void *data, size_t length,
1302                                          uint16_t stream)
1303 {
1304   DataChannel *channel;
1305   const char *buffer = (const char *) data;
1306 
1307   mLock.AssertCurrentThreadOwns();
1308 
1309   channel = FindChannelByStream(stream);
1310 
1311   // XXX A closed channel may trip this... check
1312   // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
1313   // That would make this code moot.  Keep it for now for backwards compatibility.
1314   if (!channel) {
1315     // In the updated 0-RTT open case, the sender can send data immediately
1316     // after Open, and doesn't set the in-order bit (since we don't have a
1317     // response or ack).  Also, with external negotiation, data can come in
1318     // before we're told about the external negotiation.  We need to buffer
1319     // data until either a) Open comes in, if the ordering get messed up,
1320     // or b) the app tells us this channel was externally negotiated.  When
1321     // these occur, we deliver the data.
1322 
1323     // Since this is rare and non-performance, keep a single list of queued
1324     // data messages to deliver once the channel opens.
1325     LOG(("Queuing data for stream %u, length %u", stream, length));
1326     // Copies data
1327     mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length));
1328     return;
1329   }
1330 
1331   // XXX should this be a simple if, no warnings/debugbreaks?
1332   NS_ENSURE_TRUE_VOID(channel->mState != CLOSED);
1333 
1334   {
1335     nsAutoCString recvData(buffer, length); // copies (<64) or allocates
1336     bool is_binary = true;
1337 
1338     if (ppid == DATA_CHANNEL_PPID_DOMSTRING ||
1339         ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) {
1340       is_binary = false;
1341     }
1342     if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
1343       NS_WARNING("DataChannel message aborted by fragment type change!");
1344       channel->mRecvBuffer.Truncate(0);
1345     }
1346     channel->mIsRecvBinary = is_binary;
1347 
1348     switch (ppid) {
1349       case DATA_CHANNEL_PPID_DOMSTRING:
1350       case DATA_CHANNEL_PPID_BINARY:
1351         channel->mRecvBuffer += recvData;
1352         LOG(("DataChannel: Partial %s message of length %lu (total %u) on channel id %u",
1353              is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(),
1354              channel->mStream));
1355         return; // Not ready to notify application
1356 
1357       case DATA_CHANNEL_PPID_DOMSTRING_LAST:
1358         LOG(("DataChannel: String message received of length %lu on channel %u",
1359              length, channel->mStream));
1360         if (!channel->mRecvBuffer.IsEmpty()) {
1361           channel->mRecvBuffer += recvData;
1362           LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel));
1363           channel->SendOrQueue(new DataChannelOnMessageAvailable(
1364                                  DataChannelOnMessageAvailable::ON_DATA, this,
1365                                  channel, channel->mRecvBuffer, -1));
1366           channel->mRecvBuffer.Truncate(0);
1367           return;
1368         }
1369         // else send using recvData normally
1370         length = -1; // Flag for DOMString
1371 
1372         // WebSockets checks IsUTF8() here; we can try to deliver it
1373         break;
1374 
1375       case DATA_CHANNEL_PPID_BINARY_LAST:
1376         LOG(("DataChannel: Received binary message of length %lu on channel id %u",
1377              length, channel->mStream));
1378         if (!channel->mRecvBuffer.IsEmpty()) {
1379           channel->mRecvBuffer += recvData;
1380           LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel));
1381           channel->SendOrQueue(new DataChannelOnMessageAvailable(
1382                                  DataChannelOnMessageAvailable::ON_DATA, this,
1383                                  channel, channel->mRecvBuffer,
1384                                  channel->mRecvBuffer.Length()));
1385           channel->mRecvBuffer.Truncate(0);
1386           return;
1387         }
1388         // else send using recvData normally
1389         break;
1390 
1391       default:
1392         NS_ERROR("Unknown data PPID");
1393         return;
1394     }
1395     /* Notify onmessage */
1396     LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel));
1397     channel->SendOrQueue(new DataChannelOnMessageAvailable(
1398                            DataChannelOnMessageAvailable::ON_DATA, this,
1399                            channel, recvData, length));
1400   }
1401 }
1402 
1403 // Called with mLock locked!
1404 void
1405 DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream)
1406 {
1407   const struct rtcweb_datachannel_open_request *req;
1408   const struct rtcweb_datachannel_ack *ack;
1409 
1410   mLock.AssertCurrentThreadOwns();
1411 
1412   switch (ppid) {
1413     case DATA_CHANNEL_PPID_CONTROL:
1414       req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
1415 
1416       NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message
1417       switch (req->msg_type) {
1418         case DATA_CHANNEL_OPEN_REQUEST:
1419           // structure includes a possibly-unused char label[1] (in a packed structure)
1420           NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1);
1421 
1422           HandleOpenRequestMessage(req, length, stream);
1423           break;
1424         case DATA_CHANNEL_ACK:
1425           // >= sizeof(*ack) checked above
1426 
1427           ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
1428           HandleOpenAckMessage(ack, length, stream);
1429           break;
1430         default:
1431           HandleUnknownMessage(ppid, length, stream);
1432           break;
1433       }
1434       break;
1435     case DATA_CHANNEL_PPID_DOMSTRING:
1436     case DATA_CHANNEL_PPID_DOMSTRING_LAST:
1437     case DATA_CHANNEL_PPID_BINARY:
1438     case DATA_CHANNEL_PPID_BINARY_LAST:
1439       HandleDataMessage(ppid, buffer, length, stream);
1440       break;
1441     default:
1442       LOG(("Message of length %lu, PPID %u on stream %u received.",
1443            length, ppid, stream));
1444       break;
1445   }
1446 }
1447 
1448 void
1449 DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac)
1450 {
1451   uint32_t i, n;
1452 
1453   switch (sac->sac_state) {
1454   case SCTP_COMM_UP:
1455     LOG(("Association change: SCTP_COMM_UP"));
1456     if (mState == CONNECTING) {
1457       mSocket = mMasterSocket;
1458       mState = OPEN;
1459 
1460       SetEvenOdd();
1461 
1462       NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1463                                 DataChannelOnMessageAvailable::ON_CONNECTION,
1464                                 this)));
1465       LOG(("DTLS connect() succeeded!  Entering connected mode"));
1466 
1467       // Open any streams pending...
1468       ProcessQueuedOpens();
1469 
1470     } else if (mState == OPEN) {
1471       LOG(("DataConnection Already OPEN"));
1472     } else {
1473       LOG(("Unexpected state: %d", mState));
1474     }
1475     break;
1476   case SCTP_COMM_LOST:
1477     LOG(("Association change: SCTP_COMM_LOST"));
1478     // This association is toast, so also close all the channels -- from mainthread!
1479     NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1480                               DataChannelOnMessageAvailable::ON_DISCONNECTED,
1481                               this)));
1482     break;
1483   case SCTP_RESTART:
1484     LOG(("Association change: SCTP_RESTART"));
1485     break;
1486   case SCTP_SHUTDOWN_COMP:
1487     LOG(("Association change: SCTP_SHUTDOWN_COMP"));
1488     NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1489                               DataChannelOnMessageAvailable::ON_DISCONNECTED,
1490                               this)));
1491     break;
1492   case SCTP_CANT_STR_ASSOC:
1493     LOG(("Association change: SCTP_CANT_STR_ASSOC"));
1494     break;
1495   default:
1496     LOG(("Association change: UNKNOWN"));
1497     break;
1498   }
1499   LOG(("Association change: streams (in/out) = (%u/%u)",
1500        sac->sac_inbound_streams, sac->sac_outbound_streams));
1501 
1502   NS_ENSURE_TRUE_VOID(sac);
1503   n = sac->sac_length - sizeof(*sac);
1504   if (((sac->sac_state == SCTP_COMM_UP) ||
1505         (sac->sac_state == SCTP_RESTART)) && (n > 0)) {
1506     for (i = 0; i < n; ++i) {
1507       switch (sac->sac_info[i]) {
1508       case SCTP_ASSOC_SUPPORTS_PR:
1509         LOG(("Supports: PR"));
1510         break;
1511       case SCTP_ASSOC_SUPPORTS_AUTH:
1512         LOG(("Supports: AUTH"));
1513         break;
1514       case SCTP_ASSOC_SUPPORTS_ASCONF:
1515         LOG(("Supports: ASCONF"));
1516         break;
1517       case SCTP_ASSOC_SUPPORTS_MULTIBUF:
1518         LOG(("Supports: MULTIBUF"));
1519         break;
1520       case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
1521         LOG(("Supports: RE-CONFIG"));
1522         break;
1523       default:
1524         LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
1525         break;
1526       }
1527     }
1528   } else if (((sac->sac_state == SCTP_COMM_LOST) ||
1529               (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) {
1530     LOG(("Association: ABORT ="));
1531     for (i = 0; i < n; ++i) {
1532       LOG((" 0x%02x", sac->sac_info[i]));
1533     }
1534   }
1535   if ((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
1536       (sac->sac_state == SCTP_SHUTDOWN_COMP) ||
1537       (sac->sac_state == SCTP_COMM_LOST)) {
1538     return;
1539   }
1540 }
1541 
1542 void
1543 DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc)
1544 {
1545   const char *addr = "";
1546 #if !defined(__Userspace_os_Windows)
1547   char addr_buf[INET6_ADDRSTRLEN];
1548   struct sockaddr_in *sin;
1549   struct sockaddr_in6 *sin6;
1550 #endif
1551 
1552   switch (spc->spc_aaddr.ss_family) {
1553   case AF_INET:
1554 #if !defined(__Userspace_os_Windows)
1555     sin = (struct sockaddr_in *)&spc->spc_aaddr;
1556     addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN);
1557 #endif
1558     break;
1559   case AF_INET6:
1560 #if !defined(__Userspace_os_Windows)
1561     sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr;
1562     addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
1563 #endif
1564     break;
1565   case AF_CONN:
1566     addr = "DTLS connection";
1567     break;
1568   default:
1569     break;
1570   }
1571   LOG(("Peer address %s is now ", addr));
1572   switch (spc->spc_state) {
1573   case SCTP_ADDR_AVAILABLE:
1574     LOG(("SCTP_ADDR_AVAILABLE"));
1575     break;
1576   case SCTP_ADDR_UNREACHABLE:
1577     LOG(("SCTP_ADDR_UNREACHABLE"));
1578     break;
1579   case SCTP_ADDR_REMOVED:
1580     LOG(("SCTP_ADDR_REMOVED"));
1581     break;
1582   case SCTP_ADDR_ADDED:
1583     LOG(("SCTP_ADDR_ADDED"));
1584     break;
1585   case SCTP_ADDR_MADE_PRIM:
1586     LOG(("SCTP_ADDR_MADE_PRIM"));
1587     break;
1588   case SCTP_ADDR_CONFIRMED:
1589     LOG(("SCTP_ADDR_CONFIRMED"));
1590     break;
1591   default:
1592     LOG(("UNKNOWN"));
1593     break;
1594   }
1595   LOG((" (error = 0x%08x).\n", spc->spc_error));
1596 }
1597 
1598 void
1599 DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre)
1600 {
1601   size_t i, n;
1602 
1603   n = sre->sre_length - sizeof(struct sctp_remote_error);
1604   LOG(("Remote Error (error = 0x%04x): ", sre->sre_error));
1605   for (i = 0; i < n; ++i) {
1606     LOG((" 0x%02x", sre-> sre_data[i]));
1607   }
1608 }
1609 
1610 void
1611 DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse)
1612 {
1613   LOG(("Shutdown event."));
1614   /* XXX: notify all channels. */
1615   // Attempts to actually send anything will fail
1616 }
1617 
1618 void
1619 DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai)
1620 {
1621   LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind));
1622 }
1623 
1624 void
1625 DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe)
1626 {
1627   size_t i, n;
1628 
1629   if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
1630     LOG(("Unsent "));
1631   }
1632    if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
1633     LOG(("Sent "));
1634   }
1635   if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
1636     LOG(("(flags = %x) ", ssfe->ssfe_flags));
1637   }
1638   LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x",
1639        ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
1640        ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
1641   n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
1642   for (i = 0; i < n; ++i) {
1643     LOG((" 0x%02x", ssfe->ssfe_data[i]));
1644   }
1645 }
1646 
1647 void
1648 DataChannelConnection::ClearResets()
1649 {
1650   // Clear all pending resets
1651   if (!mStreamsResetting.IsEmpty()) {
1652     LOG(("Clearing resets for %d streams", mStreamsResetting.Length()));
1653   }
1654 
1655   for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) {
1656     RefPtr<DataChannel> channel;
1657     channel = FindChannelByStream(mStreamsResetting[i]);
1658     if (channel) {
1659       LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get()));
1660       mStreams[channel->mStream] = nullptr;
1661     }
1662   }
1663   mStreamsResetting.Clear();
1664 }
1665 
1666 void
1667 DataChannelConnection::ResetOutgoingStream(uint16_t stream)
1668 {
1669   uint32_t i;
1670 
1671   mLock.AssertCurrentThreadOwns();
1672   LOG(("Connection %p: Resetting outgoing stream %u",
1673        (void *) this, stream));
1674   // Rarely has more than a couple items and only for a short time
1675   for (i = 0; i < mStreamsResetting.Length(); ++i) {
1676     if (mStreamsResetting[i] == stream) {
1677       return;
1678     }
1679   }
1680   mStreamsResetting.AppendElement(stream);
1681 }
1682 
1683 void
1684 DataChannelConnection::SendOutgoingStreamReset()
1685 {
1686   struct sctp_reset_streams *srs;
1687   uint32_t i;
1688   size_t len;
1689 
1690   LOG(("Connection %p: Sending outgoing stream reset for %d streams",
1691        (void *) this, mStreamsResetting.Length()));
1692   mLock.AssertCurrentThreadOwns();
1693   if (mStreamsResetting.IsEmpty()) {
1694     LOG(("No streams to reset"));
1695     return;
1696   }
1697   len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t);
1698   srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc
1699   memset(srs, 0, len);
1700   srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
1701   srs->srs_number_streams = mStreamsResetting.Length();
1702   for (i = 0; i < mStreamsResetting.Length(); ++i) {
1703     srs->srs_stream_list[i] = mStreamsResetting[i];
1704   }
1705   if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) {
1706     LOG(("***failed: setsockopt RESET, errno %d", errno));
1707     // if errno == EALREADY, this is normal - we can't send another reset
1708     // with one pending.
1709     // When we get an incoming reset (which may be a response to our
1710     // outstanding one), see if we have any pending outgoing resets and
1711     // send them
1712   } else {
1713     mStreamsResetting.Clear();
1714   }
1715   free(srs);
1716 }
1717 
1718 void
1719 DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst)
1720 {
1721   uint32_t n, i;
1722   RefPtr<DataChannel> channel; // since we may null out the ref to the channel
1723 
1724   if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
1725       !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
1726     n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t);
1727     for (i = 0; i < n; ++i) {
1728       if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
1729         channel = FindChannelByStream(strrst->strreset_stream_list[i]);
1730         if (channel) {
1731           // The other side closed the channel
1732           // We could be in three states:
1733           // 1. Normal state (input and output streams (OPEN)
1734           //    Notify application, send a RESET in response on our
1735           //    outbound channel.  Go to CLOSED
1736           // 2. We sent our own reset (CLOSING); either they crossed on the
1737           //    wire, or this is a response to our Reset.
1738           //    Go to CLOSED
1739           // 3. We've sent a open but haven't gotten a response yet (CONNECTING)
1740           //    I believe this is impossible, as we don't have an input stream yet.
1741 
1742           LOG(("Incoming: Channel %u  closed, state %d",
1743                channel->mStream, channel->mState));
1744           ASSERT_WEBRTC(channel->mState == DataChannel::OPEN ||
1745                         channel->mState == DataChannel::CLOSING ||
1746                         channel->mState == DataChannel::CONNECTING ||
1747                         channel->mState == DataChannel::WAITING_TO_OPEN);
1748           if (channel->mState == DataChannel::OPEN ||
1749               channel->mState == DataChannel::WAITING_TO_OPEN) {
1750             // Mark the stream for reset (the reset is sent below)
1751             ResetOutgoingStream(channel->mStream);
1752           }
1753           mStreams[channel->mStream] = nullptr;
1754 
1755           LOG(("Disconnected DataChannel %p from connection %p",
1756                (void *) channel.get(), (void *) channel->mConnection.get()));
1757           // This sends ON_CHANNEL_CLOSED to mainthread
1758           channel->StreamClosedLocked();
1759         } else {
1760           LOG(("Can't find incoming channel %d",i));
1761         }
1762       }
1763     }
1764   }
1765 
1766   // Process any pending resets now:
1767   if (!mStreamsResetting.IsEmpty()) {
1768     LOG(("Sending %d pending resets", mStreamsResetting.Length()));
1769     SendOutgoingStreamReset();
1770   }
1771 }
1772 
1773 void
1774 DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg)
1775 {
1776   uint16_t stream;
1777   RefPtr<DataChannel> channel;
1778 
1779   if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
1780     LOG(("*** Failed increasing number of streams from %u (%u/%u)",
1781          mStreams.Length(),
1782          strchg->strchange_instrms,
1783          strchg->strchange_outstrms));
1784     // XXX FIX! notify pending opens of failure
1785     return;
1786   } else {
1787     if (strchg->strchange_instrms > mStreams.Length()) {
1788       LOG(("Other side increased streams from %u to %u",
1789            mStreams.Length(), strchg->strchange_instrms));
1790     }
1791     if (strchg->strchange_outstrms > mStreams.Length() ||
1792         strchg->strchange_instrms > mStreams.Length()) {
1793       uint16_t old_len = mStreams.Length();
1794       uint16_t new_len = std::max(strchg->strchange_outstrms,
1795                                   strchg->strchange_instrms);
1796       LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
1797            old_len, new_len, new_len - old_len,
1798            strchg->strchange_instrms));
1799       // make sure both are the same length
1800       mStreams.AppendElements(new_len - old_len);
1801       LOG(("New length = %d (was %d)", mStreams.Length(), old_len));
1802       for (size_t i = old_len; i < mStreams.Length(); ++i) {
1803         mStreams[i] = nullptr;
1804       }
1805       // Re-process any channels waiting for streams.
1806       // Linear search, but we don't increase channels often and
1807       // the array would only get long in case of an app error normally
1808 
1809       // Make sure we request enough streams if there's a big jump in streams
1810       // Could make a more complex API for OpenXxxFinish() and avoid this loop
1811       size_t num_needed = mPending.GetSize();
1812       LOG(("%d of %d new streams already needed", num_needed,
1813            new_len - old_len));
1814       num_needed -= (new_len - old_len); // number we added
1815       if (num_needed > 0) {
1816         if (num_needed < 16)
1817           num_needed = 16;
1818         LOG(("Not enough new streams, asking for %d more", num_needed));
1819         RequestMoreStreams(num_needed);
1820       } else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
1821         LOG(("Requesting %d output streams to match partner",
1822              strchg->strchange_instrms - strchg->strchange_outstrms));
1823         RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms);
1824       }
1825 
1826       ProcessQueuedOpens();
1827     }
1828     // else probably not a change in # of streams
1829   }
1830 
1831   for (uint32_t i = 0; i < mStreams.Length(); ++i) {
1832     channel = mStreams[i];
1833     if (!channel)
1834       continue;
1835 
1836     if ((channel->mState == CONNECTING) &&
1837         (channel->mStream == INVALID_STREAM)) {
1838       if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
1839           (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
1840         /* XXX: Signal to the other end. */
1841         channel->mState = CLOSED;
1842         NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1843                                   DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
1844                                   channel)));
1845         // maybe fire onError (bug 843625)
1846       } else {
1847         stream = FindFreeStream();
1848         if (stream != INVALID_STREAM) {
1849           channel->mStream = stream;
1850           mStreams[stream] = channel;
1851           channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
1852           // Note: we're locked, so there's no danger of a race with the
1853           // buffer-threshold callback
1854         } else {
1855           /* We will not find more ... */
1856           break;
1857         }
1858       }
1859     }
1860   }
1861 }
1862 
1863 // Called with mLock locked!
1864 void
1865 DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n)
1866 {
1867   mLock.AssertCurrentThreadOwns();
1868   if (notif->sn_header.sn_length != (uint32_t)n) {
1869     return;
1870   }
1871   switch (notif->sn_header.sn_type) {
1872   case SCTP_ASSOC_CHANGE:
1873     HandleAssociationChangeEvent(&(notif->sn_assoc_change));
1874     break;
1875   case SCTP_PEER_ADDR_CHANGE:
1876     HandlePeerAddressChangeEvent(&(notif->sn_paddr_change));
1877     break;
1878   case SCTP_REMOTE_ERROR:
1879     HandleRemoteErrorEvent(&(notif->sn_remote_error));
1880     break;
1881   case SCTP_SHUTDOWN_EVENT:
1882     HandleShutdownEvent(&(notif->sn_shutdown_event));
1883     break;
1884   case SCTP_ADAPTATION_INDICATION:
1885     HandleAdaptationIndication(&(notif->sn_adaptation_event));
1886     break;
1887   case SCTP_PARTIAL_DELIVERY_EVENT:
1888     LOG(("SCTP_PARTIAL_DELIVERY_EVENT"));
1889     break;
1890   case SCTP_AUTHENTICATION_EVENT:
1891     LOG(("SCTP_AUTHENTICATION_EVENT"));
1892     break;
1893   case SCTP_SENDER_DRY_EVENT:
1894     //LOG(("SCTP_SENDER_DRY_EVENT"));
1895     break;
1896   case SCTP_NOTIFICATIONS_STOPPED_EVENT:
1897     LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
1898     break;
1899   case SCTP_SEND_FAILED_EVENT:
1900     HandleSendFailedEvent(&(notif->sn_send_failed_event));
1901     break;
1902   case SCTP_STREAM_RESET_EVENT:
1903     HandleStreamResetEvent(&(notif->sn_strreset_event));
1904     break;
1905   case SCTP_ASSOC_RESET_EVENT:
1906     LOG(("SCTP_ASSOC_RESET_EVENT"));
1907     break;
1908   case SCTP_STREAM_CHANGE_EVENT:
1909     HandleStreamChangeEvent(&(notif->sn_strchange_event));
1910     break;
1911   default:
1912     LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
1913     break;
1914    }
1915  }
1916 
1917 int
1918 DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen,
1919                                        struct sctp_rcvinfo rcv, int32_t flags)
1920 {
1921   ASSERT_WEBRTC(!NS_IsMainThread());
1922 
1923   if (!data) {
1924     usrsctp_close(sock); // SCTP has finished shutting down
1925   } else {
1926     MutexAutoLock lock(mLock);
1927     if (flags & MSG_NOTIFICATION) {
1928       HandleNotification(static_cast<union sctp_notification *>(data), datalen);
1929     } else {
1930       HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid);
1931     }
1932   }
1933   // sctp allocates 'data' with malloc(), and expects the receiver to free
1934   // it (presumably with free).
1935   // XXX future optimization: try to deliver messages without an internal
1936   // alloc/copy, and if so delay the free until later.
1937   free(data);
1938   // usrsctp defines the callback as returning an int, but doesn't use it
1939   return 1;
1940 }
1941 
1942 already_AddRefed<DataChannel>
1943 DataChannelConnection::Open(const nsACString& label, const nsACString& protocol,
1944                             Type type, bool inOrder,
1945                             uint32_t prValue, DataChannelListener *aListener,
1946                             nsISupports *aContext, bool aExternalNegotiated,
1947                             uint16_t aStream)
1948 {
1949   // aStream == INVALID_STREAM to have the protocol allocate
1950   uint16_t prPolicy = SCTP_PR_SCTP_NONE;
1951   uint32_t flags;
1952 
1953   LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u",
1954        PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(),
1955        type, inOrder, prValue, aListener, aContext,
1956        aExternalNegotiated ? "true" : "false", aStream));
1957   switch (type) {
1958     case DATA_CHANNEL_RELIABLE:
1959       prPolicy = SCTP_PR_SCTP_NONE;
1960       break;
1961     case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
1962       prPolicy = SCTP_PR_SCTP_RTX;
1963       break;
1964     case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
1965       prPolicy = SCTP_PR_SCTP_TTL;
1966       break;
1967   }
1968   if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
1969     return nullptr;
1970   }
1971 
1972   // Don't look past currently-negotiated streams
1973   if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) {
1974     LOG(("ERROR: external negotiation of already-open channel %u", aStream));
1975     // XXX How do we indicate this up to the application?  Probably the
1976     // caller's job, but we may need to return an error code.
1977     return nullptr;
1978   }
1979 
1980   flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
1981   RefPtr<DataChannel> channel(new DataChannel(this,
1982                                                 aStream,
1983                                                 DataChannel::CONNECTING,
1984                                                 label, protocol,
1985                                                 type, prValue,
1986                                                 flags,
1987                                                 aListener, aContext));
1988   if (aExternalNegotiated) {
1989     channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED;
1990   }
1991 
1992   MutexAutoLock lock(mLock); // OpenFinish assumes this
1993   return OpenFinish(channel.forget());
1994 }
1995 
// Separate routine so we can also call it to finish up from pending opens
//
// Completes (or defers) the opening of aChannel.  Must be called with mLock
// held.  Depending on the connection state and on whether a usable stream is
// available, the channel is either:
//   - queued on mPending (flagged DATA_CHANNEL_FLAGS_FINISH_OPEN) and returned,
//   - attached to mStreams, announced to the peer with an open request (unless
//     externally negotiated), marked OPEN and returned, or
//   - marked CLOSED on failure.
// Returns nullptr on failure, except on the request_error_cleanup path when
// the channel had already been handed to the app (FINISH_OPEN set): there the
// channel is returned after ON_CHANNEL_CLOSED has been dispatched for it.
already_AddRefed<DataChannel>
DataChannelConnection::OpenFinish(already_AddRefed<DataChannel>&& aChannel)
{
  RefPtr<DataChannel> channel(aChannel); // takes the reference passed in
  // Normally 1 reference if called from ::Open(), or 2 if called from
  // ProcessQueuedOpens() unless the DOMDataChannel was gc'd
  uint16_t stream = channel->mStream;
  bool queue = false;

  mLock.AssertCurrentThreadOwns();

  // Cases we care about:
  // Pre-negotiated:
  //    Not Open:
  //      Doesn't fit:
  //         -> change initial ask or renegotiate after open
  //      -> queue open
  //    Open:
  //      Doesn't fit:
  //         -> RequestMoreStreams && queue
  //      Does fit:
  //         -> open
  // Not negotiated:
  //    Not Open:
  //      -> queue open
  //    Open:
  //      -> Try to get a stream
  //      Doesn't fit:
  //         -> RequestMoreStreams && queue
  //      Does fit:
  //         -> open
  // So the Open cases are basically the same
  // Not Open cases are simply queue for non-negotiated, and
  // either change the initial ask or possibly renegotiate after open.

  if (mState == OPEN) {
    if (stream == INVALID_STREAM) {
      stream = FindFreeStream(); // may be INVALID_STREAM if we need more
    }
    if (stream == INVALID_STREAM || stream >= mStreams.Length()) {
      // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams
      // to avoid going back immediately for more if the ask to N, N+1, etc
      int32_t more_needed = (stream == INVALID_STREAM) ? 16 :
                            (stream-((int32_t)mStreams.Length())) + 16;
      if (!RequestMoreStreams(more_needed)) {
        // Something bad happened... we're done
        goto request_error_cleanup;
      }
      // The open is retried from the queue once more streams are available.
      queue = true;
    }
  } else {
    // not OPEN
    if (stream != INVALID_STREAM && stream >= mStreams.Length() &&
        mState == CLOSED) {
      // Update number of streams for init message
      struct sctp_initmsg initmsg;
      socklen_t len = sizeof(initmsg);
      int32_t total_needed = stream+16;

      memset(&initmsg, 0, sizeof(initmsg));
      if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
        LOG(("*** failed getsockopt SCTP_INITMSG"));
        goto request_error_cleanup;
      }
      LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed,
           initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
      initmsg.sinit_num_ostreams  = total_needed;
      initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
      if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
                             (socklen_t)sizeof(initmsg)) < 0) {
        LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
        goto request_error_cleanup;
      }

      // Grow mStreams to match the new ask; the new slots start out empty.
      int32_t old_len = mStreams.Length();
      mStreams.AppendElements(total_needed - old_len);
      for (int32_t i = old_len; i < total_needed; ++i) {
        mStreams[i] = nullptr;
      }
    }
    // else if state is CONNECTING, we'll just re-negotiate when OpenFinish
    // is called, if needed
    queue = true;
  }
  if (queue) {
    LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
    // Also serves to mark we told the app
    channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
    // we need a ref for the nsDeQue and one to return
    DataChannel* rawChannel = channel;
    rawChannel->AddRef();
    mPending.Push(rawChannel);
    return channel.forget();
  }

  MOZ_ASSERT(stream != INVALID_STREAM);
  // just allocated (& OPEN), or externally negotiated
  mStreams[stream] = channel; // holds a reference
  channel->mStream = stream;

#ifdef TEST_QUEUED_DATA
  // It's painful to write a test for this...
  channel->mState = OPEN;
  channel->mReady = true;
  SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST);
#endif

  if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
    // Don't send unordered until this gets cleared
    channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
  }

  // Externally negotiated channels skip the in-band open request.
  if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
    if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
                                stream,
                                !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
                                channel->mPrPolicy, channel->mPrValue)) {
      LOG(("SendOpenRequest failed, errno = %d", errno));
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        // Not a hard failure: flag the channel so the request is sent later.
        channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
        // Note: we're locked, so there's no danger of a race with the
        // buffer-threshold callback
        return channel.forget();
      } else {
        if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
          // We already returned the channel to the app.
          NS_ERROR("Failed to send open request");
          NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
                                    DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
                                    channel)));
        }
        // If we haven't returned the channel yet, it will get destroyed when we exit
        // this function.
        mStreams[stream] = nullptr;
        channel->mStream = INVALID_STREAM;
        // we'll be destroying the channel
        channel->mState = CLOSED;
        return nullptr;
      }
      /* NOTREACHED */
    }
  }
  // Either externally negotiated or we sent Open
  channel->mState = OPEN;
  channel->mReady = true;
  // FIX?  Move into DOMDataChannel?  I don't think we can send it yet here
  LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
  NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
                            DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
                            channel)));

  return channel.forget();

// Shared failure path for the stream-count negotiation errors above.
request_error_cleanup:
  channel->mState = CLOSED;
  if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
    // We already returned the channel to the app.
    NS_ERROR("Failed to request more streams");
    NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
                              DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
                              channel)));
    return channel.forget();
  }
  // we'll be destroying the channel, but it never really got set up
  // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
  // Dispatch it to ourselves
  return nullptr;
}
2165 
2166 int32_t
2167 DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
2168                                        size_t length, uint32_t ppid)
2169 {
2170   uint16_t flags;
2171   struct sctp_sendv_spa spa;
2172   int32_t result;
2173 
2174   NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0);
2175   NS_WARNING_ASSERTION(length > 0, "Length is 0?!");
2176 
2177   // To avoid problems where an in-order OPEN is lost and an
2178   // out-of-order data message "beats" it, require data to be in-order
2179   // until we get an ACK.
2180   if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
2181       !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
2182     flags = SCTP_UNORDERED;
2183   } else {
2184     flags = 0;
2185   }
2186 
2187   spa.sendv_sndinfo.snd_ppid = htonl(ppid);
2188   spa.sendv_sndinfo.snd_sid = channel->mStream;
2189   spa.sendv_sndinfo.snd_flags = flags;
2190   spa.sendv_sndinfo.snd_context = 0;
2191   spa.sendv_sndinfo.snd_assoc_id = 0;
2192   spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
2193 
2194   if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) {
2195     spa.sendv_prinfo.pr_policy = channel->mPrPolicy;
2196     spa.sendv_prinfo.pr_value = channel->mPrValue;
2197     spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
2198   }
2199 
2200   // Note: Main-thread IO, but doesn't block!
2201   // XXX FIX!  to deal with heavy overruns of JS trying to pass data in
2202   // (more than the buffersize) queue data onto another thread to do the
2203   // actual sends.  See netwerk/protocol/websocket/WebSocketChannel.cpp
2204 
2205   // SCTP will return EMSGSIZE if the message is bigger than the buffer
2206   // size (or EAGAIN if there isn't space)
2207 
2208   // Avoid a race between buffer-full-failure (where we have to add the
2209   // packet to the buffered-data queue) and the buffer-now-only-half-full
2210   // callback, which happens on a different thread.  Otherwise we might
2211   // fail here, then before we add it to the queue get the half-full
2212   // callback, find nothing to do, then on this thread add it to the
2213   // queue - which would sit there.  Also, if we later send more data, it
2214   // would arrive ahead of the buffered message, but if the buffer ever
2215   // got to 1/2 full, the message would get sent - but at a semi-random
2216   // time, after other data it was supposed to be in front of.
2217 
2218   // Must lock before empty check for similar reasons!
2219   MutexAutoLock lock(mLock);
2220   if (channel->mBufferedData.IsEmpty()) {
2221     result = usrsctp_sendv(mSocket, data, length,
2222                            nullptr, 0,
2223                            (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa),
2224                            SCTP_SENDV_SPA, 0);
2225     LOG(("Sent buffer (len=%u), result=%d", length, result));
2226   } else {
2227     // Fake EAGAIN if we're already buffering data
2228     result = -1;
2229     errno = EAGAIN;
2230   }
2231   if (result < 0) {
2232     if (errno == EAGAIN || errno == EWOULDBLOCK) {
2233 
2234       // queue data for resend!  And queue any further data for the stream until it is...
2235       auto *buffered = new BufferedMsg(spa, data, length); // infallible malloc
2236       channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array
2237       channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA;
2238       LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length));
2239       return 0;
2240     }
2241     LOG(("error %d sending string", errno));
2242   }
2243   return result;
2244 }
2245 
2246 // Handles fragmenting binary messages
2247 int32_t
2248 DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
2249                                   size_t len,
2250                                   uint32_t ppid_partial, uint32_t ppid_final)
2251 {
2252   // Since there's a limit on network buffer size and no limits on message
2253   // size, and we don't want to use EOR mode (multiple writes for a
2254   // message, but all other streams are blocked until you finish sending
2255   // this message), we need to add application-level fragmentation of large
2256   // messages.  On a reliable channel, these can be simply rebuilt into a
2257   // large message.  On an unreliable channel, we can't and don't know how
2258   // long to wait, and there are no retransmissions, and no easy way to
2259   // tell the user "this part is missing", so on unreliable channels we
2260   // need to return an error if sending more bytes than the network buffers
2261   // can hold, and perhaps a lower number.
2262 
2263   // We *really* don't want to do this from main thread! - and SendMsgInternal
2264   // avoids blocking.
2265   // This MUST be reliable and in-order for the reassembly to work
2266   if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
2267       channel->mPrPolicy == DATA_CHANNEL_RELIABLE &&
2268       !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
2269     int32_t sent=0;
2270     uint32_t origlen = len;
2271     LOG(("Sending binary message length %u in chunks", len));
2272     // XXX check flags for out-of-order, or force in-order for large binary messages
2273     while (len > 0) {
2274       size_t sendlen = std::min<size_t>(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
2275       uint32_t ppid;
2276       len -= sendlen;
2277       ppid = len > 0 ? ppid_partial : ppid_final;
2278       LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid));
2279       // Note that these might end up being deferred and queued.
2280       sent += SendMsgInternal(channel, data, sendlen, ppid);
2281       data += sendlen;
2282     }
2283     LOG(("Sent %d buffers for %u bytes, %d sent immediately, %d buffers queued",
2284          (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT,
2285          origlen, sent,
2286          channel->mBufferedData.Length()));
2287     return sent;
2288   }
2289   NS_WARNING_ASSERTION(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
2290                        "Sending too-large data on unreliable channel!");
2291 
2292   // This will fail if the message is too large (default 256K)
2293   return SendMsgInternal(channel, data, len, ppid_final);
2294 }
2295 
2296 class ReadBlobRunnable : public Runnable {
2297 public:
2298   ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream,
2299     nsIInputStream* aBlob) :
2300     mConnection(aConnection),
2301     mStream(aStream),
2302     mBlob(aBlob)
2303   {}
2304 
2305   NS_IMETHOD Run() override {
2306     // ReadBlob() is responsible to releasing the reference
2307     DataChannelConnection *self = mConnection;
2308     self->ReadBlob(mConnection.forget(), mStream, mBlob);
2309     return NS_OK;
2310   }
2311 
2312 private:
2313   // Make sure the Connection doesn't die while there are jobs outstanding.
2314   // Let it die (if released by PeerConnectionImpl while we're running)
2315   // when we send our runnable back to MainThread.  Then ~DataChannelConnection
2316   // can send the IOThread to MainThread to die in a runnable, avoiding
2317   // unsafe event loop recursion.  Evil.
2318   RefPtr<DataChannelConnection> mConnection;
2319   uint16_t mStream;
2320   // Use RefCount for preventing the object is deleted when SendBlob returns.
2321   RefPtr<nsIInputStream> mBlob;
2322 };
2323 
2324 int32_t
2325 DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
2326 {
2327   DataChannel *channel = mStreams[stream];
2328   NS_ENSURE_TRUE(channel, 0);
2329   // Spawn a thread to send the data
2330   if (!mInternalIOThread) {
2331     nsresult res = NS_NewThread(getter_AddRefs(mInternalIOThread));
2332     if (NS_FAILED(res)) {
2333       return -1;
2334     }
2335   }
2336 
2337   mInternalIOThread->Dispatch(do_AddRef(new ReadBlobRunnable(this, stream, aBlob)), NS_DISPATCH_NORMAL);
2338   return 0;
2339 }
2340 
2341 class DataChannelBlobSendRunnable : public Runnable
2342 {
2343 public:
2344   DataChannelBlobSendRunnable(already_AddRefed<DataChannelConnection>& aConnection,
2345                               uint16_t aStream)
2346     : mConnection(aConnection)
2347     , mStream(aStream) {}
2348 
2349   ~DataChannelBlobSendRunnable() override
2350   {
2351     if (!NS_IsMainThread() && mConnection) {
2352       MOZ_ASSERT(false);
2353       // explicitly leak the connection if destroyed off mainthread
2354       Unused << mConnection.forget().take();
2355     }
2356   }
2357 
2358   NS_IMETHOD Run() override
2359   {
2360     ASSERT_WEBRTC(NS_IsMainThread());
2361 
2362     mConnection->SendBinaryMsg(mStream, mData);
2363     mConnection = nullptr;
2364     return NS_OK;
2365   }
2366 
2367   // explicitly public so we can avoid allocating twice and copying
2368   nsCString mData;
2369 
2370 private:
2371   // Note: we can be destroyed off the target thread, so be careful not to let this
2372   // get Released()ed on the temp thread!
2373   RefPtr<DataChannelConnection> mConnection;
2374   uint16_t mStream;
2375 };
2376 
2377 void
2378 DataChannelConnection::ReadBlob(already_AddRefed<DataChannelConnection> aThis,
2379                                 uint16_t aStream, nsIInputStream* aBlob)
2380 {
2381   // NOTE: 'aThis' has been forgotten by the caller to avoid releasing
2382   // it off mainthread; if PeerConnectionImpl has released then we want
2383   // ~DataChannelConnection() to run on MainThread
2384 
2385   // XXX to do this safely, we must enqueue these atomically onto the
2386   // output socket.  We need a sender thread(s?) to enqueue data into the
2387   // socket and to avoid main-thread IO that might block.  Even on a
2388   // background thread, we may not want to block on one stream's data.
2389   // I.e. run non-blocking and service multiple channels.
2390 
2391   // For now as a hack, send as a single blast of queued packets which may
2392   // be deferred until buffer space is available.
2393   uint64_t len;
2394   nsCOMPtr<nsIThread> mainThread;
2395   NS_GetMainThread(getter_AddRefs(mainThread));
2396 
2397   // Must not let Dispatching it cause the DataChannelConnection to get
2398   // released on the wrong thread.  Using WrapRunnable(RefPtr<DataChannelConnection>(aThis),...
2399   // will occasionally cause aThis to get released on this thread.  Also, an explicit Runnable
2400   // lets us avoid copying the blob data an extra time.
2401   RefPtr<DataChannelBlobSendRunnable> runnable = new DataChannelBlobSendRunnable(aThis,
2402                                                                                    aStream);
2403   // avoid copying the blob data by passing the mData from the runnable
2404   if (NS_FAILED(aBlob->Available(&len)) ||
2405       NS_FAILED(NS_ReadInputStreamToString(aBlob, runnable->mData, len))) {
2406     // Bug 966602:  Doesn't return an error to the caller via onerror.
2407     // We must release DataChannelConnection on MainThread to avoid issues (bug 876167)
2408     // aThis is now owned by the runnable; release it there
2409     NS_ProxyRelease(mainThread, runnable.forget());
2410     return;
2411   }
2412   aBlob->Close();
2413   NS_DispatchToMainThread(runnable, NS_DISPATCH_NORMAL);
2414 }
2415 
2416 void
2417 DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList)
2418 {
2419   ASSERT_WEBRTC(NS_IsMainThread());
2420   for (uint32_t i = 0; i < mStreams.Length(); ++i) {
2421     if (mStreams[i]) {
2422       aStreamList->push_back(mStreams[i]->mStream);
2423     }
2424   }
2425 }
2426 
2427 int32_t
2428 DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
2429                                      bool isBinary)
2430 {
2431   ASSERT_WEBRTC(NS_IsMainThread());
2432   // We really could allow this from other threads, so long as we deal with
2433   // asynchronosity issues with channels closing, in particular access to
2434   // mStreams, and issues with the association closing (access to mSocket).
2435 
2436   const char *data = aMsg.BeginReading();
2437   uint32_t len     = aMsg.Length();
2438   DataChannel *channel;
2439 
2440   LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
2441   // XXX if we want more efficiency, translate flags once at open time
2442   channel = mStreams[stream];
2443   NS_ENSURE_TRUE(channel, 0);
2444 
2445   if (isBinary)
2446     return SendBinary(channel, data, len,
2447                       DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST);
2448   return SendBinary(channel, data, len,
2449                     DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST);
2450 }
2451 
// Closes a single channel.  Takes mLock for the duration of the call and
// delegates the actual work to CloseInt(), which asserts that the lock is
// held by the current thread.
void
DataChannelConnection::Close(DataChannel *aChannel)
{
  MutexAutoLock lock(mLock);
  CloseInt(aChannel);
}
2458 
// So we can call Close() with the lock already held
// Called from someone who holds a ref via ::Close(), or from ~DataChannel
//
// Moves aChannel to CLOSING, drops its buffered data and resets its outgoing
// stream.  When the connection itself is already CLOSED (the CloseAll() path)
// the channel's mStreams slot is cleared immediately and the channel is
// finalized via StreamClosedLocked() instead of waiting any longer.
// aChannel must be non-null and mLock must be held by the caller.
void
DataChannelConnection::CloseInt(DataChannel *aChannel)
{
  MOZ_ASSERT(aChannel);
  RefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us

  mLock.AssertCurrentThreadOwns();
  LOG(("Connection %p/Channel %p: Closing stream %u",
       channel->mConnection.get(), channel.get(), channel->mStream));
  // re-test since it may have closed before the lock was grabbed
  if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) {
    LOG(("Channel already closing/closed (%u)", aChannel->mState));
    if (mState == CLOSED && channel->mStream != INVALID_STREAM) {
      // called from CloseAll()
      // we're not going to hang around waiting any more
      mStreams[channel->mStream] = nullptr;
    }
    // Nothing else to do for a channel that is already on its way out.
    return;
  }
  // Anything still queued for this channel will never be sent.
  aChannel->mBufferedData.Clear();
  if (channel->mStream != INVALID_STREAM) {
    // Queue a reset for this stream; whether it is sent right away depends
    // on the connection state below.
    ResetOutgoingStream(channel->mStream);
    if (mState == CLOSED) { // called from CloseAll()
      // Let resets accumulate then send all at once in CloseAll()
      // we're not going to hang around waiting
      mStreams[channel->mStream] = nullptr;
    } else {
      SendOutgoingStreamReset();
    }
  }
  // Mark the channel CLOSING before (possibly) finalizing it below;
  // StreamClosedLocked() then moves it on to CLOSED.
  aChannel->mState = CLOSING;
  if (mState == CLOSED) {
    // we're not going to hang around waiting
    channel->StreamClosedLocked();
  }
  // At this point when we leave here, the object is a zombie held alive only by the DOM object
}
2498 
2499 void DataChannelConnection::CloseAll()
2500 {
2501   LOG(("Closing all channels (connection %p)", (void*) this));
2502   // Don't need to lock here
2503 
2504   // Make sure no more channels will be opened
2505   {
2506     MutexAutoLock lock(mLock);
2507     mState = CLOSED;
2508   }
2509 
2510   // Close current channels
2511   // If there are runnables, they hold a strong ref and keep the channel
2512   // and/or connection alive (even if in a CLOSED state)
2513   bool closed_some = false;
2514   for (uint32_t i = 0; i < mStreams.Length(); ++i) {
2515     if (mStreams[i]) {
2516       mStreams[i]->Close();
2517       closed_some = true;
2518     }
2519   }
2520 
2521   // Clean up any pending opens for channels
2522   RefPtr<DataChannel> channel;
2523   while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) {
2524     LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream));
2525     channel->Close(); // also releases the ref on each iteration
2526     closed_some = true;
2527   }
2528   // It's more efficient to let the Resets queue in shutdown and then
2529   // SendOutgoingStreamReset() here.
2530   if (closed_some) {
2531     MutexAutoLock lock(mLock);
2532     SendOutgoingStreamReset();
2533   }
2534 }
2535 
// Destructor.  Only sanity-checks that the channel reached CLOSED or CLOSING
// before being destroyed; it performs no cleanup of its own here.
DataChannel::~DataChannel()
{
  // NS_ASSERTION since this is more "I think I caught all the cases that
  // can cause this" than a true kill-the-program assertion.  If this is
  // wrong, nothing bad happens.  At worst it's a leak.
  NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel");
}
2543 
2544 void
2545 DataChannel::Close()
2546 {
2547   if (mConnection) {
2548     // ensure we don't get deleted
2549     RefPtr<DataChannelConnection> connection(mConnection);
2550     connection->Close(this);
2551   }
2552 }
2553 
2554 // Used when disconnecting from the DataChannelConnection
2555 void
2556 DataChannel::StreamClosedLocked()
2557 {
2558   mConnection->mLock.AssertCurrentThreadOwns();
2559   ENSURE_DATACONNECTION;
2560 
2561   LOG(("Destroying Data channel %u", mStream));
2562   MOZ_ASSERT_IF(mStream != INVALID_STREAM,
2563                 !mConnection->FindChannelByStream(mStream));
2564   mStream = INVALID_STREAM;
2565   mState = CLOSED;
2566   NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
2567                                       DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED,
2568                                       mConnection, this)));
2569   // We leave mConnection live until the DOM releases us, to avoid races
2570 }
2571 
// Drops this channel's reference to its connection by resetting mConnection
// to null.  Asserts that it runs on the main thread.
void
DataChannel::ReleaseConnection()
{
  ASSERT_WEBRTC(NS_IsMainThread());
  mConnection = nullptr;
}
2578 
2579 void
2580 DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext)
2581 {
2582   MutexAutoLock mLock(mListenerLock);
2583   mContext = aContext;
2584   mListener = aListener;
2585 }
2586 
2587 // May be called from another (i.e. Main) thread!
2588 void
2589 DataChannel::AppReady()
2590 {
2591   ENSURE_DATACONNECTION;
2592 
2593   MutexAutoLock lock(mConnection->mLock);
2594 
2595   mReady = true;
2596   if (mState == WAITING_TO_OPEN) {
2597     mState = OPEN;
2598     NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
2599                               DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
2600                               this)));
2601     for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
2602       nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
2603       MOZ_ASSERT(runnable);
2604       NS_DispatchToMainThread(runnable);
2605     }
2606   } else {
2607     NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN");
2608   }
2609   mQueuedMessages.Clear();
2610   mQueuedMessages.Compact();
2611   // We never use it again...  We could even allocate the array in the odd
2612   // cases we need it.
2613 }
2614 
2615 uint32_t
2616 DataChannel::GetBufferedAmountLocked() const
2617 {
2618   size_t buffered = 0;
2619 
2620   for (auto& buffer : mBufferedData) {
2621     buffered += buffer->mLength;
2622   }
2623   // XXX Note: per Michael Tuexen, there's no way to currently get the buffered
2624   // amount from the SCTP stack for a single stream.  It is on their to-do
2625   // list, and once we import a stack with support for that, we'll need to
2626   // add it to what we buffer.  Also we'll need to ask for notification of a per-
2627   // stream buffer-low event and merge that into the handling of buffer-low
2628   // (the equivalent to TCP_NOTSENT_LOWAT on TCP sockets)
2629 
2630   if (buffered > UINT32_MAX) { // paranoia - >4GB buffered is very very unlikely
2631     buffered = UINT32_MAX;
2632   }
2633   return buffered;
2634 }
2635 
// Returns the currently configured buffered-amount-low threshold
// (mBufferedThreshold).
uint32_t
DataChannel::GetBufferedAmountLowThreshold()
{
  return mBufferedThreshold;
}
2641 
// Stores aThreshold as the new buffered-amount-low threshold.
// Never fire immediately, as it's defined to fire on transitions, not state;
// accordingly this only records the value and triggers nothing itself.
void
DataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold)
{
  mBufferedThreshold = aThreshold;
}
2648 
2649 // Called with mLock locked!
2650 void
2651 DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)
2652 {
2653   if (!mReady &&
2654       (mState == CONNECTING || mState == WAITING_TO_OPEN)) {
2655     mQueuedMessages.AppendElement(aMessage);
2656   } else {
2657     NS_DispatchToMainThread(aMessage);
2658   }
2659 }
2660 
2661 } // namespace mozilla
2662