1package memberlist 2 3import ( 4 "sort" 5 "sync" 6) 7 8// TransmitLimitedQueue is used to queue messages to broadcast to 9// the cluster (via gossip) but limits the number of transmits per 10// message. It also prioritizes messages with lower transmit counts 11// (hence newer messages). 12type TransmitLimitedQueue struct { 13 // NumNodes returns the number of nodes in the cluster. This is 14 // used to determine the retransmit count, which is calculated 15 // based on the log of this. 16 NumNodes func() int 17 18 // RetransmitMult is the multiplier used to determine the maximum 19 // number of retransmissions attempted. 20 RetransmitMult int 21 22 sync.Mutex 23 bcQueue limitedBroadcasts 24} 25 26type limitedBroadcast struct { 27 transmits int // Number of transmissions attempted. 28 b Broadcast 29} 30type limitedBroadcasts []*limitedBroadcast 31 32// Broadcast is something that can be broadcasted via gossip to 33// the memberlist cluster. 34type Broadcast interface { 35 // Invalidates checks if enqueuing the current broadcast 36 // invalidates a previous broadcast 37 Invalidates(b Broadcast) bool 38 39 // Returns a byte form of the message 40 Message() []byte 41 42 // Finished is invoked when the message will no longer 43 // be broadcast, either due to invalidation or to the 44 // transmit limit being reached 45 Finished() 46} 47 48// QueueBroadcast is used to enqueue a broadcast 49func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) { 50 q.Lock() 51 defer q.Unlock() 52 53 // Check if this message invalidates another 54 n := len(q.bcQueue) 55 for i := 0; i < n; i++ { 56 if b.Invalidates(q.bcQueue[i].b) { 57 q.bcQueue[i].b.Finished() 58 copy(q.bcQueue[i:], q.bcQueue[i+1:]) 59 q.bcQueue[n-1] = nil 60 q.bcQueue = q.bcQueue[:n-1] 61 n-- 62 } 63 } 64 65 // Append to the queue 66 q.bcQueue = append(q.bcQueue, &limitedBroadcast{0, b}) 67} 68 69// GetBroadcasts is used to get a number of broadcasts, up to a byte limit 70// and applying a per-message overhead as provided. 71func (q *TransmitLimitedQueue) GetBroadcasts(overhead, limit int) [][]byte { 72 q.Lock() 73 defer q.Unlock() 74 75 // Fast path the default case 76 if len(q.bcQueue) == 0 { 77 return nil 78 } 79 80 transmitLimit := retransmitLimit(q.RetransmitMult, q.NumNodes()) 81 bytesUsed := 0 82 var toSend [][]byte 83 84 for i := len(q.bcQueue) - 1; i >= 0; i-- { 85 // Check if this is within our limits 86 b := q.bcQueue[i] 87 msg := b.b.Message() 88 if bytesUsed+overhead+len(msg) > limit { 89 continue 90 } 91 92 // Add to slice to send 93 bytesUsed += overhead + len(msg) 94 toSend = append(toSend, msg) 95 96 // Check if we should stop transmission 97 b.transmits++ 98 if b.transmits >= transmitLimit { 99 b.b.Finished() 100 n := len(q.bcQueue) 101 q.bcQueue[i], q.bcQueue[n-1] = q.bcQueue[n-1], nil 102 q.bcQueue = q.bcQueue[:n-1] 103 } 104 } 105 106 // If we are sending anything, we need to re-sort to deal 107 // with adjusted transmit counts 108 if len(toSend) > 0 { 109 q.bcQueue.Sort() 110 } 111 return toSend 112} 113 114// NumQueued returns the number of queued messages 115func (q *TransmitLimitedQueue) NumQueued() int { 116 q.Lock() 117 defer q.Unlock() 118 return len(q.bcQueue) 119} 120 121// Reset clears all the queued messages 122func (q *TransmitLimitedQueue) Reset() { 123 q.Lock() 124 defer q.Unlock() 125 for _, b := range q.bcQueue { 126 b.b.Finished() 127 } 128 q.bcQueue = nil 129} 130 131// Prune will retain the maxRetain latest messages, and the rest 132// will be discarded. This can be used to prevent unbounded queue sizes 133func (q *TransmitLimitedQueue) Prune(maxRetain int) { 134 q.Lock() 135 defer q.Unlock() 136 137 // Do nothing if queue size is less than the limit 138 n := len(q.bcQueue) 139 if n < maxRetain { 140 return 141 } 142 143 // Invalidate the messages we will be removing 144 for i := 0; i < n-maxRetain; i++ { 145 q.bcQueue[i].b.Finished() 146 } 147 148 // Move the messages, and retain only the last maxRetain 149 copy(q.bcQueue[0:], q.bcQueue[n-maxRetain:]) 150 q.bcQueue = q.bcQueue[:maxRetain] 151} 152 153func (b limitedBroadcasts) Len() int { 154 return len(b) 155} 156 157func (b limitedBroadcasts) Less(i, j int) bool { 158 return b[i].transmits < b[j].transmits 159} 160 161func (b limitedBroadcasts) Swap(i, j int) { 162 b[i], b[j] = b[j], b[i] 163} 164 165func (b limitedBroadcasts) Sort() { 166 sort.Sort(sort.Reverse(b)) 167} 168