1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(fabric_view_reduce).
14
15-export([go/7]).
16
17-include_lib("fabric/include/fabric.hrl").
18-include_lib("mem3/include/mem3.hrl").
19-include_lib("couch/include/couch_db.hrl").
20-include_lib("couch_mrview/include/couch_mrview.hrl").
21
22go(DbName, GroupId, View, Args, Callback, Acc0, VInfo) when is_binary(GroupId) ->
23    {ok, DDoc} = fabric:open_doc(DbName, <<"_design/", GroupId/binary>>, []),
24    go(DbName, DDoc, View, Args, Callback, Acc0, VInfo);
25
26go(Db, DDoc, VName, Args, Callback, Acc, VInfo) ->
27    DbName = fabric:dbname(Db),
28    {Shards, RingOpts} = fabric_view:get_shards(Db, Args),
29    {CoordArgs, WorkerArgs} = fabric_view:fix_skip_and_limit(Args),
30    DocIdAndRev = fabric_util:doc_id_and_rev(DDoc),
31    RPCArgs = [DocIdAndRev, VName, WorkerArgs],
32    fabric_view:maybe_update_others(DbName, DocIdAndRev, Shards, VName, Args),
33    Repls = fabric_ring:get_shard_replacements(DbName, Shards),
34    StartFun = fun(Shard) ->
35        hd(fabric_util:submit_jobs([Shard], fabric_rpc, reduce_view, RPCArgs))
36    end,
37    Workers0 = fabric_util:submit_jobs(Shards,fabric_rpc,reduce_view,RPCArgs),
38    RexiMon = fabric_util:create_monitors(Workers0),
39    try
40        case fabric_streams:start(Workers0, #shard.ref, StartFun, Repls,
41                RingOpts) of
42            {ok, ddoc_updated} ->
43                Callback({error, ddoc_updated}, Acc);
44            {ok, Workers} ->
45                try
46                    go2(DbName, Workers, VInfo, CoordArgs, Callback, Acc)
47                after
48                    fabric_streams:cleanup(Workers)
49                end;
50            {timeout, NewState} ->
51                DefunctWorkers = fabric_util:remove_done_workers(
52                    NewState#stream_acc.workers,
53                    waiting
54                ),
55                fabric_util:log_timeout(
56                    DefunctWorkers,
57                    "reduce_view"
58                ),
59                Callback({error, timeout}, Acc);
60            {error, Error} ->
61                Callback({error, Error}, Acc)
62        end
63    after
64        rexi_monitor:stop(RexiMon)
65    end.
66
67go2(DbName, Workers, {red, {_, Lang, View}, _}=VInfo, Args, Callback, Acc0) ->
68    #mrargs{limit = Limit, skip = Skip, keys = Keys, update_seq = UpdateSeq} = Args,
69    RedSrc = couch_mrview_util:extract_view_reduce(VInfo),
70    OsProc = case os_proc_needed(RedSrc) of
71        true -> couch_query_servers:get_os_process(Lang);
72        _ -> nil
73    end,
74    State = #collector{
75        db_name = DbName,
76        query_args = Args,
77        callback = Callback,
78        counters = fabric_dict:init(Workers, 0),
79        keys = Keys,
80        skip = Skip,
81        limit = Limit,
82        lang = Lang,
83        os_proc = OsProc,
84        reducer = RedSrc,
85        collation = couch_util:get_value(<<"collation">>, View#mrview.options),
86        rows = dict:new(),
87        user_acc = Acc0,
88        update_seq = case UpdateSeq of true -> []; false -> nil end
89    },
90    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3,
91        State, fabric_util:view_timeout(Args), 1000 * 60 * 60) of
92    {ok, NewState} ->
93        {ok, NewState#collector.user_acc};
94    {timeout, NewState} ->
95        Callback({error, timeout}, NewState#collector.user_acc);
96    {error, Resp} ->
97        {ok, Resp}
98    after
99        if OsProc == nil -> ok; true ->
100            catch couch_query_servers:ret_os_process(OsProc)
101        end
102    end.
103
104handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _, State) ->
105    fabric_view:check_down_shards(State, NodeRef);
106
107handle_message({rexi_EXIT, Reason}, Worker, State) ->
108    fabric_view:handle_worker_exit(State, Worker, Reason);
109
110handle_message({meta, Meta0}, {Worker, From}, State) ->
111    Seq = couch_util:get_value(update_seq, Meta0, 0),
112    #collector{
113        callback = Callback,
114        counters = Counters0,
115        user_acc = AccIn,
116        update_seq = UpdateSeq0
117    } = State,
118    % Assert that we don't have other messages from this
119    % worker when the total_and_offset message arrives.
120    0 = fabric_dict:lookup_element(Worker, Counters0),
121    rexi:stream_ack(From),
122    Counters1 = fabric_dict:update_counter(Worker, 1, Counters0),
123    UpdateSeq = case UpdateSeq0 of
124        nil -> nil;
125        _   -> [{Worker, Seq} | UpdateSeq0]
126    end,
127    case fabric_dict:any(0, Counters1) of
128    true ->
129        {ok, State#collector{
130            counters = Counters1,
131            update_seq = UpdateSeq
132        }};
133    false ->
134        Meta = case UpdateSeq of
135            nil ->
136                [];
137            _ ->
138                [{update_seq, fabric_view_changes:pack_seqs(UpdateSeq)}]
139        end,
140        {Go, Acc} = Callback({meta, Meta}, AccIn),
141        {Go, State#collector{
142            counters = fabric_dict:decrement_all(Counters1),
143            user_acc = Acc
144        }}
145    end;
146
147handle_message(#view_row{key=Key} = Row, {Worker, From}, State) ->
148    #collector{counters = Counters0, rows = Rows0} = State,
149    true = fabric_dict:is_key(Worker, Counters0),
150    Rows = dict:append(Key, Row#view_row{worker={Worker, From}}, Rows0),
151    C1 = fabric_dict:update_counter(Worker, 1, Counters0),
152    State1 = State#collector{rows=Rows, counters=C1},
153    fabric_view:maybe_send_row(State1);
154
155handle_message(complete, Worker, #collector{counters = Counters0} = State) ->
156    true = fabric_dict:is_key(Worker, Counters0),
157    C1 = fabric_dict:update_counter(Worker, 1, Counters0),
158    fabric_view:maybe_send_row(State#collector{counters = C1});
159
160handle_message(ddoc_updated, _Worker, State) ->
161    {stop, State}.
162
163os_proc_needed(<<"_", _/binary>>) -> false;
164os_proc_needed(_) -> true.
165
166