1%% This Source Code Form is subject to the terms of the Mozilla Public
2%% License, v. 2.0. If a copy of the MPL was not distributed with this
3%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4%%
5%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates.  All rights reserved.
6%%
7
8-module(gatherer).
9
10%% Gatherer is a queue which has producer and consumer processes. Before producers
11%% push items to the queue using gatherer:in/2 they need to declare their intent
%% to do so with gatherer:fork/1. When a producer's work is done, it states so
13%% using gatherer:finish/1.
14%%
15%% Consumers pop messages off queues with gatherer:out/1. If a queue is empty
16%% and there are producers that haven't finished working, the caller is blocked
17%% until an item is available. If there are no active producers, gatherer:out/1
18%% immediately returns 'empty'.
19%%
20%% This module is primarily used to collect results from asynchronous tasks
21%% running in a worker pool, e.g. when recovering bindings or rebuilding
22%% message store indices.
23
24-behaviour(gen_server2).
25
26-export([start_link/0, stop/1, fork/1, finish/1, in/2, sync_in/2, out/1]).
27
28-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
29         terminate/2, code_change/3]).
30
31%%----------------------------------------------------------------------------
32
33-define(HIBERNATE_AFTER_MIN, 1000).
34-define(DESIRED_HIBERNATE, 10000).
35
36%%----------------------------------------------------------------------------
37
%% Server state.
%%   forks   - number of outstanding forks: incremented by a 'fork' call and
%%             decremented by a 'finish' cast.
%%   values  - queue of {From, Value} entries waiting to be consumed. From is
%%             the caller of a synchronous 'in' (replied to with 'ok' once the
%%             value is handed out) or 'undefined' for an asynchronous 'in'.
%%   blocked - queue of callers of 'out' that are waiting for a value.
-record(gstate, { forks, values, blocked }).
39
40%%----------------------------------------------------------------------------
41
%% Starts a gatherer server through gen_server2:start_link/3, using this
%% module as the callback module, an empty argument list for init/1 and the
%% option {timeout, infinity}.
-spec start_link() -> rabbit_types:ok_pid_or_error().

start_link() ->
    StartOpts = [{timeout, infinity}],
    gen_server2:start_link(?MODULE, [], StartOpts).
46
%% Removes the link between the calling process and the gatherer, then
%% synchronously asks the gatherer to stop. The call waits indefinitely for
%% the 'ok' reply.
-spec stop(pid()) -> 'ok'.

stop(Pid) ->
    %% unlink/1 always returns true; the match only makes that explicit.
    true = unlink(Pid),
    Request = stop,
    gen_server2:call(Pid, Request, infinity).
52
%% Synchronously registers one more producer with the gatherer. The server
%% increments its fork counter and replies 'ok'; the call has no timeout.
-spec fork(pid()) -> 'ok'.

fork(Pid) ->
    Request = fork,
    gen_server2:call(Pid, Request, infinity).
57
%% Asynchronously tells the gatherer that one producer is done. The server
%% decrements its fork counter; when the counter reaches zero every blocked
%% consumer is answered with 'empty'.
-spec finish(pid()) -> 'ok'.

finish(Pid) ->
    Notification = finish,
    gen_server2:cast(Pid, Notification).
62
%% Asynchronously pushes Value to the gatherer. The caller does not wait for
%% the value to be consumed (see sync_in/2 for the blocking variant).
-spec in(pid(), any()) -> 'ok'.

in(Pid, Value) ->
    Notification = {in, Value},
    gen_server2:cast(Pid, Notification).
67
%% Synchronously pushes Value to the gatherer. The server replies 'ok' only
%% once the value has been handed to a consumer, so the caller blocks (with
%% no timeout) until then.
-spec sync_in(pid(), any()) -> 'ok'.

sync_in(Pid, Value) ->
    Request = {in, Value},
    gen_server2:call(Pid, Request, infinity).
72
%% Pops the next value from the gatherer.
%%
%% Returns {value, V} when a value is available. When nothing is queued the
%% result depends on the fork counter: with no outstanding forks 'empty' is
%% returned immediately, otherwise the caller blocks (no timeout) until a
%% value arrives or the last producer finishes.
-spec out(pid()) -> {'value', any()} | 'empty'.

out(Pid) ->
    Request = out,
    gen_server2:call(Pid, Request, infinity).
77
78%%----------------------------------------------------------------------------
79
%% gen_server2 callback. Builds the initial state: no outstanding forks, no
%% queued values and no blocked consumers. The 'hibernate' atom and the
%% backoff tuple built from the HIBERNATE_AFTER_MIN / DESIRED_HIBERNATE macros
%% are passed straight to gen_server2.
init([]) ->
    EmptyQueue = queue:new(),
    InitialState = #gstate { forks   = 0,
                             values  = EmptyQueue,
                             blocked = EmptyQueue },
    Backoff = {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
               ?DESIRED_HIBERNATE},
    {ok, InitialState, hibernate, Backoff}.
84
%% gen_server2 callback for synchronous requests.
%%
%%   stop        - replies 'ok' and stops the server with reason 'normal'.
%%   fork        - bumps the fork counter and replies 'ok'.
%%   {in, Value} - hands Value to in/3 together with the caller's From; the
%%                 reply is sent later, once the value is consumed.
%%   out         - returns the next queued value, 'empty' when nothing is
%%                 queued and no forks are outstanding, or parks the caller
%%                 in the blocked queue otherwise.
%%   anything else stops the server with reason {unexpected_call, Msg}.
handle_call(stop, _From, State) ->
    {stop, normal, ok, State};

handle_call(fork, _From, State = #gstate { forks = Forks }) ->
    Forked = State #gstate { forks = Forks + 1 },
    {reply, ok, Forked, hibernate};

handle_call({in, Value}, From, State) ->
    NewState = in(Value, From, State),
    {noreply, NewState, hibernate};

handle_call(out, From, State = #gstate { forks   = Forks,
                                         values  = Values,
                                         blocked = Blocked }) ->
    case queue:out(Values) of
        {{value, {PendingIn, Value}}, Remaining} ->
            %% Release the producer that was waiting in sync_in/2, if any.
            reply(PendingIn, ok),
            Popped = State #gstate { values = Remaining },
            {reply, {value, Value}, Popped, hibernate};
        {empty, _} when Forks == 0 ->
            %% Nothing queued and no producer left to supply anything.
            {reply, empty, State, hibernate};
        {empty, _} ->
            %% Producers are still active: defer the reply until a value
            %% arrives or the last producer finishes.
            Parked = State #gstate { blocked = queue:in(From, Blocked) },
            {noreply, Parked, hibernate}
    end;

handle_call(Msg, _From, State) ->
    Reason = {unexpected_call, Msg},
    {stop, Reason, State}.
111
%% gen_server2 callback for asynchronous requests.
%%
%%   finish      - decrements the fork counter; when it reaches zero all
%%                 blocked consumers are answered with 'empty'.
%%   {in, Value} - hands Value to in/3 with 'undefined' as the producer, so
%%                 nobody is replied to when the value is consumed.
%%   anything else stops the server with reason {unexpected_cast, Msg}.
handle_cast(finish, State = #gstate { forks = Forks, blocked = Blocked }) ->
    Remaining = Forks - 1,
    StillBlocked = release_if_done(Remaining, Blocked),
    {noreply,
     State #gstate { forks = Remaining, blocked = StillBlocked },
     hibernate};

handle_cast({in, Value}, State) ->
    NewState = in(Value, undefined, State),
    {noreply, NewState, hibernate};

handle_cast(Msg, State) ->
    Reason = {unexpected_cast, Msg},
    {stop, Reason, State}.

%% Returns the blocked-consumer queue to keep after a 'finish'. When no forks
%% remain, every waiting consumer is told 'empty' (in queue order) and a
%% fresh empty queue is returned; otherwise the queue is returned untouched.
release_if_done(0, Blocked) ->
    lists:foreach(fun (From) -> gen_server2:reply(From, empty) end,
                  queue:to_list(Blocked)),
    queue:new();
release_if_done(_Remaining, Blocked) ->
    Blocked.
128
%% gen_server2 callback for plain messages. The gatherer expects none, so any
%% message stops the server with reason {unexpected_info, Msg}.
handle_info(Unexpected, State) ->
    Reason = {unexpected_info, Unexpected},
    {stop, Reason, State}.
131
%% gen_server2 callback for code upgrades. The state needs no conversion and
%% is returned unchanged.
code_change(_FromVsn, CurrentState, _UpgradeExtra) ->
    {ok, CurrentState}.
134
%% gen_server2 callback invoked on shutdown. No cleanup is performed; the
%% state is simply returned.
terminate(_Why, FinalState) ->
    FinalState.
137
138%%----------------------------------------------------------------------------
139
%% Accepts Value from a producer and returns the updated state.
%%
%% From is the gen_server2 caller of a synchronous 'in', or 'undefined' for
%% an asynchronous one. If a consumer is already blocked in 'out', the value
%% is delivered to the oldest one straight away and the producer (if any) is
%% acknowledged with 'ok'. Otherwise {From, Value} is appended to the values
%% queue and the producer is acknowledged later, when the value is popped.
in(Value, From, State = #gstate { values = Values, blocked = Blocked }) ->
    case queue:out(Blocked) of
        {{value, Waiter}, StillBlocked} ->
            reply(From, ok),
            gen_server2:reply(Waiter, {value, Value}),
            State #gstate { blocked = StillBlocked };
        {empty, _} ->
            Entry = {From, Value},
            State #gstate { values = queue:in(Entry, Values) }
    end.
149
%% Sends Reply to From through gen_server2:reply/2, unless From is the atom
%% 'undefined' (an asynchronous producer), in which case nothing is sent and
%% 'ok' is returned.
reply(From, Reply) when From =/= undefined ->
    gen_server2:reply(From, Reply);
reply(undefined, _Reply) ->
    ok.
152