package retention_test

import (
	"bytes"
	"fmt"
	"reflect"
	"sync"
	"testing"
	"time"

	"github.com/influxdata/influxdb/internal"
	"github.com/influxdata/influxdb/logger"
	"github.com/influxdata/influxdb/services/meta"
	"github.com/influxdata/influxdb/services/retention"
	"github.com/influxdata/influxdb/toml"
)

func TestService_OpenDisabled(t *testing.T) {
	// Opening a disabled service should be a no-op.
	c := retention.NewConfig()
	c.Enabled = false
	s := NewService(c)

	if err := s.Open(); err != nil {
		t.Fatal(err)
	}

	if s.LogBuf.String() != "" {
		t.Fatalf("service logged %q, didn't expect any logging", s.LogBuf.String())
	}
}

func TestService_OpenClose(t *testing.T) {
	// Opening and closing an enabled service should succeed and log; repeated
	// opens and closes are no-ops.
	s := NewService(retention.NewConfig())

	if err := s.Open(); err != nil {
		t.Fatal(err)
	}

	if s.LogBuf.String() == "" {
		t.Fatal("service didn't log anything on open")
	}

	// Reopening is a no-op
	if err := s.Open(); err != nil {
		t.Fatal(err)
	}

	if err := s.Close(); err != nil {
		t.Fatal(err)
	}

	// Re-closing is a no-op
	if err := s.Close(); err != nil {
		t.Fatal(err)
	}
}

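// TestService_CheckShards verifies that the retention service deletes exactly
// the shard group that has passed its retention period, removes that group's
// local shards, and then prunes the deleted group from the meta store.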
func TestService_CheckShards(t *testing.T) {
	now := time.Now()
	// Account for any time difference that could cause some of the logic in
	// this test to fail due to a race condition. If we are at the very end of
	// the hour, we can choose a time interval based on one "now" time and then
	// run the retention service in the next hour. If we're in one of those
	// situations, wait 100 milliseconds until we're in the next hour.
	if got, want := now.Add(100*time.Millisecond).Truncate(time.Hour), now.Truncate(time.Hour); !got.Equal(want) {
		time.Sleep(100 * time.Millisecond)
	}

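	// Three shard groups for rp0 (1h retention): group 1 has already expired,
	// group 4 covers the previous hour and is still within the retention
	// period, and group 7 covers the current hour. Only group 1 (shards 2 and
	// 3) should be removed.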
	data := []meta.DatabaseInfo{
		{
			Name: "db0",

			DefaultRetentionPolicy: "rp0",
			RetentionPolicies: []meta.RetentionPolicyInfo{
				{
					Name:               "rp0",
					ReplicaN:           1,
					Duration:           time.Hour,
					ShardGroupDuration: time.Hour,
					ShardGroups: []meta.ShardGroupInfo{
						{
							ID:        1,
							StartTime: now.Truncate(time.Hour).Add(-2 * time.Hour),
							EndTime:   now.Truncate(time.Hour).Add(-1 * time.Hour),
							Shards: []meta.ShardInfo{
								{ID: 2},
								{ID: 3},
							},
						},
						{
							ID:        4,
							StartTime: now.Truncate(time.Hour).Add(-1 * time.Hour),
							EndTime:   now.Truncate(time.Hour),
							Shards: []meta.ShardInfo{
								{ID: 5},
								{ID: 6},
							},
						},
						{
							ID:        7,
							StartTime: now.Truncate(time.Hour),
							EndTime:   now.Truncate(time.Hour).Add(time.Hour),
							Shards: []meta.ShardInfo{
								{ID: 8},
								{ID: 9},
							},
						},
					},
				},
			},
		},
	}

	config := retention.NewConfig()
	config.CheckInterval = toml.Duration(10 * time.Millisecond)
	s := NewService(config)
	s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo {
		return data
	}

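	// Track shard group deletions made by the service. The matching group is
	// marked as deleted in the meta data; done is closed once exactly
	// db0.rp0 group 1 has been deleted, and any further deletion is an error.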
	done := make(chan struct{})
	deletedShardGroups := make(map[string]struct{})
	s.MetaClient.DeleteShardGroupFn = func(database, policy string, id uint64) error {
		for _, dbi := range data {
			if dbi.Name == database {
				for _, rpi := range dbi.RetentionPolicies {
					if rpi.Name == policy {
						for i, sg := range rpi.ShardGroups {
							if sg.ID == id {
								rpi.ShardGroups[i].DeletedAt = time.Now().UTC()
							}
						}
					}
				}
			}
		}

		deletedShardGroups[fmt.Sprintf("%s.%s.%d", database, policy, id)] = struct{}{}
		if got, want := deletedShardGroups, map[string]struct{}{
			"db0.rp0.1": struct{}{},
		}; reflect.DeepEqual(got, want) {
			close(done)
		} else if len(got) > 1 {
			t.Errorf("deleted too many shard groups")
		}
		return nil
	}

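	// Close closing the first time PruneShardGroups is called after the
	// expected deletion has been observed (i.e. after done is closed).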
	pruned := false
	closing := make(chan struct{})
	s.MetaClient.PruneShardGroupsFn = func() error {
		select {
		case <-done:
			if !pruned {
				close(closing)
				pruned = true
			}
		default:
		}
		return nil
	}

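	// The local store holds the shards of groups 1 and 4; record every local
	// shard deletion so it can be checked below.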
	deletedShards := make(map[uint64]struct{})
	s.TSDBStore.ShardIDsFn = func() []uint64 {
		return []uint64{2, 3, 5, 6}
	}
	s.TSDBStore.DeleteShardFn = func(shardID uint64) error {
		deletedShards[shardID] = struct{}{}
		return nil
	}

	if err := s.Open(); err != nil {
		t.Fatalf("unexpected open error: %s", err)
	}
	defer func() {
		if err := s.Close(); err != nil {
			t.Fatalf("unexpected close error: %s", err)
		}
	}()

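	// Wait up to 100ms for the expired shard group to be deleted.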
	timer := time.NewTimer(100 * time.Millisecond)
	select {
	case <-done:
		timer.Stop()
	case <-timer.C:
		t.Errorf("timeout waiting for shard groups to be deleted")
		return
	}

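	// Wait for the prune that signals the local shard deletions have run.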
	timer = time.NewTimer(100 * time.Millisecond)
	select {
	case <-closing:
		timer.Stop()
	case <-timer.C:
		t.Errorf("timeout waiting for shards to be deleted")
		return
	}

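	// Only the shards of the expired group (2 and 3) should have been deleted.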
	if got, want := deletedShards, map[uint64]struct{}{
		2: struct{}{},
		3: struct{}{},
	}; !reflect.DeepEqual(got, want) {
		t.Errorf("unexpected deleted shards: got=%#v want=%#v", got, want)
	}
}

// This reproduces https://github.com/influxdata/influxdb/issues/8819
func TestService_8819_repro(t *testing.T) {
	for i := 0; i < 1000; i++ {
		s, errC, done := testService_8819_repro(t)

		if err := s.Open(); err != nil {
			t.Fatal(err)
		}

		// Wait for service to run one sweep of all dbs/rps/shards.
		if err := <-errC; err != nil {
			t.Fatalf("%dth iteration: %v", i, err)
		}
		// Mark that we do not expect more errors in case it runs one more time.
		close(done)

		if err := s.Close(); err != nil {
			t.Fatal(err)
		}
	}
}

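// testService_8819_repro returns a Service whose mocks report, via errC, any
// inconsistency between the meta store's and the local store's view of the
// shards during a retention sweep; nil is sent after a clean sweep. Closing
// done stops further reporting.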
func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{}) {
	c := retention.NewConfig()
	c.CheckInterval = toml.Duration(time.Millisecond)
	s := NewService(c)
	errC := make(chan error, 1) // Buffered to prevent deadlock.
	done := make(chan struct{})

	// A database with two shard groups. shards is the meta store's view of the
	// shard IDs; localShards is the local TSDB store's view.
	var mu sync.Mutex
	shards := []uint64{3, 5, 8, 9, 11, 12}
	localShards := []uint64{3, 5, 8, 9, 11, 12}
	databases := []meta.DatabaseInfo{
		{
			Name: "db0",
			RetentionPolicies: []meta.RetentionPolicyInfo{
				{
					Name:               "autogen",
					Duration:           24 * time.Hour,
					ShardGroupDuration: 24 * time.Hour,
					ShardGroups: []meta.ShardGroupInfo{
						{
							ID:        1,
							StartTime: time.Date(1980, 1, 1, 0, 0, 0, 0, time.UTC),
							EndTime:   time.Date(1981, 1, 1, 0, 0, 0, 0, time.UTC),
							Shards: []meta.ShardInfo{
								{ID: 3}, {ID: 9},
							},
						},
						{
							ID:        2,
							StartTime: time.Now().Add(-1 * time.Hour),
							EndTime:   time.Now(),
							DeletedAt: time.Now(),
							Shards: []meta.ShardInfo{
								{ID: 11}, {ID: 12},
							},
						},
					},
				},
			},
		},
	}

	sendError := func(err error) {
		select {
		case errC <- err:
		case <-done:
		}
	}

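	// Serve the meta data under mu; the mocks below mutate it concurrently.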
	s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo {
		mu.Lock()
		defer mu.Unlock()
		return databases
	}

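	// Only shard group 1 of db0.autogen is eligible for deletion. Deleting it
	// marks the group as deleted and drops shards 3 and 9 from the meta
	// store's view.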
	s.MetaClient.DeleteShardGroupFn = func(database string, policy string, id uint64) error {
		if database != "db0" {
			sendError(fmt.Errorf("wrong db name: %s", database))
			return nil
		} else if policy != "autogen" {
			sendError(fmt.Errorf("wrong rp name: %s", policy))
			return nil
		} else if id != 1 {
			sendError(fmt.Errorf("wrong shard group id: %d", id))
			return nil
		}

		// remove the associated shards (3 and 9) from the shards slice...
		mu.Lock()
		newShards := make([]uint64, 0, len(shards))
		for _, sid := range shards {
			if sid != 3 && sid != 9 {
				newShards = append(newShards, sid)
			}
		}
		shards = newShards
		databases[0].RetentionPolicies[0].ShardGroups[0].DeletedAt = time.Now().UTC()
		mu.Unlock()
		return nil
	}

	s.MetaClient.PruneShardGroupsFn = func() error {
		// When this is called all shards that have been deleted from the meta
		// store (expired) should also have been deleted from disk.
		// If they haven't then that indicates that shards can be removed from
		// the meta store and there can be a race where they haven't yet been
		// removed from the local disk and indexes. This has an impact on, for
		// example, the max series per database limit.

		mu.Lock()
		defer mu.Unlock()
		for _, lid := range localShards {
			var found bool
			for _, mid := range shards {
				if lid == mid {
					found = true
					break
				}
			}

			if !found {
				sendError(fmt.Errorf("local shard %d present, yet it's missing from meta store. %v -- %v ", lid, shards, localShards))
				return nil
			}
		}

		// Shards 3 and 9 (and 11 and 12 from the already-deleted group) should
		// have been removed locally, leaving only shards 5 and 8.
		if !reflect.DeepEqual(localShards, []uint64{5, 8}) {
			sendError(fmt.Errorf("removed shards still present locally: %v", localShards))
			return nil
		}
		sendError(nil)
		return nil
	}

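	// ShardIDsFn reports the shards currently held by the local store.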
	s.TSDBStore.ShardIDsFn = func() []uint64 {
		mu.Lock()
		defer mu.Unlock()
		return localShards
	}

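	// DeleteShardFn drops a shard from the local store's view and fails if it
	// isn't present.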
	s.TSDBStore.DeleteShardFn = func(id uint64) error {
		var found bool
		mu.Lock()
		newShards := make([]uint64, 0, len(localShards))
		for _, sid := range localShards {
			if sid != id {
				newShards = append(newShards, sid)
			} else {
				found = true
			}
		}
		localShards = newShards
		mu.Unlock()

		if !found {
			return fmt.Errorf("shard %d not found locally", id)
		}
		return nil
	}

	return s, errC, done
}

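// Service wraps retention.Service, replacing its meta client and TSDB store
// with mocks and capturing its log output in LogBuf.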
type Service struct {
	MetaClient *internal.MetaClientMock
	TSDBStore  *internal.TSDBStoreMock

	LogBuf bytes.Buffer
	*retention.Service
}

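// NewService returns a Service with the mocks wired into the embedded
// retention.Service and logging redirected to LogBuf.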
func NewService(c retention.Config) *Service {
	s := &Service{
		MetaClient: &internal.MetaClientMock{},
		TSDBStore:  &internal.TSDBStoreMock{},
		Service:    retention.NewService(c),
	}

	l := logger.New(&s.LogBuf)
	s.WithLogger(l)

	s.Service.MetaClient = s.MetaClient
	s.Service.TSDBStore = s.TSDBStore
	return s
}