
Commit

KAFKA-1515 Fix a bug that could result in blocking for a long period of time in the producer. Patch from Guozhang Wang.
jkreps committed Jul 8, 2014
1 parent 6de56b3 commit cd3ce27
Showing 5 changed files with 77 additions and 48 deletions.
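At a high level, the patch moves the blocking wait for topic metadata out of the synchronized Metadata.fetch(topic, maxWaitMs) method and into a new KafkaProducer.waitOnMetadata() helper. Metadata gains a version counter and a non-blocking requestUpdate()/awaitUpdate(version, maxWaitMs) pair (the old forceUpdate() callers in NetworkClient and Sender are renamed accordingly), and the producer now wakes the sender after requesting an update. That wakeup appears to be the heart of the fix: previously nothing nudged the I/O thread, so a caller could sit in wait() until the sender's next natural poll. Below is a minimal, self-contained sketch of the versioned wait pattern the commit introduces; MiniMetadata and its timings are illustrative stand-ins, not the real Kafka classes.

// A simplified stand-in for the pattern added in this commit: requestUpdate()
// marks an update as needed and returns the current version, awaitUpdate()
// blocks until update() bumps the version or the deadline passes, and update()
// bumps the version and wakes all waiters.
public class VersionedWaitSketch {

    static class MiniMetadata {
        private int version = 0;
        private boolean needUpdate = false;

        /** Mark that an update is wanted and return the version we are at now. */
        public synchronized int requestUpdate() {
            this.needUpdate = true;
            return this.version;
        }

        /** Block until the version moves past lastVersion, or give up after maxWaitMs. */
        public synchronized void awaitUpdate(int lastVersion, long maxWaitMs) {
            long begin = System.currentTimeMillis();
            long remainingWaitMs = maxWaitMs;
            while (this.version <= lastVersion) {
                try {
                    wait(remainingWaitMs);
                } catch (InterruptedException e) { /* retry, as the real code does */ }
                long elapsed = System.currentTimeMillis() - begin;
                if (elapsed >= maxWaitMs)
                    throw new RuntimeException("Failed to update metadata after " + maxWaitMs + " ms.");
                remainingWaitMs = maxWaitMs - elapsed;
            }
        }

        /** Called by the I/O thread when a fresh metadata response arrives. */
        public synchronized void update() {
            this.needUpdate = false;
            this.version += 1;
            notifyAll();
        }
    }

    public static void main(String[] args) throws Exception {
        final MiniMetadata metadata = new MiniMetadata();

        // Stand-in for the sender/I/O thread delivering a metadata response later.
        Thread io = new Thread(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                metadata.update();
            }
        });
        io.start();

        int version = metadata.requestUpdate(); // remember the version before the update
        metadata.awaitUpdate(version, 5000);    // returns only once a strictly newer version exists
        System.out.println("metadata updated");
        io.join();
    }
}

The version check (version <= lastVersion) is what lets a waiter distinguish a real metadata refresh from a spurious wakeup, and it makes requesting an update cheap and non-blocking for callers like the sender that never want to wait.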
NetworkClient.java
@@ -309,7 +309,7 @@ private void handleDisconnections(List<ClientResponse> responses, long now) {
}
// we got a disconnect so we should probably refresh our metadata and see if that broker is dead
if (this.selector.disconnected().size() > 0)
this.metadata.forceUpdate();
this.metadata.requestUpdate();
}

/**
@@ -375,7 +375,7 @@ private void initiateConnect(Node node, long now) {
/* attempt failed, we'll try again after the backoff */
connectionStates.disconnected(node.id());
/* maybe the problem is our metadata, update it */
metadata.forceUpdate();
metadata.requestUpdate();
log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
}
}
KafkaProducer.java
@@ -34,6 +34,7 @@
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -73,6 +74,7 @@ public class KafkaProducer implements Producer {
private final Thread ioThread;
private final CompressionType compressionType;
private final Sensor errors;
private final Time time;

/**
* A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -94,7 +96,7 @@ public KafkaProducer(Properties properties) {

private KafkaProducer(ProducerConfig config) {
log.trace("Starting the Kafka producer");
Time time = new SystemTime();
this.time = new SystemTime();
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
TimeUnit.MILLISECONDS);
@@ -119,7 +121,7 @@ private KafkaProducer(ProducerConfig config) {
metrics,
time);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), 0);
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());

NetworkClient client = new NetworkClient(new Selector(this.metrics, time),
this.metadata,
@@ -225,8 +227,9 @@ public Future<RecordMetadata> send(ProducerRecord record) {
@Override
public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
try {
Cluster cluster = metadata.fetch(record.topic(), this.metadataFetchTimeoutMs);
int partition = partitioner.partition(record, cluster);
// first make sure the metadata for the topic is available
waitOnMetadata(record.topic(), this.metadataFetchTimeoutMs);
int partition = partitioner.partition(record, metadata.fetch());
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(record.key(), record.value());
ensureValidRecordSize(serializedSize);
TopicPartition tp = new TopicPartition(record.topic(), partition);
@@ -255,6 +258,31 @@ public Future<RecordMetadata> send(ProducerRecord record, Callback callback) {
}
}

/**
* Wait for cluster metadata including partitions for the given topic to be available.
* @param topic The topic we want metadata for
* @param maxWaitMs The maximum time in ms for waiting on the metadata
*/
private void waitOnMetadata(String topic, long maxWaitMs) {
if (metadata.fetch().partitionsForTopic(topic) != null) {
return;
} else {
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
while (metadata.fetch().partitionsForTopic(topic) == null) {
log.trace("Requesting metadata update for topic {}.", topic);
int version = metadata.requestUpdate();
metadata.add(topic);
sender.wakeup();
metadata.awaitUpdate(version, remainingWaitMs);
long elapsed = time.milliseconds() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
}
}
}

/**
* Validate that the record size isn't too large
*/
@@ -271,8 +299,10 @@ private void ensureValidRecordSize(int size) {
" configuration.");
}

@Override
public List<PartitionInfo> partitionsFor(String topic) {
return this.metadata.fetch(topic, this.metadataFetchTimeoutMs).partitionsForTopic(topic);
waitOnMetadata(topic, this.metadataFetchTimeoutMs);
return this.metadata.fetch().partitionsForTopic(topic);
}

@Override
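From a caller's point of view, the change above keeps send() and partitionsFor() bounded: the first call for a topic blocks at most metadataFetchTimeoutMs while metadata is fetched, and fails with the newly imported org.apache.kafka.common.errors.TimeoutException if no update arrives in time. A rough usage sketch follows, assuming this era's non-generic producer API, a ProducerRecord(topic, value) constructor, and the metadata.fetch.timeout.ms config key; since the catch block inside send() is not visible in the hunk above, the sketch handles the timeout both as a direct exception and via the returned future.

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;

public class BoundedMetadataWaitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Assumed config key for this era of the client; bounds how long send() may wait on metadata.
        props.put("metadata.fetch.timeout.ms", "3000");

        Producer producer = new KafkaProducer(props);
        try {
            // The first send to a topic may block up to the timeout above while
            // waitOnMetadata() requests and awaits a metadata update.
            Future<RecordMetadata> future = producer.send(new ProducerRecord("my-topic", "hello".getBytes()));
            future.get();
        } catch (TimeoutException e) {
            // send() gave up waiting for metadata.
            System.err.println("Metadata fetch timed out: " + e.getMessage());
        } catch (ExecutionException e) {
            // ... or the failure was reported through the future instead.
            System.err.println("Send failed: " + e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            producer.close();
        }
    }
}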
Metadata.java
@@ -13,11 +13,9 @@
package org.apache.kafka.clients.producer.internals;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,9 +34,10 @@ public final class Metadata {

private final long refreshBackoffMs;
private final long metadataExpireMs;
private int version;
private long lastRefreshMs;
private Cluster cluster;
private boolean forceUpdate;
private boolean needUpdate;
private final Set<String> topics;

/**
@@ -58,8 +57,9 @@ public Metadata(long refreshBackoffMs, long metadataExpireMs) {
this.refreshBackoffMs = refreshBackoffMs;
this.metadataExpireMs = metadataExpireMs;
this.lastRefreshMs = 0L;
this.version = 0;
this.cluster = Cluster.empty();
this.forceUpdate = false;
this.needUpdate = false;
this.topics = new HashSet<String>();
}

@@ -71,33 +71,10 @@ public synchronized Cluster fetch() {
}

/**
* Fetch cluster metadata including partitions for the given topic. If there is no metadata for the given topic,
* block waiting for an update.
* @param topic The topic we want metadata for
* @param maxWaitMs The maximum amount of time to block waiting for metadata
* Add the topic to maintain in the metadata
*/
public synchronized Cluster fetch(String topic, long maxWaitMs) {
List<PartitionInfo> partitions = null;
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
do {
partitions = cluster.partitionsForTopic(topic);
if (partitions == null) {
topics.add(topic);
forceUpdate = true;
try {
log.trace("Requesting metadata update for topic {}.", topic);
wait(remainingWaitMs);
} catch (InterruptedException e) { /* this is fine, just try again */
}
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
} else {
return cluster;
}
} while (true);
public synchronized void add(String topic) {
topics.add(topic);
}

/**
@@ -106,16 +83,35 @@ public synchronized Cluster fetch(String topic, long maxWaitMs) {
* been requested then the expiry time is now
*/
public synchronized long timeToNextUpdate(long nowMs) {
long timeToExpire = forceUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
long timeToExpire = needUpdate ? 0 : Math.max(this.lastRefreshMs + this.metadataExpireMs - nowMs, 0);
long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
return Math.max(timeToExpire, timeToAllowUpdate);
}

/**
* Force an update of the current cluster info
* Request an update of the current cluster metadata info, return the current version before the update
*/
public synchronized void forceUpdate() {
this.forceUpdate = true;
public synchronized int requestUpdate() {
this.needUpdate = true;
return this.version;
}

/**
* Wait for metadata update until the current version is larger than the last version we know of
*/
public synchronized void awaitUpdate(int lastVersion, long maxWaitMs) {
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
while (this.version <= lastVersion) {
try {
wait(remainingWaitMs);
} catch (InterruptedException e) { /* this is fine */
}
long elapsed = System.currentTimeMillis() - begin;
if (elapsed >= maxWaitMs)
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
}
}

/**
@@ -129,8 +125,9 @@ public synchronized Set<String> topics() {
* Update the cluster metadata
*/
public synchronized void update(Cluster cluster, long now) {
this.forceUpdate = false;
this.needUpdate = false;
this.lastRefreshMs = now;
this.version += 1;
this.cluster = cluster;
notifyAll();
log.debug("Updated cluster metadata to {}", cluster);
@@ -142,5 +139,4 @@ public synchronized void update(Cluster cluster, long now) {
public synchronized long lastUpdate() {
return this.lastRefreshMs;
}

}
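After this change, requestUpdate() never blocks; it just sets needUpdate and returns the current version, and timeToNextUpdate() is what the I/O path consults to decide when to actually refresh (it drops to zero once an update is wanted and the refresh backoff has elapsed). A small sketch of that interaction, using illustrative backoff and expiry values rather than Kafka defaults:

import org.apache.kafka.clients.producer.internals.Metadata;
import org.apache.kafka.common.Cluster;

public class TimeToNextUpdateDemo {
    public static void main(String[] args) {
        long refreshBackoffMs = 100L;       // illustrative values, not Kafka defaults
        long metadataExpireMs = 60 * 1000L;
        Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);

        metadata.update(Cluster.empty(), 0L);
        // Just refreshed: the next update is only due when the metadata expires.
        System.out.println(metadata.timeToNextUpdate(0L));   // 60000

        metadata.requestUpdate();
        // An update is now wanted, but the refresh backoff still applies.
        System.out.println(metadata.timeToNextUpdate(0L));   // 100

        // Once the backoff has elapsed, the I/O thread's next check sees zero and refreshes.
        System.out.println(metadata.timeToNextUpdate(100L)); // 0
    }
}

This mirrors the assertions at the top of testMetadata() further below.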
Sender.java
@@ -147,7 +147,7 @@ public void run(long now) {

// if there are any partitions whose leaders are not known yet, force metadata update
if (result.unknownLeadersExist)
this.metadata.forceUpdate();
this.metadata.requestUpdate();

// remove any nodes we aren't ready to send to
Iterator<Node> iter = result.readyNodes.iterator();
@@ -252,7 +252,7 @@ private void completeBatch(RecordBatch batch, Errors error, long baseOffset, lon
this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
}
if (error.exception() instanceof InvalidMetadataException)
metadata.forceUpdate();
metadata.requestUpdate();
}

/**
MetadataTest.java
@@ -31,7 +31,7 @@ public void testMetadata() throws Exception {
long time = 0;
metadata.update(Cluster.empty(), time);
assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
metadata.forceUpdate();
metadata.requestUpdate();
assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
time += refreshBackoffMs;
assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0);
@@ -40,7 +40,9 @@ public void testMetadata() throws Exception {
Thread t2 = asyncFetch(topic);
assertTrue("Awaiting update", t1.isAlive());
assertTrue("Awaiting update", t2.isAlive());
metadata.update(TestUtils.singletonCluster(topic, 1), time);
// keep updating the metadata until no further update is needed
while (metadata.timeToNextUpdate(time) == 0)
metadata.update(TestUtils.singletonCluster(topic, 1), time);
t1.join();
t2.join();
assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
@@ -51,7 +53,8 @@ public void testMetadata() throws Exception {
private Thread asyncFetch(final String topic) {
Thread thread = new Thread() {
public void run() {
metadata.fetch(topic, Integer.MAX_VALUE);
while (metadata.fetch().partitionsForTopic(topic) == null)
metadata.awaitUpdate(metadata.requestUpdate(), Long.MAX_VALUE);
}
};
thread.start();
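One branch the updated test does not hit directly is the timeout path of the new awaitUpdate(). A possible companion test, not part of this commit, sketched under the assumption of JUnit 4 and the same imports as the rest of MetadataTest:

// Requires: import org.junit.Test; import org.apache.kafka.common.errors.TimeoutException;
@Test(expected = TimeoutException.class)
public void testAwaitUpdateTimeout() {
    // Nobody ever calls update(), so awaitUpdate() should give up after ~50 ms
    // and throw Kafka's TimeoutException (not java.util.concurrent's).
    Metadata metadata = new Metadata(10L, 60 * 1000L);
    int version = metadata.requestUpdate();
    metadata.awaitUpdate(version, 50L);
}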
