1// Copyright 2017 The Cayley Authors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package kv
16
17import (
18	"context"
19	"encoding/binary"
20	"errors"
21	"fmt"
22	"sync"
23
24	"github.com/cayleygraph/cayley/clog"
25	"github.com/cayleygraph/cayley/graph"
26	"github.com/cayleygraph/cayley/graph/proto"
27	"github.com/cayleygraph/cayley/internal/lru"
28	"github.com/cayleygraph/cayley/quad"
29	"github.com/cayleygraph/cayley/quad/pquads"
30	boom "github.com/tylertreat/BoomFilters"
31)
32
33type Registration struct {
34	NewFunc      NewFunc
35	InitFunc     InitFunc
36	IsPersistent bool
37}
38
39type InitFunc func(string, graph.Options) (BucketKV, error)
40type NewFunc func(string, graph.Options) (BucketKV, error)
41
42func Register(name string, r Registration) {
43	graph.RegisterQuadStore(name, graph.QuadStoreRegistration{
44		InitFunc: func(addr string, opt graph.Options) error {
45			if !r.IsPersistent {
46				return nil
47			}
48			kv, err := r.InitFunc(addr, opt)
49			if err != nil {
50				return err
51			}
52			defer kv.Close()
53			if err = Init(kv, opt); err != nil {
54				return err
55			}
56			return kv.Close()
57		},
58		NewFunc: func(addr string, opt graph.Options) (graph.QuadStore, error) {
59			kv, err := r.NewFunc(addr, opt)
60			if err != nil {
61				return nil, err
62			}
63			if !r.IsPersistent {
64				if err = Init(kv, opt); err != nil {
65					kv.Close()
66					return nil, err
67				}
68			}
69			return New(kv, opt)
70		},
71		IsPersistent: r.IsPersistent,
72	})
73}
74
// Data format versions stored under the "version" metadata key.
const (
	// nilDataVersion is assumed when the stored version value is empty.
	nilDataVersion = 1
	// latestDataVersion is the version written by Init and required by New.
	latestDataVersion = 2
)
79
// Compile-time check that *QuadStore implements graph.BatchQuadStore.
var _ graph.BatchQuadStore = (*QuadStore)(nil)

// QuadStore is a graph quad store implemented on top of a BucketKV.
// Construct it with New (after Init); newQuadStore alone leaves valueLRU and
// the bloom filter unset.
type QuadStore struct {
	// db is the underlying bucketed key-value store.
	db BucketKV

	// indexes holds the quad index definitions. newQuadStore sets all to
	// DefaultQuadIndexes. NOTE(review): presumably guarded by the embedded
	// RWMutex — confirm against the accessors, which are not in this view.
	indexes struct {
		sync.RWMutex
		all []QuadIndex
		// indexes used to detect duplicate quads
		exists []QuadIndex
	}

	// valueLRU is created by New with a capacity of 2000 entries.
	// NOTE(review): presumably caches resolved values — confirm at its users.
	valueLRU *lru.Cache

	// NOTE(review): writer presumably serializes writers and mapBucket is
	// write-path state; neither is used in this view — confirm.
	writer    sync.Mutex
	mapBucket map[string]map[string][]uint64

	// exists bundles a deletable bloom filter with a byte buffer under one
	// embedded Mutex. New calls initBloomFilter, which presumably populates
	// it — TODO confirm.
	exists struct {
		sync.Mutex
		buf []byte
		*boom.DeletableBloomFilter
	}
}
103
104func newQuadStore(kv BucketKV) *QuadStore {
105	qs := &QuadStore{db: kv}
106	qs.indexes.all = DefaultQuadIndexes
107	return qs
108}
109
110func Init(kv BucketKV, opt graph.Options) error {
111	ctx := context.TODO()
112	qs := newQuadStore(kv)
113	if _, err := qs.getMetadata(ctx); err == nil {
114		return graph.ErrDatabaseExists
115	} else if err != ErrNoBucket {
116		return err
117	}
118	upfront, err := opt.BoolKey("upfront", false)
119	if err != nil {
120		return err
121	}
122	if err := qs.createBuckets(ctx, upfront); err != nil {
123		return err
124	}
125	if err := setVersion(ctx, qs.db, latestDataVersion); err != nil {
126		return err
127	}
128	return nil
129}
130
131func New(kv BucketKV, _ graph.Options) (graph.QuadStore, error) {
132	ctx := context.TODO()
133	qs := newQuadStore(kv)
134	if vers, err := qs.getMetadata(ctx); err == ErrNoBucket {
135		return nil, graph.ErrNotInitialized
136	} else if err != nil {
137		return nil, err
138	} else if vers != latestDataVersion {
139		return nil, errors.New("kv: data version is out of date. Run cayleyupgrade for your config to update the data.")
140	}
141	qs.valueLRU = lru.New(2000)
142	if err := qs.initBloomFilter(ctx); err != nil {
143		return nil, err
144	}
145	return qs, nil
146}
147
148func setVersion(ctx context.Context, kv BucketKV, version int64) error {
149	return Update(ctx, kv, func(tx BucketTx) error {
150		var buf [8]byte
151		binary.LittleEndian.PutUint64(buf[:], uint64(version))
152		b := tx.Bucket(metaBucket)
153		if err := b.Put([]byte("version"), buf[:]); err != nil {
154			return fmt.Errorf("couldn't write version: %v", err)
155		}
156		return nil
157	})
158}
159
160func (qs *QuadStore) getMetaInt(ctx context.Context, key string) (int64, error) {
161	var v int64
162	err := View(qs.db, func(tx BucketTx) error {
163		b := tx.Bucket(metaBucket)
164		var err error
165		vals, err := b.Get(ctx, [][]byte{
166			[]byte(key),
167		})
168		if err != nil {
169			return err
170		} else if vals[0] == nil {
171			return ErrNoBucket
172		}
173		v, err = asInt64(vals[0], 0)
174		if err != nil {
175			return err
176		}
177		return nil
178	})
179	return v, err
180}
181
182func (qs *QuadStore) Size() int64 {
183	sz, _ := qs.getMetaInt(context.TODO(), "size")
184	return sz
185}
186
187func (qs *QuadStore) Close() error {
188	return qs.db.Close()
189}
190
191func (qs *QuadStore) getMetadata(ctx context.Context) (int64, error) {
192	var vers int64
193	err := View(qs.db, func(tx BucketTx) error {
194		b := tx.Bucket(metaBucket)
195		var err error
196		vals, err := b.Get(ctx, [][]byte{
197			[]byte("version"),
198		})
199		if err == ErrNotFound {
200			return ErrNoBucket
201		} else if err != nil {
202			return err
203		} else if vals[0] == nil {
204			return ErrNoBucket
205		}
206		vers, err = asInt64(vals[0], nilDataVersion)
207		if err != nil {
208			return err
209		}
210		return nil
211	})
212	return vers, err
213}
214
// asInt64 decodes b as a little-endian int64. An empty (or nil) b yields
// empty; any length other than 0 or 8 is an error.
func asInt64(b []byte, empty int64) (int64, error) {
	switch n := len(b); n {
	case 0:
		return empty, nil
	case 8:
		return int64(binary.LittleEndian.Uint64(b)), nil
	default:
		return 0, fmt.Errorf("unexpected int size: %d", n)
	}
}
224
225func (qs *QuadStore) horizon(ctx context.Context) int64 {
226	h, _ := qs.getMetaInt(ctx, "horizon")
227	return h
228}
229
230func (qs *QuadStore) ValuesOf(ctx context.Context, vals []graph.Value) ([]quad.Value, error) {
231	out := make([]quad.Value, len(vals))
232	var (
233		inds []int
234		refs []uint64
235	)
236	for i, v := range vals {
237		if v == nil {
238			continue
239		} else if pv, ok := v.(graph.PreFetchedValue); ok {
240			out[i] = pv.NameOf()
241			continue
242		}
243		switch v := v.(type) {
244		case Int64Value:
245			if v == 0 {
246				continue
247			}
248			inds = append(inds, i)
249			refs = append(refs, uint64(v))
250		default:
251			return out, fmt.Errorf("unknown type of graph.Value; not meant for this quadstore. apparently a %#v", v)
252		}
253	}
254	if len(refs) == 0 {
255		return out, nil
256	}
257	prim, err := qs.getPrimitives(ctx, refs)
258	if err != nil {
259		return out, err
260	}
261	var last error
262	for i, p := range prim {
263		if !p.IsNode() {
264			continue
265		}
266		qv, err := pquads.UnmarshalValue(p.Value)
267		if err != nil {
268			last = err
269			continue
270		}
271		out[inds[i]] = qv
272	}
273	return out, last
274}
275
276func (qs *QuadStore) RefsOf(ctx context.Context, nodes []quad.Value) ([]graph.Value, error) {
277	values := make([]graph.Value, len(nodes))
278	err := View(qs.db, func(tx BucketTx) error {
279		for i, node := range nodes {
280			value, err := qs.resolveQuadValue(ctx, tx, node)
281			if err != nil {
282				return err
283			}
284			values[i] = Int64Value(value)
285		}
286		return nil
287	})
288	if err != nil {
289		return nil, err
290	}
291	return values, nil
292}
293
294func (qs *QuadStore) NameOf(v graph.Value) quad.Value {
295	ctx := context.TODO()
296	vals, err := qs.ValuesOf(ctx, []graph.Value{v})
297	if err != nil {
298		clog.Errorf("error getting NameOf %d: %s", v, err)
299		return nil
300	}
301	return vals[0]
302}
303
304func (qs *QuadStore) Quad(k graph.Value) quad.Quad {
305	key, ok := k.(*proto.Primitive)
306	if !ok {
307		clog.Errorf("passed value was not a quad primitive: %T", k)
308		return quad.Quad{}
309	}
310	ctx := context.TODO()
311	var v quad.Quad
312	err := View(qs.db, func(tx BucketTx) error {
313		var err error
314		v, err = qs.primitiveToQuad(ctx, tx, key)
315		return err
316	})
317	if err != nil {
318		if err != ErrNotFound {
319			clog.Errorf("error fetching quad %#v: %s", key, err)
320		}
321		return quad.Quad{}
322	}
323	return v
324}
325
326func (qs *QuadStore) primitiveToQuad(ctx context.Context, tx BucketTx, p *proto.Primitive) (quad.Quad, error) {
327	q := &quad.Quad{}
328	for _, dir := range quad.Directions {
329		v := p.GetDirection(dir)
330		val, err := qs.getValFromLog(ctx, tx, v)
331		if err != nil {
332			return *q, err
333		}
334		q.Set(dir, val)
335	}
336	return *q, nil
337}
338
339func (qs *QuadStore) getValFromLog(ctx context.Context, tx BucketTx, k uint64) (quad.Value, error) {
340	if k == 0 {
341		return nil, nil
342	}
343	p, err := qs.getPrimitiveFromLog(ctx, tx, k)
344	if err != nil {
345		return nil, err
346	}
347	return pquads.UnmarshalValue(p.Value)
348}
349
350func (qs *QuadStore) ValueOf(s quad.Value) graph.Value {
351	ctx := context.TODO()
352	var out Int64Value
353	_ = View(qs.db, func(tx BucketTx) error {
354		v, err := qs.resolveQuadValue(ctx, tx, s)
355		out = Int64Value(v)
356		return err
357	})
358	if out == 0 {
359		return nil
360	}
361	return out
362}
363
364func (qs *QuadStore) QuadDirection(val graph.Value, d quad.Direction) graph.Value {
365	p, ok := val.(*proto.Primitive)
366	if !ok {
367		return nil
368	}
369	switch d {
370	case quad.Subject:
371		return Int64Value(p.Subject)
372	case quad.Predicate:
373		return Int64Value(p.Predicate)
374	case quad.Object:
375		return Int64Value(p.Object)
376	case quad.Label:
377		if p.Label == 0 {
378			return nil
379		}
380		return Int64Value(p.Label)
381	}
382	return nil
383}
384
385func (qs *QuadStore) getPrimitives(ctx context.Context, vals []uint64) ([]*proto.Primitive, error) {
386	tx, err := qs.db.Tx(false)
387	if err != nil {
388		return nil, err
389	}
390	defer tx.Rollback()
391	return qs.getPrimitivesFromLog(ctx, tx, vals)
392}
393
// Int64Value is a numeric reference into the KV log. ValuesOf passes it to
// getPrimitives, and the zero value means "no value".
type Int64Value uint64

// Key returns v itself, so equal references compare equal as keys.
func (v Int64Value) Key() interface{} {
	var k interface{} = v
	return k
}
397