% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

% HTTP API tests for the _reshard (shard splitting) endpoints.
-module(mem3_reshard_api_test).


-include_lib("couch/include/couch_eunit.hrl").
-include_lib("couch/include/couch_db.hrl").
-include_lib("mem3/src/mem3_reshard.hrl").


% Admin credentials created in setup/0 and removed in teardown/1
-define(USER, "mem3_reshard_api_test_admin").
-define(PASS, "pass").
-define(AUTH, {basic_auth, {?USER, ?PASS}}).
-define(JSON, {"Content-Type", "application/json"}).
% URL path fragments for the resharding endpoints
-define(RESHARD, "_reshard/").
-define(JOBS, "_reshard/jobs/").
-define(STATE, "_reshard/state").
% Frequently matched JSON keys
-define(ID, <<"id">>).
-define(OK, <<"ok">>).
-define(TIMEOUT, 60). % seconds, per-test EUnit timeout


% Per-test setup: configure an admin user, compute the cluster URL, and
% create three temp databases (two with q=1, one with q=2 so a single
% POST can create multiple jobs). Returns {Url, {Db1, Db2, Db3}} which
% is passed to each test fun and to teardown/1.
setup() ->
    Hashed = couch_passwords:hash_admin_password(?PASS),
    ok = config:set("admins", ?USER, ?b2l(Hashed), _Persist=false),
    Addr = config:get("chttpd", "bind_address", "127.0.0.1"),
    Port = mochiweb_socket_server:get(chttpd, port),
    Url = lists:concat(["http://", Addr, ":", Port, "/"]),
    {Db1, Db2, Db3} = {?tempdb(), ?tempdb(), ?tempdb()},
    create_db(Url, Db1, "?q=1&n=1"),
    create_db(Url, Db2, "?q=1&n=1"),
    create_db(Url, Db3, "?q=2&n=1"),
    {Url, {Db1, Db2, Db3}}.
% Per-test cleanup: reset resharding state, drop the test databases,
% delete any config keys a test may have set, remove the admin user,
% and clear all meck mocks (meck:unload/0 is a no-op when nothing is
% mocked).
teardown({Url, {Db1, Db2, Db3}}) ->
    mem3_reshard:reset_state(),
    application:unset_env(mem3, reshard_disabled),
    lists:foreach(fun(Db) -> delete_db(Url, Db) end, [Db1, Db2, Db3]),
    lists:foreach(fun(Key) ->
        ok = config:delete("reshard", Key, _Persist = false)
    end, ["max_jobs", "require_node_param", "require_range_param"]),
    ok = config:delete("admins", ?USER, _Persist = false),
    meck:unload().


% Start the applications these tests depend on.
start_couch() ->
    test_util:start_couch([mem3, chttpd]).


% Stop whatever start_couch/0 started.
stop_couch(Ctx) ->
    test_util:stop_couch(Ctx).


% EUnit generator: one couch instance for the whole suite (setup), and a
% fresh setup/0 + teardown/1 around every individual test fun (foreach).
mem3_reshard_api_test_() ->
    TestFuns = [
        fun basics/1,
        fun create_job_basic/1,
        fun create_two_jobs/1,
        fun create_multiple_jobs_from_one_post/1,
        fun start_stop_cluster_basic/1,
        fun test_disabled/1,
        fun start_stop_cluster_with_a_job/1,
        fun individual_job_start_stop/1,
        fun individual_job_stop_when_cluster_stopped/1,
        fun create_job_with_invalid_arguments/1,
        fun create_job_with_db/1,
        fun create_job_with_shard_name/1,
        fun completed_job_handling/1,
        fun handle_db_deletion_in_initial_copy/1,
        fun handle_db_deletion_in_topoff1/1,
        fun handle_db_deletion_in_copy_local_docs/1,
        fun handle_db_deletion_in_build_indices/1,
        fun handle_db_deletion_in_update_shard_map/1,
        fun handle_db_deletion_in_wait_source_close/1,
        fun recover_in_initial_copy/1,
        fun recover_in_topoff1/1,
        fun recover_in_copy_local_docs/1,
        fun recover_in_build_indices/1,
        fun recover_in_update_shard_map/1,
        fun recover_in_wait_source_close/1,
        fun recover_in_topoff3/1,
        fun recover_in_source_delete/1,
        fun check_max_jobs/1,
        fun check_node_and_range_required_params/1,
        fun cleanup_completed_jobs/1
    ],
    {
        "mem3 shard split api tests",
        {
            setup,
            fun start_couch/0, fun stop_couch/1,
            {foreach, fun setup/0, fun teardown/1, TestFuns}
        }
    }.
% Sanity checks of the top level _reshard endpoints on a fresh cluster:
% empty job list, zeroed counters, and rejection of bad paths/methods.
basics({Top, _}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        % GET /_reshard
        ?assertMatch({200, #{
            <<"state">> := <<"running">>,
            <<"state_reason">> := null,
            <<"completed">> := 0,
            <<"failed">> := 0,
            <<"running">> := 0,
            <<"stopped">> := 0,
            <<"total">> := 0
        }}, req(get, Top ++ ?RESHARD)),

        % GET _reshard/state
        ?assertMatch({200, #{<<"state">> := <<"running">>}},
            req(get, Top ++ ?STATE)),

        % GET _reshard/jobs
        ?assertMatch({200, #{
            <<"jobs">> := [],
            <<"offset">> := 0,
            <<"total_rows">> := 0
        }}, req(get, Top ++ ?JOBS)),

        % Some invalid paths and methods
        ?assertMatch({404, _}, req(get, Top ++ ?RESHARD ++ "/invalidpath")),
        ?assertMatch({405, _}, req(put, Top ++ ?RESHARD, #{dont => thinkso})),
        ?assertMatch({405, _}, req(post, Top ++ ?RESHARD, #{nope => nope}))
    end)}.


% Full lifecycle of a single job created from a db name: create it,
% inspect it via the jobs listing, the individual job document and its
% /state endpoint, then delete it and check it is gone.
create_job_basic({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        % POST /_reshard/jobs
        {C1, R1} = req(post, Top ++ ?JOBS, #{type => split, db => Db1}),
        ?assertEqual(201, C1),
        ?assertMatch([#{?OK := true, ?ID := J, <<"shard">> := S}]
            when is_binary(J) andalso is_binary(S), R1),
        [#{?ID := Id, <<"shard">> := Shard}] = R1,

        % GET /_reshard/jobs
        ?assertMatch({200, #{
            <<"jobs">> := [#{?ID := Id, <<"type">> := <<"split">>}],
            <<"offset">> := 0,
            <<"total_rows">> := 1
        }}, req(get, Top ++ ?JOBS)),

        % GET /_reshard/job/$jobid - check each field of the job document
        {C2, R2} = req(get, Top ++ ?JOBS ++ ?b2l(Id)),
        ?assertEqual(200, C2),
        ThisNode = atom_to_binary(node(), utf8),
        ?assertMatch(#{?ID := Id}, R2),
        ?assertMatch(#{<<"type">> := <<"split">>}, R2),
        ?assertMatch(#{<<"source">> := Shard}, R2),
        ?assertMatch(#{<<"history">> := History} when length(History) > 1, R2),
        ?assertMatch(#{<<"node">> := ThisNode}, R2),
        ?assertMatch(#{<<"split_state">> := SSt} when is_binary(SSt), R2),
        ?assertMatch(#{<<"job_state">> := JSt} when is_binary(JSt), R2),
        ?assertMatch(#{<<"state_info">> := #{}}, R2),
        % a split always produces exactly two targets
        ?assertMatch(#{<<"target">> := Target} when length(Target) == 2, R2),

        % GET /_reshard/job/$jobid/state
        ?assertMatch({200, #{<<"state">> := S, <<"reason">> := R}}
            when is_binary(S) andalso (is_binary(R) orelse R =:= null),
            req(get, Top ++ ?JOBS ++ ?b2l(Id) ++ "/state")),

        % GET /_reshard - the job now shows up in the totals
        ?assertMatch({200, #{<<"state">> := <<"running">>, <<"total">> := 1}},
            req(get, Top ++ ?RESHARD)),

        % DELETE /_reshard/jobs/$jobid
        ?assertMatch({200, #{?OK := true}},
            req(delete, Top ++ ?JOBS ++ ?b2l(Id))),

        % GET _reshard/jobs - listing is empty again
        ?assertMatch({200, #{<<"jobs">> := [], <<"total_rows">> := 0}},
            req(get, Top ++ ?JOBS)),

        % GET /_reshard/job/$jobid should be a 404
        ?assertMatch({404, #{}}, req(get, Top ++ ?JOBS ++ ?b2l(Id))),

        % DELETE /_reshard/jobs/$jobid should be a 404 as well
        ?assertMatch({404, #{}}, req(delete, Top ++ ?JOBS ++ ?b2l(Id)))
    end)}.


% Two jobs on two different dbs coexist: totals update as each is
% created and deleted, and the two job ids are distinct.
create_two_jobs({Top, {Db1, Db2, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Jobs = Top ++ ?JOBS,

        ?assertMatch({201, [#{?OK := true}]},
            req(post, Jobs, #{type => split, db => Db1})),
        ?assertMatch({201, [#{?OK := true}]},
            req(post, Jobs, #{type => split, db => Db2})),

        ?assertMatch({200, #{<<"total">> := 2}}, req(get, Top ++ ?RESHARD)),

        ?assertMatch({200, #{
            <<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}],
            <<"offset">> := 0,
            <<"total_rows">> := 2
        }} when Id1 =/= Id2, req(get, Jobs)),

        % re-fetch to bind the ids outside the assert macro's fun scope
        {200, #{<<"jobs">> := [#{?ID := Id1}, #{?ID := Id2}]}} = req(get, Jobs),

        {200, #{?OK := true}} = req(delete, Jobs ++ ?b2l(Id1)),
        ?assertMatch({200, #{<<"total">> := 1}}, req(get, Top ++ ?RESHARD)),
        {200, #{?OK := true}} = req(delete, Jobs ++ ?b2l(Id2)),
        ?assertMatch({200, #{<<"total">> := 0}}, req(get, Top ++ ?RESHARD))
    end)}.
% A db created with q=2 has two shards, so one POST yields two jobs.
create_multiple_jobs_from_one_post({Top, {_, _, Db3}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobsUrl = Top ++ ?JOBS,
        Result = req(post, JobsUrl, #{type => split, db => Db3}),
        ?assertMatch({201, [#{?OK := true}, #{?OK := true}]}, Result),
        ?assertMatch({200, #{<<"total">> := 2}}, req(get, Top ++ ?RESHARD))
    end)}.


% Toggle the cluster-level resharding state via PUT _reshard/state and
% verify the state (and optional stop reason) is reflected in both the
% state endpoint and the top level summary.
start_stop_cluster_basic({Top, _}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        StateUrl = Top ++ ?STATE,

        % Initially running, with no stop reason
        ?assertMatch({200, #{
            <<"state">> := <<"running">>,
            <<"reason">> := null
        }}, req(get, StateUrl)),

        % Stopping sets the state and some default (binary) reason
        ?assertMatch({200, _}, req(put, StateUrl, #{state => stopped})),
        ?assertMatch({200, #{
            <<"state">> := <<"stopped">>,
            <<"reason">> := R
        }} when is_binary(R), req(get, StateUrl)),

        ?assertMatch({200, _}, req(put, StateUrl, #{state => running})),

        % A caller-supplied reason shows in the state GET request
        CustomReason = <<"somereason">>,
        ?assertMatch({200, _}, req(put, StateUrl,
            #{state => stopped, reason => CustomReason})),
        ?assertMatch({200, #{<<"state">> := <<"stopped">>,
            <<"reason">> := CustomReason}}, req(get, StateUrl)),

        % The top level summary also shows the reason
        ?assertMatch({200, #{
            <<"state">> := <<"stopped">>,
            <<"state_reason">> := CustomReason
        }}, req(get, Top ++ ?RESHARD)),
        ?assertMatch({200, _}, req(put, StateUrl, #{state => running})),
        ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StateUrl))
    end)}.


% While the mem3 reshard_disabled app env is set both the summary and
% state endpoints respond 501; unsetting it restores normal operation.
test_disabled({Top, _}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        SummaryUrl = Top ++ ?RESHARD,
        StateUrl = Top ++ ?STATE,

        application:set_env(mem3, reshard_disabled, true),
        ?assertMatch({501, _}, req(get, SummaryUrl)),
        ?assertMatch({501, _}, req(put, StateUrl, #{state => running})),

        application:unset_env(mem3, reshard_disabled),
        ?assertMatch({200, _}, req(get, SummaryUrl)),
        ?assertMatch({200, _}, req(put, StateUrl, #{state => running}))
    end)}.
% Jobs can be created and deleted while cluster-level resharding is
% stopped; such jobs stay in the "stopped" job state and only start
% running once resharding is re-enabled.
start_stop_cluster_with_a_job({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        StateUrl = Top ++ ?STATE,

        ?assertMatch({200, _}, req(put, StateUrl, #{state => stopped})),
        ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StateUrl)),

        % Can add jobs with global state stopped, they just won't be running
        {201, Created} = req(post, Top ++ ?JOBS, #{type => split, db => Db1}),
        ?assertMatch([#{?OK := true}], Created),
        [#{?ID := FirstId}] = Created,
        {200, JobInfo} = req(get, Top ++ ?JOBS ++ ?b2l(FirstId)),
        ?assertMatch(#{?ID := FirstId, <<"job_state">> := <<"stopped">>},
            JobInfo),
        % Summary stats show one stopped job
        ?assertMatch({200, #{
            <<"state">> := <<"stopped">>,
            <<"running">> := 0,
            <<"stopped">> := 1,
            <<"total">> := 1
        }}, req(get, Top ++ ?RESHARD)),

        % A stopped job can be deleted
        {200, #{?OK := true}} = req(delete, Top ++ ?JOBS ++ ?b2l(FirstId)),
        ?assertMatch({200, #{
            <<"state">> := <<"stopped">>,
            <<"running">> := 0,
            <<"stopped">> := 0,
            <<"total">> := 0
        }}, req(get, Top ++ ?RESHARD)),

        % Re-create the same job
        {201, [#{?ID := SecondId}]} = req(post, Top ++ ?JOBS,
            #{type => split, db => Db1}),
        ?assertMatch({200, #{?ID := SecondId, <<"job_state">> := <<"stopped">>}},
            req(get, Top ++ ?JOBS ++ ?b2l(SecondId))),

        % Once resharding is resumed on the cluster the job leaves the
        % stopped state
        ?assertMatch({200, _}, req(put, StateUrl, #{state => running})),
        ?assertMatch({200, #{?ID := SecondId, <<"job_state">> := St}}
            when St =/= <<"stopped">>,
            req(get, Top ++ ?JOBS ++ ?b2l(SecondId)))
    end)}.
% Stop and restart an individual job via its /state endpoint while the
% job is intercepted (via a meck mock, see intercept_state/1) in the
% topoff1 split state. The intercepted job process sends {JobPid, topoff1}
% to this test process; replying `cancel` leaves the job hanging in the
% running state, `continue` lets it finish normally.
individual_job_start_stop({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        intercept_state(topoff1),

        Body = #{type => split, db => Db1},
        {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),

        JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
        StUrl = JobUrl ++ "/state",

        % Wait for the job to start running and intercept it in topoff1 state
        receive {JobPid, topoff1} -> ok end,
        % Tell the intercept to never finish checkpointing so job is left
        % hanging forever in running state
        JobPid ! cancel,
        ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),

        % Stop just this job
        {200, _} = req(put, StUrl, #{state => stopped}),
        wait_state(StUrl, <<"stopped">>),

        % Stop/start resharding globally and job should still stay stopped
        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
        ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),

        % Start the job again
        ?assertMatch({200, _}, req(put, StUrl, #{state => running})),
        % Wait for the job to start running and intercept it in topoff1 state
        receive {JobPid2, topoff1} -> ok end,
        ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),
        % Let it continue running and it should complete eventually
        JobPid2 ! continue,
        wait_state(StUrl, <<"completed">>)
    end)}.
% An individually stopped job stays stopped even when the cluster-level
% resharding state is toggled stopped/running; it only resumes (and then
% completes) after an explicit PUT running on its own /state endpoint.
% Uses the same topoff1 interception mechanism as individual_job_start_stop.
individual_job_stop_when_cluster_stopped({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        intercept_state(topoff1),

        Body = #{type => split, db => Db1},
        {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),

        JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
        StUrl = JobUrl ++ "/state",

        % Wait for the job to start running and intercept in topoff1
        receive {JobPid, topoff1} -> ok end,
        % Tell the intercept to never finish checkpointing so job is left
        % hanging forever in running state
        JobPid ! cancel,
        ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),

        % Stop resharding globally
        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
        wait_state(StUrl, <<"stopped">>),

        % Stop the job specifically
        {200, _} = req(put, StUrl, #{state => stopped}),
        % Job stays stopped
        ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),

        % Set cluster to running again
        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),

        % The job should stay stopped
        ?assertMatch({200, #{<<"state">> := <<"stopped">>}}, req(get, StUrl)),

        % It should be possible to resume job and it should complete
        ?assertMatch({200, _}, req(put, StUrl, #{state => running})),

        % Wait for the job to start running and intercept in topoff1 state
        receive {JobPid2, topoff1} -> ok end,
        ?assertMatch({200, #{<<"state">> := <<"running">>}}, req(get, StUrl)),

        % Let it continue running and it should complete eventually
        JobPid2 ! continue,
        wait_state(StUrl, <<"completed">>)
    end)}.
% Invalid job creation requests are rejected: 400 for malformed bodies
% and 404 when the named shard does not exist. Each table entry is
% {ExpectedCode, RequestBody}, issued in the original order.
create_job_with_invalid_arguments({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobsUrl = Top ++ ?JOBS,
        BadRequests = [
            % Nothing in the body
            {400, #{}},
            % Missing type
            {400, #{db => Db1}},
            % Have type but no db and no shard
            {400, #{type => split}},
            % Have type and db but db is invalid
            {400, #{db => <<"baddb">>, type => split}},
            % Have type and shard but shard is not an existing database
            {404, #{type => split,
                shard => <<"shards/80000000-ffffffff/baddb.1549492084">>}},
            % Bad range values: wrong type, not a range, inverted, too large
            {400, #{db => Db1, range => 42, type => split}},
            {400, #{db => Db1, range => <<"x">>, type => split}},
            {400, #{db => Db1, range => <<"ffffffff-80000000">>,
                type => split}},
            {400, #{db => Db1, range => <<"00000000-fffffffff">>,
                type => split}},
            % Can't have both db and shard
            {400, #{type => split, db => Db1, shard => <<"blah">>}}
        ],
        lists:foreach(fun({Code, Body}) ->
            ?assertMatch({Code, _}, req(post, JobsUrl, Body))
        end, BadRequests)
    end)}.
% Create split jobs from a db name combined with optional node and/or
% range parameters, waiting for each to complete; the db should end up
% with four quarter-ranges after both halves are split once.
create_job_with_db({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobsUrl = Top ++ ?JOBS,
        BaseBody = #{type => split, db => Db1},
        Node = atom_to_binary(node(), utf8),

        % Node with db
        Resp1 = req(post, JobsUrl, BaseBody#{node => Node}),
        ?assertMatch({201, [#{?OK := true}]}, Resp1),
        {_, Jobs1} = Resp1,
        wait_to_complete_then_cleanup(Top, Jobs1),

        % Range and db
        Resp2 = req(post, JobsUrl, BaseBody#{range => <<"00000000-7fffffff">>}),
        ?assertMatch({201, [#{?OK := true}]}, Resp2),
        {_, Jobs2} = Resp2,
        wait_to_complete_then_cleanup(Top, Jobs2),

        % Node, range and db
        Resp3 = req(post, JobsUrl,
            BaseBody#{range => <<"80000000-ffffffff">>, node => Node}),
        ?assertMatch({201, [#{?OK := true}]}, Resp3),
        {_, Jobs3} = Resp3,
        wait_to_complete_then_cleanup(Top, Jobs3),

        ?assertEqual([
            [16#00000000, 16#3fffffff],
            [16#40000000, 16#7fffffff],
            [16#80000000, 16#bfffffff],
            [16#c0000000, 16#ffffffff]
        ], [mem3:range(S) || S <- lists:sort(mem3:shards(Db1))])
    end)}.


% Create split jobs by naming the shard directly (optionally with a
% node); the q=2 db ends up with four quarter-ranges afterwards.
create_job_with_shard_name({Top, {_, _, Db3}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobsUrl = Top ++ ?JOBS,
        [Shard1, Shard2] = [mem3:name(S) || S <- lists:sort(mem3:shards(Db3))],

        % Shard only
        Resp1 = req(post, JobsUrl, #{type => split, shard => Shard1}),
        ?assertMatch({201, [#{?OK := true}]}, Resp1),
        {_, Jobs1} = Resp1,
        wait_to_complete_then_cleanup(Top, Jobs1),

        % Shard with a node
        Node = atom_to_binary(node(), utf8),
        Resp2 = req(post, JobsUrl,
            #{type => split, shard => Shard2, node => Node}),
        ?assertMatch({201, [#{?OK := true}]}, Resp2),
        {_, Jobs2} = Resp2,
        wait_to_complete_then_cleanup(Top, Jobs2),

        ?assertEqual([
            [16#00000000, 16#3fffffff],
            [16#40000000, 16#7fffffff],
            [16#80000000, 16#bfffffff],
            [16#c0000000, 16#ffffffff]
        ], [mem3:range(S) || S <- lists:sort(mem3:shards(Db3))])
    end)}.
% A completed job is terminal: stop/resume requests against the job or
% toggling cluster-level resharding leave it in the "completed" state,
% and it can still be deleted afterwards.
completed_job_handling({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Jobs = Top ++ ?JOBS,

        % Run job to completion
        {C1, R1} = req(post, Jobs, #{type => split, db => Db1}),
        ?assertMatch({201, [#{?OK := true}]}, {C1, R1}),
        [#{?ID := Id}] = R1,
        wait_to_complete(Top, R1),

        % Check top level stats
        ?assertMatch({200, #{
            <<"state">> := <<"running">>,
            <<"state_reason">> := null,
            <<"completed">> := 1,
            <<"failed">> := 0,
            <<"running">> := 0,
            <<"stopped">> := 0,
            <<"total">> := 1
        }}, req(get, Top ++ ?RESHARD)),

        % Job state itself
        JobUrl = Jobs ++ ?b2l(Id),
        ?assertMatch({200, #{
            <<"split_state">> := <<"completed">>,
            <<"job_state">> := <<"completed">>
        }}, req(get, JobUrl)),

        % Job's state endpoint
        StUrl = Jobs ++ ?b2l(Id) ++ "/state",
        ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),

        % Try to stop it and it should stay completed
        {200, _} = req(put, StUrl, #{state => stopped}),
        ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),

        % Try to resume it and it should stay completed
        {200, _} = req(put, StUrl, #{state => running}),
        ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),

        % Stop resharding globally and job should still stay completed
        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
        ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),

        % Start resharding and job stays completed
        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
        ?assertMatch({200, #{<<"state">> := <<"completed">>}}, req(get, StUrl)),

        % A completed job can still be deleted
        ?assertMatch({200, #{?OK := true}}, req(delete, JobUrl))
    end)}.
% Deleting the source db while the job is held in the given split state
% (see delete_source_in_state/3) should make the job fail.
handle_db_deletion_in_topoff1({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Id = delete_source_in_state(Top, Db1, topoff1),
        StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state",
        wait_state(StateUrl, <<"failed">>)
    end)}.


handle_db_deletion_in_initial_copy({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Id = delete_source_in_state(Top, Db1, initial_copy),
        StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state",
        wait_state(StateUrl, <<"failed">>)
    end)}.


handle_db_deletion_in_copy_local_docs({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Id = delete_source_in_state(Top, Db1, copy_local_docs),
        StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state",
        wait_state(StateUrl, <<"failed">>)
    end)}.


handle_db_deletion_in_build_indices({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Id = delete_source_in_state(Top, Db1, build_indices),
        StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state",
        wait_state(StateUrl, <<"failed">>)
    end)}.


handle_db_deletion_in_update_shard_map({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Id = delete_source_in_state(Top, Db1, update_shardmap),
        StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state",
        wait_state(StateUrl, <<"failed">>)
    end)}.


handle_db_deletion_in_wait_source_close({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Id = delete_source_in_state(Top, Db1, wait_source_close),
        StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state",
        wait_state(StateUrl, <<"failed">>)
    end)}.


% Restarting resharding while the job is held in the given split state
% (see recover_in_state/3) should let the job recover and complete.
recover_in_topoff1({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Id = recover_in_state(Top, Db1, topoff1),
        StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end)}.


recover_in_initial_copy({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Id = recover_in_state(Top, Db1, initial_copy),
        StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end)}.
% Recovery tests, continued: after stopping and restarting resharding
% while the job is held in the named split state (recover_in_state/3),
% the job should resume from its checkpoint and complete.
recover_in_copy_local_docs({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Id = recover_in_state(Top, Db1, copy_local_docs),
        StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end)}.


recover_in_build_indices({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Id = recover_in_state(Top, Db1, build_indices),
        StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end)}.


recover_in_update_shard_map({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Id = recover_in_state(Top, Db1, update_shardmap),
        StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end)}.


recover_in_wait_source_close({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Id = recover_in_state(Top, Db1, wait_source_close),
        StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end)}.


recover_in_topoff3({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Id = recover_in_state(Top, Db1, topoff3),
        StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end)}.


recover_in_source_delete({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        Id = recover_in_state(Top, Db1, source_delete),
        StateUrl = Top ++ ?JOBS ++ ?b2l(Id) ++ "/state",
        wait_state(StateUrl, <<"completed">>)
    end)}.
% Validate the "reshard/max_jobs" config limit: exceeding it yields a
% 500 with a max_jobs_exceeded error; completed jobs still count toward
% the limit, and raising the limit allows new jobs again. Lowering the
% limit does not remove jobs that already exist.
check_max_jobs({Top, {Db1, Db2, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobsUrl = Top ++ ?JOBS,

        % With max_jobs = 0 no job can be created at all
        config:set("reshard", "max_jobs", "0", _Persist = false),
        Resp1 = req(post, JobsUrl, #{type => split, db => Db1}),
        ?assertMatch({500, [#{<<"error">> := <<"max_jobs_exceeded">>}]}, Resp1),

        % With max_jobs = 1 a single job is allowed; let it finish
        config:set("reshard", "max_jobs", "1", _Persist = false),
        {201, Created} = req(post, JobsUrl, #{type => split, db => Db1}),
        wait_to_complete(Top, Created),

        % Stop the cluster so new jobs are not started, and check that the
        % limit is enforced even though the existing job is not running
        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),

        Resp3 = req(post, JobsUrl, #{type => split, db => Db2}),
        ?assertMatch({500, [#{<<"error">> := <<"max_jobs_exceeded">>}]},
            Resp3),

        % Raising max_jobs allows the job to be created
        config:set("reshard", "max_jobs", "2", _Persist = false),

        {Code4, Resp4} = req(post, JobsUrl, #{type => split, db => Db2}),
        ?assertEqual(201, Code4),

        % Lower max_jobs again while the new job exists but is not running
        config:set("reshard", "max_jobs", "1", _Persist = false),

        % Restart resharding; already-created jobs are not removed when the
        % limit is lowered, so the job should still run to completion
        ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
        wait_to_complete(Top, Resp4)
    end)}.
% The "require_node_param" / "require_range_param" settings make job
% creation reject requests that omit `node` or `range` respectively.
check_node_and_range_required_params({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        JobsUrl = Top ++ ?JOBS,

        Node = atom_to_binary(node(), utf8),
        Range = <<"00000000-ffffffff">>,

        % NOTE(review): the "prameter" spelling below mirrors the error
        % text the server currently emits; if the server-side typo is
        % ever corrected, update these expected messages in the same
        % change.
        config:set("reshard", "require_node_param", "true", _Persist = false),
        Resp1 = req(post, JobsUrl, #{type => split, db => Db1}),
        NodeRequiredErr = <<"`node` prameter is required">>,
        ?assertEqual({400, #{<<"error">> => <<"bad_request">>,
            <<"reason">> => NodeRequiredErr}}, Resp1),

        config:set("reshard", "require_range_param", "true", _Persist = false),
        Resp2 = req(post, JobsUrl, #{type => split, db => Db1, node => Node}),
        RangeRequiredErr = <<"`range` prameter is required">>,
        ?assertEqual({400, #{<<"error">> => <<"bad_request">>,
            <<"reason">> => RangeRequiredErr}}, Resp2),

        % Supplying both required parameters succeeds
        Body = #{type => split, db => Db1, range => Range, node => Node},
        {Code3, Jobs3} = req(post, JobsUrl, Body),
        ?assertMatch({201, [#{?OK := true}]}, {Code3, Jobs3}),
        wait_to_complete_then_cleanup(Top, Jobs3)
    end)}.


% After the source db is deleted, a completed job is eventually cleaned
% up and its URL starts returning 404.
cleanup_completed_jobs({Top, {Db1, _, _}}) ->
    {timeout, ?TIMEOUT, ?_test(begin
        {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS,
            #{type => split, db => Db1}),
        JobUrl = Top ++ ?JOBS ++ ?b2l(Id),
        wait_state(JobUrl ++ "/state", <<"completed">>),
        delete_db(Top, Db1),
        wait_for_http_code(JobUrl, 404)
    end)}.


% Test help functions

% Wait for every job in the list to reach "completed", then delete it.
wait_to_complete_then_cleanup(Top, Jobs) ->
    Base = Top ++ ?JOBS,
    lists:foreach(fun(#{?ID := Id}) ->
        JobUrl = Base ++ ?b2l(Id),
        wait_state(JobUrl ++ "/state", <<"completed">>),
        {200, _} = req(delete, JobUrl)
    end, Jobs).


% Wait for every job in the list to reach "completed" (no cleanup).
wait_to_complete(Top, Jobs) ->
    Base = Top ++ ?JOBS,
    lists:foreach(fun(#{?ID := Id}) ->
        wait_state(Base ++ ?b2l(Id) ++ "/state", <<"completed">>)
    end, Jobs).
% Mock mem3_reshard_job:checkpoint_done/1 so that when a job reaches the
% given split `State` it notifies the test process with {JobPid, State}
% and then blocks, waiting for the test to send `continue` (proceed with
% the real checkpoint via passthrough) or `cancel` (skip it, leaving the
% job hanging in the running state). All other states pass through.
intercept_state(State) ->
    TestPid = self(),
    meck:new(mem3_reshard_job, [passthrough]),
    meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
        case Job#job.split_state of
            State ->
                TestPid ! {self(), State},
                receive
                    continue -> meck:passthrough([Job]);
                    cancel -> ok
                end;
            _ ->
                meck:passthrough([Job])
        end
    end).


% Replace the interception installed by intercept_state/1 with a plain
% passthrough so jobs checkpoint normally again.
cancel_intercept() ->
    meck:expect(mem3_reshard_job, checkpoint_done, fun(Job) ->
        meck:passthrough([Job])
    end).


% Poll Url (every 100ms, up to 30s) until its "state" field equals the
% given State binary. Any non-200 response crashes via case_clause.
wait_state(Url, State) ->
    test_util:wait(fun() ->
        case req(get, Url) of
            {200, #{<<"state">> := State}} -> ok;
            {200, #{}} -> timer:sleep(100), wait
        end
    end, 30000).


% Poll Url (every 100ms, up to 30s) until it responds with the given
% HTTP status code.
wait_for_http_code(Url, Code) when is_integer(Code) ->
    test_util:wait(fun() ->
        case req(get, Url) of
            {Code, _} -> ok;
            {_, _} -> timer:sleep(100), wait
        end
    end, 30000).


% Start a split job for Db, wait (via intercept_state/1) until it
% reaches State, then delete the source db out from under it and let it
% continue. Returns the job id.
delete_source_in_state(Top, Db, State) when is_atom(State), is_binary(Db) ->
    intercept_state(State),
    Body = #{type => split, db => Db},
    {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
    receive {JobPid, State} -> ok end,
    sync_delete_db(Top, Db),
    JobPid ! continue,
    Id.


% Start a split job for Db, catch it in State and leave it hanging, then
% stop and restart resharding so the job restarts from its last
% checkpoint with the intercept removed. Returns the job id.
recover_in_state(Top, Db, State) when is_atom(State) ->
    intercept_state(State),
    Body = #{type => split, db => Db},
    {201, [#{?ID := Id}]} = req(post, Top ++ ?JOBS, Body),
    receive {JobPid, State} -> ok end,
    % Job is now stuck in running we prevented it from executing
    % the given state
    JobPid ! cancel,
    % Now restart resharding
    ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => stopped})),
    cancel_intercept(),
    ?assertMatch({200, _}, req(put, Top ++ ?STATE, #{state => running})),
    Id.
% Create database Db (QArgs is the query string, e.g. "?q=1&n=1") via
% the HTTP API; both 201 (created) and 202 (accepted) count as success.
create_db(Top, Db, QArgs) when is_binary(Db) ->
    Url = Top ++ binary_to_list(Db) ++ QArgs,
    {ok, Status, _, _} = test_request:put(Url, [?JSON, ?AUTH], "{}"),
    ?assert(lists:member(Status, [201, 202])).


% Delete Db if it exists; returns not_found when it is already gone.
% Any other GET status crashes the test (intentionally no catch-all).
delete_db(Top, Db) when is_binary(Db) ->
    DbUrl = Top ++ binary_to_list(Db),
    case test_request:get(DbUrl, [?AUTH]) of
        {ok, 404, _, _} ->
            not_found;
        {ok, 200, _, _} ->
            {ok, 200, _, _} = test_request:delete(DbUrl, [?AUTH]),
            ok
    end.


% Delete Db over HTTP and then remove any local shard files directly,
% so later assertions don't race against asynchronous cleanup. A db
% that is already fully gone is treated as success.
sync_delete_db(Top, Db) when is_binary(Db) ->
    delete_db(Top, Db),
    try
        ShardNames = [mem3:name(S) || S <- mem3:local_shards(Db)],
        lists:foreach(fun(Name) ->
            couch_server:delete(Name, [?ADMIN_CTX])
        end, ShardNames),
        ok
    catch
        error:database_does_not_exist ->
            ok
    end.


% Authenticated body-less request; returns {Code, DecodedJsonBody}.
req(Method, Url) ->
    {ok, Code, _, Resp} = test_request:request(Method, Url, [?AUTH]),
    {Code, jiffy:decode(Resp, [return_maps])}.


% Authenticated request with a body; map bodies are JSON-encoded first.
req(Method, Url, #{} = Body) ->
    req(Method, Url, jiffy:encode(Body));

req(Method, Url, Body) ->
    Headers = [?JSON, ?AUTH],
    {ok, Code, _, Resp} = test_request:request(Method, Url, Headers, Body),
    {Code, jiffy:decode(Resp, [return_maps])}.