From 94d130fe36392dbde1fc0e817b75930650c97040 Mon Sep 17 00:00:00 2001 From: Alexander Shorin Date: Wed, 9 Mar 2016 15:48:38 +0300 Subject: [PATCH] Emit heartbeats until feed timeout COUCHDB-2961 --- src/fabric_view_changes.erl | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/src/fabric_view_changes.erl b/src/fabric_view_changes.erl index 4855989..f9c7684 100644 --- a/src/fabric_view_changes.erl +++ b/src/fabric_view_changes.erl @@ -32,11 +32,11 @@ go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse case validate_start_seq(DbName, Since) of ok -> {ok, Acc} = Callback(start, Acc0), - {Timeout, _} = couch_changes:get_changes_timeout(Args, Callback), + {InterruptTimeout, _} = couch_changes:get_changes_timeout(Args, Callback), Ref = make_ref(), Parent = self(), UpdateListener = {spawn_link(fabric_db_update_listener, go, - [Parent, Ref, DbName, Timeout]), + [Parent, Ref, DbName, InterruptTimeout]), Ref}, put(changes_epoch, get_changes_epoch()), try @@ -46,7 +46,7 @@ go(DbName, Feed, Options, Callback, Acc0) when Feed == "continuous" orelse Callback, Since, Acc, - Timeout, + InterruptTimeout, UpdateListener, os:timestamp() ) @@ -77,9 +77,9 @@ go(DbName, "normal", Options, Callback, Acc0) -> Callback(Error, Acc0) end. -keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0) -> - #changes_args{limit=Limit, feed=Feed, heartbeat=Heartbeat} = Args, - {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn, Timeout), +keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, InterruptTimeout, UpListen, T0) -> + #changes_args{limit=Limit, feed=Feed, heartbeat = Heartbeat, timeout = Timeout} = Args, + {ok, Collector} = send_changes(DbName, Args, Callback, Seqs, AccIn, InterruptTimeout), #collector{ limit = Limit2, counters = NewSeqs, @@ -96,13 +96,15 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0) {ok, AccOut} = Callback(waiting_for_updates, AccOut0), WaitForUpdate = wait_db_updated(UpListen), AccumulatedTime = timer:now_diff(os:timestamp(), T0) div 1000, - Max = case config:get("fabric", "changes_duration") of - undefined -> - infinity; - MaxStr -> - list_to_integer(MaxStr) + MaxFeedTimeout = case config:get("fabric", "changes_duration") of + undefined -> infinity; + MaxStr -> list_to_integer(MaxStr) + end, + FeedTimeout = case Timeout of + undefined -> MaxFeedTimeout; + _ when is_integer(Timeout) -> Timeout end, - case {Heartbeat, AccumulatedTime > Max, WaitForUpdate} of + case {Heartbeat, AccumulatedTime > FeedTimeout, WaitForUpdate} of {undefined, _, timeout} -> Callback({stop, LastSeq, pending_count(Offset)}, AccOut); {_, true, timeout} -> @@ -115,7 +117,7 @@ keep_sending_changes(DbName, Args, Callback, Seqs, AccIn, Timeout, UpListen, T0) Callback, LastSeq, AccTimeout, - Timeout, + InterruptTimeout, UpListen, T0 ) @@ -215,7 +217,7 @@ receive_results(Workers, State, Timeout, Callback) -> case rexi_utils:recv(Workers, #shard.ref, fun handle_message/3, State, Timeout, infinity) of {timeout, NewState0} -> - {ok, AccOut} = Callback(timeout, NewState0#collector.user_acc), + {ok, AccOut} = Callback(heartbeat, NewState0#collector.user_acc), NewState = NewState0#collector{user_acc = AccOut}, receive_results(Workers, NewState, Timeout, Callback); {_, NewState} ->