1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_work_queue_tests).
14
15-include_lib("couch/include/couch_eunit.hrl").
16
17-define(TIMEOUT, 100).
18
19
%% Base fixture: creates a work queue with the given options and spawns one
%% producer and one consumer process attached to it.
%% Returns {Queue, ProducerPid, ConsumerPid}.
setup(Opts) ->
    {ok, Queue} = couch_work_queue:new(Opts),
    ProducerPid = spawn_producer(Queue),
    ConsumerPid = spawn_consumer(Queue),
    {Queue, ProducerPid, ConsumerPid}.
25
%% Fixture: queue created with a max_items option of 3.
setup_max_items() ->
    Opts = [{max_items, 3}],
    setup(Opts).
28
%% Fixture: queue created with a max_size option of 160.
setup_max_size() ->
    Opts = [{max_size, 160}],
    setup(Opts).
31
%% Fixture: queue created with both a max_size of 160 and a max_items of 3.
setup_max_items_and_size() ->
    Opts = [{max_size, 160}, {max_items, 3}],
    setup(Opts).
34
%% Fixture: size- and count-limited queue created with multi_workers enabled,
%% served by one producer and three consumers.
%% Returns {Queue, ProducerPid, [Consumer1, Consumer2, Consumer3]}.
setup_multi_workers() ->
    Opts = [{max_size, 160}, {max_items, 3}, {multi_workers, true}],
    {Queue, ProducerPid, First} = setup(Opts),
    % the base fixture already spawned one consumer; add two more
    Second = spawn_consumer(Queue),
    Third = spawn_consumer(Queue),
    {Queue, ProducerPid, [First, Second, Third]}.
42
%% Fixture cleanup. Accepts either a single consumer pid or a list of them.
%% Closes the queue and stops the helper processes; crashes with badmatch if
%% the queue close or any stop request does not answer ok.
teardown({Q, Producer, Consumers}) when is_list(Consumers) ->
    % consume all to unblock and let producer/consumer stop without timeout
    lists:foreach(fun(Pid) -> consume(Pid, all) end, Consumers),

    ok = close_queue(Q),
    ok = stop(Producer, "producer"),
    Outcomes = [stop(Pid, "consumer") || Pid <- Consumers],
    % every consumer must have acknowledged the stop request
    Outcomes = lists:duplicate(length(Consumers), ok),
    ok;
teardown({Q, Producer, Consumer}) ->
    % normalise the single-consumer fixture to the list form
    teardown({Q, Producer, [Consumer]}).
54
55
%% EUnit generator: single producer / single consumer scenarios, run against
%% three differently limited queue fixtures. Each group gets a fresh fixture
%% per test case (foreach).
single_consumer_test_() ->
    MaxItemsGroup = {
        "Queue with 3 max items",
        {
            foreach,
            fun setup_max_items/0, fun teardown/1,
            single_consumer_max_item_count() ++ common_cases()
        }
    },
    MaxSizeGroup = {
        "Queue with max size of 160 bytes",
        {
            foreach,
            fun setup_max_size/0, fun teardown/1,
            single_consumer_max_size() ++ common_cases()
        }
    },
    MaxItemsAndSizeGroup = {
        "Queue with max size of 160 bytes and 3 max items",
        {
            foreach,
            fun setup_max_items_and_size/0, fun teardown/1,
            single_consumer_max_items_and_size() ++ common_cases()
        }
    },
    {
        "Single producer and consumer",
        [MaxItemsGroup, MaxSizeGroup, MaxItemsAndSizeGroup]
    }.
86
%% EUnit generator: single producer with several consumers, run against the
%% multi-worker fixture. Each test case gets a fresh fixture (foreach).
multiple_consumers_test_() ->
    Cases = common_cases() ++ multiple_consumers(),
    Group = {
        "Queue with max size of 160 bytes and 3 max items",
        {
            foreach,
            fun setup_multi_workers/0, fun teardown/1,
            Cases
        }
    },
    {"Single producer and multiple consumers", [Group]}.
102
%% Test instantiators shared by the single- and multi-consumer suites.
common_cases() ->
    DequeueCases = [
        fun should_block_consumer_on_dequeue_from_empty_queue/1,
        fun should_consume_right_item/1
    ],
    CloseCases = [
        fun should_timeout_on_close_non_empty_queue/1,
        fun should_not_block_producer_for_non_empty_queue_after_close/1,
        fun should_be_closed/1
    ],
    DequeueCases ++ CloseCases.
111
%% Test instantiators for a single consumer on an item-count-limited queue.
single_consumer_max_item_count() ->
    ProducerSide = [
        fun should_have_no_items_for_new_queue/1,
        fun should_block_producer_on_full_queue_count/1
    ],
    ConsumerSide = [
        fun should_receive_first_queued_item/1,
        fun should_consume_multiple_items/1,
        fun should_consume_all/1
    ],
    ProducerSide ++ ConsumerSide.
120
%% Test instantiators for a single consumer on a size-limited queue.
single_consumer_max_size() ->
    ProducerSide = [
        fun should_have_zero_size_for_new_queue/1,
        fun should_block_producer_on_full_queue_size/1,
        fun should_increase_queue_size_on_produce/1
    ],
    ConsumerSide = [
        fun should_receive_first_queued_item/1,
        fun should_consume_multiple_items/1,
        fun should_consume_all/1
    ],
    ProducerSide ++ ConsumerSide.
130
%% Test instantiators for a queue limited by both item count and size: the
%% item-count cases followed by the size cases.
single_consumer_max_items_and_size() ->
    CountCases = single_consumer_max_item_count(),
    SizeCases = single_consumer_max_size(),
    CountCases ++ SizeCases.
133
%% Extra test instantiators run only by the multi-consumer suite.
multiple_consumers() ->
    NewQueueCases = [
        fun should_have_zero_size_for_new_queue/1,
        fun should_have_no_items_for_new_queue/1
    ],
    NewQueueCases ++ [fun should_increase_queue_size_on_produce/1].
140
141
%% A freshly created queue is expected to report an item count of 0.
%% The count is read when the test runs, not when it is instantiated.
should_have_no_items_for_new_queue({Queue, _Producer, _Consumer}) ->
    ?_test(?assertEqual(0, couch_work_queue:item_count(Queue))).
144
%% A freshly created queue is expected to report a size of 0.
%% The size is read when the test runs, not when it is instantiated.
should_have_zero_size_for_new_queue({Queue, _Producer, _Consumer}) ->
    ?_test(?assertEqual(0, couch_work_queue:size(Queue))).
147
%% Consumers asked to dequeue from an empty queue are expected to stay busy
%% inside the dequeue call, so a following ping goes unanswered (timeout).
should_block_consumer_on_dequeue_from_empty_queue({_, _, Consumers}) when is_list(Consumers) ->
    lists:foreach(fun(Pid) -> consume(Pid, 2) end, Consumers),
    Replies = [ping(Pid) || Pid <- Consumers],
    ?_assertEqual([timeout, timeout, timeout], Replies);
should_block_consumer_on_dequeue_from_empty_queue({_, _, Consumer}) ->
    consume(Consumer, 1),
    Reply = ping(Consumer),
    ?_assertEqual(timeout, Reply).
156
%% Items handed to consumers must be the items that were produced.
%%
%% Multi-consumer clause: every consumer first asks for up to 3 items, then
%% one item per consumer is produced. After each produce the queue must be
%% empty again (the item went straight to a consumer). Finally every consumer
%% must answer a ping, and the items the consumers report as dequeued must be
%% the produced items. Which consumer receives which item is not asserted,
%% so both sides are compared sorted.
%%
%% Single-consumer clause: the consumer asks for one item, two are produced,
%% and the consumer must hold exactly the first one.
should_consume_right_item({Q, Producer, Consumers}) when is_list(Consumers) ->
    [consume(C, 3) || C <- Consumers],

    Produced = [begin
                    Item = produce(Q, Producer, 10, false),
                    ok = ping(Producer),
                    ?assertEqual(0, couch_work_queue:item_count(Q)),
                    ?assertEqual(0, couch_work_queue:size(Q)),
                    Item
                end || _ <- Consumers],

    Pongs = [ping(C) || C <- Consumers],

    % Collect what the consumers really dequeued; a consumer that is still
    % blocked or never received anything contributes nothing and makes the
    % comparison below fail.
    Dequeued = [last_consumer_items(C) || C <- Consumers],
    Consumed = lists:append([Items || {ok, Items} <- Dequeued]),

    ?_assertEqual({[ok || _ <- Consumers], lists:sort(Produced)},
                  {Pongs, lists:sort(Consumed)});
should_consume_right_item({Q, Producer, Consumer}) ->
    consume(Consumer, 1),
    Item = produce(Q, Producer, 10, false),
    % the second item must stay in the queue, so wait until it is counted
    produce(Q, Producer, 20, true),
    ok = ping(Producer),
    ok = ping(Consumer),
    {ok, Items} = last_consumer_items(Consumer),
    ?_assertEqual([Item], Items).
187
%% Producing items with nobody consuming must grow both the item count and
%% the size reported by the queue: one 50 byte item, then a 10 byte item.
should_increase_queue_size_on_produce({Q, Producer, _}) ->
    produce(Q, Producer, 50, true),
    ok = ping(Producer),
    Count1 = couch_work_queue:item_count(Q),
    Size1 = couch_work_queue:size(Q),

    % Wait = true already blocks until the item count has increased
    produce(Q, Producer, 10, true),
    Count2 = couch_work_queue:item_count(Q),
    Size2 = couch_work_queue:size(Q),

    % expected value goes first so that failure reports are labelled correctly
    ?_assertEqual([{1, 50}, {2, 60}], [{Count1, Size1}, {Count2, Size2}]).
199
%% With the item-count-limited fixture the producer is expected to stay
%% responsive for the first two items and to stop answering pings once the
%% third item has been queued.
should_block_producer_on_full_queue_count({Q, Producer, _}) ->
    % first two items: queue grows and the producer still answers
    lists:foreach(fun({Bytes, ExpectedCount}) ->
        produce(Q, Producer, Bytes, true),
        ?assertEqual(ExpectedCount, couch_work_queue:item_count(Q)),
        ok = ping(Producer)
    end, [{10, 1}, {15, 2}]),

    % third item: queue is counted as holding 3, producer no longer answers
    produce(Q, Producer, 20, true),
    ?assertEqual(3, couch_work_queue:item_count(Q)),
    Reply = ping(Producer),

    ?_assertEqual(timeout, Reply).
214
%% With the size-limited fixture a 100 byte item leaves the producer
%% responsive; a further 110 byte item is still counted by the queue but the
%% producer is expected to stop answering pings.
should_block_producer_on_full_queue_size({Q, Producer, _}) ->
    produce(Q, Producer, 100, true),
    ok = ping(Producer),
    CountAfterFirst = couch_work_queue:item_count(Q),
    ?assertEqual(1, CountAfterFirst),
    SizeAfterFirst = couch_work_queue:size(Q),
    ?assertEqual(100, SizeAfterFirst),

    % do not wait for the increment here; the ping below gives the producer
    % time to hand the item over
    produce(Q, Producer, 110, false),
    Reply = ping(Producer),
    CountAfterSecond = couch_work_queue:item_count(Q),
    ?assertEqual(2, CountAfterSecond),
    SizeAfterSecond = couch_work_queue:size(Q),
    ?assertEqual(210, SizeAfterSecond),

    ?_assertEqual(timeout, Reply).
227
%% Two queued items followed by a dequeue of 2 must give the consumer both
%% items in the order they were produced.
should_consume_multiple_items({Q, Producer, Consumer}) ->
    First = produce(Q, Producer, 10, true),
    ok = ping(Producer),

    Second = produce(Q, Producer, 15, true),
    ok = ping(Producer),

    consume(Consumer, 2),

    {ok, Received} = last_consumer_items(Consumer),
    Expected = [First, Second],
    ?_assertEqual(Expected, Received).
239
%% A consumer already waiting for up to 100 items must be released by the
%% first item queued and receive exactly that item, leaving the queue empty.
should_receive_first_queued_item({Q, Producer, Consumer}) ->
    consume(Consumer, 100),
    % the queue is empty, so the consumer must be stuck in the dequeue call
    timeout = ping(Consumer),

    Produced = produce(Q, Producer, 11, false),
    ok = ping(Producer),

    % the consumer has returned from the dequeue call and answers again
    ok = ping(Consumer),
    ?assertEqual(0, couch_work_queue:item_count(Q)),

    {ok, Received} = last_consumer_items(Consumer),
    ?_assertEqual([Produced], Received).
252
%% A dequeue of `all` must hand every queued item to the consumer, in the
%% order they were produced.
should_consume_all({Q, Producer, Consumer}) ->
    First = produce(Q, Producer, 10, true),
    Second = produce(Q, Producer, 15, true),
    Third = produce(Q, Producer, 20, true),

    consume(Consumer, all),

    {ok, Received} = last_consumer_items(Consumer),
    Expected = [First, Second, Third],
    ?_assertEqual(Expected, Received).
262
%% Closing a queue that still holds an item is expected to make the
%% synchronous close helper report a timeout.
should_timeout_on_close_non_empty_queue({Q, Producer, _}) ->
    produce(Q, Producer, 1, true),
    CloseResult = close_queue(Q),

    ?_assertEqual(timeout, CloseResult).
268
%% After a close request on a queue holding one 1 byte item, the producer is
%% expected to still answer pings and the queue to still report that item.
should_not_block_producer_for_non_empty_queue_after_close({Q, Producer, _}) ->
    produce(Q, Producer, 1, true),
    % the result of the close attempt is deliberately not checked here
    close_queue(Q),
    Reply = ping(Producer),
    Bytes = couch_work_queue:size(Q),
    Items = couch_work_queue:item_count(Q),

    Observed = {Reply, Bytes, Items},
    ?_assertEqual({ok, 1, 1}, Observed).
277
%% Once an empty queue has been closed, dequeue attempts as well as the
%% item_count and size queries are all expected to yield `closed`.
%% Handles both the multi-consumer (list) and single-consumer fixtures.
should_be_closed({Q, _, Consumers}) when is_list(Consumers) ->
    ok = close_queue(Q),

    lists:foreach(fun(Pid) -> consume(Pid, 1) end, Consumers),

    Seen = [last_consumer_items(Pid) || Pid <- Consumers],
    Count = couch_work_queue:item_count(Q),
    Bytes = couch_work_queue:size(Q),

    Observed = {Seen, Count, Bytes},
    ?_assertEqual({[closed, closed, closed], closed, closed}, Observed);
should_be_closed({Q, _, Consumer}) ->
    ok = close_queue(Q),

    consume(Consumer, 1),

    Seen = last_consumer_items(Consumer),
    Count = couch_work_queue:item_count(Q),
    Bytes = couch_work_queue:size(Q),

    Observed = {Seen, Count, Bytes},
    ?_assertEqual({closed, closed, closed}, Observed).
300
301
%% Requests a close of the queue through test_util:stop_sync/3, using the
%% module-wide ?TIMEOUT. Returns whatever stop_sync returns; the tests in this
%% module match it against `ok` and `timeout`.
close_queue(Q) ->
    CloseFun = fun() -> ok = couch_work_queue:close(Q) end,
    test_util:stop_sync(Q, CloseFun, ?TIMEOUT).
306
%% Spawns a consumer helper process for queue Q that reports back to the
%% calling process. The consumer starts with `nil` as its last dequeue result.
spawn_consumer(Q) ->
    TestPid = self(),
    Loop = fun() -> consumer_loop(TestPid, Q, nil) end,
    erlang:spawn(Loop).
310
%% Message loop of a consumer helper process.
%%   {consume, N}     - dequeue N items from Q and remember the result
%%   {last_item, Ref} - reply {item, Ref, LastResult} to Parent
%%   {ping, Ref}      - reply {pong, Ref} to Parent
%%   {stop, Ref}      - reply {ok, Ref} to Parent and terminate
%% Any other message is left untouched in the mailbox.
consumer_loop(Parent, Q, PrevItem) ->
    receive
        {consume, Count} ->
            % runs the dequeue inside this process; no other message is
            % handled until it returns
            Dequeued = couch_work_queue:dequeue(Q, Count),
            consumer_loop(Parent, Q, Dequeued);
        {last_item, Tag} ->
            Parent ! {item, Tag, PrevItem},
            consumer_loop(Parent, Q, PrevItem);
        {ping, Tag} ->
            Parent ! {pong, Tag},
            consumer_loop(Parent, Q, PrevItem);
        {stop, Tag} ->
            Parent ! {ok, Tag}
    end.
325
%% Spawns a producer helper process for queue Q that reports back to the
%% calling process.
spawn_producer(Q) ->
    TestPid = self(),
    Loop = fun() -> producer_loop(TestPid, Q) end,
    erlang:spawn(Loop).
329
%% Message loop of the producer helper process.
%%   {produce, Ref, Size} - generate Size random bytes, report them to Parent
%%                          as {item, Ref, Item}, then queue them on Q
%%   {ping, Ref}          - reply {pong, Ref} to Parent
%%   {stop, Ref}          - reply {ok, Ref} to Parent and terminate
%% Any other message is left untouched in the mailbox.
producer_loop(Parent, Q) ->
    receive
        {produce, Tag, Bytes} ->
            Payload = crypto:strong_rand_bytes(Bytes),
            % report the item before queueing it, so the test process learns
            % about it even if the queue call does not return promptly
            Parent ! {item, Tag, Payload},
            ok = couch_work_queue:queue(Q, Payload),
            producer_loop(Parent, Q);
        {ping, Tag} ->
            Parent ! {pong, Tag},
            producer_loop(Parent, Q);
        {stop, Tag} ->
            Parent ! {ok, Tag}
    end.
343
%% Asynchronously asks a consumer process to dequeue N items (N may also be
%% the atom `all`). Returns the message that was sent.
consume(Consumer, N) ->
    Request = {consume, N},
    erlang:send(Consumer, Request).
346
%% Asks a consumer process for the result of its most recent dequeue.
%% Returns that result, or `timeout` if the consumer does not answer within
%% ?TIMEOUT milliseconds.
last_consumer_items(Consumer) ->
    Tag = make_ref(),
    Consumer ! {last_item, Tag},
    receive
        {item, Tag, LastResult} -> LastResult
    after ?TIMEOUT ->
        timeout
    end.
356
%% Asks the producer process to queue a random item of Size bytes and returns
%% that item. When Wait is true, additionally blocks until the queue's item
%% count has grown beyond its value from before the request.
%% Raises an assertion_failed error if the producer does not report the item
%% within ?TIMEOUT milliseconds.
produce(Q, Producer, Size, Wait) ->
    Tag = make_ref(),
    % snapshot taken before the request so the increment can be detected
    CountBefore = couch_work_queue:item_count(Q),
    Producer ! {produce, Tag, Size},
    receive
        {item, Tag, Produced} ->
            case Wait of
                true ->
                    ok = wait_increment(Q, CountBefore),
                    Produced;
                _ ->
                    Produced
            end
    after ?TIMEOUT ->
        erlang:error({assertion_failed,
                      [{module, ?MODULE},
                       {line, ?LINE},
                       {reason, "Timeout asking producer to produce an item"}]})
    end.
373
%% Liveness probe for a helper process: sends {ping, Ref} and returns `ok`
%% when the matching pong arrives, or `timeout` after ?TIMEOUT milliseconds.
ping(Pid) ->
    Tag = make_ref(),
    Pid ! {ping, Tag},
    receive
        {pong, Tag} -> ok
    after ?TIMEOUT ->
        timeout
    end.
383
%% Asks a helper process to terminate. Returns `ok` once the process
%% acknowledges, or logs a debug message mentioning Name and returns
%% `timeout` if no acknowledgement arrives within ?TIMEOUT milliseconds.
stop(Pid, Name) ->
    Tag = make_ref(),
    Pid ! {stop, Tag},
    receive
        {ok, Tag} ->
            ok
    after ?TIMEOUT ->
        ?debugMsg("Timeout stopping " ++ Name),
        timeout
    end.
393
%% Polls through test_util:wait/1 until the queue reports more items than
%% ItemsCount. The polling fun answers `ok` once that is the case and `wait`
%% otherwise.
wait_increment(Q, ItemsCount) ->
    HasGrown = fun() ->
        case couch_work_queue:item_count(Q) of
            Count when Count > ItemsCount ->
                ok;
            _ ->
                wait
        end
    end,
    test_util:wait(HasGrown).
403