Skip to content

Commit

Permalink
fix ReactorKafkaSender map ulimit size
Browse files Browse the repository at this point in the history
  • Loading branch information
wooEnrico committed May 9, 2024
1 parent e11e0cf commit 0da638e
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 24 deletions.
8 changes: 8 additions & 0 deletions kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@
<version>1.3.19</version>
<scope>compile</scope>
</dependency>
<!-- https://github.com/ben-manes/caffeine-->
<!-- For Java 11 or above, use 3.x otherwise use 2.x-->
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
<scope>compile</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package io.github.wooernico.kafka.sender;

import com.github.benmanes.caffeine.cache.*;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -12,19 +15,19 @@
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public abstract class ReactorKafkaSender<K, V, T> implements Disposable {

private static final Logger log = LoggerFactory.getLogger(DefaultKafkaProducer.class);
private static final Logger log = LoggerFactory.getLogger(ReactorKafkaSender.class);

protected final SenderProperties properties;
protected final Serializer<K> keySerializer;
protected final Serializer<V> valueSerializer;

private final ConcurrentHashMap<Thread, SinksSendToKafkaSubscriber<K, V, T>> subscribeMap = new ConcurrentHashMap<>(512);
private final LoadingCache<Thread, SinksSendToKafkaSubscriber<K, V, T>> cache;
private final reactor.kafka.sender.KafkaSender<K, V> kafkaSender;
private final Consumer<SenderResult<T>> senderResultConsumer;

Expand All @@ -39,12 +42,41 @@ public ReactorKafkaSender(SenderProperties properties, Serializer<K> keySerializ
this.valueSerializer = valueSerializer;

this.kafkaSender = this.createKafkaSender(this.properties, this.keySerializer, this.valueSerializer);

this.cache = this.getLoadingCache();
}

/**
 * Builds the per-thread subscriber cache: entries expire after the configured
 * emit timeout of inactivity, total size is capped by sinksCacheSize, and every
 * evicted subscriber is disposed so its send pipeline terminates.
 *
 * @return a loading cache that lazily creates one subscriber per thread
 */
private LoadingCache<Thread, SinksSendToKafkaSubscriber<K, V, T>> getLoadingCache() {
    // Dispose evicted subscribers; eviction happens on size pressure or
    // access-timeout expiry, and on explicit invalidation.
    RemovalListener<Thread, SinksSendToKafkaSubscriber<K, V, T>> onRemoval =
            (thread, subscriber, cause) -> {
                log.debug("reactor kafka sinks remove for {}, {}, {}", thread, subscriber, cause);
                if (subscriber != null) {
                    subscriber.dispose();
                }
            };

    // Cache misses create a fresh sinks subscriber bound to the requesting thread.
    CacheLoader<Thread, SinksSendToKafkaSubscriber<K, V, T>> loader =
            thread -> newSinksSendToKafkaSubscriber(thread);

    return Caffeine.newBuilder()
            .expireAfterAccess(this.properties.getSinksEmitTimeout().toNanos(), TimeUnit.NANOSECONDS)
            .maximumSize(this.properties.getSinksCacheSize())
            .removalListener(onRemoval)
            .build(loader);
}

/**
 * Disposes every cached per-thread subscriber, clears the cache, and closes
 * the underlying Kafka sender. Safe to call once during shutdown.
 */
@Override
public void dispose() {
    // Dispose each subscriber explicitly before invalidating, so sinks are
    // completed even though the removal listener also disposes on eviction.
    this.cache.asMap().forEach((key, value) -> value.dispose());
    this.cache.invalidateAll();
    this.kafkaSender.close();
}

/**
Expand All @@ -64,7 +96,7 @@ public Mono<Void> send(ProducerRecord<K, V> producerRecord, T object) {
return Mono.empty();
}

log.warn("reactor kafka sinks emit fail for {}", emitResult);
log.debug("reactor kafka sinks emit fail for {}", emitResult);

return Mono.defer(() -> {
this.send(senderRecord).subscribe(senderResult -> {
Expand Down Expand Up @@ -98,8 +130,10 @@ public Flux<SenderResult<T>> send(Publisher<SenderRecord<K, V, T>> senderRecord)


private Sinks.EmitResult emitToSinks(SenderRecord<K, V, T> senderRecord) {

SinksSendToKafkaSubscriber<K, V, T> subscriber = getKvtSinksSendToKafkaSubscriber();
SinksSendToKafkaSubscriber<K, V, T> subscriber = this.cache.get(Thread.currentThread());
if (subscriber == null) {
return Sinks.EmitResult.FAIL_TERMINATED;
}

Sinks.EmitResult emitResult = subscriber.tryEmitNext(senderRecord);

Expand All @@ -110,22 +144,22 @@ private Sinks.EmitResult emitToSinks(SenderRecord<K, V, T> senderRecord) {
if (Sinks.EmitResult.FAIL_ZERO_SUBSCRIBER.equals(emitResult)
|| Sinks.EmitResult.FAIL_TERMINATED.equals(emitResult)
|| Sinks.EmitResult.FAIL_CANCELLED.equals(emitResult)) {
SinksSendToKafkaSubscriber<K, V, T> remove = this.subscribeMap.remove(Thread.currentThread());
if (remove != null) {
remove.dispose();
}
this.cache.invalidate(Thread.currentThread());
subscriber.dispose();
}

return emitResult;
}

private SinksSendToKafkaSubscriber<K, V, T> getKvtSinksSendToKafkaSubscriber() {
return this.subscribeMap.computeIfAbsent(Thread.currentThread(), thread -> {
LinkedBlockingQueue<SenderRecord<K, V, T>> queue = new LinkedBlockingQueue<>(properties.getQueueSize());
Sinks.Many<SenderRecord<K, V, T>> senderRecordMany = Sinks.many().unicast().onBackpressureBuffer(queue);
log.info("reactor kafka new sinks for {}, {}", Thread.currentThread().getName(), senderRecordMany.hashCode());
return new SinksSendToKafkaSubscriber<>(this.kafkaSender, senderRecordMany, this.senderResultConsumer);
});
private SinksSendToKafkaSubscriber<K, V, T> newSinksSendToKafkaSubscriber(Thread thread) {
if (thread == null) {
return null;
}
LinkedBlockingQueue<SenderRecord<K, V, T>> queue = new LinkedBlockingQueue<>(properties.getQueueSize());
Sinks.Many<SenderRecord<K, V, T>> senderRecordMany = Sinks.many().unicast().onBackpressureBuffer(queue);
SinksSendToKafkaSubscriber<K, V, T> kvtSinksSendToKafkaSubscriber = new SinksSendToKafkaSubscriber<>(this.kafkaSender, senderRecordMany, this.senderResultConsumer);
log.debug("reactor kafka new sinks for {}, {}", thread, kvtSinksSendToKafkaSubscriber);
return kvtSinksSendToKafkaSubscriber;
}

private reactor.kafka.sender.KafkaSender<K, V> createKafkaSender(SenderProperties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
Expand All @@ -139,30 +173,36 @@ private reactor.kafka.sender.KafkaSender<K, V> createKafkaSender(SenderPropertie
}

static class SinksSendToKafkaSubscriber<K, V, T> implements Disposable {
private final reactor.kafka.sender.KafkaSender<K, V> kafkaSender;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Sinks.Many<SenderRecord<K, V, T>> sinks;
private final Consumer<SenderResult<T>> senderResultConsumer;
private final Disposable subscriber;

/**
 * Subscribes the given sinks flux to the Kafka sender so emitted records are
 * sent asynchronously; each send result is forwarded to the optional consumer.
 *
 * @param kafkaSender          shared sender used to publish the sinks flux
 * @param sinks                per-thread unicast sinks carrying records to send
 * @param senderResultConsumer optional callback for send results; may be null
 */
public SinksSendToKafkaSubscriber(KafkaSender<K, V> kafkaSender, Sinks.Many<SenderRecord<K, V, T>> sinks, Consumer<SenderResult<T>> senderResultConsumer) {
    this.sinks = sinks;
    this.senderResultConsumer = senderResultConsumer;

    // The sender is not retained as a field; only the resulting subscription
    // is kept so dispose() can cancel it.
    this.subscriber = kafkaSender.send(this.sinks.asFlux()).subscribe(senderResult -> {
        if (this.senderResultConsumer != null) {
            this.senderResultConsumer.accept(senderResult);
        }
    });
}

/**
 * Emits a record into the sinks, unless this subscriber has been disposed,
 * in which case FAIL_TERMINATED is returned without touching the sinks.
 *
 * @param senderRecord the record to emit
 * @return the sinks emit result, or FAIL_TERMINATED when already closed
 */
public Sinks.EmitResult tryEmitNext(SenderRecord<K, V, T> senderRecord) {
    return this.closed.get()
            ? Sinks.EmitResult.FAIL_TERMINATED
            : this.sinks.tryEmitNext(senderRecord);
}

@Override
public void dispose() {
if (this.subscriber != null && !this.subscriber.isDisposed()) {
if (!this.closed.compareAndSet(false, true)) {
return;
}

Sinks.EmitResult emitResult = this.sinks.tryEmitComplete();
if (emitResult.isFailure() && this.subscriber != null && !this.subscriber.isDisposed()) {
this.subscriber.dispose();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ public class SenderProperties {

private int queueSize = 100;

/**
 * When no data has been emitted for this duration, the cached sinks entry is removed.
 */
private Duration sinksEmitTimeout = Duration.ofSeconds(60);

/**
* sinks cache size
*/
private long sinksCacheSize = 100L;

/**
 * Sinks cache clean-up period (not referenced in the visible code — confirm intended usage).
 */
private Duration sinksCacheClearUp = Duration.ofHours(1);

public Boolean getEnabled() {
return enabled;
}
Expand Down Expand Up @@ -59,6 +74,34 @@ public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
}

/**
 * Returns the backing defaultProperties value.
 *
 * @return the default properties
 */
public Properties getDefaultProperties() {
    return this.defaultProperties;
}

/**
 * Returns the idle timeout after which a cached sinks entry expires.
 *
 * @return the sinks emit timeout
 */
public Duration getSinksEmitTimeout() {
    return this.sinksEmitTimeout;
}

/**
 * Sets the idle timeout after which a cached sinks entry expires.
 *
 * @param sinksEmitTimeout the expiry timeout for per-thread sinks
 */
public void setSinksEmitTimeout(Duration sinksEmitTimeout) {
    this.sinksEmitTimeout = sinksEmitTimeout;
}

/**
 * Returns the maximum number of entries allowed in the sinks cache.
 *
 * @return the sinks cache size cap
 */
public long getSinksCacheSize() {
    return this.sinksCacheSize;
}

/**
 * Sets the maximum number of entries allowed in the sinks cache.
 *
 * @param sinksCacheSize the sinks cache size cap
 */
public void setSinksCacheSize(long sinksCacheSize) {
    this.sinksCacheSize = sinksCacheSize;
}

/**
 * Returns the sinks cache clean-up period.
 * NOTE(review): this property is not referenced in the visible code — confirm usage.
 *
 * @return the clean-up period
 */
public Duration getSinksCacheClearUp() {
    return this.sinksCacheClearUp;
}

/**
 * Sets the sinks cache clean-up period.
 * NOTE(review): this property is not referenced in the visible code — confirm usage.
 *
 * @param sinksCacheClearUp the clean-up period
 */
public void setSinksCacheClearUp(Duration sinksCacheClearUp) {
    this.sinksCacheClearUp = sinksCacheClearUp;
}

@Override
public String toString() {
return "SenderProperties{" +
Expand Down
1 change: 1 addition & 0 deletions kafka/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,6 @@
</root>

<logger name="com.springboot.thread.AutoRunTemplate" level="ERROR"/>
<logger name="io.github.wooernico.kafka.sender.ReactorKafkaSender" level="DEBUG"/>
</configuration>

0 comments on commit 0da638e

Please sign in to comment.