Skip to content

Commit

Permalink
[FLINK-24905][docs] Updating documentation for Kinesis table API conn…
Browse files Browse the repository at this point in the history
…ector
  • Loading branch information
vahmed-hamdy authored and dannycranmer committed Jan 24, 2022
1 parent 64787a6 commit bdfabeb
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 13 deletions.
96 changes: 89 additions & 7 deletions docs/content.zh/docs/connectors/table/kinesis.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ Connector Options
<td>String</td>
<td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set). Either this or <code>aws.region</code> are required.</td>
</tr>
<tr>
<td><h5>aws.trust.all.certificates</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true accepts all SSL certificates.</td>
</tr>
</tbody>
<thead>
<tr>
Expand Down Expand Up @@ -629,12 +636,71 @@ Connector Options
<td style="word-wrap: break-word;">(none)</td>
<td></td>
<td>
Sink options for the <code>KinesisProducer</code>.
Suffix names must match the <a href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html">KinesisProducerConfiguration</a> setters in lower-case hyphenated style (for example, <code>sink.producer.collection-max-count</code> or <code>sink.producer.aggregation-max-count</code>).
The transformed action keys are passed to the <code>sink.producer.*</code> to <a href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#fromProperties-java.util.Properties-">KinesisProducerConfigurations#fromProperties</a>.
Note that some of the defaults are overwritten by <code>KinesisConfigUtil</code>.
Deprecated options previously used by the legacy connector.
Options with equivalant alternatives in <code>KinesisDataStreamsSink</code> are matched
to their respective properties. Unsupported options are logged out to user as warnings.
</td>
</tr>
<tr>
<td><h5>sink.http-client.max-concurrency</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">10000</td>
<td>Integer</td>
<td>
Maximum number of allowed concurrent requests by <code>KinesisAsyncClient</code>.
</td>
</tr>
<tr>
<td><h5>sink.http-client.read-timeout</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">360000</td>
<td>Integer</td>
<td>
Maximum amount of time in ms for requests to be sent by <code>KinesisAsyncClient</code>.
</td>
</tr>
<tr>
<td><h5>sink.http-client.protocol.version</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">HTTP2</td>
<td>String</td>
<td>Http version used by Kinesis Client.</td>
</tr>
<tr>
<td><h5>sink.batch.max-size</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">500</td>
<td>Integer</td>
<td>Maximum batch size of elements to be passed to <code>KinesisAsyncClient</code> to be written downstream.</td>
</tr>
<tr>
<td><h5>sink.requests.max-inflight</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">16</td>
<td>Integer</td>
<td>Request threshold for uncompleted requests by <code>KinesisAsyncClient</code>before blocking new write requests and applying backpressure.</td>
</tr>
<tr>
<td><h5>sink.requests.max-buffered</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">10000</td>
<td>String</td>
<td>Request buffer threshold for buffered requests by <code>KinesisAsyncClient</code> before blocking new write requests and applying backpressure.</td>
</tr>
<tr>
<td><h5>sink.flush-buffer.size</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">5242880</td>
<td>Long</td>
<td>Threshold value in bytes for writer buffer in <code>KinesisAsyncClient</code> before flushing.</td>
</tr>
<tr>
<td><h5>sink.flush-buffer.timeout</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">5000</td>
<td>Long</td>
<td>Threshold time in milliseconds for an element to be in a buffer of<code>KinesisAsyncClient</code> before flushing.</td>
</tr>
</tbody>
</table>

Expand Down Expand Up @@ -722,7 +788,7 @@ You can enable and configure EFO with the following properties:
to invoke [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html).
Stream consumer ARNs should be provided to the job via the consumer configuration.
* `scan.stream.efo.consumerarn.<stream-name>`: ARNs identifying externally registered ARN-consumers (substitute `<stream-name>` with the name of your stream in the parameter name).
Use this if you choose to use `NONE` as a `scan.stream.efo.registration` strategy.
Use this if you choose to use `NONE` as a `scan.stream.efo.registration` strategy.

<span class="label label-info">Note</span> For a given Kinesis data stream, each EFO consumer must have a unique name.
However, consumer names do not have to be unique across data streams.
Expand All @@ -733,12 +799,28 @@ In the event that a job terminates within executing the shutdown hooks, stream c
In this situation the stream consumers will be gracefully reused when the application restarts.
With the `NONE` and `EAGER` strategies, stream consumer de-registration is not performed by `FlinkKinesisConsumer`.

Data Type Mapping
----------------
# Data Type Mapping


Kinesis stores records as Base64-encoded binary data objects, so it doesn't have a notion of internal record structure.
Instead, Kinesis records are deserialized and serialized by formats, e.g. 'avro', 'csv', or 'json'.
To determine the data type of the messages in your Kinesis-backed tables, pick a suitable Flink format with the `format` keyword.
Please refer to the [Formats]({{< ref "docs/connectors/table/formats/overview" >}}) pages for more details.

# Updates in 1.15

Kinesis table API connector sink data stream depends on <code>FlinkKinesisProducer</code> till 1.14, with the introduction of <code>KinesisDataStreamsSink</code> in 1.15 kinesis table API sink connector has been migrated to the new <code>KinesisDataStreamsSink</code>. Authentication options have been migrated identically while sink configuration options are now compatible with <code>KinesisDataStreamsSink</code>.

Options configuring <code>FlinkKinesisProducer</code> are now deprecated with fallback support for common configuration options with <code>KinesisDataStreamsSink</code>.

<code>KinesisDataStreamsSink</code> uses <code>KinesisAsyncClient</code> to send records to kinesis,
which doesn't support aggregation. In consequence, table options configuring aggregation in the deprecated <code>FlinkKinesisProducer</code>
are now deprecated and will be ignored, this includes <code>sink.producer.aggregation-enabled</code> and
<code>sink.producer.aggregation-count</code>.

<span class="label label-info">Note</span> Migrating applications with deprecated options will result in the incompatible deprecated options being ignored and warned to users.

Kinesis table API source connector still depends on <code>FlinkKinesisConsumer</code> with no change in configuration options.


{{< top >}}
94 changes: 88 additions & 6 deletions docs/content/docs/connectors/table/kinesis.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ Connector Options
<td>String</td>
<td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set). Either this or <code>aws.region</code> are required.</td>
</tr>
<tr>
<td><h5>aws.trust.all.certificates</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true accepts all SSL certificates.</td>
</tr>
</tbody>
<thead>
<tr>
Expand Down Expand Up @@ -629,12 +636,71 @@ Connector Options
<td style="word-wrap: break-word;">(none)</td>
<td></td>
<td>
Sink options for the <code>KinesisProducer</code>.
Suffix names must match the <a href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html">KinesisProducerConfiguration</a> setters in lower-case hyphenated style (for example, <code>sink.producer.collection-max-count</code> or <code>sink.producer.aggregation-max-count</code>).
The transformed action keys are passed to the <code>sink.producer.*</code> to <a href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#fromProperties-java.util.Properties-">KinesisProducerConfigurations#fromProperties</a>.
Note that some of the defaults are overwritten by <code>KinesisConfigUtil</code>.
Deprecated options previously used by the legacy connector.
Options with equivalant alternatives in <code>KinesisDataStreamsSink</code> are matched
to their respective properties. Unsupported options are logged out to user as warnings.
</td>
</tr>
<tr>
<td><h5>sink.http-client.max-concurrency</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">10000</td>
<td>Integer</td>
<td>
Maximum number of allowed concurrent requests by <code>KinesisAsyncClient</code>.
</td>
</tr>
<tr>
<td><h5>sink.http-client.read-timeout</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">360000</td>
<td>Integer</td>
<td>
Maximum amount of time in ms for requests to be sent by <code>KinesisAsyncClient</code>.
</td>
</tr>
<tr>
<td><h5>sink.http-client.protocol.version</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">HTTP2</td>
<td>String</td>
<td>Http version used by Kinesis Client.</td>
</tr>
<tr>
<td><h5>sink.batch.max-size</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">500</td>
<td>Integer</td>
<td>Maximum batch size of elements to be passed to <code>KinesisAsyncClient</code> to be written downstream.</td>
</tr>
<tr>
<td><h5>sink.requests.max-inflight</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">16</td>
<td>Integer</td>
<td>Request threshold for uncompleted requests by <code>KinesisAsyncClient</code>before blocking new write requests and applying backpressure.</td>
</tr>
<tr>
<td><h5>sink.requests.max-buffered</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">10000</td>
<td>String</td>
<td>Request buffer threshold for buffered requests by <code>KinesisAsyncClient</code> before blocking new write requests and applying backpressure.</td>
</tr>
<tr>
<td><h5>sink.flush-buffer.size</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">5242880</td>
<td>Long</td>
<td>Threshold value in bytes for writer buffer in <code>KinesisAsyncClient</code> before flushing.</td>
</tr>
<tr>
<td><h5>sink.flush-buffer.timeout</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">5000</td>
<td>Long</td>
<td>Threshold time in milliseconds for an element to be in a buffer of<code>KinesisAsyncClient</code> before flushing.</td>
</tr>
</tbody>
</table>

Expand Down Expand Up @@ -733,12 +799,28 @@ In the event that a job terminates within executing the shutdown hooks, stream c
In this situation the stream consumers will be gracefully reused when the application restarts.
With the `NONE` and `EAGER` strategies, stream consumer de-registration is not performed by `FlinkKinesisConsumer`.

Data Type Mapping
----------------
# Data Type Mapping


Kinesis stores records as Base64-encoded binary data objects, so it doesn't have a notion of internal record structure.
Instead, Kinesis records are deserialized and serialized by formats, e.g. 'avro', 'csv', or 'json'.
To determine the data type of the messages in your Kinesis-backed tables, pick a suitable Flink format with the `format` keyword.
Please refer to the [Formats]({{< ref "docs/connectors/table/formats/overview" >}}) pages for more details.

# Updates in 1.15

Kinesis table API connector sink data stream depends on <code>FlinkKinesisProducer</code> till 1.14, with the introduction of <code>KinesisDataStreamsSink</code> in 1.15 kinesis table API sink connector has been migrated to the new <code>KinesisDataStreamsSink</code>. Authentication options have been migrated identically while sink configuration options are now compatible with <code>KinesisDataStreamsSink</code>.

Options configuring <code>FlinkKinesisProducer</code> are now deprecated with fallback support for common configuration options with <code>KinesisDataStreamsSink</code>.

<code>KinesisDataStreamsSink</code> uses <code>KinesisAsyncClient</code> to send records to kinesis,
which doesn't support aggregation. In consequence, table options configuring aggregation in the deprecated <code>FlinkKinesisProducer</code>
are now deprecated and will be ignored, this includes <code>sink.producer.aggregation-enabled</code> and
<code>sink.producer.aggregation-count</code>.

<span class="label label-info">Note</span> Migrating applications with deprecated options will result in the incompatible deprecated options being ignored and warned to users.

Kinesis table API source connector still depends on <code>FlinkKinesisConsumer</code> with no change in configuration options.


{{< top >}}

0 comments on commit bdfabeb

Please sign in to comment.