1// +build codegen
2
3package api
4
5import "text/template"
6
// eventStreamShapeReaderTmpl is the code generation template that renders the
// reader side of an EventStream shape. The template is executed with a value
// exposing ShapeName and EventStream, and emits:
//
//   - the stream reader interface (StreamReaderAPIName),
//   - its channel based implementation (StreamReaderImplName) together with
//     its constructor and background read loop,
//   - the type resolving an event name to an unmarshaler
//     (StreamUnmarshalerForEventName), and
//   - the fallback event type used for unrecognized event names
//     (StreamUnknownEventName).
//
// The template is parsed once at package initialization; template.Must panics
// if the template text fails to parse.
var eventStreamShapeReaderTmpl = template.Must(template.New("eventStreamShapeReaderTmpl").
	Funcs(template.FuncMap{}).
	Parse(`
{{- $es := $.EventStream }}

// {{ $es.StreamReaderAPIName }} provides the interface for reading from the stream. The
// default implementation for this interface will be {{ $.ShapeName }}.
//
// The reader's Close method must allow multiple concurrent calls.
//
// These events are:
// {{ range $_, $event := $es.Events }}
//     * {{ $event.Shape.ShapeName }}
{{- end }}
//     * {{ $es.StreamUnknownEventName }}
type {{ $es.StreamReaderAPIName }} interface {
	// Returns a channel of events as they are read from the event stream.
	Events() <-chan {{ $es.EventGroupName }}

	// Close will stop the reader reading events from the stream.
	Close() error

	// Returns any error that has occurred while reading from the event stream.
	Err() error
}

type {{ $es.StreamReaderImplName }} struct {
	eventReader *eventstreamapi.EventReader
	stream      chan {{ $es.EventGroupName }}
	err         *eventstreamapi.OnceError

	done      chan struct{}
	closeOnce sync.Once
}

func {{ $es.StreamReaderImplConstructorName }}(eventReader *eventstreamapi.EventReader) *{{ $es.StreamReaderImplName }} {
	r := &{{ $es.StreamReaderImplName }}{
		eventReader: eventReader,
		stream: make(chan {{ $es.EventGroupName }}),
		done:   make(chan struct{}),
		err:    eventstreamapi.NewOnceError(),
	}
	go r.readEventStream()

	return r
}

// Close will close the underlying event stream reader.
func (r *{{ $es.StreamReaderImplName }}) Close() error {
	r.closeOnce.Do(r.safeClose)
	return r.Err()
}

// ErrorSet returns the ErrorSet channel of the error value tracked by the
// reader.
func (r *{{ $es.StreamReaderImplName }}) ErrorSet() <-chan struct{} {
	return r.err.ErrorSet()
}

// Closed returns a channel that is closed when the reader is closed.
func (r *{{ $es.StreamReaderImplName }}) Closed() <-chan struct{} {
	return r.done
}

func (r *{{ $es.StreamReaderImplName }}) safeClose() {
	close(r.done)
}

// Err returns any error that has occurred while reading from the event stream.
func (r *{{ $es.StreamReaderImplName }}) Err() error {
	return r.err.Err()
}

// Events returns a channel of events as they are read from the event stream.
func (r *{{ $es.StreamReaderImplName }}) Events() <-chan {{ $es.EventGroupName }} {
	return r.stream
}

func (r *{{ $es.StreamReaderImplName }}) readEventStream() {
	defer r.Close()
	defer close(r.stream)

	for {
		event, err := r.eventReader.ReadEvent()
		if err != nil {
			if err == io.EOF {
				return
			}
			select {
			case <-r.done:
				// If closed already ignore the error
				return
			default:
			}
			if _, ok := err.(*eventstreamapi.UnknownMessageTypeError); ok {
				continue
			}
			r.err.SetError(err)
			return
		}

		select {
		case r.stream <- event.({{ $es.EventGroupName }}):
		case <-r.done:
			return
		}
	}
}

type {{ $es.StreamUnmarshalerForEventName }} struct {
	metadata protocol.ResponseMetadata
}

func (u {{ $es.StreamUnmarshalerForEventName }}) UnmarshalerForEventName(eventType string) (eventstreamapi.Unmarshaler, error) {
	switch eventType {
		{{- range $_, $event := $es.Events }}
			case {{ printf "%q" $event.Name }}:
				return &{{ $event.Shape.ShapeName }}{}, nil
		{{- end }}
		{{- range $_, $event := $es.Exceptions }}
			case {{ printf "%q" $event.Name }}:
				return newError{{ $event.Shape.ShapeName }}(u.metadata).(eventstreamapi.Unmarshaler), nil
		{{- end }}
	default:
		return &{{ $es.StreamUnknownEventName }}{Type: eventType}, nil
	}
}

// {{ $es.StreamUnknownEventName }} provides a failsafe event for the
// {{ $es.Name }} group of events when an unknown event is received.
type {{ $es.StreamUnknownEventName }} struct {
	Type string
	Message eventstream.Message
}

// The {{ $es.StreamUnknownEventName }} is an event in the {{ $es.Name }}
// group of events.
func (e *{{ $es.StreamUnknownEventName }}) event{{ $es.Name }}() {}

// MarshalEvent marshals the type into a stream event value. This method
// should only be used internally within the SDK's EventStream handling.
func (e *{{ $es.StreamUnknownEventName }}) MarshalEvent(pm protocol.PayloadMarshaler) (
	msg eventstream.Message, err error,
) {
	return e.Message.Clone(), nil
}

// UnmarshalEvent unmarshals the EventStream Message into the {{ $es.StreamUnknownEventName }} value.
// This method is only used internally within the SDK's EventStream handling.
func (e *{{ $es.StreamUnknownEventName }}) UnmarshalEvent(
	payloadUnmarshaler protocol.PayloadUnmarshaler,
	msg eventstream.Message,
) error {
	e.Message = msg.Clone()
	return nil
}
`))
159