1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5% http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13
14%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*-
15
16-module(dreyfus_fabric_group1).
17
18-include("dreyfus.hrl").
19-include_lib("mem3/include/mem3.hrl").
20-include_lib("couch/include/couch_db.hrl").
21
22-export([go/4]).
23
%% Accumulator threaded through handle_message/3 while shard replies arrive.
%% upgrade_state/1 converts the older five-element state tuple into this record.
-record(state, {
    %% grouping limit taken from the query args; caps the merged group list
    limit,
    %% grouping sort taken from the query args; used to rank groups
    sort,
    %% merged #sortable{} entries collected so far
    top_groups,
    %% fabric_dict keyed by worker: nil until that worker replied, then ok
    counters,
    %% [DDoc, IndexName, QueryArgs], handed to the error handler
    start_args,
    %% result of fabric_view:get_shard_replacements/2 for the workers
    replacements,
    %% result of dreyfus_util:get_ring_opts/2
    ring_opts
}).
33
%% Run the first pass of a grouped search (the group1 RPC) on the shards of
%% DbName and fold the shard replies with handle_message/3.
%%
%% The second argument is either the design document name as a binary, given
%% without the "_design/" prefix (the document is then opened first), or the
%% design document itself. QueryArgs is either an #index_query_args{} record
%% or any other term, which is passed through dreyfus_util:upgrade/1 first.
%%
%% Returns whatever rexi_utils:recv/6 returns. The workers are cleaned up and
%% the rexi monitor is stopped on every exit path once they have been started.
go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) ->
    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
    dreyfus_util:maybe_deny_index(DbName, GroupId, IndexName),
    go(DbName, DDoc, IndexName, QueryArgs);
go(DbName, DDoc, IndexName, #index_query_args{} = QueryArgs) ->
    DesignName = dreyfus_util:get_design_docid(DDoc),
    dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName),
    Shards = dreyfus_util:get_shards(DbName, QueryArgs),
    RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards),
    %% Read the grouping options before any worker is started, so a malformed
    %% grouping value fails here rather than after jobs have been submitted.
    Grouping = QueryArgs#index_query_args.grouping,
    Limit = Grouping#grouping.limit,
    Sort = Grouping#grouping.sort,
    Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, group1, [
        DDoc,
        IndexName,
        dreyfus_util:export(QueryArgs)
    ]),
    RexiMon = fabric_util:create_monitors(Workers),
    %% Everything after the workers exist runs inside the try, so a failure
    %% while building the state still reaches the cleanup in the after clause.
    try
        Replacements = fabric_view:get_shard_replacements(DbName, Workers),
        Counters = fabric_dict:init(Workers, nil),
        State = #state{
            limit = Limit,
            sort = Sort,
            top_groups = [],
            counters = Counters,
            start_args = [DDoc, IndexName, QueryArgs],
            replacements = Replacements,
            ring_opts = RingOpts
        },
        %% NOTE(review): the last two arguments look like a global timeout and
        %% a per-message timeout of one hour in milliseconds - confirm against
        %% rexi_utils:recv/6.
        rexi_utils:recv(
            Workers,
            #shard.ref,
            fun handle_message/3,
            State,
            infinity,
            1000 * 60 * 60
        )
    after
        rexi_monitor:stop(RexiMon),
        fabric_util:cleanup(Workers)
    end;
go(DbName, DDoc, IndexName, OldArgs) ->
    go(DbName, DDoc, IndexName, dreyfus_util:upgrade(OldArgs)).
67
%% Fold one worker message into the accumulated state.
%%
%% {ok, Groups} replies are merged into the running top-group list; once no
%% counter is left at nil the fold stops with the plain group list. Any other
%% message is delegated to dreyfus_fabric:handle_error_message/7.
handle_message({ok, ShardTopGroups}, Shard, State0) ->
    #state{
        counters = Counters0,
        top_groups = Accumulated,
        limit = Limit,
        sort = Sort
    } = State = upgrade_state(State0),
    case fabric_dict:lookup_element(Shard, Counters0) of
        undefined ->
            %% this range has already been answered by another worker
            {ok, State};
        nil ->
            Marked = fabric_dict:store(Shard, ok, Counters0),
            Counters = fabric_view:remove_overlapping_shards(Shard, Marked),
            Incoming = make_sortable(Shard, ShardTopGroups),
            Merged = merge_top_groups(Accumulated, Incoming, Limit, Sort),
            case fabric_dict:any(nil, Counters) of
                true ->
                    %% at least one worker has not replied yet
                    {ok, State#state{counters = Counters, top_groups = Merged}};
                false ->
                    {stop, remove_sortable(Merged)}
            end
    end;
handle_message(Error, Worker, State0) ->
    #state{
        counters = Counters0,
        replacements = Replacements0,
        start_args = StartArgs,
        ring_opts = RingOpts
    } = State = upgrade_state(State0),
    Outcome = dreyfus_fabric:handle_error_message(
        Error,
        Worker,
        Counters0,
        Replacements0,
        group1,
        StartArgs,
        RingOpts
    ),
    case Outcome of
        {ok, Counters} ->
            {ok, State#state{counters = Counters}};
        {new_refs, Refs, Counters, Replacements} ->
            Updated = State#state{
                counters = Counters,
                replacements = Replacements
            },
            {new_refs, Refs, Updated};
        Other ->
            %% anything else is returned to the caller unchanged
            Other
    end.
107
%% Combine two lists of #sortable{} groups. For every distinct group name the
%% entries sharing that name are reduced to one via merge_top_group/2; the
%% survivors are ordered with dreyfus_util:sort/2 and cut to at most Limit.
merge_top_groups(TopGroupsA, TopGroupsB, Limit, Sort) ->
    Combined = TopGroupsA ++ TopGroupsB,
    Names = lists:usort([Name || #sortable{item = {Name, _}} <- Combined]),
    PickBest = fun(Name) ->
        SameName = [S || #sortable{item = {N, _}} = S <- Combined, N =:= Name],
        merge_top_group(Sort, SameName)
    end,
    Ranked = dreyfus_util:sort(Sort, lists:map(PickBest, Names)),
    lists:sublist(Ranked, Limit).
113
%% Reduce the entries for a single group name to one entry. A lone entry is
%% returned as is; of two entries the one that dreyfus_util:sort/2 places
%% first is kept. Any other list length is not handled and raises
%% function_clause.
merge_top_group(Sort, [_First, _Second] = Pair) ->
    Ordered = dreyfus_util:sort(Sort, Pair),
    hd(Ordered);
merge_top_group(_Sort, [Only]) ->
    Only.
118
%% Wrap each {Name, Order} group reported by Shard in a #sortable{} record
%% that carries the whole group as item, its Order as the sort key and the
%% originating shard. Elements that are not two-element tuples are skipped.
make_sortable(Shard, TopGroups) ->
    Wrap = fun
        ({_Name, Order} = Group) ->
            {true, #sortable{item = Group, order = Order, shard = Shard}};
        (_Other) ->
            false
    end,
    lists:filtermap(Wrap, TopGroups).
121
%% Strip the #sortable{} wrapper again, returning only the wrapped items in
%% their existing order. Elements that are not #sortable{} records are skipped.
remove_sortable(Sortables) ->
    Unwrap = fun
        (#sortable{item = Item}) -> {true, Item};
        (_Other) -> false
    end,
    lists:filtermap(Unwrap, Sortables).
124
%% Normalise the accumulator to the current #state{} record. A current record
%% passes through untouched; the older five-element state tuple is converted,
%% with replacements set to [] and the remaining new fields left at their
%% record defaults.
upgrade_state(#state{} = Current) ->
    Current;
upgrade_state({state, Limit, Sort, TopGroups, Counters}) ->
    #state{
        limit = Limit,
        sort = Sort,
        top_groups = TopGroups,
        counters = Counters,
        replacements = []
    }.
130