Skip to content

Commit

Permalink
Add HiveConnectorSplit Serde
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Oct 7, 2024
1 parent 96944d5 commit 9a73e97
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 16 deletions.
7 changes: 6 additions & 1 deletion velox/connectors/Connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class DataSource;

/// A split represents a chunk of data that a connector should load and return
/// as a RowVectorPtr, potentially after processing pushdowns.
struct ConnectorSplit {
struct ConnectorSplit : public ISerializable {
const std::string connectorId;
const int64_t splitWeight{0};

Expand All @@ -59,6 +59,11 @@ struct ConnectorSplit {
int64_t _splitWeight = 0)
: connectorId(_connectorId), splitWeight(_splitWeight) {}

folly::dynamic serialize() const override {
VELOX_UNSUPPORTED();
return nullptr;
}

virtual ~ConnectorSplit() {}

virtual std::string toString() const {
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ velox_add_library(
HiveConfig.cpp
HiveConnector.cpp
HiveConnectorUtil.cpp
HiveConnectorSplit.cpp
HiveDataSink.cpp
HiveDataSource.cpp
HivePartitionUtil.cpp
Expand Down
166 changes: 166 additions & 0 deletions velox/connectors/hive/HiveConnectorSplit.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/connectors/hive/HiveConnectorSplit.h"

namespace facebook::velox::connector::hive {

std::string HiveConnectorSplit::toString() const {
if (tableBucketNumber.has_value()) {
return fmt::format(
"Hive: {} {} - {} {}",
filePath,
start,
length,
tableBucketNumber.value());
}
return fmt::format("Hive: {} {} - {}", filePath, start, length);
}

std::string HiveConnectorSplit::getFileName() const {
auto i = filePath.rfind('/');
return i == std::string::npos ? filePath : filePath.substr(i + 1);
}

folly::dynamic HiveConnectorSplit::serialize() const {
folly::dynamic obj = folly::dynamic::object;
obj["name"] = "HiveConnectorSplit";
obj["connectorId"] = connectorId;
obj["splitWeight"] = splitWeight;
obj["filePath"] = filePath;
obj["fileFormat"] = dwio::common::toString(fileFormat);
obj["start"] = start;
obj["length"] = length;

folly::dynamic partitionKeysObj = folly::dynamic::object;
for (const auto& [key, value] : partitionKeys) {
partitionKeysObj[key] =
value.has_value() ? folly::dynamic(value.value()) : nullptr;
}
obj["partitionKeys"] = partitionKeysObj;

obj["tableBucketNumber"] = tableBucketNumber.has_value()
? folly::dynamic(tableBucketNumber.value())
: nullptr;

folly::dynamic customSplitInfoObj = folly::dynamic::object;
for (const auto& [key, value] : customSplitInfo) {
customSplitInfoObj[key] = value;
}
obj["customSplitInfo"] = customSplitInfoObj;
obj["extraFileInfo"] = *extraFileInfo;

folly::dynamic serdeParametersObj = folly::dynamic::object;
for (const auto& [key, value] : serdeParameters) {
serdeParametersObj[key] = value;
}
obj["serdeParameters"] = serdeParametersObj;

folly::dynamic infoColumnsObj = folly::dynamic::object;
for (const auto& [key, value] : infoColumns) {
infoColumnsObj[key] = value;
}
obj["infoColumns"] = infoColumnsObj;

if (properties.has_value()) {
folly::dynamic propertiesObj = folly::dynamic::object;
propertiesObj["fileSize"] = properties->fileSize.has_value()
? folly::dynamic(properties->fileSize.value())
: nullptr;
propertiesObj["modificationTime"] = properties->modificationTime.has_value()
? folly::dynamic(properties->modificationTime.value())
: nullptr;
obj["properties"] = propertiesObj;
} else {
obj["properties"] = nullptr;
}

return obj;
}

// static
std::shared_ptr<HiveConnectorSplit> HiveConnectorSplit::create(
const folly::dynamic& obj) {
const auto connectorId = obj["connectorId"].asString();
const auto splitWeight = obj["splitWeight"].asInt();
const auto filePath = obj["filePath"].asString();
const auto fileFormat =
dwio::common::toFileFormat(obj["fileFormat"].asString());
const auto start = static_cast<uint64_t>(obj["start"].asInt());
const auto length = static_cast<uint64_t>(obj["length"].asInt());

std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
for (const auto& [key, value] : obj["partitionKeys"].items()) {
partitionKeys[key.asString()] = value.isNull()
? std::nullopt
: std::optional<std::string>(value.asString());
}

const auto tableBucketNumber = obj["tableBucketNumber"].isNull()
? std::nullopt
: std::optional<int32_t>(obj["tableBucketNumber"].asInt());

std::unordered_map<std::string, std::string> customSplitInfo;
for (const auto& [key, value] : obj["customSplitInfo"].items()) {
customSplitInfo[key.asString()] = value.asString();
}

std::shared_ptr<std::string> extraFileInfo =
std::make_shared<std::string>(obj["extraFileInfo"].asString());
std::unordered_map<std::string, std::string> serdeParameters;
for (const auto& [key, value] : obj["serdeParameters"].items()) {
serdeParameters[key.asString()] = value.asString();
}

std::unordered_map<std::string, std::string> infoColumns;
for (const auto& [key, value] : obj["infoColumns"].items()) {
infoColumns[key.asString()] = value.asString();
}

std::optional<FileProperties> properties = std::nullopt;
const auto propertiesObj = obj["properties"];
if (!propertiesObj.isNull()) {
properties = FileProperties{
propertiesObj["fileSize"].isNull()
? std::nullopt
: std::optional(propertiesObj["fileSize"].asInt()),
propertiesObj["modificationTime"].isNull()
? std::nullopt
: std::optional(propertiesObj["modificationTime"].asInt())};
}

return std::make_shared<HiveConnectorSplit>(
connectorId,
filePath,
fileFormat,
start,
length,
partitionKeys,
tableBucketNumber,
customSplitInfo,
extraFileInfo,
serdeParameters,
splitWeight,
infoColumns,
properties);
}

// static
void HiveConnectorSplit::registerSerDe() {
auto& registry = DeserializationRegistryForSharedPtr();
registry.Register("HiveConnectorSplit", HiveConnectorSplit::create);
}
} // namespace facebook::velox::connector::hive
23 changes: 8 additions & 15 deletions velox/connectors/hive/HiveConnectorSplit.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,22 +90,15 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
infoColumns(_infoColumns),
properties(_properties) {}

std::string toString() const override {
if (tableBucketNumber.has_value()) {
return fmt::format(
"Hive: {} {} - {} {}",
filePath,
start,
length,
tableBucketNumber.value());
}
return fmt::format("Hive: {} {} - {}", filePath, start, length);
}
std::string toString() const override;

std::string getFileName() const {
auto i = filePath.rfind('/');
return i == std::string::npos ? filePath : filePath.substr(i + 1);
}
std::string getFileName() const;

folly::dynamic HiveConnectorSplit::serialize() const override;

static std::shared_ptr<HiveConnectorSplit> create(const folly::dynamic& obj);

static void registerSerDe();
};

} // namespace facebook::velox::connector::hive
76 changes: 76 additions & 0 deletions velox/connectors/hive/tests/HiveConnectorSerDeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class HiveConnectorSerDeTest : public exec::test::HiveConnectorTestBase {
HiveInsertTableHandle::registerSerDe();
HiveBucketProperty::registerSerDe();
HiveSortingColumn::registerSerDe();
HiveConnectorSplit::registerSerDe();
}

template <typename T>
Expand All @@ -63,6 +64,44 @@ class HiveConnectorSerDeTest : public exec::test::HiveConnectorTestBase {
ASSERT_TRUE(filter->testingEquals(*cloneFilters.at(subfield)));
}
}

static void testSerde(const HiveConnectorSplit& split) {
const auto str = split.toString();
const auto obj = split.serialize();
const auto clone = ISerializable::deserialize<HiveConnectorSplit>(obj);
ASSERT_EQ(clone->toString(), str);
ASSERT_EQ(split.partitionKeys.size(), clone->partitionKeys.size());
for (const auto& [key, value] : split.partitionKeys) {
ASSERT_EQ(value, clone->partitionKeys.at(key));
}

ASSERT_EQ(split.tableBucketNumber, clone->tableBucketNumber);
ASSERT_EQ(split.customSplitInfo.size(), clone->customSplitInfo.size());
for (const auto& [key, value] : split.customSplitInfo) {
ASSERT_EQ(value, clone->customSplitInfo.at(key));
}

ASSERT_EQ(*split.extraFileInfo, *clone->extraFileInfo);
ASSERT_EQ(split.serdeParameters.size(), clone->serdeParameters.size());
for (const auto& [key, value] : split.serdeParameters) {
ASSERT_EQ(value, clone->serdeParameters.at(key));
}

ASSERT_EQ(split.infoColumns.size(), clone->infoColumns.size());
for (const auto& [key, value] : split.infoColumns) {
ASSERT_EQ(value, clone->infoColumns.at(key));
}

if (split.properties.has_value()) {
ASSERT_TRUE(clone->properties.has_value());
ASSERT_EQ(split.properties->fileSize, clone->properties->fileSize);
ASSERT_EQ(
split.properties->modificationTime,
clone->properties->modificationTime);
} else {
ASSERT_FALSE(clone->properties.has_value());
}
}
};

TEST_F(HiveConnectorSerDeTest, hiveTableHandle) {
Expand Down Expand Up @@ -157,5 +196,42 @@ TEST_F(HiveConnectorSerDeTest, hiveInsertTableHandle) {
testSerde(*hiveInsertTableHandle);
}

TEST_F(HiveConnectorSerDeTest, hiveConnectorSplit) {
const auto connectorId = "testSerde";
constexpr auto splitWeight = 1;
constexpr auto filePath = "/testSerde/p";
constexpr auto fileFormat = dwio::common::FileFormat::DWRF;
constexpr auto start = 0;
constexpr auto length = 1024;
const std::unordered_map<std::string, std::optional<std::string>>
partitionKeys{{"p0", "0"}, {"p1", "1"}};
constexpr auto tableBucketNumber = std::optional<int32_t>(4);
const std::unordered_map<std::string, std::string> customSplitInfo{
{"s0", "0"}, {"s1", "1"}};
const auto extraFileInfo = std::make_shared<std::string>("testSerdeFileInfo");
const std::unordered_map<std::string, std::string> serdeParameters{
{"k1", "1"}, {"k2", "v2"}};
const std::unordered_map<std::string, std::string> infoColumns{
{"c0", "0"}, {"c1", "1"}};
FileProperties fileProperties{
.fileSize = 2048, .modificationTime = std::nullopt};
const auto properties = std::optional<FileProperties>(fileProperties);
const auto split = HiveConnectorSplit(
connectorId,
filePath,
fileFormat,
start,
length,
partitionKeys,
tableBucketNumber,
customSplitInfo,
extraFileInfo,
serdeParameters,
splitWeight,
infoColumns,
std::nullopt);
testSerde(split);
}

} // namespace
} // namespace facebook::velox::connector::hive::test

0 comments on commit 9a73e97

Please sign in to comment.