[FLINK-20999][docs] Adds usage examples to the Kafka Avro Confluent connector format

This closes apache#14764
sv3ndk committed Jan 28, 2021
1 parent a4f1174 commit 2596c12
Showing 2 changed files with 233 additions and 26 deletions.
133 changes: 118 additions & 15 deletions docs/dev/table/connectors/formats/avro-confluent.md
@@ -35,7 +35,7 @@ When reading (deserializing) a record with this format the Avro writer schema is

When writing (serializing) a record with this format the Avro schema is inferred from the table schema and used to retrieve a schema id to be encoded with the data. The lookup is performed in the configured Confluent Schema Registry under the [subject](https://docs.confluent.io/current/schema-registry/index.html#schemas-subjects-and-topics) given in `avro-confluent.schema-registry.subject`.

The Avro Schema Registry format can only be used in conjunction with [Apache Kafka SQL connector]({% link dev/table/connectors/kafka.md %}).
The Avro Schema Registry format can only be used in conjunction with the [Apache Kafka SQL connector]({% link dev/table/connectors/kafka.md %}) or the [Upsert Kafka SQL Connector]({% link dev/table/connectors/upsert-kafka.md %}).

Dependencies
------------
@@ -45,29 +45,132 @@ Dependencies
connector=connector
%}

How to create a table with Avro-Confluent format
----------------

Here is an example of how to create a table using the Kafka connector and the Confluent Avro format.
How to create tables with Avro-Confluent format
--------------

<div class="codetabs" markdown="1">
<div data-lang="SQL" markdown="1">

Example of a table using a raw UTF-8 string as the Kafka key and Avro records registered in the Schema Registry as the Kafka values:

{% highlight sql %}
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
CREATE TABLE user_created (

-- one column mapped to the Kafka raw UTF-8 key
the_kafka_key STRING,

-- a few columns mapped to the Avro fields of the Kafka value
id STRING,
name STRING,
email STRING

) WITH (

'connector' = 'kafka',
'topic' = 'user_events_example1',
'properties.bootstrap.servers' = 'localhost:9092',
'topic' = 'user_behavior',
'format' = 'avro-confluent',
'avro-confluent.schema-registry.url' = 'http://localhost:8081',
'avro-confluent.schema-registry.subject' = 'user_behavior'

-- UTF-8 string as the Kafka key, using the 'the_kafka_key' table column
'key.format' = 'raw',
'key.fields' = 'the_kafka_key',

'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY'
)
{% endhighlight %}

We can write data into the Kafka table as follows:

{% highlight sql %}
INSERT INTO user_created
SELECT
-- replicating the user id into a column mapped to the Kafka key
id AS the_kafka_key,

-- all values
id, name, email
FROM some_table
{% endhighlight %}
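
Reading back from the table needs nothing format-specific; a minimal sketch of a query over it (the column list is just illustrative):

{% highlight sql %}
SELECT the_kafka_key, id, name, email
FROM user_created
{% endhighlight %}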

---

Example of a table with both the Kafka key and value registered as Avro records in the Schema Registry:

{% highlight sql %}
CREATE TABLE user_created (

-- one column mapped to the 'id' Avro field of the Kafka key
kafka_key_id STRING,

-- a few columns mapped to the Avro fields of the Kafka value
id STRING,
name STRING,
email STRING

) WITH (

'connector' = 'kafka',
'topic' = 'user_events_example2',
'properties.bootstrap.servers' = 'localhost:9092',

-- Watch out: schema evolution in the context of a Kafka key is almost never backward or
-- forward compatible due to hash partitioning.
'key.format' = 'avro-confluent',
'key.avro-confluent.schema-registry.url' = 'http://localhost:8082',
'key.fields' = 'kafka_key_id',

-- In this example, we want the Avro types of both the Kafka key and value to contain the field 'id'
-- => adding a prefix to the table column associated with the Kafka key field avoids clashes
'key.fields-prefix' = 'kafka_key_',

'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY',

-- subjects have a default value since Flink 1.13, though they can be overridden:
'key.avro-confluent.schema-registry.subject' = 'user_events_example2-key2',
'value.avro-confluent.schema-registry.subject' = 'user_events_example2-value2'
)
{% endhighlight %}
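
Writing works the same way as in the first example; a minimal sketch, reusing the hypothetical `some_table` source from above, with the user id feeding the Avro key record as well as the value:

{% highlight sql %}
INSERT INTO user_created
SELECT
-- the user id is mapped to the 'id' field of the Avro Kafka key
id AS kafka_key_id,
-- all value fields
id, name, email
FROM some_table
{% endhighlight %}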

---

Example of a table using the upsert-kafka connector with the Kafka value registered as an Avro record in the Schema Registry:

{% highlight sql %}
CREATE TABLE user_created (

-- one column mapped to the Kafka raw UTF-8 key
kafka_key_id STRING,

-- a few columns mapped to the Avro fields of the Kafka value
id STRING,
name STRING,
email STRING,

-- upsert-kafka connector requires a primary key to define the upsert behavior
PRIMARY KEY (kafka_key_id) NOT ENFORCED

) WITH (

'connector' = 'upsert-kafka',
'topic' = 'user_events_example3',
'properties.bootstrap.servers' = 'localhost:9092',

-- UTF-8 string as the Kafka key
-- We don't specify 'key.fields' in this case since it's dictated by the primary key of the table
'key.format' = 'raw',

-- In this example, we want both the Kafka key and the Avro value to contain the field 'id'
-- => adding a prefix to the table column associated with the Kafka key field avoids clashes
'key.fields-prefix' = 'kafka_key_',

'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY'
)
{% endhighlight %}
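
Writing is again a plain INSERT; a minimal sketch, once more assuming the hypothetical `some_table` source. Since this is the upsert connector, a new row overwrites any earlier row with the same primary key:

{% highlight sql %}
INSERT INTO user_created
SELECT
-- the primary key column becomes the raw UTF-8 Kafka key
id AS kafka_key_id,
id, name, email
FROM some_table
{% endhighlight %}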

</div>
</div>

126 changes: 115 additions & 11 deletions docs/dev/table/connectors/formats/avro-confluent.zh.md
@@ -35,7 +35,7 @@ The Avro Schema Registry (``avro-confluent``) format lets you read records that were serialized by ``io.confluen

When writing (serializing) a record with this format, the Avro schema is inferred from the table schema and used to retrieve the schema id to be encoded with the data. The schema id is looked up under the [subject](https://docs.confluent.io/current/schema-registry/index.html#schemas-subjects-and-topics) configured in the Confluent Schema Registry. The subject is specified via the `avro-confluent.schema-registry.subject` parameter.

The Avro Schema Registry format can only be used in conjunction with the [Apache Kafka SQL connector]({% link dev/table/connectors/kafka.zh.md %}).
The Avro Schema Registry format can only be used in conjunction with the [Apache Kafka SQL connector]({% link dev/table/connectors/kafka.zh.md %}) or the [Upsert Kafka SQL Connector]({% link dev/table/connectors/upsert-kafka.zh.md %}).

Dependencies
------------
@@ -52,20 +52,124 @@ The Avro Schema Registry format can only be used in conjunction with the [Apache Kafka SQL connector]({% link dev/tabl

<div class="codetabs" markdown="1">
<div data-lang="SQL" markdown="1">

Example of a table using a raw UTF-8 string as the Kafka key and Avro records registered in the Schema Registry as the Kafka values:

{% highlight sql %}
CREATE TABLE user_created (

-- one column mapped to the Kafka raw UTF-8 key
the_kafka_key STRING,

-- a few columns mapped to the Avro fields of the Kafka value
id STRING,
name STRING,
email STRING

) WITH (

'connector' = 'kafka',
'topic' = 'user_events_example1',
'properties.bootstrap.servers' = 'localhost:9092',

-- UTF-8 string as the Kafka key, using the 'the_kafka_key' table column
'key.format' = 'raw',
'key.fields' = 'the_kafka_key',

'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY'
)
{% endhighlight %}

We can write data into the Kafka table as follows:

{% highlight sql %}
INSERT INTO user_created
SELECT
-- replicating the user id into a column mapped to the Kafka key
id AS the_kafka_key,

-- all values
id, name, email
FROM some_table
{% endhighlight %}
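
Reading back from the table needs nothing format-specific; a minimal sketch of a query over it (the column list is just illustrative):

{% highlight sql %}
SELECT the_kafka_key, id, name, email
FROM user_created
{% endhighlight %}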

---

Example of a table with both the Kafka key and value registered as Avro records in the Schema Registry:

{% highlight sql %}
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
CREATE TABLE user_created (

-- one column mapped to the 'id' Avro field of the Kafka key
kafka_key_id STRING,

-- a few columns mapped to the Avro fields of the Kafka value
id STRING,
name STRING,
email STRING

) WITH (

'connector' = 'kafka',
'topic' = 'user_events_example2',
'properties.bootstrap.servers' = 'localhost:9092',
'topic' = 'user_behavior'
'format' = 'avro-confluent',
'avro-confluent.schema-registry.url' = 'http://localhost:8081',
'avro-confluent.schema-registry.subject' = 'user_behavior'

-- Watch out: schema evolution in the context of a Kafka key is almost never backward or
-- forward compatible due to hash partitioning.
'key.format' = 'avro-confluent',
'key.avro-confluent.schema-registry.url' = 'http://localhost:8082',
'key.fields' = 'kafka_key_id',

-- In this example, we want the Avro types of both the Kafka key and value to contain the field 'id'
-- => adding a prefix to the table column associated with the Kafka key field avoids clashes
'key.fields-prefix' = 'kafka_key_',

'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY',

-- subjects have a default value since Flink 1.13, though they can be overridden:
'key.avro-confluent.schema-registry.subject' = 'user_events_example2-key2',
'value.avro-confluent.schema-registry.subject' = 'user_events_example2-value2'
)
{% endhighlight %}
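
Writing works the same way as in the first example; a minimal sketch, reusing the hypothetical `some_table` source from above, with the user id feeding the Avro key record as well as the value:

{% highlight sql %}
INSERT INTO user_created
SELECT
-- the user id is mapped to the 'id' field of the Avro Kafka key
id AS kafka_key_id,
-- all value fields
id, name, email
FROM some_table
{% endhighlight %}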

---

Example of a table using the upsert-kafka connector with the Kafka value registered as an Avro record in the Schema Registry:

{% highlight sql %}
CREATE TABLE user_created (

-- one column mapped to the Kafka raw UTF-8 key
kafka_key_id STRING,

-- a few columns mapped to the Avro fields of the Kafka value
id STRING,
name STRING,
email STRING,

-- upsert-kafka connector requires a primary key to define the upsert behavior
PRIMARY KEY (kafka_key_id) NOT ENFORCED

) WITH (

'connector' = 'upsert-kafka',
'topic' = 'user_events_example3',
'properties.bootstrap.servers' = 'localhost:9092',

-- UTF-8 string as the Kafka key
-- We don't specify 'key.fields' in this case since it's dictated by the primary key of the table
'key.format' = 'raw',

-- In this example, we want both the Kafka key and the Avro value to contain the field 'id'
-- => adding a prefix to the table column associated with the Kafka key field avoids clashes
'key.fields-prefix' = 'kafka_key_',

'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://localhost:8082',
'value.fields-include' = 'EXCEPT_KEY'
)
{% endhighlight %}
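
Writing is again a plain INSERT; a minimal sketch, once more assuming the hypothetical `some_table` source. Since this is the upsert connector, a new row overwrites any earlier row with the same primary key:

{% highlight sql %}
INSERT INTO user_created
SELECT
-- the primary key column becomes the raw UTF-8 Kafka key
id AS kafka_key_id,
id, name, email
FROM some_table
{% endhighlight %}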
</div>
