1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(mem3_reshard_api_test).
14
15
16-include_lib("couch/include/couch_eunit.hrl").
17-include_lib("couch/include/couch_db.hrl").
18-include_lib("mem3/src/mem3_reshard.hrl").
19
20
21-define(USER, "mem3_reshard_api_test_admin").
22-define(PASS, "pass").
23-define(AUTH, {basic_auth, {?USER, ?PASS}}).
24-define(JSON, {"Content-Type", "application/json"}).
25-define(RESHARD, "_reshard/").
26-define(JOBS, "_reshard/jobs/").
27-define(STATE, "_reshard/state").
28-define(ID, <<"id">>).
29-define(OK, <<"ok">>).
30-define(TIMEOUT, 60). % seconds
31
32
%% Per-test fixture. Creates a temporary admin user, computes the clustered
%% (chttpd) base URL, and creates three temp databases: Db1 and Db2 with a
%% single shard (q=1) and Db3 with two shards (q=2), so one POST for Db3
%% yields two split jobs. Returns {Url, {Db1, Db2, Db3}} which is passed to
%% each test and to teardown/1.
setup() ->
    Hashed = couch_passwords:hash_admin_password(?PASS),
    ok = config:set("admins", ?USER, ?b2l(Hashed), _Persist=false),
    Addr = config:get("chttpd", "bind_address", "127.0.0.1"),
    Port = mochiweb_socket_server:get(chttpd, port),
    Url = lists:concat(["http://", Addr, ":", Port, "/"]),
    {Db1, Db2, Db3} = {?tempdb(), ?tempdb(), ?tempdb()},
    create_db(Url, Db1, "?q=1&n=1"),
    create_db(Url, Db2, "?q=1&n=1"),
    create_db(Url, Db3, "?q=2&n=1"),
    {Url, {Db1, Db2, Db3}}.
44
45
%% Per-test cleanup, mirroring setup/0. Resets the resharding state machine,
%% clears the disable flag some tests set, deletes the temp databases,
%% removes config keys individual tests may have set, deletes the admin
%% user, and unloads any meck mocks installed via intercept_state/1.
teardown({Url, {Db1, Db2, Db3}}) ->
    mem3_reshard:reset_state(),
    application:unset_env(mem3, reshard_disabled),
    delete_db(Url, Db1),
    delete_db(Url, Db2),
    delete_db(Url, Db3),
    ok = config:delete("reshard", "max_jobs", _Persist=false),
    ok = config:delete("reshard", "require_node_param", _Persist=false),
    ok = config:delete("reshard", "require_range_param", _Persist=false),
    ok = config:delete("admins", ?USER, _Persist=false),
    meck:unload().
57
58
%% Suite-level setup: boot CouchDB with the mem3 and chttpd applications.
start_couch() ->
    test_util:start_couch([mem3, chttpd]).
61
62
%% Suite-level cleanup: stop the CouchDB instance started by start_couch/0.
stop_couch(Ctx) ->
    test_util:stop_couch(Ctx).
65
66
%% EUnit test generator. Couch is started once for the whole suite ({setup}),
%% while {foreach, setup/0, teardown/1} gives every individual test a fresh
%% admin user and fresh temp databases.
mem3_reshard_api_test_() ->
    {
        "mem3 shard split api tests",
        {
            setup,
            fun start_couch/0, fun stop_couch/1,
            {
                foreach,
                fun setup/0, fun teardown/1,
                [
                    fun basics/1,
                    fun create_job_basic/1,
                    fun create_two_jobs/1,
                    fun create_multiple_jobs_from_one_post/1,
                    fun start_stop_cluster_basic/1,
                    fun test_disabled/1,
                    fun start_stop_cluster_with_a_job/1,
                    fun individual_job_start_stop/1,
                    fun individual_job_stop_when_cluster_stopped/1,
                    fun create_job_with_invalid_arguments/1,
                    fun create_job_with_db/1,
                    fun create_job_with_shard_name/1,
                    fun completed_job_handling/1,
                    fun handle_db_deletion_in_initial_copy/1,
                    fun handle_db_deletion_in_topoff1/1,
                    fun handle_db_deletion_in_copy_local_docs/1,
                    fun handle_db_deletion_in_build_indices/1,
                    fun handle_db_deletion_in_update_shard_map/1,
                    fun handle_db_deletion_in_wait_source_close/1,
                    fun recover_in_initial_copy/1,
                    fun recover_in_topoff1/1,
                    fun recover_in_copy_local_docs/1,
                    fun recover_in_build_indices/1,
                    fun recover_in_update_shard_map/1,
                    fun recover_in_wait_source_close/1,
                    fun recover_in_topoff3/1,
                    fun recover_in_source_delete/1,
                    fun check_max_jobs/1,
                    fun check_node_and_range_required_params/1,
                    fun cleanup_completed_jobs/1
                ]
            }
        }
    }.
111
112
%% Sanity checks of the top-level _reshard endpoints on a fresh cluster:
%% empty stats, "running" global state, empty job list, and proper error
%% codes for unknown paths and unsupported methods.
basics({Top, _}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        % GET /_reshard
        ?assertMatch({200, #{
            <<"state">> := <<"running">>,
            <<"state_reason">> := null,
            <<"completed">> := 0,
            <<"failed">> := 0,
            <<"running">> := 0,
            <<"stopped">> := 0,
            <<"total">> := 0
        }}, req(get, Top ++ ?RESHARD)),

        % GET _reshard/state
        ?assertMatch({200, #{<<"state">> := <<"running">>}},
            req(get, Top ++ ?STATE)),

        % GET _reshard/jobs
        ?assertMatch({200, #{
            <<"jobs">> := [],
            <<"offset">> := 0,
            <<"total_rows">> := 0
        }}, req(get, Top ++ ?JOBS)),

        % Some invalid paths and methods
        % NOTE(review): ?RESHARD already ends with "/" so this requests
        % "_reshard//invalidpath"; it still resolves to a 404.
        ?assertMatch({404, _}, req(get, Top ++ ?RESHARD ++ "/invalidpath")),
        ?assertMatch({405, _}, req(put, Top ++ ?RESHARD, #{dont => thinkso})),
        ?assertMatch({405, _}, req(post, Top ++ ?RESHARD, #{nope => nope}))
    end)}.
142
143
%% Full lifecycle of a single split job created from a db name: create it,
%% read it back through the job listing and the individual job endpoint,
%% check its fields and state endpoint, then delete it and verify both GET
%% and DELETE on the removed job return 404.
create_job_basic({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        % POST /_reshard/jobs
        {C1, R1} = req(post, Top ++ ?JOBS, #{type => split, db => Db1}),
        ?assertEqual(201, C1),
        ?assertMatch([#{?OK := true, ?ID := J, <<"shard">> :=  S}]
            when is_binary(J) andalso is_binary(S), R1),
        [#{?ID := Id, <<"shard">> := Shard}] = R1,

        % GET /_reshard/jobs
        ?assertMatch({200, #{
            <<"jobs">> := [#{?ID := Id, <<"type">> := <<"split">>}],
            <<"offset">> := 0,
            <<"total_rows">> := 1
        }}, req(get, Top ++ ?JOBS)),

        % GET /_reshard/job/$jobid
        {C2, R2} = req(get, Top ++ ?JOBS ++ ?b2l(Id)),
        ?assertEqual(200, C2),
        ThisNode = atom_to_binary(node(), utf8),
        ?assertMatch(#{?ID := Id}, R2),
        ?assertMatch(#{<<"type">> := <<"split">>}, R2),
        ?assertMatch(#{<<"source">> := Shard}, R2),
        ?assertMatch(#{<<"history">> := History} when length(History) > 1, R2),
        ?assertMatch(#{<<"node">> := ThisNode}, R2),
        ?assertMatch(#{<<"split_state">> := SSt} when is_binary(SSt), R2),
        ?assertMatch(#{<<"job_state">> := JSt} when is_binary(JSt), R2),
        ?assertMatch(#{<<"state_info">> := #{}}, R2),
        % A split job always targets exactly two new ranges
        ?assertMatch(#{<<"target">> := Target} when length(Target) == 2, R2),

        % GET  /_reshard/job/$jobid/state
        ?assertMatch({200, #{<<"state">> := S, <<"reason">> := R}}
            when is_binary(S) andalso (is_binary(R) orelse R =:= null),
            req(get, Top ++ ?JOBS ++ ?b2l(Id) ++ "/state")),

        % GET /_reshard
        ?assertMatch({200, #{<<"state">> := <<"running">>, <<"total">> := 1}},
            req(get, Top ++ ?RESHARD)),

        % DELETE /_reshard/jobs/$jobid
        ?assertMatch({200, #{?OK := true}},
            req(delete, Top ++ ?JOBS ++ ?b2l(Id))),

        % GET _reshard/jobs
        ?assertMatch({200, #{<<"jobs">> := [], <<"total_rows">> := 0}},
            req(get, Top ++ ?JOBS)),

        % GET /_reshard/job/$jobid should be a 404
        ?assertMatch({404, #{}}, req(get, Top ++ ?JOBS ++ ?b2l(Id))),

        % DELETE /_reshard/jobs/$jobid  should be a 404 as well
        ?assertMatch({404, #{}}, req(delete, Top ++ ?JOBS ++ ?b2l(Id)))
    end)}.
197
198
%% Create one split job for each of two different databases, verify they
%% show up with distinct ids and correct counts, then delete them one at a
%% time and watch the total drop from 2 to 1 to 0.
create_two_jobs({Top, {Db1, Db2, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Jobs = Top ++ ?JOBS,

        ?assertMatch({201, [#{?OK := true}]},
            req(post, Jobs, #{type => split, db => Db1})),
        ?assertMatch({201, [#{?OK := true}]},
            req(post, Jobs, #{type => split, db => Db2})),

        ?assertMatch({200, #{<<"total">> := 2}}, req(get, Top ++ ?RESHARD)),

        ?assertMatch({200, #{
            <<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}],
            <<"offset">> := 0,
            <<"total_rows">> := 2
        }} when Id1 =/= Id2, req(get, Jobs)),

        % Re-fetch to actually bind Id1/Id2 (the match above only asserts)
        {200, #{<<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}]}} = req(get, Jobs),

        {200, #{?OK := true}} = req(delete, Jobs ++ ?b2l(Id1)),
        ?assertMatch({200, #{<<"total">> := 1}}, req(get, Top ++ ?RESHARD)),
        {200, #{?OK := true}} = req(delete, Jobs ++ ?b2l(Id2)),
        ?assertMatch({200, #{<<"total">> := 0}}, req(get, Top ++ ?RESHARD))
    end)}.
223
224
%% Db3 was created with q=2 (two shards), so a single POST with just the db
%% name must fan out into one split job per shard.
create_multiple_jobs_from_one_post({Top, {_, _, Db3}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobsUrl = Top ++ ?JOBS,
        Result = req(post, JobsUrl, #{type => split, db => Db3}),
        ?assertMatch({201, [#{?OK := true}, #{?OK := true}]}, Result),
        ?assertMatch({200, #{<<"total">> := 2}}, req(get, Top ++ ?RESHARD))
    end)}.
232
233
%% Toggle the cluster-wide resharding state via PUT _reshard/state and make
%% sure the state and an optional stop reason are reflected both in
%% _reshard/state and in the top-level _reshard summary.
start_stop_cluster_basic({Top, _}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Url = Top ++ ?STATE,

        ?assertMatch({200, #{
            <<"state">> := <<"running">>,
            <<"reason">> := null
        }}, req(get, Url)),

        ?assertMatch({200, _}, req(put, Url, #{state => stopped})),
        % When stopped without an explicit reason a default one is filled in
        ?assertMatch({200, #{
            <<"state">> := <<"stopped">>,
            <<"reason">> := R
        }} when is_binary(R), req(get, Url)),

        ?assertMatch({200, _}, req(put, Url, #{state => running})),

        % Make sure the reason shows in the state GET request
        Reason = <<"somereason">>,
        ?assertMatch({200, _}, req(put, Url, #{state => stopped,
            reason => Reason})),
        ?assertMatch({200, #{<<"state">> := <<"stopped">>,
            <<"reason">> := Reason}}, req(get, Url)),

        % Top level summary also shows the reason
        ?assertMatch({200, #{
            <<"state">> := <<"stopped">>,
            <<"state_reason">> := Reason
        }}, req(get, Top ++ ?RESHARD)),
        ?assertMatch({200, _}, req(put, Url, #{state => running})),
        ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, Url))
    end)}.
266
267
%% While the mem3 `reshard_disabled` app env flag is set the API must answer
%% 501 Not Implemented; clearing the flag restores normal 200 responses.
test_disabled({Top, _}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        ReshardUrl = Top ++ ?RESHARD,
        StateUrl = Top ++ ?STATE,
        RunningBody = #{state => running},

        application:set_env(mem3, reshard_disabled, true),
        ?assertMatch({501, _}, req(get, ReshardUrl)),
        ?assertMatch({501, _}, req(put, StateUrl, RunningBody)),

        application:unset_env(mem3, reshard_disabled),
        ?assertMatch({200, _}, req(get, ReshardUrl)),
        ?assertMatch({200, _}, req(put, StateUrl, RunningBody))
    end)}.
278
279
%% Jobs created while the cluster-wide reshard state is "stopped" are
%% accepted but stay in the stopped job_state; they can be deleted while
%% stopped and they start running once the cluster state flips to running.
start_stop_cluster_with_a_job({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Url = Top ++ ?STATE,

        ?assertMatch({200, _}, req(put, Url, #{state => stopped})),
        ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, Url)),

        % Can add jobs with global state stopped, they just won't be running
        {201, R1} = req(post, Top ++ ?JOBS, #{type => split, db => Db1}),
        ?assertMatch([#{?OK := true}], R1),
        [#{?ID := Id1}] = R1,
        {200, J1} = req(get, Top ++ ?JOBS ++ ?b2l(Id1)),
        ?assertMatch(#{?ID := Id1, <<"job_state">> := <<"stopped">>}, J1),
        % Check summary stats
        ?assertMatch({200, #{
            <<"state">> := <<"stopped">>,
            <<"running">> := 0,
            <<"stopped">> := 1,
            <<"total">> := 1
        }}, req(get, Top ++ ?RESHARD)),

        % Can delete the job when stopped
        {200, #{?OK := true}} = req(delete, Top ++ ?JOBS ++ ?b2l(Id1)),
        ?assertMatch({200, #{
            <<"state">> := <<"stopped">>,
            <<"running">> := 0,
            <<"stopped">> := 0,
            <<"total">> := 0
        }}, req(get, Top ++ ?RESHARD)),

        % Add same job again
        {201, [#{?ID := Id2}]} = req(post, Top ++ ?JOBS, #{type => split,
            db => Db1}),
        ?assertMatch({200, #{?ID := Id2, <<"job_state">> := <<"stopped">>}},
            req(get, Top ++ ?JOBS ++ ?b2l(Id2))),

        % Job should start after resharding is started on the cluster
        ?assertMatch({200, _}, req(put, Url, #{state => running})),
        ?assertMatch({200, #{?ID := Id2, <<"job_state">> := JSt}}
            when JSt =/= <<"stopped">>, req(get, Top ++ ?JOBS ++ ?b2l(Id2)))
     end)}.
321
322
%% Stop and restart a single job via its /state endpoint. The job is held
%% in the topoff1 split state via intercept_state/1 so it stays "running"
%% long enough to be manipulated. An individually stopped job must stay
%% stopped across global stop/start cycles until explicitly resumed.
individual_job_start_stop({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        intercept_state(topoff1),

        Body = #{type => split, db => Db1},
        {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),

        JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
        StUrl =  JobUrl ++ "/state",

        % Wait for the job to start running and intercept it in topoff1 state
        receive {JobPid, topoff1} -> ok end,
        % Tell the intercept to never finish checkpointing so job is left hanging
        % forever in running state
        JobPid ! cancel,
        ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),

        {200, _} = req(put, StUrl, #{state => stopped}),
        wait_state(StUrl, <<"stopped">>),

        % Stop/start resharding globally and job should still stay stopped
        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
        ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),

        % Start the job again
        ?assertMatch({200, _}, req(put, StUrl, #{state => running})),
        % Wait for the job to start running and intercept it in topoff1 state
        receive {JobPid2, topoff1} -> ok end,
        ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
        % Let it continue running and it should complete eventually
        JobPid2 ! continue,
        wait_state(StUrl, <<"completed">>)
    end)}.
357
358
%% A job stopped individually while the whole cluster is also stopped must
%% remain stopped when the cluster resumes, and only run again once its own
%% /state endpoint is set back to running. Uses the topoff1 intercept to
%% keep the job controllable.
individual_job_stop_when_cluster_stopped({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        intercept_state(topoff1),

        Body = #{type => split, db => Db1},
        {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),

        JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
        StUrl =  JobUrl ++ "/state",

        % Wait for the job to start running and intercept in topoff1
        receive {JobPid, topoff1} -> ok end,
        % Tell the intercept to never finish checkpointing so job is left
        % hanging forever in running state
        JobPid ! cancel,
        ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),

        % Stop resharding globally
        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
        wait_state(StUrl, <<"stopped">>),

        % Stop the job specifically
        {200, _} = req(put, StUrl, #{state => stopped}),
        % Job stays stopped
        ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),

        % Set cluster to running again
        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),

        % The job should stay stopped
        ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),

        % It should be possible to resume job and it should complete
        ?assertMatch({200, _}, req(put, StUrl, #{state => running})),

        % Wait for the job to start running and intercept in topoff1 state
        receive {JobPid2, topoff1} -> ok end,
        ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),

        % Let it continue running and it should complete eventually
        JobPid2 ! continue,
        wait_state(StUrl, <<"completed">>)
    end)}.
402
403
%% Validation of POST /_reshard/jobs request bodies: missing fields,
%% unknown db, nonexistent shard, malformed range values, and mutually
%% exclusive db/shard parameters all get rejected with 400 (or 404 for a
%% shard that doesn't exist).
create_job_with_invalid_arguments({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Jobs = Top ++ ?JOBS,

        % Nothing in the body
        ?assertMatch({400, _}, req(post, Jobs, #{})),

        % Missing type
        ?assertMatch({400, _}, req(post, Jobs, #{db => Db1})),

        % Have type but no db and no shard
        ?assertMatch({400, _}, req(post, Jobs, #{type => split})),

        % Have type and db but db is invalid
        ?assertMatch({400, _}, req(post, Jobs,  #{db => <<"baddb">>,
            type => split})),

        % Have type and shard but shard is not an existing database
        ?assertMatch({404, _}, req(post, Jobs, #{type => split,
            shard => <<"shards/80000000-ffffffff/baddb.1549492084">>})),

        % Bad range values, too large, different types, inverted
        ?assertMatch({400, _}, req(post, Jobs, #{db => Db1, range => 42,
            type => split})),
        ?assertMatch({400, _}, req(post, Jobs, #{db => Db1,
            range => <<"x">>, type => split})),
        ?assertMatch({400, _}, req(post, Jobs, #{db => Db1,
            range => <<"ffffffff-80000000">>, type => split})),
        ?assertMatch({400, _}, req(post, Jobs, #{db => Db1,
            range => <<"00000000-fffffffff">>, type => split})),

        % Can't have both db and shard
        ?assertMatch({400, _}, req(post, Jobs, #{type => split, db => Db1,
             shard => <<"blah">>}))
    end)}.
439
440
%% Create split jobs addressed by db name combined with the optional node
%% and range parameters. Db1 starts with a single 00000000-ffffffff shard;
%% after the full split, then splitting each resulting half, the db should
%% end up with exactly the four quarter ranges asserted at the end.
create_job_with_db({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Jobs = Top ++ ?JOBS,
        Body1 = #{type => split, db => Db1},

        % Node with db
        N = atom_to_binary(node(), utf8),
        {C1, R1} = req(post, Jobs, Body1#{node => N}),
        ?assertMatch({201, [#{?OK := true}]}, {C1, R1}),
        wait_to_complete_then_cleanup(Top, R1),

        % Range and db
        {C2, R2} = req(post, Jobs, Body1#{range => <<"00000000-7fffffff">>}),
        ?assertMatch({201, [#{?OK := true}]}, {C2, R2}),
        wait_to_complete_then_cleanup(Top, R2),

        % Node, range and db
        Range = <<"80000000-ffffffff">>,
        {C3, R3} = req(post, Jobs, Body1#{range => Range, node => N}),
        ?assertMatch({201, [#{?OK := true}]}, {C3, R3}),
        wait_to_complete_then_cleanup(Top, R3),

        ?assertMatch([
            [16#00000000, 16#3fffffff],
            [16#40000000, 16#7fffffff],
            [16#80000000, 16#bfffffff],
            [16#c0000000, 16#ffffffff]
        ], [mem3:range(S) || S <- lists:sort(mem3:shards(Db1))])
    end)}.
470
471
%% Create split jobs addressed by shard name instead of db name, with and
%% without an explicit node. Db3 starts with two shards (q=2); splitting
%% both should leave the four quarter ranges asserted at the end.
create_job_with_shard_name({Top, {_, _, Db3}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Jobs = Top ++ ?JOBS,
        [S1, S2] = [mem3:name(S) || S <- lists:sort(mem3:shards(Db3))],

        % Shard only
        {C1, R1} = req(post, Jobs, #{type => split, shard => S1}),
        ?assertMatch({201, [#{?OK := true}]}, {C1, R1}),
        wait_to_complete_then_cleanup(Top, R1),

        % Shard with a node
        N = atom_to_binary(node(), utf8),
        {C2, R2} = req(post, Jobs, #{type => split, shard => S2, node => N}),
        ?assertMatch({201, [#{?OK := true}]}, {C2, R2}),
        wait_to_complete_then_cleanup(Top, R2),

        ?assertMatch([
            [16#00000000, 16#3fffffff],
            [16#40000000, 16#7fffffff],
            [16#80000000, 16#bfffffff],
            [16#c0000000, 16#ffffffff]
        ], [mem3:range(S) || S <- lists:sort(mem3:shards(Db3))])
    end)}.
495
496
%% Once a job has completed it must be immutable: stop/resume requests on
%% the job and global stop/start of resharding all leave it "completed".
%% It can still be deleted.
completed_job_handling({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Jobs = Top ++ ?JOBS,

        % Run job to completion
        {C1, R1} = req(post, Jobs, #{type => split, db => Db1}),
        ?assertMatch({201, [#{?OK := true}]}, {C1, R1}),
        [#{?ID := Id}] = R1,
        wait_to_complete(Top, R1),

        % Check top level stats
        ?assertMatch({200, #{
            <<"state">> := <<"running">>,
            <<"state_reason">> := null,
            <<"completed">> := 1,
            <<"failed">> := 0,
            <<"running">> := 0,
            <<"stopped">> := 0,
            <<"total">> := 1
        }}, req(get, Top ++ ?RESHARD)),

        % Job state itself
        JobUrl = Jobs ++ ?b2l(Id),
        ?assertMatch({200, #{
            <<"split_state">> := <<"completed">>,
            <<"job_state">> := <<"completed">>
        }}, req(get, JobUrl)),

        % Job's state endpoint
        StUrl = Jobs ++ ?b2l(Id) ++ "/state",
        ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),

        % Try to stop it and it should stay completed
        {200, _} = req(put, StUrl, #{state => stopped}),
        ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),

        % Try to resume it and it should stay completed
        {200, _} = req(put, StUrl, #{state => running}),
        ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),

        % Stop resharding globally and job should still stay completed
        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
        ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),

        % Start resharding and job stays completed
        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
        ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),

        ?assertMatch({200, #{?OK := true}}, req(delete, JobUrl))
    end)}.
547
548
%% Deleting the source db while the job is held in topoff1 must fail the job.
handle_db_deletion_in_topoff1({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobId = delete_source_in_state(Top, Db1, topoff1),
        StateUrl = Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state",
        wait_state(StateUrl, <<"failed">>)
    end)}.
554
555
%% Deleting the source db during initial_copy must fail the job.
handle_db_deletion_in_initial_copy({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobId = delete_source_in_state(Top, Db1, initial_copy),
        StateUrl = Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state",
        wait_state(StateUrl, <<"failed">>)
    end)}.
561
562
%% Deleting the source db during copy_local_docs must fail the job.
handle_db_deletion_in_copy_local_docs({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobId = delete_source_in_state(Top, Db1, copy_local_docs),
        StateUrl = Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state",
        wait_state(StateUrl, <<"failed">>)
    end)}.
568
569
%% Deleting the source db during build_indices must fail the job.
handle_db_deletion_in_build_indices({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobId = delete_source_in_state(Top, Db1, build_indices),
        StateUrl = Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state",
        wait_state(StateUrl, <<"failed">>)
    end)}.
575
576
%% Deleting the source db during update_shardmap must fail the job.
handle_db_deletion_in_update_shard_map({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobId = delete_source_in_state(Top, Db1, update_shardmap),
        StateUrl = Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state",
        wait_state(StateUrl, <<"failed">>)
    end)}.
582
583
%% Deleting the source db during wait_source_close must fail the job.
handle_db_deletion_in_wait_source_close({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobId = delete_source_in_state(Top, Db1, wait_source_close),
        StateUrl = Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state",
        wait_state(StateUrl, <<"failed">>)
    end)}.
589
590
%% A job interrupted in topoff1 must complete after resharding restarts.
recover_in_topoff1({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobId = recover_in_state(Top, Db1, topoff1),
        StateUrl = Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end)}.
596
597
%% A job interrupted in initial_copy must complete after resharding restarts.
recover_in_initial_copy({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobId = recover_in_state(Top, Db1, initial_copy),
        StateUrl = Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end)}.
603
604
%% A job interrupted in copy_local_docs must complete after restart.
recover_in_copy_local_docs({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobId = recover_in_state(Top, Db1, copy_local_docs),
        StateUrl = Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end)}.
610
611
%% A job interrupted in build_indices must complete after restart.
recover_in_build_indices({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobId = recover_in_state(Top, Db1, build_indices),
        StateUrl = Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end)}.
617
618
%% A job interrupted in update_shardmap must complete after restart.
recover_in_update_shard_map({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobId = recover_in_state(Top, Db1, update_shardmap),
        StateUrl = Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end)}.
624
625
%% A job interrupted in wait_source_close must complete after restart.
recover_in_wait_source_close({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobId = recover_in_state(Top, Db1, wait_source_close),
        StateUrl = Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end)}.
631
632
%% A job interrupted in topoff3 must complete after restart.
recover_in_topoff3({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobId = recover_in_state(Top, Db1, topoff3),
        StateUrl = Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end)}.
638
639
%% A job interrupted in source_delete must complete after restart.
recover_in_source_delete({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobId = recover_in_state(Top, Db1, source_delete),
        StateUrl = Top ++ ?JOBS ++ ?b2l(JobId) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end)}.
645
646
%% The "reshard.max_jobs" config setting caps the number of jobs (including
%% stopped ones). Exceeding the cap yields a 500 max_jobs_exceeded error.
%% Raising the cap lets a job through; lowering it afterwards does not
%% remove jobs that already exist.
check_max_jobs({Top, {Db1, Db2, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Jobs = Top ++ ?JOBS,

        config:set("reshard", "max_jobs", "0", _Persist=false),
        {C1, R1} = req(post, Jobs, #{type => split, db => Db1}),
        ?assertMatch({500, [#{<<"error">> := <<"max_jobs_exceeded">>}]}, {C1, R1}),

        config:set("reshard", "max_jobs", "1", _Persist=false),
        {201, R2} = req(post, Jobs, #{type => split, db => Db1}),
        wait_to_complete(Top, R2),

        % Stop clustering so jobs are not started anymore and ensure max jobs
        % is enforced even if jobs are stopped
        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),

        {C3, R3} = req(post, Jobs, #{type => split, db => Db2}),
        ?assertMatch({500, [#{<<"error">> := <<"max_jobs_exceeded">>}]},
            {C3, R3}),

        % Allow the job to be created by raising max_jobs
        config:set("reshard", "max_jobs", "2", _Persist=false),

        {C4, R4} = req(post, Jobs, #{type => split, db => Db2}),
        ?assertEqual(201, C4),

        % Lower max_jobs after job is created but it's not running
        config:set("reshard", "max_jobs", "1", _Persist=false),

        % Start resharding again
        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),

        % Jobs that have been created already are not removed if max jobs is lowered
        % so make sure the job completes
        wait_to_complete(Top, R4)
    end)}.
683
684
%% With "reshard.require_node_param" / "reshard.require_range_param" set,
%% job creation must fail with 400 until the corresponding parameter is
%% supplied; with both given the job is created and completes.
check_node_and_range_required_params({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Jobs = Top ++ ?JOBS,

        Node = atom_to_binary(node(), utf8),
        Range = <<"00000000-ffffffff">>,

        config:set("reshard", "require_node_param", "true", _Persist=false),
        {C1, R1} = req(post, Jobs, #{type => split, db => Db1}),
        % The "prameter" misspelling is asserted verbatim: it must match the
        % exact error string the server returns. Fix server and test together.
        NodeRequiredErr =  <<"`node` prameter is required">>,
        ?assertEqual({400, #{<<"error">> => <<"bad_request">>,
            <<"reason">> => NodeRequiredErr}}, {C1, R1}),

        config:set("reshard", "require_range_param", "true", _Persist=false),
        {C2, R2} = req(post, Jobs, #{type => split, db => Db1, node => Node}),
        RangeRequiredErr =  <<"`range` prameter is required">>,
        ?assertEqual({400, #{<<"error">> => <<"bad_request">>,
            <<"reason">> => RangeRequiredErr}}, {C2, R2}),

        Body = #{type => split, db => Db1, range => Range, node => Node},
        {C3, R3} = req(post, Jobs, Body),
        ?assertMatch({201, [#{?OK := true}]}, {C3, R3}),
        wait_to_complete_then_cleanup(Top, R3)
    end)}.
709
710
%% After a job completes and its (now split) database is deleted, the
%% completed job record should eventually be cleaned up, so GETs for it
%% start returning 404.
cleanup_completed_jobs({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Body = #{type => split, db => Db1},
        {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
        JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
        wait_state(JobUrl ++ "/state", <<"completed">>),
        delete_db(Top, Db1),
        wait_for_http_code(JobUrl, 404)
    end)}.
720
721
722% Test help functions
723
%% For each job map in Jobs (as returned by POST /_reshard/jobs), wait until
%% it reaches the "completed" state and then remove it with a DELETE.
wait_to_complete_then_cleanup(Top, Jobs) ->
    Base = Top ++ ?JOBS,
    lists:foreach(fun(#{?ID := JobId}) ->
        JobUrl = Base ++ ?b2l(JobId),
        wait_state(JobUrl ++ "/state", <<"completed">>),
        {200, _} = req(delete, JobUrl)
    end, Jobs).
730
731
%% Wait for every job map in Jobs to reach the "completed" state; the jobs
%% are left in place (see wait_to_complete_then_cleanup/2 to also delete).
wait_to_complete(Top, Jobs) ->
    Base = Top ++ ?JOBS,
    lists:foreach(fun(#{?ID := JobId}) ->
        StateUrl = Base ++ ?b2l(JobId) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end, Jobs).
737
738
%% Install a meck intercept on mem3_reshard_job:checkpoint_done/1 that traps
%% the job the first time it checkpoints in the given split State. The job
%% process sends {JobPid, State} to the test process and then blocks until
%% the test replies with either `continue` (checkpoint proceeds via
%% passthrough) or `cancel` (checkpoint never finishes, so the job hangs in
%% the running state). Checkpoints in any other state pass through untouched.
%% The mock is removed by meck:unload() in teardown/1.
intercept_state(State) ->
    TestPid = self(),
    meck:new(mem3_reshard_job, [passthrough]),
    meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
        case Job#job.split_state of
            State ->
                TestPid ! {self(), State},
                receive
                    continue -> meck:passthrough([Job]);
                    cancel -> ok
                end;
            _ ->
                meck:passthrough([Job])
        end
    end).
754
755
%% Replace the checkpoint intercept installed by intercept_state/1 with a
%% plain passthrough so jobs checkpoint normally again.
cancel_intercept() ->
    Passthrough = fun(Job) -> meck:passthrough([Job]) end,
    meck:expect(mem3_reshard_job, checkpoint_done, Passthrough).
760
761
%% Poll the given state endpoint until its "state" field equals State,
%% sleeping 100 ms between polls. Gives up (failing the test) after 30 s.
wait_state(Url, State) ->
    CheckFun = fun() ->
        case req(get, Url) of
            {200, #{<<"state">> := State}} ->
                ok;
            {200, #{}} ->
                timer:sleep(100),
                wait
        end
    end,
    test_util:wait(CheckFun, 30000).
769
770
%% Poll Url until a GET returns the given HTTP status code, sleeping 100 ms
%% between polls. Gives up (failing the test) after 30 s.
wait_for_http_code(Url, Code) when is_integer(Code) ->
    CheckFun = fun() ->
        case req(get, Url) of
            {Code, _} ->
                ok;
            {_, _} ->
                timer:sleep(100),
                wait
        end
    end,
    test_util:wait(CheckFun, 30000).
778
779
%% Start a split job for Db, wait (via the intercept protocol from
%% intercept_state/1) until it reaches the given split State, delete the
%% source database out from under it, then let the job continue. Returns
%% the job id so the caller can assert the job ends up "failed".
delete_source_in_state(Top, Db, State) when is_atom(State), is_binary(Db) ->
    intercept_state(State),
    Body = #{type => split, db => Db},
    {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
    receive {JobPid, State} -> ok end,
    sync_delete_db(Top, Db),
    JobPid ! continue,
    Id.
788
789
%% Start a split job for Db, trap it in the given split State, cancel its
%% checkpoint so it hangs, then stop and restart resharding globally with
%% the intercept removed. Returns the job id; the restarted job is expected
%% to recover from its last checkpoint and eventually complete.
recover_in_state(Top, Db, State) when is_atom(State) ->
    intercept_state(State),
    Body = #{type => split, db => Db},
    {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
    receive {JobPid, State} -> ok end,
    % Job is now stuck in running we prevented it from executing
    % the given state
    JobPid ! cancel,
    % Now restart resharding
    ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
    cancel_intercept(),
    ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
    Id.
803
804
%% Create database Db via an HTTP PUT; QArgs is a query string such as
%% "?q=1&n=1". Both 201 and 202 count as successful creation.
create_db(Top, Db, QArgs) when is_binary(Db) ->
    DbUrl = Top ++ binary_to_list(Db) ++ QArgs,
    {ok, Code, _, _} = test_request:put(DbUrl, [?JSON, ?AUTH], "{}"),
    ?assert(Code =:= 201 orelse Code =:= 202).
809
810
%% Delete database Db via HTTP if it exists. Returns `not_found` when the
%% db is already gone and `ok` after a successful delete; any other status
%% crashes, failing the test.
delete_db(Top, Db) when is_binary(Db) ->
    DbUrl = Top ++ binary_to_list(Db),
    case test_request:get(DbUrl, [?AUTH]) of
        {ok, 200, _, _} ->
            {ok, 200, _, _} = test_request:delete(DbUrl, [?AUTH]),
            ok;
        {ok, 404, _, _} ->
            not_found
    end.
820
821
%% Delete Db over HTTP, then also remove its local shard files directly via
%% couch_server so the deletion takes effect synchronously for the test.
%% A database_does_not_exist error just means it is already gone.
sync_delete_db(Top, Db) when is_binary(Db) ->
    delete_db(Top, Db),
    try
        Names = [mem3:name(Shard) || Shard <- mem3:local_shards(Db)],
        lists:foreach(fun(Name) ->
            couch_server:delete(Name, [?ADMIN_CTX])
        end, Names),
        ok
    catch
        error:database_does_not_exist ->
            ok
    end.
833
834
%% Issue a body-less authenticated HTTP request and return
%% {StatusCode, DecodedJsonBody} with the body decoded into maps.
req(Method, Url) ->
    {ok, Code, _, RespBody} = test_request:request(Method, Url, [?AUTH]),
    {Code, jiffy:decode(RespBody, [return_maps])}.
839
840
%% Issue an authenticated HTTP request with a body. A map body is first
%% JSON-encoded; otherwise the body is sent as-is. Returns
%% {StatusCode, DecodedJsonBody} with the response decoded into maps.
req(Method, Url, #{} = Body) ->
    req(Method, Url, jiffy:encode(Body));

req(Method, Url, Body) ->
    {ok, Code, _, RespBody} =
        test_request:request(Method, Url, [?JSON, ?AUTH], Body),
    {Code, jiffy:decode(RespBody, [return_maps])}.
848