1package centrifuge 2 3import ( 4 "container/heap" 5 "context" 6 "sync" 7 "time" 8 9 "github.com/centrifugal/centrifuge/internal/memstream" 10 "github.com/centrifugal/centrifuge/internal/priority" 11) 12 13// MemoryBroker is builtin default Broker which allows to run Centrifuge-based 14// server without any external broker. All data managed inside process memory. 15// 16// With this Broker you can only run single Centrifuge node. If you need to scale 17// you should consider using another Broker implementation instead – for example 18// RedisBroker. 19// 20// Running single node can be sufficient for many use cases especially when you 21// need maximum performance and not too many online clients. Consider configuring 22// your load balancer to have one backup Centrifuge node for HA in this case. 23type MemoryBroker struct { 24 node *Node 25 historyHub *historyHub 26 eventHandler BrokerEventHandler 27 28 // pubLocks synchronize access to publishing. We have to sync publish 29 // to handle publications in the order of offset to prevent InsufficientState 30 // errors. 31 // TODO: maybe replace with sharded pool of workers with buffered channels. 32 pubLocks map[int]*sync.Mutex 33} 34 35var _ Broker = (*MemoryBroker)(nil) 36 37// MemoryBrokerConfig is a memory broker config. 38type MemoryBrokerConfig struct { 39 // HistoryMetaTTL sets a time of inactive stream meta information expiration. 40 // This information contains an epoch and offset of each stream. Having this 41 // meta information helps in message recovery process. 42 // Must have a reasonable value for application. 43 // At moment works with seconds precision. 44 // TODO v1: since we have epoch, things should also properly work without meta 45 // information at all (but we loose possibility of long-term recover in stream 46 // without new messages). We can make this optional and disabled by default at 47 // least. 48 HistoryMetaTTL time.Duration 49} 50 51const numPubLocks = 4096 52 53// NewMemoryBroker initializes MemoryBroker. 54func NewMemoryBroker(n *Node, c MemoryBrokerConfig) (*MemoryBroker, error) { 55 pubLocks := make(map[int]*sync.Mutex, numPubLocks) 56 for i := 0; i < numPubLocks; i++ { 57 pubLocks[i] = &sync.Mutex{} 58 } 59 b := &MemoryBroker{ 60 node: n, 61 historyHub: newHistoryHub(c.HistoryMetaTTL), 62 pubLocks: pubLocks, 63 } 64 return b, nil 65} 66 67// Run runs memory broker. 68func (b *MemoryBroker) Run(h BrokerEventHandler) error { 69 b.eventHandler = h 70 b.historyHub.runCleanups() 71 return nil 72} 73 74// Close is noop for now. 75func (b *MemoryBroker) Close(_ context.Context) error { 76 return nil 77} 78 79func (b *MemoryBroker) pubLock(ch string) *sync.Mutex { 80 return b.pubLocks[index(ch, numPubLocks)] 81} 82 83// Publish adds message into history hub and calls node method to handle message. 84// We don't have any PUB/SUB here as Memory Engine is single node only. 85func (b *MemoryBroker) Publish(ch string, data []byte, opts PublishOptions) (StreamPosition, error) { 86 mu := b.pubLock(ch) 87 mu.Lock() 88 defer mu.Unlock() 89 90 pub := &Publication{ 91 Data: data, 92 Info: opts.ClientInfo, 93 } 94 if opts.HistorySize > 0 && opts.HistoryTTL > 0 { 95 streamTop, err := b.historyHub.add(ch, pub, opts) 96 if err != nil { 97 return StreamPosition{}, err 98 } 99 pub.Offset = streamTop.Offset 100 return streamTop, b.eventHandler.HandlePublication(ch, pub, streamTop) 101 } 102 return StreamPosition{}, b.eventHandler.HandlePublication(ch, pub, StreamPosition{}) 103} 104 105// PublishJoin - see Broker interface description. 106func (b *MemoryBroker) PublishJoin(ch string, info *ClientInfo) error { 107 return b.eventHandler.HandleJoin(ch, info) 108} 109 110// PublishLeave - see Broker interface description. 111func (b *MemoryBroker) PublishLeave(ch string, info *ClientInfo) error { 112 return b.eventHandler.HandleLeave(ch, info) 113} 114 115// PublishControl - see Broker interface description. 116func (b *MemoryBroker) PublishControl(data []byte, _, _ string) error { 117 return b.eventHandler.HandleControl(data) 118} 119 120// Subscribe is noop here. 121func (b *MemoryBroker) Subscribe(_ string) error { 122 return nil 123} 124 125// Unsubscribe node from channel. Noop here. 126func (b *MemoryBroker) Unsubscribe(_ string) error { 127 return nil 128} 129 130// History - see Broker interface description. 131func (b *MemoryBroker) History(ch string, filter HistoryFilter) ([]*Publication, StreamPosition, error) { 132 return b.historyHub.get(ch, filter) 133} 134 135// RemoveHistory - see Broker interface description. 136func (b *MemoryBroker) RemoveHistory(ch string) error { 137 return b.historyHub.remove(ch) 138} 139 140type historyHub struct { 141 sync.RWMutex 142 streams map[string]*memstream.Stream 143 nextExpireCheck int64 144 expireQueue priority.Queue 145 expires map[string]int64 146 historyMetaTTL time.Duration 147 nextRemoveCheck int64 148 removeQueue priority.Queue 149 removes map[string]int64 150} 151 152func newHistoryHub(historyMetaTTL time.Duration) *historyHub { 153 return &historyHub{ 154 streams: make(map[string]*memstream.Stream), 155 expireQueue: priority.MakeQueue(), 156 expires: make(map[string]int64), 157 historyMetaTTL: historyMetaTTL, 158 removeQueue: priority.MakeQueue(), 159 removes: make(map[string]int64), 160 } 161} 162 163func (h *historyHub) runCleanups() { 164 go h.expireStreams() 165 if h.historyMetaTTL > 0 { 166 go h.removeStreams() 167 } 168} 169 170func (h *historyHub) removeStreams() { 171 var nextRemoveCheck int64 172 for { 173 time.Sleep(time.Second) 174 h.Lock() 175 if h.nextRemoveCheck == 0 || h.nextRemoveCheck > time.Now().Unix() { 176 h.Unlock() 177 continue 178 } 179 nextRemoveCheck = 0 180 for h.removeQueue.Len() > 0 { 181 item := heap.Pop(&h.removeQueue).(*priority.Item) 182 expireAt := item.Priority 183 if expireAt > time.Now().Unix() { 184 heap.Push(&h.removeQueue, item) 185 nextRemoveCheck = expireAt 186 break 187 } 188 ch := item.Value 189 exp, ok := h.removes[ch] 190 if !ok { 191 continue 192 } 193 if exp <= expireAt { 194 delete(h.removes, ch) 195 delete(h.streams, ch) 196 } else { 197 heap.Push(&h.removeQueue, &priority.Item{Value: ch, Priority: exp}) 198 } 199 } 200 h.nextRemoveCheck = nextRemoveCheck 201 h.Unlock() 202 } 203} 204 205func (h *historyHub) expireStreams() { 206 var nextExpireCheck int64 207 for { 208 time.Sleep(time.Second) 209 h.Lock() 210 if h.nextExpireCheck == 0 || h.nextExpireCheck > time.Now().Unix() { 211 h.Unlock() 212 continue 213 } 214 nextExpireCheck = 0 215 for h.expireQueue.Len() > 0 { 216 item := heap.Pop(&h.expireQueue).(*priority.Item) 217 expireAt := item.Priority 218 if expireAt > time.Now().Unix() { 219 heap.Push(&h.expireQueue, item) 220 nextExpireCheck = expireAt 221 break 222 } 223 ch := item.Value 224 exp, ok := h.expires[ch] 225 if !ok { 226 continue 227 } 228 if exp <= expireAt { 229 delete(h.expires, ch) 230 if stream, ok := h.streams[ch]; ok { 231 stream.Clear() 232 } 233 } else { 234 heap.Push(&h.expireQueue, &priority.Item{Value: ch, Priority: exp}) 235 } 236 } 237 h.nextExpireCheck = nextExpireCheck 238 h.Unlock() 239 } 240} 241 242func (h *historyHub) add(ch string, pub *Publication, opts PublishOptions) (StreamPosition, error) { 243 h.Lock() 244 defer h.Unlock() 245 246 var index uint64 247 var epoch string 248 249 expireAt := time.Now().Unix() + int64(opts.HistoryTTL.Seconds()) 250 if _, ok := h.expires[ch]; !ok { 251 heap.Push(&h.expireQueue, &priority.Item{Value: ch, Priority: expireAt}) 252 } 253 h.expires[ch] = expireAt 254 if h.nextExpireCheck == 0 || h.nextExpireCheck > expireAt { 255 h.nextExpireCheck = expireAt 256 } 257 258 if h.historyMetaTTL > 0 { 259 removeAt := time.Now().Unix() + int64(h.historyMetaTTL.Seconds()) 260 if _, ok := h.removes[ch]; !ok { 261 heap.Push(&h.removeQueue, &priority.Item{Value: ch, Priority: removeAt}) 262 } 263 h.removes[ch] = removeAt 264 if h.nextRemoveCheck == 0 || h.nextRemoveCheck > removeAt { 265 h.nextRemoveCheck = removeAt 266 } 267 } 268 269 if stream, ok := h.streams[ch]; ok { 270 index, _ = stream.Add(pub, opts.HistorySize) 271 epoch = stream.Epoch() 272 } else { 273 stream := memstream.New() 274 index, _ = stream.Add(pub, opts.HistorySize) 275 epoch = stream.Epoch() 276 h.streams[ch] = stream 277 } 278 pub.Offset = index 279 280 return StreamPosition{Offset: index, Epoch: epoch}, nil 281} 282 283// Lock must be held outside. 284func (h *historyHub) createStream(ch string) StreamPosition { 285 stream := memstream.New() 286 h.streams[ch] = stream 287 streamPosition := StreamPosition{} 288 streamPosition.Offset = 0 289 streamPosition.Epoch = stream.Epoch() 290 return streamPosition 291} 292 293func getPosition(stream *memstream.Stream) StreamPosition { 294 streamPosition := StreamPosition{} 295 streamPosition.Offset = stream.Top() 296 streamPosition.Epoch = stream.Epoch() 297 return streamPosition 298} 299 300func (h *historyHub) get(ch string, filter HistoryFilter) ([]*Publication, StreamPosition, error) { 301 h.Lock() 302 defer h.Unlock() 303 304 if h.historyMetaTTL > 0 { 305 removeAt := time.Now().Unix() + int64(h.historyMetaTTL.Seconds()) 306 if _, ok := h.removes[ch]; !ok { 307 heap.Push(&h.removeQueue, &priority.Item{Value: ch, Priority: removeAt}) 308 } 309 h.removes[ch] = removeAt 310 if h.nextRemoveCheck == 0 || h.nextRemoveCheck > removeAt { 311 h.nextRemoveCheck = removeAt 312 } 313 } 314 315 stream, ok := h.streams[ch] 316 if !ok { 317 return nil, h.createStream(ch), nil 318 } 319 320 if filter.Since == nil { 321 if filter.Limit == 0 { 322 return nil, getPosition(stream), nil 323 } 324 items, _, err := stream.Get(0, false, filter.Limit, filter.Reverse) 325 if err != nil { 326 return nil, StreamPosition{}, err 327 } 328 pubs := make([]*Publication, 0, len(items)) 329 for _, item := range items { 330 pub := item.Value.(*Publication) 331 pubs = append(pubs, pub) 332 } 333 return pubs, getPosition(stream), nil 334 } 335 336 since := filter.Since 337 338 streamPosition := getPosition(stream) 339 340 if !filter.Reverse { 341 if streamPosition.Offset == since.Offset && since.Epoch == stream.Epoch() { 342 return nil, streamPosition, nil 343 } 344 } 345 346 streamOffset := since.Offset + 1 347 if filter.Reverse { 348 streamOffset = since.Offset - 1 349 } 350 351 items, _, err := stream.Get(streamOffset, true, filter.Limit, filter.Reverse) 352 if err != nil { 353 return nil, StreamPosition{}, err 354 } 355 356 pubs := make([]*Publication, 0, len(items)) 357 for _, item := range items { 358 pub := item.Value.(*Publication) 359 pubs = append(pubs, pub) 360 } 361 return pubs, streamPosition, nil 362} 363 364func (h *historyHub) remove(ch string) error { 365 h.Lock() 366 defer h.Unlock() 367 if stream, ok := h.streams[ch]; ok { 368 stream.Clear() 369 } 370 return nil 371} 372