Add batch migration code
xSke committed Feb 11, 2024
1 parent 1dade41 commit c17c0bd
Showing 3 changed files with 97 additions and 2 deletions.
20 changes: 19 additions & 1 deletion src/db.rs
@@ -1,6 +1,6 @@
use crate::ImageKind;
use s3::creds::time::OffsetDateTime;
use sqlx::{Executor, FromRow, PgPool};
use sqlx::{Executor, FromRow, PgPool, Postgres, Transaction};

#[derive(FromRow)]
pub struct ImageMeta {
@@ -19,6 +19,13 @@ pub struct ImageMeta {
pub uploaded_by_account: Option<i64>,
}

#[derive(FromRow)]
pub struct ImageQueueEntry {
pub itemid: i32,
pub url: String,
pub kind: ImageKind,
}

pub async fn init(pool: &PgPool) -> anyhow::Result<()> {
pool.execute(include_str!("./init.sql")).await?;
Ok(())
@@ -47,6 +54,17 @@ pub async fn get_by_attachment_id(
)
}

pub async fn pop_queue(pool: &PgPool) -> anyhow::Result<Option<(Transaction<Postgres>, ImageQueueEntry)>> {
let mut tx = pool.begin().await?;
let res: Option<ImageQueueEntry> = sqlx::query_as("delete from image_queue where itemid = (select itemid from image_queue order by itemid for update skip locked limit 1) returning *")
.fetch_optional(&mut *tx).await?;
Ok(res.map(|x| (tx, x)))
}

pub async fn get_queue_length(pool: &PgPool) -> anyhow::Result<i64> {
Ok(sqlx::query_scalar("select count(*) from image_queue").fetch_one(pool).await?)
}

pub async fn add_image(pool: &PgPool, meta: ImageMeta) -> anyhow::Result<bool> {
let kind_str = match meta.kind {
ImageKind::Avatar => "avatar",
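
The new pop_queue pairs DELETE ... RETURNING with FOR UPDATE SKIP LOCKED, so concurrent workers each claim a distinct row without blocking on one another, and it hands the still-open transaction back to the caller: the claimed row only leaves image_queue once the caller commits, while dropping the transaction rolls the DELETE back. A minimal sketch of that claim-then-commit pattern (not part of this commit; drain_one is a hypothetical helper, and the queue columns are inferred from ImageQueueEntry):

use crate::db;
use sqlx::PgPool;

// Hypothetical caller: claim one queued image, do the work, then commit so the
// DELETE issued by pop_queue becomes permanent.
async fn drain_one(pool: &PgPool) -> anyhow::Result<bool> {
    match db::pop_queue(pool).await? {
        Some((tx, entry)) => {
            // ... pull, re-encode and store `entry.url` here ...
            // Returning early (or panicking) drops `tx`, the DELETE rolls back,
            // and the row stays in the queue for another worker.
            tx.commit().await?;
            Ok(true)
        }
        None => Ok(false), // queue was empty
    }
}
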
8 changes: 7 additions & 1 deletion src/main.rs
@@ -3,7 +3,8 @@ mod hash;
mod process;
mod pull;
mod store;
use crate::db::ImageMeta;
mod migrate;

use crate::pull::Puller;
use crate::store::Storer;
use axum::extract::State;
@@ -178,6 +179,8 @@ async fn main() -> anyhow::Result<()> {
config: Arc::new(config),
};

migrate::spawn_migrate_workers(Arc::new(state.clone()), state.config.migrate_worker_count);

let app = Router::new().route("/pull", post(pull)).with_state(state);

let host = "0.0.0.0:3000";
@@ -249,6 +252,9 @@ struct Config {
db: String,
s3: S3Config,
base_url: String,

#[serde(default)]
migrate_worker_count: u32
}

#[derive(Deserialize, Clone)]
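
Because migrate_worker_count is marked #[serde(default)], leaving the key out of the config deserializes it as 0, so spawn_migrate_workers spawns no tasks and the batch migration stays opt-in. A standalone sketch of that default behaviour (Demo and the JSON inputs are illustrative only, not the service's actual config format):

use serde::Deserialize;

#[derive(Deserialize)]
struct Demo {
    #[serde(default)]
    migrate_worker_count: u32,
}

fn main() {
    // Key absent: serde falls back to u32::default(), i.e. 0 workers.
    let d: Demo = serde_json::from_str("{}").unwrap();
    assert_eq!(d.migrate_worker_count, 0);

    // Key present: the configured value is used as-is.
    let d: Demo = serde_json::from_str(r#"{"migrate_worker_count": 4}"#).unwrap();
    assert_eq!(d.migrate_worker_count, 4);
}
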
71 changes: 71 additions & 0 deletions src/migrate.rs
@@ -0,0 +1,71 @@
use std::sync::Arc;
use std::time::Duration;
use tracing::{error, info, instrument, warn};
use crate::{AppState, db, process};
use crate::db::ImageMeta;
use crate::pull::parse_url;

pub async fn handle_item(state: &AppState) -> anyhow::Result<()> {
let queue_length = db::get_queue_length(&state.pool).await?;
info!("migrate queue length: {}", queue_length);

if let Some((tx, item)) = db::pop_queue(&state.pool).await? {
let Ok(parsed) = parse_url(&item.url) else {
// if invalid url, consume and skip it
warn!("skipping invalid url: {}", item.url);
tx.commit().await?;
return Ok(());
};

let pulled = state.puller.pull(&parsed).await?;
let encoded = process::process(&pulled.data, item.kind)?;
let store_res = state.storer.store(&encoded).await?;
let final_url = format!("{}{}", state.config.base_url, store_res.path);

db::add_image(
&state.pool,
ImageMeta {
id: store_res.id,
url: final_url.clone(),
original_url: Some(parsed.full_url),
original_type: Some(pulled.content_type),
original_file_size: Some(pulled.data.len() as i32),
original_attachment_id: Some(parsed.attachment_id as i64),
file_size: encoded.data_webp.len() as i32,
width: encoded.width as i32,
height: encoded.height as i32,
kind: item.kind,
uploaded_at: None,
uploaded_by_account: None,
},
)
.await?;

info!("migrated {} ({}k -> {}k)", final_url, pulled.data.len() / 1024, encoded.data_webp.len() / 1024);
tx.commit().await?;
} else {
tokio::time::sleep(Duration::from_secs(5)).await;
}

Ok(())
}

#[instrument(skip(state))]
pub async fn worker(worker_id: u32, state: Arc<AppState>) {
info!("spawned migrate worker with id {}", worker_id);
loop {
match handle_item(&state).await {
Ok(()) => {},
Err(e) => {
error!("error in migrate worker {}: {}", worker_id, e);
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
}

pub fn spawn_migrate_workers(state: Arc<AppState>, count: u32) {
for i in 0..count {
tokio::spawn(worker(i, state.clone()));
}
}
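
Nothing in this commit writes to image_queue: the workers only drain it, sleeping five seconds when it is empty, and on a processing error the item's transaction is dropped, so the row rolls back into the queue and is retried on a later pass. The producer side is expected to insert (url, kind) rows elsewhere; a hypothetical sketch of such an enqueue call, with the column names inferred from ImageQueueEntry and the kind stored as text (an assumption):

use sqlx::PgPool;

// Hypothetical producer, not part of this commit: queue one image URL for the
// migrate workers to pick up later.
async fn enqueue(pool: &PgPool, url: &str, kind: &str) -> anyhow::Result<()> {
    sqlx::query("insert into image_queue (url, kind) values ($1, $2)")
        .bind(url)
        .bind(kind)
        .execute(pool)
        .await?;
    Ok(())
}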
