1// +build !js
2
3package webrtc
4
5import (
6	"fmt"
7	"sync"
8
9	"github.com/pion/rtcp"
10	"github.com/pion/rtp"
11	"github.com/pion/srtp"
12)
13
14// RTPSender allows an application to control how a given Track is encoded and transmitted to a remote peer
15type RTPSender struct {
16	track          *Track
17	rtcpReadStream *srtp.ReadStreamSRTCP
18
19	transport *DTLSTransport
20
21	// A reference to the associated api object
22	api *API
23
24	mu                     sync.RWMutex
25	sendCalled, stopCalled chan interface{}
26}
27
28// NewRTPSender constructs a new RTPSender
29func (api *API) NewRTPSender(track *Track, transport *DTLSTransport) (*RTPSender, error) {
30	if track == nil {
31		return nil, fmt.Errorf("Track must not be nil")
32	} else if transport == nil {
33		return nil, fmt.Errorf("DTLSTransport must not be nil")
34	}
35
36	track.mu.RLock()
37	defer track.mu.RUnlock()
38	if track.receiver != nil {
39		return nil, fmt.Errorf("RTPSender can not be constructed with remote track")
40	}
41	track.totalSenderCount++
42
43	return &RTPSender{
44		track:      track,
45		transport:  transport,
46		api:        api,
47		sendCalled: make(chan interface{}),
48		stopCalled: make(chan interface{}),
49	}, nil
50}
51
52// Transport returns the currently-configured *DTLSTransport or nil
53// if one has not yet been configured
54func (r *RTPSender) Transport() *DTLSTransport {
55	r.mu.RLock()
56	defer r.mu.RUnlock()
57	return r.transport
58}
59
60// Send Attempts to set the parameters controlling the sending of media.
61func (r *RTPSender) Send(parameters RTPSendParameters) error {
62	r.mu.Lock()
63	defer r.mu.Unlock()
64
65	if r.hasSent() {
66		return fmt.Errorf("Send has already been called")
67	}
68
69	srtcpSession, err := r.transport.getSRTCPSession()
70	if err != nil {
71		return err
72	}
73
74	r.rtcpReadStream, err = srtcpSession.OpenReadStream(parameters.Encodings.SSRC)
75	if err != nil {
76		return err
77	}
78
79	r.track.mu.Lock()
80	r.track.activeSenders = append(r.track.activeSenders, r)
81	r.track.mu.Unlock()
82
83	close(r.sendCalled)
84	return nil
85}
86
87// Stop irreversibly stops the RTPSender
88func (r *RTPSender) Stop() error {
89	r.mu.Lock()
90	defer r.mu.Unlock()
91
92	select {
93	case <-r.stopCalled:
94		return nil
95	default:
96	}
97
98	r.track.mu.Lock()
99	defer r.track.mu.Unlock()
100	filtered := []*RTPSender{}
101	for _, s := range r.track.activeSenders {
102		if s != r {
103			filtered = append(filtered, s)
104		} else {
105			r.track.totalSenderCount--
106		}
107	}
108	r.track.activeSenders = filtered
109	close(r.stopCalled)
110
111	if r.hasSent() {
112		return r.rtcpReadStream.Close()
113	}
114
115	return nil
116}
117
118// Read reads incoming RTCP for this RTPReceiver
119func (r *RTPSender) Read(b []byte) (n int, err error) {
120	<-r.sendCalled
121	return r.rtcpReadStream.Read(b)
122}
123
124// ReadRTCP is a convenience method that wraps Read and unmarshals for you
125func (r *RTPSender) ReadRTCP() ([]rtcp.Packet, error) {
126	b := make([]byte, receiveMTU)
127	i, err := r.Read(b)
128	if err != nil {
129		return nil, err
130	}
131
132	return rtcp.Unmarshal(b[:i])
133}
134
135// sendRTP should only be called by a track, this only exists so we can keep state in one place
136func (r *RTPSender) sendRTP(header *rtp.Header, payload []byte) (int, error) {
137	select {
138	case <-r.stopCalled:
139		return 0, fmt.Errorf("RTPSender has been stopped")
140	case <-r.sendCalled:
141		srtpSession, err := r.transport.getSRTPSession()
142		if err != nil {
143			return 0, err
144		}
145
146		writeStream, err := srtpSession.OpenWriteStream()
147		if err != nil {
148			return 0, err
149		}
150
151		return writeStream.WriteRTP(header, payload)
152	}
153}
154
155// hasSent tells if data has been ever sent for this instance
156func (r *RTPSender) hasSent() bool {
157	select {
158	case <-r.sendCalled:
159		return true
160	default:
161		return false
162	}
163}
164