
SolaceIO write connector #32060

Open

iht wants to merge 19 commits into master from solace_writer_finalpr
Conversation

@iht (Contributor) commented Aug 2, 2024

This is a follow-up PR to #31953 and part of issue #31905.

This PR adds the actual writer functionality, and some additional testing, including integration testing.

This should be the final PR needed for the SolaceIO write connector to be complete.

This PR fixes #31905.

@iht force-pushed the solace_writer_finalpr branch 3 times, most recently from 72ad9ff to cdabf33, on August 27, 2024 at 17:07
@iht marked this pull request as ready for review on August 27, 2024 at 17:36
github-actions bot commented:
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@iht (Contributor, Author) commented Aug 28, 2024

assign set of reviewers

github-actions bot commented:
Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @m-trieu for label java.
R: @shunping for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

github-actions bot commented Sep 4, 2024

Reminder, please take a look at this pr: @m-trieu @shunping

github-actions bot commented Sep 6, 2024

Assigning new set of reviewers because the PR has gone too long without review. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn for label java.
R: @Abacn for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

@Abacn (Contributor) left a comment:
Thanks, I had a first pass and left a couple of comments, mostly about:

  • the dead letter queue pattern
  • possibly excessive dependency usage
  • minor package/class structure adjustments

getNumberOfClientsPerWorker(),
getPublishLatencyMetrics()));

return writer.withOutputTags(FAILED_PUBLISH_TAG, TupleTagList.of(SUCCESSFUL_PUBLISH_TAG));
Contributor:
Using PCollectionTags for failed/succeeded elements is no longer a recommended pattern. Consider using a DLQ instead (if possible). Ref: https://docs.google.com/document/d/1NGeCk6tOqF-TiGEAV7ixd_vhIiWz9sHPlCa1P_77Ajs/edit#heading=h.a5fbpllgc16m, and a couple of IOs have already implemented it.

From the user end, invocation will look like https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.html
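For illustration, a rough sketch of what that user-facing invocation could look like. The withErrorHandler(...) option on the Solace write and the DLQ sink transform are hypothetical here; registerBadRecordErrorHandler is the API from the linked Javadoc:

```java
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.BadRecordErrorHandler;

// Sketch only: `writeBadRecordsToDlq` stands in for any PTransform<PCollection<BadRecord>, ...>
// sink, and withErrorHandler(...) is a hypothetical option SolaceIO.Write would need to expose.
try (BadRecordErrorHandler<?> badRecordHandler =
    pipeline.registerBadRecordErrorHandler(writeBadRecordsToDlq)) {
  records.apply(
      "Write to Solace",
      SolaceIO.write()
          .to(topic) // rest of the connector configuration elided
          .withErrorHandler(badRecordHandler)); // hypothetical
}
```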

Contributor Author:
Ah, nice, I was not aware, let me work on those changes.

}

Preconditions.checkArgument(
getMaxNumOfUsedWorkers() > 0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, maxNumOfUsedWorkers and numberOfClientsPerWorker currently have constant defaults or are supplied as constant parameters. Is there a concern regarding scalability and the flexibility to scale?

Contributor Author:

Yes, the concern is related to Solace quotas. They are normally much lower than the typical number of threads and workers in a pipeline, which can be in the order of hundreds or thousands.

return 0;
};

retryCallableManager.retryCallable(publish, ImmutableSet.of(JCSMPException.class));
Contributor:

I took a look at RetryCallableManager; it is based on com.google.cloud.RetryHelper. Solace isn't a GCP product, but SolaceIO currently depends on various GCP artifacts for non-GCP usages (like here). We probably want to clean up these usages in the future. For example, retry management should preferably use FluentBackoff (in the Java SDK core).
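For context, a minimal sketch of that pattern using FluentBackoff from the Java SDK core; the method and the parameter values are illustrative, not taken from this PR:

```java
import java.io.IOException;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.joda.time.Duration;

// Sketch: retry the publish with exponential backoff, without com.google.cloud.RetryHelper.
void publishWithRetries() throws IOException, InterruptedException, JCSMPException {
  BackOff backoff =
      FluentBackoff.DEFAULT
          .withMaxRetries(4)
          .withInitialBackoff(Duration.standardSeconds(1))
          .backoff();
  while (true) {
    try {
      publishMessage(); // placeholder for the actual JCSMP send call
      return;
    } catch (JCSMPException e) {
      if (!BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
        throw e; // retries exhausted
      }
    }
  }
}
```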

Contributor:

There's also an existing dependency on Failsafe (https://github.com/failsafe-lib/failsafe) in Beam, which I'd probably recommend using.
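For comparison, a sketch of the same retry with Failsafe (assuming the Failsafe 3.x API; values are illustrative):

```java
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import java.time.Duration;

// Sketch: retry the JCSMP publish with Failsafe instead of RetryCallableManager.
RetryPolicy<Object> retryPolicy =
    RetryPolicy.builder()
        .handle(JCSMPException.class)
        .withBackoff(Duration.ofSeconds(1), Duration.ofSeconds(30))
        .withMaxRetries(4)
        .build();

Failsafe.with(retryPolicy).run(() -> producer.send(msg, destination));
```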

Contributor Author:

The retry library does not pull in any other Google Cloud dependency; it is a generic library that happens to be distributed by Google Cloud.

@sjvanrossum (Contributor) left a comment:

Partially reviewed, I'll give this another look tomorrow.

* connector.
*/
@Internal
public final class PublishResultsReceiver {
Contributor:

This is likely not working as you intended.
The singleton in this class receives the results for all publishers, meaning that any publish result may end up in the output PCollection of any SolaceIO.Write transform.
My guess is you'd want a PublishResultReceiver per instance of the transform.

Contributor Author:

You are right, this is actually a bug. To fix it, there should be a receiver per instance of the transform. I am working on fixing it.

Comment on lines 1089 to 1098
// Store the current window used by the input
PCollection<Solace.PublishResult> captureWindow =
records.apply(
"Capture window", ParDo.of(new UnboundedSolaceWriter.RecordToPublishResultDoFn()));

@SuppressWarnings("unchecked")
WindowingStrategy<Solace.PublishResult, BoundedWindow> windowingStrategy =
(WindowingStrategy<Solace.PublishResult, BoundedWindow>)
captureWindow.getWindowingStrategy();

Contributor:

I'm not sure if transplanting the windowing strategy is a valid operation. Given that most connectors do not preserve the input windowing strategy it might not be expected behavior either.

Contributor Author:

Some connectors do. "Transplanting" should work because what I do here is grab the settings of the input window and apply a window with the same settings to the output. I need to apply a global window internally in the connector to control parallelization; I thought it was nicer to "respect" the input window and reapply a window with the same settings to the output, instead of producing the output with the global window that is applied internally (which would leak an implementation detail to the user).
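For what it's worth, a minimal sketch of that reapplication, assuming the captured strategy is typed as in the snippet above; the identifiers results and inputStrategy are illustrative:

```java
// Sketch: reapply the captured input windowing settings to the writer output.
// `inputStrategy` is the WindowingStrategy<Solace.PublishResult, BoundedWindow> captured
// from the input; `results` is an illustrative name for the output PCollection.
Window<Solace.PublishResult> rewindow =
    Window.<Solace.PublishResult>into(inputStrategy.getWindowFn())
        .triggering(inputStrategy.getTrigger())
        .withAllowedLateness(inputStrategy.getAllowedLateness());
rewindow =
    inputStrategy.getMode() == WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES
        ? rewindow.accumulatingFiredPanes()
        : rewindow.discardingFiredPanes();
PCollection<Solace.PublishResult> rewindowed = results.apply("Rewindow", rewindow);
```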

Comment on lines 1102 to 1111
PCollection<KV<Integer, Solace.Record>> withShardKeys =
withGlobalWindow.apply(
"Add shard key",
ParDo.of(new UnboundedSolaceWriter.AddShardKeyDoFn(getMaxNumOfUsedWorkers())));

String label =
getWriterType() == WriterType.STREAMING ? "Publish (streaming)" : "Publish (batched)";

PCollectionTuple solaceOutput = withShardKeys.apply(label, getWriterTransform(destinationFn));

Contributor:

The sharding that's applied here couples Beam's internal parallelism to the number of clients you intend to use (1 per transform instance per worker). Using GroupIntoBatches.withShardedKey() instead makes more sense, since Solace mentions their producer is thread safe for some producer patterns.

On that topic, and feel free to correct me if I'm wrong, since this transform is producing session-dependent messages the producer may only be accessed by multiple threads for direct messages.
The transform switches the producer index on a per bundle basis, but it does not exclusively claim the producer, so multiple threads can access a producer regardless of the delivery mode.
To uncomplicate the thread safety conditions of the producer you could complicate access to the producer. By putting the producer on its own thread and exposing an intermediate channel for message passing (e.g. a concurrent queue) you can decouple Beam's concurrency from the number of producers (1 per distinct configuration). The producer thread polls the channel to receive sendable work from multiple producers and sets up callbacks per the message's configuration (callbacks should be per instance, see other comment).
This carries minor overhead for session-dependent direct messages, but simplifies the rest of the setup. You could optionally use a Guava cache to expire and close producers after a period of inactivity. My remaining question here is why isn't this using session-independent messages?

Back to GroupIntoBatches, it covers all of the state and timer stuff that's happening in the writer fn and applying it would simplify the writer a great deal. It looks like it might not even need state and timers at all.
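To make that concrete, a sketch of what the batching step could look like; the batch size and buffering duration are placeholder values, not a recommendation:

```java
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.util.ShardedKey;
import org.joda.time.Duration;

// Sketch: let GroupIntoBatches own the batching, sharding, and the state/timers machinery.
PCollection<KV<ShardedKey<Integer>, Iterable<Solace.Record>>> batches =
    withShardKeys.apply(
        "Batch records",
        GroupIntoBatches.<Integer, Solace.Record>ofSize(50)
            .withMaxBufferingDuration(Duration.standardSeconds(5))
            .withShardedKey());
```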

Contributor Author:

Yes, I think that GroupIntoBatches would be a more elegant solution here, and it would not need a custom state & timers implementation.

About session-independent messages, that's actually what I am using :). I think my implementation of the producer does exactly that. There is a concurrent map of producers, and each thread/worker polls for a producer to use for some work. The clients are closed automatically after inactivity (by the client libraries), and the producer map takes care of making sure the producer reconnects when needed.

Contributor:

I've confused the docs and the classes introduced by this connector. The docs state that session-dependent messages are created by calling create*Message on a Producer, and session-independent messages are created by calling create*Message on the JCSMPFactory. The messages produced by this connector are created by the MessageProducer class of this connector. It's calling out to JCSMPFactory, so I see that this connector is indeed creating session-independent messages.

Going back over my comment then, the bit about GroupIntoBatches is still relevant although likely best addressed in a follow-up PR. As an improvement to this PR I think we can still remove the state variables that limit the DoFn's parallelism since producer instances are thread-safe and manage the connection to Solace on a separate I/O thread, so Beam's internal parallelism should have no external effect as long as the producer is shared between instances of a unique transform. Did you by chance experiment with sharing a producer per unique transform configuration or per unique connection configuration instead of the fixed pool of 100 producers? If multiple producers improve throughput, then would it make sense to allow users to configure the size of the producer pool?
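(On the session-independent creation path mentioned above, a minimal sketch for reference; the payload variable is illustrative:)

```java
// Sketch: session-independent message creation via JCSMPFactory, as opposed to the
// session-dependent producer.create*Message() methods.
BytesXMLMessage msg = JCSMPFactory.onlyInstance().createMessage(BytesXMLMessage.class);
msg.writeAttachment(payloadBytes); // payloadBytes is illustrative
msg.setDeliveryMode(DeliveryMode.PERSISTENT);
```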

Contributor Author:

Agreed, let me finish the changes for this PR, and I will switch to GroupIntoBatches in an upcoming PR.

Comment on lines 1112 to 1129
SolaceOutput output;
if (getDeliveryMode() == DeliveryMode.PERSISTENT) {
PCollection<Solace.PublishResult> failedPublish = solaceOutput.get(FAILED_PUBLISH_TAG);
PCollection<Solace.PublishResult> successfulPublish =
solaceOutput.get(SUCCESSFUL_PUBLISH_TAG);
output =
rewindow(
SolaceOutput.in(input.getPipeline(), failedPublish, successfulPublish),
windowingStrategy);
} else {
LOG.info(
String.format(
"Solace.Write: omitting writer output because delivery mode is %s",
getDeliveryMode()));
output = SolaceOutput.in(input.getPipeline(), null, null);
}

return output;
Contributor:

Do both of these need to be the same output type? PublishResult contains a boolean for success or failure, metrics, and an error message. It seems to me like you would want to provide the user a PCollection<Solace.Record> for successes, a PCollection for failures, and a PCollection for metrics.
See the note about using the new DLQ pattern as well.

Contributor Author:

But that's not possible unless we accumulate the processed Solace.Record, then check all the responses, match each response to each incoming Solace.Record, and put the right record in the right place. I think that approach would be too heavyweight. The replies are received via "callbacks" (actually, polling by the client for responses, as implemented in the Solace client libraries) by any thread in any worker (potentially a worker different from the one that sent the original message). So doing this would require shuffling too.

The reply from Solace only contains some metadata (id, timestamp), and it is this info that the connector returns, which is a much simpler process.

Contributor:

My bad, I thought the record was propagated on the result. The writer should not be polling for any responses other than those for its own requests. Allowing any publish result to be emitted as output means the output could contain results from elements that belong to different bundles or windows. Reapplying the input windowing strategy is effectively useless if the input elements and output elements can appear in different bundles and windows.

I'm not convinced you're receiving responses to requests initiated on other machines, though; that would be incredibly unhelpful behavior for a message broker and would make distributed processing impractical. It also renders the latency metrics of this connector meaningless, because the result of System.nanoTime() is not related to synchronized system or wall-clock time and can thus only be used to measure the duration between two results produced on the same machine.

Contributor Author:

Scratch my previous comment, you are right. The callbacks are received in the same client that sent the initial request. And I could keep the full record then to emit it when the callback arrives.

For instance, currently we actually keep a timestamp of when the message is sent.

Let me think about this, and I will send more changes in an upcoming PR.
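If it helps, a sketch of how the full record could ride along on the JCSMP correlation key so the callback can emit it; the CorrelationContext class and the publish method shown here are illustrative, not part of this PR:

```java
// Sketch: carry the original record and send time on the correlation key, so the per-instance
// callback can emit the matching record when the ack arrives.
static final class CorrelationContext {
  final Solace.Record record;
  final long sendNanos;

  CorrelationContext(Solace.Record record, long sendNanos) {
    this.record = record;
    this.sendNanos = sendNanos;
  }
}

void publish(XMLMessageProducer producer, Destination destination, Solace.Record record)
    throws JCSMPException {
  BytesXMLMessage msg = JCSMPFactory.onlyInstance().createMessage(BytesXMLMessage.class);
  msg.setCorrelationKey(new CorrelationContext(record, System.nanoTime()));
  producer.send(msg, destination);
}

// In the JCSMPStreamingPublishCorrelatingEventHandler owned by this writer instance:
@Override
public void responseReceivedEx(Object key) {
  CorrelationContext ctx = (CorrelationContext) key;
  // Emit ctx.record as a successful publish; latency = System.nanoTime() - ctx.sendNanos.
}
```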

Contributor:

So I did some digging and found a few undocumented JCSMP methods which are used by the new Java API to make the resource ownership flow a little more like what I was expecting.

The methods are marked as @SolReserved on JCSMPSession and are named createProducer. If I understand correctly, a session owns the outbound connection to Solace and a producer uses that resource to produce messages. However, the documented getProducer method specifies that it will replace a previously returned producer, but I'd have to validate that.

The ownership and lifetime model that you probably want here is one where there's a shared, long-lived session per unique transform in a static variable, some shared map keyed on the transform configuration, and a producer per instance of the transform all multiplexed on the shared session. Every writer instance needs its own producer since the callbacks are passed as construction arguments to the producer, that's the only way to ensure that responses are routed to the originating requestor.

Could you check if repeated calls to getProducer invalidate prior return values like the docs say? If they do, then we'll likely need to use createProducer and verify with Solace that it's ok to call undocumented public methods on their classes.
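Roughly what a per-instance handler could look like (a sketch; getMessageProducer is the documented JCSMP call, and whether repeated calls on a shared session invalidate earlier producers is exactly the open question above):

```java
// Sketch: one correlating event handler per writer instance, so acks and errors are routed
// back to the instance that sent the message.
JCSMPStreamingPublishCorrelatingEventHandler perInstanceHandler =
    new JCSMPStreamingPublishCorrelatingEventHandler() {
      @Override
      public void responseReceivedEx(Object key) {
        // route the ack to this writer instance's successful output
      }

      @Override
      public void handleErrorEx(Object key, JCSMPException cause, long timestamp) {
        // route the failure to this writer instance's failed output
      }

      @Override
      public void responseReceived(String messageId) {} // superseded by responseReceivedEx

      @Override
      public void handleError(String messageId, JCSMPException cause, long timestamp) {}
    };
XMLMessageProducer producer = session.getMessageProducer(perInstanceHandler);
```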

Contributor Author:

The connector guarantees that if there is already a producer, it is reused. If the producer is closed, a new one is created. The connector only closes producers when a worker is destroyed.

The receiver that gets all the callback calls is the same for all the producers (a static variable, which I actually have to change), so even if there is a new producer, as long as it is in the same worker, it will receive the same callbacks.

The static variable approach is actually buggy: two instances of the Write connector will use the same receiver instance. I am changing it to a single receiver per instance of the Write transform. But the same principle applies: all the callbacks are always received in the same object, and the producers are reused for as long as Solace permits (and closed if the worker is destroyed).


Comment on lines +68 to +70
if (queue != null) {
builder = builder.queueName(queue.getName());
}
Contributor:

Why did this change from throwing to checking?
Also, given that AutoValue implementations are immutable, deriving from a class that allows you to mutate the queue name seems to run counter to what the framework provides. SessionServiceFactory should probably be an interface declaring create. Optionally, define an abstract class that implements the interface (abstract, of course) and declares equals and hashCode abstract, if your intention is to have all implementors explicitly override those methods for value equality.
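For example, a rough sketch of the suggested shape; the names mirror the existing classes, but the split into an interface plus an optional abstract base is the proposal, not code from this PR:

```java
// Sketch: interface carries the factory contract, the abstract base forces value equality.
public interface SessionServiceFactory extends Serializable {
  SessionService create();
}

abstract class AbstractSessionServiceFactory implements SessionServiceFactory {
  @Override
  public abstract boolean equals(Object other);

  @Override
  public abstract int hashCode();
}
```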

Contributor Author:

This class was initially used for the Read connector alone, which requires that queue property to be set and should fail if it is not set.

The Write connector does not need a pre-existing queue. I changed the queueName parameter to @Nullable, and this throw to a check, since the queueName value is now optional.

Initially, there were different SessionService-like classes for the two connectors, with different properties. Now we use a single class, for consistency across the Read and Write connectors.

Comment on lines +63 to +64
messageId = key.toString();
}
Contributor:

Is there a supertype or exhaustive list of types that could be compared to instead? Calling Object.toString() might not be appropriate if we don't know what type might be provided here.

Contributor Author:

Object :).

Notice that I am overriding the responseReceivedEx method, and that key is an Object. That's all we know.

The documentation of Solace has this example (TL;DR: what I did above, except that they always use a single class for all the keys): https://tutorials.solace.dev/jcsmp/confirmed-delivery/

Contributor Author:

We know what types are being sent, because we are sending them. The connector will send either strings or CorrelationKey, depending on the settings of the write connector. That else block should never happen. I tried to be defensive; maybe I should just log an error in that else block instead?
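Something along these lines, roughly; CorrelationKey comes from the comment above, while the getMessageId() accessor and LOG are illustrative:

```java
// Sketch of the defensive handling: the connector only ever sends String or CorrelationKey
// correlation keys, so anything else is logged rather than blindly stringified.
@Override
public void responseReceivedEx(Object key) {
  String messageId;
  if (key instanceof String) {
    messageId = (String) key;
  } else if (key instanceof CorrelationKey) {
    messageId = ((CorrelationKey) key).getMessageId(); // illustrative accessor
  } else {
    LOG.error("Unexpected correlation key type: {}", key == null ? "null" : key.getClass());
    return;
  }
  // handle the successful publish for messageId
}
```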

import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;

@Internal
public class SolaceMessageProducer extends MessageProducer {
@sjvanrossum (Contributor) commented Sep 10, 2024:

Would there really be any other implementations of MessageProducer besides this implementation?

Contributor Author:

Yes, for testing purposes for instance (mock message producers).

* UnboundedStreamingSolaceWriter.WriterDoFn} but also higher latency.
*/
@Internal
public static class WriterDoFn extends UnboundedSolaceWriter.AbstractWriterDoFn {
Contributor:

See notes in SolaceIO, I believe this can be simplified using GroupIntoBatches.

Contributor Author:

Noted for a future PR :).

@iht (Contributor, Author) commented Sep 14, 2024

I have uploaded new commits that should address all the comments I have resolved so far. Each commit addresses a separate issue, corresponding either to a single comment or to a set of comments that are all about the same issue.

There are still more changes pending, so this should not be merged yet; one of the pending changes addresses a bug in the way the Solace responses are handled.

@Abacn @sjvanrossum please review the new commits in the meantime, as addressing the remaining changes will take me several days.

This DoFn is stateful in order to force a shuffle with a given input key set cardinality.

The warnings are only shown if the user decided to set the properties that are overridden by the connector.

This was changed in one of the previous commits, but that change is actually a bug. I am reverting it and changing this to a switch block, to make it clearer that the properties always need to be set by the connector.

This lets the user fully control all the properties used by the connector, instead of the connector making choices on the user's behalf.

This also adds some logging to be more explicit about what the connector is doing. This does not add much logging pressure; it only logs at producer creation time.

I forgot to pass the submission mode when the write session is created, and I called the wrong method in the base class because it was defined as public.

This makes sure that the submission mode is passed to the session when the session is created for writing messages.
@iht (Contributor, Author) commented Sep 15, 2024

Another round of commits has just been added; each commit should roughly correspond to one of the resolved comments.

Successfully merging this pull request may close these issues.

[Feature Request]: Support writing to a Solace message broker