-
Notifications
You must be signed in to change notification settings - Fork 310
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Wiring bigQueryThreadPoolTaskScheduler
with writeJsonStream
#1855
fix: Wiring bigQueryThreadPoolTaskScheduler
with writeJsonStream
#1855
Conversation
@@ -115,6 +120,9 @@ public BigQueryTemplate( | |||
this.datasetName = bqDatasetName; | |||
this.taskScheduler = taskScheduler; | |||
this.bigQueryWriteClient = bigQueryWriteClient; | |||
ThreadPoolTaskScheduler threadPoolTaskScheduler = (ThreadPoolTaskScheduler) taskScheduler; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unchecked cast from TaskScheduler
to ThreadPoolTaskScheduler
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed it to ExecutorService
instead
...cloud-gcp-bigquery/src/main/java/com/google/cloud/spring/bigquery/core/BigQueryTemplate.java
Outdated
Show resolved
Hide resolved
...cloud-gcp-bigquery/src/main/java/com/google/cloud/spring/bigquery/core/BigQueryTemplate.java
Outdated
Show resolved
Hide resolved
...cloud-gcp-bigquery/src/main/java/com/google/cloud/spring/bigquery/core/BigQueryTemplate.java
Show resolved
Hide resolved
...cloud-gcp-bigquery/src/main/java/com/google/cloud/spring/bigquery/core/BigQueryTemplate.java
Outdated
Show resolved
Hide resolved
...-cloud-gcp-bigquery/src/test/java/com/google/cloud/spring/bigquery/BigQueryTemplateTest.java
Show resolved
Hide resolved
@@ -86,6 +90,7 @@ public class BigQueryTemplate implements BigQueryOperations { | |||
private final Logger logger = LoggerFactory.getLogger(BigQueryTemplate.class); | |||
|
|||
private final int jsonWriterBatchSize; | |||
private final ScheduledExecutorService executorService; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just ExecutorService
? You're not using the scheduling features.
Also, the variable should probably be called jsonWriterExecutorService
for clarity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, wired a ExecutorService
@@ -115,6 +120,9 @@ public BigQueryTemplate( | |||
this.datasetName = bqDatasetName; | |||
this.taskScheduler = taskScheduler; | |||
this.bigQueryWriteClient = bigQueryWriteClient; | |||
ThreadPoolTaskScheduler threadPoolTaskScheduler = (ThreadPoolTaskScheduler) taskScheduler; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taskScheduler
is "used to poll for the status of long-running BigQuery operations". I don't think it should be re-purposed for json writing as well.
Instead, I would add a setter for jsonWriterExecutorService
and wire in a default one from BigQueryAutoConfiguration
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, that's a better approach, I have updated the implementation accordingly, thanks
// This can be replaced with interrupting the ExecutorService when it has been wired-in | ||
logger.info("asyncTask interrupted"); | ||
if (exception != null || !writeApiResponse.isSuccessful()) { | ||
logger.error("asyncTask interrupted"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does "asyncTask interrupted" still make sense? Why not just log the exception and/or the write response error message?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure, exception
might be null
so logging it when it's not null
, and appending Write operation failed
warning otherwise
*/ | ||
public BigQueryTemplate( | ||
BigQuery bigQuery, | ||
BigQueryWriteClient bigQueryWriteClient, | ||
Map<String, Object> bqInitSettings, | ||
TaskScheduler taskScheduler) { | ||
TaskScheduler taskScheduler, | ||
ExecutorService jsonWriterExecutorService) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a breaking change. Either add a setter or an overloaded constructor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad, I thought that we have updated the bigQueryTemplate
bean @ GcpBigQueryAutoConfiguration
, so we are safe from this breaking change! But you are right and a user might directly initialise BigQueryTemplate
.
So, I have added an overloaded constructor where the user need not pass jsonWriterExecutorService
(it initialises a default FixedThreadPool)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@prash-mi Can we try re-using the bigQueryThreadPoolTaskScheduler
instead of introducing a new ExecutorService
specific to jsonWriter? I think it will be simpler and more clear.
@meltsufin sure, I have reused |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was looking to have the jsonWriterExecutorService
and everything associated with it to be removed entirely. Do you see a problem with that?
Done @meltsufin, I have removed |
ExecutorService
with writeJsonStream
bigQueryThreadPoolTaskScheduler
with writeJsonStream
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just final polish comments.
Map<String, Object> bqInitSettings = new HashMap<>(); | ||
bqInitSettings.put("DATASET_NAME", this.datasetName); | ||
bqInitSettings.put("JSON_WRITER_BATCH_SIZE", this.jsonWriterBatchSize); | ||
bqInitSettings.put("JSON_WRITER_THREAD_POOL_SIZE", this.threadPoolSize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this still needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it was used with the previous ExecutorService. Removed it
@@ -81,6 +83,9 @@ public class BigQueryTemplate implements BigQueryOperations { | |||
private static final int DEFAULT_JSON_STREAM_WRITER_BATCH_SIZE = | |||
1000; // write records in batches of 1000 | |||
|
|||
private static final int DEFAULT_JSON_WRITER_THREAD_POOL_SIZE = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems unused.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
...cloud-gcp-bigquery/src/main/java/com/google/cloud/spring/bigquery/core/BigQueryTemplate.java
Show resolved
Hide resolved
...cloud-gcp-bigquery/src/main/java/com/google/cloud/spring/bigquery/core/BigQueryTemplate.java
Outdated
Show resolved
Hide resolved
Kudos, SonarCloud Quality Gate passed! |
...cloud-gcp-bigquery/src/main/java/com/google/cloud/spring/bigquery/core/BigQueryTemplate.java
Show resolved
Hide resolved
asyncTaskScheduledFuture.cancel(true); | ||
} | ||
} else if (writeApiResponse != null && !writeApiResponse.isSuccessful()) { | ||
logger.warn("Write operation failed"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to log a message from the writeApiResponse
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be tricky to log it here as WriteApiResponse
returns a List<StorageError>
using writeApiResponse.getErrors()
(as it's a batch operation), users have access to writeApiResponse
, so they might need to process it.
This PR Wires
ThreadPoolTaskScheduler
withwriteJsonStream
.User can use
bigQueryThreadPoolTaskScheduler
in order to avoidjava.lang.OutOfMemoryError
which was arising due to too many concurrent write threads working.Fixes #1599