Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make chunk size configurable #75

Merged
merged 1 commit into from
May 26, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ public String params() {
}

public static void printUsage() {
StringBuilder actionAndParams = new StringBuilder("");
StringBuilder actionAndParams = new StringBuilder();
for (Map.Entry<String, StorageAction> entry : ACTIONS.entrySet()) {
actionAndParams.append("\n\t").append(entry.getKey());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,11 @@ public interface BlobReadChannel extends ReadableByteChannel, Serializable, Clos
void close();

void seek(int position) throws IOException;

/**
* Sets the minimum size that will be read by a single RPC.
* Read data will be locally buffered until consumed.
*/
void chunkSize(int chunkSize);

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.


}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
*/
class BlobReadChannelImpl implements BlobReadChannel {

private static final int MIN_BUFFER_SIZE = 2 * 1024 * 1024;
private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;
private static final long serialVersionUID = 4821762590742862669L;

private final StorageServiceOptions serviceOptions;
Expand All @@ -42,6 +42,7 @@ class BlobReadChannelImpl implements BlobReadChannel {
private int position;
private boolean isOpen;
private boolean endOfStream;
private int chunkSize = DEFAULT_CHUNK_SIZE;

private transient StorageRpc storageRpc;
private transient StorageObject storageObject;
Expand Down Expand Up @@ -105,14 +106,19 @@ public void seek(int position) throws IOException {
endOfStream = false;
}

@Override
public void chunkSize(int chunkSize) {
this.chunkSize = chunkSize <= 0 ? DEFAULT_CHUNK_SIZE : chunkSize;
}

@Override
public int read(ByteBuffer byteBuffer) throws IOException {
validateOpen();
if (buffer == null) {
if (endOfStream) {
return -1;
}
final int toRead = Math.max(byteBuffer.remaining(), MIN_BUFFER_SIZE);
final int toRead = Math.max(byteBuffer.remaining(), chunkSize);
buffer = runWithRetries(new Callable<byte[]>() {
@Override
public byte[] call() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,9 @@
*/
public interface BlobWriteChannel extends WritableByteChannel, Serializable, Closeable {

/**
* Sets the minimum size that will be written by a single RPC.
* Written data will be buffered and only flushed upon reaching this size or closing the channel.
*/
void chunkSize(int chunkSize);

This comment was marked as spam.

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
class BlobWriterChannelImpl implements BlobWriteChannel {

private static final long serialVersionUID = 8675286882724938737L;
private static final int CHUNK_SIZE = 256 * 1024;
private static final int MIN_BUFFER_SIZE = 8 * CHUNK_SIZE;
private static final int MIN_CHUNK_SIZE = 256 * 1024;
private static final int DEFAULT_CHUNK_SIZE = 8 * MIN_CHUNK_SIZE;

private final StorageServiceOptions options;
private final Blob blob;
Expand All @@ -45,6 +45,7 @@ class BlobWriterChannelImpl implements BlobWriteChannel {
private byte[] buffer = new byte[0];
private int limit;
private boolean isOpen = true;
private int chunkSize = DEFAULT_CHUNK_SIZE;

private transient StorageRpc storageRpc;
private transient StorageObject storageObject;
Expand All @@ -65,8 +66,8 @@ private void writeObject(ObjectOutputStream out) throws IOException {
}

private void flush(boolean compact) {
if (limit >= MIN_BUFFER_SIZE || compact && limit >= CHUNK_SIZE) {
final int length = limit - limit % CHUNK_SIZE;
if (limit >= chunkSize || compact && limit >= MIN_CHUNK_SIZE) {
final int length = limit - limit % MIN_CHUNK_SIZE;
runWithRetries(callable(new Runnable() {
@Override
public void run() {
Expand All @@ -75,7 +76,7 @@ public void run() {
}), options.retryParams(), StorageServiceImpl.EXCEPTION_HANDLER);
position += length;
limit -= length;
byte[] temp = new byte[compact ? limit : MIN_BUFFER_SIZE];
byte[] temp = new byte[compact ? limit : chunkSize];
System.arraycopy(buffer, length, temp, 0, limit);
buffer = temp;
}
Expand Down Expand Up @@ -107,8 +108,7 @@ public int write(ByteBuffer byteBuffer) throws IOException {
if (spaceInBuffer >= toWrite) {
byteBuffer.get(buffer, limit, toWrite);
} else {
buffer = Arrays.copyOf(buffer,
Math.max(MIN_BUFFER_SIZE, buffer.length + toWrite - spaceInBuffer));
buffer = Arrays.copyOf(buffer, Math.max(chunkSize, buffer.length + toWrite - spaceInBuffer));
byteBuffer.get(buffer, limit, toWrite);
}
limit += toWrite;
Expand All @@ -135,4 +135,10 @@ public void run() {
buffer = null;
}
}

@Override
public void chunkSize(int chunkSize) {
chunkSize = (chunkSize / MIN_CHUNK_SIZE) * MIN_CHUNK_SIZE;
this.chunkSize = Math.max(MIN_CHUNK_SIZE, chunkSize);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ public static BlobTargetOption predefinedAcl(PredefinedAcl acl) {
return new BlobTargetOption(StorageRpc.Option.PREDEFINED_ACL, acl.entry());
}

public static BlobTargetOption doesNotExists() {
return new BlobTargetOption(StorageRpc.Option.IF_GENERATION_MATCH, 0);
public static BlobTargetOption doesNotExist() {
return new BlobTargetOption(StorageRpc.Option.IF_GENERATION_MATCH, 0L);
}

public static BlobTargetOption generationMatch() {
Expand Down