1package sarama
2
3type MessageBlock struct {
4	Offset int64
5	Msg    *Message
6}
7
8// Messages convenience helper which returns either all the
9// messages that are wrapped in this block
10func (msb *MessageBlock) Messages() []*MessageBlock {
11	if msb.Msg.Set != nil {
12		return msb.Msg.Set.Messages
13	}
14	return []*MessageBlock{msb}
15}
16
17func (msb *MessageBlock) encode(pe packetEncoder) error {
18	pe.putInt64(msb.Offset)
19	pe.push(&lengthField{})
20	err := msb.Msg.encode(pe)
21	if err != nil {
22		return err
23	}
24	return pe.pop()
25}
26
27func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
28	if msb.Offset, err = pd.getInt64(); err != nil {
29		return err
30	}
31
32	if err = pd.push(&lengthField{}); err != nil {
33		return err
34	}
35
36	msb.Msg = new(Message)
37	if err = msb.Msg.decode(pd); err != nil {
38		return err
39	}
40
41	if err = pd.pop(); err != nil {
42		return err
43	}
44
45	return nil
46}
47
48type MessageSet struct {
49	PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
50	OverflowMessage        bool // whether the set on the wire contained an overflow message
51	Messages               []*MessageBlock
52}
53
54func (ms *MessageSet) encode(pe packetEncoder) error {
55	for i := range ms.Messages {
56		err := ms.Messages[i].encode(pe)
57		if err != nil {
58			return err
59		}
60	}
61	return nil
62}
63
64func (ms *MessageSet) decode(pd packetDecoder) (err error) {
65	ms.Messages = nil
66
67	for pd.remaining() > 0 {
68		magic, err := magicValue(pd)
69		if err != nil {
70			if err == ErrInsufficientData {
71				ms.PartialTrailingMessage = true
72				return nil
73			}
74			return err
75		}
76
77		if magic > 1 {
78			return nil
79		}
80
81		msb := new(MessageBlock)
82		err = msb.decode(pd)
83		switch err {
84		case nil:
85			ms.Messages = append(ms.Messages, msb)
86		case ErrInsufficientData:
87			// As an optimization the server is allowed to return a partial message at the
88			// end of the message set. Clients should handle this case. So we just ignore such things.
89			if msb.Offset == -1 {
90				// This is an overflow message caused by chunked down conversion
91				ms.OverflowMessage = true
92			} else {
93				ms.PartialTrailingMessage = true
94			}
95			return nil
96		default:
97			return err
98		}
99	}
100
101	return nil
102}
103
104func (ms *MessageSet) addMessage(msg *Message) {
105	block := new(MessageBlock)
106	block.Msg = msg
107	ms.Messages = append(ms.Messages, block)
108}
109