1package main 2 3import ( 4 "encoding/json" 5 "log" 6 "sync/atomic" 7 "time" 8 9 "github.com/pion/webrtc/v2" 10) 11 12const ( 13 bufferedAmountLowThreshold uint64 = 512 * 1024 // 512 KB 14 maxBufferedAmount uint64 = 1024 * 1024 // 1 MB 15) 16 17func check(err error) { 18 if err != nil { 19 panic(err) 20 } 21} 22 23func setRemoteDescription(pc *webrtc.PeerConnection, sdp []byte) { 24 var desc webrtc.SessionDescription 25 err := json.Unmarshal(sdp, &desc) 26 check(err) 27 28 // Apply the desc as the remote description 29 err = pc.SetRemoteDescription(desc) 30 check(err) 31} 32 33func createOfferer() *webrtc.PeerConnection { 34 // Prepare the configuration 35 config := webrtc.Configuration{ 36 ICEServers: []webrtc.ICEServer{}, 37 } 38 39 // Create a new PeerConnection 40 pc, err := webrtc.NewPeerConnection(config) 41 check(err) 42 43 buf := make([]byte, 1024) 44 45 ordered := false 46 maxRetransmits := uint16(0) 47 48 options := &webrtc.DataChannelInit{ 49 Ordered: &ordered, 50 MaxRetransmits: &maxRetransmits, 51 } 52 53 sendMoreCh := make(chan struct{}) 54 55 // Create a datachannel with label 'data' 56 dc, err := pc.CreateDataChannel("data", options) 57 check(err) 58 59 // Register channel opening handling 60 dc.OnOpen(func() { 61 log.Printf("OnOpen: %s-%d. Start sending a series of 1024-byte packets as fast as it can\n", dc.Label(), dc.ID()) 62 63 for { 64 err2 := dc.Send(buf) 65 check(err2) 66 67 if dc.BufferedAmount()+uint64(len(buf)) > maxBufferedAmount { 68 // Wait until the bufferedAmount becomes lower than the threshold 69 <-sendMoreCh 70 } 71 } 72 }) 73 74 // Set bufferedAmountLowThreshold so that we can get notified when 75 // we can send more 76 dc.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold) 77 78 // This callback is made when the current bufferedAmount becomes lower than the threadshold 79 dc.OnBufferedAmountLow(func() { 80 sendMoreCh <- struct{}{} 81 }) 82 83 return pc 84} 85 86func createAnswerer() *webrtc.PeerConnection { 87 // Prepare the configuration 88 config := webrtc.Configuration{ 89 ICEServers: []webrtc.ICEServer{}, 90 } 91 92 // Create a new PeerConnection 93 pc, err := webrtc.NewPeerConnection(config) 94 check(err) 95 96 pc.OnDataChannel(func(dc *webrtc.DataChannel) { 97 var totalBytesReceived uint64 98 99 // Register channel opening handling 100 dc.OnOpen(func() { 101 log.Printf("OnOpen: %s-%d. Start receiving data", dc.Label(), dc.ID()) 102 since := time.Now() 103 104 // Start printing out the observed throughput 105 for range time.NewTicker(1000 * time.Millisecond).C { 106 bps := float64(atomic.LoadUint64(&totalBytesReceived)*8) / time.Since(since).Seconds() 107 log.Printf("Throughput: %.03f Mbps", bps/1024/1024) 108 } 109 }) 110 111 // Register the OnMessage to handle incoming messages 112 dc.OnMessage(func(dcMsg webrtc.DataChannelMessage) { 113 n := len(dcMsg.Data) 114 atomic.AddUint64(&totalBytesReceived, uint64(n)) 115 }) 116 }) 117 118 return pc 119} 120 121func main() { 122 offerPC := createOfferer() 123 answerPC := createAnswerer() 124 125 // Now, create an offer 126 offer, err := offerPC.CreateOffer(nil) 127 check(err) 128 check(offerPC.SetLocalDescription(offer)) 129 desc, err := json.Marshal(offer) 130 check(err) 131 132 setRemoteDescription(answerPC, desc) 133 134 answer, err := answerPC.CreateAnswer(nil) 135 check(err) 136 check(answerPC.SetLocalDescription(answer)) 137 desc2, err := json.Marshal(answer) 138 check(err) 139 140 setRemoteDescription(offerPC, desc2) 141 142 // Block forever 143 select {} 144} 145