1package roaring
2
3import (
4	"bytes"
5	"encoding/binary"
6	"fmt"
7	"io"
8	"io/ioutil"
9
10	snappy "github.com/glycerine/go-unsnap-stream"
11	"github.com/tinylib/msgp/msgp"
12)
13
14//go:generate msgp -unexported
15
// container is the interface through which roaringArray manipulates the
// elements of its containers slice. The type switches in this file dispatch
// on three implementations: *arrayContainer, *bitmapContainer and
// *runContainer16.
//
// Naming convention (taken from the per-method comments below): a leading
// "i" marks an operation that is performed in place on the receiver.
type container interface {
	addOffset(uint16) []container

	clone() container
	and(container) container
	andCardinality(container) int
	iand(container) container // i stands for inplace
	andNot(container) container
	iandNot(container) container // i stands for inplace
	getCardinality() int
	// rank returns the number of integers that are
	// smaller or equal to x. rank(infinity) would be getCardinality().
	rank(uint16) int

	iadd(x uint16) bool                   // inplace, returns true if x was new.
	iaddReturnMinimized(uint16) container // may change return type to minimize storage.

	//addRange(start, final int) container  // range is [start,final) (unused)
	iaddRange(start, endx int) container // i stands for inplace, range is [start,endx)

	iremove(x uint16) bool                   // inplace, returns true if x was present.
	iremoveReturnMinimized(uint16) container // may change return type to minimize storage.

	not(start, final int) container        // range is [start,final)
	inot(firstOfRange, endx int) container // i stands for inplace, range is [firstOfRange,endx)
	xor(r container) container
	getShortIterator() shortIterable
	getReverseIterator() shortIterable
	getManyIterator() manyIterable
	contains(i uint16) bool
	maximum() uint16
	minimum() uint16

	// equals is now logical equals; it does not require the
	// same underlying container types, but compares across
	// any of the implementations.
	equals(r container) bool

	fillLeastSignificant16bits(array []uint32, i int, mask uint32)
	or(r container) container
	orCardinality(r container) int
	isFull() bool
	ior(r container) container   // i stands for inplace
	intersects(r container) bool // whether the two containers intersect
	lazyOR(r container) container
	lazyIOR(r container) container
	getSizeInBytes() int
	//removeRange(start, final int) container  // range is [start,final) (unused)
	iremoveRange(start, final int) container // i stands for inplace, range is [start,final)
	selectInt(x uint16) int                  // selectInt returns the xth integer in the container
	// serializedSizeInBytes, readFrom and writeTo are used by
	// roaringArray's own serialization routines for the container payloads.
	serializedSizeInBytes() int
	readFrom(io.Reader) (int, error)
	writeTo(io.Writer) (int, error)

	numberOfRuns() int
	// toEfficientContainer returns the representation used by
	// roaringArray.runOptimize to replace the container.
	toEfficientContainer() container
	String() string
	containerType() contype
}
75
// contype is the type code stored in containerSerz.t to record which
// concrete container implementation a serialized payload belongs to.
type contype uint8

// The codes are assigned by iota and are written into the msgpack stream by
// writeToMsgpack, so reordering them would make previously written data
// decode as the wrong container type.
const (
	// bitmapContype tags a *bitmapContainer payload.
	bitmapContype contype = iota
	// arrayContype tags an *arrayContainer payload.
	arrayContype
	// run16Contype tags a *runContainer16 payload.
	run16Contype
	// run32Contype is neither written nor accepted by the msgpack routines
	// visible in this file.
	run32Contype
)
84
85// careful: range is [firstOfRange,lastOfRange]
86func rangeOfOnes(start, last int) container {
87	if start > MaxUint16 {
88		panic("rangeOfOnes called with start > MaxUint16")
89	}
90	if last > MaxUint16 {
91		panic("rangeOfOnes called with last > MaxUint16")
92	}
93	if start < 0 {
94		panic("rangeOfOnes called with start < 0")
95	}
96	if last < 0 {
97		panic("rangeOfOnes called with last < 0")
98	}
99	return newRunContainer16Range(uint16(start), uint16(last))
100}
101
// roaringArray holds a set of containers addressed by 16-bit keys. keys,
// containers and needCopyOnWrite are parallel slices: entry i of each one
// describes the same container.
type roaringArray struct {
	// keys[i] is the key of containers[i]. Lookups go through binarySearch,
	// which relies on the keys being in ascending order.
	keys []uint16
	// containers is excluded from the generated msgpack code; it is
	// serialized through conserz instead.
	containers []container `msg:"-"` // don't try to serialize directly.
	// needCopyOnWrite[i] reports that containers[i] must be cloned before it
	// is modified (see getWritableContainerAtIndex).
	needCopyOnWrite []bool
	// copyOnWrite makes clone share containers instead of deep-copying them.
	copyOnWrite bool

	// conserz is used at serialization time
	// to serialize containers. Otherwise empty.
	conserz []containerSerz
}
112
// containerSerz facilitates serializing container (tricky to
// serialize because it is an interface) by providing a
// light wrapper with a type identifier.
//
// writeToMsgpack fills one containerSerz per container; readFromMsgpack
// uses t to pick the concrete type that r is unmarshalled into.
type containerSerz struct {
	t contype  `msg:"t"` // type
	r msgp.Raw `msg:"r"` // Raw msgpack of the actual container type
}
120
121func newRoaringArray() *roaringArray {
122	return &roaringArray{}
123}
124
125// runOptimize compresses the element containers to minimize space consumed.
126// Q: how does this interact with copyOnWrite and needCopyOnWrite?
127// A: since we aren't changing the logical content, just the representation,
128//    we don't bother to check the needCopyOnWrite bits. We replace
129//    (possibly all) elements of ra.containers in-place with space
130//    optimized versions.
131func (ra *roaringArray) runOptimize() {
132	for i := range ra.containers {
133		ra.containers[i] = ra.containers[i].toEfficientContainer()
134	}
135}
136
137func (ra *roaringArray) appendContainer(key uint16, value container, mustCopyOnWrite bool) {
138	ra.keys = append(ra.keys, key)
139	ra.containers = append(ra.containers, value)
140	ra.needCopyOnWrite = append(ra.needCopyOnWrite, mustCopyOnWrite)
141}
142
143func (ra *roaringArray) appendWithoutCopy(sa roaringArray, startingindex int) {
144	mustCopyOnWrite := sa.needCopyOnWrite[startingindex]
145	ra.appendContainer(sa.keys[startingindex], sa.containers[startingindex], mustCopyOnWrite)
146}
147
148func (ra *roaringArray) appendCopy(sa roaringArray, startingindex int) {
149	// cow only if the two request it, or if we already have a lightweight copy
150	copyonwrite := (ra.copyOnWrite && sa.copyOnWrite) || sa.needsCopyOnWrite(startingindex)
151	if !copyonwrite {
152		// since there is no copy-on-write, we need to clone the container (this is important)
153		ra.appendContainer(sa.keys[startingindex], sa.containers[startingindex].clone(), copyonwrite)
154	} else {
155		ra.appendContainer(sa.keys[startingindex], sa.containers[startingindex], copyonwrite)
156		if !sa.needsCopyOnWrite(startingindex) {
157			sa.setNeedsCopyOnWrite(startingindex)
158		}
159	}
160}
161
162func (ra *roaringArray) appendWithoutCopyMany(sa roaringArray, startingindex, end int) {
163	for i := startingindex; i < end; i++ {
164		ra.appendWithoutCopy(sa, i)
165	}
166}
167
168func (ra *roaringArray) appendCopyMany(sa roaringArray, startingindex, end int) {
169	for i := startingindex; i < end; i++ {
170		ra.appendCopy(sa, i)
171	}
172}
173
174func (ra *roaringArray) appendCopiesUntil(sa roaringArray, stoppingKey uint16) {
175	// cow only if the two request it, or if we already have a lightweight copy
176	copyonwrite := ra.copyOnWrite && sa.copyOnWrite
177
178	for i := 0; i < sa.size(); i++ {
179		if sa.keys[i] >= stoppingKey {
180			break
181		}
182		thiscopyonewrite := copyonwrite || sa.needsCopyOnWrite(i)
183		if thiscopyonewrite {
184			ra.appendContainer(sa.keys[i], sa.containers[i], thiscopyonewrite)
185			if !sa.needsCopyOnWrite(i) {
186				sa.setNeedsCopyOnWrite(i)
187			}
188
189		} else {
190			// since there is no copy-on-write, we need to clone the container (this is important)
191			ra.appendContainer(sa.keys[i], sa.containers[i].clone(), thiscopyonewrite)
192
193		}
194	}
195}
196
197func (ra *roaringArray) appendCopiesAfter(sa roaringArray, beforeStart uint16) {
198	// cow only if the two request it, or if we already have a lightweight copy
199	copyonwrite := ra.copyOnWrite && sa.copyOnWrite
200
201	startLocation := sa.getIndex(beforeStart)
202	if startLocation >= 0 {
203		startLocation++
204	} else {
205		startLocation = -startLocation - 1
206	}
207
208	for i := startLocation; i < sa.size(); i++ {
209		thiscopyonewrite := copyonwrite || sa.needsCopyOnWrite(i)
210		if thiscopyonewrite {
211			ra.appendContainer(sa.keys[i], sa.containers[i], thiscopyonewrite)
212			if !sa.needsCopyOnWrite(i) {
213				sa.setNeedsCopyOnWrite(i)
214			}
215		} else {
216			// since there is no copy-on-write, we need to clone the container (this is important)
217			ra.appendContainer(sa.keys[i], sa.containers[i].clone(), thiscopyonewrite)
218
219		}
220	}
221}
222
223func (ra *roaringArray) removeIndexRange(begin, end int) {
224	if end <= begin {
225		return
226	}
227
228	r := end - begin
229
230	copy(ra.keys[begin:], ra.keys[end:])
231	copy(ra.containers[begin:], ra.containers[end:])
232	copy(ra.needCopyOnWrite[begin:], ra.needCopyOnWrite[end:])
233
234	ra.resize(len(ra.keys) - r)
235}
236
237func (ra *roaringArray) resize(newsize int) {
238	for k := newsize; k < len(ra.containers); k++ {
239		ra.containers[k] = nil
240	}
241
242	ra.keys = ra.keys[:newsize]
243	ra.containers = ra.containers[:newsize]
244	ra.needCopyOnWrite = ra.needCopyOnWrite[:newsize]
245}
246
247func (ra *roaringArray) clear() {
248	ra.resize(0)
249	ra.copyOnWrite = false
250	ra.conserz = nil
251}
252
253func (ra *roaringArray) clone() *roaringArray {
254
255	sa := roaringArray{}
256	sa.copyOnWrite = ra.copyOnWrite
257
258	// this is where copyOnWrite is used.
259	if ra.copyOnWrite {
260		sa.keys = make([]uint16, len(ra.keys))
261		copy(sa.keys, ra.keys)
262		sa.containers = make([]container, len(ra.containers))
263		copy(sa.containers, ra.containers)
264		sa.needCopyOnWrite = make([]bool, len(ra.needCopyOnWrite))
265
266		ra.markAllAsNeedingCopyOnWrite()
267		sa.markAllAsNeedingCopyOnWrite()
268
269		// sa.needCopyOnWrite is shared
270	} else {
271		// make a full copy
272
273		sa.keys = make([]uint16, len(ra.keys))
274		copy(sa.keys, ra.keys)
275
276		sa.containers = make([]container, len(ra.containers))
277		for i := range sa.containers {
278			sa.containers[i] = ra.containers[i].clone()
279		}
280
281		sa.needCopyOnWrite = make([]bool, len(ra.needCopyOnWrite))
282	}
283	return &sa
284}
285
286// unused function:
287//func (ra *roaringArray) containsKey(x uint16) bool {
288//	return (ra.binarySearch(0, int64(len(ra.keys)), x) >= 0)
289//}
290
291func (ra *roaringArray) getContainer(x uint16) container {
292	i := ra.binarySearch(0, int64(len(ra.keys)), x)
293	if i < 0 {
294		return nil
295	}
296	return ra.containers[i]
297}
298
299func (ra *roaringArray) getContainerAtIndex(i int) container {
300	return ra.containers[i]
301}
302
303func (ra *roaringArray) getFastContainerAtIndex(i int, needsWriteable bool) container {
304	c := ra.getContainerAtIndex(i)
305	switch t := c.(type) {
306	case *arrayContainer:
307		c = t.toBitmapContainer()
308	case *runContainer16:
309		if !t.isFull() {
310			c = t.toBitmapContainer()
311		}
312	case *bitmapContainer:
313		if needsWriteable && ra.needCopyOnWrite[i] {
314			c = ra.containers[i].clone()
315		}
316	}
317	return c
318}
319
320func (ra *roaringArray) getWritableContainerAtIndex(i int) container {
321	if ra.needCopyOnWrite[i] {
322		ra.containers[i] = ra.containers[i].clone()
323		ra.needCopyOnWrite[i] = false
324	}
325	return ra.containers[i]
326}
327
328func (ra *roaringArray) getIndex(x uint16) int {
329	// before the binary search, we optimize for frequent cases
330	size := len(ra.keys)
331	if (size == 0) || (ra.keys[size-1] == x) {
332		return size - 1
333	}
334	return ra.binarySearch(0, int64(size), x)
335}
336
337func (ra *roaringArray) getKeyAtIndex(i int) uint16 {
338	return ra.keys[i]
339}
340
341func (ra *roaringArray) insertNewKeyValueAt(i int, key uint16, value container) {
342	ra.keys = append(ra.keys, 0)
343	ra.containers = append(ra.containers, nil)
344
345	copy(ra.keys[i+1:], ra.keys[i:])
346	copy(ra.containers[i+1:], ra.containers[i:])
347
348	ra.keys[i] = key
349	ra.containers[i] = value
350
351	ra.needCopyOnWrite = append(ra.needCopyOnWrite, false)
352	copy(ra.needCopyOnWrite[i+1:], ra.needCopyOnWrite[i:])
353	ra.needCopyOnWrite[i] = false
354}
355
356func (ra *roaringArray) remove(key uint16) bool {
357	i := ra.binarySearch(0, int64(len(ra.keys)), key)
358	if i >= 0 { // if a new key
359		ra.removeAtIndex(i)
360		return true
361	}
362	return false
363}
364
365func (ra *roaringArray) removeAtIndex(i int) {
366	copy(ra.keys[i:], ra.keys[i+1:])
367	copy(ra.containers[i:], ra.containers[i+1:])
368
369	copy(ra.needCopyOnWrite[i:], ra.needCopyOnWrite[i+1:])
370
371	ra.resize(len(ra.keys) - 1)
372}
373
374func (ra *roaringArray) setContainerAtIndex(i int, c container) {
375	ra.containers[i] = c
376}
377
378func (ra *roaringArray) replaceKeyAndContainerAtIndex(i int, key uint16, c container, mustCopyOnWrite bool) {
379	ra.keys[i] = key
380	ra.containers[i] = c
381	ra.needCopyOnWrite[i] = mustCopyOnWrite
382}
383
384func (ra *roaringArray) size() int {
385	return len(ra.keys)
386}
387
388func (ra *roaringArray) binarySearch(begin, end int64, ikey uint16) int {
389	low := begin
390	high := end - 1
391	for low+16 <= high {
392		middleIndex := low + (high-low)/2 // avoid overflow
393		middleValue := ra.keys[middleIndex]
394
395		if middleValue < ikey {
396			low = middleIndex + 1
397		} else if middleValue > ikey {
398			high = middleIndex - 1
399		} else {
400			return int(middleIndex)
401		}
402	}
403	for ; low <= high; low++ {
404		val := ra.keys[low]
405		if val >= ikey {
406			if val == ikey {
407				return int(low)
408			}
409			break
410		}
411	}
412	return -int(low + 1)
413}
414
415func (ra *roaringArray) equals(o interface{}) bool {
416	srb, ok := o.(roaringArray)
417	if ok {
418
419		if srb.size() != ra.size() {
420			return false
421		}
422		for i, k := range ra.keys {
423			if k != srb.keys[i] {
424				return false
425			}
426		}
427
428		for i, c := range ra.containers {
429			if !c.equals(srb.containers[i]) {
430				return false
431			}
432		}
433		return true
434	}
435	return false
436}
437
438func (ra *roaringArray) headerSize() uint64 {
439	size := uint64(len(ra.keys))
440	if ra.hasRunCompression() {
441		if size < noOffsetThreshold { // for small bitmaps, we omit the offsets
442			return 4 + (size+7)/8 + 4*size
443		}
444		return 4 + (size+7)/8 + 8*size // - 4 because we pack the size with the cookie
445	}
446	return 4 + 4 + 8*size
447
448}
449
450// should be dirt cheap
451func (ra *roaringArray) serializedSizeInBytes() uint64 {
452	answer := ra.headerSize()
453	for _, c := range ra.containers {
454		answer += uint64(c.serializedSizeInBytes())
455	}
456	return answer
457}
458
459//
460// spec: https://github.com/RoaringBitmap/RoaringFormatSpec
461//
462func (ra *roaringArray) writeTo(w io.Writer) (n int64, err error) {
463	hasRun := ra.hasRunCompression()
464	isRunSizeInBytes := 0
465	cookieSize := 8
466	if hasRun {
467		cookieSize = 4
468		isRunSizeInBytes = (len(ra.keys) + 7) / 8
469	}
470	descriptiveHeaderSize := 4 * len(ra.keys)
471	preambleSize := cookieSize + isRunSizeInBytes + descriptiveHeaderSize
472
473	buf := make([]byte, preambleSize+4*len(ra.keys))
474
475	nw := 0
476
477	if hasRun {
478		binary.LittleEndian.PutUint16(buf[0:], uint16(serialCookie))
479		nw += 2
480		binary.LittleEndian.PutUint16(buf[2:], uint16(len(ra.keys)-1))
481		nw += 2
482
483		// compute isRun bitmap
484		var ir []byte
485
486		isRun := newBitmapContainer()
487		for i, c := range ra.containers {
488			switch c.(type) {
489			case *runContainer16:
490				isRun.iadd(uint16(i))
491			}
492		}
493		// convert to little endian
494		ir = isRun.asLittleEndianByteSlice()[:isRunSizeInBytes]
495		nw += copy(buf[nw:], ir)
496	} else {
497		binary.LittleEndian.PutUint32(buf[0:], uint32(serialCookieNoRunContainer))
498		nw += 4
499		binary.LittleEndian.PutUint32(buf[4:], uint32(len(ra.keys)))
500		nw += 4
501	}
502
503	// descriptive header
504	for i, key := range ra.keys {
505		binary.LittleEndian.PutUint16(buf[nw:], key)
506		nw += 2
507		c := ra.containers[i]
508		binary.LittleEndian.PutUint16(buf[nw:], uint16(c.getCardinality()-1))
509		nw += 2
510	}
511
512	startOffset := int64(preambleSize + 4*len(ra.keys))
513	if !hasRun || (len(ra.keys) >= noOffsetThreshold) {
514		// offset header
515		for _, c := range ra.containers {
516			binary.LittleEndian.PutUint32(buf[nw:], uint32(startOffset))
517			nw += 4
518			switch rc := c.(type) {
519			case *runContainer16:
520				startOffset += 2 + int64(len(rc.iv))*4
521			default:
522				startOffset += int64(getSizeInBytesFromCardinality(c.getCardinality()))
523			}
524		}
525	}
526
527	written, err := w.Write(buf[:nw])
528	if err != nil {
529		return n, err
530	}
531	n += int64(written)
532
533	for _, c := range ra.containers {
534		written, err := c.writeTo(w)
535		if err != nil {
536			return n, err
537		}
538		n += int64(written)
539	}
540	return n, nil
541}
542
543//
544// spec: https://github.com/RoaringBitmap/RoaringFormatSpec
545//
546func (ra *roaringArray) toBytes() ([]byte, error) {
547	var buf bytes.Buffer
548	_, err := ra.writeTo(&buf)
549	return buf.Bytes(), err
550}
551
552func (ra *roaringArray) fromBuffer(buf []byte) (int64, error) {
553	pos := 0
554	if len(buf) < 8 {
555		return 0, fmt.Errorf("buffer too small, expecting at least 8 bytes, was %d", len(buf))
556	}
557
558	cookie := binary.LittleEndian.Uint32(buf)
559	pos += 4
560	var size uint32 // number of containers
561	haveRunContainers := false
562	var isRunBitmap []byte
563
564	// cookie header
565	if cookie&0x0000FFFF == serialCookie {
566		haveRunContainers = true
567		size = uint32(uint16(cookie>>16) + 1) // number of containers
568
569		// create is-run-container bitmap
570		isRunBitmapSize := (int(size) + 7) / 8
571		if pos+isRunBitmapSize > len(buf) {
572			return 0, fmt.Errorf("malformed bitmap, is-run bitmap overruns buffer at %d", pos+isRunBitmapSize)
573		}
574
575		isRunBitmap = buf[pos : pos+isRunBitmapSize]
576		pos += isRunBitmapSize
577	} else if cookie == serialCookieNoRunContainer {
578		size = binary.LittleEndian.Uint32(buf[pos:])
579		pos += 4
580	} else {
581		return 0, fmt.Errorf("error in roaringArray.readFrom: did not find expected serialCookie in header")
582	}
583	if size > (1 << 16) {
584		return 0, fmt.Errorf("It is logically impossible to have more than (1<<16) containers.")
585	}
586	// descriptive header
587	// keycard - is {key, cardinality} tuple slice
588	if pos+2*2*int(size) > len(buf) {
589		return 0, fmt.Errorf("malfomred bitmap, key-cardinality slice overruns buffer at %d", pos+2*2*int(size))
590	}
591	keycard := byteSliceAsUint16Slice(buf[pos : pos+2*2*int(size)])
592	pos += 2 * 2 * int(size)
593
594	if !haveRunContainers || size >= noOffsetThreshold {
595		pos += 4 * int(size)
596	}
597
598	// Allocate slices upfront as number of containers is known
599	if cap(ra.containers) >= int(size) {
600		ra.containers = ra.containers[:size]
601	} else {
602		ra.containers = make([]container, size)
603	}
604	if cap(ra.keys) >= int(size) {
605		ra.keys = ra.keys[:size]
606	} else {
607		ra.keys = make([]uint16, size)
608	}
609	if cap(ra.needCopyOnWrite) >= int(size) {
610		ra.needCopyOnWrite = ra.needCopyOnWrite[:size]
611	} else {
612		ra.needCopyOnWrite = make([]bool, size)
613	}
614
615	for i := uint32(0); i < size; i++ {
616		key := uint16(keycard[2*i])
617		card := int(keycard[2*i+1]) + 1
618		ra.keys[i] = key
619		ra.needCopyOnWrite[i] = true
620
621		if haveRunContainers && isRunBitmap[i/8]&(1<<(i%8)) != 0 {
622			// run container
623			nr := binary.LittleEndian.Uint16(buf[pos:])
624			pos += 2
625			if pos+int(nr)*4 > len(buf) {
626				return 0, fmt.Errorf("malformed bitmap, a run container overruns buffer at %d:%d", pos, pos+int(nr)*4)
627			}
628			nb := runContainer16{
629				iv:   byteSliceAsInterval16Slice(buf[pos : pos+int(nr)*4]),
630				card: int64(card),
631			}
632			pos += int(nr) * 4
633			ra.containers[i] = &nb
634		} else if card > arrayDefaultMaxSize {
635			// bitmap container
636			nb := bitmapContainer{
637				cardinality: card,
638				bitmap:      byteSliceAsUint64Slice(buf[pos : pos+arrayDefaultMaxSize*2]),
639			}
640			pos += arrayDefaultMaxSize * 2
641			ra.containers[i] = &nb
642		} else {
643			// array container
644			nb := arrayContainer{
645				byteSliceAsUint16Slice(buf[pos : pos+card*2]),
646			}
647			pos += card * 2
648			ra.containers[i] = &nb
649		}
650	}
651
652	return int64(pos), nil
653}
654
655func (ra *roaringArray) readFrom(stream io.Reader) (int64, error) {
656	pos := 0
657	var cookie uint32
658	err := binary.Read(stream, binary.LittleEndian, &cookie)
659	if err != nil {
660		return 0, fmt.Errorf("error in roaringArray.readFrom: could not read initial cookie: %s", err)
661	}
662	pos += 4
663	var size uint32
664	haveRunContainers := false
665	var isRun *bitmapContainer
666	if cookie&0x0000FFFF == serialCookie {
667		haveRunContainers = true
668		size = uint32(uint16(cookie>>16) + 1)
669		bytesToRead := (int(size) + 7) / 8
670		numwords := (bytesToRead + 7) / 8
671		by := make([]byte, bytesToRead, numwords*8)
672		nr, err := io.ReadFull(stream, by)
673		if err != nil {
674			return 8 + int64(nr), fmt.Errorf("error in readFrom: could not read the "+
675				"runContainer bit flags of length %v bytes: %v", bytesToRead, err)
676		}
677		pos += bytesToRead
678		by = by[:cap(by)]
679		isRun = newBitmapContainer()
680		for i := 0; i < numwords; i++ {
681			isRun.bitmap[i] = binary.LittleEndian.Uint64(by)
682			by = by[8:]
683		}
684	} else if cookie == serialCookieNoRunContainer {
685		err = binary.Read(stream, binary.LittleEndian, &size)
686		if err != nil {
687			return 0, fmt.Errorf("error in roaringArray.readFrom: when reading size, got: %s", err)
688		}
689		pos += 4
690	} else {
691		return 0, fmt.Errorf("error in roaringArray.readFrom: did not find expected serialCookie in header")
692	}
693	if size > (1 << 16) {
694		return 0, fmt.Errorf("It is logically impossible to have more than (1<<16) containers.")
695	}
696	// descriptive header
697	keycard := make([]uint16, 2*size, 2*size)
698	err = binary.Read(stream, binary.LittleEndian, keycard)
699	if err != nil {
700		return 0, err
701	}
702	pos += 2 * 2 * int(size)
703	// offset header
704	if !haveRunContainers || size >= noOffsetThreshold {
705		io.CopyN(ioutil.Discard, stream, 4*int64(size)) // we never skip ahead so this data can be ignored
706		pos += 4 * int(size)
707	}
708	for i := uint32(0); i < size; i++ {
709		key := int(keycard[2*i])
710		card := int(keycard[2*i+1]) + 1
711		if haveRunContainers && isRun.contains(uint16(i)) {
712			nb := newRunContainer16()
713			nr, err := nb.readFrom(stream)
714			if err != nil {
715				return 0, err
716			}
717			pos += nr
718			ra.appendContainer(uint16(key), nb, false)
719		} else if card > arrayDefaultMaxSize {
720			nb := newBitmapContainer()
721			nr, err := nb.readFrom(stream)
722			if err != nil {
723				return 0, err
724			}
725			nb.cardinality = card
726			pos += nr
727			ra.appendContainer(keycard[2*i], nb, false)
728		} else {
729			nb := newArrayContainerSize(card)
730			nr, err := nb.readFrom(stream)
731			if err != nil {
732				return 0, err
733			}
734			pos += nr
735			ra.appendContainer(keycard[2*i], nb, false)
736		}
737	}
738	return int64(pos), nil
739}
740
741func (ra *roaringArray) hasRunCompression() bool {
742	for _, c := range ra.containers {
743		switch c.(type) {
744		case *runContainer16:
745			return true
746		}
747	}
748	return false
749}
750
751func (ra *roaringArray) writeToMsgpack(stream io.Writer) error {
752
753	ra.conserz = make([]containerSerz, len(ra.containers))
754	for i, v := range ra.containers {
755		switch cn := v.(type) {
756		case *bitmapContainer:
757			bts, err := cn.MarshalMsg(nil)
758			if err != nil {
759				return err
760			}
761			ra.conserz[i].t = bitmapContype
762			ra.conserz[i].r = bts
763		case *arrayContainer:
764			bts, err := cn.MarshalMsg(nil)
765			if err != nil {
766				return err
767			}
768			ra.conserz[i].t = arrayContype
769			ra.conserz[i].r = bts
770		case *runContainer16:
771			bts, err := cn.MarshalMsg(nil)
772			if err != nil {
773				return err
774			}
775			ra.conserz[i].t = run16Contype
776			ra.conserz[i].r = bts
777		default:
778			panic(fmt.Errorf("Unrecognized container implementation: %T", cn))
779		}
780	}
781	w := snappy.NewWriter(stream)
782	err := msgp.Encode(w, ra)
783	ra.conserz = nil
784	return err
785}
786
787func (ra *roaringArray) readFromMsgpack(stream io.Reader) error {
788	r := snappy.NewReader(stream)
789	err := msgp.Decode(r, ra)
790	if err != nil {
791		return err
792	}
793
794	if len(ra.containers) != len(ra.keys) {
795		ra.containers = make([]container, len(ra.keys))
796	}
797
798	for i, v := range ra.conserz {
799		switch v.t {
800		case bitmapContype:
801			c := &bitmapContainer{}
802			_, err = c.UnmarshalMsg(v.r)
803			if err != nil {
804				return err
805			}
806			ra.containers[i] = c
807		case arrayContype:
808			c := &arrayContainer{}
809			_, err = c.UnmarshalMsg(v.r)
810			if err != nil {
811				return err
812			}
813			ra.containers[i] = c
814		case run16Contype:
815			c := &runContainer16{}
816			_, err = c.UnmarshalMsg(v.r)
817			if err != nil {
818				return err
819			}
820			ra.containers[i] = c
821		default:
822			return fmt.Errorf("unrecognized contype serialization code: '%v'", v.t)
823		}
824	}
825	ra.conserz = nil
826	return nil
827}
828
829func (ra *roaringArray) advanceUntil(min uint16, pos int) int {
830	lower := pos + 1
831
832	if lower >= len(ra.keys) || ra.keys[lower] >= min {
833		return lower
834	}
835
836	spansize := 1
837
838	for lower+spansize < len(ra.keys) && ra.keys[lower+spansize] < min {
839		spansize *= 2
840	}
841	var upper int
842	if lower+spansize < len(ra.keys) {
843		upper = lower + spansize
844	} else {
845		upper = len(ra.keys) - 1
846	}
847
848	if ra.keys[upper] == min {
849		return upper
850	}
851
852	if ra.keys[upper] < min {
853		// means
854		// array
855		// has no
856		// item
857		// >= min
858		// pos = array.length;
859		return len(ra.keys)
860	}
861
862	// we know that the next-smallest span was too small
863	lower += (spansize >> 1)
864
865	mid := 0
866	for lower+1 != upper {
867		mid = (lower + upper) >> 1
868		if ra.keys[mid] == min {
869			return mid
870		} else if ra.keys[mid] < min {
871			lower = mid
872		} else {
873			upper = mid
874		}
875	}
876	return upper
877}
878
879func (ra *roaringArray) markAllAsNeedingCopyOnWrite() {
880	for i := range ra.needCopyOnWrite {
881		ra.needCopyOnWrite[i] = true
882	}
883}
884
885func (ra *roaringArray) needsCopyOnWrite(i int) bool {
886	return ra.needCopyOnWrite[i]
887}
888
889func (ra *roaringArray) setNeedsCopyOnWrite(i int) {
890	ra.needCopyOnWrite[i] = true
891}
892