package coordinator_test

import (
	"fmt"
	"reflect"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/influxdata/influxdb"
	"github.com/influxdata/influxdb/coordinator"
	"github.com/influxdata/influxdb/models"
	"github.com/influxdata/influxdb/services/meta"
	"github.com/influxdata/influxdb/tsdb"
)

// TODO(benbjohnson): Rewrite tests to use cluster_test.MetaClient.

// Ensures the points writer maps a single point to a single shard.
func TestPointsWriter_MapShards_One(t *testing.T) {
	ms := PointsWriterMetaClient{}
	rp := NewRetentionPolicy("myp", time.Hour, 3)

	ms.NodeIDFn = func() uint64 { return 1 }
	ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
		return rp, nil
	}

	ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
		return &rp.ShardGroups[0], nil
	}

	c := coordinator.PointsWriter{MetaClient: ms}
	pr := &coordinator.WritePointsRequest{
		Database:        "mydb",
		RetentionPolicy: "myrp",
	}
	pr.AddPoint("cpu", 1.0, time.Now(), nil)

	var (
		shardMappings *coordinator.ShardMapping
		err           error
	)
	if shardMappings, err = c.MapShards(pr); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if exp := 1; len(shardMappings.Points) != exp {
		t.Errorf("MapShards() len mismatch. got %v, exp %v", len(shardMappings.Points), exp)
	}
}

// Ensures the points writer maps to a new shard group when the shard duration
// is changed.
func TestPointsWriter_MapShards_AlterShardDuration(t *testing.T) {
	ms := PointsWriterMetaClient{}
	rp := NewRetentionPolicy("myp", time.Hour, 3)

	ms.NodeIDFn = func() uint64 { return 1 }
	ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
		return rp, nil
	}

	var (
		i   int
		now = time.Now()
	)

	ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
		sg := []meta.ShardGroupInfo{
			meta.ShardGroupInfo{
				Shards:    make([]meta.ShardInfo, 1),
				StartTime: now, EndTime: now.Add(rp.Duration).Add(-1),
			},
			meta.ShardGroupInfo{
				Shards:    make([]meta.ShardInfo, 1),
				StartTime: now.Add(time.Hour), EndTime: now.Add(3 * time.Hour).Add(rp.Duration).Add(-1),
			},
		}[i]
		i++
		return &sg, nil
	}

	c := coordinator.NewPointsWriter()
	c.MetaClient = ms

	pr := &coordinator.WritePointsRequest{
		Database:        "mydb",
		RetentionPolicy: "myrp",
	}
	pr.AddPoint("cpu", 1.0, now, nil)
	pr.AddPoint("cpu", 2.0, now.Add(2*time.Second), nil)

	var (
		shardMappings *coordinator.ShardMapping
		err           error
	)
	if shardMappings, err = c.MapShards(pr); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got, exp := len(shardMappings.Points[0]), 2; got != exp {
		t.Fatalf("got %d point(s), expected %d", got, exp)
	}

	if got, exp := len(shardMappings.Shards), 1; got != exp {
		t.Errorf("got %d shard(s), expected %d", got, exp)
	}

	// Now we alter the retention policy duration.
	rp.ShardGroupDuration = 3 * time.Hour

	pr = &coordinator.WritePointsRequest{
		Database:        "mydb",
		RetentionPolicy: "myrp",
	}
	pr.AddPoint("cpu", 1.0, now.Add(2*time.Hour), nil)

	// Point is beyond previous shard group so a new shard group should be
	// created.
	if _, err = c.MapShards(pr); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	// We can check the value of i since it's only incremented when a shard group
	// is created.
	if got, exp := i, 2; got != exp {
		t.Fatal("new shard group was not created, expected it to be")
	}
}

// Ensures the points writer maps multiple points across shard group boundaries.
func TestPointsWriter_MapShards_Multiple(t *testing.T) {
	ms := PointsWriterMetaClient{}
	rp := NewRetentionPolicy("myp", time.Hour, 3)
	rp.ShardGroupDuration = time.Hour
	AttachShardGroupInfo(rp, []meta.ShardOwner{
		{NodeID: 1},
		{NodeID: 2},
		{NodeID: 3},
	})
	AttachShardGroupInfo(rp, []meta.ShardOwner{
		{NodeID: 1},
		{NodeID: 2},
		{NodeID: 3},
	})

	ms.NodeIDFn = func() uint64 { return 1 }
	ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
		return rp, nil
	}

	ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
		for i, sg := range rp.ShardGroups {
			if timestamp.Equal(sg.StartTime) || timestamp.After(sg.StartTime) && timestamp.Before(sg.EndTime) {
				return &rp.ShardGroups[i], nil
			}
		}
		panic("should not get here")
	}

	c := coordinator.NewPointsWriter()
	c.MetaClient = ms
	defer c.Close()
	pr := &coordinator.WritePointsRequest{
		Database:        "mydb",
		RetentionPolicy: "myrp",
	}

	// Three points that range over the shard group duration (1h) and should map to two
	// distinct shards.
	pr.AddPoint("cpu", 1.0, time.Now(), nil)
	pr.AddPoint("cpu", 2.0, time.Now().Add(time.Hour), nil)
	pr.AddPoint("cpu", 3.0, time.Now().Add(time.Hour+time.Second), nil)

	var (
		shardMappings *coordinator.ShardMapping
		err           error
	)
	if shardMappings, err = c.MapShards(pr); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if exp := 2; len(shardMappings.Points) != exp {
		t.Errorf("MapShards() len mismatch. got %v, exp %v", len(shardMappings.Points), exp)
	}

	for _, points := range shardMappings.Points {
		// First shard should have 1 point w/ first point added
		if len(points) == 1 && points[0].Time() != pr.Points[0].Time() {
			t.Fatalf("MapShards() value mismatch. got %v, exp %v", points[0].Time(), pr.Points[0].Time())
		}

		// Second shard should have the last two points added
		if len(points) == 2 && points[0].Time() != pr.Points[1].Time() {
			t.Fatalf("MapShards() value mismatch. got %v, exp %v", points[0].Time(), pr.Points[1].Time())
		}

		if len(points) == 2 && points[1].Time() != pr.Points[2].Time() {
			t.Fatalf("MapShards() value mismatch. got %v, exp %v", points[1].Time(), pr.Points[2].Time())
		}
	}
}

// Ensures the points writer does not map points beyond the retention policy.
func TestPointsWriter_MapShards_Invalid(t *testing.T) {
	ms := PointsWriterMetaClient{}
	rp := NewRetentionPolicy("myp", time.Hour, 3)

	ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
		return rp, nil
	}

	ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
		return &rp.ShardGroups[0], nil
	}

	c := coordinator.NewPointsWriter()
	c.MetaClient = ms
	defer c.Close()
	pr := &coordinator.WritePointsRequest{
		Database:        "mydb",
		RetentionPolicy: "myrp",
	}

	// Add a point that goes beyond the current retention policy.
	pr.AddPoint("cpu", 1.0, time.Now().Add(-2*time.Hour), nil)

	var (
		shardMappings *coordinator.ShardMapping
		err           error
	)
	if shardMappings, err = c.MapShards(pr); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}

	if got, exp := len(shardMappings.Points), 0; got != exp {
		t.Errorf("MapShards() len mismatch. got %v, exp %v", got, exp)
	}

	if got, exp := len(shardMappings.Dropped), 1; got != exp {
		t.Fatalf("MapShards() dropped mismatch: got %v, exp %v", got, exp)
	}
}

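// Ensures the points writer writes points to the local TSDB store, notifies
// subscribers, and returns the expected error for each case.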
func TestPointsWriter_WritePoints(t *testing.T) {
	tests := []struct {
		name            string
		database        string
		retentionPolicy string

		// The responses returned by each shard write call. Node ID 1 = position 0.
		err    []error
		expErr error
	}{
		{
			name:            "write one success",
			database:        "mydb",
			retentionPolicy: "myrp",
			err:             []error{nil, nil, nil},
			expErr:          nil,
		},

		// Write to non-existent database
		{
			name:            "write to non-existent database",
			database:        "doesnt_exist",
			retentionPolicy: "",
			err:             []error{nil, nil, nil},
			expErr:          fmt.Errorf("database not found: doesnt_exist"),
		},
	}

	for _, test := range tests {

		pr := &coordinator.WritePointsRequest{
			Database:        test.database,
			RetentionPolicy: test.retentionPolicy,
		}

		// Ensure that the test shard groups are created before the points
		// are created.
		ms := NewPointsWriterMetaClient()

		// Three points that range over the shard group duration (1h) and should map to two
		// distinct shards.
		pr.AddPoint("cpu", 1.0, time.Now(), nil)
		pr.AddPoint("cpu", 2.0, time.Now().Add(time.Hour), nil)
		pr.AddPoint("cpu", 3.0, time.Now().Add(time.Hour+time.Second), nil)

		// Copy to prevent a data race.
		theTest := test
		sm := coordinator.NewShardMapping(16)
		sm.MapPoint(
			&meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{
				{NodeID: 1},
				{NodeID: 2},
				{NodeID: 3},
			}},
			pr.Points[0])
		sm.MapPoint(
			&meta.ShardInfo{ID: uint64(2), Owners: []meta.ShardOwner{
				{NodeID: 1},
				{NodeID: 2},
				{NodeID: 3},
			}},
			pr.Points[1])
		sm.MapPoint(
			&meta.ShardInfo{ID: uint64(2), Owners: []meta.ShardOwner{
				{NodeID: 1},
				{NodeID: 2},
				{NodeID: 3},
			}},
			pr.Points[2])

		// Local coordinator.Node ShardWriter.
		// Lock on the write since these functions can be called in parallel.
		var mu sync.Mutex

		store := &fakeStore{
			WriteFn: func(shardID uint64, points []models.Point) error {
				mu.Lock()
				defer mu.Unlock()
				return theTest.err[0]
			},
		}

		ms.DatabaseFn = func(database string) *meta.DatabaseInfo {
			return nil
		}
		ms.NodeIDFn = func() uint64 { return 1 }

		subPoints := make(chan *coordinator.WritePointsRequest, 1)
		sub := Subscriber{}
		sub.PointsFn = func() chan<- *coordinator.WritePointsRequest {
			return subPoints
		}

		c := coordinator.NewPointsWriter()
		c.MetaClient = ms
		c.TSDBStore = store
		c.AddWriteSubscriber(sub.Points())
		c.Node = &influxdb.Node{ID: 1}

		c.Open()
		defer c.Close()

		err := c.WritePointsPrivileged(pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points)
		if err == nil && test.expErr != nil {
			t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: got %v, exp %v", test.name, err, test.expErr)
		}

		if err != nil && test.expErr == nil {
			t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: got %v, exp %v", test.name, err, test.expErr)
		}
		if err != nil && test.expErr != nil && err.Error() != test.expErr.Error() {
			t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: got %v, exp %v", test.name, err, test.expErr)
		}
		if test.expErr == nil {
			select {
			case p := <-subPoints:
				if !reflect.DeepEqual(p, pr) {
					t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: unexpected WritePointsRequest got %v, exp %v", test.name, p, pr)
				}
			default:
				t.Errorf("PointsWriter.WritePointsPrivileged(): '%s' error: Subscriber.Points not called", test.name)
			}
		}
	}
}

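// Ensures the points writer returns a partial write error when points are
// dropped by the shard mapper.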
func TestPointsWriter_WritePoints_Dropped(t *testing.T) {
	pr := &coordinator.WritePointsRequest{
		Database:        "mydb",
		RetentionPolicy: "myrp",
	}

	// Ensure that the test shard groups are created before the points
	// are created.
	ms := NewPointsWriterMetaClient()

	// A single point that falls outside the retention policy and should be
	// dropped.
	pr.AddPoint("cpu", 1.0, time.Now().Add(-24*time.Hour), nil)

	sm := coordinator.NewShardMapping(16)

	// ShardMapper dropped this point
	sm.Dropped = append(sm.Dropped, pr.Points[0])

	// Local coordinator.Node ShardWriter.
	// Lock on the write since these functions can be called in parallel.
	var mu sync.Mutex

	store := &fakeStore{
		WriteFn: func(shardID uint64, points []models.Point) error {
			mu.Lock()
			defer mu.Unlock()
			return nil
		},
	}

	ms.DatabaseFn = func(database string) *meta.DatabaseInfo {
		return nil
	}
	ms.NodeIDFn = func() uint64 { return 1 }

	subPoints := make(chan *coordinator.WritePointsRequest, 1)
	sub := Subscriber{}
	sub.PointsFn = func() chan<- *coordinator.WritePointsRequest {
		return subPoints
	}

	c := coordinator.NewPointsWriter()
	c.MetaClient = ms
	c.TSDBStore = store
	c.AddWriteSubscriber(sub.Points())
	c.Node = &influxdb.Node{ID: 1}

	c.Open()
	defer c.Close()

	err := c.WritePointsPrivileged(pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points)
	if _, ok := err.(tsdb.PartialWriteError); !ok {
		t.Errorf("PointsWriter.WritePoints(): got %v, exp %v", err, tsdb.PartialWriteError{})
	}
}

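// fakePointsWriter is a mock points writer whose WritePointsInto behavior is
// supplied by the test.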
type fakePointsWriter struct {
	WritePointsIntoFn func(*coordinator.IntoWriteRequest) error
}

func (f *fakePointsWriter) WritePointsInto(req *coordinator.IntoWriteRequest) error {
	return f.WritePointsIntoFn(req)
}

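// Ensures the buffered points writer buffers points up to its capacity and
// flushes them to the underlying writer.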
func TestBufferedPointsWriter(t *testing.T) {
	db := "db0"
	rp := "rp0"
	capacity := 10000

	writePointsIntoCnt := 0
	pointsWritten := []models.Point{}

	reset := func() {
		writePointsIntoCnt = 0
		pointsWritten = pointsWritten[:0]
	}

	fakeWriter := &fakePointsWriter{
		WritePointsIntoFn: func(req *coordinator.IntoWriteRequest) error {
			writePointsIntoCnt++
			pointsWritten = append(pointsWritten, req.Points...)
			return nil
		},
	}

	w := coordinator.NewBufferedPointsWriter(fakeWriter, db, rp, capacity)

	// Test that capacity and length are correct for new buffered writer.
	if w.Cap() != capacity {
		t.Fatalf("exp %d, got %d", capacity, w.Cap())
	} else if w.Len() != 0 {
		t.Fatalf("exp %d, got %d", 0, w.Len())
	}

	// Test flushing an empty buffer.
	if err := w.Flush(); err != nil {
		t.Fatal(err)
	} else if writePointsIntoCnt > 0 {
		t.Fatalf("exp 0, got %d", writePointsIntoCnt)
	}

	// Test writing zero points.
	if err := w.WritePointsInto(&coordinator.IntoWriteRequest{
		Database:        db,
		RetentionPolicy: rp,
		Points:          []models.Point{},
	}); err != nil {
		t.Fatal(err)
	} else if writePointsIntoCnt > 0 {
		t.Fatalf("exp 0, got %d", writePointsIntoCnt)
	} else if w.Len() > 0 {
		t.Fatalf("exp 0, got %d", w.Len())
	}

	// Test writing a single large batch of points.
	req := coordinator.WritePointsRequest{
		Database:        db,
		RetentionPolicy: rp,
	}

	numPoints := int(float64(capacity) * 5.5)
	for i := 0; i < numPoints; i++ {
		req.AddPoint("cpu", float64(i), time.Now().Add(time.Duration(i)*time.Second), nil)
	}

	r := coordinator.IntoWriteRequest(req)
	if err := w.WritePointsInto(&r); err != nil {
		t.Fatal(err)
	} else if writePointsIntoCnt != 5 {
		t.Fatalf("exp 5, got %d", writePointsIntoCnt)
	} else if w.Len() != capacity/2 {
		t.Fatalf("exp %d, got %d", capacity/2, w.Len())
	} else if len(pointsWritten) != numPoints-capacity/2 {
		t.Fatalf("exp %d, got %d", numPoints-capacity/2, len(pointsWritten))
	}

	if err := w.Flush(); err != nil {
		t.Fatal(err)
	} else if writePointsIntoCnt != 6 {
		t.Fatalf("exp 6, got %d", writePointsIntoCnt)
	} else if w.Len() != 0 {
		t.Fatalf("exp 0, got %d", w.Len())
	} else if len(pointsWritten) != numPoints {
		t.Fatalf("exp %d, got %d", numPoints, len(pointsWritten))
	} else if !reflect.DeepEqual(r.Points, pointsWritten) {
		t.Fatal("points don't match")
	}

	reset()

	// Test writing points one at a time.
	for i := range r.Points {
		if err := w.WritePointsInto(&coordinator.IntoWriteRequest{
			Database:        db,
			RetentionPolicy: rp,
			Points:          r.Points[i : i+1],
		}); err != nil {
			t.Fatal(err)
		}
	}

	if err := w.Flush(); err != nil {
		t.Fatal(err)
	} else if writePointsIntoCnt != 6 {
		t.Fatalf("exp 6, got %d", writePointsIntoCnt)
	} else if w.Len() != 0 {
		t.Fatalf("exp 0, got %d", w.Len())
	} else if len(pointsWritten) != numPoints {
		t.Fatalf("exp %d, got %d", numPoints, len(pointsWritten))
	} else if !reflect.DeepEqual(r.Points, pointsWritten) {
		t.Fatal("points don't match")
	}
}

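// shardID is a package-level counter used by nextShardID.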
var shardID uint64

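// fakeStore is a mock TSDB store whose shard write and shard creation
// behavior is supplied by the test.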
type fakeStore struct {
	WriteFn       func(shardID uint64, points []models.Point) error
	CreateShardFn func(database, retentionPolicy string, shardID uint64, enabled bool) error
}

func (f *fakeStore) WriteToShard(shardID uint64, points []models.Point) error {
	return f.WriteFn(shardID, points)
}

func (f *fakeStore) CreateShard(database, retentionPolicy string, shardID uint64, enabled bool) error {
	return f.CreateShardFn(database, retentionPolicy, shardID, enabled)
}

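// NewPointsWriterMetaClient returns a mock meta client backed by a retention
// policy with several consecutive shard groups.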
func NewPointsWriterMetaClient() *PointsWriterMetaClient {
	ms := &PointsWriterMetaClient{}
	rp := NewRetentionPolicy("myp", time.Hour, 3)
	AttachShardGroupInfo(rp, []meta.ShardOwner{
		{NodeID: 1},
		{NodeID: 2},
		{NodeID: 3},
	})
	AttachShardGroupInfo(rp, []meta.ShardOwner{
		{NodeID: 1},
		{NodeID: 2},
		{NodeID: 3},
	})

	ms.RetentionPolicyFn = func(db, retentionPolicy string) (*meta.RetentionPolicyInfo, error) {
		return rp, nil
	}

	ms.CreateShardGroupIfNotExistsFn = func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
		for i, sg := range rp.ShardGroups {
			if timestamp.Equal(sg.StartTime) || timestamp.After(sg.StartTime) && timestamp.Before(sg.EndTime) {
				return &rp.ShardGroups[i], nil
			}
		}
		panic("should not get here")
	}
	return ms
}

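// PointsWriterMetaClient is a mock meta client whose behavior is provided
// through its function fields.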
type PointsWriterMetaClient struct {
	NodeIDFn                      func() uint64
	RetentionPolicyFn             func(database, name string) (*meta.RetentionPolicyInfo, error)
	CreateShardGroupIfNotExistsFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
	DatabaseFn                    func(database string) *meta.DatabaseInfo
	ShardOwnerFn                  func(shardID uint64) (string, string, *meta.ShardGroupInfo)
}

func (m PointsWriterMetaClient) NodeID() uint64 { return m.NodeIDFn() }

func (m PointsWriterMetaClient) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) {
	return m.RetentionPolicyFn(database, name)
}

func (m PointsWriterMetaClient) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
	return m.CreateShardGroupIfNotExistsFn(database, policy, timestamp)
}

func (m PointsWriterMetaClient) Database(database string) *meta.DatabaseInfo {
	return m.DatabaseFn(database)
}

func (m PointsWriterMetaClient) ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo) {
	return m.ShardOwnerFn(shardID)
}

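// Subscriber is a mock write subscriber; PointsFn supplies the channel
// returned by Points.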
type Subscriber struct {
	PointsFn func() chan<- *coordinator.WritePointsRequest
}

func (s Subscriber) Points() chan<- *coordinator.WritePointsRequest {
	return s.PointsFn()
}

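// NewRetentionPolicy returns a retention policy with a single shard group
// whose shard is replicated across nodeCount nodes.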
func NewRetentionPolicy(name string, duration time.Duration, nodeCount int) *meta.RetentionPolicyInfo {
	shards := []meta.ShardInfo{}
	owners := []meta.ShardOwner{}
	for i := 1; i <= nodeCount; i++ {
		owners = append(owners, meta.ShardOwner{NodeID: uint64(i)})
	}

	// Each shard is fully replicated across all nodes.
	shards = append(shards, meta.ShardInfo{
		ID:     nextShardID(),
		Owners: owners,
	})

	start := time.Now()
	rp := &meta.RetentionPolicyInfo{
		Name:               name,
		ReplicaN:           nodeCount,
		Duration:           duration,
		ShardGroupDuration: duration,
		ShardGroups: []meta.ShardGroupInfo{
			meta.ShardGroupInfo{
				ID:        nextShardID(),
				StartTime: start,
				EndTime:   start.Add(duration).Add(-1),
				Shards:    shards,
			},
		},
	}
	return rp
}

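// AttachShardGroupInfo appends a shard group to rp covering the next
// ShardGroupDuration window, with a single shard owned by the given owners.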
func AttachShardGroupInfo(rp *meta.RetentionPolicyInfo, owners []meta.ShardOwner) {
	var startTime, endTime time.Time
	if len(rp.ShardGroups) == 0 {
		startTime = time.Now()
	} else {
		startTime = rp.ShardGroups[len(rp.ShardGroups)-1].StartTime.Add(rp.ShardGroupDuration)
	}
	endTime = startTime.Add(rp.ShardGroupDuration).Add(-1)

	sh := meta.ShardGroupInfo{
		ID:        uint64(len(rp.ShardGroups) + 1),
		StartTime: startTime,
		EndTime:   endTime,
		Shards: []meta.ShardInfo{
			meta.ShardInfo{
				ID:     nextShardID(),
				Owners: owners,
			},
		},
	}
	rp.ShardGroups = append(rp.ShardGroups, sh)
}

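// nextShardID returns the next unique shard ID.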
func nextShardID() uint64 {
	return atomic.AddUint64(&shardID, 1)
}