From dec9a2e0a88515f63df6843392919b07c536ec3d Mon Sep 17 00:00:00 2001 From: Sorami Hisamoto Date: Fri, 5 Jan 2024 15:08:22 +0900 Subject: [PATCH] =?UTF-8?q?GeoPackage=E7=94=A8=E3=81=AESink=E3=82=92?= =?UTF-8?q?=E5=AE=9F=E8=A3=85=EF=BC=88=E7=8F=BE=E5=9C=A8=E3=81=AE=E9=83=A8?= =?UTF-8?q?=E5=93=81=E3=82=92=E3=82=82=E3=81=A8=E3=81=AB=E3=81=97=E3=81=A6?= =?UTF-8?q?=EF=BC=89=20(#136)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit close #121 #89 , #129 での、Gpkg変換の初期実装をもとに、全体の流れに組み込む(sinkを作る)。 とりあえず書いてみたコード例のようなものなので、コメントいただきたいです。 ## 悩み SQLx(async)の取り扱い。とりあえず、Sinkの中でブロックしてやってみているが、こう言う方法でいいのか? --------- Co-authored-by: Taku Fukada --- nusamai-gpkg/src/geometry.rs | 4 +- nusamai-gpkg/src/handler.rs | 52 ++++++++++++++++----- nusamai-gpkg/src/lib.rs | 4 +- nusamai/Cargo.toml | 3 +- nusamai/src/main.rs | 5 +- nusamai/src/sink/gpkg/mod.rs | 89 ++++++++++++++++++++++++++++++++++++ nusamai/src/sink/mod.rs | 1 + 7 files changed, 142 insertions(+), 16 deletions(-) create mode 100644 nusamai/src/sink/gpkg/mod.rs diff --git a/nusamai-gpkg/src/geometry.rs b/nusamai-gpkg/src/geometry.rs index 8a3c46d0..36df0242 100644 --- a/nusamai-gpkg/src/geometry.rs +++ b/nusamai-gpkg/src/geometry.rs @@ -1,5 +1,7 @@ //! Convert geometries to GeoPackage SQL Geometry Binary Format -/// cf. https://www.geopackage.org/spec130/#gpb_format +//! +//! cf. https://www.geopackage.org/spec130/#gpb_format + use nusamai_geometry::{MultiPolygon, Polygon}; fn geometry_header(srs_id: i32) -> Vec { diff --git a/nusamai-gpkg/src/handler.rs b/nusamai-gpkg/src/handler.rs index 583c7c4f..29c1cab9 100644 --- a/nusamai-gpkg/src/handler.rs +++ b/nusamai-gpkg/src/handler.rs @@ -1,7 +1,8 @@ -use crate::geometry::multipolygon_to_bytes; -use sqlx::Row; -use sqlx::{migrate::MigrateDatabase, Pool, Sqlite, SqlitePool}; +use sqlx::sqlite::*; +use sqlx::{Acquire, Row}; +use sqlx::{Pool, Sqlite, SqlitePool}; use std::path::Path; +use std::str::FromStr; use thiserror::Error; pub struct GpkgHandler { @@ -25,7 +26,11 @@ impl GpkgHandler { let db_url = format!("sqlite://{}", path); - Sqlite::create_database(&db_url).await?; + let conn_opts = SqliteConnectOptions::from_str(&db_url)? + .create_if_missing(true) + .synchronous(SqliteSynchronous::Normal) + .journal_mode(SqliteJournalMode::Wal); + SqlitePoolOptions::new().connect_with(conn_opts).await?; let pool = SqlitePool::connect(&db_url).await?; // Initialize the database with minimum GeoPackage schema @@ -84,19 +89,44 @@ impl GpkgHandler { table_names } + pub async fn begin(&mut self) -> Result { + Ok(GpkgTransaction::new(self.pool.begin().await?)) + } +} + +pub async fn insert_feature(pool: &Pool, bytes: &[u8]) { + sqlx::query("INSERT INTO mpoly3d (geometry) VALUES (?)") + .bind(bytes) + .execute(pool) + .await + .unwrap(); + + // TODO: MultiLineString + // TODO: MultiPoint +} + +pub struct GpkgTransaction<'c> { + tx: sqlx::Transaction<'c, Sqlite>, +} + +impl<'c> GpkgTransaction<'c> { + pub fn new(tx: sqlx::Transaction<'c, Sqlite>) -> Self { + Self { tx } + } + + pub async fn commit(self) -> Result<(), GpkgError> { + Ok(self.tx.commit().await?) + } + /// Add a MultiPolygonZ feature to the GeoPackage database /// /// Note: とりあえず地物を挿入してみるための実装です。参考にしないでください。 - pub async fn add_multi_polygon_feature( - &self, - vertices: &[[f64; 3]], - mpoly: &nusamai_geometry::MultiPolygon<'_, 1, u32>, - ) { - let bytes = multipolygon_to_bytes(vertices, mpoly, 4326); + pub async fn insert_feature(&mut self, bytes: &[u8]) { + let executor = self.tx.acquire().await.unwrap(); sqlx::query("INSERT INTO mpoly3d (geometry) VALUES (?)") .bind(bytes) - .execute(&self.pool) + .execute(&mut *executor) .await .unwrap(); diff --git a/nusamai-gpkg/src/lib.rs b/nusamai-gpkg/src/lib.rs index 7c35b992..3b47db96 100644 --- a/nusamai-gpkg/src/lib.rs +++ b/nusamai-gpkg/src/lib.rs @@ -1,4 +1,4 @@ -mod geometry; +pub mod geometry; mod handler; -pub use handler::GpkgHandler; +pub use handler::*; diff --git a/nusamai/Cargo.toml b/nusamai/Cargo.toml index 8c4aaaa7..63f1f3b7 100644 --- a/nusamai/Cargo.toml +++ b/nusamai/Cargo.toml @@ -19,9 +19,10 @@ nusamai-geojson = { path = "../nusamai-geojson" } geojson = "0.24.1" serde_json = "1.0.108" url = "2.5.0" +nusamai-gpkg = { path = "../nusamai-gpkg" } +tokio = { version = "1.35.1", features = ["full"] } [dev-dependencies] rand = "0.8.5" tokio = { version = "1.35.1", features = ["full"] } nusamai-geometry = { path = "../nusamai-geometry" } -nusamai-gpkg = { path = "../nusamai-gpkg" } \ No newline at end of file diff --git a/nusamai/src/main.rs b/nusamai/src/main.rs index e16f916b..ffc45642 100644 --- a/nusamai/src/main.rs +++ b/nusamai/src/main.rs @@ -4,7 +4,8 @@ use clap::Parser; use nusamai::pipeline::Canceller; use nusamai::sink::{ - geojson::GeoJsonSinkProvider, noop::NoopSinkProvider, serde::SerdeSinkProvider, + geojson::GeoJsonSinkProvider, gpkg::GpkgSinkProvider, noop::NoopSinkProvider, + serde::SerdeSinkProvider, }; use nusamai::sink::{DataSink, DataSinkProvider}; use nusamai::source::citygml::CityGMLSourceProvider; @@ -26,6 +27,7 @@ enum SinkChoice { Noop, Serde, Geojson, + Gpkg, } impl SinkChoice { @@ -34,6 +36,7 @@ impl SinkChoice { SinkChoice::Noop => Box::new(NoopSinkProvider {}), SinkChoice::Serde => Box::new(SerdeSinkProvider {}), SinkChoice::Geojson => Box::new(GeoJsonSinkProvider {}), + SinkChoice::Gpkg => Box::new(GpkgSinkProvider {}), } } } diff --git a/nusamai/src/sink/gpkg/mod.rs b/nusamai/src/sink/gpkg/mod.rs new file mode 100644 index 00000000..d88195da --- /dev/null +++ b/nusamai/src/sink/gpkg/mod.rs @@ -0,0 +1,89 @@ +//! GeoPackage sink + +use rayon::prelude::*; + +use crate::configuration::Config; +use crate::pipeline::{Feedback, Receiver}; +use crate::sink::{DataSink, DataSinkProvider, SinkInfo}; + +use nusamai_gpkg::geometry::multipolygon_to_bytes; +use nusamai_gpkg::GpkgHandler; + +pub struct GpkgSinkProvider {} + +impl DataSinkProvider for GpkgSinkProvider { + fn create(&self, _config: &Config) -> Box { + Box::::default() + } + + fn info(&self) -> SinkInfo { + SinkInfo { + name: "GeoPackage".to_string(), + } + } + + fn config(&self) -> Config { + Config::default() + } +} + +#[derive(Default)] +pub struct GpkgSink {} + +impl GpkgSink { + pub async fn run_async(&mut self, upstream: Receiver, feedback: &mut Feedback) { + let mut handler = GpkgHandler::init("output.gpkg").await.unwrap(); + + let (sender, mut receiver) = tokio::sync::mpsc::channel(100); + + let producers = { + let feedback = feedback.clone(); + tokio::task::spawn_blocking(move || { + let _ = upstream.into_iter().par_bridge().try_for_each_with( + sender, + |sender, parcel| { + if feedback.is_cancelled() { + return Err(()); + } + let cityobj = parcel.cityobj; + if !cityobj.geometries.multipolygon.is_empty() { + let bytes = multipolygon_to_bytes( + &cityobj.geometries.vertices, + &cityobj.geometries.multipolygon, + 4326, + ); + if sender.blocking_send(bytes).is_err() { + return Err(()); + }; + } + Ok(()) + }, + ); + }) + }; + + let mut tx = handler.begin().await.unwrap(); + while let Some(gpkg_bin) = receiver.recv().await { + if feedback.is_cancelled() { + return; + } + tx.insert_feature(&gpkg_bin).await; + } + tx.commit().await.unwrap(); + + producers.await.unwrap(); + } +} + +impl DataSink for GpkgSink { + fn run(&mut self, upstream: Receiver, feedback: &mut Feedback) { + let runtime = tokio::runtime::Runtime::new().unwrap(); + runtime.block_on(self.run_async(upstream, feedback)); + } +} + +#[cfg(test)] +mod tests { + #[test] + fn test() {} +} diff --git a/nusamai/src/sink/mod.rs b/nusamai/src/sink/mod.rs index a2f05cde..34341d8a 100644 --- a/nusamai/src/sink/mod.rs +++ b/nusamai/src/sink/mod.rs @@ -1,4 +1,5 @@ pub mod geojson; +pub mod gpkg; pub mod noop; pub mod serde;