1package ledis
2
3import (
4	"bytes"
5	"encoding/binary"
6	"errors"
7	"time"
8
9	"github.com/siddontang/go/hack"
10	"github.com/siddontang/ledisdb/store"
11)
12
// Score bounds and aggregate modes for zsets.
const (
	// MinScore and MaxScore delimit the usable score range. zSetItem and
	// ZIncrBy reject any score that is <= MinScore or >= MaxScore, so both
	// bounds are exclusive for stored scores.
	MinScore     int64 = -1<<63 + 1
	MaxScore     int64 = 1<<63 - 1
	// InvalidScore is returned in place of a score when a score lookup or
	// update fails.
	InvalidScore int64 = -1 << 63

	// Aggregate modes accepted by ZUnionStore and ZInterStore: sum the
	// scores, keep the smaller one, or keep the larger one.
	AggregateSum byte = 0
	AggregateMin byte = 1
	AggregateMax byte = 2
)
23
// ScorePair couples a zset member with its score. It is the argument type of
// ZAdd and the element type returned by the range queries.
type ScorePair struct {
	// Score is the member's score.
	Score  int64
	// Member is the raw member bytes.
	Member []byte
}
29
// Errors reported by the zset key decoders and the zset commands.
var (
	// Decoding failures for the three encoded key kinds.
	errZSizeKey  = errors.New("invalid zsize key")
	errZSetKey   = errors.New("invalid zset key")
	errZScoreKey = errors.New("invalid zscore key")

	// errScoreOverflow is returned when a score falls outside (MinScore, MaxScore).
	errScoreOverflow = errors.New("zset score overflow")

	// Argument validation failures of ZUnionStore and ZInterStore.
	errInvalidAggregate = errors.New("invalid aggregate")
	errInvalidWeightNum = errors.New("invalid weight number")
	errInvalidSrcKeyNum = errors.New("invalid src key number")
)
37
// Separator bytes embedded in encoded zset keys.
const (
	// zEncodeScoreKey writes zsetNScoreSep in front of a negative score and
	// zsetPScoreSep in front of a non-negative one. zsetNScoreSep is the
	// smaller byte, so under a bytewise key comparison negative scores come
	// before non-negative ones.
	zsetNScoreSep    byte = '<'
	zsetPScoreSep    byte = zsetNScoreSep + 1
	// zsetStopScoreSep is the byte right after zsetPScoreSep.
	// NOTE(review): not referenced in the visible part of this file.
	zsetStopScoreSep byte = zsetPScoreSep + 1

	// zsetStartMemSep precedes the member bytes in set and score keys.
	// zsetStopMemSep is the next byte value; the *Stop* key encoders put it
	// in place of zsetStartMemSep to build an upper bound for range scans.
	zsetStartMemSep byte = ':'
	zsetStopMemSep  byte = zsetStartMemSep + 1
)
46
47func checkZSetKMSize(key []byte, member []byte) error {
48	if len(key) > MaxKeySize || len(key) == 0 {
49		return errKeySize
50	} else if len(member) > MaxZSetMemberSize || len(member) == 0 {
51		return errZSetMemberSize
52	}
53	return nil
54}
55
56func (db *DB) zEncodeSizeKey(key []byte) []byte {
57	buf := make([]byte, len(key)+1+len(db.indexVarBuf))
58	pos := copy(buf, db.indexVarBuf)
59	buf[pos] = ZSizeType
60	pos++
61	copy(buf[pos:], key)
62	return buf
63}
64
65func (db *DB) zDecodeSizeKey(ek []byte) ([]byte, error) {
66	pos, err := db.checkKeyIndex(ek)
67	if err != nil {
68		return nil, err
69	}
70
71	if pos+1 > len(ek) || ek[pos] != ZSizeType {
72		return nil, errZSizeKey
73	}
74	pos++
75	return ek[pos:], nil
76}
77
78func (db *DB) zEncodeSetKey(key []byte, member []byte) []byte {
79	buf := make([]byte, len(key)+len(member)+4+len(db.indexVarBuf))
80
81	pos := copy(buf, db.indexVarBuf)
82
83	buf[pos] = ZSetType
84	pos++
85
86	binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
87	pos += 2
88
89	copy(buf[pos:], key)
90	pos += len(key)
91
92	buf[pos] = zsetStartMemSep
93	pos++
94
95	copy(buf[pos:], member)
96
97	return buf
98}
99
100func (db *DB) zDecodeSetKey(ek []byte) ([]byte, []byte, error) {
101	pos, err := db.checkKeyIndex(ek)
102	if err != nil {
103		return nil, nil, err
104	}
105
106	if pos+1 > len(ek) || ek[pos] != ZSetType {
107		return nil, nil, errZSetKey
108	}
109
110	pos++
111
112	if pos+2 > len(ek) {
113		return nil, nil, errZSetKey
114	}
115
116	keyLen := int(binary.BigEndian.Uint16(ek[pos:]))
117	if keyLen+pos > len(ek) {
118		return nil, nil, errZSetKey
119	}
120
121	pos += 2
122	key := ek[pos : pos+keyLen]
123
124	if ek[pos+keyLen] != zsetStartMemSep {
125		return nil, nil, errZSetKey
126	}
127	pos++
128
129	member := ek[pos+keyLen:]
130	return key, member, nil
131}
132
133func (db *DB) zEncodeStartSetKey(key []byte) []byte {
134	k := db.zEncodeSetKey(key, nil)
135	return k
136}
137
138func (db *DB) zEncodeStopSetKey(key []byte) []byte {
139	k := db.zEncodeSetKey(key, nil)
140	k[len(k)-1] = zsetStartMemSep + 1
141	return k
142}
143
144func (db *DB) zEncodeScoreKey(key []byte, member []byte, score int64) []byte {
145	buf := make([]byte, len(key)+len(member)+13+len(db.indexVarBuf))
146
147	pos := copy(buf, db.indexVarBuf)
148
149	buf[pos] = ZScoreType
150	pos++
151
152	binary.BigEndian.PutUint16(buf[pos:], uint16(len(key)))
153	pos += 2
154
155	copy(buf[pos:], key)
156	pos += len(key)
157
158	if score < 0 {
159		buf[pos] = zsetNScoreSep
160	} else {
161		buf[pos] = zsetPScoreSep
162	}
163
164	pos++
165	binary.BigEndian.PutUint64(buf[pos:], uint64(score))
166	pos += 8
167
168	buf[pos] = zsetStartMemSep
169	pos++
170
171	copy(buf[pos:], member)
172	return buf
173}
174
175func (db *DB) zEncodeStartScoreKey(key []byte, score int64) []byte {
176	return db.zEncodeScoreKey(key, nil, score)
177}
178
179func (db *DB) zEncodeStopScoreKey(key []byte, score int64) []byte {
180	k := db.zEncodeScoreKey(key, nil, score)
181	k[len(k)-1] = zsetStopMemSep
182	return k
183}
184
185func (db *DB) zDecodeScoreKey(ek []byte) (key []byte, member []byte, score int64, err error) {
186	pos := 0
187	pos, err = db.checkKeyIndex(ek)
188	if err != nil {
189		return
190	}
191
192	if pos+1 > len(ek) || ek[pos] != ZScoreType {
193		err = errZScoreKey
194		return
195	}
196	pos++
197
198	if pos+2 > len(ek) {
199		err = errZScoreKey
200		return
201	}
202	keyLen := int(binary.BigEndian.Uint16(ek[pos:]))
203	pos += 2
204
205	if keyLen+pos > len(ek) {
206		err = errZScoreKey
207		return
208	}
209
210	key = ek[pos : pos+keyLen]
211	pos += keyLen
212
213	if pos+10 > len(ek) {
214		err = errZScoreKey
215		return
216	}
217
218	if (ek[pos] != zsetNScoreSep) && (ek[pos] != zsetPScoreSep) {
219		err = errZScoreKey
220		return
221	}
222	pos++
223
224	score = int64(binary.BigEndian.Uint64(ek[pos:]))
225	pos += 8
226
227	if ek[pos] != zsetStartMemSep {
228		err = errZScoreKey
229		return
230	}
231
232	pos++
233
234	member = ek[pos:]
235	return
236}
237
238func (db *DB) zSetItem(t *batch, key []byte, score int64, member []byte) (int64, error) {
239	if score <= MinScore || score >= MaxScore {
240		return 0, errScoreOverflow
241	}
242
243	var exists int64
244	ek := db.zEncodeSetKey(key, member)
245
246	if v, err := db.bucket.Get(ek); err != nil {
247		return 0, err
248	} else if v != nil {
249		exists = 1
250
251		s, err := Int64(v, err)
252		if err != nil {
253			return 0, err
254		}
255
256		sk := db.zEncodeScoreKey(key, member, s)
257		t.Delete(sk)
258	}
259
260	t.Put(ek, PutInt64(score))
261
262	sk := db.zEncodeScoreKey(key, member, score)
263	t.Put(sk, []byte{})
264
265	return exists, nil
266}
267
268func (db *DB) zDelItem(t *batch, key []byte, member []byte, skipDelScore bool) (int64, error) {
269	ek := db.zEncodeSetKey(key, member)
270	if v, err := db.bucket.Get(ek); err != nil {
271		return 0, err
272	} else if v == nil {
273		//not exists
274		return 0, nil
275	} else {
276		//exists
277		if !skipDelScore {
278			//we must del score
279			s, err := Int64(v, err)
280			if err != nil {
281				return 0, err
282			}
283			sk := db.zEncodeScoreKey(key, member, s)
284			t.Delete(sk)
285		}
286	}
287
288	t.Delete(ek)
289
290	return 1, nil
291}
292
293func (db *DB) zDelete(t *batch, key []byte) int64 {
294	delMembCnt, _ := db.zRemRange(t, key, MinScore, MaxScore, 0, -1)
295	//	todo : log err
296	return delMembCnt
297}
298
299func (db *DB) zExpireAt(key []byte, when int64) (int64, error) {
300	t := db.zsetBatch
301	t.Lock()
302	defer t.Unlock()
303
304	if zcnt, err := db.ZCard(key); err != nil || zcnt == 0 {
305		return 0, err
306	}
307
308	db.expireAt(t, ZSetType, key, when)
309	if err := t.Commit(); err != nil {
310		return 0, err
311	}
312
313	return 1, nil
314}
315
316// ZAdd add the members.
317func (db *DB) ZAdd(key []byte, args ...ScorePair) (int64, error) {
318	if len(args) == 0 {
319		return 0, nil
320	}
321
322	t := db.zsetBatch
323	t.Lock()
324	defer t.Unlock()
325
326	var num int64
327	for i := 0; i < len(args); i++ {
328		score := args[i].Score
329		member := args[i].Member
330
331		if err := checkZSetKMSize(key, member); err != nil {
332			return 0, err
333		}
334
335		if n, err := db.zSetItem(t, key, score, member); err != nil {
336			return 0, err
337		} else if n == 0 {
338			//add new
339			num++
340		}
341	}
342
343	if _, err := db.zIncrSize(t, key, num); err != nil {
344		return 0, err
345	}
346
347	err := t.Commit()
348	return num, err
349}
350
351func (db *DB) zIncrSize(t *batch, key []byte, delta int64) (int64, error) {
352	sk := db.zEncodeSizeKey(key)
353
354	size, err := Int64(db.bucket.Get(sk))
355	if err != nil {
356		return 0, err
357	}
358	size += delta
359	if size <= 0 {
360		size = 0
361		t.Delete(sk)
362		db.rmExpire(t, ZSetType, key)
363	} else {
364		t.Put(sk, PutInt64(size))
365	}
366
367	return size, nil
368}
369
370// ZCard gets the size of the zset.
371func (db *DB) ZCard(key []byte) (int64, error) {
372	if err := checkKeySize(key); err != nil {
373		return 0, err
374	}
375
376	sk := db.zEncodeSizeKey(key)
377	return Int64(db.bucket.Get(sk))
378}
379
380// ZScore gets the score of member.
381func (db *DB) ZScore(key []byte, member []byte) (int64, error) {
382	if err := checkZSetKMSize(key, member); err != nil {
383		return InvalidScore, err
384	}
385
386	score := InvalidScore
387
388	k := db.zEncodeSetKey(key, member)
389	if v, err := db.bucket.Get(k); err != nil {
390		return InvalidScore, err
391	} else if v == nil {
392		return InvalidScore, ErrScoreMiss
393	} else {
394		if score, err = Int64(v, nil); err != nil {
395			return InvalidScore, err
396		}
397	}
398
399	return score, nil
400}
401
402// ZRem removes members
403func (db *DB) ZRem(key []byte, members ...[]byte) (int64, error) {
404	if len(members) == 0 {
405		return 0, nil
406	}
407
408	t := db.zsetBatch
409	t.Lock()
410	defer t.Unlock()
411
412	var num int64
413	for i := 0; i < len(members); i++ {
414		if err := checkZSetKMSize(key, members[i]); err != nil {
415			return 0, err
416		}
417
418		if n, err := db.zDelItem(t, key, members[i], false); err != nil {
419			return 0, err
420		} else if n == 1 {
421			num++
422		}
423	}
424
425	if _, err := db.zIncrSize(t, key, -num); err != nil {
426		return 0, err
427	}
428
429	err := t.Commit()
430	return num, err
431}
432
433// ZIncrBy increases the score of member with delta.
434func (db *DB) ZIncrBy(key []byte, delta int64, member []byte) (int64, error) {
435	if err := checkZSetKMSize(key, member); err != nil {
436		return InvalidScore, err
437	}
438
439	t := db.zsetBatch
440	t.Lock()
441	defer t.Unlock()
442
443	ek := db.zEncodeSetKey(key, member)
444
445	var oldScore int64
446	v, err := db.bucket.Get(ek)
447	if err != nil {
448		return InvalidScore, err
449	} else if v == nil {
450		db.zIncrSize(t, key, 1)
451	} else {
452		if oldScore, err = Int64(v, err); err != nil {
453			return InvalidScore, err
454		}
455	}
456
457	newScore := oldScore + delta
458	if newScore >= MaxScore || newScore <= MinScore {
459		return InvalidScore, errScoreOverflow
460	}
461
462	sk := db.zEncodeScoreKey(key, member, newScore)
463	t.Put(sk, []byte{})
464	t.Put(ek, PutInt64(newScore))
465
466	if v != nil {
467		// so as to update score, we must delete the old one
468		oldSk := db.zEncodeScoreKey(key, member, oldScore)
469		t.Delete(oldSk)
470	}
471
472	err = t.Commit()
473	return newScore, err
474}
475
476// ZCount gets the number of score in [min, max]
477func (db *DB) ZCount(key []byte, min int64, max int64) (int64, error) {
478	if err := checkKeySize(key); err != nil {
479		return 0, err
480	}
481	minKey := db.zEncodeStartScoreKey(key, min)
482	maxKey := db.zEncodeStopScoreKey(key, max)
483
484	rangeType := store.RangeROpen
485
486	it := db.bucket.RangeLimitIterator(minKey, maxKey, rangeType, 0, -1)
487	var n int64
488	for ; it.Valid(); it.Next() {
489		n++
490	}
491	it.Close()
492
493	return n, nil
494}
495
496func (db *DB) zrank(key []byte, member []byte, reverse bool) (int64, error) {
497	if err := checkZSetKMSize(key, member); err != nil {
498		return 0, err
499	}
500
501	k := db.zEncodeSetKey(key, member)
502
503	it := db.bucket.NewIterator()
504	defer it.Close()
505
506	v := it.Find(k)
507	if v == nil {
508		return -1, nil
509	}
510
511	s, err := Int64(v, nil)
512	if err != nil {
513		return 0, err
514	}
515	var rit *store.RangeLimitIterator
516
517	sk := db.zEncodeScoreKey(key, member, s)
518
519	if !reverse {
520		minKey := db.zEncodeStartScoreKey(key, MinScore)
521
522		rit = store.NewRangeIterator(it, &store.Range{Min: minKey, Max: sk, Type: store.RangeClose})
523	} else {
524		maxKey := db.zEncodeStopScoreKey(key, MaxScore)
525		rit = store.NewRevRangeIterator(it, &store.Range{Min: sk, Max: maxKey, Type: store.RangeClose})
526	}
527
528	var lastKey []byte
529	var n int64
530
531	for ; rit.Valid(); rit.Next() {
532		n++
533
534		lastKey = rit.BufKey(lastKey)
535	}
536
537	if _, m, _, err := db.zDecodeScoreKey(lastKey); err == nil && bytes.Equal(m, member) {
538		n--
539		return n, nil
540	}
541
542	return -1, nil
543}
544
545func (db *DB) zIterator(key []byte, min int64, max int64, offset int, count int, reverse bool) *store.RangeLimitIterator {
546	minKey := db.zEncodeStartScoreKey(key, min)
547	maxKey := db.zEncodeStopScoreKey(key, max)
548
549	if !reverse {
550		return db.bucket.RangeLimitIterator(minKey, maxKey, store.RangeClose, offset, count)
551	}
552	return db.bucket.RevRangeLimitIterator(minKey, maxKey, store.RangeClose, offset, count)
553}
554
555func (db *DB) zRemRange(t *batch, key []byte, min int64, max int64, offset int, count int) (int64, error) {
556	if len(key) > MaxKeySize {
557		return 0, errKeySize
558	}
559
560	it := db.zIterator(key, min, max, offset, count, false)
561	var num int64
562	for ; it.Valid(); it.Next() {
563		sk := it.RawKey()
564		_, m, _, err := db.zDecodeScoreKey(sk)
565		if err != nil {
566			continue
567		}
568
569		if n, err := db.zDelItem(t, key, m, true); err != nil {
570			return 0, err
571		} else if n == 1 {
572			num++
573		}
574
575		t.Delete(sk)
576	}
577	it.Close()
578
579	if _, err := db.zIncrSize(t, key, -num); err != nil {
580		return 0, err
581	}
582
583	return num, nil
584}
585
586func (db *DB) zRange(key []byte, min int64, max int64, offset int, count int, reverse bool) ([]ScorePair, error) {
587	if len(key) > MaxKeySize {
588		return nil, errKeySize
589	}
590
591	if offset < 0 {
592		return []ScorePair{}, nil
593	}
594
595	nv := count
596	// count may be very large, so we must limit it for below mem make.
597	if nv <= 0 || nv > 1024 {
598		nv = 64
599	}
600
601	v := make([]ScorePair, 0, nv)
602
603	var it *store.RangeLimitIterator
604
605	//if reverse and offset is 0, count < 0, we may use forward iterator then reverse
606	//because store iterator prev is slower than next
607	if !reverse || (offset == 0 && count < 0) {
608		it = db.zIterator(key, min, max, offset, count, false)
609	} else {
610		it = db.zIterator(key, min, max, offset, count, true)
611	}
612
613	for ; it.Valid(); it.Next() {
614		_, m, s, err := db.zDecodeScoreKey(it.Key())
615		//may be we will check key equal?
616		if err != nil {
617			continue
618		}
619
620		v = append(v, ScorePair{Member: m, Score: s})
621	}
622	it.Close()
623
624	if reverse && (offset == 0 && count < 0) {
625		for i, j := 0, len(v)-1; i < j; i, j = i+1, j-1 {
626			v[i], v[j] = v[j], v[i]
627		}
628	}
629
630	return v, nil
631}
632
633func (db *DB) zParseLimit(key []byte, start int, stop int) (offset int, count int, err error) {
634	if start < 0 || stop < 0 {
635		//refer redis implementation
636		var size int64
637		size, err = db.ZCard(key)
638		if err != nil {
639			return
640		}
641
642		llen := int(size)
643
644		if start < 0 {
645			start = llen + start
646		}
647		if stop < 0 {
648			stop = llen + stop
649		}
650
651		if start < 0 {
652			start = 0
653		}
654
655		if start >= llen {
656			offset = -1
657			return
658		}
659	}
660
661	if start > stop {
662		offset = -1
663		return
664	}
665
666	offset = start
667	count = (stop - start) + 1
668	return
669}
670
671// ZClear clears the zset.
672func (db *DB) ZClear(key []byte) (int64, error) {
673	t := db.zsetBatch
674	t.Lock()
675	defer t.Unlock()
676
677	rmCnt, err := db.zRemRange(t, key, MinScore, MaxScore, 0, -1)
678	if err == nil {
679		err = t.Commit()
680	}
681
682	return rmCnt, err
683}
684
685// ZMclear clears multi zsets.
686func (db *DB) ZMclear(keys ...[]byte) (int64, error) {
687	t := db.zsetBatch
688	t.Lock()
689	defer t.Unlock()
690
691	for _, key := range keys {
692		if _, err := db.zRemRange(t, key, MinScore, MaxScore, 0, -1); err != nil {
693			return 0, err
694		}
695	}
696
697	err := t.Commit()
698
699	return int64(len(keys)), err
700}
701
702// ZRange gets the members from start to stop.
703func (db *DB) ZRange(key []byte, start int, stop int) ([]ScorePair, error) {
704	return db.ZRangeGeneric(key, start, stop, false)
705}
706
707// ZRangeByScore gets the data with score in min and max.
708// min and max must be inclusive
709// if no limit, set offset = 0 and count = -1
710func (db *DB) ZRangeByScore(key []byte, min int64, max int64,
711	offset int, count int) ([]ScorePair, error) {
712	return db.ZRangeByScoreGeneric(key, min, max, offset, count, false)
713}
714
715// ZRank gets the rank of member.
716func (db *DB) ZRank(key []byte, member []byte) (int64, error) {
717	return db.zrank(key, member, false)
718}
719
720// ZRemRangeByRank removes the member at range from start to stop.
721func (db *DB) ZRemRangeByRank(key []byte, start int, stop int) (int64, error) {
722	offset, count, err := db.zParseLimit(key, start, stop)
723	if err != nil {
724		return 0, err
725	}
726
727	var rmCnt int64
728
729	t := db.zsetBatch
730	t.Lock()
731	defer t.Unlock()
732
733	rmCnt, err = db.zRemRange(t, key, MinScore, MaxScore, offset, count)
734	if err == nil {
735		err = t.Commit()
736	}
737
738	return rmCnt, err
739}
740
741// ZRemRangeByScore removes the data with score at [min, max]
742func (db *DB) ZRemRangeByScore(key []byte, min int64, max int64) (int64, error) {
743	t := db.zsetBatch
744	t.Lock()
745	defer t.Unlock()
746
747	rmCnt, err := db.zRemRange(t, key, min, max, 0, -1)
748	if err == nil {
749		err = t.Commit()
750	}
751
752	return rmCnt, err
753}
754
755// ZRevRange gets the data reversed.
756func (db *DB) ZRevRange(key []byte, start int, stop int) ([]ScorePair, error) {
757	return db.ZRangeGeneric(key, start, stop, true)
758}
759
760// ZRevRank gets the rank of member reversed.
761func (db *DB) ZRevRank(key []byte, member []byte) (int64, error) {
762	return db.zrank(key, member, true)
763}
764
765// ZRevRangeByScore gets the data with score at [min, max]
766// min and max must be inclusive
767// if no limit, set offset = 0 and count = -1
768func (db *DB) ZRevRangeByScore(key []byte, min int64, max int64, offset int, count int) ([]ScorePair, error) {
769	return db.ZRangeByScoreGeneric(key, min, max, offset, count, true)
770}
771
772// ZRangeGeneric is a generic function for scan zset.
773func (db *DB) ZRangeGeneric(key []byte, start int, stop int, reverse bool) ([]ScorePair, error) {
774	offset, count, err := db.zParseLimit(key, start, stop)
775	if err != nil {
776		return nil, err
777	}
778
779	return db.zRange(key, MinScore, MaxScore, offset, count, reverse)
780}
781
782// ZRangeByScoreGeneric is a generic function to scan zset with score.
783// min and max must be inclusive
784// if no limit, set offset = 0 and count = -1
785func (db *DB) ZRangeByScoreGeneric(key []byte, min int64, max int64,
786	offset int, count int, reverse bool) ([]ScorePair, error) {
787
788	return db.zRange(key, min, max, offset, count, reverse)
789}
790
791func (db *DB) zFlush() (drop int64, err error) {
792	t := db.zsetBatch
793	t.Lock()
794	defer t.Unlock()
795	return db.flushType(t, ZSetType)
796}
797
798// ZExpire expires the zset.
799func (db *DB) ZExpire(key []byte, duration int64) (int64, error) {
800	if duration <= 0 {
801		return 0, errExpireValue
802	}
803
804	return db.zExpireAt(key, time.Now().Unix()+duration)
805}
806
807// ZExpireAt expires the zset at when.
808func (db *DB) ZExpireAt(key []byte, when int64) (int64, error) {
809	if when <= time.Now().Unix() {
810		return 0, errExpireValue
811	}
812
813	return db.zExpireAt(key, when)
814}
815
816// ZTTL gets the TTL of zset.
817func (db *DB) ZTTL(key []byte) (int64, error) {
818	if err := checkKeySize(key); err != nil {
819		return -1, err
820	}
821
822	return db.ttl(ZSetType, key)
823}
824
825// ZPersist removes the TTL of zset.
826func (db *DB) ZPersist(key []byte) (int64, error) {
827	if err := checkKeySize(key); err != nil {
828		return 0, err
829	}
830
831	t := db.zsetBatch
832	t.Lock()
833	defer t.Unlock()
834
835	n, err := db.rmExpire(t, ZSetType, key)
836	if err != nil {
837		return 0, err
838	}
839
840	err = t.Commit()
841	return n, err
842}
843
844func getAggregateFunc(aggregate byte) func(int64, int64) int64 {
845	switch aggregate {
846	case AggregateSum:
847		return func(a int64, b int64) int64 {
848			return a + b
849		}
850	case AggregateMax:
851		return func(a int64, b int64) int64 {
852			if a > b {
853				return a
854			}
855			return b
856		}
857	case AggregateMin:
858		return func(a int64, b int64) int64 {
859			if a > b {
860				return b
861			}
862			return a
863		}
864	}
865	return nil
866}
867
868// ZUnionStore unions the zsets and stores to dest zset.
869func (db *DB) ZUnionStore(destKey []byte, srcKeys [][]byte, weights []int64, aggregate byte) (int64, error) {
870
871	var destMap = map[string]int64{}
872	aggregateFunc := getAggregateFunc(aggregate)
873	if aggregateFunc == nil {
874		return 0, errInvalidAggregate
875	}
876	if len(srcKeys) < 1 {
877		return 0, errInvalidSrcKeyNum
878	}
879	if weights != nil {
880		if len(srcKeys) != len(weights) {
881			return 0, errInvalidWeightNum
882		}
883	} else {
884		weights = make([]int64, len(srcKeys))
885		for i := 0; i < len(weights); i++ {
886			weights[i] = 1
887		}
888	}
889
890	for i, key := range srcKeys {
891		scorePairs, err := db.ZRange(key, 0, -1)
892		if err != nil {
893			return 0, err
894		}
895		for _, pair := range scorePairs {
896			if score, ok := destMap[hack.String(pair.Member)]; !ok {
897				destMap[hack.String(pair.Member)] = pair.Score * weights[i]
898			} else {
899				destMap[hack.String(pair.Member)] = aggregateFunc(score, pair.Score*weights[i])
900			}
901		}
902	}
903
904	t := db.zsetBatch
905	t.Lock()
906	defer t.Unlock()
907
908	db.zDelete(t, destKey)
909
910	for member, score := range destMap {
911		if err := checkZSetKMSize(destKey, []byte(member)); err != nil {
912			return 0, err
913		}
914
915		if _, err := db.zSetItem(t, destKey, score, []byte(member)); err != nil {
916			return 0, err
917		}
918	}
919
920	var n = int64(len(destMap))
921	sk := db.zEncodeSizeKey(destKey)
922	t.Put(sk, PutInt64(n))
923
924	if err := t.Commit(); err != nil {
925		return 0, err
926	}
927	return n, nil
928}
929
930// ZInterStore intersects the zsets and stores to dest zset.
931func (db *DB) ZInterStore(destKey []byte, srcKeys [][]byte, weights []int64, aggregate byte) (int64, error) {
932
933	aggregateFunc := getAggregateFunc(aggregate)
934	if aggregateFunc == nil {
935		return 0, errInvalidAggregate
936	}
937	if len(srcKeys) < 1 {
938		return 0, errInvalidSrcKeyNum
939	}
940	if weights != nil {
941		if len(srcKeys) != len(weights) {
942			return 0, errInvalidWeightNum
943		}
944	} else {
945		weights = make([]int64, len(srcKeys))
946		for i := 0; i < len(weights); i++ {
947			weights[i] = 1
948		}
949	}
950
951	var destMap = map[string]int64{}
952	scorePairs, err := db.ZRange(srcKeys[0], 0, -1)
953	if err != nil {
954		return 0, err
955	}
956	for _, pair := range scorePairs {
957		destMap[hack.String(pair.Member)] = pair.Score * weights[0]
958	}
959
960	for i, key := range srcKeys[1:] {
961		scorePairs, err := db.ZRange(key, 0, -1)
962		if err != nil {
963			return 0, err
964		}
965		tmpMap := map[string]int64{}
966		for _, pair := range scorePairs {
967			if score, ok := destMap[hack.String(pair.Member)]; ok {
968				tmpMap[hack.String(pair.Member)] = aggregateFunc(score, pair.Score*weights[i+1])
969			}
970		}
971		destMap = tmpMap
972	}
973
974	t := db.zsetBatch
975	t.Lock()
976	defer t.Unlock()
977
978	db.zDelete(t, destKey)
979
980	for member, score := range destMap {
981		if err := checkZSetKMSize(destKey, []byte(member)); err != nil {
982			return 0, err
983		}
984		if _, err := db.zSetItem(t, destKey, score, []byte(member)); err != nil {
985			return 0, err
986		}
987	}
988
989	n := int64(len(destMap))
990	sk := db.zEncodeSizeKey(destKey)
991	t.Put(sk, PutInt64(n))
992
993	if err := t.Commit(); err != nil {
994		return 0, err
995	}
996	return n, nil
997}
998
999// ZRangeByLex scans the zset lexicographically
1000func (db *DB) ZRangeByLex(key []byte, min []byte, max []byte, rangeType uint8, offset int, count int) ([][]byte, error) {
1001	if min == nil {
1002		min = db.zEncodeStartSetKey(key)
1003	} else {
1004		min = db.zEncodeSetKey(key, min)
1005	}
1006	if max == nil {
1007		max = db.zEncodeStopSetKey(key)
1008	} else {
1009		max = db.zEncodeSetKey(key, max)
1010	}
1011
1012	it := db.bucket.RangeLimitIterator(min, max, rangeType, offset, count)
1013	defer it.Close()
1014
1015	ay := make([][]byte, 0, 16)
1016	for ; it.Valid(); it.Next() {
1017		if _, m, err := db.zDecodeSetKey(it.Key()); err == nil {
1018			ay = append(ay, m)
1019		}
1020	}
1021
1022	return ay, nil
1023}
1024
1025// ZRemRangeByLex remvoes members in [min, max] lexicographically
1026func (db *DB) ZRemRangeByLex(key []byte, min []byte, max []byte, rangeType uint8) (int64, error) {
1027	if min == nil {
1028		min = db.zEncodeStartSetKey(key)
1029	} else {
1030		min = db.zEncodeSetKey(key, min)
1031	}
1032	if max == nil {
1033		max = db.zEncodeStopSetKey(key)
1034	} else {
1035		max = db.zEncodeSetKey(key, max)
1036	}
1037
1038	t := db.zsetBatch
1039	t.Lock()
1040	defer t.Unlock()
1041
1042	it := db.bucket.RangeIterator(min, max, rangeType)
1043	defer it.Close()
1044
1045	var n int64
1046	for ; it.Valid(); it.Next() {
1047		t.Delete(it.RawKey())
1048		n++
1049	}
1050
1051	if err := t.Commit(); err != nil {
1052		return 0, err
1053	}
1054
1055	return n, nil
1056}
1057
1058// ZLexCount gets the count of zset lexicographically.
1059func (db *DB) ZLexCount(key []byte, min []byte, max []byte, rangeType uint8) (int64, error) {
1060	if min == nil {
1061		min = db.zEncodeStartSetKey(key)
1062	} else {
1063		min = db.zEncodeSetKey(key, min)
1064	}
1065	if max == nil {
1066		max = db.zEncodeStopSetKey(key)
1067	} else {
1068		max = db.zEncodeSetKey(key, max)
1069	}
1070
1071	it := db.bucket.RangeIterator(min, max, rangeType)
1072	defer it.Close()
1073
1074	var n int64
1075	for ; it.Valid(); it.Next() {
1076		n++
1077	}
1078
1079	return n, nil
1080}
1081
1082// ZKeyExists checks zset existed or not.
1083func (db *DB) ZKeyExists(key []byte) (int64, error) {
1084	if err := checkKeySize(key); err != nil {
1085		return 0, err
1086	}
1087	sk := db.zEncodeSizeKey(key)
1088	v, err := db.bucket.Get(sk)
1089	if v != nil && err == nil {
1090		return 1, nil
1091	}
1092	return 0, err
1093}
1094