Skip to content

Commit

Permalink
Make all kind of timeouts hitting Riak configurable
Browse files Browse the repository at this point in the history
- All configuration items gathered into `riak_cs_config:*_timeout/0`
- Defaults varies from 5 to 60 seconds according to old code
- New items has defaults by 60 seconds defined in `riak_cs.hrl`
- You can set all configurations at once by setting `riakc_timeouts`
  in `riak_cs` section of `app.config`
- Riak client calls that stem from manual operations does not have
  timeout values configurable

 #Please enter the commit message for your changes. Lines starting
  • Loading branch information
kuenishi authored and kellymclaughlin committed Dec 3, 2014
1 parent 09aa0a4 commit 100a5e8
Show file tree
Hide file tree
Showing 19 changed files with 165 additions and 67 deletions.
1 change: 0 additions & 1 deletion include/list_objects.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,3 @@
-define(MAX_CACHE_BYTES, 104857600). % 100MB
-define(KEY_LIST_MULTIPLIER, 1.1).
-define(FOLD_OBJECTS_FOR_LIST_KEYS, true).
-define(FOLD_OBJECTS_TIMEOUT, timer:minutes(1)).
3 changes: 3 additions & 0 deletions include/riak_cs.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,9 @@
-define(DEFAULT_MANIFEST_WARN_HISTORY, 30).
-define(DEFAULT_MAX_PART_NUMBER, 10000).

%% timeout hitting Riak PB API
-define(DEFAULT_RIAK_TIMEOUT, 60000).

%% General system info
-define(WORD_SIZE, erlang:system_info(wordsize)).

Expand Down
37 changes: 24 additions & 13 deletions src/riak_cs_block_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,11 @@ handle_cast({delete_block, ReplyPid, Bucket, Key, UUID, BlockNumber}, State=#sta
dt_entry(<<"delete_block">>, [BlockNumber], [Bucket, Key]),
{FullBucket, FullKey} = full_bkey(Bucket, Key, UUID, BlockNumber),
StartTime = os:timestamp(),
Timeout = riak_cs_config:get_block_timeout(),

%% do a get first to get the vclock (only do a head request though)
GetOptions = [{r, 1}, {notfound_ok, false}, {basic_quorum, false}, head],
_ = case riakc_pb_socket:get(block_pbc(RcPid), FullBucket, FullKey, GetOptions) of
_ = case riakc_pb_socket:get(block_pbc(RcPid), FullBucket, FullKey, GetOptions, Timeout) of
{ok, RiakObject} ->
ok = delete_block(RcPid, ReplyPid, RiakObject, {UUID, BlockNumber});
{error, notfound} ->
Expand Down Expand Up @@ -269,7 +270,7 @@ do_get_block(ReplyPid, Bucket, Key, ClusterID, UseProxyGet, ProxyActive,
ProxyActive, UUID, BlockNumber, RcPid, MaxRetries, NewPause)
end,

Timeout = timer:seconds(5),
Timeout = riak_cs_config:local_get_block_timeout(),
try_local_get(RcPid, FullBucket, FullKey, GetOptions1, GetOptions2,
Timeout, ProceedFun, RetryFun, NumRetries, UseProxyGet,
ProxyActive, ClusterID).
Expand Down Expand Up @@ -299,7 +300,8 @@ handle_local_notfound(RcPid, FullBucket, FullKey, GetOptions2,
ProceedFun, RetryFun, NumRetries, UseProxyGet,
ProxyActive, ClusterID) ->

case get_block_local(RcPid, FullBucket, FullKey, GetOptions2, 60*1000) of
Timeout = riak_cs_config:get_block_timeout(),
case get_block_local(RcPid, FullBucket, FullKey, GetOptions2, Timeout) of
{ok, _} = Success ->
ProceedFun(Success);

Expand Down Expand Up @@ -338,9 +340,13 @@ get_block_local(RcPid, FullBucket, FullKey, GetOptions, Timeout) ->
Else
end.

get_block_remote(RcPid, FullBucket, FullKey, ClusterID, GetOptions) ->
get_block_remote(RcPid, FullBucket, FullKey, ClusterID, GetOptions0) ->
%% replace get_block_timeout with proxy_get_block_timeout
GetOptions1 = proplists:delete(timeout, GetOptions0),
Timeout = riak_cs_config:proxy_get_block_timeout(),
GetOptions2 = [proplists:property(timeout, Timeout) | GetOptions1],
case riak_repl_pb_api:get(block_pbc(RcPid), FullBucket, FullKey,
ClusterID, GetOptions) of
ClusterID, GetOptions2) of
{ok, RiakObject} ->
resolve_block_object(RiakObject, RcPid);
Else ->
Expand All @@ -357,11 +363,13 @@ normal_nval_block_get(ReplyPid, Bucket, Key, ClusterID, UseProxyGet, UUID,
StartTime = os:timestamp(),
GetOptions = [{r, 1}, {notfound_ok, false}, {basic_quorum, false}],
Object = case UseProxyGet of
false ->
riakc_pb_socket:get(block_pbc(RcPid), FullBucket, FullKey, GetOptions);
true ->
riak_repl_pb_api:get(block_pbc(RcPid), FullBucket, FullKey, ClusterID, GetOptions)
end,
false ->
LocalTimeout = riak_cs_config:get_block_timeout(),
riakc_pb_socket:get(block_pbc(RcPid), FullBucket, FullKey, GetOptions, LocalTimeout);
true ->
RemoteTimeout = riak_cs_config:proxy_get_block_timeout(),
riak_repl_pb_api:get(block_pbc(RcPid), FullBucket, FullKey, ClusterID, GetOptions, RemoteTimeout)
end,
ChunkValue = case Object of
{ok, RiakObject} ->
{ok, riakc_obj:get_value(RiakObject)};
Expand All @@ -380,12 +388,14 @@ delete_block(RcPid, ReplyPid, RiakObject, BlockId) ->

constrained_delete(RcPid, RiakObject, BlockId) ->
DeleteOptions = [{r, all}, {pr, all}, {w, all}, {pw, all}],
Timeout = riak_cs_config:delete_block_timeout(),
format_delete_result(
riakc_pb_socket:delete_obj(block_pbc(RcPid), RiakObject, DeleteOptions),
riakc_pb_socket:delete_obj(block_pbc(RcPid), RiakObject, DeleteOptions, Timeout),
BlockId).

secondary_delete_check({error, {unsatisfied_constraint, _, _}}, RcPid, RiakObject) ->
riakc_pb_socket:delete_obj(block_pbc(RcPid), RiakObject);
Timeout = riak_cs_config:delete_block_timeout(),
riakc_pb_socket:delete_obj(block_pbc(RcPid), RiakObject, [], Timeout);
secondary_delete_check({error, Reason} = E, _, _) ->
_ = lager:warning("Constrained block deletion failed. Reason: ~p", [Reason]),
E;
Expand Down Expand Up @@ -507,8 +517,9 @@ do_put_block(FullBucket, FullKey, VClock, Value, MD, RcPid, FailFun) ->
RiakObject0 = riakc_obj:new(FullBucket, FullKey, Value),
RiakObject = riakc_obj:set_vclock(
riakc_obj:update_metadata(RiakObject0, MD), VClock),
Timeout = riak_cs_config:put_block_timeout(),
StartTime = os:timestamp(),
case riakc_pb_socket:put(block_pbc(RcPid), RiakObject) of
case riakc_pb_socket:put(block_pbc(RcPid), RiakObject, Timeout) of
ok ->
ok = riak_cs_stats:update_with_start(block_put, StartTime),
ok;
Expand Down
3 changes: 2 additions & 1 deletion src/riak_cs_bucket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,8 @@ bucket_empty(Bucket, RcPid) ->
ManifestBucket = riak_cs_utils:to_bucket_name(objects, Bucket),
%% @TODO Use `stream_list_keys' instead
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
ListKeysResult = riak_cs_pbc:list_keys(ManifestPbc, ManifestBucket),
Timeout = riak_cs_config:list_keys_list_objects_timeout(),
ListKeysResult = riak_cs_pbc:list_keys(ManifestPbc, ManifestBucket, Timeout),
{ok, bucket_empty_handle_list_keys(RcPid, Bucket, ListKeysResult)}.

-spec bucket_empty_handle_list_keys(riak_client(), binary(),
Expand Down
76 changes: 76 additions & 0 deletions src/riak_cs_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,34 @@
read_before_last_manifest_write/0
]).

%% Timeouts hitting Riak
-export([ping_timeout/0,
get_user_timeout/0,
get_bucket_timeout/0,
get_manifest_timeout/0,
get_block_timeout/0, %% for n_val=3
local_get_block_timeout/0, %% for n_val=1, default 5
proxy_get_block_timeout/0, %% for remote
get_access_timeout/0,
get_gckey_timeout/0,
put_manifest_timeout/0,
put_block_timeout/0,
put_access_timeout/0,
put_gckey_timeout/0,
put_user_usage_timeout/0,
delete_manifest_timeout/0,
delete_block_timeout/0,
delete_gckey_timeout/0,
list_keys_list_objects_timeout/0,
list_keys_list_users_timeout/0,
storage_calc_timeout/0,
list_objects_timeout/0, %% using mapred (v0)
fold_objects_timeout/0, %% for cs_bucket_fold
get_index_range_gckeys_timeout/0,
get_index_range_gckeys_call_timeout/0,
get_index_list_multipart_uploads_timeout/0
]).

%% OpenStack config
-export([os_auth_url/0,
os_operator_roles/0,
Expand Down Expand Up @@ -346,6 +374,54 @@ read_before_last_manifest_write() ->
get_env(riak_cs, read_before_last_manifest_write, true).


%% ===================================================================
%% ALL Timeouts hitting Riak
%% ===================================================================

-define(TIMEOUT_CONFIG_FUNC(ConfigName),
ConfigName() ->
get_env(riak_cs, ConfigName,
get_env(riak_cs, riakc_timeouts, ?DEFAULT_RIAK_TIMEOUT))).

%% @doc Return the configured ping timeout. Default is 5 seconds. The
%% timeout is used in call to `poolboy:checkout' and if that fails in
%% the call to `riakc_pb_socket:ping' so the effective cumulative
%% timeout could be up to 2 * `ping_timeout()'.
-spec ping_timeout() -> pos_integer().
ping_timeout() ->
get_env(riak_cs, ping_timeout, ?DEFAULT_PING_TIMEOUT).

%% timeouts in milliseconds
?TIMEOUT_CONFIG_FUNC(get_user_timeout).
?TIMEOUT_CONFIG_FUNC(get_bucket_timeout).
?TIMEOUT_CONFIG_FUNC(get_manifest_timeout).
?TIMEOUT_CONFIG_FUNC(get_block_timeout).

local_get_block_timeout() ->
get_env(riak_cs, local_get_block_timeout, timer:seconds(5)).

?TIMEOUT_CONFIG_FUNC(proxy_get_block_timeout).
?TIMEOUT_CONFIG_FUNC(get_access_timeout).
?TIMEOUT_CONFIG_FUNC(get_gckey_timeout).
?TIMEOUT_CONFIG_FUNC(put_manifest_timeout).
?TIMEOUT_CONFIG_FUNC(put_block_timeout).
?TIMEOUT_CONFIG_FUNC(put_access_timeout).
?TIMEOUT_CONFIG_FUNC(put_gckey_timeout).
?TIMEOUT_CONFIG_FUNC(put_user_usage_timeout).
?TIMEOUT_CONFIG_FUNC(delete_manifest_timeout).
?TIMEOUT_CONFIG_FUNC(delete_block_timeout).
?TIMEOUT_CONFIG_FUNC(delete_gckey_timeout).
?TIMEOUT_CONFIG_FUNC(list_keys_list_objects_timeout).
?TIMEOUT_CONFIG_FUNC(list_keys_list_users_timeout).
?TIMEOUT_CONFIG_FUNC(storage_calc_timeout).
?TIMEOUT_CONFIG_FUNC(list_objects_timeout).
?TIMEOUT_CONFIG_FUNC(fold_objects_timeout).
?TIMEOUT_CONFIG_FUNC(get_index_range_gckeys_timeout).
?TIMEOUT_CONFIG_FUNC(get_index_range_gckeys_call_timeout).
?TIMEOUT_CONFIG_FUNC(get_index_list_multipart_uploads_timeout).

-undef(TIMEOUT_CONFIG_FUNC).

%% ===================================================================
%% S3 config options
%% ===================================================================
Expand Down
6 changes: 4 additions & 2 deletions src/riak_cs_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,9 @@ move_manifests_to_gc_bucket(Manifests, RcPid) ->
Key = generate_key(),
ManifestSet = build_manifest_set(Manifests),
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
Timeout = riak_cs_config:get_gckey_timeout(),
ObjectToWrite =
case riakc_pb_socket:get(ManifestPbc, ?GC_BUCKET, Key) of
case riakc_pb_socket:get(ManifestPbc, ?GC_BUCKET, Key, Timeout) of
{error, notfound} ->
%% There was no previous value, so we'll
%% create a new riak object and write it
Expand All @@ -383,7 +384,8 @@ move_manifests_to_gc_bucket(Manifests, RcPid) ->

%% Create a set from the list of manifests
_ = lager:debug("Manifests scheduled for deletion: ~p", [ManifestSet]),
riak_cs_pbc:put(ManifestPbc, ObjectToWrite).
Timeout1 = riak_cs_config:put_gckey_timeout(),
riak_cs_pbc:put(ManifestPbc, ObjectToWrite, Timeout1).

-spec build_manifest_set([cs_uuid_and_manifest()]) -> twop_set:twop_set().
build_manifest_set(Manifests) ->
Expand Down
5 changes: 4 additions & 1 deletion src/riak_cs_gc_key_list.erl
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,14 @@ gc_index_query(RcPid, EndTime, BatchSize, Continuation, UsePaginatedIndexes) ->
end,
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
EpochStart = riak_cs_gc:epoch_start(),
Timeout = riak_cs_config:get_index_range_gckeys_timeout(),
CallTimeout = riak_cs_config:get_index_range_gckeys_call_timeout(),
Options1 = [{timeout, Timeout}, {call_timeout, CallTimeout}] ++ Options,
QueryResult = riakc_pb_socket:get_index_range(
ManifestPbc,
?GC_BUCKET, ?KEY_INDEX,
EpochStart, EndTime,
Options),
Options1),

case QueryResult of
{error, disconnected} ->
Expand Down
6 changes: 4 additions & 2 deletions src/riak_cs_gc_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ continue() ->
fetch_next_fileset(ManifestSetKey, RcPid) ->
%% Get the set of manifests represented by the key
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
case riak_cs_pbc:get_object(ManifestPbc, ?GC_BUCKET, ManifestSetKey) of
Timeout = riak_cs_config:get_gckey_timeout(),
case riak_cs_pbc:get_object(ManifestPbc, ?GC_BUCKET, ManifestSetKey, Timeout) of
{ok, RiakObj} ->
ManifestSet = riak_cs_gc:decode_and_merge_siblings(
RiakObj, twop_set:new()),
Expand All @@ -240,7 +241,8 @@ fetch_next_fileset(ManifestSetKey, RcPid) ->
finish_file_delete(0, _, RiakObj, RcPid) ->
%% Delete the key from the GC bucket
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
_ = riakc_pb_socket:delete_obj(ManifestPbc, RiakObj),
Timeout = riak_cs_config:delete_gckey_timeout(),
_ = riakc_pb_socket:delete_obj(ManifestPbc, RiakObj, Timeout),
ok;
finish_file_delete(_, FileSet, _RiakObj, _RcPid) ->
_ = lager:debug("Remaining file keys: ~p", [twop_set:to_list(FileSet)]),
Expand Down
7 changes: 3 additions & 4 deletions src/riak_cs_list_objects_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,7 @@ fetch_key_list(RcPid, Request, State, false) ->
-spec make_list_keys_request(riak_client(), list_object_request()) ->
streaming_req_response().
make_list_keys_request(RcPid, ?LOREQ{name=BucketName}) ->
%% hardcoded for now
ServerTimeout = timer:seconds(60),
ServerTimeout = riak_cs_config:list_keys_list_objects_timeout(),
ManifestBucket = riak_cs_utils:to_bucket_name(objects, BucketName),
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
riakc_pb_socket:stream_list_keys(ManifestPbc,
Expand Down Expand Up @@ -493,9 +492,9 @@ make_bkeys(ManifestBucketName, Keys) ->
-spec send_map_reduce_request(riak_client(), list()) -> streaming_req_response().
send_map_reduce_request(RcPid, BKeyTuples) ->
%% TODO: change this:
%% hardcode 60 seconds for now
%% default 60 seconds for now
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
Timeout = timer:seconds(60),
Timeout = riak_cs_config:list_objects_timeout(),
riakc_pb_socket:mapred_stream(ManifestPbc,
BKeyTuples,
mapred_query(),
Expand Down
3 changes: 1 addition & 2 deletions src/riak_cs_list_objects_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -233,5 +233,4 @@ fold_objects_for_list_keys() ->

-spec fold_objects_timeout() -> non_neg_integer().
fold_objects_timeout() ->
riak_cs_config:get_env(riak_cs, fold_objects_timeout,
?FOLD_OBJECTS_TIMEOUT).
riak_cs_config:fold_objects_timeout().
3 changes: 2 additions & 1 deletion src/riak_cs_manifest.erl
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ get_manifests_raw(RcPid, Bucket, Key) ->
ManifestBucket = riak_cs_utils:to_bucket_name(objects, Bucket),
ok = riak_cs_riak_client:set_bucket_name(RcPid, Bucket),
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
case riakc_pb_socket:get(ManifestPbc, ManifestBucket, Key) of
Timeout = riak_cs_config:get_manifest_timeout(),
case riakc_pb_socket:get(ManifestPbc, ManifestBucket, Key, Timeout) of
{ok, _} = Result -> Result;
{error, disconnected} ->
riak_cs_pbc:check_connection_status(ManifestPbc, get_manifests_raw),
Expand Down
15 changes: 10 additions & 5 deletions src/riak_cs_manifest_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -294,14 +294,16 @@ get_and_delete(RcPid, UUID, Bucket, Key) ->
UpdatedManifests = orddict:erase(UUID, ResolvedManifests),
case UpdatedManifests of
[] ->
riakc_pb_socket:delete_obj(manifest_pbc(RcPid), RiakObject);
DeleteTimeout = riak_cs_config:delete_manifest_timeout(),
riakc_pb_socket:delete_obj(manifest_pbc(RcPid), RiakObject, DeleteTimeout);
_ ->
ObjectToWrite0 =
riak_cs_utils:update_obj_value(
RiakObject, riak_cs_utils:encode_term(UpdatedManifests)),
ObjectToWrite = update_md_with_multipart_2i(
ObjectToWrite0, UpdatedManifests, Bucket, Key),
riak_cs_pbc:put(manifest_pbc(RcPid), ObjectToWrite)
PutTimeout = riak_cs_config:put_manifest_timeout(),
riak_cs_pbc:put(manifest_pbc(RcPid), ObjectToWrite, PutTimeout)
end;
{error, notfound} ->
ok
Expand All @@ -322,7 +324,8 @@ get_and_update(RcPid, WrappedManifests, Bucket, Key) ->
ObjectToWrite0 = riakc_obj:new(ManifestBucket, Key, riak_cs_utils:encode_term(WrappedManifests)),
ObjectToWrite = update_md_with_multipart_2i(
ObjectToWrite0, WrappedManifests, Bucket, Key),
PutResult = riak_cs_pbc:put(manifest_pbc(RcPid), ObjectToWrite),
Timeout = riak_cs_config:put_manifest_timeout(),
PutResult = riak_cs_pbc:put(manifest_pbc(RcPid), ObjectToWrite, Timeout),
{PutResult, undefined, undefined}
end.

Expand Down Expand Up @@ -357,7 +360,8 @@ update(RcPid, OldManifests, OldRiakObject, WrappedManifests, Bucket, Key) ->
{Result, NewRiakObject} =
case riak_cs_manifest_utils:overwritten_UUIDs(NewManiAdded) of
[] ->
riak_cs_pbc:put(manifest_pbc(RcPid), ObjectToWrite, [return_body]);
Timeout = riak_cs_config:put_manifest_timeout(),
riak_cs_pbc:put(manifest_pbc(RcPid), ObjectToWrite, [return_body], Timeout);
OverwrittenUUIDs ->
riak_cs_gc:gc_specific_manifests(OverwrittenUUIDs,
ObjectToWrite,
Expand Down Expand Up @@ -387,7 +391,8 @@ update_from_previous_read(RcPid, RiakObject, Bucket, Key,
%% currently we don't do
%% anything to make sure
%% this call succeeded
riak_cs_pbc:put(manifest_pbc(RcPid), NewRiakObject).
Timeout = riak_cs_config:put_manifest_timeout(),
riak_cs_pbc:put(manifest_pbc(RcPid), NewRiakObject, Timeout).

update_md_with_multipart_2i(RiakObject, WrappedManifests, Bucket, Key) ->
%% During testing, it's handy to delete Riak keys in the
Expand Down
3 changes: 2 additions & 1 deletion src/riak_cs_mp_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,9 @@ list_all_multipart_uploads(Bucket, Opts, RcPid) ->
list_multipart_uploads_with_2ikey(Bucket, Opts, RcPid, Key2i) ->
HashBucket = riak_cs_utils:to_bucket_name(objects, Bucket),
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
Timeout = riak_cs_config:get_index_list_multipart_uploads_timeout(),
case riakc_pb_socket:get_index(ManifestPbc, HashBucket,
Key2i, <<"1">>) of
Key2i, <<"1">>, Timeout) of

{ok, ?INDEX_RESULTS{keys=Names}} ->
{ok, list_multipart_uploads2(Bucket, RcPid,
Expand Down
Loading

0 comments on commit 100a5e8

Please sign in to comment.