1// Copyright 2017 The Cayley Authors. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package leveldb 16 17import ( 18 "context" 19 "fmt" 20 "os" 21 22 "github.com/syndtr/goleveldb/leveldb" 23 "github.com/syndtr/goleveldb/leveldb/iterator" 24 "github.com/syndtr/goleveldb/leveldb/opt" 25 "github.com/syndtr/goleveldb/leveldb/util" 26 27 "github.com/cayleygraph/cayley/graph" 28 "github.com/cayleygraph/cayley/graph/kv" 29) 30 31func init() { 32 kv.Register(Type, kv.Registration{ 33 NewFunc: Open, 34 InitFunc: Create, 35 IsPersistent: true, 36 }) 37} 38 39const ( 40 Type = "leveldb" 41) 42 43func newDB(d *leveldb.DB, m graph.Options) *DB { 44 db := &DB{ 45 DB: d, 46 wo: &opt.WriteOptions{}, 47 } 48 nosync, _ := m.BoolKey("nosync", false) 49 db.wo.Sync = !nosync 50 return db 51} 52 53func Create(path string, m graph.Options) (kv.BucketKV, error) { 54 err := os.MkdirAll(path, 0700) 55 if err != nil { 56 return nil, err 57 } 58 db, err := leveldb.OpenFile(path, &opt.Options{ 59 ErrorIfExist: true, 60 }) 61 if os.IsExist(err) { 62 return nil, graph.ErrDatabaseExists 63 } else if err != nil { 64 return nil, err 65 } 66 return kv.FromFlat(newDB(db, m)), nil 67} 68 69func Open(path string, m graph.Options) (kv.BucketKV, error) { 70 db, err := leveldb.OpenFile(path, &opt.Options{ 71 ErrorIfMissing: true, 72 }) 73 if err != nil { 74 return nil, err 75 } 76 return kv.FromFlat(newDB(db, m)), nil 77} 78 79type DB struct { 80 DB *leveldb.DB 81 wo *opt.WriteOptions 82 ro *opt.ReadOptions 83} 84 85func (db *DB) Type() string { 86 return Type 87} 88func (db *DB) Close() error { 89 return db.DB.Close() 90} 91func (db *DB) Tx(update bool) (kv.FlatTx, error) { 92 tx := &Tx{db: db} 93 var err error 94 if update { 95 tx.tx, err = db.DB.OpenTransaction() 96 } else { 97 tx.sn, err = db.DB.GetSnapshot() 98 } 99 if err != nil { 100 return nil, err 101 } 102 return tx, nil 103} 104 105type Tx struct { 106 db *DB 107 sn *leveldb.Snapshot 108 tx *leveldb.Transaction 109 err error 110} 111 112func (tx *Tx) Commit(ctx context.Context) error { 113 if tx.err != nil { 114 return tx.err 115 } 116 if tx.tx != nil { 117 tx.err = tx.tx.Commit() 118 return tx.err 119 } 120 tx.sn.Release() 121 return tx.err 122} 123func (tx *Tx) Rollback() error { 124 if tx.tx != nil { 125 tx.tx.Discard() 126 } else { 127 tx.sn.Release() 128 } 129 return tx.err 130} 131func (tx *Tx) Get(ctx context.Context, keys [][]byte) ([][]byte, error) { 132 vals := make([][]byte, len(keys)) 133 var err error 134 var get func(k []byte, ro *opt.ReadOptions) ([]byte, error) 135 if tx.tx != nil { 136 get = tx.tx.Get 137 } else { 138 get = tx.sn.Get 139 } 140 for i, k := range keys { 141 vals[i], err = get(k, tx.db.ro) 142 if err == leveldb.ErrNotFound { 143 vals[i] = nil 144 } else if err != nil { 145 return nil, err 146 } 147 } 148 return vals, nil 149} 150func (tx *Tx) Put(k, v []byte) error { 151 if tx.tx == nil { 152 return fmt.Errorf("put on ro tx") 153 } 154 return tx.tx.Put(k, v, tx.db.wo) 155} 156func (tx *Tx) Del(k []byte) error { 157 if tx.tx == nil { 158 return fmt.Errorf("del on ro tx") 159 } 160 return tx.tx.Delete(k, tx.db.wo) 161} 162func (tx *Tx) Scan(pref []byte) kv.KVIterator { 163 r, ro := util.BytesPrefix(pref), tx.db.ro 164 var it iterator.Iterator 165 if tx.tx != nil { 166 it = tx.tx.NewIterator(r, ro) 167 } else { 168 it = tx.sn.NewIterator(r, ro) 169 } 170 return &Iterator{it: it, first: true} 171} 172 173type Iterator struct { 174 it iterator.Iterator 175 first bool 176} 177 178func (it *Iterator) Next(ctx context.Context) bool { 179 if it.first { 180 it.first = false 181 return it.it.First() 182 } 183 return it.it.Next() 184} 185func (it *Iterator) Key() []byte { return it.it.Key() } 186func (it *Iterator) Val() []byte { return it.it.Value() } 187func (it *Iterator) Err() error { 188 return it.it.Error() 189} 190func (it *Iterator) Close() error { 191 it.it.Release() 192 return it.Err() 193} 194