1package eventstream
2
3import (
4	"bytes"
5	"encoding/binary"
6	"hash/crc32"
7)
8
9const preludeLen = 8
10const preludeCRCLen = 4
11const msgCRCLen = 4
12const minMsgLen = preludeLen + preludeCRCLen + msgCRCLen
13const maxPayloadLen = 1024 * 1024 * 16 // 16MB
14const maxHeadersLen = 1024 * 128       // 128KB
15const maxMsgLen = minMsgLen + maxHeadersLen + maxPayloadLen
16
17var crc32IEEETable = crc32.MakeTable(crc32.IEEE)
18
19// A Message provides the eventstream message representation.
20type Message struct {
21	Headers Headers
22	Payload []byte
23}
24
25func (m *Message) rawMessage() (rawMessage, error) {
26	var raw rawMessage
27
28	if len(m.Headers) > 0 {
29		var headers bytes.Buffer
30		if err := encodeHeaders(&headers, m.Headers); err != nil {
31			return rawMessage{}, err
32		}
33		raw.Headers = headers.Bytes()
34		raw.HeadersLen = uint32(len(raw.Headers))
35	}
36
37	raw.Length = raw.HeadersLen + uint32(len(m.Payload)) + minMsgLen
38
39	hash := crc32.New(crc32IEEETable)
40	binaryWriteFields(hash, binary.BigEndian, raw.Length, raw.HeadersLen)
41	raw.PreludeCRC = hash.Sum32()
42
43	binaryWriteFields(hash, binary.BigEndian, raw.PreludeCRC)
44
45	if raw.HeadersLen > 0 {
46		hash.Write(raw.Headers)
47	}
48
49	// Read payload bytes and update hash for it as well.
50	if len(m.Payload) > 0 {
51		raw.Payload = m.Payload
52		hash.Write(raw.Payload)
53	}
54
55	raw.CRC = hash.Sum32()
56
57	return raw, nil
58}
59
60type messagePrelude struct {
61	Length     uint32
62	HeadersLen uint32
63	PreludeCRC uint32
64}
65
66func (p messagePrelude) PayloadLen() uint32 {
67	return p.Length - p.HeadersLen - minMsgLen
68}
69
70func (p messagePrelude) ValidateLens() error {
71	if p.Length == 0 || p.Length > maxMsgLen {
72		return LengthError{
73			Part: "message prelude",
74			Want: maxMsgLen,
75			Have: int(p.Length),
76		}
77	}
78	if p.HeadersLen > maxHeadersLen {
79		return LengthError{
80			Part: "message headers",
81			Want: maxHeadersLen,
82			Have: int(p.HeadersLen),
83		}
84	}
85	if payloadLen := p.PayloadLen(); payloadLen > maxPayloadLen {
86		return LengthError{
87			Part: "message payload",
88			Want: maxPayloadLen,
89			Have: int(payloadLen),
90		}
91	}
92
93	return nil
94}
95
96type rawMessage struct {
97	messagePrelude
98
99	Headers []byte
100	Payload []byte
101
102	CRC uint32
103}
104