
Spark Direct Readers and Writers for Druid. #10920

Conversation

Contributor

@JulianJaffePinterest commented Feb 24, 2021

Implements #9780

Description

See #9780 and linked discussions for more context. This PR adds a new module, druid-spark, containing Spark direct readers and writers for Druid. Usage is documented in the module documentation.

As discussed on the dev mailing list, a summary in human language of the UX and testing regimen follows:

UX

The entry point for users is deceptively simple: all interaction is handled through the existing Spark interfaces plus configuration. To be specific, users read Druid data into a Spark dataframe via

Dataset<Row> df = sparkSession
  .read()
  .format("druid")
  .schema(schema)
  .options(propertiesMap)
  .load();

and write a Spark dataframe to a Druid data source with

df
  .write()
  .format("druid")
  .mode(SaveMode.Overwrite)
  .options(propertiesMap)
  .save();

The meat of the interaction happens through the propertiesMap passed to the reader or writer. These properties, cataloged in the documentation, mostly follow the corresponding Druid properties. If desired, there are typed helpers for setting these options in org.apache.druid.spark.DruidDataFrameReader and org.apache.druid.spark.DruidDataFrameWriter as well. Sample usages of these helpers are

import org.apache.druid.spark.DruidDataFrameReader

sparkSession
  .read
  .brokerHost("localhost")
  .brokerPort(8082)
  .metadataDbType("mysql")
  .metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
  .metadataUser("druid")
  .metadataPassword("diurd")
  .dataSource("dataSource")
  .druid()

and

import org.apache.druid.spark.DruidDataFrameWriter
import org.apache.druid.spark.model.LocalDeepStorageConfig

val deepStorageConfig = new LocalDeepStorageConfig().storageDirectory("/mnt/druid/druid-segments/")

df
  .write
  .metadataDbType("mysql")
  .metadataUri("jdbc:mysql://druid.metadata.server:3306/druid")
  .metadataUser("druid")
  .metadataPassword("diurd")
  .version(1)
  .deepStorage(deepStorageConfig)
  .mode(SaveMode.Overwrite)
  .dataSource("dataSource")
  .druid()

There are a few key areas to be aware of:

First, due to Spark's design, DataSourceWriters cannot repartition the dataframe they are responsible for writing and have very little information about the overall partitioning. To compensate, the writer includes a number of partitioners out of the box. These partitioners by necessity have no context for the data they are partitioning, so they will be slower than usage-specific partitioners, but they are suitable for prototyping. If the provided partitioners are not used, there are a few behaviors to be aware of. For "simple" ShardSpecs (LinearShardSpec and NumberedShardSpec), the writer will default to rationalizing the output segments into contiguous and complete blocks, ensuring that loading and overshadowing of the output will be handled atomically by Druid. For more complex shard spec types such as HashBasedNumberedShardSpec and SingleDimensionShardSpec, users will need to provide a partition map to the writer, linking Spark partition ids to the information required to construct the corresponding Druid segments. The included Spark partitioners all provide partition maps and can be used as references for how to implement similar behavior in custom partitioners.
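For illustration, here is a minimal pre-partitioning sketch (this is not one of the bundled partitioners; it assumes __time holds millisecond timestamps and reuses the propertiesMap from the examples above):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.expr

// Bucket rows by day (86400000 ms per day) so rows destined for the same Druid time chunk
// land in the same Spark partition before the writer sees them.
val partitionedDf = df.repartition(expr("floor(__time / 86400000)"))

partitionedDf
  .write
  .format("druid")
  .mode(SaveMode.Overwrite)
  .options(propertiesMap)
  .save()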

Second, because this code executes in a Spark cluster rather than a Druid cluster, it cannot take advantage of Druid's extension support directly. Instead, these connectors utilize a plugin registry system that parallels Druid's extensions. The complex metric types, metadata stores, and deep storage types supported by core Druid extensions are also supported out of the box by this extension, with the exception of Azure deep storage. If users wish to implement their own plugins to handle specific complex metric types, metadata servers, shard specs, or deep storage implementations, they can register plugins to handle their use cases with the respective plugin registries before loading or saving data, and the Spark connectors will use the provided logic for the corresponding tasks.

Testing

Testing is handled via a combination of unit tests and light integration tests. Testing is focused on the core Spark code that handles reading and writing data, although most functionality is covered by at least one unit test. The key classes are DruidDataSourceReader, DruidInputPartitionReader, and DruidInputPartition on the read side and DruidDataSourceWriter, DruidDataWriterFactory, and DruidDataWriter on the write side. The tests are in the corresponding *Suite classes. Additionally, there is a lightweight round-trip test in DruidDataSourceV2Suite which writes out test data to local segments, updates metadata in an embedded Derby instance, reads the segments back into a DataFrame, and confirms that the rows read in match the rows written out. This test also confirms that the metadata entries created by the writer are correct.

The main gap in testing is the cross-compatibility matrix. I'm not sure how to repeatably test these connectors' interaction with deep storage types other than local and with metadata servers other than Derby. This code has been run at scale against Druid deployments using HDFS for deep storage and PostgreSQL for metadata, and I am aware of production users with S3 and GCP deep storage, but the Azure support is lightly modified from how the corresponding Druid extension handles configuration and is otherwise untested beyond parsing.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Key changed/added classes in this PR
  • spark/*

This commit consolidates the previous work for apache#9780 into a single merged commit. It adds Spark DataFrame readers and writers that interact directly with Druid deep storage, avoiding placing load on active clusters.
Reworked the configuration system, cleaned up the documentation, moved it to the docs/ directory, and handled a few other code-quality odds and ends.
Added support for callers to provide a list of Druid dimension objects, in addition to the existing support for a comma-separated list of dimension names or no value. This allows callers to specify how String dimension columns handle multi-values and whether or not to create bitmap indices, instead of forcing the defaults. Also added more tests, improved the scaladocs throughout, and resolved a few scalastyle warnings.
@JulianJaffePinterest
Contributor Author

I've added some more tests, improved some comments, and added support for configuring non-default multi-value handling and indexing for string dimensions.

I also see that instructions for how to update the licenses and notices have been committed, but I'll wait on doing that until this has a little more traction 😉

@JulianJaffePinterest
Contributor Author

Is anyone actively reviewing this? If not, I have a bit of a refactor I'd like to push that improves the organization of the code and hopefully makes it easier to follow.

@bcmcmill

bcmcmill commented Apr 6, 2021


Please post this update.

Refactor the Spark extensions to improve the layout of packages and classes. Also add experimental support for columnar reads, initial support for Druid's default-value null handling, improve tests and code documentation, and bump the Spark version to 2.4.7 and the Scala version to 2.12.12 to pull in security fixes.
@JulianJaffePinterest
Contributor Author

Life intervened as always, so I haven't thought about this for a while. I've just pushed the refactor I mentioned above that improves the package and class layout. The commit also bumps the target Spark and Scala patch versions (to 2.4.7 and 2.12.12, respectively) to pull in security fixes, adds more unit tests, and improves the code documentation. Finally, the commit adds two new features: the ability to specify SQL-compliant or Druid default-value null handling (via the reader.useDefaultValueForNull config option) and experimental support for columnar reads. See the extension documentation for more detail.

@bcmcmill

Awesome. I had to resort to adding an InitializeForTesting call from the NullHandling module to the writer class to ensure that NullHandling was initialized on the executors, or else exceptions were thrown. Glad that is properly fixed up now.

Make null handling configurable for writing as well as reading.
@JulianJaffePinterest
Contributor Author

Added the writer.useDefaultValueForNull config option as well to control how nulls are handled while writing.
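For example, a rough sketch of setting both options (key names are from the comments above; the value semantics are assumed to mirror Druid's druid.generic.useDefaultValueForNull, where false selects SQL-compliant null handling, and propertiesMap is assumed to be a Scala Map[String, String]):

import org.apache.spark.sql.SaveMode

val readOptions  = propertiesMap + ("reader.useDefaultValueForNull" -> "false")
val writeOptions = propertiesMap + ("writer.useDefaultValueForNull" -> "false")

val df = sparkSession.read.format("druid").options(readOptions).load()
df.write.format("druid").mode(SaveMode.Overwrite).options(writeOptions).save()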

* @return A map from column name to data type and whether or not the column is multi-value
* for the schema of DATASOURCE.
*/
def getSchema(dataSource: String, intervals: List[Interval]): Map[String, (String, Boolean)] = {
Contributor

Should intervals be optional? If interval is not provided, the default interval specified by the broker config druid.query.segmentMetadata.defaultHistory will be used. https://druid.apache.org/docs/latest/querying/segmentmetadataquery.html#intervals

Contributor Author

Intervals are required because the default behavior is unlikely to be what Spark users would expect (e.g. if no time bounds are provided, I would expect to read the whole table in a batch context, not just the last week.) For now, this is only called in one place, which uses JodaTime.MIN_INSTANT and JodaTime.MAX_INSTANT as the interval bounds if no time filters are provided. This will be an expensive query for large datasources, but until #9707 is implemented I don't know of a better way. I suppose it would make sense to move that default handling into the getSchema method here instead though, so I'll make the change.
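A rough sketch of that default handling (assuming the constants referenced above come from Druid's JodaUtils):

import org.apache.druid.java.util.common.JodaUtils
import org.joda.time.Interval

// Fall back to the widest possible interval when the caller supplies none, so the whole
// datasource is considered rather than just the broker's default history window.
def intervalsOrDefault(intervals: List[Interval]): List[Interval] =
  if (intervals.nonEmpty) intervals
  else List(new Interval(JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT))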

if (segments.size() > 1) {
logWarn("Segment metadata response had more than one row, problem?")
}
segments.asScala.head.getColumns.asScala.map{ case (name: String, col: ColumnAnalysis) =>
Contributor

I couldn't find it in the docs, but what happens if segments have different schemas? For example, a dimension may be a string in one segment and a double in another. Should we set merge to false and give precedence to the latest segment when determining a column's type and other information?

Contributor Author

The analysis will error if a dimension has different types across two merged segments; the returned column will have type String, so we'll silently widen the type to String. It's been a couple of years since I wrote this piece, and yeah, that is really non-obvious (I had to go code spelunking to figure it out). I'm inclined to add a comment to the code explaining what happens and then leave the current behavior, but I could also see just throwing an error back to the caller and refusing to continue.

Contributor Author

To the second part of your question: the reason we don't just take the last segment in an interval and use its schema is that the latest segment may happen not to have multiple values for a particular column where other segments do, or a column may be all null in the latest segment and so not show up in the segment metadata. We query the full range of the requested interval and then merge to handle these cases, which does open up the merging problem if types change. At least in my Druid experience, the first two cases are much more frequent than the third.

Contributor

Does it make sense to fail the operation if there is an error reported in the response? At a minimum, we should log a warning to let the user know that we are falling back to using String as the data type for a column, since data types may differ for that column across segments in the requested interval.

Per review: Made the intervals arg for getSchema() optional, added a comment explaining how we handle a changing dimension type across the queried interval for getSchema and debug-level logging of the SegmentAnalysis we extract our Spark schema from, and changed the behavior when multiple SegmentAnalyses are returned from a merged SegmentMetadataQuery to throw an error instead of just logging a warning. (A merged SegmentMetadataQuery result should only have one row, so we should not expect to ever hit this error).
"metadataPassword" -> "diurd"
)

sparkSession
Contributor

It would be good to also include an example that illustrates how to provide the broker URL for determining the schema.

Contributor Author

Added 👍

Contributor

@JulianJaffePinterest, I appreciate your effort to build something like this. Thank you! I am trying to use this Spark connector. I just wanted to check whether the spark.md file is merged to the spark-druid-connector branch?

.load()
```

Filters should be applied to the read-in data frame before any [Spark actions](http://spark.apache.org/docs/2.4.7/api/scala/index.html#org.apache.spark.sql.Dataset)
Contributor

This might be a gap in my understanding - since the readers are directly loading from deep storage, would they not need to essentially download and scan the entire file to apply predicates?

Contributor Author

If a filter is on the time dimension, then we don't need to scan files that don't cover the requested time interval (down the road we could also add support for not fetching unnecessary files if the underlying data source is partitioned by range as well). If a segment file does contain data for the requested time period we'll need to fetch it from deep storage, but we can still filter out values while we read the data into a DataFrame, so there's some benefit there as well, at least in terms of what Spark needs to keep in memory.
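For illustration, a sketch of applying such filters before any action (assuming __time is exposed as epoch milliseconds; dim1 is a hypothetical dimension):

import org.apache.spark.sql.functions.col

// Time filters let the reader skip segment files whose intervals fall entirely outside the
// bound; other filters are applied as rows are read into the DataFrame.
val filtered = df
  .filter(col("__time") >= 1609459200000L) // 2021-01-01T00:00:00Z in epoch millis
  .filter(col("dim1") === "someValue")

filtered.count()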

@JulianJaffePinterest
Contributor Author

JulianJaffePinterest commented Apr 28, 2021

I pushed up a commit to clean up the documentation as you requested @samarthjain. I also added support for building a library jar suitable for adding to a Spark cluster with mvn clean package -pl extensions-core/spark-extensions (edit 7/14/21: use mvn clean package -pl spark). To keep the size reasonable I added dependency exclusions to the pom. I haven't yet made sure that we're excluding every dependency possible, but even the obvious first cut trimmed the size by over 60%.

@birTiwana

@JulianJaffePinterest I have created a jar file from your PR branch and added it to my Spark code. I am using it to read a Druid data source with the following code (I retrieved the segments directly via the DruidMetadataClient):

val readDf = sparkSession
  .read
  .format("druid")
  .options(Map("segments" -> segmentsString))
  .load()

but I keep on hitting the error:

icationMaster.runDriver(ApplicationMaster.scala:472) at org.apache.spark.deploy.yarn.ApplicationMaster.org$apache$spark$deploy$yarn$ApplicationMaster$$runImpl(ApplicationMaster.scala:308) at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:248) at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:248) at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply(ApplicationMaster.scala:248) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:783) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1926) at org.apache.spark.deploy.yarn.ApplicationMaster.doAsUser(ApplicationMaster.scala:782) at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:247) at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:807) at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala) Caused by: java.lang.ClassNotFoundException: Failed to find data source: druid. Please find packages at http://spark.apache.org/third-party-projects.html at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:213) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186) at LoadData$.main(LoadData.scala:80) at LoadData.main(LoadData.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688) Caused by: java.lang.ClassNotFoundException: druid.DefaultSource at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:418) at java.lang.ClassLoader.loadClass(ClassLoader.java:351) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:652) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:652) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:652) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:652) at scala.util.Try.orElse(Try.scala:84) at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652) ... 9 more

Not sure what I am missing here exactly, but I did not see a test case for this in your PR either.

) extends Logging {
private lazy val druidMetadataTableConfig = MetadataStorageTablesConfig.fromBase(base)
private lazy val dbcpProperties = new Properties()
dbcpProperties.putAll(dbcpMap)


These 2 lines can be combined into one.
private lazy val dbcpProperties = new Properties(dbcpMap)

Without this I am getting a build failure when trying to build this PR locally:

[ERROR] /Users/btiwana/Downloads/druid-spark/druid/extensions-core/spark-extensions/src/main/scala/org/apache/druid/spark/utils/DruidMetadataClient.scala:52: error: ambiguous reference to overloaded definition, [ERROR] both method putAll in class Properties of type (x$1: java.util.Map[_, _])Unit [ERROR] and method putAll in class Hashtable of type (x$1: java.util.Map[_ <: Object, _ <: Object])Unit [ERROR] match argument types (java.util.Properties) [ERROR] dbcpProperties.putAll(dbcpMap) [ERROR] ^ [ERROR] one error found

Contributor Author

Can you share a stack trace or how you triggered this error?


I am simply doing mvn clean package -pl extensions-core/spark-extensions. This actually seems like a Scala version mismatch issue on my end. I am using 2.13.5, but I think the expectation is to use 2.12.11 or 2.12.10 instead? Adding some stack trace here:

[INFO] --- scala-maven-plugin:3.2.2:compile (scala-compile-first) @ druid-spark-extensions --- [WARNING] Expected all dependencies to require Scala version: 2.12.11 [WARNING] com.fasterxml.jackson.module:jackson-module-scala_2.12:2.10.2 requires scala version: 2.12.10 [WARNING] Multiple versions of scala libraries detected! [INFO] /Users/btiwana/Downloads/druid-spark/druid/extensions-core/spark-extensions/src/main/scala:-1: info: compiling [INFO] Compiling 29 source files to /Users/btiwana/Downloads/druid-spark/druid/extensions-core/spark-extensions/target/classes at 1621433638727 [ERROR] /Users/btiwana/Downloads/druid-spark/druid/extensions-core/spark-extensions/src/main/scala/org/apache/druid/spark/utils/DruidMetadataClient.scala:52: error: ambiguous reference to overloaded definition, [ERROR] both method putAll in class Properties of type (x$1: java.util.Map[_, _])Unit [ERROR] and method putAll in class Hashtable of type (x$1: java.util.Map[_ <: Object, _ <: Object])Unit [ERROR] match argument types (java.util.Properties) [ERROR] dbcpProperties.putAll(dbcpMap) [ERROR] ^ [ERROR] one error found [INFO] ------------------------------------------------------------------------ [INFO] BUILD FAILURE [INFO] ------------------------------------------------------------------------ [INFO] Total time: 29.832 s [INFO] Finished at: 2021-05-19T07:14:04-07:00 [INFO] ------------------------------------------------------------------------ [ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first) on project druid-spark-extensions: wrap: org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) -> [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
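One way to sidestep the ambiguous putAll overload reported above (a sketch, assuming dbcpMap is the java.util.Properties named in the error) is to copy the entries explicitly:

import java.util.Properties
import scala.collection.JavaConverters._

// Copy entries one by one instead of relying on the overloaded putAll, which newer Scala
// versions report as ambiguous (Properties vs. Hashtable overloads).
val dbcpProperties = new Properties()
dbcpMap.asScala.foreach { case (key, value) => dbcpProperties.setProperty(key, value) }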

val httpClient: HttpClient,
val hostAndPort: HostAndPort
) extends Logging {
private val RetryCount = 5
Contributor

Does it make sense to have these settings configurable?

Contributor Author

Done

Seq.empty
}
}.toSeq.flatten
closer.close()
Contributor

Should this be in a try-with-resources-like block, like the one you have added elsewhere in this PR?

Contributor Author

I think if this throws an exception we should blow up to avoid polluting the Spark cluster's workspace with orphaned files, but I'm open to alternative approaches.

private[spark] val brokerPortDefaultKey: (String, Int) = (brokerPortKey, 8082)

// Common configs
val useCompactSketchesKey: String = "useCompactSketches" // Default: false
Contributor

What is the purpose of this config?

Contributor Author

To allow users to work with compact versions of ThetaSketches if desired (for example, if they don't need to update the sketches after reading them).


val df = sparkSession
.read
.format("druid")


I tried to run these test cases locally using IntelliJ's Scala plugin, but they are failing with the same error that I mentioned in my earlier comment about "druid" not being a recognized data source:

`2021-05-18T14:23:04,851 INFO [ScalaTest-run-running-DruidDataSourceV2Suite] org.apache.spark.sql.internal.SharedState - Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/btiwana/Downloads/druid-spark/druid/extensions-core/spark-extensions/spark-warehouse').
2021-05-18T14:23:04,852 INFO [ScalaTest-run-running-DruidDataSourceV2Suite] org.apache.spark.sql.internal.SharedState - Warehouse path is 'file:/Users/btiwana/Downloads/druid-spark/druid/extensions-core/spark-extensions/spark-warehouse'.
2021-05-18T14:23:04,855 INFO [ScalaTest-run-running-DruidDataSourceV2Suite] org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef - Registered StateStoreCoordinator endpoint

Failed to find data source: druid. Please find packages at http://spark.apache.org/third-party-projects.html
java.lang.ClassNotFoundException: Failed to find data source: druid. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:660)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:245)
at org.apache.druid.spark.v2.DruidDataSourceV2Suite.$anonfun$new$2(DruidDataSourceV2Suite.scala:77)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1562)
at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199)
at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181)
at org.apache.druid.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:34)
at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
at org.apache.druid.spark.SparkFunSuite.runTest(SparkFunSuite.scala:34)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232)
at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231)
at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
at org.scalatest.Suite.run(Suite.scala:1112)
at org.scalatest.Suite.run$(Suite.scala:1094)
at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236)
at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235)
at org.scalatest.funsuite.AnyFunSuite.run(AnyFunSuite.scala:1562)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
at org.scalatest.tools.Runner$.run(Runner.scala:798)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
Caused by: java.lang.ClassNotFoundException: druid.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:634)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:634)
at scala.util.Failure.orElse(Try.scala:224)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
... 51 more

`

I think we are missing a step of adding druid as a data source to Spark.

@JulianJaffePinterest
Contributor Author

@birTiwana the problem is that you don't have the org.apache.spark.sql.sources.DataSourceRegister resource in your META-INF/services, so the short name resolution isn't working. How are you building the jar?

If you're using mvn clean package -pl extensions-core/spark-extensions, try deleting the line in the pom that sets the ServicesResourceTransformer (<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>). If you don't want to include the resource in your META-INF services, you can also just use the fully qualified name instead: org.apache.druid.spark.v2.DruidDataSourceV2
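For example, the earlier read snippet with the fully qualified name substituted for the short name:

val df = sparkSession
  .read
  .format("org.apache.druid.spark.v2.DruidDataSourceV2")
  .options(Map("segments" -> segmentsString))
  .load()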

@jihoonson
Contributor

Hi @JulianJaffePinterest, I just want to check how things are going. My previous suggestion to split this PR into smaller chunks was based on the assumption that it would not slow down the process, but it seems that isn't working as well as I expected. Our ultimate goal here is to merge this cool feature quickly. If you are busy, I totally understand and will support merging this PR as quickly as possible without further splitting.

@JulianJaffePinterest
Contributor Author

Hey @jihoonson, I had some unexpected and unfortunate personal/familial crises to deal with these past few months. While they're not entirely in the rear-view mirror, I should have more time again to push this to the finish line. I've opened #11823 with the next chunk of code (the reading half of the connector). Please let me know if you think the PR is still too big; I couldn't find a good place to split it that wouldn't require a reviewer to know the rest of the code anyway.

@jihoonson
Contributor

@JulianJaffePinterest sorry to hear that. I hope things have gotten better. Also, thank you for making #11823. I will take a look.

@wangxiaobaidu11
Contributor

wangxiaobaidu11 commented Dec 2, 2021

hi @JulianJaffePinterest, I hope you can get out of your sad mood soon and take good care of yourself! Last month I tested your code and ran into the problem described below when running:
dataset.write().format("druid").mode(SaveMode.Overwrite).options(map).save();
21/12/02 15:19:40 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) noc.com.fasterxml.jackson.databind.exc.InvalidTypeIdException: Could not resolve type id 'thetaSketch' as a subtype of org.apache.druid.query.aggregation.AggregatorFactory: known type ids = [cardinality, count, doubleAny, doubleFirst, doubleLast, doubleMax, doubleMean, doubleMin, doubleSum, filtered, floatAny, floatFirst, floatLast, floatMax, floatMin, floatSum, grouping, histogram, hyperUnique, javascript, longAny, longFirst, longLast, longMax, longMin, longSum, stringAny, stringFirst, stringFirstFold, stringLast, stringLastFold] at [Source: (String)"[ { "type": "count", "name": "count" }, { "type": "longSum", "name": "sum_metric1", "fieldName": "sum_metric1" }, { "type": "longSum", "name": "sum_metric2", "fieldName": "sum_metric2" }, { "type": "doubleSum", "name": "sum_metric3", "fieldName": "sum_metric3" }, { "type": "floatSum", "name": "sum_metric4", "fieldName": "sum_metric4" }, { "type": "thetaSketch", "name": "uniq_id1", "fieldName": "uniq_id1", "isInputThetaSketch": true } ]"; line: 7, column: 13] (through reference chain: java.lang.Object[][5]) at noc.com.fasterxml.jackson.databind.exc.InvalidTypeIdException.from(InvalidTypeIdException.java:43) at noc.com.fasterxml.jackson.databind.DeserializationContext.invalidTypeIdException(DeserializationContext.java:1761) at noc.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownTypeId(DeserializationContext.java:1268) at noc.com.fasterxml.jackson.databind.jsontype.impl.TypeDeserializerBase._handleUnknownTypeId(TypeDeserializerBase.java:290) at noc.com.fasterxml.jackson.databind.jsontype.impl.TypeDeserializerBase._findDeserializer(TypeDeserializerBase.java:162) at noc.com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:113) at noc.com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:97) at noc.com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:254) at noc.com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:197) at noc.com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:21) at noc.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4218) at noc.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3214) at noc.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3197) at org.apache.druid.spark.v2.writer.DruidDataWriterFactory$.createDataSchemaFromConfiguration(DruidDataWriterFactory.scala:99) at org.apache.druid.spark.v2.writer.DruidDataWriterFactory.createDataWriter(DruidDataWriterFactory.scala:70) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:113) at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 21/12/02 15:19:40 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): noc.com.fasterxml.jackson.databind.exc.InvalidTypeIdException: Could not resolve type id 'thetaSketch' as a subtype of org.apache.druid.query.aggregation.AggregatorFactory: known type ids = [cardinality, count, doubleAny, doubleFirst, doubleLast, doubleMax, doubleMean, doubleMin, doubleSum, filtered, floatAny, floatFirst, floatLast, floatMax, floatMin, floatSum, grouping, histogram, hyperUnique, javascript, longAny, longFirst, longLast, longMax, longMin, longSum, stringAny, stringFirst, stringFirstFold, stringLast, stringLastFold] at [Source: (String)"[ { "type": "count", "name": "count" }, { "type": "longSum", "name": "sum_metric1", "fieldName": "sum_metric1" }, { "type": "longSum", "name": "sum_metric2", "fieldName": "sum_metric2" }, { "type": "doubleSum", "name": "sum_metric3", "fieldName": "sum_metric3" }, { "type": "floatSum", "name": "sum_metric4", "fieldName": "sum_metric4" }, { "type": "thetaSketch", "name": "uniq_id1", "fieldName": "uniq_id1", "isInputThetaSketch": true } ]"; line: 7, column: 13] (through reference chain: java.lang.Object[][5]) at noc.com.fasterxml.jackson.databind.exc.InvalidTypeIdException.from(InvalidTypeIdException.java:43) at noc.com.fasterxml.jackson.databind.DeserializationContext.invalidTypeIdException(DeserializationContext.java:1761) at noc.com.fasterxml.jackson.databind.DeserializationContext.handleUnknownTypeId(DeserializationContext.java:1268) at noc.com.fasterxml.jackson.databind.jsontype.impl.TypeDeserializerBase._handleUnknownTypeId(TypeDeserializerBase.java:290) at noc.com.fasterxml.jackson.databind.jsontype.impl.TypeDeserializerBase._findDeserializer(TypeDeserializerBase.java:162) at noc.com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:113) at noc.com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:97) at noc.com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:254) at noc.com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:197) at noc.com.fasterxml.jackson.databind.deser.std.ObjectArrayDeserializer.deserialize(ObjectArrayDeserializer.java:21) at noc.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4218) at noc.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3214) at noc.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3197) at org.apache.druid.spark.v2.writer.DruidDataWriterFactory$.createDataSchemaFromConfiguration(DruidDataWriterFactory.scala:99) at org.apache.druid.spark.v2.writer.DruidDataWriterFactory.createDataWriter(DruidDataWriterFactory.scala:70) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:113) at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)


@JulianJaffePinterest
Contributor Author

@wangxiaobaidu11 thank you for your kind wishes. The issue you've encountered is an artifact of pulling out some custom code and updating logic to match mainline Druid changes from newer versions. In your case, you can call AggregatorFactoryRegistry.register("thetaSketch", new SketchAggregatorFactory("", "", 2, 0)) on the driver before writing (note that for historical reasons you need to pass an AggregatorFactory instance and not just the class). In the piecemeal writer code for review that is waiting on #11823, this is done automatically for you. #11823 has been approved pending a broken test in the unrelated code I'm running down, so I hope to have the writer code up soon. If I can find a few minutes I'll put up a repo with the up-to-date connector code and compatibility with older Druid versions as well.

@wangxiaobaidu11
Contributor

wangxiaobaidu11 commented Dec 9, 2021

@JulianJaffePinterest Thanks for your reply. I added my own extension, Long Unique Extension, and ran the entire Spark write process to Druid, using HDFS for deep storage.
The code I changed is a little ugly; how can I make it more elegant? (screenshot attached)

@JulianJaffePinterest
Contributor Author

@wangxiaobaidu11 you don't need to make changes to the druid spark code for your use case - you can call AggregatorFactoryRegistry.register("longUnique", new LongUniqueAggregatorFactory("", "", 0)) from within your own Spark app. That's definitely still ugly since the AggregatorFactory instance is unnecessary, but as mentioned in my previous comment this won't be the case for long. If instantiating an instance is a problem, there is one other temporary work-around: because all AggregatorFactoryRegistry does under the hood is register subtypes, you can use the public package method registerSubtype. In your case, you would call org.apache.druid.spark.registerSubtype(new NamedType(classOf[LongUniqueAggregatorFactory], "longUnique")) from your Spark app. (You can statically import that method if you'd like, leaving just registerSubtype(...).)

@wangxiaobaidu11
Contributor

wangxiaobaidu11 commented Dec 10, 2021


@JulianJaffePinterest Thanks! I will update it. I have another question.
① When I set the following (screenshot attached):
② Spark runtime info:
21/12/10 16:09:45 INFO DruidDataSourceWriter: Committing the following segments: DataSegment{binaryVersion=9, id=test_spark_druid_cube_v4_2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z_2021-12-10T08:09:41.601Z, loadSpec={type=>hdfs, path=>hdfs://xxxx/xxxx/xxx/xxxxxx/segments/test_spark_druid_cube_v4/20200101T000000.000Z_20200102T000000.000Z/2021-12-10T08_09_41.601Z/3_5427a1c2-6405-4516-83b1-2dd17bfff433_index.zip}, dimensions=[dim1, dim2, id1, id2], metrics=[count, sum_metric1, sum_metric2, sum_metric3, sum_metric4, uniq_id1_unique], shardSpec=NumberedShardSpec{partitionNum=0, partitions=1}, lastCompactionState=null, size=3390}, DataSegment{binaryVersion=9, id=test_spark_druid_cube_v4_2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z_2021-12-10T08:09:41.443Z, loadSpec={type=>hdfs, path=>hdfs://xxxx/xxxx/xxx/xxxxxx/segments/test_spark_druid_cube_v4/20200101T000000.000Z_20200102T000000.000Z/2021-12-10T08_09_41.443Z/1_c5be6b7e-76a6-44dd-9c53-f189950cb54d_index.zip}, dimensions=[dim1, dim2, id1, id2], metrics=[count, sum_metric1, sum_metric2, sum_metric3, sum_metric4, uniq_id1_unique], shardSpec=NumberedShardSpec{partitionNum=0, partitions=1}, lastCompactionState=null, size=3390}, DataSegment{binaryVersion=9, id=test_spark_druid_cube_v4_2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z_2021-12-10T08:09:41.767Z, loadSpec={type=>hdfs, path=>hdfs://xxxx/xxxx/xxx/xxxxxx/segments/test_spark_druid_cube_v4/20200101T000000.000Z_20200102T000000.000Z/2021-12-10T08_09_41.767Z/2_04fc7ed4-4131-4856-b60d-95c7b409251c_index.zip}, dimensions=[dim1, dim2, id1, id2], metrics=[count, sum_metric1, sum_metric2, sum_metric3, sum_metric4, uniq_id1_unique], shardSpec=NumberedShardSpec{partitionNum=0, partitions=1}, lastCompactionState=null, size=3390}, DataSegment{binaryVersion=9, id=test_spark_druid_cube_v4_2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z_2021-12-10T08:09:41.336Z, loadSpec={type=>hdfs, path=>hdfs://xxxx/xxxx/xxx/xxxxxx/segments/test_spark_druid_cube_v4/20200101T000000.000Z_20200102T000000.000Z/2021-12-10T08_09_41.336Z/0_918c926a-5738-4a19-a58b-8a3024ee01ad_index.zip}, dimensions=[dim1, dim2, id1, id2], metrics=[count, sum_metric1, sum_metric2, sum_metric3, sum_metric4, uniq_id1_unique], shardSpec=NumberedShardSpec{partitionNum=0, partitions=1}, lastCompactionState=null, size=3390}, DataSegment{binaryVersion=9, id=test_spark_druid_cube_v4_2020-01-02T00:00:00.000Z_2020-01-03T00:00:00.000Z_2021-12-10T08:09:41.299Z, loadSpec={type=>hdfs, path=>hdfs://xxxx/xxxx/xxx/xxxxxx/segments/test_spark_druid_cube_v4/20200102T000000.000Z_20200103T000000.000Z/2021-12-10T08_09_41.299Z/5_c62528cf-4377-4fa4-98da-df09dcc8e359_index.zip}, dimensions=[dim1, dim2, id1, id2], metrics=[count, sum_metric1, sum_metric2, sum_metric3, sum_metric4, uniq_id1_unique], shardSpec=NumberedShardSpec{partitionNum=0, partitions=1}, lastCompactionState=null, size=3390}, DataSegment{binaryVersion=9, id=test_spark_druid_cube_v4_2020-01-02T00:00:00.000Z_2020-01-03T00:00:00.000Z_2021-12-10T08:09:41.835Z, loadSpec={type=>hdfs, path=>hdfs://xxxx/xxxx/xxx/xxxxxx/segments/test_spark_druid_cube_v4/20200102T000000.000Z_20200103T000000.000Z/2021-12-10T08_09_41.835Z/4_176e1242-cb5f-4745-bae3-0e9bb47b6c62_index.zip}, dimensions=[dim1, dim2, id1, id2], metrics=[count, sum_metric1, sum_metric2, sum_metric3, sum_metric4, uniq_id1_unique], shardSpec=NumberedShardSpec{partitionNum=0, partitions=1}, lastCompactionState=null, size=3466} 21/12/10 16:09:45 INFO SQLMetadataStorageUpdaterJobHandler: Published 
test_spark_druid_cube_v4_2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z_2021-12-10T08:09:41.601Z 21/12/10 16:09:45 INFO SQLMetadataStorageUpdaterJobHandler: Published test_spark_druid_cube_v4_2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z_2021-12-10T08:09:41.443Z 21/12/10 16:09:45 INFO SQLMetadataStorageUpdaterJobHandler: Published test_spark_druid_cube_v4_2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z_2021-12-10T08:09:41.767Z 21/12/10 16:09:45 INFO SQLMetadataStorageUpdaterJobHandler: Published test_spark_druid_cube_v4_2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z_2021-12-10T08:09:41.336Z 21/12/10 16:09:45 INFO SQLMetadataStorageUpdaterJobHandler: Published test_spark_druid_cube_v4_2020-01-02T00:00:00.000Z_2020-01-03T00:00:00.000Z_2021-12-10T08:09:41.299Z 21/12/10 16:09:45 INFO SQLMetadataStorageUpdaterJobHandler: Published test_spark_druid_cube_v4_2020-01-02T00:00:00.000Z_2020-01-03T00:00:00.000Z_2021-12-10T08:09:41.835Z 21/12/10 16:09:45 INFO WriteToDataSourceV2Exec: Data source writer org.apache.druid.spark.v2.writer.DruidDataSourceWriter@1d1c63af committed.
③ Data for the same date gets overshadowed, but I didn't want that to happen:
21/12/10 16:09:45 WARN SegmentRationalizer: More than one version detected for interval 2020-01-01T00:00:00.000Z/2020-01-02T00:00:00.000Z on dataSource test_spark_druid_cube_v4! Some segments will be overshadowed! 21/12/10 16:09:45 WARN SegmentRationalizer: More than one version detected for interval 2020-01-02T00:00:00.000Z/2020-01-03T00:00:00.000Z on dataSource test_spark_druid_cube_v4! Some segments will be overshadowed!
(screenshot attached)
④ I expected the result to be combined segments. How do I set the partitioning?

@JulianJaffePinterest
Contributor Author

@wangxiaobaidu11 you can control the number of partitions by partitioning your dataframe prior to calling .write() on the dataframe. There's a brief discussion in the docs about partitioning; I'm happy to add more/clarify/etc. as needed. Can you share more details about how you're configuring and calling the writer?

@wangxiaobaidu11
Contributor

wangxiaobaidu11 commented Dec 13, 2021

Java code:

StructType schema = (new StructType())
  .add(new StructField("__time", LongType, true, new Metadata().empty()))
  .add(new StructField("dim1", new ArrayType(StringType, false), true, new Metadata().empty()))
  .add(new StructField("dim2", StringType, true, new Metadata().empty()))
  .add(new StructField("id1", StringType, true, new Metadata().empty()))
  .add(new StructField("id2", StringType, true, new Metadata().empty()))
  .add(new StructField("count", LongType, true, new Metadata().empty()))
  .add(new StructField("sum_metric1", LongType, true, new Metadata().empty()))
  .add(new StructField("sum_metric2", LongType, true, new Metadata().empty()))
  .add(new StructField("sum_metric3", DoubleType, true, new Metadata().empty()))
  .add(new StructField("sum_metric4", FloatType, true, new Metadata().empty()))
  .add(new StructField("uniq_id1", LongType, true, new Metadata().empty()));

Dataset<Row> dataset = sparkSession.createDataFrame(list, schema);

dataset.write().partitionBy("__time").format("druid").mode(SaveMode.Overwrite).options(map).save();

@JulianJaffePinterest
Contributor Author

JulianJaffePinterest commented Dec 14, 2021

Calling .partitionBy on a DataFrameWriter (what you get when you call .write() on a DataFrame) doesn't do anything for a v2 data source that doesn't have a managed catalog, which Druid does not (see #11929 for a recent example). The docs have a more in-depth discussion of partitioning, but the short version is that you'll either need to partition your dataframe before calling .write() on it or use one of the DruidDataFrame wrapper's convenience methods. For example, in Scala

import org.apache.druid.spark.DruidDataFrame

df.partitionAndWrite("__time", "millis", "DAY", 200000).format("druid").mode(SaveMode.Overwrite).options(map).save()

or in Java

import org.apache.druid.spark.package$.MODULE$.DruidDataFrame;

DruidDataFrame(dataset).partitionAndWrite("__time", "millis", "DAY", 200000).format("druid").mode(SaveMode.Overwrite).options(map).save();

If you don't want to use implicits/wrapper classes, you can also use the partitioner directly:

SingleDimensionPartitioner partitioner = new SingleDimensionPartitioner(dataset);
Dataset<Row> partitionedDataset = partitioner.partition("__time", "millis", "DAY", 200000, "dim1", true);
partitionedDataset.write().format("druid").mode(SaveMode.Overwrite).options(map).save();

Also, are you setting writer.version in your options map? I'm surprised to see the segments differ in version between each partition. That's what's causing the partitions to overshadow each other.
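For reference, a sketch of pinning the version in the options map (the writer.version key is the one mentioned above; the other keys and values are placeholders following the earlier examples):

val map = Map(
  "metadataDbType" -> "mysql",
  "metadataUri" -> "jdbc:mysql://druid.metadata.server:3306/druid",
  "metadataUser" -> "druid",
  "metadataPassword" -> "diurd",
  // Pin a single version for the whole write so partitions in the same interval share it
  // and don't overshadow each other.
  "writer.version" -> "1"
)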

@stale

stale bot commented Apr 17, 2022

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Apr 17, 2022
@didip
Contributor

didip commented Apr 17, 2022

No, please don’t close this PR.

@stale

stale bot commented Apr 17, 2022

This issue is no longer marked as stale.

@stale stale bot removed the stale label Apr 17, 2022
@dhia-gharsallaoui

@JulianJaffePinterest thank you for your efforts. Is there any news about this PR? And is there a way we can contribute to it?

@guanjieshen

Hi @JulianJaffePinterest is this PR still active?

@github-actions

github-actions bot commented Oct 8, 2023

This pull request has been marked as stale due to 60 days of inactivity.
It will be closed in 4 weeks if no further activity occurs. If you think
that's incorrect or this pull request should instead be reviewed, please simply
write any comment. Even if closed, you can still revive the PR at any time or
discuss it on the dev@druid.apache.org list.
Thank you for your contributions.


@github-actions github-actions bot added the stale label Dec 12, 2023

github-actions bot commented Jan 9, 2024

This pull request/issue has been closed due to lack of activity. If you think that
is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Jan 9, 2024
Development

Successfully merging this pull request may close these issues.

Support directly reading and writing Druid data from Spark