1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4  * License, v. 2.0. If a copy of the MPL was not distributed with this file,
5  * You can obtain one at http://mozilla.org/MPL/2.0/. */
6 
7 #include <algorithm>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #if !defined(__Userspace_os_Windows)
11 #include <arpa/inet.h>
12 #endif
13 // usrsctp.h expects to have errno definitions prior to its inclusion.
14 #include <errno.h>
15 
16 #define SCTP_DEBUG 1
17 #define SCTP_STDINT_INCLUDE <stdint.h>
18 
19 #ifdef _MSC_VER
20 // Disable "warning C4200: nonstandard extension used : zero-sized array in
21 //          struct/union"
22 // ...which the third-party file usrsctp.h runs afoul of.
23 #pragma warning(push)
24 #pragma warning(disable : 4200)
25 #endif
26 
27 #include "usrsctp.h"
28 
29 #ifdef _MSC_VER
30 #pragma warning(pop)
31 #endif
32 
33 #include "DataChannelLog.h"
34 
35 #include "nsServiceManagerUtils.h"
36 #include "nsIObserverService.h"
37 #include "nsIObserver.h"
38 #include "nsIPrefBranch.h"
39 #include "nsIPrefService.h"
40 #include "mozilla/Services.h"
41 #include "mozilla/Sprintf.h"
42 #include "nsProxyRelease.h"
43 #include "nsThread.h"
44 #include "nsThreadUtils.h"
45 #include "nsAutoPtr.h"
46 #include "nsNetUtil.h"
47 #include "nsNetCID.h"
48 #include "mozilla/StaticPtr.h"
49 #include "mozilla/StaticMutex.h"
50 #include "mozilla/Unused.h"
51 #ifdef MOZ_PEERCONNECTION
52 #include "mtransport/runnable_utils.h"
53 #endif
54 
55 #define DATACHANNEL_LOG(args) LOG(args)
56 #include "DataChannel.h"
57 #include "DataChannelProtocol.h"
58 
// Let us turn on and off important assertions in non-debug builds.
//
// ASSERT_WEBRTC(x):
//  - DEBUG builds: expands to MOZ_ASSERT((x)).
//  - Otherwise, when MOZ_WEBRTC_ASSERT_ALWAYS is defined: evaluates x and
//    calls MOZ_CRASH() if it is false.
// NOTE(review): this block provides no definition when neither DEBUG nor
// MOZ_WEBRTC_ASSERT_ALWAYS is defined — presumably the build always sets one
// of them or a header supplies a fallback; confirm.
#ifdef DEBUG
#define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
#elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
#define ASSERT_WEBRTC(x) \
  do {                   \
    if (!(x)) {          \
      MOZ_CRASH();       \
    }                    \
  } while (0)
#endif
70 
// True between the usrsctp_init() call made by DataChannelConnection::Init
// and the usrsctp_finish() call made when "xpcom-will-shutdown" is observed.
static bool sctp_initialized;
72 
73 namespace mozilla {
74 
// Log module named "DataChannel".
// NOTE(review): presumably the module behind LOG()/DATACHANNEL_LOG(), which
// are defined outside this file section — confirm in DataChannelLog.h.
LazyLogModule gDataChannelLog("DataChannel");
// Log module named "SCTP"; file-local. Used for SCTP stack debug output and
// packet dumps via SCTP_LOG below.
static LazyLogModule gSCTPLog("SCTP");

// Logs |args| (a parenthesized printf-style argument list) to the "SCTP"
// module at Debug level.
#define SCTP_LOG(args) \
  MOZ_LOG(mozilla::gSCTPLog, mozilla::LogLevel::Debug, args)
80 
81 class DataChannelConnectionShutdown : public nsITimerCallback {
82  public:
DataChannelConnectionShutdown(DataChannelConnection * aConnection)83   explicit DataChannelConnectionShutdown(DataChannelConnection *aConnection)
84       : mConnection(aConnection) {
85     mTimer = NS_NewTimer();  // we'll crash if this fails
86     mTimer->InitWithCallback(this, 30 * 1000, nsITimer::TYPE_ONE_SHOT);
87   }
88 
89   NS_IMETHODIMP Notify(nsITimer *aTimer) override;
90 
91   NS_DECL_THREADSAFE_ISUPPORTS
92 
93  private:
~DataChannelConnectionShutdown()94   virtual ~DataChannelConnectionShutdown() { mTimer->Cancel(); }
95 
96   RefPtr<DataChannelConnection> mConnection;
97   nsCOMPtr<nsITimer> mTimer;
98 };
99 
100 class DataChannelShutdown;
101 
102 StaticRefPtr<DataChannelShutdown> sDataChannelShutdown;
103 
104 class DataChannelShutdown : public nsIObserver {
105  public:
106   // This needs to be tied to some object that is guaranteed to be
107   // around (singleton likely) unless we want to shutdown sctp whenever
108   // we're not using it (and in which case we'd keep a refcnt'd object
109   // ref'd by each DataChannelConnection to release the SCTP usrlib via
110   // sctp_finish). Right now, the single instance of this class is
111   // owned by the observer service and a StaticRefPtr.
112 
113   NS_DECL_ISUPPORTS
114 
DataChannelShutdown()115   DataChannelShutdown() {}
116 
Init()117   void Init() {
118     nsCOMPtr<nsIObserverService> observerService =
119         mozilla::services::GetObserverService();
120     if (!observerService) return;
121 
122     nsresult rv =
123         observerService->AddObserver(this, "xpcom-will-shutdown", false);
124     MOZ_ASSERT(rv == NS_OK);
125     (void)rv;
126   }
127 
Observe(nsISupports * aSubject,const char * aTopic,const char16_t * aData)128   NS_IMETHOD Observe(nsISupports *aSubject, const char *aTopic,
129                      const char16_t *aData) override {
130     // Note: MainThread
131     if (strcmp(aTopic, "xpcom-will-shutdown") == 0) {
132       LOG(("Shutting down SCTP"));
133       if (sctp_initialized) {
134         usrsctp_finish();
135         sctp_initialized = false;
136       }
137       nsCOMPtr<nsIObserverService> observerService =
138           mozilla::services::GetObserverService();
139       if (!observerService) return NS_ERROR_FAILURE;
140 
141       nsresult rv =
142           observerService->RemoveObserver(this, "xpcom-will-shutdown");
143       MOZ_ASSERT(rv == NS_OK);
144       (void)rv;
145 
146       {
147         StaticMutexAutoLock lock(sLock);
148         sConnections = nullptr;  // clears as well
149       }
150       sDataChannelShutdown = nullptr;
151     }
152     return NS_OK;
153   }
154 
CreateConnectionShutdown(DataChannelConnection * aConnection)155   void CreateConnectionShutdown(DataChannelConnection *aConnection) {
156     StaticMutexAutoLock lock(sLock);
157     if (!sConnections) {
158       sConnections = new nsTArray<RefPtr<DataChannelConnectionShutdown>>();
159     }
160     sConnections->AppendElement(new DataChannelConnectionShutdown(aConnection));
161   }
162 
RemoveConnectionShutdown(DataChannelConnectionShutdown * aConnectionShutdown)163   void RemoveConnectionShutdown(
164       DataChannelConnectionShutdown *aConnectionShutdown) {
165     StaticMutexAutoLock lock(sLock);
166     if (sConnections) {
167       sConnections->RemoveElement(aConnectionShutdown);
168     }
169   }
170 
171  private:
172   // The only instance of DataChannelShutdown is owned by the observer
173   // service, so there is no need to call RemoveObserver here.
174   virtual ~DataChannelShutdown() = default;
175 
176   // protects sConnections
177   static StaticMutex sLock;
178   static StaticAutoPtr<nsTArray<RefPtr<DataChannelConnectionShutdown>>>
179       sConnections;
180 };
181 
182 StaticMutex DataChannelShutdown::sLock;
183 StaticAutoPtr<nsTArray<RefPtr<DataChannelConnectionShutdown>>>
184     DataChannelShutdown::sConnections;
185 
186 NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
187 
NS_IMPL_ISUPPORTS(DataChannelConnectionShutdown,nsITimerCallback)188 NS_IMPL_ISUPPORTS(DataChannelConnectionShutdown, nsITimerCallback)
189 
190 NS_IMETHODIMP
191 DataChannelConnectionShutdown::Notify(nsITimer *aTimer) {
192   // safely release reference to ourself
193   RefPtr<DataChannelConnectionShutdown> grip(this);
194   // Might not be set. We don't actually use the |this| pointer in
195   // RemoveConnectionShutdown right now, which makes this a bit gratuitous
196   // anyway...
197   if (sDataChannelShutdown) {
198     sDataChannelShutdown->RemoveConnectionShutdown(this);
199   }
200   return NS_OK;
201 }
202 
OutgoingMsg(struct sctp_sendv_spa & info,const uint8_t * data,size_t length)203 OutgoingMsg::OutgoingMsg(struct sctp_sendv_spa &info, const uint8_t *data,
204                          size_t length)
205     : mLength(length), mData(data) {
206   mInfo = &info;
207   mPos = 0;
208 }
209 
Advance(size_t offset)210 void OutgoingMsg::Advance(size_t offset) {
211   mPos += offset;
212   if (mPos > mLength) {
213     mPos = mLength;
214   }
215 }
216 
BufferedOutgoingMsg(OutgoingMsg & msg)217 BufferedOutgoingMsg::BufferedOutgoingMsg(OutgoingMsg &msg) {
218   size_t length = msg.GetLeft();
219   auto *tmp = new uint8_t[length];  // infallible malloc!
220   memcpy(tmp, msg.GetData(), length);
221   mLength = length;
222   mData = tmp;
223   mInfo = new sctp_sendv_spa;
224   *mInfo = msg.GetInfo();
225   mPos = 0;
226 }
227 
~BufferedOutgoingMsg()228 BufferedOutgoingMsg::~BufferedOutgoingMsg() {
229   delete mInfo;
230   delete mData;
231 }
232 
receive_cb(struct socket * sock,union sctp_sockstore addr,void * data,size_t datalen,struct sctp_rcvinfo rcv,int flags,void * ulp_info)233 static int receive_cb(struct socket *sock, union sctp_sockstore addr,
234                       void *data, size_t datalen, struct sctp_rcvinfo rcv,
235                       int flags, void *ulp_info) {
236   DataChannelConnection *connection =
237       static_cast<DataChannelConnection *>(ulp_info);
238   return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
239 }
240 
GetConnectionFromSocket(struct socket * sock)241 static DataChannelConnection *GetConnectionFromSocket(struct socket *sock) {
242   struct sockaddr *addrs = nullptr;
243   int naddrs = usrsctp_getladdrs(sock, 0, &addrs);
244   if (naddrs <= 0 || addrs[0].sa_family != AF_CONN) {
245     return nullptr;
246   }
247   // usrsctp_getladdrs() returns the addresses bound to this socket, which
248   // contains the SctpDataMediaChannel* as sconn_addr.  Read the pointer,
249   // then free the list of addresses once we have the pointer.  We only open
250   // AF_CONN sockets, and they should all have the sconn_addr set to the
251   // pointer that created them, so [0] is as good as any other.
252   struct sockaddr_conn *sconn =
253       reinterpret_cast<struct sockaddr_conn *>(&addrs[0]);
254   DataChannelConnection *connection =
255       reinterpret_cast<DataChannelConnection *>(sconn->sconn_addr);
256   usrsctp_freeladdrs(addrs);
257 
258   return connection;
259 }
260 
261 // called when the buffer empties to the threshold value
threshold_event(struct socket * sock,uint32_t sb_free)262 static int threshold_event(struct socket *sock, uint32_t sb_free) {
263   DataChannelConnection *connection = GetConnectionFromSocket(sock);
264   if (connection) {
265     connection->SendDeferredMessages();
266   } else {
267     LOG(("Can't find connection for socket %p", sock));
268   }
269   return 0;
270 }
271 
debug_printf(const char * format,...)272 static void debug_printf(const char *format, ...) {
273   va_list ap;
274   char buffer[1024];
275 
276   if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
277     va_start(ap, format);
278 #ifdef _WIN32
279     if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) {
280 #else
281     if (VsprintfLiteral(buffer, format, ap) > 0) {
282 #endif
283       SCTP_LOG(("%s", buffer));
284     }
285     va_end(ap);
286   }
287 }
288 
289 DataChannelConnection::DataChannelConnection(DataConnectionListener *listener,
290                                              nsIEventTarget *aTarget)
291     : NeckoTargetHolder(aTarget),
292       mLock("netwerk::sctp::DataChannelConnection") {
293   mCurrentStream = 0;
294   mState = CLOSED;
295   mSocket = nullptr;
296   mMasterSocket = nullptr;
297   mListener = listener;
298   mLocalPort = 0;
299   mRemotePort = 0;
300   mPendingType = PENDING_NONE;
301   LOG(("Constructor DataChannelConnection=%p, listener=%p", this,
302        mListener.get()));
303   mInternalIOThread = nullptr;
304 #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
305   mShutdown = false;
306 #endif
307 }
308 
309 DataChannelConnection::~DataChannelConnection() {
310   LOG(("Deleting DataChannelConnection %p", (void *)this));
311   // This may die on the MainThread, or on the STS thread
312   ASSERT_WEBRTC(mState == CLOSED);
313   MOZ_ASSERT(!mMasterSocket);
314   MOZ_ASSERT(mPending.GetSize() == 0);
315   MOZ_ASSERT(!mTransportFlow);
316 
317   // Already disconnected from sigslot/mTransportFlow
318   // TransportFlows must be released from the STS thread
319   if (!IsSTSThread()) {
320     ASSERT_WEBRTC(NS_IsMainThread());
321 
322     if (mInternalIOThread) {
323       // Avoid spinning the event thread from here (which if we're mainthread
324       // is in the event loop already)
325       nsCOMPtr<nsIRunnable> r = WrapRunnable(
326           nsCOMPtr<nsIThread>(mInternalIOThread), &nsIThread::Shutdown);
327       Dispatch(r.forget());
328     }
329   } else {
330     // on STS, safe to call shutdown
331     if (mInternalIOThread) {
332       mInternalIOThread->Shutdown();
333     }
334   }
335 }
336 
337 void DataChannelConnection::Destroy() {
338   // Though it's probably ok to do this and close the sockets;
339   // if we really want it to do true clean shutdowns it can
340   // create a dependant Internal object that would remain around
341   // until the network shut down the association or timed out.
342   LOG(("Destroying DataChannelConnection %p", (void *)this));
343   ASSERT_WEBRTC(NS_IsMainThread());
344   CloseAll();
345 
346   MutexAutoLock lock(mLock);
347   // If we had a pending reset, we aren't waiting for it - clear the list so
348   // we can deregister this DataChannelConnection without leaking.
349   ClearResets();
350 
351   MOZ_ASSERT(mSTS);
352   ASSERT_WEBRTC(NS_IsMainThread());
353   mListener = nullptr;
354   // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
355   // the usrsctp_close() calls can move back here (and just proxy the
356   // disconnect_all())
357   RUN_ON_THREAD(mSTS,
358                 WrapRunnable(RefPtr<DataChannelConnection>(this),
359                              &DataChannelConnection::DestroyOnSTS, mSocket,
360                              mMasterSocket),
361                 NS_DISPATCH_NORMAL);
362 
363   // These will be released on STS
364   mSocket = nullptr;
365   mMasterSocket = nullptr;  // also a flag that we've Destroyed this connection
366 
367   // We can't get any more new callbacks from the SCTP library
368   // All existing callbacks have refs to DataChannelConnection
369 
370   // nsDOMDataChannel objects have refs to DataChannels that have refs to us
371 }
372 
373 void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket,
374                                          struct socket *aSocket) {
375   if (aSocket && aSocket != aMasterSocket) usrsctp_close(aSocket);
376   if (aMasterSocket) usrsctp_close(aMasterSocket);
377 
378   usrsctp_deregister_address(static_cast<void *>(this));
379   LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
380 #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
381   mShutdown = true;
382 #endif
383 
384   disconnect_all();
385 
386   // we may have queued packet sends on STS after this; dispatch to ourselves
387   // before finishing here so we can be sure there aren't anymore runnables
388   // active that can try to touch the flow. DON'T use RUN_ON_THREAD, it
389   // queue-jumps!
390   mSTS->Dispatch(WrapRunnable(RefPtr<DataChannelConnection>(this),
391                               &DataChannelConnection::DestroyOnSTSFinal),
392                  NS_DISPATCH_NORMAL);
393 }
394 
395 void DataChannelConnection::DestroyOnSTSFinal() {
396   mTransportFlow = nullptr;
397   sDataChannelShutdown->CreateConnectionShutdown(this);
398 }
399 
400 bool DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams,
401                                  bool aMaxMessageSizeSet,
402                                  uint64_t aMaxMessageSize) {
403   struct sctp_initmsg initmsg;
404   struct sctp_assoc_value av;
405   struct sctp_event event;
406   socklen_t len;
407 
408   uint16_t event_types[] = {
409       SCTP_ASSOC_CHANGE,          SCTP_PEER_ADDR_CHANGE,
410       SCTP_REMOTE_ERROR,          SCTP_SHUTDOWN_EVENT,
411       SCTP_ADAPTATION_INDICATION, SCTP_PARTIAL_DELIVERY_EVENT,
412       SCTP_SEND_FAILED_EVENT,     SCTP_STREAM_RESET_EVENT,
413       SCTP_STREAM_CHANGE_EVENT};
414   {
415     ASSERT_WEBRTC(NS_IsMainThread());
416     // MutexAutoLock lock(mLock); Not needed since we're on mainthread always
417 
418     mSendInterleaved = false;
419     mPpidFragmentation = false;
420     mMaxMessageSizeSet = false;
421     SetMaxMessageSize(aMaxMessageSizeSet, aMaxMessageSize);
422 
423     if (!sctp_initialized) {
424       LOG(("sctp_init"));
425 #ifdef MOZ_PEERCONNECTION
426       usrsctp_init(0, DataChannelConnection::SctpDtlsOutput, debug_printf);
427 #else
428       MOZ_CRASH("Trying to use SCTP/DTLS without mtransport");
429 #endif
430 
431       // Set logging to SCTP:LogLevel::Debug to get SCTP debugs
432       if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
433         usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
434       }
435 
436       // Do not send ABORTs in response to INITs (1).
437       // Do not send ABORTs for received Out of the Blue packets (2).
438       usrsctp_sysctl_set_sctp_blackhole(2);
439 
440       // Disable the Explicit Congestion Notification extension (currently not
441       // supported by the Firefox code)
442       usrsctp_sysctl_set_sctp_ecn_enable(0);
443 
444       // Enable interleaving messages for different streams (incoming)
445       // See: https://tools.ietf.org/html/rfc6458#section-8.1.20
446       usrsctp_sysctl_set_sctp_default_frag_interleave(2);
447 
448       sctp_initialized = true;
449 
450       sDataChannelShutdown = new DataChannelShutdown();
451       sDataChannelShutdown->Init();
452     }
453   }
454 
455   // XXX FIX! make this a global we get once
456   // Find the STS thread
457   nsresult rv;
458   mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
459   MOZ_ASSERT(NS_SUCCEEDED(rv));
460 
461   // Open sctp with a callback
462   if ((mMasterSocket = usrsctp_socket(
463            AF_CONN, SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event,
464            usrsctp_sysctl_get_sctp_sendspace() / 2, this)) == nullptr) {
465     return false;
466   }
467 
468   // Make non-blocking for bind/connect.  SCTP over UDP defaults to non-blocking
469   // in associations for normal IO
470   if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
471     LOG(("Couldn't set non_blocking on SCTP socket"));
472     // We can't handle connect() safely if it will block, not that this will
473     // even happen.
474     goto error_cleanup;
475   }
476 
477   // Make sure when we close the socket, make sure it doesn't call us back
478   // again! This would cause it try to use an invalid DataChannelConnection
479   // pointer
480   struct linger l;
481   l.l_onoff = 1;
482   l.l_linger = 0;
483   if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER, (const void *)&l,
484                          (socklen_t)sizeof(struct linger)) < 0) {
485     LOG(("Couldn't set SO_LINGER on SCTP socket"));
486     // unsafe to allow it to continue if this fails
487     goto error_cleanup;
488   }
489 
490   // XXX Consider disabling this when we add proper SDP negotiation.
491   // We may want to leave enabled for supporting 'cloning' of SDP offers, which
492   // implies re-use of the same pseudo-port number, or forcing a renegotiation.
493   {
494     const int option_value = 1;
495     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
496                            (const void *)&option_value,
497                            (socklen_t)sizeof(option_value)) < 0) {
498       LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
499     }
500     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
501                            (const void *)&option_value,
502                            (socklen_t)sizeof(option_value)) < 0) {
503       LOG(("Couldn't set SCTP_NODELAY on SCTP socket"));
504     }
505   }
506 
507   // Set explicit EOR
508   {
509     const int option_value = 1;
510     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EXPLICIT_EOR,
511                            (const void *)&option_value,
512                            (socklen_t)sizeof(option_value)) < 0) {
513       LOG(("*** failed enable explicit EOR mode %d", errno));
514       goto error_cleanup;
515     }
516   }
517 
518     // Enable ndata
519     // TODO: Bug 1381145, enable this once ndata has been deployed
520 #if 0
521   av.assoc_id = SCTP_FUTURE_ASSOC;
522   av.assoc_value = 1;
523   if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INTERLEAVING_SUPPORTED, &av,
524                          (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
525     LOG(("*** failed enable ndata errno %d", errno));
526     goto error_cleanup;
527   }
528 #endif
529 
530   av.assoc_id = SCTP_ALL_ASSOC;
531   av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
532   if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET,
533                          &av, (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
534     LOG(("*** failed enable stream reset errno %d", errno));
535     goto error_cleanup;
536   }
537 
538   /* Enable the events of interest. */
539   memset(&event, 0, sizeof(event));
540   event.se_assoc_id = SCTP_ALL_ASSOC;
541   event.se_on = 1;
542   for (unsigned short event_type : event_types) {
543     event.se_type = event_type;
544     if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event,
545                            sizeof(event)) < 0) {
546       LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno));
547       goto error_cleanup;
548     }
549   }
550 
551   // Update number of streams
552   mStreams.AppendElements(aNumStreams);
553   for (uint32_t i = 0; i < aNumStreams; ++i) {
554     mStreams[i] = nullptr;
555   }
556   memset(&initmsg, 0, sizeof(initmsg));
557   len = sizeof(initmsg);
558   if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
559                          &len) < 0) {
560     LOG(("*** failed getsockopt SCTP_INITMSG"));
561     goto error_cleanup;
562   }
563   LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
564        initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
565   initmsg.sinit_num_ostreams = aNumStreams;
566   initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
567   if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
568                          (socklen_t)sizeof(initmsg)) < 0) {
569     LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
570     goto error_cleanup;
571   }
572 
573   mSocket = nullptr;
574   usrsctp_register_address(static_cast<void *>(this));
575   LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
576   return true;
577 
578 error_cleanup:
579   usrsctp_close(mMasterSocket);
580   mMasterSocket = nullptr;
581   return false;
582 }
583 
584 void DataChannelConnection::SetMaxMessageSize(bool aMaxMessageSizeSet,
585                                               uint64_t aMaxMessageSize) {
586   MutexAutoLock lock(mLock);  // TODO: Needed?
587 
588   if (mMaxMessageSizeSet && !aMaxMessageSizeSet) {
589     // Don't overwrite already set MMS with default values
590     return;
591   }
592 
593   mMaxMessageSizeSet = aMaxMessageSizeSet;
594   mMaxMessageSize = aMaxMessageSize;
595 
596   bool ppidFragmentationEnforced = false;
597   nsresult rv;
598   nsCOMPtr<nsIPrefService> prefs =
599       do_GetService("@mozilla.org/preferences-service;1", &rv);
600   if (!NS_WARN_IF(NS_FAILED(rv))) {
601     nsCOMPtr<nsIPrefBranch> branch = do_QueryInterface(prefs);
602 
603     if (branch) {
604       if (!NS_FAILED(branch->GetBoolPref(
605               "media.peerconnection.sctp.force_ppid_fragmentation",
606               &mPpidFragmentation))) {
607         // Ensure that forced on/off PPID fragmentation does not get overridden
608         // when Firefox has been detected.
609         mMaxMessageSizeSet = true;
610         ppidFragmentationEnforced = true;
611       }
612 
613       int32_t temp;
614       if (!NS_FAILED(branch->GetIntPref(
615               "media.peerconnection.sctp.force_maximum_message_size", &temp))) {
616         if (temp >= 0) {
617           mMaxMessageSize = (uint64_t)temp;
618         }
619       }
620     }
621   }
622 
623   // Fix remote MMS. This code exists, so future implementations of
624   // RTCSctpTransport.maxMessageSize can simply provide that value from
625   // GetMaxMessageSize.
626 
627   // TODO: Bug 1382779, once resolved, can be increased to
628   // min(Uint8ArrayMaxSize, UINT32_MAX)
629   // TODO: Bug 1381146, once resolved, can be increased to whatever we support
630   // then (hopefully
631   //       SIZE_MAX)
632   if (mMaxMessageSize == 0 ||
633       mMaxMessageSize > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE) {
634     mMaxMessageSize = WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE;
635   }
636 
637   LOG(("Use PPID-based fragmentation/reassembly: %s (enforced=%s)",
638        mPpidFragmentation ? "yes" : "no",
639        ppidFragmentationEnforced ? "yes" : "no"));
640   LOG(("Maximum message size (outgoing data): %" PRIu64
641        " (set=%s, enforced=%s)",
642        mMaxMessageSize, mMaxMessageSizeSet ? "yes" : "no",
643        aMaxMessageSize != mMaxMessageSize ? "yes" : "no"));
644 }
645 
646 uint64_t DataChannelConnection::GetMaxMessageSize() { return mMaxMessageSize; }
647 
648 #ifdef MOZ_PEERCONNECTION
649 void DataChannelConnection::SetEvenOdd() {
650   ASSERT_WEBRTC(IsSTSThread());
651 
652   TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
653       mTransportFlow->GetLayer(TransportLayerDtls::ID()));
654   MOZ_ASSERT(dtls);  // DTLS is mandatory
655   mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT);
656 }
657 
658 bool DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow,
659                                                     uint16_t localport,
660                                                     uint16_t remoteport) {
661   LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
662 
663   NS_PRECONDITION(mMasterSocket,
664                   "SCTP wasn't initialized before ConnectViaTransportFlow!");
665   if (NS_WARN_IF(!aFlow)) {
666     return false;
667   }
668 
669   mTransportFlow = aFlow;
670   mLocalPort = localport;
671   mRemotePort = remoteport;
672   mState = CONNECTING;
673 
674   RUN_ON_THREAD(mSTS,
675                 WrapRunnable(RefPtr<DataChannelConnection>(this),
676                              &DataChannelConnection::SetSignals),
677                 NS_DISPATCH_NORMAL);
678   return true;
679 }
680 
681 void DataChannelConnection::SetSignals() {
682   ASSERT_WEBRTC(IsSTSThread());
683   ASSERT_WEBRTC(mTransportFlow);
684   LOG(("Setting transport signals, state: %d", mTransportFlow->state()));
685   mTransportFlow->SignalPacketReceived.connect(
686       this, &DataChannelConnection::SctpDtlsInput);
687   // SignalStateChange() doesn't call you with the initial state
688   mTransportFlow->SignalStateChange.connect(
689       this, &DataChannelConnection::CompleteConnect);
690   CompleteConnect(mTransportFlow, mTransportFlow->state());
691 }
692 
693 void DataChannelConnection::CompleteConnect(TransportFlow *flow,
694                                             TransportLayer::State state) {
695   LOG(("Data transport state: %d", state));
696   MutexAutoLock lock(mLock);
697   ASSERT_WEBRTC(IsSTSThread());
698   // We should abort connection on TS_ERROR.
699   // Note however that the association will also fail (perhaps with a delay) and
700   // notify us in that way
701   if (state != TransportLayer::TS_OPEN || !mMasterSocket) return;
702 
703   struct sockaddr_conn addr;
704   memset(&addr, 0, sizeof(addr));
705   addr.sconn_family = AF_CONN;
706 #if defined(__Userspace_os_Darwin)
707   addr.sconn_len = sizeof(addr);
708 #endif
709   addr.sconn_port = htons(mLocalPort);
710   addr.sconn_addr = static_cast<void *>(this);
711 
712   LOG(("Calling usrsctp_bind"));
713   int r = usrsctp_bind(
714       mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr));
715   if (r < 0) {
716     LOG(("usrsctp_bind failed: %d", r));
717   } else {
718     // This is the remote addr
719     addr.sconn_port = htons(mRemotePort);
720     LOG(("Calling usrsctp_connect"));
721     r = usrsctp_connect(mMasterSocket,
722                         reinterpret_cast<struct sockaddr *>(&addr),
723                         sizeof(addr));
724     if (r >= 0 || errno == EINPROGRESS) {
725       struct sctp_paddrparams paddrparams;
726       socklen_t opt_len;
727 
728       memset(&paddrparams, 0, sizeof(struct sctp_paddrparams));
729       memcpy(&paddrparams.spp_address, &addr, sizeof(struct sockaddr_conn));
730       opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
731       r = usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
732                              &paddrparams, &opt_len);
733       if (r < 0) {
734         LOG(("usrsctp_getsockopt failed: %d", r));
735       } else {
736         // draft-ietf-rtcweb-data-channel-13 section 5: max initial MTU IPV4
737         // 1200, IPV6 1280
738         paddrparams.spp_pathmtu = 1200;  // safe for either
739         paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE;
740         paddrparams.spp_flags |= SPP_PMTUD_DISABLE;
741         opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
742         r = usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP,
743                                SCTP_PEER_ADDR_PARAMS, &paddrparams, opt_len);
744         if (r < 0) {
745           LOG(("usrsctp_getsockopt failed: %d", r));
746         } else {
747           LOG(("usrsctp: PMTUD disabled, MTU set to %u",
748                paddrparams.spp_pathmtu));
749         }
750       }
751     }
752     if (r < 0) {
753       if (errno == EINPROGRESS) {
754         // non-blocking
755         return;
756       }
757       LOG(("usrsctp_connect failed: %d", errno));
758       mState = CLOSED;
759     } else {
760       // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get
761       // that This also avoids issues with calling TransportFlow stuff on
762       // Mainthread
763       return;
764     }
765   }
766   // Note: currently this doesn't actually notify the application
767   Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
768       DataChannelOnMessageAvailable::ON_CONNECTION, this)));
769 }
770 
771 // Process any pending Opens
772 void DataChannelConnection::ProcessQueuedOpens() {
773   // The nsDeque holds channels with an AddRef applied.  Another reference
774   // (may) be held by the DOMDataChannel, unless it's been GC'd.  No other
775   // references should exist.
776 
777   // Can't copy nsDeque's.  Move into temp array since any that fail will
778   // go back to mPending
779   nsDeque temp;
780   DataChannel *temp_channel;  // really already_AddRefed<>
781   while (nullptr !=
782          (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) {
783     temp.Push(static_cast<void *>(temp_channel));
784   }
785 
786   RefPtr<DataChannel> channel;
787   // All these entries have an AddRef(); make that explicit now via the
788   // dont_AddRef()
789   while (nullptr !=
790          (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
791     if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
792       LOG(("Processing queued open for %p (%u)", channel.get(),
793            channel->mStream));
794       channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
795       // OpenFinish returns a reference itself, so we need to take it can
796       // Release it
797       channel = OpenFinish(channel.forget());  // may reset the flag and re-push
798     } else {
799       NS_ASSERTION(
800           false,
801           "How did a DataChannel get queued without the FINISH_OPEN flag?");
802     }
803   }
804 }
805 void DataChannelConnection::SctpDtlsInput(TransportFlow *flow,
806                                           const unsigned char *data,
807                                           size_t len) {
808   if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
809     char *buf;
810 
811     if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) !=
812         nullptr) {
813       SCTP_LOG(("%s", buf));
814       usrsctp_freedumpbuffer(buf);
815     }
816   }
817   // Pass the data to SCTP
818   MutexAutoLock lock(mLock);
819   usrsctp_conninput(static_cast<void *>(this), data, len, 0);
820 }
821 
822 int DataChannelConnection::SendPacket(unsigned char data[], size_t len,
823                                       bool release) {
824   // LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
825   int res = 0;
826   if (mTransportFlow) {
827     res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0;
828   }
829   if (release) delete[] data;
830   return res;
831 }
832 
/* static */
// Output routine for packets produced by the SCTP stack.  |addr| is cast
// back to the DataChannelConnection that owns the association; |buffer| and
// |length| describe the packet.  |tos| and |set_df| are not used here.
// The packet is copied and SendPacket() is dispatched to mSTS, so in the
// path that is actually taken this always returns 0.
int DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer,
                                          size_t length, uint8_t tos,
                                          uint8_t set_df) {
  DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr);
  int res;
  MOZ_DIAGNOSTIC_ASSERT(!peer->mShutdown);

  // When SCTP debug logging is on, dump the outbound packet first
  if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
    char *buf;

    if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) !=
        nullptr) {
      SCTP_LOG(("%s", buf));
      usrsctp_freedumpbuffer(buf);
    }
  }
  // We're async proxying even if on the STSThread because this is called
  // with internal SCTP locks held in some cases (such as in usrsctp_connect()).
  // SCTP has an option for Apple, on IP connections only, to release at least
  // one of the locks before calling a packet output routine; with changes to
  // the underlying SCTP stack this might remove the need to use an async proxy.
  // The direct-send branch below is therefore disabled with a constant false.
  if ((false /*peer->IsSTSThread()*/)) {
    res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false);
  } else {
    // Copy the packet: the runnable outlives this call, and SendPacket() is
    // told (release == true) to delete[] the copy once it has been sent.
    auto *data = new unsigned char[length];
    memcpy(data, buffer, length);
    // Commented out since we have to Dispatch SendPacket to avoid deadlock
    // res = -1;

    // XXX It might be worthwhile to add an assertion against the thread
    // somehow getting into the DataChannel/SCTP code again, as
    // DISPATCH_SYNC is not fully blocking.  This may be tricky, as it
    // needs to be a per-thread check, not a global.
    // The RefPtr passed to WrapRunnable holds a reference to the connection
    // for the runnable.
    peer->mSTS->Dispatch(
        WrapRunnable(RefPtr<DataChannelConnection>(peer),
                     &DataChannelConnection::SendPacket, data, length, true),
        NS_DISPATCH_NORMAL);
    res = 0;  // cheat!  Packets can always be dropped later anyways
  }
  return res;
}
875 #endif
876 
877 #ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
878   // listen for incoming associations
879   // Blocks! - Don't call this from main thread!
880 
881 #error This code will not work as-is since SetEvenOdd() runs on Mainthread
882 
883 bool DataChannelConnection::Listen(unsigned short port) {
884   struct sockaddr_in addr;
885   socklen_t addr_len;
886 
887   NS_WARNING_ASSERTION(!NS_IsMainThread(),
888                        "Blocks, do not call from main thread!!!");
889 
890   /* Acting as the 'server' */
891   memset((void *)&addr, 0, sizeof(addr));
892 #ifdef HAVE_SIN_LEN
893   addr.sin_len = sizeof(struct sockaddr_in);
894 #endif
895   addr.sin_family = AF_INET;
896   addr.sin_port = htons(port);
897   addr.sin_addr.s_addr = htonl(INADDR_ANY);
898   LOG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
899   mState = CONNECTING;
900   if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
901                    sizeof(struct sockaddr_in)) < 0) {
902     LOG(("***Failed userspace_bind"));
903     return false;
904   }
905   if (usrsctp_listen(mMasterSocket, 1) < 0) {
906     LOG(("***Failed userspace_listen"));
907     return false;
908   }
909 
910   LOG(("Accepting connection"));
911   addr_len = 0;
912   if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) ==
913       nullptr) {
914     LOG(("***Failed accept"));
915     return false;
916   }
917   mState = OPEN;
918 
919   struct linger l;
920   l.l_onoff = 1;
921   l.l_linger = 0;
922   if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER, (const void *)&l,
923                          (socklen_t)sizeof(struct linger)) < 0) {
924     LOG(("Couldn't set SO_LINGER on SCTP socket"));
925   }
926 
927   SetEvenOdd();
928 
929   // Notify Connection open
930   // XXX We need to make sure connection sticks around until the message is
931   // delivered
932   LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
933   Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
934       DataChannelOnMessageAvailable::ON_CONNECTION, this,
935       (DataChannel *)nullptr)));
936   return true;
937 }
938 
939 // Blocks! - Don't call this from main thread!
940 bool DataChannelConnection::Connect(const char *addr, unsigned short port) {
941   struct sockaddr_in addr4;
942   struct sockaddr_in6 addr6;
943 
944   NS_WARNING_ASSERTION(!NS_IsMainThread(),
945                        "Blocks, do not call from main thread!!!");
946 
947   /* Acting as the connector */
948   LOG(("Connecting to %s, port %u", addr, port));
949   memset((void *)&addr4, 0, sizeof(struct sockaddr_in));
950   memset((void *)&addr6, 0, sizeof(struct sockaddr_in6));
951 #ifdef HAVE_SIN_LEN
952   addr4.sin_len = sizeof(struct sockaddr_in);
953 #endif
954 #ifdef HAVE_SIN6_LEN
955   addr6.sin6_len = sizeof(struct sockaddr_in6);
956 #endif
957   addr4.sin_family = AF_INET;
958   addr6.sin6_family = AF_INET6;
959   addr4.sin_port = htons(port);
960   addr6.sin6_port = htons(port);
961   mState = CONNECTING;
962 
963 #if !defined(__Userspace_os_Windows)
964   if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
965     if (usrsctp_connect(mMasterSocket,
966                         reinterpret_cast<struct sockaddr *>(&addr6),
967                         sizeof(struct sockaddr_in6)) < 0) {
968       LOG(("*** Failed userspace_connect"));
969       return false;
970     }
971   } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
972     if (usrsctp_connect(mMasterSocket,
973                         reinterpret_cast<struct sockaddr *>(&addr4),
974                         sizeof(struct sockaddr_in)) < 0) {
975       LOG(("*** Failed userspace_connect"));
976       return false;
977     }
978   } else {
979     LOG(("*** Illegal destination address."));
980   }
981 #else
982   {
983     struct sockaddr_storage ss;
984     int sslen = sizeof(ss);
985 
986     if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr,
987                              (struct sockaddr *)&ss, &sslen)) {
988       addr6.sin6_addr =
989           (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr;
990       if (usrsctp_connect(mMasterSocket,
991                           reinterpret_cast<struct sockaddr *>(&addr6),
992                           sizeof(struct sockaddr_in6)) < 0) {
993         LOG(("*** Failed userspace_connect"));
994         return false;
995       }
996     } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr,
997                                     (struct sockaddr *)&ss, &sslen)) {
998       addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr;
999       if (usrsctp_connect(mMasterSocket,
1000                           reinterpret_cast<struct sockaddr *>(&addr4),
1001                           sizeof(struct sockaddr_in)) < 0) {
1002         LOG(("*** Failed userspace_connect"));
1003         return false;
1004       }
1005     } else {
1006       LOG(("*** Illegal destination address."));
1007     }
1008   }
1009 #endif
1010 
1011   mSocket = mMasterSocket;
1012 
1013   LOG(("connect() succeeded!  Entering connected mode"));
1014   mState = OPEN;
1015 
1016   SetEvenOdd();
1017 
1018   // Notify Connection open
1019   // XXX We need to make sure connection sticks around until the message is
1020   // delivered
1021   LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
1022   Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1023       DataChannelOnMessageAvailable::ON_CONNECTION, this,
1024       (DataChannel *)nullptr)));
1025   return true;
1026 }
1027 #endif
1028 
1029 DataChannel *DataChannelConnection::FindChannelByStream(uint16_t stream) {
1030   return mStreams.SafeElementAt(stream);
1031 }
1032 
1033 uint16_t DataChannelConnection::FindFreeStream() {
1034   uint32_t i, j, limit;
1035 
1036   limit = mStreams.Length();
1037   if (limit > MAX_NUM_STREAMS) limit = MAX_NUM_STREAMS;
1038 
1039   for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
1040     if (!mStreams[i]) {
1041       // Verify it's not still in the process of closing
1042       for (j = 0; j < mStreamsResetting.Length(); ++j) {
1043         if (mStreamsResetting[j] == i) {
1044           break;
1045         }
1046       }
1047       if (j == mStreamsResetting.Length()) break;
1048     }
1049   }
1050   if (i >= limit) {
1051     return INVALID_STREAM;
1052   }
1053   return i;
1054 }
1055 
1056 uint32_t DataChannelConnection::UpdateCurrentStreamIndex() {
1057   if (mCurrentStream == mStreams.Length() - 1) {
1058     mCurrentStream = 0;
1059   } else {
1060     ++mCurrentStream;
1061   }
1062 
1063   return mCurrentStream;
1064 }
1065 
1066 uint32_t DataChannelConnection::GetCurrentStreamIndex() {
1067   // Fix current stream index (in case #streams decreased)
1068   if (mCurrentStream >= mStreams.Length()) {
1069     mCurrentStream = 0;
1070   }
1071 
1072   return mCurrentStream;
1073 }
1074 
1075 bool DataChannelConnection::RequestMoreStreams(int32_t aNeeded) {
1076   struct sctp_status status;
1077   struct sctp_add_streams sas;
1078   uint32_t outStreamsNeeded;
1079   socklen_t len;
1080 
1081   if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
1082     aNeeded = MAX_NUM_STREAMS - mStreams.Length();
1083   }
1084   if (aNeeded <= 0) {
1085     return false;
1086   }
1087 
1088   len = (socklen_t)sizeof(struct sctp_status);
1089   if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status,
1090                          &len) < 0) {
1091     LOG(("***failed: getsockopt SCTP_STATUS"));
1092     return false;
1093   }
1094   outStreamsNeeded = aNeeded;  // number to add
1095 
1096   // Note: if multiple channel opens happen when we don't have enough space,
1097   // we'll call RequestMoreStreams() multiple times
1098   memset(&sas, 0, sizeof(sas));
1099   sas.sas_instrms = 0;
1100   sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
1101   // Doesn't block, we get an event when it succeeds or fails
1102   if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
1103                          (socklen_t)sizeof(struct sctp_add_streams)) < 0) {
1104     if (errno == EALREADY) {
1105       LOG(("Already have %u output streams", outStreamsNeeded));
1106       return true;
1107     }
1108 
1109     LOG(("***failed: setsockopt ADD errno=%d", errno));
1110     return false;
1111   }
1112   LOG(("Requested %u more streams", outStreamsNeeded));
1113   // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
1114   // values are larger than mStreams.Length()
1115   return true;
1116 }
1117 
1118 // Returns a POSIX error code.
1119 int DataChannelConnection::SendControlMessage(const uint8_t *data, uint32_t len,
1120                                               uint16_t stream) {
1121   struct sctp_sendv_spa info = {0};
1122 
1123   // General flags
1124   info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
1125 
1126   // Set stream identifier, protocol identifier and flags
1127   info.sendv_sndinfo.snd_sid = stream;
1128   info.sendv_sndinfo.snd_flags = SCTP_EOR;
1129   info.sendv_sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
1130 
1131   // Create message instance and send
1132   // Note: Main-thread IO, but doesn't block
1133 #if (UINT32_MAX > SIZE_MAX)
1134   if (len > SIZE_MAX) {
1135     return EMSGSIZE;
1136   }
1137 #endif
1138   OutgoingMsg msg(info, data, (size_t)len);
1139   bool buffered;
1140   int error = SendMsgInternalOrBuffer(mBufferedControl, msg, buffered);
1141 
1142   // Set pending type (if buffered)
1143   if (!error && buffered && !mPendingType) {
1144     mPendingType = PENDING_DCEP;
1145   }
1146   return error;
1147 }
1148 
1149 // Returns a POSIX error code.
1150 int DataChannelConnection::SendOpenAckMessage(uint16_t stream) {
1151   struct rtcweb_datachannel_ack ack;
1152 
1153   memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
1154   ack.msg_type = DATA_CHANNEL_ACK;
1155 
1156   return SendControlMessage((const uint8_t *)&ack, sizeof(ack), stream);
1157 }
1158 
1159 // Returns a POSIX error code.
1160 int DataChannelConnection::SendOpenRequestMessage(
1161     const nsACString &label, const nsACString &protocol, uint16_t stream,
1162     bool unordered, uint16_t prPolicy, uint32_t prValue) {
1163   const int label_len = label.Length();     // not including nul
1164   const int proto_len = protocol.Length();  // not including nul
1165   // careful - request struct include one char for the label
1166   const int req_size = sizeof(struct rtcweb_datachannel_open_request) - 1 +
1167                        label_len + proto_len;
1168   struct rtcweb_datachannel_open_request *req =
1169       (struct rtcweb_datachannel_open_request *)moz_xmalloc(req_size);
1170 
1171   memset(req, 0, req_size);
1172   req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
1173   switch (prPolicy) {
1174     case SCTP_PR_SCTP_NONE:
1175       req->channel_type = DATA_CHANNEL_RELIABLE;
1176       break;
1177     case SCTP_PR_SCTP_TTL:
1178       req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
1179       break;
1180     case SCTP_PR_SCTP_RTX:
1181       req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
1182       break;
1183     default:
1184       free(req);
1185       return EINVAL;
1186   }
1187   if (unordered) {
1188     // Per the current types, all differ by 0x80 between ordered and unordered
1189     req->channel_type |=
1190         0x80;  // NOTE: be careful if new types are added in the future
1191   }
1192 
1193   req->reliability_param = htonl(prValue);
1194   req->priority = htons(0); /* XXX: add support */
1195   req->label_length = htons(label_len);
1196   req->protocol_length = htons(proto_len);
1197   memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
1198   memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
1199 
1200   // TODO: req_size is an int... that looks hairy
1201   int error = SendControlMessage((const uint8_t *)req, req_size, stream);
1202 
1203   free(req);
1204   return error;
1205 }
1206 
1207 // XXX This should use a separate thread (outbound queue) which should
1208 // select() to know when to *try* to send data to the socket again.
1209 // Alternatively, it can use a timeout, but that's guaranteed to be wrong
1210 // (just not sure in what direction).  We could re-implement NSPR's
1211 // PR_POLL_WRITE/etc handling... with a lot of work.
1212 
1213 // Better yet, use the SCTP stack's notifications on buffer state to avoid
1214 // filling the SCTP's buffers.
1215 
// Tries to flush messages that were buffered earlier: first the buffered
// control (DCEP) messages, then the buffered data of each channel in
// mStreams, visited round-robin starting at the current stream index.
// Must be called with mLock held (asserted below).
// Returns true if sending is still blocked, false if there was nothing
// pending or everything that was attempted went out.
bool DataChannelConnection::SendDeferredMessages() {
  RefPtr<DataChannel> channel;  // we may null out the refs to this

  // This may block while something is modifying channels, but should not block
  // for IO
  mLock.AssertCurrentThreadOwns();

  LOG(("SendDeferredMessages called, pending type: %d", mPendingType));
  if (!mPendingType) {
    return false;
  }

  // Send pending control messages
  // Note: If ndata is not active, check if DCEP messages are currently
  //       outstanding. These need to be sent first before other streams can
  //       be used for sending.
  if (!mBufferedControl.IsEmpty() &&
      (mSendInterleaved || mPendingType == PENDING_DCEP)) {
    if (SendBufferedMessages(mBufferedControl)) {
      return true;
    }

    // Note: There may or may not be pending data messages
    mPendingType = PENDING_DATA;
  }

  // Walk the streams once, starting from (and ending at) the current index
  bool blocked = false;
  uint32_t i = GetCurrentStreamIndex();
  uint32_t end = i;
  do {
    channel = mStreams[i];
    // Nothing to do for empty slots or channels without buffered data
    if (!channel || channel->mBufferedData.IsEmpty()) {
      i = UpdateCurrentStreamIndex();
      continue;
    }

    // Clear if closing/closed
    if (channel->mState == CLOSED || channel->mState == CLOSING) {
      channel->mBufferedData.Clear();
      i = UpdateCurrentStreamIndex();
      continue;
    }

    // Remember whether we started at/above the threshold so the downward
    // crossing can be detected after sending
    size_t bufferedAmount = channel->GetBufferedAmountLocked();
    size_t threshold = channel->mBufferedThreshold;
    bool wasOverThreshold = bufferedAmount >= threshold;

    // Send buffered data messages
    // Warning: This will fail in case ndata is inactive and a previously
    //          deallocated data channel has not been closed properly. If you
    //          ever see that no messages can be sent on any channel, this is
    //          likely the cause (an explicit EOR message partially sent whose
    //          remaining chunks are still being waited for).
    blocked = SendBufferedMessages(channel->mBufferedData);
    bufferedAmount = channel->GetBufferedAmountLocked();

    // can never fire with default threshold of 0
    if (wasOverThreshold && bufferedAmount < threshold) {
      LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
           channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
      Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
          DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD, this, channel)));
    }

    if (bufferedAmount == 0) {
      // buffered-to-not-buffered transition; tell the DOM code in case this
      // makes it available for GC
      LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
           channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
      Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
          DataChannelOnMessageAvailable::NO_LONGER_BUFFERED, this, channel)));
    }

    // Update current stream index
    // Note: If ndata is not active, the outstanding data messages on this
    //       stream need to be sent first before other streams can be used for
    //       sending.
    if (mSendInterleaved || !blocked) {
      i = UpdateCurrentStreamIndex();
    }
  } while (!blocked && i != end);

  // Everything attempted was sent: only buffered control messages (if any)
  // remain pending
  if (!blocked) {
    mPendingType = mBufferedControl.IsEmpty() ? PENDING_NONE : PENDING_DCEP;
  }
  return blocked;
}
1304 
1305 // Called with mLock locked!
1306 // buffer MUST have at least one item!
1307 // returns if we're still blocked (true)
1308 bool DataChannelConnection::SendBufferedMessages(
1309     nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer) {
1310   do {
1311     // Re-send message
1312     int error = SendMsgInternal(*buffer[0]);
1313     switch (error) {
1314       case 0:
1315         buffer.RemoveElementAt(0);
1316         break;
1317       case EAGAIN:
1318 #if (EAGAIN != EWOULDBLOCK)
1319       case EWOULDBLOCK:
1320 #endif
1321         return true;
1322       default:
1323         buffer.RemoveElementAt(0);
1324         LOG(("error on sending: %d", error));
1325         break;
1326     }
1327   } while (!buffer.IsEmpty());
1328 
1329   return false;
1330 }
1331 
1332 // Caller must ensure that length <= SIZE_MAX
1333 void DataChannelConnection::HandleOpenRequestMessage(
1334     const struct rtcweb_datachannel_open_request *req, uint32_t length,
1335     uint16_t stream) {
1336   RefPtr<DataChannel> channel;
1337   uint32_t prValue;
1338   uint16_t prPolicy;
1339   uint32_t flags;
1340 
1341   mLock.AssertCurrentThreadOwns();
1342 
1343   const size_t requiredLength = (sizeof(*req) - 1) + ntohs(req->label_length) +
1344                                 ntohs(req->protocol_length);
1345   if (((size_t)length) != requiredLength) {
1346     LOG(("%s: Inconsistent length: %u, should be %zu", __FUNCTION__, length,
1347          requiredLength));
1348     if (((size_t)length) < requiredLength) return;
1349   }
1350 
1351   LOG(("%s: length %u, sizeof(*req) = %zu", __FUNCTION__, length,
1352        sizeof(*req)));
1353 
1354   switch (req->channel_type) {
1355     case DATA_CHANNEL_RELIABLE:
1356     case DATA_CHANNEL_RELIABLE_UNORDERED:
1357       prPolicy = SCTP_PR_SCTP_NONE;
1358       break;
1359     case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
1360     case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
1361       prPolicy = SCTP_PR_SCTP_RTX;
1362       break;
1363     case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
1364     case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
1365       prPolicy = SCTP_PR_SCTP_TTL;
1366       break;
1367     default:
1368       LOG(("Unknown channel type %d", req->channel_type));
1369       /* XXX error handling */
1370       return;
1371   }
1372   prValue = ntohl(req->reliability_param);
1373   flags =
1374       (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
1375 
1376   if ((channel = FindChannelByStream(stream))) {
1377     if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
1378       LOG(
1379           ("ERROR: HandleOpenRequestMessage: channel for stream %u is in state "
1380            "%d instead of CLOSED.",
1381            stream, channel->mState));
1382       /* XXX: some error handling */
1383     } else {
1384       LOG(("Open for externally negotiated channel %u", stream));
1385       // XXX should also check protocol, maybe label
1386       if (prPolicy != channel->mPrPolicy || prValue != channel->mPrValue ||
1387           flags !=
1388               (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
1389         LOG(
1390             ("WARNING: external negotiation mismatch with OpenRequest:"
1391              "channel %u, policy %u/%u, value %u/%u, flags %x/%x",
1392              stream, prPolicy, channel->mPrPolicy, prValue, channel->mPrValue,
1393              flags, channel->mFlags));
1394       }
1395     }
1396     return;
1397   }
1398   if (stream >= mStreams.Length()) {
1399     LOG(("%s: stream %u out of bounds (%zu)", __FUNCTION__, stream,
1400          mStreams.Length()));
1401     return;
1402   }
1403 
1404   nsCString label(
1405       nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
1406   nsCString protocol(nsDependentCSubstring(
1407       &req->label[ntohs(req->label_length)], ntohs(req->protocol_length)));
1408 
1409   channel =
1410       new DataChannel(this, stream, DataChannel::CONNECTING, label, protocol,
1411                       prPolicy, prValue, flags, nullptr, nullptr);
1412   mStreams[stream] = channel;
1413 
1414   channel->mState = DataChannel::WAITING_TO_OPEN;
1415 
1416   LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
1417        channel->mLabel.get(), channel->mProtocol.get(), stream,
1418        channel->mState));
1419   Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1420       DataChannelOnMessageAvailable::ON_CHANNEL_CREATED, this, channel)));
1421 
1422   LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__,
1423        channel.get()));
1424 
1425   int error = SendOpenAckMessage(stream);
1426   if (error) {
1427     LOG(("SendOpenRequest failed, error = %d", error));
1428     // Close the channel, inform the user
1429     CloseInt(channel);
1430     // XXX send error via DataChannelOnMessageAvailable (bug 843625)
1431     return;
1432   }
1433 
1434   // Now process any queued data messages for the channel (which will
1435   // themselves likely get queued until we leave WAITING_TO_OPEN, plus any
1436   // more that come in before that happens)
1437   DeliverQueuedData(stream);
1438 }
1439 
1440 // NOTE: the updated spec from the IETF says we should set in-order until we
1441 // receive an ACK. That would make this code moot.  Keep it for now for
1442 // backwards compatibility.
1443 void DataChannelConnection::DeliverQueuedData(uint16_t stream) {
1444   mLock.AssertCurrentThreadOwns();
1445 
1446   uint32_t i = 0;
1447   while (i < mQueuedData.Length()) {
1448     // Careful! we may modify the array length from within the loop!
1449     if (mQueuedData[i]->mStream == stream) {
1450       LOG(("Delivering queued data for stream %u, length %u", stream,
1451            mQueuedData[i]->mLength));
1452       // Deliver the queued data
1453       HandleDataMessage(mQueuedData[i]->mData, mQueuedData[i]->mLength,
1454                         mQueuedData[i]->mPpid, mQueuedData[i]->mStream,
1455                         mQueuedData[i]->mFlags);
1456       mQueuedData.RemoveElementAt(i);
1457       continue;  // don't bump index since we removed the element
1458     }
1459     i++;
1460   }
1461 }
1462 
1463 // Caller must ensure that length <= SIZE_MAX
1464 void DataChannelConnection::HandleOpenAckMessage(
1465     const struct rtcweb_datachannel_ack *ack, uint32_t length,
1466     uint16_t stream) {
1467   DataChannel *channel;
1468 
1469   mLock.AssertCurrentThreadOwns();
1470 
1471   channel = FindChannelByStream(stream);
1472   if (NS_WARN_IF(!channel)) {
1473     return;
1474   }
1475 
1476   LOG(("OpenAck received for stream %u, waiting=%d", stream,
1477        (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
1478 
1479   channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
1480 }
1481 
// Logs an incoming message whose type is not recognized; the message is
// otherwise dropped (nothing is sent back and no state is changed).
// Caller must ensure that length <= SIZE_MAX
void DataChannelConnection::HandleUnknownMessage(uint32_t ppid, uint32_t length,
                                                 uint16_t stream) {
  /* XXX: Send an error message? */
  LOG(("unknown DataChannel message received: %u, len %u on stream %d", ppid,
       length, stream));
  // XXX Log to JS error console if possible
}
1490 
1491 uint8_t DataChannelConnection::BufferMessage(nsACString &recvBuffer,
1492                                              const void *data, uint32_t length,
1493                                              uint32_t ppid, int flags) {
1494   const char *buffer = (const char *)data;
1495   uint8_t bufferFlags = 0;
1496 
1497   if ((flags & MSG_EOR) && ppid != DATA_CHANNEL_PPID_BINARY_PARTIAL &&
1498       ppid != DATA_CHANNEL_PPID_DOMSTRING_PARTIAL) {
1499     bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE;
1500 
1501     // Return directly if nothing has been buffered
1502     if (recvBuffer.IsEmpty()) {
1503       return bufferFlags;
1504     }
1505   }
1506 
1507   // Ensure it doesn't blow up our buffer
1508   // TODO: Change 'WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL' to whatever the
1509   //       new buffer is capable of holding.
1510   if (((uint64_t)recvBuffer.Length()) + ((uint64_t)length) >
1511       WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
1512     bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE;
1513     return bufferFlags;
1514   }
1515 
1516   // Copy & add to receive buffer
1517   recvBuffer.Append(buffer, length);
1518   bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED;
1519   return bufferFlags;
1520 }
1521 
1522 void DataChannelConnection::HandleDataMessage(const void *data, size_t length,
1523                                               uint32_t ppid, uint16_t stream,
1524                                               int flags) {
1525   DataChannel *channel;
1526   const char *buffer = (const char *)data;
1527 
1528   mLock.AssertCurrentThreadOwns();
1529   channel = FindChannelByStream(stream);
1530 
1531   // Note: Until we support SIZE_MAX sized messages, we need this check
1532 #if (SIZE_MAX > UINT32_MAX)
1533   if (length > UINT32_MAX) {
1534     LOG(("DataChannel: Cannot handle message of size %zu (max=%" PRIu32 ")",
1535          length, UINT32_MAX));
1536     CloseInt(channel);
1537     return;
1538   }
1539 #endif
1540   uint32_t data_length = (uint32_t)length;
1541 
1542   // XXX A closed channel may trip this... check
1543   // NOTE: the updated spec from the IETF says we should set in-order until we
1544   // receive an ACK. That would make this code moot.  Keep it for now for
1545   // backwards compatibility.
1546   if (!channel) {
1547     // In the updated 0-RTT open case, the sender can send data immediately
1548     // after Open, and doesn't set the in-order bit (since we don't have a
1549     // response or ack).  Also, with external negotiation, data can come in
1550     // before we're told about the external negotiation.  We need to buffer
1551     // data until either a) Open comes in, if the ordering get messed up,
1552     // or b) the app tells us this channel was externally negotiated.  When
1553     // these occur, we deliver the data.
1554 
1555     // Since this is rare and non-performance, keep a single list of queued
1556     // data messages to deliver once the channel opens.
1557     LOG(("Queuing data for stream %u, length %u", stream, data_length));
1558     // Copies data
1559     mQueuedData.AppendElement(
1560         new QueuedDataMessage(stream, ppid, flags, data, data_length));
1561     return;
1562   }
1563 
1564   // Ignore incoming data in case the channel is closed
1565   if (channel->mState == CLOSED) {
1566     return;
1567   }
1568 
1569   bool is_binary = true;
1570   uint8_t bufferFlags;
1571   int32_t type;
1572   const char *info = "";
1573 
1574   if (ppid == DATA_CHANNEL_PPID_DOMSTRING_PARTIAL ||
1575       ppid == DATA_CHANNEL_PPID_DOMSTRING) {
1576     is_binary = false;
1577   }
1578   if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
1579     NS_WARNING("DataChannel message aborted by fragment type change!");
1580     // TODO: Maybe closing would be better as this is a hard to detect protocol
1581     // violation?
1582     channel->mRecvBuffer.Truncate(0);
1583   }
1584   channel->mIsRecvBinary = is_binary;
1585 
1586   // Remaining chunks of previously truncated message (due to the buffer being
1587   // full)?
1588   if (channel->mFlags & DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE) {
1589     LOG(
1590         ("DataChannel: Ignoring partial message of length %u, buffer full and "
1591          "closing",
1592          data_length));
1593     // Only unblock if unordered
1594     if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
1595         (flags & MSG_EOR)) {
1596       channel->mFlags &= ~DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
1597     }
1598   }
1599 
1600   // Buffer message until complete
1601   bufferFlags =
1602       BufferMessage(channel->mRecvBuffer, buffer, data_length, ppid, flags);
1603   if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
1604     LOG(
1605         ("DataChannel: Buffered message would become too large to handle, "
1606          "closing channel"));
1607     channel->mRecvBuffer.Truncate(0);
1608     channel->mFlags |= DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
1609     CloseInt(channel);
1610     return;
1611   }
1612   if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
1613     LOG(
1614         ("DataChannel: Partial %s message of length %u (total %u) on channel "
1615          "id %u",
1616          is_binary ? "binary" : "string", data_length,
1617          channel->mRecvBuffer.Length(), channel->mStream));
1618     return;  // Not ready to notify application
1619   }
1620   if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
1621     data_length = channel->mRecvBuffer.Length();
1622   }
1623 
1624   // Complain about large messages (only complain - we can handle it)
1625   if (data_length > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
1626     LOG(
1627         ("DataChannel: Received message of length %u is > announced maximum "
1628          "message size (%u)",
1629          data_length, WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL));
1630   }
1631 
1632   switch (ppid) {
1633     case DATA_CHANNEL_PPID_DOMSTRING:
1634       LOG(("DataChannel: Received string message of length %u on channel %u",
1635            data_length, channel->mStream));
1636       type = DataChannelOnMessageAvailable::ON_DATA_STRING;
1637       if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
1638         info = " (string fragmented)";
1639       }
1640       // else send using recvData normally
1641 
1642       // WebSockets checks IsUTF8() here; we can try to deliver it
1643       break;
1644 
1645     case DATA_CHANNEL_PPID_BINARY:
1646       LOG(("DataChannel: Received binary message of length %u on channel id %u",
1647            data_length, channel->mStream));
1648       type = DataChannelOnMessageAvailable::ON_DATA_BINARY;
1649       if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
1650         info = " (binary fragmented)";
1651       }
1652 
1653       // else send using recvData normally
1654       break;
1655 
1656     default:
1657       NS_ERROR("Unknown data PPID");
1658       return;
1659   }
1660 
1661   // Notify onmessage
1662   LOG(("%s: sending ON_DATA_%s%s for %p", __FUNCTION__,
1663        (type == DataChannelOnMessageAvailable::ON_DATA_STRING) ? "STRING"
1664                                                                : "BINARY",
1665        info, channel));
1666   if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
1667     channel->SendOrQueue(new DataChannelOnMessageAvailable(
1668         type, this, channel, channel->mRecvBuffer));
1669     channel->mRecvBuffer.Truncate(0);
1670   } else {
1671     nsAutoCString recvData(buffer, data_length);  // copies (<64) or allocates
1672     channel->SendOrQueue(
1673         new DataChannelOnMessageAvailable(type, this, channel, recvData));
1674   }
1675 }
1676 
1677 void DataChannelConnection::HandleDCEPMessage(const void *buffer, size_t length,
1678                                               uint32_t ppid, uint16_t stream,
1679                                               int flags) {
1680   const struct rtcweb_datachannel_open_request *req;
1681   const struct rtcweb_datachannel_ack *ack;
1682 
1683   // Note: Until we support SIZE_MAX sized messages, we need this check
1684 #if (SIZE_MAX > UINT32_MAX)
1685   if (length > UINT32_MAX) {
1686     LOG(("DataChannel: Cannot handle message of size %zu (max=%u)", length,
1687          UINT32_MAX));
1688     Stop();
1689     return;
1690   }
1691 #endif
1692   uint32_t data_length = (uint32_t)length;
1693 
1694   mLock.AssertCurrentThreadOwns();
1695 
1696   // Buffer message until complete
1697   const uint8_t bufferFlags =
1698       BufferMessage(mRecvBuffer, buffer, data_length, ppid, flags);
1699   if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
1700     LOG(
1701         ("DataChannel: Buffered message would become too large to handle, "
1702          "closing connection"));
1703     mRecvBuffer.Truncate(0);
1704     Stop();
1705     return;
1706   }
1707   if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
1708     LOG(("Buffered partial DCEP message of length %u", data_length));
1709     return;
1710   }
1711   if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
1712     buffer = reinterpret_cast<const void *>(mRecvBuffer.BeginReading());
1713     data_length = mRecvBuffer.Length();
1714   }
1715 
1716   req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
1717   LOG(("Handling DCEP message of length %u", data_length));
1718 
1719   // Ensure minimum message size (ack is the smallest DCEP message)
1720   if ((size_t)data_length < sizeof(*ack)) {
1721     LOG(("Ignored invalid DCEP message (too short)"));
1722     return;
1723   }
1724 
1725   switch (req->msg_type) {
1726     case DATA_CHANNEL_OPEN_REQUEST:
1727       // structure includes a possibly-unused char label[1] (in a packed
1728       // structure)
1729       if (NS_WARN_IF((size_t)data_length < sizeof(*req) - 1)) {
1730         return;
1731       }
1732 
1733       HandleOpenRequestMessage(req, data_length, stream);
1734       break;
1735     case DATA_CHANNEL_ACK:
1736       // >= sizeof(*ack) checked above
1737 
1738       ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
1739       HandleOpenAckMessage(ack, data_length, stream);
1740       break;
1741     default:
1742       HandleUnknownMessage(ppid, data_length, stream);
1743       break;
1744   }
1745 
1746   // Reset buffer
1747   mRecvBuffer.Truncate(0);
1748 }
1749 
1750 // Called with mLock locked!
1751 void DataChannelConnection::HandleMessage(const void *buffer, size_t length,
1752                                           uint32_t ppid, uint16_t stream,
1753                                           int flags) {
1754   mLock.AssertCurrentThreadOwns();
1755 
1756   switch (ppid) {
1757     case DATA_CHANNEL_PPID_CONTROL:
1758       HandleDCEPMessage(buffer, length, ppid, stream, flags);
1759       break;
1760     case DATA_CHANNEL_PPID_DOMSTRING_PARTIAL:
1761     case DATA_CHANNEL_PPID_DOMSTRING:
1762     case DATA_CHANNEL_PPID_BINARY_PARTIAL:
1763     case DATA_CHANNEL_PPID_BINARY:
1764       HandleDataMessage(buffer, length, ppid, stream, flags);
1765       break;
1766     default:
1767       LOG(("Message of length %zu PPID %u on stream %u received (%s).", length,
1768            ppid, stream, (flags & MSG_EOR) ? "complete" : "partial"));
1769       break;
1770   }
1771 }
1772 
1773 void DataChannelConnection::HandleAssociationChangeEvent(
1774     const struct sctp_assoc_change *sac) {
1775   uint32_t i, n;
1776 
1777   switch (sac->sac_state) {
1778     case SCTP_COMM_UP:
1779       LOG(("Association change: SCTP_COMM_UP"));
1780       if (mState == CONNECTING) {
1781         mSocket = mMasterSocket;
1782         mState = OPEN;
1783 
1784         // Check for older Firefox by looking at the amount of incoming streams
1785         LOG(("Negotiated number of incoming streams: %" PRIu16,
1786              sac->sac_inbound_streams));
1787         if (!mMaxMessageSizeSet &&
1788             sac->sac_inbound_streams ==
1789                 WEBRTC_DATACHANNEL_STREAMS_OLDER_FIREFOX) {
1790           LOG(("Older Firefox detected, using PPID-based fragmentation"));
1791           mPpidFragmentation = true;
1792         }
1793 
1794         SetEvenOdd();
1795 
1796         Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1797             DataChannelOnMessageAvailable::ON_CONNECTION, this)));
1798         LOG(("DTLS connect() succeeded!  Entering connected mode"));
1799 
1800         // Open any streams pending...
1801         ProcessQueuedOpens();
1802 
1803       } else if (mState == OPEN) {
1804         LOG(("DataConnection Already OPEN"));
1805       } else {
1806         LOG(("Unexpected state: %d", mState));
1807       }
1808       break;
1809     case SCTP_COMM_LOST:
1810       LOG(("Association change: SCTP_COMM_LOST"));
1811       // This association is toast, so also close all the channels -- from
1812       // mainthread!
1813       Stop();
1814       break;
1815     case SCTP_RESTART:
1816       LOG(("Association change: SCTP_RESTART"));
1817       break;
1818     case SCTP_SHUTDOWN_COMP:
1819       LOG(("Association change: SCTP_SHUTDOWN_COMP"));
1820       Stop();
1821       break;
1822     case SCTP_CANT_STR_ASSOC:
1823       LOG(("Association change: SCTP_CANT_STR_ASSOC"));
1824       break;
1825     default:
1826       LOG(("Association change: UNKNOWN"));
1827       break;
1828   }
1829   LOG(("Association change: streams (in/out) = (%u/%u)",
1830        sac->sac_inbound_streams, sac->sac_outbound_streams));
1831 
1832   if (NS_WARN_IF(!sac)) {
1833     return;
1834   }
1835 
1836   n = sac->sac_length - sizeof(*sac);
1837   if ((sac->sac_state == SCTP_COMM_UP) || (sac->sac_state == SCTP_RESTART)) {
1838     if (n > 0) {
1839       for (i = 0; i < n; ++i) {
1840         switch (sac->sac_info[i]) {
1841           case SCTP_ASSOC_SUPPORTS_PR:
1842             LOG(("Supports: PR"));
1843             break;
1844           case SCTP_ASSOC_SUPPORTS_AUTH:
1845             LOG(("Supports: AUTH"));
1846             break;
1847           case SCTP_ASSOC_SUPPORTS_ASCONF:
1848             LOG(("Supports: ASCONF"));
1849             break;
1850           case SCTP_ASSOC_SUPPORTS_MULTIBUF:
1851             LOG(("Supports: MULTIBUF"));
1852             break;
1853           case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
1854             LOG(("Supports: RE-CONFIG"));
1855             break;
1856 #if defined(SCTP_ASSOC_SUPPORTS_INTERLEAVING)
1857           case SCTP_ASSOC_SUPPORTS_INTERLEAVING:
1858             LOG(("Supports: NDATA"));
1859             // TODO: This should probably be set earlier above in 'case
1860             //       SCTP_COMM_UP' but we also need this for 'SCTP_RESTART'.
1861             mSendInterleaved = true;
1862             break;
1863 #endif
1864           default:
1865             LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
1866             break;
1867         }
1868       }
1869     }
1870   } else if (((sac->sac_state == SCTP_COMM_LOST) ||
1871               (sac->sac_state == SCTP_CANT_STR_ASSOC)) &&
1872              (n > 0)) {
1873     LOG(("Association: ABORT ="));
1874     for (i = 0; i < n; ++i) {
1875       LOG((" 0x%02x", sac->sac_info[i]));
1876     }
1877   }
1878   if ((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
1879       (sac->sac_state == SCTP_SHUTDOWN_COMP) ||
1880       (sac->sac_state == SCTP_COMM_LOST)) {
1881     return;
1882   }
1883 }
1884 
1885 void DataChannelConnection::HandlePeerAddressChangeEvent(
1886     const struct sctp_paddr_change *spc) {
1887   const char *addr = "";
1888 #if !defined(__Userspace_os_Windows)
1889   char addr_buf[INET6_ADDRSTRLEN];
1890   struct sockaddr_in *sin;
1891   struct sockaddr_in6 *sin6;
1892 #endif
1893 
1894   switch (spc->spc_aaddr.ss_family) {
1895     case AF_INET:
1896 #if !defined(__Userspace_os_Windows)
1897       sin = (struct sockaddr_in *)&spc->spc_aaddr;
1898       addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN);
1899 #endif
1900       break;
1901     case AF_INET6:
1902 #if !defined(__Userspace_os_Windows)
1903       sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr;
1904       addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
1905 #endif
1906       break;
1907     case AF_CONN:
1908       addr = "DTLS connection";
1909       break;
1910     default:
1911       break;
1912   }
1913   LOG(("Peer address %s is now ", addr));
1914   switch (spc->spc_state) {
1915     case SCTP_ADDR_AVAILABLE:
1916       LOG(("SCTP_ADDR_AVAILABLE"));
1917       break;
1918     case SCTP_ADDR_UNREACHABLE:
1919       LOG(("SCTP_ADDR_UNREACHABLE"));
1920       break;
1921     case SCTP_ADDR_REMOVED:
1922       LOG(("SCTP_ADDR_REMOVED"));
1923       break;
1924     case SCTP_ADDR_ADDED:
1925       LOG(("SCTP_ADDR_ADDED"));
1926       break;
1927     case SCTP_ADDR_MADE_PRIM:
1928       LOG(("SCTP_ADDR_MADE_PRIM"));
1929       break;
1930     case SCTP_ADDR_CONFIRMED:
1931       LOG(("SCTP_ADDR_CONFIRMED"));
1932       break;
1933     default:
1934       LOG(("UNKNOWN"));
1935       break;
1936   }
1937   LOG((" (error = 0x%08x).\n", spc->spc_error));
1938 }
1939 
1940 void DataChannelConnection::HandleRemoteErrorEvent(
1941     const struct sctp_remote_error *sre) {
1942   size_t i, n;
1943 
1944   n = sre->sre_length - sizeof(struct sctp_remote_error);
1945   LOG(("Remote Error (error = 0x%04x): ", sre->sre_error));
1946   for (i = 0; i < n; ++i) {
1947     LOG((" 0x%02x", sre->sre_data[i]));
1948   }
1949 }
1950 
1951 void DataChannelConnection::HandleShutdownEvent(
1952     const struct sctp_shutdown_event *sse) {
1953   LOG(("Shutdown event."));
1954   /* XXX: notify all channels. */
1955   // Attempts to actually send anything will fail
1956 }
1957 
1958 void DataChannelConnection::HandleAdaptationIndication(
1959     const struct sctp_adaptation_event *sai) {
1960   LOG(("Adaptation indication: %x.", sai->sai_adaptation_ind));
1961 }
1962 
1963 void DataChannelConnection::HandlePartialDeliveryEvent(
1964     const struct sctp_pdapi_event *spde) {
1965   // Note: Be aware that stream and sequence number being u32 instead of u16 is
1966   //       a bug in the SCTP API. This may change in the future.
1967 
1968   LOG(("Partial delivery event: "));
1969   switch (spde->pdapi_indication) {
1970     case SCTP_PARTIAL_DELIVERY_ABORTED:
1971       LOG(("delivery aborted "));
1972       break;
1973     default:
1974       LOG(("??? "));
1975       break;
1976   }
1977   LOG(("(flags = %x), stream = %" PRIu32 ", sn = %" PRIu32, spde->pdapi_flags,
1978        spde->pdapi_stream, spde->pdapi_seq));
1979 
1980   // Validate stream ID
1981   if (spde->pdapi_stream >= UINT16_MAX) {
1982     LOG(("Invalid stream id in partial delivery event: %" PRIu32 "\n",
1983          spde->pdapi_stream));
1984     return;
1985   }
1986 
1987   // Find channel and reset buffer
1988   DataChannel *channel = FindChannelByStream((uint16_t)spde->pdapi_stream);
1989   if (channel) {
1990     LOG(("Abort partially delivered message of %u bytes\n",
1991          channel->mRecvBuffer.Length()));
1992     channel->mRecvBuffer.Truncate(0);
1993   }
1994 }
1995 
1996 void DataChannelConnection::HandleSendFailedEvent(
1997     const struct sctp_send_failed_event *ssfe) {
1998   size_t i, n;
1999 
2000   if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
2001     LOG(("Unsent "));
2002   }
2003   if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
2004     LOG(("Sent "));
2005   }
2006   if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
2007     LOG(("(flags = %x) ", ssfe->ssfe_flags));
2008   }
2009   LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x",
2010        ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
2011        ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
2012   n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
2013   for (i = 0; i < n; ++i) {
2014     LOG((" 0x%02x", ssfe->ssfe_data[i]));
2015   }
2016 }
2017 
2018 void DataChannelConnection::ClearResets() {
2019   // Clear all pending resets
2020   if (!mStreamsResetting.IsEmpty()) {
2021     LOG(("Clearing resets for %zu streams", mStreamsResetting.Length()));
2022   }
2023 
2024   for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) {
2025     RefPtr<DataChannel> channel;
2026     channel = FindChannelByStream(mStreamsResetting[i]);
2027     if (channel) {
2028       LOG(("Forgetting channel %u (%p) with pending reset", channel->mStream,
2029            channel.get()));
2030       mStreams[channel->mStream] = nullptr;
2031     }
2032   }
2033   mStreamsResetting.Clear();
2034 }
2035 
2036 void DataChannelConnection::ResetOutgoingStream(uint16_t stream) {
2037   uint32_t i;
2038 
2039   mLock.AssertCurrentThreadOwns();
2040   LOG(("Connection %p: Resetting outgoing stream %u", (void *)this, stream));
2041   // Rarely has more than a couple items and only for a short time
2042   for (i = 0; i < mStreamsResetting.Length(); ++i) {
2043     if (mStreamsResetting[i] == stream) {
2044       return;
2045     }
2046   }
2047   mStreamsResetting.AppendElement(stream);
2048 }
2049 
2050 void DataChannelConnection::SendOutgoingStreamReset() {
2051   struct sctp_reset_streams *srs;
2052   uint32_t i;
2053   size_t len;
2054 
2055   LOG(("Connection %p: Sending outgoing stream reset for %zu streams",
2056        (void *)this, mStreamsResetting.Length()));
2057   mLock.AssertCurrentThreadOwns();
2058   if (mStreamsResetting.IsEmpty()) {
2059     LOG(("No streams to reset"));
2060     return;
2061   }
2062   len = sizeof(sctp_assoc_t) +
2063         (2 + mStreamsResetting.Length()) * sizeof(uint16_t);
2064   srs = static_cast<struct sctp_reset_streams *>(
2065       moz_xmalloc(len));  // infallible malloc
2066   memset(srs, 0, len);
2067   srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
2068   srs->srs_number_streams = mStreamsResetting.Length();
2069   for (i = 0; i < mStreamsResetting.Length(); ++i) {
2070     srs->srs_stream_list[i] = mStreamsResetting[i];
2071   }
2072   if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs,
2073                          (socklen_t)len) < 0) {
2074     LOG(("***failed: setsockopt RESET, errno %d", errno));
2075     // if errno == EALREADY, this is normal - we can't send another reset
2076     // with one pending.
2077     // When we get an incoming reset (which may be a response to our
2078     // outstanding one), see if we have any pending outgoing resets and
2079     // send them
2080   } else {
2081     mStreamsResetting.Clear();
2082   }
2083   free(srs);
2084 }
2085 
2086 void DataChannelConnection::HandleStreamResetEvent(
2087     const struct sctp_stream_reset_event *strrst) {
2088   uint32_t n, i;
2089   RefPtr<DataChannel> channel;  // since we may null out the ref to the channel
2090 
2091   if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
2092       !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
2093     n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) /
2094         sizeof(uint16_t);
2095     for (i = 0; i < n; ++i) {
2096       if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
2097         channel = FindChannelByStream(strrst->strreset_stream_list[i]);
2098         if (channel) {
2099           // The other side closed the channel
2100           // We could be in three states:
2101           // 1. Normal state (input and output streams (OPEN)
2102           //    Notify application, send a RESET in response on our
2103           //    outbound channel.  Go to CLOSED
2104           // 2. We sent our own reset (CLOSING); either they crossed on the
2105           //    wire, or this is a response to our Reset.
2106           //    Go to CLOSED
2107           // 3. We've sent a open but haven't gotten a response yet (CONNECTING)
2108           //    I believe this is impossible, as we don't have an input stream
2109           //    yet.
2110 
2111           LOG(("Incoming: Channel %u  closed, state %d", channel->mStream,
2112                channel->mState));
2113           ASSERT_WEBRTC(channel->mState == DataChannel::OPEN ||
2114                         channel->mState == DataChannel::CLOSING ||
2115                         channel->mState == DataChannel::CONNECTING ||
2116                         channel->mState == DataChannel::WAITING_TO_OPEN);
2117           if (channel->mState == DataChannel::OPEN ||
2118               channel->mState == DataChannel::WAITING_TO_OPEN) {
2119             // Mark the stream for reset (the reset is sent below)
2120             ResetOutgoingStream(channel->mStream);
2121           }
2122           mStreams[channel->mStream] = nullptr;
2123 
2124           LOG(("Disconnected DataChannel %p from connection %p",
2125                (void *)channel.get(), (void *)channel->mConnection.get()));
2126           // This sends ON_CHANNEL_CLOSED to mainthread
2127           channel->StreamClosedLocked();
2128         } else {
2129           LOG(("Can't find incoming channel %d", i));
2130         }
2131       }
2132     }
2133   }
2134 
2135   // Process any pending resets now:
2136   if (!mStreamsResetting.IsEmpty()) {
2137     LOG(("Sending %zu pending resets", mStreamsResetting.Length()));
2138     SendOutgoingStreamReset();
2139   }
2140 }
2141 
2142 void DataChannelConnection::HandleStreamChangeEvent(
2143     const struct sctp_stream_change_event *strchg) {
2144   uint16_t stream;
2145   RefPtr<DataChannel> channel;
2146 
2147   if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
2148     LOG(("*** Failed increasing number of streams from %zu (%u/%u)",
2149          mStreams.Length(), strchg->strchange_instrms,
2150          strchg->strchange_outstrms));
2151     // XXX FIX! notify pending opens of failure
2152     return;
2153   }
2154   if (strchg->strchange_instrms > mStreams.Length()) {
2155     LOG(("Other side increased streams from %zu to %u", mStreams.Length(),
2156          strchg->strchange_instrms));
2157   }
2158   if (strchg->strchange_outstrms > mStreams.Length() ||
2159       strchg->strchange_instrms > mStreams.Length()) {
2160     uint16_t old_len = mStreams.Length();
2161     uint16_t new_len =
2162         std::max(strchg->strchange_outstrms, strchg->strchange_instrms);
2163     LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
2164          old_len, new_len, new_len - old_len, strchg->strchange_instrms));
2165     // make sure both are the same length
2166     mStreams.AppendElements(new_len - old_len);
2167     LOG(("New length = %zu (was %d)", mStreams.Length(), old_len));
2168     for (size_t i = old_len; i < mStreams.Length(); ++i) {
2169       mStreams[i] = nullptr;
2170     }
2171     // Re-process any channels waiting for streams.
2172     // Linear search, but we don't increase channels often and
2173     // the array would only get long in case of an app error normally
2174 
2175     // Make sure we request enough streams if there's a big jump in streams
2176     // Could make a more complex API for OpenXxxFinish() and avoid this loop
2177     size_t num_needed = mPending.GetSize();
2178     LOG(("%zu of %d new streams already needed", num_needed,
2179          new_len - old_len));
2180     num_needed -= (new_len - old_len);  // number we added
2181     if (num_needed > 0) {
2182       if (num_needed < 16) num_needed = 16;
2183       LOG(("Not enough new streams, asking for %zu more", num_needed));
2184       // TODO: parameter is an int32_t but we pass size_t
2185       RequestMoreStreams(num_needed);
2186     } else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
2187       LOG(("Requesting %d output streams to match partner",
2188            strchg->strchange_instrms - strchg->strchange_outstrms));
2189       RequestMoreStreams(strchg->strchange_instrms -
2190                          strchg->strchange_outstrms);
2191     }
2192 
2193     ProcessQueuedOpens();
2194   }
2195   // else probably not a change in # of streams
2196 
2197   for (uint32_t i = 0; i < mStreams.Length(); ++i) {
2198     channel = mStreams[i];
2199     if (!channel) continue;
2200 
2201     if ((channel->mState == CONNECTING) &&
2202         (channel->mStream == INVALID_STREAM)) {
2203       if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
2204           (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
2205         /* XXX: Signal to the other end. */
2206         channel->mState = CLOSED;
2207         Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2208             DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, channel)));
2209         // maybe fire onError (bug 843625)
2210       } else {
2211         stream = FindFreeStream();
2212         if (stream != INVALID_STREAM) {
2213           channel->mStream = stream;
2214           mStreams[stream] = channel;
2215 
2216           // Send open request
2217           int error = SendOpenRequestMessage(
2218               channel->mLabel, channel->mProtocol, channel->mStream,
2219               !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
2220               channel->mPrPolicy, channel->mPrValue);
2221           if (error) {
2222             LOG(("SendOpenRequest failed, error = %d", error));
2223             // Close the channel, inform the user
2224             mStreams[channel->mStream] = nullptr;
2225             channel->mState = CLOSED;
2226             // Don't need to reset; we didn't open it
2227             Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2228                 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
2229                 channel)));
2230           } else {
2231             channel->mState = OPEN;
2232             channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
2233             LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__,
2234                  channel.get()));
2235             Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2236                 DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
2237                 channel)));
2238           }
2239         } else {
2240           /* We will not find more ... */
2241           break;
2242         }
2243       }
2244     }
2245   }
2246 }
2247 
2248 // Called with mLock locked!
2249 void DataChannelConnection::HandleNotification(
2250     const union sctp_notification *notif, size_t n) {
2251   mLock.AssertCurrentThreadOwns();
2252   if (notif->sn_header.sn_length != (uint32_t)n) {
2253     return;
2254   }
2255   switch (notif->sn_header.sn_type) {
2256     case SCTP_ASSOC_CHANGE:
2257       HandleAssociationChangeEvent(&(notif->sn_assoc_change));
2258       break;
2259     case SCTP_PEER_ADDR_CHANGE:
2260       HandlePeerAddressChangeEvent(&(notif->sn_paddr_change));
2261       break;
2262     case SCTP_REMOTE_ERROR:
2263       HandleRemoteErrorEvent(&(notif->sn_remote_error));
2264       break;
2265     case SCTP_SHUTDOWN_EVENT:
2266       HandleShutdownEvent(&(notif->sn_shutdown_event));
2267       break;
2268     case SCTP_ADAPTATION_INDICATION:
2269       HandleAdaptationIndication(&(notif->sn_adaptation_event));
2270       break;
2271     case SCTP_AUTHENTICATION_EVENT:
2272       LOG(("SCTP_AUTHENTICATION_EVENT"));
2273       break;
2274     case SCTP_SENDER_DRY_EVENT:
2275       // LOG(("SCTP_SENDER_DRY_EVENT"));
2276       break;
2277     case SCTP_NOTIFICATIONS_STOPPED_EVENT:
2278       LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
2279       break;
2280     case SCTP_PARTIAL_DELIVERY_EVENT:
2281       HandlePartialDeliveryEvent(&(notif->sn_pdapi_event));
2282       break;
2283     case SCTP_SEND_FAILED_EVENT:
2284       HandleSendFailedEvent(&(notif->sn_send_failed_event));
2285       break;
2286     case SCTP_STREAM_RESET_EVENT:
2287       HandleStreamResetEvent(&(notif->sn_strreset_event));
2288       break;
2289     case SCTP_ASSOC_RESET_EVENT:
2290       LOG(("SCTP_ASSOC_RESET_EVENT"));
2291       break;
2292     case SCTP_STREAM_CHANGE_EVENT:
2293       HandleStreamChangeEvent(&(notif->sn_strchange_event));
2294       break;
2295     default:
2296       LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
2297       break;
2298   }
2299 }
2300 
2301 int DataChannelConnection::ReceiveCallback(struct socket *sock, void *data,
2302                                            size_t datalen,
2303                                            struct sctp_rcvinfo rcv, int flags) {
2304   ASSERT_WEBRTC(!NS_IsMainThread());
2305 
2306   if (!data) {
2307     LOG(("ReceiveCallback: SCTP has finished shutting down"));
2308   } else {
2309     bool locked = false;
2310     if (!IsSTSThread()) {
2311       mLock.Lock();
2312       locked = true;
2313     } else {
2314       mLock.AssertCurrentThreadOwns();
2315     }
2316     if (flags & MSG_NOTIFICATION) {
2317       HandleNotification(static_cast<union sctp_notification *>(data), datalen);
2318     } else {
2319       HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid, flags);
2320     }
2321     if (locked) {
2322       mLock.Unlock();
2323     }
2324   }
2325   // sctp allocates 'data' with malloc(), and expects the receiver to free
2326   // it (presumably with free).
2327   // XXX future optimization: try to deliver messages without an internal
2328   // alloc/copy, and if so delay the free until later.
2329   free(data);
2330   // usrsctp defines the callback as returning an int, but doesn't use it
2331   return 1;
2332 }
2333 
2334 already_AddRefed<DataChannel> DataChannelConnection::Open(
2335     const nsACString &label, const nsACString &protocol, Type type,
2336     bool inOrder, uint32_t prValue, DataChannelListener *aListener,
2337     nsISupports *aContext, bool aExternalNegotiated, uint16_t aStream) {
2338   // aStream == INVALID_STREAM to have the protocol allocate
2339   uint16_t prPolicy = SCTP_PR_SCTP_NONE;
2340   uint32_t flags;
2341 
2342   LOG(
2343       ("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, "
2344        "context %p, external: %s, stream %u",
2345        PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(),
2346        type, inOrder, prValue, aListener, aContext,
2347        aExternalNegotiated ? "true" : "false", aStream));
2348   switch (type) {
2349     case DATA_CHANNEL_RELIABLE:
2350       prPolicy = SCTP_PR_SCTP_NONE;
2351       break;
2352     case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
2353       prPolicy = SCTP_PR_SCTP_RTX;
2354       break;
2355     case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
2356       prPolicy = SCTP_PR_SCTP_TTL;
2357       break;
2358     default:
2359       LOG(("ERROR: unsupported channel type: %u", type));
2360       MOZ_ASSERT(false);
2361       return nullptr;
2362   }
2363   if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
2364     return nullptr;
2365   }
2366 
2367   // Don't look past currently-negotiated streams
2368   if (aStream != INVALID_STREAM && aStream < mStreams.Length() &&
2369       mStreams[aStream]) {
2370     LOG(("ERROR: external negotiation of already-open channel %u", aStream));
2371     // XXX How do we indicate this up to the application?  Probably the
2372     // caller's job, but we may need to return an error code.
2373     return nullptr;
2374   }
2375 
2376   flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
2377 
2378   RefPtr<DataChannel> channel(
2379       new DataChannel(this, aStream, DataChannel::CONNECTING, label, protocol,
2380                       prPolicy, prValue, flags, aListener, aContext));
2381   if (aExternalNegotiated) {
2382     channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED;
2383   }
2384 
2385   MutexAutoLock lock(mLock);  // OpenFinish assumes this
2386   return OpenFinish(channel.forget());
2387 }
2388 
2389 // Separate routine so we can also call it to finish up from pending opens
2390 already_AddRefed<DataChannel> DataChannelConnection::OpenFinish(
2391     already_AddRefed<DataChannel> &&aChannel) {
2392   RefPtr<DataChannel> channel(aChannel);  // takes the reference passed in
2393   // Normally 1 reference if called from ::Open(), or 2 if called from
2394   // ProcessQueuedOpens() unless the DOMDataChannel was gc'd
2395   uint16_t stream = channel->mStream;
2396   bool queue = false;
2397 
2398   mLock.AssertCurrentThreadOwns();
2399 
2400   // Cases we care about:
2401   // Pre-negotiated:
2402   //    Not Open:
2403   //      Doesn't fit:
2404   //         -> change initial ask or renegotiate after open
2405   //      -> queue open
2406   //    Open:
2407   //      Doesn't fit:
2408   //         -> RequestMoreStreams && queue
2409   //      Does fit:
2410   //         -> open
2411   // Not negotiated:
2412   //    Not Open:
2413   //      -> queue open
2414   //    Open:
2415   //      -> Try to get a stream
2416   //      Doesn't fit:
2417   //         -> RequestMoreStreams && queue
2418   //      Does fit:
2419   //         -> open
2420   // So the Open cases are basically the same
2421   // Not Open cases are simply queue for non-negotiated, and
2422   // either change the initial ask or possibly renegotiate after open.
2423 
2424   if (mState == OPEN) {
2425     if (stream == INVALID_STREAM) {
2426       stream = FindFreeStream();  // may be INVALID_STREAM if we need more
2427     }
2428     if (stream == INVALID_STREAM || stream >= mStreams.Length()) {
2429       // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra
2430       // streams to avoid going back immediately for more if the ask to N, N+1,
2431       // etc
2432       int32_t more_needed = (stream == INVALID_STREAM)
2433                                 ? 16
2434                                 : (stream - ((int32_t)mStreams.Length())) + 16;
2435       if (!RequestMoreStreams(more_needed)) {
2436         // Something bad happened... we're done
2437         goto request_error_cleanup;
2438       }
2439       queue = true;
2440     }
2441   } else {
2442     // not OPEN
2443     if (stream != INVALID_STREAM && stream >= mStreams.Length() &&
2444         mState == CLOSED) {
2445       // Update number of streams for init message
2446       struct sctp_initmsg initmsg;
2447       socklen_t len = sizeof(initmsg);
2448       int32_t total_needed = stream + 16;
2449 
2450       memset(&initmsg, 0, sizeof(initmsg));
2451       if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG,
2452                              &initmsg, &len) < 0) {
2453         LOG(("*** failed getsockopt SCTP_INITMSG"));
2454         goto request_error_cleanup;
2455       }
2456       LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed,
2457            initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
2458       initmsg.sinit_num_ostreams = total_needed;
2459       initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
2460       if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG,
2461                              &initmsg, (socklen_t)sizeof(initmsg)) < 0) {
2462         LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
2463         goto request_error_cleanup;
2464       }
2465 
2466       int32_t old_len = mStreams.Length();
2467       mStreams.AppendElements(total_needed - old_len);
2468       for (int32_t i = old_len; i < total_needed; ++i) {
2469         mStreams[i] = nullptr;
2470       }
2471     }
2472     // else if state is CONNECTING, we'll just re-negotiate when OpenFinish
2473     // is called, if needed
2474     queue = true;
2475   }
2476   if (queue) {
2477     LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
2478     // Also serves to mark we told the app
2479     channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
2480     // we need a ref for the nsDeQue and one to return
2481     DataChannel *rawChannel = channel;
2482     rawChannel->AddRef();
2483     mPending.Push(rawChannel);
2484     return channel.forget();
2485   }
2486 
2487   MOZ_ASSERT(stream != INVALID_STREAM);
2488   // just allocated (& OPEN), or externally negotiated
2489   mStreams[stream] = channel;  // holds a reference
2490   channel->mStream = stream;
2491 
2492 #ifdef TEST_QUEUED_DATA
2493   // It's painful to write a test for this...
2494   channel->mState = OPEN;
2495   channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
2496   SendDataMsgInternalOrBuffer(channel, "Help me!", 8,
2497                               DATA_CHANNEL_PPID_DOMSTRING);
2498 #endif
2499 
2500   if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
2501     // Don't send unordered until this gets cleared
2502     channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
2503   }
2504 
2505   if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
2506     int error = SendOpenRequestMessage(
2507         channel->mLabel, channel->mProtocol, stream,
2508         !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
2509         channel->mPrPolicy, channel->mPrValue);
2510     if (error) {
2511       LOG(("SendOpenRequest failed, error = %d", error));
2512       if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
2513         // We already returned the channel to the app.
2514         NS_ERROR("Failed to send open request");
2515         Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2516             DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, channel)));
2517       }
2518       // If we haven't returned the channel yet, it will get destroyed when we
2519       // exit this function.
2520       mStreams[stream] = nullptr;
2521       channel->mStream = INVALID_STREAM;
2522       // we'll be destroying the channel
2523       channel->mState = CLOSED;
2524       return nullptr;
2525       /* NOTREACHED */
2526     }
2527   }
2528   // Either externally negotiated or we sent Open
2529   channel->mState = OPEN;
2530   channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
2531   // FIX?  Move into DOMDataChannel?  I don't think we can send it yet here
2532   LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
2533   Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2534       DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this, channel)));
2535 
2536   return channel.forget();
2537 
2538 request_error_cleanup:
2539   channel->mState = CLOSED;
2540   if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
2541     // We already returned the channel to the app.
2542     NS_ERROR("Failed to request more streams");
2543     Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2544         DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, channel)));
2545     return channel.forget();
2546   }
2547   // we'll be destroying the channel, but it never really got set up
2548   // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
2549   // Dispatch it to ourselves
2550   return nullptr;
2551 }
2552 
2553 // Requires mLock to be locked!
2554 // Returns a POSIX error code directly instead of setting errno.
2555 int DataChannelConnection::SendMsgInternal(OutgoingMsg &msg) {
2556   auto &info = msg.GetInfo().sendv_sndinfo;
2557   int error;
2558 
2559   // EOR set?
2560   bool eor_set = info.snd_flags & SCTP_EOR ? true : false;
2561 
2562   // Send until buffer is empty
2563   size_t left = msg.GetLeft();
2564   do {
2565     size_t length;
2566 
2567     // Carefully chunk the buffer
2568     if (left > DATA_CHANNEL_MAX_BINARY_FRAGMENT) {
2569       length = DATA_CHANNEL_MAX_BINARY_FRAGMENT;
2570 
2571       // Unset EOR flag
2572       info.snd_flags &= ~SCTP_EOR;
2573     } else {
2574       length = left;
2575 
2576       // Set EOR flag
2577       if (eor_set) {
2578         info.snd_flags |= SCTP_EOR;
2579       }
2580     }
2581 
2582     // Send (or try at least)
2583     // SCTP will return EMSGSIZE if the message is bigger than the buffer
2584     // size (or EAGAIN if there isn't space). However, we can avoid EMSGSIZE
2585     // by carefully crafting small enough message chunks.
2586     ssize_t written = usrsctp_sendv(
2587         mSocket, msg.GetData(), length, nullptr, 0, (void *)&msg.GetInfo(),
2588         (socklen_t)sizeof(struct sctp_sendv_spa), SCTP_SENDV_SPA, 0);
2589     if (written < 0) {
2590       error = errno;
2591       goto out;
2592     }
2593     LOG(("Sent buffer (written=%zu, len=%zu, left=%zu)", (size_t)written,
2594          length, left - (size_t)written));
2595 
2596     // TODO: Remove once resolved
2597     // (https://github.com/sctplab/usrsctp/issues/132)
2598     if (written == 0) {
2599       LOG(("@tuexen: usrsctp_sendv returned 0"));
2600       error = EAGAIN;
2601       goto out;
2602     }
2603 
2604     // If not all bytes have been written, this obviously means that usrsctp's
2605     // buffer is full and we need to try again later.
2606     if ((size_t)written < length) {
2607       msg.Advance((size_t)written);
2608       error = EAGAIN;
2609       goto out;
2610     }
2611 
2612     // Update buffer position
2613     msg.Advance((size_t)written);
2614 
2615     // Get amount of bytes left in the buffer
2616     left = msg.GetLeft();
2617   } while (left > 0);
2618 
2619   // Done
2620   error = 0;
2621 
2622 out:
2623   // Reset EOR flag
2624   if (eor_set) {
2625     info.snd_flags |= SCTP_EOR;
2626   }
2627 
2628   return error;
2629 }
2630 
2631 // Requires mLock to be locked!
2632 // Returns a POSIX error code directly instead of setting errno.
2633 // IMPORTANT: Ensure that the buffer passed is guarded by mLock!
2634 int DataChannelConnection::SendMsgInternalOrBuffer(
2635     nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer, OutgoingMsg &msg,
2636     bool &buffered) {
2637   NS_WARNING_ASSERTION(msg.GetLength() > 0, "Length is 0?!");
2638 
2639   int error = 0;
2640   bool need_buffering = false;
2641 
2642   // Note: Main-thread IO, but doesn't block!
2643   // XXX FIX!  to deal with heavy overruns of JS trying to pass data in
2644   // (more than the buffersize) queue data onto another thread to do the
2645   // actual sends.  See netwerk/protocol/websocket/WebSocketChannel.cpp
2646 
2647   // Avoid a race between buffer-full-failure (where we have to add the
2648   // packet to the buffered-data queue) and the buffer-now-only-half-full
2649   // callback, which happens on a different thread.  Otherwise we might
2650   // fail here, then before we add it to the queue get the half-full
2651   // callback, find nothing to do, then on this thread add it to the
2652   // queue - which would sit there.  Also, if we later send more data, it
2653   // would arrive ahead of the buffered message, but if the buffer ever
2654   // got to 1/2 full, the message would get sent - but at a semi-random
2655   // time, after other data it was supposed to be in front of.
2656 
2657   // Must lock before empty check for similar reasons!
2658   mLock.AssertCurrentThreadOwns();
2659   if (buffer.IsEmpty() && (mSendInterleaved || !mPendingType)) {
2660     error = SendMsgInternal(msg);
2661     switch (error) {
2662       case 0:
2663         break;
2664       case EAGAIN:
2665 #if (EAGAIN != EWOULDBLOCK)
2666       case EWOULDBLOCK:
2667 #endif
2668         need_buffering = true;
2669         break;
2670       default:
2671         LOG(("error %d on sending", error));
2672         break;
2673     }
2674   } else {
2675     need_buffering = true;
2676   }
2677 
2678   if (need_buffering) {
2679     // queue data for resend!  And queue any further data for the stream until
2680     // it is...
2681     auto *bufferedMsg = new BufferedOutgoingMsg(msg);  // infallible malloc
2682     buffer.AppendElement(bufferedMsg);  // owned by mBufferedData array
2683     LOG(("Queued %zu buffers (left=%zu, total=%zu)", buffer.Length(),
2684          msg.GetLeft(), msg.GetLength()));
2685     buffered = true;
2686     return 0;
2687   }
2688 
2689   buffered = false;
2690   return error;
2691 }
2692 
2693 // Caller must ensure that length <= UINT32_MAX
2694 // Returns a POSIX error code.
2695 int DataChannelConnection::SendDataMsgInternalOrBuffer(DataChannel &channel,
2696                                                        const uint8_t *data,
2697                                                        size_t len,
2698                                                        uint32_t ppid) {
2699   if (NS_WARN_IF(channel.mState != OPEN && channel.mState != CONNECTING)) {
2700     return EINVAL;  // TODO: Find a better error code
2701   }
2702 
2703   struct sctp_sendv_spa info = {0};
2704 
2705   // General flags
2706   info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
2707 
2708   // Set stream identifier, protocol identifier and flags
2709   info.sendv_sndinfo.snd_sid = channel.mStream;
2710   info.sendv_sndinfo.snd_flags = SCTP_EOR;
2711   info.sendv_sndinfo.snd_ppid = htonl(ppid);
2712 
2713   // Unordered?
2714   // To avoid problems where an in-order OPEN is lost and an
2715   // out-of-order data message "beats" it, require data to be in-order
2716   // until we get an ACK.
2717   if ((channel.mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
2718       !(channel.mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
2719     info.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
2720   }
2721 
2722   // Partial reliability policy
2723   if (channel.mPrPolicy != SCTP_PR_SCTP_NONE) {
2724     info.sendv_prinfo.pr_policy = channel.mPrPolicy;
2725     info.sendv_prinfo.pr_value = channel.mPrValue;
2726     info.sendv_flags |= SCTP_SEND_PRINFO_VALID;
2727   }
2728 
2729   // Create message instance and send
2730   OutgoingMsg msg(info, data, len);
2731   MutexAutoLock lock(mLock);
2732   bool buffered;
2733   int error = SendMsgInternalOrBuffer(channel.mBufferedData, msg, buffered);
2734 
2735   // Set pending type and stream index (if buffered)
2736   if (!error && buffered && !mPendingType) {
2737     mPendingType = PENDING_DATA;
2738     mCurrentStream = channel.mStream;
2739   }
2740   return error;
2741 }
2742 
2743 // Caller must ensure that length <= UINT32_MAX
2744 // Returns a POSIX error code.
2745 int DataChannelConnection::SendDataMsg(DataChannel &channel,
2746                                        const uint8_t *data, size_t len,
2747                                        uint32_t ppidPartial,
2748                                        uint32_t ppidFinal) {
2749   // We *really* don't want to do this from main thread! - and
2750   // SendDataMsgInternalOrBuffer avoids blocking.
2751 
2752   if (mPpidFragmentation) {
2753     // TODO: Bug 1381136, remove this block and all other code that uses PPIDs
2754     //       for fragmentation and reassembly once older Firefoxes without EOR
2755     //       are no longer supported as target clients.
2756 
2757     // Use the deprecated PPID-level fragmentation if enabled. Should be enabled
2758     // in case we can be certain that the other peer is an older Firefox browser
2759     // that does support PPID-level fragmentation/reassembly.
2760 
2761     // PPID-level fragmentation can only be applied on reliable data channels.
2762     if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
2763         channel.mPrPolicy == DATA_CHANNEL_RELIABLE &&
2764         !(channel.mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
2765       LOG((
2766           "Sending data message (total=%zu) using deprecated PPID-based chunks",
2767           len));
2768 
2769       size_t left = len;
2770       while (left > 0) {
2771         // Note: For correctness, chunkLen should also consider mMaxMessageSize
2772         //       as minimum but as this block is going to be removed soon, I
2773         //       see no need for it.
2774         size_t chunkLen =
2775             std::min<size_t>(left, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
2776         left -= chunkLen;
2777         uint32_t ppid = left > 0 ? ppidPartial : ppidFinal;
2778 
2779         // Send the chunk
2780         // Note that these might end up being deferred and queued.
2781         LOG(("Send chunk (len=%zu, left=%zu, total=%zu, ppid %u", chunkLen,
2782              left, len, ppid));
2783         int error = SendDataMsgInternalOrBuffer(channel, data, chunkLen, ppid);
2784         if (error) {
2785           LOG(("*** send chunk fail %d", error));
2786           return error;
2787         }
2788 
2789         // Update data position
2790         data += chunkLen;
2791       }
2792 
2793       // Sending chunks complete
2794       LOG(("Sent %zu chunks using deprecated PPID-based fragmentation",
2795            (size_t)(len + DATA_CHANNEL_MAX_BINARY_FRAGMENT - 1) /
2796                DATA_CHANNEL_MAX_BINARY_FRAGMENT));
2797       return 0;
2798     }
2799 
2800     // Cannot do PPID-based fragmentaton on unreliable channels
2801     NS_WARNING_ASSERTION(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
2802                          "Sending too-large data on unreliable channel!");
2803   } else {
2804     if (mMaxMessageSize != 0 && len > mMaxMessageSize) {
2805       LOG(("Message rejected, too large (%zu > %" PRIu64 ")", len,
2806            mMaxMessageSize));
2807       return EMSGSIZE;
2808     }
2809   }
2810 
2811   // This will use EOR-based fragmentation if the message is too large (> 64
2812   // KiB)
2813   return SendDataMsgInternalOrBuffer(channel, data, len, ppidFinal);
2814 }
2815 
// Runnable that DataChannelConnection::SendBlob() dispatches to the internal
// IO thread.  When run, it passes the blob stream on to
// DataChannelConnection::ReadBlob() together with the connection reference it
// was holding.
class ReadBlobRunnable : public Runnable {
 public:
  // Captures the connection, the stream id and the blob for a later Run().
  ReadBlobRunnable(DataChannelConnection *aConnection, uint16_t aStream,
                   nsIInputStream *aBlob)
      : Runnable("ReadBlobRunnable"),
        mConnection(aConnection),
        mStream(aStream),
        mBlob(aBlob) {}

  NS_IMETHOD Run() override {
    // ReadBlob() is responsible for releasing the reference.
    // Take the raw pointer first: mConnection.forget() is passed as an
    // argument below, so the call itself has to go through |self|.
    DataChannelConnection *self = mConnection;
    self->ReadBlob(mConnection.forget(), mStream, mBlob);
    return NS_OK;
  }

 private:
  // Make sure the Connection doesn't die while there are jobs outstanding.
  // Let it die (if released by PeerConnectionImpl while we're running)
  // when we send our runnable back to MainThread.  Then ~DataChannelConnection
  // can send the IOThread to MainThread to die in a runnable, avoiding
  // unsafe event loop recursion.  Evil.
  RefPtr<DataChannelConnection> mConnection;
  // Stream id forwarded unchanged to ReadBlob().
  uint16_t mStream;
  // Hold a reference so the stream is not deleted once SendBlob() returns.
  RefPtr<nsIInputStream> mBlob;
};
2843 
2844 // Returns a POSIX error code.
2845 int DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob) {
2846   DataChannel *channel = mStreams[stream];
2847   if (NS_WARN_IF(!channel)) {
2848     return EINVAL;  // TODO: Find a better error code
2849   }
2850 
2851   // Spawn a thread to send the data
2852   if (!mInternalIOThread) {
2853     nsresult rv =
2854         NS_NewNamedThread("DataChannel IO", getter_AddRefs(mInternalIOThread));
2855     if (NS_FAILED(rv)) {
2856       return EINVAL;  // TODO: Find a better error code
2857     }
2858   }
2859 
2860   mInternalIOThread->Dispatch(
2861       do_AddRef(new ReadBlobRunnable(this, stream, aBlob)), NS_DISPATCH_NORMAL);
2862   return 0;
2863 }
2864 
// Runnable created by DataChannelConnection::ReadBlob().  It carries the blob
// contents in mData and, when run, sends them as a binary message on mStream
// through the connection it owns a reference to.
class DataChannelBlobSendRunnable : public Runnable {
 public:
  // Takes over the connection reference held by |aConnection|.
  DataChannelBlobSendRunnable(
      already_AddRefed<DataChannelConnection> &aConnection, uint16_t aStream)
      : Runnable("DataChannelBlobSendRunnable"),
        mConnection(aConnection),
        mStream(aStream) {}

  ~DataChannelBlobSendRunnable() override {
    // Only relevant when Run() never cleared mConnection and we are being
    // destroyed away from the main thread.
    if (!NS_IsMainThread() && mConnection) {
      MOZ_ASSERT(false);
      // explicitly leak the connection if destroyed off mainthread
      Unused << mConnection.forget().take();
    }
  }

  NS_IMETHOD Run() override {
    ASSERT_WEBRTC(NS_IsMainThread());

    mConnection->SendBinaryMsg(mStream, mData);
    // Drop the connection reference here, on the thread Run() executes on,
    // so the destructor has nothing left to release.
    mConnection = nullptr;
    return NS_OK;
  }

  // explicitly public so we can avoid allocating twice and copying
  nsCString mData;

 private:
  // Note: we can be destroyed off the target thread, so be careful not to let
  // this get Release()ed on the temp thread!
  RefPtr<DataChannelConnection> mConnection;
  // Stream the data is sent on.
  uint16_t mStream;
};
2898 
2899 void DataChannelConnection::ReadBlob(
2900     already_AddRefed<DataChannelConnection> aThis, uint16_t aStream,
2901     nsIInputStream *aBlob) {
2902   // NOTE: 'aThis' has been forgotten by the caller to avoid releasing
2903   // it off mainthread; if PeerConnectionImpl has released then we want
2904   // ~DataChannelConnection() to run on MainThread
2905 
2906   // XXX to do this safely, we must enqueue these atomically onto the
2907   // output socket.  We need a sender thread(s?) to enqueue data into the
2908   // socket and to avoid main-thread IO that might block.  Even on a
2909   // background thread, we may not want to block on one stream's data.
2910   // I.e. run non-blocking and service multiple channels.
2911 
2912   // Must not let Dispatching it cause the DataChannelConnection to get
2913   // released on the wrong thread.  Using
2914   // WrapRunnable(RefPtr<DataChannelConnection>(aThis),... will occasionally
2915   // cause aThis to get released on this thread.  Also, an explicit Runnable
2916   // lets us avoid copying the blob data an extra time.
2917   RefPtr<DataChannelBlobSendRunnable> runnable =
2918       new DataChannelBlobSendRunnable(aThis, aStream);
2919   // avoid copying the blob data by passing the mData from the runnable
2920   if (NS_FAILED(NS_ReadInputStreamToString(aBlob, runnable->mData, -1))) {
2921     // Bug 966602:  Doesn't return an error to the caller via onerror.
2922     // We must release DataChannelConnection on MainThread to avoid issues (bug
2923     // 876167) aThis is now owned by the runnable; release it there
2924     NS_ReleaseOnMainThreadSystemGroup("DataChannelBlobSendRunnable",
2925                                       runnable.forget());
2926     return;
2927   }
2928   aBlob->Close();
2929   Dispatch(runnable.forget());
2930 }
2931 
2932 void DataChannelConnection::GetStreamIds(std::vector<uint16_t> *aStreamList) {
2933   ASSERT_WEBRTC(NS_IsMainThread());
2934   for (uint32_t i = 0; i < mStreams.Length(); ++i) {
2935     if (mStreams[i]) {
2936       aStreamList->push_back(mStreams[i]->mStream);
2937     }
2938   }
2939 }
2940 
2941 // Returns a POSIX error code.
2942 int DataChannelConnection::SendDataMsgCommon(uint16_t stream,
2943                                              const nsACString &aMsg,
2944                                              bool isBinary) {
2945   ASSERT_WEBRTC(NS_IsMainThread());
2946   // We really could allow this from other threads, so long as we deal with
2947   // asynchronosity issues with channels closing, in particular access to
2948   // mStreams, and issues with the association closing (access to mSocket).
2949 
2950   const uint8_t *data = (const uint8_t *)aMsg.BeginReading();
2951   uint32_t len = aMsg.Length();
2952 #if (UINT32_MAX > SIZE_MAX)
2953   if (len > SIZE_MAX) {
2954     return EMSGSIZE;
2955   }
2956 #endif
2957   DataChannel *channelPtr;
2958 
2959   LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream,
2960        len));
2961   // XXX if we want more efficiency, translate flags once at open time
2962   channelPtr = mStreams[stream];
2963   if (NS_WARN_IF(!channelPtr)) {
2964     return EINVAL;  // TODO: Find a better error code
2965   }
2966 
2967   auto &channel = *channelPtr;
2968 
2969   if (isBinary) {
2970     return SendDataMsg(channel, data, len, DATA_CHANNEL_PPID_BINARY_PARTIAL,
2971                        DATA_CHANNEL_PPID_BINARY);
2972   } else {
2973     return SendDataMsg(channel, data, len, DATA_CHANNEL_PPID_DOMSTRING_PARTIAL,
2974                        DATA_CHANNEL_PPID_DOMSTRING);
2975   }
2976 }
2977 
2978 void DataChannelConnection::Stop() {
2979   // Note: This will call 'CloseAll' from the main thread
2980   Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2981       DataChannelOnMessageAvailable::ON_DISCONNECTED, this)));
2982 }
2983 
// Closes |aChannel|: takes mLock and delegates to CloseInt(), which expects
// the lock to be held already.
void DataChannelConnection::Close(DataChannel *aChannel) {
  MutexAutoLock lock(mLock);
  CloseInt(aChannel);
}
2988 
// Lock-held implementation of Close(), so that callers already holding mLock
// can close a channel.
// Called from someone who holds a ref via ::Close(), or from ~DataChannel
//
// Drops the channel's buffered outgoing data, resets its outgoing stream (if
// it has one) and moves the channel to CLOSING.  When the connection itself
// is already CLOSED, the channel's slot in mStreams is cleared as well and
// the channel is finished off immediately via StreamClosedLocked().
void DataChannelConnection::CloseInt(DataChannel *aChannel) {
  MOZ_ASSERT(aChannel);
  RefPtr<DataChannel> channel(aChannel);  // make sure it doesn't go away on us

  mLock.AssertCurrentThreadOwns();
  LOG(("Connection %p/Channel %p: Closing stream %u",
       channel->mConnection.get(), channel.get(), channel->mStream));
  // re-test since it may have closed before the lock was grabbed
  if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) {
    LOG(("Channel already closing/closed (%u)", aChannel->mState));
    if (mState == CLOSED && channel->mStream != INVALID_STREAM) {
      // called from CloseAll()
      // we're not going to hang around waiting any more
      mStreams[channel->mStream] = nullptr;
    }
    return;
  }
  // Anything still queued for this channel will never be sent.
  aChannel->mBufferedData.Clear();
  if (channel->mStream != INVALID_STREAM) {
    ResetOutgoingStream(channel->mStream);
    if (mState == CLOSED) {  // called from CloseAll()
      // Let resets accumulate then send all at once in CloseAll()
      // we're not going to hang around waiting
      mStreams[channel->mStream] = nullptr;
    } else {
      SendOutgoingStreamReset();
    }
  }
  aChannel->mState = CLOSING;
  if (mState == CLOSED) {
    // we're not going to hang around waiting
    channel->StreamClosedLocked();
  }
  // At this point when we leave here, the object is a zombie held alive only by
  // the DOM object
}
3027 
3028 void DataChannelConnection::CloseAll() {
3029   LOG(("Closing all channels (connection %p)", (void *)this));
3030   // Don't need to lock here
3031 
3032   // Make sure no more channels will be opened
3033   {
3034     MutexAutoLock lock(mLock);
3035     mState = CLOSED;
3036   }
3037 
3038   // Close current channels
3039   // If there are runnables, they hold a strong ref and keep the channel
3040   // and/or connection alive (even if in a CLOSED state)
3041   bool closed_some = false;
3042   for (uint32_t i = 0; i < mStreams.Length(); ++i) {
3043     if (mStreams[i]) {
3044       mStreams[i]->Close();
3045       closed_some = true;
3046     }
3047   }
3048 
3049   // Clean up any pending opens for channels
3050   RefPtr<DataChannel> channel;
3051   while (nullptr != (channel = dont_AddRef(
3052                          static_cast<DataChannel *>(mPending.PopFront())))) {
3053     LOG(("closing pending channel %p, stream %u", channel.get(),
3054          channel->mStream));
3055     channel->Close();  // also releases the ref on each iteration
3056     closed_some = true;
3057   }
3058   // It's more efficient to let the Resets queue in shutdown and then
3059   // SendOutgoingStreamReset() here.
3060   if (closed_some) {
3061     MutexAutoLock lock(mLock);
3062     SendOutgoingStreamReset();
3063   }
3064 }
3065 
// Destructor: only checks (in builds where NS_ASSERTION is active) that the
// channel was closed, or was closing, before being destroyed.
DataChannel::~DataChannel() {
  // NS_ASSERTION since this is more "I think I caught all the cases that
  // can cause this" than a true kill-the-program assertion.  If this is
  // wrong, nothing bad happens.  At worst it's a leak.
  NS_ASSERTION(mState == CLOSED || mState == CLOSING,
               "unexpected state in ~DataChannel");
}
3073 
3074 void DataChannel::Close() {
3075   if (mConnection) {
3076     // ensure we don't get deleted
3077     RefPtr<DataChannelConnection> connection(mConnection);
3078     connection->Close(this);
3079   }
3080 }
3081 
3082 // Used when disconnecting from the DataChannelConnection
3083 void DataChannel::StreamClosedLocked() {
3084   mConnection->mLock.AssertCurrentThreadOwns();
3085   ENSURE_DATACONNECTION;
3086 
3087   LOG(("Destroying Data channel %u", mStream));
3088   MOZ_ASSERT_IF(mStream != INVALID_STREAM,
3089                 !mConnection->FindChannelByStream(mStream));
3090   mStream = INVALID_STREAM;
3091   mState = CLOSED;
3092   mMainThreadEventTarget->Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
3093       DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, mConnection, this)));
3094   // We leave mConnection live until the DOM releases us, to avoid races
3095 }
3096 
// Drops this channel's reference to its DataChannelConnection.
// Main thread only.
void DataChannel::ReleaseConnection() {
  ASSERT_WEBRTC(NS_IsMainThread());
  mConnection = nullptr;
}
3101 
3102 void DataChannel::SetListener(DataChannelListener *aListener,
3103                               nsISupports *aContext) {
3104   MutexAutoLock mLock(mListenerLock);
3105   mContext = aContext;
3106   mListener = aListener;
3107 }
3108 
3109 void DataChannel::SendErrnoToErrorResult(int error, ErrorResult &aRv) {
3110   switch (error) {
3111     case 0:
3112       break;
3113     case EMSGSIZE:
3114       aRv.Throw(NS_ERROR_DOM_TYPE_ERR);
3115       break;
3116     default:
3117       aRv.Throw(NS_ERROR_DOM_OPERATION_ERR);
3118       break;
3119   }
3120 }
3121 
3122 void DataChannel::SendMsg(const nsACString &aMsg, ErrorResult &aRv) {
3123   if (!EnsureValidStream(aRv)) {
3124     return;
3125   }
3126 
3127   SendErrnoToErrorResult(mConnection->SendMsg(mStream, aMsg), aRv);
3128 }
3129 
3130 void DataChannel::SendBinaryMsg(const nsACString &aMsg, ErrorResult &aRv) {
3131   if (!EnsureValidStream(aRv)) {
3132     return;
3133   }
3134 
3135   SendErrnoToErrorResult(mConnection->SendBinaryMsg(mStream, aMsg), aRv);
3136 }
3137 
3138 void DataChannel::SendBinaryStream(nsIInputStream *aBlob, ErrorResult &aRv) {
3139   if (!EnsureValidStream(aRv)) {
3140     return;
3141   }
3142 
3143   SendErrnoToErrorResult(mConnection->SendBlob(mStream, aBlob), aRv);
3144 }
3145 
3146 // May be called from another (i.e. Main) thread!
3147 void DataChannel::AppReady() {
3148   ENSURE_DATACONNECTION;
3149 
3150   MutexAutoLock lock(mConnection->mLock);
3151 
3152   mFlags |= DATA_CHANNEL_FLAGS_READY;
3153   if (mState == WAITING_TO_OPEN) {
3154     mState = OPEN;
3155     mMainThreadEventTarget->Dispatch(
3156         do_AddRef(new DataChannelOnMessageAvailable(
3157             DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
3158             this)));
3159     for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
3160       nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
3161       MOZ_ASSERT(runnable);
3162       mMainThreadEventTarget->Dispatch(runnable.forget());
3163     }
3164   } else {
3165     NS_ASSERTION(mQueuedMessages.IsEmpty(),
3166                  "Shouldn't have queued messages if not WAITING_TO_OPEN");
3167   }
3168   mQueuedMessages.Clear();
3169   mQueuedMessages.Compact();
3170   // We never use it again...  We could even allocate the array in the odd
3171   // cases we need it.
3172 }
3173 
3174 size_t DataChannel::GetBufferedAmountLocked() const {
3175   size_t buffered = 0;
3176 
3177   for (auto &msg : mBufferedData) {
3178     buffered += msg->GetLeft();
3179   }
3180   // XXX Note: per Michael Tuexen, there's no way to currently get the buffered
3181   // amount from the SCTP stack for a single stream.  It is on their to-do
3182   // list, and once we import a stack with support for that, we'll need to
3183   // add it to what we buffer.  Also we'll need to ask for notification of a
3184   // per- stream buffer-low event and merge that into the handling of buffer-low
3185   // (the equivalent to TCP_NOTSENT_LOWAT on TCP sockets)
3186 
3187   return buffered;
3188 }
3189 
// Returns the currently configured buffered-amount-low threshold.
uint32_t DataChannel::GetBufferedAmountLowThreshold() {
  return mBufferedThreshold;
}
3193 
// Stores the new buffered-amount-low threshold.
// Never fire immediately, as it's defined to fire on transitions, not state
void DataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold) {
  mBufferedThreshold = aThreshold;
}
3198 
3199 // Called with mLock locked!
3200 void DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage) {
3201   if (!(mFlags & DATA_CHANNEL_FLAGS_READY) &&
3202       (mState == CONNECTING || mState == WAITING_TO_OPEN)) {
3203     mQueuedMessages.AppendElement(aMessage);
3204   } else {
3205     nsCOMPtr<nsIRunnable> runnable = aMessage;
3206     mMainThreadEventTarget->Dispatch(runnable.forget());
3207   }
3208 }
3209 
3210 bool DataChannel::EnsureValidStream(ErrorResult &aRv) {
3211   MOZ_ASSERT(mConnection);
3212   if (mConnection && mStream != INVALID_STREAM) {
3213     return true;
3214   } else {
3215     aRv.Throw(NS_ERROR_DOM_INVALID_STATE_ERR);
3216     return false;
3217   }
3218 }
3219 
3220 }  // namespace mozilla
3221