Skip to content

Commit

Permalink
[FLINK-22698][connectors/rabbitmq] Add deliveryTimeout to RabbitMQ so…
Browse files Browse the repository at this point in the history
…urce

This change enables setting the message delivery timeout in the RabbitMQ queueing consumer, allowing to properly stop the job in cases when no new message is available on the queue.

Changes:
- Add the ability to setDeliveryTimeout on the RMQConnectionConfig and its builder
- Change default message delivery timeout in the RMQSource to 30 seconds (previously there was no timeout)
- Extend RabbitMQ source unit tests
  • Loading branch information
cmick authored and AHeise committed Jun 22, 2021
1 parent 685e093 commit 79df4d2
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 12 deletions.
13 changes: 13 additions & 0 deletions flink-connectors/flink-connector-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
<version>1.15.3</version>
<scope>test</scope>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,14 @@ private void processMessage(Delivery delivery, RMQCollectorImpl collector) throw
@Override
public void run(SourceContext<OUT> ctx) throws Exception {
final RMQCollectorImpl collector = new RMQCollectorImpl(ctx);
final long timeout = rmqConnectionConfig.getDeliveryTimeout();
while (running) {
Delivery delivery = consumer.nextDelivery();
Delivery delivery = consumer.nextDelivery(timeout);

synchronized (ctx.getCheckpointLock()) {
processMessage(delivery, collector);
if (delivery != null) {
processMessage(delivery, collector);
}
if (collector.isEndOfStreamSignalled()) {
this.running = false;
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,24 @@
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

/**
* Connection Configuration for RMQ. If {@link Builder#setUri(String)} has been set then {@link
* RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer, Integer,
* Integer, Integer, Integer)} will be used for initialize the RMQ connection or {@link
* Integer, Integer, Integer, Long)} will be used for initialize the RMQ connection or {@link
* RMQConnectionConfig#RMQConnectionConfig(String, Integer, String, String, String, Integer,
* Boolean, Boolean, Integer, Integer, Integer, Integer, Integer)} will be used for initialize the
* RMQ connection
* Boolean, Boolean, Integer, Integer, Integer, Integer, Integer, Long)} will be used for initialize
* the RMQ connection
*/
public class RMQConnectionConfig implements Serializable {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class);

private static final long DEFAULT_DELIVERY_TIMEOUT = 30000;

private String host;
private Integer port;
private String virtualHost;
Expand All @@ -61,6 +64,7 @@ public class RMQConnectionConfig implements Serializable {
private Integer requestedHeartbeat;

private Integer prefetchCount;
private final long deliveryTimeout;

/**
* @param host host name
Expand All @@ -75,6 +79,7 @@ public class RMQConnectionConfig implements Serializable {
* @param requestedChannelMax requested maximum channel number
* @param requestedFrameMax requested maximum frame size
* @param requestedHeartbeat requested heartbeat interval
* @param deliveryTimeout message delivery timeout in the queueing consumer
* @throws NullPointerException if host or virtual host or username or password is null
*/
private RMQConnectionConfig(
Expand All @@ -90,12 +95,15 @@ private RMQConnectionConfig(
Integer requestedChannelMax,
Integer requestedFrameMax,
Integer requestedHeartbeat,
Integer prefetchCount) {
Integer prefetchCount,
Long deliveryTimeout) {
Preconditions.checkNotNull(host, "host can not be null");
Preconditions.checkNotNull(port, "port can not be null");
Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
Preconditions.checkNotNull(username, "username can not be null");
Preconditions.checkNotNull(password, "password can not be null");
Preconditions.checkArgument(
deliveryTimeout == null || deliveryTimeout > 0, "deliveryTimeout must be positive");
this.host = host;
this.port = port;
this.virtualHost = virtualHost;
Expand All @@ -110,6 +118,8 @@ private RMQConnectionConfig(
this.requestedFrameMax = requestedFrameMax;
this.requestedHeartbeat = requestedHeartbeat;
this.prefetchCount = prefetchCount;
this.deliveryTimeout =
Optional.ofNullable(deliveryTimeout).orElse(DEFAULT_DELIVERY_TIMEOUT);
}

/**
Expand All @@ -121,6 +131,7 @@ private RMQConnectionConfig(
* @param requestedChannelMax requested maximum channel number
* @param requestedFrameMax requested maximum frame size
* @param requestedHeartbeat requested heartbeat interval
* @param deliveryTimeout message delivery timeout in the queueing consumer
* @throws NullPointerException if URI is null
*/
private RMQConnectionConfig(
Expand All @@ -132,8 +143,11 @@ private RMQConnectionConfig(
Integer requestedChannelMax,
Integer requestedFrameMax,
Integer requestedHeartbeat,
Integer prefetchCount) {
Integer prefetchCount,
Long deliveryTimeout) {
Preconditions.checkNotNull(uri, "Uri can not be null");
Preconditions.checkArgument(
deliveryTimeout == null || deliveryTimeout > 0, "deliveryTimeout must be positive");
this.uri = uri;

this.networkRecoveryInterval = networkRecoveryInterval;
Expand All @@ -144,6 +158,8 @@ private RMQConnectionConfig(
this.requestedFrameMax = requestedFrameMax;
this.requestedHeartbeat = requestedHeartbeat;
this.prefetchCount = prefetchCount;
this.deliveryTimeout =
Optional.ofNullable(deliveryTimeout).orElse(DEFAULT_DELIVERY_TIMEOUT);
}

/** @return the host to use for connections */
Expand Down Expand Up @@ -265,6 +281,16 @@ public Optional<Integer> getPrefetchCount() {
return Optional.ofNullable(prefetchCount);
}

/**
* Retrieve the message delivery timeout used in the queueing consumer. If not specified
* explicitly, the default value of 30000 milliseconds will be returned.
*
* @return the message delivery timeout, in milliseconds
*/
public long getDeliveryTimeout() {
return deliveryTimeout;
}

/**
* @return Connection Factory for RMQ
* @throws URISyntaxException if Malformed URI has been passed
Expand Down Expand Up @@ -343,6 +369,8 @@ public static class Builder {
// basicQos options for consumers
private Integer prefetchCount;

private Long deliveryTimeout;

private String uri;

/**
Expand Down Expand Up @@ -506,16 +534,42 @@ public Builder setPrefetchCount(int prefetchCount) {
return this;
}

/**
* Enables setting the message delivery timeout in the queueing consumer. Only applicable to
* the {@link RMQSource}. If not set it will default to 30000.
*
* @param deliveryTimeout maximum wait time, in milliseconds, for the next message delivery
* @return the Builder
*/
public Builder setDeliveryTimeout(long deliveryTimeout) {
Preconditions.checkArgument(deliveryTimeout > 0, "deliveryTimeout must be positive");
this.deliveryTimeout = deliveryTimeout;
return this;
}

/**
* Enables setting the message delivery timeout in the queueing consumer. Only applicable to
* the {@link RMQSource}. If not set it will default to 30 seconds.
*
* @param deliveryTimeout maximum wait time for the next message delivery
* @param unit deliveryTimeout unit
* @return the Builder
*/
public Builder setDeliveryTimeout(long deliveryTimeout, TimeUnit unit) {
return setDeliveryTimeout(unit.toMillis(deliveryTimeout));
}

/**
* The Builder method.
*
* <p>If URI is NULL we use host, port, vHost, username, password combination to initialize
* connection. using {@link RMQConnectionConfig#RMQConnectionConfig(String, Integer, String,
* String, String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer, Integer)}.
* String, String, Integer, Boolean, Boolean, Integer, Integer, Integer, Integer, Integer,
* Long)}.
*
* <p>Otherwise the URI will be used to initialize the client connection {@link
* RMQConnectionConfig#RMQConnectionConfig(String, Integer, Boolean, Boolean, Integer,
* Integer, Integer, Integer, Integer)}
* Integer, Integer, Integer, Integer, Long)}
*
* @return RMQConnectionConfig
*/
Expand All @@ -530,7 +584,8 @@ public RMQConnectionConfig build() {
this.requestedChannelMax,
this.requestedFrameMax,
this.requestedHeartbeat,
this.prefetchCount);
this.prefetchCount,
this.deliveryTimeout);
} else {
return new RMQConnectionConfig(
this.host,
Expand All @@ -545,7 +600,8 @@ public RMQConnectionConfig build() {
this.requestedChannelMax,
this.requestedFrameMax,
this.requestedHeartbeat,
this.prefetchCount);
this.prefetchCount,
this.deliveryTimeout);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.flink.streaming.connectors.rabbitmq;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.test.util.MiniClusterWithClientResource;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.testcontainers.containers.RabbitMQContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeoutException;

/** A class containing RabbitMQ source tests against a real RabbiMQ cluster. */
public class RMQSourceITCase {

private static final int RABBITMQ_PORT = 5672;
private static final String QUEUE_NAME = "test-queue";
private static final JobID JOB_ID = new JobID();

private RestClusterClient<?> clusterClient;
private RMQConnectionConfig config;

@Rule public final TemporaryFolder tmp = new TemporaryFolder();

@Rule
public final MiniClusterWithClientResource flinkCluster =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(1)
.setNumberTaskManagers(1)
.build());

@ClassRule
public static final RabbitMQContainer RMQ_CONTAINER =
new RabbitMQContainer(
DockerImageName.parse("rabbitmq").withTag("3.7.25-management-alpine"))
.withExposedPorts(RABBITMQ_PORT)
.waitingFor(Wait.forListeningPort());

@Before
public void setUp() throws Exception {
final Connection connection = getRMQConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.txSelect();
clusterClient = flinkCluster.getRestClusterClient();
config =
new RMQConnectionConfig.Builder()
.setHost(RMQ_CONTAINER.getHost())
.setDeliveryTimeout(500)
.setVirtualHost("/")
.setUserName(RMQ_CONTAINER.getAdminUsername())
.setPassword(RMQ_CONTAINER.getAdminPassword())
.setPort(RMQ_CONTAINER.getMappedPort(RABBITMQ_PORT))
.build();
}

@Test
public void testStopWithSavepoint() throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final DataStreamSource<String> source =
env.addSource(new RMQSource<>(config, QUEUE_NAME, new SimpleStringSchema()));
source.addSink(new DiscardingSink<>());
env.enableCheckpointing(500);
final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
jobGraph.setJobID(JOB_ID);
clusterClient.submitJob(jobGraph).get();
CommonTestUtils.waitUntilCondition(
() ->
clusterClient.getJobStatus(JOB_ID).get() == JobStatus.RUNNING
&& clusterClient.getJobDetails(JOB_ID).get().getJobVertexInfos()
.stream()
.allMatch(
info ->
info.getExecutionState()
== ExecutionState.RUNNING),
Deadline.fromNow(Duration.ofSeconds(10)),
5L);

clusterClient.stopWithSavepoint(JOB_ID, false, tmp.newFolder().getAbsolutePath()).get();
}

private static Connection getRMQConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(RMQ_CONTAINER.getAdminUsername());
factory.setPassword(RMQ_CONTAINER.getAdminPassword());
factory.setVirtualHost("/");
factory.setHost(RMQ_CONTAINER.getHost());
factory.setPort(RMQ_CONTAINER.getAmqpPort());
return factory.newConnection();
}
}
Loading

0 comments on commit 79df4d2

Please sign in to comment.