
Add support for collect_list Spark aggregate function #9231

Closed

Conversation

liujiayi771
Contributor

The semantics of Spark's collect_list and Presto's array_agg are
generally consistent, but there are inconsistencies in the handling of null
values. Spark always ignores null values in the input, whereas Presto has a
parameter that controls whether to retain them. Moreover, Presto returns null
when all inputs are null, while Spark returns an empty array.

Because of these differences, we need to re-implement the array_agg
function for Spark.
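
For illustration only, here is a minimal standalone C++ sketch of the intended null semantics (this is not the Velox implementation; the function name is hypothetical):

    #include <cstdint>
    #include <optional>
    #include <vector>

    // Spark-style collect_list semantics: null inputs are skipped, and an
    // all-null (or empty) input produces an empty array rather than null,
    // unlike Presto's array_agg.
    std::vector<int64_t> collectListSemantics(
        const std::vector<std::optional<int64_t>>& input) {
      std::vector<int64_t> result;
      for (const auto& value : input) {
        if (value.has_value()) {
          result.push_back(*value);
        }
      }
      return result; // Empty when every input value was null.
    }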


@liujiayi771 liujiayi771 force-pushed the spark_array_agg branch 6 times, most recently from 059061f to c428066 on March 29, 2024 10:38
@liujiayi771 liujiayi771 marked this pull request as ready for review March 29, 2024 11:47
@liujiayi771
Contributor Author

@mbasmanova Could you help review?

Contributor

@mbasmanova mbasmanova left a comment

@liujiayi771 Would you add documentation for this function?

Contributor

@mbasmanova mbasmanova left a comment

@liujiayi771 Looks good overall, modulo the comments on the tests.

{"c0", "array_sort(a0)"},
"SELECT c0, array_sort(array_agg(a)"
"filter (where a is not null)) FROM tmp GROUP BY c0");
testAggregationsWithCompanion(
Contributor

Is there any particular reason companion function testing is not included as part of testAggregations? The testAggregationsWithCompanion calls appear too verbose and repetitive.

CC: @kagamiori

Contributor

@liujiayi771 Would you take a look at this comment?

Contributor Author

@mbasmanova I think the likely reason is that some aggregate functions do not register companion functions due to certain restrictions, for example when isResultTypeResolvableGivenIntermediateType is false.

Contributor

What I don't understand is why we need to pass [](auto& /*builder*/) {} and {{BIGINT()}} to testAggregationsWithCompanion, and why we need to call both testAggregations and testAggregationsWithCompanion.

Why can't we just call

testAggregationsWithCompanion(
      batches,
      {"c0"},
      {"spark_collect_list(c1)"},
      {"c0", "array_sort(a0)"},
      "SELECT c0, array_sort(array_agg(c1)"
      "filter (where c1 is not null)) FROM tmp GROUP BY c0");

and have it test both regular functions as well as companion functions.

CC: @kagamiori

Contributor Author

Yes, this is better, and the config parameter is also not necessary. Right now, many tests are calling testAggregations followed by testAggregationsWithCompanion. We need to combine these two test functions.

Contributor

Let's do this refactoring in a follow-up.

{},
{"spark_collect_list(c0)"},
{"array_sort(a0)"},
"SELECT array_sort(array_agg(c0)"
Contributor

For simple cases like this, it might be better to provide expected results:

auto expected = makeRowVector({
    makeArrayVectorFromJson<int32_t>({"[1, 2, 4, 5]"}),
});

Contributor

I still think this would be more readable.

Contributor Author

@mbasmanova Will the result become unstable if we do not use array_sort?

Failed
Expected 1, got 1
1 extra rows, 1 missing rows
1 of extra rows:
	[4,1,5,2]

1 of missing rows:
	[1,2,4,5]

Contributor Author

Or can I assume that the output will remain stable as [4,1,5,2]?

Contributor

I think we can still use array_sort to ensure results are stable:

  testAggregations(
      {data},
      {},
      {"spark_collect_list(c0)"},
      {"array_sort(a0)"},
      {expected});
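
Putting the two suggestions together, the test could look roughly like this (a sketch only, assuming the AggregationTestBase fixture; the input data below is hypothetical):

  auto data = makeRowVector({
      makeNullableFlatVector<int32_t>({4, std::nullopt, 1, 5, std::nullopt, 2}),
  });

  auto expected = makeRowVector({
      makeArrayVectorFromJson<int32_t>({"[1, 2, 4, 5]"}),
  });

  testAggregations(
      {data},
      {},
      {"spark_collect_list(c0)"},
      {"array_sort(a0)"},
      {expected});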

@@ -88,6 +88,9 @@ int main(int argc, char** argv) {
// coefficient. Meanwhile, DuckDB employs the sample kurtosis calculation
// formula. The results from the two methods are completely different.
"kurtosis",
// When all data in a group are null, Spark returns an empty array while
// DuckDB returns null.
"collect_list",
Contributor Author

@mbasmanova I think this function should not be compared with DuckDB. If the fuzzer generates a group where all the data is null, DuckDB's result will be null, while Spark will return an empty array.

Contributor

I agree. We need to change Fuzzer to verify results against Spark, not DuckDB: #9270

@liujiayi771
Contributor Author

@mbasmanova Addressed the comments for the tests.

velox/docs/functions/spark/aggregate.rst (outdated review comment, resolved)
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <random>
Contributor

Are all these includes needed? It looks like some can be removed.

Contributor Author

I have cleaned up the #include and using namespace statements.

{"c0", "array_sort(a0)"},
"SELECT c0, array_sort(array_agg(a)"
"filter (where a is not null)) FROM tmp GROUP BY c0");
testAggregationsWithCompanion(
Contributor

@liujiayi771 Would you take a look at this comment?

{},
{"spark_collect_list(c0)"},
{"array_sort(a0)"},
"SELECT array_sort(array_agg(c0)"
Contributor

I still think this would be more readable.

Contributor

@mbasmanova mbasmanova left a comment

Thanks.

@facebook-github-bot
Contributor

@mbasmanova has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@@ -34,6 +35,7 @@ target_link_libraries(
velox_functions_aggregates_test_lib
velox_functions_spark_aggregates
velox_hive_connector
velox_vector_fuzzer
Contributor Author

@liujiayi771 liujiayi771 Apr 2, 2024

@mbasmanova I noticed a leftover dependency here that should have been removed; I have removed it. Could you re-import?

@facebook-github-bot
Contributor

@mbasmanova has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@facebook-github-bot
Contributor

@mbasmanova merged this pull request in 1ba16a9.


Conbench analyzed the 1 benchmark run on commit 1ba16a96.

There were no benchmark performance regressions. 🎉

The full Conbench report has more details.

Joe-Abraham pushed a commit to Joe-Abraham/velox that referenced this pull request Apr 4, 2024
…tor#9231)

Summary:
The semantics of Spark's `collect_list` and Presto's `array_agg` are
generally consistent, but there are inconsistencies in the handling of null
values. Spark always ignores null values in the input, whereas Presto has a
parameter that controls whether to retain them. Moreover, Presto returns null
when all inputs are null, while Spark returns an empty array.

Because of these differences, we need to re-implement the `array_agg`
function for Spark.

Pull Request resolved: facebookincubator#9231

Reviewed By: xiaoxmeng

Differential Revision: D55639676

Pulled By: mbasmanova

fbshipit-source-id: 958471779a1fa66dba27569a6c12538ad5489f46
facebook-github-bot pushed a commit that referenced this pull request Apr 4, 2024
#9361)

Summary:
In #9231, `collect_list` was added to the disabled list of `duckQueryRunner`.
However, this is unnecessary: DuckDB does not have an aggregate function named
`collect_list`, so it would never be compared against DuckDB and the setting is
redundant.

In addition, the result verifier for `collect_list` was set to `nullptr`, so its
results are not verified. Instead, we can reuse the custom array verifier from
Presto's `array_agg` to verify its results.

Pull Request resolved: #9361

Reviewed By: xiaoxmeng

Differential Revision: D55744044

Pulled By: mbasmanova

fbshipit-source-id: a1a94c58b2a01463261775d8b6e08b65fd986d29