diff --git a/.github/workflows/auto-merge.yml b/.github/workflows/auto-merge.yml index b1c3c2b32b..8a71a90f3b 100755 --- a/.github/workflows/auto-merge.yml +++ b/.github/workflows/auto-merge.yml @@ -18,12 +18,12 @@ name: auto-merge HEAD to BASE on: pull_request_target: branches: - - branch-24.06 + - branch-24.08 types: [closed] env: - HEAD: branch-24.06 - BASE: branch-24.08 + HEAD: branch-24.08 + BASE: branch-24.10 jobs: auto-merge: diff --git a/.github/workflows/blossom-ci.yml b/.github/workflows/blossom-ci.yml index 33ccf50ea8..a17fbddd83 100644 --- a/.github/workflows/blossom-ci.yml +++ b/.github/workflows/blossom-ci.yml @@ -33,40 +33,43 @@ jobs: args: ${{ env.args }} # This job only runs for pull request comments - if: contains( '\ - abellina,\ - anfeng,\ - firestarman,\ - GaryShen2008,\ - jlowe,\ - mythrocks,\ - nartal1,\ - nvdbaranec,\ - NvTimLiu,\ - razajafri,\ - revans2,\ - rwlee,\ - sameerz,\ - tgravescs,\ - wbo4958,\ - wjxiz1992,\ - sperlingxx,\ - YanxuanLiu,\ - hyperbolic2346,\ - gerashegalov,\ - ttnghia,\ - nvliyuan,\ - res-life,\ - HaoYang670,\ - NVnavkumar,\ - yinqingh,\ - thirtiseven,\ - parthosa,\ - liurenjie1024,\ - binmahone,\ - pmattione-nvidia,\ - Feng-Jiang28,\ - ', format('{0},', github.actor)) && github.event.comment.body == 'build' + if: | + github.event.comment.body == 'build' && + ( + github.actor == 'abellina' || + github.actor == 'anfeng' || + github.actor == 'firestarman' || + github.actor == 'GaryShen2008' || + github.actor == 'jlowe' || + github.actor == 'mythrocks' || + github.actor == 'nartal1' || + github.actor == 'nvdbaranec' || + github.actor == 'NvTimLiu' || + github.actor == 'razajafri' || + github.actor == 'revans2' || + github.actor == 'rwlee' || + github.actor == 'sameerz' || + github.actor == 'tgravescs' || + github.actor == 'wbo4958' || + github.actor == 'wjxiz1992' || + github.actor == 'sperlingxx' || + github.actor == 'YanxuanLiu' || + github.actor == 'hyperbolic2346' || + github.actor == 'gerashegalov' || + github.actor == 'ttnghia' || + github.actor == 'nvliyuan' || + github.actor == 'res-life' || + github.actor == 'HaoYang670' || + github.actor == 'NVnavkumar' || + github.actor == 'yinqingh' || + github.actor == 'thirtiseven' || + github.actor == 'parthosa' || + github.actor == 'liurenjie1024' || + github.actor == 'binmahone' || + github.actor == 'pmattione-nvidia' || + github.actor == 'Feng-Jiang28' || + github.actor == 'pxLi' + ) steps: - name: Check if comment is issued by authorized person run: blossom-ci @@ -81,7 +84,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Checkout code - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: repository: ${{ fromJson(needs.Authorization.outputs.args).repo }} ref: ${{ fromJson(needs.Authorization.outputs.args).ref }} @@ -89,7 +92,7 @@ jobs: # repo specific steps - name: Setup java - uses: actions/setup-java@v3 + uses: actions/setup-java@v4 with: distribution: adopt java-version: 8 diff --git a/.gitmodules b/.gitmodules index 12b07c5b18..862e1ef3e6 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,4 +1,4 @@ [submodule "thirdparty/cudf"] path = thirdparty/cudf url = https://github.com/rapidsai/cudf.git - branch = branch-24.06 + branch = branch-24.08 diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 1ada0b474b..7f83e2169b 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -61,15 +61,21 @@ the Docker container. The script passes all of its arguments onto the Maven command run inside the Docker container, so it should be invoked as one would invoke Maven, e.g.: `build/build-in-docker clean package` +#### Using spark-rapids-jni Docker Container with other Repos + +Spark RAPIDS project spans multiple repos. Some issues are discovered in +spark-rapids-jni but they need to be made easily reproducible in the cudf repo + +To this end export WORKDIR with the path pointing to a different repo + +``` +export WORKDIR=~/gits/rapidsai/cudf +~/gits/NVIDIA/spark-rapids-jni/build/run-in-docker head README.md +``` + ### cudf Submodule and Build [RAPIDS cuDF](https://github.com/rapidsai/cudf) is being used as a submodule in this project. -Due to the lengthy build of libcudf, it is **not cleaned** during a normal Maven clean phase -unless built using `build/build-in-docker`. `build/build-in-docker` uses `ccache` by default -unless CCACHE_DISABLE=1 is set in the environment. - -`-Dlibcudf.clean.skip=false` can also be specified on the Maven command-line to force -libcudf to be cleaned during the Maven clean phase. Currently libcudf is only configured once and the build relies on cmake to re-configure as needed. This is because libcudf currently is rebuilding almost entirely when it is configured with the same @@ -93,7 +99,6 @@ to control aspects of the build: | `BUILD_BENCHMARKS` | Compile benchmarks | OFF | | `BUILD_FAULTINJ` | Compile fault injection | ON | | `libcudf.build.configure` | Force libcudf build to configure | false | -| `libcudf.clean.skip` | Whether to skip cleaning libcudf build | true | | `submodule.check.skip` | Whether to skip checking git submodules | false | @@ -160,7 +165,7 @@ $ ./build/build-in-docker install ... ``` Now cd to ~/repos/NVIDIA/spark-rapids and build with one of the options from -[spark-rapids instructions](https://github.com/NVIDIA/spark-rapids/blob/branch-24.06/CONTRIBUTING.md#building-from-source). +[spark-rapids instructions](https://github.com/NVIDIA/spark-rapids/blob/branch-24.08/CONTRIBUTING.md#building-from-source). ```bash $ ./build/buildall diff --git a/build-libcudf.xml b/build-libcudf.xml deleted file mode 100644 index 765f50e8a0..0000000000 --- a/build-libcudf.xml +++ /dev/null @@ -1,65 +0,0 @@ - - - - - Configures and builds the libcudf library from the cudf submodule. - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/build/run-in-docker b/build/run-in-docker index 81152a1d9d..912feaa03c 100755 --- a/build/run-in-docker +++ b/build/run-in-docker @@ -20,9 +20,11 @@ set -e -# Base paths relative to this script's location -SCRIPTDIR=$(cd $(dirname $0); pwd) -REPODIR=$SCRIPTDIR/.. +REPODIR_REL=$(git rev-parse --show-toplevel) +REPODIR=$(realpath "$REPODIR_REL") +GIT_COMMON_DIR_REL=$(git rev-parse --git-common-dir) +GIT_COMMON_DIR=$(realpath "$GIT_COMMON_DIR_REL") +WORKDIR=${WORKDIR:-$REPODIR} CUDA_VERSION=${CUDA_VERSION:-11.8.0} DOCKER_CMD=${DOCKER_CMD:-docker} @@ -63,11 +65,11 @@ $DOCKER_CMD run $DOCKER_GPU_OPTS $DOCKER_RUN_EXTRA_ARGS -u $(id -u):$(id -g) --r -v "/etc/passwd:/etc/passwd:ro" \ -v "/etc/shadow:/etc/shadow:ro" \ -v "/etc/sudoers.d:/etc/sudoers.d:ro" \ - -v "$REPODIR:$REPODIR:rw" \ + -v "$GIT_COMMON_DIR:$GIT_COMMON_DIR:rw" \ + -v "$WORKDIR:$WORKDIR:rw" \ -v "$LOCAL_CCACHE_DIR:$LOCAL_CCACHE_DIR:rw" \ -v "$LOCAL_MAVEN_REPO:$LOCAL_MAVEN_REPO:rw" \ - --workdir "$REPODIR" \ - -e CCACHE_DISABLE \ + --workdir "$WORKDIR" \ -e CCACHE_DIR="$LOCAL_CCACHE_DIR" \ -e CMAKE_C_COMPILER_LAUNCHER="ccache" \ -e CMAKE_CXX_COMPILER_LAUNCHER="ccache" \ diff --git a/ci/Jenkinsfile.premerge b/ci/Jenkinsfile.premerge index 0a00eb6f1b..fb7c3cd0de 100644 --- a/ci/Jenkinsfile.premerge +++ b/ci/Jenkinsfile.premerge @@ -57,7 +57,8 @@ pipeline { parameters { string(name: 'PARALLEL_LEVEL', defaultValue: '18', description: 'Parallel build cudf cpp with -DCPP_PARALLEL_LEVEL') - string(name: 'REF', defaultValue: '', + // Put a default value for REF to avoid error when running the pipeline manually + string(name: 'REF', defaultValue: 'main', description: 'Merged commit of specific PR') string(name: 'GITHUB_DATA', defaultValue: '', description: 'Json-formatted github data from upstream blossom-ci') diff --git a/ci/check-cuda-dependencies.sh b/ci/check-cuda-dependencies.sh new file mode 100644 index 0000000000..9d988bedae --- /dev/null +++ b/ci/check-cuda-dependencies.sh @@ -0,0 +1,35 @@ +#!/bin/bash +# +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# common script to help check if packaged *.so files have dynamical link to CUDA Runtime + +set -exo pipefail + +jar_path=$1 +tmp_path=/tmp/"jni-$(date "+%Y%m%d%H%M%S")" +unzip -j "${jar_path}" "*64/Linux/*.so" -d "${tmp_path}" + +find "$tmp_path" -type f -name "*.so" | while read -r so_file; do + # Check if *.so file has a dynamic link to CUDA Runtime + if objdump -p "$so_file" | grep NEEDED | grep -qi cudart; then + echo "Dynamic link to CUDA Runtime found in $so_file..." + ldd "$so_file" + exit 1 + else + echo "No dynamic link to CUDA Runtime found in $so_file" + fi +done diff --git a/ci/nightly-build.sh b/ci/nightly-build.sh index 8a3c2dbacf..267b26efdf 100755 --- a/ci/nightly-build.sh +++ b/ci/nightly-build.sh @@ -1,6 +1,6 @@ #!/bin/bash # -# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ USE_GDS=${USE_GDS:-ON} USE_SANITIZER=${USE_SANITIZER:-ON} BUILD_FAULTINJ=${BUILD_FAULTINJ:-ON} ARM64=${ARM64:-false} +artifact_suffix="${CUDA_VER}" profiles="source-javadoc" if [ "${ARM64}" == "true" ]; then @@ -36,6 +37,7 @@ if [ "${ARM64}" == "true" ]; then USE_GDS="OFF" USE_SANITIZER="ON" BUILD_FAULTINJ="OFF" + artifact_suffix="${artifact_suffix}-arm64" fi ${MVN} clean package ${MVN_MIRROR} \ @@ -43,5 +45,8 @@ ${MVN} clean package ${MVN_MIRROR} \ -DCPP_PARALLEL_LEVEL=${PARALLEL_LEVEL} \ -Dlibcudf.build.configure=true \ -DUSE_GDS=${USE_GDS} -Dtest=*,!CuFileTest,!CudaFatalTest,!ColumnViewNonEmptyNullsTest \ - -DBUILD_TESTS=ON -DBUILD_FAULTINJ=${BUILD_FAULTINJ} -Dcuda.version=$CUDA_VER \ + -DBUILD_TESTS=ON -DBUILD_BENCHMARKS=ON -DBUILD_FAULTINJ=${BUILD_FAULTINJ} -Dcuda.version=$CUDA_VER \ -DUSE_SANITIZER=${USE_SANITIZER} + +build_name=$(${MVN} help:evaluate -Dexpression=project.build.finalName -q -DforceStdout) +. ci/check-cuda-dependencies.sh "target/${build_name}-${artifact_suffix}.jar" diff --git a/ci/premerge-build.sh b/ci/premerge-build.sh index e3adc10b3e..7297a9ecdc 100755 --- a/ci/premerge-build.sh +++ b/ci/premerge-build.sh @@ -1,6 +1,6 @@ #!/bin/bash # -# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -27,4 +27,8 @@ ${MVN} verify ${MVN_MIRROR} \ -DCPP_PARALLEL_LEVEL=${PARALLEL_LEVEL} \ -Dlibcudf.build.configure=true \ -DUSE_GDS=ON -Dtest=*,!CuFileTest,!CudaFatalTest,!ColumnViewNonEmptyNullsTest \ - -DBUILD_TESTS=ON + -DBUILD_TESTS=ON -DBUILD_BENCHMARKS=ON + +build_name=$(${MVN} help:evaluate -Dexpression=project.build.finalName -q -DforceStdout) +cuda_version=$(${MVN} help:evaluate -Dexpression=cuda.version -q -DforceStdout) +. ci/check-cuda-dependencies.sh "target/${build_name}-${cuda_version}.jar" diff --git a/ci/submodule-sync.sh b/ci/submodule-sync.sh index 1888696ba5..a889d86eb0 100755 --- a/ci/submodule-sync.sh +++ b/ci/submodule-sync.sh @@ -88,8 +88,13 @@ else echo "Test failed, will update the result" fi +build_name=$(${MVN} help:evaluate -Dexpression=project.build.finalName -q -DforceStdout) +cuda_version=$(${MVN} help:evaluate -Dexpression=cuda.version -q -DforceStdout) +. ci/check-cuda-dependencies.sh "target/${build_name}-${cuda_version}.jar" + +LIBCUDF_BUILD_PATH=$(${MVN} help:evaluate -Dexpression=libcudf.build.path -q -DforceStdout) # Extract the rapids-cmake sha1 that we need to pin too -rapids_cmake_sha=$(git -C thirdparty/cudf/cpp/build/_deps/rapids-cmake-src/ rev-parse HEAD) +rapids_cmake_sha=$(git -C ${LIBCUDF_BUILD_PATH}/_deps/rapids-cmake-src/ rev-parse HEAD) echo "Update rapids-cmake pinned SHA1 to ${rapids_cmake_sha}" echo "${rapids_cmake_sha}" > thirdparty/cudf-pins/rapids-cmake.sha diff --git a/pom.xml b/pom.xml index 566c06b934..70920a5316 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ com.nvidia spark-rapids-jni - 24.06.0 + 24.08.0 jar RAPIDS Accelerator JNI for Apache Spark @@ -81,8 +81,11 @@ OFF OFF + Release OFF OFF + OFF + OFF ON ON false @@ -93,16 +96,16 @@ ${project.basedir}/thirdparty/cudf-pins/ 3.2.4 5.8.1 - ${cudf.path}/cpp/build + ${project.build.directory}/libcudf/cmake-build/ false - true + false ${project.build.directory}/libcudf-install pinned ${project.build.directory}/libcudfjni 1.8 1.8 2.25.0 - ${project.build.directory}/cmake-build + ${project.build.directory}/jni/cmake-build 1.10.0 UTF-8 1.7.30 @@ -322,11 +325,12 @@ failonerror="true" executable="cmake"> - + + ${skipTests} run @@ -392,18 +396,49 @@ build-libcudf validate - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -429,6 +464,8 @@ + + + + + + @@ -461,6 +502,8 @@ + + #include -#include +#include #include static void bloom_filter_put(nvbench::state& state) diff --git a/src/main/cpp/faultinj/faultinj.cu b/src/main/cpp/faultinj/faultinj.cu index 13065a81ed..fcb4b3a12d 100644 --- a/src/main/cpp/faultinj/faultinj.cu +++ b/src/main/cpp/faultinj/faultinj.cu @@ -136,12 +136,12 @@ CUptiResult cuptiInitialize(void) return status; } -__global__ void faultInjectorKernelAssert(void) +__global__ static void faultInjectorKernelAssert(void) { assert(0 && "faultInjectorKernelAssert triggered"); } -__global__ void faultInjectorKernelTrap(void) { asm("trap;"); } +__global__ static void faultInjectorKernelTrap(void) { asm("trap;"); } boost::optional lookupConfig( boost::optional domainConfigs, diff --git a/src/main/cpp/profiler/ProfilerJni.cpp b/src/main/cpp/profiler/ProfilerJni.cpp index 1271b89d7b..87e01b2e92 100644 --- a/src/main/cpp/profiler/ProfilerJni.cpp +++ b/src/main/cpp/profiler/ProfilerJni.cpp @@ -184,7 +184,8 @@ struct free_buffer_tracker { void writer_thread_process(JavaVM* vm, jobject j_writer, size_t buffer_size, - size_t flush_threshold); + size_t flush_threshold, + bool async_alloc_capture); struct subscriber_state { CUpti_SubscriberHandle subscriber_handle; @@ -363,11 +364,16 @@ void setup_nvtx_env(JNIEnv* env, jstring j_lib_path) } // Main processing loop for the background writer thread -void writer_thread_process(JavaVM* vm, jobject j_writer, size_t buffer_size, size_t flush_threshold) +void writer_thread_process(JavaVM* vm, + jobject j_writer, + size_t buffer_size, + size_t flush_threshold, + bool async_alloc_capture) { try { JNIEnv* env = attach_to_jvm(vm); - profiler_serializer serializer(env, j_writer, buffer_size, flush_threshold); + profiler_serializer serializer( + env, j_writer, buffer_size, flush_threshold, async_alloc_capture); auto buffer = State->completed_buffers.get(); while (buffer) { serializer.process_cupti_buffer(buffer->data(), buffer->valid_size()); @@ -419,12 +425,14 @@ extern "C" { using namespace spark_rapids_jni::profiler; -JNIEXPORT void JNICALL Java_com_nvidia_spark_rapids_jni_Profiler_nativeInit(JNIEnv* env, - jclass, - jstring j_lib_path, - jobject j_writer, - jlong write_buffer_size, - jint flush_period_msec) +JNIEXPORT void JNICALL +Java_com_nvidia_spark_rapids_jni_Profiler_nativeInit(JNIEnv* env, + jclass, + jstring j_lib_path, + jobject j_writer, + jlong write_buffer_size, + jint flush_period_msec, + bool async_alloc_capture) { try { setup_nvtx_env(env, j_lib_path); @@ -432,9 +440,13 @@ JNIEXPORT void JNICALL Java_com_nvidia_spark_rapids_jni_Profiler_nativeInit(JNIE auto writer = static_cast(env->NewGlobalRef(j_writer)); if (!writer) { throw std::runtime_error("Unable to create a global reference to writer"); } State = new subscriber_state(writer, write_buffer_size); - State->writer_thread = std::thread( - writer_thread_process, get_jvm(env), writer, write_buffer_size, write_buffer_size); - auto rc = cuptiSubscribe(&State->subscriber_handle, callback_handler, nullptr); + State->writer_thread = std::thread(writer_thread_process, + get_jvm(env), + writer, + write_buffer_size, + write_buffer_size, + async_alloc_capture); + auto rc = cuptiSubscribe(&State->subscriber_handle, callback_handler, nullptr); check_cupti(rc, "Error initializing CUPTI"); rc = cuptiEnableCallback(1, State->subscriber_handle, diff --git a/src/main/cpp/profiler/profiler_serializer.cpp b/src/main/cpp/profiler/profiler_serializer.cpp index b47ff234ad..84729c9dd9 100644 --- a/src/main/cpp/profiler/profiler_serializer.cpp +++ b/src/main/cpp/profiler/profiler_serializer.cpp @@ -197,11 +197,13 @@ ShmemLimitConfig to_shmem_limit_config(CUpti_FuncShmemLimitConfig c) } // anonymous namespace -profiler_serializer::profiler_serializer(JNIEnv* env, - jobject writer, - size_t buffer_size, - size_t flush_threshold) - : env_(env), j_writer_(writer), flush_threshold_(flush_threshold), fbb_(buffer_size) +profiler_serializer::profiler_serializer( + JNIEnv* env, jobject writer, size_t buffer_size, size_t flush_threshold, bool capture_allocs) + : env_(env), + j_writer_(writer), + flush_threshold_(flush_threshold), + fbb_(buffer_size), + capture_allocs_(capture_allocs) { auto writer_class = env->GetObjectClass(writer); if (!writer_class) { throw std::runtime_error("Failed to locate class of data writer"); } @@ -322,6 +324,10 @@ void profiler_serializer::flush() void profiler_serializer::process_api_activity(CUpti_ActivityAPI const* r) { + if (r->start == 0 || r->end == 0) { + // Ignore records with bad timestamps + return; + } auto api_kind = ApiKind_Runtime; if (r->kind == CUPTI_ACTIVITY_KIND_DRIVER) { api_kind = ApiKind_Driver; @@ -332,6 +338,14 @@ void profiler_serializer::process_api_activity(CUpti_ActivityAPI const* r) case CUPTI_RUNTIME_TRACE_CBID_cudaGetLastError_v3020: case CUPTI_RUNTIME_TRACE_CBID_cudaPeekAtLastError_v3020: case CUPTI_RUNTIME_TRACE_CBID_cudaDeviceGetAttribute_v5000: return; + case CUPTI_RUNTIME_TRACE_CBID_cudaMallocAsync_v11020: + case CUPTI_RUNTIME_TRACE_CBID_cudaMallocAsync_ptsz_v11020: + case CUPTI_RUNTIME_TRACE_CBID_cudaMallocFromPoolAsync_v11020: + case CUPTI_RUNTIME_TRACE_CBID_cudaMallocFromPoolAsync_ptsz_v11020: + case CUPTI_RUNTIME_TRACE_CBID_cudaFreeAsync_v11020: + case CUPTI_RUNTIME_TRACE_CBID_cudaFreeAsync_ptsz_v11020: + if (capture_allocs_) { break; } + return; default: break; } } else { @@ -393,6 +407,10 @@ void profiler_serializer::process_dropped_records(size_t num_dropped) void profiler_serializer::process_kernel(CUpti_ActivityKernel8 const* r) { + if (r->start == 0 || r->end == 0) { + // Ignore records with invalid timestamps + return; + } auto name = fbb_.CreateSharedString(r->name); KernelActivityBuilder kab(fbb_); kab.add_requested(r->cacheConfig.config.requested); @@ -443,6 +461,10 @@ void profiler_serializer::process_kernel(CUpti_ActivityKernel8 const* r) void profiler_serializer::process_marker_activity(CUpti_ActivityMarker2 const* r) { + if (r->timestamp == 0) { + // Ignore records with invalid timestamps + return; + } auto object_id = add_object_id(fbb_, r->objectKind, r->objectId); auto has_name = r->name != nullptr; auto has_domain = r->name != nullptr; @@ -462,6 +484,10 @@ void profiler_serializer::process_marker_activity(CUpti_ActivityMarker2 const* r void profiler_serializer::process_marker_data(CUpti_ActivityMarkerData const* r) { + if (r->flags == 0 && r->color == 0 && r->category == 0) { + // Ignore uninteresting marker data records + return; + } MarkerDataBuilder mdb(fbb_); mdb.add_flags(marker_flags_to_fb(r->flags)); mdb.add_id(r->id); @@ -472,6 +498,10 @@ void profiler_serializer::process_marker_data(CUpti_ActivityMarkerData const* r) void profiler_serializer::process_memcpy(CUpti_ActivityMemcpy5 const* r) { + if (r->start == 0 || r->end == 0) { + // Ignore records with invalid timestamps + return; + } MemcpyActivityBuilder mab(fbb_); mab.add_copy_kind(to_memcpy_kind(r->copyKind)); mab.add_src_kind(to_memory_kind(r->srcKind)); @@ -494,6 +524,10 @@ void profiler_serializer::process_memcpy(CUpti_ActivityMemcpy5 const* r) void profiler_serializer::process_memset(CUpti_ActivityMemset4 const* r) { + if (r->start == 0 || r->end == 0) { + // Ignore records with invalid timestamps + return; + } MemsetActivityBuilder mab(fbb_); mab.add_value(r->value); mab.add_bytes(r->bytes); @@ -514,6 +548,10 @@ void profiler_serializer::process_memset(CUpti_ActivityMemset4 const* r) void profiler_serializer::process_overhead(CUpti_ActivityOverhead const* r) { + if (r->start == 0 || r->end == 0) { + // Ignore records with invalid timestamps + return; + } auto object_id = add_object_id(fbb_, r->objectKind, r->objectId); OverheadActivityBuilder oab(fbb_); oab.add_overhead_kind(to_overhead_kind(r->overheadKind)); diff --git a/src/main/cpp/profiler/profiler_serializer.hpp b/src/main/cpp/profiler/profiler_serializer.hpp index 1feebf1b96..861cf9d1ab 100644 --- a/src/main/cpp/profiler/profiler_serializer.hpp +++ b/src/main/cpp/profiler/profiler_serializer.hpp @@ -29,7 +29,8 @@ namespace spark_rapids_jni::profiler { // Serializes profile data as flatbuffers struct profiler_serializer { - profiler_serializer(JNIEnv* env, jobject writer, size_t buffer_size, size_t flush_threshold); + profiler_serializer( + JNIEnv* env, jobject writer, size_t buffer_size, size_t flush_threshold, bool capture_allocs); void process_cupti_buffer(uint8_t* buffer, size_t valid_size); void flush(); @@ -51,6 +52,7 @@ struct profiler_serializer { jmethodID j_write_method_; jobject j_writer_; size_t flush_threshold_; + bool capture_allocs_; flatbuffers::FlatBufferBuilder fbb_; std::vector> api_offsets_; std::vector> device_offsets_; diff --git a/src/main/cpp/src/CaseWhenJni.cpp b/src/main/cpp/src/CaseWhenJni.cpp new file mode 100644 index 0000000000..2f99e85b4b --- /dev/null +++ b/src/main/cpp/src/CaseWhenJni.cpp @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "case_when.hpp" +#include "cudf_jni_apis.hpp" + +extern "C" { + +JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_CaseWhen_selectFirstTrueIndex( + JNIEnv* env, jclass, jlongArray bool_cols) +{ + JNI_NULL_CHECK(env, bool_cols, "array of column handles is null", 0); + try { + cudf::jni::auto_set_device(env); + cudf::jni::native_jpointerArray n_cudf_bool_columns(env, bool_cols); + auto bool_column_views = n_cudf_bool_columns.get_dereferenced(); + return cudf::jni::release_as_jlong( + spark_rapids_jni::select_first_true_index(cudf::table_view(bool_column_views))); + } + CATCH_STD(env, 0); +} +} diff --git a/src/main/cpp/src/DecimalUtilsJni.cpp b/src/main/cpp/src/DecimalUtilsJni.cpp index 6c7c1cc781..c63b84d92e 100644 --- a/src/main/cpp/src/DecimalUtilsJni.cpp +++ b/src/main/cpp/src/DecimalUtilsJni.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -110,4 +110,24 @@ JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_DecimalUtils_subtr CATCH_STD(env, 0); } +JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_DecimalUtils_floatingPointToDecimal( + JNIEnv* env, jclass, jlong j_input, jint output_type_id, jint precision, jint decimal_scale) +{ + JNI_NULL_CHECK(env, j_input, "j_input is null", 0); + try { + cudf::jni::auto_set_device(env); + auto const input = reinterpret_cast(j_input); + cudf::jni::native_jlongArray output(env, 2); + + auto [casted_col, has_failure] = cudf::jni::floating_point_to_decimal( + *input, + cudf::data_type{static_cast(output_type_id), static_cast(decimal_scale)}, + precision); + output[0] = cudf::jni::release_as_jlong(std::move(casted_col)); + output[1] = static_cast(has_failure); + return output.get_jArray(); + } + CATCH_STD(env, 0); +} + } // extern "C" diff --git a/src/main/cpp/src/HashJni.cpp b/src/main/cpp/src/HashJni.cpp index 9e556cdd2d..c0adf38686 100644 --- a/src/main/cpp/src/HashJni.cpp +++ b/src/main/cpp/src/HashJni.cpp @@ -16,7 +16,7 @@ #include "cudf_jni_apis.hpp" #include "dtype_utils.hpp" -#include "hash.cuh" +#include "hash.hpp" #include "jni_utils.hpp" extern "C" { @@ -52,4 +52,19 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_Hash_xxhash64(JNIEnv* e } CATCH_STD(env, 0); } + +JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_Hash_hiveHash(JNIEnv* env, + jclass, + jlongArray column_handles) +{ + JNI_NULL_CHECK(env, column_handles, "array of column handles is null", 0); + + try { + cudf::jni::auto_set_device(env); + auto column_views = + cudf::jni::native_jpointerArray{env, column_handles}.get_dereferenced(); + return cudf::jni::release_as_jlong(spark_rapids_jni::hive_hash(cudf::table_view{column_views})); + } + CATCH_STD(env, 0); +} } diff --git a/src/main/cpp/src/JSONUtilsJni.cpp b/src/main/cpp/src/JSONUtilsJni.cpp index 73b932d4b9..0da20f53f9 100644 --- a/src/main/cpp/src/JSONUtilsJni.cpp +++ b/src/main/cpp/src/JSONUtilsJni.cpp @@ -24,6 +24,7 @@ using path_instruction_type = spark_rapids_jni::path_instruction_type; extern "C" { + JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_getJsonObject( JNIEnv* env, jclass, jlong input_column, jobjectArray path_instructions) { @@ -67,4 +68,66 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_getJsonObject } CATCH_STD(env, 0); } + +JNIEXPORT jlongArray JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_getJsonObjectMultiplePaths( + JNIEnv* env, jclass, jlong j_input, jobjectArray j_paths, jintArray j_path_offsets) +{ + JNI_NULL_CHECK(env, j_input, "j_input column is null", 0); + JNI_NULL_CHECK(env, j_paths, "j_paths is null", 0); + JNI_NULL_CHECK(env, j_path_offsets, "j_path_offsets is null", 0); + + using path_type = std::vector>; + + try { + cudf::jni::auto_set_device(env); + + auto const path_offsets = cudf::jni::native_jintArray(env, j_path_offsets).to_vector(); + CUDF_EXPECTS(path_offsets.size() > 1, "Invalid path offsets."); + auto const num_paths = path_offsets.size() - 1; + std::vector paths(num_paths); + + for (std::size_t i = 0; i < num_paths; ++i) { + auto const path_size = path_offsets[i + 1] - path_offsets[i]; + auto path = path_type{}; + path.reserve(path_size); + for (int j = path_offsets[i]; j < path_offsets[i + 1]; ++j) { + jobject instruction = env->GetObjectArrayElement(j_paths, j); + JNI_NULL_CHECK(env, instruction, "path_instruction is null", 0); + jclass instruction_class = env->GetObjectClass(instruction); + JNI_NULL_CHECK(env, instruction_class, "instruction_class is null", 0); + + jfieldID field_id = env->GetFieldID(instruction_class, "type", "I"); + JNI_NULL_CHECK(env, field_id, "field_id is null", 0); + jint type = env->GetIntField(instruction, field_id); + path_instruction_type instruction_type = static_cast(type); + + field_id = env->GetFieldID(instruction_class, "name", "Ljava/lang/String;"); + JNI_NULL_CHECK(env, field_id, "field_id is null", 0); + jstring name = (jstring)env->GetObjectField(instruction, field_id); + JNI_NULL_CHECK(env, name, "name is null", 0); + const char* name_str = env->GetStringUTFChars(name, JNI_FALSE); + + field_id = env->GetFieldID(instruction_class, "index", "J"); + JNI_NULL_CHECK(env, field_id, "field_id is null", 0); + jlong index = env->GetLongField(instruction, field_id); + + path.emplace_back(instruction_type, name_str, index); + env->ReleaseStringUTFChars(name, name_str); + } + + paths[i] = std::move(path); + } + + auto const input_cv = reinterpret_cast(j_input); + auto output = + spark_rapids_jni::get_json_object_multiple_paths(cudf::strings_column_view{*input_cv}, paths); + + auto out_handles = cudf::jni::native_jlongArray(env, output.size()); + std::transform(output.begin(), output.end(), out_handles.begin(), [](auto& col) { + return cudf::jni::release_as_jlong(col); + }); + return out_handles.get_jArray(); + } + CATCH_STD(env, 0); +} } diff --git a/src/main/cpp/src/SubStringIndexJni.cpp b/src/main/cpp/src/SubStringIndexJni.cpp new file mode 100644 index 0000000000..1e53166ab7 --- /dev/null +++ b/src/main/cpp/src/SubStringIndexJni.cpp @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2019-2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "cudf_jni_apis.hpp" +#include "substring_index.hpp" + +extern "C" { + +JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_GpuSubstringIndexUtils_substringIndex( + JNIEnv* env, jclass, jlong strings_handle, jlong delimiter, jint count) +{ + JNI_NULL_CHECK(env, strings_handle, "strings column handle is null", 0); + JNI_NULL_CHECK(env, delimiter, "delimiter scalar handle is null", 0); + try { + cudf::jni::auto_set_device(env); + auto const input = reinterpret_cast(strings_handle); + auto const strings_column = cudf::strings_column_view{*input}; + cudf::string_scalar* ss_scalar = reinterpret_cast(delimiter); + return cudf::jni::release_as_jlong( + spark_rapids_jni::substring_index(strings_column, *ss_scalar, count)); + } + CATCH_STD(env, 0); +} +} // extern "C" diff --git a/src/main/cpp/src/bloom_filter.cu b/src/main/cpp/src/bloom_filter.cu index 5dfdd582ef..da4e3c5cb9 100644 --- a/src/main/cpp/src/bloom_filter.cu +++ b/src/main/cpp/src/bloom_filter.cu @@ -60,10 +60,10 @@ __device__ inline std::pair gpu_get_hash_ma } template -__global__ void gpu_bloom_filter_put(cudf::bitmask_type* const bloom_filter, - cudf::size_type bloom_filter_bits, - cudf::column_device_view input, - cudf::size_type num_hashes) +CUDF_KERNEL void gpu_bloom_filter_put(cudf::bitmask_type* const bloom_filter, + cudf::size_type bloom_filter_bits, + cudf::column_device_view input, + cudf::size_type num_hashes) { size_t const tid = threadIdx.x + blockIdx.x * blockDim.x; if (tid >= input.size()) { return; } diff --git a/src/main/cpp/src/case_when.cu b/src/main/cpp/src/case_when.cu new file mode 100644 index 0000000000..9857403898 --- /dev/null +++ b/src/main/cpp/src/case_when.cu @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "case_when.hpp" + +#include +#include +#include +#include + +#include + +namespace spark_rapids_jni { +namespace detail { +namespace { + +/** + * Select the column index for the first true in bool columns for the specified row + */ +struct select_first_true_fn { + // bool columns stores the results of executing `when` expressions + cudf::table_device_view const d_table; + + /** + * The number of bool columns is the size of case when branches. + * Note: reuturned index may be out of bound, valid bound is [0, col_num) + * When returning col_num index, it means final result is NULL value or ELSE value. + * + * e.g.: + * CASE WHEN 'a' THEN 'A' END + * The number of bool columns is 1 + * The number of scalars is 1 + * Max index is 1 which means using NULL(all when exprs are false). + * CASE WHEN 'a' THEN 'A' ELSE '_' END + * The number of bool columns is 1 + * The number of scalars is 2 + * Max index is also 1 which means using else value '_' + */ + __device__ cudf::size_type operator()(std::size_t row_idx) const + { + auto col_num = d_table.num_columns(); + for (auto col_idx = 0; col_idx < col_num; col_idx++) { + auto const& col = d_table.column(col_idx); + if (!col.is_null(row_idx) && col.element(row_idx)) { + // Predicate is true and not null + return col_idx; + } + } + return col_num; + } +}; + +} // anonymous namespace + +std::unique_ptr select_first_true_index(cudf::table_view const& when_bool_columns, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + // checks + auto const num_columns = when_bool_columns.num_columns(); + CUDF_EXPECTS(num_columns > 0, "At least one column must be specified"); + auto const row_count = when_bool_columns.num_rows(); + if (row_count == 0) { // empty begets empty + return cudf::make_empty_column(cudf::type_id::INT32); + } + // make output column + auto ret = cudf::make_numeric_column( + cudf::data_type{cudf::type_id::INT32}, row_count, cudf::mask_state::UNALLOCATED, stream, mr); + + // select first true index + auto const d_table_ptr = cudf::table_device_view::create(when_bool_columns, stream); + thrust::transform(rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(row_count), + ret->mutable_view().begin(), + select_first_true_fn{*d_table_ptr}); + return ret; +} + +} // namespace detail + +std::unique_ptr select_first_true_index(cudf::table_view const& when_bool_columns, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + return detail::select_first_true_index(when_bool_columns, stream, mr); +} + +} // namespace spark_rapids_jni diff --git a/src/main/cpp/src/case_when.hpp b/src/main/cpp/src/case_when.hpp new file mode 100644 index 0000000000..b7056c6c8f --- /dev/null +++ b/src/main/cpp/src/case_when.hpp @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include + +#include + +namespace spark_rapids_jni { + +/** + * + * Select the column index for the first true in bool columns. + * For the row does not contain true, use end index(number of columns). + * + * e.g.: + * column 0 in table: true, false, false, false + * column 1 in table: false, true, false, false + * column 2 in table: false, false, true, false + * + * 1st row is: true, flase, false; first true index is 0 + * 2nd row is: false, true, false; first true index is 1 + * 3rd row is: false, flase, true; first true index is 2 + * 4th row is: false, false, false; do not find true, set index to the end index 3 + * + * output column: 0, 1, 2, 3 + * In the `case when` context, here 3 index means using NULL value. + * + */ +std::unique_ptr select_first_true_index( + cudf::table_view const& when_bool_columns, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); + +} // namespace spark_rapids_jni diff --git a/src/main/cpp/src/cast_string.cu b/src/main/cpp/src/cast_string.cu index bfbbc3777d..156dbeb7bf 100644 --- a/src/main/cpp/src/cast_string.cu +++ b/src/main/cpp/src/cast_string.cu @@ -156,14 +156,14 @@ process_value(bool first_value, T current_val, T const new_digit, bool adding) * @param ansi_mode true if ansi mode is required, which is more strict and throws */ template -void __global__ string_to_integer_kernel(T* out, - bitmask_type* validity, - const char* const chars, - size_type const* offsets, - bitmask_type const* incoming_null_mask, - size_type num_rows, - bool ansi_mode, - bool strip) +CUDF_KERNEL void string_to_integer_kernel(T* out, + bitmask_type* validity, + const char* const chars, + size_type const* offsets, + bitmask_type const* incoming_null_mask, + size_type num_rows, + bool ansi_mode, + bool strip) { auto const group = cooperative_groups::this_thread_block(); auto const warp = cooperative_groups::tiled_partition(group); @@ -386,18 +386,17 @@ __device__ thrust::optional> validate_and_exponent * @param scale scale of desired decimals * @param precision precision of desired decimals * @param ansi_mode true if ansi mode is required, which is more strict and throws - * @return __global__ */ template -__global__ void string_to_decimal_kernel(T* out, - bitmask_type* validity, - const char* const chars, - size_type const* offsets, - bitmask_type const* incoming_null_mask, - size_type num_rows, - int32_t scale, - int32_t precision, - bool strip) +CUDF_KERNEL void string_to_decimal_kernel(T* out, + bitmask_type* validity, + const char* const chars, + size_type const* offsets, + bitmask_type const* incoming_null_mask, + size_type num_rows, + int32_t scale, + int32_t precision, + bool strip) { auto const group = cooperative_groups::this_thread_block(); auto const warp = cooperative_groups::tiled_partition(group); diff --git a/src/main/cpp/src/cast_string_to_float.cu b/src/main/cpp/src/cast_string_to_float.cu index e843d645ce..c19a2a10fe 100644 --- a/src/main/cpp/src/cast_string_to_float.cu +++ b/src/main/cpp/src/cast_string_to_float.cu @@ -102,7 +102,7 @@ class string_to_float { int sign = check_for_sign(); // check for leading nan - if (check_for_nan()) { + if (check_for_nan(sign)) { _out[_row] = NAN; compute_validity(_valid, _except); return; @@ -112,7 +112,7 @@ class string_to_float { if (check_for_inf()) { if (_warp_lane == 0) { _out[_row] = - sign > 0 ? std::numeric_limits::infinity() : -std::numeric_limits::infinity(); + sign >= 0 ? std::numeric_limits::infinity() : -std::numeric_limits::infinity(); } compute_validity(_valid, _except); return; @@ -140,7 +140,9 @@ class string_to_float { _except = true; } - if (_warp_lane == 0) { _out[_row] = sign * static_cast(0); } + if (_warp_lane == 0) { + _out[_row] = sign >= 0 ? static_cast(0) : -static_cast(0); + } compute_validity(_valid, _except); return; } @@ -154,15 +156,15 @@ class string_to_float { // construct the final float value if (_warp_lane == 0) { // base value - double digitsf = sign * static_cast(digits); + double digitsf = sign >= 0 ? static_cast(digits) : -static_cast(digits); // exponent int exp_ten = exp_base + manual_exp; // final value if (exp_ten > std::numeric_limits::max_exponent10) { - _out[_row] = sign > 0 ? std::numeric_limits::infinity() - : -std::numeric_limits::infinity(); + _out[_row] = sign >= 0 ? std::numeric_limits::infinity() + : -std::numeric_limits::infinity(); } else { // make sure we don't produce a subnormal number. // - a normal number is one where the leading digit of the floating point rep is not zero. @@ -236,32 +238,45 @@ class string_to_float { // returns true if we encountered 'nan' // potentially changes: valid/except - __device__ bool check_for_nan() + __device__ bool check_for_nan(int const& sign) { auto const nan_mask = __ballot_sync(0xffffffff, (_warp_lane == 0 && (_c == 'N' || _c == 'n')) || (_warp_lane == 1 && (_c == 'A' || _c == 'a')) || (_warp_lane == 2 && (_c == 'N' || _c == 'n'))); if (nan_mask == 0x7) { - // if we start with 'nan', then even if we have other garbage character, this is a null row. - // - // if we're in ansi mode and this is not -precisely- nan, report that so that we can throw - // an exception later. - if (_len != 3) { - _valid = false; - _except = _len != 3; - } - return true; + // if we start with 'nan', then even if we have other garbage character(excluding + // whitespaces), this is a null row. but for e.g. : "nan " cases. spark will treat the as + // "nan", when the trailing characters are whitespaces, it is still a valid string. if we're + // in ansi mode and this is not -precisely- nan, report that so that we can throw an exception + // later. + + // move forward the current position by 3 + _bpos += 3; + _c = __shfl_down_sync(0xffffffff, _c, 3); + + // remove the trailing whitespaces, if there exits + remove_leading_whitespace(); + + // if we're at the end and there is no sign, because Spark treats '-nan' and '+nan' as null. + if (_bpos == _len && sign == 0) { return true; } + // if we reach out here, it means that we have other garbage character. + _valid = false; + _except = true; } return false; } - // returns 1 or -1 to indicate sign + // The `sign` variables is initialized to 0, indicating no sign. + // If a sign is detected, it sets `sign` to 1, indicating `+` sign. + // If `-` is then detected, it sets `sign` to -1. + // returns 1, 0, -1 to indicate signs. __device__ int check_for_sign() { auto const sign_mask = __ballot_sync(0xffffffff, _warp_lane == 0 && (_c == '+' || _c == '-')); - int sign = 1; + int sign = 0; if (sign_mask) { + sign = 1; // NOTE: warp lane 0 is the only thread that ever reads `sign`, so technically it would be // valid to just check if(c == '-'), but that would leave other threads with an incorrect // value. if this code ever changes, that could lead to hard-to-find bugs. @@ -299,11 +314,19 @@ class string_to_float { _bpos += 5; // if we're at the end if (_bpos == _len) { return true; } + _c = __shfl_down_sync(0xffffffff, _c, 5); } + // remove the remaining whitespace if exists + remove_leading_whitespace(); + + // if we're at the end + if (_bpos == _len) { return true; } + // if we reach here for any reason, it means we have "inf" or "infinity" at the start of the // string but also have additional characters, making this whole thing bogus/null _valid = false; + return true; } return false; @@ -595,14 +618,14 @@ class string_to_float { }; template -__global__ void string_to_float_kernel(T* out, - bitmask_type* validity, - int32_t* ansi_except, - size_type* valid_count, - const char* const chars, - size_type const* offsets, - bitmask_type const* incoming_null_mask, - size_type const num_rows) +CUDF_KERNEL void string_to_float_kernel(T* out, + bitmask_type* validity, + int32_t* ansi_except, + size_type* valid_count, + const char* const chars, + size_type const* offsets, + bitmask_type const* incoming_null_mask, + size_type const num_rows) { size_type const tid = threadIdx.x + (blockDim.x * blockIdx.x); size_type const row = tid / 32; diff --git a/src/main/cpp/src/decimal_utils.cu b/src/main/cpp/src/decimal_utils.cu index 6b9ae61076..147818d9aa 100644 --- a/src/main/cpp/src/decimal_utils.cu +++ b/src/main/cpp/src/decimal_utils.cu @@ -15,14 +15,23 @@ */ #include "decimal_utils.hpp" +#include "jni_utils.hpp" +#include #include #include +#include +#include #include #include +#include +#include #include +#include +#include + #include #include @@ -1172,4 +1181,256 @@ std::unique_ptr sub_decimal128(cudf::column_view const& a, dec128_sub(overflows_view.begin(), sub_view, a, b)); return std::make_unique(std::move(columns)); } + +namespace { + +using namespace numeric; +using namespace numeric::detail; + +/** + * @brief Perform floating-point to integer decimal conversion, matching Spark behavior. + * + * The desired decimal value is computed as (returned_value * 10^{-pow10}). + * + * The rounding and precision decisions made here are chosen to match Apache Spark. + * Spark wants to perform the conversion as double to have the most precision. + * However, the behavior is still slightly different if the original type was float. + * + * @tparam FloatType The type of floating-point value we are converting from + * @tparam IntType The type of integer we are converting to, to store the decimal value + * + * @param input The floating point value to convert + * @param pow10 The power of 10 to scale the floating-point value by + * @return Integer representation of the floating-point value, rounding after scaled + */ +template )> +__device__ inline IntType scaled_round(FloatType input, int32_t pow10) +{ + // Extract components of the (double-ized) floating point number + using converter = floating_converter; + auto const integer_rep = converter::bit_cast_to_integer(static_cast(input)); + if (converter::is_zero(integer_rep)) { return 0; } + + // Note that the significand here is an unsigned integer with sizeof(double) + auto const is_negative = converter::get_is_negative(integer_rep); + auto const [significand, floating_pow2] = converter::get_significand_and_pow2(integer_rep); + + auto const unsigned_floating = (input < 0) ? -input : input; + auto const rounding_wont_overflow = [&] { + auto const scale_factor = static_cast( + multiply_power10(cuda::std::make_unsigned_t{1}, -pow10)); + return 10.0 * static_cast(unsigned_floating) * scale_factor < + static_cast(cuda::std::numeric_limits::max()); + }(); + + // Spark often wants to round the last decimal place, so we'll perform the conversion + // with one lower power of 10 so that we can (optionally) round at the end. + // Note that we can't round this way if we've requested the minimum power. + bool const can_round = cuda::std::is_same_v ? rounding_wont_overflow : true; + auto const shifting_pow10 = can_round ? pow10 - 1 : pow10; + + // Sometimes add half a bit to correct for compiler rounding to nearest floating-point value. + // See comments in add_half_if_truncates(), with differences detailed below. + // Even if we don't add the bit, shift bits to line up with what the shifting algorithm is + // expecting. + bool const is_whole_number = cuda::std::floor(input) == input; + auto const [base2_value, pow2] = [is_whole_number](auto significand, auto floating_pow2) { + if constexpr (cuda::std::is_same_v) { + // Add the 1/2 bit regardless of truncation, but still not for whole numbers. + auto const base2_value = + (significand << 1) + static_cast(!is_whole_number); + return cuda::std::make_pair(base2_value, floating_pow2 - 1); + } else { + // Input was float: never add 1/2 bit. + // Why? Because we converted to double, and the 1/2 bit beyond float is WAY too large compared + // to double's precision. And the 1/2 bit beyond double is not due to user input. + return cuda::std::make_pair(significand << 1, floating_pow2 - 1); + } + }(significand, floating_pow2); + + // Main algorithm: Apply the powers of 2 and 10 (except for the last power-of-10). + // Use larger intermediate type for conversion to avoid overflow for last power-of-10. + using intermediate_type = + cuda::std::conditional_t, std::int64_t, __int128_t>; + cuda::std::make_unsigned_t magnitude = + [&, base2_value = base2_value, pow2 = pow2] { + if constexpr (cuda::std::is_same_v) { + return rounding_wont_overflow ? convert_floating_to_integral_shifting( + base2_value, shifting_pow10, pow2) + : convert_floating_to_integral_shifting( + base2_value, shifting_pow10, pow2); + } else { + return convert_floating_to_integral_shifting<__int128_t, double>( + base2_value, shifting_pow10, pow2); + } + }(); + + // Spark wants to floor the last digits of the output, clearing data that was beyond the + // precision that was available in double. + + // How many digits do we need to floor? + // From the decimal digit corresponding to pow2 (just past double precision) to the end (pow10). + int const floor_pow10 = [&](int pow2_bit) { + // The conversion from pow2 to pow10 is log10(2), which is ~ 90/299 (close enough for ints) + // But Spark chooses the rougher 3/10 ratio instead of 90/299. + if constexpr (cuda::std::is_same_v) { + return (3 * pow2_bit - 10 * pow10) / 10; + } else { + // Spark rounds up the power-of-10 to floor for DOUBLES >= 2^63 (and yes, this is the exact + // cutoff). + bool const round_up = unsigned_floating > std::numeric_limits::max(); + return (3 * pow2_bit - 10 * pow10 + 9 * round_up) / 10; + } + }(pow2); + + // Floor end digits + if (can_round) { + if (floor_pow10 < 0) { + // Truncated: The scale factor cut off the extra, imprecise bits. + // To round to the final decimal place, add 5 to one past the last decimal place. + magnitude += 5U; + magnitude /= 10U; // Apply the last power of 10 + } else { + // We are keeping decimal digits with data beyond the precision of double. + // We want to truncate these digits, but sometimes we want to round first. + // We will round if and only if we didn't already add a half-bit earlier. + if constexpr (cuda::std::is_same_v) { + // For doubles, only round the extra digits of whole numbers. + // If it was not a whole number, we already added 1/2 a bit at higher precision than this + // earlier. + if (is_whole_number) { + magnitude += multiply_power10(decltype(magnitude)(5), floor_pow10); + } + } else { + // Input was float: we didn't add a half-bit earlier, so round at the edge of precision + // here. + magnitude += multiply_power10(decltype(magnitude)(5), floor_pow10); + } + + // +1: Divide the last power-of-10 that we postponed earlier to do rounding. + auto const truncated = divide_power10(magnitude, floor_pow10 + 1); + magnitude = multiply_power10(truncated, floor_pow10); + } + } else if (floor_pow10 > 0) { + auto const truncated = divide_power10(magnitude, floor_pow10); + magnitude = multiply_power10(truncated, floor_pow10); + } + + // Reapply the sign and return. + // NOTE: Cast can overflow! + auto const signed_magnitude = static_cast(magnitude); + return is_negative ? -signed_magnitude : signed_magnitude; +} + +template +struct floating_point_to_decimal_fn { + cudf::column_device_view input; + int8_t* validity; + bool* has_failure; + int32_t decimal_places; + DecimalRepType exclusive_bound; + + __device__ DecimalRepType operator()(cudf::size_type idx) const + { + auto const x = input.element(idx); + + if (input.is_null(idx) || !std::isfinite(x)) { + if (!std::isfinite(x)) { *has_failure = true; } + validity[idx] = false; + return DecimalRepType{0}; + } + + auto const scaled_rounded = scaled_round(x, -decimal_places); + auto const is_out_of_bound = + -exclusive_bound >= scaled_rounded || scaled_rounded >= exclusive_bound; + if (is_out_of_bound) { *has_failure = true; } + validity[idx] = !is_out_of_bound; + + return is_out_of_bound ? DecimalRepType{0} : scaled_rounded; + } +}; + +struct floating_point_to_decimal_dispatcher { + template + static constexpr bool supported_types() + { + return (std::is_same_v || // + std::is_same_v)&& // + (std::is_same_v || + std::is_same_v || + std::is_same_v); + } + + template ())> + void operator()(Args...) const + { + CUDF_FAIL("Unsupported types for floating_point_to_decimal_fn", cudf::data_type_error); + } + + template ())> + void operator()(cudf::column_view const& input, + cudf::mutable_column_view const& output, + int8_t* validity, + bool* has_failure, + int32_t decimal_places, + int32_t precision, + rmm::cuda_stream_view stream) const + { + using DecimalRepType = cudf::device_storage_type_t; + + auto const d_input_ptr = cudf::column_device_view::create(input, stream); + auto const exclusive_bound = static_cast( + multiply_power10(cuda::std::make_unsigned_t{1}, precision)); + + thrust::tabulate(rmm::exec_policy_nosync(stream), + output.begin(), + output.end(), + floating_point_to_decimal_fn{ + *d_input_ptr, validity, has_failure, decimal_places, exclusive_bound}); + } +}; + +} // namespace + +std::pair, bool> floating_point_to_decimal( + cudf::column_view const& input, + cudf::data_type output_type, + int32_t precision, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto output = cudf::make_fixed_point_column( + output_type, input.size(), cudf::mask_state::UNALLOCATED, stream, mr); + + auto const decimal_places = -output_type.scale(); + auto const default_mr = rmm::mr::get_current_device_resource(); + + rmm::device_uvector validity(input.size(), stream, default_mr); + rmm::device_scalar has_failure(false, stream, default_mr); + + cudf::double_type_dispatcher(input.type(), + output_type, + floating_point_to_decimal_dispatcher{}, + input, + output->mutable_view(), + validity.begin(), + has_failure.data(), + decimal_places, + precision, + stream); + + auto [null_mask, null_count] = + cudf::detail::valid_if(validity.begin(), validity.end(), thrust::identity{}, stream, mr); + if (null_count > 0) { output->set_null_mask(std::move(null_mask), null_count); } + + return {std::move(output), has_failure.value(stream)}; +} + } // namespace cudf::jni diff --git a/src/main/cpp/src/decimal_utils.hpp b/src/main/cpp/src/decimal_utils.hpp index 9793e63445..8673314454 100644 --- a/src/main/cpp/src/decimal_utils.hpp +++ b/src/main/cpp/src/decimal_utils.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -62,4 +62,23 @@ std::unique_ptr sub_decimal128( cudf::column_view const& b, int32_t quotient_scale, rmm::cuda_stream_view stream = cudf::get_default_stream()); + +/** + * @brief Cast floating point values to decimals, matching the behavior of Spark. + * + * @param input The input column, which is either FLOAT32 or FLOAT64 type + * @param output_type The output decimal type + * @param precision The maximum number of digits that will be preserved in the output + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned column's device memory + * @return A cudf column containing the cast result and a boolean value indicating whether the cast + operation has failed for any input rows + */ +std::pair, bool> floating_point_to_decimal( + cudf::column_view const& input, + cudf::data_type output_type, + int32_t precision, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); + } // namespace cudf::jni diff --git a/src/main/cpp/src/get_json_object.cu b/src/main/cpp/src/get_json_object.cu index 887b9887de..690da3f702 100644 --- a/src/main/cpp/src/get_json_object.cu +++ b/src/main/cpp/src/get_json_object.cu @@ -19,25 +19,26 @@ #include #include -#include -#include +#include #include #include #include +#include #include #include -#include #include #include #include -#include #include #include #include #include +#include +#include #include +#include #include namespace spark_rapids_jni { @@ -51,57 +52,52 @@ namespace detail { constexpr int max_path_depth = 16; /** - * write JSON style + * @brief JSON style to write. */ -enum class write_style { RAW, QUOTED, FLATTEN }; +enum class write_style : int8_t { RAW, QUOTED, FLATTEN }; /** - * path instruction + * @brief Instruction along a JSON path. */ struct path_instruction { __device__ inline path_instruction(path_instruction_type _type) : type(_type) {} - path_instruction_type type; - // used when type is named type cudf::string_view name; // used when type is index int index{-1}; + + path_instruction_type type; }; /** - * JSON generator is used to write out JSON content. + * @brief JSON generator used to write out JSON content. + * * Because of get_json_object only outputs JSON object as a whole item, * it's no need to store internal state for JSON object when outputing, * only need to store internal state for JSON array. */ class json_generator { public: - __device__ json_generator(char* _output) : output(_output), output_len(0) {} - __device__ json_generator() : output(nullptr), output_len(0) {} + __device__ json_generator(int _offset = 0) : offset(_offset), output_len(0) {} // create a nested child generator based on this parent generator, // child generator is a view, parent and child share the same byte array - __device__ json_generator new_child_generator() + __device__ json_generator new_child_generator() const { - if (nullptr == output) { - return json_generator(); - } else { - return json_generator(output + output_len); - } + return json_generator(offset + output_len); } // write [ // add an extra comma if needed, // e.g.: when JSON content is: [[1,2,3] // writing a new [ should result: [[1,2,3],[ - __device__ void write_start_array() + __device__ void write_start_array(char* out_begin) { - try_write_comma(); - - if (output) { *(output + output_len) = '['; } + try_write_comma(out_begin); + out_begin[offset + output_len] = '['; output_len++; array_depth++; // new array is empty @@ -109,14 +105,12 @@ class json_generator { } // write ] - __device__ void write_end_array() + __device__ void write_end_array(char* out_begin) { - if (output) { *(output + output_len) = ']'; } + out_begin[offset + output_len] = ']'; output_len++; - // point to parent array array_depth--; - // set parent array as non-empty because already had a closed child item. is_curr_array_empty = false; } @@ -132,16 +126,16 @@ class json_generator { } // return true if it's in a array context and it's not writing the first item. - __device__ inline bool need_comma() { return (array_depth > 0 && !is_curr_array_empty); } + __device__ inline bool need_comma() const { return (array_depth > 0 && !is_curr_array_empty); } /** * write comma accroding to current generator state */ - __device__ void try_write_comma() + __device__ void try_write_comma(char* out_begin) { if (need_comma()) { // in array context and writes first item - if (output) { *(output + output_len) = ','; } + out_begin[offset + output_len] = ','; output_len++; } } @@ -151,24 +145,16 @@ class json_generator { * object/array, then copy to corresponding matched end object/array. return * false if JSON format is invalid return true if JSON format is valid */ - __device__ bool copy_current_structure(json_parser& parser) + __device__ bool copy_current_structure(json_parser& parser, char* out_begin) { // first try add comma - try_write_comma(); + try_write_comma(out_begin); if (array_depth > 0) { is_curr_array_empty = false; } - if (nullptr != output) { - auto copy_to = output + output_len; - auto [b, copy_len] = parser.copy_current_structure(copy_to); - output_len += copy_len; - return b; - } else { - char* copy_to = nullptr; - auto [b, copy_len] = parser.copy_current_structure(copy_to); - output_len += copy_len; - return b; - } + auto [b, copy_len] = parser.copy_current_structure(out_begin + offset + output_len); + output_len += copy_len; + return b; } /** @@ -178,17 +164,12 @@ class json_generator { * then can not return a pointer and length pair (char *, len), * For number token, JSON parser can return a pair (char *, len) */ - __device__ void write_raw(json_parser& parser) + __device__ void write_raw(json_parser& parser, char* out_begin) { if (array_depth > 0) { is_curr_array_empty = false; } - if (nullptr != output) { - auto copied = parser.write_unescaped_text(output + output_len); - output_len += copied; - } else { - auto len = parser.compute_unescaped_len(); - output_len += len; - } + auto copied = parser.write_unescaped_text(out_begin + offset + output_len); + output_len += copied; } /** @@ -222,34 +203,32 @@ class json_generator { * block */ __device__ void write_child_raw_value(char* child_block_begin, - size_t child_block_len, + int child_block_len, bool write_outer_array_tokens) { bool insert_comma = need_comma(); if (array_depth > 0) { is_curr_array_empty = false; } - if (nullptr != output) { - if (write_outer_array_tokens) { - if (insert_comma) { - *(child_block_begin + child_block_len + 2) = ']'; - move_forward(child_block_begin, child_block_len, 2); - *(child_block_begin + 1) = '['; - *(child_block_begin) = ','; - } else { - *(child_block_begin + child_block_len + 1) = ']'; - move_forward(child_block_begin, child_block_len, 1); - *(child_block_begin) = '['; - } + if (write_outer_array_tokens) { + if (insert_comma) { + *(child_block_begin + child_block_len + 2) = ']'; + move_forward(child_block_begin, child_block_len, 2); + *(child_block_begin + 1) = '['; + *(child_block_begin) = ','; } else { - if (insert_comma) { - move_forward(child_block_begin, child_block_len, 1); - *(child_block_begin) = ','; - } else { - // do not need comma && do not need write outer array tokens - // do nothing, because child generator buff is directly after the - // parent generator - } + *(child_block_begin + child_block_len + 1) = ']'; + move_forward(child_block_begin, child_block_len, 1); + *(child_block_begin) = '['; + } + } else { + if (insert_comma) { + move_forward(child_block_begin, child_block_len, 1); + *(child_block_begin) = ','; + } else { + // do not need comma && do not need write outer array tokens + // do nothing, because child generator buff is directly after the + // parent generator } } @@ -265,7 +244,7 @@ class json_generator { // e.g.: memory is: 1 2 0 0, begin is 1, len is 1, after moving, // memory is: 1 1 2 0. // Note: should move from end to begin to avoid overwrite buffer - __device__ void move_forward(char* begin, size_t len, int forward) + static __device__ void move_forward(char* begin, size_t len, int forward) { // TODO copy by 8 bytes char* pos = begin + len + forward - 1; @@ -276,9 +255,8 @@ class json_generator { } } - __device__ inline size_t get_output_len() const { return output_len; } - __device__ inline char* get_output_start_position() const { return output; } - __device__ inline char* get_current_output_position() const { return output + output_len; } + __device__ inline int get_offset() const { return offset; } + __device__ inline int get_output_len() const { return output_len; } /** * generator may contain trash output, e.g.: generator writes some output, @@ -289,13 +267,14 @@ class json_generator { __device__ inline void set_output_len(size_t len) { output_len = len; } private: - char* output; - size_t output_len; + int offset; // offset from the global output buffer + int output_len; + + int array_depth = 0; // whether already worte a item in current array // used to decide whether add a comma before writing out a new item. bool is_curr_array_empty; - int array_depth = 0; }; /** @@ -353,83 +332,100 @@ __device__ inline thrust::tuple path_match_index_wildcard( } /** - * - * This function is rewritten from above commented recursive function. - * this function is equivalent to the above commented recursive function. + * @brief The cases that mirro Apache Spark case path in `jsonExpressions.scala#evaluatePath()`. */ -__device__ bool evaluate_path(json_parser& p, - json_generator& root_g, - write_style root_style, - cudf::device_span root_path) -{ - // manually maintained context stack in lieu of calling evaluate_path recursively. - struct context { - // current token - json_token token; +enum class evaluation_case_path : int8_t { + INVALID = -1, + START_ARRAY___EMPTY_PATH___FLATTEN_STYLE = 2, + START_OBJECT___MATCHED_NAME_PATH = 4, + START_ARRAY___MATCHED_DOUBLE_WILDCARD = 5, + START_ARRAY___MATCHED_WILDCARD___STYLE_NOT_QUOTED = 6, + START_ARRAY___MATCHED_WILDCARD = 7, + START_ARRAY___MATCHED_INDEX_AND_WILDCARD = 8, + START_ARRAY___MATCHED_INDEX = 9 +}; + +/** + * @brief The struct to store states during processing JSON through different nested levels. + */ +struct context { + // used to save current generator + json_generator g; - // which case path that this task is from - int case_path; + // used to save child JSON generator for case path 6 + json_generator child_g; - // used to save current generator - json_generator g; + cudf::device_span path; - write_style style; + // whether written output + // if dirty > 0, indicates success + int dirty; - cudf::device_span path; - // is this context task is done - bool task_is_done; + // which case path that this task is from + evaluation_case_path case_path; - // whether written output - // if dirty > 0, indicates success - int dirty; + // current token + json_token token; - // for some case paths - bool is_first_enter; + write_style style; - // used to save child JSON generator for case path 8 - json_generator child_g; - }; + // for some case paths + bool is_first_enter; + + // is this context task is done + bool task_is_done; +}; + +/** + * @brief Parse a single json string using the provided command buffer. + * + * @param input The incoming json string + * @param path_commands The command buffer to be applied to the string + * @param out_buf Buffer user to store the string resulted from the query + * @return A pair containing the result code and the output size + */ +__device__ thrust::pair evaluate_path( + char_range input, cudf::device_span path_commands, char* out_buf) +{ + json_parser p{input}; + p.next_token(); + if (json_token::ERROR == p.get_current_token()) { return {false, 0}; } // define stack; plus 1 indicates root context task needs an extra memory context stack[max_path_depth + 1]; - int stack_pos = 0; + int stack_size = 0; // push context function - auto push_context = [&stack, &stack_pos](json_token _token, - int _case_path, - json_generator _g, - write_style _style, - cudf::device_span _path) { + auto push_context = [&p, &stack, &stack_size](evaluation_case_path _case_path, + json_generator _g, + write_style _style, + cudf::device_span _path) { // no need to check stack is full // because Spark-Rapids already checked maximum length of `path_instruction` - auto& ctx = stack[stack_pos]; - ctx.token = _token; + auto& ctx = stack[stack_size++]; + ctx.g = std::move(_g); + ctx.path = std::move(_path); + ctx.dirty = 0; ctx.case_path = _case_path; - ctx.g = _g; + ctx.token = p.get_current_token(); ctx.style = _style; - ctx.path = _path; - ctx.task_is_done = false; - ctx.dirty = 0; ctx.is_first_enter = true; - - stack_pos++; + ctx.task_is_done = false; }; // put the first context task - push_context(p.get_current_token(), -1, root_g, root_style, root_path); + push_context(evaluation_case_path::INVALID, json_generator{}, write_style::RAW, path_commands); - while (stack_pos > 0) { - auto& ctx = stack[stack_pos - 1]; + while (stack_size > 0) { + auto& ctx = stack[stack_size - 1]; if (!ctx.task_is_done) { - // task is not done. - // case (VALUE_STRING, Nil) if style == RawStyle // case path 1 if (json_token::VALUE_STRING == ctx.token && path_is_empty(ctx.path.size()) && ctx.style == write_style::RAW) { // there is no array wildcard or slice parent, emit this string without // quotes write current string in parser to generator - ctx.g.write_raw(p); + ctx.g.write_raw(p, out_buf); ctx.dirty = 1; ctx.task_is_done = true; } @@ -440,10 +436,13 @@ __device__ bool evaluate_path(json_parser& p, // flatten this array into the parent if (json_token::END_ARRAY != p.next_token()) { // JSON validation check - if (json_token::ERROR == p.get_current_token()) { return false; } + if (json_token::ERROR == p.get_current_token()) { return {false, 0}; } // push back task // add child task - push_context(p.get_current_token(), 2, ctx.g, ctx.style, {nullptr, 0}); + push_context(evaluation_case_path::START_ARRAY___EMPTY_PATH___FLATTEN_STYLE, + ctx.g, + ctx.style, + {nullptr, 0}); } else { // END_ARRAY ctx.task_is_done = true; @@ -453,9 +452,9 @@ __device__ bool evaluate_path(json_parser& p, // case path 3 else if (path_is_empty(ctx.path.size())) { // general case: just copy the child tree verbatim - if (!(ctx.g.copy_current_structure(p))) { + if (!(ctx.g.copy_current_structure(p, out_buf))) { // JSON validation check - return false; + return {false, 0}; } ctx.dirty = 1; ctx.task_is_done = true; @@ -470,23 +469,22 @@ __device__ bool evaluate_path(json_parser& p, if (ctx.dirty > 0) { while (json_token::END_OBJECT != p.next_token()) { // JSON validation check - if (json_token::ERROR == p.get_current_token()) { return false; } + if (json_token::ERROR == p.get_current_token()) { return {false, 0}; } // skip FIELD_NAME token p.next_token(); // JSON validation check - if (json_token::ERROR == p.get_current_token()) { return false; } + if (json_token::ERROR == p.get_current_token()) { return {false, 0}; } // skip value of FIELD_NAME if (!p.try_skip_children()) { // JSON validation check - return false; + return {false, 0}; } } - ctx.task_is_done = true; - } else { - return false; } + // Mark task is done regardless whether the expected child was found. + ctx.task_is_done = true; } else { // below is 1st enter ctx.is_first_enter = false; @@ -494,7 +492,7 @@ __device__ bool evaluate_path(json_parser& p, bool found_expected_child = false; while (json_token::END_OBJECT != p.next_token()) { // JSON validation check - if (json_token::ERROR == p.get_current_token()) { return false; } + if (json_token::ERROR == p.get_current_token()) { return {false, 0}; } // need to try more children auto match_named = path_match_named(ctx.path); @@ -504,13 +502,12 @@ __device__ bool evaluate_path(json_parser& p, // skip FIELD_NAME token p.next_token(); // JSON validation check - if (json_token::ERROR == p.get_current_token()) { return false; } + if (json_token::ERROR == p.get_current_token()) { return {false, 0}; } // meets null token, it's not expected, return false - if (json_token::VALUE_NULL == p.get_current_token()) { return false; } + if (json_token::VALUE_NULL == p.get_current_token()) { return {false, 0}; } // push sub task; sub task will update the result of path 4 - push_context(p.get_current_token(), - 4, + push_context(evaluation_case_path::START_OBJECT___MATCHED_NAME_PATH, ctx.g, ctx.style, {ctx.path.data() + 1, ctx.path.size() - 1}); @@ -520,12 +517,12 @@ __device__ bool evaluate_path(json_parser& p, // skip FIELD_NAME token p.next_token(); // JSON validation check - if (json_token::ERROR == p.get_current_token()) { return false; } + if (json_token::ERROR == p.get_current_token()) { return {false, 0}; } // current child is not expected, skip current child if (!p.try_skip_children()) { // JSON validation check - return false; + return {false, 0}; } } } @@ -545,19 +542,18 @@ __device__ bool evaluate_path(json_parser& p, // behavior in Hive if (ctx.is_first_enter) { ctx.is_first_enter = false; - ctx.g.write_start_array(); + ctx.g.write_start_array(out_buf); } if (p.next_token() != json_token::END_ARRAY) { // JSON validation check - if (json_token::ERROR == p.get_current_token()) { return false; } - push_context(p.get_current_token(), - 5, + if (json_token::ERROR == p.get_current_token()) { return {false, 0}; } + push_context(evaluation_case_path::START_ARRAY___MATCHED_DOUBLE_WILDCARD, ctx.g, write_style::FLATTEN, {ctx.path.data() + 2, ctx.path.size() - 2}); } else { - ctx.g.write_end_array(); + ctx.g.write_end_array(out_buf); ctx.task_is_done = true; } } @@ -590,28 +586,28 @@ __device__ bool evaluate_path(json_parser& p, if (p.next_token() != json_token::END_ARRAY) { // JSON validation check - if (json_token::ERROR == p.get_current_token()) { return false; } + if (json_token::ERROR == p.get_current_token()) { return {false, 0}; } // track the number of array elements and only emit an outer array if // we've written more than one element, this matches Hive's behavior - push_context(p.get_current_token(), - 6, + push_context(evaluation_case_path::START_ARRAY___MATCHED_WILDCARD___STYLE_NOT_QUOTED, child_g, next_style, {ctx.path.data() + 1, ctx.path.size() - 1}); } else { - char* child_g_start = child_g.get_output_start_position(); - size_t child_g_len = child_g.get_output_len(); + char* child_g_start = out_buf + child_g.get_offset(); + int child_g_len = child_g.get_output_len(); if (ctx.dirty > 1) { // add outer array tokens ctx.g.write_child_raw_value( child_g_start, child_g_len, /* write_outer_array_tokens */ true); - ctx.task_is_done = true; } else if (ctx.dirty == 1) { // remove outer array tokens ctx.g.write_child_raw_value( child_g_start, child_g_len, /* write_outer_array_tokens */ false); - ctx.task_is_done = true; } // else do not write anything + + // Done anyway, since we already reached the end array. + ctx.task_is_done = true; } } // case (START_ARRAY, Wildcard :: xs) @@ -620,21 +616,20 @@ __device__ bool evaluate_path(json_parser& p, path_match_element(ctx.path, path_instruction_type::WILDCARD)) { if (ctx.is_first_enter) { ctx.is_first_enter = false; - ctx.g.write_start_array(); + ctx.g.write_start_array(out_buf); } if (p.next_token() != json_token::END_ARRAY) { // JSON validation check - if (json_token::ERROR == p.get_current_token()) { return false; } + if (json_token::ERROR == p.get_current_token()) { return {false, 0}; } // wildcards can have multiple matches, continually update the dirty // count - push_context(p.get_current_token(), - 7, + push_context(evaluation_case_path::START_ARRAY___MATCHED_WILDCARD, ctx.g, write_style::QUOTED, {ctx.path.data() + 1, ctx.path.size() - 1}); } else { - ctx.g.write_end_array(); + ctx.g.write_end_array(out_buf); ctx.task_is_done = true; } } @@ -646,28 +641,27 @@ __device__ bool evaluate_path(json_parser& p, p.next_token(); // JSON validation check - if (json_token::ERROR == p.get_current_token()) { return false; } + if (json_token::ERROR == p.get_current_token()) { return {false, 0}; } ctx.is_first_enter = false; int i = idx; while (i > 0) { if (p.get_current_token() == json_token::END_ARRAY) { // terminate, nothing has been written - return false; + return {false, 0}; } - if (!p.try_skip_children()) { return false; } + if (!p.try_skip_children()) { return {false, 0}; } p.next_token(); // JSON validation check - if (json_token::ERROR == p.get_current_token()) { return false; } + if (json_token::ERROR == p.get_current_token()) { return {false, 0}; } --i; } // i == 0 - push_context(p.get_current_token(), - 8, + push_context(evaluation_case_path::START_ARRAY___MATCHED_INDEX_AND_WILDCARD, ctx.g, write_style::QUOTED, {ctx.path.data() + 1, ctx.path.size() - 1}); @@ -679,338 +673,447 @@ __device__ bool evaluate_path(json_parser& p, p.next_token(); // JSON validation check - if (json_token::ERROR == p.get_current_token()) { return false; } + if (json_token::ERROR == p.get_current_token()) { return {false, 0}; } int i = idx; while (i > 0) { if (p.get_current_token() == json_token::END_ARRAY) { // terminate, nothing has been written - return false; + return {false, 0}; } - if (!p.try_skip_children()) { return false; } + if (!p.try_skip_children()) { return {false, 0}; } p.next_token(); // JSON validation check - if (json_token::ERROR == p.get_current_token()) { return false; } + if (json_token::ERROR == p.get_current_token()) { return {false, 0}; } --i; } // i == 0 - push_context( - p.get_current_token(), 9, ctx.g, ctx.style, {ctx.path.data() + 1, ctx.path.size() - 1}); + push_context(evaluation_case_path::START_ARRAY___MATCHED_INDEX, + ctx.g, + ctx.style, + {ctx.path.data() + 1, ctx.path.size() - 1}); } // case _ => // case path 12 else { - if (!p.try_skip_children()) { return false; } + if (!p.try_skip_children()) { return {false, 0}; } // default case path, return false for this task ctx.dirty = 0; ctx.task_is_done = true; } - } else { - // current context is done. - + } // if (!ctx.task_is_done) + else { // current context is done. // pop current top context - stack_pos--; + stack_size--; - // pop parent task + // has no parent task, stack is empty, will exit + if (stack_size == 0) { break; } + + // peek parent context task // update parent task info according to current task result - if (stack_pos > 0) { - // peek parent context task - auto& p_ctx = stack[stack_pos - 1]; - - // case (VALUE_STRING, Nil) if style == RawStyle - // case path 1 - if (1 == ctx.case_path) { - // never happen - } - // path 2: case (START_ARRAY, Nil) if style == FlattenStyle - // path 5: case (START_ARRAY, Wildcard :: Wildcard :: xs) - // path 7: case (START_ARRAY, Wildcard :: xs) - else if (2 == ctx.case_path || 5 == ctx.case_path || 7 == ctx.case_path) { + auto& p_ctx = stack[stack_size - 1]; + + switch (ctx.case_path) { + // path 2: case (START_ARRAY, Nil) if style == FlattenStyle + // path 5: case (START_ARRAY, Wildcard :: Wildcard :: xs) + // path 7: case (START_ARRAY, Wildcard :: xs) + case evaluation_case_path::START_ARRAY___EMPTY_PATH___FLATTEN_STYLE: + case evaluation_case_path::START_ARRAY___MATCHED_DOUBLE_WILDCARD: + case evaluation_case_path::START_ARRAY___MATCHED_WILDCARD: { // collect result from child task p_ctx.dirty += ctx.dirty; // copy generator states to parent task; p_ctx.g = ctx.g; + + break; } - // case (START_OBJECT, Named :: xs) - // case path 4 - else if (4 == ctx.case_path) { + + // case (START_OBJECT, Named :: xs) + // case path 4 + case evaluation_case_path::START_OBJECT___MATCHED_NAME_PATH: { p_ctx.dirty = ctx.dirty; // copy generator states to parent task; p_ctx.g = ctx.g; + + break; } - // case (START_ARRAY, Wildcard :: xs) if style != QuotedStyle - // case path 6 - else if (6 == ctx.case_path) { + + // case (START_ARRAY, Wildcard :: xs) if style != QuotedStyle + // case path 6 + case evaluation_case_path::START_ARRAY___MATCHED_WILDCARD___STYLE_NOT_QUOTED: { // collect result from child task p_ctx.dirty += ctx.dirty; // update child generator for parent task p_ctx.child_g = ctx.g; + + break; } - /* case (START_ARRAY, Index(idx) :: (xs@Wildcard :: _)) */ - // case path 8 - // case (START_ARRAY, Index(idx) :: xs) - // case path 9 - else if (8 == ctx.case_path || 9 == ctx.case_path) { + + /* case (START_ARRAY, Index(idx) :: (xs@Wildcard :: _)) */ + // case path 8 + // case (START_ARRAY, Index(idx) :: xs) + // case path 9 + case evaluation_case_path::START_ARRAY___MATCHED_INDEX_AND_WILDCARD: + case evaluation_case_path::START_ARRAY___MATCHED_INDEX: { // collect result from child task p_ctx.dirty += ctx.dirty; // post logic: while (p.next_token() != json_token::END_ARRAY) { // JSON validation check - if (json_token::ERROR == p.get_current_token()) { return false; } + if (json_token::ERROR == p.get_current_token()) { return {false, 0}; } // advance the token stream to the end of the array - if (!p.try_skip_children()) { return false; } + if (!p.try_skip_children()) { return {false, 0}; } } // task is done p_ctx.task_is_done = true; // copy generator states to parent task; p_ctx.g = ctx.g; + + break; } - // case path 3: case (_, Nil) - // case path 12: case _ => - // others - else { - // never happen - } - } else { - // has no parent task, stack is empty, will exit - } - } - } - // copy output len - root_g.set_output_len(stack[0].g.get_output_len()); - return stack[0].dirty > 0; -} + default:; // Never happens! + } // end switch (ctx.case_path) + } // ctx.task_is_done + } // while (stack_size > 0) -rmm::device_uvector construct_path_commands( - std::vector> const& instructions, - cudf::string_scalar const& all_names_scalar, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - int name_pos = 0; - - // construct the path commands - std::vector path_commands; - for (auto const& inst : instructions) { - auto const& [type, name, index] = inst; - switch (type) { - case path_instruction_type::WILDCARD: - path_commands.emplace_back(path_instruction{path_instruction_type::WILDCARD}); - break; - case path_instruction_type::INDEX: - path_commands.emplace_back(path_instruction{path_instruction_type::INDEX}); - path_commands.back().index = index; - break; - case path_instruction_type::NAMED: - path_commands.emplace_back(path_instruction{path_instruction_type::NAMED}); - path_commands.back().name = - cudf::string_view(all_names_scalar.data() + name_pos, name.size()); - name_pos += name.size(); - break; - default: CUDF_FAIL("Invalid path instruction type"); - } - } - // convert to uvector - return cudf::detail::make_device_uvector_sync(path_commands, stream, mr); + auto const success = stack[0].dirty > 0; + + // generator may contain trash output, e.g.: generator writes some output, + // then JSON format is invalid, the previous output becomes trash. + // We need to return output size as zero. + return {success, success ? stack[0].g.get_output_len() : 0}; } /** - * @brief Parse a single json string using the provided command buffer + * @brief Struct storing data such as path instructions, output buffer etc, corresponding to a + * single JSON path. + */ +struct json_path_processing_data { + cudf::device_span path_commands; + cudf::detail::input_offsetalator offsets; + thrust::pair* out_stringviews; + char* out_buf; + int8_t* has_out_of_bound; +}; + +/** + * @brief Kernel for running the JSONPath query, in which one input row is processed by entire + * warp (or multiple warps) of threads. * + * The number of warps processing each row is computed as `ceil(num_paths / warp_size)`. * - * @param input The incoming json string - * @param input_len Size of the incoming json string - * @param path_commands_ptr The command buffer to be applied to the string. - * @param path_commands_size The command buffer size. - * @param out_buf Buffer user to store the results of the query - * (nullptr in the size computation step) - * @param out_buf_size Size of the output buffer - * @returns A pair containing the result code and the output buffer. + * We explicitly set a value for `min_block_per_sm` parameter in the launch bounds to avoid + * spilling from the kernel itself. By default NVCC uses a heuristic to find a balance between + * the maximum number of registers used by a kernel and the parallelism of the kernel. + * If lots of registers are used the parallelism may suffer. But in our case NVCC gets this wrong + * and we want to avoid spilling all the time or else the performance is really bad. This + * essentially tells NVCC to prefer using lots of registers over spilling. + * + * @param input The input JSON strings stored in a strings column + * @param path_data Array containing all path data + * @param num_threads_per_row Number of threads processing each input row */ -__device__ thrust::pair get_json_object_single( - char_range input, - cudf::device_span path_commands, - char* out_buf, - size_t out_buf_size) +template +__launch_bounds__(block_size, min_block_per_sm) CUDF_KERNEL + void get_json_object_kernel(cudf::column_device_view input, + cudf::device_span path_data, + std::size_t num_threads_per_row) { - json_parser j_parser(input); - j_parser.next_token(); - // JSON validation check - if (json_token::ERROR == j_parser.get_current_token()) { return {false, 0}; } - - // First pass: preprocess sizes. - // Second pass: writes output. - // The generator automatically determines which pass based on `out_buf`. - // If `out_buf_size` is zero, pass in `nullptr` to avoid generator writing trash output. - json_generator generator((out_buf_size == 0) ? nullptr : out_buf); - - bool const success = evaluate_path( - j_parser, generator, write_style::RAW, {path_commands.data(), path_commands.size()}); - - if (!success) { - // generator may contain trash output, e.g.: generator writes some output, - // then JSON format is invalid, the previous output becomes trash. - // set output as zero to tell second step - generator.set_output_len_zero(); + auto const tidx = cudf::detail::grid_1d::global_thread_id(); + auto const row_idx = tidx / num_threads_per_row; + if (row_idx >= input.size()) { return; } + + auto const path_idx = tidx % num_threads_per_row; + if (path_idx >= path_data.size()) { return; } + + auto const& path = path_data[path_idx]; + char* const dst = path.out_buf + path.offsets[row_idx]; + bool is_valid = false; + cudf::size_type out_size = 0; + + auto const str = input.element(row_idx); + if (str.size_bytes() > 0) { + thrust::tie(is_valid, out_size) = evaluate_path(char_range{str}, path.path_commands, dst); + + auto const max_size = path.offsets[row_idx + 1] - path.offsets[row_idx]; + if (out_size > max_size) { *(path.has_out_of_bound) = 1; } } - return {success, generator.get_output_len()}; + // Write out `nullptr` in the output string_view to indicate that the output is a null. + // The situation `out_stringviews == nullptr` should only happen if the kernel is launched a + // second time due to out-of-bound write in the first launch. + if (path.out_stringviews) { + path.out_stringviews[row_idx] = {is_valid ? dst : nullptr, out_size}; + } } /** - * @brief Kernel for running the JSONPath query. + * @brief A utility class to launch the main kernel. + */ +struct kernel_launcher { + static void exec(cudf::column_device_view const& input, + cudf::device_span path_data, + rmm::cuda_stream_view stream) + { + // The optimal values for block_size and min_block_per_sm were found through testing, + // which are either 128-8 or 256-4. The pair 128-8 seems a bit better. + static constexpr int block_size = 128; + static constexpr int min_block_per_sm = 8; + + // The number of threads for processing one input row is at least one warp. + auto const num_threads_per_row = + cudf::util::div_rounding_up_safe(path_data.size(), + static_cast(cudf::detail::warp_size)) * + cudf::detail::warp_size; + auto const num_blocks = cudf::util::div_rounding_up_safe(num_threads_per_row * input.size(), + static_cast(block_size)); + get_json_object_kernel + <<>>(input, path_data, num_threads_per_row); + } +}; + +/** + * @brief Construct the device vector containing necessary data for the input JSON paths. * - * This kernel operates in a 2-pass way. On the first pass it computes the - * output sizes. On the second pass, it fills in the provided output buffers - * (chars and validity). + * All JSON paths are processed at once, without stream synchronization, to minimize overhead. * - * @param col Device view of the incoming string - * @param path_commands JSONPath command buffer - * @param d_sizes a buffer used to write the output sizes in the first pass, - * and is read back in on the second pass to compute offsets. - * @param output_offsets Buffer used to store the string offsets for the results - * of the query - * @param out_buf Buffer used to store the results of the query - * @param out_validity Output validity buffer - * @param out_valid_count Output count of # of valid bits + * A tuple of values are returned, however, only the first element is needed for further kernel + * launch. The remaining are unused but need to be kept alive as they contains data for later + * asynchronous host-device memcpy. */ -template -// We have 1 for the minBlocksPerMultiprocessor in the launch bounds to avoid spilling from -// the kernel itself. By default NVCC uses a heuristic to find a balance between the -// maximum number of registers used by a kernel and the parallelism of the kernel. -// If lots of registers are used the parallelism may suffer. But in our case -// NVCC gets this wrong and we want to avoid spilling all the time or else -// the performance is really bad. This essentially tells NVCC to prefer using lots -// of registers over spilling. -__launch_bounds__(block_size, 1) CUDF_KERNEL - void get_json_object_kernel(cudf::column_device_view col, - cudf::device_span path_commands, - cudf::size_type* d_sizes, - cudf::detail::input_offsetalator output_offsets, - char* out_buf, - cudf::bitmask_type* out_validity, - cudf::size_type* out_valid_count) +std::tuple>, + std::unique_ptr>>, + cudf::string_scalar, + std::string> +construct_path_commands( + std::vector>> const& + json_paths, + rmm::cuda_stream_view stream) { - auto tid = cudf::detail::grid_1d::global_thread_id(); - auto const stride = cudf::detail::grid_1d::grid_stride(); - - cudf::size_type warp_valid_count{0}; - - auto active_threads = __ballot_sync(0xffff'ffffu, tid < col.size()); - while (tid < col.size()) { - bool is_valid = false; - cudf::string_view const str = col.element(tid); - if (str.size_bytes() > 0) { - char* dst = out_buf != nullptr ? out_buf + output_offsets[tid] : nullptr; - size_t const dst_size = - out_buf != nullptr ? output_offsets[tid + 1] - output_offsets[tid] : 0; - - // process one single row - auto [result, output_size] = - get_json_object_single(str, {path_commands.data(), path_commands.size()}, dst, dst_size); - if (result) { is_valid = true; } - - // filled in only during the precompute step. during the compute step, the - // offsets are fed back in so we do -not- want to write them out - if (out_buf == nullptr) { d_sizes[tid] = static_cast(output_size); } - } else { - // valid JSON length is always greater than 0 - // if `str` size len is zero, output len is 0 and `is_valid` is false - if (out_buf == nullptr) { d_sizes[tid] = 0; } + // Concatenate all names from path instructions. + auto h_inst_names = [&] { + std::size_t length{0}; + for (auto const& instructions : json_paths) { + for (auto const& [type, name, index] : instructions) { + if (type == path_instruction_type::NAMED) { length += name.length(); } + } } - - // validity filled in only during the output step - if (out_validity != nullptr) { - uint32_t mask = __ballot_sync(active_threads, is_valid); - // 0th lane of the warp writes the validity - if (!(tid % cudf::detail::warp_size)) { - out_validity[cudf::word_index(tid)] = mask; - warp_valid_count += __popc(mask); + std::string all_names; + all_names.reserve(length); + for (auto const& instructions : json_paths) { + for (auto const& [type, name, index] : instructions) { + if (type == path_instruction_type::NAMED) { all_names += name; } } } + return all_names; + }(); + auto d_inst_names = cudf::string_scalar(h_inst_names, true, stream); - tid += stride; - active_threads = __ballot_sync(active_threads, tid < col.size()); + std::size_t name_pos{0}; + auto h_path_commands = std::make_unique>>(); + h_path_commands->reserve(json_paths.size()); + + for (auto const& instructions : json_paths) { + h_path_commands->emplace_back(); + auto& path_commands = h_path_commands->back(); + path_commands.reserve(instructions.size()); + + for (auto const& [type, name, index] : instructions) { + path_commands.emplace_back(path_instruction{type}); + + if (type == path_instruction_type::INDEX) { + path_commands.back().index = index; + } else if (type == path_instruction_type::NAMED) { + path_commands.back().name = cudf::string_view(d_inst_names.data() + name_pos, name.size()); + name_pos += name.size(); + } else if (type != path_instruction_type::WILDCARD) { + CUDF_FAIL("Invalid path instruction type"); + } + } } - // sum the valid counts across the whole block - if (out_valid_count != nullptr) { - cudf::size_type block_valid_count = - cudf::detail::single_lane_block_sum_reduce(warp_valid_count); - if (threadIdx.x == 0) { atomicAdd(out_valid_count, block_valid_count); } + auto d_path_commands = std::vector>{}; + d_path_commands.reserve(h_path_commands->size()); + for (auto const& path_commands : *h_path_commands) { + d_path_commands.emplace_back(cudf::detail::make_device_uvector_async( + path_commands, stream, rmm::mr::get_current_device_resource())); } + + return {std::move(d_path_commands), + std::move(h_path_commands), + std::move(d_inst_names), + std::move(h_inst_names)}; } -std::unique_ptr get_json_object( +std::vector> get_json_object( cudf::strings_column_view const& input, - std::vector> const& instructions, + std::vector>> const& + json_paths, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - if (input.is_empty()) return cudf::make_empty_column(cudf::type_id::STRING); + auto const num_outputs = json_paths.size(); + std::vector> output; + + // Input is empty or all nulls - just return all null columns. + if (input.is_empty() || input.size() == input.null_count()) { + for (std::size_t idx = 0; idx < num_outputs; ++idx) { + output.emplace_back(std::make_unique(input.parent(), stream, mr)); + } + return output; + } + + auto const d_input_ptr = cudf::column_device_view::create(input.parent(), stream); + auto const in_offsets = + cudf::detail::offsetalator_factory::make_input_iterator(input.offsets(), input.offset()); + auto const [d_json_paths, h_json_paths, d_inst_names, h_inst_names] = + construct_path_commands(json_paths, stream); + + auto const max_row_size = thrust::transform_reduce( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(input.size()), + cuda::proclaim_return_type( + [in_offsets] __device__(auto const idx) { return in_offsets[idx + 1] - in_offsets[idx]; }), + int64_t{0}, + thrust::maximum{}); + + // We will use scratch buffers to store the output strings without knowing their sizes. + // Since we do not know their sizes, we need to allocate the buffer a bit larger than the input + // size so that we will not write output strings into an out-of-bound position. + // Checking out-of-bound needs to be performed in the main kernel to make sure we will not have + // data corruption. + auto const scratch_size = [&, max_row_size = max_row_size] { + // Pad the scratch buffer by an additional size that is a multiple of max row size. + auto constexpr padding_rows = 10; + return input.chars_size(stream) + max_row_size * padding_rows; + }(); + + rmm::device_uvector d_has_out_of_bound(num_outputs, stream); + std::vector> scratch_buffers; + std::vector>> out_stringviews; + std::vector h_path_data; + scratch_buffers.reserve(json_paths.size()); + out_stringviews.reserve(json_paths.size()); + h_path_data.reserve(json_paths.size()); + + for (std::size_t idx = 0; idx < num_outputs; ++idx) { + auto const& instructions = json_paths[idx]; + if (instructions.size() > max_path_depth) { CUDF_FAIL("JSONPath query exceeds maximum depth"); } + + scratch_buffers.emplace_back(rmm::device_uvector(scratch_size, stream)); + out_stringviews.emplace_back(rmm::device_uvector>{ + static_cast(input.size()), stream}); + + h_path_data.emplace_back(json_path_processing_data{d_json_paths[idx], + in_offsets, + out_stringviews.back().data(), + scratch_buffers.back().data(), + d_has_out_of_bound.data() + idx}); + } + auto d_path_data = cudf::detail::make_device_uvector_async( + h_path_data, stream, rmm::mr::get_current_device_resource()); + thrust::uninitialized_fill( + rmm::exec_policy(stream), d_has_out_of_bound.begin(), d_has_out_of_bound.end(), 0); + kernel_launcher::exec(*d_input_ptr, d_path_data, stream); + + // Do not use parallel check since we do not have many elements. + auto h_has_out_of_bound = cudf::detail::make_host_vector_sync(d_has_out_of_bound, stream); + auto has_no_oob = std::none_of( + h_has_out_of_bound.begin(), h_has_out_of_bound.end(), [](auto const val) { return val != 0; }); + + // If we didn't see any out-of-bound write, everything is good so far. + // Just gather the output strings and return. + if (has_no_oob) { + for (auto const& out_sview : out_stringviews) { + output.emplace_back(cudf::make_strings_column(out_sview, stream, mr)); + } + return output; + } + // From here, we had out-of-bound write. Although this is very rare, it may still happen. + + std::vector> out_null_masks_and_null_counts; + std::vector, int64_t>> out_offsets_and_sizes; + std::vector> out_char_buffers; + std::vector oob_indices; - if (instructions.size() > max_path_depth) { CUDF_FAIL("JSONPath query exceeds maximum depth"); } + // Check validity from the stored char pointers. + auto const validator = [] __device__(thrust::pair const item) { + return item.first != nullptr; + }; - // get a string buffer to store all the names and convert to device - std::string all_names; - for (auto const& inst : instructions) { - all_names += std::get<1>(inst); + // Rebuild the data only for paths that had out of bound write. + h_path_data.clear(); + for (std::size_t idx = 0; idx < num_outputs; ++idx) { + auto const& out_sview = out_stringviews[idx]; + + if (h_has_out_of_bound[idx]) { + oob_indices.emplace_back(idx); + output.emplace_back(nullptr); // just placeholder. + + out_null_masks_and_null_counts.emplace_back( + cudf::detail::valid_if(out_sview.begin(), out_sview.end(), validator, stream, mr)); + + // The string sizes computed in the previous kernel call will be used to allocate a new char + // buffer to store the output. + auto const size_it = cudf::detail::make_counting_transform_iterator( + 0, + cuda::proclaim_return_type( + [string_pairs = out_sview.data()] __device__(auto const idx) { + return string_pairs[idx].second; + })); + out_offsets_and_sizes.emplace_back(cudf::strings::detail::make_offsets_child_column( + size_it, size_it + input.size(), stream, mr)); + out_char_buffers.emplace_back( + rmm::device_uvector(out_offsets_and_sizes.back().second, stream, mr)); + + h_path_data.emplace_back( + json_path_processing_data{d_json_paths[idx], + cudf::detail::offsetalator_factory::make_input_iterator( + out_offsets_and_sizes.back().first->view()), + nullptr /*out_stringviews*/, + out_char_buffers.back().data(), + d_has_out_of_bound.data() + idx}); + } else { + output.emplace_back(cudf::make_strings_column(out_sview, stream, mr)); + } + } + // These buffers are no longer needed. + scratch_buffers.clear(); + out_stringviews.clear(); + + // Push data to the GPU and launch the kernel again. + d_path_data = cudf::detail::make_device_uvector_async( + h_path_data, stream, rmm::mr::get_current_device_resource()); + thrust::uninitialized_fill( + rmm::exec_policy(stream), d_has_out_of_bound.begin(), d_has_out_of_bound.end(), 0); + kernel_launcher::exec(*d_input_ptr, d_path_data, stream); + + // Check out of bound again to make sure everything looks right. + h_has_out_of_bound = cudf::detail::make_host_vector_sync(d_has_out_of_bound, stream); + has_no_oob = std::none_of( + h_has_out_of_bound.begin(), h_has_out_of_bound.end(), [](auto const val) { return val != 0; }); + + // The last kernel call should not encounter any out-of-bound write. + // If OOB is still detected, there must be something wrong happened. + CUDF_EXPECTS(has_no_oob, "Unexpected out-of-bound write in get_json_object kernel."); + + for (std::size_t idx = 0; idx < oob_indices.size(); ++idx) { + auto const out_idx = oob_indices[idx]; + output[out_idx] = + cudf::make_strings_column(input.size(), + std::move(out_offsets_and_sizes[idx].first), + out_char_buffers[idx].release(), + out_null_masks_and_null_counts[idx].second, + std::move(out_null_masks_and_null_counts[idx].first)); } - cudf::string_scalar all_names_scalar(all_names, true, stream); - // parse the json_path into a command buffer - auto path_commands = construct_path_commands( - instructions, all_names_scalar, stream, rmm::mr::get_current_device_resource()); - - // compute output sizes - auto sizes = rmm::device_uvector( - input.size(), stream, rmm::mr::get_current_device_resource()); - auto d_offsets = cudf::detail::offsetalator_factory::make_input_iterator(input.offsets()); - - constexpr int block_size = 512; - cudf::detail::grid_1d const grid{input.size(), block_size}; - auto d_input_ptr = cudf::column_device_view::create(input.parent(), stream); - // preprocess sizes (returned in the offsets buffer) - get_json_object_kernel - <<>>( - *d_input_ptr, path_commands, sizes.data(), d_offsets, nullptr, nullptr, nullptr); - - // convert sizes to offsets - auto [offsets, output_size] = - cudf::strings::detail::make_offsets_child_column(sizes.begin(), sizes.end(), stream, mr); - d_offsets = cudf::detail::offsetalator_factory::make_input_iterator(offsets->view()); - - // allocate output string column - rmm::device_uvector chars(output_size, stream, mr); - - // potential optimization : if we know that all outputs are valid, we could - // skip creating the validity mask altogether - rmm::device_buffer validity = - cudf::detail::create_null_mask(input.size(), cudf::mask_state::UNINITIALIZED, stream, mr); - - // compute results - rmm::device_scalar d_valid_count{0, stream}; - - get_json_object_kernel - <<>>( - *d_input_ptr, - path_commands, - sizes.data(), - d_offsets, - chars.data(), - static_cast(validity.data()), - d_valid_count.data()); - - return make_strings_column(input.size(), - std::move(offsets), - chars.release(), - input.size() - d_valid_count.value(stream), - std::move(validity)); + return output; } } // namespace detail @@ -1021,7 +1124,19 @@ std::unique_ptr get_json_object( rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - return detail::get_json_object(input, instructions, stream, mr); + CUDF_FUNC_RANGE(); + return std::move(detail::get_json_object(input, {instructions}, stream, mr).front()); +} + +std::vector> get_json_object_multiple_paths( + cudf::strings_column_view const& input, + std::vector>> const& + json_paths, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return detail::get_json_object(input, json_paths, stream, mr); } } // namespace spark_rapids_jni diff --git a/src/main/cpp/src/get_json_object.hpp b/src/main/cpp/src/get_json_object.hpp index bb3294b424..963bc91a74 100644 --- a/src/main/cpp/src/get_json_object.hpp +++ b/src/main/cpp/src/get_json_object.hpp @@ -16,31 +16,25 @@ #pragma once -#include #include #include -#include -#include -#include -#include - #include -#include #include namespace spark_rapids_jni { /** - * path instruction type + * @brief Type of instruction in a JSON path. */ -enum class path_instruction_type { WILDCARD, INDEX, NAMED }; +enum class path_instruction_type : int8_t { WILDCARD, INDEX, NAMED }; /** - * Extracts json object from a json string based on json path specified, and - * returns json string of the extracted json object. It will return null if the - * input json string is invalid. + * @brief Extract JSON object from a JSON string based on the specified JSON path. + * + * If the input JSON string is invalid, or it does not contain the object at the given path, a null + * will be returned. */ std::unique_ptr get_json_object( cudf::strings_column_view const& input, @@ -48,4 +42,18 @@ std::unique_ptr get_json_object( rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); +/** + * @brief Extract multiple JSON objects from a JSON string based on the specified JSON paths. + * + * This function processes all the JSON paths in parallel, which may be faster than calling + * to `get_json_object` on the individual JSON paths. However, it may consume much more GPU + * memory, proportional to the number of JSON paths. + */ +std::vector> get_json_object_multiple_paths( + cudf::strings_column_view const& input, + std::vector>> const& + json_paths, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); + } // namespace spark_rapids_jni diff --git a/src/main/cpp/src/hash.cuh b/src/main/cpp/src/hash.cuh index 8cf489a7e7..35969ca42a 100644 --- a/src/main/cpp/src/hash.cuh +++ b/src/main/cpp/src/hash.cuh @@ -19,15 +19,9 @@ #include #include -#include -#include - #include namespace spark_rapids_jni { - -constexpr int64_t DEFAULT_XXHASH64_SEED = 42; - /** * Normalization of floating point NaNs, passthrough for all other values. */ @@ -101,37 +95,4 @@ __device__ __inline__ std::pair<__int128_t, cudf::size_type> to_java_bigdecimal( return {big_endian_value, length}; } - -/** - * @brief Computes the murmur32 hash value of each row in the input set of columns. - * - * @param input The table of columns to hash - * @param seed Optional seed value to use for the hash function - * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource used to allocate the returned column's device memory - * - * @returns A column where each row is the hash of a column from the input. - */ -std::unique_ptr murmur_hash3_32( - cudf::table_view const& input, - uint32_t seed = 0, - rmm::cuda_stream_view stream = cudf::get_default_stream(), - rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); - -/** - * @brief Computes the xxhash64 hash value of each row in the input set of columns. - * - * @param input The table of columns to hash - * @param seed Optional seed value to use for the hash function - * @param stream CUDA stream used for device memory operations and kernel launches - * @param mr Device memory resource used to allocate the returned column's device memory - * - * @returns A column where each row is the hash of a column from the input. - */ -std::unique_ptr xxhash64( - cudf::table_view const& input, - int64_t seed = DEFAULT_XXHASH64_SEED, - rmm::cuda_stream_view stream = cudf::get_default_stream(), - rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); - } // namespace spark_rapids_jni diff --git a/src/main/cpp/src/hash.hpp b/src/main/cpp/src/hash.hpp new file mode 100644 index 0000000000..4021b9e75c --- /dev/null +++ b/src/main/cpp/src/hash.hpp @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include +#include + +namespace spark_rapids_jni { + +constexpr int64_t DEFAULT_XXHASH64_SEED = 42; + +/** + * @brief Computes the murmur32 hash value of each row in the input set of columns. + * + * @param input The table of columns to hash + * @param seed Optional seed value to use for the hash function + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned column's device memory + * + * @returns A column where each row is the hash of a column from the input. + */ +std::unique_ptr murmur_hash3_32( + cudf::table_view const& input, + uint32_t seed = 0, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); + +/** + * @brief Computes the xxhash64 hash value of each row in the input set of columns. + * + * @param input The table of columns to hash + * @param seed Optional seed value to use for the hash function + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned column's device memory + * + * @returns A column where each row is the hash of a column from the input. + */ +std::unique_ptr xxhash64( + cudf::table_view const& input, + int64_t seed = DEFAULT_XXHASH64_SEED, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); + +/** + * @brief Computes the Hive hash value of each row in the input set of columns. + * + * @param input The table of columns to hash + * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the returned column's device memory + * + * @returns A column where each row is the hash of a column from the input. + */ +std::unique_ptr hive_hash( + cudf::table_view const& input, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource()); + +} // namespace spark_rapids_jni diff --git a/src/main/cpp/src/hive_hash.cu b/src/main/cpp/src/hive_hash.cu new file mode 100644 index 0000000000..85598565a9 --- /dev/null +++ b/src/main/cpp/src/hive_hash.cu @@ -0,0 +1,255 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "hash.cuh" + +#include +#include +#include +#include + +#include +#include + +#include +#include +#include + +namespace spark_rapids_jni { + +namespace { + +using hive_hash_value_t = int32_t; + +constexpr hive_hash_value_t HIVE_HASH_FACTOR = 31; +constexpr hive_hash_value_t HIVE_INIT_HASH = 0; + +hive_hash_value_t __device__ inline compute_int(int32_t key) { return key; } + +hive_hash_value_t __device__ inline compute_long(int64_t key) +{ + return (static_cast(key) >> 32) ^ key; +} + +hive_hash_value_t __device__ inline compute_bytes(int8_t const* data, cudf::size_type const len) +{ + hive_hash_value_t ret = HIVE_INIT_HASH; + for (auto i = 0; i < len; i++) { + ret = ret * HIVE_HASH_FACTOR + static_cast(data[i]); + } + return ret; +} + +template +struct hive_hash_function { + // 'seed' is not used in 'hive_hash_function', but required by 'element_hasher'. + constexpr hive_hash_function(uint32_t) {} + + [[nodiscard]] hive_hash_value_t __device__ inline operator()(Key const& key) const + { + CUDF_UNREACHABLE("Unsupported type for hive hash"); + } +}; // struct hive_hash_function + +template <> +hive_hash_value_t __device__ inline hive_hash_function::operator()( + cudf::string_view const& key) const +{ + auto const data = reinterpret_cast(key.data()); + auto const len = key.size_bytes(); + return compute_bytes(data, len); +} + +template <> +hive_hash_value_t __device__ inline hive_hash_function::operator()(bool const& key) const +{ + return compute_int(static_cast(key)); +} + +template <> +hive_hash_value_t __device__ inline hive_hash_function::operator()(int8_t const& key) const +{ + return compute_int(static_cast(key)); +} + +template <> +hive_hash_value_t __device__ inline hive_hash_function::operator()( + int16_t const& key) const +{ + return compute_int(static_cast(key)); +} + +template <> +hive_hash_value_t __device__ inline hive_hash_function::operator()( + int32_t const& key) const +{ + return compute_int(key); +} + +template <> +hive_hash_value_t __device__ inline hive_hash_function::operator()( + int64_t const& key) const +{ + return compute_long(key); +} + +template <> +hive_hash_value_t __device__ inline hive_hash_function::operator()(float const& key) const +{ + auto normalized = spark_rapids_jni::normalize_nans(key); + auto* p_int = reinterpret_cast(&normalized); + return compute_int(*p_int); +} + +template <> +hive_hash_value_t __device__ inline hive_hash_function::operator()(double const& key) const +{ + auto normalized = spark_rapids_jni::normalize_nans(key); + auto* p_long = reinterpret_cast(&normalized); + return compute_long(*p_long); +} + +template <> +hive_hash_value_t __device__ inline hive_hash_function::operator()( + cudf::timestamp_D const& key) const +{ + auto* p_int = reinterpret_cast(&key); + return compute_int(*p_int); +} + +template <> +hive_hash_value_t __device__ inline hive_hash_function::operator()( + cudf::timestamp_us const& key) const +{ + auto time_as_long = *reinterpret_cast(&key); + constexpr int MICRO_PER_SEC = 1000000; + constexpr int NANO_PER_MICRO = 1000; + + int64_t ts = time_as_long / MICRO_PER_SEC; + int64_t tns = (time_as_long % MICRO_PER_SEC) * NANO_PER_MICRO; + + int64_t result = ts; + result <<= 30; + result |= tns; + + result = (static_cast(result) >> 32) ^ result; + return static_cast(result); +} + +/** + * @brief Computes the hash value of a row in the given table. + * + * This functor produces the same result as "HiveHash" in Spark for supported types. + * + * @tparam hash_function Hash functor to use for hashing elements. Must be hive_hash_function. + * @tparam Nullate A cudf::nullate type describing whether to check for nulls. + */ +template