package retention_test

import (
	"bytes"
	"fmt"
	"reflect"
	"sync"
	"testing"
	"time"

	"github.com/influxdata/influxdb/internal"
	"github.com/influxdata/influxdb/logger"
	"github.com/influxdata/influxdb/services/meta"
	"github.com/influxdata/influxdb/services/retention"
	"github.com/influxdata/influxdb/toml"
)

func TestService_OpenDisabled(t *testing.T) {
	// Opening a disabled service should be a no-op.
	c := retention.NewConfig()
	c.Enabled = false
	s := NewService(c)

	if err := s.Open(); err != nil {
		t.Fatal(err)
	}

	if s.LogBuf.String() != "" {
		t.Fatalf("service logged %q, didn't expect any logging", s.LogBuf.String())
	}
}

func TestService_OpenClose(t *testing.T) {
	// Opening an enabled service should log, and repeated opens/closes should be no-ops.
	s := NewService(retention.NewConfig())

	if err := s.Open(); err != nil {
		t.Fatal(err)
	}

	if s.LogBuf.String() == "" {
		t.Fatal("service didn't log anything on open")
	}

	// Reopening is a no-op
	if err := s.Open(); err != nil {
		t.Fatal(err)
	}

	if err := s.Close(); err != nil {
		t.Fatal(err)
	}

	// Re-closing is a no-op
	if err := s.Close(); err != nil {
		t.Fatal(err)
	}
}

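// TestService_CheckShards verifies that the retention service deletes the
// expired shard group from the meta store and removes its shards from the
// local TSDB store, while leaving unexpired shard groups alone.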
func TestService_CheckShards(t *testing.T) {
	now := time.Now()
	// Account for any time difference that could cause some of the logic in
	// this test to fail due to a race condition: if "now" falls at the very
	// end of the hour, the shard group times below are anchored to that hour
	// while the retention service may run in the next one. If we're within
	// 100 milliseconds of the boundary, wait it out and re-read the clock.
	if got, want := now.Add(100*time.Millisecond).Truncate(time.Hour), now.Truncate(time.Hour); !got.Equal(want) {
		time.Sleep(100 * time.Millisecond)
		now = time.Now() // Re-anchor the shard group times in the new hour.
	}

	data := []meta.DatabaseInfo{
		{
			Name: "db0",
			DefaultRetentionPolicy: "rp0",
			RetentionPolicies: []meta.RetentionPolicyInfo{
				{
					Name:               "rp0",
					ReplicaN:           1,
					Duration:           time.Hour,
					ShardGroupDuration: time.Hour,
					ShardGroups: []meta.ShardGroupInfo{
						{
							ID:        1,
							StartTime: now.Truncate(time.Hour).Add(-2 * time.Hour),
							EndTime:   now.Truncate(time.Hour).Add(-1 * time.Hour),
							Shards: []meta.ShardInfo{
								{ID: 2},
								{ID: 3},
							},
						},
						{
							ID:        4,
							StartTime: now.Truncate(time.Hour).Add(-1 * time.Hour),
							EndTime:   now.Truncate(time.Hour),
							Shards: []meta.ShardInfo{
								{ID: 5},
								{ID: 6},
							},
						},
						{
							ID:        7,
							StartTime: now.Truncate(time.Hour),
							EndTime:   now.Truncate(time.Hour).Add(time.Hour),
							Shards: []meta.ShardInfo{
								{ID: 8},
								{ID: 9},
							},
						},
					},
				},
			},
		},
	}

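	// Use a very short check interval so the retention check runs almost
	// immediately once the service is opened.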
	config := retention.NewConfig()
	config.CheckInterval = toml.Duration(10 * time.Millisecond)
	s := NewService(config)
	s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo {
		return data
	}

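	// done is closed once shard group 1, the only expired group, has been
	// deleted; deleting any additional group fails the test.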
	done := make(chan struct{})
	deletedShardGroups := make(map[string]struct{})
	s.MetaClient.DeleteShardGroupFn = func(database, policy string, id uint64) error {
		for _, dbi := range data {
			if dbi.Name == database {
				for _, rpi := range dbi.RetentionPolicies {
					if rpi.Name == policy {
						for i, sg := range rpi.ShardGroups {
							if sg.ID == id {
								rpi.ShardGroups[i].DeletedAt = time.Now().UTC()
							}
						}
					}
				}
			}
		}

		deletedShardGroups[fmt.Sprintf("%s.%s.%d", database, policy, id)] = struct{}{}
		if got, want := deletedShardGroups, map[string]struct{}{
			"db0.rp0.1": struct{}{},
		}; reflect.DeepEqual(got, want) {
			close(done)
		} else if len(got) > 1 {
			t.Errorf("deleted too many shard groups")
		}
		return nil
	}

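	// closing is closed on the first prune that happens after the expired
	// shard group has been deleted, signalling that local shard deletion for
	// that group should also have happened by then.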
	pruned := false
	closing := make(chan struct{})
	s.MetaClient.PruneShardGroupsFn = func() error {
		select {
		case <-done:
			if !pruned {
				close(closing)
				pruned = true
			}
		default:
		}
		return nil
	}

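	// The local TSDB store reports shards 2, 3, 5 and 6; only 2 and 3 belong
	// to the expired shard group and should end up deleted.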
	deletedShards := make(map[uint64]struct{})
	s.TSDBStore.ShardIDsFn = func() []uint64 {
		return []uint64{2, 3, 5, 6}
	}
	s.TSDBStore.DeleteShardFn = func(shardID uint64) error {
		deletedShards[shardID] = struct{}{}
		return nil
	}

	if err := s.Open(); err != nil {
		t.Fatalf("unexpected open error: %s", err)
	}
	defer func() {
		if err := s.Close(); err != nil {
			t.Fatalf("unexpected close error: %s", err)
		}
	}()

	timer := time.NewTimer(100 * time.Millisecond)
	select {
	case <-done:
		timer.Stop()
	case <-timer.C:
		t.Errorf("timeout waiting for shard groups to be deleted")
		return
	}

	timer = time.NewTimer(100 * time.Millisecond)
	select {
	case <-closing:
		timer.Stop()
	case <-timer.C:
		t.Errorf("timeout waiting for shards to be deleted")
		return
	}

	if got, want := deletedShards, map[uint64]struct{}{
		2: struct{}{},
		3: struct{}{},
	}; !reflect.DeepEqual(got, want) {
		t.Errorf("unexpected deleted shards: got=%#v want=%#v", got, want)
	}
}

// This reproduces https://github.com/influxdata/influxdb/issues/8819
func TestService_8819_repro(t *testing.T) {
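	// The original failure was racy, so run many iterations to give it a
	// chance to surface.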
	for i := 0; i < 1000; i++ {
		s, errC, done := testService_8819_repro(t)

		if err := s.Open(); err != nil {
			t.Fatal(err)
		}

		// Wait for service to run one sweep of all dbs/rps/shards.
		if err := <-errC; err != nil {
			t.Fatalf("%dth iteration: %v", i, err)
		}
		// Mark that we do not expect more errors in case it runs one more time.
		close(done)

		if err := s.Close(); err != nil {
			t.Fatal(err)
		}
	}
}

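// testService_8819_repro builds a Service whose mocks detect the #8819 race:
// errC receives an error if a shard that has been removed from the meta store
// is still present in the local TSDB store (or if the wrong database,
// retention policy, or shard group is deleted), and nil once a clean sweep
// completes. Closing done stops further error reporting.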
func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{}) {
	c := retention.NewConfig()
	c.CheckInterval = toml.Duration(time.Millisecond)
	s := NewService(c)
	errC := make(chan error, 1) // Buffered; important to prevent deadlock.
	done := make(chan struct{})

	// A database and a bunch of shards
	var mu sync.Mutex
	shards := []uint64{3, 5, 8, 9, 11, 12}
	localShards := []uint64{3, 5, 8, 9, 11, 12}
	databases := []meta.DatabaseInfo{
		{
			Name: "db0",
			RetentionPolicies: []meta.RetentionPolicyInfo{
				{
					Name:               "autogen",
					Duration:           24 * time.Hour,
					ShardGroupDuration: 24 * time.Hour,
					ShardGroups: []meta.ShardGroupInfo{
						{
							ID:        1,
							StartTime: time.Date(1980, 1, 1, 0, 0, 0, 0, time.UTC),
							EndTime:   time.Date(1981, 1, 1, 0, 0, 0, 0, time.UTC),
							Shards: []meta.ShardInfo{
								{ID: 3}, {ID: 9},
							},
						},
						{
							ID:        2,
							StartTime: time.Now().Add(-1 * time.Hour),
							EndTime:   time.Now(),
							DeletedAt: time.Now(),
							Shards: []meta.ShardInfo{
								{ID: 11}, {ID: 12},
							},
						},
					},
				},
			},
		},
	}

	sendError := func(err error) {
		select {
		case errC <- err:
		case <-done:
		}
	}

	s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo {
		mu.Lock()
		defer mu.Unlock()
		return databases
	}

	s.MetaClient.DeleteShardGroupFn = func(database string, policy string, id uint64) error {
		if database != "db0" {
			sendError(fmt.Errorf("wrong db name: %s", database))
			return nil
		} else if policy != "autogen" {
			sendError(fmt.Errorf("wrong rp name: %s", policy))
			return nil
		} else if id != 1 {
			sendError(fmt.Errorf("wrong shard group id: %d", id))
			return nil
		}

		// remove the associated shards (3 and 9) from the shards slice...
		mu.Lock()
		newShards := make([]uint64, 0, len(shards))
		for _, sid := range shards {
			if sid != 3 && sid != 9 {
				newShards = append(newShards, sid)
			}
		}
		shards = newShards
		databases[0].RetentionPolicies[0].ShardGroups[0].DeletedAt = time.Now().UTC()
		mu.Unlock()
		return nil
	}

	s.MetaClient.PruneShardGroupsFn = func() error {
		// By the time this is called, every shard that has been deleted from
		// the meta store (because it expired) should also have been deleted
		// from disk. If it hasn't, there is a race window where a shard has
		// been removed from the meta store but still exists on local disk and
		// in the indexes, which affects, for example, the max series per
		// database limit.

		mu.Lock()
		defer mu.Unlock()
		for _, lid := range localShards {
			var found bool
			for _, mid := range shards {
				if lid == mid {
					found = true
					break
				}
			}

			if !found {
				sendError(fmt.Errorf("local shard %d present, yet it's missing from meta store. %v -- %v ", lid, shards, localShards))
				return nil
			}
		}

		// We should have removed shards 3 and 9
		if !reflect.DeepEqual(localShards, []uint64{5, 8}) {
			sendError(fmt.Errorf("removed shards still present locally: %v", localShards))
			return nil
		}
		sendError(nil)
		return nil
	}

	s.TSDBStore.ShardIDsFn = func() []uint64 {
		mu.Lock()
		defer mu.Unlock()
		return localShards
	}

	s.TSDBStore.DeleteShardFn = func(id uint64) error {
		var found bool
		mu.Lock()
		newShards := make([]uint64, 0, len(localShards))
		for _, sid := range localShards {
			if sid != id {
				newShards = append(newShards, sid)
			} else {
				found = true
			}
		}
		localShards = newShards
		mu.Unlock()

		if !found {
			return fmt.Errorf("shard %d not found locally", id)
		}
		return nil
	}

	return s, errC, done
}

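// Service wraps a retention.Service with mocked meta client and TSDB store
// dependencies, and captures the service's log output in LogBuf.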
type Service struct {
	MetaClient *internal.MetaClientMock
	TSDBStore  *internal.TSDBStoreMock

	LogBuf bytes.Buffer
	*retention.Service
}

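// NewService returns a test Service with its mock dependencies wired into the
// underlying retention.Service and logging directed at LogBuf.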
func NewService(c retention.Config) *Service {
	s := &Service{
		MetaClient: &internal.MetaClientMock{},
		TSDBStore:  &internal.TSDBStoreMock{},
		Service:    retention.NewService(c),
	}

	l := logger.New(&s.LogBuf)
	s.WithLogger(l)

	s.Service.MetaClient = s.MetaClient
	s.Service.TSDBStore = s.TSDBStore
	return s
}
