Add cached receiver #303

Merged
merged 11 commits into from
Mar 30, 2018

sabeegrewal
Contributor

This PR adds the cached receiver and cleans up the client APIs.

The CachedEventHubsReceiver borrows a connection from the ClientConnectionPool and opens a receiver link. This receiver link will stay open until:

  • The Spark job ends, or
  • An executor is added or removed.

The average of the last five batch sizes is used as the prefetchCount.
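
The prefetch rule above can be sketched as a bounded window over recent batch sizes. This is only an illustration, not the PR's code: `PrefetchSketch` and `BatchWindow` are hypothetical names standing in for the PR's `FixedList`, and the fallback used before any batch has been recorded is an assumption.

```scala
import scala.collection.immutable.Queue

object PrefetchSketch {
  // Hypothetical stand-in for the PR's FixedList: keeps only the newest `capacity` values.
  final class BatchWindow(capacity: Int) {
    require(capacity > 0, "capacity must be positive")
    private var values = Queue.empty[Int]

    def add(batchSize: Int): Unit = {
      values = values.enqueue(batchSize)
      if (values.size > capacity) values = values.tail // drop the oldest entry
    }

    // Integer average of the retained batch sizes; `fallback` (assumed) is returned when empty.
    def average(fallback: Int): Int =
      if (values.isEmpty) fallback else values.sum / values.size
  }

  def main(args: Array[String]): Unit = {
    val window = new BatchWindow(5)
    Seq(100, 200, 300, 400, 500, 600).foreach(window.add)
    // Only the five newest sizes (200 to 600) remain in the window.
    println(window.average(fallback = 500))
  }
}
```

Averaging smooths out a single unusually small or large batch, at the cost of reacting more slowly when the batch size changes for good.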

Close #259

@sabeegrewal changed the title from "Cached receiver" to "Add cached receiver" on Mar 29, 2018
@sabeegrewal added this to the 2.3.2 milestone on Mar 30, 2018
Contributor

@SreeramGarlapati left a comment

I am excited to see cachedReceiver going live! You can run Spark jobs at full speed! Please make these few changes. I wanted to just click approve, but I also know @sjkwak will merge if I do :).

val prefetchCounts = new FixedList[Int](5)

private var _client: EventHubClient = _
private def client: EventHubClient = {
Contributor

synchronized

Contributor Author

Why? There should only be one thread in this path, right?

Contributor

my bad

val consumerGroup = ehConf.consumerGroup.getOrElse(DefaultConsumerGroup)
val receiverOptions = new ReceiverOptions
receiverOptions.setReceiverRuntimeMetricEnabled(false)
receiverOptions.setIdentifier(s"${SparkEnv.get.executorId}-${TaskContext.get.taskAttemptId}")
Contributor

If the executorId is something like a GUID and doesn't indicate that it belongs to a Spark job, consider adding a prefix hint such as "spark-.." to show that this receiver comes from a Spark job.
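
A minimal sketch of the suggested prefix, assuming the identifier keeps the executor id and task attempt id used in the snippet above. `ReceiverIdSketch` and `receiverIdentifier` are hypothetical names, not part of the PR.

```scala
object ReceiverIdSketch {
  // Hypothetical helper: the "spark-" prefix marks the receiver as coming from a Spark job.
  def receiverIdentifier(executorId: String, taskAttemptId: Long): String =
    s"spark-$executorId-$taskAttemptId"
}
```

In the PR's code the two arguments would be `SparkEnv.get.executorId` and `TaskContext.get.taskAttemptId`.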

_receiver.setPrefetchCount(DefaultPrefetchCount)
}

private[this] var _receiver: PartitionReceiver = _
Contributor

So CachedEventHubsReceiver cannot be used unless createReceiver is invoked.
Can we use a factory pattern here instead?
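
One way to read the factory suggestion, sketched with hypothetical types (`FactorySketch`, `Link`, and `CachedReceiver` are illustrative names, not the PR's classes): the constructor is private and the companion's `apply` performs the setup, so callers can never hold an instance whose link was not created.

```scala
object FactorySketch {
  // Hypothetical stand-in for an open receiver link.
  final class Link(val partition: Int)

  // Private constructor: an instance cannot exist without a link.
  final class CachedReceiver private (val link: Link)

  object CachedReceiver {
    // The factory does the setup that a separate createReceiver call would do,
    // then returns a ready-to-use instance.
    def apply(partition: Int): CachedReceiver =
      new CachedReceiver(new Link(partition))
  }
}
```

With this shape, `FactorySketch.CachedReceiver(3)` is the only way to get a receiver, and it is usable immediately.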

nAndP: NameAndPartition,
requestSeqNo: SequenceNumber,
batchSize: Int): EventData = {
receivers.synchronized {
Contributor

We need something like this to keep your getters on the map from conflicting with setters (which may trigger a rehash, etc.).

val receiver: PartitionReceiver = receivers.synchronized { // a val bound once from the block's result: read-only, like a `final` in Java
  if (!isInit(nAndP)) {
    receivers.update(nAndP, CreateEHReceiver())
  }
  get(nAndP) // <- the lookup happens inside the synchronized block
}
receiver.receive(...) // <- the receive call happens outside the lock

event
}

private def logAndUpdatePrefetch(batchSize: Int): Unit = {
Contributor

Let's update the prefetch to the latest batch size to keep the code simple and the behavior predictable.


type MutableMap[A, B] = scala.collection.mutable.HashMap[A, B]

private[this] val receivers = new MutableMap[NameAndPartition, CachedEventHubsReceiver]()
Contributor Author

Key should include namespace

Contributor Author

Will address in #305
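
To illustrate why the key matters, here is a small sketch. The shape of `NameAndPartition` is assumed from its name, and `ReceiverKey` is a hypothetical wider key, not something defined in this PR or in #305.

```scala
object ReceiverKeySketch {
  // Assumed shape of the existing key: event hub name plus partition.
  final case class NameAndPartition(ehName: String, partitionId: Int)

  // Hypothetical wider key that also carries the namespace.
  final case class ReceiverKey(namespace: String, ehName: String, partitionId: Int)

  def main(args: Array[String]): Unit = {
    // Same hub name and partition, but in two different namespaces.
    val narrow = Set(NameAndPartition("orders", 0), NameAndPartition("orders", 0))
    val wide = Set(ReceiverKey("ns-a", "orders", 0), ReceiverKey("ns-b", "orders", 0))
    println(s"${narrow.size} cache entry without the namespace, ${wide.size} with it")
  }
}
```

Without the namespace, two event hubs that share a name would map to the same cache entry and share one receiver.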

}

private def get(nAndP: NameAndPartition): CachedEventHubsReceiver = receivers.synchronized {
logInfo(s"lookup on $nAndP")
Contributor

Is this logged on every receive call?

nAndP: NameAndPartition,
requestSeqNo: SequenceNumber,
batchSize: Int): EventData = {
receivers.synchronized {
Contributor

Nit: feel free to ignore.
One small idea after our discussion:

val receiver = createIfNotExist(...) // <- this folds the isInitialized check, receivers.update, and get into one synchronized block
receiver.receive()

Contributor

  override def receive(ehConf: EventHubsConf,
                       nAndP: NameAndPartition,
                       requestSeqNo: SequenceNumber,
                       batchSize: Int): EventData = {
    val receiver = createCachedReceiverIfNotExist(ehConf, nAndP, requestSeqNo)
    receiver.receive(requestSeqNo, batchSize)
  }

  def createCachedReceiverIfNotExist(ehConf: EventHubsConf,
                                     nAndP: NameAndPartition,
                                     requestSeqNo: SequenceNumber): CachedEventHubsReceiver = {
    // The synchronized block's result is the receiver, so no null-initialized var is needed.
    receivers.synchronized {
      receivers.getOrElseUpdate(nAndP, new CachedEventHubsReceiver(ehConf, nAndP)) // take care to initialize this lazily
    }
  }
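
The "initialize this lazily" note can be illustrated with a self-contained sketch. Everything here is hypothetical (`LazyCacheSketch`, `Receiver`, the `linksOpened` counter); it only shows two standard Scala mechanisms: the by-name second argument of `getOrElseUpdate`, and a `lazy val` that defers the expensive step to first use.

```scala
import scala.collection.mutable

object LazyCacheSketch {
  // Counts how many hypothetical receiver links were actually opened.
  var linksOpened = 0

  final class Receiver(val key: String) {
    // `lazy val` defers the expensive step until the first receive call.
    private lazy val link: String = { linksOpened += 1; s"link-$key" }
    def receive(): String = link
  }

  private val receivers = mutable.HashMap.empty[String, Receiver]

  // Lookup and insert happen under one lock. The second argument of
  // getOrElseUpdate is by-name, so `new Receiver` runs only on a cache miss.
  def createIfNotExist(key: String): Receiver = receivers.synchronized {
    receivers.getOrElseUpdate(key, new Receiver(key))
  }
}
```

Calling `createIfNotExist` twice with the same key returns the same instance, and no link is opened until `receive()` is first called, which keeps the time spent holding the map's lock short.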

@sabeegrewal merged commit 79cbf99 into Azure:master on Mar 30, 2018