Skip to content

Commit

Permalink
use max byte size and not max msg length in size check (apache#27017)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-lemos authored and scwhittle committed Jun 8, 2023
1 parent 2fd0412 commit 05120c6
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1410,7 +1410,7 @@ public void startBundle(StartBundleContext c) throws IOException {
public void processElement(@Element PubsubMessage message, @Timestamp Instant timestamp)
throws IOException, SizeLimitExceededException {
// Validate again here just as a sanity check.
PreparePubsubWriteDoFn.validatePubsubMessageSize(message, maxPublishBatchSize);
PreparePubsubWriteDoFn.validatePubsubMessageSize(message, maxPublishBatchByteSize);
byte[] payload = message.getPayload();
int messageSize = payload.length;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.AvroSchema;
Expand Down Expand Up @@ -742,4 +743,39 @@ public void testDynamicTopics(boolean isBounded) throws IOException {
pipeline.run();
}
}

@Test
public void testBigMessageBounded() throws IOException {
String bigMsg =
IntStream.range(0, 100_000).mapToObj(_unused -> "x").collect(Collectors.joining(""));

OutgoingMessage msg =
OutgoingMessage.of(
com.google.pubsub.v1.PubsubMessage.newBuilder()
.setData(ByteString.copyFromUtf8(bigMsg))
.build(),
0,
null,
"projects/project/topics/topic1");

try (PubsubTestClientFactory factory =
PubsubTestClient.createFactoryForPublish(null, ImmutableList.of(msg), ImmutableList.of())) {
TimestampedValue<PubsubMessage> pubsubMsg =
TimestampedValue.of(
new PubsubMessage(
msg.getMessage().getData().toByteArray(),
Collections.emptyMap(),
msg.recordId())
.withTopic(msg.topic()),
Instant.ofEpochMilli(msg.getTimestampMsSinceEpoch()));

PCollection<PubsubMessage> messages =
pipeline.apply(
Create.timestamped(ImmutableList.of(pubsubMsg))
.withCoder(new PubsubMessageWithTopicCoder()));
messages.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
messages.apply(PubsubIO.writeMessagesDynamic().withClientFactory(factory));
pipeline.run();
}
}
}

0 comments on commit 05120c6

Please sign in to comment.