Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support obtaining protobuf schemas from schema registry for grpc services #757

Merged
merged 92 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
92 commits
Select commit Hold shift + click to select a range
2cef977
Adjust padding to accommodate good enough headers and don't include …
Oct 25, 2023
d201582
Merge branch 'develop' into feature/consumer-group-cont
Oct 25, 2023
76bf9de
Merge branch 'feature/consumer-group-cont' into develop
Oct 26, 2023
29ae79c
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
ec1b39e
Merge branch 'aklivity:develop' into develop
akrambek Oct 30, 2023
51a9f0e
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
4394783
Merge branch 'aklivity:develop' into develop
akrambek Oct 31, 2023
e8696ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
51c37b1
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
5da5f04
Merge branch 'aklivity:develop' into develop
akrambek Nov 2, 2023
db1e17c
Merge branch 'aklivity:develop' into develop
akrambek Nov 4, 2023
40f73dc
Merge branch 'aklivity:develop' into develop
akrambek Nov 6, 2023
d1a0492
Merge branch 'aklivity:develop' into develop
akrambek Nov 23, 2023
45799ce
Merge branch 'aklivity:develop' into develop
akrambek Nov 29, 2023
1e55162
Merge branch 'aklivity:develop' into develop
akrambek Nov 30, 2023
fedc41f
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
18a8d74
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
f160aad
Merge branch 'aklivity:develop' into develop
akrambek Dec 4, 2023
e0e7d5a
Merge branch 'aklivity:develop' into develop
akrambek Dec 6, 2023
9f4a8a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
456f111
Merge branch 'aklivity:develop' into develop
akrambek Dec 8, 2023
0d27262
Merge branch 'aklivity:develop' into develop
akrambek Dec 9, 2023
9fe7a91
Merge branch 'aklivity:develop' into develop
akrambek Dec 11, 2023
7e3d237
Merge branch 'aklivity:develop' into develop
akrambek Dec 12, 2023
33c4411
Merge branch 'aklivity:develop' into develop
akrambek Dec 13, 2023
fe9e318
Merge branch 'aklivity:develop' into develop
akrambek Dec 14, 2023
d8b5e5c
Merge branch 'aklivity:develop' into develop
akrambek Dec 14, 2023
ebca7ef
Merge branch 'aklivity:develop' into develop
akrambek Dec 18, 2023
5e3e059
Merge branch 'aklivity:develop' into develop
akrambek Dec 22, 2023
ee71db9
Merge branch 'aklivity:develop' into develop
akrambek Dec 24, 2023
0b7a15a
Merge branch 'aklivity:develop' into develop
akrambek Dec 25, 2023
be13489
Merge branch 'aklivity:develop' into develop
akrambek Dec 26, 2023
95df84c
Merge branch 'aklivity:develop' into develop
akrambek Dec 26, 2023
3ebdbf5
Merge branch 'aklivity:develop' into develop
akrambek Dec 28, 2023
24ad9e1
Merge branch 'aklivity:develop' into develop
akrambek Dec 30, 2023
6d21fec
Merge branch 'aklivity:develop' into develop
akrambek Dec 31, 2023
368a0a6
Merge branch 'aklivity:develop' into develop
akrambek Dec 31, 2023
7069f1a
Merge branch 'aklivity:develop' into develop
akrambek Jan 2, 2024
09b7041
Merge branch 'aklivity:develop' into develop
akrambek Jan 3, 2024
98f1faa
Merge branch 'aklivity:develop' into develop
akrambek Jan 4, 2024
371391a
Merge branch 'aklivity:develop' into develop
akrambek Jan 5, 2024
c6a0882
Merge branch 'aklivity:develop' into develop
akrambek Jan 8, 2024
f99f009
Merge branch 'aklivity:develop' into develop
akrambek Jan 9, 2024
a110b68
Merge branch 'aklivity:develop' into develop
akrambek Jan 11, 2024
80c4625
Merge branch 'aklivity:develop' into develop
akrambek Jan 16, 2024
6617e20
Merge branch 'aklivity:develop' into develop
akrambek Jan 19, 2024
dea9f53
Merge branch 'aklivity:develop' into develop
akrambek Jan 20, 2024
b74db57
Merge branch 'aklivity:develop' into develop
akrambek Jan 23, 2024
7af00ab
WIP
Jan 23, 2024
59113ca
WIP
Jan 24, 2024
8e0f2c5
WIP
Jan 24, 2024
a6309d6
WIP
Jan 25, 2024
2ca7d2b
WIP
Jan 25, 2024
1778cdc
WIP
Jan 26, 2024
40f4916
WIP
Jan 29, 2024
eaf19f8
WIP
Jan 30, 2024
4617b54
Merge branch 'aklivity:develop' into develop
akrambek Jan 30, 2024
448f4a5
Support of catalog in grpc
Jan 31, 2024
4d2dd7b
WIP
Jan 31, 2024
a96c0f7
Added test
Jan 31, 2024
b3b421d
Merge branch 'aklivity:develop' into develop
akrambek Jan 31, 2024
9808063
Fix NPE
Jan 31, 2024
0981d06
Fix checkstyle
Jan 31, 2024
aad450c
Apply feedback from PR
Jan 31, 2024
84c2d43
Apply feedback
Feb 1, 2024
ad978c5
WIP
Feb 1, 2024
d6ae81b
Refactor binding binding catalog config
Feb 1, 2024
73d64b1
Merge branch 'aklivity:develop' into develop
akrambek Feb 1, 2024
81ea8f3
Merge branch 'develop' into story/697
Feb 1, 2024
8aeb377
Fix catch up merge
Feb 1, 2024
d9df80c
Revert "Fix catch up merge"
Feb 1, 2024
26e9f6c
Fix compilation error
Feb 1, 2024
5da65fe
Fix remaining issues after catch up merge
Feb 1, 2024
b5da3ca
Apply feedback from PR
Feb 1, 2024
7038697
Fix potential NPE
Feb 1, 2024
f9ddcc6
Add line separator
Feb 1, 2024
d95b3ce
Apply feedback from PR
Feb 1, 2024
08dcb73
Apply feedback from PR
Feb 1, 2024
6a39338
Fix checkstyle
Feb 1, 2024
7bcb511
Fix checkstyle
Feb 1, 2024
cdad780
Fix checkstyle
Feb 1, 2024
5860171
Apply feedback from PR
Feb 2, 2024
a71577e
Apply remaining changes
Feb 2, 2024
64022a5
Apply feedback from PR
Feb 2, 2024
1edd300
Fix typo
Feb 2, 2024
6fbe66c
Apply feedback from PR
Feb 2, 2024
9f19eb2
Revert back the change
Feb 2, 2024
c26d704
Revert back the change
Feb 2, 2024
70f82f0
Revert back the change
Feb 2, 2024
7cfb3fc
Remove extra line
Feb 2, 2024
9a4aaa7
Remove extra line
Feb 2, 2024
f825538
Ignore test due to sporadic github action failure
Feb 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Adjust padding to accommodate good enough headers and don't include p…
…artial data frame while computing crc32c value
  • Loading branch information
Akram Yakubov committed Oct 25, 2023
commit 2cef977324c9b78508de72b0eb49c6eff88346c0
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public final class KafkaClientProduceFactory extends KafkaClientSaslHandshaker i
{
private static final int PRODUCE_REQUEST_RECORDS_OFFSET_MAX = 512;

private static final int KAFKA_RECORD_FRAMING = 100; // TODO
private static final int KAFKA_RECORD_FRAMING = 512; // TODO

private static final int FLAGS_CON = 0x00;
private static final int FLAGS_FIN = 0x01;
Expand Down Expand Up @@ -539,6 +539,7 @@ private int flushRecordInit(
final int valueSize = payload != null ? payload.sizeof() : 0;
client.valueCompleteSize = valueSize + client.encodeableRecordBytesDeferred;


final int maxEncodeableBytes = client.encodeSlotLimit + client.valueCompleteSize + KAFKA_RECORD_FRAMING;
if (client.encodeSlot != NO_SLOT &&
maxEncodeableBytes > encodePool.slotCapacity())
Expand Down Expand Up @@ -1191,6 +1192,7 @@ private final class KafkaProduceClient extends KafkaSaslClient
private int encodeableRecordCount;
private int encodeableRecordBytes;
private int encodeableRecordBytesDeferred;
private int encodeableRecordValueBytes;
private int flushableRequestBytes;

private int decodeSlot = NO_SLOT;
Expand Down Expand Up @@ -1652,6 +1654,7 @@ private void doEncodeRecordInit(

encodeSlotBuffer.putBytes(encodeSlotLimit, encodeBuffer, 0, encodeProgress);
encodeSlotLimit += encodeProgress;
encodeableRecordValueBytes = 0;

if (headersCount > 0)
{
Expand Down Expand Up @@ -1689,6 +1692,7 @@ private void doEncodeRecordCont(

encodeSlotBuffer.putBytes(encodeSlotLimit, value.buffer(), value.offset(), length);
encodeSlotLimit += length;
encodeableRecordValueBytes += length;

if ((flags & FLAGS_FIN) == 0)
{
Expand Down Expand Up @@ -1893,7 +1897,8 @@ private void doEncodeProduceRequest(

final ByteBuffer encodeSlotByteBuffer = encodePool.byteBuffer(encodeSlot);
final int encodeSlotBytePosition = encodeSlotByteBuffer.position();
encodeSlotByteBuffer.limit(encodeSlotBytePosition + encodeSlotLimit);
final int partialValueSize = flushFlags != FLAGS_FIN ? encodeableRecordValueBytes : 0;
encodeSlotByteBuffer.limit(encodeSlotBytePosition + encodeSlotLimit - partialValueSize);
encodeSlotByteBuffer.position(encodeSlotBytePosition + encodeSlotOffset + crcLimit);

final CRC32C crc = crc32c;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class ClientMergedIT
.countersBufferCapacity(8192)
.configure(ENGINE_BUFFER_SLOT_CAPACITY, 8192)
.configure(KAFKA_CLIENT_META_MAX_AGE_MILLIS, 1000)
.configure(KAFKA_CLIENT_PRODUCE_MAX_BYTES, 116)
.configure(KAFKA_CLIENT_PRODUCE_MAX_BYTES, 528)
.configurationRoot("io/aklivity/zilla/specs/binding/kafka/config")
.external("net0")
.clean();
Expand Down Expand Up @@ -234,7 +234,7 @@ public void shouldProduceMergedMessageValues() throws Exception
@Configure(
name = "zilla.binding.kafka.client.produce.max.bytes",
value = "200000")
@ScriptProperty("padding ${512 + 100}")
@ScriptProperty("padding ${512 + 512}")
public void shouldProduceMergedMessageValue10k() throws Exception
{
k3po.finish();
Expand All @@ -248,7 +248,7 @@ public void shouldProduceMergedMessageValue10k() throws Exception
@Configure(
name = "zilla.binding.kafka.client.produce.max.bytes",
value = "200000")
@ScriptProperty("padding ${512 + 100}")
@ScriptProperty("padding ${512 + 512}")
public void shouldProduceMergedMessageValue100k() throws Exception
{
k3po.finish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ read zilla:begin.ext ${kafka:beginEx()
write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.produce()
.deferred(102400 - 8192 + 512 + 100)
.deferred(102400 - 8192 + 512 + 512)
.timestamp(newTimestamp)
.build()
.build()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ property serverAddress "zilla://streams/app0"

accept ${serverAddress}
option zilla:window 8192
option zilla:padding 612
option zilla:padding 1024
option zilla:transmission "half-duplex"

accepted
Expand Down Expand Up @@ -71,7 +71,7 @@ write zilla:begin.ext ${kafka:beginEx()
read zilla:data.ext ${kafka:matchDataEx()
.typeId(zilla:id("kafka"))
.produce()
.deferred(102400 - 8192 + 512 + 100)
.deferred(102400 - 8192 + 512 + 512)
.build()
.build()}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ read zilla:begin.ext ${kafka:beginEx()
write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.produce()
.deferred(10240 - 8192 + 512 + 100)
.deferred(10240 - 8192 + 512 + 512)
.timestamp(newTimestamp)
.build()
.build()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ property serverAddress "zilla://streams/app0"

accept ${serverAddress}
option zilla:window 8192
option zilla:padding 612
option zilla:padding 1024
option zilla:transmission "half-duplex"

accepted
Expand Down Expand Up @@ -71,7 +71,7 @@ write zilla:begin.ext ${kafka:beginEx()
read zilla:data.ext ${kafka:matchDataEx()
.typeId(zilla:id("kafka"))
.produce()
.deferred(10240 - 8192 + 512 + 100)
.deferred(10240 - 8192 + 512 + 512)
.build()
.build()}
read zilla:data.ext ${kafka:matchDataEx()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ write zilla:data.ext ${kafka:dataEx()
.produce()
.build()
.build()}
write ${kafka:randomBytes(7580)}
write ${kafka:randomBytes(8192-(512+512))}
write flush

write zilla:data.ext ${kafka:dataEx()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ read zilla:data.ext ${kafka:matchDataEx()
.produce()
.build()
.build()}
read [0..7580]
read [0..7168]

read zilla:data.ext ${kafka:matchDataEx()
.typeId(zilla:id("kafka"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ write zilla:begin.ext ${proxy:beginEx()

connected

write 7690
write 7278
0s
3s
${newRequestId}
Expand All @@ -90,9 +90,9 @@ write 7690
4s "test"
1
0
7650 # record set size
7238 # record set size
0L # first offset
7638 # length
7226 # length
-1
[0x02]
0x4e8723aa
Expand All @@ -104,13 +104,13 @@ write 7690
-1s
-1
1 # records
${kafka:varint(7587)}
${kafka:varint(7175)}
[0x00]
${kafka:varint(0)}
${kafka:varint(0)}
${kafka:varint(-1)} # key
${kafka:varint(7580)} # value
${kafka:randomBytes(7580)}
${kafka:varint(7168)} # value
${kafka:randomBytes(7168)}
${kafka:varint(0)} # headers

read 44
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ read zilla:begin.ext ${proxy:beginEx()

connected

read 7690
read 7278
0s
3s
(int:requestId)
Expand All @@ -86,9 +86,9 @@ read 7690
4s "test"
1
0
7650 # record set size
7238 # record set size
0L # first offset
7638 # length
7226 # length
-1
[0x02]
[0..4]
Expand All @@ -100,13 +100,13 @@ read 7690
-1s
-1
1 # records
${kafka:varint(7587)}
${kafka:varint(7175)}
[0x00]
${kafka:varint(0)}
${kafka:varint(0)}
${kafka:varint(-1)} # key
${kafka:varint(7580)} # value
[0..7580]
${kafka:varint(7168)} # value
[0..7168]
${kafka:varint(0)} # headers

write 44
Expand Down