% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(mem3_reshard_api).

-export([
    create_jobs/5,
    get_jobs/0,
    get_job/1,
    get_summary/0,
    resume_job/1,
    stop_job/2,
    start_shard_splitting/0,
    stop_shard_splitting/1,
    get_shard_splitting_state/0
]).


% Start a split job for every shard selected by pick_shards/4.
%
% Only the `split` job type is accepted. The result has one entry per
% selected shard, in selection order: `{ok, JobId, Node, ShardName}` when the
% job was started, `{error, Reason, Node, ShardName}` when the remote call
% returned an error or failed with `badrpc`.
create_jobs(Node, Shard, Db, Range, split) ->
    Selected = pick_shards(Node, Shard, Db, Range),
    [start_split_job(mem3:node(S), mem3:name(S)) || S <- Selected].


% Ask `ShardNode` to start a split job for `ShardName` and tag the outcome
% with the node and shard name it applies to.
start_split_job(ShardNode, ShardName) ->
    case rpc:call(ShardNode, mem3_reshard, start_split_job, [ShardName]) of
        {ok, JobId} ->
            {ok, JobId, ShardNode, ShardName};
        {error, Reason} ->
            {error, Reason, ShardNode, ShardName};
        {badrpc, Reason} ->
            {error, Reason, ShardNode, ShardName}
    end.


% Call mem3_reshard:jobs/0 on every node returned by mem3_util:live_nodes/0
% and return all the replies as one flat list.
get_jobs() ->
    {PerNode, _BadNodes} = rpc:multicall(mem3_util:live_nodes(),
        mem3_reshard, jobs, []),
    lists:flatten(PerNode).


% Call mem3_reshard:job/1 with `JobId` on every live node.
%
% Returns the first `{ok, JobInfo}` reply, or `{error, not_found}` when no
% node replied with one.
get_job(JobId) ->
    {Answers, _BadNodes} = rpc:multicall(mem3_util:live_nodes(),
        mem3_reshard, job, [JobId]),
    IsMiss = fun
        ({ok, _}) -> false;
        (_) -> true
    end,
    case lists:dropwhile(IsMiss, Answers) of
        [{ok, _} = Found | _] ->
            Found;
        [] ->
            {error, not_found}
    end.
% Summarize resharding across the live nodes.
%
% Calls mem3_reshard:get_state/0 on every node from mem3_util:live_nodes/0,
% sums the `running`, `total`, `completed`, `failed` and `stopped` counters
% found in the replies (a missing counter counts as 0) and returns a
% `{Props}` term with `state` and `state_reason` first, followed by the
% counters sorted by key.
get_summary() ->
    Nodes = mem3_util:live_nodes(),
    {Replies0, _Bad} = rpc:multicall(Nodes, mem3_reshard, get_state, []),
    % A node on which the call itself failed answers `{badrpc, Reason}`
    % instead of `{Props}`; such replies carry no counters or state, so they
    % are left out just like the nodes reported in `_Bad`.
    Replies = drop_badrpc(Replies0),
    Stats0 = #{running => 0, total => 0, completed => 0, failed => 0,
        stopped => 0},
    StatsF = lists:foldl(fun({Res}, Stats) ->
        maps:map(fun(Stat, OldVal) ->
            OldVal + couch_util:get_value(Stat, Res, 0)
        end, Stats)
    end, Stats0, Replies),
    {State, Reason} = state_and_reason(Replies),
    StateReasonProps = [{state, State}, {state_reason, Reason}],
    {StateReasonProps ++ lists:sort(maps:to_list(StatsF))}.


% Remove `{badrpc, Reason}` entries from a list of rpc:multicall replies.
drop_badrpc(Replies) ->
    lists:filter(fun
        ({badrpc, _}) -> false;
        (_) -> true
    end, Replies).


% Call mem3_reshard:resume_job/1 with `JobId` on every live node.
%
% Returns `ok` when every node that knew the job replied `ok`,
% `{error, not_found}` when no node knew it, and
% `{error, {[{error, ReasonBin}]}}` when any node reported a failure.
resume_job(JobId) ->
    Nodes = mem3_util:live_nodes(),
    {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, resume_job,
        [JobId]),
    job_action_result(Replies).


% Call mem3_reshard:stop_job/2 with `JobId` and `Reason` on every live node.
%
% Return values are the same as for resume_job/1.
stop_job(JobId, Reason) ->
    Nodes = mem3_util:live_nodes(),
    {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, stop_job,
        [JobId, Reason]),
    job_action_result(Replies).


% Fold the per-node replies of a job action into a single result.
%
% `{error, not_found}` replies are ignored first, since they only mean the
% node does not hold the job.
job_action_result(Replies) ->
    WithoutNotFound = [R || R <- Replies, R =/= {error, not_found}],
    case lists:usort(WithoutNotFound) of
        [] ->
            {error, not_found};
        [ok] ->
            ok;
        Distinct ->
            % Not just `[ok]`, and usort removed duplicates, so at least one
            % non-ok reply is present.
            Error = first_failure(Distinct),
            {error, {[{error, couch_util:to_binary(Error)}]}}
    end.


% Return the reason of the first failed reply. The atom `ok` sorts before
% any tuple, so it may sit in front of the failures and has to be skipped.
first_failure([ok | Rest]) ->
    first_failure(Rest);
first_failure([{error, Error} | _]) ->
    Error;
first_failure([{badrpc, Error} | _]) ->
    Error.


% Call mem3_reshard:start/0 on all nodes (rpc:multicall/3).
%
% Returns `{ok, {[{ok, true}]}}` when every reply was `ok`, otherwise
% `{error, {[{error, ReasonBin}]}}` built from the first non-ok reply.
start_shard_splitting() ->
    {Replies, _Bad} = rpc:multicall(mem3_reshard, start, []),
    shard_splitting_result(Replies).


% Call mem3_reshard:stop/1 with `Reason` on all nodes (rpc:multicall/3).
%
% Return values are the same as for start_shard_splitting/0.
stop_shard_splitting(Reason) ->
    {Replies, _Bad} = rpc:multicall(mem3_reshard, stop, [Reason]),
    shard_splitting_result(Replies).


% Fold the replies of a cluster-wide start/stop into a single result.
shard_splitting_result(Replies) ->
    case lists:usort(lists:flatten(Replies)) of
        [ok] ->
            {ok, {[{ok, true}]}};
        [_ | _] = Distinct ->
            % `ok` sorts before tuples, so the head of a mixed list is `ok`
            % rather than the failure; pick the first reply that is not `ok`.
            [Error | _] = [R || R <- Distinct, R =/= ok],
            {error, {[{error, couch_util:to_binary(Error)}]}}
    end.
% Call mem3_reshard:get_state/0 on every live node and reduce the replies to
% a single `{running | stopped, Reason}` pair (see state_and_reason/1).
get_shard_splitting_state() ->
    Nodes = mem3_util:live_nodes(),
    {Replies, _Bad} = rpc:multicall(Nodes, mem3_reshard, get_state, []),
    state_and_reason(Replies).


% Reduce per-node state replies to one `{State, Reason}` pair.
%
% The result is `{running, Reason}` if at least one reply has `state` set to
% `<<"running">>`, otherwise `{stopped, Reason}`. `Reason` is chosen by
% pick_reason/1 among the reasons of the replies in the winning state.
% Replies without a `state` entry and `{badrpc, _}` replies are ignored.
state_and_reason(StateReplies) ->
    {Running, Stopped} = lists:foldl(fun collect_reason/2, {[], []},
        StateReplies),
    case Running of
        [_ | _] ->
            {running, pick_reason(Running)};
        [] ->
            {stopped, pick_reason(Stopped)}
    end.


% Add the reason of one reply to the `{Running, Stopped}` accumulator. The
% order of the collected reasons does not matter: pick_reason/1 sorts them.
collect_reason({ResProps}, {Running, Stopped} = Acc) ->
    Reason = get_reason(ResProps),
    case couch_util:get_value(state, ResProps) of
        <<"running">> -> {[Reason | Running], Stopped};
        <<"stopped">> -> {Running, [Reason | Stopped]};
        undefined -> Acc
    end;
% rpc:multicall puts `{badrpc, Reason}` in the reply list for a node on
% which the call failed; it carries no state.
collect_reason({badrpc, _}, Acc) ->
    Acc.


% Return the smallest (in term order) reason that is not `undefined`, or
% `null` when there is none.
pick_reason(Reasons) ->
    Reasons1 = lists:usort(Reasons),
    Reasons2 = [R || R <- Reasons1, R =/= undefined],
    case Reasons2 of
        [] -> null;
        [R1 | _] -> R1
    end.


% Extract the `reason` entry nested under `state_info` in a state property
% list. Returns `undefined` when `state_info` is missing or `[]`.
get_reason(StateProps) when is_list(StateProps) ->
    case couch_util:get_value(state_info, StateProps) of
        [] -> undefined;
        undefined -> undefined;
        {SInfoProps} -> couch_util:get_value(reason, SInfoProps)
    end.
% Select the shards a resharding request applies to.
%
% Arguments are `(Node, Shard, Db, Range)`; an argument that was not supplied
% is `undefined`. Either `Db` (optionally narrowed by `Node` and/or a
% `[Begin, End]` range) or `Shard` (optionally narrowed by `Node`) must be
% given.
%
% Throws `{bad_request, Message}` when neither `db` nor `shard` is given,
% when both are given, or when a `node` / `range` argument is omitted while
% the configuration requires it.
pick_shards(undefined, undefined, Db, undefined) when is_binary(Db) ->
    check_node_required(),
    check_range_required(),
    mem3:shards(Db);

pick_shards(Node, undefined, Db, undefined) when is_atom(Node),
        is_binary(Db) ->
    check_range_required(),
    [S || S <- mem3:shards(Db), mem3:node(S) =:= Node];

pick_shards(undefined, undefined, Db, [_B, _E] = Range) when is_binary(Db) ->
    check_node_required(),
    [S || S <- mem3:shards(Db), mem3:range(S) == Range];

pick_shards(Node, undefined, Db, [_B, _E] = Range) when is_atom(Node),
        is_binary(Db) ->
    [S || S <- mem3:shards(Db), mem3:node(S) =:= Node, mem3:range(S) == Range];

pick_shards(undefined, Shard, undefined, undefined) when is_binary(Shard) ->
    check_node_required(),
    Db = mem3:dbname(Shard),
    [S || S <- mem3:shards(Db), mem3:name(S) =:= Shard];

pick_shards(Node, Shard, undefined, undefined) when is_atom(Node),
        is_binary(Shard) ->
    Db = mem3:dbname(Shard),
    [S || S <- mem3:shards(Db), mem3:name(S) =:= Shard, mem3:node(S) =:= Node];

pick_shards(_, undefined, undefined, _) ->
    throw({bad_request, <<"Must specify at least `db` or `shard`">>});

pick_shards(_, Shard, Db, _) when is_binary(Shard), is_binary(Db) ->
    throw({bad_request, <<"`db` and `shard` are mutually exclusive">>}).


% Throw `bad_request` if the `reshard` / `require_node_param` config flag
% (default `false`) is set. Called on the paths where no node was given.
check_node_required() ->
    case config:get_boolean("reshard", "require_node_param", false) of
        true ->
            throw({bad_request, <<"`node` parameter is required">>});
        false ->
            ok
    end.


% Throw `bad_request` if the `reshard` / `require_range_param` config flag
% (default `false`) is set. Called on the paths where no range was given.
check_range_required() ->
    case config:get_boolean("reshard", "require_range_param", false) of
        true ->
            throw({bad_request, <<"`range` parameter is required">>});
        false ->
            ok
    end.