From 9375b49335568bafdb2e423705b31bd73fba42a4 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 25 Aug 2023 13:26:37 +0100 Subject: [PATCH 1/5] Handle timeouts in the PB client --- src/riak_kv_replrtq_peer.erl | 2 +- src/riak_kv_replrtq_snk.erl | 89 ++++++++++++++++++++---------------- 2 files changed, 50 insertions(+), 41 deletions(-) diff --git a/src/riak_kv_replrtq_peer.erl b/src/riak_kv_replrtq_peer.erl index 8c92df1b6..8614761ad 100644 --- a/src/riak_kv_replrtq_peer.erl +++ b/src/riak_kv_replrtq_peer.erl @@ -66,7 +66,7 @@ update_discovery(QueueName) -> ?DISCOVERY_TIMEOUT_SECONDS * 1000). -spec update_workers(pos_integer(), pos_integer()) -> boolean(). -update_workers(WorkerCount, PerPeerLimit) -> +update_workers(WorkerCount, PerPeerLimit) when PerPeerLimit =< WorkerCount -> gen_server:call( ?MODULE, {update_workers, WorkerCount, PerPeerLimit}, diff --git a/src/riak_kv_replrtq_snk.erl b/src/riak_kv_replrtq_snk.erl index dbfb4949e..979fefaf5 100644 --- a/src/riak_kv_replrtq_snk.erl +++ b/src/riak_kv_replrtq_snk.erl @@ -65,7 +65,7 @@ -define(STARTING_DELAYMS, 8). -define(MAX_SUCCESS_DELAYMS, 1024). -define(ON_ERROR_DELAYMS, 65536). --define(INACTIVITY_TIMEOUT_MS, 60000). +-define(INITIAL_TIMEOUT_MS, 60000). -define(DEFAULT_WORKERCOUNT, 1). -record(sink_work, {queue_name :: queue_name(), @@ -208,10 +208,13 @@ add_snkqueue(QueueName, Peers, WorkerCount) -> %% number of workers overall -spec add_snkqueue(queue_name(), list(peer_info()), pos_integer(), pos_integer()) -> ok. -add_snkqueue(QueueName, Peers, WorkerCount, PerPeerLimit) - when PerPeerLimit =< WorkerCount -> - gen_server:call(?MODULE, - {add, QueueName, Peers, WorkerCount, PerPeerLimit}). +add_snkqueue( + QueueName, Peers, WorkerCount, PerPeerLimit) + when PerPeerLimit =< WorkerCount -> + gen_server:call( + ?MODULE, + {add, QueueName, Peers, WorkerCount, PerPeerLimit}, + infinity). %% @doc @@ -236,50 +239,30 @@ set_workercount(QueueName, WorkerCount) -> %% @doc %% Change the number of concurrent workers whilst limiting the number of %% workers per peer --spec set_workercount(queue_name(), pos_integer(), pos_integer()) - -> ok|not_found. -set_workercount(QueueName, WorkerCount, PerPeerLimit) - when PerPeerLimit =< WorkerCount -> - gen_server:call(?MODULE, - {worker_count, QueueName, WorkerCount, PerPeerLimit}). +-spec set_workercount( + queue_name(), pos_integer(), pos_integer()) -> ok|not_found. +set_workercount( + QueueName, WorkerCount, PerPeerLimit) + when PerPeerLimit =< WorkerCount -> + gen_server:call( + ?MODULE, + {worker_count, QueueName, WorkerCount, PerPeerLimit}, + infinity + ). %%%============================================================================ %%% gen_server callbacks %%%============================================================================ init([]) -> - SinkEnabled = - app_helper:get_env(riak_kv, replrtq_enablesink, false), + SinkEnabled = app_helper:get_env(riak_kv, replrtq_enablesink, false), case SinkEnabled of true -> - SinkPeers = - app_helper:get_env(riak_kv, replrtq_sinkpeers, ""), - DefaultQueue = - app_helper:get_env(riak_kv, replrtq_sinkqueue), - SnkQueuePeerInfo = tokenise_peers(DefaultQueue, SinkPeers), - {SnkWorkerCount, PerPeerLimit} = get_worker_counts(), - Iteration = 1, - MapPeerInfoFun = - fun({SnkQueueName, SnkPeerInfo}) -> - {SnkQueueLength, SnkWorkQueue} = - determine_workitems(SnkQueueName, - Iteration, - SnkPeerInfo, - SnkWorkerCount, - min(SnkWorkerCount, PerPeerLimit)), - SnkW = - #sink_work{queue_name = SnkQueueName, - work_queue = SnkWorkQueue, - minimum_queue_length = SnkQueueLength, - peer_list = SnkPeerInfo, - max_worker_count = SnkWorkerCount}, - {SnkQueueName, Iteration, SnkW} - end, - Work = lists:map(MapPeerInfoFun, SnkQueuePeerInfo), - {ok, #state{enabled = true, work = Work}, ?INACTIVITY_TIMEOUT_MS}; + gen_server:cast(?MODULE, initialise_work); false -> - {ok, #state{}} - end. + ok + end, + {ok, #state{}}. handle_call({suspend, QueueN}, _From, State) -> case lists:keyfind(QueueN, 1, State#state.work) of @@ -364,6 +347,32 @@ handle_call({current_peers, QueueN}, _From, State) -> end. +handle_cast(initialise_work, State) -> + SinkPeers = + app_helper:get_env(riak_kv, replrtq_sinkpeers, ""), + DefaultQueue = + app_helper:get_env(riak_kv, replrtq_sinkqueue), + SnkQueuePeerInfo = tokenise_peers(DefaultQueue, SinkPeers), + {SnkWorkerCount, PerPeerLimit} = get_worker_counts(), + Iteration = 1, + MapPeerInfoFun = + fun({SnkQueueName, SnkPeerInfo}) -> + {SnkQueueLength, SnkWorkQueue} = + determine_workitems(SnkQueueName, + Iteration, + SnkPeerInfo, + SnkWorkerCount, + min(SnkWorkerCount, PerPeerLimit)), + SnkW = + #sink_work{queue_name = SnkQueueName, + work_queue = SnkWorkQueue, + minimum_queue_length = SnkQueueLength, + peer_list = SnkPeerInfo, + max_worker_count = SnkWorkerCount}, + {SnkQueueName, Iteration, SnkW} + end, + Work = lists:map(MapPeerInfoFun, SnkQueuePeerInfo), + {noreply, State#state{enabled = true, work = Work}, ?INITIAL_TIMEOUT_MS}; handle_cast(prompt_work, State) -> Work0 = lists:map(fun do_work/1, State#state.work), {noreply, State#state{work = Work0}}; From 16929197f62320e1f1da2032a92b09ade75097ed Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 28 Aug 2023 13:05:41 +0100 Subject: [PATCH 2/5] Handle any delay in snk The peer needs to call current_peers, and this may timeout if the process is held busy by a client initialisation that is timing out due to unavailability of a peer --- src/riak_kv_replrtq_peer.erl | 4 ++-- src/riak_kv_replrtq_snk.erl | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/riak_kv_replrtq_peer.erl b/src/riak_kv_replrtq_peer.erl index 8614761ad..342e65b00 100644 --- a/src/riak_kv_replrtq_peer.erl +++ b/src/riak_kv_replrtq_peer.erl @@ -41,8 +41,8 @@ -type discovery_peer() :: {riak_kv_replrtq_snk:queue_name(), [riak_kv_replrtq_snk:peer_info()]}. --define(DISCOVERY_TIMEOUT_SECONDS, 60). --define(UPDATE_TIMEOUT_SECONDS, 60). +-define(DISCOVERY_TIMEOUT_SECONDS, 300). +-define(UPDATE_TIMEOUT_SECONDS, 300). -define(AUTO_DISCOVERY_MAXIMUM_SECONDS, 900). -define(AUTO_DISCOVERY_MINIMUM_SECONDS, 60). diff --git a/src/riak_kv_replrtq_snk.erl b/src/riak_kv_replrtq_snk.erl index 979fefaf5..169b334d7 100644 --- a/src/riak_kv_replrtq_snk.erl +++ b/src/riak_kv_replrtq_snk.erl @@ -223,7 +223,7 @@ add_snkqueue( %% Returns undefined if there are currently no peers defined. -spec current_peers(queue_name()) -> list(peer_info())|undefined. current_peers(QueueName) -> - gen_server:call(?MODULE, {current_peers, QueueName}). + gen_server:call(?MODULE, {current_peers, QueueName}, infinity). %% @doc From 53b415c2acbb2d9417dcc9df747624c73de6b499 Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Mon, 28 Aug 2023 16:04:15 +0100 Subject: [PATCH 3/5] HTTP client may respond after timeout The ibrowse http client will gen_server:call on send_req with a timeout, and catch any timeout error. This means that the client may receive an {error, reqd_timeout}, but the gen_server has not crashed and so may later send a gen_server:reply to the riak_kv_replrtq_peer. These errors need to be picked up in handle_info to avoid noise of repeated crashes. This is not a problem in riak_kv_replrtq_snk, as in the process we are always using the http client from within a short-lived spawned process. Likewise in riak_kv_ttaaefs_manager the client is sued within a short-lived aae_exchange. --- src/riak_kv_replrtq_peer.erl | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/riak_kv_replrtq_peer.erl b/src/riak_kv_replrtq_peer.erl index 342e65b00..8820e87b0 100644 --- a/src/riak_kv_replrtq_peer.erl +++ b/src/riak_kv_replrtq_peer.erl @@ -142,6 +142,11 @@ handle_info({scheduled_discovery, QueueName}, State) -> ?AUTO_DISCOVERY_MAXIMUM_SECONDS), Delay = rand:uniform(max(1, MaxDelay - MinDelay)) + MinDelay, _ = schedule_discovery(QueueName, self(), Delay), + {noreply, State}; +handle_info({_Ref, HTTPClientError}, State) -> + lager:info( + "Client error caught - error ~p returned after timeout", + [HTTPClientError]), {noreply, State}. terminate(_Reason, _State) -> From 3d3e5e80e2728eb9ca841d477b02577784a7cb7f Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 1 Sep 2023 12:50:17 +0100 Subject: [PATCH 4/5] Change to use handle_continue/2 Riak now OTP 22+ only, so ok to use feature added in OTP 21 rather than an artificial method for deferring work after init reply. --- src/riak_kv_replrtq_snk.erl | 61 +++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/src/riak_kv_replrtq_snk.erl b/src/riak_kv_replrtq_snk.erl index 169b334d7..28478f179 100644 --- a/src/riak_kv_replrtq_snk.erl +++ b/src/riak_kv_replrtq_snk.erl @@ -31,6 +31,7 @@ handle_call/3, handle_cast/2, handle_info/2, + handle_continue/2, terminate/2, code_change/3]). @@ -258,11 +259,10 @@ init([]) -> SinkEnabled = app_helper:get_env(riak_kv, replrtq_enablesink, false), case SinkEnabled of true -> - gen_server:cast(?MODULE, initialise_work); + {ok, #state{}, {continue, initialise_work}}; false -> - ok - end, - {ok, #state{}}. + {ok, #state{}} + end. handle_call({suspend, QueueN}, _From, State) -> case lists:keyfind(QueueN, 1, State#state.work) of @@ -347,32 +347,6 @@ handle_call({current_peers, QueueN}, _From, State) -> end. -handle_cast(initialise_work, State) -> - SinkPeers = - app_helper:get_env(riak_kv, replrtq_sinkpeers, ""), - DefaultQueue = - app_helper:get_env(riak_kv, replrtq_sinkqueue), - SnkQueuePeerInfo = tokenise_peers(DefaultQueue, SinkPeers), - {SnkWorkerCount, PerPeerLimit} = get_worker_counts(), - Iteration = 1, - MapPeerInfoFun = - fun({SnkQueueName, SnkPeerInfo}) -> - {SnkQueueLength, SnkWorkQueue} = - determine_workitems(SnkQueueName, - Iteration, - SnkPeerInfo, - SnkWorkerCount, - min(SnkWorkerCount, PerPeerLimit)), - SnkW = - #sink_work{queue_name = SnkQueueName, - work_queue = SnkWorkQueue, - minimum_queue_length = SnkQueueLength, - peer_list = SnkPeerInfo, - max_worker_count = SnkWorkerCount}, - {SnkQueueName, Iteration, SnkW} - end, - Work = lists:map(MapPeerInfoFun, SnkQueuePeerInfo), - {noreply, State#state{enabled = true, work = Work}, ?INITIAL_TIMEOUT_MS}; handle_cast(prompt_work, State) -> Work0 = lists:map(fun do_work/1, State#state.work), {noreply, State#state{work = Work0}}; @@ -441,6 +415,33 @@ handle_info({prompt_requeue, WorkItem}, State) -> requeue_work(WorkItem), {noreply, State}. +handle_continue(initialise_work, State) -> + SinkPeers = + app_helper:get_env(riak_kv, replrtq_sinkpeers, ""), + DefaultQueue = + app_helper:get_env(riak_kv, replrtq_sinkqueue), + SnkQueuePeerInfo = tokenise_peers(DefaultQueue, SinkPeers), + {SnkWorkerCount, PerPeerLimit} = get_worker_counts(), + Iteration = 1, + MapPeerInfoFun = + fun({SnkQueueName, SnkPeerInfo}) -> + {SnkQueueLength, SnkWorkQueue} = + determine_workitems(SnkQueueName, + Iteration, + SnkPeerInfo, + SnkWorkerCount, + min(SnkWorkerCount, PerPeerLimit)), + SnkW = + #sink_work{queue_name = SnkQueueName, + work_queue = SnkWorkQueue, + minimum_queue_length = SnkQueueLength, + peer_list = SnkPeerInfo, + max_worker_count = SnkWorkerCount}, + {SnkQueueName, Iteration, SnkW} + end, + Work = lists:map(MapPeerInfoFun, SnkQueuePeerInfo), + {noreply, State#state{enabled = true, work = Work}, ?INITIAL_TIMEOUT_MS}. + terminate(_Reason, State) -> WorkItems = lists:map(fun(SW) -> element(3, SW) end, State#state.work), CloseFun = From dabe0feaed063a12e9097b0be4d9a6fc23b4b76c Mon Sep 17 00:00:00 2001 From: Martin Sumner Date: Fri, 1 Sep 2023 18:18:18 +0100 Subject: [PATCH 5/5] More specific catch of HTTP client error message --- src/riak_kv_replrtq_peer.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/riak_kv_replrtq_peer.erl b/src/riak_kv_replrtq_peer.erl index 8820e87b0..bbeb19455 100644 --- a/src/riak_kv_replrtq_peer.erl +++ b/src/riak_kv_replrtq_peer.erl @@ -143,7 +143,7 @@ handle_info({scheduled_discovery, QueueName}, State) -> Delay = rand:uniform(max(1, MaxDelay - MinDelay)) + MinDelay, _ = schedule_discovery(QueueName, self(), Delay), {noreply, State}; -handle_info({_Ref, HTTPClientError}, State) -> +handle_info({Ref, {error, HTTPClientError}}, State) when is_reference(Ref) -> lager:info( "Client error caught - error ~p returned after timeout", [HTTPClientError]),