1package main 2 3import ( 4 "fmt" 5 "os" 6 "path/filepath" 7 8 "github.com/influxdata/influxdb/cmd/influx_tsm/stats" 9 "github.com/influxdata/influxdb/tsdb/engine/tsm1" 10) 11 12const ( 13 maxBlocksPerKey = 65535 14) 15 16// KeyIterator is used to iterate over b* keys for conversion to tsm keys 17type KeyIterator interface { 18 Next() bool 19 Read() (string, []tsm1.Value, error) 20} 21 22// Converter encapsulates the logic for converting b*1 shards to tsm1 shards. 23type Converter struct { 24 path string 25 maxTSMFileSize uint32 26 sequence int 27 stats *stats.Stats 28} 29 30// NewConverter returns a new instance of the Converter. 31func NewConverter(path string, sz uint32, stats *stats.Stats) *Converter { 32 return &Converter{ 33 path: path, 34 maxTSMFileSize: sz, 35 stats: stats, 36 } 37} 38 39// Process writes the data provided by iter to a tsm1 shard. 40func (c *Converter) Process(iter KeyIterator) error { 41 // Ensure the tsm1 directory exists. 42 if err := os.MkdirAll(c.path, 0777); err != nil { 43 return err 44 } 45 46 // Iterate until no more data remains. 47 var w tsm1.TSMWriter 48 var keyCount map[string]int 49 50 for iter.Next() { 51 k, v, err := iter.Read() 52 if err != nil { 53 return err 54 } 55 56 if w == nil { 57 w, err = c.nextTSMWriter() 58 if err != nil { 59 return err 60 } 61 keyCount = map[string]int{} 62 } 63 if err := w.Write([]byte(k), v); err != nil { 64 return err 65 } 66 keyCount[k]++ 67 68 c.stats.AddPointsRead(len(v)) 69 c.stats.AddPointsWritten(len(v)) 70 71 // If we have a max file size configured and we're over it, start a new TSM file. 72 if w.Size() > c.maxTSMFileSize || keyCount[k] == maxBlocksPerKey { 73 if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues { 74 return err 75 } 76 77 c.stats.AddTSMBytes(w.Size()) 78 79 if err := w.Close(); err != nil { 80 return err 81 } 82 w = nil 83 } 84 } 85 86 if w != nil { 87 if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues { 88 return err 89 } 90 c.stats.AddTSMBytes(w.Size()) 91 92 if err := w.Close(); err != nil { 93 return err 94 } 95 } 96 97 return nil 98} 99 100// nextTSMWriter returns the next TSMWriter for the Converter. 101func (c *Converter) nextTSMWriter() (tsm1.TSMWriter, error) { 102 c.sequence++ 103 fileName := filepath.Join(c.path, fmt.Sprintf("%09d-%09d.%s", 1, c.sequence, tsm1.TSMFileExtension)) 104 105 fd, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666) 106 if err != nil { 107 return nil, err 108 } 109 110 // Create the writer for the new TSM file. 111 w, err := tsm1.NewTSMWriter(fd) 112 if err != nil { 113 return nil, err 114 } 115 116 c.stats.IncrTSMFileCount() 117 return w, nil 118} 119