1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
5 * You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7 #include <algorithm>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #if !defined(__Userspace_os_Windows)
11 #include <arpa/inet.h>
12 #endif
13 // usrsctp.h expects to have errno definitions prior to its inclusion.
14 #include <errno.h>
15
16 #define SCTP_DEBUG 1
17 #define SCTP_STDINT_INCLUDE <stdint.h>
18
19 #ifdef _MSC_VER
20 // Disable "warning C4200: nonstandard extension used : zero-sized array in
21 // struct/union"
22 // ...which the third-party file usrsctp.h runs afoul of.
23 #pragma warning(push)
24 #pragma warning(disable : 4200)
25 #endif
26
27 #include "usrsctp.h"
28
29 #ifdef _MSC_VER
30 #pragma warning(pop)
31 #endif
32
33 #include "DataChannelLog.h"
34
35 #include "nsServiceManagerUtils.h"
36 #include "nsIObserverService.h"
37 #include "nsIObserver.h"
38 #include "nsIPrefBranch.h"
39 #include "nsIPrefService.h"
40 #include "mozilla/Services.h"
41 #include "mozilla/Sprintf.h"
42 #include "nsProxyRelease.h"
43 #include "nsThread.h"
44 #include "nsThreadUtils.h"
45 #include "nsAutoPtr.h"
46 #include "nsNetUtil.h"
47 #include "nsNetCID.h"
48 #include "mozilla/StaticPtr.h"
49 #include "mozilla/StaticMutex.h"
50 #include "mozilla/Unused.h"
51 #ifdef MOZ_PEERCONNECTION
52 #include "mtransport/runnable_utils.h"
53 #endif
54
55 #define DATACHANNEL_LOG(args) LOG(args)
56 #include "DataChannel.h"
57 #include "DataChannelProtocol.h"
58
59 // Let us turn on and off important assertions in non-debug builds
60 #ifdef DEBUG
61 #define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
62 #elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
63 #define ASSERT_WEBRTC(x) \
64 do { \
65 if (!(x)) { \
66 MOZ_CRASH(); \
67 } \
68 } while (0)
69 #endif
70
71 static bool sctp_initialized;
72
73 namespace mozilla {
74
75 LazyLogModule gDataChannelLog("DataChannel");
76 static LazyLogModule gSCTPLog("SCTP");
77
78 #define SCTP_LOG(args) \
79 MOZ_LOG(mozilla::gSCTPLog, mozilla::LogLevel::Debug, args)
80
81 class DataChannelConnectionShutdown : public nsITimerCallback {
82 public:
DataChannelConnectionShutdown(DataChannelConnection * aConnection)83 explicit DataChannelConnectionShutdown(DataChannelConnection *aConnection)
84 : mConnection(aConnection) {
85 mTimer = NS_NewTimer(); // we'll crash if this fails
86 mTimer->InitWithCallback(this, 30 * 1000, nsITimer::TYPE_ONE_SHOT);
87 }
88
89 NS_IMETHODIMP Notify(nsITimer *aTimer) override;
90
91 NS_DECL_THREADSAFE_ISUPPORTS
92
93 private:
~DataChannelConnectionShutdown()94 virtual ~DataChannelConnectionShutdown() { mTimer->Cancel(); }
95
96 RefPtr<DataChannelConnection> mConnection;
97 nsCOMPtr<nsITimer> mTimer;
98 };
99
100 class DataChannelShutdown;
101
102 StaticRefPtr<DataChannelShutdown> sDataChannelShutdown;
103
104 class DataChannelShutdown : public nsIObserver {
105 public:
106 // This needs to be tied to some object that is guaranteed to be
107 // around (singleton likely) unless we want to shutdown sctp whenever
108 // we're not using it (and in which case we'd keep a refcnt'd object
109 // ref'd by each DataChannelConnection to release the SCTP usrlib via
110 // sctp_finish). Right now, the single instance of this class is
111 // owned by the observer service and a StaticRefPtr.
112
113 NS_DECL_ISUPPORTS
114
DataChannelShutdown()115 DataChannelShutdown() {}
116
Init()117 void Init() {
118 nsCOMPtr<nsIObserverService> observerService =
119 mozilla::services::GetObserverService();
120 if (!observerService) return;
121
122 nsresult rv =
123 observerService->AddObserver(this, "xpcom-will-shutdown", false);
124 MOZ_ASSERT(rv == NS_OK);
125 (void)rv;
126 }
127
Observe(nsISupports * aSubject,const char * aTopic,const char16_t * aData)128 NS_IMETHOD Observe(nsISupports *aSubject, const char *aTopic,
129 const char16_t *aData) override {
130 // Note: MainThread
131 if (strcmp(aTopic, "xpcom-will-shutdown") == 0) {
132 LOG(("Shutting down SCTP"));
133 if (sctp_initialized) {
134 usrsctp_finish();
135 sctp_initialized = false;
136 }
137 nsCOMPtr<nsIObserverService> observerService =
138 mozilla::services::GetObserverService();
139 if (!observerService) return NS_ERROR_FAILURE;
140
141 nsresult rv =
142 observerService->RemoveObserver(this, "xpcom-will-shutdown");
143 MOZ_ASSERT(rv == NS_OK);
144 (void)rv;
145
146 {
147 StaticMutexAutoLock lock(sLock);
148 sConnections = nullptr; // clears as well
149 }
150 sDataChannelShutdown = nullptr;
151 }
152 return NS_OK;
153 }
154
CreateConnectionShutdown(DataChannelConnection * aConnection)155 void CreateConnectionShutdown(DataChannelConnection *aConnection) {
156 StaticMutexAutoLock lock(sLock);
157 if (!sConnections) {
158 sConnections = new nsTArray<RefPtr<DataChannelConnectionShutdown>>();
159 }
160 sConnections->AppendElement(new DataChannelConnectionShutdown(aConnection));
161 }
162
RemoveConnectionShutdown(DataChannelConnectionShutdown * aConnectionShutdown)163 void RemoveConnectionShutdown(
164 DataChannelConnectionShutdown *aConnectionShutdown) {
165 StaticMutexAutoLock lock(sLock);
166 if (sConnections) {
167 sConnections->RemoveElement(aConnectionShutdown);
168 }
169 }
170
171 private:
172 // The only instance of DataChannelShutdown is owned by the observer
173 // service, so there is no need to call RemoveObserver here.
174 virtual ~DataChannelShutdown() = default;
175
176 // protects sConnections
177 static StaticMutex sLock;
178 static StaticAutoPtr<nsTArray<RefPtr<DataChannelConnectionShutdown>>>
179 sConnections;
180 };
181
182 StaticMutex DataChannelShutdown::sLock;
183 StaticAutoPtr<nsTArray<RefPtr<DataChannelConnectionShutdown>>>
184 DataChannelShutdown::sConnections;
185
186 NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
187
NS_IMPL_ISUPPORTS(DataChannelConnectionShutdown, nsITimerCallback)

// Timer callback: asks the DataChannelShutdown singleton to remove this
// holder from its list. Always returns NS_OK.
NS_IMETHODIMP
DataChannelConnectionShutdown::Notify(nsITimer *aTimer) {
  // Keep ourselves alive for the duration of the call; removing this object
  // from the list below releases the list's reference to it.
  RefPtr<DataChannelConnectionShutdown> grip(this);

  // The singleton might not be set (it is cleared on "xpcom-will-shutdown").
  if (!sDataChannelShutdown) {
    return NS_OK;
  }
  sDataChannelShutdown->RemoveConnectionShutdown(this);
  return NS_OK;
}
202
OutgoingMsg(struct sctp_sendv_spa & info,const uint8_t * data,size_t length)203 OutgoingMsg::OutgoingMsg(struct sctp_sendv_spa &info, const uint8_t *data,
204 size_t length)
205 : mLength(length), mData(data) {
206 mInfo = &info;
207 mPos = 0;
208 }
209
Advance(size_t offset)210 void OutgoingMsg::Advance(size_t offset) {
211 mPos += offset;
212 if (mPos > mLength) {
213 mPos = mLength;
214 }
215 }
216
BufferedOutgoingMsg(OutgoingMsg & msg)217 BufferedOutgoingMsg::BufferedOutgoingMsg(OutgoingMsg &msg) {
218 size_t length = msg.GetLeft();
219 auto *tmp = new uint8_t[length]; // infallible malloc!
220 memcpy(tmp, msg.GetData(), length);
221 mLength = length;
222 mData = tmp;
223 mInfo = new sctp_sendv_spa;
224 *mInfo = msg.GetInfo();
225 mPos = 0;
226 }
227
~BufferedOutgoingMsg()228 BufferedOutgoingMsg::~BufferedOutgoingMsg() {
229 delete mInfo;
230 delete mData;
231 }
232
receive_cb(struct socket * sock,union sctp_sockstore addr,void * data,size_t datalen,struct sctp_rcvinfo rcv,int flags,void * ulp_info)233 static int receive_cb(struct socket *sock, union sctp_sockstore addr,
234 void *data, size_t datalen, struct sctp_rcvinfo rcv,
235 int flags, void *ulp_info) {
236 DataChannelConnection *connection =
237 static_cast<DataChannelConnection *>(ulp_info);
238 return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
239 }
240
GetConnectionFromSocket(struct socket * sock)241 static DataChannelConnection *GetConnectionFromSocket(struct socket *sock) {
242 struct sockaddr *addrs = nullptr;
243 int naddrs = usrsctp_getladdrs(sock, 0, &addrs);
244 if (naddrs <= 0 || addrs[0].sa_family != AF_CONN) {
245 return nullptr;
246 }
247 // usrsctp_getladdrs() returns the addresses bound to this socket, which
248 // contains the SctpDataMediaChannel* as sconn_addr. Read the pointer,
249 // then free the list of addresses once we have the pointer. We only open
250 // AF_CONN sockets, and they should all have the sconn_addr set to the
251 // pointer that created them, so [0] is as good as any other.
252 struct sockaddr_conn *sconn =
253 reinterpret_cast<struct sockaddr_conn *>(&addrs[0]);
254 DataChannelConnection *connection =
255 reinterpret_cast<DataChannelConnection *>(sconn->sconn_addr);
256 usrsctp_freeladdrs(addrs);
257
258 return connection;
259 }
260
261 // called when the buffer empties to the threshold value
threshold_event(struct socket * sock,uint32_t sb_free)262 static int threshold_event(struct socket *sock, uint32_t sb_free) {
263 DataChannelConnection *connection = GetConnectionFromSocket(sock);
264 if (connection) {
265 connection->SendDeferredMessages();
266 } else {
267 LOG(("Can't find connection for socket %p", sock));
268 }
269 return 0;
270 }
271
debug_printf(const char * format,...)272 static void debug_printf(const char *format, ...) {
273 va_list ap;
274 char buffer[1024];
275
276 if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
277 va_start(ap, format);
278 #ifdef _WIN32
279 if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) {
280 #else
281 if (VsprintfLiteral(buffer, format, ap) > 0) {
282 #endif
283 SCTP_LOG(("%s", buffer));
284 }
285 va_end(ap);
286 }
287 }
288
289 DataChannelConnection::DataChannelConnection(DataConnectionListener *listener,
290 nsIEventTarget *aTarget)
291 : NeckoTargetHolder(aTarget),
292 mLock("netwerk::sctp::DataChannelConnection") {
293 mCurrentStream = 0;
294 mState = CLOSED;
295 mSocket = nullptr;
296 mMasterSocket = nullptr;
297 mListener = listener;
298 mLocalPort = 0;
299 mRemotePort = 0;
300 mPendingType = PENDING_NONE;
301 LOG(("Constructor DataChannelConnection=%p, listener=%p", this,
302 mListener.get()));
303 mInternalIOThread = nullptr;
304 #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
305 mShutdown = false;
306 #endif
307 }
308
309 DataChannelConnection::~DataChannelConnection() {
310 LOG(("Deleting DataChannelConnection %p", (void *)this));
311 // This may die on the MainThread, or on the STS thread
312 ASSERT_WEBRTC(mState == CLOSED);
313 MOZ_ASSERT(!mMasterSocket);
314 MOZ_ASSERT(mPending.GetSize() == 0);
315 MOZ_ASSERT(!mTransportFlow);
316
317 // Already disconnected from sigslot/mTransportFlow
318 // TransportFlows must be released from the STS thread
319 if (!IsSTSThread()) {
320 ASSERT_WEBRTC(NS_IsMainThread());
321
322 if (mInternalIOThread) {
323 // Avoid spinning the event thread from here (which if we're mainthread
324 // is in the event loop already)
325 nsCOMPtr<nsIRunnable> r = WrapRunnable(
326 nsCOMPtr<nsIThread>(mInternalIOThread), &nsIThread::Shutdown);
327 Dispatch(r.forget());
328 }
329 } else {
330 // on STS, safe to call shutdown
331 if (mInternalIOThread) {
332 mInternalIOThread->Shutdown();
333 }
334 }
335 }
336
337 void DataChannelConnection::Destroy() {
338 // Though it's probably ok to do this and close the sockets;
339 // if we really want it to do true clean shutdowns it can
340 // create a dependant Internal object that would remain around
341 // until the network shut down the association or timed out.
342 LOG(("Destroying DataChannelConnection %p", (void *)this));
343 ASSERT_WEBRTC(NS_IsMainThread());
344 CloseAll();
345
346 MutexAutoLock lock(mLock);
347 // If we had a pending reset, we aren't waiting for it - clear the list so
348 // we can deregister this DataChannelConnection without leaking.
349 ClearResets();
350
351 MOZ_ASSERT(mSTS);
352 ASSERT_WEBRTC(NS_IsMainThread());
353 mListener = nullptr;
354 // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
355 // the usrsctp_close() calls can move back here (and just proxy the
356 // disconnect_all())
357 RUN_ON_THREAD(mSTS,
358 WrapRunnable(RefPtr<DataChannelConnection>(this),
359 &DataChannelConnection::DestroyOnSTS, mSocket,
360 mMasterSocket),
361 NS_DISPATCH_NORMAL);
362
363 // These will be released on STS
364 mSocket = nullptr;
365 mMasterSocket = nullptr; // also a flag that we've Destroyed this connection
366
367 // We can't get any more new callbacks from the SCTP library
368 // All existing callbacks have refs to DataChannelConnection
369
370 // nsDOMDataChannel objects have refs to DataChannels that have refs to us
371 }
372
373 void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket,
374 struct socket *aSocket) {
375 if (aSocket && aSocket != aMasterSocket) usrsctp_close(aSocket);
376 if (aMasterSocket) usrsctp_close(aMasterSocket);
377
378 usrsctp_deregister_address(static_cast<void *>(this));
379 LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
380 #ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
381 mShutdown = true;
382 #endif
383
384 disconnect_all();
385
386 // we may have queued packet sends on STS after this; dispatch to ourselves
387 // before finishing here so we can be sure there aren't anymore runnables
388 // active that can try to touch the flow. DON'T use RUN_ON_THREAD, it
389 // queue-jumps!
390 mSTS->Dispatch(WrapRunnable(RefPtr<DataChannelConnection>(this),
391 &DataChannelConnection::DestroyOnSTSFinal),
392 NS_DISPATCH_NORMAL);
393 }
394
395 void DataChannelConnection::DestroyOnSTSFinal() {
396 mTransportFlow = nullptr;
397 sDataChannelShutdown->CreateConnectionShutdown(this);
398 }
399
400 bool DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams,
401 bool aMaxMessageSizeSet,
402 uint64_t aMaxMessageSize) {
403 struct sctp_initmsg initmsg;
404 struct sctp_assoc_value av;
405 struct sctp_event event;
406 socklen_t len;
407
408 uint16_t event_types[] = {
409 SCTP_ASSOC_CHANGE, SCTP_PEER_ADDR_CHANGE,
410 SCTP_REMOTE_ERROR, SCTP_SHUTDOWN_EVENT,
411 SCTP_ADAPTATION_INDICATION, SCTP_PARTIAL_DELIVERY_EVENT,
412 SCTP_SEND_FAILED_EVENT, SCTP_STREAM_RESET_EVENT,
413 SCTP_STREAM_CHANGE_EVENT};
414 {
415 ASSERT_WEBRTC(NS_IsMainThread());
416 // MutexAutoLock lock(mLock); Not needed since we're on mainthread always
417
418 mSendInterleaved = false;
419 mPpidFragmentation = false;
420 mMaxMessageSizeSet = false;
421 SetMaxMessageSize(aMaxMessageSizeSet, aMaxMessageSize);
422
423 if (!sctp_initialized) {
424 LOG(("sctp_init"));
425 #ifdef MOZ_PEERCONNECTION
426 usrsctp_init(0, DataChannelConnection::SctpDtlsOutput, debug_printf);
427 #else
428 MOZ_CRASH("Trying to use SCTP/DTLS without mtransport");
429 #endif
430
431 // Set logging to SCTP:LogLevel::Debug to get SCTP debugs
432 if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
433 usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
434 }
435
436 // Do not send ABORTs in response to INITs (1).
437 // Do not send ABORTs for received Out of the Blue packets (2).
438 usrsctp_sysctl_set_sctp_blackhole(2);
439
440 // Disable the Explicit Congestion Notification extension (currently not
441 // supported by the Firefox code)
442 usrsctp_sysctl_set_sctp_ecn_enable(0);
443
444 // Enable interleaving messages for different streams (incoming)
445 // See: https://tools.ietf.org/html/rfc6458#section-8.1.20
446 usrsctp_sysctl_set_sctp_default_frag_interleave(2);
447
448 sctp_initialized = true;
449
450 sDataChannelShutdown = new DataChannelShutdown();
451 sDataChannelShutdown->Init();
452 }
453 }
454
455 // XXX FIX! make this a global we get once
456 // Find the STS thread
457 nsresult rv;
458 mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
459 MOZ_ASSERT(NS_SUCCEEDED(rv));
460
461 // Open sctp with a callback
462 if ((mMasterSocket = usrsctp_socket(
463 AF_CONN, SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event,
464 usrsctp_sysctl_get_sctp_sendspace() / 2, this)) == nullptr) {
465 return false;
466 }
467
468 // Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking
469 // in associations for normal IO
470 if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
471 LOG(("Couldn't set non_blocking on SCTP socket"));
472 // We can't handle connect() safely if it will block, not that this will
473 // even happen.
474 goto error_cleanup;
475 }
476
477 // Make sure when we close the socket, make sure it doesn't call us back
478 // again! This would cause it try to use an invalid DataChannelConnection
479 // pointer
480 struct linger l;
481 l.l_onoff = 1;
482 l.l_linger = 0;
483 if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER, (const void *)&l,
484 (socklen_t)sizeof(struct linger)) < 0) {
485 LOG(("Couldn't set SO_LINGER on SCTP socket"));
486 // unsafe to allow it to continue if this fails
487 goto error_cleanup;
488 }
489
490 // XXX Consider disabling this when we add proper SDP negotiation.
491 // We may want to leave enabled for supporting 'cloning' of SDP offers, which
492 // implies re-use of the same pseudo-port number, or forcing a renegotiation.
493 {
494 const int option_value = 1;
495 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
496 (const void *)&option_value,
497 (socklen_t)sizeof(option_value)) < 0) {
498 LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
499 }
500 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
501 (const void *)&option_value,
502 (socklen_t)sizeof(option_value)) < 0) {
503 LOG(("Couldn't set SCTP_NODELAY on SCTP socket"));
504 }
505 }
506
507 // Set explicit EOR
508 {
509 const int option_value = 1;
510 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EXPLICIT_EOR,
511 (const void *)&option_value,
512 (socklen_t)sizeof(option_value)) < 0) {
513 LOG(("*** failed enable explicit EOR mode %d", errno));
514 goto error_cleanup;
515 }
516 }
517
518 // Enable ndata
519 // TODO: Bug 1381145, enable this once ndata has been deployed
520 #if 0
521 av.assoc_id = SCTP_FUTURE_ASSOC;
522 av.assoc_value = 1;
523 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INTERLEAVING_SUPPORTED, &av,
524 (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
525 LOG(("*** failed enable ndata errno %d", errno));
526 goto error_cleanup;
527 }
528 #endif
529
530 av.assoc_id = SCTP_ALL_ASSOC;
531 av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
532 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET,
533 &av, (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
534 LOG(("*** failed enable stream reset errno %d", errno));
535 goto error_cleanup;
536 }
537
538 /* Enable the events of interest. */
539 memset(&event, 0, sizeof(event));
540 event.se_assoc_id = SCTP_ALL_ASSOC;
541 event.se_on = 1;
542 for (unsigned short event_type : event_types) {
543 event.se_type = event_type;
544 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event,
545 sizeof(event)) < 0) {
546 LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno));
547 goto error_cleanup;
548 }
549 }
550
551 // Update number of streams
552 mStreams.AppendElements(aNumStreams);
553 for (uint32_t i = 0; i < aNumStreams; ++i) {
554 mStreams[i] = nullptr;
555 }
556 memset(&initmsg, 0, sizeof(initmsg));
557 len = sizeof(initmsg);
558 if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
559 &len) < 0) {
560 LOG(("*** failed getsockopt SCTP_INITMSG"));
561 goto error_cleanup;
562 }
563 LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
564 initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
565 initmsg.sinit_num_ostreams = aNumStreams;
566 initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
567 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
568 (socklen_t)sizeof(initmsg)) < 0) {
569 LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
570 goto error_cleanup;
571 }
572
573 mSocket = nullptr;
574 usrsctp_register_address(static_cast<void *>(this));
575 LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
576 return true;
577
578 error_cleanup:
579 usrsctp_close(mMasterSocket);
580 mMasterSocket = nullptr;
581 return false;
582 }
583
584 void DataChannelConnection::SetMaxMessageSize(bool aMaxMessageSizeSet,
585 uint64_t aMaxMessageSize) {
586 MutexAutoLock lock(mLock); // TODO: Needed?
587
588 if (mMaxMessageSizeSet && !aMaxMessageSizeSet) {
589 // Don't overwrite already set MMS with default values
590 return;
591 }
592
593 mMaxMessageSizeSet = aMaxMessageSizeSet;
594 mMaxMessageSize = aMaxMessageSize;
595
596 bool ppidFragmentationEnforced = false;
597 nsresult rv;
598 nsCOMPtr<nsIPrefService> prefs =
599 do_GetService("@mozilla.org/preferences-service;1", &rv);
600 if (!NS_WARN_IF(NS_FAILED(rv))) {
601 nsCOMPtr<nsIPrefBranch> branch = do_QueryInterface(prefs);
602
603 if (branch) {
604 if (!NS_FAILED(branch->GetBoolPref(
605 "media.peerconnection.sctp.force_ppid_fragmentation",
606 &mPpidFragmentation))) {
607 // Ensure that forced on/off PPID fragmentation does not get overridden
608 // when Firefox has been detected.
609 mMaxMessageSizeSet = true;
610 ppidFragmentationEnforced = true;
611 }
612
613 int32_t temp;
614 if (!NS_FAILED(branch->GetIntPref(
615 "media.peerconnection.sctp.force_maximum_message_size", &temp))) {
616 if (temp >= 0) {
617 mMaxMessageSize = (uint64_t)temp;
618 }
619 }
620 }
621 }
622
623 // Fix remote MMS. This code exists, so future implementations of
624 // RTCSctpTransport.maxMessageSize can simply provide that value from
625 // GetMaxMessageSize.
626
627 // TODO: Bug 1382779, once resolved, can be increased to
628 // min(Uint8ArrayMaxSize, UINT32_MAX)
629 // TODO: Bug 1381146, once resolved, can be increased to whatever we support
630 // then (hopefully
631 // SIZE_MAX)
632 if (mMaxMessageSize == 0 ||
633 mMaxMessageSize > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE) {
634 mMaxMessageSize = WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_REMOTE;
635 }
636
637 LOG(("Use PPID-based fragmentation/reassembly: %s (enforced=%s)",
638 mPpidFragmentation ? "yes" : "no",
639 ppidFragmentationEnforced ? "yes" : "no"));
640 LOG(("Maximum message size (outgoing data): %" PRIu64
641 " (set=%s, enforced=%s)",
642 mMaxMessageSize, mMaxMessageSizeSet ? "yes" : "no",
643 aMaxMessageSize != mMaxMessageSize ? "yes" : "no"));
644 }
645
646 uint64_t DataChannelConnection::GetMaxMessageSize() { return mMaxMessageSize; }
647
648 #ifdef MOZ_PEERCONNECTION
649 void DataChannelConnection::SetEvenOdd() {
650 ASSERT_WEBRTC(IsSTSThread());
651
652 TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
653 mTransportFlow->GetLayer(TransportLayerDtls::ID()));
654 MOZ_ASSERT(dtls); // DTLS is mandatory
655 mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT);
656 }
657
658 bool DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow,
659 uint16_t localport,
660 uint16_t remoteport) {
661 LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
662
663 NS_PRECONDITION(mMasterSocket,
664 "SCTP wasn't initialized before ConnectViaTransportFlow!");
665 if (NS_WARN_IF(!aFlow)) {
666 return false;
667 }
668
669 mTransportFlow = aFlow;
670 mLocalPort = localport;
671 mRemotePort = remoteport;
672 mState = CONNECTING;
673
674 RUN_ON_THREAD(mSTS,
675 WrapRunnable(RefPtr<DataChannelConnection>(this),
676 &DataChannelConnection::SetSignals),
677 NS_DISPATCH_NORMAL);
678 return true;
679 }
680
681 void DataChannelConnection::SetSignals() {
682 ASSERT_WEBRTC(IsSTSThread());
683 ASSERT_WEBRTC(mTransportFlow);
684 LOG(("Setting transport signals, state: %d", mTransportFlow->state()));
685 mTransportFlow->SignalPacketReceived.connect(
686 this, &DataChannelConnection::SctpDtlsInput);
687 // SignalStateChange() doesn't call you with the initial state
688 mTransportFlow->SignalStateChange.connect(
689 this, &DataChannelConnection::CompleteConnect);
690 CompleteConnect(mTransportFlow, mTransportFlow->state());
691 }
692
693 void DataChannelConnection::CompleteConnect(TransportFlow *flow,
694 TransportLayer::State state) {
695 LOG(("Data transport state: %d", state));
696 MutexAutoLock lock(mLock);
697 ASSERT_WEBRTC(IsSTSThread());
698 // We should abort connection on TS_ERROR.
699 // Note however that the association will also fail (perhaps with a delay) and
700 // notify us in that way
701 if (state != TransportLayer::TS_OPEN || !mMasterSocket) return;
702
703 struct sockaddr_conn addr;
704 memset(&addr, 0, sizeof(addr));
705 addr.sconn_family = AF_CONN;
706 #if defined(__Userspace_os_Darwin)
707 addr.sconn_len = sizeof(addr);
708 #endif
709 addr.sconn_port = htons(mLocalPort);
710 addr.sconn_addr = static_cast<void *>(this);
711
712 LOG(("Calling usrsctp_bind"));
713 int r = usrsctp_bind(
714 mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr));
715 if (r < 0) {
716 LOG(("usrsctp_bind failed: %d", r));
717 } else {
718 // This is the remote addr
719 addr.sconn_port = htons(mRemotePort);
720 LOG(("Calling usrsctp_connect"));
721 r = usrsctp_connect(mMasterSocket,
722 reinterpret_cast<struct sockaddr *>(&addr),
723 sizeof(addr));
724 if (r >= 0 || errno == EINPROGRESS) {
725 struct sctp_paddrparams paddrparams;
726 socklen_t opt_len;
727
728 memset(&paddrparams, 0, sizeof(struct sctp_paddrparams));
729 memcpy(&paddrparams.spp_address, &addr, sizeof(struct sockaddr_conn));
730 opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
731 r = usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
732 &paddrparams, &opt_len);
733 if (r < 0) {
734 LOG(("usrsctp_getsockopt failed: %d", r));
735 } else {
736 // draft-ietf-rtcweb-data-channel-13 section 5: max initial MTU IPV4
737 // 1200, IPV6 1280
738 paddrparams.spp_pathmtu = 1200; // safe for either
739 paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE;
740 paddrparams.spp_flags |= SPP_PMTUD_DISABLE;
741 opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
742 r = usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP,
743 SCTP_PEER_ADDR_PARAMS, &paddrparams, opt_len);
744 if (r < 0) {
745 LOG(("usrsctp_getsockopt failed: %d", r));
746 } else {
747 LOG(("usrsctp: PMTUD disabled, MTU set to %u",
748 paddrparams.spp_pathmtu));
749 }
750 }
751 }
752 if (r < 0) {
753 if (errno == EINPROGRESS) {
754 // non-blocking
755 return;
756 }
757 LOG(("usrsctp_connect failed: %d", errno));
758 mState = CLOSED;
759 } else {
760 // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get
761 // that This also avoids issues with calling TransportFlow stuff on
762 // Mainthread
763 return;
764 }
765 }
766 // Note: currently this doesn't actually notify the application
767 Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
768 DataChannelOnMessageAvailable::ON_CONNECTION, this)));
769 }
770
771 // Process any pending Opens
772 void DataChannelConnection::ProcessQueuedOpens() {
773 // The nsDeque holds channels with an AddRef applied. Another reference
774 // (may) be held by the DOMDataChannel, unless it's been GC'd. No other
775 // references should exist.
776
777 // Can't copy nsDeque's. Move into temp array since any that fail will
778 // go back to mPending
779 nsDeque temp;
780 DataChannel *temp_channel; // really already_AddRefed<>
781 while (nullptr !=
782 (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) {
783 temp.Push(static_cast<void *>(temp_channel));
784 }
785
786 RefPtr<DataChannel> channel;
787 // All these entries have an AddRef(); make that explicit now via the
788 // dont_AddRef()
789 while (nullptr !=
790 (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
791 if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
792 LOG(("Processing queued open for %p (%u)", channel.get(),
793 channel->mStream));
794 channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
795 // OpenFinish returns a reference itself, so we need to take it can
796 // Release it
797 channel = OpenFinish(channel.forget()); // may reset the flag and re-push
798 } else {
799 NS_ASSERTION(
800 false,
801 "How did a DataChannel get queued without the FINISH_OPEN flag?");
802 }
803 }
804 }
805 void DataChannelConnection::SctpDtlsInput(TransportFlow *flow,
806 const unsigned char *data,
807 size_t len) {
808 if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
809 char *buf;
810
811 if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) !=
812 nullptr) {
813 SCTP_LOG(("%s", buf));
814 usrsctp_freedumpbuffer(buf);
815 }
816 }
817 // Pass the data to SCTP
818 MutexAutoLock lock(mLock);
819 usrsctp_conninput(static_cast<void *>(this), data, len, 0);
820 }
821
822 int DataChannelConnection::SendPacket(unsigned char data[], size_t len,
823 bool release) {
824 // LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
825 int res = 0;
826 if (mTransportFlow) {
827 res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0;
828 }
829 if (release) delete[] data;
830 return res;
831 }
832
833 /* static */
834 int DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer,
835 size_t length, uint8_t tos,
836 uint8_t set_df) {
837 DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr);
838 int res;
839 MOZ_DIAGNOSTIC_ASSERT(!peer->mShutdown);
840
841 if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
842 char *buf;
843
844 if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) !=
845 nullptr) {
846 SCTP_LOG(("%s", buf));
847 usrsctp_freedumpbuffer(buf);
848 }
849 }
850 // We're async proxying even if on the STSThread because this is called
851 // with internal SCTP locks held in some cases (such as in usrsctp_connect()).
852 // SCTP has an option for Apple, on IP connections only, to release at least
853 // one of the locks before calling a packet output routine; with changes to
854 // the underlying SCTP stack this might remove the need to use an async proxy.
855 if ((false /*peer->IsSTSThread()*/)) {
856 res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false);
857 } else {
858 auto *data = new unsigned char[length];
859 memcpy(data, buffer, length);
860 // Commented out since we have to Dispatch SendPacket to avoid deadlock"
861 // res = -1;
862
863 // XXX It might be worthwhile to add an assertion against the thread
864 // somehow getting into the DataChannel/SCTP code again, as
865 // DISPATCH_SYNC is not fully blocking. This may be tricky, as it
866 // needs to be a per-thread check, not a global.
867 peer->mSTS->Dispatch(
868 WrapRunnable(RefPtr<DataChannelConnection>(peer),
869 &DataChannelConnection::SendPacket, data, length, true),
870 NS_DISPATCH_NORMAL);
871 res = 0; // cheat! Packets can always be dropped later anyways
872 }
873 return res;
874 }
875 #endif
876
877 #ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
878 // listen for incoming associations
879 // Blocks! - Don't call this from main thread!
880
881 #error This code will not work as-is since SetEvenOdd() runs on Mainthread
882
883 bool DataChannelConnection::Listen(unsigned short port) {
884 struct sockaddr_in addr;
885 socklen_t addr_len;
886
887 NS_WARNING_ASSERTION(!NS_IsMainThread(),
888 "Blocks, do not call from main thread!!!");
889
890 /* Acting as the 'server' */
891 memset((void *)&addr, 0, sizeof(addr));
892 #ifdef HAVE_SIN_LEN
893 addr.sin_len = sizeof(struct sockaddr_in);
894 #endif
895 addr.sin_family = AF_INET;
896 addr.sin_port = htons(port);
897 addr.sin_addr.s_addr = htonl(INADDR_ANY);
898 LOG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
899 mState = CONNECTING;
900 if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
901 sizeof(struct sockaddr_in)) < 0) {
902 LOG(("***Failed userspace_bind"));
903 return false;
904 }
905 if (usrsctp_listen(mMasterSocket, 1) < 0) {
906 LOG(("***Failed userspace_listen"));
907 return false;
908 }
909
910 LOG(("Accepting connection"));
911 addr_len = 0;
912 if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) ==
913 nullptr) {
914 LOG(("***Failed accept"));
915 return false;
916 }
917 mState = OPEN;
918
919 struct linger l;
920 l.l_onoff = 1;
921 l.l_linger = 0;
922 if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER, (const void *)&l,
923 (socklen_t)sizeof(struct linger)) < 0) {
924 LOG(("Couldn't set SO_LINGER on SCTP socket"));
925 }
926
927 SetEvenOdd();
928
929 // Notify Connection open
930 // XXX We need to make sure connection sticks around until the message is
931 // delivered
932 LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
933 Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
934 DataChannelOnMessageAvailable::ON_CONNECTION, this,
935 (DataChannel *)nullptr)));
936 return true;
937 }
938
939 // Blocks! - Don't call this from main thread!
940 bool DataChannelConnection::Connect(const char *addr, unsigned short port) {
941 struct sockaddr_in addr4;
942 struct sockaddr_in6 addr6;
943
944 NS_WARNING_ASSERTION(!NS_IsMainThread(),
945 "Blocks, do not call from main thread!!!");
946
947 /* Acting as the connector */
948 LOG(("Connecting to %s, port %u", addr, port));
949 memset((void *)&addr4, 0, sizeof(struct sockaddr_in));
950 memset((void *)&addr6, 0, sizeof(struct sockaddr_in6));
951 #ifdef HAVE_SIN_LEN
952 addr4.sin_len = sizeof(struct sockaddr_in);
953 #endif
954 #ifdef HAVE_SIN6_LEN
955 addr6.sin6_len = sizeof(struct sockaddr_in6);
956 #endif
957 addr4.sin_family = AF_INET;
958 addr6.sin6_family = AF_INET6;
959 addr4.sin_port = htons(port);
960 addr6.sin6_port = htons(port);
961 mState = CONNECTING;
962
963 #if !defined(__Userspace_os_Windows)
964 if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
965 if (usrsctp_connect(mMasterSocket,
966 reinterpret_cast<struct sockaddr *>(&addr6),
967 sizeof(struct sockaddr_in6)) < 0) {
968 LOG(("*** Failed userspace_connect"));
969 return false;
970 }
971 } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
972 if (usrsctp_connect(mMasterSocket,
973 reinterpret_cast<struct sockaddr *>(&addr4),
974 sizeof(struct sockaddr_in)) < 0) {
975 LOG(("*** Failed userspace_connect"));
976 return false;
977 }
978 } else {
979 LOG(("*** Illegal destination address."));
980 }
981 #else
982 {
983 struct sockaddr_storage ss;
984 int sslen = sizeof(ss);
985
986 if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr,
987 (struct sockaddr *)&ss, &sslen)) {
988 addr6.sin6_addr =
989 (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr;
990 if (usrsctp_connect(mMasterSocket,
991 reinterpret_cast<struct sockaddr *>(&addr6),
992 sizeof(struct sockaddr_in6)) < 0) {
993 LOG(("*** Failed userspace_connect"));
994 return false;
995 }
996 } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr,
997 (struct sockaddr *)&ss, &sslen)) {
998 addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr;
999 if (usrsctp_connect(mMasterSocket,
1000 reinterpret_cast<struct sockaddr *>(&addr4),
1001 sizeof(struct sockaddr_in)) < 0) {
1002 LOG(("*** Failed userspace_connect"));
1003 return false;
1004 }
1005 } else {
1006 LOG(("*** Illegal destination address."));
1007 }
1008 }
1009 #endif
1010
1011 mSocket = mMasterSocket;
1012
1013 LOG(("connect() succeeded! Entering connected mode"));
1014 mState = OPEN;
1015
1016 SetEvenOdd();
1017
1018 // Notify Connection open
1019 // XXX We need to make sure connection sticks around until the message is
1020 // delivered
1021 LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
1022 Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1023 DataChannelOnMessageAvailable::ON_CONNECTION, this,
1024 (DataChannel *)nullptr)));
1025 return true;
1026 }
1027 #endif
1028
1029 DataChannel *DataChannelConnection::FindChannelByStream(uint16_t stream) {
1030 return mStreams.SafeElementAt(stream);
1031 }
1032
1033 uint16_t DataChannelConnection::FindFreeStream() {
1034 uint32_t i, j, limit;
1035
1036 limit = mStreams.Length();
1037 if (limit > MAX_NUM_STREAMS) limit = MAX_NUM_STREAMS;
1038
1039 for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
1040 if (!mStreams[i]) {
1041 // Verify it's not still in the process of closing
1042 for (j = 0; j < mStreamsResetting.Length(); ++j) {
1043 if (mStreamsResetting[j] == i) {
1044 break;
1045 }
1046 }
1047 if (j == mStreamsResetting.Length()) break;
1048 }
1049 }
1050 if (i >= limit) {
1051 return INVALID_STREAM;
1052 }
1053 return i;
1054 }
1055
1056 uint32_t DataChannelConnection::UpdateCurrentStreamIndex() {
1057 if (mCurrentStream == mStreams.Length() - 1) {
1058 mCurrentStream = 0;
1059 } else {
1060 ++mCurrentStream;
1061 }
1062
1063 return mCurrentStream;
1064 }
1065
1066 uint32_t DataChannelConnection::GetCurrentStreamIndex() {
1067 // Fix current stream index (in case #streams decreased)
1068 if (mCurrentStream >= mStreams.Length()) {
1069 mCurrentStream = 0;
1070 }
1071
1072 return mCurrentStream;
1073 }
1074
1075 bool DataChannelConnection::RequestMoreStreams(int32_t aNeeded) {
1076 struct sctp_status status;
1077 struct sctp_add_streams sas;
1078 uint32_t outStreamsNeeded;
1079 socklen_t len;
1080
1081 if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
1082 aNeeded = MAX_NUM_STREAMS - mStreams.Length();
1083 }
1084 if (aNeeded <= 0) {
1085 return false;
1086 }
1087
1088 len = (socklen_t)sizeof(struct sctp_status);
1089 if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status,
1090 &len) < 0) {
1091 LOG(("***failed: getsockopt SCTP_STATUS"));
1092 return false;
1093 }
1094 outStreamsNeeded = aNeeded; // number to add
1095
1096 // Note: if multiple channel opens happen when we don't have enough space,
1097 // we'll call RequestMoreStreams() multiple times
1098 memset(&sas, 0, sizeof(sas));
1099 sas.sas_instrms = 0;
1100 sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
1101 // Doesn't block, we get an event when it succeeds or fails
1102 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
1103 (socklen_t)sizeof(struct sctp_add_streams)) < 0) {
1104 if (errno == EALREADY) {
1105 LOG(("Already have %u output streams", outStreamsNeeded));
1106 return true;
1107 }
1108
1109 LOG(("***failed: setsockopt ADD errno=%d", errno));
1110 return false;
1111 }
1112 LOG(("Requested %u more streams", outStreamsNeeded));
1113 // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
1114 // values are larger than mStreams.Length()
1115 return true;
1116 }
1117
1118 // Returns a POSIX error code.
1119 int DataChannelConnection::SendControlMessage(const uint8_t *data, uint32_t len,
1120 uint16_t stream) {
1121 struct sctp_sendv_spa info = {0};
1122
1123 // General flags
1124 info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
1125
1126 // Set stream identifier, protocol identifier and flags
1127 info.sendv_sndinfo.snd_sid = stream;
1128 info.sendv_sndinfo.snd_flags = SCTP_EOR;
1129 info.sendv_sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
1130
1131 // Create message instance and send
1132 // Note: Main-thread IO, but doesn't block
1133 #if (UINT32_MAX > SIZE_MAX)
1134 if (len > SIZE_MAX) {
1135 return EMSGSIZE;
1136 }
1137 #endif
1138 OutgoingMsg msg(info, data, (size_t)len);
1139 bool buffered;
1140 int error = SendMsgInternalOrBuffer(mBufferedControl, msg, buffered);
1141
1142 // Set pending type (if buffered)
1143 if (!error && buffered && !mPendingType) {
1144 mPendingType = PENDING_DCEP;
1145 }
1146 return error;
1147 }
1148
1149 // Returns a POSIX error code.
1150 int DataChannelConnection::SendOpenAckMessage(uint16_t stream) {
1151 struct rtcweb_datachannel_ack ack;
1152
1153 memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
1154 ack.msg_type = DATA_CHANNEL_ACK;
1155
1156 return SendControlMessage((const uint8_t *)&ack, sizeof(ack), stream);
1157 }
1158
1159 // Returns a POSIX error code.
1160 int DataChannelConnection::SendOpenRequestMessage(
1161 const nsACString &label, const nsACString &protocol, uint16_t stream,
1162 bool unordered, uint16_t prPolicy, uint32_t prValue) {
1163 const int label_len = label.Length(); // not including nul
1164 const int proto_len = protocol.Length(); // not including nul
1165 // careful - request struct include one char for the label
1166 const int req_size = sizeof(struct rtcweb_datachannel_open_request) - 1 +
1167 label_len + proto_len;
1168 struct rtcweb_datachannel_open_request *req =
1169 (struct rtcweb_datachannel_open_request *)moz_xmalloc(req_size);
1170
1171 memset(req, 0, req_size);
1172 req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
1173 switch (prPolicy) {
1174 case SCTP_PR_SCTP_NONE:
1175 req->channel_type = DATA_CHANNEL_RELIABLE;
1176 break;
1177 case SCTP_PR_SCTP_TTL:
1178 req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
1179 break;
1180 case SCTP_PR_SCTP_RTX:
1181 req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
1182 break;
1183 default:
1184 free(req);
1185 return EINVAL;
1186 }
1187 if (unordered) {
1188 // Per the current types, all differ by 0x80 between ordered and unordered
1189 req->channel_type |=
1190 0x80; // NOTE: be careful if new types are added in the future
1191 }
1192
1193 req->reliability_param = htonl(prValue);
1194 req->priority = htons(0); /* XXX: add support */
1195 req->label_length = htons(label_len);
1196 req->protocol_length = htons(proto_len);
1197 memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
1198 memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
1199
1200 // TODO: req_size is an int... that looks hairy
1201 int error = SendControlMessage((const uint8_t *)req, req_size, stream);
1202
1203 free(req);
1204 return error;
1205 }
1206
1207 // XXX This should use a separate thread (outbound queue) which should
1208 // select() to know when to *try* to send data to the socket again.
1209 // Alternatively, it can use a timeout, but that's guaranteed to be wrong
1210 // (just not sure in what direction). We could re-implement NSPR's
1211 // PR_POLL_WRITE/etc handling... with a lot of work.
1212
1213 // Better yet, use the SCTP stack's notifications on buffer state to avoid
1214 // filling the SCTP's buffers.
1215
1216 // returns if we're still blocked (true)
1217 bool DataChannelConnection::SendDeferredMessages() {
1218 RefPtr<DataChannel> channel; // we may null out the refs to this
1219
1220 // This may block while something is modifying channels, but should not block
1221 // for IO
1222 mLock.AssertCurrentThreadOwns();
1223
1224 LOG(("SendDeferredMessages called, pending type: %d", mPendingType));
1225 if (!mPendingType) {
1226 return false;
1227 }
1228
1229 // Send pending control messages
1230 // Note: If ndata is not active, check if DCEP messages are currently
1231 // outstanding. These need to
1232 // be sent first before other streams can be used for sending.
1233 if (!mBufferedControl.IsEmpty() &&
1234 (mSendInterleaved || mPendingType == PENDING_DCEP)) {
1235 if (SendBufferedMessages(mBufferedControl)) {
1236 return true;
1237 }
1238
1239 // Note: There may or may not be pending data messages
1240 mPendingType = PENDING_DATA;
1241 }
1242
1243 bool blocked = false;
1244 uint32_t i = GetCurrentStreamIndex();
1245 uint32_t end = i;
1246 do {
1247 channel = mStreams[i];
1248 if (!channel || channel->mBufferedData.IsEmpty()) {
1249 i = UpdateCurrentStreamIndex();
1250 continue;
1251 }
1252
1253 // Clear if closing/closed
1254 if (channel->mState == CLOSED || channel->mState == CLOSING) {
1255 channel->mBufferedData.Clear();
1256 i = UpdateCurrentStreamIndex();
1257 continue;
1258 }
1259
1260 size_t bufferedAmount = channel->GetBufferedAmountLocked();
1261 size_t threshold = channel->mBufferedThreshold;
1262 bool wasOverThreshold = bufferedAmount >= threshold;
1263
1264 // Send buffered data messages
1265 // Warning: This will fail in case ndata is inactive and a previously
1266 // deallocated data channel has not been closed properly. If you
1267 // ever see that no messages can be sent on any channel, this is
1268 // likely the cause (an explicit EOR message partially sent whose
1269 // remaining chunks are still being waited for).
1270 blocked = SendBufferedMessages(channel->mBufferedData);
1271 bufferedAmount = channel->GetBufferedAmountLocked();
1272
1273 // can never fire with default threshold of 0
1274 if (wasOverThreshold && bufferedAmount < threshold) {
1275 LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
1276 channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
1277 Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1278 DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD, this, channel)));
1279 }
1280
1281 if (bufferedAmount == 0) {
1282 // buffered-to-not-buffered transition; tell the DOM code in case this
1283 // makes it available for GC
1284 LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
1285 channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
1286 Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1287 DataChannelOnMessageAvailable::NO_LONGER_BUFFERED, this, channel)));
1288 }
1289
1290 // Update current stream index
1291 // Note: If ndata is not active, the outstanding data messages on this
1292 // stream need to be sent first before other streams can be used for
1293 // sending.
1294 if (mSendInterleaved || !blocked) {
1295 i = UpdateCurrentStreamIndex();
1296 }
1297 } while (!blocked && i != end);
1298
1299 if (!blocked) {
1300 mPendingType = mBufferedControl.IsEmpty() ? PENDING_NONE : PENDING_DCEP;
1301 }
1302 return blocked;
1303 }
1304
1305 // Called with mLock locked!
1306 // buffer MUST have at least one item!
1307 // returns if we're still blocked (true)
1308 bool DataChannelConnection::SendBufferedMessages(
1309 nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer) {
1310 do {
1311 // Re-send message
1312 int error = SendMsgInternal(*buffer[0]);
1313 switch (error) {
1314 case 0:
1315 buffer.RemoveElementAt(0);
1316 break;
1317 case EAGAIN:
1318 #if (EAGAIN != EWOULDBLOCK)
1319 case EWOULDBLOCK:
1320 #endif
1321 return true;
1322 default:
1323 buffer.RemoveElementAt(0);
1324 LOG(("error on sending: %d", error));
1325 break;
1326 }
1327 } while (!buffer.IsEmpty());
1328
1329 return false;
1330 }
1331
1332 // Caller must ensure that length <= SIZE_MAX
1333 void DataChannelConnection::HandleOpenRequestMessage(
1334 const struct rtcweb_datachannel_open_request *req, uint32_t length,
1335 uint16_t stream) {
1336 RefPtr<DataChannel> channel;
1337 uint32_t prValue;
1338 uint16_t prPolicy;
1339 uint32_t flags;
1340
1341 mLock.AssertCurrentThreadOwns();
1342
1343 const size_t requiredLength = (sizeof(*req) - 1) + ntohs(req->label_length) +
1344 ntohs(req->protocol_length);
1345 if (((size_t)length) != requiredLength) {
1346 LOG(("%s: Inconsistent length: %u, should be %zu", __FUNCTION__, length,
1347 requiredLength));
1348 if (((size_t)length) < requiredLength) return;
1349 }
1350
1351 LOG(("%s: length %u, sizeof(*req) = %zu", __FUNCTION__, length,
1352 sizeof(*req)));
1353
1354 switch (req->channel_type) {
1355 case DATA_CHANNEL_RELIABLE:
1356 case DATA_CHANNEL_RELIABLE_UNORDERED:
1357 prPolicy = SCTP_PR_SCTP_NONE;
1358 break;
1359 case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
1360 case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
1361 prPolicy = SCTP_PR_SCTP_RTX;
1362 break;
1363 case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
1364 case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
1365 prPolicy = SCTP_PR_SCTP_TTL;
1366 break;
1367 default:
1368 LOG(("Unknown channel type %d", req->channel_type));
1369 /* XXX error handling */
1370 return;
1371 }
1372 prValue = ntohl(req->reliability_param);
1373 flags =
1374 (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
1375
1376 if ((channel = FindChannelByStream(stream))) {
1377 if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
1378 LOG(
1379 ("ERROR: HandleOpenRequestMessage: channel for stream %u is in state "
1380 "%d instead of CLOSED.",
1381 stream, channel->mState));
1382 /* XXX: some error handling */
1383 } else {
1384 LOG(("Open for externally negotiated channel %u", stream));
1385 // XXX should also check protocol, maybe label
1386 if (prPolicy != channel->mPrPolicy || prValue != channel->mPrValue ||
1387 flags !=
1388 (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
1389 LOG(
1390 ("WARNING: external negotiation mismatch with OpenRequest:"
1391 "channel %u, policy %u/%u, value %u/%u, flags %x/%x",
1392 stream, prPolicy, channel->mPrPolicy, prValue, channel->mPrValue,
1393 flags, channel->mFlags));
1394 }
1395 }
1396 return;
1397 }
1398 if (stream >= mStreams.Length()) {
1399 LOG(("%s: stream %u out of bounds (%zu)", __FUNCTION__, stream,
1400 mStreams.Length()));
1401 return;
1402 }
1403
1404 nsCString label(
1405 nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
1406 nsCString protocol(nsDependentCSubstring(
1407 &req->label[ntohs(req->label_length)], ntohs(req->protocol_length)));
1408
1409 channel =
1410 new DataChannel(this, stream, DataChannel::CONNECTING, label, protocol,
1411 prPolicy, prValue, flags, nullptr, nullptr);
1412 mStreams[stream] = channel;
1413
1414 channel->mState = DataChannel::WAITING_TO_OPEN;
1415
1416 LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
1417 channel->mLabel.get(), channel->mProtocol.get(), stream,
1418 channel->mState));
1419 Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1420 DataChannelOnMessageAvailable::ON_CHANNEL_CREATED, this, channel)));
1421
1422 LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__,
1423 channel.get()));
1424
1425 int error = SendOpenAckMessage(stream);
1426 if (error) {
1427 LOG(("SendOpenRequest failed, error = %d", error));
1428 // Close the channel, inform the user
1429 CloseInt(channel);
1430 // XXX send error via DataChannelOnMessageAvailable (bug 843625)
1431 return;
1432 }
1433
1434 // Now process any queued data messages for the channel (which will
1435 // themselves likely get queued until we leave WAITING_TO_OPEN, plus any
1436 // more that come in before that happens)
1437 DeliverQueuedData(stream);
1438 }
1439
1440 // NOTE: the updated spec from the IETF says we should set in-order until we
1441 // receive an ACK. That would make this code moot. Keep it for now for
1442 // backwards compatibility.
1443 void DataChannelConnection::DeliverQueuedData(uint16_t stream) {
1444 mLock.AssertCurrentThreadOwns();
1445
1446 uint32_t i = 0;
1447 while (i < mQueuedData.Length()) {
1448 // Careful! we may modify the array length from within the loop!
1449 if (mQueuedData[i]->mStream == stream) {
1450 LOG(("Delivering queued data for stream %u, length %u", stream,
1451 mQueuedData[i]->mLength));
1452 // Deliver the queued data
1453 HandleDataMessage(mQueuedData[i]->mData, mQueuedData[i]->mLength,
1454 mQueuedData[i]->mPpid, mQueuedData[i]->mStream,
1455 mQueuedData[i]->mFlags);
1456 mQueuedData.RemoveElementAt(i);
1457 continue; // don't bump index since we removed the element
1458 }
1459 i++;
1460 }
1461 }
1462
1463 // Caller must ensure that length <= SIZE_MAX
1464 void DataChannelConnection::HandleOpenAckMessage(
1465 const struct rtcweb_datachannel_ack *ack, uint32_t length,
1466 uint16_t stream) {
1467 DataChannel *channel;
1468
1469 mLock.AssertCurrentThreadOwns();
1470
1471 channel = FindChannelByStream(stream);
1472 if (NS_WARN_IF(!channel)) {
1473 return;
1474 }
1475
1476 LOG(("OpenAck received for stream %u, waiting=%d", stream,
1477 (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
1478
1479 channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
1480 }
1481
1482 // Caller must ensure that length <= SIZE_MAX
1483 void DataChannelConnection::HandleUnknownMessage(uint32_t ppid, uint32_t length,
1484 uint16_t stream) {
1485 /* XXX: Send an error message? */
1486 LOG(("unknown DataChannel message received: %u, len %u on stream %d", ppid,
1487 length, stream));
1488 // XXX Log to JS error console if possible
1489 }
1490
1491 uint8_t DataChannelConnection::BufferMessage(nsACString &recvBuffer,
1492 const void *data, uint32_t length,
1493 uint32_t ppid, int flags) {
1494 const char *buffer = (const char *)data;
1495 uint8_t bufferFlags = 0;
1496
1497 if ((flags & MSG_EOR) && ppid != DATA_CHANNEL_PPID_BINARY_PARTIAL &&
1498 ppid != DATA_CHANNEL_PPID_DOMSTRING_PARTIAL) {
1499 bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE;
1500
1501 // Return directly if nothing has been buffered
1502 if (recvBuffer.IsEmpty()) {
1503 return bufferFlags;
1504 }
1505 }
1506
1507 // Ensure it doesn't blow up our buffer
1508 // TODO: Change 'WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL' to whatever the
1509 // new buffer is capable of holding.
1510 if (((uint64_t)recvBuffer.Length()) + ((uint64_t)length) >
1511 WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
1512 bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE;
1513 return bufferFlags;
1514 }
1515
1516 // Copy & add to receive buffer
1517 recvBuffer.Append(buffer, length);
1518 bufferFlags |= DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED;
1519 return bufferFlags;
1520 }
1521
1522 void DataChannelConnection::HandleDataMessage(const void *data, size_t length,
1523 uint32_t ppid, uint16_t stream,
1524 int flags) {
1525 DataChannel *channel;
1526 const char *buffer = (const char *)data;
1527
1528 mLock.AssertCurrentThreadOwns();
1529 channel = FindChannelByStream(stream);
1530
1531 // Note: Until we support SIZE_MAX sized messages, we need this check
1532 #if (SIZE_MAX > UINT32_MAX)
1533 if (length > UINT32_MAX) {
1534 LOG(("DataChannel: Cannot handle message of size %zu (max=%" PRIu32 ")",
1535 length, UINT32_MAX));
1536 CloseInt(channel);
1537 return;
1538 }
1539 #endif
1540 uint32_t data_length = (uint32_t)length;
1541
1542 // XXX A closed channel may trip this... check
1543 // NOTE: the updated spec from the IETF says we should set in-order until we
1544 // receive an ACK. That would make this code moot. Keep it for now for
1545 // backwards compatibility.
1546 if (!channel) {
1547 // In the updated 0-RTT open case, the sender can send data immediately
1548 // after Open, and doesn't set the in-order bit (since we don't have a
1549 // response or ack). Also, with external negotiation, data can come in
1550 // before we're told about the external negotiation. We need to buffer
1551 // data until either a) Open comes in, if the ordering get messed up,
1552 // or b) the app tells us this channel was externally negotiated. When
1553 // these occur, we deliver the data.
1554
1555 // Since this is rare and non-performance, keep a single list of queued
1556 // data messages to deliver once the channel opens.
1557 LOG(("Queuing data for stream %u, length %u", stream, data_length));
1558 // Copies data
1559 mQueuedData.AppendElement(
1560 new QueuedDataMessage(stream, ppid, flags, data, data_length));
1561 return;
1562 }
1563
1564 // Ignore incoming data in case the channel is closed
1565 if (channel->mState == CLOSED) {
1566 return;
1567 }
1568
1569 bool is_binary = true;
1570 uint8_t bufferFlags;
1571 int32_t type;
1572 const char *info = "";
1573
1574 if (ppid == DATA_CHANNEL_PPID_DOMSTRING_PARTIAL ||
1575 ppid == DATA_CHANNEL_PPID_DOMSTRING) {
1576 is_binary = false;
1577 }
1578 if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
1579 NS_WARNING("DataChannel message aborted by fragment type change!");
1580 // TODO: Maybe closing would be better as this is a hard to detect protocol
1581 // violation?
1582 channel->mRecvBuffer.Truncate(0);
1583 }
1584 channel->mIsRecvBinary = is_binary;
1585
1586 // Remaining chunks of previously truncated message (due to the buffer being
1587 // full)?
1588 if (channel->mFlags & DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE) {
1589 LOG(
1590 ("DataChannel: Ignoring partial message of length %u, buffer full and "
1591 "closing",
1592 data_length));
1593 // Only unblock if unordered
1594 if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
1595 (flags & MSG_EOR)) {
1596 channel->mFlags &= ~DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
1597 }
1598 }
1599
1600 // Buffer message until complete
1601 bufferFlags =
1602 BufferMessage(channel->mRecvBuffer, buffer, data_length, ppid, flags);
1603 if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
1604 LOG(
1605 ("DataChannel: Buffered message would become too large to handle, "
1606 "closing channel"));
1607 channel->mRecvBuffer.Truncate(0);
1608 channel->mFlags |= DATA_CHANNEL_FLAGS_CLOSING_TOO_LARGE;
1609 CloseInt(channel);
1610 return;
1611 }
1612 if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
1613 LOG(
1614 ("DataChannel: Partial %s message of length %u (total %u) on channel "
1615 "id %u",
1616 is_binary ? "binary" : "string", data_length,
1617 channel->mRecvBuffer.Length(), channel->mStream));
1618 return; // Not ready to notify application
1619 }
1620 if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
1621 data_length = channel->mRecvBuffer.Length();
1622 }
1623
1624 // Complain about large messages (only complain - we can handle it)
1625 if (data_length > WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL) {
1626 LOG(
1627 ("DataChannel: Received message of length %u is > announced maximum "
1628 "message size (%u)",
1629 data_length, WEBRTC_DATACHANNEL_MAX_MESSAGE_SIZE_LOCAL));
1630 }
1631
1632 switch (ppid) {
1633 case DATA_CHANNEL_PPID_DOMSTRING:
1634 LOG(("DataChannel: Received string message of length %u on channel %u",
1635 data_length, channel->mStream));
1636 type = DataChannelOnMessageAvailable::ON_DATA_STRING;
1637 if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
1638 info = " (string fragmented)";
1639 }
1640 // else send using recvData normally
1641
1642 // WebSockets checks IsUTF8() here; we can try to deliver it
1643 break;
1644
1645 case DATA_CHANNEL_PPID_BINARY:
1646 LOG(("DataChannel: Received binary message of length %u on channel id %u",
1647 data_length, channel->mStream));
1648 type = DataChannelOnMessageAvailable::ON_DATA_BINARY;
1649 if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
1650 info = " (binary fragmented)";
1651 }
1652
1653 // else send using recvData normally
1654 break;
1655
1656 default:
1657 NS_ERROR("Unknown data PPID");
1658 return;
1659 }
1660
1661 // Notify onmessage
1662 LOG(("%s: sending ON_DATA_%s%s for %p", __FUNCTION__,
1663 (type == DataChannelOnMessageAvailable::ON_DATA_STRING) ? "STRING"
1664 : "BINARY",
1665 info, channel));
1666 if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
1667 channel->SendOrQueue(new DataChannelOnMessageAvailable(
1668 type, this, channel, channel->mRecvBuffer));
1669 channel->mRecvBuffer.Truncate(0);
1670 } else {
1671 nsAutoCString recvData(buffer, data_length); // copies (<64) or allocates
1672 channel->SendOrQueue(
1673 new DataChannelOnMessageAvailable(type, this, channel, recvData));
1674 }
1675 }
1676
1677 void DataChannelConnection::HandleDCEPMessage(const void *buffer, size_t length,
1678 uint32_t ppid, uint16_t stream,
1679 int flags) {
1680 const struct rtcweb_datachannel_open_request *req;
1681 const struct rtcweb_datachannel_ack *ack;
1682
1683 // Note: Until we support SIZE_MAX sized messages, we need this check
1684 #if (SIZE_MAX > UINT32_MAX)
1685 if (length > UINT32_MAX) {
1686 LOG(("DataChannel: Cannot handle message of size %zu (max=%u)", length,
1687 UINT32_MAX));
1688 Stop();
1689 return;
1690 }
1691 #endif
1692 uint32_t data_length = (uint32_t)length;
1693
1694 mLock.AssertCurrentThreadOwns();
1695
1696 // Buffer message until complete
1697 const uint8_t bufferFlags =
1698 BufferMessage(mRecvBuffer, buffer, data_length, ppid, flags);
1699 if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_TOO_LARGE) {
1700 LOG(
1701 ("DataChannel: Buffered message would become too large to handle, "
1702 "closing connection"));
1703 mRecvBuffer.Truncate(0);
1704 Stop();
1705 return;
1706 }
1707 if (!(bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_COMPLETE)) {
1708 LOG(("Buffered partial DCEP message of length %u", data_length));
1709 return;
1710 }
1711 if (bufferFlags & DATA_CHANNEL_BUFFER_MESSAGE_FLAGS_BUFFERED) {
1712 buffer = reinterpret_cast<const void *>(mRecvBuffer.BeginReading());
1713 data_length = mRecvBuffer.Length();
1714 }
1715
1716 req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
1717 LOG(("Handling DCEP message of length %u", data_length));
1718
1719 // Ensure minimum message size (ack is the smallest DCEP message)
1720 if ((size_t)data_length < sizeof(*ack)) {
1721 LOG(("Ignored invalid DCEP message (too short)"));
1722 return;
1723 }
1724
1725 switch (req->msg_type) {
1726 case DATA_CHANNEL_OPEN_REQUEST:
1727 // structure includes a possibly-unused char label[1] (in a packed
1728 // structure)
1729 if (NS_WARN_IF((size_t)data_length < sizeof(*req) - 1)) {
1730 return;
1731 }
1732
1733 HandleOpenRequestMessage(req, data_length, stream);
1734 break;
1735 case DATA_CHANNEL_ACK:
1736 // >= sizeof(*ack) checked above
1737
1738 ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
1739 HandleOpenAckMessage(ack, data_length, stream);
1740 break;
1741 default:
1742 HandleUnknownMessage(ppid, data_length, stream);
1743 break;
1744 }
1745
1746 // Reset buffer
1747 mRecvBuffer.Truncate(0);
1748 }
1749
1750 // Called with mLock locked!
1751 void DataChannelConnection::HandleMessage(const void *buffer, size_t length,
1752 uint32_t ppid, uint16_t stream,
1753 int flags) {
1754 mLock.AssertCurrentThreadOwns();
1755
1756 switch (ppid) {
1757 case DATA_CHANNEL_PPID_CONTROL:
1758 HandleDCEPMessage(buffer, length, ppid, stream, flags);
1759 break;
1760 case DATA_CHANNEL_PPID_DOMSTRING_PARTIAL:
1761 case DATA_CHANNEL_PPID_DOMSTRING:
1762 case DATA_CHANNEL_PPID_BINARY_PARTIAL:
1763 case DATA_CHANNEL_PPID_BINARY:
1764 HandleDataMessage(buffer, length, ppid, stream, flags);
1765 break;
1766 default:
1767 LOG(("Message of length %zu PPID %u on stream %u received (%s).", length,
1768 ppid, stream, (flags & MSG_EOR) ? "complete" : "partial"));
1769 break;
1770 }
1771 }
1772
1773 void DataChannelConnection::HandleAssociationChangeEvent(
1774 const struct sctp_assoc_change *sac) {
1775 uint32_t i, n;
1776
1777 switch (sac->sac_state) {
1778 case SCTP_COMM_UP:
1779 LOG(("Association change: SCTP_COMM_UP"));
1780 if (mState == CONNECTING) {
1781 mSocket = mMasterSocket;
1782 mState = OPEN;
1783
1784 // Check for older Firefox by looking at the amount of incoming streams
1785 LOG(("Negotiated number of incoming streams: %" PRIu16,
1786 sac->sac_inbound_streams));
1787 if (!mMaxMessageSizeSet &&
1788 sac->sac_inbound_streams ==
1789 WEBRTC_DATACHANNEL_STREAMS_OLDER_FIREFOX) {
1790 LOG(("Older Firefox detected, using PPID-based fragmentation"));
1791 mPpidFragmentation = true;
1792 }
1793
1794 SetEvenOdd();
1795
1796 Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
1797 DataChannelOnMessageAvailable::ON_CONNECTION, this)));
1798 LOG(("DTLS connect() succeeded! Entering connected mode"));
1799
1800 // Open any streams pending...
1801 ProcessQueuedOpens();
1802
1803 } else if (mState == OPEN) {
1804 LOG(("DataConnection Already OPEN"));
1805 } else {
1806 LOG(("Unexpected state: %d", mState));
1807 }
1808 break;
1809 case SCTP_COMM_LOST:
1810 LOG(("Association change: SCTP_COMM_LOST"));
1811 // This association is toast, so also close all the channels -- from
1812 // mainthread!
1813 Stop();
1814 break;
1815 case SCTP_RESTART:
1816 LOG(("Association change: SCTP_RESTART"));
1817 break;
1818 case SCTP_SHUTDOWN_COMP:
1819 LOG(("Association change: SCTP_SHUTDOWN_COMP"));
1820 Stop();
1821 break;
1822 case SCTP_CANT_STR_ASSOC:
1823 LOG(("Association change: SCTP_CANT_STR_ASSOC"));
1824 break;
1825 default:
1826 LOG(("Association change: UNKNOWN"));
1827 break;
1828 }
1829 LOG(("Association change: streams (in/out) = (%u/%u)",
1830 sac->sac_inbound_streams, sac->sac_outbound_streams));
1831
1832 if (NS_WARN_IF(!sac)) {
1833 return;
1834 }
1835
1836 n = sac->sac_length - sizeof(*sac);
1837 if ((sac->sac_state == SCTP_COMM_UP) || (sac->sac_state == SCTP_RESTART)) {
1838 if (n > 0) {
1839 for (i = 0; i < n; ++i) {
1840 switch (sac->sac_info[i]) {
1841 case SCTP_ASSOC_SUPPORTS_PR:
1842 LOG(("Supports: PR"));
1843 break;
1844 case SCTP_ASSOC_SUPPORTS_AUTH:
1845 LOG(("Supports: AUTH"));
1846 break;
1847 case SCTP_ASSOC_SUPPORTS_ASCONF:
1848 LOG(("Supports: ASCONF"));
1849 break;
1850 case SCTP_ASSOC_SUPPORTS_MULTIBUF:
1851 LOG(("Supports: MULTIBUF"));
1852 break;
1853 case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
1854 LOG(("Supports: RE-CONFIG"));
1855 break;
1856 #if defined(SCTP_ASSOC_SUPPORTS_INTERLEAVING)
1857 case SCTP_ASSOC_SUPPORTS_INTERLEAVING:
1858 LOG(("Supports: NDATA"));
1859 // TODO: This should probably be set earlier above in 'case
1860 // SCTP_COMM_UP' but we also need this for 'SCTP_RESTART'.
1861 mSendInterleaved = true;
1862 break;
1863 #endif
1864 default:
1865 LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
1866 break;
1867 }
1868 }
1869 }
1870 } else if (((sac->sac_state == SCTP_COMM_LOST) ||
1871 (sac->sac_state == SCTP_CANT_STR_ASSOC)) &&
1872 (n > 0)) {
1873 LOG(("Association: ABORT ="));
1874 for (i = 0; i < n; ++i) {
1875 LOG((" 0x%02x", sac->sac_info[i]));
1876 }
1877 }
1878 if ((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
1879 (sac->sac_state == SCTP_SHUTDOWN_COMP) ||
1880 (sac->sac_state == SCTP_COMM_LOST)) {
1881 return;
1882 }
1883 }
1884
1885 void DataChannelConnection::HandlePeerAddressChangeEvent(
1886 const struct sctp_paddr_change *spc) {
1887 const char *addr = "";
1888 #if !defined(__Userspace_os_Windows)
1889 char addr_buf[INET6_ADDRSTRLEN];
1890 struct sockaddr_in *sin;
1891 struct sockaddr_in6 *sin6;
1892 #endif
1893
1894 switch (spc->spc_aaddr.ss_family) {
1895 case AF_INET:
1896 #if !defined(__Userspace_os_Windows)
1897 sin = (struct sockaddr_in *)&spc->spc_aaddr;
1898 addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN);
1899 #endif
1900 break;
1901 case AF_INET6:
1902 #if !defined(__Userspace_os_Windows)
1903 sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr;
1904 addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
1905 #endif
1906 break;
1907 case AF_CONN:
1908 addr = "DTLS connection";
1909 break;
1910 default:
1911 break;
1912 }
1913 LOG(("Peer address %s is now ", addr));
1914 switch (spc->spc_state) {
1915 case SCTP_ADDR_AVAILABLE:
1916 LOG(("SCTP_ADDR_AVAILABLE"));
1917 break;
1918 case SCTP_ADDR_UNREACHABLE:
1919 LOG(("SCTP_ADDR_UNREACHABLE"));
1920 break;
1921 case SCTP_ADDR_REMOVED:
1922 LOG(("SCTP_ADDR_REMOVED"));
1923 break;
1924 case SCTP_ADDR_ADDED:
1925 LOG(("SCTP_ADDR_ADDED"));
1926 break;
1927 case SCTP_ADDR_MADE_PRIM:
1928 LOG(("SCTP_ADDR_MADE_PRIM"));
1929 break;
1930 case SCTP_ADDR_CONFIRMED:
1931 LOG(("SCTP_ADDR_CONFIRMED"));
1932 break;
1933 default:
1934 LOG(("UNKNOWN"));
1935 break;
1936 }
1937 LOG((" (error = 0x%08x).\n", spc->spc_error));
1938 }
1939
1940 void DataChannelConnection::HandleRemoteErrorEvent(
1941 const struct sctp_remote_error *sre) {
1942 size_t i, n;
1943
1944 n = sre->sre_length - sizeof(struct sctp_remote_error);
1945 LOG(("Remote Error (error = 0x%04x): ", sre->sre_error));
1946 for (i = 0; i < n; ++i) {
1947 LOG((" 0x%02x", sre->sre_data[i]));
1948 }
1949 }
1950
1951 void DataChannelConnection::HandleShutdownEvent(
1952 const struct sctp_shutdown_event *sse) {
1953 LOG(("Shutdown event."));
1954 /* XXX: notify all channels. */
1955 // Attempts to actually send anything will fail
1956 }
1957
1958 void DataChannelConnection::HandleAdaptationIndication(
1959 const struct sctp_adaptation_event *sai) {
1960 LOG(("Adaptation indication: %x.", sai->sai_adaptation_ind));
1961 }
1962
1963 void DataChannelConnection::HandlePartialDeliveryEvent(
1964 const struct sctp_pdapi_event *spde) {
1965 // Note: Be aware that stream and sequence number being u32 instead of u16 is
1966 // a bug in the SCTP API. This may change in the future.
1967
1968 LOG(("Partial delivery event: "));
1969 switch (spde->pdapi_indication) {
1970 case SCTP_PARTIAL_DELIVERY_ABORTED:
1971 LOG(("delivery aborted "));
1972 break;
1973 default:
1974 LOG(("??? "));
1975 break;
1976 }
1977 LOG(("(flags = %x), stream = %" PRIu32 ", sn = %" PRIu32, spde->pdapi_flags,
1978 spde->pdapi_stream, spde->pdapi_seq));
1979
1980 // Validate stream ID
1981 if (spde->pdapi_stream >= UINT16_MAX) {
1982 LOG(("Invalid stream id in partial delivery event: %" PRIu32 "\n",
1983 spde->pdapi_stream));
1984 return;
1985 }
1986
1987 // Find channel and reset buffer
1988 DataChannel *channel = FindChannelByStream((uint16_t)spde->pdapi_stream);
1989 if (channel) {
1990 LOG(("Abort partially delivered message of %u bytes\n",
1991 channel->mRecvBuffer.Length()));
1992 channel->mRecvBuffer.Truncate(0);
1993 }
1994 }
1995
1996 void DataChannelConnection::HandleSendFailedEvent(
1997 const struct sctp_send_failed_event *ssfe) {
1998 size_t i, n;
1999
2000 if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
2001 LOG(("Unsent "));
2002 }
2003 if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
2004 LOG(("Sent "));
2005 }
2006 if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
2007 LOG(("(flags = %x) ", ssfe->ssfe_flags));
2008 }
2009 LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x",
2010 ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
2011 ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
2012 n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
2013 for (i = 0; i < n; ++i) {
2014 LOG((" 0x%02x", ssfe->ssfe_data[i]));
2015 }
2016 }
2017
2018 void DataChannelConnection::ClearResets() {
2019 // Clear all pending resets
2020 if (!mStreamsResetting.IsEmpty()) {
2021 LOG(("Clearing resets for %zu streams", mStreamsResetting.Length()));
2022 }
2023
2024 for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) {
2025 RefPtr<DataChannel> channel;
2026 channel = FindChannelByStream(mStreamsResetting[i]);
2027 if (channel) {
2028 LOG(("Forgetting channel %u (%p) with pending reset", channel->mStream,
2029 channel.get()));
2030 mStreams[channel->mStream] = nullptr;
2031 }
2032 }
2033 mStreamsResetting.Clear();
2034 }
2035
2036 void DataChannelConnection::ResetOutgoingStream(uint16_t stream) {
2037 uint32_t i;
2038
2039 mLock.AssertCurrentThreadOwns();
2040 LOG(("Connection %p: Resetting outgoing stream %u", (void *)this, stream));
2041 // Rarely has more than a couple items and only for a short time
2042 for (i = 0; i < mStreamsResetting.Length(); ++i) {
2043 if (mStreamsResetting[i] == stream) {
2044 return;
2045 }
2046 }
2047 mStreamsResetting.AppendElement(stream);
2048 }
2049
2050 void DataChannelConnection::SendOutgoingStreamReset() {
2051 struct sctp_reset_streams *srs;
2052 uint32_t i;
2053 size_t len;
2054
2055 LOG(("Connection %p: Sending outgoing stream reset for %zu streams",
2056 (void *)this, mStreamsResetting.Length()));
2057 mLock.AssertCurrentThreadOwns();
2058 if (mStreamsResetting.IsEmpty()) {
2059 LOG(("No streams to reset"));
2060 return;
2061 }
2062 len = sizeof(sctp_assoc_t) +
2063 (2 + mStreamsResetting.Length()) * sizeof(uint16_t);
2064 srs = static_cast<struct sctp_reset_streams *>(
2065 moz_xmalloc(len)); // infallible malloc
2066 memset(srs, 0, len);
2067 srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
2068 srs->srs_number_streams = mStreamsResetting.Length();
2069 for (i = 0; i < mStreamsResetting.Length(); ++i) {
2070 srs->srs_stream_list[i] = mStreamsResetting[i];
2071 }
2072 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs,
2073 (socklen_t)len) < 0) {
2074 LOG(("***failed: setsockopt RESET, errno %d", errno));
2075 // if errno == EALREADY, this is normal - we can't send another reset
2076 // with one pending.
2077 // When we get an incoming reset (which may be a response to our
2078 // outstanding one), see if we have any pending outgoing resets and
2079 // send them
2080 } else {
2081 mStreamsResetting.Clear();
2082 }
2083 free(srs);
2084 }
2085
2086 void DataChannelConnection::HandleStreamResetEvent(
2087 const struct sctp_stream_reset_event *strrst) {
2088 uint32_t n, i;
2089 RefPtr<DataChannel> channel; // since we may null out the ref to the channel
2090
2091 if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
2092 !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
2093 n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) /
2094 sizeof(uint16_t);
2095 for (i = 0; i < n; ++i) {
2096 if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
2097 channel = FindChannelByStream(strrst->strreset_stream_list[i]);
2098 if (channel) {
2099 // The other side closed the channel
2100 // We could be in three states:
2101 // 1. Normal state (input and output streams (OPEN)
2102 // Notify application, send a RESET in response on our
2103 // outbound channel. Go to CLOSED
2104 // 2. We sent our own reset (CLOSING); either they crossed on the
2105 // wire, or this is a response to our Reset.
2106 // Go to CLOSED
2107 // 3. We've sent a open but haven't gotten a response yet (CONNECTING)
2108 // I believe this is impossible, as we don't have an input stream
2109 // yet.
2110
2111 LOG(("Incoming: Channel %u closed, state %d", channel->mStream,
2112 channel->mState));
2113 ASSERT_WEBRTC(channel->mState == DataChannel::OPEN ||
2114 channel->mState == DataChannel::CLOSING ||
2115 channel->mState == DataChannel::CONNECTING ||
2116 channel->mState == DataChannel::WAITING_TO_OPEN);
2117 if (channel->mState == DataChannel::OPEN ||
2118 channel->mState == DataChannel::WAITING_TO_OPEN) {
2119 // Mark the stream for reset (the reset is sent below)
2120 ResetOutgoingStream(channel->mStream);
2121 }
2122 mStreams[channel->mStream] = nullptr;
2123
2124 LOG(("Disconnected DataChannel %p from connection %p",
2125 (void *)channel.get(), (void *)channel->mConnection.get()));
2126 // This sends ON_CHANNEL_CLOSED to mainthread
2127 channel->StreamClosedLocked();
2128 } else {
2129 LOG(("Can't find incoming channel %d", i));
2130 }
2131 }
2132 }
2133 }
2134
2135 // Process any pending resets now:
2136 if (!mStreamsResetting.IsEmpty()) {
2137 LOG(("Sending %zu pending resets", mStreamsResetting.Length()));
2138 SendOutgoingStreamReset();
2139 }
2140 }
2141
2142 void DataChannelConnection::HandleStreamChangeEvent(
2143 const struct sctp_stream_change_event *strchg) {
2144 uint16_t stream;
2145 RefPtr<DataChannel> channel;
2146
2147 if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
2148 LOG(("*** Failed increasing number of streams from %zu (%u/%u)",
2149 mStreams.Length(), strchg->strchange_instrms,
2150 strchg->strchange_outstrms));
2151 // XXX FIX! notify pending opens of failure
2152 return;
2153 }
2154 if (strchg->strchange_instrms > mStreams.Length()) {
2155 LOG(("Other side increased streams from %zu to %u", mStreams.Length(),
2156 strchg->strchange_instrms));
2157 }
2158 if (strchg->strchange_outstrms > mStreams.Length() ||
2159 strchg->strchange_instrms > mStreams.Length()) {
2160 uint16_t old_len = mStreams.Length();
2161 uint16_t new_len =
2162 std::max(strchg->strchange_outstrms, strchg->strchange_instrms);
2163 LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
2164 old_len, new_len, new_len - old_len, strchg->strchange_instrms));
2165 // make sure both are the same length
2166 mStreams.AppendElements(new_len - old_len);
2167 LOG(("New length = %zu (was %d)", mStreams.Length(), old_len));
2168 for (size_t i = old_len; i < mStreams.Length(); ++i) {
2169 mStreams[i] = nullptr;
2170 }
2171 // Re-process any channels waiting for streams.
2172 // Linear search, but we don't increase channels often and
2173 // the array would only get long in case of an app error normally
2174
2175 // Make sure we request enough streams if there's a big jump in streams
2176 // Could make a more complex API for OpenXxxFinish() and avoid this loop
2177 size_t num_needed = mPending.GetSize();
2178 LOG(("%zu of %d new streams already needed", num_needed,
2179 new_len - old_len));
2180 num_needed -= (new_len - old_len); // number we added
2181 if (num_needed > 0) {
2182 if (num_needed < 16) num_needed = 16;
2183 LOG(("Not enough new streams, asking for %zu more", num_needed));
2184 // TODO: parameter is an int32_t but we pass size_t
2185 RequestMoreStreams(num_needed);
2186 } else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
2187 LOG(("Requesting %d output streams to match partner",
2188 strchg->strchange_instrms - strchg->strchange_outstrms));
2189 RequestMoreStreams(strchg->strchange_instrms -
2190 strchg->strchange_outstrms);
2191 }
2192
2193 ProcessQueuedOpens();
2194 }
2195 // else probably not a change in # of streams
2196
2197 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
2198 channel = mStreams[i];
2199 if (!channel) continue;
2200
2201 if ((channel->mState == CONNECTING) &&
2202 (channel->mStream == INVALID_STREAM)) {
2203 if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
2204 (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
2205 /* XXX: Signal to the other end. */
2206 channel->mState = CLOSED;
2207 Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2208 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, channel)));
2209 // maybe fire onError (bug 843625)
2210 } else {
2211 stream = FindFreeStream();
2212 if (stream != INVALID_STREAM) {
2213 channel->mStream = stream;
2214 mStreams[stream] = channel;
2215
2216 // Send open request
2217 int error = SendOpenRequestMessage(
2218 channel->mLabel, channel->mProtocol, channel->mStream,
2219 !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
2220 channel->mPrPolicy, channel->mPrValue);
2221 if (error) {
2222 LOG(("SendOpenRequest failed, error = %d", error));
2223 // Close the channel, inform the user
2224 mStreams[channel->mStream] = nullptr;
2225 channel->mState = CLOSED;
2226 // Don't need to reset; we didn't open it
2227 Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2228 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
2229 channel)));
2230 } else {
2231 channel->mState = OPEN;
2232 channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
2233 LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__,
2234 channel.get()));
2235 Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2236 DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
2237 channel)));
2238 }
2239 } else {
2240 /* We will not find more ... */
2241 break;
2242 }
2243 }
2244 }
2245 }
2246 }
2247
2248 // Called with mLock locked!
2249 void DataChannelConnection::HandleNotification(
2250 const union sctp_notification *notif, size_t n) {
2251 mLock.AssertCurrentThreadOwns();
2252 if (notif->sn_header.sn_length != (uint32_t)n) {
2253 return;
2254 }
2255 switch (notif->sn_header.sn_type) {
2256 case SCTP_ASSOC_CHANGE:
2257 HandleAssociationChangeEvent(&(notif->sn_assoc_change));
2258 break;
2259 case SCTP_PEER_ADDR_CHANGE:
2260 HandlePeerAddressChangeEvent(&(notif->sn_paddr_change));
2261 break;
2262 case SCTP_REMOTE_ERROR:
2263 HandleRemoteErrorEvent(&(notif->sn_remote_error));
2264 break;
2265 case SCTP_SHUTDOWN_EVENT:
2266 HandleShutdownEvent(&(notif->sn_shutdown_event));
2267 break;
2268 case SCTP_ADAPTATION_INDICATION:
2269 HandleAdaptationIndication(&(notif->sn_adaptation_event));
2270 break;
2271 case SCTP_AUTHENTICATION_EVENT:
2272 LOG(("SCTP_AUTHENTICATION_EVENT"));
2273 break;
2274 case SCTP_SENDER_DRY_EVENT:
2275 // LOG(("SCTP_SENDER_DRY_EVENT"));
2276 break;
2277 case SCTP_NOTIFICATIONS_STOPPED_EVENT:
2278 LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
2279 break;
2280 case SCTP_PARTIAL_DELIVERY_EVENT:
2281 HandlePartialDeliveryEvent(&(notif->sn_pdapi_event));
2282 break;
2283 case SCTP_SEND_FAILED_EVENT:
2284 HandleSendFailedEvent(&(notif->sn_send_failed_event));
2285 break;
2286 case SCTP_STREAM_RESET_EVENT:
2287 HandleStreamResetEvent(&(notif->sn_strreset_event));
2288 break;
2289 case SCTP_ASSOC_RESET_EVENT:
2290 LOG(("SCTP_ASSOC_RESET_EVENT"));
2291 break;
2292 case SCTP_STREAM_CHANGE_EVENT:
2293 HandleStreamChangeEvent(&(notif->sn_strchange_event));
2294 break;
2295 default:
2296 LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
2297 break;
2298 }
2299 }
2300
2301 int DataChannelConnection::ReceiveCallback(struct socket *sock, void *data,
2302 size_t datalen,
2303 struct sctp_rcvinfo rcv, int flags) {
2304 ASSERT_WEBRTC(!NS_IsMainThread());
2305
2306 if (!data) {
2307 LOG(("ReceiveCallback: SCTP has finished shutting down"));
2308 } else {
2309 bool locked = false;
2310 if (!IsSTSThread()) {
2311 mLock.Lock();
2312 locked = true;
2313 } else {
2314 mLock.AssertCurrentThreadOwns();
2315 }
2316 if (flags & MSG_NOTIFICATION) {
2317 HandleNotification(static_cast<union sctp_notification *>(data), datalen);
2318 } else {
2319 HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid, flags);
2320 }
2321 if (locked) {
2322 mLock.Unlock();
2323 }
2324 }
2325 // sctp allocates 'data' with malloc(), and expects the receiver to free
2326 // it (presumably with free).
2327 // XXX future optimization: try to deliver messages without an internal
2328 // alloc/copy, and if so delay the free until later.
2329 free(data);
2330 // usrsctp defines the callback as returning an int, but doesn't use it
2331 return 1;
2332 }
2333
2334 already_AddRefed<DataChannel> DataChannelConnection::Open(
2335 const nsACString &label, const nsACString &protocol, Type type,
2336 bool inOrder, uint32_t prValue, DataChannelListener *aListener,
2337 nsISupports *aContext, bool aExternalNegotiated, uint16_t aStream) {
2338 // aStream == INVALID_STREAM to have the protocol allocate
2339 uint16_t prPolicy = SCTP_PR_SCTP_NONE;
2340 uint32_t flags;
2341
2342 LOG(
2343 ("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, "
2344 "context %p, external: %s, stream %u",
2345 PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(),
2346 type, inOrder, prValue, aListener, aContext,
2347 aExternalNegotiated ? "true" : "false", aStream));
2348 switch (type) {
2349 case DATA_CHANNEL_RELIABLE:
2350 prPolicy = SCTP_PR_SCTP_NONE;
2351 break;
2352 case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
2353 prPolicy = SCTP_PR_SCTP_RTX;
2354 break;
2355 case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
2356 prPolicy = SCTP_PR_SCTP_TTL;
2357 break;
2358 default:
2359 LOG(("ERROR: unsupported channel type: %u", type));
2360 MOZ_ASSERT(false);
2361 return nullptr;
2362 }
2363 if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
2364 return nullptr;
2365 }
2366
2367 // Don't look past currently-negotiated streams
2368 if (aStream != INVALID_STREAM && aStream < mStreams.Length() &&
2369 mStreams[aStream]) {
2370 LOG(("ERROR: external negotiation of already-open channel %u", aStream));
2371 // XXX How do we indicate this up to the application? Probably the
2372 // caller's job, but we may need to return an error code.
2373 return nullptr;
2374 }
2375
2376 flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
2377
2378 RefPtr<DataChannel> channel(
2379 new DataChannel(this, aStream, DataChannel::CONNECTING, label, protocol,
2380 prPolicy, prValue, flags, aListener, aContext));
2381 if (aExternalNegotiated) {
2382 channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED;
2383 }
2384
2385 MutexAutoLock lock(mLock); // OpenFinish assumes this
2386 return OpenFinish(channel.forget());
2387 }
2388
2389 // Separate routine so we can also call it to finish up from pending opens
2390 already_AddRefed<DataChannel> DataChannelConnection::OpenFinish(
2391 already_AddRefed<DataChannel> &&aChannel) {
2392 RefPtr<DataChannel> channel(aChannel); // takes the reference passed in
2393 // Normally 1 reference if called from ::Open(), or 2 if called from
2394 // ProcessQueuedOpens() unless the DOMDataChannel was gc'd
2395 uint16_t stream = channel->mStream;
2396 bool queue = false;
2397
2398 mLock.AssertCurrentThreadOwns();
2399
2400 // Cases we care about:
2401 // Pre-negotiated:
2402 // Not Open:
2403 // Doesn't fit:
2404 // -> change initial ask or renegotiate after open
2405 // -> queue open
2406 // Open:
2407 // Doesn't fit:
2408 // -> RequestMoreStreams && queue
2409 // Does fit:
2410 // -> open
2411 // Not negotiated:
2412 // Not Open:
2413 // -> queue open
2414 // Open:
2415 // -> Try to get a stream
2416 // Doesn't fit:
2417 // -> RequestMoreStreams && queue
2418 // Does fit:
2419 // -> open
2420 // So the Open cases are basically the same
2421 // Not Open cases are simply queue for non-negotiated, and
2422 // either change the initial ask or possibly renegotiate after open.
2423
2424 if (mState == OPEN) {
2425 if (stream == INVALID_STREAM) {
2426 stream = FindFreeStream(); // may be INVALID_STREAM if we need more
2427 }
2428 if (stream == INVALID_STREAM || stream >= mStreams.Length()) {
2429 // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra
2430 // streams to avoid going back immediately for more if the ask to N, N+1,
2431 // etc
2432 int32_t more_needed = (stream == INVALID_STREAM)
2433 ? 16
2434 : (stream - ((int32_t)mStreams.Length())) + 16;
2435 if (!RequestMoreStreams(more_needed)) {
2436 // Something bad happened... we're done
2437 goto request_error_cleanup;
2438 }
2439 queue = true;
2440 }
2441 } else {
2442 // not OPEN
2443 if (stream != INVALID_STREAM && stream >= mStreams.Length() &&
2444 mState == CLOSED) {
2445 // Update number of streams for init message
2446 struct sctp_initmsg initmsg;
2447 socklen_t len = sizeof(initmsg);
2448 int32_t total_needed = stream + 16;
2449
2450 memset(&initmsg, 0, sizeof(initmsg));
2451 if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG,
2452 &initmsg, &len) < 0) {
2453 LOG(("*** failed getsockopt SCTP_INITMSG"));
2454 goto request_error_cleanup;
2455 }
2456 LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed,
2457 initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
2458 initmsg.sinit_num_ostreams = total_needed;
2459 initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
2460 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG,
2461 &initmsg, (socklen_t)sizeof(initmsg)) < 0) {
2462 LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
2463 goto request_error_cleanup;
2464 }
2465
2466 int32_t old_len = mStreams.Length();
2467 mStreams.AppendElements(total_needed - old_len);
2468 for (int32_t i = old_len; i < total_needed; ++i) {
2469 mStreams[i] = nullptr;
2470 }
2471 }
2472 // else if state is CONNECTING, we'll just re-negotiate when OpenFinish
2473 // is called, if needed
2474 queue = true;
2475 }
2476 if (queue) {
2477 LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
2478 // Also serves to mark we told the app
2479 channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
2480 // we need a ref for the nsDeQue and one to return
2481 DataChannel *rawChannel = channel;
2482 rawChannel->AddRef();
2483 mPending.Push(rawChannel);
2484 return channel.forget();
2485 }
2486
2487 MOZ_ASSERT(stream != INVALID_STREAM);
2488 // just allocated (& OPEN), or externally negotiated
2489 mStreams[stream] = channel; // holds a reference
2490 channel->mStream = stream;
2491
2492 #ifdef TEST_QUEUED_DATA
2493 // It's painful to write a test for this...
2494 channel->mState = OPEN;
2495 channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
2496 SendDataMsgInternalOrBuffer(channel, "Help me!", 8,
2497 DATA_CHANNEL_PPID_DOMSTRING);
2498 #endif
2499
2500 if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
2501 // Don't send unordered until this gets cleared
2502 channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
2503 }
2504
2505 if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
2506 int error = SendOpenRequestMessage(
2507 channel->mLabel, channel->mProtocol, stream,
2508 !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
2509 channel->mPrPolicy, channel->mPrValue);
2510 if (error) {
2511 LOG(("SendOpenRequest failed, error = %d", error));
2512 if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
2513 // We already returned the channel to the app.
2514 NS_ERROR("Failed to send open request");
2515 Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2516 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, channel)));
2517 }
2518 // If we haven't returned the channel yet, it will get destroyed when we
2519 // exit this function.
2520 mStreams[stream] = nullptr;
2521 channel->mStream = INVALID_STREAM;
2522 // we'll be destroying the channel
2523 channel->mState = CLOSED;
2524 return nullptr;
2525 /* NOTREACHED */
2526 }
2527 }
2528 // Either externally negotiated or we sent Open
2529 channel->mState = OPEN;
2530 channel->mFlags |= DATA_CHANNEL_FLAGS_READY;
2531 // FIX? Move into DOMDataChannel? I don't think we can send it yet here
2532 LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
2533 Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2534 DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this, channel)));
2535
2536 return channel.forget();
2537
2538 request_error_cleanup:
2539 channel->mState = CLOSED;
2540 if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
2541 // We already returned the channel to the app.
2542 NS_ERROR("Failed to request more streams");
2543 Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2544 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this, channel)));
2545 return channel.forget();
2546 }
2547 // we'll be destroying the channel, but it never really got set up
2548 // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
2549 // Dispatch it to ourselves
2550 return nullptr;
2551 }
2552
2553 // Requires mLock to be locked!
2554 // Returns a POSIX error code directly instead of setting errno.
2555 int DataChannelConnection::SendMsgInternal(OutgoingMsg &msg) {
2556 auto &info = msg.GetInfo().sendv_sndinfo;
2557 int error;
2558
2559 // EOR set?
2560 bool eor_set = info.snd_flags & SCTP_EOR ? true : false;
2561
2562 // Send until buffer is empty
2563 size_t left = msg.GetLeft();
2564 do {
2565 size_t length;
2566
2567 // Carefully chunk the buffer
2568 if (left > DATA_CHANNEL_MAX_BINARY_FRAGMENT) {
2569 length = DATA_CHANNEL_MAX_BINARY_FRAGMENT;
2570
2571 // Unset EOR flag
2572 info.snd_flags &= ~SCTP_EOR;
2573 } else {
2574 length = left;
2575
2576 // Set EOR flag
2577 if (eor_set) {
2578 info.snd_flags |= SCTP_EOR;
2579 }
2580 }
2581
2582 // Send (or try at least)
2583 // SCTP will return EMSGSIZE if the message is bigger than the buffer
2584 // size (or EAGAIN if there isn't space). However, we can avoid EMSGSIZE
2585 // by carefully crafting small enough message chunks.
2586 ssize_t written = usrsctp_sendv(
2587 mSocket, msg.GetData(), length, nullptr, 0, (void *)&msg.GetInfo(),
2588 (socklen_t)sizeof(struct sctp_sendv_spa), SCTP_SENDV_SPA, 0);
2589 if (written < 0) {
2590 error = errno;
2591 goto out;
2592 }
2593 LOG(("Sent buffer (written=%zu, len=%zu, left=%zu)", (size_t)written,
2594 length, left - (size_t)written));
2595
2596 // TODO: Remove once resolved
2597 // (https://github.com/sctplab/usrsctp/issues/132)
2598 if (written == 0) {
2599 LOG(("@tuexen: usrsctp_sendv returned 0"));
2600 error = EAGAIN;
2601 goto out;
2602 }
2603
2604 // If not all bytes have been written, this obviously means that usrsctp's
2605 // buffer is full and we need to try again later.
2606 if ((size_t)written < length) {
2607 msg.Advance((size_t)written);
2608 error = EAGAIN;
2609 goto out;
2610 }
2611
2612 // Update buffer position
2613 msg.Advance((size_t)written);
2614
2615 // Get amount of bytes left in the buffer
2616 left = msg.GetLeft();
2617 } while (left > 0);
2618
2619 // Done
2620 error = 0;
2621
2622 out:
2623 // Reset EOR flag
2624 if (eor_set) {
2625 info.snd_flags |= SCTP_EOR;
2626 }
2627
2628 return error;
2629 }
2630
2631 // Requires mLock to be locked!
2632 // Returns a POSIX error code directly instead of setting errno.
2633 // IMPORTANT: Ensure that the buffer passed is guarded by mLock!
2634 int DataChannelConnection::SendMsgInternalOrBuffer(
2635 nsTArray<nsAutoPtr<BufferedOutgoingMsg>> &buffer, OutgoingMsg &msg,
2636 bool &buffered) {
2637 NS_WARNING_ASSERTION(msg.GetLength() > 0, "Length is 0?!");
2638
2639 int error = 0;
2640 bool need_buffering = false;
2641
2642 // Note: Main-thread IO, but doesn't block!
2643 // XXX FIX! to deal with heavy overruns of JS trying to pass data in
2644 // (more than the buffersize) queue data onto another thread to do the
2645 // actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp
2646
2647 // Avoid a race between buffer-full-failure (where we have to add the
2648 // packet to the buffered-data queue) and the buffer-now-only-half-full
2649 // callback, which happens on a different thread. Otherwise we might
2650 // fail here, then before we add it to the queue get the half-full
2651 // callback, find nothing to do, then on this thread add it to the
2652 // queue - which would sit there. Also, if we later send more data, it
2653 // would arrive ahead of the buffered message, but if the buffer ever
2654 // got to 1/2 full, the message would get sent - but at a semi-random
2655 // time, after other data it was supposed to be in front of.
2656
2657 // Must lock before empty check for similar reasons!
2658 mLock.AssertCurrentThreadOwns();
2659 if (buffer.IsEmpty() && (mSendInterleaved || !mPendingType)) {
2660 error = SendMsgInternal(msg);
2661 switch (error) {
2662 case 0:
2663 break;
2664 case EAGAIN:
2665 #if (EAGAIN != EWOULDBLOCK)
2666 case EWOULDBLOCK:
2667 #endif
2668 need_buffering = true;
2669 break;
2670 default:
2671 LOG(("error %d on sending", error));
2672 break;
2673 }
2674 } else {
2675 need_buffering = true;
2676 }
2677
2678 if (need_buffering) {
2679 // queue data for resend! And queue any further data for the stream until
2680 // it is...
2681 auto *bufferedMsg = new BufferedOutgoingMsg(msg); // infallible malloc
2682 buffer.AppendElement(bufferedMsg); // owned by mBufferedData array
2683 LOG(("Queued %zu buffers (left=%zu, total=%zu)", buffer.Length(),
2684 msg.GetLeft(), msg.GetLength()));
2685 buffered = true;
2686 return 0;
2687 }
2688
2689 buffered = false;
2690 return error;
2691 }
2692
2693 // Caller must ensure that length <= UINT32_MAX
2694 // Returns a POSIX error code.
2695 int DataChannelConnection::SendDataMsgInternalOrBuffer(DataChannel &channel,
2696 const uint8_t *data,
2697 size_t len,
2698 uint32_t ppid) {
2699 if (NS_WARN_IF(channel.mState != OPEN && channel.mState != CONNECTING)) {
2700 return EINVAL; // TODO: Find a better error code
2701 }
2702
2703 struct sctp_sendv_spa info = {0};
2704
2705 // General flags
2706 info.sendv_flags = SCTP_SEND_SNDINFO_VALID;
2707
2708 // Set stream identifier, protocol identifier and flags
2709 info.sendv_sndinfo.snd_sid = channel.mStream;
2710 info.sendv_sndinfo.snd_flags = SCTP_EOR;
2711 info.sendv_sndinfo.snd_ppid = htonl(ppid);
2712
2713 // Unordered?
2714 // To avoid problems where an in-order OPEN is lost and an
2715 // out-of-order data message "beats" it, require data to be in-order
2716 // until we get an ACK.
2717 if ((channel.mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
2718 !(channel.mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
2719 info.sendv_sndinfo.snd_flags |= SCTP_UNORDERED;
2720 }
2721
2722 // Partial reliability policy
2723 if (channel.mPrPolicy != SCTP_PR_SCTP_NONE) {
2724 info.sendv_prinfo.pr_policy = channel.mPrPolicy;
2725 info.sendv_prinfo.pr_value = channel.mPrValue;
2726 info.sendv_flags |= SCTP_SEND_PRINFO_VALID;
2727 }
2728
2729 // Create message instance and send
2730 OutgoingMsg msg(info, data, len);
2731 MutexAutoLock lock(mLock);
2732 bool buffered;
2733 int error = SendMsgInternalOrBuffer(channel.mBufferedData, msg, buffered);
2734
2735 // Set pending type and stream index (if buffered)
2736 if (!error && buffered && !mPendingType) {
2737 mPendingType = PENDING_DATA;
2738 mCurrentStream = channel.mStream;
2739 }
2740 return error;
2741 }
2742
2743 // Caller must ensure that length <= UINT32_MAX
2744 // Returns a POSIX error code.
2745 int DataChannelConnection::SendDataMsg(DataChannel &channel,
2746 const uint8_t *data, size_t len,
2747 uint32_t ppidPartial,
2748 uint32_t ppidFinal) {
2749 // We *really* don't want to do this from main thread! - and
2750 // SendDataMsgInternalOrBuffer avoids blocking.
2751
2752 if (mPpidFragmentation) {
2753 // TODO: Bug 1381136, remove this block and all other code that uses PPIDs
2754 // for fragmentation and reassembly once older Firefoxes without EOR
2755 // are no longer supported as target clients.
2756
2757 // Use the deprecated PPID-level fragmentation if enabled. Should be enabled
2758 // in case we can be certain that the other peer is an older Firefox browser
2759 // that does support PPID-level fragmentation/reassembly.
2760
2761 // PPID-level fragmentation can only be applied on reliable data channels.
2762 if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
2763 channel.mPrPolicy == DATA_CHANNEL_RELIABLE &&
2764 !(channel.mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
2765 LOG((
2766 "Sending data message (total=%zu) using deprecated PPID-based chunks",
2767 len));
2768
2769 size_t left = len;
2770 while (left > 0) {
2771 // Note: For correctness, chunkLen should also consider mMaxMessageSize
2772 // as minimum but as this block is going to be removed soon, I
2773 // see no need for it.
2774 size_t chunkLen =
2775 std::min<size_t>(left, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
2776 left -= chunkLen;
2777 uint32_t ppid = left > 0 ? ppidPartial : ppidFinal;
2778
2779 // Send the chunk
2780 // Note that these might end up being deferred and queued.
2781 LOG(("Send chunk (len=%zu, left=%zu, total=%zu, ppid %u", chunkLen,
2782 left, len, ppid));
2783 int error = SendDataMsgInternalOrBuffer(channel, data, chunkLen, ppid);
2784 if (error) {
2785 LOG(("*** send chunk fail %d", error));
2786 return error;
2787 }
2788
2789 // Update data position
2790 data += chunkLen;
2791 }
2792
2793 // Sending chunks complete
2794 LOG(("Sent %zu chunks using deprecated PPID-based fragmentation",
2795 (size_t)(len + DATA_CHANNEL_MAX_BINARY_FRAGMENT - 1) /
2796 DATA_CHANNEL_MAX_BINARY_FRAGMENT));
2797 return 0;
2798 }
2799
2800 // Cannot do PPID-based fragmentaton on unreliable channels
2801 NS_WARNING_ASSERTION(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
2802 "Sending too-large data on unreliable channel!");
2803 } else {
2804 if (mMaxMessageSize != 0 && len > mMaxMessageSize) {
2805 LOG(("Message rejected, too large (%zu > %" PRIu64 ")", len,
2806 mMaxMessageSize));
2807 return EMSGSIZE;
2808 }
2809 }
2810
2811 // This will use EOR-based fragmentation if the message is too large (> 64
2812 // KiB)
2813 return SendDataMsgInternalOrBuffer(channel, data, len, ppidFinal);
2814 }
2815
2816 class ReadBlobRunnable : public Runnable {
2817 public:
2818 ReadBlobRunnable(DataChannelConnection *aConnection, uint16_t aStream,
2819 nsIInputStream *aBlob)
2820 : Runnable("ReadBlobRunnable"),
2821 mConnection(aConnection),
2822 mStream(aStream),
2823 mBlob(aBlob) {}
2824
2825 NS_IMETHOD Run() override {
2826 // ReadBlob() is responsible to releasing the reference
2827 DataChannelConnection *self = mConnection;
2828 self->ReadBlob(mConnection.forget(), mStream, mBlob);
2829 return NS_OK;
2830 }
2831
2832 private:
2833 // Make sure the Connection doesn't die while there are jobs outstanding.
2834 // Let it die (if released by PeerConnectionImpl while we're running)
2835 // when we send our runnable back to MainThread. Then ~DataChannelConnection
2836 // can send the IOThread to MainThread to die in a runnable, avoiding
2837 // unsafe event loop recursion. Evil.
2838 RefPtr<DataChannelConnection> mConnection;
2839 uint16_t mStream;
2840 // Use RefCount for preventing the object is deleted when SendBlob returns.
2841 RefPtr<nsIInputStream> mBlob;
2842 };
2843
2844 // Returns a POSIX error code.
2845 int DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob) {
2846 DataChannel *channel = mStreams[stream];
2847 if (NS_WARN_IF(!channel)) {
2848 return EINVAL; // TODO: Find a better error code
2849 }
2850
2851 // Spawn a thread to send the data
2852 if (!mInternalIOThread) {
2853 nsresult rv =
2854 NS_NewNamedThread("DataChannel IO", getter_AddRefs(mInternalIOThread));
2855 if (NS_FAILED(rv)) {
2856 return EINVAL; // TODO: Find a better error code
2857 }
2858 }
2859
2860 mInternalIOThread->Dispatch(
2861 do_AddRef(new ReadBlobRunnable(this, stream, aBlob)), NS_DISPATCH_NORMAL);
2862 return 0;
2863 }
2864
2865 class DataChannelBlobSendRunnable : public Runnable {
2866 public:
2867 DataChannelBlobSendRunnable(
2868 already_AddRefed<DataChannelConnection> &aConnection, uint16_t aStream)
2869 : Runnable("DataChannelBlobSendRunnable"),
2870 mConnection(aConnection),
2871 mStream(aStream) {}
2872
2873 ~DataChannelBlobSendRunnable() override {
2874 if (!NS_IsMainThread() && mConnection) {
2875 MOZ_ASSERT(false);
2876 // explicitly leak the connection if destroyed off mainthread
2877 Unused << mConnection.forget().take();
2878 }
2879 }
2880
2881 NS_IMETHOD Run() override {
2882 ASSERT_WEBRTC(NS_IsMainThread());
2883
2884 mConnection->SendBinaryMsg(mStream, mData);
2885 mConnection = nullptr;
2886 return NS_OK;
2887 }
2888
2889 // explicitly public so we can avoid allocating twice and copying
2890 nsCString mData;
2891
2892 private:
2893 // Note: we can be destroyed off the target thread, so be careful not to let
2894 // this get Released()ed on the temp thread!
2895 RefPtr<DataChannelConnection> mConnection;
2896 uint16_t mStream;
2897 };
2898
2899 void DataChannelConnection::ReadBlob(
2900 already_AddRefed<DataChannelConnection> aThis, uint16_t aStream,
2901 nsIInputStream *aBlob) {
2902 // NOTE: 'aThis' has been forgotten by the caller to avoid releasing
2903 // it off mainthread; if PeerConnectionImpl has released then we want
2904 // ~DataChannelConnection() to run on MainThread
2905
2906 // XXX to do this safely, we must enqueue these atomically onto the
2907 // output socket. We need a sender thread(s?) to enqueue data into the
2908 // socket and to avoid main-thread IO that might block. Even on a
2909 // background thread, we may not want to block on one stream's data.
2910 // I.e. run non-blocking and service multiple channels.
2911
2912 // Must not let Dispatching it cause the DataChannelConnection to get
2913 // released on the wrong thread. Using
2914 // WrapRunnable(RefPtr<DataChannelConnection>(aThis),... will occasionally
2915 // cause aThis to get released on this thread. Also, an explicit Runnable
2916 // lets us avoid copying the blob data an extra time.
2917 RefPtr<DataChannelBlobSendRunnable> runnable =
2918 new DataChannelBlobSendRunnable(aThis, aStream);
2919 // avoid copying the blob data by passing the mData from the runnable
2920 if (NS_FAILED(NS_ReadInputStreamToString(aBlob, runnable->mData, -1))) {
2921 // Bug 966602: Doesn't return an error to the caller via onerror.
2922 // We must release DataChannelConnection on MainThread to avoid issues (bug
2923 // 876167) aThis is now owned by the runnable; release it there
2924 NS_ReleaseOnMainThreadSystemGroup("DataChannelBlobSendRunnable",
2925 runnable.forget());
2926 return;
2927 }
2928 aBlob->Close();
2929 Dispatch(runnable.forget());
2930 }
2931
2932 void DataChannelConnection::GetStreamIds(std::vector<uint16_t> *aStreamList) {
2933 ASSERT_WEBRTC(NS_IsMainThread());
2934 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
2935 if (mStreams[i]) {
2936 aStreamList->push_back(mStreams[i]->mStream);
2937 }
2938 }
2939 }
2940
2941 // Returns a POSIX error code.
2942 int DataChannelConnection::SendDataMsgCommon(uint16_t stream,
2943 const nsACString &aMsg,
2944 bool isBinary) {
2945 ASSERT_WEBRTC(NS_IsMainThread());
2946 // We really could allow this from other threads, so long as we deal with
2947 // asynchronosity issues with channels closing, in particular access to
2948 // mStreams, and issues with the association closing (access to mSocket).
2949
2950 const uint8_t *data = (const uint8_t *)aMsg.BeginReading();
2951 uint32_t len = aMsg.Length();
2952 #if (UINT32_MAX > SIZE_MAX)
2953 if (len > SIZE_MAX) {
2954 return EMSGSIZE;
2955 }
2956 #endif
2957 DataChannel *channelPtr;
2958
2959 LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream,
2960 len));
2961 // XXX if we want more efficiency, translate flags once at open time
2962 channelPtr = mStreams[stream];
2963 if (NS_WARN_IF(!channelPtr)) {
2964 return EINVAL; // TODO: Find a better error code
2965 }
2966
2967 auto &channel = *channelPtr;
2968
2969 if (isBinary) {
2970 return SendDataMsg(channel, data, len, DATA_CHANNEL_PPID_BINARY_PARTIAL,
2971 DATA_CHANNEL_PPID_BINARY);
2972 } else {
2973 return SendDataMsg(channel, data, len, DATA_CHANNEL_PPID_DOMSTRING_PARTIAL,
2974 DATA_CHANNEL_PPID_DOMSTRING);
2975 }
2976 }
2977
2978 void DataChannelConnection::Stop() {
2979 // Note: This will call 'CloseAll' from the main thread
2980 Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
2981 DataChannelOnMessageAvailable::ON_DISCONNECTED, this)));
2982 }
2983
2984 void DataChannelConnection::Close(DataChannel *aChannel) {
2985 MutexAutoLock lock(mLock);
2986 CloseInt(aChannel);
2987 }
2988
// So we can call Close() with the lock already held
// Called from someone who holds a ref via ::Close(), or from ~DataChannel
//
// Moves |aChannel| (must be non-null) to CLOSING, drops its buffered outgoing
// data and resets its outgoing stream. When the connection itself is already
// CLOSED (the CloseAll() path) the channel is additionally removed from
// mStreams and finished off immediately via StreamClosedLocked().
// mLock must be held by the calling thread.
void DataChannelConnection::CloseInt(DataChannel *aChannel) {
  MOZ_ASSERT(aChannel);
  RefPtr<DataChannel> channel(aChannel);  // make sure it doesn't go away on us

  mLock.AssertCurrentThreadOwns();
  LOG(("Connection %p/Channel %p: Closing stream %u",
       channel->mConnection.get(), channel.get(), channel->mStream));
  // re-test since it may have closed before the lock was grabbed
  if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) {
    LOG(("Channel already closing/closed (%u)", aChannel->mState));
    if (mState == CLOSED && channel->mStream != INVALID_STREAM) {
      // called from CloseAll()
      // we're not going to hang around waiting any more
      mStreams[channel->mStream] = nullptr;
    }
    // Nothing else to do for a channel that is already closing/closed.
    return;
  }
  // Discard whatever is still queued for sending on this channel.
  aChannel->mBufferedData.Clear();
  if (channel->mStream != INVALID_STREAM) {
    ResetOutgoingStream(channel->mStream);
    if (mState == CLOSED) {  // called from CloseAll()
      // Let resets accumulate then send all at once in CloseAll()
      // we're not going to hang around waiting
      mStreams[channel->mStream] = nullptr;
    } else {
      // Connection still alive: send the reset for this stream right away.
      SendOutgoingStreamReset();
    }
  }
  aChannel->mState = CLOSING;
  if (mState == CLOSED) {
    // we're not going to hang around waiting
    channel->StreamClosedLocked();
  }
  // At this point when we leave here, the object is a zombie held alive only by
  // the DOM object
}
3027
3028 void DataChannelConnection::CloseAll() {
3029 LOG(("Closing all channels (connection %p)", (void *)this));
3030 // Don't need to lock here
3031
3032 // Make sure no more channels will be opened
3033 {
3034 MutexAutoLock lock(mLock);
3035 mState = CLOSED;
3036 }
3037
3038 // Close current channels
3039 // If there are runnables, they hold a strong ref and keep the channel
3040 // and/or connection alive (even if in a CLOSED state)
3041 bool closed_some = false;
3042 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
3043 if (mStreams[i]) {
3044 mStreams[i]->Close();
3045 closed_some = true;
3046 }
3047 }
3048
3049 // Clean up any pending opens for channels
3050 RefPtr<DataChannel> channel;
3051 while (nullptr != (channel = dont_AddRef(
3052 static_cast<DataChannel *>(mPending.PopFront())))) {
3053 LOG(("closing pending channel %p, stream %u", channel.get(),
3054 channel->mStream));
3055 channel->Close(); // also releases the ref on each iteration
3056 closed_some = true;
3057 }
3058 // It's more efficient to let the Resets queue in shutdown and then
3059 // SendOutgoingStreamReset() here.
3060 if (closed_some) {
3061 MutexAutoLock lock(mLock);
3062 SendOutgoingStreamReset();
3063 }
3064 }
3065
// Destructor. Performs no cleanup of its own; it only checks (in builds where
// NS_ASSERTION is active) that the channel was already closing or closed.
DataChannel::~DataChannel() {
  // NS_ASSERTION since this is more "I think I caught all the cases that
  // can cause this" than a true kill-the-program assertion.  If this is
  // wrong, nothing bad happens.  At worst it's a leak.
  NS_ASSERTION(mState == CLOSED || mState == CLOSING,
               "unexpected state in ~DataChannel");
}
3073
3074 void DataChannel::Close() {
3075 if (mConnection) {
3076 // ensure we don't get deleted
3077 RefPtr<DataChannelConnection> connection(mConnection);
3078 connection->Close(this);
3079 }
3080 }
3081
3082 // Used when disconnecting from the DataChannelConnection
3083 void DataChannel::StreamClosedLocked() {
3084 mConnection->mLock.AssertCurrentThreadOwns();
3085 ENSURE_DATACONNECTION;
3086
3087 LOG(("Destroying Data channel %u", mStream));
3088 MOZ_ASSERT_IF(mStream != INVALID_STREAM,
3089 !mConnection->FindChannelByStream(mStream));
3090 mStream = INVALID_STREAM;
3091 mState = CLOSED;
3092 mMainThreadEventTarget->Dispatch(do_AddRef(new DataChannelOnMessageAvailable(
3093 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, mConnection, this)));
3094 // We leave mConnection live until the DOM releases us, to avoid races
3095 }
3096
// Drops this channel's reference to its connection by clearing mConnection.
// Asserts that it runs on the main thread.
void DataChannel::ReleaseConnection() {
  ASSERT_WEBRTC(NS_IsMainThread());
  mConnection = nullptr;
}
3101
3102 void DataChannel::SetListener(DataChannelListener *aListener,
3103 nsISupports *aContext) {
3104 MutexAutoLock mLock(mListenerLock);
3105 mContext = aContext;
3106 mListener = aListener;
3107 }
3108
3109 void DataChannel::SendErrnoToErrorResult(int error, ErrorResult &aRv) {
3110 switch (error) {
3111 case 0:
3112 break;
3113 case EMSGSIZE:
3114 aRv.Throw(NS_ERROR_DOM_TYPE_ERR);
3115 break;
3116 default:
3117 aRv.Throw(NS_ERROR_DOM_OPERATION_ERR);
3118 break;
3119 }
3120 }
3121
3122 void DataChannel::SendMsg(const nsACString &aMsg, ErrorResult &aRv) {
3123 if (!EnsureValidStream(aRv)) {
3124 return;
3125 }
3126
3127 SendErrnoToErrorResult(mConnection->SendMsg(mStream, aMsg), aRv);
3128 }
3129
3130 void DataChannel::SendBinaryMsg(const nsACString &aMsg, ErrorResult &aRv) {
3131 if (!EnsureValidStream(aRv)) {
3132 return;
3133 }
3134
3135 SendErrnoToErrorResult(mConnection->SendBinaryMsg(mStream, aMsg), aRv);
3136 }
3137
3138 void DataChannel::SendBinaryStream(nsIInputStream *aBlob, ErrorResult &aRv) {
3139 if (!EnsureValidStream(aRv)) {
3140 return;
3141 }
3142
3143 SendErrnoToErrorResult(mConnection->SendBlob(mStream, aBlob), aRv);
3144 }
3145
3146 // May be called from another (i.e. Main) thread!
3147 void DataChannel::AppReady() {
3148 ENSURE_DATACONNECTION;
3149
3150 MutexAutoLock lock(mConnection->mLock);
3151
3152 mFlags |= DATA_CHANNEL_FLAGS_READY;
3153 if (mState == WAITING_TO_OPEN) {
3154 mState = OPEN;
3155 mMainThreadEventTarget->Dispatch(
3156 do_AddRef(new DataChannelOnMessageAvailable(
3157 DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
3158 this)));
3159 for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
3160 nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
3161 MOZ_ASSERT(runnable);
3162 mMainThreadEventTarget->Dispatch(runnable.forget());
3163 }
3164 } else {
3165 NS_ASSERTION(mQueuedMessages.IsEmpty(),
3166 "Shouldn't have queued messages if not WAITING_TO_OPEN");
3167 }
3168 mQueuedMessages.Clear();
3169 mQueuedMessages.Compact();
3170 // We never use it again... We could even allocate the array in the odd
3171 // cases we need it.
3172 }
3173
3174 size_t DataChannel::GetBufferedAmountLocked() const {
3175 size_t buffered = 0;
3176
3177 for (auto &msg : mBufferedData) {
3178 buffered += msg->GetLeft();
3179 }
3180 // XXX Note: per Michael Tuexen, there's no way to currently get the buffered
3181 // amount from the SCTP stack for a single stream. It is on their to-do
3182 // list, and once we import a stack with support for that, we'll need to
3183 // add it to what we buffer. Also we'll need to ask for notification of a
3184 // per- stream buffer-low event and merge that into the handling of buffer-low
3185 // (the equivalent to TCP_NOTSENT_LOWAT on TCP sockets)
3186
3187 return buffered;
3188 }
3189
// Returns the currently stored buffered-amount-low threshold
// (mBufferedThreshold), as last set by SetBufferedAmountLowThreshold().
uint32_t DataChannel::GetBufferedAmountLowThreshold() {
  return mBufferedThreshold;
}
3193
// Stores |aThreshold| as the new buffered-amount-low threshold.
// Never fire immediately, as it's defined to fire on transitions, not state;
// accordingly this only records the value and dispatches nothing.
void DataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold) {
  mBufferedThreshold = aThreshold;
}
3198
3199 // Called with mLock locked!
3200 void DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage) {
3201 if (!(mFlags & DATA_CHANNEL_FLAGS_READY) &&
3202 (mState == CONNECTING || mState == WAITING_TO_OPEN)) {
3203 mQueuedMessages.AppendElement(aMessage);
3204 } else {
3205 nsCOMPtr<nsIRunnable> runnable = aMessage;
3206 mMainThreadEventTarget->Dispatch(runnable.forget());
3207 }
3208 }
3209
3210 bool DataChannel::EnsureValidStream(ErrorResult &aRv) {
3211 MOZ_ASSERT(mConnection);
3212 if (mConnection && mStream != INVALID_STREAM) {
3213 return true;
3214 } else {
3215 aRv.Throw(NS_ERROR_DOM_INVALID_STATE_ERR);
3216 return false;
3217 }
3218 }
3219
3220 } // namespace mozilla
3221