diff --git a/.github/workflows/test.sh b/.github/workflows/test.sh
index d978e97..4ba9e9b 100755
--- a/.github/workflows/test.sh
+++ b/.github/workflows/test.sh
@@ -81,6 +81,6 @@ echo "HOST=localhost" >> src/test/resources/config.properties
 echo "PORT=1521" >> src/test/resources/config.properties
 echo "USER=test" >> src/test/resources/config.properties
 echo "PASSWORD=test" >> src/test/resources/config.properties
-echo "CONNECT_TIMEOUT=60" >> src/test/resources/config.properties
-echo "SQL_TIMEOUT=60" >> src/test/resources/config.properties
+echo "CONNECT_TIMEOUT=120" >> src/test/resources/config.properties
+echo "SQL_TIMEOUT=120" >> src/test/resources/config.properties
 mvn clean compile test
diff --git a/README.md b/README.md
index fa9299a..06c733e 100644
--- a/README.md
+++ b/README.md
@@ -508,12 +508,13 @@ for the out parameters is emitted last, after the `Result` for each cursor.
 Oracle R2DBC supports type mappings between Java and SQL for non-standard data
 types of Oracle Database.
-| Oracle SQL Type | Java Type |
-|---------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|
-| [JSON](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-E441F541-BA31-4E8C-B7B4-D2FB8C42D0DF) | `javax.json.JsonObject` or `oracle.sql.json.OracleJsonObject` |
-| [DATE](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-5405B652-C30E-4F4F-9D33-9A4CB2110F1B) | `java.time.LocalDateTime` |
-| [INTERVAL DAY TO SECOND](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-B03DD036-66F8-4BD3-AF26-6D4433EBEC1C) | `java.time.Duration` |
-| [INTERVAL YEAR TO MONTH](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-ED59E1B3-BA8D-4711-B5C8-B0199C676A95) | `java.time.Period` |
+| Oracle SQL Type | Java Type |
+|---------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------|
+| [JSON](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-E441F541-BA31-4E8C-B7B4-D2FB8C42D0DF) | `javax.json.JsonObject` or `oracle.sql.json.OracleJsonObject` |
+| [DATE](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-5405B652-C30E-4F4F-9D33-9A4CB2110F1B) | `java.time.LocalDateTime` |
+| [INTERVAL DAY TO SECOND](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-B03DD036-66F8-4BD3-AF26-6D4433EBEC1C) | `java.time.Duration` |
+| [INTERVAL YEAR TO MONTH](https://docs.oracle.com/en/database/oracle/oracle-database/21/sqlrf/Data-Types.html#GUID-ED59E1B3-BA8D-4711-B5C8-B0199C676A95) | `java.time.Period` |
+| [SYS_REFCURSOR](https://docs.oracle.com/en/database/oracle/oracle-database/21/lnpls/static-sql.html#GUID-470A7A99-888A-46C2-BDAF-D4710E650F27) | `io.r2dbc.spi.Result` |
 > Unlike the standard SQL type named "DATE", the Oracle Database type named
 > "DATE" stores values for year, month, day, hour, minute, and second. The
 > standard SQL type only stores year, month, and day. LocalDateTime objects are able
@@ -553,6 +554,35 @@ prefetched entirely, a smaller prefetch size can be configured using the
 option, and the LOB can be consumed as a stream.
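For illustration, a minimal sketch of such streaming consumption, in which a BLOB column is mapped to `io.r2dbc.spi.Blob` and forwarded as a publisher of byte buffers; the `example_lobs` table and `data_column` column are hypothetical names:
```java
import io.r2dbc.spi.Blob;
import io.r2dbc.spi.Connection;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.nio.ByteBuffer;

class BlobStreamExample {

  // Streams a BLOB column without materializing its full content in memory
  static Publisher<ByteBuffer> streamBlob(Connection connection) {
    return Flux.from(connection.createStatement(
        "SELECT data_column FROM example_lobs WHERE id = 0")
      .execute())
      // Map the row to a Blob; the content is not prefetched at this point
      .flatMap(result ->
        result.map(row -> row.get("data_column", Blob.class)))
      // Blob.stream() emits the content as a stream of byte buffers
      .flatMap(Blob::stream);
  }
}
```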
By mapping LOB columns to `Blob` or `Clob` objects, the content can be consumed as a reactive stream.
+### REF Cursors
+Use the `oracle.r2dbc.OracleR2dbcTypes.REF_CURSOR` type to bind `SYS_REFCURSOR` out
+parameters:
+```java
+Publisher<Result> executeProcedure(Connection connection) {
+  return connection.createStatement(
+    "BEGIN example_procedure(:cursor_parameter); END;")
+    .bind("cursor_parameter", Parameters.out(OracleR2dbcTypes.REF_CURSOR))
+    .execute();
+}
+```
+A `SYS_REFCURSOR` out parameter can be mapped to an `io.r2dbc.spi.Result`:
+```java
+Publisher<Result> mapOutParametersResult(Result outParametersResult) {
+  return outParametersResult.map(outParameters ->
+    outParameters.get("cursor_parameter", Result.class));
+}
+```
+The rows of a `SYS_REFCURSOR` may be consumed from the `Result` it is
+mapped to:
+```java
+Publisher<ExampleObject> mapRefCursorRows(Result refCursorResult) {
+  return refCursorResult.map(row ->
+    new ExampleObject(
+      row.get("id_column", Long.class),
+      row.get("value_column", String.class)));
+}
+```
+
 ## Secure Programming Guidelines
 The following security related guidelines should be adhered to when programming
 with the Oracle R2DBC Driver.
diff --git a/src/main/java/oracle/r2dbc/OracleR2dbcTypes.java b/src/main/java/oracle/r2dbc/OracleR2dbcTypes.java
index 6f88622..08c6c56 100644
--- a/src/main/java/oracle/r2dbc/OracleR2dbcTypes.java
+++ b/src/main/java/oracle/r2dbc/OracleR2dbcTypes.java
@@ -20,6 +20,7 @@
  */
 package oracle.r2dbc;
 
+import io.r2dbc.spi.Result;
 import io.r2dbc.spi.Type;
 import oracle.sql.json.OracleJsonObject;
 
@@ -92,6 +93,12 @@ private OracleR2dbcTypes() {}
   public static final Type TIMESTAMP_WITH_LOCAL_TIME_ZONE =
     new TypeImpl(LocalDateTime.class, "TIMESTAMP WITH LOCAL TIME ZONE");
 
+  /**
+   * A cursor that is returned by a procedural call.
+   */
+  public static final Type REF_CURSOR =
+    new TypeImpl(Result.class, "SYS_REFCURSOR");
+
   /**
    * Implementation of the {@link Type} SPI.
    */
diff --git a/src/main/java/oracle/r2dbc/impl/DependentCounter.java b/src/main/java/oracle/r2dbc/impl/DependentCounter.java
new file mode 100644
index 0000000..1d1694e
--- /dev/null
+++ b/src/main/java/oracle/r2dbc/impl/DependentCounter.java
@@ -0,0 +1,82 @@
+package oracle.r2dbc.impl;
+
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Mono;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ *
<p>
+ * A count of resources that depend on another resource to remain open. A + * dependent resource registers itself by incrementing the count, and + * deregisters itself by decrementing the count. The last dependent to + * deregister has the responsibility of subscribing to a {@code Publisher} that + * closes the resource it depended upon. + *
</p><p>
+ * This class is conceptually similar to a {@code java.util.concurrent.Phaser}. + * Parties register by calling {@link #increment()}, and deregister by calling + * {@link #decrement()}. Asynchronous "phase advancement" is then handled by + * the {@code Publisher} which {@code decrement} returns. + *
</p><p>
+ * This class offers a solution for tracking the consumption of + * {@link io.r2dbc.spi.Result} objects that depend on a JDBC statement to remain + * open until each result is consumed. Further explanations can be found in the + * JavaDocs of {@link OracleStatementImpl} and {@link OracleResultImpl}. + *
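The register/deregister protocol described above can be sketched as follows. This is a minimal sketch, not driver code: `useAndRelease` and the `work` publisher are hypothetical, and same-package access to this class is assumed.
```java
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

class DependentCounterUsage {

  // Runs some work that depends on a shared resource, registering as a
  // dependent before the work and deregistering afterwards
  static Publisher<Void> useAndRelease(
      DependentCounter counter, Publisher<Void> work) {
    counter.increment(); // register as a dependent of the shared resource
    return Mono.from(work)
      // The publisher returned by decrement() MUST be subscribed to: it
      // closes the shared resource if this was the last dependent, and
      // otherwise completes without doing anything
      .then(Mono.defer(() -> Mono.from(counter.decrement())));
  }
}
```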
</p>
+ */
+class DependentCounter {
+
+  /** Count of dependents */
+  private final AtomicInteger count = new AtomicInteger(0);
+
+  /** Publisher that closes the depended upon resource */
+  private final Publisher<Void> closePublisher;
+
+  /**
+   * Constructs a new counter that returns a resource closing publisher to the
+   * last dependent which deregisters. The counter is initialized with a count
+   * of zero.
+   * @param closePublisher Publisher that closes a resource. Not null.
+   */
+  DependentCounter(Publisher<Void> closePublisher) {
+    this.closePublisher = closePublisher;
+  }
+
+  /**
+   * Increments the count of dependents by one.
+   *
+   * A corresponding call to {@link #decrement()} MUST occur by the dependent
+   * which has called {@code increment()}.
+   *
+   */
+  void increment() {
+    count.incrementAndGet();
+  }
+
+  /**
+   *
<p>
+ * Returns a publisher that decrements the count of dependents by one when + * subscribed to. + * + * A corresponding call to {@link #increment()} MUST have previously occurred + * by the dependent which has called {@code decrement()} + * + *
</p><p>
+ * The dependent which has called this method MUST subscribe to the returned
+ * publisher. If the dependent that calls this method is the last dependent to
+ * do so, then the returned publisher will close the depended upon resource.
+ * Otherwise, if more dependents remain, the returned publisher does nothing.
+ * The caller of this method has no way to tell which is the case, so it must
+ * subscribe to be safe.
+ *
</p>
+ * @return A publisher that closes the depended upon resource after no + * dependents remain. Not null. + */ + Publisher decrement() { + return Mono.defer(() -> + count.decrementAndGet() == 0 + ? Mono.from(closePublisher) + : Mono.empty()); + } + +} diff --git a/src/main/java/oracle/r2dbc/impl/OracleReadableImpl.java b/src/main/java/oracle/r2dbc/impl/OracleReadableImpl.java index d00dbef..4d4b56c 100755 --- a/src/main/java/oracle/r2dbc/impl/OracleReadableImpl.java +++ b/src/main/java/oracle/r2dbc/impl/OracleReadableImpl.java @@ -28,6 +28,7 @@ import io.r2dbc.spi.R2dbcException; import io.r2dbc.spi.R2dbcType; +import io.r2dbc.spi.Result; import io.r2dbc.spi.Row; import io.r2dbc.spi.RowMetadata; import io.r2dbc.spi.Type; @@ -37,6 +38,7 @@ import oracle.r2dbc.impl.ReadablesMetadata.RowMetadataImpl; import java.nio.ByteBuffer; +import java.sql.ResultSet; import java.sql.Timestamp; import java.time.LocalDateTime; import java.util.NoSuchElementException; @@ -65,19 +67,27 @@ class OracleReadableImpl implements io.r2dbc.spi.Readable { /** Metadata of the values of this {@code Readable}. */ private final ReadablesMetadata readablesMetadata; + /** + * A collection of results that depend on the JDBC statement which created + * this readable to remain open until all results are consumed. + */ + private final DependentCounter dependentCounter; + /** *
<p>
* Constructs a new {@code Readable} that supplies values of a * {@code jdbcReadable} and obtains metadata of the values from * {@code resultMetadata}. *
</p>
+ * * @param jdbcReadable Readable values from a JDBC Driver. Not null. * @param readablesMetadata Metadata of each value. Not null. * @param adapter Adapts JDBC calls into reactive streams. Not null. */ private OracleReadableImpl( - JdbcReadable jdbcReadable, ReadablesMetadata readablesMetadata, - ReactiveJdbcAdapter adapter) { + DependentCounter dependentCounter, JdbcReadable jdbcReadable, + ReadablesMetadata readablesMetadata, ReactiveJdbcAdapter adapter) { + this.dependentCounter = dependentCounter; this.jdbcReadable = jdbcReadable; this.readablesMetadata = readablesMetadata; this.adapter = adapter; @@ -96,9 +106,9 @@ private OracleReadableImpl( * {@code metadata}. Not null. */ static Row createRow( - JdbcReadable jdbcReadable, RowMetadataImpl metadata, - ReactiveJdbcAdapter adapter) { - return new RowImpl(jdbcReadable, metadata, adapter); + DependentCounter dependentCounter, JdbcReadable jdbcReadable, + RowMetadataImpl metadata, ReactiveJdbcAdapter adapter) { + return new RowImpl(dependentCounter, jdbcReadable, metadata, adapter); } /** *
<p>
@@ -113,9 +123,10 @@ static Row createRow( * {@code metadata}. Not null. */ static OutParameters createOutParameters( - JdbcReadable jdbcReadable, OutParametersMetadataImpl metadata, - ReactiveJdbcAdapter adapter) { - return new OutParametersImpl(jdbcReadable, metadata, adapter); + DependentCounter dependentCounter, JdbcReadable jdbcReadable, + OutParametersMetadataImpl metadata, ReactiveJdbcAdapter adapter) { + return new OutParametersImpl( + dependentCounter, jdbcReadable, metadata, adapter); } /** @@ -162,8 +173,8 @@ public T get(String name, Class type) { /** * Returns the 0-based index of the value identified by {@code name}. This * method implements a case-insensitive name match. If more than one - * value has a matching name, this method returns lowest index of all - * matching values. + * value has a matching name, this method returns lowest of all indexes that + * match. * @param name The name of a value. Not null. * @return The index of the named value within this {@code Readable} * @throws NoSuchElementException If no column has a matching name. @@ -208,6 +219,9 @@ else if (io.r2dbc.spi.Clob.class.equals(type)) { else if (LocalDateTime.class.equals(type)) { value = getLocalDateTime(index); } + else if (Result.class.equals(type)) { + value = getResult(index); + } else if (Object.class.equals(type)) { // Use the default type mapping if Object.class has been specified. // This method is invoked recursively with the default mapping, so long @@ -327,6 +341,36 @@ private LocalDateTime getLocalDateTime(int index) { } } + /** + *
<p>
+ * Converts the value of a column at the specified {@code index} to a + * {@code Result}. This method is intended for mapping REF CURSOR values, + * which JDBC will map to a {@link ResultSet}. + *
</p><p>
+ * A REF CURSOR is closed when the JDBC statement that created it is closed. + * To prevent the cursor from getting closed, the Result returned by this + * method is immediately added to the collection of results that depend on the + * JDBC statement. + *
</p><p>
+ * The Result returned by this method is received by user code, and user code + * MUST then fully consume it. The JDBC statement is not closed until the + * result is fully consumed. + *
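For instance, user code that receives the REF CURSOR as a column value might consume it as follows; a sketch in which the `cursor_parameter` and `value_column` names are hypothetical:
```java
import io.r2dbc.spi.Result;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

class RefCursorConsumption {

  // Maps a SYS_REFCURSOR out parameter to a Result, then fully consumes the
  // rows of that Result, allowing the JDBC statement to be closed
  static Publisher<String> consumeCursor(Result outParametersResult) {
    return Flux.from(outParametersResult.map(outParameters ->
        outParameters.get("cursor_parameter", Result.class)))
      .flatMap(cursorResult ->
        cursorResult.map(row -> row.get("value_column", String.class)));
  }
}
```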
</p>
+ * @param index 0 based column index + * @return A column value as a {@code Result}, or null if the column value is + * NULL. + */ + private Result getResult(int index) { + ResultSet resultSet = jdbcReadable.getObject(index, ResultSet.class); + + if (resultSet == null) + return null; + + dependentCounter.increment(); + return OracleResultImpl.createQueryResult( + dependentCounter, resultSet, adapter); + } + /** * Checks if the specified zero-based {@code index} is a valid column index * for this row. This method is used to verify index value parameters @@ -368,10 +412,9 @@ private static final class RowImpl * @param adapter Adapts JDBC calls into reactive streams. Not null. */ private RowImpl( - JdbcReadable jdbcReadable, - RowMetadataImpl metadata, - ReactiveJdbcAdapter adapter) { - super(jdbcReadable, metadata, adapter); + DependentCounter dependentCounter, JdbcReadable jdbcReadable, + RowMetadataImpl metadata, ReactiveJdbcAdapter adapter) { + super(dependentCounter, jdbcReadable, metadata, adapter); this.metadata = metadata; } @@ -410,10 +453,9 @@ private static final class OutParametersImpl * @param adapter Adapts JDBC calls into reactive streams. Not null. */ private OutParametersImpl( - JdbcReadable jdbcReadable, - OutParametersMetadataImpl metadata, - ReactiveJdbcAdapter adapter) { - super(jdbcReadable, metadata, adapter); + DependentCounter dependentCounter, JdbcReadable jdbcReadable, + OutParametersMetadataImpl metadata, ReactiveJdbcAdapter adapter) { + super(dependentCounter,jdbcReadable, metadata, adapter); this.metadata = metadata; } diff --git a/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java b/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java index ed6ed47..502971f 100644 --- a/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java +++ b/src/main/java/oracle/r2dbc/impl/OracleResultImpl.java @@ -38,7 +38,6 @@ import java.sql.SQLWarning; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Predicate; @@ -55,124 +54,63 @@ *
<p>
* Abstract class providing a base implementation of the R2DBC SPI * {@link Result} interface. Concrete subclasses implement - * {@link #publishSegments(Function)} to return a {@link Publisher} that emits - * the output of a {@link Segment} mapping function for each {@code Segment} of - * the {@code Result}. Implementations of R2DBC SPI methods in the base - * class invoke {@code publishSegments} with a mapping function that + * {@link #mapSegments(Class, Function)} to return a {@link Publisher} that + * emits the output of a {@link Segment} mapping function for each + * {@code Segment} of the {@code Result}. Implementations of R2DBC SPI methods + * in the base class invoke {@code mapSegments} with a mapping function that * filters the emitted {@code Segment}s according to the specification of the * SPI method. *
</p>
*/ abstract class OracleResultImpl implements Result { - /** - * Object output by mapping functions provided to - * {@link #publishSegments(Function)} for {@code Segment}s that do not - * satisfy a filter. Downstream operators of - * {@link #publishSegments(Function)} filter this object so that it is not - * emitted to user code. - */ - private static final Object FILTERED = new Object(); - /** * Indicates if a method call on this {@code Result} has already returned a * {@code Publisher} that allows this {@code Result} to be consumed. In - * conformance with the R2DBC SPI, multiple attempts to consume the this - * result will yield an {@code IllegalStateException}. + * conformance with the R2DBC SPI, an {@code IllegalStateException} is thrown + * if multiple attempts are made to consume this result; A result may only be + * consumed once. */ private boolean isPublished = false; - /** - * Reference to a publisher that must be subscribed to after all segments of - * this result have been consumed. The reference is updated to {@code null} - * after the publisher has been subscribed to. - */ - private final AtomicReference> onConsumed = - new AtomicReference<>(Mono.empty()); - /** Private constructor invoked by inner subclasses */ private OracleResultImpl() { } - /** - * Publishes the output of a {@code mappingFunction} for each {@code Segment} - * of this {@code Result}. - * @param mappingFunction {@code Segment} mapping function. Not null. - * @param Output type of the {@code mappingFunction} - * @return {@code Publisher} of values output by the {@code mappingFunction} - */ - abstract Publisher publishSegments( - Function mappingFunction); - /** *
<p>
- * Publishes the output of a {@code mappingFunction} for each {@code Segment}
- * of this {@code Result}, where the {@code Segment} is an instance of the
- * specified {@code type}.
+ * Returns a publisher that emits the output of a segment mapping function for
+ * each segment of this result. The mapping function only accepts segments of
+ * a specified type. This method is called from methods of the public API to
+ * create publishers of different value types. For instance, the
+ * {@link #getRowsUpdated()} method creates a {@code Publisher<Long>} by
+ * calling this method with an {@code UpdateCount.class} segment type and a
+ * function that maps {@code UpdateCount} segments to a {@code Long}.
 *
</p><p>
- * This method sets {@link #isPublished} to prevent multiple consumptions - * of this {@code Result}. In case this is a {@link FilteredResult}, this - * method must invoke {@link #publishSegments(Function)}, before returning, - * in order to update {@code isPublished} of the {@link FilteredResult#result} - * as well. - *
</p><p>
- * When the returned publisher terminates with {@code onComplete}, - * {@code onError}, or {@code cancel}, the {@link #onConsumed} publisher is - * subscribed to. The {@code onConsumed} reference is updated to {@code null} - * so that post-consumption calls to {@link #onConsumed(Publisher)} can detect - * that this result is already consumed. - *
</p><p>
- * The returned {@code Publisher} emits {@code onError} with an
- * {@link R2dbcException} if this {@code Result} has a {@link Message} segment
- * and the {@code type} is not a super-type of {@code Message}. This
- * corresponds to the specified behavior of R2DBC SPI methods
- * {@link #map(BiFunction)}, {@link #map(BiFunction)}, and
- * {@link #getRowsUpdated()}
+ * Any segments, other than {@code Message} segments, that are not an instance
+ * of the {@code segmentType} should not be passed to the
+ * {@code segmentMapper}, and should not be emitted by the returned publisher
+ * in any form. However, {@code Message} segments are an exception. The error
+ * of a {@code Message} segment must be emitted as an {@code onError} signal,
+ * even if the {@code segmentType} is not assignable to {@code Message}
+ * segments.
 *
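The contract above amounts to type-based filtering with special treatment of message segments. A standalone sketch of that logic over a segment stream, not the driver's actual implementation:
```java
import io.r2dbc.spi.Result.Message;
import io.r2dbc.spi.Result.Segment;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.util.function.Function;

class SegmentMappingSketch {

  // Segments of the requested type are mapped, Message segments of any other
  // type are raised as onError, and all remaining segments are dropped
  static <T extends Segment, U> Publisher<U> mapSegments(
      Flux<Segment> segments, Class<T> segmentType,
      Function<T, U> segmentMapper) {
    return segments
      .filter(segment ->
        segmentType.isInstance(segment) || segment instanceof Message)
      .map(segment -> {
        if (segmentType.isInstance(segment))
          return segmentMapper.apply(segmentType.cast(segment));
        // Throwing inside map(...) is translated into an onError signal
        throw ((Message) segment).exception();
      });
  }
}
```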
</p>
- * @param type {@code Segment} type to be mapped. Not null. - * @param mappingFunction {@code Segment} mapping function. Not null. - * @param {@code Segment} type to be mapped - * @param Output type of the {@code mappingFunction} - * @return {@code Publisher} of mapped {@code Segment}s + * @param segmentType Class of {@code Segment} to map. Not null. + * @param segmentMapper Maps segments to published values. + * @return A publisher that emits the mapped values. + * @param Segment type to map + * @param Type of mapped value */ - @SuppressWarnings("unchecked") - private Publisher publishSegments( - Class type, Function mappingFunction) { - - setPublished(); - - Mono whenConsumed = Mono.defer(() -> { - Publisher consumedPublisher = onConsumed.getAndSet(null); - return consumedPublisher == null - ? Mono.empty() - : Mono.from((Publisher)consumedPublisher); - }); - - return Flux.concatDelayError( - Flux.from(publishSegments(segment -> { - if (type.isInstance(segment)) - return mappingFunction.apply(type.cast(segment)); - else if (segment instanceof OracleR2dbcWarning) - return (U)FILTERED; - else if (segment instanceof Message) - throw ((Message)segment).exception(); - else - return (U)FILTERED; - })) - .filter(object -> object != FILTERED), - whenConsumed) - .doOnCancel(() -> - Mono.from(whenConsumed).subscribe()); - } + protected abstract Publisher mapSegments( + Class segmentType, Function segmentMapper); /** * {@inheritDoc} *
<p>
- * Implements the R2DBC SPI method to return a {@code Publisher} emitting the - * flat-mapped output of {@code Publisher}s output by a - * {@code mappingFunction} for all {@code Segments} this {@code Result}. - * {@code Publisher}s output by the {@code mappingFunction} are subscribed to - * serially with the completion of the {@code Publisher} output for any - * previous {@code Segment}. + * Implements the R2DBC SPI method to return a flat-mapping of publishers + * generated by a {@code mappingFunction}. Publishers output by the + * {@code mappingFunction} are subscribed to serially. Serial subscription is + * implemented by the {@code Flux.concat(Publisher)} factory called + * by this method. *
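For example, user code can rely on this serial subscription to handle each segment type explicitly. A sketch, with a hypothetical `value_column` name:
```java
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Result.Message;
import io.r2dbc.spi.Result.RowSegment;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

class FlatMapExample {

  // Maps row segments to column values and message segments to a fallback
  // value; publishers returned here are subscribed to one at a time
  static Publisher<String> mapAllSegments(Result result) {
    return result.flatMap(segment -> {
      if (segment instanceof RowSegment)
        return Mono.justOrEmpty(
          ((RowSegment) segment).row().get("value_column", String.class));
      else if (segment instanceof Message)
        return Mono.just("Error: " + ((Message) segment).message());
      else
        return Mono.empty();
    });
  }
}
```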
</p><p>
* The returned {@code Publisher} does not support multiple * {@code Subscriber}s @@ -182,8 +120,9 @@ else if (segment instanceof Message) public Publisher flatMap( Function> mappingFunction) { requireNonNull(mappingFunction, "mappingFunction is null"); + setPublished(); return singleSubscriber(Flux.concat( - publishSegments(Segment.class, mappingFunction))); + mapSegments(Segment.class, mappingFunction))); } /** @@ -200,7 +139,8 @@ public Publisher flatMap( */ @Override public Publisher getRowsUpdated() { - return publishSegments(UpdateCount.class, UpdateCount::value); + setPublished(); + return mapSegments(UpdateCount.class, UpdateCount::value); } /** @@ -218,8 +158,9 @@ public Publisher getRowsUpdated() { public Publisher map( BiFunction mappingFunction) { requireNonNull(mappingFunction, "mappingFunction is null"); - return singleSubscriber(publishSegments(RowSegment.class, - rowSegment -> { + setPublished(); + return singleSubscriber(mapSegments( + RowSegment.class, rowSegment -> { Row row = rowSegment.row(); return mappingFunction.apply(row, row.getMetadata()); })); @@ -240,8 +181,9 @@ public Publisher map( public Publisher map( Function mappingFunction) { requireNonNull(mappingFunction, "mappingFunction is null"); - return singleSubscriber(publishSegments(ReadableSegment.class, - readableSegment -> + setPublished(); + return singleSubscriber(mapSegments( + ReadableSegment.class, readableSegment -> mappingFunction.apply(readableSegment.getReadable()))); } @@ -250,44 +192,52 @@ public Publisher map( *
<p>
* Implements the R2DBC SPI method to return a new instance of * {@code OracleResultImpl} that implements - * {@link OracleResultImpl#publishSegments(Function)} to call - * {@link OracleResultImpl#publishSegments(Class, Function)} on this instance - * of {@code OracleResultImpl}. The invocation of {@code publishSegments} - * on this instance ensures that its consumption state is updated correctly. - * The invocation of {@code publishSegments} is provided with a mapping - * function that outputs the {@link #FILTERED} object for {@code Segment}s - * rejected by the {@code filter}. + * {@link OracleResultImpl#mapSegments(Class, Function)} to filter segments of + * this result with the specified {@code filter} predicate. *
</p>
*/ @Override public OracleResultImpl filter(Predicate filter) { requireNonNull(filter, "filter is null"); - - if (isPublished) - throw multipleConsumptionException(); - return new FilteredResult(this, filter); } /** *
<p>
- * Sets a publisher that is subscribed to when all segments of this result - * have been consumed. + * Adds this result to a collection of results that depend on a JDBC + * statement. After this method is called, the JDBC statement must remain open + * until this result signals that it has been closed. + *
</p><p>
+ * This method must only be invoked when it is certain that this + * result will be received by user code. If user code never receives this + * result, then it can never consume it, and the JDBC statement is never + * closed. Otherwise, once this result reaches user code, that code is + * responsible for consuming it. The R2DBC specification requires results to + * be fully consumed; There is no other way for Oracle R2DBC to know when + * it is safe to close the JDBC statement. + *
</p><p>
+ * Depending on the type of this result, this method may or may not actually + * do anything. For instance, if this result is an update count, then it + * doesn't depend on a JDBC statement, and so it won't actually register + * itself as a dependent. This result registers itself only if it retains + * something like {@code ResultSet} which depends on the JDBC statement to + * remain open. *
</p><p>
- * If this result has already been consumed, then the publisher is not
- * subscribed to.
+ * Additional results may be added to the collection after this method
+ * returns. In particular, a REF CURSOR is backed by a ResultSet, and that
+ * ResultSet will be closed when the JDBC statement is closed. At the time
+ * when this method is called, it is not known what the user defined mapping
+ * function will do. A check to see if a REF CURSOR ResultSet was created can
+ * only be made after the user defined function has executed.
 *
</p><p>
- * A subsequent call to this method overwrites the publisher that has been - * set by the current call. + * This method is implemented by the OracleResultImpl super class to do + * nothing. Subclasses that depend on the JDBC statement override this method + * and add themselves to the collection of dependent results. *
</p>
- * @param onConsumed Publisher to subscribe to when consumed. Not null. - * @return true if this result has not already been consumed, and the - * publisher will be subscribed to. Returns false if the publisher will not - * be subscribed to because this result is already consumed. */ - final boolean onConsumed(Publisher onConsumed) { - return null != this.onConsumed.getAndUpdate( - current -> current == null ? null : onConsumed); + void addDependent() { + // Do nothing for non-dependent results. This method is overridden by the + // DependentResult subclass to add a dependent result. } /** @@ -295,49 +245,50 @@ final boolean onConsumed(Publisher onConsumed) { * {@code Result} to be consumed. This method enforces the {@link Result} SPI * contract which does not allow the same result to be consumed more than * once. + * + * This method MUST be called before returning a Publisher to user code from + * all public APIs of this class. + * * @throws IllegalStateException If this result has already been consumed. */ protected void setPublished() { - if (! isPublished) - isPublished = true; - else - throw multipleConsumptionException(); - } - - /** - * Returns an {@code IllegalStateException} to be thrown when user code - * attempts to consume a {@code Result} more than once with invocations of - * {@link #map(BiFunction)}, {@link #map(Function)}, - * {@link #flatMap(Function)}, or {@link #getRowsUpdated()}. - * @return {@code IllegalStateException} indicating multiple consumptions - */ - private static IllegalStateException multipleConsumptionException() { - return new IllegalStateException( - "A result can not be consumed more than once"); + if (isPublished) { + throw new IllegalStateException( + "A result can not be consumed more than once"); + } + isPublished = true; } /** * Creates a {@code Result} that publishes a JDBC {@code resultSet} as * {@link RowSegment}s + * @param dependentCounter Collection of results that depend on the JDBC + * statement which created the {@code ResultSet} to remain open until all + * results are consumed. * @param resultSet {@code ResultSet} to publish. Not null. * @param adapter Adapts JDBC calls into reactive streams. Not null. * @return A {@code Result} for a ResultSet */ public static OracleResultImpl createQueryResult( - ResultSet resultSet, ReactiveJdbcAdapter adapter) { - return new ResultSetResult(resultSet, adapter); + DependentCounter dependentCounter, ResultSet resultSet, + ReactiveJdbcAdapter adapter) { + return new ResultSetResult(dependentCounter, resultSet, adapter); } /** * Creates a {@code Result} that publishes {@code outParameters} as * {@link OutSegment}s + * @param dependentCounter Collection of results that depend on the JDBC + * statement which created the {@code OutParameters} to remain open until all + * results are consumed. * @param outParameters {@code OutParameters} to publish. Not null. * @param adapter Adapts JDBC calls into reactive streams. Not null. 
* @return A {@code Result} for {@code OutParameters} */ static OracleResultImpl createCallResult( - OutParameters outParameters, ReactiveJdbcAdapter adapter) { - return new CallResult(outParameters, adapter); + DependentCounter dependentCounter, OutParameters outParameters, + ReactiveJdbcAdapter adapter) { + return new CallResult(dependentCounter, outParameters, adapter); } /** @@ -346,12 +297,17 @@ static OracleResultImpl createCallResult( * {@code ResultSet} as {@link RowSegment}s * @return A {@code Result} for values generated by DML * @param updateCount Update count to publish + * @param dependentCounter Collection of results that depend on the JDBC + * statement which created the {@code generatedKeys} {@code ResultSet} to + * remain open until all results are consumed. * @param generatedKeys Generated values to publish. Not null. * @param adapter Adapts JDBC calls into reactive streams. Not null. */ static OracleResultImpl createGeneratedValuesResult( - long updateCount, ResultSet generatedKeys, ReactiveJdbcAdapter adapter) { - return new GeneratedKeysResult(updateCount, generatedKeys, adapter); + long updateCount, DependentCounter dependentCounter, + ResultSet generatedKeys, ReactiveJdbcAdapter adapter) { + return new GeneratedKeysResult( + updateCount, dependentCounter, generatedKeys, adapter); } /** @@ -425,12 +381,28 @@ private UpdateCountResult(long updateCount) { this.updateCount = updateCount; } + /** + * {@inheritDoc} + *
<p>
+ * This method uses Mono's fromSupplier factory to defer segment mapping
+ * until the publisher is subscribed to. This ensures that segments are
+ * consumed in the correct order when the returned publisher is concatenated
+ * after another, as with
+ * {@link BatchUpdateErrorResult#mapSegments(Class, Function)}, for
+ * instance. Additionally, the factory handles any exception thrown by the
+ * segment mapper by translating it into an onError signal.
+ *
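A standalone Reactor sketch of this deferral: with an eager Mono.just, the supplier's work would run while the pipeline is assembled; with fromSupplier it runs only when its turn in the concatenation arrives.
```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

class DeferredMappingExample {

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();

    // Deferred: the supplier is not invoked until subscription time
    Mono<String> deferred = Mono.fromSupplier(() -> {
      events.add("mapped");
      return "update count";
    });

    Flux.concat(
        Mono.fromRunnable(() -> events.add("upstream consumed")),
        deferred)
      .blockLast();

    System.out.println(events); // [upstream consumed, mapped]
  }
}
```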
</p>
+ */ @Override - Publisher publishSegments(Function mappingFunction) { - return updateCount >= 0 - ? Mono.just(new UpdateCountImpl(updateCount)) - .map(mappingFunction) - : Mono.empty(); + protected Publisher mapSegments( + Class segmentType, Function segmentMapper) { + + if (!segmentType.isAssignableFrom(UpdateCountImpl.class)) + return Mono.empty(); + + return Mono.fromSupplier(() -> + segmentMapper.apply(segmentType.cast( + new UpdateCountImpl(updateCount)))); } } @@ -460,29 +432,36 @@ Publisher publishSegments(Function mappingFunction) { * values from that method. *
</p>
*/ - private static final class ResultSetResult extends OracleResultImpl { + private static final class ResultSetResult extends DependentResult { private final ResultSet resultSet; private final RowMetadataImpl metadata; private final ReactiveJdbcAdapter adapter; private ResultSetResult( - ResultSet resultSet, ReactiveJdbcAdapter adapter) { + DependentCounter dependentCounter, ResultSet resultSet, + ReactiveJdbcAdapter adapter) { + super(dependentCounter); this.resultSet = resultSet; this.metadata = createRowMetadata(fromJdbc(resultSet::getMetaData)); this.adapter = adapter; } @Override - Publisher publishSegments(Function mappingFunction) { + protected Publisher mapDependentSegments( + Class segmentType, Function segmentMapper) { - // Avoiding object allocating by reusing the same Row object + if (!segmentType.isAssignableFrom(RowSegmentImpl.class)) + return Mono.empty(); + + // Avoiding object allocation by reusing the same Row object ReusableJdbcReadable reusableJdbcReadable = new ReusableJdbcReadable(); - Row row = createRow(reusableJdbcReadable, metadata, adapter); + Row row = + createRow(dependentCounter, reusableJdbcReadable, metadata, adapter); return adapter.publishRows(resultSet, jdbcReadable -> { reusableJdbcReadable.current = jdbcReadable; - return mappingFunction.apply(new RowSegmentImpl(row)); + return segmentMapper.apply(segmentType.cast(new RowSegmentImpl(row))); }); } @@ -520,40 +499,57 @@ private static final class GeneratedKeysResult extends OracleResultImpl { private final OracleResultImpl generatedKeysResult; private GeneratedKeysResult( - long updateCount, ResultSet generatedKeys, ReactiveJdbcAdapter adapter) { + long updateCount, DependentCounter dependentCounter, + ResultSet generatedKeys, ReactiveJdbcAdapter adapter) { updateCountResult = createUpdateCountResult(updateCount); - generatedKeysResult = createQueryResult(generatedKeys, adapter); + generatedKeysResult = + createQueryResult(dependentCounter, generatedKeys, adapter); } @Override - Publisher publishSegments(Function mappingFunction) { - return Flux.from(updateCountResult.publishSegments(mappingFunction)) - .concatWith(generatedKeysResult.publishSegments(mappingFunction)); + void addDependent() { + generatedKeysResult.addDependent(); + } + + @Override + protected Publisher mapSegments( + Class segmentType, Function segmentMapper) { + return Flux.concat( + updateCountResult.mapSegments(segmentType, segmentMapper), + generatedKeysResult.mapSegments(segmentType, segmentMapper)); } } /** - * {@link OracleResultImpl} subclass that publishes an single instance of + * {@link OracleResultImpl} subclass that publishes a single instance of * {@link OutParameters} as an {@link OutSegment}. 
*/ - private static final class CallResult extends OracleResultImpl { + private static final class CallResult extends DependentResult { private final OutParameters outParameters; private final ReactiveJdbcAdapter adapter; private CallResult( - OutParameters outParameters, ReactiveJdbcAdapter adapter) { + DependentCounter dependentCounter, OutParameters outParameters, + ReactiveJdbcAdapter adapter) { + super(dependentCounter); this.outParameters = outParameters; this.adapter = adapter; } @Override - Publisher publishSegments(Function mappingFunction) { + protected Publisher mapDependentSegments( + Class segmentType, Function segmentMapper) { + + if (!segmentType.isAssignableFrom(OutSegmentImpl.class)) + return Mono.empty(); + // Acquire the JDBC lock asynchronously as the outParameters are backed // by a JDBC CallableStatement, and it may block a thread when values // are accessed with CallableStatement.getObject(...) return adapter.getLock().get(() -> - mappingFunction.apply(new OutSegmentImpl(outParameters))); + segmentMapper.apply(segmentType.cast( + new OutSegmentImpl(outParameters)))); } } @@ -570,10 +566,17 @@ private BatchUpdateResult(long[] updateCounts) { } @Override - Publisher publishSegments(Function mappingFunction) { - return Flux.fromStream(LongStream.of(updateCounts) - .mapToObj(UpdateCountImpl::new)) - .map(mappingFunction); + protected Publisher mapSegments( + Class segmentType, Function segmentMapper) { + + if (!segmentType.isAssignableFrom(UpdateCountImpl.class)) + return Mono.empty(); + + return Flux.fromStream( + LongStream.of(updateCounts) + .mapToObj(updateCount -> + segmentMapper.apply(segmentType.cast( + new UpdateCountImpl(updateCount))))); } } @@ -589,8 +592,7 @@ private static final class BatchUpdateErrorResult extends OracleResultImpl { private final BatchUpdateResult batchUpdateResult; private final ErrorResult errorResult; - private BatchUpdateErrorResult( - BatchUpdateException batchUpdateException) { + private BatchUpdateErrorResult(BatchUpdateException batchUpdateException) { batchUpdateResult = new BatchUpdateResult( batchUpdateException.getLargeUpdateCounts()); errorResult = @@ -598,11 +600,13 @@ private BatchUpdateErrorResult( } @Override - Publisher publishSegments(Function mappingFunction) { + protected Publisher mapSegments( + Class segmentType, Function segmentMapper) { return Flux.concat( - batchUpdateResult.publishSegments(mappingFunction), - errorResult.publishSegments(mappingFunction)); + batchUpdateResult.mapSegments(segmentType, segmentMapper), + errorResult.mapSegments(segmentType, segmentMapper)); } + } /** @@ -617,10 +621,38 @@ private ErrorResult(R2dbcException r2dbcException) { this.r2dbcException = r2dbcException; } + /** + * {@inheritDoc} + *
<p>
+ * Emits the mapping of a message segment, or emits an error if another
+ * segment type is specified. Unlike other segment types, message segments
+ * represent an error that must be delivered to user code. Even when user
+ * code is calling for some other segment type, like rows with
+ * {@link #map(BiFunction)}, or update counts with
+ * {@link #getRowsUpdated()}, user code does not want these calls to ignore
+ * errors. If user code really does want to ignore errors, it may call
+ * {@link #filter(Predicate)} to ignore message segments, or
+ * {@link #flatMap(Function)} to recover from message segments.
+ *
</p><p>
+ * This method uses Mono's fromSupplier factory to defer segment mapping
+ * until the publisher is subscribed to. This ensures that segments are
+ * consumed in the correct order when the returned publisher is concatenated
+ * after another, as with
+ * {@link BatchUpdateErrorResult#mapSegments(Class, Function)}, for
+ * instance. Additionally, the factory handles any exception thrown by the
+ * segment mapper by translating it into an onError signal.
+ *
</p>
+ */ @Override - Publisher publishSegments(Function mappingFunction) { - return Mono.just(new MessageImpl(r2dbcException)) - .map(mappingFunction); + protected Publisher mapSegments( + Class segmentType, Function segmentMapper) { + + if (! segmentType.isAssignableFrom(MessageImpl.class)) + return Mono.error(r2dbcException); + + return Mono.fromSupplier(() -> + segmentMapper.apply(segmentType.cast( + new MessageImpl(r2dbcException)))); } } @@ -655,20 +687,37 @@ private WarningResult( } @Override - Publisher publishSegments(Function mappingFunction) { - return Flux.fromStream(Stream.iterate( - warning, Objects::nonNull, SQLWarning::getNextWarning) - .map(nextWarning -> - // It is noted that SQL can not be extracted from Oracle JDBC's - // SQLWarning objects, so it must be explicitly provided here. - OracleR2dbcExceptions.toR2dbcException(warning, sql)) - .map(WarningImpl::new)) - .map(mappingFunction) - // Invoke publishSegments(Class, Function) rather than - // publishSegments(Function) to update the state of the result; Namely, - // the state that has the onConsumed Publisher emit a terminal signal. - .concatWith(result != null - ? result.publishSegments(Segment.class,mappingFunction) + void addDependent() { + result.addDependent(); + } + + /** + * @implNote In the 1.0.0 release, message segments for the warning were + * emitted prior to any segments from the {@link #result}. Unless message + * segments were consumed by {@link #flatMap(Function)}, the publisher + * returned to user code would emit onError before emitting values from + * the {@link #result}. + * Revisiting this decision before the 1.1.0 release, it really seems like a + * bad one. It is thought that user code would typically want to consume + * results before handling warnings and errors, and so the order is reversed + * in later releases. Segments are now emitted from the {@link #result} + * first, followed by the message segments. This change in behavior should + * be safe, as the R2DBC SPI does not specify any ordering for this case. + */ + @Override + protected Publisher mapSegments( + Class segmentType, Function segmentMapper) { + return Flux.concat( + result != null + ? result.mapSegments(segmentType, segmentMapper) + : Mono.empty(), + segmentType.isAssignableFrom(WarningImpl.class) + ? Flux.fromStream(Stream.iterate( + warning, Objects::nonNull, SQLWarning::getNextWarning) + .map(nextWarning -> + segmentMapper.apply(segmentType.cast( + new WarningImpl(toR2dbcException(nextWarning, sql)))))) + // Do not emit warnings unless flatMap or filter will consume them : Mono.empty()); } } @@ -679,6 +728,13 @@ Publisher publishSegments(Function mappingFunction) { */ private static final class FilteredResult extends OracleResultImpl { + /** + * An object that represents a filtered {@code Segment}. This object is + * output by a segment mapping function defined in + * {@link #mapSegments(Class, Function)}. + */ + private static final Object FILTERED = new Object(); + /** Result of segments to publish after applying the {@link #filter} */ private final OracleResultImpl result; @@ -687,21 +743,72 @@ private static final class FilteredResult extends OracleResultImpl { /** * Constructs a new result that applies a {@code filter} when publishing - * segments of a {@code result} + * segments of a {@code result}. 
*/ - private FilteredResult( - OracleResultImpl result, Predicate filter) { + private FilteredResult(OracleResultImpl result, Predicate filter) { this.result = result; this.filter = filter; } @Override - @SuppressWarnings("unchecked") - Publisher publishSegments(Function mappingFunction) { - return result.publishSegments(Segment.class, segment -> - filter.test(segment) - ? mappingFunction.apply(segment) - : (T)FILTERED); + void addDependent() { + result.addDependent(); + } + + /** + * {@inheritDoc} + *
<p>
+ * Passes {@code Segment.class} to the {@code mapSegments} method of the + * filtered {@link #result} to map all segments with the filtering + * predicate. Mapping functions must return a non-null value, so it will + * return a dummy object, {@link #FILTERED}, for segments that are filtered + * by the predicate. The {@code FILTERED} object is then filtered by a + * downstream filter operator. + *
</p><p>
+ * It is important that {@code Segment.class} be passed to the + * {@code mapSegments} method of the filtered result, otherwise + * {@code Message} segments would be emitted with {@code onError} and bypass + * the filtering function. If the filter does not exclude a message segment, + * and the {@code segmentMapper} does not accept message segments, only then + * will the exception of the message segment be emitted with onError. + *
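In practice, this is the path taken when user code filters warnings out before consuming a result; a sketch using the public API:
```java
import io.r2dbc.spi.Result;
import oracle.r2dbc.OracleR2dbcWarning;
import org.reactivestreams.Publisher;

class FilterExample {

  // Drops warning segments so that they are neither mapped nor raised as
  // errors; all other segments still reach getRowsUpdated()
  static Publisher<Long> ignoreWarnings(Result result) {
    return result
      .filter(segment -> !(segment instanceof OracleR2dbcWarning))
      .getRowsUpdated();
  }
}
```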
</p>
+ */ + @Override + protected Publisher mapSegments( + Class segmentType, Function segmentMapper) { + + @SuppressWarnings("unchecked") + U filtered = (U)FILTERED; + + return Flux.from(result.mapSegments( + Segment.class, segment -> { + if (!filter.test(segment)) + return filtered; + + if (segmentType.isAssignableFrom(segment.getClass())) + return segmentMapper.apply(segmentType.cast(segment)); + else if (segment instanceof Message) + throw ((Message)segment).exception(); + else + return filtered; + })) + .filter(next -> next != FILTERED); + } + + /** + * {@inheritDoc} + *
<p>
+ * Overridden to also set the filtered result as published. If this method
+ * is called, then a method of the public API has been called to return
+ * a publisher from this result. If user code somehow has a reference to the
+ * filtered result as well, then the filtered result should also throw
+ * {@code IllegalStateException} if one of its public methods is invoked.
+ *
</p>
+ */ + @Override + protected void setPublished() { + result.setPublished(); + super.setPublished(); } } @@ -766,6 +873,84 @@ public Readable getReadable() { } } + + /** + * A base class for results that depend on a JDBC statement to remain open + * until the result is consumed. This base class handles interactions with + * a {@link DependentCounter} object representing a collection of results + * that depend on a JDBC statement. Subclasses implement + * {@link #mapDependentSegments(Class, Function)} following the same + * specification as {@link #mapSegments(Class, Function)}. + */ + private static abstract class DependentResult extends OracleResultImpl { + + /** + * A collection of results that depend on the JDBC statement which created + * this result to remain open until all results are consumed. + */ + protected final DependentCounter dependentCounter; + + /** + * Constructs a new result that registers and deregisters itself with a + * collection of dependent results. + * @param dependentCounter Collection of dependent results. Not null. + */ + private DependentResult(DependentCounter dependentCounter) { + this.dependentCounter = dependentCounter; + } + + /** + * {@inheritDoc} + *
<p>
+ * Adds this result to the collection of dependent results. + *
</p>
+ */ + @Override + void addDependent() { + dependentCounter.increment(); + } + + /** + * {@inheritDoc} + *
<p>
+ * Delegates to the {@code mapDependentSegments(Class, Function)} method of + * a subclass to perform actual segment mapping. This method ensures that + * this result is removed from the collection of dependent results when the + * segment mapping publisher terminates with {@code onComplete}, + * {@code onError}, or {@code cancel}. + *
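The termination handling described here follows a general Reactor pattern; a standalone sketch, not the driver's actual code:
```java
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class CancelSafeCleanup {

  // Runs a cleanup publisher after the source terminates: concatDelayError
  // subscribes to cleanup after onComplete or onError (delaying the error),
  // and doOnCancel covers the cancellation path
  static <T> Flux<T> withCleanup(
      Publisher<T> source, Publisher<? extends T> cleanup) {
    return Flux.concatDelayError(source, cleanup)
      .doOnCancel(() -> Mono.from(cleanup).subscribe());
  }
}
```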
</p>
+ */ + @Override + protected final Publisher mapSegments( + Class segmentType, Function segmentMapper) { + + @SuppressWarnings("unchecked") + Publisher removeDependent = (Publisher) dependentCounter.decrement(); + + return Flux.concatDelayError( + mapDependentSegments(segmentType, segmentMapper), + removeDependent) + .doOnCancel(() -> + Mono.from(removeDependent).subscribe()); + } + + /** + * Maps segments exactly as specified by + * {@link #mapSegments(Class, Function)}. This method is called from the + * base class implementation of {@link #mapSegments(Class, Function)}. The + * base class implementation ensures that this result is removed from the + * collection of dependents when the segment mapping publisher is + * terminated. + * @param segmentType Class of {@code Segment} to map. Not null. + * @param segmentMapper Maps segments to published values. + * @return A publisher that emits the mapped values. + * @param Segment type to map + * @param Type of mapped value + */ + protected abstract Publisher mapDependentSegments( + Class segmentType, Function segmentMapper); + } + /** * Implementation of {@link UpdateCount}. */ diff --git a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java index 8b76052..2555ae1 100755 --- a/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java +++ b/src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java @@ -45,7 +45,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.NoSuchElementException; @@ -53,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.IntStream; @@ -295,7 +295,7 @@ public Statement bind(int index, Object value) { * * @throws IllegalArgumentException {@inheritDoc} * @throws IllegalArgumentException If the {@code identifier} does match a - * case sensitive parameter name that appears in this {@code Statement's} + * case-sensitive parameter name that appears in this {@code Statement's} * SQL command. * @throws IllegalArgumentException If the JDBC PreparedStatement does not * support conversions of the bind value's Java type into a SQL type. @@ -360,7 +360,7 @@ public Statement bindNull(int index, Class type) { * * @throws IllegalArgumentException {@inheritDoc} * @throws IllegalArgumentException If the {@code identifier} does match a - * case sensitive parameter name that appears in this {@code Statement's} + * case-sensitive parameter name that appears in this {@code Statement's} * SQL command. */ @Override @@ -736,10 +736,8 @@ private void bindParameter(int index, Parameter parameter) { requireNonNull(parameter.getType(), "Parameter type is null"); SQLType jdbcType = toJdbcType(r2dbcType); - if (jdbcType == null) { - throw new IllegalArgumentException( - "Unsupported SQL type: " + r2dbcType); - } + if (jdbcType == null) + throw new IllegalArgumentException("Unsupported SQL type: " + r2dbcType); requireSupportedJavaType(parameter.getValue()); bindValues[index] = parameter; @@ -749,7 +747,7 @@ private void bindParameter(int index, Parameter parameter) { * Checks that the specified 0-based {@code index} is within the range of * valid parameter indexes for this statement. 
* @param index A 0-based parameter index - * @throws IndexOutOfBoundsException If the {@code index} is outside of the + * @throws IndexOutOfBoundsException If the {@code index} is not within the * valid range. */ private void requireValidIndex(int index) { @@ -840,8 +838,7 @@ private static IllegalStateException parameterNotSet() { } /** - * Checks that the class type of an {@code object} is supported as a bind - * value. + * Checks that the class of an {@code object} is supported as a bind value. * @param object Object to check. May be null. * @throws IllegalArgumentException If the class type of {@code object} is not * supported @@ -910,6 +907,14 @@ private class JdbcStatement { /** The {@code PreparedStatement} that is executed */ protected final PreparedStatement preparedStatement; + /** + * Collection of results that depend on the JDBC statement to remain open + * until they are consumed. For instance, a result that retains a + * {@code ResultSet} would depend on the JDBC statement to remain open, as + * the {@code ResultSet} is closed when the JDBC statement is closed. + */ + protected final DependentCounter dependentCounter; + /** The bind values that are set on the {@link #preparedStatement} */ protected final Object[] binds; @@ -928,6 +933,15 @@ private class JdbcStatement { private JdbcStatement(PreparedStatement preparedStatement, Object[] binds) { this.preparedStatement = preparedStatement; this.binds = binds; + + // Add this statement as a "party" (think j.u.c.Phaser) to the dependent + // results by calling increment(). After the Result publisher returned by + // execute() terminates, this statement "arrives" by calling decrement(). + // Calling decrement() after the Result publisher terminates ensures that + // the JDBC statement can not be closed until all results have had a + // chance to be emitted to user code. + dependentCounter = new DependentCounter(closeStatement()); + dependentCounter.increment(); } /** @@ -945,21 +959,32 @@ private JdbcStatement(PreparedStatement preparedStatement, Object[] binds) { * emitted from the {@code bind} or {@code getResults} publishers, or if * {@link PreparedStatement#getWarnings()} yields a warning. *
</p><p>
- * After all {@code Results} have been consumed, the
- * {@link #preparedStatement} is closed.
+ * The {@link #preparedStatement} can only be closed after all results that
+ * depend on it have been consumed by user code. It is not guaranteed that
+ * every result created by this statement will actually reach user code; a
+ * cancel signal may occur at any time. Upon cancellation, no more signals
+ * are emitted downstream. For this reason, the
+ * {@link OracleResultImpl#addDependent()} method must be called only when
+ * it is certain that a result will reach the downstream subscriber. This
+ * certainty is offered by the {@link Flux#doOnNext(Consumer)} operator.
 *
</p>
- * @return A publisher that emits the result of executing this statement + * @return A publisher that emits the result of executing this statement. + * Not null. */ final Publisher execute() { - return Flux.usingWhen(Mono.just(new ArrayList<>(1)), - results -> - Mono.from(bind()) - .thenMany(executeJdbc()) - .map(this::getWarnings) - .doOnNext(results::add) - .onErrorResume(R2dbcException.class, r2dbcException -> - Mono.just(createErrorResult(r2dbcException))), - this::deallocate); + + Mono deallocate = + Mono.from(deallocate()).cast(OracleResultImpl.class); + + return Flux.concatDelayError( + Mono.from(bind()) + .thenMany(executeJdbc()) + .map(this::getWarnings) + .onErrorResume(R2dbcException.class, r2dbcException -> + Mono.just(createErrorResult(r2dbcException))) + .doOnNext(OracleResultImpl::addDependent), + deallocate) + .doOnCancel(deallocate::subscribe); } /** @@ -1104,7 +1129,7 @@ private OracleResultImpl getCurrentResult(boolean isResultSet) { return fromJdbc(() -> { if (isResultSet) { return createQueryResult( - preparedStatement.getResultSet(), adapter); + dependentCounter, preparedStatement.getResultSet(), adapter); } else { long updateCount = preparedStatement.getLargeUpdateCount(); @@ -1143,71 +1168,43 @@ private OracleResultImpl getWarnings(OracleResultImpl result) { * made to deallocate any remaining resources before emitting the error. *
</p><p>
* The returned publisher subscribes to the {@link #deallocators} - * publisher, and may close the {@link #preparedStatement} if all {@code - * results} have been consumed when this method is called. - *
</p><p>
- * If one or more {@code results} have yet to be consumed, then this method
- * arranges for the {@link #preparedStatement} to be closed after all
- * results have been consumed. A result may be backed by a
- * {@link java.sql.ResultSet} or by {@link CallableStatement}, so the
- * {@link #preparedStatement} must remain open until all results have
- * been consumed.
+ * publisher, and may close the {@link #preparedStatement} if all results
+ * have already been consumed when this method is called. This method
+ * calls the {@code decrement()} method of {@link #dependentCounter}, in
+ * balance with the {@code increment()} call that occurs in the constructor
+ * of this statement.
 *
</p>
- * @param results Results that must be consumed before closing the - * {@link #preparedStatement} * @return A publisher that completes when all resources have been * deallocated */ - private Publisher deallocate(Collection results) { - - // Set up a counter that is decremented as each result is consumed. - AtomicInteger unconsumed = new AtomicInteger(results.size()); - - // Set up a publisher that decrements the counter, and closes the - // statement when it reaches zero - Publisher closeStatement = adapter.getLock().run(() -> { - if (unconsumed.decrementAndGet() == 0) - closeStatement(); - }); - - // Tell each unconsumed result to decrement the unconsumed count, and then - // close the statement when the count reaches zero. - for (OracleResultImpl result : results) { - if (!result.onConsumed(closeStatement)) - unconsumed.decrementAndGet(); - } - - // If there are no results, or all results have already been consumed, - // then the returned publisher closes the statement. - if (unconsumed.get() == 0) - addDeallocation(adapter.getLock().run(this::closeStatement)); - + private Publisher deallocate() { + addDeallocation(dependentCounter.decrement()); return deallocators; } /** - * Closes the JDBC {@link #preparedStatement}. This method should only be - * called while holding the - * {@linkplain ReactiveJdbcAdapter#getLock() connection lock} - * @throws SQLException If the statement fails to close. + * @return A publisher that closes the JDBC {@link #preparedStatement} when + * subscribed to. Not null. */ - private void closeStatement() throws SQLException { - try { - // Workaround Oracle JDBC bug #34545179: ResultSet references are - // retained even when the statement is closed. Calling getMoreResults - // with the CLOSE_ALL_RESULTS argument forces the driver to - // de-reference them. - preparedStatement.getMoreResults(CLOSE_ALL_RESULTS); - } - catch (SQLException sqlException) { - // It may be the case that the JDBC connection was closed, and so the - // statement was closed with it. Check for this, and ignore the - // SQLException if so. - if (!jdbcConnection.isClosed()) - throw sqlException; - } + private Publisher closeStatement() { + return adapter.getLock().run(() -> { + try { + // Workaround Oracle JDBC bug #34545179: ResultSet references are + // retained even when the statement is closed. Calling getMoreResults + // with the CLOSE_ALL_RESULTS argument forces the driver to + // de-reference them. + preparedStatement.getMoreResults(CLOSE_ALL_RESULTS); + } + catch (SQLException sqlException) { + // It may be the case that the JDBC connection was closed, and so the + // statement was closed with it. Check for this, and ignore the + // SQLException if so. + if (!jdbcConnection.isClosed()) + throw sqlException; + } - preparedStatement.close(); + preparedStatement.close(); + }); } /** @@ -1323,7 +1320,7 @@ private Publisher convertClobBind( /** * Converts a ByteBuffer to a byte array. The {@code byteBuffer} contents, - * delimited by it's position and limit, are copied into the returned byte + * delimited by its position and limit, are copied into the returned byte * array. No state of the {@code byteBuffer} is mutated, including it's * position, limit, or mark. * @param byteBuffer A ByteBuffer. Not null. Not retained. 
@@ -1420,7 +1417,10 @@ protected Publisher<OracleResultImpl> executeJdbc() {
       return Flux.concat(
         super.executeJdbc(),
         Mono.just(createCallResult(
-          createOutParameters(new JdbcOutParameters(), metadata, adapter),
+          dependentCounter,
+          createOutParameters(
+            dependentCounter,
+            new JdbcOutParameters(), metadata, adapter),
           adapter)));
     }

@@ -1486,14 +1486,13 @@ private JdbcBatch(
      */
     @Override
     protected Publisher<Void> bind() {
-      @SuppressWarnings({"unchecked"})
-      Publisher<Void>[] bindPublishers = new Publisher[batchSize];
+      Publisher<?>[] bindPublishers = new Publisher[batchSize];

       for (int i = 0; i < batchSize; i++) {
         bindPublishers[i] = Flux.concat(
           bind(batch.remove()),
           adapter.getLock().run(preparedStatement::addBatch));
       }

-      return Flux.concat(bindPublishers);
+      return Flux.concat(bindPublishers).cast(Void.class);
     }

     /**
@@ -1605,8 +1604,8 @@ protected Publisher<OracleResultImpl> executeJdbc() {

       if (generatedKeys.isBeforeFirst()) {
         return Mono.just(createGeneratedValuesResult(
-            preparedStatement.getLargeUpdateCount(), generatedKeys,
-            adapter))
+            preparedStatement.getLargeUpdateCount(),
+            dependentCounter, generatedKeys, adapter))
           .concatWith(super.getResults(
             preparedStatement.getMoreResults(KEEP_CURRENT_RESULT)));
       }
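On the bind() change above: with a wildcard array, Flux.concat can only be typed as Flux&lt;Object&gt;, so the explicit cast(Void.class) restores the declared Publisher&lt;Void&gt; return type without the unchecked-warning suppression that a generic array creation required. A standalone illustration of the pattern, using illustrative publishers only:

```java
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CastExample {
  public static void main(String[] args) {
    // A wildcard array avoids the unchecked warning that a generic array
    // creation, such as new Publisher<Void>[2], would require suppressing.
    Publisher<?>[] publishers = new Publisher[2];
    publishers[0] = Mono.empty();
    publishers[1] = Mono.empty();

    // Flux.concat(Publisher<?>...) can only be typed as Flux<Object> here;
    // cast(Void.class) narrows it back to Flux<Void>. The cast is safe when
    // every source publisher completes without emitting a value.
    Flux<Void> concatenated = Flux.concat(publishers).cast(Void.class);
    concatenated.blockLast();
  }
}
```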
diff --git a/src/main/java/oracle/r2dbc/impl/SqlTypeMap.java b/src/main/java/oracle/r2dbc/impl/SqlTypeMap.java
index 231c1fd..b2ad059 100644
--- a/src/main/java/oracle/r2dbc/impl/SqlTypeMap.java
+++ b/src/main/java/oracle/r2dbc/impl/SqlTypeMap.java
@@ -98,7 +98,8 @@ final class SqlTypeMap {
       R2dbcType.TIMESTAMP_WITH_TIME_ZONE),
     entry(JDBCType.TINYINT, R2dbcType.TINYINT),
     entry(JDBCType.VARBINARY, R2dbcType.VARBINARY),
-    entry(JDBCType.VARCHAR, R2dbcType.VARCHAR)
+    entry(JDBCType.VARCHAR, R2dbcType.VARCHAR),
+    entry(JDBCType.REF_CURSOR, OracleR2dbcTypes.REF_CURSOR)
   );

   /**
diff --git a/src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java b/src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java
index 4be45b1..a8c1d11 100644
--- a/src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java
+++ b/src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java
@@ -47,11 +47,10 @@ import static oracle.r2dbc.util.Awaits.tryAwaitExecution;
 import static oracle.r2dbc.util.Awaits.tryAwaitNone;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;

 /**
  * Verifies the Oracle R2DBC Driver implements behavior related to {@link Blob}
- * and {@link Clob} types that is specified in it's class and method level
+ * and {@link Clob} types that is specified in its class and method level
  * javadocs, in the javadocs of {@code Blob} and {@code Clob}, and in Section
  * 12 of the R2DBC 0.8.2 Specification.
  */
@@ -353,7 +352,7 @@ public void testClobBatchInsert() {

       // Expect row.get(int/String) to use Clob as the default Java type
       // mapping for CLOB type columns.
-      List<List<Clob>> Clobs = awaitMany(Flux.from(connection.createStatement(
+      List<List<Clob>> clobs = awaitMany(Flux.from(connection.createStatement(
         "SELECT x,y FROM testClobInsert ORDER BY id")
         .execute())
         .flatMap(result ->
           result.map((row, metadata) ->
@@ -361,10 +360,10 @@ public void testClobBatchInsert() {

       // Expect bytes written to INSERTed Clobs to match the bytes read from
       // SELECTed Clobs
-      awaitBytes(xBytes0, Clobs.get(0).get(0));
-      awaitBytes(yBytes0, Clobs.get(0).get(1));
-      awaitBytes(xBytes1, Clobs.get(1).get(0));
-      awaitBytes(yBytes1, Clobs.get(1).get(1));
+      awaitBytes(xBytes0, clobs.get(0).get(0));
+      awaitBytes(yBytes0, clobs.get(0).get(1));
+      awaitBytes(xBytes1, clobs.get(1).get(0));
+      awaitBytes(yBytes1, clobs.get(1).get(1));
     }
     finally {
diff --git a/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java
index e5727cf..1498825 100644
--- a/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java
+++ b/src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java
@@ -650,9 +650,16 @@ public void testOracleR2dbcWarning() {
       awaitMany(Flux.from(warningStatement.execute())
         .flatMap(result ->
           result.flatMap(Mono::just)));

-    // Expect the warning segment first. Expect it to have the fixed message
-    // and error number used by Oracle JDBC for all warnings.
-    Result.Segment secondSegment = segments.get(0);
+    // Expect the update count segment first. Warnings are always emitted
+    // last.
+    Result.Segment firstSegment = segments.get(0);
+    assertEquals(0,
+      assertInstanceOf(UpdateCount.class, firstSegment).value());
+    assertFalse(firstSegment instanceof OracleR2dbcWarning);
+
+    // Expect the warning segment after the update count. Expect it to have
+    // the fixed message and error number used by Oracle JDBC for all warnings.
+    Result.Segment secondSegment = segments.get(1);
     OracleR2dbcWarning warning =
       assertInstanceOf(OracleR2dbcWarning.class, secondSegment);
     assertEquals(
@@ -666,14 +673,6 @@ public void testOracleR2dbcWarning() {
     assertEquals(warning.sqlState(), exception.getSqlState());
     assertEquals(sql, exception.getSql());

-
-    // Expect the update count segment last. Warnings are always emitted
-    // first.
-    Result.Segment firstSegment = segments.get(1);
-    assertEquals(0,
-      assertInstanceOf(UpdateCount.class, firstSegment).value());
-    assertFalse(firstSegment instanceof OracleR2dbcWarning);
-
     // Verify that there are not any more segments
     assertEquals(2, segments.size());
   }
@@ -727,19 +726,25 @@ public void testOracleR2dbcWarningIgnored() {
   public void testOracleR2dbcWarningNotIgnored() {
     Connection connection = awaitOne(sharedConnection());
     try {
-
       // Expect a warning for forcing a view that references a non-existent
       // table
       String sql =
         "CREATE OR REPLACE FORCE VIEW testOracleR2dbcWarningIgnored AS"
           + " SELECT x FROM thisdoesnotexist";
       Statement warningStatement = connection.createStatement(sql);
+      AtomicInteger segmentIndex = new AtomicInteger(0);
       awaitError(
         R2dbcException.class,
         awaitOne(warningStatement.execute())
           .flatMap(segment ->
-            Mono.error(
-              assertInstanceOf(OracleR2dbcWarning.class, segment).exception())));
+            // Expect the update count first, followed by the warning
+            segmentIndex.getAndIncrement() == 0
+              ? Mono.just(assertInstanceOf(UpdateCount.class, segment)
+                .value())
+              : Mono.error(assertInstanceOf(OracleR2dbcWarning.class, segment)
+                .exception())));
+      // Expect only two segments
+      assertEquals(2, segmentIndex.get());
     }
     finally {
       tryAwaitExecution(
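The reordered assertions above reflect a behavior change: a warning segment is now emitted after the update count rather than before it. A sketch of consuming segments under the new ordering, assuming only the segment types and accessors that appear in these tests:

```java
import io.r2dbc.spi.Result;
import oracle.r2dbc.OracleR2dbcWarning;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

public class SegmentOrderExample {

  /**
   * Consumes the segments of a DDL result that may carry a warning. Under
   * the ordering verified above, the update count arrives before any
   * warning segment, so the count is observed even when warnings are only
   * logged.
   */
  static Publisher<Long> consumeSegments(Result result) {
    return Flux.from(result.flatMap(segment -> {
      if (segment instanceof Result.UpdateCount) {
        return Flux.just(((Result.UpdateCount) segment).value());
      }
      else if (segment instanceof OracleR2dbcWarning) {
        // The update count has already been emitted when this branch runs
        System.err.println("Warning: "
          + ((OracleR2dbcWarning) segment).exception().getMessage());
        return Flux.empty();
      }
      else {
        return Flux.error(new IllegalStateException(
          "Unexpected segment: " + segment));
      }
    }));
  }
}
```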
diff --git a/src/test/java/oracle/r2dbc/impl/OracleRowMetadataImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleRowMetadataImplTest.java
index 06211a4..8709fc6 100644
--- a/src/test/java/oracle/r2dbc/impl/OracleRowMetadataImplTest.java
+++ b/src/test/java/oracle/r2dbc/impl/OracleRowMetadataImplTest.java
@@ -23,19 +23,19 @@
 import io.r2dbc.spi.ColumnMetadata;
 import io.r2dbc.spi.Connection;
+import io.r2dbc.spi.Result;
 import io.r2dbc.spi.RowMetadata;
+import io.r2dbc.spi.Statement;
 import org.junit.jupiter.api.Test;
+import org.reactivestreams.Publisher;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;

 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
-import java.util.function.IntFunction;
+import java.util.function.Function;

 import static java.util.Arrays.asList;
 import static oracle.r2dbc.test.DatabaseConfig.connectTimeout;
@@ -45,7 +45,6 @@
 import static oracle.r2dbc.util.Awaits.awaitNone;
 import static oracle.r2dbc.util.Awaits.awaitOne;
 import static oracle.r2dbc.util.Awaits.awaitUpdate;
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -53,7 +52,7 @@

 /**
  * Verifies that
- * {@link ReadablesMetadata} implements behavior that is specified in it's
+ * {@link ReadablesMetadata} implements behavior that is specified in its
  * class and method level javadocs.
  */
 public class OracleRowMetadataImplTest {
@@ -80,23 +79,21 @@ public void testGetColumnMetadataByIndex() {
         "INSERT INTO testGetColumnMetadataByIndex (x,y) VALUES (0,0)"));

       // Expect IllegalArgumentException for an index less than 0
-      awaitError(IndexOutOfBoundsException.class,
-        Flux.from(connection.createStatement(
-          "SELECT x, y FROM testGetColumnMetadataByIndex")
-          .execute())
-          .concatMap(result ->
-            result.map((row, metadata) ->
-              metadata.getColumnMetadata(-1).getPrecision())));
+      verifyError(IndexOutOfBoundsException.class,
+        connection.createStatement(
+          "SELECT x, y FROM testGetColumnMetadataByIndex"),
+        result ->
+          result.map((row, metadata) ->
+            metadata.getColumnMetadata(-1).getPrecision()));

       // Expect IllegalArgumentException for an index greater than or equal
       // to the number of columns
-      awaitError(IndexOutOfBoundsException.class,
-        Flux.from(connection.createStatement(
-          "SELECT x, y FROM testGetColumnMetadataByIndex")
-          .execute())
-          .concatMap(result ->
-            result.map((row, metadata) ->
-              metadata.getColumnMetadata(2).getPrecision())));
+      verifyError(IndexOutOfBoundsException.class,
+        connection.createStatement(
+          "SELECT x, y FROM testGetColumnMetadataByIndex"),
+        result ->
+          result.map((row, metadata) ->
+            metadata.getColumnMetadata(2).getPrecision()));

       // Expect valid indexes to return the column metadata
       awaitOne(asList(xPrecision, yPrecision),
@@ -161,57 +158,43 @@ public void testGetColumnMetadataByName() {
         "INSERT INTO testGetColumnMetadataByName (x,y) VALUES (0,0)"));

       // Expect IllegalArgumentException for a null name
-      awaitError(IllegalArgumentException.class,
-        Flux.from(connection.createStatement(
-          "SELECT x, y FROM testGetColumnMetadataByName")
-          .execute())
-          .concatMap(result ->
-            result.map((row, metadata) -> metadata.getColumnMetadata(null))
-          ));
+      verifyError(IllegalArgumentException.class,
+        connection.createStatement(
+          "SELECT x, y FROM testGetColumnMetadataByName"),
+        result ->
+          result.map((row, metadata) -> metadata.getColumnMetadata(null)));

       // Expect NoSuchElementException for unmatched names
-      awaitError(NoSuchElementException.class,
-        Flux.from(connection.createStatement(
-          "SELECT x, y FROM testGetColumnMetadataByName")
-          .execute())
-          .concatMap(result ->
-            result.map((row, metadata) -> metadata.getColumnMetadata("z"))
-          ));
-      awaitError(NoSuchElementException.class,
-        Flux.from(connection.createStatement(
-          "SELECT x, y FROM testGetColumnMetadataByName")
-          .execute())
-          .concatMap(result ->
-            result.map((row, metadata) -> metadata.getColumnMetadata("xx"))
-          ));
-      awaitError(NoSuchElementException.class,
-        Flux.from(connection.createStatement(
-          "SELECT x, y FROM testGetColumnMetadataByName")
-          .execute())
-          .concatMap(result ->
-            result.map((row, metadata) -> metadata.getColumnMetadata("x "))
-          ));
-      awaitError(NoSuchElementException.class,
-        Flux.from(connection.createStatement(
-          "SELECT x, y FROM testGetColumnMetadataByName")
-          .execute())
-          .concatMap(result ->
-            result.map((row, metadata) -> metadata.getColumnMetadata(" x"))
-          ));
-      awaitError(NoSuchElementException.class,
-        Flux.from(connection.createStatement(
-          "SELECT x, y FROM testGetColumnMetadataByName")
-          .execute())
-          .concatMap(result ->
-            result.map((row, metadata) -> metadata.getColumnMetadata(" "))
-          ));
-      awaitError(NoSuchElementException.class,
-        Flux.from(connection.createStatement(
-          "SELECT x, y FROM testGetColumnMetadataByName")
-          .execute())
-          .concatMap(result ->
-            result.map((row, metadata) -> metadata.getColumnMetadata(""))
-          ));
+      verifyError(NoSuchElementException.class,
+        connection.createStatement(
+          "SELECT x, y FROM testGetColumnMetadataByName"),
+        result ->
+          result.map((row, metadata) -> metadata.getColumnMetadata("z")));
+      verifyError(NoSuchElementException.class,
+        connection.createStatement(
+          "SELECT x, y FROM testGetColumnMetadataByName"),
+        result ->
+          result.map((row, metadata) -> metadata.getColumnMetadata("xx")));
+      verifyError(NoSuchElementException.class,
+        connection.createStatement(
+          "SELECT x, y FROM testGetColumnMetadataByName"),
+        result ->
+          result.map((row, metadata) -> metadata.getColumnMetadata("x ")));
+      verifyError(NoSuchElementException.class,
+        connection.createStatement(
+          "SELECT x, y FROM testGetColumnMetadataByName"),
+        result ->
+          result.map((row, metadata) -> metadata.getColumnMetadata(" x")));
+      verifyError(NoSuchElementException.class,
+        connection.createStatement(
+          "SELECT x, y FROM testGetColumnMetadataByName"),
+        result ->
+          result.map((row, metadata) -> metadata.getColumnMetadata(" ")));
+      verifyError(NoSuchElementException.class,
+        connection.createStatement(
+          "SELECT x, y FROM testGetColumnMetadataByName"),
+        result ->
+          result.map((row, metadata) -> metadata.getColumnMetadata("")));

       // Expect valid names to return the column metadata
       awaitOne(asList(xPrecision, yPrecision),
@@ -431,4 +414,17 @@ public void testGetColumnMetadatas() {
     }
   }

+  /**
+   * Verifies that an expected class of error is emitted when a mapping function
+   * is applied to the result of a statement.
+   */
+  private static void verifyError(
+    Class<? extends Throwable> errorClass, Statement statement,
+    Function<Result, Publisher<?>> resultMapper) {
+    awaitError(
+      errorClass,
+      Flux.from(statement.execute())
+        .concatMapDelayError(resultMapper));
+  }
+
 }
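The verifyError helper above consolidates the repeated awaitError blocks. Its use of concatMapDelayError, rather than concatMap, means a failure while mapping one result does not prevent the remaining inner publishers from being drained; the error surfaces only after they complete, presumably so the shared connection is not left with a half-consumed result. A self-contained demonstration of that operator's semantics, with illustrative values only:

```java
import reactor.core.publisher.Flux;

public class DelayErrorExample {
  public static void main(String[] args) {
    // Two inner publishers: the first fails, the second still produces
    // values. concatMapDelayError drains the remaining inners and emits
    // the failure only after they complete.
    Flux.just(
        Flux.<Integer>error(new IllegalStateException("first inner fails")),
        Flux.just(1, 2))
      .concatMapDelayError(inner -> inner)
      .subscribe(
        value -> System.out.println("Emitted: " + value),
        error -> System.out.println("Deferred error: " + error.getMessage()));
    // Prints: Emitted: 1, Emitted: 2, Deferred error: first inner fails
  }
}
```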
diff --git a/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java b/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java
index 509cbb4..7e198a0 100644
--- a/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java
+++ b/src/test/java/oracle/r2dbc/impl/OracleStatementImplTest.java
@@ -35,6 +35,7 @@
 import io.r2dbc.spi.Statement;
 import io.r2dbc.spi.Type;
 import oracle.r2dbc.OracleR2dbcOptions;
+import oracle.r2dbc.OracleR2dbcTypes;
 import oracle.r2dbc.OracleR2dbcWarning;
 import oracle.r2dbc.test.DatabaseConfig;
 import org.junit.jupiter.api.Test;
@@ -45,9 +46,12 @@

 import java.sql.RowId;
 import java.sql.SQLWarning;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -55,6 +59,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -68,6 +73,7 @@
 import static oracle.r2dbc.test.DatabaseConfig.port;
 import static oracle.r2dbc.test.DatabaseConfig.serviceName;
 import static oracle.r2dbc.test.DatabaseConfig.sharedConnection;
+import static oracle.r2dbc.test.DatabaseConfig.showErrors;
 import static oracle.r2dbc.test.DatabaseConfig.sqlTimeout;
 import static oracle.r2dbc.test.DatabaseConfig.user;
 import static oracle.r2dbc.util.Awaits.awaitError;
@@ -2022,16 +2028,18 @@ public void testWarningMessage() {
             result.flatMap(segment -> {
               int index = segmentCount.getAndIncrement();
               if (index == 0) {
-                assertTrue(segment instanceof OracleR2dbcWarning,
-                  "Unexpected Segment: " + segment);
-                return Mono.just(((OracleR2dbcWarning)segment).exception());
-              }
-              else if (index == 1) {
+                // Expect the first segment to be an update count
                 assertTrue(segment instanceof UpdateCount,
                   "Unexpected Segment: " + segment);
                 assertEquals(0, ((UpdateCount)segment).value());
                 return Mono.empty();
               }
+              else if (index == 1) {
+                // Expect the second segment to be a warning
+                assertTrue(segment instanceof OracleR2dbcWarning,
+                  "Unexpected Segment: " + segment);
+                return Mono.just(((OracleR2dbcWarning)segment).exception());
+              }
               else {
                 fail("Unexpected Segment: " + segment);
                 return Mono.error(new AssertionError("Should not reach here"));
@@ -2238,6 +2246,190 @@ public void testGetSql() {
     }
   }

+  /**
+   * Verifies that a SYS_REFCURSOR out parameter can be consumed as a
+   * {@link Result} object.
+   */
+  @Test
+  public void testRefCursorOut() {
+    Connection connection = awaitOne(sharedConnection());
+    try {
+      List<TestRow> rows = createRows(100);
+
+      // Create a table with some rows to query
+      awaitExecution(connection.createStatement(
+        "CREATE TABLE testRefCursorTable(id NUMBER, value VARCHAR(10))"));
+      Statement insertStatement = connection.createStatement(
+        "INSERT INTO testRefCursorTable VALUES (:id, :value)");
+      awaitUpdate(
+        rows.stream()
+          .map(row -> 1)
+          .collect(Collectors.toList()),
+        bindRows(rows, insertStatement));
+
+      // Create a procedure that returns a cursor over the rows
+      awaitExecution(connection.createStatement(
+        "CREATE OR REPLACE PROCEDURE testRefCursorProcedure("
+          + " countCursor OUT SYS_REFCURSOR)"
+          + " IS"
+          + " BEGIN"
+          + " OPEN countCursor FOR "
+          + " SELECT id, value FROM testRefCursorTable"
+          + " ORDER BY id;"
+          + " END;"));
+
+      // Call the procedure with the cursor registered as an out parameter, and
+      // expect it to map to a Result. Then consume the rows of the Result and
+      // verify they have the expected values inserted above.
+      awaitMany(
+        rows,
+        Flux.from(connection.createStatement(
+          "BEGIN testRefCursorProcedure(:countCursor); END;")
+          .bind("countCursor", Parameters.out(OracleR2dbcTypes.REF_CURSOR))
+          .execute())
+          .flatMap(result ->
+            result.map(outParameters ->
+              outParameters.get("countCursor")))
+          .cast(Result.class)
+          .flatMap(countCursor ->
+            countCursor.map(row ->
+              new TestRow(
+                row.get("id", Integer.class),
+                row.get("value", String.class)))));
+
+      // Verify the procedure call again, this time using an explicit
+      // Result.class argument to Row.get(...), and using Result.flatMap to
+      // create the publisher within the segment mapping function.
+      awaitMany(
+        rows,
+        Flux.from(connection.createStatement(
+          "BEGIN testRefCursorProcedure(:countCursor); END;")
+          .bind("countCursor", Parameters.out(OracleR2dbcTypes.REF_CURSOR))
+          .execute())
+          .flatMap(result ->
+            result.flatMap(segment ->
+              ((Result.OutSegment)segment).outParameters()
+                .get(0, Result.class)
+                .map(row ->
+                  new TestRow(
+                    row.get("id", Integer.class),
+                    row.get("value", String.class))))));
+    }
+    catch (Exception exception) {
+      showErrors(connection);
+      throw exception;
+    }
+    finally {
+      tryAwaitExecution(connection.createStatement(
+        "DROP PROCEDURE testRefCursorProcedure"));
+      tryAwaitExecution(connection.createStatement(
+        "DROP TABLE testRefCursorTable"));
+      tryAwaitNone(connection.close());
+    }
+  }
+
+  /**
+   * Verifies that multiple SYS_REFCURSOR out parameters can be consumed as
+   * {@link Result} objects.
+   */
+  @Test
+  public void testMultipleRefCursorOut() {
+    Connection connection = awaitOne(sharedConnection());
+    try {
+      List<TestRow> rows = createRows(100);
+
+      // Create a table with some rows to query
+      awaitExecution(connection.createStatement(
+        "CREATE TABLE testMultiRefCursorTable(id NUMBER, value VARCHAR(10))"));
+      Statement insertStatement = connection.createStatement(
+        "INSERT INTO testMultiRefCursorTable VALUES (:id, :value)");
+      awaitUpdate(
+        rows.stream()
+          .map(row -> 1)
+          .collect(Collectors.toList()),
+        bindRows(rows, insertStatement));
+
+      // Create a procedure that returns multiple cursors over the rows
+      awaitExecution(connection.createStatement(
+        "CREATE OR REPLACE PROCEDURE testMultiRefCursorProcedure("
+          + " countCursor0 OUT SYS_REFCURSOR,"
+          + " countCursor1 OUT SYS_REFCURSOR)"
+          + " IS"
+          + " BEGIN"
+          + " OPEN countCursor0 FOR "
+          + " SELECT id, value FROM testMultiRefCursorTable"
+          + " ORDER BY id;"
+          + " OPEN countCursor1 FOR "
+          + " SELECT id, value FROM testMultiRefCursorTable"
+          + " ORDER BY id DESC;"
+          + " END;"));
+
+      // Call the procedure with the cursors registered as out parameters, and
+      // expect them to map to Results. Then consume the rows of each Result
+      // and verify they have the expected values inserted above.
+      List<TestRow> expectedRows = new ArrayList<>(rows);
+      Collections.reverse(rows);
+      expectedRows.addAll(rows);
+      awaitMany(
+        expectedRows,
+        Flux.from(connection.createStatement(
+          "BEGIN testMultiRefCursorProcedure(:countCursor0, :countCursor1); END;")
+          .bind("countCursor0", Parameters.out(OracleR2dbcTypes.REF_CURSOR))
+          .bind("countCursor1", Parameters.out(OracleR2dbcTypes.REF_CURSOR))
+          .execute())
+          .flatMap(result ->
+            result.map(outParameters ->
+              List.of(
+                (Result)outParameters.get("countCursor0"),
+                (Result)outParameters.get("countCursor1"))))
+          .flatMap(results ->
+            Flux.concat(
+              results.get(0).map(row ->
+                new TestRow(
+                  row.get("id", Integer.class),
+                  row.get("value", String.class))),
+              results.get(1).map(row ->
+                new TestRow(
+                  row.get("id", Integer.class),
+                  row.get("value", String.class))))));
+
+      // Run the same verification, this time with a Result.class argument to
+      // Row.get(...), and mapping the REF CURSOR Results into a Publisher
+      // within the row mapping function.
+      awaitMany(
+        expectedRows,
+        Flux.from(connection.createStatement(
+          "BEGIN testMultiRefCursorProcedure(:countCursor0, :countCursor1); END;")
+          .bind("countCursor0", Parameters.out(OracleR2dbcTypes.REF_CURSOR))
+          .bind("countCursor1", Parameters.out(OracleR2dbcTypes.REF_CURSOR))
+          .execute())
+          .flatMap(result ->
+            result.map(outParameters ->
+              Flux.concat(
+                outParameters.get("countCursor0", Result.class).map(row ->
+                  new TestRow(
+                    row.get(0, Integer.class),
+                    row.get(1, String.class))),
+                outParameters.get("countCursor1", Result.class).map(row ->
+                  new TestRow(
+                    row.get(0, Integer.class),
+                    row.get(1, String.class))))))
+          .flatMap(Function.identity()));
+    }
+    catch (Exception exception) {
+      showErrors(connection);
+      throw exception;
+    }
+    finally {
+      tryAwaitExecution(connection.createStatement(
+        "DROP PROCEDURE testMultiRefCursorProcedure"));
+      tryAwaitExecution(connection.createStatement(
+        "DROP TABLE testMultiRefCursorTable"));
+      tryAwaitNone(connection.close());
+    }
+  }
+
   // TODO: Repalce with Parameters.inOut when that's available
   private static final class InOutParameter
     implements Parameter, Parameter.In, Parameter.Out {
@@ -2403,4 +2595,55 @@ private void verifyConcurrentFetch(Connection connection) {
         "DROP TABLE testConcurrentFetch"));
     }
   }
+
+  /**
+   * Creates a list of test table rows with a specified length
+   */
+  private static List<TestRow> createRows(int length) {
+    return IntStream.range(0, length)
+      .mapToObj(id -> new TestRow(id, String.valueOf(id)))
+      .collect(Collectors.toList());
+  }
+
+  /** Binds a list of rows to a batch statement */
+  private Statement bindRows(List<TestRow> rows, Statement statement) {
+    rows.stream()
+      .limit(rows.size() - 1)
+      .forEach(row ->
+        statement
+          .bind("id", row.id)
+          .bind("value", row.value)
+          .add());
+
+    statement
+      .bind("id", rows.get(rows.size() - 1).id)
+      .bind("value", rows.get(rows.size() - 1).value);
+
+    return statement;
+  }
+
+  /**
+   * A row of a test table.
+   */
+  private static class TestRow {
+    final int id;
+    final String value;
+
+    TestRow(int id, String value) {
+      this.id = id;
+      this.value = value;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      return other instanceof TestRow
+        && id == ((TestRow)other).id
+        && Objects.equals(value, ((TestRow)other).value);
+    }
+
+    @Override
+    public String toString() {
+      return "[id=" + id + ", value=" + value + "]";
+    }
+  }
 }
diff --git a/src/test/java/oracle/r2dbc/util/Awaits.java b/src/test/java/oracle/r2dbc/util/Awaits.java
index 81b4976..a9c241a 100644
--- a/src/test/java/oracle/r2dbc/util/Awaits.java
+++ b/src/test/java/oracle/r2dbc/util/Awaits.java
@@ -108,7 +108,7 @@ public static void awaitError(
     Class<? extends Throwable> errorType, Publisher<?> errorPublisher) {
     assertThrows(
       errorType,
-      () -> Mono.from(errorPublisher).block(sqlTimeout()),
+      () -> Flux.from(errorPublisher).blockLast(sqlTimeout()),
       "Unexpected signal from Publisher of an error");
   }
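The Awaits change is likely required by the reordered warning segments: Mono.from takes the first emitted item and cancels its source, so an error signaled after a value, such as a warning raised after an update count, would never be observed. Flux.blockLast waits for the terminal signal instead. A small demonstration with illustrative values:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BlockLastExample {
  public static void main(String[] args) {
    // A publisher that emits a value and then fails, like a result that
    // emits an update count segment before a warning is raised as an error.
    Flux<Integer> valueThenError = Flux.concat(
      Flux.just(0),
      Flux.error(new IllegalStateException("error after a value")));

    // Mono.from takes the first value and cancels the source, so the
    // trailing error is never observed here; this prints 0.
    System.out.println(Mono.from(valueThenError).block());

    // Flux.blockLast consumes the entire source, so the trailing error
    // is thrown.
    try {
      Flux.from(valueThenError).blockLast();
    }
    catch (IllegalStateException expected) {
      System.out.println("Caught: " + expected.getMessage());
    }
  }
}
```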