1%%% -*- erlang -*-
2%%%
3%%% This file is part of couchbeam released under the MIT license.
4%%% See the NOTICE for more information.
5%%%
6
7-module(couchbeam_changes_stream).
8
9-export([start_link/4]).
10
11-export([init_stream/5,
12         maybe_continue/1,
13         wait_reconnect/1,
14         system_continue/3,
15         system_terminate/4,
16         system_code_change/4]).
17
18-export([init/1,
19         handle_event/2,
20         wait_results/2,
21         wait_results1/2,
22         collect_object/2,
23         maybe_continue_decoding/1]).
24
25-include("couchbeam.hrl").
26
27
%% State carried by a changes stream process.
-record(state, {
    %% process that called start_link/4: it gets the proc_lib ack and is
    %% the parent given to sys when a system message is handled
    parent,
    %% monitored process that receives the {Ref, ...} stream messages
    owner,
    %% stream reference: tags the messages and keys the ETS entry
    ref,
    %% monitor reference taken on the owner
    mref,
    %% #db{} record of the database whose changes are followed
    db,
    %% options used to build the _changes request
    options,
    %% hackney client reference, nil while no request is open
    client_ref = nil,
    %% streaming jsx decoder, or nil when none is pending
    decoder,
    %% feed type (continuous unless the options say otherwise)
    feed_type = continuous,
    %% delay in milliseconds before a longpoll feed reconnects
    reconnect_after = 1000,
    %% normal, or once when the owner asks for each step itself
    async = normal
}).

%% time in milliseconds to wait for the status of the _changes response
-define(TIMEOUT, 10000).
41
42
%% Start a changes stream process linked to the caller. The caller becomes
%% the parent of the stream; Owner is the process that will receive the
%% changes, tagged with StreamRef. Returns whatever proc_lib:start_link/3
%% returns once init_stream/5 has acknowledged (or failed).
start_link(Owner, StreamRef, Db, Options) ->
    InitArgs = [self(), Owner, StreamRef, Db, Options],
    proc_lib:start_link(?MODULE, init_stream, InitArgs).
46
47
%% Body of the process spawned by start_link/4: open the _changes request,
%% register the stream, acknowledge the parent and run the receive loop
%% until the stream is done.
init_stream(Parent, Owner, StreamRef, Db, Options) ->
    %% normalise the options: bare atoms of the old API become tuples
    Opts = parse_options(Options, []),

    %% without an explicit feed type the stream is continuous
    {FeedType, StreamOpts} =
        case proplists:get_value(feed, Opts) of
            undefined -> {continuous, [{feed, continuous} | Opts]};
            Feed -> {Feed, Opts}
        end,

    %% monitor the process receiving the messages
    OwnerMon = erlang:monitor(process, Owner),

    Initial = #state{
        parent = Parent,
        owner = Owner,
        ref = StreamRef,
        mref = OwnerMon,
        db = Db,
        options = StreamOpts,
        feed_type = FeedType,
        %% time to wait before reconnecting a longpoll feed (1s by default)
        reconnect_after = proplists:get_value(reconnect_after, Opts, 1000),
        %% type of asynchronous request
        async = proplists:get_value(async, Options, normal)
    },

    %% connect to the changes feed
    {ok, Connected} = do_init_stream(Initial),

    %% register the stream
    ets:insert(couchbeam_changes_streams, [{StreamRef, self()}]),

    %% initialise the last sequence with the requested starting point
    put(last_seq, proplists:get_value(since, StreamOpts, 0)),

    %% tell the parent that the stream is up
    proc_lib:init_ack(Parent, {ok, self()}),

    %% run the loop; once it returns, stop monitoring the owner
    loop(Connected),
    erlang:demonitor(OwnerMon),
    ok.
101
%% Open the _changes request for the stream and wait for its status.
%%
%% Returns {ok, State} with client_ref set and, for a longpoll feed, a
%% fresh streaming jsx decoder (nil for the other feed types).
%% Exits with:
%%   normal                        when the owner goes down meanwhile,
%%   {http_error, Status, Reason}  when the server answers with a status
%%                                 other than 200,
%%   Reason                        when hackney reports {error, Reason},
%%   timeout                       when nothing arrives within ?TIMEOUT ms.
do_init_stream(#state{mref=MRef,
                      db=Db,
                      options=Options,
                      feed_type=FeedType}=State) ->
    #db{server=Server, options=ConnOpts} = Db,
    %% we are doing the request asynchronously
    ConnOpts1 = [{async, once}, {recv_timeout, infinity}| ConnOpts],

    %% if we are filtering the changes using docids, send a POST request
    %% instead of a GET to make sure it will be accepted whatever the
    %% number of doc ids given.
    {DocIds, Options1} = case proplists:get_value(doc_ids, Options) of
        undefined ->
            {[], Options};
        [] ->
            {[], Options};
        Ids ->
            {Ids, proplists:delete(doc_ids, Options)}
    end,

    %% make the changes url
    Url = hackney_url:make_url(couchbeam_httpc:server_url(Server),
                               [couchbeam_httpc:db_url(Db), <<"_changes">>],
                               Options1),

    {ok, ClientRef} = case DocIds of
        [] ->
            couchbeam_httpc:request(get, Url, [], <<>>, ConnOpts1);
        DocIds ->
            Body =  couchbeam_ejson:encode({[{<<"doc_ids">>, DocIds}]}),
            Headers = [{<<"Content-Type">>, <<"application/json">>}],
            couchbeam_httpc:request(post, Url, Headers, Body, ConnOpts1)
    end,
    receive
        {'DOWN', MRef, _, _, _} ->
            %% the owner exited, there is no need to continue
            exit(normal);
        {hackney_response, ClientRef, {status, 200, _}} ->
            State1 = State#state{client_ref=ClientRef},
            DecoderFun = case FeedType of
                longpoll ->
                    jsx:decoder(?MODULE, [State1], [stream]);
                _ ->
                    nil
            end,
            {ok, State1#state{decoder=DecoderFun}};
        {hackney_response, ClientRef, {status, Status, Reason}} ->
            %% any other status is a failure: report it right away instead
            %% of leaving the message unmatched until the timeout fires
            exit({http_error, Status, Reason});
        {hackney_response, ClientRef, {error, Reason}} ->
            exit(Reason)
    after ?TIMEOUT ->
           exit(timeout)
    end.
153
154
%% Main receive loop: ask hackney for the next message of the response
%% and dispatch it. Exits with normal when the owner goes down.
loop(#state{mref=OwnerMon, client_ref=Client}=State) ->
    hackney:stream_next(Client),
    receive
        {'DOWN', OwnerMon, _, _, _} ->
            %% the owner exited, there is no need to continue
            exit(normal);
        {hackney_response, Client, Payload} ->
            handle_stream_msg(Payload, State)
    end.

%% Dispatch one hackney message received by loop/1.
handle_stream_msg({headers, _Headers}, State) ->
    %% nothing to do with the headers, fetch the next message
    loop(State);
handle_stream_msg(done, State) ->
    %% end of the response
    maybe_reconnect(State);
handle_stream_msg(<<"\n">>, State) ->
    %% a lone newline carries no change
    maybe_continue(State);
handle_stream_msg(Chunk, State) when is_binary(Chunk) ->
    decode_data(Chunk, State);
handle_stream_msg(Error, #state{owner=Owner, ref=StreamRef}) ->
    %% anything else is an error: unregister, report it and stop
    ets:delete(couchbeam_changes_streams, StreamRef),
    report_error(Error, StreamRef, Owner),
    exit(Error).
180
181
%% Called when a response is finished. A longpoll feed with an integer
%% reconnect delay schedules a {Ref, reconnect} message and hibernates in
%% wait_reconnect/1; every other stream is unregistered and the owner is
%% told {StreamRef, {done, LastSeq}}.
maybe_reconnect(#state{ref=Ref,
                       options=Options,
                       feed_type=longpoll,
                       reconnect_after=Delay}=State)
        when is_integer(Delay) ->
    %% restart from the last sequence seen, with no open request
    Since = get(last_seq),
    Waiting = State#state{
        options = couchbeam_util:force_param(since, Since, Options),
        client_ref = nil
    },
    %% the reconnect message arrives once the delay has elapsed
    erlang:send_after(Delay, self(), {Ref, reconnect}),
    %% sleep until a message wakes the process up
    erlang:hibernate(?MODULE, wait_reconnect, [Waiting]);
maybe_reconnect(#state{owner=Owner, ref=StreamRef}) ->
    %% unregister the stream
    ets:delete(couchbeam_changes_streams, StreamRef),
    %% tell the owner that the stream is done
    Owner ! {StreamRef, {done, get(last_seq)}}.
205
%% Wait for the delayed {Ref, reconnect} message of a longpoll feed.
%%
%% Entered through erlang:hibernate/3 from maybe_reconnect/1, from itself
%% after an ignored {Ref, _} message, and from system_continue/3. The
%% state has client_ref = nil here, so the receive must block until a
%% message arrives: going to loop/1 on an empty mailbox would wait on a
%% request that does not exist and the reconnect message could never be
%% matched there.
wait_reconnect(#state{parent=Parent,
                      owner=Owner,
                      mref=MRef,
                      ref=Ref}=State) ->
    receive
        {'DOWN', MRef, _, _, _} ->
            %% the owner exited, there is no need to continue
            exit(normal);
        {Ref, cancel} ->
            maybe_close(State),
            %% unregister the stream
            ets:delete(couchbeam_changes_streams, Ref),
            %% tell the owner we stopped
            Owner ! {Ref, ok};
        {Ref, reconnect} ->
            {ok, NState} = do_init_stream(State),
            loop(NState);
        {Ref, _} ->
            %% other stream messages have no meaning while disconnected
            wait_reconnect(State);
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
                                  {wait_reconnect, State});
        Else ->
            error_logger:error_msg("Unexpected message: ~w~n", [Else]),
            %% unregister the stream
            ets:delete(couchbeam_changes_streams, Ref),
            %% report the error
            report_error(Else, Ref, Owner),
            exit(Else)
    end.
239
240
%% Remember the "seq" of a decoded change as the last sequence seen and
%% forward the change to the owner as {Ref, {change, {Props}}}.
seq(Props, #state{owner=Owner, ref=Ref}) ->
    put(last_seq, couchbeam_util:get_value(<<"seq">>, Props)),
    Owner ! {Ref, {change, {Props}}}.
245
%% Decode a chunk with jsx, using the return_tail and stream options.
decode(Chunk) ->
    DecodeOpts = [return_tail, stream],
    jsx:decode(Chunk, DecodeOpts).
248
%% Return the decoder to apply to the next chunk: a pending decoder is
%% used as is, nil stands for a fresh call to decode/1.
decodefun(nil) ->
    fun decode/1;
decodefun(Pending) ->
    Pending.
253
%% Apply the decoder to Data. Each {with_tail, Props, Rest} result is
%% forwarded through seq/2 and Rest is decoded again with a fresh decoder;
%% the first result of any other shape is returned to the caller.
decode_with_tail(Data, Fun, State) ->
    Decoder = decodefun(Fun),
    case Decoder(Data) of
        {with_tail, Props, Rest} ->
            seq(Props, State),
            %% nil makes the next round start from decode/1
            decode_with_tail(Rest, nil, State);
        Result ->
            Result
    end.
261
%% Feed a chunk of the response body to the decoder and carry on.
%%
%% Continuous feed: the changes found in the chunk are forwarded to the
%% owner, then the decoder is asked to end the stream; when that fails
%% with badarg the pending decoder is kept for the next chunk.
%%
%% Other feeds (longpoll): the chunk goes to the jsx decoder stored in the
%% state; when ending the stream yields done the request is stopped and
%% maybe_reconnect/1 decides what happens next, otherwise the pending
%% decoder is kept for the next chunk.
%%
%% In both clauses only the decoder calls are protected: a badarg raised
%% while decoding the chunk makes the process exit with badarg. The calls
%% that continue the stream are kept out of the protected sections so they
%% stay tail calls and their own errors are not reported as decoding
%% failures.
decode_data(Data, #state{feed_type=continuous,
                         decoder=DecodeFun}=State) ->
    {incomplete, DecodeFun2} =
        try
            decode_with_tail(Data, DecodeFun, State)
        catch error:badarg -> exit(badarg)
        end,

    try DecodeFun2(end_stream) of
        Props ->
            seq(Props, State),
            maybe_continue(State#state{decoder=nil})
    catch error:badarg ->
        maybe_continue(State#state{decoder=DecodeFun2})
    end;
decode_data(Data, #state{client_ref=ClientRef,
                         decoder=DecodeFun}=State) ->
    {incomplete, DecodeFun2} =
        try
            DecodeFun(Data)
        catch error:badarg -> exit(badarg)
        end,

    try DecodeFun2(end_stream) of
        done ->
            %% stop the request
            {ok, _} = hackney:stop_async(ClientRef),
            %% skip the rest of the body so the socket is
            %% replaced in the pool
            hackney:skip_body(ClientRef),
            %% maybe reconnect
            maybe_reconnect(State)
    catch error:badarg ->
        maybe_continue(State#state{decoder=DecodeFun2})
    end.
294
%% Check the mailbox for a control message before fetching more data.
%% With an empty mailbox the stream goes straight back to loop/1.
%%
%% async = once understands stream_next and cancel; the other mode
%% understands cancel, pause (hibernate until the next message) and
%% resume. System messages are handed to sys, and any other message stops
%% the stream with an error reported to the owner.
maybe_continue(#state{parent=Parent,
                      ref=Ref,
                      mref=MRef,
                      async=once}=State) ->
    receive
        {'DOWN', MRef, _, _, _} ->
            %% the owner exited, there is no need to continue
            exit(normal);
        {Ref, stream_next} ->
            loop(State);
        {Ref, cancel} ->
            stop_on_cancel(State);
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
                                  {loop, State});
        Unexpected ->
            stop_on_unexpected(Unexpected, State)
    after 0 ->
        loop(State)
    end;
maybe_continue(#state{parent=Parent,
                      ref=Ref,
                      mref=MRef}=State) ->
    receive
        {'DOWN', MRef, _, _, _} ->
            %% the owner exited, there is no need to continue
            exit(normal);
        {Ref, cancel} ->
            stop_on_cancel(State);
        {Ref, pause} ->
            erlang:hibernate(?MODULE, maybe_continue, [State]);
        {Ref, resume} ->
            loop(State);
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
                                  {loop, State});
        Unexpected ->
            stop_on_unexpected(Unexpected, State)
    after 0 ->
        loop(State)
    end.

%% Cancel requested: close the request if one is open, unregister the
%% stream and acknowledge with {Ref, ok}.
stop_on_cancel(#state{owner=Owner, ref=Ref}=State) ->
    maybe_close(State),
    ets:delete(couchbeam_changes_streams, Ref),
    Owner ! {Ref, ok}.

%% Unknown message: log it, unregister the stream, report the error to the
%% owner and exit with the message as reason.
stop_on_unexpected(Msg, #state{owner=Owner, ref=Ref}) ->
    error_logger:error_msg("Unexpected message: ~w~n", [Msg]),
    ets:delete(couchbeam_changes_streams, Ref),
    report_error(Msg, Ref, Owner),
    exit(Msg).
357
%% sys callback: resume the stream after a system message. The third
%% argument is the {Continuation, State} pair that was handed to
%% sys:handle_system_msg/6; Continuation names the function to re-enter.
system_continue(_, _, {wait_reconnect, State}) ->
    wait_reconnect(State);
system_continue(_, _, {maybe_continue, State}) ->
    maybe_continue(State);
system_continue(_, _, {maybe_continue_decoding, State}) ->
    %% used by maybe_continue_decoding/1, which runs inside a decoder
    %% callback. NOTE(review): its result is expected to travel back to
    %% the decoder through the sys call that invoked this callback.
    maybe_continue_decoding(State);
system_continue(_, _, {loop, State}) ->
    loop(State).
364
%% sys callback: stop the stream on request. The last argument is what was
%% handed to sys:handle_system_msg/6, i.e. a {Continuation, State} pair;
%% a bare state record is accepted as well. The stream is unregistered
%% (best effort) before the process exits with Reason.
-spec system_terminate(any(), _, _, _) -> no_return().
system_terminate(Reason, Parent, Debug, {_Continuation, #state{}=State}) ->
    system_terminate(Reason, Parent, Debug, State);
system_terminate(Reason, _, _, #state{ref=StreamRef}) ->
    %% unregister the stream; a missing table must not hide Reason
    try
        ets:delete(couchbeam_changes_streams, StreamRef)
    catch
        error:badarg -> ok
    end,
    exit(Reason).
370
%% sys callback: nothing to convert on a code change, the data handed in
%% is returned untouched.
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
    {ok, Misc}.
373
374
375%%% json decoder %%%
376
%% jsx handler callback: initial decoder state. The decoder state is a
%% 4-tuple {HandlerFun, NestCount, Terms, StreamState}; decoding starts in
%% wait_results/2 with an empty accumulator.
init([StreamState]) ->
    {wait_results, 0, [[]], StreamState}.
379
380
%% jsx handler callback. The end of the JSON document yields done; every
%% other event is passed to the exported function named by the first
%% element of the decoder state.
handle_event(end_json, _DecoderState) ->
    done;
handle_event(Event, {Handler, _, _, _}=DecoderState) ->
    ?MODULE:Handler(Event, DecoderState).
385
386
387
%% Decoder state outside of the "results" array: object delimiters leave
%% the state untouched, the "results" key switches to wait_results1/2 and
%% every other event resets the state to wait_results.
wait_results(Delimiter, DecoderState)
        when Delimiter =:= start_object; Delimiter =:= end_object ->
    DecoderState;
wait_results({key, <<"results">>}, {_, _, _, StreamState}) ->
    {wait_results1, 0, [[]], StreamState};
wait_results(_Other, {_, _, _, StreamState}) ->
    {wait_results, 0, [[]], StreamState}.
396
397
398
%% Decoder state inside the "results" array: the opening of a row starts
%% collecting it in collect_object/2, the opening of the array keeps
%% waiting for rows and the end of the array goes back to wait_results/2.
wait_results1(start_object, {_, _, Terms, StreamState}) ->
    {collect_object, 0, [[] | Terms], StreamState};
wait_results1(start_array, {_, _, _, StreamState}) ->
    {wait_results1, 0, [[]], StreamState};
wait_results1(end_array, {_, _, _, StreamState}) ->
    {wait_results, 0, [[]], StreamState}.
405
406
%% Decoder state while one row of the "results" array is being collected.
%%
%% The third element of the decoder state is a stack: its head is the
%% container being filled (members in reverse order), followed by a
%% pending {key, Key} marker when the container is the value of an object
%% member, then the enclosing containers. The second element counts the
%% objects opened inside the row. When the row itself ends (count 0) the
%% change is handed to send_change/2.

%% a nested object starts: push an empty container
collect_object(start_object, {_, Depth, Stack, St}) ->
    {collect_object, Depth + 1, [[] | Stack], St};

%% an empty object ends and is the value of Key
collect_object(end_object, {_, Depth, [[], {key, Key}, Outer | Stack], St}) ->
    {collect_object, Depth - 1, [[{Key, {[{}]}} | Outer] | Stack], St};

%% a non-empty object ends and is the value of Key
collect_object(end_object, {_, Depth, [Members, {key, Key}, Outer | Stack],
                           St}) ->
    {collect_object, Depth - 1,
     [[{Key, {lists:reverse(Members)}} | Outer] | Stack], St};

%% the row ends and is empty
collect_object(end_object, {_, 0, [[], Outer | Stack], St}) ->
    [[Change]] = [[{[{}]} | Outer] | Stack],
    send_change(Change, St);

%% an empty object ends inside an array
collect_object(end_object, {_, Depth, [[], Outer | Stack], St}) ->
    {collect_object, Depth - 1, [[{[{}]} | Outer] | Stack], St};

%% the row ends: the change is complete
collect_object(end_object, {_, 0, [Members, Outer | Stack], St}) ->
    [[Change]] = [[{lists:reverse(Members)} | Outer] | Stack],
    send_change(Change, St);

%% a non-empty object ends inside an array
collect_object(end_object, {_, Depth, [Members, Outer | Stack], St}) ->
    {collect_object, Depth - 1,
     [[{lists:reverse(Members)} | Outer] | Stack], St};

%% an array starts: push an empty container
collect_object(start_array, {_, Depth, Stack, St}) ->
    {collect_object, Depth, [[] | Stack], St};
%% an array ends and is the value of Key
collect_object(end_array, {_, Depth, [Items, {key, Key}, Outer | Stack],
                          St}) ->
    {collect_object, Depth,
     [[{Key, lists:reverse(Items)} | Outer] | Stack], St};
%% an array ends inside another array
collect_object(end_array, {_, Depth, [Items, Outer | Stack], St}) ->
    {collect_object, Depth, [[lists:reverse(Items) | Outer] | Stack], St};

%% a key: keep it on the stack until its value is known
collect_object({key, Key}, {_, Depth, Stack, St}) ->
    {collect_object, Depth, [{key, Key} | Stack], St};

%% a simple value for the pending key
collect_object({_, Value}, {_, Depth, [{key, Key}, Outer | Stack], St}) ->
    {collect_object, Depth, [[{Key, Value} | Outer] | Stack], St};
%% a simple value inside an array
collect_object({_, Value}, {_, Depth, [Outer | Stack], St}) ->
    {collect_object, Depth, [[Value | Outer] | Stack], St}.
455
%% Deliver a collected change: remember its "seq" as the last sequence
%% seen, send {Ref, {change, Change}} to the owner, then let
%% maybe_continue_decoding/1 provide the next decoder state.
send_change({Props}=Change, #state{owner=Owner, ref=Ref}=St) ->
    put(last_seq, couchbeam_util:get_value(<<"seq">>, Props)),
    Owner ! {Ref, {change, Change}},
    maybe_continue_decoding(St).
461
462
%% Called from send_change/2 after a change has been delivered: check for
%% a control message from the owner and return the decoder state used for
%% the next row of the "results" array.
%%
%% This function runs inside a decoder callback and its return value is
%% the next decoder state, so it must never call erlang:hibernate/3:
%% hibernating empties the call stack, the decoder would be lost and the
%% process would simply end once the function returned. Waiting is
%% therefore done by blocking in the receive.
%%
%% async = once: block until the owner sends stream_next or cancel.
%% otherwise: continue right away unless a message is pending; pause
%% blocks until the next message (normally resume).
%% A cancel closes the request, unregisters the stream, acknowledges with
%% {Ref, ok} and exits with normal; an unknown message unregisters the
%% stream, reports the error to the owner and exits with that message.
maybe_continue_decoding(#state{parent=Parent,
                               owner=Owner,
                               ref=Ref,
                               mref=MRef,
                               client_ref=ClientRef,
                               async=once}=St) ->
    receive
        {'DOWN', MRef, _, _, _} ->
            %% the owner exited, there is no need to continue
            exit(normal);
        {Ref, stream_next} ->
            {wait_results1, 0, [[]], St};
        {Ref, cancel} ->
            hackney:close(ClientRef),
            %% unregister the stream
            ets:delete(couchbeam_changes_streams, Ref),
            %% tell the owner we stopped
            Owner ! {Ref, ok},
            %% and exit
            exit(normal);
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
                                  {maybe_continue_decoding, St});
        Else ->
            error_logger:error_msg("Unexpected message: ~w~n", [Else]),
            %% unregister the stream
            ets:delete(couchbeam_changes_streams, Ref),
            %% report the error
            report_error(Else, Ref, Owner),
            exit(Else)
    after 5000 ->
            %% idle: release what memory we can, then keep waiting
            erlang:garbage_collect(),
            maybe_continue_decoding(St)
    end;

maybe_continue_decoding(#state{}=St) ->
    %% a timeout of 0 only picks up a message that is already there
    continue_decoding(St, 0).

%% Receive loop of the non-once mode. Timeout is 0 for a plain check of
%% the mailbox and infinity while the stream is paused.
continue_decoding(#state{parent=Parent,
                         owner=Owner,
                         ref=Ref,
                         mref=MRef,
                         client_ref=ClientRef}=St, Timeout) ->
    receive
        {'DOWN', MRef, _, _, _} ->
            %% the owner exited, there is no need to continue
            exit(normal);
        {Ref, cancel} ->
            hackney:close(ClientRef),
            %% unregister the stream
            ets:delete(couchbeam_changes_streams, Ref),
            %% tell the owner we stopped
            Owner ! {Ref, ok},
            exit(normal);
        {Ref, pause} ->
            continue_decoding(St, infinity);
        {Ref, resume} ->
            {wait_results1, 0, [[]], St};
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
                                  {maybe_continue_decoding, St});
        Else ->
            error_logger:error_msg("Unexpected message: ~w~n", [Else]),
            %% unregister the stream
            ets:delete(couchbeam_changes_streams, Ref),
            %% report the error
            report_error(Else, Ref, Owner),
            exit(Else)
    after Timeout ->
        {wait_results1, 0, [[]], St}
    end.
525
526%% @private
527
%% Normalise the options of a changes request. To support the old API a
%% bare atom is turned into a tuple: normal and continuous select the
%% continuous feed, longpoll the longpoll feed, and heartbeat, descending,
%% conflicts and include_docs are set to true. {Key, Value} tuples are
%% kept as they are, in their original order.
parse_options([], Acc) ->
    lists:reverse(Acc);
parse_options([{_, _} = Option | Rest], Acc) ->
    parse_options(Rest, [Option | Acc]);
parse_options([Feed | Rest], Acc)
        when Feed =:= normal; Feed =:= continuous ->
    parse_options(Rest, couchbeam_util:force_param(feed, continuous, Acc));
parse_options([longpoll | Rest], Acc) ->
    parse_options(Rest, couchbeam_util:force_param(feed, longpoll, Acc));
parse_options([Flag | Rest], Acc)
        when Flag =:= heartbeat; Flag =:= descending;
             Flag =:= conflicts; Flag =:= include_docs ->
    parse_options(Rest, couchbeam_util:force_param(Flag, true, Acc)).
548
%% Report an error to Pid as {Ref, {error, What}}. A reason that already
%% is an {error, _} tuple is sent unchanged instead of being wrapped again.
report_error(Reason, Ref, Pid) ->
    Error = case Reason of
        {error, _What} -> Reason;
        _ -> {error, Reason}
    end,
    Pid ! {Ref, Error}.
554
%% Close the hackney request of the stream, if there is one. Returns ok
%% when no request is open (client_ref is nil), otherwise the result of
%% hackney:close/1.
maybe_close(#state{client_ref=ClientRef}) ->
    case ClientRef of
        nil -> ok;
        _ -> hackney:close(ClientRef)
    end.
560
%% Convert a decoded term into the {Props} object representation:
%% [{}] becomes the empty object {[]}, a list starting with a {Key, Value}
%% pair becomes {[{Key, Converted}]}, any other list is converted element
%% by element and every other term is returned as is.
post_decode(Term) when not is_list(Term) ->
    Term;
post_decode([{}]) ->
    {[]};
post_decode([{_, _} | _] = Members) ->
    {[{Name, post_decode(Val)} || {Name, Val} <- Members]};
post_decode(Items) ->
    [post_decode(Item) || Item <- Items].
569