Skip to content

Commit

Permalink
fix: Wiring bigQueryThreadPoolTaskScheduler with writeJsonStream (
Browse files Browse the repository at this point in the history
…#1855)

Users can configure bigQueryThreadPoolTaskScheduler to avoid the java.lang.OutOfMemoryError that arose from too many concurrent write threads.

Fixes: #1599
  • Loading branch information
prash-mi authored Jun 20, 2023
1 parent e94438a commit 6467a08
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public BigQueryTemplate bigQueryTemplate(
BigQuery bigQuery,
BigQueryWriteClient bigQueryWriteClient,
@Qualifier("bigQueryThreadPoolTaskScheduler")
ThreadPoolTaskScheduler bigQueryThreadPoolTaskScheduler) {
ThreadPoolTaskScheduler bigQueryThreadPoolTaskScheduler) {
Map<String, Object> bqInitSettings = new HashMap<>();
bqInitSettings.put("DATASET_NAME", this.datasetName);
bqInitSettings.put("JSON_WRITER_BATCH_SIZE", this.jsonWriterBatchSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import org.json.JSONArray;
Expand Down Expand Up @@ -247,39 +249,34 @@ public Table createTable(
public CompletableFuture<WriteApiResponse> writeJsonStream(
String tableName, InputStream jsonInputStream) {

CompletableFuture<WriteApiResponse> writeApiFutureResponse =
new CompletableFuture<>();

Thread asyncTask =
new Thread(
() -> {
try {
WriteApiResponse apiResponse = getWriteApiResponse(tableName, jsonInputStream);
writeApiFutureResponse.complete(apiResponse);
} catch (DescriptorValidationException | IOException e) {
writeApiFutureResponse.completeExceptionally(e);
logger.warn(String.format("Error: %s %n", e.getMessage()), e);
} catch (Exception e) {
writeApiFutureResponse.completeExceptionally(e);
// Restore interrupted state in case of an InterruptedException
Thread.currentThread().interrupt();
}
});
asyncTask
.start(); // run the thread async so that we can return the writeApiFutureResponse. This
// thread can be run in the ExecutorService when it has been wired-in

CompletableFuture<WriteApiResponse> writeApiFutureResponse = new CompletableFuture<>();
Runnable asyncTask =
() -> {
try {
WriteApiResponse apiResponse = getWriteApiResponse(tableName, jsonInputStream);
writeApiFutureResponse.complete(apiResponse);
} catch (Exception e) {
writeApiFutureResponse.completeExceptionally(e);
// Restore interrupted state in case of an InterruptedException
Thread.currentThread().interrupt();
logger.warn("Unable to get write API response.", e);
}
};
ScheduledFuture<?> asyncTaskScheduledFuture =
taskScheduler.schedule(asyncTask, Instant.now()); // Run this task
// register success and failure callback
writeApiFutureResponse.whenComplete(
(writeApiResponse, exception) -> {
if (exception != null) {
asyncTask.interrupt(); // interrupt the thread as the developer might have cancelled the
// Future.
// This can be replaced with interrupting the ExecutorService when it has been wired-in
logger.info("asyncTask interrupted");
return;
logger.error("asyncTask interrupted", exception);
if (exception instanceof CancellationException) { // user have cancelled it
asyncTaskScheduledFuture.cancel(true);
}
} else if (writeApiResponse != null && !writeApiResponse.isSuccessful()) {
logger.warn("Write operation failed");
} else {
logger.info("Data successfully written");
}
logger.info("Data successfully written");
});

return writeApiFutureResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
Expand Down Expand Up @@ -60,7 +61,7 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.scheduling.concurrent.DefaultManagedTaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

@ExtendWith(MockitoExtension.class)
class BigQueryTemplateTest {
Expand All @@ -80,6 +81,7 @@ class BigQueryTemplateTest {
private BigQueryOptions options;
private final Map<String, Object> bqInitSettings = new HashMap<>();
BigQueryTemplate bqTemplateSpy;
BigQueryTemplate bqTemplateDefaultPoolSpy;

private Schema getDefaultSchema() {
return Schema.of(
Expand All @@ -103,22 +105,33 @@ public void setUp() {
bigquery = options.getService();
bqInitSettings.put("DATASET_NAME", DATASET);
bqInitSettings.put("JSON_WRITER_BATCH_SIZE", JSON_WRITER_BATCH_SIZE);

BigQueryTemplate bqTemplate =
new BigQueryTemplate(
bigquery, bigQueryWriteClientMock, bqInitSettings, new DefaultManagedTaskScheduler());
bigquery, bigQueryWriteClientMock, bqInitSettings, getThreadPoolTaskScheduler());
bqTemplateSpy = Mockito.spy(bqTemplate);
BigQueryTemplate bqTemplateDefaultPool =
new BigQueryTemplate(
bigquery, bigQueryWriteClientMock, bqInitSettings, getThreadPoolTaskScheduler());
bqTemplateDefaultPoolSpy = Mockito.spy(bqTemplateDefaultPool);
}

private BigQueryOptions createBigQueryOptionsForProject(
BigQueryRpcFactory rpcFactory) {
private BigQueryOptions createBigQueryOptionsForProject(BigQueryRpcFactory rpcFactory) {
return BigQueryOptions.newBuilder()
.setProjectId(BigQueryTemplateTest.PROJECT)
.setServiceRpcFactory(rpcFactory)
.setRetrySettings(ServiceOptions.getNoRetrySettings())
.build();
}

/**
 * Builds the scheduler injected into the templates under test: five daemon threads
 * named with the {@code gcp-bigquery} prefix, initialized and ready to accept tasks.
 *
 * @return an initialized {@link ThreadPoolTaskScheduler}
 */
private ThreadPoolTaskScheduler getThreadPoolTaskScheduler() {
  ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
  // Daemon threads so a hung write cannot keep the test JVM alive.
  taskScheduler.setDaemon(true);
  taskScheduler.setThreadNamePrefix("gcp-bigquery");
  taskScheduler.setPoolSize(5);
  taskScheduler.initialize();
  return taskScheduler;
}

@Test
void getDatasetNameTest() {
assertThat(bqTemplateSpy.getDatasetName()).isEqualTo(DATASET);
Expand All @@ -131,10 +144,8 @@ void getJsonWriterBatchSizeTest() {

@Test
void setAutoDetectSchemaTest() {
assertThatCode(() -> bqTemplateSpy.setAutoDetectSchema(true))
.doesNotThrowAnyException();
assertThatCode(() -> bqTemplateSpy.setAutoDetectSchema(false))
.doesNotThrowAnyException();
assertThatCode(() -> bqTemplateSpy.setAutoDetectSchema(true)).doesNotThrowAnyException();
assertThatCode(() -> bqTemplateSpy.setAutoDetectSchema(false)).doesNotThrowAnyException();
}

@Test
Expand All @@ -143,8 +154,7 @@ void setWriteDispositionTest() {
.doesNotThrowAnyException();
assertThatCode(() -> bqTemplateSpy.setWriteDisposition(WRITE_APPEND))
.doesNotThrowAnyException();
assertThatCode(() -> bqTemplateSpy.setWriteDisposition(WRITE_EMPTY))
.doesNotThrowAnyException();
assertThatCode(() -> bqTemplateSpy.setWriteDisposition(WRITE_EMPTY)).doesNotThrowAnyException();
}

@Test
Expand Down Expand Up @@ -223,6 +233,67 @@ void writeJsonStreamTest()
assertEquals(0, apiRes.getErrors().size());
}

@Test
void writeJsonStreamTestDefaultPool()
    throws DescriptorValidationException, IOException, InterruptedException,
        ExecutionException {
  // Exercises the template built without an explicitly injected
  // jsonWriterExecutorService, i.e. the default-pool constructor path.
  InputStream inputStream = new ByteArrayInputStream(newLineSeperatedJson.getBytes());

  WriteApiResponse stubbedResponse = new WriteApiResponse();
  stubbedResponse.setSuccessful(true);
  doReturn(stubbedResponse)
      .when(bqTemplateDefaultPoolSpy)
      .getWriteApiResponse(any(String.class), any(InputStream.class));

  CompletableFuture<WriteApiResponse> responseFuture =
      bqTemplateDefaultPoolSpy.writeJsonStream(TABLE, inputStream);
  WriteApiResponse response = responseFuture.get();

  assertTrue(response.isSuccessful());
  assertEquals(0, response.getErrors().size());
}

@Test
void writeJsonStreamNegativeTest() {
  // An unsuccessful WriteApiResponse must be delivered through the future
  // as a normal result, not translated into an exception.
  try (InputStream jsonInputStream = new ByteArrayInputStream(newLineSeperatedJson.getBytes())) {
    WriteApiResponse apiResponse = new WriteApiResponse();
    apiResponse.setSuccessful(false);
    doReturn(apiResponse)
        .when(bqTemplateSpy)
        .getWriteApiResponse(any(String.class), any(InputStream.class));

    CompletableFuture<WriteApiResponse> futRes =
        bqTemplateSpy.writeJsonStream(TABLE, jsonInputStream);
    WriteApiResponse apiRes = futRes.get();
    assertThat(apiRes.isSuccessful()).isFalse();
    assertEquals(0, apiRes.getErrors().size());
  } catch (Exception e) {
    // This catch also covers futRes.get(), so a bare "Error initialising the
    // InputStream" message would be misleading; report the actual cause.
    fail("writeJsonStreamNegativeTest failed unexpectedly", e);
  }
}

@Test
void writeJsonStreamThrowTest() {
  // A checked exception thrown while writing must surface as the cause of the
  // ExecutionException from the returned future.
  try (InputStream jsonInputStream = new ByteArrayInputStream(newLineSeperatedJson.getBytes())) {
    String failureMsg = "Operation failed";
    Exception ioException = new IOException(failureMsg);
    doThrow(ioException)
        .when(bqTemplateSpy)
        .getWriteApiResponse(any(String.class), any(InputStream.class));

    CompletableFuture<WriteApiResponse> futRes =
        bqTemplateSpy.writeJsonStream(TABLE, jsonInputStream);
    try {
      futRes.get();
      fail("Expected writeJsonStream future to complete exceptionally");
    } catch (Exception ex) {
      // isInstanceOf gives a descriptive failure message, unlike asserting a
      // boolean instanceof expression.
      assertThat(ex.getCause()).isInstanceOf(IOException.class);
      assertThat(ex.getCause().getMessage()).isEqualTo(failureMsg);
    }
  } catch (Exception e) {
    // Report the actual cause instead of a fixed, possibly misleading message.
    fail("Error initialising the InputStream", e);
  }
}

@Test
void writeJsonStreamWithSchemaTest()
throws DescriptorValidationException, IOException, InterruptedException, ExecutionException {
Expand All @@ -245,7 +316,6 @@ void writeJsonStreamWithSchemaTest()
assertEquals(0, apiRes.getErrors().size());
}


@Test
void writeJsonStreamFailsOnGenericWritingException()
throws DescriptorValidationException, IOException, InterruptedException {
Expand All @@ -261,8 +331,6 @@ void writeJsonStreamFailsOnGenericWritingException()

CompletableFuture<WriteApiResponse> futRes =
bqTemplateSpy.writeJsonStream(TABLE, jsonInputStream, getDefaultSchema());
assertThat(futRes)
.withFailMessage("boom!")
.failsWithin(Duration.ofSeconds(1));
assertThat(futRes).withFailMessage("boom!").failsWithin(Duration.ofSeconds(1));
}
}

0 comments on commit 6467a08

Please sign in to comment.