1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5% http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13
14%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
15
16-module(dreyfus_fabric_group2).
17
18-include("dreyfus.hrl").
19-include_lib("mem3/include/mem3.hrl").
20-include_lib("couch/include/couch_db.hrl").
21
22-export([go/4]).
23
%% Accumulator threaded through handle_message/3 while the per-shard group2
%% responses are collected. Field order is significant: upgrade_state/1
%% matches an older, shorter layout of this tuple that ends at `counters`.
-record(state, {
    %% Query parameters, copied from the #index_query_args{} in go/4.
    limit,
    sort,

    %% Results merged across the shards that have answered so far.
    total_hits,
    total_grouped_hits,
    top_groups,

    %% Worker bookkeeping: per-worker status (nil until a worker answers),
    %% the arguments the job was started with, replacement shards and the
    %% ring options, all handed to dreyfus_fabric:handle_error_message/7.
    counters,
    start_args,
    replacements,
    ring_opts
}).
35
%% Run a group2 search for IndexName on the shards of DbName and fold the
%% per-shard answers together with handle_message/3.
%%
%% The second argument is either a design document or, as a binary, the
%% design document name without the "_design/" prefix; in the latter case
%% the document is opened first. The last argument is an
%% #index_query_args{} record; anything else is passed through
%% dreyfus_util:upgrade/1 and the call is retried.
%%
%% Returns whatever rexi_utils:recv/6 returns for the fold.
go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) ->
    DDocId = <<"_design/", GroupId/binary>>,
    {ok, DDoc} = fabric:open_doc(DbName, DDocId, []),
    dreyfus_util:maybe_deny_index(DbName, GroupId, IndexName),
    go(DbName, DDoc, IndexName, QueryArgs);
go(DbName, DDoc, IndexName,
   #index_query_args{limit = Limit, sort = Sort} = QueryArgs) ->
    DesignName = dreyfus_util:get_design_docid(DDoc),
    dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName),
    Shards = dreyfus_util:get_shards(DbName, QueryArgs),
    RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards),
    RpcArgs = [DDoc, IndexName, dreyfus_util:export(QueryArgs)],
    Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, group2, RpcArgs),
    Replacements = fabric_view:get_shard_replacements(DbName, Workers),
    %% Every worker starts out as `nil`, i.e. "no answer yet".
    Counters = fabric_dict:init(Workers, nil),
    RexiMon = fabric_util:create_monitors(Workers),
    InitialState = #state{
        limit = Limit,
        sort = Sort,
        total_hits = 0,
        total_grouped_hits = 0,
        top_groups = [],
        counters = Counters,
        start_args = [DDoc, IndexName, QueryArgs],
        replacements = Replacements,
        ring_opts = RingOpts
    },
    %% NOTE(review): presumably `infinity` is the overall timeout and the
    %% one-hour value the per-message timeout -- confirm against
    %% rexi_utils:recv/6.
    OneHourMs = 1000 * 60 * 60,
    try
        rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
                        InitialState, infinity, OneHourMs)
    after
        %% Always stop the monitor and clean up the workers, whether the
        %% receive loop returned or raised.
        rexi_monitor:stop(RexiMon),
        fabric_util:cleanup(Workers)
    end;
go(DbName, DDoc, IndexName, LegacyArgs) ->
    go(DbName, DDoc, IndexName, dreyfus_util:upgrade(LegacyArgs)).
71
72
%% Fold callback given to rexi_utils:recv/6 by go/4.
%%
%% First clause: a worker answered with
%% {ok, TotalHits, TotalGroupedHits, TopGroups}. If that worker is still
%% marked `nil` in the counters, its numbers are added to the running totals
%% and its groups are merged into the accumulated top groups. Once no
%% counter is `nil` any more the fold stops with
%% {TotalHits, TotalGroupedHits, TopGroups}; otherwise it continues with the
%% updated state.
%%
%% Second clause: any other message is delegated to
%% dreyfus_fabric:handle_error_message/7, and the counters/replacements it
%% returns are stored back into the state.
handle_message({ok, ShardHits, ShardGroupedHits, ShardGroups},
               Shard, State0) ->
    State = upgrade_state(State0),
    #state{
        counters = Counters0,
        total_hits = HitsSoFar,
        total_grouped_hits = GroupedHitsSoFar,
        top_groups = GroupsSoFar,
        limit = Limit,
        sort = Sort
    } = State,
    case fabric_dict:lookup_element(Shard, Counters0) of
        undefined ->
            %% already heard from someone else in this range
            {ok, State};
        nil ->
            Marked = fabric_dict:store(Shard, ok, Counters0),
            Counters = fabric_view:remove_overlapping_shards(Shard, Marked),
            TotalHits = ShardHits + HitsSoFar,
            TotalGroupedHits = ShardGroupedHits + GroupedHitsSoFar,
            Sortable = make_sortable(Shard, ShardGroups),
            TopGroups = merge_top_groups(GroupsSoFar, Sortable, Limit, Sort),
            case fabric_dict:any(nil, Counters) of
                false ->
                    %% No worker is still pending: hand back plain hits.
                    {stop, {TotalHits, TotalGroupedHits,
                            remove_sortable(TopGroups)}};
                true ->
                    {ok, State#state{
                        counters = Counters,
                        total_hits = TotalHits,
                        total_grouped_hits = TotalGroupedHits,
                        top_groups = TopGroups
                    }}
            end
    end;

handle_message(Error, Worker, State0) ->
    State = upgrade_state(State0),
    #state{
        counters = Counters,
        replacements = Replacements,
        start_args = StartArgs,
        ring_opts = RingOpts
    } = State,
    Outcome = dreyfus_fabric:handle_error_message(
        Error, Worker, Counters, Replacements, group2, StartArgs, RingOpts),
    case Outcome of
        {new_refs, NewRefs, NewCounters, NewReplacements} ->
            {new_refs, NewRefs, State#state{
                counters = NewCounters,
                replacements = NewReplacements
            }};
        {ok, NewCounters} ->
            {ok, State#state{counters = NewCounters}};
        Other ->
            %% Anything else is returned to the receive loop untouched.
            Other
    end.
120
%% Merge the groups reported by one shard into the accumulated groups.
%% With nothing accumulated yet the shard's groups are taken as they are;
%% otherwise the two lists are merged pairwise, position by position, so
%% they must have the same length.
merge_top_groups([], Incoming, _Limit, _Sort) ->
    Incoming;
merge_top_groups(Accumulated, Incoming, Limit, Sort) ->
    MergePair = fun(GroupA, GroupB) ->
        merge_top_group(GroupA, GroupB, Limit, Sort)
    end,
    lists:zipwith(MergePair, Accumulated, Incoming).
127
%% Merge two {Name, Total, Hits} entries for the same group (the names must
%% match): the totals are added, and the combined hits are ordered with
%% dreyfus_util:sort/2 and cut down to at most Limit entries.
merge_top_group({Name, TotalA, HitsA}, {Name, TotalB, HitsB}, Limit, Sort) ->
    Ordered = dreyfus_util:sort(Sort, HitsA ++ HitsB),
    TopHits = lists:sublist(Ordered, Limit),
    {Name, TotalA + TotalB, TopHits}.
131
132
%% Wrap every hit of every group in a #sortable{} tagged with Shard.
make_sortable(Shard, TopGroups) ->
    lists:map(fun(Group) -> make_sortable_group(Shard, Group) end, TopGroups).
135
%% Wrap the hits of a single {Name, TotalHits, Hits} group in #sortable{}
%% records; the name and total are passed through unchanged.
make_sortable_group(Shard, {Name, TotalHits, Hits}) ->
    Wrapped = lists:map(fun(Hit) -> make_sortable_hit(Shard, Hit) end, Hits),
    {Name, TotalHits, Wrapped}.
138
%% Build the #sortable{} wrapper for one #hit{}: it carries the hit itself,
%% the hit's `order` value and the shard the hit came from.
make_sortable_hit(Shard, Hit) ->
    Order = Hit#hit.order,
    #sortable{shard = Shard, order = Order, item = Hit}.
141
%% Inverse of make_sortable/2: unwrap the hits of every group.
remove_sortable(SortableGroups) ->
    lists:map(fun remove_sortable_group/1, SortableGroups).
144
%% Unwrap the #sortable{} hits of a single {Name, TotalHits, Hits} group;
%% the name and total are passed through unchanged.
remove_sortable_group({Name, TotalHits, SortableHits}) ->
    Hits = lists:map(fun remove_sortable_hit/1, SortableHits),
    {Name, TotalHits, Hits}.
147
%% Return the original hit stored in the `item` field of a #sortable{}.
remove_sortable_hit(Sortable) ->
    Hit = Sortable#sortable.item,
    Hit.
150
%% Bring an accumulator to the current #state{} layout.
%%
%% A current record is returned as is. The older seven-element `state`
%% tuple (which ends at the counters) is converted field by field, with
%% `replacements` set to [] and the remaining fields left at their record
%% default of `undefined`.
upgrade_state(#state{} = Current) ->
    Current;
upgrade_state({state, Limit, Sort, TotalHits, TotalGroupedHits,
               TopGroups, Counters}) ->
    #state{
        limit = Limit,
        sort = Sort,
        total_hits = TotalHits,
        total_grouped_hits = TotalGroupedHits,
        top_groups = TopGroups,
        counters = Counters,
        replacements = []
    }.
159