1package gocql
2
3import (
4	"fmt"
5	"net"
6	"sync"
7	"sync/atomic"
8)
9
// ring tracks the hosts the driver knows about. Hosts are stored twice: in a
// map keyed by the string form of the host's connect address, and in a slice
// that rrHost indexes to hand hosts out in round-robin order.
type ring struct {
	// endpoints are the set of endpoints which the driver will attempt to connect
	// to in the case it can not reach any of its hosts. They are also used to
	// bootstrap the initial connection.
	endpoints []*HostInfo

	// mu guards hosts and hostList; every method of ring that touches them
	// takes it first.
	//
	// hosts are the set of all hosts in the cassandra ring that we know of,
	// keyed by the string form of each host's connect address.
	mu    sync.RWMutex
	hosts map[string]*HostInfo

	// hostList holds the known hosts in the order they were added. pos is the
	// round-robin cursor into hostList; rrHost advances it with an atomic add.
	hostList []*HostInfo
	pos      uint32

	// TODO: we should store the ring metadata here also.
}
25
26func (r *ring) rrHost() *HostInfo {
27	// TODO: should we filter hosts that get used here? These hosts will be used
28	// for the control connection, should we also provide an iterator?
29	r.mu.RLock()
30	defer r.mu.RUnlock()
31	if len(r.hostList) == 0 {
32		return nil
33	}
34
35	pos := int(atomic.AddUint32(&r.pos, 1) - 1)
36	return r.hostList[pos%len(r.hostList)]
37}
38
39func (r *ring) getHost(ip net.IP) *HostInfo {
40	r.mu.RLock()
41	host := r.hosts[ip.String()]
42	r.mu.RUnlock()
43	return host
44}
45
46func (r *ring) allHosts() []*HostInfo {
47	r.mu.RLock()
48	hosts := make([]*HostInfo, 0, len(r.hosts))
49	for _, host := range r.hosts {
50		hosts = append(hosts, host)
51	}
52	r.mu.RUnlock()
53	return hosts
54}
55
56func (r *ring) currentHosts() map[string]*HostInfo {
57	r.mu.RLock()
58	hosts := make(map[string]*HostInfo, len(r.hosts))
59	for k, v := range r.hosts {
60		hosts[k] = v
61	}
62	r.mu.RUnlock()
63	return hosts
64}
65
66func (r *ring) addHost(host *HostInfo) bool {
67	// TODO(zariel): key all host info by HostID instead of
68	// ip addresses
69	if host.invalidConnectAddr() {
70		panic(fmt.Sprintf("invalid host: %v", host))
71	}
72	ip := host.ConnectAddress().String()
73
74	r.mu.Lock()
75	if r.hosts == nil {
76		r.hosts = make(map[string]*HostInfo)
77	}
78
79	_, ok := r.hosts[ip]
80	if !ok {
81		r.hostList = append(r.hostList, host)
82	}
83
84	r.hosts[ip] = host
85	r.mu.Unlock()
86	return ok
87}
88
89func (r *ring) addOrUpdate(host *HostInfo) *HostInfo {
90	if existingHost, ok := r.addHostIfMissing(host); ok {
91		existingHost.update(host)
92		host = existingHost
93	}
94	return host
95}
96
97func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) {
98	if host.invalidConnectAddr() {
99		panic(fmt.Sprintf("invalid host: %v", host))
100	}
101	ip := host.ConnectAddress().String()
102
103	r.mu.Lock()
104	if r.hosts == nil {
105		r.hosts = make(map[string]*HostInfo)
106	}
107
108	existing, ok := r.hosts[ip]
109	if !ok {
110		r.hosts[ip] = host
111		existing = host
112		r.hostList = append(r.hostList, host)
113	}
114	r.mu.Unlock()
115	return existing, ok
116}
117
118func (r *ring) removeHost(ip net.IP) bool {
119	r.mu.Lock()
120	if r.hosts == nil {
121		r.hosts = make(map[string]*HostInfo)
122	}
123
124	k := ip.String()
125	_, ok := r.hosts[k]
126	if ok {
127		for i, host := range r.hostList {
128			if host.ConnectAddress().Equal(ip) {
129				r.hostList = append(r.hostList[:i], r.hostList[i+1:]...)
130				break
131			}
132		}
133	}
134	delete(r.hosts, k)
135	r.mu.Unlock()
136	return ok
137}
138
// clusterMetadata holds cluster-level settings; the only one stored here is
// the partitioner name.
type clusterMetadata struct {
	// mu guards partitioner; setPartitioner holds it while comparing and
	// writing the field.
	mu          sync.RWMutex
	partitioner string
}
143
144func (c *clusterMetadata) setPartitioner(partitioner string) {
145	c.mu.Lock()
146	defer c.mu.Unlock()
147
148	if c.partitioner != partitioner {
149		// TODO: update other things now
150		c.partitioner = partitioner
151	}
152}
153