Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache refresh without remove #2385 #2522

Merged
merged 9 commits into from
Jun 6, 2019
Next Next commit
Add put method to replace old cache on successful completion of given…
… future value(#2385)
  • Loading branch information
ashishkujoy committed May 6, 2019
commit 3bfd148b67bcd41828224f40b5ce80d57cb55a6c
22 changes: 18 additions & 4 deletions akka-http-caching/src/main/scala/akka/http/caching/LfuCache.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import akka.annotation.{ ApiMayChange, InternalApi }

import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.Future
import scala.concurrent.{ ExecutionContext, Future, Promise }
import com.github.benmanes.caffeine.cache.{ AsyncCacheLoader, AsyncLoadingCache, Caffeine }
import akka.http.caching.LfuCache.toJavaMappingFunction
import akka.http.caching.scaladsl.Cache
Expand All @@ -21,6 +21,7 @@ import akka.http.caching.CacheJavaMapping.Implicits._

import scala.compat.java8.FutureConverters._
import scala.compat.java8.FunctionConverters._
import scala.util.{ Failure, Success }

@ApiMayChange
object LfuCache {
Expand Down Expand Up @@ -91,8 +92,8 @@ object LfuCache {
}

//LfuCache requires a loader function on creation - this will not be used.
private def dummyLoader[K, V] = new AsyncCacheLoader[K, V] {
def asyncLoad(k: K, e: Executor) =
private def dummyLoader[K, V]: AsyncCacheLoader[K, V] = new AsyncCacheLoader[K, V] {
def asyncLoad(k: K, e: Executor): CompletableFuture[V] =
Future.failed[V](new RuntimeException("Dummy loader should not be used by LfuCache")).toJava.toCompletableFuture
}

Expand All @@ -111,7 +112,20 @@ private[caching] class LfuCache[K, V](val store: AsyncLoadingCache[K, V]) extend

def apply(key: K, genValue: () ⇒ Future[V]): Future[V] = store.get(key, toJavaMappingFunction[K, V](genValue)).toScala

def getOrLoad(key: K, loadValue: K ⇒ Future[V]) = store.get(key, toJavaMappingFunction[K, V](loadValue)).toScala
def getOrLoad(key: K, loadValue: K ⇒ Future[V]): Future[V] =
store.get(key, toJavaMappingFunction[K, V](loadValue)).toScala

def put(key: K, mayBeValue: Future[V])(implicit ex: ExecutionContext): Future[Unit] = {
ashishkujoy marked this conversation as resolved.
Show resolved Hide resolved
val cachedBefore = store.synchronous().asMap().containsKey(key)
ashishkujoy marked this conversation as resolved.
Show resolved Hide resolved
val promise = Promise[Unit]
if (cachedBefore) {
mayBeValue.onComplete {
case Success(value) ⇒ promise.completeWith(Future(store.synchronous().put(key, value)))
raboof marked this conversation as resolved.
Show resolved Hide resolved
case failure: Failure[_] ⇒ promise.failure(failure.exception)
}
} else promise.completeWith(Future(store.put(key, toJava(mayBeValue).toCompletableFuture)))
ashishkujoy marked this conversation as resolved.
Show resolved Hide resolved
promise.future
}

def remove(key: K): Unit = store.synchronous().invalidate(key)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import akka.japi.{ Creator, Procedure }
import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.compat.java8.FutureConverters.{ toJava ⇒ futureToJava, toScala ⇒ futureToScala }
ashishkujoy marked this conversation as resolved.
Show resolved Hide resolved
import scala.concurrent.{ Future, Promise }
import scala.concurrent.{ ExecutionContext, Future, Promise }

/**
* API MAY CHANGE
Expand Down Expand Up @@ -58,6 +58,12 @@ abstract class Cache[K, V] extends akka.http.caching.javadsl.Cache[K, V] {
override def getOptional(key: K): Optional[CompletionStage[V]] =
Optional.ofNullable(get(key).map(f ⇒ futureToJava(f)).orNull)

/**
* Cache the given future if not cache previously.
ashishkujoy marked this conversation as resolved.
Show resolved Hide resolved
* Or Replace the old cache on successful completion of given future.
ashishkujoy marked this conversation as resolved.
Show resolved Hide resolved
*/
def put(key: K, mayBeValue: Future[V])(implicit ex: ExecutionContext): Future[Unit]

/**
* Removes the cache item for the given key.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import scala.concurrent.duration._
import scala.concurrent.{ Await, Future, Promise }

class ExpiringLfuCacheSpec extends WordSpec with Matchers with BeforeAndAfterAll {
implicit val system = ActorSystem()
implicit val system: ActorSystem = ActorSystem()
import system.dispatcher

"An LfuCache" should {
Expand Down Expand Up @@ -59,6 +59,50 @@ class ExpiringLfuCacheSpec extends WordSpec with Matchers with BeforeAndAfterAll
Await.result(future2, 3.seconds) should be("A")
cache.size should be(1)
}
"put given uncached future value" in {
val cache = lfuCache[String]()
val futureValue = "A"
val done = cache.put(1, Future.successful(futureValue))

Await.result(done, 3.seconds)
Await.result(cache.get(1).get, 3.seconds) should be(futureValue)

}
"replace existing cache when evaluation of new value completes" in {
val cache = lfuCache[String]()
val latch = new CountDownLatch(1)
val future1 = cache.get(1, () ⇒ "A")
ashishkujoy marked this conversation as resolved.
Show resolved Hide resolved

cache.get(1) should be(Some(future1))
Await.result(future1, 3.seconds)
val future2 = Future {
latch.await()
"B"
}

val putFuture = cache.put(1, future2)
cache.get(1) should be(Some(future1))
latch.countDown()

Await.result(putFuture, 3.seconds)
Await.result(cache.get(1).get, 3.seconds) should be("B")

}
"not remove existing cache when evaluation of new value fails" in {
val cache = lfuCache[String]()
val latch = new CountDownLatch(1)
val future1 = cache.get(1, () ⇒ "A")
val future2: Future[String] = Future.failed(new RuntimeException("Failure"))

cache.get(1) should be(Some(future1))
Await.result(future1, 3.seconds)
latch.countDown()

an[RuntimeException] shouldBe thrownBy {
Await.result(cache.put(1, future2), 3.seconds)
}
cache.get(1) should be(Some(future1))
}
"properly limit capacity" in {
val cache = lfuCache[String](maxCapacity = 3, initialCapacity = 1)
Await.result(cache.get(1, () ⇒ "A"), 3.seconds) should be("A")
Expand Down Expand Up @@ -113,7 +157,7 @@ class ExpiringLfuCacheSpec extends WordSpec with Matchers with BeforeAndAfterAll
}
}

override def afterAll() = {
override def afterAll(): Unit = {
TestKit.shutdownActorSystem(system)
}

Expand Down