1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5% http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(mango_selector).
14
15
16-export([
17    normalize/1,
18    match/2,
19    has_required_fields/2,
20    is_constant_field/2
21]).
22
23
24-include_lib("couch/include/couch_db.hrl").
25-include("mango.hrl").
26
27
% Bring a selector into canonical form so that every other function
% in this module can rely on one consistent shape. Three passes are
% applied in order:
%
%   1. norm_ops/1       - validate operators, make implicit ones explicit
%   2. norm_fields/1    - collapse nested field names into dotted paths
%   3. norm_negations/1 - push $not / $nor down towards the terminals
%
% The empty selector is returned untouched. A normalized selector
% whose top level still contains an empty field name is rejected with
% an invalid_selector error.
normalize({[]}) ->
    {[]};
normalize(Selector) ->
    {Props} = norm_negations(norm_fields(norm_ops(Selector))),
    HasBlankField = lists:any(fun
        ({<<>>, _}) -> true;
        (_) -> false
    end, Props),
    case HasBlankField of
        true ->
            ?MANGO_ERROR({invalid_selector, missing_field_name});
        false ->
            {Props}
    end.
50
51
% Public matching entry point: test a (normalized) selector against
% either a #doc{} record or an EJSON object. Every call bumps the
% [mango, evaluate_selector] counter before the selector is evaluated.
% Returns true or false.
match(Selector, DocOrValue) ->
    couch_stats:increment_counter([mango, evaluate_selector]),
    match_int(Selector, DocOrValue).
58
59
% Dispatch on the shape of the thing being matched. The empty
% selector matches anything; otherwise the body of a #doc{} or a bare
% EJSON object is handed to match/3 with the default JSON comparator.
match_int({[]}, _Anything) ->
    true;
match_int(Selector, #doc{body = Body}) ->
    match(Selector, Body, fun mango_json:cmp/2);
match_int(Selector, {_} = Object) ->
    match(Selector, Object, fun mango_json:cmp/2).
69
% Validate each operator and convert implicit operators into their
% explicit versions:
%
%   * a bare value becomes {$eq: Value}
%   * an object with several members becomes an explicit $and
%   * $text is rewritten to live under the internal $default field
%
% Operators with a restricted argument type raise a bad_arg error when
% the argument has the wrong shape. Operators that are known but not
% implemented raise not_supported, and any other "$..." name raises
% invalid_operator. Combinators and sub-selector operators recurse
% into their arguments.

% Boolean combinators take a list of sub-selectors.
norm_ops({[{<<"$and">>, Args}]}) when is_list(Args) ->
    {[{<<"$and">>, [norm_ops(A) || A <- Args]}]};
norm_ops({[{<<"$and">>, Arg}]}) ->
    ?MANGO_ERROR({bad_arg, '$and', Arg});

norm_ops({[{<<"$or">>, Args}]}) when is_list(Args) ->
    {[{<<"$or">>, [norm_ops(A) || A <- Args]}]};
norm_ops({[{<<"$or">>, Arg}]}) ->
    ?MANGO_ERROR({bad_arg, '$or', Arg});

% $not takes exactly one sub-selector object.
norm_ops({[{<<"$not">>, {_}=Arg}]}) ->
    {[{<<"$not">>, norm_ops(Arg)}]};
norm_ops({[{<<"$not">>, Arg}]}) ->
    ?MANGO_ERROR({bad_arg, '$not', Arg});

norm_ops({[{<<"$nor">>, Args}]}) when is_list(Args) ->
    {[{<<"$nor">>, [norm_ops(A) || A <- Args]}]};
norm_ops({[{<<"$nor">>, Arg}]}) ->
    ?MANGO_ERROR({bad_arg, '$nor', Arg});

% Membership operators take a list of plain values.
norm_ops({[{<<"$in">>, Args}]} = Cond) when is_list(Args) ->
    Cond;
norm_ops({[{<<"$in">>, Arg}]}) ->
    ?MANGO_ERROR({bad_arg, '$in', Arg});

norm_ops({[{<<"$nin">>, Args}]} = Cond) when is_list(Args) ->
    Cond;
norm_ops({[{<<"$nin">>, Arg}]}) ->
    ?MANGO_ERROR({bad_arg, '$nin', Arg});

norm_ops({[{<<"$exists">>, Arg}]} = Cond) when is_boolean(Arg) ->
    Cond;
norm_ops({[{<<"$exists">>, Arg}]}) ->
    ?MANGO_ERROR({bad_arg, '$exists', Arg});

norm_ops({[{<<"$type">>, Arg}]} = Cond) when is_binary(Arg) ->
    Cond;
norm_ops({[{<<"$type">>, Arg}]}) ->
    ?MANGO_ERROR({bad_arg, '$type', Arg});

% The divisor must be non-zero: match/3 evaluates `Value rem D`, which
% raises badarith for D == 0. Reject it here so the caller gets a
% bad_arg error instead of a crash while matching.
norm_ops({[{<<"$mod">>, [D, R]}]} = Cond)
        when is_integer(D), is_integer(R), D =/= 0 ->
    Cond;
norm_ops({[{<<"$mod">>, Arg}]}) ->
    ?MANGO_ERROR({bad_arg, '$mod', Arg});

% The pattern must be a binary that compiles as a regular expression.
norm_ops({[{<<"$regex">>, Regex}]} = Cond) when is_binary(Regex) ->
    case re:compile(Regex) of
        {ok, _} ->
            Cond;
        _ ->
            ?MANGO_ERROR({bad_arg, '$regex', Regex})
    end;
% A non-binary pattern is a bad argument, not an unknown operator.
% Without this clause it would fall through to the invalid_operator
% clause further down.
norm_ops({[{<<"$regex">>, Arg}]}) ->
    ?MANGO_ERROR({bad_arg, '$regex', Arg});

norm_ops({[{<<"$all">>, Args}]}) when is_list(Args) ->
    {[{<<"$all">>, Args}]};
norm_ops({[{<<"$all">>, Arg}]}) ->
    ?MANGO_ERROR({bad_arg, '$all', Arg});

% Operators whose argument is itself a selector object.
norm_ops({[{<<"$elemMatch">>, {_}=Arg}]}) ->
    {[{<<"$elemMatch">>, norm_ops(Arg)}]};
norm_ops({[{<<"$elemMatch">>, Arg}]}) ->
    ?MANGO_ERROR({bad_arg, '$elemMatch', Arg});

norm_ops({[{<<"$allMatch">>, {_}=Arg}]}) ->
    {[{<<"$allMatch">>, norm_ops(Arg)}]};
norm_ops({[{<<"$allMatch">>, Arg}]}) ->
    ?MANGO_ERROR({bad_arg, '$allMatch', Arg});

norm_ops({[{<<"$keyMapMatch">>, {_}=Arg}]}) ->
    {[{<<"$keyMapMatch">>, norm_ops(Arg)}]};
norm_ops({[{<<"$keyMapMatch">>, Arg}]}) ->
    ?MANGO_ERROR({bad_arg, '$keyMapMatch', Arg});

norm_ops({[{<<"$size">>, Arg}]}) when is_integer(Arg), Arg >= 0 ->
    {[{<<"$size">>, Arg}]};
norm_ops({[{<<"$size">>, Arg}]}) ->
    ?MANGO_ERROR({bad_arg, '$size', Arg});

% $text accepts a scalar and is moved under the $default field.
norm_ops({[{<<"$text">>, Arg}]}) when is_binary(Arg); is_number(Arg);
        is_boolean(Arg) ->
    {[{<<"$default">>, {[{<<"$text">>, Arg}]}}]};
norm_ops({[{<<"$text">>, Arg}]}) ->
    ?MANGO_ERROR({bad_arg, '$text', Arg});

% Not technically an operator but we pass it through here
% so that this function accepts its own output. This exists
% so that $text can have a field name value which simplifies
% logic elsewhere.
norm_ops({[{<<"$default">>, _}]} = Selector) ->
    Selector;

% Terminals where we can't perform any validation
% on the value because any value is acceptable.
norm_ops({[{<<"$lt">>, _}]} = Cond) ->
    Cond;
norm_ops({[{<<"$lte">>, _}]} = Cond) ->
    Cond;
norm_ops({[{<<"$eq">>, _}]} = Cond) ->
    Cond;
norm_ops({[{<<"$ne">>, _}]} = Cond) ->
    Cond;
norm_ops({[{<<"$gte">>, _}]} = Cond) ->
    Cond;
norm_ops({[{<<"$gt">>, _}]} = Cond) ->
    Cond;

% Known but unsupported operators
norm_ops({[{<<"$where">>, _}]}) ->
    ?MANGO_ERROR({not_supported, '$where'});
norm_ops({[{<<"$geoWithin">>, _}]}) ->
    ?MANGO_ERROR({not_supported, '$geoWithin'});
norm_ops({[{<<"$geoIntersects">>, _}]}) ->
    ?MANGO_ERROR({not_supported, '$geoIntersects'});
norm_ops({[{<<"$near">>, _}]}) ->
    ?MANGO_ERROR({not_supported, '$near'});
norm_ops({[{<<"$nearSphere">>, _}]}) ->
    ?MANGO_ERROR({not_supported, '$nearSphere'});

% Unknown operator
norm_ops({[{<<"$", _/binary>>=Op, _}]}) ->
    ?MANGO_ERROR({invalid_operator, Op});

% A {Field: Cond} pair
norm_ops({[{Field, Cond}]}) ->
    {[{Field, norm_ops(Cond)}]};

% An implicit $and: an object with two or more members
norm_ops({[_, _ | _] = Props}) ->
    {[{<<"$and">>, [norm_ops({[P]}) || P <- Props]}]};

% A bare value condition means equality
norm_ops(Value) ->
    {[{<<"$eq">>, Value}]}.
206
207
% Normalize the field names of a selector as far as possible by
% pushing them down through the boolean operators. For instance:
%
%   Unnormalized:
%     {foo: {$and: [{$gt: 5}, {$lt: 10}]}}
%
%   Normalized:
%     {$and: [{foo: {$gt: 5}}, {foo: {$lt: 10}}]}
%
% Nested field names are joined into a dotted path:
%
%   Unnormalized:
%     {foo: {bar: {$gt: 10}}}
%
%   Normalized:
%     {"foo.bar": {$gt: 10}}
%
% It's important to note that field names can only be moved through
% boolean operators, where commutativity is guaranteed. The same is
% not necessarily true for '$elemMatch', '$allMatch' or
% '$keyMapMatch'; for those the algorithm is applied to the
% operator's argument on its own, starting from an empty path.
norm_fields(Selector) ->
    case Selector of
        {[]} ->
            Selector;
        _ ->
            norm_fields(Selector, <<>>)
    end.
235
236
% norm_fields/2 walks a selector while carrying the dotted path of
% the field names seen so far (<<>> at the root).

% Boolean combinators over a list: the path is handed unchanged to
% every sub-selector.
norm_fields({[{Op, Args}]}, Path) when
        Op =:= <<"$and">>; Op =:= <<"$or">>; Op =:= <<"$nor">> ->
    {[{Op, [norm_fields(Arg, Path) || Arg <- Args]}]};

% $not wraps a single sub-selector; the path is pushed into it.
norm_fields({[{<<"$not">>, Arg}]}, Path) ->
    {[{<<"$not">>, norm_fields(Arg, Path)}]};

% Sub-selector operators: the argument is normalized independently
% (from an empty path) and the operator is attached to the path
% collected so far.
norm_fields({[{Op, Arg}]}, Path) when
        Op =:= <<"$elemMatch">>;
        Op =:= <<"$allMatch">>;
        Op =:= <<"$keyMapMatch">> ->
    {[{Path, {[{Op, norm_fields(Arg)}]}}]};

% The internal $default field is only valid at the root and only
% with a $text operator applied to it; anything else is a bad field.
norm_fields({[{<<"$default">>, Cond}]} = Selector, Path) ->
    case {Cond, Path} of
        {{[{<<"$text">>, _}]}, <<>>} ->
            Selector;
        _ ->
            ?MANGO_ERROR({bad_field, Selector})
    end;

% Every remaining operator is a terminal with no field names below
% it. Attach it to the collected path and return.
norm_fields({[{<<"$", _/binary>>, _}]} = Terminal, Path) ->
    {[{Path, Terminal}]};

% A field name: extend the path and continue into its condition. The
% node itself is dropped because the full path is emitted further
% down the branch. No '.' separator is added for the first element.
norm_fields({[{Name, SubSel}]}, Prefix) ->
    FullPath = case Prefix of
        <<>> ->
            Name;
        _ ->
            <<Prefix/binary, ".", Name/binary>>
    end,
    norm_fields(SubSel, FullPath);

% An empty selector below a path
norm_fields({[]}, Path) ->
    {Path, {[]}};

% Anything else is an invalid selector
norm_fields(Invalid, _) ->
    ?MANGO_ERROR({bad_field, Invalid}).
299
300
% Move negation operators as far down the branch as possible. For
% example:
%
%   Unnormalized:
%     {$not: {foo: {$gt: 10}}}
%
%   Normalized:
%     {foo: {$lte: 10}}
%
% DeMorgan's laws are applied on the way down:
%
%   Unnormalized:
%     {$not: {$and: [{foo: {$gt: 10}}, {foo: {$lt: 5}}]}}
%
%   Normalized:
%     {$or: [{foo: {$lte: 10}}, {foo: {$gte: 5}}]}
%
% This matters because we can't "see" through a '$not' operator when
% looking for indices that could service a query. Once the negation
% has been pushed down to the terminals, individual operators can
% often be negated directly, which makes usable indices visible.

% $not and $nor start a negation: hand the arguments to negate/1.
norm_negations({[{<<"$not">>, Arg}]}) ->
    negate(Arg);
norm_negations({[{<<"$nor">>, Args}]}) ->
    {[{<<"$and">>, [negate(Arg) || Arg <- Args]}]};

% $and / $or are only searched for negations nested inside them.
norm_negations({[{Op, Args}]}) when Op =:= <<"$and">>; Op =:= <<"$or">> ->
    {[{Op, [norm_negations(Arg) || Arg <- Args]}]};

% The same goes for operators that take a single sub-selector.
norm_negations({[{Op, Arg}]}) when
        Op =:= <<"$elemMatch">>;
        Op =:= <<"$allMatch">>;
        Op =:= <<"$keyMapMatch">> ->
    {[{Op, norm_negations(Arg)}]};

% Every other condition is returned unchanged.
norm_negations(Other) ->
    Other.
353
354
% Negate an expression. This relies on DeMorgan's laws:
%
%     NOT(a AND b) == NOT(a) OR NOT(b)
%     NOT(a OR b) == NOT(a) AND NOT(b)
%
% When a negation meets another negation operator the two cancel
% out. There may be further negations below that point, so in that
% case we recurse back into norm_negations/1.

% Double negation: drop both and keep normalizing underneath.
negate({[{<<"$not">>, Inner}]}) ->
    norm_negations(Inner);
negate({[{<<"$nor">>, Subs}]}) ->
    {[{<<"$or">>, [norm_negations(Sub) || Sub <- Subs]}]};

% DeMorgan: swap the combinator and negate every argument.
negate({[{<<"$and">>, Subs}]}) ->
    {[{<<"$or">>, [negate(Sub) || Sub <- Subs]}]};
negate({[{<<"$or">>, Subs}]}) ->
    {[{<<"$and">>, [negate(Sub) || Sub <- Subs]}]};

% The internal $default (text) field cannot be negated.
negate({[{<<"$default">>, _}]} = Selector) ->
    ?MANGO_ERROR({bad_arg, '$not', Selector});

% $exists is negated by flipping its boolean argument.
negate({[{<<"$exists">>, Flag}]}) ->
    {[{<<"$exists">>, not Flag}]};

% Any other operator: use its direct inverse when one exists,
% otherwise stop here and reinsert an explicit $not around it.
negate({[{<<"$", _/binary>> = Op, Arg}]} = Cond) ->
    case inverse_operator(Op) of
        undefined ->
            {[{<<"$not">>, Cond}]};
        Inverse ->
            {[{Inverse, Arg}]}
    end;

% Negating a field means negating its condition.
negate({[{Field, Cond}]}) ->
    {[{Field, negate(Cond)}]}.


% Map a comparison or membership operator to the operator that
% expresses its negation; undefined when there is no direct inverse.
inverse_operator(<<"$lt">>) -> <<"$gte">>;
inverse_operator(<<"$lte">>) -> <<"$gt">>;
inverse_operator(<<"$eq">>) -> <<"$ne">>;
inverse_operator(<<"$ne">>) -> <<"$eq">>;
inverse_operator(<<"$gte">>) -> <<"$lt">>;
inverse_operator(<<"$gt">>) -> <<"$lte">>;
inverse_operator(<<"$in">>) -> <<"$nin">>;
inverse_operator(<<"$nin">>) -> <<"$in">>;
inverse_operator(_) -> undefined.
416
417
% match/3 evaluates a normalized selector against a value. Cmp is the
% two-argument comparison function used for ordering and equality;
% the code below compares its result against 0.
%
% Empty argument lists are special-cased per operator: an empty $and,
% $or or $nin matches every value, while an empty $all or $in matches
% nothing.
match({[{<<"$and">>, []}]}, _, _) ->
    true;
match({[{<<"$and">>, SubSels}]}, Value, Cmp) ->
    lists:all(fun(Sub) -> match(Sub, Value, Cmp) end, SubSels);

match({[{<<"$or">>, []}]}, _, _) ->
    true;
match({[{<<"$or">>, SubSels}]}, Value, Cmp) ->
    lists:any(fun(Sub) -> match(Sub, Value, Cmp) end, SubSels);

match({[{<<"$not">>, SubSel}]}, Value, Cmp) ->
    not match(SubSel, Value, Cmp);

match({[{<<"$all">>, []}]}, _, _) ->
    false;
% Either every wanted element is a member of Values, or the wanted
% list holds exactly one element which is itself a list equal to
% Values.
match({[{<<"$all">>, Wanted}]}, Values, _Cmp) when is_list(Values) ->
    ContainsAll = lists:all(fun(W) -> lists:member(W, Values) end, Wanted),
    ContainsAll orelse
        case Wanted of
            [Sole] when is_list(Sole) ->
                Sole == Values;
            _ ->
                false
        end;
match({[{<<"$all">>, _Wanted}]}, _NotAList, _Cmp) ->
    false;

%% Used by $elemMatch, $allMatch, and possibly $in, because of how
%% the normalizer treats their arguments. A selector such as
%% {"field_name": {"$elemMatch": {"$gte": 80, "$lt": 85}}}
%% is normalized to:
%% {[{<<"field_name">>,
%%     {[{<<"$elemMatch">>,
%%         {[{<<"$and">>, [
%%             {[{<<>>,{[{<<"$gte">>,80}]}}]},
%%             {[{<<>>,{[{<<"$lt">>,85}]}}]}
%%         ]}]}
%%     }]}
%% }]}.
%% The empty field name carries no path, so we skip it and apply the
%% condition to the value directly.
match({[{<<>>, Cond}]}, Value, Cmp) ->
    match(Cond, Value, Cmp);

% True when at least one element of the list matches the
% sub-selector. Any exception raised while matching an element makes
% the whole operator evaluate to false.
match({[{<<"$elemMatch">>, SubSel}]}, Items, Cmp) when is_list(Items) ->
    try
        lists:any(fun(Item) -> match(SubSel, Item, Cmp) =:= true end, Items)
    catch
        _:_ ->
            false
    end;
match({[{<<"$elemMatch">>, _SubSel}]}, _NotAList, _Cmp) ->
    false;

% True when the list is non-empty and every element matches the
% sub-selector. Any exception while matching yields false.
match({[{<<"$allMatch">>, SubSel}]}, [_ | _] = Items, Cmp) ->
    try
        lists:all(fun(Item) -> match(SubSel, Item, Cmp) =/= false end, Items)
    catch
        _:_ ->
            false
    end;
match({[{<<"$allMatch">>, _SubSel}]}, _Other, _Cmp) ->
    false;

% True when at least one key of the object matches the sub-selector.
% The keys are taken from the first element of the value tuple; any
% exception (including while extracting the keys) yields false.
match({[{<<"$keyMapMatch">>, SubSel}]}, Object, Cmp) when is_tuple(Object) ->
    try
        Keys = [Key || {Key, _} <- element(1, Object)],
        lists:any(fun(Key) -> match(SubSel, Key, Cmp) =:= true end, Keys)
    catch
        _:_ ->
            false
    end;
match({[{<<"$keyMapMatch">>, _SubSel}]}, _NotAnObject, _Cmp) ->
    false;

% Comparison operators map directly onto the sign of Cmp's result.
match({[{<<"$lt">>, Bound}]}, Value, Cmp) ->
    Cmp(Value, Bound) < 0;
match({[{<<"$lte">>, Bound}]}, Value, Cmp) ->
    Cmp(Value, Bound) =< 0;
match({[{<<"$eq">>, Expected}]}, Value, Cmp) ->
    Cmp(Value, Expected) == 0;
match({[{<<"$ne">>, Expected}]}, Value, Cmp) ->
    Cmp(Value, Expected) /= 0;
match({[{<<"$gte">>, Bound}]}, Value, Cmp) ->
    Cmp(Value, Bound) >= 0;
match({[{<<"$gt">>, Bound}]}, Value, Cmp) ->
    Cmp(Value, Bound) > 0;

match({[{<<"$in">>, []}]}, _, _) ->
    false;
% For a list value, it is enough for any element of the value to
% compare equal to any of the candidates.
match({[{<<"$in">>, Candidates}]}, Values, Cmp) when is_list(Values) ->
    lists:any(fun(Candidate) ->
        lists:member(true, [Cmp(V, Candidate) == 0 || V <- Values])
    end, Candidates);
match({[{<<"$in">>, Candidates}]}, Value, Cmp) ->
    lists:any(fun(Candidate) -> Cmp(Value, Candidate) == 0 end, Candidates);

match({[{<<"$nin">>, []}]}, _, _) ->
    true;
match({[{<<"$nin">>, Candidates}]}, Values, Cmp) when is_list(Values) ->
    not match({[{<<"$in">>, Candidates}]}, Values, Cmp);
match({[{<<"$nin">>, Candidates}]}, Value, Cmp) ->
    not lists:any(fun(Candidate) -> Cmp(Value, Candidate) == 0 end, Candidates);

% Reaching this clause with any value other than undefined means the
% field exists. The "field is missing" case is handled by the field
% clause further down.
match({[{<<"$exists">>, ShouldExist}]}, Value, _Cmp) ->
    ShouldExist andalso Value /= undefined;

match({[{<<"$type">>, TypeName}]}, Value, _Cmp) when is_binary(TypeName) ->
    mango_json:type(Value) == TypeName;

match({[{<<"$mod">>, [Divisor, Remainder]}]}, Value, _Cmp)
        when is_integer(Value) ->
    Remainder == Value rem Divisor;
match({[{<<"$mod">>, _}]}, _NonInteger, _Cmp) ->
    false;

% Only binaries can match a regex; a failure inside re:run/3 counts
% as "no match".
match({[{<<"$regex">>, Regex}]}, Subject, _Cmp) when is_binary(Subject) ->
    try re:run(Subject, Regex, [{capture, none}]) of
        match ->
            true;
        _ ->
            false
    catch
        _:_ ->
            false
    end;
match({[{<<"$regex">>, _}]}, _NonBinary, _Cmp) ->
    false;

match({[{<<"$size">>, Size}]}, Items, _Cmp) when is_list(Items) ->
    Size == length(Items);
match({[{<<"$size">>, _}]}, _NotAList, _Cmp) ->
    false;

% We don't have any choice but to believe that the text
% index returned valid matches
match({[{<<"$default">>, _}]}, _Value, _Cmp) ->
    true;

% Any other operator is an internal assertion error: it should have
% been removed during normalization, or something else broke.
match({[{<<"$", _/binary>> = Op, _}]}, _, _) ->
    ?MANGO_ERROR({invalid_operator, Op});

% A field condition: look the field up inside the value and match the
% condition against what was found. mango_doc:get_field/2 may return
% not_found or bad_path, in which case matching fails, except that a
% missing field satisfies {$exists: false}. The <<"_id">> field is
% compared with mango_json:cmp_raw/2 instead of Cmp.
match({[{Field, Cond}]}, Value, Cmp) ->
    case mango_doc:get_field(Value, Field) of
        not_found ->
            Cond == {[{<<"$exists">>, false}]};
        bad_path ->
            false;
        Found ->
            FieldCmp = case Field of
                <<"_id">> ->
                    fun mango_json:cmp_raw/2;
                _ ->
                    Cmp
            end,
            match(Cond, Found, FieldCmp)
    end;

% An object with several members means the selector was never
% normalized.
match({[_, _ | _]} = Selector, _Value, _Cmp) ->
    erlang:error({unnormalized_selector, Selector}).
619
620
% Returns true if Selector requires every field in RequiredFields to
% exist in any matching document.
%
% The selector is walked condition by condition. Each field that a
% condition requires is removed from the list of required fields,
% until either all of them have been accounted for or the selector
% has been exhausted. The answer is true when nothing is left over.
has_required_fields(Selector, RequiredFields) ->
    has_required_fields_int(Selector, RequiredFields) =:= [].
633
% Worker for has_required_fields/2. Takes a selector (or a list of
% selectors) plus the fields still unaccounted for, and returns the
% fields that the selector does not require.

% Empty selector: nothing is required by it.
has_required_fields_int({[]}, Missing) ->
    Missing;

% Every required field has been accounted for.
has_required_fields_int(_, []) ->
    [];

% Ran out of selector.
has_required_fields_int([], Missing) ->
    Missing;

% A single selector is treated as a one-element list.
has_required_fields_int(Selector, Missing) when not is_list(Selector) ->
    has_required_fields_int([Selector], Missing);

% A lone $and is transparent: walk its children.
has_required_fields_int([{[{<<"$and">>, Args}]}], Missing)
        when is_list(Args) ->
    has_required_fields_int(Args, Missing);

% $or is transparent too, but a field only counts as required when
% every branch requires it. Each branch is checked against the full
% set; whatever any branch leaves over stays missing (duplicates
% removed).
has_required_fields_int([{[{<<"$or">>, Args}]} | Rest], Missing)
        when is_list(Args) ->
    PerBranch = [has_required_fields_int(Arg, Missing) || Arg <- Args],
    StillMissing = lists:usort(lists:append(PerBranch)),
    has_required_fields_int(Rest, StillMissing);

% $and with siblings: a field may be covered by any child, and the
% leftovers carry on to the siblings.
has_required_fields_int([{[{<<"$and">>, Args}]} | Rest], Missing)
        when is_list(Args) ->
    has_required_fields_int(Rest, has_required_fields_int(Args, Missing));

% $exists:false is a special case - it is the only operator that
% explicitly does not require the field to exist, so nothing is
% removed for it.
has_required_fields_int([{[{_Field, {[{<<"$exists">>, false}]}}]} | Rest],
        Missing) ->
    has_required_fields_int(Rest, Missing);

% Any other condition on a field requires that field.
has_required_fields_int([{[{Field, _Cond}]} | Rest], Missing) ->
    has_required_fields_int(Rest, lists:delete(Field, Missing)).
688
689
% Returns true if Field is pinned to a constant value by the
% selector, i.e. its condition is a plain equality such as
% {a: {$eq: 1}}. Only a top-level condition or the members of a lone
% $and are inspected; a field that appears only under other operators
% is reported as not constant.
is_constant_field({[]}, _Field) ->
    false;

% A single selector is treated as a one-element list.
is_constant_field(Selector, Field) when not is_list(Selector) ->
    is_constant_field([Selector], Field);

is_constant_field([], _Field) ->
    false;

% A lone $and: the field is constant if any member makes it so. A
% non-list argument is examined as a single selector.
is_constant_field([{[{<<"$and">>, Args}]}], Field) ->
    case is_list(Args) of
        true ->
            lists:any(fun(Arg) -> is_constant_field(Arg, Field) end, Args);
        false ->
            is_constant_field(Args, Field)
    end;

% The first condition found on the field decides the answer: it is
% constant only when the operator is $eq.
is_constant_field([{[{Field, {[{<<"$eq">>, _Val}]}}]} | _Rest], Field) ->
    true;
is_constant_field([{[{Field, {[{_OtherOp, _Val}]}}]} | _Rest], Field) ->
    false;

% A condition on something else: keep looking.
is_constant_field([{[{_Other, _}]} | Rest], Field) ->
    is_constant_field(Rest, Field).
711
712
%%%%%%%% module tests below %%%%%%%%

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

% Fixture builders. They produce *unnormalized* EJSON selectors; each
% test runs them through normalize/1 itself. String values are
% binaries, as EJSON requires (an Erlang "string" would be a list of
% integers, i.e. a JSON array).

% {Field: Cond}, where Cond is a bare value or an operator object.
field_sel(Field, Cond) ->
    {[{Field, Cond}]}.

% {"$and": [...]}
and_sel(Args) ->
    {[{<<"$and">>, Args}]}.

% {"$or": [...]}
or_sel(Args) ->
    {[{<<"$or">>, Args}]}.

% {"$and": [{cars: {$eq: "2"}}, {age: {$gt: 10}}]}, normalized.
cars_and_age_selector() ->
    normalize(and_sel([
        field_sel(<<"cars">>, {[{<<"$eq">>, <<"2">>}]}),
        field_sel(<<"age">>, {[{<<"$gt">>, 10}]})
    ])).

% {A: "foo", "$or": [{B: "bar"}, {B: "baz"}], C: "qux"}, normalized.
% The three members form an implicit $and.
a_or_b_c_selector() ->
    normalize({[
        {<<"A">>, <<"foo">>},
        {<<"$or">>, [
            field_sel(<<"B">>, <<"bar">>),
            field_sel(<<"B">>, <<"baz">>)
        ]},
        {<<"C">>, <<"qux">>}
    ]}).


is_constant_field_basic_test() ->
    Selector = normalize(field_sel(<<"A">>, <<"foo">>)),
    ?assertEqual(true, is_constant_field(Selector, <<"A">>)).

is_constant_field_basic_two_test() ->
    ?assertEqual(true, is_constant_field(cars_and_age_selector(), <<"cars">>)).

is_constant_field_not_eq_test() ->
    ?assertEqual(false, is_constant_field(cars_and_age_selector(), <<"age">>)).

is_constant_field_missing_field_test() ->
    ?assertEqual(false, is_constant_field(cars_and_age_selector(), <<"wrong">>)).

is_constant_field_or_field_test() ->
    Normalized = normalize(or_sel([
        field_sel(<<"A">>, <<"foo">>),
        field_sel(<<"B">>, <<"foo">>)
    ])),
    ?assertEqual(false, is_constant_field(Normalized, <<"A">>)).

is_constant_field_empty_selector_test() ->
    Selector = normalize({[]}),
    ?assertEqual(false, is_constant_field(Selector, <<"wrong">>)).

is_constant_nested_and_test() ->
    Selector1 = and_sel([field_sel(<<"A">>, <<"foo">>)]),
    Selector2 = and_sel([field_sel(<<"B">>, {[{<<"$gt">>, 10}]})]),
    Normalized = normalize(and_sel([Selector1, Selector2])),
    ?assertEqual(true, is_constant_field(Normalized, <<"A">>)),
    ?assertEqual(false, is_constant_field(Normalized, <<"B">>)).

is_constant_combined_or_and_equals_test() ->
    Normalized = a_or_b_c_selector(),
    ?assertEqual(true, is_constant_field(Normalized, <<"C">>)),
    ?assertEqual(false, is_constant_field(Normalized, <<"B">>)).

has_required_fields_basic_test() ->
    Normalized = normalize(field_sel(<<"A">>, <<"foo">>)),
    ?assertEqual(true, has_required_fields(Normalized, [<<"A">>])).

has_required_fields_basic_failure_test() ->
    Normalized = normalize(field_sel(<<"A">>, <<"foo">>)),
    ?assertEqual(false, has_required_fields(Normalized, [<<"B">>])).

has_required_fields_empty_selector_test() ->
    Normalized = normalize({[]}),
    ?assertEqual(false, has_required_fields(Normalized, [<<"A">>])).

has_required_fields_exists_false_test() ->
    Normalized = normalize(field_sel(<<"A">>, {[{<<"$exists">>, false}]})),
    ?assertEqual(false, has_required_fields(Normalized, [<<"A">>])).

has_required_fields_and_true_test() ->
    Normalized = normalize(and_sel([
        field_sel(<<"A">>, <<"foo">>),
        field_sel(<<"B">>, <<"foo">>)
    ])),
    ?assertEqual(true, has_required_fields(Normalized, [<<"A">>])).

has_required_fields_nested_and_true_test() ->
    RequiredFields = [<<"A">>, <<"B">>],
    Selector1 = and_sel([field_sel(<<"A">>, <<"foo">>)]),
    Selector2 = and_sel([field_sel(<<"B">>, <<"foo">>)]),
    Normalized = normalize(and_sel([Selector1, Selector2])),
    ?assertEqual(true, has_required_fields(Normalized, RequiredFields)).

has_required_fields_and_false_test() ->
    RequiredFields = [<<"A">>, <<"C">>],
    Normalized = normalize(and_sel([
        field_sel(<<"A">>, <<"foo">>),
        field_sel(<<"B">>, <<"foo">>)
    ])),
    ?assertEqual(false, has_required_fields(Normalized, RequiredFields)).

has_required_fields_or_false_test() ->
    Normalized = normalize(or_sel([
        field_sel(<<"A">>, <<"foo">>),
        field_sel(<<"B">>, <<"foo">>)
    ])),
    ?assertEqual(false, has_required_fields(Normalized, [<<"A">>])).

has_required_fields_or_true_test() ->
    RequiredFields = [<<"A">>, <<"B">>, <<"C">>],
    Normalized = a_or_b_c_selector(),
    ?assertEqual(true, has_required_fields(Normalized, RequiredFields)).

has_required_fields_and_nested_or_true_test() ->
    RequiredFields = [<<"A">>, <<"B">>],
    Selector1 = and_sel([field_sel(<<"A">>, <<"foo">>)]),
    Selector2 = or_sel([
        field_sel(<<"B">>, <<"foo">>),
        field_sel(<<"B">>, <<"foo">>)
    ]),
    Normalized = normalize(and_sel([Selector1, Selector2])),
    ?assertEqual(true, has_required_fields(Normalized, RequiredFields)),

    % The order of the $and members must not matter.
    NormalizedReverse = normalize(and_sel([Selector2, Selector1])),
    ?assertEqual(true, has_required_fields(NormalizedReverse, RequiredFields)).

has_required_fields_and_nested_or_false_test() ->
    RequiredFields = [<<"A">>, <<"B">>],
    Selector1 = and_sel([field_sel(<<"A">>, <<"foo">>)]),
    Selector2 = or_sel([
        field_sel(<<"A">>, <<"foo">>),
        field_sel(<<"B">>, <<"foo">>)
    ]),
    Normalized = normalize(and_sel([Selector1, Selector2])),
    ?assertEqual(false, has_required_fields(Normalized, RequiredFields)),

    % The order of the $and members must not matter.
    NormalizedReverse = normalize(and_sel([Selector2, Selector1])),
    ?assertEqual(false, has_required_fields(NormalizedReverse, RequiredFields)).

has_required_fields_or_nested_and_true_test() ->
    Selector1 = and_sel([field_sel(<<"A">>, <<"foo">>)]),
    Selector2 = and_sel([field_sel(<<"A">>, <<"foo">>)]),
    Normalized = normalize(or_sel([Selector1, Selector2])),
    ?assertEqual(true, has_required_fields(Normalized, [<<"A">>])).

has_required_fields_or_nested_or_true_test() ->
    Selector1 = or_sel([field_sel(<<"A">>, <<"foo">>)]),
    Selector2 = or_sel([field_sel(<<"A">>, <<"bar">>)]),
    Normalized = normalize(or_sel([Selector1, Selector2])),
    ?assertEqual(true, has_required_fields(Normalized, [<<"A">>])).

has_required_fields_or_nested_or_false_test() ->
    Selector1 = or_sel([field_sel(<<"A">>, <<"foo">>)]),
    Selector2 = or_sel([field_sel(<<"B">>, <<"bar">>)]),
    Normalized = normalize(or_sel([Selector1, Selector2])),
    ?assertEqual(false, has_required_fields(Normalized, [<<"A">>])).

-endif.
1025