Commit

Address comment
HyukjinKwon committed Oct 10, 2016
1 parent ef7bb9f · commit 2015fe2
Showing 3 changed files with 42 additions and 124 deletions.
@@ -31,7 +31,6 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.csv.CSVUtils.makeSafeHeader
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -76,6 +75,48 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
     Some(schema)
   }
 
+  /**
+   * Generates a header from the given row which is null-safe and duplicate-safe.
+   */
+  private def makeSafeHeader(
+      row: Array[String],
+      options: CSVOptions,
+      caseSensitive: Boolean): Array[String] = {
+    if (options.headerFlag) {
+      val duplicates = {
+        val safeRow = if (!caseSensitive) {
+          // Elements in row might be null.
+          row.flatMap(Option(_).map(_.toLowerCase))
+        } else {
+          row
+        }
+        safeRow.diff(safeRow.distinct).distinct
+      }
+
+      row.zipWithIndex.map { case (value, index) =>
+        if (value == null || value.isEmpty || value == options.nullValue) {
+          // When the value is null, an empty string or the `nullValue` string, use the
+          // positional default name with the index as the suffix.
+          s"_c$index"
+        } else if (!caseSensitive && duplicates.contains(value.toLowerCase)) {
+          // When there are case-insensitive duplicates, put the index as the suffix.
+          s"$value$index"
+        } else if (duplicates.contains(value)) {
+          // When there are duplicates, put the index as the suffix.
+          s"$value$index"
+        } else {
+          value
+        }
+      }
+    } else {
+      row.zipWithIndex.map { case (value, index) =>
+        // Uses the default column names, "_c#" where # is the position of the field,
+        // when the header option is disabled.
+        s"_c$index"
+      }
+    }
+  }
+
   override def prepareWrite(
       sparkSession: SparkSession,
       job: Job,
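To see what the added method produces, here is a minimal standalone sketch of the same logic. It is illustration only, not the committed code or Spark API: the real method is private and reads its flags from CSVOptions, so headerFlag and nullValue are passed here as plain parameters, and the object name MakeSafeHeaderExample is hypothetical.

object MakeSafeHeaderExample {
  def makeSafeHeader(
      row: Array[String],
      headerFlag: Boolean,   // stand-in for options.headerFlag
      nullValue: String,     // stand-in for options.nullValue
      caseSensitive: Boolean): Array[String] = {
    if (headerFlag) {
      // Collect names that occur more than once (lower-cased when case-insensitive).
      val safeRow =
        if (!caseSensitive) row.flatMap(Option(_).map(_.toLowerCase)) else row
      val duplicates = safeRow.diff(safeRow.distinct).distinct
      row.zipWithIndex.map { case (value, index) =>
        if (value == null || value.isEmpty || value == nullValue) s"_c$index"
        else if (!caseSensitive && duplicates.contains(value.toLowerCase)) s"$value$index"
        else if (duplicates.contains(value)) s"$value$index"
        else value
      }
    } else {
      // Header disabled: every column gets the positional default name.
      row.indices.map(index => s"_c$index").toArray
    }
  }

  def main(args: Array[String]): Unit = {
    // A header with case-insensitive duplicates, a null and an empty string.
    val header = makeSafeHeader(
      Array("a", "A", null, "", "price"),
      headerFlag = true, nullValue = "NULL", caseSensitive = false)
    println(header.mkString(", "))  // prints: a0, A1, _c2, _c3, price
  }
}

Duplicates keep their original text with the column index appended ("a" at position 0 becomes "a0"), while null, empty and nullValue entries fall back to the positional "_c#" names, so every resulting column name maps back to its index in the row.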

The other two changed files in this commit were deleted; their names are not shown in this view.