1// Code generated by private/model/cli/gen-api/main.go. DO NOT EDIT. 2 3// +build go1.10 4 5package kinesis 6 7import ( 8 "bytes" 9 "context" 10 "io/ioutil" 11 "net/http" 12 "reflect" 13 "strings" 14 "sync" 15 "testing" 16 "time" 17 18 "github.com/aws/aws-sdk-go/aws" 19 "github.com/aws/aws-sdk-go/aws/awserr" 20 "github.com/aws/aws-sdk-go/aws/corehandlers" 21 "github.com/aws/aws-sdk-go/aws/request" 22 "github.com/aws/aws-sdk-go/awstesting/unit" 23 "github.com/aws/aws-sdk-go/private/protocol" 24 "github.com/aws/aws-sdk-go/private/protocol/eventstream" 25 "github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi" 26 "github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamtest" 27 "github.com/aws/aws-sdk-go/private/protocol/jsonrpc" 28) 29 30var _ time.Time 31var _ awserr.Error 32var _ context.Context 33var _ sync.WaitGroup 34var _ strings.Reader 35 36func TestSubscribeToShard_Read(t *testing.T) { 37 expectEvents, eventMsgs := mockSubscribeToShardReadEvents() 38 sess, cleanupFn, err := eventstreamtest.SetupEventStreamSession(t, 39 eventstreamtest.ServeEventStream{ 40 T: t, 41 Events: eventMsgs, 42 }, 43 true, 44 ) 45 if err != nil { 46 t.Fatalf("expect no error, %v", err) 47 } 48 defer cleanupFn() 49 50 svc := New(sess) 51 resp, err := svc.SubscribeToShard(nil) 52 if err != nil { 53 t.Fatalf("expect no error got, %v", err) 54 } 55 defer resp.GetStream().Close() 56 // Trim off response output type pseudo event so only event messages remain. 57 expectEvents = expectEvents[1:] 58 59 var i int 60 for event := range resp.GetStream().Events() { 61 if event == nil { 62 t.Errorf("%d, expect event, got nil", i) 63 } 64 if e, a := expectEvents[i], event; !reflect.DeepEqual(e, a) { 65 t.Errorf("%d, expect %T %v, got %T %v", i, e, e, a, a) 66 } 67 i++ 68 } 69 70 if err := resp.GetStream().Err(); err != nil { 71 t.Errorf("expect no error, %v", err) 72 } 73} 74 75func TestSubscribeToShard_ReadClose(t *testing.T) { 76 _, eventMsgs := mockSubscribeToShardReadEvents() 77 sess, cleanupFn, err := eventstreamtest.SetupEventStreamSession(t, 78 eventstreamtest.ServeEventStream{ 79 T: t, 80 Events: eventMsgs, 81 }, 82 true, 83 ) 84 if err != nil { 85 t.Fatalf("expect no error, %v", err) 86 } 87 defer cleanupFn() 88 89 svc := New(sess) 90 resp, err := svc.SubscribeToShard(nil) 91 if err != nil { 92 t.Fatalf("expect no error got, %v", err) 93 } 94 95 // Assert calling Err before close does not close the stream. 96 resp.GetStream().Err() 97 select { 98 case _, ok := <-resp.GetStream().Events(): 99 if !ok { 100 t.Fatalf("expect stream not to be closed, but was") 101 } 102 default: 103 } 104 105 resp.GetStream().Close() 106 <-resp.GetStream().Events() 107 108 if err := resp.GetStream().Err(); err != nil { 109 t.Errorf("expect no error, %v", err) 110 } 111} 112 113func TestSubscribeToShard_ReadUnknownEvent(t *testing.T) { 114 expectEvents, eventMsgs := mockSubscribeToShardReadEvents() 115 eventOffset := 1 116 117 unknownEvent := eventstream.Message{ 118 Headers: eventstream.Headers{ 119 eventstreamtest.EventMessageTypeHeader, 120 { 121 Name: eventstreamapi.EventTypeHeader, 122 Value: eventstream.StringValue("UnknownEventName"), 123 }, 124 }, 125 Payload: []byte("some unknown event"), 126 } 127 128 eventMsgs = append(eventMsgs[:eventOffset], 129 append([]eventstream.Message{unknownEvent}, eventMsgs[eventOffset:]...)...) 130 131 expectEvents = append(expectEvents[:eventOffset], 132 append([]SubscribeToShardEventStreamEvent{ 133 &SubscribeToShardEventStreamUnknownEvent{ 134 Type: "UnknownEventName", 135 Message: unknownEvent, 136 }, 137 }, 138 expectEvents[eventOffset:]...)...) 139 140 sess, cleanupFn, err := eventstreamtest.SetupEventStreamSession(t, 141 eventstreamtest.ServeEventStream{ 142 T: t, 143 Events: eventMsgs, 144 }, 145 true, 146 ) 147 if err != nil { 148 t.Fatalf("expect no error, %v", err) 149 } 150 defer cleanupFn() 151 152 svc := New(sess) 153 resp, err := svc.SubscribeToShard(nil) 154 if err != nil { 155 t.Fatalf("expect no error got, %v", err) 156 } 157 defer resp.GetStream().Close() 158 // Trim off response output type pseudo event so only event messages remain. 159 expectEvents = expectEvents[1:] 160 161 var i int 162 for event := range resp.GetStream().Events() { 163 if event == nil { 164 t.Errorf("%d, expect event, got nil", i) 165 } 166 if e, a := expectEvents[i], event; !reflect.DeepEqual(e, a) { 167 t.Errorf("%d, expect %T %v, got %T %v", i, e, e, a, a) 168 } 169 i++ 170 } 171 172 if err := resp.GetStream().Err(); err != nil { 173 t.Errorf("expect no error, %v", err) 174 } 175} 176 177func BenchmarkSubscribeToShard_Read(b *testing.B) { 178 _, eventMsgs := mockSubscribeToShardReadEvents() 179 var buf bytes.Buffer 180 encoder := eventstream.NewEncoder(&buf) 181 for _, msg := range eventMsgs { 182 if err := encoder.Encode(msg); err != nil { 183 b.Fatalf("failed to encode message, %v", err) 184 } 185 } 186 stream := &loopReader{source: bytes.NewReader(buf.Bytes())} 187 188 sess := unit.Session 189 svc := New(sess, &aws.Config{ 190 Endpoint: aws.String("https://example.com"), 191 DisableParamValidation: aws.Bool(true), 192 }) 193 svc.Handlers.Send.Swap(corehandlers.SendHandler.Name, 194 request.NamedHandler{Name: "mockSend", 195 Fn: func(r *request.Request) { 196 r.HTTPResponse = &http.Response{ 197 Status: "200 OK", 198 StatusCode: 200, 199 Header: http.Header{}, 200 Body: ioutil.NopCloser(stream), 201 } 202 }, 203 }, 204 ) 205 206 resp, err := svc.SubscribeToShard(nil) 207 if err != nil { 208 b.Fatalf("failed to create request, %v", err) 209 } 210 defer resp.GetStream().Close() 211 b.ResetTimer() 212 213 for i := 0; i < b.N; i++ { 214 if err = resp.GetStream().Err(); err != nil { 215 b.Fatalf("expect no error, got %v", err) 216 } 217 event := <-resp.GetStream().Events() 218 if event == nil { 219 b.Fatalf("expect event, got nil, %v, %d", resp.GetStream().Err(), i) 220 } 221 } 222} 223 224func mockSubscribeToShardReadEvents() ( 225 []SubscribeToShardEventStreamEvent, 226 []eventstream.Message, 227) { 228 expectEvents := []SubscribeToShardEventStreamEvent{ 229 &SubscribeToShardOutput{}, 230 &SubscribeToShardEvent{ 231 ChildShards: []*ChildShard{ 232 { 233 HashKeyRange: &HashKeyRange{ 234 EndingHashKey: aws.String("string value goes here"), 235 StartingHashKey: aws.String("string value goes here"), 236 }, 237 ParentShards: []*string{ 238 aws.String("string value goes here"), 239 aws.String("string value goes here"), 240 aws.String("string value goes here"), 241 }, 242 ShardId: aws.String("string value goes here"), 243 }, 244 { 245 HashKeyRange: &HashKeyRange{ 246 EndingHashKey: aws.String("string value goes here"), 247 StartingHashKey: aws.String("string value goes here"), 248 }, 249 ParentShards: []*string{ 250 aws.String("string value goes here"), 251 aws.String("string value goes here"), 252 aws.String("string value goes here"), 253 }, 254 ShardId: aws.String("string value goes here"), 255 }, 256 { 257 HashKeyRange: &HashKeyRange{ 258 EndingHashKey: aws.String("string value goes here"), 259 StartingHashKey: aws.String("string value goes here"), 260 }, 261 ParentShards: []*string{ 262 aws.String("string value goes here"), 263 aws.String("string value goes here"), 264 aws.String("string value goes here"), 265 }, 266 ShardId: aws.String("string value goes here"), 267 }, 268 }, 269 ContinuationSequenceNumber: aws.String("string value goes here"), 270 MillisBehindLatest: aws.Int64(1234), 271 Records: []*Record{ 272 { 273 ApproximateArrivalTimestamp: aws.Time(time.Unix(1396594860, 0).UTC()), 274 Data: []byte("blob value goes here"), 275 EncryptionType: aws.String("string value goes here"), 276 PartitionKey: aws.String("string value goes here"), 277 SequenceNumber: aws.String("string value goes here"), 278 }, 279 { 280 ApproximateArrivalTimestamp: aws.Time(time.Unix(1396594860, 0).UTC()), 281 Data: []byte("blob value goes here"), 282 EncryptionType: aws.String("string value goes here"), 283 PartitionKey: aws.String("string value goes here"), 284 SequenceNumber: aws.String("string value goes here"), 285 }, 286 { 287 ApproximateArrivalTimestamp: aws.Time(time.Unix(1396594860, 0).UTC()), 288 Data: []byte("blob value goes here"), 289 EncryptionType: aws.String("string value goes here"), 290 PartitionKey: aws.String("string value goes here"), 291 SequenceNumber: aws.String("string value goes here"), 292 }, 293 }, 294 }, 295 } 296 297 var marshalers request.HandlerList 298 marshalers.PushBackNamed(jsonrpc.BuildHandler) 299 payloadMarshaler := protocol.HandlerPayloadMarshal{ 300 Marshalers: marshalers, 301 } 302 _ = payloadMarshaler 303 304 eventMsgs := []eventstream.Message{ 305 { 306 Headers: eventstream.Headers{ 307 eventstreamtest.EventMessageTypeHeader, 308 { 309 Name: eventstreamapi.EventTypeHeader, 310 Value: eventstream.StringValue("initial-response"), 311 }, 312 }, 313 Payload: eventstreamtest.MarshalEventPayload(payloadMarshaler, expectEvents[0]), 314 }, 315 { 316 Headers: eventstream.Headers{ 317 eventstreamtest.EventMessageTypeHeader, 318 { 319 Name: eventstreamapi.EventTypeHeader, 320 Value: eventstream.StringValue("SubscribeToShardEvent"), 321 }, 322 }, 323 Payload: eventstreamtest.MarshalEventPayload(payloadMarshaler, expectEvents[1]), 324 }, 325 } 326 327 return expectEvents, eventMsgs 328} 329func TestSubscribeToShard_ReadException(t *testing.T) { 330 expectEvents := []SubscribeToShardEventStreamEvent{ 331 &SubscribeToShardOutput{}, 332 &InternalFailureException{ 333 RespMetadata: protocol.ResponseMetadata{ 334 StatusCode: 200, 335 }, 336 Message_: aws.String("string value goes here"), 337 }, 338 } 339 340 var marshalers request.HandlerList 341 marshalers.PushBackNamed(jsonrpc.BuildHandler) 342 payloadMarshaler := protocol.HandlerPayloadMarshal{ 343 Marshalers: marshalers, 344 } 345 346 eventMsgs := []eventstream.Message{ 347 { 348 Headers: eventstream.Headers{ 349 eventstreamtest.EventMessageTypeHeader, 350 { 351 Name: eventstreamapi.EventTypeHeader, 352 Value: eventstream.StringValue("initial-response"), 353 }, 354 }, 355 Payload: eventstreamtest.MarshalEventPayload(payloadMarshaler, expectEvents[0]), 356 }, 357 { 358 Headers: eventstream.Headers{ 359 eventstreamtest.EventExceptionTypeHeader, 360 { 361 Name: eventstreamapi.ExceptionTypeHeader, 362 Value: eventstream.StringValue("InternalFailureException"), 363 }, 364 }, 365 Payload: eventstreamtest.MarshalEventPayload(payloadMarshaler, expectEvents[1]), 366 }, 367 } 368 369 sess, cleanupFn, err := eventstreamtest.SetupEventStreamSession(t, 370 eventstreamtest.ServeEventStream{ 371 T: t, 372 Events: eventMsgs, 373 }, 374 true, 375 ) 376 if err != nil { 377 t.Fatalf("expect no error, %v", err) 378 } 379 defer cleanupFn() 380 381 svc := New(sess) 382 resp, err := svc.SubscribeToShard(nil) 383 if err != nil { 384 t.Fatalf("expect no error got, %v", err) 385 } 386 387 defer resp.GetStream().Close() 388 389 <-resp.GetStream().Events() 390 391 err = resp.GetStream().Err() 392 if err == nil { 393 t.Fatalf("expect err, got none") 394 } 395 396 expectErr := &InternalFailureException{ 397 RespMetadata: protocol.ResponseMetadata{ 398 StatusCode: 200, 399 }, 400 Message_: aws.String("string value goes here"), 401 } 402 aerr, ok := err.(awserr.Error) 403 if !ok { 404 t.Errorf("expect exception, got %T, %#v", err, err) 405 } 406 if e, a := expectErr.Code(), aerr.Code(); e != a { 407 t.Errorf("expect %v, got %v", e, a) 408 } 409 if e, a := expectErr.Message(), aerr.Message(); e != a { 410 t.Errorf("expect %v, got %v", e, a) 411 } 412 413 if e, a := expectErr, aerr; !reflect.DeepEqual(e, a) { 414 t.Errorf("expect error %+#v, got %+#v", e, a) 415 } 416} 417 418var _ awserr.Error = (*InternalFailureException)(nil) 419var _ awserr.Error = (*KMSAccessDeniedException)(nil) 420var _ awserr.Error = (*KMSDisabledException)(nil) 421var _ awserr.Error = (*KMSInvalidStateException)(nil) 422var _ awserr.Error = (*KMSNotFoundException)(nil) 423var _ awserr.Error = (*KMSOptInRequired)(nil) 424var _ awserr.Error = (*KMSThrottlingException)(nil) 425var _ awserr.Error = (*ResourceInUseException)(nil) 426var _ awserr.Error = (*ResourceNotFoundException)(nil) 427 428type loopReader struct { 429 source *bytes.Reader 430} 431 432func (c *loopReader) Read(p []byte) (int, error) { 433 if c.source.Len() == 0 { 434 c.source.Seek(0, 0) 435 } 436 437 return c.source.Read(p) 438} 439