1// +build codegen 2 3package api 4 5import "text/template" 6 7var eventStreamShapeReaderTmpl = template.Must(template.New("eventStreamShapeReaderTmpl"). 8 Funcs(template.FuncMap{}). 9 Parse(` 10{{- $es := $.EventStream }} 11 12// {{ $es.StreamReaderAPIName }} provides the interface for reading to the stream. The 13// default implementation for this interface will be {{ $.ShapeName }}. 14// 15// The reader's Close method must allow multiple concurrent calls. 16// 17// These events are: 18// {{ range $_, $event := $es.Events }} 19// * {{ $event.Shape.ShapeName }} 20{{- end }} 21// * {{ $es.StreamUnknownEventName }} 22type {{ $es.StreamReaderAPIName }} interface { 23 // Returns a channel of events as they are read from the event stream. 24 Events() <-chan {{ $es.EventGroupName }} 25 26 // Close will stop the reader reading events from the stream. 27 Close() error 28 29 // Returns any error that has occurred while reading from the event stream. 30 Err() error 31} 32 33type {{ $es.StreamReaderImplName }} struct { 34 eventReader *eventstreamapi.EventReader 35 stream chan {{ $es.EventGroupName }} 36 err *eventstreamapi.OnceError 37 38 done chan struct{} 39 closeOnce sync.Once 40} 41 42func {{ $es.StreamReaderImplConstructorName }}(eventReader *eventstreamapi.EventReader) *{{ $es.StreamReaderImplName }} { 43 r := &{{ $es.StreamReaderImplName }}{ 44 eventReader: eventReader, 45 stream: make(chan {{ $es.EventGroupName }}), 46 done: make(chan struct{}), 47 err: eventstreamapi.NewOnceError(), 48 } 49 go r.readEventStream() 50 51 return r 52} 53 54// Close will close the underlying event stream reader. 55func (r *{{ $es.StreamReaderImplName }}) Close() error { 56 r.closeOnce.Do(r.safeClose) 57 return r.Err() 58} 59 60func (r *{{ $es.StreamReaderImplName }}) ErrorSet() <-chan struct{} { 61 return r.err.ErrorSet() 62} 63 64func (r *{{ $es.StreamReaderImplName }}) Closed() <-chan struct{} { 65 return r.done 66} 67 68func (r *{{ $es.StreamReaderImplName }}) safeClose() { 69 close(r.done) 70} 71 72func (r *{{ $es.StreamReaderImplName }}) Err() error { 73 return r.err.Err() 74} 75 76func (r *{{ $es.StreamReaderImplName }}) Events() <-chan {{ $es.EventGroupName }} { 77 return r.stream 78} 79 80func (r *{{ $es.StreamReaderImplName }}) readEventStream() { 81 defer r.Close() 82 defer close(r.stream) 83 84 for { 85 event, err := r.eventReader.ReadEvent() 86 if err != nil { 87 if err == io.EOF { 88 return 89 } 90 select { 91 case <-r.done: 92 // If closed already ignore the error 93 return 94 default: 95 } 96 if _, ok := err.(*eventstreamapi.UnknownMessageTypeError); ok { 97 continue 98 } 99 r.err.SetError(err) 100 return 101 } 102 103 select { 104 case r.stream <- event.({{ $es.EventGroupName }}): 105 case <-r.done: 106 return 107 } 108 } 109} 110 111type {{ $es.StreamUnmarshalerForEventName }} struct { 112 metadata protocol.ResponseMetadata 113} 114 115func (u {{ $es.StreamUnmarshalerForEventName }}) UnmarshalerForEventName(eventType string) (eventstreamapi.Unmarshaler, error) { 116 switch eventType { 117 {{- range $_, $event := $es.Events }} 118 case {{ printf "%q" $event.Name }}: 119 return &{{ $event.Shape.ShapeName }}{}, nil 120 {{- end }} 121 {{- range $_, $event := $es.Exceptions }} 122 case {{ printf "%q" $event.Name }}: 123 return newError{{ $event.Shape.ShapeName }}(u.metadata).(eventstreamapi.Unmarshaler), nil 124 {{- end }} 125 default: 126 return &{{ $es.StreamUnknownEventName }}{Type: eventType}, nil 127 } 128} 129 130// {{ $es.StreamUnknownEventName }} provides a failsafe event for the 131// {{ $es.Name }} group of events when an unknown event is received. 132type {{ $es.StreamUnknownEventName }} struct { 133 Type string 134 Message eventstream.Message 135} 136 137// The {{ $es.StreamUnknownEventName }} is and event in the {{ $es.Name }} 138// group of events. 139func (s *{{ $es.StreamUnknownEventName }}) event{{ $es.Name }}() {} 140 141// MarshalEvent marshals the type into an stream event value. This method 142// should only used internally within the SDK's EventStream handling. 143func (e *{{ $es.StreamUnknownEventName }}) MarshalEvent(pm protocol.PayloadMarshaler) ( 144 msg eventstream.Message, err error, 145) { 146 return e.Message.Clone(), nil 147} 148 149// UnmarshalEvent unmarshals the EventStream Message into the {{ $.ShapeName }} value. 150// This method is only used internally within the SDK's EventStream handling. 151func (e *{{ $es.StreamUnknownEventName }}) UnmarshalEvent( 152 payloadUnmarshaler protocol.PayloadUnmarshaler, 153 msg eventstream.Message, 154) error { 155 e.Message = msg.Clone() 156 return nil 157} 158`)) 159