Add LRU eviction with 1gb memory limit for PandasData #392

Merged: 15 commits, Mar 12, 2024
74 changes: 42 additions & 32 deletions lumibot/backtesting/polygon_backtesting.py
@@ -1,6 +1,6 @@
import logging
import traceback
from collections import defaultdict
from collections import defaultdict, OrderedDict
from datetime import date, timedelta

from polygon import RESTClient
@@ -20,6 +20,10 @@ class PolygonDataBacktesting(PandasData):
Backtesting implementation of Polygon
"""

# Size limit for the pandas_data and _data_store (dicts of Pandas DataFrames) in bytes.
# Set to None to disable the limit.
MAX_STORAGE_BYTES = None

def __init__(
self,
datetime_start,
@@ -37,7 +41,17 @@ def __init__(
# RESTClient API for Polygon.io polygon-api-client
self.polygon_client = RESTClient(self._api_key)

def update_pandas_data(self, asset, quote, length, timestep, start_dt=None):
@staticmethod
def _enforce_storage_limit(pandas_data: OrderedDict):
storage_used = sum(data.df.memory_usage().sum() for data in pandas_data.values())
logging.info(f"{storage_used = :,} bytes for {len(pandas_data)} items")
while storage_used > PolygonDataBacktesting.MAX_STORAGE_BYTES:
k, d = pandas_data.popitem(last=False)
mu = d.df.memory_usage().sum()
storage_used -= mu
logging.info(f"Storage limit exceeded. Evicted LRU data: {k} used {mu:,} bytes")

def _update_pandas_data(self, asset, quote, length, timestep, start_dt=None, update_data_store=False):
"""
Get asset data and update the self.pandas_data dictionary.

@@ -51,11 +65,12 @@ def update_pandas_data(self, asset, quote, length, timestep, start_dt=None):
The number of data points to get.
timestep : str
The timestep to use. For example, "1minute" or "1hour" or "1day".

Returns
-------
dict
A dictionary with the keys being the asset and the values being the PandasData objects.
start_dt : datetime
The start datetime to use. If None, the current self.start_datetime will be used.
update_data_store : bool
If True, the data will also be added to the self._data_store dictionary.
That update will not include the adjustments made by PandasData.load_data.
See https://github.com/Lumiwealth/lumibot/issues/391 and its PR for further discussion.
"""
search_asset = asset
asset_separated = asset
@@ -84,22 +99,22 @@ def update_pandas_data(self, asset, quote, length, timestep, start_dt=None):
if data_timestep == ts_unit:
# Check if we have enough data (5 days is the buffer we subtracted from the start datetime)
if (data_start_datetime - start_datetime) < START_BUFFER:
return None
return

# Always try to get the lowest timestep possible because we can always resample
# If day is requested then make sure we at least have data that's less than a day
if ts_unit == "day":
if data_timestep == "minute":
# Check if we have enough data (5 days is the buffer we subtracted from the start datetime)
if (data_start_datetime - start_datetime) < START_BUFFER:
return None
return
else:
# We don't have enough data, so we need to get more (but in minutes)
ts_unit = "minute"
elif data_timestep == "hour":
# Check if we have enough data (5 days is the buffer we subtracted from the start datetime)
if (data_start_datetime - start_datetime) < START_BUFFER:
return None
return
else:
# We don't have enough data, so we need to get more (but in hours)
ts_unit = "hour"
@@ -109,7 +124,7 @@ def update_pandas_data(self, asset, quote, length, timestep, start_dt=None):
if data_timestep == "minute":
# Check if we have enough data (5 days is the buffer we subtracted from the start datetime)
if (data_start_datetime - start_datetime) < START_BUFFER:
return None
return
else:
# We don't have enough data, so we need to get more (but in minutes)
ts_unit = "minute"
@@ -177,15 +192,21 @@ def update_pandas_data(self, asset, quote, length, timestep, start_dt=None):
logging.error(traceback.format_exc())
raise Exception("Error getting data from Polygon") from e

if df is None:
return None
if (df is None) or df.empty:
return

pandas_data = []
data = Data(asset_separated, df, timestep=ts_unit, quote=quote_asset)
pandas_data.append(data)
pandas_data_updated = self._set_pandas_data_keys(pandas_data)

return pandas_data_updated

Contributor:
Why is pandas_data_updated no longer being returned? Are you sure this won't break anything?

Collaborator Author:
The only references to update_pandas_data in the library are in this file, and it looked to be an internal method to me. The purpose of the method seems to be the side effect of updating PandasData.pandas_data and (sometimes) PandasData._data_store. If update_pandas_data is part of the API, I can certainly put the return values back in.

My question is: why do we have both _data_store and pandas_data in the superclass PandasData? They hold nearly the same data, but there is some kind of staging effect (i.e. out-of-sync behavior) whose entry point is PandasData.load_data. For big backtests that doubling of RAM usage is a significant cost, and I don't know what it is for (although, as I mentioned elsewhere, I haven't read the docs...). If the two copies are needed and the selective updating has a purpose, then I'll include that when adding a docstring to update_data_store.

Contributor:
OK, that makes sense; in that case I think you're right to rearrange it like that.

As for your question about _data_store vs pandas_data: I believe one of them is used for inputting data at the very beginning, while the other is used as the actual data store. I think you're right that we do not need both and would probably be better off removing one of them.

Collaborator Author:
Hmmm, yes. After looking at the code some more: even though the name is update_pandas_data, it seems the actual purpose is to update _data_store.

AFAICT pandas_data is only used by load_data. It could be that the idea is to make get_X calls in order to populate pandas_data before load_data, but that doesn't seem to make a lot of sense because the only place that updates both is PolygonBacktesting.get_last_price.

How about I rename update_pandas_data to update_data_store and drop the option to update pandas_data? That way, if someone tries to upgrade and this breaks them, they'll know.

Collaborator Author (@jimwhite, Mar 12, 2024):
Oh, never mind. I had it backwards.

The overrides PolygonBacktesting._pull_source_symbol_bars and PolygonBacktesting.get_historical_prices_between_dates call update_pandas_data and don't update _data_store. So they rely on _Strategy.init calling PandasData.load_data to do some fixing up of dates, or something related to options ("expiries"):

self.broker.data_source.load_data()

def load_data(self):

It seems as though those adjustments done by PandasData.load_data are only made when the backtest begins and won't happen for symbols that aren't preloaded into pandas_data. Is that correct? If so, symbols that aren't known before the backtest begins can't be used, or, if they are used, they won't get whatever adjustments load_data makes.

I've only tested this change on the existing lumibot tests and on my own backtest, which is stocks-only and uses Strategy.get_historical_prices() and get_position().

Contributor:
In order to do no harm, I've made the default value of PolygonDataBacktesting.MAX_STORAGE_BYTES None. I'm concerned that I don't understand what's happening with the PolygonDataBacktesting use of pandas_data, _data_store, and load_data. This change shouldn't have any effect (apart from the rename of update_pandas_data to _update_pandas_data, which I expect isn't used by apps) unless the user enables it by assigning a suitable value to MAX_STORAGE_BYTES.
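
For illustration only, a minimal sketch of how a user could opt in to the eviction behavior. The attribute and helper names come from this diff; the import path is an assumption, and the 1 GB figure simply echoes the PR title as an example value:

from lumibot.backtesting import PolygonDataBacktesting

# Enable the cap; the default of None leaves eviction disabled.
PolygonDataBacktesting.MAX_STORAGE_BYTES = 1_000_000_000  # roughly 1 GB

# Once pandas_data / _data_store grow past this size, _enforce_storage_limit pops
# the oldest entries (OrderedDict.popitem(last=False)) until usage is back under the cap.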

Contributor:
(Quotes the Collaborator Author's earlier comment about _pull_source_symbol_bars, get_historical_prices_between_dates, and the load_data adjustments for symbols not preloaded into pandas_data.)

Yes, that sounds correct. If my memory serves right, pandas_data is only used on the initial load, whereas the data store is used on an ongoing basis. I sent you a link to my calendar for us to speak (https://calendly.com/lumi-rob/30min); this seems sufficiently complicated to justify a Zoom call.

pandas_data_update = self._set_pandas_data_keys([data])

# Add the keys to the self.pandas_data dictionary
self.pandas_data.update(pandas_data_update)
if PolygonDataBacktesting.MAX_STORAGE_BYTES:
self._enforce_storage_limit(self.pandas_data)
if update_data_store:
# TODO: Why do we have both self.pandas_data and self._data_store?
self._data_store.update(pandas_data_update)
if PolygonDataBacktesting.MAX_STORAGE_BYTES:
self._enforce_storage_limit(self._data_store)

def _pull_source_symbol_bars(
self,
@@ -202,11 +223,7 @@ def _pull_source_symbol_bars(
start_dt, ts_unit = self.get_start_datetime_and_ts_unit(length, timestep, current_dt, start_buffer=START_BUFFER)

# Get data from Polygon
pandas_data_update = self.update_pandas_data(asset, quote, length, timestep, start_dt)

if pandas_data_update is not None:
# Add the keys to the self.pandas_data dictionary
self.pandas_data.update(pandas_data_update)
self._update_pandas_data(asset, quote, length, timestep, start_dt)

return super()._pull_source_symbol_bars(
asset, length, timestep, timeshift, quote, exchange, include_after_hours
@@ -223,10 +240,7 @@ def get_historical_prices_between_dates(
start_date=None,
end_date=None,
):
pandas_data_update = self.update_pandas_data(asset, quote, 1, timestep)
if pandas_data_update is not None:
# Add the keys to the self.pandas_data dictionary
self.pandas_data.update(pandas_data_update)
self._update_pandas_data(asset, quote, 1, timestep)

response = super()._pull_source_symbol_bars_between_dates(
asset, timestep, quote, exchange, include_after_hours, start_date, end_date
@@ -241,11 +255,7 @@ def get_last_price(self, asset, timestep="minute", quote=None, exchange=None, **kwargs):
def get_last_price(self, asset, timestep="minute", quote=None, exchange=None, **kwargs):
try:
dt = self.get_datetime()
pandas_data_update = self.update_pandas_data(asset, quote, 1, timestep, dt)
if pandas_data_update is not None:
# Add the keys to the self.pandas_data dictionary
self.pandas_data.update(pandas_data_update)
self._data_store.update(pandas_data_update)
self._update_pandas_data(asset, quote, 1, timestep, dt, update_data_store=True)
except Exception as e:
print(f"Error get_last_price from Polygon: {e}")

9 changes: 5 additions & 4 deletions lumibot/data_sources/pandas_data.py
@@ -1,5 +1,5 @@
import logging
from collections import defaultdict
from collections import defaultdict, OrderedDict
from datetime import date, timedelta

import pandas as pd
@@ -24,15 +24,16 @@ def __init__(self, *args, pandas_data=None, auto_adjust=True, **kwargs):
self.name = "pandas"
self.pandas_data = self._set_pandas_data_keys(pandas_data)
self.auto_adjust = auto_adjust
self._data_store = {}
self._data_store = OrderedDict()
self._date_index = None
self._date_supply = None
self._timestep = "minute"
self._expiries_exist = False

@staticmethod
def _set_pandas_data_keys(pandas_data):
new_pandas_data = {}
# OrderedDict tracks the LRU dataframes for when it comes time to do evictions.
new_pandas_data = OrderedDict()

def _get_new_pandas_data_key(data):
# Always save the asset as a tuple of Asset and quote
@@ -61,7 +62,7 @@ def _get_new_pandas_data_key(data):
new_pandas_data[key] = data

return new_pandas_data

def load_data(self):
self._data_store = self.pandas_data
self._expiries_exist = (
5 changes: 5 additions & 0 deletions lumibot/entities/data.py
@@ -268,6 +268,11 @@ def trim_data(self, df, date_start, date_end, trading_hours_start, trading_hours
)
return df

# ./lumibot/build/__editable__.lumibot-3.1.14-py3-none-any/lumibot/entities/data.py:280:
# FutureWarning: Downcasting object dtype arrays on .fillna, .ffill, .bfill is deprecated and will change in a future version.
# Call result.infer_objects(copy=False) instead.
# To opt-in to the future behavior, set `pd.set_option('future.no_silent_downcasting', True)`
Comment on lines +271 to +274
Contributor:
Category: Code Design Improvements (priority 7)

The warning comment about the deprecation of downcasting object dtype arrays on .fillna, .ffill, and .bfill is important and should be addressed. The code should be updated to use the recommended method to avoid breakage when the behavior changes in a future version of pandas.
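
For reference, a hedged sketch of the pattern the warning itself recommends; the DataFrame here is a stand-in, since the actual call site in data.py is not shown in this hunk:

import pandas as pd

# Stand-in frame with object dtype, the case that triggers the FutureWarning on ffill/bfill/fillna.
df = pd.DataFrame({"close": [101.5, None, 102.0]}, dtype=object)

# Explicitly infer better dtypes after filling instead of relying on silent downcasting.
filled = df.ffill().infer_objects(copy=False)

# Or opt in to the future behavior globally (recent pandas 2.x releases).
pd.set_option("future.no_silent_downcasting", True)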


def repair_times_and_fill(self, idx):
# Trim the global index so that it is within the local data.
idx = idx[(idx >= self.datetime_start) & (idx <= self.datetime_end)]
5 changes: 4 additions & 1 deletion lumibot/tools/helpers.py
@@ -142,7 +142,10 @@ def to_datetime_aware(dt_in):
"""Convert naive time to datetime aware on default timezone."""
if not dt_in:
return dt_in
elif isinstance(dt_in, dt.datetime) and (dt_in.tzinfo is None or dt_in.tzinfo.utcoffset(dt_in) is None):
elif isinstance(dt_in, dt.datetime) and (dt_in.tzinfo is None):
return LUMIBOT_DEFAULT_PYTZ.localize(dt_in)
elif isinstance(dt_in, dt.datetime) and (dt_in.tzinfo.utcoffset(dt_in) is None):
# TODO: This will fail because an exception is thrown if tzinfo is not None.
return LUMIBOT_DEFAULT_PYTZ.localize(dt_in)
Comment on lines +147 to 149
Contributor:
Category: Bug Risk (priority 7)

The code attempts to localize a datetime object that already has a timezone but no UTC offset. However, the comment indicates that this will fail because an exception is thrown if the datetime object's tzinfo attribute is not None. This could lead to unexpected behavior or errors. It would be better to handle this case properly, either by converting the datetime object to a naive datetime before localizing it, or by handling the exception in some way.
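
One possible shape of that fix, as a sketch only: the default timezone value below is an assumption for illustration (the real one comes from lumibot's configuration), and this is not the change adopted in the PR.

import datetime as dt
import pytz

LUMIBOT_DEFAULT_PYTZ = pytz.timezone("America/New_York")  # assumed default for illustration

def to_datetime_aware(dt_in):
    """Convert a naive datetime to an aware one in the default timezone."""
    if not dt_in:
        return dt_in
    if isinstance(dt_in, dt.datetime):
        if dt_in.tzinfo is None:
            return LUMIBOT_DEFAULT_PYTZ.localize(dt_in)
        if dt_in.tzinfo.utcoffset(dt_in) is None:
            # tzinfo is present but yields no UTC offset; strip it first so that
            # pytz's localize does not raise ValueError on a non-naive datetime.
            return LUMIBOT_DEFAULT_PYTZ.localize(dt_in.replace(tzinfo=None))
    return dt_in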

else:
return dt_in
2 changes: 1 addition & 1 deletion lumibot/tools/polygon_helper.py
@@ -36,7 +36,7 @@ def get_price_data_from_polygon(
cached in the LUMIBOT_CACHE_FOLDER/polygon folder so that it can be reused later and we don't have to query
Polygon.io every time we run a backtest.

If the Polygon respone has missing bars for a date, the missing bars will be added as empty (all NaN) rows
If the Polygon response has missing bars for a date, the missing bars will be added as empty (all NaN) rows
to the cache file to avoid querying Polygon for the same missing bars in the future. Note that means if
a request is for a future time then we won't make a request to Polygon for it later when that data might
be available. That should result in an error rather than missing data from Polygon, but just in case a