Add cached receiver #303
Conversation
I'm excited to see cachedReceiver going live! You can run Spark jobs at full speed! Please fix these few changes. I wanted to just click approve, but I also know @sjkwak will merge if I do :).
val prefetchCounts = new FixedList[Int](5)

private var _client: EventHubClient = _
private def client: EventHubClient = {
synchronized
Why? There should only be one thread in this path, right?
my bad
val consumerGroup = ehConf.consumerGroup.getOrElse(DefaultConsumerGroup)
val receiverOptions = new ReceiverOptions
receiverOptions.setReceiverRuntimeMetricEnabled(false)
receiverOptions.setIdentifier(s"${SparkEnv.get.executorId}-${TaskContext.get.taskAttemptId}")
If executorId is something like a GUID and doesn't indicate that it comes from a Spark job, consider adding a prefix hint like "spark-..." to make clear that this receiver belongs to a Spark job.
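A minimal sketch of that suggestion. Here `executorId` and `taskAttemptId` are stand-ins for `SparkEnv.get.executorId` and `TaskContext.get.taskAttemptId`; the "spark-" prefix marks the receiver as coming from a Spark job.

```scala
// Stand-in for the setIdentifier argument: prefix the executor/task IDs
// with "spark-" so the receiver's origin is visible on the service side.
def receiverIdentifier(executorId: String, taskAttemptId: Long): String =
  s"spark-$executorId-$taskAttemptId"
```

The result would then be passed to `receiverOptions.setIdentifier(...)` as before.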
_receiver.setPrefetchCount(DefaultPrefetchCount)
}

private[this] var _receiver: PartitionReceiver = _
So CachedEventHubsReceiver cannot be used unless createReceiver is invoked. Can we use the factory pattern here instead?
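A hypothetical sketch of that factory-pattern idea: make the constructor private so callers can only obtain an instance that has already gone through receiver creation in `apply()`. All types here are stand-ins for the real EventHubClient/PartitionReceiver plumbing.

```scala
// The private constructor means no half-initialized instance can escape.
class CachedReceiver private (receiverId: String) {
  def receive(batchSize: Int): String = s"$receiverId received $batchSize events"
}

object CachedReceiver {
  // Plays the role of createReceiver: the receiver is fully set up
  // before the instance is handed to the caller.
  def apply(partitionId: Int): CachedReceiver =
    new CachedReceiver(s"receiver-$partitionId")
}
```

This removes the "must call createReceiver first" temporal coupling from the API.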
nAndP: NameAndPartition,
requestSeqNo: SequenceNumber,
batchSize: Int): EventData = {
receivers.synchronized {
We need something like this, to avoid your getters on the map conflicting with setters (which may perform a rehash, etc.):

receivers.synchronized {
  // `val` marks the receiver read-only, like `final` in Java: single assignment
  if (!isInit(nAndP)) {
    receivers.update(nAndP, CreateEHReceiver())
  }
  val receiver = get(nAndP) // <- lookup inside the synchronized block
  receiver
}.receive(...)
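A compilable sketch of that locking pattern, with stand-in types: the lookup-or-insert happens inside a single synchronized block (so a get can never race with an insert that triggers a rehash), while the slow receive call runs outside the lock.

```scala
import scala.collection.mutable

object ReceiverCache {
  private val receivers = mutable.HashMap.empty[String, String]

  def receive(partition: String, batchSize: Int): String = {
    // Lookup-or-insert in one critical section.
    val receiver = receivers.synchronized {
      receivers.getOrElseUpdate(partition, s"receiver-for-$partition")
    }
    // The actual receive happens outside the lock.
    s"$receiver receives $batchSize events"
  }
}
```

`getOrElseUpdate` collapses the isInit/update/get sequence into one call, which is roughly what the code suggestion later in this thread does.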
event
}
private def logAndUpdatePrefetch(batchSize: Int): Unit = { |
Let's update the prefetch count to the latest batch size, to keep the code simple and the behavior predictable.
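A hypothetical sketch of that simplification: instead of averaging recent batch sizes, set the prefetch count directly to the latest batch size. The names and the default value here are stand-ins, not the library's API.

```scala
object PrefetchTracker {
  private var prefetchCount: Int = 500 // assumed stand-in default

  // The latest batch size becomes the prefetch count; no averaging.
  def updatePrefetch(latestBatchSize: Int): Unit =
    prefetchCount = latestBatchSize

  def current: Int = prefetchCount
}
```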
type MutableMap[A, B] = scala.collection.mutable.HashMap[A, B]

private[this] val receivers = new MutableMap[NameAndPartition, CachedEventHubsReceiver]()
Key should include namespace
Will address in #305
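A hypothetical sketch of the composite key the comment asks for: including the namespace in the cache key keeps identically named hubs in different namespaces from colliding. Field names are assumptions, not the shape eventually adopted in #305.

```scala
import scala.collection.mutable

case class ReceiverKey(namespace: String, hubName: String, partitionId: Int)

object NamespacedReceiverCache {
  private val receivers = mutable.HashMap.empty[ReceiverKey, String]
  def put(key: ReceiverKey, receiver: String): Unit =
    receivers.synchronized { receivers(key) = receiver }
  def size: Int = receivers.synchronized { receivers.size }
}
```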
}

private def get(nAndP: NameAndPartition): CachedEventHubsReceiver = receivers.synchronized {
logInfo(s"lookup on $nAndP")
Is this logging executed for every receive call?
nAndP: NameAndPartition,
requestSeqNo: SequenceNumber,
batchSize: Int): EventData = {
receivers.synchronized {
Nit, feel free to ignore. One small idea after our discussion:

val receiver = createIfNotExist(...) // <- captures isInitialized, receivers.update, and get in one synchronized block
receiver.receive()
override def receive(ehConf: EventHubsConf,
nAndP: NameAndPartition,
requestSeqNo: SequenceNumber,
batchSize: Int): EventData = {
val receiver = createCachedReceiverIfNotExist(ehConf, nAndP, requestSeqNo)
receiver.receive(requestSeqNo, batchSize)
}
def createCachedReceiverIfNotExist(ehConf: EventHubsConf,
nAndP: NameAndPartition,
requestSeqNo: SequenceNumber) : CachedEventHubsReceiver = {
var receiver: CachedEventHubsReceiver = null
receivers.synchronized {
receiver = receivers.getOrElseUpdate(nAndP, new CachedEventHubsReceiver(ehConf, nAndP)) // take care to initialize this lazily
}
receiver
}
This PR adds the cached receiver and cleans up the client APIs.
The CachedEventHubsReceiver borrows a connection from the ClientConnectionPool and opens a receiver link. This receiver link will stay open until:

The average of the last five batch sizes is used as the prefetchCount.
Close #259
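The prefetch logic described above can be sketched as follows, under the assumption that the PR's `FixedList` behaves like a bounded queue that keeps only the most recent entries; the empty-list default of 500 is a stand-in, not the library's actual value.

```scala
import scala.collection.mutable

// Bounded queue: appending past maxSize drops the oldest entry.
class FixedList[A](maxSize: Int) {
  private val items = mutable.Queue.empty[A]
  def append(a: A): Unit = {
    items.enqueue(a)
    if (items.size > maxSize) items.dequeue()
  }
  def elements: Seq[A] = items.toSeq
}

// Average of the last five batch sizes becomes the next prefetchCount.
def nextPrefetchCount(prefetchCounts: FixedList[Int]): Int = {
  val sizes = prefetchCounts.elements
  if (sizes.isEmpty) 500 else sizes.sum / sizes.size // 500: assumed default
}
```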