1package sarama 2 3import "github.com/rcrowley/go-metrics" 4 5// RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements 6// it must see before responding. Any of the constants defined here are valid. On broker versions 7// prior to 0.8.2.0 any other positive int16 is also valid (the broker will wait for that many 8// acknowledgements) but in 0.8.2.0 and later this will raise an exception (it has been replaced 9// by setting the `min.isr` value in the brokers configuration). 10type RequiredAcks int16 11 12const ( 13 // NoResponse doesn't send any response, the TCP ACK is all you get. 14 NoResponse RequiredAcks = 0 15 // WaitForLocal waits for only the local commit to succeed before responding. 16 WaitForLocal RequiredAcks = 1 17 // WaitForAll waits for all in-sync replicas to commit before responding. 18 // The minimum number of in-sync replicas is configured on the broker via 19 // the `min.insync.replicas` configuration key. 20 WaitForAll RequiredAcks = -1 21) 22 23type ProduceRequest struct { 24 TransactionalID *string 25 RequiredAcks RequiredAcks 26 Timeout int32 27 Version int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11 28 records map[string]map[int32]Records 29} 30 31func updateMsgSetMetrics(msgSet *MessageSet, compressionRatioMetric metrics.Histogram, 32 topicCompressionRatioMetric metrics.Histogram) int64 { 33 var topicRecordCount int64 34 for _, messageBlock := range msgSet.Messages { 35 // Is this a fake "message" wrapping real messages? 36 if messageBlock.Msg.Set != nil { 37 topicRecordCount += int64(len(messageBlock.Msg.Set.Messages)) 38 } else { 39 // A single uncompressed message 40 topicRecordCount++ 41 } 42 // Better be safe than sorry when computing the compression ratio 43 if messageBlock.Msg.compressedSize != 0 { 44 compressionRatio := float64(len(messageBlock.Msg.Value)) / 45 float64(messageBlock.Msg.compressedSize) 46 // Histogram do not support decimal values, let's multiple it by 100 for better precision 47 intCompressionRatio := int64(100 * compressionRatio) 48 compressionRatioMetric.Update(intCompressionRatio) 49 topicCompressionRatioMetric.Update(intCompressionRatio) 50 } 51 } 52 return topicRecordCount 53} 54 55func updateBatchMetrics(recordBatch *RecordBatch, compressionRatioMetric metrics.Histogram, 56 topicCompressionRatioMetric metrics.Histogram) int64 { 57 if recordBatch.compressedRecords != nil { 58 compressionRatio := int64(float64(recordBatch.recordsLen) / float64(len(recordBatch.compressedRecords)) * 100) 59 compressionRatioMetric.Update(compressionRatio) 60 topicCompressionRatioMetric.Update(compressionRatio) 61 } 62 63 return int64(len(recordBatch.Records)) 64} 65 66func (r *ProduceRequest) encode(pe packetEncoder) error { 67 if r.Version >= 3 { 68 if err := pe.putNullableString(r.TransactionalID); err != nil { 69 return err 70 } 71 } 72 pe.putInt16(int16(r.RequiredAcks)) 73 pe.putInt32(r.Timeout) 74 metricRegistry := pe.metricRegistry() 75 var batchSizeMetric metrics.Histogram 76 var compressionRatioMetric metrics.Histogram 77 if metricRegistry != nil { 78 batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry) 79 compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry) 80 } 81 totalRecordCount := int64(0) 82 83 err := pe.putArrayLength(len(r.records)) 84 if err != nil { 85 return err 86 } 87 88 for topic, partitions := range r.records { 89 err = pe.putString(topic) 90 if err != nil { 91 return err 92 } 93 err = pe.putArrayLength(len(partitions)) 94 if err != nil { 95 return err 96 } 97 topicRecordCount := int64(0) 98 var topicCompressionRatioMetric metrics.Histogram 99 if metricRegistry != nil { 100 topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry) 101 } 102 for id, records := range partitions { 103 startOffset := pe.offset() 104 pe.putInt32(id) 105 pe.push(&lengthField{}) 106 err = records.encode(pe) 107 if err != nil { 108 return err 109 } 110 err = pe.pop() 111 if err != nil { 112 return err 113 } 114 if metricRegistry != nil { 115 if r.Version >= 3 { 116 topicRecordCount += updateBatchMetrics(records.RecordBatch, compressionRatioMetric, topicCompressionRatioMetric) 117 } else { 118 topicRecordCount += updateMsgSetMetrics(records.MsgSet, compressionRatioMetric, topicCompressionRatioMetric) 119 } 120 batchSize := int64(pe.offset() - startOffset) 121 batchSizeMetric.Update(batchSize) 122 getOrRegisterTopicHistogram("batch-size", topic, metricRegistry).Update(batchSize) 123 } 124 } 125 if topicRecordCount > 0 { 126 getOrRegisterTopicMeter("record-send-rate", topic, metricRegistry).Mark(topicRecordCount) 127 getOrRegisterTopicHistogram("records-per-request", topic, metricRegistry).Update(topicRecordCount) 128 totalRecordCount += topicRecordCount 129 } 130 } 131 if totalRecordCount > 0 { 132 metrics.GetOrRegisterMeter("record-send-rate", metricRegistry).Mark(totalRecordCount) 133 getOrRegisterHistogram("records-per-request", metricRegistry).Update(totalRecordCount) 134 } 135 136 return nil 137} 138 139func (r *ProduceRequest) decode(pd packetDecoder, version int16) error { 140 r.Version = version 141 142 if version >= 3 { 143 id, err := pd.getNullableString() 144 if err != nil { 145 return err 146 } 147 r.TransactionalID = id 148 } 149 requiredAcks, err := pd.getInt16() 150 if err != nil { 151 return err 152 } 153 r.RequiredAcks = RequiredAcks(requiredAcks) 154 if r.Timeout, err = pd.getInt32(); err != nil { 155 return err 156 } 157 topicCount, err := pd.getArrayLength() 158 if err != nil { 159 return err 160 } 161 if topicCount == 0 { 162 return nil 163 } 164 165 r.records = make(map[string]map[int32]Records) 166 for i := 0; i < topicCount; i++ { 167 topic, err := pd.getString() 168 if err != nil { 169 return err 170 } 171 partitionCount, err := pd.getArrayLength() 172 if err != nil { 173 return err 174 } 175 r.records[topic] = make(map[int32]Records) 176 177 for j := 0; j < partitionCount; j++ { 178 partition, err := pd.getInt32() 179 if err != nil { 180 return err 181 } 182 size, err := pd.getInt32() 183 if err != nil { 184 return err 185 } 186 recordsDecoder, err := pd.getSubset(int(size)) 187 if err != nil { 188 return err 189 } 190 var records Records 191 if err := records.decode(recordsDecoder); err != nil { 192 return err 193 } 194 r.records[topic][partition] = records 195 } 196 } 197 198 return nil 199} 200 201func (r *ProduceRequest) key() int16 { 202 return 0 203} 204 205func (r *ProduceRequest) version() int16 { 206 return r.Version 207} 208 209func (r *ProduceRequest) headerVersion() int16 { 210 return 1 211} 212 213func (r *ProduceRequest) requiredVersion() KafkaVersion { 214 switch r.Version { 215 case 1: 216 return V0_9_0_0 217 case 2: 218 return V0_10_0_0 219 case 3: 220 return V0_11_0_0 221 case 7: 222 return V2_1_0_0 223 default: 224 return MinVersion 225 } 226} 227 228func (r *ProduceRequest) ensureRecords(topic string, partition int32) { 229 if r.records == nil { 230 r.records = make(map[string]map[int32]Records) 231 } 232 233 if r.records[topic] == nil { 234 r.records[topic] = make(map[int32]Records) 235 } 236} 237 238func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) { 239 r.ensureRecords(topic, partition) 240 set := r.records[topic][partition].MsgSet 241 242 if set == nil { 243 set = new(MessageSet) 244 r.records[topic][partition] = newLegacyRecords(set) 245 } 246 247 set.addMessage(msg) 248} 249 250func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) { 251 r.ensureRecords(topic, partition) 252 r.records[topic][partition] = newLegacyRecords(set) 253} 254 255func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch) { 256 r.ensureRecords(topic, partition) 257 r.records[topic][partition] = newDefaultRecords(batch) 258} 259