Skip to content

Commit

Permalink
KAFKA-2944: Replaced the NPE with a nicer error and clean exit and ad…
Browse files Browse the repository at this point in the history
…ded debug message to assist with figuring this out.

…ssage to assist with figuring this out.

Author: Gwen Shapira <cshapi@gmail.com>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes apache#993 from gwenshap/KAFKA-2944
  • Loading branch information
gwenshap authored and ewencp committed Mar 3, 2016
1 parent 9cd65c4 commit 00a58f8
Showing 1 changed file with 16 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ public void putConnectorConfig(String connector, Map<String, String> properties)
}

try {
log.debug("Writing connector configuration for connector " + connector + " configuration: " + properties);
configLog.send(CONNECTOR_KEY(connector), serializedConfig);
configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
Expand Down Expand Up @@ -334,6 +335,7 @@ public void putTaskConfigs(Map<ConnectorTaskId, Map<String, String>> configs) {
Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
connectConfig.put("properties", taskConfigEntry.getValue());
byte[] serializedConfig = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, connectConfig);
log.debug("Writing configuration for task " + taskConfigEntry.getKey() + " configuration: " + taskConfigEntry.getValue());
configLog.send(TASK_KEY(taskConfigEntry.getKey()), serializedConfig);
}

Expand All @@ -348,6 +350,7 @@ public void putTaskConfigs(Map<ConnectorTaskId, Map<String, String>> configs) {
Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
connectConfig.put("tasks", taskCountEntry.getValue());
byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
log.debug("Writing commit for connector " + taskCountEntry.getKey() + " with " + taskCountEntry.getValue() + " tasks.");
configLog.send(COMMIT_TASKS_KEY(taskCountEntry.getKey()), serializedConfig);
}

Expand Down Expand Up @@ -396,6 +399,7 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
synchronized (lock) {
if (value.value() == null) {
// Connector deletion will be written as a null value
log.info("Removed connector " + connectorName + " due to null configuration. This is usually intentional and does not indicate an issue.");
connectorConfigs.remove(connectorName);
} else {
// Connector configs can be applied and callbacks invoked immediately
Expand All @@ -405,9 +409,10 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
}
Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
if (!(newConnectorConfig instanceof Map)) {
log.error("Invalid data for connector config: properties filed should be a Map but is " + newConnectorConfig.getClass());
log.error("Invalid data for connector config (" + connectorName + "): properties filed should be a Map but is " + newConnectorConfig.getClass());
return;
}
log.debug("Updating configuration for connector " + connectorName + " configuation: " + newConnectorConfig);
connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);
}
}
Expand All @@ -421,13 +426,13 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
return;
}
if (!(value.value() instanceof Map)) {
log.error("Ignoring task configuration because it is in the wrong format: " + value.value());
log.error("Ignoring task configuration for task " + taskId + " because it is in the wrong format: " + value.value());
return;
}

Object newTaskConfig = ((Map<String, Object>) value.value()).get("properties");
if (!(newTaskConfig instanceof Map)) {
log.error("Invalid data for task config: properties filed should be a Map but is " + newTaskConfig.getClass());
log.error("Invalid data for task config (" + taskId + "): properties filed should be a Map but is " + newTaskConfig.getClass());
return;
}

Expand All @@ -436,6 +441,7 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
deferred = new HashMap<>();
deferredTaskUpdates.put(taskId.connector(), deferred);
}
log.debug("Storing new config for task " + taskId + " this will wait for a commit message before the new config will take effect. New config: " + newTaskConfig);
deferred.put(taskId, (Map<String, String>) newTaskConfig);
}
} else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
Expand Down Expand Up @@ -464,7 +470,7 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
// resolve this (i.e., get the connector to recommit its configuration). This inconsistent state is
// exposed in the snapshots provided via ClusterConfigState so they are easy to handle.
if (!(value.value() instanceof Map)) { // Schema-less, so we get maps instead of structs
log.error("Ignoring connector tasks configuration commit because it is in the wrong format: " + value.value());
log.error("Ignoring connector tasks configuration commit for connector " + connectorName + " because it is in the wrong format: " + value.value());
return;
}

Expand All @@ -476,11 +482,17 @@ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record)
// update of all tasks that are expected based on the number of tasks in the commit message.
Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
if (taskIdSet == null) {
//TODO: Figure out why this happens (KAFKA-3321)
log.error("Received a commit message for connector " + connectorName + " but there is no matching configuration for tasks in this connector. This should never happen.");
return;
}
if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
// Given the logic for writing commit messages, we should only hit this condition due to compacted
// historical data, in which case we would not have applied any updates yet and there will be no
// task config data already committed for the connector, so we shouldn't have to clear any data
// out. All we need to do is add the flag marking it inconsistent.
log.debug("We have an incomplete set of task configs for connector " + connectorName + " probably due to compaction. So we are not doing anything with the new configuration.");
inconsistent.add(connectorName);
} else {
if (deferred != null) {
Expand Down

0 comments on commit 00a58f8

Please sign in to comment.