Skip to content

Commit

Permalink
ENH: Add POC async implementation, example using storescp
Browse files Browse the repository at this point in the history
  • Loading branch information
naterichman committed Jul 14, 2024
1 parent 89c4f1c commit 3251368
Show file tree
Hide file tree
Showing 16 changed files with 3,282 additions and 225 deletions.
159 changes: 157 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion encoding/src/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type DecodeResult<T> = Result<T, DecodeTextError>;

/// A holder of encoding and decoding mechanisms for text in DICOM content,
/// which according to the standard, depends on the specific character set.
pub trait TextCodec {
pub trait TextCodec: Send + Sync {
/// Obtain the defined term (unique name) of the text encoding,
/// which may be used as the value of a
/// Specific Character Set (0008, 0005) element to refer to this codec.
Expand Down
3 changes: 2 additions & 1 deletion storescp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,12 @@ readme = "README.md"
[dependencies]
clap = { version = "4.0.18", features = ["derive"] }
dicom-core = { path = '../core', version = "0.7.0" }
dicom-ul = { path = '../ul', version = "0.7.0" }
dicom-ul = { path = '../ul', version = "0.7.0", features = ["tokio"] }
dicom-object = { path = '../object', version = "0.7.0" }
dicom-encoding = { path = "../encoding/", version = "0.7.0" }
dicom-dictionary-std = { path = "../dictionary-std/", version = "0.7.0" }
dicom-transfer-syntax-registry = { path = "../transfer-syntax-registry/", version = "0.7.0" }
snafu = "0.8"
tracing = "0.1.36"
tracing-subscriber = "0.3.15"
tokio = "1.38.0"
35 changes: 18 additions & 17 deletions storescp/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
net::{Ipv4Addr, SocketAddrV4, TcpListener, TcpStream},
net::{Ipv4Addr, SocketAddrV4},
path::PathBuf,
sync::Arc,
};

use clap::Parser;
Expand Down Expand Up @@ -47,7 +48,7 @@ struct App {
port: u16,
}

fn run(scu_stream: TcpStream, args: &App) -> Result<(), Whatever> {
async fn run(scu_stream: tokio::net::TcpStream, args: &App) -> Result<(), Whatever> {
let App {
verbose,
calling_ae_title,
Expand Down Expand Up @@ -90,6 +91,7 @@ fn run(scu_stream: TcpStream, args: &App) -> Result<(), Whatever> {

let mut association = options
.establish(scu_stream)
.await
.whatever_context("could not establish association")?;

info!("New association from {}", association.client_ae_title());
Expand All @@ -99,7 +101,7 @@ fn run(scu_stream: TcpStream, args: &App) -> Result<(), Whatever> {
);

loop {
match association.receive() {
match association.receive().await {
Ok(mut pdu) => {
if verbose {
debug!("scu ----> scp: {}", pdu.short_description());
Expand Down Expand Up @@ -153,7 +155,7 @@ fn run(scu_stream: TcpStream, args: &App) -> Result<(), Whatever> {
data: cecho_data,
}],
};
association.send(&pdu_response).whatever_context(
association.send(&pdu_response).await.whatever_context(
"failed to send C-ECHO response object to SCU",
)?;
} else {
Expand Down Expand Up @@ -254,13 +256,14 @@ fn run(scu_stream: TcpStream, args: &App) -> Result<(), Whatever> {
};
association
.send(&pdu_response)
.await
.whatever_context("failed to send response object to SCU")?;
}
}
}
Pdu::ReleaseRQ => {
buffer.clear();
association.send(&Pdu::ReleaseRP).unwrap_or_else(|e| {
association.send(&Pdu::ReleaseRP).await.unwrap_or_else(|e| {
warn!(
"Failed to send association release message to SCU: {}",
snafu::Report::from_error(e)
Expand Down Expand Up @@ -355,8 +358,9 @@ fn create_cecho_response(message_id: u16) -> InMemDicomObject<StandardDataDictio
])
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = App::parse();
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Arc::new(App::parse());

tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
Expand All @@ -380,23 +384,20 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
});

let listen_addr = SocketAddrV4::new(Ipv4Addr::from(0), args.port);
let listener = TcpListener::bind(listen_addr)?;
let listener = tokio::net::TcpListener::bind(listen_addr).await?;
info!(
"{} listening on: tcp://{}",
&args.calling_ae_title, listen_addr
);

for stream in listener.incoming() {
match stream {
Ok(scu_stream) => {
if let Err(e) = run(scu_stream, &args) {
error!("{}", snafu::Report::from_error(e));
}
}
Err(e) => {
loop {
let (socket, _addr) = listener.accept().await?;
let args = args.clone();
tokio::task::spawn(async move {
if let Err(e) = run(socket, &args).await {
error!("{}", snafu::Report::from_error(e));
}
}
});
}

Ok(())
Expand Down
Loading

0 comments on commit 3251368

Please sign in to comment.