1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_index).
14-behaviour(gen_server).
15
16-compile(tuple_calls).
17
18-vsn(3).
19
20%% API
21-export([start_link/1, stop/1, get_state/2, get_info/1]).
22-export([trigger_update/2]).
23-export([compact/1, compact/2, get_compactor_pid/1]).
24
25%% gen_server callbacks
26-export([init/1, terminate/2, code_change/3]).
27-export([handle_call/3, handle_cast/2, handle_info/2]).
28
29
30-include_lib("couch/include/couch_db.hrl").
31
32
33-define(CHECK_INTERVAL, 600000). % 10 minutes
34
35-record(st, {
36    mod,
37    idx_state,
38    updater,
39    compactor,
40    waiters=[],
41    committed=true,
42    shutdown=false
43}).
44
45
46start_link({Module0, IdxState0}) ->
47    [Module, IdxState] = couch_index_plugin:before_open(Module0, IdxState0),
48    proc_lib:start_link(?MODULE, init, [{Module, IdxState}]).
49
50
51stop(Pid) ->
52    gen_server:cast(Pid, stop).
53
54
55get_state(Pid, RequestSeq) ->
56    gen_server:call(Pid, {get_state, RequestSeq}, infinity).
57
58
59get_info(Pid) ->
60    gen_server:call(Pid, get_info, group_info_timeout_msec()).
61
62
63trigger_update(Pid, UpdateSeq) ->
64    gen_server:cast(Pid, {trigger_update, UpdateSeq}).
65
66
67compact(Pid) ->
68    compact(Pid, []).
69
70
71compact(Pid, Options) ->
72    {ok, CPid} = gen_server:call(Pid, compact),
73    case lists:member(monitor, Options) of
74        true -> {ok, erlang:monitor(process, CPid)};
75        false -> ok
76    end.
77
78
79get_compactor_pid(Pid) ->
80    gen_server:call(Pid, get_compactor_pid).
81
82init({Mod, IdxState}) ->
83    DbName = Mod:get(db_name, IdxState),
84    erlang:send_after(?CHECK_INTERVAL, self(), maybe_close),
85    Resp = couch_util:with_db(DbName, fun(Db) ->
86        case Mod:open(Db, IdxState) of
87            {ok, IdxSt} ->
88                couch_db:monitor(Db),
89                {ok, IdxSt};
90            Error ->
91                Error
92        end
93    end),
94    case Resp of
95        {ok, NewIdxState} ->
96            {ok, UPid} = couch_index_updater:start_link(self(), Mod),
97            {ok, CPid} = couch_index_compactor:start_link(self(), Mod),
98            State = #st{
99                mod=Mod,
100                idx_state=NewIdxState,
101                updater=UPid,
102                compactor=CPid
103            },
104            Args = [
105                Mod:get(db_name, IdxState),
106                Mod:get(idx_name, IdxState),
107                couch_index_util:hexsig(Mod:get(signature, IdxState))
108            ],
109            couch_log:debug("Opening index for db: ~s idx: ~s sig: ~p", Args),
110            proc_lib:init_ack({ok, self()}),
111            gen_server:enter_loop(?MODULE, [], State);
112        Other ->
113            proc_lib:init_ack(Other)
114    end.
115
116
117terminate(Reason0, State) ->
118    #st{mod=Mod, idx_state=IdxState}=State,
119    case Reason0 of
120        {shutdown, ddoc_updated} ->
121            Mod:shutdown(IdxState),
122            Reason = ddoc_updated;
123        _ ->
124            Mod:close(IdxState),
125            Reason = Reason0
126    end,
127    send_all(State#st.waiters, Reason),
128    couch_util:shutdown_sync(State#st.updater),
129    couch_util:shutdown_sync(State#st.compactor),
130    Args = [
131        Mod:get(db_name, IdxState),
132        Mod:get(idx_name, IdxState),
133        couch_index_util:hexsig(Mod:get(signature, IdxState)),
134        Reason
135    ],
136    couch_log:debug("Closing index for db: ~s idx: ~s sig: ~p because ~r", Args),
137    ok.
138
139
140handle_call({get_state, ReqSeq}, From, State) ->
141    #st{
142        mod=Mod,
143        idx_state=IdxState,
144        waiters=Waiters
145    } = State,
146    IdxSeq = Mod:get(update_seq, IdxState),
147    case ReqSeq =< IdxSeq of
148        true ->
149            {reply, {ok, IdxState}, State};
150        _ -> % View update required
151            couch_index_updater:run(State#st.updater, IdxState),
152            Waiters2 = [{From, ReqSeq} | Waiters],
153            {noreply, State#st{waiters=Waiters2}, infinity}
154    end;
155handle_call(get_info, _From, State) ->
156    #st{mod=Mod} = State,
157    IdxState = State#st.idx_state,
158    {ok, Info0} = Mod:get(info, IdxState),
159    IsUpdating = couch_index_updater:is_running(State#st.updater),
160    IsCompacting = couch_index_compactor:is_running(State#st.compactor),
161    IdxSeq = Mod:get(update_seq, IdxState),
162    GetCommSeq = fun(Db) -> couch_db:get_committed_update_seq(Db) end,
163    DbName = Mod:get(db_name, IdxState),
164    CommittedSeq = couch_util:with_db(DbName, GetCommSeq),
165    Info = Info0 ++ [
166        {updater_running, IsUpdating},
167        {compact_running, IsCompacting},
168        {waiting_commit, State#st.committed == false},
169        {waiting_clients, length(State#st.waiters)},
170        {pending_updates, max(CommittedSeq - IdxSeq, 0)}
171    ],
172    {reply, {ok, Info}, State};
173handle_call(reset, _From, State) ->
174    #st{
175        mod=Mod,
176        idx_state=IdxState
177    } = State,
178    {ok, NewIdxState} = Mod:reset(IdxState),
179    {reply, {ok, NewIdxState}, State#st{idx_state=NewIdxState}};
180handle_call(compact, _From, State) ->
181    Resp = couch_index_compactor:run(State#st.compactor, State#st.idx_state),
182    {reply, Resp, State};
183handle_call(get_compactor_pid, _From, State) ->
184    {reply, {ok, State#st.compactor}, State};
185handle_call({compacted, NewIdxState}, _From, State) ->
186    #st{
187        mod=Mod,
188        idx_state=OldIdxState
189    } = State,
190    assert_signature_match(Mod, OldIdxState, NewIdxState),
191    NewSeq = Mod:get(update_seq, NewIdxState),
192    OldSeq = Mod:get(update_seq, OldIdxState),
193    % For indices that require swapping files, we have to make sure we're
194    % up to date with the current index. Otherwise indexes could roll back
195    % (perhaps considerably) to previous points in history.
196    case is_recompaction_enabled(NewIdxState, State) of
197        true ->
198            case NewSeq >= OldSeq of
199                true -> {reply, ok, commit_compacted(NewIdxState, State)};
200                false -> {reply, recompact, State}
201            end;
202        false ->
203            {reply, ok, commit_compacted(NewIdxState, State)}
204    end;
205handle_call({compaction_failed, Reason}, _From, State) ->
206    #st{
207        mod = Mod,
208        idx_state = OldIdxState,
209        waiters = Waiters
210    } = State,
211    send_all(Waiters, Reason),
212    {ok, NewIdxState} = Mod:remove_compacted(OldIdxState),
213    NewState = State#st{idx_state = NewIdxState, waiters = []},
214    {reply, {ok, NewIdxState}, NewState}.
215
216handle_cast({trigger_update, UpdateSeq}, State) ->
217    #st{
218        mod=Mod,
219        idx_state=IdxState
220    } = State,
221    case UpdateSeq =< Mod:get(update_seq, IdxState) of
222        true ->
223            {noreply, State};
224        false ->
225            couch_index_updater:run(State#st.updater, IdxState),
226            {noreply, State}
227    end;
228handle_cast({updated, NewIdxState}, State) ->
229    {noreply, NewState} = handle_cast({new_state, NewIdxState}, State),
230    case NewState#st.shutdown andalso (NewState#st.waiters =:= []) of
231        true ->
232            {stop, normal, NewState};
233        false ->
234            maybe_restart_updater(NewState),
235            {noreply, NewState}
236    end;
237handle_cast({new_state, NewIdxState}, State) ->
238    #st{
239        mod=Mod,
240        idx_state=OldIdxState
241    } = State,
242    OldFd = Mod:get(fd, OldIdxState),
243    NewFd = Mod:get(fd, NewIdxState),
244    case NewFd == OldFd of
245        true ->
246            assert_signature_match(Mod, OldIdxState, NewIdxState),
247            CurrSeq = Mod:get(update_seq, NewIdxState),
248            Args = [
249                Mod:get(db_name, NewIdxState),
250                Mod:get(idx_name, NewIdxState),
251                CurrSeq
252            ],
253            couch_log:debug("Updated index for db: ~s idx: ~s seq: ~B", Args),
254            Rest = send_replies(State#st.waiters, CurrSeq, NewIdxState),
255            case State#st.committed of
256                true -> erlang:send_after(commit_delay(), self(), commit);
257                false -> ok
258            end,
259            {noreply, State#st{
260                idx_state=NewIdxState,
261                waiters=Rest,
262                committed=false
263            }};
264        false ->
265            Fmt = "Ignoring update from old indexer for db: ~s idx: ~s",
266            Args = [
267                Mod:get(db_name, NewIdxState),
268                Mod:get(idx_name, NewIdxState)
269            ],
270            couch_log:warning(Fmt, Args),
271            {noreply, State}
272    end;
273handle_cast({update_error, Error}, State) ->
274    send_all(State#st.waiters, Error),
275    {noreply, State#st{waiters=[]}};
276handle_cast(stop, State) ->
277    {stop, normal, State};
278handle_cast(delete, State) ->
279    #st{mod=Mod, idx_state=IdxState} = State,
280    ok = Mod:delete(IdxState),
281    {stop, normal, State};
282handle_cast({ddoc_updated, DDocResult}, State) ->
283    #st{mod = Mod, idx_state = IdxState} = State,
284    Shutdown = case DDocResult of
285        {not_found, deleted} ->
286            true;
287        {ok, DDoc} ->
288            DbName = Mod:get(db_name, IdxState),
289            couch_util:with_db(DbName, fun(Db) ->
290                {ok, NewIdxState} = Mod:init(Db, DDoc),
291                Mod:get(signature, NewIdxState) =/= Mod:get(signature, IdxState)
292            end)
293    end,
294    case Shutdown of
295        true ->
296            {stop, {shutdown, ddoc_updated}, State#st{shutdown = true}};
297        false ->
298            {noreply, State#st{shutdown = false}}
299    end;
300handle_cast(ddoc_updated, State) ->
301    #st{mod = Mod, idx_state = IdxState} = State,
302    DbName = Mod:get(db_name, IdxState),
303    DDocId = Mod:get(idx_name, IdxState),
304    Shutdown = couch_util:with_db(DbName, fun(Db) ->
305        case couch_db:open_doc(Db, DDocId, [ejson_body, ?ADMIN_CTX]) of
306            {not_found, deleted} ->
307                true;
308            {ok, DDoc} ->
309                {ok, NewIdxState} = Mod:init(Db, DDoc),
310                Mod:get(signature, NewIdxState) =/= Mod:get(signature, IdxState)
311        end
312    end),
313    case Shutdown of
314        true ->
315            {stop, {shutdown, ddoc_updated}, State#st{shutdown = true}};
316        false ->
317            {noreply, State#st{shutdown = false}}
318    end;
319handle_cast(_Mesg, State) ->
320    {stop, unhandled_cast, State}.
321
322handle_info(commit, #st{committed=true}=State) ->
323    {noreply, State};
324handle_info(commit, State) ->
325    #st{mod=Mod, idx_state=IdxState} = State,
326    DbName = Mod:get(db_name, IdxState),
327    IdxName = Mod:get(idx_name, IdxState),
328    GetCommSeq = fun(Db) -> couch_db:get_committed_update_seq(Db) end,
329    CommittedSeq = couch_util:with_db(DbName, GetCommSeq),
330    case CommittedSeq >= Mod:get(update_seq, IdxState) of
331        true ->
332            % Commit the updates
333            ok = Mod:commit(IdxState),
334            couch_event:notify(DbName, {index_commit, IdxName}),
335            {noreply, State#st{committed=true}};
336        _ ->
337            % We can't commit the header because the database seq that's
338            % fully committed to disk is still behind us. If we committed
339            % now and the database lost those changes our view could be
340            % forever out of sync with the database. But a crash before we
341            % commit these changes, no big deal, we only lose incremental
342            % changes since last committal.
343            erlang:send_after(commit_delay(), self(), commit),
344            {noreply, State}
345    end;
346handle_info(maybe_close, State) ->
347    % We need to periodically check if our index file still
348    % exists on disk because index cleanups don't notify
349    % the couch_index process when a file has been deleted. If
350    % we don't check for this condition then the index can
351    % remain open indefinitely wasting disk space.
352    %
353    % We make sure that we're idle before closing by looking
354    % to see if we have any clients waiting for an update.
355    Mod = State#st.mod,
356    case State#st.waiters of
357        [] ->
358            case Mod:index_file_exists(State#st.idx_state) of
359                true ->
360                    erlang:send_after(?CHECK_INTERVAL, self(), maybe_close),
361                    {noreply, State};
362                false ->
363                    {stop, normal, State}
364            end;
365        _ ->
366            erlang:send_after(?CHECK_INTERVAL, self(), maybe_close),
367            {noreply, State}
368    end;
369handle_info({'DOWN', _, _, _Pid, _}, #st{mod=Mod, idx_state=IdxState}=State) ->
370    Args = [Mod:get(db_name, IdxState), Mod:get(idx_name, IdxState)],
371    couch_log:debug("Index shutdown by monitor notice for db: ~s idx: ~s", Args),
372    catch send_all(State#st.waiters, shutdown),
373    {stop, normal, State#st{waiters=[]}}.
374
375code_change(_OldVsn, State, _Extra) ->
376    {ok, State}.
377
378maybe_restart_updater(#st{waiters=[]}) ->
379    ok;
380maybe_restart_updater(#st{idx_state=IdxState}=State) ->
381    couch_index_updater:run(State#st.updater, IdxState).
382
383
384send_all(Waiters, Reply) ->
385    [gen_server:reply(From, Reply) || {From, _} <- Waiters].
386
387
388send_replies(Waiters, UpdateSeq, IdxState) ->
389    Pred = fun({_, S}) -> S =< UpdateSeq end,
390    {ToSend, Remaining} = lists:partition(Pred, Waiters),
391    [gen_server:reply(From, {ok, IdxState}) || {From, _} <- ToSend],
392    Remaining.
393
394assert_signature_match(Mod, OldIdxState, NewIdxState) ->
395    case {Mod:get(signature, OldIdxState), Mod:get(signature, NewIdxState)} of
396        {Sig, Sig} -> ok;
397        _ -> erlang:error(signature_mismatch)
398    end.
399
400commit_compacted(NewIdxState, State) ->
401    #st{
402        mod=Mod,
403        idx_state=OldIdxState,
404        updater=Updater
405    } = State,
406    {ok, NewIdxState1} = Mod:swap_compacted(OldIdxState, NewIdxState),
407    % Restart the indexer if it's running.
408    case couch_index_updater:is_running(Updater) of
409        true -> ok = couch_index_updater:restart(Updater, NewIdxState1);
410        false -> ok
411    end,
412    case State#st.committed of
413        true -> erlang:send_after(commit_delay(), self(), commit);
414        false -> ok
415    end,
416    State#st{
417        idx_state=NewIdxState1,
418        committed=false
419     }.
420
421is_recompaction_enabled(IdxState, #st{mod = Mod}) ->
422    DbName = binary_to_list(Mod:get(db_name, IdxState)),
423    IdxName = binary_to_list(Mod:get(idx_name, IdxState)),
424    IdxKey = DbName ++ ":" ++ IdxName,
425
426    IdxSignature = couch_index_util:hexsig((Mod:get(signature, IdxState))),
427
428    Global = get_value("view_compaction", "enabled_recompaction"),
429    PerSignature = get_value("view_compaction.recompaction", IdxSignature),
430    PerIdx = get_value("view_compaction.recompaction", IdxKey),
431    PerDb = get_value("view_compaction.recompaction", DbName),
432
433    find_most_specific([Global, PerDb, PerIdx, PerSignature], true).
434
435find_most_specific(Settings, Default) ->
436    Reversed = lists:reverse([Default | Settings]),
437    [Value | _] = lists:dropwhile(fun(A) -> A =:= undefined end, Reversed),
438    Value.
439
440get_value(Section, Key) ->
441    case config:get(Section, Key) of
442        "enabled" -> true;
443        "disabled" -> false;
444        "true" -> true;
445        "false" -> false;
446        undefined -> undefined
447    end.
448
449commit_delay() ->
450    config:get_integer("query_server_config", "commit_freq", 5) * 1000.
451
452
453group_info_timeout_msec() ->
454    Timeout = config:get("query_server_config", "group_info_timeout", "5000"),
455    case Timeout of
456        "infinity" ->
457            infinity;
458        Milliseconds ->
459            list_to_integer(Milliseconds)
460    end.
461
462
463-ifdef(TEST).
464-include_lib("couch/include/couch_eunit.hrl").
465
466get(db_name, _, _) ->
467    <<"db_name">>;
468get(idx_name, _, _) ->
469    <<"idx_name">>;
470get(signature, _, _) ->
471    <<61,237,157,230,136,93,96,201,204,17,137,186,50,249,44,135>>.
472
473setup_all() ->
474    Ctx = test_util:start_couch(),
475    ok = meck:new([config], [passthrough]),
476    ok = meck:new([test_index], [non_strict]),
477    ok = meck:expect(test_index, get, fun get/3),
478    Ctx.
479
480teardown_all(Ctx) ->
481    meck:unload(),
482    test_util:stop_couch(Ctx).
483
484setup(Settings) ->
485    meck:reset([config, test_index]),
486    ok = meck:expect(config, get, fun(Section, Key) ->
487        configure(Section, Key, Settings)
488    end),
489    {undefined, #st{mod = {test_index}}}.
490
491teardown(_, _) ->
492    ok.
493
494configure("view_compaction", "enabled_recompaction", [Global, _Db, _Index]) ->
495    Global;
496configure("view_compaction.recompaction", "db_name", [_Global, Db, _Index]) ->
497    Db;
498configure("view_compaction.recompaction", "db_name:" ++ _, [_, _, Index]) ->
499    Index;
500configure(Section, Key, _) ->
501    meck:passthrough([Section, Key]).
502
503recompaction_configuration_test_() ->
504    {
505        "Compaction tests",
506        {
507            setup,
508            fun setup_all/0,
509            fun teardown_all/1,
510            {
511                foreachx,
512                fun setup/1,
513                fun teardown/2,
514                recompaction_configuration_tests()
515            }
516        }
517    }.
518
519recompaction_configuration_tests() ->
520    AllCases = couch_tests_combinatorics:product([
521        [undefined, "true", "false"],
522        [undefined, "enabled", "disabled"],
523        [undefined, "enabled", "disabled"]
524    ]),
525
526    EnabledCases = [
527        [undefined, undefined, undefined],
528
529        [undefined, undefined,"enabled"],
530        [undefined, "enabled", undefined],
531        [undefined, "disabled", "enabled"],
532        [undefined, "enabled", "enabled"],
533
534        ["true", undefined, undefined],
535        ["true", undefined, "enabled"],
536        ["true", "disabled", "enabled"],
537        ["true", "enabled", undefined],
538        ["true", "enabled", "enabled"],
539
540        ["false", undefined, "enabled"],
541        ["false", "enabled", undefined],
542        ["false", "disabled", "enabled"],
543        ["false", "enabled", "enabled"]
544    ],
545
546    DisabledCases = [
547        [undefined, undefined, "disabled"],
548        [undefined, "disabled", undefined],
549        [undefined, "disabled", "disabled"],
550        [undefined, "enabled", "disabled"],
551
552        ["true", undefined, "disabled"],
553        ["true", "disabled", undefined],
554        ["true", "disabled", "disabled"],
555        ["true", "enabled", "disabled"],
556
557        ["false", undefined, undefined],
558        ["false", undefined, "disabled"],
559        ["false", "disabled", undefined],
560        ["false", "disabled", "disabled"],
561        ["false", "enabled", "disabled"]
562    ],
563
564    ?assertEqual([], AllCases -- (EnabledCases ++ DisabledCases)),
565
566    [{Settings, fun should_not_call_recompact/2} || Settings <- DisabledCases]
567    ++
568    [{Settings, fun should_call_recompact/2} || Settings <- EnabledCases].
569
570should_call_recompact(Settings, {IdxState, State}) ->
571    {test_id(Settings), ?_test(begin
572        ?assert(is_recompaction_enabled(IdxState, State)),
573        ok
574    end)}.
575
576should_not_call_recompact(Settings, {IdxState, State}) ->
577    {test_id(Settings), ?_test(begin
578        ?assertNot(is_recompaction_enabled(IdxState, State)),
579        ok
580    end)}.
581
582to_string(undefined) -> "undefined";
583to_string(Value) -> Value.
584
585test_id(Settings0) ->
586    Settings1 = [to_string(Value) || Value <- Settings0],
587    "[ " ++ lists:flatten(string:join(Settings1, " , ")) ++ " ]".
588
589
590get_group_timeout_info_test_() ->
591    {
592        foreach,
593        fun() -> ok end,
594        fun(_) -> meck:unload() end,
595        [
596            t_group_timeout_info_integer(),
597            t_group_timeout_info_infinity()
598        ]
599    }.
600
601
602t_group_timeout_info_integer() ->
603     ?_test(begin
604        meck:expect(config, get,
605            fun("query_server_config", "group_info_timeout", _) ->
606               "5001"
607            end),
608        ?assertEqual(5001, group_info_timeout_msec())
609    end).
610
611
612t_group_timeout_info_infinity() ->
613     ?_test(begin
614        meck:expect(config, get,
615            fun("query_server_config", "group_info_timeout", _) ->
616                "infinity"
617            end),
618        ?assertEqual(infinity, group_info_timeout_msec())
619    end).
620
621
622-endif.
623