1// Copyright 2017 The Cayley Authors. All rights reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package kv 16 17import ( 18 "context" 19 "encoding/binary" 20 "errors" 21 "fmt" 22 "sync" 23 24 "github.com/cayleygraph/cayley/clog" 25 "github.com/cayleygraph/cayley/graph" 26 "github.com/cayleygraph/cayley/graph/proto" 27 "github.com/cayleygraph/cayley/internal/lru" 28 "github.com/cayleygraph/cayley/quad" 29 "github.com/cayleygraph/cayley/quad/pquads" 30 boom "github.com/tylertreat/BoomFilters" 31) 32 33type Registration struct { 34 NewFunc NewFunc 35 InitFunc InitFunc 36 IsPersistent bool 37} 38 39type InitFunc func(string, graph.Options) (BucketKV, error) 40type NewFunc func(string, graph.Options) (BucketKV, error) 41 42func Register(name string, r Registration) { 43 graph.RegisterQuadStore(name, graph.QuadStoreRegistration{ 44 InitFunc: func(addr string, opt graph.Options) error { 45 if !r.IsPersistent { 46 return nil 47 } 48 kv, err := r.InitFunc(addr, opt) 49 if err != nil { 50 return err 51 } 52 defer kv.Close() 53 if err = Init(kv, opt); err != nil { 54 return err 55 } 56 return kv.Close() 57 }, 58 NewFunc: func(addr string, opt graph.Options) (graph.QuadStore, error) { 59 kv, err := r.NewFunc(addr, opt) 60 if err != nil { 61 return nil, err 62 } 63 if !r.IsPersistent { 64 if err = Init(kv, opt); err != nil { 65 kv.Close() 66 return nil, err 67 } 68 } 69 return New(kv, opt) 70 }, 71 IsPersistent: r.IsPersistent, 72 }) 73} 74 75const ( 76 latestDataVersion = 2 77 nilDataVersion = 1 78) 79 80var _ graph.BatchQuadStore = (*QuadStore)(nil) 81 82type QuadStore struct { 83 db BucketKV 84 85 indexes struct { 86 sync.RWMutex 87 all []QuadIndex 88 // indexes used to detect duplicate quads 89 exists []QuadIndex 90 } 91 92 valueLRU *lru.Cache 93 94 writer sync.Mutex 95 mapBucket map[string]map[string][]uint64 96 97 exists struct { 98 sync.Mutex 99 buf []byte 100 *boom.DeletableBloomFilter 101 } 102} 103 104func newQuadStore(kv BucketKV) *QuadStore { 105 qs := &QuadStore{db: kv} 106 qs.indexes.all = DefaultQuadIndexes 107 return qs 108} 109 110func Init(kv BucketKV, opt graph.Options) error { 111 ctx := context.TODO() 112 qs := newQuadStore(kv) 113 if _, err := qs.getMetadata(ctx); err == nil { 114 return graph.ErrDatabaseExists 115 } else if err != ErrNoBucket { 116 return err 117 } 118 upfront, err := opt.BoolKey("upfront", false) 119 if err != nil { 120 return err 121 } 122 if err := qs.createBuckets(ctx, upfront); err != nil { 123 return err 124 } 125 if err := setVersion(ctx, qs.db, latestDataVersion); err != nil { 126 return err 127 } 128 return nil 129} 130 131func New(kv BucketKV, _ graph.Options) (graph.QuadStore, error) { 132 ctx := context.TODO() 133 qs := newQuadStore(kv) 134 if vers, err := qs.getMetadata(ctx); err == ErrNoBucket { 135 return nil, graph.ErrNotInitialized 136 } else if err != nil { 137 return nil, err 138 } else if vers != latestDataVersion { 139 return nil, errors.New("kv: data version is out of date. Run cayleyupgrade for your config to update the data.") 140 } 141 qs.valueLRU = lru.New(2000) 142 if err := qs.initBloomFilter(ctx); err != nil { 143 return nil, err 144 } 145 return qs, nil 146} 147 148func setVersion(ctx context.Context, kv BucketKV, version int64) error { 149 return Update(ctx, kv, func(tx BucketTx) error { 150 var buf [8]byte 151 binary.LittleEndian.PutUint64(buf[:], uint64(version)) 152 b := tx.Bucket(metaBucket) 153 if err := b.Put([]byte("version"), buf[:]); err != nil { 154 return fmt.Errorf("couldn't write version: %v", err) 155 } 156 return nil 157 }) 158} 159 160func (qs *QuadStore) getMetaInt(ctx context.Context, key string) (int64, error) { 161 var v int64 162 err := View(qs.db, func(tx BucketTx) error { 163 b := tx.Bucket(metaBucket) 164 var err error 165 vals, err := b.Get(ctx, [][]byte{ 166 []byte(key), 167 }) 168 if err != nil { 169 return err 170 } else if vals[0] == nil { 171 return ErrNoBucket 172 } 173 v, err = asInt64(vals[0], 0) 174 if err != nil { 175 return err 176 } 177 return nil 178 }) 179 return v, err 180} 181 182func (qs *QuadStore) Size() int64 { 183 sz, _ := qs.getMetaInt(context.TODO(), "size") 184 return sz 185} 186 187func (qs *QuadStore) Close() error { 188 return qs.db.Close() 189} 190 191func (qs *QuadStore) getMetadata(ctx context.Context) (int64, error) { 192 var vers int64 193 err := View(qs.db, func(tx BucketTx) error { 194 b := tx.Bucket(metaBucket) 195 var err error 196 vals, err := b.Get(ctx, [][]byte{ 197 []byte("version"), 198 }) 199 if err == ErrNotFound { 200 return ErrNoBucket 201 } else if err != nil { 202 return err 203 } else if vals[0] == nil { 204 return ErrNoBucket 205 } 206 vers, err = asInt64(vals[0], nilDataVersion) 207 if err != nil { 208 return err 209 } 210 return nil 211 }) 212 return vers, err 213} 214 215func asInt64(b []byte, empty int64) (int64, error) { 216 if len(b) == 0 { 217 return empty, nil 218 } else if len(b) != 8 { 219 return 0, fmt.Errorf("unexpected int size: %d", len(b)) 220 } 221 v := int64(binary.LittleEndian.Uint64(b)) 222 return v, nil 223} 224 225func (qs *QuadStore) horizon(ctx context.Context) int64 { 226 h, _ := qs.getMetaInt(ctx, "horizon") 227 return h 228} 229 230func (qs *QuadStore) ValuesOf(ctx context.Context, vals []graph.Value) ([]quad.Value, error) { 231 out := make([]quad.Value, len(vals)) 232 var ( 233 inds []int 234 refs []uint64 235 ) 236 for i, v := range vals { 237 if v == nil { 238 continue 239 } else if pv, ok := v.(graph.PreFetchedValue); ok { 240 out[i] = pv.NameOf() 241 continue 242 } 243 switch v := v.(type) { 244 case Int64Value: 245 if v == 0 { 246 continue 247 } 248 inds = append(inds, i) 249 refs = append(refs, uint64(v)) 250 default: 251 return out, fmt.Errorf("unknown type of graph.Value; not meant for this quadstore. apparently a %#v", v) 252 } 253 } 254 if len(refs) == 0 { 255 return out, nil 256 } 257 prim, err := qs.getPrimitives(ctx, refs) 258 if err != nil { 259 return out, err 260 } 261 var last error 262 for i, p := range prim { 263 if !p.IsNode() { 264 continue 265 } 266 qv, err := pquads.UnmarshalValue(p.Value) 267 if err != nil { 268 last = err 269 continue 270 } 271 out[inds[i]] = qv 272 } 273 return out, last 274} 275 276func (qs *QuadStore) RefsOf(ctx context.Context, nodes []quad.Value) ([]graph.Value, error) { 277 values := make([]graph.Value, len(nodes)) 278 err := View(qs.db, func(tx BucketTx) error { 279 for i, node := range nodes { 280 value, err := qs.resolveQuadValue(ctx, tx, node) 281 if err != nil { 282 return err 283 } 284 values[i] = Int64Value(value) 285 } 286 return nil 287 }) 288 if err != nil { 289 return nil, err 290 } 291 return values, nil 292} 293 294func (qs *QuadStore) NameOf(v graph.Value) quad.Value { 295 ctx := context.TODO() 296 vals, err := qs.ValuesOf(ctx, []graph.Value{v}) 297 if err != nil { 298 clog.Errorf("error getting NameOf %d: %s", v, err) 299 return nil 300 } 301 return vals[0] 302} 303 304func (qs *QuadStore) Quad(k graph.Value) quad.Quad { 305 key, ok := k.(*proto.Primitive) 306 if !ok { 307 clog.Errorf("passed value was not a quad primitive: %T", k) 308 return quad.Quad{} 309 } 310 ctx := context.TODO() 311 var v quad.Quad 312 err := View(qs.db, func(tx BucketTx) error { 313 var err error 314 v, err = qs.primitiveToQuad(ctx, tx, key) 315 return err 316 }) 317 if err != nil { 318 if err != ErrNotFound { 319 clog.Errorf("error fetching quad %#v: %s", key, err) 320 } 321 return quad.Quad{} 322 } 323 return v 324} 325 326func (qs *QuadStore) primitiveToQuad(ctx context.Context, tx BucketTx, p *proto.Primitive) (quad.Quad, error) { 327 q := &quad.Quad{} 328 for _, dir := range quad.Directions { 329 v := p.GetDirection(dir) 330 val, err := qs.getValFromLog(ctx, tx, v) 331 if err != nil { 332 return *q, err 333 } 334 q.Set(dir, val) 335 } 336 return *q, nil 337} 338 339func (qs *QuadStore) getValFromLog(ctx context.Context, tx BucketTx, k uint64) (quad.Value, error) { 340 if k == 0 { 341 return nil, nil 342 } 343 p, err := qs.getPrimitiveFromLog(ctx, tx, k) 344 if err != nil { 345 return nil, err 346 } 347 return pquads.UnmarshalValue(p.Value) 348} 349 350func (qs *QuadStore) ValueOf(s quad.Value) graph.Value { 351 ctx := context.TODO() 352 var out Int64Value 353 _ = View(qs.db, func(tx BucketTx) error { 354 v, err := qs.resolveQuadValue(ctx, tx, s) 355 out = Int64Value(v) 356 return err 357 }) 358 if out == 0 { 359 return nil 360 } 361 return out 362} 363 364func (qs *QuadStore) QuadDirection(val graph.Value, d quad.Direction) graph.Value { 365 p, ok := val.(*proto.Primitive) 366 if !ok { 367 return nil 368 } 369 switch d { 370 case quad.Subject: 371 return Int64Value(p.Subject) 372 case quad.Predicate: 373 return Int64Value(p.Predicate) 374 case quad.Object: 375 return Int64Value(p.Object) 376 case quad.Label: 377 if p.Label == 0 { 378 return nil 379 } 380 return Int64Value(p.Label) 381 } 382 return nil 383} 384 385func (qs *QuadStore) getPrimitives(ctx context.Context, vals []uint64) ([]*proto.Primitive, error) { 386 tx, err := qs.db.Tx(false) 387 if err != nil { 388 return nil, err 389 } 390 defer tx.Rollback() 391 return qs.getPrimitivesFromLog(ctx, tx, vals) 392} 393 394type Int64Value uint64 395 396func (v Int64Value) Key() interface{} { return v } 397