Skip to content

Commit

Permalink
Fix fastparquet tests to work with HDFS
Browse files Browse the repository at this point in the history
Fixes NVIDIA#9545.

This commit fixes the `fastparquet` tests to run on Spark clusters where
the `fs.default.name` does not point to the local filesystem.

Before this commit, the `fastparquet` tests assumed that the parquet files
generated for the tests were written to the local filesystem, and could be read
by both `fastparquet` and Spark from the same location.  However, this fails
when run against clusters whose default filesystem is HDFS, since `fastparquet`
can only read from the local filesystem.

This commit changes the tests as follows:
1. For tests where data is generated by Spark, the data is copied to the local
   filesystem before it is read by `fastparquet`.
2. For tests where data is generated by `fastparquet`, the data is copied
   to the default Hadoop filesystem before reading through Spark.

Signed-off-by: MithunR <mythrocks@gmail.com>
  • Loading branch information
mythrocks committed Oct 30, 2023
1 parent ddb8f6b commit 4992a88
Showing 1 changed file with 36 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,38 @@ def fastparquet_unavailable():
tzinfo=timezone.utc) # Pandas.Timestamp.max, rounded down.


def read_parquet(data_path):
def copy_to_local(spark, hdfs_source, local_target):
    """
    Copies contents of hdfs_source to local_target.

    Uses the Hadoop FileSystem API (reached through the Spark JVM gateway)
    so the copy works regardless of which filesystem backs hdfs_source.

    :param spark: Active Spark session, used to reach the JVM gateway.
    :param hdfs_source: Source path on the default Hadoop filesystem.
    :param local_target: Destination path on the local filesystem.
    """
    spark_context = spark.sparkContext
    jvm = spark_context._jvm
    hadoop_path = jvm.org.apache.hadoop.fs.Path
    hadoop_conf = spark_context._jsc.hadoopConfiguration()
    default_fs = jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
    default_fs.copyToLocalFile(hadoop_path(hdfs_source), hadoop_path(local_target))


def delete_local_directory(local_path):
    """
    Removes specified directory on local filesystem, along with all its contents.

    Deletion failures are reported but deliberately not raised: a stale local
    copy of test data must not fail the test run.

    :param local_path: String path on local filesystem
    """
    # Hoisted out of the try block: the try body should contain only the
    # call that can raise the OSError we intend to handle.
    import shutil

    print("Removing " + local_path)
    # Clean up local data path.
    try:
        shutil.rmtree(path=local_path)
    except OSError:
        print("Could not clean up local files in {}".format(local_path))


def read_parquet(data_path, local_data_path):
"""
(Fetches a function that) Reads Parquet from the specified `data_path`.
If the plugin is enabled, the read is done via Spark APIs, through the plugin.
If the plugin is disabled, the data is read via `fastparquet`.
:param data_path: Location of the (single) Parquet input file.
:param local_data_path: Location of the Parquet input, on the local filesystem.
:return: A function that reads Parquet, via the plugin or `fastparquet`.
"""

Expand All @@ -60,7 +86,8 @@ def read_with_fastparquet_or_plugin(spark):
if plugin_enabled:
return spark.read.parquet(data_path)
else:
df = fastparquet.ParquetFile(data_path).to_pandas()
copy_to_local(spark, data_path, local_data_path)
df = fastparquet.ParquetFile(local_data_path).to_pandas()
return spark.createDataFrame(df)

return read_with_fastparquet_or_plugin
Expand Down Expand Up @@ -114,6 +141,8 @@ def test_reading_file_written_by_spark_cpu(data_gen, spark_tmp_path):
as well as limitations in fastparquet's handling of Dates, Timestamps, Decimals, etc.
"""
data_path = spark_tmp_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT"
local_base_path = (spark_tmp_path + "_local").replace("hdfs:///", "")
local_data_path = local_base_path + "/FASTPARQUET_SINGLE_COLUMN_INPUT"
gen = StructGen([('a', data_gen)], nullable=False)
# Write data with CPU session.
with_cpu_session(
Expand All @@ -122,7 +151,11 @@ def test_reading_file_written_by_spark_cpu(data_gen, spark_tmp_path):
conf=rebase_write_corrected_conf
)
# Read Parquet with CPU (fastparquet) and GPU (plugin), and compare records.
assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path), result_canonicalize_func_before_compare=get_fastparquet_result_canonicalizer())
assert_gpu_and_cpu_are_equal_collect(read_parquet(data_path, local_data_path),
result_canonicalize_func_before_compare=get_fastparquet_result_canonicalizer())

# Clean up local copy of data.
delete_local_directory(local_base_path)


@pytest.mark.skipif(condition=fastparquet_unavailable(),
Expand Down

0 comments on commit 4992a88

Please sign in to comment.