1%%% -*- erlang -*-
2%%%
3%%% This file is part of couchbeam released under the MIT license.
4%%% See the NOTICE for more information.
5
%% @doc gen_changes CouchDB continuous changes consumer behaviour.
%% This behaviour allows you to easily create a server that consumes
%% CouchDB continuous changes.
9
10-module(gen_changes).
11
12-include("couchbeam.hrl").
13
14-behavior(gen_server).
15
16-export([start_link/4]).
17-export([init/1,
18         handle_call/3,
19         handle_cast/2,
20         handle_info/2,
21         terminate/2,
22         code_change/3]).
23-export([behaviour_info/1]).
24
25-export([call/2,
26         call/3,
27         cast/2]).
28
29-export([stop/1, get_seq/1]).
30
31
%% Behaviour descriptor. Asked for `callbacks', it lists the functions of
%% the gen_changes behaviour as {Name, Arity} pairs; any other query yields
%% `undefined'.
behaviour_info(callbacks) ->
    [{init, 1}, {handle_change, 2},
     {handle_call, 3}, {handle_cast, 2}, {handle_info, 2},
     {terminate, 2}];
behaviour_info(_Other) ->
    undefined.
41
%% Send a synchronous request to a gen_changes server and return its reply.
%% Thin wrapper around gen_server:call/2.
call(Server, Msg) ->
    gen_server:call(Server, Msg).
44
%% Same as call/2, but with an explicit timeout that is handed unchanged to
%% gen_server:call/3.
call(Server, Msg, Timeout) ->
    gen_server:call(Server, Msg, Timeout).
47
%% Send an asynchronous request to a gen_changes server.
%% Thin wrapper around gen_server:cast/2.
cast(Server, Msg) ->
    gen_server:cast(Server, Msg).
50
%% @doc Start a gen_changes process linked to the caller so that it can be
%% part of a supervision tree. This function should be called, directly or
%% indirectly, by the supervisor. `Module' is the callback module and
%% `InitArgs' is passed to its init/1; `Db' and `Options' are used to follow
%% the changes of the database.
%% @spec start_link(Module, Db::db(), Options::changesoptions(),
%%                  InitArgs::list()) -> term()
%%       changesoptions() = [changeoption()]
%%       changeoption() = {include_docs, string()} |
%%                  {filter, string()} |
%%                  {since, integer()|string()} |
%%                  {heartbeat, string()|boolean()}
start_link(Module, Db, Options, InitArgs) ->
    ServerArgs = [Module, Db, Options, InitArgs],
    gen_server:start_link(?MODULE, ServerArgs, []).
62
%% gen_server init callback.
%%
%% Initialises the callback module with InitArgs, then starts following the
%% changes of Db through couchbeam_changes:follow/2. The resulting server
%% state records the stream reference, the callback module and its state,
%% the database, the options, and an initial last_seq taken from the `since'
%% option (0 when absent).
%%
%% Module:init/1 may return {ok, ModState} or {ok, ModState, A}, where A is
%% `hibernate' or a timeout (a number or `infinity'); A is passed on to
%% gen_server. Any other result (for instance {stop, Reason} or `ignore') is
%% returned unchanged and no stream is opened.
init([Module, Db, Options, InitArgs]) ->
    case Module:init(InitArgs) of
        {ok, ModState} ->
            init_stream(Module, ModState, Db, Options);
        {ok, ModState, A}
          when A =:= hibernate orelse A =:= infinity orelse is_number(A) ->
            %% Without this clause the 3-tuple would be returned as is and
            %% gen_server would install the bare callback state as the
            %% server state, without ever opening the changes stream.
            case init_stream(Module, ModState, Db, Options) of
                {ok, State} -> {ok, State, A};
                Stop -> Stop
            end;
        Other ->
            Other
    end.

%% Open the changes stream and build the initial server state. When the
%% stream cannot be started, the callback module is given the chance to
%% clean up through Module:terminate/2 and the server stops with the error.
init_stream(Module, ModState, Db, Options) ->
    case couchbeam_changes:follow(Db, Options) of
        {ok, StreamRef} ->
            LastSeq = proplists:get_value(since, Options, 0),
            {ok, #gen_changes_state{stream_ref=StreamRef,
                                    mod=Module,
                                    modstate=ModState,
                                    db=Db,
                                    options=Options,
                                    last_seq=LastSeq}};
        {error, Error} ->
            Module:terminate(Error, ModState),
            {stop, Error}
    end.
82
%% Ask the gen_changes server identified by the given pid to stop.
%% The request is an asynchronous `stop' cast.
stop(Server) when is_pid(Server) ->
    gen_server:cast(Server, stop).
85
%% Fetch the last sequence recorded by the gen_changes server identified by
%% the given pid, using a synchronous `get_seq' call.
get_seq(Server) when is_pid(Server) ->
    gen_server:call(Server, get_seq).
88
%% gen_server callback for synchronous requests.
%%
%% `get_seq' is answered directly with the last sequence stored in the
%% server state. Every other request is delegated to Module:handle_call/3
%% together with the callback module's own state; the result is re-wrapped
%% so that the server keeps carrying the #gen_changes_state{} record with
%% the updated callback state.
%%
%% The optional last element of a reply/noreply result may be `hibernate' or
%% a timeout, i.e. a number or `infinity' as accepted by gen_server.
handle_call(get_seq, _From, State=#gen_changes_state{last_seq=Seq}) ->
    {reply, Seq, State};
handle_call(Request, From,
            State=#gen_changes_state{mod=Module, modstate=ModState}) ->
    case Module:handle_call(Request, From, ModState) of
        {reply, Reply, NewModState} ->
            {reply, Reply, State#gen_changes_state{modstate=NewModState}};
        {reply, Reply, NewModState, A}
          when A =:= hibernate orelse A =:= infinity orelse is_number(A) ->
            {reply, Reply, State#gen_changes_state{modstate=NewModState}, A};
        {noreply, NewModState} ->
            {noreply, State#gen_changes_state{modstate=NewModState}};
        {noreply, NewModState, A}
          when A =:= hibernate orelse A =:= infinity orelse is_number(A) ->
            {noreply, State#gen_changes_state{modstate=NewModState}, A};
        {stop, Reason, NewModState} ->
            {stop, Reason, State#gen_changes_state{modstate=NewModState}};
        {stop, Reason, Reply, NewModState} ->
            {stop, Reason, Reply, State#gen_changes_state{modstate=NewModState}}
    end.
108
%% gen_server callback for asynchronous requests.
%%
%% The `stop' cast (see stop/1) terminates the server with reason `normal'.
%% Every other message is delegated to Module:handle_cast/2 with the
%% callback module's own state, and the result is re-wrapped around the
%% #gen_changes_state{} record.
%%
%% The optional last element of a noreply result may be `hibernate' or a
%% timeout, i.e. a number or `infinity' as accepted by gen_server.
handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast(Msg, State=#gen_changes_state{mod=Module, modstate=ModState}) ->
    case Module:handle_cast(Msg, ModState) of
        {noreply, NewModState} ->
            {noreply, State#gen_changes_state{modstate=NewModState}};
        {noreply, NewModState, A}
          when A =:= hibernate orelse A =:= infinity orelse is_number(A) ->
            {noreply, State#gen_changes_state{modstate=NewModState}, A};
        {stop, Reason, NewModState} ->
            {stop, Reason, State#gen_changes_state{modstate=NewModState}}
    end.
120
121
%% gen_server callback for all other messages.
%%
%% Messages tagged with the reference of the changes stream are handled
%% here:
%%   {Ref, {error, Error}}   - forwarded to Module:handle_info/2 as
%%                             {error, [Error, {last_seq, LastSeq}]};
%%   {Ref, {done, LastSeq}}  - LastSeq is recorded, then the message is
%%                             passed to Module:handle_change/2;
%%   {Ref, {change, Change}} - the <<"seq">> value of Change is recorded,
%%                             then the message is passed to
%%                             Module:handle_change/2.
%% Any other message is delegated unchanged to Module:handle_info/2.
%%
%% The optional last element of a noreply result may be `hibernate' or a
%% timeout, i.e. a number or `infinity' as accepted by gen_server.
handle_info({Ref, {error, Error}},
        State=#gen_changes_state{stream_ref=Ref, last_seq=LastSeq}) ->
    %% This clause has to come before the generic {Ref, Msg} clause below:
    %% that one matches stream errors as well and would crash on them.
    handle_info({error, [Error, {last_seq, LastSeq}]}, State);

handle_info({Ref, Msg},
        State=#gen_changes_state{mod=Module, modstate=ModState,
            stream_ref=Ref}) ->

    State2 = case Msg of
        {done, LastSeq} ->
            State#gen_changes_state{last_seq=LastSeq};
        {change, Change} ->
            Seq = couchbeam_doc:get_value(<<"seq">>, Change),
            State#gen_changes_state{last_seq=Seq}
    end,

    %% A thrown term is still treated as the callback's return value, but
    %% errors and exits now propagate with their own reason and stacktrace
    %% instead of being turned into a case_clause on an 'EXIT' tuple.
    Result = try
                 Module:handle_change(Msg, ModState)
             catch
                 throw:Thrown -> Thrown
             end,

    case Result of
        {noreply, NewModState} ->
            {noreply, State2#gen_changes_state{modstate=NewModState}};
        {noreply, NewModState, A}
          when A =:= hibernate orelse A =:= infinity orelse is_number(A) ->
            {noreply, State2#gen_changes_state{modstate=NewModState}, A};
        {stop, Reason, NewModState} ->
            {stop, Reason, State2#gen_changes_state{modstate=NewModState}}
    end;

handle_info(Info, State=#gen_changes_state{mod=Module, modstate=ModState}) ->
    case Module:handle_info(Info, ModState) of
        {noreply, NewModState} ->
            {noreply, State#gen_changes_state{modstate=NewModState}};
        {noreply, NewModState, A}
          when A =:= hibernate orelse A =:= infinity orelse is_number(A) ->
            {noreply, State#gen_changes_state{modstate=NewModState}, A};
        {stop, Reason, NewModState} ->
            {stop, Reason, State#gen_changes_state{modstate=NewModState}}
    end.
157
%% gen_server code upgrade callback: the server state is kept as it is.
%% TODO: support code changes?
code_change(_OldVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
161
%% gen_server terminate callback: let the callback module clean up first
%% through Module:terminate/2, then cancel the changes stream.
terminate(Reason, State = #gen_changes_state{}) ->
    #gen_changes_state{stream_ref=StreamRef,
                       mod=Callback,
                       modstate=CallbackState} = State,
    Callback:terminate(Reason, CallbackState),
    couchbeam_changes:cancel_stream(StreamRef),
    ok.
167