Skip to content

Commit

Permalink
[SPARK-29999][SS][FOLLOWUP] Fix test to check the actual metadata log directory
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

This patch fixes the missed spot - the test initializes FileStreamSinkLog with its "output" directory instead of "metadata" directory, hence the verification against sink log was no-op.

### Why are the changes needed?

Without the fix, the verification against sink log was no-op.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Checked with debugger in test, and verified `allFiles()` returns non-zero entries. (It returned zero entry, as there's no metadata.)

Closes #28930 from HeartSaVioR/SPARK-29999-FOLLOWUP-fix-test.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 5472170)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
HeartSaVioR authored and cloud-fan committed Jun 30, 2020
1 parent 503e56a commit 22e3433
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ object FileStreamSink extends Logging {
val hdfsPath = new Path(singlePath)
val fs = hdfsPath.getFileSystem(hadoopConf)
if (fs.isDirectory(hdfsPath)) {
val metadataPath = new Path(hdfsPath, metadataDir)
checkEscapedMetadataPath(fs, metadataPath, sqlConf)
val metadataPath = getMetadataLogPath(fs, hdfsPath, sqlConf)
fs.exists(metadataPath)
} else {
false
Expand All @@ -55,6 +54,12 @@ object FileStreamSink extends Logging {
}
}

/**
 * Resolves the sink's metadata log directory under the given output `path`,
 * running the escaped-path check on it before returning.
 *
 * @param fs the file system the output path lives on
 * @param path the sink's output directory
 * @param sqlConf SQL configuration consulted by the escaped-path check
 * @return the resolved metadata log path
 */
def getMetadataLogPath(fs: FileSystem, path: Path, sqlConf: SQLConf): Path = {
  val logDir = new Path(path, FileStreamSink.metadataDir)
  FileStreamSink.checkEscapedMetadataPath(fs, logDir, sqlConf)
  logDir
}

def checkEscapedMetadataPath(fs: FileSystem, metadataPath: Path, sqlConf: SQLConf): Unit = {
if (sqlConf.getConf(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
&& StreamExecution.containsSpecialCharsInPath(metadataPath)) {
Expand Down Expand Up @@ -125,14 +130,12 @@ class FileStreamSink(
partitionColumnNames: Seq[String],
options: Map[String, String]) extends Sink with Logging {

import FileStreamSink._

private val hadoopConf = sparkSession.sessionState.newHadoopConf()
private val basePath = new Path(path)
private val logPath = {
val metadataDir = new Path(basePath, FileStreamSink.metadataDir)
val fs = metadataDir.getFileSystem(hadoopConf)
FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf)
metadataDir
}
private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath,
sparkSession.sessionState.conf)
private val fileLog =
new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,10 +555,12 @@ abstract class FileStreamSinkSuite extends StreamTest {
}
}

val fs = new Path(outputDir.getCanonicalPath).getFileSystem(
spark.sessionState.newHadoopConf())
val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark,
outputDir.getCanonicalPath)
val outputDirPath = new Path(outputDir.getCanonicalPath)
val hadoopConf = spark.sessionState.newHadoopConf()
val fs = outputDirPath.getFileSystem(hadoopConf)
val logPath = FileStreamSink.getMetadataLogPath(fs, outputDirPath, conf)

val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toString)

val allFiles = sinkLog.allFiles()
// only files from non-empty partition should be logged
Expand Down

0 comments on commit 22e3433

Please sign in to comment.