Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
liujiayi771 committed Apr 2, 2024
1 parent c4dd224 commit c8cd7a2
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <random>

#include "velox/exec/tests/SimpleAggregateFunctionsRegistration.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
Expand All @@ -34,72 +35,60 @@ class CollectListAggregateTest : public AggregationTestBase {
AggregationTestBase::SetUp();
registerAggregateFunctions("spark_");
}

RowVectorPtr fuzzFlat(const RowTypePtr& rowType, size_t size) {
VectorFuzzer::Options options;
options.vectorSize = size;
VectorFuzzer fuzzer(options, pool());
return fuzzer.fuzzInputFlatRow(rowType);
}
};

TEST_F(CollectListAggregateTest, groupBy) {
constexpr int32_t kNumGroups = 10;
std::vector<RowVectorPtr> batches;
batches.push_back(
fuzzFlat(ROW({"c0", "a"}, {INTEGER(), ARRAY(VARCHAR())}), 100));
auto keys = batches[0]->childAt(0)->as<FlatVector<int32_t>>();
auto values = batches[0]->childAt(1)->as<ArrayVector>();
for (auto i = 0; i < keys->size(); ++i) {
if (i % 10 == 0) {
keys->setNull(i, true);
} else {
keys->set(i, i % kNumGroups);
}

if (i % 7 == 0) {
values->setNull(i, true);
}
}

for (auto i = 0; i < 9; ++i) {
batches.push_back(batches[0]);
// Creating 3 batches of input data.
// 0: {0, null} {0, 1} {0, 2}
// 1: {1, 1} {1, null} {1, 3}
// 2: {2, 2} {2, 3} {2, null}
// 3: {3, 3} {3, 4} {3, 5}
// 4: {4, 4} {4, 5} {4, 6}
for (auto i = 0; i < 3; i++) {
RowVectorPtr data = makeRowVector(
{makeFlatVector<int32_t>(
5, [](const vector_size_t& row) { return row; }),
makeFlatVector<int64_t>(
5,
[&i](const vector_size_t& row) { return i + row; },
[&i](const auto& row) { return i == row; })});
batches.push_back(data);
}

createDuckDbTable(batches);
testAggregations(
batches,
{"c0"},
{"spark_collect_list(a)"},
{"spark_collect_list(c1)"},
{"c0", "array_sort(a0)"},
"SELECT c0, array_sort(array_agg(a)"
"filter (where a is not null)) FROM tmp GROUP BY c0");
"SELECT c0, array_sort(array_agg(c1)"
"filter (where c1 is not null)) FROM tmp GROUP BY c0");
testAggregationsWithCompanion(
batches,
[](auto& /*builder*/) {},
{"c0"},
{"spark_collect_list(a)"},
{{ARRAY(VARCHAR())}},
{"spark_collect_list(c1)"},
{{ARRAY(BIGINT())}},
{"c0", "array_sort(a0)"},
"SELECT c0, array_sort(array_agg(a)"
"filter (where a is not null)) FROM tmp GROUP BY c0");
"SELECT c0, array_sort(array_agg(c1)"
"filter (where c1 is not null)) FROM tmp GROUP BY c0");
}

TEST_F(CollectListAggregateTest, global) {
vector_size_t size = 10;
std::vector<RowVectorPtr> vectors = {makeRowVector({makeFlatVector<int32_t>(
size, [](vector_size_t row) { return row * 2; }, nullEvery(3))})};
auto data = makeRowVector({makeNullableFlatVector<int32_t>(
{std::nullopt, 1, 2, std::nullopt, 4, 5})});

createDuckDbTable(vectors);
createDuckDbTable({data});
testAggregations(
vectors,
{data},
{},
{"spark_collect_list(c0)"},
{"array_sort(a0)"},
"SELECT array_sort(array_agg(c0)"
"filter (where c0 is not null)) FROM tmp");
testAggregationsWithCompanion(
vectors,
{data},
[](auto& /*builder*/) {},
{},
{"spark_collect_list(c0)"},
Expand All @@ -111,9 +100,10 @@ TEST_F(CollectListAggregateTest, global) {

TEST_F(CollectListAggregateTest, ignoreNulls) {
auto input = makeRowVector({makeNullableFlatVector<int32_t>(
{1, 2, std::nullopt, 4, std::nullopt, 6}, INTEGER())});
{1, 2, std::nullopt, 4, std::nullopt, 6})});
// Spark will ignore all null values in the input.
auto expected = makeRowVector({makeArrayVector<int32_t>({{1, 2, 4, 6}})});
auto expected =
makeRowVector({makeArrayVectorFromJson<int32_t>({"[1, 2, 4, 6]"})});
testAggregations(
{input}, {}, {"spark_collect_list(c0)"}, {"array_sort(a0)"}, {expected});
testAggregationsWithCompanion(
Expand All @@ -128,11 +118,9 @@ TEST_F(CollectListAggregateTest, ignoreNulls) {
}

TEST_F(CollectListAggregateTest, allNullsInput) {
std::vector<std::optional<int64_t>> allNull(100, std::nullopt);
auto input =
makeRowVector({makeNullableFlatVector<int64_t>(allNull, BIGINT())});
auto input = makeRowVector({makeAllNullFlatVector<int64_t>(100)});
// If all input data is null, Spark will output an empty array.
auto expected = makeRowVector({makeArrayVector<int32_t>({{}})});
auto expected = makeRowVector({makeArrayVectorFromJson<int32_t>({"[]"})});
testAggregations({input}, {}, {"spark_collect_list(c0)"}, {expected});
testAggregationsWithCompanion(
{input},
Expand Down
40 changes: 16 additions & 24 deletions velox/functions/sparksql/fuzzer/SparkAggregationFuzzerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
#include "velox/exec/fuzzer/AggregationFuzzerOptions.h"
#include "velox/exec/fuzzer/AggregationFuzzerRunner.h"
#include "velox/exec/fuzzer/DuckQueryRunner.h"
#include "velox/exec/fuzzer/TransformResultVerifier.h"
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
#include "velox/functions/sparksql/aggregates/Register.h"

DEFINE_int64(
Expand All @@ -50,20 +48,13 @@ int main(int argc, char** argv) {
// experience, and initialize glog and gflags.
folly::Init init(&argc, &argv);

facebook::velox::functions::prestosql::registerInternalFunctions();
facebook::velox::memory::MemoryManager::initialize({});

// TODO: List of the functions that at some point crash or fail and need to
// be fixed before we can enable. Constant argument of bloom_filter_agg cause
// fuzzer test fail.
std::unordered_set<std::string> skipFunctions = {"bloom_filter_agg"};

using facebook::velox::exec::test::TransformResultVerifier;

auto makeArrayVerifier = []() {
return TransformResultVerifier::create("\"$internal$canonicalize\"({})");
};

// The results of the following functions depend on the order of input
// rows. For some functions, the result can be transformed to a value that
// doesn't depend on the order of inputs. If such transformation exists, it
Expand All @@ -80,25 +71,26 @@ int main(int argc, char** argv) {
{"max_by", nullptr},
{"min_by", nullptr},
{"skewness", nullptr},
{"kurtosis", nullptr},
{"collect_list", makeArrayVerifier()}};
{"kurtosis", nullptr}};

size_t initialSeed = FLAGS_seed == 0 ? std::time(nullptr) : FLAGS_seed;
auto duckQueryRunner =
std::make_unique<facebook::velox::exec::test::DuckQueryRunner>();
duckQueryRunner->disableAggregateFunctions({
// https://github.com/facebookincubator/velox/issues/7677
"max_by",
"min_by",
// The skewness functions of Velox and DuckDB use different
// algorithms.
// https://github.com/facebookincubator/velox/issues/4845
"skewness",
// Spark's kurtosis uses Pearson's formula for calculating the kurtosis
// coefficient. Meanwhile, DuckDB employs the sample kurtosis calculation
// formula. The results from the two methods are completely different.
"kurtosis",
});
duckQueryRunner->disableAggregateFunctions(
{// https://github.com/facebookincubator/velox/issues/7677
"max_by",
"min_by",
// The skewness functions of Velox and DuckDB use different
// algorithms.
// https://github.com/facebookincubator/velox/issues/4845
"skewness",
// Spark's kurtosis uses Pearson's formula for calculating the kurtosis
// coefficient. Meanwhile, DuckDB employs the sample kurtosis calculation
// formula. The results from the two methods are completely different.
"kurtosis",
// When all data in a group are null, Spark returns an empty array while
// DuckDB returns null.
"collect_list"});

using Runner = facebook::velox::exec::test::AggregationFuzzerRunner;
using Options = facebook::velox::exec::test::AggregationFuzzerOptions;
Expand Down

0 comments on commit c8cd7a2

Please sign in to comment.