Skip to content

Commit

Permalink
K8s manifests and code
Browse files Browse the repository at this point in the history
  • Loading branch information
sharma-rohit committed Feb 25, 2018
1 parent d9e13e9 commit eb68724
Show file tree
Hide file tree
Showing 15 changed files with 313 additions and 2 deletions.
19 changes: 19 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
FROM openjdk:8

ENV SBT_VERSION 1.1.0

RUN \
curl -L -o sbt-$SBT_VERSION.deb https://dl.bintray.com/sbt/debian/sbt-$SBT_VERSION.deb && \
dpkg -i sbt-$SBT_VERSION.deb && \
rm sbt-$SBT_VERSION.deb && \
apt-get update && \
apt-get install sbt && \
sbt sbtVersion

RUN sbt publishLocal

COPY /target/scala-2.12/distributed-cache-assembly-1.0.jar /app/distributed-cache-assembly-1.0.jar

WORKDIR /app
EXPOSE 9000
ENTRYPOINT java -jar distributed-cache-assembly-1.0.jar
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# distributed-cache-on-k8s-poc
[PoC] Distributed Cache with Akka Cluster Sharding and Akka HTTP on Kubernetes
# -distributed-cache-on-k8s-poc
[PoC] Distributed Cache with Akka Cluster Sharding and Akka HTTP on Kubernetes and AWS
12 changes: 12 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
name := "distributed-cache-on-k8s-poc"

version := "1.0"

scalaVersion := "2.12.1"

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http" % "10.0.10",
"com.typesafe.akka" %% "akka-cluster" % "2.5.9",
"com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.9",
"org.slf4j" % "slf4j-simple" % "1.7.25" % Test
)
14 changes: 14 additions & 0 deletions kubernetes/headless_service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
kind: Service
apiVersion: v1
metadata:
name: distributed-cache
labels:
app: distributed-cache
spec:
clusterIP: None
selector:
app: distributed-cache
ports:
- port: 2551
targetPort: 2551
protocol: TCP
13 changes: 13 additions & 0 deletions kubernetes/ingress.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
name: distributed-cache-ingress
spec:
rules:
# DNS name your application should be exposed on
- host: "distributed-cache.com"
http:
paths:
- backend:
serviceName: distributed-cache-service
servicePort: 80
15 changes: 15 additions & 0 deletions kubernetes/service.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
apiVersion: v1
kind: Service
metadata:
labels:
app: distributed-cache
name: distributed-cache-service
spec:
selector:
app: distributed-cache
type: ClusterIP
ports:
- port: 80
protocol: TCP
# this needs to match your container port
targetPort: 9000
37 changes: 37 additions & 0 deletions kubernetes/statefulset.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
apiVersion: "apps/v1beta2"
kind: StatefulSet
metadata:
name: distributed-cache
spec:
selector:
matchLabels:
app: distributed-cache
serviceName: distributed-cache
replicas: 3
template:
metadata:
labels:
app: distributed-cache
spec:
containers:
- name: distributed-cache
image: "localhost:5000/distributed-cache:0.2.0"
env:
- name: AKKA_ACTOR_SYSTEM_NAME
value: "distributed-cache-system"
- name: AKKA_REMOTING_BIND_PORT
value: "2551"
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: AKKA_REMOTING_BIND_DOMAIN
value: "distributed-cache.default.svc.cluster.local"
- name: AKKA_SEED_NODES
value: "distributed-cache-0.distributed-cache.default.svc.cluster.local:2551,distributed-cache-1.distributed-cache.default.svc.cluster.local:2551,distributed-cache-2.distributed-cache.default.svc.cluster.local:2551"
ports:
- containerPort: 2551
readinessProbe:
httpGet:
port: 9000
path: /health
1 change: 1 addition & 0 deletions project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version = 0.13.17
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")
26 changes: 26 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
akka {
loggers = ["akka.event.Logging$DefaultLogger"]
loglevel = "INFO"

actor {
provider = "cluster"
}
remote {
log-remote-lifecycle-events = off
}

cluster {
# auto downing is NOT safe for production deployments.
# you may want to use it during development, read more about it in the docs.
auto-down-unreachable-after = 10s

jmx {
multi-mbeans-in-same-jvm = on
}

# Enable metrics extension in akka-cluster-metrics.
// extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
}
}


51 changes: 51 additions & 0 deletions src/main/scala/Main.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import cluster.ClusterStateInformer
import com.typesafe.config.{ Config, ConfigFactory, ConfigValueFactory }
import http.Route

import scala.util.{ Failure, Success }
import scala.concurrent.ExecutionContext.Implicits.global

object Main {

def main(args: Array[String]): Unit = {
println("Starting containers.....")
val config: Config = {
import scala.collection.JavaConverters._
val seedNodes = ClusterSetup.seedNodes()
ConfigFactory.empty()
.withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromIterable(seedNodes.map(seedNode => s"akka.tcp://${ClusterSetup.actorSystemName()}@$seedNode").asJava))
.withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(ClusterSetup.podName() + "." + ClusterSetup.domain()))
.withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(ClusterSetup.remoteBindingPort()))
.withFallback(ConfigFactory.load())
.resolve()
}

println(config)

implicit val system: ActorSystem = ActorSystem(ClusterSetup.actorSystemName(), config)
implicit val mat = ActorMaterializer(materializerSettings = Some(ActorMaterializerSettings(system)))
val routes = new Route(system)
Http().bindAndHandle(routes.routes, "0.0.0.0", 9000).onComplete {
case Success(s) => println("Successfully started..")
case Failure(f) => println(f)
}

system.actorOf(ClusterStateInformer.props(), "cluster-informer")
}

}

object ClusterSetup {
def seedNodes(): Iterable[String] = sys.env.get("AKKA_SEED_NODES").map(_.split(",")).get.toIterable

def domain(): String = sys.env.getOrElse("AKKA_REMOTING_BIND_DOMAIN", throw new RuntimeException("No domain found."))

def podName(): String = sys.env.getOrElse("POD_NAME", throw new RuntimeException("No podname found."))

def remoteBindingPort(): String = sys.env.getOrElse("AKKA_REMOTING_BIND_PORT", throw new RuntimeException("No port found."))

def actorSystemName(): String = sys.env.getOrElse("AKKA_ACTOR_SYSTEM_NAME", throw new RuntimeException("No actorsystem name found."))
}
36 changes: 36 additions & 0 deletions src/main/scala/cluster/CacheDataActor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package cluster

import java.util.UUID

import akka.actor.SupervisorStrategy.Stop
import akka.actor.{ Actor, ActorLogging, Props, ReceiveTimeout }
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.Passivate
import cluster.CacheDataActor.Get

class CacheDataActor extends Actor with ActorLogging {

override def receive: Receive = {
case Get(id) => sender ! s"cached data for id: $id"
case ReceiveTimeout =>
log.info(s"sending Passivate to metadata parent: {${context.parent.path.name}} for ${self.path.name}")
context.parent ! Passivate(stopMessage = Stop)
case Stop =>
context.stop(self)
log.info(s"Passivating metadata actor for ${self.path.name}")
}
}

object CacheDataActor {
final val numOfShards = 50 // Planned num of cluster nodes
val extractEntityId: ShardRegion.ExtractEntityId = {
case msg@Get(id) => (id.toString, msg)
}
val extractShardId: ShardRegion.ExtractShardId = {
case Get(id) => (id.hashCode() % numOfShards).toString
}

case class Get(id: UUID)

def props: Props = Props(new CacheDataActor())
}
18 changes: 18 additions & 0 deletions src/main/scala/cluster/ClusterShardRegion.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package cluster

import akka.actor.{ ActorRef, ActorSystem }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings }

class ClusterShardRegion(actorSystem: ActorSystem) {
val clusterShardRegion: ActorRef = ClusterSharding(actorSystem).start(
typeName = ClusterShardRegion.SHARD_REGION_NAME,
entityProps = CacheDataActor.props,
settings = ClusterShardingSettings(actorSystem),
extractEntityId = CacheDataActor.extractEntityId,
extractShardId = CacheDataActor.extractShardId
)
}

object ClusterShardRegion {
val SHARD_REGION_NAME = "cache-data"
}
34 changes: 34 additions & 0 deletions src/main/scala/cluster/ClusterStateInformer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package cluster

import akka.actor.{ Actor, ActorLogging, Props }
import akka.cluster.ClusterEvent._
import akka.cluster.{ Cluster, ClusterEvent }

class ClusterStateInformer extends Actor with ActorLogging {
val cluster = Cluster(context.system)

override def preStart(): Unit = {
cluster.subscribe(
subscriber = self,
initialStateMode = ClusterEvent.InitialStateAsEvents,
to = classOf[MemberEvent], classOf[UnreachableMember]
)
}

override def postStop(): Unit = cluster.unsubscribe(self)

override def receive: Receive = {
case MemberJoined(member) => log.info(s"Member ${member.address} Joined")
case MemberUp(member) => log.info("Member is Up: {}", member.address)
case UnreachableMember(member) => log.info("Member detected as unreachable: {}", member)
case MemberRemoved(member, previousStatus) =>
log.info(
"Member is Removed: {} after {}",
member.address, previousStatus)
case me: MemberEvent log.info(s"Received Member event $me for Member: ${me.member.address}")
}
}

object ClusterStateInformer {
def props():Props = Props(new ClusterStateInformer)
}
34 changes: 34 additions & 0 deletions src/main/scala/http/Route.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package http

import akka.actor.ActorSystem
import akka.cluster.sharding.ClusterSharding
import akka.http.scaladsl.marshalling.Marshaller.StringMarshaller
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.pattern.ask
import akka.util.Timeout
import cluster.CacheDataActor.Get
import cluster.ClusterShardRegion

import scala.concurrent.duration._
import scala.util.{ Failure, Success }

class Route(system: ActorSystem) {
implicit val timeout: Timeout = 3.seconds
val shardRegionActor = new ClusterShardRegion(system).clusterShardRegion

def routes: akka.http.scaladsl.server.Route = path("health") {
get {
complete(StatusCodes.OK)
}
} ~ path("cache-data" / JavaUUID) { id =>
get {
onComplete((shardRegionActor ? Get(id)).mapTo[String]) {
case Success(s) => complete(s)
case Failure(f) => complete(f)
}
}
}

}

0 comments on commit eb68724

Please sign in to comment.