package nodes

import (
	"context"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v14/internal/praefect/metrics"
)

// localElector relies on an in-memory datastore to track the primary
// and secondaries. A single election strategy pertains to a single
// shard. It does NOT support multiple Praefect nodes or have any
// persistence. This is used mostly for testing and development.
type localElector struct {
	m           sync.RWMutex // guards primaryNode
	shardName   string
	nodes       []Node
	primaryNode Node
	log         logrus.FieldLogger

	doneCh chan struct{}
}

// newLocalElector returns an elector for a single shard. The node list
// must be non-empty; its first entry becomes the initial primary.
func newLocalElector(name string, log logrus.FieldLogger, ns []*nodeStatus) *localElector {
	nodes := make([]Node, len(ns))
	for i, n := range ns {
		nodes[i] = n
	}
	return &localElector{
		shardName:   name,
		log:         log.WithField("virtual_storage", name),
		nodes:       nodes,
		primaryNode: nodes[0],
		doneCh:      make(chan struct{}),
	}
}

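// Illustrative only, not part of this file: a caller owning a shard's
// node statuses might wire up the elector roughly as follows (statuses,
// bootstrapInterval and monitorInterval are assumed to be defined by
// the caller):
//
//	elector := newLocalElector("default", logrus.New(), statuses)
//	elector.start(bootstrapInterval, monitorInterval)
//	defer elector.stop()
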
// start synchronously runs the bootstrap health checks, then launches a
// Goroutine to continuously monitor node health via gRPC health checks.
func (s *localElector) start(bootstrapInterval, monitorInterval time.Duration) {
	s.bootstrap(bootstrapInterval)
	go s.monitor(monitorInterval)
}

// bootstrap performs an initial round of health checks so the shard has
// current health state before monitoring begins.
func (s *localElector) bootstrap(d time.Duration) {
	timer := time.NewTimer(d)
	defer timer.Stop()

	for i := 0; i < healthcheckThreshold; i++ {
		<-timer.C

		ctx := context.TODO()
		s.checkNodes(ctx)
		timer.Reset(d)
	}
}

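// A note on the loop bound above: running exactly healthcheckThreshold
// checks lets a node accumulate enough consecutive successful checks to
// be reported healthy before monitoring starts. This reading of
// healthcheckThreshold is an assumption based on its use in this file.
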
// monitor periodically re-checks the health of all nodes until stop is
// called.
func (s *localElector) monitor(d time.Duration) {
	ticker := time.NewTicker(d)
	defer ticker.Stop()

	ctx := context.Background()

	for {
		select {
		case <-s.doneCh:
			return
		case <-ticker.C:
		}

		if err := s.checkNodes(ctx); err != nil {
			s.log.WithError(err).Warn("error checking nodes")
		}
	}
}

// stop halts monitoring. It must be called at most once, as closing an
// already-closed channel panics.
func (s *localElector) stop() {
	close(s.doneCh)
}

// checkNodes issues a gRPC health check for each node managed by the
// shard and, if the current primary is unhealthy, promotes the first
// healthy node found to primary.
func (s *localElector) checkNodes(ctx context.Context) error {
	defer s.updateMetrics()

	var wg sync.WaitGroup
	for _, n := range s.nodes {
		wg.Add(1)
		go func(n Node) {
			defer wg.Done()
			_, _ = n.CheckHealth(ctx)
		}(n)
	}
	wg.Wait()

	s.m.Lock()
	defer s.m.Unlock()

	if s.primaryNode.IsHealthy() {
		return nil
	}

	// Elect the first healthy non-primary node. There is no tie-breaking
	// beyond list order, and the demoted primary is not notified.
	var newPrimary Node

	for _, node := range s.nodes {
		if node != s.primaryNode && node.IsHealthy() {
			newPrimary = node
			break
		}
	}

	if newPrimary == nil {
		return ErrPrimaryNotHealthy
	}

	s.primaryNode = newPrimary

	return nil
}

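// Illustrative failover sketch (assumes a two-node shard where the
// primary's health check has started failing):
//
//	// before: s.primaryNode == node1 (unhealthy), node2 healthy
//	_ = s.checkNodes(ctx)
//	// after:  s.primaryNode == node2; subsequent GetShard calls return
//	// node2 as Primary and node1 among Secondaries.
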
// GetShard gets the current status of the shard. If no primary has been
// elected, or the elected primary is unhealthy, ErrPrimaryNotHealthy is
// returned.
func (s *localElector) GetShard(ctx context.Context) (Shard, error) {
	s.m.RLock()
	primary := s.primaryNode
	s.m.RUnlock()

	if primary == nil || !primary.IsHealthy() {
		return Shard{}, ErrPrimaryNotHealthy
	}

	var secondaries []Node
	for _, n := range s.nodes {
		if n != primary {
			secondaries = append(secondaries, n)
		}
	}

	return Shard{
		Primary:     primary,
		Secondaries: secondaries,
	}, nil
}

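// Illustrative only: a caller routing a request might consult GetShard
// like this (ctx and elector assumed to be in scope):
//
//	shard, err := elector.GetShard(ctx)
//	if err != nil {
//		return err // typically ErrPrimaryNotHealthy
//	}
//	primaryStorage := shard.Primary.GetStorage()
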
// updateMetrics sets the primary gauge to 1 for the current primary and
// 0 for every other node in the shard.
func (s *localElector) updateMetrics() {
	s.m.RLock()
	primary := s.primaryNode
	s.m.RUnlock()

	for _, n := range s.nodes {
		var val float64

		if n == primary {
			val = 1
		}

		metrics.PrimaryGauge.WithLabelValues(s.shardName, n.GetStorage()).Set(val)
	}
}