
[SPARK-12757] Add block-level read/write locks to BlockManager #10705

Closed
JoshRosen wants to merge 93 commits into apache:master from JoshRosen:pin-pages

Conversation

JoshRosen
Contributor

Motivation

As a pre-requisite to off-heap caching of blocks, we need a mechanism to prevent pages / blocks from being evicted while they are being read. With on-heap objects, evicting a block while it is being read merely leads to memory-accounting problems (because we assume that an evicted block is a candidate for garbage-collection, which will not be true during a read), but with off-heap memory this will lead to either data corruption or segmentation faults.

Changes

BlockInfoManager and reader/writer locks

This patch adds block-level read/write locks to the BlockManager. It introduces a new BlockInfoManager component, contained within the BlockManager, which holds the BlockInfo objects that the BlockManager uses to track block metadata and which exposes APIs for locking blocks in either shared read or exclusive write mode.

BlockManager's get*() and put*() methods now implicitly acquire the necessary locks. After a get() call successfully retrieves a block, that block is locked in shared read mode. A put() call will block until it acquires an exclusive write lock. If the write succeeds, the write lock is downgraded to a shared read lock before returning to the caller. This put() locking behavior allows us to store a block and then immediately turn around and read it without having to worry about it having been evicted between the write and the read, which will allow us to significantly simplify CacheManager in the future (see #10748).

See BlockInfoManagerSuite's test cases for a more detailed specification of the locking semantics.
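
To make these semantics concrete, here is a minimal, self-contained sketch (in the spirit of this patch, but not its actual code) of the shared-read / exclusive-write / downgrade behavior that a single block's metadata needs to support; the method names are chosen for illustration only:

class SimpleBlockLock {
  private val NoWriter = -1L
  private var readerCount = 0              // number of outstanding shared read locks
  private var writerTask: Long = NoWriter  // task attempt id holding the exclusive write lock

  // Block until no writer holds the lock, then record one more reader.
  def lockForReading(): Unit = synchronized {
    while (writerTask != NoWriter) { wait() }
    readerCount += 1
  }

  // Block until there are no readers and no writer, then record the writer.
  def lockForWriting(taskAttemptId: Long): Unit = synchronized {
    while (readerCount != 0 || writerTask != NoWriter) { wait() }
    writerTask = taskAttemptId
  }

  // Atomically convert an exclusive write lock into a shared read lock,
  // which is what put() does before returning to its caller.
  def downgrade(taskAttemptId: Long): Unit = synchronized {
    require(writerTask == taskAttemptId, "caller does not hold the write lock")
    writerTask = NoWriter
    readerCount += 1
    notifyAll()
  }

  // Release one shared read lock.
  def unlockRead(): Unit = synchronized {
    readerCount -= 1
    notifyAll()
  }
}

A real implementation also has to remember which task attempt holds which lock so that locks can be released in bulk when a task ends, which is the subject of the next section.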

Auto-release of locks at the end of tasks

Our locking APIs support explicit release of locks (by calling unlock()), but it's not always possible to guarantee that locks will be released prior to the end of the task. One reason for this is our iterator interface: since our iterators don't support an explicit close() operator to signal that no more records will be consumed, operations like take() or limit() don't have a good means to release locks on their input iterators' blocks. Another example is broadcast variables, whose block locks can only be released at the end of the task.

To address this, BlockInfoManager uses a pair of maps to track the set of locks acquired by each task. Lock acquisitions automatically record the current task attempt id by obtaining it from TaskContext. When a task finishes, code in Executor calls BlockInfoManager.unlockAllLocksForTask(taskAttemptId) to free locks.
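
The per-task bookkeeping can be pictured roughly as follows (illustrative only; the name TaskLockRegistry is made up, and the real code tracks write locks as well as read locks):

import scala.collection.mutable

class TaskLockRegistry {
  // task attempt id -> ids of blocks whose read locks the task currently holds
  private val readLocksByTask = mutable.HashMap[Long, mutable.ArrayBuffer[String]]()

  // Record each acquisition against the acquiring task attempt id.
  def recordReadLock(taskAttemptId: Long, blockId: String): Unit = synchronized {
    readLocksByTask.getOrElseUpdate(taskAttemptId, mutable.ArrayBuffer()) += blockId
  }

  // Called when the task finishes; returns the blocks whose locks were still
  // held so that the caller can release them.
  def unlockAllLocksForTask(taskAttemptId: Long): Seq[String] = synchronized {
    readLocksByTask.remove(taskAttemptId).map(_.toSeq).getOrElse(Seq.empty)
  }
}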

Locking and the MemoryStore

In order to prevent in-memory blocks from being evicted while they are being read, the MemoryStore's evictBlocksToFreeSpace() method acquires write locks on blocks which it is considering as candidates for eviction. These lock acquisitions are non-blocking, so a block which is being read will not be evicted. By holding write locks until the eviction is performed or skipped (in case evicting the blocks would not free enough memory), we avoid a race where a new reader starts to read a block after the block has been marked as an eviction candidate but before it has been removed.
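
A rough sketch of that eviction pass, with the lock operations abstracted into function parameters since this is not the real MemoryStore code:

def selectEvictionCandidates(
    candidates: Seq[String],
    bytesNeeded: Long,
    sizeOf: String => Long,
    tryLockForWriting: String => Boolean,  // non-blocking; false if the block is locked for reading
    unlockWrite: String => Unit): Seq[String] = {
  val selected = scala.collection.mutable.ArrayBuffer[String]()
  var freedBytes = 0L
  val it = candidates.iterator
  while (freedBytes < bytesNeeded && it.hasNext) {
    val blockId = it.next()
    // A failed try-lock means some task holds the block; skip it rather than wait.
    if (tryLockForWriting(blockId)) {
      selected += blockId
      freedBytes += sizeOf(blockId)
    }
  }
  if (freedBytes >= bytesNeeded) {
    selected.toSeq  // the caller evicts these while still holding their write locks
  } else {
    // Evicting everything we could lock would not free enough memory: release and give up.
    selected.foreach(unlockWrite)
    Seq.empty
  }
}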

Locking and remote block transfer

This patch makes small changes to block transfer and network-layer code so that locks acquired by the BlockTransferService are released as soon as block transfer messages are consumed and released by Netty. This builds on top of #11193, a bug fix related to freeing of network-layer ManagedBuffers.
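
The shape of that change can be sketched generically (assumed names; the real patch plumbs this through ManagedBuffer and the Netty transport code):

// Simplified stand-in for a reference-counted network buffer.
trait RefCountedBuffer {
  def release(): Unit
}

// Wraps the buffer served for a block transfer so that releasing the buffer
// also releases the read lock that was acquired to serve the transfer.
class LockReleasingBuffer(underlying: RefCountedBuffer, releaseBlockLock: () => Unit)
  extends RefCountedBuffer {
  override def release(): Unit = {
    try underlying.release()
    finally releaseBlockLock()
  }
}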

FAQ

  • Why not use Java's built-in ReadWriteLock?

    Our locks operate on a per-task rather than per-thread level. Under certain circumstances a task may consist of multiple threads, so using ReadWriteLock would mean that we might call unlock() from a thread which didn't hold the lock in question, an operation which has undefined semantics. If we could rely on Java 8 classes, we might be able to use StampedLock to work around this issue.

  • Why not detect "leaked" locks in tests?

    See the notes above about take() and limit().

@SparkQA

SparkQA commented Jan 11, 2016

Test build #49167 has finished for PR 10705 at commit 7265784.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 14, 2016

Test build #49411 has finished for PR 10705 at commit 7cad770.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -43,6 +43,7 @@ import org.apache.spark.rpc.RpcEnv
import org.apache.spark.serializer.{Serializer, SerializerInstance}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.util._
import org.apache.spark.util.collection.ReferenceCounter
Contributor

Can you add a comment in this class that explains the ref counting mechanism? It can be a shorter version of the commit message.
Specifically:
What are the invariants? (explain get()) Need to call release. What does it mean if it is 0?

I slightly prefer pin count over ref count (the block manager has a reference but it is unpinned)

/**
* Thread-safe collection for maintaining both global and per-task reference counts for objects.
*/
private[spark] class ReferenceCounter[T] {
Contributor

Is there any reason you did it this way instead of a counter per object? Not sure how many blocks we have but this seems contention prone.

Contributor Author

I need to maintain a global count for each object as well as counts for each task, in order to automatically decrement the global counts when tasks finish (I'm working on adding the releaseAllReferencesForTask() call to the task-completion cleanup code).

If I stored the global count per block inside of the BlockInfo class, then I'd still need a mechanism to count the references per task. If the counts for each task were stored in BlockInfo then I'd have to loop over the BlockInfo list on task completion in order to clear those counts, or would have to maintain the counts separately. As a result, it made sense to me to keep both types of counts in close proximity like this.
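
A stripped-down sketch of that two-map layout (illustrative only, not the patch's ReferenceCounter):

import scala.collection.mutable

class SimpleReferenceCounter[T] {
  // Global count per object, plus per-task counts so that a finishing task's
  // references can be dropped in bulk.
  private val globalCounts: mutable.Map[T, Int] =
    mutable.HashMap.empty[T, Int].withDefaultValue(0)
  private val countsByTask = mutable.HashMap[Long, mutable.Map[T, Int]]()

  def retain(taskAttemptId: Long, obj: T): Unit = synchronized {
    globalCounts(obj) += 1
    val taskCounts = countsByTask.getOrElseUpdate(
      taskAttemptId, mutable.HashMap.empty[T, Int].withDefaultValue(0))
    taskCounts(obj) += 1
  }

  // Decrement the global counts for everything the task still references.
  def releaseAllReferencesForTask(taskAttemptId: Long): Unit = synchronized {
    countsByTask.remove(taskAttemptId).foreach { taskCounts =>
      taskCounts.foreach { case (obj, n) => globalCounts(obj) -= n }
    }
  }

  def referenceCountFor(obj: T): Int = synchronized { globalCounts(obj) }
}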

@SparkQA

SparkQA commented Jan 14, 2016

Test build #49422 has finished for PR 10705 at commit c1a8d85.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 15, 2016

Test build #49419 has finished for PR 10705 at commit 8ae88b0.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 15, 2016

Test build #49427 has finished for PR 10705 at commit 575a47b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 15, 2016

Test build #49477 has finished for PR 10705 at commit 0ba8318.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 20, 2016

Test build #49773 has finished for PR 10705 at commit 90cf403.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Jan 21, 2016

Test build #49861 has finished for PR 10705 at commit 12ed084.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nongli
Contributor

nongli commented Feb 24, 2016

LGTM. Good work!

@SparkQA

SparkQA commented Feb 24, 2016

Test build #51821 has finished for PR 10705 at commit b9d6e18.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 24, 2016

Test build #51841 has finished for PR 10705 at commit b963178.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@JoshRosen
Contributor Author

Jenkins retest this please

@SparkQA

SparkQA commented Feb 24, 2016

Test build #51883 has finished for PR 10705 at commit 9becde3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Feb 25, 2016

Test build #51910 has finished for PR 10705 at commit 9becde3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor Author

Jenkins retest this please

@JoshRosen
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Feb 25, 2016

Test build #51985 has finished for PR 10705 at commit 9becde3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
memoryStore.getValues(blockId).map { iter =>
val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
new BlockResult(ci, DataReadMethod.Memory, info.size)
Contributor

Right now there's still a chance that the programmer forgets to wrap the iter. I would actually push the CompletionIterator logic one step further into BlockResult itself, e.g.

private[spark] class BlockResult(
    iter: Iterator[Any],
    blockId: BlockId,
    val readMethod: DataReadMethod.Value,
    val bytes: Long) {

  /**
   * Values of this block, to be consumed at most once.
   *
   * If this block was read locally, then we must have acquired a read lock on this block.
   * If so, release the lock once this iterator is drained. In cases where we don't consume
   * the entire iterator (e.g. take or limit), we rely on the executor releasing all locks
   * held by this task attempt at the end of the task.
   *
   * Otherwise, if this block was read remotely from other executors, there is no need to
   * do this because we didn't acquire any locks on the block.
   */
  val data: Iterator[Any] = {
    if (readMethod != DataReadMethod.Network) {
      CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
    } else {
      iter
    }
  }
}

I did a search and could not find another place where we would not want to release the lock, other than getRemoteBlock.

Contributor Author

If you push it further then BlockResult needs to hold a reference to the BlockManager.

Contributor Author

I'll do this in a followup.

Contributor

can we pass in an optional completion callback instead?

Contributor Author

We still need to handle the DataReadMethod == Network case somewhere since there's no lock to release in that case, so having an optional callback in the constructor seems like it faces the same problem of someone forgetting to add it.

Contributor

The difference is that now the programmer needs to explicitly pass completionCallback = None. If the completionCallback is specified then you don't need to do the network check. It's better in that today you have zero reminder that you need to release the lock by the end of the task.

Actually an even better way IMO is to have a LocalBlockResult and a RemoteBlockResult so there's no way the programmer can forget to release the lock.

By the way, I'm not quite done reviewing yet but feel free to address these in a follow-up patch.
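
A sketch of that LocalBlockResult / RemoteBlockResult suggestion (hypothetical types, not part of this patch), where a local read must supply a release callback and a remote read cannot:

sealed trait BlockResultSketch {
  def data: Iterator[Any]
}

// Locally-read block: the compiler forces the caller to supply the callback
// that releases the read lock once the iterator has been drained.
final class LocalBlockResult(iter: Iterator[Any], releaseLock: () => Unit) extends BlockResultSketch {
  private var released = false
  val data: Iterator[Any] = new Iterator[Any] {
    def hasNext: Boolean = {
      val more = iter.hasNext
      if (!more && !released) { released = true; releaseLock() }
      more
    }
    def next(): Any = iter.next()
  }
}

// Remotely-read block: no lock was acquired, so there is nothing to release.
final class RemoteBlockResult(iter: Iterator[Any]) extends BlockResultSketch {
  val data: Iterator[Any] = iter
}

As with the merged patch, an iterator that is never fully drained would still rely on the end-of-task cleanup to release its lock.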

@SparkQA

SparkQA commented Feb 25, 2016

Test build #51987 has finished for PR 10705 at commit 9becde3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// A block is either locked for reading or for writing, but not for both at the same time:
assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
// If a block is removed then it is not locked:
assert(!_removed || (_readerCount == 0 && _writerTask == BlockInfo.NO_WRITER))
Contributor

nit: clearer

if (_removed) {
  assert(_readerCount == 0 ...)
}

@andrewor14
Contributor

LGTM. There are still a few remaining issues about maintainability but they can be addressed in a follow-up patch.

@andrewor14
Contributor

Merged into master.

@asfgit asfgit closed this in 633d63a Feb 26, 2016
synchronized {
get(blockId).foreach { info =>
info.readerCount -= lockCount
assert(info.readerCount >= 0)
Contributor

Should an exception be thrown here instead? In production, assertions may not be enabled.
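
One way to act on this comment (illustrative only; the merged patch keeps the assertion) would be to throw explicitly so the check also runs when JVM assertions are disabled:

if (info.readerCount < 0) {
  throw new IllegalStateException(
    s"Block $blockId has a negative reader count after unlock: ${info.readerCount}")
}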

asfgit pushed a commit that referenced this pull request Mar 2, 2016
CacheManager directly calls MemoryStore.unrollSafely() and has its own logic for handling graceful fallback to disk when cached data does not fit in memory. However, this logic also exists inside of the MemoryStore itself, so this appears to be unnecessary duplication.

Thanks to the addition of block-level read/write locks in #10705, we can refactor the code to remove the CacheManager and replace it with an atomic `BlockManager.getOrElseUpdate()` method.

This pull request replaces / subsumes #10748.

/cc andrewor14 and nongli for review. Note that this changes the locking semantics of a couple of internal BlockManager methods (`doPut()` and `lockNewBlockForWriting`), so please pay attention to the Scaladoc changes and new test cases for those methods.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11436 from JoshRosen/remove-cachemanager.
roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016
@JoshRosen JoshRosen deleted the pin-pages branch August 29, 2016 19:20