package tntengine

import (
	"context"
	"errors"
	"fmt"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/FZambia/tarantool"
	"github.com/centrifugal/centrifuge"
	"github.com/centrifugal/protocol"
	"github.com/google/uuid"
	"github.com/vmihailenco/msgpack/v5"
)

const internalChannelPrefix = "__"

const (
	// tarantoolControlChannel is the name of the control channel.
	tarantoolControlChannel = internalChannelPrefix + "control"
	// tarantoolNodeChannelPrefix is the prefix of node-specific channels.
	tarantoolNodeChannelPrefix = internalChannelPrefix + "node."
)
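
// With the prefixes above, the control channel resolves to "__control", and a
// node with ID "42" (an illustrative value) gets the node channel "__node.42"
// (see controlChannel and nodeChannel below).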

// Broker uses Tarantool to implement centrifuge.Broker functionality.
type Broker struct {
	controlRound uint64 // Keep this atomic field first in the struct to guarantee 64-bit alignment on 32-bit architectures.
	node         *centrifuge.Node
	sharding     bool
	config       BrokerConfig
	shards       []*Shard
	nodeChannel  string
}

var _ centrifuge.Broker = (*Broker)(nil)

// BrokerConfig is a config for Tarantool Broker.
type BrokerConfig struct {
	// HistoryMetaTTL sets the expiration time of stream meta keys in Tarantool.
	// A stream meta key is a Tarantool HASH that contains the top offset in a
	// channel and the epoch value. By default stream meta keys do not expire.
	HistoryMetaTTL time.Duration

	// UsePolling enables polling mode instead of push mode.
	UsePolling bool

	// Shards is a list of Tarantool instances used to shard data by channel.
	Shards []*Shard
}

// NewBroker initializes a Tarantool Broker.
func NewBroker(n *centrifuge.Node, config BrokerConfig) (*Broker, error) {
	if len(config.Shards) == 0 {
		return nil, errors.New("no Tarantool shards provided in configuration")
	}
	if len(config.Shards) > 1 {
		n.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, fmt.Sprintf("Tarantool sharding enabled: %d shards", len(config.Shards))))
	}
	e := &Broker{
		node:        n,
		shards:      config.Shards,
		config:      config,
		sharding:    len(config.Shards) > 1,
		nodeChannel: nodeChannel(n.ID()),
	}
	return e, nil
}
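
// A minimal usage sketch (assuming a configured *centrifuge.Node and a slice
// of shards constructed elsewhere; the variable names are illustrative):
//
//	broker, err := NewBroker(node, BrokerConfig{
//		Shards:         shards,
//		HistoryMetaTTL: 30 * time.Minute,
//	})
//	if err != nil {
//		// handle error
//	}
//	node.SetBroker(broker)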

// Run runs the broker once the node is initialized.
func (b *Broker) Run(h centrifuge.BrokerEventHandler) error {
	for _, shard := range b.shards {
		if err := b.runShard(shard, h); err != nil {
			return err
		}
	}
	return nil
}

func (b *Broker) runForever(fn func(), minDelay time.Duration) {
	for {
		started := time.Now()
		fn()
		elapsed := time.Since(started)
		if elapsed < minDelay {
			// Sleep for a while to prevent a busy loop when reconnecting.
			// If elapsed >= minDelay then fn is restarted right away; this is
			// intentional to reconnect fast after a single transient error.
			time.Sleep(minDelay - elapsed)
		}
	}
}

const pubSubRoutineMinDelay = 300 * time.Millisecond

// runShard runs a single Tarantool shard.
func (b *Broker) runShard(s *Shard, h centrifuge.BrokerEventHandler) error {
	go b.runForever(func() {
		b.runPubSub(s, h)
	}, pubSubRoutineMinDelay)
	go b.runForever(func() {
		b.runControlPubSub(s, h)
	}, pubSubRoutineMinDelay)
	return nil
}

type pubRequest struct {
	MsgType        string
	Channel        string
	Data           string
	Info           string
	HistoryTTL     int
	HistorySize    int
	HistoryMetaTTL int
}

type pubResponse struct {
	Offset uint64
	Epoch  string
}
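
// pubResponse mirrors the two-element array {offset, epoch} returned by the
// centrifuge.publish Lua function (decoded in DecodeMsgpack below).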

func (m *pubResponse) DecodeMsgpack(d *msgpack.Decoder) error {
	var err error
	var l int
	if l, err = d.DecodeArrayLen(); err != nil {
		return err
	}
	if l != 2 {
		return fmt.Errorf("malformed array len: %d", l)
	}
	if m.Offset, err = d.DecodeUint64(); err != nil {
		return err
	}
	if m.Epoch, err = d.DecodeString(); err != nil {
		return err
	}
	return nil
}

// Publish - see centrifuge.Broker interface description.
func (b *Broker) Publish(ch string, data []byte, opts centrifuge.PublishOptions) (centrifuge.StreamPosition, error) {
	s := consistentShard(ch, b.shards)
	pr := &pubRequest{
		MsgType:        "p",
		Channel:        ch,
		Data:           string(data),
		Info:           b.clientInfoString(opts.ClientInfo),
		HistoryTTL:     int(opts.HistoryTTL.Seconds()),
		HistorySize:    opts.HistorySize,
		HistoryMetaTTL: int(b.config.HistoryMetaTTL.Seconds()),
	}
	var resp pubResponse
	err := s.ExecTyped(tarantool.Call("centrifuge.publish", pr), &resp)
	if err != nil {
		return centrifuge.StreamPosition{}, err
	}
	return centrifuge.StreamPosition{Offset: resp.Offset, Epoch: resp.Epoch}, nil
}
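
// A minimal sketch of a Publish call with history enabled (illustrative
// values; the options come from the centrifuge package):
//
//	pos, err := broker.Publish("news", []byte(`{"text":"hello"}`), centrifuge.PublishOptions{
//		HistoryTTL:  time.Minute,
//		HistorySize: 10,
//	})
//	if err != nil {
//		// handle error
//	}
//	_ = pos // StreamPosition{Offset: ..., Epoch: ...} of the published message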

// PublishJoin - see centrifuge.Broker interface description.
func (b *Broker) PublishJoin(ch string, info *centrifuge.ClientInfo) error {
	s := consistentShard(ch, b.shards)
	pr := pubRequest{
		MsgType: "j",
		Channel: ch,
		Info:    b.clientInfoString(info),
	}
	_, err := s.Exec(tarantool.Call("centrifuge.publish", pr))
	return err
}

// PublishLeave - see centrifuge.Broker interface description.
func (b *Broker) PublishLeave(ch string, info *centrifuge.ClientInfo) error {
	s := consistentShard(ch, b.shards)
	pr := pubRequest{
		MsgType: "l",
		Channel: ch,
		Info:    b.clientInfoString(info),
	}
	_, err := s.Exec(tarantool.Call("centrifuge.publish", pr))
	return err
}

func (b *Broker) clientInfoString(clientInfo *centrifuge.ClientInfo) string {
	var info string
	if clientInfo != nil {
		byteMessage, err := infoToProto(clientInfo).MarshalVT()
		if err != nil {
			// Fall back to an empty info string if marshaling fails.
			return info
		}
		info = string(byteMessage)
	}
	return info
}

// PublishControl - see centrifuge.Broker interface description.
func (b *Broker) PublishControl(data []byte, nodeID, _ string) error {
	currentRound := atomic.AddUint64(&b.controlRound, 1)
	index := currentRound % uint64(len(b.shards))
	var channel string
	if nodeID != "" {
		channel = nodeChannel(nodeID)
	} else {
		channel = b.controlChannel()
	}
	pr := pubRequest{
		MsgType: "c",
		Channel: channel,
		Data:    string(data),
	}
	_, err := b.shards[index].Exec(tarantool.Call("centrifuge.publish", pr))
	return err
}

func (b *Broker) controlChannel() string {
	return tarantoolControlChannel
}

func nodeChannel(nodeID string) string {
	return tarantoolNodeChannelPrefix + nodeID
}

// Subscribe - see centrifuge.Broker interface description.
func (b *Broker) Subscribe(ch string) error {
	if strings.HasPrefix(ch, internalChannelPrefix) {
		return centrifuge.ErrorBadRequest
	}
	if b.node.LogEnabled(centrifuge.LogLevelDebug) {
		b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "subscribe node on channel", map[string]interface{}{"channel": ch}))
	}
	r := newSubRequest([]string{ch}, true)
	s := b.shards[consistentIndex(ch, len(b.shards))]
	return b.sendSubscribe(s, r)
}

// Unsubscribe - see centrifuge.Broker interface description.
func (b *Broker) Unsubscribe(ch string) error {
	if b.node.LogEnabled(centrifuge.LogLevelDebug) {
		b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "unsubscribe node from channel", map[string]interface{}{"channel": ch}))
	}
	r := newSubRequest([]string{ch}, false)
	s := b.shards[consistentIndex(ch, len(b.shards))]
	return b.sendSubscribe(s, r)
}

var errOpTimeout = errors.New("operation timed out")

func (b *Broker) sendSubscribe(shard *Shard, r subRequest) error {
	select {
	case shard.subCh <- r:
		// Fast path: the subscriber goroutine is ready to accept the request.
	default:
		// Slow path: wait for the request to be accepted, but no longer than
		// defaultRequestTimeout.
		timer := AcquireTimer(defaultRequestTimeout)
		defer ReleaseTimer(timer)
		select {
		case shard.subCh <- r:
		case <-timer.C:
			return errOpTimeout
		}
	}
	return r.result()
}
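
// AcquireTimer and ReleaseTimer are defined elsewhere in this package. For
// reference, a minimal sketch of the usual pooled-timer pattern they follow
// (illustrative only; the actual implementation may differ):
//
//	var timerPool sync.Pool
//
//	func AcquireTimer(d time.Duration) *time.Timer {
//		v := timerPool.Get()
//		if v == nil {
//			return time.NewTimer(d)
//		}
//		t := v.(*time.Timer)
//		t.Reset(d)
//		return t
//	}
//
//	func ReleaseTimer(t *time.Timer) {
//		if !t.Stop() {
//			// Drain the channel if the timer already fired.
//			select {
//			case <-t.C:
//			default:
//			}
//		}
//		timerPool.Put(t)
//	}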

type historyRequest struct {
	Channel        string
	Offset         uint64
	Limit          int
	Reverse        bool
	IncludePubs    bool
	HistoryMetaTTL int
}

type historyResponse struct {
	Offset uint64
	Epoch  string
	Pubs   []*centrifuge.Publication
}

func (m *historyResponse) DecodeMsgpack(d *msgpack.Decoder) error {
	var err error
	var l int
	if l, err = d.DecodeArrayLen(); err != nil {
		return err
	}
	if l != 3 {
		return fmt.Errorf("malformed array len: %d", l)
	}
	if m.Offset, err = d.DecodeUint64(); err != nil {
		return err
	}
	if m.Epoch, err = d.DecodeString(); err != nil {
		return err
	}
	if l, err = d.DecodeArrayLen(); err != nil {
		return err
	}
	if l == -1 {
		// A nil array means there are no publications in the response.
		return nil
	}

	pubs := make([]*centrifuge.Publication, 0, l)

	for i := 0; i < l; i++ {
		var pub centrifuge.Publication
		var l int
		if l, err = d.DecodeArrayLen(); err != nil {
			return err
		}
		if l != 6 {
			return fmt.Errorf("malformed array len: %d", l)
		}
		// The first two fields of a publication tuple are not used here.
		if _, err = d.DecodeUint64(); err != nil {
			return err
		}
		if _, err = d.DecodeString(); err != nil {
			return err
		}
		if pub.Offset, err = d.DecodeUint64(); err != nil {
			return err
		}
		// The fourth field (a float64) is not used here either.
		if _, err = d.DecodeFloat64(); err != nil {
			return err
		}
		if data, err := d.DecodeString(); err != nil {
			return err
		} else if len(data) > 0 {
			pub.Data = []byte(data)
		}
		if info, err := d.DecodeString(); err != nil {
			return err
		} else if len(info) > 0 {
			var i protocol.ClientInfo
			if err = i.UnmarshalVT([]byte(info)); err != nil {
				return err
			}
			pub.Info = infoFromProto(&i)
		}
		pubs = append(pubs, &pub)
	}
	m.Pubs = pubs
	return nil
}

// History - see centrifuge.Broker interface description.
func (b *Broker) History(ch string, filter centrifuge.HistoryFilter) ([]*centrifuge.Publication, centrifuge.StreamPosition, error) {
	var includePubs = true
	var offset uint64
	if filter.Since != nil {
		if filter.Reverse {
			offset = filter.Since.Offset - 1
		} else {
			offset = filter.Since.Offset + 1
		}
	}
	var limit int
	if filter.Limit == 0 {
		includePubs = false
	}
	if filter.Limit > 0 {
		limit = filter.Limit
	}
	historyMetaTTLSeconds := int(b.config.HistoryMetaTTL.Seconds())
	s := consistentShard(ch, b.shards)
	req := historyRequest{
		Channel:        ch,
		Offset:         offset,
		Limit:          limit,
		Reverse:        filter.Reverse,
		IncludePubs:    includePubs,
		HistoryMetaTTL: historyMetaTTLSeconds,
	}
	var resp historyResponse
	err := s.ExecTyped(tarantool.Call("centrifuge.history", req), &resp)
	if err != nil {
		return nil, centrifuge.StreamPosition{}, err
	}
	streamPosition := centrifuge.StreamPosition{Offset: resp.Offset, Epoch: resp.Epoch}
	return resp.Pubs, streamPosition, nil
}
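
// A minimal sketch of a History call that reads publications after a known
// stream position (illustrative values):
//
//	pubs, pos, err := broker.History("news", centrifuge.HistoryFilter{
//		Since: &centrifuge.StreamPosition{Offset: 10, Epoch: "epoch"},
//		Limit: 100,
//	})
//	if err != nil {
//		// handle error
//	}
//	_ = pubs // publications with offsets greater than 10
//	_ = pos  // current top StreamPosition of the stream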

type removeHistoryRequest struct {
	Channel string
}

// RemoveHistory - see centrifuge.Broker interface description.
func (b *Broker) RemoveHistory(ch string) error {
	s := consistentShard(ch, b.shards)
	_, err := s.Exec(tarantool.Call("centrifuge.remove_history", removeHistoryRequest{Channel: ch}))
	return err
}

const (
	// tarantoolPubSubWorkerChannelSize sets the buffer size of the channels used
	// to pass messages received from a Tarantool PUB/SUB connection to worker
	// goroutines.
	tarantoolPubSubWorkerChannelSize = 512
	// tarantoolSubscribeBatchLimit is the maximum number of channels to include
	// in a single batch subscribe call.
	tarantoolSubscribeBatchLimit = 512
)

func (b *Broker) getShard(channel string) *Shard {
	if !b.sharding {
		return b.shards[0]
	}
	return b.shards[consistentIndex(channel, len(b.shards))]
}
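
// consistentIndex and index are defined elsewhere in this package. A minimal
// sketch of a common hash-based bucket selection, purely illustrative (the
// real implementation may differ):
//
//	func index(s string, numBuckets int) int {
//		if numBuckets == 1 {
//			return 0
//		}
//		h := fnv.New64a()
//		_, _ = h.Write([]byte(s))
//		return int(h.Sum64() % uint64(numBuckets))
//	}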

type pollRequest struct {
	ConnID     string
	UsePolling bool
	Timeout    int
}

type subscribeRequest struct {
	ConnID   string
	Channels []string
}

type pubSubMessage struct {
	Type    string
	Channel string
	Offset  uint64
	Epoch   string
	Data    []byte
	Info    []byte
}
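
// pubSubMessage mirrors the six-element array {type, channel, offset, epoch,
// data, info} delivered by Tarantool for each message (see DecodeMsgpack
// below).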

func (m *pubSubMessage) DecodeMsgpack(d *msgpack.Decoder) error {
	var err error
	var l int
	if l, err = d.DecodeArrayLen(); err != nil {
		return err
	}
	if l != 6 {
		return fmt.Errorf("malformed array len: %d", l)
	}
	if m.Type, err = d.DecodeString(); err != nil {
		return err
	}
	if m.Channel, err = d.DecodeString(); err != nil {
		return err
	}
	if m.Offset, err = d.DecodeUint64(); err != nil {
		return err
	}
	if m.Epoch, err = d.DecodeString(); err != nil {
		return err
	}
	if data, err := d.DecodeString(); err != nil {
		return err
	} else {
		m.Data = []byte(data)
	}
	if info, err := d.DecodeString(); err != nil {
		return err
	} else {
		m.Info = []byte(info)
	}
	return nil
}

func (b *Broker) runPubSub(s *Shard, eventHandler centrifuge.BrokerEventHandler) {
	logError := func(errString string) {
		b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "restart pub/sub", map[string]interface{}{"error": errString}))
	}

	u, err := uuid.NewRandom()
	if err != nil {
		logError(err.Error())
		return
	}
	connID := u.String()

	conn, cancel, err := s.pubSubConn()
	if err != nil {
		logError(err.Error())
		return
	}
	defer cancel()
	defer func() { _ = conn.Close() }()

	// Register this poller in Tarantool under a unique connection ID.
	result, err := conn.Exec(tarantool.Call("centrifuge.get_messages", pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 0}))
	if err != nil {
		logError(err.Error())
		return
	}
	if result.Error != "" {
		logError(result.Error)
		return
	}

	numWorkers := runtime.NumCPU()

	b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, fmt.Sprintf("running Tarantool PUB/SUB, num workers: %d", numWorkers)))
	defer func() {
		b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "stopping Tarantool PUB/SUB"))
	}()

	done := make(chan struct{})
	var doneOnce sync.Once
	closeDoneOnce := func() {
		doneOnce.Do(func() {
			close(done)
			_ = conn.Close()
		})
	}
	defer closeDoneOnce()

	// Run subscriber goroutine.
	go func(conn *tarantool.Connection) {
		for {
			select {
			case <-done:
				return
			case r := <-s.subCh:
				isSubscribe := r.subscribe
				channelBatch := []subRequest{r}

				chIDs := r.channels

				var otherR *subRequest

			loop:
				for len(chIDs) < tarantoolSubscribeBatchLimit {
					select {
					case r := <-s.subCh:
						if r.subscribe != isSubscribe {
							// We cannot mix subscribe and unsubscribe requests in one
							// batch, so we must stop here. Since we have already consumed
							// a subRequest value from the channel, we take care of it
							// below.
							otherR = &r
							break loop
						}
						channelBatch = append(channelBatch, r)
						chIDs = append(chIDs, r.channels...)
					default:
						break loop
					}
				}

				var opErr error
				if isSubscribe {
					_, err = conn.Exec(tarantool.Call("centrifuge.subscribe", subscribeRequest{ConnID: connID, Channels: chIDs}))
					opErr = err
				} else {
					_, err = conn.Exec(tarantool.Call("centrifuge.unsubscribe", subscribeRequest{ConnID: connID, Channels: chIDs}))
					opErr = err
				}

				if opErr != nil {
					for _, r := range channelBatch {
						r.done(opErr)
					}
					if otherR != nil {
						otherR.done(opErr)
					}
					// Close the connection: this should make waitPubSubMessages return
					// an error below and the whole runPubSub method restart.
					closeDoneOnce()
					return
				}
				for _, r := range channelBatch {
					r.done(nil)
				}
				if otherR != nil {
					chIDs := otherR.channels
					var opErr error
					if otherR.subscribe {
						_, err = conn.Exec(tarantool.Call("centrifuge.subscribe", subscribeRequest{ConnID: connID, Channels: chIDs}))
						opErr = err
					} else {
						_, err = conn.Exec(tarantool.Call("centrifuge.unsubscribe", subscribeRequest{ConnID: connID, Channels: chIDs}))
						opErr = err
					}
					if opErr != nil {
						otherR.done(opErr)
						// Close the connection: this should make waitPubSubMessages
						// return an error below and the whole runPubSub method restart.
						closeDoneOnce()
						return
					}
					otherR.done(nil)
				}
			}
		}
	}(conn)

	// Run workers to spread message processing across goroutines.
	workers := make(map[int]chan pubSubMessage)
	for i := 0; i < numWorkers; i++ {
		workerCh := make(chan pubSubMessage, tarantoolPubSubWorkerChannelSize)
		workers[i] = workerCh
		go func(ch chan pubSubMessage) {
			for {
				select {
				case <-done:
					return
				case n := <-ch:
					err := b.handleMessage(eventHandler, n)
					if err != nil {
						b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error handling client message", map[string]interface{}{"error": err.Error()}))
						continue
					}
				}
			}
		}(workerCh)
	}

	// Resubscribe to channels that already have local subscribers, so state is
	// restored after a PUB/SUB restart.
	go func() {
		var chIDs []string

		channels := b.node.Hub().Channels()
		for i := 0; i < len(channels); i++ {
			if b.getShard(channels[i]) == s {
				chIDs = append(chIDs, channels[i])
			}
		}

		batch := make([]string, 0)

		for i, ch := range chIDs {
			if len(batch) > 0 && i%tarantoolSubscribeBatchLimit == 0 {
				r := newSubRequest(batch, true)
				err := b.sendSubscribe(s, r)
				if err != nil {
					b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error subscribing", map[string]interface{}{"error": err.Error()}))
					closeDoneOnce()
					return
				}
				batch = nil
			}
			batch = append(batch, ch)
		}
		if len(batch) > 0 {
			r := newSubRequest(batch, true)
			err := b.sendSubscribe(s, r)
			if err != nil {
				b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error subscribing", map[string]interface{}{"error": err.Error()}))
				closeDoneOnce()
				return
			}
		}
	}()

	processPubSubMessages := func(messages []pubSubMessage) {
		for _, msg := range messages {
			// Send the message to a worker channel, preserving per-channel order:
			// messages from the same channel are always handled by the same worker.
			workers[index(msg.Channel, numWorkers)] <- msg
		}
	}

	for {
		err := b.waitPubSubMessages(conn, connID, processPubSubMessages)
		if err != nil {
			logError(err.Error())
			return
		}
	}
}

func (b *Broker) waitPubSubMessages(conn *tarantool.Connection, connID string, cb func([]pubSubMessage)) error {
	if !b.config.UsePolling {
		// Push mode: block in a long-running call and consume message batches
		// pushed by Tarantool.
		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
		defer cancel()
		_, err := conn.ExecContext(ctx, tarantool.Call(
			"centrifuge.get_messages",
			pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 25},
		).WithPushTyped(func(decode func(interface{}) error) {
			var m [][]pubSubMessage
			if err := decode(&m); err != nil {
				b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding push", map[string]interface{}{"error": err.Error()}))
				return
			}
			if len(m) == 1 {
				cb(m[0])
			}
		}))
		if err != nil {
			return err
		}
	} else {
		// Polling mode: each call returns the currently accumulated batch of
		// messages.
		var m [][]pubSubMessage
		err := conn.ExecTyped(tarantool.Call(
			"centrifuge.get_messages",
			pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 25}),
			&m,
		)
		if err != nil {
			return err
		}
		if len(m) == 1 {
			cb(m[0])
		}
	}
	return nil
}
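
// In both modes messages arrive as [][]pubSubMessage: a single-element outer
// array wrapping a batch of messages, hence the len(m) == 1 checks above.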

func (b *Broker) handleMessage(eventHandler centrifuge.BrokerEventHandler, msg pubSubMessage) error {
	// Message types: "p" - publication, "j" - join, "l" - leave. Messages of
	// unknown type are silently ignored.
	switch msg.Type {
	case "p":
		pub := &centrifuge.Publication{
			Offset: msg.Offset,
			Data:   msg.Data,
		}
		if len(msg.Info) > 0 {
			var info protocol.ClientInfo
			err := info.UnmarshalVT(msg.Info)
			if err == nil {
				pub.Info = infoFromProto(&info)
			}
		}
		_ = eventHandler.HandlePublication(msg.Channel, pub, centrifuge.StreamPosition{Offset: msg.Offset, Epoch: msg.Epoch})
	case "j":
		var info protocol.ClientInfo
		err := info.UnmarshalVT(msg.Info)
		if err == nil {
			_ = eventHandler.HandleJoin(msg.Channel, infoFromProto(&info))
		}
	case "l":
		var info protocol.ClientInfo
		err := info.UnmarshalVT(msg.Info)
		if err == nil {
			_ = eventHandler.HandleLeave(msg.Channel, infoFromProto(&info))
		}
	}
	return nil
}

func (b *Broker) runControlPubSub(s *Shard, eventHandler centrifuge.BrokerEventHandler) {
	logError := func(errString string) {
		b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "restart control pub/sub", map[string]interface{}{"error": errString}))
	}

	u, err := uuid.NewRandom()
	if err != nil {
		logError(err.Error())
		return
	}
	connID := u.String()

	conn, cancel, err := s.pubSubConn()
	if err != nil {
		logError(err.Error())
		return
	}
	defer cancel()
	defer func() { _ = conn.Close() }()

	// Register this poller in Tarantool under a unique connection ID.
	result, err := conn.Exec(tarantool.Call("centrifuge.get_messages", pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 0}))
	if err != nil {
		logError(err.Error())
		return
	}
	if result.Error != "" {
		logError(result.Error)
		return
	}

	numWorkers := runtime.NumCPU()

	b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, fmt.Sprintf("running Tarantool control PUB/SUB, num workers: %d", numWorkers)))
	defer func() {
		b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "stopping Tarantool control PUB/SUB"))
	}()

	done := make(chan struct{})
	var doneOnce sync.Once
	closeDoneOnce := func() {
		doneOnce.Do(func() {
			close(done)
			_ = conn.Close()
		})
	}
	defer closeDoneOnce()

	// Run workers to spread message processing across goroutines.
	workCh := make(chan pubSubMessage)
	for i := 0; i < numWorkers; i++ {
		go func() {
			for {
				select {
				case <-done:
					return
				case n := <-workCh:
					err := eventHandler.HandleControl(n.Data)
					if err != nil {
						b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error handling control message", map[string]interface{}{"error": err.Error()}))
						continue
					}
				}
			}
		}()
	}

	// Subscribe to both the shared control channel and this node's own channel.
	controlChannel := b.controlChannel()
	result, err = conn.Exec(tarantool.Call("centrifuge.subscribe", subscribeRequest{ConnID: connID, Channels: []string{controlChannel, b.nodeChannel}}))
	if err != nil || result.Error != "" {
		if err != nil {
			logError(err.Error())
		} else {
			logError(result.Error)
		}
		return
	}

	processPubSubMessages := func(messages []pubSubMessage) {
		for _, msg := range messages {
			workCh <- msg
		}
	}

	for {
		err := b.waitPubSubMessages(conn, connID, processPubSubMessages)
		if err != nil {
			logError(err.Error())
			return
		}
	}
}