Skip to content

Commit

Permalink
fix StreamUtil to execute the map function for each batch in the thre…
Browse files Browse the repository at this point in the history
…ad-pool (#548)
  • Loading branch information
xiaoyu-yang-gh authored Feb 14, 2024
1 parent 5249a34 commit 51ff79b
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

package com.linkedin.avroutil1.builder.util;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -90,14 +91,16 @@ private StreamUtil() {
.mapToObj(batch -> {
int startIndex = batch * batchSize;
int endIndex = (batch == batchCount) ? list.size() : (batch + 1) * batchSize;
return list.subList(startIndex, endIndex).stream();
return list.subList(startIndex, endIndex);
})
.map(batch -> CompletableFuture.supplyAsync(() -> batch.map(mapper), limitingExecutor))
.flatMap(CompletableFuture::join);
.map(batch -> CompletableFuture.supplyAsync(() -> batch.stream().map(mapper).collect(Collectors.toList()),
limitingExecutor))
.map(CompletableFuture::join)
.flatMap(Collection::stream);
});
}

private static class LimitingExecutor implements Executor {
private final static class LimitingExecutor implements Executor {

private final Semaphore _limiter;

Expand Down

0 comments on commit 51ff79b

Please sign in to comment.