1%%% -*- erlang -*- 2%%% 3%%% This file is part of couchbeam released under the MIT license. 4%%% See the NOTICE for more information. 5 6%% @doc gen_changes CouchDB continuous changes consumer behavior 7%% This behaviour allws you to create easily a server that consume 8%% Couchdb continuous changes 9 10-module(gen_changes). 11 12-include("couchbeam.hrl"). 13 14-behavior(gen_server). 15 16-export([start_link/4]). 17-export([init/1, 18 handle_call/3, 19 handle_cast/2, 20 handle_info/2, 21 terminate/2, 22 code_change/3]). 23-export([behaviour_info/1]). 24 25-export([call/2, 26 call/3, 27 cast/2]). 28 29-export([stop/1, get_seq/1]). 30 31 32behaviour_info(callbacks) -> 33 [{init, 1}, 34 {handle_change, 2}, 35 {handle_call, 3}, 36 {handle_cast, 2}, 37 {handle_info, 2}, 38 {terminate, 2}]; 39behaviour_info(_) -> 40 undefined. 41 42call(Name, Request) -> 43 gen_server:call(Name, Request). 44 45call(Name, Request, Timeout) -> 46 gen_server:call(Name, Request, Timeout). 47 48cast(Dest, Request) -> 49 gen_server:cast(Dest, Request). 50 51%% @doc create a gen_changes process as part of a supervision tree. 52%% The function should be called, directly or indirectly, by the supervisor. 53%% @spec start_link(Module, Db::db(), Options::changesoptions(), 54%% InitArgs::list()) -> term() 55%% changesoptions() = [changeoption()] 56%% changeoption() = {include_docs, string()} | 57%% {filter, string()} | 58%% {since, integer()|string()} | 59%% {heartbeat, string()|boolean()} 60start_link(Module, Db, Options, InitArgs) -> 61 gen_server:start_link(?MODULE, [Module, Db, Options, InitArgs], []). 62 63init([Module, Db, Options, InitArgs]) -> 64 case Module:init(InitArgs) of 65 {ok, ModState} -> 66 case couchbeam_changes:follow(Db, Options) of 67 {ok, StreamRef} -> 68 LastSeq = proplists:get_value(since, Options, 0), 69 {ok, #gen_changes_state{stream_ref=StreamRef, 70 mod=Module, 71 modstate=ModState, 72 db=Db, 73 options=Options, 74 last_seq=LastSeq}}; 75 {error, Error} -> 76 Module:terminate(Error, ModState), 77 {stop, Error} 78 end; 79 Error -> 80 Error 81 end. 82 83stop(Pid) when is_pid(Pid) -> 84 gen_server:cast(Pid, stop). 85 86get_seq(Pid) when is_pid(Pid) -> 87 gen_server:call(Pid, get_seq). 88 89handle_call(get_seq, _From, State=#gen_changes_state{last_seq=Seq}) -> 90 {reply, Seq, State}; 91handle_call(Request, From, 92 State=#gen_changes_state{mod=Module, modstate=ModState}) -> 93 case Module:handle_call(Request, From, ModState) of 94 {reply, Reply, NewModState} -> 95 {reply, Reply, State#gen_changes_state{modstate=NewModState}}; 96 {reply, Reply, NewModState, A} 97 when A =:= hibernate orelse is_number(A) -> 98 {reply, Reply, State#gen_changes_state{modstate=NewModState}, A}; 99 {noreply, NewModState} -> 100 {noreply, State#gen_changes_state{modstate=NewModState}}; 101 {noreply, NewModState, A} when A =:= hibernate orelse is_number(A) -> 102 {noreply, State#gen_changes_state{modstate=NewModState}, A}; 103 {stop, Reason, NewModState} -> 104 {stop, Reason, State#gen_changes_state{modstate=NewModState}}; 105 {stop, Reason, Reply, NewModState} -> 106 {stop, Reason, Reply, State#gen_changes_state{modstate=NewModState}} 107 end. 108 109handle_cast(stop, State) -> 110 {stop, normal, State}; 111handle_cast(Msg, State=#gen_changes_state{mod=Module, modstate=ModState}) -> 112 case Module:handle_cast(Msg, ModState) of 113 {noreply, NewModState} -> 114 {noreply, State#gen_changes_state{modstate=NewModState}}; 115 {noreply, NewModState, A} when A =:= hibernate orelse is_number(A) -> 116 {noreply, State#gen_changes_state{modstate=NewModState}, A}; 117 {stop, Reason, NewModState} -> 118 {stop, Reason, State#gen_changes_state{modstate=NewModState}} 119 end. 120 121 122handle_info({Ref, Msg}, 123 State=#gen_changes_state{mod=Module, modstate=ModState, 124 stream_ref=Ref}) -> 125 126 State2 = case Msg of 127 {done, LastSeq} -> 128 State#gen_changes_state{last_seq=LastSeq}; 129 {change, Change} -> 130 Seq = couchbeam_doc:get_value(<<"seq">>, Change), 131 State#gen_changes_state{last_seq=Seq} 132 end, 133 134 case catch Module:handle_change(Msg, ModState) of 135 {noreply, NewModState} -> 136 {noreply, State2#gen_changes_state{modstate=NewModState}}; 137 {noreply, NewModState, A} when A =:= hibernate orelse is_number(A) -> 138 {noreply, State2#gen_changes_state{modstate=NewModState}, A}; 139 {stop, Reason, NewModState} -> 140 {stop, Reason, State2#gen_changes_state{modstate=NewModState}} 141 end; 142 143 144handle_info({Ref, {error, Error}}, 145 State=#gen_changes_state{stream_ref=Ref, last_seq=LastSeq}) -> 146 handle_info({error, [Error, {last_seq, LastSeq}]}, State); 147 148handle_info(Info, State=#gen_changes_state{mod=Module, modstate=ModState}) -> 149 case Module:handle_info(Info, ModState) of 150 {noreply, NewModState} -> 151 {noreply, State#gen_changes_state{modstate=NewModState}}; 152 {noreply, NewModState, A} when A =:= hibernate orelse is_number(A) -> 153 {noreply, State#gen_changes_state{modstate=NewModState}, A}; 154 {stop, Reason, NewModState} -> 155 {stop, Reason, State#gen_changes_state{modstate=NewModState}} 156 end. 157 158code_change(_OldVersion, State, _Extra) -> 159 %% TODO: support code changes? 160 {ok, State}. 161 162terminate(Reason, #gen_changes_state{stream_ref=Ref, 163 mod=Module, modstate=ModState}) -> 164 Module:terminate(Reason, ModState), 165 couchbeam_changes:cancel_stream(Ref), 166 ok. 167