1package nosql
2
3import (
4	"fmt"
5	"math"
6	"strconv"
7
8	"github.com/cayleygraph/cayley/graph"
9	"github.com/cayleygraph/cayley/graph/iterator"
10	"github.com/cayleygraph/cayley/graph/shape"
11	"github.com/cayleygraph/cayley/quad"
12)
13
14func (qs *QuadStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) {
15	return it, false // everything done is shapes
16}
17
18var _ shape.Optimizer = (*QuadStore)(nil)
19
20func (qs *QuadStore) OptimizeShape(s shape.Shape) (shape.Shape, bool) {
21	switch s := s.(type) {
22	case shape.Quads:
23		return qs.optimizeQuads(s)
24	case shape.Filter:
25		return qs.optimizeFilter(s)
26	case shape.Page:
27		return qs.optimizePage(s)
28	case shape.Composite:
29		if s2, opt := s.Simplify().Optimize(qs); opt {
30			return s2, true
31		}
32	}
33	return s, false
34}
35
36// Shape is a shape representing a documents query with filters
37type Shape struct {
38	Collection string        // name of the collection
39	Filters    []FieldFilter // filters to select documents
40	Limit      int64         // limits a number of documents
41}
42
43func (s Shape) BuildIterator(qs graph.QuadStore) graph.Iterator {
44	db, ok := qs.(*QuadStore)
45	if !ok {
46		return iterator.NewError(fmt.Errorf("not a nosql database: %T", qs))
47	}
48	return NewIterator(db, s.Collection, s.Filters...)
49}
50
51func (s Shape) Optimize(r shape.Optimizer) (shape.Shape, bool) {
52	return s, false
53}
54
55// Quads is a shape representing a quads query
56type Quads struct {
57	Links []Linkage // filters to select quads
58	Limit int64     // limits a number of documents
59}
60
61func (s Quads) BuildIterator(qs graph.QuadStore) graph.Iterator {
62	db, ok := qs.(*QuadStore)
63	if !ok {
64		return iterator.NewError(fmt.Errorf("not a nosql database: %T", qs))
65	}
66	return NewLinksToIterator(db, colQuads, s.Links)
67}
68
69func (s Quads) Optimize(r shape.Optimizer) (shape.Shape, bool) {
70	return s, false
71}
72
// int64Adjust is the offset (2^63) added to an int64, reinterpreted as uint64,
// so that the signed range maps onto the unsigned range in the same order.
const int64Adjust = 1 << 63

// itos serializes int64 into a sortable string 13 chars long.
//
// The value is shifted by int64Adjust, formatted in base 32 and left-padded
// with zeros to a fixed width, so comparing the resulting strings gives the
// same order as comparing the numbers.
func itos(i int64) string {
	const zeros = "0000000000000" // 13 base-32 digits are enough for 64 bits
	digits := strconv.FormatUint(uint64(i)+int64Adjust, 32)
	if pad := len(zeros) - len(digits); pad > 0 {
		return zeros[:pad] + digits
	}
	return digits
}
81
82// stoi de-serializes int64 from a sortable string 13 chars long.
83func stoi(s string) int64 {
84	ret, err := strconv.ParseUint(s, 32, 64)
85	if err != nil {
86		//TODO handle error?
87		return 0
88	}
89	return int64(ret - int64Adjust)
90}
91
92func (opt Options) toFieldFilter(c shape.Comparison) ([]FieldFilter, bool) {
93	var op FilterOp
94	switch c.Op {
95	case iterator.CompareGT:
96		op = GT
97	case iterator.CompareGTE:
98		op = GTE
99	case iterator.CompareLT:
100		op = LT
101	case iterator.CompareLTE:
102		op = LTE
103	default:
104		return nil, false
105	}
106	fieldPath := func(s string) []string {
107		return []string{fldValue, s}
108	}
109
110	var filters []FieldFilter
111	switch v := c.Val.(type) {
112	case quad.String:
113		filters = []FieldFilter{
114			{Path: fieldPath(fldValData), Filter: op, Value: String(v)},
115			{Path: fieldPath(fldIRI), Filter: NotEqual, Value: Bool(true)},
116			{Path: fieldPath(fldBNode), Filter: NotEqual, Value: Bool(true)},
117		}
118	case quad.IRI:
119		filters = []FieldFilter{
120			{Path: fieldPath(fldValData), Filter: op, Value: String(v)},
121			{Path: fieldPath(fldIRI), Filter: Equal, Value: Bool(true)},
122		}
123	case quad.BNode:
124		filters = []FieldFilter{
125			{Path: fieldPath(fldValData), Filter: op, Value: String(v)},
126			{Path: fieldPath(fldBNode), Filter: Equal, Value: Bool(true)},
127		}
128	case quad.Int:
129		if opt.Number32 && (v < math.MinInt32 || v > math.MaxInt32) {
130			// switch to range on string values
131			filters = []FieldFilter{
132				{Path: fieldPath(fldValStrInt), Filter: op, Value: String(itos(int64(v)))},
133			}
134		} else {
135			filters = []FieldFilter{
136				{Path: fieldPath(fldValInt), Filter: op, Value: Int(v)},
137			}
138		}
139	case quad.Float:
140		filters = []FieldFilter{
141			{Path: fieldPath(fldValFloat), Filter: op, Value: Float(v)},
142		}
143	case quad.Time:
144		filters = []FieldFilter{
145			{Path: fieldPath(fldValTime), Filter: op, Value: Time(v)},
146		}
147	default:
148		return nil, false
149	}
150	return filters, true
151}
152
153func (qs *QuadStore) optimizeFilter(s shape.Filter) (shape.Shape, bool) {
154	if _, ok := s.From.(shape.AllNodes); !ok {
155		return s, false
156	}
157	var (
158		filters []FieldFilter
159		left    []shape.ValueFilter
160	)
161	fieldPath := func(s string) []string {
162		return []string{fldValue, s}
163	}
164	for _, f := range s.Filters {
165		switch f := f.(type) {
166		case shape.Comparison:
167			if fld, ok := qs.opt.toFieldFilter(f); ok {
168				filters = append(filters, fld...)
169				continue
170			}
171		case shape.Wildcard:
172			filters = append(filters, []FieldFilter{
173				{Path: fieldPath(fldValData), Filter: Regexp, Value: String(f.Regexp())},
174			}...)
175			continue
176		case shape.Regexp:
177			filters = append(filters, []FieldFilter{
178				{Path: fieldPath(fldValData), Filter: Regexp, Value: String(f.Re.String())},
179			}...)
180			if !f.Refs {
181				filters = append(filters, []FieldFilter{
182					{Path: fieldPath(fldIRI), Filter: NotEqual, Value: Bool(true)},
183					{Path: fieldPath(fldBNode), Filter: NotEqual, Value: Bool(true)},
184				}...)
185			}
186			continue
187		}
188		left = append(left, f)
189	}
190	if len(filters) == 0 {
191		return s, false
192	}
193	var ns shape.Shape = Shape{Collection: colNodes, Filters: filters}
194	if len(left) != 0 {
195		ns = shape.Filter{From: ns, Filters: left}
196	}
197	return ns, true
198}
199
200func (qs *QuadStore) optimizeQuads(s shape.Quads) (shape.Shape, bool) {
201	var (
202		links []Linkage
203		left  []shape.QuadFilter
204	)
205	for _, f := range s {
206		if v, ok := shape.One(f.Values); ok {
207			if h, ok := v.(NodeHash); ok {
208				links = append(links, Linkage{Dir: f.Dir, Val: h})
209				continue
210			}
211		}
212		left = append(left, f)
213	}
214	if len(links) == 0 {
215		return s, false
216	}
217	var ns shape.Shape = Quads{Links: links}
218	if len(left) != 0 {
219		ns = shape.Intersect{ns, shape.Quads(left)}
220	}
221	return s, true
222}
223
224func (qs *QuadStore) optimizePage(s shape.Page) (shape.Shape, bool) {
225	if s.Skip != 0 {
226		return s, false
227	}
228	switch f := s.From.(type) {
229	case shape.AllNodes:
230		return Shape{Collection: colNodes, Limit: s.Limit}, false
231	case Shape:
232		s.ApplyPage(shape.Page{Limit: f.Limit})
233		f.Limit = s.Limit
234		return f, true
235	case Quads:
236		s.ApplyPage(shape.Page{Limit: f.Limit})
237		f.Limit = s.Limit
238		return f, true
239	}
240	return s, false
241}
242