diff --git a/db/db_test_util.h b/db/db_test_util.h
index 36f3813c971..50109e0a406 100644
--- a/db/db_test_util.h
+++ b/db/db_test_util.h
@@ -438,6 +438,12 @@ class SpecialEnv : public EnvWrapper {
        return s;
      }
 
+     virtual Status Prefetch(uint64_t offset, size_t n) override {
+       Status s = target_->Prefetch(offset, n);
+       *bytes_read_ += n;
+       return s;
+     }
+
     private:
      std::unique_ptr<RandomAccessFile> target_;
      anon::AtomicCounter* counter_;
diff --git a/db/table_cache.cc b/db/table_cache.cc
index 2eb742e24f6..ce045a57993 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -94,7 +94,7 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) {
 Status TableCache::GetTableReader(
     const EnvOptions& env_options,
     const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
-    bool sequential_mode, size_t readahead, bool record_read_stats,
+    bool sequential_mode, size_t /*readahead*/, bool record_read_stats,
     HistogramImpl* file_read_hist, std::unique_ptr<TableReader>* table_reader,
     const SliceTransform* prefix_extractor, bool skip_filters, int level,
     bool prefetch_index_and_filter_in_cache, bool for_compaction) {
@@ -105,13 +105,6 @@ Status TableCache::GetTableReader(
   RecordTick(ioptions_.statistics, NO_FILE_OPENS);
 
   if (s.ok()) {
-    if (readahead > 0 && !env_options.use_mmap_reads) {
-      // Not compatible with mmap files since ReadaheadRandomAccessFile requires
-      // its wrapped file's Read() to copy data into the provided scratch
-      // buffer, which mmap files don't use.
-      // TODO(ajkr): try madvise for mmap files in place of buffered readahead.
-      file = NewReadaheadRandomAccessFile(std::move(file), readahead);
-    }
     if (!sequential_mode && ioptions_.advise_random_on_open) {
       file->Hint(RandomAccessFile::RANDOM);
     }
diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc
index 4aefbe7c54e..db86d824701 100644
--- a/table/block_based_table_reader.cc
+++ b/table/block_based_table_reader.cc
@@ -2465,32 +2465,32 @@ void BlockBasedTableIterator<TBlockIter, TValue>::InitDataBlock() {
   }
   auto* rep = table_->get_rep();
 
-  // Automatically prefetch additional data when a range scan (iterator) does
-  // more than 2 sequential IOs. This is enabled only for user reads and when
-  // ReadOptions.readahead_size is 0.
-  if (!for_compaction_ && read_options_.readahead_size == 0) {
-    num_file_reads_++;
-    if (num_file_reads_ > 2) {
-      if (!rep->file->use_direct_io() &&
-          (data_block_handle.offset() +
-               static_cast<size_t>(data_block_handle.size()) +
-               kBlockTrailerSize >
-           readahead_limit_)) {
-        // Buffered I/O
-        // Discarding the return status of Prefetch calls intentionally, as we
-        // can fallback to reading from disk if Prefetch fails.
-        rep->file->Prefetch(data_block_handle.offset(), readahead_size_);
-        readahead_limit_ =
-            static_cast<size_t>(data_block_handle.offset() + readahead_size_);
-        // Keep exponentially increasing readahead size until
-        // kMaxReadaheadSize.
-        readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_ * 2);
-      } else if (rep->file->use_direct_io() && !prefetch_buffer_) {
-        // Direct I/O
-        // Let FilePrefetchBuffer take care of the readahead.
-        prefetch_buffer_.reset(new FilePrefetchBuffer(
-            rep->file.get(), kInitReadaheadSize, kMaxReadaheadSize));
-      }
+  // Readahead
+  num_file_reads_++;
+  if (enable_readahead_ &&
+      num_file_reads_ > start_readahead_after_num_file_reads_) {
+    // TODO (svemuri): Ideally I'd like to get rid of this *if block* and let
+    // the prefetch buffer (i.e. the code in else block) handle everything,
+    // but it fails as of now when compactions reads are bufferred.
+    if (!rep->file->use_direct_io() &&
+        (data_block_handle.offset() +
+             static_cast<size_t>(data_block_handle.size()) +
+             kBlockTrailerSize >
+         readahead_limit_)) {
+      // Buffered I/O
+      // Discarding the return status of Prefetch calls intentionally, as we
+      // can fallback to reading from disk if Prefetch fails.
+      rep->file->Prefetch(data_block_handle.offset(), readahead_size_);
+      readahead_limit_ =
+          static_cast<size_t>(data_block_handle.offset() + readahead_size_);
+      // Keep exponentially increasing readahead size until
+      // kMaxReadaheadSize.
+      readahead_size_ = std::min(max_readahead_size_, readahead_size_ * 2);
+    } else if (rep->file->use_direct_io() && !prefetch_buffer_) {
+      // Direct I/O
+      // Let FilePrefetchBuffer take care of the readahead.
+      prefetch_buffer_.reset(new FilePrefetchBuffer(
+          rep->file.get(), readahead_size_, max_readahead_size_));
     }
   }
diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h
index ea8ba62c5d3..bed217d790f 100644
--- a/table/block_based_table_reader.h
+++ b/table/block_based_table_reader.h
@@ -603,7 +603,31 @@ class BlockBasedTableIterator : public InternalIteratorBase<TValue> {
         is_index_(is_index),
         key_includes_seq_(key_includes_seq),
         index_key_is_full_(index_key_is_full),
-        for_compaction_(for_compaction) {}
+        for_compaction_(for_compaction) {
+    if (for_compaction) {
+      readahead_size_ = table->get_rep()->env_options.compaction_readahead_size;
+    } else {
+      readahead_size_ = read_options.readahead_size;
+    }
+
+    if (readahead_size_ > 0) {
+      max_readahead_size_ = readahead_size_;
+      start_readahead_after_num_file_reads_ = 0;
+    } else {
+      readahead_size_ = kInitReadaheadSize;
+      max_readahead_size_ = kMaxReadaheadSize;
+      start_readahead_after_num_file_reads_ = 2;
+    }
+
+    if (for_compaction_) {
+      if (table->get_rep()->env_options.compaction_readahead_size > 0) {
+        enable_readahead_ = true;
+      }
+    } else {
+      // user reads -- always enable readahead
+      enable_readahead_ = true;
+    }
+  }
 
   ~BlockBasedTableIterator() { delete index_iter_; }
 
@@ -722,8 +746,11 @@ class BlockBasedTableIterator : public InternalIteratorBase<TValue> {
   // experiments.
   static const size_t kMaxReadaheadSize;
   size_t readahead_size_ = kInitReadaheadSize;
+  size_t max_readahead_size_ = 0;
   size_t readahead_limit_ = 0;
   int num_file_reads_ = 0;
+  bool enable_readahead_ = false;
+  int start_readahead_after_num_file_reads_ = 0;
   std::unique_ptr<FilePrefetchBuffer> prefetch_buffer_;
 };