Skip to content

Commit

Permalink
SyncPoint to allow a callback with an argument and use it to get DBTe…
Browse files Browse the repository at this point in the history
…st.DynamicLevelCompressionPerLevel2 more straight-forward

Summary:
Allow users to give a callback function with parameter using sync point, so more complicated verification can be done in tests.
Use it in DBTest.DynamicLevelCompressionPerLevel2 so that failures will be more easy to debug.

Test Plan: Run all tests. Run DBTest.DynamicLevelCompressionPerLevel2 with valgrind check.

Reviewers: rven, yhchiang, anthony, kradhakrishnan, igor

Reviewed By: igor

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D36999
  • Loading branch information
siying committed Apr 14, 2015
1 parent 281db8b commit fcb206b
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 64 deletions.
3 changes: 3 additions & 0 deletions db/compaction_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "util/log_buffer.h"
#include "util/statistics.h"
#include "util/string_util.h"
#include "util/sync_point.h"

namespace rocksdb {

Expand Down Expand Up @@ -768,6 +769,8 @@ Compaction* LevelCompactionPicker::PickCompaction(
dummy_compaction_options_fifo);
}

TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c);

return c;
}

Expand Down
80 changes: 39 additions & 41 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4220,7 +4220,7 @@ TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) {
std::atomic<int> num_compactions_running(0);
std::atomic<bool> has_parallel(false);
rocksdb::SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Start",
[&]() {
[&](void* arg) {
if (num_compactions_running.fetch_add(1) > 0) {
has_parallel.store(true);
return;
Expand All @@ -4235,7 +4235,7 @@ TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) {
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():End",
[&]() { num_compactions_running.fetch_add(-1); });
[&](void* arg) { num_compactions_running.fetch_add(-1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();

options = CurrentOptions(options);
Expand Down Expand Up @@ -10379,7 +10379,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
std::atomic<int> sleep_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:TimedWait",
[&]() { sleep_count.fetch_add(1); });
[&](void* arg) { sleep_count.fetch_add(1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();

while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 256) {
Expand Down Expand Up @@ -11018,7 +11018,7 @@ TEST_F(DBTest, DynamicLevelMaxBytesBase2) {
// Hold compaction jobs to make sure
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::Run():Start",
[&]() { env_->SleepForMicroseconds(100000); });
[&](void* arg) { env_->SleepForMicroseconds(100000); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(dbfull()->SetOptions({
{"disable_auto_compactions", "true"},
Expand Down Expand Up @@ -11242,57 +11242,54 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {

DestroyAndReopen(options);
// When base level is L4, L4 is LZ4.
std::atomic<bool> seen_lz4(false);
std::function<void(const CompressionType&, uint64_t)> cb1 =
[&](const CompressionType& ct, uint64_t size) {
ASSERT_TRUE(size <= 30 || ct == kLZ4Compression);
if (ct == kLZ4Compression) {
seen_lz4.store(true);
}
};
mock::MockTableBuilder::finish_cb_ = &cb1;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
if (compaction->output_level() == 4) {
ASSERT_TRUE(compaction->OutputCompressionType() == kLZ4Compression);
}
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();

for (int i = 0; i < 100; i++) {
ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
}
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(seen_lz4.load());
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();

ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 0);
ASSERT_EQ(NumTableFilesAtLevel(3), 0);
ASSERT_GT(NumTableFilesAtLevel(4), 0);
int prev_num_files_l4 = NumTableFilesAtLevel(4);

// After base level turn L4->L3, L3 becomes LZ4 and L4 becomes Zlib
std::atomic<bool> seen_zlib(false);
std::function<void(const CompressionType&, uint64_t)> cb2 =
[&](const CompressionType& ct, uint64_t size) {
ASSERT_TRUE(size <= 30 || ct != kNoCompression);
if (ct == kZlibCompression) {
if (!seen_zlib.load()) {
seen_lz4.store(false);
}
seen_zlib.store(true);
}
// Make sure after making L4 the base level, L4 is LZ4.
if (seen_zlib.load()) {
if (ct == kLZ4Compression && size < 80) {
seen_lz4.store(true);
}
}
};
mock::MockTableBuilder::finish_cb_ = &cb2;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
if (compaction->output_level() == 4 && compaction->start_level() == 3) {
ASSERT_TRUE(compaction->OutputCompressionType() == kZlibCompression);
} else {
ASSERT_TRUE(compaction->OutputCompressionType() == kLZ4Compression);
}
});

for (int i = 101; i < 500; i++) {
ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
if (i % 100 == 99) {
Flush();
dbfull()->TEST_WaitForCompact();
}
}
ASSERT_TRUE(seen_lz4.load());
ASSERT_TRUE(seen_zlib.load());

rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
ASSERT_EQ(NumTableFilesAtLevel(2), 0);
mock::MockTableBuilder::finish_cb_ = nullptr;
ASSERT_GT(NumTableFilesAtLevel(3), 0);
ASSERT_GT(NumTableFilesAtLevel(4), prev_num_files_l4);
}

TEST_F(DBTest, DynamicCompactionOptions) {
Expand Down Expand Up @@ -11541,8 +11538,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {

std::atomic<int> sleep_count(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DelayWrite:Sleep",
[&]() { sleep_count.fetch_add(1); });
"DBImpl::DelayWrite:Sleep", [&](void* arg) { sleep_count.fetch_add(1); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();

// Hard rate limit slow down for 1000 us, so default 10ms should be ok
Expand Down Expand Up @@ -12371,14 +12367,16 @@ TEST_F(DBTest, CompressLevelCompaction) {

rocksdb::SyncPoint::GetInstance()->SetCallBack(
"Compaction::InputCompressionMatchesOutput:Matches",
[&]() { matches++; });
[&](void* arg) { matches++; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"Compaction::InputCompressionMatchesOutput:DidntMatch",
[&]() { didnt_match++; });
[&](void* arg) { didnt_match++; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial", [&]() { non_trivial++; });
"DBImpl::BackgroundCompaction:NonTrivial",
[&](void* arg) { non_trivial++; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:TrivialMove", [&]() { trivial_move++; });
"DBImpl::BackgroundCompaction:TrivialMove",
[&](void* arg) { trivial_move++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();

Reopen(options);
Expand Down
6 changes: 1 addition & 5 deletions table/mock_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,12 @@ Status MockTableFactory::NewTableReader(
return Status::OK();
}

std::function<void(const CompressionType&, uint64_t)>*
MockTableBuilder::finish_cb_ = nullptr;

TableBuilder* MockTableFactory::NewTableBuilder(
const TableBuilderOptions& table_builder_options,
WritableFile* file) const {
uint32_t id = GetAndWriteNextID(file);

return new MockTableBuilder(id, &file_system_,
table_builder_options.compression_type);
return new MockTableBuilder(id, &file_system_);
}

Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname,
Expand Down
13 changes: 2 additions & 11 deletions table/mock_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,8 @@ class MockTableIterator : public Iterator {

class MockTableBuilder : public TableBuilder {
public:
MockTableBuilder(uint32_t id, MockTableFileSystem* file_system,
CompressionType compression_type)
: id_(id),
file_system_(file_system),
compression_type_(compression_type) {}
MockTableBuilder(uint32_t id, MockTableFileSystem* file_system)
: id_(id), file_system_(file_system) {}

// REQUIRES: Either Finish() or Abandon() has been called.
~MockTableBuilder() {}
Expand All @@ -117,9 +114,6 @@ class MockTableBuilder : public TableBuilder {
Status status() const override { return Status::OK(); }

Status Finish() override {
if (finish_cb_ != nullptr) {
(*finish_cb_)(compression_type_, FileSize());
}
MutexLock lock_guard(&file_system_->mutex);
file_system_->files.insert({id_, table_});
return Status::OK();
Expand All @@ -131,13 +125,10 @@ class MockTableBuilder : public TableBuilder {

uint64_t FileSize() const override { return table_.size(); }

static std::function<void(const CompressionType&, uint64_t)>* finish_cb_;

private:
uint32_t id_;
MockTableFileSystem* file_system_;
MockFileContents table_;
CompressionType compression_type_;
};

class MockTableFactory : public TableFactory {
Expand Down
7 changes: 3 additions & 4 deletions util/sync_point.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ bool SyncPoint::PredecessorsAllCleared(const std::string& point) {
}

void SyncPoint::SetCallBack(const std::string point,
std::function<void()> callback) {
std::function<void(void*)> callback) {
std::unique_lock<std::mutex> lock(mutex_);
callbacks_[point] = callback;
}
Expand Down Expand Up @@ -63,7 +63,7 @@ void SyncPoint::ClearTrace() {
cleared_points_.clear();
}

void SyncPoint::Process(const std::string& point) {
void SyncPoint::Process(const std::string& point, void* cb_arg) {
std::unique_lock<std::mutex> lock(mutex_);

if (!enabled_) return;
Expand All @@ -72,7 +72,7 @@ void SyncPoint::Process(const std::string& point) {
if (callback_pair != callbacks_.end()) {
num_callbacks_running_++;
mutex_.unlock();
callback_pair->second();
callback_pair->second(cb_arg);
mutex_.lock();
num_callbacks_running_--;
cv_.notify_all();
Expand All @@ -85,6 +85,5 @@ void SyncPoint::Process(const std::string& point) {
cleared_points_.insert(point);
cv_.notify_all();
}

} // namespace rocksdb
#endif // NDEBUG
11 changes: 8 additions & 3 deletions util/sync_point.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#ifdef NDEBUG
#define TEST_SYNC_POINT(x)
#define TEST_SYNC_POINT_CALLBACK(x, y)
#else

namespace rocksdb {
Expand All @@ -39,7 +40,8 @@ class SyncPoint {
void LoadDependency(const std::vector<Dependency>& dependencies);

// Set up a call back function in sync point.
void SetCallBack(const std::string point, std::function<void()> callback);
void SetCallBack(const std::string point,
std::function<void(void*)> callback);
// Clear all call back functions.
void ClearAllCallBacks();

Expand All @@ -54,7 +56,8 @@ class SyncPoint {

// triggered by TEST_SYNC_POINT, blocking execution until all predecessors
// are executed.
void Process(const std::string& point);
// And/or call registered callback functionn, with argument `cb_arg`
void Process(const std::string& point, void* cb_arg = nullptr);

// TODO: it might be useful to provide a function that blocks until all
// sync points are cleared.
Expand All @@ -65,7 +68,7 @@ class SyncPoint {
// successor/predecessor map loaded from LoadDependency
std::unordered_map<std::string, std::vector<std::string>> successors_;
std::unordered_map<std::string, std::vector<std::string>> predecessors_;
std::unordered_map<std::string, std::function<void()> > callbacks_;
std::unordered_map<std::string, std::function<void(void*)> > callbacks_;

std::mutex mutex_;
std::condition_variable cv_;
Expand All @@ -84,4 +87,6 @@ class SyncPoint {
// See TransactionLogIteratorRace in db_test.cc for an example use case.
// TEST_SYNC_POINT is no op in release build.
#define TEST_SYNC_POINT(x) rocksdb::SyncPoint::GetInstance()->Process(x)
#define TEST_SYNC_POINT_CALLBACK(x, y) \
rocksdb::SyncPoint::GetInstance()->Process(x, y)
#endif // NDEBUG

0 comments on commit fcb206b

Please sign in to comment.