% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(custodian_util).

-include_lib("mem3/include/mem3.hrl").
-include_lib("couch/include/couch_db.hrl").

-export([summary/0, report/0]).
-export([ensure_dbs_exists/0]).

% Old design doc which should be cleaned up
-define(CUSTODIAN_ID, <<"_design/custodian">>).

% Folding state: the live and safe node lists, the user callback, the open
% shards db handle and the user accumulator.
-record(state, {live, safe, callback, db, acc}).

%% public functions.

% Return a list of {Key, Count} pairs. Key is `conflicted`, {live, N} or
% {safe, N}; every {live, N} / {safe, N} with N in 0..cluster_n()-1 is present
% even when its count is 0. Each item reported by fold_dbs/2 bumps the
% matching counter by one.
summary() ->
    Levels = lists:seq(0, cluster_n() - 1),
    Zeroes =
        [{conflicted, 0}] ++
            [{{live, N}, 0} || N <- Levels] ++
            [{{safe, N}, 0} || N <- Levels],
    Tally = fun
        (_Id, _Range, {conflicted, _Count}, Counters) ->
            % The number of conflicts is ignored; only the fact that the
            % document is conflicted gets counted.
            dict:update_counter(conflicted, 1, Counters);
        (_Id, _Range, Key, Counters) ->
            dict:update_counter(Key, 1, Counters)
    end,
    Totals = fold_dbs(dict:from_list(Zeroes), Tally),
    dict:to_list(Totals).

% Return the individual findings as a list: {Id, {conflicted, N}} for
% conflicted shard map documents and {Id, Range, Item} for everything else.
% Entries are prepended as they are found.
report() ->
    Collect = fun
        (Id, _Range, {conflicted, _} = Conflict, Found) ->
            [{Id, Conflict} | Found];
        (Id, Range, Item, Found) ->
            [{Id, Range, Item} | Found]
    end,
    fold_dbs([], Collect).

% Make sure the shards db exists, remove the legacy custodian design doc from
% it, and return {ok, Db} with the handle obtained from
% mem3_util:ensure_exists/1.
ensure_dbs_exists() ->
    {ok, ShardsDb} = mem3_util:ensure_exists(mem3_sync:shards_db()),
    ensure_custodian_ddoc_is_deleted(ShardsDb),
    {ok, ShardsDb}.

%% private functions.

% Fold Fun over the findings for every document in the shards db and return
% the final accumulator. "Safe" nodes are this node plus all connected nodes
% (after applying configured redirects); "live" nodes are the safe nodes that
% are not in maintenance mode. The db handle is closed even if the fold fails.
fold_dbs(Acc, Fun) ->
    SafeNodes = maybe_redirect([node() | nodes()]),
    LiveNodes = SafeNodes -- maintenance_nodes(SafeNodes),
    {ok, Db} = ensure_dbs_exists(),
    try
        Initial = #state{
            live = LiveNodes,
            safe = SafeNodes,
            callback = Fun,
            db = Db,
            acc = Acc
        },
        {ok, Final} = couch_db:fold_docs(Db, fun fold_dbs1/2, Initial, []),
        Final#state.acc
    after
        couch_db:close(Db)
    end.

% couch_db:fold_docs/4 callback. Design documents and deleted documents are
% skipped. For any other document the callback is first told about conflicts
% (with a null range) and the document's shards are then checked by fold_dbs/3.
fold_dbs1(#full_doc_info{id = <<"_design/", _/binary>>}, State) ->
    {ok, State};
fold_dbs1(#full_doc_info{deleted = true}, State) ->
    {ok, State};
fold_dbs1(#full_doc_info{id = DbName} = FDI, State) ->
    Acc =
        case count_conflicts(FDI) of
            0 ->
                State#state.acc;
            Extra ->
                (State#state.callback)(DbName, null, {conflicted, Extra}, State#state.acc)
        end,
    Shards = load_shards(State#state.db, FDI),
    fold_dbs(DbName, Shards, State#state{acc = Acc}).

% Compare the number of copies available on live nodes, and then on safe
% nodes, with the number of copies the full shard list provides. Whenever
% fewer are available, invoke the callback once per {Range, N} pair returned
% by get_range_counts/3, tagged {live, N} or {safe, N} respectively.
fold_dbs(Id, Shards, State) ->
    HostedOn = fun(Nodes) ->
        lists:filter(fun(#shard{node = Node}) -> lists:member(Node, Nodes) end, Shards)
    end,
    LiveShards = HostedOn(State#state.live),
    SafeShards = HostedOn(State#state.safe),
    TargetN = mem3_util:calculate_max_n(Shards),
    Report = fun(Tag, Present, AccIn) ->
        case mem3_util:calculate_max_n(Present) of
            MaxN when MaxN < TargetN ->
                Emit = fun({Range, Copies}, AccEmit) ->
                    (State#state.callback)(Id, Range, {Tag, Copies}, AccEmit)
                end,
                lists:foldl(Emit, AccIn, get_range_counts(MaxN, Present, Shards));
            _ ->
                AccIn
        end
    end,
    % Live findings are reported before safe findings.
    AccLive = Report(live, LiveShards, State#state.acc),
    AccSafe = Report(safe, SafeShards, AccLive),
    {ok, State#state{acc = AccSafe}}.

% Return a sorted list of {{Begin, End}, Count} pairs for the ranges whose
% copy count is at most MaxN.
%
%   MaxN      - number of complete rings that Shards can fill
%   Shards    - the shards that are currently available
%   AllShards - every shard of the database
%
% Ranges found in AllShards but not in Shards are reported with a count of 0.
get_range_counts(MaxN, Shards, AllShards) ->
    Ranges = ranges(Shards),
    AllRanges = ranges(AllShards),

    % Get a list of ranges that were used to fill the MaxN rings. Also return
    % whatever was left (not part of the rings).
    {UnusedRanges, UsedRanges} = get_n_rings(MaxN, Ranges, []),

    % All the ranges that participated in filling the N rings will get
    % their number of copies set to MaxN.
    UsedCounts = update_counts(UsedRanges, #{}, 1, fun(_) -> MaxN end),

    % Add ranges that were present but didn't get picked in the rings
    PresentCounts = update_counts(UnusedRanges, UsedCounts, 1, fun(N) ->
        max(N + 1, MaxN)
    end),

    % Handle shards that are not present at all. Mark these ranges as missing.
    Missing = [R || R <- AllRanges, not lists:member(R, Ranges)],
    RangeCounts = update_counts(Missing, PresentCounts, 0, fun(_) -> 0 end),

    % Report only shards with counts =< MaxN
    RangeCounts1 = maps:filter(fun(_, N) -> N =< MaxN end, RangeCounts),
    lists:sort(maps:to_list(RangeCounts1)).

% For every {Begin, End} range in Ranges, apply UpdateFun to its current
% count in the map Acc0, or store Init when the range has no entry yet.
update_counts(Ranges, Acc0, Init, UpdateFun) ->
    lists:foldl(
        fun({B, E}, Acc) ->
            maps:update_with({B, E}, UpdateFun, Init, Acc)
        end,
        Acc0,
        Ranges
    ).

% Convert a list of shards into a list of {Begin, End} tuples, preserving
% order and duplicates.
ranges(Shards) ->
    lists:map(fun(S) -> [B, E] = mem3:range(S), {B, E} end, Shards).

% Take a ring out of Ranges N times. Returns {LeftoverRanges, UsedRanges}.
% NOTE(review): relies on mem3_util:get_ring/1 returning a sub-list of Ranges
% that makes up one ring - confirm against mem3_util.
get_n_rings(N, Ranges, Rings) when N =< 0 ->
    {Ranges, Rings};
get_n_rings(N, Ranges, Rings) ->
    Ring = mem3_util:get_ring(Ranges),
    get_n_rings(N - 1, Ranges -- Ring, Rings ++ Ring).

% Configured number of replicas ("cluster" / "n"), defaulting to 3.
cluster_n() ->
    config:get_integer("cluster", "n", 3).

% Return the members of Nodes whose "couchdb" / "maintenance_mode" config
% value is "true".
%
% rpc:multicall/4 returns {Results, BadNodes}, and Results has no entry for
% the nodes listed in BadNodes. Those nodes have to be removed before zipping,
% otherwise lists:zip/2 fails on the length mismatch. Unreachable nodes are
% therefore never reported as being in maintenance mode; a {badrpc, _} result
% is not equal to "true" and is ignored as well.
maintenance_nodes(Nodes) ->
    {Modes, BadNodes} = rpc:multicall(Nodes, config, get, ["couchdb", "maintenance_mode"]),
    Responded = [N || N <- Nodes, not lists:member(N, BadNodes)],
    [N || {N, Mode} <- lists:zip(Responded, Modes), Mode =:= "true"].

% Open the shard map document described by FDI and turn its properties into
% shards with mem3_util:build_shards/2. Raises database_does_not_exist when
% the document cannot be found.
load_shards(Db, #full_doc_info{id = DbName} = FDI) ->
    case couch_db:open_doc(Db, FDI, [ejson_body]) of
        {not_found, _} ->
            erlang:error(database_does_not_exist, ?b2l(DbName));
        {ok, #doc{body = {Props}}} ->
            mem3_util:build_shards(DbName, Props)
    end.

% Replace every node that has an entry in the "mem3.redirects" config section
% with the node named by that entry. The result is in reverse order relative
% to Nodes.
maybe_redirect(Nodes) ->
    Redirect = fun(Node, Acc) ->
        case config:get("mem3.redirects", atom_to_list(Node)) of
            undefined ->
                [Node | Acc];
            Target ->
                [list_to_atom(Target) | Acc]
        end
    end,
    lists:foldl(Redirect, [], Nodes).

% Number of non-deleted leaf revisions beyond the first one.
count_conflicts(#full_doc_info{rev_tree = Tree}) ->
    IsLiveLeaf = fun
        ({#leaf{deleted = false}, _Path}) -> true;
        (_Other) -> false
    end,
    length(lists:filter(IsLiveLeaf, couch_key_tree:get_all_leafs(Tree))) - 1.

% Ensure the design doc which was added in 3.2.0 is deleted as we switched to
% using a BDU function instead. After a few releases this function could be
% removed as well.
%
ensure_custodian_ddoc_is_deleted(Db) ->
    case couch_db:open_doc(Db, ?CUSTODIAN_ID, [ejson_body]) of
        {ok, Doc} ->
            delete_custodian_ddoc(Db, Doc);
        {not_found, _Reason} ->
            ok
    end.

% Write a deleted, empty-bodied revision of Doc. On a conflict the db is
% reopened and the whole check starts over.
delete_custodian_ddoc(Db, Doc) ->
    Tombstone = Doc#doc{deleted = true, body = {[]}},
    try couch_db:update_doc(Db, Tombstone, [?ADMIN_CTX]) of
        {ok, _} ->
            couch_log:notice("~p : deleted custodian ddoc ~s", [?MODULE, ?CUSTODIAN_ID]),
            ok
    catch
        conflict ->
            {ok, Reopened} = couch_db:reopen(Db),
            ensure_custodian_ddoc_is_deleted(Reopened)
    end.

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

% Each case is {MaxN, AvailableShards, AllShards, ExpectedRangeCounts}.
get_range_counts_test_() ->
    Cases = [
        % No shards are present. There is a full range shard that would
        % fit. Report that range as missing.
        {0, [], [full()], [{{0, ?RING_END}, 0}]},

        % Can't complete the ring. But would complete it if had the
        % {2, ?RING_END} interval available.
        {0, [sh(0, 1)], [sh(0, 1), sh(2, ?RING_END)], [{{2, ?RING_END}, 0}]},

        % Can complete the ring only 1 time. Report that range as the
        % one available with a count of 1
        {1, [full()], [full(), full()], [{{0, ?RING_END}, 1}]},

        % Can complete the ring only 1 time with a full range shard, but
        % there is also {2, ?RING_END} that would complete another ring
        % as well if {0, 1} was present.
        {
            1,
            [sh(2, ?RING_END), full()],
            [sh(0, 1), sh(2, ?RING_END), full()],
            [
                {{0, 1}, 0},
                {{0, ?RING_END}, 1},
                {{2, ?RING_END}, 1}
            ]
        },

        % Can complete the ring 2 times [{0, 2}, {3, ?RING_END}] and full(),
        % and there is a remnant of a {5, 9} range that would complete the
        % ring as well if {0, 4} and {10, ?RING_END} were present. So report
        % those two ranges with a count of 0.
        {
            2,
            [sh(0, 2), sh(3, ?RING_END), sh(5, 9), full()],
            [sh(0, 2), sh(3, ?RING_END), full(), sh(0, 4), sh(5, 9), sh(10, ?RING_END)],
            [
                {{0, 2}, 1},
                {{0, 4}, 0},
                {{0, ?RING_END}, 1},
                {{3, ?RING_END}, 1},
                {{5, 9}, 1},
                {{10, ?RING_END}, 0}
            ]
        }
    ],
    [
        ?_assertEqual(Expected, get_range_counts(MaxN, Shards, AllShards))
     || {MaxN, Shards, AllShards, Expected} <- Cases
    ].

% A shard covering the whole ring.
full() ->
    sh(0, ?RING_END).

% A shard covering the range [B, E].
sh(B, E) ->
    #shard{range = [B, E]}.

-endif.