Skip to content

Commit

Permalink
[apache#25887] fix(JmsIO): issue with multiple connection open apache…
Browse files Browse the repository at this point in the history
…#25887 (apache#25945)

* [apache#25887] fix(JmsIO): issue with multiple connection open apache#25887

- Issue related to multiple connection being open for each bundle
- Add integration test using jms-qpid for JmsIO

Fixes apache#25887

Co-Authored-By: Amrane Ait Zeouay <122456352+amranezeRenault@users.noreply.github.com>

* [apache#25887] fix(JmsIO): create new CommonJms instance instead of extending it apache#25887

Fixes apache#25887

Co-Authored-By: Amrane Ait Zeouay <122456352+amranezeRenault@users.noreply.github.com>

* [apache#25887] fix(JmsIO): replacing flag with producer null verification apache#25887

Fixes apache#25887

Co-Authored-By: Amrane Ait Zeouay <122456352+amranezeRenault@users.noreply.github.com>

* [apache#25887] fix(JmsIO): replace google timestamp with java instant apache#25887

Fixes apache#25887

Co-Authored-By: Amrane Ait Zeouay <122456352+amranezeRenault@users.noreply.github.com>

---------

Co-authored-by: Amrane Ait Zeouay <122456352+amranezeRenault@users.noreply.github.com>
  • Loading branch information
2 people authored and rezarokni committed May 22, 2023
1 parent 63c4eff commit f2cf566
Show file tree
Hide file tree
Showing 11 changed files with 620 additions and 102 deletions.
3 changes: 3 additions & 0 deletions .test-infra/jenkins/job_PreCommit_Java_IOs.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ def additionalTasks = [
snowflake: [
':sdks:java:io:snowflake:expansion-service:build',
],
jms: [
':sdks:java:io:jms:integrationTest',
],
]

// In case the test suite name is different from the project folder name
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
* BigQuery Storage Write API is now available in Python SDK via cross-language ([#21961](https://github.com/apache/beam/issues/21961)).
* Added HbaseIO support for writing RowMutations (ordered by rowkey) to Hbase (Java) ([#25830](https://github.com/apache/beam/issues/25830)).
* Added fileio transforms MatchFiles, MatchAll and ReadMatches (Go) ([#25779](https://github.com/apache/beam/issues/25779)).
* Add integration test for JmsIO + fix issue with multiple connections (Java) ([#25887](https://github.com/apache/beam/issues/25887)).

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,7 @@ class BeamModulePlugin implements Plugin<Project> {
def powermock_version = "2.0.9"
// Try to keep protobuf_version consistent with the protobuf version in google_cloud_platform_libraries_bom
def protobuf_version = "3.21.12"
def qpid_jms_client_version = "0.61.0"
def quickcheck_version = "1.0"
def sbe_tool_version = "1.25.1"
def singlestore_jdbc_version = "1.1.4"
Expand All @@ -565,6 +566,7 @@ class BeamModulePlugin implements Plugin<Project> {
def testcontainers_version = "1.17.3"
def arrow_version = "5.0.0"
def jmh_version = "1.34"
def jupiter_version = "5.7.0"

// Export Spark versions, so they are defined in a single place only
project.ext.spark3_version = spark3_version
Expand Down Expand Up @@ -746,6 +748,9 @@ class BeamModulePlugin implements Plugin<Project> {
json_org : "org.json:json:20220320", // Keep in sync with everit-json-schema / google_cloud_platform_libraries_bom transitive deps.
everit_json_schema : "com.github.erosb:everit-json-schema:${everit_json_version}",
junit : "junit:junit:4.13.1",
jupiter_api : "org.junit.jupiter:junit-jupiter-api:$jupiter_version",
jupiter_engine : "org.junit.jupiter:junit-jupiter-engine:$jupiter_version",
jupiter_params : "org.junit.jupiter:junit-jupiter-params:$jupiter_version",
kafka : "org.apache.kafka:kafka_2.11:$kafka_version",
kafka_clients : "org.apache.kafka:kafka-clients:$kafka_version",
log4j : "log4j:log4j:1.2.17",
Expand Down Expand Up @@ -778,6 +783,7 @@ class BeamModulePlugin implements Plugin<Project> {
proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version
proto_google_common_protos : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version
qpid_jms_client : "org.apache.qpid:qpid-jms-client:$qpid_jms_client_version",
sbe_tool : "uk.co.real-logic:sbe-tool:$sbe_tool_version",
singlestore_jdbc : "com.singlestore:singlestore-jdbc-client:$singlestore_jdbc_version",
slf4j_api : "org.slf4j:slf4j-api:$slf4j_version",
Expand Down
7 changes: 7 additions & 0 deletions sdks/java/io/jms/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ plugins { id 'org.apache.beam.module' }
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.jms',
)
provideIntegrationTestingDependencies()
enableJavaPerformanceTesting()

description = "Apache Beam :: SDKs :: Java :: IO :: JMS"
ext.summary = """IO to read and write to JMS (Java Messaging Service)
Expand All @@ -31,13 +33,18 @@ dependencies {
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation "org.apache.geronimo.specs:geronimo-jms_1.1_spec:1.1.1"
testImplementation library.java.activemq_amqp
testImplementation library.java.activemq_broker
testImplementation library.java.activemq_jaas
testImplementation library.java.activemq_kahadb_store
testImplementation library.java.activemq_client
testImplementation library.java.hamcrest
testImplementation library.java.junit
testImplementation library.java.mockito_core
testImplementation library.java.mockito_inline
testImplementation library.java.qpid_jms_client
testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
testImplementation project(path: ":sdks:java:testing:test-utils", configuration: "testRuntimeMigration")
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
Expand Down Expand Up @@ -988,17 +989,16 @@ private static class JmsConnection<T> implements Serializable {
private transient @Initialized Destination destination;
private transient @Initialized MessageProducer producer;

private boolean isProducerNeedsToBeCreated = true;
private final JmsIO.Write<T> spec;
private final Counter connectionErrors =
Metrics.counter(JMS_IO_PRODUCER_METRIC_NAME, CONNECTION_ERRORS_METRIC_NAME);

public JmsConnection(Write<T> spec) {
JmsConnection(Write<T> spec) {
this.spec = spec;
}

public void start() throws JMSException {
if (isProducerNeedsToBeCreated) {
void connect() throws JMSException {
if (this.producer == null) {
ConnectionFactory connectionFactory = spec.getConnectionFactory();
if (spec.getUsername() != null) {
this.connection =
Expand All @@ -1008,7 +1008,6 @@ public void start() throws JMSException {
}
this.connection.setExceptionListener(
exception -> {
this.isProducerNeedsToBeCreated = true;
this.connectionErrors.inc();
});
this.connection.start();
Expand All @@ -1021,12 +1020,11 @@ public void start() throws JMSException {
this.destination = session.createTopic(spec.getTopic());
}
// Create producer with null destination. Destination will be set with producer.send().
this.producer = this.session.createProducer(null);
this.isProducerNeedsToBeCreated = false;
startProducer();
}
}

public void publishMessage(T input) throws JMSException, JmsIOException {
void publishMessage(T input) throws JMSException, JmsIOException {
Destination destinationToSendTo = destination;
try {
Message message = spec.getValueMapper().apply(input, session);
Expand All @@ -1043,24 +1041,30 @@ public void publishMessage(T input) throws JMSException, JmsIOException {
}
}

public void close() throws JMSException {
isProducerNeedsToBeCreated = true;
void startProducer() throws JMSException {
this.producer = this.session.createProducer(null);
}

void closeProducer() throws JMSException {
if (producer != null) {
producer.close();
producer = null;
}
if (session != null) {
session.close();
session = null;
}
if (connection != null) {
try {
// If the connection failed, stopping the connection will throw a JMSException
connection.stop();
} catch (JMSException exception) {
LOG.warn("The connection couldn't be closed", exception);
}

void close() {
try {
closeProducer();
if (session != null) {
session.close();
}
connection.close();
if (connection != null) {
connection.close();
}
} catch (JMSException exception) {
LOG.warn("The connection couldn't be closed", exception);
} finally {
session = null;
connection = null;
}
}
Expand All @@ -1083,8 +1087,10 @@ static class JmsIOProducerFn<T> extends DoFn<T, T> {
}

@Setup
public void setup() {
RetryConfiguration retryConfiguration = checkStateNotNull(spec.getRetryConfiguration());
public void setup() throws JMSException {
this.jmsConnection.connect();
RetryConfiguration retryConfiguration =
MoreObjects.firstNonNull(spec.getRetryConfiguration(), RetryConfiguration.create());
retryBackOff =
FluentBackoff.DEFAULT
.withInitialBackoff(checkStateNotNull(retryConfiguration.getInitialDuration()))
Expand All @@ -1094,7 +1100,7 @@ public void setup() {

@StartBundle
public void startBundle() throws JMSException {
this.jmsConnection.start();
this.jmsConnection.startProducer();
}

@ProcessElement
Expand Down Expand Up @@ -1130,11 +1136,11 @@ private void publishMessage(T input)

@FinishBundle
public void finishBundle() throws JMSException {
this.jmsConnection.close();
this.jmsConnection.closeProducer();
}

@Teardown
public void tearDown() throws JMSException {
public void tearDown() {
this.jmsConnection.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

@AutoValue
public abstract class RetryConfiguration implements Serializable {
private static final Integer DEFAULT_MAX_ATTEMPTS = 5;
private static final Duration DEFAULT_INITIAL_BACKOFF = Duration.standardSeconds(15);
private static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF = Duration.standardDays(1000);

Expand All @@ -35,6 +36,10 @@ public abstract class RetryConfiguration implements Serializable {

abstract @Nullable Duration getInitialDuration();

public static RetryConfiguration create() {
return create(DEFAULT_MAX_ATTEMPTS, null, null);
}

public static RetryConfiguration create(int maxAttempts) {
return create(maxAttempts, null, null);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.jms;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.amqp.AmqpTransportFactory;

/**
* A common test fixture to create a broker and connection factories for {@link JmsIOIT} & {@link
* JmsIOTest}.
*/
public class CommonJms implements Serializable {
private static final String BROKER_WITHOUT_PREFETCH_PARAM = "?jms.prefetchPolicy.all=0&";

static final String USERNAME = "test_user";
static final String PASSWORD = "test_password";
static final String QUEUE = "test_queue";
static final String TOPIC = "test_topic";

private final String brokerUrl;
private final Integer brokerPort;
private final String forceAsyncAcksParam;
private transient BrokerService broker;

protected ConnectionFactory connectionFactory;
protected final Class<? extends ConnectionFactory> connectionFactoryClass;
protected ConnectionFactory connectionFactoryWithSyncAcksAndWithoutPrefetch;

public CommonJms(
String brokerUrl,
Integer brokerPort,
String forceAsyncAcksParam,
Class<? extends ConnectionFactory> connectionFactoryClass) {
this.brokerUrl = brokerUrl;
this.brokerPort = brokerPort;
this.forceAsyncAcksParam = forceAsyncAcksParam;
this.connectionFactoryClass = connectionFactoryClass;
}

void startBroker() throws Exception {
broker = new BrokerService();
broker.setUseJmx(false);
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
TransportFactory.registerTransportFactory("amqp", new AmqpTransportFactory());
if (connectionFactoryClass != ActiveMQConnectionFactory.class) {
broker.addConnector(String.format("%s:%d?transport.transformer=jms", brokerUrl, brokerPort));
} else {
broker.addConnector(brokerUrl);
}
broker.setBrokerName("localhost");
broker.setPopulateJMSXUserID(true);
broker.setUseAuthenticatedPrincipalForJMSXUserID(true);
broker.getManagementContext().setCreateConnector(false);

// enable authentication
List<AuthenticationUser> users = new ArrayList<>();
// username and password to use to connect to the broker.
// This user has users privilege (able to browse, consume, produce, list destinations)
users.add(new AuthenticationUser(USERNAME, PASSWORD, "users"));
SimpleAuthenticationPlugin plugin = new SimpleAuthenticationPlugin(users);
BrokerPlugin[] plugins = new BrokerPlugin[] {plugin};
broker.setPlugins(plugins);

broker.start();
broker.waitUntilStarted();

// create JMS connection factory
connectionFactory = connectionFactoryClass.getConstructor(String.class).newInstance(brokerUrl);
connectionFactoryWithSyncAcksAndWithoutPrefetch =
connectionFactoryClass
.getConstructor(String.class)
.newInstance(brokerUrl + BROKER_WITHOUT_PREFETCH_PARAM + forceAsyncAcksParam);
}

void stopBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
broker = null;
}

Class<? extends ConnectionFactory> getConnectionFactoryClass() {
return this.connectionFactoryClass;
}

ConnectionFactory getConnectionFactory() {
return this.connectionFactory;
}

ConnectionFactory getConnectionFactoryWithSyncAcksAndWithoutPrefetch() {
return this.connectionFactoryWithSyncAcksAndWithoutPrefetch;
}

/** A test class that maps a {@link javax.jms.BytesMessage} into a {@link String}. */
public static class BytesMessageToStringMessageMapper implements JmsIO.MessageMapper<String> {

@Override
public String mapMessage(Message message) throws Exception {
BytesMessage bytesMessage = (BytesMessage) message;

byte[] bytes = new byte[(int) bytesMessage.getBodyLength()];

return new String(bytes, StandardCharsets.UTF_8);
}
}
}
Loading

0 comments on commit f2cf566

Please sign in to comment.