diff --git a/src/riak_kv_replrtq_peer.erl b/src/riak_kv_replrtq_peer.erl index 8c92df1b6..bbeb19455 100644 --- a/src/riak_kv_replrtq_peer.erl +++ b/src/riak_kv_replrtq_peer.erl @@ -41,8 +41,8 @@ -type discovery_peer() :: {riak_kv_replrtq_snk:queue_name(), [riak_kv_replrtq_snk:peer_info()]}. --define(DISCOVERY_TIMEOUT_SECONDS, 60). --define(UPDATE_TIMEOUT_SECONDS, 60). +-define(DISCOVERY_TIMEOUT_SECONDS, 300). +-define(UPDATE_TIMEOUT_SECONDS, 300). -define(AUTO_DISCOVERY_MAXIMUM_SECONDS, 900). -define(AUTO_DISCOVERY_MINIMUM_SECONDS, 60). @@ -66,7 +66,7 @@ update_discovery(QueueName) -> ?DISCOVERY_TIMEOUT_SECONDS * 1000). -spec update_workers(pos_integer(), pos_integer()) -> boolean(). -update_workers(WorkerCount, PerPeerLimit) -> +update_workers(WorkerCount, PerPeerLimit) when PerPeerLimit =< WorkerCount -> gen_server:call( ?MODULE, {update_workers, WorkerCount, PerPeerLimit}, @@ -142,6 +142,11 @@ handle_info({scheduled_discovery, QueueName}, State) -> ?AUTO_DISCOVERY_MAXIMUM_SECONDS), Delay = rand:uniform(max(1, MaxDelay - MinDelay)) + MinDelay, _ = schedule_discovery(QueueName, self(), Delay), + {noreply, State}; +handle_info({Ref, {error, HTTPClientError}}, State) when is_reference(Ref) -> + lager:info( + "Client error caught - error ~p returned after timeout", + [HTTPClientError]), {noreply, State}. terminate(_Reason, _State) -> diff --git a/src/riak_kv_replrtq_snk.erl b/src/riak_kv_replrtq_snk.erl index dbfb4949e..28478f179 100644 --- a/src/riak_kv_replrtq_snk.erl +++ b/src/riak_kv_replrtq_snk.erl @@ -31,6 +31,7 @@ handle_call/3, handle_cast/2, handle_info/2, + handle_continue/2, terminate/2, code_change/3]). @@ -65,7 +66,7 @@ -define(STARTING_DELAYMS, 8). -define(MAX_SUCCESS_DELAYMS, 1024). -define(ON_ERROR_DELAYMS, 65536). --define(INACTIVITY_TIMEOUT_MS, 60000). +-define(INITIAL_TIMEOUT_MS, 60000). -define(DEFAULT_WORKERCOUNT, 1). -record(sink_work, {queue_name :: queue_name(), @@ -208,10 +209,13 @@ add_snkqueue(QueueName, Peers, WorkerCount) -> %% number of workers overall -spec add_snkqueue(queue_name(), list(peer_info()), pos_integer(), pos_integer()) -> ok. -add_snkqueue(QueueName, Peers, WorkerCount, PerPeerLimit) - when PerPeerLimit =< WorkerCount -> - gen_server:call(?MODULE, - {add, QueueName, Peers, WorkerCount, PerPeerLimit}). +add_snkqueue( + QueueName, Peers, WorkerCount, PerPeerLimit) + when PerPeerLimit =< WorkerCount -> + gen_server:call( + ?MODULE, + {add, QueueName, Peers, WorkerCount, PerPeerLimit}, + infinity). %% @doc @@ -220,7 +224,7 @@ add_snkqueue(QueueName, Peers, WorkerCount, PerPeerLimit) %% Returns undefined if there are currently no peers defined. -spec current_peers(queue_name()) -> list(peer_info())|undefined. current_peers(QueueName) -> - gen_server:call(?MODULE, {current_peers, QueueName}). + gen_server:call(?MODULE, {current_peers, QueueName}, infinity). %% @doc @@ -236,47 +240,26 @@ set_workercount(QueueName, WorkerCount) -> %% @doc %% Change the number of concurrent workers whilst limiting the number of %% workers per peer --spec set_workercount(queue_name(), pos_integer(), pos_integer()) - -> ok|not_found. -set_workercount(QueueName, WorkerCount, PerPeerLimit) - when PerPeerLimit =< WorkerCount -> - gen_server:call(?MODULE, - {worker_count, QueueName, WorkerCount, PerPeerLimit}). +-spec set_workercount( + queue_name(), pos_integer(), pos_integer()) -> ok|not_found. +set_workercount( + QueueName, WorkerCount, PerPeerLimit) + when PerPeerLimit =< WorkerCount -> + gen_server:call( + ?MODULE, + {worker_count, QueueName, WorkerCount, PerPeerLimit}, + infinity + ). %%%============================================================================ %%% gen_server callbacks %%%============================================================================ init([]) -> - SinkEnabled = - app_helper:get_env(riak_kv, replrtq_enablesink, false), + SinkEnabled = app_helper:get_env(riak_kv, replrtq_enablesink, false), case SinkEnabled of true -> - SinkPeers = - app_helper:get_env(riak_kv, replrtq_sinkpeers, ""), - DefaultQueue = - app_helper:get_env(riak_kv, replrtq_sinkqueue), - SnkQueuePeerInfo = tokenise_peers(DefaultQueue, SinkPeers), - {SnkWorkerCount, PerPeerLimit} = get_worker_counts(), - Iteration = 1, - MapPeerInfoFun = - fun({SnkQueueName, SnkPeerInfo}) -> - {SnkQueueLength, SnkWorkQueue} = - determine_workitems(SnkQueueName, - Iteration, - SnkPeerInfo, - SnkWorkerCount, - min(SnkWorkerCount, PerPeerLimit)), - SnkW = - #sink_work{queue_name = SnkQueueName, - work_queue = SnkWorkQueue, - minimum_queue_length = SnkQueueLength, - peer_list = SnkPeerInfo, - max_worker_count = SnkWorkerCount}, - {SnkQueueName, Iteration, SnkW} - end, - Work = lists:map(MapPeerInfoFun, SnkQueuePeerInfo), - {ok, #state{enabled = true, work = Work}, ?INACTIVITY_TIMEOUT_MS}; + {ok, #state{}, {continue, initialise_work}}; false -> {ok, #state{}} end. @@ -432,6 +415,33 @@ handle_info({prompt_requeue, WorkItem}, State) -> requeue_work(WorkItem), {noreply, State}. +handle_continue(initialise_work, State) -> + SinkPeers = + app_helper:get_env(riak_kv, replrtq_sinkpeers, ""), + DefaultQueue = + app_helper:get_env(riak_kv, replrtq_sinkqueue), + SnkQueuePeerInfo = tokenise_peers(DefaultQueue, SinkPeers), + {SnkWorkerCount, PerPeerLimit} = get_worker_counts(), + Iteration = 1, + MapPeerInfoFun = + fun({SnkQueueName, SnkPeerInfo}) -> + {SnkQueueLength, SnkWorkQueue} = + determine_workitems(SnkQueueName, + Iteration, + SnkPeerInfo, + SnkWorkerCount, + min(SnkWorkerCount, PerPeerLimit)), + SnkW = + #sink_work{queue_name = SnkQueueName, + work_queue = SnkWorkQueue, + minimum_queue_length = SnkQueueLength, + peer_list = SnkPeerInfo, + max_worker_count = SnkWorkerCount}, + {SnkQueueName, Iteration, SnkW} + end, + Work = lists:map(MapPeerInfoFun, SnkQueuePeerInfo), + {noreply, State#state{enabled = true, work = Work}, ?INITIAL_TIMEOUT_MS}. + terminate(_Reason, State) -> WorkItems = lists:map(fun(SW) -> element(3, SW) end, State#state.work), CloseFun =