1package tunnel 2 3import ( 4 "errors" 5 "github.com/golang/mock/gomock" 6 . "github.com/smartystreets/goconvey/convey" 7 "go.uber.org/zap" 8 "sync/atomic" 9 "testing" 10 "time" 11 "github.com/aliyun/aliyun-tablestore-go-sdk/tunnel/protocol" 12) 13 14var ( 15 cid = "test-channel-id-abc" 16 17 testLogConfig = zap.Config{ 18 Level: zap.NewAtomicLevelAt(zap.WarnLevel), 19 Development: false, 20 Sampling: &zap.SamplingConfig{ 21 Initial: 100, 22 Thereafter: 100, 23 }, 24 Encoding: "json", 25 EncoderConfig: zap.NewProductionEncoderConfig(), 26 27 OutputPaths: []string{"ut.log"}, 28 ErrorOutputPaths: []string{"ut.log"}, 29 } 30) 31 32func TestFailConn_NotifyStatus(t *testing.T) { 33 cases := []struct { 34 desc string 35 originState *protocol.Channel 36 updateState *protocol.Channel 37 expectState *protocol.Channel 38 }{ 39 {"nil to open", nil, newChannel(0, protocol.ChannelStatus_OPEN), 40 newChannel(1, protocol.ChannelStatus_CLOSE)}, 41 {"nil to closing", nil, newChannel(0, protocol.ChannelStatus_CLOSING), 42 newChannel(1, protocol.ChannelStatus_CLOSE)}, 43 {"nil to closed", nil, newChannel(0, protocol.ChannelStatus_CLOSE), 44 newChannel(0, protocol.ChannelStatus_CLOSE)}, 45 {"nil to terminated", nil, newChannel(0, protocol.ChannelStatus_TERMINATED), 46 newChannel(0, protocol.ChannelStatus_TERMINATED)}, 47 {"close to closing", newChannel(1, protocol.ChannelStatus_CLOSE), newChannel(1, protocol.ChannelStatus_CLOSING), 48 newChannel(2, protocol.ChannelStatus_CLOSE)}, 49 {"close to open", newChannel(1, protocol.ChannelStatus_CLOSE), newChannel(1, protocol.ChannelStatus_OPEN), 50 newChannel(2, protocol.ChannelStatus_CLOSE)}, 51 {"close to close new version", newChannel(1, protocol.ChannelStatus_CLOSE), newChannel(2, protocol.ChannelStatus_CLOSE), 52 newChannel(2, protocol.ChannelStatus_CLOSE)}, 53 {"close to close same version", newChannel(1, protocol.ChannelStatus_CLOSE), newChannel(1, protocol.ChannelStatus_CLOSE), 54 newChannel(1, protocol.ChannelStatus_CLOSE)}, 55 {"close to terminated", newChannel(1, protocol.ChannelStatus_CLOSE), newChannel(2, protocol.ChannelStatus_TERMINATED), 56 newChannel(2, protocol.ChannelStatus_TERMINATED)}, 57 {"new version to old version", newChannel(1, protocol.ChannelStatus_CLOSE), newChannel(0, protocol.ChannelStatus_CLOSING), 58 newChannel(1, protocol.ChannelStatus_CLOSE)}, 59 } 60 61 Convey("failConn notifyStatus with an empty state machine", t, func() { 62 lg, _ := testLogConfig.Build() 63 state := NewTunnelStateMachine("", "", nil, nil, nil, lg) 64 65 for _, test := range cases { 66 Convey("NotifyStatus case:"+test.desc, func() { 67 failConn := &failConn{ 68 state: state, 69 currentState: ToChannelStatus(test.originState), 70 } 71 failConn.NotifyStatus(ToChannelStatus(test.updateState)) 72 So(failConn.currentState.ToPbChannel(), ShouldResemble, test.expectState) 73 failConn.Close() 74 }) 75 } 76 }) 77} 78 79func TestFailConn_Closed(t *testing.T) { 80 Convey("init failConn Closed return false", t, func() { 81 conn := new(failConn) 82 So(conn.Closed(), ShouldBeFalse) 83 }) 84 85 c := []*protocol.Channel{ 86 newChannel(1, protocol.ChannelStatus_CLOSING), 87 newChannel(1, protocol.ChannelStatus_CLOSE), 88 newChannel(1, protocol.ChannelStatus_OPEN), 89 newChannel(1, protocol.ChannelStatus_TERMINATED), 90 } 91 Convey("failConn Closed return true", t, func() { 92 lg, _ := testLogConfig.Build() 93 state := NewTunnelStateMachine("", "", nil, nil, nil, lg) 94 for _, channel := range c { 95 conn := &failConn{state: state} 96 conn.NotifyStatus(ToChannelStatus(channel)) 97 So(conn.Closed(), ShouldBeTrue) 98 } 99 }) 100} 101 102func TestChannelConn_NotifyStatus(t *testing.T) { 103 //speed up cases 104 oldValue := tickMaxInterval 105 tickMaxInterval = time.Second 106 defer func() { 107 tickMaxInterval = oldValue 108 }() 109 mockCtrl := gomock.NewController(t) 110 defer mockCtrl.Finish() 111 lg, _ := testLogConfig.Build() 112 state := NewTunnelStateMachine("", "", nil, nil, nil, lg) 113 bypassApi := NewMocktunnelDataApi(mockCtrl) 114 bypassApi.EXPECT().readRecords("tunnelId", "clientId", "channelId", "token").Return(nil, "token", "traceId", 0, nil).AnyTimes() 115 116 Convey("nil state channel notify status", t, func() { 117 cases := []struct { 118 updateState *protocol.Channel 119 expectState *protocol.Channel 120 expectChStatus int32 121 }{ 122 {newChannel(0, protocol.ChannelStatus_CLOSE), 123 newChannel(0, protocol.ChannelStatus_CLOSE), closedStatus}, 124 {newChannel(0, protocol.ChannelStatus_CLOSING), 125 newChannel(1, protocol.ChannelStatus_CLOSE), closedStatus}, 126 {newChannel(0, protocol.ChannelStatus_OPEN), 127 newChannel(0, protocol.ChannelStatus_OPEN), runningStatus}, 128 {newChannel(0, protocol.ChannelStatus_TERMINATED), 129 newChannel(0, protocol.ChannelStatus_TERMINATED), closedStatus}, 130 } 131 dialer := &channelDialer{ 132 api: bypassApi, 133 lg: lg, 134 } 135 136 for _, test := range cases { 137 Convey("from nil to status:"+test.updateState.String(), func() { 138 conn := dialer.ChannelDial("tunnelId", "clientId", "channelId", "token", newTestProcessor(time.Duration(0)), state) 139 conn.NotifyStatus(ToChannelStatus(test.updateState)) 140 cconn := conn.(*channelConn) 141 So(cconn.getState(), ShouldResemble, test.expectState) 142 So(atomic.LoadInt32(&cconn.status), ShouldEqual, test.expectChStatus) 143 cconn.Close() 144 }) 145 } 146 }) 147 148 Convey("closed state channel notify status", t, func() { 149 closedState := newChannel(1, protocol.ChannelStatus_CLOSE) 150 cases := []struct { 151 updateState *protocol.Channel 152 expectState *protocol.Channel 153 expectChStatus int32 154 }{ 155 {newChannel(1, protocol.ChannelStatus_CLOSE), 156 newChannel(1, protocol.ChannelStatus_CLOSE), closedStatus}, 157 {newChannel(1, protocol.ChannelStatus_CLOSING), 158 newChannel(2, protocol.ChannelStatus_CLOSE), closedStatus}, 159 {newChannel(1, protocol.ChannelStatus_OPEN), 160 newChannel(2, protocol.ChannelStatus_CLOSE), closedStatus}, 161 {newChannel(1, protocol.ChannelStatus_TERMINATED), 162 newChannel(1, protocol.ChannelStatus_TERMINATED), closedStatus}, 163 {newChannel(0, protocol.ChannelStatus_CLOSING), 164 newChannel(1, protocol.ChannelStatus_CLOSE), closedStatus}, 165 } 166 dialer := &channelDialer{ 167 api: nil, 168 lg: lg, 169 } 170 171 for _, test := range cases { 172 Convey("from closed to status:"+test.updateState.String(), func() { 173 conn := dialer.ChannelDial("tunnelId", "clientId", "channelId", "token", newTestProcessor(time.Duration(0)), state) 174 conn.NotifyStatus(ToChannelStatus(closedState)) 175 176 conn.NotifyStatus(ToChannelStatus(test.updateState)) 177 cconn := conn.(*channelConn) 178 So(cconn.getState(), ShouldResemble, test.expectState) 179 So(atomic.LoadInt32(&cconn.status), ShouldEqual, test.expectChStatus) 180 conn.Close() 181 }) 182 } 183 }) 184 185 Convey("closing state channel notify status", t, func() { 186 openState := newChannel(0, protocol.ChannelStatus_OPEN) 187 closingState := newChannel(1, protocol.ChannelStatus_CLOSING) 188 cases := []struct { 189 updateState *protocol.Channel 190 stateAfterCheck *protocol.Channel 191 statusAfterCheck int32 192 }{ 193 {newChannel(1, protocol.ChannelStatus_CLOSE), 194 newChannel(2, protocol.ChannelStatus_CLOSE), closedStatus}, 195 {newChannel(1, protocol.ChannelStatus_CLOSING), 196 newChannel(2, protocol.ChannelStatus_CLOSE), closedStatus}, 197 {newChannel(1, protocol.ChannelStatus_OPEN), 198 newChannel(2, protocol.ChannelStatus_CLOSE), closedStatus}, 199 {newChannel(1, protocol.ChannelStatus_TERMINATED), 200 newChannel(2, protocol.ChannelStatus_TERMINATED), closedStatus}, 201 {newChannel(0, protocol.ChannelStatus_CLOSING), 202 newChannel(2, protocol.ChannelStatus_CLOSE), closedStatus}, 203 } 204 dialer := &channelDialer{ 205 api: bypassApi, 206 lg: lg, 207 } 208 209 for _, test := range cases { 210 Convey("from closing to status:"+test.updateState.String(), func() { 211 conn := dialer.ChannelDial("tunnelId", "clientId", "channelId", "token", newTestProcessor(time.Duration(10*time.Millisecond)), state) 212 conn.NotifyStatus(ToChannelStatus(openState)) 213 time.Sleep(10 * time.Millisecond) 214 conn.NotifyStatus(ToChannelStatus(closingState)) 215 216 conn.NotifyStatus(ToChannelStatus(test.updateState)) 217 channelGrace() 218 cconn := conn.(*channelConn) 219 cconn.checkUpdateStatus() 220 So(cconn.getState(), ShouldResemble, test.stateAfterCheck) 221 So(atomic.LoadInt32(&cconn.status), ShouldEqual, test.statusAfterCheck) 222 conn.Close() 223 }) 224 } 225 }) 226 227 Convey("open state channel notify status", t, func() { 228 openState := newChannel(1, protocol.ChannelStatus_OPEN) 229 cases := []struct { 230 updateState *protocol.Channel 231 expectState *protocol.Channel 232 expectChStatus int32 233 finalState *protocol.Channel 234 finalChStatus int32 235 }{ 236 {newChannel(1, protocol.ChannelStatus_CLOSE), 237 newChannel(1, protocol.ChannelStatus_CLOSE), closedStatus, 238 newChannel(2, protocol.ChannelStatus_CLOSE), closedStatus}, 239 {newChannel(1, protocol.ChannelStatus_CLOSING), 240 newChannel(1, protocol.ChannelStatus_CLOSING), closingStatus, 241 newChannel(2, protocol.ChannelStatus_CLOSE), closedStatus}, 242 {newChannel(1, protocol.ChannelStatus_OPEN), 243 newChannel(1, protocol.ChannelStatus_OPEN), runningStatus, 244 newChannel(1, protocol.ChannelStatus_OPEN), runningStatus}, 245 {newChannel(1, protocol.ChannelStatus_TERMINATED), 246 newChannel(1, protocol.ChannelStatus_TERMINATED), closedStatus, 247 newChannel(2, protocol.ChannelStatus_TERMINATED), closedStatus}, 248 {newChannel(0, protocol.ChannelStatus_CLOSING), 249 newChannel(1, protocol.ChannelStatus_OPEN), runningStatus, 250 newChannel(1, protocol.ChannelStatus_OPEN), runningStatus}, 251 } 252 dialer := &channelDialer{ 253 api: bypassApi, 254 lg: lg, 255 } 256 257 for _, test := range cases { 258 Convey("from open to status:"+test.updateState.String(), func() { 259 conn := dialer.ChannelDial("tunnelId", "clientId", "channelId", "token", newTestProcessor(time.Duration(100*time.Millisecond)), state) 260 conn.NotifyStatus(ToChannelStatus(openState)) 261 time.Sleep(10 * time.Millisecond) 262 conn.NotifyStatus(ToChannelStatus(test.updateState)) 263 cconn := conn.(*channelConn) 264 So(cconn.getState(), ShouldResemble, test.expectState) 265 So(atomic.LoadInt32(&cconn.status), ShouldEqual, test.expectChStatus) 266 channelGrace() 267 cconn.checkUpdateStatus() 268 So(cconn.getState(), ShouldResemble, test.finalState) 269 So(atomic.LoadInt32(&cconn.status), ShouldEqual, test.finalChStatus) 270 conn.Close() 271 }) 272 } 273 }) 274 275 Convey("terminated state channel notify status", t, func() { 276 termState := newChannel(1, protocol.ChannelStatus_TERMINATED) 277 cases := []struct { 278 updateState *protocol.Channel 279 expectState *protocol.Channel 280 expectChStatus int32 281 }{ 282 {newChannel(1, protocol.ChannelStatus_CLOSE), 283 newChannel(1, protocol.ChannelStatus_CLOSE), closedStatus}, 284 {newChannel(1, protocol.ChannelStatus_CLOSING), 285 newChannel(2, protocol.ChannelStatus_TERMINATED), closedStatus}, 286 {newChannel(1, protocol.ChannelStatus_OPEN), 287 newChannel(2, protocol.ChannelStatus_TERMINATED), closedStatus}, 288 {newChannel(1, protocol.ChannelStatus_TERMINATED), 289 newChannel(1, protocol.ChannelStatus_TERMINATED), closedStatus}, 290 {newChannel(0, protocol.ChannelStatus_CLOSING), 291 newChannel(1, protocol.ChannelStatus_TERMINATED), closedStatus}, 292 } 293 dialer := &channelDialer{ 294 api: nil, 295 lg: lg, 296 } 297 298 for _, test := range cases { 299 Convey("from closed to status:"+test.updateState.String(), func() { 300 conn := dialer.ChannelDial("tunnelId", "clientId", "channelId", "token", newTestProcessor(time.Duration(0)), state) 301 conn.NotifyStatus(ToChannelStatus(termState)) 302 conn.NotifyStatus(ToChannelStatus(test.updateState)) 303 cconn := conn.(*channelConn) 304 So(cconn.getState(), ShouldResemble, test.expectState) 305 So(atomic.LoadInt32(&cconn.status), ShouldEqual, test.expectChStatus) 306 conn.Close() 307 }) 308 } 309 }) 310} 311 312func TestChannelConn_NotifyStatus_ProcessRecords(t *testing.T) { 313 mockCtrl := gomock.NewController(t) 314 defer mockCtrl.Finish() 315 lg, _ := testLogConfig.Build() 316 state := NewTunnelStateMachine("", "", nil, nil, nil, lg) 317 bypassApi := NewMocktunnelDataApi(mockCtrl) 318 bypassApi.EXPECT().readRecords("tunnelId", "clientId", "channelId", "token").Return(nil, "token", "traceId", 0, nil).AnyTimes() 319 320 failApi := NewMocktunnelDataApi(mockCtrl) 321 failApi.EXPECT().readRecords("tunnelId", "clientId", "channelId", "token").Return(nil, "token", "traceId", 0, errors.New("abc")).Times(1) 322 323 finishApi := NewMocktunnelDataApi(mockCtrl) 324 finishApi.EXPECT().readRecords("tunnelId", "clientId", "channelId", "token").Return(nil, FinishTag, "traceId", 0, nil).Times(1) 325 326 cases := []struct { 327 desc string 328 api tunnelDataApi 329 processor ChannelProcessor 330 expectState *protocol.Channel 331 }{ 332 {"tunnel read records failed", failApi, newTestProcessor(time.Duration(0)), 333 newChannel(1, protocol.ChannelStatus_CLOSE)}, 334 {"tunnel read records finished", finishApi, 335 newTestProcessor(time.Duration(0)), newChannel(1, protocol.ChannelStatus_TERMINATED)}, 336 {"tunnel processor process records failed", bypassApi, new(failProcessor), 337 newChannel(1, protocol.ChannelStatus_CLOSE)}, 338 } 339 340 Convey("open channel read records", t, func() { 341 for _, test := range cases { 342 Convey("read records case: "+test.desc, func() { 343 openState := newChannel(0, protocol.ChannelStatus_OPEN) 344 dialer := &channelDialer{ 345 api: test.api, 346 lg: lg, 347 } 348 conn := dialer.ChannelDial("tunnelId", "clientId", "channelId", "token", test.processor, state) 349 conn.NotifyStatus(ToChannelStatus(openState)) 350 351 time.Sleep(30 * time.Millisecond) 352 conn.NotifyStatus(ToChannelStatus(openState)) 353 cconn := conn.(*channelConn) 354 So(cconn.getState(), ShouldResemble, test.expectState) 355 conn.Close() 356 }) 357 } 358 }) 359} 360 361func TestChannelConn_Close(t *testing.T) { 362 mockCtrl := gomock.NewController(t) 363 defer mockCtrl.Finish() 364 lg, _ := testLogConfig.Build() 365 state := NewTunnelStateMachine("", "", nil, nil, nil, lg) 366 bypassApi := NewMocktunnelDataApi(mockCtrl) 367 bypassApi.EXPECT().readRecords("tunnelId", "clientId", "channelId", "token").Return(nil, "token", "traceId", 0, nil).AnyTimes() 368 369 Convey("open tunnel is closed", t, func() { 370 openState := newChannel(0, protocol.ChannelStatus_OPEN) 371 dialer := &channelDialer{ 372 api: bypassApi, 373 lg: lg, 374 } 375 conn := dialer.ChannelDial("tunnelId", "clientId", "channelId", "token", newTestProcessor(time.Duration(0)), state) 376 conn.NotifyStatus(ToChannelStatus(openState)) 377 378 time.Sleep(time.Second) 379 conn.Close() 380 So(conn.Closed(), ShouldBeTrue) 381 }) 382} 383 384func newChannel(v int64, status protocol.ChannelStatus) *protocol.Channel { 385 return &protocol.Channel{ 386 ChannelId: &cid, 387 Version: &v, 388 Status: status.Enum(), 389 } 390} 391 392type testProcessor struct { 393 sleepDur time.Duration 394} 395 396func newTestProcessor(dur time.Duration) *testProcessor { 397 return &testProcessor{dur} 398} 399 400func (p *testProcessor) Process(records []*Record, nextToken, traceId string) error { 401 return nil 402} 403 404func (p *testProcessor) Shutdown() { 405 time.Sleep(p.sleepDur) 406} 407 408type failProcessor struct{} 409 410func (p *failProcessor) Process(records []*Record, nextToken, traceId string) error { 411 return errors.New("failed") 412} 413 414func (p *failProcessor) Shutdown() {} 415 416func (c *channelConn) getState() *protocol.Channel { 417 c.mu.Lock() 418 defer c.mu.Unlock() 419 return c.currentState.ToPbChannel() 420} 421 422func channelGrace() { 423 time.Sleep(2 * time.Second) 424} 425