1// +build !js 2 3package webrtc 4 5import ( 6 "crypto/ecdsa" 7 "crypto/elliptic" 8 "crypto/rand" 9 "errors" 10 "fmt" 11 "io" 12 "strconv" 13 "strings" 14 "sync" 15 "sync/atomic" 16 "time" 17 18 "github.com/pion/ice/v2" 19 "github.com/pion/interceptor" 20 "github.com/pion/logging" 21 "github.com/pion/rtcp" 22 "github.com/pion/sdp/v3" 23 "github.com/pion/webrtc/v3/internal/util" 24 "github.com/pion/webrtc/v3/pkg/rtcerr" 25) 26 27// PeerConnection represents a WebRTC connection that establishes a 28// peer-to-peer communications with another PeerConnection instance in a 29// browser, or to another endpoint implementing the required protocols. 30type PeerConnection struct { 31 statsID string 32 mu sync.RWMutex 33 34 // ops is an operations queue which will ensure the enqueued actions are 35 // executed in order. It is used for asynchronously, but serially processing 36 // remote and local descriptions 37 ops *operations 38 39 configuration Configuration 40 41 currentLocalDescription *SessionDescription 42 pendingLocalDescription *SessionDescription 43 currentRemoteDescription *SessionDescription 44 pendingRemoteDescription *SessionDescription 45 signalingState SignalingState 46 iceConnectionState ICEConnectionState 47 connectionState PeerConnectionState 48 49 idpLoginURL *string 50 51 isClosed *atomicBool 52 isNegotiationNeeded *atomicBool 53 negotiationNeededState negotiationNeededState 54 55 lastOffer string 56 lastAnswer string 57 58 // a value containing the last known greater mid value 59 // we internally generate mids as numbers. Needed since JSEP 60 // requires that when reusing a media section a new unique mid 61 // should be defined (see JSEP 3.4.1). 
62 greaterMid int 63 64 rtpTransceivers []*RTPTransceiver 65 66 onSignalingStateChangeHandler func(SignalingState) 67 onICEConnectionStateChangeHandler func(ICEConnectionState) 68 onConnectionStateChangeHandler func(PeerConnectionState) 69 onTrackHandler func(*TrackRemote, *RTPReceiver) 70 onDataChannelHandler func(*DataChannel) 71 onNegotiationNeededHandler atomic.Value // func() 72 73 iceGatherer *ICEGatherer 74 iceTransport *ICETransport 75 dtlsTransport *DTLSTransport 76 sctpTransport *SCTPTransport 77 78 // A reference to the associated API state used by this connection 79 api *API 80 log logging.LeveledLogger 81 82 interceptorRTCPWriter interceptor.RTCPWriter 83} 84 85// NewPeerConnection creates a PeerConnection with the default codecs and 86// interceptors. See RegisterDefaultCodecs and RegisterDefaultInterceptors. 87// 88// If you wish to customize the set of available codecs or the set of 89// active interceptors, create a MediaEngine and call api.NewPeerConnection 90// instead of this function. 91func NewPeerConnection(configuration Configuration) (*PeerConnection, error) { 92 m := &MediaEngine{} 93 if err := m.RegisterDefaultCodecs(); err != nil { 94 return nil, err 95 } 96 97 i := &interceptor.Registry{} 98 if err := RegisterDefaultInterceptors(m, i); err != nil { 99 return nil, err 100 } 101 102 api := NewAPI(WithMediaEngine(m), WithInterceptorRegistry(i)) 103 return api.NewPeerConnection(configuration) 104} 105 106// NewPeerConnection creates a new PeerConnection with the provided configuration against the received API object 107func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection, error) { 108 // https://w3c.github.io/webrtc-pc/#constructor (Step #2) 109 // Some variables defined explicitly despite their implicit zero values to 110 // allow better readability to understand what is happening. 
111 pc := &PeerConnection{ 112 statsID: fmt.Sprintf("PeerConnection-%d", time.Now().UnixNano()), 113 configuration: Configuration{ 114 ICEServers: []ICEServer{}, 115 ICETransportPolicy: ICETransportPolicyAll, 116 BundlePolicy: BundlePolicyBalanced, 117 RTCPMuxPolicy: RTCPMuxPolicyRequire, 118 Certificates: []Certificate{}, 119 ICECandidatePoolSize: 0, 120 }, 121 ops: newOperations(), 122 isClosed: &atomicBool{}, 123 isNegotiationNeeded: &atomicBool{}, 124 negotiationNeededState: negotiationNeededStateEmpty, 125 lastOffer: "", 126 lastAnswer: "", 127 greaterMid: -1, 128 signalingState: SignalingStateStable, 129 iceConnectionState: ICEConnectionStateNew, 130 connectionState: PeerConnectionStateNew, 131 132 api: api, 133 log: api.settingEngine.LoggerFactory.NewLogger("pc"), 134 } 135 136 if !api.settingEngine.disableMediaEngineCopy { 137 pc.api = &API{ 138 settingEngine: api.settingEngine, 139 mediaEngine: api.mediaEngine.copy(), 140 interceptor: api.interceptor, 141 } 142 } 143 144 var err error 145 if err = pc.initConfiguration(configuration); err != nil { 146 return nil, err 147 } 148 149 pc.iceGatherer, err = pc.createICEGatherer() 150 if err != nil { 151 return nil, err 152 } 153 154 // Create the ice transport 155 iceTransport := pc.createICETransport() 156 pc.iceTransport = iceTransport 157 158 // Create the DTLS transport 159 dtlsTransport, err := pc.api.NewDTLSTransport(pc.iceTransport, pc.configuration.Certificates) 160 if err != nil { 161 return nil, err 162 } 163 pc.dtlsTransport = dtlsTransport 164 165 // Create the SCTP transport 166 pc.sctpTransport = pc.api.NewSCTPTransport(pc.dtlsTransport) 167 168 // Wire up the on datachannel handler 169 pc.sctpTransport.OnDataChannel(func(d *DataChannel) { 170 pc.mu.RLock() 171 handler := pc.onDataChannelHandler 172 pc.mu.RUnlock() 173 if handler != nil { 174 handler(d) 175 } 176 }) 177 178 pc.interceptorRTCPWriter = api.interceptor.BindRTCPWriter(interceptor.RTCPWriterFunc(pc.writeRTCP)) 179 180 return pc, nil 
181} 182 183// initConfiguration defines validation of the specified Configuration and 184// its assignment to the internal configuration variable. This function differs 185// from its SetConfiguration counterpart because most of the checks do not 186// include verification statements related to the existing state. Thus the 187// function describes only minor verification of some the struct variables. 188func (pc *PeerConnection) initConfiguration(configuration Configuration) error { 189 if configuration.PeerIdentity != "" { 190 pc.configuration.PeerIdentity = configuration.PeerIdentity 191 } 192 193 // https://www.w3.org/TR/webrtc/#constructor (step #3) 194 if len(configuration.Certificates) > 0 { 195 now := time.Now() 196 for _, x509Cert := range configuration.Certificates { 197 if !x509Cert.Expires().IsZero() && now.After(x509Cert.Expires()) { 198 return &rtcerr.InvalidAccessError{Err: ErrCertificateExpired} 199 } 200 pc.configuration.Certificates = append(pc.configuration.Certificates, x509Cert) 201 } 202 } else { 203 sk, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) 204 if err != nil { 205 return &rtcerr.UnknownError{Err: err} 206 } 207 certificate, err := GenerateCertificate(sk) 208 if err != nil { 209 return err 210 } 211 pc.configuration.Certificates = []Certificate{*certificate} 212 } 213 214 if configuration.BundlePolicy != BundlePolicy(Unknown) { 215 pc.configuration.BundlePolicy = configuration.BundlePolicy 216 } 217 218 if configuration.RTCPMuxPolicy != RTCPMuxPolicy(Unknown) { 219 pc.configuration.RTCPMuxPolicy = configuration.RTCPMuxPolicy 220 } 221 222 if configuration.ICECandidatePoolSize != 0 { 223 pc.configuration.ICECandidatePoolSize = configuration.ICECandidatePoolSize 224 } 225 226 if configuration.ICETransportPolicy != ICETransportPolicy(Unknown) { 227 pc.configuration.ICETransportPolicy = configuration.ICETransportPolicy 228 } 229 230 if configuration.SDPSemantics != SDPSemantics(Unknown) { 231 pc.configuration.SDPSemantics = 
configuration.SDPSemantics 232 } 233 234 sanitizedICEServers := configuration.getICEServers() 235 if len(sanitizedICEServers) > 0 { 236 for _, server := range sanitizedICEServers { 237 if err := server.validate(); err != nil { 238 return err 239 } 240 } 241 pc.configuration.ICEServers = sanitizedICEServers 242 } 243 244 return nil 245} 246 247// OnSignalingStateChange sets an event handler which is invoked when the 248// peer connection's signaling state changes 249func (pc *PeerConnection) OnSignalingStateChange(f func(SignalingState)) { 250 pc.mu.Lock() 251 defer pc.mu.Unlock() 252 pc.onSignalingStateChangeHandler = f 253} 254 255func (pc *PeerConnection) onSignalingStateChange(newState SignalingState) { 256 pc.mu.RLock() 257 handler := pc.onSignalingStateChangeHandler 258 pc.mu.RUnlock() 259 260 pc.log.Infof("signaling state changed to %s", newState) 261 if handler != nil { 262 go handler(newState) 263 } 264} 265 266// OnDataChannel sets an event handler which is invoked when a data 267// channel message arrives from a remote peer. 
268func (pc *PeerConnection) OnDataChannel(f func(*DataChannel)) { 269 pc.mu.Lock() 270 defer pc.mu.Unlock() 271 pc.onDataChannelHandler = f 272} 273 274// OnNegotiationNeeded sets an event handler which is invoked when 275// a change has occurred which requires session negotiation 276func (pc *PeerConnection) OnNegotiationNeeded(f func()) { 277 pc.onNegotiationNeededHandler.Store(f) 278} 279 280func (pc *PeerConnection) onNegotiationNeeded() { 281 // https://w3c.github.io/webrtc-pc/#updating-the-negotiation-needed-flag 282 // non-canon step 1 283 pc.mu.Lock() 284 defer pc.mu.Unlock() 285 if pc.negotiationNeededState == negotiationNeededStateRun { 286 pc.negotiationNeededState = negotiationNeededStateQueue 287 return 288 } else if pc.negotiationNeededState == negotiationNeededStateQueue { 289 return 290 } 291 292 pc.negotiationNeededState = negotiationNeededStateRun 293 294 pc.ops.Enqueue(pc.negotiationNeededOp) 295} 296 297func (pc *PeerConnection) negotiationNeededOp() { 298 // Don't run NegotiatedNeeded checks if OnNegotiationNeeded is not set 299 if handler := pc.onNegotiationNeededHandler.Load(); handler == nil { 300 return 301 } 302 303 // https://www.w3.org/TR/webrtc/#updating-the-negotiation-needed-flag 304 // Step 2.1 305 if pc.isClosed.get() { 306 return 307 } 308 // non-canon step 2.2 309 if !pc.ops.IsEmpty() { 310 pc.ops.Enqueue(pc.negotiationNeededOp) 311 return 312 } 313 314 // non-canon, run again if there was a request 315 defer func() { 316 pc.mu.Lock() 317 if pc.negotiationNeededState == negotiationNeededStateQueue { 318 defer pc.onNegotiationNeeded() 319 } 320 pc.negotiationNeededState = negotiationNeededStateEmpty 321 pc.mu.Unlock() 322 }() 323 324 // Step 2.3 325 if pc.SignalingState() != SignalingStateStable { 326 return 327 } 328 329 // Step 2.4 330 if !pc.checkNegotiationNeeded() { 331 pc.isNegotiationNeeded.set(false) 332 return 333 } 334 335 // Step 2.5 336 if pc.isNegotiationNeeded.get() { 337 return 338 } 339 340 // Step 2.6 341 
pc.isNegotiationNeeded.set(true) 342 343 // Step 2.7 344 if handler, ok := pc.onNegotiationNeededHandler.Load().(func()); ok && handler != nil { 345 handler() 346 } 347} 348 349func (pc *PeerConnection) checkNegotiationNeeded() bool { //nolint:gocognit 350 // To check if negotiation is needed for connection, perform the following checks: 351 // Skip 1, 2 steps 352 // Step 3 353 pc.mu.Lock() 354 defer pc.mu.Unlock() 355 356 localDesc := pc.currentLocalDescription 357 remoteDesc := pc.currentRemoteDescription 358 359 if localDesc == nil { 360 return true 361 } 362 363 pc.sctpTransport.lock.Lock() 364 lenDataChannel := len(pc.sctpTransport.dataChannels) 365 pc.sctpTransport.lock.Unlock() 366 367 if lenDataChannel != 0 && haveDataChannel(localDesc) == nil { 368 return true 369 } 370 371 for _, t := range pc.rtpTransceivers { 372 // https://www.w3.org/TR/webrtc/#dfn-update-the-negotiation-needed-flag 373 // Step 5.1 374 // if t.stopping && !t.stopped { 375 // return true 376 // } 377 m := getByMid(t.Mid(), localDesc) 378 // Step 5.2 379 if !t.stopped && m == nil { 380 return true 381 } 382 if !t.stopped && m != nil { 383 // Step 5.3.1 384 if t.Direction() == RTPTransceiverDirectionSendrecv || t.Direction() == RTPTransceiverDirectionSendonly { 385 descMsid, okMsid := m.Attribute(sdp.AttrKeyMsid) 386 track := t.Sender().Track() 387 if !okMsid || descMsid != track.StreamID()+" "+track.ID() { 388 return true 389 } 390 } 391 switch localDesc.Type { 392 case SDPTypeOffer: 393 // Step 5.3.2 394 rm := getByMid(t.Mid(), remoteDesc) 395 if rm == nil { 396 return true 397 } 398 399 if getPeerDirection(m) != t.Direction() && getPeerDirection(rm) != t.Direction().Revers() { 400 return true 401 } 402 case SDPTypeAnswer: 403 // Step 5.3.3 404 if _, ok := m.Attribute(t.Direction().String()); !ok { 405 return true 406 } 407 default: 408 } 409 } 410 // Step 5.4 411 if t.stopped && t.Mid() != "" { 412 if getByMid(t.Mid(), localDesc) != nil || getByMid(t.Mid(), remoteDesc) != nil { 413 
return true 414 } 415 } 416 } 417 // Step 6 418 return false 419} 420 421// OnICECandidate sets an event handler which is invoked when a new ICE 422// candidate is found. 423// Take note that the handler is gonna be called with a nil pointer when 424// gathering is finished. 425func (pc *PeerConnection) OnICECandidate(f func(*ICECandidate)) { 426 pc.iceGatherer.OnLocalCandidate(f) 427} 428 429// OnICEGatheringStateChange sets an event handler which is invoked when the 430// ICE candidate gathering state has changed. 431func (pc *PeerConnection) OnICEGatheringStateChange(f func(ICEGathererState)) { 432 pc.iceGatherer.OnStateChange(f) 433} 434 435// OnTrack sets an event handler which is called when remote track 436// arrives from a remote peer. 437func (pc *PeerConnection) OnTrack(f func(*TrackRemote, *RTPReceiver)) { 438 pc.mu.Lock() 439 defer pc.mu.Unlock() 440 pc.onTrackHandler = f 441} 442 443func (pc *PeerConnection) onTrack(t *TrackRemote, r *RTPReceiver) { 444 pc.mu.RLock() 445 handler := pc.onTrackHandler 446 pc.mu.RUnlock() 447 448 pc.log.Debugf("got new track: %+v", t) 449 if t != nil { 450 if handler != nil { 451 go handler(t, r) 452 } else { 453 pc.log.Warnf("OnTrack unset, unable to handle incoming media streams") 454 } 455 } 456} 457 458// OnICEConnectionStateChange sets an event handler which is called 459// when an ICE connection state is changed. 
460func (pc *PeerConnection) OnICEConnectionStateChange(f func(ICEConnectionState)) { 461 pc.mu.Lock() 462 defer pc.mu.Unlock() 463 pc.onICEConnectionStateChangeHandler = f 464} 465 466func (pc *PeerConnection) onICEConnectionStateChange(cs ICEConnectionState) { 467 pc.mu.Lock() 468 pc.iceConnectionState = cs 469 handler := pc.onICEConnectionStateChangeHandler 470 pc.mu.Unlock() 471 472 pc.log.Infof("ICE connection state changed: %s", cs) 473 if handler != nil { 474 go handler(cs) 475 } 476} 477 478// OnConnectionStateChange sets an event handler which is called 479// when the PeerConnectionState has changed 480func (pc *PeerConnection) OnConnectionStateChange(f func(PeerConnectionState)) { 481 pc.mu.Lock() 482 defer pc.mu.Unlock() 483 pc.onConnectionStateChangeHandler = f 484} 485 486// SetConfiguration updates the configuration of this PeerConnection object. 487func (pc *PeerConnection) SetConfiguration(configuration Configuration) error { //nolint:gocognit 488 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-setconfiguration (step #2) 489 if pc.isClosed.get() { 490 return &rtcerr.InvalidStateError{Err: ErrConnectionClosed} 491 } 492 493 // https://www.w3.org/TR/webrtc/#set-the-configuration (step #3) 494 if configuration.PeerIdentity != "" { 495 if configuration.PeerIdentity != pc.configuration.PeerIdentity { 496 return &rtcerr.InvalidModificationError{Err: ErrModifyingPeerIdentity} 497 } 498 pc.configuration.PeerIdentity = configuration.PeerIdentity 499 } 500 501 // https://www.w3.org/TR/webrtc/#set-the-configuration (step #4) 502 if len(configuration.Certificates) > 0 { 503 if len(configuration.Certificates) != len(pc.configuration.Certificates) { 504 return &rtcerr.InvalidModificationError{Err: ErrModifyingCertificates} 505 } 506 507 for i, certificate := range configuration.Certificates { 508 if !pc.configuration.Certificates[i].Equals(certificate) { 509 return &rtcerr.InvalidModificationError{Err: ErrModifyingCertificates} 510 } 511 } 512 
pc.configuration.Certificates = configuration.Certificates 513 } 514 515 // https://www.w3.org/TR/webrtc/#set-the-configuration (step #5) 516 if configuration.BundlePolicy != BundlePolicy(Unknown) { 517 if configuration.BundlePolicy != pc.configuration.BundlePolicy { 518 return &rtcerr.InvalidModificationError{Err: ErrModifyingBundlePolicy} 519 } 520 pc.configuration.BundlePolicy = configuration.BundlePolicy 521 } 522 523 // https://www.w3.org/TR/webrtc/#set-the-configuration (step #6) 524 if configuration.RTCPMuxPolicy != RTCPMuxPolicy(Unknown) { 525 if configuration.RTCPMuxPolicy != pc.configuration.RTCPMuxPolicy { 526 return &rtcerr.InvalidModificationError{Err: ErrModifyingRTCPMuxPolicy} 527 } 528 pc.configuration.RTCPMuxPolicy = configuration.RTCPMuxPolicy 529 } 530 531 // https://www.w3.org/TR/webrtc/#set-the-configuration (step #7) 532 if configuration.ICECandidatePoolSize != 0 { 533 if pc.configuration.ICECandidatePoolSize != configuration.ICECandidatePoolSize && 534 pc.LocalDescription() != nil { 535 return &rtcerr.InvalidModificationError{Err: ErrModifyingICECandidatePoolSize} 536 } 537 pc.configuration.ICECandidatePoolSize = configuration.ICECandidatePoolSize 538 } 539 540 // https://www.w3.org/TR/webrtc/#set-the-configuration (step #8) 541 if configuration.ICETransportPolicy != ICETransportPolicy(Unknown) { 542 pc.configuration.ICETransportPolicy = configuration.ICETransportPolicy 543 } 544 545 // https://www.w3.org/TR/webrtc/#set-the-configuration (step #11) 546 if len(configuration.ICEServers) > 0 { 547 // https://www.w3.org/TR/webrtc/#set-the-configuration (step #11.3) 548 for _, server := range configuration.ICEServers { 549 if err := server.validate(); err != nil { 550 return err 551 } 552 } 553 pc.configuration.ICEServers = configuration.ICEServers 554 } 555 return nil 556} 557 558// GetConfiguration returns a Configuration object representing the current 559// configuration of this PeerConnection object. 
The returned object is a 560// copy and direct mutation on it will not take affect until SetConfiguration 561// has been called with Configuration passed as its only argument. 562// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-getconfiguration 563func (pc *PeerConnection) GetConfiguration() Configuration { 564 return pc.configuration 565} 566 567func (pc *PeerConnection) getStatsID() string { 568 pc.mu.RLock() 569 defer pc.mu.RUnlock() 570 return pc.statsID 571} 572 573func (pc *PeerConnection) hasLocalDescriptionChanged(desc *SessionDescription) bool { 574 for _, t := range pc.GetTransceivers() { 575 m := getByMid(t.Mid(), desc) 576 if m == nil { 577 return true 578 } 579 580 if getPeerDirection(m) != t.Direction() { 581 return true 582 } 583 } 584 585 return false 586} 587 588var errExcessiveRetries = errors.New("excessive retries in CreateOffer") 589 590// CreateOffer starts the PeerConnection and generates the localDescription 591// https://w3c.github.io/webrtc-pc/#dom-rtcpeerconnection-createoffer 592func (pc *PeerConnection) CreateOffer(options *OfferOptions) (SessionDescription, error) { //nolint:gocognit 593 useIdentity := pc.idpLoginURL != nil 594 switch { 595 case useIdentity: 596 return SessionDescription{}, errIdentityProviderNotImplemented 597 case pc.isClosed.get(): 598 return SessionDescription{}, &rtcerr.InvalidStateError{Err: ErrConnectionClosed} 599 } 600 601 if options != nil && options.ICERestart { 602 if err := pc.iceTransport.restart(); err != nil { 603 return SessionDescription{}, err 604 } 605 } 606 607 var ( 608 d *sdp.SessionDescription 609 offer SessionDescription 610 err error 611 ) 612 613 // This may be necessary to recompute if, for example, createOffer was called when only an 614 // audio RTCRtpTransceiver was added to connection, but while performing the in-parallel 615 // steps to create an offer, a video RTCRtpTransceiver was added, requiring additional 616 // inspection of video system resources. 
617 count := 0 618 for { 619 // We cache current transceivers to ensure they aren't 620 // mutated during offer generation. We later check if they have 621 // been mutated and recompute the offer if necessary. 622 currentTransceivers := pc.GetTransceivers() 623 624 // in-parallel steps to create an offer 625 // https://w3c.github.io/webrtc-pc/#dfn-in-parallel-steps-to-create-an-offer 626 isPlanB := pc.configuration.SDPSemantics == SDPSemanticsPlanB 627 if pc.currentRemoteDescription != nil { 628 isPlanB = descriptionIsPlanB(pc.RemoteDescription()) 629 } 630 631 // include unmatched local transceivers 632 if !isPlanB { 633 // update the greater mid if the remote description provides a greater one 634 if pc.currentRemoteDescription != nil { 635 var numericMid int 636 for _, media := range pc.currentRemoteDescription.parsed.MediaDescriptions { 637 mid := getMidValue(media) 638 if mid == "" { 639 continue 640 } 641 numericMid, err = strconv.Atoi(mid) 642 if err != nil { 643 continue 644 } 645 if numericMid > pc.greaterMid { 646 pc.greaterMid = numericMid 647 } 648 } 649 } 650 for _, t := range currentTransceivers { 651 if t.Mid() != "" { 652 continue 653 } 654 pc.greaterMid++ 655 err = t.setMid(strconv.Itoa(pc.greaterMid)) 656 if err != nil { 657 return SessionDescription{}, err 658 } 659 } 660 } 661 662 if pc.currentRemoteDescription == nil { 663 d, err = pc.generateUnmatchedSDP(currentTransceivers, useIdentity) 664 } else { 665 d, err = pc.generateMatchedSDP(currentTransceivers, useIdentity, true /*includeUnmatched */, connectionRoleFromDtlsRole(defaultDtlsRoleOffer)) 666 } 667 668 if err != nil { 669 return SessionDescription{}, err 670 } 671 672 sdpBytes, err := d.Marshal() 673 if err != nil { 674 return SessionDescription{}, err 675 } 676 677 offer = SessionDescription{ 678 Type: SDPTypeOffer, 679 SDP: string(sdpBytes), 680 parsed: d, 681 } 682 683 // Verify local media hasn't changed during offer 684 // generation. 
Recompute if necessary 685 if isPlanB || !pc.hasLocalDescriptionChanged(&offer) { 686 break 687 } 688 count++ 689 if count >= 128 { 690 return SessionDescription{}, errExcessiveRetries 691 } 692 } 693 694 pc.lastOffer = offer.SDP 695 return offer, nil 696} 697 698func (pc *PeerConnection) createICEGatherer() (*ICEGatherer, error) { 699 g, err := pc.api.NewICEGatherer(ICEGatherOptions{ 700 ICEServers: pc.configuration.getICEServers(), 701 ICEGatherPolicy: pc.configuration.ICETransportPolicy, 702 }) 703 if err != nil { 704 return nil, err 705 } 706 707 return g, nil 708} 709 710// Update the PeerConnectionState given the state of relevant transports 711// https://www.w3.org/TR/webrtc/#rtcpeerconnectionstate-enum 712func (pc *PeerConnection) updateConnectionState(iceConnectionState ICEConnectionState, dtlsTransportState DTLSTransportState) { 713 pc.mu.Lock() 714 defer pc.mu.Unlock() 715 716 connectionState := PeerConnectionStateNew 717 switch { 718 // The RTCPeerConnection object's [[IsClosed]] slot is true. 719 case pc.isClosed.get(): 720 connectionState = PeerConnectionStateClosed 721 722 // Any of the RTCIceTransports or RTCDtlsTransports are in a "failed" state. 723 case iceConnectionState == ICEConnectionStateFailed || dtlsTransportState == DTLSTransportStateFailed: 724 connectionState = PeerConnectionStateFailed 725 726 // Any of the RTCIceTransports or RTCDtlsTransports are in the "disconnected" 727 // state and none of them are in the "failed" or "connecting" or "checking" state. */ 728 case iceConnectionState == ICEConnectionStateDisconnected: 729 connectionState = PeerConnectionStateDisconnected 730 731 // All RTCIceTransports and RTCDtlsTransports are in the "connected", "completed" or "closed" 732 // state and at least one of them is in the "connected" or "completed" state. 
733 case iceConnectionState == ICEConnectionStateConnected && dtlsTransportState == DTLSTransportStateConnected: 734 connectionState = PeerConnectionStateConnected 735 736 // Any of the RTCIceTransports or RTCDtlsTransports are in the "connecting" or 737 // "checking" state and none of them is in the "failed" state. 738 case iceConnectionState == ICEConnectionStateChecking && dtlsTransportState == DTLSTransportStateConnecting: 739 connectionState = PeerConnectionStateConnecting 740 } 741 742 if pc.connectionState == connectionState { 743 return 744 } 745 746 pc.log.Infof("peer connection state changed: %s", connectionState) 747 pc.connectionState = connectionState 748 handler := pc.onConnectionStateChangeHandler 749 if handler != nil { 750 go handler(connectionState) 751 } 752} 753 754func (pc *PeerConnection) createICETransport() *ICETransport { 755 t := pc.api.NewICETransport(pc.iceGatherer) 756 t.OnConnectionStateChange(func(state ICETransportState) { 757 var cs ICEConnectionState 758 switch state { 759 case ICETransportStateNew: 760 cs = ICEConnectionStateNew 761 case ICETransportStateChecking: 762 cs = ICEConnectionStateChecking 763 case ICETransportStateConnected: 764 cs = ICEConnectionStateConnected 765 case ICETransportStateCompleted: 766 cs = ICEConnectionStateCompleted 767 case ICETransportStateFailed: 768 cs = ICEConnectionStateFailed 769 case ICETransportStateDisconnected: 770 cs = ICEConnectionStateDisconnected 771 case ICETransportStateClosed: 772 cs = ICEConnectionStateClosed 773 default: 774 pc.log.Warnf("OnConnectionStateChange: unhandled ICE state: %s", state) 775 return 776 } 777 pc.onICEConnectionStateChange(cs) 778 pc.updateConnectionState(cs, pc.dtlsTransport.State()) 779 }) 780 781 return t 782} 783 784// CreateAnswer starts the PeerConnection and generates the localDescription 785func (pc *PeerConnection) CreateAnswer(options *AnswerOptions) (SessionDescription, error) { 786 useIdentity := pc.idpLoginURL != nil 787 switch { 788 case 
pc.RemoteDescription() == nil: 789 return SessionDescription{}, &rtcerr.InvalidStateError{Err: ErrNoRemoteDescription} 790 case useIdentity: 791 return SessionDescription{}, errIdentityProviderNotImplemented 792 case pc.isClosed.get(): 793 return SessionDescription{}, &rtcerr.InvalidStateError{Err: ErrConnectionClosed} 794 case pc.signalingState.Get() != SignalingStateHaveRemoteOffer && pc.signalingState.Get() != SignalingStateHaveLocalPranswer: 795 return SessionDescription{}, &rtcerr.InvalidStateError{Err: ErrIncorrectSignalingState} 796 } 797 798 connectionRole := connectionRoleFromDtlsRole(pc.api.settingEngine.answeringDTLSRole) 799 if connectionRole == sdp.ConnectionRole(0) { 800 connectionRole = connectionRoleFromDtlsRole(defaultDtlsRoleAnswer) 801 } 802 803 currentTransceivers := pc.GetTransceivers() 804 d, err := pc.generateMatchedSDP(currentTransceivers, useIdentity, false /*includeUnmatched */, connectionRole) 805 if err != nil { 806 return SessionDescription{}, err 807 } 808 809 sdpBytes, err := d.Marshal() 810 if err != nil { 811 return SessionDescription{}, err 812 } 813 814 desc := SessionDescription{ 815 Type: SDPTypeAnswer, 816 SDP: string(sdpBytes), 817 parsed: d, 818 } 819 pc.lastAnswer = desc.SDP 820 return desc, nil 821} 822 823// 4.4.1.6 Set the SessionDescription 824func (pc *PeerConnection) setDescription(sd *SessionDescription, op stateChangeOp) error { //nolint:gocognit 825 switch { 826 case pc.isClosed.get(): 827 return &rtcerr.InvalidStateError{Err: ErrConnectionClosed} 828 case NewSDPType(sd.Type.String()) == SDPType(Unknown): 829 return &rtcerr.TypeError{Err: fmt.Errorf("%w: '%d' is not a valid enum value of type SDPType", errPeerConnSDPTypeInvalidValue, sd.Type)} 830 } 831 832 nextState, err := func() (SignalingState, error) { 833 pc.mu.Lock() 834 defer pc.mu.Unlock() 835 836 cur := pc.SignalingState() 837 setLocal := stateChangeOpSetLocal 838 setRemote := stateChangeOpSetRemote 839 newSDPDoesNotMatchOffer := 
&rtcerr.InvalidModificationError{Err: errSDPDoesNotMatchOffer} 840 newSDPDoesNotMatchAnswer := &rtcerr.InvalidModificationError{Err: errSDPDoesNotMatchAnswer} 841 842 var nextState SignalingState 843 var err error 844 switch op { 845 case setLocal: 846 switch sd.Type { 847 // stable->SetLocal(offer)->have-local-offer 848 case SDPTypeOffer: 849 if sd.SDP != pc.lastOffer { 850 return nextState, newSDPDoesNotMatchOffer 851 } 852 nextState, err = checkNextSignalingState(cur, SignalingStateHaveLocalOffer, setLocal, sd.Type) 853 if err == nil { 854 pc.pendingLocalDescription = sd 855 } 856 // have-remote-offer->SetLocal(answer)->stable 857 // have-local-pranswer->SetLocal(answer)->stable 858 case SDPTypeAnswer: 859 if sd.SDP != pc.lastAnswer { 860 return nextState, newSDPDoesNotMatchAnswer 861 } 862 nextState, err = checkNextSignalingState(cur, SignalingStateStable, setLocal, sd.Type) 863 if err == nil { 864 pc.currentLocalDescription = sd 865 pc.currentRemoteDescription = pc.pendingRemoteDescription 866 pc.pendingRemoteDescription = nil 867 pc.pendingLocalDescription = nil 868 } 869 case SDPTypeRollback: 870 nextState, err = checkNextSignalingState(cur, SignalingStateStable, setLocal, sd.Type) 871 if err == nil { 872 pc.pendingLocalDescription = nil 873 } 874 // have-remote-offer->SetLocal(pranswer)->have-local-pranswer 875 case SDPTypePranswer: 876 if sd.SDP != pc.lastAnswer { 877 return nextState, newSDPDoesNotMatchAnswer 878 } 879 nextState, err = checkNextSignalingState(cur, SignalingStateHaveLocalPranswer, setLocal, sd.Type) 880 if err == nil { 881 pc.pendingLocalDescription = sd 882 } 883 default: 884 return nextState, &rtcerr.OperationError{Err: fmt.Errorf("%w: %s(%s)", errPeerConnStateChangeInvalid, op, sd.Type)} 885 } 886 case setRemote: 887 switch sd.Type { 888 // stable->SetRemote(offer)->have-remote-offer 889 case SDPTypeOffer: 890 nextState, err = checkNextSignalingState(cur, SignalingStateHaveRemoteOffer, setRemote, sd.Type) 891 if err == nil { 892 
pc.pendingRemoteDescription = sd 893 } 894 // have-local-offer->SetRemote(answer)->stable 895 // have-remote-pranswer->SetRemote(answer)->stable 896 case SDPTypeAnswer: 897 nextState, err = checkNextSignalingState(cur, SignalingStateStable, setRemote, sd.Type) 898 if err == nil { 899 pc.currentRemoteDescription = sd 900 pc.currentLocalDescription = pc.pendingLocalDescription 901 pc.pendingRemoteDescription = nil 902 pc.pendingLocalDescription = nil 903 } 904 case SDPTypeRollback: 905 nextState, err = checkNextSignalingState(cur, SignalingStateStable, setRemote, sd.Type) 906 if err == nil { 907 pc.pendingRemoteDescription = nil 908 } 909 // have-local-offer->SetRemote(pranswer)->have-remote-pranswer 910 case SDPTypePranswer: 911 nextState, err = checkNextSignalingState(cur, SignalingStateHaveRemotePranswer, setRemote, sd.Type) 912 if err == nil { 913 pc.pendingRemoteDescription = sd 914 } 915 default: 916 return nextState, &rtcerr.OperationError{Err: fmt.Errorf("%w: %s(%s)", errPeerConnStateChangeInvalid, op, sd.Type)} 917 } 918 default: 919 return nextState, &rtcerr.OperationError{Err: fmt.Errorf("%w: %q", errPeerConnStateChangeUnhandled, op)} 920 } 921 922 return nextState, err 923 }() 924 925 if err == nil { 926 pc.signalingState.Set(nextState) 927 if pc.signalingState.Get() == SignalingStateStable { 928 pc.isNegotiationNeeded.set(false) 929 pc.onNegotiationNeeded() 930 } 931 pc.onSignalingStateChange(nextState) 932 } 933 return err 934} 935 936// SetLocalDescription sets the SessionDescription of the local peer 937func (pc *PeerConnection) SetLocalDescription(desc SessionDescription) error { 938 if pc.isClosed.get() { 939 return &rtcerr.InvalidStateError{Err: ErrConnectionClosed} 940 } 941 942 haveLocalDescription := pc.currentLocalDescription != nil 943 944 // JSEP 5.4 945 if desc.SDP == "" { 946 switch desc.Type { 947 case SDPTypeAnswer, SDPTypePranswer: 948 desc.SDP = pc.lastAnswer 949 case SDPTypeOffer: 950 desc.SDP = pc.lastOffer 951 default: 952 return 
&rtcerr.InvalidModificationError{ 953 Err: fmt.Errorf("%w: %s", errPeerConnSDPTypeInvalidValueSetLocalDescription, desc.Type), 954 } 955 } 956 } 957 958 desc.parsed = &sdp.SessionDescription{} 959 if err := desc.parsed.Unmarshal([]byte(desc.SDP)); err != nil { 960 return err 961 } 962 if err := pc.setDescription(&desc, stateChangeOpSetLocal); err != nil { 963 return err 964 } 965 966 currentTransceivers := append([]*RTPTransceiver{}, pc.GetTransceivers()...) 967 968 weAnswer := desc.Type == SDPTypeAnswer 969 remoteDesc := pc.RemoteDescription() 970 if weAnswer && remoteDesc != nil { 971 if err := pc.startRTPSenders(currentTransceivers); err != nil { 972 return err 973 } 974 pc.ops.Enqueue(func() { 975 pc.startRTP(haveLocalDescription, remoteDesc, currentTransceivers) 976 }) 977 } 978 979 if pc.iceGatherer.State() == ICEGathererStateNew { 980 return pc.iceGatherer.Gather() 981 } 982 return nil 983} 984 985// LocalDescription returns PendingLocalDescription if it is not null and 986// otherwise it returns CurrentLocalDescription. This property is used to 987// determine if SetLocalDescription has already been called. 
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-localdescription
func (pc *PeerConnection) LocalDescription() *SessionDescription {
	if pendingLocalDescription := pc.PendingLocalDescription(); pendingLocalDescription != nil {
		return pendingLocalDescription
	}
	return pc.CurrentLocalDescription()
}

// SetRemoteDescription sets the SessionDescription of the remote peer.
//
// After parsing desc and applying the signaling state change it updates the
// media engine from the remote description. When desc is not an answer and the
// remote description is not Plan B, every non-application media section is
// matched to a local transceiver (one is created if none fits). Remote ICE
// credentials and candidates are then applied. On renegotiation, RTP is
// restarted via pc.ops only when desc is an answer; on the first negotiation
// the transports are started on pc.ops.
// nolint: gocyclo
func (pc *PeerConnection) SetRemoteDescription(desc SessionDescription) error { //nolint:gocognit
	if pc.isClosed.get() {
		return &rtcerr.InvalidStateError{Err: ErrConnectionClosed}
	}

	// Sampled before setDescription runs, which may itself assign
	// currentRemoteDescription.
	isRenegotation := pc.currentRemoteDescription != nil

	if _, err := desc.Unmarshal(); err != nil {
		return err
	}
	if err := pc.setDescription(&desc, stateChangeOpSetRemote); err != nil {
		return err
	}

	if err := pc.api.mediaEngine.updateFromRemoteDescription(*desc.parsed); err != nil {
		return err
	}

	var t *RTPTransceiver
	localTransceivers := append([]*RTPTransceiver{}, pc.GetTransceivers()...)
	detectedPlanB := descriptionIsPlanB(pc.RemoteDescription())
	// Receiving an answer means the local side made the offer.
	weOffer := desc.Type == SDPTypeAnswer

	if !weOffer && !detectedPlanB {
		for _, media := range pc.RemoteDescription().parsed.MediaDescriptions {
			midValue := getMidValue(media)
			if midValue == "" {
				return errPeerConnRemoteDescriptionWithoutMidValue
			}

			// Application sections are skipped; no transceiver is matched for them.
			if media.MediaName.Media == mediaSectionApplication {
				continue
			}

			kind := NewRTPCodecType(media.MediaName.Media)
			direction := getPeerDirection(media)
			if kind == 0 || direction == RTPTransceiverDirection(Unknown) {
				continue
			}

			// Look up by mid first, then fall back to kind/direction. Both
			// helpers also return the candidate list to keep using
			// (presumably with the chosen transceiver removed — confirm
			// against their definitions).
			t, localTransceivers = findByMid(midValue, localTransceivers)
			if t == nil {
				t, localTransceivers = satisfyTypeAndDirection(kind, direction, localTransceivers)
			} else if direction == RTPTransceiverDirectionInactive {
				if err := t.Stop(); err != nil {
					return err
				}
			}

			if t == nil {
				receiver, err := pc.api.NewRTPReceiver(kind, pc.dtlsTransport)
				if err != nil {
					return err
				}

				// Mirror the remote direction: if the remote only receives we
				// only send, otherwise we only receive.
				localDirection := RTPTransceiverDirectionRecvonly
				if direction == RTPTransceiverDirectionRecvonly {
					localDirection = RTPTransceiverDirectionSendonly
				}

				t = pc.newRTPTransceiver(receiver, nil, localDirection, kind)

				pc.onNegotiationNeeded()
			} else if direction == RTPTransceiverDirectionRecvonly {
				// The remote will not send, so a sendrecv transceiver is
				// narrowed to sendonly.
				if t.Direction() == RTPTransceiverDirectionSendrecv {
					t.setDirection(RTPTransceiverDirectionSendonly)
				}
			}

			if t.Mid() == "" {
				if err := t.setMid(midValue); err != nil {
					return err
				}
			}
		}
	}

	remoteUfrag, remotePwd, candidates, err := extractICEDetails(desc.parsed)
	if err != nil {
		return err
	}

	if isRenegotation && pc.iceTransport.haveRemoteCredentialsChange(remoteUfrag, remotePwd) {
		// An ICE Restart only happens implicitly for a SetRemoteDescription of type offer
		if !weOffer {
			if err = pc.iceTransport.restart(); err != nil {
				return err
			}
		}

		if err = pc.iceTransport.setRemoteCredentials(remoteUfrag, remotePwd); err != nil {
			return err
		}
	}

	for i := range candidates {
		if err = pc.iceTransport.AddRemoteCandidate(&candidates[i]); err != nil {
			return err
		}
	}

	// Re-read the transceivers: the matching loop above may have added some.
	currentTransceivers := append([]*RTPTransceiver{}, pc.GetTransceivers()...)

	if isRenegotation {
		if weOffer {
			if err = pc.startRTPSenders(currentTransceivers); err != nil {
				return err
			}
			pc.ops.Enqueue(func() {
				pc.startRTP(true, &desc, currentTransceivers)
			})
		}
		return nil
	}

	remoteIsLite := false
	for _, a := range desc.parsed.Attributes {
		if strings.TrimSpace(a.Key) == sdp.AttrKeyICELite {
			remoteIsLite = true
		}
	}

	fingerprint, fingerprintHash, err := extractFingerprint(desc.parsed)
	if err != nil {
		return err
	}

	iceRole := ICERoleControlled
	// If one of the agents is lite and the other one is not, the full
	// (non-lite) agent must be the controlling agent.
	// If both or neither agents are lite the offering agent is controlling.
	// RFC 8445 S6.1.1
	if (weOffer && remoteIsLite == pc.api.settingEngine.candidates.ICELite) || (remoteIsLite && !pc.api.settingEngine.candidates.ICELite) {
		iceRole = ICERoleControlling
	}

	// Senders are started synchronously so a failure can be returned to the
	// caller.
	if weOffer {
		if err := pc.startRTPSenders(currentTransceivers); err != nil {
			return err
		}
	}

	// Start the networking on the operations queue since it will block until
	// the connection is actually established.
	pc.ops.Enqueue(func() {
		pc.startTransports(iceRole, dtlsRoleFromRemoteSDP(desc.parsed), remoteUfrag, remotePwd, fingerprint, fingerprintHash)
		if weOffer {
			pc.startRTP(false, &desc, currentTransceivers)
		}
	})
	return nil
}

// startReceiver begins receiving on receiver for the given track details.
// It builds decoding parameters from incoming.ssrc (when non-zero) and
// incoming.rids, calls Receive, and copies id/streamID onto the receiver's
// tracks. When an SSRC is known it spawns a goroutine that determines the
// payload type, resolves the codec parameters and fires onTrack. Failures are
// logged, not returned.
func (pc *PeerConnection) startReceiver(incoming trackDetails, receiver *RTPReceiver) {
	encodings := []RTPDecodingParameters{}
	if incoming.ssrc != 0 {
		encodings = append(encodings, RTPDecodingParameters{RTPCodingParameters{SSRC: incoming.ssrc}})
	}
	for _, rid := range incoming.rids {
		encodings = append(encodings, RTPDecodingParameters{RTPCodingParameters{RID: rid}})
	}

	if err := receiver.Receive(RTPReceiveParameters{Encodings: encodings}); err != nil {
		pc.log.Warnf("RTPReceiver Receive failed %s", err)
		return
	}

	// set track id and label early so they can be set as new track information
	// is received from the SDP.
	for i := range receiver.tracks {
		receiver.tracks[i].track.mu.Lock()
		receiver.tracks[i].track.id = incoming.id
		receiver.tracks[i].track.streamID = incoming.streamID
		receiver.tracks[i].track.mu.Unlock()
	}

	// We can't block and wait for a single SSRC
	if incoming.ssrc == 0 {
		return
	}

	// Runs in its own goroutine so the caller is not held up while the
	// payload type is being determined.
	go func() {
		if err := receiver.Track().determinePayloadType(); err != nil {
			pc.log.Warnf("Could not determine PayloadType for SSRC %d", receiver.Track().SSRC())
			return
		}

		params, err := pc.api.mediaEngine.getRTPParametersByPayloadType(receiver.Track().PayloadType())
		if err != nil {
			pc.log.Warnf("no codec could be found for payloadType %d", receiver.Track().PayloadType())
			return
		}

		receiver.Track().mu.Lock()
		receiver.Track().kind = receiver.kind
		receiver.Track().codec = params.Codecs[0]
		receiver.Track().params = params
		receiver.Track().mu.Unlock()

		pc.onTrack(receiver.Track(), receiver)
	}()
}

// startRTPReceivers opens known inbound SRTP streams from the RemoteDescription.
// Tracks whose SSRC already belongs to a receiver in currentTransceivers are
// filtered out first. Each remaining track is started on the first transceiver
// with the same mid and kind, a receiving direction, and a receiver that has
// not yet received. When the remote is Plan B, a new sendrecv transceiver is
// added for every track still unmatched.
func (pc *PeerConnection) startRTPReceivers(incomingTracks []trackDetails, currentTransceivers []*RTPTransceiver) { //nolint:gocognit
	localTransceivers := append([]*RTPTransceiver{}, currentTransceivers...)

	remoteIsPlanB := false
	switch pc.configuration.SDPSemantics {
	case SDPSemanticsPlanB:
		remoteIsPlanB = true
	case SDPSemanticsUnifiedPlanWithFallback:
		remoteIsPlanB = descriptionIsPlanB(pc.RemoteDescription())
	default:
		// none
	}

	// Ensure we haven't already started a transceiver for this ssrc
	for i := range incomingTracks {
		// range fixed its bound on the original slice, but incomingTracks is
		// reassigned to the filtered result below, so re-check the index.
		if len(incomingTracks) <= i {
			break
		}
		incomingTrack := incomingTracks[i]

		for _, t := range localTransceivers {
			if (t.Receiver()) == nil || t.Receiver().Track() == nil || t.Receiver().Track().ssrc != incomingTrack.ssrc {
				continue
			}

			incomingTracks = filterTrackWithSSRC(incomingTracks, incomingTrack.ssrc)
		}
	}

	// In-place filter reusing incomingTracks' backing array: at most one
	// element is appended per iteration, so writes never overtake reads.
	unhandledTracks := incomingTracks[:0]
	for i := range incomingTracks {
		trackHandled := false
		for j := range localTransceivers {
			t := localTransceivers[j]
			incomingTrack := incomingTracks[i]

			if t.Mid() != incomingTrack.mid {
				continue
			}

			if (incomingTrack.kind != t.kind) ||
				(t.Direction() != RTPTransceiverDirectionRecvonly && t.Direction() != RTPTransceiverDirectionSendrecv) ||
				(t.Receiver()) == nil ||
				(t.Receiver().haveReceived()) {
				continue
			}

			pc.startReceiver(incomingTrack, t.Receiver())
			trackHandled = true
			break
		}

		if !trackHandled {
			unhandledTracks = append(unhandledTracks, incomingTracks[i])
		}
	}

	if remoteIsPlanB {
		for _, incoming := range unhandledTracks {
			t, err := pc.AddTransceiverFromKind(incoming.kind, RtpTransceiverInit{
				Direction: RTPTransceiverDirectionSendrecv,
			})
			if err != nil {
				pc.log.Warnf("Could not add transceiver for remote SSRC %d: %s", incoming.ssrc, err)
				continue
			}
			pc.startReceiver(incoming, t.Receiver())
		}
	}
}

// startRTPSenders starts all outbound RTP streams.
// Only senders that are negotiated and have not sent yet are started; the
// first Send error aborts the loop and is returned.
func (pc *PeerConnection) startRTPSenders(currentTransceivers []*RTPTransceiver) error {
	for _, transceiver := range currentTransceivers {
		if transceiver.Sender() != nil && transceiver.Sender().isNegotiated() && !transceiver.Sender().hasSent() {
			err := transceiver.Sender().Send(RTPSendParameters{
				Encodings: []RTPEncodingParameters{
					{
						RTPCodingParameters{
							SSRC:        transceiver.Sender().ssrc,
							PayloadType: transceiver.Sender().payloadType,
						},
					},
				},
			})
			if err != nil {
				return err
			}
		}
	}

	return nil
}

// startSCTP starts the SCTP transport and then opens every data channel that
// is still in the connecting state. If the transport fails to start it is
// stopped again; all failures are logged rather than returned.
func (pc *PeerConnection) startSCTP() {
	// Start sctp
	if err := pc.sctpTransport.Start(SCTPCapabilities{
		MaxMessageSize: 0,
	}); err != nil {
		pc.log.Warnf("Failed to start SCTP: %s", err)
		if err = pc.sctpTransport.Stop(); err != nil {
			pc.log.Warnf("Failed to stop SCTPTransport: %s", err)
		}

		return
	}

	// DataChannels that need to be opened now that SCTP is available
	// make a copy we may have incoming DataChannels mutating this while we open
	pc.sctpTransport.lock.RLock()
	dataChannels := append([]*DataChannel{}, pc.sctpTransport.dataChannels...)
	pc.sctpTransport.lock.RUnlock()

	var openedDCCount uint32
	for _, d := range dataChannels {
		if d.ReadyState() == DataChannelStateConnecting {
			err := d.open(pc.sctpTransport)
			if err != nil {
				pc.log.Warnf("failed to open data channel: %s", err)
				continue
			}
			openedDCCount++
		}
	}

	// The counter is updated once, under the write lock, after all opens.
	pc.sctpTransport.lock.Lock()
	pc.sctpTransport.dataChannelsOpened += openedDCCount
	pc.sctpTransport.lock.Unlock()
}

// handleUndeclaredSSRC tries to attach an RTP stream whose SSRC was not
// declared in the remote SDP to a receiver.
//
// If the remote description has exactly one media section without an explicit
// ssrc attribute, a sendrecv transceiver of that section's kind is added and
// started for ssrc. Otherwise up to simulcastProbeCount+1 packets are read
// from rtpStream looking for the mid and rid header extensions; once both are
// known the stream is handed to the receiver of the transceiver with that mid
// and onTrack is fired. It returns errPeerConnSimulcastIncomingSSRCFailed when
// the probe budget is exhausted without a match.
func (pc *PeerConnection) handleUndeclaredSSRC(rtpStream io.Reader, ssrc SSRC) error { //nolint:gocognit
	remoteDescription := pc.RemoteDescription()
	if remoteDescription == nil {
		return errPeerConnRemoteDescriptionNil
	}

	// If the remote SDP was only one media section the ssrc doesn't have to be explicitly declared
	if len(remoteDescription.parsed.MediaDescriptions) == 1 {
		onlyMediaSection := remoteDescription.parsed.MediaDescriptions[0]
		for _, a := range onlyMediaSection.Attributes {
			if a.Key == ssrcStr {
				return errPeerConnSingleMediaSectionHasExplicitSSRC
			}
		}

		// Anything that is not an audio section is treated as video.
		incoming := trackDetails{
			ssrc: ssrc,
			kind: RTPCodecTypeVideo,
		}
		if onlyMediaSection.MediaName.Media == RTPCodecTypeAudio.String() {
			incoming.kind = RTPCodecTypeAudio
		}

		t, err := pc.AddTransceiverFromKind(incoming.kind, RtpTransceiverInit{
			Direction: RTPTransceiverDirectionSendrecv,
		})
		if err != nil {
			return fmt.Errorf("%w: %d: %s", errPeerConnRemoteSSRCAddTransceiver, ssrc, err)
		}
		pc.startReceiver(incoming, t.Receiver())
		return nil
	}

	// Both the mid and the stream-id header extensions must be registered for
	// at least one media kind, otherwise the packets cannot be attributed.
	midExtensionID, audioSupported, videoSupported := pc.api.mediaEngine.getHeaderExtensionID(RTPHeaderExtensionCapability{sdp.SDESMidURI})
	if !audioSupported && !videoSupported {
		return errPeerConnSimulcastMidRTPExtensionRequired
	}

	streamIDExtensionID, audioSupported, videoSupported := pc.api.mediaEngine.getHeaderExtensionID(RTPHeaderExtensionCapability{sdp.SDESRTPStreamIDURI})
	if !audioSupported && !videoSupported {
		return errPeerConnSimulcastStreamIDRTPExtensionRequired
	}

	b := make([]byte, receiveMTU)
	// mid and rid are accumulated across packets; a packet may carry only one
	// of them.
	var mid, rid string
	for readCount := 0; readCount <= simulcastProbeCount; readCount++ {
		i, err := rtpStream.Read(b)
		if err != nil {
			return err
		}

		maybeMid, maybeRid, payloadType, err := handleUnknownRTPPacket(b[:i], uint8(midExtensionID), uint8(streamIDExtensionID))
		if err != nil {
			return err
		}

		if maybeMid != "" {
			mid = maybeMid
		}
		if maybeRid != "" {
			rid = maybeRid
		}

		if mid == "" || rid == "" {
			continue
		}

		params, err := pc.api.mediaEngine.getRTPParametersByPayloadType(payloadType)
		if err != nil {
			return err
		}

		for _, t := range pc.GetTransceivers() {
			if t.Mid() != mid || t.Receiver() == nil {
				continue
			}

			track, err := t.Receiver().receiveForRid(rid, params, ssrc)
			if err != nil {
				return err
			}
			pc.onTrack(track, t.Receiver())
			return nil
		}
	}

	return errPeerConnSimulcastIncomingSSRCFailed
}

// undeclaredMediaProcessor handles RTP/RTCP packets that don't match any a:ssrc lines.
// It starts two goroutines: one accepts unknown SRTP streams and probes each
// in its own goroutine via handleUndeclaredSSRC, the other accepts unknown
// SRTCP streams and only logs them. Each loop ends when its session cannot be
// obtained or AcceptStream fails.
func (pc *PeerConnection) undeclaredMediaProcessor() {
	go func() {
		// Number of probe goroutines currently running; bounded by
		// simulcastMaxProbeRoutines.
		var simulcastRoutineCount uint64
		for {
			srtpSession, err := pc.dtlsTransport.getSRTPSession()
			if err != nil {
				pc.log.Warnf("undeclaredMediaProcessor failed to open SrtpSession: %v", err)
				return
			}

			stream, ssrc, err := srtpSession.AcceptStream()
			if err != nil {
				pc.log.Warnf("Failed to accept RTP %v", err)
				return
			}

			if pc.isClosed.get() {
				if err = stream.Close(); err != nil {
					pc.log.Warnf("Failed to close RTP stream %v", err)
				}
				continue
			}

			// Adding ^uint64(0) is the sync/atomic idiom for decrementing by one.
			if atomic.AddUint64(&simulcastRoutineCount, 1) >= simulcastMaxProbeRoutines {
				atomic.AddUint64(&simulcastRoutineCount, ^uint64(0))
				pc.log.Warn(ErrSimulcastProbeOverflow.Error())
				continue
			}

			// rtpStream is the same stream as the captured `stream`, passed
			// as an io.Reader for probing.
			go func(rtpStream io.Reader, ssrc SSRC) {
				pc.dtlsTransport.storeSimulcastStream(stream)

				if err := pc.handleUndeclaredSSRC(rtpStream, ssrc); err != nil {
					pc.log.Errorf("Incoming unhandled RTP ssrc(%d), OnTrack will not be fired. %v", ssrc, err)
				}
				atomic.AddUint64(&simulcastRoutineCount, ^uint64(0))
			}(stream, SSRC(ssrc))
		}
	}()

	go func() {
		for {
			srtcpSession, err := pc.dtlsTransport.getSRTCPSession()
			if err != nil {
				pc.log.Warnf("undeclaredMediaProcessor failed to open SrtcpSession: %v", err)
				return
			}

			_, ssrc, err := srtcpSession.AcceptStream()
			if err != nil {
				pc.log.Warnf("Failed to accept RTCP %v", err)
				return
			}
			pc.log.Warnf("Incoming unhandled RTCP ssrc(%d), OnTrack will not be fired", ssrc)
		}
	}()
}

// RemoteDescription returns pendingRemoteDescription if it is not null and
// otherwise it returns currentRemoteDescription. This property is used to
// determine if setRemoteDescription has already been called.
// https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-remotedescription
func (pc *PeerConnection) RemoteDescription() *SessionDescription {
	pc.mu.RLock()
	defer pc.mu.RUnlock()

	if pc.pendingRemoteDescription != nil {
		return pc.pendingRemoteDescription
	}
	return pc.currentRemoteDescription
}

// AddICECandidate accepts an ICE candidate string and adds it
// to the existing set of candidates.
1496func (pc *PeerConnection) AddICECandidate(candidate ICECandidateInit) error { 1497 if pc.RemoteDescription() == nil { 1498 return &rtcerr.InvalidStateError{Err: ErrNoRemoteDescription} 1499 } 1500 1501 candidateValue := strings.TrimPrefix(candidate.Candidate, "candidate:") 1502 1503 var iceCandidate *ICECandidate 1504 if candidateValue != "" { 1505 candidate, err := ice.UnmarshalCandidate(candidateValue) 1506 if err != nil { 1507 return err 1508 } 1509 1510 c, err := newICECandidateFromICE(candidate) 1511 if err != nil { 1512 return err 1513 } 1514 iceCandidate = &c 1515 } 1516 1517 return pc.iceTransport.AddRemoteCandidate(iceCandidate) 1518} 1519 1520// ICEConnectionState returns the ICE connection state of the 1521// PeerConnection instance. 1522func (pc *PeerConnection) ICEConnectionState() ICEConnectionState { 1523 pc.mu.RLock() 1524 defer pc.mu.RUnlock() 1525 1526 return pc.iceConnectionState 1527} 1528 1529// GetSenders returns the RTPSender that are currently attached to this PeerConnection 1530func (pc *PeerConnection) GetSenders() []*RTPSender { 1531 pc.mu.Lock() 1532 defer pc.mu.Unlock() 1533 1534 result := []*RTPSender{} 1535 for _, transceiver := range pc.rtpTransceivers { 1536 if transceiver.Sender() != nil { 1537 result = append(result, transceiver.Sender()) 1538 } 1539 } 1540 return result 1541} 1542 1543// GetReceivers returns the RTPReceivers that are currently attached to this PeerConnection 1544func (pc *PeerConnection) GetReceivers() (receivers []*RTPReceiver) { 1545 pc.mu.Lock() 1546 defer pc.mu.Unlock() 1547 1548 for _, transceiver := range pc.rtpTransceivers { 1549 if transceiver.Receiver() != nil { 1550 receivers = append(receivers, transceiver.Receiver()) 1551 } 1552 } 1553 return 1554} 1555 1556// GetTransceivers returns the RtpTransceiver that are currently attached to this PeerConnection 1557func (pc *PeerConnection) GetTransceivers() []*RTPTransceiver { 1558 pc.mu.Lock() 1559 defer pc.mu.Unlock() 1560 1561 return pc.rtpTransceivers 
1562} 1563 1564// AddTrack adds a Track to the PeerConnection 1565func (pc *PeerConnection) AddTrack(track TrackLocal) (*RTPSender, error) { 1566 if pc.isClosed.get() { 1567 return nil, &rtcerr.InvalidStateError{Err: ErrConnectionClosed} 1568 } 1569 1570 var transceiver *RTPTransceiver 1571 for _, t := range pc.GetTransceivers() { 1572 if !t.stopped && t.kind == track.Kind() && t.Sender() == nil { 1573 transceiver = t 1574 break 1575 } 1576 } 1577 if transceiver != nil { 1578 sender, err := pc.api.NewRTPSender(track, pc.dtlsTransport) 1579 if err != nil { 1580 return nil, err 1581 } 1582 transceiver.setSender(sender) 1583 // we still need to call setSendingTrack to ensure direction has changed 1584 if err := transceiver.setSendingTrack(track); err != nil { 1585 return nil, err 1586 } 1587 pc.onNegotiationNeeded() 1588 1589 return sender, nil 1590 } 1591 1592 transceiver, err := pc.AddTransceiverFromTrack(track) 1593 if err != nil { 1594 return nil, err 1595 } 1596 1597 return transceiver.Sender(), nil 1598} 1599 1600// RemoveTrack removes a Track from the PeerConnection 1601func (pc *PeerConnection) RemoveTrack(sender *RTPSender) error { 1602 if pc.isClosed.get() { 1603 return &rtcerr.InvalidStateError{Err: ErrConnectionClosed} 1604 } 1605 1606 var transceiver *RTPTransceiver 1607 for _, t := range pc.GetTransceivers() { 1608 if t.Sender() == sender { 1609 transceiver = t 1610 break 1611 } 1612 } 1613 1614 if transceiver == nil { 1615 return &rtcerr.InvalidAccessError{Err: ErrSenderNotCreatedByConnection} 1616 } else if err := sender.Stop(); err != nil { 1617 return err 1618 } 1619 1620 if err := transceiver.setSendingTrack(nil); err != nil { 1621 return err 1622 } 1623 1624 pc.onNegotiationNeeded() 1625 1626 return nil 1627} 1628 1629// AddTransceiverFromKind Create a new RtpTransceiver(SendRecv or RecvOnly) and add it to the set of transceivers. 
1630func (pc *PeerConnection) AddTransceiverFromKind(kind RTPCodecType, init ...RtpTransceiverInit) (*RTPTransceiver, error) { 1631 if pc.isClosed.get() { 1632 return nil, &rtcerr.InvalidStateError{Err: ErrConnectionClosed} 1633 } 1634 1635 direction := RTPTransceiverDirectionSendrecv 1636 if len(init) > 1 { 1637 return nil, errPeerConnAddTransceiverFromKindOnlyAcceptsOne 1638 } else if len(init) == 1 { 1639 direction = init[0].Direction 1640 } 1641 1642 switch direction { 1643 case RTPTransceiverDirectionSendrecv: 1644 codecs := pc.api.mediaEngine.getCodecsByKind(kind) 1645 if len(codecs) == 0 { 1646 return nil, ErrNoCodecsAvailable 1647 } 1648 1649 track, err := NewTrackLocalStaticSample(codecs[0].RTPCodecCapability, util.MathRandAlpha(16), util.MathRandAlpha(16)) 1650 if err != nil { 1651 return nil, err 1652 } 1653 1654 return pc.AddTransceiverFromTrack(track, init...) 1655 case RTPTransceiverDirectionRecvonly: 1656 receiver, err := pc.api.NewRTPReceiver(kind, pc.dtlsTransport) 1657 if err != nil { 1658 return nil, err 1659 } 1660 1661 t := pc.newRTPTransceiver( 1662 receiver, 1663 nil, 1664 RTPTransceiverDirectionRecvonly, 1665 kind, 1666 ) 1667 1668 pc.onNegotiationNeeded() 1669 1670 return t, nil 1671 default: 1672 return nil, errPeerConnAddTransceiverFromKindSupport 1673 } 1674} 1675 1676// AddTransceiverFromTrack Create a new RtpTransceiver(SendRecv or SendOnly) and add it to the set of transceivers. 
1677func (pc *PeerConnection) AddTransceiverFromTrack(track TrackLocal, init ...RtpTransceiverInit) (*RTPTransceiver, error) { 1678 if pc.isClosed.get() { 1679 return nil, &rtcerr.InvalidStateError{Err: ErrConnectionClosed} 1680 } 1681 1682 direction := RTPTransceiverDirectionSendrecv 1683 if len(init) > 1 { 1684 return nil, errPeerConnAddTransceiverFromTrackOnlyAcceptsOne 1685 } else if len(init) == 1 { 1686 direction = init[0].Direction 1687 } 1688 1689 switch direction { 1690 case RTPTransceiverDirectionSendrecv: 1691 receiver, err := pc.api.NewRTPReceiver(track.Kind(), pc.dtlsTransport) 1692 if err != nil { 1693 return nil, err 1694 } 1695 1696 sender, err := pc.api.NewRTPSender(track, pc.dtlsTransport) 1697 if err != nil { 1698 return nil, err 1699 } 1700 1701 t := pc.newRTPTransceiver( 1702 receiver, 1703 sender, 1704 RTPTransceiverDirectionSendrecv, 1705 track.Kind(), 1706 ) 1707 1708 pc.onNegotiationNeeded() 1709 1710 return t, nil 1711 1712 case RTPTransceiverDirectionSendonly: 1713 sender, err := pc.api.NewRTPSender(track, pc.dtlsTransport) 1714 if err != nil { 1715 return nil, err 1716 } 1717 1718 t := pc.newRTPTransceiver( 1719 nil, 1720 sender, 1721 RTPTransceiverDirectionSendonly, 1722 track.Kind(), 1723 ) 1724 1725 pc.onNegotiationNeeded() 1726 1727 return t, nil 1728 default: 1729 return nil, errPeerConnAddTransceiverFromTrackSupport 1730 } 1731} 1732 1733// CreateDataChannel creates a new DataChannel object with the given label 1734// and optional DataChannelInit used to configure properties of the 1735// underlying channel such as data reliability. 
1736func (pc *PeerConnection) CreateDataChannel(label string, options *DataChannelInit) (*DataChannel, error) { 1737 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #2) 1738 if pc.isClosed.get() { 1739 return nil, &rtcerr.InvalidStateError{Err: ErrConnectionClosed} 1740 } 1741 1742 params := &DataChannelParameters{ 1743 Label: label, 1744 Ordered: true, 1745 } 1746 1747 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #19) 1748 if options != nil { 1749 params.ID = options.ID 1750 } 1751 1752 if options != nil { 1753 // Ordered indicates if data is allowed to be delivered out of order. The 1754 // default value of true, guarantees that data will be delivered in order. 1755 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #9) 1756 if options.Ordered != nil { 1757 params.Ordered = *options.Ordered 1758 } 1759 1760 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #7) 1761 if options.MaxPacketLifeTime != nil { 1762 params.MaxPacketLifeTime = options.MaxPacketLifeTime 1763 } 1764 1765 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #8) 1766 if options.MaxRetransmits != nil { 1767 params.MaxRetransmits = options.MaxRetransmits 1768 } 1769 1770 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #10) 1771 if options.Protocol != nil { 1772 params.Protocol = *options.Protocol 1773 } 1774 1775 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #11) 1776 if len(params.Protocol) > 65535 { 1777 return nil, &rtcerr.TypeError{Err: ErrProtocolTooLarge} 1778 } 1779 1780 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #12) 1781 if options.Negotiated != nil { 1782 params.Negotiated = *options.Negotiated 1783 } 1784 } 1785 1786 d, err := pc.api.newDataChannel(params, pc.log) 1787 if err != nil { 1788 return nil, err 1789 } 1790 1791 // https://w3c.github.io/webrtc-pc/#peer-to-peer-data-api (Step #16) 1792 if d.maxPacketLifeTime != nil && d.maxRetransmits != nil { 1793 
return nil, &rtcerr.TypeError{Err: ErrRetransmitsOrPacketLifeTime} 1794 } 1795 1796 pc.sctpTransport.lock.Lock() 1797 pc.sctpTransport.dataChannels = append(pc.sctpTransport.dataChannels, d) 1798 pc.sctpTransport.dataChannelsRequested++ 1799 pc.sctpTransport.lock.Unlock() 1800 1801 // If SCTP already connected open all the channels 1802 if pc.sctpTransport.State() == SCTPTransportStateConnected { 1803 if err = d.open(pc.sctpTransport); err != nil { 1804 return nil, err 1805 } 1806 } 1807 1808 pc.onNegotiationNeeded() 1809 1810 return d, nil 1811} 1812 1813// SetIdentityProvider is used to configure an identity provider to generate identity assertions 1814func (pc *PeerConnection) SetIdentityProvider(provider string) error { 1815 return errPeerConnSetIdentityProviderNotImplemented 1816} 1817 1818// WriteRTCP sends a user provided RTCP packet to the connected peer. If no peer is connected the 1819// packet is discarded. It also runs any configured interceptors. 1820func (pc *PeerConnection) WriteRTCP(pkts []rtcp.Packet) error { 1821 _, err := pc.interceptorRTCPWriter.Write(pkts, make(interceptor.Attributes)) 1822 return err 1823} 1824 1825func (pc *PeerConnection) writeRTCP(pkts []rtcp.Packet, _ interceptor.Attributes) (int, error) { 1826 raw, err := rtcp.Marshal(pkts) 1827 if err != nil { 1828 return 0, err 1829 } 1830 1831 srtcpSession, err := pc.dtlsTransport.getSRTCPSession() 1832 if err != nil { 1833 return 0, nil 1834 } 1835 1836 writeStream, err := srtcpSession.OpenWriteStream() 1837 if err != nil { 1838 return 0, fmt.Errorf("%w: %v", errPeerConnWriteRTCPOpenWriteStream, err) 1839 } 1840 1841 if n, err := writeStream.Write(raw); err != nil { 1842 return n, err 1843 } 1844 return 0, nil 1845} 1846 1847// Close ends the PeerConnection 1848func (pc *PeerConnection) Close() error { 1849 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #1) 1850 if pc.isClosed.get() { 1851 return nil 1852 } 1853 1854 // 
https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #2) 1855 pc.isClosed.set(true) 1856 1857 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #3) 1858 pc.signalingState.Set(SignalingStateClosed) 1859 1860 // Try closing everything and collect the errors 1861 // Shutdown strategy: 1862 // 1. All Conn close by closing their underlying Conn. 1863 // 2. A Mux stops this chain. It won't close the underlying 1864 // Conn if one of the endpoints is closed down. To 1865 // continue the chain the Mux has to be closed. 1866 closeErrs := make([]error, 4) 1867 1868 closeErrs = append(closeErrs, pc.api.interceptor.Close()) 1869 1870 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #4) 1871 for _, t := range pc.GetTransceivers() { 1872 if !t.stopped { 1873 closeErrs = append(closeErrs, t.Stop()) 1874 } 1875 } 1876 1877 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #5) 1878 pc.sctpTransport.lock.Lock() 1879 for _, d := range pc.sctpTransport.dataChannels { 1880 d.setReadyState(DataChannelStateClosed) 1881 } 1882 pc.sctpTransport.lock.Unlock() 1883 1884 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #6) 1885 if pc.sctpTransport != nil { 1886 closeErrs = append(closeErrs, pc.sctpTransport.Stop()) 1887 } 1888 1889 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #7) 1890 closeErrs = append(closeErrs, pc.dtlsTransport.Stop()) 1891 1892 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #8, #9, #10) 1893 if pc.iceTransport != nil { 1894 closeErrs = append(closeErrs, pc.iceTransport.Stop()) 1895 } 1896 1897 // https://www.w3.org/TR/webrtc/#dom-rtcpeerconnection-close (step #11) 1898 pc.updateConnectionState(pc.ICEConnectionState(), pc.dtlsTransport.State()) 1899 1900 return util.FlattenErrs(closeErrs) 1901} 1902 1903func (pc *PeerConnection) newRTPTransceiver( 1904 receiver *RTPReceiver, 1905 sender *RTPSender, 1906 direction RTPTransceiverDirection, 1907 kind 
RTPCodecType, 1908) *RTPTransceiver { 1909 t := &RTPTransceiver{kind: kind} 1910 t.setReceiver(receiver) 1911 t.setSender(sender) 1912 t.setDirection(direction) 1913 1914 pc.mu.Lock() 1915 pc.rtpTransceivers = append(pc.rtpTransceivers, t) 1916 pc.mu.Unlock() 1917 1918 return t 1919} 1920 1921// CurrentLocalDescription represents the local description that was 1922// successfully negotiated the last time the PeerConnection transitioned 1923// into the stable state plus any local candidates that have been generated 1924// by the ICEAgent since the offer or answer was created. 1925func (pc *PeerConnection) CurrentLocalDescription() *SessionDescription { 1926 pc.mu.Lock() 1927 defer pc.mu.Unlock() 1928 return populateLocalCandidates(pc.currentLocalDescription, pc.iceGatherer, pc.ICEGatheringState()) 1929} 1930 1931// PendingLocalDescription represents a local description that is in the 1932// process of being negotiated plus any local candidates that have been 1933// generated by the ICEAgent since the offer or answer was created. If the 1934// PeerConnection is in the stable state, the value is null. 1935func (pc *PeerConnection) PendingLocalDescription() *SessionDescription { 1936 pc.mu.Lock() 1937 defer pc.mu.Unlock() 1938 return populateLocalCandidates(pc.pendingLocalDescription, pc.iceGatherer, pc.ICEGatheringState()) 1939} 1940 1941// CurrentRemoteDescription represents the last remote description that was 1942// successfully negotiated the last time the PeerConnection transitioned 1943// into the stable state plus any remote candidates that have been supplied 1944// via AddICECandidate() since the offer or answer was created. 
1945func (pc *PeerConnection) CurrentRemoteDescription() *SessionDescription { 1946 return pc.currentRemoteDescription 1947} 1948 1949// PendingRemoteDescription represents a remote description that is in the 1950// process of being negotiated, complete with any remote candidates that 1951// have been supplied via AddICECandidate() since the offer or answer was 1952// created. If the PeerConnection is in the stable state, the value is 1953// null. 1954func (pc *PeerConnection) PendingRemoteDescription() *SessionDescription { 1955 return pc.pendingRemoteDescription 1956} 1957 1958// SignalingState attribute returns the signaling state of the 1959// PeerConnection instance. 1960func (pc *PeerConnection) SignalingState() SignalingState { 1961 return pc.signalingState.Get() 1962} 1963 1964// ICEGatheringState attribute returns the ICE gathering state of the 1965// PeerConnection instance. 1966func (pc *PeerConnection) ICEGatheringState() ICEGatheringState { 1967 if pc.iceGatherer == nil { 1968 return ICEGatheringStateNew 1969 } 1970 1971 switch pc.iceGatherer.State() { 1972 case ICEGathererStateNew: 1973 return ICEGatheringStateNew 1974 case ICEGathererStateGathering: 1975 return ICEGatheringStateGathering 1976 default: 1977 return ICEGatheringStateComplete 1978 } 1979} 1980 1981// ConnectionState attribute returns the connection state of the 1982// PeerConnection instance. 
func (pc *PeerConnection) ConnectionState() PeerConnectionState {
	pc.mu.Lock()
	defer pc.mu.Unlock()

	return pc.connectionState
}

// GetStats returns data providing statistics about the overall connection.
// Stats are gathered from the ICE gatherer and ICE transport (when present),
// every data channel, the SCTP transport, the configured certificates and the
// media engine, plus one PeerConnectionStats entry with the data channel
// counters.
func (pc *PeerConnection) GetStats() StatsReport {
	var (
		dataChannelsAccepted  uint32
		dataChannelsClosed    uint32
		dataChannelsOpened    uint32
		dataChannelsRequested uint32
	)
	statsCollector := newStatsReportCollector()
	statsCollector.Collecting()

	// pc.mu is held until the certificate stats have been collected (explicit
	// Unlock below, no defer).
	pc.mu.Lock()
	if pc.iceGatherer != nil {
		pc.iceGatherer.collectStats(statsCollector)
	}
	if pc.iceTransport != nil {
		pc.iceTransport.collectStats(statsCollector)
	}

	// Snapshot the channel list and counters under the SCTP lock, then work
	// on the copy.
	pc.sctpTransport.lock.Lock()
	dataChannels := append([]*DataChannel{}, pc.sctpTransport.dataChannels...)
	dataChannelsAccepted = pc.sctpTransport.dataChannelsAccepted
	dataChannelsOpened = pc.sctpTransport.dataChannelsOpened
	dataChannelsRequested = pc.sctpTransport.dataChannelsRequested
	pc.sctpTransport.lock.Unlock()

	for _, d := range dataChannels {
		// Every state other than connecting or open counts as closed.
		state := d.ReadyState()
		if state != DataChannelStateConnecting && state != DataChannelStateOpen {
			dataChannelsClosed++
		}

		d.collectStats(statsCollector)
	}
	pc.sctpTransport.collectStats(statsCollector)

	stats := PeerConnectionStats{
		Timestamp:             statsTimestampNow(),
		Type:                  StatsTypePeerConnection,
		ID:                    pc.statsID,
		DataChannelsAccepted:  dataChannelsAccepted,
		DataChannelsClosed:    dataChannelsClosed,
		DataChannelsOpened:    dataChannelsOpened,
		DataChannelsRequested: dataChannelsRequested,
	}

	statsCollector.Collect(stats.ID, stats)

	certificates := pc.configuration.Certificates
	for _, certificate := range certificates {
		// A certificate whose stats cannot be collected is skipped.
		if err := certificate.collectStats(statsCollector); err != nil {
			continue
		}
	}
	pc.mu.Unlock()

	pc.api.mediaEngine.collectStats(statsCollector)

	return statsCollector.Ready()
}

// startTransports starts the ICE transport with the remote credentials and
// the given role, then the DTLS transport with the remote fingerprint, and
// updates the connection state after the DTLS start attempt. Failures are
// logged, not returned; if ICE fails to start, DTLS is not attempted.
func (pc *PeerConnection) startTransports(iceRole ICERole, dtlsRole DTLSRole, remoteUfrag, remotePwd, fingerprint, fingerprintHash string) {
	// Start the ice transport
	err := pc.iceTransport.Start(
		pc.iceGatherer,
		ICEParameters{
			UsernameFragment: remoteUfrag,
			Password:         remotePwd,
			ICELite:          false,
		},
		&iceRole,
	)
	if err != nil {
		pc.log.Warnf("Failed to start manager: %s", err)
		return
	}

	// Start the dtls transport
	err = pc.dtlsTransport.Start(DTLSParameters{
		Role:         dtlsRole,
		Fingerprints: []DTLSFingerprint{{Algorithm: fingerprintHash, Value: fingerprint}},
	})
	// The connection state is refreshed whether or not DTLS started.
	pc.updateConnectionState(pc.ICEConnectionState(), pc.dtlsTransport.State())
	if err != nil {
		pc.log.Warnf("Failed to start manager: %s", err)
		return
	}
}

// startRTP starts receiving media described by remoteDesc.
//
// On renegotiation, tracks whose SSRC is still present in remoteDesc get
// their id and streamID refreshed; receivers whose SSRC is gone are stopped
// and replaced with a fresh receiver of the same kind. Then the RTP receivers
// are started and SCTP is started if remoteDesc has an application section.
// On the first negotiation the undeclared media processor is also launched.
func (pc *PeerConnection) startRTP(isRenegotiation bool, remoteDesc *SessionDescription, currentTransceivers []*RTPTransceiver) {
	trackDetails := trackDetailsFromSDP(pc.log, remoteDesc.parsed)
	if isRenegotiation {
		for _, t := range currentTransceivers {
			if t.Receiver() == nil || t.Receiver().Track() == nil {
				continue
			}

			// The track mutex is released on both paths below before the
			// receiver is touched.
			t.Receiver().Track().mu.Lock()
			ssrc := t.Receiver().Track().ssrc

			if details := trackDetailsForSSRC(trackDetails, ssrc); details != nil {
				t.Receiver().Track().id = details.id
				t.Receiver().Track().streamID = details.streamID
				t.Receiver().Track().mu.Unlock()
				continue
			}

			t.Receiver().Track().mu.Unlock()

			// The SSRC is no longer in the remote description: replace the
			// receiver.
			if err := t.Receiver().Stop(); err != nil {
				pc.log.Warnf("Failed to stop RtpReceiver: %s", err)
				continue
			}

			receiver, err := pc.api.NewRTPReceiver(t.Receiver().kind, pc.dtlsTransport)
			if err != nil {
				pc.log.Warnf("Failed to create new RtpReceiver: %s", err)
				continue
			}
			t.setReceiver(receiver)
		}
	}

	pc.startRTPReceivers(trackDetails, currentTransceivers)
	if haveApplicationMediaSection(remoteDesc.parsed) {
		pc.startSCTP()
	}

	if !isRenegotiation {
		pc.undeclaredMediaProcessor()
	}
}

// generateUnmatchedSDP generates an SDP that doesn't take remote state into account
// This is used for the initial call for CreateOffer
func (pc *PeerConnection) generateUnmatchedSDP(transceivers []*RTPTransceiver, useIdentity bool) (*sdp.SessionDescription, error) {
	d, err := sdp.NewJSEPSessionDescription(useIdentity)
	if err != nil {
		return nil, err
	}

	iceParams, err := pc.iceGatherer.GetLocalParameters()
	if err != nil {
		return nil, err
	}

	candidates, err := pc.iceGatherer.GetLocalCandidates()
	if err != nil {
		return nil, err
	}

	isPlanB := pc.configuration.SDPSemantics == SDPSemanticsPlanB
	mediaSections := []mediaSection{}

	// Needed for pc.sctpTransport.dataChannelsRequested
	pc.sctpTransport.lock.Lock()
	defer pc.sctpTransport.lock.Unlock()

	if isPlanB {
		video := make([]*RTPTransceiver, 0)
		audio := make([]*RTPTransceiver, 0)

		for _, t := range transceivers {
			if t.kind == RTPCodecTypeVideo {
				video = append(video, t)
			} else if t.kind == RTPCodecTypeAudio {
				audio = append(audio, t)
			}
			if t.Sender() != nil {
				t.Sender().setNegotiated()
			}
		}

		if len(video) > 0 {
			mediaSections = append(mediaSections, mediaSection{id: "video", transceivers: video})
		}
		if len(audio) > 0 {
			mediaSections = append(mediaSections, mediaSection{id: "audio", transceivers: audio})
		}

		if pc.sctpTransport.dataChannelsRequested != 0 {
			mediaSections = append(mediaSections, mediaSection{id: "data", data: true})
		}
	} else {
		for _, t :=
range transceivers { 2176 if t.Sender() != nil { 2177 t.Sender().setNegotiated() 2178 } 2179 mediaSections = append(mediaSections, mediaSection{id: t.Mid(), transceivers: []*RTPTransceiver{t}}) 2180 } 2181 2182 if pc.sctpTransport.dataChannelsRequested != 0 { 2183 mediaSections = append(mediaSections, mediaSection{id: strconv.Itoa(len(mediaSections)), data: true}) 2184 } 2185 } 2186 2187 dtlsFingerprints, err := pc.configuration.Certificates[0].GetFingerprints() 2188 if err != nil { 2189 return nil, err 2190 } 2191 2192 return populateSDP(d, isPlanB, dtlsFingerprints, pc.api.settingEngine.sdpMediaLevelFingerprints, pc.api.settingEngine.candidates.ICELite, pc.api.mediaEngine, connectionRoleFromDtlsRole(defaultDtlsRoleOffer), candidates, iceParams, mediaSections, pc.ICEGatheringState()) 2193} 2194 2195// generateMatchedSDP generates a SDP and takes the remote state into account 2196// this is used everytime we have a RemoteDescription 2197// nolint: gocyclo 2198func (pc *PeerConnection) generateMatchedSDP(transceivers []*RTPTransceiver, useIdentity bool, includeUnmatched bool, connectionRole sdp.ConnectionRole) (*sdp.SessionDescription, error) { //nolint:gocognit 2199 d, err := sdp.NewJSEPSessionDescription(useIdentity) 2200 if err != nil { 2201 return nil, err 2202 } 2203 2204 iceParams, err := pc.iceGatherer.GetLocalParameters() 2205 if err != nil { 2206 return nil, err 2207 } 2208 2209 candidates, err := pc.iceGatherer.GetLocalCandidates() 2210 if err != nil { 2211 return nil, err 2212 } 2213 2214 var t *RTPTransceiver 2215 localTransceivers := append([]*RTPTransceiver{}, transceivers...) 
2216 detectedPlanB := descriptionIsPlanB(pc.RemoteDescription()) 2217 mediaSections := []mediaSection{} 2218 alreadyHaveApplicationMediaSection := false 2219 2220 for _, media := range pc.RemoteDescription().parsed.MediaDescriptions { 2221 midValue := getMidValue(media) 2222 if midValue == "" { 2223 return nil, errPeerConnRemoteDescriptionWithoutMidValue 2224 } 2225 2226 if media.MediaName.Media == mediaSectionApplication { 2227 mediaSections = append(mediaSections, mediaSection{id: midValue, data: true}) 2228 alreadyHaveApplicationMediaSection = true 2229 continue 2230 } 2231 2232 kind := NewRTPCodecType(media.MediaName.Media) 2233 direction := getPeerDirection(media) 2234 if kind == 0 || direction == RTPTransceiverDirection(Unknown) { 2235 continue 2236 } 2237 2238 sdpSemantics := pc.configuration.SDPSemantics 2239 2240 switch { 2241 case sdpSemantics == SDPSemanticsPlanB || sdpSemantics == SDPSemanticsUnifiedPlanWithFallback && detectedPlanB: 2242 if !detectedPlanB { 2243 return nil, &rtcerr.TypeError{Err: ErrIncorrectSDPSemantics} 2244 } 2245 // If we're responding to a plan-b offer, then we should try to fill up this 2246 // media entry with all matching local transceivers 2247 mediaTransceivers := []*RTPTransceiver{} 2248 for { 2249 // keep going until we can't get any more 2250 t, localTransceivers = satisfyTypeAndDirection(kind, direction, localTransceivers) 2251 if t == nil { 2252 if len(mediaTransceivers) == 0 { 2253 t = &RTPTransceiver{kind: kind} 2254 t.setDirection(RTPTransceiverDirectionInactive) 2255 mediaTransceivers = append(mediaTransceivers, t) 2256 } 2257 break 2258 } 2259 if t.Sender() != nil { 2260 t.Sender().setNegotiated() 2261 } 2262 mediaTransceivers = append(mediaTransceivers, t) 2263 } 2264 mediaSections = append(mediaSections, mediaSection{id: midValue, transceivers: mediaTransceivers}) 2265 case sdpSemantics == SDPSemanticsUnifiedPlan || sdpSemantics == SDPSemanticsUnifiedPlanWithFallback: 2266 if detectedPlanB { 2267 return nil, 
&rtcerr.TypeError{Err: ErrIncorrectSDPSemantics} 2268 } 2269 t, localTransceivers = findByMid(midValue, localTransceivers) 2270 if t == nil { 2271 return nil, fmt.Errorf("%w: %q", errPeerConnTranscieverMidNil, midValue) 2272 } 2273 if t.Sender() != nil { 2274 t.Sender().setNegotiated() 2275 } 2276 mediaTransceivers := []*RTPTransceiver{t} 2277 mediaSections = append(mediaSections, mediaSection{id: midValue, transceivers: mediaTransceivers, ridMap: getRids(media)}) 2278 } 2279 } 2280 2281 // If we are offering also include unmatched local transceivers 2282 if includeUnmatched { 2283 if !detectedPlanB { 2284 for _, t := range localTransceivers { 2285 if t.Sender() != nil { 2286 t.Sender().setNegotiated() 2287 } 2288 mediaSections = append(mediaSections, mediaSection{id: t.Mid(), transceivers: []*RTPTransceiver{t}}) 2289 } 2290 } 2291 2292 if pc.sctpTransport.dataChannelsRequested != 0 && !alreadyHaveApplicationMediaSection { 2293 if detectedPlanB { 2294 mediaSections = append(mediaSections, mediaSection{id: "data", data: true}) 2295 } else { 2296 mediaSections = append(mediaSections, mediaSection{id: strconv.Itoa(len(mediaSections)), data: true}) 2297 } 2298 } 2299 } 2300 2301 if pc.configuration.SDPSemantics == SDPSemanticsUnifiedPlanWithFallback && detectedPlanB { 2302 pc.log.Info("Plan-B Offer detected; responding with Plan-B Answer") 2303 } 2304 2305 dtlsFingerprints, err := pc.configuration.Certificates[0].GetFingerprints() 2306 if err != nil { 2307 return nil, err 2308 } 2309 2310 return populateSDP(d, detectedPlanB, dtlsFingerprints, pc.api.settingEngine.sdpMediaLevelFingerprints, pc.api.settingEngine.candidates.ICELite, pc.api.mediaEngine, connectionRole, candidates, iceParams, mediaSections, pc.ICEGatheringState()) 2311} 2312 2313func (pc *PeerConnection) setGatherCompleteHandler(handler func()) { 2314 pc.iceGatherer.onGatheringCompleteHandler.Store(handler) 2315} 2316 2317// SCTP returns the SCTPTransport for this PeerConnection 2318// 2319// The SCTP 
transport over which SCTP data is sent and received. If SCTP has not been negotiated, the value is nil. 2320// https://www.w3.org/TR/webrtc/#attributes-15 2321func (pc *PeerConnection) SCTP() *SCTPTransport { 2322 return pc.sctpTransport 2323} 2324