1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(mem3_reshard_api).
14
15-export([
16    create_jobs/5,
17    get_jobs/0,
18    get_job/1,
19    get_summary/0,
20    resume_job/1,
21    stop_job/2,
22    start_shard_splitting/0,
23    stop_shard_splitting/1,
24    get_shard_splitting_state/0
25]).
26
27
%% Start a split job for every shard selected by pick_shards/4.
%%
%% Only the `split` job type is accepted (any other value fails with
%% function_clause). Returns one 4-tuple per selected shard:
%%   {ok, JobId, Node, ShardName}     - the job was started
%%   {error, Reason, Node, ShardName} - the remote call failed or was refused
%% May throw {bad_request, Msg} from pick_shards/4.
create_jobs(Node, Shard, Db, Range, split) ->
    Shards = pick_shards(Node, Shard, Db, Range),
    [start_split_job_on_shard(S) || S <- Shards].


%% Ask the node that hosts shard S to start a split job for it and tag the
%% outcome with the node and shard name. A badrpc failure is reported the same
%% way as an error returned by mem3_reshard.
start_split_job_on_shard(S) ->
    TargetNode = mem3:node(S),
    ShardName = mem3:name(S),
    Reply = rpc:call(TargetNode, mem3_reshard, start_split_job, [ShardName]),
    case Reply of
        {ok, JobId} ->
            {ok, JobId, TargetNode, ShardName};
        {error, Reason} ->
            {error, Reason, TargetNode, ShardName};
        {badrpc, Reason} ->
            {error, Reason, TargetNode, ShardName}
    end.
41
42
%% Collect the replies of mem3_reshard:jobs/0 from all live nodes into a
%% single flat list. Nodes that could not be reached are ignored.
get_jobs() ->
    {PerNodeJobs, _BadNodes} = rpc:multicall(mem3_util:live_nodes(),
        mem3_reshard, jobs, []),
    lists:flatten(PerNodeJobs).
47
48
%% Look a job up on all live nodes.
%%
%% Returns {ok, JobInfo} for the first node reply of that shape, or
%% {error, not_found} when no node returned one.
get_job(JobId) ->
    {Replies, _BadNodes} = rpc:multicall(mem3_util:live_nodes(), mem3_reshard,
        job, [JobId]),
    first_job_info(Replies).


%% Scan the replies in order and stop at the first {ok, JobInfo}; every other
%% reply shape is skipped.
first_job_info([{ok, JobInfo} | _]) ->
    {ok, JobInfo};
first_job_info([_Other | Rest]) ->
    first_job_info(Rest);
first_job_info([]) ->
    {error, not_found}.
58
59
%% Build a cluster-wide resharding summary as an EJSON-style {Props} term.
%%
%% The per-node replies of mem3_reshard:get_state/0 are summed counter by
%% counter (a counter missing from a reply counts as 0), and the combined
%% state / state_reason from state_and_reason/1 is placed in front of the
%% counters, which are listed in sorted key order.
get_summary() ->
    {Replies, _BadNodes} = rpc:multicall(mem3_util:live_nodes(), mem3_reshard,
        get_state, []),
    ZeroCounts = #{
        completed => 0,
        failed => 0,
        running => 0,
        stopped => 0,
        total => 0
    },
    Totals = lists:foldl(fun add_node_counts/2, ZeroCounts, Replies),
    {State, Reason} = state_and_reason(Replies),
    {[{state, State}, {state_reason, Reason}
        | lists:sort(maps:to_list(Totals))]}.


%% Add one node's counters to the running totals. Only the keys already
%% present in Counts are looked up in the node's properties.
add_node_counts({NodeProps}, Counts) ->
    maps:map(fun(Key, Sum) ->
        Sum + couch_util:get_value(Key, NodeProps, 0)
    end, Counts).
73
74
%% Resume a job on whichever live node knows about it.
%%
%% Every live node is asked; {error, not_found} replies are ignored because
%% most nodes are not expected to know the job.
%%
%% Returns:
%%   ok                   - at least one node replied ok and none failed
%%   {error, not_found}   - no node knew the job
%%   {error, {[{error, Bin}]}} - some node reported a failure
%%
%% Mixed replies (e.g. ok from one node and an error from another) and
%% {badrpc, _} replies used to fall through every case clause and crash with
%% case_clause; they are now reported as errors.
resume_job(JobId) ->
    Nodes = mem3_util:live_nodes(),
    {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, resume_job,
        [JobId]),
    Failures = lists:usort([R || R <- Replies, R =/= ok,
        R =/= {error, not_found}]),
    case {Failures, lists:member(ok, Replies)} of
        {[{error, Error} | _], _} ->
            {error, {[{error, couch_util:to_binary(Error)}]}};
        {[Other | _], _} ->
            % Anything else, such as {badrpc, Reason}: report the whole term
            {error, {[{error, couch_util:to_binary(Other)}]}};
        {[], true} ->
            ok;
        {[], false} ->
            {error, not_found}
    end.
88
89
%% Stop a job, with the given Reason, on whichever live node knows about it.
%%
%% Every live node is asked; {error, not_found} replies are ignored because
%% most nodes are not expected to know the job.
%%
%% Returns:
%%   ok                   - at least one node replied ok and none failed
%%   {error, not_found}   - no node knew the job
%%   {error, {[{error, Bin}]}} - some node reported a failure
%%
%% Mixed replies (e.g. ok from one node and an error from another) and
%% {badrpc, _} replies used to fall through every case clause and crash with
%% case_clause; they are now reported as errors.
stop_job(JobId, Reason) ->
    Nodes = mem3_util:live_nodes(),
    {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, stop_job,
        [JobId, Reason]),
    Failures = lists:usort([R || R <- Replies, R =/= ok,
        R =/= {error, not_found}]),
    case {Failures, lists:member(ok, Replies)} of
        {[{error, Error} | _], _} ->
            {error, {[{error, couch_util:to_binary(Error)}]}};
        {[Other | _], _} ->
            % Anything else, such as {badrpc, Reason}: report the whole term
            {error, {[{error, couch_util:to_binary(Other)}]}};
        {[], true} ->
            ok;
        {[], false} ->
            {error, not_found}
    end.
103
104
%% Call mem3_reshard:start/0 on every node rpc:multicall/3 reaches.
%%
%% Returns {ok, {[{ok, true}]}} when every reply was `ok`, otherwise
%% {error, {[{error, Bin}]}} describing the first non-ok reply.
start_shard_splitting() ->
    {Replies, _Bad} = rpc:multicall(mem3_reshard, start, []),
    case lists:usort(lists:flatten(Replies)) of
        [ok] ->
            {ok, {[{ok, true}]}};
        [_ | _] = Results ->
            % Atoms sort before tuples, so in a mixed result the head of the
            % sorted list is `ok` itself. Skip it to report the real failure
            % rather than an error of <<"ok">>. The list is deduplicated and
            % is not [ok], so at least one non-ok element exists.
            [Error | _] = [R || R <- Results, R =/= ok],
            {error, {[{error, couch_util:to_binary(Error)}]}}
    end.
113
114
%% Call mem3_reshard:stop/1 with Reason on every node rpc:multicall/3 reaches.
%%
%% Returns {ok, {[{ok, true}]}} when every reply was `ok`, otherwise
%% {error, {[{error, Bin}]}} describing the first non-ok reply.
stop_shard_splitting(Reason) ->
    {Replies, _Bad} = rpc:multicall(mem3_reshard, stop, [Reason]),
    case lists:usort(lists:flatten(Replies)) of
        [ok] ->
            {ok, {[{ok, true}]}};
        [_ | _] = Results ->
            % Atoms sort before tuples, so in a mixed result the head of the
            % sorted list is `ok` itself. Skip it to report the real failure
            % rather than an error of <<"ok">>. The list is deduplicated and
            % is not [ok], so at least one non-ok element exists.
            [Error | _] = [R || R <- Results, R =/= ok],
            {error, {[{error, couch_util:to_binary(Error)}]}}
    end.
123
124
%% Query mem3_reshard:get_state/0 on all live nodes and reduce the replies to
%% a single {State, Reason} pair via state_and_reason/1.
get_shard_splitting_state() ->
    {NodeStates, _BadNodes} = rpc:multicall(mem3_util:live_nodes(),
        mem3_reshard, get_state, []),
    state_and_reason(NodeStates).
129
130
%% Reduce per-node state replies to one {State, Reason} pair.
%%
%% Each reply is a {Props} term. Its `state` is expected to be <<"running">>,
%% <<"stopped">> or absent; any other value fails with case_clause. If at
%% least one node is running the result is {running, Reason}, otherwise
%% {stopped, Reason}, where Reason is chosen by pick_reason/1 from the reasons
%% reported by the nodes in that state.
state_and_reason(StateReplies) ->
    % Reasons are prepended (O(1)) rather than appended; their order does not
    % matter because pick_reason/1 sorts them before choosing.
    {Running, Stopped} = lists:foldl(fun({ResProps}, {RunAcc, StopAcc}) ->
        Reason = get_reason(ResProps),
        case couch_util:get_value(state, ResProps) of
            <<"running">> -> {[Reason | RunAcc], StopAcc};
            <<"stopped">> -> {RunAcc, [Reason | StopAcc]};
            undefined -> {RunAcc, StopAcc}
        end
    end, {[], []}, StateReplies),
    case Running of
        [_ | _] ->
            {running, pick_reason(Running)};
        [] ->
            {stopped, pick_reason(Stopped)}
    end.
149
150
%% Choose a single reason from a list of reasons: the smallest one in Erlang
%% term order once `undefined` entries are dropped, or `null` when nothing
%% remains.
pick_reason(Reasons) ->
    Defined = [Reason || Reason <- Reasons, Reason =/= undefined],
    first_or_null(lists:usort(Defined)).


%% Head of the list, or `null` for an empty list.
first_or_null([]) ->
    null;
first_or_null([Reason | _]) ->
    Reason.
158
159
%% Extract the `reason` stored under `state_info` in a state property list.
%%
%% Returns `undefined` when `state_info` is absent or is an empty list;
%% otherwise `state_info` must be a {Props} term, from which `reason` is read.
get_reason(StateProps) when is_list(StateProps) ->
    StateInfo = couch_util:get_value(state_info, StateProps),
    case StateInfo of
        {InfoProps} -> couch_util:get_value(reason, InfoProps);
        undefined -> undefined;
        [] -> undefined
    end.
166
167
%% Select the shards a job request refers to.
%%
%% Arguments are (Node, Shard, Db, Range); `undefined` means "not given".
%% Exactly one of Db or Shard must be given. Node and Range narrow the
%% selection. When Node or Range is omitted, check_node_required/0 or
%% check_range_required/0 is consulted and may throw. A Range cannot be
%% checked for a Shard request, so only the node check applies there.
%%
%% Throws {bad_request, Msg} when neither or both of Db and Shard are given.

% Db only: every shard of the database
pick_shards(undefined, undefined, Db, undefined) when is_binary(Db) ->
    check_node_required(),
    check_range_required(),
    mem3:shards(Db);

% Node + Db: the database's shards hosted on Node
pick_shards(Node, undefined, Db, undefined) when is_atom(Node),
        is_binary(Db) ->
    check_range_required(),
    [S || S <- mem3:shards(Db), mem3:node(S) == Node];

% Db + Range: the database's shards covering exactly Range, on any node
pick_shards(undefined, undefined, Db, [_B, _E] = Range) when is_binary(Db) ->
    check_node_required(),
    [S || S <- mem3:shards(Db), mem3:range(S) == Range];

% Node + Db + Range
pick_shards(Node, undefined, Db, [_B, _E] = Range) when is_atom(Node),
        is_binary(Db) ->
    [S || S <- mem3:shards(Db), mem3:node(S) == Node, mem3:range(S) == Range];

% Shard only: every copy of the named shard
pick_shards(undefined, Shard, undefined, undefined) when is_binary(Shard) ->
    check_node_required(),
    Db = mem3:dbname(Shard),
    [S || S <- mem3:shards(Db), mem3:name(S) == Shard];

% Node + Shard: the copy of the named shard hosted on Node
pick_shards(Node, Shard, undefined, undefined) when is_atom(Node),
        is_binary(Shard) ->
    Db = mem3:dbname(Shard),
    [S || S <- mem3:shards(Db), mem3:name(S) == Shard, mem3:node(S) == Node];

pick_shards(_, undefined, undefined, _) ->
    throw({bad_request, <<"Must specify at least `db` or `shard`">>});

% Variable names follow the (Node, Shard, Db, Range) argument order
pick_shards(_, Shard, Db, _) when is_binary(Shard), is_binary(Db) ->
    throw({bad_request, <<"`db` and `shard` are mutually exclusive">>}).
201
202
%% Enforce the "reshard" / "require_node_param" configuration flag.
%%
%% Called when a request did not name a node. Throws {bad_request, Msg} if the
%% flag is true; otherwise returns ok. The `false` passed to
%% config:get_boolean/3 is presumably the default when the flag is unset.
check_node_required() ->
    case config:get_boolean("reshard", "require_node_param", false) of
        true ->
            throw({bad_request, <<"`node` parameter is required">>});
        false ->
            ok
    end.
210
%% Enforce the "reshard" / "require_range_param" configuration flag.
%%
%% Called when a request did not give a range. Throws {bad_request, Msg} if
%% the flag is true; otherwise returns ok. The `false` passed to
%% config:get_boolean/3 is presumably the default when the flag is unset.
check_range_required() ->
    case config:get_boolean("reshard", "require_range_param", false) of
        true ->
            throw({bad_request, <<"`range` parameter is required">>});
        false ->
            ok
    end.
218