fix: Wiring bigQueryThreadPoolTaskScheduler with writeJsonStream #1855

Merged
merged 27 commits into from
Jun 20, 2023

Contributor

@prash-mi prash-mi commented May 16, 2023

This PR wires ThreadPoolTaskScheduler into writeJsonStream.

Users can configure bigQueryThreadPoolTaskScheduler to avoid the java.lang.OutOfMemoryError that arose when too many concurrent write threads were running.

Fixes #1599
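
As a rough illustration of why a bounded pool helps, the sketch below runs many simulated batch writes on a fixed-size pool and records the peak number running at once. It uses only JDK classes; `BoundedBatchWriter`, `peakConcurrency`, and the 5 ms sleep are hypothetical stand-ins, not code from BigQueryTemplate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration; not part of BigQueryTemplate. */
class BoundedBatchWriter {

  /** Runs batchCount simulated writes on poolSize threads and returns the peak concurrency. */
  static int peakConcurrency(int batchCount, int poolSize) {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    List<Future<?>> futures = new ArrayList<>();
    try {
      for (int i = 0; i < batchCount; i++) {
        futures.add(pool.submit(() -> {
          int now = running.incrementAndGet();
          peak.accumulateAndGet(now, Math::max);
          try {
            Thread.sleep(5); // stand-in for one batch write
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            running.decrementAndGet();
          }
        }));
      }
      for (Future<?> future : futures) {
        try {
          future.get(); // wait for every batch to finish
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException(e);
        } catch (ExecutionException e) {
          throw new IllegalStateException(e.getCause());
        }
      }
    } finally {
      pool.shutdown();
    }
    return peak.get();
  }
}
```

However many batches are submitted, the number of threads never exceeds the pool size; the extra batches wait in the pool's queue instead of each getting a new native thread.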

@prash-mi prash-mi marked this pull request as ready for review May 19, 2023 13:59
@@ -115,6 +120,9 @@ public BigQueryTemplate(
this.datasetName = bqDatasetName;
this.taskScheduler = taskScheduler;
this.bigQueryWriteClient = bigQueryWriteClient;
ThreadPoolTaskScheduler threadPoolTaskScheduler = (ThreadPoolTaskScheduler) taskScheduler;
Member

Unchecked cast from TaskScheduler to ThreadPoolTaskScheduler.

Contributor Author

Changed it to ExecutorService instead.
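
For context, an unguarded downcast like the one flagged above throws ClassCastException when a different implementation is injected. A minimal sketch of a guarded cast follows, using the JDK's `Executor` and `ExecutorService` as stand-ins for the Spring types; `SafeDowncast` and the single-thread fallback are hypothetical choices for illustration.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical helper; JDK types stand in for TaskScheduler and ThreadPoolTaskScheduler. */
class SafeDowncast {

  /** Returns the argument itself if it is an ExecutorService, otherwise a new fallback pool. */
  static ExecutorService asExecutorService(Executor executor) {
    if (executor instanceof ExecutorService) {
      return (ExecutorService) executor; // safe: the type was checked above
    }
    return Executors.newSingleThreadExecutor(); // assumed fallback, for illustration only
  }
}
```

Depending on the narrower interface directly, as the author did, avoids the cast altogether.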

@@ -86,6 +90,7 @@ public class BigQueryTemplate implements BigQueryOperations {
private final Logger logger = LoggerFactory.getLogger(BigQueryTemplate.class);

private final int jsonWriterBatchSize;
private final ScheduledExecutorService executorService;
Member

Why not just ExecutorService? You're not using the scheduling features.
Also, the variable should probably be called jsonWriterExecutorService for clarity.

Contributor Author

Makes sense; I have wired in an ExecutorService.

@@ -115,6 +120,9 @@ public BigQueryTemplate(
this.datasetName = bqDatasetName;
this.taskScheduler = taskScheduler;
this.bigQueryWriteClient = bigQueryWriteClient;
ThreadPoolTaskScheduler threadPoolTaskScheduler = (ThreadPoolTaskScheduler) taskScheduler;
Member

taskScheduler is "used to poll for the status of long-running BigQuery operations". I don't think it should be re-purposed for json writing as well.
Instead, I would add a setter for jsonWriterExecutorService and wire in a default one from BigQueryAutoConfiguration.

Contributor Author

Sure, that's a better approach. I have updated the implementation accordingly, thanks.

// This can be replaced with interrupting the ExecutorService when it has been wired-in
logger.info("asyncTask interrupted");
if (exception != null || !writeApiResponse.isSuccessful()) {
logger.error("asyncTask interrupted");
Member

Does "asyncTask interrupted" still make sense? Why not just log the exception and/or the write response error message?

Contributor Author

Sure. The exception might be null, so I now log it when it is not null and log a "Write operation failed" warning otherwise.
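
The branching described in this reply can be sketched as a small pure function. This is an assumption-laden illustration: `WriteFailureMessage` and `describe` are hypothetical names, and the `successful` flag stands in for `writeApiResponse.isSuccessful()`.

```java
/** Hypothetical helper showing the null-safe branching; not the merged code. */
class WriteFailureMessage {

  /** Returns a log message for a failed write, or null when nothing failed. */
  static String describe(Throwable exception, boolean successful) {
    if (exception != null) {
      // Prefer the exception when there is one; toString() never returns a null message.
      return "Write operation failed: " + exception;
    }
    if (!successful) {
      // No exception, but the response reported a failure.
      return "Write operation failed";
    }
    return null;
  }
}
```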

*/
public BigQueryTemplate(
BigQuery bigQuery,
BigQueryWriteClient bigQueryWriteClient,
Map<String, Object> bqInitSettings,
- TaskScheduler taskScheduler) {
+ TaskScheduler taskScheduler,
+ ExecutorService jsonWriterExecutorService) {
Member

This is a breaking change. Either add a setter or an overloaded constructor.

Contributor Author

@prash-mi prash-mi May 26, 2023

My bad. I assumed that because we had updated the bigQueryTemplate bean in GcpBigQueryAutoConfiguration, we were safe from this breaking change. But you are right: a user might instantiate BigQueryTemplate directly.

So I have added an overloaded constructor that does not require jsonWriterExecutorService (it initialises a default FixedThreadPool).
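
A minimal sketch of the overloaded-constructor approach, assuming a simplified class: `TemplateSketch` is a hypothetical stand-in for BigQueryTemplate, and the pool size of 4 is an assumed value, not the one used in the PR.

```java
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for BigQueryTemplate, reduced to the constructor pattern. */
class TemplateSketch {
  static final int DEFAULT_JSON_WRITER_THREAD_POOL_SIZE = 4; // assumed value for illustration

  final String datasetName;
  final ExecutorService jsonWriterExecutorService;

  /** Original signature, kept so that existing callers still compile. */
  TemplateSketch(String datasetName) {
    this(datasetName, Executors.newFixedThreadPool(DEFAULT_JSON_WRITER_THREAD_POOL_SIZE));
  }

  /** New signature for callers that want to supply their own executor. */
  TemplateSketch(String datasetName, ExecutorService jsonWriterExecutorService) {
    this.datasetName = Objects.requireNonNull(datasetName);
    this.jsonWriterExecutorService = Objects.requireNonNull(jsonWriterExecutorService);
  }
}
```

Delegating from the old constructor to the new one keeps a single initialisation path while preserving source compatibility.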

Member

@meltsufin meltsufin left a comment

@prash-mi Can we try re-using the bigQueryThreadPoolTaskScheduler instead of introducing a new ExecutorService specific to jsonWriter? I think it will be simpler and more clear.

@prash-mi prash-mi requested review from a team as code owners June 9, 2023 05:22
Contributor Author

prash-mi commented Jun 9, 2023

> @prash-mi Can we try re-using the bigQueryThreadPoolTaskScheduler instead of introducing a new ExecutorService specific to jsonWriter? I think it will be simpler and more clear.

@meltsufin Sure, I have reused bigQueryThreadPoolTaskScheduler. PTAL.
For now I have left jsonWriterExecutorService in the code, since we might be able to use it with new capabilities we implement later (with supporting documentation). Please let me know if this looks good, or if we should remove it from all the layers (GcpBigQueryAutoConfiguration and the unit tests).

Member

@meltsufin meltsufin left a comment

I was looking to have the jsonWriterExecutorService and everything associated with it to be removed entirely. Do you see a problem with that?

Contributor Author

> I was looking to have the jsonWriterExecutorService and everything associated with it to be removed entirely. Do you see a problem with that?

Done @meltsufin, I have removed jsonWriterExecutorService from all the layers.

@prash-mi prash-mi changed the title from "fix: Wiring ExecutorService with writeJsonStream" to "fix: Wiring bigQueryThreadPoolTaskScheduler with writeJsonStream" Jun 15, 2023
Member

@meltsufin meltsufin left a comment

Just final polish comments.

Map<String, Object> bqInitSettings = new HashMap<>();
bqInitSettings.put("DATASET_NAME", this.datasetName);
bqInitSettings.put("JSON_WRITER_BATCH_SIZE", this.jsonWriterBatchSize);
bqInitSettings.put("JSON_WRITER_THREAD_POOL_SIZE", this.threadPoolSize);
Member

Is this still needed?

Contributor Author

No, it was used with the previous ExecutorService. I have removed it.

@@ -81,6 +83,9 @@ public class BigQueryTemplate implements BigQueryOperations {
private static final int DEFAULT_JSON_STREAM_WRITER_BATCH_SIZE =
1000; // write records in batches of 1000

private static final int DEFAULT_JSON_WRITER_THREAD_POOL_SIZE =
Member

Seems unused.

Contributor Author

Removed

sonarcloud bot commented Jun 16, 2023

SonarCloud Quality Gate passed.

0 Bugs (rating A)
0 Vulnerabilities (rating A)
0 Security Hotspots (rating A)
1 Code Smell (rating A)

82.4% Coverage
0.0% Duplication

asyncTaskScheduledFuture.cancel(true);
}
} else if (writeApiResponse != null && !writeApiResponse.isSuccessful()) {
logger.warn("Write operation failed");
Member

Would it make sense to log a message from the writeApiResponse here?

Contributor Author

It might be tricky to log it here: because this is a batch operation, writeApiResponse.getErrors() returns a List<StorageError> rather than a single message. Users have access to writeApiResponse, so they may need to process the errors themselves.
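
One way a caller might condense such a list into a single log line is sketched below. This is purely illustrative: `ErrorEntry` is a hypothetical stand-in for StorageError, and `BatchErrorSummary.summarize` with its `limit` parameter is an assumed helper, not part of the library or this PR.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical caller-side helper for summarising a batch of errors. */
class BatchErrorSummary {

  /** Stand-in for StorageError, carrying only a code and a message. */
  static final class ErrorEntry {
    final String code;
    final String message;

    ErrorEntry(String code, String message) {
      this.code = code;
      this.message = message;
    }
  }

  /** Joins the first `limit` errors and notes how many were left out. */
  static String summarize(List<ErrorEntry> errors, int limit) {
    String shown = errors.stream()
        .limit(limit)
        .map(e -> e.code + ": " + e.message)
        .collect(Collectors.joining("; "));
    int hidden = Math.max(0, errors.size() - limit);
    return hidden == 0 ? shown : shown + " (+" + hidden + " more)";
  }
}
```

Capping the number of entries keeps the log line bounded even when a large batch fails.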

@meltsufin meltsufin merged commit 6467a08 into GoogleCloudPlatform:main Jun 20, 2023
Successfully merging this pull request may close these issues.

BigQueryTemplate.writeJsonStream API throwing java.lang.OutOfMemoryError: unable to create new native thread
3 participants