1// Command influx_tsm converts b1 or bz1 shards (from InfluxDB releases earlier than v0.11)
2// to the current tsm1 format.
3package main
4
5import (
6	"bufio"
7	"errors"
8	"flag"
9	"fmt"
10	"io"
11	"io/ioutil"
12	"log"
13	"os"
14	"path/filepath"
15	"runtime"
16	"runtime/pprof"
17	"sort"
18	"strings"
19	"text/tabwriter"
20	"time"
21
22	"net/http"
23	_ "net/http/pprof"
24
25	"github.com/influxdata/influxdb/cmd/influx_tsm/b1"
26	"github.com/influxdata/influxdb/cmd/influx_tsm/bz1"
27	"github.com/influxdata/influxdb/cmd/influx_tsm/tsdb"
28)
29
// ShardReader reads b* shards and converts to tsm shards.
// It embeds KeyIterator (declared elsewhere in this package) for walking
// the shard's series data during conversion.
type ShardReader interface {
	KeyIterator
	// Open prepares the underlying shard for reading.
	Open() error
	// Close releases resources held by the reader.
	Close() error
}
36
const (
	// tsmExt is appended to a shard's path to form the temporary
	// destination of the converted tsm data (renamed over the source
	// once conversion succeeds).
	tsmExt = "tsm"
)
40
// description is the long help text printed as part of the flag set's
// Usage output.
var description = `
Convert a database from b1 or bz1 format to tsm1 format.

This tool will backup the directories before conversion (if not disabled).
The backed-up files must be removed manually, generally after starting up the
node again to make sure all of data has been converted correctly.

To restore a backup:
  Shut down the node, remove the converted directory, and
  copy the backed-up directory to the original location.`
51
// options holds the parsed command-line configuration for the tool.
type options struct {
	DataPath       string        // root data directory containing the databases
	BackupPath     string        // where databases are copied before conversion
	DBs            []string      // databases to convert; nil means all
	DebugAddr      string        // if non-empty, address for the debug HTTP server
	TSMSize        uint64        // maximum size of an individual TSM file
	Parallel       bool          // convert up to GOMAXPROCS shards at once
	SkipBackup     bool          // disable backups entirely (not recommended)
	UpdateInterval time.Duration // how often status updates are printed
	Yes            bool          // skip the interactive confirmation prompt
	CPUFile        string        // if non-empty, path for a CPU profile
}
64
65func (o *options) Parse() error {
66	fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)
67
68	var dbs string
69
70	fs.StringVar(&dbs, "dbs", "", "Comma-delimited list of databases to convert. Default is to convert all databases.")
71	fs.Uint64Var(&opts.TSMSize, "sz", maxTSMSz, "Maximum size of individual TSM files.")
72	fs.BoolVar(&opts.Parallel, "parallel", false, "Perform parallel conversion. (up to GOMAXPROCS shards at once)")
73	fs.BoolVar(&opts.SkipBackup, "nobackup", false, "Disable database backups. Not recommended.")
74	fs.StringVar(&opts.BackupPath, "backup", "", "The location to backup up the current databases. Must not be within the data directory.")
75	fs.StringVar(&opts.DebugAddr, "debug", "", "If set, http debugging endpoints will be enabled on the given address")
76	fs.DurationVar(&opts.UpdateInterval, "interval", 5*time.Second, "How often status updates are printed.")
77	fs.BoolVar(&opts.Yes, "y", false, "Don't ask, just convert")
78	fs.StringVar(&opts.CPUFile, "profile", "", "CPU Profile location")
79	fs.Usage = func() {
80		fmt.Fprintf(os.Stderr, "Usage: %v [options] <data-path> \n", os.Args[0])
81		fmt.Fprintf(os.Stderr, "%v\n\nOptions:\n", description)
82		fs.PrintDefaults()
83		fmt.Fprintf(os.Stderr, "\n")
84	}
85
86	if err := fs.Parse(os.Args[1:]); err != nil {
87		return err
88	}
89
90	if len(fs.Args()) < 1 {
91		return errors.New("no data directory specified")
92	}
93	var err error
94	if o.DataPath, err = filepath.Abs(fs.Args()[0]); err != nil {
95		return err
96	}
97	if o.DataPath, err = filepath.EvalSymlinks(filepath.Clean(o.DataPath)); err != nil {
98		return err
99	}
100
101	if o.TSMSize > maxTSMSz {
102		return fmt.Errorf("bad TSM file size, maximum TSM file size is %d", maxTSMSz)
103	}
104
105	// Check if specific databases were requested.
106	o.DBs = strings.Split(dbs, ",")
107	if len(o.DBs) == 1 && o.DBs[0] == "" {
108		o.DBs = nil
109	}
110
111	if !o.SkipBackup {
112		if o.BackupPath == "" {
113			return errors.New("either -nobackup or -backup DIR must be set")
114		}
115		if o.BackupPath, err = filepath.Abs(o.BackupPath); err != nil {
116			return err
117		}
118		if o.BackupPath, err = filepath.EvalSymlinks(filepath.Clean(o.BackupPath)); err != nil {
119			if os.IsNotExist(err) {
120				return errors.New("backup directory must already exist")
121			}
122			return err
123		}
124
125		if strings.HasPrefix(o.BackupPath, o.DataPath) {
126			fmt.Println(o.BackupPath, o.DataPath)
127			return errors.New("backup directory cannot be contained within data directory")
128		}
129	}
130
131	if o.DebugAddr != "" {
132		log.Printf("Starting debugging server on http://%v", o.DebugAddr)
133		go func() {
134			log.Fatal(http.ListenAndServe(o.DebugAddr, nil))
135		}()
136	}
137
138	return nil
139}
140
// opts holds the program-wide parsed command-line options.
var opts options

// maxTSMSz is the upper bound (2 GiB) for an individual TSM file.
const maxTSMSz uint64 = 2 * 1024 * 1024 * 1024

// init routes log output to stderr with date/time/microsecond stamps so
// status output on stdout stays cleanly separated from diagnostics.
func init() {
	log.SetOutput(os.Stderr)
	log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)
}
149
150func main() {
151	if err := opts.Parse(); err != nil {
152		log.Fatal(err)
153	}
154
155	// Determine the list of databases
156	dbs, err := ioutil.ReadDir(opts.DataPath)
157	if err != nil {
158		log.Fatalf("failed to access data directory at %v: %v\n", opts.DataPath, err)
159	}
160	fmt.Println() // Cleanly separate output from start of program.
161
162	if opts.Parallel {
163		if !isEnvSet("GOMAXPROCS") {
164			// Only modify GOMAXPROCS if it wasn't set in the environment
165			// This means 'GOMAXPROCS=1 influx_tsm -parallel' will not actually
166			// run in parallel
167			runtime.GOMAXPROCS(runtime.NumCPU())
168		}
169	}
170
171	var badUser string
172	if opts.SkipBackup {
173		badUser = "(NOT RECOMMENDED)"
174	}
175
176	// Dump summary of what is about to happen.
177	fmt.Println("b1 and bz1 shard conversion.")
178	fmt.Println("-----------------------------------")
179	fmt.Println("Data directory is:                 ", opts.DataPath)
180	if !opts.SkipBackup {
181		fmt.Println("Backup directory is:               ", opts.BackupPath)
182	}
183	fmt.Println("Databases specified:               ", allDBs(opts.DBs))
184	fmt.Println("Database backups enabled:          ", yesno(!opts.SkipBackup), badUser)
185	fmt.Printf("Parallel mode enabled (GOMAXPROCS): %s (%d)\n", yesno(opts.Parallel), runtime.GOMAXPROCS(0))
186	fmt.Println()
187
188	shards := collectShards(dbs)
189
190	// Anything to convert?
191	fmt.Printf("\nFound %d shards that will be converted.\n", len(shards))
192	if len(shards) == 0 {
193		fmt.Println("Nothing to do.")
194		return
195	}
196
197	// Display list of convertible shards.
198	fmt.Println()
199	w := new(tabwriter.Writer)
200	w.Init(os.Stdout, 0, 8, 1, '\t', 0)
201	fmt.Fprintln(w, "Database\tRetention\tPath\tEngine\tSize")
202	for _, si := range shards {
203		fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%d\n", si.Database, si.RetentionPolicy, si.FullPath(opts.DataPath), si.FormatAsString(), si.Size)
204	}
205	w.Flush()
206
207	if !opts.Yes {
208		// Get confirmation from user.
209		fmt.Printf("\nThese shards will be converted. Proceed? y/N: ")
210		liner := bufio.NewReader(os.Stdin)
211		yn, err := liner.ReadString('\n')
212		if err != nil {
213			log.Fatalf("failed to read response: %v", err)
214		}
215		yn = strings.TrimRight(strings.ToLower(yn), "\n")
216		if yn != "y" {
217			log.Fatal("Conversion aborted.")
218		}
219	}
220	fmt.Println("Conversion starting....")
221
222	if opts.CPUFile != "" {
223		f, err := os.Create(opts.CPUFile)
224		if err != nil {
225			log.Fatal(err)
226		}
227		if err = pprof.StartCPUProfile(f); err != nil {
228			log.Fatal(err)
229		}
230		defer pprof.StopCPUProfile()
231	}
232
233	tr := newTracker(shards, opts)
234
235	if err := tr.Run(); err != nil {
236		log.Fatalf("Error occurred preventing completion: %v\n", err)
237	}
238
239	tr.PrintStats()
240}
241
242func collectShards(dbs []os.FileInfo) tsdb.ShardInfos {
243	// Get the list of shards for conversion.
244	var shards tsdb.ShardInfos
245	for _, db := range dbs {
246		d := tsdb.NewDatabase(filepath.Join(opts.DataPath, db.Name()))
247		shs, err := d.Shards()
248		if err != nil {
249			log.Fatalf("Failed to access shards for database %v: %v\n", d.Name(), err)
250		}
251		shards = append(shards, shs...)
252	}
253
254	sort.Sort(shards)
255	shards = shards.FilterFormat(tsdb.TSM1)
256	if len(dbs) > 0 {
257		shards = shards.ExclusiveDatabases(opts.DBs)
258	}
259
260	return shards
261}
262
263// backupDatabase backs up the database named db
264func backupDatabase(db string) error {
265	copyFile := func(path string, info os.FileInfo, err error) error {
266		if err != nil {
267			return err
268		}
269
270		// Strip the DataPath from the path and replace with BackupPath.
271		toPath := strings.Replace(path, opts.DataPath, opts.BackupPath, 1)
272
273		if info.IsDir() {
274			return os.MkdirAll(toPath, info.Mode())
275		}
276
277		in, err := os.Open(path)
278		if err != nil {
279			return err
280		}
281		defer in.Close()
282
283		srcInfo, err := os.Stat(path)
284		if err != nil {
285			return err
286		}
287
288		out, err := os.OpenFile(toPath, os.O_CREATE|os.O_WRONLY, info.Mode())
289		if err != nil {
290			return err
291		}
292		defer out.Close()
293
294		dstInfo, err := os.Stat(toPath)
295		if err != nil {
296			return err
297		}
298
299		if dstInfo.Size() == srcInfo.Size() {
300			log.Printf("Backup file already found for %v with correct size, skipping.", path)
301			return nil
302		}
303
304		if dstInfo.Size() > srcInfo.Size() {
305			log.Printf("Invalid backup file found for %v, replacing with good copy.", path)
306			if err := out.Truncate(0); err != nil {
307				return err
308			}
309			if _, err := out.Seek(0, io.SeekStart); err != nil {
310				return err
311			}
312		}
313
314		if dstInfo.Size() > 0 {
315			log.Printf("Resuming backup of file %v, starting at %v bytes", path, dstInfo.Size())
316		}
317
318		off, err := out.Seek(0, io.SeekEnd)
319		if err != nil {
320			return err
321		}
322		if _, err := in.Seek(off, io.SeekStart); err != nil {
323			return err
324		}
325
326		log.Printf("Backing up file %v", path)
327
328		_, err = io.Copy(out, in)
329
330		return err
331	}
332
333	return filepath.Walk(filepath.Join(opts.DataPath, db), copyFile)
334}
335
// convertShard converts the shard in-place.
// The tsm output is written alongside the source with a ".tsm" suffix;
// on success the source shard is deleted and the tsm data renamed into
// its place. Progress/stats are recorded in tr.
func convertShard(si *tsdb.ShardInfo, tr *tracker) error {
	src := si.FullPath(opts.DataPath)
	// Temporary destination path for the converted data.
	dst := fmt.Sprintf("%v.%v", src, tsmExt)

	// Pick a reader matching the shard's on-disk format.
	var reader ShardReader
	switch si.Format {
	case tsdb.BZ1:
		reader = bz1.NewReader(src, &tr.Stats, 0)
	case tsdb.B1:
		reader = b1.NewReader(src, &tr.Stats, 0)
	default:
		return fmt.Errorf("Unsupported shard format: %v", si.FormatAsString())
	}

	// Open the shard, and create a converter.
	if err := reader.Open(); err != nil {
		return fmt.Errorf("Failed to open %v for conversion: %v", src, err)
	}
	// NOTE(review): Close is also called explicitly below, so on the
	// success path the deferred Close runs a second time — presumably the
	// b1/bz1 readers tolerate a double Close; verify before changing.
	defer reader.Close()
	converter := NewConverter(dst, uint32(opts.TSMSize), &tr.Stats)

	// Perform the conversion.
	if err := converter.Process(reader); err != nil {
		return fmt.Errorf("Conversion of %v failed: %v", src, err)
	}

	// Delete source shard, and rename new tsm1 shard.
	if err := reader.Close(); err != nil {
		return fmt.Errorf("Conversion of %v failed due to close: %v", src, err)
	}

	if err := os.RemoveAll(si.FullPath(opts.DataPath)); err != nil {
		return fmt.Errorf("Deletion of %v failed: %v", src, err)
	}
	if err := os.Rename(dst, src); err != nil {
		return fmt.Errorf("Rename of %v to %v failed: %v", dst, src, err)
	}

	return nil
}
377
// ParallelGroup allows the maximum parallelism of a set of operations to
// be controlled. It is a buffered channel used as a counting semaphore:
// the buffer capacity is the concurrency limit.
type ParallelGroup chan struct{}

// NewParallelGroup returns a group which allows n operations to run in parallel. A value of 0
// means no operations will ever run.
func NewParallelGroup(n int) ParallelGroup {
	return ParallelGroup(make(chan struct{}, n))
}

// Do executes one operation of the ParallelGroup, blocking until a
// concurrency slot is free and releasing it when the operation returns
// (including on panic).
func (p ParallelGroup) Do(op func()) {
	var token struct{}
	p <- token // block until a working slot frees up
	defer func() { <-p }() // give the slot back on return

	op()
}
394
// yesno returns "yes" for true, "no" for false.
func yesno(v bool) string {
	if !v {
		return "no"
	}
	return "yes"
}
402
// allDBs returns "all" if all databases are requested for conversion
// (a nil slice); otherwise it renders the requested database list.
func allDBs(dbs []string) string {
	if dbs != nil {
		return fmt.Sprintf("%v", dbs)
	}
	return "all"
}
410
// isEnvSet checks to see if a variable was set in the environment,
// regardless of its value (an empty value still counts as set).
// os.LookupEnv's second result is exactly this presence test, replacing
// the previous hand-rolled scan over os.Environ().
func isEnvSet(name string) bool {
	_, ok := os.LookupEnv(name)
	return ok
}
420