1package main
2
3import (
4	"context"
5	"io"
6	"log"
7	"net/http"
8	"os"
9	"os/signal"
10	"syscall"
11	"time"
12
13	_ "net/http/pprof"
14
15	"github.com/centrifugal/centrifuge"
16)
17
18func handleLog(e centrifuge.LogEntry) {
19	log.Printf("%s: %v", e.Message, e.Fields)
20}
21
22func authMiddleware(h http.Handler) http.Handler {
23	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
24		ctx := r.Context()
25		newCtx := centrifuge.SetCredentials(ctx, &centrifuge.Credentials{
26			UserID:   "42",
27			ExpireAt: time.Now().Unix() + 10,
28			Info:     []byte(`{"name": "Alexander"}`),
29		})
30		r = r.WithContext(newCtx)
31		h.ServeHTTP(w, r)
32	})
33}
34
35func waitExitSignal(n *centrifuge.Node) {
36	sigCh := make(chan os.Signal, 1)
37	done := make(chan bool, 1)
38	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
39	go func() {
40		<-sigCh
41		_ = n.Shutdown(context.Background())
42		done <- true
43	}()
44	<-done
45}
46
47func main() {
48	cfg := centrifuge.DefaultConfig
49
50	cfg.LogLevel = centrifuge.LogLevelDebug
51	cfg.LogHandler = handleLog
52
53	node, _ := centrifuge.New(cfg)
54
55	node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) {
56		return centrifuge.ConnectReply{
57			Data: []byte(`{}`),
58		}, nil
59	})
60
61	node.OnConnect(func(client *centrifuge.Client) {
62		transport := client.Transport()
63		log.Printf("user %s connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol())
64
65		go func() {
66			err := client.Send([]byte("hello"))
67			if err != nil {
68				if err == io.EOF {
69					return
70				}
71				log.Fatalln(err.Error())
72			}
73		}()
74
75		client.OnRefresh(func(e centrifuge.RefreshEvent, cb centrifuge.RefreshCallback) {
76			log.Printf("user %s connection is going to expire, refreshing", client.UserID())
77			cb(centrifuge.RefreshReply{
78				ExpireAt: time.Now().Unix() + 10,
79			}, nil)
80		})
81
82		client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
83			log.Printf("user %s subscribes on %s", client.UserID(), e.Channel)
84			cb(centrifuge.SubscribeReply{}, nil)
85		})
86
87		client.OnUnsubscribe(func(e centrifuge.UnsubscribeEvent) {
88			log.Printf("user %s unsubscribed from %s", client.UserID(), e.Channel)
89		})
90
91		client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) {
92			log.Printf("user %s publishes into channel %s: %s", client.UserID(), e.Channel, string(e.Data))
93			cb(centrifuge.PublishReply{}, nil)
94		})
95
96		client.OnRPC(func(e centrifuge.RPCEvent, cb centrifuge.RPCCallback) {
97			log.Printf("RPC from user: %s, data: %s", client.UserID(), string(e.Data))
98			cb(centrifuge.RPCReply{
99				Data: []byte(`{"year": "2020"}`),
100			}, nil)
101		})
102
103		client.OnMessage(func(e centrifuge.MessageEvent) {
104			log.Printf("message from user: %s, data: %s", client.UserID(), string(e.Data))
105		})
106
107		client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
108			log.Printf("user %s disconnected, disconnect: %s", client.UserID(), e.Disconnect)
109		})
110	})
111
112	if err := node.Run(); err != nil {
113		log.Fatal(err)
114	}
115
116	http.Handle("/connection/websocket", authMiddleware(centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{})))
117	http.Handle("/", http.FileServer(http.Dir("./")))
118
119	go func() {
120		if err := http.ListenAndServe(":8000", nil); err != nil {
121			log.Fatal(err)
122		}
123	}()
124
125	waitExitSignal(node)
126	log.Println("bye!")
127}
128