1package binary 2 3import ( 4 "bufio" 5 "fmt" 6 "io" 7 "time" 8 9 "github.com/influxdata/influxdb/cmd/influx_tools/internal/format" 10 "github.com/influxdata/influxdb/cmd/influx_tools/internal/tlv" 11 "github.com/influxdata/influxdb/models" 12 "github.com/influxdata/influxdb/tsdb" 13 "github.com/influxdata/influxql" 14) 15 16type Writer struct { 17 w *bufio.Writer 18 buf []byte 19 db, rp string 20 duration time.Duration 21 err error 22 bw *bucketWriter 23 state writeState 24 wroteHeader bool 25 26 msg struct { 27 bucketHeader BucketHeader 28 bucketFooter BucketFooter 29 seriesHeader SeriesHeader 30 seriesFooter SeriesFooter 31 } 32 33 stats struct { 34 series int 35 counts [8]struct { 36 series, values int 37 } 38 } 39} 40 41type writeState int 42 43const ( 44 writeHeader writeState = iota 45 writeBucket 46 writeSeries 47 writeSeriesHeader 48 writePoints 49) 50 51func NewWriter(w io.Writer, database, rp string, duration time.Duration) *Writer { 52 var wr *bufio.Writer 53 if wr, _ = w.(*bufio.Writer); wr == nil { 54 wr = bufio.NewWriter(w) 55 } 56 return &Writer{w: wr, db: database, rp: rp, duration: duration} 57} 58 59func (w *Writer) WriteStats(o io.Writer) { 60 fmt.Fprintf(o, "total series: %d\n", w.stats.series) 61 62 for i := 0; i < 5; i++ { 63 ft := FieldType(i) 64 fmt.Fprintf(o, "%s unique series: %d\n", ft, w.stats.counts[i].series) 65 fmt.Fprintf(o, "%s total values : %d\n", ft, w.stats.counts[i].values) 66 } 67} 68 69func (w *Writer) NewBucket(start, end int64) (format.BucketWriter, error) { 70 if w.state == writeHeader { 71 w.writeHeader() 72 } 73 74 if w.err != nil { 75 return nil, w.err 76 } 77 78 if w.state != writeBucket { 79 panic(fmt.Sprintf("writer state: got=%v, exp=%v", w.state, writeBucket)) 80 } 81 82 w.bw = &bucketWriter{w: w, start: start, end: end} 83 w.writeBucketHeader(start, end) 84 85 return w.bw, w.err 86} 87 88func (w *Writer) Close() error { 89 if w.err == ErrWriteAfterClose { 90 return nil 91 } 92 if w.err != nil { 93 return w.err 94 } 95 96 w.err = ErrWriteAfterClose 97 98 return nil 99} 100 101func (w *Writer) writeHeader() { 102 w.state = writeBucket 103 w.wroteHeader = true 104 105 w.write(Magic[:]) 106 107 h := Header{ 108 Version: Version0, 109 Database: w.db, 110 RetentionPolicy: w.rp, 111 ShardDuration: w.duration, 112 } 113 w.writeTypeMessage(HeaderType, &h) 114} 115 116func (w *Writer) writeBucketHeader(start, end int64) { 117 w.state = writeSeries 118 w.msg.bucketHeader.Start = start 119 w.msg.bucketHeader.End = end 120 w.writeTypeMessage(BucketHeaderType, &w.msg.bucketHeader) 121} 122 123func (w *Writer) writeBucketFooter() { 124 w.state = writeBucket 125 w.writeTypeMessage(BucketFooterType, &w.msg.bucketFooter) 126} 127 128func (w *Writer) writeSeriesHeader(key, field []byte, ft FieldType) { 129 w.state = writePoints 130 w.stats.series++ 131 w.stats.counts[ft&7].series++ 132 133 w.msg.seriesHeader.SeriesKey = key 134 w.msg.seriesHeader.Field = field 135 w.msg.seriesHeader.FieldType = ft 136 w.writeTypeMessage(SeriesHeaderType, &w.msg.seriesHeader) 137} 138 139func (w *Writer) writeSeriesFooter(ft FieldType, count int) { 140 w.stats.counts[ft&7].values += count 141 w.writeTypeMessage(SeriesFooterType, &w.msg.seriesFooter) 142} 143 144func (w *Writer) write(p []byte) { 145 if w.err != nil { 146 return 147 } 148 _, w.err = w.w.Write(p) 149} 150 151func (w *Writer) writeTypeMessage(typ MessageType, msg message) { 152 if w.err != nil { 153 return 154 } 155 156 // ensure size 157 n := msg.Size() 158 if n > cap(w.buf) { 159 w.buf = make([]byte, n) 160 } else { 161 w.buf = w.buf[:n] 162 } 163 164 _, w.err = msg.MarshalTo(w.buf) 165 w.writeTypeBytes(typ, w.buf) 166} 167 168func (w *Writer) writeTypeBytes(typ MessageType, b []byte) { 169 if w.err != nil { 170 return 171 } 172 w.err = tlv.WriteTLV(w.w, byte(typ), w.buf) 173} 174 175type bucketWriter struct { 176 w *Writer 177 err error 178 start, end int64 179 key []byte 180 field []byte 181 n int 182 closed bool 183} 184 185func (bw *bucketWriter) Err() error { 186 if bw.w.err != nil { 187 return bw.w.err 188 } 189 return bw.err 190} 191 192func (bw *bucketWriter) hasErr() bool { 193 return bw.w.err != nil || bw.err != nil 194} 195 196func (bw *bucketWriter) BeginSeries(name, field []byte, typ influxql.DataType, tags models.Tags) { 197 if bw.hasErr() { 198 return 199 } 200 201 if bw.w.state != writeSeries { 202 panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writeSeries)) 203 } 204 bw.w.state = writeSeriesHeader 205 206 bw.key = models.AppendMakeKey(bw.key[:0], name, tags) 207 bw.field = field 208} 209 210func (bw *bucketWriter) EndSeries() { 211 if bw.hasErr() { 212 return 213 } 214 215 if bw.w.state != writePoints && bw.w.state != writeSeriesHeader { 216 panic(fmt.Sprintf("writer state: got=%v, exp=%v,%v", bw.w.state, writeSeriesHeader, writePoints)) 217 } 218 if bw.w.state == writePoints { 219 bw.w.writeSeriesFooter(IntegerFieldType, bw.n) 220 } 221 bw.w.state = writeSeries 222} 223 224func (bw *bucketWriter) WriteIntegerCursor(cur tsdb.IntegerArrayCursor) { 225 if bw.hasErr() { 226 return 227 } 228 229 if bw.w.state == writeSeriesHeader { 230 bw.w.writeSeriesHeader(bw.key, bw.field, IntegerFieldType) 231 } 232 233 if bw.w.state != writePoints { 234 panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints)) 235 } 236 237 var msg IntegerPoints 238 for { 239 a := cur.Next() 240 if a.Len() == 0 { 241 break 242 } 243 244 bw.n += a.Len() 245 msg.Timestamps = a.Timestamps 246 msg.Values = a.Values 247 bw.w.writeTypeMessage(IntegerPointsType, &msg) 248 } 249} 250 251func (bw *bucketWriter) WriteFloatCursor(cur tsdb.FloatArrayCursor) { 252 if bw.hasErr() { 253 return 254 } 255 256 if bw.w.state == writeSeriesHeader { 257 bw.w.writeSeriesHeader(bw.key, bw.field, FloatFieldType) 258 } 259 260 if bw.w.state != writePoints { 261 panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints)) 262 } 263 264 var msg FloatPoints 265 for { 266 a := cur.Next() 267 if a.Len() == 0 { 268 break 269 } 270 271 bw.n += a.Len() 272 msg.Timestamps = a.Timestamps 273 msg.Values = a.Values 274 bw.w.writeTypeMessage(FloatPointsType, &msg) 275 } 276} 277 278func (bw *bucketWriter) WriteUnsignedCursor(cur tsdb.UnsignedArrayCursor) { 279 if bw.hasErr() { 280 return 281 } 282 283 if bw.w.state == writeSeriesHeader { 284 bw.w.writeSeriesHeader(bw.key, bw.field, UnsignedFieldType) 285 } 286 287 if bw.w.state != writePoints { 288 panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints)) 289 } 290 291 var msg UnsignedPoints 292 for { 293 a := cur.Next() 294 if a.Len() == 0 { 295 break 296 } 297 298 bw.n += a.Len() 299 msg.Timestamps = a.Timestamps 300 msg.Values = a.Values 301 bw.w.writeTypeMessage(UnsignedPointsType, &msg) 302 } 303} 304 305func (bw *bucketWriter) WriteBooleanCursor(cur tsdb.BooleanArrayCursor) { 306 if bw.hasErr() { 307 return 308 } 309 310 if bw.w.state == writeSeriesHeader { 311 bw.w.writeSeriesHeader(bw.key, bw.field, BooleanFieldType) 312 } 313 314 if bw.w.state != writePoints { 315 panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints)) 316 } 317 318 var msg BooleanPoints 319 for { 320 a := cur.Next() 321 if a.Len() == 0 { 322 break 323 } 324 325 bw.n += a.Len() 326 msg.Timestamps = a.Timestamps 327 msg.Values = a.Values 328 bw.w.writeTypeMessage(BooleanPointsType, &msg) 329 } 330} 331 332func (bw *bucketWriter) WriteStringCursor(cur tsdb.StringArrayCursor) { 333 if bw.hasErr() { 334 return 335 } 336 337 if bw.w.state == writeSeriesHeader { 338 bw.w.writeSeriesHeader(bw.key, bw.field, StringFieldType) 339 } 340 341 if bw.w.state != writePoints { 342 panic(fmt.Sprintf("writer state: got=%v, exp=%v", bw.w.state, writePoints)) 343 } 344 345 var msg StringPoints 346 for { 347 a := cur.Next() 348 if a.Len() == 0 { 349 break 350 } 351 352 bw.n += a.Len() 353 msg.Timestamps = a.Timestamps 354 msg.Values = a.Values 355 bw.w.writeTypeMessage(StringPointsType, &msg) 356 } 357} 358 359func (bw *bucketWriter) Close() error { 360 if bw.closed { 361 return nil 362 } 363 364 bw.closed = true 365 366 if bw.hasErr() { 367 return bw.Err() 368 } 369 370 bw.w.bw = nil 371 bw.w.writeBucketFooter() 372 bw.err = ErrWriteBucketAfterClose 373 374 return bw.w.w.Flush() 375} 376