From 2015fe207526853d40ec8930817529a5f6edeb46 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Mon, 10 Oct 2016 00:06:43 +0900 Subject: [PATCH] Address comment --- .../datasources/csv/CSVFileFormat.scala | 43 ++++++++++++- .../execution/datasources/csv/CSVUtils.scala | 63 ------------------- .../datasources/csv/CSVUtilsSuite.scala | 60 ------------------ 3 files changed, 42 insertions(+), 124 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtilsSuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala index 67293440fc222..7e19f3954ef42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.CompressionCodecs import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.csv.CSVUtils.makeSafeHeader import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableConfiguration @@ -76,6 +75,48 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister { Some(schema) } + /** + * Generates a header from the given row which is null-safe and duplicate-safe. + */ + private def makeSafeHeader( + row: Array[String], + options: CSVOptions, + caseSensitive: Boolean): Array[String] = { + if (options.headerFlag) { + val duplicates = { + val safeRow = if (!caseSensitive) { + // Elements in row might be null. + row.flatMap(Option(_).map(_.toLowerCase)) + } else { + row + } + safeRow.diff(safeRow.distinct).distinct + } + + row.zipWithIndex.map { case (value, index) => + if (value == null || value.isEmpty || value == options.nullValue) { + // When there are empty strings or the values set in `nullValue`, put the + // index as the suffix. + s"_c$index" + } else if (!caseSensitive && duplicates.contains(value.toLowerCase)) { + // When there are case-insensitive duplicates, put the index as the suffix. + s"$value$index" + } else if (duplicates.contains(value)) { + // When there are duplicates, put the index as the suffix. + s"$value$index" + } else { + value + } + } + } else { + row.zipWithIndex.map { case (value, index) => + // Uses default column names, "_c#" where # is its position of fields + // when header option is disabled. + s"_c$index" + } + } + } + override def prepareWrite( sparkSession: SparkSession, job: Job, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala deleted file mode 100644 index 87f10c2cdf0f9..0000000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtils.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.csv - -private[csv] object CSVUtils { - - /** - * Generates a header from the given row which is null-safe and duplicate-safe. - */ - def makeSafeHeader( - row: Array[String], - options: CSVOptions, - caseSensitive: Boolean): Array[String] = { - if (options.headerFlag) { - val duplicates = { - val safeRow = if (!caseSensitive) { - // Elements in row might be null. - row.flatMap(Option(_).map(_.toLowerCase)) - } else { - row - } - safeRow.diff(safeRow.distinct).distinct - } - - row.zipWithIndex.map { case (value, index) => - if (value == null || value.isEmpty || value == options.nullValue) { - // When there are empty strings or the values set in `nullValue`, put the - // index as the suffix. - s"_c$index" - } else if (!caseSensitive && duplicates.contains(value.toLowerCase)) { - // When there are case-insensitive duplicates, put the index as the suffix. - s"$value$index" - } else if (duplicates.contains(value)) { - // When there are duplicates, put the index as the suffix. - s"$value$index" - } else { - value - } - } - } else { - row.zipWithIndex.map { case (value, index) => - // Uses default column names, "_c#" where # is its position of fields - // when header option is disabled. - s"_c$index" - } - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtilsSuite.scala deleted file mode 100644 index ee8e183bd1836..0000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVUtilsSuite.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.csv - -import org.apache.spark.SparkFunSuite - -class CSVUtilsSuite extends SparkFunSuite { - - test("generate default names when header is not used") { - val csvOptions = CSVOptions("header", "false") - val row = Array("a", "b", "c") - val expected = Array("_c0", "_c1", "_c2") - assert(CSVUtils.makeSafeHeader(row, csvOptions, false).deep == expected.deep) - } - - test("duplicated empty strings as field names") { - val csvOptions = CSVOptions("header", "true") - val row = Array("", "", "") - val expected = Array("_c0", "_c1", "_c2") - assert(CSVUtils.makeSafeHeader(row, csvOptions, false).deep == expected.deep) - } - - test("duplicated nullValue as field names") { - val csvOptions = new CSVOptions(Map("header" -> "true", "nullValue" -> "abc")) - val row = Array("abc", "abc", "abc") - val expected = Array("_c0", "_c1", "_c2") - assert(CSVUtils.makeSafeHeader(row, csvOptions, false).deep == expected.deep) - } - - test("duplicated field names - case-sensitive") { - val csvOptions = CSVOptions("header", "true") - val row = Array("a", "A", "a") - val expected = Array("a0", "A", "a2") - val caseSensitive = true - assert(CSVUtils.makeSafeHeader(row, csvOptions, caseSensitive).deep == expected.deep) - } - - test("duplicated field names - case-insensitive") { - val csvOptions = CSVOptions("header", "true") - val row = Array("a", "A", "a") - val expected = Array("a0", "A1", "a2") - val caseSensitive = false - assert(CSVUtils.makeSafeHeader(row, csvOptions, caseSensitive).deep == expected.deep) - } -}