// +build ccm,ignore

package gocql

import (
	"log"
	"testing"
	"time"

	"github.com/gocql/gocql/internal/ccm"
)

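// TestEventDiscovery verifies that, once a session is established, the driver
// has discovered every node reported by ccm status.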
func TestEventDiscovery(t *testing.T) {
	t.Skip("FLAKE skipping")
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	session := createSession(t)
	defer session.Close()

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("status=%+v\n", status)

	session.pool.mu.RLock()
	poolHosts := session.pool.hostConnPools // TODO: replace with session.ring
	t.Logf("poolhosts=%+v\n", poolHosts)
	// check we discovered all the nodes in the ring
	for _, host := range status {
		if _, ok := poolHosts[host.Addr]; !ok {
			t.Errorf("did not discover %q", host.Addr)
		}
	}
	session.pool.mu.RUnlock()
	if t.Failed() {
		t.FailNow()
	}
}

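// TestEventNodeDownControl takes down the node the session was pointed at
// (and thus the node hosting the control connection) and verifies the host is
// dropped from the pool and marked down in the ring metadata.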
func TestEventNodeDownControl(t *testing.T) {
	t.Skip("FLAKE skipping")
	const targetNode = "node1"
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}

	cluster := createCluster()
	cluster.Hosts = []string{status[targetNode].Addr}
	session := createSessionFromCluster(cluster, t)
	defer session.Close()

	t.Log("marking " + targetNode + " as down")
	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	t.Logf("status=%+v\n", status)
	t.Logf("marking node %q down: %v\n", targetNode, status[targetNode])

	// give the driver time to process the down event
	time.Sleep(5 * time.Second)

	session.pool.mu.RLock()

	poolHosts := session.pool.hostConnPools
	node := status[targetNode]
	t.Logf("poolhosts=%+v\n", poolHosts)

	if _, ok := poolHosts[node.Addr]; ok {
		session.pool.mu.RUnlock()
		t.Fatal("node not removed from pool after down event")
	}
	session.pool.mu.RUnlock()

	host := session.ring.getHost(node.Addr)
	if host == nil {
		t.Fatal("node not in metadata ring")
	} else if host.IsUp() {
		t.Fatalf("node not marked as down after event in metadata: %v", host)
	}
}

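// TestEventNodeDown takes down one node of a running cluster and verifies the
// host is removed from the pool and marked down in the ring metadata.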
func TestEventNodeDown(t *testing.T) {
	t.Skip("FLAKE skipping")
	const targetNode = "node3"
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	session := createSession(t)
	defer session.Close()

	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("status=%+v\n", status)
	t.Logf("marking node %q down: %v\n", targetNode, status[targetNode])

	// give the driver time to process the down event
	time.Sleep(5 * time.Second)

	session.pool.mu.RLock()
	defer session.pool.mu.RUnlock()

	poolHosts := session.pool.hostConnPools
	node := status[targetNode]
	t.Logf("poolhosts=%+v\n", poolHosts)

	if _, ok := poolHosts[node.Addr]; ok {
		t.Fatal("node not removed from pool after down event")
	}

	host := session.ring.getHost(node.Addr)
	if host == nil {
		t.Fatal("node not in metadata ring")
	} else if host.IsUp() {
		t.Fatalf("node not marked as down after event in metadata: %v", host)
	}
}

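// TestEventNodeUp cycles a node down and back up, verifying that its
// connection pool is removed on the down event and restored once the node
// rejoins the cluster.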
func TestEventNodeUp(t *testing.T) {
	t.Skip("FLAKE skipping")
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("status=%+v\n", status)

	session := createSession(t)
	defer session.Close()

	const targetNode = "node2"
	node := status[targetNode]

	_, ok := session.pool.getPool(node.Addr)
	if !ok {
		session.pool.mu.RLock()
		t.Errorf("target pool not in connection pool: addr=%q pools=%v", status[targetNode].Addr, session.pool.hostConnPools)
		session.pool.mu.RUnlock()
		t.FailNow()
	}

	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	// give the driver time to process the down event
	time.Sleep(5 * time.Second)

	_, ok = session.pool.getPool(node.Addr)
	if ok {
		t.Fatal("node not removed from pool after down event")
	}

	if err := ccm.NodeUp(targetNode); err != nil {
		t.Fatal(err)
	}

	// cassandra < 2.2 needs 10 seconds to start up the binary service
	time.Sleep(15 * time.Second)

	_, ok = session.pool.getPool(node.Addr)
	if !ok {
		t.Fatal("node not added back to pool after up event")
	}

	host := session.ring.getHost(node.Addr)
	if host == nil {
		t.Fatal("node not in metadata ring")
	} else if !host.IsUp() {
		t.Fatalf("node not marked as UP after event in metadata: addr=%q host=%p: %v", node.Addr, host, host)
	}
}

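// TestEventFilter verifies that hosts excluded by a WhiteListHostFilter never
// enter the connection pool, even after down/up events for those hosts.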
func TestEventFilter(t *testing.T) {
	t.Skip("FLAKE skipping")
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("status=%+v\n", status)

	cluster := createCluster()
	cluster.HostFilter = WhiteListHostFilter(status["node1"].Addr)
	session := createSessionFromCluster(cluster, t)
	defer session.Close()

	if _, ok := session.pool.getPool(status["node1"].Addr); !ok {
		t.Errorf("should have %v in pool but don't", "node1")
	}

	for _, host := range [...]string{"node2", "node3"} {
		_, ok := session.pool.getPool(status[host].Addr)
		if ok {
			t.Errorf("should not have %v in pool", host)
		}
	}

	if t.Failed() {
		t.FailNow()
	}

	if err := ccm.NodeDown("node2"); err != nil {
		t.Fatal(err)
	}

	time.Sleep(5 * time.Second)

	if err := ccm.NodeUp("node2"); err != nil {
		t.Fatal(err)
	}

	// wait for the up event; node startup can take a while (see TestEventNodeUp)
	time.Sleep(15 * time.Second)
	for _, host := range [...]string{"node2", "node3"} {
		_, ok := session.pool.getPool(status[host].Addr)
		if ok {
			t.Errorf("should not have %v in pool", host)
		}
	}

	if t.Failed() {
		t.FailNow()
	}
}

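// TestEventDownQueryable cycles the only whitelisted host down and back up,
// then checks that the host is marked up again and the session can still
// serve queries.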
func TestEventDownQueryable(t *testing.T) {
	t.Skip("FLAKE skipping")
	if err := ccm.AllUp(); err != nil {
		t.Fatal(err)
	}

	status, err := ccm.Status()
	if err != nil {
		t.Fatal(err)
	}
	log.Printf("status=%+v\n", status)

	const targetNode = "node1"

	addr := status[targetNode].Addr

	cluster := createCluster()
	cluster.Hosts = []string{addr}
	cluster.HostFilter = WhiteListHostFilter(addr)
	session := createSessionFromCluster(cluster, t)
	defer session.Close()

	if pool, ok := session.pool.getPool(addr); !ok {
		t.Fatalf("should have %v in pool but don't", addr)
	} else if !pool.host.IsUp() {
		t.Fatalf("host is not up %v", pool.host)
	}

	if err := ccm.NodeDown(targetNode); err != nil {
		t.Fatal(err)
	}

	time.Sleep(5 * time.Second)

	if err := ccm.NodeUp(targetNode); err != nil {
		t.Fatal(err)
	}

	// wait for the up event; node startup can take a while (see TestEventNodeUp)
	time.Sleep(15 * time.Second)

	if pool, ok := session.pool.getPool(addr); !ok {
		t.Fatalf("should have %v in pool but don't", addr)
	} else if !pool.host.IsUp() {
		t.Fatalf("host is not up %v", pool.host)
	}

	var count int
	if err := session.Query("SELECT COUNT(*) FROM system.local").Scan(&count); err != nil {
		t.Fatal(err)
	} else if count != 1 {
		t.Fatalf("expected count of 1, got %d", count)
	}
}