Skip to content

Commit

Permalink
[CT-874] Add realizedPnl/unrealizedPnl to consolidated subaccount web…
Browse files Browse the repository at this point in the history
…socket msg sent from fills/liquidations/deleveraging Ender handlers (backport #1603) (#1609)

Co-authored-by: dydxwill <119354122+dydxwill@users.noreply.github.com>
  • Loading branch information
mergify[bot] and dydxwill committed May 31, 2024
1 parent 11f47b6 commit 6b16c95
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 14 deletions.
44 changes: 38 additions & 6 deletions indexer/services/ender/__tests__/helpers/indexer-proto-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ import {
PerpetualMarketFromDatabase,
PerpetualMarketTable,
IsoString,
fillTypeToTradeType, OrderSubaccountMessageContents,
fillTypeToTradeType,
OrderSubaccountMessageContents,
MarketFromDatabase,
MarketTable,
MarketsMap,
MarketColumns,
UpdatedPerpetualPositionSubaccountKafkaObject,
} from '@dydxprotocol-indexer/postgres';
import { getOrderIdHash, ORDER_FLAG_CONDITIONAL } from '@dydxprotocol-indexer/v4-proto-parser';
import {
Expand All @@ -52,14 +58,13 @@ import {
PerpetualMarketCreateEventV2,
DeleveragingEventV1,
protoTimestampToDate,
} from '@dydxprotocol-indexer/v4-protos';
import {
PerpetualMarketType,
} from '@dydxprotocol-indexer/v4-protos/build/codegen/dydxprotocol/indexer/protocol/v1/perpetual';
} from '@dydxprotocol-indexer/v4-protos';
import { IHeaders, Message, ProducerRecord } from 'kafkajs';
import _ from 'lodash';

import {
annotateWithPnl,
convertPerpetualPosition,
generateFillSubaccountMessage,
generatePerpetualMarketMessage,
Expand Down Expand Up @@ -676,13 +681,27 @@ export async function expectFillSubaccountKafkaMessageFromLiquidationEvent(
expect(fill).toBeDefined();
expect(position).toBeDefined();

const markets: MarketFromDatabase[] = await MarketTable.findAll(
{},
[],
);
const marketIdToMarket: MarketsMap = _.keyBy(
markets,
MarketColumns.id,
);
const positionUpdate = annotateWithPnl(
convertPerpetualPosition(position!),
perpetualMarketRefresher.getPerpetualMarketsMap(),
marketIdToMarket,
);

const contents: SubaccountMessageContents = {
fills: [
generateFillSubaccountMessage(fill!, ticker),
],
perpetualPositions: generatePerpetualPositionsContents(
subaccountIdProto,
[convertPerpetualPosition(position!)],
[positionUpdate],
perpetualMarketRefresher.getPerpetualMarketsMap(),
),
blockHeight,
Expand Down Expand Up @@ -804,9 +823,22 @@ export async function expectOrderFillAndPositionSubaccountKafkaMessageFromIds(
};

if (position !== undefined) {
const markets: MarketFromDatabase[] = await MarketTable.findAll(
{},
[],
);
const marketIdToMarket: MarketsMap = _.keyBy(
markets,
MarketColumns.id,
);
const positionUpdate: UpdatedPerpetualPositionSubaccountKafkaObject = annotateWithPnl(
convertPerpetualPosition(position),
perpetualMarketRefresher.getPerpetualMarketsMap(),
marketIdToMarket,
);
contents.perpetualPositions = generatePerpetualPositionsContents(
subaccountIdProto,
[convertPerpetualPosition(position)],
[positionUpdate],
perpetualMarketRefresher.getPerpetualMarketsMap(),
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,24 @@ import { logger } from '@dydxprotocol-indexer/base';
import {
FillFromDatabase,
FillModel,
MarketColumns,
MarketFromDatabase,
MarketsMap,
MarketTable,
PerpetualMarketFromDatabase,
PerpetualMarketModel,
perpetualMarketRefresher,
PerpetualPositionFromDatabase,
PerpetualPositionModel,
SubaccountTable,
UpdatedPerpetualPositionSubaccountKafkaObject,
} from '@dydxprotocol-indexer/postgres';
import { DeleveragingEventV1 } from '@dydxprotocol-indexer/v4-protos';
import _ from 'lodash';
import * as pg from 'pg';

import { SUBACCOUNT_ORDER_FILL_EVENT_TYPE } from '../../constants';
import { annotateWithPnl, convertPerpetualPosition } from '../../helpers/kafka-helper';
import { ConsolidatedKafkaEvent } from '../../lib/types';
import { AbstractOrderFillHandler } from './abstract-order-fill-handler';

Expand Down Expand Up @@ -59,18 +66,38 @@ export class DeleveragingHandler extends AbstractOrderFillHandler<DeleveragingEv
const offsettingPerpetualPosition:
PerpetualPositionFromDatabase = PerpetualPositionModel.fromJson(
resultRow.offsetting_perpetual_position) as PerpetualPositionFromDatabase;
const markets: MarketFromDatabase[] = await MarketTable.findAll(
{},
[],
{ txId: this.txId },
);
const marketIdToMarket: MarketsMap = _.keyBy(
markets,
MarketColumns.id,
);

const liquidatedPositionUpdate: UpdatedPerpetualPositionSubaccountKafkaObject = annotateWithPnl(
convertPerpetualPosition(liquidatedPerpetualPosition),
perpetualMarketRefresher.getPerpetualMarketsMap(),
marketIdToMarket,
);
const offsettingPositionUpdate: UpdatedPerpetualPositionSubaccountKafkaObject = annotateWithPnl(
convertPerpetualPosition(offsettingPerpetualPosition),
perpetualMarketRefresher.getPerpetualMarketsMap(),
marketIdToMarket,
);
const kafkaEvents: ConsolidatedKafkaEvent[] = [
this.generateConsolidatedKafkaEvent(
this.event.liquidated!,
undefined,
liquidatedPerpetualPosition,
liquidatedPositionUpdate,
liquidatedFill,
perpetualMarket,
),
this.generateConsolidatedKafkaEvent(
this.event.offsetting!,
undefined,
offsettingPerpetualPosition,
offsettingPositionUpdate,
offsettingFill,
perpetualMarket,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,24 @@ import {
PerpetualPositionModel,
SubaccountTable,
OrderStatus,
MarketFromDatabase,
MarketTable,
MarketsMap,
MarketColumns,
UpdatedPerpetualPositionSubaccountKafkaObject,
perpetualMarketRefresher,
} from '@dydxprotocol-indexer/postgres';
import { StateFilledQuantumsCache } from '@dydxprotocol-indexer/redis';
import { isStatefulOrder } from '@dydxprotocol-indexer/v4-proto-parser';
import {
LiquidationOrderV1, IndexerOrderId,
} from '@dydxprotocol-indexer/v4-protos';
import _ from 'lodash';
import Long from 'long';
import * as pg from 'pg';

import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE, SUBACCOUNT_ORDER_FILL_EVENT_TYPE } from '../../constants';
import { convertPerpetualPosition } from '../../helpers/kafka-helper';
import { annotateWithPnl, convertPerpetualPosition } from '../../helpers/kafka-helper';
import { redisClient } from '../../helpers/redis/redis-controller';
import {
orderFillWithLiquidityToOrderFillEventWithLiquidation,
Expand Down Expand Up @@ -95,6 +102,22 @@ export class LiquidationHandler extends AbstractOrderFillHandler<OrderFillWithLi
const position: PerpetualPositionFromDatabase = PerpetualPositionModel.fromJson(
resultRow[field].perpetual_position) as PerpetualPositionFromDatabase;

const markets: MarketFromDatabase[] = await MarketTable.findAll(
{},
[],
{ txId: this.txId },
);
const marketIdToMarket: MarketsMap = _.keyBy(
markets,
MarketColumns.id,
);

const positionUpdate: UpdatedPerpetualPositionSubaccountKafkaObject = annotateWithPnl(
convertPerpetualPosition(position),
perpetualMarketRefresher.getPerpetualMarketsMap(),
marketIdToMarket,
);

if (this.event.liquidity === Liquidity.MAKER) {
// Must be done in this order, because fills refer to an order
// We do not create a taker order for liquidations.
Expand All @@ -107,12 +130,11 @@ export class LiquidationHandler extends AbstractOrderFillHandler<OrderFillWithLi
this.getTotalFilled(castedLiquidationFillEventMessage).toString(),
redisClient,
);

const kafkaEvents: ConsolidatedKafkaEvent[] = [
this.generateConsolidatedKafkaEvent(
castedLiquidationFillEventMessage.makerOrder.orderId!.subaccountId!,
makerOrder,
convertPerpetualPosition(position),
positionUpdate,
fill,
perpetualMarket,
),
Expand All @@ -137,7 +159,7 @@ export class LiquidationHandler extends AbstractOrderFillHandler<OrderFillWithLi
this.generateConsolidatedKafkaEvent(
castedLiquidationFillEventMessage.liquidationOrder.liquidated!,
undefined,
convertPerpetualPosition(position),
positionUpdate,
fill,
perpetualMarket,
),
Expand Down
25 changes: 23 additions & 2 deletions indexer/services/ender/src/handlers/order-fills/order-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,24 @@ import {
PerpetualPositionModel,
SubaccountTable,
OrderStatus,
MarketFromDatabase,
MarketTable,
MarketsMap,
MarketColumns,
perpetualMarketRefresher,
UpdatedPerpetualPositionSubaccountKafkaObject,
} from '@dydxprotocol-indexer/postgres';
import { StateFilledQuantumsCache } from '@dydxprotocol-indexer/redis';
import { isStatefulOrder } from '@dydxprotocol-indexer/v4-proto-parser';
import {
IndexerOrderId, IndexerSubaccountId, IndexerOrder,
} from '@dydxprotocol-indexer/v4-protos';
import _ from 'lodash';
import Long from 'long';
import * as pg from 'pg';

import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE, SUBACCOUNT_ORDER_FILL_EVENT_TYPE } from '../../constants';
import { convertPerpetualPosition } from '../../helpers/kafka-helper';
import { annotateWithPnl, convertPerpetualPosition } from '../../helpers/kafka-helper';
import { redisClient } from '../../helpers/redis/redis-controller';
import { orderFillWithLiquidityToOrderFillEventWithOrder } from '../../helpers/translation-helper';
import { OrderFillWithLiquidity } from '../../lib/translated-types';
Expand Down Expand Up @@ -79,11 +86,25 @@ export class OrderHandler extends AbstractOrderFillHandler<OrderFillWithLiquidit
} else {
subaccountId = castedOrderFillEventMessage.order.orderId!.subaccountId!;
}
const markets: MarketFromDatabase[] = await MarketTable.findAll(
{},
[],
{ txId: this.txId },
);
const marketIdToMarket: MarketsMap = _.keyBy(
markets,
MarketColumns.id,
);
const positionUpdate: UpdatedPerpetualPositionSubaccountKafkaObject = annotateWithPnl(
convertPerpetualPosition(position),
perpetualMarketRefresher.getPerpetualMarketsMap(),
marketIdToMarket,
);
kafkaEvents.push(
this.generateConsolidatedKafkaEvent(
subaccountId,
order,
convertPerpetualPosition(position),
positionUpdate,
fill,
perpetualMarket,
),
Expand Down

0 comments on commit 6b16c95

Please sign in to comment.