1package nosql 2 3import ( 4 "fmt" 5 "math" 6 "strconv" 7 8 "github.com/cayleygraph/cayley/graph" 9 "github.com/cayleygraph/cayley/graph/iterator" 10 "github.com/cayleygraph/cayley/graph/shape" 11 "github.com/cayleygraph/cayley/quad" 12) 13 14func (qs *QuadStore) OptimizeIterator(it graph.Iterator) (graph.Iterator, bool) { 15 return it, false // everything done is shapes 16} 17 18var _ shape.Optimizer = (*QuadStore)(nil) 19 20func (qs *QuadStore) OptimizeShape(s shape.Shape) (shape.Shape, bool) { 21 switch s := s.(type) { 22 case shape.Quads: 23 return qs.optimizeQuads(s) 24 case shape.Filter: 25 return qs.optimizeFilter(s) 26 case shape.Page: 27 return qs.optimizePage(s) 28 case shape.Composite: 29 if s2, opt := s.Simplify().Optimize(qs); opt { 30 return s2, true 31 } 32 } 33 return s, false 34} 35 36// Shape is a shape representing a documents query with filters 37type Shape struct { 38 Collection string // name of the collection 39 Filters []FieldFilter // filters to select documents 40 Limit int64 // limits a number of documents 41} 42 43func (s Shape) BuildIterator(qs graph.QuadStore) graph.Iterator { 44 db, ok := qs.(*QuadStore) 45 if !ok { 46 return iterator.NewError(fmt.Errorf("not a nosql database: %T", qs)) 47 } 48 return NewIterator(db, s.Collection, s.Filters...) 49} 50 51func (s Shape) Optimize(r shape.Optimizer) (shape.Shape, bool) { 52 return s, false 53} 54 55// Quads is a shape representing a quads query 56type Quads struct { 57 Links []Linkage // filters to select quads 58 Limit int64 // limits a number of documents 59} 60 61func (s Quads) BuildIterator(qs graph.QuadStore) graph.Iterator { 62 db, ok := qs.(*QuadStore) 63 if !ok { 64 return iterator.NewError(fmt.Errorf("not a nosql database: %T", qs)) 65 } 66 return NewLinksToIterator(db, colQuads, s.Links) 67} 68 69func (s Quads) Optimize(r shape.Optimizer) (shape.Shape, bool) { 70 return s, false 71} 72 73const int64Adjust = 1 << 63 74 75// itos serializes int64 into a sortable string 13 chars long. 76func itos(i int64) string { 77 s := strconv.FormatUint(uint64(i)+int64Adjust, 32) 78 const z = "0000000000000" 79 return z[len(s):] + s 80} 81 82// stoi de-serializes int64 from a sortable string 13 chars long. 83func stoi(s string) int64 { 84 ret, err := strconv.ParseUint(s, 32, 64) 85 if err != nil { 86 //TODO handle error? 87 return 0 88 } 89 return int64(ret - int64Adjust) 90} 91 92func (opt Options) toFieldFilter(c shape.Comparison) ([]FieldFilter, bool) { 93 var op FilterOp 94 switch c.Op { 95 case iterator.CompareGT: 96 op = GT 97 case iterator.CompareGTE: 98 op = GTE 99 case iterator.CompareLT: 100 op = LT 101 case iterator.CompareLTE: 102 op = LTE 103 default: 104 return nil, false 105 } 106 fieldPath := func(s string) []string { 107 return []string{fldValue, s} 108 } 109 110 var filters []FieldFilter 111 switch v := c.Val.(type) { 112 case quad.String: 113 filters = []FieldFilter{ 114 {Path: fieldPath(fldValData), Filter: op, Value: String(v)}, 115 {Path: fieldPath(fldIRI), Filter: NotEqual, Value: Bool(true)}, 116 {Path: fieldPath(fldBNode), Filter: NotEqual, Value: Bool(true)}, 117 } 118 case quad.IRI: 119 filters = []FieldFilter{ 120 {Path: fieldPath(fldValData), Filter: op, Value: String(v)}, 121 {Path: fieldPath(fldIRI), Filter: Equal, Value: Bool(true)}, 122 } 123 case quad.BNode: 124 filters = []FieldFilter{ 125 {Path: fieldPath(fldValData), Filter: op, Value: String(v)}, 126 {Path: fieldPath(fldBNode), Filter: Equal, Value: Bool(true)}, 127 } 128 case quad.Int: 129 if opt.Number32 && (v < math.MinInt32 || v > math.MaxInt32) { 130 // switch to range on string values 131 filters = []FieldFilter{ 132 {Path: fieldPath(fldValStrInt), Filter: op, Value: String(itos(int64(v)))}, 133 } 134 } else { 135 filters = []FieldFilter{ 136 {Path: fieldPath(fldValInt), Filter: op, Value: Int(v)}, 137 } 138 } 139 case quad.Float: 140 filters = []FieldFilter{ 141 {Path: fieldPath(fldValFloat), Filter: op, Value: Float(v)}, 142 } 143 case quad.Time: 144 filters = []FieldFilter{ 145 {Path: fieldPath(fldValTime), Filter: op, Value: Time(v)}, 146 } 147 default: 148 return nil, false 149 } 150 return filters, true 151} 152 153func (qs *QuadStore) optimizeFilter(s shape.Filter) (shape.Shape, bool) { 154 if _, ok := s.From.(shape.AllNodes); !ok { 155 return s, false 156 } 157 var ( 158 filters []FieldFilter 159 left []shape.ValueFilter 160 ) 161 fieldPath := func(s string) []string { 162 return []string{fldValue, s} 163 } 164 for _, f := range s.Filters { 165 switch f := f.(type) { 166 case shape.Comparison: 167 if fld, ok := qs.opt.toFieldFilter(f); ok { 168 filters = append(filters, fld...) 169 continue 170 } 171 case shape.Wildcard: 172 filters = append(filters, []FieldFilter{ 173 {Path: fieldPath(fldValData), Filter: Regexp, Value: String(f.Regexp())}, 174 }...) 175 continue 176 case shape.Regexp: 177 filters = append(filters, []FieldFilter{ 178 {Path: fieldPath(fldValData), Filter: Regexp, Value: String(f.Re.String())}, 179 }...) 180 if !f.Refs { 181 filters = append(filters, []FieldFilter{ 182 {Path: fieldPath(fldIRI), Filter: NotEqual, Value: Bool(true)}, 183 {Path: fieldPath(fldBNode), Filter: NotEqual, Value: Bool(true)}, 184 }...) 185 } 186 continue 187 } 188 left = append(left, f) 189 } 190 if len(filters) == 0 { 191 return s, false 192 } 193 var ns shape.Shape = Shape{Collection: colNodes, Filters: filters} 194 if len(left) != 0 { 195 ns = shape.Filter{From: ns, Filters: left} 196 } 197 return ns, true 198} 199 200func (qs *QuadStore) optimizeQuads(s shape.Quads) (shape.Shape, bool) { 201 var ( 202 links []Linkage 203 left []shape.QuadFilter 204 ) 205 for _, f := range s { 206 if v, ok := shape.One(f.Values); ok { 207 if h, ok := v.(NodeHash); ok { 208 links = append(links, Linkage{Dir: f.Dir, Val: h}) 209 continue 210 } 211 } 212 left = append(left, f) 213 } 214 if len(links) == 0 { 215 return s, false 216 } 217 var ns shape.Shape = Quads{Links: links} 218 if len(left) != 0 { 219 ns = shape.Intersect{ns, shape.Quads(left)} 220 } 221 return s, true 222} 223 224func (qs *QuadStore) optimizePage(s shape.Page) (shape.Shape, bool) { 225 if s.Skip != 0 { 226 return s, false 227 } 228 switch f := s.From.(type) { 229 case shape.AllNodes: 230 return Shape{Collection: colNodes, Limit: s.Limit}, false 231 case Shape: 232 s.ApplyPage(shape.Page{Limit: f.Limit}) 233 f.Limit = s.Limit 234 return f, true 235 case Quads: 236 s.ApplyPage(shape.Page{Limit: f.Limit}) 237 f.Limit = s.Limit 238 return f, true 239 } 240 return s, false 241} 242