1// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
2// See LICENSE.txt for license information.
3
4package remotecluster
5
6import (
7	"encoding/json"
8	"fmt"
9	"net/http"
10	"net/http/httptest"
11	"sync"
12	"sync/atomic"
13	"testing"
14
15	"github.com/stretchr/testify/assert"
16	"github.com/stretchr/testify/require"
17	"github.com/wiggin77/merror"
18
19	"github.com/mattermost/mattermost-server/v6/model"
20)
21
// Recent is the tolerance handed to checkRecent as its `within` argument when
// ping timestamps are validated. checkRecent compares against
// model.GetMillis(), so the unit is presumably milliseconds (60000 = one
// minute) — confirm against model.GetMillis.
const Recent = 60000
25
26func TestPing(t *testing.T) {
27	disablePing = false
28
29	t.Run("No error", func(t *testing.T) {
30		var countWebReq int32
31		merr := merror.New()
32
33		wg := &sync.WaitGroup{}
34		wg.Add(NumRemotes)
35
36		ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
37			defer wg.Done()
38			defer w.WriteHeader(200)
39			atomic.AddInt32(&countWebReq, 1)
40
41			var frame model.RemoteClusterFrame
42			err := json.NewDecoder(r.Body).Decode(&frame)
43			if err != nil {
44				merr.Append(err)
45				return
46			}
47			if len(frame.Msg.Payload) == 0 {
48				merr.Append(fmt.Errorf("Payload should not be empty; remote_id=%s", frame.RemoteId))
49				return
50			}
51
52			var ping model.RemoteClusterPing
53			err = json.Unmarshal(frame.Msg.Payload, &ping)
54			if err != nil {
55				merr.Append(err)
56				return
57			}
58			if !checkRecent(ping.SentAt, Recent) {
59				merr.Append(fmt.Errorf("timestamp out of range, got %d", ping.SentAt))
60				return
61			}
62			if ping.RecvAt != 0 {
63				merr.Append(fmt.Errorf("timestamp should be 0, got %d", ping.RecvAt))
64				return
65			}
66		}))
67		defer ts.Close()
68
69		mockServer := newMockServer(makeRemoteClusters(NumRemotes, ts.URL))
70		defer mockServer.Shutdown()
71
72		service, err := NewRemoteClusterService(mockServer)
73		require.NoError(t, err)
74
75		err = service.Start()
76		require.NoError(t, err)
77		defer service.Shutdown()
78
79		wg.Wait()
80
81		assert.NoError(t, merr.ErrorOrNil())
82
83		assert.Equal(t, int32(NumRemotes), atomic.LoadInt32(&countWebReq))
84		t.Log(fmt.Sprintf("%d web requests counted;  %d expected",
85			atomic.LoadInt32(&countWebReq), NumRemotes))
86	})
87
88	t.Run("HTTP errors", func(t *testing.T) {
89		var countWebReq int32
90		merr := merror.New()
91
92		wg := &sync.WaitGroup{}
93		wg.Add(NumRemotes)
94
95		ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
96			defer wg.Done()
97			atomic.AddInt32(&countWebReq, 1)
98
99			var frame model.RemoteClusterFrame
100			err := json.NewDecoder(r.Body).Decode(&frame)
101			if err != nil {
102				merr.Append(err)
103			}
104			var ping model.RemoteClusterPing
105			err = json.Unmarshal(frame.Msg.Payload, &ping)
106			if err != nil {
107				merr.Append(err)
108			}
109			if !checkRecent(ping.SentAt, Recent) {
110				merr.Append(fmt.Errorf("timestamp out of range, got %d", ping.SentAt))
111			}
112			if ping.RecvAt != 0 {
113				merr.Append(fmt.Errorf("timestamp should be 0, got %d", ping.RecvAt))
114			}
115			w.WriteHeader(500)
116		}))
117		defer ts.Close()
118
119		mockServer := newMockServer(makeRemoteClusters(NumRemotes, ts.URL))
120		defer mockServer.Shutdown()
121
122		service, err := NewRemoteClusterService(mockServer)
123		require.NoError(t, err)
124
125		err = service.Start()
126		require.NoError(t, err)
127		defer service.Shutdown()
128
129		wg.Wait()
130
131		assert.NoError(t, merr.ErrorOrNil())
132
133		assert.Equal(t, int32(NumRemotes), atomic.LoadInt32(&countWebReq))
134		t.Log(fmt.Sprintf("%d web requests counted;  %d expected",
135			atomic.LoadInt32(&countWebReq), NumRemotes))
136	})
137}
138
139func checkRecent(millis int64, within int64) bool {
140	now := model.GetMillis()
141	return millis > now-within && millis < now+within
142}
143