1package main 2 3import ( 4 "context" 5 "io" 6 "log" 7 "net/http" 8 "os" 9 "os/signal" 10 "syscall" 11 "time" 12 13 _ "net/http/pprof" 14 15 "github.com/centrifugal/centrifuge" 16) 17 18func handleLog(e centrifuge.LogEntry) { 19 log.Printf("%s: %v", e.Message, e.Fields) 20} 21 22func authMiddleware(h http.Handler) http.Handler { 23 return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 24 ctx := r.Context() 25 newCtx := centrifuge.SetCredentials(ctx, ¢rifuge.Credentials{ 26 UserID: "42", 27 ExpireAt: time.Now().Unix() + 10, 28 Info: []byte(`{"name": "Alexander"}`), 29 }) 30 r = r.WithContext(newCtx) 31 h.ServeHTTP(w, r) 32 }) 33} 34 35func waitExitSignal(n *centrifuge.Node) { 36 sigCh := make(chan os.Signal, 1) 37 done := make(chan bool, 1) 38 signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) 39 go func() { 40 <-sigCh 41 _ = n.Shutdown(context.Background()) 42 done <- true 43 }() 44 <-done 45} 46 47func main() { 48 cfg := centrifuge.DefaultConfig 49 50 cfg.LogLevel = centrifuge.LogLevelDebug 51 cfg.LogHandler = handleLog 52 53 node, _ := centrifuge.New(cfg) 54 55 node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) { 56 return centrifuge.ConnectReply{ 57 Data: []byte(`{}`), 58 }, nil 59 }) 60 61 node.OnConnect(func(client *centrifuge.Client) { 62 transport := client.Transport() 63 log.Printf("user %s connected via %s with protocol: %s", client.UserID(), transport.Name(), transport.Protocol()) 64 65 go func() { 66 err := client.Send([]byte("hello")) 67 if err != nil { 68 if err == io.EOF { 69 return 70 } 71 log.Fatalln(err.Error()) 72 } 73 }() 74 75 client.OnRefresh(func(e centrifuge.RefreshEvent, cb centrifuge.RefreshCallback) { 76 log.Printf("user %s connection is going to expire, refreshing", client.UserID()) 77 cb(centrifuge.RefreshReply{ 78 ExpireAt: time.Now().Unix() + 10, 79 }, nil) 80 }) 81 82 client.OnSubscribe(func(e centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) { 83 log.Printf("user %s subscribes on %s", client.UserID(), e.Channel) 84 cb(centrifuge.SubscribeReply{}, nil) 85 }) 86 87 client.OnUnsubscribe(func(e centrifuge.UnsubscribeEvent) { 88 log.Printf("user %s unsubscribed from %s", client.UserID(), e.Channel) 89 }) 90 91 client.OnPublish(func(e centrifuge.PublishEvent, cb centrifuge.PublishCallback) { 92 log.Printf("user %s publishes into channel %s: %s", client.UserID(), e.Channel, string(e.Data)) 93 cb(centrifuge.PublishReply{}, nil) 94 }) 95 96 client.OnRPC(func(e centrifuge.RPCEvent, cb centrifuge.RPCCallback) { 97 log.Printf("RPC from user: %s, data: %s", client.UserID(), string(e.Data)) 98 cb(centrifuge.RPCReply{ 99 Data: []byte(`{"year": "2020"}`), 100 }, nil) 101 }) 102 103 client.OnMessage(func(e centrifuge.MessageEvent) { 104 log.Printf("message from user: %s, data: %s", client.UserID(), string(e.Data)) 105 }) 106 107 client.OnDisconnect(func(e centrifuge.DisconnectEvent) { 108 log.Printf("user %s disconnected, disconnect: %s", client.UserID(), e.Disconnect) 109 }) 110 }) 111 112 if err := node.Run(); err != nil { 113 log.Fatal(err) 114 } 115 116 http.Handle("/connection/websocket", authMiddleware(centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{}))) 117 http.Handle("/", http.FileServer(http.Dir("./"))) 118 119 go func() { 120 if err := http.ListenAndServe(":8000", nil); err != nil { 121 log.Fatal(err) 122 } 123 }() 124 125 waitExitSignal(node) 126 log.Println("bye!") 127} 128