Skip to content

Commit

Permalink
feat(async): add extra async versions of APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
zkat committed Sep 1, 2019
1 parent 4f34257 commit cda44d1
Show file tree
Hide file tree
Showing 12 changed files with 1,155 additions and 198 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
steps:
- uses: actions/checkout@v1
- name: Components
run: rustup component add clippy
run: rustup default nightly && rustup component add clippy
- name: Build
run: cargo build --verbose
- name: Clippy
Expand Down
695 changes: 500 additions & 195 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ failure = "0.1.5"
walkdir = "2.2.7"
either = "1.5.2"
mkdirp = "1.0.0"
async-std = {git = "https://github.com/passcod/async-std-unstable", branch = "unstable"}
futures-preview = "0.3.0-alpha.18"

[dev-dependencies]
criterion = "0.2.11"
Expand Down
18 changes: 17 additions & 1 deletion benches/benchmarks.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use async_std::task;
use cacache;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use tempfile;
Expand All @@ -12,5 +13,20 @@ fn get(c: &mut Criterion) {
});
}

criterion_group!(benches, get);
fn async_get(c: &mut Criterion) {
let tmp = tempfile::tempdir().unwrap();
let cache = tmp.path().to_owned();
let data = b"hello world".to_vec();
let sri = cacache::put::data(&cache, "hello", data).unwrap();
c.bench_function("async_read_hash", move |b| {
b.iter(|| {
task::block_on(cacache::async_get::read_hash(
black_box(&cache),
black_box(&sri)
))
})
});
}

criterion_group!(benches, get, async_get);
criterion_main!(benches);
125 changes: 125 additions & 0 deletions src/async_get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
//! Functions for reading from cache.
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::prelude::*;
use ssri::{Algorithm, Integrity};

use crate::content::read::{self, AsyncReader};
use crate::errors::Error;
use crate::index::{self, Entry};

/// File handle for asynchronously reading from a content entry.
///
/// Make sure to call `.check()` when done reading to verify that the
/// extracted data passes integrity verification.
pub struct AsyncGet {
reader: AsyncReader,
}

impl AsyncRead for AsyncGet {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.reader).poll_read(cx, buf)
}
}

impl AsyncGet {
/// Checks that data read from disk passes integrity checks. Returns the
/// algorithm that was used verified the data. Should be called only after
/// all data has been read from disk.
pub fn check(self) -> Result<Algorithm, Error> {
self.reader.check()
}
}

/// Opens a new file handle into the cache, looking it up in the index using
/// `key`.
pub async fn open<P, K>(cache: P, key: K) -> Result<AsyncGet, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? {
open_hash(cache, entry.integrity).await
} else {
Err(Error::NotFound)
}
}

/// Opens a new file handle into the cache, based on its integrity address.
pub async fn open_hash<P>(cache: P, sri: Integrity) -> Result<AsyncGet, Error>
where
P: AsRef<Path>,
{
Ok(AsyncGet {
reader: read::open_async(cache.as_ref(), sri).await?,
})
}

/// Reads the entire contents of a cache file into a bytes vector, looking the
/// data up by key.
pub async fn read<P, K>(cache: P, key: K) -> Result<Vec<u8>, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? {
read_hash(cache, &entry.integrity).await
} else {
Err(Error::NotFound)
}
}

/// Reads the entire contents of a cache file into a bytes vector, looking the
/// data up by its content address.
#[allow(clippy::needless_lifetimes)]
pub async fn read_hash<P>(cache: P, sri: &Integrity) -> Result<Vec<u8>, Error>
where
P: AsRef<Path>,
{
Ok(read::read_async(cache.as_ref(), sri).await?)
}

/// Copies a cache entry by key to a specified location.
pub async fn copy<P, K, Q>(cache: P, key: K, to: Q) -> Result<u64, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
Q: AsRef<Path>,
{
if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? {
copy_hash(cache, &entry.integrity, to).await
} else {
Err(Error::NotFound)
}
}

/// Copies a cache entry by integrity address to a specified location.
#[allow(clippy::needless_lifetimes)]
pub async fn copy_hash<P, Q>(cache: P, sri: &Integrity, to: Q) -> Result<u64, Error>
where
P: AsRef<Path>,
Q: AsRef<Path>,
{
read::copy_async(cache.as_ref(), sri, to.as_ref()).await
}

/// Gets entry information and metadata for a certain key.
pub fn info<P, K>(cache: P, key: K) -> Result<Option<Entry>, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
index::find(cache.as_ref(), key.as_ref())
}

/// Returns true if the given hash exists in the cache.
pub fn hash_exists<P: AsRef<Path>>(cache: P, sri: &Integrity) -> bool {
read::has_content(cache.as_ref(), &sri).is_some()
}

103 changes: 103 additions & 0 deletions src/async_put.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
//! Functions for asynchronously writing to cache.
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::prelude::*;
use ssri::{Algorithm, Integrity};

pub use crate::put::PutOpts;
use crate::content::write;
use crate::errors::Error;
use crate::index;

/// Writes `data` to the `cache`, indexing it under `key`.
pub async fn data<P, D, K>(cache: P, key: K, data: D) -> Result<Integrity, Error>
where
P: AsRef<Path>,
D: AsRef<[u8]>,
K: AsRef<str>,
{
let mut writer = PutOpts::new()
.algorithm(Algorithm::Sha256)
.open_async(cache.as_ref(), key.as_ref()).await?;
writer.write_all(data.as_ref()).await?;
writer.commit().await
}

impl PutOpts {
/// Opens the file handle for writing, returning a Put instance.
pub async fn open_async<P, K>(self, cache: P, key: K) -> Result<AsyncPut, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
Ok(AsyncPut {
cache: cache.as_ref().to_path_buf(),
key: String::from(key.as_ref()),
written: 0,
writer: write::AsyncWriter::new(
cache.as_ref(),
*self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256),
).await?,
opts: self,
})
}
}

/// A reference to an open file writing to the cache.
pub struct AsyncPut {
cache: PathBuf,
key: String,
written: usize,
pub(crate) writer: write::AsyncWriter,
opts: PutOpts,
}

impl AsyncWrite for AsyncPut {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.writer).poll_write(cx, buf)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.writer).poll_flush(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.writer).poll_close(cx)
}
}

impl AsyncPut {
/// Closes the Put handle and writes content and index entries. Also
/// verifies data against `size` and `integrity` options, if provided.
/// Must be called manually in order to complete the writing process,
/// otherwise everything will be thrown out.
pub async fn commit(self) -> Result<Integrity, Error> {
let writer_sri = self.writer.close().await?;
if let Some(sri) = &self.opts.sri {
// TODO - ssri should have a .matches method
let algo = sri.pick_algorithm();
let matched = sri
.hashes
.iter()
.take_while(|h| h.algorithm == algo)
.find(|&h| *h == writer_sri.hashes[0]);
if matched.is_none() {
return Err(Error::IntegrityError);
}
}
if let Some(size) = self.opts.size {
if size != self.written {
return Err(Error::SizeError);
}
}
index::insert_async(&self.cache, &self.key, self.opts).await?;
Ok(writer_sri)
}
}

32 changes: 32 additions & 0 deletions src/async_rm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//! Functions for removing things from the cache.
use std::path::Path;

use async_std::fs as afs;
use ssri::Integrity;

use crate::content::rm;
use crate::errors::Error;
use crate::index;

/// Removes an individual index entry. The associated content will be left
/// intact.
pub async fn entry<P: AsRef<Path>>(cache: P, key: &str) -> Result<(), Error> {
index::delete_async(cache.as_ref(), &key).await
}

/// Removes an individual content entry. Any index entries pointing to this
/// content will become invalidated.
pub async fn content<P: AsRef<Path>>(cache: P, sri: &Integrity) -> Result<(), Error> {
rm::rm_async(cache.as_ref(), &sri).await
}

/// Removes entire contents of the cache, including temporary files, the entry
/// index, and all content data.
pub async fn all<P: AsRef<Path>>(cache: P) -> Result<(), Error> {
for entry in cache.as_ref().read_dir()? {
if let Ok(entry) = entry {
afs::remove_dir_all(entry.path()).await?;
}
}
Ok(())
}
56 changes: 56 additions & 0 deletions src/content/read.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use std::fs::{self, File};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};

use async_std;
use futures::prelude::*;
use ssri::{Algorithm, Integrity, IntegrityChecker};

use crate::content::path;
Expand All @@ -25,13 +29,44 @@ impl Reader {
}
}

pub struct AsyncReader {
fd: async_std::fs::File,
checker: IntegrityChecker,
}

impl AsyncRead for AsyncReader {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
let amt = futures::ready!(Pin::new(&mut self.fd).poll_read(cx, buf))?;
self.checker.input(&buf);
Poll::Ready(Ok(amt))
}
}

impl AsyncReader {
pub fn check(self) -> Result<Algorithm, Error> {
self.checker.result().ok_or(Error::IntegrityError)
}
}

pub fn open(cache: &Path, sri: Integrity) -> Result<Reader, Error> {
Ok(Reader {
fd: File::open(cache)?,
checker: IntegrityChecker::new(sri),
})
}

#[allow(clippy::needless_lifetimes)]
pub async fn open_async(cache: &Path, sri: Integrity) -> Result<AsyncReader, Error> {
Ok(AsyncReader {
fd: async_std::fs::File::open(cache).await?,
checker: IntegrityChecker::new(sri),
})
}

pub fn read(cache: &Path, sri: &Integrity) -> Result<Vec<u8>, Error> {
let cpath = path::content_path(&cache, &sri);
let ret = fs::read(&cpath)?;
Expand All @@ -42,6 +77,16 @@ pub fn read(cache: &Path, sri: &Integrity) -> Result<Vec<u8>, Error> {
}
}

pub async fn read_async<'a>(cache: &'a Path, sri: &'a Integrity) -> Result<Vec<u8>, Error> {
let cpath = path::content_path(&cache, &sri);
let ret = async_std::fs::read(&cpath).await?;
if sri.check(&ret).is_some() {
Ok(ret)
} else {
Err(Error::IntegrityError)
}
}

pub fn copy(cache: &Path, sri: &Integrity, to: &Path) -> Result<u64, Error> {
let cpath = path::content_path(&cache, &sri);
let ret = fs::copy(&cpath, to)?;
Expand All @@ -53,6 +98,17 @@ pub fn copy(cache: &Path, sri: &Integrity, to: &Path) -> Result<u64, Error> {
}
}

pub async fn copy_async<'a>(cache: &'a Path, sri: &'a Integrity, to: &'a Path) -> Result<u64, Error> {
let cpath = path::content_path(&cache, &sri);
let ret = async_std::fs::copy(&cpath, to).await?;
let data = async_std::fs::read(cpath).await?;
if sri.check(data).is_some() {
Ok(ret)
} else {
Err(Error::IntegrityError)
}
}

pub fn has_content(cache: &Path, sri: &Integrity) -> Option<Integrity> {
if path::content_path(&cache, &sri).exists() {
Some(sri.clone())
Expand Down
Loading

0 comments on commit cda44d1

Please sign in to comment.