From b02527178d38323a239f651506ca609fd963f454 Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Thu, 26 Nov 2020 22:28:41 +0800
Subject: [PATCH 1/7] fix SPARK-33566

---
 .../spark/sql/catalyst/csv/CSVOptions.scala   |  8 ++++++-
 .../apache/spark/sql/DataFrameReader.scala    | 21 +++++++++++++++++++
 .../unescaped-quotes-unescaped-delimiter.csv  |  3 +++
 .../execution/datasources/csv/CSVSuite.scala  | 15 +++++++++++++
 4 files changed, 46 insertions(+), 1 deletion(-)
 create mode 100644 sql/core/src/test/resources/test-data/unescaped-quotes-unescaped-delimiter.csv

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index f2191fcf35f1a..ec405994eadef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -213,6 +213,12 @@ class CSVOptions(
   }
 
   val lineSeparatorInWrite: Option[String] = lineSeparator
 
+  /**
+   * The handling method to be used when unescaped quotes are found in the input.
+   */
+  val unescapedQuoteHandling: UnescapedQuoteHandling = UnescapedQuoteHandling.valueOf(parameters
+    .getOrElse("unescapedQuoteHandling", "STOP_AT_DELIMITER").toUpperCase(Locale.ROOT))
+
   def asWriterSettings: CsvWriterSettings = {
     val writerSettings = new CsvWriterSettings()
     val format = writerSettings.getFormat
@@ -258,7 +264,7 @@ class CSVOptions(
     settings.setNullValue(nullValue)
     settings.setEmptyValue(emptyValueInRead)
     settings.setMaxCharsPerColumn(maxCharsPerColumn)
-    settings.setUnescapedQuoteHandling(UnescapedQuoteHandling.STOP_AT_DELIMITER)
+    settings.setUnescapedQuoteHandling(unescapedQuoteHandling)
     settings.setLineSeparatorDetectionEnabled(lineSeparatorInRead.isEmpty && multiLine)
     lineSeparatorInRead.foreach { _ =>
       settings.setNormalizeLineEndingsWithinQuotes(!multiLine)
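A minimal, self-contained sketch of what the new CSVOptions field does with the option value. The object name and options map below are illustrative, not part of the patch; it assumes com.univocity:univocity-parsers (Spark's CSV backend) on the classpath, and it only mirrors the quote-escape default ('\\') from Spark's settings rather than the full configuration. Note that Enum.valueOf throws IllegalArgumentException for names outside the five enum constants, so a mistyped option value fails as soon as CSVOptions is constructed.

    // Illustrative sketch (not part of the patch): mirrors how CSVOptions turns the
    // case-insensitive option string into univocity's enum and applies it.
    import java.util.Locale
    import com.univocity.parsers.csv.{CsvParser, CsvParserSettings, UnescapedQuoteHandling}

    object UnescapedQuoteHandlingSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for the reader options map; any casing works.
        val parameters = Map("unescapedQuoteHandling" -> "stop_at_closing_quote")

        // Same lookup as the new CSVOptions field: STOP_AT_DELIMITER by default,
        // uppercased before Enum.valueOf (which throws on unknown names).
        val handling = UnescapedQuoteHandling.valueOf(
          parameters.getOrElse("unescapedQuoteHandling", "STOP_AT_DELIMITER")
            .toUpperCase(Locale.ROOT))

        val settings = new CsvParserSettings()
        // Spark's CSV source defaults the quote escape to backslash, so a bare ""
        // inside a quoted field counts as an unescaped quote; mirror that here.
        settings.getFormat.setQuoteEscape('\\')
        settings.setUnescapedQuoteHandling(handling)

        // Parse one fixture row; per the test added in this patch, the unescaped
        // quote no longer cuts the field at the delimiter under STOP_AT_CLOSING_QUOTE.
        val row = new CsvParser(settings).parseLine("\"a,\"\"b,c\",\"xyz\"")
        println(row.mkString("[", ", ", "]"))
      }
    }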
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index b26bc6441b6cf..8f96f0b882424 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -727,6 +727,27 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
    * for any given value being read. By default, it is -1 meaning unlimited length</li>
+   * <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
+   * will handle values with unescaped quotes.
+   *   <ul>
+   *     <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
+   *     the quote character and proceed parsing the value as a quoted value, until a closing
+   *     quote is found.</li>
+   *     <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters of the current
+   *     parsed value until the delimiter is found. If no delimiter is found in the value, the
+   *     parser will continue accumulating characters from the input until a delimiter or line
+   *     ending is found.</li>
+   *     <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters until the
+   *     delimiter or a line ending is found in the input.</li>
+   *     <li>`SKIP_VALUE`: If unescaped quotes are found in the input, the content parsed
+   *     for the given value will be skipped and the value set in nullValue will be produced
+   *     instead.</li>
+   *     <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
+   *     will be thrown.</li>
+   *   </ul>
+   * </li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing. It supports the following case-insensitive modes. Note that Spark tries
    * to parse only required columns in CSV under column pruning. Therefore, corrupt records
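As a usage illustration for the new batch reader option (the session setup and file path are illustrative, not part of the patch):

    // Usage sketch: reading a CSV that mixes unescaped quotes with unescaped
    // delimiters, anchoring field boundaries to the closing quote.
    import org.apache.spark.sql.SparkSession

    object UnescapedQuoteReadExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("unescapedQuoteHandling example")
          .getOrCreate()

        val df = spark.read
          .option("header", "true")
          .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
          .csv("/tmp/unescaped-quotes-unescaped-delimiter.csv") // illustrative path

        df.show(truncate = false)
        spark.stop()
      }
    }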
diff --git a/sql/core/src/test/resources/test-data/unescaped-quotes-unescaped-delimiter.csv b/sql/core/src/test/resources/test-data/unescaped-quotes-unescaped-delimiter.csv
new file mode 100644
index 0000000000000..a1d91b6d27a79
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/unescaped-quotes-unescaped-delimiter.csv
@@ -0,0 +1,3 @@
+c1,c2
+"a,""b,c","xyz"
+"a,b,c","x""yz"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index a236814fdcdcd..97c0fe11c17ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -75,6 +75,8 @@ abstract class CSVSuite
   private val valueMalformedFile = "test-data/value-malformed.csv"
   private val badAfterGoodFile = "test-data/bad_after_good.csv"
   private val malformedRowFile = "test-data/malformedRow.csv"
+  private val unescapedQuotesAndUnescapedDelimiterFile =
+    "test-data/unescaped-quotes-unescaped-delimiter.csv"
 
   /** Verifies data and schema. */
   private def verifyCars(
@@ -2428,6 +2430,19 @@ abstract class CSVSuite
       assert(readback.collect sameElements Array(Row("0"), Row("1"), Row("2")))
     }
   }
+
+  test("SPARK-33566: configure UnescapedQuoteHandling to parse " +
+    "unescaped quotes and unescaped delimiters correctly") {
+    // Without setting unescapedQuoteHandling to STOP_AT_CLOSING_QUOTE,
+    // the result would be Row(""""a,""b""", """c""""), Row("""a,b,c""", """"x""yz"""")
+    val result = spark.read
+      .option("inferSchema", "true")
+      .option("header", "true")
+      .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
+      .csv(testFile(unescapedQuotesAndUnescapedDelimiterFile)).collect()
+    val expectedResults = Array(Row("""a,""b,c""", "xyz"), Row("""a,b,c""", """x""yz"""))
+    assert(result.sameElements(expectedResults))
+  }
 }
 
 class CSVv1Suite extends CSVSuite {
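To make the test's comment concrete, here is a sketch that reads the same fixture twice, once with the default handling and once with the new option. The expected contents noted in the comments are taken from the test comment and expected rows above, not independently re-derived; the path assumes the repository layout.

    // Sketch contrasting default parsing with the new option on the fixture above.
    import org.apache.spark.sql.SparkSession

    object CompareUnescapedQuoteHandling {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("compare").getOrCreate()
        val path = "sql/core/src/test/resources/test-data/unescaped-quotes-unescaped-delimiter.csv"

        // Default STOP_AT_DELIMITER: per the test comment, "a,""b,c" is cut at the
        // delimiter once the unescaped quote is seen.
        spark.read.option("header", "true").csv(path).show(truncate = false)

        // STOP_AT_CLOSING_QUOTE: the field is accumulated up to the closing quote,
        // yielding a,""b,c and xyz as in the test's expected rows.
        spark.read
          .option("header", "true")
          .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
          .csv(path)
          .show(truncate = false)

        spark.stop()
      }
    }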
From 1770c565aa573e6f32e404b4f775f2c12edcae2e Mon Sep 17 00:00:00 2001
From: yangjie01
Date: Fri, 27 Nov 2020 10:52:36 +0800
Subject: [PATCH 2/7] add comments to DataStreamReader.scala, readwriter.py and
 streaming.py

---
 python/pyspark/sql/readwriter.py              | 21 +++++++++++++++++++
 python/pyspark/sql/streaming.py               | 21 +++++++++++++++++++
 .../sql/streaming/DataStreamReader.scala      | 21 +++++++++++++++++++
 3 files changed, 63 insertions(+)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index bb31e6a3e09f8..b492198f2959c 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -259,6 +259,27 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
         allows accepting quoting of all character using backslash quoting mechanism. If None is
         set, it uses the default value, ``false``.
+    unescapedQuoteHandling : str, optional
+        defines how the CsvParser will handle values with unescaped quotes. If None is
+        set, it uses the default value, ``STOP_AT_DELIMITER``.
+
+        * ``STOP_AT_CLOSING_QUOTE``: If unescaped quotes are found in the input, accumulate \
+          the quote character and proceed parsing the value as a quoted value, until a closing \
+          quote is found.
+        * ``BACK_TO_DELIMITER``: If unescaped quotes are found in the input, consider the value \
+          as an unquoted value. This will make the parser accumulate all characters of the current \
+          parsed value until the delimiter is found. If no delimiter is found in the value, the \
+          parser will continue accumulating characters from the input until a delimiter or line \
+          ending is found.
+        * ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, consider the value \
+          as an unquoted value. This will make the parser accumulate all characters until the \
+          delimiter or a line ending is found in the input.
+        * ``SKIP_VALUE``: If unescaped quotes are found in the input, the content parsed \
+          for the given value will be skipped and the value set in nullValue will be produced \
+          instead.
+        * ``RAISE_ERROR``: If unescaped quotes are found in the input, a TextParsingException \
+          will be thrown.
+
     mode : str, optional
         allows a mode for dealing with corrupt records during parsing. If None is
         set, it uses the default value, ``PERMISSIVE``.
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index e7b2fa16d620a..63fcbd52fd366 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -851,6 +851,27 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
     maxMalformedLogPerPartition : str or int, optional
         this parameter is no longer used since Spark 2.2.0. If specified, it is ignored.
+    unescapedQuoteHandling : str, optional
+        defines how the CsvParser will handle values with unescaped quotes. If None is
+        set, it uses the default value, ``STOP_AT_DELIMITER``.
+
+        * ``STOP_AT_CLOSING_QUOTE``: If unescaped quotes are found in the input, accumulate \
+          the quote character and proceed parsing the value as a quoted value, until a closing \
+          quote is found.
+        * ``BACK_TO_DELIMITER``: If unescaped quotes are found in the input, consider the value \
+          as an unquoted value. This will make the parser accumulate all characters of the current \
+          parsed value until the delimiter is found. If no delimiter is found in the value, the \
+          parser will continue accumulating characters from the input until a delimiter or line \
+          ending is found.
+        * ``STOP_AT_DELIMITER``: If unescaped quotes are found in the input, consider the value \
+          as an unquoted value. This will make the parser accumulate all characters until the \
+          delimiter or a line ending is found in the input.
+        * ``SKIP_VALUE``: If unescaped quotes are found in the input, the content parsed \
+          for the given value will be skipped and the value set in nullValue will be produced \
+          instead.
+        * ``RAISE_ERROR``: If unescaped quotes are found in the input, a TextParsingException \
+          will be thrown.
+
     mode : str, optional
         allows a mode for dealing with corrupt records during parsing. If None is
         set, it uses the default value, ``PERMISSIVE``.
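For completeness, a sketch of the strictest setting documented above. Whether the TextParsingException aborts the query or is absorbed depends on the reader's parse mode, which is not re-verified here; FAILFAST is used so any malformed record surfaces on collect. Path and object name are illustrative.

    // Sketch: opting into RAISE_ERROR to fail fast on unescaped quotes instead
    // of re-interpreting the value.
    import scala.util.Try
    import org.apache.spark.sql.SparkSession

    object RaiseErrorOnUnescapedQuotes {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("raise-error").getOrCreate()

        val attempt = Try {
          spark.read
            .option("header", "true")
            .option("mode", "FAILFAST")
            .option("unescapedQuoteHandling", "RAISE_ERROR")
            .csv("/tmp/unescaped-quotes-unescaped-delimiter.csv") // illustrative path
            .collect()
        }
        println(s"parse succeeded: ${attempt.isSuccess}")
        spark.stop()
      }
    }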
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 9bc4acd49a980..7f4ef8be562fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -396,6 +396,27 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * a record can have.</li>
    * <li>`maxCharsPerColumn` (default `-1`): defines the maximum number of characters allowed
    * for any given value being read. By default, it is -1 meaning unlimited length</li>
+   * <li>`unescapedQuoteHandling` (default `STOP_AT_DELIMITER`): defines how the CsvParser
+   * will handle values with unescaped quotes.
+   *   <ul>
+   *     <li>`STOP_AT_CLOSING_QUOTE`: If unescaped quotes are found in the input, accumulate
+   *     the quote character and proceed parsing the value as a quoted value, until a closing
+   *     quote is found.</li>
+   *     <li>`BACK_TO_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters of the current
+   *     parsed value until the delimiter is found. If no delimiter is found in the value, the
+   *     parser will continue accumulating characters from the input until a delimiter or line
+   *     ending is found.</li>
+   *     <li>`STOP_AT_DELIMITER`: If unescaped quotes are found in the input, consider the value
+   *     as an unquoted value. This will make the parser accumulate all characters until the
+   *     delimiter or a line ending is found in the input.</li>
+   *     <li>`SKIP_VALUE`: If unescaped quotes are found in the input, the content parsed
+   *     for the given value will be skipped and the value set in nullValue will be produced
+   *     instead.</li>
+   *     <li>`RAISE_ERROR`: If unescaped quotes are found in the input, a TextParsingException
+   *     will be thrown.</li>
+   *   </ul>
+   * </li>
    * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
    * during parsing. It supports the following case-insensitive modes.
    *
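Finally, a streaming usage sketch for the DataStreamReader option documented above. Streaming CSV sources require an explicit schema; the schema, directory, and sink wiring here are illustrative, not part of the patch.

    // Streaming usage sketch: DataStreamReader accepts the same option.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    object StreamingUnescapedQuoteExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("streaming unescapedQuoteHandling example")
          .getOrCreate()

        // Matches the two-column fixture used by the batch test.
        val schema = StructType(Seq(
          StructField("c1", StringType),
          StructField("c2", StringType)))

        val df = spark.readStream
          .schema(schema)
          .option("header", "true")
          .option("unescapedQuoteHandling", "STOP_AT_CLOSING_QUOTE")
          .csv("/tmp/csv-input-dir") // directory watched for new files

        val query = df.writeStream
          .format("console")
          .option("truncate", "false")
          .start()

        query.awaitTermination()
      }
    }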