Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Compressor interface #7650

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

lucagiac81
Copy link
Contributor

@lucagiac81 lucagiac81 commented Nov 9, 2020

This PR refactors compression by introducing a Compressor interface.
This is a first step towards supporting compressors as plugins. PR #6717 covers the next step.

Compressor class

The Compressor class defines an interface for each compression algorithm to implement. It is a Customizable class, like other extensible components in RocksDB.

A Compressor has

  • A unique name
  • Compress/Uncompress methods
  • CreateDictionary method (for algorithms supporting dictionaries): the logic to build/train a dictionary is moved here, so future compressors have the option to customize it if necessary
  • Methods to handle processed/digested dictionaries (implemented by zstd, for example)
  • Options: each Compressor can define the options it supports using the Configurable framework

Streaming compression is not included in the Compressor class yet. The plan is to cover that in a separate PR.

Built-in compressors

The existing compression algorithms (referred to as "built-in") are encapsulated in Compressor classes. The classes inherit from BuiltinCompressor, which includes functionality shared by all built-in compressors.
Built-in compressors can be referenced by their numeric id (as defined by the CompressionType enum) to ensure backward compatibility. BuiltinCompressor uses the existing CompressionOptions struct as its configurable options.

Compressor lifecycle

For this PR, compression options exposed in the public API are unchanged (exposing Compressor in the public API and options is the scope of PR #6717).

  • The CompressionType and CompressionOptions passed through ColumnFamilyOptions and DBOptions are used to instantiate suitable Compressor instances (this is done in MutableCFOptions and ImmutableDBOptions).
  • The Compressor class keeps track of the instances currently in use, so they can be retrieved and reused.
  • Such Compressor instances are then used in other parts of the code in place of CompressionType and CompressionOptions (e.g., in BlockBasedTableBuilder, BlockBasedTable, FlushJob, Compaction, BlobFileBuilder...).
  • The details of the Compressor used for a block-based table are serialized in the Properties block.
  • When opening the SST file, the info in the Properties block is used to instantiate/retrieve a suitable Compressor for the table. If the compression id for a block doesn't match the table-level Compressor, a suitable Compressor is obtained when reading the block.

Copy link
Contributor

@mrambacher mrambacher left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally I like where this is heading. I have some questions on CompressionInfo (whether it is turned upside down enough) and on the CompressorRegistry but I think this is a very good thing to have.

include/rocksdb/compressor.h Outdated Show resolved Hide resolved
include/rocksdb/compressor.h Outdated Show resolved Hide resolved
include/rocksdb/compressor.h Outdated Show resolved Hide resolved
include/rocksdb/compressor.h Outdated Show resolved Hide resolved
include/rocksdb/compressor.h Outdated Show resolved Hide resolved
include/rocksdb/memory_allocator.h Show resolved Hide resolved
util/compression.cc Outdated Show resolved Hide resolved
util/compression.cc Outdated Show resolved Hide resolved
util/compression.cc Show resolved Hide resolved
include/rocksdb/compressor.h Outdated Show resolved Hide resolved
@lucagiac81
Copy link
Contributor Author

Addressed some of the comments above and rebased branch. There are two additional changes from the previous version.

@lucagiac81 lucagiac81 force-pushed the compressor_interface branch 2 times, most recently from 2982813 to 004b920 Compare March 31, 2022 01:27
@pdillinger
Copy link
Contributor

Keep in mind that for us to accept this, the performance penalty would have to be minimal.

@lucagiac81
Copy link
Contributor Author

Absolutely. Performance is a key requirement for this feature. From the data I've collected so far with db_bench (readrandom and readrandomwriterandom workloads), the performance difference is in general <1% (throughput, p99 latency). I'm collecting more data covering different conditions and additional metrics, so we can have a more accurate picture.
Are there specific conditions you'd recommend for performance testing, or do you have a set of benchmarks you already use to validate the effect of any code changes?

include/rocksdb/compressor.h Outdated Show resolved Hide resolved
include/rocksdb/compressor.h Outdated Show resolved Hide resolved
include/rocksdb/compressor.h Outdated Show resolved Hide resolved
include/rocksdb/compressor.h Outdated Show resolved Hide resolved
include/rocksdb/compressor.h Outdated Show resolved Hide resolved
" is not linked with the binary.");
} else if (!moptions.compressor_per_level.empty()) {
for (const auto& compressor : moptions.compressor_per_level) {
if (!compressor->Supported()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any possibility that this compressor is null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • compressor_per_level is initialized as an empty vector.
  • MutableCFOptions::RefreshDerivedOptions populates compressor_per_level using the content of compression_per_level.
  • Compressors are obtained from BuiltinCompressor::GetCompressor, which could return nullptr in case of an invalid selection in compression_per_level.

I thought an invalid selection in compression_per_level would be caught before reaching this method, but that's incorrect. I added a null check and a test in compression_test (to verify the expected error is returned in case of invalid selection in compression_per_level).

db/db_block_cache_test.cc Outdated Show resolved Hide resolved
db/db_test.cc Outdated
@@ -5010,7 +5010,8 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
if (compaction->output_level() == 4) {
ASSERT_TRUE(compaction->output_compression() == kLZ4Compression);
ASSERT_EQ(compaction->output_compression()->GetCompressionType(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I wonder if output_compression should be renamed to compressor or output_compressor?
  2. Does it make sense to have a compaction->GetCompressionType shortcut method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed output_compression to output_compressor.
I did not add the shortcut method for compression type. I see both GetCompressionType and GetId called on the compressor returned by output_compressor in different places.

db/db_test2.cc Outdated
ASSERT_LE(TestGetTickerCount(options,
BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
2 * kBlockLen);
2 * kBlockLen + sizeof(UncompressionDict));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this meant to be the sizeof the Uncompression Dictionary itself or the struct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a previous version of the code, I added a Compressor* member to UncompressionDict, and these tests started failing due to the size being slightly larger than 2 blocks. With the current code, these changes are actually not needed, and they were removed.

db/flush_job.cc Outdated Show resolved Hide resolved
@pdillinger
Copy link
Contributor

fillseq and readrandom with -readonly should suffice, with low block cache hit rate on data blocks. You can do readrandom on the same DB before-and-after to eliminate LSM structure variances. But I would consider 1% overall ops/sec as a significant regression for Meta to absorb for a feature that Meta doesn't need.

@lucagiac81
Copy link
Contributor Author

I ran fillseq and readrandom benchmarks, comparing the PR against the previous commit on the main branch.
Workload parameters: 16-byte keys, 256-byte values, 4kB blocks, ZSTD/LZ4 compression, db on /dev/shm, no block cache.

The relative ops/s differences (PR vs main, mean of 5 runs) are listed below, although they are not statistically significant:

  • fillseq: +0.07% (ZSTD), -0.1% (LZ4)
  • readrandom: -0.18% (ZSTD), +0.51% (LZ4)

The instruction path length is also verified to be matched within 0.2%.
As we make changes as part of the review, we can verify performance stays matched with the latest code.

@facebook-github-bot
Copy link
Contributor

@mrambacher has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

Comment on lines +177 to +186
virtual Status Compress(const CompressionInfo& info, const Slice& input,
std::string* output) = 0;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking out loud, I wonder if the names of the Compress and Uncompress functions should be different. I can see there eventually being (at least) three signatures:

  • This method that takes a block (Slice) and returns a block (output)
  • An aysnchronous method that does something similar, invoking some sort of callback when the Compression is complete
  • A method for the compression of streams

With this in mind, I wonder if these methods should be named "CompressBlock" (or something similar) instead of just Compress

Status Compressor::CreateDictionary(
std::vector<std::string>& data_block_buffers,
std::unique_ptr<CompressionDict>* compression_dict) {
uint32_t max_dict_bytes = GetMaxDictBytes();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert(compression_dict)

and maybe assert that this Compressor supports a Dictionary? Or return an error if it does not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert added for compression_dict (in BlockBasedTableBuilder::EnterUnbuffered, where CreateDictionary is called). Added check for DictCompressionSupported in CreateDictionary. Also added comment for IsDictionaryEnabled to return false if DictCompressionSupported is false.

Comment on lines 295 to 303
if (use_dict_trainer) {
dict = ZSTD_TrainDictionary(compression_dict_samples,
compression_dict_sample_lens, max_dict_bytes);
} else {
dict = ZSTD_FinalizeDictionary(compression_dict_samples,
compression_dict_sample_lens,
max_dict_bytes, level);
}
compression_dict->reset(NewCompressionDict(dict));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is referring to ZSTD methods, this seems like it should be done in a derived (not the base) class, even if multiple compressors might use the same dictionary trainer/finalize methods.

Can we add a virtual method to build the dictionary from the samples?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR #6717 already introduced a separate SampleAndTrainDictionary method to make it easier for plugins to override only parts of CreateDictionary (and use the default sampling/training methods inside a customized implementation of CreateDictionary). I now split sampling and training into two virtual methods (SampleDict and TrainDict). TrainDict's default implementation still uses ZSTD_TrainDictionary and ZSTD_FinalizeDictionary, so plugins are able to use those as default.
I also renamed CreateDictionary as CreateDict and IsDictionaryEnabled as IsDictEnabled for consistency with other methods.
Some more tweaks may be needed to finalize the interface.

Comment on lines 313 to 320
uint32_t Compressor::GetMaxDictBytes() const {
uint32_t max_dict_bytes = 0;
#ifndef ROCKSDB_LITE
std::string value;
ConfigOptions config_options;
Status s = GetOption(config_options, "max_dict_bytes", &value);
if (s.ok()) {
max_dict_bytes = static_cast<uint32_t>(ParseUint64(value));
}
#endif // ROCKSDB_LITE
return max_dict_bytes;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not in favor of this mechanism. Is there a reason not to just make these virtual and have the derived class do the right thing?

Also, are dictionaries not supported in LITE mode?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. All compressors so far override these methods. The methods are now changed to simply return a default value. This also eliminates the different behavior for the LITE version, which doesn't support GetOption.


// Returns a new compression dictionary from the input dict.
// Classes which have a ProcessedDict should override this method.
virtual CompressionDict* NewCompressionDict(const std::string& dict) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There has been an effort to not return raw pointers as much in the code. Can you tell me if these can easily be made into std::unique_ptr ? That would satisfy the "not raw" and can help prevent leaks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simple change. CreateDictionary was already placing the raw pointer in a std::unique_ptr. We cannot do the same for UncompressionDict without further changes, as the Create method of BlocklikeTraitsreturns a raw pointer and that's aligned with other specializations.

Comment on lines 2813 to 2815
CompressionInfo info(CompressionDict::GetEmptyDict(),
compress_format_version,
FLAGS_sample_for_compression);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be cleaner as:
CompressionInfo info;
info.sample_for_compression = FLAGS_sample_for_compression
(and eliminate the empty dictionary and format version)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified CompressionDict constructor to take sample_for_compression as parameter with default value. This made the code in db_bench_tool.cc more concise.

Comment on lines 3952 to 3953
CompressionInfo info(CompressionDict::GetEmptyDict(),
compress_format_version, FLAGS_sample_for_compression);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above on cleanup

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above

Comment on lines 3981 to 3985
CompressionInfo compression_info(CompressionDict::GetEmptyDict(),
compress_format_version,
FLAGS_sample_for_compression);
UncompressionInfo uncompression_info(UncompressionDict::GetEmptyDict(),
compress_format_version);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above on cleanup

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as above

Comment on lines 1168 to 1170
UncompressionInfo info(
UncompressionDict::GetEmptyDict(),
GetCompressFormatForVersion(kBlockBasedTableVersionFormat));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just UncompressionInfo info; (like CompressionInfo above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is. Updated

@@ -191,7 +193,7 @@ std::string RandomName(Random* rnd, const size_t len) {

CompressionType RandomCompressionType(Random* rnd) {
auto ret = static_cast<CompressionType>(rnd->Uniform(6));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that this is your bug, but 6 seems wrong here. The builtin types go from 0x0 to 0x7 (so 8 of them)

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

@facebook-github-bot
Copy link
Contributor

@lucagiac81 has updated the pull request. You must reimport the pull request before landing.

rep_->table_properties->compression_name !=
CompressionTypeToString(kNoCompression);
ConfigOptions config_options;
s = Compressor::CreateFromString(config_options,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like that this implementation requires dependency injection (ObjectRegistry) (also known as "spooky action at a distance") to use a custom compressor. Reliable dependency injection can be a pain point for some people integrating RocksDB, e.g. through a shared object file.

Using prefix_extractor as an example, when the SST file uses the same prefix extractor as what is currently configured (for new SST files), dependency injection is not required to use it for reads.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compressor::CreateFromString will reuse an existing instance if an equivalent one is already available. The existing instances are tracked by the Compressor class rather than passed to the table like prefix_extractor.

To follow a similar approach as for prefix_extractor, it should be possible to pass a custom compressor instance to RocksDB via options, without adding a factory function to ObjectRegistry. Would this address the concern with dependency injection? I will update PR #6717 (which adds support for custom compressors) to include and test this scenario.

@@ -725,6 +732,14 @@ struct BlockBasedTable::Rep {
#endif // ROCKSDB_MALLOC_USABLE_SIZE
return usage;
}

std::shared_ptr<Compressor> GetCompressor(CompressionType compression_type) {
if (compression_type == compressor->GetCompressionType()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important to note that only one custom compression type can be used per file. Allowing various custom compressors in a single file would be a substantial schema+code change. Yes someone could add their own layer within the custom compressor, but that would be extra overhead.

We don't currently foresee needing various compressions within a file (except for some blocks not compressed) but we want to be sure everyone is on board with this decision.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that is a limitation for plugin compressors. For built-in compressors, it's still possible to have different compressors per file. Their CompressionType id is still stored in each block as before. Plugins, on the other hand, all have the same id and rely on the compressor name in the properties block.

If various custom compressors per file are needed, we may be able to handle it by assigning "dynamic" numeric ids to custom compressors and recording them in blocks as usual. I can look into the implementation details if there is interest.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we may be able to handle it by assigning "dynamic" numeric ids to custom compressors and recording them in blocks as usual

Right. Might be worth keeping this in mind as a possible future feature, especially for schema purposes. I think the ideal state right now would be to reserve what that would look like in the schema, detect on read if that future feature is in use and provide a good error message.

s = UncompressSerializedBlock(
info, req.result.data() + req_offset, handle.size(), &contents,
footer.format_version(), rep_->ioptions, memory_allocator);
std::shared_ptr<Compressor> compressor =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating temporary copies of shared_ptrs on the read "hot path" is a performance hazard.

In the past we have had to re-engineer some new customizable integrations because of excessive shared_ptr ops.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The copy of shared_ptr is needed here in case the block is compressed by a different compressor than the one specified for the table (in rep_). I'll try to remove the copy at least for the most common case (block compressed with the table compressor).

Copy link
Contributor

@pdillinger pdillinger Feb 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think that's an over-simplified interface. There should be immortal (de-)compressors for built-in algorithms. And the needed custom compressors should be known and kept alive by the table reader (shared_ptr). (Are the compressors thread-safe by the way?)

While it might be nice to have a "GetCompressor" that copies a shared_ptr for arbitrary lifetime extension, the one used most would be to get a (de-)compressor reference or pointer that we promise not to use beyond the life of the table reader (which is guaranteed by the table cache to survive pending read operations).

@@ -152,15 +152,16 @@ Status ReadAndParseBlockFromFile(
BlockCreateContext& create_context, bool maybe_compressed,
const UncompressionDict& uncompression_dict,
const PersistentCacheOptions& cache_options,
MemoryAllocator* memory_allocator, bool for_compaction, bool async_read) {
MemoryAllocator* memory_allocator, bool for_compaction,
const std::shared_ptr<Compressor>& compressor, bool async_read) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I consider const shared_ptr<X>& parameters to be an anti-pattern. It prevents move semantics where it might be useful, and is non-committal about intention to save a reference. I suggest deciding between const X& and plain shared_ptr<X>.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the "mixed message" of passing const shared_ptr&. I'll review all occurrences and verify if they're justified (e.g., if a copy of the shared_ptr is eventually made).

UncompressionDict& operator=(const CompressionDict&) = delete;
};

// Interface for each compression algorithm to implement.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the thread safety expectations?

@pdillinger
Copy link
Contributor

By the way, we are finally very interested in this line of features. Thanks for working on it!

@lucagiac81
Copy link
Contributor Author

By the way, we are finally very interested in this line of features. Thanks for working on it!

That's great, thank you! I'll work on the items above and let's continue the review. I'll also rebase, as I see some new conflicts.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants