1package meta 2 3import ( 4 "errors" 5 "net" 6 "net/url" 7 "sort" 8 "strings" 9 "sync" 10 "time" 11 "unicode" 12 13 "fmt" 14 15 "github.com/gogo/protobuf/proto" 16 "github.com/influxdata/influxdb" 17 "github.com/influxdata/influxdb/models" 18 "github.com/influxdata/influxdb/query" 19 internal "github.com/influxdata/influxdb/services/meta/internal" 20 "github.com/influxdata/influxql" 21) 22 23//go:generate protoc --gogo_out=. internal/meta.proto 24 25const ( 26 // DefaultRetentionPolicyReplicaN is the default value of RetentionPolicyInfo.ReplicaN. 27 DefaultRetentionPolicyReplicaN = 1 28 29 // DefaultRetentionPolicyDuration is the default value of RetentionPolicyInfo.Duration. 30 DefaultRetentionPolicyDuration = time.Duration(0) 31 32 // DefaultRetentionPolicyName is the default name for auto generated retention policies. 33 DefaultRetentionPolicyName = "autogen" 34 35 // MinRetentionPolicyDuration represents the minimum duration for a policy. 36 MinRetentionPolicyDuration = time.Hour 37) 38 39// Data represents the top level collection of all metadata. 40type Data struct { 41 Term uint64 // associated raft term 42 Index uint64 // associated raft index 43 ClusterID uint64 44 Databases []DatabaseInfo 45 Users []UserInfo 46 47 // adminUserExists provides a constant time mechanism for determining 48 // if there is at least one admin user. 49 adminUserExists bool 50 51 MaxShardGroupID uint64 52 MaxShardID uint64 53} 54 55// Database returns a DatabaseInfo by the database name. 56func (data *Data) Database(name string) *DatabaseInfo { 57 for i := range data.Databases { 58 if data.Databases[i].Name == name { 59 return &data.Databases[i] 60 } 61 } 62 return nil 63} 64 65// CloneDatabases returns a copy of the DatabaseInfo. 66func (data *Data) CloneDatabases() []DatabaseInfo { 67 if data.Databases == nil { 68 return nil 69 } 70 dbs := make([]DatabaseInfo, len(data.Databases)) 71 for i := range data.Databases { 72 dbs[i] = data.Databases[i].clone() 73 } 74 return dbs 75} 76 77// CreateDatabase creates a new database. 78// It returns an error if name is blank or if a database with the same name already exists. 79func (data *Data) CreateDatabase(name string) error { 80 if name == "" { 81 return ErrDatabaseNameRequired 82 } else if data.Database(name) != nil { 83 return nil 84 } 85 86 // Append new node. 87 data.Databases = append(data.Databases, DatabaseInfo{Name: name}) 88 89 return nil 90} 91 92// DropDatabase removes a database by name. It does not return an error 93// if the database cannot be found. 94func (data *Data) DropDatabase(name string) error { 95 for i := range data.Databases { 96 if data.Databases[i].Name == name { 97 data.Databases = append(data.Databases[:i], data.Databases[i+1:]...) 98 99 // Remove all user privileges associated with this database. 100 for i := range data.Users { 101 delete(data.Users[i].Privileges, name) 102 } 103 break 104 } 105 } 106 return nil 107} 108 109// RetentionPolicy returns a retention policy for a database by name. 110func (data *Data) RetentionPolicy(database, name string) (*RetentionPolicyInfo, error) { 111 di := data.Database(database) 112 if di == nil { 113 return nil, influxdb.ErrDatabaseNotFound(database) 114 } 115 116 for i := range di.RetentionPolicies { 117 if di.RetentionPolicies[i].Name == name { 118 return &di.RetentionPolicies[i], nil 119 } 120 } 121 return nil, nil 122} 123 124// CreateRetentionPolicy creates a new retention policy on a database. 125// It returns an error if name is blank or if the database does not exist. 126func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo, makeDefault bool) error { 127 // Validate retention policy. 128 if rpi == nil { 129 return ErrRetentionPolicyRequired 130 } else if rpi.Name == "" { 131 return ErrRetentionPolicyNameRequired 132 } else if rpi.ReplicaN < 1 { 133 return ErrReplicationFactorTooLow 134 } 135 136 // Normalise ShardDuration before comparing to any existing 137 // retention policies. The client is supposed to do this, but 138 // do it again to verify input. 139 rpi.ShardGroupDuration = normalisedShardDuration(rpi.ShardGroupDuration, rpi.Duration) 140 141 if rpi.Duration > 0 && rpi.Duration < rpi.ShardGroupDuration { 142 return ErrIncompatibleDurations 143 } 144 145 // Find database. 146 di := data.Database(database) 147 if di == nil { 148 return influxdb.ErrDatabaseNotFound(database) 149 } else if rp := di.RetentionPolicy(rpi.Name); rp != nil { 150 // RP with that name already exists. Make sure they're the same. 151 if rp.ReplicaN != rpi.ReplicaN || rp.Duration != rpi.Duration || rp.ShardGroupDuration != rpi.ShardGroupDuration { 152 return ErrRetentionPolicyExists 153 } 154 // if they want to make it default, and it's not the default, it's not an identical command so it's an error 155 if makeDefault && di.DefaultRetentionPolicy != rpi.Name { 156 return ErrRetentionPolicyConflict 157 } 158 return nil 159 } 160 161 // Append copy of new policy. 162 di.RetentionPolicies = append(di.RetentionPolicies, *rpi) 163 164 // Set the default if needed 165 if makeDefault { 166 di.DefaultRetentionPolicy = rpi.Name 167 } 168 169 return nil 170} 171 172// DropRetentionPolicy removes a retention policy from a database by name. 173func (data *Data) DropRetentionPolicy(database, name string) error { 174 // Find database. 175 di := data.Database(database) 176 if di == nil { 177 // no database? no problem 178 return nil 179 } 180 181 // Remove from list. 182 for i := range di.RetentionPolicies { 183 if di.RetentionPolicies[i].Name == name { 184 di.RetentionPolicies = append(di.RetentionPolicies[:i], di.RetentionPolicies[i+1:]...) 185 break 186 } 187 } 188 189 return nil 190} 191 192// RetentionPolicyUpdate represents retention policy fields to be updated. 193type RetentionPolicyUpdate struct { 194 Name *string 195 Duration *time.Duration 196 ReplicaN *int 197 ShardGroupDuration *time.Duration 198} 199 200// SetName sets the RetentionPolicyUpdate.Name. 201func (rpu *RetentionPolicyUpdate) SetName(v string) { rpu.Name = &v } 202 203// SetDuration sets the RetentionPolicyUpdate.Duration. 204func (rpu *RetentionPolicyUpdate) SetDuration(v time.Duration) { rpu.Duration = &v } 205 206// SetReplicaN sets the RetentionPolicyUpdate.ReplicaN. 207func (rpu *RetentionPolicyUpdate) SetReplicaN(v int) { rpu.ReplicaN = &v } 208 209// SetShardGroupDuration sets the RetentionPolicyUpdate.ShardGroupDuration. 210func (rpu *RetentionPolicyUpdate) SetShardGroupDuration(v time.Duration) { rpu.ShardGroupDuration = &v } 211 212// UpdateRetentionPolicy updates an existing retention policy. 213func (data *Data) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate, makeDefault bool) error { 214 // Find database. 215 di := data.Database(database) 216 if di == nil { 217 return influxdb.ErrDatabaseNotFound(database) 218 } 219 220 // Find policy. 221 rpi := di.RetentionPolicy(name) 222 if rpi == nil { 223 return influxdb.ErrRetentionPolicyNotFound(name) 224 } 225 226 // Ensure new policy doesn't match an existing policy. 227 if rpu.Name != nil && *rpu.Name != name && di.RetentionPolicy(*rpu.Name) != nil { 228 return ErrRetentionPolicyNameExists 229 } 230 231 // Enforce duration of at least MinRetentionPolicyDuration 232 if rpu.Duration != nil && *rpu.Duration < MinRetentionPolicyDuration && *rpu.Duration != 0 { 233 return ErrRetentionPolicyDurationTooLow 234 } 235 236 // Enforce duration is at least the shard duration 237 if (rpu.Duration != nil && *rpu.Duration > 0 && 238 ((rpu.ShardGroupDuration != nil && *rpu.Duration < *rpu.ShardGroupDuration) || 239 (rpu.ShardGroupDuration == nil && *rpu.Duration < rpi.ShardGroupDuration))) || 240 (rpu.Duration == nil && rpi.Duration > 0 && 241 rpu.ShardGroupDuration != nil && rpi.Duration < *rpu.ShardGroupDuration) { 242 return ErrIncompatibleDurations 243 } 244 245 // Update fields. 246 if rpu.Name != nil { 247 rpi.Name = *rpu.Name 248 } 249 if rpu.Duration != nil { 250 rpi.Duration = *rpu.Duration 251 } 252 if rpu.ReplicaN != nil { 253 rpi.ReplicaN = *rpu.ReplicaN 254 } 255 if rpu.ShardGroupDuration != nil { 256 rpi.ShardGroupDuration = normalisedShardDuration(*rpu.ShardGroupDuration, rpi.Duration) 257 } 258 259 if di.DefaultRetentionPolicy != rpi.Name && makeDefault { 260 di.DefaultRetentionPolicy = rpi.Name 261 } 262 263 return nil 264} 265 266// DropShard removes a shard by ID. 267// 268// DropShard won't return an error if the shard can't be found, which 269// allows the command to be re-run in the case that the meta store 270// succeeds but a data node fails. 271func (data *Data) DropShard(id uint64) { 272 found := -1 273 for dbidx, dbi := range data.Databases { 274 for rpidx, rpi := range dbi.RetentionPolicies { 275 for sgidx, sg := range rpi.ShardGroups { 276 for sidx, s := range sg.Shards { 277 if s.ID == id { 278 found = sidx 279 break 280 } 281 } 282 283 if found > -1 { 284 shards := sg.Shards 285 data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].Shards = append(shards[:found], shards[found+1:]...) 286 287 if len(shards) == 1 { 288 // We just deleted the last shard in the shard group. 289 data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].DeletedAt = time.Now() 290 } 291 return 292 } 293 } 294 } 295 } 296} 297 298// ShardGroups returns a list of all shard groups on a database and retention policy. 299func (data *Data) ShardGroups(database, policy string) ([]ShardGroupInfo, error) { 300 // Find retention policy. 301 rpi, err := data.RetentionPolicy(database, policy) 302 if err != nil { 303 return nil, err 304 } else if rpi == nil { 305 return nil, influxdb.ErrRetentionPolicyNotFound(policy) 306 } 307 groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups)) 308 for _, g := range rpi.ShardGroups { 309 if g.Deleted() { 310 continue 311 } 312 groups = append(groups, g) 313 } 314 return groups, nil 315} 316 317// ShardGroupsByTimeRange returns a list of all shard groups on a database and policy that may contain data 318// for the specified time range. Shard groups are sorted by start time. 319func (data *Data) ShardGroupsByTimeRange(database, policy string, tmin, tmax time.Time) ([]ShardGroupInfo, error) { 320 // Find retention policy. 321 rpi, err := data.RetentionPolicy(database, policy) 322 if err != nil { 323 return nil, err 324 } else if rpi == nil { 325 return nil, influxdb.ErrRetentionPolicyNotFound(policy) 326 } 327 groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups)) 328 for _, g := range rpi.ShardGroups { 329 if g.Deleted() || !g.Overlaps(tmin, tmax) { 330 continue 331 } 332 groups = append(groups, g) 333 } 334 return groups, nil 335} 336 337// ShardGroupByTimestamp returns the shard group on a database and policy for a given timestamp. 338func (data *Data) ShardGroupByTimestamp(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) { 339 // Find retention policy. 340 rpi, err := data.RetentionPolicy(database, policy) 341 if err != nil { 342 return nil, err 343 } else if rpi == nil { 344 return nil, influxdb.ErrRetentionPolicyNotFound(policy) 345 } 346 347 return rpi.ShardGroupByTimestamp(timestamp), nil 348} 349 350// CreateShardGroup creates a shard group on a database and policy for a given timestamp. 351func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error { 352 // Find retention policy. 353 rpi, err := data.RetentionPolicy(database, policy) 354 if err != nil { 355 return err 356 } else if rpi == nil { 357 return influxdb.ErrRetentionPolicyNotFound(policy) 358 } 359 360 // Verify that shard group doesn't already exist for this timestamp. 361 if rpi.ShardGroupByTimestamp(timestamp) != nil { 362 return nil 363 } 364 365 // Create the shard group. 366 data.MaxShardGroupID++ 367 sgi := ShardGroupInfo{} 368 sgi.ID = data.MaxShardGroupID 369 sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC() 370 sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC() 371 if sgi.EndTime.After(time.Unix(0, models.MaxNanoTime)) { 372 // Shard group range is [start, end) so add one to the max time. 373 sgi.EndTime = time.Unix(0, models.MaxNanoTime+1) 374 } 375 376 data.MaxShardID++ 377 sgi.Shards = []ShardInfo{ 378 {ID: data.MaxShardID}, 379 } 380 381 // Retention policy has a new shard group, so update the policy. Shard 382 // Groups must be stored in sorted order, as other parts of the system 383 // assume this to be the case. 384 rpi.ShardGroups = append(rpi.ShardGroups, sgi) 385 sort.Sort(ShardGroupInfos(rpi.ShardGroups)) 386 387 return nil 388} 389 390// DeleteShardGroup removes a shard group from a database and retention policy by id. 391func (data *Data) DeleteShardGroup(database, policy string, id uint64) error { 392 // Find retention policy. 393 rpi, err := data.RetentionPolicy(database, policy) 394 if err != nil { 395 return err 396 } else if rpi == nil { 397 return influxdb.ErrRetentionPolicyNotFound(policy) 398 } 399 400 // Find shard group by ID and set its deletion timestamp. 401 for i := range rpi.ShardGroups { 402 if rpi.ShardGroups[i].ID == id { 403 rpi.ShardGroups[i].DeletedAt = time.Now().UTC() 404 return nil 405 } 406 } 407 408 return ErrShardGroupNotFound 409} 410 411// CreateContinuousQuery adds a named continuous query to a database. 412func (data *Data) CreateContinuousQuery(database, name, query string) error { 413 di := data.Database(database) 414 if di == nil { 415 return influxdb.ErrDatabaseNotFound(database) 416 } 417 418 // Ensure the name doesn't already exist. 419 for _, cq := range di.ContinuousQueries { 420 if cq.Name == name { 421 // If the query string is the same, we'll silently return, 422 // otherwise we'll assume the user might be trying to 423 // overwrite an existing CQ with a different query. 424 if strings.ToLower(cq.Query) == strings.ToLower(query) { 425 return nil 426 } 427 return ErrContinuousQueryExists 428 } 429 } 430 431 // Append new query. 432 di.ContinuousQueries = append(di.ContinuousQueries, ContinuousQueryInfo{ 433 Name: name, 434 Query: query, 435 }) 436 437 return nil 438} 439 440// DropContinuousQuery removes a continuous query. 441func (data *Data) DropContinuousQuery(database, name string) error { 442 di := data.Database(database) 443 if di == nil { 444 return nil 445 } 446 447 for i := range di.ContinuousQueries { 448 if di.ContinuousQueries[i].Name == name { 449 di.ContinuousQueries = append(di.ContinuousQueries[:i], di.ContinuousQueries[i+1:]...) 450 return nil 451 } 452 } 453 return nil 454} 455 456// validateURL returns an error if the URL does not have a port or uses a scheme other than UDP or HTTP. 457func validateURL(input string) error { 458 u, err := url.Parse(input) 459 if err != nil { 460 return ErrInvalidSubscriptionURL(input) 461 } 462 463 if u.Scheme != "udp" && u.Scheme != "http" && u.Scheme != "https" { 464 return ErrInvalidSubscriptionURL(input) 465 } 466 467 _, port, err := net.SplitHostPort(u.Host) 468 if err != nil || port == "" { 469 return ErrInvalidSubscriptionURL(input) 470 } 471 472 return nil 473} 474 475// CreateSubscription adds a named subscription to a database and retention policy. 476func (data *Data) CreateSubscription(database, rp, name, mode string, destinations []string) error { 477 for _, d := range destinations { 478 if err := validateURL(d); err != nil { 479 return err 480 } 481 } 482 483 rpi, err := data.RetentionPolicy(database, rp) 484 if err != nil { 485 return err 486 } else if rpi == nil { 487 return influxdb.ErrRetentionPolicyNotFound(rp) 488 } 489 490 // Ensure the name doesn't already exist. 491 for i := range rpi.Subscriptions { 492 if rpi.Subscriptions[i].Name == name { 493 return ErrSubscriptionExists 494 } 495 } 496 497 // Append new query. 498 rpi.Subscriptions = append(rpi.Subscriptions, SubscriptionInfo{ 499 Name: name, 500 Mode: mode, 501 Destinations: destinations, 502 }) 503 504 return nil 505} 506 507// DropSubscription removes a subscription. 508func (data *Data) DropSubscription(database, rp, name string) error { 509 rpi, err := data.RetentionPolicy(database, rp) 510 if err != nil { 511 return err 512 } else if rpi == nil { 513 return influxdb.ErrRetentionPolicyNotFound(rp) 514 } 515 516 for i := range rpi.Subscriptions { 517 if rpi.Subscriptions[i].Name == name { 518 rpi.Subscriptions = append(rpi.Subscriptions[:i], rpi.Subscriptions[i+1:]...) 519 return nil 520 } 521 } 522 return ErrSubscriptionNotFound 523} 524 525func (data *Data) user(username string) *UserInfo { 526 for i := range data.Users { 527 if data.Users[i].Name == username { 528 return &data.Users[i] 529 } 530 } 531 return nil 532} 533 534// User returns a user by username. 535func (data *Data) User(username string) User { 536 u := data.user(username) 537 if u == nil { 538 // prevent non-nil interface with nil pointer 539 return nil 540 } 541 return u 542} 543 544// CreateUser creates a new user. 545func (data *Data) CreateUser(name, hash string, admin bool) error { 546 // Ensure the user doesn't already exist. 547 if name == "" { 548 return ErrUsernameRequired 549 } else if data.User(name) != nil { 550 return ErrUserExists 551 } 552 553 // Append new user. 554 data.Users = append(data.Users, UserInfo{ 555 Name: name, 556 Hash: hash, 557 Admin: admin, 558 }) 559 560 // We know there is now at least one admin user. 561 if admin { 562 data.adminUserExists = true 563 } 564 565 return nil 566} 567 568// DropUser removes an existing user by name. 569func (data *Data) DropUser(name string) error { 570 for i := range data.Users { 571 if data.Users[i].Name == name { 572 wasAdmin := data.Users[i].Admin 573 data.Users = append(data.Users[:i], data.Users[i+1:]...) 574 575 // Maybe we dropped the only admin user? 576 if wasAdmin { 577 data.adminUserExists = data.hasAdminUser() 578 } 579 return nil 580 } 581 } 582 583 return ErrUserNotFound 584} 585 586// UpdateUser updates the password hash of an existing user. 587func (data *Data) UpdateUser(name, hash string) error { 588 for i := range data.Users { 589 if data.Users[i].Name == name { 590 data.Users[i].Hash = hash 591 return nil 592 } 593 } 594 return ErrUserNotFound 595} 596 597// CloneUsers returns a copy of the user infos. 598func (data *Data) CloneUsers() []UserInfo { 599 if len(data.Users) == 0 { 600 return []UserInfo{} 601 } 602 users := make([]UserInfo, len(data.Users)) 603 for i := range data.Users { 604 users[i] = data.Users[i].clone() 605 } 606 607 return users 608} 609 610// SetPrivilege sets a privilege for a user on a database. 611func (data *Data) SetPrivilege(name, database string, p influxql.Privilege) error { 612 ui := data.user(name) 613 if ui == nil { 614 return ErrUserNotFound 615 } 616 617 if data.Database(database) == nil { 618 return influxdb.ErrDatabaseNotFound(database) 619 } 620 621 if ui.Privileges == nil { 622 ui.Privileges = make(map[string]influxql.Privilege) 623 } 624 ui.Privileges[database] = p 625 626 return nil 627} 628 629// SetAdminPrivilege sets the admin privilege for a user. 630func (data *Data) SetAdminPrivilege(name string, admin bool) error { 631 ui := data.user(name) 632 if ui == nil { 633 return ErrUserNotFound 634 } 635 636 ui.Admin = admin 637 638 // We could have promoted or revoked the only admin. Check if an admin 639 // user exists. 640 data.adminUserExists = data.hasAdminUser() 641 return nil 642} 643 644// AdminUserExists returns true if an admin user exists. 645func (data Data) AdminUserExists() bool { 646 return data.adminUserExists 647} 648 649// UserPrivileges gets the privileges for a user. 650func (data *Data) UserPrivileges(name string) (map[string]influxql.Privilege, error) { 651 ui := data.user(name) 652 if ui == nil { 653 return nil, ErrUserNotFound 654 } 655 656 return ui.Privileges, nil 657} 658 659// UserPrivilege gets the privilege for a user on a database. 660func (data *Data) UserPrivilege(name, database string) (*influxql.Privilege, error) { 661 ui := data.user(name) 662 if ui == nil { 663 return nil, ErrUserNotFound 664 } 665 666 for db, p := range ui.Privileges { 667 if db == database { 668 return &p, nil 669 } 670 } 671 672 return influxql.NewPrivilege(influxql.NoPrivileges), nil 673} 674 675// Clone returns a copy of data with a new version. 676func (data *Data) Clone() *Data { 677 other := *data 678 679 other.Databases = data.CloneDatabases() 680 other.Users = data.CloneUsers() 681 682 return &other 683} 684 685// marshal serializes data to a protobuf representation. 686func (data *Data) marshal() *internal.Data { 687 pb := &internal.Data{ 688 Term: proto.Uint64(data.Term), 689 Index: proto.Uint64(data.Index), 690 ClusterID: proto.Uint64(data.ClusterID), 691 692 MaxShardGroupID: proto.Uint64(data.MaxShardGroupID), 693 MaxShardID: proto.Uint64(data.MaxShardID), 694 695 // Need this for reverse compatibility 696 MaxNodeID: proto.Uint64(0), 697 } 698 699 pb.Databases = make([]*internal.DatabaseInfo, len(data.Databases)) 700 for i := range data.Databases { 701 pb.Databases[i] = data.Databases[i].marshal() 702 } 703 704 pb.Users = make([]*internal.UserInfo, len(data.Users)) 705 for i := range data.Users { 706 pb.Users[i] = data.Users[i].marshal() 707 } 708 709 return pb 710} 711 712// unmarshal deserializes from a protobuf representation. 713func (data *Data) unmarshal(pb *internal.Data) { 714 data.Term = pb.GetTerm() 715 data.Index = pb.GetIndex() 716 data.ClusterID = pb.GetClusterID() 717 718 data.MaxShardGroupID = pb.GetMaxShardGroupID() 719 data.MaxShardID = pb.GetMaxShardID() 720 721 data.Databases = make([]DatabaseInfo, len(pb.GetDatabases())) 722 for i, x := range pb.GetDatabases() { 723 data.Databases[i].unmarshal(x) 724 } 725 726 data.Users = make([]UserInfo, len(pb.GetUsers())) 727 for i, x := range pb.GetUsers() { 728 data.Users[i].unmarshal(x) 729 } 730 731 // Exhaustively determine if there is an admin user. The marshalled cache 732 // value may not be correct. 733 data.adminUserExists = data.hasAdminUser() 734} 735 736// MarshalBinary encodes the metadata to a binary format. 737func (data *Data) MarshalBinary() ([]byte, error) { 738 return proto.Marshal(data.marshal()) 739} 740 741// UnmarshalBinary decodes the object from a binary format. 742func (data *Data) UnmarshalBinary(buf []byte) error { 743 var pb internal.Data 744 if err := proto.Unmarshal(buf, &pb); err != nil { 745 return err 746 } 747 data.unmarshal(&pb) 748 return nil 749} 750 751// TruncateShardGroups truncates any shard group that could contain timestamps beyond t. 752func (data *Data) TruncateShardGroups(t time.Time) { 753 for i := range data.Databases { 754 dbi := &data.Databases[i] 755 756 for j := range dbi.RetentionPolicies { 757 rpi := &dbi.RetentionPolicies[j] 758 759 for k := range rpi.ShardGroups { 760 sgi := &rpi.ShardGroups[k] 761 762 if !t.Before(sgi.EndTime) || sgi.Deleted() || (sgi.Truncated() && sgi.TruncatedAt.Before(t)) { 763 continue 764 } 765 766 if !t.After(sgi.StartTime) { 767 // future shardgroup 768 sgi.TruncatedAt = sgi.StartTime 769 } else { 770 sgi.TruncatedAt = t 771 } 772 } 773 } 774 } 775} 776 777// hasAdminUser exhaustively checks for the presence of at least one admin 778// user. 779func (data *Data) hasAdminUser() bool { 780 for _, u := range data.Users { 781 if u.Admin { 782 return true 783 } 784 } 785 return false 786} 787 788// ImportData imports selected data into the current metadata. 789// if non-empty, backupDBName, restoreDBName, backupRPName, restoreRPName can be used to select DB metadata from other, 790// and to assign a new name to the imported data. Returns a map of shard ID's in the old metadata to new shard ID's 791// in the new metadata, along with a list of new databases created, both of which can assist in the import of existing 792// shard data during a database restore. 793func (data *Data) ImportData(other Data, backupDBName, restoreDBName, backupRPName, restoreRPName string) (map[uint64]uint64, []string, error) { 794 shardIDMap := make(map[uint64]uint64) 795 if backupDBName != "" { 796 dbName, err := data.importOneDB(other, backupDBName, restoreDBName, backupRPName, restoreRPName, shardIDMap) 797 if err != nil { 798 return nil, nil, err 799 } 800 801 return shardIDMap, []string{dbName}, nil 802 } 803 804 // if no backupDBName then we'll try to import all the DB's. If one of them fails, we'll mark the whole 805 // operation a failure and return an error. 806 var newDBs []string 807 for _, dbi := range other.Databases { 808 if dbi.Name == "_internal" { 809 continue 810 } 811 dbName, err := data.importOneDB(other, dbi.Name, "", "", "", shardIDMap) 812 if err != nil { 813 return nil, nil, err 814 } 815 newDBs = append(newDBs, dbName) 816 } 817 return shardIDMap, newDBs, nil 818} 819 820// importOneDB imports a single database/rp from an external metadata object, renaming them if new names are provided. 821func (data *Data) importOneDB(other Data, backupDBName, restoreDBName, backupRPName, restoreRPName string, shardIDMap map[uint64]uint64) (string, error) { 822 823 dbPtr := other.Database(backupDBName) 824 if dbPtr == nil { 825 return "", fmt.Errorf("imported metadata does not have datbase named %s", backupDBName) 826 } 827 828 if restoreDBName == "" { 829 restoreDBName = backupDBName 830 } 831 832 if data.Database(restoreDBName) != nil { 833 return "", errors.New("database already exists") 834 } 835 836 // change the names if we want/need to 837 err := data.CreateDatabase(restoreDBName) 838 if err != nil { 839 return "", err 840 } 841 dbImport := data.Database(restoreDBName) 842 843 if backupRPName != "" { 844 rpPtr := dbPtr.RetentionPolicy(backupRPName) 845 846 if rpPtr != nil { 847 rpImport := rpPtr.clone() 848 if restoreRPName == "" { 849 restoreRPName = backupRPName 850 } 851 rpImport.Name = restoreRPName 852 dbImport.RetentionPolicies = []RetentionPolicyInfo{rpImport} 853 dbImport.DefaultRetentionPolicy = restoreRPName 854 } else { 855 return "", fmt.Errorf("retention Policy not found in meta backup: %s.%s", backupDBName, backupRPName) 856 } 857 858 } else { // import all RP's without renaming 859 dbImport.DefaultRetentionPolicy = dbPtr.DefaultRetentionPolicy 860 if dbPtr.RetentionPolicies != nil { 861 dbImport.RetentionPolicies = make([]RetentionPolicyInfo, len(dbPtr.RetentionPolicies)) 862 for i := range dbPtr.RetentionPolicies { 863 dbImport.RetentionPolicies[i] = dbPtr.RetentionPolicies[i].clone() 864 } 865 } 866 867 } 868 869 // renumber the shard groups and shards for the new retention policy(ies) 870 for _, rpImport := range dbImport.RetentionPolicies { 871 for j, sgImport := range rpImport.ShardGroups { 872 data.MaxShardGroupID++ 873 rpImport.ShardGroups[j].ID = data.MaxShardGroupID 874 for k, _ := range sgImport.Shards { 875 data.MaxShardID++ 876 shardIDMap[sgImport.Shards[k].ID] = data.MaxShardID 877 sgImport.Shards[k].ID = data.MaxShardID 878 // OSS doesn't use Owners but if we are importing this from Enterprise, we'll want to clear it out 879 // to avoid any issues if they ever export this DB again to bring back to Enterprise. 880 sgImport.Shards[k].Owners = []ShardOwner{} 881 } 882 } 883 } 884 885 return restoreDBName, nil 886} 887 888// NodeInfo represents information about a single node in the cluster. 889type NodeInfo struct { 890 ID uint64 891 Host string 892 TCPHost string 893} 894 895// NodeInfos is a slice of NodeInfo used for sorting 896type NodeInfos []NodeInfo 897 898// Len implements sort.Interface. 899func (n NodeInfos) Len() int { return len(n) } 900 901// Swap implements sort.Interface. 902func (n NodeInfos) Swap(i, j int) { n[i], n[j] = n[j], n[i] } 903 904// Less implements sort.Interface. 905func (n NodeInfos) Less(i, j int) bool { return n[i].ID < n[j].ID } 906 907// DatabaseInfo represents information about a database in the system. 908type DatabaseInfo struct { 909 Name string 910 DefaultRetentionPolicy string 911 RetentionPolicies []RetentionPolicyInfo 912 ContinuousQueries []ContinuousQueryInfo 913} 914 915// RetentionPolicy returns a retention policy by name. 916func (di DatabaseInfo) RetentionPolicy(name string) *RetentionPolicyInfo { 917 if name == "" { 918 if di.DefaultRetentionPolicy == "" { 919 return nil 920 } 921 name = di.DefaultRetentionPolicy 922 } 923 924 for i := range di.RetentionPolicies { 925 if di.RetentionPolicies[i].Name == name { 926 return &di.RetentionPolicies[i] 927 } 928 } 929 return nil 930} 931 932// ShardInfos returns a list of all shards' info for the database. 933func (di DatabaseInfo) ShardInfos() []ShardInfo { 934 shards := map[uint64]*ShardInfo{} 935 for i := range di.RetentionPolicies { 936 for j := range di.RetentionPolicies[i].ShardGroups { 937 sg := di.RetentionPolicies[i].ShardGroups[j] 938 // Skip deleted shard groups 939 if sg.Deleted() { 940 continue 941 } 942 for k := range sg.Shards { 943 si := &di.RetentionPolicies[i].ShardGroups[j].Shards[k] 944 shards[si.ID] = si 945 } 946 } 947 } 948 949 infos := make([]ShardInfo, 0, len(shards)) 950 for _, info := range shards { 951 infos = append(infos, *info) 952 } 953 954 return infos 955} 956 957// clone returns a deep copy of di. 958func (di DatabaseInfo) clone() DatabaseInfo { 959 other := di 960 961 if di.RetentionPolicies != nil { 962 other.RetentionPolicies = make([]RetentionPolicyInfo, len(di.RetentionPolicies)) 963 for i := range di.RetentionPolicies { 964 other.RetentionPolicies[i] = di.RetentionPolicies[i].clone() 965 } 966 } 967 968 // Copy continuous queries. 969 if di.ContinuousQueries != nil { 970 other.ContinuousQueries = make([]ContinuousQueryInfo, len(di.ContinuousQueries)) 971 for i := range di.ContinuousQueries { 972 other.ContinuousQueries[i] = di.ContinuousQueries[i].clone() 973 } 974 } 975 976 return other 977} 978 979// marshal serializes to a protobuf representation. 980func (di DatabaseInfo) marshal() *internal.DatabaseInfo { 981 pb := &internal.DatabaseInfo{} 982 pb.Name = proto.String(di.Name) 983 pb.DefaultRetentionPolicy = proto.String(di.DefaultRetentionPolicy) 984 985 pb.RetentionPolicies = make([]*internal.RetentionPolicyInfo, len(di.RetentionPolicies)) 986 for i := range di.RetentionPolicies { 987 pb.RetentionPolicies[i] = di.RetentionPolicies[i].marshal() 988 } 989 990 pb.ContinuousQueries = make([]*internal.ContinuousQueryInfo, len(di.ContinuousQueries)) 991 for i := range di.ContinuousQueries { 992 pb.ContinuousQueries[i] = di.ContinuousQueries[i].marshal() 993 } 994 return pb 995} 996 997// unmarshal deserializes from a protobuf representation. 998func (di *DatabaseInfo) unmarshal(pb *internal.DatabaseInfo) { 999 di.Name = pb.GetName() 1000 di.DefaultRetentionPolicy = pb.GetDefaultRetentionPolicy() 1001 1002 if len(pb.GetRetentionPolicies()) > 0 { 1003 di.RetentionPolicies = make([]RetentionPolicyInfo, len(pb.GetRetentionPolicies())) 1004 for i, x := range pb.GetRetentionPolicies() { 1005 di.RetentionPolicies[i].unmarshal(x) 1006 } 1007 } 1008 1009 if len(pb.GetContinuousQueries()) > 0 { 1010 di.ContinuousQueries = make([]ContinuousQueryInfo, len(pb.GetContinuousQueries())) 1011 for i, x := range pb.GetContinuousQueries() { 1012 di.ContinuousQueries[i].unmarshal(x) 1013 } 1014 } 1015} 1016 1017// RetentionPolicySpec represents the specification for a new retention policy. 1018type RetentionPolicySpec struct { 1019 Name string 1020 ReplicaN *int 1021 Duration *time.Duration 1022 ShardGroupDuration time.Duration 1023} 1024 1025// NewRetentionPolicyInfo creates a new retention policy info from the specification. 1026func (s *RetentionPolicySpec) NewRetentionPolicyInfo() *RetentionPolicyInfo { 1027 return DefaultRetentionPolicyInfo().Apply(s) 1028} 1029 1030// Matches checks if this retention policy specification matches 1031// an existing retention policy. 1032func (s *RetentionPolicySpec) Matches(rpi *RetentionPolicyInfo) bool { 1033 if rpi == nil { 1034 return false 1035 } else if s.Name != "" && s.Name != rpi.Name { 1036 return false 1037 } else if s.Duration != nil && *s.Duration != rpi.Duration { 1038 return false 1039 } else if s.ReplicaN != nil && *s.ReplicaN != rpi.ReplicaN { 1040 return false 1041 } 1042 1043 // Normalise ShardDuration before comparing to any existing retention policies. 1044 // Normalize with the retention policy info's duration instead of the spec 1045 // since they should be the same and we're performing a comparison. 1046 sgDuration := normalisedShardDuration(s.ShardGroupDuration, rpi.Duration) 1047 return sgDuration == rpi.ShardGroupDuration 1048} 1049 1050// marshal serializes to a protobuf representation. 1051func (s *RetentionPolicySpec) marshal() *internal.RetentionPolicySpec { 1052 pb := &internal.RetentionPolicySpec{} 1053 if s.Name != "" { 1054 pb.Name = proto.String(s.Name) 1055 } 1056 if s.Duration != nil { 1057 pb.Duration = proto.Int64(int64(*s.Duration)) 1058 } 1059 if s.ShardGroupDuration > 0 { 1060 pb.ShardGroupDuration = proto.Int64(int64(s.ShardGroupDuration)) 1061 } 1062 if s.ReplicaN != nil { 1063 pb.ReplicaN = proto.Uint32(uint32(*s.ReplicaN)) 1064 } 1065 return pb 1066} 1067 1068// unmarshal deserializes from a protobuf representation. 1069func (s *RetentionPolicySpec) unmarshal(pb *internal.RetentionPolicySpec) { 1070 if pb.Name != nil { 1071 s.Name = pb.GetName() 1072 } 1073 if pb.Duration != nil { 1074 duration := time.Duration(pb.GetDuration()) 1075 s.Duration = &duration 1076 } 1077 if pb.ShardGroupDuration != nil { 1078 s.ShardGroupDuration = time.Duration(pb.GetShardGroupDuration()) 1079 } 1080 if pb.ReplicaN != nil { 1081 replicaN := int(pb.GetReplicaN()) 1082 s.ReplicaN = &replicaN 1083 } 1084} 1085 1086// MarshalBinary encodes RetentionPolicySpec to a binary format. 1087func (s *RetentionPolicySpec) MarshalBinary() ([]byte, error) { 1088 return proto.Marshal(s.marshal()) 1089} 1090 1091// UnmarshalBinary decodes RetentionPolicySpec from a binary format. 1092func (s *RetentionPolicySpec) UnmarshalBinary(data []byte) error { 1093 var pb internal.RetentionPolicySpec 1094 if err := proto.Unmarshal(data, &pb); err != nil { 1095 return err 1096 } 1097 s.unmarshal(&pb) 1098 return nil 1099} 1100 1101// RetentionPolicyInfo represents metadata about a retention policy. 1102type RetentionPolicyInfo struct { 1103 Name string 1104 ReplicaN int 1105 Duration time.Duration 1106 ShardGroupDuration time.Duration 1107 ShardGroups []ShardGroupInfo 1108 Subscriptions []SubscriptionInfo 1109} 1110 1111// NewRetentionPolicyInfo returns a new instance of RetentionPolicyInfo 1112// with default replication and duration. 1113func NewRetentionPolicyInfo(name string) *RetentionPolicyInfo { 1114 return &RetentionPolicyInfo{ 1115 Name: name, 1116 ReplicaN: DefaultRetentionPolicyReplicaN, 1117 Duration: DefaultRetentionPolicyDuration, 1118 } 1119} 1120 1121// DefaultRetentionPolicyInfo returns a new instance of RetentionPolicyInfo 1122// with default name, replication, and duration. 1123func DefaultRetentionPolicyInfo() *RetentionPolicyInfo { 1124 return NewRetentionPolicyInfo(DefaultRetentionPolicyName) 1125} 1126 1127// Apply applies a specification to the retention policy info. 1128func (rpi *RetentionPolicyInfo) Apply(spec *RetentionPolicySpec) *RetentionPolicyInfo { 1129 rp := &RetentionPolicyInfo{ 1130 Name: rpi.Name, 1131 ReplicaN: rpi.ReplicaN, 1132 Duration: rpi.Duration, 1133 ShardGroupDuration: rpi.ShardGroupDuration, 1134 } 1135 if spec.Name != "" { 1136 rp.Name = spec.Name 1137 } 1138 if spec.ReplicaN != nil { 1139 rp.ReplicaN = *spec.ReplicaN 1140 } 1141 if spec.Duration != nil { 1142 rp.Duration = *spec.Duration 1143 } 1144 rp.ShardGroupDuration = normalisedShardDuration(spec.ShardGroupDuration, rp.Duration) 1145 return rp 1146} 1147 1148// ShardGroupByTimestamp returns the shard group in the policy that contains the timestamp, 1149// or nil if no shard group matches. 1150func (rpi *RetentionPolicyInfo) ShardGroupByTimestamp(timestamp time.Time) *ShardGroupInfo { 1151 for i := range rpi.ShardGroups { 1152 sgi := &rpi.ShardGroups[i] 1153 if sgi.Contains(timestamp) && !sgi.Deleted() && (!sgi.Truncated() || timestamp.Before(sgi.TruncatedAt)) { 1154 return &rpi.ShardGroups[i] 1155 } 1156 } 1157 1158 return nil 1159} 1160 1161// ExpiredShardGroups returns the Shard Groups which are considered expired, for the given time. 1162func (rpi *RetentionPolicyInfo) ExpiredShardGroups(t time.Time) []*ShardGroupInfo { 1163 var groups = make([]*ShardGroupInfo, 0) 1164 for i := range rpi.ShardGroups { 1165 if rpi.ShardGroups[i].Deleted() { 1166 continue 1167 } 1168 if rpi.Duration != 0 && rpi.ShardGroups[i].EndTime.Add(rpi.Duration).Before(t) { 1169 groups = append(groups, &rpi.ShardGroups[i]) 1170 } 1171 } 1172 return groups 1173} 1174 1175// DeletedShardGroups returns the Shard Groups which are marked as deleted. 1176func (rpi *RetentionPolicyInfo) DeletedShardGroups() []*ShardGroupInfo { 1177 var groups = make([]*ShardGroupInfo, 0) 1178 for i := range rpi.ShardGroups { 1179 if rpi.ShardGroups[i].Deleted() { 1180 groups = append(groups, &rpi.ShardGroups[i]) 1181 } 1182 } 1183 return groups 1184} 1185 1186// marshal serializes to a protobuf representation. 1187func (rpi *RetentionPolicyInfo) marshal() *internal.RetentionPolicyInfo { 1188 pb := &internal.RetentionPolicyInfo{ 1189 Name: proto.String(rpi.Name), 1190 ReplicaN: proto.Uint32(uint32(rpi.ReplicaN)), 1191 Duration: proto.Int64(int64(rpi.Duration)), 1192 ShardGroupDuration: proto.Int64(int64(rpi.ShardGroupDuration)), 1193 } 1194 1195 pb.ShardGroups = make([]*internal.ShardGroupInfo, len(rpi.ShardGroups)) 1196 for i, sgi := range rpi.ShardGroups { 1197 pb.ShardGroups[i] = sgi.marshal() 1198 } 1199 1200 pb.Subscriptions = make([]*internal.SubscriptionInfo, len(rpi.Subscriptions)) 1201 for i, sub := range rpi.Subscriptions { 1202 pb.Subscriptions[i] = sub.marshal() 1203 } 1204 1205 return pb 1206} 1207 1208// unmarshal deserializes from a protobuf representation. 1209func (rpi *RetentionPolicyInfo) unmarshal(pb *internal.RetentionPolicyInfo) { 1210 rpi.Name = pb.GetName() 1211 rpi.ReplicaN = int(pb.GetReplicaN()) 1212 rpi.Duration = time.Duration(pb.GetDuration()) 1213 rpi.ShardGroupDuration = time.Duration(pb.GetShardGroupDuration()) 1214 1215 if len(pb.GetShardGroups()) > 0 { 1216 rpi.ShardGroups = make([]ShardGroupInfo, len(pb.GetShardGroups())) 1217 for i, x := range pb.GetShardGroups() { 1218 rpi.ShardGroups[i].unmarshal(x) 1219 } 1220 } 1221 if len(pb.GetSubscriptions()) > 0 { 1222 rpi.Subscriptions = make([]SubscriptionInfo, len(pb.GetSubscriptions())) 1223 for i, x := range pb.GetSubscriptions() { 1224 rpi.Subscriptions[i].unmarshal(x) 1225 } 1226 } 1227} 1228 1229// clone returns a deep copy of rpi. 1230func (rpi RetentionPolicyInfo) clone() RetentionPolicyInfo { 1231 other := rpi 1232 1233 if rpi.ShardGroups != nil { 1234 other.ShardGroups = make([]ShardGroupInfo, len(rpi.ShardGroups)) 1235 for i := range rpi.ShardGroups { 1236 other.ShardGroups[i] = rpi.ShardGroups[i].clone() 1237 } 1238 } 1239 1240 return other 1241} 1242 1243// MarshalBinary encodes rpi to a binary format. 1244func (rpi *RetentionPolicyInfo) MarshalBinary() ([]byte, error) { 1245 return proto.Marshal(rpi.marshal()) 1246} 1247 1248// UnmarshalBinary decodes rpi from a binary format. 1249func (rpi *RetentionPolicyInfo) UnmarshalBinary(data []byte) error { 1250 var pb internal.RetentionPolicyInfo 1251 if err := proto.Unmarshal(data, &pb); err != nil { 1252 return err 1253 } 1254 rpi.unmarshal(&pb) 1255 return nil 1256} 1257 1258// shardGroupDuration returns the default duration for a shard group based on a policy duration. 1259func shardGroupDuration(d time.Duration) time.Duration { 1260 if d >= 180*24*time.Hour || d == 0 { // 6 months or 0 1261 return 7 * 24 * time.Hour 1262 } else if d >= 2*24*time.Hour { // 2 days 1263 return 1 * 24 * time.Hour 1264 } 1265 return 1 * time.Hour 1266} 1267 1268// normalisedShardDuration returns normalised shard duration based on a policy duration. 1269func normalisedShardDuration(sgd, d time.Duration) time.Duration { 1270 // If it is zero, it likely wasn't specified, so we default to the shard group duration 1271 if sgd == 0 { 1272 return shardGroupDuration(d) 1273 } 1274 // If it was specified, but it's less than the MinRetentionPolicyDuration, then normalize 1275 // to the MinRetentionPolicyDuration 1276 if sgd < MinRetentionPolicyDuration { 1277 return shardGroupDuration(MinRetentionPolicyDuration) 1278 } 1279 return sgd 1280} 1281 1282// ShardGroupInfo represents metadata about a shard group. The DeletedAt field is important 1283// because it makes it clear that a ShardGroup has been marked as deleted, and allow the system 1284// to be sure that a ShardGroup is not simply missing. If the DeletedAt is set, the system can 1285// safely delete any associated shards. 1286type ShardGroupInfo struct { 1287 ID uint64 1288 StartTime time.Time 1289 EndTime time.Time 1290 DeletedAt time.Time 1291 Shards []ShardInfo 1292 TruncatedAt time.Time 1293} 1294 1295// ShardGroupInfos implements sort.Interface on []ShardGroupInfo, based 1296// on the StartTime field. 1297type ShardGroupInfos []ShardGroupInfo 1298 1299// Len implements sort.Interface. 1300func (a ShardGroupInfos) Len() int { return len(a) } 1301 1302// Swap implements sort.Interface. 1303func (a ShardGroupInfos) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 1304 1305// Less implements sort.Interface. 1306func (a ShardGroupInfos) Less(i, j int) bool { 1307 iEnd := a[i].EndTime 1308 if a[i].Truncated() { 1309 iEnd = a[i].TruncatedAt 1310 } 1311 1312 jEnd := a[j].EndTime 1313 if a[j].Truncated() { 1314 jEnd = a[j].TruncatedAt 1315 } 1316 1317 if iEnd.Equal(jEnd) { 1318 return a[i].StartTime.Before(a[j].StartTime) 1319 } 1320 1321 return iEnd.Before(jEnd) 1322} 1323 1324// Contains returns true iif StartTime ≤ t < EndTime. 1325func (sgi *ShardGroupInfo) Contains(t time.Time) bool { 1326 return !t.Before(sgi.StartTime) && t.Before(sgi.EndTime) 1327} 1328 1329// Overlaps returns whether the shard group contains data for the time range between min and max 1330func (sgi *ShardGroupInfo) Overlaps(min, max time.Time) bool { 1331 return !sgi.StartTime.After(max) && sgi.EndTime.After(min) 1332} 1333 1334// Deleted returns whether this ShardGroup has been deleted. 1335func (sgi *ShardGroupInfo) Deleted() bool { 1336 return !sgi.DeletedAt.IsZero() 1337} 1338 1339// Truncated returns true if this ShardGroup has been truncated (no new writes). 1340func (sgi *ShardGroupInfo) Truncated() bool { 1341 return !sgi.TruncatedAt.IsZero() 1342} 1343 1344// clone returns a deep copy of sgi. 1345func (sgi ShardGroupInfo) clone() ShardGroupInfo { 1346 other := sgi 1347 1348 if sgi.Shards != nil { 1349 other.Shards = make([]ShardInfo, len(sgi.Shards)) 1350 for i := range sgi.Shards { 1351 other.Shards[i] = sgi.Shards[i].clone() 1352 } 1353 } 1354 1355 return other 1356} 1357 1358// ShardFor returns the ShardInfo for a Point hash. 1359func (sgi *ShardGroupInfo) ShardFor(hash uint64) ShardInfo { 1360 return sgi.Shards[hash%uint64(len(sgi.Shards))] 1361} 1362 1363// marshal serializes to a protobuf representation. 1364func (sgi *ShardGroupInfo) marshal() *internal.ShardGroupInfo { 1365 pb := &internal.ShardGroupInfo{ 1366 ID: proto.Uint64(sgi.ID), 1367 StartTime: proto.Int64(MarshalTime(sgi.StartTime)), 1368 EndTime: proto.Int64(MarshalTime(sgi.EndTime)), 1369 DeletedAt: proto.Int64(MarshalTime(sgi.DeletedAt)), 1370 } 1371 1372 if !sgi.TruncatedAt.IsZero() { 1373 pb.TruncatedAt = proto.Int64(MarshalTime(sgi.TruncatedAt)) 1374 } 1375 1376 pb.Shards = make([]*internal.ShardInfo, len(sgi.Shards)) 1377 for i := range sgi.Shards { 1378 pb.Shards[i] = sgi.Shards[i].marshal() 1379 } 1380 1381 return pb 1382} 1383 1384// unmarshal deserializes from a protobuf representation. 1385func (sgi *ShardGroupInfo) unmarshal(pb *internal.ShardGroupInfo) { 1386 sgi.ID = pb.GetID() 1387 if i := pb.GetStartTime(); i == 0 { 1388 sgi.StartTime = time.Unix(0, 0).UTC() 1389 } else { 1390 sgi.StartTime = UnmarshalTime(i) 1391 } 1392 if i := pb.GetEndTime(); i == 0 { 1393 sgi.EndTime = time.Unix(0, 0).UTC() 1394 } else { 1395 sgi.EndTime = UnmarshalTime(i) 1396 } 1397 sgi.DeletedAt = UnmarshalTime(pb.GetDeletedAt()) 1398 1399 if pb != nil && pb.TruncatedAt != nil { 1400 sgi.TruncatedAt = UnmarshalTime(pb.GetTruncatedAt()) 1401 } 1402 1403 if len(pb.GetShards()) > 0 { 1404 sgi.Shards = make([]ShardInfo, len(pb.GetShards())) 1405 for i, x := range pb.GetShards() { 1406 sgi.Shards[i].unmarshal(x) 1407 } 1408 } 1409} 1410 1411// ShardInfo represents metadata about a shard. 1412type ShardInfo struct { 1413 ID uint64 1414 Owners []ShardOwner 1415} 1416 1417// OwnedBy determines whether the shard's owner IDs includes nodeID. 1418func (si ShardInfo) OwnedBy(nodeID uint64) bool { 1419 for _, so := range si.Owners { 1420 if so.NodeID == nodeID { 1421 return true 1422 } 1423 } 1424 return false 1425} 1426 1427// clone returns a deep copy of si. 1428func (si ShardInfo) clone() ShardInfo { 1429 other := si 1430 1431 if si.Owners != nil { 1432 other.Owners = make([]ShardOwner, len(si.Owners)) 1433 for i := range si.Owners { 1434 other.Owners[i] = si.Owners[i].clone() 1435 } 1436 } 1437 1438 return other 1439} 1440 1441// marshal serializes to a protobuf representation. 1442func (si ShardInfo) marshal() *internal.ShardInfo { 1443 pb := &internal.ShardInfo{ 1444 ID: proto.Uint64(si.ID), 1445 } 1446 1447 pb.Owners = make([]*internal.ShardOwner, len(si.Owners)) 1448 for i := range si.Owners { 1449 pb.Owners[i] = si.Owners[i].marshal() 1450 } 1451 1452 return pb 1453} 1454 1455// UnmarshalBinary decodes the object from a binary format. 1456func (si *ShardInfo) UnmarshalBinary(buf []byte) error { 1457 var pb internal.ShardInfo 1458 if err := proto.Unmarshal(buf, &pb); err != nil { 1459 return err 1460 } 1461 si.unmarshal(&pb) 1462 return nil 1463} 1464 1465// unmarshal deserializes from a protobuf representation. 1466func (si *ShardInfo) unmarshal(pb *internal.ShardInfo) { 1467 si.ID = pb.GetID() 1468 1469 // If deprecated "OwnerIDs" exists then convert it to "Owners" format. 1470 if len(pb.GetOwnerIDs()) > 0 { 1471 si.Owners = make([]ShardOwner, len(pb.GetOwnerIDs())) 1472 for i, x := range pb.GetOwnerIDs() { 1473 si.Owners[i].unmarshal(&internal.ShardOwner{ 1474 NodeID: proto.Uint64(x), 1475 }) 1476 } 1477 } else if len(pb.GetOwners()) > 0 { 1478 si.Owners = make([]ShardOwner, len(pb.GetOwners())) 1479 for i, x := range pb.GetOwners() { 1480 si.Owners[i].unmarshal(x) 1481 } 1482 } 1483} 1484 1485// SubscriptionInfo holds the subscription information. 1486type SubscriptionInfo struct { 1487 Name string 1488 Mode string 1489 Destinations []string 1490} 1491 1492// marshal serializes to a protobuf representation. 1493func (si SubscriptionInfo) marshal() *internal.SubscriptionInfo { 1494 pb := &internal.SubscriptionInfo{ 1495 Name: proto.String(si.Name), 1496 Mode: proto.String(si.Mode), 1497 } 1498 1499 pb.Destinations = make([]string, len(si.Destinations)) 1500 for i := range si.Destinations { 1501 pb.Destinations[i] = si.Destinations[i] 1502 } 1503 return pb 1504} 1505 1506// unmarshal deserializes from a protobuf representation. 1507func (si *SubscriptionInfo) unmarshal(pb *internal.SubscriptionInfo) { 1508 si.Name = pb.GetName() 1509 si.Mode = pb.GetMode() 1510 1511 if len(pb.GetDestinations()) > 0 { 1512 si.Destinations = make([]string, len(pb.GetDestinations())) 1513 copy(si.Destinations, pb.GetDestinations()) 1514 } 1515} 1516 1517// ShardOwner represents a node that owns a shard. 1518type ShardOwner struct { 1519 NodeID uint64 1520} 1521 1522// clone returns a deep copy of so. 1523func (so ShardOwner) clone() ShardOwner { 1524 return so 1525} 1526 1527// marshal serializes to a protobuf representation. 1528func (so ShardOwner) marshal() *internal.ShardOwner { 1529 return &internal.ShardOwner{ 1530 NodeID: proto.Uint64(so.NodeID), 1531 } 1532} 1533 1534// unmarshal deserializes from a protobuf representation. 1535func (so *ShardOwner) unmarshal(pb *internal.ShardOwner) { 1536 so.NodeID = pb.GetNodeID() 1537} 1538 1539// ContinuousQueryInfo represents metadata about a continuous query. 1540type ContinuousQueryInfo struct { 1541 Name string 1542 Query string 1543} 1544 1545// clone returns a deep copy of cqi. 1546func (cqi ContinuousQueryInfo) clone() ContinuousQueryInfo { return cqi } 1547 1548// marshal serializes to a protobuf representation. 1549func (cqi ContinuousQueryInfo) marshal() *internal.ContinuousQueryInfo { 1550 return &internal.ContinuousQueryInfo{ 1551 Name: proto.String(cqi.Name), 1552 Query: proto.String(cqi.Query), 1553 } 1554} 1555 1556// unmarshal deserializes from a protobuf representation. 1557func (cqi *ContinuousQueryInfo) unmarshal(pb *internal.ContinuousQueryInfo) { 1558 cqi.Name = pb.GetName() 1559 cqi.Query = pb.GetQuery() 1560} 1561 1562var _ query.Authorizer = (*UserInfo)(nil) 1563 1564// UserInfo represents metadata about a user in the system. 1565type UserInfo struct { 1566 // User's name. 1567 Name string 1568 1569 // Hashed password. 1570 Hash string 1571 1572 // Whether the user is an admin, i.e. allowed to do everything. 1573 Admin bool 1574 1575 // Map of database name to granted privilege. 1576 Privileges map[string]influxql.Privilege 1577} 1578 1579type User interface { 1580 query.Authorizer 1581 ID() string 1582 AuthorizeUnrestricted() bool 1583} 1584 1585func (u *UserInfo) ID() string { 1586 return u.Name 1587} 1588 1589// AuthorizeDatabase returns true if the user is authorized for the given privilege on the given database. 1590func (ui *UserInfo) AuthorizeDatabase(privilege influxql.Privilege, database string) bool { 1591 if ui.Admin || privilege == influxql.NoPrivileges { 1592 return true 1593 } 1594 p, ok := ui.Privileges[database] 1595 return ok && (p == privilege || p == influxql.AllPrivileges) 1596} 1597 1598// AuthorizeSeriesRead is used to limit access per-series (enterprise only) 1599func (u *UserInfo) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool { 1600 return true 1601} 1602 1603// AuthorizeSeriesWrite is used to limit access per-series (enterprise only) 1604func (u *UserInfo) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool { 1605 return true 1606} 1607 1608// AuthorizeUnrestricted allows admins to shortcut access checks. 1609func (u *UserInfo) AuthorizeUnrestricted() bool { 1610 return u.Admin 1611} 1612 1613// clone returns a deep copy of si. 1614func (ui UserInfo) clone() UserInfo { 1615 other := ui 1616 1617 if ui.Privileges != nil { 1618 other.Privileges = make(map[string]influxql.Privilege) 1619 for k, v := range ui.Privileges { 1620 other.Privileges[k] = v 1621 } 1622 } 1623 1624 return other 1625} 1626 1627// marshal serializes to a protobuf representation. 1628func (ui UserInfo) marshal() *internal.UserInfo { 1629 pb := &internal.UserInfo{ 1630 Name: proto.String(ui.Name), 1631 Hash: proto.String(ui.Hash), 1632 Admin: proto.Bool(ui.Admin), 1633 } 1634 1635 for database, privilege := range ui.Privileges { 1636 pb.Privileges = append(pb.Privileges, &internal.UserPrivilege{ 1637 Database: proto.String(database), 1638 Privilege: proto.Int32(int32(privilege)), 1639 }) 1640 } 1641 1642 return pb 1643} 1644 1645// unmarshal deserializes from a protobuf representation. 1646func (ui *UserInfo) unmarshal(pb *internal.UserInfo) { 1647 ui.Name = pb.GetName() 1648 ui.Hash = pb.GetHash() 1649 ui.Admin = pb.GetAdmin() 1650 1651 ui.Privileges = make(map[string]influxql.Privilege) 1652 for _, p := range pb.GetPrivileges() { 1653 ui.Privileges[p.GetDatabase()] = influxql.Privilege(p.GetPrivilege()) 1654 } 1655} 1656 1657// Lease represents a lease held on a resource. 1658type Lease struct { 1659 Name string `json:"name"` 1660 Expiration time.Time `json:"expiration"` 1661 Owner uint64 `json:"owner"` 1662} 1663 1664// Leases is a concurrency-safe collection of leases keyed by name. 1665type Leases struct { 1666 mu sync.Mutex 1667 m map[string]*Lease 1668 d time.Duration 1669} 1670 1671// NewLeases returns a new instance of Leases. 1672func NewLeases(d time.Duration) *Leases { 1673 return &Leases{ 1674 m: make(map[string]*Lease), 1675 d: d, 1676 } 1677} 1678 1679// Acquire acquires a lease with the given name for the given nodeID. 1680// If the lease doesn't exist or exists but is expired, a valid lease is returned. 1681// If nodeID already owns the named and unexpired lease, the lease expiration is extended. 1682// If a different node owns the lease, an error is returned. 1683func (leases *Leases) Acquire(name string, nodeID uint64) (*Lease, error) { 1684 leases.mu.Lock() 1685 defer leases.mu.Unlock() 1686 1687 l := leases.m[name] 1688 if l != nil { 1689 if time.Now().After(l.Expiration) || l.Owner == nodeID { 1690 l.Expiration = time.Now().Add(leases.d) 1691 l.Owner = nodeID 1692 return l, nil 1693 } 1694 return l, errors.New("another node has the lease") 1695 } 1696 1697 l = &Lease{ 1698 Name: name, 1699 Expiration: time.Now().Add(leases.d), 1700 Owner: nodeID, 1701 } 1702 1703 leases.m[name] = l 1704 1705 return l, nil 1706} 1707 1708// MarshalTime converts t to nanoseconds since epoch. A zero time returns 0. 1709func MarshalTime(t time.Time) int64 { 1710 if t.IsZero() { 1711 return 0 1712 } 1713 return t.UnixNano() 1714} 1715 1716// UnmarshalTime converts nanoseconds since epoch to time. 1717// A zero value returns a zero time. 1718func UnmarshalTime(v int64) time.Time { 1719 if v == 0 { 1720 return time.Time{} 1721 } 1722 return time.Unix(0, v).UTC() 1723} 1724 1725// ValidName checks to see if the given name can would be valid for DB/RP name 1726func ValidName(name string) bool { 1727 for _, r := range name { 1728 if !unicode.IsPrint(r) { 1729 return false 1730 } 1731 } 1732 1733 return name != "" && 1734 name != "." && 1735 name != ".." && 1736 !strings.ContainsAny(name, `/\`) 1737} 1738