1package main
2
3import (
4	"context"
5	"flag"
6	"log"
7	"net/http"
8	"os"
9	"os/signal"
10	"strconv"
11	"strings"
12	"syscall"
13	"time"
14
15	_ "net/http/pprof"
16
17	"github.com/centrifugal/centrifuge"
18	"github.com/centrifugal/centrifuge/_examples/custom_engine_tarantool/tntengine"
19)
20
21var (
22	port      = flag.Int("port", 8000, "Port to bind app to")
23	sharded   = flag.Bool("sharded", false, "Start sharded example")
24	ha        = flag.Bool("ha", false, "Start high availability example")
25	raft      = flag.Bool("raft", false, "Using Raft-based replication")
26	user      = flag.String("user", "guest", "Connection user")
27	password  = flag.String("password", "", "Connection password")
28	addresses = flag.String("addresses", "", "Configure Tarantool addresses (by default we use hardcoded here)")
29)
30
31func handleLog(e centrifuge.LogEntry) {
32	log.Printf("[centrifuge] %s: %v", e.Message, e.Fields)
33}
34
35func authMiddleware(h http.Handler) http.Handler {
36	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
37		ctx := r.Context()
38		ctx = centrifuge.SetCredentials(ctx, &centrifuge.Credentials{
39			UserID: "42",
40			Info:   []byte(`{"name": "Alexander"}`),
41		})
42		r = r.WithContext(ctx)
43		h.ServeHTTP(w, r)
44	})
45}
46
47func waitExitSignal(n *centrifuge.Node) {
48	sigCh := make(chan os.Signal, 1)
49	done := make(chan bool, 1)
50	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
51	go func() {
52		<-sigCh
53		_ = n.Shutdown(context.Background())
54		done <- true
55	}()
56	<-done
57}
58
59func main() {
60	flag.Parse()
61
62	cfg := centrifuge.DefaultConfig
63	cfg.LogLevel = centrifuge.LogLevelDebug
64	cfg.LogHandler = handleLog
65
66	node, _ := centrifuge.New(cfg)
67
68	node.OnConnect(func(client *centrifuge.Client) {
69		transport := client.Transport()
70		log.Printf("user %s connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol())
71
72		client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
73			log.Printf("user %s subscribes on %s", client.UserID(), e.Channel)
74			cb(centrifuge.SubscribeReply{
75				Options: centrifuge.SubscribeOptions{
76					Presence:  true,
77					JoinLeave: true,
78					Recover:   true,
79				},
80			}, nil)
81		})
82
83		client.OnUnsubscribe(func(e centrifuge.UnsubscribeEvent) {
84			log.Printf("user %s unsubscribed from %s", client.UserID(), e.Channel)
85		})
86
87		client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) {
88			log.Printf("user %s publishes into channel %s: %s", client.UserID(), e.Channel, string(e.Data))
89			cb(centrifuge.PublishReply{
90				Options: centrifuge.PublishOptions{
91					HistorySize: 10,
92					HistoryTTL:  10 * time.Minute,
93				},
94			}, nil)
95		})
96
97		client.OnPresence(func(e centrifuge.PresenceEvent, cb centrifuge.PresenceCallback) {
98			log.Printf("user %s calls presence on %s", client.UserID(), e.Channel)
99			if !client.IsSubscribed(e.Channel) {
100				cb(centrifuge.PresenceReply{}, centrifuge.ErrorPermissionDenied)
101				return
102			}
103			cb(centrifuge.PresenceReply{}, nil)
104		})
105
106		client.OnPresenceStats(func(e centrifuge.PresenceStatsEvent, cb centrifuge.PresenceStatsCallback) {
107			log.Printf("user %s calls presence stats on %s", client.UserID(), e.Channel)
108			if !client.IsSubscribed(e.Channel) {
109				cb(centrifuge.PresenceStatsReply{}, centrifuge.ErrorPermissionDenied)
110				return
111			}
112			cb(centrifuge.PresenceStatsReply{}, nil)
113		})
114
115		client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
116			log.Printf("user %s disconnected, disconnect: %s", client.UserID(), e.Disconnect)
117		})
118	})
119
120	// Single Tarantool.
121	mode := tntengine.ConnectionModeSingleInstance
122	shardAddresses := [][]string{
123		{"127.0.0.1:3301"},
124	}
125
126	if *ha {
127		if *raft {
128			// Single Tarantool RS with automatic leader election with Raft (Tarantool >= 2.7.0).
129			shardAddresses = [][]string{
130				{"127.0.0.1:3301", "127.0.0.1:3302", "127.0.0.1:3303"},
131			}
132			mode = tntengine.ConnectionModeLeaderFollowerRaft
133		} else {
134			// Single Tarantool RS with automatic leader election (ex. in Cartridge).
135			shardAddresses = [][]string{
136				{"127.0.0.1:3301", "127.0.0.1:3302"},
137			}
138			mode = tntengine.ConnectionModeLeaderFollower
139		}
140	} else if *sharded {
141		// Client-side sharding between two Tarantool instances (without HA).
142		shardAddresses = [][]string{
143			{"127.0.0.1:3301"},
144			{"127.0.0.1:3302"},
145		}
146	}
147
148	if *addresses != "" {
149		var customShardAddresses [][]string
150		shardParts := strings.Split(*addresses, " ")
151		for _, shardPart := range shardParts {
152			customShardAddresses = append(customShardAddresses, strings.Split(shardPart, ","))
153		}
154		shardAddresses = customShardAddresses
155	}
156
157	var shards []*tntengine.Shard
158	for _, addresses := range shardAddresses {
159		shard, err := tntengine.NewShard(tntengine.ShardConfig{
160			Addresses:      addresses,
161			User:           *user,
162			Password:       *password,
163			ConnectionMode: mode,
164		})
165		if err != nil {
166			log.Fatal(err)
167		}
168		shards = append(shards, shard)
169	}
170
171	broker, err := tntengine.NewBroker(node, tntengine.BrokerConfig{
172		UsePolling: false,
173		Shards:     shards,
174	})
175	if err != nil {
176		log.Fatal(err)
177	}
178	node.SetBroker(broker)
179
180	presenceManager, err := tntengine.NewPresenceManager(node, tntengine.PresenceManagerConfig{
181		Shards: shards,
182	})
183	if err != nil {
184		log.Fatal(err)
185	}
186	node.SetPresenceManager(presenceManager)
187
188	if err := node.Run(); err != nil {
189		log.Fatal(err)
190	}
191
192	http.Handle("/connection/websocket", authMiddleware(centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{})))
193	http.Handle("/", http.FileServer(http.Dir("./")))
194
195	go func() {
196		if err := http.ListenAndServe(":"+strconv.Itoa(*port), nil); err != nil {
197			log.Fatal(err)
198		}
199	}()
200
201	waitExitSignal(node)
202	log.Println("bye!")
203}
204