feat(spark): add support for FetchRel offset field #296

Open · wants to merge 1 commit into base: main

Conversation

andrew-coleman
Contributor

Add missing support for the ‘offset’ clause in the spark module.

    if (fetch.getCount.isPresent) {
      val limit = Literal(fetch.getCount.getAsLong.intValue(), IntegerType)
      fetch.getOffset match {
        case 1L => GlobalLimit(limitExpr = limit, child = child)
Contributor

fwiw this is just misusing the field, but I know it was there already so not a problem for this PR 😅

Member

If I understand this, you're using offset 1 or -1 as a switching field? What's needed to not do this? At the very least, could we use something like -2 to signal this, to avoid using a valid offset value as a magic number?

Member

@Blizzara Do you know if there's anything outside of substrait-spark that's relying on this magic number, or is it something that we can change?

Contributor

I'm not aware of anything else; I'd be happy to see it changed! (I had already changed it in our internal fork of substrait-spark, but the code of this PR is nicer than mine 🎉)

@vbarua changed the title from "feat(spark): add support for ‘offset’ clause" to "feat(spark): add support for FetchRel offset field" on Sep 27, 2024
    fetch.getOffset match {
      case 1L => GlobalLimit(limitExpr = limit, child = child)
      case -1L => LocalLimit(limitExpr = limit, child = child)
      case _ => visitFallback(fetch)
Member

Can Spark not handle the case where both the offset and count are set?

Contributor

OFFSET is supported as of Spark 3.4.

Contributor Author

> Can Spark not handle the case where both the offset and count are set?

Yes, the unit test in this PR sets both limit and offset. Spark handles limit by breaking it into two logical relations, LocalLimit and GlobalLimit (there's an explanation here).

In the case of the test query in this PR:

    select l_partkey from lineitem where l_shipdate < date '1998-01-01'
    order by l_shipdate asc, l_discount desc limit 100 offset 1000

the Spark logical plan is:

    GlobalLimit 100
    +- Offset 1000
       +- LocalLimit 1100
          +- Project [l_partkey#7997L]
             +- Sort [l_shipdate#8006 ASC NULLS FIRST, l_discount#8002 DESC NULLS LAST], true
                +- Project [l_partkey#7997L, l_shipdate#8006, l_discount#8002]
                   +- Filter (isnotnull(l_shipdate#8006) AND (l_shipdate#8006 < 1998-01-01))
                      +- Relation ...

I'm not sure how this would best translate into a Substrait plan.

The handling of the limit clause was already in the codebase as inherited from Gluten. As @Blizzara noted, this PR adds the offset. But I can understand that this whole area needs rethinking.

Member

Based on what you've indicated, is converting FetchRel(count=X, offset=Y, <input>) into:

    GlobalLimit X
    +- Offset Y
       +- LocalLimit X + Y
          +- <converted input>

viable?

Generally speaking, Substrait consumers should be written in such a way that they accept all valid Substrait plans, even those not built specifically by or for Spark in this case. It's perfectly valid to set both fields of the FetchRel, even if Spark doesn't.
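
A minimal sketch of that mapping, assuming Spark 3.4+ (which provides the Offset logical operator) and the fetch/child values already in scope in the converter; this is illustrative, not the PR's final implementation:

    // Sketch only: FetchRel(count = X, offset = Y) -> Spark logical plan.
    // Assumes Spark >= 3.4 for the Offset operator.
    if (fetch.getCount.isPresent) {
      val count = fetch.getCount.getAsLong.toInt
      val offset = fetch.getOffset.toInt
      // LocalLimit(X + Y) bounds each partition, Offset(Y) skips Y rows,
      // and GlobalLimit(X) caps the final result at X rows.
      GlobalLimit(
        limitExpr = Literal(count, IntegerType),
        child = Offset(
          offsetExpr = Literal(offset, IntegerType),
          child = LocalLimit(
            limitExpr = Literal(count + offset, IntegerType),
            child = child)))
    }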

Contributor Author

> viable?

Could be... I'll do some more testing to get a better understanding of this.

> Generally speaking, ...

Completely agree. One of the weaknesses of the current test suite is that it relies entirely on round-tripping the query plans. I need to improve on this somehow.

Member

From reading the Spark PRs, here is my understanding.

When converting from Substrait -> Spark:

Fetch(limit>=0, offset>0) => LocalLimit -> GlobalLimitAndOffset
Fetch(limit<0, offset>0) => Offset
Fetch(limit>=0, offset==0) => LocalLimit -> GlobalLimit

When converting from Spark -> Substrait then the following logic should work:

if current_node == LocalLimit:
  if next_node == GlobalLimitAndOffset:
    if current_node.limit < next_node.limit:
      raise Exception("this plan cannot be converted into Substrait")
    else:
      yield Fetch(limit=next_node.limit, offset=next_node.offset)
      skip(next_node)
  elif next_node == GlobalLimit:
    if current_node.limit < next_node.limit:
      raise Exception("this plan cannot be converted into Substrait")
    else:
      yield Fetch(limit=next_node.limit, offset=0)
      skip(next_node)
  else:
    raise Exception("this plan cannot be converted into Substrait")

# This cannot have been preceded by LocalLimit because we would have
# skipped it above so it is just a global limit
if current_node == GlobalLimit:
  yield Fetch(limit=current_node.limit, offset=0)

if current_node == Offset:
  yield Fetch(limit=-1, offset=current_node.offset)

if current_node == GlobalLimitAndOffset:
  yield Fetch(limit=current_node.limit, offset=current_node.offset)

The above logic should always round trip successfully (unless I made a mistake in my understanding).
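
A hedged Scala sketch of that logic, written against Spark's actual logical operators (GlobalLimit, Offset, LocalLimit, plus the IntegerLiteral extractor) rather than the conceptual node names above; fetchRel(offset, count, input) and visit are hypothetical stand-ins for the converter's builders, not the PR's real code:

    // Sketch only: collapse Spark limit/offset chains into one FetchRel.
    def convertLimit(plan: LogicalPlan): relation.Rel = plan match {
      // GlobalLimit X over Offset Y over LocalLimit (X + Y): limit with offset
      case GlobalLimit(IntegerLiteral(limit),
          Offset(IntegerLiteral(offset), LocalLimit(_, child))) =>
        fetchRel(offset, limit, visit(child))
      // GlobalLimit X over LocalLimit X: plain limit, offset 0
      case GlobalLimit(IntegerLiteral(limit), LocalLimit(_, child)) =>
        fetchRel(0, limit, visit(child))
      // Offset Y alone: offset with no limit (count -1 = all remaining rows)
      case Offset(IntegerLiteral(offset), child) =>
        fetchRel(offset, -1, visit(child))
      case other =>
        throw new UnsupportedOperationException(
          s"cannot convert $other into a Substrait FetchRel")
    }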

The case where there is either a LocalLimit alone, or a LocalLimit followed by a GlobalLimit or GlobalLimitAndOffset where the LocalLimit's limit is smaller, cannot be supported today.

This plan implements a per-group limit and the closest equivalent Substrait plan would be something like...

┌──────┐  ┌───────────────────────────┐  ┌───────┐          ┌───────────┐
│ SCAN │  │ FILTER (partition_id = 1) │  │ LIMIT │ ────┬────│ UNION ALL │
└──────┘  └───────────────────────────┘  └───────┘     │    └───────────┘
                                                       │                 
                                                       │                 
┌──────┐  ┌───────────────────────────┐  ┌───────┐     │                 
│ SCAN │  │ FILTER (partition_id = 2) │  │ LIMIT │ ────┼                 
└──────┘  └───────────────────────────┘  └───────┘     │                 
                                                       │                 
                                                       │                 
                                                       │                 
                  ...                                  │                 
                                                       │                 
                                                       │                 
                                                       │                 
┌──────┐  ┌───────────────────────────┐  ┌───────┐     │                 
│ SCAN │  │ FILTER (partition_id = N) │  │ LIMIT │ ────┘                 
└──────┘  └───────────────────────────┘  └───────┘                       

Contributor Author

Thank you @westonpace, that's really helpful. I think I've captured that in the amended commit, and added a couple of extra tests.

Add missing support for the ‘offset’ clause in the spark module.

Signed-off-by: Andrew Coleman <andrew_coleman@uk.ibm.com>
@andrew-coleman
Contributor Author

@vbarua I believe I have addressed all the comments and this is ready for re-review. Thanks!
