Skip to content

Commit

Permalink
Move ValueList to functions/lib/aggregates for Spark reuse (#9213)
Browse files Browse the repository at this point in the history
Summary:
The array_agg functions in Spark and Presto handle null values differently,
so Spark's array_agg needs its own implementation in Velox.
To be able to reuse `ValueList` there, it first needs to be moved to the
`functions/lib/aggregates` directory.

The `copyValueListToArrayWriter` function has also been moved into the
`ValueList` header, making it reusable.

Pull Request resolved: #9213

Reviewed By: Yuhta

Differential Revision: D55406723

Pulled By: pedroerp

fbshipit-source-id: 7447cfd12f18d9ca4c474579da6a55a876f97ec1
  • Loading branch information
liujiayi771 authored and facebook-github-bot committed Mar 28, 2024
1 parent f240905 commit 397bc05
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 31 deletions.
22 changes: 1 addition & 21 deletions velox/exec/tests/SimpleArrayAggAggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,13 @@
#include "velox/exec/SimpleAggregateAdapter.h"
#include "velox/expression/FunctionSignature.h"
#include "velox/expression/VectorWriters.h"
#include "velox/functions/prestosql/aggregates/ValueList.h"
#include "velox/functions/lib/aggregates/ValueList.h"

using namespace facebook::velox::exec;

namespace facebook::velox::aggregate {

namespace {

// Writes the contents of a ValueList accumulator into an Array-typed
// intermediate or final result vector through `writer`.
//
// The writer's length is always reset first; when `elements` is empty nothing
// else is written.
// TODO: This API only works if it is the only logic writing to `writer`.
template <typename T>
void copyValueListToArrayWriter(ArrayWriter<T>& writer, ValueList& elements) {
  writer.resetLength();
  auto numValues = elements.size();
  if (numValues != 0) {
    // Reserve room for all values before reading them into the writer's
    // elements vector.
    writer.reserve(numValues);

    ValueListReader valueReader(elements);
    vector_size_t index = 0;
    while (index < numValues) {
      // Each value is placed at the writer's current values offset plus its
      // position within the list.
      valueReader.next(*writer.elementsVector(), writer.valuesOffset() + index);
      ++index;
    }
    writer.resize(numValues);
  }
}

class ArrayAggAggregate {
public:
// Type(s) of input vector(s) wrapped in Row.
Expand Down
2 changes: 1 addition & 1 deletion velox/functions/lib/aggregates/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
add_library(
velox_functions_aggregates
AverageAggregateBase.cpp CentralMomentsAggregatesBase.cpp
SingleValueAccumulator.cpp ValueSet.cpp)
SingleValueAccumulator.cpp ValueList.cpp ValueSet.cpp)

target_link_libraries(velox_functions_aggregates velox_exec
velox_presto_serializer Folly::folly)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/functions/prestosql/aggregates/ValueList.h"
#include "velox/functions/lib/aggregates/ValueList.h"
#include "velox/exec/ContainerRowSerde.h"

namespace facebook::velox::aggregate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "velox/common/memory/HashStringAllocator.h"
#include "velox/exec/Aggregate.h"
#include "velox/expression/ComplexViewTypes.h"
#include "velox/expression/ComplexWriterTypes.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/DecodedVector.h"

Expand Down Expand Up @@ -132,4 +133,25 @@ class ValueListReader {
vector_size_t pos_{0};
};

// Writes the contents of a ValueList accumulator into an Array-typed
// intermediate or final result vector through `writer`.
//
// The writer's length is always reset first; when `elements` is empty nothing
// else is written.
// TODO: This API only works if it is the only logic writing to `writer`.
template <typename T>
void copyValueListToArrayWriter(
    facebook::velox::exec::ArrayWriter<T>& writer,
    ValueList& elements) {
  writer.resetLength();
  auto numValues = elements.size();
  if (numValues != 0) {
    // Reserve room for all values before reading them into the writer's
    // elements vector.
    writer.reserve(numValues);

    ValueListReader valueReader(elements);
    vector_size_t index = 0;
    while (index < numValues) {
      // Each value is placed at the writer's current values offset plus its
      // position within the list.
      valueReader.next(*writer.elementsVector(), writer.valuesOffset() + index);
      ++index;
    }
    writer.resize(numValues);
  }
}

} // namespace facebook::velox::aggregate
14 changes: 14 additions & 0 deletions velox/functions/lib/aggregates/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,17 @@
# limitations under the License.

add_subdirectory(utils)

# Test executable for the shared aggregate helpers in this directory's parent
# library; it is built from ValueListTest.cpp only.
add_executable(velox_functions_aggregates_test ValueListTest.cpp)

# Register the executable with CTest under the same name as the target.
add_test(NAME velox_functions_aggregates_test
COMMAND velox_functions_aggregates_test)

# Link the test against velox_functions_aggregates (the library that compiles
# ValueList.cpp), velox_functions_test_lib, and the Folly / gtest / glog
# libraries. gtest_main is linked as well.
target_link_libraries(
velox_functions_aggregates_test
velox_functions_aggregates
velox_functions_test_lib
Folly::folly
gtest
gtest_main
glog::glog)
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/functions/prestosql/aggregates/ValueList.h"
#include "velox/functions/lib/aggregates/ValueList.h"
#include <gtest/gtest.h>
#include "velox/functions/prestosql/tests/utils/FunctionBaseTest.h"
#include "velox/vector/tests/utils/VectorMaker.h"

using namespace facebook::velox;
using namespace facebook::velox::test;
Expand Down
3 changes: 1 addition & 2 deletions velox/functions/prestosql/aggregates/ArrayAggAggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
*/
#include "velox/exec/ContainerRowSerde.h"
#include "velox/expression/FunctionSignature.h"
#include "velox/functions/lib/aggregates/ValueList.h"
#include "velox/functions/prestosql/aggregates/AggregateNames.h"
#include "velox/functions/prestosql/aggregates/ValueList.h"
#include "velox/vector/ComplexVector.h"

namespace facebook::velox::aggregate::prestosql {
namespace {
Expand Down
1 change: 0 additions & 1 deletion velox/functions/prestosql/aggregates/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ add_library(
SetAggregates.cpp
SumAggregate.cpp
SumDataSizeForStatsAggregate.cpp
ValueList.cpp
VarianceAggregates.cpp)

target_link_libraries(
Expand Down
2 changes: 1 addition & 1 deletion velox/functions/prestosql/aggregates/MapAccumulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "velox/common/memory/HashStringAllocator.h"
#include "velox/exec/AddressableNonNullValueList.h"
#include "velox/exec/Strings.h"
#include "velox/functions/prestosql/aggregates/ValueList.h"
#include "velox/functions/lib/aggregates/ValueList.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/DecodedVector.h"
#include "velox/vector/FlatVector.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
#include "velox/exec/AddressableNonNullValueList.h"
#include "velox/exec/Aggregate.h"
#include "velox/exec/Strings.h"
#include "velox/functions/lib/aggregates/ValueList.h"
#include "velox/functions/prestosql/aggregates/AggregateNames.h"
#include "velox/functions/prestosql/aggregates/ValueList.h"
#include "velox/vector/FlatVector.h"

namespace facebook::velox::aggregate::prestosql {
Expand Down
1 change: 0 additions & 1 deletion velox/functions/prestosql/aggregates/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ add_executable(
SetUnionTest.cpp
SumDataSizeForStatsTest.cpp
SumTest.cpp
ValueListTest.cpp
VarianceAggregationTest.cpp)

add_test(
Expand Down

0 comments on commit 397bc05

Please sign in to comment.