[MINOR] Update RFC-46 doc (apache#7050)
* [MINOR] update rfc46 doc

* update

* Update rfc-46.md

Co-authored-by: wangzixuan.wzxuan <wangzixuan.wzxuan@bytedance.com>
wzx140 and wangzixuan.wzxuan committed Nov 10, 2022
1 parent 49c5fcb commit a06b1c0
168 changes: 146 additions & 22 deletions rfc/rfc-46/rfc-46.md

While having a single format of the record representation certainly makes the implementation of some components simpler,
it bears an unavoidable performance penalty of a de-/serialization loop: every record handled by Hudi has to be converted
from its (low-level) engine-specific representation (`InternalRow` for Spark, `RowData` for Flink, `ArrayWritable` for Hive) into the intermediate
one (Avro), with some operations (like clustering, compaction) potentially incurring this penalty multiple times (on the read-
and write-paths).

Following (high-level) steps are proposed:

1. Promote `HoodieRecord` to become a standardized API of interacting with a single record, that will be
   1. Replacing all accesses from `HoodieRecordPayload`
   2. Split into an interface and engine-specific implementations (holding the internal engine-specific representation of the payload)
   3. Implementing new standardized record-level APIs (like `getPartitionKey`, `getRecordKey`, etc.)
   4. Staying an **internal** component that will **NOT** contain any user-defined semantic (like merging)
2. Extract the Record Merge API from `HoodieRecordPayload` into a standalone, stateless component. Such component will be
   1. Abstracted as a stateless object providing an API to combine records (according to predefined semantics) for the engines (Spark, Flink) of interest
   2. A plug-in point for user-defined combination semantics
3. Gradually deprecate, phase out and eventually remove the `HoodieRecordPayload` abstraction

Phasing out the usage of `HoodieRecordPayload` will also bring the benefit of avoiding the use of Java reflection on the hot path, which
is known to have poor performance (compared to non-reflection-based instantiation).
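
To make the contrast concrete, the sketch below (illustrative only; the class and constructor shapes are assumptions, not Hudi APIs) compares the reflective per-record instantiation the payload path requires with the merger path, where a single stateless instance is resolved once from configuration and reused:

```java
import java.lang.reflect.Constructor;

import org.apache.avro.generic.GenericRecord;

class InstantiationPaths {

  // Payload path (assumed shape): a reflective constructor invocation for every
  // record keeps reflection on the hot path.
  static Object payloadPerRecord(Class<?> payloadClass, GenericRecord avro, Comparable<?> orderingVal)
      throws Exception {
    Constructor<?> ctor = payloadClass.getConstructor(GenericRecord.class, Comparable.class);
    return ctor.newInstance(avro, orderingVal);
  }

  // Merger path: reflection happens once at setup; the stateless merger is then
  // reused for every record without further reflective calls.
  static HoodieRecordMerger mergerOnce(String mergerClassName) throws Exception {
    return (HoodieRecordMerger) Class.forName(mergerClassName).getDeclaredConstructor().newInstance();
  }
}
```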

#### Record Merge API

`combineAndGetUpdateValue` and `preCombine` will converge into one API. The stateless component interface providing the record-merging API will look like the following:

```java
interface HoodieRecordMerger {

  /**
   * The kind of merging strategy this RecordMerger belongs to. A UUID represents the merging strategy.
   */
  String getMergingStrategyId();

  // This method converges combineAndGetUpdateValue and precombine from HoodieRecordPayload.
  // It has to be an associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as:
  // given 3 versions A, B, C of a single record, both orders of applying the operation have to
  // yield the same result)
  Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException;

  // The record type handled by the current merger: SPARK, AVRO or FLINK
  HoodieRecordType getRecordType();
}

/**
 * Spark-specific implementation
 */
class HoodieSparkRecordMerger implements HoodieRecordMerger {

  @Override
  public String getMergingStrategyId() {
    return LATEST_RECORD_MERGING_STRATEGY;
  }

  @Override
  public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
    // Implements the particular merging semantic natively for the Spark row representation
    // encapsulated in HoodieSparkRecord.
  }

  @Override
  public HoodieRecordType getRecordType() {
    return HoodieRecordType.SPARK;
  }
}

/**
 * Flink-specific implementation
 */
class HoodieFlinkRecordMerger implements HoodieRecordMerger {

  @Override
  public String getMergingStrategyId() {
    return LATEST_RECORD_MERGING_STRATEGY;
  }

  @Override
  public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
    // Implements the particular merging semantic natively for the Flink row representation
    // encapsulated in HoodieFlinkRecord.
  }

  @Override
  public HoodieRecordType getRecordType() {
    return HoodieRecordType.FLINK;
  }
}
```
Users can provide their own subclasses implementing this interface for the engines of interest, as illustrated below.
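
For instance, a user-defined merger implementing "latest ordering value wins" semantics natively could look like the following (a hypothetical sketch: the class, its strategy UUID, and the comparison details are assumptions; it relies only on the `getOrderingValue` API of `HoodieRecord` shown later in this RFC):

```java
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;

/**
 * Hypothetical user-defined merger: keeps whichever record carries the greater
 * ordering value, working directly against the engine-native representation.
 */
class LatestOrderingValueMerger implements HoodieRecordMerger {

  // A stable UUID chosen by the user to identify this custom merging strategy.
  private static final String CUSTOM_MERGING_STRATEGY = "00000000-0000-0000-0000-000000000000";

  @Override
  public String getMergingStrategyId() {
    return CUSTOM_MERGING_STRATEGY;
  }

  @Override
  @SuppressWarnings({"unchecked", "rawtypes"})
  public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
    // getOrderingValue is part of the HoodieRecord public API sketched later in this document.
    Comparable olderVal = older.getOrderingValue(schema, props);
    Comparable newerVal = newer.getOrderingValue(schema, props);
    return Option.of(olderVal.compareTo(newerVal) > 0 ? older : newer);
  }

  @Override
  public HoodieRecordType getRecordType() {
    return HoodieRecordType.SPARK;
  }
}
```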

### Merging Strategy

The `RecordMerger` is engine-aware. We provide a config, `HoodieWriteConfig.MERGER_IMPLS`, which accepts a list of `RecordMerger` class names, and a config, `HoodieWriteConfig.MERGER_STRATEGY`, which holds the UUID of a merging strategy. At runtime, Hudi picks the `RecordMerger` from `MERGER_IMPLS` whose strategy matches `MERGER_STRATEGY` and whose record type matches the engine.
- Every `RecordMerger` implementation is engine-specific (referred to as an "implementation") and implements a particular merging semantic (referred to as a "merging strategy")
- Such tiering allows us to be flexible in providing implementations of a merging strategy only for the engines of interest
- The merging strategy is a table property that is set once during table initialization
- Merging implementations can be configured for each write individually, as sketched below
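
Putting it together, a Spark write might register both a Spark-native and the Avro fallback implementation of the same strategy (a minimal sketch; the datasource option keys and class paths are assumptions to be checked against the release docs):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

class MergerConfigExample {

  static void writeWithCustomMerger(Dataset<Row> df, String basePath) {
    df.write().format("hudi")
        // Assumed key behind HoodieWriteConfig.MERGER_IMPLS: candidate implementations across engines.
        .option("hoodie.datasource.write.record.merger.impls",
            "org.apache.hudi.HoodieSparkRecordMerger,org.apache.hudi.common.model.HoodieAvroRecordMerger")
        // Assumed key behind HoodieWriteConfig.MERGER_STRATEGY: the UUID naming the merging semantic.
        .option("hoodie.datasource.write.record.merger.strategy",
            "00000000-0000-0000-0000-000000000000")
        .mode(SaveMode.Append)
        .save(basePath);
  }
}
```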

#### Migration from `HoodieRecordPayload` to `HoodieRecordMerger`

To warrant backward compatibility (BWC) on the code level with already-created subclasses of `HoodieRecordPayload` currently
used in production by Hudi users, we will provide a BWC bridge in the form of an instance of `HoodieRecordMerger` called `HoodieAvroRecordMerger`, that will
be using the user-defined subclass of `HoodieRecordPayload` to combine the records.

Leveraging such a bridge will provide for a seamless BWC migration to the 0.11 release; however, it will forfeit the performance
benefit of this refactoring, since it would unavoidably have to perform the conversion to the intermediate representation (Avro). To realize the
full suite of benefits of this refactoring, users will have to migrate their merging logic out of the `HoodieRecordPayload` subclass and into a
new `HoodieRecordMerger` implementation.

Previously, we used to have separate methods for merging:

- `preCombine` was used to either deduplicate records in a batch or merge ones coming from delta-logs, while
- `combineAndGetUpdateValue` was used to combine incoming record w/ the one persisted in storage

Now both of these methods' semantics are unified in a single `merge` API w/in the `RecordMerger`, which is required to be an _associative_ operation in order to take on the semantics of both `preCombine` and `combineAndGetUpdateValue`. `HoodieAvroRecordMerger`'s API will look like the following:

```java
/**
 * Backward-compatibility bridge around user-defined HoodieRecordPayload implementations
 */
class HoodieAvroRecordMerger implements HoodieRecordMerger {

  @Override
  public String getMergingStrategyId() {
    return LATEST_RECORD_MERGING_STRATEGY;
  }

  @Override
  public Option<HoodieRecord> merge(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
    // Delegates to the configured HoodieRecordPayload's precombine and combineAndGetUpdateValue;
    // the combined operation has to stay associative.
  }

  @Override
  public HoodieRecordType getRecordType() {
    return HoodieRecordType.AVRO;
  }
}
```
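
The associativity contract above can be sanity-checked for any merger implementation with a helper along these lines (an illustrative sketch; it assumes non-empty merge results and a value-based `equals` on the records):

```java
import java.io.IOException;
import java.util.Objects;
import java.util.Properties;

import org.apache.avro.Schema;

class AssociativityCheck {

  // Verifies f(a, f(b, c)) == f(f(a, b), c) for three versions a, b, c of the same record.
  static boolean isAssociative(HoodieRecordMerger merger, HoodieRecord a, HoodieRecord b, HoodieRecord c,
      Schema schema, Properties props) throws IOException {
    HoodieRecord rightFirst = merger.merge(a, merger.merge(b, c, schema, props).get(), schema, props).get();
    HoodieRecord leftFirst = merger.merge(merger.merge(a, b, schema, props).get(), c, schema, props).get();
    return Objects.equals(rightFirst, leftFirst);
  }
}
```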

### Refactoring Flows Directly Interacting w/ Records:

Following major components will be refactored:

1. `HoodieWriteHandle`s will be
   1. Accepting `HoodieRecord` instead of the raw Avro payload (avoiding the Avro conversion)
   2. Using the Record Merge API to merge records (when necessary)
   3. Passing `HoodieRecord` as-is to the `FileWriter`
2. `HoodieFileWriter`s will be
   1. Accepting `HoodieRecord`
   2. Engine-specific (so that they're able to handle the internal record representation), as sketched below
3. `HoodieRealtimeRecordReader`s
   1. APIs will be returning an opaque `HoodieRecord` instead of the raw Avro payload
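
A possible shape of the engine-specific writer surface after this change (hypothetical signatures, shown only to illustrate that `HoodieRecord` flows through without an Avro detour):

```java
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;

// Hypothetical post-refactoring writer surface: consumes HoodieRecord directly,
// leaving serialization details to the engine-specific implementation.
interface HoodieFileWriter {

  boolean canWrite();

  void write(String recordKey, HoodieRecord record, Schema schema, Properties props) throws IOException;

  void close() throws IOException;
}
```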

### Public API in `HoodieRecord`

Because we implement different types of records, we need to provide functionality similar to `AvroUtils` in `HoodieRecord` for each engine-specific payload representation (`GenericRecord`, `InternalRow`, `RowData`).
Its public API will look like the following:

```java
class HoodieRecord {

  /**
   * Get column values from the record.
   */
  Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled);

  Comparable<?> getOrderingValue(Schema recordSchema, Properties props);

  /**
   * Support bootstrap.
   */
  HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) throws IOException;

  /**
   * Rewrite the record into the new schema (adding meta columns).
   */
  HoodieRecord rewriteRecord(Schema recordSchema, Properties props, Schema targetSchema) throws IOException;

  /**
   * Support schema evolution.
   */
  HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema,
      Map<String, String> renameCols) throws IOException;

  HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema) throws IOException;

  HoodieRecord updateMetadataValues(Schema recordSchema, Properties props, MetadataValues metadataValues) throws IOException;

  boolean isDelete(Schema recordSchema, Properties props) throws IOException;

  /**
   * Whether this is an empty record generated by ExpressionPayload that should be ignored.
   */
  boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException;

  // Other getters and setters ...
}
```
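
As an illustration of how these methods compose in an engine-agnostic write flow (a sketch; the helper and its policy are assumptions, not part of the RFC):

```java
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;

class RecordPreparation {

  // Drops deletes and ExpressionPayload sentinels, then evolves the record to the
  // writer schema -- all without converting to Avro.
  static Option<HoodieRecord> prepareForWrite(HoodieRecord incoming, Schema readerSchema,
      Schema writerSchema, Properties props) throws IOException {
    if (incoming.isDelete(readerSchema, props) || incoming.shouldIgnore(readerSchema, props)) {
      return Option.empty();
    }
    return Option.of(incoming.rewriteRecordWithNewSchema(readerSchema, props, writerSchema));
  }
}
```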

## Rollout/Adoption Plan

- What impact (if any) will there be on existing users?
  - Users of Hudi will observe considerably better performance for most routine operations (writing, reading, compaction, clustering, etc.) due to avoiding the superfluous intermediate de-/serialization penalty
  - By default, the modified hierarchy would still leverage the BWC bridge (`HoodieAvroRecordMerger`) wrapping the existing `HoodieRecordPayload` implementations
  - Users will need to rebase their record-combining logic away from subclassing `HoodieRecordPayload` and instead subclass the newly created interface `HoodieRecordMerger` to get the full suite of performance benefits
- If we are changing behavior, how will we phase out the older behavior?
  - The older behavior, leveraging `HoodieRecordPayload` for merging, will be marked as deprecated in 0.11 and subsequently removed in 0.1x
- If we need special migration tools, describe them here.
