1% Licensed under the Apache License, Version 2.0 (the "License"); you may not 2% use this file except in compliance with the License. You may obtain a copy of 3% the License at 4% 5% http://www.apache.org/licenses/LICENSE-2.0 6% 7% Unless required by applicable law or agreed to in writing, software 8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 10% License for the specific language governing permissions and limitations under 11% the License. 12 13 14%% -*- erlang-indent-level: 4;indent-tabs-mode: nil -*- 15 16-module(dreyfus_fabric_group2). 17 18-include("dreyfus.hrl"). 19-include_lib("mem3/include/mem3.hrl"). 20-include_lib("couch/include/couch_db.hrl"). 21 22-export([go/4]). 23 24-record(state, { 25 limit, 26 sort, 27 total_hits, 28 total_grouped_hits, 29 top_groups, 30 counters, 31 start_args, 32 replacements, 33 ring_opts 34}). 35 36go(DbName, GroupId, IndexName, QueryArgs) when is_binary(GroupId) -> 37 {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), 38 dreyfus_util:maybe_deny_index(DbName, GroupId, IndexName), 39 go(DbName, DDoc, IndexName, QueryArgs); 40 41go(DbName, DDoc, IndexName, #index_query_args{}=QueryArgs) -> 42 DesignName = dreyfus_util:get_design_docid(DDoc), 43 dreyfus_util:maybe_deny_index(DbName, DesignName, IndexName), 44 Shards = dreyfus_util:get_shards(DbName, QueryArgs), 45 RingOpts = dreyfus_util:get_ring_opts(QueryArgs, Shards), 46 Workers = fabric_util:submit_jobs(Shards, dreyfus_rpc, group2, 47 [DDoc, IndexName, dreyfus_util:export(QueryArgs)]), 48 Replacements = fabric_view:get_shard_replacements(DbName, Workers), 49 Counters = fabric_dict:init(Workers, nil), 50 RexiMon = fabric_util:create_monitors(Workers), 51 State = #state{ 52 limit = QueryArgs#index_query_args.limit, 53 sort = QueryArgs#index_query_args.sort, 54 total_hits = 0, 55 total_grouped_hits = 0, 56 top_groups = [], 57 counters = Counters, 58 start_args = [DDoc, IndexName, QueryArgs], 59 replacements = Replacements, 60 ring_opts = RingOpts 61 }, 62 try 63 rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, 64 State, infinity, 1000 * 60 * 60) 65 after 66 rexi_monitor:stop(RexiMon), 67 fabric_util:cleanup(Workers) 68 end; 69go(DbName, DDoc, IndexName, OldArgs) -> 70 go(DbName, DDoc, IndexName, dreyfus_util:upgrade(OldArgs)). 71 72 73handle_message({ok, NewTotalHits, NewTotalGroupedHits, NewTopGroups}, 74 Shard, State0) -> 75 State = upgrade_state(State0), 76 #state{total_hits=TotalHits, total_grouped_hits=TotalGroupedHits, 77 top_groups=TopGroups, limit=Limit, sort=Sort} = State, 78 case fabric_dict:lookup_element(Shard, State#state.counters) of 79 undefined -> 80 %% already heard from someone else in this range 81 {ok, State}; 82 nil -> 83 C1 = fabric_dict:store(Shard, ok, State#state.counters), 84 C2 = fabric_view:remove_overlapping_shards(Shard, C1), 85 MergedTotalHits = NewTotalHits + TotalHits, 86 MergedTotalGroupedHits = NewTotalGroupedHits + TotalGroupedHits, 87 Sortable = make_sortable(Shard, NewTopGroups), 88 MergedTopGroups = merge_top_groups(TopGroups, Sortable, Limit, Sort), 89 State1 = State#state{ 90 counters=C2, 91 total_hits=MergedTotalHits, 92 total_grouped_hits=MergedTotalGroupedHits, 93 top_groups=MergedTopGroups 94 }, 95 case fabric_dict:any(nil, C2) of 96 true -> 97 {ok, State1}; 98 false -> 99 {stop, {MergedTotalHits, MergedTotalGroupedHits, 100 remove_sortable(MergedTopGroups)}} 101 end 102 end; 103 104handle_message(Error, Worker, State0) -> 105 State = upgrade_state(State0), 106 case dreyfus_fabric:handle_error_message(Error, Worker, 107 State#state.counters, State#state.replacements, 108 group2, State#state.start_args, State#state.ring_opts) of 109 {ok, Counters} -> 110 {ok, State#state{counters=Counters}}; 111 {new_refs, NewRefs, NewCounters, NewReplacements} -> 112 NewState = State#state{ 113 counters = NewCounters, 114 replacements = NewReplacements 115 }, 116 {new_refs, NewRefs, NewState}; 117 Else -> 118 Else 119 end. 120 121merge_top_groups([], TopGroups, _Limit, _Sort) -> 122 TopGroups; 123merge_top_groups(TopGroupsA, TopGroupsB, Limit, Sort) -> 124 lists:zipwith(fun(A,B) -> merge_top_group(A, B, Limit, Sort) end, 125 TopGroupsA, 126 TopGroupsB). 127 128merge_top_group({Name, TotalA, HitsA}, {Name, TotalB, HitsB}, Limit, Sort) -> 129 MergedHits = lists:sublist(dreyfus_util:sort(Sort, HitsA ++ HitsB), Limit), 130 {Name, TotalA + TotalB, MergedHits}. 131 132 133make_sortable(Shard, TopGroups) -> 134 [make_sortable_group(Shard, TopGroup) || TopGroup <- TopGroups]. 135 136make_sortable_group(Shard, {Name, TotalHits, Hits}) -> 137 {Name, TotalHits, [make_sortable_hit(Shard, Hit) || Hit <- Hits]}. 138 139make_sortable_hit(Shard, Hit) -> 140 #sortable{item=Hit, order=Hit#hit.order, shard=Shard}. 141 142remove_sortable(SortableGroups) -> 143 [remove_sortable_group(G) || G <- SortableGroups]. 144 145remove_sortable_group({Name, TotalHits, SortableHits}) -> 146 {Name, TotalHits, [remove_sortable_hit(H) || H <- SortableHits]}. 147 148remove_sortable_hit(SortableHit) -> 149 SortableHit#sortable.item. 150 151upgrade_state({state, Limit, Sort, TotalHits, TotalGroupedHits, 152 TopGroups, Counters}) -> 153 #state{limit = Limit, sort = Sort, total_hits = TotalHits, 154 total_grouped_hits = TotalGroupedHits, 155 top_groups = TopGroups, counters = Counters, 156 replacements = []}; 157upgrade_state(#state{} = State) -> 158 State. 159