Skip to content

Commit

Permalink
Merge pull request #485 from basho/bugfix/t2b-compressed
Browse files Browse the repository at this point in the history
Compress terms before writing to Riak
  • Loading branch information
reiddraper committed Mar 15, 2013
2 parents 57362a7 + 9c60e15 commit e651d17
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 8 deletions.
1 change: 1 addition & 0 deletions include/riak_cs.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
-define(PART_MANIFEST, #part_manifest_v1).
-define(PART_MANIFEST_RECNAME, part_manifest_v1).
-define(MULTIPART_DESCR, #multipart_descr_v1).
-define(COMPRESS_TERMS, false).
-define(PART_DESCR, #part_descr_v1).

-record(moss_user, {
Expand Down
6 changes: 3 additions & 3 deletions src/riak_cs_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ mark_manifests(RiakObject, Bucket, Key, UUIDsToMark, ManiFunction, RiakcPid) ->
Manifests = riak_cs_utils:manifests_from_riak_object(RiakObject),
Marked = ManiFunction(Manifests, UUIDsToMark),
UpdObj0 = riak_cs_utils:update_obj_value(RiakObject,
term_to_binary(Marked)),
riak_cs_utils:encode_term(Marked)),
UpdObj = riak_cs_manifest_fsm:update_md_with_multipart_2i(
UpdObj0, Marked, Bucket, Key),

Expand All @@ -209,15 +209,15 @@ move_manifests_to_gc_bucket(Manifests, RiakcPid, AddLeewayP) ->
{error, notfound} ->
%% There was no previous value, so we'll
%% create a new riak object and write it
riakc_obj:new(?GC_BUCKET, Key, term_to_binary(ManifestSet));
riakc_obj:new(?GC_BUCKET, Key, riak_cs_utils:encode_term(ManifestSet));
{ok, PreviousObject} ->
%% There is a value currently stored here,
%% so resolve all the siblings and add the
%% new set in as well. Write this
%% value back to riak
Resolved = decode_and_merge_siblings(PreviousObject, ManifestSet),
riak_cs_utils:update_obj_value(PreviousObject,
term_to_binary(Resolved))
riak_cs_utils:encode_term(Resolved))
end,

%% Create a set from the list of manifests
Expand Down
8 changes: 4 additions & 4 deletions src/riak_cs_manifest_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ get_and_delete(RiakcPid, UUID, Bucket, Key) ->
_ ->
ObjectToWrite0 =
riak_cs_utils:update_obj_value(
RiakObject, term_to_binary(UpdatedManifests)),
RiakObject, riak_cs_utils:encode_term(UpdatedManifests)),
ObjectToWrite = update_md_with_multipart_2i(
ObjectToWrite0, UpdatedManifests, Bucket, Key),
riak_cs_utils:put(RiakcPid, ObjectToWrite)
Expand All @@ -298,7 +298,7 @@ get_and_update(RiakcPid, WrappedManifests, Bucket, Key) ->
%% operate on NewManiAdded and save it to Riak when it is
%% finished.
ObjectToWrite0 = riak_cs_utils:update_obj_value(
RiakObject, term_to_binary(NewManiAdded)),
RiakObject, riak_cs_utils:encode_term(NewManiAdded)),
ObjectToWrite = update_md_with_multipart_2i(
ObjectToWrite0, NewManiAdded, Bucket, Key),
{Result, NewRiakObject} =
Expand All @@ -314,7 +314,7 @@ get_and_update(RiakcPid, WrappedManifests, Bucket, Key) ->
{Result, NewRiakObject, Manifests};
{error, notfound} ->
ManifestBucket = riak_cs_utils:to_bucket_name(objects, Bucket),
ObjectToWrite0 = riakc_obj:new(ManifestBucket, Key, term_to_binary(WrappedManifests)),
ObjectToWrite0 = riakc_obj:new(ManifestBucket, Key, riak_cs_utils:encode_term(WrappedManifests)),
ObjectToWrite = update_md_with_multipart_2i(
ObjectToWrite0, WrappedManifests, Bucket, Key),
PutResult = riak_cs_utils:put(RiakcPid, ObjectToWrite),
Expand All @@ -331,7 +331,7 @@ update_from_previous_read(RiakcPid, RiakObject, Bucket, Key,
Resolved = riak_cs_manifest_resolution:resolve([PreviousManifests,
NewManifests]),
NewRiakObject0 = riak_cs_utils:update_obj_value(RiakObject,
term_to_binary(Resolved)),
riak_cs_utils:encode_term(Resolved)),
NewRiakObject = update_md_with_multipart_2i(NewRiakObject0, Resolved,
Bucket, Key),
%% TODO:
Expand Down
16 changes: 15 additions & 1 deletion src/riak_cs_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
cs_version/0,
delete_bucket/4,
delete_object/3,
encode_term/1,
from_bucket_name/1,
get_admin_creds/0,
get_buckets/1,
Expand Down Expand Up @@ -338,6 +339,19 @@ delete_object(Bucket, Key, RiakcPid) ->
Else
end.

-spec encode_term(term()) -> binary().
encode_term(Term) ->
case use_t2b_compression() of
true ->
term_to_binary(Term, [compressed]);
false ->
term_to_binary(Term)
end.

-spec use_t2b_compression() -> boolean().
use_t2b_compression() ->
get_env(riak_cs, compress_terms, ?COMPRESS_TERMS).

%% @private
maybe_gc_active_manifests({ok, RiakObject, Manifests}, Bucket, Key, StartTime, RiakcPid) ->
R = riak_cs_gc:gc_active_manifests(Manifests, RiakObject, Bucket, Key, RiakcPid),
Expand Down Expand Up @@ -751,7 +765,7 @@ riak_connection(Pool) ->
save_user(User, UserObj, RiakPid) ->
%% Metadata is currently never updated so if there
%% are siblings all copies should be the same
UpdUserObj = update_obj_value(UserObj, term_to_binary(User)),
UpdUserObj = update_obj_value(UserObj, riak_cs_utils:encode_term(User)),
riakc_pb_socket:put(RiakPid, UpdUserObj).

%% @doc Set the ACL for a bucket. Existing ACLs are only
Expand Down

0 comments on commit e651d17

Please sign in to comment.