Using the library in Databricks environment #8

Closed
MrPowers opened this issue Mar 13, 2021 · 25 comments · Fixed by #9

Comments

@MrPowers
Collaborator

I did a bit of experimentation and it looks like it's tricky to use this lib in Databricks.

Any way we can provide an interface that doesn't require the user to set a configuration option?

Perhaps we can let the user run an import statement like import org.apache.spark.sql.itachi.postgres._ to get all the functions? The function registration process is still a little fuzzy for me. Let me know if you think this would be possible!
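
For concreteness, here is a hypothetical sketch of what that import-based interface could look like (package, object, and constructor details are made up for this example and assume ArrayAppend is a two-child Catalyst expression; this is not itachi's actual API):

package org.apache.spark.sql.itachi

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.postgresql.ArrayAppend

package object postgres {
  // expose the Catalyst expression as a regular DataFrame/Column function
  def array_append(arr: Column, elem: Column): Column =
    new Column(ArrayAppend(arr.expr, elem.expr))
}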

@yaooqinn
Owner

SGTM to have Scala/Java API support for Itachi

@MrPowers
Collaborator Author

@yaooqinn - yea, we should consider Scala/Java APIs, but I'm specifically referring to how we can get the current SQL functions working in a Databricks notebook. For example, how can I get select array_cat(arr1, arr2) as both_arrays from some_data to run there?

@yaooqinn
Owner

I have no idea how Databricks notebooks work.

But I guess it is the same as adding Delta Lake, since we both use SparkSessionExtensions. FYI, https://docs.delta.io/latest/delta-update.html#configure-sparksession&language-scala.
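
For reference, the Delta-style setup from that link, adapted to itachi, would look roughly like this (a sketch; it assumes the itachi jar is already on the classpath, e.g. via --packages or spark.jars):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("itachi-demo")
  // the extension has to be configured before the session is created
  .config("spark.sql.extensions", "org.apache.spark.sql.extra.PostgreSQLExtensions")
  .getOrCreate()

spark.sql("select array_cat(array(1, 2), array(3, 4))").show()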

Also cc @cloud-fan @HyukjinKwon

@MrPowers
Collaborator Author

@yaooqinn - I'll try to figure it out. You can sign up for a Databricks Community account if you'd like to do experimentation on your end (it's free).

We'll need to spell out all the steps clearly for Databricks users because that'll be the most common runtime for this lib. Thanks!

@yaooqinn
Owner

Looks nice! I will try it later

@HyukjinKwon

Hm, it's just jars, right? I think it should be the same as regular Spark. For example, setting spark.jars to a standard Maven coordinate like com.github.yaooqinn:itachi:0.1.0.

Or I believe Databricks has a UI for that:

[screenshot: Databricks UI for installing a library on the cluster]

With Spark 3.2 (DBR 9), users should be able to add them at runtime via ADD JAR with the Ivy scheme; see also apache/spark#29966.
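
For example, something along these lines should work on Spark 3.2+ (a sketch; it only fetches the jar at runtime, the extension still has to be wired into the session separately):

// Ivy scheme support for ADD JAR comes from apache/spark#29966 (Spark 3.2+)
spark.sql("ADD JAR ivy://com.github.yaooqinn:itachi_2.12:0.1.0")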

@MrPowers
Collaborator Author

@yaooqinn @HyukjinKwon - thanks for the responses. This post provides more context.

I started up a Databricks community cluster with the spark.sql.extensions=org.apache.spark.sql.extra.PostgreSQLExtensions configuration option set.

[screenshot: cluster Spark config with spark.sql.extensions=org.apache.spark.sql.extra.PostgreSQLExtensions]

Then I attached the library.

[screenshot: the itachi library attached to the cluster]

The array_append function that's defined in itachi isn't accessible like I expected it to be:

[screenshot: notebook cell where the array_append query fails]

new org.apache.spark.sql.extra.PostgreSQLExtensions().apply(spark.extensions) didn't work either.

[screenshot: notebook cell where the manual injection attempt fails]

Here's how we're currently injecting the functions:

class PostgreSQLExtensions extends Extensions {
  override def apply(ext: SparkSessionExtensions): Unit = {
    ext.injectFunction(Age.fd)
    ext.injectFunction(ArrayAppend.fd)
    ext.injectFunction(ArrayLength.fd)
    ext.injectFunction(FunctionAliases.array_cat)
    ext.injectFunction(IntervalJustifyLike.justifyDays)
    ext.injectFunction(IntervalJustifyLike.justifyHours)
    ext.injectFunction(IntervalJustifyLike.justifyInterval)
    ext.injectFunction(Scale.fd)
    ext.injectFunction(SplitPart.fd)
    ext.injectFunction(StringToArray.fd)
    ext.injectFunction(UnNest.fd)
  }
}

Per the SparkSessionExtensions docs, perhaps we need to do something like this?

class MyExtensions extends Function1[SparkSessionExtensions, Unit] {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectResolutionRule { session =>
      ...
    }
    extensions.injectParser { (session, parser) =>
      ...
    }
  }
}

Thanks for the help!

@yaooqinn
Owner

Can we run SET spark.sql.extensions to check whether itachi is in it?
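
For example, from a notebook cell:

spark.sql("SET spark.sql.extensions").show(truncate = false)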

@yaooqinn
Owner

I tested it locally with the age function, and it seems fine:

bin/spark-sql --packages com.github.yaooqinn:itachi_2.12:0.1.0 --conf spark.sql.extensions=org.apache.spark.sql.extra.PostgreSQLExtensions
spark-sql> select age(timestamp '2000', timestamp'1990')
         > ;
10 years
Time taken: 0.043 seconds, Fetched 1 row(s)
spark-sql>

@yaooqinn
Owner

yaooqinn commented Mar 17, 2021

spark-shell also works fine

scala> spark.sql("select age(timestamp '2000', timestamp'1990')").show
+---------------------------------------------------------------------+
|age(TIMESTAMP '2000-01-01 00:00:00', TIMESTAMP '1990-01-01 00:00:00')|
+---------------------------------------------------------------------+
|                                                             10 years|
+---------------------------------------------------------------------+

scala> spark.sql("select array_append(array(1, 2, 3), 4)").show
+-------------------------------+
|array_append(array(1, 2, 3), 4)|
+-------------------------------+
|                   [1, 2, 3, 4]|
+-------------------------------+
scala>

@MrPowers
Collaborator Author

@yaooqinn - Ran SET spark.sql.extensions to confirm that the extension is added (it's still not working):

[screenshot: output of SET spark.sql.extensions confirming the setting]

Databricks requires that you set the configuration options before starting the cluster and then attach the JAR file once the cluster is running. That's probably what's causing the error.

Sim created another way of registering Spark SQL functions that works in Databricks. We might have to use that or another approach.

We'll have to get this figured out because a lot of the people that will want to use Itachi are on Databricks!

@yaooqinn
Owner

Databricks requires that you set the configuration options before starting the cluster and then attach the JAR file once the cluster is running. That's probably what's causing the error.

This is the same as what I did for the local test above.
What does "the cluster" actually mean here? The standalone cluster with master/workers, or the app with driver/executors?
However, I guess Databricks must support adding external jars and confs together before startup, whatever the cluster is.

@MrPowers
Collaborator Author

@yaooqinn - I asked this question on StackOverflow and found two approaches that work on Databricks.

Option 1

import org.apache.spark.sql.catalyst.expressions.postgresql.{Age, ArrayAppend, ArrayLength, IntervalJustifyLike, Scale, SplitPart, StringToArray, UnNest}
import org.apache.spark.sql.extra.FunctionAliases

spark.sessionState.functionRegistry.registerFunction(Age.fd._1, Age.fd._2, Age.fd._3)
spark.sessionState.functionRegistry.registerFunction(FunctionAliases.array_cat._1, FunctionAliases.array_cat._2, FunctionAliases.array_cat._3)
spark.sessionState.functionRegistry.registerFunction(ArrayAppend.fd._1, ArrayAppend.fd._2, ArrayAppend.fd._3)
spark.sessionState.functionRegistry.registerFunction(ArrayLength.fd._1, ArrayLength.fd._2, ArrayLength.fd._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyDays._1, IntervalJustifyLike.justifyDays._2, IntervalJustifyLike.justifyDays._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyHours._1, IntervalJustifyLike.justifyHours._2, IntervalJustifyLike.justifyHours._3)
spark.sessionState.functionRegistry.registerFunction(IntervalJustifyLike.justifyInterval._1, IntervalJustifyLike.justifyInterval._2, IntervalJustifyLike.justifyInterval._3)
spark.sessionState.functionRegistry.registerFunction(Scale.fd._1, Scale.fd._2, Scale.fd._3)
spark.sessionState.functionRegistry.registerFunction(SplitPart.fd._1, SplitPart.fd._2, SplitPart.fd._3)
spark.sessionState.functionRegistry.registerFunction(StringToArray.fd._1, StringToArray.fd._2, StringToArray.fd._3)
spark.sessionState.functionRegistry.registerFunction(UnNest.fd._1, UnNest.fd._2, UnNest.fd._3)

Option 2

Cluster node initialization scripts

I haven't gotten this one working yet, but will keep you posted on my progress. Not sure what needs to be included in the cluster init script.

It's possible Option 1 will be better and we can just expose a wrapper function for itachi users to run that code. We should probably add a new function in Spark that makes it easier to register functions. This seems overly complicated.
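
As a sketch of what such a wrapper could look like (the object and method names are hypothetical, not an existing itachi API), it would just fold the registerFunction boilerplate from Option 1 into one call:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.postgresql._
import org.apache.spark.sql.extra.FunctionAliases

object PostgresFunctionRegistration {
  // register every itachi Postgres function against an existing session
  def register(spark: SparkSession): Unit = {
    val registry = spark.sessionState.functionRegistry
    Seq(
      Age.fd, ArrayAppend.fd, ArrayLength.fd, FunctionAliases.array_cat,
      IntervalJustifyLike.justifyDays, IntervalJustifyLike.justifyHours,
      IntervalJustifyLike.justifyInterval, Scale.fd, SplitPart.fd,
      StringToArray.fd, UnNest.fd
    ).foreach { case (name, info, builder) =>
      registry.registerFunction(name, info, builder)
    }
  }
}

// in a notebook, after attaching the itachi jar:
// PostgresFunctionRegistration.register(spark)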

@yaooqinn
Owner

It looks like a limitation of the Databricks runtime product to me 😁. It makes things complicated for pure SQL users with third-party libs. IIUC, spark.sql.extensions basically cannot work with Databricks runtime CE, no matter what the extension is: FunctionInterface, ParserInterface, AnalyzerInterface.

Maybe there is another option, if Databricks supports it (a rough sketch follows the list):

  1. Create an instance of SparkContext, sc
  2. Add the itachi jar with sc.addJar("...")
  3. Create a SparkSession with the extensions and the existing sc
  4. Run queries with spark
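
A rough sketch of that sequence, assuming the environment lets us build the context ourselves (which a Databricks notebook may not) and using a hypothetical local path for the jar:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

val sc = new SparkContext(new SparkConf().setAppName("itachi-test").setMaster("local[*]"))
// ships the jar to executors, but note it does not change the driver's classpath
sc.addJar("/path/to/itachi_2.12-0.1.0.jar") // hypothetical path

val spark = SparkSession.builder()
  .config("spark.sql.extensions", "org.apache.spark.sql.extra.PostgreSQLExtensions")
  .getOrCreate() // reuses the existing SparkContext

spark.sql("select array_append(array(1, 2, 3), 4)").show()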

@cloud-fan

IIUC, spark.sql.extensions basically cannot work with Databricks runtime CE

@hvanhovell Is this true? It doesn't seem like a reasonable limitation if ACL is not enabled. cc @liancheng as well.

@hvanhovell

@cloud-fan this is correct. We initialize the session slightly differently, and as a result we load jars after the session has been created. We have an internal ticket to get this fixed (no ETA). The preferred way of doing this is to use init scripts (Option 2).

@cloud-fan

One idea is to extend the spark.sql.extensions config with resource files, similar to how we load DataSourceRegister. Then we can include the resource file in the jar and no need to set the config. @yaooqinn do you have interest in working on it?
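
To make that concrete, here is an illustrative sketch of the mechanism by analogy with DataSourceRegister discovery via ServiceLoader (the provider trait name is illustrative, not a committed Spark API):

import java.util.ServiceLoader
import org.apache.spark.sql.SparkSessionExtensions

// Illustrative provider interface; a library jar would name its implementation in
//   META-INF/services/<fully.qualified.name.of.this.trait>
// e.g. listing org.apache.spark.sql.extra.PostgreSQLExtensions there.
trait SparkSessionExtensionsProvider extends (SparkSessionExtensions => Unit)

// Illustrative loader: Spark would apply every provider found on the classpath
// at session creation, so end-users never have to set spark.sql.extensions.
def applyDefaultExtensions(extensions: SparkSessionExtensions): Unit =
  ServiceLoader.load(classOf[SparkSessionExtensionsProvider])
    .forEach(p => p(extensions))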

@MrPowers MrPowers reopened this May 2, 2021
@MrPowers
Collaborator Author

MrPowers commented May 2, 2021

@cloud-fan - Thanks for commenting. It'd be even better if we could register functions after a cluster is already started. The spark-alchemy project shows how this is possible. Here's the syntax.

import com.swoop.alchemy.spark.expressions.NativeFunctionRegistration
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
import org.apache.spark.sql.catalyst.expressions.postgresql.Age // from itachi
object ItachiFunctionRegistration extends NativeFunctionRegistration {
  val expressions: Map[String, (ExpressionInfo, FunctionBuilder)] = Map(
    expression[Age]("age")
  )
}
ItachiFunctionRegistration.registerFunctions(spark)

This is a bit more elegant than the syntax @alexott proposed here:

import org.apache.spark.sql.catalyst.expressions.postgresql.Age
import org.apache.spark.sql.extra.FunctionAliases

spark.sessionState.functionRegistry.registerFunction(Age.fd._1, Age.fd._2, Age.fd._3)

Itachi is an amazing project and I think it'll be really important for getting more users to Spark. I am going to suggest to the Databricks product team that they should add an "Enable Presto Syntax" button that'd give users the ability to load the relevant Itachi functions with the click of a button. That'd let users copy queries from AWS Athena / Postgres and run them in Databricks - I know a lot of users want this functionality.

I'm confident that the @yaooqinn + @cloud-fan Spark dream team can come up with an elegant solution that'll enable this great user experience for customers!

@cloud-fan
Copy link

Yea we can register functions after SparkSession is instantiated, as a workaround if the extension API can't be used. This means the library should provide an extra API for users to manually register functions with an existing SparkSession instance.

I think this is orthogonal to my proposal for making spark extension API easier to use. We can do both.

@MrPowers
Collaborator Author

MrPowers commented May 3, 2021

@cloud-fan - yep, I agree that registering functions with an existing SparkSession & making the Spark extension API easier to work with are orthogonal chunks of work. I like your suggestion of doing both 😄

I will try to figure this out and send you a pull request. Might need your help 😉 Stay tuned!

@alexott

alexott commented May 4, 2021

@yaooqinn this is not a limitation of the Databricks runtime itself - this is a limitation of Databricks Community Edition.

@yaooqinn
Owner

yaooqinn commented May 6, 2021

Sorry for the late reply. I was on holiday at that time.

One idea is to extend the spark.sql.extensions config with resource files, similar to how we load DataSourceRegister. Then we can include the resource file in the jar and no need to set the config.

Hi @cloud-fan, according to the instructions that @MrPowers provided here, is the reason for the problem that DCE does not download the jars when starting the Spark standalone cluster manager, rather than a loading issue on the Spark app side?

Thanks @MrPowers for driving this. I am looking forward to your PR and am willing to help.

@cloud-fan

Databricks requires that you set the configuration options before starting the cluster and then attach the JAR file once the cluster is running. That's probably what's causing the error.

Ah, so the problem is that DCE users can only attach jars after the cluster is running, so the solution should be to register itachi functions after the session is instantiated.

@alexott

alexott commented May 6, 2021

@cloud-fan exactly...

@MrPowers
Collaborator Author

MrPowers commented May 8, 2021

I wrote a blog post to recap what I learned from this conversation and share the knowledge with the Spark community. Thank you all for teaching me this valuable information.

yaooqinn added a commit to apache/spark that referenced this issue May 13, 2021
### What changes were proposed in this pull request?

In yaooqinn/itachi#8, we had a discussion about the current extension injection for the Spark session. We've agreed that the current way is not that convenient for either third-party developers or end-users.

It's much simpler if third-party developers can provide a resource file that contains default extensions for Spark to load ahead of time.

### Why are the changes needed?

Better user experience.

### Does this PR introduce _any_ user-facing change?

no, dev change

### How was this patch tested?

new tests

Closes #32515 from yaooqinn/SPARK-35380.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
6 participants