1package coordinator_test 2 3import ( 4 "fmt" 5 "reflect" 6 "sync" 7 "sync/atomic" 8 "testing" 9 "time" 10 11 "github.com/influxdata/influxdb" 12 "github.com/influxdata/influxdb/coordinator" 13 "github.com/influxdata/influxdb/models" 14 "github.com/influxdata/influxdb/services/meta" 15 "github.com/influxdata/influxdb/tsdb" 16) 17 18// TODO(benbjohnson): Rewrite tests to use cluster_test.MetaClient. 19 20// Ensures the points writer maps a single point to a single shard. 21func TestPointsWriter_MapShards_One(t *testing.T) { 22 ms := PointsWriterMetaClient{} 23 rp := NewRetentionPolicy("myp", time.Hour, 3) 24 25 ms.NodeIDFn = func() uint64 { return 1 } 26 ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) { 27 return rp, nil 28 } 29 30 ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { 31 return &rp.ShardGroups[0], nil 32 } 33 34 c := coordinator.PointsWriter{MetaClient: ms} 35 pr := &coordinator.WritePointsRequest{ 36 Database: "mydb", 37 RetentionPolicy: "myrp", 38 } 39 pr.AddPoint("cpu", 1.0, time.Now(), nil) 40 41 var ( 42 shardMappings *coordinator.ShardMapping 43 err error 44 ) 45 if shardMappings, err = c.MapShards(pr); err != nil { 46 t.Fatalf("unexpected an error: %v", err) 47 } 48 49 if exp := 1; len(shardMappings.Points) != exp { 50 t.Errorf("MapShards() len mismatch. got %v, exp %v", len(shardMappings.Points), exp) 51 } 52} 53 54// Ensures the points writer maps to a new shard group when the shard duration 55// is changed. 56func TestPointsWriter_MapShards_AlterShardDuration(t *testing.T) { 57 ms := PointsWriterMetaClient{} 58 rp := NewRetentionPolicy("myp", time.Hour, 3) 59 60 ms.NodeIDFn = func() uint64 { return 1 } 61 ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) { 62 return rp, nil 63 } 64 65 var ( 66 i int 67 now = time.Now() 68 ) 69 70 ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { 71 sg := []meta.ShardGroupInfo{ 72 meta.ShardGroupInfo{ 73 Shards: make([]meta.ShardInfo, 1), 74 StartTime: now, EndTime: now.Add(rp.Duration).Add(-1), 75 }, 76 meta.ShardGroupInfo{ 77 Shards: make([]meta.ShardInfo, 1), 78 StartTime: now.Add(time.Hour), EndTime: now.Add(3 * time.Hour).Add(rp.Duration).Add(-1), 79 }, 80 }[i] 81 i++ 82 return &sg, nil 83 } 84 85 c := coordinator.NewPointsWriter() 86 c.MetaClient = ms 87 88 pr := &coordinator.WritePointsRequest{ 89 Database: "mydb", 90 RetentionPolicy: "myrp", 91 } 92 pr.AddPoint("cpu", 1.0, now, nil) 93 pr.AddPoint("cpu", 2.0, now.Add(2*time.Second), nil) 94 95 var ( 96 shardMappings *coordinator.ShardMapping 97 err error 98 ) 99 if shardMappings, err = c.MapShards(pr); err != nil { 100 t.Fatalf("unexpected an error: %v", err) 101 } 102 103 if got, exp := len(shardMappings.Points[0]), 2; got != exp { 104 t.Fatalf("got %d point(s), expected %d", got, exp) 105 } 106 107 if got, exp := len(shardMappings.Shards), 1; got != exp { 108 t.Errorf("got %d shard(s), expected %d", got, exp) 109 } 110 111 // Now we alter the retention policy duration. 112 rp.ShardGroupDuration = 3 * time.Hour 113 114 pr = &coordinator.WritePointsRequest{ 115 Database: "mydb", 116 RetentionPolicy: "myrp", 117 } 118 pr.AddPoint("cpu", 1.0, now.Add(2*time.Hour), nil) 119 120 // Point is beyond previous shard group so a new shard group should be 121 // created. 122 if _, err = c.MapShards(pr); err != nil { 123 t.Fatalf("unexpected an error: %v", err) 124 } 125 126 // We can check value of i since it's only incremeneted when a shard group 127 // is created. 128 if got, exp := i, 2; got != exp { 129 t.Fatal("new shard group was not created, expected it to be") 130 } 131} 132 133// Ensures the points writer maps a multiple points across shard group boundaries. 134func TestPointsWriter_MapShards_Multiple(t *testing.T) { 135 ms := PointsWriterMetaClient{} 136 rp := NewRetentionPolicy("myp", time.Hour, 3) 137 rp.ShardGroupDuration = time.Hour 138 AttachShardGroupInfo(rp, []meta.ShardOwner{ 139 {NodeID: 1}, 140 {NodeID: 2}, 141 {NodeID: 3}, 142 }) 143 AttachShardGroupInfo(rp, []meta.ShardOwner{ 144 {NodeID: 1}, 145 {NodeID: 2}, 146 {NodeID: 3}, 147 }) 148 149 ms.NodeIDFn = func() uint64 { return 1 } 150 ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) { 151 return rp, nil 152 } 153 154 ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { 155 for i, sg := range rp.ShardGroups { 156 if timestamp.Equal(sg.StartTime) || timestamp.After(sg.StartTime) && timestamp.Before(sg.EndTime) { 157 return &rp.ShardGroups[i], nil 158 } 159 } 160 panic("should not get here") 161 } 162 163 c := coordinator.NewPointsWriter() 164 c.MetaClient = ms 165 defer c.Close() 166 pr := &coordinator.WritePointsRequest{ 167 Database: "mydb", 168 RetentionPolicy: "myrp", 169 } 170 171 // Three points that range over the shardGroup duration (1h) and should map to two 172 // distinct shards 173 pr.AddPoint("cpu", 1.0, time.Now(), nil) 174 pr.AddPoint("cpu", 2.0, time.Now().Add(time.Hour), nil) 175 pr.AddPoint("cpu", 3.0, time.Now().Add(time.Hour+time.Second), nil) 176 177 var ( 178 shardMappings *coordinator.ShardMapping 179 err error 180 ) 181 if shardMappings, err = c.MapShards(pr); err != nil { 182 t.Fatalf("unexpected an error: %v", err) 183 } 184 185 if exp := 2; len(shardMappings.Points) != exp { 186 t.Errorf("MapShards() len mismatch. got %v, exp %v", len(shardMappings.Points), exp) 187 } 188 189 for _, points := range shardMappings.Points { 190 // First shard should have 1 point w/ first point added 191 if len(points) == 1 && points[0].Time() != pr.Points[0].Time() { 192 t.Fatalf("MapShards() value mismatch. got %v, exp %v", points[0].Time(), pr.Points[0].Time()) 193 } 194 195 // Second shard should have the last two points added 196 if len(points) == 2 && points[0].Time() != pr.Points[1].Time() { 197 t.Fatalf("MapShards() value mismatch. got %v, exp %v", points[0].Time(), pr.Points[1].Time()) 198 } 199 200 if len(points) == 2 && points[1].Time() != pr.Points[2].Time() { 201 t.Fatalf("MapShards() value mismatch. got %v, exp %v", points[1].Time(), pr.Points[2].Time()) 202 } 203 } 204} 205 206// Ensures the points writer does not map points beyond the retention policy. 207func TestPointsWriter_MapShards_Invalid(t *testing.T) { 208 ms := PointsWriterMetaClient{} 209 rp := NewRetentionPolicy("myp", time.Hour, 3) 210 211 ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) { 212 return rp, nil 213 } 214 215 ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { 216 return &rp.ShardGroups[0], nil 217 } 218 219 c := coordinator.NewPointsWriter() 220 c.MetaClient = ms 221 defer c.Close() 222 pr := &coordinator.WritePointsRequest{ 223 Database: "mydb", 224 RetentionPolicy: "myrp", 225 } 226 227 // Add a point that goes beyond the current retention policy. 228 pr.AddPoint("cpu", 1.0, time.Now().Add(-2*time.Hour), nil) 229 230 var ( 231 shardMappings *coordinator.ShardMapping 232 err error 233 ) 234 if shardMappings, err = c.MapShards(pr); err != nil { 235 t.Fatalf("unexpected an error: %v", err) 236 } 237 238 if got, exp := len(shardMappings.Points), 0; got != exp { 239 t.Errorf("MapShards() len mismatch. got %v, exp %v", got, exp) 240 } 241 242 if got, exp := len(shardMappings.Dropped), 1; got != exp { 243 t.Fatalf("MapShard() dropped mismatch: got %v, exp %v", got, exp) 244 } 245} 246 247func TestPointsWriter_WritePoints(t *testing.T) { 248 tests := []struct { 249 name string 250 database string 251 retentionPolicy string 252 253 // the responses returned by each shard write call. node ID 1 = pos 0 254 err []error 255 expErr error 256 }{ 257 { 258 name: "write one success", 259 database: "mydb", 260 retentionPolicy: "myrp", 261 err: []error{nil, nil, nil}, 262 expErr: nil, 263 }, 264 265 // Write to non-existent database 266 { 267 name: "write to non-existent database", 268 database: "doesnt_exist", 269 retentionPolicy: "", 270 err: []error{nil, nil, nil}, 271 expErr: fmt.Errorf("database not found: doesnt_exist"), 272 }, 273 } 274 275 for _, test := range tests { 276 277 pr := &coordinator.WritePointsRequest{ 278 Database: test.database, 279 RetentionPolicy: test.retentionPolicy, 280 } 281 282 // Ensure that the test shard groups are created before the points 283 // are created. 284 ms := NewPointsWriterMetaClient() 285 286 // Three points that range over the shardGroup duration (1h) and should map to two 287 // distinct shards 288 pr.AddPoint("cpu", 1.0, time.Now(), nil) 289 pr.AddPoint("cpu", 2.0, time.Now().Add(time.Hour), nil) 290 pr.AddPoint("cpu", 3.0, time.Now().Add(time.Hour+time.Second), nil) 291 292 // copy to prevent data race 293 theTest := test 294 sm := coordinator.NewShardMapping(16) 295 sm.MapPoint( 296 &meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{ 297 {NodeID: 1}, 298 {NodeID: 2}, 299 {NodeID: 3}, 300 }}, 301 pr.Points[0]) 302 sm.MapPoint( 303 &meta.ShardInfo{ID: uint64(2), Owners: []meta.ShardOwner{ 304 {NodeID: 1}, 305 {NodeID: 2}, 306 {NodeID: 3}, 307 }}, 308 pr.Points[1]) 309 sm.MapPoint( 310 &meta.ShardInfo{ID: uint64(2), Owners: []meta.ShardOwner{ 311 {NodeID: 1}, 312 {NodeID: 2}, 313 {NodeID: 3}, 314 }}, 315 pr.Points[2]) 316 317 // Local coordinator.Node ShardWriter 318 // lock on the write increment since these functions get called in parallel 319 var mu sync.Mutex 320 321 store := &fakeStore{ 322 WriteFn: func(shardID uint64, points []models.Point) error { 323 mu.Lock() 324 defer mu.Unlock() 325 return theTest.err[0] 326 }, 327 } 328 329 ms.DatabaseFn = func(database string) *meta.DatabaseInfo { 330 return nil 331 } 332 ms.NodeIDFn = func() uint64 { return 1 } 333 334 subPoints := make(chan *coordinator.WritePointsRequest, 1) 335 sub := Subscriber{} 336 sub.PointsFn = func() chan<- *coordinator.WritePointsRequest { 337 return subPoints 338 } 339 340 c := coordinator.NewPointsWriter() 341 c.MetaClient = ms 342 c.TSDBStore = store 343 c.AddWriteSubscriber(sub.Points()) 344 c.Node = &influxdb.Node{ID: 1} 345 346 c.Open() 347 defer c.Close() 348 349 err := c.WritePointsPrivileged(pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points) 350 if err == nil && test.expErr != nil { 351 t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: got %v, exp %v", test.name, err, test.expErr) 352 } 353 354 if err != nil && test.expErr == nil { 355 t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: got %v, exp %v", test.name, err, test.expErr) 356 } 357 if err != nil && test.expErr != nil && err.Error() != test.expErr.Error() { 358 t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: got %v, exp %v", test.name, err, test.expErr) 359 } 360 if test.expErr == nil { 361 select { 362 case p := <-subPoints: 363 if !reflect.DeepEqual(p, pr) { 364 t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: unexpected WritePointsRequest got %v, exp %v", test.name, p, pr) 365 } 366 default: 367 t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: Subscriber.Points not called", test.name) 368 } 369 } 370 } 371} 372 373func TestPointsWriter_WritePoints_Dropped(t *testing.T) { 374 pr := &coordinator.WritePointsRequest{ 375 Database: "mydb", 376 RetentionPolicy: "myrp", 377 } 378 379 // Ensure that the test shard groups are created before the points 380 // are created. 381 ms := NewPointsWriterMetaClient() 382 383 // Three points that range over the shardGroup duration (1h) and should map to two 384 // distinct shards 385 pr.AddPoint("cpu", 1.0, time.Now().Add(-24*time.Hour), nil) 386 387 // copy to prevent data race 388 sm := coordinator.NewShardMapping(16) 389 390 // ShardMapper dropped this point 391 sm.Dropped = append(sm.Dropped, pr.Points[0]) 392 393 // Local coordinator.Node ShardWriter 394 // lock on the write increment since these functions get called in parallel 395 var mu sync.Mutex 396 397 store := &fakeStore{ 398 WriteFn: func(shardID uint64, points []models.Point) error { 399 mu.Lock() 400 defer mu.Unlock() 401 return nil 402 }, 403 } 404 405 ms.DatabaseFn = func(database string) *meta.DatabaseInfo { 406 return nil 407 } 408 ms.NodeIDFn = func() uint64 { return 1 } 409 410 subPoints := make(chan *coordinator.WritePointsRequest, 1) 411 sub := Subscriber{} 412 sub.PointsFn = func() chan<- *coordinator.WritePointsRequest { 413 return subPoints 414 } 415 416 c := coordinator.NewPointsWriter() 417 c.MetaClient = ms 418 c.TSDBStore = store 419 c.AddWriteSubscriber(sub.Points()) 420 c.Node = &influxdb.Node{ID: 1} 421 422 c.Open() 423 defer c.Close() 424 425 err := c.WritePointsPrivileged(pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points) 426 if _, ok := err.(tsdb.PartialWriteError); !ok { 427 t.Errorf("PointsWriter.WritePoints(): got %v, exp %v", err, tsdb.PartialWriteError{}) 428 } 429} 430 431type fakePointsWriter struct { 432 WritePointsIntoFn func(*coordinator.IntoWriteRequest) error 433} 434 435func (f *fakePointsWriter) WritePointsInto(req *coordinator.IntoWriteRequest) error { 436 return f.WritePointsIntoFn(req) 437} 438 439func TestBufferedPointsWriter(t *testing.T) { 440 db := "db0" 441 rp := "rp0" 442 capacity := 10000 443 444 writePointsIntoCnt := 0 445 pointsWritten := []models.Point{} 446 447 reset := func() { 448 writePointsIntoCnt = 0 449 pointsWritten = pointsWritten[:0] 450 } 451 452 fakeWriter := &fakePointsWriter{ 453 WritePointsIntoFn: func(req *coordinator.IntoWriteRequest) error { 454 writePointsIntoCnt++ 455 pointsWritten = append(pointsWritten, req.Points...) 456 return nil 457 }, 458 } 459 460 w := coordinator.NewBufferedPointsWriter(fakeWriter, db, rp, capacity) 461 462 // Test that capacity and length are correct for new buffered writer. 463 if w.Cap() != capacity { 464 t.Fatalf("exp %d, got %d", capacity, w.Cap()) 465 } else if w.Len() != 0 { 466 t.Fatalf("exp %d, got %d", 0, w.Len()) 467 } 468 469 // Test flushing an empty buffer. 470 if err := w.Flush(); err != nil { 471 t.Fatal(err) 472 } else if writePointsIntoCnt > 0 { 473 t.Fatalf("exp 0, got %d", writePointsIntoCnt) 474 } 475 476 // Test writing zero points. 477 if err := w.WritePointsInto(&coordinator.IntoWriteRequest{ 478 Database: db, 479 RetentionPolicy: rp, 480 Points: []models.Point{}, 481 }); err != nil { 482 t.Fatal(err) 483 } else if writePointsIntoCnt > 0 { 484 t.Fatalf("exp 0, got %d", writePointsIntoCnt) 485 } else if w.Len() > 0 { 486 t.Fatalf("exp 0, got %d", w.Len()) 487 } 488 489 // Test writing single large bunch of points points. 490 req := coordinator.WritePointsRequest{ 491 Database: db, 492 RetentionPolicy: rp, 493 } 494 495 numPoints := int(float64(capacity) * 5.5) 496 for i := 0; i < numPoints; i++ { 497 req.AddPoint("cpu", float64(i), time.Now().Add(time.Duration(i)*time.Second), nil) 498 } 499 500 r := coordinator.IntoWriteRequest(req) 501 if err := w.WritePointsInto(&r); err != nil { 502 t.Fatal(err) 503 } else if writePointsIntoCnt != 5 { 504 t.Fatalf("exp 5, got %d", writePointsIntoCnt) 505 } else if w.Len() != capacity/2 { 506 t.Fatalf("exp %d, got %d", capacity/2, w.Len()) 507 } else if len(pointsWritten) != numPoints-capacity/2 { 508 t.Fatalf("exp %d, got %d", numPoints-capacity/2, len(pointsWritten)) 509 } 510 511 if err := w.Flush(); err != nil { 512 t.Fatal(err) 513 } else if writePointsIntoCnt != 6 { 514 t.Fatalf("exp 6, got %d", writePointsIntoCnt) 515 } else if w.Len() != 0 { 516 t.Fatalf("exp 0, got %d", w.Len()) 517 } else if len(pointsWritten) != numPoints { 518 t.Fatalf("exp %d, got %d", numPoints, len(pointsWritten)) 519 } else if !reflect.DeepEqual(r.Points, pointsWritten) { 520 t.Fatal("points don't match") 521 } 522 523 reset() 524 525 // Test writing points one at a time. 526 for i, _ := range r.Points { 527 if err := w.WritePointsInto(&coordinator.IntoWriteRequest{ 528 Database: db, 529 RetentionPolicy: rp, 530 Points: r.Points[i : i+1], 531 }); err != nil { 532 t.Fatal(err) 533 } 534 } 535 536 if err := w.Flush(); err != nil { 537 t.Fatal(err) 538 } else if writePointsIntoCnt != 6 { 539 t.Fatalf("exp 6, got %d", writePointsIntoCnt) 540 } else if w.Len() != 0 { 541 t.Fatalf("exp 0, got %d", w.Len()) 542 } else if len(pointsWritten) != numPoints { 543 t.Fatalf("exp %d, got %d", numPoints, len(pointsWritten)) 544 } else if !reflect.DeepEqual(r.Points, pointsWritten) { 545 t.Fatal("points don't match") 546 } 547} 548 549var shardID uint64 550 551type fakeStore struct { 552 WriteFn func(shardID uint64, points []models.Point) error 553 CreateShardfn func(database, retentionPolicy string, shardID uint64, enabled bool) error 554} 555 556func (f *fakeStore) WriteToShard(shardID uint64, points []models.Point) error { 557 return f.WriteFn(shardID, points) 558} 559 560func (f *fakeStore) CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error { 561 return f.CreateShardfn(database, retentionPolicy, shardID, enabled) 562} 563 564func NewPointsWriterMetaClient() *PointsWriterMetaClient { 565 ms := &PointsWriterMetaClient{} 566 rp := NewRetentionPolicy("myp", time.Hour, 3) 567 AttachShardGroupInfo(rp, []meta.ShardOwner{ 568 {NodeID: 1}, 569 {NodeID: 2}, 570 {NodeID: 3}, 571 }) 572 AttachShardGroupInfo(rp, []meta.ShardOwner{ 573 {NodeID: 1}, 574 {NodeID: 2}, 575 {NodeID: 3}, 576 }) 577 578 ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) { 579 return rp, nil 580 } 581 582 ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { 583 for i, sg := range rp.ShardGroups { 584 if timestamp.Equal(sg.StartTime) || timestamp.After(sg.StartTime) && timestamp.Before(sg.EndTime) { 585 return &rp.ShardGroups[i], nil 586 } 587 } 588 panic("should not get here") 589 } 590 return ms 591} 592 593type PointsWriterMetaClient struct { 594 NodeIDFn func() uint64 595 RetentionPolicyFn func(database, name string) (*meta.RetentionPolicyInfo, error) 596 CreateShardGroupIfNotExistsFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) 597 DatabaseFn func(database string) *meta.DatabaseInfo 598 ShardOwnerFn func(shardID uint64) (string, string, *meta.ShardGroupInfo) 599} 600 601func (m PointsWriterMetaClient) NodeID() uint64 { return m.NodeIDFn() } 602 603func (m PointsWriterMetaClient) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) { 604 return m.RetentionPolicyFn(database, name) 605} 606 607func (m PointsWriterMetaClient) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { 608 return m.CreateShardGroupIfNotExistsFn(database, policy, timestamp) 609} 610 611func (m PointsWriterMetaClient) Database(database string) *meta.DatabaseInfo { 612 return m.DatabaseFn(database) 613} 614 615func (m PointsWriterMetaClient) ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo) { 616 return m.ShardOwnerFn(shardID) 617} 618 619type Subscriber struct { 620 PointsFn func() chan<- *coordinator.WritePointsRequest 621} 622 623func (s Subscriber) Points() chan<- *coordinator.WritePointsRequest { 624 return s.PointsFn() 625} 626 627func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *meta.RetentionPolicyInfo { 628 shards := []meta.ShardInfo{} 629 owners := []meta.ShardOwner{} 630 for i := 1; i <= nodeCount; i++ { 631 owners = append(owners, meta.ShardOwner{NodeID: uint64(i)}) 632 } 633 634 // each node is fully replicated with each other 635 shards = append(shards, meta.ShardInfo{ 636 ID: nextShardID(), 637 Owners: owners, 638 }) 639 640 start := time.Now() 641 rp := &meta.RetentionPolicyInfo{ 642 Name: "myrp", 643 ReplicaN: nodeCount, 644 Duration: duration, 645 ShardGroupDuration: duration, 646 ShardGroups: []meta.ShardGroupInfo{ 647 meta.ShardGroupInfo{ 648 ID: nextShardID(), 649 StartTime: start, 650 EndTime: start.Add(duration).Add(-1), 651 Shards: shards, 652 }, 653 }, 654 } 655 return rp 656} 657 658func AttachShardGroupInfo(rp *meta.RetentionPolicyInfo, owners []meta.ShardOwner) { 659 var startTime, endTime time.Time 660 if len(rp.ShardGroups) == 0 { 661 startTime = time.Now() 662 } else { 663 startTime = rp.ShardGroups[len(rp.ShardGroups)-1].StartTime.Add(rp.ShardGroupDuration) 664 } 665 endTime = startTime.Add(rp.ShardGroupDuration).Add(-1) 666 667 sh := meta.ShardGroupInfo{ 668 ID: uint64(len(rp.ShardGroups) + 1), 669 StartTime: startTime, 670 EndTime: endTime, 671 Shards: []meta.ShardInfo{ 672 meta.ShardInfo{ 673 ID: nextShardID(), 674 Owners: owners, 675 }, 676 }, 677 } 678 rp.ShardGroups = append(rp.ShardGroups, sh) 679} 680 681func nextShardID() uint64 { 682 return atomic.AddUint64(&shardID, 1) 683} 684