1package tunnel
2
3import (
4	"errors"
5	"github.com/golang/mock/gomock"
6	. "github.com/smartystreets/goconvey/convey"
7	"go.uber.org/zap"
8	"sync/atomic"
9	"testing"
10	"time"
11	"github.com/aliyun/aliyun-tablestore-go-sdk/tunnel/protocol"
12)
13
14var (
15	cid = "test-channel-id-abc"
16
17	testLogConfig = zap.Config{
18		Level:       zap.NewAtomicLevelAt(zap.WarnLevel),
19		Development: false,
20		Sampling: &zap.SamplingConfig{
21			Initial:    100,
22			Thereafter: 100,
23		},
24		Encoding:      "json",
25		EncoderConfig: zap.NewProductionEncoderConfig(),
26
27		OutputPaths:      []string{"ut.log"},
28		ErrorOutputPaths: []string{"ut.log"},
29	}
30)
31
32func TestFailConn_NotifyStatus(t *testing.T) {
33	cases := []struct {
34		desc        string
35		originState *protocol.Channel
36		updateState *protocol.Channel
37		expectState *protocol.Channel
38	}{
39		{"nil to open", nil, newChannel(0, protocol.ChannelStatus_OPEN),
40			newChannel(1, protocol.ChannelStatus_CLOSE)},
41		{"nil to closing", nil, newChannel(0, protocol.ChannelStatus_CLOSING),
42			newChannel(1, protocol.ChannelStatus_CLOSE)},
43		{"nil to closed", nil, newChannel(0, protocol.ChannelStatus_CLOSE),
44			newChannel(0, protocol.ChannelStatus_CLOSE)},
45		{"nil to terminated", nil, newChannel(0, protocol.ChannelStatus_TERMINATED),
46			newChannel(0, protocol.ChannelStatus_TERMINATED)},
47		{"close to closing", newChannel(1, protocol.ChannelStatus_CLOSE), newChannel(1, protocol.ChannelStatus_CLOSING),
48			newChannel(2, protocol.ChannelStatus_CLOSE)},
49		{"close to open", newChannel(1, protocol.ChannelStatus_CLOSE), newChannel(1, protocol.ChannelStatus_OPEN),
50			newChannel(2, protocol.ChannelStatus_CLOSE)},
51		{"close to close new version", newChannel(1, protocol.ChannelStatus_CLOSE), newChannel(2, protocol.ChannelStatus_CLOSE),
52			newChannel(2, protocol.ChannelStatus_CLOSE)},
53		{"close to close same version", newChannel(1, protocol.ChannelStatus_CLOSE), newChannel(1, protocol.ChannelStatus_CLOSE),
54			newChannel(1, protocol.ChannelStatus_CLOSE)},
55		{"close to terminated", newChannel(1, protocol.ChannelStatus_CLOSE), newChannel(2, protocol.ChannelStatus_TERMINATED),
56			newChannel(2, protocol.ChannelStatus_TERMINATED)},
57		{"new version to old version", newChannel(1, protocol.ChannelStatus_CLOSE), newChannel(0, protocol.ChannelStatus_CLOSING),
58			newChannel(1, protocol.ChannelStatus_CLOSE)},
59	}
60
61	Convey("failConn notifyStatus with an empty state machine", t, func() {
62		lg, _ := testLogConfig.Build()
63		state := NewTunnelStateMachine("", "", nil, nil, nil, lg)
64
65		for _, test := range cases {
66			Convey("NotifyStatus case:"+test.desc, func() {
67				failConn := &failConn{
68					state:        state,
69					currentState: ToChannelStatus(test.originState),
70				}
71				failConn.NotifyStatus(ToChannelStatus(test.updateState))
72				So(failConn.currentState.ToPbChannel(), ShouldResemble, test.expectState)
73				failConn.Close()
74			})
75		}
76	})
77}
78
79func TestFailConn_Closed(t *testing.T) {
80	Convey("init failConn Closed return false", t, func() {
81		conn := new(failConn)
82		So(conn.Closed(), ShouldBeFalse)
83	})
84
85	c := []*protocol.Channel{
86		newChannel(1, protocol.ChannelStatus_CLOSING),
87		newChannel(1, protocol.ChannelStatus_CLOSE),
88		newChannel(1, protocol.ChannelStatus_OPEN),
89		newChannel(1, protocol.ChannelStatus_TERMINATED),
90	}
91	Convey("failConn Closed return true", t, func() {
92		lg, _ := testLogConfig.Build()
93		state := NewTunnelStateMachine("", "", nil, nil, nil, lg)
94		for _, channel := range c {
95			conn := &failConn{state: state}
96			conn.NotifyStatus(ToChannelStatus(channel))
97			So(conn.Closed(), ShouldBeTrue)
98		}
99	})
100}
101
102func TestChannelConn_NotifyStatus(t *testing.T) {
103	//speed up cases
104	oldValue := tickMaxInterval
105	tickMaxInterval = time.Second
106	defer func() {
107		tickMaxInterval = oldValue
108	}()
109	mockCtrl := gomock.NewController(t)
110	defer mockCtrl.Finish()
111	lg, _ := testLogConfig.Build()
112	state := NewTunnelStateMachine("", "", nil, nil, nil, lg)
113	bypassApi := NewMocktunnelDataApi(mockCtrl)
114	bypassApi.EXPECT().readRecords("tunnelId", "clientId", "channelId", "token").Return(nil, "token", "traceId", 0, nil).AnyTimes()
115
116	Convey("nil state channel notify status", t, func() {
117		cases := []struct {
118			updateState    *protocol.Channel
119			expectState    *protocol.Channel
120			expectChStatus int32
121		}{
122			{newChannel(0, protocol.ChannelStatus_CLOSE),
123				newChannel(0, protocol.ChannelStatus_CLOSE), closedStatus},
124			{newChannel(0, protocol.ChannelStatus_CLOSING),
125				newChannel(1, protocol.ChannelStatus_CLOSE), closedStatus},
126			{newChannel(0, protocol.ChannelStatus_OPEN),
127				newChannel(0, protocol.ChannelStatus_OPEN), runningStatus},
128			{newChannel(0, protocol.ChannelStatus_TERMINATED),
129				newChannel(0, protocol.ChannelStatus_TERMINATED), closedStatus},
130		}
131		dialer := &channelDialer{
132			api: bypassApi,
133			lg:  lg,
134		}
135
136		for _, test := range cases {
137			Convey("from nil to status:"+test.updateState.String(), func() {
138				conn := dialer.ChannelDial("tunnelId", "clientId", "channelId", "token", newTestProcessor(time.Duration(0)), state)
139				conn.NotifyStatus(ToChannelStatus(test.updateState))
140				cconn := conn.(*channelConn)
141				So(cconn.getState(), ShouldResemble, test.expectState)
142				So(atomic.LoadInt32(&cconn.status), ShouldEqual, test.expectChStatus)
143				cconn.Close()
144			})
145		}
146	})
147
148	Convey("closed state channel notify status", t, func() {
149		closedState := newChannel(1, protocol.ChannelStatus_CLOSE)
150		cases := []struct {
151			updateState    *protocol.Channel
152			expectState    *protocol.Channel
153			expectChStatus int32
154		}{
155			{newChannel(1, protocol.ChannelStatus_CLOSE),
156				newChannel(1, protocol.ChannelStatus_CLOSE), closedStatus},
157			{newChannel(1, protocol.ChannelStatus_CLOSING),
158				newChannel(2, protocol.ChannelStatus_CLOSE), closedStatus},
159			{newChannel(1, protocol.ChannelStatus_OPEN),
160				newChannel(2, protocol.ChannelStatus_CLOSE), closedStatus},
161			{newChannel(1, protocol.ChannelStatus_TERMINATED),
162				newChannel(1, protocol.ChannelStatus_TERMINATED), closedStatus},
163			{newChannel(0, protocol.ChannelStatus_CLOSING),
164				newChannel(1, protocol.ChannelStatus_CLOSE), closedStatus},
165		}
166		dialer := &channelDialer{
167			api: nil,
168			lg:  lg,
169		}
170
171		for _, test := range cases {
172			Convey("from closed to status:"+test.updateState.String(), func() {
173				conn := dialer.ChannelDial("tunnelId", "clientId", "channelId", "token", newTestProcessor(time.Duration(0)), state)
174				conn.NotifyStatus(ToChannelStatus(closedState))
175
176				conn.NotifyStatus(ToChannelStatus(test.updateState))
177				cconn := conn.(*channelConn)
178				So(cconn.getState(), ShouldResemble, test.expectState)
179				So(atomic.LoadInt32(&cconn.status), ShouldEqual, test.expectChStatus)
180				conn.Close()
181			})
182		}
183	})
184
185	Convey("closing state channel notify status", t, func() {
186		openState := newChannel(0, protocol.ChannelStatus_OPEN)
187		closingState := newChannel(1, protocol.ChannelStatus_CLOSING)
188		cases := []struct {
189			updateState      *protocol.Channel
190			stateAfterCheck  *protocol.Channel
191			statusAfterCheck int32
192		}{
193			{newChannel(1, protocol.ChannelStatus_CLOSE),
194				newChannel(2, protocol.ChannelStatus_CLOSE), closedStatus},
195			{newChannel(1, protocol.ChannelStatus_CLOSING),
196				newChannel(2, protocol.ChannelStatus_CLOSE), closedStatus},
197			{newChannel(1, protocol.ChannelStatus_OPEN),
198				newChannel(2, protocol.ChannelStatus_CLOSE), closedStatus},
199			{newChannel(1, protocol.ChannelStatus_TERMINATED),
200				newChannel(2, protocol.ChannelStatus_TERMINATED), closedStatus},
201			{newChannel(0, protocol.ChannelStatus_CLOSING),
202				newChannel(2, protocol.ChannelStatus_CLOSE), closedStatus},
203		}
204		dialer := &channelDialer{
205			api: bypassApi,
206			lg:  lg,
207		}
208
209		for _, test := range cases {
210			Convey("from closing to status:"+test.updateState.String(), func() {
211				conn := dialer.ChannelDial("tunnelId", "clientId", "channelId", "token", newTestProcessor(time.Duration(10*time.Millisecond)), state)
212				conn.NotifyStatus(ToChannelStatus(openState))
213				time.Sleep(10 * time.Millisecond)
214				conn.NotifyStatus(ToChannelStatus(closingState))
215
216				conn.NotifyStatus(ToChannelStatus(test.updateState))
217				channelGrace()
218				cconn := conn.(*channelConn)
219				cconn.checkUpdateStatus()
220				So(cconn.getState(), ShouldResemble, test.stateAfterCheck)
221				So(atomic.LoadInt32(&cconn.status), ShouldEqual, test.statusAfterCheck)
222				conn.Close()
223			})
224		}
225	})
226
227	Convey("open state channel notify status", t, func() {
228		openState := newChannel(1, protocol.ChannelStatus_OPEN)
229		cases := []struct {
230			updateState    *protocol.Channel
231			expectState    *protocol.Channel
232			expectChStatus int32
233			finalState     *protocol.Channel
234			finalChStatus  int32
235		}{
236			{newChannel(1, protocol.ChannelStatus_CLOSE),
237				newChannel(1, protocol.ChannelStatus_CLOSE), closedStatus,
238				newChannel(2, protocol.ChannelStatus_CLOSE), closedStatus},
239			{newChannel(1, protocol.ChannelStatus_CLOSING),
240				newChannel(1, protocol.ChannelStatus_CLOSING), closingStatus,
241				newChannel(2, protocol.ChannelStatus_CLOSE), closedStatus},
242			{newChannel(1, protocol.ChannelStatus_OPEN),
243				newChannel(1, protocol.ChannelStatus_OPEN), runningStatus,
244				newChannel(1, protocol.ChannelStatus_OPEN), runningStatus},
245			{newChannel(1, protocol.ChannelStatus_TERMINATED),
246				newChannel(1, protocol.ChannelStatus_TERMINATED), closedStatus,
247				newChannel(2, protocol.ChannelStatus_TERMINATED), closedStatus},
248			{newChannel(0, protocol.ChannelStatus_CLOSING),
249				newChannel(1, protocol.ChannelStatus_OPEN), runningStatus,
250				newChannel(1, protocol.ChannelStatus_OPEN), runningStatus},
251		}
252		dialer := &channelDialer{
253			api: bypassApi,
254			lg:  lg,
255		}
256
257		for _, test := range cases {
258			Convey("from open to status:"+test.updateState.String(), func() {
259				conn := dialer.ChannelDial("tunnelId", "clientId", "channelId", "token", newTestProcessor(time.Duration(100*time.Millisecond)), state)
260				conn.NotifyStatus(ToChannelStatus(openState))
261				time.Sleep(10 * time.Millisecond)
262				conn.NotifyStatus(ToChannelStatus(test.updateState))
263				cconn := conn.(*channelConn)
264				So(cconn.getState(), ShouldResemble, test.expectState)
265				So(atomic.LoadInt32(&cconn.status), ShouldEqual, test.expectChStatus)
266				channelGrace()
267				cconn.checkUpdateStatus()
268				So(cconn.getState(), ShouldResemble, test.finalState)
269				So(atomic.LoadInt32(&cconn.status), ShouldEqual, test.finalChStatus)
270				conn.Close()
271			})
272		}
273	})
274
275	Convey("terminated state channel notify status", t, func() {
276		termState := newChannel(1, protocol.ChannelStatus_TERMINATED)
277		cases := []struct {
278			updateState    *protocol.Channel
279			expectState    *protocol.Channel
280			expectChStatus int32
281		}{
282			{newChannel(1, protocol.ChannelStatus_CLOSE),
283				newChannel(1, protocol.ChannelStatus_CLOSE), closedStatus},
284			{newChannel(1, protocol.ChannelStatus_CLOSING),
285				newChannel(2, protocol.ChannelStatus_TERMINATED), closedStatus},
286			{newChannel(1, protocol.ChannelStatus_OPEN),
287				newChannel(2, protocol.ChannelStatus_TERMINATED), closedStatus},
288			{newChannel(1, protocol.ChannelStatus_TERMINATED),
289				newChannel(1, protocol.ChannelStatus_TERMINATED), closedStatus},
290			{newChannel(0, protocol.ChannelStatus_CLOSING),
291				newChannel(1, protocol.ChannelStatus_TERMINATED), closedStatus},
292		}
293		dialer := &channelDialer{
294			api: nil,
295			lg:  lg,
296		}
297
298		for _, test := range cases {
299			Convey("from closed to status:"+test.updateState.String(), func() {
300				conn := dialer.ChannelDial("tunnelId", "clientId", "channelId", "token", newTestProcessor(time.Duration(0)), state)
301				conn.NotifyStatus(ToChannelStatus(termState))
302				conn.NotifyStatus(ToChannelStatus(test.updateState))
303				cconn := conn.(*channelConn)
304				So(cconn.getState(), ShouldResemble, test.expectState)
305				So(atomic.LoadInt32(&cconn.status), ShouldEqual, test.expectChStatus)
306				conn.Close()
307			})
308		}
309	})
310}
311
312func TestChannelConn_NotifyStatus_ProcessRecords(t *testing.T) {
313	mockCtrl := gomock.NewController(t)
314	defer mockCtrl.Finish()
315	lg, _ := testLogConfig.Build()
316	state := NewTunnelStateMachine("", "", nil, nil, nil, lg)
317	bypassApi := NewMocktunnelDataApi(mockCtrl)
318	bypassApi.EXPECT().readRecords("tunnelId", "clientId", "channelId", "token").Return(nil, "token", "traceId", 0, nil).AnyTimes()
319
320	failApi := NewMocktunnelDataApi(mockCtrl)
321	failApi.EXPECT().readRecords("tunnelId", "clientId", "channelId", "token").Return(nil, "token", "traceId", 0, errors.New("abc")).Times(1)
322
323	finishApi := NewMocktunnelDataApi(mockCtrl)
324	finishApi.EXPECT().readRecords("tunnelId", "clientId", "channelId", "token").Return(nil, FinishTag, "traceId", 0, nil).Times(1)
325
326	cases := []struct {
327		desc        string
328		api         tunnelDataApi
329		processor   ChannelProcessor
330		expectState *protocol.Channel
331	}{
332		{"tunnel read records failed", failApi, newTestProcessor(time.Duration(0)),
333			newChannel(1, protocol.ChannelStatus_CLOSE)},
334		{"tunnel read records finished", finishApi,
335			newTestProcessor(time.Duration(0)), newChannel(1, protocol.ChannelStatus_TERMINATED)},
336		{"tunnel processor process records failed", bypassApi, new(failProcessor),
337			newChannel(1, protocol.ChannelStatus_CLOSE)},
338	}
339
340	Convey("open channel read records", t, func() {
341		for _, test := range cases {
342			Convey("read records case: "+test.desc, func() {
343				openState := newChannel(0, protocol.ChannelStatus_OPEN)
344				dialer := &channelDialer{
345					api: test.api,
346					lg:  lg,
347				}
348				conn := dialer.ChannelDial("tunnelId", "clientId", "channelId", "token", test.processor, state)
349				conn.NotifyStatus(ToChannelStatus(openState))
350
351				time.Sleep(30 * time.Millisecond)
352				conn.NotifyStatus(ToChannelStatus(openState))
353				cconn := conn.(*channelConn)
354				So(cconn.getState(), ShouldResemble, test.expectState)
355				conn.Close()
356			})
357		}
358	})
359}
360
361func TestChannelConn_Close(t *testing.T) {
362	mockCtrl := gomock.NewController(t)
363	defer mockCtrl.Finish()
364	lg, _ := testLogConfig.Build()
365	state := NewTunnelStateMachine("", "", nil, nil, nil, lg)
366	bypassApi := NewMocktunnelDataApi(mockCtrl)
367	bypassApi.EXPECT().readRecords("tunnelId", "clientId", "channelId", "token").Return(nil, "token", "traceId", 0, nil).AnyTimes()
368
369	Convey("open tunnel is closed", t, func() {
370		openState := newChannel(0, protocol.ChannelStatus_OPEN)
371		dialer := &channelDialer{
372			api: bypassApi,
373			lg:  lg,
374		}
375		conn := dialer.ChannelDial("tunnelId", "clientId", "channelId", "token", newTestProcessor(time.Duration(0)), state)
376		conn.NotifyStatus(ToChannelStatus(openState))
377
378		time.Sleep(time.Second)
379		conn.Close()
380		So(conn.Closed(), ShouldBeTrue)
381	})
382}
383
384func newChannel(v int64, status protocol.ChannelStatus) *protocol.Channel {
385	return &protocol.Channel{
386		ChannelId: &cid,
387		Version:   &v,
388		Status:    status.Enum(),
389	}
390}
391
392type testProcessor struct {
393	sleepDur time.Duration
394}
395
396func newTestProcessor(dur time.Duration) *testProcessor {
397	return &testProcessor{dur}
398}
399
400func (p *testProcessor) Process(records []*Record, nextToken, traceId string) error {
401	return nil
402}
403
404func (p *testProcessor) Shutdown() {
405	time.Sleep(p.sleepDur)
406}
407
408type failProcessor struct{}
409
410func (p *failProcessor) Process(records []*Record, nextToken, traceId string) error {
411	return errors.New("failed")
412}
413
414func (p *failProcessor) Shutdown() {}
415
416func (c *channelConn) getState() *protocol.Channel {
417	c.mu.Lock()
418	defer c.mu.Unlock()
419	return c.currentState.ToPbChannel()
420}
421
422func channelGrace() {
423	time.Sleep(2 * time.Second)
424}
425