1// Copyright 2017 The Gorilla WebSocket Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package websocket 6 7import ( 8 "io" 9 "io/ioutil" 10 "sync/atomic" 11 "testing" 12) 13 14// broadcastBench allows to run broadcast benchmarks. 15// In every broadcast benchmark we create many connections, then send the same 16// message into every connection and wait for all writes complete. This emulates 17// an application where many connections listen to the same data - i.e. PUB/SUB 18// scenarios with many subscribers in one channel. 19type broadcastBench struct { 20 w io.Writer 21 message *broadcastMessage 22 closeCh chan struct{} 23 doneCh chan struct{} 24 count int32 25 conns []*broadcastConn 26 compression bool 27 usePrepared bool 28} 29 30type broadcastMessage struct { 31 payload []byte 32 prepared *PreparedMessage 33} 34 35type broadcastConn struct { 36 conn *Conn 37 msgCh chan *broadcastMessage 38} 39 40func newBroadcastConn(c *Conn) *broadcastConn { 41 return &broadcastConn{ 42 conn: c, 43 msgCh: make(chan *broadcastMessage, 1), 44 } 45} 46 47func newBroadcastBench(usePrepared, compression bool) *broadcastBench { 48 bench := &broadcastBench{ 49 w: ioutil.Discard, 50 doneCh: make(chan struct{}), 51 closeCh: make(chan struct{}), 52 usePrepared: usePrepared, 53 compression: compression, 54 } 55 msg := &broadcastMessage{ 56 payload: textMessages(1)[0], 57 } 58 if usePrepared { 59 pm, _ := NewPreparedMessage(TextMessage, msg.payload) 60 msg.prepared = pm 61 } 62 bench.message = msg 63 bench.makeConns(10000) 64 return bench 65} 66 67func (b *broadcastBench) makeConns(numConns int) { 68 conns := make([]*broadcastConn, numConns) 69 70 for i := 0; i < numConns; i++ { 71 c := newTestConn(nil, b.w, true) 72 if b.compression { 73 c.enableWriteCompression = true 74 c.newCompressionWriter = compressNoContextTakeover 75 } 76 conns[i] = newBroadcastConn(c) 77 go func(c *broadcastConn) { 78 for { 79 select { 80 case msg := <-c.msgCh: 81 if b.usePrepared { 82 c.conn.WritePreparedMessage(msg.prepared) 83 } else { 84 c.conn.WriteMessage(TextMessage, msg.payload) 85 } 86 val := atomic.AddInt32(&b.count, 1) 87 if val%int32(numConns) == 0 { 88 b.doneCh <- struct{}{} 89 } 90 case <-b.closeCh: 91 return 92 } 93 } 94 }(conns[i]) 95 } 96 b.conns = conns 97} 98 99func (b *broadcastBench) close() { 100 close(b.closeCh) 101} 102 103func (b *broadcastBench) runOnce() { 104 for _, c := range b.conns { 105 c.msgCh <- b.message 106 } 107 <-b.doneCh 108} 109 110func BenchmarkBroadcast(b *testing.B) { 111 benchmarks := []struct { 112 name string 113 usePrepared bool 114 compression bool 115 }{ 116 {"NoCompression", false, false}, 117 {"WithCompression", false, true}, 118 {"NoCompressionPrepared", true, false}, 119 {"WithCompressionPrepared", true, true}, 120 } 121 for _, bm := range benchmarks { 122 b.Run(bm.name, func(b *testing.B) { 123 bench := newBroadcastBench(bm.usePrepared, bm.compression) 124 defer bench.close() 125 b.ResetTimer() 126 for i := 0; i < b.N; i++ { 127 bench.runOnce() 128 } 129 b.ReportAllocs() 130 }) 131 } 132} 133