Fix fastparquet tests to work with HDFS (#9583)
Fixes #9545.

This commit fixes the `fastparquet` tests so that they run on Spark clusters
whose default filesystem (`fs.default.name`) does not point to the local filesystem.

Before this commit, the `fastparquet` tests assumed that the Parquet files
generated for the tests were written to the local filesystem, where both
`fastparquet` and Spark could read them from the same location. That assumption
breaks on clusters whose default filesystem is HDFS, because `fastparquet` can
only read from the local filesystem.
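
For illustration (not part of this commit), one way to check which default
filesystem a cluster reports is via PySpark's JVM gateway; note that
`fs.defaultFS` is the current name of the deprecated `fs.default.name` key:

    def default_fs_is_local(spark):
        # Hypothetical helper: reads the Hadoop configuration of the running
        # Spark context and checks whether the default FS is file://.
        hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
        return hadoop_conf.get("fs.defaultFS", "file:///").startswith("file:")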

This commit changes the tests as follows (a usage sketch follows the list):
1. For tests where the data is generated by Spark, the data is copied to the
   local filesystem before it is read by `fastparquet`.
2. For tests where the data is generated by `fastparquet`, the data is copied
   to the default Hadoop filesystem before it is read through Spark.
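
As a rough usage sketch of approach 1 (assuming a live SparkSession `spark` and
the `copy_to_local` helper added in the diff below), the read side now looks
like this:

    import fastparquet

    def read_back_with_fastparquet(spark, data_path, local_data_path):
        # Pull the Spark-written Parquet from the default filesystem
        # (possibly HDFS) onto local disk, where fastparquet can read it.
        copy_to_local(spark, data_path, local_data_path)
        return fastparquet.ParquetFile(local_data_path).to_pandas()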

Signed-off-by: MithunR <mythrocks@gmail.com>
mythrocks authored Oct 31, 2023
1 parent 51f933f commit 58612bb
Showing 1 changed file with 103 additions and 16 deletions:
integration_tests/src/main/python/fastparquet_compatibility_test.py
@@ -45,12 +45,39 @@ def fastparquet_unavailable():
                                tzinfo=timezone.utc)  # Pandas.Timestamp.max, rounded down.


-def read_parquet(data_path):
+def copy_to_local(spark, hdfs_source, local_target):
+    """
+    Copies contents of hdfs_source to local_target.
+    """
+    sc = spark.sparkContext
+    Path = sc._jvm.org.apache.hadoop.fs.Path
+    config = sc._jsc.hadoopConfiguration()
+    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(config)
+    fs.copyToLocalFile(Path(hdfs_source), Path(local_target))
+
+
+def delete_local_directory(local_path):
+    """
+    Removes specified directory on local filesystem, along with all its contents.
+    :param local_path: String path on local filesystem
+    """
+    print("Removing " + local_path)
+    # Clean up local data path.
+    if os.path.exists(path=local_path):
+        try:
+            import shutil
+            shutil.rmtree(path=local_path)
+        except OSError:
+            print("Could not clean up local files in {}".format(local_path))
+
+
+def read_parquet(data_path, local_data_path):
     """
     (Fetches a function that) Reads Parquet from the specified `data_path`.
     If the plugin is enabled, the read is done via Spark APIs, through the plugin.
-    If the plugin is disabled, the data is read via `fastparquet`.
+    If the plugin is disabled, the data is copied to local_data_path, and read via `fastparquet`.
     :param data_path: Location of the (single) Parquet input file.
+    :param local_data_path: Location of the Parquet input, on the local filesystem.
     :return: A function that reads Parquet, via the plugin or `fastparquet`.
     """
@@ -60,7 +87,8 @@ def read_with_fastparquet_or_plugin(spark):
         if plugin_enabled:
             return spark.read.parquet(data_path)
         else:
-            df = fastparquet.ParquetFile(data_path).to_pandas()
+            copy_to_local(spark, data_path, local_data_path)
+            df = fastparquet.ParquetFile(local_data_path).to_pandas()
             return spark.createDataFrame(df)

     return read_with_fastparquet_or_plugin
@@ -114,15 +142,23 @@ def test_reading_file_written_by_spark_cpu(data_gen, spark_tmp_path):
     as well as limitations in fastparquet's handling of Dates, Timestamps, Decimals, etc.
     """
     data_path = spark_tmp_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT"
+    local_base_path = (spark_tmp_path + "_local")
+    local_data_path = local_base_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT"
     gen = StructGen([('a', data_gen)], nullable=False)
     # Write data with CPU session.
     with_cpu_session(
         # Single output file, to avoid differences in order of file reads.
         lambda spark: gen_df(spark, gen, 2048).repartition(1).write.mode('overwrite').parquet(data_path),
         conf=rebase_write_corrected_conf
     )
-    # Read Parquet with CPU (fastparquet) and GPU (plugin), and compare records.
-    assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path), result_canonicalize_func_before_compare=get_fastparquet_result_canonicalizer())
+
+    try:
+        # Read Parquet with CPU (fastparquet) and GPU (plugin), and compare records.
+        assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path, local_data_path),
+                                             result_canonicalize_func_before_compare=get_fastparquet_result_canonicalizer())
+    finally:
+        # Clean up local copy of data.
+        delete_local_directory(local_base_path)


 @pytest.mark.skipif(condition=fastparquet_unavailable(),
@@ -167,6 +203,8 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen):
     There are xfails here because of fastparquet limitations in handling Decimal, Timestamps, Dates, etc.
     """
     data_path = spark_tmp_path + "/FASTPARQUET_TEST_GPU_WRITE_PATH"
+    local_base_path = (spark_tmp_path + "_local")
+    local_data_path = local_base_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT"

     gen = StructGen([('a', column_gen),
                      ('part', IntegerGen(nullable=False))
@@ -177,9 +215,24 @@ def test_reading_file_written_with_gpu(spark_tmp_path, column_gen):
         conf=rebase_write_corrected_conf
     )

-    # TODO: Maybe make _assert_equal() available to compare dataframes, regardless of CPU vs GPU?
-    # For now, this compares the results of reading back the GPU-written data, via fastparquet and GPU.
-    assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path), conf=rebase_write_corrected_conf)
+    try:
+        # For now, this compares the results of reading back the GPU-written data, via fastparquet and GPU.
+        assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path=data_path, local_data_path=local_data_path),
+                                             conf=rebase_write_corrected_conf)
+    finally:
+        # Clean up local copy of data.
+        delete_local_directory(local_base_path)
+
+
+def copy_from_local(spark, local_source, hdfs_target):
+    """
+    Copies contents of local_source to hdfs_target.
+    """
+    sc = spark.sparkContext
+    Path = sc._jvm.org.apache.hadoop.fs.Path
+    config = sc._jsc.hadoopConfiguration()
+    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(config)
+    fs.copyFromLocalFile(Path(local_source), Path(hdfs_target))


 @pytest.mark.skipif(condition=fastparquet_unavailable(),
@@ -243,13 +296,24 @@ def test_reading_file_written_with_fastparquet(column_gen, spark_tmp_path):
     This test writes data-gen output with fastparquet, and checks that both Apache Spark and the RAPIDS plugin
     read the written data correctly.
     """
-    data_path = spark_tmp_path + "/FASTPARQUET_WRITE_PATH"
+    suffix = "/FASTPARQUET_WRITE_PATH"
+    data_path = spark_tmp_path + suffix
+    local_base_path = (spark_tmp_path + "_local")
+    local_data_path = local_base_path + suffix

     def write_with_fastparquet(spark, data_gen):
         import fastparquet
         # TODO: (future) Compression settings?
         dataframe = gen_df(spark, data_gen, 2048)
-        fastparquet.write(data_path, dataframe.toPandas())
+        os.makedirs(name=local_base_path, exist_ok=True)
+        fastparquet.write(local_data_path, dataframe.toPandas())
+        try:
+            # Copy out to cluster's filesystem (possibly HDFS).
+            copy_from_local(spark=spark, local_source=local_data_path, hdfs_target=data_path)
+        finally:
+            # Remove local copy.
+            delete_local_directory(local_path=local_base_path)
+

     gen = StructGen([('a', column_gen),
                      ('part', IntegerGen(nullable=False))
@@ -325,15 +389,38 @@ def test_reading_file_rewritten_with_fastparquet(column_gen, time_format, spark_tmp_path):
     The final file should be in the correct format, with the right datatypes.
     This is then checked for read-accuracy, via CPU and GPU.
     """
-    data_path = spark_tmp_path + "/FASTPARQUET_WRITE_PATH"
+    suffix = "/FASTPARQUET_WRITE_PATH"
+    hdfs_data_path = spark_tmp_path + suffix
+    local_base_path = (spark_tmp_path + "_local")
+    local_data_path = local_base_path + suffix

     def rewrite_with_fastparquet(spark, data_gen):
+        """
+        This helper function (eventually) writes data generated from `data_gen` to the local filesystem,
+        via fastparquet.
+        To preserve data types from data_gen, the writes are done first through Spark, thus:
+        1. Write to HDFS with Spark. (This preserves data types.)
+        2. Copy the data to local FS (so that fastparquet can read it).
+        3. Read data with fastparquet, write it back out to local FS with fastparquet.
+        4. Copy the fastparquet output back to HDFS, for reads with Spark.
+        """
         import fastparquet
-        tmp_data_path = data_path + "_tmp"
+        hdfs_tmp_data_path = hdfs_data_path + "_tmp"
         spark_df = gen_df(spark, data_gen, 2048)
-        spark_df.repartition(1).write.mode("overwrite").parquet(tmp_data_path)
-        pandas_df = fastparquet.ParquetFile(tmp_data_path).to_pandas()
-        fastparquet.write(data_path, pandas_df, times=time_format)
+        # 1. Write to HDFS with Spark.
+        spark_df.repartition(1).write.mode("overwrite").parquet(hdfs_tmp_data_path)
+        # Make local base directory.
+        os.makedirs(name=local_base_path, exist_ok=True)
+        # 2. Copy Spark-written data to local filesystem, for read with fastparquet.
+        local_tmp_data_path = local_data_path + "_tmp"
+        copy_to_local(spark=spark, hdfs_source=hdfs_tmp_data_path, local_target=local_tmp_data_path)
+        # 3. Read local tmp data with fastparquet, rewrite to final local path, with fastparquet.
+        pandas_df = fastparquet.ParquetFile(local_tmp_data_path).to_pandas()
+        fastparquet.write(local_data_path, pandas_df, times=time_format)
+        # 4. Copy fastparquet-written data back to HDFS, so that Spark can read it.
+        copy_from_local(spark=spark, local_source=local_data_path, hdfs_target=hdfs_data_path)
+        # Local data can now be cleaned up.
+        delete_local_directory(local_base_path)

     gen = StructGen([('a', column_gen),
                      ('part', IntegerGen(nullable=False))], nullable=False)
@@ -343,5 +430,5 @@ def rewrite_with_fastparquet(spark, data_gen):
     )
     # Read Parquet with CPU (Apache Spark) and GPU (plugin), and compare records.
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: spark.read.parquet(data_path),
+        lambda spark: spark.read.parquet(hdfs_data_path),
         rebase_write_corrected_conf)