diff --git a/conda/environments/cudf_dev_cuda11.5.yml b/conda/environments/cudf_dev_cuda11.5.yml index 1b79bdb763f..d6d05926099 100644 --- a/conda/environments/cudf_dev_cuda11.5.yml +++ b/conda/environments/cudf_dev_cuda11.5.yml @@ -67,6 +67,11 @@ dependencies: - pydata-sphinx-theme - librdkafka=1.7.0 - python-confluent-kafka=1.7.0 + - moto>=3.1.6 + - boto3>=1.21.21 + - botocore>=1.24.21 + - aiobotocore>=2.2.0 + - s3fs>=2022.3.0 - pip: - git+https://github.com/python-streamz/streamz.git@master - pyorc diff --git a/docs/cudf/source/api_docs/io.rst b/docs/cudf/source/api_docs/io.rst index 7e4d1b48c93..a52667cd3e4 100644 --- a/docs/cudf/source/api_docs/io.rst +++ b/docs/cudf/source/api_docs/io.rst @@ -36,6 +36,10 @@ Parquet read_parquet DataFrame.to_parquet cudf.io.parquet.read_parquet_metadata + :template: autosummary/class_with_autosummary.rst + + cudf.io.parquet.ParquetDatasetWriter + ORC ~~~ diff --git a/python/cudf/cudf/io/__init__.py b/python/cudf/cudf/io/__init__.py index 15404b26042..4ec84ecbc74 100644 --- a/python/cudf/cudf/io/__init__.py +++ b/python/cudf/cudf/io/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018, NVIDIA CORPORATION. +# Copyright (c) 2018-2022, NVIDIA CORPORATION. from cudf.io.avro import read_avro from cudf.io.csv import read_csv, to_csv from cudf.io.dlpack import from_dlpack @@ -9,6 +9,7 @@ from cudf.io.parquet import ( merge_parquet_filemetadata, read_parquet, + ParquetDatasetWriter, read_parquet_metadata, write_to_dataset, ) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index d7e85d72ba0..a9398a3139f 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -1,6 +1,8 @@ # Copyright (c) 2019-2022, NVIDIA CORPORATION. import math +import shutil +import tempfile import warnings from collections import defaultdict from contextlib import ExitStack @@ -232,12 +234,15 @@ def _process_dataset( filters = pq._filters_to_expression(filters) # Initialize ds.FilesystemDataset + # TODO: Remove the if len(paths) workaround after following bug is fixed: + # https://issues.apache.org/jira/browse/ARROW-16438 dataset = ds.dataset( - paths, + source=paths[0] if len(paths) == 1 else paths, filesystem=fs, format="parquet", partitioning="hive", ) + file_list = dataset.files if len(file_list) == 0: raise FileNotFoundError(f"{paths} could not be resolved to any files") @@ -837,6 +842,67 @@ def _parse_bytes(s): class ParquetDatasetWriter: + """ + Write a parquet file or dataset incrementally + + Parameters + ---------- + path : str + A local directory path or S3 URL. Will be used as root directory + path while writing a partitioned dataset. + partition_cols : list + Column names by which to partition the dataset + Columns are partitioned in the order they are given + index : bool, default None + If ``True``, include the dataframe's index(es) in the file output. + If ``False``, they will not be written to the file. If ``None``, + index(es) other than RangeIndex will be saved as columns. + compression : {'snappy', None}, default 'snappy' + Name of the compression to use. Use ``None`` for no compression. + statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP' + Level at which column statistics should be included in file. + max_file_size : int or str, default None + A file size that cannot be exceeded by the writer. + It is in bytes, if the input is int. + Size can also be a str in form or "10 MB", "1 GB", etc. + If this parameter is used, it is mandatory to pass + `file_name_prefix`. 
+ file_name_prefix : str + This is a prefix to file names generated only when + `max_file_size` is specified. + + + Examples + -------- + Using a context + + >>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]}) + >>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]}) + >>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw: + ... cw.write_table(df1) + ... cw.write_table(df2) + + By manually calling ``close()`` + + >>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"]) + >>> cw.write_table(df1) + >>> cw.write_table(df2) + >>> cw.close() + + Both the methods will generate the same directory structure + + .. code-block:: none + + dataset/ + a=1 + .parquet + a=2 + .parquet + a=3 + .parquet + + """ + @_cudf_nvtx_annotate def __init__( self, @@ -847,68 +913,15 @@ def __init__( statistics="ROWGROUP", max_file_size=None, file_name_prefix=None, + **kwargs, ) -> None: - """ - Write a parquet file or dataset incrementally - - Parameters - ---------- - path : str - File path or Root Directory path. Will be used as Root Directory - path while writing a partitioned dataset. - partition_cols : list - Column names by which to partition the dataset - Columns are partitioned in the order they are given - index : bool, default None - If ``True``, include the dataframe’s index(es) in the file output. - If ``False``, they will not be written to the file. If ``None``, - index(es) other than RangeIndex will be saved as columns. - compression : {'snappy', None}, default 'snappy' - Name of the compression to use. Use ``None`` for no compression. - statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP' - Level at which column statistics should be included in file. - max_file_size : int or str, default None - A file size that cannot be exceeded by the writer. - It is in bytes, if the input is int. - Size can also be a str in form or "10 MB", "1 GB", etc. - If this parameter is used, it is mandatory to pass - `file_name_prefix`. - file_name_prefix : str - This is a prefix to file names generated only when - `max_file_size` is specified. - - - Examples - ________ - Using a context - - >>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]}) - >>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]}) - >>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw: - ... cw.write_table(df1) - ... cw.write_table(df2) - - By manually calling ``close()`` - - >>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"]) - >>> cw.write_table(df1) - >>> cw.write_table(df2) - >>> cw.close() - - Both the methods will generate the same directory structure - - .. 
code-block:: bash - - dataset/ - a=1 - .parquet - a=2 - .parquet - a=3 - .parquet + if isinstance(path, str) and path.startswith("s3://"): + self.fs_meta = {"is_s3": True, "actual_path": path} + self.path = tempfile.TemporaryDirectory().name + else: + self.fs_meta = {} + self.path = path - """ - self.path = path self.common_args = { "index": index, "compression": compression, @@ -923,6 +936,7 @@ def __init__( # Map of partition_col values to their ParquetWriter's index # in self._chunked_writers for reverse lookup self.path_cw_map: Dict[str, int] = {} + self.kwargs = kwargs self.filename = file_name_prefix self.max_file_size = max_file_size if max_file_size is not None: @@ -1051,18 +1065,19 @@ def write_table(self, df): ] cw.write_table(grouped_df, this_cw_part_info) - # Create new cw for unhandled paths encountered in this write_table - new_paths, part_info, meta_paths = zip(*new_cw_paths) - self._chunked_writers.append( - ( - ParquetWriter(new_paths, **self.common_args), - new_paths, - meta_paths, + if new_cw_paths: + # Create new cw for unhandled paths encountered in this write_table + new_paths, part_info, meta_paths = zip(*new_cw_paths) + self._chunked_writers.append( + ( + ParquetWriter(new_paths, **self.common_args), + new_paths, + meta_paths, + ) ) - ) - new_cw_idx = len(self._chunked_writers) - 1 - self.path_cw_map.update({k: new_cw_idx for k in new_paths}) - self._chunked_writers[-1][0].write_table(grouped_df, part_info) + new_cw_idx = len(self._chunked_writers) - 1 + self.path_cw_map.update({k: new_cw_idx for k in new_paths}) + self._chunked_writers[-1][0].write_table(grouped_df, part_info) @_cudf_nvtx_annotate def close(self, return_metadata=False): @@ -1076,6 +1091,15 @@ def close(self, return_metadata=False): for cw, _, meta_path in self._chunked_writers ] + if self.fs_meta.get("is_s3", False): + local_path = self.path + s3_path = self.fs_meta["actual_path"] + s3_file, _ = ioutils._get_filesystem_and_paths( + s3_path, **self.kwargs + ) + s3_file.put(local_path, s3_path, recursive=True) + shutil.rmtree(self.path) + if return_metadata: return ( merge_parquet_filemetadata(metadata) diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index e8d93caaf55..0966bee93fd 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -2,6 +2,7 @@ import os import shlex +import socket import subprocess import time from contextlib import contextmanager @@ -18,12 +19,22 @@ import cudf from cudf.testing._utils import assert_eq -moto = pytest.importorskip("moto", minversion="1.3.14") +moto = pytest.importorskip("moto", minversion="3.1.6") boto3 = pytest.importorskip("boto3") requests = pytest.importorskip("requests") s3fs = pytest.importorskip("s3fs") +@pytest.fixture(scope="session") +def endpoint_port(): + # Return a free port per worker session. 
+ sock = socket.socket() + sock.bind(("127.0.0.1", 0)) + port = sock.getsockname()[1] + sock.close() + return port + + @contextmanager def ensure_safe_environment_variables(): """ @@ -40,7 +51,7 @@ def ensure_safe_environment_variables(): @pytest.fixture(scope="session") -def s3_base(worker_id): +def s3_base(endpoint_port): """ Fixture to set up moto server in separate process """ @@ -49,15 +60,11 @@ def s3_base(worker_id): # system aws credentials, https://github.com/spulec/moto/issues/1793 os.environ.setdefault("AWS_ACCESS_KEY_ID", "foobar_key") os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foobar_secret") + os.environ.setdefault("S3FS_LOGGING_LEVEL", "DEBUG") # Launching moto in server mode, i.e., as a separate process # with an S3 endpoint on localhost - endpoint_port = ( - 5000 - if worker_id == "master" - else 5550 + int(worker_id.lstrip("gw")) - ) endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" proc = subprocess.Popen( @@ -82,13 +89,10 @@ def s3_base(worker_id): @pytest.fixture() -def s3so(worker_id): +def s3so(endpoint_port): """ Returns s3 storage options to pass to fsspec """ - endpoint_port = ( - 5000 if worker_id == "master" else 5550 + int(worker_id.lstrip("gw")) - ) endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" return {"client_kwargs": {"endpoint_url": endpoint_uri}} @@ -141,13 +145,13 @@ def pdf_ext(scope="module"): def test_read_csv(s3_base, s3so, pdf, bytes_per_thread): # Write to buffer fname = "test_csv_reader.csv" - bname = "csv" + bucket = "csv" buffer = pdf.to_csv(index=False) # Use fsspec file object - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_csv( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, bytes_per_thread=bytes_per_thread, use_python_file_object=False, @@ -155,9 +159,9 @@ def test_read_csv(s3_base, s3so, pdf, bytes_per_thread): assert_eq(pdf, got) # Use Arrow PythonFile object - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_csv( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, bytes_per_thread=bytes_per_thread, use_python_file_object=True, @@ -168,13 +172,13 @@ def test_read_csv(s3_base, s3so, pdf, bytes_per_thread): def test_read_csv_arrow_nativefile(s3_base, s3so, pdf): # Write to buffer fname = "test_csv_reader_arrow_nativefile.csv" - bname = "csv" + bucket = "csv" buffer = pdf.to_csv(index=False) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): fs = pa_fs.S3FileSystem( endpoint_override=s3so["client_kwargs"]["endpoint_url"], ) - with fs.open_input_file(f"{bname}/{fname}") as fil: + with fs.open_input_file(f"{bucket}/{fname}") as fil: got = cudf.read_csv(fil) assert_eq(pdf, got) @@ -187,13 +191,13 @@ def test_read_csv_byte_range( ): # Write to buffer fname = "test_csv_reader_byte_range.csv" - bname = "csv" + bucket = "csv" buffer = pdf.to_csv(index=False) # Use fsspec file object - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_csv( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, byte_range=(74, 73), bytes_per_thread=bytes_per_thread, @@ -209,19 +213,19 @@ def test_read_csv_byte_range( def test_write_csv(s3_base, s3so, 
pdf, chunksize): # Write to buffer fname = "test_csv_writer.csv" - bname = "csv" + bucket = "csv" gdf = cudf.from_pandas(pdf) - with s3_context(s3_base=s3_base, bucket=bname) as s3fs: + with s3_context(s3_base=s3_base, bucket=bucket) as s3fs: gdf.to_csv( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", index=False, chunksize=chunksize, storage_options=s3so, ) - assert s3fs.exists(f"s3://{bname}/{fname}") + assert s3fs.exists(f"s3://{bucket}/{fname}") # TODO: Update to use `storage_options` from pandas v1.2.0 - got = pd.read_csv(s3fs.open(f"s3://{bname}/{fname}")) + got = pd.read_csv(s3fs.open(f"s3://{bucket}/{fname}")) assert_eq(pdf, got) @@ -240,15 +244,15 @@ def test_read_parquet( use_python_file_object, ): fname = "test_parquet_reader.parquet" - bname = "parquet" + bucket = "parquet" buffer = BytesIO() pdf.to_parquet(path=buffer) # Check direct path handling buffer.seek(0) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got1 = cudf.read_parquet( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", open_file_options=( {"precache_options": {"method": precache}} if use_python_file_object @@ -264,11 +268,11 @@ def test_read_parquet( # Check fsspec file-object handling buffer.seek(0) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): - fs = get_fs_token_paths(f"s3://{bname}/{fname}", storage_options=s3so)[ - 0 - ] - with fs.open(f"s3://{bname}/{fname}", mode="rb") as f: + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): + fs = get_fs_token_paths( + f"s3://{bucket}/{fname}", storage_options=s3so + )[0] + with fs.open(f"s3://{bucket}/{fname}", mode="rb") as f: got2 = cudf.read_parquet( f, bytes_per_thread=bytes_per_thread, @@ -290,7 +294,7 @@ def test_read_parquet_ext( index, ): fname = "test_parquet_reader_ext.parquet" - bname = "parquet" + bucket = "parquet" buffer = BytesIO() if index: @@ -300,9 +304,9 @@ def test_read_parquet_ext( # Check direct path handling buffer.seek(0) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got1 = cudf.read_parquet( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, bytes_per_thread=bytes_per_thread, footer_sample_size=3200, @@ -323,15 +327,15 @@ def test_read_parquet_ext( def test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns): # Write to buffer fname = "test_parquet_reader_arrow_nativefile.parquet" - bname = "parquet" + bucket = "parquet" buffer = BytesIO() pdf.to_parquet(path=buffer) buffer.seek(0) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): fs = pa_fs.S3FileSystem( endpoint_override=s3so["client_kwargs"]["endpoint_url"], ) - with fs.open_input_file(f"{bname}/{fname}") as fil: + with fs.open_input_file(f"{bucket}/{fname}") as fil: got = cudf.read_parquet(fil, columns=columns) expect = pdf[columns] if columns else pdf @@ -341,14 +345,14 @@ def test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns): @pytest.mark.parametrize("precache", [None, "parquet"]) def test_read_parquet_filters(s3_base, s3so, pdf_ext, precache): fname = "test_parquet_reader_filters.parquet" - bname = "parquet" + bucket = "parquet" buffer = BytesIO() pdf_ext.to_parquet(path=buffer) buffer.seek(0) filters = [("String", "==", "Omega")] - with s3_context(s3_base=s3_base, bucket=bname, 
files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_parquet( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, filters=filters, open_file_options={"precache_options": {"method": precache}}, @@ -360,25 +364,38 @@ def test_read_parquet_filters(s3_base, s3so, pdf_ext, precache): @pytest.mark.parametrize("partition_cols", [None, ["String"]]) def test_write_parquet(s3_base, s3so, pdf, partition_cols): - fname = "test_parquet_writer.parquet" - bname = "parquet" + fname_cudf = "test_parquet_writer_cudf" + fname_pandas = "test_parquet_writer_pandas" + bucket = "parquet" gdf = cudf.from_pandas(pdf) - with s3_context(s3_base=s3_base, bucket=bname) as s3fs: + + with s3_context(s3_base=s3_base, bucket=bucket) as s3fs: gdf.to_parquet( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname_cudf}", + partition_cols=partition_cols, + storage_options=s3so, + ) + assert s3fs.exists(f"s3://{bucket}/{fname_cudf}") + pdf.to_parquet( + f"s3://{bucket}/{fname_pandas}", partition_cols=partition_cols, storage_options=s3so, ) - assert s3fs.exists(f"s3://{bname}/{fname}") + assert s3fs.exists(f"s3://{bucket}/{fname_pandas}") - got = pd.read_parquet(s3fs.open(f"s3://{bname}/{fname}")) + got = pd.read_parquet( + f"s3://{bucket}/{fname_pandas}", storage_options=s3so + ) + expect = cudf.read_parquet( + f"s3://{bucket}/{fname_cudf}", storage_options=s3so + ) - assert_eq(pdf, got) + assert_eq(expect, got) def test_read_json(s3_base, s3so): fname = "test_json_reader.json" - bname = "json" + bucket = "json" # TODO: After following bug is fixed switch # back to using bytes: # https://github.com/pandas-dev/pandas/issues/46935 @@ -396,9 +413,9 @@ def test_read_json(s3_base, s3so): '{"amount": 400, "name": "Dennis"}\n' ) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_json( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", engine="cudf", orient="records", lines=True, @@ -414,15 +431,15 @@ def test_read_json(s3_base, s3so): def test_read_orc(s3_base, s3so, datadir, use_python_file_object, columns): source_file = str(datadir / "orc" / "TestOrcFile.testSnappy.orc") fname = "test_orc_reader.orc" - bname = "orc" + bucket = "orc" expect = pa.orc.ORCFile(source_file).read().to_pandas() with open(source_file, "rb") as f: buffer = f.read() - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_orc( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", columns=columns, storage_options=s3so, use_python_file_object=use_python_file_object, @@ -437,17 +454,17 @@ def test_read_orc(s3_base, s3so, datadir, use_python_file_object, columns): def test_read_orc_arrow_nativefile(s3_base, s3so, datadir, columns): source_file = str(datadir / "orc" / "TestOrcFile.testSnappy.orc") fname = "test_orc_reader.orc" - bname = "orc" + bucket = "orc" expect = pa.orc.ORCFile(source_file).read().to_pandas() with open(source_file, "rb") as f: buffer = f.read() - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): fs = pa_fs.S3FileSystem( endpoint_override=s3so["client_kwargs"]["endpoint_url"], ) - with fs.open_input_file(f"{bname}/{fname}") as fil: + with fs.open_input_file(f"{bucket}/{fname}") as fil: got = cudf.read_orc(fil, columns=columns) if columns: @@ 
-457,13 +474,51 @@ def test_read_orc_arrow_nativefile(s3_base, s3so, datadir, columns): def test_write_orc(s3_base, s3so, pdf): fname = "test_orc_writer.orc" - bname = "orc" + bucket = "orc" gdf = cudf.from_pandas(pdf) - with s3_context(s3_base=s3_base, bucket=bname) as s3fs: - gdf.to_orc(f"s3://{bname}/{fname}", storage_options=s3so) - assert s3fs.exists(f"s3://{bname}/{fname}") + with s3_context(s3_base=s3_base, bucket=bucket) as s3fs: + gdf.to_orc(f"s3://{bucket}/{fname}", storage_options=s3so) + assert s3fs.exists(f"s3://{bucket}/{fname}") - with s3fs.open(f"s3://{bname}/{fname}") as f: + with s3fs.open(f"s3://{bucket}/{fname}") as f: got = pa.orc.ORCFile(f).read().to_pandas() assert_eq(pdf, got) + + +def test_write_chunked_parquet(s3_base, s3so): + df1 = cudf.DataFrame({"b": [10, 11, 12], "a": [1, 2, 3]}) + df2 = cudf.DataFrame({"b": [20, 30, 50], "a": [3, 2, 1]}) + dirname = "chunked_writer_directory" + bucket = "parquet" + from cudf.io.parquet import ParquetDatasetWriter + + with s3_context( + s3_base=s3_base, bucket=bucket, files={dirname: BytesIO()} + ) as s3fs: + with ParquetDatasetWriter( + f"s3://{bucket}/{dirname}", + partition_cols=["a"], + storage_options=s3so, + ) as cw: + cw.write_table(df1) + cw.write_table(df2) + + # TODO: Replace following workaround with: + # expect = cudf.read_parquet(f"s3://{bucket}/{dirname}/", + # storage_options=s3so) + # after the following bug is fixed: + # https://issues.apache.org/jira/browse/ARROW-16438 + + dfs = [] + for folder in {"a=1", "a=2", "a=3"}: + assert s3fs.exists(f"s3://{bucket}/{dirname}/{folder}") + for file in s3fs.ls(f"s3://{bucket}/{dirname}/{folder}"): + df = cudf.read_parquet("s3://" + file, storage_options=s3so) + dfs.append(df) + + actual = cudf.concat(dfs).astype("int64") + assert_eq( + actual.sort_values(["b"]).reset_index(drop=True), + cudf.concat([df1, df2]).sort_values(["b"]).reset_index(drop=True), + ) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_s3.py b/python/dask_cudf/dask_cudf/io/tests/test_s3.py index 83ff1273b36..9283380296c 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_s3.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_s3.py @@ -1,5 +1,8 @@ +# Copyright (c) 2020-2022, NVIDIA CORPORATION. + import os import shlex +import socket import subprocess import time from contextlib import contextmanager @@ -11,12 +14,22 @@ import dask_cudf -moto = pytest.importorskip("moto", minversion="1.3.14") +moto = pytest.importorskip("moto", minversion="3.1.6") boto3 = pytest.importorskip("boto3") requests = pytest.importorskip("requests") s3fs = pytest.importorskip("s3fs") +@pytest.fixture(scope="session") +def endpoint_port(): + # Return a free port per worker session. 
+ sock = socket.socket() + sock.bind(("127.0.0.1", 0)) + port = sock.getsockname()[1] + sock.close() + return port + + @contextmanager def ensure_safe_environment_variables(): """ @@ -33,7 +46,7 @@ def ensure_safe_environment_variables(): @pytest.fixture(scope="session") -def s3_base(worker_id): +def s3_base(endpoint_port): """ Fixture to set up moto server in separate process """ @@ -42,15 +55,10 @@ def s3_base(worker_id): # system aws credentials, https://github.com/spulec/moto/issues/1793 os.environ.setdefault("AWS_ACCESS_KEY_ID", "foobar_key") os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foobar_secret") + os.environ.setdefault("S3FS_LOGGING_LEVEL", "DEBUG") # Launching moto in server mode, i.e., as a separate process # with an S3 endpoint on localhost - - endpoint_port = ( - 5000 - if worker_id == "master" - else 5550 + int(worker_id.lstrip("gw")) - ) endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" proc = subprocess.Popen( @@ -75,13 +83,10 @@ def s3_base(worker_id): @pytest.fixture() -def s3so(worker_id): +def s3so(endpoint_port): """ Returns s3 storage options to pass to fsspec """ - endpoint_port = ( - 5000 if worker_id == "master" else 5550 + int(worker_id.lstrip("gw")) - ) endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" return {"client_kwargs": {"endpoint_url": endpoint_uri}}
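The snippet below is a minimal sketch of how the newly exported ParquetDatasetWriter is used, following the class docstring and test_write_chunked_parquet above; the bucket name, endpoint URL, and storage options are illustrative placeholders, not values taken from the PR.

import cudf
from cudf.io.parquet import ParquetDatasetWriter

df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})

# Local partitioned dataset, written incrementally via the context manager.
with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw:
    cw.write_table(df1)
    cw.write_table(df2)

# With this change the same API also accepts an s3:// root directory; the
# writer stages files locally and uploads them when close() is called.
# "my-bucket" and the endpoint URL below are hypothetical.
with ParquetDatasetWriter(
    "s3://my-bucket/dataset",
    partition_cols=["a"],
    storage_options={"client_kwargs": {"endpoint_url": "http://127.0.0.1:5000"}},
) as cw:
    cw.write_table(df1)
    cw.write_table(df2)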
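The class docstring also states that max_file_size must be accompanied by file_name_prefix; the sketch below shows that mode under an assumed size cap and prefix (neither value appears in the PR's tests).

import cudf
from cudf.io.parquet import ParquetDatasetWriter

df = cudf.DataFrame({"a": [1, 2] * 10_000, "b": list(range(20_000))})

# Per the docstring, max_file_size may be an int (bytes) or a string such as
# "100 MB", and file_name_prefix is mandatory whenever max_file_size is set.
with ParquetDatasetWriter(
    "./dataset_capped",
    partition_cols=["a"],
    max_file_size="100 MB",
    file_name_prefix="part",
) as cw:
    cw.write_table(df)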
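For s3:// paths, close() pushes the locally staged directory to the bucket with a recursive fsspec put and then removes the staging copy. The sketch below mirrors that flow using the public fsspec.filesystem entry point in place of the internal ioutils helper; the bucket and endpoint are placeholders.

import shutil
import tempfile

import fsspec

s3_path = "s3://my-bucket/dataset"   # hypothetical destination
local_path = tempfile.mkdtemp()      # local staging directory

# ... partitioned parquet files are written under local_path here ...

# On close(): upload the whole staged tree, then drop the local copy.
fs = fsspec.filesystem(
    "s3", client_kwargs={"endpoint_url": "http://127.0.0.1:5000"}
)
fs.put(local_path, s3_path, recursive=True)
shutil.rmtree(local_path)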