// Package snapshotter provides the meta snapshot service.
package snapshotter // import "github.com/influxdata/influxdb/services/snapshotter"

import (
	"bytes"
	"encoding"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"strings"
	"sync"
	"time"

	"github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/services/meta"
	"github.com/influxdata/influxdb/tsdb"
	"go.uber.org/zap"
)

const (
	// MuxHeader is the header byte used for the TCP muxer.
	MuxHeader = 3

	// BackupMagicHeader is the first 8 bytes used to identify and validate
	// a metastore backup file
	BackupMagicHeader = 0x59590101
)

// Service manages the listener for the snapshot endpoint.
type Service struct {
	// wg tracks the accept loop and every per-connection goroutine so
	// Close can wait for all of them to finish.
	wg sync.WaitGroup

	// Node identifies this server; it is JSON-encoded into metastore backups.
	Node *influxdb.Node

	// MetaClient is the subset of the meta client this service needs:
	// marshaling the full meta data and looking up a single database.
	MetaClient interface {
		encoding.BinaryMarshaler
		Database(name string) *meta.DatabaseInfo
	}

	// TSDBStore is the subset of the TSDB store used to back up, export,
	// restore, enable/disable, and create shards.
	TSDBStore interface {
		BackupShard(id uint64, since time.Time, w io.Writer) error
		ExportShard(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error
		Shard(id uint64) *tsdb.Shard
		ShardRelativePath(id uint64) (string, error)
		SetShardEnabled(shardID uint64, enabled bool) error
		RestoreShard(id uint64, r io.Reader) error
		CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error
	}

	// Listener accepts incoming snapshot connections; it must be assigned
	// before Open is called.
	Listener net.Listener
	Logger   *zap.Logger
}

// NewService returns a new instance of Service.
func NewService() *Service {
	return &Service{
		Logger: zap.NewNop(),
	}
}

// Open starts the service.
func (s *Service) Open() error {
	s.Logger.Info("Starting snapshot service")

	s.wg.Add(1)
	go s.serve()
	return nil
}

// Close implements the Service interface. It closes the listener (which
// terminates the accept loop) and then waits for the accept loop and any
// in-flight connection handlers to finish.
func (s *Service) Close() error {
	if s.Listener != nil {
		if err := s.Listener.Close(); err != nil {
			return err
		}
	}
	s.wg.Wait()
	return nil
}

// WithLogger sets the logger on the service.
84func (s *Service) WithLogger(log *zap.Logger) { 85 s.Logger = log.With(zap.String("service", "snapshot")) 86} 87 88// serve serves snapshot requests from the listener. 89func (s *Service) serve() { 90 defer s.wg.Done() 91 92 for { 93 // Wait for next connection. 94 conn, err := s.Listener.Accept() 95 if err != nil && strings.Contains(err.Error(), "connection closed") { 96 s.Logger.Info("Listener closed") 97 return 98 } else if err != nil { 99 s.Logger.Info("Error accepting snapshot request", zap.Error(err)) 100 continue 101 } 102 103 // Handle connection in separate goroutine. 104 s.wg.Add(1) 105 go func(conn net.Conn) { 106 defer s.wg.Done() 107 defer conn.Close() 108 if err := s.handleConn(conn); err != nil { 109 s.Logger.Info(err.Error()) 110 } 111 }(conn) 112 } 113} 114 115// handleConn processes conn. This is run in a separate goroutine. 116func (s *Service) handleConn(conn net.Conn) error { 117 var typ [1]byte 118 119 _, err := conn.Read(typ[:]) 120 if err != nil { 121 return err 122 } 123 124 if RequestType(typ[0]) == RequestShardUpdate { 125 return s.updateShardsLive(conn) 126 } 127 128 r, bytes, err := s.readRequest(conn) 129 if err != nil { 130 return fmt.Errorf("read request: %s", err) 131 } 132 133 switch RequestType(typ[0]) { 134 case RequestShardBackup: 135 if err := s.TSDBStore.BackupShard(r.ShardID, r.Since, conn); err != nil { 136 return err 137 } 138 case RequestShardExport: 139 if err := s.TSDBStore.ExportShard(r.ShardID, r.ExportStart, r.ExportEnd, conn); err != nil { 140 return err 141 } 142 case RequestMetastoreBackup: 143 if err := s.writeMetaStore(conn); err != nil { 144 return err 145 } 146 case RequestDatabaseInfo: 147 return s.writeDatabaseInfo(conn, r.BackupDatabase) 148 case RequestRetentionPolicyInfo: 149 return s.writeRetentionPolicyInfo(conn, r.BackupDatabase, r.BackupRetentionPolicy) 150 case RequestMetaStoreUpdate: 151 return s.updateMetaStore(conn, bytes, r.BackupDatabase, r.RestoreDatabase, r.BackupRetentionPolicy, 
r.RestoreRetentionPolicy) 152 default: 153 return fmt.Errorf("request type unknown: %v", r.Type) 154 } 155 156 return nil 157} 158 159func (s *Service) updateShardsLive(conn net.Conn) error { 160 var sidBytes [8]byte 161 _, err := conn.Read(sidBytes[:]) 162 if err != nil { 163 return err 164 } 165 sid := binary.BigEndian.Uint64(sidBytes[:]) 166 167 if err := s.TSDBStore.SetShardEnabled(sid, false); err != nil { 168 return err 169 } 170 defer s.TSDBStore.SetShardEnabled(sid, true) 171 172 return s.TSDBStore.RestoreShard(sid, conn) 173} 174 175func (s *Service) updateMetaStore(conn net.Conn, bits []byte, backupDBName, restoreDBName, backupRPName, restoreRPName string) error { 176 md := meta.Data{} 177 err := md.UnmarshalBinary(bits) 178 if err != nil { 179 if err := s.respondIDMap(conn, map[uint64]uint64{}); err != nil { 180 return err 181 } 182 return fmt.Errorf("failed to decode meta: %s", err) 183 } 184 185 data := s.MetaClient.(*meta.Client).Data() 186 187 IDMap, newDBs, err := data.ImportData(md, backupDBName, restoreDBName, backupRPName, restoreRPName) 188 if err != nil { 189 if err := s.respondIDMap(conn, map[uint64]uint64{}); err != nil { 190 return err 191 } 192 return err 193 } 194 195 err = s.MetaClient.(*meta.Client).SetData(&data) 196 if err != nil { 197 return err 198 } 199 200 err = s.createNewDBShards(data, newDBs) 201 if err != nil { 202 return err 203 } 204 205 err = s.respondIDMap(conn, IDMap) 206 return err 207} 208 209// iterate over a list of newDB's that should have just been added to the metadata 210// If the db was not created in the metadata return an error. 211// None of the shards should exist on a new DB, and CreateShard protects against double-creation. 
212func (s *Service) createNewDBShards(data meta.Data, newDBs []string) error { 213 for _, restoreDBName := range newDBs { 214 dbi := data.Database(restoreDBName) 215 if dbi == nil { 216 return fmt.Errorf("db %s not found when creating new db shards", restoreDBName) 217 } 218 for _, rpi := range dbi.RetentionPolicies { 219 for _, sgi := range rpi.ShardGroups { 220 for _, shard := range sgi.Shards { 221 err := s.TSDBStore.CreateShard(restoreDBName, rpi.Name, shard.ID, true) 222 if err != nil { 223 return err 224 } 225 } 226 } 227 } 228 } 229 return nil 230} 231 232// send the IDMapping based on the metadata from the source server vs the shard ID 233// metadata on this server. Sends back [BackupMagicHeader,0] if there's no mapped 234// values, signaling that nothing should be imported. 235func (s *Service) respondIDMap(conn net.Conn, IDMap map[uint64]uint64) error { 236 npairs := len(IDMap) 237 // 2 information ints, then npairs of 8byte ints. 238 numBytes := make([]byte, (npairs+1)*16) 239 240 binary.BigEndian.PutUint64(numBytes[:8], BackupMagicHeader) 241 binary.BigEndian.PutUint64(numBytes[8:16], uint64(npairs)) 242 next := 16 243 for k, v := range IDMap { 244 binary.BigEndian.PutUint64(numBytes[next:next+8], k) 245 binary.BigEndian.PutUint64(numBytes[next+8:next+16], v) 246 next += 16 247 } 248 249 _, err := conn.Write(numBytes[:]) 250 return err 251} 252 253func (s *Service) writeMetaStore(conn net.Conn) error { 254 // Retrieve and serialize the current meta data. 
255 metaBlob, err := s.MetaClient.MarshalBinary() 256 if err != nil { 257 return fmt.Errorf("marshal meta: %s", err) 258 } 259 260 var nodeBytes bytes.Buffer 261 if err := json.NewEncoder(&nodeBytes).Encode(s.Node); err != nil { 262 return err 263 } 264 265 var numBytes [24]byte 266 267 binary.BigEndian.PutUint64(numBytes[:8], BackupMagicHeader) 268 binary.BigEndian.PutUint64(numBytes[8:16], uint64(len(metaBlob))) 269 binary.BigEndian.PutUint64(numBytes[16:24], uint64(nodeBytes.Len())) 270 271 // backup header followed by meta blob length 272 if _, err := conn.Write(numBytes[:16]); err != nil { 273 return err 274 } 275 276 if _, err := conn.Write(metaBlob); err != nil { 277 return err 278 } 279 280 if _, err := conn.Write(numBytes[16:24]); err != nil { 281 return err 282 } 283 284 if _, err := nodeBytes.WriteTo(conn); err != nil { 285 return err 286 } 287 return nil 288} 289 290// writeDatabaseInfo will write the relative paths of all shards in the database on 291// this server into the connection. 
292func (s *Service) writeDatabaseInfo(conn net.Conn, database string) error { 293 res := Response{} 294 dbs := []meta.DatabaseInfo{} 295 if database != "" { 296 db := s.MetaClient.Database(database) 297 if db == nil { 298 return influxdb.ErrDatabaseNotFound(database) 299 } 300 dbs = append(dbs, *db) 301 } else { 302 // we'll allow collecting info on all databases 303 dbs = s.MetaClient.(*meta.Client).Databases() 304 } 305 306 for _, db := range dbs { 307 for _, rp := range db.RetentionPolicies { 308 for _, sg := range rp.ShardGroups { 309 for _, sh := range sg.Shards { 310 // ignore if the shard isn't on the server 311 if s.TSDBStore.Shard(sh.ID) == nil { 312 continue 313 } 314 315 path, err := s.TSDBStore.ShardRelativePath(sh.ID) 316 if err != nil { 317 return err 318 } 319 320 res.Paths = append(res.Paths, path) 321 } 322 } 323 } 324 } 325 if err := json.NewEncoder(conn).Encode(res); err != nil { 326 return fmt.Errorf("encode response: %s", err.Error()) 327 } 328 329 return nil 330} 331 332// writeDatabaseInfo will write the relative paths of all shards in the retention policy on 333// this server into the connection 334func (s *Service) writeRetentionPolicyInfo(conn net.Conn, database, retentionPolicy string) error { 335 res := Response{} 336 db := s.MetaClient.Database(database) 337 if db == nil { 338 return influxdb.ErrDatabaseNotFound(database) 339 } 340 341 var ret *meta.RetentionPolicyInfo 342 343 for _, rp := range db.RetentionPolicies { 344 if rp.Name == retentionPolicy { 345 ret = &rp 346 break 347 } 348 } 349 350 if ret == nil { 351 return influxdb.ErrRetentionPolicyNotFound(retentionPolicy) 352 } 353 354 for _, sg := range ret.ShardGroups { 355 for _, sh := range sg.Shards { 356 // ignore if the shard isn't on the server 357 if s.TSDBStore.Shard(sh.ID) == nil { 358 continue 359 } 360 361 path, err := s.TSDBStore.ShardRelativePath(sh.ID) 362 if err != nil { 363 return err 364 } 365 366 res.Paths = append(res.Paths, path) 367 } 368 } 369 370 if err := 
json.NewEncoder(conn).Encode(res); err != nil { 371 return fmt.Errorf("encode resonse: %s", err.Error()) 372 } 373 374 return nil 375} 376 377// readRequest unmarshals a request object from the conn. 378func (s *Service) readRequest(conn net.Conn) (Request, []byte, error) { 379 var r Request 380 d := json.NewDecoder(conn) 381 382 if err := d.Decode(&r); err != nil { 383 return r, nil, err 384 } 385 386 bits := make([]byte, r.UploadSize+1) 387 388 if r.UploadSize > 0 { 389 390 remainder := d.Buffered() 391 392 n, err := remainder.Read(bits) 393 if err != nil && err != io.EOF { 394 return r, bits, err 395 } 396 397 // it is a bit random but sometimes the Json decoder will consume all the bytes and sometimes 398 // it will leave a few behind. 399 if err != io.EOF && n < int(r.UploadSize+1) { 400 _, err = conn.Read(bits[n:]) 401 } 402 403 if err != nil && err != io.EOF { 404 return r, bits, err 405 } 406 // the JSON encoder on the client side seems to write an extra byte, so trim that off the front. 407 return r, bits[1:], nil 408 } 409 410 return r, bits, nil 411} 412 413// RequestType indicates the typeof snapshot request. 414type RequestType uint8 415 416const ( 417 // RequestShardBackup represents a request for a shard backup. 418 RequestShardBackup RequestType = iota 419 420 // RequestMetastoreBackup represents a request to back up the metastore. 421 RequestMetastoreBackup 422 423 // RequestSeriesFileBackup represents a request to back up the database series file. 424 RequestSeriesFileBackup 425 426 // RequestDatabaseInfo represents a request for database info. 427 RequestDatabaseInfo 428 429 // RequestRetentionPolicyInfo represents a request for retention policy info. 430 RequestRetentionPolicyInfo 431 432 // RequestShardExport represents a request to export Shard data. Similar to a backup, but shards 433 // may be filtered based on the start/end times on each block. 
434 RequestShardExport 435 436 // RequestMetaStoreUpdate represents a request to upload a metafile that will be used to do a live update 437 // to the existing metastore. 438 RequestMetaStoreUpdate 439 440 // RequestShardUpdate will initiate the upload of a shard data tar file 441 // and have the engine import the data. 442 RequestShardUpdate 443) 444 445// Request represents a request for a specific backup or for information 446// about the shards on this server for a database or retention policy. 447type Request struct { 448 Type RequestType 449 BackupDatabase string 450 RestoreDatabase string 451 BackupRetentionPolicy string 452 RestoreRetentionPolicy string 453 ShardID uint64 454 Since time.Time 455 ExportStart time.Time 456 ExportEnd time.Time 457 UploadSize int64 458} 459 460// Response contains the relative paths for all the shards on this server 461// that are in the requested database or retention policy. 462type Response struct { 463 Paths []string 464} 465