1package meta 2 3import ( 4 "errors" 5 "net" 6 "net/url" 7 "sort" 8 "strings" 9 "sync" 10 "time" 11 "unicode" 12 13 "fmt" 14 15 "github.com/gogo/protobuf/proto" 16 "github.com/influxdata/influxdb" 17 "github.com/influxdata/influxdb/models" 18 "github.com/influxdata/influxdb/query" 19 internal "github.com/influxdata/influxdb/services/meta/internal" 20 "github.com/influxdata/influxql" 21) 22 23//go:generate protoc --gogo_out=. internal/meta.proto 24 25const ( 26 // DefaultRetentionPolicyReplicaN is the default value of RetentionPolicyInfo.ReplicaN. 27 DefaultRetentionPolicyReplicaN = 1 28 29 // DefaultRetentionPolicyDuration is the default value of RetentionPolicyInfo.Duration. 30 DefaultRetentionPolicyDuration = time.Duration(0) 31 32 // DefaultRetentionPolicyName is the default name for auto generated retention policies. 33 DefaultRetentionPolicyName = "autogen" 34 35 // MinRetentionPolicyDuration represents the minimum duration for a policy. 36 MinRetentionPolicyDuration = time.Hour 37 38 // MaxNameLen is the maximum length of a database or retention policy name. 39 // InfluxDB uses the name for the directory name on disk. 40 MaxNameLen = 255 41) 42 43// Data represents the top level collection of all metadata. 44type Data struct { 45 Term uint64 // associated raft term 46 Index uint64 // associated raft index 47 ClusterID uint64 48 Databases []DatabaseInfo 49 Users []UserInfo 50 51 // adminUserExists provides a constant time mechanism for determining 52 // if there is at least one admin user. 53 adminUserExists bool 54 55 MaxShardGroupID uint64 56 MaxShardID uint64 57} 58 59// Database returns a DatabaseInfo by the database name. 60func (data *Data) Database(name string) *DatabaseInfo { 61 for i := range data.Databases { 62 if data.Databases[i].Name == name { 63 return &data.Databases[i] 64 } 65 } 66 return nil 67} 68 69// CloneDatabases returns a copy of the DatabaseInfo. 70func (data *Data) CloneDatabases() []DatabaseInfo { 71 if data.Databases == nil { 72 return nil 73 } 74 dbs := make([]DatabaseInfo, len(data.Databases)) 75 for i := range data.Databases { 76 dbs[i] = data.Databases[i].clone() 77 } 78 return dbs 79} 80 81// CreateDatabase creates a new database. 82// It returns an error if name is blank or if a database with the same name already exists. 83func (data *Data) CreateDatabase(name string) error { 84 if name == "" { 85 return ErrDatabaseNameRequired 86 } else if len(name) > MaxNameLen { 87 return ErrNameTooLong 88 } else if data.Database(name) != nil { 89 return nil 90 } 91 92 // Append new node. 93 data.Databases = append(data.Databases, DatabaseInfo{Name: name}) 94 95 return nil 96} 97 98// DropDatabase removes a database by name. It does not return an error 99// if the database cannot be found. 100func (data *Data) DropDatabase(name string) error { 101 for i := range data.Databases { 102 if data.Databases[i].Name == name { 103 data.Databases = append(data.Databases[:i], data.Databases[i+1:]...) 104 105 // Remove all user privileges associated with this database. 106 for i := range data.Users { 107 delete(data.Users[i].Privileges, name) 108 } 109 break 110 } 111 } 112 return nil 113} 114 115// RetentionPolicy returns a retention policy for a database by name. 116func (data *Data) RetentionPolicy(database, name string) (*RetentionPolicyInfo, error) { 117 di := data.Database(database) 118 if di == nil { 119 return nil, influxdb.ErrDatabaseNotFound(database) 120 } 121 122 for i := range di.RetentionPolicies { 123 if di.RetentionPolicies[i].Name == name { 124 return &di.RetentionPolicies[i], nil 125 } 126 } 127 return nil, nil 128} 129 130// CreateRetentionPolicy creates a new retention policy on a database. 131// It returns an error if name is blank or if the database does not exist. 132func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo, makeDefault bool) error { 133 // Validate retention policy. 134 if rpi == nil { 135 return ErrRetentionPolicyRequired 136 } else if rpi.Name == "" { 137 return ErrRetentionPolicyNameRequired 138 } else if len(rpi.Name) > MaxNameLen { 139 return ErrNameTooLong 140 } else if rpi.ReplicaN < 1 { 141 return ErrReplicationFactorTooLow 142 } 143 144 // Normalise ShardDuration before comparing to any existing 145 // retention policies. The client is supposed to do this, but 146 // do it again to verify input. 147 rpi.ShardGroupDuration = normalisedShardDuration(rpi.ShardGroupDuration, rpi.Duration) 148 149 if rpi.Duration > 0 && rpi.Duration < rpi.ShardGroupDuration { 150 return ErrIncompatibleDurations 151 } 152 153 // Find database. 154 di := data.Database(database) 155 if di == nil { 156 return influxdb.ErrDatabaseNotFound(database) 157 } else if rp := di.RetentionPolicy(rpi.Name); rp != nil { 158 // RP with that name already exists. Make sure they're the same. 159 if rp.ReplicaN != rpi.ReplicaN || rp.Duration != rpi.Duration || rp.ShardGroupDuration != rpi.ShardGroupDuration { 160 return ErrRetentionPolicyExists 161 } 162 // if they want to make it default, and it's not the default, it's not an identical command so it's an error 163 if makeDefault && di.DefaultRetentionPolicy != rpi.Name { 164 return ErrRetentionPolicyConflict 165 } 166 return nil 167 } 168 169 // Append copy of new policy. 170 di.RetentionPolicies = append(di.RetentionPolicies, *rpi) 171 172 // Set the default if needed 173 if makeDefault { 174 di.DefaultRetentionPolicy = rpi.Name 175 } 176 177 return nil 178} 179 180// DropRetentionPolicy removes a retention policy from a database by name. 181func (data *Data) DropRetentionPolicy(database, name string) error { 182 // Find database. 183 di := data.Database(database) 184 if di == nil { 185 // no database? no problem 186 return nil 187 } 188 189 // Remove from list. 190 for i := range di.RetentionPolicies { 191 if di.RetentionPolicies[i].Name == name { 192 di.RetentionPolicies = append(di.RetentionPolicies[:i], di.RetentionPolicies[i+1:]...) 193 break 194 } 195 } 196 197 return nil 198} 199 200// RetentionPolicyUpdate represents retention policy fields to be updated. 201type RetentionPolicyUpdate struct { 202 Name *string 203 Duration *time.Duration 204 ReplicaN *int 205 ShardGroupDuration *time.Duration 206} 207 208// SetName sets the RetentionPolicyUpdate.Name. 209func (rpu *RetentionPolicyUpdate) SetName(v string) { rpu.Name = &v } 210 211// SetDuration sets the RetentionPolicyUpdate.Duration. 212func (rpu *RetentionPolicyUpdate) SetDuration(v time.Duration) { rpu.Duration = &v } 213 214// SetReplicaN sets the RetentionPolicyUpdate.ReplicaN. 215func (rpu *RetentionPolicyUpdate) SetReplicaN(v int) { rpu.ReplicaN = &v } 216 217// SetShardGroupDuration sets the RetentionPolicyUpdate.ShardGroupDuration. 218func (rpu *RetentionPolicyUpdate) SetShardGroupDuration(v time.Duration) { rpu.ShardGroupDuration = &v } 219 220// UpdateRetentionPolicy updates an existing retention policy. 221func (data *Data) UpdateRetentionPolicy(database, name string, rpu *RetentionPolicyUpdate, makeDefault bool) error { 222 // Find database. 223 di := data.Database(database) 224 if di == nil { 225 return influxdb.ErrDatabaseNotFound(database) 226 } 227 228 // Find policy. 229 rpi := di.RetentionPolicy(name) 230 if rpi == nil { 231 return influxdb.ErrRetentionPolicyNotFound(name) 232 } 233 234 // Ensure new policy doesn't match an existing policy. 235 if rpu.Name != nil && *rpu.Name != name && di.RetentionPolicy(*rpu.Name) != nil { 236 return ErrRetentionPolicyNameExists 237 } 238 239 // Enforce duration of at least MinRetentionPolicyDuration 240 if rpu.Duration != nil && *rpu.Duration < MinRetentionPolicyDuration && *rpu.Duration != 0 { 241 return ErrRetentionPolicyDurationTooLow 242 } 243 244 // Enforce duration is at least the shard duration 245 if (rpu.Duration != nil && *rpu.Duration > 0 && 246 ((rpu.ShardGroupDuration != nil && *rpu.Duration < *rpu.ShardGroupDuration) || 247 (rpu.ShardGroupDuration == nil && *rpu.Duration < rpi.ShardGroupDuration))) || 248 (rpu.Duration == nil && rpi.Duration > 0 && 249 rpu.ShardGroupDuration != nil && rpi.Duration < *rpu.ShardGroupDuration) { 250 return ErrIncompatibleDurations 251 } 252 253 // Update fields. 254 if rpu.Name != nil { 255 rpi.Name = *rpu.Name 256 } 257 if rpu.Duration != nil { 258 rpi.Duration = *rpu.Duration 259 } 260 if rpu.ReplicaN != nil { 261 rpi.ReplicaN = *rpu.ReplicaN 262 } 263 if rpu.ShardGroupDuration != nil { 264 rpi.ShardGroupDuration = normalisedShardDuration(*rpu.ShardGroupDuration, rpi.Duration) 265 } 266 267 if di.DefaultRetentionPolicy != rpi.Name && makeDefault { 268 di.DefaultRetentionPolicy = rpi.Name 269 } 270 271 return nil 272} 273 274// DropShard removes a shard by ID. 275// 276// DropShard won't return an error if the shard can't be found, which 277// allows the command to be re-run in the case that the meta store 278// succeeds but a data node fails. 279func (data *Data) DropShard(id uint64) { 280 found := -1 281 for dbidx, dbi := range data.Databases { 282 for rpidx, rpi := range dbi.RetentionPolicies { 283 for sgidx, sg := range rpi.ShardGroups { 284 for sidx, s := range sg.Shards { 285 if s.ID == id { 286 found = sidx 287 break 288 } 289 } 290 291 if found > -1 { 292 shards := sg.Shards 293 data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].Shards = append(shards[:found], shards[found+1:]...) 294 295 if len(shards) == 1 { 296 // We just deleted the last shard in the shard group. 297 data.Databases[dbidx].RetentionPolicies[rpidx].ShardGroups[sgidx].DeletedAt = time.Now() 298 } 299 return 300 } 301 } 302 } 303 } 304} 305 306// ShardGroups returns a list of all shard groups on a database and retention policy. 307func (data *Data) ShardGroups(database, policy string) ([]ShardGroupInfo, error) { 308 // Find retention policy. 309 rpi, err := data.RetentionPolicy(database, policy) 310 if err != nil { 311 return nil, err 312 } else if rpi == nil { 313 return nil, influxdb.ErrRetentionPolicyNotFound(policy) 314 } 315 groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups)) 316 for _, g := range rpi.ShardGroups { 317 if g.Deleted() { 318 continue 319 } 320 groups = append(groups, g) 321 } 322 return groups, nil 323} 324 325// ShardGroupsByTimeRange returns a list of all shard groups on a database and policy that may contain data 326// for the specified time range. Shard groups are sorted by start time. 327func (data *Data) ShardGroupsByTimeRange(database, policy string, tmin, tmax time.Time) ([]ShardGroupInfo, error) { 328 // Find retention policy. 329 rpi, err := data.RetentionPolicy(database, policy) 330 if err != nil { 331 return nil, err 332 } else if rpi == nil { 333 return nil, influxdb.ErrRetentionPolicyNotFound(policy) 334 } 335 groups := make([]ShardGroupInfo, 0, len(rpi.ShardGroups)) 336 for _, g := range rpi.ShardGroups { 337 if g.Deleted() || !g.Overlaps(tmin, tmax) { 338 continue 339 } 340 groups = append(groups, g) 341 } 342 return groups, nil 343} 344 345// ShardGroupByTimestamp returns the shard group on a database and policy for a given timestamp. 346func (data *Data) ShardGroupByTimestamp(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) { 347 // Find retention policy. 348 rpi, err := data.RetentionPolicy(database, policy) 349 if err != nil { 350 return nil, err 351 } else if rpi == nil { 352 return nil, influxdb.ErrRetentionPolicyNotFound(policy) 353 } 354 355 return rpi.ShardGroupByTimestamp(timestamp), nil 356} 357 358// CreateShardGroup creates a shard group on a database and policy for a given timestamp. 359func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error { 360 // Find retention policy. 361 rpi, err := data.RetentionPolicy(database, policy) 362 if err != nil { 363 return err 364 } else if rpi == nil { 365 return influxdb.ErrRetentionPolicyNotFound(policy) 366 } 367 368 // Verify that shard group doesn't already exist for this timestamp. 369 if rpi.ShardGroupByTimestamp(timestamp) != nil { 370 return nil 371 } 372 373 // Create the shard group. 374 data.MaxShardGroupID++ 375 sgi := ShardGroupInfo{} 376 sgi.ID = data.MaxShardGroupID 377 sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC() 378 sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC() 379 if sgi.EndTime.After(time.Unix(0, models.MaxNanoTime)) { 380 // Shard group range is [start, end) so add one to the max time. 381 sgi.EndTime = time.Unix(0, models.MaxNanoTime+1) 382 } 383 384 data.MaxShardID++ 385 sgi.Shards = []ShardInfo{ 386 {ID: data.MaxShardID}, 387 } 388 389 // Retention policy has a new shard group, so update the policy. Shard 390 // Groups must be stored in sorted order, as other parts of the system 391 // assume this to be the case. 392 rpi.ShardGroups = append(rpi.ShardGroups, sgi) 393 sort.Sort(ShardGroupInfos(rpi.ShardGroups)) 394 395 return nil 396} 397 398// DeleteShardGroup removes a shard group from a database and retention policy by id. 399func (data *Data) DeleteShardGroup(database, policy string, id uint64) error { 400 // Find retention policy. 401 rpi, err := data.RetentionPolicy(database, policy) 402 if err != nil { 403 return err 404 } else if rpi == nil { 405 return influxdb.ErrRetentionPolicyNotFound(policy) 406 } 407 408 // Find shard group by ID and set its deletion timestamp. 409 for i := range rpi.ShardGroups { 410 if rpi.ShardGroups[i].ID == id { 411 rpi.ShardGroups[i].DeletedAt = time.Now().UTC() 412 return nil 413 } 414 } 415 416 return ErrShardGroupNotFound 417} 418 419// CreateContinuousQuery adds a named continuous query to a database. 420func (data *Data) CreateContinuousQuery(database, name, query string) error { 421 di := data.Database(database) 422 if di == nil { 423 return influxdb.ErrDatabaseNotFound(database) 424 } 425 426 // Ensure the name doesn't already exist. 427 for _, cq := range di.ContinuousQueries { 428 if cq.Name == name { 429 // If the query string is the same, we'll silently return, 430 // otherwise we'll assume the user might be trying to 431 // overwrite an existing CQ with a different query. 432 if strings.ToLower(cq.Query) == strings.ToLower(query) { 433 return nil 434 } 435 return ErrContinuousQueryExists 436 } 437 } 438 439 // Append new query. 440 di.ContinuousQueries = append(di.ContinuousQueries, ContinuousQueryInfo{ 441 Name: name, 442 Query: query, 443 }) 444 445 return nil 446} 447 448// DropContinuousQuery removes a continuous query. 449func (data *Data) DropContinuousQuery(database, name string) error { 450 di := data.Database(database) 451 if di == nil { 452 return nil 453 } 454 455 for i := range di.ContinuousQueries { 456 if di.ContinuousQueries[i].Name == name { 457 di.ContinuousQueries = append(di.ContinuousQueries[:i], di.ContinuousQueries[i+1:]...) 458 return nil 459 } 460 } 461 return nil 462} 463 464// validateURL returns an error if the URL does not have a port or uses a scheme other than UDP or HTTP. 465func validateURL(input string) error { 466 u, err := url.Parse(input) 467 if err != nil { 468 return ErrInvalidSubscriptionURL(input) 469 } 470 471 if u.Scheme != "udp" && u.Scheme != "http" && u.Scheme != "https" { 472 return ErrInvalidSubscriptionURL(input) 473 } 474 475 _, port, err := net.SplitHostPort(u.Host) 476 if err != nil || port == "" { 477 return ErrInvalidSubscriptionURL(input) 478 } 479 480 return nil 481} 482 483// CreateSubscription adds a named subscription to a database and retention policy. 484func (data *Data) CreateSubscription(database, rp, name, mode string, destinations []string) error { 485 for _, d := range destinations { 486 if err := validateURL(d); err != nil { 487 return err 488 } 489 } 490 491 rpi, err := data.RetentionPolicy(database, rp) 492 if err != nil { 493 return err 494 } else if rpi == nil { 495 return influxdb.ErrRetentionPolicyNotFound(rp) 496 } 497 498 // Ensure the name doesn't already exist. 499 for i := range rpi.Subscriptions { 500 if rpi.Subscriptions[i].Name == name { 501 return ErrSubscriptionExists 502 } 503 } 504 505 // Append new query. 506 rpi.Subscriptions = append(rpi.Subscriptions, SubscriptionInfo{ 507 Name: name, 508 Mode: mode, 509 Destinations: destinations, 510 }) 511 512 return nil 513} 514 515// DropSubscription removes a subscription. 516func (data *Data) DropSubscription(database, rp, name string) error { 517 rpi, err := data.RetentionPolicy(database, rp) 518 if err != nil { 519 return err 520 } else if rpi == nil { 521 return influxdb.ErrRetentionPolicyNotFound(rp) 522 } 523 524 for i := range rpi.Subscriptions { 525 if rpi.Subscriptions[i].Name == name { 526 rpi.Subscriptions = append(rpi.Subscriptions[:i], rpi.Subscriptions[i+1:]...) 527 return nil 528 } 529 } 530 return ErrSubscriptionNotFound 531} 532 533func (data *Data) user(username string) *UserInfo { 534 for i := range data.Users { 535 if data.Users[i].Name == username { 536 return &data.Users[i] 537 } 538 } 539 return nil 540} 541 542// User returns a user by username. 543func (data *Data) User(username string) User { 544 u := data.user(username) 545 if u == nil { 546 // prevent non-nil interface with nil pointer 547 return nil 548 } 549 return u 550} 551 552// CreateUser creates a new user. 553func (data *Data) CreateUser(name, hash string, admin bool) error { 554 // Ensure the user doesn't already exist. 555 if name == "" { 556 return ErrUsernameRequired 557 } else if data.User(name) != nil { 558 return ErrUserExists 559 } 560 561 // Append new user. 562 data.Users = append(data.Users, UserInfo{ 563 Name: name, 564 Hash: hash, 565 Admin: admin, 566 }) 567 568 // We know there is now at least one admin user. 569 if admin { 570 data.adminUserExists = true 571 } 572 573 return nil 574} 575 576// DropUser removes an existing user by name. 577func (data *Data) DropUser(name string) error { 578 for i := range data.Users { 579 if data.Users[i].Name == name { 580 wasAdmin := data.Users[i].Admin 581 data.Users = append(data.Users[:i], data.Users[i+1:]...) 582 583 // Maybe we dropped the only admin user? 584 if wasAdmin { 585 data.adminUserExists = data.hasAdminUser() 586 } 587 return nil 588 } 589 } 590 591 return ErrUserNotFound 592} 593 594// UpdateUser updates the password hash of an existing user. 595func (data *Data) UpdateUser(name, hash string) error { 596 for i := range data.Users { 597 if data.Users[i].Name == name { 598 data.Users[i].Hash = hash 599 return nil 600 } 601 } 602 return ErrUserNotFound 603} 604 605// CloneUsers returns a copy of the user infos. 606func (data *Data) CloneUsers() []UserInfo { 607 if len(data.Users) == 0 { 608 return []UserInfo{} 609 } 610 users := make([]UserInfo, len(data.Users)) 611 for i := range data.Users { 612 users[i] = data.Users[i].clone() 613 } 614 615 return users 616} 617 618// SetPrivilege sets a privilege for a user on a database. 619func (data *Data) SetPrivilege(name, database string, p influxql.Privilege) error { 620 ui := data.user(name) 621 if ui == nil { 622 return ErrUserNotFound 623 } 624 625 if data.Database(database) == nil { 626 return influxdb.ErrDatabaseNotFound(database) 627 } 628 629 if ui.Privileges == nil { 630 ui.Privileges = make(map[string]influxql.Privilege) 631 } 632 ui.Privileges[database] = p 633 634 return nil 635} 636 637// SetAdminPrivilege sets the admin privilege for a user. 638func (data *Data) SetAdminPrivilege(name string, admin bool) error { 639 ui := data.user(name) 640 if ui == nil { 641 return ErrUserNotFound 642 } 643 644 ui.Admin = admin 645 646 // We could have promoted or revoked the only admin. Check if an admin 647 // user exists. 648 data.adminUserExists = data.hasAdminUser() 649 return nil 650} 651 652// AdminUserExists returns true if an admin user exists. 653func (data Data) AdminUserExists() bool { 654 return data.adminUserExists 655} 656 657// UserPrivileges gets the privileges for a user. 658func (data *Data) UserPrivileges(name string) (map[string]influxql.Privilege, error) { 659 ui := data.user(name) 660 if ui == nil { 661 return nil, ErrUserNotFound 662 } 663 664 return ui.Privileges, nil 665} 666 667// UserPrivilege gets the privilege for a user on a database. 668func (data *Data) UserPrivilege(name, database string) (*influxql.Privilege, error) { 669 ui := data.user(name) 670 if ui == nil { 671 return nil, ErrUserNotFound 672 } 673 674 for db, p := range ui.Privileges { 675 if db == database { 676 return &p, nil 677 } 678 } 679 680 return influxql.NewPrivilege(influxql.NoPrivileges), nil 681} 682 683// Clone returns a copy of data with a new version. 684func (data *Data) Clone() *Data { 685 other := *data 686 687 other.Databases = data.CloneDatabases() 688 other.Users = data.CloneUsers() 689 690 return &other 691} 692 693// marshal serializes data to a protobuf representation. 694func (data *Data) marshal() *internal.Data { 695 pb := &internal.Data{ 696 Term: proto.Uint64(data.Term), 697 Index: proto.Uint64(data.Index), 698 ClusterID: proto.Uint64(data.ClusterID), 699 700 MaxShardGroupID: proto.Uint64(data.MaxShardGroupID), 701 MaxShardID: proto.Uint64(data.MaxShardID), 702 703 // Need this for reverse compatibility 704 MaxNodeID: proto.Uint64(0), 705 } 706 707 pb.Databases = make([]*internal.DatabaseInfo, len(data.Databases)) 708 for i := range data.Databases { 709 pb.Databases[i] = data.Databases[i].marshal() 710 } 711 712 pb.Users = make([]*internal.UserInfo, len(data.Users)) 713 for i := range data.Users { 714 pb.Users[i] = data.Users[i].marshal() 715 } 716 717 return pb 718} 719 720// unmarshal deserializes from a protobuf representation. 721func (data *Data) unmarshal(pb *internal.Data) { 722 data.Term = pb.GetTerm() 723 data.Index = pb.GetIndex() 724 data.ClusterID = pb.GetClusterID() 725 726 data.MaxShardGroupID = pb.GetMaxShardGroupID() 727 data.MaxShardID = pb.GetMaxShardID() 728 729 data.Databases = make([]DatabaseInfo, len(pb.GetDatabases())) 730 for i, x := range pb.GetDatabases() { 731 data.Databases[i].unmarshal(x) 732 } 733 734 data.Users = make([]UserInfo, len(pb.GetUsers())) 735 for i, x := range pb.GetUsers() { 736 data.Users[i].unmarshal(x) 737 } 738 739 // Exhaustively determine if there is an admin user. The marshalled cache 740 // value may not be correct. 741 data.adminUserExists = data.hasAdminUser() 742} 743 744// MarshalBinary encodes the metadata to a binary format. 745func (data *Data) MarshalBinary() ([]byte, error) { 746 return proto.Marshal(data.marshal()) 747} 748 749// UnmarshalBinary decodes the object from a binary format. 750func (data *Data) UnmarshalBinary(buf []byte) error { 751 var pb internal.Data 752 if err := proto.Unmarshal(buf, &pb); err != nil { 753 return err 754 } 755 data.unmarshal(&pb) 756 return nil 757} 758 759// TruncateShardGroups truncates any shard group that could contain timestamps beyond t. 760func (data *Data) TruncateShardGroups(t time.Time) { 761 for i := range data.Databases { 762 dbi := &data.Databases[i] 763 764 for j := range dbi.RetentionPolicies { 765 rpi := &dbi.RetentionPolicies[j] 766 767 for k := range rpi.ShardGroups { 768 sgi := &rpi.ShardGroups[k] 769 770 if !t.Before(sgi.EndTime) || sgi.Deleted() || (sgi.Truncated() && sgi.TruncatedAt.Before(t)) { 771 continue 772 } 773 774 if !t.After(sgi.StartTime) { 775 // future shardgroup 776 sgi.TruncatedAt = sgi.StartTime 777 } else { 778 sgi.TruncatedAt = t 779 } 780 } 781 } 782 } 783} 784 785// hasAdminUser exhaustively checks for the presence of at least one admin 786// user. 787func (data *Data) hasAdminUser() bool { 788 for _, u := range data.Users { 789 if u.Admin { 790 return true 791 } 792 } 793 return false 794} 795 796// ImportData imports selected data into the current metadata. 797// if non-empty, backupDBName, restoreDBName, backupRPName, restoreRPName can be used to select DB metadata from other, 798// and to assign a new name to the imported data. Returns a map of shard ID's in the old metadata to new shard ID's 799// in the new metadata, along with a list of new databases created, both of which can assist in the import of existing 800// shard data during a database restore. 801func (data *Data) ImportData(other Data, backupDBName, restoreDBName, backupRPName, restoreRPName string) (map[uint64]uint64, []string, error) { 802 shardIDMap := make(map[uint64]uint64) 803 if backupDBName != "" { 804 dbName, err := data.importOneDB(other, backupDBName, restoreDBName, backupRPName, restoreRPName, shardIDMap) 805 if err != nil { 806 return nil, nil, err 807 } 808 809 return shardIDMap, []string{dbName}, nil 810 } 811 812 // if no backupDBName then we'll try to import all the DB's. If one of them fails, we'll mark the whole 813 // operation a failure and return an error. 814 var newDBs []string 815 for _, dbi := range other.Databases { 816 if dbi.Name == "_internal" { 817 continue 818 } 819 dbName, err := data.importOneDB(other, dbi.Name, "", "", "", shardIDMap) 820 if err != nil { 821 return nil, nil, err 822 } 823 newDBs = append(newDBs, dbName) 824 } 825 return shardIDMap, newDBs, nil 826} 827 828// importOneDB imports a single database/rp from an external metadata object, renaming them if new names are provided. 829func (data *Data) importOneDB(other Data, backupDBName, restoreDBName, backupRPName, restoreRPName string, shardIDMap map[uint64]uint64) (string, error) { 830 831 dbPtr := other.Database(backupDBName) 832 if dbPtr == nil { 833 return "", fmt.Errorf("imported metadata does not have datbase named %s", backupDBName) 834 } 835 836 if restoreDBName == "" { 837 restoreDBName = backupDBName 838 } 839 840 if data.Database(restoreDBName) != nil { 841 return "", errors.New("database already exists") 842 } 843 844 // change the names if we want/need to 845 err := data.CreateDatabase(restoreDBName) 846 if err != nil { 847 return "", err 848 } 849 dbImport := data.Database(restoreDBName) 850 851 if backupRPName != "" { 852 rpPtr := dbPtr.RetentionPolicy(backupRPName) 853 854 if rpPtr != nil { 855 rpImport := rpPtr.clone() 856 if restoreRPName == "" { 857 restoreRPName = backupRPName 858 } 859 rpImport.Name = restoreRPName 860 dbImport.RetentionPolicies = []RetentionPolicyInfo{rpImport} 861 dbImport.DefaultRetentionPolicy = restoreRPName 862 } else { 863 return "", fmt.Errorf("retention Policy not found in meta backup: %s.%s", backupDBName, backupRPName) 864 } 865 866 } else { // import all RP's without renaming 867 dbImport.DefaultRetentionPolicy = dbPtr.DefaultRetentionPolicy 868 if dbPtr.RetentionPolicies != nil { 869 dbImport.RetentionPolicies = make([]RetentionPolicyInfo, len(dbPtr.RetentionPolicies)) 870 for i := range dbPtr.RetentionPolicies { 871 dbImport.RetentionPolicies[i] = dbPtr.RetentionPolicies[i].clone() 872 } 873 } 874 875 } 876 877 // renumber the shard groups and shards for the new retention policy(ies) 878 for _, rpImport := range dbImport.RetentionPolicies { 879 for j, sgImport := range rpImport.ShardGroups { 880 data.MaxShardGroupID++ 881 rpImport.ShardGroups[j].ID = data.MaxShardGroupID 882 for k := range sgImport.Shards { 883 data.MaxShardID++ 884 shardIDMap[sgImport.Shards[k].ID] = data.MaxShardID 885 sgImport.Shards[k].ID = data.MaxShardID 886 // OSS doesn't use Owners but if we are importing this from Enterprise, we'll want to clear it out 887 // to avoid any issues if they ever export this DB again to bring back to Enterprise. 888 sgImport.Shards[k].Owners = []ShardOwner{} 889 } 890 } 891 } 892 893 return restoreDBName, nil 894} 895 896// NodeInfo represents information about a single node in the cluster. 897type NodeInfo struct { 898 ID uint64 899 Host string 900 TCPHost string 901} 902 903// NodeInfos is a slice of NodeInfo used for sorting 904type NodeInfos []NodeInfo 905 906// Len implements sort.Interface. 907func (n NodeInfos) Len() int { return len(n) } 908 909// Swap implements sort.Interface. 910func (n NodeInfos) Swap(i, j int) { n[i], n[j] = n[j], n[i] } 911 912// Less implements sort.Interface. 913func (n NodeInfos) Less(i, j int) bool { return n[i].ID < n[j].ID } 914 915// DatabaseInfo represents information about a database in the system. 916type DatabaseInfo struct { 917 Name string 918 DefaultRetentionPolicy string 919 RetentionPolicies []RetentionPolicyInfo 920 ContinuousQueries []ContinuousQueryInfo 921} 922 923// RetentionPolicy returns a retention policy by name. 924func (di DatabaseInfo) RetentionPolicy(name string) *RetentionPolicyInfo { 925 if name == "" { 926 if di.DefaultRetentionPolicy == "" { 927 return nil 928 } 929 name = di.DefaultRetentionPolicy 930 } 931 932 for i := range di.RetentionPolicies { 933 if di.RetentionPolicies[i].Name == name { 934 return &di.RetentionPolicies[i] 935 } 936 } 937 return nil 938} 939 940// ShardInfos returns a list of all shards' info for the database. 941func (di DatabaseInfo) ShardInfos() []ShardInfo { 942 shards := map[uint64]*ShardInfo{} 943 for i := range di.RetentionPolicies { 944 for j := range di.RetentionPolicies[i].ShardGroups { 945 sg := di.RetentionPolicies[i].ShardGroups[j] 946 // Skip deleted shard groups 947 if sg.Deleted() { 948 continue 949 } 950 for k := range sg.Shards { 951 si := &di.RetentionPolicies[i].ShardGroups[j].Shards[k] 952 shards[si.ID] = si 953 } 954 } 955 } 956 957 infos := make([]ShardInfo, 0, len(shards)) 958 for _, info := range shards { 959 infos = append(infos, *info) 960 } 961 962 return infos 963} 964 965// clone returns a deep copy of di. 966func (di DatabaseInfo) clone() DatabaseInfo { 967 other := di 968 969 if di.RetentionPolicies != nil { 970 other.RetentionPolicies = make([]RetentionPolicyInfo, len(di.RetentionPolicies)) 971 for i := range di.RetentionPolicies { 972 other.RetentionPolicies[i] = di.RetentionPolicies[i].clone() 973 } 974 } 975 976 // Copy continuous queries. 977 if di.ContinuousQueries != nil { 978 other.ContinuousQueries = make([]ContinuousQueryInfo, len(di.ContinuousQueries)) 979 for i := range di.ContinuousQueries { 980 other.ContinuousQueries[i] = di.ContinuousQueries[i].clone() 981 } 982 } 983 984 return other 985} 986 987// marshal serializes to a protobuf representation. 988func (di DatabaseInfo) marshal() *internal.DatabaseInfo { 989 pb := &internal.DatabaseInfo{} 990 pb.Name = proto.String(di.Name) 991 pb.DefaultRetentionPolicy = proto.String(di.DefaultRetentionPolicy) 992 993 pb.RetentionPolicies = make([]*internal.RetentionPolicyInfo, len(di.RetentionPolicies)) 994 for i := range di.RetentionPolicies { 995 pb.RetentionPolicies[i] = di.RetentionPolicies[i].marshal() 996 } 997 998 pb.ContinuousQueries = make([]*internal.ContinuousQueryInfo, len(di.ContinuousQueries)) 999 for i := range di.ContinuousQueries { 1000 pb.ContinuousQueries[i] = di.ContinuousQueries[i].marshal() 1001 } 1002 return pb 1003} 1004 1005// unmarshal deserializes from a protobuf representation. 1006func (di *DatabaseInfo) unmarshal(pb *internal.DatabaseInfo) { 1007 di.Name = pb.GetName() 1008 di.DefaultRetentionPolicy = pb.GetDefaultRetentionPolicy() 1009 1010 if len(pb.GetRetentionPolicies()) > 0 { 1011 di.RetentionPolicies = make([]RetentionPolicyInfo, len(pb.GetRetentionPolicies())) 1012 for i, x := range pb.GetRetentionPolicies() { 1013 di.RetentionPolicies[i].unmarshal(x) 1014 } 1015 } 1016 1017 if len(pb.GetContinuousQueries()) > 0 { 1018 di.ContinuousQueries = make([]ContinuousQueryInfo, len(pb.GetContinuousQueries())) 1019 for i, x := range pb.GetContinuousQueries() { 1020 di.ContinuousQueries[i].unmarshal(x) 1021 } 1022 } 1023} 1024 1025// RetentionPolicySpec represents the specification for a new retention policy. 1026type RetentionPolicySpec struct { 1027 Name string 1028 ReplicaN *int 1029 Duration *time.Duration 1030 ShardGroupDuration time.Duration 1031} 1032 1033// NewRetentionPolicyInfo creates a new retention policy info from the specification. 1034func (s *RetentionPolicySpec) NewRetentionPolicyInfo() *RetentionPolicyInfo { 1035 return DefaultRetentionPolicyInfo().Apply(s) 1036} 1037 1038// Matches checks if this retention policy specification matches 1039// an existing retention policy. 1040func (s *RetentionPolicySpec) Matches(rpi *RetentionPolicyInfo) bool { 1041 if rpi == nil { 1042 return false 1043 } else if s.Name != "" && s.Name != rpi.Name { 1044 return false 1045 } else if s.Duration != nil && *s.Duration != rpi.Duration { 1046 return false 1047 } else if s.ReplicaN != nil && *s.ReplicaN != rpi.ReplicaN { 1048 return false 1049 } 1050 1051 // Normalise ShardDuration before comparing to any existing retention policies. 1052 // Normalize with the retention policy info's duration instead of the spec 1053 // since they should be the same and we're performing a comparison. 1054 sgDuration := normalisedShardDuration(s.ShardGroupDuration, rpi.Duration) 1055 return sgDuration == rpi.ShardGroupDuration 1056} 1057 1058// marshal serializes to a protobuf representation. 1059func (s *RetentionPolicySpec) marshal() *internal.RetentionPolicySpec { 1060 pb := &internal.RetentionPolicySpec{} 1061 if s.Name != "" { 1062 pb.Name = proto.String(s.Name) 1063 } 1064 if s.Duration != nil { 1065 pb.Duration = proto.Int64(int64(*s.Duration)) 1066 } 1067 if s.ShardGroupDuration > 0 { 1068 pb.ShardGroupDuration = proto.Int64(int64(s.ShardGroupDuration)) 1069 } 1070 if s.ReplicaN != nil { 1071 pb.ReplicaN = proto.Uint32(uint32(*s.ReplicaN)) 1072 } 1073 return pb 1074} 1075 1076// unmarshal deserializes from a protobuf representation. 1077func (s *RetentionPolicySpec) unmarshal(pb *internal.RetentionPolicySpec) { 1078 if pb.Name != nil { 1079 s.Name = pb.GetName() 1080 } 1081 if pb.Duration != nil { 1082 duration := time.Duration(pb.GetDuration()) 1083 s.Duration = &duration 1084 } 1085 if pb.ShardGroupDuration != nil { 1086 s.ShardGroupDuration = time.Duration(pb.GetShardGroupDuration()) 1087 } 1088 if pb.ReplicaN != nil { 1089 replicaN := int(pb.GetReplicaN()) 1090 s.ReplicaN = &replicaN 1091 } 1092} 1093 1094// MarshalBinary encodes RetentionPolicySpec to a binary format. 1095func (s *RetentionPolicySpec) MarshalBinary() ([]byte, error) { 1096 return proto.Marshal(s.marshal()) 1097} 1098 1099// UnmarshalBinary decodes RetentionPolicySpec from a binary format. 1100func (s *RetentionPolicySpec) UnmarshalBinary(data []byte) error { 1101 var pb internal.RetentionPolicySpec 1102 if err := proto.Unmarshal(data, &pb); err != nil { 1103 return err 1104 } 1105 s.unmarshal(&pb) 1106 return nil 1107} 1108 1109// RetentionPolicyInfo represents metadata about a retention policy. 1110type RetentionPolicyInfo struct { 1111 Name string 1112 ReplicaN int 1113 Duration time.Duration 1114 ShardGroupDuration time.Duration 1115 ShardGroups []ShardGroupInfo 1116 Subscriptions []SubscriptionInfo 1117} 1118 1119// NewRetentionPolicyInfo returns a new instance of RetentionPolicyInfo 1120// with default replication and duration. 1121func NewRetentionPolicyInfo(name string) *RetentionPolicyInfo { 1122 return &RetentionPolicyInfo{ 1123 Name: name, 1124 ReplicaN: DefaultRetentionPolicyReplicaN, 1125 Duration: DefaultRetentionPolicyDuration, 1126 } 1127} 1128 1129// DefaultRetentionPolicyInfo returns a new instance of RetentionPolicyInfo 1130// with default name, replication, and duration. 1131func DefaultRetentionPolicyInfo() *RetentionPolicyInfo { 1132 return NewRetentionPolicyInfo(DefaultRetentionPolicyName) 1133} 1134 1135// Apply applies a specification to the retention policy info. 1136func (rpi *RetentionPolicyInfo) Apply(spec *RetentionPolicySpec) *RetentionPolicyInfo { 1137 rp := &RetentionPolicyInfo{ 1138 Name: rpi.Name, 1139 ReplicaN: rpi.ReplicaN, 1140 Duration: rpi.Duration, 1141 ShardGroupDuration: rpi.ShardGroupDuration, 1142 } 1143 if spec.Name != "" { 1144 rp.Name = spec.Name 1145 } 1146 if spec.ReplicaN != nil { 1147 rp.ReplicaN = *spec.ReplicaN 1148 } 1149 if spec.Duration != nil { 1150 rp.Duration = *spec.Duration 1151 } 1152 rp.ShardGroupDuration = normalisedShardDuration(spec.ShardGroupDuration, rp.Duration) 1153 return rp 1154} 1155 1156// ShardGroupByTimestamp returns the shard group in the policy that contains the timestamp, 1157// or nil if no shard group matches. 1158func (rpi *RetentionPolicyInfo) ShardGroupByTimestamp(timestamp time.Time) *ShardGroupInfo { 1159 for i := range rpi.ShardGroups { 1160 sgi := &rpi.ShardGroups[i] 1161 if sgi.Contains(timestamp) && !sgi.Deleted() && (!sgi.Truncated() || timestamp.Before(sgi.TruncatedAt)) { 1162 return &rpi.ShardGroups[i] 1163 } 1164 } 1165 1166 return nil 1167} 1168 1169// ExpiredShardGroups returns the Shard Groups which are considered expired, for the given time. 1170func (rpi *RetentionPolicyInfo) ExpiredShardGroups(t time.Time) []*ShardGroupInfo { 1171 var groups = make([]*ShardGroupInfo, 0) 1172 for i := range rpi.ShardGroups { 1173 if rpi.ShardGroups[i].Deleted() { 1174 continue 1175 } 1176 if rpi.Duration != 0 && rpi.ShardGroups[i].EndTime.Add(rpi.Duration).Before(t) { 1177 groups = append(groups, &rpi.ShardGroups[i]) 1178 } 1179 } 1180 return groups 1181} 1182 1183// DeletedShardGroups returns the Shard Groups which are marked as deleted. 1184func (rpi *RetentionPolicyInfo) DeletedShardGroups() []*ShardGroupInfo { 1185 var groups = make([]*ShardGroupInfo, 0) 1186 for i := range rpi.ShardGroups { 1187 if rpi.ShardGroups[i].Deleted() { 1188 groups = append(groups, &rpi.ShardGroups[i]) 1189 } 1190 } 1191 return groups 1192} 1193 1194// marshal serializes to a protobuf representation. 1195func (rpi *RetentionPolicyInfo) marshal() *internal.RetentionPolicyInfo { 1196 pb := &internal.RetentionPolicyInfo{ 1197 Name: proto.String(rpi.Name), 1198 ReplicaN: proto.Uint32(uint32(rpi.ReplicaN)), 1199 Duration: proto.Int64(int64(rpi.Duration)), 1200 ShardGroupDuration: proto.Int64(int64(rpi.ShardGroupDuration)), 1201 } 1202 1203 pb.ShardGroups = make([]*internal.ShardGroupInfo, len(rpi.ShardGroups)) 1204 for i, sgi := range rpi.ShardGroups { 1205 pb.ShardGroups[i] = sgi.marshal() 1206 } 1207 1208 pb.Subscriptions = make([]*internal.SubscriptionInfo, len(rpi.Subscriptions)) 1209 for i, sub := range rpi.Subscriptions { 1210 pb.Subscriptions[i] = sub.marshal() 1211 } 1212 1213 return pb 1214} 1215 1216// unmarshal deserializes from a protobuf representation. 1217func (rpi *RetentionPolicyInfo) unmarshal(pb *internal.RetentionPolicyInfo) { 1218 rpi.Name = pb.GetName() 1219 rpi.ReplicaN = int(pb.GetReplicaN()) 1220 rpi.Duration = time.Duration(pb.GetDuration()) 1221 rpi.ShardGroupDuration = time.Duration(pb.GetShardGroupDuration()) 1222 1223 if len(pb.GetShardGroups()) > 0 { 1224 rpi.ShardGroups = make([]ShardGroupInfo, len(pb.GetShardGroups())) 1225 for i, x := range pb.GetShardGroups() { 1226 rpi.ShardGroups[i].unmarshal(x) 1227 } 1228 } 1229 if len(pb.GetSubscriptions()) > 0 { 1230 rpi.Subscriptions = make([]SubscriptionInfo, len(pb.GetSubscriptions())) 1231 for i, x := range pb.GetSubscriptions() { 1232 rpi.Subscriptions[i].unmarshal(x) 1233 } 1234 } 1235} 1236 1237// clone returns a deep copy of rpi. 1238func (rpi RetentionPolicyInfo) clone() RetentionPolicyInfo { 1239 other := rpi 1240 1241 if rpi.ShardGroups != nil { 1242 other.ShardGroups = make([]ShardGroupInfo, len(rpi.ShardGroups)) 1243 for i := range rpi.ShardGroups { 1244 other.ShardGroups[i] = rpi.ShardGroups[i].clone() 1245 } 1246 } 1247 1248 return other 1249} 1250 1251// MarshalBinary encodes rpi to a binary format. 1252func (rpi *RetentionPolicyInfo) MarshalBinary() ([]byte, error) { 1253 return proto.Marshal(rpi.marshal()) 1254} 1255 1256// UnmarshalBinary decodes rpi from a binary format. 1257func (rpi *RetentionPolicyInfo) UnmarshalBinary(data []byte) error { 1258 var pb internal.RetentionPolicyInfo 1259 if err := proto.Unmarshal(data, &pb); err != nil { 1260 return err 1261 } 1262 rpi.unmarshal(&pb) 1263 return nil 1264} 1265 1266// shardGroupDuration returns the default duration for a shard group based on a policy duration. 1267func shardGroupDuration(d time.Duration) time.Duration { 1268 if d >= 180*24*time.Hour || d == 0 { // 6 months or 0 1269 return 7 * 24 * time.Hour 1270 } else if d >= 2*24*time.Hour { // 2 days 1271 return 1 * 24 * time.Hour 1272 } 1273 return 1 * time.Hour 1274} 1275 1276// normalisedShardDuration returns normalised shard duration based on a policy duration. 1277func normalisedShardDuration(sgd, d time.Duration) time.Duration { 1278 // If it is zero, it likely wasn't specified, so we default to the shard group duration 1279 if sgd == 0 { 1280 return shardGroupDuration(d) 1281 } 1282 // If it was specified, but it's less than the MinRetentionPolicyDuration, then normalize 1283 // to the MinRetentionPolicyDuration 1284 if sgd < MinRetentionPolicyDuration { 1285 return shardGroupDuration(MinRetentionPolicyDuration) 1286 } 1287 return sgd 1288} 1289 1290// ShardGroupInfo represents metadata about a shard group. The DeletedAt field is important 1291// because it makes it clear that a ShardGroup has been marked as deleted, and allow the system 1292// to be sure that a ShardGroup is not simply missing. If the DeletedAt is set, the system can 1293// safely delete any associated shards. 1294type ShardGroupInfo struct { 1295 ID uint64 1296 StartTime time.Time 1297 EndTime time.Time 1298 DeletedAt time.Time 1299 Shards []ShardInfo 1300 TruncatedAt time.Time 1301} 1302 1303// ShardGroupInfos implements sort.Interface on []ShardGroupInfo, based 1304// on the StartTime field. 1305type ShardGroupInfos []ShardGroupInfo 1306 1307// Len implements sort.Interface. 1308func (a ShardGroupInfos) Len() int { return len(a) } 1309 1310// Swap implements sort.Interface. 1311func (a ShardGroupInfos) Swap(i, j int) { a[i], a[j] = a[j], a[i] } 1312 1313// Less implements sort.Interface. 1314func (a ShardGroupInfos) Less(i, j int) bool { 1315 iEnd := a[i].EndTime 1316 if a[i].Truncated() { 1317 iEnd = a[i].TruncatedAt 1318 } 1319 1320 jEnd := a[j].EndTime 1321 if a[j].Truncated() { 1322 jEnd = a[j].TruncatedAt 1323 } 1324 1325 if iEnd.Equal(jEnd) { 1326 return a[i].StartTime.Before(a[j].StartTime) 1327 } 1328 1329 return iEnd.Before(jEnd) 1330} 1331 1332// Contains returns true iif StartTime ≤ t < EndTime. 1333func (sgi *ShardGroupInfo) Contains(t time.Time) bool { 1334 return !t.Before(sgi.StartTime) && t.Before(sgi.EndTime) 1335} 1336 1337// Overlaps returns whether the shard group contains data for the time range between min and max 1338func (sgi *ShardGroupInfo) Overlaps(min, max time.Time) bool { 1339 return !sgi.StartTime.After(max) && sgi.EndTime.After(min) 1340} 1341 1342// Deleted returns whether this ShardGroup has been deleted. 1343func (sgi *ShardGroupInfo) Deleted() bool { 1344 return !sgi.DeletedAt.IsZero() 1345} 1346 1347// Truncated returns true if this ShardGroup has been truncated (no new writes). 1348func (sgi *ShardGroupInfo) Truncated() bool { 1349 return !sgi.TruncatedAt.IsZero() 1350} 1351 1352// clone returns a deep copy of sgi. 1353func (sgi ShardGroupInfo) clone() ShardGroupInfo { 1354 other := sgi 1355 1356 if sgi.Shards != nil { 1357 other.Shards = make([]ShardInfo, len(sgi.Shards)) 1358 for i := range sgi.Shards { 1359 other.Shards[i] = sgi.Shards[i].clone() 1360 } 1361 } 1362 1363 return other 1364} 1365 1366// ShardFor returns the ShardInfo for a Point hash. 1367func (sgi *ShardGroupInfo) ShardFor(hash uint64) ShardInfo { 1368 return sgi.Shards[hash%uint64(len(sgi.Shards))] 1369} 1370 1371// marshal serializes to a protobuf representation. 1372func (sgi *ShardGroupInfo) marshal() *internal.ShardGroupInfo { 1373 pb := &internal.ShardGroupInfo{ 1374 ID: proto.Uint64(sgi.ID), 1375 StartTime: proto.Int64(MarshalTime(sgi.StartTime)), 1376 EndTime: proto.Int64(MarshalTime(sgi.EndTime)), 1377 DeletedAt: proto.Int64(MarshalTime(sgi.DeletedAt)), 1378 } 1379 1380 if !sgi.TruncatedAt.IsZero() { 1381 pb.TruncatedAt = proto.Int64(MarshalTime(sgi.TruncatedAt)) 1382 } 1383 1384 pb.Shards = make([]*internal.ShardInfo, len(sgi.Shards)) 1385 for i := range sgi.Shards { 1386 pb.Shards[i] = sgi.Shards[i].marshal() 1387 } 1388 1389 return pb 1390} 1391 1392// unmarshal deserializes from a protobuf representation. 1393func (sgi *ShardGroupInfo) unmarshal(pb *internal.ShardGroupInfo) { 1394 sgi.ID = pb.GetID() 1395 if i := pb.GetStartTime(); i == 0 { 1396 sgi.StartTime = time.Unix(0, 0).UTC() 1397 } else { 1398 sgi.StartTime = UnmarshalTime(i) 1399 } 1400 if i := pb.GetEndTime(); i == 0 { 1401 sgi.EndTime = time.Unix(0, 0).UTC() 1402 } else { 1403 sgi.EndTime = UnmarshalTime(i) 1404 } 1405 sgi.DeletedAt = UnmarshalTime(pb.GetDeletedAt()) 1406 1407 if pb != nil && pb.TruncatedAt != nil { 1408 sgi.TruncatedAt = UnmarshalTime(pb.GetTruncatedAt()) 1409 } 1410 1411 if len(pb.GetShards()) > 0 { 1412 sgi.Shards = make([]ShardInfo, len(pb.GetShards())) 1413 for i, x := range pb.GetShards() { 1414 sgi.Shards[i].unmarshal(x) 1415 } 1416 } 1417} 1418 1419// ShardInfo represents metadata about a shard. 1420type ShardInfo struct { 1421 ID uint64 1422 Owners []ShardOwner 1423} 1424 1425// OwnedBy determines whether the shard's owner IDs includes nodeID. 1426func (si ShardInfo) OwnedBy(nodeID uint64) bool { 1427 for _, so := range si.Owners { 1428 if so.NodeID == nodeID { 1429 return true 1430 } 1431 } 1432 return false 1433} 1434 1435// clone returns a deep copy of si. 1436func (si ShardInfo) clone() ShardInfo { 1437 other := si 1438 1439 if si.Owners != nil { 1440 other.Owners = make([]ShardOwner, len(si.Owners)) 1441 for i := range si.Owners { 1442 other.Owners[i] = si.Owners[i].clone() 1443 } 1444 } 1445 1446 return other 1447} 1448 1449// marshal serializes to a protobuf representation. 1450func (si ShardInfo) marshal() *internal.ShardInfo { 1451 pb := &internal.ShardInfo{ 1452 ID: proto.Uint64(si.ID), 1453 } 1454 1455 pb.Owners = make([]*internal.ShardOwner, len(si.Owners)) 1456 for i := range si.Owners { 1457 pb.Owners[i] = si.Owners[i].marshal() 1458 } 1459 1460 return pb 1461} 1462 1463// UnmarshalBinary decodes the object from a binary format. 1464func (si *ShardInfo) UnmarshalBinary(buf []byte) error { 1465 var pb internal.ShardInfo 1466 if err := proto.Unmarshal(buf, &pb); err != nil { 1467 return err 1468 } 1469 si.unmarshal(&pb) 1470 return nil 1471} 1472 1473// unmarshal deserializes from a protobuf representation. 1474func (si *ShardInfo) unmarshal(pb *internal.ShardInfo) { 1475 si.ID = pb.GetID() 1476 1477 // If deprecated "OwnerIDs" exists then convert it to "Owners" format. 1478 if len(pb.GetOwnerIDs()) > 0 { 1479 si.Owners = make([]ShardOwner, len(pb.GetOwnerIDs())) 1480 for i, x := range pb.GetOwnerIDs() { 1481 si.Owners[i].unmarshal(&internal.ShardOwner{ 1482 NodeID: proto.Uint64(x), 1483 }) 1484 } 1485 } else if len(pb.GetOwners()) > 0 { 1486 si.Owners = make([]ShardOwner, len(pb.GetOwners())) 1487 for i, x := range pb.GetOwners() { 1488 si.Owners[i].unmarshal(x) 1489 } 1490 } 1491} 1492 1493// SubscriptionInfo holds the subscription information. 1494type SubscriptionInfo struct { 1495 Name string 1496 Mode string 1497 Destinations []string 1498} 1499 1500// marshal serializes to a protobuf representation. 1501func (si SubscriptionInfo) marshal() *internal.SubscriptionInfo { 1502 pb := &internal.SubscriptionInfo{ 1503 Name: proto.String(si.Name), 1504 Mode: proto.String(si.Mode), 1505 } 1506 1507 pb.Destinations = make([]string, len(si.Destinations)) 1508 for i := range si.Destinations { 1509 pb.Destinations[i] = si.Destinations[i] 1510 } 1511 return pb 1512} 1513 1514// unmarshal deserializes from a protobuf representation. 1515func (si *SubscriptionInfo) unmarshal(pb *internal.SubscriptionInfo) { 1516 si.Name = pb.GetName() 1517 si.Mode = pb.GetMode() 1518 1519 if len(pb.GetDestinations()) > 0 { 1520 si.Destinations = make([]string, len(pb.GetDestinations())) 1521 copy(si.Destinations, pb.GetDestinations()) 1522 } 1523} 1524 1525// ShardOwner represents a node that owns a shard. 1526type ShardOwner struct { 1527 NodeID uint64 1528} 1529 1530// clone returns a deep copy of so. 1531func (so ShardOwner) clone() ShardOwner { 1532 return so 1533} 1534 1535// marshal serializes to a protobuf representation. 1536func (so ShardOwner) marshal() *internal.ShardOwner { 1537 return &internal.ShardOwner{ 1538 NodeID: proto.Uint64(so.NodeID), 1539 } 1540} 1541 1542// unmarshal deserializes from a protobuf representation. 1543func (so *ShardOwner) unmarshal(pb *internal.ShardOwner) { 1544 so.NodeID = pb.GetNodeID() 1545} 1546 1547// ContinuousQueryInfo represents metadata about a continuous query. 1548type ContinuousQueryInfo struct { 1549 Name string 1550 Query string 1551} 1552 1553// clone returns a deep copy of cqi. 1554func (cqi ContinuousQueryInfo) clone() ContinuousQueryInfo { return cqi } 1555 1556// marshal serializes to a protobuf representation. 1557func (cqi ContinuousQueryInfo) marshal() *internal.ContinuousQueryInfo { 1558 return &internal.ContinuousQueryInfo{ 1559 Name: proto.String(cqi.Name), 1560 Query: proto.String(cqi.Query), 1561 } 1562} 1563 1564// unmarshal deserializes from a protobuf representation. 1565func (cqi *ContinuousQueryInfo) unmarshal(pb *internal.ContinuousQueryInfo) { 1566 cqi.Name = pb.GetName() 1567 cqi.Query = pb.GetQuery() 1568} 1569 1570var _ query.Authorizer = (*UserInfo)(nil) 1571 1572// UserInfo represents metadata about a user in the system. 1573type UserInfo struct { 1574 // User's name. 1575 Name string 1576 1577 // Hashed password. 1578 Hash string 1579 1580 // Whether the user is an admin, i.e. allowed to do everything. 1581 Admin bool 1582 1583 // Map of database name to granted privilege. 1584 Privileges map[string]influxql.Privilege 1585} 1586 1587type User interface { 1588 query.Authorizer 1589 ID() string 1590 AuthorizeUnrestricted() bool 1591} 1592 1593func (u *UserInfo) ID() string { 1594 return u.Name 1595} 1596 1597// AuthorizeDatabase returns true if the user is authorized for the given privilege on the given database. 1598func (ui *UserInfo) AuthorizeDatabase(privilege influxql.Privilege, database string) bool { 1599 if ui.Admin || privilege == influxql.NoPrivileges { 1600 return true 1601 } 1602 p, ok := ui.Privileges[database] 1603 return ok && (p == privilege || p == influxql.AllPrivileges) 1604} 1605 1606// AuthorizeSeriesRead is used to limit access per-series (enterprise only) 1607func (u *UserInfo) AuthorizeSeriesRead(database string, measurement []byte, tags models.Tags) bool { 1608 return true 1609} 1610 1611// AuthorizeSeriesWrite is used to limit access per-series (enterprise only) 1612func (u *UserInfo) AuthorizeSeriesWrite(database string, measurement []byte, tags models.Tags) bool { 1613 return true 1614} 1615 1616// AuthorizeUnrestricted allows admins to shortcut access checks. 1617func (u *UserInfo) AuthorizeUnrestricted() bool { 1618 return u.Admin 1619} 1620 1621// clone returns a deep copy of si. 1622func (ui UserInfo) clone() UserInfo { 1623 other := ui 1624 1625 if ui.Privileges != nil { 1626 other.Privileges = make(map[string]influxql.Privilege) 1627 for k, v := range ui.Privileges { 1628 other.Privileges[k] = v 1629 } 1630 } 1631 1632 return other 1633} 1634 1635// marshal serializes to a protobuf representation. 1636func (ui UserInfo) marshal() *internal.UserInfo { 1637 pb := &internal.UserInfo{ 1638 Name: proto.String(ui.Name), 1639 Hash: proto.String(ui.Hash), 1640 Admin: proto.Bool(ui.Admin), 1641 } 1642 1643 for database, privilege := range ui.Privileges { 1644 pb.Privileges = append(pb.Privileges, &internal.UserPrivilege{ 1645 Database: proto.String(database), 1646 Privilege: proto.Int32(int32(privilege)), 1647 }) 1648 } 1649 1650 return pb 1651} 1652 1653// unmarshal deserializes from a protobuf representation. 1654func (ui *UserInfo) unmarshal(pb *internal.UserInfo) { 1655 ui.Name = pb.GetName() 1656 ui.Hash = pb.GetHash() 1657 ui.Admin = pb.GetAdmin() 1658 1659 ui.Privileges = make(map[string]influxql.Privilege) 1660 for _, p := range pb.GetPrivileges() { 1661 ui.Privileges[p.GetDatabase()] = influxql.Privilege(p.GetPrivilege()) 1662 } 1663} 1664 1665// Lease represents a lease held on a resource. 1666type Lease struct { 1667 Name string `json:"name"` 1668 Expiration time.Time `json:"expiration"` 1669 Owner uint64 `json:"owner"` 1670} 1671 1672// Leases is a concurrency-safe collection of leases keyed by name. 1673type Leases struct { 1674 mu sync.Mutex 1675 m map[string]*Lease 1676 d time.Duration 1677} 1678 1679// NewLeases returns a new instance of Leases. 1680func NewLeases(d time.Duration) *Leases { 1681 return &Leases{ 1682 m: make(map[string]*Lease), 1683 d: d, 1684 } 1685} 1686 1687// Acquire acquires a lease with the given name for the given nodeID. 1688// If the lease doesn't exist or exists but is expired, a valid lease is returned. 1689// If nodeID already owns the named and unexpired lease, the lease expiration is extended. 1690// If a different node owns the lease, an error is returned. 1691func (leases *Leases) Acquire(name string, nodeID uint64) (*Lease, error) { 1692 leases.mu.Lock() 1693 defer leases.mu.Unlock() 1694 1695 l := leases.m[name] 1696 if l != nil { 1697 if time.Now().After(l.Expiration) || l.Owner == nodeID { 1698 l.Expiration = time.Now().Add(leases.d) 1699 l.Owner = nodeID 1700 return l, nil 1701 } 1702 return l, errors.New("another node has the lease") 1703 } 1704 1705 l = &Lease{ 1706 Name: name, 1707 Expiration: time.Now().Add(leases.d), 1708 Owner: nodeID, 1709 } 1710 1711 leases.m[name] = l 1712 1713 return l, nil 1714} 1715 1716// MarshalTime converts t to nanoseconds since epoch. A zero time returns 0. 1717func MarshalTime(t time.Time) int64 { 1718 if t.IsZero() { 1719 return 0 1720 } 1721 return t.UnixNano() 1722} 1723 1724// UnmarshalTime converts nanoseconds since epoch to time. 1725// A zero value returns a zero time. 1726func UnmarshalTime(v int64) time.Time { 1727 if v == 0 { 1728 return time.Time{} 1729 } 1730 return time.Unix(0, v).UTC() 1731} 1732 1733// ValidName checks to see if the given name can would be valid for DB/RP name 1734func ValidName(name string) bool { 1735 for _, r := range name { 1736 if !unicode.IsPrint(r) { 1737 return false 1738 } 1739 } 1740 1741 return name != "" && 1742 name != "." && 1743 name != ".." && 1744 !strings.ContainsAny(name, `/\`) 1745} 1746