1// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package backend
16
17import (
18	"bytes"
19	"sort"
20)
21
// bucketBufferInitialSize is the number of key/value slots that
// newBucketBuffer allocates up front for every new bucketBuffer.
const (
	bucketBufferInitialSize = 512
)
23
24// txBuffer handles functionality shared between txWriteBuffer and txReadBuffer.
25type txBuffer struct {
26	buckets map[BucketID]*bucketBuffer
27}
28
29func (txb *txBuffer) reset() {
30	for k, v := range txb.buckets {
31		if v.used == 0 {
32			// demote
33			delete(txb.buckets, k)
34		}
35		v.used = 0
36	}
37}
38
39// txWriteBuffer buffers writes of pending updates that have not yet committed.
40type txWriteBuffer struct {
41	txBuffer
42	// Map from bucket ID into information whether this bucket is edited
43	// sequentially (i.e. keys are growing monotonically).
44	bucket2seq map[BucketID]bool
45}
46
47func (txw *txWriteBuffer) put(bucket Bucket, k, v []byte) {
48	txw.bucket2seq[bucket.ID()] = false
49	txw.putInternal(bucket, k, v)
50}
51
52func (txw *txWriteBuffer) putSeq(bucket Bucket, k, v []byte) {
53	// TODO: Add (in tests?) verification whether k>b[len(b)]
54	txw.putInternal(bucket, k, v)
55}
56
57func (txw *txWriteBuffer) putInternal(bucket Bucket, k, v []byte) {
58	b, ok := txw.buckets[bucket.ID()]
59	if !ok {
60		b = newBucketBuffer()
61		txw.buckets[bucket.ID()] = b
62	}
63	b.add(k, v)
64}
65
66func (txw *txWriteBuffer) reset() {
67	txw.txBuffer.reset()
68	for k := range txw.bucket2seq {
69		v, ok := txw.buckets[k]
70		if !ok {
71			delete(txw.bucket2seq, k)
72		} else if v.used == 0 {
73			txw.bucket2seq[k] = true
74		}
75	}
76}
77
78func (txw *txWriteBuffer) writeback(txr *txReadBuffer) {
79	for k, wb := range txw.buckets {
80		rb, ok := txr.buckets[k]
81		if !ok {
82			delete(txw.buckets, k)
83			txr.buckets[k] = wb
84			continue
85		}
86		if seq, ok := txw.bucket2seq[k]; ok && !seq && wb.used > 1 {
87			// assume no duplicate keys
88			sort.Sort(wb)
89		}
90		rb.merge(wb)
91	}
92	txw.reset()
93	// increase the buffer version
94	txr.bufVersion++
95}
96
97// txReadBuffer accesses buffered updates.
98type txReadBuffer struct {
99	txBuffer
100	// bufVersion is used to check if the buffer is modified recently
101	bufVersion uint64
102}
103
104func (txr *txReadBuffer) Range(bucket Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
105	if b := txr.buckets[bucket.ID()]; b != nil {
106		return b.Range(key, endKey, limit)
107	}
108	return nil, nil
109}
110
111func (txr *txReadBuffer) ForEach(bucket Bucket, visitor func(k, v []byte) error) error {
112	if b := txr.buckets[bucket.ID()]; b != nil {
113		return b.ForEach(visitor)
114	}
115	return nil
116}
117
118// unsafeCopy returns a copy of txReadBuffer, caller should acquire backend.readTx.RLock()
119func (txr *txReadBuffer) unsafeCopy() txReadBuffer {
120	txrCopy := txReadBuffer{
121		txBuffer: txBuffer{
122			buckets: make(map[BucketID]*bucketBuffer, len(txr.txBuffer.buckets)),
123		},
124		bufVersion: 0,
125	}
126	for bucketName, bucket := range txr.txBuffer.buckets {
127		txrCopy.txBuffer.buckets[bucketName] = bucket.Copy()
128	}
129	return txrCopy
130}
131
// kv is a single buffered entry: a key and the value written for it.
type kv struct {
	key, val []byte
}
136
137// bucketBuffer buffers key-value pairs that are pending commit.
138type bucketBuffer struct {
139	buf []kv
140	// used tracks number of elements in use so buf can be reused without reallocation.
141	used int
142}
143
144func newBucketBuffer() *bucketBuffer {
145	return &bucketBuffer{buf: make([]kv, bucketBufferInitialSize), used: 0}
146}
147
148func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) {
149	f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 }
150	idx := sort.Search(bb.used, f)
151	if idx < 0 {
152		return nil, nil
153	}
154	if len(endKey) == 0 {
155		if bytes.Equal(key, bb.buf[idx].key) {
156			keys = append(keys, bb.buf[idx].key)
157			vals = append(vals, bb.buf[idx].val)
158		}
159		return keys, vals
160	}
161	if bytes.Compare(endKey, bb.buf[idx].key) <= 0 {
162		return nil, nil
163	}
164	for i := idx; i < bb.used && int64(len(keys)) < limit; i++ {
165		if bytes.Compare(endKey, bb.buf[i].key) <= 0 {
166			break
167		}
168		keys = append(keys, bb.buf[i].key)
169		vals = append(vals, bb.buf[i].val)
170	}
171	return keys, vals
172}
173
174func (bb *bucketBuffer) ForEach(visitor func(k, v []byte) error) error {
175	for i := 0; i < bb.used; i++ {
176		if err := visitor(bb.buf[i].key, bb.buf[i].val); err != nil {
177			return err
178		}
179	}
180	return nil
181}
182
183func (bb *bucketBuffer) add(k, v []byte) {
184	bb.buf[bb.used].key, bb.buf[bb.used].val = k, v
185	bb.used++
186	if bb.used == len(bb.buf) {
187		buf := make([]kv, (3*len(bb.buf))/2)
188		copy(buf, bb.buf)
189		bb.buf = buf
190	}
191}
192
193// merge merges data from bbsrc into bb.
194func (bb *bucketBuffer) merge(bbsrc *bucketBuffer) {
195	for i := 0; i < bbsrc.used; i++ {
196		bb.add(bbsrc.buf[i].key, bbsrc.buf[i].val)
197	}
198	if bb.used == bbsrc.used {
199		return
200	}
201	if bytes.Compare(bb.buf[(bb.used-bbsrc.used)-1].key, bbsrc.buf[0].key) < 0 {
202		return
203	}
204
205	sort.Stable(bb)
206
207	// remove duplicates, using only newest update
208	widx := 0
209	for ridx := 1; ridx < bb.used; ridx++ {
210		if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) {
211			widx++
212		}
213		bb.buf[widx] = bb.buf[ridx]
214	}
215	bb.used = widx + 1
216}
217
218func (bb *bucketBuffer) Len() int { return bb.used }
219func (bb *bucketBuffer) Less(i, j int) bool {
220	return bytes.Compare(bb.buf[i].key, bb.buf[j].key) < 0
221}
222func (bb *bucketBuffer) Swap(i, j int) { bb.buf[i], bb.buf[j] = bb.buf[j], bb.buf[i] }
223
224func (bb *bucketBuffer) Copy() *bucketBuffer {
225	bbCopy := bucketBuffer{
226		buf:  make([]kv, len(bb.buf)),
227		used: bb.used,
228	}
229	copy(bbCopy.buf, bb.buf)
230	return &bbCopy
231}
232