% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_index).
-behaviour(gen_server).

-compile(tuple_calls).

-vsn(3).

%% API
-export([start_link/1, stop/1, get_state/2, get_info/1]).
-export([trigger_update/2]).
-export([compact/1, compact/2, get_compactor_pid/1]).

%% gen_server callbacks
-export([init/1, terminate/2, code_change/3]).
-export([handle_call/3, handle_cast/2, handle_info/2]).


-include_lib("couch/include/couch_db.hrl").


-define(CHECK_INTERVAL, 600000). % 10 minutes

% Server state.
%   mod       - index callback module; every index operation goes through it
%   idx_state - opaque index state owned by `mod`
%   updater   - pid returned by couch_index_updater:start_link/2
%   compactor - pid returned by couch_index_compactor:start_link/2
%   waiters   - [{From, RequestedSeq}] callers parked until the index catches up
%   committed - false while there are index updates that were not committed yet
%   shutdown  - set by the ddoc_updated casts
-record(st, {
    mod,
    idx_state,
    updater,
    compactor,
    waiters=[],
    committed=true,
    shutdown=false
}).


% Starts an index process for the given {Module, IdxState} pair. The pair is
% first handed to couch_index_plugin:before_open/2, whose result replaces it.
% The process is spawned with proc_lib and acknowledges from init/1.
start_link({Module0, IdxState0}) ->
    [Module, IdxState] = couch_index_plugin:before_open(Module0, IdxState0),
    InitArgs = [{Module, IdxState}],
    proc_lib:start_link(?MODULE, init, InitArgs).


% Asynchronously asks the index process to stop.
stop(Pid) ->
    gen_server:cast(Pid, stop).


% Returns {ok, IdxState} once the index has reached RequestSeq. The call has
% no timeout because the caller may have to wait for an index update.
get_state(Pid, RequestSeq) ->
    Request = {get_state, RequestSeq},
    gen_server:call(Pid, Request, infinity).


% Fetches the index info; the call timeout comes from
% group_info_timeout_msec/0.
get_info(Pid) ->
    Timeout = group_info_timeout_msec(),
    gen_server:call(Pid, get_info, Timeout).


% Asynchronously asks the index to update itself up to UpdateSeq.
trigger_update(Pid, UpdateSeq) ->
    gen_server:cast(Pid, {trigger_update, UpdateSeq}).


% Same as compact(Pid, []).
compact(Pid) ->
    compact(Pid, []).


% Starts a compaction. When the atom `monitor` is a member of Options the
% compactor process is monitored and {ok, MonitorRef} is returned; otherwise
% the result is just `ok`.
compact(Pid, Options) ->
    {ok, CompactorPid} = gen_server:call(Pid, compact),
    WantsMonitor = lists:member(monitor, Options),
    if
        WantsMonitor -> {ok, erlang:monitor(process, CompactorPid)};
        true -> ok
    end.


% Returns {ok, CompactorPid} for the given index process.
get_compactor_pid(Pid) ->
    gen_server:call(Pid, get_compactor_pid).


% proc_lib entry point (see start_link/1). Opens the index through the
% callback module, starts the updater and compactor helpers, acknowledges the
% parent and then enters the gen_server loop. If the index cannot be opened
% the open result is acknowledged to the parent as-is and the process returns.
init({Mod, IdxState}) ->
    DbName = Mod:get(db_name, IdxState),
    % Arm the periodic "does the index file still exist" check.
    erlang:send_after(?CHECK_INTERVAL, self(), maybe_close),
    Resp = couch_util:with_db(DbName, fun(Db) ->
        case Mod:open(Db, IdxState) of
            {ok, IdxSt} ->
                % A 'DOWN' message for this monitor stops the index; see
                % the 'DOWN' clause of handle_info/2.
                couch_db:monitor(Db),
                {ok, IdxSt};
            Error ->
                Error
        end
    end),
    case Resp of
        {ok, NewIdxState} ->
            {ok, UPid} = couch_index_updater:start_link(self(), Mod),
            {ok, CPid} = couch_index_compactor:start_link(self(), Mod),
            State = #st{
                mod=Mod,
                idx_state=NewIdxState,
                updater=UPid,
                compactor=CPid
            },
            Args = [
                DbName,
                Mod:get(idx_name, IdxState),
                couch_index_util:hexsig(Mod:get(signature, IdxState))
            ],
            couch_log:debug("Opening index for db: ~s idx: ~s sig: ~p", Args),
            proc_lib:init_ack({ok, self()}),
            gen_server:enter_loop(?MODULE, [], State);
        Other ->
            proc_lib:init_ack(Other)
    end.


% gen_server terminate callback. Lets the callback module shut down (design
% document changed) or close (any other reason) the index, answers every
% parked waiter with the termination reason, and stops the updater and
% compactor processes before logging.
terminate(Reason0, State) ->
    #st{mod=Mod, idx_state=IdxState}=State,
    % Bind Reason from the value of the case rather than exporting it from
    % each branch.
    Reason = case Reason0 of
        {shutdown, ddoc_updated} ->
            Mod:shutdown(IdxState),
            ddoc_updated;
        _ ->
            Mod:close(IdxState),
            Reason0
    end,
    send_all(State#st.waiters, Reason),
    couch_util:shutdown_sync(State#st.updater),
    couch_util:shutdown_sync(State#st.compactor),
    Args = [
        Mod:get(db_name, IdxState),
        Mod:get(idx_name, IdxState),
        couch_index_util:hexsig(Mod:get(signature, IdxState)),
        Reason
    ],
    % NOTE(review): ~r is not a standard io:format directive; presumably it is
    % handled by couch_log's own formatter - confirm before changing it.
    couch_log:debug("Closing index for db: ~s idx: ~s sig: ~p because ~r", Args),
    ok.


% gen_server call handler.
%
%   {get_state, ReqSeq}        - reply with the index state once the index has
%                                reached ReqSeq; otherwise start the updater
%                                and park the caller in `waiters`.
%   get_info                   - callback module info plus runtime counters.
%   reset                      - let the callback module reset the index.
%   compact                    - start the compactor.
%   get_compactor_pid          - {ok, CompactorPid}.
%   {compacted, NewIdxState}   - the compactor finished; swap in the result or
%                                answer `recompact`.
%   {compaction_failed, Reason} - fail all waiters and drop the compacted data.
handle_call({get_state, ReqSeq}, From, State) ->
    #st{
        mod=Mod,
        idx_state=IdxState,
        waiters=Waiters
    } = State,
    IdxSeq = Mod:get(update_seq, IdxState),
    case ReqSeq =< IdxSeq of
        true ->
            {reply, {ok, IdxState}, State};
        false -> % View update required
            couch_index_updater:run(State#st.updater, IdxState),
            Waiters2 = [{From, ReqSeq} | Waiters],
            {noreply, State#st{waiters=Waiters2}, infinity}
    end;
handle_call(get_info, _From, State) ->
    #st{mod=Mod} = State,
    IdxState = State#st.idx_state,
    {ok, Info0} = Mod:get(info, IdxState),
    IsUpdating = couch_index_updater:is_running(State#st.updater),
    IsCompacting = couch_index_compactor:is_running(State#st.compactor),
    IdxSeq = Mod:get(update_seq, IdxState),
    GetCommSeq = fun(Db) -> couch_db:get_committed_update_seq(Db) end,
    DbName = Mod:get(db_name, IdxState),
    CommittedSeq = couch_util:with_db(DbName, GetCommSeq),
    Info = Info0 ++ [
        {updater_running, IsUpdating},
        {compact_running, IsCompacting},
        {waiting_commit, State#st.committed =:= false},
        {waiting_clients, length(State#st.waiters)},
        % Never report a negative backlog when the index is ahead of the
        % committed database sequence.
        {pending_updates, max(CommittedSeq - IdxSeq, 0)}
    ],
    {reply, {ok, Info}, State};
handle_call(reset, _From, State) ->
    #st{
        mod=Mod,
        idx_state=IdxState
    } = State,
    {ok, NewIdxState} = Mod:reset(IdxState),
    {reply, {ok, NewIdxState}, State#st{idx_state=NewIdxState}};
handle_call(compact, _From, State) ->
    Resp = couch_index_compactor:run(State#st.compactor, State#st.idx_state),
    {reply, Resp, State};
handle_call(get_compactor_pid, _From, State) ->
    {reply, {ok, State#st.compactor}, State};
handle_call({compacted, NewIdxState}, _From, State) ->
    #st{
        mod=Mod,
        idx_state=OldIdxState
    } = State,
    assert_signature_match(Mod, OldIdxState, NewIdxState),
    NewSeq = Mod:get(update_seq, NewIdxState),
    OldSeq = Mod:get(update_seq, OldIdxState),
    % For indices that require swapping files, we have to make sure we're
    % up to date with the current index. Otherwise indexes could roll back
    % (perhaps considerably) to previous points in history.
    MustRecompact = is_recompaction_enabled(NewIdxState, State)
        andalso NewSeq < OldSeq,
    case MustRecompact of
        true -> {reply, recompact, State};
        false -> {reply, ok, commit_compacted(NewIdxState, State)}
    end;
handle_call({compaction_failed, Reason}, _From, State) ->
    #st{
        mod = Mod,
        idx_state = OldIdxState,
        waiters = Waiters
    } = State,
    send_all(Waiters, Reason),
    {ok, NewIdxState} = Mod:remove_compacted(OldIdxState),
    NewState = State#st{idx_state = NewIdxState, waiters = []},
    {reply, {ok, NewIdxState}, NewState}.


% gen_server cast handler.
%
%   {trigger_update, Seq}  - run the updater unless the index is already at Seq.
%   {updated, IdxState}    - like new_state, then either stop (shutdown flag
%                            set and nobody waiting) or re-run the updater for
%                            the remaining waiters.
%   {new_state, IdxState}  - adopt a new index state from the updater.
%   {update_error, Error}  - fail every waiter with Error.
%   stop / delete          - stop normally (delete removes the index first).
%   {ddoc_updated, Result} / ddoc_updated
%                          - stop when the design document was deleted or its
%                            index signature changed.
% Any other cast stops the server with reason unhandled_cast.
handle_cast({trigger_update, UpdateSeq}, State) ->
    #st{
        mod=Mod,
        idx_state=IdxState
    } = State,
    case UpdateSeq =< Mod:get(update_seq, IdxState) of
        true ->
            {noreply, State};
        false ->
            couch_index_updater:run(State#st.updater, IdxState),
            {noreply, State}
    end;
handle_cast({updated, NewIdxState}, State) ->
    {noreply, NewState} = handle_cast({new_state, NewIdxState}, State),
    case NewState#st.shutdown andalso (NewState#st.waiters =:= []) of
        true ->
            {stop, normal, NewState};
        false ->
            maybe_restart_updater(NewState),
            {noreply, NewState}
    end;
handle_cast({new_state, NewIdxState}, State) ->
    #st{
        mod=Mod,
        idx_state=OldIdxState
    } = State,
    OldFd = Mod:get(fd, OldIdxState),
    NewFd = Mod:get(fd, NewIdxState),
    % A state that refers to a different fd comes from an updater working on
    % a file we no longer use, so it is dropped.
    case NewFd == OldFd of
        true ->
            assert_signature_match(Mod, OldIdxState, NewIdxState),
            CurrSeq = Mod:get(update_seq, NewIdxState),
            Args = [
                Mod:get(db_name, NewIdxState),
                Mod:get(idx_name, NewIdxState),
                CurrSeq
            ],
            couch_log:debug("Updated index for db: ~s idx: ~s seq: ~B", Args),
            Rest = send_replies(State#st.waiters, CurrSeq, NewIdxState),
            maybe_schedule_commit(State),
            {noreply, State#st{
                idx_state=NewIdxState,
                waiters=Rest,
                committed=false
            }};
        false ->
            Fmt = "Ignoring update from old indexer for db: ~s idx: ~s",
            Args = [
                Mod:get(db_name, NewIdxState),
                Mod:get(idx_name, NewIdxState)
            ],
            couch_log:warning(Fmt, Args),
            {noreply, State}
    end;
handle_cast({update_error, Error}, State) ->
    send_all(State#st.waiters, Error),
    {noreply, State#st{waiters=[]}};
handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast(delete, State) ->
    #st{mod=Mod, idx_state=IdxState} = State,
    ok = Mod:delete(IdxState),
    {stop, normal, State};
handle_cast({ddoc_updated, DDocResult}, State) ->
    #st{mod = Mod, idx_state = IdxState} = State,
    Shutdown = case DDocResult of
        {not_found, deleted} ->
            true;
        {ok, DDoc} ->
            DbName = Mod:get(db_name, IdxState),
            couch_util:with_db(DbName, fun(Db) ->
                is_signature_changed(Mod, Db, DDoc, IdxState)
            end)
    end,
    ddoc_updated_result(Shutdown, State);
handle_cast(ddoc_updated, State) ->
    #st{mod = Mod, idx_state = IdxState} = State,
    DbName = Mod:get(db_name, IdxState),
    DDocId = Mod:get(idx_name, IdxState),
    Shutdown = couch_util:with_db(DbName, fun(Db) ->
        case couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX]) of
            {not_found, deleted} ->
                true;
            {ok, DDoc} ->
                is_signature_changed(Mod, Db, DDoc, IdxState)
        end
    end),
    ddoc_updated_result(Shutdown, State);
handle_cast(_Mesg, State) ->
    {stop, unhandled_cast, State}.


% gen_server info handler.
%
%   commit       - commit pending index updates once the database has
%                  committed at least as far as the index.
%   maybe_close  - periodic check whether the index file still exists.
%   'DOWN'       - a monitored process went away; fail the waiters and stop.
handle_info(commit, #st{committed=true}=State) ->
    {noreply, State};
handle_info(commit, State) ->
    #st{mod=Mod, idx_state=IdxState} = State,
    DbName = Mod:get(db_name, IdxState),
    IdxName = Mod:get(idx_name, IdxState),
    GetCommSeq = fun(Db) -> couch_db:get_committed_update_seq(Db) end,
    CommittedSeq = couch_util:with_db(DbName, GetCommSeq),
    case CommittedSeq >= Mod:get(update_seq, IdxState) of
        true ->
            % Commit the updates
            ok = Mod:commit(IdxState),
            couch_event:notify(DbName, {index_commit, IdxName}),
            {noreply, State#st{committed=true}};
        _ ->
            % We can't commit the header because the database seq that's
            % fully committed to disk is still behind us. If we committed
            % now and the database lost those changes our view could be
            % forever out of sync with the database. But a crash before we
            % commit these changes, no big deal, we only lose incremental
            % changes since last committal.
            erlang:send_after(commit_delay(), self(), commit),
            {noreply, State}
    end;
handle_info(maybe_close, State) ->
    % We need to periodically check if our index file still
    % exists on disk because index cleanups don't notify
    % the couch_index process when a file has been deleted. If
    % we don't check for this condition then the index can
    % remain open indefinitely wasting disk space.
    %
    % We make sure that we're idle before closing by looking
    % to see if we have any clients waiting for an update.
    #st{mod = Mod, idx_state = IdxState, waiters = Waiters} = State,
    IsIdle = Waiters =:= [],
    case IsIdle andalso not Mod:index_file_exists(IdxState) of
        true ->
            {stop, normal, State};
        false ->
            erlang:send_after(?CHECK_INTERVAL, self(), maybe_close),
            {noreply, State}
    end;
handle_info({'DOWN', _, _, _Pid, _}, #st{mod=Mod, idx_state=IdxState}=State) ->
    Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)],
    couch_log:debug("Index shutdown by monitor notice for db: ~s idx: ~s", Args),
    % Best effort: the waiters list is cleared below, so terminate/2 will not
    % answer them again.
    catch send_all(State#st.waiters, shutdown),
    {stop, normal, State#st{waiters=[]}}.


% No state migration is needed between versions.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


% Runs the updater again when callers are still waiting for a newer sequence.
maybe_restart_updater(#st{waiters=[]}) ->
    ok;
maybe_restart_updater(#st{idx_state=IdxState}=State) ->
    couch_index_updater:run(State#st.updater, IdxState).


% Arms the commit timer when the state handed in is still marked committed,
% i.e. no commit message is pending yet. Callers flip `committed` to false
% right after calling this.
maybe_schedule_commit(#st{committed = true}) ->
    erlang:send_after(commit_delay(), self(), commit),
    ok;
maybe_schedule_commit(#st{committed = false}) ->
    ok.


% True when the index state built from DDoc has a different signature than
% the currently served IdxState.
is_signature_changed(Mod, Db, DDoc, IdxState) ->
    {ok, NewIdxState} = Mod:init(Db, DDoc),
    Mod:get(signature, NewIdxState) =/= Mod:get(signature, IdxState).


% Turns the "must this index shut down?" decision of the ddoc_updated casts
% into the gen_server return value.
ddoc_updated_result(true, State) ->
    {stop, {shutdown, ddoc_updated}, State#st{shutdown = true}};
ddoc_updated_result(false, State) ->
    {noreply, State#st{shutdown = false}}.


% Sends Reply to every parked caller.
send_all(Waiters, Reply) ->
    [gen_server:reply(From, Reply) || {From, _} <- Waiters].


% Answers every waiter whose requested sequence is covered by UpdateSeq with
% {ok, IdxState} and returns the waiters that still have to wait.
send_replies(Waiters, UpdateSeq, IdxState) ->
    Pred = fun({_, S}) -> S =< UpdateSeq end,
    {ToSend, Remaining} = lists:partition(Pred, Waiters),
    [gen_server:reply(From, {ok, IdxState}) || {From, _} <- ToSend],
    Remaining.


% Raises error(signature_mismatch) unless both states carry the same index
% signature.
assert_signature_match(Mod, OldIdxState, NewIdxState) ->
    case {Mod:get(signature, OldIdxState), Mod:get(signature, NewIdxState)} of
        {Sig, Sig} -> ok;
        _ -> erlang:error(signature_mismatch)
    end.


% Swaps the compacted index in, restarts a running updater on the new state
% and returns the server state with the swapped index marked as uncommitted.
commit_compacted(NewIdxState, State) ->
    #st{
        mod=Mod,
        idx_state=OldIdxState,
        updater=Updater
    } = State,
    {ok, NewIdxState1} = Mod:swap_compacted(OldIdxState, NewIdxState),
    % Restart the indexer if it's running.
    case couch_index_updater:is_running(Updater) of
        true -> ok = couch_index_updater:restart(Updater, NewIdxState1);
        false -> ok
    end,
    maybe_schedule_commit(State),
    State#st{
        idx_state=NewIdxState1,
        committed=false
    }.


% Decides whether a compaction result that is behind the live index must be
% redone. Settings are looked up globally, per database, per "db:index" and
% per signature; the most specific one that is set wins, default is true.
is_recompaction_enabled(IdxState, #st{mod = Mod}) ->
    DbName = binary_to_list(Mod:get(db_name, IdxState)),
    IdxName = binary_to_list(Mod:get(idx_name, IdxState)),
    IdxKey = DbName ++ ":" ++ IdxName,

    IdxSignature = couch_index_util:hexsig((Mod:get(signature, IdxState))),

    Global = get_value("view_compaction", "enabled_recompaction"),
    PerSignature = get_value("view_compaction.recompaction", IdxSignature),
    PerIdx = get_value("view_compaction.recompaction", IdxKey),
    PerDb = get_value("view_compaction.recompaction", DbName),

    find_most_specific([Global, PerDb, PerIdx, PerSignature], true).


% Settings is ordered from least to most specific. Returns the last element
% that is not `undefined`, or Default when every element is `undefined`.
find_most_specific(Settings, Default) ->
    Reversed = lists:reverse([Default | Settings]),
    [Value | _] = lists:dropwhile(fun(A) -> A =:= undefined end, Reversed),
    Value.


% Reads a boolean-like config value. Returns true | false | undefined.
% An unrecognised string is reported and treated as unset, so that a typo in
% the configuration cannot crash the index server while it handles a
% {compacted, _} call.
get_value(Section, Key) ->
    case config:get(Section, Key) of
        "enabled" -> true;
        "disabled" -> false;
        "true" -> true;
        "false" -> false;
        undefined -> undefined;
        Other ->
            couch_log:warning(
                "Ignoring unrecognized value ~p for config setting [~s] ~s",
                [Other, Section, Key]
            ),
            undefined
    end.


% Delay used for the `commit` timer, in milliseconds as erlang:send_after/3
% expects. "commit_freq" (default 5) is multiplied by 1000.
commit_delay() ->
    config:get_integer("query_server_config", "commit_freq", 5) * 1000.


% Timeout for get_info/1: the string "infinity" maps to the atom, anything
% else is parsed as an integer (default "5000").
group_info_timeout_msec() ->
    Timeout = config:get("query_server_config", "group_info_timeout", "5000"),
    case Timeout of
        "infinity" ->
            infinity;
        Milliseconds ->
            list_to_integer(Milliseconds)
    end.


-ifdef(TEST).
-include_lib("couch/include/couch_eunit.hrl").

% Canned answers installed as test_index:get/3 by the test setup. The extra
% arguments are ignored.
get(db_name, _, _) ->
    <<"db_name">>;
get(idx_name, _, _) ->
    <<"idx_name">>;
get(signature, _, _) ->
    <<61,237,157,230,136,93,96,201,204,17,137,186,50,249,44,135>>.


% Suite-level fixture: boots couch and installs the two mocks used by the
% recompaction tests (`config` with passthrough, and a non-strict
% `test_index` whose get/3 is served by the local get/3).
setup_all() ->
    Ctx = test_util:start_couch(),
    ok = meck:new([config], [passthrough]),
    ok = meck:new([test_index], [non_strict]),
    ok = meck:expect(test_index, get, fun get/3),
    Ctx.


% Removes all mocks and stops couch.
teardown_all(Ctx) ->
    meck:unload(),
    test_util:stop_couch(Ctx).


% Per-case fixture: Settings is the [Global, PerDb, PerIndex] triple that
% config:get/2 should answer with. Returns {IdxState, State}; the callback
% module is the tuple {test_index}, so Mod:get(Key, IdxState) ends up in
% test_index:get/3 (the module is compiled with tuple_calls).
setup(Settings) ->
    meck:reset([config, test_index]),
    Lookup = fun(Section, Key) -> configure(Section, Key, Settings) end,
    ok = meck:expect(config, get, Lookup),
    IdxState = undefined,
    {IdxState, #st{mod = {test_index}}}.


teardown(_, _) ->
    ok.


% Answers the three recompaction settings from the case under test and lets
% every other lookup fall through to the real config module.
configure(Section, Key, [Global, PerDb, PerIndex]) ->
    case {Section, Key} of
        {"view_compaction", "enabled_recompaction"} ->
            Global;
        {"view_compaction.recompaction", "db_name"} ->
            PerDb;
        {"view_compaction.recompaction", "db_name:" ++ _} ->
            PerIndex;
        _ ->
            meck:passthrough([Section, Key])
    end;
configure(Section, Key, _) ->
    meck:passthrough([Section, Key]).


% EUnit generator: one couch instance for the whole suite, fresh config
% expectations for every generated case.
recompaction_configuration_test_() ->
    PerCase = {
        foreachx,
        fun setup/1,
        fun teardown/2,
        recompaction_configuration_tests()
    },
    {
        "Compaction tests",
        {setup, fun setup_all/0, fun teardown_all/1, PerCase}
    }.


% Builds the {Settings, TestFun} pairs for the foreachx fixture. Every
% Settings value is a [Global, PerDb, PerIndex] triple. The two tables below
% must together cover the full cartesian product, which is asserted while the
% test list is being generated.
recompaction_configuration_tests() ->
    Everything = couch_tests_combinatorics:product([
        [undefined, "true", "false"],
        [undefined, "enabled", "disabled"],
        [undefined, "enabled", "disabled"]
    ]),

    % Cases in which recompaction is expected to be on.
    Enabled = [
        [undefined, undefined, undefined],

        [undefined, undefined,"enabled"],
        [undefined, "enabled", undefined],
        [undefined, "disabled", "enabled"],
        [undefined, "enabled", "enabled"],

        ["true", undefined, undefined],
        ["true", undefined, "enabled"],
        ["true", "disabled", "enabled"],
        ["true", "enabled", undefined],
        ["true", "enabled", "enabled"],

        ["false", undefined, "enabled"],
        ["false", "enabled", undefined],
        ["false", "disabled", "enabled"],
        ["false", "enabled", "enabled"]
    ],

    % Cases in which recompaction is expected to be off.
    Disabled = [
        [undefined, undefined, "disabled"],
        [undefined, "disabled", undefined],
        [undefined, "disabled", "disabled"],
        [undefined, "enabled", "disabled"],

        ["true", undefined, "disabled"],
        ["true", "disabled", undefined],
        ["true", "disabled", "disabled"],
        ["true", "enabled", "disabled"],

        ["false", undefined, undefined],
        ["false", undefined, "disabled"],
        ["false", "disabled", undefined],
        ["false", "disabled", "disabled"],
        ["false", "enabled", "disabled"]
    ],

    Uncovered = Everything -- (Enabled ++ Disabled),
    ?assertEqual([], Uncovered),

    NegativeTests = [{S, fun should_not_call_recompact/2} || S <- Disabled],
    PositiveTests = [{S, fun should_call_recompact/2} || S <- Enabled],
    NegativeTests ++ PositiveTests.


% Titled test: with these settings recompaction must be enabled.
should_call_recompact(Settings, {IdxState, State}) ->
    Title = test_id(Settings),
    {Title, ?_assert(is_recompaction_enabled(IdxState, State))}.


% Titled test: with these settings recompaction must be disabled.
should_not_call_recompact(Settings, {IdxState, State}) ->
    Title = test_id(Settings),
    {Title, ?_assertNot(is_recompaction_enabled(IdxState, State))}.


% Printable form of one setting; anything but `undefined` is already a string.
to_string(undefined) -> "undefined";
to_string(Value) -> Value.


% Test title for a settings triple, e.g. "[ undefined , enabled , disabled ]".
test_id(Settings0) ->
    Labels = lists:map(fun to_string/1, Settings0),
    Joined = lists:flatten(string:join(Labels, " , ")),
    "[ " ++ Joined ++ " ]".


% EUnit generator for group_info_timeout_msec/0. Each case installs its own
% config:get/3 expectation; all mocks are unloaded after every case.
get_group_timeout_info_test_() ->
    Setup = fun() -> ok end,
    Cleanup = fun(_) -> meck:unload() end,
    {
        foreach,
        Setup,
        Cleanup,
        [
            t_group_timeout_info_integer(),
            t_group_timeout_info_infinity()
        ]
    }.


% A numeric string is converted to an integer.
t_group_timeout_info_integer() ->
    ?_test(begin
        mock_group_info_timeout("5001"),
        ?assertEqual(5001, group_info_timeout_msec())
    end).


% The string "infinity" is converted to the atom.
t_group_timeout_info_infinity() ->
    ?_test(begin
        mock_group_info_timeout("infinity"),
        ?assertEqual(infinity, group_info_timeout_msec())
    end).


% Makes config:get/3 answer Value for the group_info_timeout setting.
mock_group_info_timeout(Value) ->
    meck:expect(config, get,
        fun("query_server_config", "group_info_timeout", _) ->
            Value
        end).


-endif.