diff --git a/src/db.rs b/src/db.rs
index 7e3a68c..ae4b815 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -1,6 +1,6 @@
 use crate::ImageKind;
 use s3::creds::time::OffsetDateTime;
-use sqlx::{Executor, FromRow, PgPool};
+use sqlx::{Executor, FromRow, PgPool, Postgres, Transaction};
 
 #[derive(FromRow)]
 pub struct ImageMeta {
@@ -19,6 +19,13 @@ pub struct ImageMeta {
     pub uploaded_by_account: Option,
 }
 
+#[derive(FromRow)]
+pub struct ImageQueueEntry {
+    pub itemid: i32,
+    pub url: String,
+    pub kind: ImageKind,
+}
+
 pub async fn init(pool: &PgPool) -> anyhow::Result<()> {
     pool.execute(include_str!("./init.sql")).await?;
     Ok(())
@@ -47,6 +54,17 @@ pub async fn get_by_attachment_id(
     )
 }
 
+pub async fn pop_queue(pool: &PgPool) -> anyhow::Result<Option<(Transaction<'static, Postgres>, ImageQueueEntry)>> {
+    let mut tx = pool.begin().await?;
+    let res: Option<ImageQueueEntry> = sqlx::query_as("delete from image_queue where itemid = (select itemid from image_queue order by itemid for update skip locked limit 1) returning *")
+        .fetch_optional(&mut *tx).await?;
+    Ok(res.map(|x| (tx, x)))
+}
+
+pub async fn get_queue_length(pool: &PgPool) -> anyhow::Result<i64> {
+    Ok(sqlx::query_scalar("select count(*) from image_queue").fetch_one(pool).await?)
+}
+
 pub async fn add_image(pool: &PgPool, meta: ImageMeta) -> anyhow::Result {
     let kind_str = match meta.kind {
         ImageKind::Avatar => "avatar",
diff --git a/src/main.rs b/src/main.rs
index d6d072f..4bde0cd 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,7 +3,8 @@ mod hash;
 mod process;
 mod pull;
 mod store;
-use crate::db::ImageMeta;
+mod migrate;
+
 use crate::pull::Puller;
 use crate::store::Storer;
 use axum::extract::State;
@@ -178,6 +179,8 @@ async fn main() -> anyhow::Result<()> {
         config: Arc::new(config),
     };
 
+    migrate::spawn_migrate_workers(Arc::new(state.clone()), state.config.migrate_worker_count);
+
     let app = Router::new().route("/pull", post(pull)).with_state(state);
 
     let host = "0.0.0.0:3000";
@@ -249,6 +252,9 @@ struct Config {
     db: String,
     s3: S3Config,
     base_url: String,
+
+    #[serde(default)]
+    migrate_worker_count: u32
 }
 
 #[derive(Deserialize, Clone)]
diff --git a/src/migrate.rs b/src/migrate.rs
new file mode 100644
index 0000000..2a99012
--- /dev/null
+++ b/src/migrate.rs
@@ -0,0 +1,71 @@
+use std::sync::Arc;
+use std::time::Duration;
+use tracing::{error, info, instrument, warn};
+use crate::{AppState, db, process};
+use crate::db::ImageMeta;
+use crate::pull::parse_url;
+
+pub async fn handle_item(state: &AppState) -> anyhow::Result<()> {
+    let queue_length = db::get_queue_length(&state.pool).await?;
+    info!("migrate queue length: {}", queue_length);
+
+    if let Some((tx, item)) = db::pop_queue(&state.pool).await? {
+        let Ok(parsed) = parse_url(&item.url) else {
+            // if invalid url, consume and skip it
+            warn!("skipping invalid url: {}", item.url);
+            tx.commit().await?;
+            return Ok(());
+        };
+
+        let pulled = state.puller.pull(&parsed).await?;
+        let encoded = process::process(&pulled.data, item.kind)?;
+        let store_res = state.storer.store(&encoded).await?;
+        let final_url = format!("{}{}", state.config.base_url, store_res.path);
+
+        db::add_image(
+            &state.pool,
+            ImageMeta {
+                id: store_res.id,
+                url: final_url.clone(),
+                original_url: Some(parsed.full_url),
+                original_type: Some(pulled.content_type),
+                original_file_size: Some(pulled.data.len() as i32),
+                original_attachment_id: Some(parsed.attachment_id as i64),
+                file_size: encoded.data_webp.len() as i32,
+                width: encoded.width as i32,
+                height: encoded.height as i32,
+                kind: item.kind,
+                uploaded_at: None,
+                uploaded_by_account: None,
+            },
+        )
+        .await?;
+
+        info!("migrated {} ({}k -> {}k)", final_url, pulled.data.len() / 1024, encoded.data_webp.len() / 1024);
+        tx.commit().await?;
+    } else {
+        tokio::time::sleep(Duration::from_secs(5)).await;
+    }
+
+    Ok(())
+}
+
+#[instrument(skip(state))]
+pub async fn worker(worker_id: u32, state: Arc<AppState>) {
+    info!("spawned migrate worker with id {}", worker_id);
+    loop {
+        match handle_item(&state).await {
+            Ok(()) => {},
+            Err(e) => {
+                error!("error in migrate worker {}: {}", worker_id, e);
+                tokio::time::sleep(Duration::from_secs(5)).await;
+            }
+        }
+    }
+}
+
+pub fn spawn_migrate_workers(state: Arc<AppState>, count: u32) {
+    for i in 0..count {
+        tokio::spawn(worker(i, state.clone()));
+    }
+}
\ No newline at end of file
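Note: init.sql is not part of this diff, so the image_queue schema the new queries rely on is not visible here. A minimal sketch of the table they appear to assume, with column names and Rust-side types taken from ImageQueueEntry (the storage type of kind is a guess):

    -- assumed shape of the queue table read by pop_queue/get_queue_length (not part of this diff)
    create table if not exists image_queue (
        itemid serial primary key,  -- ImageQueueEntry::itemid (i32); also the ordering/claim key for skip locked
        url    text not null,       -- ImageQueueEntry::url
        kind   text not null        -- decoded into ImageKind; the actual column type is not shown in the patch
    );

Also worth noting: #[serde(default)] on migrate_worker_count means it defaults to 0, so spawn_migrate_workers starts no workers unless the option is set explicitly in the config.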