Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LRU eviction with 1gb memory limit for PandasData #392

Merged
merged 15 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions lumibot/backtesting/backtesting_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,44 @@ def _flatten_order(self, order):

return orders

def _process_filled_order(self, order, price, quantity):
"""
BackTesting needs to create/update positions when orders are filled becuase there is no broker to do it
"""
existing_position = self.get_tracked_position(order.strategy, order.asset)
position = super()._process_filled_order(order, price, quantity)
if existing_position:
position.add_order(order, quantity) # Add will update quantity, but not double count the order
if position.quantity == 0:
logging.info("Position %r liquidated" % position)
self._filled_positions.remove(position)
else:
self._filled_positions.append(position) # New position, add it to the tracker
return position

def _process_partially_filled_order(self, order, price, quantity):
"""
BackTesting needs to create/update positions when orders are partially filled becuase there is no broker
to do it
"""
existing_position = self.get_tracked_position(order.strategy, order.asset)
stored_order, position = super()._process_partially_filled_order(order, price, quantity)
if existing_position:
position.add_order(stored_order, quantity) # Add will update quantity, but not double count the order
return stored_order, position

def _process_cash_settlement(self, order, price, quantity):
"""
BackTesting needs to create/update positions when orders are filled becuase there is no broker to do it
"""
existing_position = self.get_tracked_position(order.strategy, order.asset)
super()._process_cash_settlement(order, price, quantity)
if existing_position:
existing_position.add_order(order, quantity) # Add will update quantity, but not double count the order
if existing_position.quantity == 0:
logging.info("Position %r liquidated" % existing_position)
self._filled_positions.remove(existing_position)

def submit_order(self, order):
"""Submit an order for an asset"""
order.update_raw(order)
Expand Down
67 changes: 51 additions & 16 deletions lumibot/brokers/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def __init__(self, name="", connect_stream=True, data_source: DataSource = None,
self._config = config
self.data_source = data_source
self.max_workers = min(max_workers, 200)
self.quote_assets = set() # Quote positions will never be removed from tracking during sync operations

if self.data_source is None:
raise ValueError("Broker must have a data source")
Expand Down Expand Up @@ -182,6 +183,49 @@ def _pull_broker_all_orders(self):
"""
pass

def sync_positions(self, strategy):
"""
Sync the broker positions with the lumibot positions. Remove any lumibot positions that are not at the broker.
"""
positions_broker = self._pull_positions(strategy)
for position in positions_broker:
# Check against existing position.
position_lumi = [
pos_lumi
for pos_lumi in self._filled_positions.get_list()
if pos_lumi.asset == position.asset
]
position_lumi = position_lumi[0] if len(position_lumi) > 0 else None

if position_lumi:
# Compare to existing lumi position.
if position_lumi.quantity != position.quantity:
position_lumi.quantity = position.quantity

# No current brokers have anyway to distinguish between strategies for an open position.
# Therefore, we will just update the strategy to the current strategy.
# This is added here because with initial polling, no strategy is set for the positions so we
# can create ones that have no strategy attached. This will ensure that all stored positions have a
# strategy with subsequent updates.
if strategy:
position_lumi.strategy = strategy.name if not isinstance(strategy, str) else strategy
else:
# Add to positions in lumibot, position does not exist
# in lumibot.
if position.quantity != 0:
self._filled_positions.append(position)

# Now iterate through lumibot positions.
# Remove lumibot position if not at the broker.
for position in self._filled_positions.get_list():
found = False
for position_broker in positions_broker:
if position_broker.asset == position.asset:
found = True
break
if not found and (position.asset not in self.quote_assets):
self._filled_positions.remove(position)

# =========Market functions=======================

def get_last_price(self, asset: Asset, quote=None, exchange=None) -> float:
Expand Down Expand Up @@ -464,18 +508,16 @@ def _process_partially_filled_order(self, order, price, quantity):
order.add_transaction(price, quantity)
order.status = self.PARTIALLY_FILLED_ORDER
order.set_partially_filled()
if order not in self._partially_filled_orders:
self._partially_filled_orders.append(order)

position = self.get_tracked_position(order.strategy, order.asset)
if position is None:
# Create new position for this given strategy and asset
position = order.to_position(quantity)
self._filled_positions.append(position)
else:
# Add the order to the already existing position
position.add_order(order, quantity)

if order not in self._partially_filled_orders:
self._partially_filled_orders.append(order)
position.add_order(order)

if order.asset.asset_type == "crypto":
self._process_crypto_quote(order, quantity, price)
Expand Down Expand Up @@ -503,13 +545,9 @@ def _process_filled_order(self, order, price, quantity):
if position is None:
# Create new position for this given strategy and asset
position = order.to_position(quantity)
self._filled_positions.append(position)
else:
# Add the order to the already existing position
position.add_order(order, quantity)
if position.quantity == 0:
logging.info("Position %r liquidated" % position)
self._filled_positions.remove(position)
position.add_order(order) # Don't update quantity here, it's handled by querying broker

if order.asset.asset_type == "crypto":
self._process_crypto_quote(order, quantity, price)
Expand All @@ -536,10 +574,7 @@ def _process_cash_settlement(self, order, price, quantity):
position = self.get_tracked_position(order.strategy, order.asset)
if position is not None:
# Add the order to the already existing position
position.add_order(order, quantity)
if position.quantity == 0:
logging.info("Position %r liquidated" % position)
self._filled_positions.remove(position)
position.add_order(order) # Don't update quantity here, it's handled by querying broker

def _process_crypto_quote(self, order, quantity, price):
"""Used to process the quote side of a crypto trade."""
Expand Down Expand Up @@ -694,7 +729,7 @@ def get_tracked_position(self, strategy, asset):
"""get a tracked position given an asset and
a strategy"""
for position in self._filled_positions:
if position.asset == asset and position.strategy == strategy:
if position.asset == asset and (not strategy or position.strategy == strategy):
return position
return None

Expand Down Expand Up @@ -1006,7 +1041,7 @@ def _process_trade_event(self, stored_order, type_event, price=None, filled_quan
position = self._process_filled_order(stored_order, price, filled_quantity)
self._on_filled_order(position, stored_order, price, filled_quantity, multiplier)
elif type_event == self.CASH_SETTLED:
position = self._process_cash_settlement(stored_order, price, filled_quantity)
self._process_cash_settlement(stored_order, price, filled_quantity)
stored_order.type = self.CASH_SETTLED
else:
logging.info(f"Unhandled type event {type_event} for {stored_order}")
Expand Down
22 changes: 14 additions & 8 deletions lumibot/brokers/tradier.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def _pull_positions(self, strategy):

# Create the position
position = Position(
strategy=strategy.name,
strategy=strategy.name if strategy else "Unknown",
asset=asset,
quantity=quantity,
)
Expand Down Expand Up @@ -368,18 +368,22 @@ def do_polling(self):
This function is called every time the broker polls for new orders. It checks for new orders and
dispatches them to the stream for processing.
"""
# Get current orders from the broker and dispatch them to the stream for processing. Need to see all
# Pull the current Tradier positions and sync them with Lumibot's positions
self.sync_positions(None)

# Get current orders from Tradier and dispatch them to the stream for processing. Need to see all
# lumi orders (not just active "tracked" ones) to catch any orders that might have changed final
# status in Tradier.
df_orders = self.tradier.orders.get_orders()
# df_orders = self.tradier.orders.get_orders()
raw_orders = self._pull_broker_all_orders()
stored_orders = {x.identifier: x for x in self.get_all_orders()}
for order_row in df_orders.to_dict("records"):
for order_row in raw_orders:
order = self._parse_broker_order(order_row, strategy_name=order_row.get("tag"))

# First time seeing this order, something weird has happened, dispatch it as a new order
if order.identifier not in stored_orders:
logging.info(
f"Poll Update: Tradier has order {order}, but Lumibot doesn't know about it. "
f"Poll Update: {self.name} has order {order}, but Lumibot doesn't know about it. "
f"Adding it as a new order."
)
# If the Tradier status is not "open", the next polling cycle will catch it and dispatch it as needed.
Expand Down Expand Up @@ -410,7 +414,7 @@ def do_polling(self):
case "canceled":
self.stream.dispatch(self.CANCELED_ORDER, order=stored_order)
case "error":
default_msg = f"Tradier encountered an error with order {order.identifier} | {order}"
default_msg = f"{self.name} encountered an error with order {order.identifier} | {order}"
msg = order_row["reason_description"] if "reason_description" in order_row else default_msg
self.stream.dispatch(self.ERROR_ORDER, order=stored_order, error_msg=msg)
case "cash_settled":
Expand All @@ -428,10 +432,12 @@ def do_polling(self):
# See if there are any tracked (aka active) orders that are no longer in the broker's list,
# dispatch them as cancelled
tracked_orders = {x.identifier: x for x in self.get_tracked_orders()}
broker_ids = [o["id"] for o in raw_orders]
for order_id, order in tracked_orders.items():
if order_id not in df_orders["id"].values:
if order_id not in broker_ids:
logging.info(
f"Poll Update: Tradier no longer has order {order}, but Lumibot does. " f"Dispatching as cancelled."
f"Poll Update: {self.name} no longer has order {order}, but Lumibot does. "
f"Dispatching as cancelled."
)
self.stream.dispatch(self.CANCELED_ORDER, order=order)

Expand Down
5 changes: 2 additions & 3 deletions lumibot/entities/position.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ def __init__(self, strategy, asset, quantity, orders=None, hold=0, available=0):
self.orders = orders

def __repr__(self):
repr = "%f shares of %s" % (self.quantity, self.asset)
return repr
return f"{self.strategy} Position: {self.quantity} shares of {self.asset} ({len(self.orders)} orders)"

@property
def quantity(self):
Expand Down Expand Up @@ -151,7 +150,7 @@ def get_selling_order(self, quote_asset=None):
)
return order

def add_order(self, order: entities.Order, quantity: Decimal):
def add_order(self, order: entities.Order, quantity: Decimal = Decimal(0)):
increment = quantity if order.side == "buy" else -quantity
self._quantity += Decimal(increment)
if order not in self.orders:
Expand Down
1 change: 1 addition & 0 deletions lumibot/strategies/_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def __init__(
self.strategy_id = strategy_id

self._quote_asset = quote_asset
self.broker.quote_assets.add(self._quote_asset)

# Setting the broker object
self._is_backtesting = self.broker.IS_BACKTESTING_BROKER
Expand Down
1 change: 1 addition & 0 deletions lumibot/strategies/strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def quote_asset(self):
@quote_asset.setter
def quote_asset(self, value):
self._quote_asset = value
self.broker.quote_assets.add(value)

@property
def last_on_trading_iteration_datetime(self):
Expand Down
46 changes: 12 additions & 34 deletions lumibot/strategies/strategy_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from apscheduler.triggers.cron import CronTrigger
from termcolor import colored

from lumibot.entities import Asset
from lumibot.tools import append_locals, get_trading_days, staticdecorator


Expand Down Expand Up @@ -119,9 +120,6 @@ def sync_broker(self):
if cash_broker is not None:
self.strategy._set_cash_position(cash_broker)

positions_broker = self.broker._pull_positions(self.strategy)
orders_broker = self.broker._pull_all_orders(self.name, self.strategy)

held_trades_len = len(self.broker._held_trades)
if held_trades_len > 0:
self.broker._hold_trade_events = False
Expand All @@ -132,37 +130,10 @@ def sync_broker(self):
# Update Lumibot positions to match broker positions.
# Any new trade notifications will not affect the sync as they
# are being held pending the completion of the sync.
for position in positions_broker:
# Check against existing position.
position_lumi = [
pos_lumi
for pos_lumi in self.broker._filled_positions.get_list()
if pos_lumi.asset == position.asset
]
position_lumi = position_lumi[0] if len(position_lumi) > 0 else None

if position_lumi:
# Compare to existing lumi position.
if position_lumi.quantity != position.quantity:
position_lumi.quantity = position.quantity
else:
# Add to positions in lumibot, position does not exist
# in lumibot.
if position.quantity != 0:
self.broker._filled_positions.append(position)

# Now iterate through lumibot positions.
# Remove lumibot position if not at the broker.
for position in self.broker._filled_positions.get_list():
found = False
for position_broker in positions_broker:
if position_broker.asset == position.asset:
found = True
break
if not found and position.asset != self.strategy.quote_asset:
self.broker._filled_positions.remove(position)
self.broker.sync_positions(self.strategy)

# ORDERS
orders_broker = self.broker._pull_all_orders(self.name, self.strategy)
if len(orders_broker) > 0:
orders_lumi = self.broker.get_all_orders()

Expand Down Expand Up @@ -450,8 +421,15 @@ def _on_filled_order(self, position, order, price, quantity, multiplier):
# Get the portfolio value
portfolio_value = self.strategy.get_portfolio_value()

# Calculate the percent of the portfolio that this order represents
percent_of_portfolio = (price * float(quantity)) / portfolio_value
# Calculate the value of the position
order_value = price * float(quantity)

# If option, multiply % of portfolio by 100
if order.asset.asset_type == Asset.AssetType.OPTION:
order_value = order_value * 100

# Calculate the percent of the portfolio that this position represents
percent_of_portfolio = order_value / portfolio_value

# Capitalize the side
side = order.side.capitalize()
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ scipy==1.10.1 # Newer versions of scipy are currently causing issues
ipython # required for quantstats, but not in their dependency list for some reason
quantstats-lumi
python-dotenv # Secret Storage
ccxt
ccxt==4.2.22
termcolor
jsonpickle
apscheduler
Expand Down
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

setuptools.setup(
name="lumibot",
version="3.1.14",
version="3.2.0",
author="Robert Grzesik",
author_email="rob@lumiwealth.com",
description="Backtesting and Trading Library, Made by Lumiwealth",
Expand Down Expand Up @@ -37,9 +37,9 @@
"pytest",
"scipy==1.10.1", # Newer versions of scipy are currently causing issues
"ipython", # required for quantstats, but not in their dependency list for some reason
"quantstats-lumi>=0.1.9",
"quantstats-lumi>=0.1.10",
"python-dotenv", # Secret Storage
"ccxt==4.2.22",
"ccxt==4.2.50",
"termcolor",
"jsonpickle",
"apscheduler==3.10.4",
Expand Down
Loading