1 /* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim: set ts=2 et sw=2 tw=80: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
5 * You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7 #include <algorithm>
8 #include <stdio.h>
9 #include <stdlib.h>
10 #if !defined(__Userspace_os_Windows)
11 #include <arpa/inet.h>
12 #endif
13 // usrsctp.h expects to have errno definitions prior to its inclusion.
14 #include <errno.h>
15
16 #define SCTP_DEBUG 1
17 #define SCTP_STDINT_INCLUDE <stdint.h>
18
19 #ifdef _MSC_VER
20 // Disable "warning C4200: nonstandard extension used : zero-sized array in
21 // struct/union"
22 // ...which the third-party file usrsctp.h runs afoul of.
23 #pragma warning(push)
24 #pragma warning(disable:4200)
25 #endif
26
27 #include "usrsctp.h"
28
29 #ifdef _MSC_VER
30 #pragma warning(pop)
31 #endif
32
33 #include "DataChannelLog.h"
34
35 #include "nsServiceManagerUtils.h"
36 #include "nsIObserverService.h"
37 #include "nsIObserver.h"
38 #include "mozilla/Services.h"
39 #include "mozilla/Sprintf.h"
40 #include "nsProxyRelease.h"
41 #include "nsThread.h"
42 #include "nsThreadUtils.h"
43 #include "nsAutoPtr.h"
44 #include "nsNetUtil.h"
45 #include "nsNetCID.h"
46 #include "mozilla/StaticPtr.h"
47 #include "mozilla/Unused.h"
48 #ifdef MOZ_PEERCONNECTION
49 #include "mtransport/runnable_utils.h"
50 #endif
51
52 #define DATACHANNEL_LOG(args) LOG(args)
53 #include "DataChannel.h"
54 #include "DataChannelProtocol.h"
55
56 // Let us turn on and off important assertions in non-debug builds
57 #ifdef DEBUG
58 #define ASSERT_WEBRTC(x) MOZ_ASSERT((x))
59 #elif defined(MOZ_WEBRTC_ASSERT_ALWAYS)
60 #define ASSERT_WEBRTC(x) do { if (!(x)) { MOZ_CRASH(); } } while (0)
61 #endif
62
63 static bool sctp_initialized;
64
65 namespace mozilla {
66
67 LazyLogModule gDataChannelLog("DataChannel");
68 static LazyLogModule gSCTPLog("SCTP");
69
70 class DataChannelShutdown : public nsIObserver
71 {
72 public:
73 // This needs to be tied to some form object that is guaranteed to be
74 // around (singleton likely) unless we want to shutdown sctp whenever
75 // we're not using it (and in which case we'd keep a refcnt'd object
76 // ref'd by each DataChannelConnection to release the SCTP usrlib via
77 // sctp_finish). Right now, the single instance of this class is
78 // owned by the observer service.
79
80 NS_DECL_ISUPPORTS
81
DataChannelShutdown()82 DataChannelShutdown() {}
83
Init()84 void Init()
85 {
86 nsCOMPtr<nsIObserverService> observerService =
87 mozilla::services::GetObserverService();
88 if (!observerService)
89 return;
90
91 nsresult rv = observerService->AddObserver(this,
92 "xpcom-will-shutdown",
93 false);
94 MOZ_ASSERT(rv == NS_OK);
95 (void) rv;
96 }
97
98 private:
99 // The only instance of DataChannelShutdown is owned by the observer
100 // service, so there is no need to call RemoveObserver here.
101 virtual ~DataChannelShutdown() = default;
102
103 public:
Observe(nsISupports * aSubject,const char * aTopic,const char16_t * aData)104 NS_IMETHOD Observe(nsISupports* aSubject, const char* aTopic,
105 const char16_t* aData) override {
106 if (strcmp(aTopic, "xpcom-will-shutdown") == 0) {
107 LOG(("Shutting down SCTP"));
108 if (sctp_initialized) {
109 usrsctp_finish();
110 sctp_initialized = false;
111 }
112 nsCOMPtr<nsIObserverService> observerService =
113 mozilla::services::GetObserverService();
114 if (!observerService)
115 return NS_ERROR_FAILURE;
116
117 nsresult rv = observerService->RemoveObserver(this,
118 "xpcom-will-shutdown");
119 MOZ_ASSERT(rv == NS_OK);
120 (void) rv;
121 }
122 return NS_OK;
123 }
124 };
125
126 NS_IMPL_ISUPPORTS(DataChannelShutdown, nsIObserver);
127
BufferedMsg(struct sctp_sendv_spa & spa,const char * data,size_t length)128 BufferedMsg::BufferedMsg(struct sctp_sendv_spa &spa, const char *data,
129 size_t length) : mLength(length)
130 {
131 mSpa = new sctp_sendv_spa;
132 *mSpa = spa;
133 auto *tmp = new char[length]; // infallible malloc!
134 memcpy(tmp, data, length);
135 mData = tmp;
136 }
137
~BufferedMsg()138 BufferedMsg::~BufferedMsg()
139 {
140 delete mSpa;
141 delete mData;
142 }
143
144 static int
receive_cb(struct socket * sock,union sctp_sockstore addr,void * data,size_t datalen,struct sctp_rcvinfo rcv,int flags,void * ulp_info)145 receive_cb(struct socket* sock, union sctp_sockstore addr,
146 void *data, size_t datalen,
147 struct sctp_rcvinfo rcv, int flags, void *ulp_info)
148 {
149 DataChannelConnection *connection = static_cast<DataChannelConnection*>(ulp_info);
150 return connection->ReceiveCallback(sock, data, datalen, rcv, flags);
151 }
152
153 static
154 DataChannelConnection *
GetConnectionFromSocket(struct socket * sock)155 GetConnectionFromSocket(struct socket* sock)
156 {
157 struct sockaddr *addrs = nullptr;
158 int naddrs = usrsctp_getladdrs(sock, 0, &addrs);
159 if (naddrs <= 0 || addrs[0].sa_family != AF_CONN) {
160 return nullptr;
161 }
162 // usrsctp_getladdrs() returns the addresses bound to this socket, which
163 // contains the SctpDataMediaChannel* as sconn_addr. Read the pointer,
164 // then free the list of addresses once we have the pointer. We only open
165 // AF_CONN sockets, and they should all have the sconn_addr set to the
166 // pointer that created them, so [0] is as good as any other.
167 struct sockaddr_conn *sconn = reinterpret_cast<struct sockaddr_conn *>(&addrs[0]);
168 DataChannelConnection *connection =
169 reinterpret_cast<DataChannelConnection *>(sconn->sconn_addr);
170 usrsctp_freeladdrs(addrs);
171
172 return connection;
173 }
174
175 // called when the buffer empties to the threshold value
176 static int
threshold_event(struct socket * sock,uint32_t sb_free)177 threshold_event(struct socket* sock, uint32_t sb_free)
178 {
179 DataChannelConnection *connection = GetConnectionFromSocket(sock);
180 if (connection) {
181 LOG(("SendDeferred()"));
182 connection->SendDeferredMessages();
183 } else {
184 LOG(("Can't find connection for socket %p", sock));
185 }
186 return 0;
187 }
188
189 static void
debug_printf(const char * format,...)190 debug_printf(const char *format, ...)
191 {
192 va_list ap;
193 char buffer[1024];
194
195 if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
196 va_start(ap, format);
197 #ifdef _WIN32
198 if (vsnprintf_s(buffer, sizeof(buffer), _TRUNCATE, format, ap) > 0) {
199 #else
200 if (VsprintfLiteral(buffer, format, ap) > 0) {
201 #endif
202 PR_LogPrint("%s", buffer);
203 }
204 va_end(ap);
205 }
206 }
207
208 DataChannelConnection::DataChannelConnection(DataConnectionListener *listener) :
209 mLock("netwerk::sctp::DataChannelConnection")
210 {
211 mState = CLOSED;
212 mSocket = nullptr;
213 mMasterSocket = nullptr;
214 mListener = listener;
215 mLocalPort = 0;
216 mRemotePort = 0;
217 LOG(("Constructor DataChannelConnection=%p, listener=%p", this, mListener.get()));
218 mInternalIOThread = nullptr;
219 }
220
221 DataChannelConnection::~DataChannelConnection()
222 {
223 LOG(("Deleting DataChannelConnection %p", (void *) this));
224 // This may die on the MainThread, or on the STS thread
225 ASSERT_WEBRTC(mState == CLOSED);
226 MOZ_ASSERT(!mMasterSocket);
227 MOZ_ASSERT(mPending.GetSize() == 0);
228
229 // Already disconnected from sigslot/mTransportFlow
230 // TransportFlows must be released from the STS thread
231 if (!IsSTSThread()) {
232 ASSERT_WEBRTC(NS_IsMainThread());
233 if (mTransportFlow) {
234 ASSERT_WEBRTC(mSTS);
235 NS_ProxyRelease(mSTS, mTransportFlow.forget());
236 }
237
238 if (mInternalIOThread) {
239 // Avoid spinning the event thread from here (which if we're mainthread
240 // is in the event loop already)
241 NS_DispatchToMainThread(WrapRunnable(nsCOMPtr<nsIThread>(mInternalIOThread),
242 &nsIThread::Shutdown),
243 NS_DISPATCH_NORMAL);
244 }
245 } else {
246 // on STS, safe to call shutdown
247 if (mInternalIOThread) {
248 mInternalIOThread->Shutdown();
249 }
250 }
251 }
252
253 void
254 DataChannelConnection::Destroy()
255 {
256 // Though it's probably ok to do this and close the sockets;
257 // if we really want it to do true clean shutdowns it can
258 // create a dependant Internal object that would remain around
259 // until the network shut down the association or timed out.
260 LOG(("Destroying DataChannelConnection %p", (void *) this));
261 ASSERT_WEBRTC(NS_IsMainThread());
262 CloseAll();
263
264 MutexAutoLock lock(mLock);
265 // If we had a pending reset, we aren't waiting for it - clear the list so
266 // we can deregister this DataChannelConnection without leaking.
267 ClearResets();
268
269 MOZ_ASSERT(mSTS);
270 ASSERT_WEBRTC(NS_IsMainThread());
271 // Must do this in Destroy() since we may then delete this object.
272 // Do this before dispatching to create a consistent ordering of calls to
273 // the SCTP stack.
274 if (mUsingDtls) {
275 usrsctp_deregister_address(static_cast<void *>(this));
276 LOG(("Deregistered %p from the SCTP stack.", static_cast<void *>(this)));
277 }
278
279 // Finish Destroy on STS thread to avoid bug 876167 - once that's fixed,
280 // the usrsctp_close() calls can move back here (and just proxy the
281 // disconnect_all())
282 RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
283 &DataChannelConnection::DestroyOnSTS,
284 mSocket, mMasterSocket),
285 NS_DISPATCH_NORMAL);
286
287 // These will be released on STS
288 mSocket = nullptr;
289 mMasterSocket = nullptr; // also a flag that we've Destroyed this connection
290
291 // We can't get any more new callbacks from the SCTP library
292 // All existing callbacks have refs to DataChannelConnection
293
294 // nsDOMDataChannel objects have refs to DataChannels that have refs to us
295 }
296
297 void DataChannelConnection::DestroyOnSTS(struct socket *aMasterSocket,
298 struct socket *aSocket)
299 {
300 if (aSocket && aSocket != aMasterSocket)
301 usrsctp_close(aSocket);
302 if (aMasterSocket)
303 usrsctp_close(aMasterSocket);
304
305 disconnect_all();
306 }
307
308 bool
309 DataChannelConnection::Init(unsigned short aPort, uint16_t aNumStreams, bool aUsingDtls)
310 {
311 struct sctp_initmsg initmsg;
312 struct sctp_udpencaps encaps;
313 struct sctp_assoc_value av;
314 struct sctp_event event;
315 socklen_t len;
316
317 uint16_t event_types[] = {SCTP_ASSOC_CHANGE,
318 SCTP_PEER_ADDR_CHANGE,
319 SCTP_REMOTE_ERROR,
320 SCTP_SHUTDOWN_EVENT,
321 SCTP_ADAPTATION_INDICATION,
322 SCTP_SEND_FAILED_EVENT,
323 SCTP_STREAM_RESET_EVENT,
324 SCTP_STREAM_CHANGE_EVENT};
325 {
326 ASSERT_WEBRTC(NS_IsMainThread());
327
328 // MutexAutoLock lock(mLock); Not needed since we're on mainthread always
329 if (!sctp_initialized) {
330 if (aUsingDtls) {
331 LOG(("sctp_init(DTLS)"));
332 #ifdef MOZ_PEERCONNECTION
333 usrsctp_init(0,
334 DataChannelConnection::SctpDtlsOutput,
335 debug_printf
336 );
337 #else
338 NS_ASSERTION(!aUsingDtls, "Trying to use SCTP/DTLS without mtransport");
339 #endif
340 } else {
341 LOG(("sctp_init(%u)", aPort));
342 usrsctp_init(aPort,
343 nullptr,
344 debug_printf
345 );
346 }
347
348 // Set logging to SCTP:LogLevel::Debug to get SCTP debugs
349 if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
350 usrsctp_sysctl_set_sctp_debug_on(SCTP_DEBUG_ALL);
351 }
352
353 usrsctp_sysctl_set_sctp_blackhole(2);
354 // ECN is currently not supported by the Firefox code
355 usrsctp_sysctl_set_sctp_ecn_enable(0);
356 sctp_initialized = true;
357
358 RefPtr<DataChannelShutdown> shutdown = new DataChannelShutdown();
359 shutdown->Init();
360 }
361 }
362
363 // XXX FIX! make this a global we get once
364 // Find the STS thread
365 nsresult rv;
366 mSTS = do_GetService(NS_SOCKETTRANSPORTSERVICE_CONTRACTID, &rv);
367 MOZ_ASSERT(NS_SUCCEEDED(rv));
368
369 // Open sctp with a callback
370 if ((mMasterSocket = usrsctp_socket(
371 aUsingDtls ? AF_CONN : AF_INET,
372 SOCK_STREAM, IPPROTO_SCTP, receive_cb, threshold_event,
373 usrsctp_sysctl_get_sctp_sendspace() / 2, this)) == nullptr) {
374 return false;
375 }
376
377 // Make non-blocking for bind/connect. SCTP over UDP defaults to non-blocking
378 // in associations for normal IO
379 if (usrsctp_set_non_blocking(mMasterSocket, 1) < 0) {
380 LOG(("Couldn't set non_blocking on SCTP socket"));
381 // We can't handle connect() safely if it will block, not that this will
382 // even happen.
383 goto error_cleanup;
384 }
385
386 // Make sure when we close the socket, make sure it doesn't call us back again!
387 // This would cause it try to use an invalid DataChannelConnection pointer
388 struct linger l;
389 l.l_onoff = 1;
390 l.l_linger = 0;
391 if (usrsctp_setsockopt(mMasterSocket, SOL_SOCKET, SO_LINGER,
392 (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
393 LOG(("Couldn't set SO_LINGER on SCTP socket"));
394 // unsafe to allow it to continue if this fails
395 goto error_cleanup;
396 }
397
398 // XXX Consider disabling this when we add proper SDP negotiation.
399 // We may want to leave enabled for supporting 'cloning' of SDP offers, which
400 // implies re-use of the same pseudo-port number, or forcing a renegotiation.
401 {
402 uint32_t on = 1;
403 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REUSE_PORT,
404 (const void *)&on, (socklen_t)sizeof(on)) < 0) {
405 LOG(("Couldn't set SCTP_REUSE_PORT on SCTP socket"));
406 }
407 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_NODELAY,
408 (const void *)&on, (socklen_t)sizeof(on)) < 0) {
409 LOG(("Couldn't set SCTP_NODELAY on SCTP socket"));
410 }
411 }
412
413 if (!aUsingDtls) {
414 memset(&encaps, 0, sizeof(encaps));
415 encaps.sue_address.ss_family = AF_INET;
416 encaps.sue_port = htons(aPort);
417 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_REMOTE_UDP_ENCAPS_PORT,
418 (const void*)&encaps,
419 (socklen_t)sizeof(struct sctp_udpencaps)) < 0) {
420 LOG(("*** failed encaps errno %d", errno));
421 goto error_cleanup;
422 }
423 LOG(("SCTP encapsulation local port %d", aPort));
424 }
425
426 av.assoc_id = SCTP_ALL_ASSOC;
427 av.assoc_value = SCTP_ENABLE_RESET_STREAM_REQ | SCTP_ENABLE_CHANGE_ASSOC_REQ;
428 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET, &av,
429 (socklen_t)sizeof(struct sctp_assoc_value)) < 0) {
430 LOG(("*** failed enable stream reset errno %d", errno));
431 goto error_cleanup;
432 }
433
434 /* Enable the events of interest. */
435 memset(&event, 0, sizeof(event));
436 event.se_assoc_id = SCTP_ALL_ASSOC;
437 event.se_on = 1;
438 for (unsigned short event_type : event_types) {
439 event.se_type = event_type;
440 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_EVENT, &event, sizeof(event)) < 0) {
441 LOG(("*** failed setsockopt SCTP_EVENT errno %d", errno));
442 goto error_cleanup;
443 }
444 }
445
446 // Update number of streams
447 mStreams.AppendElements(aNumStreams);
448 for (uint32_t i = 0; i < aNumStreams; ++i) {
449 mStreams[i] = nullptr;
450 }
451 memset(&initmsg, 0, sizeof(initmsg));
452 len = sizeof(initmsg);
453 if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
454 LOG(("*** failed getsockopt SCTP_INITMSG"));
455 goto error_cleanup;
456 }
457 LOG(("Setting number of SCTP streams to %u, was %u/%u", aNumStreams,
458 initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
459 initmsg.sinit_num_ostreams = aNumStreams;
460 initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
461 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
462 (socklen_t)sizeof(initmsg)) < 0) {
463 LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
464 goto error_cleanup;
465 }
466
467 mSocket = nullptr;
468 if (aUsingDtls) {
469 mUsingDtls = true;
470 usrsctp_register_address(static_cast<void *>(this));
471 LOG(("Registered %p within the SCTP stack.", static_cast<void *>(this)));
472 } else {
473 mUsingDtls = false;
474 }
475 return true;
476
477 error_cleanup:
478 usrsctp_close(mMasterSocket);
479 mMasterSocket = nullptr;
480 mUsingDtls = false;
481 return false;
482 }
483
484 #ifdef MOZ_PEERCONNECTION
485 void
486 DataChannelConnection::SetEvenOdd()
487 {
488 ASSERT_WEBRTC(IsSTSThread());
489
490 TransportLayerDtls *dtls = static_cast<TransportLayerDtls *>(
491 mTransportFlow->GetLayer(TransportLayerDtls::ID()));
492 MOZ_ASSERT(dtls); // DTLS is mandatory
493 mAllocateEven = (dtls->role() == TransportLayerDtls::CLIENT);
494 }
495
496 bool
497 DataChannelConnection::ConnectViaTransportFlow(TransportFlow *aFlow, uint16_t localport, uint16_t remoteport)
498 {
499 LOG(("Connect DTLS local %u, remote %u", localport, remoteport));
500
501 NS_PRECONDITION(mMasterSocket, "SCTP wasn't initialized before ConnectViaTransportFlow!");
502 NS_ENSURE_TRUE(aFlow, false);
503
504 mTransportFlow = aFlow;
505 mLocalPort = localport;
506 mRemotePort = remoteport;
507 mState = CONNECTING;
508
509 RUN_ON_THREAD(mSTS, WrapRunnable(RefPtr<DataChannelConnection>(this),
510 &DataChannelConnection::SetSignals),
511 NS_DISPATCH_NORMAL);
512 return true;
513 }
514
515 void
516 DataChannelConnection::SetSignals()
517 {
518 ASSERT_WEBRTC(IsSTSThread());
519 ASSERT_WEBRTC(mTransportFlow);
520 LOG(("Setting transport signals, state: %d", mTransportFlow->state()));
521 mTransportFlow->SignalPacketReceived.connect(this, &DataChannelConnection::SctpDtlsInput);
522 // SignalStateChange() doesn't call you with the initial state
523 mTransportFlow->SignalStateChange.connect(this, &DataChannelConnection::CompleteConnect);
524 CompleteConnect(mTransportFlow, mTransportFlow->state());
525 }
526
527 void
528 DataChannelConnection::CompleteConnect(TransportFlow *flow, TransportLayer::State state)
529 {
530 LOG(("Data transport state: %d", state));
531 MutexAutoLock lock(mLock);
532 ASSERT_WEBRTC(IsSTSThread());
533 // We should abort connection on TS_ERROR.
534 // Note however that the association will also fail (perhaps with a delay) and
535 // notify us in that way
536 if (state != TransportLayer::TS_OPEN || !mMasterSocket)
537 return;
538
539 struct sockaddr_conn addr;
540 memset(&addr, 0, sizeof(addr));
541 addr.sconn_family = AF_CONN;
542 #if defined(__Userspace_os_Darwin)
543 addr.sconn_len = sizeof(addr);
544 #endif
545 addr.sconn_port = htons(mLocalPort);
546 addr.sconn_addr = static_cast<void *>(this);
547
548 LOG(("Calling usrsctp_bind"));
549 int r = usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
550 sizeof(addr));
551 if (r < 0) {
552 LOG(("usrsctp_bind failed: %d", r));
553 } else {
554 // This is the remote addr
555 addr.sconn_port = htons(mRemotePort);
556 LOG(("Calling usrsctp_connect"));
557 r = usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr),
558 sizeof(addr));
559 if (r >= 0 || errno == EINPROGRESS) {
560 struct sctp_paddrparams paddrparams;
561 socklen_t opt_len;
562
563 memset(&paddrparams, 0, sizeof(struct sctp_paddrparams));
564 memcpy(&paddrparams.spp_address, &addr, sizeof(struct sockaddr_conn));
565 opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
566 r = usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
567 &paddrparams, &opt_len);
568 if (r < 0) {
569 LOG(("usrsctp_getsockopt failed: %d", r));
570 } else {
571 // draft-ietf-rtcweb-data-channel-13 section 5: max initial MTU IPV4 1200, IPV6 1280
572 paddrparams.spp_pathmtu = 1200; // safe for either
573 paddrparams.spp_flags &= ~SPP_PMTUD_ENABLE;
574 paddrparams.spp_flags |= SPP_PMTUD_DISABLE;
575 opt_len = (socklen_t)sizeof(struct sctp_paddrparams);
576 r = usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS,
577 &paddrparams, opt_len);
578 if (r < 0) {
579 LOG(("usrsctp_getsockopt failed: %d", r));
580 } else {
581 LOG(("usrsctp: PMTUD disabled, MTU set to %u", paddrparams.spp_pathmtu));
582 }
583 }
584 }
585 if (r < 0) {
586 if (errno == EINPROGRESS) {
587 // non-blocking
588 return;
589 } else {
590 LOG(("usrsctp_connect failed: %d", errno));
591 mState = CLOSED;
592 }
593 } else {
594 // We set Even/Odd and fire ON_CONNECTION via SCTP_COMM_UP when we get that
595 // This also avoids issues with calling TransportFlow stuff on Mainthread
596 return;
597 }
598 }
599 // Note: currently this doesn't actually notify the application
600 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
601 DataChannelOnMessageAvailable::ON_CONNECTION,
602 this)));
603 return;
604 }
605
606 // Process any pending Opens
607 void
608 DataChannelConnection::ProcessQueuedOpens()
609 {
610 // The nsDeque holds channels with an AddRef applied. Another reference
611 // (may) be held by the DOMDataChannel, unless it's been GC'd. No other
612 // references should exist.
613
614 // Can't copy nsDeque's. Move into temp array since any that fail will
615 // go back to mPending
616 nsDeque temp;
617 DataChannel *temp_channel; // really already_AddRefed<>
618 while (nullptr != (temp_channel = static_cast<DataChannel *>(mPending.PopFront()))) {
619 temp.Push(static_cast<void *>(temp_channel));
620 }
621
622 RefPtr<DataChannel> channel;
623 // All these entries have an AddRef(); make that explicit now via the dont_AddRef()
624 while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(temp.PopFront())))) {
625 if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
626 LOG(("Processing queued open for %p (%u)", channel.get(), channel->mStream));
627 channel->mFlags &= ~DATA_CHANNEL_FLAGS_FINISH_OPEN;
628 // OpenFinish returns a reference itself, so we need to take it can Release it
629 channel = OpenFinish(channel.forget()); // may reset the flag and re-push
630 } else {
631 NS_ASSERTION(false, "How did a DataChannel get queued without the FINISH_OPEN flag?");
632 }
633 }
634
635 }
636 void
637 DataChannelConnection::SctpDtlsInput(TransportFlow *flow,
638 const unsigned char *data, size_t len)
639 {
640 if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
641 char *buf;
642
643 if ((buf = usrsctp_dumppacket((void *)data, len, SCTP_DUMP_INBOUND)) != nullptr) {
644 PR_LogPrint("%s", buf);
645 usrsctp_freedumpbuffer(buf);
646 }
647 }
648 // Pass the data to SCTP
649 usrsctp_conninput(static_cast<void *>(this), data, len, 0);
650 }
651
652 int
653 DataChannelConnection::SendPacket(unsigned char data[], size_t len, bool release)
654 {
655 //LOG(("%p: SCTP/DTLS sent %ld bytes", this, len));
656 int res = mTransportFlow->SendPacket(data, len) < 0 ? 1 : 0;
657 if (release)
658 delete [] data;
659 return res;
660 }
661
662 /* static */
663 int
664 DataChannelConnection::SctpDtlsOutput(void *addr, void *buffer, size_t length,
665 uint8_t tos, uint8_t set_df)
666 {
667 DataChannelConnection *peer = static_cast<DataChannelConnection *>(addr);
668 int res;
669
670 if (MOZ_LOG_TEST(gSCTPLog, LogLevel::Debug)) {
671 char *buf;
672
673 if ((buf = usrsctp_dumppacket(buffer, length, SCTP_DUMP_OUTBOUND)) != nullptr) {
674 PR_LogPrint("%s", buf);
675 usrsctp_freedumpbuffer(buf);
676 }
677 }
678 // We're async proxying even if on the STSThread because this is called
679 // with internal SCTP locks held in some cases (such as in usrsctp_connect()).
680 // SCTP has an option for Apple, on IP connections only, to release at least
681 // one of the locks before calling a packet output routine; with changes to
682 // the underlying SCTP stack this might remove the need to use an async proxy.
683 if ((false /*peer->IsSTSThread()*/)) {
684 res = peer->SendPacket(static_cast<unsigned char *>(buffer), length, false);
685 } else {
686 auto *data = new unsigned char[length];
687 memcpy(data, buffer, length);
688 // Commented out since we have to Dispatch SendPacket to avoid deadlock"
689 // res = -1;
690
691 // XXX It might be worthwhile to add an assertion against the thread
692 // somehow getting into the DataChannel/SCTP code again, as
693 // DISPATCH_SYNC is not fully blocking. This may be tricky, as it
694 // needs to be a per-thread check, not a global.
695 peer->mSTS->Dispatch(WrapRunnable(
696 RefPtr<DataChannelConnection>(peer),
697 &DataChannelConnection::SendPacket, data, length, true),
698 NS_DISPATCH_NORMAL);
699 res = 0; // cheat! Packets can always be dropped later anyways
700 }
701 return res;
702 }
703 #endif
704
705 #ifdef ALLOW_DIRECT_SCTP_LISTEN_CONNECT
706 // listen for incoming associations
707 // Blocks! - Don't call this from main thread!
708
709 #error This code will not work as-is since SetEvenOdd() runs on Mainthread
710
711 bool
712 DataChannelConnection::Listen(unsigned short port)
713 {
714 struct sockaddr_in addr;
715 socklen_t addr_len;
716
717 NS_WARNING_ASSERTION(!NS_IsMainThread(),
718 "Blocks, do not call from main thread!!!");
719
720 /* Acting as the 'server' */
721 memset((void *)&addr, 0, sizeof(addr));
722 #ifdef HAVE_SIN_LEN
723 addr.sin_len = sizeof(struct sockaddr_in);
724 #endif
725 addr.sin_family = AF_INET;
726 addr.sin_port = htons(port);
727 addr.sin_addr.s_addr = htonl(INADDR_ANY);
728 LOG(("Waiting for connections on port %u", ntohs(addr.sin_port)));
729 mState = CONNECTING;
730 if (usrsctp_bind(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr), sizeof(struct sockaddr_in)) < 0) {
731 LOG(("***Failed userspace_bind"));
732 return false;
733 }
734 if (usrsctp_listen(mMasterSocket, 1) < 0) {
735 LOG(("***Failed userspace_listen"));
736 return false;
737 }
738
739 LOG(("Accepting connection"));
740 addr_len = 0;
741 if ((mSocket = usrsctp_accept(mMasterSocket, nullptr, &addr_len)) == nullptr) {
742 LOG(("***Failed accept"));
743 return false;
744 }
745 mState = OPEN;
746
747 struct linger l;
748 l.l_onoff = 1;
749 l.l_linger = 0;
750 if (usrsctp_setsockopt(mSocket, SOL_SOCKET, SO_LINGER,
751 (const void *)&l, (socklen_t)sizeof(struct linger)) < 0) {
752 LOG(("Couldn't set SO_LINGER on SCTP socket"));
753 }
754
755 SetEvenOdd();
756
757 // Notify Connection open
758 // XXX We need to make sure connection sticks around until the message is delivered
759 LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
760 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
761 DataChannelOnMessageAvailable::ON_CONNECTION,
762 this, (DataChannel *) nullptr)));
763 return true;
764 }
765
766 // Blocks! - Don't call this from main thread!
767 bool
768 DataChannelConnection::Connect(const char *addr, unsigned short port)
769 {
770 struct sockaddr_in addr4;
771 struct sockaddr_in6 addr6;
772
773 NS_WARNING_ASSERTION(!NS_IsMainThread(),
774 "Blocks, do not call from main thread!!!");
775
776 /* Acting as the connector */
777 LOG(("Connecting to %s, port %u", addr, port));
778 memset((void *)&addr4, 0, sizeof(struct sockaddr_in));
779 memset((void *)&addr6, 0, sizeof(struct sockaddr_in6));
780 #ifdef HAVE_SIN_LEN
781 addr4.sin_len = sizeof(struct sockaddr_in);
782 #endif
783 #ifdef HAVE_SIN6_LEN
784 addr6.sin6_len = sizeof(struct sockaddr_in6);
785 #endif
786 addr4.sin_family = AF_INET;
787 addr6.sin6_family = AF_INET6;
788 addr4.sin_port = htons(port);
789 addr6.sin6_port = htons(port);
790 mState = CONNECTING;
791
792 #if !defined(__Userspace_os_Windows)
793 if (inet_pton(AF_INET6, addr, &addr6.sin6_addr) == 1) {
794 if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
795 LOG(("*** Failed userspace_connect"));
796 return false;
797 }
798 } else if (inet_pton(AF_INET, addr, &addr4.sin_addr) == 1) {
799 if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
800 LOG(("*** Failed userspace_connect"));
801 return false;
802 }
803 } else {
804 LOG(("*** Illegal destination address."));
805 }
806 #else
807 {
808 struct sockaddr_storage ss;
809 int sslen = sizeof(ss);
810
811 if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET6, nullptr, (struct sockaddr*)&ss, &sslen)) {
812 addr6.sin6_addr = (reinterpret_cast<struct sockaddr_in6 *>(&ss))->sin6_addr;
813 if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr6), sizeof(struct sockaddr_in6)) < 0) {
814 LOG(("*** Failed userspace_connect"));
815 return false;
816 }
817 } else if (!WSAStringToAddressA(const_cast<char *>(addr), AF_INET, nullptr, (struct sockaddr*)&ss, &sslen)) {
818 addr4.sin_addr = (reinterpret_cast<struct sockaddr_in *>(&ss))->sin_addr;
819 if (usrsctp_connect(mMasterSocket, reinterpret_cast<struct sockaddr *>(&addr4), sizeof(struct sockaddr_in)) < 0) {
820 LOG(("*** Failed userspace_connect"));
821 return false;
822 }
823 } else {
824 LOG(("*** Illegal destination address."));
825 }
826 }
827 #endif
828
829 mSocket = mMasterSocket;
830
831 LOG(("connect() succeeded! Entering connected mode"));
832 mState = OPEN;
833
834 SetEvenOdd();
835
836 // Notify Connection open
837 // XXX We need to make sure connection sticks around until the message is delivered
838 LOG(("%s: sending ON_CONNECTION for %p", __FUNCTION__, this));
839 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
840 DataChannelOnMessageAvailable::ON_CONNECTION,
841 this, (DataChannel *) nullptr)));
842 return true;
843 }
844 #endif
845
846 DataChannel *
847 DataChannelConnection::FindChannelByStream(uint16_t stream)
848 {
849 return mStreams.SafeElementAt(stream);
850 }
851
852 uint16_t
853 DataChannelConnection::FindFreeStream()
854 {
855 uint32_t i, j, limit;
856
857 limit = mStreams.Length();
858 if (limit > MAX_NUM_STREAMS)
859 limit = MAX_NUM_STREAMS;
860
861 for (i = (mAllocateEven ? 0 : 1); i < limit; i += 2) {
862 if (!mStreams[i]) {
863 // Verify it's not still in the process of closing
864 for (j = 0; j < mStreamsResetting.Length(); ++j) {
865 if (mStreamsResetting[j] == i) {
866 break;
867 }
868 }
869 if (j == mStreamsResetting.Length())
870 break;
871 }
872 }
873 if (i >= limit) {
874 return INVALID_STREAM;
875 }
876 return i;
877 }
878
879 bool
880 DataChannelConnection::RequestMoreStreams(int32_t aNeeded)
881 {
882 struct sctp_status status;
883 struct sctp_add_streams sas;
884 uint32_t outStreamsNeeded;
885 socklen_t len;
886
887 if (aNeeded + mStreams.Length() > MAX_NUM_STREAMS) {
888 aNeeded = MAX_NUM_STREAMS - mStreams.Length();
889 }
890 if (aNeeded <= 0) {
891 return false;
892 }
893
894 len = (socklen_t)sizeof(struct sctp_status);
895 if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_STATUS, &status, &len) < 0) {
896 LOG(("***failed: getsockopt SCTP_STATUS"));
897 return false;
898 }
899 outStreamsNeeded = aNeeded; // number to add
900
901 // Note: if multiple channel opens happen when we don't have enough space,
902 // we'll call RequestMoreStreams() multiple times
903 memset(&sas, 0, sizeof(sas));
904 sas.sas_instrms = 0;
905 sas.sas_outstrms = (uint16_t)outStreamsNeeded; /* XXX error handling */
906 // Doesn't block, we get an event when it succeeds or fails
907 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_ADD_STREAMS, &sas,
908 (socklen_t) sizeof(struct sctp_add_streams)) < 0) {
909 if (errno == EALREADY) {
910 LOG(("Already have %u output streams", outStreamsNeeded));
911 return true;
912 }
913
914 LOG(("***failed: setsockopt ADD errno=%d", errno));
915 return false;
916 }
917 LOG(("Requested %u more streams", outStreamsNeeded));
918 // We add to mStreams when we get a SCTP_STREAM_CHANGE_EVENT and the
919 // values are larger than mStreams.Length()
920 return true;
921 }
922
923 int32_t
924 DataChannelConnection::SendControlMessage(void *msg, uint32_t len, uint16_t stream)
925 {
926 struct sctp_sndinfo sndinfo;
927
928 // Note: Main-thread IO, but doesn't block
929 memset(&sndinfo, 0, sizeof(struct sctp_sndinfo));
930 sndinfo.snd_sid = stream;
931 sndinfo.snd_ppid = htonl(DATA_CHANNEL_PPID_CONTROL);
932 if (usrsctp_sendv(mSocket, msg, len, nullptr, 0,
933 &sndinfo, (socklen_t)sizeof(struct sctp_sndinfo),
934 SCTP_SENDV_SNDINFO, 0) < 0) {
935 //LOG(("***failed: sctp_sendv")); don't log because errno is a return!
936 return (0);
937 }
938 return (1);
939 }
940
941 int32_t
942 DataChannelConnection::SendOpenAckMessage(uint16_t stream)
943 {
944 struct rtcweb_datachannel_ack ack;
945
946 memset(&ack, 0, sizeof(struct rtcweb_datachannel_ack));
947 ack.msg_type = DATA_CHANNEL_ACK;
948
949 return SendControlMessage(&ack, sizeof(ack), stream);
950 }
951
952 int32_t
953 DataChannelConnection::SendOpenRequestMessage(const nsACString& label,
954 const nsACString& protocol,
955 uint16_t stream, bool unordered,
956 uint16_t prPolicy, uint32_t prValue)
957 {
958 const int label_len = label.Length(); // not including nul
959 const int proto_len = protocol.Length(); // not including nul
960 // careful - request struct include one char for the label
961 const int req_size = sizeof(struct rtcweb_datachannel_open_request) - 1 +
962 label_len + proto_len;
963 struct rtcweb_datachannel_open_request *req =
964 (struct rtcweb_datachannel_open_request*) moz_xmalloc(req_size);
965
966 memset(req, 0, req_size);
967 req->msg_type = DATA_CHANNEL_OPEN_REQUEST;
968 switch (prPolicy) {
969 case SCTP_PR_SCTP_NONE:
970 req->channel_type = DATA_CHANNEL_RELIABLE;
971 break;
972 case SCTP_PR_SCTP_TTL:
973 req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_TIMED;
974 break;
975 case SCTP_PR_SCTP_RTX:
976 req->channel_type = DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT;
977 break;
978 default:
979 // FIX! need to set errno! Or make all these SendXxxx() funcs return 0 or errno!
980 free(req);
981 return (0);
982 }
983 if (unordered) {
984 // Per the current types, all differ by 0x80 between ordered and unordered
985 req->channel_type |= 0x80; // NOTE: be careful if new types are added in the future
986 }
987
988 req->reliability_param = htonl(prValue);
989 req->priority = htons(0); /* XXX: add support */
990 req->label_length = htons(label_len);
991 req->protocol_length = htons(proto_len);
992 memcpy(&req->label[0], PromiseFlatCString(label).get(), label_len);
993 memcpy(&req->label[label_len], PromiseFlatCString(protocol).get(), proto_len);
994
995 int32_t result = SendControlMessage(req, req_size, stream);
996
997 free(req);
998 return result;
999 }
1000
1001 // XXX This should use a separate thread (outbound queue) which should
1002 // select() to know when to *try* to send data to the socket again.
1003 // Alternatively, it can use a timeout, but that's guaranteed to be wrong
1004 // (just not sure in what direction). We could re-implement NSPR's
1005 // PR_POLL_WRITE/etc handling... with a lot of work.
1006
1007 // Better yet, use the SCTP stack's notifications on buffer state to avoid
1008 // filling the SCTP's buffers.
1009
1010 // returns if we're still blocked or not
1011 bool
1012 DataChannelConnection::SendDeferredMessages()
1013 {
1014 uint32_t i;
1015 RefPtr<DataChannel> channel; // we may null out the refs to this
1016 bool still_blocked = false;
1017
1018 // This may block while something is modifying channels, but should not block for IO
1019 MutexAutoLock lock(mLock);
1020
1021 // XXX For total fairness, on a still_blocked we'd start next time at the
1022 // same index. Sorry, not going to bother for now.
1023 for (i = 0; i < mStreams.Length(); ++i) {
1024 channel = mStreams[i];
1025 if (!channel)
1026 continue;
1027
1028 // Only one of these should be set....
1029 if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_REQ) {
1030 if (SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
1031 channel->mStream,
1032 channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED,
1033 channel->mPrPolicy, channel->mPrValue)) {
1034 channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_REQ;
1035
1036 channel->mState = OPEN;
1037 channel->mReady = true;
1038 LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
1039 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1040 DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
1041 channel)));
1042 } else {
1043 if (errno == EAGAIN || errno == EWOULDBLOCK) {
1044 still_blocked = true;
1045 } else {
1046 // Close the channel, inform the user
1047 mStreams[channel->mStream] = nullptr;
1048 channel->mState = CLOSED;
1049 // Don't need to reset; we didn't open it
1050 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1051 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
1052 channel)));
1053 }
1054 }
1055 }
1056 if (still_blocked)
1057 break;
1058
1059 if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_ACK) {
1060 if (SendOpenAckMessage(channel->mStream)) {
1061 channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_ACK;
1062 } else {
1063 if (errno == EAGAIN || errno == EWOULDBLOCK) {
1064 still_blocked = true;
1065 } else {
1066 // Close the channel, inform the user
1067 CloseInt(channel);
1068 // XXX send error via DataChannelOnMessageAvailable (bug 843625)
1069 }
1070 }
1071 }
1072 if (still_blocked)
1073 break;
1074
1075 if (channel->mFlags & DATA_CHANNEL_FLAGS_SEND_DATA) {
1076 bool failed_send = false;
1077 int32_t result;
1078
1079 if (channel->mState == CLOSED || channel->mState == CLOSING) {
1080 channel->mBufferedData.Clear();
1081 }
1082
1083 uint32_t buffered_amount = channel->GetBufferedAmountLocked();
1084 uint32_t threshold = channel->GetBufferedAmountLowThreshold();
1085 bool was_over_threshold = buffered_amount >= threshold;
1086
1087 while (!channel->mBufferedData.IsEmpty() &&
1088 !failed_send) {
1089 struct sctp_sendv_spa *spa = channel->mBufferedData[0]->mSpa;
1090 const char *data = channel->mBufferedData[0]->mData;
1091 size_t len = channel->mBufferedData[0]->mLength;
1092
1093 // SCTP will return EMSGSIZE if the message is bigger than the buffer
1094 // size (or EAGAIN if there isn't space)
1095 if ((result = usrsctp_sendv(mSocket, data, len,
1096 nullptr, 0,
1097 (void *)spa, (socklen_t)sizeof(struct sctp_sendv_spa),
1098 SCTP_SENDV_SPA,
1099 0)) < 0) {
1100 if (errno == EAGAIN || errno == EWOULDBLOCK) {
1101 // leave queued for resend
1102 failed_send = true;
1103 LOG(("queue full again when resending %d bytes (%d)", len, result));
1104 } else {
1105 LOG(("error %d re-sending string", errno));
1106 failed_send = true;
1107 }
1108 } else {
1109 LOG(("Resent buffer of %d bytes (%d)", len, result));
1110 // In theory this could underflow if >4GB was buffered and re
1111 // truncated in GetBufferedAmount(), but this won't cause any problems.
1112 buffered_amount -= channel->mBufferedData[0]->mLength;
1113 channel->mBufferedData.RemoveElementAt(0);
1114 // can never fire with default threshold of 0
1115 if (was_over_threshold && buffered_amount < threshold) {
1116 LOG(("%s: sending BUFFER_LOW_THRESHOLD for %s/%s: %u", __FUNCTION__,
1117 channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
1118 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1119 DataChannelOnMessageAvailable::BUFFER_LOW_THRESHOLD,
1120 this, channel)));
1121 was_over_threshold = false;
1122 }
1123 if (buffered_amount == 0) {
1124 // buffered-to-not-buffered transition; tell the DOM code in case this makes it
1125 // available for GC
1126 LOG(("%s: sending NO_LONGER_BUFFERED for %s/%s: %u", __FUNCTION__,
1127 channel->mLabel.get(), channel->mProtocol.get(), channel->mStream));
1128 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1129 DataChannelOnMessageAvailable::NO_LONGER_BUFFERED,
1130 this, channel)));
1131 }
1132 }
1133 }
1134 if (channel->mBufferedData.IsEmpty())
1135 channel->mFlags &= ~DATA_CHANNEL_FLAGS_SEND_DATA;
1136 else
1137 still_blocked = true;
1138 }
1139 if (still_blocked)
1140 break;
1141 }
1142
1143 return still_blocked;
1144 }
1145
1146 void
1147 DataChannelConnection::HandleOpenRequestMessage(const struct rtcweb_datachannel_open_request *req,
1148 size_t length,
1149 uint16_t stream)
1150 {
1151 RefPtr<DataChannel> channel;
1152 uint32_t prValue;
1153 uint16_t prPolicy;
1154 uint32_t flags;
1155
1156 mLock.AssertCurrentThreadOwns();
1157
1158 if (length != (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)) {
1159 LOG(("%s: Inconsistent length: %u, should be %u", __FUNCTION__, length,
1160 (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length)));
1161 if (length < (sizeof(*req) - 1) + ntohs(req->label_length) + ntohs(req->protocol_length))
1162 return;
1163 }
1164
1165 LOG(("%s: length %u, sizeof(*req) = %u", __FUNCTION__, length, sizeof(*req)));
1166
1167 switch (req->channel_type) {
1168 case DATA_CHANNEL_RELIABLE:
1169 case DATA_CHANNEL_RELIABLE_UNORDERED:
1170 prPolicy = SCTP_PR_SCTP_NONE;
1171 break;
1172 case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
1173 case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT_UNORDERED:
1174 prPolicy = SCTP_PR_SCTP_RTX;
1175 break;
1176 case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
1177 case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED_UNORDERED:
1178 prPolicy = SCTP_PR_SCTP_TTL;
1179 break;
1180 default:
1181 LOG(("Unknown channel type", req->channel_type));
1182 /* XXX error handling */
1183 return;
1184 }
1185 prValue = ntohl(req->reliability_param);
1186 flags = (req->channel_type & 0x80) ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
1187
1188 if ((channel = FindChannelByStream(stream))) {
1189 if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
1190 LOG(("ERROR: HandleOpenRequestMessage: channel for stream %u is in state %d instead of CLOSED.",
1191 stream, channel->mState));
1192 /* XXX: some error handling */
1193 } else {
1194 LOG(("Open for externally negotiated channel %u", stream));
1195 // XXX should also check protocol, maybe label
1196 if (prPolicy != channel->mPrPolicy ||
1197 prValue != channel->mPrValue ||
1198 flags != (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED))
1199 {
1200 LOG(("WARNING: external negotiation mismatch with OpenRequest:"
1201 "channel %u, policy %u/%u, value %u/%u, flags %x/%x",
1202 stream, prPolicy, channel->mPrPolicy,
1203 prValue, channel->mPrValue, flags, channel->mFlags));
1204 }
1205 }
1206 return;
1207 }
1208 if (stream >= mStreams.Length()) {
1209 LOG(("%s: stream %u out of bounds (%u)", __FUNCTION__, stream, mStreams.Length()));
1210 return;
1211 }
1212
1213 nsCString label(nsDependentCSubstring(&req->label[0], ntohs(req->label_length)));
1214 nsCString protocol(nsDependentCSubstring(&req->label[ntohs(req->label_length)],
1215 ntohs(req->protocol_length)));
1216
1217 channel = new DataChannel(this,
1218 stream,
1219 DataChannel::CONNECTING,
1220 label,
1221 protocol,
1222 prPolicy, prValue,
1223 flags,
1224 nullptr, nullptr);
1225 mStreams[stream] = channel;
1226
1227 channel->mState = DataChannel::WAITING_TO_OPEN;
1228
1229 LOG(("%s: sending ON_CHANNEL_CREATED for %s/%s: %u (state %u)", __FUNCTION__,
1230 channel->mLabel.get(), channel->mProtocol.get(), stream, channel->mState));
1231 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1232 DataChannelOnMessageAvailable::ON_CHANNEL_CREATED,
1233 this, channel)));
1234
1235 LOG(("%s: deferring sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
1236
1237 if (!SendOpenAckMessage(stream)) {
1238 // XXX Only on EAGAIN!? And if not, then close the channel??
1239 channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_ACK;
1240 // Note: we're locked, so there's no danger of a race with the
1241 // buffer-threshold callback
1242 }
1243
1244 // Now process any queued data messages for the channel (which will
1245 // themselves likely get queued until we leave WAITING_TO_OPEN, plus any
1246 // more that come in before that happens)
1247 DeliverQueuedData(stream);
1248 }
1249
1250 // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
1251 // That would make this code moot. Keep it for now for backwards compatibility.
1252 void
1253 DataChannelConnection::DeliverQueuedData(uint16_t stream)
1254 {
1255 mLock.AssertCurrentThreadOwns();
1256
1257 uint32_t i = 0;
1258 while (i < mQueuedData.Length()) {
1259 // Careful! we may modify the array length from within the loop!
1260 if (mQueuedData[i]->mStream == stream) {
1261 LOG(("Delivering queued data for stream %u, length %u",
1262 stream, (unsigned int) mQueuedData[i]->mLength));
1263 // Deliver the queued data
1264 HandleDataMessage(mQueuedData[i]->mPpid,
1265 mQueuedData[i]->mData, mQueuedData[i]->mLength,
1266 mQueuedData[i]->mStream);
1267 mQueuedData.RemoveElementAt(i);
1268 continue; // don't bump index since we removed the element
1269 }
1270 i++;
1271 }
1272 }
1273
1274 void
1275 DataChannelConnection::HandleOpenAckMessage(const struct rtcweb_datachannel_ack *ack,
1276 size_t length, uint16_t stream)
1277 {
1278 DataChannel *channel;
1279
1280 mLock.AssertCurrentThreadOwns();
1281
1282 channel = FindChannelByStream(stream);
1283 NS_ENSURE_TRUE_VOID(channel);
1284
1285 LOG(("OpenAck received for stream %u, waiting=%d", stream,
1286 (channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK) ? 1 : 0));
1287
1288 channel->mFlags &= ~DATA_CHANNEL_FLAGS_WAITING_ACK;
1289 }
1290
1291 void
1292 DataChannelConnection::HandleUnknownMessage(uint32_t ppid, size_t length, uint16_t stream)
1293 {
1294 /* XXX: Send an error message? */
1295 LOG(("unknown DataChannel message received: %u, len %ld on stream %lu", ppid, length, stream));
1296 // XXX Log to JS error console if possible
1297 }
1298
1299 void
1300 DataChannelConnection::HandleDataMessage(uint32_t ppid,
1301 const void *data, size_t length,
1302 uint16_t stream)
1303 {
1304 DataChannel *channel;
1305 const char *buffer = (const char *) data;
1306
1307 mLock.AssertCurrentThreadOwns();
1308
1309 channel = FindChannelByStream(stream);
1310
1311 // XXX A closed channel may trip this... check
1312 // NOTE: the updated spec from the IETF says we should set in-order until we receive an ACK.
1313 // That would make this code moot. Keep it for now for backwards compatibility.
1314 if (!channel) {
1315 // In the updated 0-RTT open case, the sender can send data immediately
1316 // after Open, and doesn't set the in-order bit (since we don't have a
1317 // response or ack). Also, with external negotiation, data can come in
1318 // before we're told about the external negotiation. We need to buffer
1319 // data until either a) Open comes in, if the ordering get messed up,
1320 // or b) the app tells us this channel was externally negotiated. When
1321 // these occur, we deliver the data.
1322
1323 // Since this is rare and non-performance, keep a single list of queued
1324 // data messages to deliver once the channel opens.
1325 LOG(("Queuing data for stream %u, length %u", stream, length));
1326 // Copies data
1327 mQueuedData.AppendElement(new QueuedDataMessage(stream, ppid, data, length));
1328 return;
1329 }
1330
1331 // XXX should this be a simple if, no warnings/debugbreaks?
1332 NS_ENSURE_TRUE_VOID(channel->mState != CLOSED);
1333
1334 {
1335 nsAutoCString recvData(buffer, length); // copies (<64) or allocates
1336 bool is_binary = true;
1337
1338 if (ppid == DATA_CHANNEL_PPID_DOMSTRING ||
1339 ppid == DATA_CHANNEL_PPID_DOMSTRING_LAST) {
1340 is_binary = false;
1341 }
1342 if (is_binary != channel->mIsRecvBinary && !channel->mRecvBuffer.IsEmpty()) {
1343 NS_WARNING("DataChannel message aborted by fragment type change!");
1344 channel->mRecvBuffer.Truncate(0);
1345 }
1346 channel->mIsRecvBinary = is_binary;
1347
1348 switch (ppid) {
1349 case DATA_CHANNEL_PPID_DOMSTRING:
1350 case DATA_CHANNEL_PPID_BINARY:
1351 channel->mRecvBuffer += recvData;
1352 LOG(("DataChannel: Partial %s message of length %lu (total %u) on channel id %u",
1353 is_binary ? "binary" : "string", length, channel->mRecvBuffer.Length(),
1354 channel->mStream));
1355 return; // Not ready to notify application
1356
1357 case DATA_CHANNEL_PPID_DOMSTRING_LAST:
1358 LOG(("DataChannel: String message received of length %lu on channel %u",
1359 length, channel->mStream));
1360 if (!channel->mRecvBuffer.IsEmpty()) {
1361 channel->mRecvBuffer += recvData;
1362 LOG(("%s: sending ON_DATA (string fragmented) for %p", __FUNCTION__, channel));
1363 channel->SendOrQueue(new DataChannelOnMessageAvailable(
1364 DataChannelOnMessageAvailable::ON_DATA, this,
1365 channel, channel->mRecvBuffer, -1));
1366 channel->mRecvBuffer.Truncate(0);
1367 return;
1368 }
1369 // else send using recvData normally
1370 length = -1; // Flag for DOMString
1371
1372 // WebSockets checks IsUTF8() here; we can try to deliver it
1373 break;
1374
1375 case DATA_CHANNEL_PPID_BINARY_LAST:
1376 LOG(("DataChannel: Received binary message of length %lu on channel id %u",
1377 length, channel->mStream));
1378 if (!channel->mRecvBuffer.IsEmpty()) {
1379 channel->mRecvBuffer += recvData;
1380 LOG(("%s: sending ON_DATA (binary fragmented) for %p", __FUNCTION__, channel));
1381 channel->SendOrQueue(new DataChannelOnMessageAvailable(
1382 DataChannelOnMessageAvailable::ON_DATA, this,
1383 channel, channel->mRecvBuffer,
1384 channel->mRecvBuffer.Length()));
1385 channel->mRecvBuffer.Truncate(0);
1386 return;
1387 }
1388 // else send using recvData normally
1389 break;
1390
1391 default:
1392 NS_ERROR("Unknown data PPID");
1393 return;
1394 }
1395 /* Notify onmessage */
1396 LOG(("%s: sending ON_DATA for %p", __FUNCTION__, channel));
1397 channel->SendOrQueue(new DataChannelOnMessageAvailable(
1398 DataChannelOnMessageAvailable::ON_DATA, this,
1399 channel, recvData, length));
1400 }
1401 }
1402
1403 // Called with mLock locked!
1404 void
1405 DataChannelConnection::HandleMessage(const void *buffer, size_t length, uint32_t ppid, uint16_t stream)
1406 {
1407 const struct rtcweb_datachannel_open_request *req;
1408 const struct rtcweb_datachannel_ack *ack;
1409
1410 mLock.AssertCurrentThreadOwns();
1411
1412 switch (ppid) {
1413 case DATA_CHANNEL_PPID_CONTROL:
1414 req = static_cast<const struct rtcweb_datachannel_open_request *>(buffer);
1415
1416 NS_ENSURE_TRUE_VOID(length >= sizeof(*ack)); // smallest message
1417 switch (req->msg_type) {
1418 case DATA_CHANNEL_OPEN_REQUEST:
1419 // structure includes a possibly-unused char label[1] (in a packed structure)
1420 NS_ENSURE_TRUE_VOID(length >= sizeof(*req) - 1);
1421
1422 HandleOpenRequestMessage(req, length, stream);
1423 break;
1424 case DATA_CHANNEL_ACK:
1425 // >= sizeof(*ack) checked above
1426
1427 ack = static_cast<const struct rtcweb_datachannel_ack *>(buffer);
1428 HandleOpenAckMessage(ack, length, stream);
1429 break;
1430 default:
1431 HandleUnknownMessage(ppid, length, stream);
1432 break;
1433 }
1434 break;
1435 case DATA_CHANNEL_PPID_DOMSTRING:
1436 case DATA_CHANNEL_PPID_DOMSTRING_LAST:
1437 case DATA_CHANNEL_PPID_BINARY:
1438 case DATA_CHANNEL_PPID_BINARY_LAST:
1439 HandleDataMessage(ppid, buffer, length, stream);
1440 break;
1441 default:
1442 LOG(("Message of length %lu, PPID %u on stream %u received.",
1443 length, ppid, stream));
1444 break;
1445 }
1446 }
1447
1448 void
1449 DataChannelConnection::HandleAssociationChangeEvent(const struct sctp_assoc_change *sac)
1450 {
1451 uint32_t i, n;
1452
1453 switch (sac->sac_state) {
1454 case SCTP_COMM_UP:
1455 LOG(("Association change: SCTP_COMM_UP"));
1456 if (mState == CONNECTING) {
1457 mSocket = mMasterSocket;
1458 mState = OPEN;
1459
1460 SetEvenOdd();
1461
1462 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1463 DataChannelOnMessageAvailable::ON_CONNECTION,
1464 this)));
1465 LOG(("DTLS connect() succeeded! Entering connected mode"));
1466
1467 // Open any streams pending...
1468 ProcessQueuedOpens();
1469
1470 } else if (mState == OPEN) {
1471 LOG(("DataConnection Already OPEN"));
1472 } else {
1473 LOG(("Unexpected state: %d", mState));
1474 }
1475 break;
1476 case SCTP_COMM_LOST:
1477 LOG(("Association change: SCTP_COMM_LOST"));
1478 // This association is toast, so also close all the channels -- from mainthread!
1479 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1480 DataChannelOnMessageAvailable::ON_DISCONNECTED,
1481 this)));
1482 break;
1483 case SCTP_RESTART:
1484 LOG(("Association change: SCTP_RESTART"));
1485 break;
1486 case SCTP_SHUTDOWN_COMP:
1487 LOG(("Association change: SCTP_SHUTDOWN_COMP"));
1488 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1489 DataChannelOnMessageAvailable::ON_DISCONNECTED,
1490 this)));
1491 break;
1492 case SCTP_CANT_STR_ASSOC:
1493 LOG(("Association change: SCTP_CANT_STR_ASSOC"));
1494 break;
1495 default:
1496 LOG(("Association change: UNKNOWN"));
1497 break;
1498 }
1499 LOG(("Association change: streams (in/out) = (%u/%u)",
1500 sac->sac_inbound_streams, sac->sac_outbound_streams));
1501
1502 NS_ENSURE_TRUE_VOID(sac);
1503 n = sac->sac_length - sizeof(*sac);
1504 if (((sac->sac_state == SCTP_COMM_UP) ||
1505 (sac->sac_state == SCTP_RESTART)) && (n > 0)) {
1506 for (i = 0; i < n; ++i) {
1507 switch (sac->sac_info[i]) {
1508 case SCTP_ASSOC_SUPPORTS_PR:
1509 LOG(("Supports: PR"));
1510 break;
1511 case SCTP_ASSOC_SUPPORTS_AUTH:
1512 LOG(("Supports: AUTH"));
1513 break;
1514 case SCTP_ASSOC_SUPPORTS_ASCONF:
1515 LOG(("Supports: ASCONF"));
1516 break;
1517 case SCTP_ASSOC_SUPPORTS_MULTIBUF:
1518 LOG(("Supports: MULTIBUF"));
1519 break;
1520 case SCTP_ASSOC_SUPPORTS_RE_CONFIG:
1521 LOG(("Supports: RE-CONFIG"));
1522 break;
1523 default:
1524 LOG(("Supports: UNKNOWN(0x%02x)", sac->sac_info[i]));
1525 break;
1526 }
1527 }
1528 } else if (((sac->sac_state == SCTP_COMM_LOST) ||
1529 (sac->sac_state == SCTP_CANT_STR_ASSOC)) && (n > 0)) {
1530 LOG(("Association: ABORT ="));
1531 for (i = 0; i < n; ++i) {
1532 LOG((" 0x%02x", sac->sac_info[i]));
1533 }
1534 }
1535 if ((sac->sac_state == SCTP_CANT_STR_ASSOC) ||
1536 (sac->sac_state == SCTP_SHUTDOWN_COMP) ||
1537 (sac->sac_state == SCTP_COMM_LOST)) {
1538 return;
1539 }
1540 }
1541
1542 void
1543 DataChannelConnection::HandlePeerAddressChangeEvent(const struct sctp_paddr_change *spc)
1544 {
1545 const char *addr = "";
1546 #if !defined(__Userspace_os_Windows)
1547 char addr_buf[INET6_ADDRSTRLEN];
1548 struct sockaddr_in *sin;
1549 struct sockaddr_in6 *sin6;
1550 #endif
1551
1552 switch (spc->spc_aaddr.ss_family) {
1553 case AF_INET:
1554 #if !defined(__Userspace_os_Windows)
1555 sin = (struct sockaddr_in *)&spc->spc_aaddr;
1556 addr = inet_ntop(AF_INET, &sin->sin_addr, addr_buf, INET6_ADDRSTRLEN);
1557 #endif
1558 break;
1559 case AF_INET6:
1560 #if !defined(__Userspace_os_Windows)
1561 sin6 = (struct sockaddr_in6 *)&spc->spc_aaddr;
1562 addr = inet_ntop(AF_INET6, &sin6->sin6_addr, addr_buf, INET6_ADDRSTRLEN);
1563 #endif
1564 break;
1565 case AF_CONN:
1566 addr = "DTLS connection";
1567 break;
1568 default:
1569 break;
1570 }
1571 LOG(("Peer address %s is now ", addr));
1572 switch (spc->spc_state) {
1573 case SCTP_ADDR_AVAILABLE:
1574 LOG(("SCTP_ADDR_AVAILABLE"));
1575 break;
1576 case SCTP_ADDR_UNREACHABLE:
1577 LOG(("SCTP_ADDR_UNREACHABLE"));
1578 break;
1579 case SCTP_ADDR_REMOVED:
1580 LOG(("SCTP_ADDR_REMOVED"));
1581 break;
1582 case SCTP_ADDR_ADDED:
1583 LOG(("SCTP_ADDR_ADDED"));
1584 break;
1585 case SCTP_ADDR_MADE_PRIM:
1586 LOG(("SCTP_ADDR_MADE_PRIM"));
1587 break;
1588 case SCTP_ADDR_CONFIRMED:
1589 LOG(("SCTP_ADDR_CONFIRMED"));
1590 break;
1591 default:
1592 LOG(("UNKNOWN"));
1593 break;
1594 }
1595 LOG((" (error = 0x%08x).\n", spc->spc_error));
1596 }
1597
1598 void
1599 DataChannelConnection::HandleRemoteErrorEvent(const struct sctp_remote_error *sre)
1600 {
1601 size_t i, n;
1602
1603 n = sre->sre_length - sizeof(struct sctp_remote_error);
1604 LOG(("Remote Error (error = 0x%04x): ", sre->sre_error));
1605 for (i = 0; i < n; ++i) {
1606 LOG((" 0x%02x", sre-> sre_data[i]));
1607 }
1608 }
1609
1610 void
1611 DataChannelConnection::HandleShutdownEvent(const struct sctp_shutdown_event *sse)
1612 {
1613 LOG(("Shutdown event."));
1614 /* XXX: notify all channels. */
1615 // Attempts to actually send anything will fail
1616 }
1617
1618 void
1619 DataChannelConnection::HandleAdaptationIndication(const struct sctp_adaptation_event *sai)
1620 {
1621 LOG(("Adaptation indication: %x.", sai-> sai_adaptation_ind));
1622 }
1623
1624 void
1625 DataChannelConnection::HandleSendFailedEvent(const struct sctp_send_failed_event *ssfe)
1626 {
1627 size_t i, n;
1628
1629 if (ssfe->ssfe_flags & SCTP_DATA_UNSENT) {
1630 LOG(("Unsent "));
1631 }
1632 if (ssfe->ssfe_flags & SCTP_DATA_SENT) {
1633 LOG(("Sent "));
1634 }
1635 if (ssfe->ssfe_flags & ~(SCTP_DATA_SENT | SCTP_DATA_UNSENT)) {
1636 LOG(("(flags = %x) ", ssfe->ssfe_flags));
1637 }
1638 LOG(("message with PPID = %u, SID = %d, flags: 0x%04x due to error = 0x%08x",
1639 ntohl(ssfe->ssfe_info.snd_ppid), ssfe->ssfe_info.snd_sid,
1640 ssfe->ssfe_info.snd_flags, ssfe->ssfe_error));
1641 n = ssfe->ssfe_length - sizeof(struct sctp_send_failed_event);
1642 for (i = 0; i < n; ++i) {
1643 LOG((" 0x%02x", ssfe->ssfe_data[i]));
1644 }
1645 }
1646
1647 void
1648 DataChannelConnection::ClearResets()
1649 {
1650 // Clear all pending resets
1651 if (!mStreamsResetting.IsEmpty()) {
1652 LOG(("Clearing resets for %d streams", mStreamsResetting.Length()));
1653 }
1654
1655 for (uint32_t i = 0; i < mStreamsResetting.Length(); ++i) {
1656 RefPtr<DataChannel> channel;
1657 channel = FindChannelByStream(mStreamsResetting[i]);
1658 if (channel) {
1659 LOG(("Forgetting channel %u (%p) with pending reset",channel->mStream, channel.get()));
1660 mStreams[channel->mStream] = nullptr;
1661 }
1662 }
1663 mStreamsResetting.Clear();
1664 }
1665
1666 void
1667 DataChannelConnection::ResetOutgoingStream(uint16_t stream)
1668 {
1669 uint32_t i;
1670
1671 mLock.AssertCurrentThreadOwns();
1672 LOG(("Connection %p: Resetting outgoing stream %u",
1673 (void *) this, stream));
1674 // Rarely has more than a couple items and only for a short time
1675 for (i = 0; i < mStreamsResetting.Length(); ++i) {
1676 if (mStreamsResetting[i] == stream) {
1677 return;
1678 }
1679 }
1680 mStreamsResetting.AppendElement(stream);
1681 }
1682
1683 void
1684 DataChannelConnection::SendOutgoingStreamReset()
1685 {
1686 struct sctp_reset_streams *srs;
1687 uint32_t i;
1688 size_t len;
1689
1690 LOG(("Connection %p: Sending outgoing stream reset for %d streams",
1691 (void *) this, mStreamsResetting.Length()));
1692 mLock.AssertCurrentThreadOwns();
1693 if (mStreamsResetting.IsEmpty()) {
1694 LOG(("No streams to reset"));
1695 return;
1696 }
1697 len = sizeof(sctp_assoc_t) + (2 + mStreamsResetting.Length()) * sizeof(uint16_t);
1698 srs = static_cast<struct sctp_reset_streams *> (moz_xmalloc(len)); // infallible malloc
1699 memset(srs, 0, len);
1700 srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
1701 srs->srs_number_streams = mStreamsResetting.Length();
1702 for (i = 0; i < mStreamsResetting.Length(); ++i) {
1703 srs->srs_stream_list[i] = mStreamsResetting[i];
1704 }
1705 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_RESET_STREAMS, srs, (socklen_t)len) < 0) {
1706 LOG(("***failed: setsockopt RESET, errno %d", errno));
1707 // if errno == EALREADY, this is normal - we can't send another reset
1708 // with one pending.
1709 // When we get an incoming reset (which may be a response to our
1710 // outstanding one), see if we have any pending outgoing resets and
1711 // send them
1712 } else {
1713 mStreamsResetting.Clear();
1714 }
1715 free(srs);
1716 }
1717
1718 void
1719 DataChannelConnection::HandleStreamResetEvent(const struct sctp_stream_reset_event *strrst)
1720 {
1721 uint32_t n, i;
1722 RefPtr<DataChannel> channel; // since we may null out the ref to the channel
1723
1724 if (!(strrst->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
1725 !(strrst->strreset_flags & SCTP_STREAM_RESET_FAILED)) {
1726 n = (strrst->strreset_length - sizeof(struct sctp_stream_reset_event)) / sizeof(uint16_t);
1727 for (i = 0; i < n; ++i) {
1728 if (strrst->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
1729 channel = FindChannelByStream(strrst->strreset_stream_list[i]);
1730 if (channel) {
1731 // The other side closed the channel
1732 // We could be in three states:
1733 // 1. Normal state (input and output streams (OPEN)
1734 // Notify application, send a RESET in response on our
1735 // outbound channel. Go to CLOSED
1736 // 2. We sent our own reset (CLOSING); either they crossed on the
1737 // wire, or this is a response to our Reset.
1738 // Go to CLOSED
1739 // 3. We've sent a open but haven't gotten a response yet (CONNECTING)
1740 // I believe this is impossible, as we don't have an input stream yet.
1741
1742 LOG(("Incoming: Channel %u closed, state %d",
1743 channel->mStream, channel->mState));
1744 ASSERT_WEBRTC(channel->mState == DataChannel::OPEN ||
1745 channel->mState == DataChannel::CLOSING ||
1746 channel->mState == DataChannel::CONNECTING ||
1747 channel->mState == DataChannel::WAITING_TO_OPEN);
1748 if (channel->mState == DataChannel::OPEN ||
1749 channel->mState == DataChannel::WAITING_TO_OPEN) {
1750 // Mark the stream for reset (the reset is sent below)
1751 ResetOutgoingStream(channel->mStream);
1752 }
1753 mStreams[channel->mStream] = nullptr;
1754
1755 LOG(("Disconnected DataChannel %p from connection %p",
1756 (void *) channel.get(), (void *) channel->mConnection.get()));
1757 // This sends ON_CHANNEL_CLOSED to mainthread
1758 channel->StreamClosedLocked();
1759 } else {
1760 LOG(("Can't find incoming channel %d",i));
1761 }
1762 }
1763 }
1764 }
1765
1766 // Process any pending resets now:
1767 if (!mStreamsResetting.IsEmpty()) {
1768 LOG(("Sending %d pending resets", mStreamsResetting.Length()));
1769 SendOutgoingStreamReset();
1770 }
1771 }
1772
1773 void
1774 DataChannelConnection::HandleStreamChangeEvent(const struct sctp_stream_change_event *strchg)
1775 {
1776 uint16_t stream;
1777 RefPtr<DataChannel> channel;
1778
1779 if (strchg->strchange_flags == SCTP_STREAM_CHANGE_DENIED) {
1780 LOG(("*** Failed increasing number of streams from %u (%u/%u)",
1781 mStreams.Length(),
1782 strchg->strchange_instrms,
1783 strchg->strchange_outstrms));
1784 // XXX FIX! notify pending opens of failure
1785 return;
1786 } else {
1787 if (strchg->strchange_instrms > mStreams.Length()) {
1788 LOG(("Other side increased streams from %u to %u",
1789 mStreams.Length(), strchg->strchange_instrms));
1790 }
1791 if (strchg->strchange_outstrms > mStreams.Length() ||
1792 strchg->strchange_instrms > mStreams.Length()) {
1793 uint16_t old_len = mStreams.Length();
1794 uint16_t new_len = std::max(strchg->strchange_outstrms,
1795 strchg->strchange_instrms);
1796 LOG(("Increasing number of streams from %u to %u - adding %u (in: %u)",
1797 old_len, new_len, new_len - old_len,
1798 strchg->strchange_instrms));
1799 // make sure both are the same length
1800 mStreams.AppendElements(new_len - old_len);
1801 LOG(("New length = %d (was %d)", mStreams.Length(), old_len));
1802 for (size_t i = old_len; i < mStreams.Length(); ++i) {
1803 mStreams[i] = nullptr;
1804 }
1805 // Re-process any channels waiting for streams.
1806 // Linear search, but we don't increase channels often and
1807 // the array would only get long in case of an app error normally
1808
1809 // Make sure we request enough streams if there's a big jump in streams
1810 // Could make a more complex API for OpenXxxFinish() and avoid this loop
1811 size_t num_needed = mPending.GetSize();
1812 LOG(("%d of %d new streams already needed", num_needed,
1813 new_len - old_len));
1814 num_needed -= (new_len - old_len); // number we added
1815 if (num_needed > 0) {
1816 if (num_needed < 16)
1817 num_needed = 16;
1818 LOG(("Not enough new streams, asking for %d more", num_needed));
1819 RequestMoreStreams(num_needed);
1820 } else if (strchg->strchange_outstrms < strchg->strchange_instrms) {
1821 LOG(("Requesting %d output streams to match partner",
1822 strchg->strchange_instrms - strchg->strchange_outstrms));
1823 RequestMoreStreams(strchg->strchange_instrms - strchg->strchange_outstrms);
1824 }
1825
1826 ProcessQueuedOpens();
1827 }
1828 // else probably not a change in # of streams
1829 }
1830
1831 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
1832 channel = mStreams[i];
1833 if (!channel)
1834 continue;
1835
1836 if ((channel->mState == CONNECTING) &&
1837 (channel->mStream == INVALID_STREAM)) {
1838 if ((strchg->strchange_flags & SCTP_STREAM_CHANGE_DENIED) ||
1839 (strchg->strchange_flags & SCTP_STREAM_CHANGE_FAILED)) {
1840 /* XXX: Signal to the other end. */
1841 channel->mState = CLOSED;
1842 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
1843 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
1844 channel)));
1845 // maybe fire onError (bug 843625)
1846 } else {
1847 stream = FindFreeStream();
1848 if (stream != INVALID_STREAM) {
1849 channel->mStream = stream;
1850 mStreams[stream] = channel;
1851 channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
1852 // Note: we're locked, so there's no danger of a race with the
1853 // buffer-threshold callback
1854 } else {
1855 /* We will not find more ... */
1856 break;
1857 }
1858 }
1859 }
1860 }
1861 }
1862
1863 // Called with mLock locked!
1864 void
1865 DataChannelConnection::HandleNotification(const union sctp_notification *notif, size_t n)
1866 {
1867 mLock.AssertCurrentThreadOwns();
1868 if (notif->sn_header.sn_length != (uint32_t)n) {
1869 return;
1870 }
1871 switch (notif->sn_header.sn_type) {
1872 case SCTP_ASSOC_CHANGE:
1873 HandleAssociationChangeEvent(&(notif->sn_assoc_change));
1874 break;
1875 case SCTP_PEER_ADDR_CHANGE:
1876 HandlePeerAddressChangeEvent(&(notif->sn_paddr_change));
1877 break;
1878 case SCTP_REMOTE_ERROR:
1879 HandleRemoteErrorEvent(&(notif->sn_remote_error));
1880 break;
1881 case SCTP_SHUTDOWN_EVENT:
1882 HandleShutdownEvent(&(notif->sn_shutdown_event));
1883 break;
1884 case SCTP_ADAPTATION_INDICATION:
1885 HandleAdaptationIndication(&(notif->sn_adaptation_event));
1886 break;
1887 case SCTP_PARTIAL_DELIVERY_EVENT:
1888 LOG(("SCTP_PARTIAL_DELIVERY_EVENT"));
1889 break;
1890 case SCTP_AUTHENTICATION_EVENT:
1891 LOG(("SCTP_AUTHENTICATION_EVENT"));
1892 break;
1893 case SCTP_SENDER_DRY_EVENT:
1894 //LOG(("SCTP_SENDER_DRY_EVENT"));
1895 break;
1896 case SCTP_NOTIFICATIONS_STOPPED_EVENT:
1897 LOG(("SCTP_NOTIFICATIONS_STOPPED_EVENT"));
1898 break;
1899 case SCTP_SEND_FAILED_EVENT:
1900 HandleSendFailedEvent(&(notif->sn_send_failed_event));
1901 break;
1902 case SCTP_STREAM_RESET_EVENT:
1903 HandleStreamResetEvent(&(notif->sn_strreset_event));
1904 break;
1905 case SCTP_ASSOC_RESET_EVENT:
1906 LOG(("SCTP_ASSOC_RESET_EVENT"));
1907 break;
1908 case SCTP_STREAM_CHANGE_EVENT:
1909 HandleStreamChangeEvent(&(notif->sn_strchange_event));
1910 break;
1911 default:
1912 LOG(("unknown SCTP event: %u", (uint32_t)notif->sn_header.sn_type));
1913 break;
1914 }
1915 }
1916
1917 int
1918 DataChannelConnection::ReceiveCallback(struct socket* sock, void *data, size_t datalen,
1919 struct sctp_rcvinfo rcv, int32_t flags)
1920 {
1921 ASSERT_WEBRTC(!NS_IsMainThread());
1922
1923 if (!data) {
1924 usrsctp_close(sock); // SCTP has finished shutting down
1925 } else {
1926 MutexAutoLock lock(mLock);
1927 if (flags & MSG_NOTIFICATION) {
1928 HandleNotification(static_cast<union sctp_notification *>(data), datalen);
1929 } else {
1930 HandleMessage(data, datalen, ntohl(rcv.rcv_ppid), rcv.rcv_sid);
1931 }
1932 }
1933 // sctp allocates 'data' with malloc(), and expects the receiver to free
1934 // it (presumably with free).
1935 // XXX future optimization: try to deliver messages without an internal
1936 // alloc/copy, and if so delay the free until later.
1937 free(data);
1938 // usrsctp defines the callback as returning an int, but doesn't use it
1939 return 1;
1940 }
1941
1942 already_AddRefed<DataChannel>
1943 DataChannelConnection::Open(const nsACString& label, const nsACString& protocol,
1944 Type type, bool inOrder,
1945 uint32_t prValue, DataChannelListener *aListener,
1946 nsISupports *aContext, bool aExternalNegotiated,
1947 uint16_t aStream)
1948 {
1949 // aStream == INVALID_STREAM to have the protocol allocate
1950 uint16_t prPolicy = SCTP_PR_SCTP_NONE;
1951 uint32_t flags;
1952
1953 LOG(("DC Open: label %s/%s, type %u, inorder %d, prValue %u, listener %p, context %p, external: %s, stream %u",
1954 PromiseFlatCString(label).get(), PromiseFlatCString(protocol).get(),
1955 type, inOrder, prValue, aListener, aContext,
1956 aExternalNegotiated ? "true" : "false", aStream));
1957 switch (type) {
1958 case DATA_CHANNEL_RELIABLE:
1959 prPolicy = SCTP_PR_SCTP_NONE;
1960 break;
1961 case DATA_CHANNEL_PARTIAL_RELIABLE_REXMIT:
1962 prPolicy = SCTP_PR_SCTP_RTX;
1963 break;
1964 case DATA_CHANNEL_PARTIAL_RELIABLE_TIMED:
1965 prPolicy = SCTP_PR_SCTP_TTL;
1966 break;
1967 }
1968 if ((prPolicy == SCTP_PR_SCTP_NONE) && (prValue != 0)) {
1969 return nullptr;
1970 }
1971
1972 // Don't look past currently-negotiated streams
1973 if (aStream != INVALID_STREAM && aStream < mStreams.Length() && mStreams[aStream]) {
1974 LOG(("ERROR: external negotiation of already-open channel %u", aStream));
1975 // XXX How do we indicate this up to the application? Probably the
1976 // caller's job, but we may need to return an error code.
1977 return nullptr;
1978 }
1979
1980 flags = !inOrder ? DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED : 0;
1981 RefPtr<DataChannel> channel(new DataChannel(this,
1982 aStream,
1983 DataChannel::CONNECTING,
1984 label, protocol,
1985 type, prValue,
1986 flags,
1987 aListener, aContext));
1988 if (aExternalNegotiated) {
1989 channel->mFlags |= DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED;
1990 }
1991
1992 MutexAutoLock lock(mLock); // OpenFinish assumes this
1993 return OpenFinish(channel.forget());
1994 }
1995
1996 // Separate routine so we can also call it to finish up from pending opens
1997 already_AddRefed<DataChannel>
1998 DataChannelConnection::OpenFinish(already_AddRefed<DataChannel>&& aChannel)
1999 {
2000 RefPtr<DataChannel> channel(aChannel); // takes the reference passed in
2001 // Normally 1 reference if called from ::Open(), or 2 if called from
2002 // ProcessQueuedOpens() unless the DOMDataChannel was gc'd
2003 uint16_t stream = channel->mStream;
2004 bool queue = false;
2005
2006 mLock.AssertCurrentThreadOwns();
2007
2008 // Cases we care about:
2009 // Pre-negotiated:
2010 // Not Open:
2011 // Doesn't fit:
2012 // -> change initial ask or renegotiate after open
2013 // -> queue open
2014 // Open:
2015 // Doesn't fit:
2016 // -> RequestMoreStreams && queue
2017 // Does fit:
2018 // -> open
2019 // Not negotiated:
2020 // Not Open:
2021 // -> queue open
2022 // Open:
2023 // -> Try to get a stream
2024 // Doesn't fit:
2025 // -> RequestMoreStreams && queue
2026 // Does fit:
2027 // -> open
2028 // So the Open cases are basically the same
2029 // Not Open cases are simply queue for non-negotiated, and
2030 // either change the initial ask or possibly renegotiate after open.
2031
2032 if (mState == OPEN) {
2033 if (stream == INVALID_STREAM) {
2034 stream = FindFreeStream(); // may be INVALID_STREAM if we need more
2035 }
2036 if (stream == INVALID_STREAM || stream >= mStreams.Length()) {
2037 // RequestMoreStreams() limits to MAX_NUM_STREAMS -- allocate extra streams
2038 // to avoid going back immediately for more if the ask to N, N+1, etc
2039 int32_t more_needed = (stream == INVALID_STREAM) ? 16 :
2040 (stream-((int32_t)mStreams.Length())) + 16;
2041 if (!RequestMoreStreams(more_needed)) {
2042 // Something bad happened... we're done
2043 goto request_error_cleanup;
2044 }
2045 queue = true;
2046 }
2047 } else {
2048 // not OPEN
2049 if (stream != INVALID_STREAM && stream >= mStreams.Length() &&
2050 mState == CLOSED) {
2051 // Update number of streams for init message
2052 struct sctp_initmsg initmsg;
2053 socklen_t len = sizeof(initmsg);
2054 int32_t total_needed = stream+16;
2055
2056 memset(&initmsg, 0, sizeof(initmsg));
2057 if (usrsctp_getsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg, &len) < 0) {
2058 LOG(("*** failed getsockopt SCTP_INITMSG"));
2059 goto request_error_cleanup;
2060 }
2061 LOG(("Setting number of SCTP streams to %u, was %u/%u", total_needed,
2062 initmsg.sinit_num_ostreams, initmsg.sinit_max_instreams));
2063 initmsg.sinit_num_ostreams = total_needed;
2064 initmsg.sinit_max_instreams = MAX_NUM_STREAMS;
2065 if (usrsctp_setsockopt(mMasterSocket, IPPROTO_SCTP, SCTP_INITMSG, &initmsg,
2066 (socklen_t)sizeof(initmsg)) < 0) {
2067 LOG(("*** failed setsockopt SCTP_INITMSG, errno %d", errno));
2068 goto request_error_cleanup;
2069 }
2070
2071 int32_t old_len = mStreams.Length();
2072 mStreams.AppendElements(total_needed - old_len);
2073 for (int32_t i = old_len; i < total_needed; ++i) {
2074 mStreams[i] = nullptr;
2075 }
2076 }
2077 // else if state is CONNECTING, we'll just re-negotiate when OpenFinish
2078 // is called, if needed
2079 queue = true;
2080 }
2081 if (queue) {
2082 LOG(("Queuing channel %p (%u) to finish open", channel.get(), stream));
2083 // Also serves to mark we told the app
2084 channel->mFlags |= DATA_CHANNEL_FLAGS_FINISH_OPEN;
2085 // we need a ref for the nsDeQue and one to return
2086 DataChannel* rawChannel = channel;
2087 rawChannel->AddRef();
2088 mPending.Push(rawChannel);
2089 return channel.forget();
2090 }
2091
2092 MOZ_ASSERT(stream != INVALID_STREAM);
2093 // just allocated (& OPEN), or externally negotiated
2094 mStreams[stream] = channel; // holds a reference
2095 channel->mStream = stream;
2096
2097 #ifdef TEST_QUEUED_DATA
2098 // It's painful to write a test for this...
2099 channel->mState = OPEN;
2100 channel->mReady = true;
2101 SendMsgInternal(channel, "Help me!", 8, DATA_CHANNEL_PPID_DOMSTRING_LAST);
2102 #endif
2103
2104 if (channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) {
2105 // Don't send unordered until this gets cleared
2106 channel->mFlags |= DATA_CHANNEL_FLAGS_WAITING_ACK;
2107 }
2108
2109 if (!(channel->mFlags & DATA_CHANNEL_FLAGS_EXTERNAL_NEGOTIATED)) {
2110 if (!SendOpenRequestMessage(channel->mLabel, channel->mProtocol,
2111 stream,
2112 !!(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED),
2113 channel->mPrPolicy, channel->mPrValue)) {
2114 LOG(("SendOpenRequest failed, errno = %d", errno));
2115 if (errno == EAGAIN || errno == EWOULDBLOCK) {
2116 channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_REQ;
2117 // Note: we're locked, so there's no danger of a race with the
2118 // buffer-threshold callback
2119 return channel.forget();
2120 } else {
2121 if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
2122 // We already returned the channel to the app.
2123 NS_ERROR("Failed to send open request");
2124 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
2125 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
2126 channel)));
2127 }
2128 // If we haven't returned the channel yet, it will get destroyed when we exit
2129 // this function.
2130 mStreams[stream] = nullptr;
2131 channel->mStream = INVALID_STREAM;
2132 // we'll be destroying the channel
2133 channel->mState = CLOSED;
2134 return nullptr;
2135 }
2136 /* NOTREACHED */
2137 }
2138 }
2139 // Either externally negotiated or we sent Open
2140 channel->mState = OPEN;
2141 channel->mReady = true;
2142 // FIX? Move into DOMDataChannel? I don't think we can send it yet here
2143 LOG(("%s: sending ON_CHANNEL_OPEN for %p", __FUNCTION__, channel.get()));
2144 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
2145 DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, this,
2146 channel)));
2147
2148 return channel.forget();
2149
2150 request_error_cleanup:
2151 channel->mState = CLOSED;
2152 if (channel->mFlags & DATA_CHANNEL_FLAGS_FINISH_OPEN) {
2153 // We already returned the channel to the app.
2154 NS_ERROR("Failed to request more streams");
2155 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
2156 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED, this,
2157 channel)));
2158 return channel.forget();
2159 }
2160 // we'll be destroying the channel, but it never really got set up
2161 // Alternative would be to RUN_ON_THREAD(channel.forget(),::Destroy,...) and
2162 // Dispatch it to ourselves
2163 return nullptr;
2164 }
2165
2166 int32_t
2167 DataChannelConnection::SendMsgInternal(DataChannel *channel, const char *data,
2168 size_t length, uint32_t ppid)
2169 {
2170 uint16_t flags;
2171 struct sctp_sendv_spa spa;
2172 int32_t result;
2173
2174 NS_ENSURE_TRUE(channel->mState == OPEN || channel->mState == CONNECTING, 0);
2175 NS_WARNING_ASSERTION(length > 0, "Length is 0?!");
2176
2177 // To avoid problems where an in-order OPEN is lost and an
2178 // out-of-order data message "beats" it, require data to be in-order
2179 // until we get an ACK.
2180 if ((channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED) &&
2181 !(channel->mFlags & DATA_CHANNEL_FLAGS_WAITING_ACK)) {
2182 flags = SCTP_UNORDERED;
2183 } else {
2184 flags = 0;
2185 }
2186
2187 spa.sendv_sndinfo.snd_ppid = htonl(ppid);
2188 spa.sendv_sndinfo.snd_sid = channel->mStream;
2189 spa.sendv_sndinfo.snd_flags = flags;
2190 spa.sendv_sndinfo.snd_context = 0;
2191 spa.sendv_sndinfo.snd_assoc_id = 0;
2192 spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
2193
2194 if (channel->mPrPolicy != SCTP_PR_SCTP_NONE) {
2195 spa.sendv_prinfo.pr_policy = channel->mPrPolicy;
2196 spa.sendv_prinfo.pr_value = channel->mPrValue;
2197 spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
2198 }
2199
2200 // Note: Main-thread IO, but doesn't block!
2201 // XXX FIX! to deal with heavy overruns of JS trying to pass data in
2202 // (more than the buffersize) queue data onto another thread to do the
2203 // actual sends. See netwerk/protocol/websocket/WebSocketChannel.cpp
2204
2205 // SCTP will return EMSGSIZE if the message is bigger than the buffer
2206 // size (or EAGAIN if there isn't space)
2207
2208 // Avoid a race between buffer-full-failure (where we have to add the
2209 // packet to the buffered-data queue) and the buffer-now-only-half-full
2210 // callback, which happens on a different thread. Otherwise we might
2211 // fail here, then before we add it to the queue get the half-full
2212 // callback, find nothing to do, then on this thread add it to the
2213 // queue - which would sit there. Also, if we later send more data, it
2214 // would arrive ahead of the buffered message, but if the buffer ever
2215 // got to 1/2 full, the message would get sent - but at a semi-random
2216 // time, after other data it was supposed to be in front of.
2217
2218 // Must lock before empty check for similar reasons!
2219 MutexAutoLock lock(mLock);
2220 if (channel->mBufferedData.IsEmpty()) {
2221 result = usrsctp_sendv(mSocket, data, length,
2222 nullptr, 0,
2223 (void *)&spa, (socklen_t)sizeof(struct sctp_sendv_spa),
2224 SCTP_SENDV_SPA, 0);
2225 LOG(("Sent buffer (len=%u), result=%d", length, result));
2226 } else {
2227 // Fake EAGAIN if we're already buffering data
2228 result = -1;
2229 errno = EAGAIN;
2230 }
2231 if (result < 0) {
2232 if (errno == EAGAIN || errno == EWOULDBLOCK) {
2233
2234 // queue data for resend! And queue any further data for the stream until it is...
2235 auto *buffered = new BufferedMsg(spa, data, length); // infallible malloc
2236 channel->mBufferedData.AppendElement(buffered); // owned by mBufferedData array
2237 channel->mFlags |= DATA_CHANNEL_FLAGS_SEND_DATA;
2238 LOG(("Queued %u buffers (len=%u)", channel->mBufferedData.Length(), length));
2239 return 0;
2240 }
2241 LOG(("error %d sending string", errno));
2242 }
2243 return result;
2244 }
2245
2246 // Handles fragmenting binary messages
2247 int32_t
2248 DataChannelConnection::SendBinary(DataChannel *channel, const char *data,
2249 size_t len,
2250 uint32_t ppid_partial, uint32_t ppid_final)
2251 {
2252 // Since there's a limit on network buffer size and no limits on message
2253 // size, and we don't want to use EOR mode (multiple writes for a
2254 // message, but all other streams are blocked until you finish sending
2255 // this message), we need to add application-level fragmentation of large
2256 // messages. On a reliable channel, these can be simply rebuilt into a
2257 // large message. On an unreliable channel, we can't and don't know how
2258 // long to wait, and there are no retransmissions, and no easy way to
2259 // tell the user "this part is missing", so on unreliable channels we
2260 // need to return an error if sending more bytes than the network buffers
2261 // can hold, and perhaps a lower number.
2262
2263 // We *really* don't want to do this from main thread! - and SendMsgInternal
2264 // avoids blocking.
2265 // This MUST be reliable and in-order for the reassembly to work
2266 if (len > DATA_CHANNEL_MAX_BINARY_FRAGMENT &&
2267 channel->mPrPolicy == DATA_CHANNEL_RELIABLE &&
2268 !(channel->mFlags & DATA_CHANNEL_FLAGS_OUT_OF_ORDER_ALLOWED)) {
2269 int32_t sent=0;
2270 uint32_t origlen = len;
2271 LOG(("Sending binary message length %u in chunks", len));
2272 // XXX check flags for out-of-order, or force in-order for large binary messages
2273 while (len > 0) {
2274 size_t sendlen = std::min<size_t>(len, DATA_CHANNEL_MAX_BINARY_FRAGMENT);
2275 uint32_t ppid;
2276 len -= sendlen;
2277 ppid = len > 0 ? ppid_partial : ppid_final;
2278 LOG(("Send chunk of %u bytes, ppid %u", sendlen, ppid));
2279 // Note that these might end up being deferred and queued.
2280 sent += SendMsgInternal(channel, data, sendlen, ppid);
2281 data += sendlen;
2282 }
2283 LOG(("Sent %d buffers for %u bytes, %d sent immediately, %d buffers queued",
2284 (origlen+DATA_CHANNEL_MAX_BINARY_FRAGMENT-1)/DATA_CHANNEL_MAX_BINARY_FRAGMENT,
2285 origlen, sent,
2286 channel->mBufferedData.Length()));
2287 return sent;
2288 }
2289 NS_WARNING_ASSERTION(len <= DATA_CHANNEL_MAX_BINARY_FRAGMENT,
2290 "Sending too-large data on unreliable channel!");
2291
2292 // This will fail if the message is too large (default 256K)
2293 return SendMsgInternal(channel, data, len, ppid_final);
2294 }
2295
2296 class ReadBlobRunnable : public Runnable {
2297 public:
2298 ReadBlobRunnable(DataChannelConnection* aConnection, uint16_t aStream,
2299 nsIInputStream* aBlob) :
2300 mConnection(aConnection),
2301 mStream(aStream),
2302 mBlob(aBlob)
2303 {}
2304
2305 NS_IMETHOD Run() override {
2306 // ReadBlob() is responsible to releasing the reference
2307 DataChannelConnection *self = mConnection;
2308 self->ReadBlob(mConnection.forget(), mStream, mBlob);
2309 return NS_OK;
2310 }
2311
2312 private:
2313 // Make sure the Connection doesn't die while there are jobs outstanding.
2314 // Let it die (if released by PeerConnectionImpl while we're running)
2315 // when we send our runnable back to MainThread. Then ~DataChannelConnection
2316 // can send the IOThread to MainThread to die in a runnable, avoiding
2317 // unsafe event loop recursion. Evil.
2318 RefPtr<DataChannelConnection> mConnection;
2319 uint16_t mStream;
2320 // Use RefCount for preventing the object is deleted when SendBlob returns.
2321 RefPtr<nsIInputStream> mBlob;
2322 };
2323
2324 int32_t
2325 DataChannelConnection::SendBlob(uint16_t stream, nsIInputStream *aBlob)
2326 {
2327 DataChannel *channel = mStreams[stream];
2328 NS_ENSURE_TRUE(channel, 0);
2329 // Spawn a thread to send the data
2330 if (!mInternalIOThread) {
2331 nsresult res = NS_NewThread(getter_AddRefs(mInternalIOThread));
2332 if (NS_FAILED(res)) {
2333 return -1;
2334 }
2335 }
2336
2337 mInternalIOThread->Dispatch(do_AddRef(new ReadBlobRunnable(this, stream, aBlob)), NS_DISPATCH_NORMAL);
2338 return 0;
2339 }
2340
2341 class DataChannelBlobSendRunnable : public Runnable
2342 {
2343 public:
2344 DataChannelBlobSendRunnable(already_AddRefed<DataChannelConnection>& aConnection,
2345 uint16_t aStream)
2346 : mConnection(aConnection)
2347 , mStream(aStream) {}
2348
2349 ~DataChannelBlobSendRunnable() override
2350 {
2351 if (!NS_IsMainThread() && mConnection) {
2352 MOZ_ASSERT(false);
2353 // explicitly leak the connection if destroyed off mainthread
2354 Unused << mConnection.forget().take();
2355 }
2356 }
2357
2358 NS_IMETHOD Run() override
2359 {
2360 ASSERT_WEBRTC(NS_IsMainThread());
2361
2362 mConnection->SendBinaryMsg(mStream, mData);
2363 mConnection = nullptr;
2364 return NS_OK;
2365 }
2366
2367 // explicitly public so we can avoid allocating twice and copying
2368 nsCString mData;
2369
2370 private:
2371 // Note: we can be destroyed off the target thread, so be careful not to let this
2372 // get Released()ed on the temp thread!
2373 RefPtr<DataChannelConnection> mConnection;
2374 uint16_t mStream;
2375 };
2376
2377 void
2378 DataChannelConnection::ReadBlob(already_AddRefed<DataChannelConnection> aThis,
2379 uint16_t aStream, nsIInputStream* aBlob)
2380 {
2381 // NOTE: 'aThis' has been forgotten by the caller to avoid releasing
2382 // it off mainthread; if PeerConnectionImpl has released then we want
2383 // ~DataChannelConnection() to run on MainThread
2384
2385 // XXX to do this safely, we must enqueue these atomically onto the
2386 // output socket. We need a sender thread(s?) to enqueue data into the
2387 // socket and to avoid main-thread IO that might block. Even on a
2388 // background thread, we may not want to block on one stream's data.
2389 // I.e. run non-blocking and service multiple channels.
2390
2391 // For now as a hack, send as a single blast of queued packets which may
2392 // be deferred until buffer space is available.
2393 uint64_t len;
2394 nsCOMPtr<nsIThread> mainThread;
2395 NS_GetMainThread(getter_AddRefs(mainThread));
2396
2397 // Must not let Dispatching it cause the DataChannelConnection to get
2398 // released on the wrong thread. Using WrapRunnable(RefPtr<DataChannelConnection>(aThis),...
2399 // will occasionally cause aThis to get released on this thread. Also, an explicit Runnable
2400 // lets us avoid copying the blob data an extra time.
2401 RefPtr<DataChannelBlobSendRunnable> runnable = new DataChannelBlobSendRunnable(aThis,
2402 aStream);
2403 // avoid copying the blob data by passing the mData from the runnable
2404 if (NS_FAILED(aBlob->Available(&len)) ||
2405 NS_FAILED(NS_ReadInputStreamToString(aBlob, runnable->mData, len))) {
2406 // Bug 966602: Doesn't return an error to the caller via onerror.
2407 // We must release DataChannelConnection on MainThread to avoid issues (bug 876167)
2408 // aThis is now owned by the runnable; release it there
2409 NS_ProxyRelease(mainThread, runnable.forget());
2410 return;
2411 }
2412 aBlob->Close();
2413 NS_DispatchToMainThread(runnable, NS_DISPATCH_NORMAL);
2414 }
2415
2416 void
2417 DataChannelConnection::GetStreamIds(std::vector<uint16_t>* aStreamList)
2418 {
2419 ASSERT_WEBRTC(NS_IsMainThread());
2420 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
2421 if (mStreams[i]) {
2422 aStreamList->push_back(mStreams[i]->mStream);
2423 }
2424 }
2425 }
2426
2427 int32_t
2428 DataChannelConnection::SendMsgCommon(uint16_t stream, const nsACString &aMsg,
2429 bool isBinary)
2430 {
2431 ASSERT_WEBRTC(NS_IsMainThread());
2432 // We really could allow this from other threads, so long as we deal with
2433 // asynchronosity issues with channels closing, in particular access to
2434 // mStreams, and issues with the association closing (access to mSocket).
2435
2436 const char *data = aMsg.BeginReading();
2437 uint32_t len = aMsg.Length();
2438 DataChannel *channel;
2439
2440 LOG(("Sending %sto stream %u: %u bytes", isBinary ? "binary " : "", stream, len));
2441 // XXX if we want more efficiency, translate flags once at open time
2442 channel = mStreams[stream];
2443 NS_ENSURE_TRUE(channel, 0);
2444
2445 if (isBinary)
2446 return SendBinary(channel, data, len,
2447 DATA_CHANNEL_PPID_BINARY, DATA_CHANNEL_PPID_BINARY_LAST);
2448 return SendBinary(channel, data, len,
2449 DATA_CHANNEL_PPID_DOMSTRING, DATA_CHANNEL_PPID_DOMSTRING_LAST);
2450 }
2451
2452 void
2453 DataChannelConnection::Close(DataChannel *aChannel)
2454 {
2455 MutexAutoLock lock(mLock);
2456 CloseInt(aChannel);
2457 }
2458
2459 // So we can call Close() with the lock already held
2460 // Called from someone who holds a ref via ::Close(), or from ~DataChannel
2461 void
2462 DataChannelConnection::CloseInt(DataChannel *aChannel)
2463 {
2464 MOZ_ASSERT(aChannel);
2465 RefPtr<DataChannel> channel(aChannel); // make sure it doesn't go away on us
2466
2467 mLock.AssertCurrentThreadOwns();
2468 LOG(("Connection %p/Channel %p: Closing stream %u",
2469 channel->mConnection.get(), channel.get(), channel->mStream));
2470 // re-test since it may have closed before the lock was grabbed
2471 if (aChannel->mState == CLOSED || aChannel->mState == CLOSING) {
2472 LOG(("Channel already closing/closed (%u)", aChannel->mState));
2473 if (mState == CLOSED && channel->mStream != INVALID_STREAM) {
2474 // called from CloseAll()
2475 // we're not going to hang around waiting any more
2476 mStreams[channel->mStream] = nullptr;
2477 }
2478 return;
2479 }
2480 aChannel->mBufferedData.Clear();
2481 if (channel->mStream != INVALID_STREAM) {
2482 ResetOutgoingStream(channel->mStream);
2483 if (mState == CLOSED) { // called from CloseAll()
2484 // Let resets accumulate then send all at once in CloseAll()
2485 // we're not going to hang around waiting
2486 mStreams[channel->mStream] = nullptr;
2487 } else {
2488 SendOutgoingStreamReset();
2489 }
2490 }
2491 aChannel->mState = CLOSING;
2492 if (mState == CLOSED) {
2493 // we're not going to hang around waiting
2494 channel->StreamClosedLocked();
2495 }
2496 // At this point when we leave here, the object is a zombie held alive only by the DOM object
2497 }
2498
2499 void DataChannelConnection::CloseAll()
2500 {
2501 LOG(("Closing all channels (connection %p)", (void*) this));
2502 // Don't need to lock here
2503
2504 // Make sure no more channels will be opened
2505 {
2506 MutexAutoLock lock(mLock);
2507 mState = CLOSED;
2508 }
2509
2510 // Close current channels
2511 // If there are runnables, they hold a strong ref and keep the channel
2512 // and/or connection alive (even if in a CLOSED state)
2513 bool closed_some = false;
2514 for (uint32_t i = 0; i < mStreams.Length(); ++i) {
2515 if (mStreams[i]) {
2516 mStreams[i]->Close();
2517 closed_some = true;
2518 }
2519 }
2520
2521 // Clean up any pending opens for channels
2522 RefPtr<DataChannel> channel;
2523 while (nullptr != (channel = dont_AddRef(static_cast<DataChannel *>(mPending.PopFront())))) {
2524 LOG(("closing pending channel %p, stream %u", channel.get(), channel->mStream));
2525 channel->Close(); // also releases the ref on each iteration
2526 closed_some = true;
2527 }
2528 // It's more efficient to let the Resets queue in shutdown and then
2529 // SendOutgoingStreamReset() here.
2530 if (closed_some) {
2531 MutexAutoLock lock(mLock);
2532 SendOutgoingStreamReset();
2533 }
2534 }
2535
2536 DataChannel::~DataChannel()
2537 {
2538 // NS_ASSERTION since this is more "I think I caught all the cases that
2539 // can cause this" than a true kill-the-program assertion. If this is
2540 // wrong, nothing bad happens. A worst it's a leak.
2541 NS_ASSERTION(mState == CLOSED || mState == CLOSING, "unexpected state in ~DataChannel");
2542 }
2543
2544 void
2545 DataChannel::Close()
2546 {
2547 if (mConnection) {
2548 // ensure we don't get deleted
2549 RefPtr<DataChannelConnection> connection(mConnection);
2550 connection->Close(this);
2551 }
2552 }
2553
2554 // Used when disconnecting from the DataChannelConnection
2555 void
2556 DataChannel::StreamClosedLocked()
2557 {
2558 mConnection->mLock.AssertCurrentThreadOwns();
2559 ENSURE_DATACONNECTION;
2560
2561 LOG(("Destroying Data channel %u", mStream));
2562 MOZ_ASSERT_IF(mStream != INVALID_STREAM,
2563 !mConnection->FindChannelByStream(mStream));
2564 mStream = INVALID_STREAM;
2565 mState = CLOSED;
2566 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
2567 DataChannelOnMessageAvailable::ON_CHANNEL_CLOSED,
2568 mConnection, this)));
2569 // We leave mConnection live until the DOM releases us, to avoid races
2570 }
2571
2572 void
2573 DataChannel::ReleaseConnection()
2574 {
2575 ASSERT_WEBRTC(NS_IsMainThread());
2576 mConnection = nullptr;
2577 }
2578
2579 void
2580 DataChannel::SetListener(DataChannelListener *aListener, nsISupports *aContext)
2581 {
2582 MutexAutoLock mLock(mListenerLock);
2583 mContext = aContext;
2584 mListener = aListener;
2585 }
2586
2587 // May be called from another (i.e. Main) thread!
2588 void
2589 DataChannel::AppReady()
2590 {
2591 ENSURE_DATACONNECTION;
2592
2593 MutexAutoLock lock(mConnection->mLock);
2594
2595 mReady = true;
2596 if (mState == WAITING_TO_OPEN) {
2597 mState = OPEN;
2598 NS_DispatchToMainThread(do_AddRef(new DataChannelOnMessageAvailable(
2599 DataChannelOnMessageAvailable::ON_CHANNEL_OPEN, mConnection,
2600 this)));
2601 for (uint32_t i = 0; i < mQueuedMessages.Length(); ++i) {
2602 nsCOMPtr<nsIRunnable> runnable = mQueuedMessages[i];
2603 MOZ_ASSERT(runnable);
2604 NS_DispatchToMainThread(runnable);
2605 }
2606 } else {
2607 NS_ASSERTION(mQueuedMessages.IsEmpty(), "Shouldn't have queued messages if not WAITING_TO_OPEN");
2608 }
2609 mQueuedMessages.Clear();
2610 mQueuedMessages.Compact();
2611 // We never use it again... We could even allocate the array in the odd
2612 // cases we need it.
2613 }
2614
2615 uint32_t
2616 DataChannel::GetBufferedAmountLocked() const
2617 {
2618 size_t buffered = 0;
2619
2620 for (auto& buffer : mBufferedData) {
2621 buffered += buffer->mLength;
2622 }
2623 // XXX Note: per Michael Tuexen, there's no way to currently get the buffered
2624 // amount from the SCTP stack for a single stream. It is on their to-do
2625 // list, and once we import a stack with support for that, we'll need to
2626 // add it to what we buffer. Also we'll need to ask for notification of a per-
2627 // stream buffer-low event and merge that into the handling of buffer-low
2628 // (the equivalent to TCP_NOTSENT_LOWAT on TCP sockets)
2629
2630 if (buffered > UINT32_MAX) { // paranoia - >4GB buffered is very very unlikely
2631 buffered = UINT32_MAX;
2632 }
2633 return buffered;
2634 }
2635
2636 uint32_t
2637 DataChannel::GetBufferedAmountLowThreshold()
2638 {
2639 return mBufferedThreshold;
2640 }
2641
2642 // Never fire immediately, as it's defined to fire on transitions, not state
2643 void
2644 DataChannel::SetBufferedAmountLowThreshold(uint32_t aThreshold)
2645 {
2646 mBufferedThreshold = aThreshold;
2647 }
2648
2649 // Called with mLock locked!
2650 void
2651 DataChannel::SendOrQueue(DataChannelOnMessageAvailable *aMessage)
2652 {
2653 if (!mReady &&
2654 (mState == CONNECTING || mState == WAITING_TO_OPEN)) {
2655 mQueuedMessages.AppendElement(aMessage);
2656 } else {
2657 NS_DispatchToMainThread(aMessage);
2658 }
2659 }
2660
2661 } // namespace mozilla
2662