Moved key serialization to a JSON serializer. #33

Closed · wants to merge 5 commits
AvroCompatibilityChecker.java

@@ -24,34 +24,31 @@
import java.util.List;

public class AvroCompatibilityChecker {

// Check if the new schema can be used to read data produced by the latest schema
private static SchemaValidator BACKWARD_VALIDATOR =
new SchemaValidatorBuilder().canReadStrategy().validateLatest();

public static AvroCompatibilityChecker BACKWARD_CHECKER = new AvroCompatibilityChecker(
BACKWARD_VALIDATOR);
// Check if data produced by the new schema can be read by the latest schema
private static SchemaValidator FORWARD_VALIDATOR =
new SchemaValidatorBuilder().canBeReadStrategy().validateLatest();

public static AvroCompatibilityChecker FORWARD_CHECKER = new AvroCompatibilityChecker(
FORWARD_VALIDATOR);
// Check if the new schema is both forward and backward compatible with the latest schema
private static SchemaValidator FULL_VALIDATOR =
new SchemaValidatorBuilder().mutualReadStrategy().validateLatest();

public static AvroCompatibilityChecker FULL_CHECKER = new AvroCompatibilityChecker(
FULL_VALIDATOR);
private static SchemaValidator NO_OP_VALIDATOR = new SchemaValidator() {
@Override
public void validate(Schema schema, Iterable<Schema> schemas) throws SchemaValidationException {
// do nothing
}
};
private final SchemaValidator validator;

public static AvroCompatibilityChecker FORWARD_CHECKER = new AvroCompatibilityChecker(
FORWARD_VALIDATOR);
public static AvroCompatibilityChecker BACKWARD_CHECKER = new AvroCompatibilityChecker(
BACKWARD_VALIDATOR);
public static AvroCompatibilityChecker FULL_CHECKER = new AvroCompatibilityChecker(
FULL_VALIDATOR);
public static AvroCompatibilityChecker NO_OP_CHECKER = new AvroCompatibilityChecker(
NO_OP_VALIDATOR);
private final SchemaValidator validator;

private AvroCompatibilityChecker(SchemaValidator validator) {
this.validator = validator;
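For readers skimming the diff, a minimal usage sketch of these checkers (isCompatible is assumed to live in the elided remainder of this class, delegating to validator.validate; the example schemas are hypothetical):

// v2 adds a field with a default, so a v2 reader can still decode v1 data:
// the BACKWARD check should pass.
org.apache.avro.Schema v1 = new org.apache.avro.Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
org.apache.avro.Schema v2 = new org.apache.avro.Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},"
    + "{\"name\":\"email\",\"type\":\"string\",\"default\":\"\"}]}");
boolean compatible = AvroCompatibilityChecker.BACKWARD_CHECKER.isCompatible(v2, v1);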
AvroUtils.java

@@ -24,7 +24,7 @@ public class AvroUtils {
* Convert a schema string into a schema object and a canonical schema string.
*
* @return A schema object and a canonical representation of the schema string. Return null if
* there is any parsing error.
* there is any parsing error.
*/
public static AvroSchema parseSchema(String schemaString) {
try {
SchemaRegistryConfig.java

@@ -55,6 +55,10 @@ public class SchemaRegistryConfig extends RestConfig {
* <code>advertised.host</code>
*/
public static final String ADVERTISED_HOST_CONFIG = "advertised.host";
/**
* <code>avro.compatibility.level</code>
*/
public static final String COMPATIBILITY_CONFIG = "avro.compatibility.level";
protected static final String KAFKASTORE_CONNECTION_URL_DOC =
"Zookeeper url for the Kafka cluster";
protected static final String KAFKASTORE_ZK_SESSION_TIMEOUT_MS_DOC =
@@ -67,11 +71,6 @@ public class SchemaRegistryConfig extends RestConfig {
protected static final String KAFKASTORE_COMMIT_INTERVAL_MS_DOC =
"The interval to commit offsets while consuming the Kafka topic";
protected static final String ADVERTISED_HOST_DOC = "The host name advertised in Zookeeper";

/**
* <code>avro.compatibility.level</code>
*/
public static final String COMPATIBILITY_CONFIG = "avro.compatibility.level";
protected static final String COMPATIBILITY_DOC =
"The avro compatibility type. Valid values are: "
+ "none (new schema can be any valid avro schema), "
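A deployment would then pin the compatibility level in its configuration, for example (the property file name is an assumption; the key itself is defined above):

# schema-registry.properties
avro.compatibility.level=backward

With this setting, a new schema version is accepted for a subject only if it can read data produced by the subject's latest registered schema.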
SchemaRegistryRestApplication.java

@@ -27,6 +27,7 @@
import io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.storage.serialization.SchemaKeySerializer;
import io.confluent.kafka.schemaregistry.storage.serialization.SchemaSerializer;
import io.confluent.rest.Application;
import io.confluent.rest.RestConfigException;
@@ -47,7 +48,8 @@ public SchemaRegistryRestApplication(SchemaRegistryConfig config) {
@Override
public void setupResources(Configurable<?> config, SchemaRegistryConfig schemaRegistryConfig) {
try {
schemaRegistry = new KafkaSchemaRegistry(schemaRegistryConfig, new SchemaSerializer());
schemaRegistry = new KafkaSchemaRegistry(schemaRegistryConfig, new SchemaKeySerializer(),
new SchemaSerializer());
schemaRegistry.init();
} catch (SchemaRegistryException e) {
log.error("Error starting the schema registry", e);
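The SchemaKeySerializer wired in here is not itself shown in the visible hunks of this pull request. A minimal sketch of what a Jackson-based key serializer along these lines could look like (the Serializer interface's method name, its exception type, and the exact JSON property names are assumptions):

import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class SchemaKeySerializer implements Serializer<SchemaRegistryKey> {

  private final ObjectMapper objectMapper = new ObjectMapper();

  @Override
  public byte[] toBytes(SchemaRegistryKey key) throws SerializationException {
    try {
      // Produces e.g. {"keytype":"CONFIG","magic":0,"subject":"user-value"},
      // driven by the @JsonProperty annotations on the key classes.
      return objectMapper.writeValueAsBytes(key);
    } catch (IOException e) {
      throw new SerializationException("Failed to serialize key " + key, e);
    }
  }
}

Unlike the old "subject,version" strings, the JSON form can carry a key type and a magic byte, which is what lets config keys and schema keys share a single Kafka topic.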
IncompatibleAvroSchemaException.java

@@ -23,6 +23,7 @@
* to the compatibility level.
*/
public class IncompatibleAvroSchemaException extends WebApplicationException {

public static final Response.Status STATUS = Response.Status.CONFLICT;

public IncompatibleAvroSchemaException(String errorMsg) {
InvalidAvroException.java

@@ -22,6 +22,7 @@
* An exception thrown when the registered schema is not a valid Avro schema.
*/
public class InvalidAvroException extends WebApplicationException {

public static final Response.Status STATUS = Response.Status.BAD_REQUEST;

public InvalidAvroException() {
@@ -15,7 +15,6 @@
import javax.ws.rs.core.Response;

import io.confluent.kafka.schemaregistry.rest.Versions;
import io.confluent.kafka.schemaregistry.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException;

ConfigKey.java (new file)

@@ -0,0 +1,106 @@
/*
* Copyright 2014 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.kafka.schemaregistry.storage;

import com.fasterxml.jackson.annotation.JsonProperty;

public class ConfigKey extends SchemaRegistryKey {

  private static final int MAGIC_BYTE = 0;
  private String subject;

  public ConfigKey(@JsonProperty("subject") String subject) {
    super(SchemaRegistryKeyType.CONFIG);
    this.subject = subject;
    this.magicByte = MAGIC_BYTE;
  }

  @JsonProperty("subject")
  public String getSubject() {
    return this.subject;
  }

  @JsonProperty("subject")
  public void setSubject(String subject) {
    this.subject = subject;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }

    ConfigKey that = (ConfigKey) o;

    if (!super.equals(o)) {
      return false;
    }
    if (this.subject != null && that.subject != null) {
      if (!subject.equals(that.subject)) {
        return false;
      }
    } else if (this.subject == null && that.subject == null) {
      return true;
    } else {
      return false;
    }
    return true;
  }

  @Override
  public int hashCode() {
    int result = super.hashCode();
    if (this.subject != null) {
      result = 31 * result + subject.hashCode();
    }
    return result;
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("{magic=" + this.magicByte + ",");
    sb.append("keytype=" + this.keyType.keyType + ",");
    sb.append("subject=" + this.subject + "}");
    return sb.toString();
  }

  @Override
  public int compareTo(SchemaRegistryKey o) {
    int compare = super.compareTo(o);
    if (compare == 0) {
      ConfigKey otherKey = (ConfigKey) o;
      if (this.subject == null && otherKey.subject == null) {
        return 0;
      } else {
        if (this.subject == null) {
          return -1;
        }
        if (otherKey.subject == null) {
          return 1;
        }
        return this.subject.compareTo(otherKey.subject);
      }
    } else {
      return compare;
    }
  }
}
Contributor:
We need to be able to compare all combinations of the different types of SchemaRegistryKey. Instead of duplicating the code, perhaps we can write all the comparators in a dispatcher in SchemaRegistryKey once and just call the dispatcher from the compareTo() implementation of each type.

Perhaps we need to do the same for equals().

Contributor (Author):
That seems counterintuitive. Both classes extend SchemaRegistryKey. If two keys of different subclasses are being compared, they will be sorted by keyType anyway. There is no duplication: each subclass only compares according to its own class members.

Same for equals().
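A sketch of the two-level ordering being discussed (SchemaRegistryKey.compareTo is not part of this diff; that SchemaRegistryKeyType is an enum, comparing by declaration order, is an assumption):

// In SchemaRegistryKey: keys of different types are ordered by type alone.
// ConfigKey.compareTo above only refines this when the result is 0,
// i.e. when both keys are CONFIG keys.
@Override
public int compareTo(SchemaRegistryKey that) {
  return this.keyType.compareTo(that.keyType);
}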

KafkaSchemaRegistry.java

@@ -37,26 +37,28 @@
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
import io.confluent.kafka.schemaregistry.storage.serialization.Serializer;
import io.confluent.kafka.schemaregistry.storage.serialization.StringSerializer;
import io.confluent.kafka.schemaregistry.storage.serialization.ZkStringSerializer;
import io.confluent.kafka.schemaregistry.utils.Pair;
import io.confluent.kafka.schemaregistry.zookeeper.SchemaRegistryIdentity;
import io.confluent.kafka.schemaregistry.zookeeper.ZookeeperMasterElector;

public class KafkaSchemaRegistry implements SchemaRegistry {

public static final char SCHEMA_KEY_SEPARATOR = ',';
public static final int MIN_VERSION = 0;
public static final int MAX_VERSION = Integer.MAX_VALUE;
private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class);
private final KafkaStore<String, Schema> kafkaStore;
private final KafkaStore<SchemaRegistryKey, Schema> kafkaStore;
private final Serializer<SchemaRegistryKey> keySerializer;
private final Serializer<Schema> serializer;
private final SchemaRegistryIdentity myIdentity;
private final Object masterLock = new Object();
private final AvroCompatibilityType defaultCompatibilityType;
private final ZkClient zkClient;
private SchemaRegistryIdentity masterIdentity;
private ZookeeperMasterElector masterElector = null;
private final AvroCompatibilityType defaultCompatibilityType;

public KafkaSchemaRegistry(SchemaRegistryConfig config, Serializer<Schema> serializer)
public KafkaSchemaRegistry(SchemaRegistryConfig config,
Serializer<SchemaRegistryKey> keySerializer,
Serializer<Schema> serializer)
throws SchemaRegistryException {
String host = config.getString(SchemaRegistryConfig.ADVERTISED_HOST_CONFIG);
int port = config.getInt(SchemaRegistryConfig.PORT_CONFIG);
@@ -68,11 +70,12 @@ public KafkaSchemaRegistry(SchemaRegistryConfig config, Serializer<Schema> serializer)
config.getInt(SchemaRegistryConfig.KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG);
this.zkClient = new ZkClient(kafkaClusterZkUrl, zkSessionTimeoutMs, zkSessionTimeoutMs,
new ZkStringSerializer());

this.keySerializer = keySerializer;
this.serializer = serializer;
kafkaStore = new KafkaStore<String, Schema>(config, new KafkaStoreMessageHandler(),
StringSerializer.INSTANCE, this.serializer,
new InMemoryStore<String, Schema>(), zkClient);
kafkaStore = new KafkaStore<SchemaRegistryKey, Schema>(config, new KafkaStoreMessageHandler(),
this.keySerializer, this.serializer,
new InMemoryStore<SchemaRegistryKey, Schema>(),
zkClient);
this.defaultCompatibilityType = config.compatibilityType();
}

@@ -132,7 +135,7 @@ public int register(String subject, Schema schema, RegisterSchemaForwardingAgent

Iterator<Schema> allVersions = getAllVersions(subject);
Schema latestSchema = null;
int latestUsedSchemaVersion = 0;
int latestUsedSchemaVersion = MIN_VERSION;
// see if the schema to be registered already exists
while (allVersions.hasNext()) {
Schema s = allVersions.next();
@@ -147,8 +150,7 @@ public int register(String subject, Schema schema, RegisterSchemaForwardingAgent

if (latestSchema == null || isCompatible(avroSchemaObj, latestSchema)) {
int newVersion = latestUsedSchemaVersion + 1;
String keyForNewVersion =
String.format("%s%c%d", subject, SCHEMA_KEY_SEPARATOR, newVersion);
SchemaKey keyForNewVersion = new SchemaKey(subject, newVersion);
schema.setVersion(newVersion);
kafkaStore.put(keyForNewVersion, schema);
return newVersion;
@@ -183,7 +185,7 @@ private org.apache.avro.Schema canonicalizeSchema(Schema schema) {

@Override
public Schema get(String subject, int version) throws SchemaRegistryException {
String key = subject + SCHEMA_KEY_SEPARATOR + version;
SchemaKey key = new SchemaKey(subject, version);
try {
Schema schema = kafkaStore.get(key);
return schema;
@@ -197,31 +199,32 @@ public Schema get(String subject, int version) throws SchemaRegistryException {
@Override
public Set<String> listSubjects() throws SchemaRegistryException {
try {
Iterator<String> allKeys = kafkaStore.getAllKeys();
Iterator<SchemaRegistryKey> allKeys = kafkaStore.getAllKeys();
Contributor:
It's probably better to ignore keys that are not an instance of SchemaKey.

Contributor (Author):
Done inside extractUniqueSubjects.

return extractUniqueSubjects(allKeys);
} catch (StoreException e) {
throw new SchemaRegistryException(
"Error from the backend Kafka store", e);
}
}

private Set<String> extractUniqueSubjects(Iterator<String> allKeys) {
private Set<String> extractUniqueSubjects(Iterator<SchemaRegistryKey> allKeys) {
Contributor:
This method should probably take Iterator.

Contributor (Author):
It already does take an iterator. Did you mean something else?

Set<String> subjects = new HashSet<String>();
while (allKeys.hasNext()) {
String key = allKeys.next();
subjects.add(key.split(String.valueOf(SCHEMA_KEY_SEPARATOR))[0]);
SchemaRegistryKey k = allKeys.next();
if (k instanceof SchemaKey) {
SchemaKey key = (SchemaKey) k;
subjects.add(key.getSubject());
}
}
return subjects;
}

@Override
public Iterator<Schema> getAllVersions(String subject) throws SchemaRegistryException {
try {
Iterator<Schema> allVersions = kafkaStore.getAll(String.format("%s%c", subject,
SCHEMA_KEY_SEPARATOR),
String.format("%s%c%c", subject,
SCHEMA_KEY_SEPARATOR,
'9' + 1));
SchemaKey key1 = new SchemaKey(subject, MIN_VERSION);
SchemaKey key2 = new SchemaKey(subject, MAX_VERSION);
Iterator<Schema> allVersions = kafkaStore.getAll(key1, key2);
return sortSchemasByVersion(allVersions);
} catch (StoreException e) {
throw new SchemaRegistryException(
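The typed keys make the per-subject range scan explicit: instead of bracketing a subject with the old string-prefix trick ('9' + 1 as an upper bound), getAllVersions now asks for everything between the smallest and largest possible versions of one subject, relying on SchemaKey ordering by (subject, version). A usage sketch (the subject name is hypothetical, and whether getAll treats its bounds as inclusive is defined by the store's contract, which this diff does not show):

SchemaKey first = new SchemaKey("user-value", KafkaSchemaRegistry.MIN_VERSION); // version 0
SchemaKey last = new SchemaKey("user-value", KafkaSchemaRegistry.MAX_VERSION);  // Integer.MAX_VALUE
Iterator<Schema> versions = kafkaStore.getAll(first, last);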
KafkaStoreMessageHandler.java

@@ -16,11 +16,9 @@

package io.confluent.kafka.schemaregistry.storage;

import java.util.Map;

import io.confluent.kafka.schemaregistry.rest.entities.Schema;

public class KafkaStoreMessageHandler implements StoreUpdateHandler<String, Schema> {
public class KafkaStoreMessageHandler implements StoreUpdateHandler<SchemaRegistryKey, Schema> {

public KafkaStoreMessageHandler() {
}
@@ -32,6 +30,6 @@ public KafkaStoreMessageHandler() {
* @param schema Schema written to the Kafka store
*/
@Override
public void handleUpdate(String key, Schema schema) {
public void handleUpdate(SchemaRegistryKey key, Schema schema) {
}
}
@@ -146,9 +146,13 @@ public void doWork() {
localStore.put(messageKey, message);
}
this.storeUpdateHandler.handleUpdate(messageKey, message);
offsetUpdateLock.lock();
offsetInSchemasTopic = messageAndMetadata.offset();
offsetReachedThreshold.signalAll();
try {
offsetUpdateLock.lock();
offsetInSchemasTopic = messageAndMetadata.offset();
offsetReachedThreshold.signalAll();
} finally {
offsetUpdateLock.unlock();
}
} catch (StoreException se) {
/**
* TODO: maybe retry for a configurable amount before logging a failure?
@@ -158,8 +162,6 @@ public void doWork() {
* 2. Look into the issue manually
*/
log.error("Failed to add record from the Kafka topic" + topic + " the local store");
} finally {
offsetUpdateLock.unlock();
}
}
} catch (ConsumerTimeoutException cte) {
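The fix above correctly pairs lock() with unlock(), though the conventional java.util.concurrent.locks idiom acquires the lock just before the try block, so that a hypothetical failure in lock() cannot lead to unlocking a lock the thread never held:

offsetUpdateLock.lock();
try {
  offsetInSchemasTopic = messageAndMetadata.offset();
  offsetReachedThreshold.signalAll();
} finally {
  offsetUpdateLock.unlock();
}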