1// Copyright 2017 The Cayley Authors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package leveldb
16
17import (
18	"context"
19	"fmt"
20	"os"
21
22	"github.com/syndtr/goleveldb/leveldb"
23	"github.com/syndtr/goleveldb/leveldb/iterator"
24	"github.com/syndtr/goleveldb/leveldb/opt"
25	"github.com/syndtr/goleveldb/leveldb/util"
26
27	"github.com/cayleygraph/cayley/graph"
28	"github.com/cayleygraph/cayley/graph/kv"
29)
30
31func init() {
32	kv.Register(Type, kv.Registration{
33		NewFunc:      Open,
34		InitFunc:     Create,
35		IsPersistent: true,
36	})
37}
38
39const (
40	Type = "leveldb"
41)
42
43func newDB(d *leveldb.DB, m graph.Options) *DB {
44	db := &DB{
45		DB: d,
46		wo: &opt.WriteOptions{},
47	}
48	nosync, _ := m.BoolKey("nosync", false)
49	db.wo.Sync = !nosync
50	return db
51}
52
53func Create(path string, m graph.Options) (kv.BucketKV, error) {
54	err := os.MkdirAll(path, 0700)
55	if err != nil {
56		return nil, err
57	}
58	db, err := leveldb.OpenFile(path, &opt.Options{
59		ErrorIfExist: true,
60	})
61	if os.IsExist(err) {
62		return nil, graph.ErrDatabaseExists
63	} else if err != nil {
64		return nil, err
65	}
66	return kv.FromFlat(newDB(db, m)), nil
67}
68
69func Open(path string, m graph.Options) (kv.BucketKV, error) {
70	db, err := leveldb.OpenFile(path, &opt.Options{
71		ErrorIfMissing: true,
72	})
73	if err != nil {
74		return nil, err
75	}
76	return kv.FromFlat(newDB(db, m)), nil
77}
78
79type DB struct {
80	DB *leveldb.DB
81	wo *opt.WriteOptions
82	ro *opt.ReadOptions
83}
84
85func (db *DB) Type() string {
86	return Type
87}
88func (db *DB) Close() error {
89	return db.DB.Close()
90}
91func (db *DB) Tx(update bool) (kv.FlatTx, error) {
92	tx := &Tx{db: db}
93	var err error
94	if update {
95		tx.tx, err = db.DB.OpenTransaction()
96	} else {
97		tx.sn, err = db.DB.GetSnapshot()
98	}
99	if err != nil {
100		return nil, err
101	}
102	return tx, nil
103}
104
105type Tx struct {
106	db  *DB
107	sn  *leveldb.Snapshot
108	tx  *leveldb.Transaction
109	err error
110}
111
112func (tx *Tx) Commit(ctx context.Context) error {
113	if tx.err != nil {
114		return tx.err
115	}
116	if tx.tx != nil {
117		tx.err = tx.tx.Commit()
118		return tx.err
119	}
120	tx.sn.Release()
121	return tx.err
122}
123func (tx *Tx) Rollback() error {
124	if tx.tx != nil {
125		tx.tx.Discard()
126	} else {
127		tx.sn.Release()
128	}
129	return tx.err
130}
131func (tx *Tx) Get(ctx context.Context, keys [][]byte) ([][]byte, error) {
132	vals := make([][]byte, len(keys))
133	var err error
134	var get func(k []byte, ro *opt.ReadOptions) ([]byte, error)
135	if tx.tx != nil {
136		get = tx.tx.Get
137	} else {
138		get = tx.sn.Get
139	}
140	for i, k := range keys {
141		vals[i], err = get(k, tx.db.ro)
142		if err == leveldb.ErrNotFound {
143			vals[i] = nil
144		} else if err != nil {
145			return nil, err
146		}
147	}
148	return vals, nil
149}
150func (tx *Tx) Put(k, v []byte) error {
151	if tx.tx == nil {
152		return fmt.Errorf("put on ro tx")
153	}
154	return tx.tx.Put(k, v, tx.db.wo)
155}
156func (tx *Tx) Del(k []byte) error {
157	if tx.tx == nil {
158		return fmt.Errorf("del on ro tx")
159	}
160	return tx.tx.Delete(k, tx.db.wo)
161}
162func (tx *Tx) Scan(pref []byte) kv.KVIterator {
163	r, ro := util.BytesPrefix(pref), tx.db.ro
164	var it iterator.Iterator
165	if tx.tx != nil {
166		it = tx.tx.NewIterator(r, ro)
167	} else {
168		it = tx.sn.NewIterator(r, ro)
169	}
170	return &Iterator{it: it, first: true}
171}
172
173type Iterator struct {
174	it    iterator.Iterator
175	first bool
176}
177
178func (it *Iterator) Next(ctx context.Context) bool {
179	if it.first {
180		it.first = false
181		return it.it.First()
182	}
183	return it.it.Next()
184}
185func (it *Iterator) Key() []byte { return it.it.Key() }
186func (it *Iterator) Val() []byte { return it.it.Value() }
187func (it *Iterator) Err() error {
188	return it.it.Error()
189}
190func (it *Iterator) Close() error {
191	it.it.Release()
192	return it.Err()
193}
194