% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(fabric_doc_update).

-export([go/3]).

-include_lib("fabric/include/fabric.hrl").
-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").

go(_, [], _) ->
    {ok, []};
go(DbName, AllDocs0, Opts) ->
    AllDocs1 = before_doc_update(DbName, AllDocs0, Opts),
    AllDocs = tag_docs(AllDocs1),
    validate_atomic_update(DbName, AllDocs, lists:member(all_or_nothing, Opts)),
    Options = lists:delete(all_or_nothing, Opts),
    GroupedDocs = lists:map(fun({#shard{name=Name, node=Node} = Shard, Docs}) ->
        Docs1 = untag_docs(Docs),
        Ref = rexi:cast(Node, {fabric_rpc, update_docs, [Name, Docs1, Options]}),
        {Shard#shard{ref=Ref}, Docs}
    end, group_docs_by_shard(DbName, AllDocs)),
    {Workers, _} = lists:unzip(GroupedDocs),
    RexiMon = fabric_util:create_monitors(Workers),
    W = couch_util:get_value(w, Options, integer_to_list(mem3:quorum(DbName))),
    Acc0 = {length(Workers), length(AllDocs), list_to_integer(W), GroupedDocs,
        dict:new()},
    Timeout = fabric_util:request_timeout(),
    try rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, Acc0,
            infinity, Timeout) of
        {ok, {Health, Results}}
                when Health =:= ok; Health =:= accepted; Health =:= error ->
            {Health, [R || R <- couch_util:reorder_results(AllDocs, Results),
                R =/= noreply]};
        {timeout, Acc} ->
            {_, _, W1, GroupedDocs1, DocReplDict} = Acc,
            {DefunctWorkers, _} = lists:unzip(GroupedDocs1),
            fabric_util:log_timeout(DefunctWorkers, "update_docs"),
            {Health, _, Resp} = dict:fold(fun force_reply/3, {ok, W1, []},
                DocReplDict),
            {Health, [R || R <- couch_util:reorder_results(AllDocs, Resp),
                R =/= noreply]};
        Else ->
            Else
    after
        rexi_monitor:stop(RexiMon)
    end.
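%% Note on the accumulator threaded through handle_message/3 below: it is a
%% 5-tuple {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict}, where
%% WaitingCount is the number of workers still expected to reply, DocCount is
%% the number of documents in the request, W is the write quorum, GroupedDocs
%% pairs each live #shard{} worker with its documents, and DocReplyDict maps
%% each tagged #doc{} to the replies received for it so far.
%%
%% A minimal, illustrative call, assuming a healthy outcome (the database
%% name is hypothetical; note the w option value is a string, matching the
%% list_to_integer/1 conversion above):
%%
%%   {ok, Results} = fabric_doc_update:go(<<"mydb">>, Docs, [{w, "2"}]).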
handle_message({rexi_DOWN, _, {_, NodeRef}, _}, _Worker, Acc0) ->
    {_, LenDocs, W, GroupedDocs, DocReplyDict} = Acc0,
    NewGrpDocs = [X || {#shard{node=N}, _} = X <- GroupedDocs, N =/= NodeRef],
    skip_message({length(NewGrpDocs), LenDocs, W, NewGrpDocs, DocReplyDict});

handle_message({rexi_EXIT, _}, Worker, Acc0) ->
    {WC, LenDocs, W, GrpDocs, DocReplyDict} = Acc0,
    NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
    skip_message({WC - 1, LenDocs, W, NewGrpDocs, DocReplyDict});
handle_message(internal_server_error, Worker, Acc0) ->
    % happens when we fail to load validation functions in an RPC worker
    {WC, LenDocs, W, GrpDocs, DocReplyDict} = Acc0,
    NewGrpDocs = lists:keydelete(Worker, 1, GrpDocs),
    skip_message({WC - 1, LenDocs, W, NewGrpDocs, DocReplyDict});
handle_message(attachment_chunk_received, _Worker, Acc0) ->
    {ok, Acc0};
handle_message({ok, Replies}, Worker, Acc0) ->
    {WaitingCount, DocCount, W, GroupedDocs, DocReplyDict0} = Acc0,
    {value, {_, Docs}, NewGrpDocs} = lists:keytake(Worker, 1, GroupedDocs),
    DocReplyDict = append_update_replies(Docs, Replies, DocReplyDict0),
    case {WaitingCount, dict:size(DocReplyDict)} of
        {1, _} ->
            % last message has arrived, we need to conclude things
            {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []},
                DocReplyDict),
            {stop, {Health, Reply}};
        {_, DocCount} ->
            % we've got at least one reply for each document, let's take a look
            case dict:fold(fun maybe_reply/3, {stop, W, []}, DocReplyDict) of
                continue ->
                    {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs,
                        DocReplyDict}};
                {stop, W, FinalReplies} ->
                    {stop, {ok, FinalReplies}}
            end;
        _ ->
            {ok, {WaitingCount - 1, DocCount, W, NewGrpDocs, DocReplyDict}}
    end;
handle_message({missing_stub, Stub}, _, _) ->
    throw({missing_stub, Stub});
handle_message({not_found, no_db_file} = X, Worker, Acc0) ->
    {_, _, _, GroupedDocs, _} = Acc0,
    Docs = couch_util:get_value(Worker, GroupedDocs),
    handle_message({ok, [X || _D <- Docs]}, Worker, Acc0);
handle_message({bad_request, Msg}, _, _) ->
    throw({bad_request, Msg});
handle_message({request_entity_too_large, Entity}, _, _) ->
    throw({request_entity_too_large, Entity}).

before_doc_update(DbName, Docs, Opts) ->
    case {fabric_util:is_replicator_db(DbName), fabric_util:is_users_db(DbName)} of
        {true, _} ->
            %% cluster db is expensive to create so we only do it if we have to
            Db = fabric_util:open_cluster_db(DbName, Opts),
            [couch_replicator_docs:before_doc_update(Doc, Db, replicated_changes)
                || Doc <- Docs];
        {_, true} ->
            %% cluster db is expensive to create so we only do it if we have to
            Db = fabric_util:open_cluster_db(DbName, Opts),
            [couch_users_db:before_doc_update(Doc, Db, interactive_edit)
                || Doc <- Docs];
        _ ->
            Docs
    end.

tag_docs([]) ->
    [];
tag_docs([#doc{meta=Meta} = Doc | Rest]) ->
    [Doc#doc{meta=[{ref, make_ref()} | Meta]} | tag_docs(Rest)].

untag_docs([]) ->
    [];
untag_docs([#doc{meta=Meta} = Doc | Rest]) ->
    [Doc#doc{meta=lists:keydelete(ref, 1, Meta)} | untag_docs(Rest)].
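%% force_reply/3 and maybe_reply/3 are dict:fold/3 callbacks over
%% DocReplyDict. maybe_reply/3 only produces a final result if every document
%% has already met quorum, otherwise it returns continue. force_reply/3 runs
%% once no further worker replies can arrive and produces an answer for every
%% document regardless, downgrading the overall health to accepted or error
%% where quorum was missed.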
force_reply(Doc, [], {_, W, Acc}) ->
    {error, W, [{Doc, {error, internal_server_error}} | Acc]};
force_reply(Doc, [FirstReply | _] = Replies, {Health, W, Acc}) ->
    case update_quorum_met(W, Replies) of
        {true, Reply} ->
            {Health, W, [{Doc, Reply} | Acc]};
        false ->
            case [Reply || {ok, Reply} <- Replies] of
                [] ->
                    % check if all errors are identical; if so, inherit health
                    case lists:all(fun(E) -> E =:= FirstReply end, Replies) of
                        true ->
                            CounterKey = [fabric, doc_update, errors],
                            couch_stats:increment_counter(CounterKey),
                            {Health, W, [{Doc, FirstReply} | Acc]};
                        false ->
                            CounterKey = [fabric, doc_update, mismatched_errors],
                            couch_stats:increment_counter(CounterKey),
                            {error, W, [{Doc, FirstReply} | Acc]}
                    end;
                [AcceptedRev | _] ->
                    CounterKey = [fabric, doc_update, write_quorum_errors],
                    couch_stats:increment_counter(CounterKey),
                    NewHealth = case Health of ok -> accepted; _ -> Health end,
                    {NewHealth, W, [{Doc, {accepted, AcceptedRev}} | Acc]}
            end
    end.

maybe_reply(_, _, continue) ->
    % we didn't meet quorum for all docs, so we're fast-forwarding the fold
    continue;
maybe_reply(Doc, Replies, {stop, W, Acc}) ->
    case update_quorum_met(W, Replies) of
        {true, Reply} ->
            {stop, W, [{Doc, Reply} | Acc]};
        false ->
            continue
    end.

update_quorum_met(W, Replies) ->
    Counters = lists:foldl(fun(R, D) -> orddict:update_counter(R, 1, D) end,
        orddict:new(), Replies),
    GoodReplies = lists:filter(fun good_reply/1, Counters),
    case lists:dropwhile(fun({_, Count}) -> Count < W end, GoodReplies) of
        [] ->
            false;
        [{FinalReply, _} | _] ->
            {true, FinalReply}
    end.

good_reply({{ok, _}, _}) ->
    true;
good_reply({noreply, _}) ->
    true;
good_reply(_) ->
    false.

-spec group_docs_by_shard(binary(), [#doc{}]) -> [{#shard{}, [#doc{}]}].
group_docs_by_shard(DbName, Docs) ->
    dict:to_list(lists:foldl(fun(#doc{id=Id} = Doc, D0) ->
        lists:foldl(fun(Shard, D1) ->
            dict:append(Shard, Doc, D1)
        end, D0, mem3:shards(DbName, Id))
    end, dict:new(), Docs)).

append_update_replies([], [], DocReplyDict) ->
    DocReplyDict;
append_update_replies([Doc | Rest], [], Dict0) ->
    % icky: in replicated_changes mode only errors show up in the result
    append_update_replies(Rest, [], dict:append(Doc, noreply, Dict0));
append_update_replies([Doc | Rest1], [Reply | Rest2], Dict0) ->
    append_update_replies(Rest1, Rest2, dict:append(Doc, Reply, Dict0)).

skip_message({0, _, W, _, DocReplyDict}) ->
    {Health, W, Reply} = dict:fold(fun force_reply/3, {ok, W, []}, DocReplyDict),
    {stop, {Health, Reply}};
skip_message(Acc0) ->
    {ok, Acc0}.

validate_atomic_update(_, _, false) ->
    ok;
validate_atomic_update(_DbName, AllDocs, true) ->
    % TODO: actually perform the validation. This requires some hackery: we
    % need to extract the prep_and_validate_updates logic from couch_db and
    % run only that, without actually writing anything on success.
    Error = {not_implemented, <<"all_or_nothing is not supported">>},
    PreCommitFailures = lists:map(fun(#doc{id=Id, revs={Pos, Revs}}) ->
        case Revs of [] -> RevId = <<>>; [RevId | _] -> ok end,
        {{Id, {Pos, RevId}}, Error}
    end, AllDocs),
    throw({aborted, PreCommitFailures}).


-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
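%% The tests below drive handle_message/3 directly with hand-built shard maps
%% and accumulators, mocking couch_log and couch_stats with meck so that no
%% running cluster is required.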
setup_all() ->
    meck:new([couch_log, couch_stats]),
    meck:expect(couch_log, warning, fun(_, _) -> ok end),
    meck:expect(couch_stats, increment_counter, fun(_) -> ok end).


teardown_all(_) ->
    meck:unload().


doc_update_test_() ->
    {
        setup,
        fun setup_all/0,
        fun teardown_all/1,
        [
            fun doc_update1/0,
            fun doc_update2/0,
            fun doc_update3/0
        ]
    }.


% eunit tests
doc_update1() ->
    Doc1 = #doc{revs = {1, [<<"foo">>]}},
    Doc2 = #doc{revs = {1, [<<"bar">>]}},
    Docs = [Doc1],
    Docs2 = [Doc2, Doc1],
    Dict = dict:from_list([{Doc, []} || Doc <- Docs]),
    Dict2 = dict:from_list([{Doc, []} || Doc <- Docs2]),

    Shards =
        mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]),
    GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs),

    % test for W = 2
    AccW2 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs,
        Dict},

    {ok, {WaitingCountW2_1, _, _, _, _} = AccW2_1} =
        handle_message({ok, [{ok, Doc1}]}, hd(Shards), AccW2),
    ?assertEqual(WaitingCountW2_1, 2),
    {stop, FinalReplyW2} =
        handle_message({ok, [{ok, Doc1}]}, lists:nth(2, Shards), AccW2_1),
    ?assertEqual({ok, [{Doc1, {ok, Doc1}}]}, FinalReplyW2),

    % test for W = 3
    AccW3 = {length(Shards), length(Docs), list_to_integer("3"), GroupedDocs,
        Dict},

    {ok, {WaitingCountW3_1, _, _, _, _} = AccW3_1} =
        handle_message({ok, [{ok, Doc1}]}, hd(Shards), AccW3),
    ?assertEqual(WaitingCountW3_1, 2),

    {ok, {WaitingCountW3_2, _, _, _, _} = AccW3_2} =
        handle_message({ok, [{ok, Doc1}]}, lists:nth(2, Shards), AccW3_1),
    ?assertEqual(WaitingCountW3_2, 1),

    {stop, FinalReplyW3} =
        handle_message({ok, [{ok, Doc1}]}, lists:nth(3, Shards), AccW3_2),
    ?assertEqual({ok, [{Doc1, {ok, Doc1}}]}, FinalReplyW3),

    % test W quorum > number of shards, which should fail immediately
    Shards2 = mem3_util:create_partition_map("foo", 1, 1, ["node1"]),
    GroupedDocs2 = group_docs_by_shard_hack(<<"foo">>, Shards2, Docs),

    AccW4 =
        {length(Shards2), length(Docs), list_to_integer("2"), GroupedDocs2, Dict},
    Bool =
        case handle_message({ok, [{ok, Doc1}]}, hd(Shards2), AccW4) of
            {stop, _Reply} ->
                true;
            _ ->
                false
        end,
    ?assertEqual(Bool, true),

    % Docs with no replies should end up as {error, internal_server_error}
    SA1 = #shard{node = a, range = 1},
    SB1 = #shard{node = b, range = 1},
    SA2 = #shard{node = a, range = 2},
    SB2 = #shard{node = b, range = 2},
    GroupedDocs3 = [{SA1, [Doc1]}, {SB1, [Doc1]}, {SA2, [Doc2]}, {SB2, [Doc2]}],
    StW5_0 = {length(GroupedDocs3), length(Docs2), 2, GroupedDocs3, Dict2},
    {ok, StW5_1} = handle_message({ok, [{ok, "A"}]}, SA1, StW5_0),
    {ok, StW5_2} = handle_message({rexi_EXIT, nil}, SB1, StW5_1),
    {ok, StW5_3} = handle_message({rexi_EXIT, nil}, SA2, StW5_2),
    {stop, ReplyW5} = handle_message({rexi_EXIT, nil}, SB2, StW5_3),
    ?assertEqual(
        {error, [{Doc1, {accepted, "A"}}, {Doc2, {error, internal_server_error}}]},
        ReplyW5
    ).
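%% doc_update2 exercises degraded writes: with W = 2 and two of the three
%% workers exiting, quorum is never met, so force_reply/3 downgrades the
%% overall health to accepted and each doc comes back as {accepted, Rev}.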
doc_update2() ->
    Doc1 = #doc{revs = {1, [<<"foo">>]}},
    Doc2 = #doc{revs = {1, [<<"bar">>]}},
    Docs = [Doc2, Doc1],
    Shards =
        mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]),
    GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs),
    Acc0 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs,
        dict:from_list([{Doc, []} || Doc <- Docs])},

    {ok, {WaitingCount1, _, _, _, _} = Acc1} =
        handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, hd(Shards), Acc0),
    ?assertEqual(WaitingCount1, 2),

    {ok, {WaitingCount2, _, _, _, _} = Acc2} =
        handle_message({rexi_EXIT, 1}, lists:nth(2, Shards), Acc1),
    ?assertEqual(WaitingCount2, 1),

    {stop, Reply} =
        handle_message({rexi_EXIT, 1}, lists:nth(3, Shards), Acc2),

    ?assertEqual({accepted, [{Doc1, {accepted, Doc2}}, {Doc2, {accepted, Doc1}}]},
        Reply).

doc_update3() ->
    Doc1 = #doc{revs = {1, [<<"foo">>]}},
    Doc2 = #doc{revs = {1, [<<"bar">>]}},
    Docs = [Doc2, Doc1],
    Shards =
        mem3_util:create_partition_map("foo", 3, 1, ["node1", "node2", "node3"]),
    GroupedDocs = group_docs_by_shard_hack(<<"foo">>, Shards, Docs),
    Acc0 = {length(Shards), length(Docs), list_to_integer("2"), GroupedDocs,
        dict:from_list([{Doc, []} || Doc <- Docs])},

    {ok, {WaitingCount1, _, _, _, _} = Acc1} =
        handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, hd(Shards), Acc0),
    ?assertEqual(WaitingCount1, 2),

    {ok, {WaitingCount2, _, _, _, _} = Acc2} =
        handle_message({rexi_EXIT, 1}, lists:nth(2, Shards), Acc1),
    ?assertEqual(WaitingCount2, 1),

    {stop, Reply} =
        handle_message({ok, [{ok, Doc1}, {ok, Doc2}]}, lists:nth(3, Shards), Acc2),

    ?assertEqual({ok, [{Doc1, {ok, Doc2}}, {Doc2, {ok, Doc1}}]}, Reply).

% needed for testing to avoid having to start the mem3 application
group_docs_by_shard_hack(_DbName, Shards, Docs) ->
    dict:to_list(lists:foldl(fun(#doc{} = Doc, D0) ->
        lists:foldl(fun(Shard, D1) ->
            dict:append(Shard, Doc, D1)
        end, D0, Shards)
    end, dict:new(), Docs)).

-endif.