1package meta
2
3import (
4	"errors"
5	"net"
6	"net/url"
7	"sort"
8	"strings"
9	"sync"
10	"time"
11	"unicode"
12
13	"fmt"
14
15	"github.com/gogo/protobuf/proto"
16	"github.com/influxdata/influxdb"
17	"github.com/influxdata/influxdb/models"
18	"github.com/influxdata/influxdb/query"
19	internal "github.com/influxdata/influxdb/services/meta/internal"
20	"github.com/influxdata/influxql"
21)
22
23//go:generate protoc --gogo_out=. internal/meta.proto
24
25const (
26	// DefaultRetentionPolicyReplicaN is the default value of RetentionPolicyInfo.ReplicaN.
27	DefaultRetentionPolicyReplicaN = 1
28
29	// DefaultRetentionPolicyDuration is the default value of RetentionPolicyInfo.Duration.
30	DefaultRetentionPolicyDuration = time.Duration(0)
31
32	// DefaultRetentionPolicyName is the default name for auto generated retention policies.
33	DefaultRetentionPolicyName = "autogen"
34
35	// MinRetentionPolicyDuration represents the minimum duration for a policy.
36	MinRetentionPolicyDuration = time.Hour
37
38	// MaxNameLen is the maximum length of a database or retention policy name.
39	// InfluxDB uses the name for the directory name on disk.
40	MaxNameLen = 255
41)
42
43// Data represents the top level collection of all metadata.
44type Data struct {
45	Term      uint64 // associated raft term
46	Index     uint64 // associated raft index
47	ClusterID uint64
48	Databases []DatabaseInfo
49	Users     []UserInfo
50
51	// adminUserExists provides a constant time mechanism for determining
52	// if there is at least one admin user.
53	adminUserExists bool
54
55	MaxShardGroupID uint64
56	MaxShardID      uint64
57}
58
59// Database returns a DatabaseInfo by the database name.
60func (data *Data) Database(name string) *DatabaseInfo {
61	for i := range data.Databases {
62		if data.Databases[i].Name == name {
63			return &data.Databases[i]
64		}
65	}
66	return nil
67}
68
69// CloneDatabases returns a copy of the DatabaseInfo.
70func (data *Data) CloneDatabases() []DatabaseInfo {
71	if data.Databases == nil {
72		return nil
73	}
74	dbs := make([]DatabaseInfo, len(data.Databases))
75	for i := range data.Databases {
76		dbs[i] = data.Databases[i].clone()
77	}
78	return dbs
79}
80
81// CreateDatabase creates a new database.
82// It returns an error if name is blank or if a database with the same name already exists.
83func (data *Data) CreateDatabase(name string) error {
84	if name == "" {
85		return ErrDatabaseNameRequired
86	} else if len(name) > MaxNameLen {
87		return ErrNameTooLong
88	} else if data.Database(name) != nil {
89		return nil
90	}
91
92	// Append new node.
93	data.Databases = append(data.Databases, DatabaseInfo{Name: name})
94
95	return nil
96}
97
98// DropDatabase removes a database by name. It does not return an error
99// if the database cannot be found.
100func (data *Data) DropDatabase(name string) error {
101	for i := range data.Databases {
102		if data.Databases[i].Name == name {
103			data.Databases = append(data.Databases[:i], data.Databases[i+1:]...)
104
105			// Remove all user privileges associated with this database.
106			for i := range data.Users {
107				delete(data.Users[i].Privileges, name)
108			}
109			break
110		}
111	}
112	return nil
113}
114
115// RetentionPolicy returns a retention policy for a database by name.
116func (data *Data) RetentionPolicy(database, name string) (*RetentionPolicyInfo, error) {
117	di := data.Database(database)
118	if di == nil {
119		return nil, influxdb.ErrDatabaseNotFound(database)
120	}
121
122	for i := range di.RetentionPolicies {
123		if di.RetentionPolicies[i].Name == name {
124			return &di.RetentionPolicies[i], nil
125		}
126	}
127	return nil, nil
128}
129
130// CreateRetentionPolicy creates a new retention policy on a database.
131// It returns an error if name is blank or if the database does not exist.
132func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo, makeDefault bool) error {
133	// Validate retention policy.
134	if rpi == nil {
135		return ErrRetentionPolicyRequired
136	} else if rpi.Name == "" {
137		return ErrRetentionPolicyNameRequired
138	} else if len(rpi.Name) > MaxNameLen {
139		return ErrNameTooLong
140	} else if rpi.ReplicaN < 1 {
141		return ErrReplicationFactorTooLow
142	}
143
144	// Normalise ShardDuration before comparing to any existing
145	// retention policies. The client is supposed to do this, but
146	// do it again to verify input.
147	rpi.ShardGroupDuration = normalisedShardDuration(rpi.ShardGroupDuration, rpi.Duration)
148
149	if rpi.Duration > 0 && rpi.Duration < rpi.ShardGroupDuration {
150		return ErrIncompatibleDurations
151	}
152
153	// Find database.
154	di := data.Database(database)
155	if di == nil {
156		return influxdb.ErrDatabaseNotFound(database)
157	} else if rp := di.RetentionPolicy(rpi.Name); rp != nil {
158		// RP with that name already exists. Make sure they're the same.
159		if rp.ReplicaN != rpi.ReplicaN || rp.Duration != rpi.Duration || rp.ShardGroupDuration != rpi.ShardGroupDuration {
160			return ErrRetentionPolicyExists
161		}
162		// if they want to make it default, and it's not the default, it's not an identical command so it's an error
163		if makeDefault && di.DefaultRetentionPolicy != rpi.Name {
164			return ErrRetentionPolicyConflict
165		}
166		return nil
167	}
168
169	// Append copy of new policy.
170	di.RetentionPolicies = append(di.RetentionPolicies, *rpi)
171
172	// Set the default if needed
173	if makeDefault {
174		di.DefaultRetentionPolicy = rpi.Name
175	}
176
177	return nil
178}
179
180// DropRetentionPolicy removes a retention policy from a database by name.
181func (data *Data) DropRetentionPolicy(database, name string) error {
182	// Find database.
183	di := data.Database(database)
184	if di == nil {
185		// no database? no problem
186		return nil
187	}
188
189	// Remove from list.
190	for i := range di.RetentionPolicies {
191		if di.RetentionPolicies[i].Name == name {
192			di.RetentionPolicies = append(di.RetentionPolicies[:i], di.RetentionPolicies[i+1:]...)
193			break
194		}
195	}
196
197	return nil
198}
199
200// RetentionPolicyUpdate represents retention policy fields to be updated.
201type RetentionPolicyUpdate struct {
202	Name               *string
203	Duration           *time.Duration
204	ReplicaN           *int
205	ShardGroupDuration *time.Duration
206}
207
208// SetName sets the RetentionPolicyUpdate.Name.
209func (rpu *RetentionPolicyUpdate) SetName(v string) { rpu.Name = &v }
210
211// SetDuration sets the RetentionPolicyUpdate.Duration.
212func (rpu *RetentionPolicyUpdate) SetDuration(v time.Duration) { rpu.Duration = &v }
213
214// SetReplicaN sets the RetentionPolicyUpdate.ReplicaN.
215func (rpu *RetentionPolicyUpdate) SetReplicaN(v int) { rpu.ReplicaN = &v }
216
217// SetShardGroupDuration sets the RetentionPolicyUpdate.ShardGroupDuration.
218func (rpu *RetentionPolicyUpdate) SetShardGroupDuration(v time.Duration) { rpu.ShardGroupDuration = &v }
219
220// UpdateRetentionPolicy updates an existing retention policy.
221func (data *Data) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate, makeDefault bool) error {
222	// Find database.
223	di := data.Database(database)
224	if di == nil {
225		return influxdb.ErrDatabaseNotFound(database)
226	}
227
228	// Find policy.
229	rpi := di.RetentionPolicy(name)
230	if rpi == nil {
231		return influxdb.ErrRetentionPolicyNotFound(name)
232	}
233
234	// Ensure new policy doesn't match an existing policy.
235	if rpu.Name != nil && *rpu.Name != name && di.RetentionPolicy(*rpu.Name) != nil {
236		return ErrRetentionPolicyNameExists
237	}
238
239	// Enforce duration of at least MinRetentionPolicyDuration
240	if rpu.Duration != nil && *rpu.Duration < MinRetentionPolicyDuration && *rpu.Duration != 0 {
241		return ErrRetentionPolicyDurationTooLow
242	}
243
244	// Enforce duration is at least the shard duration
245	if (rpu.Duration != nil && *rpu.Duration > 0 &&
246		((rpu.ShardGroupDuration != nil && *rpu.Duration < *rpu.ShardGroupDuration) ||
247			(rpu.ShardGroupDuration == nil && *rpu.Duration < rpi.ShardGroupDuration))) ||
248		(rpu.Duration == nil && rpi.Duration > 0 &&
249			rpu.ShardGroupDuration != nil && rpi.Duration < *rpu.ShardGroupDuration) {
250		return ErrIncompatibleDurations
251	}
252
253	// Update fields.
254	if rpu.Name != nil {
255		rpi.Name = *rpu.Name
256	}
257	if rpu.Duration != nil {
258		rpi.Duration = *rpu.Duration
259	}
260	if rpu.ReplicaN != nil {
261		rpi.ReplicaN = *rpu.ReplicaN
262	}
263	if rpu.ShardGroupDuration != nil {
264		rpi.ShardGroupDuration = normalisedShardDuration(*rpu.ShardGroupDuration, rpi.Duration)
265	}
266
267	if di.DefaultRetentionPolicy != rpi.Name && makeDefault {
268		di.DefaultRetentionPolicy = rpi.Name
269	}
270
271	return nil
272}
273
274// DropShard removes a shard by ID.
275//
276// DropShard won't return an error if the shard can't be found, which
277// allows the command to be re-run in the case that the meta store
278// succeeds but a data node fails.
279func (data *Data) DropShard(id uint64) {
280	found := -1
281	for dbidx, dbi := range data.Databases {
282		for rpidx, rpi := range dbi.RetentionPolicies {
283			for sgidx, sg := range rpi.ShardGroups {
284				for sidx, s := range sg.Shards {
285					if s.ID == id {
286						found = sidx
287						break
288					}
289				}
290
291				if found > -1 {
292					shards := sg.Shards
293					data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].Shards = append(shards[:found], shards[found+1:]...)
294
295					if len(shards) == 1 {
296						// We just deleted the last shard in the shard group.
297						data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].DeletedAt = time.Now()
298					}
299					return
300				}
301			}
302		}
303	}
304}
305
306// ShardGroups returns a list of all shard groups on a database and retention policy.
307func (data *Data) ShardGroups(database, policy string) ([]ShardGroupInfo, error) {
308	// Find retention policy.
309	rpi, err := data.RetentionPolicy(database, policy)
310	if err != nil {
311		return nil, err
312	} else if rpi == nil {
313		return nil, influxdb.ErrRetentionPolicyNotFound(policy)
314	}
315	groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups))
316	for _, g := range rpi.ShardGroups {
317		if g.Deleted() {
318			continue
319		}
320		groups = append(groups, g)
321	}
322	return groups, nil
323}
324
325// ShardGroupsByTimeRange returns a list of all shard groups on a database and policy that may contain data
326// for the specified time range. Shard groups are sorted by start time.
327func (data *Data) ShardGroupsByTimeRange(database, policy string, tmin, tmax time.Time) ([]ShardGroupInfo, error) {
328	// Find retention policy.
329	rpi, err := data.RetentionPolicy(database, policy)
330	if err != nil {
331		return nil, err
332	} else if rpi == nil {
333		return nil, influxdb.ErrRetentionPolicyNotFound(policy)
334	}
335	groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups))
336	for _, g := range rpi.ShardGroups {
337		if g.Deleted() || !g.Overlaps(tmin, tmax) {
338			continue
339		}
340		groups = append(groups, g)
341	}
342	return groups, nil
343}
344
345// ShardGroupByTimestamp returns the shard group on a database and policy for a given timestamp.
346func (data *Data) ShardGroupByTimestamp(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
347	// Find retention policy.
348	rpi, err := data.RetentionPolicy(database, policy)
349	if err != nil {
350		return nil, err
351	} else if rpi == nil {
352		return nil, influxdb.ErrRetentionPolicyNotFound(policy)
353	}
354
355	return rpi.ShardGroupByTimestamp(timestamp), nil
356}
357
358// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
359func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error {
360	// Find retention policy.
361	rpi, err := data.RetentionPolicy(database, policy)
362	if err != nil {
363		return err
364	} else if rpi == nil {
365		return influxdb.ErrRetentionPolicyNotFound(policy)
366	}
367
368	// Verify that shard group doesn't already exist for this timestamp.
369	if rpi.ShardGroupByTimestamp(timestamp) != nil {
370		return nil
371	}
372
373	// Create the shard group.
374	data.MaxShardGroupID++
375	sgi := ShardGroupInfo{}
376	sgi.ID = data.MaxShardGroupID
377	sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()
378	sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()
379	if sgi.EndTime.After(time.Unix(0, models.MaxNanoTime)) {
380		// Shard group range is [start, end) so add one to the max time.
381		sgi.EndTime = time.Unix(0, models.MaxNanoTime+1)
382	}
383
384	data.MaxShardID++
385	sgi.Shards = []ShardInfo{
386		{ID: data.MaxShardID},
387	}
388
389	// Retention policy has a new shard group, so update the policy. Shard
390	// Groups must be stored in sorted order, as other parts of the system
391	// assume this to be the case.
392	rpi.ShardGroups = append(rpi.ShardGroups, sgi)
393	sort.Sort(ShardGroupInfos(rpi.ShardGroups))
394
395	return nil
396}
397
398// DeleteShardGroup removes a shard group from a database and retention policy by id.
399func (data *Data) DeleteShardGroup(database, policy string, id uint64) error {
400	// Find retention policy.
401	rpi, err := data.RetentionPolicy(database, policy)
402	if err != nil {
403		return err
404	} else if rpi == nil {
405		return influxdb.ErrRetentionPolicyNotFound(policy)
406	}
407
408	// Find shard group by ID and set its deletion timestamp.
409	for i := range rpi.ShardGroups {
410		if rpi.ShardGroups[i].ID == id {
411			rpi.ShardGroups[i].DeletedAt = time.Now().UTC()
412			return nil
413		}
414	}
415
416	return ErrShardGroupNotFound
417}
418
419// CreateContinuousQuery adds a named continuous query to a database.
420func (data *Data) CreateContinuousQuery(database, name, query string) error {
421	di := data.Database(database)
422	if di == nil {
423		return influxdb.ErrDatabaseNotFound(database)
424	}
425
426	// Ensure the name doesn't already exist.
427	for _, cq := range di.ContinuousQueries {
428		if cq.Name == name {
429			// If the query string is the same, we'll silently return,
430			// otherwise we'll assume the user might be trying to
431			// overwrite an existing CQ with a different query.
432			if strings.ToLower(cq.Query) == strings.ToLower(query) {
433				return nil
434			}
435			return ErrContinuousQueryExists
436		}
437	}
438
439	// Append new query.
440	di.ContinuousQueries = append(di.ContinuousQueries, ContinuousQueryInfo{
441		Name:  name,
442		Query: query,
443	})
444
445	return nil
446}
447
448// DropContinuousQuery removes a continuous query.
449func (data *Data) DropContinuousQuery(database, name string) error {
450	di := data.Database(database)
451	if di == nil {
452		return nil
453	}
454
455	for i := range di.ContinuousQueries {
456		if di.ContinuousQueries[i].Name == name {
457			di.ContinuousQueries = append(di.ContinuousQueries[:i], di.ContinuousQueries[i+1:]...)
458			return nil
459		}
460	}
461	return nil
462}
463
464// validateURL returns an error if the URL does not have a port or uses a scheme other than UDP or HTTP.
465func validateURL(input string) error {
466	u, err := url.Parse(input)
467	if err != nil {
468		return ErrInvalidSubscriptionURL(input)
469	}
470
471	if u.Scheme != "udp" && u.Scheme != "http" && u.Scheme != "https" {
472		return ErrInvalidSubscriptionURL(input)
473	}
474
475	_, port, err := net.SplitHostPort(u.Host)
476	if err != nil || port == "" {
477		return ErrInvalidSubscriptionURL(input)
478	}
479
480	return nil
481}
482
483// CreateSubscription adds a named subscription to a database and retention policy.
484func (data *Data) CreateSubscription(database, rp, name, mode string, destinations []string) error {
485	for _, d := range destinations {
486		if err := validateURL(d); err != nil {
487			return err
488		}
489	}
490
491	rpi, err := data.RetentionPolicy(database, rp)
492	if err != nil {
493		return err
494	} else if rpi == nil {
495		return influxdb.ErrRetentionPolicyNotFound(rp)
496	}
497
498	// Ensure the name doesn't already exist.
499	for i := range rpi.Subscriptions {
500		if rpi.Subscriptions[i].Name == name {
501			return ErrSubscriptionExists
502		}
503	}
504
505	// Append new query.
506	rpi.Subscriptions = append(rpi.Subscriptions, SubscriptionInfo{
507		Name:         name,
508		Mode:         mode,
509		Destinations: destinations,
510	})
511
512	return nil
513}
514
515// DropSubscription removes a subscription.
516func (data *Data) DropSubscription(database, rp, name string) error {
517	rpi, err := data.RetentionPolicy(database, rp)
518	if err != nil {
519		return err
520	} else if rpi == nil {
521		return influxdb.ErrRetentionPolicyNotFound(rp)
522	}
523
524	for i := range rpi.Subscriptions {
525		if rpi.Subscriptions[i].Name == name {
526			rpi.Subscriptions = append(rpi.Subscriptions[:i], rpi.Subscriptions[i+1:]...)
527			return nil
528		}
529	}
530	return ErrSubscriptionNotFound
531}
532
533func (data *Data) user(username string) *UserInfo {
534	for i := range data.Users {
535		if data.Users[i].Name == username {
536			return &data.Users[i]
537		}
538	}
539	return nil
540}
541
542// User returns a user by username.
543func (data *Data) User(username string) User {
544	u := data.user(username)
545	if u == nil {
546		// prevent non-nil interface with nil pointer
547		return nil
548	}
549	return u
550}
551
552// CreateUser creates a new user.
553func (data *Data) CreateUser(name, hash string, admin bool) error {
554	// Ensure the user doesn't already exist.
555	if name == "" {
556		return ErrUsernameRequired
557	} else if data.User(name) != nil {
558		return ErrUserExists
559	}
560
561	// Append new user.
562	data.Users = append(data.Users, UserInfo{
563		Name:  name,
564		Hash:  hash,
565		Admin: admin,
566	})
567
568	// We know there is now at least one admin user.
569	if admin {
570		data.adminUserExists = true
571	}
572
573	return nil
574}
575
576// DropUser removes an existing user by name.
577func (data *Data) DropUser(name string) error {
578	for i := range data.Users {
579		if data.Users[i].Name == name {
580			wasAdmin := data.Users[i].Admin
581			data.Users = append(data.Users[:i], data.Users[i+1:]...)
582
583			// Maybe we dropped the only admin user?
584			if wasAdmin {
585				data.adminUserExists = data.hasAdminUser()
586			}
587			return nil
588		}
589	}
590
591	return ErrUserNotFound
592}
593
594// UpdateUser updates the password hash of an existing user.
595func (data *Data) UpdateUser(name, hash string) error {
596	for i := range data.Users {
597		if data.Users[i].Name == name {
598			data.Users[i].Hash = hash
599			return nil
600		}
601	}
602	return ErrUserNotFound
603}
604
605// CloneUsers returns a copy of the user infos.
606func (data *Data) CloneUsers() []UserInfo {
607	if len(data.Users) == 0 {
608		return []UserInfo{}
609	}
610	users := make([]UserInfo, len(data.Users))
611	for i := range data.Users {
612		users[i] = data.Users[i].clone()
613	}
614
615	return users
616}
617
618// SetPrivilege sets a privilege for a user on a database.
619func (data *Data) SetPrivilege(name, database string, p influxql.Privilege) error {
620	ui := data.user(name)
621	if ui == nil {
622		return ErrUserNotFound
623	}
624
625	if data.Database(database) == nil {
626		return influxdb.ErrDatabaseNotFound(database)
627	}
628
629	if ui.Privileges == nil {
630		ui.Privileges = make(map[string]influxql.Privilege)
631	}
632	ui.Privileges[database] = p
633
634	return nil
635}
636
637// SetAdminPrivilege sets the admin privilege for a user.
638func (data *Data) SetAdminPrivilege(name string, admin bool) error {
639	ui := data.user(name)
640	if ui == nil {
641		return ErrUserNotFound
642	}
643
644	ui.Admin = admin
645
646	// We could have promoted or revoked the only admin. Check if an admin
647	// user exists.
648	data.adminUserExists = data.hasAdminUser()
649	return nil
650}
651
652// AdminUserExists returns true if an admin user exists.
653func (data Data) AdminUserExists() bool {
654	return data.adminUserExists
655}
656
657// UserPrivileges gets the privileges for a user.
658func (data *Data) UserPrivileges(name string) (map[string]influxql.Privilege, error) {
659	ui := data.user(name)
660	if ui == nil {
661		return nil, ErrUserNotFound
662	}
663
664	return ui.Privileges, nil
665}
666
667// UserPrivilege gets the privilege for a user on a database.
668func (data *Data) UserPrivilege(name, database string) (*influxql.Privilege, error) {
669	ui := data.user(name)
670	if ui == nil {
671		return nil, ErrUserNotFound
672	}
673
674	for db, p := range ui.Privileges {
675		if db == database {
676			return &p, nil
677		}
678	}
679
680	return influxql.NewPrivilege(influxql.NoPrivileges), nil
681}
682
683// Clone returns a copy of data with a new version.
684func (data *Data) Clone() *Data {
685	other := *data
686
687	other.Databases = data.CloneDatabases()
688	other.Users = data.CloneUsers()
689
690	return &other
691}
692
693// marshal serializes data to a protobuf representation.
694func (data *Data) marshal() *internal.Data {
695	pb := &internal.Data{
696		Term:      proto.Uint64(data.Term),
697		Index:     proto.Uint64(data.Index),
698		ClusterID: proto.Uint64(data.ClusterID),
699
700		MaxShardGroupID: proto.Uint64(data.MaxShardGroupID),
701		MaxShardID:      proto.Uint64(data.MaxShardID),
702
703		// Need this for reverse compatibility
704		MaxNodeID: proto.Uint64(0),
705	}
706
707	pb.Databases = make([]*internal.DatabaseInfo, len(data.Databases))
708	for i := range data.Databases {
709		pb.Databases[i] = data.Databases[i].marshal()
710	}
711
712	pb.Users = make([]*internal.UserInfo, len(data.Users))
713	for i := range data.Users {
714		pb.Users[i] = data.Users[i].marshal()
715	}
716
717	return pb
718}
719
720// unmarshal deserializes from a protobuf representation.
721func (data *Data) unmarshal(pb *internal.Data) {
722	data.Term = pb.GetTerm()
723	data.Index = pb.GetIndex()
724	data.ClusterID = pb.GetClusterID()
725
726	data.MaxShardGroupID = pb.GetMaxShardGroupID()
727	data.MaxShardID = pb.GetMaxShardID()
728
729	data.Databases = make([]DatabaseInfo, len(pb.GetDatabases()))
730	for i, x := range pb.GetDatabases() {
731		data.Databases[i].unmarshal(x)
732	}
733
734	data.Users = make([]UserInfo, len(pb.GetUsers()))
735	for i, x := range pb.GetUsers() {
736		data.Users[i].unmarshal(x)
737	}
738
739	// Exhaustively determine if there is an admin user. The marshalled cache
740	// value may not be correct.
741	data.adminUserExists = data.hasAdminUser()
742}
743
744// MarshalBinary encodes the metadata to a binary format.
745func (data *Data) MarshalBinary() ([]byte, error) {
746	return proto.Marshal(data.marshal())
747}
748
749// UnmarshalBinary decodes the object from a binary format.
750func (data *Data) UnmarshalBinary(buf []byte) error {
751	var pb internal.Data
752	if err := proto.Unmarshal(buf, &pb); err != nil {
753		return err
754	}
755	data.unmarshal(&pb)
756	return nil
757}
758
759// TruncateShardGroups truncates any shard group that could contain timestamps beyond t.
760func (data *Data) TruncateShardGroups(t time.Time) {
761	for i := range data.Databases {
762		dbi := &data.Databases[i]
763
764		for j := range dbi.RetentionPolicies {
765			rpi := &dbi.RetentionPolicies[j]
766
767			for k := range rpi.ShardGroups {
768				sgi := &rpi.ShardGroups[k]
769
770				if !t.Before(sgi.EndTime) || sgi.Deleted() || (sgi.Truncated() && sgi.TruncatedAt.Before(t)) {
771					continue
772				}
773
774				if !t.After(sgi.StartTime) {
775					// future shardgroup
776					sgi.TruncatedAt = sgi.StartTime
777				} else {
778					sgi.TruncatedAt = t
779				}
780			}
781		}
782	}
783}
784
785// hasAdminUser exhaustively checks for the presence of at least one admin
786// user.
787func (data *Data) hasAdminUser() bool {
788	for _, u := range data.Users {
789		if u.Admin {
790			return true
791		}
792	}
793	return false
794}
795
796// ImportData imports selected data into the current metadata.
797// if non-empty, backupDBName, restoreDBName, backupRPName, restoreRPName can be used to select DB metadata from other,
798// and to assign a new name to the imported data.  Returns a map of shard ID's in the old metadata to new shard ID's
799// in the new metadata, along with a list of new databases created, both of which can assist in the import of existing
800// shard data during a database restore.
801func (data *Data) ImportData(other Data, backupDBName, restoreDBName, backupRPName, restoreRPName string) (map[uint64]uint64, []string, error) {
802	shardIDMap := make(map[uint64]uint64)
803	if backupDBName != "" {
804		dbName, err := data.importOneDB(other, backupDBName, restoreDBName, backupRPName, restoreRPName, shardIDMap)
805		if err != nil {
806			return nil, nil, err
807		}
808
809		return shardIDMap, []string{dbName}, nil
810	}
811
812	// if no backupDBName then we'll try to import all the DB's.  If one of them fails, we'll mark the whole
813	// operation a failure and return an error.
814	var newDBs []string
815	for _, dbi := range other.Databases {
816		if dbi.Name == "_internal" {
817			continue
818		}
819		dbName, err := data.importOneDB(other, dbi.Name, "", "", "", shardIDMap)
820		if err != nil {
821			return nil, nil, err
822		}
823		newDBs = append(newDBs, dbName)
824	}
825	return shardIDMap, newDBs, nil
826}
827
828// importOneDB imports a single database/rp from an external metadata object, renaming them if new names are provided.
829func (data *Data) importOneDB(other Data, backupDBName, restoreDBName, backupRPName, restoreRPName string, shardIDMap map[uint64]uint64) (string, error) {
830
831	dbPtr := other.Database(backupDBName)
832	if dbPtr == nil {
833		return "", fmt.Errorf("imported metadata does not have datbase named %s", backupDBName)
834	}
835
836	if restoreDBName == "" {
837		restoreDBName = backupDBName
838	}
839
840	if data.Database(restoreDBName) != nil {
841		return "", errors.New("database already exists")
842	}
843
844	// change the names if we want/need to
845	err := data.CreateDatabase(restoreDBName)
846	if err != nil {
847		return "", err
848	}
849	dbImport := data.Database(restoreDBName)
850
851	if backupRPName != "" {
852		rpPtr := dbPtr.RetentionPolicy(backupRPName)
853
854		if rpPtr != nil {
855			rpImport := rpPtr.clone()
856			if restoreRPName == "" {
857				restoreRPName = backupRPName
858			}
859			rpImport.Name = restoreRPName
860			dbImport.RetentionPolicies = []RetentionPolicyInfo{rpImport}
861			dbImport.DefaultRetentionPolicy = restoreRPName
862		} else {
863			return "", fmt.Errorf("retention Policy not found in meta backup: %s.%s", backupDBName, backupRPName)
864		}
865
866	} else { // import all RP's without renaming
867		dbImport.DefaultRetentionPolicy = dbPtr.DefaultRetentionPolicy
868		if dbPtr.RetentionPolicies != nil {
869			dbImport.RetentionPolicies = make([]RetentionPolicyInfo, len(dbPtr.RetentionPolicies))
870			for i := range dbPtr.RetentionPolicies {
871				dbImport.RetentionPolicies[i] = dbPtr.RetentionPolicies[i].clone()
872			}
873		}
874
875	}
876
877	// renumber the shard groups and shards for the new retention policy(ies)
878	for _, rpImport := range dbImport.RetentionPolicies {
879		for j, sgImport := range rpImport.ShardGroups {
880			data.MaxShardGroupID++
881			rpImport.ShardGroups[j].ID = data.MaxShardGroupID
882			for k := range sgImport.Shards {
883				data.MaxShardID++
884				shardIDMap[sgImport.Shards[k].ID] = data.MaxShardID
885				sgImport.Shards[k].ID = data.MaxShardID
886				// OSS doesn't use Owners but if we are importing this from Enterprise, we'll want to clear it out
887				// to avoid any issues if they ever export this DB again to bring back to Enterprise.
888				sgImport.Shards[k].Owners = []ShardOwner{}
889			}
890		}
891	}
892
893	return restoreDBName, nil
894}
895
896// NodeInfo represents information about a single node in the cluster.
897type NodeInfo struct {
898	ID      uint64
899	Host    string
900	TCPHost string
901}
902
903// NodeInfos is a slice of NodeInfo used for sorting
904type NodeInfos []NodeInfo
905
906// Len implements sort.Interface.
907func (n NodeInfos) Len() int { return len(n) }
908
909// Swap implements sort.Interface.
910func (n NodeInfos) Swap(i, j int) { n[i], n[j] = n[j], n[i] }
911
912// Less implements sort.Interface.
913func (n NodeInfos) Less(i, j int) bool { return n[i].ID < n[j].ID }
914
915// DatabaseInfo represents information about a database in the system.
916type DatabaseInfo struct {
917	Name                   string
918	DefaultRetentionPolicy string
919	RetentionPolicies      []RetentionPolicyInfo
920	ContinuousQueries      []ContinuousQueryInfo
921}
922
923// RetentionPolicy returns a retention policy by name.
924func (di DatabaseInfo) RetentionPolicy(name string) *RetentionPolicyInfo {
925	if name == "" {
926		if di.DefaultRetentionPolicy == "" {
927			return nil
928		}
929		name = di.DefaultRetentionPolicy
930	}
931
932	for i := range di.RetentionPolicies {
933		if di.RetentionPolicies[i].Name == name {
934			return &di.RetentionPolicies[i]
935		}
936	}
937	return nil
938}
939
940// ShardInfos returns a list of all shards' info for the database.
941func (di DatabaseInfo) ShardInfos() []ShardInfo {
942	shards := map[uint64]*ShardInfo{}
943	for i := range di.RetentionPolicies {
944		for j := range di.RetentionPolicies[i].ShardGroups {
945			sg := di.RetentionPolicies[i].ShardGroups[j]
946			// Skip deleted shard groups
947			if sg.Deleted() {
948				continue
949			}
950			for k := range sg.Shards {
951				si := &di.RetentionPolicies[i].ShardGroups[j].Shards[k]
952				shards[si.ID] = si
953			}
954		}
955	}
956
957	infos := make([]ShardInfo, 0, len(shards))
958	for _, info := range shards {
959		infos = append(infos, *info)
960	}
961
962	return infos
963}
964
965// clone returns a deep copy of di.
966func (di DatabaseInfo) clone() DatabaseInfo {
967	other := di
968
969	if di.RetentionPolicies != nil {
970		other.RetentionPolicies = make([]RetentionPolicyInfo, len(di.RetentionPolicies))
971		for i := range di.RetentionPolicies {
972			other.RetentionPolicies[i] = di.RetentionPolicies[i].clone()
973		}
974	}
975
976	// Copy continuous queries.
977	if di.ContinuousQueries != nil {
978		other.ContinuousQueries = make([]ContinuousQueryInfo, len(di.ContinuousQueries))
979		for i := range di.ContinuousQueries {
980			other.ContinuousQueries[i] = di.ContinuousQueries[i].clone()
981		}
982	}
983
984	return other
985}
986
987// marshal serializes to a protobuf representation.
988func (di DatabaseInfo) marshal() *internal.DatabaseInfo {
989	pb := &internal.DatabaseInfo{}
990	pb.Name = proto.String(di.Name)
991	pb.DefaultRetentionPolicy = proto.String(di.DefaultRetentionPolicy)
992
993	pb.RetentionPolicies = make([]*internal.RetentionPolicyInfo, len(di.RetentionPolicies))
994	for i := range di.RetentionPolicies {
995		pb.RetentionPolicies[i] = di.RetentionPolicies[i].marshal()
996	}
997
998	pb.ContinuousQueries = make([]*internal.ContinuousQueryInfo, len(di.ContinuousQueries))
999	for i := range di.ContinuousQueries {
1000		pb.ContinuousQueries[i] = di.ContinuousQueries[i].marshal()
1001	}
1002	return pb
1003}
1004
1005// unmarshal deserializes from a protobuf representation.
1006func (di *DatabaseInfo) unmarshal(pb *internal.DatabaseInfo) {
1007	di.Name = pb.GetName()
1008	di.DefaultRetentionPolicy = pb.GetDefaultRetentionPolicy()
1009
1010	if len(pb.GetRetentionPolicies()) > 0 {
1011		di.RetentionPolicies = make([]RetentionPolicyInfo, len(pb.GetRetentionPolicies()))
1012		for i, x := range pb.GetRetentionPolicies() {
1013			di.RetentionPolicies[i].unmarshal(x)
1014		}
1015	}
1016
1017	if len(pb.GetContinuousQueries()) > 0 {
1018		di.ContinuousQueries = make([]ContinuousQueryInfo, len(pb.GetContinuousQueries()))
1019		for i, x := range pb.GetContinuousQueries() {
1020			di.ContinuousQueries[i].unmarshal(x)
1021		}
1022	}
1023}
1024
1025// RetentionPolicySpec represents the specification for a new retention policy.
1026type RetentionPolicySpec struct {
1027	Name               string
1028	ReplicaN           *int
1029	Duration           *time.Duration
1030	ShardGroupDuration time.Duration
1031}
1032
1033// NewRetentionPolicyInfo creates a new retention policy info from the specification.
1034func (s *RetentionPolicySpec) NewRetentionPolicyInfo() *RetentionPolicyInfo {
1035	return DefaultRetentionPolicyInfo().Apply(s)
1036}
1037
1038// Matches checks if this retention policy specification matches
1039// an existing retention policy.
1040func (s *RetentionPolicySpec) Matches(rpi *RetentionPolicyInfo) bool {
1041	if rpi == nil {
1042		return false
1043	} else if s.Name != "" && s.Name != rpi.Name {
1044		return false
1045	} else if s.Duration != nil && *s.Duration != rpi.Duration {
1046		return false
1047	} else if s.ReplicaN != nil && *s.ReplicaN != rpi.ReplicaN {
1048		return false
1049	}
1050
1051	// Normalise ShardDuration before comparing to any existing retention policies.
1052	// Normalize with the retention policy info's duration instead of the spec
1053	// since they should be the same and we're performing a comparison.
1054	sgDuration := normalisedShardDuration(s.ShardGroupDuration, rpi.Duration)
1055	return sgDuration == rpi.ShardGroupDuration
1056}
1057
1058// marshal serializes to a protobuf representation.
1059func (s *RetentionPolicySpec) marshal() *internal.RetentionPolicySpec {
1060	pb := &internal.RetentionPolicySpec{}
1061	if s.Name != "" {
1062		pb.Name = proto.String(s.Name)
1063	}
1064	if s.Duration != nil {
1065		pb.Duration = proto.Int64(int64(*s.Duration))
1066	}
1067	if s.ShardGroupDuration > 0 {
1068		pb.ShardGroupDuration = proto.Int64(int64(s.ShardGroupDuration))
1069	}
1070	if s.ReplicaN != nil {
1071		pb.ReplicaN = proto.Uint32(uint32(*s.ReplicaN))
1072	}
1073	return pb
1074}
1075
1076// unmarshal deserializes from a protobuf representation.
1077func (s *RetentionPolicySpec) unmarshal(pb *internal.RetentionPolicySpec) {
1078	if pb.Name != nil {
1079		s.Name = pb.GetName()
1080	}
1081	if pb.Duration != nil {
1082		duration := time.Duration(pb.GetDuration())
1083		s.Duration = &duration
1084	}
1085	if pb.ShardGroupDuration != nil {
1086		s.ShardGroupDuration = time.Duration(pb.GetShardGroupDuration())
1087	}
1088	if pb.ReplicaN != nil {
1089		replicaN := int(pb.GetReplicaN())
1090		s.ReplicaN = &replicaN
1091	}
1092}
1093
1094// MarshalBinary encodes RetentionPolicySpec to a binary format.
1095func (s *RetentionPolicySpec) MarshalBinary() ([]byte, error) {
1096	return proto.Marshal(s.marshal())
1097}
1098
1099// UnmarshalBinary decodes RetentionPolicySpec from a binary format.
1100func (s *RetentionPolicySpec) UnmarshalBinary(data []byte) error {
1101	var pb internal.RetentionPolicySpec
1102	if err := proto.Unmarshal(data, &pb); err != nil {
1103		return err
1104	}
1105	s.unmarshal(&pb)
1106	return nil
1107}
1108
1109// RetentionPolicyInfo represents metadata about a retention policy.
1110type RetentionPolicyInfo struct {
1111	Name               string
1112	ReplicaN           int
1113	Duration           time.Duration
1114	ShardGroupDuration time.Duration
1115	ShardGroups        []ShardGroupInfo
1116	Subscriptions      []SubscriptionInfo
1117}
1118
1119// NewRetentionPolicyInfo returns a new instance of RetentionPolicyInfo
1120// with default replication and duration.
1121func NewRetentionPolicyInfo(name string) *RetentionPolicyInfo {
1122	return &RetentionPolicyInfo{
1123		Name:     name,
1124		ReplicaN: DefaultRetentionPolicyReplicaN,
1125		Duration: DefaultRetentionPolicyDuration,
1126	}
1127}
1128
1129// DefaultRetentionPolicyInfo returns a new instance of RetentionPolicyInfo
1130// with default name, replication, and duration.
1131func DefaultRetentionPolicyInfo() *RetentionPolicyInfo {
1132	return NewRetentionPolicyInfo(DefaultRetentionPolicyName)
1133}
1134
1135// Apply applies a specification to the retention policy info.
1136func (rpi *RetentionPolicyInfo) Apply(spec *RetentionPolicySpec) *RetentionPolicyInfo {
1137	rp := &RetentionPolicyInfo{
1138		Name:               rpi.Name,
1139		ReplicaN:           rpi.ReplicaN,
1140		Duration:           rpi.Duration,
1141		ShardGroupDuration: rpi.ShardGroupDuration,
1142	}
1143	if spec.Name != "" {
1144		rp.Name = spec.Name
1145	}
1146	if spec.ReplicaN != nil {
1147		rp.ReplicaN = *spec.ReplicaN
1148	}
1149	if spec.Duration != nil {
1150		rp.Duration = *spec.Duration
1151	}
1152	rp.ShardGroupDuration = normalisedShardDuration(spec.ShardGroupDuration, rp.Duration)
1153	return rp
1154}
1155
1156// ShardGroupByTimestamp returns the shard group in the policy that contains the timestamp,
1157// or nil if no shard group matches.
1158func (rpi *RetentionPolicyInfo) ShardGroupByTimestamp(timestamp time.Time) *ShardGroupInfo {
1159	for i := range rpi.ShardGroups {
1160		sgi := &rpi.ShardGroups[i]
1161		if sgi.Contains(timestamp) && !sgi.Deleted() && (!sgi.Truncated() || timestamp.Before(sgi.TruncatedAt)) {
1162			return &rpi.ShardGroups[i]
1163		}
1164	}
1165
1166	return nil
1167}
1168
1169// ExpiredShardGroups returns the Shard Groups which are considered expired, for the given time.
1170func (rpi *RetentionPolicyInfo) ExpiredShardGroups(t time.Time) []*ShardGroupInfo {
1171	var groups = make([]*ShardGroupInfo, 0)
1172	for i := range rpi.ShardGroups {
1173		if rpi.ShardGroups[i].Deleted() {
1174			continue
1175		}
1176		if rpi.Duration != 0 && rpi.ShardGroups[i].EndTime.Add(rpi.Duration).Before(t) {
1177			groups = append(groups, &rpi.ShardGroups[i])
1178		}
1179	}
1180	return groups
1181}
1182
1183// DeletedShardGroups returns the Shard Groups which are marked as deleted.
1184func (rpi *RetentionPolicyInfo) DeletedShardGroups() []*ShardGroupInfo {
1185	var groups = make([]*ShardGroupInfo, 0)
1186	for i := range rpi.ShardGroups {
1187		if rpi.ShardGroups[i].Deleted() {
1188			groups = append(groups, &rpi.ShardGroups[i])
1189		}
1190	}
1191	return groups
1192}
1193
1194// marshal serializes to a protobuf representation.
1195func (rpi *RetentionPolicyInfo) marshal() *internal.RetentionPolicyInfo {
1196	pb := &internal.RetentionPolicyInfo{
1197		Name:               proto.String(rpi.Name),
1198		ReplicaN:           proto.Uint32(uint32(rpi.ReplicaN)),
1199		Duration:           proto.Int64(int64(rpi.Duration)),
1200		ShardGroupDuration: proto.Int64(int64(rpi.ShardGroupDuration)),
1201	}
1202
1203	pb.ShardGroups = make([]*internal.ShardGroupInfo, len(rpi.ShardGroups))
1204	for i, sgi := range rpi.ShardGroups {
1205		pb.ShardGroups[i] = sgi.marshal()
1206	}
1207
1208	pb.Subscriptions = make([]*internal.SubscriptionInfo, len(rpi.Subscriptions))
1209	for i, sub := range rpi.Subscriptions {
1210		pb.Subscriptions[i] = sub.marshal()
1211	}
1212
1213	return pb
1214}
1215
1216// unmarshal deserializes from a protobuf representation.
1217func (rpi *RetentionPolicyInfo) unmarshal(pb *internal.RetentionPolicyInfo) {
1218	rpi.Name = pb.GetName()
1219	rpi.ReplicaN = int(pb.GetReplicaN())
1220	rpi.Duration = time.Duration(pb.GetDuration())
1221	rpi.ShardGroupDuration = time.Duration(pb.GetShardGroupDuration())
1222
1223	if len(pb.GetShardGroups()) > 0 {
1224		rpi.ShardGroups = make([]ShardGroupInfo, len(pb.GetShardGroups()))
1225		for i, x := range pb.GetShardGroups() {
1226			rpi.ShardGroups[i].unmarshal(x)
1227		}
1228	}
1229	if len(pb.GetSubscriptions()) > 0 {
1230		rpi.Subscriptions = make([]SubscriptionInfo, len(pb.GetSubscriptions()))
1231		for i, x := range pb.GetSubscriptions() {
1232			rpi.Subscriptions[i].unmarshal(x)
1233		}
1234	}
1235}
1236
1237// clone returns a deep copy of rpi.
1238func (rpi RetentionPolicyInfo) clone() RetentionPolicyInfo {
1239	other := rpi
1240
1241	if rpi.ShardGroups != nil {
1242		other.ShardGroups = make([]ShardGroupInfo, len(rpi.ShardGroups))
1243		for i := range rpi.ShardGroups {
1244			other.ShardGroups[i] = rpi.ShardGroups[i].clone()
1245		}
1246	}
1247
1248	return other
1249}
1250
1251// MarshalBinary encodes rpi to a binary format.
1252func (rpi *RetentionPolicyInfo) MarshalBinary() ([]byte, error) {
1253	return proto.Marshal(rpi.marshal())
1254}
1255
1256// UnmarshalBinary decodes rpi from a binary format.
1257func (rpi *RetentionPolicyInfo) UnmarshalBinary(data []byte) error {
1258	var pb internal.RetentionPolicyInfo
1259	if err := proto.Unmarshal(data, &pb); err != nil {
1260		return err
1261	}
1262	rpi.unmarshal(&pb)
1263	return nil
1264}
1265
1266// shardGroupDuration returns the default duration for a shard group based on a policy duration.
1267func shardGroupDuration(d time.Duration) time.Duration {
1268	if d >= 180*24*time.Hour || d == 0 { // 6 months or 0
1269		return 7 * 24 * time.Hour
1270	} else if d >= 2*24*time.Hour { // 2 days
1271		return 1 * 24 * time.Hour
1272	}
1273	return 1 * time.Hour
1274}
1275
1276// normalisedShardDuration returns normalised shard duration based on a policy duration.
1277func normalisedShardDuration(sgd, d time.Duration) time.Duration {
1278	// If it is zero, it likely wasn't specified, so we default to the shard group duration
1279	if sgd == 0 {
1280		return shardGroupDuration(d)
1281	}
1282	// If it was specified, but it's less than the MinRetentionPolicyDuration, then normalize
1283	// to the MinRetentionPolicyDuration
1284	if sgd < MinRetentionPolicyDuration {
1285		return shardGroupDuration(MinRetentionPolicyDuration)
1286	}
1287	return sgd
1288}
1289
1290// ShardGroupInfo represents metadata about a shard group. The DeletedAt field is important
1291// because it makes it clear that a ShardGroup has been marked as deleted, and allow the system
1292// to be sure that a ShardGroup is not simply missing. If the DeletedAt is set, the system can
1293// safely delete any associated shards.
1294type ShardGroupInfo struct {
1295	ID          uint64
1296	StartTime   time.Time
1297	EndTime     time.Time
1298	DeletedAt   time.Time
1299	Shards      []ShardInfo
1300	TruncatedAt time.Time
1301}
1302
1303// ShardGroupInfos implements sort.Interface on []ShardGroupInfo, based
1304// on the StartTime field.
1305type ShardGroupInfos []ShardGroupInfo
1306
1307// Len implements sort.Interface.
1308func (a ShardGroupInfos) Len() int { return len(a) }
1309
1310// Swap implements sort.Interface.
1311func (a ShardGroupInfos) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
1312
1313// Less implements sort.Interface.
1314func (a ShardGroupInfos) Less(i, j int) bool {
1315	iEnd := a[i].EndTime
1316	if a[i].Truncated() {
1317		iEnd = a[i].TruncatedAt
1318	}
1319
1320	jEnd := a[j].EndTime
1321	if a[j].Truncated() {
1322		jEnd = a[j].TruncatedAt
1323	}
1324
1325	if iEnd.Equal(jEnd) {
1326		return a[i].StartTime.Before(a[j].StartTime)
1327	}
1328
1329	return iEnd.Before(jEnd)
1330}
1331
1332// Contains returns true iif StartTime ≤ t < EndTime.
1333func (sgi *ShardGroupInfo) Contains(t time.Time) bool {
1334	return !t.Before(sgi.StartTime) && t.Before(sgi.EndTime)
1335}
1336
1337// Overlaps returns whether the shard group contains data for the time range between min and max
1338func (sgi *ShardGroupInfo) Overlaps(min, max time.Time) bool {
1339	return !sgi.StartTime.After(max) && sgi.EndTime.After(min)
1340}
1341
1342// Deleted returns whether this ShardGroup has been deleted.
1343func (sgi *ShardGroupInfo) Deleted() bool {
1344	return !sgi.DeletedAt.IsZero()
1345}
1346
1347// Truncated returns true if this ShardGroup has been truncated (no new writes).
1348func (sgi *ShardGroupInfo) Truncated() bool {
1349	return !sgi.TruncatedAt.IsZero()
1350}
1351
1352// clone returns a deep copy of sgi.
1353func (sgi ShardGroupInfo) clone() ShardGroupInfo {
1354	other := sgi
1355
1356	if sgi.Shards != nil {
1357		other.Shards = make([]ShardInfo, len(sgi.Shards))
1358		for i := range sgi.Shards {
1359			other.Shards[i] = sgi.Shards[i].clone()
1360		}
1361	}
1362
1363	return other
1364}
1365
1366// ShardFor returns the ShardInfo for a Point hash.
1367func (sgi *ShardGroupInfo) ShardFor(hash uint64) ShardInfo {
1368	return sgi.Shards[hash%uint64(len(sgi.Shards))]
1369}
1370
1371// marshal serializes to a protobuf representation.
1372func (sgi *ShardGroupInfo) marshal() *internal.ShardGroupInfo {
1373	pb := &internal.ShardGroupInfo{
1374		ID:        proto.Uint64(sgi.ID),
1375		StartTime: proto.Int64(MarshalTime(sgi.StartTime)),
1376		EndTime:   proto.Int64(MarshalTime(sgi.EndTime)),
1377		DeletedAt: proto.Int64(MarshalTime(sgi.DeletedAt)),
1378	}
1379
1380	if !sgi.TruncatedAt.IsZero() {
1381		pb.TruncatedAt = proto.Int64(MarshalTime(sgi.TruncatedAt))
1382	}
1383
1384	pb.Shards = make([]*internal.ShardInfo, len(sgi.Shards))
1385	for i := range sgi.Shards {
1386		pb.Shards[i] = sgi.Shards[i].marshal()
1387	}
1388
1389	return pb
1390}
1391
1392// unmarshal deserializes from a protobuf representation.
1393func (sgi *ShardGroupInfo) unmarshal(pb *internal.ShardGroupInfo) {
1394	sgi.ID = pb.GetID()
1395	if i := pb.GetStartTime(); i == 0 {
1396		sgi.StartTime = time.Unix(0, 0).UTC()
1397	} else {
1398		sgi.StartTime = UnmarshalTime(i)
1399	}
1400	if i := pb.GetEndTime(); i == 0 {
1401		sgi.EndTime = time.Unix(0, 0).UTC()
1402	} else {
1403		sgi.EndTime = UnmarshalTime(i)
1404	}
1405	sgi.DeletedAt = UnmarshalTime(pb.GetDeletedAt())
1406
1407	if pb != nil && pb.TruncatedAt != nil {
1408		sgi.TruncatedAt = UnmarshalTime(pb.GetTruncatedAt())
1409	}
1410
1411	if len(pb.GetShards()) > 0 {
1412		sgi.Shards = make([]ShardInfo, len(pb.GetShards()))
1413		for i, x := range pb.GetShards() {
1414			sgi.Shards[i].unmarshal(x)
1415		}
1416	}
1417}
1418
1419// ShardInfo represents metadata about a shard.
1420type ShardInfo struct {
1421	ID     uint64
1422	Owners []ShardOwner
1423}
1424
1425// OwnedBy determines whether the shard's owner IDs includes nodeID.
1426func (si ShardInfo) OwnedBy(nodeID uint64) bool {
1427	for _, so := range si.Owners {
1428		if so.NodeID == nodeID {
1429			return true
1430		}
1431	}
1432	return false
1433}
1434
1435// clone returns a deep copy of si.
1436func (si ShardInfo) clone() ShardInfo {
1437	other := si
1438
1439	if si.Owners != nil {
1440		other.Owners = make([]ShardOwner, len(si.Owners))
1441		for i := range si.Owners {
1442			other.Owners[i] = si.Owners[i].clone()
1443		}
1444	}
1445
1446	return other
1447}
1448
1449// marshal serializes to a protobuf representation.
1450func (si ShardInfo) marshal() *internal.ShardInfo {
1451	pb := &internal.ShardInfo{
1452		ID: proto.Uint64(si.ID),
1453	}
1454
1455	pb.Owners = make([]*internal.ShardOwner, len(si.Owners))
1456	for i := range si.Owners {
1457		pb.Owners[i] = si.Owners[i].marshal()
1458	}
1459
1460	return pb
1461}
1462
1463// UnmarshalBinary decodes the object from a binary format.
1464func (si *ShardInfo) UnmarshalBinary(buf []byte) error {
1465	var pb internal.ShardInfo
1466	if err := proto.Unmarshal(buf, &pb); err != nil {
1467		return err
1468	}
1469	si.unmarshal(&pb)
1470	return nil
1471}
1472
1473// unmarshal deserializes from a protobuf representation.
1474func (si *ShardInfo) unmarshal(pb *internal.ShardInfo) {
1475	si.ID = pb.GetID()
1476
1477	// If deprecated "OwnerIDs" exists then convert it to "Owners" format.
1478	if len(pb.GetOwnerIDs()) > 0 {
1479		si.Owners = make([]ShardOwner, len(pb.GetOwnerIDs()))
1480		for i, x := range pb.GetOwnerIDs() {
1481			si.Owners[i].unmarshal(&internal.ShardOwner{
1482				NodeID: proto.Uint64(x),
1483			})
1484		}
1485	} else if len(pb.GetOwners()) > 0 {
1486		si.Owners = make([]ShardOwner, len(pb.GetOwners()))
1487		for i, x := range pb.GetOwners() {
1488			si.Owners[i].unmarshal(x)
1489		}
1490	}
1491}
1492
1493// SubscriptionInfo holds the subscription information.
1494type SubscriptionInfo struct {
1495	Name         string
1496	Mode         string
1497	Destinations []string
1498}
1499
1500// marshal serializes to a protobuf representation.
1501func (si SubscriptionInfo) marshal() *internal.SubscriptionInfo {
1502	pb := &internal.SubscriptionInfo{
1503		Name: proto.String(si.Name),
1504		Mode: proto.String(si.Mode),
1505	}
1506
1507	pb.Destinations = make([]string, len(si.Destinations))
1508	for i := range si.Destinations {
1509		pb.Destinations[i] = si.Destinations[i]
1510	}
1511	return pb
1512}
1513
1514// unmarshal deserializes from a protobuf representation.
1515func (si *SubscriptionInfo) unmarshal(pb *internal.SubscriptionInfo) {
1516	si.Name = pb.GetName()
1517	si.Mode = pb.GetMode()
1518
1519	if len(pb.GetDestinations()) > 0 {
1520		si.Destinations = make([]string, len(pb.GetDestinations()))
1521		copy(si.Destinations, pb.GetDestinations())
1522	}
1523}
1524
1525// ShardOwner represents a node that owns a shard.
1526type ShardOwner struct {
1527	NodeID uint64
1528}
1529
1530// clone returns a deep copy of so.
1531func (so ShardOwner) clone() ShardOwner {
1532	return so
1533}
1534
1535// marshal serializes to a protobuf representation.
1536func (so ShardOwner) marshal() *internal.ShardOwner {
1537	return &internal.ShardOwner{
1538		NodeID: proto.Uint64(so.NodeID),
1539	}
1540}
1541
1542// unmarshal deserializes from a protobuf representation.
1543func (so *ShardOwner) unmarshal(pb *internal.ShardOwner) {
1544	so.NodeID = pb.GetNodeID()
1545}
1546
1547// ContinuousQueryInfo represents metadata about a continuous query.
1548type ContinuousQueryInfo struct {
1549	Name  string
1550	Query string
1551}
1552
1553// clone returns a deep copy of cqi.
1554func (cqi ContinuousQueryInfo) clone() ContinuousQueryInfo { return cqi }
1555
1556// marshal serializes to a protobuf representation.
1557func (cqi ContinuousQueryInfo) marshal() *internal.ContinuousQueryInfo {
1558	return &internal.ContinuousQueryInfo{
1559		Name:  proto.String(cqi.Name),
1560		Query: proto.String(cqi.Query),
1561	}
1562}
1563
1564// unmarshal deserializes from a protobuf representation.
1565func (cqi *ContinuousQueryInfo) unmarshal(pb *internal.ContinuousQueryInfo) {
1566	cqi.Name = pb.GetName()
1567	cqi.Query = pb.GetQuery()
1568}
1569
1570var _ query.Authorizer = (*UserInfo)(nil)
1571
1572// UserInfo represents metadata about a user in the system.
1573type UserInfo struct {
1574	// User's name.
1575	Name string
1576
1577	// Hashed password.
1578	Hash string
1579
1580	// Whether the user is an admin, i.e. allowed to do everything.
1581	Admin bool
1582
1583	// Map of database name to granted privilege.
1584	Privileges map[string]influxql.Privilege
1585}
1586
1587type User interface {
1588	query.Authorizer
1589	ID() string
1590	AuthorizeUnrestricted() bool
1591}
1592
1593func (u *UserInfo) ID() string {
1594	return u.Name
1595}
1596
1597// AuthorizeDatabase returns true if the user is authorized for the given privilege on the given database.
1598func (ui *UserInfo) AuthorizeDatabase(privilege influxql.Privilege, database string) bool {
1599	if ui.Admin || privilege == influxql.NoPrivileges {
1600		return true
1601	}
1602	p, ok := ui.Privileges[database]
1603	return ok && (p == privilege || p == influxql.AllPrivileges)
1604}
1605
1606// AuthorizeSeriesRead is used to limit access per-series (enterprise only)
1607func (u *UserInfo) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool {
1608	return true
1609}
1610
1611// AuthorizeSeriesWrite is used to limit access per-series (enterprise only)
1612func (u *UserInfo) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
1613	return true
1614}
1615
1616// AuthorizeUnrestricted allows admins to shortcut access checks.
1617func (u *UserInfo) AuthorizeUnrestricted() bool {
1618	return u.Admin
1619}
1620
1621// clone returns a deep copy of si.
1622func (ui UserInfo) clone() UserInfo {
1623	other := ui
1624
1625	if ui.Privileges != nil {
1626		other.Privileges = make(map[string]influxql.Privilege)
1627		for k, v := range ui.Privileges {
1628			other.Privileges[k] = v
1629		}
1630	}
1631
1632	return other
1633}
1634
1635// marshal serializes to a protobuf representation.
1636func (ui UserInfo) marshal() *internal.UserInfo {
1637	pb := &internal.UserInfo{
1638		Name:  proto.String(ui.Name),
1639		Hash:  proto.String(ui.Hash),
1640		Admin: proto.Bool(ui.Admin),
1641	}
1642
1643	for database, privilege := range ui.Privileges {
1644		pb.Privileges = append(pb.Privileges, &internal.UserPrivilege{
1645			Database:  proto.String(database),
1646			Privilege: proto.Int32(int32(privilege)),
1647		})
1648	}
1649
1650	return pb
1651}
1652
1653// unmarshal deserializes from a protobuf representation.
1654func (ui *UserInfo) unmarshal(pb *internal.UserInfo) {
1655	ui.Name = pb.GetName()
1656	ui.Hash = pb.GetHash()
1657	ui.Admin = pb.GetAdmin()
1658
1659	ui.Privileges = make(map[string]influxql.Privilege)
1660	for _, p := range pb.GetPrivileges() {
1661		ui.Privileges[p.GetDatabase()] = influxql.Privilege(p.GetPrivilege())
1662	}
1663}
1664
1665// Lease represents a lease held on a resource.
1666type Lease struct {
1667	Name       string    `json:"name"`
1668	Expiration time.Time `json:"expiration"`
1669	Owner      uint64    `json:"owner"`
1670}
1671
1672// Leases is a concurrency-safe collection of leases keyed by name.
1673type Leases struct {
1674	mu sync.Mutex
1675	m  map[string]*Lease
1676	d  time.Duration
1677}
1678
1679// NewLeases returns a new instance of Leases.
1680func NewLeases(d time.Duration) *Leases {
1681	return &Leases{
1682		m: make(map[string]*Lease),
1683		d: d,
1684	}
1685}
1686
1687// Acquire acquires a lease with the given name for the given nodeID.
1688// If the lease doesn't exist or exists but is expired, a valid lease is returned.
1689// If nodeID already owns the named and unexpired lease, the lease expiration is extended.
1690// If a different node owns the lease, an error is returned.
1691func (leases *Leases) Acquire(name string, nodeID uint64) (*Lease, error) {
1692	leases.mu.Lock()
1693	defer leases.mu.Unlock()
1694
1695	l := leases.m[name]
1696	if l != nil {
1697		if time.Now().After(l.Expiration) || l.Owner == nodeID {
1698			l.Expiration = time.Now().Add(leases.d)
1699			l.Owner = nodeID
1700			return l, nil
1701		}
1702		return l, errors.New("another node has the lease")
1703	}
1704
1705	l = &Lease{
1706		Name:       name,
1707		Expiration: time.Now().Add(leases.d),
1708		Owner:      nodeID,
1709	}
1710
1711	leases.m[name] = l
1712
1713	return l, nil
1714}
1715
1716// MarshalTime converts t to nanoseconds since epoch. A zero time returns 0.
1717func MarshalTime(t time.Time) int64 {
1718	if t.IsZero() {
1719		return 0
1720	}
1721	return t.UnixNano()
1722}
1723
1724// UnmarshalTime converts nanoseconds since epoch to time.
1725// A zero value returns a zero time.
1726func UnmarshalTime(v int64) time.Time {
1727	if v == 0 {
1728		return time.Time{}
1729	}
1730	return time.Unix(0, v).UTC()
1731}
1732
1733// ValidName checks to see if the given name can would be valid for DB/RP name
1734func ValidName(name string) bool {
1735	for _, r := range name {
1736		if !unicode.IsPrint(r) {
1737			return false
1738		}
1739	}
1740
1741	return name != "" &&
1742		name != "." &&
1743		name != ".." &&
1744		!strings.ContainsAny(name, `/\`)
1745}
1746