-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Moved key serialization to a JSON serializer. #33
Changes from all commits
1cdef3f
340cefe
dd90982
59ed2b1
e0de4aa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
/* | ||
* Copyright 2014 Confluent Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package io.confluent.kafka.schemaregistry.storage; | ||
|
||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
|
||
public class ConfigKey extends SchemaRegistryKey { | ||
|
||
private static final int MAGIC_BYTE = 0; | ||
private String subject; | ||
|
||
public ConfigKey(@JsonProperty("subject") String subject) { | ||
super(SchemaRegistryKeyType.CONFIG); | ||
this.subject = subject; | ||
this.magicByte = MAGIC_BYTE; | ||
} | ||
|
||
@JsonProperty("subject") | ||
public String getSubject() { | ||
return this.subject; | ||
} | ||
|
||
@JsonProperty("subject") | ||
public void setSubject(String subject) { | ||
this.subject = subject; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
|
||
ConfigKey that = (ConfigKey) o; | ||
|
||
if (!super.equals(o)) { | ||
return false; | ||
} | ||
if (this.subject != null && that.subject != null) { | ||
if (!subject.equals(that.subject)) { | ||
return false; | ||
} | ||
} else if (this.subject == null && that.subject == null) { | ||
return true; | ||
} else { | ||
return false; | ||
} | ||
return true; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
int result = super.hashCode(); | ||
if (this.subject != null) { | ||
result = 31 * result + subject.hashCode(); | ||
} | ||
return result; | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
StringBuilder sb = new StringBuilder(); | ||
sb.append("{magic=" + this.magicByte + ","); | ||
sb.append("keytype=" + this.keyType.keyType + ","); | ||
sb.append("subject=" + this.subject + "}"); | ||
return sb.toString(); | ||
} | ||
|
||
@Override | ||
public int compareTo(SchemaRegistryKey o) { | ||
int compare = super.compareTo(o); | ||
if (compare == 0) { | ||
ConfigKey otherKey = (ConfigKey) o; | ||
if (this.subject == null && otherKey.subject == null) { | ||
return 0; | ||
} else { | ||
if (this.subject == null) { | ||
return -1; | ||
} | ||
if (otherKey.subject == null) { | ||
return 1; | ||
} | ||
return this.subject.compareTo(otherKey.subject); | ||
} | ||
} else { | ||
return compare; | ||
} | ||
} | ||
} | ||
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,26 +37,28 @@ | |
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException; | ||
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException; | ||
import io.confluent.kafka.schemaregistry.storage.serialization.Serializer; | ||
import io.confluent.kafka.schemaregistry.storage.serialization.StringSerializer; | ||
import io.confluent.kafka.schemaregistry.storage.serialization.ZkStringSerializer; | ||
import io.confluent.kafka.schemaregistry.utils.Pair; | ||
import io.confluent.kafka.schemaregistry.zookeeper.SchemaRegistryIdentity; | ||
import io.confluent.kafka.schemaregistry.zookeeper.ZookeeperMasterElector; | ||
|
||
public class KafkaSchemaRegistry implements SchemaRegistry { | ||
|
||
public static final char SCHEMA_KEY_SEPARATOR = ','; | ||
public static final int MIN_VERSION = 0; | ||
public static final int MAX_VERSION = Integer.MAX_VALUE; | ||
private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class); | ||
private final KafkaStore<String, Schema> kafkaStore; | ||
private final KafkaStore<SchemaRegistryKey, Schema> kafkaStore; | ||
private final Serializer<SchemaRegistryKey> keySerializer; | ||
private final Serializer<Schema> serializer; | ||
private final SchemaRegistryIdentity myIdentity; | ||
private final Object masterLock = new Object(); | ||
private final AvroCompatibilityType defaultCompatibilityType; | ||
private final ZkClient zkClient; | ||
private SchemaRegistryIdentity masterIdentity; | ||
private ZookeeperMasterElector masterElector = null; | ||
private final AvroCompatibilityType defaultCompatibilityType; | ||
|
||
public KafkaSchemaRegistry(SchemaRegistryConfig config, Serializer<Schema> serializer) | ||
public KafkaSchemaRegistry(SchemaRegistryConfig config, | ||
Serializer<SchemaRegistryKey> keySerializer, | ||
Serializer<Schema> serializer) | ||
throws SchemaRegistryException { | ||
String host = config.getString(SchemaRegistryConfig.ADVERTISED_HOST_CONFIG); | ||
int port = config.getInt(SchemaRegistryConfig.PORT_CONFIG); | ||
|
@@ -68,11 +70,12 @@ public KafkaSchemaRegistry(SchemaRegistryConfig config, Serializer<Schema> seria | |
config.getInt(SchemaRegistryConfig.KAFKASTORE_ZK_SESSION_TIMEOUT_MS_CONFIG); | ||
this.zkClient = new ZkClient(kafkaClusterZkUrl, zkSessionTimeoutMs, zkSessionTimeoutMs, | ||
new ZkStringSerializer()); | ||
|
||
this.keySerializer = keySerializer; | ||
this.serializer = serializer; | ||
kafkaStore = new KafkaStore<String, Schema>(config, new KafkaStoreMessageHandler(), | ||
StringSerializer.INSTANCE, this.serializer, | ||
new InMemoryStore<String, Schema>(), zkClient); | ||
kafkaStore = new KafkaStore<SchemaRegistryKey, Schema>(config, new KafkaStoreMessageHandler(), | ||
this.keySerializer, this.serializer, | ||
new InMemoryStore<SchemaRegistryKey, Schema>(), | ||
zkClient); | ||
this.defaultCompatibilityType = config.compatibilityType(); | ||
} | ||
|
||
|
@@ -132,7 +135,7 @@ public int register(String subject, Schema schema, RegisterSchemaForwardingAgent | |
|
||
Iterator<Schema> allVersions = getAllVersions(subject); | ||
Schema latestSchema = null; | ||
int latestUsedSchemaVersion = 0; | ||
int latestUsedSchemaVersion = MIN_VERSION; | ||
// see if the schema to be registered already exists | ||
while (allVersions.hasNext()) { | ||
Schema s = allVersions.next(); | ||
|
@@ -147,8 +150,7 @@ public int register(String subject, Schema schema, RegisterSchemaForwardingAgent | |
|
||
if (latestSchema == null || isCompatible(avroSchemaObj, latestSchema)) { | ||
int newVersion = latestUsedSchemaVersion + 1; | ||
String keyForNewVersion = | ||
String.format("%s%c%d", subject, SCHEMA_KEY_SEPARATOR, newVersion); | ||
SchemaKey keyForNewVersion = new SchemaKey(subject, newVersion); | ||
schema.setVersion(newVersion); | ||
kafkaStore.put(keyForNewVersion, schema); | ||
return newVersion; | ||
|
@@ -183,7 +185,7 @@ private org.apache.avro.Schema canonicalizeSchema(Schema schema) { | |
|
||
@Override | ||
public Schema get(String subject, int version) throws SchemaRegistryException { | ||
String key = subject + SCHEMA_KEY_SEPARATOR + version; | ||
SchemaKey key = new SchemaKey(subject, version); | ||
try { | ||
Schema schema = kafkaStore.get(key); | ||
return schema; | ||
|
@@ -197,31 +199,32 @@ public Schema get(String subject, int version) throws SchemaRegistryException { | |
@Override | ||
public Set<String> listSubjects() throws SchemaRegistryException { | ||
try { | ||
Iterator<String> allKeys = kafkaStore.getAllKeys(); | ||
Iterator<SchemaRegistryKey> allKeys = kafkaStore.getAllKeys(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's probably better to ignore keys that are not an instance of SchemaKey. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done inside extractUniqueSubjects |
||
return extractUniqueSubjects(allKeys); | ||
} catch (StoreException e) { | ||
throw new SchemaRegistryException( | ||
"Error from the backend Kafka store", e); | ||
} | ||
} | ||
|
||
private Set<String> extractUniqueSubjects(Iterator<String> allKeys) { | ||
private Set<String> extractUniqueSubjects(Iterator<SchemaRegistryKey> allKeys) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This method should probably take Iterator&lt;SchemaKey&gt;. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It already does take in an iterator. Did you mean something else? |
||
Set<String> subjects = new HashSet<String>(); | ||
while (allKeys.hasNext()) { | ||
String key = allKeys.next(); | ||
subjects.add(key.split(String.valueOf(SCHEMA_KEY_SEPARATOR))[0]); | ||
SchemaRegistryKey k = allKeys.next(); | ||
if (k instanceof SchemaKey) { | ||
SchemaKey key = (SchemaKey) k; | ||
subjects.add(key.getSubject()); | ||
} | ||
} | ||
return subjects; | ||
} | ||
|
||
@Override | ||
public Iterator<Schema> getAllVersions(String subject) throws SchemaRegistryException { | ||
try { | ||
Iterator<Schema> allVersions = kafkaStore.getAll(String.format("%s%c", subject, | ||
SCHEMA_KEY_SEPARATOR), | ||
String.format("%s%c%c", subject, | ||
SCHEMA_KEY_SEPARATOR, | ||
'9' + 1)); | ||
SchemaKey key1 = new SchemaKey(subject, MIN_VERSION); | ||
SchemaKey key2 = new SchemaKey(subject, MAX_VERSION); | ||
Iterator<Schema> allVersions = kafkaStore.getAll(key1, key2); | ||
return sortSchemasByVersion(allVersions); | ||
} catch (StoreException e) { | ||
throw new SchemaRegistryException( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to be able to compare all combinations of different types of SchemaRegistryKey. Instead of duplicating the code, perhaps we can write all comparators in a dispatcher in SchemaRegistryKey once and just call the dispatcher in the compareTo() implementation of each type.
Perhaps we need to do the same for equals().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That seems counterintuitive. Both classes extend from SchemaRegistryKey. If 2 keys of different subclasses are being compared, they will be sorted based on keyType anyway. There is no duplication. Each subclass only compares according to its own class members.
Same for equals