1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(custodian_util).
14
15-include_lib("mem3/include/mem3.hrl").
16-include_lib("couch/include/couch_db.hrl").
17
18-export([summary/0, report/0]).
19-export([ensure_dbs_exists/0]).
20
% Id of a legacy design doc that ensure_custodian_ddoc_is_deleted/1 deletes
% from the shards db if it is still present.
-define(CUSTODIAN_ID, <<"_design/custodian">>).

% Accumulator threaded through couch_db:fold_docs/4 by fold_dbs/2:
%   live     - the `safe` nodes that did not report maintenance mode
%   safe     - this node plus connected nodes, after mem3.redirects are applied
%   callback - fun(DbName, Range, Item, Acc) invoked for each finding
%   db       - handle to the shards db being folded
%   acc      - the caller's accumulator, passed through callback
-record(state, {live, safe, callback, db, acc}).
25
26%% public functions.
27
%% Tally findings across all databases in the shards db.
%%
%% Returns an (unordered) list of {Key, Count} pairs, where Key is
%% `conflicted`, {live, N} or {safe, N}. Every {live, N} / {safe, N} key for
%% N in 0..cluster_n()-1 is present, with a count of 0 if nothing was found.
summary() ->
    CopyCounts = lists:seq(0, cluster_n() - 1),
    Zeroed = dict:from_list(
        [{conflicted, 0}] ++
        [{{live, N}, 0} || N <- CopyCounts] ++
        [{{safe, N}, 0} || N <- CopyCounts]),
    Tally = fun(_DbName, _Range, Finding, Counts) ->
        % Conflicts are counted per doc, regardless of how many there are.
        Key = case Finding of
            {conflicted, _} -> conflicted;
            _ -> Finding
        end,
        dict:update_counter(Key, 1, Counts)
    end,
    Totals = fold_dbs(Zeroed, Tally),
    dict:to_list(Totals).
38
%% List every finding across all databases in the shards db, last found first:
%% {DbName, {conflicted, N}} for a doc with conflicts, and
%% {DbName, Range, {live | safe, N}} for an under-replicated range.
report() ->
    Collect = fun
        (DbName, _Range, {conflicted, _} = Conflict, Found) ->
            [{DbName, Conflict} | Found];
        (DbName, Range, Finding, Found) ->
            [{DbName, Range, Finding} | Found]
    end,
    fold_dbs([], Collect).
46
%% Open the shards db via mem3_util:ensure_exists/1 (presumably creating it
%% if it is missing) and remove the legacy custodian design doc from it.
%% Returns {ok, Db}.
ensure_dbs_exists() ->
    {ok, ShardsDb} = Opened = mem3_util:ensure_exists(mem3_sync:shards_db()),
    ensure_custodian_ddoc_is_deleted(ShardsDb),
    Opened.
52
53%% private functions.
54
%% Fold Fun over every database entry of the shards db, starting from Acc.
%% Fun is called as Fun(DbName, Range, Item, Acc). Item is {conflicted, N}
%% (with Range = null), {live, N} or {safe, N}. Nodes that report maintenance
%% mode count as safe but not live. The shards db handle is always closed.
fold_dbs(Acc, Fun) ->
    SafeNodes = maybe_redirect([node() | nodes()]),
    InMaintenance = maintenance_nodes(SafeNodes),
    LiveNodes = SafeNodes -- InMaintenance,
    {ok, ShardsDb} = ensure_dbs_exists(),
    try
        Init = #state{
            safe = SafeNodes,
            live = LiveNodes,
            db = ShardsDb,
            callback = Fun,
            acc = Acc
        },
        {ok, Final} = couch_db:fold_docs(ShardsDb, fun fold_dbs1/2, Init, []),
        Final#state.acc
    after
        couch_db:close(ShardsDb)
    end.
66
%% couch_db:fold_docs/4 callback over the shards db. Design docs and entries
%% marked deleted are skipped. For any other doc (its id is used as the
%% database name), a {conflicted, N} finding is reported first if the doc has
%% conflicting revisions. Then its shards are checked by fold_dbs/3.
fold_dbs1(#full_doc_info{id = <<"_design/", _/binary>>}, State) ->
    {ok, State};
fold_dbs1(#full_doc_info{deleted = true}, State) ->
    {ok, State};
fold_dbs1(#full_doc_info{id = DbName} = FDI, State) ->
    Acc = State#state.acc,
    AccOut = case count_conflicts(FDI) of
        0 ->
            Acc;
        Conflicts ->
            Callback = State#state.callback,
            Callback(DbName, null, {conflicted, Conflicts}, Acc)
    end,
    Shards = load_shards(State#state.db, FDI),
    fold_dbs(DbName, Shards, State#state{acc = AccOut}).
79
80
%% Check one database's Shards against the live and the safe node sets.
%% For each set whose copies fall short of the full shard map (lower
%% mem3_util:calculate_max_n/1), the callback is invoked once per reported
%% range with {live, N} or {safe, N}. Live findings are reported before safe
%% ones. Returns {ok, State} with the updated accumulator.
fold_dbs(Id, Shards, State) ->
    TargetN = mem3_util:calculate_max_n(Shards),
    Callback = State#state.callback,
    Acc1 = report_shortfall(live, State#state.live, Id, Shards, TargetN,
        Callback, State#state.acc),
    Acc2 = report_shortfall(safe, State#state.safe, Id, Shards, TargetN,
        Callback, Acc1),
    {ok, State#state{acc = Acc2}}.

%% Count only the copies in Shards hosted on Nodes. If their
%% mem3_util:calculate_max_n/1 is below TargetN, fold
%% Callback(Id, Range, {Tag, Count}, Acc) over the ranges returned by
%% get_range_counts/3. Otherwise return Acc unchanged.
report_shortfall(Tag, Nodes, Id, Shards, TargetN, Callback, Acc) ->
    OnNodes = lists:filter(
        fun(#shard{node = Node}) -> lists:member(Node, Nodes) end,
        Shards),
    case mem3_util:calculate_max_n(OnNodes) of
        AvailableN when AvailableN < TargetN ->
            RangeCounts = get_range_counts(AvailableN, OnNodes, Shards),
            lists:foldl(fun({Range, Count}, FAcc) ->
                Callback(Id, Range, {Tag, Count}, FAcc)
            end, Acc, RangeCounts);
        _ ->
            Acc
    end.
107
108
% Count copies per range for one database. Shards is the subset of AllShards
% that is available (on live or safe nodes, see fold_dbs/3). MaxN is
% mem3_util:calculate_max_n/1 of Shards, presumably the number of complete
% rings Shards can form. Returns a sorted [{{Begin, End}, Count}] list that
% holds only ranges whose Count is =< MaxN. Ranges that appear in AllShards
% but not in Shards are reported with a Count of 0.
get_range_counts(MaxN, Shards, AllShards) ->
    Ranges = ranges(Shards),
    AllRanges = ranges(AllShards),

    % Get a list of ranges that were used to fill the MaxN rings. Also return
    % whatever was left (not part of the rings).
    {UnusedRanges, UsedRanges} = get_n_rings(MaxN, Ranges, []),

    % Count the ranges that participated in filling the N rings.
    % update_counts/4 uses maps:update_with/4, which stores Init on a key's
    % first occurrence and applies the fun only on repeats. So a range that
    % occurs once in UsedRanges counts as 1, and a range that occurs more
    % than once counts as MaxN.
    UsedCounts = update_counts(UsedRanges, #{}, 1, fun(_) -> MaxN end),

    % Add ranges that were present but didn't get picked in the rings. A
    % range not counted yet starts at 1; an already counted range with count
    % N becomes max(N + 1, MaxN).
    PresentCounts = update_counts(UnusedRanges, UsedCounts, 1, fun(N) ->
        max(N + 1, MaxN)
    end),

    % Handle shards that are not present at all. Mark these ranges as missing.
    Missing = [R || R <- AllRanges, not lists:member(R, Ranges)],
    RangeCounts = update_counts(Missing, PresentCounts, 0, fun(_) -> 0 end),

    % Report only ranges with counts =< MaxN
    RangeCounts1 = maps:filter(fun(_, N) -> N =< MaxN end, RangeCounts),
    lists:sort(maps:to_list(RangeCounts1)).
133
134
%% Fold a list of {Begin, End} ranges into the Counts map. A range that is not
%% yet a key is stored with Init. An existing range's count is replaced by
%% UpdateFun(Count).
update_counts([], Counts, _Init, _UpdateFun) ->
    Counts;
update_counts([{_Begin, _End} = Range | Rest], Counts, Init, UpdateFun) ->
    Updated = maps:update_with(Range, UpdateFun, Init, Counts),
    update_counts(Rest, Updated, Init, UpdateFun).
139
140
%% Convert shards to {Begin, End} range tuples, preserving their order.
ranges(Shards) ->
    ToTuple = fun(Shard) ->
        [Begin, End] = mem3:range(Shard),
        {Begin, End}
    end,
    [ToTuple(Shard) || Shard <- Shards].
143
144
%% Repeatedly take a ring from the remaining ranges via
%% mem3_util:get_ring/1, Remaining times. Returns {Leftover, Used}: the
%% ranges not taken, and the taken ranges appended onto the initial Used
%% list in the order they were taken.
get_n_rings(Remaining, Leftover, Used) when Remaining =< 0 ->
    {Leftover, Used};
get_n_rings(Remaining, Leftover, Used) ->
    Ring = mem3_util:get_ring(Leftover),
    NextLeftover = Leftover -- Ring,
    NextUsed = Used ++ Ring,
    get_n_rings(Remaining - 1, NextLeftover, NextUsed).
150
151
%% Cluster replica count: the integer "n" of the "cluster" config section,
%% falling back to 3.
cluster_n() ->
    Default = 3,
    config:get_integer("cluster", "n", Default).
154
%% Return the nodes in Nodes whose "couchdb"/"maintenance_mode" config value
%% is "true".
%%
%% rpc:multicall/4 leaves unreachable nodes out of the result list and
%% returns them separately in BadNodes. The remaining results keep the order
%% of Nodes. So results must be paired with `Nodes -- BadNodes`: zipping
%% against the full Nodes list crashes on a length mismatch or pairs a node
%% with another node's answer. Unreachable nodes, and nodes that answered
%% with {badrpc, _}, are not reported as being in maintenance.
maintenance_nodes(Nodes) ->
    {Modes, BadNodes} = rpc:multicall(Nodes, config, get, ["couchdb", "maintenance_mode"]),
    Responded = Nodes -- BadNodes,
    [N || {N, Mode} <- lists:zip(Responded, Modes), Mode =:= "true"].
158
%% Build the shard map for the database named by the doc id of FDI, from that
%% doc's body in the shards db. Raises error:database_does_not_exist if the
%% doc cannot be opened.
load_shards(Db, #full_doc_info{id = Id} = FDI) ->
    case couch_db:open_doc(Db, FDI, [ejson_body]) of
        {ok, #doc{body = {Props}}} ->
            mem3_util:build_shards(Id, Props);
        {not_found, _Reason} ->
            % The second argument of erlang:error/2 is the argument list of
            % the failing call and appears as such in the stacktrace. Pass
            % this function's real arguments (FDI carries the db name)
            % rather than the db name as a character list.
            erlang:error(database_does_not_exist, [Db, FDI])
    end.
166
%% Replace each node that has an entry in the "mem3.redirects" config section
%% with the node named there. The result is in reverse order of Nodes.
maybe_redirect(Nodes) ->
    lists:foldl(fun(Node, Acc) -> [redirect_target(Node) | Acc] end, [], Nodes).

%% The configured redirect target for Node, or Node itself if there is none.
redirect_target(Node) ->
    case config:get("mem3.redirects", atom_to_list(Node)) of
        undefined -> Node;
        Target -> list_to_atom(Target)
    end.
179
%% Number of non-deleted leaf revisions of a doc minus one (presumably the
%% winning revision). So 0 means no conflicts. The result would be -1 if no
%% leaf were live; fold_dbs1/2 only calls this for entries not marked deleted.
count_conflicts(#full_doc_info{rev_tree = RevTree}) ->
    AllLeafs = couch_key_tree:get_all_leafs(RevTree),
    LiveLeafs = [Leaf || {#leaf{deleted = false}, _} = Leaf <- AllLeafs],
    length(LiveLeafs) - 1.
183
184
% Delete the custodian design doc (added in 3.2.0) from Db if it is still
% present, since custodian now uses a BDU function instead. After a few
% releases this function can be removed as well.
%
% On an update conflict the db handle is reopened and the whole check is
% retried. Returns ok.
%
% NOTE(review): after a conflict the retry runs on the handle returned by
% couch_db:reopen/1, while ensure_dbs_exists/0 returns the original Db to
% its caller. Confirm that reopen/1 leaves the original handle usable.
ensure_custodian_ddoc_is_deleted(Db) ->
    case couch_db:open_doc(Db, ?CUSTODIAN_ID, [ejson_body]) of
        {ok, Doc} ->
            delete_custodian_ddoc(Db, Doc);
        {not_found, _Reason} ->
            ok
    end.

% Write a deletion tombstone for the custodian ddoc. On conflict, restart
% from ensure_custodian_ddoc_is_deleted/1 with a reopened handle.
delete_custodian_ddoc(Db, Doc) ->
    Tombstone = Doc#doc{deleted = true, body = {[]}},
    try couch_db:update_doc(Db, Tombstone, [?ADMIN_CTX]) of
        {ok, _NewRev} ->
            couch_log:notice("~p : deleted custodian ddoc ~s", [?MODULE, ?CUSTODIAN_ID]),
            ok
    catch
        conflict ->
            {ok, ReopenedDb} = couch_db:reopen(Db),
            ensure_custodian_ddoc_is_deleted(ReopenedDb)
    end.
205
206
207-ifdef(TEST).
208-include_lib("eunit/include/eunit.hrl").
209
210
% Table-driven tests for get_range_counts/3. Each row is
% {MaxN, Shards, AllShards, ExpectedRangeCounts}.
get_range_counts_test_() ->
    [?_assertEqual(Res, get_range_counts(N, Shards, AllShards)) || {N, Shards,
            AllShards, Res} <- [
        % No shards are present. There is a full range shard that would
        % fit. Report that range as missing.
        {0, [], [full()], [{{0, ?RING_END}, 0}]},

        % Can't complete the ring, but would complete it if the
        % {2, ?RING_END} interval were available.
        {0, [sh(0, 1)], [sh(0, 1), sh(2, ?RING_END)], [{{2, ?RING_END}, 0}]},

        % Can complete the ring only 1 time. Report that range as the
        % one available with a count of 1.
        {1, [full()], [full(), full()], [{{0, ?RING_END}, 1}]},

        % Can complete the ring only 1 time with a full range shard, but
        % {2, ?RING_END} would complete another ring as well if {0, 1}
        % were present.
        {1, [sh(2, ?RING_END), full()], [sh(0, 1), sh(2, ?RING_END), full()],
            [
                {{0, 1}, 0},
                {{0, ?RING_END}, 1},
                {{2, ?RING_END}, 1}
            ]
        },

        % Can complete the ring 2 times: with [{0, 2}, {3, ?RING_END}] and
        % with full(). There is also a remnant {5, 9} range that would
        % complete a ring if {0, 4} and {10, ?RING_END} were present. So
        % report those two as missing (count 0) and every present range
        % with a count of 1.
        {2, [sh(0, 2), sh(3, ?RING_END), sh(5, 9), full()], [sh(0, 2), sh(3,
            ?RING_END), full(), sh(0, 4), sh(5, 9), sh(10, ?RING_END)],
            [
                {{0, 2}, 1},
                {{0, 4}, 0},
                {{0, ?RING_END}, 1},
                {{3, ?RING_END}, 1},
                {{5, 9}, 1},
                {{10, ?RING_END}, 0}
            ]
        }
    ]].
252
253
%% Test helper: a shard record spanning the whole ring, [0, ?RING_END].
full() ->
    sh(0, ?RING_END).
256
257
%% Test helper: a shard record whose range is [Begin, End]; all other fields
%% keep their record defaults.
sh(Begin, End) ->
    #shard{range = [Begin, End]}.
260
261-endif.
262