1// +build !js 2 3package webrtc 4 5import ( 6 "fmt" 7 "sync" 8 9 "github.com/pion/rtcp" 10 "github.com/pion/rtp" 11 "github.com/pion/srtp" 12) 13 14// RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer 15type RTPSender struct { 16 track *Track 17 rtcpReadStream *srtp.ReadStreamSRTCP 18 19 transport *DTLSTransport 20 21 // A reference to the associated api object 22 api *API 23 24 mu sync.RWMutex 25 sendCalled, stopCalled chan interface{} 26} 27 28// NewRTPSender constructs a new RTPSender 29func (api *API) NewRTPSender(track *Track, transport *DTLSTransport) (*RTPSender, error) { 30 if track == nil { 31 return nil, fmt.Errorf("Track must not be nil") 32 } else if transport == nil { 33 return nil, fmt.Errorf("DTLSTransport must not be nil") 34 } 35 36 track.mu.RLock() 37 defer track.mu.RUnlock() 38 if track.receiver != nil { 39 return nil, fmt.Errorf("RTPSender can not be constructed with remote track") 40 } 41 track.totalSenderCount++ 42 43 return &RTPSender{ 44 track: track, 45 transport: transport, 46 api: api, 47 sendCalled: make(chan interface{}), 48 stopCalled: make(chan interface{}), 49 }, nil 50} 51 52// Transport returns the currently-configured *DTLSTransport or nil 53// if one has not yet been configured 54func (r *RTPSender) Transport() *DTLSTransport { 55 r.mu.RLock() 56 defer r.mu.RUnlock() 57 return r.transport 58} 59 60// Send Attempts to set the parameters controlling the sending of media. 61func (r *RTPSender) Send(parameters RTPSendParameters) error { 62 r.mu.Lock() 63 defer r.mu.Unlock() 64 65 if r.hasSent() { 66 return fmt.Errorf("Send has already been called") 67 } 68 69 srtcpSession, err := r.transport.getSRTCPSession() 70 if err != nil { 71 return err 72 } 73 74 r.rtcpReadStream, err = srtcpSession.OpenReadStream(parameters.Encodings.SSRC) 75 if err != nil { 76 return err 77 } 78 79 r.track.mu.Lock() 80 r.track.activeSenders = append(r.track.activeSenders, r) 81 r.track.mu.Unlock() 82 83 close(r.sendCalled) 84 return nil 85} 86 87// Stop irreversibly stops the RTPSender 88func (r *RTPSender) Stop() error { 89 r.mu.Lock() 90 defer r.mu.Unlock() 91 92 select { 93 case <-r.stopCalled: 94 return nil 95 default: 96 } 97 98 r.track.mu.Lock() 99 defer r.track.mu.Unlock() 100 filtered := []*RTPSender{} 101 for _, s := range r.track.activeSenders { 102 if s != r { 103 filtered = append(filtered, s) 104 } else { 105 r.track.totalSenderCount-- 106 } 107 } 108 r.track.activeSenders = filtered 109 close(r.stopCalled) 110 111 if r.hasSent() { 112 return r.rtcpReadStream.Close() 113 } 114 115 return nil 116} 117 118// Read reads incoming RTCP for this RTPReceiver 119func (r *RTPSender) Read(b []byte) (n int, err error) { 120 <-r.sendCalled 121 return r.rtcpReadStream.Read(b) 122} 123 124// ReadRTCP is a convenience method that wraps Read and unmarshals for you 125func (r *RTPSender) ReadRTCP() ([]rtcp.Packet, error) { 126 b := make([]byte, receiveMTU) 127 i, err := r.Read(b) 128 if err != nil { 129 return nil, err 130 } 131 132 return rtcp.Unmarshal(b[:i]) 133} 134 135// sendRTP should only be called by a track, this only exists so we can keep state in one place 136func (r *RTPSender) sendRTP(header *rtp.Header, payload []byte) (int, error) { 137 select { 138 case <-r.stopCalled: 139 return 0, fmt.Errorf("RTPSender has been stopped") 140 case <-r.sendCalled: 141 srtpSession, err := r.transport.getSRTPSession() 142 if err != nil { 143 return 0, err 144 } 145 146 writeStream, err := srtpSession.OpenWriteStream() 147 if err != nil { 148 return 0, err 149 } 150 151 return writeStream.WriteRTP(header, payload) 152 } 153} 154 155// hasSent tells if data has been ever sent for this instance 156func (r *RTPSender) hasSent() bool { 157 select { 158 case <-r.sendCalled: 159 return true 160 default: 161 return false 162 } 163} 164