1// Package export exports TSM files into InfluxDB line protocol format.
2package export
3
4import (
5	"bufio"
6	"compress/gzip"
7	"flag"
8	"fmt"
9	"io"
10	"io/ioutil"
11	"math"
12	"os"
13	"path/filepath"
14	"sort"
15	"strconv"
16	"strings"
17	"sync"
18	"time"
19
20	"github.com/influxdata/influxdb/models"
21	"github.com/influxdata/influxdb/pkg/escape"
22	"github.com/influxdata/influxdb/tsdb/engine/tsm1"
23	"github.com/influxdata/influxql"
24)
25
// Command carries the configuration and working state of one
// "influx_inspect export" invocation.
type Command struct {
	// Stderr and Stdout receive diagnostics and progress messages. They are
	// exported so that tests can substitute their own writers.
	Stderr, Stdout io.Writer

	// Settings filled in from the command-line flags.
	dataDir         string // directory walked for TSM files
	walDir          string // directory walked for WAL files
	out             string // path of the export file to create
	database        string // optional database filter; empty matches every database
	retentionPolicy string // optional retention policy filter; empty matches every policy
	startTime       int64  // lower time bound in Unix nanoseconds (inclusive)
	endTime         int64  // upper time bound in Unix nanoseconds (inclusive)
	compress        bool   // gzip the export file
	lponly          bool   // emit line protocol only: no DDL and no comment lines

	// Files found by the directory walks. Keys are the database and retention
	// policy directory names joined into a single path.
	manifest           map[string]struct{} // every key that owns at least one TSM or WAL file
	tsmFiles, walFiles map[string][]string // file paths grouped by key
}
46
47// NewCommand returns a new instance of Command.
48func NewCommand() *Command {
49	return &Command{
50		Stderr: os.Stderr,
51		Stdout: os.Stdout,
52
53		manifest: make(map[string]struct{}),
54		tsmFiles: make(map[string][]string),
55		walFiles: make(map[string][]string),
56	}
57}
58
59// Run executes the command.
60func (cmd *Command) Run(args ...string) error {
61	var start, end string
62	fs := flag.NewFlagSet("export", flag.ExitOnError)
63	fs.StringVar(&cmd.dataDir, "datadir", os.Getenv("HOME")+"/.influxdb/data", "Data storage path")
64	fs.StringVar(&cmd.walDir, "waldir", os.Getenv("HOME")+"/.influxdb/wal", "WAL storage path")
65	fs.StringVar(&cmd.out, "out", os.Getenv("HOME")+"/.influxdb/export", "Destination file to export to")
66	fs.StringVar(&cmd.database, "database", "", "Optional: the database to export")
67	fs.StringVar(&cmd.retentionPolicy, "retention", "", "Optional: the retention policy to export (requires -database)")
68	fs.StringVar(&start, "start", "", "Optional: the start time to export (RFC3339 format)")
69	fs.StringVar(&end, "end", "", "Optional: the end time to export (RFC3339 format)")
70	fs.BoolVar(&cmd.lponly, "lponly", false, "Only export line protocol")
71	fs.BoolVar(&cmd.compress, "compress", false, "Compress the output")
72
73	fs.SetOutput(cmd.Stdout)
74	fs.Usage = func() {
75		fmt.Fprintf(cmd.Stdout, "Exports TSM files into InfluxDB line protocol format.\n\n")
76		fmt.Fprintf(cmd.Stdout, "Usage: %s export [flags]\n\n", filepath.Base(os.Args[0]))
77		fs.PrintDefaults()
78	}
79
80	if err := fs.Parse(args); err != nil {
81		return err
82	}
83
84	// set defaults
85	if start != "" {
86		s, err := time.Parse(time.RFC3339, start)
87		if err != nil {
88			return err
89		}
90		cmd.startTime = s.UnixNano()
91	} else {
92		cmd.startTime = math.MinInt64
93	}
94	if end != "" {
95		e, err := time.Parse(time.RFC3339, end)
96		if err != nil {
97			return err
98		}
99		cmd.endTime = e.UnixNano()
100	} else {
101		// set end time to max if it is not set.
102		cmd.endTime = math.MaxInt64
103	}
104
105	if err := cmd.validate(); err != nil {
106		return err
107	}
108
109	return cmd.export()
110}
111
112func (cmd *Command) validate() error {
113	if cmd.retentionPolicy != "" && cmd.database == "" {
114		return fmt.Errorf("must specify a db")
115	}
116	if cmd.startTime != 0 && cmd.endTime != 0 && cmd.endTime < cmd.startTime {
117		return fmt.Errorf("end time before start time")
118	}
119	return nil
120}
121
122func (cmd *Command) export() error {
123	if err := cmd.walkTSMFiles(); err != nil {
124		return err
125	}
126	if err := cmd.walkWALFiles(); err != nil {
127		return err
128	}
129
130	return cmd.write()
131}
132
133func (cmd *Command) walkTSMFiles() error {
134	return filepath.Walk(cmd.dataDir, func(path string, f os.FileInfo, err error) error {
135		if err != nil {
136			return err
137		}
138
139		// check to see if this is a tsm file
140		if filepath.Ext(path) != "."+tsm1.TSMFileExtension {
141			return nil
142		}
143
144		relPath, err := filepath.Rel(cmd.dataDir, path)
145		if err != nil {
146			return err
147		}
148		dirs := strings.Split(relPath, string(byte(os.PathSeparator)))
149		if len(dirs) < 2 {
150			return fmt.Errorf("invalid directory structure for %s", path)
151		}
152		if dirs[0] == cmd.database || cmd.database == "" {
153			if dirs[1] == cmd.retentionPolicy || cmd.retentionPolicy == "" {
154				key := filepath.Join(dirs[0], dirs[1])
155				cmd.manifest[key] = struct{}{}
156				cmd.tsmFiles[key] = append(cmd.tsmFiles[key], path)
157			}
158		}
159		return nil
160	})
161}
162
163func (cmd *Command) walkWALFiles() error {
164	return filepath.Walk(cmd.walDir, func(path string, f os.FileInfo, err error) error {
165		if err != nil {
166			return err
167		}
168
169		// check to see if this is a wal file
170		fileName := filepath.Base(path)
171		if filepath.Ext(path) != "."+tsm1.WALFileExtension || !strings.HasPrefix(fileName, tsm1.WALFilePrefix) {
172			return nil
173		}
174
175		relPath, err := filepath.Rel(cmd.walDir, path)
176		if err != nil {
177			return err
178		}
179		dirs := strings.Split(relPath, string(byte(os.PathSeparator)))
180		if len(dirs) < 2 {
181			return fmt.Errorf("invalid directory structure for %s", path)
182		}
183		if dirs[0] == cmd.database || cmd.database == "" {
184			if dirs[1] == cmd.retentionPolicy || cmd.retentionPolicy == "" {
185				key := filepath.Join(dirs[0], dirs[1])
186				cmd.manifest[key] = struct{}{}
187				cmd.walFiles[key] = append(cmd.walFiles[key], path)
188			}
189		}
190		return nil
191	})
192}
193
194func (cmd *Command) writeDDL(mw io.Writer, w io.Writer) error {
195	// Write out all the DDL
196	fmt.Fprintln(mw, "# DDL")
197	for key := range cmd.manifest {
198		keys := strings.Split(key, string(os.PathSeparator))
199		db, rp := influxql.QuoteIdent(keys[0]), influxql.QuoteIdent(keys[1])
200		fmt.Fprintf(w, "CREATE DATABASE %s WITH NAME %s\n", db, rp)
201	}
202
203	return nil
204}
205
206func (cmd *Command) writeDML(mw io.Writer, w io.Writer) error {
207	fmt.Fprintln(mw, "# DML")
208	for key := range cmd.manifest {
209		keys := strings.Split(key, string(os.PathSeparator))
210		fmt.Fprintf(mw, "# CONTEXT-DATABASE:%s\n", keys[0])
211		fmt.Fprintf(mw, "# CONTEXT-RETENTION-POLICY:%s\n", keys[1])
212		if files, ok := cmd.tsmFiles[key]; ok {
213			fmt.Fprintf(cmd.Stdout, "writing out tsm file data for %s...", key)
214			if err := cmd.writeTsmFiles(mw, w, files); err != nil {
215				return err
216			}
217			fmt.Fprintln(cmd.Stdout, "complete.")
218		}
219		if _, ok := cmd.walFiles[key]; ok {
220			fmt.Fprintf(cmd.Stdout, "writing out wal file data for %s...", key)
221			if err := cmd.writeWALFiles(mw, w, cmd.walFiles[key], key); err != nil {
222				return err
223			}
224			fmt.Fprintln(cmd.Stdout, "complete.")
225		}
226	}
227
228	return nil
229}
230
231// writeFull writes the full DML and DDL to the supplied io.Writers.  mw is the
232// "meta" writer where comments and other informational writes go and w is for
233// the actual payload of the writes -- DML and DDL.
234//
235// Typically mw and w are the same but if we'd like to, for example, filter out
236// comments and other meta data, we can pass ioutil.Discard to mw to only
237// include the raw data that writeFull() generates.
238func (cmd *Command) writeFull(mw io.Writer, w io.Writer) error {
239	s, e := time.Unix(0, cmd.startTime).Format(time.RFC3339), time.Unix(0, cmd.endTime).Format(time.RFC3339)
240
241	fmt.Fprintf(mw, "# INFLUXDB EXPORT: %s - %s\n", s, e)
242
243	if shouldWriteDDL := !cmd.lponly; shouldWriteDDL {
244		if err := cmd.writeDDL(mw, w); err != nil {
245			return err
246		}
247	}
248
249	if err := cmd.writeDML(mw, w); err != nil {
250		return err
251	}
252
253	return nil
254}
255
256func (cmd *Command) write() error {
257	// open our output file and create an output buffer
258	f, err := os.Create(cmd.out)
259	if err != nil {
260		return err
261	}
262	defer f.Close()
263
264	// Because calling (*os.File).Write is relatively expensive,
265	// and we don't *need* to sync to disk on every written line of export,
266	// use a sized buffered writer so that we only sync the file every megabyte.
267	bw := bufio.NewWriterSize(f, 1024*1024)
268	defer bw.Flush()
269
270	var w io.Writer = bw
271
272	if cmd.compress {
273		gzw := gzip.NewWriter(w)
274		defer gzw.Close()
275		w = gzw
276	}
277
278	// mw is our "meta writer" -- the io.Writer to which meta/out-of-band data
279	// like comments will be sent.  If the lponly flag is set, mw will be
280	// ioutil.Discard which effectively filters out comments and any other
281	// non-line protocol data.
282	//
283	// Otherwise, mw is set to the same writer as the actual DDL and line
284	// protocol DML which will cause the comments to be intermixed with the
285	// data..
286	//
287	mw := w
288	if cmd.lponly {
289		mw = ioutil.Discard
290	}
291
292	return cmd.writeFull(mw, w)
293}
294
295func (cmd *Command) writeTsmFiles(mw io.Writer, w io.Writer, files []string) error {
296	fmt.Fprintln(mw, "# writing tsm data")
297
298	// we need to make sure we write the same order that the files were written
299	sort.Strings(files)
300
301	for _, f := range files {
302		if err := cmd.exportTSMFile(f, w); err != nil {
303			return err
304		}
305	}
306
307	return nil
308}
309
310func (cmd *Command) exportTSMFile(tsmFilePath string, w io.Writer) error {
311	f, err := os.Open(tsmFilePath)
312	if err != nil {
313		if os.IsNotExist(err) {
314			fmt.Fprintf(w, "skipped missing file: %s", tsmFilePath)
315			return nil
316		}
317		return err
318	}
319	defer f.Close()
320
321	r, err := tsm1.NewTSMReader(f)
322	if err != nil {
323		fmt.Fprintf(cmd.Stderr, "unable to read %s, skipping: %s\n", tsmFilePath, err.Error())
324		return nil
325	}
326	defer r.Close()
327
328	if sgStart, sgEnd := r.TimeRange(); sgStart > cmd.endTime || sgEnd < cmd.startTime {
329		return nil
330	}
331
332	for i := 0; i < r.KeyCount(); i++ {
333		key, _ := r.KeyAt(i)
334		values, err := r.ReadAll(key)
335		if err != nil {
336			fmt.Fprintf(cmd.Stderr, "unable to read key %q in %s, skipping: %s\n", string(key), tsmFilePath, err.Error())
337			continue
338		}
339		measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key)
340		field = escape.Bytes(field)
341
342		if err := cmd.writeValues(w, measurement, string(field), values); err != nil {
343			// An error from writeValues indicates an IO error, which should be returned.
344			return err
345		}
346	}
347	return nil
348}
349
350func (cmd *Command) writeWALFiles(mw io.Writer, w io.Writer, files []string, key string) error {
351	fmt.Fprintln(mw, "# writing wal data")
352
353	// we need to make sure we write the same order that the wal received the data
354	sort.Strings(files)
355
356	var once sync.Once
357	warnDelete := func() {
358		once.Do(func() {
359			msg := fmt.Sprintf(`WARNING: detected deletes in wal file.
360Some series for %q may be brought back by replaying this data.
361To resolve, you can either let the shard snapshot prior to exporting the data
362or manually editing the exported file.
363			`, key)
364			fmt.Fprintln(cmd.Stderr, msg)
365		})
366	}
367
368	for _, f := range files {
369		if err := cmd.exportWALFile(f, w, warnDelete); err != nil {
370			return err
371		}
372	}
373
374	return nil
375}
376
377// exportWAL reads every WAL entry from r and exports it to w.
378func (cmd *Command) exportWALFile(walFilePath string, w io.Writer, warnDelete func()) error {
379	f, err := os.Open(walFilePath)
380	if err != nil {
381		if os.IsNotExist(err) {
382			fmt.Fprintf(w, "skipped missing file: %s", walFilePath)
383			return nil
384		}
385		return err
386	}
387	defer f.Close()
388
389	r := tsm1.NewWALSegmentReader(f)
390	defer r.Close()
391
392	for r.Next() {
393		entry, err := r.Read()
394		if err != nil {
395			n := r.Count()
396			fmt.Fprintf(cmd.Stderr, "file %s corrupt at position %d: %v", walFilePath, n, err)
397			break
398		}
399
400		switch t := entry.(type) {
401		case *tsm1.DeleteWALEntry, *tsm1.DeleteRangeWALEntry:
402			warnDelete()
403			continue
404		case *tsm1.WriteWALEntry:
405			for key, values := range t.Values {
406				measurement, field := tsm1.SeriesAndFieldFromCompositeKey([]byte(key))
407				// measurements are stored escaped, field names are not
408				field = escape.Bytes(field)
409
410				if err := cmd.writeValues(w, measurement, string(field), values); err != nil {
411					// An error from writeValues indicates an IO error, which should be returned.
412					return err
413				}
414			}
415		}
416	}
417	return nil
418}
419
420// writeValues writes every value in values to w, using the given series key and field name.
421// If any call to w.Write fails, that error is returned.
422func (cmd *Command) writeValues(w io.Writer, seriesKey []byte, field string, values []tsm1.Value) error {
423	buf := []byte(string(seriesKey) + " " + field + "=")
424	prefixLen := len(buf)
425
426	for _, value := range values {
427		ts := value.UnixNano()
428		if (ts < cmd.startTime) || (ts > cmd.endTime) {
429			continue
430		}
431
432		// Re-slice buf to be "<series_key> <field>=".
433		buf = buf[:prefixLen]
434
435		// Append the correct representation of the value.
436		switch v := value.Value().(type) {
437		case float64:
438			buf = strconv.AppendFloat(buf, v, 'g', -1, 64)
439		case int64:
440			buf = strconv.AppendInt(buf, v, 10)
441			buf = append(buf, 'i')
442		case uint64:
443			buf = strconv.AppendUint(buf, v, 10)
444			buf = append(buf, 'u')
445		case bool:
446			buf = strconv.AppendBool(buf, v)
447		case string:
448			buf = append(buf, '"')
449			buf = append(buf, models.EscapeStringField(v)...)
450			buf = append(buf, '"')
451		default:
452			// This shouldn't be possible, but we'll format it anyway.
453			buf = append(buf, fmt.Sprintf("%v", v)...)
454		}
455
456		// Now buf has "<series_key> <field>=<value>".
457		// Append the timestamp and a newline, then write it.
458		buf = append(buf, ' ')
459		buf = strconv.AppendInt(buf, ts, 10)
460		buf = append(buf, '\n')
461		if _, err := w.Write(buf); err != nil {
462			// Underlying IO error needs to be returned.
463			return err
464		}
465	}
466
467	return nil
468}
469