Skip to content

Commit

Permalink
Add jvm libhdfs support
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Oct 10, 2024
1 parent 63c848d commit 701cbec
Show file tree
Hide file tree
Showing 19 changed files with 2,361 additions and 136 deletions.
1 change: 1 addition & 0 deletions .github/workflows/linux-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ jobs:
LIBHDFS3_CONF: "${{ github.workspace }}/scripts/hdfs-client.xml"
working-directory: _build/release
run: |
export CLASSPATH=`/usr/local/hadoop/bin/hdfs classpath --glob`
ctest -j 8 --output-on-failure --no-tests=error
ubuntu-debug:
Expand Down
8 changes: 3 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,9 @@ if(VELOX_ENABLE_ABFS)
endif()

if(VELOX_ENABLE_HDFS)
find_library(
LIBHDFS3
NAMES libhdfs3.so libhdfs3.dylib
HINTS "${CMAKE_SOURCE_DIR}/hawq/depends/libhdfs3/_build/src/" REQUIRED)
add_definitions(-DVELOX_ENABLE_HDFS3)
add_definitions(-DVELOX_ENABLE_HDFS)
# JVM libhdfs requires arrow dependency.
set(VELOX_ENABLE_ARROW ON)
endif()

if(VELOX_ENABLE_PARQUET)
Expand Down
100 changes: 100 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,103 @@ This product includes software from the QT project (BSD, 3-clause).

This product includes software from HowardHinnant's date library (MIT License).
* https://github.com/HowardHinnant/date/tree/master

This product includes software from the The Arrow project.
* https://github.com/apache/arrow/blob/apache-arrow-15.0.0/cpp/src/arrow/io/hdfs_internal.h
* https://github.com/apache/arrow/blob/apache-arrow-15.0.0/cpp/src/arrow/io/hdfs_internal.cc
Which contain the following NOTICE file:
-------
Apache Arrow
Copyright 2016-2024 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
This product includes software from the SFrame project (BSD, 3-clause).
* Copyright (C) 2015 Dato, Inc.
* Copyright (c) 2009 Carnegie Mellon University.
This product includes software from the Feather project (Apache 2.0)
https://github.com/wesm/feather
This product includes software from the DyND project (BSD 2-clause)
https://github.com/libdynd
This product includes software from the LLVM project
* distributed under the University of Illinois Open Source
This product includes software from the google-lint project
* Copyright (c) 2009 Google Inc. All rights reserved.
This product includes software from the mman-win32 project
* Copyright https://code.google.com/p/mman-win32/
* Licensed under the MIT License;
This product includes software from the LevelDB project
* Copyright (c) 2011 The LevelDB Authors. All rights reserved.
* Use of this source code is governed by a BSD-style license that can be
* Moved from Kudu http://github.com/cloudera/kudu
This product includes software from the CMake project
* Copyright 2001-2009 Kitware, Inc.
* Copyright 2012-2014 Continuum Analytics, Inc.
* All rights reserved.
This product includes software from https://github.com/matthew-brett/multibuild (BSD 2-clause)
* Copyright (c) 2013-2016, Matt Terry and Matthew Brett; all rights reserved.
This product includes software from the Ibis project (Apache 2.0)
* Copyright (c) 2015 Cloudera, Inc.
* https://github.com/cloudera/ibis
This product includes software from Dremio (Apache 2.0)
* Copyright (C) 2017-2018 Dremio Corporation
* https://github.com/dremio/dremio-oss
This product includes software from Google Guava (Apache 2.0)
* Copyright (C) 2007 The Guava Authors
* https://github.com/google/guava
This product include software from CMake (BSD 3-Clause)
* CMake - Cross Platform Makefile Generator
* Copyright 2000-2019 Kitware, Inc. and Contributors
The web site includes files generated by Jekyll.
--------------------------------------------------------------------------------
This product includes code from Apache Kudu, which includes the following in
its NOTICE file:
Apache Kudu
Copyright 2016 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Portions of this software were developed at
Cloudera, Inc (http://www.cloudera.com/).
--------------------------------------------------------------------------------
This product includes code from Apache ORC, which includes the following in
its NOTICE file:
Apache ORC
Copyright 2013-2019 The Apache Software Foundation
This product includes software developed by The Apache Software
Foundation (http://www.apache.org/).
This product includes software developed by Hewlett-Packard:
(c) Copyright [2014-2015] Hewlett-Packard Development Company, L.P
-------

This product includes software from the The Hadoop project.
* https://github.com/apache/hadoop/blob/release-3.3.0-RC0/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfs/include/hdfs/hdfs.h
Which contains the following NOTICE file:
----
Apache Hadoop
Copyright 2006 and onwards The Apache Software Foundation.
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
Export Control Notice
---------------------
This distribution includes cryptographic software. The country in
which you currently reside may have restrictions on the import,
possession, use, and/or re-export to another country, of
encryption software. BEFORE using any encryption software, please
check your country's laws, regulations and policies concerning the
import, possession, or use, and re-export of encryption software, to
see if this is permitted. See <http://www.wassenaar.org/> for more
information.
The U.S. Government Department of Commerce, Bureau of Industry and
Security (BIS), has classified this software as Export Commodity
Control Number (ECCN) 5D002.C.1, which includes information security
software using or performing cryptographic functions with asymmetric
algorithms. The form and manner of this Apache Software Foundation
distribution makes it eligible for export under the License Exception
ENC Technology Software Unrestricted (TSU) exception (see the BIS
Export Administration Regulations, Section 740.13) for both object
code and source code.
The following provides more details on the included cryptographic software:
This software uses the SSL libraries from the Jetty project written
by mortbay.org.
Hadoop Yarn Server Web Proxy uses the BouncyCastle Java
cryptography APIs written by the Legion of the Bouncy Castle Inc.
----
1 change: 1 addition & 0 deletions velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ add_subdirectory(row)
add_subdirectory(flag_definitions)
add_subdirectory(external/date)
add_subdirectory(external/md5)
add_subdirectory(external/hdfs)
#

# examples depend on expression
Expand Down
7 changes: 6 additions & 1 deletion velox/connectors/hive/storage_adapters/hdfs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ if(VELOX_ENABLE_HDFS)
HdfsFileSystem.cpp
HdfsReadFile.cpp
HdfsWriteFile.cpp)
velox_link_libraries(velox_hdfs Folly::folly ${LIBHDFS3} xsimd)
velox_link_libraries(
velox_hdfs
velox_external_hdfs
velox_dwio_common
Folly::folly
xsimd)

if(${VELOX_BUILD_TESTING})
add_subdirectory(tests)
Expand Down
33 changes: 23 additions & 10 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
* limitations under the License.
*/
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
#include <hdfs/hdfs.h>
#include <mutex>
#include "velox/common/config/Config.h"
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h"
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h"
#include "velox/external/hdfs/ArrowHdfsInternal.h"

namespace facebook::velox::filesystems {
std::string_view HdfsFileSystem::kScheme("hdfs://");
Expand All @@ -29,21 +29,27 @@ class HdfsFileSystem::Impl {
explicit Impl(
const config::ConfigBase* config,
const HdfsServiceEndpoint& endpoint) {
auto builder = hdfsNewBuilder();
hdfsBuilderSetNameNode(builder, endpoint.host.c_str());
hdfsBuilderSetNameNodePort(builder, atoi(endpoint.port.data()));
hdfsClient_ = hdfsBuilderConnect(builder);
hdfsFreeBuilder(builder);
auto status = filesystems::arrow::io::internal::ConnectLibHdfs(&driver_);
if (!status.ok()) {
LOG(ERROR) << "ConnectLibHdfs failed ";
}

// connect to HDFS with the builder object
hdfsBuilder* builder = driver_->NewBuilder();
driver_->BuilderSetNameNode(builder, endpoint.host.c_str());
driver_->BuilderSetNameNodePort(builder, atoi(endpoint.port.data()));
driver_->BuilderSetForceNewInstance(builder);
hdfsClient_ = driver_->BuilderConnect(builder);
VELOX_CHECK_NOT_NULL(
hdfsClient_,
"Unable to connect to HDFS: {}, got error: {}.",
endpoint.identity(),
hdfsGetLastError());
driver_->GetLastExceptionRootCause());
}

~Impl() {
LOG(INFO) << "Disconnecting HDFS file system";
int disconnectResult = hdfsDisconnect(hdfsClient_);
int disconnectResult = driver_->Disconnect(hdfsClient_);
if (disconnectResult != 0) {
LOG(WARNING) << "hdfs disconnect failure in HdfsReadFile close: "
<< errno;
Expand All @@ -54,8 +60,13 @@ class HdfsFileSystem::Impl {
return hdfsClient_;
}

filesystems::arrow::io::internal::LibHdfsShim* hdfsShim() {
return driver_;
}

private:
hdfsFS hdfsClient_;
filesystems::arrow::io::internal::LibHdfsShim* driver_;
};

HdfsFileSystem::HdfsFileSystem(
Expand All @@ -79,13 +90,15 @@ std::unique_ptr<ReadFile> HdfsFileSystem::openFileForRead(
path.remove_prefix(index);
}

return std::make_unique<HdfsReadFile>(impl_->hdfsClient(), path);
return std::make_unique<HdfsReadFile>(
impl_->hdfsShim(), impl_->hdfsClient(), path);
}

std::unique_ptr<WriteFile> HdfsFileSystem::openFileForWrite(
std::string_view path,
const FileOptions& /*unused*/) {
return std::make_unique<HdfsWriteFile>(impl_->hdfsClient(), path);
return std::make_unique<HdfsWriteFile>(
impl_->hdfsShim(), impl_->hdfsClient(), path);
}

bool HdfsFileSystem::isHdfsFile(const std::string_view filePath) {
Expand Down
5 changes: 5 additions & 0 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
*/
#include "velox/common/file/FileSystems.h"

namespace velox::filesystems::arrow::io::internal {
class LibHdfsShim;
}

namespace facebook::velox::filesystems {

struct HdfsServiceEndpoint {
HdfsServiceEndpoint(const std::string& hdfsHost, const std::string& hdfsPort)
: host(hdfsHost), port(hdfsPort) {}
Expand Down
64 changes: 56 additions & 8 deletions velox/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,65 @@

#include "HdfsReadFile.h"
#include <folly/synchronization/CallOnce.h>
#include <hdfs/hdfs.h>
#include "velox/external/hdfs/ArrowHdfsInternal.h"

namespace facebook::velox {

HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path)
: hdfsClient_(hdfs), filePath_(path) {
fileInfo_ = hdfsGetPathInfo(hdfsClient_, filePath_.data());
struct HdfsFile {
filesystems::arrow::io::internal::LibHdfsShim* driver_;
hdfsFS client_;
hdfsFile handle_;

HdfsFile() : driver_(nullptr), client_(nullptr), handle_(nullptr) {}
~HdfsFile() {
if (handle_ && driver_->CloseFile(client_, handle_) == -1) {
LOG(ERROR) << "Unable to close file, errno: " << errno;
}
}

void open(
filesystems::arrow::io::internal::LibHdfsShim* driver,
hdfsFS client,
const std::string& path) {
driver_ = driver;
client_ = client;
handle_ = driver->OpenFile(client, path.data(), O_RDONLY, 0, 0, 0);
VELOX_CHECK_NOT_NULL(
handle_,
"Unable to open file {}. got error: {}",
path,
driver_->GetLastExceptionRootCause());
}

void seek(uint64_t offset) const {
VELOX_CHECK_EQ(
driver_->Seek(client_, handle_, offset),
0,
"Cannot seek through HDFS file, error is : {}",
driver_->GetLastExceptionRootCause());
}

int32_t read(char* pos, uint64_t length) const {
auto bytesRead = driver_->Read(client_, handle_, pos, length);
VELOX_CHECK(bytesRead >= 0, "Read failure in HDFSReadFile::preadInternal.");
return bytesRead;
}
};

HdfsReadFile::HdfsReadFile(
filesystems::arrow::io::internal::LibHdfsShim* driver,
hdfsFS hdfs,
const std::string_view path)
: driver_(driver), hdfsClient_(hdfs), filePath_(path) {
fileInfo_ = driver_->GetPathInfo(hdfsClient_, filePath_.data());
if (fileInfo_ == nullptr) {
auto error = hdfsGetLastError();
auto error = fmt::format(
"FileNotFoundException: Path {} does not exist.", filePath_);
auto errMsg = fmt::format(
"Unable to get file path info for file: {}. got error: {}",
filePath_,
error);
if (std::strstr(error, "FileNotFoundException") != nullptr) {
if (error.find("FileNotFoundException") != std::string::npos) {
VELOX_FILE_NOT_FOUND_ERROR(errMsg);
}
VELOX_FAIL(errMsg);
Expand All @@ -38,14 +83,17 @@ HdfsReadFile::HdfsReadFile(hdfsFS hdfs, const std::string_view path)

HdfsReadFile::~HdfsReadFile() {
// should call hdfsFreeFileInfo to avoid memory leak
hdfsFreeFileInfo(fileInfo_, 1);
if (fileInfo_) {
driver_->FreeFileInfo(fileInfo_, 1);
}
}

void HdfsReadFile::preadInternal(uint64_t offset, uint64_t length, char* pos)
const {
checkFileReadParameters(offset, length);
folly::ThreadLocal<HdfsFile> file_;
if (!file_->handle_) {
file_->open(hdfsClient_, filePath_);
file_->open(driver_, hdfsClient_, filePath_);
}
file_->seek(offset);
uint64_t totalBytesRead = 0;
Expand Down
Loading

0 comments on commit 701cbec

Please sign in to comment.