Skip to content

Commit

Permalink
Add semaphore to migrate
Browse files Browse the repository at this point in the history
  • Loading branch information
xSke committed Feb 14, 2024
1 parent f12f3ef commit cef7915
Showing 1 changed file with 20 additions and 1 deletion.
21 changes: 20 additions & 1 deletion src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ use crate::{db, process, AppState, PKAvatarError};
use reqwest::StatusCode;
use std::sync::Arc;
use std::time::Duration;
use time::Instant;
use tokio::sync::Semaphore;
use tracing::{error, info, instrument, warn};

static PROCESS_SEMAPHORE: Semaphore = Semaphore::const_new(1);

pub async fn handle_item_inner(
state: &AppState,
item: &ImageQueueEntry,
Expand All @@ -24,7 +28,22 @@ pub async fn handle_item_inner(
let pulled = state.puller.pull(&parsed).await?;
let data_len = pulled.data.len();

let encoded = process::process_async(pulled.data, item.kind).await?;
let encoded = {
// Trying to reduce CPU load/potentially blocking the worker by adding a bottleneck on parallel encodes
// no semaphore on the main api though, that one should ideally be low latency
// todo: configurable?
let time_before_semaphore = Instant::now();
let permit = PROCESS_SEMAPHORE.acquire().await.map_err(|e| PKAvatarError::InternalError(e.into()))?;
let time_after_semaphore = Instant::now();
let semaphore_time = time_after_semaphore - time_before_semaphore;
if semaphore_time.whole_milliseconds() > 100 {
warn!("waited more than {} ms for process semaphore", semaphore_time.whole_milliseconds());
}

let encoded = process::process_async(pulled.data, item.kind).await?;
drop(permit);
encoded
};
let store_res = state.storer.store(&encoded).await?;
let final_url = format!("{}{}", state.config.base_url, store_res.path);

Expand Down

0 comments on commit cef7915

Please sign in to comment.