Skip to content

Commit

Permalink
[Flink] Optimize CDC sink serde with Fury (lakesoul-io#307)
Browse files Browse the repository at this point in the history
* use fury serializer in flink multi table cdc sink

Signed-off-by: chenxu <chenxu@dmetasoul.com>

* remove jsonobject in record

Signed-off-by: chenxu <chenxu@dmetasoul.com>

* fix property type. fix macos action failure

Signed-off-by: chenxu <chenxu@dmetasoul.com>

* fix table properties settings

Signed-off-by: chenxu <chenxu@dmetasoul.com>

* fix github actions

Signed-off-by: chenxu <chenxu@dmetasoul.com>

* add more class registrations

Signed-off-by: chenxu <chenxu@dmetasoul.com>

---------

Signed-off-by: chenxu <chenxu@dmetasoul.com>
Co-authored-by: chenxu <chenxu@dmetasoul.com>
  • Loading branch information
xuchen-plus and dmetasoul01 authored Aug 30, 2023
1 parent c6e892f commit c3271e1
Show file tree
Hide file tree
Showing 20 changed files with 399 additions and 146 deletions.
89 changes: 89 additions & 0 deletions .github/workflows/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,83 @@ jobs:
name: lakesoul-nativeio-x86_64-apple-darwin
path: ./native-io/target/release/liblakesoul_io_c.dylib

# Build the native metadata library (.so) for Linux x86_64 via cross,
# and publish it as an artifact consumed by the deploy-maven-package job.
build-native-metadata-linux-x86_64:
  runs-on: ubuntu-latest
  steps:
    - uses: actions/checkout@v3
    - uses: actions-rs/toolchain@v1
      with:
        profile: minimal
        # Pinned nightly keeps builds reproducible across runners.
        toolchain: nightly-2023-05-20
        default: true
    - uses: Swatinem/rust-cache@v2
      with:
        workspaces: "./native-metadata -> target"
    - uses: actions-rs/cargo@v1
      with:
        # cross builds inside a container for the explicit Linux target.
        use-cross: true
        command: build
        args: '--manifest-path native-metadata/Cargo.toml --target x86_64-unknown-linux-gnu --release --all-features'
    # Pinned to v3 (was '@master'): '@master' is a mutable ref, and artifacts
    # uploaded by upload-artifact v4+ cannot be fetched by the
    # download-artifact@v3 used in deploy-maven-package.
    - uses: actions/upload-artifact@v3
      with:
        name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu
        path: ./native-metadata/target/x86_64-unknown-linux-gnu/release/liblakesoul_metadata_c.so

# Build the native metadata library (.dll) for Windows x86_64 and publish it
# as an artifact consumed by the deploy-maven-package job.
build-native-metadata-windows-x86_64:
  runs-on: windows-latest
  steps:
    - uses: actions/checkout@v3
    # protoc is required to generate the metadata protobuf bindings.
    - name: Install Protoc
      uses: arduino/setup-protoc@v2
      with:
        version: "23.x"
        # Token avoids anonymous GitHub API rate limits when fetching protoc.
        repo-token: ${{ secrets.GITHUB_TOKEN }}
    - uses: actions-rs/toolchain@v1
      with:
        profile: minimal
        # Pinned nightly keeps builds reproducible across runners.
        toolchain: nightly-2023-05-20
        default: true
    - uses: Swatinem/rust-cache@v2
      with:
        workspaces: "./native-metadata -> target"
    - uses: actions-rs/cargo@v1
      with:
        command: build
        args: '--manifest-path native-metadata/Cargo.toml --release --all-features'
    # Pinned to v3 (was '@master'): '@master' is a mutable ref, and artifacts
    # uploaded by upload-artifact v4+ cannot be fetched by the
    # download-artifact@v3 used in deploy-maven-package.
    - uses: actions/upload-artifact@v3
      with:
        name: lakesoul-nativemetadata-x86_64-pc-windows-msvc
        path: ./native-metadata/target/release/lakesoul_metadata_c.dll

# Build the native metadata library (.dylib) for macOS x86_64 and publish it
# as an artifact consumed by the deploy-maven-package job.
build-native-metadata-macos-x86_64:
  runs-on: macos-latest
  steps:
    # automake is needed by native build scripts and is not preinstalled
    # on the macOS runner image.
    - name: Install automake
      run: brew install automake
    - uses: actions/checkout@v3
    # protoc is required to generate the metadata protobuf bindings.
    - name: Install Protoc
      uses: arduino/setup-protoc@v2
      with:
        version: "23.x"
        # Token avoids anonymous GitHub API rate limits when fetching protoc.
        repo-token: ${{ secrets.GITHUB_TOKEN }}
    - uses: actions-rs/toolchain@v1
      with:
        profile: minimal
        # Pinned nightly keeps builds reproducible across runners.
        toolchain: nightly-2023-05-20
        default: true
    - uses: Swatinem/rust-cache@v2
      with:
        workspaces: "./native-metadata -> target"
    - uses: actions-rs/cargo@v1
      with:
        command: build
        args: '--manifest-path native-metadata/Cargo.toml --release --all-features'
    # Pinned to v3 (was '@master'): '@master' is a mutable ref, and artifacts
    # uploaded by upload-artifact v4+ cannot be fetched by the
    # download-artifact@v3 used in deploy-maven-package.
    - uses: actions/upload-artifact@v3
      with:
        name: lakesoul-nativemetadata-x86_64-apple-darwin
        path: ./native-metadata/target/release/liblakesoul_metadata_c.dylib


deploy-maven-package:
runs-on: ubuntu-latest
needs: [build-linux-x86_64, build-windows-x86_64, build-macos-x86_64]
Expand All @@ -112,6 +189,18 @@ jobs:
with:
name: lakesoul-nativeio-x86_64-pc-windows-msvc
path: ./native-io/target/release/
- uses: actions/download-artifact@v3
with:
name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu
path: ./native-metadata/target/release/
- uses: actions/download-artifact@v3
with:
name: lakesoul-nativemetadata-x86_64-apple-darwin
path: ./native-metadata/target/release/
- uses: actions/download-artifact@v3
with:
name: lakesoul-nativemetadata-x86_64-pc-windows-msvc
path: ./native-metadata/target/release/
- name: Set up JDK 8
uses: actions/setup-java@v3
with:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/flink-cdc-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ jobs:
uses: arduino/setup-protoc@v2
with:
version: "23.x"
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rs/toolchain@v1
with:
profile: minimal
Expand Down
11 changes: 5 additions & 6 deletions .github/workflows/maven-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
uses: actions/setup-java@v3
with:
java-version: '8'
distribution: 'temurin'
cache: maven
- uses: actions-rs/toolchain@v1
with:
profile: minimal
Expand Down Expand Up @@ -124,6 +118,7 @@ jobs:
uses: arduino/setup-protoc@v2
with:
version: "23.x"
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions/download-artifact@v3
with:
name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu-maven-test
Expand Down Expand Up @@ -191,6 +186,7 @@ jobs:
uses: arduino/setup-protoc@v2
with:
version: "23.x"
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions/download-artifact@v3
with:
name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu-maven-test
Expand Down Expand Up @@ -261,6 +257,7 @@ jobs:
uses: arduino/setup-protoc@v2
with:
version: "23.x"
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions/download-artifact@v3
with:
name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu-maven-test
Expand Down Expand Up @@ -328,6 +325,7 @@ jobs:
uses: arduino/setup-protoc@v2
with:
version: "23.x"
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions/download-artifact@v3
with:
name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu-maven-test
Expand Down Expand Up @@ -406,6 +404,7 @@ jobs:
uses: arduino/setup-protoc@v2
with:
version: "23.x"
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions/download-artifact@v3
with:
name: lakesoul-nativemetadata-x86_64-unknown-linux-gnu-maven-test
Expand Down
21 changes: 3 additions & 18 deletions .github/workflows/native-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,6 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
uses: actions/setup-java@v3
with:
java-version: '8'
distribution: 'temurin'
cache: maven
- uses: actions-rs/toolchain@v1
with:
profile: minimal
Expand All @@ -139,16 +133,11 @@ jobs:
runs-on: windows-latest
steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
uses: actions/setup-java@v3
with:
java-version: '8'
distribution: 'temurin'
cache: maven
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
version: "23.x"
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rs/toolchain@v1
with:
profile: minimal
Expand All @@ -172,16 +161,11 @@ jobs:
- name: Install automake
run: brew install automake
- uses: actions/checkout@v3
- name: Set up JDK 8
uses: actions/setup-java@v3
with:
java-version: '8'
distribution: 'temurin'
cache: maven
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
version: "23.x"
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: actions-rs/toolchain@v1
with:
profile: minimal
Expand Down Expand Up @@ -238,6 +222,7 @@ jobs:
uses: arduino/setup-protoc@v2
with:
version: "23.x"
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Build with Maven
run: |
MAVEN_OPTS="-Xmx4000m" mvn -q -B package --file pom.xml -Pcross-build -DskipTests -Dmaven.test.skip=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,10 @@
// SPDX-License-Identifier: Apache-2.0
package com.dmetasoul.lakesoul.meta.jnr;

import com.google.protobuf.InvalidProtocolBufferException;
import jnr.ffi.Pointer;
import jnr.ffi.annotations.Delegate;
import jnr.ffi.annotations.LongLong;

import java.awt.*;

public interface LibLakeSoulMetaData {

Pointer create_tokio_runtime();
Expand Down
13 changes: 10 additions & 3 deletions lakesoul-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ SPDX-License-Identifier: Apache-2.0
<modelVersion>4.0.0</modelVersion>

<artifactId>lakesoul-flink</artifactId>
<version>2.3.0-flink-1.14-SNAPSHOT</version>
<version>2.3.0-flink-1.17-SNAPSHOT</version>
<properties>
<flink.version>1.17.1</flink.version>
<scala.binary.version>2.12</scala.binary.version>
Expand Down Expand Up @@ -219,8 +219,7 @@ SPDX-License-Identifier: Apache-2.0
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
<scope>test</scope>
<version>32.0.0-jre</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
Expand Down Expand Up @@ -248,6 +247,11 @@ SPDX-License-Identifier: Apache-2.0
<artifactId>parquet-column</artifactId>
<version>1.12.2</version>
</dependency>
<dependency>
<groupId>org.furyio</groupId>
<artifactId>fury-core</artifactId>
<version>0.1.0</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
Expand Down Expand Up @@ -526,6 +530,9 @@ SPDX-License-Identifier: Apache-2.0
<include>com.fasterxml.jackson.module:jackson-module-scala_2.12</include>
<include>com.fasterxml.jackson.module:jackson-module-paranamer</include>
<include>com.thoughtworks.paranamer:paranamer</include>
<include>org.furyio:fury-core</include>
<include>com.google.guava:guava</include>
<include>com.google.guava:failureaccess</include>

<!-- casbin & aspectj -->
<include>org.casbin:jdbc-adapter</include>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.flink.lakesoul.tool.LakeSoulSinkOptions;
import org.apache.flink.lakesoul.types.BinaryDebeziumDeserializationSchema;
import org.apache.flink.lakesoul.types.BinarySourceRecord;
import org.apache.flink.lakesoul.types.BinarySourceRecordSerializer;
import org.apache.flink.lakesoul.types.LakeSoulRecordConvert;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand Down Expand Up @@ -48,17 +49,17 @@ public static void main(String[] args) throws Exception {
int sourceParallelism = parameter.getInt(SOURCE_PARALLELISM.key());
int bucketParallelism = parameter.getInt(BUCKET_PARALLELISM.key());
int checkpointInterval = parameter.getInt(JOB_CHECKPOINT_INTERVAL.key(),
JOB_CHECKPOINT_INTERVAL.defaultValue()); //mill second
JOB_CHECKPOINT_INTERVAL.defaultValue()); //mill second

MysqlDBManager mysqlDBManager = new MysqlDBManager(dbName,
userName,
passWord,
host,
Integer.toString(port),
new HashSet<>(),
databasePrefixPath,
bucketParallelism,
true);
userName,
passWord,
host,
Integer.toString(port),
new HashSet<>(),
databasePrefixPath,
bucketParallelism,
true);

mysqlDBManager.importOrSyncLakeSoulNamespace(dbName);

Expand All @@ -82,6 +83,7 @@ public static void main(String[] args) throws Exception {
conf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.getConfig().registerTypeWithKryoSerializer(BinarySourceRecord.class, BinarySourceRecordSerializer.class);

ParameterTool pt = ParameterTool.fromMap(conf.toMap());
env.getConfig().setGlobalJobParameters(pt);
Expand All @@ -95,7 +97,8 @@ public static void main(String[] args) throws Exception {
}
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
env.getCheckpointConfig().setCheckpointingMode(checkpointingMode);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig()
.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

env.getCheckpointConfig().setCheckpointStorage(parameter.get(FLINK_CHECKPOINT.key()));
env.setRestartStrategy(RestartStrategies.failureRateRestart(
Expand All @@ -105,18 +108,18 @@ public static void main(String[] args) throws Exception {
));

MySqlSourceBuilder<BinarySourceRecord> sourceBuilder = MySqlSource.<BinarySourceRecord>builder()
.hostname(host)
.port(port)
.databaseList(dbName) // set captured database
.tableList(dbName + ".*") // set captured table
.serverTimeZone(serverTimezone) // default -- Asia/Shanghai
.username(userName)
.password(passWord);

sourceBuilder.includeSchemaChanges(true);
sourceBuilder.scanNewlyAddedTableEnabled(true);
.hostname(host)
.port(port)
.databaseList(dbName) // set captured database
.tableList(dbName + ".*") // set captured table
.serverTimeZone(serverTimezone) // default -- Asia/Shanghai
.scanNewlyAddedTableEnabled(true)
.username(userName)
.password(passWord);

LakeSoulRecordConvert lakeSoulRecordConvert = new LakeSoulRecordConvert(conf, conf.getString(SERVER_TIME_ZONE));
sourceBuilder.deserializer(new BinaryDebeziumDeserializationSchema(lakeSoulRecordConvert, conf.getString(WAREHOUSE_PATH)));
sourceBuilder.deserializer(new BinaryDebeziumDeserializationSchema(lakeSoulRecordConvert,
conf.getString(WAREHOUSE_PATH)));
Properties jdbcProperties = new Properties();
jdbcProperties.put("allowPublicKeyRetrieval", "true");
jdbcProperties.put("useSSL", "false");
Expand All @@ -126,7 +129,9 @@ public static void main(String[] args) throws Exception {
LakeSoulMultiTableSinkStreamBuilder.Context context = new LakeSoulMultiTableSinkStreamBuilder.Context();
context.env = env;
context.conf = conf;
LakeSoulMultiTableSinkStreamBuilder builder = new LakeSoulMultiTableSinkStreamBuilder(mySqlSource, context, lakeSoulRecordConvert);
LakeSoulMultiTableSinkStreamBuilder
builder =
new LakeSoulMultiTableSinkStreamBuilder(mySqlSource, context, lakeSoulRecordConvert);
DataStreamSource<BinarySourceRecord> source = builder.buildMultiTableSource("MySQL Source");

DataStream<BinarySourceRecord> stream = builder.buildHashPartitionedCDCStream(source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,11 @@ public LakeSoulMultiTablesSink<IN> build() {
}

@Override
public abstract AbstractLakeSoulMultiTableSinkWriter<IN> createWriter(Sink.InitContext context, int subTaskId);
public abstract AbstractLakeSoulMultiTableSinkWriter<IN> createWriter(Sink.InitContext context, int subTaskId) throws IOException;

@Override
public LakeSoulSinkCommitter createCommitter() throws IOException {
return null;
// return new LakeSoulSinkCommitter();
}

@Override
Expand Down
Loading

0 comments on commit c3271e1

Please sign in to comment.