Skip to content

Commit

Permalink
Merge pull request #684 from bioinformatics-ua/bug/dicom-storage-soft…
Browse files Browse the repository at this point in the history
…-stop

Change DICOM storage stop behavior to soft stop
  • Loading branch information
Enet4 committed Feb 28, 2024
2 parents 5fafbd3 + 428c26e commit d145455
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 40 deletions.
42 changes: 2 additions & 40 deletions dicoogle/src/main/java/pt/ua/dicoogle/server/DicomStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,8 @@ public class DicomStorage extends StorageService {
private Set<String> priorityAETs = new HashSet<>();

// Changed to support priority queue.
private BlockingQueue<ImageElement> queue = new PriorityBlockingQueue<>();
private NetworkApplicationEntity[] naeArr = null;
private AtomicLong seqNum = new AtomicLong(0L);
private volatile boolean workerShouldExit = false;

private static boolean ASYNC_INDEX = Boolean.valueOf(System.getProperty("dicoogle.index.async", "true"));

/**
*
Expand Down Expand Up @@ -288,7 +284,7 @@ protected void onCStoreRQ(Association as, int pcid, DicomObject rq, PDVInputStre
if (uri != null) {
// enqueue to index
ImageElement element = new ImageElement(uri, as.getCallingAET(), seqNum.getAndIncrement());
queue.add(element);
IndexQueueWorker.getInstance().addElement(element);
}
}

Expand Down Expand Up @@ -368,53 +364,19 @@ static int compareElementsImpl(Set<String> priorityAETs, String thisAETitle, lon
return Long.compare(thisSeqNumber, otherSeqNumber);
}

class IndexerQueueWorker implements Runnable {
public Collection<IndexerInterface> plugins;

@Override
public void run() {
while (!workerShouldExit) {
try {
// Fetch an element by the queue taking into account the priorities.
ImageElement element = queue.take();
URI exam = element.getUri();
if (ASYNC_INDEX)
PluginController.getInstance().index(exam);
else
PluginController.getInstance().indexBlocking(exam);
} catch (InterruptedException ex) {
LOG.warn("Indexer queue worker thread interrupted", ex);
} catch (Exception ex) {
LOG.error("Unexpected error in indexer queue worker", ex);
}
}
LOG.debug("Indexer queue worker exiting by request");
}
}

private Thread indexer = new Thread(new IndexerQueueWorker(), "indexer-queue-worker");

/*
* Start the Storage Service
* @throws java.io.IOException
*/
public void start() throws IOException {
device.startListening(executor);
indexer.start();

indexer.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
LOG.error("Fatal error in indexer queue worker", e);
ControlServices.getInstance().stopStorage();
LOG.warn("DICOM storage service was taken down to prevent further errors");
});
IndexQueueWorker.getInstance().start();
}

/**
* Stop the storage service
*/
public void stop() {
device.stopListening();
workerShouldExit = true;
indexer.interrupt();
}
}
91 changes: 91 additions & 0 deletions dicoogle/src/main/java/pt/ua/dicoogle/server/IndexQueueWorker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
* Copyright (C) 2014 Universidade de Aveiro, DETI/IEETA, Bioinformatics Group - http://bioinformatics.ua.pt/
*
* This file is part of Dicoogle/dicoogle.
*
* Dicoogle/dicoogle is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Dicoogle/dicoogle is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Dicoogle. If not, see <http://www.gnu.org/licenses/>.
*/
package pt.ua.dicoogle.server;

import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

import pt.ua.dicoogle.plugins.PluginController;
import pt.ua.dicoogle.server.DicomStorage.ImageElement;

/** Index worker actor.
*
* This singleton actor module is responsible for
* dispatching indexing tasks for the DICOM files received via C-STORE.
*
* @see DicomStorage
*/
public final class IndexQueueWorker {

private static final boolean ASYNC_INDEX = Boolean.valueOf(System.getProperty("dicoogle.index.async", "true"));

private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(IndexQueueWorker.class);

private static IndexQueueWorker instance;

public synchronized static IndexQueueWorker getInstance() {
if (instance == null) {
instance = new IndexQueueWorker();
}
return instance;
}

private IndexQueueWorker() {}

private final BlockingQueue<ImageElement> queue = new PriorityBlockingQueue<>();
private Thread thread;

public synchronized void start() {
if (thread == null || !thread.isAlive()) {
thread = new Thread(WORKER_RUN, "indexer-queue-worker");
thread.start();
thread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
LOG.error("Fatal error in indexer queue worker", e);
ControlServices.getInstance().stopStorage();
LOG.warn("DICOM storage service was taken down to prevent further errors");
});
}
}

/** Push this DICOM element to be indexed. */
public void addElement(ImageElement element) {
queue.add(element);
}

private final Runnable WORKER_RUN = () -> {
while (true) {
try {
// Fetch an element from the priority queue
ImageElement element = queue.take();
URI exam = element.getUri();
if (ASYNC_INDEX) {
PluginController.getInstance().index(exam);
} else {
PluginController.getInstance().indexBlocking(exam);
}
} catch (InterruptedException ex) {
LOG.warn("Indexer queue worker thread interrupted", ex);
} catch (Exception ex) {
LOG.error("Unexpected error in indexer queue worker", ex);
}
}
};
}

0 comments on commit d145455

Please sign in to comment.