1// Command influx_tsm converts b1 or bz1 shards (from InfluxDB releases earlier than v0.11) 2// to the current tsm1 format. 3package main 4 5import ( 6 "bufio" 7 "errors" 8 "flag" 9 "fmt" 10 "io" 11 "io/ioutil" 12 "log" 13 "os" 14 "path/filepath" 15 "runtime" 16 "runtime/pprof" 17 "sort" 18 "strings" 19 "text/tabwriter" 20 "time" 21 22 "net/http" 23 _ "net/http/pprof" 24 25 "github.com/influxdata/influxdb/cmd/influx_tsm/b1" 26 "github.com/influxdata/influxdb/cmd/influx_tsm/bz1" 27 "github.com/influxdata/influxdb/cmd/influx_tsm/tsdb" 28) 29 30// ShardReader reads b* shards and converts to tsm shards 31type ShardReader interface { 32 KeyIterator 33 Open() error 34 Close() error 35} 36 37const ( 38 tsmExt = "tsm" 39) 40 41var description = ` 42Convert a database from b1 or bz1 format to tsm1 format. 43 44This tool will backup the directories before conversion (if not disabled). 45The backed-up files must be removed manually, generally after starting up the 46node again to make sure all of data has been converted correctly. 47 48To restore a backup: 49 Shut down the node, remove the converted directory, and 50 copy the backed-up directory to the original location.` 51 52type options struct { 53 DataPath string 54 BackupPath string 55 DBs []string 56 DebugAddr string 57 TSMSize uint64 58 Parallel bool 59 SkipBackup bool 60 UpdateInterval time.Duration 61 Yes bool 62 CPUFile string 63} 64 65func (o *options) Parse() error { 66 fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError) 67 68 var dbs string 69 70 fs.StringVar(&dbs, "dbs", "", "Comma-delimited list of databases to convert. Default is to convert all databases.") 71 fs.Uint64Var(&opts.TSMSize, "sz", maxTSMSz, "Maximum size of individual TSM files.") 72 fs.BoolVar(&opts.Parallel, "parallel", false, "Perform parallel conversion. (up to GOMAXPROCS shards at once)") 73 fs.BoolVar(&opts.SkipBackup, "nobackup", false, "Disable database backups. Not recommended.") 74 fs.StringVar(&opts.BackupPath, "backup", "", "The location to backup up the current databases. Must not be within the data directory.") 75 fs.StringVar(&opts.DebugAddr, "debug", "", "If set, http debugging endpoints will be enabled on the given address") 76 fs.DurationVar(&opts.UpdateInterval, "interval", 5*time.Second, "How often status updates are printed.") 77 fs.BoolVar(&opts.Yes, "y", false, "Don't ask, just convert") 78 fs.StringVar(&opts.CPUFile, "profile", "", "CPU Profile location") 79 fs.Usage = func() { 80 fmt.Fprintf(os.Stderr, "Usage: %v [options] <data-path> \n", os.Args[0]) 81 fmt.Fprintf(os.Stderr, "%v\n\nOptions:\n", description) 82 fs.PrintDefaults() 83 fmt.Fprintf(os.Stderr, "\n") 84 } 85 86 if err := fs.Parse(os.Args[1:]); err != nil { 87 return err 88 } 89 90 if len(fs.Args()) < 1 { 91 return errors.New("no data directory specified") 92 } 93 var err error 94 if o.DataPath, err = filepath.Abs(fs.Args()[0]); err != nil { 95 return err 96 } 97 if o.DataPath, err = filepath.EvalSymlinks(filepath.Clean(o.DataPath)); err != nil { 98 return err 99 } 100 101 if o.TSMSize > maxTSMSz { 102 return fmt.Errorf("bad TSM file size, maximum TSM file size is %d", maxTSMSz) 103 } 104 105 // Check if specific databases were requested. 106 o.DBs = strings.Split(dbs, ",") 107 if len(o.DBs) == 1 && o.DBs[0] == "" { 108 o.DBs = nil 109 } 110 111 if !o.SkipBackup { 112 if o.BackupPath == "" { 113 return errors.New("either -nobackup or -backup DIR must be set") 114 } 115 if o.BackupPath, err = filepath.Abs(o.BackupPath); err != nil { 116 return err 117 } 118 if o.BackupPath, err = filepath.EvalSymlinks(filepath.Clean(o.BackupPath)); err != nil { 119 if os.IsNotExist(err) { 120 return errors.New("backup directory must already exist") 121 } 122 return err 123 } 124 125 if strings.HasPrefix(o.BackupPath, o.DataPath) { 126 fmt.Println(o.BackupPath, o.DataPath) 127 return errors.New("backup directory cannot be contained within data directory") 128 } 129 } 130 131 if o.DebugAddr != "" { 132 log.Printf("Starting debugging server on http://%v", o.DebugAddr) 133 go func() { 134 log.Fatal(http.ListenAndServe(o.DebugAddr, nil)) 135 }() 136 } 137 138 return nil 139} 140 141var opts options 142 143const maxTSMSz uint64 = 2 * 1024 * 1024 * 1024 144 145func init() { 146 log.SetOutput(os.Stderr) 147 log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds) 148} 149 150func main() { 151 if err := opts.Parse(); err != nil { 152 log.Fatal(err) 153 } 154 155 // Determine the list of databases 156 dbs, err := ioutil.ReadDir(opts.DataPath) 157 if err != nil { 158 log.Fatalf("failed to access data directory at %v: %v\n", opts.DataPath, err) 159 } 160 fmt.Println() // Cleanly separate output from start of program. 161 162 if opts.Parallel { 163 if !isEnvSet("GOMAXPROCS") { 164 // Only modify GOMAXPROCS if it wasn't set in the environment 165 // This means 'GOMAXPROCS=1 influx_tsm -parallel' will not actually 166 // run in parallel 167 runtime.GOMAXPROCS(runtime.NumCPU()) 168 } 169 } 170 171 var badUser string 172 if opts.SkipBackup { 173 badUser = "(NOT RECOMMENDED)" 174 } 175 176 // Dump summary of what is about to happen. 177 fmt.Println("b1 and bz1 shard conversion.") 178 fmt.Println("-----------------------------------") 179 fmt.Println("Data directory is: ", opts.DataPath) 180 if !opts.SkipBackup { 181 fmt.Println("Backup directory is: ", opts.BackupPath) 182 } 183 fmt.Println("Databases specified: ", allDBs(opts.DBs)) 184 fmt.Println("Database backups enabled: ", yesno(!opts.SkipBackup), badUser) 185 fmt.Printf("Parallel mode enabled (GOMAXPROCS): %s (%d)\n", yesno(opts.Parallel), runtime.GOMAXPROCS(0)) 186 fmt.Println() 187 188 shards := collectShards(dbs) 189 190 // Anything to convert? 191 fmt.Printf("\nFound %d shards that will be converted.\n", len(shards)) 192 if len(shards) == 0 { 193 fmt.Println("Nothing to do.") 194 return 195 } 196 197 // Display list of convertible shards. 198 fmt.Println() 199 w := new(tabwriter.Writer) 200 w.Init(os.Stdout, 0, 8, 1, '\t', 0) 201 fmt.Fprintln(w, "Database\tRetention\tPath\tEngine\tSize") 202 for _, si := range shards { 203 fmt.Fprintf(w, "%v\t%v\t%v\t%v\t%d\n", si.Database, si.RetentionPolicy, si.FullPath(opts.DataPath), si.FormatAsString(), si.Size) 204 } 205 w.Flush() 206 207 if !opts.Yes { 208 // Get confirmation from user. 209 fmt.Printf("\nThese shards will be converted. Proceed? y/N: ") 210 liner := bufio.NewReader(os.Stdin) 211 yn, err := liner.ReadString('\n') 212 if err != nil { 213 log.Fatalf("failed to read response: %v", err) 214 } 215 yn = strings.TrimRight(strings.ToLower(yn), "\n") 216 if yn != "y" { 217 log.Fatal("Conversion aborted.") 218 } 219 } 220 fmt.Println("Conversion starting....") 221 222 if opts.CPUFile != "" { 223 f, err := os.Create(opts.CPUFile) 224 if err != nil { 225 log.Fatal(err) 226 } 227 if err = pprof.StartCPUProfile(f); err != nil { 228 log.Fatal(err) 229 } 230 defer pprof.StopCPUProfile() 231 } 232 233 tr := newTracker(shards, opts) 234 235 if err := tr.Run(); err != nil { 236 log.Fatalf("Error occurred preventing completion: %v\n", err) 237 } 238 239 tr.PrintStats() 240} 241 242func collectShards(dbs []os.FileInfo) tsdb.ShardInfos { 243 // Get the list of shards for conversion. 244 var shards tsdb.ShardInfos 245 for _, db := range dbs { 246 d := tsdb.NewDatabase(filepath.Join(opts.DataPath, db.Name())) 247 shs, err := d.Shards() 248 if err != nil { 249 log.Fatalf("Failed to access shards for database %v: %v\n", d.Name(), err) 250 } 251 shards = append(shards, shs...) 252 } 253 254 sort.Sort(shards) 255 shards = shards.FilterFormat(tsdb.TSM1) 256 if len(dbs) > 0 { 257 shards = shards.ExclusiveDatabases(opts.DBs) 258 } 259 260 return shards 261} 262 263// backupDatabase backs up the database named db 264func backupDatabase(db string) error { 265 copyFile := func(path string, info os.FileInfo, err error) error { 266 if err != nil { 267 return err 268 } 269 270 // Strip the DataPath from the path and replace with BackupPath. 271 toPath := strings.Replace(path, opts.DataPath, opts.BackupPath, 1) 272 273 if info.IsDir() { 274 return os.MkdirAll(toPath, info.Mode()) 275 } 276 277 in, err := os.Open(path) 278 if err != nil { 279 return err 280 } 281 defer in.Close() 282 283 srcInfo, err := os.Stat(path) 284 if err != nil { 285 return err 286 } 287 288 out, err := os.OpenFile(toPath, os.O_CREATE|os.O_WRONLY, info.Mode()) 289 if err != nil { 290 return err 291 } 292 defer out.Close() 293 294 dstInfo, err := os.Stat(toPath) 295 if err != nil { 296 return err 297 } 298 299 if dstInfo.Size() == srcInfo.Size() { 300 log.Printf("Backup file already found for %v with correct size, skipping.", path) 301 return nil 302 } 303 304 if dstInfo.Size() > srcInfo.Size() { 305 log.Printf("Invalid backup file found for %v, replacing with good copy.", path) 306 if err := out.Truncate(0); err != nil { 307 return err 308 } 309 if _, err := out.Seek(0, io.SeekStart); err != nil { 310 return err 311 } 312 } 313 314 if dstInfo.Size() > 0 { 315 log.Printf("Resuming backup of file %v, starting at %v bytes", path, dstInfo.Size()) 316 } 317 318 off, err := out.Seek(0, io.SeekEnd) 319 if err != nil { 320 return err 321 } 322 if _, err := in.Seek(off, io.SeekStart); err != nil { 323 return err 324 } 325 326 log.Printf("Backing up file %v", path) 327 328 _, err = io.Copy(out, in) 329 330 return err 331 } 332 333 return filepath.Walk(filepath.Join(opts.DataPath, db), copyFile) 334} 335 336// convertShard converts the shard in-place. 337func convertShard(si *tsdb.ShardInfo, tr *tracker) error { 338 src := si.FullPath(opts.DataPath) 339 dst := fmt.Sprintf("%v.%v", src, tsmExt) 340 341 var reader ShardReader 342 switch si.Format { 343 case tsdb.BZ1: 344 reader = bz1.NewReader(src, &tr.Stats, 0) 345 case tsdb.B1: 346 reader = b1.NewReader(src, &tr.Stats, 0) 347 default: 348 return fmt.Errorf("Unsupported shard format: %v", si.FormatAsString()) 349 } 350 351 // Open the shard, and create a converter. 352 if err := reader.Open(); err != nil { 353 return fmt.Errorf("Failed to open %v for conversion: %v", src, err) 354 } 355 defer reader.Close() 356 converter := NewConverter(dst, uint32(opts.TSMSize), &tr.Stats) 357 358 // Perform the conversion. 359 if err := converter.Process(reader); err != nil { 360 return fmt.Errorf("Conversion of %v failed: %v", src, err) 361 } 362 363 // Delete source shard, and rename new tsm1 shard. 364 if err := reader.Close(); err != nil { 365 return fmt.Errorf("Conversion of %v failed due to close: %v", src, err) 366 } 367 368 if err := os.RemoveAll(si.FullPath(opts.DataPath)); err != nil { 369 return fmt.Errorf("Deletion of %v failed: %v", src, err) 370 } 371 if err := os.Rename(dst, src); err != nil { 372 return fmt.Errorf("Rename of %v to %v failed: %v", dst, src, err) 373 } 374 375 return nil 376} 377 378// ParallelGroup allows the maximum parrallelism of a set of operations to be controlled. 379type ParallelGroup chan struct{} 380 381// NewParallelGroup returns a group which allows n operations to run in parallel. A value of 0 382// means no operations will ever run. 383func NewParallelGroup(n int) ParallelGroup { 384 return make(chan struct{}, n) 385} 386 387// Do executes one operation of the ParallelGroup 388func (p ParallelGroup) Do(f func()) { 389 p <- struct{}{} // acquire working slot 390 defer func() { <-p }() 391 392 f() 393} 394 395// yesno returns "yes" for true, "no" for false. 396func yesno(b bool) string { 397 if b { 398 return "yes" 399 } 400 return "no" 401} 402 403// allDBs returns "all" if all databases are requested for conversion. 404func allDBs(dbs []string) string { 405 if dbs == nil { 406 return "all" 407 } 408 return fmt.Sprintf("%v", dbs) 409} 410 411// isEnvSet checks to see if a variable was set in the environment 412func isEnvSet(name string) bool { 413 for _, s := range os.Environ() { 414 if strings.SplitN(s, "=", 2)[0] == name { 415 return true 416 } 417 } 418 return false 419} 420