Skip to content

Commit

Permalink
Add Cache.put method to refresh cached value without removing it first
Browse files Browse the repository at this point in the history
  • Loading branch information
ashishkujoy authored and franktominc committed Jun 12, 2019
1 parent 01ddf8f commit 87d78de
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ ProblemFilters.exclude[MissingClassProblem]("akka.http.caching.scaladsl.LfuCache
ProblemFilters.exclude[MissingClassProblem]("akka.http.caching.scaladsl.CachingSettingsImpl$")
ProblemFilters.exclude[MissingClassProblem]("akka.http.caching.scaladsl.LfuCacheSettingsImpl")
ProblemFilters.exclude[MissingClassProblem]("akka.http.caching.scaladsl.CachingSettingsImpl")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.caching.scaladsl.Cache.put")

# @InternalApi
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.http.caching.LfuCache.store")
Expand Down
22 changes: 20 additions & 2 deletions akka-http-caching/src/main/scala/akka/http/caching/LfuCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import akka.annotation.{ ApiMayChange, InternalApi }

import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.Future
import scala.concurrent.{ ExecutionContext, Future }
import com.github.benmanes.caffeine.cache.{ AsyncCache, Caffeine }
import akka.http.caching.LfuCache.toJavaMappingFunction
import akka.http.caching.scaladsl.Cache
Expand Down Expand Up @@ -105,7 +105,25 @@ private[caching] class LfuCache[K, V](val store: AsyncCache[K, V]) extends Cache

def apply(key: K, genValue: () => Future[V]): Future[V] = store.get(key, toJavaMappingFunction[K, V](genValue)).toScala

def getOrLoad(key: K, loadValue: K => Future[V]) = store.get(key, toJavaMappingFunction[K, V](loadValue)).toScala
/**
* Multiple call to put method for the same key may result in a race condition,
* the value yield by the last successful future for that key will replace any previously cached value.
*/
def put(key: K, mayBeValue: Future[V])(implicit ex: ExecutionContext): Future[V] = {
val previouslyCacheValue = Option(store.getIfPresent(key))

previouslyCacheValue match {
case None =>
store.put(key, toJava(mayBeValue).toCompletableFuture)
mayBeValue
case _ => mayBeValue.map { value =>
store.put(key, toJava(Future.successful(value)).toCompletableFuture)
value
}
}
}

def getOrLoad(key: K, loadValue: K => Future[V]): Future[V] = store.get(key, toJavaMappingFunction[K, V](loadValue)).toScala

def remove(key: K): Unit = store.synchronous().invalidate(key)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import akka.japi.{ Creator, Procedure }
import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.compat.java8.FutureConverters.{ toJava => futureToJava, toScala => futureToScala }
import scala.concurrent.{ Future, Promise }
import scala.concurrent.{ ExecutionContext, Future, Promise }

/**
* API MAY CHANGE
Expand Down Expand Up @@ -58,6 +58,13 @@ abstract class Cache[K, V] extends akka.http.caching.javadsl.Cache[K, V] {
override def getOptional(key: K): Optional[CompletionStage[V]] =
Optional.ofNullable(get(key).map(f => futureToJava(f)).orNull)

/**
* Cache the given future if not cached previously.
* Or replace the old cached value on successful completion of given future.
* In case the given future fails, the previously cached value for that key (if any) will remain unchanged.
*/
def put(key: K, mayBeValue: Future[V])(implicit ex: ExecutionContext): Future[V]

/**
* Removes the cache item for the given key.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import scala.concurrent.duration._
import scala.concurrent.{ Await, Future, Promise }

class ExpiringLfuCacheSpec extends WordSpec with Matchers with BeforeAndAfterAll {
implicit val system = ActorSystem()
implicit val system: ActorSystem = ActorSystem()
import system.dispatcher

"An LfuCache" should {
Expand Down Expand Up @@ -59,6 +59,50 @@ class ExpiringLfuCacheSpec extends WordSpec with Matchers with BeforeAndAfterAll
Await.result(future2, 3.seconds) should be("A")
cache.size should be(1)
}
"put given uncached future value" in {
val cache = lfuCache[String]()
val futureValue = "A"
val done = cache.put(1, Future.successful(futureValue))

Await.result(done, 3.seconds)
Await.result(cache.get(1).get, 3.seconds) should be(futureValue)

}
"replace existing cache when evaluation of new value completes" in {
val cache = lfuCache[String]()
val latch = new CountDownLatch(1)
val future1 = cache.get(1, () => "A")

cache.get(1) should be(Some(future1))
Await.result(future1, 3.seconds)
val future2 = Future {
latch.await()
"B"
}

val putFuture = cache.put(1, future2)
cache.get(1) should be(Some(future1))
latch.countDown()

Await.result(putFuture, 3.seconds)
Await.result(cache.get(1).get, 3.seconds) should be("B")

}
"not remove existing cache when evaluation of new value fails" in {
val cache = lfuCache[String]()
val latch = new CountDownLatch(1)
val future1 = cache.get(1, () => "A")
val future2: Future[String] = Future.failed(new RuntimeException("Failure"))

cache.get(1) should be(Some(future1))
Await.result(future1, 3.seconds)
latch.countDown()

an[RuntimeException] shouldBe thrownBy {
Await.result(cache.put(1, future2), 3.seconds)
}
cache.get(1) should be(Some(future1))
}
"properly limit capacity" in {
val cache = lfuCache[String](maxCapacity = 3, initialCapacity = 1)
Await.result(cache.get(1, () => "A"), 3.seconds) should be("A")
Expand Down Expand Up @@ -113,7 +157,7 @@ class ExpiringLfuCacheSpec extends WordSpec with Matchers with BeforeAndAfterAll
}
}

override def afterAll() = {
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}

Expand Down

0 comments on commit 87d78de

Please sign in to comment.