From 6467a089e235efbebfbe5f38c83f1d18bebb9b78 Mon Sep 17 00:00:00 2001 From: Prashant Mishra <11733935+prash-mi@users.noreply.github.com> Date: Tue, 20 Jun 2023 19:23:12 +0530 Subject: [PATCH] fix: Wiring `bigQueryThreadPoolTaskScheduler` with `writeJsonStream` (#1855) Users can use bigQueryThreadPoolTaskScheduler to avoid a java.lang.OutOfMemoryError that arose due to too many concurrent write threads running. Fixes: #1599 --- .../GcpBigQueryAutoConfiguration.java | 2 +- .../bigquery/core/BigQueryTemplate.java | 53 +++++----- .../spring/bigquery/BigQueryTemplateTest.java | 98 ++++++++++++++++--- 3 files changed, 109 insertions(+), 44 deletions(-) diff --git a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/bigquery/GcpBigQueryAutoConfiguration.java b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/bigquery/GcpBigQueryAutoConfiguration.java index f23dd4f79e..88111ffe8e 100644 --- a/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/bigquery/GcpBigQueryAutoConfiguration.java +++ b/spring-cloud-gcp-autoconfigure/src/main/java/com/google/cloud/spring/autoconfigure/bigquery/GcpBigQueryAutoConfiguration.java @@ -134,7 +134,7 @@ public BigQueryTemplate bigQueryTemplate( BigQuery bigQuery, BigQueryWriteClient bigQueryWriteClient, @Qualifier("bigQueryThreadPoolTaskScheduler") - ThreadPoolTaskScheduler bigQueryThreadPoolTaskScheduler) { + ThreadPoolTaskScheduler bigQueryThreadPoolTaskScheduler) { Map bqInitSettings = new HashMap<>(); bqInitSettings.put("DATASET_NAME", this.datasetName); bqInitSettings.put("JSON_WRITER_BATCH_SIZE", this.jsonWriterBatchSize); diff --git a/spring-cloud-gcp-bigquery/src/main/java/com/google/cloud/spring/bigquery/core/BigQueryTemplate.java b/spring-cloud-gcp-bigquery/src/main/java/com/google/cloud/spring/bigquery/core/BigQueryTemplate.java index fe685ba460..0e9502f2d7 100644 --- 
a/spring-cloud-gcp-bigquery/src/main/java/com/google/cloud/spring/bigquery/core/BigQueryTemplate.java +++ b/spring-cloud-gcp-bigquery/src/main/java/com/google/cloud/spring/bigquery/core/BigQueryTemplate.java @@ -44,7 +44,9 @@ import java.io.OutputStream; import java.nio.channels.Channels; import java.time.Duration; +import java.time.Instant; import java.util.Map; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; import org.json.JSONArray; @@ -247,39 +249,34 @@ public Table createTable( public CompletableFuture writeJsonStream( String tableName, InputStream jsonInputStream) { - CompletableFuture writeApiFutureResponse = - new CompletableFuture<>(); - - Thread asyncTask = - new Thread( - () -> { - try { - WriteApiResponse apiResponse = getWriteApiResponse(tableName, jsonInputStream); - writeApiFutureResponse.complete(apiResponse); - } catch (DescriptorValidationException | IOException e) { - writeApiFutureResponse.completeExceptionally(e); - logger.warn(String.format("Error: %s %n", e.getMessage()), e); - } catch (Exception e) { - writeApiFutureResponse.completeExceptionally(e); - // Restore interrupted state in case of an InterruptedException - Thread.currentThread().interrupt(); - } - }); - asyncTask - .start(); // run the thread async so that we can return the writeApiFutureResponse. 
This - // thread can be run in the ExecutorService when it has been wired-in - + CompletableFuture writeApiFutureResponse = new CompletableFuture<>(); + Runnable asyncTask = + () -> { + try { + WriteApiResponse apiResponse = getWriteApiResponse(tableName, jsonInputStream); + writeApiFutureResponse.complete(apiResponse); + } catch (Exception e) { + writeApiFutureResponse.completeExceptionally(e); + // Restore interrupted state in case of an InterruptedException + Thread.currentThread().interrupt(); + logger.warn("Unable to get write API response.", e); + } + }; + ScheduledFuture asyncTaskScheduledFuture = + taskScheduler.schedule(asyncTask, Instant.now()); // Run this task // register success and failure callback writeApiFutureResponse.whenComplete( (writeApiResponse, exception) -> { if (exception != null) { - asyncTask.interrupt(); // interrupt the thread as the developer might have cancelled the - // Future. - // This can be replaced with interrupting the ExecutorService when it has been wired-in - logger.info("asyncTask interrupted"); - return; + logger.error("asyncTask interrupted", exception); + if (exception instanceof CancellationException) { // user have cancelled it + asyncTaskScheduledFuture.cancel(true); + } + } else if (writeApiResponse != null && !writeApiResponse.isSuccessful()) { + logger.warn("Write operation failed"); + } else { + logger.info("Data successfully written"); } - logger.info("Data successfully written"); }); return writeApiFutureResponse; diff --git a/spring-cloud-gcp-bigquery/src/test/java/com/google/cloud/spring/bigquery/BigQueryTemplateTest.java b/spring-cloud-gcp-bigquery/src/test/java/com/google/cloud/spring/bigquery/BigQueryTemplateTest.java index 59ba2c4a80..115816b451 100644 --- a/spring-cloud-gcp-bigquery/src/test/java/com/google/cloud/spring/bigquery/BigQueryTemplateTest.java +++ b/spring-cloud-gcp-bigquery/src/test/java/com/google/cloud/spring/bigquery/BigQueryTemplateTest.java @@ -25,6 +25,7 @@ import static 
org.assertj.core.api.Assertions.assertThatCode; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -60,7 +61,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; -import org.springframework.scheduling.concurrent.DefaultManagedTaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @ExtendWith(MockitoExtension.class) class BigQueryTemplateTest { @@ -80,6 +81,7 @@ class BigQueryTemplateTest { private BigQueryOptions options; private final Map bqInitSettings = new HashMap<>(); BigQueryTemplate bqTemplateSpy; + BigQueryTemplate bqTemplateDefaultPoolSpy; private Schema getDefaultSchema() { return Schema.of( @@ -103,15 +105,17 @@ public void setUp() { bigquery = options.getService(); bqInitSettings.put("DATASET_NAME", DATASET); bqInitSettings.put("JSON_WRITER_BATCH_SIZE", JSON_WRITER_BATCH_SIZE); - BigQueryTemplate bqTemplate = new BigQueryTemplate( - bigquery, bigQueryWriteClientMock, bqInitSettings, new DefaultManagedTaskScheduler()); + bigquery, bigQueryWriteClientMock, bqInitSettings, getThreadPoolTaskScheduler()); bqTemplateSpy = Mockito.spy(bqTemplate); + BigQueryTemplate bqTemplateDefaultPool = + new BigQueryTemplate( + bigquery, bigQueryWriteClientMock, bqInitSettings, getThreadPoolTaskScheduler()); + bqTemplateDefaultPoolSpy = Mockito.spy(bqTemplateDefaultPool); } - private BigQueryOptions createBigQueryOptionsForProject( - BigQueryRpcFactory rpcFactory) { + private BigQueryOptions createBigQueryOptionsForProject(BigQueryRpcFactory rpcFactory) { return BigQueryOptions.newBuilder() .setProjectId(BigQueryTemplateTest.PROJECT) .setServiceRpcFactory(rpcFactory) @@ -119,6 +123,15 @@ private 
BigQueryOptions createBigQueryOptionsForProject( .build(); } + private ThreadPoolTaskScheduler getThreadPoolTaskScheduler() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(5); + scheduler.setThreadNamePrefix("gcp-bigquery"); + scheduler.setDaemon(true); + scheduler.initialize(); + return scheduler; + } + @Test void getDatasetNameTest() { assertThat(bqTemplateSpy.getDatasetName()).isEqualTo(DATASET); @@ -131,10 +144,8 @@ void getJsonWriterBatchSizeTest() { @Test void setAutoDetectSchemaTest() { - assertThatCode(() -> bqTemplateSpy.setAutoDetectSchema(true)) - .doesNotThrowAnyException(); - assertThatCode(() -> bqTemplateSpy.setAutoDetectSchema(false)) - .doesNotThrowAnyException(); + assertThatCode(() -> bqTemplateSpy.setAutoDetectSchema(true)).doesNotThrowAnyException(); + assertThatCode(() -> bqTemplateSpy.setAutoDetectSchema(false)).doesNotThrowAnyException(); } @Test @@ -143,8 +154,7 @@ void setWriteDispositionTest() { .doesNotThrowAnyException(); assertThatCode(() -> bqTemplateSpy.setWriteDisposition(WRITE_APPEND)) .doesNotThrowAnyException(); - assertThatCode(() -> bqTemplateSpy.setWriteDisposition(WRITE_EMPTY)) - .doesNotThrowAnyException(); + assertThatCode(() -> bqTemplateSpy.setWriteDisposition(WRITE_EMPTY)).doesNotThrowAnyException(); } @Test @@ -223,6 +233,67 @@ void writeJsonStreamTest() assertEquals(0, apiRes.getErrors().size()); } + @Test + void writeJsonStreamTestDefaultPool() + throws DescriptorValidationException, IOException, InterruptedException, + ExecutionException { // Tests the constructor which doesn't have jsonWriterExecutorService + // as the param + + InputStream jsonInputStream = new ByteArrayInputStream(newLineSeperatedJson.getBytes()); + WriteApiResponse apiResponse = new WriteApiResponse(); + apiResponse.setSuccessful(true); + doReturn(apiResponse) + .when(bqTemplateDefaultPoolSpy) + .getWriteApiResponse(any(String.class), any(InputStream.class)); + CompletableFuture futRes = + 
bqTemplateDefaultPoolSpy.writeJsonStream(TABLE, jsonInputStream); + WriteApiResponse apiRes = futRes.get(); + assertTrue(apiRes.isSuccessful()); + assertEquals(0, apiRes.getErrors().size()); + } + + @Test + void writeJsonStreamNegativeTest() { + try (InputStream jsonInputStream = new ByteArrayInputStream(newLineSeperatedJson.getBytes())) { + WriteApiResponse apiResponse = new WriteApiResponse(); + apiResponse.setSuccessful(false); + doReturn(apiResponse) + .when(bqTemplateSpy) + .getWriteApiResponse(any(String.class), any(InputStream.class)); + + CompletableFuture futRes = + bqTemplateSpy.writeJsonStream(TABLE, jsonInputStream); + WriteApiResponse apiRes = futRes.get(); + assertThat(apiRes.isSuccessful()).isFalse(); + assertEquals(0, apiRes.getErrors().size()); + } catch (Exception e) { + fail("Error initialising the InputStream"); + } + } + + @Test + void writeJsonStreamThrowTest() { + try (InputStream jsonInputStream = new ByteArrayInputStream(newLineSeperatedJson.getBytes())) { + String failureMsg = "Operation failed"; + Exception ioException = new IOException(failureMsg); + doThrow(ioException) + .when(bqTemplateSpy) + .getWriteApiResponse(any(String.class), any(InputStream.class)); + + CompletableFuture futRes = + bqTemplateSpy.writeJsonStream(TABLE, jsonInputStream); + try { + futRes.get(); + fail(); + } catch (Exception ex) { + assertThat(ex.getCause() instanceof IOException).isTrue(); + assertThat(ex.getCause().getMessage()).isEqualTo(failureMsg); + } + } catch (Exception e) { + fail("Error initialising the InputStream"); + } + } + @Test void writeJsonStreamWithSchemaTest() throws DescriptorValidationException, IOException, InterruptedException, ExecutionException { @@ -245,7 +316,6 @@ void writeJsonStreamWithSchemaTest() assertEquals(0, apiRes.getErrors().size()); } - @Test void writeJsonStreamFailsOnGenericWritingException() throws DescriptorValidationException, IOException, InterruptedException { @@ -261,8 +331,6 @@ void 
writeJsonStreamFailsOnGenericWritingException() CompletableFuture futRes = bqTemplateSpy.writeJsonStream(TABLE, jsonInputStream, getDefaultSchema()); - assertThat(futRes) - .withFailMessage("boom!") - .failsWithin(Duration.ofSeconds(1)); + assertThat(futRes).withFailMessage("boom!").failsWithin(Duration.ofSeconds(1)); } }