package retention_test

import (
	"bytes"
	"fmt"
	"reflect"
	"sync"
	"testing"
	"time"

	"github.com/influxdata/influxdb/internal"
	"github.com/influxdata/influxdb/logger"
	"github.com/influxdata/influxdb/services/meta"
	"github.com/influxdata/influxdb/services/retention"
	"github.com/influxdata/influxdb/toml"
)

func TestService_OpenDisabled(t *testing.T) {
	// Opening a disabled service should be a no-op.
	c := retention.NewConfig()
	c.Enabled = false
	s := NewService(c)

	if err := s.Open(); err != nil {
		t.Fatal(err)
	}

	if s.LogBuf.String() != "" {
		t.Fatalf("service logged %q, didn't expect any logging", s.LogBuf.String())
	}
}

func TestService_OpenClose(t *testing.T) {
	// Opening and closing an enabled service should succeed and be idempotent.
	s := NewService(retention.NewConfig())

	if err := s.Open(); err != nil {
		t.Fatal(err)
	}

	if s.LogBuf.String() == "" {
		t.Fatal("service didn't log anything on open")
	}

	// Reopening is a no-op.
	if err := s.Open(); err != nil {
		t.Fatal(err)
	}

	if err := s.Close(); err != nil {
		t.Fatal(err)
	}

	// Re-closing is a no-op.
	if err := s.Close(); err != nil {
		t.Fatal(err)
	}
}

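// TestService_CheckShards verifies that a retention sweep deletes expired
// shard groups from the meta store and their shards from the local TSDB
// store, leaving current shard groups untouched.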
func TestService_CheckShards(t *testing.T) {
	now := time.Now()
	// Account for any time difference that could cause some of the logic in
	// this test to fail due to a race condition. If we are at the very end of
	// the hour, we can choose a time interval based on one "now" time and
	// then run the retention service in the next hour. If we're in one of
	// those situations, wait 100 milliseconds until we're in the next hour.
	if got, want := now.Add(100*time.Millisecond).Truncate(time.Hour), now.Truncate(time.Hour); !got.Equal(want) {
		time.Sleep(100 * time.Millisecond)
	}

	data := []meta.DatabaseInfo{
		{
			Name:                   "db0",
			DefaultRetentionPolicy: "rp0",
			RetentionPolicies: []meta.RetentionPolicyInfo{
				{
					Name:               "rp0",
					ReplicaN:           1,
					Duration:           time.Hour,
					ShardGroupDuration: time.Hour,
					ShardGroups: []meta.ShardGroupInfo{
						{
							ID:        1,
							StartTime: now.Truncate(time.Hour).Add(-2 * time.Hour),
							EndTime:   now.Truncate(time.Hour).Add(-1 * time.Hour),
							Shards: []meta.ShardInfo{
								{ID: 2},
								{ID: 3},
							},
						},
						{
							ID:        4,
							StartTime: now.Truncate(time.Hour).Add(-1 * time.Hour),
							EndTime:   now.Truncate(time.Hour),
							Shards: []meta.ShardInfo{
								{ID: 5},
								{ID: 6},
							},
						},
						{
							ID:        7,
							StartTime: now.Truncate(time.Hour),
							EndTime:   now.Truncate(time.Hour).Add(time.Hour),
							Shards: []meta.ShardInfo{
								{ID: 8},
								{ID: 9},
							},
						},
					},
				},
			},
		},
	}

	config := retention.NewConfig()
	config.CheckInterval = toml.Duration(10 * time.Millisecond)
	s := NewService(config)
	s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo {
		return data
	}

	done := make(chan struct{})
	deletedShardGroups := make(map[string]struct{})
	s.MetaClient.DeleteShardGroupFn = func(database, policy string, id uint64) error {
		for _, dbi := range data {
			if dbi.Name == database {
				for _, rpi := range dbi.RetentionPolicies {
					if rpi.Name == policy {
						for i, sg := range rpi.ShardGroups {
							if sg.ID == id {
								rpi.ShardGroups[i].DeletedAt = time.Now().UTC()
							}
						}
					}
				}
			}
		}

		deletedShardGroups[fmt.Sprintf("%s.%s.%d", database, policy, id)] = struct{}{}
		if got, want := deletedShardGroups, map[string]struct{}{
			"db0.rp0.1": struct{}{},
		}; reflect.DeepEqual(got, want) {
			close(done)
		} else if len(got) > 1 {
			t.Errorf("deleted too many shard groups")
		}
		return nil
	}

	pruned := false
	closing := make(chan struct{})
	s.MetaClient.PruneShardGroupsFn = func() error {
		select {
		case <-done:
			if !pruned {
				close(closing)
				pruned = true
			}
		default:
		}
		return nil
	}

	deletedShards := make(map[uint64]struct{})
	s.TSDBStore.ShardIDsFn = func() []uint64 {
		return []uint64{2, 3, 5, 6}
	}
	s.TSDBStore.DeleteShardFn = func(shardID uint64) error {
		deletedShards[shardID] = struct{}{}
		return nil
	}

	if err := s.Open(); err != nil {
		t.Fatalf("unexpected open error: %s", err)
	}
	defer func() {
		if err := s.Close(); err != nil {
			t.Fatalf("unexpected close error: %s", err)
		}
	}()

	timer := time.NewTimer(100 * time.Millisecond)
	select {
	case <-done:
		timer.Stop()
	case <-timer.C:
		t.Errorf("timeout waiting for shard groups to be deleted")
		return
	}

	timer = time.NewTimer(100 * time.Millisecond)
	select {
	case <-closing:
		timer.Stop()
	case <-timer.C:
		t.Errorf("timeout waiting for shards to be deleted")
		return
	}

	if got, want := deletedShards, map[uint64]struct{}{
		2: struct{}{},
		3: struct{}{},
	}; !reflect.DeepEqual(got, want) {
		t.Errorf("unexpected deleted shards: got=%#v want=%#v", got, want)
	}
}

// This reproduces https://github.com/influxdata/influxdb/issues/8819.
func TestService_8819_repro(t *testing.T) {
	for i := 0; i < 1000; i++ {
		s, errC, done := testService_8819_repro(t)

		if err := s.Open(); err != nil {
			t.Fatal(err)
		}

		// Wait for service to run one sweep of all dbs/rps/shards.
		if err := <-errC; err != nil {
			t.Fatalf("%dth iteration: %v", i, err)
		}
		// Mark that we do not expect more errors in case it runs one more time.
		close(done)

		if err := s.Close(); err != nil {
			t.Fatal(err)
		}
	}
}

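// testService_8819_repro constructs one Service instance for the repro above.
// The returned error channel is buffered so that a sweep can report a failure
// without deadlocking against a closing service; closing the done channel
// tells the mocks to stop reporting errors once the first sweep has been
// checked.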
func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{}) {
	c := retention.NewConfig()
	c.CheckInterval = toml.Duration(time.Millisecond)
	s := NewService(c)
	errC := make(chan error, 1) // Buffering is important to prevent deadlock.
	done := make(chan struct{})

	// A database and a bunch of shards.
	var mu sync.Mutex
	shards := []uint64{3, 5, 8, 9, 11, 12}
	localShards := []uint64{3, 5, 8, 9, 11, 12}
	databases := []meta.DatabaseInfo{
		{
			Name: "db0",
			RetentionPolicies: []meta.RetentionPolicyInfo{
				{
					Name:               "autogen",
					Duration:           24 * time.Hour,
					ShardGroupDuration: 24 * time.Hour,
					ShardGroups: []meta.ShardGroupInfo{
						{
							ID:        1,
							StartTime: time.Date(1980, 1, 1, 0, 0, 0, 0, time.UTC),
							EndTime:   time.Date(1981, 1, 1, 0, 0, 0, 0, time.UTC),
							Shards: []meta.ShardInfo{
								{ID: 3}, {ID: 9},
							},
						},
						{
							ID:        2,
							StartTime: time.Now().Add(-1 * time.Hour),
							EndTime:   time.Now(),
							DeletedAt: time.Now(),
							Shards: []meta.ShardInfo{
								{ID: 11}, {ID: 12},
							},
						},
					},
				},
			},
		},
	}

	sendError := func(err error) {
		select {
		case errC <- err:
		case <-done:
		}
	}

	s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo {
		mu.Lock()
		defer mu.Unlock()
		return databases
	}

	s.MetaClient.DeleteShardGroupFn = func(database string, policy string, id uint64) error {
		if database != "db0" {
			sendError(fmt.Errorf("wrong db name: %s", database))
			return nil
		} else if policy != "autogen" {
			sendError(fmt.Errorf("wrong rp name: %s", policy))
			return nil
		} else if id != 1 {
			sendError(fmt.Errorf("wrong shard group id: %d", id))
			return nil
		}

		// Remove the associated shards (3 and 9) from the shards slice...
		mu.Lock()
		newShards := make([]uint64, 0, len(shards))
		for _, sid := range shards {
			if sid != 3 && sid != 9 {
				newShards = append(newShards, sid)
			}
		}
		shards = newShards
		databases[0].RetentionPolicies[0].ShardGroups[0].DeletedAt = time.Now().UTC()
		mu.Unlock()
		return nil
	}

	s.MetaClient.PruneShardGroupsFn = func() error {
		// When this is called, all shards that have been deleted from the
		// meta store (expired) should also have been deleted from disk. If
		// they haven't, there is a race where shards are removed from the
		// meta store before they have been removed from the local disk and
		// indexes. This has an impact on, for example, the max series per
		// database limit.

		mu.Lock()
		defer mu.Unlock()
		for _, lid := range localShards {
			var found bool
			for _, mid := range shards {
				if lid == mid {
					found = true
					break
				}
			}

			if !found {
				sendError(fmt.Errorf("local shard %d present, yet it's missing from meta store. %v -- %v", lid, shards, localShards))
				return nil
			}
		}

		// We should have removed shards 3 and 9.
		if !reflect.DeepEqual(localShards, []uint64{5, 8}) {
			sendError(fmt.Errorf("removed shards still present locally: %v", localShards))
			return nil
		}
		sendError(nil)
		return nil
	}

	s.TSDBStore.ShardIDsFn = func() []uint64 {
		mu.Lock()
		defer mu.Unlock()
		return localShards
	}

	s.TSDBStore.DeleteShardFn = func(id uint64) error {
		var found bool
		mu.Lock()
		newShards := make([]uint64, 0, len(localShards))
		for _, sid := range localShards {
			if sid != id {
				newShards = append(newShards, sid)
			} else {
				found = true
			}
		}
		localShards = newShards
		mu.Unlock()

		if !found {
			return fmt.Errorf("shard %d not found locally", id)
		}
		return nil
	}

	return s, errC, done
}

// Service wraps a retention.Service and injects mocked meta client and TSDB
// store dependencies, capturing all log output in LogBuf.
type Service struct {
	MetaClient *internal.MetaClientMock
	TSDBStore  *internal.TSDBStoreMock

	LogBuf bytes.Buffer
	*retention.Service
}

// NewService returns a Service whose mocks are wired into the underlying
// retention.Service and whose logger writes to LogBuf.
func NewService(c retention.Config) *Service {
	s := &Service{
		MetaClient: &internal.MetaClientMock{},
		TSDBStore:  &internal.TSDBStoreMock{},
		Service:    retention.NewService(c),
	}

	l := logger.New(&s.LogBuf)
	s.WithLogger(l)

	s.Service.MetaClient = s.MetaClient
	s.Service.TSDBStore = s.TSDBStore
	return s
}
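
// waitForHourBoundary is an illustrative sketch, not used by the tests above:
// it factors out the hour-boundary guard that TestService_CheckShards
// performs inline. If a time captured d from now would fall in a different
// hour than a time captured right now, sleep for d so that subsequent
// time.Now() calls land safely inside the next hour.
func waitForHourBoundary(d time.Duration) {
	now := time.Now()
	if !now.Add(d).Truncate(time.Hour).Equal(now.Truncate(time.Hour)) {
		time.Sleep(d)
	}
}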