1package main 2 3import ( 4 "context" 5 "flag" 6 "log" 7 "net/http" 8 "os" 9 "os/signal" 10 "strconv" 11 "strings" 12 "syscall" 13 "time" 14 15 _ "net/http/pprof" 16 17 "github.com/centrifugal/centrifuge" 18 "github.com/centrifugal/centrifuge/_examples/custom_engine_tarantool/tntengine" 19) 20 21var ( 22 port = flag.Int("port", 8000, "Port to bind app to") 23 sharded = flag.Bool("sharded", false, "Start sharded example") 24 ha = flag.Bool("ha", false, "Start high availability example") 25 raft = flag.Bool("raft", false, "Using Raft-based replication") 26 user = flag.String("user", "guest", "Connection user") 27 password = flag.String("password", "", "Connection password") 28 addresses = flag.String("addresses", "", "Configure Tarantool addresses (by default we use hardcoded here)") 29) 30 31func handleLog(e centrifuge.LogEntry) { 32 log.Printf("[centrifuge] %s: %v", e.Message, e.Fields) 33} 34 35func authMiddleware(h http.Handler) http.Handler { 36 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 37 ctx := r.Context() 38 ctx = centrifuge.SetCredentials(ctx, ¢rifuge.Credentials{ 39 UserID: "42", 40 Info: []byte(`{"name": "Alexander"}`), 41 }) 42 r = r.WithContext(ctx) 43 h.ServeHTTP(w, r) 44 }) 45} 46 47func waitExitSignal(n *centrifuge.Node) { 48 sigCh := make(chan os.Signal, 1) 49 done := make(chan bool, 1) 50 signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) 51 go func() { 52 <-sigCh 53 _ = n.Shutdown(context.Background()) 54 done <- true 55 }() 56 <-done 57} 58 59func main() { 60 flag.Parse() 61 62 cfg := centrifuge.DefaultConfig 63 cfg.LogLevel = centrifuge.LogLevelDebug 64 cfg.LogHandler = handleLog 65 66 node, _ := centrifuge.New(cfg) 67 68 node.OnConnect(func(client *centrifuge.Client) { 69 transport := client.Transport() 70 log.Printf("user %s connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol()) 71 72 client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) { 73 log.Printf("user %s subscribes on %s", client.UserID(), e.Channel) 74 cb(centrifuge.SubscribeReply{ 75 Options: centrifuge.SubscribeOptions{ 76 Presence: true, 77 JoinLeave: true, 78 Recover: true, 79 }, 80 }, nil) 81 }) 82 83 client.OnUnsubscribe(func(e centrifuge.UnsubscribeEvent) { 84 log.Printf("user %s unsubscribed from %s", client.UserID(), e.Channel) 85 }) 86 87 client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) { 88 log.Printf("user %s publishes into channel %s: %s", client.UserID(), e.Channel, string(e.Data)) 89 cb(centrifuge.PublishReply{ 90 Options: centrifuge.PublishOptions{ 91 HistorySize: 10, 92 HistoryTTL: 10 * time.Minute, 93 }, 94 }, nil) 95 }) 96 97 client.OnPresence(func(e centrifuge.PresenceEvent, cb centrifuge.PresenceCallback) { 98 log.Printf("user %s calls presence on %s", client.UserID(), e.Channel) 99 if !client.IsSubscribed(e.Channel) { 100 cb(centrifuge.PresenceReply{}, centrifuge.ErrorPermissionDenied) 101 return 102 } 103 cb(centrifuge.PresenceReply{}, nil) 104 }) 105 106 client.OnPresenceStats(func(e centrifuge.PresenceStatsEvent, cb centrifuge.PresenceStatsCallback) { 107 log.Printf("user %s calls presence stats on %s", client.UserID(), e.Channel) 108 if !client.IsSubscribed(e.Channel) { 109 cb(centrifuge.PresenceStatsReply{}, centrifuge.ErrorPermissionDenied) 110 return 111 } 112 cb(centrifuge.PresenceStatsReply{}, nil) 113 }) 114 115 client.OnDisconnect(func(e centrifuge.DisconnectEvent) { 116 log.Printf("user %s disconnected, disconnect: %s", client.UserID(), e.Disconnect) 117 }) 118 }) 119 120 // Single Tarantool. 121 mode := tntengine.ConnectionModeSingleInstance 122 shardAddresses := [][]string{ 123 {"127.0.0.1:3301"}, 124 } 125 126 if *ha { 127 if *raft { 128 // Single Tarantool RS with automatic leader election with Raft (Tarantool >= 2.7.0). 129 shardAddresses = [][]string{ 130 {"127.0.0.1:3301", "127.0.0.1:3302", "127.0.0.1:3303"}, 131 } 132 mode = tntengine.ConnectionModeLeaderFollowerRaft 133 } else { 134 // Single Tarantool RS with automatic leader election (ex. in Cartridge). 135 shardAddresses = [][]string{ 136 {"127.0.0.1:3301", "127.0.0.1:3302"}, 137 } 138 mode = tntengine.ConnectionModeLeaderFollower 139 } 140 } else if *sharded { 141 // Client-side sharding between two Tarantool instances (without HA). 142 shardAddresses = [][]string{ 143 {"127.0.0.1:3301"}, 144 {"127.0.0.1:3302"}, 145 } 146 } 147 148 if *addresses != "" { 149 var customShardAddresses [][]string 150 shardParts := strings.Split(*addresses, " ") 151 for _, shardPart := range shardParts { 152 customShardAddresses = append(customShardAddresses, strings.Split(shardPart, ",")) 153 } 154 shardAddresses = customShardAddresses 155 } 156 157 var shards []*tntengine.Shard 158 for _, addresses := range shardAddresses { 159 shard, err := tntengine.NewShard(tntengine.ShardConfig{ 160 Addresses: addresses, 161 User: *user, 162 Password: *password, 163 ConnectionMode: mode, 164 }) 165 if err != nil { 166 log.Fatal(err) 167 } 168 shards = append(shards, shard) 169 } 170 171 broker, err := tntengine.NewBroker(node, tntengine.BrokerConfig{ 172 UsePolling: false, 173 Shards: shards, 174 }) 175 if err != nil { 176 log.Fatal(err) 177 } 178 node.SetBroker(broker) 179 180 presenceManager, err := tntengine.NewPresenceManager(node, tntengine.PresenceManagerConfig{ 181 Shards: shards, 182 }) 183 if err != nil { 184 log.Fatal(err) 185 } 186 node.SetPresenceManager(presenceManager) 187 188 if err := node.Run(); err != nil { 189 log.Fatal(err) 190 } 191 192 http.Handle("/connection/websocket", authMiddleware(centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{}))) 193 http.Handle("/", http.FileServer(http.Dir("./"))) 194 195 go func() { 196 if err := http.ListenAndServe(":"+strconv.Itoa(*port), nil); err != nil { 197 log.Fatal(err) 198 } 199 }() 200 201 waitExitSignal(node) 202 log.Println("bye!") 203} 204