Skip to content

Commit

Permalink
Implement stream based cross file system copy.
Browse files Browse the repository at this point in the history
This CL implements the FileStreamReader/Writer based cross file system copy
operation.

BUG=279278
TEST=Ran content_unittests and unit_tests

Review URL: https://codereview.chromium.org/23463048

git-svn-id: svn://svn.chromium.org/chrome/trunk/src@225378 0039d316-1c4b-4281-b951-d872f2087c98
  • Loading branch information
hidehiko@chromium.org committed Sep 26, 2013
1 parent 9afdaef commit 3fd8f7a
Show file tree
Hide file tree
Showing 4 changed files with 390 additions and 8 deletions.
254 changes: 248 additions & 6 deletions webkit/browser/fileapi/copy_or_move_operation_delegate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@

#include "base/bind.h"
#include "base/files/file_path.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
#include "webkit/browser/blob/file_stream_reader.h"
#include "webkit/browser/fileapi/copy_or_move_file_validator.h"
#include "webkit/browser/fileapi/file_stream_writer.h"
#include "webkit/browser/fileapi/file_system_context.h"
#include "webkit/browser/fileapi/file_system_operation_runner.h"
#include "webkit/browser/fileapi/file_system_url.h"
Expand Down Expand Up @@ -289,8 +293,230 @@ class SnapshotCopyOrMoveImpl
DISALLOW_COPY_AND_ASSIGN(SnapshotCopyOrMoveImpl);
};

// The size of buffer for StreamCopyHelper.
const int kReadBufferSize = 32768;

// To avoid too many progress callbacks, it should be called less
// frequently than 50ms.
const int kMinProgressCallbackInvocationSpanInMilliseconds = 50;

// Specifically for cross file system copy/move operation, this class uses
// stream reader and writer for copying. Validator is not supported, so if
// necessary SnapshotCopyOrMoveImpl should be used.
class StreamCopyOrMoveImpl
: public CopyOrMoveOperationDelegate::CopyOrMoveImpl {
public:
StreamCopyOrMoveImpl(
FileSystemOperationRunner* operation_runner,
CopyOrMoveOperationDelegate::OperationType operation_type,
const FileSystemURL& src_url,
const FileSystemURL& dest_url,
scoped_ptr<webkit_blob::FileStreamReader> reader,
scoped_ptr<FileStreamWriter> writer,
const FileSystemOperation::CopyFileProgressCallback&
file_progress_callback)
: operation_runner_(operation_runner),
operation_type_(operation_type),
src_url_(src_url),
dest_url_(dest_url),
reader_(reader.Pass()),
writer_(writer.Pass()),
file_progress_callback_(file_progress_callback),
weak_factory_(this) {
}

virtual void Run(
const CopyOrMoveOperationDelegate::StatusCallback& callback) OVERRIDE {
// Reader can be created even if the entry does not exist or the entry is
// a directory. To check errors before destination file creation,
// check metadata first.
operation_runner_->GetMetadata(
src_url_,
base::Bind(&StreamCopyOrMoveImpl::RunAfterGetMetadataForSource,
weak_factory_.GetWeakPtr(), callback));
}

private:
void RunAfterGetMetadataForSource(
const CopyOrMoveOperationDelegate::StatusCallback& callback,
base::PlatformFileError error,
const base::PlatformFileInfo& file_info) {
if (error != base::PLATFORM_FILE_OK) {
callback.Run(error);
return;
}
if (file_info.is_directory) {
// If not a directory, failed with appropriate error code.
callback.Run(base::PLATFORM_FILE_ERROR_NOT_A_FILE);
return;
}

// To use FileStreamWriter, we need to ensure the destination file exists.
operation_runner_->CreateFile(
dest_url_, false /* exclusive */,
base::Bind(&StreamCopyOrMoveImpl::RunAfterCreateFileForDestination,
weak_factory_.GetWeakPtr(), callback));
}

void RunAfterCreateFileForDestination(
const CopyOrMoveOperationDelegate::StatusCallback& callback,
base::PlatformFileError error) {
if (error != base::PLATFORM_FILE_OK) {
callback.Run(error);
return;
}

DCHECK(!copy_helper_);
copy_helper_.reset(
new CopyOrMoveOperationDelegate::StreamCopyHelper(
reader_.Pass(), writer_.Pass(),
kReadBufferSize,
file_progress_callback_,
base::TimeDelta::FromMilliseconds(
kMinProgressCallbackInvocationSpanInMilliseconds)));
copy_helper_->Run(
base::Bind(&StreamCopyOrMoveImpl::RunAfterStreamCopy,
weak_factory_.GetWeakPtr(), callback));
}

void RunAfterStreamCopy(
const CopyOrMoveOperationDelegate::StatusCallback& callback,
base::PlatformFileError error) {
if (error != base::PLATFORM_FILE_OK) {
callback.Run(error);
return;
}

if (operation_type_ == CopyOrMoveOperationDelegate::OPERATION_COPY) {
callback.Run(base::PLATFORM_FILE_OK);
return;
}

DCHECK_EQ(CopyOrMoveOperationDelegate::OPERATION_MOVE, operation_type_);

// Remove the source for finalizing move operation.
operation_runner_->Remove(
src_url_, false /* recursive */,
base::Bind(&StreamCopyOrMoveImpl::RunAfterRemoveForMove,
weak_factory_.GetWeakPtr(), callback));
}

void RunAfterRemoveForMove(
const CopyOrMoveOperationDelegate::StatusCallback& callback,
base::PlatformFileError error) {
if (error == base::PLATFORM_FILE_ERROR_NOT_FOUND)
error = base::PLATFORM_FILE_OK;
callback.Run(error);
}

FileSystemOperationRunner* operation_runner_;
CopyOrMoveOperationDelegate::OperationType operation_type_;
FileSystemURL src_url_;
FileSystemURL dest_url_;
scoped_ptr<webkit_blob::FileStreamReader> reader_;
scoped_ptr<FileStreamWriter> writer_;
FileSystemOperation::CopyFileProgressCallback file_progress_callback_;
scoped_ptr<CopyOrMoveOperationDelegate::StreamCopyHelper> copy_helper_;

base::WeakPtrFactory<StreamCopyOrMoveImpl> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(StreamCopyOrMoveImpl);
};

} // namespace

CopyOrMoveOperationDelegate::StreamCopyHelper::StreamCopyHelper(
scoped_ptr<webkit_blob::FileStreamReader> reader,
scoped_ptr<FileStreamWriter> writer,
int buffer_size,
const FileSystemOperation::CopyFileProgressCallback&
file_progress_callback,
const base::TimeDelta& min_progress_callback_invocation_span)
: reader_(reader.Pass()),
writer_(writer.Pass()),
file_progress_callback_(file_progress_callback),
io_buffer_(new net::IOBufferWithSize(buffer_size)),
num_copied_bytes_(0),
min_progress_callback_invocation_span_(
min_progress_callback_invocation_span),
weak_factory_(this) {
}

CopyOrMoveOperationDelegate::StreamCopyHelper::~StreamCopyHelper() {
}

void CopyOrMoveOperationDelegate::StreamCopyHelper::Run(
const StatusCallback& callback) {
file_progress_callback_.Run(0);
last_progress_callback_invocation_time_ = base::Time::Now();
Read(callback);
}

void CopyOrMoveOperationDelegate::StreamCopyHelper::Read(
const StatusCallback& callback) {
int result = reader_->Read(
io_buffer_.get(), io_buffer_->size(),
base::Bind(&StreamCopyHelper::DidRead,
weak_factory_.GetWeakPtr(), callback));
if (result != net::ERR_IO_PENDING)
DidRead(callback, result);
}

void CopyOrMoveOperationDelegate::StreamCopyHelper::DidRead(
const StatusCallback& callback, int result) {
if (result < 0) {
callback.Run(NetErrorToPlatformFileError(result));
return;
}

if (result == 0) {
// Here is the EOF.
callback.Run(base::PLATFORM_FILE_OK);
return;
}

Write(callback, new net::DrainableIOBuffer(io_buffer_.get(), result));
}

void CopyOrMoveOperationDelegate::StreamCopyHelper::Write(
const StatusCallback& callback,
scoped_refptr<net::DrainableIOBuffer> buffer) {
DCHECK_GT(buffer->BytesRemaining(), 0);

int result = writer_->Write(
buffer.get(), buffer->BytesRemaining(),
base::Bind(&StreamCopyHelper::DidWrite,
weak_factory_.GetWeakPtr(), callback, buffer));
if (result != net::ERR_IO_PENDING)
DidWrite(callback, buffer, result);
}

void CopyOrMoveOperationDelegate::StreamCopyHelper::DidWrite(
const StatusCallback& callback,
scoped_refptr<net::DrainableIOBuffer> buffer,
int result) {
if (result < 0) {
callback.Run(NetErrorToPlatformFileError(result));
return;
}

buffer->DidConsume(result);
num_copied_bytes_ += result;

// Check the elapsed time since last |file_progress_callback_| invocation.
base::Time now = base::Time::Now();
if (now - last_progress_callback_invocation_time_ >=
min_progress_callback_invocation_span_) {
file_progress_callback_.Run(num_copied_bytes_);
last_progress_callback_invocation_time_ = now;
}

if (buffer->BytesRemaining() > 0) {
Write(callback, buffer);
return;
}

Read(callback);
}

CopyOrMoveOperationDelegate::CopyOrMoveOperationDelegate(
FileSystemContext* file_system_context,
Expand Down Expand Up @@ -359,7 +585,6 @@ void CopyOrMoveOperationDelegate::ProcessFile(
weak_factory_.GetWeakPtr(), src_url));
} else {
// Cross filesystem case.
// TODO(hidehiko): Support stream based copy. crbug.com/279287.
base::PlatformFileError error = base::PLATFORM_FILE_ERROR_FAILED;
CopyOrMoveFileValidatorFactory* validator_factory =
file_system_context()->GetCopyOrMoveFileValidatorFactory(
Expand All @@ -369,11 +594,28 @@ void CopyOrMoveOperationDelegate::ProcessFile(
return;
}

impl = new SnapshotCopyOrMoveImpl(
operation_runner(), operation_type_, src_url, dest_url, option_,
validator_factory,
base::Bind(&CopyOrMoveOperationDelegate::OnCopyFileProgress,
weak_factory_.GetWeakPtr(), src_url));
if (!validator_factory) {
scoped_ptr<webkit_blob::FileStreamReader> reader =
file_system_context()->CreateFileStreamReader(
src_url, 0, base::Time());
scoped_ptr<FileStreamWriter> writer =
file_system_context()->CreateFileStreamWriter(dest_url, 0);
if (reader && writer) {
impl = new StreamCopyOrMoveImpl(
operation_runner(), operation_type_, src_url, dest_url,
reader.Pass(), writer.Pass(),
base::Bind(&CopyOrMoveOperationDelegate::OnCopyFileProgress,
weak_factory_.GetWeakPtr(), src_url));
}
}

if (!impl) {
impl = new SnapshotCopyOrMoveImpl(
operation_runner(), operation_type_, src_url, dest_url, option_,
validator_factory,
base::Bind(&CopyOrMoveOperationDelegate::OnCopyFileProgress,
weak_factory_.GetWeakPtr(), src_url));
}
}

// Register the running task.
Expand Down
45 changes: 45 additions & 0 deletions webkit/browser/fileapi/copy_or_move_operation_delegate.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,23 @@

#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/time/time.h"
#include "webkit/browser/fileapi/recursive_operation_delegate.h"

namespace net {
class DrainableIOBuffer;
class IOBufferWithSize;
}

namespace webkit_blob {
class FileStreamReader;
class ShareableFileReference;
}

namespace fileapi {

class CopyOrMoveFileValidator;
class FileStreamWriter;

// A delegate class for recursive copy or move operations.
class CopyOrMoveOperationDelegate
Expand All @@ -32,6 +40,43 @@ class CopyOrMoveOperationDelegate
OPERATION_MOVE
};

// Helper to copy a file by reader and writer streams.
// Export for testing.
class WEBKIT_STORAGE_BROWSER_EXPORT StreamCopyHelper {
public:
StreamCopyHelper(
scoped_ptr<webkit_blob::FileStreamReader> reader,
scoped_ptr<FileStreamWriter> writer,
int buffer_size,
const FileSystemOperation::CopyFileProgressCallback&
file_progress_callback,
const base::TimeDelta& min_progress_callback_invocation_span);
~StreamCopyHelper();

void Run(const StatusCallback& callback);

private:
// Reads the content from the |reader_|.
void Read(const StatusCallback& callback);
void DidRead(const StatusCallback& callback, int result);

// Writes the content in |buffer| to |writer_|.
void Write(const StatusCallback& callback,
scoped_refptr<net::DrainableIOBuffer> buffer);
void DidWrite(const StatusCallback& callback,
scoped_refptr<net::DrainableIOBuffer> buffer, int result);

scoped_ptr<webkit_blob::FileStreamReader> reader_;
scoped_ptr<FileStreamWriter> writer_;
FileSystemOperation::CopyFileProgressCallback file_progress_callback_;
scoped_refptr<net::IOBufferWithSize> io_buffer_;
int64 num_copied_bytes_;
base::Time last_progress_callback_invocation_time_;
base::TimeDelta min_progress_callback_invocation_span_;
base::WeakPtrFactory<StreamCopyHelper> weak_factory_;
DISALLOW_COPY_AND_ASSIGN(StreamCopyHelper);
};

CopyOrMoveOperationDelegate(
FileSystemContext* file_system_context,
const FileSystemURL& src_root,
Expand Down
Loading

0 comments on commit 3fd8f7a

Please sign in to comment.