Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include kafka client id consistently in all kafka protocol encoders #621

Merged
merged 10 commits into from
Dec 6, 2023
Next Next commit
Integrate inline catalog and json validator with mqtt binding
  • Loading branch information
jfallows committed Sep 26, 2023
commit 4e322a515c169cb5115c456d50506dd074e93261
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ private <C> CatalogedConfigBuilder<C> injectJsonSchemas(
{
cataloged
.schema()
.schema(schema)
.subject(schema)
.build()
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ bindings:
type: json
catalog:
catalog0:
- schema: items
- subject: items
routes:
- exit: mqtt_client0
when:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ bindings:
type: json
catalog:
catalog0:
- schema: items
- schema: things
- subject: items
- subject: things
routes:
- exit: mqtt_client0
when:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private boolean validate(
else if (catalog != null)
{
schemaId = handler.resolve(catalog.subject, catalog.version);
if (schemaId > 0)
if (schemaId != 0)
{
schema = handler.resolve(schemaId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -945,13 +945,15 @@ private int decodePublish(
reasonCode = PAYLOAD_FORMAT_INVALID;
server.onDecodeError(traceId, authorization, reasonCode);
server.decoder = decodeIgnoreAll;
break decode;
}

if (mqttPublishHeaderRO.payloadFormat.equals(MqttPayloadFormat.TEXT) && invalidUtf8(payload))
{
reasonCode = PAYLOAD_FORMAT_INVALID;
server.onDecodeError(traceId, authorization, reasonCode);
server.decoder = decodeIgnoreAll;
break decode;
}

boolean canPublish = MqttState.initialOpened(publisher.state);
Expand Down
Loading