Skip to content

Commit

Permalink
Remove dashmap (BloopAI#344)
Browse files Browse the repository at this point in the history
* Remove DashMap from credential provider

* Port conversational store to scc

* Remove dashmap from repo_pool

* Remove dashmap from aaa & file indexing

* Add direnv file

* Update deps

* That call seems redundant

* Simplify this
  • Loading branch information
rsdy committed Mar 29, 2023
1 parent 9af8bc6 commit 0b89b58
Show file tree
Hide file tree
Showing 14 changed files with 229 additions and 252 deletions.
1 change: 1 addition & 0 deletions .envrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
use flake
25 changes: 10 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions server/bleep/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ regex-syntax = "0.6.28"
smallvec = { version = "1.10.0", features = ["serde"]}
async-trait = "0.1.63"
flume = "0.10.14"
dashmap = { version = "5.4.0", features = ["serde"] }
either = "1.8.0"
either = "1.8.1"
compact_str = "0.6.1"
bincode = "1.3.3"
directories = "4.0.1"
Expand All @@ -114,6 +113,7 @@ sentry = "0.29.2"
rudderanalytics = "1.1.2"
async-stream = "0.3.3"
erased-serde = "0.3.25"
scc = { version= "1.2.0", features = ["serde"] }
sentry-tracing = "0.30.0"

[target.'cfg(windows)'.dependencies]
Expand Down
68 changes: 34 additions & 34 deletions server/bleep/src/background.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ impl IndexWriter {
pub(crate) async fn startup_scan(self) -> anyhow::Result<()> {
let Self(Application { repo_pool, .. }) = &self;

let repos = repo_pool.iter().map(|elem| elem.key().clone()).collect();
let mut repos = vec![];
repo_pool.scan_async(|k, _| repos.push(k.clone())).await;

self.sync_and_index(repos).await
}

Expand All @@ -134,14 +136,12 @@ impl IndexWriter {
}) = &self;

let writers = indexes.writers().await?;
let (key, repo) = {
let ptr = repo_pool.get(reporef).unwrap();
let key = ptr.key().clone();
let repo = ptr.value().clone();
(key, repo)
};
let (key, repo) = repo_pool
.read_async(reporef, |k, v| (k.clone(), v.clone()))
.await
.unwrap();

let (state, indexed) = match repo.sync_status {
let indexed = match repo.sync_status {
Uninitialized | Syncing | Indexing => return Ok(()),
Removed => {
let deleted = self.delete_repo_indexes(reporef, &repo, &writers).await;
Expand All @@ -162,33 +162,33 @@ impl IndexWriter {
return Ok(());
}
_ => {
repo_pool.get_mut(reporef).unwrap().value_mut().sync_status = Indexing;
let indexed = repo.index(&key, &writers).await;
let state = match &indexed {
Ok(state) => Some(state.clone()),
_ => None,
};

(state, indexed.map(|_| ()))
repo_pool
.update_async(reporef, |_, v| v.sync_status = Indexing)
.await
.unwrap();

repo.index(&key, &writers).await
}
};

writers.commit().await?;
config.source.save_pool(repo_pool.clone())?;

let mut repo = repo_pool.get_mut(reporef).unwrap();
match indexed {
Ok(()) => {
repo.value_mut().sync_done_with(state.unwrap());
info!("commit complete; indexing done");
}
Err(err) => {
repo.value_mut().sync_status = Error {
message: err.to_string(),
};
error!(?err, ?reporef, "failed to index repository");
}
}
repo_pool
.update_async(reporef, move |_k, repo| match indexed {
Ok(state) => {
repo.sync_done_with(state);
info!("commit complete; indexing done");
}
Err(err) => {
repo.sync_status = Error {
message: err.to_string(),
};
error!(?err, ?reporef, "failed to index repository");
}
})
.await
.unwrap();

Ok(())
}
Expand Down Expand Up @@ -216,7 +216,8 @@ impl IndexWriter {

self.0
.repo_pool
.entry(repo.to_owned())
.entry_async(repo.to_owned())
.await
.or_insert_with(|| Repository::local_from(&repo));

// we _never_ touch the git repositories of local repos
Expand All @@ -228,10 +229,9 @@ impl IndexWriter {
if let Err(RemoteError::RemoteNotFound) = synced {
self.0
.repo_pool
.get_mut(&repo)
.unwrap()
.value_mut()
.sync_status = SyncStatus::RemoteRemoved;
.update_async(&repo, |_, v| v.sync_status = SyncStatus::RemoteRemoved)
.await
.unwrap();

error!(?repo, "remote repository removed; disabling local syncing");

Expand Down
8 changes: 4 additions & 4 deletions server/bleep/src/indexes/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use std::{

use anyhow::{Context, Result};
use async_trait::async_trait;
use dashmap::mapref::entry::Entry;
use once_cell::sync::Lazy;
use regex::Regex;
use scc::hash_map::Entry;
use smallvec::SmallVec;
use tantivy::{
collector::TopDocs,
Expand Down Expand Up @@ -524,17 +524,17 @@ impl File {

trace!("adding cache entry");

match cache.entry(entry_disk_path.clone()) {
match cache.entry(entry_disk_path.to_owned()) {
Entry::Occupied(mut val) if val.get().value == content_hash => {
// skip processing if contents are up-to-date in the cache
val.get_mut().fresh = true;
return Ok(());
}
Entry::Occupied(mut val) => {
val.insert(content_hash.into());
_ = val.insert(content_hash.into());
}
Entry::Vacant(val) => {
val.insert(content_hash.into());
_ = val.insert_entry(content_hash.into());
}
}
trace!("added cache entry");
Expand Down
13 changes: 5 additions & 8 deletions server/bleep/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use console_subscriber as _;

#[cfg(target_os = "windows")]
use dunce::canonicalize;
use scc::hash_map::Entry;
#[cfg(not(target_os = "windows"))]
use std::fs::canonicalize;

Expand All @@ -30,7 +31,6 @@ use crate::{
use anyhow::{anyhow, bail, Result};
use axum::extract::FromRef;

use dashmap::{mapref::entry::Entry, DashMap};
use once_cell::sync::OnceCell;

use sentry_tracing::{EventFilter, SentryLayer};
Expand Down Expand Up @@ -76,7 +76,7 @@ pub struct Application {
indexes: Arc<Indexes>,
credentials: remotes::Backends,
cookie_key: axum_extra::extract::cookie::Key,
prior_conversational_store: Arc<DashMap<String, Vec<(String, String)>>>,
prior_conversational_store: Arc<scc::HashMap<String, Vec<(String, String)>>>,
}

impl Application {
Expand Down Expand Up @@ -126,18 +126,16 @@ impl Application {
env
};

let prior_conversational_store = Arc::new(DashMap::new());

Ok(Self {
indexes: Arc::new(Indexes::new(config.clone(), semantic.clone())?),
background: BackgroundExecutor::start(config.clone()),
repo_pool: config.source.initialize_pool()?,
cookie_key: config.source.initialize_cookie_key()?,
credentials: config.source.initialize_credentials()?.into(),
prior_conversational_store: Arc::default(),
semantic,
config,
env,
prior_conversational_store,
})
}

Expand Down Expand Up @@ -257,8 +255,7 @@ impl Application {
f: impl Fn(&[(String, String)]) -> T,
) -> T {
self.prior_conversational_store
.get(user_id)
.map(|r| f(&r.value()[..]))
.read(user_id, |_, v| f(&v[..]))
.unwrap_or_else(|| f(&[]))
}

Expand All @@ -267,7 +264,7 @@ impl Application {
match self.prior_conversational_store.entry(user_id) {
Entry::Occupied(mut o) => o.get_mut().push((query, String::new())),
Entry::Vacant(v) => {
v.insert(vec![(query, String::new())]);
v.insert_entry(vec![(query, String::new())]);
}
}
}
Expand Down
Loading

0 comments on commit 0b89b58

Please sign in to comment.