1package eventstream 2 3import ( 4 "bytes" 5 "encoding/binary" 6 "hash" 7 "hash/crc32" 8 "io" 9) 10 11// Encoder provides EventStream message encoding. 12type Encoder struct { 13 w io.Writer 14 15 headersBuf *bytes.Buffer 16} 17 18// NewEncoder initializes and returns an Encoder to encode Event Stream 19// messages to an io.Writer. 20func NewEncoder(w io.Writer) *Encoder { 21 return &Encoder{ 22 w: w, 23 headersBuf: bytes.NewBuffer(nil), 24 } 25} 26 27// Encode encodes a single EventStream message to the io.Writer the Encoder 28// was created with. An error is returned if writing the message fails. 29func (e *Encoder) Encode(msg Message) error { 30 e.headersBuf.Reset() 31 32 err := encodeHeaders(e.headersBuf, msg.Headers) 33 if err != nil { 34 return err 35 } 36 37 crc := crc32.New(crc32IEEETable) 38 hashWriter := io.MultiWriter(e.w, crc) 39 40 headersLen := uint32(e.headersBuf.Len()) 41 payloadLen := uint32(len(msg.Payload)) 42 43 if err := encodePrelude(hashWriter, crc, headersLen, payloadLen); err != nil { 44 return err 45 } 46 47 if headersLen > 0 { 48 if _, err := io.Copy(hashWriter, e.headersBuf); err != nil { 49 return err 50 } 51 } 52 53 if payloadLen > 0 { 54 if _, err := hashWriter.Write(msg.Payload); err != nil { 55 return err 56 } 57 } 58 59 msgCRC := crc.Sum32() 60 return binary.Write(e.w, binary.BigEndian, msgCRC) 61} 62 63func encodePrelude(w io.Writer, crc hash.Hash32, headersLen, payloadLen uint32) error { 64 p := messagePrelude{ 65 Length: minMsgLen + headersLen + payloadLen, 66 HeadersLen: headersLen, 67 } 68 if err := p.ValidateLens(); err != nil { 69 return err 70 } 71 72 err := binaryWriteFields(w, binary.BigEndian, 73 p.Length, 74 p.HeadersLen, 75 ) 76 if err != nil { 77 return err 78 } 79 80 p.PreludeCRC = crc.Sum32() 81 err = binary.Write(w, binary.BigEndian, p.PreludeCRC) 82 if err != nil { 83 return err 84 } 85 86 return nil 87} 88 89func encodeHeaders(w io.Writer, headers Headers) error { 90 for _, h := range headers { 91 hn := headerName{ 92 Len: uint8(len(h.Name)), 93 } 94 copy(hn.Name[:hn.Len], h.Name) 95 if err := hn.encode(w); err != nil { 96 return err 97 } 98 99 if err := h.Value.encode(w); err != nil { 100 return err 101 } 102 } 103 104 return nil 105} 106 107func binaryWriteFields(w io.Writer, order binary.ByteOrder, vs ...interface{}) error { 108 for _, v := range vs { 109 if err := binary.Write(w, order, v); err != nil { 110 return err 111 } 112 } 113 return nil 114} 115