% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(ioq).
-behaviour(gen_server).
-behaviour(config_listener).

-export([start_link/0, call/3]).
-export([get_queue_lengths/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).

% config_listener api
-export([handle_config_change/5, handle_config_terminate/3]).

% Delay in milliseconds before re-registering the config listener after it
% terminates for any reason other than `stop`.
-define(RELISTEN_DELAY, 5000).

% Server state.
%   concurrency - upper bound on the number of entries in `running`
%   ratio       - when both queues hold work, a random draw below this value
%                 selects the background queue
%   interactive - queue of pending requests that are not background work
%   background  - queue of pending compaction / shard_sync requests
%   running     - requests already forwarded to their fd, awaiting a reply
-record(state, {
    concurrency,
    ratio,
    interactive = queue:new(),
    background = queue:new(),
    running = []
}).

% A single IO request travelling through the queue.
%   fd       - process the message is ultimately sent to
%   msg      - the message itself
%   priority - IO class computed by io_class/2
%   from     - gen_server `From` of the caller waiting for the reply
%   ref      - monitor reference taken when the request is submitted
-record(request, {
    fd,
    msg,
    priority,
    from,
    ref
}).

% Start the queue server, registered locally under the module name.
start_link() ->
    ServerName = {local, ?MODULE},
    gen_server:start_link(ServerName, ?MODULE, [], []).

% Perform the call `Msg` against `Fd`. The IO class is derived from the
% message and metadata; classes configured to bypass the queue are sent
% straight to `Fd`, everything else is routed through the queue server.
call(Fd, Msg, Metadata) ->
    Class = io_class(Msg, Metadata),
    case bypass(Class) of
        false ->
            queued_call(Fd, Msg, Class);
        true ->
            gen_server:call(Fd, Msg, infinity)
    end.

% Ask the queue server for the current length of both queues.
get_queue_lengths() ->
    gen_server:call(?MODULE, get_queue_lengths).
% Decide whether requests of the given IO class skip the queue. Each known
% class reads its own key from the "ioq.bypass" config section with the
% default shown; any other class bypasses only when its key is set to the
% string "true".
bypass(os_process) ->
    config:get_boolean("ioq.bypass", "os_process", true);
bypass(read) ->
    config:get_boolean("ioq.bypass", "read", true);
bypass(write) ->
    config:get_boolean("ioq.bypass", "write", true);
bypass(view_update) ->
    config:get_boolean("ioq.bypass", "view_update", true);
bypass(shard_sync) ->
    config:get_boolean("ioq.bypass", "shard_sync", false);
bypass(compaction) ->
    config:get_boolean("ioq.bypass", "compaction", false);
bypass(Priority) ->
    config:get("ioq.bypass", atom_to_list(Priority)) =:= "true".

% Map a message and its metadata onto an IO class. The message shape is
% checked first, so {prompt, _} and {data, _} are always os_process
% regardless of the metadata.
io_class({prompt, _}, _) ->
    os_process;
io_class({data, _}, _) ->
    os_process;
io_class(_, {interactive, _}) ->
    read;
io_class(_, {db_update, _}) ->
    write;
io_class(_, {view_update, _, _}) ->
    view_update;
io_class(_, {internal_repl, _}) ->
    shard_sync;
io_class(_, {db_compact, _}) ->
    compaction;
io_class(_, {view_compact, _, _}) ->
    compaction;
io_class(_, _) ->
    other.

% Route a call through the queue server. If the server is not running
% (exit with noproc) fall back to calling the fd directly.
queued_call(Fd, Msg, Priority) ->
    % `from` is overwritten with the real gen_server From in handle_call/3.
    Request = #request{fd=Fd, msg=Msg, priority=Priority, from=self()},
    try
        gen_server:call(?MODULE, Request, infinity)
    catch
        exit:{noproc, _} ->
            gen_server:call(Fd, Msg, infinity)
    end.

% gen_server callback: subscribe to config changes and load the settings.
init(_) ->
    ok = config:listen_for_changes(?MODULE, nil),
    State = #state{},
    {ok, read_config(State)}.

% Refresh `ratio` (default 0.01) and `concurrency` (default 10) from the
% "ioq" config section.
read_config(State) ->
    Ratio = config:get_float("ioq", "ratio", 0.01),
    Concurrency = config:get_integer("ioq", "concurrency", 10),
    State#state{concurrency=Concurrency, ratio=Ratio}.

% gen_server callback. Every clause returns a 0 timeout so that
% handle_info(timeout, _) runs as soon as the mailbox is empty and submits
% queued work.
handle_call(get_queue_lengths, _From, State) ->
    Response = #{
        interactive => queue:len(State#state.interactive),
        background => queue:len(State#state.background)
    },
    {reply, Response, State, 0};
handle_call(#request{}=Request, From, State) ->
    % The reply is sent later via gen_server:reply/2 once the fd answers.
    {noreply, enqueue_request(Request#request{from=From}, State), 0}.

% gen_server callback. `change` is cast by handle_config_change/5 and
% reloads the settings.
%
% Both clauses return a 0 timeout: receiving any message cancels a timeout
% requested by an earlier callback, so returning without one here could
% leave already-queued requests waiting until some unrelated message
% arrives. It also lets an increased concurrency take effect immediately.
handle_cast(change, State) ->
    {noreply, read_config(State), 0};
handle_cast(_Msg, State) ->
    {noreply, State, 0}.
% gen_server callback for raw messages.
%
% {Ref, Reply}: answer to a '$gen_call' forwarded by submit_request/2. If
% Ref belongs to a running request, drop the monitor, relay the reply to the
% original caller and free the slot. Unknown refs are ignored.
handle_info({Ref, Reply}, State) ->
    case lists:keytake(Ref, #request.ref, State#state.running) of
        {value, Request, Remaining} ->
            erlang:demonitor(Ref, [flush]),
            gen_server:reply(Request#request.from, Reply),
            {noreply, State#state{running=Remaining}, 0};
        false ->
            {noreply, State, 0}
    end;
% 'DOWN': the monitored fd died before answering; the caller receives
% {'EXIT', Reason} as its reply and the slot is freed.
handle_info({'DOWN', Ref, _, _, Reason}, State) ->
    case lists:keytake(Ref, #request.ref, State#state.running) of
        {value, Request, Remaining} ->
            gen_server:reply(Request#request.from, {'EXIT', Reason}),
            {noreply, State#state{running=Remaining}, 0};
        false ->
            {noreply, State, 0}
    end;
% Scheduled by handle_config_terminate/3: subscribe to config changes again.
% Returns a 0 timeout like the clauses above; receiving this message cancels
% any timeout requested by an earlier callback, so without it queued
% requests could sit idle until the next unrelated message.
handle_info(restart_config_listener, State) ->
    ok = config:listen_for_changes(?MODULE, nil),
    {noreply, State, 0};
% Mailbox is empty: submit as many queued requests as concurrency allows.
handle_info(timeout, State) ->
    {noreply, maybe_submit_request(State)}.

% config_listener callback: any change in the "ioq" section makes the server
% reload its settings; other sections are ignored.
handle_config_change("ioq", _, _, _, _) ->
    {ok, gen_server:cast(?MODULE, change)};
handle_config_change(_, _, _, _, _) ->
    {ok, nil}.

% config_listener callback: unless the listener was stopped deliberately,
% schedule a restart_config_listener message after ?RELISTEN_DELAY ms.
handle_config_terminate(_Server, stop, _State) ->
    ok;
handle_config_terminate(_Server, _Reason, _State) ->
    erlang:send_after(?RELISTEN_DELAY, whereis(?MODULE), restart_config_listener).

% gen_server callback: state is carried over unchanged.
code_change(_Vsn, State, _Extra) ->
    {ok, State}.

% gen_server callback: nothing to clean up.
terminate(_Reason, _State) ->
    ok.

% Append a request to the queue matching its priority: compaction and
% shard_sync go to the background queue, everything else to the interactive
% queue.
enqueue_request(#request{priority=Priority}=Request, #state{}=State)
        when Priority =:= compaction; Priority =:= shard_sync ->
    State#state{background=queue:in(Request, State#state.background)};
enqueue_request(#request{}=Request, #state{}=State) ->
    State#state{interactive=queue:in(Request, State#state.interactive)}.
% Submit queued requests while fewer than `concurrency` are running.
% Stops when nothing could be dequeued (state unchanged) or when the request
% just submitted filled the last free slot.
maybe_submit_request(#state{concurrency = Limit, running = Active} = Current)
        when length(Active) < Limit ->
    Next = make_next_request(Current),
    if
        % Both queues were empty: nothing was submitted.
        Next =:= Current ->
            Current;
        % `Active` is the list from before this submission, so the new
        % request has taken the final slot.
        length(Active) >= Limit - 1 ->
            Next;
        true ->
            maybe_submit_request(Next)
    end;
maybe_submit_request(Current) ->
    Current.

% Dequeue and submit one request, or return the state untouched when both
% queues are empty.
make_next_request(#state{background = Background, interactive = Interactive} = State) ->
    BackgroundEmpty = queue:is_empty(Background),
    InteractiveEmpty = queue:is_empty(Interactive),
    case queue_to_serve(BackgroundEmpty, InteractiveEmpty, State) of
        none ->
            State;
        Index ->
            choose_next_request(Index, State)
    end.

% Pick the record index of the queue to serve next. When only one queue has
% work it wins; when both do, a random draw below `ratio` selects the
% background queue.
queue_to_serve(true, true, _State) ->
    none;
queue_to_serve(true, false, _State) ->
    #state.interactive;
queue_to_serve(false, true, _State) ->
    #state.background;
queue_to_serve(false, false, #state{ratio = Ratio}) ->
    case couch_rand:uniform() < Ratio of
        true -> #state.background;
        false -> #state.interactive
    end.

% Pop the head of the queue stored at record position `Index` and submit it;
% an empty queue leaves the state as is.
choose_next_request(Index, State) ->
    Pending = element(Index, State),
    case queue:out(Pending) of
        {{value, Request}, Rest} ->
            submit_request(Request, setelement(Index, State, Rest));
        {empty, _} ->
            State
    end.

% Monitor the fd, forward the message to it as a '$gen_call' tagged with the
% monitor reference, and record the request as running under that reference.
submit_request(#request{fd = Fd, msg = Msg} = Request, #state{running = Active} = State) ->
    MonitorRef = erlang:monitor(process, Fd),
    Fd ! {'$gen_call', {self(), MonitorRef}, Msg},
    State#state{running = [Request#request{ref = MonitorRef} | Active]}.