1//go:build codegen 2// +build codegen 3 4package api 5 6import ( 7 "fmt" 8 "io" 9 "strings" 10 "text/template" 11) 12 13func renderEventStreamAPI(w io.Writer, op *Operation) error { 14 // Imports needed by the EventStream APIs. 15 op.API.AddImport("fmt") 16 op.API.AddImport("bytes") 17 op.API.AddImport("io") 18 op.API.AddImport("time") 19 op.API.AddSDKImport("aws") 20 op.API.AddSDKImport("aws/awserr") 21 op.API.AddSDKImport("aws/request") 22 op.API.AddSDKImport("private/protocol/eventstream") 23 op.API.AddSDKImport("private/protocol/eventstream/eventstreamapi") 24 25 w.Write([]byte(` 26var _ awserr.Error 27`)) 28 29 return eventStreamAPITmpl.Execute(w, op) 30} 31 32// Template for an EventStream API Shape that will provide read/writing events 33// across the EventStream. This is a special shape that's only public members 34// are the Events channel and a Close and Err method. 35// 36// Executed in the context of a Shape. 37var eventStreamAPITmpl = template.Must( 38 template.New("eventStreamAPITmplDef"). 39 Funcs(template.FuncMap{ 40 "unexported": func(v string) string { 41 return strings.ToLower(string(v[0])) + v[1:] 42 }, 43 }). 44 Parse(eventStreamAPITmplDef), 45) 46 47const eventStreamAPITmplDef = ` 48{{- $esapi := $.EventStreamAPI }} 49{{- $outputStream := $esapi.OutputStream }} 50{{- $inputStream := $esapi.InputStream }} 51 52// {{ $esapi.Name }} provides the event stream handling for the {{ $.ExportedName }}. 53// 54// For testing and mocking the event stream this type should be initialized via 55// the New{{ $esapi.Name }} constructor function. Using the functional options 56// to pass in nested mock behavior. 57type {{ $esapi.Name }} struct { 58 {{- if $inputStream }} 59 60 // Writer is the EventStream writer for the {{ $inputStream.Name }} 61 // events. This value is automatically set by the SDK when the API call is made 62 // Use this member when unit testing your code with the SDK to mock out the 63 // EventStream Writer. 64 // 65 // Must not be nil. 66 Writer {{ $inputStream.StreamWriterAPIName }} 67 68 inputWriter io.WriteCloser 69 {{- if eq .API.Metadata.Protocol "json" }} 70 input {{ $.InputRef.GoType }} 71 {{- end }} 72 {{- end }} 73 74 {{- if $outputStream }} 75 76 // Reader is the EventStream reader for the {{ $outputStream.Name }} 77 // events. This value is automatically set by the SDK when the API call is made 78 // Use this member when unit testing your code with the SDK to mock out the 79 // EventStream Reader. 80 // 81 // Must not be nil. 82 Reader {{ $outputStream.StreamReaderAPIName }} 83 84 outputReader io.ReadCloser 85 {{- if eq .API.Metadata.Protocol "json" }} 86 output {{ $.OutputRef.GoType }} 87 {{- end }} 88 {{- end }} 89 90 {{- if $esapi.Legacy }} 91 92 // StreamCloser is the io.Closer for the EventStream connection. For HTTP 93 // EventStream this is the response Body. The stream will be closed when 94 // the Close method of the EventStream is called. 95 StreamCloser io.Closer 96 {{- end }} 97 98 done chan struct{} 99 closeOnce sync.Once 100 err *eventstreamapi.OnceError 101} 102 103// New{{ $esapi.Name }} initializes an {{ $esapi.Name }}. 104// This function should only be used for testing and mocking the {{ $esapi.Name }} 105// stream within your application. 106{{- if $inputStream }} 107// 108// The Writer member must be set before writing events to the stream. 109{{- end }} 110{{- if $outputStream }} 111// 112// The Reader member must be set before reading events from the stream. 113{{- end }} 114{{- if $esapi.Legacy }} 115// 116// The StreamCloser member should be set to the underlying io.Closer, 117// (e.g. http.Response.Body), that will be closed when the stream Close method 118// is called. 119{{- end }} 120// 121// es := New{{ $esapi.Name }}(func(o *{{ $esapi.Name}}{ 122{{- if $inputStream }} 123// es.Writer = myMockStreamWriter 124{{- end }} 125{{- if $outputStream }} 126// es.Reader = myMockStreamReader 127{{- end }} 128{{- if $esapi.Legacy }} 129// es.StreamCloser = myMockStreamCloser 130{{- end }} 131// }) 132func New{{ $esapi.Name }}(opts ...func(*{{ $esapi.Name}})) *{{ $esapi.Name }} { 133 es := &{{ $esapi.Name }} { 134 done: make(chan struct{}), 135 err: eventstreamapi.NewOnceError(), 136 } 137 138 for _, fn := range opts { 139 fn(es) 140 } 141 142 return es 143} 144 145{{- if $esapi.Legacy }} 146 147 func (es *{{ $esapi.Name }}) setStreamCloser(r *request.Request) { 148 es.StreamCloser = r.HTTPResponse.Body 149 } 150{{- end }} 151 152func (es *{{ $esapi.Name }}) runOnStreamPartClose(r *request.Request) { 153 if es.done == nil { 154 return 155 } 156 go es.waitStreamPartClose() 157 158} 159 160func (es *{{ $esapi.Name }}) waitStreamPartClose() { 161 {{- if $inputStream }} 162 var inputErrCh <-chan struct{} 163 if v, ok := es.Writer.(interface{ErrorSet() <-chan struct{}}); ok { 164 inputErrCh = v.ErrorSet() 165 } 166 {{- end }} 167 {{- if $outputStream }} 168 var outputErrCh <-chan struct{} 169 if v, ok := es.Reader.(interface{ErrorSet() <-chan struct{}}); ok { 170 outputErrCh = v.ErrorSet() 171 } 172 var outputClosedCh <- chan struct{} 173 if v, ok := es.Reader.(interface{Closed() <-chan struct{}}); ok { 174 outputClosedCh = v.Closed() 175 } 176 {{- end }} 177 178 select { 179 case <-es.done: 180 181 {{- if $inputStream }} 182 case <-inputErrCh: 183 es.err.SetError(es.Writer.Err()) 184 es.Close() 185 {{- end }} 186 187 {{- if $outputStream }} 188 case <-outputErrCh: 189 es.err.SetError(es.Reader.Err()) 190 es.Close() 191 case <-outputClosedCh: 192 if err := es.Reader.Err(); err != nil { 193 es.err.SetError(es.Reader.Err()) 194 } 195 es.Close() 196 {{- end }} 197 } 198} 199 200{{- if $inputStream }} 201 202 {{- if eq .API.Metadata.Protocol "json" }} 203 204 func {{ $esapi.StreamInputEventTypeGetterName }}(event {{ $inputStream.EventGroupName }}) (string, error) { 205 if _, ok := event.({{ $.InputRef.GoType }}); ok { 206 return "initial-request", nil 207 } 208 return {{ $inputStream.StreamEventTypeGetterName }}(event) 209 } 210 {{- end }} 211 212 func (es *{{ $esapi.Name }}) setupInputPipe(r *request.Request) { 213 inputReader, inputWriter := io.Pipe() 214 r.SetStreamingBody(inputReader) 215 es.inputWriter = inputWriter 216 } 217 218 // Closes the input-pipe writer 219 func (es *{{ $esapi.Name }}) closeInputPipe() error { 220 if es.inputWriter != nil { 221 return es.inputWriter.Close() 222 } 223 return nil 224 } 225 226 // Send writes the event to the stream blocking until the event is written. 227 // Returns an error if the event was not written. 228 // 229 // These events are: 230 // {{ range $_, $event := $inputStream.Events }} 231 // * {{ $event.Shape.ShapeName }} 232 {{- end }} 233 func (es *{{ $esapi.Name }}) Send(ctx aws.Context, event {{ $inputStream.EventGroupName }}) error { 234 return es.Writer.Send(ctx, event) 235 } 236 237 func (es *{{ $esapi.Name }}) runInputStream(r *request.Request) { 238 var opts []func(*eventstream.Encoder) 239 if r.Config.Logger != nil && r.Config.LogLevel.Matches(aws.LogDebugWithEventStreamBody) { 240 opts = append(opts, eventstream.EncodeWithLogger(r.Config.Logger)) 241 } 242 var encoder eventstreamapi.Encoder = eventstream.NewEncoder(es.inputWriter, opts...) 243 244 var closer aws.MultiCloser 245 {{- if $.ShouldSignRequestBody }} 246 {{- $_ := $.API.AddSDKImport "aws/signer/v4" }} 247 sigSeed, err := v4.GetSignedRequestSignature(r.HTTPRequest) 248 if err != nil { 249 r.Error = awserr.New(request.ErrCodeSerialization, 250 "unable to get initial request's signature", err) 251 return 252 } 253 signer := eventstreamapi.NewSignEncoder( 254 v4.NewStreamSigner(r.ClientInfo.SigningRegion, r.ClientInfo.SigningName, 255 sigSeed, r.Config.Credentials), 256 encoder, 257 ) 258 encoder = signer 259 closer = append(closer, signer) 260 {{- end }} 261 closer = append(closer, es.inputWriter) 262 263 eventWriter := eventstreamapi.NewEventWriter(encoder, 264 protocol.HandlerPayloadMarshal{ 265 Marshalers: r.Handlers.BuildStream, 266 }, 267 {{- if eq .API.Metadata.Protocol "json" }} 268 {{ $esapi.StreamInputEventTypeGetterName }}, 269 {{- else }} 270 {{ $inputStream.StreamEventTypeGetterName }}, 271 {{- end }} 272 ) 273 274 es.Writer = &{{ $inputStream.StreamWriterImplName }}{ 275 StreamWriter: eventstreamapi.NewStreamWriter(eventWriter, closer), 276 } 277 } 278 279 {{- if eq .API.Metadata.Protocol "json" }} 280 func (es *{{ $esapi.Name }}) sendInitialEvent(r *request.Request) { 281 if err := es.Send(es.input); err != nil { 282 r.Error = err 283 } 284 } 285 {{- end }} 286{{- end }} 287 288{{- if $outputStream }} 289 {{- if eq .API.Metadata.Protocol "json" }} 290 291 type {{ $esapi.StreamOutputUnmarshalerForEventName }} struct { 292 unmarshalerForEvent func(string) (eventstreamapi.Unmarshaler, error) 293 output {{ $.OutputRef.GoType }} 294 } 295 func (e {{ $esapi.StreamOutputUnmarshalerForEventName }}) UnmarshalerForEventName(eventType string) (eventstreamapi.Unmarshaler, error) { 296 if eventType == "initial-response" { 297 return e.output, nil 298 } 299 return e.unmarshalerForEvent(eventType) 300 } 301 {{- end }} 302 303 // Events returns a channel to read events from. 304 // 305 // These events are: 306 // {{ range $_, $event := $outputStream.Events }} 307 // * {{ $event.Shape.ShapeName }} 308 {{- end }} 309 // * {{ $outputStream.StreamUnknownEventName }} 310 func (es *{{ $esapi.Name }}) Events() <-chan {{ $outputStream.EventGroupName }} { 311 return es.Reader.Events() 312 } 313 314 func (es *{{ $esapi.Name }}) runOutputStream(r *request.Request) { 315 var opts []func(*eventstream.Decoder) 316 if r.Config.Logger != nil && r.Config.LogLevel.Matches(aws.LogDebugWithEventStreamBody) { 317 opts = append(opts, eventstream.DecodeWithLogger(r.Config.Logger)) 318 } 319 320 unmarshalerForEvent := {{ $outputStream.StreamUnmarshalerForEventName }}{ 321 metadata: protocol.ResponseMetadata{ 322 StatusCode: r.HTTPResponse.StatusCode, 323 RequestID: r.RequestID, 324 }, 325 }.UnmarshalerForEventName 326 {{- if eq .API.Metadata.Protocol "json" }} 327 unmarshalerForEvent = {{ $esapi.StreamOutputUnmarshalerForEventName }}{ 328 unmarshalerForEvent: unmarshalerForEvent, 329 output: es.output, 330 }.UnmarshalerForEventName 331 {{- end }} 332 333 decoder := eventstream.NewDecoder(r.HTTPResponse.Body, opts...) 334 eventReader := eventstreamapi.NewEventReader(decoder, 335 protocol.HandlerPayloadUnmarshal{ 336 Unmarshalers: r.Handlers.UnmarshalStream, 337 }, 338 unmarshalerForEvent, 339 ) 340 341 es.outputReader = r.HTTPResponse.Body 342 es.Reader = {{ $outputStream.StreamReaderImplConstructorName }}(eventReader) 343 } 344 345 {{- if eq .API.Metadata.Protocol "json" }} 346 func (es *{{ $esapi.Name }}) recvInitialEvent(r *request.Request) { 347 // Wait for the initial response event, which must be the first 348 // event to be received from the API. 349 select { 350 case event, ok := <- es.Events(): 351 if !ok { 352 return 353 } 354 355 v, ok := event.({{ $.OutputRef.GoType }}) 356 if !ok || v == nil { 357 r.Error = awserr.New( 358 request.ErrCodeSerialization, 359 fmt.Sprintf("invalid event, %T, expect %T, %v", 360 event, ({{ $.OutputRef.GoType }})(nil), v), 361 nil, 362 ) 363 return 364 } 365 366 *es.output = *v 367 es.output.{{ $.EventStreamAPI.OutputMemberName }} = es 368 } 369 } 370 {{- end }} 371{{- end }} 372 373// Close closes the stream. This will also cause the stream to be closed. 374// Close must be called when done using the stream API. Not calling Close 375// may result in resource leaks. 376{{- if $inputStream }} 377// 378// Will close the underlying EventStream writer, and no more events can be 379// sent. 380{{- end }} 381{{- if $outputStream }} 382// 383// You can use the closing of the Reader's Events channel to terminate your 384// application's read from the API's stream. 385{{- end }} 386// 387func (es *{{ $esapi.Name }}) Close() (err error) { 388 es.closeOnce.Do(es.safeClose) 389 return es.Err() 390} 391 392func (es *{{ $esapi.Name }}) safeClose() { 393 if es.done != nil { 394 close(es.done) 395 } 396 397 {{- if $inputStream }} 398 399 t := time.NewTicker(time.Second) 400 defer t.Stop() 401 writeCloseDone := make(chan error) 402 go func() { 403 if err := es.Writer.Close(); err != nil { 404 es.err.SetError(err) 405 } 406 close(writeCloseDone) 407 }() 408 select { 409 case <-t.C: 410 case <-writeCloseDone: 411 } 412 if err := es.closeInputPipe(); err != nil { 413 es.err.SetError(err) 414 } 415 {{- end }} 416 417 {{- if $outputStream }} 418 419 es.Reader.Close() 420 if es.outputReader != nil { 421 es.outputReader.Close() 422 } 423 {{- end }} 424 425 {{- if $esapi.Legacy }} 426 427 es.StreamCloser.Close() 428 {{- end }} 429} 430 431// Err returns any error that occurred while reading or writing EventStream 432// Events from the service API's response. Returns nil if there were no errors. 433func (es *{{ $esapi.Name }}) Err() error { 434 if err := es.err.Err(); err != nil { 435 return err 436 } 437 438 {{- if $inputStream }} 439 if err := es.Writer.Err(); err != nil { 440 return err 441 } 442 {{- end }} 443 444 {{- if $outputStream }} 445 if err := es.Reader.Err(); err != nil { 446 return err 447 } 448 {{- end }} 449 450 return nil 451} 452` 453 454func renderEventStreamShape(w io.Writer, s *Shape) error { 455 // Imports needed by the EventStream APIs. 456 s.API.AddImport("fmt") 457 s.API.AddImport("bytes") 458 s.API.AddImport("io") 459 s.API.AddImport("sync") 460 s.API.AddSDKImport("aws") 461 s.API.AddSDKImport("aws/awserr") 462 s.API.AddSDKImport("private/protocol/eventstream") 463 s.API.AddSDKImport("private/protocol/eventstream/eventstreamapi") 464 465 return eventStreamShapeTmpl.Execute(w, s) 466} 467 468var eventStreamShapeTmpl = func() *template.Template { 469 t := template.Must( 470 template.New("eventStreamShapeTmplDef"). 471 Parse(eventStreamShapeTmplDef), 472 ) 473 template.Must( 474 t.AddParseTree( 475 "eventStreamShapeWriterTmpl", eventStreamShapeWriterTmpl.Tree), 476 ) 477 template.Must( 478 t.AddParseTree( 479 "eventStreamShapeReaderTmpl", eventStreamShapeReaderTmpl.Tree), 480 ) 481 482 return t 483}() 484 485const eventStreamShapeTmplDef = ` 486{{- $eventStream := $.EventStream }} 487{{- $eventStreamEventGroup := printf "%sEvent" $eventStream.Name }} 488 489// {{ $eventStreamEventGroup }} groups together all EventStream 490// events writes for {{ $eventStream.Name }}. 491// 492// These events are: 493// {{ range $_, $event := $eventStream.Events }} 494// * {{ $event.Shape.ShapeName }} 495{{- end }} 496type {{ $eventStreamEventGroup }} interface { 497 event{{ $eventStream.Name }}() 498 eventstreamapi.Marshaler 499 eventstreamapi.Unmarshaler 500} 501 502{{- if $.IsInputEventStream }} 503 {{- template "eventStreamShapeWriterTmpl" $ }} 504{{- end }} 505 506{{- if $.IsOutputEventStream }} 507 {{- template "eventStreamShapeReaderTmpl" $ }} 508{{- end }} 509` 510 511// EventStreamHeaderTypeMap provides the mapping of a EventStream Header's 512// Value type to the shape reference's member type. 513type EventStreamHeaderTypeMap struct { 514 Header string 515 Member string 516} 517 518// Returns if the event has any members which are not the event's blob payload, 519// nor a header. 520func eventHasNonBlobPayloadMembers(s *Shape) bool { 521 num := len(s.MemberRefs) 522 for _, ref := range s.MemberRefs { 523 if ref.IsEventHeader || (ref.IsEventPayload && (ref.Shape.Type == "blob" || ref.Shape.Type == "string")) { 524 num-- 525 } 526 } 527 return num > 0 528} 529 530func setEventHeaderValueForType(s *Shape, memVar string) string { 531 switch s.Type { 532 case "blob": 533 return fmt.Sprintf("eventstream.BytesValue(%s)", memVar) 534 case "string": 535 return fmt.Sprintf("eventstream.StringValue(*%s)", memVar) 536 case "boolean": 537 return fmt.Sprintf("eventstream.BoolValue(*%s)", memVar) 538 case "byte": 539 return fmt.Sprintf("eventstream.Int8Value(int8(*%s))", memVar) 540 case "short": 541 return fmt.Sprintf("eventstream.Int16Value(int16(*%s))", memVar) 542 case "integer": 543 return fmt.Sprintf("eventstream.Int32Value(int32(*%s))", memVar) 544 case "long": 545 return fmt.Sprintf("eventstream.Int64Value(*%s)", memVar) 546 case "float": 547 return fmt.Sprintf("eventstream.Float32Value(float32(*%s))", memVar) 548 case "double": 549 return fmt.Sprintf("eventstream.Float64Value(*%s)", memVar) 550 case "timestamp": 551 return fmt.Sprintf("eventstream.TimestampValue(*%s)", memVar) 552 default: 553 panic(fmt.Sprintf("value type %s not supported for event headers, %s", s.Type, s.ShapeName)) 554 } 555} 556 557func shapeMessageType(s *Shape) string { 558 if s.Exception { 559 return "eventstreamapi.ExceptionMessageType" 560 } 561 return "eventstreamapi.EventMessageType" 562} 563 564var eventStreamEventShapeTmplFuncs = template.FuncMap{ 565 "EventStreamHeaderTypeMap": func(ref *ShapeRef) EventStreamHeaderTypeMap { 566 switch ref.Shape.Type { 567 case "boolean": 568 return EventStreamHeaderTypeMap{Header: "bool", Member: "bool"} 569 case "byte": 570 return EventStreamHeaderTypeMap{Header: "int8", Member: "int64"} 571 case "short": 572 return EventStreamHeaderTypeMap{Header: "int16", Member: "int64"} 573 case "integer": 574 return EventStreamHeaderTypeMap{Header: "int32", Member: "int64"} 575 case "long": 576 return EventStreamHeaderTypeMap{Header: "int64", Member: "int64"} 577 case "timestamp": 578 return EventStreamHeaderTypeMap{Header: "time.Time", Member: "time.Time"} 579 case "blob": 580 return EventStreamHeaderTypeMap{Header: "[]byte", Member: "[]byte"} 581 case "string": 582 return EventStreamHeaderTypeMap{Header: "string", Member: "string"} 583 case "uuid": 584 return EventStreamHeaderTypeMap{Header: "[]byte", Member: "[]byte"} 585 default: 586 panic("unsupported EventStream header type, " + ref.Shape.Type) 587 } 588 }, 589 "EventHeaderValueForType": setEventHeaderValueForType, 590 "ShapeMessageType": shapeMessageType, 591 "HasNonBlobPayloadMembers": eventHasNonBlobPayloadMembers, 592} 593 594// Template for an EventStream Event shape. This is a normal API shape that is 595// decorated as an EventStream Event. 596// 597// Executed in the context of a Shape. 598var eventStreamEventShapeTmpl = template.Must(template.New("eventStreamEventShapeTmpl"). 599 Funcs(eventStreamEventShapeTmplFuncs).Parse(` 600{{ range $_, $eventStream := $.EventFor }} 601 // The {{ $.ShapeName }} is and event in the {{ $eventStream.Name }} group of events. 602 func (s *{{ $.ShapeName }}) event{{ $eventStream.Name }}() {} 603{{ end }} 604 605// UnmarshalEvent unmarshals the EventStream Message into the {{ $.ShapeName }} value. 606// This method is only used internally within the SDK's EventStream handling. 607func (s *{{ $.ShapeName }}) UnmarshalEvent( 608 payloadUnmarshaler protocol.PayloadUnmarshaler, 609 msg eventstream.Message, 610) error { 611 {{- range $memName, $memRef := $.MemberRefs }} 612 {{- if $memRef.IsEventHeader }} 613 if hv := msg.Headers.Get("{{ $memName }}"); hv != nil { 614 {{ $types := EventStreamHeaderTypeMap $memRef -}} 615 v := hv.Get().({{ $types.Header }}) 616 {{- if ne $types.Header $types.Member }} 617 m := {{ $types.Member }}(v) 618 s.{{ $memName }} = {{ if $memRef.UseIndirection }}&{{ end }}m 619 {{- else }} 620 s.{{ $memName }} = {{ if $memRef.UseIndirection }}&{{ end }}v 621 {{- end }} 622 } 623 {{- else if (and ($memRef.IsEventPayload) (eq $memRef.Shape.Type "blob")) }} 624 s.{{ $memName }} = make([]byte, len(msg.Payload)) 625 copy(s.{{ $memName }}, msg.Payload) 626 {{- else if (and ($memRef.IsEventPayload) (eq $memRef.Shape.Type "string")) }} 627 s.{{ $memName }} = aws.String(string(msg.Payload)) 628 {{- end }} 629 {{- end }} 630 {{- if HasNonBlobPayloadMembers $ }} 631 if err := payloadUnmarshaler.UnmarshalPayload( 632 bytes.NewReader(msg.Payload), s, 633 ); err != nil { 634 return err 635 } 636 {{- end }} 637 return nil 638} 639 640// MarshalEvent marshals the type into an stream event value. This method 641// should only used internally within the SDK's EventStream handling. 642func (s *{{ $.ShapeName}}) MarshalEvent(pm protocol.PayloadMarshaler) (msg eventstream.Message, err error) { 643 msg.Headers.Set(eventstreamapi.MessageTypeHeader, eventstream.StringValue({{ ShapeMessageType $ }})) 644 645 {{- range $memName, $memRef := $.MemberRefs }} 646 {{- if $memRef.IsEventHeader }} 647 {{ $memVar := printf "s.%s" $memName -}} 648 {{ $typedMem := EventHeaderValueForType $memRef.Shape $memVar -}} 649 msg.Headers.Set("{{ $memName }}", {{ $typedMem }}) 650 {{- else if (and ($memRef.IsEventPayload) (eq $memRef.Shape.Type "blob")) }} 651 msg.Headers.Set(":content-type", eventstream.StringValue("application/octet-stream")) 652 msg.Payload = s.{{ $memName }} 653 {{- else if (and ($memRef.IsEventPayload) (eq $memRef.Shape.Type "string")) }} 654 msg.Payload = []byte(aws.StringValue(s.{{ $memName }})) 655 {{- end }} 656 {{- end }} 657 {{- if HasNonBlobPayloadMembers $ }} 658 var buf bytes.Buffer 659 if err = pm.MarshalPayload(&buf, s); err != nil { 660 return eventstream.Message{}, err 661 } 662 msg.Payload = buf.Bytes() 663 {{- end }} 664 return msg, err 665} 666`)) 667