1package main
2
3import (
4	"encoding/json"
5	"log"
6	"sync/atomic"
7	"time"
8
9	"github.com/pion/webrtc/v2"
10)
11
12const (
13	bufferedAmountLowThreshold uint64 = 512 * 1024  // 512 KB
14	maxBufferedAmount          uint64 = 1024 * 1024 // 1 MB
15)
16
17func check(err error) {
18	if err != nil {
19		panic(err)
20	}
21}
22
23func setRemoteDescription(pc *webrtc.PeerConnection, sdp []byte) {
24	var desc webrtc.SessionDescription
25	err := json.Unmarshal(sdp, &desc)
26	check(err)
27
28	// Apply the desc as the remote description
29	err = pc.SetRemoteDescription(desc)
30	check(err)
31}
32
33func createOfferer() *webrtc.PeerConnection {
34	// Prepare the configuration
35	config := webrtc.Configuration{
36		ICEServers: []webrtc.ICEServer{},
37	}
38
39	// Create a new PeerConnection
40	pc, err := webrtc.NewPeerConnection(config)
41	check(err)
42
43	buf := make([]byte, 1024)
44
45	ordered := false
46	maxRetransmits := uint16(0)
47
48	options := &webrtc.DataChannelInit{
49		Ordered:        &ordered,
50		MaxRetransmits: &maxRetransmits,
51	}
52
53	sendMoreCh := make(chan struct{})
54
55	// Create a datachannel with label 'data'
56	dc, err := pc.CreateDataChannel("data", options)
57	check(err)
58
59	// Register channel opening handling
60	dc.OnOpen(func() {
61		log.Printf("OnOpen: %s-%d. Start sending a series of 1024-byte packets as fast as it can\n", dc.Label(), dc.ID())
62
63		for {
64			err2 := dc.Send(buf)
65			check(err2)
66
67			if dc.BufferedAmount()+uint64(len(buf)) > maxBufferedAmount {
68				// Wait until the bufferedAmount becomes lower than the threshold
69				<-sendMoreCh
70			}
71		}
72	})
73
74	// Set bufferedAmountLowThreshold so that we can get notified when
75	// we can send more
76	dc.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold)
77
78	// This callback is made when the current bufferedAmount becomes lower than the threadshold
79	dc.OnBufferedAmountLow(func() {
80		sendMoreCh <- struct{}{}
81	})
82
83	return pc
84}
85
86func createAnswerer() *webrtc.PeerConnection {
87	// Prepare the configuration
88	config := webrtc.Configuration{
89		ICEServers: []webrtc.ICEServer{},
90	}
91
92	// Create a new PeerConnection
93	pc, err := webrtc.NewPeerConnection(config)
94	check(err)
95
96	pc.OnDataChannel(func(dc *webrtc.DataChannel) {
97		var totalBytesReceived uint64
98
99		// Register channel opening handling
100		dc.OnOpen(func() {
101			log.Printf("OnOpen: %s-%d. Start receiving data", dc.Label(), dc.ID())
102			since := time.Now()
103
104			// Start printing out the observed throughput
105			for range time.NewTicker(1000 * time.Millisecond).C {
106				bps := float64(atomic.LoadUint64(&totalBytesReceived)*8) / time.Since(since).Seconds()
107				log.Printf("Throughput: %.03f Mbps", bps/1024/1024)
108			}
109		})
110
111		// Register the OnMessage to handle incoming messages
112		dc.OnMessage(func(dcMsg webrtc.DataChannelMessage) {
113			n := len(dcMsg.Data)
114			atomic.AddUint64(&totalBytesReceived, uint64(n))
115		})
116	})
117
118	return pc
119}
120
121func main() {
122	offerPC := createOfferer()
123	answerPC := createAnswerer()
124
125	// Now, create an offer
126	offer, err := offerPC.CreateOffer(nil)
127	check(err)
128	check(offerPC.SetLocalDescription(offer))
129	desc, err := json.Marshal(offer)
130	check(err)
131
132	setRemoteDescription(answerPC, desc)
133
134	answer, err := answerPC.CreateAnswer(nil)
135	check(err)
136	check(answerPC.SetLocalDescription(answer))
137	desc2, err := json.Marshal(answer)
138	check(err)
139
140	setRemoteDescription(offerPC, desc2)
141
142	// Block forever
143	select {}
144}
145