Merge remote-tracking branch 'origin/main' into cherry-pick/native_io_merge
dmetasoul-opensource committed Feb 6, 2023
2 parents cd1f787 + 301e4a5 commit db9fa5d
Showing 12 changed files with 278 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/maven-test.yml
@@ -5,7 +5,7 @@ name: CI with Maven Test

on:
push:
branches: [ "main", "release/spark_3.3" ]
branches: [ "main", "release/spark_3.3"]

jobs:
build:
@@ -23,7 +23,6 @@ import org.apache.spark.sql.execution.datasources.v2.{FilePartitionReader, Parti
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
import org.apache.spark.sql.vectorized.ColumnarBatch


abstract class NativeFilePartitionReaderFactory extends PartitionReaderFactory with Logging {
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
throw new UnsupportedOperationException("Cannot create row-based reader.")
@@ -0,0 +1,88 @@
/*
* Copyright [2022] [DMetaSoul Team]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2.parquet

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.parquet.hadoop.codec.CodecConfig
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetUtils}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, MapType, StructType, UserDefinedType}

class NativeParquetFileFormat extends FileFormat
with DataSourceRegister
with Logging
with Serializable {

override def prepareWrite(
sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {

val timeZoneId = options.get(DateTimeUtils.TIMEZONE_OPTION)
.getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone)

new OutputWriterFactory {
override def newInstance(
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
new NativeParquetOutputWriter(path, dataSchema, timeZoneId, context)
}

override def getFileExtension(context: TaskAttemptContext): String = {
CodecConfig.from(context).getCodec.getExtension + ".parquet"
}
}

}

override def inferSchema(
sparkSession: SparkSession,
parameters: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
ParquetUtils.inferSchema(sparkSession, parameters, files)
}

override def shortName(): String = "parquet"

override def toString: String = "Parquet"

override def hashCode(): Int = getClass.hashCode()

override def equals(other: Any): Boolean = other.isInstanceOf[NativeParquetFileFormat]

override def supportDataType(dataType: DataType): Boolean = dataType match {
case _: AtomicType => true

case st: StructType => st.forall { f => supportDataType(f.dataType) }

case ArrayType(elementType, _) => supportDataType(elementType)

case MapType(keyType, valueType, _) =>
supportDataType(keyType) && supportDataType(valueType)

case udt: UserDefinedType[_] => supportDataType(udt.sqlType)

case _ => false
}
}
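
As an aside on the type check above: a small, self-contained probe of the supportDataType recursion — a minimal sketch that only assumes the NativeParquetFileFormat class from this commit is on the classpath (the object name and the sample schema are hypothetical):

import org.apache.spark.sql.execution.datasources.v2.parquet.NativeParquetFileFormat
import org.apache.spark.sql.types._

object SupportedTypesSketch {
  def main(args: Array[String]): Unit = {
    val format = new NativeParquetFileFormat()
    // Nested schema: every leaf must resolve to an AtomicType (or a UDT backed by one).
    val nested = StructType(Seq(
      StructField("id", LongType),                           // AtomicType
      StructField("tags", ArrayType(StringType)),            // recurses into the element type
      StructField("attrs", MapType(StringType, DoubleType))  // recurses into key and value types
    ))
    println(format.supportDataType(nested))                // true
    println(format.supportDataType(CalendarIntervalType))  // false: falls through to the default case
  }
}
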
@@ -0,0 +1,93 @@
/*
* Copyright [2022] [DMetaSoul Team]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v2.parquet

import org.apache.arrow.lakesoul.io.NativeIOWriter
import org.apache.arrow.lakesoul.memory.ArrowMemoryUtils
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.s3a.S3AFileSystem
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.arrow.ArrowWriter
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.ArrowUtils

class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZoneId: String, context: TaskAttemptContext) extends OutputWriter {

val NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE = SQLConf.get.getConf(LakeSoulSQLConf.NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE)

private var recordCount = 0

val arrowSchema: Schema = ArrowUtils.toArrowSchema(dataSchema, timeZoneId)
val nativeIOWriter: NativeIOWriter = new NativeIOWriter(arrowSchema)
nativeIOWriter.addFile(path)

val conf: Configuration = context.getConfiguration
val fileSystem: FileSystem = new Path(path).getFileSystem(conf)
if (fileSystem.isInstanceOf[S3AFileSystem]) {
val s3aAccessKey = conf.get("fs.s3a.access.key")
val s3aAccessSecret = conf.get("fs.s3a.access.secret")
val s3aRegion = conf.get("fs.s3a.endpoint.region")
val s3aEndpoint = conf.get("fs.s3a.endpoint")
nativeIOWriter.setObjectStoreOption("fs.s3a.access.key", s3aAccessKey)
nativeIOWriter.setObjectStoreOption("fs.s3a.access.secret", s3aAccessSecret)
nativeIOWriter.setObjectStoreOption("fs.s3a.endpoint.region", s3aRegion)
nativeIOWriter.setObjectStoreOption("fs.s3a.endpoint", s3aEndpoint)
}

nativeIOWriter.initializeWriter()

val allocator: BufferAllocator =
ArrowMemoryUtils.rootAllocator.newChildAllocator("toBatchIterator", 0, Long.MaxValue)
val root: VectorSchemaRoot = VectorSchemaRoot.create(arrowSchema, allocator)

val recordWriter: ArrowWriter = ArrowWriter.create(root)

override def write(row: InternalRow): Unit = {

recordWriter.write(row)
recordCount += 1

if (recordCount >= NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE) {
recordWriter.finish()
nativeIOWriter.write(root)
recordWriter.reset()
recordCount = 0
}
}


override def close(): Unit = {

recordWriter.finish()

nativeIOWriter.write(root)
nativeIOWriter.close()

recordWriter.reset()
root.close()
allocator.close()

}
}
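
For orientation, a minimal end-to-end write sketch that would exercise this writer. The catalog and conf keys mirror the test-suite setup added later in this commit; the local master, the table path, and the plain format("lakesoul") overwrite are illustrative assumptions, and a reachable LakeSoul meta store plus the native IO library are required:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf

object NativeWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("native-parquet-write-sketch")
      .config("spark.sql.catalog.lakesoul", classOf[LakeSoulCatalog].getName)
      .config(SQLConf.DEFAULT_CATALOG.key, "lakesoul")
      .config(LakeSoulSQLConf.NATIVE_IO_ENABLE.key, "true")
      // Rows accumulate in the Arrow VectorSchemaRoot and are flushed to the
      // native writer once this many rows are buffered (see write() above).
      .config(LakeSoulSQLConf.NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE.key, "100000")
      .getOrCreate()

    import spark.implicits._
    val df = (1 to 1000).map(i => (i, s"val_$i")).toDF("id", "value")
    // With NATIVE_IO_ENABLE set, Snapshot.fileFormat returns NativeParquetFileFormat,
    // so each write task goes through NativeParquetOutputWriter.
    df.write.format("lakesoul").mode("overwrite").save("/tmp/lakesoul/native_write_sketch")

    spark.stop()
  }
}
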
@@ -15,6 +15,7 @@
*/

package org.apache.spark.sql.execution.datasources.v2.parquet

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapreduce._
@@ -16,13 +16,14 @@

package org.apache.spark.sql.lakesoul

import com.dmetasoul.lakesoul.meta.{DataOperation, MetaUtils}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.parquet.NativeParquetFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.LakeSoulOptions.ReadType
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.lakesoul.utils._
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}


class Snapshot(table_info: TableInfo,
@@ -54,7 +55,11 @@ class Snapshot(table_info: TableInfo,
}

/** Return the underlying Spark `FileFormat` of the LakeSoulTableRel. */
def fileFormat: FileFormat = new ParquetFileFormat()
def fileFormat: FileFormat = if (SQLConf.get.getConf(LakeSoulSQLConf.NATIVE_IO_ENABLE)) {
new NativeParquetFileFormat()
} else {
new ParquetFileFormat()
}

def getConfiguration: Map[String, String] = table_info.configuration

@@ -23,12 +23,13 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.LakeSoulOptions.ReadType
import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog
import org.apache.spark.sql.lakesoul.sources.{LakeSoulSQLConf, LakeSoulSourceUtils}
import org.apache.spark.sql.lakesoul.test.LakeSoulTestUtils
import org.apache.spark.sql.lakesoul.test.{LakeSoulTestSparkSession, LakeSoulTestUtils}
import org.apache.spark.sql.lakesoul.utils.{SparkUtil, TimestampFormatter}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession}
import org.apache.spark.sql.types.StructType

import java.text.SimpleDateFormat
@@ -40,6 +41,17 @@ class CDCSuite
with SharedSparkSession
with LakeSoulTestUtils {

override protected def createSparkSession: TestSparkSession = {
SparkSession.cleanupAnyExistingSession()
val session = new LakeSoulTestSparkSession(sparkConf)
session.conf.set("spark.sql.catalog.lakesoul", classOf[LakeSoulCatalog].getName)
session.conf.set(SQLConf.DEFAULT_CATALOG.key, "lakesoul")
session.conf.set(LakeSoulSQLConf.NATIVE_IO_ENABLE.key, true)
session.sparkContext.setLogLevel("ERROR")

session
}

import testImplicits._

val format = "lakesoul"
@@ -72,7 +84,7 @@ class CDCSuite
SnapshotManagement(path)
}

Seq("false", NativeIOBase.isNativeIOLibExist.toString).distinct.foreach { nativeIOEnabled =>
Seq("false", "true").distinct.foreach { nativeIOEnabled =>
test(s"test cdc with MultiPartitionMergeScan(native_io_enabled=$nativeIOEnabled) ") {
withTable("tt") {
withTempDir(dir => {
@@ -101,6 +113,7 @@ class CDCSuite
})
}
}

test(s"test cdc with OnePartitionMergeBucketScan(native_io_enabled=$nativeIOEnabled) ") {
withTable("tt") {
withTempDir(dir => {
@@ -18,17 +18,31 @@ package org.apache.spark.sql.lakesoul.commands

import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.SnapshotManagement
import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, MergeOpInt}
import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestSparkSession, MergeOpInt}
import org.apache.spark.sql.lakesoul.utils.SparkUtil
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SparkSession}
import org.scalatest.BeforeAndAfterEach

class CompactionSuite extends QueryTest
with SharedSparkSession with BeforeAndAfterEach
with LakeSoulSQLCommandTest {

override protected def createSparkSession: TestSparkSession = {
SparkSession.cleanupAnyExistingSession()
val session = new LakeSoulTestSparkSession(sparkConf)
session.conf.set("spark.sql.catalog.lakesoul", classOf[LakeSoulCatalog].getName)
session.conf.set(SQLConf.DEFAULT_CATALOG.key, "lakesoul")
session.conf.set(LakeSoulSQLConf.NATIVE_IO_ENABLE.key, true)
session.sparkContext.setLogLevel("ERROR")

session
}

import testImplicits._

test("partitions are not been compacted by default") {
@@ -18,12 +18,27 @@ package org.apache.spark.sql.lakesoul.commands

import com.dmetasoul.lakesoul
import com.dmetasoul.lakesoul.tables.{LakeSoulTable, LakeSoulTableTestUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.SnapshotManagement
import org.apache.spark.sql.lakesoul.test.LakeSoulSQLCommandTest
import org.apache.spark.sql.{Row, functions}
import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.lakesoul.test.{LakeSoulSQLCommandTest, LakeSoulTestSparkSession}
import org.apache.spark.sql.test.TestSparkSession
import org.apache.spark.sql.{Row, SparkSession, functions}

class DeleteScalaSuite extends DeleteSuiteBase with LakeSoulSQLCommandTest {

override protected def createSparkSession: TestSparkSession = {
SparkSession.cleanupAnyExistingSession()
val session = new LakeSoulTestSparkSession(sparkConf)
session.conf.set("spark.sql.catalog.lakesoul", classOf[LakeSoulCatalog].getName)
session.conf.set(SQLConf.DEFAULT_CATALOG.key, "lakesoul")
session.conf.set(LakeSoulSQLConf.NATIVE_IO_ENABLE.key, true)
session.sparkContext.setLogLevel("ERROR")

session
}

import testImplicits._

test("delete cached table by path") {
@@ -16,18 +16,32 @@

package org.apache.spark.sql.lakesoul.commands

import com.dmetasoul.lakesoul.meta.{MetaUtils, MetaVersion}
import com.dmetasoul.lakesoul.tables.LakeSoulTable
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.lakesoul.test.LakeSoulTestUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog
import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
import org.apache.spark.sql.lakesoul.test.{LakeSoulTestSparkSession, LakeSoulTestUtils}
import org.apache.spark.sql.lakesoul.utils.SparkUtil
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.test.{SharedSparkSession, TestSparkSession}
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SparkSession}
import org.scalatest.BeforeAndAfterEach

class DropTableSuite extends QueryTest
with SharedSparkSession with BeforeAndAfterEach
with LakeSoulTestUtils {

override protected def createSparkSession: TestSparkSession = {
SparkSession.cleanupAnyExistingSession()
val session = new LakeSoulTestSparkSession(sparkConf)
session.conf.set("spark.sql.catalog.lakesoul", classOf[LakeSoulCatalog].getName)
session.conf.set(SQLConf.DEFAULT_CATALOG.key, "lakesoul")
session.conf.set(LakeSoulSQLConf.NATIVE_IO_ENABLE.key, true)
session.sparkContext.setLogLevel("ERROR")

session
}

import testImplicits._
test("drop table") {
withTempDir(f => {