1package gocql
2
3import (
4	"fmt"
5	"net"
6	"sync"
7	"sync/atomic"
8)
9
10type ring struct {
11	// endpoints are the set of endpoints which the driver will attempt to connect
12	// to in the case it can not reach any of its hosts. They are also used to boot
13	// strap the initial connection.
14	endpoints []*HostInfo
15
16	// hosts are the set of all hosts in the cassandra ring that we know of
17	mu    sync.RWMutex
18	hosts map[string]*HostInfo
19
20	hostList []*HostInfo
21	pos      uint32
22
23	// TODO: we should store the ring metadata here also.
24}
25
26func (r *ring) rrHost() *HostInfo {
27	// TODO: should we filter hosts that get used here? These hosts will be used
28	// for the control connection, should we also provide an iterator?
29	r.mu.RLock()
30	defer r.mu.RUnlock()
31	if len(r.hostList) == 0 {
32		return nil
33	}
34
35	pos := int(atomic.AddUint32(&r.pos, 1) - 1)
36	return r.hostList[pos%len(r.hostList)]
37}
38
39func (r *ring) getHost(ip net.IP) *HostInfo {
40	r.mu.RLock()
41	host := r.hosts[ip.String()]
42	r.mu.RUnlock()
43	return host
44}
45
46func (r *ring) allHosts() []*HostInfo {
47	r.mu.RLock()
48	hosts := make([]*HostInfo, 0, len(r.hosts))
49	for _, host := range r.hosts {
50		hosts = append(hosts, host)
51	}
52	r.mu.RUnlock()
53	return hosts
54}
55
56func (r *ring) currentHosts() map[string]*HostInfo {
57	r.mu.RLock()
58	hosts := make(map[string]*HostInfo, len(r.hosts))
59	for k, v := range r.hosts {
60		hosts[k] = v
61	}
62	r.mu.RUnlock()
63	return hosts
64}
65
66func (r *ring) addOrUpdate(host *HostInfo) *HostInfo {
67	if existingHost, ok := r.addHostIfMissing(host); ok {
68		existingHost.update(host)
69		host = existingHost
70	}
71	return host
72}
73
74func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) {
75	if host.invalidConnectAddr() {
76		panic(fmt.Sprintf("invalid host: %v", host))
77	}
78	ip := host.ConnectAddress().String()
79
80	r.mu.Lock()
81	if r.hosts == nil {
82		r.hosts = make(map[string]*HostInfo)
83	}
84
85	existing, ok := r.hosts[ip]
86	if !ok {
87		r.hosts[ip] = host
88		existing = host
89		r.hostList = append(r.hostList, host)
90	}
91	r.mu.Unlock()
92	return existing, ok
93}
94
95func (r *ring) removeHost(ip net.IP) bool {
96	r.mu.Lock()
97	if r.hosts == nil {
98		r.hosts = make(map[string]*HostInfo)
99	}
100
101	k := ip.String()
102	_, ok := r.hosts[k]
103	if ok {
104		for i, host := range r.hostList {
105			if host.ConnectAddress().Equal(ip) {
106				r.hostList = append(r.hostList[:i], r.hostList[i+1:]...)
107				break
108			}
109		}
110	}
111	delete(r.hosts, k)
112	r.mu.Unlock()
113	return ok
114}
115
116type clusterMetadata struct {
117	mu          sync.RWMutex
118	partitioner string
119}
120
121func (c *clusterMetadata) setPartitioner(partitioner string) {
122	c.mu.Lock()
123	defer c.mu.Unlock()
124
125	if c.partitioner != partitioner {
126		// TODO: update other things now
127		c.partitioner = partitioner
128	}
129}
130