Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing a bug with missing event data on initialization #1692

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions src/agent0/chainsync/exec/acquire_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ def acquire_data(
suppress_logs: bool = False,
progress_bar: bool = False,
backfill=True,
force_init=False,
):
"""Execute the data acquisition pipeline.

Expand Down Expand Up @@ -74,11 +75,16 @@ def acquire_data(
If true, will show a progress bar. Defaults to False.
backfill: bool, optional
If true, will fill in missing pool info data for every block. Defaults to True.
force_init: bool, optional
If true, will explicitly use the start block in the query instead of depending on the latest pool info.
This is useful when we initialize an existing pool object and need to initialize
the database.
"""

# TODO cleanup
# pylint: disable=too-many-arguments
# pylint: disable=too-many-locals
# pylint: disable=too-many-statements
# pylint: disable=too-many-branches
# pylint: disable=too-many-positional-arguments
# TODO implement logger instead of global logging to suppress based on module name.
Expand Down Expand Up @@ -116,9 +122,12 @@ def acquire_data(

## Get starting point for restarts
# Get last entry of pool info in db
data_latest_block_number = get_latest_block_number_from_pool_info_table(db_session)
# Using max of latest block in database or specified start block
curr_write_block = max(start_block, data_latest_block_number + 1)
if force_init:
curr_write_block = start_block
else:
data_latest_block_number = get_latest_block_number_from_pool_info_table(db_session)
# Using max of latest block in database or specified start block
curr_write_block = max(start_block, data_latest_block_number + 1)

latest_mined_block = int(interfaces[0].get_block_number(interfaces[0].get_current_block()))
if (latest_mined_block - curr_write_block) > lookback_block_limit:
Expand Down
19 changes: 14 additions & 5 deletions src/agent0/core/hyperdrive/interactive/local_hyperdrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,17 @@ def __init__(
if backfill_data_start_block is not None:
logging.info("Backfilling data from block %s to %s", self._data_start_block, chain.block_number())
# If this is set, we ignore manual sync and always sync the database
self.sync_database(progress_bar=True, force_backfill=True)
self.sync_database(progress_bar=True, force_backfill=True, force_init=True)
else:
# Otherwise, we do this lazily if manual sync is on
self._maybe_run_blocking_data_pipeline()
self._maybe_run_blocking_data_pipeline(force_init=True)

def sync_database(
self, start_block: int | None = None, progress_bar: bool = False, force_backfill: bool = False
self,
start_block: int | None = None,
progress_bar: bool = False,
force_backfill: bool = False,
force_init: bool = False,
) -> None:
"""Explicitly syncs the database with the chain.
This function doesn't need to be explicitly called if `manual_database_sync = False`.
Expand All @@ -385,6 +389,8 @@ def sync_database(
force_backfill: bool, optional
If True, will force backfilling pool info data. Otherwise will look for
`chain.config.backfill_pool_info` in the chain config.
force_init: bool, optional
If True, will explicitly initialize the database starting from this pool's start block.
"""
if start_block is None:
data_start_block = self._data_start_block
Expand All @@ -406,6 +412,7 @@ def sync_database(
suppress_logs=True,
progress_bar=progress_bar,
backfill=backfill,
force_init=force_init,
)
analyze_data(
start_block=analysis_start_block,
Expand All @@ -416,10 +423,12 @@ def sync_database(
calc_pnl=self.calc_pnl,
)

def _maybe_run_blocking_data_pipeline(self, start_block: int | None = None, progress_bar: bool = False) -> None:
def _maybe_run_blocking_data_pipeline(
self, start_block: int | None = None, progress_bar: bool = False, force_init: bool = False
) -> None:
# Checks the chain config to see if manual sync is on. Noop if it is.
if not self.chain.config.manual_database_sync:
self.sync_database(start_block, progress_bar)
self.sync_database(start_block, progress_bar, force_init=force_init)

# We overwrite these dunder methods to allow this object to be used as a dictionary key
# This is used to allow chain's `advance_time` function to return this object as a key.
Expand Down
Loading