1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(fabric_doc_update).
14
15-export([go/3]).
16
17-include_lib("fabric/include/fabric.hrl").
18-include_lib("mem3/include/mem3.hrl").
19-include_lib("couch/include/couch_db.hrl").
20
%% Update a batch of documents across all shards of DbName.
%%
%% Returns {ok | accepted | error, Results} with Results in the same
%% order as the input docs, or the raw term from rexi_utils:recv/6 on an
%% unexpected failure. `accepted` means at least one doc was written to
%% fewer than W replicas; `error` means at least one doc got no usable
%% reply at all (or its error replies disagreed).
go(_, [], _) ->
    {ok, []};
go(DbName, AllDocs0, Opts) ->
    % Apply db-specific rewrites (_replicator/_users dbs) before fanout.
    AllDocs1 = before_doc_update(DbName, AllDocs0, Opts),
    % Tag each doc with a unique ref so identical docs remain distinct
    % keys in the reply-collection dict.
    AllDocs = tag_docs(AllDocs1),
    % all_or_nothing is unsupported; this throws {aborted, _} when set.
    validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
    Options = lists:delete(all_or_nothing, Opts),
    % Fan out one update_docs RPC per shard. The ref tag is only
    % meaningful on this coordinator, so docs are untagged before send
    % while the tagged copies are kept for reply bookkeeping.
    GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) ->
        Docs1 = untag_docs(Docs),
        Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name,Docs1,Options]}),
        {Shard#shard{ref=Ref}, Docs}
    end, group_docs_by_shard(DbName, AllDocs)),
    {Workers, _} = lists:unzip(GroupedDocs),
    RexiMon = fabric_util:create_monitors(Workers),
    % w defaults to the db quorum; the option value is a string here,
    % hence list_to_integer below.
    W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
    Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs,
        dict:new()},
    Timeout = fabric_util:request_timeout(),
    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0, infinity, Timeout) of
    {ok, {Health, Results}}
            when Health =:= ok; Health =:= accepted; Health =:= error ->
        % Restore input order and drop noreply placeholders (see
        % append_update_replies/3).
        {Health, [R || R <- couch_util:reorder_results(AllDocs, Results), R =/= noreply]};
    {timeout, Acc} ->
        {_, _, W1, GroupedDocs1, DocReplDict} = Acc,
        {DefunctWorkers, _} = lists:unzip(GroupedDocs1),
        fabric_util:log_timeout(DefunctWorkers, "update_docs"),
        % Force a final verdict from whatever replies arrived in time.
        {Health, _, Resp} = dict:fold(fun force_reply/3, {ok, W1, []},
            DocReplDict),
        {Health, [R || R <- couch_util:reorder_results(AllDocs, Resp), R =/= noreply]};
    Else ->
        Else
    after
        rexi_monitor:stop(RexiMon)
    end.
55
%% Reply collector for go/3, folded over worker messages by
%% rexi_utils:recv/6. The accumulator is
%% {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict}: workers still
%% outstanding, total doc count, write quorum, remaining
%% {#shard{}, Docs} pairs, and a dict mapping each tagged doc to the
%% replies received for it so far.

% An entire node went down: discard every worker on that node.
handle_message({rexi_DOWN, _, {_,NodeRef},_}, _Worker, Acc0) ->
    {_, LenDocs, W, GroupedDocs, DocReplyDict} = Acc0,
    NewGrpDocs = [X || {#shard{node=N}, _} = X <- GroupedDocs, N =/= NodeRef],
    skip_message({length(NewGrpDocs), LenDocs, W, NewGrpDocs, DocReplyDict});

% A single worker crashed: drop it and carry on with the rest.
handle_message({rexi_EXIT, _}, Worker, Acc0) ->
    {WC,LenDocs,W,GrpDocs,DocReplyDict} = Acc0,
    NewGrpDocs = lists:keydelete(Worker,1,GrpDocs),
    skip_message({WC-1,LenDocs,W,NewGrpDocs,DocReplyDict});
handle_message(internal_server_error, Worker, Acc0) ->
    % happens when we fail to load validation functions in an RPC worker
    {WC,LenDocs,W,GrpDocs,DocReplyDict} = Acc0,
    NewGrpDocs = lists:keydelete(Worker,1,GrpDocs),
    skip_message({WC-1,LenDocs,W,NewGrpDocs,DocReplyDict});
% Progress ping while attachments stream; nothing to record yet.
handle_message(attachment_chunk_received, _Worker, Acc0) ->
    {ok, Acc0};
% A worker finished: fold its per-doc replies into DocReplyDict and
% decide whether we can stop early.
handle_message({ok, Replies}, Worker, Acc0) ->
    {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0,
    {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
    DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
    case {WaitingCount, dict:size(DocReplyDict)} of
    {1, _} ->
        % last message has arrived, we need to conclude things
        {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []},
           DocReplyDict),
        {stop, {Health, Reply}};
    {_, DocCount} ->
        % we've got at least one reply for each document, let's take a look
        case dict:fold(fun maybe_reply/3, {stop,W,[]}, DocReplyDict) of
        continue ->
            {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}};
        {stop, W, FinalReplies} ->
            {stop, {ok, FinalReplies}}
        end;
    _ ->
        {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}}
    end;
handle_message({missing_stub, Stub}, _, _) ->
    throw({missing_stub, Stub});
% Shard file missing: synthesize a not_found reply for every doc this
% worker was responsible for, then reuse the {ok, Replies} clause.
handle_message({not_found, no_db_file} = X, Worker, Acc0) ->
    {_, _, _, GroupedDocs, _} = Acc0,
    Docs = couch_util:get_value(Worker, GroupedDocs),
    handle_message({ok, [X || _D <- Docs]}, Worker, Acc0);
handle_message({bad_request, Msg}, _, _) ->
    throw({bad_request, Msg});
handle_message({request_entity_too_large, Entity}, _, _) ->
    throw({request_entity_too_large, Entity}).
103
%% Run the db-specific before_doc_update hook over Docs when DbName is
%% the replicator or users database; otherwise return Docs unchanged.
before_doc_update(DbName, Docs, Opts) ->
    IsReplicatorDb = fabric_util:is_replicator_db(DbName),
    IsUsersDb = fabric_util:is_users_db(DbName),
    if
        IsReplicatorDb ->
            %% cluster db is expensive to create so we only do it if we have to
            Db = fabric_util:open_cluster_db(DbName, Opts),
            [couch_replicator_docs:before_doc_update(Doc, Db, replicated_changes)
                || Doc <- Docs];
        IsUsersDb ->
            %% cluster db is expensive to create so we only do it if we have to
            Db = fabric_util:open_cluster_db(DbName, Opts),
            [couch_users_db:before_doc_update(Doc, Db, interactive_edit)
                || Doc <- Docs];
        true ->
            Docs
    end.
119
%% Prepend a unique {ref, Ref} entry to each doc's metadata so that
%% otherwise-identical docs can be told apart as dict keys.
tag_docs(Docs) ->
    lists:map(
        fun(#doc{meta = Meta} = Doc) ->
            Doc#doc{meta = [{ref, make_ref()} | Meta]}
        end,
        Docs).
124
%% Strip the {ref, _} tag added by tag_docs/1 from each doc's metadata.
untag_docs(Docs) ->
    lists:map(
        fun(#doc{meta = Meta} = Doc) ->
            Doc#doc{meta = lists:keydelete(ref, 1, Meta)}
        end,
        Docs).
129
%% dict:fold/3 callback that forces a final per-doc verdict once no more
%% worker replies are expected. Accumulates {Health, W, [{Doc, Reply}]}.
force_reply(Doc, [], {_Health, W, Acc}) ->
    %% Not a single worker replied for this doc.
    {error, W, [{Doc, {error, internal_server_error}} | Acc]};
force_reply(Doc, [First | _] = DocReplies, {Health, W, Acc}) ->
    case update_quorum_met(W, DocReplies) of
        {true, Winner} ->
            %% Quorum reached: overall health is unaffected.
            {Health, W, [{Doc, Winner} | Acc]};
        false ->
            case [R || {ok, R} <- DocReplies] of
                [] ->
                    %% No successful write anywhere. Identical errors keep
                    %% the current health; mismatched errors force `error`.
                    Unanimous = lists:all(fun(R) -> R =:= First end, DocReplies),
                    case Unanimous of
                        true ->
                            couch_stats:increment_counter(
                                [fabric, doc_update, errors]),
                            {Health, W, [{Doc, First} | Acc]};
                        false ->
                            couch_stats:increment_counter(
                                [fabric, doc_update, mismatched_errors]),
                            {error, W, [{Doc, First} | Acc]}
                    end;
                [Rev | _] ->
                    %% Written somewhere but below quorum: downgrade an
                    %% `ok` health to `accepted`, keep worse states as-is.
                    couch_stats:increment_counter(
                        [fabric, doc_update, write_quorum_errors]),
                    NewHealth = if Health =:= ok -> accepted; true -> Health end,
                    {NewHealth, W, [{Doc, {accepted, Rev}} | Acc]}
            end
    end.
157
%% dict:fold/3 callback used once every doc has at least one reply.
%% Returns {stop, W, Replies} only if every doc individually meets
%% quorum; the atom `continue` short-circuits the rest of the fold.
maybe_reply(_Doc, _DocReplies, continue) ->
    % a previous doc already missed quorum; fast-forward the fold
    continue;
maybe_reply(Doc, DocReplies, {stop, W, Acc}) ->
    case update_quorum_met(W, DocReplies) of
        false ->
            continue;
        {true, Reply} ->
            {stop, W, [{Doc, Reply} | Acc]}
    end.
168
%% Tally identical replies and report {true, Reply} for the first good
%% reply (in orddict key order) seen at least W times, or false if none.
update_quorum_met(W, Replies) ->
    Tally = lists:foldl(
        fun(Reply, Counts) -> orddict:update_counter(Reply, 1, Counts) end,
        orddict:new(),
        Replies),
    Quorate = [Reply || {Reply, N} = Entry <- Tally, good_reply(Entry), N >= W],
    case Quorate of
        [] ->
            false;
        [FinalReply | _] ->
            {true, FinalReply}
    end.
179
%% Predicate over {Reply, Count} tally entries: only successful writes
%% ({ok, _}) and noreply placeholders count toward the write quorum.
%% Anything else (errors, unexpected terms) does not.
good_reply({{ok, _}, _Count}) -> true;
good_reply({noreply, _Count}) -> true;
good_reply(_Other) -> false.
186
-spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}].
%% Build [{Shard, DocsForThatShard}] by appending each doc to every
%% shard that owns its id (one entry per replica).
group_docs_by_shard(DbName, Docs) ->
    AppendDoc = fun(#doc{id = Id} = Doc, DictAcc) ->
        lists:foldl(
            fun(Shard, D) -> dict:append(Shard, Doc, D) end,
            DictAcc,
            mem3:shards(DbName, Id))
    end,
    dict:to_list(lists:foldl(AppendDoc, dict:new(), Docs)).
194
%% Pair each doc with its reply in DocReplyDict. Docs left over after
%% the replies run out get a `noreply` placeholder: in the
%% replicated_changes case only errors show up in the worker's result,
%% so a missing reply means "nothing to report".
append_update_replies([], [], DocReplyDict) ->
    DocReplyDict;
append_update_replies([Doc | MoreDocs], [], Dict) ->
    append_update_replies(MoreDocs, [], dict:append(Doc, noreply, Dict));
append_update_replies([Doc | MoreDocs], [Reply | MoreReplies], Dict) ->
    append_update_replies(MoreDocs, MoreReplies, dict:append(Doc, Reply, Dict)).
202
%% Continue the recv fold after a worker was discarded. When no workers
%% remain, force a final verdict for every doc from the replies in hand.
skip_message({0, _LenDocs, W, _GrpDocs, DocReplyDict}) ->
    {Health, W, Replies} =
        dict:fold(fun force_reply/3, {ok, W, []}, DocReplyDict),
    {stop, {Health, Replies}};
skip_message(Acc) ->
    {ok, Acc}.
208
%% all_or_nothing (atomic) updates are not supported on clustered
%% databases; when requested, abort the whole batch with a
%% {not_implemented, _} failure per doc.
validate_atomic_update(_, _, false) ->
    ok;
validate_atomic_update(_DbName, AllDocs, true) ->
    % TODO actually perform the validation.  This requires some hackery, we need
    % to basically extract the prep_and_validate_updates function from couch_db
    % and only run that, without actually writing in case of a success.
    Error = {not_implemented, <<"all_or_nothing is not supported">>},
    MakeFailure = fun(#doc{id = Id, revs = {Pos, Revs}}) ->
        RevId =
            case Revs of
                [] -> <<>>;
                [First | _] -> First
            end,
        {{Id, {Pos, RevId}}, Error}
    end,
    throw({aborted, lists:map(MakeFailure, AllDocs)}).
221
222
223-ifdef(TEST).
224-include_lib("eunit/include/eunit.hrl").
225
226
%% Mock couch_log and couch_stats so the handle_message tests can run
%% without the real applications; returns the last meck:expect result.
setup_all() ->
    Mocked = [couch_log, couch_stats],
    meck:new(Mocked),
    meck:expect(couch_log, warning, fun(_, _) -> ok end),
    meck:expect(couch_stats, increment_counter, fun(_) -> ok end).
231
232
%% Remove all mocks installed by setup_all/0.
teardown_all(_Ctx) ->
    meck:unload().
235
236
%% EUnit generator: run the three doc_update scenarios inside one
%% setup/teardown fixture so meck is installed exactly once.
doc_update_test_() ->
    Cases = [
        fun doc_update1/0,
        fun doc_update2/0,
        fun doc_update3/0
    ],
    {setup, fun setup_all/0, fun teardown_all/1, Cases}.
248
249
250% eunits
% Exercises handle_message/3 reply accounting: quorum W=2 and W=3 over
% three shards, a W greater than the shard count, and docs whose workers
% all die (which must surface {error, internal_server_error}).
doc_update1() ->
    Doc1 = #doc{revs = {1,[<<"foo">>]}},
    Doc2 = #doc{revs = {1,[<<"bar">>]}},
    Docs = [Doc1],
    Docs2 = [Doc2, Doc1],
    Dict = dict:from_list([{Doc,[]} || Doc <- Docs]),
    Dict2 = dict:from_list([{Doc,[]} || Doc <- Docs2]),

    Shards =
        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
    GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs),


    % test for W = 2
    AccW2 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs,
        Dict},

    {ok,{WaitingCountW2_1,_,_,_,_}=AccW2_1} =
        handle_message({ok, [{ok, Doc1}]},hd(Shards),AccW2),
    ?assertEqual(WaitingCountW2_1,2),
    % second identical reply meets W=2, so the fold stops with ok health
    {stop, FinalReplyW2 } =
        handle_message({ok, [{ok, Doc1}]},lists:nth(2,Shards),AccW2_1),
    ?assertEqual({ok, [{Doc1, {ok,Doc1}}]},FinalReplyW2),

    % test for W = 3
    AccW3 = {length(Shards), length(Docs), list_to_integer("3"), GroupedDocs,
        Dict},

    {ok,{WaitingCountW3_1,_,_,_,_}=AccW3_1} =
        handle_message({ok, [{ok, Doc1}]},hd(Shards),AccW3),
    ?assertEqual(WaitingCountW3_1,2),

    {ok,{WaitingCountW3_2,_,_,_,_}=AccW3_2} =
        handle_message({ok, [{ok, Doc1}]},lists:nth(2,Shards),AccW3_1),
    ?assertEqual(WaitingCountW3_2,1),

    % third reply both meets W=3 and is the last outstanding worker
    {stop, FinalReplyW3 } =
        handle_message({ok, [{ok, Doc1}]},lists:nth(3,Shards),AccW3_2),
    ?assertEqual({ok, [{Doc1, {ok,Doc1}}]},FinalReplyW3),

    % test w quorum > # shards, which should fail immediately

    Shards2 = mem3_util:create_partition_map("foo",1,1,["node1"]),
    GroupedDocs2 = group_docs_by_shard_hack(<<"foo">>,Shards2,Docs),

    AccW4 =
        {length(Shards2), length(Docs), list_to_integer("2"), GroupedDocs2, Dict},
    Bool =
    case handle_message({ok, [{ok, Doc1}]},hd(Shards2),AccW4) of
        {stop, _Reply} ->
            true;
        _ -> false
    end,
    ?assertEqual(Bool,true),

    % Docs with no replies should end up as {error, internal_server_error}
    SA1 = #shard{node=a, range=1},
    SB1 = #shard{node=b, range=1},
    SA2 = #shard{node=a, range=2},
    SB2 = #shard{node=b, range=2},
    GroupedDocs3 = [{SA1,[Doc1]}, {SB1,[Doc1]}, {SA2,[Doc2]}, {SB2,[Doc2]}],
    StW5_0 = {length(GroupedDocs3), length(Docs2), 2, GroupedDocs3, Dict2},
    {ok, StW5_1} = handle_message({ok, [{ok, "A"}]}, SA1, StW5_0),
    {ok, StW5_2} = handle_message({rexi_EXIT, nil}, SB1, StW5_1),
    {ok, StW5_3} = handle_message({rexi_EXIT, nil}, SA2, StW5_2),
    {stop, ReplyW5} = handle_message({rexi_EXIT, nil}, SB2, StW5_3),
    % Doc1 was written once (below quorum -> accepted); Doc2 never at all
    ?assertEqual(
        {error, [{Doc1,{accepted,"A"}},{Doc2,{error,internal_server_error}}]},
        ReplyW5
    ).
321
% One good worker reply followed by two rexi_EXITs with W=2: each doc
% has a single successful write, below quorum, so the overall health is
% `accepted` and each doc reply is {accepted, Rev}.
doc_update2() ->
    Doc1 = #doc{revs = {1,[<<"foo">>]}},
    Doc2 = #doc{revs = {1,[<<"bar">>]}},
    Docs = [Doc2, Doc1],
    Shards =
        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
    GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs),
    Acc0 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs,
        dict:from_list([{Doc,[]} || Doc <- Docs])},

    {ok,{WaitingCount1,_,_,_,_}=Acc1} =
        handle_message({ok, [{ok, Doc1},{ok, Doc2}]},hd(Shards),Acc0),
    ?assertEqual(WaitingCount1,2),

    {ok,{WaitingCount2,_,_,_,_}=Acc2} =
        handle_message({rexi_EXIT, 1},lists:nth(2,Shards),Acc1),
    ?assertEqual(WaitingCount2,1),

    % last worker also dies: skip_message forces the final verdict
    {stop, Reply} =
        handle_message({rexi_EXIT, 1},lists:nth(3,Shards),Acc2),

    ?assertEqual({accepted, [{Doc1,{accepted,Doc2}}, {Doc2,{accepted,Doc1}}]},
        Reply).
345
% Two matching good replies around a rexi_EXIT with W=2: quorum is met
% for both docs, so the overall health is `ok` despite the dead worker.
doc_update3() ->
    Doc1 = #doc{revs = {1,[<<"foo">>]}},
    Doc2 = #doc{revs = {1,[<<"bar">>]}},
    Docs = [Doc2, Doc1],
    Shards =
        mem3_util:create_partition_map("foo",3,1,["node1","node2","node3"]),
    GroupedDocs = group_docs_by_shard_hack(<<"foo">>,Shards,Docs),
    Acc0 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs,
        dict:from_list([{Doc,[]} || Doc <- Docs])},

    {ok,{WaitingCount1,_,_,_,_}=Acc1} =
        handle_message({ok, [{ok, Doc1},{ok, Doc2}]},hd(Shards),Acc0),
    ?assertEqual(WaitingCount1,2),

    {ok,{WaitingCount2,_,_,_,_}=Acc2} =
        handle_message({rexi_EXIT, 1},lists:nth(2,Shards),Acc1),
    ?assertEqual(WaitingCount2,1),

    {stop, Reply} =
        handle_message({ok, [{ok, Doc1},{ok, Doc2}]},lists:nth(3,Shards),Acc2),

    ?assertEqual({ok, [{Doc1, {ok, Doc2}},{Doc2, {ok,Doc1}}]},Reply).
368
369% needed for testing to avoid having to start the mem3 application
% Test-only stand-in for group_docs_by_shard/2: assigns every doc to
% every given shard so the mem3 application need not be started.
group_docs_by_shard_hack(_DbName, Shards, Docs) ->
    AddToEveryShard = fun(#doc{} = Doc, DictAcc) ->
        lists:foldl(
            fun(Shard, D) -> dict:append(Shard, Doc, D) end,
            DictAcc,
            Shards)
    end,
    dict:to_list(lists:foldl(AddToEveryShard, dict:new(), Docs)).
376
377-endif.
378