1package gocql 2 3import ( 4 "fmt" 5 "net" 6 "sync" 7 "sync/atomic" 8) 9 10type ring struct { 11 // endpoints are the set of endpoints which the driver will attempt to connect 12 // to in the case it can not reach any of its hosts. They are also used to boot 13 // strap the initial connection. 14 endpoints []*HostInfo 15 16 // hosts are the set of all hosts in the cassandra ring that we know of 17 mu sync.RWMutex 18 hosts map[string]*HostInfo 19 20 hostList []*HostInfo 21 pos uint32 22 23 // TODO: we should store the ring metadata here also. 24} 25 26func (r *ring) rrHost() *HostInfo { 27 // TODO: should we filter hosts that get used here? These hosts will be used 28 // for the control connection, should we also provide an iterator? 29 r.mu.RLock() 30 defer r.mu.RUnlock() 31 if len(r.hostList) == 0 { 32 return nil 33 } 34 35 pos := int(atomic.AddUint32(&r.pos, 1) - 1) 36 return r.hostList[pos%len(r.hostList)] 37} 38 39func (r *ring) getHost(ip net.IP) *HostInfo { 40 r.mu.RLock() 41 host := r.hosts[ip.String()] 42 r.mu.RUnlock() 43 return host 44} 45 46func (r *ring) allHosts() []*HostInfo { 47 r.mu.RLock() 48 hosts := make([]*HostInfo, 0, len(r.hosts)) 49 for _, host := range r.hosts { 50 hosts = append(hosts, host) 51 } 52 r.mu.RUnlock() 53 return hosts 54} 55 56func (r *ring) currentHosts() map[string]*HostInfo { 57 r.mu.RLock() 58 hosts := make(map[string]*HostInfo, len(r.hosts)) 59 for k, v := range r.hosts { 60 hosts[k] = v 61 } 62 r.mu.RUnlock() 63 return hosts 64} 65 66func (r *ring) addOrUpdate(host *HostInfo) *HostInfo { 67 if existingHost, ok := r.addHostIfMissing(host); ok { 68 existingHost.update(host) 69 host = existingHost 70 } 71 return host 72} 73 74func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) { 75 if host.invalidConnectAddr() { 76 panic(fmt.Sprintf("invalid host: %v", host)) 77 } 78 ip := host.ConnectAddress().String() 79 80 r.mu.Lock() 81 if r.hosts == nil { 82 r.hosts = make(map[string]*HostInfo) 83 } 84 85 existing, ok := r.hosts[ip] 86 if !ok { 87 r.hosts[ip] = host 88 existing = host 89 r.hostList = append(r.hostList, host) 90 } 91 r.mu.Unlock() 92 return existing, ok 93} 94 95func (r *ring) removeHost(ip net.IP) bool { 96 r.mu.Lock() 97 if r.hosts == nil { 98 r.hosts = make(map[string]*HostInfo) 99 } 100 101 k := ip.String() 102 _, ok := r.hosts[k] 103 if ok { 104 for i, host := range r.hostList { 105 if host.ConnectAddress().Equal(ip) { 106 r.hostList = append(r.hostList[:i], r.hostList[i+1:]...) 107 break 108 } 109 } 110 } 111 delete(r.hosts, k) 112 r.mu.Unlock() 113 return ok 114} 115 116type clusterMetadata struct { 117 mu sync.RWMutex 118 partitioner string 119} 120 121func (c *clusterMetadata) setPartitioner(partitioner string) { 122 c.mu.Lock() 123 defer c.mu.Unlock() 124 125 if c.partitioner != partitioner { 126 // TODO: update other things now 127 c.partitioner = partitioner 128 } 129} 130