1// Copyright 2017 The Gorilla WebSocket Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package websocket
6
7import (
8	"io"
9	"io/ioutil"
10	"sync/atomic"
11	"testing"
12)
13
14// broadcastBench allows to run broadcast benchmarks.
15// In every broadcast benchmark we create many connections, then send the same
16// message into every connection and wait for all writes complete. This emulates
17// an application where many connections listen to the same data - i.e. PUB/SUB
18// scenarios with many subscribers in one channel.
19type broadcastBench struct {
20	w           io.Writer
21	message     *broadcastMessage
22	closeCh     chan struct{}
23	doneCh      chan struct{}
24	count       int32
25	conns       []*broadcastConn
26	compression bool
27	usePrepared bool
28}
29
30type broadcastMessage struct {
31	payload  []byte
32	prepared *PreparedMessage
33}
34
35type broadcastConn struct {
36	conn  *Conn
37	msgCh chan *broadcastMessage
38}
39
40func newBroadcastConn(c *Conn) *broadcastConn {
41	return &broadcastConn{
42		conn:  c,
43		msgCh: make(chan *broadcastMessage, 1),
44	}
45}
46
47func newBroadcastBench(usePrepared, compression bool) *broadcastBench {
48	bench := &broadcastBench{
49		w:           ioutil.Discard,
50		doneCh:      make(chan struct{}),
51		closeCh:     make(chan struct{}),
52		usePrepared: usePrepared,
53		compression: compression,
54	}
55	msg := &broadcastMessage{
56		payload: textMessages(1)[0],
57	}
58	if usePrepared {
59		pm, _ := NewPreparedMessage(TextMessage, msg.payload)
60		msg.prepared = pm
61	}
62	bench.message = msg
63	bench.makeConns(10000)
64	return bench
65}
66
67func (b *broadcastBench) makeConns(numConns int) {
68	conns := make([]*broadcastConn, numConns)
69
70	for i := 0; i < numConns; i++ {
71		c := newTestConn(nil, b.w, true)
72		if b.compression {
73			c.enableWriteCompression = true
74			c.newCompressionWriter = compressNoContextTakeover
75		}
76		conns[i] = newBroadcastConn(c)
77		go func(c *broadcastConn) {
78			for {
79				select {
80				case msg := <-c.msgCh:
81					if b.usePrepared {
82						c.conn.WritePreparedMessage(msg.prepared)
83					} else {
84						c.conn.WriteMessage(TextMessage, msg.payload)
85					}
86					val := atomic.AddInt32(&b.count, 1)
87					if val%int32(numConns) == 0 {
88						b.doneCh <- struct{}{}
89					}
90				case <-b.closeCh:
91					return
92				}
93			}
94		}(conns[i])
95	}
96	b.conns = conns
97}
98
99func (b *broadcastBench) close() {
100	close(b.closeCh)
101}
102
103func (b *broadcastBench) runOnce() {
104	for _, c := range b.conns {
105		c.msgCh <- b.message
106	}
107	<-b.doneCh
108}
109
110func BenchmarkBroadcast(b *testing.B) {
111	benchmarks := []struct {
112		name        string
113		usePrepared bool
114		compression bool
115	}{
116		{"NoCompression", false, false},
117		{"WithCompression", false, true},
118		{"NoCompressionPrepared", true, false},
119		{"WithCompressionPrepared", true, true},
120	}
121	for _, bm := range benchmarks {
122		b.Run(bm.name, func(b *testing.B) {
123			bench := newBroadcastBench(bm.usePrepared, bm.compression)
124			defer bench.close()
125			b.ResetTimer()
126			for i := 0; i < b.N; i++ {
127				bench.runOnce()
128			}
129			b.ReportAllocs()
130		})
131	}
132}
133