Skip to content

Commit

Permalink
Change index queue worker to be persistent
Browse files Browse the repository at this point in the history
- move IndexQueueWorker to a dedicated class
- start index worker thread when needed
- do not halt index worker thread when service is stopped
  • Loading branch information
Enet4 committed Feb 6, 2024
1 parent f9c4b49 commit 343a391
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 51 deletions.
53 changes: 2 additions & 51 deletions dicoogle/src/main/java/pt/ua/dicoogle/server/DicomStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,8 @@ public class DicomStorage extends StorageService {
private Set<String> priorityAETs = new HashSet<>();

// Changed to support priority queue.
private BlockingQueue<ImageElement> queue = new PriorityBlockingQueue<>();
private NetworkApplicationEntity[] naeArr = null;
private AtomicLong seqNum = new AtomicLong(0L);
/**
* Whether the index worker thread
* should exit once it finishes processing the queue.
*/
private volatile boolean workerShouldExit = false;

private static boolean ASYNC_INDEX = Boolean.valueOf(System.getProperty("dicoogle.index.async", "true"));

/**
*
Expand Down Expand Up @@ -292,7 +284,7 @@ protected void onCStoreRQ(Association as, int pcid, DicomObject rq, PDVInputStre
if (uri != null) {
// enqueue to index
ImageElement element = new ImageElement(uri, as.getCallingAET(), seqNum.getAndIncrement());
queue.add(element);
IndexQueueWorker.getInstance().addElement(element);
}
}

Expand Down Expand Up @@ -372,60 +364,19 @@ static int compareElementsImpl(Set<String> priorityAETs, String thisAETitle, lon
return Long.compare(thisSeqNumber, otherSeqNumber);
}

class IndexerQueueWorker implements Runnable {
public Collection<IndexerInterface> plugins;

@Override
public void run() {
while (true) {
try {
if (workerShouldExit && queue.isEmpty()) {
break;
}
// Fetch an element from the priority queue
// with a timeout so that the thread can exit
ImageElement element = queue.poll(3, TimeUnit.SECONDS);
if (element == null) {
continue;
}
URI exam = element.getUri();
if (ASYNC_INDEX)
PluginController.getInstance().index(exam);
else
PluginController.getInstance().indexBlocking(exam);
} catch (InterruptedException ex) {
LOG.warn("Indexer queue worker thread interrupted", ex);
} catch (Exception ex) {
LOG.error("Unexpected error in indexer queue worker", ex);
}
}
LOG.info("DICOM storage finished processing files");
}
}

private Thread indexer = new Thread(new IndexerQueueWorker(), "indexer-queue-worker");

/*
* Start the Storage Service
* @throws java.io.IOException
*/
public void start() throws IOException {
device.startListening(executor);
indexer.start();

indexer.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
LOG.error("Fatal error in indexer queue worker", e);
ControlServices.getInstance().stopStorage();
LOG.warn("DICOM storage service was taken down to prevent further errors");
});
IndexQueueWorker.getInstance().start();
}

/**
* Stop the storage service
*/
public void stop() {
device.stopListening();
workerShouldExit = true;
LOG.debug("Indexer queue worker will exit");
}
}
73 changes: 73 additions & 0 deletions dicoogle/src/main/java/pt/ua/dicoogle/server/IndexQueueWorker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package pt.ua.dicoogle.server;

import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;

import pt.ua.dicoogle.plugins.PluginController;
import pt.ua.dicoogle.server.DicomStorage.ImageElement;

/** Index worker actor.
*
* This singleton actor module is responsible for
* dispatching indexing tasks for the DICOM files received via C-STORE.
*
* @see DicomStorage
*/
public final class IndexQueueWorker {

private static final boolean ASYNC_INDEX = Boolean.valueOf(System.getProperty("dicoogle.index.async", "true"));

private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(IndexQueueWorker.class);

private static IndexQueueWorker instance;

public synchronized static IndexQueueWorker getInstance() {
if (instance == null) {
instance = new IndexQueueWorker();
}
return instance;
}

private IndexQueueWorker() {}

private final BlockingQueue<ImageElement> queue = new PriorityBlockingQueue<>();
private Thread thread;

public synchronized void start() {
if (thread == null || !thread.isAlive()) {
thread = new Thread(WORKER_RUN, "indexer-queue-worker");
thread.start();
thread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
LOG.error("Fatal error in indexer queue worker", e);
ControlServices.getInstance().stopStorage();
LOG.warn("DICOM storage service was taken down to prevent further errors");
});
}
}

/** Push this DICOM element to be indexed. */
public void addElement(ImageElement element) {
queue.add(element);
}

private final Runnable WORKER_RUN = () -> {
while (true) {
try {
// Fetch an element from the priority queue
ImageElement element = queue.take();
URI exam = element.getUri();
if (ASYNC_INDEX) {
PluginController.getInstance().index(exam);
} else {
PluginController.getInstance().indexBlocking(exam);
}
} catch (InterruptedException ex) {
LOG.warn("Indexer queue worker thread interrupted", ex);
} catch (Exception ex) {
LOG.error("Unexpected error in indexer queue worker", ex);
}
}
};
}

0 comments on commit 343a391

Please sign in to comment.