Skip to content

Commit

Permalink
[IND-394] add version to Indexer events (#289)
Browse files Browse the repository at this point in the history
  • Loading branch information
dydxwill committed Sep 18, 2023
1 parent 9c5fcd6 commit a3708c9
Show file tree
Hide file tree
Showing 45 changed files with 360 additions and 80 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ v4-proto-js/node_modules
v4-proto-js/src

.idea
**/.DS_Store
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ export interface IndexerTendermintEvent {
*/

eventIndex: number;
/** Version of the event. */

version: number;
}
/**
* IndexerTendermintEvent contains the base64 encoded event proto emitted from
Expand All @@ -158,6 +161,9 @@ export interface IndexerTendermintEventSDKType {
*/

event_index: number;
/** Version of the event. */

version: number;
}
/**
* IndexerTendermintBlock contains all the events for the block along with
Expand Down Expand Up @@ -292,7 +298,8 @@ function createBaseIndexerTendermintEvent(): IndexerTendermintEvent {
data: "",
transactionIndex: undefined,
blockEvent: undefined,
eventIndex: 0
eventIndex: 0,
version: 0
};
}

Expand All @@ -318,6 +325,10 @@ export const IndexerTendermintEvent = {
writer.uint32(40).uint32(message.eventIndex);
}

if (message.version !== 0) {
writer.uint32(48).uint32(message.version);
}

return writer;
},

Expand Down Expand Up @@ -350,6 +361,10 @@ export const IndexerTendermintEvent = {
message.eventIndex = reader.uint32();
break;

case 6:
message.version = reader.uint32();
break;

default:
reader.skipType(tag & 7);
break;
Expand All @@ -366,6 +381,7 @@ export const IndexerTendermintEvent = {
message.transactionIndex = object.transactionIndex ?? undefined;
message.blockEvent = object.blockEvent ?? undefined;
message.eventIndex = object.eventIndex ?? 0;
message.version = object.version ?? 0;
return message;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@ const defaultPerpetualMarketTicker: string = testConstants.defaultPerpetualMarke
* @param data
* @param transactionIndex
* @param eventIndex
* @param version
* @returns
*/
export function createIndexerTendermintEvent(
subtype: string,
data: string,
transactionIndex: number,
eventIndex: number,
version: number = 1,
): IndexerTendermintEvent {
if (transactionIndex < 0) {
// blockEvent
Expand All @@ -89,6 +91,7 @@ export function createIndexerTendermintEvent(
data,
blockEvent: IndexerTendermintEvent_BlockEvent.BLOCK_EVENT_END_BLOCK,
eventIndex,
version,
};
}
// transactionIndex
Expand All @@ -97,6 +100,7 @@ export function createIndexerTendermintEvent(
data,
transactionIndex,
eventIndex,
version,
};
}

Expand Down
1 change: 1 addition & 0 deletions indexer/services/ender/__tests__/lib/helper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ describe('helper', () => {
subtype: 'order_fill',
data: 'data',
eventIndex: 0,
version: 1,
};
if (throwError) {
expect(() => indexerTendermintEventToTransactionIndex(event))
Expand Down
45 changes: 45 additions & 0 deletions indexer/services/ender/__tests__/lib/on-message.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,51 @@ describe('on-message', () => {
expect.any(Number), 1, { success: 'true' });
});

it('successfully processes block with transaction event with unset version', async () => {
const transactionIndex: number = 0;
const eventIndex: number = 0;
const events: IndexerTendermintEvent[] = [
createIndexerTendermintEvent(
DydxIndexerSubtypes.SUBACCOUNT_UPDATE,
defaultSubaccountUpdateEventData,
transactionIndex,
eventIndex,
0,
),
];

const block: IndexerTendermintBlock = createIndexerTendermintBlock(
defaultHeight,
defaultTime,
events,
[defaultTxHash],
);
const binaryBlock: Uint8Array = Uint8Array.from(IndexerTendermintBlock.encode(block).finish());
const kafkaMessage: KafkaMessage = createKafkaMessage(Buffer.from(binaryBlock));

await onMessage(kafkaMessage);
await Promise.all([
expectTendermintEvent(defaultHeight.toString(), transactionIndex, eventIndex),
expectTransactionWithHash([defaultTxHash]),
expectBlock(defaultHeight.toString(), defaultDateTime.toISO()),
]);

expect((SubaccountUpdateHandler as jest.Mock)).toHaveBeenCalledTimes(1);
expect((SubaccountUpdateHandler as jest.Mock)).toHaveBeenNthCalledWith(
1,
block,
events[0],
expect.any(Number),
defaultSubaccountUpdateEvent,
);
expect(stats.increment).toHaveBeenCalledWith('ender.received_kafka_message', 1);
expect(stats.timing).toHaveBeenCalledWith(
'ender.message_time_in_queue', expect.any(Number), 1, { topic: KafkaTopics.TO_ENDER });
expect(stats.gauge).toHaveBeenCalledWith('ender.processing_block_height', expect.any(Number));
expect(stats.timing).toHaveBeenCalledWith('ender.processed_block.timing',
expect.any(Number), 1, { success: 'true' });
});

it('successfully processes block with transfer event', async () => {
const transactionIndex: number = 0;
const eventIndex: number = 0;
Expand Down
61 changes: 36 additions & 25 deletions indexer/services/ender/src/lib/block-processor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable max-len */
import { logger } from '@dydxprotocol-indexer/base';
import { IndexerTendermintBlock, IndexerTendermintEvent } from '@dydxprotocol-indexer/v4-protos';
import _ from 'lodash';
Expand All @@ -20,26 +21,33 @@ import { indexerTendermintEventToEventProtoWithType, indexerTendermintEventToTra
import { KafkaPublisher } from './kafka-publisher';
import { SyncHandlers, SYNCHRONOUS_SUBTYPES } from './sync-handlers';
import {
DydxIndexerSubtypes, EventMessage, EventProtoWithType, GroupedEvents,
DydxIndexerSubtypes, EventMessage, EventProtoWithTypeAndVersion, GroupedEvents,
} from './types';

const TXN_EVENT_SUBTYPE_TO_VALIDATOR_MAPPING: Record<string, ValidatorInitializer> = {
[DydxIndexerSubtypes.ORDER_FILL.toString()]: OrderFillValidator,
[DydxIndexerSubtypes.SUBACCOUNT_UPDATE.toString()]: SubaccountUpdateValidator,
[DydxIndexerSubtypes.TRANSFER.toString()]: TransferValidator,
[DydxIndexerSubtypes.MARKET.toString()]: MarketValidator,
[DydxIndexerSubtypes.STATEFUL_ORDER.toString()]: StatefulOrderValidator,
[DydxIndexerSubtypes.ASSET.toString()]: AssetValidator,
[DydxIndexerSubtypes.PERPETUAL_MARKET.toString()]: PerpetualMarketValidator,
[DydxIndexerSubtypes.LIQUIDITY_TIER.toString()]: LiquidityTierValidator,
[DydxIndexerSubtypes.UPDATE_PERPETUAL.toString()]: UpdatePerpetualValidator,
[DydxIndexerSubtypes.UPDATE_CLOB_PAIR.toString()]: UpdateClobPairValidator,
const TXN_EVENT_SUBTYPE_VERSION_TO_VALIDATOR_MAPPING: Record<string, ValidatorInitializer> = {
[serializeSubtypeAndVersion(DydxIndexerSubtypes.ORDER_FILL.toString(), 1)]: OrderFillValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.SUBACCOUNT_UPDATE.toString(), 1)]: SubaccountUpdateValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.TRANSFER.toString(), 1)]: TransferValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.MARKET.toString(), 1)]: MarketValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.STATEFUL_ORDER.toString(), 1)]: StatefulOrderValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.ASSET.toString(), 1)]: AssetValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.PERPETUAL_MARKET.toString(), 1)]: PerpetualMarketValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.LIQUIDITY_TIER.toString(), 1)]: LiquidityTierValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.UPDATE_PERPETUAL.toString(), 1)]: UpdatePerpetualValidator,
[serializeSubtypeAndVersion(DydxIndexerSubtypes.UPDATE_CLOB_PAIR.toString(), 1)]: UpdateClobPairValidator,
};

const BLOCK_EVENT_SUBTYPE_TO_VALIDATOR_MAPPING: Record<string, ValidatorInitializer> = {
[DydxIndexerSubtypes.FUNDING.toString()]: FundingValidator,
const BLOCK_EVENT_SUBTYPE_VERSION_TO_VALIDATOR_MAPPING: Record<string, ValidatorInitializer> = {
[serializeSubtypeAndVersion(DydxIndexerSubtypes.FUNDING.toString(), 1)]: FundingValidator,
};

function serializeSubtypeAndVersion(
subtype: string,
version: number,
): string {
return `${subtype}-${version}`;
}

export class BlockProcessor {
block: IndexerTendermintBlock;
txId: number;
Expand Down Expand Up @@ -94,7 +102,7 @@ export class BlockProcessor {
_.forEach(this.block.events, (event: IndexerTendermintEvent) => {
const transactionIndex: number = indexerTendermintEventToTransactionIndex(event);
const eventProtoWithType:
EventProtoWithType | undefined = indexerTendermintEventToEventProtoWithType(
EventProtoWithTypeAndVersion | undefined = indexerTendermintEventToEventProtoWithType(
event,
);
if (eventProtoWithType === undefined) {
Expand Down Expand Up @@ -122,50 +130,53 @@ export class BlockProcessor {
for (const eventProtoWithType of eventsInTransaction) {
this.validateAndAddHandlerForEvent(
eventProtoWithType,
TXN_EVENT_SUBTYPE_TO_VALIDATOR_MAPPING,
TXN_EVENT_SUBTYPE_VERSION_TO_VALIDATOR_MAPPING,
);
}
}
for (const eventProtoWithType of groupedEvents.blockEvents) {
this.validateAndAddHandlerForEvent(
eventProtoWithType,
BLOCK_EVENT_SUBTYPE_TO_VALIDATOR_MAPPING,
BLOCK_EVENT_SUBTYPE_VERSION_TO_VALIDATOR_MAPPING,
);
}
}

private validateAndAddHandlerForEvent(
eventProtoWithType: EventProtoWithType,
eventProto: EventProtoWithTypeAndVersion,
validatorMap: Record<string, ValidatorInitializer>,
): void {
const Initializer:
ValidatorInitializer | undefined = validatorMap[
eventProtoWithType.type
serializeSubtypeAndVersion(
eventProto.type,
eventProto.version,
)
];
if (Initializer === undefined) {
const message: string = `cannot process subtype ${eventProtoWithType.type}`;
const message: string = `cannot process subtype ${eventProto.type} and version ${eventProto.version}`;
logger.error({
at: 'onMessage#saveTendermintEventData',
message,
eventProtoWithType,
eventProto,
});
return;
}

const validator: Validator<EventMessage> = new Initializer(
eventProtoWithType.eventProto,
eventProto.eventProto,
this.block,
);

validator.validate();
const handlers: Handler<EventMessage>[] = validator.createHandlers(
eventProtoWithType.indexerTendermintEvent,
eventProto.indexerTendermintEvent,
this.txId,
);

_.map(handlers, (handler: Handler<EventMessage>) => {
if (SYNCHRONOUS_SUBTYPES.includes(eventProtoWithType.type as DydxIndexerSubtypes)) {
this.syncHandlers.addHandler(eventProtoWithType.type, handler);
if (SYNCHRONOUS_SUBTYPES.includes(eventProto.type as DydxIndexerSubtypes)) {
this.syncHandlers.addHandler(eventProto.type, handler);
} else {
this.batchedHandlers.addHandler(handler);
}
Expand Down
17 changes: 15 additions & 2 deletions indexer/services/ender/src/lib/helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import {
import { base64StringToBinary } from '../helpers/encoding-helper';
import {
DydxIndexerSubtypes,
EventProtoWithType,
EventProtoWithTypeAndVersion,
} from './types';

export function indexerTendermintEventToTransactionIndex(
Expand Down Expand Up @@ -79,84 +79,97 @@ export function dateToDateTime(
*/
export function indexerTendermintEventToEventProtoWithType(
event: IndexerTendermintEvent,
): EventProtoWithType | undefined {
): EventProtoWithTypeAndVersion | undefined {
const eventDataBinary: Uint8Array = base64StringToBinary(event.data);
// set the default version to 1
const version: number = event.version === 0 ? 1 : event.version;
switch (event.subtype) {
case (DydxIndexerSubtypes.ORDER_FILL.toString()): {
return {
type: DydxIndexerSubtypes.ORDER_FILL,
eventProto: OrderFillEventV1.decode(eventDataBinary),
indexerTendermintEvent: event,
version,
};
}
case (DydxIndexerSubtypes.SUBACCOUNT_UPDATE.toString()): {
return {
type: DydxIndexerSubtypes.SUBACCOUNT_UPDATE,
eventProto: SubaccountUpdateEventV1.decode(eventDataBinary),
indexerTendermintEvent: event,
version,
};
}
case (DydxIndexerSubtypes.TRANSFER.toString()): {
return {
type: DydxIndexerSubtypes.TRANSFER,
eventProto: TransferEventV1.decode(eventDataBinary),
indexerTendermintEvent: event,
version,
};
}
case (DydxIndexerSubtypes.MARKET.toString()): {
return {
type: DydxIndexerSubtypes.MARKET,
eventProto: MarketEventV1.decode(eventDataBinary),
indexerTendermintEvent: event,
version,
};
}
case (DydxIndexerSubtypes.STATEFUL_ORDER.toString()): {
return {
type: DydxIndexerSubtypes.STATEFUL_ORDER,
eventProto: StatefulOrderEventV1.decode(eventDataBinary),
indexerTendermintEvent: event,
version,
};
}
case (DydxIndexerSubtypes.FUNDING.toString()): {
return {
type: DydxIndexerSubtypes.FUNDING,
eventProto: FundingEventV1.decode(eventDataBinary),
indexerTendermintEvent: event,
version,
};
}
case (DydxIndexerSubtypes.ASSET.toString()): {
return {
type: DydxIndexerSubtypes.ASSET,
eventProto: AssetCreateEventV1.decode(eventDataBinary),
indexerTendermintEvent: event,
version,
};
}
case (DydxIndexerSubtypes.PERPETUAL_MARKET.toString()): {
return {
type: DydxIndexerSubtypes.PERPETUAL_MARKET,
eventProto: PerpetualMarketCreateEventV1.decode(eventDataBinary),
indexerTendermintEvent: event,
version,
};
}
case (DydxIndexerSubtypes.LIQUIDITY_TIER.toString()): {
return {
type: DydxIndexerSubtypes.LIQUIDITY_TIER,
eventProto: LiquidityTierUpsertEventV1.decode(eventDataBinary),
indexerTendermintEvent: event,
version,
};
}
case (DydxIndexerSubtypes.UPDATE_PERPETUAL.toString()): {
return {
type: DydxIndexerSubtypes.UPDATE_PERPETUAL,
eventProto: UpdatePerpetualEventV1.decode(eventDataBinary),
indexerTendermintEvent: event,
version,
};
}
case (DydxIndexerSubtypes.UPDATE_CLOB_PAIR.toString()): {
return {
type: DydxIndexerSubtypes.UPDATE_CLOB_PAIR,
eventProto: UpdateClobPairEventV1.decode(eventDataBinary),
indexerTendermintEvent: event,
version,
};
}
default: {
Expand Down
Loading

0 comments on commit a3708c9

Please sign in to comment.