From 672dda9b3b76e6390d0239bff59aa4aa88fb19cc Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Thu, 28 May 2015 13:21:39 -0700 Subject: [PATCH] [API Change] Move listeners from ColumnFamilyOptions to DBOptions Summary: Move listeners from ColumnFamilyOptions to DBOptions Test Plan: listener_test compact_files_test Reviewers: rven, anthony, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D39087 --- HISTORY.md | 1 + db/column_family.cc | 45 --------------------------------------- db/column_family.h | 7 ------ db/db_impl.cc | 44 +++++++++++++++++++++++++++++++------- include/rocksdb/options.h | 12 +++++------ util/options.cc | 22 +++++++------------ 6 files changed, 51 insertions(+), 80 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 7a5416459e2..e6d04d964e9 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,7 @@ ## Public API changes * DB::GetDbIdentity() is now a const function. If this function is overridden in your application, be sure to also make GetDbIdentity() const to avoid compile error. +* Move listeners from ColumnFamilyOptions to DBOptions. ## 3.11.0 (5/19/2015) diff --git a/db/column_family.cc b/db/column_family.cc index 55847d1675d..19c0e1b70ac 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -634,51 +634,6 @@ bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) { return false; } -void ColumnFamilyData::NotifyOnCompactionCompleted( - DB* db, Compaction* c, const Status& status) { -#ifndef ROCKSDB_LITE - auto listeners = ioptions()->listeners; - assert(listeners.size() > 0U); - CompactionJobInfo info; - info.cf_name = c->column_family_data()->GetName(); - info.status = status; - info.output_level = c->output_level(); - for (size_t i = 0; i < c->num_input_levels(); ++i) { - for (const auto fmd : *c->inputs(i)) { - info.input_files.push_back( - TableFileName(options_.db_paths, - fmd->fd.GetNumber(), - fmd->fd.GetPathId())); - } - } - for (const auto newf : c->edit()->GetNewFiles()) { - info.output_files.push_back( - TableFileName(options_.db_paths, - newf.second.fd.GetNumber(), - newf.second.fd.GetPathId())); - } - for (auto listener : listeners) { - listener->OnCompactionCompleted(db, info); - } -#endif // ROCKSDB_LITE -} - -void ColumnFamilyData::NotifyOnFlushCompleted( - DB* db, const std::string& file_path, - bool triggered_flush_slowdown, - bool triggered_flush_stop) { - -#ifndef ROCKSDB_LITE - auto listeners = ioptions()->listeners; - for (auto listener : listeners) { - listener->OnFlushCompleted( - db, GetName(), file_path, - // Use path 0 as fulled memtables are first flushed into path 0. - triggered_flush_slowdown, triggered_flush_stop); - } -#endif // ROCKSDB_LITE -} - SuperVersion* ColumnFamilyData::InstallSuperVersion( SuperVersion* new_superversion, InstrumentedMutex* db_mutex) { db_mutex->AssertHeld(); diff --git a/db/column_family.h b/db/column_family.h index 77af5c7aaeb..58c777d5de3 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -292,13 +292,6 @@ class ColumnFamilyData { void ResetThreadLocalSuperVersions(); - void NotifyOnCompactionCompleted(DB* db, Compaction* c, const Status& status); - - void NotifyOnFlushCompleted( - DB* db, const std::string& file_path, - bool triggered_flush_slowdown, - bool triggered_flush_stop); - // Protected by DB mutex void set_pending_flush(bool value) { pending_flush_ = value; } void set_pending_compaction(bool value) { pending_compaction_ = value; } diff --git a/db/db_impl.cc b/db/db_impl.cc index 5c3df311be9..9e37ae341b5 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1255,7 +1255,7 @@ void DBImpl::NotifyOnFlushCompleted( ColumnFamilyData* cfd, uint64_t file_number, const MutableCFOptions& mutable_cf_options) { #ifndef ROCKSDB_LITE - if (cfd->ioptions()->listeners.size() == 0U) { + if (db_options_.listeners.size() == 0U) { return; } mutex_.AssertHeld(); @@ -1271,11 +1271,17 @@ void DBImpl::NotifyOnFlushCompleted( notifying_events_++; // release lock while notifying events mutex_.Unlock(); - // TODO(yhchiang): make db_paths dynamic. - cfd->NotifyOnFlushCompleted( - this, MakeTableFileName(db_options_.db_paths[0].path, file_number), - triggered_flush_slowdown, - triggered_flush_stop); + { + // TODO(yhchiang): make db_paths dynamic. + auto file_path = MakeTableFileName(db_options_.db_paths[0].path, + file_number); + for (auto listener : db_options_.listeners) { + listener->OnFlushCompleted( + this, cfd->GetName(), file_path, + // Use path 0 as fulled memtables are first flushed into path 0. + triggered_flush_slowdown, triggered_flush_stop); + } + } mutex_.Lock(); notifying_events_--; assert(notifying_events_ >= 0); @@ -1540,7 +1546,7 @@ Status DBImpl::CompactFilesImpl( void DBImpl::NotifyOnCompactionCompleted( ColumnFamilyData* cfd, Compaction *c, const Status &st) { #ifndef ROCKSDB_LITE - if (cfd->ioptions()->listeners.size() == 0U) { + if (db_options_.listeners.size() == 0U) { return; } mutex_.AssertHeld(); @@ -1550,7 +1556,29 @@ void DBImpl::NotifyOnCompactionCompleted( notifying_events_++; // release lock while notifying events mutex_.Unlock(); - cfd->NotifyOnCompactionCompleted(this, c, st); + { + CompactionJobInfo info; + info.cf_name = cfd->GetName(); + info.status = st; + info.output_level = c->output_level(); + for (size_t i = 0; i < c->num_input_levels(); ++i) { + for (const auto fmd : *c->inputs(i)) { + info.input_files.push_back( + TableFileName(db_options_.db_paths, + fmd->fd.GetNumber(), + fmd->fd.GetPathId())); + } + } + for (const auto newf : c->edit()->GetNewFiles()) { + info.output_files.push_back( + TableFileName(db_options_.db_paths, + newf.second.fd.GetNumber(), + newf.second.fd.GetPathId())); + } + for (auto listener : db_options_.listeners) { + listener->OnCompactionCompleted(this, info); + } + } mutex_.Lock(); notifying_events_--; assert(notifying_events_ >= 0); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 0090aa8afd2..fdb4fed0d4e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -709,12 +709,6 @@ struct ColumnFamilyOptions { // Default: false bool paranoid_file_checks; -#ifndef ROCKSDB_LITE - // A vector of EventListeners which call-back functions will be called - // when specific RocksDB event happens. - std::vector> listeners; -#endif // ROCKSDB_LITE - // Create ColumnFamilyOptions with default values for all fields ColumnFamilyOptions(); // Create ColumnFamilyOptions from Options @@ -1009,6 +1003,12 @@ struct DBOptions { // Default: 0, turned off uint64_t wal_bytes_per_sync; +#ifndef ROCKSDB_LITE + // A vector of EventListeners which call-back functions will be called + // when specific RocksDB event happens. + std::vector> listeners; +#endif // ROCKSDB_LITE + // If true, then the status of the threads involved in this DB will // be tracked and available via GetThreadList() API. // diff --git a/util/options.cc b/util/options.cc index 404140e2552..a4e93f48938 100644 --- a/util/options.cc +++ b/util/options.cc @@ -129,13 +129,7 @@ ColumnFamilyOptions::ColumnFamilyOptions() max_successive_merges(0), min_partial_merge_operands(2), optimize_filters_for_hits(false), - paranoid_file_checks(false) -#ifndef ROCKSDB_LITE - , - listeners() { -#else // ROCKSDB_LITE -{ -#endif // ROCKSDB_LITE + paranoid_file_checks(false) { assert(memtable_factory.get() != nullptr); } @@ -199,13 +193,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options) max_successive_merges(options.max_successive_merges), min_partial_merge_operands(options.min_partial_merge_operands), optimize_filters_for_hits(options.optimize_filters_for_hits), - paranoid_file_checks(options.paranoid_file_checks) -#ifndef ROCKSDB_LITE - , - listeners(options.listeners) { -#else // ROCKSDB_LITE -{ -#endif // ROCKSDB_LITE + paranoid_file_checks(options.paranoid_file_checks) { assert(memtable_factory.get() != nullptr); if (max_bytes_for_level_multiplier_additional.size() < static_cast(num_levels)) { @@ -256,6 +244,9 @@ DBOptions::DBOptions() use_adaptive_mutex(false), bytes_per_sync(0), wal_bytes_per_sync(0), +#ifndef ROCKSDB_LITE + listeners(), +#endif enable_thread_tracking(false) { } @@ -300,6 +291,9 @@ DBOptions::DBOptions(const Options& options) use_adaptive_mutex(options.use_adaptive_mutex), bytes_per_sync(options.bytes_per_sync), wal_bytes_per_sync(options.wal_bytes_per_sync), +#ifndef ROCKSDB_LITE + listeners(options.listeners), +#endif enable_thread_tracking(options.enable_thread_tracking) {} static const char* const access_hints[] = {