1// +build ccm, ignore 2 3package gocql 4 5import ( 6 "log" 7 "testing" 8 "time" 9 10 "github.com/gocql/gocql/internal/ccm" 11) 12 13func TestEventDiscovery(t *testing.T) { 14 t.Skip("FLAKE skipping") 15 if err := ccm.AllUp(); err != nil { 16 t.Fatal(err) 17 } 18 19 session := createSession(t) 20 defer session.Close() 21 22 status, err := ccm.Status() 23 if err != nil { 24 t.Fatal(err) 25 } 26 t.Logf("status=%+v\n", status) 27 28 session.pool.mu.RLock() 29 poolHosts := session.pool.hostConnPools // TODO: replace with session.ring 30 t.Logf("poolhosts=%+v\n", poolHosts) 31 // check we discovered all the nodes in the ring 32 for _, host := range status { 33 if _, ok := poolHosts[host.Addr]; !ok { 34 t.Errorf("did not discover %q", host.Addr) 35 } 36 } 37 session.pool.mu.RUnlock() 38 if t.Failed() { 39 t.FailNow() 40 } 41} 42 43func TestEventNodeDownControl(t *testing.T) { 44 t.Skip("FLAKE skipping") 45 const targetNode = "node1" 46 if err := ccm.AllUp(); err != nil { 47 t.Fatal(err) 48 } 49 50 status, err := ccm.Status() 51 if err != nil { 52 t.Fatal(err) 53 } 54 55 cluster := createCluster() 56 cluster.Hosts = []string{status[targetNode].Addr} 57 session := createSessionFromCluster(cluster, t) 58 defer session.Close() 59 60 t.Log("marking " + targetNode + " as down") 61 if err := ccm.NodeDown(targetNode); err != nil { 62 t.Fatal(err) 63 } 64 65 t.Logf("status=%+v\n", status) 66 t.Logf("marking node %q down: %v\n", targetNode, status[targetNode]) 67 68 time.Sleep(5 * time.Second) 69 70 session.pool.mu.RLock() 71 72 poolHosts := session.pool.hostConnPools 73 node := status[targetNode] 74 t.Logf("poolhosts=%+v\n", poolHosts) 75 76 if _, ok := poolHosts[node.Addr]; ok { 77 session.pool.mu.RUnlock() 78 t.Fatal("node not removed after remove event") 79 } 80 session.pool.mu.RUnlock() 81 82 host := session.ring.getHost(node.Addr) 83 if host == nil { 84 t.Fatal("node not in metadata ring") 85 } else if host.IsUp() { 86 t.Fatalf("not not marked as down after event in metadata: %v", host) 87 } 88} 89 90func TestEventNodeDown(t *testing.T) { 91 t.Skip("FLAKE skipping") 92 const targetNode = "node3" 93 if err := ccm.AllUp(); err != nil { 94 t.Fatal(err) 95 } 96 97 session := createSession(t) 98 defer session.Close() 99 100 if err := ccm.NodeDown(targetNode); err != nil { 101 t.Fatal(err) 102 } 103 104 status, err := ccm.Status() 105 if err != nil { 106 t.Fatal(err) 107 } 108 t.Logf("status=%+v\n", status) 109 t.Logf("marking node %q down: %v\n", targetNode, status[targetNode]) 110 111 time.Sleep(5 * time.Second) 112 113 session.pool.mu.RLock() 114 defer session.pool.mu.RUnlock() 115 116 poolHosts := session.pool.hostConnPools 117 node := status[targetNode] 118 t.Logf("poolhosts=%+v\n", poolHosts) 119 120 if _, ok := poolHosts[node.Addr]; ok { 121 t.Fatal("node not removed after remove event") 122 } 123 124 host := session.ring.getHost(node.Addr) 125 if host == nil { 126 t.Fatal("node not in metadata ring") 127 } else if host.IsUp() { 128 t.Fatalf("not not marked as down after event in metadata: %v", host) 129 } 130} 131 132func TestEventNodeUp(t *testing.T) { 133 t.Skip("FLAKE skipping") 134 if err := ccm.AllUp(); err != nil { 135 t.Fatal(err) 136 } 137 138 status, err := ccm.Status() 139 if err != nil { 140 t.Fatal(err) 141 } 142 log.Printf("status=%+v\n", status) 143 144 session := createSession(t) 145 defer session.Close() 146 147 const targetNode = "node2" 148 node := status[targetNode] 149 150 _, ok := session.pool.getPool(node.Addr) 151 if !ok { 152 session.pool.mu.RLock() 153 t.Errorf("target pool not in connection pool: addr=%q pools=%v", status[targetNode].Addr, session.pool.hostConnPools) 154 session.pool.mu.RUnlock() 155 t.FailNow() 156 } 157 158 if err := ccm.NodeDown(targetNode); err != nil { 159 t.Fatal(err) 160 } 161 162 time.Sleep(5 * time.Second) 163 164 _, ok = session.pool.getPool(node.Addr) 165 if ok { 166 t.Fatal("node not removed after remove event") 167 } 168 169 if err := ccm.NodeUp(targetNode); err != nil { 170 t.Fatal(err) 171 } 172 173 // cassandra < 2.2 needs 10 seconds to start up the binary service 174 time.Sleep(15 * time.Second) 175 176 _, ok = session.pool.getPool(node.Addr) 177 if !ok { 178 t.Fatal("node not added after node added event") 179 } 180 181 host := session.ring.getHost(node.Addr) 182 if host == nil { 183 t.Fatal("node not in metadata ring") 184 } else if !host.IsUp() { 185 t.Fatalf("not not marked as UP after event in metadata: addr=%q host=%p: %v", node.Addr, host, host) 186 } 187} 188 189func TestEventFilter(t *testing.T) { 190 t.Skip("FLAKE skipping") 191 if err := ccm.AllUp(); err != nil { 192 t.Fatal(err) 193 } 194 195 status, err := ccm.Status() 196 if err != nil { 197 t.Fatal(err) 198 } 199 log.Printf("status=%+v\n", status) 200 201 cluster := createCluster() 202 cluster.HostFilter = WhiteListHostFilter(status["node1"].Addr) 203 session := createSessionFromCluster(cluster, t) 204 defer session.Close() 205 206 if _, ok := session.pool.getPool(status["node1"].Addr); !ok { 207 t.Errorf("should have %v in pool but dont", "node1") 208 } 209 210 for _, host := range [...]string{"node2", "node3"} { 211 _, ok := session.pool.getPool(status[host].Addr) 212 if ok { 213 t.Errorf("should not have %v in pool", host) 214 } 215 } 216 217 if t.Failed() { 218 t.FailNow() 219 } 220 221 if err := ccm.NodeDown("node2"); err != nil { 222 t.Fatal(err) 223 } 224 225 time.Sleep(5 * time.Second) 226 227 if err := ccm.NodeUp("node2"); err != nil { 228 t.Fatal(err) 229 } 230 231 time.Sleep(15 * time.Second) 232 for _, host := range [...]string{"node2", "node3"} { 233 _, ok := session.pool.getPool(status[host].Addr) 234 if ok { 235 t.Errorf("should not have %v in pool", host) 236 } 237 } 238 239 if t.Failed() { 240 t.FailNow() 241 } 242 243} 244 245func TestEventDownQueryable(t *testing.T) { 246 t.Skip("FLAKE skipping") 247 if err := ccm.AllUp(); err != nil { 248 t.Fatal(err) 249 } 250 251 status, err := ccm.Status() 252 if err != nil { 253 t.Fatal(err) 254 } 255 log.Printf("status=%+v\n", status) 256 257 const targetNode = "node1" 258 259 addr := status[targetNode].Addr 260 261 cluster := createCluster() 262 cluster.Hosts = []string{addr} 263 cluster.HostFilter = WhiteListHostFilter(addr) 264 session := createSessionFromCluster(cluster, t) 265 defer session.Close() 266 267 if pool, ok := session.pool.getPool(addr); !ok { 268 t.Fatalf("should have %v in pool but dont", addr) 269 } else if !pool.host.IsUp() { 270 t.Fatalf("host is not up %v", pool.host) 271 } 272 273 if err := ccm.NodeDown(targetNode); err != nil { 274 t.Fatal(err) 275 } 276 277 time.Sleep(5 * time.Second) 278 279 if err := ccm.NodeUp(targetNode); err != nil { 280 t.Fatal(err) 281 } 282 283 time.Sleep(15 * time.Second) 284 285 if pool, ok := session.pool.getPool(addr); !ok { 286 t.Fatalf("should have %v in pool but dont", addr) 287 } else if !pool.host.IsUp() { 288 t.Fatalf("host is not up %v", pool.host) 289 } 290 291 var rows int 292 if err := session.Query("SELECT COUNT(*) FROM system.local").Scan(&rows); err != nil { 293 t.Fatal(err) 294 } else if rows != 1 { 295 t.Fatalf("expected to get 1 row got %d", rows) 296 } 297} 298