Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abstract over logical and physical plan representations in Ballista #1677

Merged
merged 16 commits into from
Feb 9, 2022

Conversation

thinkharderdev
Copy link
Contributor

@thinkharderdev thinkharderdev commented Jan 25, 2022

Which issue does this PR close?

Partially addresses apache/datafusion-ballista#8

Closes #.

Rationale for this change

This is an initial draft at how we can make Ballista able to leverage the extensibility of DataFusion. This PR in particular addresses a few items in apache/datafusion-ballista#8 :

  1. Abstracts the SchedulerServer over serializable representations of DataFusion LogicalPlans
  2. Adds a trait AsLogicalPlan which can be implemented by a user to shim a custom serialization into existing Ballista functionality.
  3. Adds a trait AsExecutionPlan which can be implemented by a user to shim a custom serialization into existing Ballista functionality.
  4. Makes the serde for LogicalPlan and ExecutionPlan "context aware" which should, in the immediate term help sort out Ballista does not support external file systems  datafusion-ballista#10
  5. Add a shared ExecutionContext to SchedulerServer and Executor to allow for registration of extensions (ObjectStore etc).

This is just an initial draft to demonstrate/validate the basic approach. Ideally we would extend this to:

  1. Create an explicit mechanism for embedding the scheduler/executor into a separate service.

What changes are included in this PR?

Are there any user-facing changes?

This shouldn't break anything using the existing executables as we default to the existing plan representation.

@thinkharderdev thinkharderdev changed the title Abstract over logical plan representation in Ballista Abstract over logical and physical plan representations in Ballista Jan 26, 2022
@thinkharderdev
Copy link
Contributor Author

Added some additional changes:

  1. Abstract over ExecutionPlan representation as well.
  2. Use a global ExecutionContext in SchedulerServer and Executor to allow registration of extensions at startup.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for this proposal @thinkharderdev

While I am not an expert in Ballista and I will defer to others for this PR, I did have a few comments:

Using a trait that handled serializing an entire logical plan would allow use cases like using alternate serializers for the plan (like one could imagine using JSON for example), but I am not sure how it directly helps serializing extension points.

What would you think about something like the following:

/// Describes something that knows how to serialize / deserialize the
/// contents of DataFusion user defined extension points
pub trait ExtensionSerializer {

    /// Serializes a [UserDefinedLogicalNode] into an opaque set of
    /// bytes for transport over the network
    fn serialize_extension_node(node: &dyn UserDefinedLogicalNode) -> Result<Bytes, BallistaError>;

    /// Deserializes a set of bytes created by
    /// [serialize_extension_node] into a new instance of a
    /// [UserDefinedLogicalNode]
    fn deserialize_extension_node(bytes: Bytes) -> Result<Arc<dyn UserDefinedLogicalNode>, BallistaError>;

    // TBD Similar functions for the other extension points
}

struct DefaultExtensionSerializer {
}

impl ExtensionSerializer for DefaultExtensionSerializer {
    fn serialize_extension_node(node: &dyn UserDefinedLogicalNode) -> Result<Bytes> {
        Err(BallistaError::NotImplemented(format!("Default serializer does not know how to serialize {:?}", node)))
    }

    fn deserialize_extension_node(_bytes: Bytes) -> Result<Arc<dyn UserDefinedLogicalNode>> {
        Err(BallistaError::NotImplemented("Default serializer does not know how to deserialize user defined extensions".to_string()))
    }
}

And an instance of &dyn ExtensionSerializer could be passed when serializing plans?

@thinkharderdev
Copy link
Contributor Author

Thank you for this proposal @thinkharderdev

While I am not an expert in Ballista and I will defer to others for this PR, I did have a few comments:

Using a trait that handled serializing an entire logical plan would allow use cases like using alternate serializers for the plan (like one could imagine using JSON for example), but I am not sure how it directly helps serializing extension points.

What would you think about something like the following:

/// Describes something that knows how to serialize / deserialize the
/// contents of DataFusion user defined extension points
pub trait ExtensionSerializer {

    /// Serializes a [UserDefinedLogicalNode] into an opaque set of
    /// bytes for transport over the network
    fn serialize_extension_node(node: &dyn UserDefinedLogicalNode) -> Result<Bytes, BallistaError>;

    /// Deserializes a set of bytes created by
    /// [serialize_extension_node] into a new instance of a
    /// [UserDefinedLogicalNode]
    fn deserialize_extension_node(bytes: Bytes) -> Result<Arc<dyn UserDefinedLogicalNode>, BallistaError>;

    // TBD Similar functions for the other extension points
}

struct DefaultExtensionSerializer {
}

impl ExtensionSerializer for DefaultExtensionSerializer {
    fn serialize_extension_node(node: &dyn UserDefinedLogicalNode) -> Result<Bytes> {
        Err(BallistaError::NotImplemented(format!("Default serializer does not know how to serialize {:?}", node)))
    }

    fn deserialize_extension_node(_bytes: Bytes) -> Result<Arc<dyn UserDefinedLogicalNode>> {
        Err(BallistaError::NotImplemented("Default serializer does not know how to deserialize user defined extensions".to_string()))
    }
}

And an instance of &dyn ExtensionSerializer could be passed when serializing plans?

Yeah, this is what I tried to do originally but it didn't work out very well. The basic problem I was running into is that since we decoding the message recursively from the leaves, as soon as you "break out" of the specific types defined in Ballista, you can no longer use any of the decoding machinery. For example, if I define a logical node like:

struct ReverseAllString {}

impl UserDefinedLogicalNode for ReverseAllString {
  ...
}

I have to encode my ReverseAllString struct but also need to encode all of it's inputs. But I can't define the inputs using the protobuf types from Ballista and still hook into the machinery which converts from my serialized type to an actual LogicalPlan. At least not any way that I'm aware of (short of hand-rolling the binary codec).

@alamb
Copy link
Contributor

alamb commented Jan 26, 2022

What if we changed the signature to something like the following (where the inputs are already provided)?

/// Describes something that knows how to serialize / deserialize the
/// contents of DataFusion user defined extension points
pub trait ExtensionSerializer {

    /// Serializes a [UserDefinedLogicalNode] into an opaque set of
    /// bytes for transport over the network
    fn serialize_extension_node(node: &dyn UserDefinedLogicalNode, inputs: Vec<LogicalNode>) -> Result<Bytes, BallistaError>;

    //....
}

@thinkharderdev
Copy link
Contributor Author

What if we changed the signature to something like the following (where the inputs are already provided)?

/// Describes something that knows how to serialize / deserialize the
/// contents of DataFusion user defined extension points
pub trait ExtensionSerializer {

    /// Serializes a [UserDefinedLogicalNode] into an opaque set of
    /// bytes for transport over the network
    fn serialize_extension_node(node: &dyn UserDefinedLogicalNode, inputs: Vec<LogicalNode>) -> Result<Bytes, BallistaError>;

    //....
}

Something like that could work. You might package it up in something like:

message ExtensionNode {
  bytes extension = 1;
  repeated bytes inputs = 2;
}

to allow you to pull the same trick on the decoding side.

I think this could be complementary to the approach in this PR. We could change the signature for AsLogicalPlan to:

     fn try_into_logical_plan(
        &self,
        ctx: &ExecutionContext,
        extension_serde: Option<Arc<dyn ExtensionSerializer>>
    ) -> Result<LogicalPlan, BallistaError>;

@thinkharderdev thinkharderdev marked this pull request as ready for review February 3, 2022 18:11
@alamb
Copy link
Contributor

alamb commented Feb 3, 2022

6bdfa1f clippy is a cruel tyrant

😆

@thinkharderdev
Copy link
Contributor Author

@alamb Does this approach seem sensible? I think this PR is ready to merge unless we're uncomfortable with the basic approach.

@alamb
Copy link
Contributor

alamb commented Feb 7, 2022

Looks fine with me -- cc @realno @yahoNanJing @Ted-Jiang any thoughts or opinions?

Copy link
Member

@houqp houqp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thinkharderdev there are couple left over comments that I think could be removed?

I am a fan of the proposed extension node message and trait design. I am cool with us merging this design as is since it provides new functionality. But I think if we go with the extension node design, then we won't need to have both the AsLogicalPlan and AsExecutionPlan trait anymore? And we will still be able to encode logical plan and physical plan as a proper typed message field in ballista.proto instead of bytes?

Comment on lines 152 to 154
// (&self.plan)
// .try_into()
// .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be deleted?

Comment on lines 753 to 761
// let mut config_builder = BallistaConfig::builder();
// for kv_pair in &settings {
// config_builder = config_builder.set(&kv_pair.key, &kv_pair.value);
// }
// let config = config_builder.build().map_err(|e| {
// let msg = format!("Could not parse configs: {}", e);
// error!("{}", msg);
// tonic::Status::internal(msg)
// })?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be removed?

error!("{}", msg);
tonic::Status::internal(msg)
})?
// let ctx = create_datafusion_context(&config);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left over comment?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this logic was removed ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was an oversight. I did not think these settings were actually used anywhere but was not accounting for the SQL case. Discussing fixes on #1848

@houqp
Copy link
Member

houqp commented Feb 8, 2022

That said, really cool working demo and thanks for taking a stab at this :)

@thinkharderdev
Copy link
Contributor Author

@houqp Thanks! Removed the commented code.

I like the ExtensionCodec approach as well but I think it would work with the AsLogicalPlan/AsExecutionPlan since it gives us a hook to "inject" the extension codec. So we would end up with something that looks like:

pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
    fn try_decode(buf: &[u8]) -> Result<Self, BallistaError>
    where
        Self: Sized;

    fn try_encode<B>(&self, buf: &mut B) -> Result<(), BallistaError>
    where
        B: BufMut,
        Self: Sized;

    fn try_into_logical_plan(
        &self,
        ctx: &ExecutionContext,
        extension_codec: &dyn LogicalExtensionCodec,
    ) -> Result<LogicalPlan, BallistaError>;

    fn try_from_logical_plan(
        plan: &LogicalPlan,
        extension_codec: &dyn LogicalExtensionCodec,
    ) -> Result<Self, BallistaError>
    where
        Self: Sized;
}

pub trait LogicalExtensionCodec: Debug + Send + Sync {
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &Vec<LogicalPlan>,
    ) -> Result<Extension, BallistaError>;

    fn try_encode(&self, node: Extension, buf: &mut Vec<u8>)
        -> Result<(), BallistaError>;
}

where we extend the protobuf encoding of LogicalPlanNode with a new enum

message ExtensionPlanNode {
  bytes plan = 1;
  repeated LogicalPlanNode inputs = 2;
}

I think this will be useful as well if we ultimately want to migrate to Substrait for a serializable representation of the plans. We can experiment without having to do a "hard cutover".

I can also work on integrating the ExtensionCodec.

@thinkharderdev
Copy link
Contributor Author

@alamb @houqp I figured it would be better to do the whole thing in one shot so I've included the LogicalExtensionCodec`PhysicalExtensionCodec` piece in this PR with the latest commit.

@alamb
Copy link
Contributor

alamb commented Feb 9, 2022

I will plan to merge this later today unless anyone would like to comment further

CC @realno @yahoNanJing @Ted-Jiang @matthewmturner @andygrove as you have expressed interest in Ballista in the past

@realno
Copy link
Contributor

realno commented Feb 9, 2022

LGTM thanks @thinkharderdev @alamb

@matthewmturner
Copy link
Contributor

Good for me. Thank you @thinkharderdev @alamb

@alamb alamb merged commit 1bedaf3 into apache:master Feb 9, 2022
@alamb
Copy link
Contributor

alamb commented Feb 9, 2022

Thanks everyone for their help and review! Thank you @thinkharderdev for your patience

@ZhangqyTJ
Copy link

It seems that registering objectstore cannot be used. Please take a look at this issue.#2136

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants