1//  Copyright (c) 2015 Couchbase, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// 		http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
// Package gtreap provides an in-memory implementation of the
// KVStore interfaces using the gtreap balanced-binary treap,
// copy-on-write data structure.
18package gtreap
19
20import (
21	"fmt"
22	"math/rand"
23
24	"github.com/blevesearch/bleve/index/store"
25)
26
27type Writer struct {
28	s *Store
29}
30
31func (w *Writer) NewBatch() store.KVBatch {
32	return store.NewEmulatedBatch(w.s.mo)
33}
34
35func (w *Writer) NewBatchEx(options store.KVBatchOptions) ([]byte, store.KVBatch, error) {
36	return make([]byte, options.TotalBytes), w.NewBatch(), nil
37}
38
39func (w *Writer) ExecuteBatch(batch store.KVBatch) error {
40
41	emulatedBatch, ok := batch.(*store.EmulatedBatch)
42	if !ok {
43		return fmt.Errorf("wrong type of batch")
44	}
45
46	w.s.m.Lock()
47	for k, mergeOps := range emulatedBatch.Merger.Merges {
48		kb := []byte(k)
49		var existingVal []byte
50		existingItem := w.s.t.Get(&Item{k: kb})
51		if existingItem != nil {
52			existingVal = w.s.t.Get(&Item{k: kb}).(*Item).v
53		}
54		mergedVal, fullMergeOk := w.s.mo.FullMerge(kb, existingVal, mergeOps)
55		if !fullMergeOk {
56			return fmt.Errorf("merge operator returned failure")
57		}
58		w.s.t = w.s.t.Upsert(&Item{k: kb, v: mergedVal}, rand.Int())
59	}
60
61	for _, op := range emulatedBatch.Ops {
62		if op.V != nil {
63			w.s.t = w.s.t.Upsert(&Item{k: op.K, v: op.V}, rand.Int())
64		} else {
65			w.s.t = w.s.t.Delete(&Item{k: op.K})
66		}
67	}
68	w.s.m.Unlock()
69
70	return nil
71}
72
73func (w *Writer) Close() error {
74	w.s = nil
75	return nil
76}
77