1% Licensed under the Apache License, Version 2.0 (the "License"); you may not 2% use this file except in compliance with the License. You may obtain a copy of 3% the License at 4% 5% http://www.apache.org/licenses/LICENSE-2.0 6% 7% Unless required by applicable law or agreed to in writing, software 8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 10% License for the specific language governing permissions and limitations under 11% the License. 12 13-module(fabric_view_reduce). 14 15-export([go/7]). 16 17-include_lib("fabric/include/fabric.hrl"). 18-include_lib("mem3/include/mem3.hrl"). 19-include_lib("couch/include/couch_db.hrl"). 20-include_lib("couch_mrview/include/couch_mrview.hrl"). 21 22go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) -> 23 {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []), 24 go(DbName, DDoc, View, Args, Callback, Acc0, VInfo); 25 26go(Db, DDoc, VName, Args, Callback, Acc, VInfo) -> 27 DbName = fabric:dbname(Db), 28 {Shards, RingOpts} = fabric_view:get_shards(Db, Args), 29 {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args), 30 DocIdAndRev = fabric_util:doc_id_and_rev(DDoc), 31 RPCArgs = [DocIdAndRev, VName, WorkerArgs], 32 fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args), 33 Repls = fabric_ring:get_shard_replacements(DbName, Shards), 34 StartFun = fun(Shard) -> 35 hd(fabric_util:submit_jobs([Shard], fabric_rpc, reduce_view, RPCArgs)) 36 end, 37 Workers0 = fabric_util:submit_jobs(Shards,fabric_rpc,reduce_view,RPCArgs), 38 RexiMon = fabric_util:create_monitors(Workers0), 39 try 40 case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls, 41 RingOpts) of 42 {ok, ddoc_updated} -> 43 Callback({error, ddoc_updated}, Acc); 44 {ok, Workers} -> 45 try 46 go2(DbName, Workers, VInfo, CoordArgs, Callback, Acc) 47 after 48 fabric_streams:cleanup(Workers) 49 end; 50 {timeout, NewState} -> 51 DefunctWorkers = fabric_util:remove_done_workers( 52 NewState#stream_acc.workers, 53 waiting 54 ), 55 fabric_util:log_timeout( 56 DefunctWorkers, 57 "reduce_view" 58 ), 59 Callback({error, timeout}, Acc); 60 {error, Error} -> 61 Callback({error, Error}, Acc) 62 end 63 after 64 rexi_monitor:stop(RexiMon) 65 end. 66 67go2(DbName, Workers, {red, {_, Lang, View}, _}=VInfo, Args, Callback, Acc0) -> 68 #mrargs{limit = Limit, skip = Skip, keys = Keys, update_seq = UpdateSeq} = Args, 69 RedSrc = couch_mrview_util:extract_view_reduce(VInfo), 70 OsProc = case os_proc_needed(RedSrc) of 71 true -> couch_query_servers:get_os_process(Lang); 72 _ -> nil 73 end, 74 State = #collector{ 75 db_name = DbName, 76 query_args = Args, 77 callback = Callback, 78 counters = fabric_dict:init(Workers, 0), 79 keys = Keys, 80 skip = Skip, 81 limit = Limit, 82 lang = Lang, 83 os_proc = OsProc, 84 reducer = RedSrc, 85 collation = couch_util:get_value(<<"collation">>, View#mrview.options), 86 rows = dict:new(), 87 user_acc = Acc0, 88 update_seq = case UpdateSeq of true -> []; false -> nil end 89 }, 90 try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, 91 State, fabric_util:view_timeout(Args), 1000 * 60 * 60) of 92 {ok, NewState} -> 93 {ok, NewState#collector.user_acc}; 94 {timeout, NewState} -> 95 Callback({error, timeout}, NewState#collector.user_acc); 96 {error, Resp} -> 97 {ok, Resp} 98 after 99 if OsProc == nil -> ok; true -> 100 catch couch_query_servers:ret_os_process(OsProc) 101 end 102 end. 103 104handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) -> 105 fabric_view:check_down_shards(State, NodeRef); 106 107handle_message({rexi_EXIT, Reason}, Worker, State) -> 108 fabric_view:handle_worker_exit(State, Worker, Reason); 109 110handle_message({meta, Meta0}, {Worker, From}, State) -> 111 Seq = couch_util:get_value(update_seq, Meta0, 0), 112 #collector{ 113 callback = Callback, 114 counters = Counters0, 115 user_acc = AccIn, 116 update_seq = UpdateSeq0 117 } = State, 118 % Assert that we don't have other messages from this 119 % worker when the total_and_offset message arrives. 120 0 = fabric_dict:lookup_element(Worker, Counters0), 121 rexi:stream_ack(From), 122 Counters1 = fabric_dict:update_counter(Worker, 1, Counters0), 123 UpdateSeq = case UpdateSeq0 of 124 nil -> nil; 125 _ -> [{Worker, Seq} | UpdateSeq0] 126 end, 127 case fabric_dict:any(0, Counters1) of 128 true -> 129 {ok, State#collector{ 130 counters = Counters1, 131 update_seq = UpdateSeq 132 }}; 133 false -> 134 Meta = case UpdateSeq of 135 nil -> 136 []; 137 _ -> 138 [{update_seq, fabric_view_changes:pack_seqs(UpdateSeq)}] 139 end, 140 {Go, Acc} = Callback({meta, Meta}, AccIn), 141 {Go, State#collector{ 142 counters = fabric_dict:decrement_all(Counters1), 143 user_acc = Acc 144 }} 145 end; 146 147handle_message(#view_row{key=Key} = Row, {Worker, From}, State) -> 148 #collector{counters = Counters0, rows = Rows0} = State, 149 true = fabric_dict:is_key(Worker, Counters0), 150 Rows = dict:append(Key, Row#view_row{worker={Worker, From}}, Rows0), 151 C1 = fabric_dict:update_counter(Worker, 1, Counters0), 152 State1 = State#collector{rows=Rows, counters=C1}, 153 fabric_view:maybe_send_row(State1); 154 155handle_message(complete, Worker, #collector{counters = Counters0} = State) -> 156 true = fabric_dict:is_key(Worker, Counters0), 157 C1 = fabric_dict:update_counter(Worker, 1, Counters0), 158 fabric_view:maybe_send_row(State#collector{counters = C1}); 159 160handle_message(ddoc_updated, _Worker, State) -> 161 {stop, State}. 162 163os_proc_needed(<<"_", _/binary>>) -> false; 164os_proc_needed(_) -> true. 165 166