
Merge pull request lakesoul-io#149 from meta-soul/spark_native_io_packaging

[NativeIO][Spark] Package native lib in lakesoul-spark jar
dmetasoul01 authored Feb 2, 2023
2 parents 968740f + d1a7536 commit e9b8092
Showing 8 changed files with 271 additions and 61 deletions.
7 changes: 7 additions & 0 deletions lakesoul-flink/pom.xml
@@ -259,6 +259,13 @@
<include>*.so</include>
<include>*.dll</include>
</includes>
<excludes>
<exclude>deps</exclude>
<exclude>.fingerprint</exclude>
<exclude>build</exclude>
<exclude>examples</exclude>
<exclude>*.d</exclude>
</excludes>
</resource>
</resources>
<plugins>
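Both pom files register the native build output directory as a Maven resource, so the compiled library (*.so / *.dylib / *.dll) is copied into the module jar while Cargo's intermediate outputs (deps, .fingerprint, build, examples, *.d) stay out. A minimal, hypothetical sketch of how such a bundled library could be extracted from the classpath and loaded at runtime; the helper class and the file name liblakesoul_io.so are illustrative assumptions, not the project's actual loader:

```java
// Hypothetical sketch: extract a native library bundled as a jar resource and load it.
// The resource name "liblakesoul_io.so" and this helper class are illustrative only.
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public final class BundledNativeLibLoader {
    public static void load(String resourceName) throws IOException {
        try (InputStream in = BundledNativeLibLoader.class.getResourceAsStream("/" + resourceName)) {
            if (in == null) {
                throw new IOException("Native library not found on classpath: " + resourceName);
            }
            // Copy to a temp file because System.load needs a real filesystem path.
            Path tmp = Files.createTempFile("lakesoul-native-", "-" + resourceName);
            tmp.toFile().deleteOnExit();
            Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
            System.load(tmp.toAbsolutePath().toString());
        }
    }

    public static void main(String[] args) throws IOException {
        load("liblakesoul_io.so"); // assumed library file name
    }
}
```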
50 changes: 50 additions & 0 deletions lakesoul-spark/pom.xml
@@ -42,6 +42,28 @@
<groupId>com.dmetasoul</groupId>
<artifactId>lakesoul-io-java</artifactId>
<version>2.2.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- scala deps -->
@@ -212,6 +234,23 @@
</dependencies>

<build>
<resources>
<resource>
<directory>${basedir}/../native-io/target/x86_64-unknown-linux-gnu/release/</directory>
<includes>
<include>*.dylib</include>
<include>*.so</include>
<include>*.dll</include>
</includes>
<excludes>
<exclude>deps</exclude>
<exclude>.fingerprint</exclude>
<exclude>build</exclude>
<exclude>examples</exclude>
<exclude>*.d</exclude>
</excludes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -299,6 +338,13 @@
<includes>
<include>com.dmetasoul:lakesoul-spark</include>
<include>com.dmetasoul:lakesoul-common</include>
<include>com.dmetasoul:lakesoul-io-java</include>
<include>com.github.jnr:*</include>
<include>org.ow2.asm:*</include>
<include>org.apache.arrow:arrow-algorithm</include>
<include>org.apache.arrow:arrow-c-data</include>
<include>org.apache.arrow:arrow-dataset</include>
<include>com.google.flatbuffers:*</include>
<include>com.zaxxer:HikariCP</include>
<include>org.postgresql:postgresql</include>
<include>com.alibaba:fastjson</include>
@@ -318,6 +364,10 @@
<pattern>com.alibaba.fastjson</pattern>
<shadedPattern>com.lakesoul.shaded.com.alibaba.fastjson</shadedPattern>
</relocation>
<relocation>
<pattern>com.google.flatbuffers</pattern>
<shadedPattern>com.lakesoul.shaded.com.google.flatbuffers</shadedPattern>
</relocation>
</relocations>
</configuration>
</plugin>
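The new shade includes pull lakesoul-io-java together with its JNR, ASM, Arrow, and flatbuffers dependencies into the fat jar, and the added relocation moves com.google.flatbuffers under com.lakesoul.shaded so it cannot clash with other flatbuffers copies on the classpath. One way to confirm the relocation in the built artifact is to list the relocated entries; a sketch assuming the jar path used by the benchmark command further below:

```java
// Hypothetical check: list relocated flatbuffers classes in the shaded jar.
// The default jar path matches the artifact name used in the benchmark command and is an assumption.
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class ListShadedFlatbuffers {
    public static void main(String[] args) throws Exception {
        String jarPath = args.length > 0
                ? args[0]
                : "lakesoul-spark/target/lakesoul-spark-2.2.0-spark-3.3-SNAPSHOT.jar";
        try (JarFile jar = new JarFile(jarPath)) {
            jar.stream()
               .map(JarEntry::getName)
               .filter(name -> name.startsWith("com/lakesoul/shaded/com/google/flatbuffers/"))
               .forEach(System.out::println); // prints each relocated class entry
        }
    }
}
```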
@@ -16,14 +16,12 @@

package org.apache.spark.sql.execution.datasources.parquet;

import com.amazonaws.auth.AWSCredentials;
import org.apache.arrow.lakesoul.io.NativeIOReader;
import org.apache.arrow.lakesoul.io.read.LakeSoulArrowReader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -162,7 +160,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
awsS3Bucket = s3aFileSystem.getBucket();
s3aEndpoint = taskAttemptContext.getConfiguration().get("fs.s3a.endpoint");
s3aRegion = taskAttemptContext.getConfiguration().get("fs.s3a.endpoint.region");
awsCredentials = S3AUtils.createAWSCredentialProviderSet(file.toUri(), taskAttemptContext.getConfiguration()).getCredentials();
s3aAccessKey = taskAttemptContext.getConfiguration().get("fs.s3a.access.key");
s3aSecretKey = taskAttemptContext.getConfiguration().get("fs.s3a.secret.key");
}
initializeInternal();
}
@@ -248,7 +247,7 @@ private void recreateNativeReader() throws IOException {
reader.setThreadNum(threadNum);

if (s3aFileSystem != null) {
reader.setObjectStoreOptions(awsCredentials.getAWSAccessKeyId(), awsCredentials.getAWSSecretKey(), s3aRegion, awsS3Bucket, s3aEndpoint);
reader.setObjectStoreOptions(s3aAccessKey, s3aSecretKey, s3aRegion, awsS3Bucket, s3aEndpoint);
}

if (filter != null) {
@@ -378,12 +377,12 @@ private void initializeInternal() throws IOException, UnsupportedOperationExcept
private S3AFileSystem s3aFileSystem = null;
private String s3aEndpoint = null;
private String s3aRegion = null;

private AWSCredentials awsCredentials = null;
private String s3aAccessKey = null;
private String s3aSecretKey = null;

private String awsS3Bucket = null;

private FilterPredicate filter;
private final FilterPredicate filter;
}
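The reader no longer resolves an AWSCredentials provider chain via S3AUtils; it now reads the access and secret key directly from the task's Hadoop configuration and forwards them, together with region, bucket, and endpoint, to the native reader. A minimal sketch of the configuration keys involved, reusing the local MinIO values from the benchmark below purely as illustration:

```java
// Hypothetical sketch: the Hadoop configuration keys consulted by the rewritten reader.
// Endpoint/credential values are the local MinIO settings from the benchmark, for illustration only.
import org.apache.hadoop.conf.Configuration;

public class S3AOptionsSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.s3a.endpoint", "http://localhost:9000");
        conf.set("fs.s3a.endpoint.region", "us-east-1");
        conf.set("fs.s3a.access.key", "minioadmin1");
        conf.set("fs.s3a.secret.key", "minioadmin1");

        String accessKey = conf.get("fs.s3a.access.key");
        String secretKey = conf.get("fs.s3a.secret.key");
        String region = conf.get("fs.s3a.endpoint.region");
        String endpoint = conf.get("fs.s3a.endpoint");

        // These values are then forwarded to the native reader, e.g.:
        // reader.setObjectStoreOptions(accessKey, secretKey, region, bucket, endpoint);
        System.out.printf("endpoint=%s region=%s accessKey=%s%n", endpoint, region, accessKey);
    }
}
```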


86 changes: 86 additions & 0 deletions lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/io/ParquetScanBenchmark.scala
@@ -0,0 +1,86 @@
package org.apache.spark.sql.lakesoul.benchmark.io

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf

/**
 * Run with the following commands in a local MinIO environment:
*
* mvn package -pl lakesoul-spark -am -DskipTests
* docker run --rm -ti --net host -v /opt/spark/work-dir/data:/opt/spark/work-dir/data -v $PWD/lakesoul-spark/target:/opt/spark/work-dir/jars bitnami/spark:3.3.1 spark-submit --driver-memory 4g --jars /opt/spark/work-dir/jars/lakesoul-spark-2.2.0-spark-3.3-SNAPSHOT.jar --class org.apache.spark.sql.lakesoul.benchmark.io.ParquetScanBenchmark /opt/spark/work-dir/jars/lakesoul-spark-2.2.0-spark-3.3-SNAPSHOT-tests.jar --localtest
*/
object ParquetScanBenchmark {
def main(args: Array[String]): Unit = {
val builder = SparkSession.builder()
.appName("ParquetScanBenchmark")
.master("local[1]")
.config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("hadoop.fs.s3a.committer.name", "directory")
.config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "append")
.config("spark.hadoop.fs.s3a.committer.staging.tmp.path", "/opt/spark/work-dir/s3a_staging")
.config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a", "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.config("spark.hadoop.fs.s3.buffer.dir", "/tmp")
.config("spark.hadoop.fs.s3a.buffer.dir", "/tmp")
.config("spark.hadoop.fs.s3a.fast.upload.buffer", "disk")
.config("spark.hadoop.fs.s3a.fast.upload", value = true)
.config("spark.hadoop.fs.s3a.multipart.size", 33554432)
.config("spark.sql.shuffle.partitions", 1)
.config("spark.sql.files.maxPartitionBytes", "2g")
.config("spark.default.parallelism", 1)
.config("spark.sql.parquet.mergeSchema", value = false)
.config("spark.sql.parquet.filterPushdown", value = true)
.config("spark.hadoop.mapred.output.committer.class", "org.apache.hadoop.mapred.FileOutputCommitter")
.config("spark.sql.warehouse.dir", "s3://lakesoul-test-bucket/data/benchmark")
.config("spark.sql.extensions", "com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog")

var bucketName = "lakesoul-test-bucket"
if (args.length >= 1 && args(0) == "--localtest") {
builder.config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9000")
.config("spark.hadoop.fs.s3a.endpoint.region", "us-east-1")
.config("spark.hadoop.fs.s3a.access.key", "minioadmin1")
.config("spark.hadoop.fs.s3a.secret.key", "minioadmin1")
} else {
if (args.length >= 1 && args(0) == "--bucketname") {
bucketName = args(1)
}
}

val spark = builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

val dataPath0 = "/opt/spark/work-dir/data/base-0.parquet"
val tablePath = s"s3://$bucketName/data/benchmark/parquet-scan"
println(s"tablePath: $tablePath")

var tableExist = true
try {
val _ = LakeSoulTable.forPath(tablePath)
tableExist = true
} catch {
case _: Throwable => tableExist = false
}

if (!tableExist) {
println(s"LakeSoul table not exist, upload from local file")
val df = spark.read.format("parquet").load(dataPath0).repartition(1)
df.write.format("lakesoul")
.mode("Overwrite").save(tablePath)
}

println(s"Reading with parquet-mr")
// spark parquet-mr read
SQLConf.get.setConfString(LakeSoulSQLConf.NATIVE_IO_ENABLE.key, "false")
spark.time({
spark.read.format("lakesoul").load(tablePath).write.format("noop").mode("Overwrite").save()
})
println(s"Reading with native io")
SQLConf.get.setConfString(LakeSoulSQLConf.NATIVE_IO_ENABLE.key, "true")
spark.time({
spark.read.format("lakesoul").load(tablePath).write.format("noop").mode("Overwrite").save()
})
}
}
@@ -0,0 +1,35 @@
# Native IO Performance Comparison Results

## 1. Parquet Scan

### Settings

Code: lakesoul-spark/src/test/scala/org/apache/spark/sql/lakesoul/benchmark/io/ParquetScanBenchmark.scala

Tested on Spark 3.3.1 with Parquet-mr 1.12.2 and Arrow-rs (parquet) 31.0.0.
Parquet file size: 894.3 MB, compressed with Snappy. Metadata:

```
############ file meta data ############
created_by: parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
num_columns: 8
num_rows: 10000000
num_row_groups: 7
format_version: 1.0
serialized_size: 7688
```

The file is read with a parallelism of one in Spark.

### Results
1. MinIO

| | Parquet-mr | Native-IO | Improvement |
|----------|------------|-----------|-------------|
| Time(ms) | 11417 | 4381 | 2.61x |

2. AWS S3

| | Parquet-mr | Native-IO | Improvement |
|----------|------------|-----------|-------------|
| Time(ms) | 25190 | 6965 | 3.62x |
