%%% -*- erlang -*-
%%%
%%% This file is part of couchbeam released under the MIT license.
%%% See the NOTICE for more information.
%%%

-module(couchbeam_changes_stream).

-export([start_link/4]).

-export([init_stream/5,
         maybe_continue/1,
         wait_reconnect/1,
         system_continue/3,
         system_terminate/4,
         system_code_change/4]).

-export([init/1,
         handle_event/2,
         wait_results/2,
         wait_results1/2,
         collect_object/2,
         maybe_continue_decoding/1]).

-include("couchbeam.hrl").


-record(state, {parent,
                owner,
                ref,
                mref,
                db,
                options,
                client_ref=nil,
                decoder,
                feed_type=continuous,
                reconnect_after=1000,
                async=normal}).

-define(TIMEOUT, 10000).


%% Start a changes stream process linked to the caller.
%%
%% Owner is the process that receives the stream messages, StreamRef the
%% term used to tag those messages, Db the #db{} record to read from and
%% Options the changes options (bare legacy atoms are normalised by
%% parse_options/2). The new process runs init_stream/5, which acknowledges
%% with {ok, Pid} through proc_lib:init_ack/2 once the request is accepted.
start_link(Owner, StreamRef, Db, Options) ->
    proc_lib:start_link(?MODULE, init_stream, [self(), Owner, StreamRef,
                                               Db, Options]).


%% Entry point of the stream process (spawned by start_link/4).
%%
%% Builds the initial #state{}, opens the _changes request, registers the
%% stream in the couchbeam_changes_streams ETS table, acknowledges the
%% parent and then runs loop/1 until the stream is over.
init_stream(Parent, Owner, StreamRef, Db, Options) ->
    %% clean options
    Options1 = parse_options(Options, []),

    %% reconnect option, time to wait before reconnecting when using a
    %% longpoll feed. Default is 1s.
    ReconnectAfter = proplists:get_value(reconnect_after, Options1, 1000),

    %% type of asynchronous request
    Async = proplists:get_value(async, Options, normal),

    %% feed type, continuous unless the options say otherwise
    {FeedType, FinalOptions} = case proplists:get_value(feed, Options1) of
        undefined ->
            {continuous, [{feed, continuous} | Options1]};
        Type ->
            {Type, Options1}
    end,

    %% Get since
    Since = proplists:get_value(since, FinalOptions, 0),

    %% monitor the process receiving the messages
    MRef = erlang:monitor(process, Owner),

    %% initial state
    InitState = #state{parent=Parent,
                       owner=Owner,
                       ref=StreamRef,
                       mref=MRef,
                       db=Db,
                       options=FinalOptions,
                       feed_type=FeedType,
                       reconnect_after=ReconnectAfter,
                       async=Async},

    %% connect to the changes
    {ok, State} = do_init_stream(InitState),

    %% register the stream
    ets:insert(couchbeam_changes_streams, [{StreamRef, self()}]),

    %% initialise the last sequence
    put(last_seq, Since),

    %% tell the parent that we are ok
    proc_lib:init_ack(Parent, {ok, self()}),

    %% start the loop
    loop(State),
    %% stop monitoring the owner
    erlang:demonitor(MRef),
    ok.

%% Open the asynchronous _changes request described by the state.
%%
%% Returns {ok, NewState} with client_ref set, and with a streaming jsx
%% decoder installed for longpoll feeds (nil for any other feed type).
%% Exits with:
%%   normal                       - the monitored owner went down
%%   {http_error, Status, Reason} - the server answered with a status
%%                                  other than 200
%%   Reason                       - hackney reported {error, Reason}
%%   timeout                      - nothing was received within ?TIMEOUT ms
do_init_stream(#state{mref=MRef,
                      db=Db,
                      options=Options,
                      feed_type=FeedType}=State) ->
    #db{server=Server, options=ConnOpts} = Db,
    %% we are doing the request asynchronously
    ConnOpts1 = [{async, once}, {recv_timeout, infinity}| ConnOpts],

    %% if we are filtering the changes using docids, send a POST request
    %% instead of a GET to make sure it will be accepted whatever the
    %% number of doc ids given.
    {DocIds, Options1} = case proplists:get_value(doc_ids, Options) of
        undefined ->
            {[], Options};
        [] ->
            {[], Options};
        Ids ->
            {Ids, proplists:delete(doc_ids, Options)}
    end,

    %% make the changes url
    Url = hackney_url:make_url(couchbeam_httpc:server_url(Server),
                               [couchbeam_httpc:db_url(Db), <<"_changes">>],
                               Options1),

    {ok, ClientRef} = case DocIds of
        [] ->
            couchbeam_httpc:request(get, Url, [], <<>>, ConnOpts1);
        DocIds ->
            Body =  couchbeam_ejson:encode({[{<<"doc_ids">>, DocIds}]}),
            Headers = [{<<"Content-Type">>, <<"application/json">>}],
            couchbeam_httpc:request(post, Url, Headers, Body, ConnOpts1)
    end,
    receive
        {'DOWN', MRef, _, _, _} ->
            %% owner exited there is no need to continue
            exit(normal);
        {hackney_response, ClientRef, {status, 200, _}} ->
            State1 = State#state{client_ref=ClientRef},
            DecoderFun = case FeedType of
                longpoll ->
                    jsx:decoder(?MODULE, [State1], [stream]);
                _ ->
                    nil
            end,
            {ok, State1#state{decoder=DecoderFun}};
        {hackney_response, ClientRef, {status, Status, Reason}} ->
            %% any other status is a failure: report it right away instead
            %% of leaving the message in the mailbox and exiting with a
            %% misleading `timeout' once ?TIMEOUT has elapsed.
            exit({http_error, Status, Reason});
        {hackney_response, ClientRef, {error, Reason}} ->
            exit(Reason)
    after ?TIMEOUT ->
        exit(timeout)
    end.


%% Main receive loop: ask hackney for the next message of the response and
%% dispatch it.
%%
%% Headers are skipped, a lone newline chunk goes straight to
%% maybe_continue/1, any other binary chunk is handed to decode_data/2 and
%% `done' goes to maybe_reconnect/1. Any other hackney message is treated
%% as an error: the stream is unregistered, the owner is told and the
%% process exits with that term.
loop(#state{owner=Owner,
            ref=StreamRef,
            mref=MRef,
            client_ref=ClientRef}=State) ->

    hackney:stream_next(ClientRef),
    receive
        {'DOWN', MRef, _, _, _} ->
            %% owner exited there is no need to continue
            exit(normal);
        {hackney_response, ClientRef, {headers, _Headers}} ->
            loop(State);
        {hackney_response, ClientRef, done} ->
            maybe_reconnect(State);
        {hackney_response, ClientRef, <<"\n">>} ->
            maybe_continue(State);
        {hackney_response, ClientRef, Data} when is_binary(Data) ->
            decode_data(Data, State);
        {hackney_response, ClientRef, Error} ->
            ets:delete(couchbeam_changes_streams, StreamRef),
            %% report the error
            report_error(Error, StreamRef, Owner),
            exit(Error)
    end.


%% Called when the current response is finished.
%%
%% For a longpoll feed with an integer reconnect delay, the `since' option
%% is forced to the last sequence seen (kept in the process dictionary), a
%% {Ref, reconnect} message is scheduled after the delay and the process
%% hibernates in wait_reconnect/1. In every other case the stream is
%% unregistered and the owner receives {StreamRef, {done, LastSeq}}.
maybe_reconnect(#state{ref=Ref,
                       options=Options,
                       feed_type=longpoll,
                       reconnect_after=After}=State)
        when is_integer(After) ->
    %% longpoll connections, we will restart after the delay

    %% update the state so we will restart on the last sequence
    LastSeq = get(last_seq),
    Options1 = couchbeam_util:force_param(since, LastSeq, Options),
    NState = State#state{options=Options1,
                         client_ref=nil},
    %% send the message in the interval
    erlang:send_after(After, self(), {Ref, reconnect}),
    %% hibernate the process, waiting on the reconnect message
    erlang:hibernate(?MODULE, wait_reconnect, [NState]);
maybe_reconnect(#state{owner=Owner, ref=StreamRef}) ->
    %% stop the change stream
    %% unregister the stream
    ets:delete(couchbeam_changes_streams, StreamRef),
    %% tell the owner that we are done and exit,
    LastSeq = get(last_seq),
    Owner ! {StreamRef, {done, LastSeq}}.

%% Wait for the {Ref, reconnect} message scheduled by maybe_reconnect/1.
%%
%% The receive deliberately has no `after' clause. The state held here has
%% client_ref=nil, so falling through to loop/1 on an empty mailbox (for
%% instance right after a {Ref, stream_next} or a system message has been
%% consumed) would stream from a nil client and never see the reconnect
%% message, which loop/1 does not match.
wait_reconnect(#state{parent=Parent,
                      owner=Owner,
                      mref=MRef,
                      ref=Ref}=State) ->
    receive
        {'DOWN', MRef, _, _, _} ->
            %% owner exited there is no need to continue
            exit(normal);
        {Ref, cancel} ->
            maybe_close(State),
            %% unregister the stream
            ets:delete(couchbeam_changes_streams, Ref),
            %% tell the owner we exited
            Owner ! {Ref, ok};
        {Ref, reconnect} ->
            {ok, NState} = do_init_stream(State),
            loop(NState);
        {Ref, _} ->
            %% other control messages are ignored while disconnected
            wait_reconnect(State);
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
                                  {wait_reconnect, State});
        Else ->
            error_logger:error_msg("Unexpected message: ~w~n", [Else]),
            %% unregister the stream
            ets:delete(couchbeam_changes_streams, Ref),
            %% report the error
            report_error(Else, Ref, Owner),
            exit(Else)
    end.


%% Remember the sequence of a decoded change row and forward the row to the
%% owner as {Ref, {change, {Props}}}.
seq(Props,#state{owner=Owner,ref=Ref}) ->
    Seq = couchbeam_util:get_value(<<"seq">>, Props),
    put(last_seq, Seq),
    Owner ! {Ref, {change, {Props}}}.

%% Decode a chunk with jsx in streaming mode, asking for the undecoded tail.
decode(Data) ->
    jsx:decode(Data,[return_tail,stream]).

%% Return the decoder to apply to the next chunk: a fresh one when no
%% partial decode is pending (nil), otherwise the stored continuation.
decodefun(nil) ->
    fun(Data) -> decode(Data) end;
decodefun(Fun) ->
    Fun.

%% Decode as many complete rows as Data holds, forwarding each one through
%% seq/2, and return the first decoder result that is not a
%% {with_tail, Props, Rest} tuple.
decode_with_tail(Data, Fun, State) ->
    case (decodefun(Fun))(Data) of
        {with_tail,Props,Rest} ->
            seq(Props,State),
            decode_with_tail(Rest,decodefun(nil),State);
        Other -> Other
    end.

%% Feed a body chunk to the decoder.
%%
%% Continuous feed: rows are decoded one after the other; when the
%% remainder cannot be closed with end_stream the continuation is kept in
%% the state for the next chunk.
%% Other feeds (longpoll): the chunk goes to the jsx handler defined by
%% this module; once the handler answers `done' the request is stopped and
%% maybe_reconnect/1 takes over.
%% A badarg raised while decoding the chunk makes the process exit with
%% badarg.
decode_data(Data, #state{feed_type=continuous,
                         decoder=DecodeFun}=State) ->

    {incomplete, DecodeFun2} =
        try
            decode_with_tail(Data,DecodeFun,State)
        catch error:badarg -> exit(badarg)
        end,

    try DecodeFun2(end_stream) of
        Props ->
            seq(Props,State),
            maybe_continue(State#state{decoder=nil})
    catch error:badarg -> maybe_continue(State#state{decoder=DecodeFun2})
    end;
decode_data(Data, #state{client_ref=ClientRef,
                         decoder=DecodeFun}=State) ->
    try
        {incomplete, DecodeFun2} = DecodeFun(Data),
        try DecodeFun2(end_stream) of done ->
            %% stop the request
            {ok, _} = hackney:stop_async(ClientRef),
            %% skip the rest of the body so the socket is
            %% replaced in the pool
            hackney:skip_body(ClientRef),
            %% maybe reconnect
            maybe_reconnect(State)
        catch error:badarg ->
            maybe_continue(State#state{decoder=DecodeFun2})
        end
    catch error:badarg -> exit(badarg)
    end.

%% Handle the control messages already queued between two chunks, then go
%% back to loop/1.
%%
%% Both clauses use `after 0', so they never block: with an empty mailbox
%% the stream carries on immediately.
%% NOTE(review): because of `after 0' the async=once clause does not wait
%% for {Ref, stream_next} either - confirm this is intended.
maybe_continue(#state{parent=Parent,
                      owner=Owner,
                      ref=Ref,
                      mref=MRef,
                      async=once}=State) ->

    receive
        {'DOWN', MRef, _, _, _} ->
            %% owner exited there is no need to continue
            exit(normal);
        {Ref, stream_next} ->
            loop(State);
        {Ref, cancel} ->
            maybe_close(State),
            %% unregister the stream
            ets:delete(couchbeam_changes_streams, Ref),
            %% tell the owner we exited
            Owner ! {Ref, ok};
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
                                  {loop, State});
        Else ->
            error_logger:error_msg("Unexpected message: ~w~n", [Else]),
            %% unregister the stream
            ets:delete(couchbeam_changes_streams, Ref),
            %% report the error
            report_error(Else, Ref, Owner),
            exit(Else)
    after 0 ->
        loop(State)
    end;
maybe_continue(#state{parent=Parent,
                      owner=Owner,
                      ref=Ref,
                      mref=MRef}=State) ->
    receive
        {'DOWN', MRef, _, _, _} ->
            %% owner exited there is no need to continue
            exit(normal);
        {Ref, cancel} ->
            maybe_close(State),
            %% unregister the stream
            ets:delete(couchbeam_changes_streams, Ref),
            %% tell the owner we exited
            Owner ! {Ref, ok};
        {Ref, pause} ->
            %% sleep until the next message, then re-enter this function
            erlang:hibernate(?MODULE, maybe_continue, [State]);
        {Ref, resume} ->
            loop(State);
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
                                  {loop, State});
        Else ->
            error_logger:error_msg("Unexpected message: ~w~n", [Else]),
            %% unregister the stream
            ets:delete(couchbeam_changes_streams, Ref),
            %% report the error
            report_error(Else, Ref, Owner),
            exit(Else)
    after 0 ->
        loop(State)
    end.

%% sys callback: resume where the system message interrupted the process.
%% The third argument is the {Continuation, State} tuple that was handed to
%% sys:handle_system_msg/6; every continuation used in this module has a
%% clause here.
system_continue(_, _, {wait_reconnect, State}) ->
    wait_reconnect(State);
system_continue(_, _, {maybe_continue, State}) ->
    maybe_continue(State);
system_continue(_, _, {maybe_continue_decoding, State}) ->
    maybe_continue_decoding(State);
system_continue(_, _, {loop, State}) ->
    loop(State).

%% sys callback: unregister the stream and exit with Reason.
%% The last argument is normally the {Continuation, State} tuple handed to
%% sys:handle_system_msg/6; a bare #state{} is still accepted.
-spec system_terminate(any(), _, _, _) -> no_return().
system_terminate(Reason, _, _, {_Continuation, #state{ref=StreamRef}}) ->
    %% unregister the stream
    catch ets:delete(couchbeam_changes_streams, StreamRef),
    exit(Reason);
system_terminate(Reason, _, _, #state{ref=StreamRef}) ->
    %% unregister the stream
    catch ets:delete(couchbeam_changes_streams, StreamRef),
    exit(Reason).

%% sys callback: nothing to convert on code change.
system_code_change(Misc, _, _, _) ->
    {ok, Misc}.


%%% json decoder %%%

%% jsx handler callback: initial handler state. The handler state is a
%% {HandlerFun, NestCount, Terms, StreamState} tuple.
init([State]) ->
    {wait_results, 0, [[]], State}.


%% jsx handler callback: answer `done' at the end of the document,
%% otherwise dispatch the event to the function named in the handler state.
handle_event(end_json, _) ->
    done;
handle_event(Event, {Fun, _, _, _}=St) ->
    ?MODULE:Fun(Event, St).



%% Handler step: skip events until the <<"results">> key shows up, then
%% switch to wait_results1/2.
wait_results(start_object, St) ->
    St;
wait_results(end_object, St) ->
    St;
wait_results({key, <<"results">>}, {_, _, _, St}) ->
    {wait_results1, 0, [[]], St};
wait_results(_, {_, _, _, St}) ->
    {wait_results, 0, [[]], St}.



%% Handler step: inside the results array. Each object start hands over to
%% collect_object/2; the end of the array goes back to wait_results/2.
wait_results1(start_array, {_, _, _, St}) ->
    {wait_results1, 0, [[]], St};
wait_results1(start_object, {_, _, Terms, St}) ->
    {collect_object, 0, [[]|Terms], St};
wait_results1(end_array, {_, _, _, St}) ->
    {wait_results, 0, [[]], St}.


%% Handler step: accumulate one change row.
%%
%% Terms is a stack of partially built containers (newest first, each one
%% in reverse order) interleaved with {key, Key} markers; NestCount is the
%% depth of nested objects below the row. When the row itself is closed
%% (end_object at depth 0) it is passed to send_change/2.
collect_object(start_object, {_, NestCount, Terms, St}) ->
    {collect_object, NestCount + 1, [[]|Terms], St};

collect_object(end_object, {_, NestCount, [[], {key, Key}, Last|Terms],
                            St}) ->
    {collect_object, NestCount - 1, [[{Key, {[{}]}}] ++ Last] ++ Terms,
     St};

collect_object(end_object, {_, NestCount, [Object, {key, Key},
                                           Last|Terms], St}) ->
    {collect_object, NestCount - 1,
     [[{Key, {lists:reverse(Object)}}] ++ Last] ++ Terms, St};

collect_object(end_object, {_, 0, [[], Last|Terms], St}) ->
    [[Change]] = [[{[{}]}] ++ Last] ++ Terms,
    send_change(Change, St);

collect_object(end_object, {_, NestCount, [[], Last|Terms], St}) ->
    {collect_object, NestCount - 1, [[{[{}]}] ++ Last] ++ Terms, St};

collect_object(end_object, {_, 0, [Object, Last|Terms], St}) ->
    [[Change]] = [[{lists:reverse(Object)}] ++ Last] ++ Terms,
    send_change(Change, St);


collect_object(end_object, {_, NestCount, [Object, Last|Terms], St}) ->
    Acc = [[{lists:reverse(Object)}] ++ Last] ++ Terms,
    {collect_object, NestCount - 1, Acc, St};


collect_object(start_array, {_, NestCount, Terms, St}) ->
    {collect_object, NestCount, [[]|Terms], St};
collect_object(end_array, {_, NestCount, [List, {key, Key}, Last|Terms],
                           St}) ->
    {collect_object, NestCount,
     [[{Key, lists:reverse(List)}] ++ Last] ++ Terms, St};
collect_object(end_array, {_, NestCount, [List, Last|Terms], St}) ->
    {collect_object, NestCount, [[lists:reverse(List)] ++ Last] ++ Terms,
     St};

collect_object({key, Key}, {_, NestCount, Terms, St}) ->
    {collect_object, NestCount, [{key, Key}] ++ Terms,
     St};

collect_object({_, Event}, {_, NestCount, [{key, Key}, Last|Terms], St}) ->
    {collect_object, NestCount, [[{Key, Event}] ++ Last] ++ Terms, St};
collect_object({_, Event}, {_, NestCount, [Last|Terms], St}) ->
    {collect_object, NestCount, [[Event] ++ Last] ++ Terms, St}.

%% Record the sequence of a complete change row, send the row to the owner
%% as {Ref, {change, Change}} and check the mailbox before decoding goes on.
send_change({Props}=Change, #state{owner=Owner, ref=Ref}=St) ->
    Seq = couchbeam_util:get_value(<<"seq">>, Props),
    put(last_seq, Seq),
    Owner ! {Ref, {change, Change}},
    maybe_continue_decoding(St).


%% eventually wait for the next call from the owner
%%
%% Returns the handler state that makes the decoder carry on with the next
%% row of the results array.
%%
%% This function runs inside the jsx handler callback, so the decoder and
%% loop/1 are still on the call stack. erlang:hibernate/3 must not be used
%% here: it discards the stack, and the process would end as soon as this
%% function returned after waking up. Waiting is therefore done with plain
%% receives, after a garbage collection to keep the idle footprint low.
maybe_continue_decoding(#state{parent=Parent,
                               owner=Owner,
                               ref=Ref,
                               mref=MRef,
                               client_ref=ClientRef,
                               async=once}=St) ->
    receive
        {'DOWN', MRef, _, _, _} ->
            %% owner exited there is no need to continue
            exit(normal);
        {Ref, stream_next} ->
            {wait_results1, 0, [[]], St};
        {Ref, cancel} ->
            hackney:close(ClientRef),
            %% unregister the stream
            ets:delete(couchbeam_changes_streams, Ref),
            %% tell the owner we exited
            Owner ! {Ref, ok},
            %% and exit
            exit(normal);
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
                                  {maybe_continue_decoding, St});
        Else ->
            error_logger:error_msg("Unexpected message: ~w~n", [Else]),
            %% unregister the stream
            ets:delete(couchbeam_changes_streams, Ref),
            %% report the error
            report_error(Else, Ref, Owner),
            exit(Else)
    after 5000 ->
        %% nothing asked for 5s: trim the heap and keep waiting
        erlang:garbage_collect(),
        maybe_continue_decoding(St)
    end;

maybe_continue_decoding(St) ->
    %% not in `once' mode: only look at what is already queued
    wait_decoding(St, 0).

%% Handle control messages while decoding outside of `once' mode.
%% Timeout is 0 for a non-blocking check and `infinity' while the stream is
%% paused, in which case only a message can end the wait.
wait_decoding(#state{parent=Parent,
                     owner=Owner,
                     ref=Ref,
                     mref=MRef,
                     client_ref=ClientRef}=St, Timeout) ->
    receive
        {'DOWN', MRef, _, _, _} ->
            %% owner exited there is no need to continue
            exit(normal);
        {Ref, cancel} ->
            hackney:close(ClientRef),
            %% unregister the stream, like every other cancel path
            ets:delete(couchbeam_changes_streams, Ref),
            Owner ! {Ref, ok},
            exit(normal);
        {Ref, pause} ->
            erlang:garbage_collect(),
            wait_decoding(St, infinity);
        {Ref, resume} ->
            {wait_results1, 0, [[]], St};
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
                                  {maybe_continue_decoding, St});
        Else ->
            error_logger:error_msg("Unexpected message: ~w~n", [Else]),
            %% unregister the stream
            ets:delete(couchbeam_changes_streams, Ref),
            report_error(Else, Ref, Owner),
            exit(Else)
    after Timeout ->
        {wait_results1, 0, [[]], St}
    end.

%% @private

%% parse options to get feed type when it's not passed in a tuple
%% to support the old api.
%% Walk the option list once, keeping {Key, Value} pairs untouched and
%% turning the bare atoms of the old api into pairs through
%% couchbeam_util:force_param/3. The result keeps the input order.
parse_options([], Parsed) ->
    lists:reverse(Parsed);
parse_options([{Key, Value} | Tail], Parsed) ->
    parse_options(Tail, [{Key, Value} | Parsed]);
parse_options([Feed | Tail], Parsed) when Feed =:= normal;
                                          Feed =:= continuous ->
    %% `normal' is an alias of the continuous feed
    parse_options(Tail, couchbeam_util:force_param(feed, continuous, Parsed));
parse_options([longpoll | Tail], Parsed) ->
    parse_options(Tail, couchbeam_util:force_param(feed, longpoll, Parsed));
parse_options([Flag | Tail], Parsed) when Flag =:= heartbeat;
                                          Flag =:= descending;
                                          Flag =:= conflicts;
                                          Flag =:= include_docs ->
    %% boolean switches given as a bare atom mean `true'
    parse_options(Tail, couchbeam_util:force_param(Flag, true, Parsed)).

%% Send an error to Pid, tagged with the stream reference. Terms that are
%% not already an {error, _} tuple are wrapped in one.
report_error(What, Ref, Pid) ->
    Error = case What of
                {error, _} -> What;
                _ -> {error, What}
            end,
    Pid ! {Ref, Error}.

%% Close the hackney connection, unless the state holds no client
%% reference (nil).
maybe_close(#state{client_ref=ClientRef}) ->
    case ClientRef of
        nil -> ok;
        _ -> hackney:close(ClientRef)
    end.

%% Convert a decoded term to the {Proplist} object representation: the
%% empty object [{}] becomes {[]}, a list starting with a pair becomes
%% {[{Key, Converted}]}, other lists are converted element by element and
%% anything else is returned as is.
post_decode([{}]) ->
    {[]};
post_decode([{_, _} | _] = Pairs) ->
    {[ {Name, post_decode(Inner)} || {Name, Inner} <- Pairs ]};
post_decode(Items) when is_list(Items) ->
    [ post_decode(Item) || Item <- Items ];
post_decode(Scalar) ->
    Scalar.