Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for amendment log, and a bit code reformat #988

Merged
merged 8 commits into from
Oct 9, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions riak_test/tests/access_stats_test.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,14 @@
confirm() ->
Config = [{riak, rtcs:riak_config()}, {stanchion, rtcs:stanchion_config()},
{cs, rtcs:cs_config([{fold_objects_for_list_keys, true}])}],
{UserConfig, {_RiakNodes, _CSNodes, _Stanchion}} = rtcs:setup(2, Config),
{UserConfig, {RiakNodes, CSNodes, _Stanchion}} = rtcs:setup(2, Config),
rt:setup_log_capture(hd(CSNodes)),

{Begin, End} = generate_some_accesses(UserConfig),
flush_access_stats(),
assert_access_stats(json, UserConfig, {Begin, End}),
assert_access_stats(xml, UserConfig, {Begin, End}),
verify_stats_lost_logging(UserConfig, RiakNodes, CSNodes),
pass.

generate_some_accesses(UserConfig) ->
Expand Down Expand Up @@ -98,6 +101,22 @@ assert_access_stats(Format, UserConfig, {Begin, End}) ->
?assertEqual( 1, sum_samples(Format, "BucketDelete", "Count", Samples)),
pass.

verify_stats_lost_logging(UserConfig, RiakNodes, CSNodes) ->
KeyId = UserConfig#aws_config.access_key_id,
{_Begin, _End} = generate_some_accesses(UserConfig),
%% kill riak
[ rt:brutal_kill(Node) || Node <- RiakNodes ],
%% force archive
flush_access_stats(),
%% check logs, at same node with flush_access_stats
CSNode = hd(CSNodes),
lager:info("Checking log in ~p", [CSNode]),
ExpectLine = io_lib:format("lost access stat: User=~s, Slice=", [KeyId]),
lager:debug("expected log line: ~s", [ExpectLine]),
true = rt:expect_in_log(CSNode, ExpectLine),
pass.


node_samples_from_content(json, Node, Content) ->
Usage = mochijson2:decode(Content),
ListOfNodeStats = rtcs:json_get([<<"Access">>, <<"Nodes">>], Usage),
Expand Down Expand Up @@ -128,7 +147,7 @@ sum_samples_json(Keys, [Sample | Samples], Sum) ->
0;
Value when is_integer(Value) ->
Value
end,
end,
sum_samples_json(Keys, Samples, Sum + InSample).

sum_samples_xml(OperationType, StatsKey, Samples) ->
Expand Down
35 changes: 30 additions & 5 deletions src/riak_cs_access.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
log_flush_interval/0,
max_flush_size/0,
make_object/3,
get_usage/4
get_usage/4,
flush_to_log/2,
flush_access_object_to_log/3
]).

-include("riak_cs.hrl").
Expand Down Expand Up @@ -74,8 +76,8 @@ log_flush_interval() ->
{ok, AP div AF};
_ ->
{error, "riak_cs:access_log_flush_interval"
" does not evenly divide"
" riak_cs:access_archive_period"}
" does not evenly divide"
" riak_cs:access_archive_period"}
end;
APError ->
APError
Expand Down Expand Up @@ -105,7 +107,7 @@ max_flush_size() ->
-spec make_object(iodata(),
[[{atom()|binary(), number()}]],
slice())
-> riakc_obj:riakc_obj().
-> riakc_obj:riakc_obj().
make_object(User, Accesses, {Start, End}) ->
{ok, Period} = archive_period(),
Aggregate = aggregate_accesses(Accesses),
Expand Down Expand Up @@ -137,7 +139,7 @@ merge_stats(Stats, Acc) ->
term(), %% TODO: riak_cs:user_key() type doesn't exist
calendar:datetime(),
calendar:datetime()) ->
{Usage::orddict:orddict(), Errors::[{slice(), term()}]}.
{Usage::orddict:orddict(), Errors::[{slice(), term()}]}.
get_usage(RcPid, User, Start, End) ->
{ok, Period} = archive_period(),
{Usage, Errors} = rts:find_samples(RcPid, ?ACCESS_BUCKET, User,
Expand All @@ -153,6 +155,29 @@ group_by_node(Samples) ->
orddict:new(),
Samples).

%% @doc If writing access failed, output the data to log
flush_to_log(Table, Slice) ->
User = ets:first(Table),
flush_to_log(User, Table, Slice).

%% @doc iterate over all users on the ets table
flush_to_log('$end_of_table', _, _) ->
ok;
flush_to_log(User, Table, Slice) ->
Accesses = [ A || {_, A} <- ets:lookup(Table, User) ],
RiakObj = riak_cs_access:make_object(User, Accesses, Slice),
flush_access_object_to_log(User, RiakObj, Slice),
flush_to_log(ets:next(Table, User), Table, Slice).

flush_access_object_to_log(User, RiakObj, Slice) ->
{Start0, End0} = Slice,
Start = rts:iso8601(Start0),
End = rts:iso8601(End0),
Accesses = riakc_obj:get_update_value(RiakObj),
_ = lager:warning("lost access stat: User=~s, Slice=(~s, ~s), Accesses:'~s'",
[User, Start, End, Accesses]).


-ifdef(TEST).
-ifdef(EQC).

Expand Down
4 changes: 1 addition & 3 deletions src/riak_cs_access_archiver.erl
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,9 @@ store(User, RcPid, Record, Slice) ->
ok = lager:debug("Archived access stats for ~s ~p",
[User, Slice]);
{error, Reason} ->
ok = lager:error("Access archiver storage failed (~p), "
"stats for ~s ~p were lost.",
[Reason, User, Slice]),
riak_cs_pbc:check_connection_status(MasterPbc,
"riak_cs_access_archiver:store/4"),
riak_cs_access:flush_access_object_to_log(User, Record, Slice),
{error, Reason};
{'EXIT', {noproc, _}} ->
%% just haven't gotten the 'DOWN' yet
Expand Down
8 changes: 5 additions & 3 deletions src/riak_cs_access_archiver_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ archive(Table, Slice) ->

_ = lager:error("~p was not available, access stats for ~p lost",
[?MODULE, Slice]),
riak_cs_access:flush_to_log(Table, Slice),
%% if the archiver had been alive just now, but crashed
%% during operation, the stats also would have been lost,
%% so also losing them here is just an efficient way to
Expand All @@ -84,6 +85,7 @@ archive(Table, Slice) ->
_ ->
_ = lager:error("~p was not available, access stats for ~p lost",
[?MODULE, Slice]),
riak_cs_access:flush_to_log(Table, Slice),
ets:delete(Table),
false
end.
Expand Down Expand Up @@ -144,9 +146,9 @@ init([]) ->
handle_call(status, _From, State=#state{backlog=Backlog, workers=Workers}) ->
Props = [{backlog, length(Backlog)},
{workers, Workers}],
{reply, {ok, Props}, State};
{reply, {ok, Props}, State};
handle_call(archive, _From, State=#state{workers=Workers,
max_workers=MaxWorkers})
max_workers=MaxWorkers})
when length(Workers) >= MaxWorkers ->
%% All workers are busy so the manager takes ownership and adds an
%% entry to the backlog
Expand Down Expand Up @@ -181,7 +183,7 @@ handle_info({'ETS-TRANSFER', Table, _From, Slice}, State) ->
ok = lager:error("Skipping archival of accesses ~p to"
" catch up on backlog",
[DropSlice]),
State#state{backlog=RestBacklog++[{Table, Slice}]}
State#state{backlog=RestBacklog++[{Table, Slice}]}
end,
{noreply, NewState};
handle_info(_Info, State) ->
Expand Down
12 changes: 6 additions & 6 deletions src/riak_cs_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ close_riak_connection(Pool, Pid) ->
%% {error, notfound} counts as success in this case,
%% with the list of UUIDs being [].
-spec delete_object(binary(), binary(), riak_client()) ->
{ok, [binary()]} | {error, term()}.
{ok, [binary()]} | {error, term()}.
delete_object(Bucket, Key, RcPid) ->
ok = riak_cs_stats:update_with_start(object_delete, os:timestamp()),
riak_cs_gc:gc_active_manifests(Bucket, Key, RcPid).
Expand Down Expand Up @@ -316,7 +316,7 @@ md5_final(Ctx) -> crypto:md5_final(Ctx).

-spec active_manifest_from_response({ok, orddict:orddict()} |
{error, notfound}) ->
{ok, lfs_manifest()} | {error, notfound}.
{ok, lfs_manifest()} | {error, notfound}.
active_manifest_from_response({ok, Manifests}) ->
handle_active_manifests(riak_cs_manifest_utils:active_manifest(Manifests));
active_manifest_from_response({error, notfound}=NotFound) ->
Expand All @@ -325,7 +325,7 @@ active_manifest_from_response({error, notfound}=NotFound) ->
%% @private
-spec handle_active_manifests({ok, lfs_manifest()} |
{error, no_active_manifest}) ->
{ok, lfs_manifest()} | {error, notfound}.
{ok, lfs_manifest()} | {error, notfound}.
handle_active_manifests({ok, _Active}=ActiveReply) ->
ActiveReply;
handle_active_manifests({error, no_active_manifest}) ->
Expand Down Expand Up @@ -386,7 +386,7 @@ pow(Base, Power, Acc) ->
-type resolve_ok() :: {term(), binary()}.
-type resolve_error() :: {atom(), atom()}.
-spec resolve_robj_siblings(RObj::term()) ->
{resolve_ok() | resolve_error(), NeedsRepair::boolean()}.
{resolve_ok() | resolve_error(), NeedsRepair::boolean()}.

resolve_robj_siblings(Cs) ->
[{BestRating, BestMDV}|Rest] = lists:sort([{rate_a_dict(MD, V), MDV} ||
Expand Down Expand Up @@ -453,7 +453,7 @@ riak_connection(Pool) ->
%% @doc Set the ACL for an object. Existing ACLs are only
%% replaced, they cannot be updated.
-spec set_object_acl(binary(), binary(), lfs_manifest(), acl(), riak_client()) ->
ok | {error, term()}.
ok | {error, term()}.
set_object_acl(Bucket, Key, Manifest, Acl, RcPid) ->
StartTime = os:timestamp(),
{ok, ManiPid} = riak_cs_manifest_fsm:start_link(Bucket, Key, RcPid),
Expand Down Expand Up @@ -561,7 +561,7 @@ from_bucket_name(BucketNameWithPrefix) ->
%% @private
-spec key_exists_handle_get_manifests({ok, riakc_obj:riakc_obj(), list()} |
{error, term()}) ->
boolean().
boolean().
key_exists_handle_get_manifests({ok, _Object, Manifests}) ->
active_to_bool(active_manifest_from_response({ok, Manifests}));
key_exists_handle_get_manifests(Error) ->
Expand Down