Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event logs for Model #874

Merged
merged 4 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Event logs for Model
  • Loading branch information
ankitk-me committed Mar 21, 2024
commit cfcb04dc94a3985cc2f6c27b27c248a1738d32e2
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ public int resolve(

@Override
public int decode(
long traceId,
long bindingId,
DirectBuffer data,
int index,
int length,
Expand All @@ -195,7 +197,7 @@ public int decode(

if (schemaId > NO_SCHEMA_ID)
{
valLength = decoder.accept(schemaId, data, index + progress, length - progress, next);
valLength = decoder.accept(traceId, bindingId, schemaId, data, index + progress, length - progress, next);
}
return valLength;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void shouldResolveSchemaIdAndProcessData()
0x30, 0x10, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x76, 0x65};
data.wrap(bytes, 0, bytes.length);

int valLength = catalog.decode(data, 0, data.capacity(), ValueConsumer.NOP, CatalogHandler.Decoder.IDENTITY);
int valLength = catalog.decode(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP, CatalogHandler.Decoder.IDENTITY);

assertEquals(data.capacity() - 9, valLength);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void shouldResolveSchemaIdAndProcessData()
byte[] bytes = payload.getBytes();
data.wrap(bytes, 0, bytes.length);

int valLength = catalog.decode(data, 0, data.capacity(), ValueConsumer.NOP, CatalogHandler.Decoder.IDENTITY);
int valLength = catalog.decode(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP, CatalogHandler.Decoder.IDENTITY);

assertEquals(data.capacity(), valLength);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ public int resolve(

@Override
public int decode(
long traceId,
long bindingId,
DirectBuffer data,
int index,
int length,
Expand All @@ -151,7 +153,7 @@ public int decode(

if (schemaId > NO_SCHEMA_ID)
{
valLength = decoder.accept(schemaId, data, index + progress, length - progress, next);
valLength = decoder.accept(traceId, bindingId, schemaId, data, index + progress, length - progress, next);
}
return valLength;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void shouldResolveSchemaIdAndProcessData()
0x30, 0x10, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x76, 0x65};
data.wrap(bytes, 0, bytes.length);

int valLength = catalog.decode(data, 0, data.capacity(), ValueConsumer.NOP, CatalogHandler.Decoder.IDENTITY);
int valLength = catalog.decode(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP, CatalogHandler.Decoder.IDENTITY);

assertEquals(data.capacity() - 5, valLength);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,19 @@ public int padding(

@Override
public int convert(
long traceId,
long bindingId,
DirectBuffer data,
int index,
int length,
ValueConsumer next)
{
return handler.decode(data, index, length, next, this::decodePayload);
return handler.decode(traceId, bindingId, data, index, length, next, this::decodePayload);
}

private int decodePayload(
long traceId,
long bindingId,
int schemaId,
DirectBuffer data,
int index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public int padding(

@Override
public int convert(
long traceId,
long bindingId,
DirectBuffer data,
int index,
int length,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void shouldVerifyValidAvroEvent()
byte[] bytes = {0x06, 0x69, 0x64,
0x30, 0x10, 0x70, 0x6f, 0x73, 0x69, 0x74, 0x69, 0x76, 0x65};
data.wrap(bytes, 0, bytes.length);
assertEquals(data.capacity(), converter.convert(data, 0, data.capacity(), ValueConsumer.NOP));
assertEquals(data.capacity(), converter.convert(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand All @@ -100,7 +100,7 @@ public void shouldWriteValidAvroEvent()
byte[] bytes = {0x06, 0x69, 0x64, 0x30, 0x10, 0x70, 0x6f,
0x73, 0x69, 0x74, 0x69, 0x76, 0x65};
data.wrap(bytes, 0, bytes.length);
assertEquals(data.capacity(), converter.convert(data, 0, data.capacity(), ValueConsumer.NOP));
assertEquals(data.capacity(), converter.convert(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand All @@ -118,7 +118,7 @@ public void shouldVerifyInvalidAvroEvent()

byte[] bytes = {0x06, 0x69, 0x64, 0x30, 0x10};
data.wrap(bytes, 0, bytes.length);
assertEquals(-1, converter.convert(data, 0, data.capacity(), ValueConsumer.NOP));
assertEquals(-1, converter.convert(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand Down Expand Up @@ -158,10 +158,10 @@ public void shouldReadAvroEventExpectJson()
DirectBuffer expected = new UnsafeBuffer();
expected.wrap(json.getBytes(), 0, json.getBytes().length);

int progress = converter.convert(data, 0, data.capacity(), ValueConsumer.NOP);
int progress = converter.convert(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP);
assertEquals(expected.capacity(), progress);

assertEquals(expected.capacity(), converter.convert(data, 0, data.capacity(), ValueConsumer.NOP));
assertEquals(expected.capacity(), converter.convert(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand Down Expand Up @@ -200,10 +200,10 @@ public void shouldWriteJsonEventExpectAvro()

DirectBuffer data = new UnsafeBuffer();
data.wrap(payload.getBytes(), 0, payload.getBytes().length);
int progress = converter.convert(data, 0, data.capacity(), ValueConsumer.NOP);
int progress = converter.convert(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP);
assertEquals(expected.capacity(), progress);

assertEquals(expected.capacity(), converter.convert(data, 0, data.capacity(), ValueConsumer.NOP));
assertEquals(expected.capacity(), converter.convert(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@ public Int32ConverterHandler(

@Override
public int convert(
long traceId,
long bindingId,
DirectBuffer data,
int index,
int length,
ValueConsumer next)
{
return handler.validate(FLAGS_COMPLETE, data, index, length, next) ? length : VALIDATION_FAILURE;
return handler.validate(traceId, bindingId, FLAGS_COMPLETE, data, index, length, next)
? length
: VALIDATION_FAILURE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public Int32ValidatorHandler(

@Override
public boolean validate(
long traceId,
long bindingId,
int flags,
DirectBuffer data,
int index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@ public Int64ConverterHandler(

@Override
public int convert(
long traceId,
long bindingId,
DirectBuffer data,
int index,
int length,
ValueConsumer next)
{
return handler.validate(FLAGS_COMPLETE, data, index, length, next) ? length : VALIDATION_FAILURE;
return handler.validate(traceId, bindingId, FLAGS_COMPLETE, data, index, length, next)
? length
: VALIDATION_FAILURE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public Int64ValidatorHandler(

@Override
public boolean validate(
long traceId,
long bindingId,
int flags,
DirectBuffer data,
int index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public StringConverterHandler(

@Override
public int convert(
long traceId,
long bindingId,
DirectBuffer data,
int index,
int length,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public StringValidatorHandler(

@Override
public boolean validate(
long traceId,
long bindingId,
int flags,
DirectBuffer data,
int index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void shouldVerifyValidInteger()

byte[] bytes = {0, 0, 0, 42};
data.wrap(bytes, 0, bytes.length);
assertEquals(data.capacity(), converter.convert(data, 0, data.capacity(), ValueConsumer.NOP));
assertEquals(data.capacity(), converter.convert(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand All @@ -47,6 +47,6 @@ public void shouldVerifyInvalidInteger()

byte[] bytes = "Not an Integer".getBytes();
data.wrap(bytes, 0, bytes.length);
assertEquals(-1, converter.convert(data, 0, data.capacity(), ValueConsumer.NOP));
assertEquals(-1, converter.convert(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void shouldVerifyValidIntegerCompleteMessage()

byte[] bytes = {0, 0, 0, 42};
data.wrap(bytes, 0, bytes.length);
assertTrue(handler.validate(data, 0, data.capacity(), ValueConsumer.NOP));
assertTrue(handler.validate(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand All @@ -54,7 +54,7 @@ public void shouldVerifyValidSignedIntegerCompleteMessage()

byte[] bytes = {-1, -1, -1, -25};
data.wrap(bytes, 0, bytes.length);
assertTrue(handler.validate(data, 0, data.capacity(), ValueConsumer.NOP));
assertTrue(handler.validate(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand All @@ -67,7 +67,7 @@ public void shouldVerifyValidAsciiIntegerCompleteMessage()
String payload = "+8449999";
byte[] bytes = payload.getBytes();
data.wrap(bytes, 0, bytes.length);
assertTrue(handler.validate(ValidatorHandler.FLAGS_COMPLETE, data, 0, data.capacity(), ValueConsumer.NOP));
assertTrue(handler.validate(0L, 0L, ValidatorHandler.FLAGS_COMPLETE, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand All @@ -82,7 +82,7 @@ public void shouldVerifyValidTextIntegerMaxLimit()
String payload = "8449999";
byte[] bytes = payload.getBytes();
data.wrap(bytes, 0, bytes.length);
assertFalse(handler.validate(ValidatorHandler.FLAGS_COMPLETE, data, 0, data.capacity(), ValueConsumer.NOP));
assertFalse(handler.validate(0L, 0L, ValidatorHandler.FLAGS_COMPLETE, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand All @@ -98,7 +98,7 @@ public void shouldVerifyValidTextIntegerExclusiveMaxLimit()
String payload = "999";
byte[] bytes = payload.getBytes();
data.wrap(bytes, 0, bytes.length);
assertFalse(handler.validate(ValidatorHandler.FLAGS_COMPLETE, data, 0, data.capacity(), ValueConsumer.NOP));
assertFalse(handler.validate(0L, 0L, ValidatorHandler.FLAGS_COMPLETE, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand All @@ -114,7 +114,7 @@ public void shouldVerifyValidTextIntegerExclusiveMinLimit()
String payload = "999";
byte[] bytes = payload.getBytes();
data.wrap(bytes, 0, bytes.length);
assertFalse(handler.validate(ValidatorHandler.FLAGS_COMPLETE, data, 0, data.capacity(), ValueConsumer.NOP));
assertFalse(handler.validate(0L, 0L, ValidatorHandler.FLAGS_COMPLETE, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand All @@ -127,7 +127,7 @@ public void shouldVerifyInvalidAsciiIntegerCompleteMessage()
String payload = "-.1a1";
byte[] bytes = payload.getBytes();
data.wrap(bytes, 0, bytes.length);
assertFalse(handler.validate(ValidatorHandler.FLAGS_COMPLETE, data, 0, data.capacity(), ValueConsumer.NOP));
assertFalse(handler.validate(0L, 0L, ValidatorHandler.FLAGS_COMPLETE, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand All @@ -140,7 +140,7 @@ public void shouldVerifyValidAsciiNegativeCompleteMessage()
String payload = "-125";
byte[] bytes = payload.getBytes();
data.wrap(bytes, 0, bytes.length);
assertTrue(handler.validate(ValidatorHandler.FLAGS_COMPLETE, data, 0, data.capacity(), ValueConsumer.NOP));
assertTrue(handler.validate(0L, 0L, ValidatorHandler.FLAGS_COMPLETE, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand All @@ -154,13 +154,13 @@ public void shouldVerifyValidAsciiFragmentedMessage()
byte[] bytes = payload.getBytes();

data.wrap(bytes, 0, 2);
assertTrue(handler.validate(ValidatorHandler.FLAGS_INIT, data, 0, data.capacity(), ValueConsumer.NOP));
assertTrue(handler.validate(0L, 0L, ValidatorHandler.FLAGS_INIT, data, 0, data.capacity(), ValueConsumer.NOP));

data.wrap(bytes, 2, 1);
assertTrue(handler.validate(0x00, data, 0, data.capacity(), ValueConsumer.NOP));
assertTrue(handler.validate(0L, 0L, 0x00, data, 0, data.capacity(), ValueConsumer.NOP));

data.wrap(bytes, 3, 1);
assertTrue(handler.validate(ValidatorHandler.FLAGS_FIN, data, 0, data.capacity(), ValueConsumer.NOP));
assertTrue(handler.validate(0L, 0L, ValidatorHandler.FLAGS_FIN, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand All @@ -174,10 +174,10 @@ public void shouldVerifyInvalidAsciiFragmentedMessage()
byte[] bytes = payload.getBytes();

data.wrap(bytes, 0, 2);
assertTrue(handler.validate(ValidatorHandler.FLAGS_INIT, data, 0, data.capacity(), ValueConsumer.NOP));
assertTrue(handler.validate(0L, 0L, ValidatorHandler.FLAGS_INIT, data, 0, data.capacity(), ValueConsumer.NOP));

data.wrap(bytes, 2, 1);
assertFalse(handler.validate(0x00, data, 0, data.capacity(), ValueConsumer.NOP));
assertFalse(handler.validate(0L, 0L, 0x00, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand All @@ -192,13 +192,13 @@ public void shouldVerifyValidIntegerFragmentedMessage()
byte[] bytes = {0, 0, 1, 42};

data.wrap(bytes, 0, 2);
assertTrue(handler.validate(ValidatorHandler.FLAGS_INIT, data, 0, data.capacity(), ValueConsumer.NOP));
assertTrue(handler.validate(0L, 0L, ValidatorHandler.FLAGS_INIT, data, 0, data.capacity(), ValueConsumer.NOP));

data.wrap(bytes, 2, 1);
assertTrue(handler.validate(0x00, data, 0, data.capacity(), ValueConsumer.NOP));
assertTrue(handler.validate(0L, 0L, 0x00, data, 0, data.capacity(), ValueConsumer.NOP));

data.wrap(bytes, 3, 1);
assertTrue(handler.validate(ValidatorHandler.FLAGS_FIN, data, 0, data.capacity(), ValueConsumer.NOP));
assertTrue(handler.validate(0L, 0L, ValidatorHandler.FLAGS_FIN, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand All @@ -212,7 +212,7 @@ public void shouldVerifyInvalidIntegerCompleteMessage()

byte[] bytes = "Test value".getBytes();
data.wrap(bytes, 0, bytes.length);
assertFalse(handler.validate(data, 0, data.capacity(), ValueConsumer.NOP));
assertFalse(handler.validate(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand All @@ -226,14 +226,14 @@ public void shouldVerifyInValidIntegerFragmentedMessage()

byte[] firstFragment = {0, 0, 0};
data.wrap(firstFragment, 0, firstFragment.length);
assertTrue(handler.validate(ValidatorHandler.FLAGS_INIT, data, 0, data.capacity(), ValueConsumer.NOP));
assertTrue(handler.validate(0L, 0L, ValidatorHandler.FLAGS_INIT, data, 0, data.capacity(), ValueConsumer.NOP));

byte[] secondFragment = {0, 0};
data.wrap(secondFragment, 0, secondFragment.length);
assertFalse(handler.validate(0x00, data, 0, data.capacity(), ValueConsumer.NOP));
assertFalse(handler.validate(0L, 0L, 0x00, data, 0, data.capacity(), ValueConsumer.NOP));

byte[] finalFragment = {42};
data.wrap(finalFragment, 0, finalFragment.length);
assertFalse(handler.validate(ValidatorHandler.FLAGS_FIN, data, 0, data.capacity(), ValueConsumer.NOP));
assertFalse(handler.validate(0L, 0L, ValidatorHandler.FLAGS_FIN, data, 0, data.capacity(), ValueConsumer.NOP));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void shouldVerifyValidInt64()

byte[] bytes = {0, 0, 0, 0, 0, 0, 0, 42};
data.wrap(bytes, 0, bytes.length);
assertEquals(data.capacity(), converter.convert(data, 0, data.capacity(), ValueConsumer.NOP));
assertEquals(data.capacity(), converter.convert(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));
}

@Test
Expand All @@ -47,6 +47,6 @@ public void shouldVerifyInvalidInt64()

byte[] bytes = "Not an Int64".getBytes();
data.wrap(bytes, 0, bytes.length);
assertEquals(-1, converter.convert(data, 0, data.capacity(), ValueConsumer.NOP));
assertEquals(-1, converter.convert(0L, 0L, data, 0, data.capacity(), ValueConsumer.NOP));
}
}
Loading
Loading