Skip to content

Commit

Permalink
Remover: mode is determined only when all messages have been removed …
Browse files Browse the repository at this point in the history
…and added to the batch
  • Loading branch information
belaban committed Sep 3, 2024
1 parent 65ccf71 commit fd5d45d
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 41 deletions.
10 changes: 4 additions & 6 deletions src/org/jgroups/protocols/ReliableMulticast.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public abstract class ReliableMulticast extends Protocol implements DiagnosticsH
protected static final Predicate<Message> remove_filter=m -> m != null
&& (m.isFlagSet(DONT_LOOPBACK) || m == DUMMY_OOB_MSG || m.isFlagSet(OOB_DELIVERED));

protected static final BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR=MessageBatch::add;
protected static final BiConsumer<MessageBatch,Message> BATCH_ACCUMULATOR=(mb, m) -> mb.add(m, true, false);

protected final Function<Message,Long> SEQNO_GETTER= m -> {
NakAckHeader hdr=m != null? m.getHeader(id) : null;
Expand Down Expand Up @@ -832,18 +832,16 @@ protected void removeAndDeliver(Buffer<Message> win, Entry e, Address sender, bo
// Don't include DUMMY and OOB_DELIVERED messages in the removed set
mb=win.removeMany(remove_msgs, max_batch_size, no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs,
batch_creator, BATCH_ACCUMULATOR);
batch.determineMode();
}
catch(Throwable t) {
log.error("failed removing messages from table for " + sender, t);
}
int size=batch.size();
if(size > 0) {
if(stats) {
synchronized(avg_batch_size) {
avg_batch_size.add(size);
}
}
deliverBatch(batch, e);
if(stats)
avg_batch_size.add(size);
}
}
while(mb != null || adders.decrementAndGet() != 0);
Expand Down
7 changes: 2 additions & 5 deletions src/org/jgroups/protocols/UNICAST3.java
Original file line number Diff line number Diff line change
Expand Up @@ -1164,12 +1164,9 @@ protected void deliverBatch(MessageBatch batch) {
sb.append(" (" + batch.size()).append(" messages)");
log.trace(sb);
}
if(stats) {
synchronized(avg_delivery_batch_size) { // can be accessed by different threads concurrently
avg_delivery_batch_size.add(batch.size());
}
}
up_prot.up(batch);
if(stats)
avg_delivery_batch_size.add(batch.size());
}
catch(Throwable t) {
log.warn(Util.getMessage("FailedToDeliverMsg"), local_addr, "batch", batch, t);
Expand Down
8 changes: 2 additions & 6 deletions src/org/jgroups/protocols/pbcast/NAKACK2.java
Original file line number Diff line number Diff line change
Expand Up @@ -905,13 +905,9 @@ protected void removeAndDeliver(Table<Message> buf, Address sender, boolean loop
}
int size=batch.size();
if(size > 0) {
if(stats) {
synchronized(avg_batch_size) {
// accessed by multiple threads concurrently
avg_batch_size.add(size);
}
}
deliverBatch(batch);
if(stats)
avg_batch_size.add(size);
}
}
while(mb != null || adders.decrementAndGet() != 0);
Expand Down
25 changes: 18 additions & 7 deletions src/org/jgroups/util/Average.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static org.jgroups.util.Util.printTime;

Expand All @@ -21,9 +22,10 @@
public class Average implements Streamable {
protected final DoubleAdder total=new DoubleAdder();
protected AtomicReferenceArray<Double> samples;
protected final AtomicInteger index=new AtomicInteger(-1);
protected int index=-1;
protected final LongAdder count=new LongAdder();
protected TimeUnit unit;
protected final Lock lock=new ReentrantLock(); // for nextIndex()

public Average() {
this(128);
Expand Down Expand Up @@ -59,7 +61,7 @@ public <T extends Average> T merge(T other) {
public <T extends Average> T unit(TimeUnit u) {this.unit=u; return (T)this;}

public double average() {
int len=(count.sum() < samples.length()? index.get()+1 : samples.length());
int len=(count.sum() < samples.length()? index+1 : samples.length());
if(len == 0)
return 0.0;
return total.sum() / len;
Expand Down Expand Up @@ -100,12 +102,21 @@ public void readFrom(DataInput in) throws IOException {
}

protected int nextIndex() {
return index.accumulateAndGet(1, (l, __) -> {
if(l+1 >= samples.length()) {
lock.lock();
try {
if(++index >= samples.length())
index=0;
return index;
}
finally {
lock.unlock();
}
// using a lock is orders of magnitude faster than AtomicInteger.accumulateAndGet()!
/*return index.accumulateAndGet(1, (l, __) -> {
if(l+1 >= samples.length())
return 0;
}
else
return l+1;
});
});*/
}
}
32 changes: 18 additions & 14 deletions src/org/jgroups/util/MessageBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* @since 3.3
*/
public class MessageBatch implements Iterable<Message> {
public enum Mode {OOB, REG}

/** The destination address. Null if this is a multicast message batch, non-null if the batch is sent to a specific member */
protected Address dest;
Expand Down Expand Up @@ -114,7 +115,7 @@ public <T extends Message> T last() {
}

public MessageBatch add(final Message msg) {
add(msg, true);
add(msg, true, true);
return this;
}

Expand All @@ -124,8 +125,17 @@ public MessageBatch add(final Message msg) {
* @return always 1 if resize==true, else 1 if the message was added or 0 if not
*/
public int add(final Message msg, boolean resize) {
return add(msg, resize, true);
}

/** Adds a message to the table
* @param msg the message
* @param resize whether or not to resize the table. If true, the method will always return 1
* @return always 1 if resize==true, else 1 if the message was added or 0 if not
*/
public int add(final Message msg, boolean resize, boolean determine_mode) {
int added=messages.add(msg, resize);
if(added > 0)
if(added > 0 && determine_mode)
determineMode();
return added;
}
Expand Down Expand Up @@ -219,7 +229,12 @@ public boolean anyMatch(Predicate<Message> pred) {
return messages.anyMatch(pred);
}


public MessageBatch determineMode() {
if(mode != null || messages.isEmpty())
return this;
Message first=messages.get(0);
return mode(first.isFlagSet(Message.Flag.OOB)? Mode.OOB : Mode.REG);
}

/** Returns the number of non-null messages */
public int size() {
Expand All @@ -230,7 +245,6 @@ public boolean isEmpty() {
return messages.isEmpty();
}


/** Returns the size of the message batch (by calling {@link Message#size()} on all messages) */
public long totalSize() {
long retval=0;
Expand Down Expand Up @@ -303,14 +317,4 @@ public String printHeaders() {
}



public enum Mode {OOB, REG}

protected MessageBatch determineMode() {
if(mode != null || messages.isEmpty())
return this;
Message first=messages.get(0);
return mode(first.isFlagSet(Message.Flag.OOB)? Mode.OOB : Mode.REG);
}

}
4 changes: 1 addition & 3 deletions src/org/jgroups/util/Profiler.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ public void stop() {
Long start=threads.remove(curr_thread);
if(start != null) {
double time=System.nanoTime() - start;
synchronized(avg) {
avg.add(time);
}
avg.add(time);
}
}

Expand Down

0 comments on commit fd5d45d

Please sign in to comment.