1package meta
2
3import (
4	"errors"
5	"net"
6	"net/url"
7	"sort"
8	"strings"
9	"sync"
10	"time"
11	"unicode"
12
13	"fmt"
14
15	"github.com/gogo/protobuf/proto"
16	"github.com/influxdata/influxdb"
17	"github.com/influxdata/influxdb/models"
18	"github.com/influxdata/influxdb/query"
19	internal "github.com/influxdata/influxdb/services/meta/internal"
20	"github.com/influxdata/influxql"
21)
22
23//go:generate protoc --gogo_out=. internal/meta.proto
24
25const (
26	// DefaultRetentionPolicyReplicaN is the default value of RetentionPolicyInfo.ReplicaN.
27	DefaultRetentionPolicyReplicaN = 1
28
29	// DefaultRetentionPolicyDuration is the default value of RetentionPolicyInfo.Duration.
30	DefaultRetentionPolicyDuration = time.Duration(0)
31
32	// DefaultRetentionPolicyName is the default name for auto generated retention policies.
33	DefaultRetentionPolicyName = "autogen"
34
35	// MinRetentionPolicyDuration represents the minimum duration for a policy.
36	MinRetentionPolicyDuration = time.Hour
37)
38
39// Data represents the top level collection of all metadata.
40type Data struct {
41	Term      uint64 // associated raft term
42	Index     uint64 // associated raft index
43	ClusterID uint64
44	Databases []DatabaseInfo
45	Users     []UserInfo
46
47	// adminUserExists provides a constant time mechanism for determining
48	// if there is at least one admin user.
49	adminUserExists bool
50
51	MaxShardGroupID uint64
52	MaxShardID      uint64
53}
54
55// Database returns a DatabaseInfo by the database name.
56func (data *Data) Database(name string) *DatabaseInfo {
57	for i := range data.Databases {
58		if data.Databases[i].Name == name {
59			return &data.Databases[i]
60		}
61	}
62	return nil
63}
64
65// CloneDatabases returns a copy of the DatabaseInfo.
66func (data *Data) CloneDatabases() []DatabaseInfo {
67	if data.Databases == nil {
68		return nil
69	}
70	dbs := make([]DatabaseInfo, len(data.Databases))
71	for i := range data.Databases {
72		dbs[i] = data.Databases[i].clone()
73	}
74	return dbs
75}
76
77// CreateDatabase creates a new database.
78// It returns an error if name is blank or if a database with the same name already exists.
79func (data *Data) CreateDatabase(name string) error {
80	if name == "" {
81		return ErrDatabaseNameRequired
82	} else if data.Database(name) != nil {
83		return nil
84	}
85
86	// Append new node.
87	data.Databases = append(data.Databases, DatabaseInfo{Name: name})
88
89	return nil
90}
91
92// DropDatabase removes a database by name. It does not return an error
93// if the database cannot be found.
94func (data *Data) DropDatabase(name string) error {
95	for i := range data.Databases {
96		if data.Databases[i].Name == name {
97			data.Databases = append(data.Databases[:i], data.Databases[i+1:]...)
98
99			// Remove all user privileges associated with this database.
100			for i := range data.Users {
101				delete(data.Users[i].Privileges, name)
102			}
103			break
104		}
105	}
106	return nil
107}
108
109// RetentionPolicy returns a retention policy for a database by name.
110func (data *Data) RetentionPolicy(database, name string) (*RetentionPolicyInfo, error) {
111	di := data.Database(database)
112	if di == nil {
113		return nil, influxdb.ErrDatabaseNotFound(database)
114	}
115
116	for i := range di.RetentionPolicies {
117		if di.RetentionPolicies[i].Name == name {
118			return &di.RetentionPolicies[i], nil
119		}
120	}
121	return nil, nil
122}
123
124// CreateRetentionPolicy creates a new retention policy on a database.
125// It returns an error if name is blank or if the database does not exist.
126func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo, makeDefault bool) error {
127	// Validate retention policy.
128	if rpi == nil {
129		return ErrRetentionPolicyRequired
130	} else if rpi.Name == "" {
131		return ErrRetentionPolicyNameRequired
132	} else if rpi.ReplicaN < 1 {
133		return ErrReplicationFactorTooLow
134	}
135
136	// Normalise ShardDuration before comparing to any existing
137	// retention policies. The client is supposed to do this, but
138	// do it again to verify input.
139	rpi.ShardGroupDuration = normalisedShardDuration(rpi.ShardGroupDuration, rpi.Duration)
140
141	if rpi.Duration > 0 && rpi.Duration < rpi.ShardGroupDuration {
142		return ErrIncompatibleDurations
143	}
144
145	// Find database.
146	di := data.Database(database)
147	if di == nil {
148		return influxdb.ErrDatabaseNotFound(database)
149	} else if rp := di.RetentionPolicy(rpi.Name); rp != nil {
150		// RP with that name already exists. Make sure they're the same.
151		if rp.ReplicaN != rpi.ReplicaN || rp.Duration != rpi.Duration || rp.ShardGroupDuration != rpi.ShardGroupDuration {
152			return ErrRetentionPolicyExists
153		}
154		// if they want to make it default, and it's not the default, it's not an identical command so it's an error
155		if makeDefault && di.DefaultRetentionPolicy != rpi.Name {
156			return ErrRetentionPolicyConflict
157		}
158		return nil
159	}
160
161	// Append copy of new policy.
162	di.RetentionPolicies = append(di.RetentionPolicies, *rpi)
163
164	// Set the default if needed
165	if makeDefault {
166		di.DefaultRetentionPolicy = rpi.Name
167	}
168
169	return nil
170}
171
172// DropRetentionPolicy removes a retention policy from a database by name.
173func (data *Data) DropRetentionPolicy(database, name string) error {
174	// Find database.
175	di := data.Database(database)
176	if di == nil {
177		// no database? no problem
178		return nil
179	}
180
181	// Remove from list.
182	for i := range di.RetentionPolicies {
183		if di.RetentionPolicies[i].Name == name {
184			di.RetentionPolicies = append(di.RetentionPolicies[:i], di.RetentionPolicies[i+1:]...)
185			break
186		}
187	}
188
189	return nil
190}
191
192// RetentionPolicyUpdate represents retention policy fields to be updated.
193type RetentionPolicyUpdate struct {
194	Name               *string
195	Duration           *time.Duration
196	ReplicaN           *int
197	ShardGroupDuration *time.Duration
198}
199
200// SetName sets the RetentionPolicyUpdate.Name.
201func (rpu *RetentionPolicyUpdate) SetName(v string) { rpu.Name = &v }
202
203// SetDuration sets the RetentionPolicyUpdate.Duration.
204func (rpu *RetentionPolicyUpdate) SetDuration(v time.Duration) { rpu.Duration = &v }
205
206// SetReplicaN sets the RetentionPolicyUpdate.ReplicaN.
207func (rpu *RetentionPolicyUpdate) SetReplicaN(v int) { rpu.ReplicaN = &v }
208
209// SetShardGroupDuration sets the RetentionPolicyUpdate.ShardGroupDuration.
210func (rpu *RetentionPolicyUpdate) SetShardGroupDuration(v time.Duration) { rpu.ShardGroupDuration = &v }
211
212// UpdateRetentionPolicy updates an existing retention policy.
213func (data *Data) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate, makeDefault bool) error {
214	// Find database.
215	di := data.Database(database)
216	if di == nil {
217		return influxdb.ErrDatabaseNotFound(database)
218	}
219
220	// Find policy.
221	rpi := di.RetentionPolicy(name)
222	if rpi == nil {
223		return influxdb.ErrRetentionPolicyNotFound(name)
224	}
225
226	// Ensure new policy doesn't match an existing policy.
227	if rpu.Name != nil && *rpu.Name != name && di.RetentionPolicy(*rpu.Name) != nil {
228		return ErrRetentionPolicyNameExists
229	}
230
231	// Enforce duration of at least MinRetentionPolicyDuration
232	if rpu.Duration != nil && *rpu.Duration < MinRetentionPolicyDuration && *rpu.Duration != 0 {
233		return ErrRetentionPolicyDurationTooLow
234	}
235
236	// Enforce duration is at least the shard duration
237	if (rpu.Duration != nil && *rpu.Duration > 0 &&
238		((rpu.ShardGroupDuration != nil && *rpu.Duration < *rpu.ShardGroupDuration) ||
239			(rpu.ShardGroupDuration == nil && *rpu.Duration < rpi.ShardGroupDuration))) ||
240		(rpu.Duration == nil && rpi.Duration > 0 &&
241			rpu.ShardGroupDuration != nil && rpi.Duration < *rpu.ShardGroupDuration) {
242		return ErrIncompatibleDurations
243	}
244
245	// Update fields.
246	if rpu.Name != nil {
247		rpi.Name = *rpu.Name
248	}
249	if rpu.Duration != nil {
250		rpi.Duration = *rpu.Duration
251	}
252	if rpu.ReplicaN != nil {
253		rpi.ReplicaN = *rpu.ReplicaN
254	}
255	if rpu.ShardGroupDuration != nil {
256		rpi.ShardGroupDuration = normalisedShardDuration(*rpu.ShardGroupDuration, rpi.Duration)
257	}
258
259	if di.DefaultRetentionPolicy != rpi.Name && makeDefault {
260		di.DefaultRetentionPolicy = rpi.Name
261	}
262
263	return nil
264}
265
266// DropShard removes a shard by ID.
267//
268// DropShard won't return an error if the shard can't be found, which
269// allows the command to be re-run in the case that the meta store
270// succeeds but a data node fails.
271func (data *Data) DropShard(id uint64) {
272	found := -1
273	for dbidx, dbi := range data.Databases {
274		for rpidx, rpi := range dbi.RetentionPolicies {
275			for sgidx, sg := range rpi.ShardGroups {
276				for sidx, s := range sg.Shards {
277					if s.ID == id {
278						found = sidx
279						break
280					}
281				}
282
283				if found > -1 {
284					shards := sg.Shards
285					data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].Shards = append(shards[:found], shards[found+1:]...)
286
287					if len(shards) == 1 {
288						// We just deleted the last shard in the shard group.
289						data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].DeletedAt = time.Now()
290					}
291					return
292				}
293			}
294		}
295	}
296}
297
298// ShardGroups returns a list of all shard groups on a database and retention policy.
299func (data *Data) ShardGroups(database, policy string) ([]ShardGroupInfo, error) {
300	// Find retention policy.
301	rpi, err := data.RetentionPolicy(database, policy)
302	if err != nil {
303		return nil, err
304	} else if rpi == nil {
305		return nil, influxdb.ErrRetentionPolicyNotFound(policy)
306	}
307	groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups))
308	for _, g := range rpi.ShardGroups {
309		if g.Deleted() {
310			continue
311		}
312		groups = append(groups, g)
313	}
314	return groups, nil
315}
316
317// ShardGroupsByTimeRange returns a list of all shard groups on a database and policy that may contain data
318// for the specified time range. Shard groups are sorted by start time.
319func (data *Data) ShardGroupsByTimeRange(database, policy string, tmin, tmax time.Time) ([]ShardGroupInfo, error) {
320	// Find retention policy.
321	rpi, err := data.RetentionPolicy(database, policy)
322	if err != nil {
323		return nil, err
324	} else if rpi == nil {
325		return nil, influxdb.ErrRetentionPolicyNotFound(policy)
326	}
327	groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups))
328	for _, g := range rpi.ShardGroups {
329		if g.Deleted() || !g.Overlaps(tmin, tmax) {
330			continue
331		}
332		groups = append(groups, g)
333	}
334	return groups, nil
335}
336
337// ShardGroupByTimestamp returns the shard group on a database and policy for a given timestamp.
338func (data *Data) ShardGroupByTimestamp(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
339	// Find retention policy.
340	rpi, err := data.RetentionPolicy(database, policy)
341	if err != nil {
342		return nil, err
343	} else if rpi == nil {
344		return nil, influxdb.ErrRetentionPolicyNotFound(policy)
345	}
346
347	return rpi.ShardGroupByTimestamp(timestamp), nil
348}
349
350// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
351func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error {
352	// Find retention policy.
353	rpi, err := data.RetentionPolicy(database, policy)
354	if err != nil {
355		return err
356	} else if rpi == nil {
357		return influxdb.ErrRetentionPolicyNotFound(policy)
358	}
359
360	// Verify that shard group doesn't already exist for this timestamp.
361	if rpi.ShardGroupByTimestamp(timestamp) != nil {
362		return nil
363	}
364
365	// Create the shard group.
366	data.MaxShardGroupID++
367	sgi := ShardGroupInfo{}
368	sgi.ID = data.MaxShardGroupID
369	sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()
370	sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()
371	if sgi.EndTime.After(time.Unix(0, models.MaxNanoTime)) {
372		// Shard group range is [start, end) so add one to the max time.
373		sgi.EndTime = time.Unix(0, models.MaxNanoTime+1)
374	}
375
376	data.MaxShardID++
377	sgi.Shards = []ShardInfo{
378		{ID: data.MaxShardID},
379	}
380
381	// Retention policy has a new shard group, so update the policy. Shard
382	// Groups must be stored in sorted order, as other parts of the system
383	// assume this to be the case.
384	rpi.ShardGroups = append(rpi.ShardGroups, sgi)
385	sort.Sort(ShardGroupInfos(rpi.ShardGroups))
386
387	return nil
388}
389
390// DeleteShardGroup removes a shard group from a database and retention policy by id.
391func (data *Data) DeleteShardGroup(database, policy string, id uint64) error {
392	// Find retention policy.
393	rpi, err := data.RetentionPolicy(database, policy)
394	if err != nil {
395		return err
396	} else if rpi == nil {
397		return influxdb.ErrRetentionPolicyNotFound(policy)
398	}
399
400	// Find shard group by ID and set its deletion timestamp.
401	for i := range rpi.ShardGroups {
402		if rpi.ShardGroups[i].ID == id {
403			rpi.ShardGroups[i].DeletedAt = time.Now().UTC()
404			return nil
405		}
406	}
407
408	return ErrShardGroupNotFound
409}
410
411// CreateContinuousQuery adds a named continuous query to a database.
412func (data *Data) CreateContinuousQuery(database, name, query string) error {
413	di := data.Database(database)
414	if di == nil {
415		return influxdb.ErrDatabaseNotFound(database)
416	}
417
418	// Ensure the name doesn't already exist.
419	for _, cq := range di.ContinuousQueries {
420		if cq.Name == name {
421			// If the query string is the same, we'll silently return,
422			// otherwise we'll assume the user might be trying to
423			// overwrite an existing CQ with a different query.
424			if strings.ToLower(cq.Query) == strings.ToLower(query) {
425				return nil
426			}
427			return ErrContinuousQueryExists
428		}
429	}
430
431	// Append new query.
432	di.ContinuousQueries = append(di.ContinuousQueries, ContinuousQueryInfo{
433		Name:  name,
434		Query: query,
435	})
436
437	return nil
438}
439
440// DropContinuousQuery removes a continuous query.
441func (data *Data) DropContinuousQuery(database, name string) error {
442	di := data.Database(database)
443	if di == nil {
444		return nil
445	}
446
447	for i := range di.ContinuousQueries {
448		if di.ContinuousQueries[i].Name == name {
449			di.ContinuousQueries = append(di.ContinuousQueries[:i], di.ContinuousQueries[i+1:]...)
450			return nil
451		}
452	}
453	return nil
454}
455
456// validateURL returns an error if the URL does not have a port or uses a scheme other than UDP or HTTP.
457func validateURL(input string) error {
458	u, err := url.Parse(input)
459	if err != nil {
460		return ErrInvalidSubscriptionURL(input)
461	}
462
463	if u.Scheme != "udp" && u.Scheme != "http" && u.Scheme != "https" {
464		return ErrInvalidSubscriptionURL(input)
465	}
466
467	_, port, err := net.SplitHostPort(u.Host)
468	if err != nil || port == "" {
469		return ErrInvalidSubscriptionURL(input)
470	}
471
472	return nil
473}
474
475// CreateSubscription adds a named subscription to a database and retention policy.
476func (data *Data) CreateSubscription(database, rp, name, mode string, destinations []string) error {
477	for _, d := range destinations {
478		if err := validateURL(d); err != nil {
479			return err
480		}
481	}
482
483	rpi, err := data.RetentionPolicy(database, rp)
484	if err != nil {
485		return err
486	} else if rpi == nil {
487		return influxdb.ErrRetentionPolicyNotFound(rp)
488	}
489
490	// Ensure the name doesn't already exist.
491	for i := range rpi.Subscriptions {
492		if rpi.Subscriptions[i].Name == name {
493			return ErrSubscriptionExists
494		}
495	}
496
497	// Append new query.
498	rpi.Subscriptions = append(rpi.Subscriptions, SubscriptionInfo{
499		Name:         name,
500		Mode:         mode,
501		Destinations: destinations,
502	})
503
504	return nil
505}
506
507// DropSubscription removes a subscription.
508func (data *Data) DropSubscription(database, rp, name string) error {
509	rpi, err := data.RetentionPolicy(database, rp)
510	if err != nil {
511		return err
512	} else if rpi == nil {
513		return influxdb.ErrRetentionPolicyNotFound(rp)
514	}
515
516	for i := range rpi.Subscriptions {
517		if rpi.Subscriptions[i].Name == name {
518			rpi.Subscriptions = append(rpi.Subscriptions[:i], rpi.Subscriptions[i+1:]...)
519			return nil
520		}
521	}
522	return ErrSubscriptionNotFound
523}
524
525func (data *Data) user(username string) *UserInfo {
526	for i := range data.Users {
527		if data.Users[i].Name == username {
528			return &data.Users[i]
529		}
530	}
531	return nil
532}
533
534// User returns a user by username.
535func (data *Data) User(username string) User {
536	u := data.user(username)
537	if u == nil {
538		// prevent non-nil interface with nil pointer
539		return nil
540	}
541	return u
542}
543
544// CreateUser creates a new user.
545func (data *Data) CreateUser(name, hash string, admin bool) error {
546	// Ensure the user doesn't already exist.
547	if name == "" {
548		return ErrUsernameRequired
549	} else if data.User(name) != nil {
550		return ErrUserExists
551	}
552
553	// Append new user.
554	data.Users = append(data.Users, UserInfo{
555		Name:  name,
556		Hash:  hash,
557		Admin: admin,
558	})
559
560	// We know there is now at least one admin user.
561	if admin {
562		data.adminUserExists = true
563	}
564
565	return nil
566}
567
568// DropUser removes an existing user by name.
569func (data *Data) DropUser(name string) error {
570	for i := range data.Users {
571		if data.Users[i].Name == name {
572			wasAdmin := data.Users[i].Admin
573			data.Users = append(data.Users[:i], data.Users[i+1:]...)
574
575			// Maybe we dropped the only admin user?
576			if wasAdmin {
577				data.adminUserExists = data.hasAdminUser()
578			}
579			return nil
580		}
581	}
582
583	return ErrUserNotFound
584}
585
586// UpdateUser updates the password hash of an existing user.
587func (data *Data) UpdateUser(name, hash string) error {
588	for i := range data.Users {
589		if data.Users[i].Name == name {
590			data.Users[i].Hash = hash
591			return nil
592		}
593	}
594	return ErrUserNotFound
595}
596
597// CloneUsers returns a copy of the user infos.
598func (data *Data) CloneUsers() []UserInfo {
599	if len(data.Users) == 0 {
600		return []UserInfo{}
601	}
602	users := make([]UserInfo, len(data.Users))
603	for i := range data.Users {
604		users[i] = data.Users[i].clone()
605	}
606
607	return users
608}
609
610// SetPrivilege sets a privilege for a user on a database.
611func (data *Data) SetPrivilege(name, database string, p influxql.Privilege) error {
612	ui := data.user(name)
613	if ui == nil {
614		return ErrUserNotFound
615	}
616
617	if data.Database(database) == nil {
618		return influxdb.ErrDatabaseNotFound(database)
619	}
620
621	if ui.Privileges == nil {
622		ui.Privileges = make(map[string]influxql.Privilege)
623	}
624	ui.Privileges[database] = p
625
626	return nil
627}
628
629// SetAdminPrivilege sets the admin privilege for a user.
630func (data *Data) SetAdminPrivilege(name string, admin bool) error {
631	ui := data.user(name)
632	if ui == nil {
633		return ErrUserNotFound
634	}
635
636	ui.Admin = admin
637
638	// We could have promoted or revoked the only admin. Check if an admin
639	// user exists.
640	data.adminUserExists = data.hasAdminUser()
641	return nil
642}
643
644// AdminUserExists returns true if an admin user exists.
645func (data Data) AdminUserExists() bool {
646	return data.adminUserExists
647}
648
649// UserPrivileges gets the privileges for a user.
650func (data *Data) UserPrivileges(name string) (map[string]influxql.Privilege, error) {
651	ui := data.user(name)
652	if ui == nil {
653		return nil, ErrUserNotFound
654	}
655
656	return ui.Privileges, nil
657}
658
659// UserPrivilege gets the privilege for a user on a database.
660func (data *Data) UserPrivilege(name, database string) (*influxql.Privilege, error) {
661	ui := data.user(name)
662	if ui == nil {
663		return nil, ErrUserNotFound
664	}
665
666	for db, p := range ui.Privileges {
667		if db == database {
668			return &p, nil
669		}
670	}
671
672	return influxql.NewPrivilege(influxql.NoPrivileges), nil
673}
674
675// Clone returns a copy of data with a new version.
676func (data *Data) Clone() *Data {
677	other := *data
678
679	other.Databases = data.CloneDatabases()
680	other.Users = data.CloneUsers()
681
682	return &other
683}
684
685// marshal serializes data to a protobuf representation.
686func (data *Data) marshal() *internal.Data {
687	pb := &internal.Data{
688		Term:      proto.Uint64(data.Term),
689		Index:     proto.Uint64(data.Index),
690		ClusterID: proto.Uint64(data.ClusterID),
691
692		MaxShardGroupID: proto.Uint64(data.MaxShardGroupID),
693		MaxShardID:      proto.Uint64(data.MaxShardID),
694
695		// Need this for reverse compatibility
696		MaxNodeID: proto.Uint64(0),
697	}
698
699	pb.Databases = make([]*internal.DatabaseInfo, len(data.Databases))
700	for i := range data.Databases {
701		pb.Databases[i] = data.Databases[i].marshal()
702	}
703
704	pb.Users = make([]*internal.UserInfo, len(data.Users))
705	for i := range data.Users {
706		pb.Users[i] = data.Users[i].marshal()
707	}
708
709	return pb
710}
711
712// unmarshal deserializes from a protobuf representation.
713func (data *Data) unmarshal(pb *internal.Data) {
714	data.Term = pb.GetTerm()
715	data.Index = pb.GetIndex()
716	data.ClusterID = pb.GetClusterID()
717
718	data.MaxShardGroupID = pb.GetMaxShardGroupID()
719	data.MaxShardID = pb.GetMaxShardID()
720
721	data.Databases = make([]DatabaseInfo, len(pb.GetDatabases()))
722	for i, x := range pb.GetDatabases() {
723		data.Databases[i].unmarshal(x)
724	}
725
726	data.Users = make([]UserInfo, len(pb.GetUsers()))
727	for i, x := range pb.GetUsers() {
728		data.Users[i].unmarshal(x)
729	}
730
731	// Exhaustively determine if there is an admin user. The marshalled cache
732	// value may not be correct.
733	data.adminUserExists = data.hasAdminUser()
734}
735
736// MarshalBinary encodes the metadata to a binary format.
737func (data *Data) MarshalBinary() ([]byte, error) {
738	return proto.Marshal(data.marshal())
739}
740
741// UnmarshalBinary decodes the object from a binary format.
742func (data *Data) UnmarshalBinary(buf []byte) error {
743	var pb internal.Data
744	if err := proto.Unmarshal(buf, &pb); err != nil {
745		return err
746	}
747	data.unmarshal(&pb)
748	return nil
749}
750
751// TruncateShardGroups truncates any shard group that could contain timestamps beyond t.
752func (data *Data) TruncateShardGroups(t time.Time) {
753	for i := range data.Databases {
754		dbi := &data.Databases[i]
755
756		for j := range dbi.RetentionPolicies {
757			rpi := &dbi.RetentionPolicies[j]
758
759			for k := range rpi.ShardGroups {
760				sgi := &rpi.ShardGroups[k]
761
762				if !t.Before(sgi.EndTime) || sgi.Deleted() || (sgi.Truncated() && sgi.TruncatedAt.Before(t)) {
763					continue
764				}
765
766				if !t.After(sgi.StartTime) {
767					// future shardgroup
768					sgi.TruncatedAt = sgi.StartTime
769				} else {
770					sgi.TruncatedAt = t
771				}
772			}
773		}
774	}
775}
776
777// hasAdminUser exhaustively checks for the presence of at least one admin
778// user.
779func (data *Data) hasAdminUser() bool {
780	for _, u := range data.Users {
781		if u.Admin {
782			return true
783		}
784	}
785	return false
786}
787
788// ImportData imports selected data into the current metadata.
789// if non-empty, backupDBName, restoreDBName, backupRPName, restoreRPName can be used to select DB metadata from other,
790// and to assign a new name to the imported data.  Returns a map of shard ID's in the old metadata to new shard ID's
791// in the new metadata, along with a list of new databases created, both of which can assist in the import of existing
792// shard data during a database restore.
793func (data *Data) ImportData(other Data, backupDBName, restoreDBName, backupRPName, restoreRPName string) (map[uint64]uint64, []string, error) {
794	shardIDMap := make(map[uint64]uint64)
795	if backupDBName != "" {
796		dbName, err := data.importOneDB(other, backupDBName, restoreDBName, backupRPName, restoreRPName, shardIDMap)
797		if err != nil {
798			return nil, nil, err
799		}
800
801		return shardIDMap, []string{dbName}, nil
802	}
803
804	// if no backupDBName then we'll try to import all the DB's.  If one of them fails, we'll mark the whole
805	// operation a failure and return an error.
806	var newDBs []string
807	for _, dbi := range other.Databases {
808		if dbi.Name == "_internal" {
809			continue
810		}
811		dbName, err := data.importOneDB(other, dbi.Name, "", "", "", shardIDMap)
812		if err != nil {
813			return nil, nil, err
814		}
815		newDBs = append(newDBs, dbName)
816	}
817	return shardIDMap, newDBs, nil
818}
819
820// importOneDB imports a single database/rp from an external metadata object, renaming them if new names are provided.
821func (data *Data) importOneDB(other Data, backupDBName, restoreDBName, backupRPName, restoreRPName string, shardIDMap map[uint64]uint64) (string, error) {
822
823	dbPtr := other.Database(backupDBName)
824	if dbPtr == nil {
825		return "", fmt.Errorf("imported metadata does not have datbase named %s", backupDBName)
826	}
827
828	if restoreDBName == "" {
829		restoreDBName = backupDBName
830	}
831
832	if data.Database(restoreDBName) != nil {
833		return "", errors.New("database already exists")
834	}
835
836	// change the names if we want/need to
837	err := data.CreateDatabase(restoreDBName)
838	if err != nil {
839		return "", err
840	}
841	dbImport := data.Database(restoreDBName)
842
843	if backupRPName != "" {
844		rpPtr := dbPtr.RetentionPolicy(backupRPName)
845
846		if rpPtr != nil {
847			rpImport := rpPtr.clone()
848			if restoreRPName == "" {
849				restoreRPName = backupRPName
850			}
851			rpImport.Name = restoreRPName
852			dbImport.RetentionPolicies = []RetentionPolicyInfo{rpImport}
853			dbImport.DefaultRetentionPolicy = restoreRPName
854		} else {
855			return "", fmt.Errorf("retention Policy not found in meta backup: %s.%s", backupDBName, backupRPName)
856		}
857
858	} else { // import all RP's without renaming
859		dbImport.DefaultRetentionPolicy = dbPtr.DefaultRetentionPolicy
860		if dbPtr.RetentionPolicies != nil {
861			dbImport.RetentionPolicies = make([]RetentionPolicyInfo, len(dbPtr.RetentionPolicies))
862			for i := range dbPtr.RetentionPolicies {
863				dbImport.RetentionPolicies[i] = dbPtr.RetentionPolicies[i].clone()
864			}
865		}
866
867	}
868
869	// renumber the shard groups and shards for the new retention policy(ies)
870	for _, rpImport := range dbImport.RetentionPolicies {
871		for j, sgImport := range rpImport.ShardGroups {
872			data.MaxShardGroupID++
873			rpImport.ShardGroups[j].ID = data.MaxShardGroupID
874			for k, _ := range sgImport.Shards {
875				data.MaxShardID++
876				shardIDMap[sgImport.Shards[k].ID] = data.MaxShardID
877				sgImport.Shards[k].ID = data.MaxShardID
878				// OSS doesn't use Owners but if we are importing this from Enterprise, we'll want to clear it out
879				// to avoid any issues if they ever export this DB again to bring back to Enterprise.
880				sgImport.Shards[k].Owners = []ShardOwner{}
881			}
882		}
883	}
884
885	return restoreDBName, nil
886}
887
888// NodeInfo represents information about a single node in the cluster.
889type NodeInfo struct {
890	ID      uint64
891	Host    string
892	TCPHost string
893}
894
895// NodeInfos is a slice of NodeInfo used for sorting
896type NodeInfos []NodeInfo
897
898// Len implements sort.Interface.
899func (n NodeInfos) Len() int { return len(n) }
900
901// Swap implements sort.Interface.
902func (n NodeInfos) Swap(i, j int) { n[i], n[j] = n[j], n[i] }
903
904// Less implements sort.Interface.
905func (n NodeInfos) Less(i, j int) bool { return n[i].ID < n[j].ID }
906
907// DatabaseInfo represents information about a database in the system.
908type DatabaseInfo struct {
909	Name                   string
910	DefaultRetentionPolicy string
911	RetentionPolicies      []RetentionPolicyInfo
912	ContinuousQueries      []ContinuousQueryInfo
913}
914
915// RetentionPolicy returns a retention policy by name.
916func (di DatabaseInfo) RetentionPolicy(name string) *RetentionPolicyInfo {
917	if name == "" {
918		if di.DefaultRetentionPolicy == "" {
919			return nil
920		}
921		name = di.DefaultRetentionPolicy
922	}
923
924	for i := range di.RetentionPolicies {
925		if di.RetentionPolicies[i].Name == name {
926			return &di.RetentionPolicies[i]
927		}
928	}
929	return nil
930}
931
932// ShardInfos returns a list of all shards' info for the database.
933func (di DatabaseInfo) ShardInfos() []ShardInfo {
934	shards := map[uint64]*ShardInfo{}
935	for i := range di.RetentionPolicies {
936		for j := range di.RetentionPolicies[i].ShardGroups {
937			sg := di.RetentionPolicies[i].ShardGroups[j]
938			// Skip deleted shard groups
939			if sg.Deleted() {
940				continue
941			}
942			for k := range sg.Shards {
943				si := &di.RetentionPolicies[i].ShardGroups[j].Shards[k]
944				shards[si.ID] = si
945			}
946		}
947	}
948
949	infos := make([]ShardInfo, 0, len(shards))
950	for _, info := range shards {
951		infos = append(infos, *info)
952	}
953
954	return infos
955}
956
957// clone returns a deep copy of di.
958func (di DatabaseInfo) clone() DatabaseInfo {
959	other := di
960
961	if di.RetentionPolicies != nil {
962		other.RetentionPolicies = make([]RetentionPolicyInfo, len(di.RetentionPolicies))
963		for i := range di.RetentionPolicies {
964			other.RetentionPolicies[i] = di.RetentionPolicies[i].clone()
965		}
966	}
967
968	// Copy continuous queries.
969	if di.ContinuousQueries != nil {
970		other.ContinuousQueries = make([]ContinuousQueryInfo, len(di.ContinuousQueries))
971		for i := range di.ContinuousQueries {
972			other.ContinuousQueries[i] = di.ContinuousQueries[i].clone()
973		}
974	}
975
976	return other
977}
978
979// marshal serializes to a protobuf representation.
980func (di DatabaseInfo) marshal() *internal.DatabaseInfo {
981	pb := &internal.DatabaseInfo{}
982	pb.Name = proto.String(di.Name)
983	pb.DefaultRetentionPolicy = proto.String(di.DefaultRetentionPolicy)
984
985	pb.RetentionPolicies = make([]*internal.RetentionPolicyInfo, len(di.RetentionPolicies))
986	for i := range di.RetentionPolicies {
987		pb.RetentionPolicies[i] = di.RetentionPolicies[i].marshal()
988	}
989
990	pb.ContinuousQueries = make([]*internal.ContinuousQueryInfo, len(di.ContinuousQueries))
991	for i := range di.ContinuousQueries {
992		pb.ContinuousQueries[i] = di.ContinuousQueries[i].marshal()
993	}
994	return pb
995}
996
997// unmarshal deserializes from a protobuf representation.
998func (di *DatabaseInfo) unmarshal(pb *internal.DatabaseInfo) {
999	di.Name = pb.GetName()
1000	di.DefaultRetentionPolicy = pb.GetDefaultRetentionPolicy()
1001
1002	if len(pb.GetRetentionPolicies()) > 0 {
1003		di.RetentionPolicies = make([]RetentionPolicyInfo, len(pb.GetRetentionPolicies()))
1004		for i, x := range pb.GetRetentionPolicies() {
1005			di.RetentionPolicies[i].unmarshal(x)
1006		}
1007	}
1008
1009	if len(pb.GetContinuousQueries()) > 0 {
1010		di.ContinuousQueries = make([]ContinuousQueryInfo, len(pb.GetContinuousQueries()))
1011		for i, x := range pb.GetContinuousQueries() {
1012			di.ContinuousQueries[i].unmarshal(x)
1013		}
1014	}
1015}
1016
1017// RetentionPolicySpec represents the specification for a new retention policy.
1018type RetentionPolicySpec struct {
1019	Name               string
1020	ReplicaN           *int
1021	Duration           *time.Duration
1022	ShardGroupDuration time.Duration
1023}
1024
1025// NewRetentionPolicyInfo creates a new retention policy info from the specification.
1026func (s *RetentionPolicySpec) NewRetentionPolicyInfo() *RetentionPolicyInfo {
1027	return DefaultRetentionPolicyInfo().Apply(s)
1028}
1029
1030// Matches checks if this retention policy specification matches
1031// an existing retention policy.
1032func (s *RetentionPolicySpec) Matches(rpi *RetentionPolicyInfo) bool {
1033	if rpi == nil {
1034		return false
1035	} else if s.Name != "" && s.Name != rpi.Name {
1036		return false
1037	} else if s.Duration != nil && *s.Duration != rpi.Duration {
1038		return false
1039	} else if s.ReplicaN != nil && *s.ReplicaN != rpi.ReplicaN {
1040		return false
1041	}
1042
1043	// Normalise ShardDuration before comparing to any existing retention policies.
1044	// Normalize with the retention policy info's duration instead of the spec
1045	// since they should be the same and we're performing a comparison.
1046	sgDuration := normalisedShardDuration(s.ShardGroupDuration, rpi.Duration)
1047	return sgDuration == rpi.ShardGroupDuration
1048}
1049
1050// marshal serializes to a protobuf representation.
1051func (s *RetentionPolicySpec) marshal() *internal.RetentionPolicySpec {
1052	pb := &internal.RetentionPolicySpec{}
1053	if s.Name != "" {
1054		pb.Name = proto.String(s.Name)
1055	}
1056	if s.Duration != nil {
1057		pb.Duration = proto.Int64(int64(*s.Duration))
1058	}
1059	if s.ShardGroupDuration > 0 {
1060		pb.ShardGroupDuration = proto.Int64(int64(s.ShardGroupDuration))
1061	}
1062	if s.ReplicaN != nil {
1063		pb.ReplicaN = proto.Uint32(uint32(*s.ReplicaN))
1064	}
1065	return pb
1066}
1067
1068// unmarshal deserializes from a protobuf representation.
1069func (s *RetentionPolicySpec) unmarshal(pb *internal.RetentionPolicySpec) {
1070	if pb.Name != nil {
1071		s.Name = pb.GetName()
1072	}
1073	if pb.Duration != nil {
1074		duration := time.Duration(pb.GetDuration())
1075		s.Duration = &duration
1076	}
1077	if pb.ShardGroupDuration != nil {
1078		s.ShardGroupDuration = time.Duration(pb.GetShardGroupDuration())
1079	}
1080	if pb.ReplicaN != nil {
1081		replicaN := int(pb.GetReplicaN())
1082		s.ReplicaN = &replicaN
1083	}
1084}
1085
1086// MarshalBinary encodes RetentionPolicySpec to a binary format.
1087func (s *RetentionPolicySpec) MarshalBinary() ([]byte, error) {
1088	return proto.Marshal(s.marshal())
1089}
1090
1091// UnmarshalBinary decodes RetentionPolicySpec from a binary format.
1092func (s *RetentionPolicySpec) UnmarshalBinary(data []byte) error {
1093	var pb internal.RetentionPolicySpec
1094	if err := proto.Unmarshal(data, &pb); err != nil {
1095		return err
1096	}
1097	s.unmarshal(&pb)
1098	return nil
1099}
1100
1101// RetentionPolicyInfo represents metadata about a retention policy.
1102type RetentionPolicyInfo struct {
1103	Name               string
1104	ReplicaN           int
1105	Duration           time.Duration
1106	ShardGroupDuration time.Duration
1107	ShardGroups        []ShardGroupInfo
1108	Subscriptions      []SubscriptionInfo
1109}
1110
1111// NewRetentionPolicyInfo returns a new instance of RetentionPolicyInfo
1112// with default replication and duration.
1113func NewRetentionPolicyInfo(name string) *RetentionPolicyInfo {
1114	return &RetentionPolicyInfo{
1115		Name:     name,
1116		ReplicaN: DefaultRetentionPolicyReplicaN,
1117		Duration: DefaultRetentionPolicyDuration,
1118	}
1119}
1120
1121// DefaultRetentionPolicyInfo returns a new instance of RetentionPolicyInfo
1122// with default name, replication, and duration.
1123func DefaultRetentionPolicyInfo() *RetentionPolicyInfo {
1124	return NewRetentionPolicyInfo(DefaultRetentionPolicyName)
1125}
1126
1127// Apply applies a specification to the retention policy info.
1128func (rpi *RetentionPolicyInfo) Apply(spec *RetentionPolicySpec) *RetentionPolicyInfo {
1129	rp := &RetentionPolicyInfo{
1130		Name:               rpi.Name,
1131		ReplicaN:           rpi.ReplicaN,
1132		Duration:           rpi.Duration,
1133		ShardGroupDuration: rpi.ShardGroupDuration,
1134	}
1135	if spec.Name != "" {
1136		rp.Name = spec.Name
1137	}
1138	if spec.ReplicaN != nil {
1139		rp.ReplicaN = *spec.ReplicaN
1140	}
1141	if spec.Duration != nil {
1142		rp.Duration = *spec.Duration
1143	}
1144	rp.ShardGroupDuration = normalisedShardDuration(spec.ShardGroupDuration, rp.Duration)
1145	return rp
1146}
1147
1148// ShardGroupByTimestamp returns the shard group in the policy that contains the timestamp,
1149// or nil if no shard group matches.
1150func (rpi *RetentionPolicyInfo) ShardGroupByTimestamp(timestamp time.Time) *ShardGroupInfo {
1151	for i := range rpi.ShardGroups {
1152		sgi := &rpi.ShardGroups[i]
1153		if sgi.Contains(timestamp) && !sgi.Deleted() && (!sgi.Truncated() || timestamp.Before(sgi.TruncatedAt)) {
1154			return &rpi.ShardGroups[i]
1155		}
1156	}
1157
1158	return nil
1159}
1160
1161// ExpiredShardGroups returns the Shard Groups which are considered expired, for the given time.
1162func (rpi *RetentionPolicyInfo) ExpiredShardGroups(t time.Time) []*ShardGroupInfo {
1163	var groups = make([]*ShardGroupInfo, 0)
1164	for i := range rpi.ShardGroups {
1165		if rpi.ShardGroups[i].Deleted() {
1166			continue
1167		}
1168		if rpi.Duration != 0 && rpi.ShardGroups[i].EndTime.Add(rpi.Duration).Before(t) {
1169			groups = append(groups, &rpi.ShardGroups[i])
1170		}
1171	}
1172	return groups
1173}
1174
1175// DeletedShardGroups returns the Shard Groups which are marked as deleted.
1176func (rpi *RetentionPolicyInfo) DeletedShardGroups() []*ShardGroupInfo {
1177	var groups = make([]*ShardGroupInfo, 0)
1178	for i := range rpi.ShardGroups {
1179		if rpi.ShardGroups[i].Deleted() {
1180			groups = append(groups, &rpi.ShardGroups[i])
1181		}
1182	}
1183	return groups
1184}
1185
1186// marshal serializes to a protobuf representation.
1187func (rpi *RetentionPolicyInfo) marshal() *internal.RetentionPolicyInfo {
1188	pb := &internal.RetentionPolicyInfo{
1189		Name:               proto.String(rpi.Name),
1190		ReplicaN:           proto.Uint32(uint32(rpi.ReplicaN)),
1191		Duration:           proto.Int64(int64(rpi.Duration)),
1192		ShardGroupDuration: proto.Int64(int64(rpi.ShardGroupDuration)),
1193	}
1194
1195	pb.ShardGroups = make([]*internal.ShardGroupInfo, len(rpi.ShardGroups))
1196	for i, sgi := range rpi.ShardGroups {
1197		pb.ShardGroups[i] = sgi.marshal()
1198	}
1199
1200	pb.Subscriptions = make([]*internal.SubscriptionInfo, len(rpi.Subscriptions))
1201	for i, sub := range rpi.Subscriptions {
1202		pb.Subscriptions[i] = sub.marshal()
1203	}
1204
1205	return pb
1206}
1207
1208// unmarshal deserializes from a protobuf representation.
1209func (rpi *RetentionPolicyInfo) unmarshal(pb *internal.RetentionPolicyInfo) {
1210	rpi.Name = pb.GetName()
1211	rpi.ReplicaN = int(pb.GetReplicaN())
1212	rpi.Duration = time.Duration(pb.GetDuration())
1213	rpi.ShardGroupDuration = time.Duration(pb.GetShardGroupDuration())
1214
1215	if len(pb.GetShardGroups()) > 0 {
1216		rpi.ShardGroups = make([]ShardGroupInfo, len(pb.GetShardGroups()))
1217		for i, x := range pb.GetShardGroups() {
1218			rpi.ShardGroups[i].unmarshal(x)
1219		}
1220	}
1221	if len(pb.GetSubscriptions()) > 0 {
1222		rpi.Subscriptions = make([]SubscriptionInfo, len(pb.GetSubscriptions()))
1223		for i, x := range pb.GetSubscriptions() {
1224			rpi.Subscriptions[i].unmarshal(x)
1225		}
1226	}
1227}
1228
1229// clone returns a deep copy of rpi.
1230func (rpi RetentionPolicyInfo) clone() RetentionPolicyInfo {
1231	other := rpi
1232
1233	if rpi.ShardGroups != nil {
1234		other.ShardGroups = make([]ShardGroupInfo, len(rpi.ShardGroups))
1235		for i := range rpi.ShardGroups {
1236			other.ShardGroups[i] = rpi.ShardGroups[i].clone()
1237		}
1238	}
1239
1240	return other
1241}
1242
1243// MarshalBinary encodes rpi to a binary format.
1244func (rpi *RetentionPolicyInfo) MarshalBinary() ([]byte, error) {
1245	return proto.Marshal(rpi.marshal())
1246}
1247
1248// UnmarshalBinary decodes rpi from a binary format.
1249func (rpi *RetentionPolicyInfo) UnmarshalBinary(data []byte) error {
1250	var pb internal.RetentionPolicyInfo
1251	if err := proto.Unmarshal(data, &pb); err != nil {
1252		return err
1253	}
1254	rpi.unmarshal(&pb)
1255	return nil
1256}
1257
1258// shardGroupDuration returns the default duration for a shard group based on a policy duration.
1259func shardGroupDuration(d time.Duration) time.Duration {
1260	if d >= 180*24*time.Hour || d == 0 { // 6 months or 0
1261		return 7 * 24 * time.Hour
1262	} else if d >= 2*24*time.Hour { // 2 days
1263		return 1 * 24 * time.Hour
1264	}
1265	return 1 * time.Hour
1266}
1267
1268// normalisedShardDuration returns normalised shard duration based on a policy duration.
1269func normalisedShardDuration(sgd, d time.Duration) time.Duration {
1270	// If it is zero, it likely wasn't specified, so we default to the shard group duration
1271	if sgd == 0 {
1272		return shardGroupDuration(d)
1273	}
1274	// If it was specified, but it's less than the MinRetentionPolicyDuration, then normalize
1275	// to the MinRetentionPolicyDuration
1276	if sgd < MinRetentionPolicyDuration {
1277		return shardGroupDuration(MinRetentionPolicyDuration)
1278	}
1279	return sgd
1280}
1281
1282// ShardGroupInfo represents metadata about a shard group. The DeletedAt field is important
1283// because it makes it clear that a ShardGroup has been marked as deleted, and allow the system
1284// to be sure that a ShardGroup is not simply missing. If the DeletedAt is set, the system can
1285// safely delete any associated shards.
1286type ShardGroupInfo struct {
1287	ID          uint64
1288	StartTime   time.Time
1289	EndTime     time.Time
1290	DeletedAt   time.Time
1291	Shards      []ShardInfo
1292	TruncatedAt time.Time
1293}
1294
1295// ShardGroupInfos implements sort.Interface on []ShardGroupInfo, based
1296// on the StartTime field.
1297type ShardGroupInfos []ShardGroupInfo
1298
1299// Len implements sort.Interface.
1300func (a ShardGroupInfos) Len() int { return len(a) }
1301
1302// Swap implements sort.Interface.
1303func (a ShardGroupInfos) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
1304
1305// Less implements sort.Interface.
1306func (a ShardGroupInfos) Less(i, j int) bool {
1307	iEnd := a[i].EndTime
1308	if a[i].Truncated() {
1309		iEnd = a[i].TruncatedAt
1310	}
1311
1312	jEnd := a[j].EndTime
1313	if a[j].Truncated() {
1314		jEnd = a[j].TruncatedAt
1315	}
1316
1317	if iEnd.Equal(jEnd) {
1318		return a[i].StartTime.Before(a[j].StartTime)
1319	}
1320
1321	return iEnd.Before(jEnd)
1322}
1323
1324// Contains returns true iif StartTime ≤ t < EndTime.
1325func (sgi *ShardGroupInfo) Contains(t time.Time) bool {
1326	return !t.Before(sgi.StartTime) && t.Before(sgi.EndTime)
1327}
1328
1329// Overlaps returns whether the shard group contains data for the time range between min and max
1330func (sgi *ShardGroupInfo) Overlaps(min, max time.Time) bool {
1331	return !sgi.StartTime.After(max) && sgi.EndTime.After(min)
1332}
1333
1334// Deleted returns whether this ShardGroup has been deleted.
1335func (sgi *ShardGroupInfo) Deleted() bool {
1336	return !sgi.DeletedAt.IsZero()
1337}
1338
1339// Truncated returns true if this ShardGroup has been truncated (no new writes).
1340func (sgi *ShardGroupInfo) Truncated() bool {
1341	return !sgi.TruncatedAt.IsZero()
1342}
1343
1344// clone returns a deep copy of sgi.
1345func (sgi ShardGroupInfo) clone() ShardGroupInfo {
1346	other := sgi
1347
1348	if sgi.Shards != nil {
1349		other.Shards = make([]ShardInfo, len(sgi.Shards))
1350		for i := range sgi.Shards {
1351			other.Shards[i] = sgi.Shards[i].clone()
1352		}
1353	}
1354
1355	return other
1356}
1357
1358// ShardFor returns the ShardInfo for a Point hash.
1359func (sgi *ShardGroupInfo) ShardFor(hash uint64) ShardInfo {
1360	return sgi.Shards[hash%uint64(len(sgi.Shards))]
1361}
1362
1363// marshal serializes to a protobuf representation.
1364func (sgi *ShardGroupInfo) marshal() *internal.ShardGroupInfo {
1365	pb := &internal.ShardGroupInfo{
1366		ID:        proto.Uint64(sgi.ID),
1367		StartTime: proto.Int64(MarshalTime(sgi.StartTime)),
1368		EndTime:   proto.Int64(MarshalTime(sgi.EndTime)),
1369		DeletedAt: proto.Int64(MarshalTime(sgi.DeletedAt)),
1370	}
1371
1372	if !sgi.TruncatedAt.IsZero() {
1373		pb.TruncatedAt = proto.Int64(MarshalTime(sgi.TruncatedAt))
1374	}
1375
1376	pb.Shards = make([]*internal.ShardInfo, len(sgi.Shards))
1377	for i := range sgi.Shards {
1378		pb.Shards[i] = sgi.Shards[i].marshal()
1379	}
1380
1381	return pb
1382}
1383
1384// unmarshal deserializes from a protobuf representation.
1385func (sgi *ShardGroupInfo) unmarshal(pb *internal.ShardGroupInfo) {
1386	sgi.ID = pb.GetID()
1387	if i := pb.GetStartTime(); i == 0 {
1388		sgi.StartTime = time.Unix(0, 0).UTC()
1389	} else {
1390		sgi.StartTime = UnmarshalTime(i)
1391	}
1392	if i := pb.GetEndTime(); i == 0 {
1393		sgi.EndTime = time.Unix(0, 0).UTC()
1394	} else {
1395		sgi.EndTime = UnmarshalTime(i)
1396	}
1397	sgi.DeletedAt = UnmarshalTime(pb.GetDeletedAt())
1398
1399	if pb != nil && pb.TruncatedAt != nil {
1400		sgi.TruncatedAt = UnmarshalTime(pb.GetTruncatedAt())
1401	}
1402
1403	if len(pb.GetShards()) > 0 {
1404		sgi.Shards = make([]ShardInfo, len(pb.GetShards()))
1405		for i, x := range pb.GetShards() {
1406			sgi.Shards[i].unmarshal(x)
1407		}
1408	}
1409}
1410
1411// ShardInfo represents metadata about a shard.
1412type ShardInfo struct {
1413	ID     uint64
1414	Owners []ShardOwner
1415}
1416
1417// OwnedBy determines whether the shard's owner IDs includes nodeID.
1418func (si ShardInfo) OwnedBy(nodeID uint64) bool {
1419	for _, so := range si.Owners {
1420		if so.NodeID == nodeID {
1421			return true
1422		}
1423	}
1424	return false
1425}
1426
1427// clone returns a deep copy of si.
1428func (si ShardInfo) clone() ShardInfo {
1429	other := si
1430
1431	if si.Owners != nil {
1432		other.Owners = make([]ShardOwner, len(si.Owners))
1433		for i := range si.Owners {
1434			other.Owners[i] = si.Owners[i].clone()
1435		}
1436	}
1437
1438	return other
1439}
1440
1441// marshal serializes to a protobuf representation.
1442func (si ShardInfo) marshal() *internal.ShardInfo {
1443	pb := &internal.ShardInfo{
1444		ID: proto.Uint64(si.ID),
1445	}
1446
1447	pb.Owners = make([]*internal.ShardOwner, len(si.Owners))
1448	for i := range si.Owners {
1449		pb.Owners[i] = si.Owners[i].marshal()
1450	}
1451
1452	return pb
1453}
1454
1455// UnmarshalBinary decodes the object from a binary format.
1456func (si *ShardInfo) UnmarshalBinary(buf []byte) error {
1457	var pb internal.ShardInfo
1458	if err := proto.Unmarshal(buf, &pb); err != nil {
1459		return err
1460	}
1461	si.unmarshal(&pb)
1462	return nil
1463}
1464
1465// unmarshal deserializes from a protobuf representation.
1466func (si *ShardInfo) unmarshal(pb *internal.ShardInfo) {
1467	si.ID = pb.GetID()
1468
1469	// If deprecated "OwnerIDs" exists then convert it to "Owners" format.
1470	if len(pb.GetOwnerIDs()) > 0 {
1471		si.Owners = make([]ShardOwner, len(pb.GetOwnerIDs()))
1472		for i, x := range pb.GetOwnerIDs() {
1473			si.Owners[i].unmarshal(&internal.ShardOwner{
1474				NodeID: proto.Uint64(x),
1475			})
1476		}
1477	} else if len(pb.GetOwners()) > 0 {
1478		si.Owners = make([]ShardOwner, len(pb.GetOwners()))
1479		for i, x := range pb.GetOwners() {
1480			si.Owners[i].unmarshal(x)
1481		}
1482	}
1483}
1484
1485// SubscriptionInfo holds the subscription information.
1486type SubscriptionInfo struct {
1487	Name         string
1488	Mode         string
1489	Destinations []string
1490}
1491
1492// marshal serializes to a protobuf representation.
1493func (si SubscriptionInfo) marshal() *internal.SubscriptionInfo {
1494	pb := &internal.SubscriptionInfo{
1495		Name: proto.String(si.Name),
1496		Mode: proto.String(si.Mode),
1497	}
1498
1499	pb.Destinations = make([]string, len(si.Destinations))
1500	for i := range si.Destinations {
1501		pb.Destinations[i] = si.Destinations[i]
1502	}
1503	return pb
1504}
1505
1506// unmarshal deserializes from a protobuf representation.
1507func (si *SubscriptionInfo) unmarshal(pb *internal.SubscriptionInfo) {
1508	si.Name = pb.GetName()
1509	si.Mode = pb.GetMode()
1510
1511	if len(pb.GetDestinations()) > 0 {
1512		si.Destinations = make([]string, len(pb.GetDestinations()))
1513		copy(si.Destinations, pb.GetDestinations())
1514	}
1515}
1516
1517// ShardOwner represents a node that owns a shard.
1518type ShardOwner struct {
1519	NodeID uint64
1520}
1521
1522// clone returns a deep copy of so.
1523func (so ShardOwner) clone() ShardOwner {
1524	return so
1525}
1526
1527// marshal serializes to a protobuf representation.
1528func (so ShardOwner) marshal() *internal.ShardOwner {
1529	return &internal.ShardOwner{
1530		NodeID: proto.Uint64(so.NodeID),
1531	}
1532}
1533
1534// unmarshal deserializes from a protobuf representation.
1535func (so *ShardOwner) unmarshal(pb *internal.ShardOwner) {
1536	so.NodeID = pb.GetNodeID()
1537}
1538
1539// ContinuousQueryInfo represents metadata about a continuous query.
1540type ContinuousQueryInfo struct {
1541	Name  string
1542	Query string
1543}
1544
1545// clone returns a deep copy of cqi.
1546func (cqi ContinuousQueryInfo) clone() ContinuousQueryInfo { return cqi }
1547
1548// marshal serializes to a protobuf representation.
1549func (cqi ContinuousQueryInfo) marshal() *internal.ContinuousQueryInfo {
1550	return &internal.ContinuousQueryInfo{
1551		Name:  proto.String(cqi.Name),
1552		Query: proto.String(cqi.Query),
1553	}
1554}
1555
1556// unmarshal deserializes from a protobuf representation.
1557func (cqi *ContinuousQueryInfo) unmarshal(pb *internal.ContinuousQueryInfo) {
1558	cqi.Name = pb.GetName()
1559	cqi.Query = pb.GetQuery()
1560}
1561
1562var _ query.Authorizer = (*UserInfo)(nil)
1563
1564// UserInfo represents metadata about a user in the system.
1565type UserInfo struct {
1566	// User's name.
1567	Name string
1568
1569	// Hashed password.
1570	Hash string
1571
1572	// Whether the user is an admin, i.e. allowed to do everything.
1573	Admin bool
1574
1575	// Map of database name to granted privilege.
1576	Privileges map[string]influxql.Privilege
1577}
1578
1579type User interface {
1580	query.Authorizer
1581	ID() string
1582	AuthorizeUnrestricted() bool
1583}
1584
1585func (u *UserInfo) ID() string {
1586	return u.Name
1587}
1588
1589// AuthorizeDatabase returns true if the user is authorized for the given privilege on the given database.
1590func (ui *UserInfo) AuthorizeDatabase(privilege influxql.Privilege, database string) bool {
1591	if ui.Admin || privilege == influxql.NoPrivileges {
1592		return true
1593	}
1594	p, ok := ui.Privileges[database]
1595	return ok && (p == privilege || p == influxql.AllPrivileges)
1596}
1597
1598// AuthorizeSeriesRead is used to limit access per-series (enterprise only)
1599func (u *UserInfo) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool {
1600	return true
1601}
1602
1603// AuthorizeSeriesWrite is used to limit access per-series (enterprise only)
1604func (u *UserInfo) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool {
1605	return true
1606}
1607
1608// AuthorizeUnrestricted allows admins to shortcut access checks.
1609func (u *UserInfo) AuthorizeUnrestricted() bool {
1610	return u.Admin
1611}
1612
1613// clone returns a deep copy of si.
1614func (ui UserInfo) clone() UserInfo {
1615	other := ui
1616
1617	if ui.Privileges != nil {
1618		other.Privileges = make(map[string]influxql.Privilege)
1619		for k, v := range ui.Privileges {
1620			other.Privileges[k] = v
1621		}
1622	}
1623
1624	return other
1625}
1626
1627// marshal serializes to a protobuf representation.
1628func (ui UserInfo) marshal() *internal.UserInfo {
1629	pb := &internal.UserInfo{
1630		Name:  proto.String(ui.Name),
1631		Hash:  proto.String(ui.Hash),
1632		Admin: proto.Bool(ui.Admin),
1633	}
1634
1635	for database, privilege := range ui.Privileges {
1636		pb.Privileges = append(pb.Privileges, &internal.UserPrivilege{
1637			Database:  proto.String(database),
1638			Privilege: proto.Int32(int32(privilege)),
1639		})
1640	}
1641
1642	return pb
1643}
1644
1645// unmarshal deserializes from a protobuf representation.
1646func (ui *UserInfo) unmarshal(pb *internal.UserInfo) {
1647	ui.Name = pb.GetName()
1648	ui.Hash = pb.GetHash()
1649	ui.Admin = pb.GetAdmin()
1650
1651	ui.Privileges = make(map[string]influxql.Privilege)
1652	for _, p := range pb.GetPrivileges() {
1653		ui.Privileges[p.GetDatabase()] = influxql.Privilege(p.GetPrivilege())
1654	}
1655}
1656
1657// Lease represents a lease held on a resource.
1658type Lease struct {
1659	Name       string    `json:"name"`
1660	Expiration time.Time `json:"expiration"`
1661	Owner      uint64    `json:"owner"`
1662}
1663
1664// Leases is a concurrency-safe collection of leases keyed by name.
1665type Leases struct {
1666	mu sync.Mutex
1667	m  map[string]*Lease
1668	d  time.Duration
1669}
1670
1671// NewLeases returns a new instance of Leases.
1672func NewLeases(d time.Duration) *Leases {
1673	return &Leases{
1674		m: make(map[string]*Lease),
1675		d: d,
1676	}
1677}
1678
1679// Acquire acquires a lease with the given name for the given nodeID.
1680// If the lease doesn't exist or exists but is expired, a valid lease is returned.
1681// If nodeID already owns the named and unexpired lease, the lease expiration is extended.
1682// If a different node owns the lease, an error is returned.
1683func (leases *Leases) Acquire(name string, nodeID uint64) (*Lease, error) {
1684	leases.mu.Lock()
1685	defer leases.mu.Unlock()
1686
1687	l := leases.m[name]
1688	if l != nil {
1689		if time.Now().After(l.Expiration) || l.Owner == nodeID {
1690			l.Expiration = time.Now().Add(leases.d)
1691			l.Owner = nodeID
1692			return l, nil
1693		}
1694		return l, errors.New("another node has the lease")
1695	}
1696
1697	l = &Lease{
1698		Name:       name,
1699		Expiration: time.Now().Add(leases.d),
1700		Owner:      nodeID,
1701	}
1702
1703	leases.m[name] = l
1704
1705	return l, nil
1706}
1707
1708// MarshalTime converts t to nanoseconds since epoch. A zero time returns 0.
1709func MarshalTime(t time.Time) int64 {
1710	if t.IsZero() {
1711		return 0
1712	}
1713	return t.UnixNano()
1714}
1715
1716// UnmarshalTime converts nanoseconds since epoch to time.
1717// A zero value returns a zero time.
1718func UnmarshalTime(v int64) time.Time {
1719	if v == 0 {
1720		return time.Time{}
1721	}
1722	return time.Unix(0, v).UTC()
1723}
1724
1725// ValidName checks to see if the given name can would be valid for DB/RP name
1726func ValidName(name string) bool {
1727	for _, r := range name {
1728		if !unicode.IsPrint(r) {
1729			return false
1730		}
1731	}
1732
1733	return name != "" &&
1734		name != "." &&
1735		name != ".." &&
1736		!strings.ContainsAny(name, `/\`)
1737}
1738