1% Licensed under the Apache License, Version 2.0 (the "License"); you may not 2% use this file except in compliance with the License. You may obtain a copy of 3% the License at 4% 5% http://www.apache.org/licenses/LICENSE-2.0 6% 7% Unless required by applicable law or agreed to in writing, software 8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 10% License for the specific language governing permissions and limitations under 11% the License. 12 13 14%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- 15 16-module(dreyfus_fabric_group1). 17 18-include("dreyfus.hrl"). 19-include_lib("mem3/include/mem3.hrl"). 20-include_lib("couch/include/couch_db.hrl"). 21 22-export([go/4]). 23 24-record(state, { 25 limit, 26 sort, 27 top_groups, 28 counters, 29 start_args, 30 replacements, 31 ring_opts 32}). 33 34go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) -> 35 {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), 36 dreyfus_util:maybe_deny_index(DbName, GroupId, IndexName), 37 go(DbName, DDoc, IndexName, QueryArgs); 38 39go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> 40 DesignName = dreyfus_util:get_design_docid(DDoc), 41 dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName), 42 Shards = dreyfus_util:get_shards(DbName, QueryArgs), 43 RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards), 44 Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, group1, [DDoc, 45 IndexName, dreyfus_util:export(QueryArgs)]), 46 Replacements = fabric_view:get_shard_replacements(DbName, Workers), 47 Counters = fabric_dict:init(Workers, nil), 48 RexiMon = fabric_util:create_monitors(Workers), 49 State = #state{ 50 limit = QueryArgs#index_query_args.grouping#grouping.limit, 51 sort = QueryArgs#index_query_args.grouping#grouping.sort, 52 top_groups = [], 53 counters = Counters, 54 start_args = [DDoc, IndexName, QueryArgs], 55 replacements = Replacements, 56 ring_opts = RingOpts 57 }, 58 try 59 rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, 60 State, infinity, 1000 * 60 * 60) 61 after 62 rexi_monitor:stop(RexiMon), 63 fabric_util:cleanup(Workers) 64 end; 65go(DbName, DDoc, IndexName, OldArgs) -> 66 go(DbName, DDoc, IndexName, dreyfus_util:upgrade(OldArgs)). 67 68handle_message({ok, NewTopGroups}, Shard, State0) -> 69 State = upgrade_state(State0), 70 #state{top_groups=TopGroups, limit=Limit, sort=Sort} = State, 71 case fabric_dict:lookup_element(Shard, State#state.counters) of 72 undefined -> 73 %% already heard from someone else in this range 74 {ok, State}; 75 nil -> 76 C1 = fabric_dict:store(Shard, ok, State#state.counters), 77 C2 = fabric_view:remove_overlapping_shards(Shard, C1), 78 MergedTopGroups = merge_top_groups(TopGroups, make_sortable(Shard, NewTopGroups), Limit, Sort), 79 State1 = State#state{ 80 counters=C2, 81 top_groups=MergedTopGroups 82 }, 83 case fabric_dict:any(nil, C2) of 84 true -> 85 {ok, State1}; 86 false -> 87 {stop, remove_sortable(MergedTopGroups)} 88 end 89 end; 90 91handle_message(Error, Worker, State0) -> 92 State = upgrade_state(State0), 93 case dreyfus_fabric:handle_error_message(Error, Worker, 94 State#state.counters, State#state.replacements, 95 group1, State#state.start_args, State#state.ring_opts) of 96 {ok, Counters} -> 97 {ok, State#state{counters=Counters}}; 98 {new_refs, NewRefs, NewCounters, NewReplacements} -> 99 NewState = State#state{ 100 counters = NewCounters, 101 replacements = NewReplacements 102 }, 103 {new_refs, NewRefs, NewState}; 104 Else -> 105 Else 106 end. 107 108merge_top_groups(TopGroupsA, TopGroupsB, Limit, Sort) -> 109 MergedGroups0 = TopGroupsA ++ TopGroupsB, 110 GNs = lists:usort([N || #sortable{item={N,_}} <- MergedGroups0]), 111 MergedGroups = [merge_top_group(Sort, [S || #sortable{item={N,_}}=S <- MergedGroups0, N =:= GN]) || GN <- GNs], 112 lists:sublist(dreyfus_util:sort(Sort, MergedGroups), Limit). 113 114merge_top_group(_Sort, [Group]) -> 115 Group; 116merge_top_group(Sort, [_, _] = Groups) -> 117 hd(dreyfus_util:sort(Sort, Groups)). 118 119make_sortable(Shard, TopGroups) -> 120 [#sortable{item=G, order=Order, shard=Shard} || {_Name, Order}=G <- TopGroups]. 121 122remove_sortable(Sortables) -> 123 [Item || #sortable{item=Item} <- Sortables]. 124 125upgrade_state({state, Limit, Sort, TopGroups, Counters}) -> 126 #state{limit=Limit, sort=Sort, top_groups=TopGroups, counters=Counters, 127 replacements=[]}; 128upgrade_state(#state{}=State) -> 129 State. 130