
Commit 753ce93
get conf info from LakeSoulSQLConf
F-PHantam committed Feb 2, 2023
1 parent b2501f2, commit 753ce93
Showing 4 changed files with 9 additions and 4 deletions.

.github/workflows/flink-cdc-test.yml (2 changes: 1 addition & 1 deletion)

@@ -2,7 +2,7 @@ name: CI with flink cdc Test
 
 on:
   push:
-    branches: [ "main", "lakesoul_native_write" ]
+    branches: [ "main", "release/spark_3.3" ]
 
 jobs:
   build:

.github/workflows/maven-test.yml (2 changes: 1 addition & 1 deletion)

@@ -5,7 +5,7 @@ name: CI with Maven Test
 
 on:
   push:
-    branches: [ "main", "lakesoul_native_write"]
+    branches: [ "main", "release/spark_3.3"]
 
 jobs:
   build:

NativeParquetOutputWriter.scala (4 changes: 3 additions & 1 deletion)

@@ -28,12 +28,14 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.arrow.ArrowWriter
 import org.apache.spark.sql.execution.datasources.OutputWriter
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.ArrowUtils
 
 class NativeParquetOutputWriter(val path: String, dataSchema: StructType, timeZoneId: String, context: TaskAttemptContext) extends OutputWriter {
 
-  val NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE = 250000
+  val NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE = SQLConf.get.getConf(LakeSoulSQLConf.NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE)
 
   private var recordCount = 0
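
With this change the writer's row-group size comes from the session's SQL configuration instead of the hard-coded 250000. The LakeSoulSQLConf definitions themselves are not part of this commit; the following is a minimal sketch of how such an entry is typically declared with Spark's ConfigBuilder API, assuming a hypothetical key name and doc string (only the 250000 default is taken from the removed line):

// Sketch only: the real LakeSoulSQLConf is not shown in this diff.
// The key name and doc text are assumptions; the default mirrors the
// previously hard-coded value.
package org.apache.spark.sql.lakesoul.sources

import org.apache.spark.sql.internal.SQLConf

object LakeSoulSQLConf {

  val NATIVE_IO_WRITE_MAX_ROW_GROUP_SIZE =
    SQLConf.buildConf("spark.dmetasoul.lakesoul.native.io.write.max.row.group.size") // hypothetical key
      .doc("Maximum number of rows per Parquet row group written by the native IO writer.")
      .intConf
      .createWithDefault(250000)
}

SQLConf.get.getConf(...) then resolves the value from the active session (or falls back to the declared default), which is what NativeParquetOutputWriter now relies on.
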

Snapshot.scala (5 changes: 4 additions & 1 deletion)

@@ -17,11 +17,14 @@
 package org.apache.spark.sql.lakesoul
 
 import org.apache.arrow.lakesoul.io.NativeIOBase
+import org.apache.spark.sql.api.r.SQLUtils
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.parquet.NativeParquetFileFormat
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.lakesoul.LakeSoulOptions.ReadType
+import org.apache.spark.sql.lakesoul.sources.LakeSoulSQLConf
 import org.apache.spark.sql.lakesoul.utils._
 
 
@@ -54,7 +57,7 @@ class Snapshot(table_info: TableInfo,
   }
 
   /** Return the underlying Spark `FileFormat` of the LakeSoulTableRel. */
-  def fileFormat: FileFormat = if (NativeIOBase.isNativeIOLibExist) {
+  def fileFormat: FileFormat = if (SQLConf.get.getConf(LakeSoulSQLConf.NATIVE_IO_ENABLE)) {
     new NativeParquetFileFormat
   } else {
     new ParquetFileFormat
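
Because fileFormat is now driven by LakeSoulSQLConf.NATIVE_IO_ENABLE rather than only by NativeIOBase.isNativeIOLibExist, the native Parquet path can be toggled per Spark session. A usage sketch follows, assuming NATIVE_IO_ENABLE is a boolean entry whose key is "spark.dmetasoul.lakesoul.native.io.enable" (the authoritative key is whatever LakeSoulSQLConf actually declares):

// Usage sketch with an assumed config key: disable the native IO path so that
// Snapshot.fileFormat falls back to the plain ParquetFileFormat for this session.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("lakesoul-native-io-toggle")
  .config("spark.dmetasoul.lakesoul.native.io.enable", "false") // assumed key for LakeSoulSQLConf.NATIVE_IO_ENABLE
  .getOrCreate()

// With the flag left at its default (or set to "true"), Snapshot.fileFormat
// returns NativeParquetFileFormat instead.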
