Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SolaceIO write connector #32060

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use static imports for Preconditions
  • Loading branch information
iht committed Sep 13, 2024
commit 8443c4ddeaf128fa2b5c859daa14815955acf781
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.solace;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

Expand Down Expand Up @@ -62,7 +63,6 @@
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
Expand Down Expand Up @@ -1203,32 +1203,32 @@ private static Window<Solace.PublishResult> captureWindowDetails(
*/
private void validateWriteTransform(boolean usingSolaceRecords) {
iht marked this conversation as resolved.
Show resolved Hide resolved
if (!usingSolaceRecords) {
Preconditions.checkArgument(
getFormatFunction() != null,
checkNotNull(
getFormatFunction(),
"SolaceIO.Write: If you are not using Solace.Record as the input type, you"
+ " must set a format function using withFormatFunction().");
}

Preconditions.checkArgument(
checkArgument(
getMaxNumOfUsedWorkers() > 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, maxNumOfUsedWorkers and numberOfClientsPerWorker currently have a constant default, or supplied with a constant parameter. Is there a concern regarding the scalability and flexibility to scale?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the concern is related to Solace quotas. They are normally much more limited than the average number of threads and workers in a pipeline (in the order of hundreds or thousands, when quotas are much lower).

"SolaceIO.Write: The number of used workers must be positive.");
Preconditions.checkArgument(
checkArgument(
getNumberOfClientsPerWorker() > 0,
"SolaceIO.Write: The number of clients per worker must be positive.");
Preconditions.checkArgument(
checkArgument(
getDeliveryMode() == DeliveryMode.DIRECT || getDeliveryMode() == DeliveryMode.PERSISTENT,
String.format(
"SolaceIO.Write: Delivery mode must be either DIRECT or PERSISTENT. %s"
+ " not supported",
getDeliveryMode()));
if (getPublishLatencyMetrics()) {
Preconditions.checkArgument(
checkArgument(
getDeliveryMode() == DeliveryMode.PERSISTENT,
"SolaceIO.Write: Publish latency metrics can only be enabled for PERSISTENT"
+ " delivery mode.");
}
Preconditions.checkArgument(
getSessionServiceFactory() != null,
checkNotNull(
getSessionServiceFactory(),
"SolaceIO: You need to pass a session service factory. For basic"
+ " authentication, you can use BasicAuthJcsmpSessionServiceFactory.");
}
Expand Down