Add LRU eviction with 1gb memory limit for PandasData #392

Merged
merged 15 commits on Mar 12, 2024
Changes from 1 commit
Disable pandas_data eviction by default.
Jim White committed Mar 12, 2024
commit 5b7725d6e3f6bdc2de50136295b25b7cae2f1910
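With this commit the class-level MAX_STORAGE_BYTES defaults to None, so eviction is off unless a caller opts in. A minimal sketch of opting back in to the PR's original 1 GB limit; the import path is assumed from the file's location, and the snippet itself is not part of the commit:

    # Hypothetical opt-in snippet; not part of this commit. The import path is
    # inferred from the file path lumibot/backtesting/polygon_backtesting.py.
    from lumibot.backtesting.polygon_backtesting import PolygonDataBacktesting

    # Restore the 1 GB cap that earlier commits in this PR hard-coded.
    PolygonDataBacktesting.MAX_STORAGE_BYTES = 1_000_000_000  # bytes

    # Backtests using this data source will now evict cached DataFrames
    # once the cap is exceeded.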
35 changes: 27 additions & 8 deletions lumibot/backtesting/polygon_backtesting.py
@@ -1,6 +1,6 @@
 import logging
 import traceback
-from collections import defaultdict
+from collections import defaultdict, OrderedDict
 from datetime import date, timedelta
 
 from polygon import RESTClient
@@ -20,6 +20,10 @@ class PolygonDataBacktesting(PandasData):
     Backtesting implementation of Polygon
     """
 
+    # Size limit for the pandas_data and _data_store (dicts of Pandas DataFrames) in bytes.
+    # Set to None to disable the limit.
+    MAX_STORAGE_BYTES = None
+
     def __init__(
         self,
         datetime_start,
@@ -37,7 +41,17 @@ def __init__(
         # RESTClient API for Polygon.io polygon-api-client
        self.polygon_client = RESTClient(self._api_key)
 
-    def update_pandas_data(self, asset, quote, length, timestep, start_dt=None, update_data_store=False):
+    @staticmethod
+    def _enforce_storage_limit(pandas_data: OrderedDict):
+        storage_used = sum(data.df.memory_usage().sum() for data in pandas_data.values())
+        logging.info(f"{storage_used = :,} bytes for {len(pandas_data)} items")
+        while storage_used > PolygonDataBacktesting.MAX_STORAGE_BYTES:
+            k, d = pandas_data.popitem(last=False)
+            mu = d.df.memory_usage().sum()
+            storage_used -= mu
+            logging.info(f"Storage limit exceeded. Evicted LRU data: {k} used {mu:,} bytes")
+
+    def _update_pandas_data(self, asset, quote, length, timestep, start_dt=None, update_data_store=False):
         """
         Get asset data and update the self.pandas_data dictionary.
 
@@ -53,6 +67,10 @@ def update_pandas_data(self, asset, quote, length, timestep, start_dt=None, update_data_store=False):
             The timestep to use. For example, "1minute" or "1hour" or "1day".
         start_dt : datetime
             The start datetime to use. If None, the current self.start_datetime will be used.
+        update_data_store : bool
+            If True, the data will also be added to the self._data_store dictionary.
+            That update will not include the adjustments made by PandasData.load_data.
+            See https://github.com/Lumiwealth/lumibot/issues/390 and its PR if you have questions or answers.
         """
         search_asset = asset
         asset_separated = asset
@@ -182,12 +200,13 @@ def update_pandas_data(self, asset, quote, length, timestep, start_dt=None, update_data_store=False):
 
         # Add the keys to the self.pandas_data dictionary
         self.pandas_data.update(pandas_data_update)
-        # Don't let memory usage get out of control
-        self._enforce_storage_limit(self.pandas_data)
+        if PolygonDataBacktesting.MAX_STORAGE_BYTES:
+            self._enforce_storage_limit(self.pandas_data)
         if update_data_store:
             # TODO: Why do we have both self.pandas_data and self._data_store?
             self._data_store.update(pandas_data_update)
-            self._enforce_storage_limit(self._data_store)
+            if PolygonDataBacktesting.MAX_STORAGE_BYTES:
+                self._enforce_storage_limit(self._data_store)
 
     def _pull_source_symbol_bars(
         self,
@@ -204,7 +223,7 @@ def _pull_source_symbol_bars(
         start_dt, ts_unit = self.get_start_datetime_and_ts_unit(length, timestep, current_dt, start_buffer=START_BUFFER)
 
         # Get data from Polygon
-        self.update_pandas_data(asset, quote, length, timestep, start_dt)
+        self._update_pandas_data(asset, quote, length, timestep, start_dt)
 
         return super()._pull_source_symbol_bars(
             asset, length, timestep, timeshift, quote, exchange, include_after_hours
@@ -221,7 +240,7 @@ def get_historical_prices_between_dates(
         start_date=None,
         end_date=None,
     ):
-        self.update_pandas_data(asset, quote, 1, timestep)
+        self._update_pandas_data(asset, quote, 1, timestep)
 
         response = super()._pull_source_symbol_bars_between_dates(
             asset, timestep, quote, exchange, include_after_hours, start_date, end_date
@@ -236,7 +255,7 @@ def get_historical_prices_between_dates(
     def get_last_price(self, asset, timestep="minute", quote=None, exchange=None, **kwargs):
         try:
             dt = self.get_datetime()
-            self.update_pandas_data(asset, quote, 1, timestep, dt, update_data_store=True)
+            self._update_pandas_data(asset, quote, 1, timestep, dt, update_data_store=True)
         except Exception as e:
             print(f"Error get_last_price from Polygon: {e}")
 
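The eviction helper above relies on OrderedDict.popitem(last=False), which removes entries in insertion order. A standalone sketch of the same loop, with plain byte counts standing in for DataFrames (the symbols and sizes are made up for illustration):

    from collections import OrderedDict

    MAX_STORAGE_BYTES = 100  # tiny cap for demonstration

    cache = OrderedDict([("SPY", 60), ("QQQ", 50), ("IWM", 40)])  # key -> bytes

    storage_used = sum(cache.values())  # 150 bytes, over the cap
    while storage_used > MAX_STORAGE_BYTES:
        key, size = cache.popitem(last=False)  # pop the oldest insertion first
        storage_used -= size
        print(f"Evicted {key} ({size} bytes); {storage_used} bytes remain")

    # Evicts only "SPY": 150 -> 90 bytes, which is under the cap.

Strictly speaking this evicts in insertion order; true LRU behavior would also need move_to_end() on each read, and no such call appears in this diff.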
13 changes: 0 additions & 13 deletions lumibot/data_sources/pandas_data.py
@@ -7,8 +7,6 @@
 from lumibot.entities import Asset, AssetsMapping, Bars
 
 
-MAX_STORAGE_BYTES = 1_000_000_000  # 1 GB
-
 class PandasData(DataSourceBacktesting):
     """
     PandasData is a Backtesting-only DataSource that uses a Pandas DataFrame (read from CSV) as the source of
@@ -64,17 +62,6 @@ def _get_new_pandas_data_key(data):
             new_pandas_data[key] = data
 
         return new_pandas_data
-
-    @staticmethod
-    def _enforce_storage_limit(pandas_data: OrderedDict):
-        storage_used = sum(data.df.memory_usage().sum() for data in pandas_data.values())
-        logging.info(f"{storage_used = :,} bytes for {len(pandas_data)} items")
-        while storage_used > MAX_STORAGE_BYTES:
-            k, d = pandas_data.popitem(last=False)
-            mu = d.df.memory_usage().sum()
-            storage_used -= mu
-            logging.info(f"Storage limit exceeded. Evicted LRU data: {k} used {mu:,} bytes")
-        return
 
     def load_data(self):
         self._data_store = self.pandas_data
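One measurement detail worth knowing when sizing the cap: DataFrame.memory_usage() is shallow by default. For object-dtype (e.g., string) columns it counts only the 8-byte pointers, not the strings themselves, so the helper's storage_used can understate the real footprint; memory_usage(deep=True) is more accurate but slower. A quick illustration:

    import pandas as pd

    df = pd.DataFrame({"symbol": ["SPY"] * 1_000, "close": [450.0] * 1_000})

    print(df.memory_usage().sum())           # shallow: pointers only for "symbol"
    print(df.memory_usage(deep=True).sum())  # also counts the string payloads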