1// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved. 2// See LICENSE.txt for license information. 3 4package remotecluster 5 6import ( 7 "encoding/json" 8 "fmt" 9 "net/http" 10 "net/http/httptest" 11 "sync" 12 "sync/atomic" 13 "testing" 14 15 "github.com/stretchr/testify/assert" 16 "github.com/stretchr/testify/require" 17 "github.com/wiggin77/merror" 18 19 "github.com/mattermost/mattermost-server/v6/model" 20) 21 22const ( 23 Recent = 60000 24) 25 26func TestPing(t *testing.T) { 27 disablePing = false 28 29 t.Run("No error", func(t *testing.T) { 30 var countWebReq int32 31 merr := merror.New() 32 33 wg := &sync.WaitGroup{} 34 wg.Add(NumRemotes) 35 36 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 37 defer wg.Done() 38 defer w.WriteHeader(200) 39 atomic.AddInt32(&countWebReq, 1) 40 41 var frame model.RemoteClusterFrame 42 err := json.NewDecoder(r.Body).Decode(&frame) 43 if err != nil { 44 merr.Append(err) 45 return 46 } 47 if len(frame.Msg.Payload) == 0 { 48 merr.Append(fmt.Errorf("Payload should not be empty; remote_id=%s", frame.RemoteId)) 49 return 50 } 51 52 var ping model.RemoteClusterPing 53 err = json.Unmarshal(frame.Msg.Payload, &ping) 54 if err != nil { 55 merr.Append(err) 56 return 57 } 58 if !checkRecent(ping.SentAt, Recent) { 59 merr.Append(fmt.Errorf("timestamp out of range, got %d", ping.SentAt)) 60 return 61 } 62 if ping.RecvAt != 0 { 63 merr.Append(fmt.Errorf("timestamp should be 0, got %d", ping.RecvAt)) 64 return 65 } 66 })) 67 defer ts.Close() 68 69 mockServer := newMockServer(makeRemoteClusters(NumRemotes, ts.URL)) 70 defer mockServer.Shutdown() 71 72 service, err := NewRemoteClusterService(mockServer) 73 require.NoError(t, err) 74 75 err = service.Start() 76 require.NoError(t, err) 77 defer service.Shutdown() 78 79 wg.Wait() 80 81 assert.NoError(t, merr.ErrorOrNil()) 82 83 assert.Equal(t, int32(NumRemotes), atomic.LoadInt32(&countWebReq)) 84 t.Log(fmt.Sprintf("%d web requests counted; %d expected", 85 atomic.LoadInt32(&countWebReq), NumRemotes)) 86 }) 87 88 t.Run("HTTP errors", func(t *testing.T) { 89 var countWebReq int32 90 merr := merror.New() 91 92 wg := &sync.WaitGroup{} 93 wg.Add(NumRemotes) 94 95 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 96 defer wg.Done() 97 atomic.AddInt32(&countWebReq, 1) 98 99 var frame model.RemoteClusterFrame 100 err := json.NewDecoder(r.Body).Decode(&frame) 101 if err != nil { 102 merr.Append(err) 103 } 104 var ping model.RemoteClusterPing 105 err = json.Unmarshal(frame.Msg.Payload, &ping) 106 if err != nil { 107 merr.Append(err) 108 } 109 if !checkRecent(ping.SentAt, Recent) { 110 merr.Append(fmt.Errorf("timestamp out of range, got %d", ping.SentAt)) 111 } 112 if ping.RecvAt != 0 { 113 merr.Append(fmt.Errorf("timestamp should be 0, got %d", ping.RecvAt)) 114 } 115 w.WriteHeader(500) 116 })) 117 defer ts.Close() 118 119 mockServer := newMockServer(makeRemoteClusters(NumRemotes, ts.URL)) 120 defer mockServer.Shutdown() 121 122 service, err := NewRemoteClusterService(mockServer) 123 require.NoError(t, err) 124 125 err = service.Start() 126 require.NoError(t, err) 127 defer service.Shutdown() 128 129 wg.Wait() 130 131 assert.NoError(t, merr.ErrorOrNil()) 132 133 assert.Equal(t, int32(NumRemotes), atomic.LoadInt32(&countWebReq)) 134 t.Log(fmt.Sprintf("%d web requests counted; %d expected", 135 atomic.LoadInt32(&countWebReq), NumRemotes)) 136 }) 137} 138 139func checkRecent(millis int64, within int64) bool { 140 now := model.GetMillis() 141 return millis > now-within && millis < now+within 142} 143