Skip to content

Commit

Permalink
c
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Jul 26, 2024
1 parent 2ae47fc commit e343fd0
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 13 deletions.
30 changes: 25 additions & 5 deletions crates/polars-io/src/cloud/object_store_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,28 @@ fn url_and_creds_to_key(url: &Url, options: Option<&CloudOptions>) -> String {
)
}

/// Construct an object_store `Path` from a string without any encoding/decoding.
pub fn object_path_from_str(path: &str) -> PolarsResult<object_store::path::Path> {
/// Construct an object_store `Path` from a string:
/// * Local paths have leading slashes removed - i.e. `/data/1.csv` -> `data/1.csv`
/// * Cloud paths have `fs://` removed - i.e. `s3://data/1.csv` -> `data/1.csv`
/// * HTTP paths return an empty `Path` - i.e. `https://pola.rs/1.csv` -> ``
/// * This is because for HTTP, the path is bound to the object store.
pub fn new_object_path(path: &str) -> PolarsResult<object_store::path::Path> {
let path = if let Some(i) = path.find("://") {
// This is hit because the user requests `glob=False`, the raw path is
// given and we need to strip the leading `://`.
if path.starts_with("http://") || path.starts_with("https://") {
""
} else {
&path[i + 3..]
}
} else if path.starts_with('/') {
// `glob=False` and `FORCE_ASYNC`.
&path[1..]
} else {
// `glob=True`, the caller context gave us a parsed CloudLocation prefix.
path
};

object_store::path::Path::parse(path).map_err(to_compute_err)
}

Expand Down Expand Up @@ -132,11 +152,11 @@ pub async fn build_object_store(

mod test {
#[test]
fn test_object_path_from_str() {
use super::object_path_from_str;
fn test_new_object_path() {
use super::new_object_path;

let path = "%25";
let out = object_path_from_str(path).unwrap();
let out = new_object_path(path).unwrap();

assert_eq!(out.as_ref(), path);
}
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/file_cache/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use polars_error::{PolarsError, PolarsResult};
use super::cache::{get_env_file_cache_ttl, FILE_CACHE};
use super::entry::FileCacheEntry;
use super::file_fetcher::{CloudFileFetcher, LocalFileFetcher};
use crate::cloud::{build_object_store, object_path_from_str, CloudOptions, PolarsObjectStore};
use crate::cloud::{build_object_store, new_object_path, CloudOptions, PolarsObjectStore};
use crate::path_utils::{ensure_directory_init, is_cloud_url, POLARS_TEMP_DIR_BASE_PATH};
use crate::pl_async;

Expand Down Expand Up @@ -88,7 +88,7 @@ pub fn init_entries_from_uri_list(
FILE_CACHE.init_entry(
uri.clone(),
|| {
let cloud_path = object_path_from_str(uri)?;
let cloud_path = new_object_path(uri)?;

let object_store =
object_stores[std::cmp::min(i, object_stores.len())].clone();
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/ipc/ipc_reader_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use polars_core::frame::DataFrame;
use polars_core::schema::Schema;
use polars_error::{polars_bail, polars_err, to_compute_err, PolarsResult};

use crate::cloud::{build_object_store, object_path_from_str, CloudOptions, PolarsObjectStore};
use crate::cloud::{build_object_store, new_object_path, CloudOptions, PolarsObjectStore};
use crate::file_cache::{init_entries_from_uri_list, FileCacheEntry};
use crate::predicates::PhysicalIoExpr;
use crate::prelude::{materialize_projection, IpcReader};
Expand Down Expand Up @@ -67,7 +67,7 @@ impl IpcReaderAsync {
let cache_entry = init_entries_from_uri_list(&[Arc::from(uri)], cloud_options)?[0].clone();
let (_, store) = build_object_store(uri, cloud_options).await?;

let path = object_path_from_str(uri)?;
let path = new_object_path(uri)?;

Ok(Self {
store: PolarsObjectStore::new(store),
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/read/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tokio::sync::Mutex;
use super::mmap::ColumnStore;
use super::predicates::read_this_row_group;
use super::read_impl::compute_row_group_range;
use crate::cloud::{build_object_store, object_path_from_str, CloudOptions, PolarsObjectStore};
use crate::cloud::{build_object_store, new_object_path, CloudOptions, PolarsObjectStore};
use crate::parquet::metadata::FileMetaDataRef;
use crate::pl_async::get_runtime;
use crate::predicates::PhysicalIoExpr;
Expand All @@ -39,7 +39,7 @@ impl ParquetObjectStore {
) -> PolarsResult<Self> {
let (_, store) = build_object_store(uri, options).await?;

let path = object_path_from_str(uri)?;
let path = new_object_path(uri)?;

Ok(ParquetObjectStore {
store: PolarsObjectStore::new(store),
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/path_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ pub fn expand_paths_hive(
{
use polars_utils::_limit_path_len_io_err;

use crate::cloud::object_path_from_str;
use crate::cloud::new_object_path;

if first_path.starts_with("hf://") {
let (expand_start_idx, paths) =
Expand All @@ -174,7 +174,7 @@ pub fn expand_paths_hive(
let (cloud_location, store) =
crate::cloud::build_object_store(path, cloud_options).await?;

let prefix = object_path_from_str(if glob {
let prefix = new_object_path(if glob {
&cloud_location.prefix
} else {
// No-glob requested, we need to keep the glob chars
Expand Down

0 comments on commit e343fd0

Please sign in to comment.