package retention_test

import (
	"bytes"
	"fmt"
	"reflect"
	"sync"
	"testing"
	"time"

	"github.com/influxdata/influxdb/internal"
	"github.com/influxdata/influxdb/logger"
	"github.com/influxdata/influxdb/services/meta"
	"github.com/influxdata/influxdb/services/retention"
	"github.com/influxdata/influxdb/toml"
)

func TestService_OpenDisabled(t *testing.T) {
	// Opening a disabled service should be a no-op.
	c := retention.NewConfig()
	c.Enabled = false
	s := NewService(c)

	if err := s.Open(); err != nil {
		t.Fatal(err)
	}

	if s.LogBuf.String() != "" {
		t.Fatalf("service logged %q, didn't expect any logging", s.LogBuf.String())
	}
}

func TestService_OpenClose(t *testing.T) {
	// Opening and closing an enabled service should work; repeated opens and
	// closes should be no-ops.
	s := NewService(retention.NewConfig())

	if err := s.Open(); err != nil {
		t.Fatal(err)
	}

	if s.LogBuf.String() == "" {
		t.Fatal("service didn't log anything on open")
	}

	// Reopening is a no-op.
	if err := s.Open(); err != nil {
		t.Fatal(err)
	}

	if err := s.Close(); err != nil {
		t.Fatal(err)
	}

	// Re-closing is a no-op.
	if err := s.Close(); err != nil {
		t.Fatal(err)
	}
}

func TestService_CheckShards(t *testing.T) {
	now := time.Now()
	// Account for any time difference that could cause some of the logic in
	// this test to fail due to a race condition. If we are at the very end of
	// the hour, we could choose a time interval based on one "now" time and
	// then run the retention service in the next hour. If we're in that
	// situation, wait 100 milliseconds until we're in the next hour, then
	// recompute "now" so the fixture and the service agree on the hour.
	if got, want := now.Add(100*time.Millisecond).Truncate(time.Hour), now.Truncate(time.Hour); !got.Equal(want) {
		time.Sleep(100 * time.Millisecond)
		now = time.Now()
	}

	data := []meta.DatabaseInfo{
		{
			Name:                   "db0",
			DefaultRetentionPolicy: "rp0",
			RetentionPolicies: []meta.RetentionPolicyInfo{
				{
					Name:               "rp0",
					ReplicaN:           1,
					Duration:           time.Hour,
					ShardGroupDuration: time.Hour,
					ShardGroups: []meta.ShardGroupInfo{
						{
							// Expired: this group ended an hour ago and the
							// RP duration has elapsed, so the group and its
							// shards should be deleted.
							ID:        1,
							StartTime: now.Truncate(time.Hour).Add(-2 * time.Hour),
							EndTime:   now.Truncate(time.Hour).Add(-1 * time.Hour),
							Shards: []meta.ShardInfo{
								{ID: 2},
								{ID: 3},
							},
						},
						{
							ID:        4,
							StartTime: now.Truncate(time.Hour).Add(-1 * time.Hour),
							EndTime:   now.Truncate(time.Hour),
							Shards: []meta.ShardInfo{
								{ID: 5},
								{ID: 6},
							},
						},
						{
							ID:        7,
							StartTime: now.Truncate(time.Hour),
							EndTime:   now.Truncate(time.Hour).Add(time.Hour),
							Shards: []meta.ShardInfo{
								{ID: 8},
								{ID: 9},
							},
						},
					},
				},
			},
		},
	}

	config := retention.NewConfig()
	config.CheckInterval = toml.Duration(10 * time.Millisecond)
	s := NewService(config)
	s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo {
		return data
	}

	done := make(chan struct{})
	deletedShardGroups := make(map[string]struct{})
	s.MetaClient.DeleteShardGroupFn = func(database, policy string, id uint64) error {
		for _, dbi := range data {
			if dbi.Name == database {
				for _, rpi := range dbi.RetentionPolicies {
					if rpi.Name == policy {
						for i, sg := range rpi.ShardGroups {
							if sg.ID == id {
								rpi.ShardGroups[i].DeletedAt = time.Now().UTC()
							}
						}
					}
				}
			}
		}

		deletedShardGroups[fmt.Sprintf("%s.%s.%d", database, policy, id)] = struct{}{}
		if got, want := deletedShardGroups, map[string]struct{}{
			"db0.rp0.1": struct{}{},
		}; reflect.DeepEqual(got, want) {
			close(done)
		} else if len(got) > 1 {
			t.Errorf("deleted too many shard groups")
		}
		return nil
	}

	pruned := false
	closing := make(chan struct{})
	s.MetaClient.PruneShardGroupsFn = func() error {
		select {
		case <-done:
			if !pruned {
				close(closing)
				pruned = true
			}
		default:
		}
		return nil
	}

	deletedShards := make(map[uint64]struct{})
	s.TSDBStore.ShardIDsFn = func() []uint64 {
		return []uint64{2, 3, 5, 6}
	}
	s.TSDBStore.DeleteShardFn = func(shardID uint64) error {
		deletedShards[shardID] = struct{}{}
		return nil
	}

	if err := s.Open(); err != nil {
		t.Fatalf("unexpected open error: %s", err)
	}
	defer func() {
		if err := s.Close(); err != nil {
			t.Fatalf("unexpected close error: %s", err)
		}
	}()

	timer := time.NewTimer(100 * time.Millisecond)
	select {
	case <-done:
		timer.Stop()
	case <-timer.C:
		t.Errorf("timeout waiting for shard groups to be deleted")
		return
	}

	timer = time.NewTimer(100 * time.Millisecond)
	select {
	case <-closing:
		timer.Stop()
	case <-timer.C:
		t.Errorf("timeout waiting for shards to be deleted")
		return
	}

	if got, want := deletedShards, map[uint64]struct{}{
		2: struct{}{},
		3: struct{}{},
	}; !reflect.DeepEqual(got, want) {
		t.Errorf("unexpected deleted shards: got=%#v want=%#v", got, want)
	}
}

// This reproduces https://github.com/influxdata/influxdb/issues/8819
func TestService_8819_repro(t *testing.T) {
	for i := 0; i < 1000; i++ {
		s, errC, done := testService_8819_repro(t)

		if err := s.Open(); err != nil {
			t.Fatal(err)
		}

		// Wait for the service to run one sweep of all dbs/rps/shards.
		if err := <-errC; err != nil {
			t.Fatalf("%dth iteration: %v", i, err)
		}
		// Mark that we do not expect more errors in case it runs one more time.
		close(done)

		if err := s.Close(); err != nil {
			t.Fatal(err)
		}
	}
}

// testService_8819_repro builds a Service against a fixture containing one
// expired shard group and one already-deleted shard group. It returns the
// service, a channel carrying the result of one full sweep, and a channel
// used to signal that no further errors are expected.
func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{}) {
	c := retention.NewConfig()
	c.CheckInterval = toml.Duration(time.Millisecond)
	s := NewService(c)
	errC := make(chan error, 1) // Buffered to prevent a deadlock.
	done := make(chan struct{})

	// A database with one retention policy and a handful of shards. Shards 3
	// and 9 belong to an expired shard group; shards 11 and 12 belong to a
	// group that has already been marked deleted in the meta store.
	var mu sync.Mutex
	shards := []uint64{3, 5, 8, 9, 11, 12}
	localShards := []uint64{3, 5, 8, 9, 11, 12}
	databases := []meta.DatabaseInfo{
		{
			Name: "db0",
			RetentionPolicies: []meta.RetentionPolicyInfo{
				{
					Name:               "autogen",
					Duration:           24 * time.Hour,
					ShardGroupDuration: 24 * time.Hour,
					ShardGroups: []meta.ShardGroupInfo{
						{
							ID:        1,
							StartTime: time.Date(1980, 1, 1, 0, 0, 0, 0, time.UTC),
							EndTime:   time.Date(1981, 1, 1, 0, 0, 0, 0, time.UTC),
							Shards: []meta.ShardInfo{
								{ID: 3}, {ID: 9},
							},
						},
						{
							ID:        2,
							StartTime: time.Now().Add(-1 * time.Hour),
							EndTime:   time.Now(),
							DeletedAt: time.Now(),
							Shards: []meta.ShardInfo{
								{ID: 11}, {ID: 12},
							},
						},
					},
				},
			},
		},
	}

	sendError := func(err error) {
		select {
		case errC <- err:
		case <-done:
		}
	}

	s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo {
		mu.Lock()
		defer mu.Unlock()
		return databases
	}

	s.MetaClient.DeleteShardGroupFn = func(database string, policy string, id uint64) error {
		if database != "db0" {
			sendError(fmt.Errorf("wrong db name: %s", database))
			return nil
		} else if policy != "autogen" {
			sendError(fmt.Errorf("wrong rp name: %s", policy))
			return nil
		} else if id != 1 {
			sendError(fmt.Errorf("wrong shard group id: %d", id))
			return nil
		}

		// Remove the associated shards (3 and 9) from the shards slice...
		mu.Lock()
		newShards := make([]uint64, 0, len(shards))
		for _, sid := range shards {
			if sid != 3 && sid != 9 {
				newShards = append(newShards, sid)
			}
		}
		shards = newShards
		databases[0].RetentionPolicies[0].ShardGroups[0].DeletedAt = time.Now().UTC()
		mu.Unlock()
		return nil
	}

	s.MetaClient.PruneShardGroupsFn = func() error {
		// When this is called, all shards that have been deleted from the
		// meta store (expired) should also have been deleted from disk. If
		// they haven't, there is a race in which a shard is removed from the
		// meta store before it has been removed from the local disk and
		// indexes. This has an impact on, for example, the max series per
		// database limit.
		mu.Lock()
		defer mu.Unlock()
		for _, lid := range localShards {
			var found bool
			for _, mid := range shards {
				if lid == mid {
					found = true
					break
				}
			}

			if !found {
				sendError(fmt.Errorf("local shard %d present, yet it's missing from meta store. %v -- %v", lid, shards, localShards))
				return nil
			}
		}

		// By this point shards 3 and 9 (expired group) and 11 and 12
		// (already-deleted group) should have been removed locally, leaving
		// only 5 and 8.
		if !reflect.DeepEqual(localShards, []uint64{5, 8}) {
			sendError(fmt.Errorf("removed shards still present locally: %v", localShards))
			return nil
		}
		sendError(nil)
		return nil
	}

	s.TSDBStore.ShardIDsFn = func() []uint64 {
		mu.Lock()
		defer mu.Unlock()
		return localShards
	}

	s.TSDBStore.DeleteShardFn = func(id uint64) error {
		var found bool
		mu.Lock()
		newShards := make([]uint64, 0, len(localShards))
		for _, sid := range localShards {
			if sid != id {
				newShards = append(newShards, sid)
			} else {
				found = true
			}
		}
		localShards = newShards
		mu.Unlock()

		if !found {
			return fmt.Errorf("shard %d not found locally", id)
		}
		return nil
	}

	return s, errC, done
}

// Service wraps retention.Service with mocked meta client and TSDB store
// dependencies, and captures the service's log output for inspection.
type Service struct {
	MetaClient *internal.MetaClientMock
	TSDBStore  *internal.TSDBStoreMock

	LogBuf bytes.Buffer
	*retention.Service
}

// NewService returns a test Service with the mock dependencies wired into
// the underlying retention service.
func NewService(c retention.Config) *Service {
	s := &Service{
		MetaClient: &internal.MetaClientMock{},
		TSDBStore:  &internal.TSDBStoreMock{},
		Service:    retention.NewService(c),
	}

	l := logger.New(&s.LogBuf)
	s.WithLogger(l)

	s.Service.MetaClient = s.MetaClient
	s.Service.TSDBStore = s.TSDBStore
	return s
}
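// What follows is not part of the original test file: it is a minimal sketch
// of how the mock wiring above could be reused by further tests. It builds a
// Service whose MetaClient serves a fixed set of databases and records every
// shard group deletion under a "db.rp.id" key, the same keying used in
// TestService_CheckShards. The helper name newRecordingService is
// hypothetical; the body relies only on hooks that already exist in this
// file (DatabasesFn, DeleteShardGroupFn).
func newRecordingService(c retention.Config, dbs []meta.DatabaseInfo) (*Service, map[string]struct{}) {
	s := NewService(c)

	// Serve the caller-supplied metadata on every retention check.
	s.MetaClient.DatabasesFn = func() []meta.DatabaseInfo {
		return dbs
	}

	// Record deletions. The retention loop writes to this map from its own
	// goroutine, so a test must only read it after s.Close() has returned.
	deleted := make(map[string]struct{})
	s.MetaClient.DeleteShardGroupFn = func(database, policy string, id uint64) error {
		deleted[fmt.Sprintf("%s.%s.%d", database, policy, id)] = struct{}{}
		return nil
	}

	return s, deleted
}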