Fix the streaming unsafe escape flag for non-additive schema changes.
GitOrigin-RevId: 9dfe9417689563810fa63fd10c035937ea8c6d5b
jackierwzhang authored and vkorukanti committed Apr 20, 2023
1 parent 8dfa534 commit 30fa2c5
Showing 6 changed files with 124 additions and 18 deletions.
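
The escape hatch in question is the SQL conf DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES, exercised by the new test at the bottom of this diff. A hedged sketch of the user-facing flow this commit fixes; the table path, checkpoint path, memory sink, and the ambient `spark` session are illustrative assumptions, only the conf constant comes from the diff (code outside the delta package would set the equivalent conf key string directly):

import org.apache.spark.sql.delta.sources.DeltaSQLConf

// A stream reading a column mapping table starts failing with
// DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE after a column is dropped or renamed.
// The user accepts the risk and opts into the unsafe escape hatch (data in renamed or
// dropped columns may be skipped or misread, hence "unsafe"):
spark.conf.set(
  DeltaSQLConf.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES.key,
  "true")

// With this fix, restarting from the same checkpoint backfills past the rename/drop
// instead of throwing again; additive changes made after the restart still stop the
// stream (see the DeltaSource change below).
val query = spark.readStream
  .format("delta")
  .load("/path/to/table")              // illustrative path
  .writeStream
  .format("memory")
  .queryName("unblocked_stream")
  .option("checkpointLocation", "/path/to/table/_checkpoint")  // illustrative path
  .start()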
5 changes: 3 additions & 2 deletions core/src/main/resources/error/delta-error-classes.json
@@ -1566,16 +1566,17 @@
"DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE" : {
"message" : [
"Streaming read is not supported on tables with read-incompatible schema changes (e.g. rename or drop or datatype changes).",
"For further information and possible next steps to resolve this issue, please review the documentation at <docLink>",
"Read schema: <readSchema>. Incompatible data schema: <incompatibleSchema>."
],
"sqlState" : "42KD4"
},
"DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE_USE_SCHEMA_LOG" : {
"message" : [
"Streaming read is not supported on tables with read-incompatible schema changes (e.g. rename or drop or datatype changes).",
"Read schema: <readSchema>. Incompatible data schema: <incompatibleSchema>.",
"Please provide a 'schemaTrackingLocation' to enable non-additive schema evolution for Delta stream processing.",
"See <docLink> for more details."
"See <docLink> for more details.",
"Read schema: <readSchema>. Incompatible data schema: <incompatibleSchema>."
],
"sqlState" : "42KD4"
},
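
A note on the reordering above: the <placeholders> in these templates are filled positionally from the messageParameters Array built in DeltaErrors.scala (next file), which is why the Array is reordered in the same commit. A rough illustration of that coupling, using a naive positional-substitution helper that is an assumption for exposition and not Delta's actual error formatter:

// Illustrative only: fill <placeholders> in order of appearance from a positional array,
// mirroring why the template order and the Array order must stay in sync.
def formatMessage(templateLines: Seq[String], params: Array[String]): String = {
  var i = -1
  "<[^>]+>".r.replaceAllIn(templateLines.mkString("\n"), _ => { i += 1; params(i) })
}

val useSchemaLogTemplate = Seq(
  "Please provide a 'schemaTrackingLocation' to enable non-additive schema evolution for Delta stream processing.",
  "See <docLink> for more details.",
  "Read schema: <readSchema>. Incompatible data schema: <incompatibleSchema>.")

// Matches the new parameter order used below: Array(docLink, readSchema.json, incompatibleSchema.json)
println(formatMessage(useSchemaLogTemplate, Array("some-docs-url", "read-schema-json", "data-schema-json")))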
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -101,7 +101,8 @@ trait DocsPath {
"icebergClassMissing",
"tableFeatureReadRequiresWriteException",
"tableFeatureRequiresHigherReaderProtocolVersion",
"tableFeatureRequiresHigherWriterProtocolVersion"
"tableFeatureRequiresHigherWriterProtocolVersion",
"blockStreamingReadsWithIncompatibleColumnMappingSchemaChanges"
)
}

@@ -2557,12 +2558,13 @@ trait DeltaErrorsBase
readSchema: StructType,
incompatibleSchema: StructType,
detectedDuringStreaming: Boolean): Throwable = {
+ val docLink = "/versioning.html#column-mapping"
val enableNonAdditiveSchemaEvolution = spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_STREAMING_ENABLE_NON_ADDITIVE_SCHEMA_EVOLUTION)
new DeltaStreamingColumnMappingSchemaIncompatibleException(
readSchema,
incompatibleSchema,
"",
generateDocsLink(spark.sparkContext.getConf, docLink),
enableNonAdditiveSchemaEvolution,
additionalProperties = Map(
"detectedDuringStreaming" -> detectedDuringStreaming.toString
@@ -3109,7 +3111,7 @@ class DeltaStreamingColumnMappingSchemaIncompatibleException(
"DELTA_STREAMING_INCOMPATIBLE_SCHEMA_CHANGE"
},
messageParameters = Array(
+ docLink,
readSchema.json,
- incompatibleSchema.json,
- docLink)
+ incompatibleSchema.json)
)
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala
@@ -231,7 +231,7 @@ object SchemaUtils extends DeltaLogging {
* As the Delta snapshots update, the schema may change as well. This method defines whether the
* new schema of a Delta table can be used with a previously analyzed LogicalPlan. Our
* rules are to return false if:
- * - Dropping any column that was present in the existing schema
+ * - Dropping any column that was present in the existing schema, if not allowMissingColumns
* - Any change of datatype
* - If `forbidTightenNullability` = true:
* - Forbids tightening the nullability (existing nullable=true -> read nullable=false)
@@ -249,7 +249,8 @@
def isReadCompatible(
existingSchema: StructType,
readSchema: StructType,
- forbidTightenNullability: Boolean = false): Boolean = {
+ forbidTightenNullability: Boolean = false,
+ allowMissingColumns: Boolean = false): Boolean = {

def isNullabilityCompatible(existingNullable: Boolean, readNullable: Boolean): Boolean = {
if (forbidTightenNullability) {
@@ -287,7 +288,7 @@
"Delta tables don't allow field names that only differ by case")
// scalastyle:on caselocale

- if (!existingFieldNames.subsetOf(newFields)) {
+ if (!allowMissingColumns && !existingFieldNames.subsetOf(newFields)) {
// Dropped a column that was present in the DataFrame schema
return false
}
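
To make the new allowMissingColumns flag concrete, a minimal sketch of the check before and after; the schema literals are made up, and the call assumes code compiled inside the Delta project where SchemaUtils is visible:

import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.types.StructType

// Schema carried by an older table version that a stream is backfilling, before the user
// ran DROP COLUMN value:
val schemaChange = StructType.fromDDL("id STRING, value STRING")
// The stream's analyzed read schema, taken from the latest table version, after the drop:
val readSchema = StructType.fromDDL("id STRING")

// Default behavior: `value` exists in the incoming schema but is missing from the read
// schema, so the change is treated as read-incompatible.
SchemaUtils.isReadCompatible(schemaChange, readSchema)                              // false
// With the new flag the missing column is tolerated, which is what DeltaSource now relies
// on when the unsafe flag is set and the stream is backfilling.
SchemaUtils.isReadCompatible(schemaChange, readSchema, allowMissingColumns = true)  // true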
24 changes: 21 additions & 3 deletions core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
@@ -131,13 +131,18 @@ trait DeltaSourceBase extends Source
protected lazy val forceEnableUnsafeReadOnNullabilityChange =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STREAM_UNSAFE_READ_ON_NULLABILITY_CHANGE)

/**
* Whether we are streaming from a table with column mapping enabled
*/
protected val isStreamingFromColumnMappingTable: Boolean =
snapshotAtSourceInit.metadata.columnMappingMode != NoMapping

/**
* Whether we should explicitly verify column-mapping-related schema changes such as renamed
* or dropped columns.
*/
protected lazy val shouldVerifyColumnMappingSchemaChanges =
- snapshotAtSourceInit.metadata.columnMappingMode != NoMapping &&
- !forceEnableStreamingReadOnColumnMappingSchemaChanges
+ isStreamingFromColumnMappingTable && !forceEnableStreamingReadOnColumnMappingSchemaChanges

/**
* The persisted schema from the schema log that must be used to read data files in this Delta
@@ -531,7 +536,20 @@ trait DeltaSourceBase extends Source
// because we don't ever want to read back any nulls when the read schema is non-nullable.
val shouldForbidTightenNullability = !forceEnableUnsafeReadOnNullabilityChange
if (!SchemaUtils.isReadCompatible(
- schemaChange, schema, forbidTightenNullability = shouldForbidTightenNullability)) {
+ schemaChange, schema,
+ forbidTightenNullability = shouldForbidTightenNullability,
+ // If a user is streaming from a column mapping table and has enabled the unsafe flag to
+ // ignore column mapping schema changes, relax the standard check so that columns missing
+ // from the read schema may appear in the schema change: the only way that happens is when
+ // the user renamed or dropped a column and knowingly enabled the flag to unblock the stream.
+ // This is only allowed when we are "backfilling", i.e. the stream progress is older than
+ // the analyzed table version. Any schema change past the analysis should still throw an
+ // exception, because additive schema changes MUST be taken into account.
+ allowMissingColumns =
+ isStreamingFromColumnMappingTable &&
+ forceEnableStreamingReadOnColumnMappingSchemaChanges &&
+ backfilling
+ )) {
// Only schema change later than the current read snapshot/schema can be retried, in other
// words, backfills could never be retryable, because we have no way to refresh
// the latest schema to "catch up" when the schema change happens before than current read
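
A self-contained restatement of the gate added above, with a made-up version timeline to show when backfilling holds; the version numbers and the way backfilling is derived here are illustrative assumptions, not taken from this commit:

// Illustrative values only.
val isStreamingFromColumnMappingTable = true   // the table uses column mapping mode "name" or "id"
val unsafeFlagEnabled = true                   // the unsafe escape hatch conf is set to true
val analyzedTableVersion = 10L                 // version the stream's read schema was taken from
val streamProgressVersion = 3L                 // how far the checkpoint says the stream has read
val backfilling = streamProgressVersion < analyzedTableVersion  // replaying versions behind the analysis

// Columns missing from the read schema are tolerated in a schema change only when all three
// conditions hold. A change past the analysis (say, ADD COLUMN at version 12) must still
// fail, so a restart picks the new column up instead of silently dropping its data.
val allowMissingColumns =
  isStreamingFromColumnMappingTable && unsafeFlagEnabled && backfilling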
8 changes: 7 additions & 1 deletion core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
@@ -97,7 +97,13 @@ trait DeltaErrorsSuiteBase
DeltaErrors.tableFeatureRequiresHigherReaderProtocolVersion(
feature = "feature",
currentVersion = 1,
- requiredVersion = 7)
+ requiredVersion = 7),
+ "blockStreamingReadsWithIncompatibleColumnMappingSchemaChanges" ->
+ DeltaErrors.blockStreamingReadsWithIncompatibleColumnMappingSchemaChanges(
+ spark,
+ StructType.fromDDL("id int"),
+ StructType.fromDDL("id2 int"),
+ detectedDuringStreaming = true)
)

def otherMessagesToTest: Map[String, String] = Map(
88 changes: 83 additions & 5 deletions core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceColumnMappingSuite.scala
@@ -27,12 +27,13 @@ import org.apache.spark.sql.delta.sources.{DeltaSource, DeltaSQLConf}
import org.apache.spark.sql.delta.test.DeltaColumnMappingSelectedTestMixin
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.JsonUtils
+ import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingExecutionRelation}
- import org.apache.spark.sql.streaming.{DataStreamReader, StreamingQuery, StreamTest}
+ import org.apache.spark.sql.streaming.{DataStreamReader, StreamTest}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.Utils

@@ -214,11 +215,11 @@ trait ColumnMappingStreamingBlockedWorkflowSuiteBase extends ColumnMappingStream
writeDeltaData(0 until 5, deltaLog, Some(StructType.fromDDL("id string, name string")))
}

- def df: DataFrame = dropCDCFields(dsr.load(inputDir.getCanonicalPath))
+ def createNewDf(): DataFrame = dropCDCFields(dsr.load(inputDir.getCanonicalPath))

val checkpointDir = new File(inputDir, "_checkpoint")

- testStream(df)(
+ testStream(createNewDf())(
StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
ProcessAllAvailable(),
CheckAnswer((0 until 5).map(i => (i.toString, i.toString)): _*),
@@ -247,7 +248,7 @@ trait ColumnMappingStreamingBlockedWorkflowSuiteBase extends ColumnMappingStream
)

// but should not block after restarting, now in column mapping mode
- testStream(df)(
+ testStream(createNewDf())(
StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
ProcessAllAvailable(),
// Sink is reinitialized, only 10-15 are ingested
@@ -258,7 +259,7 @@ trait ColumnMappingStreamingBlockedWorkflowSuiteBase extends ColumnMappingStream
// use a different checkpoint to simulate a clean stream restart
val checkpointDir2 = new File(inputDir, "_checkpoint2")

- testStream(df)(
+ testStream(createNewDf())(
StartStream(checkpointLocation = checkpointDir2.getCanonicalPath),
ProcessAllAvailable(),
// Since the latest schema contains the additional column, it is null for previous batches.
@@ -630,6 +631,83 @@ trait ColumnMappingStreamingBlockedWorkflowSuiteBase extends ColumnMappingStream
)
}
}

test("unsafe flag can unblock drop or rename column") {
// upgrade should not be blocked both during the stream AND during stream restart
withTempDir { inputDir =>
Seq(
s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` DROP COLUMN value",
s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` RENAME COLUMN value TO value2"
).foreach { schemaChangeQuery =>
FileUtils.deleteDirectory(inputDir)
val deltaLog = DeltaLog.forTable(spark, new Path(inputDir.toURI))
withColumnMappingConf("none") {
writeDeltaData(0 until 5, deltaLog,
Some(StructType.fromDDL("id string, value string")))
}

def createNewDf(): DataFrame = dropCDCFields(dsr.load(inputDir.getCanonicalPath))

val checkpointDir = new File(inputDir, s"_checkpoint_${schemaChangeQuery.hashCode}")
val isRename = schemaChangeQuery.contains("RENAME")
testStream(createNewDf())(
StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
ProcessAllAvailable(),
CheckAnswer((0 until 5).map(i => (i.toString, i.toString)): _*),
Execute { _ =>
sql(
s"""
|ALTER TABLE delta.`${inputDir.getCanonicalPath}`
|SET TBLPROPERTIES (
| ${DeltaConfigs.COLUMN_MAPPING_MODE.key} = "name",
| ${DeltaConfigs.MIN_READER_VERSION.key} = "2",
| ${DeltaConfigs.MIN_WRITER_VERSION.key} = "5")""".stripMargin)
// Add another schema change to ensure that, even after enabling the flag, we would still
// hit a schema change with more columns than the read schema, so `verifySchemaChange`
// would see it and could complain.
sql(s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` ADD COLUMN (random STRING)")
sql(schemaChangeQuery)
writeDeltaData(5 until 10, deltaLog)
},
ProcessAllAvailableIgnoreError,
ExistingRetryableInStreamSchemaChangeFailure
)

// Without the flag it would still fail
testStream(createNewDf())(
StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
ProcessAllAvailableIgnoreError,
CheckAnswer(Nil: _*),
ExpectStreamStartInCompatibleSchemaFailure
)

val checkExpectedResult = if (isRename) {
CheckAnswer((5 until 10).map(i => (i.toString, i.toString, i.toString)): _*)
} else {
CheckAnswer((5 until 10).map(i => (i.toString, i.toString)): _*)
}

withSQLConf(DeltaSQLConf
.DELTA_STREAMING_UNSAFE_READ_ON_INCOMPATIBLE_COLUMN_MAPPING_SCHEMA_CHANGES
.key -> "true") {
testStream(createNewDf())(
StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
// The processing will pass, ignoring any columns missing from the read schema in the backfill.
ProcessAllAvailable(),
// Show up as dropped column
checkExpectedResult,
Execute { _ =>
// But any schema change post the stream analysis would still cause exceptions
// as usual, which is critical to avoid data loss.
sql(s"ALTER TABLE delta.`${inputDir.getCanonicalPath}` ADD COLUMN (random2 STRING)")
},
ProcessAllAvailableIgnoreError,
ExistingRetryableInStreamSchemaChangeFailure
)
}
}
}
}
}

trait DeltaSourceColumnMappingSuiteBase extends DeltaColumnMappingSelectedTestMixin {
