Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CT-1190] Emit FinalizeBlock updates in single batch. #2260

Merged
merged 3 commits into from
Sep 16, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Stream FinalizeBlock updates in a batch
  • Loading branch information
teddyding committed Sep 16, 2024
commit dfa1e4726718175f72c9d1dc3626451c256d53ac
172 changes: 131 additions & 41 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,19 +490,11 @@ func (sm *FullNodeStreamingManagerImpl) TracksSubaccountId(subaccountId satypes.
return exists
}

// SendOrderbookUpdates groups updates by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the 3 functions SendOrderbookUpdates, SendOrderbookFillUpdates, SendFinalizedSubaccountUpdates, we are just taking out a few lines into a helper function. No logic change

func getStreamUpdatesFromOffchainUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookUpdatesLatency,
time.Now(),
)

) (streamUpdates []clobtypes.StreamUpdate, clobPairIds []uint32) {
// Group updates by clob pair id.
updates := make(map[uint32]*clobtypes.OffchainUpdates)
for _, message := range offchainUpdates.Messages {
Expand All @@ -514,8 +506,8 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
}

// Unmarshal each per-clob pair message to v1 updates.
streamUpdates := make([]clobtypes.StreamUpdate, 0)
clobPairIds := make([]uint32, 0)
streamUpdates = make([]clobtypes.StreamUpdate, 0)
clobPairIds = make([]uint32, 0)
for clobPairId, update := range updates {
v1updates, err := streaming_util.GetOffchainUpdatesV1(update)
if err != nil {
Expand All @@ -535,26 +527,39 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
clobPairIds = append(clobPairIds, clobPairId)
}

sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
return streamUpdates, clobPairIds
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
// SendOrderbookUpdates groups updates by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
func (sm *FullNodeStreamingManagerImpl) SendOrderbookUpdates(
offchainUpdates *clobtypes.OffchainUpdates,
blockHeight uint32,
execMode sdk.ExecMode,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
metrics.GrpcSendOrderbookUpdatesLatency,
time.Now(),
)

streamUpdates, clobPairIds := getStreamUpdatesFromOffchainUpdates(offchainUpdates, blockHeight, execMode)

sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
}

func (sm *FullNodeStreamingManagerImpl) getStreamUpdatesForOrderbookFills(
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) (
streamUpdates []clobtypes.StreamUpdate,
clobPairIds []uint32,
) {
// Group fills by clob pair id.
streamUpdates := make([]clobtypes.StreamUpdate, 0)
clobPairIds := make([]uint32, 0)
streamUpdates = make([]clobtypes.StreamUpdate, 0)
clobPairIds = make([]uint32, 0)
for _, orderbookFill := range orderbookFills {
// If this is a deleveraging fill, fetch the clob pair id from the deleveraged
// perpetual id.
Expand All @@ -577,6 +582,29 @@ func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
streamUpdates = append(streamUpdates, streamUpdate)
clobPairIds = append(clobPairIds, clobPairId)
}
return streamUpdates, clobPairIds
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendOrderbookFillUpdates(
orderbookFills []clobtypes.StreamOrderbookFill,
blockHeight uint32,
execMode sdk.ExecMode,
perpetualIdToClobPairId map[uint32][]clobtypes.ClobPairId,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendOrderbookFillsLatency,
time.Now(),
)

streamUpdates, clobPairIds := sm.getStreamUpdatesForOrderbookFills(
orderbookFills,
blockHeight,
execMode,
perpetualIdToClobPairId,
)

sm.AddOrderUpdatesToCache(streamUpdates, clobPairIds)
}
Expand Down Expand Up @@ -609,6 +637,31 @@ func (sm *FullNodeStreamingManagerImpl) SendTakerOrderStatus(
)
}

func getStreamUpdatesForSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) (
streamUpdates []clobtypes.StreamUpdate,
subaccountIds []*satypes.SubaccountId,
) {
// Group subaccount updates by subaccount id.
streamUpdates = make([]clobtypes.StreamUpdate, 0)
subaccountIds = make([]*satypes.SubaccountId, 0)
for _, subaccountUpdate := range subaccountUpdates {
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
streamUpdates = append(streamUpdates, streamUpdate)
subaccountIds = append(subaccountIds, subaccountUpdate.SubaccountId)
}
return streamUpdates, subaccountIds
}

// SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates(
Expand All @@ -626,20 +679,11 @@ func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates(
panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize")
}

// Group subaccount updates by subaccount id.
streamUpdates := make([]clobtypes.StreamUpdate, 0)
subaccountIds := make([]*satypes.SubaccountId, 0)
for _, subaccountUpdate := range subaccountUpdates {
streamUpdate := clobtypes.StreamUpdate{
UpdateMessage: &clobtypes.StreamUpdate_SubaccountUpdate{
SubaccountUpdate: &subaccountUpdate,
},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
}
streamUpdates = append(streamUpdates, streamUpdate)
subaccountIds = append(subaccountIds, subaccountUpdate.SubaccountId)
}
streamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates(
subaccountUpdates,
blockHeight,
execMode,
)

sm.AddSubaccountUpdatesToCache(streamUpdates, subaccountIds)
}
Expand Down Expand Up @@ -796,6 +840,42 @@ func (sm *FullNodeStreamingManagerImpl) GetSubaccountSnapshotsForInitStreams(
return ret
}

func (sm *FullNodeStreamingManagerImpl) addBatchUpdatesToCache(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docstring comment for when this is used / what exactly the batching does?

Copy link
Contributor

@jonfung-dydx jonfung-dydx Sep 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be good to add a comments saying this should never be called without a lock since we're modifying streamUpdateCache

orderbookStreamUpdates []clobtypes.StreamUpdate,
orderbookClobPairIds []uint32,
fillStreamUpdates []clobtypes.StreamUpdate,
fillClobPairIds []uint32,
subaccountStreamUpdates []clobtypes.StreamUpdate,
subaccountIds []*satypes.SubaccountId,
) {
// Add orderbook updates to cache.
sm.streamUpdateCache = append(sm.streamUpdateCache, orderbookStreamUpdates...)
for _, clobPairId := range orderbookClobPairIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
)
}

// Add fill updates to cache.
sm.streamUpdateCache = append(sm.streamUpdateCache, fillStreamUpdates...)
for _, clobPairId := range fillClobPairIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.clobPairIdToSubscriptionIdMapping[clobPairId],
)
}

// Add subaccount updates to cache.
sm.streamUpdateCache = append(sm.streamUpdateCache, subaccountStreamUpdates...)
for _, subaccountId := range subaccountIds {
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.subaccountIdToSubscriptionIdMapping[*subaccountId],
)
}
}

// Grpc Streaming logic after consensus agrees on a block.
// - Stream all events staged during `FinalizeBlock`.
// - Stream orderbook updates to sync fills in local ops queue.
Expand All @@ -809,28 +889,38 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(

finalizedFills, finalizedSubaccountUpdates := sm.getStagedEventsFromFinalizeBlock(ctx)

// TODO(CT-1190): Stream below in a single batch.
// Send orderbook updates to sync optimistic orderbook onchain state after FinalizeBlock.
sm.SendOrderbookUpdates(
orderbookStreamUpdates, orderbookClobPairIds := getStreamUpdatesFromOffchainUpdates(
orderBookUpdatesToSyncLocalOpsQueue,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)

// Send finalized fills from FinalizeBlock.
sm.SendOrderbookFillUpdates(
fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills(
finalizedFills,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
perpetualIdToClobPairId,
)

// Send finalized subaccount updates from FinalizeBlock.
sm.SendFinalizedSubaccountUpdates(
sm.Lock()
defer sm.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for my understanding - why do we need to lock before getStreamUpdatesForSubaccountUpdates?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't; just wanted to lock before the 1st flush. I re-ordered the code blocks a bit to optimize


subaccountStreamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates(
finalizedSubaccountUpdates,
uint32(ctx.BlockHeight()),
ctx.ExecMode(),
)

sm.addBatchUpdatesToCache(
orderbookStreamUpdates,
orderbookClobPairIds,
fillStreamUpdates,
fillClobPairIds,
subaccountStreamUpdates,
subaccountIds,
)

sm.FlushStreamUpdatesWithLock()
}

// getStagedEventsFromFinalizeBlock returns staged events from `FinalizeBlock`.
Expand Down
Loading