1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(ioq).
14-behaviour(gen_server).
15-behaviour(config_listener).
16
17-export([start_link/0, call/3]).
18-export([get_queue_lengths/0]).
19-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).
20
21% config_listener api
22-export([handle_config_change/5, handle_config_terminate/3]).
23
24-define(RELISTEN_DELAY, 5000).
25
% Server state of the IOQ gen_server.
-record(state, {
    % maximum number of requests allowed in `running` at once
    % (read from the "ioq" / "concurrency" config value)
    concurrency,
    % threshold compared against a uniform random number to decide whether a
    % background request is served when both queues are non-empty
    % (read from the "ioq" / "ratio" config value)
    ratio,
    % queue of pending requests that are not compaction / shard_sync
    interactive = queue:new(),
    % queue of pending compaction and shard_sync requests
    background = queue:new(),
    % list of #request{} records that were submitted and await a reply
    running = []
}).
33
% A single I/O request travelling through the IOQ server.
-record(request, {
    % process the message is eventually sent to (and which gets monitored)
    fd,
    % the call message forwarded to `fd`
    msg,
    % I/O class atom computed by io_class/2
    priority,
    % gen_server `From` of the waiting caller, used for the late reply
    from,
    % monitor reference taken when the request is submitted; also used as the
    % tag of the '$gen_call' message so the reply can be matched back
    ref
}).
41
% Starts the IOQ gen_server, linked to the caller and registered locally
% under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).
44
% Performs the call Msg against Fd, either directly or through the IOQ server.
%
% The request is classified from Msg and Metadata. Classes for which
% bypass/1 returns true are sent straight to Fd with an infinite timeout;
% every other class goes through queued_call/3.
call(Fd, Msg, Metadata) ->
    IoClass = io_class(Msg, Metadata),
    case bypass(IoClass) of
        false ->
            queued_call(Fd, Msg, IoClass);
        true ->
            gen_server:call(Fd, Msg, infinity)
    end.
53
% Asks the registered IOQ server for the current lengths of its queues.
get_queue_lengths() ->
    Server = ?MODULE,
    gen_server:call(Server, get_queue_lengths).
56
% Tells whether requests of the given I/O class skip the queue.
%
% The decision is read from the "ioq.bypass" config section, keyed by the
% class name. Known classes carry a built-in default; any other class is
% bypassed only when its config value is exactly the string "true".
bypass(Priority) ->
    Defaults = [
        {os_process, true},
        {read, true},
        {write, true},
        {view_update, true},
        {shard_sync, false},
        {compaction, false}
    ],
    case lists:keyfind(Priority, 1, Defaults) of
        {Priority, Default} ->
            config:get_boolean("ioq.bypass", atom_to_list(Priority), Default);
        false ->
            config:get("ioq.bypass", atom_to_list(Priority)) =:= "true"
    end.
67
% Maps a call message and its metadata onto an I/O class atom.
%
% The message shape wins: {prompt, _} and {data, _} are always os_process.
% Otherwise the class is derived from the metadata tuple; anything that is
% not recognised is classified as `other`.
io_class(Msg, Metadata) ->
    case Msg of
        {prompt, _} ->
            os_process;
        {data, _} ->
            os_process;
        _ ->
            case Metadata of
                {interactive, _} -> read;
                {db_update, _} -> write;
                {view_update, _, _} -> view_update;
                {internal_repl, _} -> shard_sync;
                {db_compact, _} -> compaction;
                {view_compact, _, _} -> compaction;
                _ -> other
            end
    end.
86
% Routes the call through the IOQ server and returns its reply.
%
% If the IOQ server is not running (the call exits with noproc) the message
% is sent directly to Fd instead, so callers keep working without the queue.
queued_call(Fd, Msg, Priority) ->
    Queued = #request{from = self(), priority = Priority, msg = Msg, fd = Fd},
    try gen_server:call(?MODULE, Queued, infinity) of
        Reply ->
            Reply
    catch
        exit:{noproc, _} ->
            gen_server:call(Fd, Msg, infinity)
    end.
95
% gen_server init callback: subscribes to config changes and builds the
% initial state from the current configuration.
init(_) ->
    ok = config:listen_for_changes(?MODULE, nil),
    {ok, read_config(#state{})}.
100
% Refreshes the tunables held in the state from the "ioq" config section:
% "ratio" (default 0.01) and "concurrency" (default 10).
read_config(State) ->
    BackgroundRatio = config:get_float("ioq", "ratio", 0.01),
    MaxRunning = config:get_integer("ioq", "concurrency", 10),
    State#state{ratio = BackgroundRatio, concurrency = MaxRunning}.
105
% gen_server call callback.
%
%   get_queue_lengths - replies with a map of the interactive and background
%                       queue lengths.
%   #request{}        - records the caller as the request's `from` and
%                       enqueues it; the reply is sent later.
%
% Both clauses return a 0 timeout so that a `timeout` message triggers
% request submission once the mailbox is drained.
handle_call(get_queue_lengths, _From, State) ->
    InteractiveLen = queue:len(State#state.interactive),
    BackgroundLen = queue:len(State#state.background),
    Lengths = #{background => BackgroundLen, interactive => InteractiveLen},
    {reply, Lengths, State, 0};
handle_call(#request{} = Request, From, State) ->
    Pending = Request#request{from = From},
    {noreply, enqueue_request(Pending, State), 0}.
114
% gen_server cast callback.
%
%   change - re-reads the tunables from the configuration.
%   other  - ignored.
%
% Both clauses return a 0 timeout. Queue processing in this server is driven
% by the `timeout` message, and a pending gen_server timeout is cancelled
% whenever another message is handled first. Returning without a timeout
% here could therefore leave an already enqueued request unsubmitted until
% some unrelated message arrives. It also lets a raised concurrency limit
% take effect immediately.
handle_cast(change, State) ->
    {noreply, read_config(State), 0};
handle_cast(_Msg, State) ->
    {noreply, State, 0}.
119
% gen_server info callback.
%
%   {Ref, Reply}  - if Ref is the monitor reference of a running request,
%                   stop monitoring (flushing any 'DOWN' message), forward
%                   Reply to the waiting caller and remove the request from
%                   the running list. Unknown references are ignored.
%   {'DOWN', ...} - the target of a running request died before replying;
%                   the caller receives {'EXIT', Reason} as its reply.
%   restart_config_listener
%                 - re-subscribes to config changes.
%   timeout       - submits queued requests while capacity allows.
%
% Every non-timeout clause returns a 0 timeout so that queue processing is
% rescheduled. Handling any message cancels a previously requested
% gen_server timeout, so omitting it could strand queued requests.
handle_info({Ref, Reply}, State) ->
    case lists:keytake(Ref, #request.ref, State#state.running) of
        {value, Request, Remaining} ->
            erlang:demonitor(Ref, [flush]),
            gen_server:reply(Request#request.from, Reply),
            {noreply, State#state{running = Remaining}, 0};
        false ->
            {noreply, State, 0}
    end;
handle_info({'DOWN', Ref, _, _, Reason}, State) ->
    case lists:keytake(Ref, #request.ref, State#state.running) of
        {value, Request, Remaining} ->
            gen_server:reply(Request#request.from, {'EXIT', Reason}),
            {noreply, State#state{running = Remaining}, 0};
        false ->
            {noreply, State, 0}
    end;
handle_info(restart_config_listener, State) ->
    ok = config:listen_for_changes(?MODULE, nil),
    % Reschedule queue processing; see the note in the header comment.
    {noreply, State, 0};
handle_info(timeout, State) ->
    {noreply, maybe_submit_request(State)}.
142
% config_listener callback: any change in the "ioq" section makes the IOQ
% server re-read its configuration; changes elsewhere are ignored.
handle_config_change("ioq", _Key, _Value, _Persist, _State) ->
    CastResult = gen_server:cast(?MODULE, change),
    {ok, CastResult};
handle_config_change(_Section, _Key, _Value, _Persist, _State) ->
    {ok, nil}.
147
% config_listener callback invoked when the listener terminates.
%
% A `stop` reason needs no action. For any other reason a
% restart_config_listener message is scheduled for the IOQ server after
% ?RELISTEN_DELAY milliseconds so it can subscribe again.
handle_config_terminate(_Server, stop, _State) ->
    ok;
handle_config_terminate(_Server, _Reason, _State) ->
    IoqServer = whereis(?MODULE),
    erlang:send_after(?RELISTEN_DELAY, IoqServer, restart_config_listener).
152
% gen_server code upgrade callback: the state is carried over unchanged.
code_change(_OldVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
155
% gen_server terminate callback: nothing to clean up.
terminate(_Why, _FinalState) ->
    ok.
158
% Appends the request to the queue matching its I/O class: compaction and
% shard_sync requests go to the background queue, everything else to the
% interactive queue.
enqueue_request(#request{priority = Class} = Request, #state{} = State) when
    Class =:= compaction; Class =:= shard_sync
->
    Background = queue:in(Request, State#state.background),
    State#state{background = Background};
enqueue_request(#request{} = Request, #state{} = State) ->
    Interactive = queue:in(Request, State#state.interactive),
    State#state{interactive = Interactive}.
165
% Submits queued requests for as long as fewer than `concurrency` requests
% are running and there is something left to submit.
maybe_submit_request(#state{running = Active, concurrency = Limit} = Before) when
    length(Active) < Limit
->
    After = make_next_request(Before),
    if
        % nothing was submitted, so both queues are empty
        After =:= Before ->
            Before;
        % the request just submitted used up the last free slot
        length(Active) >= Limit - 1 ->
            After;
        true ->
            maybe_submit_request(After)
    end;
maybe_submit_request(AtCapacity) ->
    AtCapacity.
178
% Picks the queue to serve next and submits one request from it.
%
% With only one non-empty queue that queue is served. When both hold
% requests, a uniform random draw below `ratio` selects the background
% queue, otherwise the interactive one. With both queues empty the state is
% returned unchanged.
make_next_request(#state{} = State) ->
    BackgroundIdle = queue:is_empty(State#state.background),
    InteractiveIdle = queue:is_empty(State#state.interactive),
    if
        BackgroundIdle andalso InteractiveIdle ->
            State;
        BackgroundIdle ->
            choose_next_request(#state.interactive, State);
        InteractiveIdle ->
            choose_next_request(#state.background, State);
        true ->
            PickBackground = couch_rand:uniform() < State#state.ratio,
            case PickBackground of
                false ->
                    choose_next_request(#state.interactive, State);
                true ->
                    choose_next_request(#state.background, State)
            end
    end.
195
% Takes the oldest request from the queue stored at record position Index
% and submits it; an empty queue leaves the state untouched.
choose_next_request(Index, State) ->
    Pending = element(Index, State),
    case queue:out(Pending) of
        {{value, Next}, Rest} ->
            submit_request(Next, setelement(Index, State, Rest));
        {empty, _} ->
            State
    end.
203
% Sends the request to its target process and records it as running.
%
% The target is monitored first and the monitor reference is used as the
% tag of the hand-built '$gen_call' message, so that either the reply or a
% 'DOWN' message can be matched back to this request.
submit_request(#request{fd = Fd, msg = Msg} = Request, #state{running = Running} = State) ->
    MonitorRef = erlang:monitor(process, Fd),
    Fd ! {'$gen_call', {self(), MonitorRef}, Msg},
    Started = Request#request{ref = MonitorRef},
    State#state{running = [Started | Running]}.
208