1package memberlist
2
3import (
4	"sort"
5	"sync"
6)
7
// TransmitLimitedQueue is used to queue messages to broadcast to
// the cluster (via gossip) but limits the number of transmits per
// message. It also prioritizes messages with lower transmit counts
// (hence newer messages).
type TransmitLimitedQueue struct {
	// NumNodes returns the number of nodes in the cluster. This is
	// used to determine the retransmit count, which is calculated
	// based on the log of this.
	NumNodes func() int

	// RetransmitMult is the multiplier used to determine the maximum
	// number of retransmissions attempted.
	RetransmitMult int

	// The embedded mutex guards bcQueue. Every method of the queue
	// acquires it on entry and holds it until the method returns.
	sync.Mutex
	// bcQueue holds the pending broadcasts. New broadcasts are appended
	// at the tail, and GetBroadcasts re-sorts the slice by descending
	// transmit count and walks it from the tail, so the least-transmitted
	// messages are considered first.
	bcQueue limitedBroadcasts
}
25
// limitedBroadcast pairs a queued Broadcast with the number of times
// it has been handed out for transmission so far.
type limitedBroadcast struct {
	transmits int // Number of transmissions attempted.
	b         Broadcast
}
// limitedBroadcasts is the queue's backing slice. It implements
// sort.Interface ordered by ascending transmit count; its Sort method
// applies that ordering in reverse.
type limitedBroadcasts []*limitedBroadcast
31
// Broadcast is something that can be broadcast via gossip to
// the memberlist cluster.
type Broadcast interface {
	// Invalidates reports whether enqueuing the current broadcast
	// invalidates the previously queued broadcast b. QueueBroadcast
	// calls it on the new broadcast once per entry already queued.
	Invalidates(b Broadcast) bool

	// Message returns the byte form of the message. Its length,
	// plus the per-message overhead, counts toward the byte limit
	// passed to GetBroadcasts.
	Message() []byte

	// Finished is invoked when the message will no longer
	// be broadcast: on invalidation, when the transmit limit
	// is reached, or when the queue is reset or pruned. It is
	// called while the queue's mutex is held.
	Finished()
}
47
48// QueueBroadcast is used to enqueue a broadcast
49func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) {
50	q.Lock()
51	defer q.Unlock()
52
53	// Check if this message invalidates another
54	n := len(q.bcQueue)
55	for i := 0; i < n; i++ {
56		if b.Invalidates(q.bcQueue[i].b) {
57			q.bcQueue[i].b.Finished()
58			copy(q.bcQueue[i:], q.bcQueue[i+1:])
59			q.bcQueue[n-1] = nil
60			q.bcQueue = q.bcQueue[:n-1]
61			n--
62		}
63	}
64
65	// Append to the queue
66	q.bcQueue = append(q.bcQueue, &limitedBroadcast{0, b})
67}
68
69// GetBroadcasts is used to get a number of broadcasts, up to a byte limit
70// and applying a per-message overhead as provided.
71func (q *TransmitLimitedQueue) GetBroadcasts(overhead, limit int) [][]byte {
72	q.Lock()
73	defer q.Unlock()
74
75	// Fast path the default case
76	if len(q.bcQueue) == 0 {
77		return nil
78	}
79
80	transmitLimit := retransmitLimit(q.RetransmitMult, q.NumNodes())
81	bytesUsed := 0
82	var toSend [][]byte
83
84	for i := len(q.bcQueue) - 1; i >= 0; i-- {
85		// Check if this is within our limits
86		b := q.bcQueue[i]
87		msg := b.b.Message()
88		if bytesUsed+overhead+len(msg) > limit {
89			continue
90		}
91
92		// Add to slice to send
93		bytesUsed += overhead + len(msg)
94		toSend = append(toSend, msg)
95
96		// Check if we should stop transmission
97		b.transmits++
98		if b.transmits >= transmitLimit {
99			b.b.Finished()
100			n := len(q.bcQueue)
101			q.bcQueue[i], q.bcQueue[n-1] = q.bcQueue[n-1], nil
102			q.bcQueue = q.bcQueue[:n-1]
103		}
104	}
105
106	// If we are sending anything, we need to re-sort to deal
107	// with adjusted transmit counts
108	if len(toSend) > 0 {
109		q.bcQueue.Sort()
110	}
111	return toSend
112}
113
114// NumQueued returns the number of queued messages
115func (q *TransmitLimitedQueue) NumQueued() int {
116	q.Lock()
117	defer q.Unlock()
118	return len(q.bcQueue)
119}
120
121// Reset clears all the queued messages
122func (q *TransmitLimitedQueue) Reset() {
123	q.Lock()
124	defer q.Unlock()
125	for _, b := range q.bcQueue {
126		b.b.Finished()
127	}
128	q.bcQueue = nil
129}
130
131// Prune will retain the maxRetain latest messages, and the rest
132// will be discarded. This can be used to prevent unbounded queue sizes
133func (q *TransmitLimitedQueue) Prune(maxRetain int) {
134	q.Lock()
135	defer q.Unlock()
136
137	// Do nothing if queue size is less than the limit
138	n := len(q.bcQueue)
139	if n < maxRetain {
140		return
141	}
142
143	// Invalidate the messages we will be removing
144	for i := 0; i < n-maxRetain; i++ {
145		q.bcQueue[i].b.Finished()
146	}
147
148	// Move the messages, and retain only the last maxRetain
149	copy(q.bcQueue[0:], q.bcQueue[n-maxRetain:])
150	q.bcQueue = q.bcQueue[:maxRetain]
151}
152
153func (b limitedBroadcasts) Len() int {
154	return len(b)
155}
156
157func (b limitedBroadcasts) Less(i, j int) bool {
158	return b[i].transmits < b[j].transmits
159}
160
161func (b limitedBroadcasts) Swap(i, j int) {
162	b[i], b[j] = b[j], b[i]
163}
164
165func (b limitedBroadcasts) Sort() {
166	sort.Sort(sort.Reverse(b))
167}
168