Speed up Spark + Flink unit test execution
`test` task execution for Spark and Flink is rather slow. Gradle allows forking multiple JVMs to parallelize test execution; test _classes_ are distributed among the available test worker JVMs.

This change allows more than one parallel fork (test worker JVM). The maximum number of workers is calculated like this: `max(min(Integer.getInteger("iceberg.maxSparkTestParallelism", 2), Runtime.runtime.availableProcessors() / 2), 1)`.
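
With the default property value of `2`, the formula works out like this (the machine sizes are illustrative):

```
max(min(2, 16 / 2), 1) = 2   // 16-core workstation, property left at its default of 2
max(min(8, 16 / 2), 1) = 8   // 16-core workstation, property raised to 8
max(min(2,  2 / 2), 1) = 1   // 2-CPU CI runner
```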

The maximum for Spark and Flink is pinned to `1` in the repository's `gradle.properties` (the in-code fallback default is `2`); developers can raise it in `~/.gradle/gradle.properties`, as sketched below.

This change does not affect CI: GitHub's free hosted runners provide 2 "CPUs", and 2 / 2 = 1, so CI continues to run a single test worker.
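
A minimal sketch of such an override in `~/.gradle/gradle.properties` (the values are illustrative, not recommendations):

```properties
# Allow up to 8 parallel test worker JVMs for the Spark and Flink tests
systemProp.iceberg.maxSparkTestParallelism=8
systemProp.iceberg.maxFlinkTestParallelism=8
```
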
snazy committed Jun 28, 2024
1 parent 7071dc1 commit fc2bc7e
Showing 7 changed files with 66 additions and 23 deletions.
flink/v1.17/build.gradle (7 additions, 0 deletions)

@@ -121,6 +121,13 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
 
   test {
     useJUnitPlatform()
+
+    // See `gradle.properties` and `$HOME/.gradle/gradle.properties`
+    maxParallelForks = Math.max(
+        Math.min(
+            Integer.getInteger("iceberg.maxFlinkTestParallelism", 2),
+            (Runtime.runtime.availableProcessors() / 2).intValue()).intValue(),
+        1)
   }
 }
 

flink/v1.18/build.gradle (7 additions, 0 deletions)

@@ -121,6 +121,13 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
 
   test {
     useJUnitPlatform()
+
+    // See `gradle.properties` and `$HOME/.gradle/gradle.properties`
+    maxParallelForks = Math.max(
+        Math.min(
+            Integer.getInteger("iceberg.maxFlinkTestParallelism", 2),
+            (Runtime.runtime.availableProcessors() / 2).intValue()).intValue(),
+        1)
   }
 }
 

flink/v1.19/build.gradle (7 additions, 0 deletions)

@@ -123,6 +123,13 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
 
   test {
     useJUnitPlatform()
+
+    // See `gradle.properties` and `$HOME/.gradle/gradle.properties`
+    maxParallelForks = Math.max(
+        Math.min(
+            Integer.getInteger("iceberg.maxFlinkTestParallelism", 2),
+            (Runtime.runtime.availableProcessors() / 2).intValue()).intValue(),
+        1)
   }
 }
 

gradle.properties (6 additions, 0 deletions)

@@ -26,3 +26,9 @@ systemProp.defaultScalaVersion=2.12
 systemProp.knownScalaVersions=2.12,2.13
 org.gradle.parallel=true
 org.gradle.jvmargs=-Xmx1024m
+
+# The maximum allowed test forks for Spark + Flink tests.
+# The number of `Test.maxParallelForks` is computed like this:
+# max(min(Integer.getInteger("iceberg.maxSparkTestParallelism", 2), Runtime.runtime.availableProcessors() / 2), 1)
+systemProp.iceberg.maxSparkTestParallelism=1
+systemProp.iceberg.maxFlinkTestParallelism=1
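
Because `Integer.getInteger` reads JVM system properties, the cap should also be adjustable for a single invocation with `-D` on the Gradle command line; the task path below is illustrative:

```
./gradlew -Diceberg.maxSparkTestParallelism=4 :iceberg-spark:iceberg-spark-3.5_2.12:test
```
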
spark/v3.3/build.gradle (13 additions, 6 deletions)

@@ -36,6 +36,17 @@ configure(sparkProjects) {
       }
     }
   }
+
+  tasks.withType(Test).configureEach {
+    useJUnitPlatform()
+
+    // See `gradle.properties` and `$HOME/.gradle/gradle.properties`
+    maxParallelForks = Math.max(
+        Math.min(
+            Integer.getInteger("iceberg.maxSparkTestParallelism", 2),
+            (Runtime.runtime.availableProcessors() / 2).intValue()).intValue(),
+        1)
+  }
 }
 
 project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
@@ -105,10 +116,6 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
     testImplementation libs.awaitility
   }
 
-  test {
-    useJUnitPlatform()
-  }
-
   tasks.withType(Test) {
     // Vectorized reads need more memory
     maxHeapSize '2560m'
@@ -280,15 +287,15 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}") {
     archiveClassifier.set(null)
   }
 
-  task integrationTest(type: Test) {
+  tasks.register("integrationTest", Test).configure {
     description = "Test Spark3 Runtime Jar against Spark ${sparkMajorVersion}"
     group = "verification"
     jvmArgs += project.property('extraJvmArgs')
     testClassesDirs = sourceSets.integration.output.classesDirs
     classpath = sourceSets.integration.runtimeClasspath + files(shadowJar.archiveFile.get().asFile.path)
     inputs.file(shadowJar.archiveFile.get().asFile.path)
+    dependsOn shadowJar
   }
-  integrationTest.dependsOn shadowJar
   check.dependsOn integrationTest
 
   jar {

spark/v3.4/build.gradle (13 additions, 6 deletions)

@@ -36,6 +36,17 @@ configure(sparkProjects) {
       }
     }
   }
+
+  tasks.withType(Test).configureEach {
+    useJUnitPlatform()
+
+    // See `gradle.properties` and `$HOME/.gradle/gradle.properties`
+    maxParallelForks = Math.max(
+        Math.min(
+            Integer.getInteger("iceberg.maxSparkTestParallelism", 2),
+            (Runtime.runtime.availableProcessors() / 2).intValue()).intValue(),
+        1)
+  }
 }
 
 project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
@@ -106,10 +117,6 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
     testImplementation libs.awaitility
   }
 
-  test {
-    useJUnitPlatform()
-  }
-
   tasks.withType(Test) {
     // Vectorized reads need more memory
     maxHeapSize '2560m'
@@ -283,15 +290,15 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}") {
     archiveClassifier.set(null)
   }
 
-  task integrationTest(type: Test) {
+  tasks.register("integrationTest", Test).configure {
     description = "Test Spark3 Runtime Jar against Spark ${sparkMajorVersion}"
     group = "verification"
     jvmArgs += project.property('extraJvmArgs')
     testClassesDirs = sourceSets.integration.output.classesDirs
     classpath = sourceSets.integration.runtimeClasspath + files(shadowJar.archiveFile.get().asFile.path)
     inputs.file(shadowJar.archiveFile.get().asFile.path)
+    dependsOn shadowJar
   }
-  integrationTest.dependsOn shadowJar
   check.dependsOn integrationTest
 
   jar {

spark/v3.5/build.gradle (13 additions, 11 deletions)

@@ -36,6 +36,17 @@ configure(sparkProjects) {
       }
     }
   }
+
+  tasks.withType(Test).configureEach {
+    useJUnitPlatform()
+
+    // See `gradle.properties` and `$HOME/.gradle/gradle.properties`
+    maxParallelForks = Math.max(
+        Math.min(
+            Integer.getInteger("iceberg.maxSparkTestParallelism", 2),
+            (Runtime.runtime.availableProcessors() / 2).intValue()).intValue(),
+        1)
+  }
 }
 
 project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
@@ -106,10 +117,6 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
     testImplementation libs.awaitility
   }
 
-  test {
-    useJUnitPlatform()
-  }
-
   tasks.withType(Test) {
     // Vectorized reads need more memory
     maxHeapSize '2560m'
@@ -173,10 +180,6 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") {
     antlr libs.antlr.antlr4
   }
 
-  test {
-    useJUnitPlatform()
-  }
-
   generateGrammarSource {
     maxHeapSize = "64m"
     arguments += ['-visitor', '-package', 'org.apache.spark.sql.catalyst.parser.extensions']
@@ -288,16 +291,15 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}") {
     archiveClassifier.set(null)
   }
 
-  task integrationTest(type: Test) {
-    useJUnitPlatform()
+  tasks.register("integrationTest", Test).configure {
     description = "Test Spark3 Runtime Jar against Spark ${sparkMajorVersion}"
     group = "verification"
     jvmArgs += project.property('extraJvmArgs')
     testClassesDirs = sourceSets.integration.output.classesDirs
     classpath = sourceSets.integration.runtimeClasspath + files(shadowJar.archiveFile.get().asFile.path)
     inputs.file(shadowJar.archiveFile.get().asFile.path)
+    dependsOn shadowJar
   }
-  integrationTest.dependsOn shadowJar
   check.dependsOn integrationTest
 
   jar {

