Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable read_text with dask_cudf using byte_range #10407

Merged
merged 6 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion python/dask_cudf/dask_cudf/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Copyright (c) 2018-2022, NVIDIA CORPORATION.

from dask.dataframe import from_delayed

import cudf
Expand All @@ -6,7 +8,7 @@
from . import backends
from .core import DataFrame, Series, concat, from_cudf, from_dask_dataframe
from .groupby import groupby_agg
from .io import read_csv, read_json, read_orc, to_orc
from .io import read_csv, read_json, read_orc, to_orc, read_text

try:
from .io import read_parquet
Expand Down
3 changes: 3 additions & 0 deletions python/dask_cudf/dask_cudf/io/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Copyright (c) 2018-2022, NVIDIA CORPORATION.

from .csv import read_csv
from .json import read_json
from .orc import read_orc, to_orc
from .text import read_text

try:
from .parquet import read_parquet, to_parquet
Expand Down
53 changes: 53 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/sample.pgn
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
[Event "Rated Bullet tournament https://lichess.org/tournament/IaRkDsvp"]
ChrisJar marked this conversation as resolved.
Show resolved Hide resolved
[Site "https://lichess.org/r0cYFhsy"]
[White "GreatGig"]
[Black "hackattack"]
[Result "0-1"]
[UTCDate "2016.04.30"]
[UTCTime "22:00:03"]
[WhiteElo "1777"]
[BlackElo "1809"]
[WhiteRatingDiff "-11"]
[BlackRatingDiff "+11"]
[ECO "B01"]
[Opening "Scandinavian Defense: Mieses-Kotroc Variation"]
[TimeControl "60+0"]
[Termination "Time forfeit"]

1. e4 d5 2. exd5 Qxd5 3. Nc3 Qd8 4. d4 Nf6 5. Nf3 Bg4 6. h3 Bxf3 7. gxf3 c6 8. Bg2 Nbd7 9. Be3 e6 10. Qd2 Nd5 11. Nxd5 cxd5 12. O-O-O Be7 13. c3 Qc7 14. Kb1 O-O-O 15. f4 Kb8 16. Rhg1 Ka8 17. Bh1 g6 18. h4 Bxh4 19. f3 Be7 20. Qc2 Nf6 21. Bg2 Nh5 22. Bh3 Nxf4 23. Bxf4 Qxf4 24. Rdf1 Qd6 25. Rg4 Rdf8 26. Rfg1 f5 27. R4g2 Bf6 28. Rg3 Rfg8 29. Bf1 Rg7 30. Bd3 Rhg8 31. Qh2 Qb8 32. Qg2 Qc8 33. f4 Qc6 34. Qf2 Bh4 35. Rxg6 Bxf2 36. Rxg7 Rxg7 37. Rxg7 a6 38. Rg8+ Ka7 39. Rh8 Qd7 40. Rxh7 Qxh7 0-1

[Event "Rated Bullet tournament https://lichess.org/tournament/IaRkDsvp"]
[Site "https://lichess.org/s7lpBNiu"]
[White "kh447"]
[Black "blueskyminer23"]
[Result "0-1"]
[UTCDate "2016.04.30"]
[UTCTime "22:00:03"]
[WhiteElo "2025"]
[BlackElo "2046"]
[WhiteRatingDiff "-12"]
[BlackRatingDiff "+11"]
[ECO "D94"]
[Opening "Gruenfeld Defense: Three Knights Variation, Paris Variation"]
[TimeControl "60+0"]
[Termination "Time forfeit"]

1. d4 Nf6 2. c4 g6 3. Nc3 d5 4. Nf3 Bg7 5. e3 O-O 6. Bd3 c5 7. cxd5 Nxd5 8. O-O Nc6 9. Qe2 Bg4 10. dxc5 Nxc3 11. bxc3 Bxf3 12. gxf3 Bxc3 13. Rb1 Rb8 14. Kh1 Qd5 15. Rg1 Qxc5 16. Qc2 Qa5 17. Qb3 Bg7 18. Qc2 Ne5 19. Be4 Rfc8 20. Qe2 f5 21. Bc2 Qd5 22. e4 Qc6 23. Bb3+ Kh8 24. Bf4 fxe4 25. fxe4 Nd7 26. Rbc1 Qf6 27. Bxb8 Rxb8 28. Rg3 Ne5 29. Rcg1 h6 30. f4 Qxf4 31. Rxg6 Nxg6 32. Qg2 Qe5 33. Bc2 Kg8 34. Qe2 Rf8 35. Qf3 Rf7 36. Qe2 Rf6 37. Qg2 Re6 0-1

[Event "Rated Bullet tournament https://lichess.org/tournament/IaRkDsvp"]
[Site "https://lichess.org/9CTXrWUB"]
[White "Demis115"]
[Black "churrosagogo"]
[Result "1-0"]
[UTCDate "2016.04.30"]
[UTCTime "22:00:03"]
[WhiteElo "1944"]
[BlackElo "2007"]
[WhiteRatingDiff "+14"]
[BlackRatingDiff "-13"]
[ECO "C28"]
[Opening "Bishop's Opening: Vienna Hybrid"]
[TimeControl "60+0"]
[Termination "Normal"]

1. e4 e5 2. Nc3 Nc6 3. Bc4 Nf6 4. d3 Bc5 5. Be3 O-O 6. Bxc5 d6 7. Be3 a6 8. Nge2 b5 9. Bb3 b4 10. Nd5 Na5 11. Bg5 Nxb3 12. axb3 Be6 13. Bxf6 gxf6 14. Ng3 Bxd5 15. exd5 Qd7 16. O-O Kh8 17. Qe2 f5 18. Qh5 f4 19. Nf5 Rg8 20. Nh6 Rg6 21. Rfe1 Rag8 22. g3 f5 23. Qxf5 Qg7 24. Nf7+ Qxf7 25. Qxf7 fxg3 26. hxg3 R6g7 27. Qf6 1-0
20 changes: 20 additions & 0 deletions python/dask_cudf/dask_cudf/io/tests/test_text.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright (c) 2022, NVIDIA CORPORATION.

import os
import pytest

import cudf
import dask_cudf

import dask.dataframe as dd

cur_dir = os.path.dirname(__file__)
text_file = os.path.join(cur_dir, "sample.pgn")


@pytest.mark.parametrize("file", [text_file, [text_file]])
@pytest.mark.parametrize("chunksize", [12, "50 B", None])
def test_read_text(file, chunksize):
df1 = cudf.read_text(text_file, delimiter='"]')
df2 = dask_cudf.read_text(file, chunksize=chunksize, delimiter='"]')
dd.assert_eq(df1, df2, check_index=False)
57 changes: 57 additions & 0 deletions python/dask_cudf/dask_cudf/io/text.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright (c) 2022, NVIDIA CORPORATION.

import os
from glob import glob

from dask.base import tokenize

import cudf

from dask.utils import apply, parse_bytes

import dask.dataframe as dd


def read_text(path, chunksize="256 MiB", **kwargs):

if isinstance(chunksize, str):
chunksize = parse_bytes(chunksize)

if isinstance(path, list):
filenames = path
elif isinstance(path, str):
filenames = sorted(glob(path))
elif hasattr(path, "__fspath__"):
filenames = sorted(glob(path.__fspath__()))
else:
raise TypeError(f"Path type not understood:{type(path)}")

if not filenames:
msg = f"A file in: {filenames} does not exist."
raise FileNotFoundError(msg)

name = "read-text-" + tokenize(path, tokenize, **kwargs)

if chunksize:
dsk = {}
i = 0
for fn in filenames:
size = os.path.getsize(fn)
for start in range(0, size, chunksize):
kwargs1 = kwargs.copy()
kwargs1["byte_range"] = (
start,
chunksize - 1,
) # specify which chunk of the file we care about

dsk[(name, i)] = (apply, cudf.read_text, [fn], kwargs1)
i += 1
else:
dsk = {
(name, i): (apply, cudf.read_text, [fn], kwargs)
for i, fn in enumerate(filenames)
}

meta = cudf.Series([], dtype="O")
divisions = [None] * (len(dsk) + 1)
return dd.core.new_dd_object(dsk, name, meta, divisions)