%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
%%

-module(gatherer).

%% Gatherer is a queue which has producer and consumer processes. Before producers
%% push items to the queue using gatherer:in/2 they need to declare their intent
%% to do so with gatherer:fork/1. When a producer's work is done, it states so
%% using gatherer:finish/1.
%%
%% Consumers pop messages off queues with gatherer:out/1. If a queue is empty
%% and there are producers that haven't finished working, the caller is blocked
%% until an item is available. If there are no active producers, gatherer:out/1
%% immediately returns 'empty'.
%%
%% This module is primarily used to collect results from asynchronous tasks
%% running in a worker pool, e.g. when recovering bindings or rebuilding
%% message store indices.

-behaviour(gen_server2).

-export([start_link/0, stop/1, fork/1, finish/1, in/2, sync_in/2, out/1]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%%----------------------------------------------------------------------------

%% Both constants are passed in the {backoff, ...} tuple returned by init/1.
%% NOTE(review): presumably milliseconds for gen_server2's hibernation
%% backoff -- confirm against gen_server2.
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).

%%----------------------------------------------------------------------------

%% Server state:
%%   forks   - number of producers that have called fork/1 but not yet
%%             finish/1 (incremented on fork, decremented on finish).
%%   values  - queue of {From, Value} pairs waiting for a consumer; From is
%%             the caller of sync_in/2 awaiting its 'ok', or 'undefined' when
%%             the value arrived through the asynchronous in/2.
%%   blocked - queue of out/1 callers waiting for a value to arrive.
-record(gstate, { forks, values, blocked }).

%%----------------------------------------------------------------------------

-spec start_link() -> rabbit_types:ok_pid_or_error().

%% Starts a gatherer process linked to the caller.
start_link() ->
    Options = [{timeout, infinity}],
    gen_server2:start_link(?MODULE, [], Options).

-spec stop(pid()) -> 'ok'.

%% Stops the gatherer Pid and waits (without a timeout) for it to acknowledge.
%% The caller is unlinked first, so the link set up by start_link/0 no longer
%% ties the caller to the terminating server.
stop(Pid) ->
    true = erlang:unlink(Pid),
    gen_server2:call(Pid, stop, infinity).

-spec fork(pid()) -> 'ok'.

%% Registers one more producer with the gatherer Pid. Synchronous: returns
%% once the server has counted the producer.
fork(Pid) ->
    gen_server2:call(Pid, fork, infinity).

-spec finish(pid()) -> 'ok'.

%% Tells the gatherer Pid that one producer is done. Asynchronous.
finish(Pid) ->
    gen_server2:cast(Pid, finish).

-spec in(pid(), any()) -> 'ok'.

%% Pushes Value to the gatherer Pid without waiting for it to be consumed.
in(Pid, Value) ->
    gen_server2:cast(Pid, {in, Value}).

-spec sync_in(pid(), any()) -> 'ok'.

%% Pushes Value to the gatherer Pid and blocks until the server replies,
%% which it does once the value has been handed to a consumer.
sync_in(Pid, Value) ->
    gen_server2:call(Pid, {in, Value}, infinity).

-spec out(pid()) -> {'value', any()} | 'empty'.

%% Pops the next value from the gatherer Pid. Returns {value, V} when one is
%% (or becomes) available, or 'empty' once no value is queued and no
%% producers remain.
out(Pid) ->
    gen_server2:call(Pid, out, infinity).

%%----------------------------------------------------------------------------

%% Starts with no registered producers, no queued values and no blocked
%% consumers, and asks to hibernate straight away.
%% NOTE(review): the trailing {backoff, ...} tuple is gen_server2-specific;
%% presumably it configures the hibernation backoff -- confirm.
init([]) ->
    InitialState = #gstate { forks   = 0,
                             values  = queue:new(),
                             blocked = queue:new() },
    {ok, InitialState, hibernate,
     {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.

%% stop: reply 'ok' and terminate normally.
handle_call(stop, _From, State) ->
    {stop, normal, ok, State};

%% fork: count one more active producer.
handle_call(fork, _From, #gstate { forks = Forks } = State) ->
    Counted = State #gstate { forks = Forks + 1 },
    {reply, ok, Counted, hibernate};

%% sync_in: the reply is deferred; in/3 keeps From so that 'ok' is only sent
%% once the value reaches a consumer.
handle_call({in, Value}, From, State) ->
    NewState = in(Value, From, State),
    {noreply, NewState, hibernate};

%% out: hand over the oldest queued value if there is one; otherwise either
%% answer 'empty' (no producers left) or park the caller until a value or the
%% last finish arrives.
handle_call(out, From, #gstate { forks   = Forks,
                                 values  = Values,
                                 blocked = Blocked } = State) ->
    case queue:out(Values) of
        {{value, {Producer, Item}}, Remaining} ->
            %% Release the sync_in/2 caller, if any, that supplied this value.
            reply(Producer, ok),
            {reply, {value, Item}, State #gstate { values = Remaining },
             hibernate};
        {empty, _} when Forks == 0 ->
            {reply, empty, State, hibernate};
        {empty, _} ->
            Waiting = queue:in(From, Blocked),
            {noreply, State #gstate { blocked = Waiting }, hibernate}
    end;

%% Any other call is treated as a fatal protocol error.
handle_call(Msg, _From, State) ->
    {stop, {unexpected_call, Msg}, State}.

%% finish: one producer is done. When the last one finishes, every consumer
%% still blocked in out/1 is answered 'empty' (oldest first) and the blocked
%% queue is reset.
handle_cast(finish, #gstate { forks = Forks, blocked = Blocked } = State) ->
    case Forks - 1 of
        0 ->
            lists:foreach(fun (Waiter) -> gen_server2:reply(Waiter, empty) end,
                          queue:to_list(Blocked)),
            {noreply, State #gstate { forks = 0, blocked = queue:new() },
             hibernate};
        Remaining ->
            {noreply, State #gstate { forks = Remaining, blocked = Blocked },
             hibernate}
    end;

%% Asynchronous in/2: there is no caller to acknowledge, hence 'undefined'.
handle_cast({in, Value}, State) ->
    NewState = in(Value, undefined, State),
    {noreply, NewState, hibernate};

%% Any other cast is treated as a fatal protocol error.
handle_cast(Msg, State) ->
    {stop, {unexpected_cast, Msg}, State}.

%% No out-of-band messages are expected; any such message stops the server.
handle_info(Msg, State) ->
    {stop, {unexpected_info, Msg}, State}.

%% The state is carried over unchanged across code upgrades.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% Nothing to clean up; the state is simply returned.
terminate(_Reason, State) ->
    State.

%%----------------------------------------------------------------------------

%% Accepts Value from a producer. From is the sync_in/2 caller to acknowledge,
%% or 'undefined' for an asynchronous in/2. If a consumer is already blocked
%% in out/1 the value is delivered to it directly (and the producer is
%% acknowledged at once); otherwise the pair is queued until a consumer asks.
in(Value, From, #gstate { values = Values, blocked = Blocked } = State) ->
    case queue:out(Blocked) of
        {{value, Consumer}, StillBlocked} ->
            reply(From, ok),
            gen_server2:reply(Consumer, {value, Value}),
            State #gstate { blocked = StillBlocked };
        {empty, _} ->
            Queued = queue:in({From, Value}, Values),
            State #gstate { values = Queued }
    end.

%% Replies to From unless it is 'undefined' (i.e. there is nobody to answer).
reply(undefined, _Reply) ->
    ok;
reply(From, Reply) ->
    gen_server2:reply(From, Reply).