1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package membership 16 17import ( 18 "bytes" 19 "context" 20 "crypto/sha1" 21 "encoding/binary" 22 "encoding/json" 23 "fmt" 24 "path" 25 "sort" 26 "strings" 27 "sync" 28 "time" 29 30 "go.etcd.io/etcd/etcdserver/api/v2store" 31 "go.etcd.io/etcd/mvcc/backend" 32 "go.etcd.io/etcd/pkg/netutil" 33 "go.etcd.io/etcd/pkg/types" 34 "go.etcd.io/etcd/raft" 35 "go.etcd.io/etcd/raft/raftpb" 36 "go.etcd.io/etcd/version" 37 38 "github.com/coreos/go-semver/semver" 39 "github.com/prometheus/client_golang/prometheus" 40 "go.uber.org/zap" 41) 42 43const maxLearners = 1 44 45// RaftCluster is a list of Members that belong to the same raft cluster 46type RaftCluster struct { 47 lg *zap.Logger 48 49 localID types.ID 50 cid types.ID 51 token string 52 53 v2store v2store.Store 54 be backend.Backend 55 56 sync.Mutex // guards the fields below 57 version *semver.Version 58 members map[types.ID]*Member 59 // removed contains the ids of removed members in the cluster. 60 // removed id cannot be reused. 61 removed map[types.ID]bool 62} 63 64// ConfigChangeContext represents a context for confChange. 65type ConfigChangeContext struct { 66 Member 67 // IsPromote indicates if the config change is for promoting a learner member. 68 // This flag is needed because both adding a new member and promoting a learner member 69 // uses the same config change type 'ConfChangeAddNode'. 70 IsPromote bool `json:"isPromote"` 71} 72 73// NewClusterFromURLsMap creates a new raft cluster using provided urls map. Currently, it does not support creating 74// cluster with raft learner member. 75func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) { 76 c := NewCluster(lg, token) 77 for name, urls := range urlsmap { 78 m := NewMember(name, urls, token, nil) 79 if _, ok := c.members[m.ID]; ok { 80 return nil, fmt.Errorf("member exists with identical ID %v", m) 81 } 82 if uint64(m.ID) == raft.None { 83 return nil, fmt.Errorf("cannot use %x as member id", raft.None) 84 } 85 c.members[m.ID] = m 86 } 87 c.genID() 88 return c, nil 89} 90 91func NewClusterFromMembers(lg *zap.Logger, token string, id types.ID, membs []*Member) *RaftCluster { 92 c := NewCluster(lg, token) 93 c.cid = id 94 for _, m := range membs { 95 c.members[m.ID] = m 96 } 97 return c 98} 99 100func NewCluster(lg *zap.Logger, token string) *RaftCluster { 101 return &RaftCluster{ 102 lg: lg, 103 token: token, 104 members: make(map[types.ID]*Member), 105 removed: make(map[types.ID]bool), 106 } 107} 108 109func (c *RaftCluster) ID() types.ID { return c.cid } 110 111func (c *RaftCluster) Members() []*Member { 112 c.Lock() 113 defer c.Unlock() 114 var ms MembersByID 115 for _, m := range c.members { 116 ms = append(ms, m.Clone()) 117 } 118 sort.Sort(ms) 119 return []*Member(ms) 120} 121 122func (c *RaftCluster) Member(id types.ID) *Member { 123 c.Lock() 124 defer c.Unlock() 125 return c.members[id].Clone() 126} 127 128func (c *RaftCluster) VotingMembers() []*Member { 129 c.Lock() 130 defer c.Unlock() 131 var ms MembersByID 132 for _, m := range c.members { 133 if !m.IsLearner { 134 ms = append(ms, m.Clone()) 135 } 136 } 137 sort.Sort(ms) 138 return []*Member(ms) 139} 140 141// MemberByName returns a Member with the given name if exists. 142// If more than one member has the given name, it will panic. 143func (c *RaftCluster) MemberByName(name string) *Member { 144 c.Lock() 145 defer c.Unlock() 146 var memb *Member 147 for _, m := range c.members { 148 if m.Name == name { 149 if memb != nil { 150 if c.lg != nil { 151 c.lg.Panic("two member with same name found", zap.String("name", name)) 152 } else { 153 plog.Panicf("two members with the given name %q exist", name) 154 } 155 } 156 memb = m 157 } 158 } 159 return memb.Clone() 160} 161 162func (c *RaftCluster) MemberIDs() []types.ID { 163 c.Lock() 164 defer c.Unlock() 165 var ids []types.ID 166 for _, m := range c.members { 167 ids = append(ids, m.ID) 168 } 169 sort.Sort(types.IDSlice(ids)) 170 return ids 171} 172 173func (c *RaftCluster) IsIDRemoved(id types.ID) bool { 174 c.Lock() 175 defer c.Unlock() 176 return c.removed[id] 177} 178 179// PeerURLs returns a list of all peer addresses. 180// The returned list is sorted in ascending lexicographical order. 181func (c *RaftCluster) PeerURLs() []string { 182 c.Lock() 183 defer c.Unlock() 184 urls := make([]string, 0) 185 for _, p := range c.members { 186 urls = append(urls, p.PeerURLs...) 187 } 188 sort.Strings(urls) 189 return urls 190} 191 192// ClientURLs returns a list of all client addresses. 193// The returned list is sorted in ascending lexicographical order. 194func (c *RaftCluster) ClientURLs() []string { 195 c.Lock() 196 defer c.Unlock() 197 urls := make([]string, 0) 198 for _, p := range c.members { 199 urls = append(urls, p.ClientURLs...) 200 } 201 sort.Strings(urls) 202 return urls 203} 204 205func (c *RaftCluster) String() string { 206 c.Lock() 207 defer c.Unlock() 208 b := &bytes.Buffer{} 209 fmt.Fprintf(b, "{ClusterID:%s ", c.cid) 210 var ms []string 211 for _, m := range c.members { 212 ms = append(ms, fmt.Sprintf("%+v", m)) 213 } 214 fmt.Fprintf(b, "Members:[%s] ", strings.Join(ms, " ")) 215 var ids []string 216 for id := range c.removed { 217 ids = append(ids, id.String()) 218 } 219 fmt.Fprintf(b, "RemovedMemberIDs:[%s]}", strings.Join(ids, " ")) 220 return b.String() 221} 222 223func (c *RaftCluster) genID() { 224 mIDs := c.MemberIDs() 225 b := make([]byte, 8*len(mIDs)) 226 for i, id := range mIDs { 227 binary.BigEndian.PutUint64(b[8*i:], uint64(id)) 228 } 229 hash := sha1.Sum(b) 230 c.cid = types.ID(binary.BigEndian.Uint64(hash[:8])) 231} 232 233func (c *RaftCluster) SetID(localID, cid types.ID) { 234 c.localID = localID 235 c.cid = cid 236} 237 238func (c *RaftCluster) SetStore(st v2store.Store) { c.v2store = st } 239 240func (c *RaftCluster) SetBackend(be backend.Backend) { 241 c.be = be 242 mustCreateBackendBuckets(c.be) 243} 244 245func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) { 246 c.Lock() 247 defer c.Unlock() 248 249 c.members, c.removed = membersFromStore(c.lg, c.v2store) 250 c.version = clusterVersionFromStore(c.lg, c.v2store) 251 mustDetectDowngrade(c.lg, c.version) 252 onSet(c.lg, c.version) 253 254 for _, m := range c.members { 255 if c.lg != nil { 256 c.lg.Info( 257 "recovered/added member from store", 258 zap.String("cluster-id", c.cid.String()), 259 zap.String("local-member-id", c.localID.String()), 260 zap.String("recovered-remote-peer-id", m.ID.String()), 261 zap.Strings("recovered-remote-peer-urls", m.PeerURLs), 262 ) 263 } else { 264 plog.Infof("added member %s %v to cluster %s from store", m.ID, m.PeerURLs, c.cid) 265 } 266 } 267 if c.version != nil { 268 if c.lg != nil { 269 c.lg.Info( 270 "set cluster version from store", 271 zap.String("cluster-version", version.Cluster(c.version.String())), 272 ) 273 } else { 274 plog.Infof("set the cluster version to %v from store", version.Cluster(c.version.String())) 275 } 276 } 277} 278 279// ValidateConfigurationChange takes a proposed ConfChange and 280// ensures that it is still valid. 281func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { 282 members, removed := membersFromStore(c.lg, c.v2store) 283 id := types.ID(cc.NodeID) 284 if removed[id] { 285 return ErrIDRemoved 286 } 287 switch cc.Type { 288 case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode: 289 confChangeContext := new(ConfigChangeContext) 290 if err := json.Unmarshal(cc.Context, confChangeContext); err != nil { 291 if c.lg != nil { 292 c.lg.Panic("failed to unmarshal confChangeContext", zap.Error(err)) 293 } else { 294 plog.Panicf("unmarshal confChangeContext should never fail: %v", err) 295 } 296 } 297 298 if confChangeContext.IsPromote { // promoting a learner member to voting member 299 if members[id] == nil { 300 return ErrIDNotFound 301 } 302 if !members[id].IsLearner { 303 return ErrMemberNotLearner 304 } 305 } else { // adding a new member 306 if members[id] != nil { 307 return ErrIDExists 308 } 309 310 urls := make(map[string]bool) 311 for _, m := range members { 312 for _, u := range m.PeerURLs { 313 urls[u] = true 314 } 315 } 316 for _, u := range confChangeContext.Member.PeerURLs { 317 if urls[u] { 318 return ErrPeerURLexists 319 } 320 } 321 322 if confChangeContext.Member.IsLearner { // the new member is a learner 323 numLearners := 0 324 for _, m := range members { 325 if m.IsLearner { 326 numLearners++ 327 } 328 } 329 if numLearners+1 > maxLearners { 330 return ErrTooManyLearners 331 } 332 } 333 } 334 case raftpb.ConfChangeRemoveNode: 335 if members[id] == nil { 336 return ErrIDNotFound 337 } 338 339 case raftpb.ConfChangeUpdateNode: 340 if members[id] == nil { 341 return ErrIDNotFound 342 } 343 urls := make(map[string]bool) 344 for _, m := range members { 345 if m.ID == id { 346 continue 347 } 348 for _, u := range m.PeerURLs { 349 urls[u] = true 350 } 351 } 352 m := new(Member) 353 if err := json.Unmarshal(cc.Context, m); err != nil { 354 if c.lg != nil { 355 c.lg.Panic("failed to unmarshal member", zap.Error(err)) 356 } else { 357 plog.Panicf("unmarshal member should never fail: %v", err) 358 } 359 } 360 for _, u := range m.PeerURLs { 361 if urls[u] { 362 return ErrPeerURLexists 363 } 364 } 365 366 default: 367 if c.lg != nil { 368 c.lg.Panic("unknown ConfChange type", zap.String("type", cc.Type.String())) 369 } else { 370 plog.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode") 371 } 372 } 373 return nil 374} 375 376// AddMember adds a new Member into the cluster, and saves the given member's 377// raftAttributes into the store. The given member should have empty attributes. 378// A Member with a matching id must not exist. 379func (c *RaftCluster) AddMember(m *Member) { 380 c.Lock() 381 defer c.Unlock() 382 if c.v2store != nil { 383 mustSaveMemberToStore(c.v2store, m) 384 } 385 if c.be != nil { 386 mustSaveMemberToBackend(c.be, m) 387 } 388 389 c.members[m.ID] = m 390 391 if c.lg != nil { 392 c.lg.Info( 393 "added member", 394 zap.String("cluster-id", c.cid.String()), 395 zap.String("local-member-id", c.localID.String()), 396 zap.String("added-peer-id", m.ID.String()), 397 zap.Strings("added-peer-peer-urls", m.PeerURLs), 398 ) 399 } else { 400 plog.Infof("added member %s %v to cluster %s", m.ID, m.PeerURLs, c.cid) 401 } 402} 403 404// RemoveMember removes a member from the store. 405// The given id MUST exist, or the function panics. 406func (c *RaftCluster) RemoveMember(id types.ID) { 407 c.Lock() 408 defer c.Unlock() 409 if c.v2store != nil { 410 mustDeleteMemberFromStore(c.v2store, id) 411 } 412 if c.be != nil { 413 mustDeleteMemberFromBackend(c.be, id) 414 } 415 416 m, ok := c.members[id] 417 delete(c.members, id) 418 c.removed[id] = true 419 420 if c.lg != nil { 421 if ok { 422 c.lg.Info( 423 "removed member", 424 zap.String("cluster-id", c.cid.String()), 425 zap.String("local-member-id", c.localID.String()), 426 zap.String("removed-remote-peer-id", id.String()), 427 zap.Strings("removed-remote-peer-urls", m.PeerURLs), 428 ) 429 } else { 430 c.lg.Warn( 431 "skipped removing already removed member", 432 zap.String("cluster-id", c.cid.String()), 433 zap.String("local-member-id", c.localID.String()), 434 zap.String("removed-remote-peer-id", id.String()), 435 ) 436 } 437 } else { 438 plog.Infof("removed member %s from cluster %s", id, c.cid) 439 } 440} 441 442func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) { 443 c.Lock() 444 defer c.Unlock() 445 446 if m, ok := c.members[id]; ok { 447 m.Attributes = attr 448 if c.v2store != nil { 449 mustUpdateMemberAttrInStore(c.v2store, m) 450 } 451 if c.be != nil { 452 mustSaveMemberToBackend(c.be, m) 453 } 454 return 455 } 456 457 _, ok := c.removed[id] 458 if !ok { 459 if c.lg != nil { 460 c.lg.Panic( 461 "failed to update; member unknown", 462 zap.String("cluster-id", c.cid.String()), 463 zap.String("local-member-id", c.localID.String()), 464 zap.String("unknown-remote-peer-id", id.String()), 465 ) 466 } else { 467 plog.Panicf("error updating attributes of unknown member %s", id) 468 } 469 } 470 471 if c.lg != nil { 472 c.lg.Warn( 473 "skipped attributes update of removed member", 474 zap.String("cluster-id", c.cid.String()), 475 zap.String("local-member-id", c.localID.String()), 476 zap.String("updated-peer-id", id.String()), 477 ) 478 } else { 479 plog.Warningf("skipped updating attributes of removed member %s", id) 480 } 481} 482 483// PromoteMember marks the member's IsLearner RaftAttributes to false. 484func (c *RaftCluster) PromoteMember(id types.ID) { 485 c.Lock() 486 defer c.Unlock() 487 488 c.members[id].RaftAttributes.IsLearner = false 489 if c.v2store != nil { 490 mustUpdateMemberInStore(c.v2store, c.members[id]) 491 } 492 if c.be != nil { 493 mustSaveMemberToBackend(c.be, c.members[id]) 494 } 495 496 if c.lg != nil { 497 c.lg.Info( 498 "promote member", 499 zap.String("cluster-id", c.cid.String()), 500 zap.String("local-member-id", c.localID.String()), 501 ) 502 } else { 503 plog.Noticef("promote member %s in cluster %s", id, c.cid) 504 } 505} 506 507func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) { 508 c.Lock() 509 defer c.Unlock() 510 511 c.members[id].RaftAttributes = raftAttr 512 if c.v2store != nil { 513 mustUpdateMemberInStore(c.v2store, c.members[id]) 514 } 515 if c.be != nil { 516 mustSaveMemberToBackend(c.be, c.members[id]) 517 } 518 519 if c.lg != nil { 520 c.lg.Info( 521 "updated member", 522 zap.String("cluster-id", c.cid.String()), 523 zap.String("local-member-id", c.localID.String()), 524 zap.String("updated-remote-peer-id", id.String()), 525 zap.Strings("updated-remote-peer-urls", raftAttr.PeerURLs), 526 ) 527 } else { 528 plog.Noticef("updated member %s %v in cluster %s", id, raftAttr.PeerURLs, c.cid) 529 } 530} 531 532func (c *RaftCluster) Version() *semver.Version { 533 c.Lock() 534 defer c.Unlock() 535 if c.version == nil { 536 return nil 537 } 538 return semver.Must(semver.NewVersion(c.version.String())) 539} 540 541func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *semver.Version)) { 542 c.Lock() 543 defer c.Unlock() 544 if c.version != nil { 545 if c.lg != nil { 546 c.lg.Info( 547 "updated cluster version", 548 zap.String("cluster-id", c.cid.String()), 549 zap.String("local-member-id", c.localID.String()), 550 zap.String("from", version.Cluster(c.version.String())), 551 zap.String("from", version.Cluster(ver.String())), 552 ) 553 } else { 554 plog.Noticef("updated the cluster version from %v to %v", version.Cluster(c.version.String()), version.Cluster(ver.String())) 555 } 556 } else { 557 if c.lg != nil { 558 c.lg.Info( 559 "set initial cluster version", 560 zap.String("cluster-id", c.cid.String()), 561 zap.String("local-member-id", c.localID.String()), 562 zap.String("cluster-version", version.Cluster(ver.String())), 563 ) 564 } else { 565 plog.Noticef("set the initial cluster version to %v", version.Cluster(ver.String())) 566 } 567 } 568 oldVer := c.version 569 c.version = ver 570 mustDetectDowngrade(c.lg, c.version) 571 if c.v2store != nil { 572 mustSaveClusterVersionToStore(c.v2store, ver) 573 } 574 if c.be != nil { 575 mustSaveClusterVersionToBackend(c.be, ver) 576 } 577 if oldVer != nil { 578 ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(oldVer.String())}).Set(0) 579 } 580 ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(ver.String())}).Set(1) 581 onSet(c.lg, ver) 582} 583 584func (c *RaftCluster) IsReadyToAddVotingMember() bool { 585 nmembers := 1 586 nstarted := 0 587 588 for _, member := range c.VotingMembers() { 589 if member.IsStarted() { 590 nstarted++ 591 } 592 nmembers++ 593 } 594 595 if nstarted == 1 && nmembers == 2 { 596 // a case of adding a new node to 1-member cluster for restoring cluster data 597 // https://github.com/etcd-io/etcd/blob/master/Documentation/v2/admin_guide.md#restoring-the-cluster 598 if c.lg != nil { 599 c.lg.Debug("number of started member is 1; can accept add member request") 600 } else { 601 plog.Debugf("The number of started member is 1. This cluster can accept add member request.") 602 } 603 return true 604 } 605 606 nquorum := nmembers/2 + 1 607 if nstarted < nquorum { 608 if c.lg != nil { 609 c.lg.Warn( 610 "rejecting member add; started member will be less than quorum", 611 zap.Int("number-of-started-member", nstarted), 612 zap.Int("quorum", nquorum), 613 zap.String("cluster-id", c.cid.String()), 614 zap.String("local-member-id", c.localID.String()), 615 ) 616 } else { 617 plog.Warningf("Reject add member request: the number of started member (%d) will be less than the quorum number of the cluster (%d)", nstarted, nquorum) 618 } 619 return false 620 } 621 622 return true 623} 624 625func (c *RaftCluster) IsReadyToRemoveVotingMember(id uint64) bool { 626 nmembers := 0 627 nstarted := 0 628 629 for _, member := range c.VotingMembers() { 630 if uint64(member.ID) == id { 631 continue 632 } 633 634 if member.IsStarted() { 635 nstarted++ 636 } 637 nmembers++ 638 } 639 640 nquorum := nmembers/2 + 1 641 if nstarted < nquorum { 642 if c.lg != nil { 643 c.lg.Warn( 644 "rejecting member remove; started member will be less than quorum", 645 zap.Int("number-of-started-member", nstarted), 646 zap.Int("quorum", nquorum), 647 zap.String("cluster-id", c.cid.String()), 648 zap.String("local-member-id", c.localID.String()), 649 ) 650 } else { 651 plog.Warningf("Reject remove member request: the number of started member (%d) will be less than the quorum number of the cluster (%d)", nstarted, nquorum) 652 } 653 return false 654 } 655 656 return true 657} 658 659func (c *RaftCluster) IsReadyToPromoteMember(id uint64) bool { 660 nmembers := 1 // We count the learner to be promoted for the future quorum 661 nstarted := 1 // and we also count it as started. 662 663 for _, member := range c.VotingMembers() { 664 if member.IsStarted() { 665 nstarted++ 666 } 667 nmembers++ 668 } 669 670 nquorum := nmembers/2 + 1 671 if nstarted < nquorum { 672 if c.lg != nil { 673 c.lg.Warn( 674 "rejecting member promote; started member will be less than quorum", 675 zap.Int("number-of-started-member", nstarted), 676 zap.Int("quorum", nquorum), 677 zap.String("cluster-id", c.cid.String()), 678 zap.String("local-member-id", c.localID.String()), 679 ) 680 } else { 681 plog.Warningf("Reject promote member request: the number of started member (%d) will be less than the quorum number of the cluster (%d)", nstarted, nquorum) 682 } 683 return false 684 } 685 686 return true 687} 688 689func membersFromStore(lg *zap.Logger, st v2store.Store) (map[types.ID]*Member, map[types.ID]bool) { 690 members := make(map[types.ID]*Member) 691 removed := make(map[types.ID]bool) 692 e, err := st.Get(StoreMembersPrefix, true, true) 693 if err != nil { 694 if isKeyNotFound(err) { 695 return members, removed 696 } 697 if lg != nil { 698 lg.Panic("failed to get members from store", zap.String("path", StoreMembersPrefix), zap.Error(err)) 699 } else { 700 plog.Panicf("get storeMembers should never fail: %v", err) 701 } 702 } 703 for _, n := range e.Node.Nodes { 704 var m *Member 705 m, err = nodeToMember(n) 706 if err != nil { 707 if lg != nil { 708 lg.Panic("failed to nodeToMember", zap.Error(err)) 709 } else { 710 plog.Panicf("nodeToMember should never fail: %v", err) 711 } 712 } 713 members[m.ID] = m 714 } 715 716 e, err = st.Get(storeRemovedMembersPrefix, true, true) 717 if err != nil { 718 if isKeyNotFound(err) { 719 return members, removed 720 } 721 if lg != nil { 722 lg.Panic( 723 "failed to get removed members from store", 724 zap.String("path", storeRemovedMembersPrefix), 725 zap.Error(err), 726 ) 727 } else { 728 plog.Panicf("get storeRemovedMembers should never fail: %v", err) 729 } 730 } 731 for _, n := range e.Node.Nodes { 732 removed[MustParseMemberIDFromKey(n.Key)] = true 733 } 734 return members, removed 735} 736 737func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version { 738 e, err := st.Get(path.Join(storePrefix, "version"), false, false) 739 if err != nil { 740 if isKeyNotFound(err) { 741 return nil 742 } 743 if lg != nil { 744 lg.Panic( 745 "failed to get cluster version from store", 746 zap.String("path", path.Join(storePrefix, "version")), 747 zap.Error(err), 748 ) 749 } else { 750 plog.Panicf("unexpected error (%v) when getting cluster version from store", err) 751 } 752 } 753 return semver.Must(semver.NewVersion(*e.Node.Value)) 754} 755 756// ValidateClusterAndAssignIDs validates the local cluster by matching the PeerURLs 757// with the existing cluster. If the validation succeeds, it assigns the IDs 758// from the existing cluster to the local cluster. 759// If the validation fails, an error will be returned. 760func ValidateClusterAndAssignIDs(lg *zap.Logger, local *RaftCluster, existing *RaftCluster) error { 761 ems := existing.Members() 762 lms := local.Members() 763 if len(ems) != len(lms) { 764 return fmt.Errorf("member count is unequal") 765 } 766 sort.Sort(MembersByPeerURLs(ems)) 767 sort.Sort(MembersByPeerURLs(lms)) 768 769 ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second) 770 defer cancel() 771 for i := range ems { 772 if ok, err := netutil.URLStringsEqual(ctx, lg, ems[i].PeerURLs, lms[i].PeerURLs); !ok { 773 return fmt.Errorf("unmatched member while checking PeerURLs (%v)", err) 774 } 775 lms[i].ID = ems[i].ID 776 } 777 local.members = make(map[types.ID]*Member) 778 for _, m := range lms { 779 local.members[m.ID] = m 780 } 781 return nil 782} 783 784func mustDetectDowngrade(lg *zap.Logger, cv *semver.Version) { 785 lv := semver.Must(semver.NewVersion(version.Version)) 786 // only keep major.minor version for comparison against cluster version 787 lv = &semver.Version{Major: lv.Major, Minor: lv.Minor} 788 if cv != nil && lv.LessThan(*cv) { 789 if lg != nil { 790 lg.Fatal( 791 "invalid downgrade; server version is lower than determined cluster version", 792 zap.String("current-server-version", version.Version), 793 zap.String("determined-cluster-version", version.Cluster(cv.String())), 794 ) 795 } else { 796 plog.Fatalf("cluster cannot be downgraded (current version: %s is lower than determined cluster version: %s).", version.Version, version.Cluster(cv.String())) 797 } 798 } 799} 800 801// IsLocalMemberLearner returns if the local member is raft learner 802func (c *RaftCluster) IsLocalMemberLearner() bool { 803 c.Lock() 804 defer c.Unlock() 805 localMember, ok := c.members[c.localID] 806 if !ok { 807 if c.lg != nil { 808 c.lg.Panic( 809 "failed to find local ID in cluster members", 810 zap.String("cluster-id", c.cid.String()), 811 zap.String("local-member-id", c.localID.String()), 812 ) 813 } else { 814 plog.Panicf("failed to find local ID %s in cluster %s", c.localID.String(), c.cid.String()) 815 } 816 } 817 return localMember.IsLearner 818} 819 820// IsMemberExist returns if the member with the given id exists in cluster. 821func (c *RaftCluster) IsMemberExist(id types.ID) bool { 822 c.Lock() 823 defer c.Unlock() 824 _, ok := c.members[id] 825 return ok 826} 827 828// VotingMemberIDs returns the ID of voting members in cluster. 829func (c *RaftCluster) VotingMemberIDs() []types.ID { 830 c.Lock() 831 defer c.Unlock() 832 var ids []types.ID 833 for _, m := range c.members { 834 if !m.IsLearner { 835 ids = append(ids, m.ID) 836 } 837 } 838 sort.Sort(types.IDSlice(ids)) 839 return ids 840} 841