Skip to content

Commit

Permalink
Include kafka client id consistently in all kafka protocol encoders (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jfallows committed Dec 6, 2023
1 parent 73b2cd7 commit e2a9897
Show file tree
Hide file tree
Showing 215 changed files with 1,197 additions and 1,197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1270,7 +1270,7 @@ private void doEncodeDescribeRequest(
.apiKey(DESCRIBE_CONFIGS_API_KEY)
.apiVersion(DESCRIBE_CONFIGS_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -1307,7 +1307,7 @@ private void doEncodeDescribeRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2606,7 +2606,7 @@ private void doEncodeOffsetsRequest(
.apiKey(OFFSETS_API_KEY)
.apiVersion(OFFSETS_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -2645,7 +2645,7 @@ private void doEncodeOffsetsRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down Expand Up @@ -2673,7 +2673,7 @@ private void doEncodeFetchRequest(
.apiKey(FETCH_API_KEY)
.apiVersion(FETCH_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -2712,7 +2712,7 @@ private void doEncodeFetchRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ public final class KafkaClientGroupFactory extends KafkaClientSaslHandshaker imp
private final Long2ObjectHashMap<GroupMembership> instanceIds;
private final Object2ObjectHashMap<String, KafkaGroupStream> groupStreams;
private final Map<String, String> configs;
private final String clientId;
private final Duration rebalanceTimeout;
private final String groupMinSessionTimeoutDefault;
private final String groupMaxSessionTimeoutDefault;
Expand All @@ -291,7 +290,6 @@ public KafkaClientGroupFactory(
{
super(config, context);
this.rebalanceTimeout = config.clientGroupRebalanceTimeout();
this.clientId = config.clientId();
this.supplyInstanceId = config.clientInstanceIdSupplier();
this.kafkaTypeId = context.supplyTypeId(KafkaBinding.NAME);
this.proxyTypeId = context.supplyTypeId("proxy");
Expand Down Expand Up @@ -2123,7 +2121,7 @@ private void doEncodeFindCoordinatorRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down Expand Up @@ -2823,7 +2821,7 @@ private void doEncodeDescribeRequest(
.apiKey(DESCRIBE_CONFIGS_API_KEY)
.apiVersion(DESCRIBE_CONFIGS_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -2860,7 +2858,7 @@ private void doEncodeDescribeRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down Expand Up @@ -3627,7 +3625,7 @@ private void doEncodeJoinGroupRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down Expand Up @@ -3833,7 +3831,7 @@ private void doEncodeSyncGroupRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress.get());
Expand Down Expand Up @@ -3881,7 +3879,7 @@ private void doEncodeHeartbeatRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down Expand Up @@ -3934,7 +3932,7 @@ private void doEncodeLeaveGroupRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1481,7 +1481,7 @@ private void doEncodeMetaRequest(
.apiKey(METADATA_API_KEY)
.apiVersion(METADATA_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand All @@ -1501,7 +1501,7 @@ private void doEncodeMetaRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1319,7 +1319,7 @@ private void doEncodeOffsetCommitRequest(
.apiKey(OFFSET_COMMIT_API_KEY)
.apiVersion(OFFSET_COMMIT_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -1363,7 +1363,7 @@ private void doEncodeOffsetCommitRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1392,7 +1392,7 @@ private void doEncodeOffsetFetchRequest(
.apiKey(OFFSET_FETCH_API_KEY)
.apiVersion(OFFSET_FETCH_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -1430,7 +1430,7 @@ private void doEncodeOffsetFetchRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ public final class KafkaClientProduceFactory extends KafkaClientSaslHandshaker i
private static final byte RECORD_ATTRIBUTES_NONE = 0;

private static final String TRANSACTION_ID_NONE = null;
private static final String CLIENT_ID_NONE = null;

private static final int TIMESTAMP_NONE = 0;

Expand Down Expand Up @@ -1834,7 +1833,7 @@ private void doEncodeProduceRequest(
.apiKey(PRODUCE_API_KEY)
.apiVersion(PRODUCE_API_VERSION)
.correlationId(0)
.clientId(CLIENT_ID_NONE)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -1902,7 +1901,7 @@ private void doEncodeProduceRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.aklivity.zilla.runtime.binding.kafka.config.KafkaSaslConfig;
import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaConfiguration;
import io.aklivity.zilla.runtime.binding.kafka.internal.config.KafkaScramMechanism;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.RequestHeaderFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.ResponseHeaderFW;
import io.aklivity.zilla.runtime.binding.kafka.internal.types.codec.sasl.SaslAuthenticateRequestFW;
Expand Down Expand Up @@ -94,6 +95,7 @@ public abstract class KafkaClientSaslHandshaker
private Matcher serverResponseMatcher;
private byte[] result, ui, prev;

protected final String16FW clientId;
protected final LongUnaryOperator supplyInitialId;
protected final LongUnaryOperator supplyReplyId;
protected final MutableDirectBuffer writeBuffer;
Expand All @@ -102,6 +104,7 @@ public KafkaClientSaslHandshaker(
KafkaConfiguration config,
EngineContext context)
{
this.clientId = new String16FW(config.clientId());
this.supplyInitialId = context::supplyInitialId;
this.supplyReplyId = context::supplyReplyId;
this.writeBuffer = new UnsafeBuffer(new byte[context.writeBuffer().capacity()]);
Expand Down Expand Up @@ -157,7 +160,7 @@ protected final void doEncodeSaslHandshakeRequest(
.apiKey(SASL_HANDSHAKE_API_KEY)
.apiVersion(SASL_HANDSHAKE_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand All @@ -177,7 +180,7 @@ protected final void doEncodeSaslHandshakeRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down Expand Up @@ -212,7 +215,7 @@ private void doEncodeSaslPlainAuthenticateRequest(
.apiKey(SASL_AUTHENTICATE_API_KEY)
.apiVersion(SASL_AUTHENTICATE_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -242,7 +245,7 @@ private void doEncodeSaslPlainAuthenticateRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down Expand Up @@ -270,7 +273,7 @@ private void doEncodeSaslScramFirstAuthenticateRequest(
.apiKey(SASL_AUTHENTICATE_API_KEY)
.apiVersion(SASL_AUTHENTICATE_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand Down Expand Up @@ -304,7 +307,7 @@ private void doEncodeSaslScramFirstAuthenticateRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

if (KafkaConfiguration.DEBUG)
Expand Down Expand Up @@ -359,7 +362,7 @@ private void doEncodeSaslScramFinalAuthenticateRequest(
.apiKey(SASL_AUTHENTICATE_API_KEY)
.apiVersion(SASL_AUTHENTICATE_API_VERSION)
.correlationId(0)
.clientId((String) null)
.clientId(clientId)
.build();

encodeProgress = requestHeader.limit();
Expand All @@ -379,7 +382,7 @@ private void doEncodeSaslScramFinalAuthenticateRequest(
.apiKey(requestHeader.apiKey())
.apiVersion(requestHeader.apiVersion())
.correlationId(requestId)
.clientId(requestHeader.clientId().asString())
.clientId(requestHeader.clientId())
.build();

doNetworkData(traceId, budgetId, encodeBuffer, encodeOffset, encodeProgress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ connect "zilla://streams/net0"

connected

write 17 # size
write 22 # size
17s # sasl.handshake
1s # v1
${newRequestId}
-1s # no client id
5s "zilla" # client id
5s "PLAIN" # mechanism

read 17 # size
Expand All @@ -41,11 +41,11 @@ read 17 # size
1 # mechanisms
5s "PLAIN" # PLAIN

write 32 # size
write 37 # size
36s # sasl.authenticate
1s # v1
${newRequestId}
-1s # no client id
5s "zilla" # client id
18
[0x00] "username" # authentication bytes
[0x00] "password"
Expand All @@ -57,11 +57,11 @@ read 20 # size
-1s # authentication bytes
0L # session lifetime

write 41 # size
write 46 # size
32s # describe configs
0s # v0
${newRequestId}
-1s # no client id
5s "zilla" # client id
1 # resources
[0x02] # topic resource
4s "test" # "test" topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ accepted

connected

read 17 # size
read 22 # size
17s # sasl.handshake
1s # v1
(int:requestId)
-1s # no client id
5s "zilla" # client id
5s "PLAIN" # mechanism

write 17 # size
Expand All @@ -38,11 +38,11 @@ write 17 # size
1 # mechanisms
5s "PLAIN" # PLAIN

read 32 # size
read 37 # size
36s # sasl.authenticate
1s # v1
(int:requestId)
-1s # no client id
5s "zilla" # client id
18
[0x00] "username" # authentication bytes
[0x00] "password"
Expand All @@ -54,11 +54,11 @@ write 20 # size
-1s # authentication bytes
0L # session lifetime

read 41 # size
read 46 # size
32s # describe configs
0s # v0
(int:requestId)
-1s # no client id
5s "zilla" # client id
1 # resources
[0x02] # topic resource
4s "test" # "test" topic
Expand Down
Loading

0 comments on commit e2a9897

Please sign in to comment.