Support obtaining protobuf schemas from schema registry for grpc services #757

Merged: 92 commits, Feb 2, 2024

@akrambek (Contributor) commented Jan 23, 2024

Description

Support obtaining protobuf schemas from schema registry for grpc services

Fixes #697

Akram Yakubov and others added 30 commits October 25, 2023 13:35
…artial data frame while computing crc32c value
@@ -84,7 +86,7 @@ public ModelConfig adaptFromJson(
for (String catalogName: catalogsJson.keySet())
{
JsonArray schemasJson = catalogsJson.getJsonArray(catalogName);
- List<SchemaConfig> schemas = new LinkedList<>();
+ ObjectHashSet<SchemaConfig> schemas = new ObjectHashSet<>();
Contributor

Left side should just be Set<SchemaConfig> or Collection<SchemaConfig>.

Comment on lines 176 to 183
ObjectHashSet<CatalogedConfig>.ObjectIterator catalogIterator = catalogs.iterator();

while (catalogIterator.hasNext())
{
for (SchemaConfig catalogSchema : catalog.schemas)
CatalogedConfig catalog = catalogIterator.nextValue();
ObjectHashSet<SchemaConfig>.ObjectIterator schemaIterator = catalog.schemas.iterator();

while (schemaIterator.hasNext())
Contributor

If we're using ObjectHashSet with its reusable iterator, then the for ( ... : ...) syntax is fine; it will pick up the iterator implicitly.
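
To illustrate the point about the enhanced for loop: any `Iterable` works with `for ( ... : ...)`, which calls `iterator()` implicitly, so no explicit iterator variable is needed. The sketch below uses a hypothetical `ReusableBag` class (an assumption for illustration, not Agrona's `ObjectHashSet`) that hands out a single cached iterator instance.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical container for illustration only; it is not Agrona's ObjectHashSet.
final class ReusableBag implements Iterable<String>
{
    private final String[] items;
    private final BagIterator iterator = new BagIterator(); // single cached instance

    ReusableBag(String... items)
    {
        this.items = items.clone();
    }

    @Override
    public Iterator<String> iterator()
    {
        // Reset and return the same iterator object instead of allocating a new one.
        iterator.index = 0;
        return iterator;
    }

    private final class BagIterator implements Iterator<String>
    {
        private int index;

        @Override
        public boolean hasNext()
        {
            return index < items.length;
        }

        @Override
        public String next()
        {
            if (!hasNext())
            {
                throw new NoSuchElementException();
            }
            return items[index++];
        }
    }

    // The enhanced for loop calls iterator() implicitly.
    static String join(ReusableBag bag)
    {
        StringBuilder joined = new StringBuilder();
        for (String item : bag)
        {
            joined.append(item);
        }
        return joined.toString();
    }
}
```

In this sketch the cached iterator is shared, so nested loops over the same `ReusableBag` would interfere with each other.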

@@ -102,6 +104,6 @@ private GrpcProtobufConfig asProtobuf(
final String location = ((JsonString) value).getString();
final String protoService = readURL.apply(location);

- return ProtobufParser.protobufConfig(location, protoService);
+ return parser.parse(location, protoService);
Contributor

Suggested change
- return parser.parse(location, protoService);
+ return parser.parse(location, protobuf);

@@ -40,8 +42,8 @@ public class BindingConfig
public final String entry;
public final String vault;
public final OptionsConfig options;
public final ObjectHashSet<CatalogedConfig> catalogs;
Contributor

Try to avoid exposing implementation details in fields; let's use Set<CatalogedConfig> or Collection<CatalogedConfig> instead.
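
A minimal sketch of this suggestion, assuming `java.util` sets as stand-ins for `ObjectHashSet` and a hypothetical `ExampleBindingConfig` class (not the project's `BindingConfig`): the field is declared as `Set`, and the concrete type is chosen only where the set is built.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical class for illustration; it is not the project's BindingConfig.
final class ExampleBindingConfig
{
    // The field type is the Set interface, so readers never depend on the concrete class.
    public final Set<String> catalogs;

    ExampleBindingConfig(Set<String> catalogs)
    {
        this.catalogs = catalogs;
    }

    // Either factory can swap its implementation without touching the field or its readers.
    static ExampleBindingConfig hashed(String... names)
    {
        return new ExampleBindingConfig(new HashSet<>(Arrays.asList(names)));
    }

    static ExampleBindingConfig ordered(String... names)
    {
        return new ExampleBindingConfig(new LinkedHashSet<>(Arrays.asList(names)));
    }
}
```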

@@ -117,15 +119,27 @@ public BindingConfigBuilder<T> options(
return this;
}

- public CatalogRefConfigBuilder<BindingConfigBuilder<T>> catalog()
+ public BindingConfigBuilder<T> catalogs(
+     ObjectHashSet<CatalogedConfig> catalogs)
Contributor

Same here


public CatalogedConfig(
String name,
- List<SchemaConfig> schemas)
+ ObjectHashSet<SchemaConfig> schemas)
Contributor

Same here

public final class CatalogedConfigBuilder<T> extends ConfigBuilder<T, CatalogedConfigBuilder<T>>
{
private final Function<CatalogedConfig, T> mapper;

private String name;
- private List<SchemaConfig> schemas;
+ private ObjectHashSet<SchemaConfig> schemas;
Contributor

Also here

}


@Override
- public CatalogRefConfig adaptFromJson(
+ public ObjectHashSet<CatalogedConfig> adaptFromJson(
Contributor

Also here and twice in the method body too

{
for (CatalogedConfig cataloged : binding.catalogs)
{
Pattern pattern = Pattern.compile(cataloged.name);
Contributor

This use of Pattern doesn't seem to be able to fail, so not sure why we have it.

@@ -81,7 +81,7 @@ protected AvroModelHandler(
this.encoder = encoderFactory.binaryEncoder(EMPTY_OUTPUT_STREAM, null);
CatalogedConfig cataloged = config.cataloged.get(0);
this.handler = supplyCatalog.apply(cataloged.id);
- this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.get(0) : null;
+ this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.stream().findFirst().get() : null;
Contributor

We can revert this if reverting to List, right?

Contributor Author

Hmm, I have resolved this, but I'm not sure why it is not updated; locally I don't have this change.

Contributor Author

Figured it out: I was debugging an issue reported by barnabas and accidentally made changes in that branch.

@@ -60,7 +60,7 @@ public JsonModelHandler(
this.service = JsonValidationService.newInstance();
this.factory = schemaProvider.createParserFactory(null);
CatalogedConfig cataloged = config.cataloged.get(0);
- this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.get(0) : null;
+ this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.stream().findFirst().get() : null;
Contributor

We can revert this if reverting to List, right?

Comment on lines 83 to 93
List<SchemaConfig> schemas = new ArrayList<>(converter.cataloged.get(0).schemas);
assertThat(schemas.get(0).strategy, equalTo("topic"));
assertThat(schemas.get(0).version, equalTo("latest"));
assertThat(schemas.get(0).id, equalTo(0));
assertThat(schemas.get(1).subject, equalTo("cat"));
assertThat(schemas.get(1).strategy, nullValue());
assertThat(schemas.get(1).version, equalTo("latest"));
assertThat(schemas.get(1).id, equalTo(0));
assertThat(schemas.get(2).strategy, nullValue());
assertThat(schemas.get(2).version, nullValue());
assertThat(schemas.get(2).id, equalTo(42));
Contributor

We can revert this if reverting to List, right?

@@ -86,7 +84,7 @@ public ModelConfig adaptFromJson(
for (String catalogName: catalogsJson.keySet())
{
JsonArray schemasJson = catalogsJson.getJsonArray(catalogName);
- ObjectHashSet<SchemaConfig> schemas = new ObjectHashSet<>();
+ List<SchemaConfig> schemas = new LinkedList<>();
Contributor

Since we are using List, let's keep exposing it as a List in the config object instead of Collection.

@@ -76,7 +76,7 @@ protected ProtobufModelHandler(
{
CatalogedConfig cataloged = config.cataloged.get(0);
this.handler = supplyCatalog.apply(cataloged.id);
- this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.get(0) : null;
+ this.catalog = cataloged.schemas.size() != 0 ? cataloged.schemas.stream().findFirst().get() : null;
Contributor

We can revert this if reverting to List, right?

{
CatalogedConfig catalog = catalogIterator.nextValue();
ObjectHashSet<SchemaConfig>.ObjectIterator schemaIterator = catalog.schemas.iterator();
CatalogedConfig catalog = catalogs.get(0);
Contributor

Suggested change
- CatalogedConfig catalog = catalogs.get(0);
+ CatalogedConfig catalog = catalogs.get(i);

@@ -77,7 +76,7 @@ public final class GrpcBindingConfig
private final HttpGrpcHeaderHelper helper;
private final Long2ObjectHashMap<CatalogHandler> handlersById;
private final Int2ObjectHashMap<GrpcProtobufConfig> protobufsBySchemaId;
- private final ObjectHashSet<CatalogedConfig> catalogs;
+ private final List<CatalogedConfig> catalogs;
Contributor

I don't think we need both List<CatalogedConfig> catalogs and Long2ObjectHashMap<CatalogHandler> handlersById.

Seems like we need a data structure that has:

final CatalogHandler handler;
final String subject;
final String version;
int schemaId;
GrpcProtobufConfig protobuf;

and a list of these, flattened into one entry per catalog + schema combination, using the same catalog handler for all schemas in each catalog.

Then in resolveProtobufs, we just need to iterate this list, checking whether schemaId has changed compared to the most recent one, and updating it and the newly parsed protobuf if necessary.

Then resolveMethod can stream over this list instead, mapping to GrpcProtobufConfig, before doing the rest.
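
The flattening step described above can be sketched as follows. The record names (`Handler`, `Schema`, `Catalog`, `CatalogSchema`) and the `supplyCatalog` function shape are assumptions made for illustration; they only mirror the fields listed in the comment and are not the project's actual types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

final class FlattenSketch
{
    // Hypothetical stand-ins for CatalogHandler, SchemaConfig and CatalogedConfig.
    record Handler(long catalogId) {}
    record Schema(String subject, String version) {}
    record Catalog(long id, List<Schema> schemas) {}

    // One entry per catalog + schema combination.
    record CatalogSchema(Handler handler, String subject, String version) {}

    static List<CatalogSchema> flatten(List<Catalog> catalogs, LongFunction<Handler> supplyCatalog)
    {
        List<CatalogSchema> flattened = new ArrayList<>();
        for (Catalog catalog : catalogs)
        {
            // Resolve the handler once per catalog and share it across that catalog's schemas.
            Handler handler = supplyCatalog.apply(catalog.id());
            for (Schema schema : catalog.schemas())
            {
                flattened.add(new CatalogSchema(handler, schema.subject(), schema.version()));
            }
        }
        return flattened;
    }
}
```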

@@ -76,7 +74,7 @@ public TestModelConfig adaptFromJson(
for (String catalogName: catalogsJson.keySet())
{
JsonArray schemasJson = catalogsJson.getJsonArray(catalogName);
- ObjectHashSet<SchemaConfig> schemas = new ObjectHashSet<>();
+ List<SchemaConfig> schemas = new LinkedList<>();
Contributor

Let's use ArrayList as it is faster to walk via .get(int).
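
For context, `ArrayList.get(int)` is a constant-time array access, while `LinkedList.get(int)` walks the nodes from the nearer end on every call, so an indexed loop over a `LinkedList` is quadratic overall. A small sketch of the indexed walk, with a hypothetical `IndexedWalkSketch` class and made-up subject names:

```java
import java.util.ArrayList;
import java.util.List;

final class IndexedWalkSketch
{
    // Walks the list by index; each get(i) is O(1) for ArrayList but O(n) for LinkedList.
    static int totalLength(List<String> subjects)
    {
        int total = 0;
        for (int i = 0; i < subjects.size(); i++)
        {
            total += subjects.get(i).length();
        }
        return total;
    }

    // Builds the list as an ArrayList while exposing it through the List interface.
    static List<String> subjects()
    {
        List<String> subjects = new ArrayList<>();
        subjects.add("echo");
        subjects.add("chat");
        return subjects;
    }
}
```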

JsonObject catalogsJson)
{
- ObjectHashSet<CatalogedConfig> catalogs = new ObjectHashSet<>();
+ List<CatalogedConfig> catalogs = new LinkedList<>();
Contributor

Let's use ArrayList as it is faster to walk via .get(int).

for (String catalogName: catalogsJson.keySet())
{
JsonArray schemasJson = catalogsJson.getJsonArray(catalogName);
- ObjectHashSet<SchemaConfig> schemas = new ObjectHashSet<>();
+ List<SchemaConfig> schemas = new LinkedList<>();
Contributor

Let's use ArrayList as it is faster to walk via .get(int).

catalog.resolveProtobuf();
}

final Stream<GrpcProtobufConfig> objectStream = catalogs.stream().map(c -> c.protobuf);
Contributor Author

Not sure about this. My idea is to reuse resolveMethod.

Contributor

If we use catalogs in resolveMethod then it can be

        return catalogs.stream()
            .map(c -> c.protobuf)
            .map(p -> p.services.stream().filter(s -> s.service.equals(serviceName)).findFirst().orElse(null))
            .filter(Objects::nonNull)
            .map(s -> s.methods.stream().filter(m -> m.method.equals(methodName)).findFirst().orElse(null))
            .filter(Objects::nonNull)
            .findFirst()
            .orElse(null);
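
A self-contained version of this lookup might look like the sketch below. The record types (`Method`, `Service`, `Protobuf`, `CatalogSchema`) are hypothetical stand-ins for the project's protobuf config classes, and the extra null filter after the first `map` is an added assumption to tolerate entries whose protobuf has not been resolved yet.

```java
import java.util.List;
import java.util.Objects;

final class ResolveMethodSketch
{
    // Hypothetical stand-ins for the protobuf config types discussed in the review.
    record Method(String method) {}
    record Service(String service, List<Method> methods) {}
    record Protobuf(List<Service> services) {}
    record CatalogSchema(Protobuf protobuf) {}

    // Returns the first matching method across all catalog schemas, or null if none matches.
    static Method resolveMethod(List<CatalogSchema> catalogs, String serviceName, String methodName)
    {
        return catalogs.stream()
            .map(CatalogSchema::protobuf)
            .filter(Objects::nonNull) // assumption: skip schemas with no resolved protobuf
            .map(p -> p.services().stream().filter(s -> s.service().equals(serviceName)).findFirst().orElse(null))
            .filter(Objects::nonNull)
            .map(s -> s.methods().stream().filter(m -> m.method().equals(methodName)).findFirst().orElse(null))
            .filter(Objects::nonNull)
            .findFirst()
            .orElse(null);
    }
}
```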


public final List<GrpcRouteConfig> routes;
Contributor

Sorry to nitpick, can we move this after options please?

Comment on lines 89 to 99
this.catalogs = new ObjectHashSet<>();

for (CatalogedConfig catalog : binding.catalogs)
{
CatalogHandler handler = supplyCatalog.apply(catalog.id);
for (SchemaConfig schema : catalog.schemas)
{
catalogs.add(new GrpcCatalogSchema(handler, schema.subject, schema.version));
}
}
}
Contributor

Suggested change
- this.catalogs = new ObjectHashSet<>();
- for (CatalogedConfig catalog : binding.catalogs)
- {
-     CatalogHandler handler = supplyCatalog.apply(catalog.id);
-     for (SchemaConfig schema : catalog.schemas)
-     {
-         catalogs.add(new GrpcCatalogSchema(handler, schema.subject, schema.version));
-     }
- }
- }
+ Set<GrpcCatalogSchema> catalogs = new ObjectHashSet<>();
+ for (CatalogedConfig catalog : binding.catalogs)
+ {
+     CatalogHandler handler = supplyCatalog.apply(catalog.id);
+     for (SchemaConfig schema : catalog.schemas)
+     {
+         catalogs.add(new GrpcCatalogSchema(handler, schema.subject, schema.version));
+     }
+ }
+ this.catalogs = catalogs;
+ }

Comment on lines 130 to 133
for (GrpcCatalogSchema catalog : catalogs)
{
catalog.resolveProtobuf();
}
Contributor

Let's change resolveProtobuf() to return GrpcProtobufConfig, then in the resolveMethod(catalogs) method, use

  .map(GrpcCatalogSchema::resolveProtobuf)

instead of

  .map(c -> c.protobuf)

So no need for this loop then.
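
One way this could look, as a sketch under stated assumptions: a `String` stands in for the parsed `GrpcProtobufConfig`, the two lookup functions are hypothetical placeholders for the catalog handler calls, and `NO_SCHEMA_ID` is an assumed sentinel. The method re-parses only when the schema id changes and always returns the current value, so it can be used directly as a method reference in a stream.

```java
import java.util.function.IntFunction;
import java.util.function.ToIntFunction;

// Hypothetical sketch; a String stands in for the parsed GrpcProtobufConfig.
final class CatalogSchemaSketch
{
    private static final int NO_SCHEMA_ID = 0; // assumed "not yet resolved" sentinel

    private final String subject;
    private final ToIntFunction<String> resolveId; // assumed lookup: subject -> current schema id
    private final IntFunction<String> parse;       // assumed fetch + parse: schema id -> protobuf

    private int schemaId = NO_SCHEMA_ID;
    private String protobuf;
    int parseCount; // exposed only so the caching is observable

    CatalogSchemaSketch(String subject, ToIntFunction<String> resolveId, IntFunction<String> parse)
    {
        this.subject = subject;
        this.resolveId = resolveId;
        this.parse = parse;
    }

    String resolveProtobuf()
    {
        int newSchemaId = resolveId.applyAsInt(subject);
        if (newSchemaId != schemaId)
        {
            // The schema id changed: remember it and re-parse once.
            schemaId = newSchemaId;
            protobuf = parse.apply(newSchemaId);
            parseCount++;
        }
        return protobuf;
    }
}
```

With this shape, a caller can write `.map(CatalogSchemaSketch::resolveProtobuf)` in the stream instead of running a separate resolve loop first.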

Contributor Author

Yeah, that's what I was thinking. Thanks, good suggestion.

jfallows previously approved these changes Feb 2, 2024
@jfallows merged commit 0d511a1 into aklivity:develop Feb 2, 2024
5 checks passed