1package eventstream 2 3import ( 4 "bytes" 5 "encoding/binary" 6 "hash/crc32" 7) 8 9const preludeLen = 8 10const preludeCRCLen = 4 11const msgCRCLen = 4 12const minMsgLen = preludeLen + preludeCRCLen + msgCRCLen 13const maxPayloadLen = 1024 * 1024 * 16 // 16MB 14const maxHeadersLen = 1024 * 128 // 128KB 15const maxMsgLen = minMsgLen + maxHeadersLen + maxPayloadLen 16 17var crc32IEEETable = crc32.MakeTable(crc32.IEEE) 18 19// A Message provides the eventstream message representation. 20type Message struct { 21 Headers Headers 22 Payload []byte 23} 24 25func (m *Message) rawMessage() (rawMessage, error) { 26 var raw rawMessage 27 28 if len(m.Headers) > 0 { 29 var headers bytes.Buffer 30 if err := encodeHeaders(&headers, m.Headers); err != nil { 31 return rawMessage{}, err 32 } 33 raw.Headers = headers.Bytes() 34 raw.HeadersLen = uint32(len(raw.Headers)) 35 } 36 37 raw.Length = raw.HeadersLen + uint32(len(m.Payload)) + minMsgLen 38 39 hash := crc32.New(crc32IEEETable) 40 binaryWriteFields(hash, binary.BigEndian, raw.Length, raw.HeadersLen) 41 raw.PreludeCRC = hash.Sum32() 42 43 binaryWriteFields(hash, binary.BigEndian, raw.PreludeCRC) 44 45 if raw.HeadersLen > 0 { 46 hash.Write(raw.Headers) 47 } 48 49 // Read payload bytes and update hash for it as well. 50 if len(m.Payload) > 0 { 51 raw.Payload = m.Payload 52 hash.Write(raw.Payload) 53 } 54 55 raw.CRC = hash.Sum32() 56 57 return raw, nil 58} 59 60type messagePrelude struct { 61 Length uint32 62 HeadersLen uint32 63 PreludeCRC uint32 64} 65 66func (p messagePrelude) PayloadLen() uint32 { 67 return p.Length - p.HeadersLen - minMsgLen 68} 69 70func (p messagePrelude) ValidateLens() error { 71 if p.Length == 0 || p.Length > maxMsgLen { 72 return LengthError{ 73 Part: "message prelude", 74 Want: maxMsgLen, 75 Have: int(p.Length), 76 } 77 } 78 if p.HeadersLen > maxHeadersLen { 79 return LengthError{ 80 Part: "message headers", 81 Want: maxHeadersLen, 82 Have: int(p.HeadersLen), 83 } 84 } 85 if payloadLen := p.PayloadLen(); payloadLen > maxPayloadLen { 86 return LengthError{ 87 Part: "message payload", 88 Want: maxPayloadLen, 89 Have: int(payloadLen), 90 } 91 } 92 93 return nil 94} 95 96type rawMessage struct { 97 messagePrelude 98 99 Headers []byte 100 Payload []byte 101 102 CRC uint32 103} 104