Skip to content

Commit

Permalink
GeoPackage用のSinkを実装(現在の部品をもとにして) (#136)
Browse files Browse the repository at this point in the history
close #121

#89 , #129 での、Gpkg変換の初期実装をもとに、全体の流れに組み込む(sinkを作る)。

とりあえず書いてみたコード例のようなものなので、コメントいただきたいです。

## 悩み

SQLx(async)の取り扱い。とりあえず、Sinkの中でブロックしてやってみているが、こう言う方法でいいのか?

---------

Co-authored-by: Taku Fukada <naninunenor@gmail.com>
  • Loading branch information
sorami and ciscorn authored Jan 5, 2024
1 parent 55f34c4 commit dec9a2e
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 16 deletions.
4 changes: 3 additions & 1 deletion nusamai-gpkg/src/geometry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Convert geometries to GeoPackage SQL Geometry Binary Format
/// cf. https://www.geopackage.org/spec130/#gpb_format
//!
//! cf. https://www.geopackage.org/spec130/#gpb_format

use nusamai_geometry::{MultiPolygon, Polygon};

fn geometry_header(srs_id: i32) -> Vec<u8> {
Expand Down
52 changes: 41 additions & 11 deletions nusamai-gpkg/src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::geometry::multipolygon_to_bytes;
use sqlx::Row;
use sqlx::{migrate::MigrateDatabase, Pool, Sqlite, SqlitePool};
use sqlx::sqlite::*;
use sqlx::{Acquire, Row};
use sqlx::{Pool, Sqlite, SqlitePool};
use std::path::Path;
use std::str::FromStr;
use thiserror::Error;

pub struct GpkgHandler {
Expand All @@ -25,7 +26,11 @@ impl GpkgHandler {

let db_url = format!("sqlite://{}", path);

Sqlite::create_database(&db_url).await?;
let conn_opts = SqliteConnectOptions::from_str(&db_url)?
.create_if_missing(true)
.synchronous(SqliteSynchronous::Normal)
.journal_mode(SqliteJournalMode::Wal);
SqlitePoolOptions::new().connect_with(conn_opts).await?;
let pool = SqlitePool::connect(&db_url).await?;

// Initialize the database with minimum GeoPackage schema
Expand Down Expand Up @@ -84,19 +89,44 @@ impl GpkgHandler {
table_names
}

pub async fn begin(&mut self) -> Result<GpkgTransaction, GpkgError> {
Ok(GpkgTransaction::new(self.pool.begin().await?))
}
}

pub async fn insert_feature(pool: &Pool<Sqlite>, bytes: &[u8]) {
sqlx::query("INSERT INTO mpoly3d (geometry) VALUES (?)")
.bind(bytes)
.execute(pool)
.await
.unwrap();

// TODO: MultiLineString
// TODO: MultiPoint
}

pub struct GpkgTransaction<'c> {
tx: sqlx::Transaction<'c, Sqlite>,
}

impl<'c> GpkgTransaction<'c> {
pub fn new(tx: sqlx::Transaction<'c, Sqlite>) -> Self {
Self { tx }
}

pub async fn commit(self) -> Result<(), GpkgError> {
Ok(self.tx.commit().await?)
}

/// Add a MultiPolygonZ feature to the GeoPackage database
///
/// Note: とりあえず地物を挿入してみるための実装です。参考にしないでください。
pub async fn add_multi_polygon_feature(
&self,
vertices: &[[f64; 3]],
mpoly: &nusamai_geometry::MultiPolygon<'_, 1, u32>,
) {
let bytes = multipolygon_to_bytes(vertices, mpoly, 4326);
pub async fn insert_feature(&mut self, bytes: &[u8]) {
let executor = self.tx.acquire().await.unwrap();

sqlx::query("INSERT INTO mpoly3d (geometry) VALUES (?)")
.bind(bytes)
.execute(&self.pool)
.execute(&mut *executor)
.await
.unwrap();

Expand Down
4 changes: 2 additions & 2 deletions nusamai-gpkg/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mod geometry;
pub mod geometry;
mod handler;

pub use handler::GpkgHandler;
pub use handler::*;
3 changes: 2 additions & 1 deletion nusamai/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ nusamai-geojson = { path = "../nusamai-geojson" }
geojson = "0.24.1"
serde_json = "1.0.108"
url = "2.5.0"
nusamai-gpkg = { path = "../nusamai-gpkg" }
tokio = { version = "1.35.1", features = ["full"] }

[dev-dependencies]
rand = "0.8.5"
tokio = { version = "1.35.1", features = ["full"] }
nusamai-geometry = { path = "../nusamai-geometry" }
nusamai-gpkg = { path = "../nusamai-gpkg" }
5 changes: 4 additions & 1 deletion nusamai/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use clap::Parser;

use nusamai::pipeline::Canceller;
use nusamai::sink::{
geojson::GeoJsonSinkProvider, noop::NoopSinkProvider, serde::SerdeSinkProvider,
geojson::GeoJsonSinkProvider, gpkg::GpkgSinkProvider, noop::NoopSinkProvider,
serde::SerdeSinkProvider,
};
use nusamai::sink::{DataSink, DataSinkProvider};
use nusamai::source::citygml::CityGMLSourceProvider;
Expand All @@ -26,6 +27,7 @@ enum SinkChoice {
Noop,
Serde,
Geojson,
Gpkg,
}

impl SinkChoice {
Expand All @@ -34,6 +36,7 @@ impl SinkChoice {
SinkChoice::Noop => Box::new(NoopSinkProvider {}),
SinkChoice::Serde => Box::new(SerdeSinkProvider {}),
SinkChoice::Geojson => Box::new(GeoJsonSinkProvider {}),
SinkChoice::Gpkg => Box::new(GpkgSinkProvider {}),
}
}
}
Expand Down
89 changes: 89 additions & 0 deletions nusamai/src/sink/gpkg/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//! GeoPackage sink

use rayon::prelude::*;

use crate::configuration::Config;
use crate::pipeline::{Feedback, Receiver};
use crate::sink::{DataSink, DataSinkProvider, SinkInfo};

use nusamai_gpkg::geometry::multipolygon_to_bytes;
use nusamai_gpkg::GpkgHandler;

pub struct GpkgSinkProvider {}

impl DataSinkProvider for GpkgSinkProvider {
fn create(&self, _config: &Config) -> Box<dyn DataSink> {
Box::<GpkgSink>::default()
}

fn info(&self) -> SinkInfo {
SinkInfo {
name: "GeoPackage".to_string(),
}
}

fn config(&self) -> Config {
Config::default()
}
}

#[derive(Default)]
pub struct GpkgSink {}

impl GpkgSink {
pub async fn run_async(&mut self, upstream: Receiver, feedback: &mut Feedback) {
let mut handler = GpkgHandler::init("output.gpkg").await.unwrap();

let (sender, mut receiver) = tokio::sync::mpsc::channel(100);

let producers = {
let feedback = feedback.clone();
tokio::task::spawn_blocking(move || {
let _ = upstream.into_iter().par_bridge().try_for_each_with(
sender,
|sender, parcel| {
if feedback.is_cancelled() {
return Err(());
}
let cityobj = parcel.cityobj;
if !cityobj.geometries.multipolygon.is_empty() {
let bytes = multipolygon_to_bytes(
&cityobj.geometries.vertices,
&cityobj.geometries.multipolygon,
4326,
);
if sender.blocking_send(bytes).is_err() {
return Err(());
};
}
Ok(())
},
);
})
};

let mut tx = handler.begin().await.unwrap();
while let Some(gpkg_bin) = receiver.recv().await {
if feedback.is_cancelled() {
return;
}
tx.insert_feature(&gpkg_bin).await;
}
tx.commit().await.unwrap();

producers.await.unwrap();
}
}

impl DataSink for GpkgSink {
fn run(&mut self, upstream: Receiver, feedback: &mut Feedback) {
let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on(self.run_async(upstream, feedback));
}
}

#[cfg(test)]
mod tests {
#[test]
fn test() {}
}
1 change: 1 addition & 0 deletions nusamai/src/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod geojson;
pub mod gpkg;
pub mod noop;
pub mod serde;

Expand Down

0 comments on commit dec9a2e

Please sign in to comment.