package nodes

import (
	"context"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/metrics"
)

// localElector relies on an in-memory datastore to track the primary
// and secondaries. A single election strategy pertains to a single
// shard. It does NOT support multiple Praefect nodes or have any
// persistence. This is used mostly for testing and development.
type localElector struct {
	m           sync.RWMutex
	shardName   string
	nodes       []Node
	primaryNode Node
	log         logrus.FieldLogger

	doneCh chan struct{}
}

// newLocalElector creates an in-memory elector for the given shard and
// elects the first node as the initial primary.
func newLocalElector(name string, log logrus.FieldLogger, ns []*nodeStatus) *localElector {
	nodes := make([]Node, len(ns))
	for i, n := range ns {
		nodes[i] = n
	}
	return &localElector{
		shardName:   name,
		log:         log.WithField("virtual_storage", name),
		nodes:       nodes,
		primaryNode: nodes[0],
		doneCh:      make(chan struct{}),
	}
}

// start runs the bootstrap health checks synchronously and then launches a
// goroutine to continuously monitor node health via gRPC health checks.
func (s *localElector) start(bootstrapInterval, monitorInterval time.Duration) {
	s.bootstrap(bootstrapInterval)
	go s.monitor(monitorInterval)
}

// bootstrap performs the initial rounds of health checks so that a healthy
// primary can be elected before the shard starts serving traffic.
func (s *localElector) bootstrap(d time.Duration) {
	timer := time.NewTimer(d)
	defer timer.Stop()

	for i := 0; i < healthcheckThreshold; i++ {
		<-timer.C

		ctx := context.TODO()
		s.checkNodes(ctx)
		timer.Reset(d)
	}
}

// monitor periodically re-checks the health of all nodes until stop is called.
func (s *localElector) monitor(d time.Duration) {
	ticker := time.NewTicker(d)
	defer ticker.Stop()

	ctx := context.Background()

	for {
		select {
		case <-s.doneCh:
			return
		case <-ticker.C:
		}

		err := s.checkNodes(ctx)
		if err != nil {
			s.log.WithError(err).Warn("error checking nodes")
		}
	}
}

// stop terminates the monitoring goroutine.
func (s *localElector) stop() {
	close(s.doneCh)
}

// checkNodes issues a gRPC health check for each node managed by the
// shard and, if the current primary is unhealthy, elects the first healthy
// node as the new primary.
func (s *localElector) checkNodes(ctx context.Context) error {
	defer s.updateMetrics()

	var wg sync.WaitGroup
	for _, n := range s.nodes {
		wg.Add(1)
		go func(n Node) {
			defer wg.Done()
			_, _ = n.CheckHealth(ctx)
		}(n)
	}
	wg.Wait()

	s.m.Lock()
	defer s.m.Unlock()

	if s.primaryNode.IsHealthy() {
		return nil
	}

	var newPrimary Node

	for _, node := range s.nodes {
		if node != s.primaryNode && node.IsHealthy() {
			newPrimary = node
			break
		}
	}

	if newPrimary == nil {
		return ErrPrimaryNotHealthy
	}

	s.primaryNode = newPrimary

	return nil
}

// GetShard gets the current status of the shard. If no primary is elected, or
// the elected primary is unhealthy, ErrPrimaryNotHealthy is returned.
func (s *localElector) GetShard(ctx context.Context) (Shard, error) {
	s.m.RLock()
	primary := s.primaryNode
	s.m.RUnlock()

	if primary == nil {
		return Shard{}, ErrPrimaryNotHealthy
	}

	if !primary.IsHealthy() {
		return Shard{}, ErrPrimaryNotHealthy
	}

	var secondaries []Node
	for _, n := range s.nodes {
		if n != primary {
			secondaries = append(secondaries, n)
		}
	}

	return Shard{
		Primary:     primary,
		Secondaries: secondaries,
	}, nil
}

func (s *localElector) updateMetrics() {
	s.m.RLock()
	primary := s.primaryNode
	s.m.RUnlock()

	for _, n := range s.nodes {
		var val float64

		if n == primary {
			val = 1
		}

		metrics.PrimaryGauge.WithLabelValues(s.shardName, n.GetStorage()).Set(val)
	}
}
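
// The sketch below is illustrative only and not part of the elector's API: it
// shows how a caller might wire up and query this election strategy. The shard
// name, the intervals, and the pre-built statuses slice are assumptions made
// for the example, not values taken from this file.
func exampleLocalElectorUsage(ctx context.Context, statuses []*nodeStatus) (Shard, error) {
	elector := newLocalElector("default", logrus.New(), statuses)

	// Run the blocking bootstrap checks quickly, then monitor once per second.
	elector.start(100*time.Millisecond, time.Second)
	defer elector.stop()

	// GetShard returns ErrPrimaryNotHealthy while no healthy primary is elected.
	return elector.GetShard(ctx)
}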