Skip to content
/ beam Public
forked from apache/beam

Commit

Permalink
Update KafkaIO's docstring to match current implementation (apache#31496
Browse files Browse the repository at this point in the history
)
  • Loading branch information
bzablocki committed Jun 14, 2024
1 parent 47736c3 commit e5a5ea9
Showing 1 changed file with 26 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -395,11 +395,19 @@
*
* <pre>{@code
* pipeline
* .apply(Create.of(KafkaSourceDescriptor.of(new TopicPartition("topic", 1)))
* .apply(KafkaIO.readAll()
* .withBootstrapServers("broker_1:9092,broker_2:9092")
* .withKeyDeserializer(LongDeserializer.class).
* .withValueDeserializer(StringDeserializer.class));
* .apply(Create.of(
* KafkaSourceDescriptor.of(
* new TopicPartition("topic", 1),
* null,
* null,
* null,
* null,
* null)))
* .apply(
* KafkaIO.<Long, String>readSourceDescriptors()
* .withBootstrapServers("broker_1:9092,broker_2:9092")
* .withKeyDeserializer(LongDeserializer.class)
* .withValueDeserializer(StringDeserializer.class));
* }</pre>
*
* Note that the {@code bootstrapServers} can also be populated from the {@link
Expand All @@ -412,8 +420,10 @@
* new TopicPartition("topic", 1),
* null,
* null,
* ImmutableList.of("broker_1:9092", "broker_2:9092"))
* .apply(KafkaIO.readAll()
* null,
* null,
* ImmutableList.of("broker_1:9092", "broker_2:9092"))))
* .apply(KafkaIO.<Long, String>readSourceDescriptors()
* .withKeyDeserializer(LongDeserializer.class).
* .withValueDeserializer(StringDeserializer.class));
* }</pre>
Expand Down Expand Up @@ -441,17 +451,19 @@
*
* <pre>{@code
* pipeline
* .apply(Create.of(
* .apply(Create.of(
* KafkaSourceDescriptor.of(
* new TopicPartition("topic", 1),
* null,
* null,
* ImmutableList.of("broker_1:9092", "broker_2:9092"))
* .apply(KafkaIO.readAll()
* .withKeyDeserializer(LongDeserializer.class).
* .withValueDeserializer(StringDeserializer.class)
* .withProcessingTime()
* .commitOffsets());
* null,
* null,
* ImmutableList.of("broker_1:9092", "broker_2:9092"))))
* .apply(KafkaIO.<Long, String>readSourceDescriptors()
* .withKeyDeserializer(LongDeserializer.class).
* .withValueDeserializer(StringDeserializer.class)
* .withProcessingTime()
* .commitOffsets());
* }</pre>
*
* <h3>Writing to Kafka</h3>
Expand Down

0 comments on commit e5a5ea9

Please sign in to comment.