Skip to content

Commit

Permalink
Auto merge of #618 - Mark-Simulacrum:opt, r=Mark-Simulacrum
Browse files Browse the repository at this point in the history
Move record-progress processing to separate thread

This is intended to let us prioritize work on other requests over work on
record-progress, thereby avoiding some of the timeouts and "database is locked"
errors we would otherwise see when the record-progress requests happen to take
priority.

This separate thread is designed to run only when the server has no requests
in flight (other than a short, bounded queue of record-progress requests). If
that queue fills up, we tell workers to slow down, causing them to retry
requests -- currently at fixed intervals and per worker thread, but a future
commit might clean that up a little to have a more intentional delay.

In general this should, hopefully, decrease the error rate: human-initiated
requests in particular should never have to wait for more than one
record-progress event to complete before having largely uncontended access to the
database. (Other requests still happen concurrently, but they are typically
very rare in comparison to record-progress requests, which arrive multiple times
a second and are effectively being processed constantly.)

Errors like rust-lang/rust#94775 (comment) are the primary motivation here; I hope this change is enough to largely clear them up.
  • Loading branch information
bors committed Mar 11, 2022
2 parents 6386c67 + 229283e commit f9baf09
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 73 deletions.
74 changes: 28 additions & 46 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ chrono = { version = "0.4", features = ["serde"] }
chrono-humanize = "0.1.1"
crates-index = "0.16.2"
crossbeam-utils = "0.5"
crossbeam-channel = "0.5"
csv = "1.0.2"
dotenv = "0.13"
failure = "0.1.3"
Expand Down
2 changes: 2 additions & 0 deletions src/agent/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ impl ResponseExt for ::reqwest::blocking::Response {
match self.status() {
StatusCode::NOT_FOUND => return Err(AgentApiError::InvalidEndpoint.into()),
StatusCode::BAD_GATEWAY
| StatusCode::TOO_MANY_REQUESTS
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT => {
return Err(AgentApiError::ServerUnavailable.into());
Expand All @@ -50,6 +51,7 @@ impl ResponseExt for ::reqwest::blocking::Response {
.with_context(|_| format!("failed to parse API response (status code {})", status,))?;
match result {
ApiResponse::Success { result } => Ok(result),
ApiResponse::SlowDown => Err(AgentApiError::ServerUnavailable.into()),
ApiResponse::InternalError { error } => {
Err(AgentApiError::InternalServerError(error).into())
}
Expand Down
4 changes: 3 additions & 1 deletion src/server/api_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub struct AgentConfig {
#[serde(tag = "status", rename_all = "kebab-case")]
pub enum ApiResponse<T> {
Success { result: T },
SlowDown,
InternalError { error: String },
Unauthorized,
NotFound,
Expand All @@ -41,8 +42,9 @@ impl ApiResponse<()> {

impl<T> ApiResponse<T> {
fn status_code(&self) -> StatusCode {
match *self {
match self {
ApiResponse::Success { .. } => StatusCode::OK,
ApiResponse::SlowDown => StatusCode::TOO_MANY_REQUESTS,
ApiResponse::InternalError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
ApiResponse::Unauthorized => StatusCode::UNAUTHORIZED,
ApiResponse::NotFound => StatusCode::NOT_FOUND,
Expand Down
10 changes: 8 additions & 2 deletions src/server/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::server::agents::Agent;
use chrono::{DateTime, Utc};
use prometheus::proto::{Metric, MetricFamily};
use prometheus::{
HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, __register_counter_vec, __register_gauge,
__register_gauge_vec,
HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, __register_counter_vec,
__register_gauge, __register_gauge_vec, opts, register_counter, register_int_counter,
};

const JOBS_METRIC: &str = "crater_completed_jobs_total";
Expand All @@ -18,6 +18,7 @@ const ENDPOINT_TIME: &str = "crater_endpoint_time_seconds";
#[derive(Clone)]
pub struct Metrics {
crater_completed_jobs_total: IntCounterVec,
pub crater_bounced_record_progress: IntCounter,
crater_agent_failure: IntCounterVec,
crater_work_status: IntGaugeVec,
crater_last_crates_update: IntGauge,
Expand All @@ -29,6 +30,10 @@ impl Metrics {
let jobs_opts = prometheus::opts!(JOBS_METRIC, "total completed jobs");
let crater_completed_jobs_total =
prometheus::register_int_counter_vec!(jobs_opts, &["agent", "experiment"])?;
let crater_bounced_record_progress = prometheus::register_int_counter!(
"crater_bounced_record_progress",
"hits with full record progress queue"
)?;
let failure_opts = prometheus::opts!(AGENT_FAILED, "total completed jobs");
let crater_agent_failure =
prometheus::register_int_counter_vec!(failure_opts, &["agent", "experiment"])?;
Expand All @@ -47,6 +52,7 @@ impl Metrics {

Ok(Metrics {
crater_completed_jobs_total,
crater_bounced_record_progress,
crater_agent_failure,
crater_work_status,
crater_last_crates_update,
Expand Down
23 changes: 16 additions & 7 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct Data {
pub agents: Agents,
pub db: Database,
pub reports_worker: reports::ReportsWorker,
pub record_progress_worker: routes::agent::RecordProgressThread,
pub acl: ACL,
pub metrics: Metrics,
}
Expand Down Expand Up @@ -80,6 +81,10 @@ pub fn run(config: Config, bind: SocketAddr) -> Fallible<()> {
info!("initialized metrics...");

let data = Data {
record_progress_worker: routes::agent::RecordProgressThread::new(
db.clone(),
metrics.clone(),
),
config,
tokens,
agents,
Expand All @@ -100,7 +105,9 @@ pub fn run(config: Config, bind: SocketAddr) -> Fallible<()> {
let data = Arc::new(data);
let github_data = github_data.map(Arc::new);

let record_progress_worker = data.record_progress_worker.clone();
let routes = warp::any()
.and(warp::any().map(move || record_progress_worker.clone().start_request()))
.and(
warp::any()
.and(
Expand All @@ -118,13 +125,15 @@ pub fn run(config: Config, bind: SocketAddr) -> Fallible<()> {
.or(routes::ui::routes(data))
.unify(),
)
.map(|mut resp: Response<Body>| {
resp.headers_mut().insert(
http::header::SERVER,
HeaderValue::from_static(&SERVER_HEADER),
);
resp
});
.map(
|_guard: routes::agent::RequestGuard, mut resp: Response<Body>| {
resp.headers_mut().insert(
http::header::SERVER,
HeaderValue::from_static(&SERVER_HEADER),
);
resp
},
);

warp::serve(routes).run(bind);

Expand Down
Loading

0 comments on commit f9baf09

Please sign in to comment.