Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk unindexing for IndexerInterface #598

Open
wants to merge 13 commits into
base: dev
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import pt.ua.dicoogle.plugins.webui.WebUIPluginManager;
import pt.ua.dicoogle.sdk.*;
import pt.ua.dicoogle.sdk.datastructs.Report;
import pt.ua.dicoogle.sdk.datastructs.UnindexReport;
import pt.ua.dicoogle.sdk.datastructs.SearchResult;
import pt.ua.dicoogle.sdk.datastructs.dim.DimLevel;
import pt.ua.dicoogle.sdk.settings.ConfigurationHolder;
Expand All @@ -45,6 +46,7 @@
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.zip.ZipFile;

Expand Down Expand Up @@ -761,7 +763,43 @@ public void unindex(URI path, Collection<String> indexProviders) {
}
}

/** Issue an unindexation procedure to the given indexers.
/** Issue the removal of indexed entries in bulk.
*
* @param indexProvider the name of the indexer
* @param items a collections of item identifiers to unindex
* @param progressCallback an optional function (can be `null`),
* called for every batch of items successfully unindexed
* to indicate early progress
* and inform consumers that
* it is safe to remove or exclude the unindexed item
* @return an asynchronous task object returning
* a report containing which files were not unindexed,
* and whether some of them were not found in the database
* @throws IOException
*/
public Task<UnindexReport> unindex(String indexProvider, Collection<URI> items, Consumer<Collection<URI>> progressCallback) throws IOException {
logger.info("Starting unindexing procedure for {} items", items.size());

IndexerInterface indexer = null;
if (indexProvider != null) {
indexer = this.getIndexerByName(indexProvider, true);
}
if (indexer == null) {
indexer = this.getIndexingPlugins(true).iterator().next();
}
Task<UnindexReport> task = indexer.unindex(items, progressCallback);
if (task != null) {
final String taskUniqueID = UUID.randomUUID().toString();
task.setName(String.format("[%s]unindex", indexer.getName()));
task.onCompletion(() -> {
logger.info("Unindexing task [{}] complete", taskUniqueID);
});
taskManager.dispatch(task);
}
return task;
}

/** Issue an unindexing procedure to the given indexers.
*
* @param path the URI of the directory or file to unindex
* @param indexers a collection of providers
Expand All @@ -782,7 +820,7 @@ public void remove(URI uri) {
}

public void doRemove(URI uri, StorageInterface si) {
if (si.handles(uri)) {
if (Objects.equals(uri.getScheme(), si.getScheme())) {
si.remove(uri);
} else {
logger.warn("Storage Plugin does not handle URI: {},{}", uri, si);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import javax.servlet.ServletException;
Expand All @@ -42,6 +43,7 @@
import pt.ua.dicoogle.plugins.PluginController;
import pt.ua.dicoogle.sdk.QueryInterface;
import pt.ua.dicoogle.sdk.datastructs.SearchResult;
import pt.ua.dicoogle.sdk.datastructs.UnindexReport;
import pt.ua.dicoogle.sdk.task.JointQueryTask;
import pt.ua.dicoogle.sdk.task.Task;

Expand Down Expand Up @@ -81,26 +83,50 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws S
"No arguments provided; must include either one of `uri`, `SOPInstanceUID`, `SeriesInstanceUID` or `StudyInstanceUID`");
return;
}

PluginController pc = PluginController.getInstance();

long indexed = 0;
long failed = 0;
long notfound = 0;

Collection<String> uris = resolveURIs(paramUri, paramSop, paramSeries, paramStudy);
Collection<URI> uris = resolveURIs(paramUri, paramSop, paramSeries, paramStudy);

// unindex
for (String strUri : uris) {
try {
URI uri = new URI(strUri);
// if only one entry, do it inline
if (uris.size() <= 1) {
for (URI uri : uris) {
try {
PluginController.getInstance().unindex(uri, providers);
pc.unindex(uri, providers);
indexed += 1;
} catch (RuntimeException ex) {
logger.error("Failed to unindex {}", uri, ex);
failed += 1;
}
} catch (URISyntaxException ex) {
logger.warn("Received bad URI", ex);
failed += 1;
}

} else {
// if many, use bulk unindexing
List<Task<UnindexReport>> tasks = new ArrayList<>();

if (providers == null) {
providers = pc.getIndexingPlugins(true).stream()
.map(p -> p.getName())
.collect(Collectors.toList());
}
for (String indexProvider: providers) {
tasks.add(pc.unindex(indexProvider, uris, null));
}

int i = 0;
for (Task<UnindexReport> task: tasks) {
try {
UnindexReport report = task.get();
indexed = uris.size() - report.notUnindexedFileCount();
failed = report.failedFileCount();
notfound = report.getNotFound().size();
} catch (Exception ex) {
logger.error("Task to unindex items in {} failed", providers.get(i), ex);
}
}
}

Expand All @@ -109,15 +135,18 @@ protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws S
JSONObject obj = new JSONObject();
obj.put("indexed", indexed);
obj.put("failed", failed);
obj.put("notFound", notfound);
resp.setStatus(200);
resp.getWriter().write(obj.toString());
}

/// Convert the given parameters into a list of URIs
private static Collection<String> resolveURIs(String[] paramUri, String[] paramSop, String[] paramSeries,
private static Collection<URI> resolveURIs(String[] paramUri, String[] paramSop, String[] paramSeries,
String[] paramStudy) {
if (paramUri != null) {
return Arrays.asList(paramUri);
return Stream.of(paramUri)
.map(URI::create)
.collect(Collectors.toList());
}
String attribute = null;
if (paramSop != null) {
Expand All @@ -142,19 +171,19 @@ public void onCompletion() {}
};
try {
return StreamSupport.stream(PluginController.getInstance()
.queryAll(holder, dcmAttribute + ":" + uid).get().spliterator(), false);
.queryAll(holder, dcmAttribute + ":\"" + uid + '"').get().spliterator(), false);
} catch (InterruptedException | ExecutionException ex) {
throw new RuntimeException(ex);
}
}).map(r -> r.getURI().toString()).collect(Collectors.toList());
}).map(r -> r.getURI()).collect(Collectors.toList());

}
String dicomProvider = dicomProviders.iterator().next();
return Arrays.stream(paramSop).flatMap(uid -> {
// translate to URIs
QueryInterface dicom = PluginController.getInstance().getQueryProviderByName(dicomProvider, false);

return StreamSupport.stream(dicom.query(dcmAttribute + ":" + uid).spliterator(), false);
}).map(r -> r.getURI().toString()).collect(Collectors.toList());
return StreamSupport.stream(dicom.query(dcmAttribute + ":\"" + uid + '"').spliterator(), false);
}).map(r -> r.getURI()).collect(Collectors.toList());
}
}
107 changes: 94 additions & 13 deletions sdk/src/main/java/pt/ua/dicoogle/sdk/IndexerInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,47 +18,73 @@
*/
package pt.ua.dicoogle.sdk;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

import pt.ua.dicoogle.sdk.datastructs.Report;
import pt.ua.dicoogle.sdk.datastructs.UnindexReport;
import pt.ua.dicoogle.sdk.datastructs.UnindexReport.FailedUnindex;
import pt.ua.dicoogle.sdk.task.Task;

/**
* Index Interface Plugin. Indexers analyze documents for performing queries. They may index
* documents by DICOM metadata for instance, but other document processing procedures may be involved.
* Indexing plugin interface.
*
* Indexers analyze and record documents for future retrieval.
* They are primarily designed to index DICOM meta-data,
* which in that case they are accompanied by a query plugin,
* and both plugins are called <em>DIM providers</em>.
* However, indexers are not restricted to processing DICOM files,
* or to retrieving and indexing meta-data.
*
* @author Luís A. Bastião Silva <bastiao@ua.pt>
* @author Luís A. Bastião Silva <bastiao@bmd-software.com>
* @author Frederico Valente <fmvalente@ua.pt>
*/
public interface IndexerInterface extends DicooglePlugin {

/**
* Indexes the file path to the database. Indexation procedures are asynchronous, and will return
* Indexes the file path to the database. Indexing procedures are asynchronous, and will return
* immediately after the call. The outcome is a report that can be retrieved from the given task
* as a future.
*
* @param file directory or file to index
* @return a representation of the asynchronous indexation task
* @return a representation of the asynchronous indexing task
*/
public Task<Report> index(StorageInputStream file, Object... parameters);

/**
* Indexes multiple file paths to the database. Indexation procedures are asynchronous, and will return
* Indexes multiple file paths to the database. Indexing procedures are asynchronous, and will return
* immediately after the call. The outcomes are aggregated into a single report and can be retrieved from
* the given task as a future.
*
* @param files a collection of directories and/or files to index
* @return a representation of the asynchronous indexation task
* @return a representation of the asynchronous indexing task
*/
public Task<Report> index(Iterable<StorageInputStream> files, Object... parameters);


/**
* Checks whether the file in the given path can be indexed by this indexer. The indexer should verify if
* the file holds compatible content (e.g. a DICOM file). If this method returns false, the file will not
* be indexed.
*
* Checks whether the file in the given path can be indexed by this indexer.
*
* The method should return <code>false</code> <em>if and only if</em>
* it is sure that the file cannot be indexed,
* by observation of its URI.
* This method exists in order to filter out files
* that are obviously incompatible for the indexer.
* However, there are situations where this is not reliable,
* since the storage is free to establish its own file naming rules,
* and that can affect the file extension.
* In case of doubt, it is recommended to leave the default implementation,
* which returns true unconditionally.
* Attempts to read invalid files can instead
* be handled gracefully by the indexer by capturing exceptions.
*
* @param path a URI to the file to check
* @return whether the indexer can handle the file at the given path
* @return whether the item at the given URI path can be fed to this indexer
*/
public default boolean handles(URI path) {
return true;
Expand All @@ -67,8 +93,63 @@ public default boolean handles(URI path) {
/**
* Removes the indexed file at the given path from the database.
*
* Unlike the other indexing tasks,
* this operation is synchronous
* and will only return when the operation is done.
*
* @param path the URI of the document
* @return whether it was successfully deleted from the database
*/
public boolean unindex(URI path);

/**
* Removes indexed files from the database in bulk.
*
* The default implementation unindexes each item one by one
* in a non-specified order via {@linkplain #unindex(URI)},
* but indexers may implement this as
* one or more individual operations in batch,
* thus becoming faster than unindexing each item individually.
*
* Like {@linkplain index},
* this operation is asynchronous.
* One can keep track of the unindexing task's progress
* by passing a callback function as the second parameter.
*
* @param uris the URIs of the items to unindex
* @param progressCallback an optional function (can be `null`),
* called for every batch of items successfully unindexed
* to indicate early progress
* and inform consumers that
* it is safe to remove or exclude the unindexed item
* @return an asynchronous task object returning
* a report containing which files were not unindexed,
* and whether some of them were not found in the database
* @throws IOException if an error occurred
* before the unindexing operation could start,
* such as when failing to access or open the database
*/
public default Task<UnindexReport> unindex(Collection<URI> uris, Consumer<Collection<URI>> progressCallback)
throws IOException {
Objects.requireNonNull(uris);
return new Task<>(() -> {
List<FailedUnindex> failures = new ArrayList<>();
for (URI uri : uris) {
try {
if (unindex(uri)) {
// unindexed successfully
if (progressCallback != null) {
progressCallback.accept(Collections.singleton(uri));
}
} else {
// failed to unindex, reason unknown
failures.add(new FailedUnindex(Collections.singleton(uri), null));
}
} catch (Exception ex) {
failures.add(new FailedUnindex(Collections.singleton(uri), ex));
}
}
return UnindexReport.withFailures(failures);
tiberio-baptista marked this conversation as resolved.
Show resolved Hide resolved
});
}
}
Loading
Loading