Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make all kinds of timeouts hitting Riak configurable #1021

Merged
merged 6 commits into from
Dec 10, 2014
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion include/list_objects.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,3 @@
-define(MAX_CACHE_BYTES, 104857600). % 100MB
-define(KEY_LIST_MULTIPLIER, 1.1).
-define(FOLD_OBJECTS_FOR_LIST_KEYS, true).
-define(FOLD_OBJECTS_TIMEOUT, timer:minutes(1)).
3 changes: 3 additions & 0 deletions include/riak_cs.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,9 @@
-define(DEFAULT_MANIFEST_WARN_HISTORY, 30).
-define(DEFAULT_MAX_PART_NUMBER, 10000).

%% timeout hitting Riak PB API
-define(DEFAULT_RIAK_TIMEOUT, 60000).

%% General system info
-define(WORD_SIZE, erlang:system_info(wordsize)).

Expand Down
3 changes: 2 additions & 1 deletion priv/tools/repair_gc_bucket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ process_gc_keys(Pbc, Options, Continuation, [GCKey | Keys]) ->
ok |
{error, term()}.
repair_manifests_for_gc_key(Pbc, Options, GCKey) ->
case riak_cs_pbc:get_object(Pbc, ?GC_BUCKET, GCKey) of
Timeout = riak_cs_config:get_gckey_timeout(),
case riak_cs_pbc:get_object(Pbc, ?GC_BUCKET, GCKey, Timeout) of
{ok, GCObj} ->
FileSet = riak_cs_gc:decode_and_merge_siblings(
GCObj, twop_set:new()),
Expand Down
6 changes: 3 additions & 3 deletions riak_test/tests/cs743_regression_test.erl
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ assert_storage_stats(UserConfig, Begin, End) ->
fun(Sample) ->
case rtcs:json_get(list_to_binary(?TEST_BUCKET), Sample) of
notfound -> false;
ErrorStr ->
?assert(not is_integer(ErrorStr)),
?assertEqual(<<"{error,{timeout,[]}}">>, ErrorStr),
ResultStr ->
?assert(not is_integer(ResultStr)),
?assertNotEqual(<<"{error,{timeout,[]}}">>, ResultStr),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#743 and #759 were fixes for a crash caused by a timeout, so the assertion here should be ?assertEqual, to check that the calculation successfully detected the timeout without crashing. The timeout setting used here, which no longer works, should be changed to storage_calc_timeout.

true
end
end,
Expand Down
13 changes: 9 additions & 4 deletions riak_test/tests/gc_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,20 @@ confirm() ->
%% Set up to grep logs to verify messages
rt:setup_log_capture(hd(CSNodes)),

rtcs:gc(1, "set-interval infinity"),
rtcs:gc(1, "set-leeway 1"),
rtcs:gc(1, "cancel"),

lager:info("Test GC run under an invalid state manifest..."),
{GCKey, {BKey, UUID}} = setup_obj(RiakNodes, UserConfig),
%% Ensure the leeway has expired
timer:sleep(2000),
ok = verify_gc_run(hd(CSNodes), GCKey),
ok = verify_riak_object_remaining_for_bad_key(RiakNodes, GCKey, {BKey, UUID}),

lager:info("Test repair script (repair_gc_bucket.erl) with more invlaid states..."),
ok = put_more_bad_keys(RiakNodes, UserConfig),
%% Ensure the leeway has expired
timer:sleep(2000),
RiakIDs = rtcs:riak_id_per_cluster(NumNodes),
[repair_gc_bucket(ID) || ID <- RiakIDs],
Expand Down Expand Up @@ -163,9 +170,7 @@ repair_gc_bucket(RiakNodeID) ->
ok.

verify_gc_run(Node, GCKey) ->
rtcs:gc(1, "set-leeway 1"),
timer:sleep(2000),
rtcs:gc(1, "batch"),
rtcs:gc(1, "batch 1"),
lager:info("Check log, warning for invalid state and info for GC finish"),
true = rt:expect_in_log(Node,
"Invalid state manifest in GC bucket at <<\""
Expand All @@ -179,7 +184,7 @@ verify_gc_run(Node, GCKey) ->
ok.

verify_gc_run2(Node) ->
rtcs:gc(1, "batch"),
rtcs:gc(1, "batch 1"),
lager:info("Check collected count =:= 101, 1 from setup_obj, "
"100 from put_more_bad_keys."),
true = rt:expect_in_log(Node,
Expand Down
36 changes: 23 additions & 13 deletions src/riak_cs_block_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,11 @@ handle_cast({delete_block, ReplyPid, Bucket, Key, UUID, BlockNumber}, State=#sta
dt_entry(<<"delete_block">>, [BlockNumber], [Bucket, Key]),
{FullBucket, FullKey} = full_bkey(Bucket, Key, UUID, BlockNumber),
StartTime = os:timestamp(),
Timeout = riak_cs_config:get_block_timeout(),

%% do a get first to get the vclock (only do a head request though)
GetOptions = [{r, 1}, {notfound_ok, false}, {basic_quorum, false}, head],
_ = case riakc_pb_socket:get(block_pbc(RcPid), FullBucket, FullKey, GetOptions) of
_ = case riakc_pb_socket:get(block_pbc(RcPid), FullBucket, FullKey, GetOptions, Timeout) of
{ok, RiakObject} ->
ok = delete_block(RcPid, ReplyPid, RiakObject, {UUID, BlockNumber});
{error, notfound} ->
Expand Down Expand Up @@ -269,7 +270,7 @@ do_get_block(ReplyPid, Bucket, Key, ClusterID, UseProxyGet, ProxyActive,
ProxyActive, UUID, BlockNumber, RcPid, MaxRetries, NewPause)
end,

Timeout = timer:seconds(5),
Timeout = riak_cs_config:local_get_block_timeout(),
try_local_get(RcPid, FullBucket, FullKey, GetOptions1, GetOptions2,
Timeout, ProceedFun, RetryFun, NumRetries, UseProxyGet,
ProxyActive, ClusterID).
Expand Down Expand Up @@ -299,7 +300,8 @@ handle_local_notfound(RcPid, FullBucket, FullKey, GetOptions2,
ProceedFun, RetryFun, NumRetries, UseProxyGet,
ProxyActive, ClusterID) ->

case get_block_local(RcPid, FullBucket, FullKey, GetOptions2, 60*1000) of
Timeout = riak_cs_config:get_block_timeout(),
case get_block_local(RcPid, FullBucket, FullKey, GetOptions2, Timeout) of
{ok, _} = Success ->
ProceedFun(Success);

Expand Down Expand Up @@ -338,9 +340,12 @@ get_block_local(RcPid, FullBucket, FullKey, GetOptions, Timeout) ->
Else
end.

get_block_remote(RcPid, FullBucket, FullKey, ClusterID, GetOptions) ->
get_block_remote(RcPid, FullBucket, FullKey, ClusterID, GetOptions0) ->
%% replace get_block_timeout with proxy_get_block_timeout
GetOptions = proplists:delete(timeout, GetOptions0),
Timeout = riak_cs_config:proxy_get_block_timeout(),
case riak_repl_pb_api:get(block_pbc(RcPid), FullBucket, FullKey,
ClusterID, GetOptions) of
ClusterID, GetOptions, Timeout) of
{ok, RiakObject} ->
resolve_block_object(RiakObject, RcPid);
Else ->
Expand All @@ -357,11 +362,13 @@ normal_nval_block_get(ReplyPid, Bucket, Key, ClusterID, UseProxyGet, UUID,
StartTime = os:timestamp(),
GetOptions = [{r, 1}, {notfound_ok, false}, {basic_quorum, false}],
Object = case UseProxyGet of
false ->
riakc_pb_socket:get(block_pbc(RcPid), FullBucket, FullKey, GetOptions);
true ->
riak_repl_pb_api:get(block_pbc(RcPid), FullBucket, FullKey, ClusterID, GetOptions)
end,
false ->
LocalTimeout = riak_cs_config:get_block_timeout(),
riakc_pb_socket:get(block_pbc(RcPid), FullBucket, FullKey, GetOptions, LocalTimeout);
true ->
RemoteTimeout = riak_cs_config:proxy_get_block_timeout(),
riak_repl_pb_api:get(block_pbc(RcPid), FullBucket, FullKey, ClusterID, GetOptions, RemoteTimeout)
end,
ChunkValue = case Object of
{ok, RiakObject} ->
{ok, riakc_obj:get_value(RiakObject)};
Expand All @@ -380,12 +387,14 @@ delete_block(RcPid, ReplyPid, RiakObject, BlockId) ->

constrained_delete(RcPid, RiakObject, BlockId) ->
DeleteOptions = [{r, all}, {pr, all}, {w, all}, {pw, all}],
Timeout = riak_cs_config:delete_block_timeout(),
format_delete_result(
riakc_pb_socket:delete_obj(block_pbc(RcPid), RiakObject, DeleteOptions),
riakc_pb_socket:delete_obj(block_pbc(RcPid), RiakObject, DeleteOptions, Timeout),
BlockId).

secondary_delete_check({error, {unsatisfied_constraint, _, _}}, RcPid, RiakObject) ->
riakc_pb_socket:delete_obj(block_pbc(RcPid), RiakObject);
Timeout = riak_cs_config:delete_block_timeout(),
riakc_pb_socket:delete_obj(block_pbc(RcPid), RiakObject, [], Timeout);
secondary_delete_check({error, Reason} = E, _, _) ->
_ = lager:warning("Constrained block deletion failed. Reason: ~p", [Reason]),
E;
Expand Down Expand Up @@ -507,8 +516,9 @@ do_put_block(FullBucket, FullKey, VClock, Value, MD, RcPid, FailFun) ->
RiakObject0 = riakc_obj:new(FullBucket, FullKey, Value),
RiakObject = riakc_obj:set_vclock(
riakc_obj:update_metadata(RiakObject0, MD), VClock),
Timeout = riak_cs_config:put_block_timeout(),
StartTime = os:timestamp(),
case riakc_pb_socket:put(block_pbc(RcPid), RiakObject) of
case riakc_pb_socket:put(block_pbc(RcPid), RiakObject, Timeout) of
ok ->
ok = riak_cs_stats:update_with_start(block_put, StartTime),
ok;
Expand Down
3 changes: 2 additions & 1 deletion src/riak_cs_bucket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,8 @@ bucket_empty(Bucket, RcPid) ->
ManifestBucket = riak_cs_utils:to_bucket_name(objects, Bucket),
%% @TODO Use `stream_list_keys' instead
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
ListKeysResult = riak_cs_pbc:list_keys(ManifestPbc, ManifestBucket),
Timeout = riak_cs_config:list_keys_list_objects_timeout(),
ListKeysResult = riak_cs_pbc:list_keys(ManifestPbc, ManifestBucket, Timeout),
{ok, bucket_empty_handle_list_keys(RcPid, Bucket, ListKeysResult)}.

-spec bucket_empty_handle_list_keys(riak_client(), binary(),
Expand Down
76 changes: 76 additions & 0 deletions src/riak_cs_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,34 @@
read_before_last_manifest_write/0
]).

%% Timeouts hitting Riak.
%% All values are in milliseconds. Apart from ping_timeout/0 and
%% local_get_block_timeout/0, which carry their own defaults, every
%% accessor below is generated by ?TIMEOUT_CONFIG_FUNC.
-export([ping_timeout/0,
get_user_timeout/0,
get_bucket_timeout/0,
get_manifest_timeout/0,
get_block_timeout/0, %% for n_val=3
local_get_block_timeout/0, %% for n_val=1, default 5 seconds
proxy_get_block_timeout/0, %% for remote (proxy) gets
get_access_timeout/0,
get_gckey_timeout/0,
put_manifest_timeout/0,
put_block_timeout/0,
put_access_timeout/0,
put_gckey_timeout/0,
put_user_usage_timeout/0,
delete_manifest_timeout/0,
delete_block_timeout/0,
delete_gckey_timeout/0,
list_keys_list_objects_timeout/0,
list_keys_list_users_timeout/0,
storage_calc_timeout/0,
list_objects_timeout/0, %% using mapred (v0)
fold_objects_timeout/0, %% for cs_bucket_fold
get_index_range_gckeys_timeout/0,
get_index_range_gckeys_call_timeout/0,
get_index_list_multipart_uploads_timeout/0
]).

%% OpenStack config
-export([os_auth_url/0,
os_operator_roles/0,
Expand Down Expand Up @@ -346,6 +374,54 @@ read_before_last_manifest_write() ->
get_env(riak_cs, read_before_last_manifest_write, true).


%% ===================================================================
%% ALL Timeouts hitting Riak
%% ===================================================================

%% Generates a zero-arity accessor function named `ConfigName'.
%% The generated function looks the value up in the `riak_cs'
%% application environment in this order:
%%   1. the key `ConfigName' itself (per-operation override),
%%   2. the shared key `riakc_timeouts' (global override),
%%   3. ?DEFAULT_RIAK_TIMEOUT.
%% The inner get_env/3 call is an argument of the outer one, so the
%% shared fallback is read on every call, even when the specific key
%% is set.
-define(TIMEOUT_CONFIG_FUNC(ConfigName),
ConfigName() ->
get_env(riak_cs, ConfigName,
get_env(riak_cs, riakc_timeouts, ?DEFAULT_RIAK_TIMEOUT))).

%% @doc Timeout to use when pinging Riak, read from the `ping_timeout'
%% key of the `riak_cs' application environment. When the key is not
%% set, ?DEFAULT_PING_TIMEOUT is used (documented as 5 seconds).
%% The same value is applied first to `poolboy:checkout' and then,
%% if that fails, to `riakc_pb_socket:ping', so a caller may wait up
%% to 2 * `ping_timeout()' in total.
-spec ping_timeout() -> pos_integer().
ping_timeout() ->
    Fallback = ?DEFAULT_PING_TIMEOUT,
    get_env(riak_cs, ping_timeout, Fallback).

%% Accessors generated by ?TIMEOUT_CONFIG_FUNC; every value is a
%% timeout in milliseconds. Each one reads the key of the same name
%% from the `riak_cs' application environment.
?TIMEOUT_CONFIG_FUNC(get_user_timeout).
?TIMEOUT_CONFIG_FUNC(get_bucket_timeout).
?TIMEOUT_CONFIG_FUNC(get_manifest_timeout).
?TIMEOUT_CONFIG_FUNC(get_block_timeout).

%% @doc Return the timeout, in milliseconds, for the local block get
%% (listed in the export section as the n_val=1 case). The value is
%% read from the `local_get_block_timeout' key of the `riak_cs'
%% application environment and defaults to 5 seconds.
%% Unlike the accessors generated by ?TIMEOUT_CONFIG_FUNC, this one
%% does not fall back to the shared `riakc_timeouts' setting, so the
%% short default stays in force even when that setting is raised.
-spec local_get_block_timeout() -> timeout().
local_get_block_timeout() ->
    get_env(riak_cs, local_get_block_timeout, timer:seconds(5)).

%% Remaining accessors generated by ?TIMEOUT_CONFIG_FUNC, one per
%% kind of Riak request (get / put / delete / list / index query).
?TIMEOUT_CONFIG_FUNC(proxy_get_block_timeout).
?TIMEOUT_CONFIG_FUNC(get_access_timeout).
?TIMEOUT_CONFIG_FUNC(get_gckey_timeout).
?TIMEOUT_CONFIG_FUNC(put_manifest_timeout).
?TIMEOUT_CONFIG_FUNC(put_block_timeout).
?TIMEOUT_CONFIG_FUNC(put_access_timeout).
?TIMEOUT_CONFIG_FUNC(put_gckey_timeout).
?TIMEOUT_CONFIG_FUNC(put_user_usage_timeout).
?TIMEOUT_CONFIG_FUNC(delete_manifest_timeout).
?TIMEOUT_CONFIG_FUNC(delete_block_timeout).
?TIMEOUT_CONFIG_FUNC(delete_gckey_timeout).
?TIMEOUT_CONFIG_FUNC(list_keys_list_objects_timeout).
?TIMEOUT_CONFIG_FUNC(list_keys_list_users_timeout).
?TIMEOUT_CONFIG_FUNC(storage_calc_timeout).
?TIMEOUT_CONFIG_FUNC(list_objects_timeout).
?TIMEOUT_CONFIG_FUNC(fold_objects_timeout).
?TIMEOUT_CONFIG_FUNC(get_index_range_gckeys_timeout).
?TIMEOUT_CONFIG_FUNC(get_index_range_gckeys_call_timeout).
?TIMEOUT_CONFIG_FUNC(get_index_list_multipart_uploads_timeout).

%% The generator macro is only meant for the accessors above, so it
%% is undefined here to keep it from being used further down the file.
-undef(TIMEOUT_CONFIG_FUNC).

%% ===================================================================
%% S3 config options
%% ===================================================================
Expand Down
8 changes: 5 additions & 3 deletions src/riak_cs_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ mark_manifests(RiakObject, Bucket, Key, UUIDsToMark, ManiFunction, RcPid) ->
%% with vector clock. This allows us to do a PUT
%% again without having to re-retrieve the object
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
riak_cs_pbc:put(ManifestPbc, UpdObj, [return_body]).
riak_cs_pbc:put(ManifestPbc, UpdObj, [return_body], riak_cs_config:get_gckey_timeout()).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

put manifest?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, riak_cs_config:put_manifest_timeout() might be more suitable here.


%% @doc Copy data for a list of manifests to the
%% `riak-cs-gc' bucket to schedule them for deletion.
Expand All @@ -365,8 +365,9 @@ move_manifests_to_gc_bucket(Manifests, RcPid) ->
Key = generate_key(),
ManifestSet = build_manifest_set(Manifests),
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
Timeout = riak_cs_config:get_gckey_timeout(),
ObjectToWrite =
case riakc_pb_socket:get(ManifestPbc, ?GC_BUCKET, Key) of
case riakc_pb_socket:get(ManifestPbc, ?GC_BUCKET, Key, Timeout) of
{error, notfound} ->
%% There was no previous value, so we'll
%% create a new riak object and write it
Expand All @@ -383,7 +384,8 @@ move_manifests_to_gc_bucket(Manifests, RcPid) ->

%% Create a set from the list of manifests
_ = lager:debug("Manifests scheduled for deletion: ~p", [ManifestSet]),
riak_cs_pbc:put(ManifestPbc, ObjectToWrite).
Timeout1 = riak_cs_config:put_gckey_timeout(),
riak_cs_pbc:put(ManifestPbc, ObjectToWrite, Timeout1).

-spec build_manifest_set([cs_uuid_and_manifest()]) -> twop_set:twop_set().
build_manifest_set(Manifests) ->
Expand Down
5 changes: 4 additions & 1 deletion src/riak_cs_gc_key_list.erl
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,14 @@ gc_index_query(RcPid, EndTime, BatchSize, Continuation, UsePaginatedIndexes) ->
end,
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
EpochStart = riak_cs_gc:epoch_start(),
Timeout = riak_cs_config:get_index_range_gckeys_timeout(),
CallTimeout = riak_cs_config:get_index_range_gckeys_call_timeout(),
Options1 = [{timeout, Timeout}, {call_timeout, CallTimeout}] ++ Options,
QueryResult = riakc_pb_socket:get_index_range(
ManifestPbc,
?GC_BUCKET, ?KEY_INDEX,
EpochStart, EndTime,
Options),
Options1),

case QueryResult of
{error, disconnected} ->
Expand Down
6 changes: 4 additions & 2 deletions src/riak_cs_gc_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ continue() ->
fetch_next_fileset(ManifestSetKey, RcPid) ->
%% Get the set of manifests represented by the key
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
case riak_cs_pbc:get_object(ManifestPbc, ?GC_BUCKET, ManifestSetKey) of
Timeout = riak_cs_config:get_gckey_timeout(),
case riak_cs_pbc:get_object(ManifestPbc, ?GC_BUCKET, ManifestSetKey, Timeout) of
{ok, RiakObj} ->
ManifestSet = riak_cs_gc:decode_and_merge_siblings(
RiakObj, twop_set:new()),
Expand All @@ -240,7 +241,8 @@ fetch_next_fileset(ManifestSetKey, RcPid) ->
finish_file_delete(0, _, RiakObj, RcPid) ->
%% Delete the key from the GC bucket
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
_ = riakc_pb_socket:delete_obj(ManifestPbc, RiakObj),
Timeout = riak_cs_config:delete_gckey_timeout(),
_ = riakc_pb_socket:delete_obj(ManifestPbc, RiakObj, [], Timeout),
ok;
finish_file_delete(_, FileSet, _RiakObj, _RcPid) ->
_ = lager:debug("Remaining file keys: ~p", [twop_set:to_list(FileSet)]),
Expand Down
7 changes: 3 additions & 4 deletions src/riak_cs_list_objects_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,7 @@ fetch_key_list(RcPid, Request, State, false) ->
-spec make_list_keys_request(riak_client(), list_object_request()) ->
streaming_req_response().
make_list_keys_request(RcPid, ?LOREQ{name=BucketName}) ->
%% hardcoded for now
ServerTimeout = timer:seconds(60),
ServerTimeout = riak_cs_config:list_keys_list_objects_timeout(),
ManifestBucket = riak_cs_utils:to_bucket_name(objects, BucketName),
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
riakc_pb_socket:stream_list_keys(ManifestPbc,
Expand Down Expand Up @@ -493,9 +492,9 @@ make_bkeys(ManifestBucketName, Keys) ->
-spec send_map_reduce_request(riak_client(), list()) -> streaming_req_response().
send_map_reduce_request(RcPid, BKeyTuples) ->
%% TODO: change this:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need this comment any more because the TODO has been done here?

%% hardcode 60 seconds for now
%% default 60 seconds for now
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
Timeout = timer:seconds(60),
Timeout = riak_cs_config:list_objects_timeout(),
riakc_pb_socket:mapred_stream(ManifestPbc,
BKeyTuples,
mapred_query(),
Expand Down
3 changes: 1 addition & 2 deletions src/riak_cs_list_objects_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -233,5 +233,4 @@ fold_objects_for_list_keys() ->

-spec fold_objects_timeout() -> non_neg_integer().
fold_objects_timeout() ->
riak_cs_config:get_env(riak_cs, fold_objects_timeout,
?FOLD_OBJECTS_TIMEOUT).
riak_cs_config:fold_objects_timeout().
3 changes: 2 additions & 1 deletion src/riak_cs_manifest.erl
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ get_manifests_raw(RcPid, Bucket, Key) ->
ManifestBucket = riak_cs_utils:to_bucket_name(objects, Bucket),
ok = riak_cs_riak_client:set_bucket_name(RcPid, Bucket),
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
case riakc_pb_socket:get(ManifestPbc, ManifestBucket, Key) of
Timeout = riak_cs_config:get_manifest_timeout(),
case riakc_pb_socket:get(ManifestPbc, ManifestBucket, Key, Timeout) of
{ok, _} = Result -> Result;
{error, disconnected} ->
riak_cs_pbc:check_connection_status(ManifestPbc, get_manifests_raw),
Expand Down
Loading