1// +build codegen 2 3package api 4 5import ( 6 "bytes" 7 "fmt" 8 "text/template" 9) 10 11// EventStreamAPI provides details about the event stream async API and 12// associated EventStream shapes. 13type EventStreamAPI struct { 14 API *API 15 Operation *Operation 16 Name string 17 InputStream *EventStream 18 OutputStream *EventStream 19 RequireHTTP2 bool 20 21 // The eventstream generated code was generated with an older model that 22 // does not scale with bi-directional models. This drives the need to 23 // expose the output shape's event stream member as an exported member. 24 Legacy bool 25} 26 27func (es *EventStreamAPI) StreamInputEventTypeGetterName() string { 28 return "eventTypeFor" + es.Name + "InputEvent" 29} 30func (es *EventStreamAPI) StreamOutputUnmarshalerForEventName() string { 31 return "eventTypeFor" + es.Name + "OutputEvent" 32} 33 34// EventStream represents a single eventstream group (input/output) and the 35// modeled events that are known for the stream. 36type EventStream struct { 37 Name string 38 Shape *Shape 39 Events []*Event 40 Exceptions []*Event 41} 42 43func (es *EventStream) EventGroupName() string { 44 return es.Name + "Event" 45} 46 47func (es *EventStream) StreamWriterAPIName() string { 48 return es.Name + "Writer" 49} 50 51func (es *EventStream) StreamWriterImplName() string { 52 return "write" + es.Name 53} 54 55func (es *EventStream) StreamEventTypeGetterName() string { 56 return "eventTypeFor" + es.Name + "Event" 57} 58 59func (es *EventStream) StreamReaderAPIName() string { 60 return es.Name + "Reader" 61} 62 63func (es *EventStream) StreamReaderImplName() string { 64 return "read" + es.Name 65} 66func (es *EventStream) StreamReaderImplConstructorName() string { 67 return "newRead" + es.Name 68} 69 70func (es *EventStream) StreamUnmarshalerForEventName() string { 71 return "unmarshalerFor" + es.Name + "Event" 72} 73 74func (es *EventStream) StreamUnknownEventName() string { 75 return es.Name + "UnknownEvent" 76} 77 78// Event is a single EventStream event that can be sent or received in an 79// EventStream. 80type Event struct { 81 Name string 82 Shape *Shape 83 For *EventStream 84 Private bool 85} 86 87// ShapeDoc returns the docstring for the EventStream API. 88func (esAPI *EventStreamAPI) ShapeDoc() string { 89 tmpl := template.Must(template.New("eventStreamShapeDoc").Parse(` 90{{- $.Name }} provides handling of EventStreams for 91the {{ $.Operation.ExportedName }} API. 92 93{{- if $.OutputStream }} 94 95Use this type to receive {{ $.OutputStream.Name }} events. The events 96can be read from the stream. 97 98The events that can be received are: 99{{- range $_, $event := $.OutputStream.Events }} 100 * {{ $event.Shape.ShapeName }} 101{{- end }} 102 103{{- end }} 104 105{{- if $.InputStream }} 106 107Use this type to send {{ $.InputStream.Name }} events. The events 108can be written to the stream. 109 110The events that can be sent are: 111{{ range $_, $event := $.InputStream.Events -}} 112 * {{ $event.Shape.ShapeName }} 113{{- end }} 114 115{{- end }}`)) 116 117 var w bytes.Buffer 118 if err := tmpl.Execute(&w, esAPI); err != nil { 119 panic(fmt.Sprintf("failed to generate eventstream shape template for %v, %v", 120 esAPI.Operation.ExportedName, err)) 121 } 122 123 return commentify(w.String()) 124} 125 126func hasEventStream(topShape *Shape) bool { 127 for _, ref := range topShape.MemberRefs { 128 if ref.Shape.IsEventStream { 129 return true 130 } 131 } 132 133 return false 134} 135 136func eventStreamAPIShapeRefDoc(refName string) string { 137 return commentify(fmt.Sprintf("Use %s to use the API's stream.", refName)) 138} 139 140func (a *API) setupEventStreams() error { 141 streams := EventStreams{} 142 143 for opName, op := range a.Operations { 144 inputRef := getEventStreamMember(op.InputRef.Shape) 145 outputRef := getEventStreamMember(op.OutputRef.Shape) 146 147 if inputRef == nil && outputRef == nil { 148 continue 149 } 150 if inputRef != nil && outputRef == nil { 151 return fmt.Errorf("event stream input only stream not supported for protocol %s, %s, %v", 152 a.NiceName(), opName, a.Metadata.Protocol) 153 } 154 switch a.Metadata.Protocol { 155 case `rest-json`, `rest-xml`, `json`: 156 default: 157 return UnsupportedAPIModelError{ 158 Err: fmt.Errorf("EventStream not supported for protocol %s, %s, %v", 159 a.NiceName(), opName, a.Metadata.Protocol), 160 } 161 } 162 163 var inputStream *EventStream 164 if inputRef != nil { 165 inputStream = streams.GetStream(op.InputRef.Shape, inputRef.Shape) 166 inputStream.Shape.IsInputEventStream = true 167 } 168 169 var outputStream *EventStream 170 if outputRef != nil { 171 outputStream = streams.GetStream(op.OutputRef.Shape, outputRef.Shape) 172 outputStream.Shape.IsOutputEventStream = true 173 } 174 175 requireHTTP2 := op.API.Metadata.ProtocolSettings.HTTP2 == "eventstream" && 176 inputStream != nil && outputStream != nil 177 178 a.HasEventStream = true 179 op.EventStreamAPI = &EventStreamAPI{ 180 API: a, 181 Operation: op, 182 Name: op.ExportedName + "EventStream", 183 InputStream: inputStream, 184 OutputStream: outputStream, 185 Legacy: isLegacyEventStream(op), 186 RequireHTTP2: requireHTTP2, 187 } 188 op.OutputRef.Shape.OutputEventStreamAPI = op.EventStreamAPI 189 190 if s, ok := a.Shapes[op.EventStreamAPI.Name]; ok { 191 newName := op.EventStreamAPI.Name + "Data" 192 if _, ok := a.Shapes[newName]; ok { 193 panic(fmt.Sprintf( 194 "%s: attempting to rename %s to %s, but shape with that name already exists", 195 a.NiceName(), op.EventStreamAPI.Name, newName)) 196 } 197 s.Rename(newName) 198 } 199 } 200 201 return nil 202} 203 204// EventStreams is a map of streams for the API shared across all operations. 205// Ensurs that no stream is duplicated. 206type EventStreams map[*Shape]*EventStream 207 208// GetStream returns an EventStream for the operations top level shape, and 209// member reference to the stream shape. 210func (es *EventStreams) GetStream(topShape *Shape, streamShape *Shape) *EventStream { 211 var stream *EventStream 212 if v, ok := (*es)[streamShape]; ok { 213 stream = v 214 } else { 215 stream = setupEventStream(streamShape) 216 (*es)[streamShape] = stream 217 } 218 219 if topShape.API.Metadata.Protocol == "json" { 220 if topShape.EventFor == nil { 221 topShape.EventFor = map[string]*EventStream{} 222 } 223 topShape.EventFor[stream.Name] = stream 224 } 225 226 return stream 227} 228 229var legacyEventStream = map[string]map[string]struct{}{ 230 "s3": { 231 "SelectObjectContent": struct{}{}, 232 }, 233 "kinesis": { 234 "SubscribeToShard": struct{}{}, 235 }, 236} 237 238func isLegacyEventStream(op *Operation) bool { 239 if s, ok := legacyEventStream[op.API.PackageName()]; ok { 240 if _, ok = s[op.ExportedName]; ok { 241 return true 242 } 243 } 244 return false 245} 246 247func (e EventStreamAPI) OutputMemberName() string { 248 if e.Legacy { 249 return "EventStream" 250 } 251 252 return "eventStream" 253} 254 255func getEventStreamMember(topShape *Shape) *ShapeRef { 256 for _, ref := range topShape.MemberRefs { 257 if !ref.Shape.IsEventStream { 258 continue 259 } 260 return ref 261 } 262 263 return nil 264} 265 266func setupEventStream(s *Shape) *EventStream { 267 eventStream := &EventStream{ 268 Name: s.ShapeName, 269 Shape: s, 270 } 271 s.EventStream = eventStream 272 273 for _, eventRefName := range s.MemberNames() { 274 eventRef := s.MemberRefs[eventRefName] 275 if !(eventRef.Shape.IsEvent || eventRef.Shape.Exception) { 276 panic(fmt.Sprintf("unexpected non-event member reference %s.%s", 277 s.ShapeName, eventRefName)) 278 } 279 280 updateEventPayloadRef(eventRef.Shape) 281 282 if eventRef.Shape.EventFor == nil { 283 eventRef.Shape.EventFor = map[string]*EventStream{} 284 } 285 eventRef.Shape.EventFor[eventStream.Name] = eventStream 286 287 // Exceptions and events are two different lists to allow the SDK 288 // to easily generate code with the two handled differently. 289 event := &Event{ 290 Name: eventRefName, 291 Shape: eventRef.Shape, 292 For: eventStream, 293 } 294 if eventRef.Shape.Exception { 295 eventStream.Exceptions = append(eventStream.Exceptions, event) 296 } else { 297 eventStream.Events = append(eventStream.Events, event) 298 } 299 } 300 301 return eventStream 302} 303 304func updateEventPayloadRef(parent *Shape) { 305 refName := parent.PayloadRefName() 306 if len(refName) == 0 { 307 return 308 } 309 310 payloadRef := parent.MemberRefs[refName] 311 if payloadRef.Shape.Type == "blob" { 312 return 313 } 314 315 if len(payloadRef.LocationName) != 0 { 316 return 317 } 318 319 payloadRef.LocationName = refName 320} 321