Skip to content

Commit

Permalink
feat(async): add extra async versions of APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
zkat committed Sep 1, 2019
1 parent 97f4457 commit 7a0f904
Show file tree
Hide file tree
Showing 13 changed files with 1,350 additions and 204 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
rust: [stable, beta, nightly]
rust: [nightly]
os: [ubuntu-latest, windows-latest]

steps:
Expand All @@ -21,6 +21,7 @@ jobs:
- name: Build
run: cargo build --verbose
- name: Clippy
run: cargo clippy -- -D warnings
# TODO - add -D warnings back in after async-std publishes task::blocking
run: cargo clippy # -- -D warnings
- name: Run tests
run: cargo test --verbose
695 changes: 500 additions & 195 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ failure = "0.1.5"
walkdir = "2.2.7"
either = "1.5.2"
mkdirp = "1.0.0"
async-std = { git = "https://github.com/passcod/async-std-unstable", branch = "unstable" }
# async-std = { git = "https://github.com/vertexclique/async-std", branch = "ema-based-statistically-adaptive-thread-pool"}
futures-preview = "0.3.0-alpha.18"

[target.'cfg(unix)'.dependencies]
chownr = "2.0.0"
Expand Down
49 changes: 46 additions & 3 deletions benches/benchmarks.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,59 @@
use async_std::task;
use cacache;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use tempfile;

fn get(c: &mut Criterion) {
fn read_hash(c: &mut Criterion) {
let tmp = tempfile::tempdir().unwrap();
let cache = tmp.path().to_owned();
let data = b"hello world".to_vec();
let sri = cacache::put::data(&cache, "hello", data).unwrap();
c.bench_function("read_hash", move |b| {
b.iter(|| cacache::get::read_hash(black_box(&cache), black_box(&sri)))
b.iter(|| cacache::get::read_hash(black_box(&cache), black_box(&sri)).unwrap())
});
}

criterion_group!(benches, get);
fn read(c: &mut Criterion) {
let tmp = tempfile::tempdir().unwrap();
let cache = tmp.path().to_owned();
let data = b"hello world".to_vec();
cacache::put::data(&cache, "hello", data).unwrap();
cacache::get::read(&cache, "hello").unwrap();
c.bench_function("read", move |b| {
b.iter(|| cacache::get::read(black_box(&cache), black_box(String::from("hello"))).unwrap())
});
}

fn async_read_hash(c: &mut Criterion) {
let tmp = tempfile::tempdir().unwrap();
let cache = tmp.path().to_owned();
let data = b"hello world".to_vec();
let sri = cacache::put::data(&cache, "hello", data).unwrap();
c.bench_function("async_read_hash", move |b| {
b.iter(|| {
task::block_on(cacache::async_get::read_hash(
black_box(&cache),
black_box(&sri)
)).unwrap()
})
});
}

fn async_read(c: &mut Criterion) {
let tmp = tempfile::tempdir().unwrap();
let cache = tmp.path().to_owned();
let data = b"hello world".to_vec();
cacache::put::data(&cache, "hello", data).unwrap();
c.bench_function("async_read", move |b| {
b.iter(|| {
task::block_on(cacache::async_get::read(
black_box(&cache),
black_box("hello")
)).unwrap()
})
});
}

criterion_group!(benches, read_hash, read, async_read_hash, async_read);
// criterion_group!(benches, read_hash, async_read_hash);
criterion_main!(benches);
125 changes: 125 additions & 0 deletions src/async_get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
//! Functions for reading from cache.
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::prelude::*;
use ssri::{Algorithm, Integrity};

use crate::content::read::{self, AsyncReader};
use crate::errors::Error;
use crate::index::{self, Entry};

/// File handle for asynchronously reading from a content entry.
///
/// Make sure to call `.check()` when done reading to verify that the
/// extracted data passes integrity verification.
pub struct AsyncGet {
reader: AsyncReader,
}

impl AsyncRead for AsyncGet {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.reader).poll_read(cx, buf)
}
}

impl AsyncGet {
/// Checks that data read from disk passes integrity checks. Returns the
/// algorithm that was used verified the data. Should be called only after
/// all data has been read from disk.
pub fn check(self) -> Result<Algorithm, Error> {
self.reader.check()
}
}

/// Opens a new file handle into the cache, looking it up in the index using
/// `key`.
pub async fn open<P, K>(cache: P, key: K) -> Result<AsyncGet, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
open_hash(cache, entry.integrity).await
} else {
Err(Error::NotFound)
}
}

/// Opens a new file handle into the cache, based on its integrity address.
pub async fn open_hash<P>(cache: P, sri: Integrity) -> Result<AsyncGet, Error>
where
P: AsRef<Path>,
{
Ok(AsyncGet {
reader: read::open_async(cache.as_ref(), sri).await?,
})
}

/// Reads the entire contents of a cache file into a bytes vector, looking the
/// data up by key.
pub async fn read<P, K>(cache: P, key: K) -> Result<Vec<u8>, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
read_hash(cache, &entry.integrity).await
} else {
Err(Error::NotFound)
}
}

/// Reads the entire contents of a cache file into a bytes vector, looking the
/// data up by its content address.
#[allow(clippy::needless_lifetimes)]
pub async fn read_hash<P>(cache: P, sri: &Integrity) -> Result<Vec<u8>, Error>
where
P: AsRef<Path>,
{
Ok(read::read_async(cache.as_ref(), sri).await?)
}

/// Copies a cache entry by key to a specified location.
pub async fn copy<P, K, Q>(cache: P, key: K, to: Q) -> Result<u64, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
Q: AsRef<Path>,
{
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
copy_hash(cache, &entry.integrity, to).await
} else {
Err(Error::NotFound)
}
}

/// Copies a cache entry by integrity address to a specified location.
#[allow(clippy::needless_lifetimes)]
pub async fn copy_hash<P, Q>(cache: P, sri: &Integrity, to: Q) -> Result<u64, Error>
where
P: AsRef<Path>,
Q: AsRef<Path>,
{
read::copy_async(cache.as_ref(), sri, to.as_ref()).await
}

/// Gets entry information and metadata for a certain key.
pub async fn info<P, K>(cache: P, key: K) -> Result<Option<Entry>, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
index::find_async(cache.as_ref(), key.as_ref()).await
}

/// Returns true if the given hash exists in the cache.
pub async fn hash_exists<P: AsRef<Path>>(cache: P, sri: &Integrity) -> bool {
read::has_content_async(cache.as_ref(), &sri).await.is_some()
}

123 changes: 123 additions & 0 deletions src/async_put.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
//! Functions for asynchronously writing to cache.
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::prelude::*;
use ssri::{Algorithm, Integrity};

pub use crate::put::PutOpts;
use crate::content::write;
use crate::errors::Error;
use crate::index;

/// Writes `data` to the `cache`, indexing it under `key`.
pub async fn data<P, D, K>(cache: P, key: K, data: D) -> Result<Integrity, Error>
where
P: AsRef<Path>,
D: AsRef<[u8]>,
K: AsRef<str>,
{
let mut writer = PutOpts::new()
.algorithm(Algorithm::Sha256)
.open_async(cache.as_ref(), key.as_ref()).await?;
writer.write_all(data.as_ref()).await?;
writer.commit().await
}

impl PutOpts {
/// Opens the file handle for writing, returning a Put instance.
pub async fn open_async<P, K>(self, cache: P, key: K) -> Result<AsyncPut, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
Ok(AsyncPut {
cache: cache.as_ref().to_path_buf(),
key: String::from(key.as_ref()),
written: 0,
writer: write::AsyncWriter::new(
cache.as_ref(),
*self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256),
).await?,
opts: self,
})
}
}

/// A reference to an open file writing to the cache.
pub struct AsyncPut {
cache: PathBuf,
key: String,
written: usize,
pub(crate) writer: write::AsyncWriter,
opts: PutOpts,
}

impl AsyncWrite for AsyncPut {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.writer).poll_write(cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.writer).poll_flush(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.writer).poll_close(cx)
}
}

impl AsyncPut {
/// Closes the Put handle and writes content and index entries. Also
/// verifies data against `size` and `integrity` options, if provided.
/// Must be called manually in order to complete the writing process,
/// otherwise everything will be thrown out.
pub async fn commit(mut self) -> Result<Integrity, Error> {
let writer_sri = self.writer.close().await?;
if let Some(sri) = &self.opts.sri {
// TODO - ssri should have a .matches method
let algo = sri.pick_algorithm();
let matched = sri
.hashes
.iter()
.take_while(|h| h.algorithm == algo)
.find(|&h| *h == writer_sri.hashes[0]);
if matched.is_none() {
return Err(Error::IntegrityError);
}
} else {
self.opts.sri = Some(writer_sri);
}
if let Some(size) = self.opts.size {
if size != self.written {
return Err(Error::SizeError);
}
}
index::insert_async(&self.cache, &self.key, self.opts).await
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::async_get;
use async_std::task;

#[test]
fn round_trip() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
task::block_on(async {
data(&dir, "hello", b"hello").await.unwrap();
});
let data = task::block_on(async {
async_get::read(&dir, "hello").await.unwrap()
});
assert_eq!(data, b"hello");
}
}
32 changes: 32 additions & 0 deletions src/async_rm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//! Functions for removing things from the cache.
use std::path::Path;

use async_std::fs as afs;
use ssri::Integrity;

use crate::content::rm;
use crate::errors::Error;
use crate::index;

/// Removes an individual index entry. The associated content will be left
/// intact.
pub async fn entry<P: AsRef<Path>>(cache: P, key: &str) -> Result<(), Error> {
index::delete_async(cache.as_ref(), &key).await
}

/// Removes an individual content entry. Any index entries pointing to this
/// content will become invalidated.
pub async fn content<P: AsRef<Path>>(cache: P, sri: &Integrity) -> Result<(), Error> {
rm::rm_async(cache.as_ref(), &sri).await
}

/// Removes entire contents of the cache, including temporary files, the entry
/// index, and all content data.
pub async fn all<P: AsRef<Path>>(cache: P) -> Result<(), Error> {
for entry in cache.as_ref().read_dir()? {
if let Ok(entry) = entry {
afs::remove_dir_all(entry.path()).await?;
}
}
Ok(())
}
Loading

0 comments on commit 7a0f904

Please sign in to comment.