Skip to content

Commit

Permalink
Add Databento continuous symbology support
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Mar 21, 2024
1 parent 387cbdb commit 5e8ba20
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 45 deletions.
3 changes: 2 additions & 1 deletion RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
Released on TBD (UTC).

### Enhancements
- Added Databento adapter continuous symbology support (will infer from symbols)
- Added `DatabaseConfig.timeout` config option for timeout seconds to wait for a new connection
- Added CSV tick and bar data loaders params, thanks @rterbush
- Added CSV tick and bar data loader params, thanks @rterbush
- Implemented `LogGuard` to ensure global logger is flushed on termination, thanks @ayush-sb and @twitu
- Improved Binance execution client ping listen key error handling and logging
- Improved Redis cache adapter and message bus error handling and logging
Expand Down
6 changes: 3 additions & 3 deletions examples/live/databento/databento_subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
# For correct subscription operation, you must specify all instruments to be immediately
# subscribed for as part of the data client configuration
instrument_ids = [
InstrumentId.from_str("ESM4.GLBX"),
# InstrumentId.from_str("ESU4.GLBX"),
# InstrumentId.from_str("ESM4.GLBX"),
InstrumentId.from_str("ES.c.0.GLBX"),
# InstrumentId.from_str("AAPL.XNAS"),
]

Expand Down Expand Up @@ -153,7 +153,7 @@ def on_start(self) -> None:
# )

self.subscribe_quote_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
# self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
self.subscribe_trade_ticks(instrument_id, client_id=DATABENTO_CLIENT_ID)
# self.request_quote_ticks(instrument_id)
# self.request_trade_ticks(instrument_id)

Expand Down
93 changes: 93 additions & 0 deletions nautilus_core/adapters/src/databento/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,96 @@ pub fn get_date_time_range(start: UnixNanos, end: UnixNanos) -> anyhow::Result<D
OffsetDateTime::from_unix_timestamp_nanos(i128::from(end))?,
)))
}

/// Infers the Databento symbology type name from the shape of `symbol`.
///
/// Returns:
/// - `"parent"` when the symbol ends with `.FUT` or `.OPT` (e.g. `ES.FUT`).
/// - `"continuous"` when the symbol has exactly three dot-separated parts and
///   the last part is a non-empty run of ASCII digits (e.g. `ES.c.0`).
/// - `"raw_symbol"` for everything else.
pub fn infer_symbology_type(symbol: &str) -> String {
    if symbol.ends_with(".FUT") || symbol.ends_with(".OPT") {
        return "parent".to_string();
    }

    // Pull at most four parts; a fourth `next()` returning `None` proves there
    // were exactly three, without allocating a `Vec`.
    let mut parts = symbol.split('.');
    if let (Some(_root), Some(_rule), Some(rank), None) =
        (parts.next(), parts.next(), parts.next(), parts.next())
    {
        // `all` is vacuously true for an empty string, so a trailing dot
        // (e.g. "ES.c.") must be rejected explicitly.
        if !rank.is_empty() && rank.bytes().all(|b| b.is_ascii_digit()) {
            return "continuous".to_string();
        }
    }

    "raw_symbol".to_string()
}

/// Checks that every symbol in `symbols` infers to the same symbology type.
///
/// # Errors
///
/// Returns an error if `symbols` is empty, or if any symbol infers to a
/// different symbology type than the first symbol (the message names both
/// symbols and their inferred types).
pub fn check_consistent_symbology(symbols: &[&str]) -> anyhow::Result<()> {
    let (head, tail) = match symbols.split_first() {
        Some(pair) => pair,
        None => return Err(anyhow::anyhow!("Symbols was empty")),
    };
    let expected = infer_symbology_type(head);

    // The first symbol trivially agrees with itself, so only the remainder
    // needs to be compared against it.
    let mismatch = tail
        .iter()
        .map(|candidate| (candidate, infer_symbology_type(candidate)))
        .find(|(_, stype)| *stype != expected);

    match mismatch {
        Some((candidate, stype)) => Err(anyhow::anyhow!(
            "Inconsistent symbology types: '{}' for {} vs '{}' for {}",
            expected,
            head,
            stype,
            candidate
        )),
        None => Ok(()),
    }
}

////////////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod tests {
    use rstest::*;

    use super::*;

    // Each case pairs a symbol with the symbology type it should infer to.
    #[rstest]
    #[case("AAPL", "raw_symbol")]
    #[case("ESM4", "raw_symbol")]
    #[case("BRN FMM0024!", "raw_symbol")]
    #[case("BRN 99 5617289", "raw_symbol")]
    #[case("SPY 240319P00511000", "raw_symbol")]
    #[case("ES.FUT", "parent")]
    #[case("ES.OPT", "parent")]
    #[case("BRN.FUT", "parent")]
    #[case("SPX.OPT", "parent")]
    #[case("ES.c.0", "continuous")]
    #[case("SPX.n.0", "continuous")]
    fn test_infer_symbology_type(#[case] symbol: String, #[case] expected: String) {
        let result = infer_symbology_type(&symbol);
        assert_eq!(result, expected);
    }

    #[rstest]
    fn test_check_consistent_symbology_when_empty_symbols() {
        let symbols: Vec<&str> = vec![];
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_err());
        assert_eq!(result.err().unwrap().to_string(), "Symbols was empty");
    }

    #[rstest]
    fn test_check_consistent_symbology_when_inconsistent() {
        let symbols = vec!["ESM4", "ES.OPT"];
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_err());
        assert_eq!(
            result.err().unwrap().to_string(),
            "Inconsistent symbology types: 'raw_symbol' for ESM4 vs 'parent' for ES.OPT"
        );
    }

    // Each case must hold several separate symbols; a single comma-joined
    // string would be one symbol and make the consistency check trivial.
    #[rstest]
    #[case(vec!["AAPL", "MSFT"])]
    #[case(vec!["ES.OPT", "ES.FUT"])]
    #[case(vec!["ES.c.0", "ES.c.1"])]
    fn test_check_consistent_symbology_when_consistent(#[case] symbols: Vec<&str>) {
        let result = check_consistent_symbology(&symbols);
        assert!(result.is_ok());
    }
}
39 changes: 30 additions & 9 deletions nautilus_core/adapters/src/databento/python/historical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
// limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{fs, num::NonZeroU64, sync::Arc};
use std::{fs, num::NonZeroU64, str::FromStr, sync::Arc};

use databento::{dbn, historical::timeseries::GetRangeParams};
use databento::{
dbn::{self, SType},
historical::timeseries::GetRangeParams,
};
use indexmap::IndexMap;
use nautilus_core::{
python::to_pyvalue_err,
Expand All @@ -36,7 +39,7 @@ use tokio::sync::Mutex;

use super::loader::convert_instrument_to_pyobject;
use crate::databento::{
common::get_date_time_range,
common::{check_consistent_symbology, get_date_time_range, infer_symbology_type},
decode::{
decode_imbalance_msg, decode_instrument_def_msg, decode_record, decode_statistics_msg,
raw_ptr_to_ustr,
Expand Down Expand Up @@ -110,19 +113,22 @@ impl DatabentoHistoricalClient {
&self,
py: Python<'py>,
dataset: String,
symbols: String,
symbols: Vec<&str>,
start: UnixNanos,
end: Option<UnixNanos>,
limit: Option<u64>,
) -> PyResult<&'py PyAny> {
let client = self.inner.clone();

let stype_in = infer_symbology_type(symbols.first().unwrap());
check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
let end = end.unwrap_or(self.clock.get_time_ns());
let time_range = get_date_time_range(start, end).map_err(to_pyvalue_err)?;
let params = GetRangeParams::builder()
.dataset(dataset)
.date_time_range(time_range)
.symbols(symbols)
.stype_in(SType::from_str(&stype_in).map_err(to_pyvalue_err)?)
.schema(dbn::Schema::Definition)
.limit(limit.and_then(NonZeroU64::new))
.build();
Expand Down Expand Up @@ -175,19 +181,22 @@ impl DatabentoHistoricalClient {
&self,
py: Python<'py>,
dataset: String,
symbols: String,
symbols: Vec<&str>,
start: UnixNanos,
end: Option<UnixNanos>,
limit: Option<u64>,
) -> PyResult<&'py PyAny> {
let client = self.inner.clone();

let stype_in = infer_symbology_type(symbols.first().unwrap());
check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
let end = end.unwrap_or(self.clock.get_time_ns());
let time_range = get_date_time_range(start, end).map_err(to_pyvalue_err)?;
let params = GetRangeParams::builder()
.dataset(dataset)
.date_time_range(time_range)
.symbols(symbols)
.stype_in(SType::from_str(&stype_in).map_err(to_pyvalue_err)?)
.schema(dbn::Schema::Mbp1)
.limit(limit.and_then(NonZeroU64::new))
.build();
Expand Down Expand Up @@ -239,19 +248,22 @@ impl DatabentoHistoricalClient {
&self,
py: Python<'py>,
dataset: String,
symbols: String,
symbols: Vec<&str>,
start: UnixNanos,
end: Option<UnixNanos>,
limit: Option<u64>,
) -> PyResult<&'py PyAny> {
let client = self.inner.clone();

let stype_in = infer_symbology_type(symbols.first().unwrap());
check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
let end = end.unwrap_or(self.clock.get_time_ns());
let time_range = get_date_time_range(start, end).map_err(to_pyvalue_err)?;
let params = GetRangeParams::builder()
.dataset(dataset)
.date_time_range(time_range)
.symbols(symbols)
.stype_in(SType::from_str(&stype_in).map_err(to_pyvalue_err)?)
.schema(dbn::Schema::Trades)
.limit(limit.and_then(NonZeroU64::new))
.build();
Expand Down Expand Up @@ -304,14 +316,16 @@ impl DatabentoHistoricalClient {
&self,
py: Python<'py>,
dataset: String,
symbols: String,
symbols: Vec<&str>,
aggregation: BarAggregation,
start: UnixNanos,
end: Option<UnixNanos>,
limit: Option<u64>,
) -> PyResult<&'py PyAny> {
let client = self.inner.clone();

let stype_in = infer_symbology_type(symbols.first().unwrap());
check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
let schema = match aggregation {
BarAggregation::Second => dbn::Schema::Ohlcv1S,
BarAggregation::Minute => dbn::Schema::Ohlcv1M,
Expand All @@ -325,6 +339,7 @@ impl DatabentoHistoricalClient {
.dataset(dataset)
.date_time_range(time_range)
.symbols(symbols)
.stype_in(SType::from_str(&stype_in).map_err(to_pyvalue_err)?)
.schema(schema)
.limit(limit.and_then(NonZeroU64::new))
.build();
Expand Down Expand Up @@ -377,19 +392,22 @@ impl DatabentoHistoricalClient {
&self,
py: Python<'py>,
dataset: String,
symbols: String,
symbols: Vec<&str>,
start: UnixNanos,
end: Option<UnixNanos>,
limit: Option<u64>,
) -> PyResult<&'py PyAny> {
let client = self.inner.clone();

let stype_in = infer_symbology_type(symbols.first().unwrap());
check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
let end = end.unwrap_or(self.clock.get_time_ns());
let time_range = get_date_time_range(start, end).map_err(to_pyvalue_err)?;
let params = GetRangeParams::builder()
.dataset(dataset)
.date_time_range(time_range)
.symbols(symbols)
.stype_in(SType::from_str(&stype_in).map_err(to_pyvalue_err)?)
.schema(dbn::Schema::Imbalance)
.limit(limit.and_then(NonZeroU64::new))
.build();
Expand Down Expand Up @@ -431,19 +449,22 @@ impl DatabentoHistoricalClient {
&self,
py: Python<'py>,
dataset: String,
symbols: String,
symbols: Vec<&str>,
start: UnixNanos,
end: Option<UnixNanos>,
limit: Option<u64>,
) -> PyResult<&'py PyAny> {
let client = self.inner.clone();

let stype_in = infer_symbology_type(symbols.first().unwrap());
check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
let end = end.unwrap_or(self.clock.get_time_ns());
let time_range = get_date_time_range(start, end).map_err(to_pyvalue_err)?;
let params = GetRangeParams::builder()
.dataset(dataset)
.date_time_range(time_range)
.symbols(symbols)
.stype_in(SType::from_str(&stype_in).map_err(to_pyvalue_err)?)
.schema(dbn::Schema::Statistics)
.limit(limit.and_then(NonZeroU64::new))
.build();
Expand Down
8 changes: 4 additions & 4 deletions nautilus_core/adapters/src/databento/python/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use tracing::{debug, error, trace};

use super::loader::convert_instrument_to_pyobject;
use crate::databento::{
common::{check_consistent_symbology, infer_symbology_type},
live::{DatabentoFeedHandler, LiveCommand, LiveMessage},
types::DatabentoPublisher,
};
Expand Down Expand Up @@ -150,12 +151,11 @@ impl DatabentoLiveClient {
fn py_subscribe(
&mut self,
schema: String,
symbols: String,
stype_in: Option<String>,
symbols: Vec<&str>,
start: Option<UnixNanos>,
) -> PyResult<()> {
let stype_in = stype_in.unwrap_or("raw_symbol".to_string());

let stype_in = infer_symbology_type(symbols.first().unwrap());
check_consistent_symbology(symbols.as_slice()).map_err(to_pyvalue_err)?;
let mut sub = Subscription::builder()
.symbols(symbols)
.schema(dbn::Schema::from_str(&schema).map_err(to_pyvalue_err)?)
Expand Down
Loading

0 comments on commit 5e8ba20

Please sign in to comment.