[HUDI-7236] Allow MIT to change partition path when using global index (apache#10337)

Co-authored-by: Jonathan Vexler <=>
Co-authored-by: sivabalan <n.siva.b@gmail.com>
jonvex and nsivabalan committed Sep 19, 2024
1 parent f6a2ef4 commit f71a47f
Showing 4 changed files with 265 additions and 26 deletions.
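For orientation, a minimal sketch (hypothetical table, column, and application names) of the kind of statement this commit enables: a Spark SQL MERGE INTO whose matched-update clause writes a new value to the partition field of a table that uses a global index (e.g. hoodie.index.type=GLOBAL_SIMPLE). As the diff below shows, the merged record is then routed as a delete against the old partition plus an insert into the new one.

import org.apache.spark.sql.SparkSession;

// Hypothetical illustration, not code from this commit: with a global index,
// an UPDATE that assigns a new value to the partition field ("region") now
// moves the record to the new partition instead of failing or duplicating it.
public class MergeIntoPartitionUpdate {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("mit-partition-update")
        .getOrCreate();
    spark.sql(
        "MERGE INTO hudi_target t "
      + "USING updates s ON t.id = s.id "
      + "WHEN MATCHED THEN UPDATE SET t.region = s.region, t.ts = s.ts "
      + "WHEN NOT MATCHED THEN INSERT *");
    spark.stop();
  }
}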
@@ -19,6 +19,7 @@
package org.apache.hudi.index;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -34,6 +35,7 @@
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.HoodieTimer;
@@ -45,11 +47,14 @@
import org.apache.hudi.io.HoodieMergedReadHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieAvroKeyGeneratorFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -245,6 +250,63 @@ private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
.getMergedRecords().iterator());
}

/**
* getExistingRecords will create records with ExpressionPayload, so we override the payload class in the config.
* We deliberately don't restore the original value, because the write would fail later on.
* We also need the key generator so we can determine the partition path after ExpressionPayload
* evaluates the merge.
*/
private static Pair<HoodieWriteConfig, Option<BaseKeyGenerator>> getKeygenAndUpdatedWriteConfig(HoodieWriteConfig config, HoodieTableConfig tableConfig) {
if (config.getPayloadClass().equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload")) {
TypedProperties typedProperties = new TypedProperties(config.getProps());
// set the payload class to the table's payload class, not ExpressionPayload; this will be used to read the existing records
typedProperties.setProperty(HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key(), tableConfig.getPayloadClass());
typedProperties.setProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key(), tableConfig.getPayloadClass());
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withProperties(typedProperties).build();
try {
return Pair.of(writeConfig, Option.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory.createKeyGenerator(writeConfig.getProps())));
} catch (IOException e) {
throw new RuntimeException("KeyGenerator must inherit from BaseKeyGenerator to update a record's partition path using Spark SQL MERGE INTO", e);
}
}
return Pair.of(config, Option.empty());
}

/**
* Special merge handling for MIT: we need to wait until after merging
* before we can add the meta fields, because ExpressionPayload does not allow rewriting.
*/
private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecordWithExpressionPayload(
HoodieRecord<R> incoming,
HoodieRecord<R> existing,
Schema writeSchema,
Schema existingSchema,
Schema writeSchemaWithMetaFields,
HoodieWriteConfig config,
HoodieRecordMerger recordMerger,
BaseKeyGenerator keyGenerator) throws IOException {
Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(existing, existingSchema,
incoming, writeSchemaWithMetaFields, config.getProps());
if (!mergeResult.isPresent()) {
// the record was deleted
return Option.empty();
}
HoodieRecord<R> result = mergeResult.get().getLeft();
if (result.getData().equals(HoodieRecord.SENTINEL)) {
// the record did not match any merge case and should not be modified
return Option.of(result);
}

// the record is inserted or updated
String partitionPath = keyGenerator.getPartitionPath((GenericRecord) result.getData());
HoodieRecord<R> withMeta = result.prependMetaFields(writeSchema, writeSchemaWithMetaFields,
new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(partitionPath), config.getProps());
return Option.of(withMeta.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields, config.getProps(), Option.empty(),
config.allowOperationMetadataField(), Option.empty(), false, Option.of(writeSchema)));
}

/**
* Merge the incoming record with the matching existing record loaded via {@link HoodieMergedReadHandle}. The existing record is the latest version in the table.
*/
@@ -253,25 +315,31 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(
HoodieRecord<R> existing,
Schema writeSchema,
HoodieWriteConfig config,
HoodieRecordMerger recordMerger) throws IOException {
HoodieRecordMerger recordMerger,
Option<BaseKeyGenerator> expressionPayloadKeygen) throws IOException {
Schema existingSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
Schema writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField());
// prepend the hoodie meta fields as the incoming record does not have them
HoodieRecord incomingPrepended = incoming
.prependMetaFields(writeSchema, writeSchemaWithMetaFields, new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()), config.getProps());
// after prepending the meta fields, convert the record back to the original payload
HoodieRecord incomingWithMetaFields = incomingPrepended
.wrapIntoHoodieRecordPayloadWithParams(writeSchema, config.getProps(), Option.empty(), config.allowOperationMetadataField(), Option.empty(), false, Option.empty());
Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
.merge(existing, existingSchema, incomingWithMetaFields, writeSchemaWithMetaFields, config.getProps());
if (mergeResult.isPresent()) {
// the merged record needs to be converted back to the original payload
HoodieRecord<R> merged = mergeResult.get().getLeft().wrapIntoHoodieRecordPayloadWithParams(
writeSchemaWithMetaFields, config.getProps(), Option.empty(),
config.allowOperationMetadataField(), Option.empty(), false, Option.of(writeSchema));
return Option.of(merged);
if (expressionPayloadKeygen.isPresent()) {
return mergeIncomingWithExistingRecordWithExpressionPayload(incoming, existing, writeSchema,
existingSchema, writeSchemaWithMetaFields, config, recordMerger, expressionPayloadKeygen.get());
} else {
return Option.empty();
// prepend the hoodie meta fields as the incoming record does not have them
HoodieRecord incomingPrepended = incoming
.prependMetaFields(writeSchema, writeSchemaWithMetaFields, new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()), config.getProps());
// after prepending the meta fields, convert the record back to the original payload
HoodieRecord incomingWithMetaFields = incomingPrepended
.wrapIntoHoodieRecordPayloadWithParams(writeSchema, config.getProps(), Option.empty(), config.allowOperationMetadataField(), Option.empty(), false, Option.empty());
Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
.merge(existing, existingSchema, incomingWithMetaFields, writeSchemaWithMetaFields, config.getProps());
if (mergeResult.isPresent()) {
// the merged record needs to be converted back to the original payload
HoodieRecord<R> merged = mergeResult.get().getLeft().wrapIntoHoodieRecordPayloadWithParams(
writeSchemaWithMetaFields, config.getProps(), Option.empty(),
config.allowOperationMetadataField(), Option.empty(), false, Option.of(writeSchema));
return Option.of(merged);
} else {
return Option.empty();
}
}
}

@@ -280,22 +348,25 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(
*/
public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdatesIfNeeded(
HoodieData<Pair<HoodieRecord<R>, Option<HoodieRecordGlobalLocation>>> incomingRecordsAndLocations, HoodieWriteConfig config, HoodieTable hoodieTable) {
Pair<HoodieWriteConfig, Option<BaseKeyGenerator>> keyGeneratorWriteConfigOpt = getKeygenAndUpdatedWriteConfig(config, hoodieTable.getMetaClient().getTableConfig());
HoodieWriteConfig updatedConfig = keyGeneratorWriteConfigOpt.getLeft();
Option<BaseKeyGenerator> expressionPayloadKeygen = keyGeneratorWriteConfigOpt.getRight();
// completely new records
HoodieData<HoodieRecord<R>> taggedNewRecords = incomingRecordsAndLocations.filter(p -> !p.getRight().isPresent()).map(Pair::getLeft);
// the records found in existing base files
HoodieData<HoodieRecord<R>> untaggedUpdatingRecords = incomingRecordsAndLocations.filter(p -> p.getRight().isPresent()).map(Pair::getLeft)
.distinctWithKey(HoodieRecord::getRecordKey, config.getGlobalIndexReconcileParallelism());
.distinctWithKey(HoodieRecord::getRecordKey, updatedConfig.getGlobalIndexReconcileParallelism());
// the tagging partitions and locations
// NOTE: The incoming records may only differ in record position, however, for the purpose of
// merging in case of partition updates, it is safe to ignore the record positions.
HoodieData<Pair<String, String>> globalLocations = incomingRecordsAndLocations
.filter(p -> p.getRight().isPresent())
.map(p -> Pair.of(p.getRight().get().getPartitionPath(), p.getRight().get().getFileId()))
.distinct(config.getGlobalIndexReconcileParallelism());
.distinct(updatedConfig.getGlobalIndexReconcileParallelism());
// merged existing records with current locations being set
HoodieData<HoodieRecord<R>> existingRecords = getExistingRecords(globalLocations, config, hoodieTable);
HoodieData<HoodieRecord<R>> existingRecords = getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(), hoodieTable);

final HoodieRecordMerger recordMerger = config.getRecordMerger();
final HoodieRecordMerger recordMerger = updatedConfig.getRecordMerger();
HoodieData<HoodieRecord<R>> taggedUpdatingRecords = untaggedUpdatingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r))
.leftOuterJoin(existingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r)))
.values().flatMap(entry -> {
@@ -306,24 +377,28 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdatesIfNeeded(
return Collections.singletonList(incoming).iterator();
}
HoodieRecord<R> existing = existingOpt.get();
Schema writeSchema = new Schema.Parser().parse(config.getWriteSchema());
if (incoming.isDelete(writeSchema, config.getProps())) {
Schema writeSchema = new Schema.Parser().parse(updatedConfig.getWriteSchema());
if (incoming.isDelete(writeSchema, updatedConfig.getProps())) {
// incoming is a delete: force tag the incoming to the old partition
return Collections.singletonList(tagRecord(incoming.newInstance(existing.getKey()), existing.getCurrentLocation())).iterator();
}

Option<HoodieRecord<R>> mergedOpt = mergeIncomingWithExistingRecord(incoming, existing, writeSchema, config, recordMerger);
Option<HoodieRecord<R>> mergedOpt = mergeIncomingWithExistingRecord(incoming, existing, writeSchema, updatedConfig, recordMerger, expressionPayloadKeygen);
if (!mergedOpt.isPresent()) {
// merge resulted in delete: force tag the incoming to the old partition
return Collections.singletonList(tagRecord(incoming.newInstance(existing.getKey()), existing.getCurrentLocation())).iterator();
}
HoodieRecord<R> merged = mergedOpt.get();
if (merged.getData().equals(HoodieRecord.SENTINEL)) {
// if this is a MIT update that doesn't match any merge condition, we omit the record
return Collections.emptyIterator();
}
if (Objects.equals(merged.getPartitionPath(), existing.getPartitionPath())) {
// merged record has the same partition: route the merged result to the current location as an update
return Collections.singletonList(tagRecord(merged, existing.getCurrentLocation())).iterator();
} else {
// merged record has a different partition: issue a delete to the old partition and insert the merged record to the new partition
HoodieRecord<R> deleteRecord = createDeleteRecord(config, existing.getKey());
HoodieRecord<R> deleteRecord = createDeleteRecord(updatedConfig, existing.getKey());
deleteRecord.setIgnoreIndexUpdate(true);
return Arrays.asList(tagRecord(deleteRecord, existing.getCurrentLocation()), merged).iterator();
}
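A self-contained sketch of the routing decision implemented above, in plain Java rather than Hudi's API (keys and partition values are hypothetical): a merged record that keeps its partition is tagged as an update to its current location, while a partition change becomes a delete in the old partition plus an insert into the new one.

import java.util.List;
import java.util.Objects;

// Illustration only; mirrors the branch on partition equality above.
public class PartitionUpdateRouting {
  record Rec(String key, String partition) {}

  static List<String> route(Rec existing, Rec merged) {
    if (Objects.equals(merged.partition(), existing.partition())) {
      // same partition: route the merged result to the current location as an update
      return List.of("UPDATE " + merged.key() + " in " + merged.partition());
    }
    // partition changed: delete from the old partition, insert into the new one
    return List.of(
        "DELETE " + existing.key() + " from " + existing.partition(),
        "INSERT " + merged.key() + " into " + merged.partition());
  }

  public static void main(String[] args) {
    route(new Rec("id-1", "region=us"), new Rec("id-1", "region=eu"))
        .forEach(System.out::println);
  }
}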
@@ -52,6 +52,7 @@
public class HoodieMergedReadHandle<T, I, K, O> extends HoodieReadHandle<T, I, K, O> {

protected final Schema readerSchema;
protected final Schema baseFileReaderSchema;
private final Option<FileSlice> fileSliceOpt;

public HoodieMergedReadHandle(HoodieWriteConfig config,
@@ -68,6 +69,8 @@ public HoodieMergedReadHandle(HoodieWriteConfig config,
Option<FileSlice> fileSliceOption) {
super(config, instantTime, hoodieTable, partitionPathFileIDPair);
readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
// config.getSchema is not canonicalized, while config.getWriteSchema is canonicalized. So, we have to use the canonicalized schema to read the existing data.
baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
fileSliceOpt = fileSliceOption.isPresent() ? fileSliceOption : getLatestFileSlice();
}

@@ -153,7 +156,7 @@ private List<HoodieRecord<T>> doMergedRead(Option<HoodieFileReader> baseFileRead
if (baseFileReaderOpt.isPresent()) {
HoodieFileReader baseFileReader = baseFileReaderOpt.get();
HoodieRecordMerger recordMerger = config.getRecordMerger();
ClosableIterator<HoodieRecord<T>> baseFileItr = baseFileReader.getRecordIterator(readerSchema);
ClosableIterator<HoodieRecord<T>> baseFileItr = baseFileReader.getRecordIterator(baseFileReaderSchema);
HoodieTableConfig tableConfig = hoodieTable.getMetaClient().getTableConfig();
Option<Pair<String, String>> simpleKeyGenFieldsOpt =
tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()));
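The one-line change above switches the base-file reader from the schema derived via config.getSchema to the canonicalized one from config.getWriteSchema. As a minimal, hypothetical illustration (plain Avro, not Hudi code) of why reader and writer must agree on one exact schema definition: two record schemas that declare identical fields but differ in an attribute (here, the namespace) are unequal to Avro.

import org.apache.avro.Schema;

// Illustration only: a.equals(b) is false even though the field lists match.
public class SchemaConsistencyDemo {
  public static void main(String[] args) {
    Schema a = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Rec\",\"namespace\":\"x\","
            + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
    Schema b = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Rec\",\"namespace\":\"y\","
            + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
    System.out.println(a.equals(b));                          // false
    System.out.println(a.getFields().equals(b.getFields()));  // true
  }
}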
@@ -67,7 +67,8 @@ protected List<WriteStatus> computeNext() {
// Executor service used for launching writer thread.
HoodieExecutor<List<WriteStatus>> bufferedIteratorExecutor = null;
try {
Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
// config.getSchema is not canonicalized, while config.getWriteSchema is canonicalized. So, we have to use the canonicalized schema to read the existing data.
Schema schema = new Schema.Parser().parse(hoodieConfig.getWriteSchema());
if (useWriterSchema) {
schema = HoodieAvroUtils.addMetadataFields(schema);
}