1// Code generated by private/model/cli/gen-api/main.go. DO NOT EDIT.
2
3// +build go1.10
4
5package kinesis
6
7import (
8	"bytes"
9	"context"
10	"io/ioutil"
11	"net/http"
12	"reflect"
13	"strings"
14	"sync"
15	"testing"
16	"time"
17
18	"github.com/aws/aws-sdk-go/aws"
19	"github.com/aws/aws-sdk-go/aws/awserr"
20	"github.com/aws/aws-sdk-go/aws/corehandlers"
21	"github.com/aws/aws-sdk-go/aws/request"
22	"github.com/aws/aws-sdk-go/awstesting/unit"
23	"github.com/aws/aws-sdk-go/private/protocol"
24	"github.com/aws/aws-sdk-go/private/protocol/eventstream"
25	"github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi"
26	"github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamtest"
27	"github.com/aws/aws-sdk-go/private/protocol/jsonrpc"
28)
29
30var _ time.Time
31var _ awserr.Error
32var _ context.Context
33var _ sync.WaitGroup
34var _ strings.Reader
35
36func TestSubscribeToShard_Read(t *testing.T) {
37	expectEvents, eventMsgs := mockSubscribeToShardReadEvents()
38	sess, cleanupFn, err := eventstreamtest.SetupEventStreamSession(t,
39		eventstreamtest.ServeEventStream{
40			T:      t,
41			Events: eventMsgs,
42		},
43		true,
44	)
45	if err != nil {
46		t.Fatalf("expect no error, %v", err)
47	}
48	defer cleanupFn()
49
50	svc := New(sess)
51	resp, err := svc.SubscribeToShard(nil)
52	if err != nil {
53		t.Fatalf("expect no error got, %v", err)
54	}
55	defer resp.GetStream().Close()
56	// Trim off response output type pseudo event so only event messages remain.
57	expectEvents = expectEvents[1:]
58
59	var i int
60	for event := range resp.GetStream().Events() {
61		if event == nil {
62			t.Errorf("%d, expect event, got nil", i)
63		}
64		if e, a := expectEvents[i], event; !reflect.DeepEqual(e, a) {
65			t.Errorf("%d, expect %T %v, got %T %v", i, e, e, a, a)
66		}
67		i++
68	}
69
70	if err := resp.GetStream().Err(); err != nil {
71		t.Errorf("expect no error, %v", err)
72	}
73}
74
75func TestSubscribeToShard_ReadClose(t *testing.T) {
76	_, eventMsgs := mockSubscribeToShardReadEvents()
77	sess, cleanupFn, err := eventstreamtest.SetupEventStreamSession(t,
78		eventstreamtest.ServeEventStream{
79			T:      t,
80			Events: eventMsgs,
81		},
82		true,
83	)
84	if err != nil {
85		t.Fatalf("expect no error, %v", err)
86	}
87	defer cleanupFn()
88
89	svc := New(sess)
90	resp, err := svc.SubscribeToShard(nil)
91	if err != nil {
92		t.Fatalf("expect no error got, %v", err)
93	}
94
95	// Assert calling Err before close does not close the stream.
96	resp.GetStream().Err()
97	select {
98	case _, ok := <-resp.GetStream().Events():
99		if !ok {
100			t.Fatalf("expect stream not to be closed, but was")
101		}
102	default:
103	}
104
105	resp.GetStream().Close()
106	<-resp.GetStream().Events()
107
108	if err := resp.GetStream().Err(); err != nil {
109		t.Errorf("expect no error, %v", err)
110	}
111}
112
113func TestSubscribeToShard_ReadUnknownEvent(t *testing.T) {
114	expectEvents, eventMsgs := mockSubscribeToShardReadEvents()
115	eventOffset := 1
116
117	unknownEvent := eventstream.Message{
118		Headers: eventstream.Headers{
119			eventstreamtest.EventMessageTypeHeader,
120			{
121				Name:  eventstreamapi.EventTypeHeader,
122				Value: eventstream.StringValue("UnknownEventName"),
123			},
124		},
125		Payload: []byte("some unknown event"),
126	}
127
128	eventMsgs = append(eventMsgs[:eventOffset],
129		append([]eventstream.Message{unknownEvent}, eventMsgs[eventOffset:]...)...)
130
131	expectEvents = append(expectEvents[:eventOffset],
132		append([]SubscribeToShardEventStreamEvent{
133			&SubscribeToShardEventStreamUnknownEvent{
134				Type:    "UnknownEventName",
135				Message: unknownEvent,
136			},
137		},
138			expectEvents[eventOffset:]...)...)
139
140	sess, cleanupFn, err := eventstreamtest.SetupEventStreamSession(t,
141		eventstreamtest.ServeEventStream{
142			T:      t,
143			Events: eventMsgs,
144		},
145		true,
146	)
147	if err != nil {
148		t.Fatalf("expect no error, %v", err)
149	}
150	defer cleanupFn()
151
152	svc := New(sess)
153	resp, err := svc.SubscribeToShard(nil)
154	if err != nil {
155		t.Fatalf("expect no error got, %v", err)
156	}
157	defer resp.GetStream().Close()
158	// Trim off response output type pseudo event so only event messages remain.
159	expectEvents = expectEvents[1:]
160
161	var i int
162	for event := range resp.GetStream().Events() {
163		if event == nil {
164			t.Errorf("%d, expect event, got nil", i)
165		}
166		if e, a := expectEvents[i], event; !reflect.DeepEqual(e, a) {
167			t.Errorf("%d, expect %T %v, got %T %v", i, e, e, a, a)
168		}
169		i++
170	}
171
172	if err := resp.GetStream().Err(); err != nil {
173		t.Errorf("expect no error, %v", err)
174	}
175}
176
177func BenchmarkSubscribeToShard_Read(b *testing.B) {
178	_, eventMsgs := mockSubscribeToShardReadEvents()
179	var buf bytes.Buffer
180	encoder := eventstream.NewEncoder(&buf)
181	for _, msg := range eventMsgs {
182		if err := encoder.Encode(msg); err != nil {
183			b.Fatalf("failed to encode message, %v", err)
184		}
185	}
186	stream := &loopReader{source: bytes.NewReader(buf.Bytes())}
187
188	sess := unit.Session
189	svc := New(sess, &aws.Config{
190		Endpoint:               aws.String("https://example.com"),
191		DisableParamValidation: aws.Bool(true),
192	})
193	svc.Handlers.Send.Swap(corehandlers.SendHandler.Name,
194		request.NamedHandler{Name: "mockSend",
195			Fn: func(r *request.Request) {
196				r.HTTPResponse = &http.Response{
197					Status:     "200 OK",
198					StatusCode: 200,
199					Header:     http.Header{},
200					Body:       ioutil.NopCloser(stream),
201				}
202			},
203		},
204	)
205
206	resp, err := svc.SubscribeToShard(nil)
207	if err != nil {
208		b.Fatalf("failed to create request, %v", err)
209	}
210	defer resp.GetStream().Close()
211	b.ResetTimer()
212
213	for i := 0; i < b.N; i++ {
214		if err = resp.GetStream().Err(); err != nil {
215			b.Fatalf("expect no error, got %v", err)
216		}
217		event := <-resp.GetStream().Events()
218		if event == nil {
219			b.Fatalf("expect event, got nil, %v, %d", resp.GetStream().Err(), i)
220		}
221	}
222}
223
224func mockSubscribeToShardReadEvents() (
225	[]SubscribeToShardEventStreamEvent,
226	[]eventstream.Message,
227) {
228	expectEvents := []SubscribeToShardEventStreamEvent{
229		&SubscribeToShardOutput{},
230		&SubscribeToShardEvent{
231			ChildShards: []*ChildShard{
232				{
233					HashKeyRange: &HashKeyRange{
234						EndingHashKey:   aws.String("string value goes here"),
235						StartingHashKey: aws.String("string value goes here"),
236					},
237					ParentShards: []*string{
238						aws.String("string value goes here"),
239						aws.String("string value goes here"),
240						aws.String("string value goes here"),
241					},
242					ShardId: aws.String("string value goes here"),
243				},
244				{
245					HashKeyRange: &HashKeyRange{
246						EndingHashKey:   aws.String("string value goes here"),
247						StartingHashKey: aws.String("string value goes here"),
248					},
249					ParentShards: []*string{
250						aws.String("string value goes here"),
251						aws.String("string value goes here"),
252						aws.String("string value goes here"),
253					},
254					ShardId: aws.String("string value goes here"),
255				},
256				{
257					HashKeyRange: &HashKeyRange{
258						EndingHashKey:   aws.String("string value goes here"),
259						StartingHashKey: aws.String("string value goes here"),
260					},
261					ParentShards: []*string{
262						aws.String("string value goes here"),
263						aws.String("string value goes here"),
264						aws.String("string value goes here"),
265					},
266					ShardId: aws.String("string value goes here"),
267				},
268			},
269			ContinuationSequenceNumber: aws.String("string value goes here"),
270			MillisBehindLatest:         aws.Int64(1234),
271			Records: []*Record{
272				{
273					ApproximateArrivalTimestamp: aws.Time(time.Unix(1396594860, 0).UTC()),
274					Data:                        []byte("blob value goes here"),
275					EncryptionType:              aws.String("string value goes here"),
276					PartitionKey:                aws.String("string value goes here"),
277					SequenceNumber:              aws.String("string value goes here"),
278				},
279				{
280					ApproximateArrivalTimestamp: aws.Time(time.Unix(1396594860, 0).UTC()),
281					Data:                        []byte("blob value goes here"),
282					EncryptionType:              aws.String("string value goes here"),
283					PartitionKey:                aws.String("string value goes here"),
284					SequenceNumber:              aws.String("string value goes here"),
285				},
286				{
287					ApproximateArrivalTimestamp: aws.Time(time.Unix(1396594860, 0).UTC()),
288					Data:                        []byte("blob value goes here"),
289					EncryptionType:              aws.String("string value goes here"),
290					PartitionKey:                aws.String("string value goes here"),
291					SequenceNumber:              aws.String("string value goes here"),
292				},
293			},
294		},
295	}
296
297	var marshalers request.HandlerList
298	marshalers.PushBackNamed(jsonrpc.BuildHandler)
299	payloadMarshaler := protocol.HandlerPayloadMarshal{
300		Marshalers: marshalers,
301	}
302	_ = payloadMarshaler
303
304	eventMsgs := []eventstream.Message{
305		{
306			Headers: eventstream.Headers{
307				eventstreamtest.EventMessageTypeHeader,
308				{
309					Name:  eventstreamapi.EventTypeHeader,
310					Value: eventstream.StringValue("initial-response"),
311				},
312			},
313			Payload: eventstreamtest.MarshalEventPayload(payloadMarshaler, expectEvents[0]),
314		},
315		{
316			Headers: eventstream.Headers{
317				eventstreamtest.EventMessageTypeHeader,
318				{
319					Name:  eventstreamapi.EventTypeHeader,
320					Value: eventstream.StringValue("SubscribeToShardEvent"),
321				},
322			},
323			Payload: eventstreamtest.MarshalEventPayload(payloadMarshaler, expectEvents[1]),
324		},
325	}
326
327	return expectEvents, eventMsgs
328}
329func TestSubscribeToShard_ReadException(t *testing.T) {
330	expectEvents := []SubscribeToShardEventStreamEvent{
331		&SubscribeToShardOutput{},
332		&InternalFailureException{
333			RespMetadata: protocol.ResponseMetadata{
334				StatusCode: 200,
335			},
336			Message_: aws.String("string value goes here"),
337		},
338	}
339
340	var marshalers request.HandlerList
341	marshalers.PushBackNamed(jsonrpc.BuildHandler)
342	payloadMarshaler := protocol.HandlerPayloadMarshal{
343		Marshalers: marshalers,
344	}
345
346	eventMsgs := []eventstream.Message{
347		{
348			Headers: eventstream.Headers{
349				eventstreamtest.EventMessageTypeHeader,
350				{
351					Name:  eventstreamapi.EventTypeHeader,
352					Value: eventstream.StringValue("initial-response"),
353				},
354			},
355			Payload: eventstreamtest.MarshalEventPayload(payloadMarshaler, expectEvents[0]),
356		},
357		{
358			Headers: eventstream.Headers{
359				eventstreamtest.EventExceptionTypeHeader,
360				{
361					Name:  eventstreamapi.ExceptionTypeHeader,
362					Value: eventstream.StringValue("InternalFailureException"),
363				},
364			},
365			Payload: eventstreamtest.MarshalEventPayload(payloadMarshaler, expectEvents[1]),
366		},
367	}
368
369	sess, cleanupFn, err := eventstreamtest.SetupEventStreamSession(t,
370		eventstreamtest.ServeEventStream{
371			T:      t,
372			Events: eventMsgs,
373		},
374		true,
375	)
376	if err != nil {
377		t.Fatalf("expect no error, %v", err)
378	}
379	defer cleanupFn()
380
381	svc := New(sess)
382	resp, err := svc.SubscribeToShard(nil)
383	if err != nil {
384		t.Fatalf("expect no error got, %v", err)
385	}
386
387	defer resp.GetStream().Close()
388
389	<-resp.GetStream().Events()
390
391	err = resp.GetStream().Err()
392	if err == nil {
393		t.Fatalf("expect err, got none")
394	}
395
396	expectErr := &InternalFailureException{
397		RespMetadata: protocol.ResponseMetadata{
398			StatusCode: 200,
399		},
400		Message_: aws.String("string value goes here"),
401	}
402	aerr, ok := err.(awserr.Error)
403	if !ok {
404		t.Errorf("expect exception, got %T, %#v", err, err)
405	}
406	if e, a := expectErr.Code(), aerr.Code(); e != a {
407		t.Errorf("expect %v, got %v", e, a)
408	}
409	if e, a := expectErr.Message(), aerr.Message(); e != a {
410		t.Errorf("expect %v, got %v", e, a)
411	}
412
413	if e, a := expectErr, aerr; !reflect.DeepEqual(e, a) {
414		t.Errorf("expect error %+#v, got %+#v", e, a)
415	}
416}
417
418var _ awserr.Error = (*InternalFailureException)(nil)
419var _ awserr.Error = (*KMSAccessDeniedException)(nil)
420var _ awserr.Error = (*KMSDisabledException)(nil)
421var _ awserr.Error = (*KMSInvalidStateException)(nil)
422var _ awserr.Error = (*KMSNotFoundException)(nil)
423var _ awserr.Error = (*KMSOptInRequired)(nil)
424var _ awserr.Error = (*KMSThrottlingException)(nil)
425var _ awserr.Error = (*ResourceInUseException)(nil)
426var _ awserr.Error = (*ResourceNotFoundException)(nil)
427
428type loopReader struct {
429	source *bytes.Reader
430}
431
432func (c *loopReader) Read(p []byte) (int, error) {
433	if c.source.Len() == 0 {
434		c.source.Seek(0, 0)
435	}
436
437	return c.source.Read(p)
438}
439