Skip to content

Commit

Permalink
Fix setReceiveTimeout to honor milliseconds
Browse files Browse the repository at this point in the history
  • Loading branch information
unknown committed May 24, 2016
1 parent 3cc3816 commit 7642d71
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void setReceiveTimeout(Duration value)
*/
public final void setPrefetchCount(final int prefetchCount)
{
if (prefetchCount < PartitionReceiver.MINIMUM_PREFETCH_COUNT && prefetchCount > PartitionReceiver.MAXIMUM_PREFETCH_COUNT)
if (prefetchCount < PartitionReceiver.MINIMUM_PREFETCH_COUNT || prefetchCount > PartitionReceiver.MAXIMUM_PREFETCH_COUNT)
{
throw new IllegalArgumentException(String.format(Locale.US,
"PrefetchCount has to be between %s and %s", PartitionReceiver.MINIMUM_PREFETCH_COUNT, PartitionReceiver.MAXIMUM_PREFETCH_COUNT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
public class MessageReceiver extends ClientEntity implements IAmqpReceiver, IErrorContextProvider
{
private static final Logger TRACE_LOGGER = Logger.getLogger(ClientConstants.SERVICEBUS_CLIENT_TRACE);

private static final int MIN_TIMEOUT_DURATION_MILLIS = 20;

private final ConcurrentLinkedQueue<ReceiveWorkItem> pendingReceives;
private final MessagingFactory underlyingFactory;
private final ITimeoutErrorHandler stuckTransportHandler;
Expand Down Expand Up @@ -126,7 +127,7 @@ public void run()
boolean workItemTimedout = false;
while((topWorkItem = MessageReceiver.this.pendingReceives.peek()) != null)
{
if (topWorkItem.getTimeoutTracker().remaining().getSeconds() <= 0)
if (topWorkItem.getTimeoutTracker().remaining().toMillis() <= MessageReceiver.MIN_TIMEOUT_DURATION_MILLIS)
{
WorkItem<Collection<Message>> dequedWorkItem = MessageReceiver.this.pendingReceives.poll();
if (dequedWorkItem != null)
Expand Down Expand Up @@ -518,7 +519,7 @@ private void sendFlow(final int credits)
if (this.nextCreditToFlow >= this.prefetchCount)
{
tempFlow = this.nextCreditToFlow;
this.receiveLink.flow(this.prefetchCount);
this.receiveLink.flow(tempFlow);
this.nextCreditToFlow = 0;
}
}
Expand All @@ -528,7 +529,7 @@ private void sendFlow(final int credits)
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE, String.format("receiverPath[%s], linkname[%s], updated-link-credit[%s], sentCredits[%s]",
this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), this.prefetchCount));
this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), tempFlow));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static void schedule(Runnable runnable, Duration runFrequency, TimerType
break;

case RepeatRun:
executor.scheduleWithFixedDelay(runnable, runFrequency.getSeconds(), runFrequency.toMillis(), TimeUnit.MILLISECONDS);
executor.scheduleWithFixedDelay(runnable, runFrequency.toMillis(), runFrequency.toMillis(), TimeUnit.MILLISECONDS);
break;

default:
Expand Down

0 comments on commit 7642d71

Please sign in to comment.