1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package mvcc 16 17import ( 18 "bytes" 19 "fmt" 20 "os" 21 "reflect" 22 "testing" 23 "time" 24 25 "github.com/coreos/etcd/lease" 26 "github.com/coreos/etcd/mvcc/backend" 27 "github.com/coreos/etcd/mvcc/mvccpb" 28) 29 30// TestWatcherWatchID tests that each watcher provides unique watchID, 31// and the watched event attaches the correct watchID. 32func TestWatcherWatchID(t *testing.T) { 33 b, tmpPath := backend.NewDefaultTmpBackend() 34 s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) 35 defer cleanup(s, b, tmpPath) 36 37 w := s.NewWatchStream() 38 defer w.Close() 39 40 idm := make(map[WatchID]struct{}) 41 42 for i := 0; i < 10; i++ { 43 id := w.Watch([]byte("foo"), nil, 0) 44 if _, ok := idm[id]; ok { 45 t.Errorf("#%d: id %d exists", i, id) 46 } 47 idm[id] = struct{}{} 48 49 s.Put([]byte("foo"), []byte("bar"), lease.NoLease) 50 51 resp := <-w.Chan() 52 if resp.WatchID != id { 53 t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id) 54 } 55 56 if err := w.Cancel(id); err != nil { 57 t.Error(err) 58 } 59 } 60 61 s.Put([]byte("foo2"), []byte("bar"), lease.NoLease) 62 63 // unsynced watchers 64 for i := 10; i < 20; i++ { 65 id := w.Watch([]byte("foo2"), nil, 1) 66 if _, ok := idm[id]; ok { 67 t.Errorf("#%d: id %d exists", i, id) 68 } 69 idm[id] = struct{}{} 70 71 resp := <-w.Chan() 72 if resp.WatchID != id { 73 t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id) 74 } 75 76 if err := w.Cancel(id); err != nil { 77 t.Error(err) 78 } 79 } 80} 81 82// TestWatcherWatchPrefix tests if Watch operation correctly watches 83// and returns events with matching prefixes. 84func TestWatcherWatchPrefix(t *testing.T) { 85 b, tmpPath := backend.NewDefaultTmpBackend() 86 s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) 87 defer cleanup(s, b, tmpPath) 88 89 w := s.NewWatchStream() 90 defer w.Close() 91 92 idm := make(map[WatchID]struct{}) 93 94 val := []byte("bar") 95 keyWatch, keyEnd, keyPut := []byte("foo"), []byte("fop"), []byte("foobar") 96 97 for i := 0; i < 10; i++ { 98 id := w.Watch(keyWatch, keyEnd, 0) 99 if _, ok := idm[id]; ok { 100 t.Errorf("#%d: unexpected duplicated id %x", i, id) 101 } 102 idm[id] = struct{}{} 103 104 s.Put(keyPut, val, lease.NoLease) 105 106 resp := <-w.Chan() 107 if resp.WatchID != id { 108 t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id) 109 } 110 111 if err := w.Cancel(id); err != nil { 112 t.Errorf("#%d: unexpected cancel error %v", i, err) 113 } 114 115 if len(resp.Events) != 1 { 116 t.Errorf("#%d: len(resp.Events) got = %d, want = 1", i, len(resp.Events)) 117 } 118 if len(resp.Events) == 1 { 119 if !bytes.Equal(resp.Events[0].Kv.Key, keyPut) { 120 t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyPut) 121 } 122 } 123 } 124 125 keyWatch1, keyEnd1, keyPut1 := []byte("foo1"), []byte("foo2"), []byte("foo1bar") 126 s.Put(keyPut1, val, lease.NoLease) 127 128 // unsynced watchers 129 for i := 10; i < 15; i++ { 130 id := w.Watch(keyWatch1, keyEnd1, 1) 131 if _, ok := idm[id]; ok { 132 t.Errorf("#%d: id %d exists", i, id) 133 } 134 idm[id] = struct{}{} 135 136 resp := <-w.Chan() 137 if resp.WatchID != id { 138 t.Errorf("#%d: watch id in event = %d, want %d", i, resp.WatchID, id) 139 } 140 141 if err := w.Cancel(id); err != nil { 142 t.Error(err) 143 } 144 145 if len(resp.Events) != 1 { 146 t.Errorf("#%d: len(resp.Events) got = %d, want = 1", i, len(resp.Events)) 147 } 148 if len(resp.Events) == 1 { 149 if !bytes.Equal(resp.Events[0].Kv.Key, keyPut1) { 150 t.Errorf("#%d: resp.Events got = %s, want = %s", i, resp.Events[0].Kv.Key, keyPut1) 151 } 152 } 153 } 154} 155 156// TestWatcherWatchWrongRange ensures that watcher with wrong 'end' range 157// does not create watcher, which panics when canceling in range tree. 158func TestWatcherWatchWrongRange(t *testing.T) { 159 b, tmpPath := backend.NewDefaultTmpBackend() 160 s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) 161 defer cleanup(s, b, tmpPath) 162 163 w := s.NewWatchStream() 164 defer w.Close() 165 166 if id := w.Watch([]byte("foa"), []byte("foa"), 1); id != -1 { 167 t.Fatalf("key == end range given; id expected -1, got %d", id) 168 } 169 if id := w.Watch([]byte("fob"), []byte("foa"), 1); id != -1 { 170 t.Fatalf("key > end range given; id expected -1, got %d", id) 171 } 172 // watch request with 'WithFromKey' has empty-byte range end 173 if id := w.Watch([]byte("foo"), []byte{}, 1); id != 0 { 174 t.Fatalf("\x00 is range given; id expected 0, got %d", id) 175 } 176} 177 178func TestWatchDeleteRange(t *testing.T) { 179 b, tmpPath := backend.NewDefaultTmpBackend() 180 s := newWatchableStore(b, &lease.FakeLessor{}, nil) 181 182 defer func() { 183 s.store.Close() 184 os.Remove(tmpPath) 185 }() 186 187 testKeyPrefix := []byte("foo") 188 189 for i := 0; i < 3; i++ { 190 s.Put([]byte(fmt.Sprintf("%s_%d", testKeyPrefix, i)), []byte("bar"), lease.NoLease) 191 } 192 193 w := s.NewWatchStream() 194 from, to := []byte(testKeyPrefix), []byte(fmt.Sprintf("%s_%d", testKeyPrefix, 99)) 195 w.Watch(from, to, 0) 196 197 s.DeleteRange(from, to) 198 199 we := []mvccpb.Event{ 200 {Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_0"), ModRevision: 5}}, 201 {Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_1"), ModRevision: 5}}, 202 {Type: mvccpb.DELETE, Kv: &mvccpb.KeyValue{Key: []byte("foo_2"), ModRevision: 5}}, 203 } 204 205 select { 206 case r := <-w.Chan(): 207 if !reflect.DeepEqual(r.Events, we) { 208 t.Errorf("event = %v, want %v", r.Events, we) 209 } 210 case <-time.After(10 * time.Second): 211 t.Fatal("failed to receive event after 10 seconds!") 212 } 213} 214 215// TestWatchStreamCancelWatcherByID ensures cancel calls the cancel func of the watcher 216// with given id inside watchStream. 217func TestWatchStreamCancelWatcherByID(t *testing.T) { 218 b, tmpPath := backend.NewDefaultTmpBackend() 219 s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) 220 defer cleanup(s, b, tmpPath) 221 222 w := s.NewWatchStream() 223 defer w.Close() 224 225 id := w.Watch([]byte("foo"), nil, 0) 226 227 tests := []struct { 228 cancelID WatchID 229 werr error 230 }{ 231 // no error should be returned when cancel the created watcher. 232 {id, nil}, 233 // not exist error should be returned when cancel again. 234 {id, ErrWatcherNotExist}, 235 // not exist error should be returned when cancel a bad id. 236 {id + 1, ErrWatcherNotExist}, 237 } 238 239 for i, tt := range tests { 240 gerr := w.Cancel(tt.cancelID) 241 242 if gerr != tt.werr { 243 t.Errorf("#%d: err = %v, want %v", i, gerr, tt.werr) 244 } 245 } 246 247 if l := len(w.(*watchStream).cancels); l != 0 { 248 t.Errorf("cancels = %d, want 0", l) 249 } 250} 251 252// TestWatcherRequestProgress ensures synced watcher can correctly 253// report its correct progress. 254func TestWatcherRequestProgress(t *testing.T) { 255 b, tmpPath := backend.NewDefaultTmpBackend() 256 257 // manually create watchableStore instead of newWatchableStore 258 // because newWatchableStore automatically calls syncWatchers 259 // method to sync watchers in unsynced map. We want to keep watchers 260 // in unsynced to test if syncWatchers works as expected. 261 s := &watchableStore{ 262 store: NewStore(b, &lease.FakeLessor{}, nil), 263 unsynced: newWatcherGroup(), 264 synced: newWatcherGroup(), 265 } 266 267 defer func() { 268 s.store.Close() 269 os.Remove(tmpPath) 270 }() 271 272 testKey := []byte("foo") 273 notTestKey := []byte("bad") 274 testValue := []byte("bar") 275 s.Put(testKey, testValue, lease.NoLease) 276 277 w := s.NewWatchStream() 278 279 badID := WatchID(1000) 280 w.RequestProgress(badID) 281 select { 282 case resp := <-w.Chan(): 283 t.Fatalf("unexpected %+v", resp) 284 default: 285 } 286 287 id := w.Watch(notTestKey, nil, 1) 288 w.RequestProgress(id) 289 select { 290 case resp := <-w.Chan(): 291 t.Fatalf("unexpected %+v", resp) 292 default: 293 } 294 295 s.syncWatchers() 296 297 w.RequestProgress(id) 298 wrs := WatchResponse{WatchID: 0, Revision: 2} 299 select { 300 case resp := <-w.Chan(): 301 if !reflect.DeepEqual(resp, wrs) { 302 t.Fatalf("got %+v, expect %+v", resp, wrs) 303 } 304 case <-time.After(time.Second): 305 t.Fatal("failed to receive progress") 306 } 307} 308 309func TestWatcherWatchWithFilter(t *testing.T) { 310 b, tmpPath := backend.NewDefaultTmpBackend() 311 s := WatchableKV(newWatchableStore(b, &lease.FakeLessor{}, nil)) 312 defer cleanup(s, b, tmpPath) 313 314 w := s.NewWatchStream() 315 defer w.Close() 316 317 filterPut := func(e mvccpb.Event) bool { 318 return e.Type == mvccpb.PUT 319 } 320 321 w.Watch([]byte("foo"), nil, 0, filterPut) 322 done := make(chan struct{}) 323 324 go func() { 325 <-w.Chan() 326 done <- struct{}{} 327 }() 328 329 s.Put([]byte("foo"), []byte("bar"), 0) 330 331 select { 332 case <-done: 333 t.Fatal("failed to filter put request") 334 case <-time.After(100 * time.Millisecond): 335 } 336 337 s.DeleteRange([]byte("foo"), nil) 338 339 select { 340 case <-done: 341 case <-time.After(100 * time.Millisecond): 342 t.Fatal("failed to receive delete request") 343 } 344} 345