diff --git a/.github/workflows/test.sh b/.github/workflows/test.sh
index d978e97..4ba9e9b 100755
--- a/.github/workflows/test.sh
+++ b/.github/workflows/test.sh
@@ -81,6 +81,6 @@ echo "HOST=localhost" >> src/test/resources/config.properties
echo "PORT=1521" >> src/test/resources/config.properties
echo "USER=test" >> src/test/resources/config.properties
echo "PASSWORD=test" >> src/test/resources/config.properties
-echo "CONNECT_TIMEOUT=60" >> src/test/resources/config.properties
-echo "SQL_TIMEOUT=60" >> src/test/resources/config.properties
+echo "CONNECT_TIMEOUT=120" >> src/test/resources/config.properties
+echo "SQL_TIMEOUT=120" >> src/test/resources/config.properties
mvn clean compile test
diff --git a/README.md b/README.md
index fa9299a..06c733e 100644
--- a/README.md
+++ b/README.md
@@ -508,12 +508,13 @@ for the out parameters is emitted last, after the `Result` for each cursor.
Oracle R2DBC supports type mappings between Java and SQL for non-standard data
types of Oracle Database.
-| Oracle SQL Type | Java Type |
-|---------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|
-| [JSON](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-E441F541-BA31-4E8C-B7B4-D2FB8C42D0DF) | `javax.json.JsonObject` or `oracle.sql.json.OracleJsonObject` |
-| [DATE](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-5405B652-C30E-4F4F-9D33-9A4CB2110F1B) | `java.time.LocalDateTime` |
-| [INTERVAL DAY TO SECOND](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-B03DD036-66F8-4BD3-AF26-6D4433EBEC1C) | `java.time.Duration` |
-| [INTERVAL YEAR TO MONTH](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-ED59E1B3-BA8D-4711-B5C8-B0199C676A95) | `java.time.Period` |
+| Oracle SQL Type | Java Type |
+|---------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------|
+| [JSON](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-E441F541-BA31-4E8C-B7B4-D2FB8C42D0DF) | `javax.json.JsonObject` or `oracle.sql.json.OracleJsonObject` |
+| [DATE](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-5405B652-C30E-4F4F-9D33-9A4CB2110F1B) | `java.time.LocalDateTime` |
+| [INTERVAL DAY TO SECOND](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-B03DD036-66F8-4BD3-AF26-6D4433EBEC1C) | `java.time.Duration` |
+| [INTERVAL YEAR TO MONTH](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-ED59E1B3-BA8D-4711-B5C8-B0199C676A95) | `java.time.Period` |
+| [SYS_REFCURSOR](https://docs.oracle.com/en/database/oracle/oracle-database/21/lnpls/static-sql.html#GUID-470A7A99-888A-46C2-BDAF-D4710E650F27) | `io.r2dbc.spi.Result` |
> Unlike the standard SQL type named "DATE", the Oracle Database type named
> "DATE" stores values for year, month, day, hour, minute, and second. The
> standard SQL type only stores year, month, and day. LocalDateTime objects are able
@@ -553,6 +554,35 @@ prefetched entirely, a smaller prefetch size can be configured using the
option, and the LOB can be consumed as a stream. By mapping LOB columns to
`Blob` or `Clob` objects, the content can be consumed as a reactive stream.
+### REF Cursors
+Use the `oracle.r2dbc.OracleR2dbcTypes.REF_CURSOR` type to bind `SYS_REFCURSOR` out
+parameters:
+```java
+Publisher executeProcedure(Connection connection) {
+ connection.createStatement(
+ "BEGIN example_procedure(:cursor_parameter); END;")
+ .bind("cursor_parameter", Parameters.out(OracleR2dbcTypes.REF_CURSOR))
+ .execute()
+}
+```
+A `SYS_REFCURSOR` out parameter can be mapped to an `io.r2dbc.spi.Result`:
+```java
+Publisher mapOutParametersResult(Result outParametersResult) {
+ return outParametersResult.map(outParameters ->
+ outParameters.get("cursor_parameter", Result.class));
+}
+```
+The rows of a `SYS_REFCURSOR` may be consumed from the `Result` it is
+mapped to:
+```java
+Publisher mapRefCursorRows(Result refCursorResult) {
+ return refCursorResult.map(row ->
+ new ExampleObject(
+ row.get("id_column", Long.class),
+ row.get("value_column", String.class)));
+}
+```
+
## Secure Programming Guidelines
The following security related guidelines should be adhered to when programming
with the Oracle R2DBC Driver.
diff --git a/src/main/java/oracle/r2dbc/OracleR2dbcTypes.java b/src/main/java/oracle/r2dbc/OracleR2dbcTypes.java
index 6f88622..08c6c56 100644
--- a/src/main/java/oracle/r2dbc/OracleR2dbcTypes.java
+++ b/src/main/java/oracle/r2dbc/OracleR2dbcTypes.java
@@ -20,6 +20,7 @@
*/
package oracle.r2dbc;
+import io.r2dbc.spi.Result;
import io.r2dbc.spi.Type;
import oracle.sql.json.OracleJsonObject;
@@ -92,6 +93,12 @@ private OracleR2dbcTypes() {}
public static final Type TIMESTAMP_WITH_LOCAL_TIME_ZONE =
new TypeImpl(LocalDateTime.class, "TIMESTAMP WITH LOCAL TIME ZONE");
+ /**
+ * A cursor that is returned by a procedural call.
+ */
+ public static final Type REF_CURSOR =
+ new TypeImpl(Result.class, "SYS_REFCURSOR");
+
/**
* Implementation of the {@link Type} SPI.
*/
diff --git a/src/main/java/oracle/r2dbc/impl/DependentCounter.java b/src/main/java/oracle/r2dbc/impl/DependentCounter.java
new file mode 100644
index 0000000..1d1694e
--- /dev/null
+++ b/src/main/java/oracle/r2dbc/impl/DependentCounter.java
@@ -0,0 +1,82 @@
+package oracle.r2dbc.impl;
+
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Mono;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ *
+ * A count of resources that depend on another resource to remain open. A
+ * dependent resource registers itself by incrementing the count, and
+ * deregisters itself by decrementing the count. The last dependent to
+ * deregister has the responsibility of subscribing to a {@code Publisher} that
+ * closes the resource it depended upon.
+ *
+ * This class is conceptually similar to a {@code java.util.concurrent.Phaser}.
+ * Parties register by calling {@link #increment()}, and deregister by calling
+ * {@link #decrement()}. Asynchronous "phase advancement" is then handled by
+ * the {@code Publisher} which {@code decrement} returns.
+ *
+ * This class offers a solution for tracking the consumption of
+ * {@link io.r2dbc.spi.Result} objects that depend on a JDBC statement to remain
+ * open until each result is consumed. Further explanations can be found in the
+ * JavaDocs of {@link OracleStatementImpl} and {@link OracleResultImpl}.
+ *
+ */
+class DependentCounter {
+
+ /** Count of dependents */
+ private final AtomicInteger count = new AtomicInteger(0);
+
+ /** Publisher that closes the depended upon resource */
+ private final Publisher closePublisher;
+
+ /**
+ * Constructs a new counter that returns a resource closing publisher to the
+ * last dependent which unregisters. The counter is initialized with a count
+ * of zero.
+ * @param closePublisher Publisher that closes a resource. Not null.
+ */
+ DependentCounter(Publisher closePublisher) {
+ this.closePublisher = closePublisher;
+ }
+
+ /**
+ * Increments the count of dependents by one.
+ *
+ * A corresponding call to {@link #decrement()} MUST occur by the dependent
+ * which has called {@code increment()}
+ *
+ */
+ void increment() {
+ count.incrementAndGet();
+ }
+
+ /**
+ *
+ * Returns a publisher that decrements the count of dependents by one when
+ * subscribed to.
+ *
+ * A corresponding call to {@link #increment()} MUST have previously occurred
+ * by the dependent which has called {@code decrement()}
+ *
+ *
+ * The dependent which has called this method MUST subscribe to the returned
+ * published. If the dependent that calls this method is the last dependent to
+ * do so, then the returned publisher will close the depended upon resource.
+ * Otherwise, if more dependents remain, the returned publisher does nothing.
+ * The caller of this method has no way to tell which is the case, so it must
+ * subscribe to be safe.
+ *
+ * @return A publisher that closes the depended upon resource after no
+ * dependents remain. Not null.
+ */
+ Publisher decrement() {
+ return Mono.defer(() ->
+ count.decrementAndGet() == 0
+ ? Mono.from(closePublisher)
+ : Mono.empty());
+ }
+
+}
diff --git a/src/main/java/oracle/r2dbc/impl/OracleReadableImpl.java b/src/main/java/oracle/r2dbc/impl/OracleReadableImpl.java
index d00dbef..4d4b56c 100755
--- a/src/main/java/oracle/r2dbc/impl/OracleReadableImpl.java
+++ b/src/main/java/oracle/r2dbc/impl/OracleReadableImpl.java
@@ -28,6 +28,7 @@
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.R2dbcType;
+import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import io.r2dbc.spi.Type;
@@ -37,6 +38,7 @@
import oracle.r2dbc.impl.ReadablesMetadata.RowMetadataImpl;
import java.nio.ByteBuffer;
+import java.sql.ResultSet;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.NoSuchElementException;
@@ -65,19 +67,27 @@ class OracleReadableImpl implements io.r2dbc.spi.Readable {
/** Metadata of the values of this {@code Readable}. */
private final ReadablesMetadata> readablesMetadata;
+ /**
+ * A collection of results that depend on the JDBC statement which created
+ * this readable to remain open until all results are consumed.
+ */
+ private final DependentCounter dependentCounter;
+
/**
*
* Constructs a new {@code Readable} that supplies values of a
* {@code jdbcReadable} and obtains metadata of the values from
* {@code resultMetadata}.
*
+ *
* @param jdbcReadable Readable values from a JDBC Driver. Not null.
* @param readablesMetadata Metadata of each value. Not null.
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
private OracleReadableImpl(
- JdbcReadable jdbcReadable, ReadablesMetadata> readablesMetadata,
- ReactiveJdbcAdapter adapter) {
+ DependentCounter dependentCounter, JdbcReadable jdbcReadable,
+ ReadablesMetadata> readablesMetadata, ReactiveJdbcAdapter adapter) {
+ this.dependentCounter = dependentCounter;
this.jdbcReadable = jdbcReadable;
this.readablesMetadata = readablesMetadata;
this.adapter = adapter;
@@ -96,9 +106,9 @@ private OracleReadableImpl(
* {@code metadata}. Not null.
*/
static Row createRow(
- JdbcReadable jdbcReadable, RowMetadataImpl metadata,
- ReactiveJdbcAdapter adapter) {
- return new RowImpl(jdbcReadable, metadata, adapter);
+ DependentCounter dependentCounter, JdbcReadable jdbcReadable,
+ RowMetadataImpl metadata, ReactiveJdbcAdapter adapter) {
+ return new RowImpl(dependentCounter, jdbcReadable, metadata, adapter);
}
/**
*
@@ -113,9 +123,10 @@ static Row createRow(
* {@code metadata}. Not null.
*/
static OutParameters createOutParameters(
- JdbcReadable jdbcReadable, OutParametersMetadataImpl metadata,
- ReactiveJdbcAdapter adapter) {
- return new OutParametersImpl(jdbcReadable, metadata, adapter);
+ DependentCounter dependentCounter, JdbcReadable jdbcReadable,
+ OutParametersMetadataImpl metadata, ReactiveJdbcAdapter adapter) {
+ return new OutParametersImpl(
+ dependentCounter, jdbcReadable, metadata, adapter);
}
/**
@@ -162,8 +173,8 @@ public T get(String name, Class type) {
/**
* Returns the 0-based index of the value identified by {@code name}. This
* method implements a case-insensitive name match. If more than one
- * value has a matching name, this method returns lowest index of all
- * matching values.
+ * value has a matching name, this method returns lowest of all indexes that
+ * match.
* @param name The name of a value. Not null.
* @return The index of the named value within this {@code Readable}
* @throws NoSuchElementException If no column has a matching name.
@@ -208,6 +219,9 @@ else if (io.r2dbc.spi.Clob.class.equals(type)) {
else if (LocalDateTime.class.equals(type)) {
value = getLocalDateTime(index);
}
+ else if (Result.class.equals(type)) {
+ value = getResult(index);
+ }
else if (Object.class.equals(type)) {
// Use the default type mapping if Object.class has been specified.
// This method is invoked recursively with the default mapping, so long
@@ -327,6 +341,36 @@ private LocalDateTime getLocalDateTime(int index) {
}
}
+ /**
+ *
+ * Converts the value of a column at the specified {@code index} to a
+ * {@code Result}. This method is intended for mapping REF CURSOR values,
+ * which JDBC will map to a {@link ResultSet}.
+ *
+ * A REF CURSOR is closed when the JDBC statement that created it is closed.
+ * To prevent the cursor from getting closed, the Result returned by this
+ * method is immediately added to the collection of results that depend on the
+ * JDBC statement.
+ *
+ * The Result returned by this method is received by user code, and user code
+ * MUST then fully consume it. The JDBC statement is not closed until the
+ * result is fully consumed.
+ *
+ * @param index 0 based column index
+ * @return A column value as a {@code Result}, or null if the column value is
+ * NULL.
+ */
+ private Result getResult(int index) {
+ ResultSet resultSet = jdbcReadable.getObject(index, ResultSet.class);
+
+ if (resultSet == null)
+ return null;
+
+ dependentCounter.increment();
+ return OracleResultImpl.createQueryResult(
+ dependentCounter, resultSet, adapter);
+ }
+
/**
* Checks if the specified zero-based {@code index} is a valid column index
* for this row. This method is used to verify index value parameters
@@ -368,10 +412,9 @@ private static final class RowImpl
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
private RowImpl(
- JdbcReadable jdbcReadable,
- RowMetadataImpl metadata,
- ReactiveJdbcAdapter adapter) {
- super(jdbcReadable, metadata, adapter);
+ DependentCounter dependentCounter, JdbcReadable jdbcReadable,
+ RowMetadataImpl metadata, ReactiveJdbcAdapter adapter) {
+ super(dependentCounter, jdbcReadable, metadata, adapter);
this.metadata = metadata;
}
@@ -410,10 +453,9 @@ private static final class OutParametersImpl
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
private OutParametersImpl(
- JdbcReadable jdbcReadable,
- OutParametersMetadataImpl metadata,
- ReactiveJdbcAdapter adapter) {
- super(jdbcReadable, metadata, adapter);
+ DependentCounter dependentCounter, JdbcReadable jdbcReadable,
+ OutParametersMetadataImpl metadata, ReactiveJdbcAdapter adapter) {
+ super(dependentCounter,jdbcReadable, metadata, adapter);
this.metadata = metadata;
}
diff --git a/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java b/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java
index ed6ed47..502971f 100644
--- a/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java
+++ b/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java
@@ -38,7 +38,6 @@
import java.sql.SQLWarning;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -55,124 +54,63 @@
*
* Abstract class providing a base implementation of the R2DBC SPI
* {@link Result} interface. Concrete subclasses implement
- * {@link #publishSegments(Function)} to return a {@link Publisher} that emits
- * the output of a {@link Segment} mapping function for each {@code Segment} of
- * the {@code Result}. Implementations of R2DBC SPI methods in the base
- * class invoke {@code publishSegments} with a mapping function that
+ * {@link #mapSegments(Class, Function)} to return a {@link Publisher} that
+ * emits the output of a {@link Segment} mapping function for each
+ * {@code Segment} of the {@code Result}. Implementations of R2DBC SPI methods
+ * in the base class invoke {@code mapSegments} with a mapping function that
* filters the emitted {@code Segment}s according to the specification of the
* SPI method.
*
*/
abstract class OracleResultImpl implements Result {
- /**
- * Object output by mapping functions provided to
- * {@link #publishSegments(Function)} for {@code Segment}s that do not
- * satisfy a filter. Downstream operators of
- * {@link #publishSegments(Function)} filter this object so that it is not
- * emitted to user code.
- */
- private static final Object FILTERED = new Object();
-
/**
* Indicates if a method call on this {@code Result} has already returned a
* {@code Publisher} that allows this {@code Result} to be consumed. In
- * conformance with the R2DBC SPI, multiple attempts to consume the this
- * result will yield an {@code IllegalStateException}.
+ * conformance with the R2DBC SPI, an {@code IllegalStateException} is thrown
+ * if multiple attempts are made to consume this result; A result may only be
+ * consumed once.
*/
private boolean isPublished = false;
- /**
- * Reference to a publisher that must be subscribed to after all segments of
- * this result have been consumed. The reference is updated to {@code null}
- * after the publisher has been subscribed to.
- */
- private final AtomicReference> onConsumed =
- new AtomicReference<>(Mono.empty());
-
/** Private constructor invoked by inner subclasses */
private OracleResultImpl() { }
- /**
- * Publishes the output of a {@code mappingFunction} for each {@code Segment}
- * of this {@code Result}.
- * @param mappingFunction {@code Segment} mapping function. Not null.
- * @param Output type of the {@code mappingFunction}
- * @return {@code Publisher} of values output by the {@code mappingFunction}
- */
- abstract Publisher publishSegments(
- Function mappingFunction);
-
/**
*
- * Publishes the output of a {@code mappingFunction} for each {@code Segment}
- * of this {@code Result}, where the {@code Segment} is an instance of the
- * specified {@code type}.
+ * Returns a publisher that emits the output of a segment mapping function for
+ * each segment of this result. The mapping function only accepts segments of
+ * a specified type. This method is called from methods of the public API to
+ * create publishers of different value types. For instance, the
+ * {@link #getRowsUpdated()} method creates a {@code Publisher} by
+ * calling this method with an {@code UpdateCount.class} segment type and a
+ * function that maps {@code UpdateCount} segments to a {@code Long}.
*
- * This method sets {@link #isPublished} to prevent multiple consumptions
- * of this {@code Result}. In case this is a {@link FilteredResult}, this
- * method must invoke {@link #publishSegments(Function)}, before returning,
- * in order to update {@code isPublished} of the {@link FilteredResult#result}
- * as well.
- *
- * When the returned publisher terminates with {@code onComplete},
- * {@code onError}, or {@code cancel}, the {@link #onConsumed} publisher is
- * subscribed to. The {@code onConsumed} reference is updated to {@code null}
- * so that post-consumption calls to {@link #onConsumed(Publisher)} can detect
- * that this result is already consumed.
- *
- * The returned {@code Publisher} emits {@code onError} with an
- * {@link R2dbcException} if this {@code Result} has a {@link Message} segment
- * and the {@code type} is not a super-type of {@code Message}. This
- * corresponds to the specified behavior of R2DBC SPI methods
- * {@link #map(BiFunction)}, {@link #map(BiFunction)}, and
- * {@link #getRowsUpdated()}
+ * Any segments, other than {@code Message} segments, that are not an instance
+ * of the {@code segmentType} should not be passed to the
+ * {@code segmentMapper}, and should not emitted by the returned publisher in
+ * any form. However, {@code Message} segments are an exception. The error of
+ * a {@code Message} segment must be emitted as an {@code onError} signal,
+ * even if the {@code segmentType} is not assignable to {@code Message}
+ * segments.
*
- * @param type {@code Segment} type to be mapped. Not null.
- * @param mappingFunction {@code Segment} mapping function. Not null.
- * @param {@code Segment} type to be mapped
- * @param Output type of the {@code mappingFunction}
- * @return {@code Publisher} of mapped {@code Segment}s
+ * @param segmentType Class of {@code Segment} to map. Not null.
+ * @param segmentMapper Maps segments to published values.
+ * @return A publisher that emits the mapped values.
+ * @param Segment type to map
+ * @param Type of mapped value
*/
- @SuppressWarnings("unchecked")
- private Publisher publishSegments(
- Class type, Function super T, U> mappingFunction) {
-
- setPublished();
-
- Mono whenConsumed = Mono.defer(() -> {
- Publisher consumedPublisher = onConsumed.getAndSet(null);
- return consumedPublisher == null
- ? Mono.empty()
- : Mono.from((Publisher)consumedPublisher);
- });
-
- return Flux.concatDelayError(
- Flux.from(publishSegments(segment -> {
- if (type.isInstance(segment))
- return mappingFunction.apply(type.cast(segment));
- else if (segment instanceof OracleR2dbcWarning)
- return (U)FILTERED;
- else if (segment instanceof Message)
- throw ((Message)segment).exception();
- else
- return (U)FILTERED;
- }))
- .filter(object -> object != FILTERED),
- whenConsumed)
- .doOnCancel(() ->
- Mono.from(whenConsumed).subscribe());
- }
+ protected abstract Publisher mapSegments(
+ Class segmentType, Function super T, U> segmentMapper);
/**
* {@inheritDoc}
*
- * Implements the R2DBC SPI method to return a {@code Publisher} emitting the
- * flat-mapped output of {@code Publisher}s output by a
- * {@code mappingFunction} for all {@code Segments} this {@code Result}.
- * {@code Publisher}s output by the {@code mappingFunction} are subscribed to
- * serially with the completion of the {@code Publisher} output for any
- * previous {@code Segment}.
+ * Implements the R2DBC SPI method to return a flat-mapping of publishers
+ * generated by a {@code mappingFunction}. Publishers output by the
+ * {@code mappingFunction} are subscribed to serially. Serial subscription is
+ * implemented by the {@code Flux.concat(Publisher)} factory called
+ * by this method.
*
* The returned {@code Publisher} does not support multiple
* {@code Subscriber}s
@@ -182,8 +120,9 @@ else if (segment instanceof Message)
public Publisher flatMap(
Function> mappingFunction) {
requireNonNull(mappingFunction, "mappingFunction is null");
+ setPublished();
return singleSubscriber(Flux.concat(
- publishSegments(Segment.class, mappingFunction)));
+ mapSegments(Segment.class, mappingFunction)));
}
/**
@@ -200,7 +139,8 @@ public Publisher flatMap(
*/
@Override
public Publisher getRowsUpdated() {
- return publishSegments(UpdateCount.class, UpdateCount::value);
+ setPublished();
+ return mapSegments(UpdateCount.class, UpdateCount::value);
}
/**
@@ -218,8 +158,9 @@ public Publisher getRowsUpdated() {
public Publisher map(
BiFunction mappingFunction) {
requireNonNull(mappingFunction, "mappingFunction is null");
- return singleSubscriber(publishSegments(RowSegment.class,
- rowSegment -> {
+ setPublished();
+ return singleSubscriber(mapSegments(
+ RowSegment.class, rowSegment -> {
Row row = rowSegment.row();
return mappingFunction.apply(row, row.getMetadata());
}));
@@ -240,8 +181,9 @@ public Publisher map(
public Publisher map(
Function super Readable, ? extends T> mappingFunction) {
requireNonNull(mappingFunction, "mappingFunction is null");
- return singleSubscriber(publishSegments(ReadableSegment.class,
- readableSegment ->
+ setPublished();
+ return singleSubscriber(mapSegments(
+ ReadableSegment.class, readableSegment ->
mappingFunction.apply(readableSegment.getReadable())));
}
@@ -250,44 +192,52 @@ public Publisher map(
*
* Implements the R2DBC SPI method to return a new instance of
* {@code OracleResultImpl} that implements
- * {@link OracleResultImpl#publishSegments(Function)} to call
- * {@link OracleResultImpl#publishSegments(Class, Function)} on this instance
- * of {@code OracleResultImpl}. The invocation of {@code publishSegments}
- * on this instance ensures that its consumption state is updated correctly.
- * The invocation of {@code publishSegments} is provided with a mapping
- * function that outputs the {@link #FILTERED} object for {@code Segment}s
- * rejected by the {@code filter}.
+ * {@link OracleResultImpl#mapSegments(Class, Function)} to filter segments of
+ * this result with the specified {@code filter} predicate.
*
*/
@Override
public OracleResultImpl filter(Predicate filter) {
requireNonNull(filter, "filter is null");
-
- if (isPublished)
- throw multipleConsumptionException();
-
return new FilteredResult(this, filter);
}
/**
*
- * Sets a publisher that is subscribed to when all segments of this result
- * have been consumed.
+ * Adds this result to a collection of results that depend on a JDBC
+ * statement. After this method is called, the JDBC statement must remain open
+ * until this result signals that it has been closed.
+ *
+ * This method must only be invoked when it is certain that this
+ * result will be received by user code. If user code never receives this
+ * result, then it can never consume it, and the JDBC statement is never
+ * closed. Otherwise, once this result reaches user code, that code is
+ * responsible for consuming it. The R2DBC specification requires results to
+ * be fully consumed; There is no other way for Oracle R2DBC to know when
+ * it is safe to close the JDBC statement.
+ *
+ * Depending on the type of this result, this method may or may not actually
+ * do anything. For instance, if this result is an update count, then it
+ * doesn't depend on a JDBC statement, and so it won't actually register
+ * itself as a dependent. This result registers itself only if it retains
+ * something like {@code ResultSet} which depends on the JDBC statement to
+ * remain open.
*
- * If this result has already been consumed, then the publisher is not
- * subscribed to.
+ * Additional results may be added to the collection after this method
+ * returns. In particular, a REF CURSOR is backed by a ResultSet, and that
+ * ResultSet will be closed when the JDBC statement is closed. At the time
+ * when this method is called, it is not known what the user defined mapping
+ * function will do. A check to see if REF CURSOR ResultSet was created can
+ * only after the user defined function has executed.
*
- * A subsequent call to this method overwrites the publisher that has been
- * set by the current call.
+ * This method is implemented by the OracleResultImpl super class to do
+ * nothing. Subclasses that depend on the JDBC statement override this method
+ * and add themselves to the collection of dependent results.
*
- * @param onConsumed Publisher to subscribe to when consumed. Not null.
- * @return true if this result has not already been consumed, and the
- * publisher will be subscribed to. Returns false if the publisher will not
- * be subscribed to because this result is already consumed.
*/
- final boolean onConsumed(Publisher onConsumed) {
- return null != this.onConsumed.getAndUpdate(
- current -> current == null ? null : onConsumed);
+ void addDependent() {
+ // Do nothing for non-dependent results. This method is overridden by the
+ // DependentResult subclass to add a dependent result.
}
/**
@@ -295,49 +245,50 @@ final boolean onConsumed(Publisher onConsumed) {
* {@code Result} to be consumed. This method enforces the {@link Result} SPI
* contract which does not allow the same result to be consumed more than
* once.
+ *
+ * This method MUST be called before returning a Publisher to user code from
+ * all public APIs of this class.
+ *
* @throws IllegalStateException If this result has already been consumed.
*/
protected void setPublished() {
- if (! isPublished)
- isPublished = true;
- else
- throw multipleConsumptionException();
- }
-
- /**
- * Returns an {@code IllegalStateException} to be thrown when user code
- * attempts to consume a {@code Result} more than once with invocations of
- * {@link #map(BiFunction)}, {@link #map(Function)},
- * {@link #flatMap(Function)}, or {@link #getRowsUpdated()}.
- * @return {@code IllegalStateException} indicating multiple consumptions
- */
- private static IllegalStateException multipleConsumptionException() {
- return new IllegalStateException(
- "A result can not be consumed more than once");
+ if (isPublished) {
+ throw new IllegalStateException(
+ "A result can not be consumed more than once");
+ }
+ isPublished = true;
}
/**
* Creates a {@code Result} that publishes a JDBC {@code resultSet} as
* {@link RowSegment}s
+ * @param dependentCounter Collection of results that depend on the JDBC
+ * statement which created the {@code ResultSet} to remain open until all
+ * results are consumed.
* @param resultSet {@code ResultSet} to publish. Not null.
* @param adapter Adapts JDBC calls into reactive streams. Not null.
* @return A {@code Result} for a ResultSet
*/
public static OracleResultImpl createQueryResult(
- ResultSet resultSet, ReactiveJdbcAdapter adapter) {
- return new ResultSetResult(resultSet, adapter);
+ DependentCounter dependentCounter, ResultSet resultSet,
+ ReactiveJdbcAdapter adapter) {
+ return new ResultSetResult(dependentCounter, resultSet, adapter);
}
/**
* Creates a {@code Result} that publishes {@code outParameters} as
* {@link OutSegment}s
+ * @param dependentCounter Collection of results that depend on the JDBC
+ * statement which created the {@code OutParameters} to remain open until all
+ * results are consumed.
* @param outParameters {@code OutParameters} to publish. Not null.
* @param adapter Adapts JDBC calls into reactive streams. Not null.
* @return A {@code Result} for {@code OutParameters}
*/
static OracleResultImpl createCallResult(
- OutParameters outParameters, ReactiveJdbcAdapter adapter) {
- return new CallResult(outParameters, adapter);
+ DependentCounter dependentCounter, OutParameters outParameters,
+ ReactiveJdbcAdapter adapter) {
+ return new CallResult(dependentCounter, outParameters, adapter);
}
/**
@@ -346,12 +297,17 @@ static OracleResultImpl createCallResult(
* {@code ResultSet} as {@link RowSegment}s
* @return A {@code Result} for values generated by DML
* @param updateCount Update count to publish
+ * @param dependentCounter Collection of results that depend on the JDBC
+ * statement which created the {@code generatedKeys} {@code ResultSet} to
+ * remain open until all results are consumed.
* @param generatedKeys Generated values to publish. Not null.
* @param adapter Adapts JDBC calls into reactive streams. Not null.
*/
static OracleResultImpl createGeneratedValuesResult(
- long updateCount, ResultSet generatedKeys, ReactiveJdbcAdapter adapter) {
- return new GeneratedKeysResult(updateCount, generatedKeys, adapter);
+ long updateCount, DependentCounter dependentCounter,
+ ResultSet generatedKeys, ReactiveJdbcAdapter adapter) {
+ return new GeneratedKeysResult(
+ updateCount, dependentCounter, generatedKeys, adapter);
}
/**
@@ -425,12 +381,28 @@ private UpdateCountResult(long updateCount) {
this.updateCount = updateCount;
}
+ /**
+ * {@inheritDoc}
+ *
+ * This method uses Mono's fromSupplier factory to defer segment mapping
+ * until the publisher is subscribed to. This ensures that segments are
+ * consumed in the correct order when the returned publisher is concatenated
+ * after another, as with
+ * {@link BatchUpdateErrorResult#mapSegments(Class, Function)}, for
+ * instance. Additionally, the factory handles any exception thrown by the
+ * segment mapper by translating it in to an onError signal.
+ *
+ */
@Override
- Publisher publishSegments(Function mappingFunction) {
- return updateCount >= 0
- ? Mono.just(new UpdateCountImpl(updateCount))
- .map(mappingFunction)
- : Mono.empty();
+ protected Publisher mapSegments(
+ Class segmentType, Function super T, U> segmentMapper) {
+
+ if (!segmentType.isAssignableFrom(UpdateCountImpl.class))
+ return Mono.empty();
+
+ return Mono.fromSupplier(() ->
+ segmentMapper.apply(segmentType.cast(
+ new UpdateCountImpl(updateCount))));
}
}
@@ -460,29 +432,36 @@ Publisher publishSegments(Function mappingFunction) {
* values from that method.
*
*/
- private static final class ResultSetResult extends OracleResultImpl {
+ private static final class ResultSetResult extends DependentResult {
private final ResultSet resultSet;
private final RowMetadataImpl metadata;
private final ReactiveJdbcAdapter adapter;
private ResultSetResult(
- ResultSet resultSet, ReactiveJdbcAdapter adapter) {
+ DependentCounter dependentCounter, ResultSet resultSet,
+ ReactiveJdbcAdapter adapter) {
+ super(dependentCounter);
this.resultSet = resultSet;
this.metadata = createRowMetadata(fromJdbc(resultSet::getMetaData));
this.adapter = adapter;
}
@Override
- Publisher publishSegments(Function mappingFunction) {
+ protected Publisher mapDependentSegments(
+ Class segmentType, Function super T, U> segmentMapper) {
- // Avoiding object allocating by reusing the same Row object
+ if (!segmentType.isAssignableFrom(RowSegmentImpl.class))
+ return Mono.empty();
+
+ // Avoiding object allocation by reusing the same Row object
ReusableJdbcReadable reusableJdbcReadable = new ReusableJdbcReadable();
- Row row = createRow(reusableJdbcReadable, metadata, adapter);
+ Row row =
+ createRow(dependentCounter, reusableJdbcReadable, metadata, adapter);
return adapter.publishRows(resultSet, jdbcReadable -> {
reusableJdbcReadable.current = jdbcReadable;
- return mappingFunction.apply(new RowSegmentImpl(row));
+ return segmentMapper.apply(segmentType.cast(new RowSegmentImpl(row)));
});
}
@@ -520,40 +499,57 @@ private static final class GeneratedKeysResult extends OracleResultImpl {
private final OracleResultImpl generatedKeysResult;
private GeneratedKeysResult(
- long updateCount, ResultSet generatedKeys, ReactiveJdbcAdapter adapter) {
+ long updateCount, DependentCounter dependentCounter,
+ ResultSet generatedKeys, ReactiveJdbcAdapter adapter) {
updateCountResult = createUpdateCountResult(updateCount);
- generatedKeysResult = createQueryResult(generatedKeys, adapter);
+ generatedKeysResult =
+ createQueryResult(dependentCounter, generatedKeys, adapter);
}
@Override
- Publisher publishSegments(Function mappingFunction) {
- return Flux.from(updateCountResult.publishSegments(mappingFunction))
- .concatWith(generatedKeysResult.publishSegments(mappingFunction));
+ void addDependent() {
+ generatedKeysResult.addDependent();
+ }
+
+ @Override
+ protected Publisher mapSegments(
+ Class segmentType, Function super T, U> segmentMapper) {
+ return Flux.concat(
+ updateCountResult.mapSegments(segmentType, segmentMapper),
+ generatedKeysResult.mapSegments(segmentType, segmentMapper));
}
}
/**
- * {@link OracleResultImpl} subclass that publishes an single instance of
+ * {@link OracleResultImpl} subclass that publishes a single instance of
* {@link OutParameters} as an {@link OutSegment}.
*/
- private static final class CallResult extends OracleResultImpl {
+ private static final class CallResult extends DependentResult {
private final OutParameters outParameters;
private final ReactiveJdbcAdapter adapter;
private CallResult(
- OutParameters outParameters, ReactiveJdbcAdapter adapter) {
+ DependentCounter dependentCounter, OutParameters outParameters,
+ ReactiveJdbcAdapter adapter) {
+ super(dependentCounter);
this.outParameters = outParameters;
this.adapter = adapter;
}
@Override
- Publisher publishSegments(Function mappingFunction) {
+ protected Publisher mapDependentSegments(
+ Class segmentType, Function super T, U> segmentMapper) {
+
+ if (!segmentType.isAssignableFrom(OutSegmentImpl.class))
+ return Mono.empty();
+
// Acquire the JDBC lock asynchronously as the outParameters are backed
// by a JDBC CallableStatement, and it may block a thread when values
// are accessed with CallableStatement.getObject(...)
return adapter.getLock().get(() ->
- mappingFunction.apply(new OutSegmentImpl(outParameters)));
+ segmentMapper.apply(segmentType.cast(
+ new OutSegmentImpl(outParameters))));
}
}
@@ -570,10 +566,17 @@ private BatchUpdateResult(long[] updateCounts) {
}
@Override
- Publisher publishSegments(Function mappingFunction) {
- return Flux.fromStream(LongStream.of(updateCounts)
- .mapToObj(UpdateCountImpl::new))
- .map(mappingFunction);
+ protected Publisher mapSegments(
+ Class segmentType, Function super T, U> segmentMapper) {
+
+ if (!segmentType.isAssignableFrom(UpdateCountImpl.class))
+ return Mono.empty();
+
+ return Flux.fromStream(
+ LongStream.of(updateCounts)
+ .mapToObj(updateCount ->
+ segmentMapper.apply(segmentType.cast(
+ new UpdateCountImpl(updateCount)))));
}
}
@@ -589,8 +592,7 @@ private static final class BatchUpdateErrorResult extends OracleResultImpl {
private final BatchUpdateResult batchUpdateResult;
private final ErrorResult errorResult;
- private BatchUpdateErrorResult(
- BatchUpdateException batchUpdateException) {
+ private BatchUpdateErrorResult(BatchUpdateException batchUpdateException) {
batchUpdateResult = new BatchUpdateResult(
batchUpdateException.getLargeUpdateCounts());
errorResult =
@@ -598,11 +600,13 @@ private BatchUpdateErrorResult(
}
@Override
- Publisher publishSegments(Function mappingFunction) {
+ protected Publisher mapSegments(
+ Class segmentType, Function super T, U> segmentMapper) {
return Flux.concat(
- batchUpdateResult.publishSegments(mappingFunction),
- errorResult.publishSegments(mappingFunction));
+ batchUpdateResult.mapSegments(segmentType, segmentMapper),
+ errorResult.mapSegments(segmentType, segmentMapper));
}
+
}
/**
@@ -617,10 +621,38 @@ private ErrorResult(R2dbcException r2dbcException) {
this.r2dbcException = r2dbcException;
}
+ /**
+ * {@inheritDoc}
+ *
+ * Emits the mapping of a message segment, or emits an error if another
+ * segment type is specified. Unlike other segment types, message segments
+ * represent an error that must be delivered to user code. Even when user
+ * code is calling for some other segment type, like rows with
+ * {@link #map(BiFunction)}, or update counts with
+ * {@link #getRowsUpdated()}, user code does not want these calls to ignore
+ * error. If user code really does want to ignore errors, it may call
+ * {@link #filter(Predicate)} to ignore message segments, or
+ * {@link #flatMap(Function)} to recover from message segments.
+ *
+ * This method uses Mono's fromSupplier factory to defer segment mapping
+ * until the publisher is subscribed to. This ensures that segments are
+ * consumed in the correct order when the returned publisher is concatenated
+ * after another, as with
+ * {@link BatchUpdateErrorResult#mapSegments(Class, Function)}, for
+ * instance. Additionally, the factory handles any exception thrown by the
+ * segment mapper by translating it in to an onError signal.
+ *
+ */
@Override
- Publisher publishSegments(Function mappingFunction) {
- return Mono.just(new MessageImpl(r2dbcException))
- .map(mappingFunction);
+ protected Publisher mapSegments(
+ Class segmentType, Function super T, U> segmentMapper) {
+
+ if (! segmentType.isAssignableFrom(MessageImpl.class))
+ return Mono.error(r2dbcException);
+
+ return Mono.fromSupplier(() ->
+ segmentMapper.apply(segmentType.cast(
+ new MessageImpl(r2dbcException))));
}
}
@@ -655,20 +687,37 @@ private WarningResult(
}
@Override
- Publisher publishSegments(Function mappingFunction) {
- return Flux.fromStream(Stream.iterate(
- warning, Objects::nonNull, SQLWarning::getNextWarning)
- .map(nextWarning ->
- // It is noted that SQL can not be extracted from Oracle JDBC's
- // SQLWarning objects, so it must be explicitly provided here.
- OracleR2dbcExceptions.toR2dbcException(warning, sql))
- .map(WarningImpl::new))
- .map(mappingFunction)
- // Invoke publishSegments(Class, Function) rather than
- // publishSegments(Function) to update the state of the result; Namely,
- // the state that has the onConsumed Publisher emit a terminal signal.
- .concatWith(result != null
- ? result.publishSegments(Segment.class,mappingFunction)
+ void addDependent() {
+ result.addDependent();
+ }
+
+ /**
+ * @implNote In the 1.0.0 release, message segments for the warning were
+ * emitted prior to any segments from the {@link #result}. Unless message
+ * segments were consumed by {@link #flatMap(Function)}, the publisher
+ * returned to user code would emit onError before emitting values from
+ * the {@link #result}.
+ * Revisiting this decision before the 1.1.0 release, it really seems like a
+ * bad one. It is thought that user code would typically want to consume
+ * results before handling warnings and errors, and so the order is reversed
+ * in later releases. Segments are now emitted from the {@link #result}
+ * first, followed by the message segments. This change in behavior should
+ * be safe, as the R2DBC SPI does not specify any ordering for this case.
+ */
+ @Override
+ protected Publisher mapSegments(
+ Class segmentType, Function super T, U> segmentMapper) {
+ return Flux.concat(
+ result != null
+ ? result.mapSegments(segmentType, segmentMapper)
+ : Mono.empty(),
+ segmentType.isAssignableFrom(WarningImpl.class)
+ ? Flux.fromStream(Stream.iterate(
+ warning, Objects::nonNull, SQLWarning::getNextWarning)
+ .map(nextWarning ->
+ segmentMapper.apply(segmentType.cast(
+ new WarningImpl(toR2dbcException(nextWarning, sql))))))
+ // Do not emit warnings unless flatMap or filter will consume them
: Mono.empty());
}
}
@@ -679,6 +728,13 @@ Publisher publishSegments(Function mappingFunction) {
*/
private static final class FilteredResult extends OracleResultImpl {
+ /**
+ * An object that represents a filtered {@code Segment}. This object is
+ * output by a segment mapping function defined in
+ * {@link #mapSegments(Class, Function)}.
+ */
+ private static final Object FILTERED = new Object();
+
/** Result of segments to publish after applying the {@link #filter} */
private final OracleResultImpl result;
@@ -687,21 +743,72 @@ private static final class FilteredResult extends OracleResultImpl {
/**
* Constructs a new result that applies a {@code filter} when publishing
- * segments of a {@code result}
+ * segments of a {@code result}.
*/
- private FilteredResult(
- OracleResultImpl result, Predicate filter) {
+ private FilteredResult(OracleResultImpl result, Predicate filter) {
this.result = result;
this.filter = filter;
}
@Override
- @SuppressWarnings("unchecked")
- Publisher publishSegments(Function mappingFunction) {
- return result.publishSegments(Segment.class, segment ->
- filter.test(segment)
- ? mappingFunction.apply(segment)
- : (T)FILTERED);
+ void addDependent() {
+ result.addDependent();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Passes {@code Segment.class} to the {@code mapSegments} method of the
+ * filtered {@link #result} to map all segments with the filtering
+ * predicate. Mapping functions must return a non-null value, so it will
+ * return a dummy object, {@link #FILTERED}, for segments that are filtered
+ * by the predicate. The {@code FILTERED} object is then filtered by a
+ * downstream filter operator.
+ *
+ * It is important that {@code Segment.class} be passed to the
+ * {@code mapSegments} method of the filtered result, otherwise
+ * {@code Message} segments would be emitted with {@code onError} and bypass
+ * the filtering function. If the filter does not exclude a message segment,
+ * and the {@code segmentMapper} does not accept message segments, only then
+ * will the exception of the message segment be emitted with onError.
+ *
+ */
+ @Override
+ protected Publisher mapSegments(
+ Class segmentType, Function super T, U> segmentMapper) {
+
+ @SuppressWarnings("unchecked")
+ U filtered = (U)FILTERED;
+
+ return Flux.from(result.mapSegments(
+ Segment.class, segment -> {
+ if (!filter.test(segment))
+ return filtered;
+
+ if (segmentType.isAssignableFrom(segment.getClass()))
+ return segmentMapper.apply(segmentType.cast(segment));
+ else if (segment instanceof Message)
+ throw ((Message)segment).exception();
+ else
+ return filtered;
+ }))
+ .filter(next -> next != FILTERED);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Overridden to also set the filtered result as published. If this method
+ * is called, then a method of the public API has been called to return
+ * a publisher from this result. If user code somehow has a reference to the
+ * filtered result as well, then the filtered result should also throw
+ * {@code IllegalStateException} if one of its public methods are invoked.
+ *
+ */
+ @Override
+ protected void setPublished() {
+ result.setPublished();
+ super.setPublished();
}
}
@@ -766,6 +873,84 @@ public Readable getReadable() {
}
}
+
+ /**
+ * A base class for results that depend on a JDBC statement to remain open
+ * until the result is consumed. This base class handles interactions with
+ * a {@link DependentCounter} object representing a collection of results
+ * that depend on a JDBC statement. Subclasses implement
+ * {@link #mapDependentSegments(Class, Function)} following the same
+ * specification as {@link #mapSegments(Class, Function)}.
+ */
+ private static abstract class DependentResult extends OracleResultImpl {
+
+ /**
+ * A collection of results that depend on the JDBC statement which created
+ * this result to remain open until all results are consumed.
+ */
+ protected final DependentCounter dependentCounter;
+
+ /**
+ * Constructs a new result that registers and deregisters itself with a
+ * collection of dependent results.
+ * @param dependentCounter Collection of dependent results. Not null.
+ */
+ private DependentResult(DependentCounter dependentCounter) {
+ this.dependentCounter = dependentCounter;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Adds this result to the collection of dependent results.
+ *
+ */
+ @Override
+ void addDependent() {
+ dependentCounter.increment();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Delegates to the {@code mapDependentSegments(Class, Function)} method of
+ * a subclass to perform actual segment mapping. This method ensures that
+ * this result is removed from the collection of dependent results when the
+ * segment mapping publisher terminates with {@code onComplete},
+ * {@code onError}, or {@code cancel}.
+ *
+ */
+ @Override
+ protected final Publisher mapSegments(
+ Class segmentType, Function super T, U> segmentMapper) {
+
+ @SuppressWarnings("unchecked")
+ Publisher removeDependent = (Publisher) dependentCounter.decrement();
+
+ return Flux.concatDelayError(
+ mapDependentSegments(segmentType, segmentMapper),
+ removeDependent)
+ .doOnCancel(() ->
+ Mono.from(removeDependent).subscribe());
+ }
+
+ /**
+ * Maps segments exactly as specified by
+ * {@link #mapSegments(Class, Function)}. This method is called from the
+ * base class implementation of {@link #mapSegments(Class, Function)}. The
+ * base class implementation ensures that this result is removed from the
+ * collection of dependents when the segment mapping publisher is
+ * terminated.
+ * @param segmentType Class of {@code Segment} to map. Not null.
+ * @param segmentMapper Maps segments to published values.
+ * @return A publisher that emits the mapped values.
+ * @param Segment type to map
+ * @param Type of mapped value
+ */
+ protected abstract Publisher mapDependentSegments(
+ Class segmentType, Function super T, U> segmentMapper);
+ }
+
/**
* Implementation of {@link UpdateCount}.
*/
diff --git a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java
index 8b76052..2555ae1 100755
--- a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java
+++ b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java
@@ -45,7 +45,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
@@ -53,6 +52,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.IntStream;
@@ -295,7 +295,7 @@ public Statement bind(int index, Object value) {
*
* @throws IllegalArgumentException {@inheritDoc}
* @throws IllegalArgumentException If the {@code identifier} does match a
- * case sensitive parameter name that appears in this {@code Statement's}
+ * case-sensitive parameter name that appears in this {@code Statement's}
* SQL command.
* @throws IllegalArgumentException If the JDBC PreparedStatement does not
* support conversions of the bind value's Java type into a SQL type.
@@ -360,7 +360,7 @@ public Statement bindNull(int index, Class> type) {
*
* @throws IllegalArgumentException {@inheritDoc}
* @throws IllegalArgumentException If the {@code identifier} does match a
- * case sensitive parameter name that appears in this {@code Statement's}
+ * case-sensitive parameter name that appears in this {@code Statement's}
* SQL command.
*/
@Override
@@ -736,10 +736,8 @@ private void bindParameter(int index, Parameter parameter) {
requireNonNull(parameter.getType(), "Parameter type is null");
SQLType jdbcType = toJdbcType(r2dbcType);
- if (jdbcType == null) {
- throw new IllegalArgumentException(
- "Unsupported SQL type: " + r2dbcType);
- }
+ if (jdbcType == null)
+ throw new IllegalArgumentException("Unsupported SQL type: " + r2dbcType);
requireSupportedJavaType(parameter.getValue());
bindValues[index] = parameter;
@@ -749,7 +747,7 @@ private void bindParameter(int index, Parameter parameter) {
* Checks that the specified 0-based {@code index} is within the range of
* valid parameter indexes for this statement.
* @param index A 0-based parameter index
- * @throws IndexOutOfBoundsException If the {@code index} is outside of the
+ * @throws IndexOutOfBoundsException If the {@code index} is not within the
* valid range.
*/
private void requireValidIndex(int index) {
@@ -840,8 +838,7 @@ private static IllegalStateException parameterNotSet() {
}
/**
- * Checks that the class type of an {@code object} is supported as a bind
- * value.
+ * Checks that the class of an {@code object} is supported as a bind value.
* @param object Object to check. May be null.
* @throws IllegalArgumentException If the class type of {@code object} is not
* supported
@@ -910,6 +907,14 @@ private class JdbcStatement {
/** The {@code PreparedStatement} that is executed */
protected final PreparedStatement preparedStatement;
+ /**
+ * Collection of results that depend on the JDBC statement to remain open
+ * until they are consumed. For instance, a result that retains a
+ * {@code ResultSet} would depend on the JDBC statement to remain open, as
+ * the {@code ResultSet} is closed when the JDBC statement is closed.
+ */
+ protected final DependentCounter dependentCounter;
+
/** The bind values that are set on the {@link #preparedStatement} */
protected final Object[] binds;
@@ -928,6 +933,15 @@ private class JdbcStatement {
private JdbcStatement(PreparedStatement preparedStatement, Object[] binds) {
this.preparedStatement = preparedStatement;
this.binds = binds;
+
+ // Add this statement as a "party" (think j.u.c.Phaser) to the dependent
+ // results by calling increment(). After the Result publisher returned by
+ // execute() terminates, this statement "arrives" by calling decrement().
+ // Calling decrement() after the Result publisher terminates ensures that
+ // the JDBC statement can not be closed until all results have had a
+ // chance to be emitted to user code.
+ dependentCounter = new DependentCounter(closeStatement());
+ dependentCounter.increment();
}
/**
@@ -945,21 +959,32 @@ private JdbcStatement(PreparedStatement preparedStatement, Object[] binds) {
* emitted from the {@code bind} or {@code getResults} publishers, or if
* {@link PreparedStatement#getWarnings()} yields a warning.
*
- * After all {@code Results} have been consumed, the
- * {@link #preparedStatement} is closed.
+ * The {@link #preparedStatement} can only be closed after all results that
+ * depend on it have been consumed by user code. It is not guaranteed that
+ * every result created by this statement will actually reach user code; A
+ * cancel signal may occur at any time. Upon cancellation, no more signals
+ * are emitted downstream. For this reason, the
+ * {@link OracleResultImpl#addDependent()} method must be called only when
+ * it is certain that a result will reach the downstream subscriber. This
+ * certainty is offered by the {@link Flux#doOnNext(Consumer)} operator.
*
- * @return A publisher that emits the result of executing this statement
+ * @return A publisher that emits the result of executing this statement.
+ * Not null.
*/
final Publisher execute() {
- return Flux.usingWhen(Mono.just(new ArrayList<>(1)),
- results ->
- Mono.from(bind())
- .thenMany(executeJdbc())
- .map(this::getWarnings)
- .doOnNext(results::add)
- .onErrorResume(R2dbcException.class, r2dbcException ->
- Mono.just(createErrorResult(r2dbcException))),
- this::deallocate);
+
+ Mono deallocate =
+ Mono.from(deallocate()).cast(OracleResultImpl.class);
+
+ return Flux.concatDelayError(
+ Mono.from(bind())
+ .thenMany(executeJdbc())
+ .map(this::getWarnings)
+ .onErrorResume(R2dbcException.class, r2dbcException ->
+ Mono.just(createErrorResult(r2dbcException)))
+ .doOnNext(OracleResultImpl::addDependent),
+ deallocate)
+ .doOnCancel(deallocate::subscribe);
}
/**
@@ -1104,7 +1129,7 @@ private OracleResultImpl getCurrentResult(boolean isResultSet) {
return fromJdbc(() -> {
if (isResultSet) {
return createQueryResult(
- preparedStatement.getResultSet(), adapter);
+ dependentCounter, preparedStatement.getResultSet(), adapter);
}
else {
long updateCount = preparedStatement.getLargeUpdateCount();
@@ -1143,71 +1168,43 @@ private OracleResultImpl getWarnings(OracleResultImpl result) {
* made to deallocate any remaining resources before emitting the error.
*
* The returned publisher subscribes to the {@link #deallocators}
- * publisher, and may close the {@link #preparedStatement} if all {@code
- * results} have been consumed when this method is called.
- *
- * If one or more {@code results} have yet to be consumed, then this method
- * arranges for the {@link #preparedStatement} to be closed after all
- * results have been consumed. A result may be backed by a
- * {@link java.sql.ResultSet} or by {@link CallableStatement}, so the
- * {@link #preparedStatement} must remain open until all results have
- * been consumed.
+ * publisher, and may close the {@link #preparedStatement} if all results
+ * have already been consumed when this method is called. This method
+ * calls the {@code decrement()} method of {@link #dependentCounter}, in
+ * balance with the {@code increment()} call that occur in the constructor
+ * of this statement.
*
- * @param results Results that must be consumed before closing the
- * {@link #preparedStatement}
* @return A publisher that completes when all resources have been
* deallocated
*/
- private Publisher deallocate(Collection results) {
-
- // Set up a counter that is decremented as each result is consumed.
- AtomicInteger unconsumed = new AtomicInteger(results.size());
-
- // Set up a publisher that decrements the counter, and closes the
- // statement when it reaches zero
- Publisher closeStatement = adapter.getLock().run(() -> {
- if (unconsumed.decrementAndGet() == 0)
- closeStatement();
- });
-
- // Tell each unconsumed result to decrement the unconsumed count, and then
- // close the statement when the count reaches zero.
- for (OracleResultImpl result : results) {
- if (!result.onConsumed(closeStatement))
- unconsumed.decrementAndGet();
- }
-
- // If there are no results, or all results have already been consumed,
- // then the returned publisher closes the statement.
- if (unconsumed.get() == 0)
- addDeallocation(adapter.getLock().run(this::closeStatement));
-
+ private Publisher deallocate() {
+ addDeallocation(dependentCounter.decrement());
return deallocators;
}
/**
- * Closes the JDBC {@link #preparedStatement}. This method should only be
- * called while holding the
- * {@linkplain ReactiveJdbcAdapter#getLock() connection lock}
- * @throws SQLException If the statement fails to close.
+ * @return A publisher that closes the JDBC {@link #preparedStatement} when
+ * subscribed to. Not null.
*/
- private void closeStatement() throws SQLException {
- try {
- // Workaround Oracle JDBC bug #34545179: ResultSet references are
- // retained even when the statement is closed. Calling getMoreResults
- // with the CLOSE_ALL_RESULTS argument forces the driver to
- // de-reference them.
- preparedStatement.getMoreResults(CLOSE_ALL_RESULTS);
- }
- catch (SQLException sqlException) {
- // It may be the case that the JDBC connection was closed, and so the
- // statement was closed with it. Check for this, and ignore the
- // SQLException if so.
- if (!jdbcConnection.isClosed())
- throw sqlException;
- }
+ private Publisher closeStatement() {
+ return adapter.getLock().run(() -> {
+ try {
+ // Workaround Oracle JDBC bug #34545179: ResultSet references are
+ // retained even when the statement is closed. Calling getMoreResults
+ // with the CLOSE_ALL_RESULTS argument forces the driver to
+ // de-reference them.
+ preparedStatement.getMoreResults(CLOSE_ALL_RESULTS);
+ }
+ catch (SQLException sqlException) {
+ // It may be the case that the JDBC connection was closed, and so the
+ // statement was closed with it. Check for this, and ignore the
+ // SQLException if so.
+ if (!jdbcConnection.isClosed())
+ throw sqlException;
+ }
- preparedStatement.close();
+ preparedStatement.close();
+ });
}
/**
@@ -1323,7 +1320,7 @@ private Publisher convertClobBind(
/**
* Converts a ByteBuffer to a byte array. The {@code byteBuffer} contents,
- * delimited by it's position and limit, are copied into the returned byte
+ * delimited by its position and limit, are copied into the returned byte
* array. No state of the {@code byteBuffer} is mutated, including it's
* position, limit, or mark.
* @param byteBuffer A ByteBuffer. Not null. Not retained.
@@ -1420,7 +1417,10 @@ protected Publisher executeJdbc() {
return Flux.concat(
super.executeJdbc(),
Mono.just(createCallResult(
- createOutParameters(new JdbcOutParameters(), metadata, adapter),
+ dependentCounter,
+ createOutParameters(
+ dependentCounter,
+ new JdbcOutParameters(), metadata, adapter),
adapter)));
}
@@ -1486,14 +1486,13 @@ private JdbcBatch(
*/
@Override
protected Publisher bind() {
- @SuppressWarnings({"unchecked"})
- Publisher[] bindPublishers = new Publisher[batchSize];
+ Publisher>[] bindPublishers = new Publisher[batchSize];
for (int i = 0; i < batchSize; i++) {
bindPublishers[i] = Flux.concat(
bind(batch.remove()),
adapter.getLock().run(preparedStatement::addBatch));
}
- return Flux.concat(bindPublishers);
+ return Flux.concat(bindPublishers).cast(Void.class);
}
/**
@@ -1605,8 +1604,8 @@ protected Publisher executeJdbc() {
if (generatedKeys.isBeforeFirst()) {
return Mono.just(createGeneratedValuesResult(
- preparedStatement.getLargeUpdateCount(), generatedKeys,
- adapter))
+ preparedStatement.getLargeUpdateCount(),
+ dependentCounter, generatedKeys, adapter))
.concatWith(super.getResults(
preparedStatement.getMoreResults(KEEP_CURRENT_RESULT)));
}
diff --git a/src/main/java/oracle/r2dbc/impl/SqlTypeMap.java b/src/main/java/oracle/r2dbc/impl/SqlTypeMap.java
index 231c1fd..b2ad059 100644
--- a/src/main/java/oracle/r2dbc/impl/SqlTypeMap.java
+++ b/src/main/java/oracle/r2dbc/impl/SqlTypeMap.java
@@ -98,7 +98,8 @@ final class SqlTypeMap {
R2dbcType.TIMESTAMP_WITH_TIME_ZONE),
entry(JDBCType.TINYINT, R2dbcType.TINYINT),
entry(JDBCType.VARBINARY, R2dbcType.VARBINARY),
- entry(JDBCType.VARCHAR, R2dbcType.VARCHAR)
+ entry(JDBCType.VARCHAR, R2dbcType.VARCHAR),
+ entry(JDBCType.REF_CURSOR, OracleR2dbcTypes.REF_CURSOR)
);
/**
diff --git a/src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java b/src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java
index 4be45b1..a8c1d11 100644
--- a/src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java
+++ b/src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java
@@ -47,11 +47,10 @@
import static oracle.r2dbc.util.Awaits.tryAwaitExecution;
import static oracle.r2dbc.util.Awaits.tryAwaitNone;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Verifies the Oracle R2DBC Driver implements behavior related to {@link Blob}
- * and {@link Clob} types that is specified in it's class and method level
+ * and {@link Clob} types that is specified in its class and method level
* javadocs, in the javadocs of {@code Blob} and {@code Clob}, and in Section
* 12 of the R2DBC 0.8.2 Specification.
*/
@@ -353,7 +352,7 @@ public void testClobBatchInsert() {
// Expect row.get(int/String) to use Clob as the default Java type
// mapping for CLOB type columns.
- List> Clobs = awaitMany(Flux.from(connection.createStatement(
+ List> clobs = awaitMany(Flux.from(connection.createStatement(
"SELECT x,y FROM testClobInsert ORDER BY id")
.execute())
.flatMap(result -> result.map((row, metadata) ->
@@ -361,10 +360,10 @@ public void testClobBatchInsert() {
// Expect bytes written to INSERTed Clobs to match the bytes read from
// SELECTed Clobs
- awaitBytes(xBytes0, Clobs.get(0).get(0));
- awaitBytes(yBytes0, Clobs.get(0).get(1));
- awaitBytes(xBytes1, Clobs.get(1).get(0));
- awaitBytes(yBytes1, Clobs.get(1).get(1));
+ awaitBytes(xBytes0, clobs.get(0).get(0));
+ awaitBytes(yBytes0, clobs.get(0).get(1));
+ awaitBytes(xBytes1, clobs.get(1).get(0));
+ awaitBytes(yBytes1, clobs.get(1).get(1));
}
finally {
diff --git a/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java
index e5727cf..1498825 100644
--- a/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java
+++ b/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java
@@ -650,9 +650,16 @@ public void testOracleR2dbcWarning() {
awaitMany(Flux.from(warningStatement.execute())
.flatMap(result -> result.flatMap(Mono::just)));
- // Expect the warning segment first. Expect it to have the fixed message
- // and error number used by Oracle JDBC for all warnings.
- Result.Segment secondSegment = segments.get(0);
+ // Expect the update count segment first. Warnings are always emitted
+ // last.
+ Result.Segment firstSegment = segments.get(0);
+ assertEquals(0,
+ assertInstanceOf(UpdateCount.class, firstSegment).value());
+ assertFalse(firstSegment instanceof OracleR2dbcWarning);
+
+ // Expect the warning segment after the update count. Expect it to have
+ // the fixed message and error number used by Oracle JDBC for all warnings
+ Result.Segment secondSegment = segments.get(1);
OracleR2dbcWarning warning =
assertInstanceOf(OracleR2dbcWarning.class, secondSegment);
assertEquals(
@@ -666,14 +673,6 @@ public void testOracleR2dbcWarning() {
assertEquals(warning.sqlState(), exception.getSqlState());
assertEquals(sql, exception.getSql());
-
- // Expect the update count segment last. Warnings are always emitted
- // first.
- Result.Segment firstSegment = segments.get(1);
- assertEquals(0,
- assertInstanceOf(UpdateCount.class, firstSegment).value());
- assertFalse(firstSegment instanceof OracleR2dbcWarning);
-
// Verify that there are not any more segments
assertEquals(2, segments.size());
}
@@ -727,19 +726,25 @@ public void testOracleR2dbcWarningIgnored() {
public void testOracleR2dbcWarningNotIgnored() {
Connection connection = awaitOne(sharedConnection());
try {
-
// Expect a warning for forcing a view that references a non-existent
// table
String sql =
"CREATE OR REPLACE FORCE VIEW testOracleR2dbcWarningIgnored AS" +
" SELECT x FROM thisdoesnotexist";
Statement warningStatement = connection.createStatement(sql);
+ AtomicInteger segmentIndex = new AtomicInteger(0);
awaitError(
R2dbcException.class,
awaitOne(warningStatement.execute())
.flatMap(segment ->
- Mono.error(
- assertInstanceOf(OracleR2dbcWarning.class, segment).exception())));
+ // Expect the update count first, followed by the warning
+ segmentIndex.getAndIncrement() == 0
+ ? Mono.just(assertInstanceOf(UpdateCount.class, segment)
+ .value())
+ : Mono.error(assertInstanceOf(OracleR2dbcWarning.class, segment)
+ .exception())));
+ // Expect only two segments
+ assertEquals(2, segmentIndex.get());
}
finally {
tryAwaitExecution(
diff --git a/src/test/java/oracle/r2dbc/impl/OracleRowMetadataImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleRowMetadataImplTest.java
index 06211a4..8709fc6 100644
--- a/src/test/java/oracle/r2dbc/impl/OracleRowMetadataImplTest.java
+++ b/src/test/java/oracle/r2dbc/impl/OracleRowMetadataImplTest.java
@@ -23,19 +23,19 @@
import io.r2dbc.spi.ColumnMetadata;
import io.r2dbc.spi.Connection;
+import io.r2dbc.spi.Result;
import io.r2dbc.spi.RowMetadata;
+import io.r2dbc.spi.Statement;
import org.junit.jupiter.api.Test;
+import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
-import java.util.function.IntFunction;
+import java.util.function.Function;
import static java.util.Arrays.asList;
import static oracle.r2dbc.test.DatabaseConfig.connectTimeout;
@@ -45,7 +45,6 @@
import static oracle.r2dbc.util.Awaits.awaitNone;
import static oracle.r2dbc.util.Awaits.awaitOne;
import static oracle.r2dbc.util.Awaits.awaitUpdate;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -53,7 +52,7 @@
/**
* Verifies that
- * {@link ReadablesMetadata} implements behavior that is specified in it's
+ * {@link ReadablesMetadata} implements behavior that is specified in its
* class and method level javadocs.
*/
public class OracleRowMetadataImplTest {
@@ -80,23 +79,21 @@ public void testGetColumnMetadataByIndex() {
"INSERT INTO testGetColumnMetadataByIndex (x,y) VALUES (0,0)"));
// Expect IllegalArgumentException for an index less than 0
- awaitError(IndexOutOfBoundsException.class,
- Flux.from(connection.createStatement(
- "SELECT x, y FROM testGetColumnMetadataByIndex")
- .execute())
- .concatMap(result ->
- result.map((row, metadata) ->
- metadata.getColumnMetadata(-1).getPrecision())));
+ verifyError(IndexOutOfBoundsException.class,
+ connection.createStatement(
+ "SELECT x, y FROM testGetColumnMetadataByIndex"),
+ result ->
+ result.map((row, metadata) ->
+ metadata.getColumnMetadata(-1).getPrecision()));
// Expect IllegalArgumentException for an index greater than or equal
// to the number of columns
- awaitError(IndexOutOfBoundsException.class,
- Flux.from(connection.createStatement(
- "SELECT x, y FROM testGetColumnMetadataByIndex")
- .execute())
- .concatMap(result ->
- result.map((row, metadata) ->
- metadata.getColumnMetadata(2).getPrecision())));
+ verifyError(IndexOutOfBoundsException.class,
+ connection.createStatement(
+ "SELECT x, y FROM testGetColumnMetadataByIndex"),
+ result ->
+ result.map((row, metadata) ->
+ metadata.getColumnMetadata(2).getPrecision()));
// Expect valid indexes to return the column metadata
awaitOne(asList(xPrecision, yPrecision),
@@ -161,57 +158,43 @@ public void testGetColumnMetadataByName() {
"INSERT INTO testGetColumnMetadataByName (x,y) VALUES (0,0)"));
// Expect IllegalArgumentException for a null name
- awaitError(IllegalArgumentException.class,
- Flux.from(connection.createStatement(
- "SELECT x, y FROM testGetColumnMetadataByName")
- .execute())
- .concatMap(result ->
- result.map((row, metadata) -> metadata.getColumnMetadata(null))
- ));
+ verifyError(IllegalArgumentException.class,
+ connection.createStatement(
+ "SELECT x, y FROM testGetColumnMetadataByName"),
+ result ->
+ result.map((row, metadata) -> metadata.getColumnMetadata(null)));
// Expect NoSuchElementException for unmatched names
- awaitError(NoSuchElementException.class,
- Flux.from(connection.createStatement(
- "SELECT x, y FROM testGetColumnMetadataByName")
- .execute())
- .concatMap(result ->
- result.map((row, metadata) -> metadata.getColumnMetadata("z"))
- ));
- awaitError(NoSuchElementException.class,
- Flux.from(connection.createStatement(
- "SELECT x, y FROM testGetColumnMetadataByName")
- .execute())
- .concatMap(result ->
- result.map((row, metadata) -> metadata.getColumnMetadata("xx"))
- ));
- awaitError(NoSuchElementException.class,
- Flux.from(connection.createStatement(
- "SELECT x, y FROM testGetColumnMetadataByName")
- .execute())
- .concatMap(result ->
- result.map((row, metadata) -> metadata.getColumnMetadata("x "))
- ));
- awaitError(NoSuchElementException.class,
- Flux.from(connection.createStatement(
- "SELECT x, y FROM testGetColumnMetadataByName")
- .execute())
- .concatMap(result ->
- result.map((row, metadata) -> metadata.getColumnMetadata(" x"))
- ));
- awaitError(NoSuchElementException.class,
- Flux.from(connection.createStatement(
- "SELECT x, y FROM testGetColumnMetadataByName")
- .execute())
- .concatMap(result ->
- result.map((row, metadata) -> metadata.getColumnMetadata(" "))
- ));
- awaitError(NoSuchElementException.class,
- Flux.from(connection.createStatement(
- "SELECT x, y FROM testGetColumnMetadataByName")
- .execute())
- .concatMap(result ->
- result.map((row, metadata) -> metadata.getColumnMetadata(""))
- ));
+ verifyError(NoSuchElementException.class,
+ connection.createStatement(
+ "SELECT x, y FROM testGetColumnMetadataByName"),
+ result ->
+ result.map((row, metadata) -> metadata.getColumnMetadata("z")));
+ verifyError(NoSuchElementException.class,
+ connection.createStatement(
+ "SELECT x, y FROM testGetColumnMetadataByName"),
+ result ->
+ result.map((row, metadata) -> metadata.getColumnMetadata("xx")));
+ verifyError(NoSuchElementException.class,
+ connection.createStatement(
+ "SELECT x, y FROM testGetColumnMetadataByName"),
+ result ->
+ result.map((row, metadata) -> metadata.getColumnMetadata("x ")));
+ verifyError(NoSuchElementException.class,
+ connection.createStatement(
+ "SELECT x, y FROM testGetColumnMetadataByName"),
+ result ->
+ result.map((row, metadata) -> metadata.getColumnMetadata(" x")));
+ verifyError(NoSuchElementException.class,
+ connection.createStatement(
+ "SELECT x, y FROM testGetColumnMetadataByName"),
+ result ->
+ result.map((row, metadata) -> metadata.getColumnMetadata(" ")));
+ verifyError(NoSuchElementException.class,
+ connection.createStatement(
+ "SELECT x, y FROM testGetColumnMetadataByName"),
+ result ->
+ result.map((row, metadata) -> metadata.getColumnMetadata("")));
// Expect valid names to return the column metadata
awaitOne(asList(xPrecision, yPrecision),
@@ -431,4 +414,17 @@ public void testGetColumnMetadatas() {
}
}
+ /**
+ * Verifies that an expected class of error is emitted when a mapping function
+ * is applied to the result of a statement.
+ */
+ private static void verifyError(
+ Class extends Throwable> errorClass, Statement statement,
+ Function> resultMapper) {
+ awaitError(
+ errorClass,
+ Flux.from(statement.execute())
+ .concatMapDelayError(resultMapper));
+ }
+
}
diff --git a/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java
index 509cbb4..7e198a0 100644
--- a/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java
+++ b/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java
@@ -35,6 +35,7 @@
import io.r2dbc.spi.Statement;
import io.r2dbc.spi.Type;
import oracle.r2dbc.OracleR2dbcOptions;
+import oracle.r2dbc.OracleR2dbcTypes;
import oracle.r2dbc.OracleR2dbcWarning;
import oracle.r2dbc.test.DatabaseConfig;
import org.junit.jupiter.api.Test;
@@ -45,9 +46,12 @@
import java.sql.RowId;
import java.sql.SQLWarning;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -55,6 +59,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -68,6 +73,7 @@
import static oracle.r2dbc.test.DatabaseConfig.port;
import static oracle.r2dbc.test.DatabaseConfig.serviceName;
import static oracle.r2dbc.test.DatabaseConfig.sharedConnection;
+import static oracle.r2dbc.test.DatabaseConfig.showErrors;
import static oracle.r2dbc.test.DatabaseConfig.sqlTimeout;
import static oracle.r2dbc.test.DatabaseConfig.user;
import static oracle.r2dbc.util.Awaits.awaitError;
@@ -2022,16 +2028,18 @@ public void testWarningMessage() {
result.flatMap(segment -> {
int index = segmentCount.getAndIncrement();
if (index == 0) {
- assertTrue(segment instanceof OracleR2dbcWarning,
- "Unexpected Segment: " + segment);
- return Mono.just(((OracleR2dbcWarning)segment).exception());
- }
- else if (index == 1) {
+ // Expect the first segment to be an update count
assertTrue(segment instanceof UpdateCount,
"Unexpected Segment: " + segment);
assertEquals(0, ((UpdateCount)segment).value());
return Mono.empty();
}
+ else if (index == 1) {
+ // Expect second segment to be a warning
+ assertTrue(segment instanceof OracleR2dbcWarning,
+ "Unexpected Segment: " + segment);
+ return Mono.just(((OracleR2dbcWarning)segment).exception());
+ }
else {
fail("Unexpected Segment: " + segment);
return Mono.error(new AssertionError("Should not reach here"));
@@ -2238,6 +2246,190 @@ public void testGetSql() {
}
}
+ /**
+ * Verifies that a SYS_REFCURSOR out parameter can be consumed as a
+ * {@link Result} object.
+ */
+ @Test
+ public void testRefCursorOut() {
+ Connection connection = awaitOne(sharedConnection());
+ try {
+ List rows = createRows(100);
+
+ // Create a table with some rows to query
+ awaitExecution(connection.createStatement(
+ "CREATE TABLE testRefCursorTable(id NUMBER, value VARCHAR(10))"));
+ Statement insertStatement = connection.createStatement(
+ "INSERT INTO testRefCursorTable VALUES (:id, :value)");
+ awaitUpdate(
+ rows.stream()
+ .map(row -> 1)
+ .collect(Collectors.toList()),
+ bindRows(rows, insertStatement));
+
+ // Create a procedure that returns a cursor over the rows
+ awaitExecution(connection.createStatement(
+ "CREATE OR REPLACE PROCEDURE testRefCursorProcedure(" +
+ " countCursor OUT SYS_REFCURSOR)" +
+ " IS" +
+ " BEGIN" +
+ " OPEN countCursor FOR " +
+ " SELECT id, value FROM testRefCursorTable" +
+ " ORDER BY id;" +
+ " END;"));
+
+ // Call the procedure with the cursor registered as an out parameter, and
+ // expect it to map to a Result. Then consume the rows of the Result and
+ // verify they have the expected values inserted above.
+ awaitMany(
+ rows,
+ Flux.from(connection.createStatement(
+ "BEGIN testRefCursorProcedure(:countCursor); END;")
+ .bind("countCursor", Parameters.out(OracleR2dbcTypes.REF_CURSOR))
+ .execute())
+ .flatMap(result ->
+ result.map(outParameters ->
+ outParameters.get("countCursor")))
+ .cast(Result.class)
+ .flatMap(countCursor ->
+ countCursor.map(row ->
+ new TestRow(
+ row.get("id", Integer.class),
+ row.get("value", String.class)))));
+
+ // Verify the procedure call again. This time using an explicit
+ // Result.class argument to Row.get(...). Also, this time using
+ // Result.flatMap to create the publisher within the segment mapping
+ // function
+ awaitMany(
+ rows,
+ Flux.from(connection.createStatement(
+ "BEGIN testRefCursorProcedure(:countCursor); END;")
+ .bind("countCursor", Parameters.out(OracleR2dbcTypes.REF_CURSOR))
+ .execute())
+ .flatMap(result ->
+ result.flatMap(segment ->
+ ((Result.OutSegment)segment).outParameters()
+ .get(0, Result.class)
+ .map(row ->
+ new TestRow(
+ row.get("id", Integer.class),
+ row.get("value", String.class))))));
+ }
+ catch (Exception exception) {
+ showErrors(connection);
+ throw exception;
+ }
+ finally {
+ tryAwaitExecution(connection.createStatement(
+ "DROP PROCEDURE testRefCursorProcedure"));
+ tryAwaitExecution(connection.createStatement(
+ "DROP TABLE testRefCursorTable"));
+ tryAwaitNone(connection.close());
+ }
+ }
+
+ /**
+ * Verifies that SYS_REFCURSOR out parameters can be consumed as
+ * {@link Result} objects.
+ */
+ @Test
+ public void testMultipleRefCursorOut() {
+ Connection connection = awaitOne(sharedConnection());
+ try {
+ List rows = createRows(100);
+
+ // Create a table with some rows to query
+ awaitExecution(connection.createStatement(
+ "CREATE TABLE testMultiRefCursorTable(id NUMBER, value VARCHAR(10))"));
+ Statement insertStatement = connection.createStatement(
+ "INSERT INTO testMultiRefCursorTable VALUES (:id, :value)");
+ awaitUpdate(
+ rows.stream()
+ .map(row -> 1)
+ .collect(Collectors.toList()),
+ bindRows(rows, insertStatement));
+
+ // Create a procedure that returns a multiple cursors over the rows
+ awaitExecution(connection.createStatement(
+ "CREATE OR REPLACE PROCEDURE testMultiRefCursorProcedure(" +
+ " countCursor0 OUT SYS_REFCURSOR," +
+ " countCursor1 OUT SYS_REFCURSOR)" +
+ " IS" +
+ " BEGIN" +
+ " OPEN countCursor0 FOR " +
+ " SELECT id, value FROM testMultiRefCursorTable" +
+ " ORDER BY id;" +
+ " OPEN countCursor1 FOR " +
+ " SELECT id, value FROM testMultiRefCursorTable" +
+ " ORDER BY id DESC;" +
+ " END;"));
+
+ // Call the procedure with the cursors registered as out parameters, and
+ // expect them to map to Results. Then consume the rows of each Result and
+ // verify they have the expected values inserted above.
+ List expectedRows = new ArrayList<>(rows);
+ Collections.reverse(rows);
+ expectedRows.addAll(rows);
+ awaitMany(
+ expectedRows,
+ Flux.from(connection.createStatement(
+ "BEGIN testMultiRefCursorProcedure(:countCursor0, :countCursor1); END;")
+ .bind("countCursor0", Parameters.out(OracleR2dbcTypes.REF_CURSOR))
+ .bind("countCursor1", Parameters.out(OracleR2dbcTypes.REF_CURSOR))
+ .execute())
+ .flatMap(result ->
+ result.map(outParameters ->
+ List.of(
+ (Result)outParameters.get("countCursor0"),
+ (Result)outParameters.get("countCursor1"))))
+ .flatMap(results ->
+ Flux.concat(
+ results.get(0).map(row ->
+ new TestRow(
+ row.get("id", Integer.class),
+ row.get("value", String.class))),
+ results.get(1).map(row ->
+ new TestRow(
+ row.get("id", Integer.class),
+ row.get("value", String.class))))));
+
+ // Run the same verification, this time with Result.class argument to
+ // Row.get(...), and mapping the REF CURSOR Results into a Publisher
+ // within the row mapping function
+ awaitMany(
+ expectedRows,
+ Flux.from(connection.createStatement(
+ "BEGIN testMultiRefCursorProcedure(:countCursor0, :countCursor1); END;")
+ .bind("countCursor0", Parameters.out(OracleR2dbcTypes.REF_CURSOR))
+ .bind("countCursor1", Parameters.out(OracleR2dbcTypes.REF_CURSOR))
+ .execute())
+ .flatMap(result ->
+ result.map(outParameters ->
+ Flux.concat(
+ outParameters.get("countCursor0", Result.class).map(row ->
+ new TestRow(
+ row.get(0, Integer.class),
+ row.get(1, String.class))),
+ outParameters.get("countCursor1", Result.class).map(row ->
+ new TestRow(
+ row.get(0, Integer.class),
+ row.get(1, String.class))))))
+ .flatMap(Function.identity()));
+ }
+ catch (Exception exception) {
+ showErrors(connection);
+ throw exception;
+ }
+ finally {
+ tryAwaitExecution(connection.createStatement(
+ "DROP PROCEDURE testMultiRefCursorProcedure"));
+ tryAwaitExecution(connection.createStatement(
+ "DROP TABLE testMultiRefCursorTable"));
+ tryAwaitNone(connection.close());
+ }
+ }
+
// TODO: Repalce with Parameters.inOut when that's available
private static final class InOutParameter
implements Parameter, Parameter.In, Parameter.Out {
@@ -2403,4 +2595,55 @@ private void verifyConcurrentFetch(Connection connection) {
"DROP TABLE testConcurrentFetch"));
}
}
+
+ /**
+ * Creates list of a specified length of test table rows
+ */
+ private static List createRows(int length) {
+ return IntStream.range(0, 100)
+ .mapToObj(id -> new TestRow(id, String.valueOf(id)))
+ .collect(Collectors.toList());
+ }
+
+ /** Binds a list of rows to a batch statement */
+ private Statement bindRows(List rows, Statement statement) {
+ rows.stream()
+ .limit(rows.size() - 1)
+ .forEach(row ->
+ statement
+ .bind("id", row.id)
+ .bind("value", row.value)
+ .add());
+
+ statement
+ .bind("id", rows.get(rows.size() - 1).id)
+ .bind("value", rows.get(rows.size() - 1).value);
+
+ return statement;
+ }
+
+ /**
+ * A row of a test table.
+ */
+ private static class TestRow {
+ final int id;
+ final String value;
+
+ TestRow(int id, String value) {
+ this.id = id;
+ this.value = value;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return other instanceof TestRow
+ && id == ((TestRow)other).id
+ && Objects.equals(value, ((TestRow)other).value);
+ }
+
+ @Override
+ public String toString() {
+ return "[id=" + id + ", value=" + value + "]";
+ }
+ }
}
diff --git a/src/test/java/oracle/r2dbc/util/Awaits.java b/src/test/java/oracle/r2dbc/util/Awaits.java
index 81b4976..a9c241a 100644
--- a/src/test/java/oracle/r2dbc/util/Awaits.java
+++ b/src/test/java/oracle/r2dbc/util/Awaits.java
@@ -108,7 +108,7 @@ public static void awaitError(
Class extends Throwable> errorType, Publisher> errorPublisher) {
assertThrows(
errorType,
- () -> Mono.from(errorPublisher).block(sqlTimeout()),
+ () -> Flux.from(errorPublisher).blockLast(sqlTimeout()),
"Unexpected signal from Publisher of an error");
}