1package sd 2 3import ( 4 "errors" 5 "io" 6 "testing" 7 "time" 8 9 "github.com/go-kit/kit/endpoint" 10 "github.com/go-kit/kit/log" 11) 12 13func TestEndpointCache(t *testing.T) { 14 var ( 15 ca = make(closer) 16 cb = make(closer) 17 c = map[string]io.Closer{"a": ca, "b": cb} 18 f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil } 19 cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{}) 20 ) 21 22 // Populate 23 cache.Update(Event{Instances: []string{"a", "b"}}) 24 select { 25 case <-ca: 26 t.Errorf("endpoint a closed, not good") 27 case <-cb: 28 t.Errorf("endpoint b closed, not good") 29 case <-time.After(time.Millisecond): 30 t.Logf("no closures yet, good") 31 } 32 assertEndpointsLen(t, cache, 2) 33 34 // Duplicate, should be no-op 35 cache.Update(Event{Instances: []string{"a", "b"}}) 36 select { 37 case <-ca: 38 t.Errorf("endpoint a closed, not good") 39 case <-cb: 40 t.Errorf("endpoint b closed, not good") 41 case <-time.After(time.Millisecond): 42 t.Logf("no closures yet, good") 43 } 44 assertEndpointsLen(t, cache, 2) 45 46 // Error, should continue returning old endpoints 47 cache.Update(Event{Err: errors.New("sd error")}) 48 select { 49 case <-ca: 50 t.Errorf("endpoint a closed, not good") 51 case <-cb: 52 t.Errorf("endpoint b closed, not good") 53 case <-time.After(time.Millisecond): 54 t.Logf("no closures yet, good") 55 } 56 assertEndpointsLen(t, cache, 2) 57 58 // Delete b 59 go cache.Update(Event{Instances: []string{"a"}}) 60 select { 61 case <-ca: 62 t.Errorf("endpoint a closed, not good") 63 case <-cb: 64 t.Logf("endpoint b closed, good") 65 case <-time.After(time.Second): 66 t.Errorf("didn't close the deleted instance in time") 67 } 68 assertEndpointsLen(t, cache, 1) 69 70 // Delete a 71 go cache.Update(Event{Instances: []string{}}) 72 select { 73 // case <-cb: will succeed, as it's closed 74 case <-ca: 75 t.Logf("endpoint a closed, good") 76 case <-time.After(time.Second): 77 t.Errorf("didn't close the deleted instance in time") 78 } 79 assertEndpointsLen(t, cache, 0) 80} 81 82func TestEndpointCacheErrorAndTimeout(t *testing.T) { 83 var ( 84 ca = make(closer) 85 cb = make(closer) 86 c = map[string]io.Closer{"a": ca, "b": cb} 87 f = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil } 88 timeOut = 100 * time.Millisecond 89 cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{ 90 invalidateOnError: true, 91 invalidateTimeout: timeOut, 92 }) 93 ) 94 95 timeNow := time.Now() 96 cache.timeNow = func() time.Time { return timeNow } 97 98 // Populate 99 cache.Update(Event{Instances: []string{"a"}}) 100 select { 101 case <-ca: 102 t.Errorf("endpoint a closed, not good") 103 case <-time.After(time.Millisecond): 104 t.Logf("no closures yet, good") 105 } 106 assertEndpointsLen(t, cache, 1) 107 108 // Send error, keep time still. 109 cache.Update(Event{Err: errors.New("sd error")}) 110 select { 111 case <-ca: 112 t.Errorf("endpoint a closed, not good") 113 case <-time.After(time.Millisecond): 114 t.Logf("no closures yet, good") 115 } 116 assertEndpointsLen(t, cache, 1) 117 118 // Move the time, but less than the timeout 119 timeNow = timeNow.Add(timeOut / 2) 120 assertEndpointsLen(t, cache, 1) 121 select { 122 case <-ca: 123 t.Errorf("endpoint a closed, not good") 124 case <-time.After(time.Millisecond): 125 t.Logf("no closures yet, good") 126 } 127 128 // Move the time past the timeout 129 timeNow = timeNow.Add(timeOut) 130 assertEndpointsError(t, cache, "sd error") 131 select { 132 case <-ca: 133 t.Logf("endpoint a closed, good") 134 case <-time.After(time.Millisecond): 135 t.Errorf("didn't close the deleted instance in time") 136 } 137 138 // Send another error 139 cache.Update(Event{Err: errors.New("another sd error")}) 140 assertEndpointsError(t, cache, "sd error") // expect original error 141} 142 143func TestBadFactory(t *testing.T) { 144 cache := newEndpointCache(func(string) (endpoint.Endpoint, io.Closer, error) { 145 return nil, nil, errors.New("bad factory") 146 }, log.NewNopLogger(), endpointerOptions{}) 147 148 cache.Update(Event{Instances: []string{"foo:1234", "bar:5678"}}) 149 assertEndpointsLen(t, cache, 0) 150} 151 152func assertEndpointsLen(t *testing.T, cache *endpointCache, l int) { 153 endpoints, err := cache.Endpoints() 154 if err != nil { 155 t.Errorf("unexpected error %v", err) 156 return 157 } 158 if want, have := l, len(endpoints); want != have { 159 t.Errorf("want %d, have %d", want, have) 160 } 161} 162 163func assertEndpointsError(t *testing.T, cache *endpointCache, wantErr string) { 164 endpoints, err := cache.Endpoints() 165 if err == nil { 166 t.Errorf("expecting error, not good") 167 return 168 } 169 if want, have := wantErr, err.Error(); want != have { 170 t.Errorf("want %s, have %s", want, have) 171 return 172 } 173 if want, have := 0, len(endpoints); want != have { 174 t.Errorf("want %d, have %d", want, have) 175 } 176} 177 178type closer chan struct{} 179 180func (c closer) Close() error { close(c); return nil } 181