Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

processor-health sample in chapter 13 blocks the I/O thread #230

Closed
rafaeltuelho opened this issue Feb 22, 2023 · 0 comments
Closed

processor-health sample in chapter 13 blocks the I/O thread #230

rafaeltuelho opened this issue Feb 22, 2023 · 0 comments

Comments

@rafaeltuelho
Copy link

rafaeltuelho commented Feb 22, 2023

At

the I/O therad gets blocked leading to

2023-02-21 21:31:32,456 WARN  [io.ver.cor.imp.BlockedThreadChecker] (vertx-blocked-thread-checker) Thread Thread[vert.x-eventloop-thread-2,5,main] has been blocked for 345343 ms, time limit is 2000 ms: io.vertx.core.VertxException: Thread blocked
        at java.base@11.0.18/jdk.internal.misc.Unsafe.park(Native Method)
        at java.base@11.0.18/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
        at java.base@11.0.18/java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1796)
        at java.base@11.0.18/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3128)
        at java.base@11.0.18/java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1823)
        at java.base@11.0.18/java.util.concurrent.CompletableFuture.join(CompletableFuture.java:2043)
        at io.smallrye.context.CompletableFutureWrapper.join(CompletableFutureWrapper.java:166)
        at org.acme.Processor.process(Processor.java:30)
...

I had to refactor it to something like

  @Incoming("ticks")
  @Outgoing("processed")
  @Acknowledgment(Acknowledgment.Strategy.MANUAL)
  CompletionStage<Message<String>> process(Message<Long> message) throws Exception {
    if (count++ % 8 == 0) {
      message.nack(new Throwable("Random failure to process a record.")).toCompletableFuture().join();
      return null;
    }

    return message.ack().thenApply(s -> {
      String value = String.valueOf(message.getPayload());
      try {
        value += " consumed in pod (" + InetAddress.getLocalHost().getHostName() + ")";
      } catch (UnknownHostException e) {
        throw new RuntimeException(e);
      }

      return message.withPayload(value);
    });
  }

to make it non-blocking and get the intended behavior described in the book (page 255).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant