Skip to content

Commit

Permalink
SimpleCache: move CRC computations on writes off I/O thread.
Browse files Browse the repository at this point in the history
This has two parts:
1) For stream 0 writes, which are kept in-memory, compute their
   checksum during ::Close.
2) For writes to other streams, have SimpleSynchronousEntry::WriteData
   compute it on the worker thread it uses for disk calls and pass it back
   to I/O thread with its results, much like the computation is done on
   read.

Change-Id: I02ce9e5993d6c8e743e45aaa66ac2caccd617bf8
Reviewed-on: https://chromium-review.googlesource.com/902262
Commit-Queue: Maks Orlovich <morlovich@chromium.org>
Reviewed-by: Julia Tuttle <juliatuttle@chromium.org>
Cr-Commit-Position: refs/heads/master@{#536437}
  • Loading branch information
Maks Orlovich authored and Commit Bot committed Feb 13, 2018
1 parent c4b0575 commit 9d0e8ed
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 66 deletions.
42 changes: 42 additions & 0 deletions net/disk_cache/entry_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4841,6 +4841,48 @@ TEST_F(DiskCacheEntryTest, SimpleCacheLazyStream2CreateFailure) {
entry->Close();
}

TEST_F(DiskCacheEntryTest, SimpleCacheChecksumpScrewUp) {
// Test for a bug that occurred during development of movement of CRC
// computation off I/O thread.
const int kSize = 10;
scoped_refptr<net::IOBuffer> buffer(new net::IOBuffer(kSize));
CacheTestFillBuffer(buffer->data(), kSize, false);

const int kDoubleSize = kSize * 2;
scoped_refptr<net::IOBuffer> big_buffer(new net::IOBuffer(kDoubleSize));
CacheTestFillBuffer(big_buffer->data(), kDoubleSize, false);

SetSimpleCacheMode();
InitCache();

const char kKey[] = "a key";
disk_cache::Entry* entry = nullptr;
ASSERT_THAT(CreateEntry(kKey, &entry), IsOk());

// Write out big_buffer for the double range. Checksum will be set to this.
ASSERT_EQ(kDoubleSize,
WriteData(entry, 1, 0, big_buffer.get(), kDoubleSize, false));

// Reset remembered position to 0 by writing at an earlier non-zero offset.
ASSERT_EQ(1, WriteData(entry, /* stream = */ 1, /* offset = */ 1,
big_buffer.get(), /* len = */ 1, false));

// Now write out the half-range twice. An intermediate revision would
// incorrectly compute checksum as if payload was buffer followed by buffer
// rather than buffer followed by end of big_buffer.
ASSERT_EQ(kSize, WriteData(entry, 1, 0, buffer.get(), kSize, false));
ASSERT_EQ(kSize, WriteData(entry, 1, 0, buffer.get(), kSize, false));
entry->Close();

ASSERT_THAT(OpenEntry(kKey, &entry), IsOk());
scoped_refptr<net::IOBuffer> buffer2(new net::IOBuffer(kSize));
EXPECT_EQ(kSize, ReadData(entry, 1, 0, buffer2.get(), kSize));
EXPECT_EQ(0, memcmp(buffer->data(), buffer2->data(), kSize));
EXPECT_EQ(kSize, ReadData(entry, 1, kSize, buffer2.get(), kSize));
EXPECT_EQ(0, memcmp(big_buffer->data() + kSize, buffer2->data(), kSize));
entry->Close();
}

// TODO(morlovich): There seems to be an imperfection of leak detection, see
// https://crbug.com/811276. Please reenable this test when the bug is fixed.
TEST_F(DiskCacheEntryTest, DISABLED_UseAfterBackendDestruction) {
Expand Down
78 changes: 39 additions & 39 deletions net/disk_cache/simple/simple_entry_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,19 @@ void SimpleEntryImpl::WriteDataInternal(int stream_index,
if (stream_index == 1)
stream_1_prefetch_data_ = nullptr;

AdvanceCrc(buf, offset, buf_len, stream_index);
bool request_update_crc = false;
uint32_t initial_crc = 0;

if (offset < crc32s_end_offset_[stream_index]) {
// If a range for which the crc32 was already computed is rewritten, the
// computation of the crc32 need to start from 0 again.
crc32s_end_offset_[stream_index] = 0;
}

if (crc32s_end_offset_[stream_index] == offset) {
request_update_crc = true;
initial_crc = (offset != 0) ? crc32s_[stream_index] : crc32(0, Z_NULL, 0);
}

// |entry_stat| needs to be initialized before modifying |data_size_|.
std::unique_ptr<SimpleEntryStat> entry_stat(new SimpleEntryStat(
Expand All @@ -1030,6 +1042,9 @@ void SimpleEntryImpl::WriteDataInternal(int stream_index,
GetDataSize(stream_index));
}

std::unique_ptr<SimpleSynchronousEntry::WriteResult> write_result =
std::make_unique<SimpleSynchronousEntry::WriteResult>();

// Since we don't know the correct values for |last_used_| and
// |last_modified_| yet, we make this approximation.
last_used_ = last_modified_ = base::Time::Now();
Expand All @@ -1040,8 +1055,6 @@ void SimpleEntryImpl::WriteDataInternal(int stream_index,
if (stream_index == 1)
have_written_[0] = true;

std::unique_ptr<int> result(new int());

// Retain a reference to |buf| in |reply| instead of |task|, so that we can
// reduce cross thread malloc/free pairs. The cross thread malloc/free pair
// increases the apparent memory usage due to the thread cached free list.
Expand All @@ -1050,12 +1063,14 @@ void SimpleEntryImpl::WriteDataInternal(int stream_index,
// just work.
Closure task = base::Bind(
&SimpleSynchronousEntry::WriteData, base::Unretained(synchronous_entry_),
SimpleSynchronousEntry::WriteRequest(stream_index, offset, buf_len,
truncate, doom_state_ != DOOM_NONE),
base::Unretained(buf), entry_stat.get(), result.get());
Closure reply = base::Bind(&SimpleEntryImpl::WriteOperationComplete, this,
stream_index, callback, base::Passed(&entry_stat),
base::Passed(&result), base::RetainedRef(buf));
SimpleSynchronousEntry::WriteRequest(
stream_index, offset, buf_len, initial_crc, truncate,
doom_state_ != DOOM_NONE, request_update_crc),
base::Unretained(buf), entry_stat.get(), write_result.get());
Closure reply =
base::Bind(&SimpleEntryImpl::WriteOperationComplete, this, stream_index,
callback, base::Passed(&entry_stat),
base::Passed(&write_result), base::RetainedRef(buf));
worker_pool_->PostTaskAndReply(FROM_HERE, task, reply);
}

Expand Down Expand Up @@ -1403,22 +1418,27 @@ void SimpleEntryImpl::WriteOperationComplete(
int stream_index,
const CompletionCallback& completion_callback,
std::unique_ptr<SimpleEntryStat> entry_stat,
std::unique_ptr<int> result,
std::unique_ptr<SimpleSynchronousEntry::WriteResult> write_result,
net::IOBuffer* buf) {
if (*result >= 0)
int result = write_result->result;
if (result >= 0)
RecordWriteResult(cache_type_, WRITE_RESULT_SUCCESS);
else
RecordWriteResult(cache_type_, WRITE_RESULT_SYNC_WRITE_FAILURE);
if (net_log_.IsCapturing()) {
net_log_.AddEvent(net::NetLogEventType::SIMPLE_CACHE_ENTRY_WRITE_END,
CreateNetLogReadWriteCompleteCallback(*result));
CreateNetLogReadWriteCompleteCallback(result));
}

if (*result < 0) {
if (result < 0)
crc32s_end_offset_[stream_index] = 0;

if (result > 0 && write_result->crc_updated) {
crc32s_end_offset_[stream_index] += result;
crc32s_[stream_index] = write_result->updated_crc32;
}

EntryOperationComplete(completion_callback, *entry_stat, *result);
EntryOperationComplete(completion_callback, *entry_stat, result);
}

void SimpleEntryImpl::ReadSparseOperationComplete(
Expand Down Expand Up @@ -1652,36 +1672,16 @@ int SimpleEntryImpl::SetStream0Data(net::IOBuffer* buf,
data_size_[0] = buffer_size;
}
base::Time modification_time = base::Time::Now();
AdvanceCrc(buf, offset, buf_len, 0);

// Reset checksum; SimpleSynchronousEntry::Close will compute it for us,
// and do it off the I/O thread.
crc32s_end_offset_[0] = 0;

UpdateDataFromEntryStat(
SimpleEntryStat(modification_time, modification_time, data_size_,
sparse_data_size_));
RecordWriteResult(cache_type_, WRITE_RESULT_SUCCESS);
return buf_len;
}

void SimpleEntryImpl::AdvanceCrc(net::IOBuffer* buffer,
int offset,
int length,
int stream_index) {
// It is easy to incrementally compute the CRC from [0 .. |offset + buf_len|)
// if |offset == 0| or we have already computed the CRC for [0 .. offset).
// We rely on most write operations being sequential, start to end to compute
// the crc of the data. When we write to an entry and close without having
// done a sequential write, we don't check the CRC on read.
if (offset == 0 || crc32s_end_offset_[stream_index] == offset) {
uint32_t initial_crc =
(offset != 0) ? crc32s_[stream_index] : crc32(0, Z_NULL, 0);
if (length > 0) {
crc32s_[stream_index] =
simple_util::IncrementalCrc32(initial_crc, buffer->data(), length);
}
crc32s_end_offset_[stream_index] = offset + length;
} else if (offset < crc32s_end_offset_[stream_index]) {
// If a range for which the crc32 was already computed is rewritten, the
// computation of the crc32 need to start from 0 again.
crc32s_end_offset_[stream_index] = 0;
}
}

} // namespace disk_cache
18 changes: 6 additions & 12 deletions net/disk_cache/simple/simple_entry_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,12 @@ class NET_EXPORT_PRIVATE SimpleEntryImpl : public Entry,
// |buf| parameter brings back a reference to net::IOBuffer to the original
// thread, so that we can reduce cross thread malloc/free pair.
// See http://crbug.com/708644 for details.
void WriteOperationComplete(int stream_index,
const CompletionCallback& completion_callback,
std::unique_ptr<SimpleEntryStat> entry_stat,
std::unique_ptr<int> result,
net::IOBuffer* buf);
void WriteOperationComplete(
int stream_index,
const CompletionCallback& completion_callback,
std::unique_ptr<SimpleEntryStat> entry_stat,
std::unique_ptr<SimpleSynchronousEntry::WriteResult> result,
net::IOBuffer* buf);

void ReadSparseOperationComplete(
const CompletionCallback& completion_callback,
Expand Down Expand Up @@ -343,13 +344,6 @@ class NET_EXPORT_PRIVATE SimpleEntryImpl : public Entry,
int offset, int buf_len,
bool truncate);

// Updates |crc32s_| and |crc32s_end_offset_| for a write of the data in
// |buffer| on |stream_index|, starting at |offset| and of length |length|.
void AdvanceCrc(net::IOBuffer* buffer,
int offset,
int length,
int stream_index);

// We want all async I/O on entries to complete before recycling the dir.
scoped_refptr<BackendCleanupTracker> cleanup_tracker_;

Expand Down
45 changes: 32 additions & 13 deletions net/disk_cache/simple/simple_synchronous_entry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,17 @@ SimpleSynchronousEntry::ReadRequest::ReadRequest(int index_p,
SimpleSynchronousEntry::WriteRequest::WriteRequest(int index_p,
int offset_p,
int buf_len_p,
uint32_t previous_crc32_p,
bool truncate_p,
bool doomed_p)
bool doomed_p,
bool request_update_crc_p)
: index(index_p),
offset(offset_p),
buf_len(buf_len_p),
previous_crc32(previous_crc32_p),
truncate(truncate_p),
doomed(doomed_p) {}
doomed(doomed_p),
request_update_crc(request_update_crc_p) {}

SimpleSynchronousEntry::SparseRequest::SparseRequest(int64_t sparse_offset_p,
int buf_len_p)
Expand Down Expand Up @@ -429,7 +433,7 @@ void SimpleSynchronousEntry::ReadData(const ReadRequest& in_entry_op,
void SimpleSynchronousEntry::WriteData(const WriteRequest& in_entry_op,
net::IOBuffer* in_buf,
SimpleEntryStat* out_entry_stat,
int* out_result) {
WriteResult* out_write_result) {
base::ElapsedTimer write_time;
DCHECK(initialized_);
DCHECK_NE(0, in_entry_op.index);
Expand All @@ -440,7 +444,7 @@ void SimpleSynchronousEntry::WriteData(const WriteRequest& in_entry_op,
SimpleFileTracker::FileHandle file =
file_tracker_->Acquire(this, SubFileForFileIndex(file_index));
if (!file.IsOK() || !CheckHeaderAndKey(file.get(), file_index)) {
*out_result = net::ERR_FAILED;
out_write_result->result = net::ERR_FAILED;
Doom();
return;
}
Expand All @@ -461,21 +465,21 @@ void SimpleSynchronousEntry::WriteData(const WriteRequest& in_entry_op,
<< in_entry_op.index << " of doomed cache entry.";
RecordWriteResult(cache_type_,
SYNC_WRITE_RESULT_LAZY_STREAM_ENTRY_DOOMED);
*out_result = net::ERR_CACHE_WRITE_FAILURE;
out_write_result->result = net::ERR_CACHE_WRITE_FAILURE;
return;
}
File::Error error;
if (!MaybeCreateFile(file_index, FILE_REQUIRED, &error)) {
RecordWriteResult(cache_type_, SYNC_WRITE_RESULT_LAZY_CREATE_FAILURE);
Doom();
*out_result = net::ERR_CACHE_WRITE_FAILURE;
out_write_result->result = net::ERR_CACHE_WRITE_FAILURE;
return;
}
CreateEntryResult result;
if (!InitializeCreatedFile(file_index, &result)) {
RecordWriteResult(cache_type_, SYNC_WRITE_RESULT_LAZY_INITIALIZE_FAILURE);
Doom();
*out_result = net::ERR_CACHE_WRITE_FAILURE;
out_write_result->result = net::ERR_CACHE_WRITE_FAILURE;
return;
}
}
Expand All @@ -486,7 +490,7 @@ void SimpleSynchronousEntry::WriteData(const WriteRequest& in_entry_op,
SimpleFileTracker::FileHandle file =
file_tracker_->Acquire(this, SubFileForFileIndex(file_index));
if (!file.IsOK()) {
*out_result = net::ERR_FAILED;
out_write_result->result = net::ERR_FAILED;
Doom();
return;
}
Expand All @@ -498,15 +502,15 @@ void SimpleSynchronousEntry::WriteData(const WriteRequest& in_entry_op,
if (!file->SetLength(file_eof_offset)) {
RecordWriteResult(cache_type_, SYNC_WRITE_RESULT_PRETRUNCATE_FAILURE);
Doom();
*out_result = net::ERR_CACHE_WRITE_FAILURE;
out_write_result->result = net::ERR_CACHE_WRITE_FAILURE;
return;
}
}
if (buf_len > 0) {
if (file->Write(file_offset, in_buf->data(), buf_len) != buf_len) {
RecordWriteResult(cache_type_, SYNC_WRITE_RESULT_WRITE_FAILURE);
Doom();
*out_result = net::ERR_CACHE_WRITE_FAILURE;
out_write_result->result = net::ERR_CACHE_WRITE_FAILURE;
return;
}
}
Expand All @@ -520,18 +524,24 @@ void SimpleSynchronousEntry::WriteData(const WriteRequest& in_entry_op,
if (!file->SetLength(file_eof_offset)) {
RecordWriteResult(cache_type_, SYNC_WRITE_RESULT_TRUNCATE_FAILURE);
Doom();
*out_result = net::ERR_CACHE_WRITE_FAILURE;
out_write_result->result = net::ERR_CACHE_WRITE_FAILURE;
return;
}
}

if (in_entry_op.request_update_crc && buf_len > 0) {
out_write_result->updated_crc32 = simple_util::IncrementalCrc32(
in_entry_op.previous_crc32, in_buf->data(), buf_len);
out_write_result->crc_updated = true;
}

SIMPLE_CACHE_UMA(TIMES, "DiskWriteLatency", cache_type_,
write_time.Elapsed());
RecordWriteResult(cache_type_, SYNC_WRITE_RESULT_SUCCESS);
base::Time modification_time = Time::Now();
out_entry_stat->set_last_used(modification_time);
out_entry_stat->set_last_modified(modification_time);
*out_result = buf_len;
out_write_result->result = buf_len;
}

void SimpleSynchronousEntry::ReadSparseData(const SparseRequest& in_entry_op,
Expand Down Expand Up @@ -827,7 +837,7 @@ void SimpleSynchronousEntry::Close(
base::ElapsedTimer close_time;
DCHECK(stream_0_data);

for (std::vector<CRCRecord>::const_iterator it = crc32s_to_write->begin();
for (std::vector<CRCRecord>::iterator it = crc32s_to_write->begin();
it != crc32s_to_write->end(); ++it) {
const int stream_index = it->index;
const int file_index = GetFileIndexFromStreamIndex(stream_index);
Expand Down Expand Up @@ -860,6 +870,15 @@ void SimpleSynchronousEntry::Close(
DVLOG(1) << "Could not write stream 0 data.";
Doom();
}

// Re-compute stream 0 CRC if the data got changed (we may be here even
// if it didn't change if stream 0's position on disk got changed due to
// stream 1 write).
if (!it->has_crc32) {
it->data_crc32 =
simple_util::Crc32(stream_0_data->data(), entry_stat.data_size(0));
it->has_crc32 = true;
}
}

SimpleFileEOF eof_record;
Expand Down
15 changes: 13 additions & 2 deletions net/disk_cache/simple/simple_synchronous_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,24 @@ class SimpleSynchronousEntry {
WriteRequest(int index_p,
int offset_p,
int buf_len_p,
uint32_t previous_crc32_p,
bool truncate_p,
bool doomed_p);
bool doomed_p,
bool request_update_crc_p);
int index;
int offset;
int buf_len;
uint32_t previous_crc32;
bool truncate;
bool doomed;
bool request_update_crc;
};

struct WriteResult {
WriteResult() : crc_updated(false) {}
int result;
uint32_t updated_crc32; // only relevant if crc_updated set
bool crc_updated;
};

struct SparseRequest {
Expand Down Expand Up @@ -225,7 +236,7 @@ class SimpleSynchronousEntry {
void WriteData(const WriteRequest& in_entry_op,
net::IOBuffer* in_buf,
SimpleEntryStat* out_entry_stat,
int* out_result);
WriteResult* out_write_result);
int CheckEOFRecord(base::File* file,
int stream_index,
const SimpleEntryStat& entry_stat,
Expand Down

0 comments on commit 9d0e8ed

Please sign in to comment.