Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix future timeout issue #419

Merged
merged 2 commits into from
Jan 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import org.apache.spark.{ SparkEnv, TaskContext }
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{ Await, Future }
import scala.util.Success

private[spark] trait CachedReceiver {
private[eventhubs] def receive(ehConf: EventHubsConf,
Expand Down Expand Up @@ -63,12 +62,11 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf,
private var receiver: Future[PartitionReceiver] = createReceiver(startSeqNo)

private def createReceiver(seqNo: SequenceNumber): Future[PartitionReceiver] = {
logInfo(s"creating receiver for Event Hub ${nAndP.ehName} on partition ${nAndP.partitionId}")
logInfo(
s"creating receiver for Event Hub ${nAndP.ehName} on partition ${nAndP.partitionId}. seqNo: $seqNo")
val consumerGroup = ehConf.consumerGroup.getOrElse(DefaultConsumerGroup)
val prefetchCount =
Math.min(ehConf.prefetchCount.getOrElse(DefaultPrefetchCount), PrefetchCountMaximum)
val receiverOptions = new ReceiverOptions
receiverOptions.setReceiverRuntimeMetricEnabled(false)
receiverOptions.setReceiverRuntimeMetricEnabled(true)
receiverOptions.setIdentifier(
s"spark-${SparkEnv.get.executorId}-${TaskContext.get.taskAttemptId}")
val epochReceiver = retryJava(
Expand All @@ -79,13 +77,20 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf,
receiverOptions),
"CachedReceiver creation."
)
epochReceiver.onComplete {
case Success(x) => x.setPrefetchCount(prefetchCount)
case _ =>
}
epochReceiver
}

/**
 * Returns the sequence number of the last event received by the cached
 * receiver, or -1 if the receiver has not received anything yet (its
 * runtime event position carries a null sequence number).
 *
 * NOTE(review): relies on receiver runtime metrics being enabled on the
 * underlying PartitionReceiver — confirm setReceiverRuntimeMetricEnabled(true)
 * is set where the receiver is created.
 *
 * @return a Future completing with the last received sequence number, or -1L.
 */
private def lastReceivedOffset(): Future[Long] = {
  receiver.map { r =>
    // Read the (possibly null) java.lang.Long once so the null check and the
    // returned value cannot observe two different snapshots of the position.
    val seqNo = r.getEventPosition.getSequenceNumber
    if (seqNo != null) seqNo else -1L
  }
}

private def receiveOne(msg: String): Future[Iterable[EventData]] = {
receiver
.flatMap { r =>
Expand All @@ -97,6 +102,15 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf,
}

private def checkCursor(requestSeqNo: SequenceNumber): Future[Iterable[EventData]] = {
val lastReceivedSeqNo =
Await.result(lastReceivedOffset(), ehConf.internalOperationTimeout)

if (lastReceivedSeqNo > -1 && lastReceivedSeqNo + 1 != requestSeqNo) {
logInfo(
s"checkCursor. Recreating a receiver for $nAndP, ${ehConf.consumerGroup}. requestSeqNo: $requestSeqNo, lastReceivedSeqNo: $lastReceivedSeqNo")
receiver = createReceiver(requestSeqNo)
}

val event = Await.result(receiveOne("checkCursor initial"), ehConf.internalOperationTimeout)
val receivedSeqNo = event.head.getSystemProperties.getSequenceNumber

Expand All @@ -106,6 +120,8 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf,
// cursor is in the wrong spot.
// 2) Your desired event has expired from the service.
// First, we'll check for case (1).
logInfo(
s"checkCursor. Recreating a receiver for $nAndP, ${ehConf.consumerGroup}. requestSeqNo: $requestSeqNo, receivedSeqNo: $lastReceivedSeqNo")
receiver = createReceiver(requestSeqNo)
val movedEvent = Await.result(receiveOne("checkCursor move"), ehConf.internalOperationTimeout)
val movedSeqNo = movedEvent.head.getSystemProperties.getSequenceNumber
Expand Down Expand Up @@ -146,13 +162,6 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf,
e1.getSystemProperties.getSequenceNumber < e2.getSystemProperties.getSequenceNumber)
.iterator

val newPrefetchCount =
if (batchSize < PrefetchCountMinimum) PrefetchCountMinimum
else Math.min(batchSize * 2, PrefetchCountMaximum)
receiver.onComplete {
case Success(r) => r.setPrefetchCount(newPrefetchCount)
case _ =>
}
val (result, validate) = sorted.duplicate
assert(validate.size == batchSize)
result
Expand All @@ -178,7 +187,8 @@ private[spark] object CachedEventHubsReceiver extends CachedReceiver with Loggin
nAndP: NameAndPartition,
requestSeqNo: SequenceNumber,
batchSize: Int): Iterator[EventData] = {
logInfo(s"EventHubsCachedReceiver look up. For $nAndP, ${ehConf.consumerGroup}")
logInfo(
s"EventHubsCachedReceiver look up. For $nAndP, ${ehConf.consumerGroup}. requestSeqNo: $requestSeqNo, batchSize: $batchSize")
var receiver: CachedEventHubsReceiver = null
receivers.synchronized {
receiver = receivers.getOrElseUpdate(key(ehConf, nAndP), {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.spark.eventhubs.client

import java.util.concurrent.{ ConcurrentLinkedQueue, ExecutorService, Executors }
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ ConcurrentLinkedQueue, Executors, ScheduledExecutorService }

import com.microsoft.azure.eventhubs.EventHubClient
import org.apache.spark.eventhubs._
Expand Down Expand Up @@ -147,5 +147,5 @@ object ClientConnectionPool extends Logging {
* Threads are created on demand if none are available.
*/
object ClientThreadPool {
  // Shared scheduled pool used for Event Hubs client operations. A fixed-size
  // ScheduledExecutorService (rather than an unbounded cached pool) bounds
  // thread growth and supports the scheduled retries the client requires.
  // The span as scraped contained both the old and new `val pool` lines from
  // the diff; this is the merged (post-change) definition.
  val pool: ScheduledExecutorService = Executors.newScheduledThreadPool(DefaultThreadPoolSize)
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
yield
getRunTimeInfoF(i) map { r =>
val earliest =
if (r.getBeginSequenceNumber == -1L) 0L else r.getBeginSequenceNumber
if (r.getBeginSequenceNumber == -1L) 0L
else {
if (r.getIsEmpty) r.getLastEnqueuedSequenceNumber + 1 else r.getBeginSequenceNumber
}
val latest = r.getLastEnqueuedSequenceNumber + 1
i -> (earliest, latest)
}
Expand All @@ -134,7 +137,8 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
*/
private def earliestSeqNoF(partition: PartitionId): Future[SequenceNumber] = {
getRunTimeInfoF(partition).map { r =>
val seqNo = r.getBeginSequenceNumber
val seqNo =
if (r.getIsEmpty) r.getLastEnqueuedSequenceNumber + 1 else r.getBeginSequenceNumber
if (seqNo == -1L) 0L else seqNo
}
}
Expand Down Expand Up @@ -238,20 +242,31 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
case StartOfStream => (nAndP.partitionId, earliestSeqNoF(nAndP.partitionId))
case EndOfStream => (nAndP.partitionId, latestSeqNoF(nAndP.partitionId))
case _ =>
val receiver = retryJava(client.createEpochReceiver(consumerGroup,
nAndP.partitionId.toString,
pos.convert,
DefaultEpoch),
"translate: epoch receiver creation.")
val seqNo = receiver
.flatMap { r =>
retryNotNull(r.receive(1), "translate: receive call")
}
.map { e =>
e.iterator.next.getSystemProperties.getSequenceNumber
val runtimeInfo =
Await.result(getRunTimeInfoF(nAndP.partitionId), ehConf.internalOperationTimeout)
val seqNo =
if (runtimeInfo.getIsEmpty || (pos.enqueuedTime != null &&
runtimeInfo.getLastEnqueuedTimeUtc.isBefore(pos.enqueuedTime.toInstant))) {
Future.successful(runtimeInfo.getLastEnqueuedSequenceNumber + 1)
} else {
logInfo(
s"translate: creating receiver for Event Hub ${nAndP.ehName} on partition ${nAndP.partitionId}. filter: ${pos.convert}")
val receiver = retryJava(client.createEpochReceiver(consumerGroup,
nAndP.partitionId.toString,
pos.convert,
DefaultEpoch),
"translate: epoch receiver creation.")
receiver
.flatMap { r =>
retryNotNull(r.receive(1), "translate: receive call")
}
.map { e =>
e.iterator.next.getSystemProperties.getSequenceNumber
}
}
(nAndP.partitionId, seqNo)
}

val future = Future
.traverse(futures) {
case (p, f) =>
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/eventhubs/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ package object eventhubs {
val DefaultFailOnDataLoss = "true"
val DefaultUseSimulatedClient = "false"
val StartingSequenceNumber = 0L
val DefaultThreadPoolSize = 16
val DefaultEpoch = 0L
val RetryCount = 3

Expand Down Expand Up @@ -79,4 +80,5 @@ package object eventhubs {

def toSequenceNumber: SequenceNumber = str.toLong
}

}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>1.2.1</version>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
Expand Down