1// Copyright (C) MongoDB, Inc. 2017-present.
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may
4// not use this file except in compliance with the License. You may obtain
5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6
7package topology
8
9import (
10	"bytes"
11	"fmt"
12	"sync/atomic"
13
14	"go.mongodb.org/mongo-driver/bson/primitive"
15	"go.mongodb.org/mongo-driver/x/mongo/driver/address"
16	"go.mongodb.org/mongo-driver/x/mongo/driver/description"
17)
18
19var supportedWireVersions = description.NewVersionRange(2, 9)
20var minSupportedMongoDBVersion = "2.6"
21
22type fsm struct {
23	description.Topology
24	SetName          string
25	maxElectionID    primitive.ObjectID
26	maxSetVersion    uint32
27	compatible       atomic.Value
28	compatibilityErr error
29}
30
31func newFSM() *fsm {
32	f := fsm{}
33	f.compatible.Store(true)
34	return &f
35}
36
37// apply takes a new server description and modifies the FSM's topology description based on it. It returns the
38// updated topology description as well as a server description. The returned server description is either the same
39// one that was passed in, or a new one in the case that it had to be changed.
40//
41// apply should operation on immutable descriptions so we don't have to lock for the entire time we're applying the
42// server description.
43func (f *fsm) apply(s description.Server) (description.Topology, description.Server, error) {
44	newServers := make([]description.Server, len(f.Servers))
45	copy(newServers, f.Servers)
46
47	oldMinutes := f.SessionTimeoutMinutes
48	f.Topology = description.Topology{
49		Kind:    f.Kind,
50		Servers: newServers,
51	}
52
53	// For data bearing servers, set SessionTimeoutMinutes to the lowest among them
54	if oldMinutes == 0 {
55		// If timeout currently 0, check all servers to see if any still don't have a timeout
56		// If they all have timeout, pick the lowest.
57		timeout := s.SessionTimeoutMinutes
58		for _, server := range f.Servers {
59			if server.DataBearing() && server.SessionTimeoutMinutes < timeout {
60				timeout = server.SessionTimeoutMinutes
61			}
62		}
63		f.SessionTimeoutMinutes = timeout
64	} else {
65		if s.DataBearing() && oldMinutes > s.SessionTimeoutMinutes {
66			f.SessionTimeoutMinutes = s.SessionTimeoutMinutes
67		} else {
68			f.SessionTimeoutMinutes = oldMinutes
69		}
70	}
71
72	if _, ok := f.findServer(s.Addr); !ok {
73		return f.Topology, s, nil
74	}
75
76	updatedDesc := s
77	switch f.Kind {
78	case description.Unknown:
79		updatedDesc = f.applyToUnknown(s)
80	case description.Sharded:
81		updatedDesc = f.applyToSharded(s)
82	case description.ReplicaSetNoPrimary:
83		updatedDesc = f.applyToReplicaSetNoPrimary(s)
84	case description.ReplicaSetWithPrimary:
85		updatedDesc = f.applyToReplicaSetWithPrimary(s)
86	case description.Single:
87		updatedDesc = f.applyToSingle(s)
88	}
89
90	for _, server := range f.Servers {
91		if server.WireVersion != nil {
92			if server.WireVersion.Max < supportedWireVersions.Min {
93				f.compatible.Store(false)
94				f.compatibilityErr = fmt.Errorf(
95					"server at %s reports wire version %d, but this version of the Go driver requires "+
96						"at least %d (MongoDB %s)",
97					server.Addr.String(),
98					server.WireVersion.Max,
99					supportedWireVersions.Min,
100					minSupportedMongoDBVersion,
101				)
102				return description.Topology{}, s, f.compatibilityErr
103			}
104
105			if server.WireVersion.Min > supportedWireVersions.Max {
106				f.compatible.Store(false)
107				f.compatibilityErr = fmt.Errorf(
108					"server at %s requires wire version %d, but this version of the Go driver only supports up to %d",
109					server.Addr.String(),
110					server.WireVersion.Min,
111					supportedWireVersions.Max,
112				)
113				return description.Topology{}, s, f.compatibilityErr
114			}
115		}
116	}
117
118	f.compatible.Store(true)
119	f.compatibilityErr = nil
120	return f.Topology, updatedDesc, nil
121}
122
123func (f *fsm) applyToReplicaSetNoPrimary(s description.Server) description.Server {
124	switch s.Kind {
125	case description.Standalone, description.Mongos:
126		f.removeServerByAddr(s.Addr)
127	case description.RSPrimary:
128		f.updateRSFromPrimary(s)
129	case description.RSSecondary, description.RSArbiter, description.RSMember:
130		f.updateRSWithoutPrimary(s)
131	case description.Unknown, description.RSGhost:
132		f.replaceServer(s)
133	}
134
135	return s
136}
137
138func (f *fsm) applyToReplicaSetWithPrimary(s description.Server) description.Server {
139	switch s.Kind {
140	case description.Standalone, description.Mongos:
141		f.removeServerByAddr(s.Addr)
142		f.checkIfHasPrimary()
143	case description.RSPrimary:
144		f.updateRSFromPrimary(s)
145	case description.RSSecondary, description.RSArbiter, description.RSMember:
146		f.updateRSWithPrimaryFromMember(s)
147	case description.Unknown, description.RSGhost:
148		f.replaceServer(s)
149		f.checkIfHasPrimary()
150	}
151
152	return s
153}
154
155func (f *fsm) applyToSharded(s description.Server) description.Server {
156	switch s.Kind {
157	case description.Mongos, description.Unknown:
158		f.replaceServer(s)
159	case description.Standalone, description.RSPrimary, description.RSSecondary, description.RSArbiter, description.RSMember, description.RSGhost:
160		f.removeServerByAddr(s.Addr)
161	}
162
163	return s
164}
165
166func (f *fsm) applyToSingle(s description.Server) description.Server {
167	switch s.Kind {
168	case description.Unknown:
169		f.replaceServer(s)
170	case description.Standalone, description.Mongos:
171		if f.SetName != "" {
172			f.removeServerByAddr(s.Addr)
173			return s
174		}
175
176		f.replaceServer(s)
177	case description.RSPrimary, description.RSSecondary, description.RSArbiter, description.RSMember, description.RSGhost:
178		// A replica set name can be provided when creating a direct connection. In this case, if the set name returned
179		// by the isMaster response doesn't match up with the one provided during configuration, the server description
180		// is replaced with a default Unknown description.
181		//
182		// We create a new server description rather than doing s.Kind = description.Unknown because the other fields,
183		// such as RTT, need to be cleared for Unknown descriptions as well.
184		if f.SetName != "" && f.SetName != s.SetName {
185			s = description.Server{
186				Addr: s.Addr,
187				Kind: description.Unknown,
188			}
189		}
190
191		f.replaceServer(s)
192	}
193
194	return s
195}
196
197func (f *fsm) applyToUnknown(s description.Server) description.Server {
198	switch s.Kind {
199	case description.Mongos:
200		f.setKind(description.Sharded)
201		f.replaceServer(s)
202	case description.RSPrimary:
203		f.updateRSFromPrimary(s)
204	case description.RSSecondary, description.RSArbiter, description.RSMember:
205		f.setKind(description.ReplicaSetNoPrimary)
206		f.updateRSWithoutPrimary(s)
207	case description.Standalone:
208		f.updateUnknownWithStandalone(s)
209	case description.Unknown, description.RSGhost:
210		f.replaceServer(s)
211	}
212
213	return s
214}
215
216func (f *fsm) checkIfHasPrimary() {
217	if _, ok := f.findPrimary(); ok {
218		f.setKind(description.ReplicaSetWithPrimary)
219	} else {
220		f.setKind(description.ReplicaSetNoPrimary)
221	}
222}
223
224func (f *fsm) updateRSFromPrimary(s description.Server) {
225	if f.SetName == "" {
226		f.SetName = s.SetName
227	} else if f.SetName != s.SetName {
228		f.removeServerByAddr(s.Addr)
229		f.checkIfHasPrimary()
230		return
231	}
232
233	if s.SetVersion != 0 && !bytes.Equal(s.ElectionID[:], primitive.NilObjectID[:]) {
234		if f.maxSetVersion > s.SetVersion || bytes.Compare(f.maxElectionID[:], s.ElectionID[:]) == 1 {
235			f.replaceServer(description.Server{
236				Addr:      s.Addr,
237				LastError: fmt.Errorf("was a primary, but its set version or election id is stale"),
238			})
239			f.checkIfHasPrimary()
240			return
241		}
242
243		f.maxElectionID = s.ElectionID
244	}
245
246	if s.SetVersion > f.maxSetVersion {
247		f.maxSetVersion = s.SetVersion
248	}
249
250	if j, ok := f.findPrimary(); ok {
251		f.setServer(j, description.Server{
252			Addr:      f.Servers[j].Addr,
253			LastError: fmt.Errorf("was a primary, but a new primary was discovered"),
254		})
255	}
256
257	f.replaceServer(s)
258
259	for j := len(f.Servers) - 1; j >= 0; j-- {
260		found := false
261		for _, member := range s.Members {
262			if member == f.Servers[j].Addr {
263				found = true
264				break
265			}
266		}
267		if !found {
268			f.removeServer(j)
269		}
270	}
271
272	for _, member := range s.Members {
273		if _, ok := f.findServer(member); !ok {
274			f.addServer(member)
275		}
276	}
277
278	f.checkIfHasPrimary()
279}
280
281func (f *fsm) updateRSWithPrimaryFromMember(s description.Server) {
282	if f.SetName != s.SetName {
283		f.removeServerByAddr(s.Addr)
284		f.checkIfHasPrimary()
285		return
286	}
287
288	if s.Addr != s.CanonicalAddr {
289		f.removeServerByAddr(s.Addr)
290		f.checkIfHasPrimary()
291		return
292	}
293
294	f.replaceServer(s)
295
296	if _, ok := f.findPrimary(); !ok {
297		f.setKind(description.ReplicaSetNoPrimary)
298	}
299}
300
301func (f *fsm) updateRSWithoutPrimary(s description.Server) {
302	if f.SetName == "" {
303		f.SetName = s.SetName
304	} else if f.SetName != s.SetName {
305		f.removeServerByAddr(s.Addr)
306		return
307	}
308
309	for _, member := range s.Members {
310		if _, ok := f.findServer(member); !ok {
311			f.addServer(member)
312		}
313	}
314
315	if s.Addr != s.CanonicalAddr {
316		f.removeServerByAddr(s.Addr)
317		return
318	}
319
320	f.replaceServer(s)
321}
322
323func (f *fsm) updateUnknownWithStandalone(s description.Server) {
324	if len(f.Servers) > 1 {
325		f.removeServerByAddr(s.Addr)
326		return
327	}
328
329	f.setKind(description.Single)
330	f.replaceServer(s)
331}
332
333func (f *fsm) addServer(addr address.Address) {
334	f.Servers = append(f.Servers, description.Server{
335		Addr: addr.Canonicalize(),
336	})
337}
338
339func (f *fsm) findPrimary() (int, bool) {
340	for i, s := range f.Servers {
341		if s.Kind == description.RSPrimary {
342			return i, true
343		}
344	}
345
346	return 0, false
347}
348
349func (f *fsm) findServer(addr address.Address) (int, bool) {
350	canon := addr.Canonicalize()
351	for i, s := range f.Servers {
352		if canon == s.Addr {
353			return i, true
354		}
355	}
356
357	return 0, false
358}
359
360func (f *fsm) removeServer(i int) {
361	f.Servers = append(f.Servers[:i], f.Servers[i+1:]...)
362}
363
364func (f *fsm) removeServerByAddr(addr address.Address) {
365	if i, ok := f.findServer(addr); ok {
366		f.removeServer(i)
367	}
368}
369
370func (f *fsm) replaceServer(s description.Server) bool {
371	if i, ok := f.findServer(s.Addr); ok {
372		f.setServer(i, s)
373		return true
374	}
375	return false
376}
377
378func (f *fsm) setServer(i int, s description.Server) {
379	f.Servers[i] = s
380}
381
382func (f *fsm) setKind(k description.TopologyKind) {
383	f.Kind = k
384}
385