Skip to content

Commit

Permalink
fix: Use ETag for HTTP file cache invalidation (#17684)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Jul 17, 2024
1 parent 93dedd8 commit 66d0959
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 20 deletions.
6 changes: 1 addition & 5 deletions crates/polars-io/src/file_cache/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,7 @@ impl FileCache {
}
}

let uri_hash = blake3::hash(uri.as_bytes())
.to_hex()
.get(..32)
.unwrap()
.to_string();
let uri_hash = blake3::hash(uri.as_bytes()).to_hex()[..32].to_string();

{
let mut entries = self.entries.write().unwrap();
Expand Down
32 changes: 22 additions & 10 deletions crates/polars-io/src/file_cache/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use polars_utils::flatten;
use super::cache_lock::{self, GLOBAL_FILE_CACHE_LOCK};
use super::file_fetcher::{FileFetcher, RemoteMetadata};
use super::file_lock::{FileLock, FileLockAnyGuard};
use super::metadata::EntryMetadata;
use super::metadata::{EntryMetadata, FileVersion};
use super::utils::update_last_accessed;

pub(super) const DATA_PREFIX: u8 = b'd';
Expand Down Expand Up @@ -43,8 +43,7 @@ pub struct FileCacheEntry(EntryData);

impl EntryMetadata {
fn matches_remote_metadata(&self, remote_metadata: &RemoteMetadata) -> bool {
self.remote_last_modified == remote_metadata.last_modified
&& self.local_size == remote_metadata.size
self.remote_version == remote_metadata.version && self.local_size == remote_metadata.size
}
}

Expand Down Expand Up @@ -131,17 +130,17 @@ impl Inner {

if verbose {
eprintln!(
"[file_cache::entry] try_open_check_latest: fetching new data file for uri = {}, remote_last_modified = {}, remote_size = {}",
"[file_cache::entry] try_open_check_latest: fetching new data file for uri = {}, remote_version = {:?}, remote_size = {}",
self.uri.clone(),
remote_metadata.last_modified,
remote_metadata.version,
remote_metadata.size
);
}

let data_file_path = &get_data_file_path(
self.path_prefix.to_str().unwrap().as_bytes(),
self.uri_hash.as_bytes(),
remote_metadata.last_modified,
&remote_metadata.version,
);
// Remove the file if it exists, since it doesn't match the metadata.
// This could be left from an aborted process.
Expand Down Expand Up @@ -186,7 +185,7 @@ impl Inner {
let metadata = Arc::make_mut(&mut metadata);
metadata.local_last_modified = local_last_modified;
metadata.local_size = local_size;
metadata.remote_last_modified = remote_metadata.last_modified;
metadata.remote_version = remote_metadata.version.clone();

if let Err(e) = metadata.compare_local_state(data_file_path) {
panic!("metadata mismatch after file fetch: {}", e);
Expand Down Expand Up @@ -259,7 +258,7 @@ impl Inner {
let data_file_path = get_data_file_path(
self.path_prefix.to_str().unwrap().as_bytes(),
self.uri_hash.as_bytes(),
metadata.remote_last_modified,
&metadata.remote_version,
);
self.cached_data = Some(CachedData {
last_modified,
Expand Down Expand Up @@ -365,13 +364,26 @@ fn finish_open<F: FileLockAnyGuard>(data_file_path: &Path, _metadata_guard: &F)
}

/// `[prefix]/d/[uri hash][last modified]`
fn get_data_file_path(path_prefix: &[u8], uri_hash: &[u8], remote_last_modified: u64) -> PathBuf {
fn get_data_file_path(
path_prefix: &[u8],
uri_hash: &[u8],
remote_version: &FileVersion,
) -> PathBuf {
let owned;
let path = flatten(
&[
path_prefix,
&[b'/', DATA_PREFIX, b'/'],
uri_hash,
format!("{:013x}", remote_last_modified).as_bytes(),
match remote_version {
FileVersion::Timestamp(v) => {
owned = Some(format!("{:013x}", v));
owned.as_deref().unwrap()
},
FileVersion::ETag(v) => v.as_str(),
FileVersion::Uninitialized => panic!("impl error: version not initialized"),
}
.as_bytes(),
],
None,
);
Expand Down
12 changes: 9 additions & 3 deletions crates/polars-io/src/file_cache/file_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use polars_error::{PolarsError, PolarsResult};

use super::metadata::FileVersion;
use super::utils::last_modified_u64;
use crate::cloud::PolarsObjectStore;
use crate::pl_async;
Expand All @@ -16,7 +17,7 @@ pub trait FileFetcher: Send + Sync {

pub struct RemoteMetadata {
pub size: u64,
pub last_modified: u64,
pub(super) version: FileVersion,
}

/// A struct that fetches data from local disk and stores it into the `cache`.
Expand Down Expand Up @@ -59,7 +60,7 @@ impl FileFetcher for LocalFileFetcher {

Ok(RemoteMetadata {
size: metadata.len(),
last_modified: last_modified_u64(&metadata),
version: FileVersion::Timestamp(last_modified_u64(&metadata)),
})
}

Expand Down Expand Up @@ -97,7 +98,12 @@ impl FileFetcher for CloudFileFetcher {

Ok(RemoteMetadata {
size: metadata.size as u64,
last_modified: metadata.last_modified.timestamp_millis() as u64,
version: metadata
.e_tag
.map(|x| FileVersion::ETag(blake3::hash(x.as_bytes()).to_hex()[..32].to_string()))
.unwrap_or_else(|| {
FileVersion::Timestamp(metadata.last_modified.timestamp_millis() as u64)
}),
})
}

Expand Down
11 changes: 9 additions & 2 deletions crates/polars-io/src/file_cache/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ use std::sync::Arc;

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub(super) enum FileVersion {
Timestamp(u64),
ETag(String),
Uninitialized,
}

#[derive(Debug)]
pub enum LocalCompareError {
LastModifiedMismatch { expected: u64, actual: u64 },
Expand All @@ -18,7 +25,7 @@ pub(super) struct EntryMetadata {
pub(super) uri: Arc<str>,
pub(super) local_last_modified: u64,
pub(super) local_size: u64,
pub(super) remote_last_modified: u64,
pub(super) remote_version: FileVersion,
/// TTL since last access, in seconds.
pub(super) ttl: u64,
}
Expand Down Expand Up @@ -47,7 +54,7 @@ impl EntryMetadata {
uri,
local_last_modified: 0,
local_size: 0,
remote_last_modified: 0,
remote_version: FileVersion::Uninitialized,
ttl,
}
}
Expand Down

0 comments on commit 66d0959

Please sign in to comment.