package centrifuge

import (
	"container/heap"
	"context"
	"sync"
	"time"

	"github.com/centrifugal/centrifuge/internal/memstream"
	"github.com/centrifugal/centrifuge/internal/priority"
)

// MemoryBroker is the builtin default Broker which allows running a
// Centrifuge-based server without any external broker. All data is managed
// inside the process memory.
//
// With this Broker you can only run a single Centrifuge node. If you need to
// scale you should consider using another Broker implementation instead – for
// example RedisBroker.
//
// Running a single node can be sufficient for many use cases, especially when
// you need maximum performance and not too many online clients. Consider
// configuring your load balancer to have one backup Centrifuge node for HA in
// this case.
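//
// A minimal wiring sketch (illustrative only: it assumes the usual Node
// constructor New and the Node.SetBroker method; since MemoryBroker is the
// default Broker, explicit wiring like this is normally not required):
//
//	node, _ := New(Config{})
//	broker, _ := NewMemoryBroker(node, MemoryBrokerConfig{
//		HistoryMetaTTL: 30 * 24 * time.Hour,
//	})
//	node.SetBroker(broker)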
type MemoryBroker struct {
	node         *Node
	historyHub   *historyHub
	eventHandler BrokerEventHandler

	// pubLocks synchronize access to publishing. We have to serialize publishes
	// within a channel to handle publications in order of offset and prevent
	// InsufficientState errors.
	// TODO: maybe replace with sharded pool of workers with buffered channels.
	pubLocks map[int]*sync.Mutex
}

var _ Broker = (*MemoryBroker)(nil)

// MemoryBrokerConfig is a memory broker config.
type MemoryBrokerConfig struct {
	// HistoryMetaTTL sets the time after which inactive stream meta information
	// expires. This meta information contains the epoch and current offset of
	// each stream, and having it helps in the message recovery process.
	// Must have a reasonable value for the application.
	// At the moment it works with seconds precision.
	// TODO v1: since we have epoch, things should also properly work without
	// meta information at all (but we lose the possibility of long-term recovery
	// in a stream without new messages). We can make this optional and disabled
	// by default at least.
	HistoryMetaTTL time.Duration
}

const numPubLocks = 4096

// NewMemoryBroker initializes MemoryBroker.
func NewMemoryBroker(n *Node, c MemoryBrokerConfig) (*MemoryBroker, error) {
	pubLocks := make(map[int]*sync.Mutex, numPubLocks)
	for i := 0; i < numPubLocks; i++ {
		pubLocks[i] = &sync.Mutex{}
	}
	b := &MemoryBroker{
		node:       n,
		historyHub: newHistoryHub(c.HistoryMetaTTL),
		pubLocks:   pubLocks,
	}
	return b, nil
}

// Run runs memory broker.
func (b *MemoryBroker) Run(h BrokerEventHandler) error {
	b.eventHandler = h
	b.historyHub.runCleanups()
	return nil
}

// Close is noop for now.
func (b *MemoryBroker) Close(_ context.Context) error {
	return nil
}

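// pubLock returns the mutex guarding publishes to channel ch. Channels are
// sharded over numPubLocks mutexes (see index), so publishes to the same
// channel are serialized while publishes to different channels mostly proceed
// in parallel.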
func (b *MemoryBroker) pubLock(ch string) *sync.Mutex {
	return b.pubLocks[index(ch, numPubLocks)]
}

// Publish adds a message into the history hub and calls the broker event
// handler to handle the message. We don't have any PUB/SUB here as
// MemoryBroker is single-node only.
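//
// A minimal sketch of a publish with history retention (the channel name and
// payload are illustrative; HistorySize and HistoryTTL are the PublishOptions
// fields checked in this method, and history is only kept when both are set):
//
//	sp, err := broker.Publish("chat", []byte("hello"), PublishOptions{
//		HistorySize: 10,
//		HistoryTTL:  time.Minute,
//	})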
func (b *MemoryBroker) Publish(ch string, data []byte, opts PublishOptions) (StreamPosition, error) {
	mu := b.pubLock(ch)
	mu.Lock()
	defer mu.Unlock()

	pub := &Publication{
		Data: data,
		Info: opts.ClientInfo,
	}
	if opts.HistorySize > 0 && opts.HistoryTTL > 0 {
		streamTop, err := b.historyHub.add(ch, pub, opts)
		if err != nil {
			return StreamPosition{}, err
		}
		pub.Offset = streamTop.Offset
		return streamTop, b.eventHandler.HandlePublication(ch, pub, streamTop)
	}
	return StreamPosition{}, b.eventHandler.HandlePublication(ch, pub, StreamPosition{})
}

// PublishJoin - see Broker interface description.
func (b *MemoryBroker) PublishJoin(ch string, info *ClientInfo) error {
	return b.eventHandler.HandleJoin(ch, info)
}

// PublishLeave - see Broker interface description.
func (b *MemoryBroker) PublishLeave(ch string, info *ClientInfo) error {
	return b.eventHandler.HandleLeave(ch, info)
}

// PublishControl - see Broker interface description.
func (b *MemoryBroker) PublishControl(data []byte, _, _ string) error {
	return b.eventHandler.HandleControl(data)
}

// Subscribe is noop here.
func (b *MemoryBroker) Subscribe(_ string) error {
	return nil
}

// Unsubscribe node from channel. Noop here.
func (b *MemoryBroker) Unsubscribe(_ string) error {
	return nil
}

// History - see Broker interface description.
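//
// An illustrative sketch (channel name is arbitrary; Limit, Since and Reverse
// are the HistoryFilter fields the in-memory stream handles):
//
//	// Ask only for the current stream position (no Since, zero Limit).
//	_, sp, _ := broker.History("chat", HistoryFilter{})
//	// Fetch up to 10 publications added after that position.
//	pubs, _, err := broker.History("chat", HistoryFilter{Limit: 10, Since: &sp})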
func (b *MemoryBroker) History(ch string, filter HistoryFilter) ([]*Publication, StreamPosition, error) {
	return b.historyHub.get(ch, filter)
}

// RemoveHistory - see Broker interface description.
func (b *MemoryBroker) RemoveHistory(ch string) error {
	return b.historyHub.remove(ch)
}

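// historyHub keeps in-memory per-channel publication streams together with
// two priority queues: expireQueue schedules clearing of stream content after
// history TTL, removeQueue schedules removal of stream meta information after
// historyMetaTTL.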
type historyHub struct {
	sync.RWMutex
	streams         map[string]*memstream.Stream
	nextExpireCheck int64
	expireQueue     priority.Queue
	expires         map[string]int64
	historyMetaTTL  time.Duration
	nextRemoveCheck int64
	removeQueue     priority.Queue
	removes         map[string]int64
}

func newHistoryHub(historyMetaTTL time.Duration) *historyHub {
	return &historyHub{
		streams:        make(map[string]*memstream.Stream),
		expireQueue:    priority.MakeQueue(),
		expires:        make(map[string]int64),
		historyMetaTTL: historyMetaTTL,
		removeQueue:    priority.MakeQueue(),
		removes:        make(map[string]int64),
	}
}

func (h *historyHub) runCleanups() {
	go h.expireStreams()
	if h.historyMetaTTL > 0 {
		go h.removeStreams()
	}
}

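// removeStreams wakes up once a second and removes streams whose meta
// information TTL expired, popping due channels from removeQueue.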
func (h *historyHub) removeStreams() {
	var nextRemoveCheck int64
	for {
		time.Sleep(time.Second)
		h.Lock()
		if h.nextRemoveCheck == 0 || h.nextRemoveCheck > time.Now().Unix() {
			h.Unlock()
			continue
		}
		nextRemoveCheck = 0
		for h.removeQueue.Len() > 0 {
			item := heap.Pop(&h.removeQueue).(*priority.Item)
			expireAt := item.Priority
			if expireAt > time.Now().Unix() {
				heap.Push(&h.removeQueue, item)
				nextRemoveCheck = expireAt
				break
			}
			ch := item.Value
			exp, ok := h.removes[ch]
			if !ok {
				continue
			}
			if exp <= expireAt {
				delete(h.removes, ch)
				delete(h.streams, ch)
			} else {
				heap.Push(&h.removeQueue, &priority.Item{Value: ch, Priority: exp})
			}
		}
		h.nextRemoveCheck = nextRemoveCheck
		h.Unlock()
	}
}

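// expireStreams wakes up once a second and clears publications from streams
// whose history TTL expired. The stream itself stays in the streams map, so
// its offset and epoch survive until removeStreams drops the meta information.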
func (h *historyHub) expireStreams() {
	var nextExpireCheck int64
	for {
		time.Sleep(time.Second)
		h.Lock()
		if h.nextExpireCheck == 0 || h.nextExpireCheck > time.Now().Unix() {
			h.Unlock()
			continue
		}
		nextExpireCheck = 0
		for h.expireQueue.Len() > 0 {
			item := heap.Pop(&h.expireQueue).(*priority.Item)
			expireAt := item.Priority
			if expireAt > time.Now().Unix() {
				heap.Push(&h.expireQueue, item)
				nextExpireCheck = expireAt
				break
			}
			ch := item.Value
			exp, ok := h.expires[ch]
			if !ok {
				continue
			}
			if exp <= expireAt {
				delete(h.expires, ch)
				if stream, ok := h.streams[ch]; ok {
					stream.Clear()
				}
			} else {
				heap.Push(&h.expireQueue, &priority.Item{Value: ch, Priority: exp})
			}
		}
		h.nextExpireCheck = nextExpireCheck
		h.Unlock()
	}
}

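// add appends a publication to the channel stream (creating the stream if
// needed), schedules history expiration and meta information removal, and
// returns the resulting stream position.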
func (h *historyHub) add(ch string, pub *Publication, opts PublishOptions) (StreamPosition, error) {
	h.Lock()
	defer h.Unlock()

	var index uint64
	var epoch string

	expireAt := time.Now().Unix() + int64(opts.HistoryTTL.Seconds())
	if _, ok := h.expires[ch]; !ok {
		heap.Push(&h.expireQueue, &priority.Item{Value: ch, Priority: expireAt})
	}
	h.expires[ch] = expireAt
	if h.nextExpireCheck == 0 || h.nextExpireCheck > expireAt {
		h.nextExpireCheck = expireAt
	}

	if h.historyMetaTTL > 0 {
		removeAt := time.Now().Unix() + int64(h.historyMetaTTL.Seconds())
		if _, ok := h.removes[ch]; !ok {
			heap.Push(&h.removeQueue, &priority.Item{Value: ch, Priority: removeAt})
		}
		h.removes[ch] = removeAt
		if h.nextRemoveCheck == 0 || h.nextRemoveCheck > removeAt {
			h.nextRemoveCheck = removeAt
		}
	}

	if stream, ok := h.streams[ch]; ok {
		index, _ = stream.Add(pub, opts.HistorySize)
		epoch = stream.Epoch()
	} else {
		stream := memstream.New()
		index, _ = stream.Add(pub, opts.HistorySize)
		epoch = stream.Epoch()
		h.streams[ch] = stream
	}
	pub.Offset = index

	return StreamPosition{Offset: index, Epoch: epoch}, nil
}

// createStream creates a new stream for a channel and returns its initial
// position. Lock must be held outside.
func (h *historyHub) createStream(ch string) StreamPosition {
	stream := memstream.New()
	h.streams[ch] = stream
	streamPosition := StreamPosition{}
	streamPosition.Offset = 0
	streamPosition.Epoch = stream.Epoch()
	return streamPosition
}

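// getPosition returns the current position of the stream: its top offset and
// epoch.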
func getPosition(stream *memstream.Stream) StreamPosition {
	streamPosition := StreamPosition{}
	streamPosition.Offset = stream.Top()
	streamPosition.Epoch = stream.Epoch()
	return streamPosition
}

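// get returns a portion of the channel stream according to filter together
// with the current stream position. Accessing a channel also prolongs its
// meta information TTL.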
func (h *historyHub) get(ch string, filter HistoryFilter) ([]*Publication, StreamPosition, error) {
	h.Lock()
	defer h.Unlock()

	if h.historyMetaTTL > 0 {
		removeAt := time.Now().Unix() + int64(h.historyMetaTTL.Seconds())
		if _, ok := h.removes[ch]; !ok {
			heap.Push(&h.removeQueue, &priority.Item{Value: ch, Priority: removeAt})
		}
		h.removes[ch] = removeAt
		if h.nextRemoveCheck == 0 || h.nextRemoveCheck > removeAt {
			h.nextRemoveCheck = removeAt
		}
	}

	stream, ok := h.streams[ch]
	if !ok {
		return nil, h.createStream(ch), nil
	}

	if filter.Since == nil {
		if filter.Limit == 0 {
			return nil, getPosition(stream), nil
		}
		items, _, err := stream.Get(0, false, filter.Limit, filter.Reverse)
		if err != nil {
			return nil, StreamPosition{}, err
		}
		pubs := make([]*Publication, 0, len(items))
		for _, item := range items {
			pub := item.Value.(*Publication)
			pubs = append(pubs, pub)
		}
		return pubs, getPosition(stream), nil
	}

	since := filter.Since

	streamPosition := getPosition(stream)

	if !filter.Reverse {
		if streamPosition.Offset == since.Offset && since.Epoch == stream.Epoch() {
			return nil, streamPosition, nil
		}
	}

	streamOffset := since.Offset + 1
	if filter.Reverse {
		streamOffset = since.Offset - 1
	}

	items, _, err := stream.Get(streamOffset, true, filter.Limit, filter.Reverse)
	if err != nil {
		return nil, StreamPosition{}, err
	}

	pubs := make([]*Publication, 0, len(items))
	for _, item := range items {
		pub := item.Value.(*Publication)
		pubs = append(pubs, pub)
	}
	return pubs, streamPosition, nil
}

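// remove clears publications from the channel stream if it exists.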
func (h *historyHub) remove(ch string) error {
	h.Lock()
	defer h.Unlock()
	if stream, ok := h.streams[ch]; ok {
		stream.Clear()
	}
	return nil
}