1// Package export exports TSM files into InfluxDB line protocol format. 2package export 3 4import ( 5 "bufio" 6 "compress/gzip" 7 "flag" 8 "fmt" 9 "io" 10 "io/ioutil" 11 "math" 12 "os" 13 "path/filepath" 14 "sort" 15 "strconv" 16 "strings" 17 "sync" 18 "time" 19 20 "github.com/influxdata/influxdb/models" 21 "github.com/influxdata/influxdb/pkg/escape" 22 "github.com/influxdata/influxdb/tsdb/engine/tsm1" 23 "github.com/influxdata/influxql" 24) 25 26// Command represents the program execution for "influx_inspect export". 27type Command struct { 28 // Standard input/output, overridden for testing. 29 Stderr io.Writer 30 Stdout io.Writer 31 32 dataDir string 33 walDir string 34 out string 35 database string 36 retentionPolicy string 37 startTime int64 38 endTime int64 39 compress bool 40 lponly bool 41 42 manifest map[string]struct{} 43 tsmFiles map[string][]string 44 walFiles map[string][]string 45} 46 47// NewCommand returns a new instance of Command. 48func NewCommand() *Command { 49 return &Command{ 50 Stderr: os.Stderr, 51 Stdout: os.Stdout, 52 53 manifest: make(map[string]struct{}), 54 tsmFiles: make(map[string][]string), 55 walFiles: make(map[string][]string), 56 } 57} 58 59// Run executes the command. 60func (cmd *Command) Run(args ...string) error { 61 var start, end string 62 fs := flag.NewFlagSet("export", flag.ExitOnError) 63 fs.StringVar(&cmd.dataDir, "datadir", os.Getenv("HOME")+"/.influxdb/data", "Data storage path") 64 fs.StringVar(&cmd.walDir, "waldir", os.Getenv("HOME")+"/.influxdb/wal", "WAL storage path") 65 fs.StringVar(&cmd.out, "out", os.Getenv("HOME")+"/.influxdb/export", "Destination file to export to") 66 fs.StringVar(&cmd.database, "database", "", "Optional: the database to export") 67 fs.StringVar(&cmd.retentionPolicy, "retention", "", "Optional: the retention policy to export (requires -database)") 68 fs.StringVar(&start, "start", "", "Optional: the start time to export (RFC3339 format)") 69 fs.StringVar(&end, "end", "", "Optional: the end time to export (RFC3339 format)") 70 fs.BoolVar(&cmd.lponly, "lponly", false, "Only export line protocol") 71 fs.BoolVar(&cmd.compress, "compress", false, "Compress the output") 72 73 fs.SetOutput(cmd.Stdout) 74 fs.Usage = func() { 75 fmt.Fprintf(cmd.Stdout, "Exports TSM files into InfluxDB line protocol format.\n\n") 76 fmt.Fprintf(cmd.Stdout, "Usage: %s export [flags]\n\n", filepath.Base(os.Args[0])) 77 fs.PrintDefaults() 78 } 79 80 if err := fs.Parse(args); err != nil { 81 return err 82 } 83 84 // set defaults 85 if start != "" { 86 s, err := time.Parse(time.RFC3339, start) 87 if err != nil { 88 return err 89 } 90 cmd.startTime = s.UnixNano() 91 } else { 92 cmd.startTime = math.MinInt64 93 } 94 if end != "" { 95 e, err := time.Parse(time.RFC3339, end) 96 if err != nil { 97 return err 98 } 99 cmd.endTime = e.UnixNano() 100 } else { 101 // set end time to max if it is not set. 102 cmd.endTime = math.MaxInt64 103 } 104 105 if err := cmd.validate(); err != nil { 106 return err 107 } 108 109 return cmd.export() 110} 111 112func (cmd *Command) validate() error { 113 if cmd.retentionPolicy != "" && cmd.database == "" { 114 return fmt.Errorf("must specify a db") 115 } 116 if cmd.startTime != 0 && cmd.endTime != 0 && cmd.endTime < cmd.startTime { 117 return fmt.Errorf("end time before start time") 118 } 119 return nil 120} 121 122func (cmd *Command) export() error { 123 if err := cmd.walkTSMFiles(); err != nil { 124 return err 125 } 126 if err := cmd.walkWALFiles(); err != nil { 127 return err 128 } 129 130 return cmd.write() 131} 132 133func (cmd *Command) walkTSMFiles() error { 134 return filepath.Walk(cmd.dataDir, func(path string, f os.FileInfo, err error) error { 135 if err != nil { 136 return err 137 } 138 139 // check to see if this is a tsm file 140 if filepath.Ext(path) != "."+tsm1.TSMFileExtension { 141 return nil 142 } 143 144 relPath, err := filepath.Rel(cmd.dataDir, path) 145 if err != nil { 146 return err 147 } 148 dirs := strings.Split(relPath, string(byte(os.PathSeparator))) 149 if len(dirs) < 2 { 150 return fmt.Errorf("invalid directory structure for %s", path) 151 } 152 if dirs[0] == cmd.database || cmd.database == "" { 153 if dirs[1] == cmd.retentionPolicy || cmd.retentionPolicy == "" { 154 key := filepath.Join(dirs[0], dirs[1]) 155 cmd.manifest[key] = struct{}{} 156 cmd.tsmFiles[key] = append(cmd.tsmFiles[key], path) 157 } 158 } 159 return nil 160 }) 161} 162 163func (cmd *Command) walkWALFiles() error { 164 return filepath.Walk(cmd.walDir, func(path string, f os.FileInfo, err error) error { 165 if err != nil { 166 return err 167 } 168 169 // check to see if this is a wal file 170 fileName := filepath.Base(path) 171 if filepath.Ext(path) != "."+tsm1.WALFileExtension || !strings.HasPrefix(fileName, tsm1.WALFilePrefix) { 172 return nil 173 } 174 175 relPath, err := filepath.Rel(cmd.walDir, path) 176 if err != nil { 177 return err 178 } 179 dirs := strings.Split(relPath, string(byte(os.PathSeparator))) 180 if len(dirs) < 2 { 181 return fmt.Errorf("invalid directory structure for %s", path) 182 } 183 if dirs[0] == cmd.database || cmd.database == "" { 184 if dirs[1] == cmd.retentionPolicy || cmd.retentionPolicy == "" { 185 key := filepath.Join(dirs[0], dirs[1]) 186 cmd.manifest[key] = struct{}{} 187 cmd.walFiles[key] = append(cmd.walFiles[key], path) 188 } 189 } 190 return nil 191 }) 192} 193 194func (cmd *Command) writeDDL(mw io.Writer, w io.Writer) error { 195 // Write out all the DDL 196 fmt.Fprintln(mw, "# DDL") 197 for key := range cmd.manifest { 198 keys := strings.Split(key, string(os.PathSeparator)) 199 db, rp := influxql.QuoteIdent(keys[0]), influxql.QuoteIdent(keys[1]) 200 fmt.Fprintf(w, "CREATE DATABASE %s WITH NAME %s\n", db, rp) 201 } 202 203 return nil 204} 205 206func (cmd *Command) writeDML(mw io.Writer, w io.Writer) error { 207 fmt.Fprintln(mw, "# DML") 208 for key := range cmd.manifest { 209 keys := strings.Split(key, string(os.PathSeparator)) 210 fmt.Fprintf(mw, "# CONTEXT-DATABASE:%s\n", keys[0]) 211 fmt.Fprintf(mw, "# CONTEXT-RETENTION-POLICY:%s\n", keys[1]) 212 if files, ok := cmd.tsmFiles[key]; ok { 213 fmt.Fprintf(cmd.Stdout, "writing out tsm file data for %s...", key) 214 if err := cmd.writeTsmFiles(mw, w, files); err != nil { 215 return err 216 } 217 fmt.Fprintln(cmd.Stdout, "complete.") 218 } 219 if _, ok := cmd.walFiles[key]; ok { 220 fmt.Fprintf(cmd.Stdout, "writing out wal file data for %s...", key) 221 if err := cmd.writeWALFiles(mw, w, cmd.walFiles[key], key); err != nil { 222 return err 223 } 224 fmt.Fprintln(cmd.Stdout, "complete.") 225 } 226 } 227 228 return nil 229} 230 231// writeFull writes the full DML and DDL to the supplied io.Writers. mw is the 232// "meta" writer where comments and other informational writes go and w is for 233// the actual payload of the writes -- DML and DDL. 234// 235// Typically mw and w are the same but if we'd like to, for example, filter out 236// comments and other meta data, we can pass ioutil.Discard to mw to only 237// include the raw data that writeFull() generates. 238func (cmd *Command) writeFull(mw io.Writer, w io.Writer) error { 239 s, e := time.Unix(0, cmd.startTime).Format(time.RFC3339), time.Unix(0, cmd.endTime).Format(time.RFC3339) 240 241 fmt.Fprintf(mw, "# INFLUXDB EXPORT: %s - %s\n", s, e) 242 243 if shouldWriteDDL := !cmd.lponly; shouldWriteDDL { 244 if err := cmd.writeDDL(mw, w); err != nil { 245 return err 246 } 247 } 248 249 if err := cmd.writeDML(mw, w); err != nil { 250 return err 251 } 252 253 return nil 254} 255 256func (cmd *Command) write() error { 257 // open our output file and create an output buffer 258 f, err := os.Create(cmd.out) 259 if err != nil { 260 return err 261 } 262 defer f.Close() 263 264 // Because calling (*os.File).Write is relatively expensive, 265 // and we don't *need* to sync to disk on every written line of export, 266 // use a sized buffered writer so that we only sync the file every megabyte. 267 bw := bufio.NewWriterSize(f, 1024*1024) 268 defer bw.Flush() 269 270 var w io.Writer = bw 271 272 if cmd.compress { 273 gzw := gzip.NewWriter(w) 274 defer gzw.Close() 275 w = gzw 276 } 277 278 // mw is our "meta writer" -- the io.Writer to which meta/out-of-band data 279 // like comments will be sent. If the lponly flag is set, mw will be 280 // ioutil.Discard which effectively filters out comments and any other 281 // non-line protocol data. 282 // 283 // Otherwise, mw is set to the same writer as the actual DDL and line 284 // protocol DML which will cause the comments to be intermixed with the 285 // data.. 286 // 287 mw := w 288 if cmd.lponly { 289 mw = ioutil.Discard 290 } 291 292 return cmd.writeFull(mw, w) 293} 294 295func (cmd *Command) writeTsmFiles(mw io.Writer, w io.Writer, files []string) error { 296 fmt.Fprintln(mw, "# writing tsm data") 297 298 // we need to make sure we write the same order that the files were written 299 sort.Strings(files) 300 301 for _, f := range files { 302 if err := cmd.exportTSMFile(f, w); err != nil { 303 return err 304 } 305 } 306 307 return nil 308} 309 310func (cmd *Command) exportTSMFile(tsmFilePath string, w io.Writer) error { 311 f, err := os.Open(tsmFilePath) 312 if err != nil { 313 if os.IsNotExist(err) { 314 fmt.Fprintf(w, "skipped missing file: %s", tsmFilePath) 315 return nil 316 } 317 return err 318 } 319 defer f.Close() 320 321 r, err := tsm1.NewTSMReader(f) 322 if err != nil { 323 fmt.Fprintf(cmd.Stderr, "unable to read %s, skipping: %s\n", tsmFilePath, err.Error()) 324 return nil 325 } 326 defer r.Close() 327 328 if sgStart, sgEnd := r.TimeRange(); sgStart > cmd.endTime || sgEnd < cmd.startTime { 329 return nil 330 } 331 332 for i := 0; i < r.KeyCount(); i++ { 333 key, _ := r.KeyAt(i) 334 values, err := r.ReadAll(key) 335 if err != nil { 336 fmt.Fprintf(cmd.Stderr, "unable to read key %q in %s, skipping: %s\n", string(key), tsmFilePath, err.Error()) 337 continue 338 } 339 measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key) 340 field = escape.Bytes(field) 341 342 if err := cmd.writeValues(w, measurement, string(field), values); err != nil { 343 // An error from writeValues indicates an IO error, which should be returned. 344 return err 345 } 346 } 347 return nil 348} 349 350func (cmd *Command) writeWALFiles(mw io.Writer, w io.Writer, files []string, key string) error { 351 fmt.Fprintln(mw, "# writing wal data") 352 353 // we need to make sure we write the same order that the wal received the data 354 sort.Strings(files) 355 356 var once sync.Once 357 warnDelete := func() { 358 once.Do(func() { 359 msg := fmt.Sprintf(`WARNING: detected deletes in wal file. 360Some series for %q may be brought back by replaying this data. 361To resolve, you can either let the shard snapshot prior to exporting the data 362or manually editing the exported file. 363 `, key) 364 fmt.Fprintln(cmd.Stderr, msg) 365 }) 366 } 367 368 for _, f := range files { 369 if err := cmd.exportWALFile(f, w, warnDelete); err != nil { 370 return err 371 } 372 } 373 374 return nil 375} 376 377// exportWAL reads every WAL entry from r and exports it to w. 378func (cmd *Command) exportWALFile(walFilePath string, w io.Writer, warnDelete func()) error { 379 f, err := os.Open(walFilePath) 380 if err != nil { 381 if os.IsNotExist(err) { 382 fmt.Fprintf(w, "skipped missing file: %s", walFilePath) 383 return nil 384 } 385 return err 386 } 387 defer f.Close() 388 389 r := tsm1.NewWALSegmentReader(f) 390 defer r.Close() 391 392 for r.Next() { 393 entry, err := r.Read() 394 if err != nil { 395 n := r.Count() 396 fmt.Fprintf(cmd.Stderr, "file %s corrupt at position %d: %v", walFilePath, n, err) 397 break 398 } 399 400 switch t := entry.(type) { 401 case *tsm1.DeleteWALEntry, *tsm1.DeleteRangeWALEntry: 402 warnDelete() 403 continue 404 case *tsm1.WriteWALEntry: 405 for key, values := range t.Values { 406 measurement, field := tsm1.SeriesAndFieldFromCompositeKey([]byte(key)) 407 // measurements are stored escaped, field names are not 408 field = escape.Bytes(field) 409 410 if err := cmd.writeValues(w, measurement, string(field), values); err != nil { 411 // An error from writeValues indicates an IO error, which should be returned. 412 return err 413 } 414 } 415 } 416 } 417 return nil 418} 419 420// writeValues writes every value in values to w, using the given series key and field name. 421// If any call to w.Write fails, that error is returned. 422func (cmd *Command) writeValues(w io.Writer, seriesKey []byte, field string, values []tsm1.Value) error { 423 buf := []byte(string(seriesKey) + " " + field + "=") 424 prefixLen := len(buf) 425 426 for _, value := range values { 427 ts := value.UnixNano() 428 if (ts < cmd.startTime) || (ts > cmd.endTime) { 429 continue 430 } 431 432 // Re-slice buf to be "<series_key> <field>=". 433 buf = buf[:prefixLen] 434 435 // Append the correct representation of the value. 436 switch v := value.Value().(type) { 437 case float64: 438 buf = strconv.AppendFloat(buf, v, 'g', -1, 64) 439 case int64: 440 buf = strconv.AppendInt(buf, v, 10) 441 buf = append(buf, 'i') 442 case uint64: 443 buf = strconv.AppendUint(buf, v, 10) 444 buf = append(buf, 'u') 445 case bool: 446 buf = strconv.AppendBool(buf, v) 447 case string: 448 buf = append(buf, '"') 449 buf = append(buf, models.EscapeStringField(v)...) 450 buf = append(buf, '"') 451 default: 452 // This shouldn't be possible, but we'll format it anyway. 453 buf = append(buf, fmt.Sprintf("%v", v)...) 454 } 455 456 // Now buf has "<series_key> <field>=<value>". 457 // Append the timestamp and a newline, then write it. 458 buf = append(buf, ' ') 459 buf = strconv.AppendInt(buf, ts, 10) 460 buf = append(buf, '\n') 461 if _, err := w.Write(buf); err != nil { 462 // Underlying IO error needs to be returned. 463 return err 464 } 465 } 466 467 return nil 468} 469