From 6f4f7195fe7e067c12c53a9b8d366f6f879f9a4b Mon Sep 17 00:00:00 2001
From: MithunR
Date: Mon, 30 Oct 2023 14:07:23 -0700
Subject: [PATCH] Fix fastparquet tests to work with HDFS

Fixes #9545.

This commit fixes the `fastparquet` tests to run on Spark clusters where
`fs.default.name` does not point to the local filesystem.

Before this commit, the `fastparquet` tests assumed that the Parquet files
generated for the tests were written to the local filesystem, and could be
read by both `fastparquet` and Spark from the same location. However, this
fails on clusters whose default filesystem is HDFS, since `fastparquet` can
only read from the local filesystem.

This commit changes the tests as follows:

1. For tests where the data is generated by Spark, the data is copied to the
   local filesystem before it is read by `fastparquet`.
2. For tests where the data is generated by `fastparquet`, the data is copied
   to the default Hadoop filesystem before it is read through Spark.

Signed-off-by: MithunR
---
 .../python/fastparquet_compatibility_test.py | 119 +++++++++++++++---
 1 file changed, 103 insertions(+), 16 deletions(-)

diff --git a/integration_tests/src/main/python/fastparquet_compatibility_test.py b/integration_tests/src/main/python/fastparquet_compatibility_test.py
index 590da35fb7e..d2636d58d01 100644
--- a/integration_tests/src/main/python/fastparquet_compatibility_test.py
+++ b/integration_tests/src/main/python/fastparquet_compatibility_test.py
@@ -45,12 +45,39 @@ def fastparquet_unavailable():
                     tzinfo=timezone.utc)  # Pandas.Timestamp.max, rounded down.
 
 
-def read_parquet(data_path):
+def copy_to_local(spark, hdfs_source, local_target):
+    """
+    Copies the contents of hdfs_source to local_target.
+    """
+    sc = spark.sparkContext
+    Path = sc._jvm.org.apache.hadoop.fs.Path
+    config = sc._jsc.hadoopConfiguration()
+    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(config)
+    fs.copyToLocalFile(Path(hdfs_source), Path(local_target))
+
+
+def delete_local_directory(local_path):
+    """
+    Removes the specified directory on the local filesystem, along with all its contents.
+    :param local_path: String path on the local filesystem
+    """
+    print("Removing " + local_path)
+    # Clean up the local data path.
+    if os.path.exists(path=local_path):
+        import shutil
+        try:
+            shutil.rmtree(path=local_path)
+        except OSError:
+            print("Could not clean up local files in {}".format(local_path))
+
+
+def read_parquet(data_path, local_data_path):
     """
     (Fetches a function that) Reads Parquet from the specified `data_path`.
     If the plugin is enabled, the read is done via Spark APIs, through the plugin.
-    If the plugin is disabled, the data is read via `fastparquet`.
+    If the plugin is disabled, the data is copied to `local_data_path` and read via `fastparquet`.
     :param data_path: Location of the (single) Parquet input file.
+    :param local_data_path: Location of the Parquet input, on the local filesystem.
     :return: A function that reads Parquet, via the plugin or `fastparquet`.
     """
 
@@ -60,7 +87,8 @@ def read_with_fastparquet_or_plugin(spark):
         if plugin_enabled:
             return spark.read.parquet(data_path)
         else:
-            df = fastparquet.ParquetFile(data_path).to_pandas()
+            copy_to_local(spark, data_path, local_data_path)
+            df = fastparquet.ParquetFile(local_data_path).to_pandas()
             return spark.createDataFrame(df)
 
     return read_with_fastparquet_or_plugin
@@ -114,6 +142,8 @@ def test_reading_file_written_by_spark_cpu(data_gen, spark_tmp_path):
     as well as limitations in fastparquet's handling of Dates, Timestamps, Decimals, etc.
""" data_path = spark_tmp_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT" + local_base_path = (spark_tmp_path + "_local") + local_data_path = local_base_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT" gen = StructGen([('a', data_gen)], nullable=False) # Write data with CPU session. with_cpu_session( @@ -121,8 +151,14 @@ def test_reading_file_written_by_spark_cpu(data_gen, spark_tmp_path): lambda spark: gen_df(spark, gen, 2048).repartition(1).write.mode('overwrite').parquet(data_path), conf=rebase_write_corrected_conf ) - # Read Parquet with CPU (fastparquet) and GPU (plugin), and compare records. - assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path), result_canonicalize_func_before_compare=get_fastparquet_result_canonicalizer()) + + try: + # Read Parquet with CPU (fastparquet) and GPU (plugin), and compare records. + assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path, local_data_path), + result_canonicalize_func_before_compare=get_fastparquet_result_canonicalizer()) + finally: + # Clean up local copy of data. + delete_local_directory(local_base_path) @pytest.mark.skipif(condition=fastparquet_unavailable(), @@ -167,6 +203,8 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): There are xfails here because of fastparquet limitations in handling Decimal, Timestamps, Dates, etc. """ data_path = spark_tmp_path + "/FASTPARQUET_TEST_GPU_WRITE_PATH" + local_base_path = (spark_tmp_path + "_local") + local_data_path = local_base_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT" gen = StructGen([('a', column_gen), ('part', IntegerGen(nullable=False)) @@ -177,9 +215,24 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen): conf=rebase_write_corrected_conf ) - # TODO: Maybe make _assert_equal() available to compare dataframes, regardless of CPU vs GPU? - # For now, this compares the results of reading back the GPU-written data, via fastparquet and GPU. - assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path), conf=rebase_write_corrected_conf) + try: + # For now, this compares the results of reading back the GPU-written data, via fastparquet and GPU. + assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path=data_path, local_data_path=local_data_path), + conf=rebase_write_corrected_conf) + finally: + # Clean up local copy of data. + delete_local_directory(local_base_path) + + +def copy_from_local(spark, local_source, hdfs_target): + """ + Copies contents of local_source to hdfs_target. + """ + sc = spark.sparkContext + Path = sc._jvm.org.apache.hadoop.fs.Path + config = sc._jsc.hadoopConfiguration() + fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(config) + fs.copyFromLocalFile(Path(local_source), Path(hdfs_target)) @pytest.mark.skipif(condition=fastparquet_unavailable(), @@ -243,13 +296,24 @@ def test_reading_file_written_with_fastparquet(column_gen, spark_tmp_path): This test writes data-gen output with fastparquet, and checks that both Apache Spark and the RAPIDS plugin read the written data correctly. """ - data_path = spark_tmp_path + "/FASTPARQUET_WRITE_PATH" + suffix = "/FASTPARQUET_WRITE_PATH" + data_path = spark_tmp_path + suffix + local_base_path = (spark_tmp_path + "_local") + local_data_path = local_base_path + suffix def write_with_fastparquet(spark, data_gen): import fastparquet # TODO: (future) Compression settings? 
         dataframe = gen_df(spark, data_gen, 2048)
-        fastparquet.write(data_path, dataframe.toPandas())
+        os.makedirs(name=local_base_path, exist_ok=True)
+        fastparquet.write(local_data_path, dataframe.toPandas())
+        try:
+            # Copy out to the cluster's filesystem (possibly HDFS).
+            copy_from_local(spark=spark, local_source=local_data_path, hdfs_target=data_path)
+        finally:
+            # Remove the local copy.
+            delete_local_directory(local_path=local_base_path)
+
     gen = StructGen([('a', column_gen),
                      ('part', IntegerGen(nullable=False))
@@ -325,15 +389,38 @@ def test_reading_file_rewritten_with_fastparquet(column_gen, time_format, spark_
     The final file should be in the correct format, with the right datatypes.
     This is then checked for read-accuracy, via CPU and GPU.
     """
-    data_path = spark_tmp_path + "/FASTPARQUET_WRITE_PATH"
+    suffix = "/FASTPARQUET_WRITE_PATH"
+    hdfs_data_path = spark_tmp_path + suffix
+    local_base_path = spark_tmp_path + "_local"
+    local_data_path = local_base_path + suffix
 
     def rewrite_with_fastparquet(spark, data_gen):
+        """
+        This helper function writes data generated from `data_gen` out via fastparquet,
+        landing the final output on the default Hadoop filesystem, where Spark can read it.
+        To preserve the data types from data_gen, the initial write is done through Spark:
+          1. Write to HDFS with Spark. (This preserves data types.)
+          2. Copy the data to the local FS (so that fastparquet can read it).
+          3. Read the data with fastparquet, and write it back out to the local FS with fastparquet.
+          4. Copy the fastparquet output back to HDFS, for reads with Spark.
+        """
         import fastparquet
-        tmp_data_path = data_path + "_tmp"
+        hdfs_tmp_data_path = hdfs_data_path + "_tmp"
         spark_df = gen_df(spark, data_gen, 2048)
-        spark_df.repartition(1).write.mode("overwrite").parquet(tmp_data_path)
-        pandas_df = fastparquet.ParquetFile(tmp_data_path).to_pandas()
-        fastparquet.write(data_path, pandas_df, times=time_format)
+        # 1. Write to HDFS with Spark.
+        spark_df.repartition(1).write.mode("overwrite").parquet(hdfs_tmp_data_path)
+        # Ensure the local base directory exists.
+        os.makedirs(name=local_base_path, exist_ok=True)
+        # 2. Copy the Spark-written data to the local filesystem, for reading with fastparquet.
+        local_tmp_data_path = local_data_path + "_tmp"
+        copy_to_local(spark=spark, hdfs_source=hdfs_tmp_data_path, local_target=local_tmp_data_path)
+        # 3. Read the local tmp data with fastparquet, and rewrite it to the final local path, with fastparquet.
+        pandas_df = fastparquet.ParquetFile(local_tmp_data_path).to_pandas()
+        fastparquet.write(local_data_path, pandas_df, times=time_format)
+        # 4. Copy the fastparquet-written data back to HDFS, so that Spark can read it.
+        copy_from_local(spark=spark, local_source=local_data_path, hdfs_target=hdfs_data_path)
+        # The local data can now be cleaned up.
+        delete_local_directory(local_base_path)
 
     gen = StructGen([('a', column_gen),
                      ('part', IntegerGen(nullable=False))], nullable=False)
@@ -343,5 +430,5 @@ def rewrite_with_fastparquet(spark, data_gen):
     )
     # Read Parquet with CPU (Apache Spark) and GPU (plugin), and compare records.
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: spark.read.parquet(data_path),
+        lambda spark: spark.read.parquet(hdfs_data_path),
         rebase_write_corrected_conf)
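
Note: the copy helpers in this patch rely on Hadoop's FileSystem#copyToLocalFile
and FileSystem#copyFromLocalFile, reached through PySpark's py4j gateway. The
following is a minimal, standalone sketch of that round-trip, outside the test
harness; it assumes a running Spark session, and the paths are hypothetical
placeholders:

    # Sketch of the py4j round-trip used by copy_to_local()/copy_from_local().
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext
    Path = sc._jvm.org.apache.hadoop.fs.Path
    # FileSystem handle for the cluster's default filesystem (possibly HDFS).
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())

    # Default filesystem -> local filesystem (hypothetical paths):
    fs.copyToLocalFile(Path("/tmp/spark_written_data"), Path("/tmp/local_copy"))
    # Local filesystem -> default filesystem:
    fs.copyFromLocalFile(Path("/tmp/local_copy"), Path("/tmp/copied_back"))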