1package sd
2
3import (
4	"errors"
5	"io"
6	"testing"
7	"time"
8
9	"github.com/go-kit/kit/endpoint"
10	"github.com/go-kit/kit/log"
11)
12
13func TestEndpointCache(t *testing.T) {
14	var (
15		ca    = make(closer)
16		cb    = make(closer)
17		c     = map[string]io.Closer{"a": ca, "b": cb}
18		f     = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
19		cache = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{})
20	)
21
22	// Populate
23	cache.Update(Event{Instances: []string{"a", "b"}})
24	select {
25	case <-ca:
26		t.Errorf("endpoint a closed, not good")
27	case <-cb:
28		t.Errorf("endpoint b closed, not good")
29	case <-time.After(time.Millisecond):
30		t.Logf("no closures yet, good")
31	}
32	assertEndpointsLen(t, cache, 2)
33
34	// Duplicate, should be no-op
35	cache.Update(Event{Instances: []string{"a", "b"}})
36	select {
37	case <-ca:
38		t.Errorf("endpoint a closed, not good")
39	case <-cb:
40		t.Errorf("endpoint b closed, not good")
41	case <-time.After(time.Millisecond):
42		t.Logf("no closures yet, good")
43	}
44	assertEndpointsLen(t, cache, 2)
45
46	// Error, should continue returning old endpoints
47	cache.Update(Event{Err: errors.New("sd error")})
48	select {
49	case <-ca:
50		t.Errorf("endpoint a closed, not good")
51	case <-cb:
52		t.Errorf("endpoint b closed, not good")
53	case <-time.After(time.Millisecond):
54		t.Logf("no closures yet, good")
55	}
56	assertEndpointsLen(t, cache, 2)
57
58	// Delete b
59	go cache.Update(Event{Instances: []string{"a"}})
60	select {
61	case <-ca:
62		t.Errorf("endpoint a closed, not good")
63	case <-cb:
64		t.Logf("endpoint b closed, good")
65	case <-time.After(time.Second):
66		t.Errorf("didn't close the deleted instance in time")
67	}
68	assertEndpointsLen(t, cache, 1)
69
70	// Delete a
71	go cache.Update(Event{Instances: []string{}})
72	select {
73	// case <-cb: will succeed, as it's closed
74	case <-ca:
75		t.Logf("endpoint a closed, good")
76	case <-time.After(time.Second):
77		t.Errorf("didn't close the deleted instance in time")
78	}
79	assertEndpointsLen(t, cache, 0)
80}
81
82func TestEndpointCacheErrorAndTimeout(t *testing.T) {
83	var (
84		ca      = make(closer)
85		cb      = make(closer)
86		c       = map[string]io.Closer{"a": ca, "b": cb}
87		f       = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, c[instance], nil }
88		timeOut = 100 * time.Millisecond
89		cache   = newEndpointCache(f, log.NewNopLogger(), endpointerOptions{
90			invalidateOnError: true,
91			invalidateTimeout: timeOut,
92		})
93	)
94
95	timeNow := time.Now()
96	cache.timeNow = func() time.Time { return timeNow }
97
98	// Populate
99	cache.Update(Event{Instances: []string{"a"}})
100	select {
101	case <-ca:
102		t.Errorf("endpoint a closed, not good")
103	case <-time.After(time.Millisecond):
104		t.Logf("no closures yet, good")
105	}
106	assertEndpointsLen(t, cache, 1)
107
108	// Send error, keep time still.
109	cache.Update(Event{Err: errors.New("sd error")})
110	select {
111	case <-ca:
112		t.Errorf("endpoint a closed, not good")
113	case <-time.After(time.Millisecond):
114		t.Logf("no closures yet, good")
115	}
116	assertEndpointsLen(t, cache, 1)
117
118	// Move the time, but less than the timeout
119	timeNow = timeNow.Add(timeOut / 2)
120	assertEndpointsLen(t, cache, 1)
121	select {
122	case <-ca:
123		t.Errorf("endpoint a closed, not good")
124	case <-time.After(time.Millisecond):
125		t.Logf("no closures yet, good")
126	}
127
128	// Move the time past the timeout
129	timeNow = timeNow.Add(timeOut)
130	assertEndpointsError(t, cache, "sd error")
131	select {
132	case <-ca:
133		t.Logf("endpoint a closed, good")
134	case <-time.After(time.Millisecond):
135		t.Errorf("didn't close the deleted instance in time")
136	}
137
138	// Send another error
139	cache.Update(Event{Err: errors.New("another sd error")})
140	assertEndpointsError(t, cache, "sd error") // expect original error
141}
142
143func TestBadFactory(t *testing.T) {
144	cache := newEndpointCache(func(string) (endpoint.Endpoint, io.Closer, error) {
145		return nil, nil, errors.New("bad factory")
146	}, log.NewNopLogger(), endpointerOptions{})
147
148	cache.Update(Event{Instances: []string{"foo:1234", "bar:5678"}})
149	assertEndpointsLen(t, cache, 0)
150}
151
152func assertEndpointsLen(t *testing.T, cache *endpointCache, l int) {
153	endpoints, err := cache.Endpoints()
154	if err != nil {
155		t.Errorf("unexpected error %v", err)
156		return
157	}
158	if want, have := l, len(endpoints); want != have {
159		t.Errorf("want %d, have %d", want, have)
160	}
161}
162
163func assertEndpointsError(t *testing.T, cache *endpointCache, wantErr string) {
164	endpoints, err := cache.Endpoints()
165	if err == nil {
166		t.Errorf("expecting error, not good")
167		return
168	}
169	if want, have := wantErr, err.Error(); want != have {
170		t.Errorf("want %s, have %s", want, have)
171		return
172	}
173	if want, have := 0, len(endpoints); want != have {
174		t.Errorf("want %d, have %d", want, have)
175	}
176}
177
// closer is a channel-backed Close implementation for tests: a receive on
// the channel succeeds once Close has been called, which lets a test observe
// the call with a select statement.
type closer chan struct{}

// Close closes the underlying channel and always returns nil. Like any
// double close of a channel, calling it a second time panics.
func (ch closer) Close() error {
	close(ch)
	return nil
}
181