1// +build codegen
2
3package api
4
5import (
6	"bytes"
7	"fmt"
8	"text/template"
9)
10
// EventStreamAPI provides details about the event stream async API and
// associated EventStream shapes.
type EventStreamAPI struct {
	API          *API         // API model the operation belongs to.
	Operation    *Operation   // Operation the event stream API was set up for.
	Name         string       // Operation's exported name suffixed with "EventStream".
	InputStream  *EventStream // Stream of the operation's input shape; nil if it has none.
	OutputStream *EventStream // Stream of the operation's output shape.
	RequireHTTP2 bool         // Set when the HTTP2 protocol setting is "eventstream" and both streams exist.

	// The eventstream generated code was generated with an older model that
	// does not scale with bi-directional models. This drives the need to
	// expose the output shape's event stream member as an exported member.
	Legacy bool
}
26
27func (es *EventStreamAPI) StreamInputEventTypeGetterName() string {
28	return "eventTypeFor" + es.Name + "InputEvent"
29}
30func (es *EventStreamAPI) StreamOutputUnmarshalerForEventName() string {
31	return "eventTypeFor" + es.Name + "OutputEvent"
32}
33
// EventStream represents a single eventstream group (input/output) and the
// modeled events that are known for the stream.
type EventStream struct {
	Name       string   // Shape name of the event stream shape.
	Shape      *Shape   // Event stream shape the stream was built from.
	Events     []*Event // Members whose shape is not flagged as an exception.
	Exceptions []*Event // Members whose shape is flagged as an exception.
}
42
43func (es *EventStream) EventGroupName() string {
44	return es.Name + "Event"
45}
46
47func (es *EventStream) StreamWriterAPIName() string {
48	return es.Name + "Writer"
49}
50
51func (es *EventStream) StreamWriterImplName() string {
52	return "write" + es.Name
53}
54
55func (es *EventStream) StreamEventTypeGetterName() string {
56	return "eventTypeFor" + es.Name + "Event"
57}
58
59func (es *EventStream) StreamReaderAPIName() string {
60	return es.Name + "Reader"
61}
62
63func (es *EventStream) StreamReaderImplName() string {
64	return "read" + es.Name
65}
66func (es *EventStream) StreamReaderImplConstructorName() string {
67	return "newRead" + es.Name
68}
69
70func (es *EventStream) StreamUnmarshalerForEventName() string {
71	return "unmarshalerFor" + es.Name + "Event"
72}
73
74func (es *EventStream) StreamUnknownEventName() string {
75	return es.Name + "UnknownEvent"
76}
77
// Event is a single EventStream event that can be sent or received in an
// EventStream.
type Event struct {
	Name    string       // Member name of the event within the stream shape.
	Shape   *Shape       // Shape referenced by that stream member.
	For     *EventStream // Stream the event was registered on.
	Private bool         // NOTE(review): not set by the setup code in this file; confirm where it is populated.
}
86
87// ShapeDoc returns the docstring for the EventStream API.
88func (esAPI *EventStreamAPI) ShapeDoc() string {
89	tmpl := template.Must(template.New("eventStreamShapeDoc").Parse(`
90{{- $.Name }} provides handling of EventStreams for
91the {{ $.Operation.ExportedName }} API.
92
93{{- if $.OutputStream }}
94
95Use this type to receive {{ $.OutputStream.Name }} events. The events
96can be read from the stream.
97
98The events that can be received are:
99{{- range $_, $event := $.OutputStream.Events }}
100    * {{ $event.Shape.ShapeName }}
101{{- end }}
102
103{{- end }}
104
105{{- if $.InputStream }}
106
107Use this type to send {{ $.InputStream.Name }} events. The events
108can be written to the stream.
109
110The events that can be sent are:
111{{ range $_, $event := $.InputStream.Events -}}
112    * {{ $event.Shape.ShapeName }}
113{{- end }}
114
115{{- end }}`))
116
117	var w bytes.Buffer
118	if err := tmpl.Execute(&w, esAPI); err != nil {
119		panic(fmt.Sprintf("failed to generate eventstream shape template for %v, %v",
120			esAPI.Operation.ExportedName, err))
121	}
122
123	return commentify(w.String())
124}
125
126func hasEventStream(topShape *Shape) bool {
127	for _, ref := range topShape.MemberRefs {
128		if ref.Shape.IsEventStream {
129			return true
130		}
131	}
132
133	return false
134}
135
136func eventStreamAPIShapeRefDoc(refName string) string {
137	return commentify(fmt.Sprintf("Use %s to use the API's stream.", refName))
138}
139
140func (a *API) setupEventStreams() error {
141	streams := EventStreams{}
142
143	for opName, op := range a.Operations {
144		inputRef := getEventStreamMember(op.InputRef.Shape)
145		outputRef := getEventStreamMember(op.OutputRef.Shape)
146
147		if inputRef == nil && outputRef == nil {
148			continue
149		}
150		if inputRef != nil && outputRef == nil {
151			return fmt.Errorf("event stream input only stream not supported for protocol %s, %s, %v",
152				a.NiceName(), opName, a.Metadata.Protocol)
153		}
154		switch a.Metadata.Protocol {
155		case `rest-json`, `rest-xml`, `json`:
156		default:
157			return UnsupportedAPIModelError{
158				Err: fmt.Errorf("EventStream not supported for protocol %s, %s, %v",
159					a.NiceName(), opName, a.Metadata.Protocol),
160			}
161		}
162
163		var inputStream *EventStream
164		if inputRef != nil {
165			inputStream = streams.GetStream(op.InputRef.Shape, inputRef.Shape)
166			inputStream.Shape.IsInputEventStream = true
167		}
168
169		var outputStream *EventStream
170		if outputRef != nil {
171			outputStream = streams.GetStream(op.OutputRef.Shape, outputRef.Shape)
172			outputStream.Shape.IsOutputEventStream = true
173		}
174
175		requireHTTP2 := op.API.Metadata.ProtocolSettings.HTTP2 == "eventstream" &&
176			inputStream != nil && outputStream != nil
177
178		a.HasEventStream = true
179		op.EventStreamAPI = &EventStreamAPI{
180			API:          a,
181			Operation:    op,
182			Name:         op.ExportedName + "EventStream",
183			InputStream:  inputStream,
184			OutputStream: outputStream,
185			Legacy:       isLegacyEventStream(op),
186			RequireHTTP2: requireHTTP2,
187		}
188		op.OutputRef.Shape.OutputEventStreamAPI = op.EventStreamAPI
189
190		if s, ok := a.Shapes[op.EventStreamAPI.Name]; ok {
191			newName := op.EventStreamAPI.Name + "Data"
192			if _, ok := a.Shapes[newName]; ok {
193				panic(fmt.Sprintf(
194					"%s: attempting to rename %s to %s, but shape with that name already exists",
195					a.NiceName(), op.EventStreamAPI.Name, newName))
196			}
197			s.Rename(newName)
198		}
199	}
200
201	return nil
202}
203
// EventStreams is a map of streams for the API shared across all operations.
// Ensures that no stream is duplicated by keying each EventStream on its
// stream shape.
type EventStreams map[*Shape]*EventStream
207
208// GetStream returns an EventStream for the operations top level shape, and
209// member reference to the stream shape.
210func (es *EventStreams) GetStream(topShape *Shape, streamShape *Shape) *EventStream {
211	var stream *EventStream
212	if v, ok := (*es)[streamShape]; ok {
213		stream = v
214	} else {
215		stream = setupEventStream(streamShape)
216		(*es)[streamShape] = stream
217	}
218
219	if topShape.API.Metadata.Protocol == "json" {
220		if topShape.EventFor == nil {
221			topShape.EventFor = map[string]*EventStream{}
222		}
223		topShape.EventFor[stream.Name] = stream
224	}
225
226	return stream
227}
228
// legacyEventStream lists, per API package name, the exported operation
// names that isLegacyEventStream reports as legacy event streams.
var legacyEventStream = map[string]map[string]struct{}{
	"s3": {
		"SelectObjectContent": struct{}{},
	},
	"kinesis": {
		"SubscribeToShard": struct{}{},
	},
}
237
238func isLegacyEventStream(op *Operation) bool {
239	if s, ok := legacyEventStream[op.API.PackageName()]; ok {
240		if _, ok = s[op.ExportedName]; ok {
241			return true
242		}
243	}
244	return false
245}
246
247func (e EventStreamAPI) OutputMemberName() string {
248	if e.Legacy {
249		return "EventStream"
250	}
251
252	return "eventStream"
253}
254
255func getEventStreamMember(topShape *Shape) *ShapeRef {
256	for _, ref := range topShape.MemberRefs {
257		if !ref.Shape.IsEventStream {
258			continue
259		}
260		return ref
261	}
262
263	return nil
264}
265
266func setupEventStream(s *Shape) *EventStream {
267	eventStream := &EventStream{
268		Name:  s.ShapeName,
269		Shape: s,
270	}
271	s.EventStream = eventStream
272
273	for _, eventRefName := range s.MemberNames() {
274		eventRef := s.MemberRefs[eventRefName]
275		if !(eventRef.Shape.IsEvent || eventRef.Shape.Exception) {
276			panic(fmt.Sprintf("unexpected non-event member reference %s.%s",
277				s.ShapeName, eventRefName))
278		}
279
280		updateEventPayloadRef(eventRef.Shape)
281
282		if eventRef.Shape.EventFor == nil {
283			eventRef.Shape.EventFor = map[string]*EventStream{}
284		}
285		eventRef.Shape.EventFor[eventStream.Name] = eventStream
286
287		// Exceptions and events are two different lists to allow the SDK
288		// to easily generate code with the two handled differently.
289		event := &Event{
290			Name:  eventRefName,
291			Shape: eventRef.Shape,
292			For:   eventStream,
293		}
294		if eventRef.Shape.Exception {
295			eventStream.Exceptions = append(eventStream.Exceptions, event)
296		} else {
297			eventStream.Events = append(eventStream.Events, event)
298		}
299	}
300
301	return eventStream
302}
303
304func updateEventPayloadRef(parent *Shape) {
305	refName := parent.PayloadRefName()
306	if len(refName) == 0 {
307		return
308	}
309
310	payloadRef := parent.MemberRefs[refName]
311	if payloadRef.Shape.Type == "blob" {
312		return
313	}
314
315	if len(payloadRef.LocationName) != 0 {
316		return
317	}
318
319	payloadRef.LocationName = refName
320}
321