1// Package snapshotter provides the meta snapshot service.
2package snapshotter // import "github.com/influxdata/influxdb/services/snapshotter"
3
4import (
5	"bytes"
6	"encoding"
7	"encoding/binary"
8	"encoding/json"
9	"fmt"
10	"io"
11	"net"
12	"strings"
13	"sync"
14	"time"
15
16	"github.com/influxdata/influxdb"
17	"github.com/influxdata/influxdb/services/meta"
18	"github.com/influxdata/influxdb/tsdb"
19	"go.uber.org/zap"
20)
21
const (
	// MuxHeader is the header byte used for the TCP muxer.
	MuxHeader = 3

	// BackupMagicHeader is the first 8 bytes used to identify and validate
	// a metastore backup file. It is written big-endian ahead of the meta
	// blob and the shard-ID map responses.
	BackupMagicHeader = 0x59590101
)
30
// Service manages the listener for the snapshot endpoint.
type Service struct {
	// wg tracks the accept loop plus one goroutine per in-flight
	// connection; Close waits on it after closing the listener.
	wg sync.WaitGroup

	// Node is encoded as JSON into metastore backup responses.
	Node *influxdb.Node

	// MetaClient provides access to the meta store. Note: several
	// methods additionally type-assert it to *meta.Client at runtime.
	MetaClient interface {
		encoding.BinaryMarshaler
		Database(name string) *meta.DatabaseInfo
	}

	// TSDBStore is the subset of the shard store used for backup,
	// export and live-restore operations.
	TSDBStore interface {
		BackupShard(id uint64, since time.Time, w io.Writer) error
		ExportShard(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error
		Shard(id uint64) *tsdb.Shard
		ShardRelativePath(id uint64) (string, error)
		SetShardEnabled(shardID uint64, enabled bool) error
		RestoreShard(id uint64, r io.Reader) error
		CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error
	}

	// Listener must be set by the caller before Open is called.
	Listener net.Listener
	Logger   *zap.Logger
}
55
56// NewService returns a new instance of Service.
57func NewService() *Service {
58	return &Service{
59		Logger: zap.NewNop(),
60	}
61}
62
// Open starts the service. It launches the accept loop in a background
// goroutine; the goroutine is registered on s.wg so Close can wait for it.
// The Listener field must already be set.
func (s *Service) Open() error {
	s.Logger.Info("Starting snapshot service")

	s.wg.Add(1)
	go s.serve()
	return nil
}
71
72// Close implements the Service interface.
73func (s *Service) Close() error {
74	if s.Listener != nil {
75		if err := s.Listener.Close(); err != nil {
76			return err
77		}
78	}
79	s.wg.Wait()
80	return nil
81}
82
// WithLogger sets the logger on the service, tagging every entry with
// the "snapshot" service name.
func (s *Service) WithLogger(log *zap.Logger) {
	s.Logger = log.With(zap.String("service", "snapshot"))
}
87
// serve serves snapshot requests from the listener. Each accepted
// connection is handled in its own goroutine, tracked on s.wg so Close
// can wait for in-flight requests after the listener shuts down.
func (s *Service) serve() {
	defer s.wg.Done()

	for {
		// Wait for next connection.
		conn, err := s.Listener.Accept()
		// NOTE(review): shutdown is detected by matching the error text,
		// which is fragile but is the signal used here for a closed listener.
		if err != nil && strings.Contains(err.Error(), "connection closed") {
			s.Logger.Info("Listener closed")
			return
		} else if err != nil {
			// Transient accept error: log and keep accepting.
			s.Logger.Info("Error accepting snapshot request", zap.Error(err))
			continue
		}

		// Handle connection in separate goroutine.
		s.wg.Add(1)
		go func(conn net.Conn) {
			defer s.wg.Done()
			defer conn.Close()
			if err := s.handleConn(conn); err != nil {
				s.Logger.Info(err.Error())
			}
		}(conn)
	}
}
114
115// handleConn processes conn. This is run in a separate goroutine.
116func (s *Service) handleConn(conn net.Conn) error {
117	var typ [1]byte
118
119	_, err := conn.Read(typ[:])
120	if err != nil {
121		return err
122	}
123
124	if RequestType(typ[0]) == RequestShardUpdate {
125		return s.updateShardsLive(conn)
126	}
127
128	r, bytes, err := s.readRequest(conn)
129	if err != nil {
130		return fmt.Errorf("read request: %s", err)
131	}
132
133	switch RequestType(typ[0]) {
134	case RequestShardBackup:
135		if err := s.TSDBStore.BackupShard(r.ShardID, r.Since, conn); err != nil {
136			return err
137		}
138	case RequestShardExport:
139		if err := s.TSDBStore.ExportShard(r.ShardID, r.ExportStart, r.ExportEnd, conn); err != nil {
140			return err
141		}
142	case RequestMetastoreBackup:
143		if err := s.writeMetaStore(conn); err != nil {
144			return err
145		}
146	case RequestDatabaseInfo:
147		return s.writeDatabaseInfo(conn, r.BackupDatabase)
148	case RequestRetentionPolicyInfo:
149		return s.writeRetentionPolicyInfo(conn, r.BackupDatabase, r.BackupRetentionPolicy)
150	case RequestMetaStoreUpdate:
151		return s.updateMetaStore(conn, bytes, r.BackupDatabase, r.RestoreDatabase, r.BackupRetentionPolicy, r.RestoreRetentionPolicy)
152	default:
153		return fmt.Errorf("request type unknown: %v", r.Type)
154	}
155
156	return nil
157}
158
159func (s *Service) updateShardsLive(conn net.Conn) error {
160	var sidBytes [8]byte
161	_, err := conn.Read(sidBytes[:])
162	if err != nil {
163		return err
164	}
165	sid := binary.BigEndian.Uint64(sidBytes[:])
166
167	if err := s.TSDBStore.SetShardEnabled(sid, false); err != nil {
168		return err
169	}
170	defer s.TSDBStore.SetShardEnabled(sid, true)
171
172	return s.TSDBStore.RestoreShard(sid, conn)
173}
174
// updateMetaStore merges an uploaded metadata snapshot (bits) into the
// local meta store, remapping backupDBName/backupRPName onto
// restoreDBName/restoreRPName. On decode or import failure it still
// answers the client with an empty ID map (the "import nothing" signal)
// before returning the error.
func (s *Service) updateMetaStore(conn net.Conn, bits []byte, backupDBName, restoreDBName, backupRPName, restoreRPName string) error {
	md := meta.Data{}
	err := md.UnmarshalBinary(bits)
	if err != nil {
		// Tell the client nothing will be imported before failing.
		if err := s.respondIDMap(conn, map[uint64]uint64{}); err != nil {
			return err
		}
		return fmt.Errorf("failed to decode meta: %s", err)
	}

	// NOTE(review): assumes MetaClient is always a *meta.Client; the
	// type assertion panics otherwise.
	data := s.MetaClient.(*meta.Client).Data()

	// IDMap maps shard IDs from the uploaded snapshot to the IDs assigned
	// on this server; newDBs lists databases the import created.
	IDMap, newDBs, err := data.ImportData(md, backupDBName, restoreDBName, backupRPName, restoreRPName)
	if err != nil {
		if err := s.respondIDMap(conn, map[uint64]uint64{}); err != nil {
			return err
		}
		return err
	}

	err = s.MetaClient.(*meta.Client).SetData(&data)
	if err != nil {
		return err
	}

	// Create local shards for any databases the import added.
	err = s.createNewDBShards(data, newDBs)
	if err != nil {
		return err
	}

	err = s.respondIDMap(conn, IDMap)
	return err
}
208
209// iterate over a list of newDB's that should have just been added to the metadata
210// If the db was not created in the metadata return an error.
211// None of the shards should exist on a new DB, and CreateShard protects against double-creation.
212func (s *Service) createNewDBShards(data meta.Data, newDBs []string) error {
213	for _, restoreDBName := range newDBs {
214		dbi := data.Database(restoreDBName)
215		if dbi == nil {
216			return fmt.Errorf("db %s not found when creating new db shards", restoreDBName)
217		}
218		for _, rpi := range dbi.RetentionPolicies {
219			for _, sgi := range rpi.ShardGroups {
220				for _, shard := range sgi.Shards {
221					err := s.TSDBStore.CreateShard(restoreDBName, rpi.Name, shard.ID, true)
222					if err != nil {
223						return err
224					}
225				}
226			}
227		}
228	}
229	return nil
230}
231
232// send the IDMapping based on the metadata from the source server vs the shard ID
233// metadata on this server.  Sends back [BackupMagicHeader,0] if there's no mapped
234// values, signaling that nothing should be imported.
235func (s *Service) respondIDMap(conn net.Conn, IDMap map[uint64]uint64) error {
236	npairs := len(IDMap)
237	// 2 information ints, then npairs of 8byte ints.
238	numBytes := make([]byte, (npairs+1)*16)
239
240	binary.BigEndian.PutUint64(numBytes[:8], BackupMagicHeader)
241	binary.BigEndian.PutUint64(numBytes[8:16], uint64(npairs))
242	next := 16
243	for k, v := range IDMap {
244		binary.BigEndian.PutUint64(numBytes[next:next+8], k)
245		binary.BigEndian.PutUint64(numBytes[next+8:next+16], v)
246		next += 16
247	}
248
249	_, err := conn.Write(numBytes[:])
250	return err
251}
252
253func (s *Service) writeMetaStore(conn net.Conn) error {
254	// Retrieve and serialize the current meta data.
255	metaBlob, err := s.MetaClient.MarshalBinary()
256	if err != nil {
257		return fmt.Errorf("marshal meta: %s", err)
258	}
259
260	var nodeBytes bytes.Buffer
261	if err := json.NewEncoder(&nodeBytes).Encode(s.Node); err != nil {
262		return err
263	}
264
265	var numBytes [24]byte
266
267	binary.BigEndian.PutUint64(numBytes[:8], BackupMagicHeader)
268	binary.BigEndian.PutUint64(numBytes[8:16], uint64(len(metaBlob)))
269	binary.BigEndian.PutUint64(numBytes[16:24], uint64(nodeBytes.Len()))
270
271	// backup header followed by meta blob length
272	if _, err := conn.Write(numBytes[:16]); err != nil {
273		return err
274	}
275
276	if _, err := conn.Write(metaBlob); err != nil {
277		return err
278	}
279
280	if _, err := conn.Write(numBytes[16:24]); err != nil {
281		return err
282	}
283
284	if _, err := nodeBytes.WriteTo(conn); err != nil {
285		return err
286	}
287	return nil
288}
289
290// writeDatabaseInfo will write the relative paths of all shards in the database on
291// this server into the connection.
292func (s *Service) writeDatabaseInfo(conn net.Conn, database string) error {
293	res := Response{}
294	dbs := []meta.DatabaseInfo{}
295	if database != "" {
296		db := s.MetaClient.Database(database)
297		if db == nil {
298			return influxdb.ErrDatabaseNotFound(database)
299		}
300		dbs = append(dbs, *db)
301	} else {
302		// we'll allow collecting info on all databases
303		dbs = s.MetaClient.(*meta.Client).Databases()
304	}
305
306	for _, db := range dbs {
307		for _, rp := range db.RetentionPolicies {
308			for _, sg := range rp.ShardGroups {
309				for _, sh := range sg.Shards {
310					// ignore if the shard isn't on the server
311					if s.TSDBStore.Shard(sh.ID) == nil {
312						continue
313					}
314
315					path, err := s.TSDBStore.ShardRelativePath(sh.ID)
316					if err != nil {
317						return err
318					}
319
320					res.Paths = append(res.Paths, path)
321				}
322			}
323		}
324	}
325	if err := json.NewEncoder(conn).Encode(res); err != nil {
326		return fmt.Errorf("encode response: %s", err.Error())
327	}
328
329	return nil
330}
331
332// writeDatabaseInfo will write the relative paths of all shards in the retention policy on
333// this server into the connection
334func (s *Service) writeRetentionPolicyInfo(conn net.Conn, database, retentionPolicy string) error {
335	res := Response{}
336	db := s.MetaClient.Database(database)
337	if db == nil {
338		return influxdb.ErrDatabaseNotFound(database)
339	}
340
341	var ret *meta.RetentionPolicyInfo
342
343	for _, rp := range db.RetentionPolicies {
344		if rp.Name == retentionPolicy {
345			ret = &rp
346			break
347		}
348	}
349
350	if ret == nil {
351		return influxdb.ErrRetentionPolicyNotFound(retentionPolicy)
352	}
353
354	for _, sg := range ret.ShardGroups {
355		for _, sh := range sg.Shards {
356			// ignore if the shard isn't on the server
357			if s.TSDBStore.Shard(sh.ID) == nil {
358				continue
359			}
360
361			path, err := s.TSDBStore.ShardRelativePath(sh.ID)
362			if err != nil {
363				return err
364			}
365
366			res.Paths = append(res.Paths, path)
367		}
368	}
369
370	if err := json.NewEncoder(conn).Encode(res); err != nil {
371		return fmt.Errorf("encode resonse: %s", err.Error())
372	}
373
374	return nil
375}
376
377// readRequest unmarshals a request object from the conn.
378func (s *Service) readRequest(conn net.Conn) (Request, []byte, error) {
379	var r Request
380	d := json.NewDecoder(conn)
381
382	if err := d.Decode(&r); err != nil {
383		return r, nil, err
384	}
385
386	bits := make([]byte, r.UploadSize+1)
387
388	if r.UploadSize > 0 {
389
390		remainder := d.Buffered()
391
392		n, err := remainder.Read(bits)
393		if err != nil && err != io.EOF {
394			return r, bits, err
395		}
396
397		// it is a bit random but sometimes the Json decoder will consume all the bytes and sometimes
398		// it will leave a few behind.
399		if err != io.EOF && n < int(r.UploadSize+1) {
400			_, err = conn.Read(bits[n:])
401		}
402
403		if err != nil && err != io.EOF {
404			return r, bits, err
405		}
406		// the JSON encoder on the client side seems to write an extra byte, so trim that off the front.
407		return r, bits[1:], nil
408	}
409
410	return r, bits, nil
411}
412
// RequestType indicates the type of snapshot request. Its value travels
// as the first byte of each connection (see handleConn), so the numeric
// order of the constants below is part of the wire protocol — do not
// reorder them.
type RequestType uint8

const (
	// RequestShardBackup represents a request for a shard backup.
	RequestShardBackup RequestType = iota

	// RequestMetastoreBackup represents a request to back up the metastore.
	RequestMetastoreBackup

	// RequestSeriesFileBackup represents a request to back up the database series file.
	RequestSeriesFileBackup

	// RequestDatabaseInfo represents a request for database info.
	RequestDatabaseInfo

	// RequestRetentionPolicyInfo represents a request for retention policy info.
	RequestRetentionPolicyInfo

	// RequestShardExport represents a request to export Shard data.  Similar to a backup, but shards
	// may be filtered based on the start/end times on each block.
	RequestShardExport

	// RequestMetaStoreUpdate represents a request to upload a metafile that will be used to do a live update
	// to the existing metastore.
	RequestMetaStoreUpdate

	// RequestShardUpdate will initiate the upload of a shard data tar file
	// and have the engine import the data.
	RequestShardUpdate
)
444
// Request represents a request for a specific backup or for information
// about the shards on this server for a database or retention policy.
// It is JSON-encoded on the wire, directly after the request-type byte.
type Request struct {
	Type                   RequestType // requested operation (see RequestType constants)
	BackupDatabase         string      // source database name
	RestoreDatabase        string      // target database name for a metastore update
	BackupRetentionPolicy  string      // source retention policy name
	RestoreRetentionPolicy string      // target retention policy name for a metastore update
	ShardID                uint64      // shard to back up or export
	Since                  time.Time   // lower time bound passed to BackupShard
	ExportStart            time.Time   // start of the export time window
	ExportEnd              time.Time   // end of the export time window
	UploadSize             int64       // byte length of the payload following the JSON request
}
459
// Response contains the relative paths for all the shards on this server
// that are in the requested database or retention policy. It is returned
// JSON-encoded for info requests.
type Response struct {
	Paths []string
}
465