Skip to content

Commit

Permalink
Merge pull request #238 from splitgraph/remote-table-seafowl-workspace
Browse files Browse the repository at this point in the history
Factor out remote tables into a separate crate
  • Loading branch information
gruuya authored Dec 9, 2022
2 parents c381d9b + f3222d2 commit 5d9fbfd
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ jobs:
# TODO split tests into unit and integration (one requires postgres?)
- name: Run tests
run: |
cargo test
cargo test --workspace
env:
# database URL for end-to-end + postgres repository tests
DATABASE_URL: "postgres://postgres:postgres@postgres:5432/db_test"
Expand Down
19 changes: 18 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
[workspace]

[package]
name = "seafowl"
build = "build.rs"
Expand Down Expand Up @@ -40,9 +42,6 @@ chrono = { version = "0.4", default_features = false }
clap = { version = "3.2.19", features = [ "derive" ] }
config = "0.13.1"

# Remote query execution for a variety of DBs
connectorx = { git = "https://github.com/sfu-db/connector-x", rev = "9714c54dc7f6fb7ac3dd6f944e0a43913bd1c2d9", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }

# PG wire protocol support
convergence = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-14-upgrade", optional = true }
convergence-arrow = { git = "https://github.com/splitgraph/convergence", branch = "datafusion-14-upgrade", package = "convergence-arrow", optional = true }
Expand All @@ -51,6 +50,8 @@ datafusion = "14.0.0"
datafusion-expr = "14.0.0"
datafusion-proto = "14.0.0"

datafusion-remote-tables = { path = "./datafusion_remote_tables" }

futures = "0.3"
hex = ">=0.4.0"
itertools = ">=0.10.0"
Expand Down Expand Up @@ -94,6 +95,7 @@ arrow = { git = "https://github.com/apache/arrow-rs", rev = "951caed784876d15a9e
arrow-buffer = { git = "https://github.com/apache/arrow-rs", rev = "951caed784876d15a9e712a5981de31cee4e3085" }
arrow-integration-test = { git = "https://github.com/apache/arrow-rs", rev = "951caed784876d15a9e712a5981de31cee4e3085" }
arrow-schema = { git = "https://github.com/apache/arrow-rs", rev = "951caed784876d15a9e712a5981de31cee4e3085" }
connectorx = { git = "https://github.com/sfu-db/connector-x", rev = "9714c54dc7f6fb7ac3dd6f944e0a43913bd1c2d9", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }

[dev-dependencies]
assert_unordered = "0.3"
Expand Down
31 changes: 31 additions & 0 deletions datafusion_remote_tables/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
[package]
name = "datafusion-remote-tables"
version = "0.1.0"
edition = "2021"
readme = "README.md"
description = "TableProvider implementation able to query various remote data sources"
authors = ["Splitgraph, Inc. <support@splitgraph.com>"]
keywords = ["datafusion", "remote", "tables"]
homepage = "https://github.com/splitgraph/seafowl"
repository = "https://github.com/splitgraph/seafowl"
license = "Apache-2.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = "26.0.0"
arrow-buffer = "26.0.0"
arrow-schema = "26.0.0"
async-trait = "0.1.41"

# Remote query execution for a variety of DBs
connectorx = { version = "0.3.1", features = [ "dst_arrow", "src_postgres", "src_mysql", "src_sqlite" ] }

datafusion = "14.0.0"
datafusion-expr = "14.0.0"
itertools = ">=0.10.0"
log = "0.4"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "signal", "process"] }

[dev-dependencies]
rstest = "*"
71 changes: 71 additions & 0 deletions datafusion_remote_tables/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
## DataFusion remote table querying mechanism

This crate provides support for creating DataFusion tables that proxy all the queries to an external
table stored in a remote data source, in analogy with Postgres FDWs.

It does so by implementing the TableProvider trait, whereby a SQL query corresponding to the remote
data source is re-constructed from the provided primitives (projections, filters, etc.). This query
is then executed on the remote data source using the
[connector-x](https://docs.rs/connectorx/latest/connectorx/) library.

## Usage

To create a remote table one needs to provide the exact name of the table on the remote data source
(escaping the identifier in the remote-compatible fashion if needed), and the corresponding
connection string.

In addition, an optional schema reference can be supplied; if the schema is empty, it will be
inferred upon instantiation; otherwise, any column whose type doesn't match the specified one
will be cast accordingly on each query.

```rust
use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion_remote_tables::provider::RemoteTable;

let origin_table_name = "\"splitgraph-demo/seafowl:latest\".issue_reactions";
let conn_str = "postgresql://$SPLITGRAPH_API_KEY:$SPLITGRAPH_API_SECRET@data.splitgraph.com:5432/ddn";
let remote_table = RemoteTable::new(
origin_table_name.to_string(),
conn_str.to_string(),
SchemaRef::from(Schema::empty()),
)
.await?;
```

Next, you need to register the table in DataFusion (either in a `SchemaProvider` or
`SessionContext`) using a table name that will be used for querying, for instance:

```rust
...
schema_provider.register_table(
"seafowl_issue_reactions".to_string(),
Arc::new(remote_table),
);
...
```

Finally, you can query the table:

```sql
SELECT content FROM my_schema.seafowl_issue_reactions
WHERE issue_number = 122 OR created_at >= '2022-10-26 19:18:10' LIMIT 2;
+---------+
| content |
|---------|
| hooray |
| heart |
+---------+
```

## Filter pushdown

Besides projection pushdown (i.e. scoping down the required columns for each particular query), the
remote table mechanism also supports filter pushdown. This pushdown mechanism aims to convert the
`WHERE` (and `LIMIT` if any) clause to the corresponding remote SQL string. It does so by walking
the filter expression AST, trying to determine whether each node in the tree is shippable and if so
convert it to a SQL form native to the remote data source.

This can provide a significant performance increase, as it can drastically reduce the total amount of
data that has to be transferred over the network. When a filter is not shippable, it is kept in the
DataFusion plan so that the appropriate filtration can be performed locally after fetching the
result rows.
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ mod tests {
use datafusion::scalar::ScalarValue;
use rstest::rstest;

use crate::remote_tables::pushdown_visitor::{
use crate::filter_pushdown::{
filter_expr_to_sql, MySQLFilterPushdown, PostgresFilterPushdown,
SQLiteFilterPushdown,
};
Expand Down
2 changes: 2 additions & 0 deletions datafusion_remote_tables/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod filter_pushdown;
pub mod provider;
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::remote_tables::pushdown_visitor::{
use crate::filter_pushdown::{
filter_expr_to_sql, quote_identifier_backticks, quote_identifier_double_quotes,
MySQLFilterPushdown, PostgresFilterPushdown, SQLiteFilterPushdown,
};
Expand Down
2 changes: 1 addition & 1 deletion src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ use datafusion::datasource::listing::ListingOptions;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::SessionState;
use datafusion::execution::DiskManager;
use datafusion_remote_tables::provider::RemoteTable;

use datafusion_proto::protobuf;

use crate::datafusion::parser::{DFParser, Statement as DFStatement};
use crate::datafusion::utils::{build_schema, normalize_ident};
use crate::object_store::http::try_prepare_http_url;
use crate::object_store::wrapped::InternalObjectStore;
use crate::remote_tables::provider::RemoteTable;
use crate::utils::{gc_partitions, group_partitions, hash_file};
use crate::wasm_udf::wasm::create_udf_from_wasm;
use futures::{StreamExt, TryStreamExt};
Expand Down
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub mod frontend;
pub mod nodes;
pub mod object_store;
pub mod provider;
pub mod remote_tables;
pub mod repository;
pub mod schema;
pub mod system_tables;
Expand Down
2 changes: 0 additions & 2 deletions src/remote_tables/mod.rs

This file was deleted.

0 comments on commit 5d9fbfd

Please sign in to comment.