1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License.  You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(chttpd_node).
14-compile(tuple_calls).
15
16-export([
17    handle_node_req/1,
18    get_stats/0,
19    run_queues/0
20]).
21
22-include_lib("couch/include/couch_db.hrl").
23
24-import(chttpd,
25    [send_json/2,send_json/3,send_method_not_allowed/2,
26    send_chunk/2,start_chunked_response/3]).
27
% Request handler for everything below /_node/{node-name}.
% The node name "_local" is accepted as an alias for the node that is
% serving the request.
handle_node_req(#httpd{path_parts = [_, <<"_local">>]} = Req) ->
    chttpd:send_json(Req, 200, {[{name, node()}]});
handle_node_req(#httpd{path_parts = [Root, <<"_local">> | Tail]} = Req) ->
    % Swap the alias for this node's name (an atom) and dispatch again.
    handle_node_req(Req#httpd{path_parts = [Root, node() | Tail]});
% GET /_node/$node/_versions
handle_node_req(#httpd{method = 'GET', path_parts = [_, _Node, <<"_versions">>]} = Req) ->
    IcuRaw = couch_ejson_compare:get_icu_version(),
    UcaRaw = couch_ejson_compare:get_uca_version(),
    Collation = #{
        name => <<"libicu">>,
        library_version => version_tuple_to_str(IcuRaw),
        collation_algorithm_version => version_tuple_to_str(UcaRaw)
    },
    JsEngine = #{
        name => <<"spidermonkey">>,
        version => couch_server:get_spidermonkey_version()
    },
    chttpd:send_json(Req, 200, #{
        erlang_version => ?l2b(?COUCHDB_ERLANG_VERSION),
        collation_driver => Collation,
        javascript_engine => JsEngine
    });
handle_node_req(#httpd{path_parts = [_, _Node, <<"_versions">>]} = Req) ->
    chttpd:send_method_not_allowed(Req, "GET");
% GET /_node/$node/_config
% Responds with one JSON object per config section, each holding that
% section's key/value pairs.
handle_node_req(#httpd{method = 'GET', path_parts = [_, Node, <<"_config">>]} = Req) ->
    AllSettings = call_node(Node, config, all, []),
    BySection = lists:foldl(
        fun({{Section, Key}, Value}, Sections) ->
            % dict:append/3 starts a fresh list when the section is new
            Pair = {list_to_binary(Key), list_to_binary(Value)},
            dict:append(Section, Pair, Sections)
        end,
        dict:new(),
        AllSettings
    ),
    Body = dict:fold(
        fun(Section, Props, Out) ->
            [{list_to_binary(Section), {Props}} | Out]
        end,
        [],
        BySection
    ),
    chttpd:send_json(Req, 200, {Body});
handle_node_req(#httpd{path_parts = [_, _Node, <<"_config">>]} = Req) ->
    chttpd:send_method_not_allowed(Req, "GET");
% POST /_node/$node/_config/_reload - Flushes unpersisted config values from RAM
handle_node_req(
    #httpd{method = 'POST', path_parts = [_, Node, <<"_config">>, <<"_reload">>]} = Req
) ->
    case call_node(Node, config, reload, []) of
        ok ->
            chttpd:send_json(Req, 200, {[{ok, true}]});
        {error, Reason} ->
            chttpd:send_error(Req, {bad_request, Reason})
    end;
handle_node_req(#httpd{path_parts = [_, _Node, <<"_config">>, <<"_reload">>]} = Req) ->
    chttpd:send_method_not_allowed(Req, "POST");
% GET /_node/$node/_config/Section
handle_node_req(#httpd{method = 'GET', path_parts = [_, Node, <<"_config">>, Section]} = Req) ->
    Props = [
        {list_to_binary(Name), list_to_binary(Setting)}
     || {Name, Setting} <- call_node(Node, config, get, [Section])
    ],
    chttpd:send_json(Req, 200, {Props});
handle_node_req(#httpd{path_parts = [_, _Node, <<"_config">>, _Section]} = Req) ->
    chttpd:send_method_not_allowed(Req, "GET");
% PUT /_node/$node/_config/Section/Key
% The request body is the new value as a JSON string; the response is the
% value that was stored before ("" when there was none).
handle_node_req(
    #httpd{method = 'PUT', path_parts = [_, Node, <<"_config">>, Section, Key]} = Req
) ->
    couch_util:check_config_blacklist(Section),
    NewValue = couch_util:trim(chttpd:json_body(Req)),
    % Persist unless the client explicitly sent "X-Couch-Persist: false"
    Persist = chttpd:header_value(Req, "X-Couch-Persist") /= "false",
    Previous = call_node(Node, config, get, [Section, Key, ""]),
    SetOpts = #{persist => Persist, sensitive => Section == <<"admins">>},
    case call_node(Node, config, set, [Section, Key, ?b2l(NewValue), SetOpts]) of
        ok ->
            chttpd:send_json(Req, 200, list_to_binary(Previous));
        {error, Reason} ->
            chttpd:send_error(Req, {bad_request, Reason})
    end;
% GET /_node/$node/_config/Section/Key
handle_node_req(
    #httpd{method = 'GET', path_parts = [_, Node, <<"_config">>, Section, Key]} = Req
) ->
    case call_node(Node, config, get, [Section, Key, undefined]) of
        undefined ->
            throw({not_found, unknown_config_value});
        Current ->
            chttpd:send_json(Req, 200, list_to_binary(Current))
    end;
% DELETE /_node/$node/_config/Section/Key
% Responds with the value that was removed.
handle_node_req(
    #httpd{method = 'DELETE', path_parts = [_, Node, <<"_config">>, Section, Key]} = Req
) ->
    couch_util:check_config_blacklist(Section),
    Persist = chttpd:header_value(Req, "X-Couch-Persist") /= "false",
    case call_node(Node, config, get, [Section, Key, undefined]) of
        undefined ->
            throw({not_found, unknown_config_value});
        Previous ->
            case call_node(Node, config, delete, [Section, Key, Persist]) of
                ok ->
                    chttpd:send_json(Req, 200, list_to_binary(Previous));
                {error, Reason} ->
                    chttpd:send_error(Req, {bad_request, Reason})
            end
    end;
handle_node_req(#httpd{path_parts = [_, _Node, <<"_config">>, _Section, _Key]} = Req) ->
    chttpd:send_method_not_allowed(Req, "GET,PUT,DELETE");
% Anything deeper than Section/Key does not exist.
handle_node_req(#httpd{path_parts = [_, _Node, <<"_config">>, _Section, _Key | _]} = Req) ->
    chttpd:send_error(Req, not_found);
% GET /_node/$node/_stats[/Path...]
handle_node_req(#httpd{method = 'GET', path_parts = [_, Node, <<"_stats">> | Path]} = Req) ->
    flush(Node, Req),
    Fetched = call_node(Node, couch_stats, fetch, []),
    Transformed = couch_stats_httpd:transform_stats(Fetched),
    Tree = couch_stats_httpd:to_ejson(couch_stats_httpd:nest(Transformed)),
    chttpd:send_json(Req, couch_stats_httpd:extract_path(Path, Tree));
handle_node_req(#httpd{path_parts = [_, _Node, <<"_stats">>]} = Req) ->
    chttpd:send_method_not_allowed(Req, "GET");
% GET /_node/$node/_prometheus
handle_node_req(#httpd{method = 'GET', path_parts = [_, Node, <<"_prometheus">>]} = Req) ->
    Scraped = call_node(Node, couch_prometheus_server, scrape, []),
    FormatVersion = call_node(Node, couch_prometheus_server, version, []),
    ContentType = ?l2b("text/plain; version=" ++ FormatVersion),
    chttpd:send_response(Req, 200, [{<<"Content-Type">>, ContentType}], Scraped);
handle_node_req(#httpd{path_parts = [_, _Node, <<"_prometheus">>]} = Req) ->
    chttpd:send_method_not_allowed(Req, "GET");
% GET /_node/$node/_system
handle_node_req(#httpd{method = 'GET', path_parts = [_, Node, <<"_system">>]} = Req) ->
    SystemStats = call_node(Node, chttpd_node, get_stats, []),
    chttpd:send_json(Req, couch_stats_httpd:to_ejson(SystemStats));
handle_node_req(#httpd{path_parts = [_, _Node, <<"_system">>]} = Req) ->
    chttpd:send_method_not_allowed(Req, "GET");
% POST /_node/$node/_restart
handle_node_req(#httpd{method = 'POST', path_parts = [_, Node, <<"_restart">>]} = Req) ->
    call_node(Node, init, restart, []),
    chttpd:send_json(Req, 200, {[{ok, true}]});
handle_node_req(#httpd{path_parts = [_, _Node, <<"_restart">>]} = Req) ->
    chttpd:send_method_not_allowed(Req, "POST");
% Any other /_node/$node/... path is handed to couch_httpd:handle_request/1
% on the target node; the response comes back as messages tagged with Ref and
% is relayed to the original request by recv_loop/2.
handle_node_req(#httpd{
    path_parts = [_, Node | PathParts],
    mochi_req = MochiReq0
}) ->
    % Rebuild the raw path without the leading /_node/{node} segments,
    % keeping the original query string and fragment.
    {_, Query, Fragment} = mochiweb_util:urlsplit_path(MochiReq0:get(raw_path)),
    Encoded = [couch_util:url_encode(Part) || Part <- PathParts],
    StrippedPath = "/" ++ lists:join("/", Encoded),
    NewRawPath = mochiweb_util:urlunsplit_path({StrippedPath, Query, Fragment}),
    MaxSize = chttpd_util:get_chttpd_config_integer(
        "max_http_request_size", 4294967296
    ),
    % The body is read here and passed along in the request options.
    NewOpts = [{body, MochiReq0:recv_body(MaxSize)} | MochiReq0:get(opts)],
    Ref = erlang:make_ref(),
    MochiReq = mochiweb_request:new(
        {remote, self(), Ref},
        NewOpts,
        MochiReq0:get(method),
        NewRawPath,
        MochiReq0:get(version),
        MochiReq0:get(headers)
    ),
    call_node(Node, couch_httpd, handle_request, [MochiReq]),
    recv_loop(Ref, MochiReq0);
% /_node on its own names no node.
handle_node_req(#httpd{path_parts = [_]} = Req) ->
    chttpd:send_error(Req, {bad_request, <<"Incomplete path to _node request">>});
handle_node_req(Req) ->
    chttpd:send_error(Req, not_found).
183
% Relays a response that arrives as messages tagged with Ref onto ReqResp.
% Keeps receiving until a message that completes the response has been
% handled, then returns {ok, ReqResp}. Messages that match none of the known
% shapes are consumed and ignored.
recv_loop(Ref, ReqResp) ->
    receive
        Msg ->
            case relay_response(Msg, Ref, ReqResp) of
                {more, Next} -> recv_loop(Ref, Next);
                {done, Final} -> {ok, Final}
            end
    end.

% Applies a single message to ReqResp. Returns {more, ReqResp1} when further
% messages are expected and {done, ReqResp1} when the response is finished.
relay_response({Ref, Code, Headers, _Args, start_response}, Ref, ReqResp) ->
    {more, ReqResp:start({Code, Headers})};
relay_response({Ref, Code, Headers, Len, start_response_length}, Ref, ReqResp) ->
    {more, ReqResp:start_response_length({Code, Headers, Len})};
relay_response({Ref, Code, Headers, chunked, respond}, Ref, ReqResp) ->
    % A chunked response is followed by chunk messages
    {more, ReqResp:respond({Code, Headers, chunked})};
relay_response({Ref, Code, Headers, Args, respond}, Ref, ReqResp) ->
    {done, ReqResp:respond({Code, Headers, Args})};
relay_response({Ref, send, Data}, Ref, ReqResp) ->
    ReqResp:send(Data),
    {done, ReqResp};
relay_response({Ref, chunk, <<>>}, Ref, ReqResp) ->
    % The empty chunk ends the chunked body
    ReqResp:write_chunk(<<>>),
    {done, ReqResp};
relay_response({Ref, chunk, Data}, Ref, ReqResp) ->
    ReqResp:write_chunk(Data),
    {more, ReqResp};
relay_response(_Other, _Ref, ReqResp) ->
    {more, ReqResp}.
208
% Runs Mod:Fun(Args) on the given node through rpc:call/4 and returns the
% result.
%
% A binary node name is converted with list_to_existing_atom/1, so a name
% that is not already an atom throws {not_found, <<"no such node: ...">>}
% instead of creating a new atom. A {badrpc, nodedown} reply throws
% {error, {nodedown, Reason}}; every other reply is returned unchanged.
call_node(NodeName, Mod, Fun, Args) when is_binary(NodeName) ->
    try list_to_existing_atom(?b2l(NodeName)) of
        Node ->
            call_node(Node, Mod, Fun, Args)
    catch
        error:badarg ->
            throw({not_found, <<"no such node: ", NodeName/binary>>})
    end;
call_node(Node, Mod, Fun, Args) when is_atom(Node) ->
    Reply = rpc:call(Node, Mod, Fun, Args),
    if
        Reply =:= {badrpc, nodedown} ->
            Reason = ?l2b(io_lib:format("~s is down", [Node])),
            throw({error, {nodedown, Reason}});
        true ->
            Reply
    end.
225
% Calls couch_stats_aggregator:flush/0 on Node when the request's query
% string carries flush=true; otherwise does nothing and returns ok.
flush(Node, Req) ->
    FlushParam = couch_util:get_value("flush", chttpd:qs(Req)),
    if
        FlushParam =:= "true" ->
            call_node(Node, couch_stats_aggregator, flush, []);
        true ->
            ok
    end.
233
% Collects VM and CouchDB runtime figures for the local node as a property
% list: uptime, memory, run queues, GC and I/O counters, process counts,
% message queue lengths, internal replication backlog and distribution
% socket statistics.
get_stats() ->
    SystemBytes = erlang:memory(system),
    Accounted = erlang:memory([atom, code, binary, ets]),
    % "other" is the system memory not attributed to the types above
    OtherBytes = SystemBytes - lists:sum([Bytes || {_, Bytes} <- Accounted]),
    ByType = erlang:memory([
        atom, atom_used, processes, processes_used, binary, code, ets
    ]),
    Memory = [{other, OtherBytes} | ByType],
    {GcCount, GcWords, _} = statistics(garbage_collection),
    {{input, BytesIn}, {output, BytesOut}} = statistics(io),
    {FileQueues, UpdaterQueues} = db_pid_stats(),
    ServerQueue = couch_server:aggregate_queue_len(),
    RegisteredQueues = message_queues(registered()),
    MessageQueues = [
        {couch_file, {FileQueues}},
        {couch_db_updater, {UpdaterQueues}},
        {couch_server, ServerQueue}
        | RegisteredQueues
    ],
    {NormalRunQueue, DirtyCpuRunQueue} = run_queues(),
    [
        {uptime, couch_app:uptime() div 1000},
        {memory, {Memory}},
        {run_queue, NormalRunQueue},
        {run_queue_dirty_cpu, DirtyCpuRunQueue},
        {ets_table_count, length(ets:all())},
        {context_switches, element(1, statistics(context_switches))},
        {reductions, element(1, statistics(reductions))},
        {garbage_collection_count, GcCount},
        {words_reclaimed, GcWords},
        {io_input, BytesIn},
        {io_output, BytesOut},
        {os_proc_count, couch_proc_manager:get_proc_count()},
        {stale_proc_count, couch_proc_manager:get_stale_proc_count()},
        {process_count, erlang:system_info(process_count)},
        {process_limit, erlang:system_info(process_limit)},
        {message_queues, {MessageQueues}},
        {internal_replication_jobs, mem3_sync:get_backlog()},
        {distribution, {get_distribution_stats()}}
    ].
266
% Returns {CouchFileStats, CouchDbUpdaterStats}: message queue summaries for
% the couch_file and couch_db_updater processes found among the processes
% monitored by couch_stats_process_tracker.
db_pid_stats() ->
    Tracker = whereis(couch_stats_process_tracker),
    {monitors, Monitored} = process_info(Tracker, monitors),
    Pids = [Pid || {process, Pid} <- Monitored],
    FileStats = db_pid_stats(couch_file, Pids),
    UpdaterStats = db_pid_stats(couch_db_updater, Pids),
    {FileStats, UpdaterStats}.
273
% Summarises the message queue lengths of those Candidates whose
% '$initial_call' process dictionary entry is {Mod, init, 1}. Candidates that
% are no longer alive are skipped.
db_pid_stats(Mod, Candidates) ->
    Collect = fun(Pid, Lens) ->
        Info = process_info(Pid, [message_queue_len, dictionary]),
        case initial_call_queue_len(Mod, Info) of
            undefined -> Lens;
            Len -> [Len | Lens]
        end
    end,
    format_pid_stats(lists:foldl(Collect, [], Candidates)).

% Returns the message queue length recorded in a process_info/2 result when
% the process was started through Mod:init/1, and undefined otherwise
% (including when the process is gone and the info itself is undefined).
initial_call_queue_len(_Mod, undefined) ->
    undefined;
initial_call_queue_len(Mod, Info) ->
    Dictionary = proplists:get_value(dictionary, Info, []),
    case proplists:get_value('$initial_call', Dictionary) of
        {Mod, init, 1} ->
            proplists:get_value(message_queue_len, Info);
        _ ->
            undefined
    end.
295
% Turns a list of message queue lengths into a summary property list with
% count, min, max and the 50th/90th/99th percentile entries. An empty input
% yields an empty list.
format_pid_stats([]) ->
    [];
format_pid_stats(Mailboxes) ->
    Ordered = lists:sort(Mailboxes),
    Total = length(Ordered),
    % The element whose 1-based rank is round(Total * Fraction)
    AtFraction = fun(Fraction) ->
        lists:nth(round(Total * Fraction), Ordered)
    end,
    [
        {count, Total},
        {min, hd(Ordered)},
        {max, lists:last(Ordered)},
        {'50', AtFraction(0.5)},
        {'90', AtFraction(0.9)},
        {'99', AtFraction(0.99)}
    ].
309
% Returns one {Node, {Stats}} entry per distribution connection reported by
% erlang:system_info(dist_ctrl), where Stats is the inet:getstat/1 result for
% that connection.
%
% Stats is an empty list when the statistics cannot be read: the controlling
% entity may be a process rather than a port, and a connection can close
% between listing it and querying it. Neither case should fail the whole
% stats request.
get_distribution_stats() ->
    [
        {Node, {dist_ctrl_stats(Ctrl)}}
     || {Node, Ctrl} <- erlang:system_info(dist_ctrl)
    ].

% Socket statistics for one distribution controller, or [] when unavailable.
dist_ctrl_stats(Port) when is_port(Port) ->
    case inet:getstat(Port) of
        {ok, Stats} -> Stats;
        {error, _Reason} -> []
    end;
dist_ctrl_stats(_NotAPort) ->
    [].
315
% Returns {Name, MessageQueueLen} for each registered name in Registered that
% still refers to a live local process.
%
% The list of names is taken before the lookups happen, so a name may have
% been unregistered or its process may have exited in the meantime, and a
% registered name may belong to a port. Those names are left out of the
% result rather than crashing the caller.
message_queues(Registered) ->
    lists:filtermap(
        fun(Name) ->
            case whereis(Name) of
                Pid when is_pid(Pid) ->
                    case process_info(Pid, message_queue_len) of
                        {message_queue_len, Length} ->
                            {true, {Name, Length}};
                        undefined ->
                            % process exited after whereis/1
                            false
                    end;
                _PortOrUndefined ->
                    false
            end
        end,
        Registered
    ).
322
%% Workaround for https://bugs.erlang.org/browse/ERL-1355
%% Returns {NormalRunQueueTotal, DirtyCpuRunQueue}. With dirty CPU schedulers
%% present, the last entry of statistics(run_queue_lengths) is taken as the
%% dirty CPU queue and the remaining entries are summed; without them the
%% total comes from statistics(run_queue) and the dirty figure is 0.
run_queues() ->
    DirtyCpuSchedulers = erlang:system_info(dirty_cpu_schedulers),
    if
        DirtyCpuSchedulers > 0 ->
            Lengths = statistics(run_queue_lengths),
            {lists:sum(lists:droplast(Lengths)), lists:last(Lengths)};
        true ->
            {statistics(run_queue), 0}
    end.
332
% Renders a version tuple as a dot-separated binary with trailing zero
% components removed, e.g. {1, 2, 0, 0} becomes <<"1.2">>.
version_tuple_to_str(Version) when is_tuple(Version) ->
    % Walking from the right, zeros are dropped only while nothing has been
    % kept yet, so zeros in the middle of the version survive.
    Significant = lists:foldr(
        fun
            (Part, []) when Part == 0 -> [];
            (Part, Kept) -> [Part | Kept]
        end,
        [],
        tuple_to_list(Version)
    ),
    Strings = lists:map(fun erlang:integer_to_list/1, Significant),
    ?l2b(lists:join(".", Strings)).
339