1package tntengine 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "runtime" 8 "strings" 9 "sync" 10 "sync/atomic" 11 "time" 12 13 "github.com/FZambia/tarantool" 14 "github.com/centrifugal/centrifuge" 15 "github.com/centrifugal/protocol" 16 "github.com/google/uuid" 17 "github.com/vmihailenco/msgpack/v5" 18) 19 20const internalChannelPrefix = "__" 21 22const ( 23 // tarantoolControlChannel is a name for control channel. 24 tarantoolControlChannel = internalChannelPrefix + "control" 25 // tarantoolNodeChannelPrefix is a prefix for node channel. 26 tarantoolNodeChannelPrefix = internalChannelPrefix + "node." 27) 28 29// Broker uses Tarantool to implement centrifuge.Broker functionality. 30type Broker struct { 31 controlRound uint64 // Keep atomic on struct top for 32-bit architectures. 32 node *centrifuge.Node 33 sharding bool 34 config BrokerConfig 35 shards []*Shard 36 nodeChannel string 37} 38 39var _ centrifuge.Broker = (*Broker)(nil) 40 41// BrokerConfig is a config for Tarantool Broker. 42type BrokerConfig struct { 43 // HistoryMetaTTL sets a time of stream meta key expiration in Tarantool. Stream 44 // meta key is a Tarantool HASH that contains top offset in channel and epoch value. 45 // By default stream meta keys do not expire. 46 HistoryMetaTTL time.Duration 47 48 // UsePolling allows to turn on polling mode instead of push. 49 UsePolling bool 50 51 // Shards is a list of Tarantool instances to shard data by channel. 52 Shards []*Shard 53} 54 55// NewBroker initializes Tarantool Broker. 56func NewBroker(n *centrifuge.Node, config BrokerConfig) (*Broker, error) { 57 if len(config.Shards) == 0 { 58 return nil, errors.New("no Tarantool shards provided in configuration") 59 } 60 if len(config.Shards) > 1 { 61 n.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, fmt.Sprintf("Tarantool sharding enabled: %d shards", len(config.Shards)))) 62 } 63 e := &Broker{ 64 node: n, 65 shards: config.Shards, 66 config: config, 67 sharding: len(config.Shards) > 1, 68 nodeChannel: nodeChannel(n.ID()), 69 } 70 return e, nil 71} 72 73// Run runs broker after node initialized. 74func (b *Broker) Run(h centrifuge.BrokerEventHandler) error { 75 for _, shard := range b.shards { 76 if err := b.runShard(shard, h); err != nil { 77 return err 78 } 79 } 80 return nil 81} 82 83func (b *Broker) runForever(fn func(), minDelay time.Duration) { 84 for { 85 started := time.Now() 86 fn() 87 elapsed := time.Since(started) 88 if elapsed < minDelay { 89 // Sleep for a while to prevent busy loop when reconnecting. 90 // If elapsed >= minDelay then fn will be restarted right away – this is 91 // intentional for fast reconnect in case of one random error. 92 time.Sleep(minDelay - elapsed) 93 } 94 } 95} 96 97const pubSubRoutineMinDelay = 300 * time.Millisecond 98 99// Run Tarantool shard. 100func (b *Broker) runShard(s *Shard, h centrifuge.BrokerEventHandler) error { 101 go b.runForever(func() { 102 b.runPubSub(s, h) 103 }, pubSubRoutineMinDelay) 104 go b.runForever(func() { 105 b.runControlPubSub(s, h) 106 }, pubSubRoutineMinDelay) 107 return nil 108} 109 110type pubRequest struct { 111 MsgType string 112 Channel string 113 Data string 114 Info string 115 HistoryTTL int 116 HistorySize int 117 HistoryMetaTTL int 118} 119 120type pubResponse struct { 121 Offset uint64 122 Epoch string 123} 124 125func (m *pubResponse) DecodeMsgpack(d *msgpack.Decoder) error { 126 var err error 127 var l int 128 if l, err = d.DecodeArrayLen(); err != nil { 129 return err 130 } 131 if l != 2 { 132 return fmt.Errorf("malformed array len: %d", l) 133 } 134 if m.Offset, err = d.DecodeUint64(); err != nil { 135 return err 136 } 137 if m.Epoch, err = d.DecodeString(); err != nil { 138 return err 139 } 140 return nil 141} 142 143// Publish - see centrifuge.Broker interface description. 144func (b *Broker) Publish(ch string, data []byte, opts centrifuge.PublishOptions) (centrifuge.StreamPosition, error) { 145 s := consistentShard(ch, b.shards) 146 pr := &pubRequest{ 147 MsgType: "p", 148 Channel: ch, 149 Data: string(data), 150 Info: b.clientInfoString(opts.ClientInfo), 151 HistoryTTL: int(opts.HistoryTTL.Seconds()), 152 HistorySize: opts.HistorySize, 153 HistoryMetaTTL: int(b.config.HistoryMetaTTL.Seconds()), 154 } 155 var resp pubResponse 156 err := s.ExecTyped(tarantool.Call("centrifuge.publish", pr), &resp) 157 if err != nil { 158 return centrifuge.StreamPosition{}, err 159 } 160 return centrifuge.StreamPosition{Offset: resp.Offset, Epoch: resp.Epoch}, err 161} 162 163// PublishJoin - see centrifuge.Broker interface description. 164func (b *Broker) PublishJoin(ch string, info *centrifuge.ClientInfo) error { 165 s := consistentShard(ch, b.shards) 166 pr := pubRequest{ 167 MsgType: "j", 168 Channel: ch, 169 Info: b.clientInfoString(info), 170 } 171 _, err := s.Exec(tarantool.Call("centrifuge.publish", pr)) 172 return err 173} 174 175// PublishLeave - see centrifuge.Broker interface description. 176func (b *Broker) PublishLeave(ch string, info *centrifuge.ClientInfo) error { 177 s := consistentShard(ch, b.shards) 178 pr := pubRequest{ 179 MsgType: "l", 180 Channel: ch, 181 Info: b.clientInfoString(info), 182 } 183 _, err := s.Exec(tarantool.Call("centrifuge.publish", pr)) 184 return err 185} 186 187func (b *Broker) clientInfoString(clientInfo *centrifuge.ClientInfo) string { 188 var info string 189 if clientInfo != nil { 190 byteMessage, err := infoToProto(clientInfo).MarshalVT() 191 if err != nil { 192 return info 193 } 194 info = string(byteMessage) 195 } 196 return info 197} 198 199// PublishControl - see centrifuge.Broker interface description. 200func (b *Broker) PublishControl(data []byte, nodeID, _ string) error { 201 currentRound := atomic.AddUint64(&b.controlRound, 1) 202 index := currentRound % uint64(len(b.shards)) 203 var channel string 204 if nodeID != "" { 205 channel = nodeChannel(nodeID) 206 } else { 207 channel = b.controlChannel() 208 } 209 pr := pubRequest{ 210 MsgType: "c", 211 Channel: channel, 212 Data: string(data), 213 } 214 _, err := b.shards[index].Exec(tarantool.Call("centrifuge.publish", pr)) 215 return err 216} 217 218func (b *Broker) controlChannel() string { 219 return tarantoolControlChannel 220} 221 222func nodeChannel(nodeID string) string { 223 return tarantoolNodeChannelPrefix + nodeID 224} 225 226// Subscribe - see centrifuge.Broker interface description. 227func (b *Broker) Subscribe(ch string) error { 228 if strings.HasPrefix(ch, internalChannelPrefix) { 229 return centrifuge.ErrorBadRequest 230 } 231 if b.node.LogEnabled(centrifuge.LogLevelDebug) { 232 b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "subscribe node on channel", map[string]interface{}{"channel": ch})) 233 } 234 r := newSubRequest([]string{ch}, true) 235 s := b.shards[consistentIndex(ch, len(b.shards))] 236 return b.sendSubscribe(s, r) 237} 238 239// Unsubscribe - see centrifuge.Broker interface description. 240func (b *Broker) Unsubscribe(ch string) error { 241 if b.node.LogEnabled(centrifuge.LogLevelDebug) { 242 b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "unsubscribe node from channel", map[string]interface{}{"channel": ch})) 243 } 244 r := newSubRequest([]string{ch}, false) 245 s := b.shards[consistentIndex(ch, len(b.shards))] 246 return b.sendSubscribe(s, r) 247} 248 249var errOpTimeout = errors.New("operation timed out") 250 251func (b *Broker) sendSubscribe(shard *Shard, r subRequest) error { 252 select { 253 case shard.subCh <- r: 254 default: 255 timer := AcquireTimer(defaultRequestTimeout) 256 defer ReleaseTimer(timer) 257 select { 258 case shard.subCh <- r: 259 case <-timer.C: 260 return errOpTimeout 261 } 262 } 263 return r.result() 264} 265 266type historyRequest struct { 267 Channel string 268 Offset uint64 269 Limit int 270 Reverse bool 271 IncludePubs bool 272 HistoryMetaTTL int 273} 274 275type historyResponse struct { 276 Offset uint64 277 Epoch string 278 Pubs []*centrifuge.Publication 279} 280 281func (m *historyResponse) DecodeMsgpack(d *msgpack.Decoder) error { 282 var err error 283 var l int 284 if l, err = d.DecodeArrayLen(); err != nil { 285 return err 286 } 287 if l != 3 { 288 return fmt.Errorf("malformed array len: %d", l) 289 } 290 if m.Offset, err = d.DecodeUint64(); err != nil { 291 return err 292 } 293 if m.Epoch, err = d.DecodeString(); err != nil { 294 return err 295 } 296 if l, err = d.DecodeArrayLen(); err != nil { 297 return err 298 } 299 if l == -1 { 300 return nil 301 } 302 303 pubs := make([]*centrifuge.Publication, 0, l) 304 305 for i := 0; i < l; i++ { 306 var pub centrifuge.Publication 307 var l int 308 if l, err = d.DecodeArrayLen(); err != nil { 309 return err 310 } 311 if l != 6 { 312 return fmt.Errorf("malformed array len: %d", l) 313 } 314 if _, err = d.DecodeUint64(); err != nil { 315 return err 316 } 317 if _, err = d.DecodeString(); err != nil { 318 return err 319 } 320 if pub.Offset, err = d.DecodeUint64(); err != nil { 321 return err 322 } 323 if _, err = d.DecodeFloat64(); err != nil { 324 return err 325 } 326 if data, err := d.DecodeString(); err != nil { 327 return err 328 } else { 329 if len(data) > 0 { 330 pub.Data = []byte(data) 331 } 332 } 333 if info, err := d.DecodeString(); err != nil { 334 return err 335 } else { 336 if len(info) > 0 { 337 var i protocol.ClientInfo 338 if err = i.UnmarshalVT([]byte(info)); err != nil { 339 return err 340 } 341 pub.Info = infoFromProto(&i) 342 } 343 } 344 pubs = append(pubs, &pub) 345 } 346 m.Pubs = pubs 347 return nil 348} 349 350// History - see centrifuge.Broker interface description. 351func (b *Broker) History(ch string, filter centrifuge.HistoryFilter) ([]*centrifuge.Publication, centrifuge.StreamPosition, error) { 352 var includePubs = true 353 var offset uint64 354 if filter.Since != nil { 355 if filter.Reverse { 356 offset = filter.Since.Offset - 1 357 } else { 358 offset = filter.Since.Offset + 1 359 } 360 } 361 var limit int 362 if filter.Limit == 0 { 363 includePubs = false 364 } 365 if filter.Limit > 0 { 366 limit = filter.Limit 367 } 368 historyMetaTTLSeconds := int(b.config.HistoryMetaTTL.Seconds()) 369 s := consistentShard(ch, b.shards) 370 req := historyRequest{ 371 Channel: ch, 372 Offset: offset, 373 Limit: limit, 374 Reverse: filter.Reverse, 375 IncludePubs: includePubs, 376 HistoryMetaTTL: historyMetaTTLSeconds, 377 } 378 var resp historyResponse 379 err := s.ExecTyped(tarantool.Call("centrifuge.history", req), &resp) 380 if err != nil { 381 return nil, centrifuge.StreamPosition{}, err 382 } 383 streamPosition := centrifuge.StreamPosition{Offset: resp.Offset, Epoch: resp.Epoch} 384 return resp.Pubs, streamPosition, nil 385} 386 387type removeHistoryRequest struct { 388 Channel string 389} 390 391// RemoveHistory - see centrifuge.Broker interface description. 392func (b *Broker) RemoveHistory(ch string) error { 393 s := consistentShard(ch, b.shards) 394 _, err := s.Exec(tarantool.Call("centrifuge.remove_history", removeHistoryRequest{Channel: ch})) 395 return err 396} 397 398const ( 399 // tarantoolPubSubWorkerChannelSize sets buffer size of channel to which we send all 400 // messages received from Tarantool PUB/SUB connection to process in separate goroutine. 401 tarantoolPubSubWorkerChannelSize = 512 402 // tarantoolSubscribeBatchLimit is a maximum number of channels to include in a single 403 // batch subscribe call. 404 tarantoolSubscribeBatchLimit = 512 405) 406 407func (b *Broker) getShard(channel string) *Shard { 408 if !b.sharding { 409 return b.shards[0] 410 } 411 return b.shards[consistentIndex(channel, len(b.shards))] 412} 413 414type pollRequest struct { 415 ConnID string 416 UsePolling bool 417 Timeout int 418} 419 420type subscribeRequest struct { 421 ConnID string 422 Channels []string 423} 424 425type pubSubMessage struct { 426 Type string 427 Channel string 428 Offset uint64 429 Epoch string 430 Data []byte 431 Info []byte 432} 433 434func (m *pubSubMessage) DecodeMsgpack(d *msgpack.Decoder) error { 435 var err error 436 var l int 437 if l, err = d.DecodeArrayLen(); err != nil { 438 return err 439 } 440 if l != 6 { 441 return fmt.Errorf("wrong array len: %d", l) 442 } 443 if m.Type, err = d.DecodeString(); err != nil { 444 return err 445 } 446 if m.Channel, err = d.DecodeString(); err != nil { 447 return err 448 } 449 if m.Offset, err = d.DecodeUint64(); err != nil { 450 return err 451 } 452 if m.Epoch, err = d.DecodeString(); err != nil { 453 return err 454 } 455 if data, err := d.DecodeString(); err != nil { 456 return err 457 } else { 458 m.Data = []byte(data) 459 } 460 if info, err := d.DecodeString(); err != nil { 461 return err 462 } else { 463 m.Info = []byte(info) 464 } 465 return nil 466} 467 468func (b *Broker) runPubSub(s *Shard, eventHandler centrifuge.BrokerEventHandler) { 469 logError := func(errString string) { 470 b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "restart pub/sub", map[string]interface{}{"error": errString})) 471 } 472 473 u, err := uuid.NewRandom() 474 if err != nil { 475 logError(err.Error()) 476 return 477 } 478 connID := u.String() 479 480 conn, cancel, err := s.pubSubConn() 481 if err != nil { 482 logError(err.Error()) 483 return 484 } 485 defer cancel() 486 defer func() { _ = conn.Close() }() 487 488 // Register poller with unique ID. 489 result, err := conn.Exec(tarantool.Call("centrifuge.get_messages", pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 0})) 490 if err != nil { 491 logError(err.Error()) 492 return 493 } 494 if result.Error != "" { 495 logError(result.Error) 496 return 497 } 498 499 numWorkers := runtime.NumCPU() 500 501 b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, fmt.Sprintf("running Tarantool PUB/SUB, num workers: %d", numWorkers))) 502 defer func() { 503 b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "stopping Tarantool PUB/SUB")) 504 }() 505 506 done := make(chan struct{}) 507 var doneOnce sync.Once 508 closeDoneOnce := func() { 509 doneOnce.Do(func() { 510 close(done) 511 _ = conn.Close() 512 }) 513 } 514 defer closeDoneOnce() 515 516 // Run subscriber goroutine. 517 go func(conn *tarantool.Connection) { 518 for { 519 select { 520 case <-done: 521 return 522 case r := <-s.subCh: 523 isSubscribe := r.subscribe 524 channelBatch := []subRequest{r} 525 526 chIDs := r.channels 527 528 var otherR *subRequest 529 530 loop: 531 for len(chIDs) < tarantoolSubscribeBatchLimit { 532 select { 533 case r := <-s.subCh: 534 if r.subscribe != isSubscribe { 535 // We can not mix subscribe and unsubscribe request into one batch 536 // so must stop here. As we consumed a subRequest value from channel 537 // we should take care of it later. 538 otherR = &r 539 break loop 540 } 541 channelBatch = append(channelBatch, r) 542 for _, ch := range r.channels { 543 chIDs = append(chIDs, ch) 544 } 545 default: 546 break loop 547 } 548 } 549 550 var opErr error 551 if isSubscribe { 552 _, err = conn.Exec(tarantool.Call("centrifuge.subscribe", subscribeRequest{ConnID: connID, Channels: chIDs})) 553 opErr = err 554 } else { 555 _, err = conn.Exec(tarantool.Call("centrifuge.unsubscribe", subscribeRequest{ConnID: connID, Channels: chIDs})) 556 opErr = err 557 } 558 559 if opErr != nil { 560 for _, r := range channelBatch { 561 r.done(opErr) 562 } 563 if otherR != nil { 564 otherR.done(opErr) 565 } 566 // Close conn, this should cause Receive to return with err below 567 // and whole runPubSub method to restart. 568 closeDoneOnce() 569 return 570 } 571 for _, r := range channelBatch { 572 r.done(nil) 573 } 574 if otherR != nil { 575 chIDs := otherR.channels 576 var opErr error 577 if otherR.subscribe { 578 _, err = conn.Exec(tarantool.Call("centrifuge.subscribe", subscribeRequest{ConnID: connID, Channels: chIDs})) 579 opErr = err 580 } else { 581 _, err = conn.Exec(tarantool.Call("centrifuge.unsubscribe", subscribeRequest{ConnID: connID, Channels: chIDs})) 582 opErr = err 583 } 584 if opErr != nil { 585 otherR.done(opErr) 586 // Close conn, this should cause Receive to return with err below 587 // and whole runPubSub method to restart. 588 closeDoneOnce() 589 return 590 } 591 otherR.done(nil) 592 } 593 } 594 } 595 }(conn) 596 597 // Run workers to spread received message processing work over worker goroutines. 598 workers := make(map[int]chan pubSubMessage) 599 for i := 0; i < numWorkers; i++ { 600 workerCh := make(chan pubSubMessage, tarantoolPubSubWorkerChannelSize) 601 workers[i] = workerCh 602 go func(ch chan pubSubMessage) { 603 for { 604 select { 605 case <-done: 606 return 607 case n := <-ch: 608 err := b.handleMessage(eventHandler, n) 609 if err != nil { 610 b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error handling client message", map[string]interface{}{"error": err.Error()})) 611 continue 612 } 613 } 614 } 615 }(workerCh) 616 } 617 618 go func() { 619 var chIDs []string 620 621 channels := b.node.Hub().Channels() 622 for i := 0; i < len(channels); i++ { 623 if b.getShard(channels[i]) == s { 624 chIDs = append(chIDs, channels[i]) 625 } 626 } 627 628 batch := make([]string, 0) 629 630 for i, ch := range chIDs { 631 if len(batch) > 0 && i%tarantoolSubscribeBatchLimit == 0 { 632 r := newSubRequest(batch, true) 633 err := b.sendSubscribe(s, r) 634 if err != nil { 635 b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error subscribing", map[string]interface{}{"error": err.Error()})) 636 closeDoneOnce() 637 return 638 } 639 batch = nil 640 } 641 batch = append(batch, ch) 642 } 643 if len(batch) > 0 { 644 r := newSubRequest(batch, true) 645 err := b.sendSubscribe(s, r) 646 if err != nil { 647 b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error subscribing", map[string]interface{}{"error": err.Error()})) 648 closeDoneOnce() 649 return 650 } 651 } 652 }() 653 654 processPubSubMessages := func(messages []pubSubMessage) { 655 for _, msg := range messages { 656 // Add message to worker channel preserving message order - i.e. messages 657 // from the same channel will be processed in the same worker. 658 workers[index(msg.Channel, numWorkers)] <- msg 659 } 660 } 661 662 for { 663 err := b.waitPubSubMessages(conn, connID, processPubSubMessages) 664 if err != nil { 665 logError(err.Error()) 666 return 667 } 668 } 669} 670 671func (b *Broker) waitPubSubMessages(conn *tarantool.Connection, connID string, cb func([]pubSubMessage)) error { 672 if !b.config.UsePolling { 673 ctx, cancel := context.WithTimeout(context.Background(), time.Minute) 674 defer cancel() 675 _, err := conn.ExecContext(ctx, tarantool.Call( 676 "centrifuge.get_messages", 677 pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 25}, 678 ).WithPushTyped(func(decode func(interface{}) error) { 679 var m [][]pubSubMessage 680 if err := decode(&m); err != nil { 681 b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding push", map[string]interface{}{"error": err.Error()})) 682 return 683 } 684 if len(m) == 1 { 685 cb(m[0]) 686 } 687 })) 688 if err != nil { 689 return err 690 } 691 } else { 692 var m [][]pubSubMessage 693 err := conn.ExecTyped(tarantool.Call( 694 "centrifuge.get_messages", 695 pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 25}), 696 &m, 697 ) 698 if err != nil { 699 return err 700 } 701 if len(m) == 1 { 702 cb(m[0]) 703 } 704 } 705 return nil 706} 707 708func (b *Broker) handleMessage(eventHandler centrifuge.BrokerEventHandler, msg pubSubMessage) error { 709 switch msg.Type { 710 case "p": 711 pub := ¢rifuge.Publication{ 712 Offset: msg.Offset, 713 Data: msg.Data, 714 } 715 if len(msg.Info) > 0 { 716 var info protocol.ClientInfo 717 err := info.UnmarshalVT(msg.Info) 718 if err == nil { 719 pub.Info = infoFromProto(&info) 720 } 721 } 722 _ = eventHandler.HandlePublication(msg.Channel, pub, centrifuge.StreamPosition{Offset: msg.Offset, Epoch: msg.Epoch}) 723 case "j": 724 var info protocol.ClientInfo 725 err := info.UnmarshalVT(msg.Info) 726 if err == nil { 727 _ = eventHandler.HandleJoin(msg.Channel, infoFromProto(&info)) 728 } 729 case "l": 730 var info protocol.ClientInfo 731 err := info.UnmarshalVT(msg.Info) 732 if err == nil { 733 _ = eventHandler.HandleLeave(msg.Channel, infoFromProto(&info)) 734 } 735 } 736 return nil 737} 738 739func (b *Broker) runControlPubSub(s *Shard, eventHandler centrifuge.BrokerEventHandler) { 740 logError := func(errString string) { 741 b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "restart control pub/sub", map[string]interface{}{"error": errString})) 742 } 743 744 u, err := uuid.NewRandom() 745 if err != nil { 746 logError(err.Error()) 747 return 748 } 749 connID := u.String() 750 751 conn, cancel, err := s.pubSubConn() 752 if err != nil { 753 logError(err.Error()) 754 return 755 } 756 defer cancel() 757 defer func() { _ = conn.Close() }() 758 759 // Register poller with unique ID. 760 result, err := conn.Exec(tarantool.Call("centrifuge.get_messages", pollRequest{ConnID: connID, UsePolling: b.config.UsePolling, Timeout: 0})) 761 if err != nil { 762 logError(err.Error()) 763 return 764 } 765 if result.Error != "" { 766 logError(result.Error) 767 return 768 } 769 770 numWorkers := runtime.NumCPU() 771 772 b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, fmt.Sprintf("running Tarantool control PUB/SUB, num workers: %d", numWorkers))) 773 defer func() { 774 b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelDebug, "stopping Tarantool control PUB/SUB")) 775 }() 776 777 done := make(chan struct{}) 778 var doneOnce sync.Once 779 closeDoneOnce := func() { 780 doneOnce.Do(func() { 781 close(done) 782 _ = conn.Close() 783 }) 784 } 785 defer closeDoneOnce() 786 787 // Run workers to spread message processing work over worker goroutines. 788 workCh := make(chan pubSubMessage) 789 for i := 0; i < numWorkers; i++ { 790 go func() { 791 for { 792 select { 793 case <-done: 794 return 795 case n := <-workCh: 796 err := eventHandler.HandleControl(n.Data) 797 if err != nil { 798 b.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error handling control message", map[string]interface{}{"error": err.Error()})) 799 continue 800 } 801 } 802 } 803 }() 804 } 805 806 controlChannel := b.controlChannel() 807 result, err = conn.Exec(tarantool.Call("centrifuge.subscribe", subscribeRequest{ConnID: connID, Channels: []string{controlChannel, b.nodeChannel}})) 808 if err != nil || result.Error != "" { 809 if err != nil { 810 logError(err.Error()) 811 } else { 812 logError(result.Error) 813 } 814 return 815 } 816 817 processPubSubMessages := func(messages []pubSubMessage) { 818 for _, msg := range messages { 819 workCh <- msg 820 } 821 } 822 823 for { 824 err := b.waitPubSubMessages(conn, connID, processPubSubMessages) 825 if err != nil { 826 logError(err.Error()) 827 return 828 } 829 } 830} 831