% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_work_queue_tests).

-include_lib("couch/include/couch_eunit.hrl").

% Milliseconds the test process waits for a reply from a helper process
% (and for the queue to stop) before reporting `timeout`.
-define(TIMEOUT, 100).


%% Fixture setup: creates a work queue with the given options and spawns one
%% producer and one consumer helper process bound to it.
%% Returns {Queue, ProducerPid, ConsumerPid}.
setup(Opts) ->
    {ok, Queue} = couch_work_queue:new(Opts),
    ProducerPid = spawn_producer(Queue),
    ConsumerPid = spawn_consumer(Queue),
    {Queue, ProducerPid, ConsumerPid}.

%% Fixture with a queue limited to 3 items.
setup_max_items() ->
    setup([{max_items, 3}]).

%% Fixture with a queue limited to a size of 160.
setup_max_size() ->
    setup([{max_size, 160}]).

%% Fixture with both limits applied.
setup_max_items_and_size() ->
    setup([{max_size, 160}, {max_items, 3}]).

%% Fixture with both limits, the multi_workers option and three consumers.
%% Returns {Queue, ProducerPid, [Consumer1, Consumer2, Consumer3]}.
setup_multi_workers() ->
    Opts = [
        {max_size, 160},
        {max_items, 3},
        {multi_workers, true}
    ],
    {Queue, ProducerPid, FirstConsumer} = setup(Opts),
    % setup/1 already spawned one consumer; add two more, in order.
    MoreConsumers = [spawn_consumer(Queue) || _ <- [second, third]],
    {Queue, ProducerPid, [FirstConsumer | MoreConsumers]}.

%% Fixture cleanup: drains the queue, closes it and stops every helper
%% process. Accepts either a single consumer pid or a list of them.
teardown({Queue, ProducerPid, Consumers}) when is_list(Consumers) ->
    % consume all to unblock and let producer/consumer stop without timeout
    lists:foreach(fun(Pid) -> consume(Pid, all) end, Consumers),

    ok = close_queue(Queue),
    ok = stop(ProducerPid, "producer"),
    StopResults = lists:map(fun(Pid) -> stop(Pid, "consumer") end, Consumers),
    % Assertive match: every consumer must have acknowledged the stop request.
    StopResults = [ok || _ <- Consumers],
    ok;
teardown({Queue, ProducerPid, ConsumerPid}) ->
    teardown({Queue, ProducerPid, [ConsumerPid]}).


%% Test generator: one producer and one consumer against three differently
%% limited queues. Every case runs against a fresh fixture (foreach).
single_consumer_test_() ->
    {
        "Single producer and consumer",
        [
            foreach_fixture(
                "Queue with 3 max items",
                fun setup_max_items/0,
                single_consumer_max_item_count() ++ common_cases()
            ),
            foreach_fixture(
                "Queue with max size of 160 bytes",
                fun setup_max_size/0,
                single_consumer_max_size() ++ common_cases()
            ),
            foreach_fixture(
                "Queue with max size of 160 bytes and 3 max items",
                fun setup_max_items_and_size/0,
                single_consumer_max_items_and_size() ++ common_cases()
            )
        ]
    }.

%% Test generator: one producer and three consumers on a multi_workers queue.
multiple_consumers_test_() ->
    {
        "Single producer and multiple consumers",
        [
            foreach_fixture(
                "Queue with max size of 160 bytes and 3 max items",
                fun setup_multi_workers/0,
                common_cases() ++ multiple_consumers()
            )
        ]
    }.

%% Builds a titled EUnit `foreach` fixture that always cleans up with
%% teardown/1.
foreach_fixture(Title, SetupFun, Cases) ->
    {Title, {foreach, SetupFun, fun teardown/1, Cases}}.

%% Cases shared by the single-consumer and multi-consumer suites.
common_cases() ->
    [
        fun should_block_consumer_on_dequeue_from_empty_queue/1,
        fun should_consume_right_item/1,
        fun should_timeout_on_close_non_empty_queue/1,
        fun should_not_block_producer_for_non_empty_queue_after_close/1,
        fun should_be_closed/1
    ].

%% Single-consumer cases that exercise the item-count limit.
single_consumer_max_item_count() ->
    [
        fun should_have_no_items_for_new_queue/1,
        fun should_block_producer_on_full_queue_count/1,
        fun should_receive_first_queued_item/1,
        fun should_consume_multiple_items/1,
        fun should_consume_all/1
    ].

%% Single-consumer cases that exercise the size limit.
single_consumer_max_size() ->
    [
        fun should_have_zero_size_for_new_queue/1,
        fun should_block_producer_on_full_queue_size/1,
        fun should_increase_queue_size_on_produce/1,
        fun should_receive_first_queued_item/1,
        fun should_consume_multiple_items/1,
        fun should_consume_all/1
    ].

%% Item-count cases followed by size cases (shared cases appear twice).
single_consumer_max_items_and_size() ->
    CountCases = single_consumer_max_item_count(),
    SizeCases = single_consumer_max_size(),
    CountCases ++ SizeCases.

%% Extra cases that are run only by the multi-consumer suite.
multiple_consumers() ->
    [
        fun should_have_zero_size_for_new_queue/1,
        fun should_have_no_items_for_new_queue/1,
        fun should_increase_queue_size_on_produce/1
    ].


%% A freshly created queue reports an item count of 0.
should_have_no_items_for_new_queue({Queue, _, _}) ->
    ?_assertEqual(0, couch_work_queue:item_count(Queue)).

%% A freshly created queue reports a size of 0.
should_have_zero_size_for_new_queue({Queue, _, _}) ->
    ?_assertEqual(0, couch_work_queue:size(Queue)).

%% A consumer asked to dequeue from an empty queue must not answer a ping
%% within ?TIMEOUT, i.e. it is stuck inside the dequeue call.
should_block_consumer_on_dequeue_from_empty_queue({_, _, Consumers}) when is_list(Consumers) ->
    lists:foreach(fun(Pid) -> consume(Pid, 2) end, Consumers),
    PingResults = lists:map(fun(Pid) -> ping(Pid) end, Consumers),
    ?_assertEqual([timeout, timeout, timeout], PingResults);
should_block_consumer_on_dequeue_from_empty_queue({_, _, ConsumerPid}) ->
    consume(ConsumerPid, 1),
    PingResult = ping(ConsumerPid),
    ?_assertEqual(timeout, PingResult).

%% Consumers that are already waiting receive the items as they are produced.
should_consume_right_item({Queue, ProducerPid, Consumers}) when is_list(Consumers) ->
    lists:foreach(fun(Pid) -> consume(Pid, 3) end, Consumers),

    Item1 = produce_and_expect_empty_queue(Queue, ProducerPid),
    Item2 = produce_and_expect_empty_queue(Queue, ProducerPid),
    Item3 = produce_and_expect_empty_queue(Queue, ProducerPid),
    Produced = [Item1, Item2, Item3],

    % NOTE(review): the item in each result tuple comes from the zip below,
    % not from the consumer, so this only verifies that each consumer answers
    % the ping after the items were produced.
    Results = lists:map(
        fun({Pid, Item}) -> {ping(Pid), Item} end,
        lists:zip(Consumers, Produced)
    ),

    ?_assertEqual([{ok, Item} || Item <- Produced], Results);
should_consume_right_item({Queue, ProducerPid, ConsumerPid}) ->
    % The consumer asks for a single item before anything is produced ...
    consume(ConsumerPid, 1),
    FirstItem = produce(Queue, ProducerPid, 10, false),
    % ... so the second item has to stay in the queue.
    produce(Queue, ProducerPid, 20, true),
    ok = ping(ProducerPid),
    ok = ping(ConsumerPid),
    {ok, Dequeued} = last_consumer_items(ConsumerPid),
    ?_assertEqual([FirstItem], Dequeued).

%% Produces one 10-byte item without waiting for the item count to grow,
%% checks that the producer is not blocked and that the queue reports no
%% items and zero size afterwards. Returns the produced item.
produce_and_expect_empty_queue(Queue, ProducerPid) ->
    Item = produce(Queue, ProducerPid, 10, false),
    ok = ping(ProducerPid),
    ?assertEqual(0, couch_work_queue:item_count(Queue)),
    ?assertEqual(0, couch_work_queue:size(Queue)),
    Item.

%% Producing a 50-byte and then a 10-byte item, with nothing consuming, must
%% be reflected in the queue's item count and size after each step.
should_increase_queue_size_on_produce({Q, Producer, _}) ->
    produce(Q, Producer, 50, true),
    ok = ping(Producer),
    Count1 = couch_work_queue:item_count(Q),
    Size1 = couch_work_queue:size(Q),

    produce(Q, Producer, 10, true),
    Count2 = couch_work_queue:item_count(Q),
    Size2 = couch_work_queue:size(Q),

    % Expected value goes first so that a failure report labels the expected
    % and the actual value correctly.
    ?_assertEqual([{1, 50}, {2, 60}], [{Count1, Size1}, {Count2, Size2}]).

%% With the item limit of the fixture, the producer still answers pings after
%% the first and second item but not after the third one.
should_block_producer_on_full_queue_count({Q, Producer, _}) ->
    produce(Q, Producer, 10, true),
    ?assertEqual(1, couch_work_queue:item_count(Q)),
    ok = ping(Producer),

    produce(Q, Producer, 15, true),
    ?assertEqual(2, couch_work_queue:item_count(Q)),
    ok = ping(Producer),

    produce(Q, Producer, 20, true),
    ?assertEqual(3, couch_work_queue:item_count(Q)),
    Pong = ping(Producer),

    ?_assertEqual(timeout, Pong).

%% With the size limit of the fixture, a 100-byte item leaves the producer
%% responsive; a further 110-byte item is counted by the queue (2 items,
%% size 210) but the producer no longer answers pings.
should_block_producer_on_full_queue_size({Q, Producer, _}) ->
    produce(Q, Producer, 100, true),
    ok = ping(Producer),
    ?assertEqual(1, couch_work_queue:item_count(Q)),
    ?assertEqual(100, couch_work_queue:size(Q)),

    % Wait = false: do not wait for the item count to grow here; the failed
    % ping below gives the queue time to register the item.
    produce(Q, Producer, 110, false),
    Pong = ping(Producer),
    ?assertEqual(2, couch_work_queue:item_count(Q)),
    ?assertEqual(210, couch_work_queue:size(Q)),

    ?_assertEqual(timeout, Pong).

%% A consumer asking for two items receives both queued items, in the order
%% they were produced.
should_consume_multiple_items({Q, Producer, Consumer}) ->
    Item1 = produce(Q, Producer, 10, true),
    ok = ping(Producer),

    Item2 = produce(Q, Producer, 15, true),
    ok = ping(Producer),

    consume(Consumer, 2),

    {ok, Items} = last_consumer_items(Consumer),
    ?_assertEqual([Item1, Item2], Items).

%% A consumer that is already waiting (for up to 100 items) is woken up by
%% the first produced item and receives exactly that item.
should_receive_first_queued_item({Queue, ProducerPid, ConsumerPid}) ->
    consume(ConsumerPid, 100),
    % The queue is empty, so the consumer must be stuck in the dequeue call.
    timeout = ping(ConsumerPid),

    Produced = produce(Queue, ProducerPid, 11, false),
    ok = ping(ProducerPid),

    ok = ping(ConsumerPid),
    ?assertEqual(0, couch_work_queue:item_count(Queue)),

    {ok, Dequeued} = last_consumer_items(ConsumerPid),
    ?_assertEqual([Produced], Dequeued).

%% Dequeuing with `all` returns every queued item in production order.
should_consume_all({Queue, ProducerPid, ConsumerPid}) ->
    First = produce(Queue, ProducerPid, 10, true),
    Second = produce(Queue, ProducerPid, 15, true),
    Third = produce(Queue, ProducerPid, 20, true),

    consume(ConsumerPid, all),

    {ok, Dequeued} = last_consumer_items(ConsumerPid),
    ?_assertEqual([First, Second, Third], Dequeued).

%% Closing a queue that still holds an item is reported as `timeout` by
%% close_queue/1.
should_timeout_on_close_non_empty_queue({Queue, ProducerPid, _}) ->
    produce(Queue, ProducerPid, 1, true),
    CloseResult = close_queue(Queue),

    ?_assertEqual(timeout, CloseResult).

%% After a close request on a non-empty queue the producer still answers
%% pings and the queue still reports its single 1-byte item.
should_not_block_producer_for_non_empty_queue_after_close({Queue, ProducerPid, _}) ->
    produce(Queue, ProducerPid, 1, true),
    % The result of the close request is deliberately not matched here.
    close_queue(Queue),
    PingResult = ping(ProducerPid),
    QueueSize = couch_work_queue:size(Queue),
    QueueCount = couch_work_queue:item_count(Queue),

    ?_assertEqual({ok, 1, 1}, {PingResult, QueueSize, QueueCount}).

%% Once an empty queue has been closed, dequeue, item_count and size all
%% report `closed`.
should_be_closed({Queue, _, Consumers}) when is_list(Consumers) ->
    ok = close_queue(Queue),

    lists:foreach(fun(Pid) -> consume(Pid, 1) end, Consumers),

    DequeueResults = lists:map(fun(Pid) -> last_consumer_items(Pid) end, Consumers),
    QueueCount = couch_work_queue:item_count(Queue),
    QueueSize = couch_work_queue:size(Queue),

    ?_assertEqual({[closed, closed, closed], closed, closed},
                  {DequeueResults, QueueCount, QueueSize});
should_be_closed({Queue, _, ConsumerPid}) ->
    ok = close_queue(Queue),

    consume(ConsumerPid, 1),

    DequeueResult = last_consumer_items(ConsumerPid),
    QueueCount = couch_work_queue:item_count(Queue),
    QueueSize = couch_work_queue:size(Queue),

    ?_assertEqual({closed, closed, closed},
                  {DequeueResult, QueueCount, QueueSize}).


%% Asks the queue to close via test_util:stop_sync/3 with a ?TIMEOUT limit
%% and returns that call's result (the tests match it against `ok` and
%% `timeout`).
close_queue(Q) ->
    CloseFun = fun() ->
        ok = couch_work_queue:close(Q)
    end,
    test_util:stop_sync(Q, CloseFun, ?TIMEOUT).

%% Starts a consumer helper process that reports back to the calling process.
spawn_consumer(Q) ->
    Owner = self(),
    spawn(fun() -> consumer_loop(Owner, Q, nil) end).

%% Consumer helper loop. LastResult holds the value returned by the most
%% recent dequeue call (nil until the first one).
%%   {consume, N}     - dequeue N items (blocks inside the queue call)
%%   {last_item, Ref} - report LastResult to the owner
%%   {ping, Ref}      - liveness check, answered with {pong, Ref}
%%   {stop, Ref}      - acknowledge with {ok, Ref} and terminate
consumer_loop(Owner, Q, LastResult) ->
    receive
        {consume, N} ->
            DequeueResult = couch_work_queue:dequeue(Q, N),
            consumer_loop(Owner, Q, DequeueResult);
        {last_item, Ref} ->
            Owner ! {item, Ref, LastResult},
            consumer_loop(Owner, Q, LastResult);
        {ping, Ref} ->
            Owner ! {pong, Ref},
            consumer_loop(Owner, Q, LastResult);
        {stop, Ref} ->
            Owner ! {ok, Ref}
    end.

%% Starts a producer helper process that reports back to the calling process.
spawn_producer(Q) ->
    Owner = self(),
    spawn(fun() -> producer_loop(Owner, Q) end).

%% Producer helper loop.
%%   {produce, Ref, Size} - create Size random bytes, report them to the
%%                          owner, then put them on the queue
%%   {ping, Ref}          - liveness check, answered with {pong, Ref}
%%   {stop, Ref}          - acknowledge with {ok, Ref} and terminate
producer_loop(Owner, Q) ->
    receive
        {produce, Ref, Size} ->
            Payload = crypto:strong_rand_bytes(Size),
            % Report the item before queueing it: the queue call may block,
            % and the owner must learn about the item regardless.
            Owner ! {item, Ref, Payload},
            ok = couch_work_queue:queue(Q, Payload),
            producer_loop(Owner, Q);
        {ping, Ref} ->
            Owner ! {pong, Ref},
            producer_loop(Owner, Q);
        {stop, Ref} ->
            Owner ! {ok, Ref}
    end.

%% Asynchronously tells a consumer to dequeue N items (N may be `all`).
consume(Consumer, N) ->
    Consumer ! {consume, N}.

%% Returns the result of the consumer's most recent dequeue call, or
%% `timeout` if the consumer does not answer within ?TIMEOUT.
last_consumer_items(Consumer) ->
    Tag = make_ref(),
    Consumer ! {last_item, Tag},
    receive
        {item, Tag, LastResult} ->
            LastResult
    after ?TIMEOUT ->
        timeout
    end.

%% Asks the producer to create and queue an item of Size bytes and returns
%% that item. When Wait is true, additionally waits until the queue's item
%% count exceeds the count sampled before the request was sent. Raises an
%% assertion_failed error if the producer does not report the item in time.
produce(Q, Producer, Size, Wait) ->
    Tag = make_ref(),
    CountBefore = couch_work_queue:item_count(Q),
    Producer ! {produce, Tag, Size},
    receive
        {item, Tag, Payload} ->
            case Wait of
                true ->
                    ok = wait_increment(Q, CountBefore);
                _ ->
                    ok
            end,
            Payload
    after ?TIMEOUT ->
        erlang:error({assertion_failed,
                      [{module, ?MODULE},
                       {line, ?LINE},
                       {reason, "Timeout asking producer to produce an item"}]})
    end.

%% Liveness check for a helper process: `ok` if it answers within ?TIMEOUT,
%% `timeout` otherwise (e.g. because it is blocked in a queue call).
ping(Pid) ->
    Tag = make_ref(),
    Pid ! {ping, Tag},
    receive
        {pong, Tag} ->
            ok
    after ?TIMEOUT ->
        timeout
    end.

%% Asks a helper process to terminate. Returns `ok` on acknowledgement;
%% otherwise prints a debug message naming the process and returns `timeout`.
stop(Pid, Name) ->
    Tag = make_ref(),
    Pid ! {stop, Tag},
    receive
        {ok, Tag} ->
            ok
    after ?TIMEOUT ->
        ?debugMsg("Timeout stopping " ++ Name),
        timeout
    end.

%% Polls through test_util:wait/1 until the queue's item count is greater
%% than ItemsCount; returns whatever test_util:wait/1 returns.
wait_increment(Q, ItemsCount) ->
    HasGrown = fun() ->
        CurrentCount = couch_work_queue:item_count(Q),
        case CurrentCount > ItemsCount of
            true -> ok;
            false -> wait
        end
    end,
    test_util:wait(HasGrown).