1package main
2
3import (
4	"fmt"
5	"os"
6	"path/filepath"
7
8	"github.com/influxdata/influxdb/cmd/influx_tsm/stats"
9	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
10)
11
// maxBlocksPerKey is the number of writes for a single key after which
// Process closes the current TSM file and starts a new one.
const maxBlocksPerKey = 65535
15
16// KeyIterator is used to iterate over b* keys for conversion to tsm keys
17type KeyIterator interface {
18	Next() bool
19	Read() (string, []tsm1.Value, error)
20}
21
22// Converter encapsulates the logic for converting b*1 shards to tsm1 shards.
23type Converter struct {
24	path           string
25	maxTSMFileSize uint32
26	sequence       int
27	stats          *stats.Stats
28}
29
30// NewConverter returns a new instance of the Converter.
31func NewConverter(path string, sz uint32, stats *stats.Stats) *Converter {
32	return &Converter{
33		path:           path,
34		maxTSMFileSize: sz,
35		stats:          stats,
36	}
37}
38
39// Process writes the data provided by iter to a tsm1 shard.
40func (c *Converter) Process(iter KeyIterator) error {
41	// Ensure the tsm1 directory exists.
42	if err := os.MkdirAll(c.path, 0777); err != nil {
43		return err
44	}
45
46	// Iterate until no more data remains.
47	var w tsm1.TSMWriter
48	var keyCount map[string]int
49
50	for iter.Next() {
51		k, v, err := iter.Read()
52		if err != nil {
53			return err
54		}
55
56		if w == nil {
57			w, err = c.nextTSMWriter()
58			if err != nil {
59				return err
60			}
61			keyCount = map[string]int{}
62		}
63		if err := w.Write([]byte(k), v); err != nil {
64			return err
65		}
66		keyCount[k]++
67
68		c.stats.AddPointsRead(len(v))
69		c.stats.AddPointsWritten(len(v))
70
71		// If we have a max file size configured and we're over it, start a new TSM file.
72		if w.Size() > c.maxTSMFileSize || keyCount[k] == maxBlocksPerKey {
73			if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues {
74				return err
75			}
76
77			c.stats.AddTSMBytes(w.Size())
78
79			if err := w.Close(); err != nil {
80				return err
81			}
82			w = nil
83		}
84	}
85
86	if w != nil {
87		if err := w.WriteIndex(); err != nil && err != tsm1.ErrNoValues {
88			return err
89		}
90		c.stats.AddTSMBytes(w.Size())
91
92		if err := w.Close(); err != nil {
93			return err
94		}
95	}
96
97	return nil
98}
99
100// nextTSMWriter returns the next TSMWriter for the Converter.
101func (c *Converter) nextTSMWriter() (tsm1.TSMWriter, error) {
102	c.sequence++
103	fileName := filepath.Join(c.path, fmt.Sprintf("%09d-%09d.%s", 1, c.sequence, tsm1.TSMFileExtension))
104
105	fd, err := os.OpenFile(fileName, os.O_CREATE|os.O_RDWR, 0666)
106	if err != nil {
107		return nil, err
108	}
109
110	// Create the writer for the new TSM file.
111	w, err := tsm1.NewTSMWriter(fd)
112	if err != nil {
113		return nil, err
114	}
115
116	c.stats.IncrTSMFileCount()
117	return w, nil
118}
119