1package sarama 2 3type MessageBlock struct { 4 Offset int64 5 Msg *Message 6} 7 8// Messages convenience helper which returns either all the 9// messages that are wrapped in this block 10func (msb *MessageBlock) Messages() []*MessageBlock { 11 if msb.Msg.Set != nil { 12 return msb.Msg.Set.Messages 13 } 14 return []*MessageBlock{msb} 15} 16 17func (msb *MessageBlock) encode(pe packetEncoder) error { 18 pe.putInt64(msb.Offset) 19 pe.push(&lengthField{}) 20 err := msb.Msg.encode(pe) 21 if err != nil { 22 return err 23 } 24 return pe.pop() 25} 26 27func (msb *MessageBlock) decode(pd packetDecoder) (err error) { 28 if msb.Offset, err = pd.getInt64(); err != nil { 29 return err 30 } 31 32 if err = pd.push(&lengthField{}); err != nil { 33 return err 34 } 35 36 msb.Msg = new(Message) 37 if err = msb.Msg.decode(pd); err != nil { 38 return err 39 } 40 41 if err = pd.pop(); err != nil { 42 return err 43 } 44 45 return nil 46} 47 48type MessageSet struct { 49 PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock 50 OverflowMessage bool // whether the set on the wire contained an overflow message 51 Messages []*MessageBlock 52} 53 54func (ms *MessageSet) encode(pe packetEncoder) error { 55 for i := range ms.Messages { 56 err := ms.Messages[i].encode(pe) 57 if err != nil { 58 return err 59 } 60 } 61 return nil 62} 63 64func (ms *MessageSet) decode(pd packetDecoder) (err error) { 65 ms.Messages = nil 66 67 for pd.remaining() > 0 { 68 magic, err := magicValue(pd) 69 if err != nil { 70 if err == ErrInsufficientData { 71 ms.PartialTrailingMessage = true 72 return nil 73 } 74 return err 75 } 76 77 if magic > 1 { 78 return nil 79 } 80 81 msb := new(MessageBlock) 82 err = msb.decode(pd) 83 switch err { 84 case nil: 85 ms.Messages = append(ms.Messages, msb) 86 case ErrInsufficientData: 87 // As an optimization the server is allowed to return a partial message at the 88 // end of the message set. Clients should handle this case. So we just ignore such things. 89 if msb.Offset == -1 { 90 // This is an overflow message caused by chunked down conversion 91 ms.OverflowMessage = true 92 } else { 93 ms.PartialTrailingMessage = true 94 } 95 return nil 96 default: 97 return err 98 } 99 } 100 101 return nil 102} 103 104func (ms *MessageSet) addMessage(msg *Message) { 105 block := new(MessageBlock) 106 block.Msg = msg 107 ms.Messages = append(ms.Messages, block) 108} 109