diff --git a/python/cudf/cudf/tests/test_avro.py b/python/cudf/cudf/tests/test_avro.py deleted file mode 100644 index 2d89ae64752..00000000000 --- a/python/cudf/cudf/tests/test_avro.py +++ /dev/null @@ -1,108 +0,0 @@ -# Copyright (c) 2019, NVIDIA CORPORATION. - -from io import BytesIO - -import fastavro as fa -import numpy as np -import pandas as pd -import pytest - -import cudf -from cudf.tests.utils import assert_eq - - -@pytest.fixture(scope="module") -def datadir(datadir): - return datadir / "avro" - - -@pytest.fixture -def path_or_buf(datadir): - fname = datadir / "example.avro" - try: - with open(fname, "rb") as f: - buffer = BytesIO(f.read()) - except Exception as excpr: - if type(excpr).__name__ == "FileNotFoundError": - pytest.skip(".parquet file is not found") - else: - print(type(excpr).__name__) - - def _make_path_or_buf(src): - if src == "filepath": - return str(fname) - if src == "pathobj": - return fname - if src == "bytes_io": - return buffer - if src == "bytes": - return buffer.getvalue() - if src == "url": - return fname.as_uri() - - raise ValueError("Invalid source type") - - yield _make_path_or_buf - - -@pytest.mark.filterwarnings("ignore:Using CPU") -@pytest.mark.parametrize("engine", ["cudf"]) -@pytest.mark.parametrize("inputfile, columns", [("example.avro", None)]) -def test_avro_reader_basic(datadir, inputfile, columns, engine): - path = datadir / inputfile - try: - reader = fa.reader(open(path, "rb")) - except Exception as excpr: - if type(excpr).__name__ == "FileNotFoundError": - pytest.skip(".avro file is not found") - else: - print(type(excpr).__name__) - - expect = pd.DataFrame.from_records(reader) - got = cudf.read_avro(path, engine=engine, columns=columns) - - # PANDAS uses NaN to represent invalid data, which forces float dtype - # For comparison, we can replace NaN with 0 and cast to the cuDF dtype - # FASTAVRO produces int64 columns from avro int32 dtype, so convert - # it back to int32 here - for col in expect.columns: - expect[col] = expect[col].astype(got[col].dtype) - - # fastavro appears to return columns in reverse order - # (actual order may depend on pandas/python version) - assert_eq(expect, got[expect.columns], check_categorical=False) - - -def test_empty_dataframe(tmpdir): - filepath = tmpdir + "empty.avro" - # write empty dataframe - with open(filepath, "wb") as out: - fa.writer(out, [], []) - - df = cudf.read_avro(filepath) - assert_eq(df, cudf.DataFrame()) - - -def test_no_data(tmpdir): - filepath = tmpdir + "no_data.avro" - schema = { - "name": "Weather", - "type": "record", - "fields": [ - {"name": "station", "type": "string"}, - {"name": "time", "type": "long"}, - {"name": "temp", "type": "int"}, - ], - } - parsed_schema = fa.parse_schema(schema) - with open(filepath, "wb") as out: - fa.writer(out, parsed_schema, []) - - df = cudf.read_avro(filepath) - - # fastavro returns an empty dataframe, need to verify manually - assert_eq(df.shape, (0, 3)) - dtypes = df.dtypes.values.tolist() - assert_eq(dtypes, [np.dtype("O"), np.dtype("int64"), np.dtype("int32")]) - col_names = df.columns.tolist() - assert_eq(col_names, ["station", "time", "temp"]) diff --git a/python/cudf/cudf/tests/test_avro_reader_fastavro_integration.py b/python/cudf/cudf/tests/test_avro_reader_fastavro_integration.py new file mode 100644 index 00000000000..a52ee937574 --- /dev/null +++ b/python/cudf/cudf/tests/test_avro_reader_fastavro_integration.py @@ -0,0 +1,209 @@ +# Copyright (c) 2021, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import io + +import fastavro +import pytest + +import cudf +from cudf.tests.utils import assert_eq + + +def cudf_from_avro_util(schema, records): + + schema = [] if schema is None else fastavro.parse_schema(schema) + buffer = io.BytesIO() + fastavro.writer(buffer, schema, records) + buffer.seek(0) + return cudf.read_avro(buffer) + + +avro_type_params = [ + ("boolean", "bool"), + ("int", "int32"), + ("long", "int64"), + ("float", "float32"), + ("double", "float64"), + ("bytes", "str"), + ("string", "str"), +] + + +@pytest.mark.parametrize("avro_type, expected_dtype", avro_type_params) +@pytest.mark.parametrize("namespace", [None, "root_ns"]) +@pytest.mark.parametrize("nullable", [True, False]) +def test_can_detect_dtype_from_avro_type( + avro_type, expected_dtype, namespace, nullable +): + avro_type = avro_type if not nullable else ["null", avro_type] + + schema = fastavro.parse_schema( + { + "type": "record", + "name": "test", + "namespace": namespace, + "fields": [{"name": "prop", "type": avro_type}], + } + ) + + actual = cudf_from_avro_util(schema, []) + + expected = cudf.DataFrame( + {"prop": cudf.Series(None, None, expected_dtype)} + ) + + assert_eq(expected, actual) + + +@pytest.mark.parametrize("avro_type, expected_dtype", avro_type_params) +@pytest.mark.parametrize("namespace", [None, "root_ns"]) +@pytest.mark.parametrize("nullable", [True, False]) +def test_can_detect_dtype_from_avro_type_nested( + avro_type, expected_dtype, namespace, nullable +): + avro_type = avro_type if not nullable else ["null", avro_type] + + schema_leaf = { + "name": "leaf", + "type": "record", + "fields": [{"name": "prop3", "type": avro_type}], + } + + schema_child = { + "name": "child", + "type": "record", + "fields": [{"name": "prop2", "type": schema_leaf}], + } + + schema_root = { + "name": "root", + "type": "record", + "namespace": namespace, + "fields": [{"name": "prop1", "type": schema_child}], + } + + actual = cudf_from_avro_util(schema_root, []) + + col_name = "{ns}child.{ns}leaf.prop3".format( + ns="" if namespace is None else namespace + "." + ) + + expected = cudf.DataFrame( + {col_name: cudf.Series(None, None, expected_dtype)} + ) + + assert_eq(expected, actual) + + +@pytest.mark.parametrize( + "avro_type, cudf_type, avro_val, cudf_val", + [ + ("boolean", "bool", True, True), + ("boolean", "bool", False, False), + ("int", "int32", 1234, 1234), + ("long", "int64", 1234, 1234), + ("float", "float32", 12.34, 12.34), + ("double", "float64", 12.34, 12.34), + ("string", "str", "heyϴ", "heyϴ"), + # ("bytes", "str", "heyϴ", "heyϴ"), + ], +) +def test_can_parse_single_value(avro_type, cudf_type, avro_val, cudf_val): + + schema_root = { + "name": "root", + "type": "record", + "fields": [{"name": "prop", "type": ["null", avro_type]}], + } + + records = [ + {"prop": avro_val}, + ] + + actual = cudf_from_avro_util(schema_root, records) + + expected = cudf.DataFrame( + {"prop": cudf.Series(data=[cudf_val], dtype=cudf_type)} + ) + + assert_eq(expected, actual) + + +@pytest.mark.parametrize("avro_type, cudf_type", avro_type_params) +def test_can_parse_single_null(avro_type, cudf_type): + + schema_root = { + "name": "root", + "type": "record", + "fields": [{"name": "prop", "type": ["null", avro_type]}], + } + + records = [{"prop": None}] + + actual = cudf_from_avro_util(schema_root, records) + + expected = cudf.DataFrame( + {"prop": cudf.Series(data=[None], dtype=cudf_type)} + ) + + assert_eq(expected, actual) + + +@pytest.mark.parametrize("avro_type, cudf_type", avro_type_params) +def test_can_parse_no_data(avro_type, cudf_type): + + schema_root = { + "name": "root", + "type": "record", + "fields": [{"name": "prop", "type": ["null", avro_type]}], + } + + records = [] + + actual = cudf_from_avro_util(schema_root, records) + + expected = cudf.DataFrame({"prop": cudf.Series(data=[], dtype=cudf_type)}) + + assert_eq(expected, actual) + + +@pytest.mark.xfail( + reason="cudf avro reader is unable to parse zero-field metadata." +) +@pytest.mark.parametrize("avro_type, cudf_type", avro_type_params) +def test_can_parse_no_fields(avro_type, cudf_type): + + schema_root = { + "name": "root", + "type": "record", + "fields": [], + } + + records = [] + + actual = cudf_from_avro_util(schema_root, records) + + expected = cudf.DataFrame() + + assert_eq(expected, actual) + + +def test_can_parse_no_schema(): + + schema_root = None + records = [] + actual = cudf_from_avro_util(schema_root, records) + expected = cudf.DataFrame() + assert_eq(expected, actual)