1// Copyright (C) MongoDB, Inc. 2017-present. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may 4// not use this file except in compliance with the License. You may obtain 5// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 6 7package topology 8 9import ( 10 "bytes" 11 "fmt" 12 "sync/atomic" 13 14 "go.mongodb.org/mongo-driver/bson/primitive" 15 "go.mongodb.org/mongo-driver/x/mongo/driver/address" 16 "go.mongodb.org/mongo-driver/x/mongo/driver/description" 17) 18 19var supportedWireVersions = description.NewVersionRange(2, 9) 20var minSupportedMongoDBVersion = "2.6" 21 22type fsm struct { 23 description.Topology 24 SetName string 25 maxElectionID primitive.ObjectID 26 maxSetVersion uint32 27 compatible atomic.Value 28 compatibilityErr error 29} 30 31func newFSM() *fsm { 32 f := fsm{} 33 f.compatible.Store(true) 34 return &f 35} 36 37// apply takes a new server description and modifies the FSM's topology description based on it. It returns the 38// updated topology description as well as a server description. The returned server description is either the same 39// one that was passed in, or a new one in the case that it had to be changed. 40// 41// apply should operation on immutable descriptions so we don't have to lock for the entire time we're applying the 42// server description. 43func (f *fsm) apply(s description.Server) (description.Topology, description.Server, error) { 44 newServers := make([]description.Server, len(f.Servers)) 45 copy(newServers, f.Servers) 46 47 oldMinutes := f.SessionTimeoutMinutes 48 f.Topology = description.Topology{ 49 Kind: f.Kind, 50 Servers: newServers, 51 } 52 53 // For data bearing servers, set SessionTimeoutMinutes to the lowest among them 54 if oldMinutes == 0 { 55 // If timeout currently 0, check all servers to see if any still don't have a timeout 56 // If they all have timeout, pick the lowest. 57 timeout := s.SessionTimeoutMinutes 58 for _, server := range f.Servers { 59 if server.DataBearing() && server.SessionTimeoutMinutes < timeout { 60 timeout = server.SessionTimeoutMinutes 61 } 62 } 63 f.SessionTimeoutMinutes = timeout 64 } else { 65 if s.DataBearing() && oldMinutes > s.SessionTimeoutMinutes { 66 f.SessionTimeoutMinutes = s.SessionTimeoutMinutes 67 } else { 68 f.SessionTimeoutMinutes = oldMinutes 69 } 70 } 71 72 if _, ok := f.findServer(s.Addr); !ok { 73 return f.Topology, s, nil 74 } 75 76 updatedDesc := s 77 switch f.Kind { 78 case description.Unknown: 79 updatedDesc = f.applyToUnknown(s) 80 case description.Sharded: 81 updatedDesc = f.applyToSharded(s) 82 case description.ReplicaSetNoPrimary: 83 updatedDesc = f.applyToReplicaSetNoPrimary(s) 84 case description.ReplicaSetWithPrimary: 85 updatedDesc = f.applyToReplicaSetWithPrimary(s) 86 case description.Single: 87 updatedDesc = f.applyToSingle(s) 88 } 89 90 for _, server := range f.Servers { 91 if server.WireVersion != nil { 92 if server.WireVersion.Max < supportedWireVersions.Min { 93 f.compatible.Store(false) 94 f.compatibilityErr = fmt.Errorf( 95 "server at %s reports wire version %d, but this version of the Go driver requires "+ 96 "at least %d (MongoDB %s)", 97 server.Addr.String(), 98 server.WireVersion.Max, 99 supportedWireVersions.Min, 100 minSupportedMongoDBVersion, 101 ) 102 return description.Topology{}, s, f.compatibilityErr 103 } 104 105 if server.WireVersion.Min > supportedWireVersions.Max { 106 f.compatible.Store(false) 107 f.compatibilityErr = fmt.Errorf( 108 "server at %s requires wire version %d, but this version of the Go driver only supports up to %d", 109 server.Addr.String(), 110 server.WireVersion.Min, 111 supportedWireVersions.Max, 112 ) 113 return description.Topology{}, s, f.compatibilityErr 114 } 115 } 116 } 117 118 f.compatible.Store(true) 119 f.compatibilityErr = nil 120 return f.Topology, updatedDesc, nil 121} 122 123func (f *fsm) applyToReplicaSetNoPrimary(s description.Server) description.Server { 124 switch s.Kind { 125 case description.Standalone, description.Mongos: 126 f.removeServerByAddr(s.Addr) 127 case description.RSPrimary: 128 f.updateRSFromPrimary(s) 129 case description.RSSecondary, description.RSArbiter, description.RSMember: 130 f.updateRSWithoutPrimary(s) 131 case description.Unknown, description.RSGhost: 132 f.replaceServer(s) 133 } 134 135 return s 136} 137 138func (f *fsm) applyToReplicaSetWithPrimary(s description.Server) description.Server { 139 switch s.Kind { 140 case description.Standalone, description.Mongos: 141 f.removeServerByAddr(s.Addr) 142 f.checkIfHasPrimary() 143 case description.RSPrimary: 144 f.updateRSFromPrimary(s) 145 case description.RSSecondary, description.RSArbiter, description.RSMember: 146 f.updateRSWithPrimaryFromMember(s) 147 case description.Unknown, description.RSGhost: 148 f.replaceServer(s) 149 f.checkIfHasPrimary() 150 } 151 152 return s 153} 154 155func (f *fsm) applyToSharded(s description.Server) description.Server { 156 switch s.Kind { 157 case description.Mongos, description.Unknown: 158 f.replaceServer(s) 159 case description.Standalone, description.RSPrimary, description.RSSecondary, description.RSArbiter, description.RSMember, description.RSGhost: 160 f.removeServerByAddr(s.Addr) 161 } 162 163 return s 164} 165 166func (f *fsm) applyToSingle(s description.Server) description.Server { 167 switch s.Kind { 168 case description.Unknown: 169 f.replaceServer(s) 170 case description.Standalone, description.Mongos: 171 if f.SetName != "" { 172 f.removeServerByAddr(s.Addr) 173 return s 174 } 175 176 f.replaceServer(s) 177 case description.RSPrimary, description.RSSecondary, description.RSArbiter, description.RSMember, description.RSGhost: 178 // A replica set name can be provided when creating a direct connection. In this case, if the set name returned 179 // by the isMaster response doesn't match up with the one provided during configuration, the server description 180 // is replaced with a default Unknown description. 181 // 182 // We create a new server description rather than doing s.Kind = description.Unknown because the other fields, 183 // such as RTT, need to be cleared for Unknown descriptions as well. 184 if f.SetName != "" && f.SetName != s.SetName { 185 s = description.Server{ 186 Addr: s.Addr, 187 Kind: description.Unknown, 188 } 189 } 190 191 f.replaceServer(s) 192 } 193 194 return s 195} 196 197func (f *fsm) applyToUnknown(s description.Server) description.Server { 198 switch s.Kind { 199 case description.Mongos: 200 f.setKind(description.Sharded) 201 f.replaceServer(s) 202 case description.RSPrimary: 203 f.updateRSFromPrimary(s) 204 case description.RSSecondary, description.RSArbiter, description.RSMember: 205 f.setKind(description.ReplicaSetNoPrimary) 206 f.updateRSWithoutPrimary(s) 207 case description.Standalone: 208 f.updateUnknownWithStandalone(s) 209 case description.Unknown, description.RSGhost: 210 f.replaceServer(s) 211 } 212 213 return s 214} 215 216func (f *fsm) checkIfHasPrimary() { 217 if _, ok := f.findPrimary(); ok { 218 f.setKind(description.ReplicaSetWithPrimary) 219 } else { 220 f.setKind(description.ReplicaSetNoPrimary) 221 } 222} 223 224func (f *fsm) updateRSFromPrimary(s description.Server) { 225 if f.SetName == "" { 226 f.SetName = s.SetName 227 } else if f.SetName != s.SetName { 228 f.removeServerByAddr(s.Addr) 229 f.checkIfHasPrimary() 230 return 231 } 232 233 if s.SetVersion != 0 && !bytes.Equal(s.ElectionID[:], primitive.NilObjectID[:]) { 234 if f.maxSetVersion > s.SetVersion || bytes.Compare(f.maxElectionID[:], s.ElectionID[:]) == 1 { 235 f.replaceServer(description.Server{ 236 Addr: s.Addr, 237 LastError: fmt.Errorf("was a primary, but its set version or election id is stale"), 238 }) 239 f.checkIfHasPrimary() 240 return 241 } 242 243 f.maxElectionID = s.ElectionID 244 } 245 246 if s.SetVersion > f.maxSetVersion { 247 f.maxSetVersion = s.SetVersion 248 } 249 250 if j, ok := f.findPrimary(); ok { 251 f.setServer(j, description.Server{ 252 Addr: f.Servers[j].Addr, 253 LastError: fmt.Errorf("was a primary, but a new primary was discovered"), 254 }) 255 } 256 257 f.replaceServer(s) 258 259 for j := len(f.Servers) - 1; j >= 0; j-- { 260 found := false 261 for _, member := range s.Members { 262 if member == f.Servers[j].Addr { 263 found = true 264 break 265 } 266 } 267 if !found { 268 f.removeServer(j) 269 } 270 } 271 272 for _, member := range s.Members { 273 if _, ok := f.findServer(member); !ok { 274 f.addServer(member) 275 } 276 } 277 278 f.checkIfHasPrimary() 279} 280 281func (f *fsm) updateRSWithPrimaryFromMember(s description.Server) { 282 if f.SetName != s.SetName { 283 f.removeServerByAddr(s.Addr) 284 f.checkIfHasPrimary() 285 return 286 } 287 288 if s.Addr != s.CanonicalAddr { 289 f.removeServerByAddr(s.Addr) 290 f.checkIfHasPrimary() 291 return 292 } 293 294 f.replaceServer(s) 295 296 if _, ok := f.findPrimary(); !ok { 297 f.setKind(description.ReplicaSetNoPrimary) 298 } 299} 300 301func (f *fsm) updateRSWithoutPrimary(s description.Server) { 302 if f.SetName == "" { 303 f.SetName = s.SetName 304 } else if f.SetName != s.SetName { 305 f.removeServerByAddr(s.Addr) 306 return 307 } 308 309 for _, member := range s.Members { 310 if _, ok := f.findServer(member); !ok { 311 f.addServer(member) 312 } 313 } 314 315 if s.Addr != s.CanonicalAddr { 316 f.removeServerByAddr(s.Addr) 317 return 318 } 319 320 f.replaceServer(s) 321} 322 323func (f *fsm) updateUnknownWithStandalone(s description.Server) { 324 if len(f.Servers) > 1 { 325 f.removeServerByAddr(s.Addr) 326 return 327 } 328 329 f.setKind(description.Single) 330 f.replaceServer(s) 331} 332 333func (f *fsm) addServer(addr address.Address) { 334 f.Servers = append(f.Servers, description.Server{ 335 Addr: addr.Canonicalize(), 336 }) 337} 338 339func (f *fsm) findPrimary() (int, bool) { 340 for i, s := range f.Servers { 341 if s.Kind == description.RSPrimary { 342 return i, true 343 } 344 } 345 346 return 0, false 347} 348 349func (f *fsm) findServer(addr address.Address) (int, bool) { 350 canon := addr.Canonicalize() 351 for i, s := range f.Servers { 352 if canon == s.Addr { 353 return i, true 354 } 355 } 356 357 return 0, false 358} 359 360func (f *fsm) removeServer(i int) { 361 f.Servers = append(f.Servers[:i], f.Servers[i+1:]...) 362} 363 364func (f *fsm) removeServerByAddr(addr address.Address) { 365 if i, ok := f.findServer(addr); ok { 366 f.removeServer(i) 367 } 368} 369 370func (f *fsm) replaceServer(s description.Server) bool { 371 if i, ok := f.findServer(s.Addr); ok { 372 f.setServer(i, s) 373 return true 374 } 375 return false 376} 377 378func (f *fsm) setServer(i int, s description.Server) { 379 f.Servers[i] = s 380} 381 382func (f *fsm) setKind(k description.TopologyKind) { 383 f.Kind = k 384} 385