Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for multiple Scala versions & updated spark-network-common to the latest version #3

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ language: scala

scala:
- 2.11.8
- 2.12.8

script:
- sbt clean 'project krapsRpc' 'coverage' 'test' 'coverageReport'
Expand Down
34 changes: 20 additions & 14 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
name := "kraps-rpc"

scalaVersion := "2.11.8"
lazy val scala212 = "2.12.8"
lazy val scala211 = "2.11.12"
lazy val supportedScalaVersions = List(scala212, scala211)

lazy val root = (project in file(".")).aggregate(krapsRpc, krapsRpcExample)
lazy val root = (project in file("."))
.aggregate(krapsRpc, krapsRpcExample)
.settings(crossScalaVersions := Nil, publish / skip := true)

lazy val commonSettings = Seq(
organization := "neoremind",
version := "1.0.1-SNAPSHOT",
scalaVersion := "2.11.8",
version := "1.0.2-SNAPSHOT",
crossScalaVersions := supportedScalaVersions,
//scalaVersion := "2.12.8",
publishMavenStyle := true,
organizationName := "neoremind",
organizationHomepage := Some(url("http://neoremind.net")),
Expand All @@ -28,16 +33,17 @@ lazy val krapsRpc = (project in file("kraps-core"))
.settings(
name := "kraps-core",
libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-api" % "1.7.7",
"org.slf4j" % "slf4j-log4j12" % "1.7.7",
"com.google.guava" % "guava" % "15.0",
"org.apache.spark" %% "spark-network-common" % "2.1.0",
"de.ruedigermoeller" % "fst" % "2.50",
"org.scalatest" %% "scalatest" % "2.2.6" % "test",
"org.scalamock" %% "scalamock-scalatest-support" % "3.2.2" % "test",
"org.hamcrest" % "hamcrest-core" % "1.3" % "test",
"org.hamcrest" % "hamcrest-library" % "1.3" % "test",
"junit" % "junit" % "4.11" % "test"
"org.slf4j" % "slf4j-api" % "1.7.7",
"org.slf4j" % "slf4j-log4j12" % "1.7.7",
"com.google.guava" % "guava" % "15.0",
"org.apache.spark" %% "spark-network-common" % "2.4.0",
"org.apache.spark" %% "spark-tags" % "2.4.0",
"de.ruedigermoeller" % "fst" % "2.50",
"org.scalatest" %% "scalatest" % "3.0.6" % "test",
"org.scalamock" %% "scalamock-scalatest-support" % "3.4.1" % "test",
"org.hamcrest" % "hamcrest-core" % "1.3" % "test",
"org.hamcrest" % "hamcrest-library" % "1.3" % "test",
"junit" % "junit" % "4.11" % "test"
)
)

Expand Down
86 changes: 0 additions & 86 deletions kraps-core/pom.xml

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ trait RpcEndpoint {

/**
* Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply]]. If receiving an
* unmatched message, [[RpcException]] will be thrown and sent to `onError`.
* unmatched message, [[net.neoremind.kraps.RpcException]] will be thrown and sent to `onError`.
*/
def receive: PartialFunction[Any, Unit] = {
case _ => throw new RpcException(self + " does not implement 'receive'")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ package net.neoremind.kraps.rpc

import java.util.concurrent.TimeoutException

import net.neoremind.kraps.util.Utils
import net.neoremind.kraps.{RpcConf, RpcException}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal
import net.neoremind.kraps.RpcConf
import net.neoremind.kraps.RpcException
import net.neoremind.kraps.util.Utils

/**
* An exception thrown if RpcTimeout modifies a [[TimeoutException]].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package net.neoremind.kraps.rpc.netty

import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import javax.annotation.concurrent.GuardedBy

import javax.annotation.concurrent.GuardedBy
import net.neoremind.kraps.RpcException
import net.neoremind.kraps.rpc.{RpcEndpoint, RpcEndpointAddress, RpcEndpointRef, RpcEnvStoppedException}
import net.neoremind.kraps.util.ThreadUtils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package net.neoremind.kraps.rpc.netty

import javax.annotation.concurrent.GuardedBy

import net.neoremind.kraps.RpcException
import net.neoremind.kraps.rpc.{RpcAddress, RpcEndpoint, ThreadSafeRpcEndpoint}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package net.neoremind.kraps.rpc.netty

import java.lang
import java.util.Map

import net.neoremind.kraps.RpcConf
import org.apache.spark.network.util.{ConfigProvider, TransportConf}

Expand All @@ -35,7 +38,7 @@ object KrapsTransportConf {
/**
* Utility for creating a [[KrapsTransportConf]] from a [[net.neoremind.kraps.RpcConf]].
*
* @param conf the [[RpcConf]]
* @param conf the [[net.neoremind.kraps.RpcConf]]
* @param module the module name
* @param numUsableCores if nonzero, this will restrict the server and client threads to only
* use the given number of cores, rather than all of the machine's cores.
Expand All @@ -51,6 +54,8 @@ object KrapsTransportConf {

new TransportConf(module, new ConfigProvider {
override def get(name: String): String = conf.get(name)

override def getAll: lang.Iterable[Map.Entry[String, String]] = ??? //conf.getAll.toMap
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import java.net.{InetSocketAddress, URI}
import java.nio.ByteBuffer
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.Nullable

import javax.annotation.Nullable
import net.neoremind.kraps.RpcConf
import net.neoremind.kraps.rpc._
import net.neoremind.kraps.serializer.{JavaSerializer, JavaSerializerInstance}
Expand Down Expand Up @@ -79,7 +79,7 @@ class NettyRpcEnv(
private val stopped = new AtomicBoolean(false)

/**
* A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]],
* A map for [[net.neoremind.kraps.rpc.RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]],
* we just put messages to its [[Outbox]] to implement a non-blocking `send` method.
*/
private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package net.neoremind.kraps.rpc.netty

import java.nio.ByteBuffer
import java.util.concurrent.Callable
import javax.annotation.concurrent.GuardedBy

import javax.annotation.concurrent.GuardedBy
import net.neoremind.kraps.RpcException
import net.neoremind.kraps.rpc.{RpcAddress, RpcEnvStoppedException}
import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import net.neoremind.kraps.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
import net.neoremind.kraps.util.Utils

/**
* An [[RpcEndpoint]] for remote [[RpcEnv]]s to query if an `RpcEndpoint` exists.
* An [[net.neoremind.kraps.rpc.RpcEndpoint]] for remote [[net.neoremind.kraps.rpc.RpcEnv]]s to query if an `RpcEndpoint` exists.
*
* This is used when setting up a remote endpoint reference.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package net.neoremind.kraps.serializer

import java.io._
import java.nio.ByteBuffer
import javax.annotation.concurrent.NotThreadSafe

import javax.annotation.concurrent.NotThreadSafe
import net.neoremind.kraps.util.NextIterator

import scala.reflect.ClassTag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package net.neoremind.kraps.util


import net.neoremind.kraps.RpcConf
import net.neoremind.kraps.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}


object RpcUtils {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package net.neoremind.kraps.util
import java.io.IOException
import java.net.BindException

import net.neoremind.kraps.{RpcConf, RpcException}
import io.netty.channel.unix.Errors.NativeIoException
import net.neoremind.kraps.{RpcConf, RpcException}
import org.apache.spark.network.util.JavaUtils
import org.slf4j.LoggerFactory

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package net.neoremind.kraps
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{CountDownLatch, TimeUnit}

import net.neoremind.kraps.rpc._
import net.neoremind.kraps.rpc.netty.NettyRpcEnvFactory
import net.neoremind.kraps.rpc._
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -65,11 +65,20 @@ abstract class BaseRpcTest extends FlatSpec with BeforeAndAfter with Matchers {
*/
val clientCallWaitTimeInSec = "30s"

val defRpcConf = {
val rpcConf = new RpcConf()
rpcConf.set("spark.rpc.serialization.stream.factory",
"net.neoremind.kraps.serializer.FstSerializationStreamFactory")
rpcConf
}

def runServerAndAwaitTermination(block: => Unit,
rpcConf: RpcConf = new RpcConf(),
host: String = "localhost",
port: Int = _port.get()) = {
val future = Future {
rpcConf.set("spark.rpc.serialization.stream.factory",
"net.neoremind.kraps.serializer.FstSerializationStreamFactory")
val config = RpcEnvServerConfig(rpcConf, "hello-server", host, port)
serverRpcEnv = NettyRpcEnvFactory.create(config)
block
Expand All @@ -78,7 +87,7 @@ abstract class BaseRpcTest extends FlatSpec with BeforeAndAfter with Matchers {
}
future.onComplete {
case scala.util.Success(value) => log.info(s"Shut down server on host=$host, port=" + port)
case scala.util.Failure(e) => log.error(e.getMessage, e)
case scala.util.Failure(e) => log.error(e.getMessage, e)
}
}

Expand All @@ -93,7 +102,8 @@ abstract class BaseRpcTest extends FlatSpec with BeforeAndAfter with Matchers {
Thread.sleep(200)
val config = RpcEnvClientConfig(rpcConf, "test-client")
rpcEnv = NettyRpcEnvFactory.create(config)
val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress(host, port), endpointName)
val endPointRef: RpcEndpointRef =
rpcEnv.setupEndpointRef(RpcAddress(host, port), endpointName)
log.info(s"created $endPointRef")
val future = runBlock(endPointRef)
future.onComplete(assertBlock)
Expand All @@ -118,7 +128,8 @@ abstract class BaseRpcTest extends FlatSpec with BeforeAndAfter with Matchers {
startedCountDownLatch.await(serverStartTimeoutInMs, TimeUnit.MILLISECONDS)
val config = RpcEnvClientConfig(rpcConf, "test-client")
rpcEnv = NettyRpcEnvFactory.create(config)
val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress(host, port), endpointName)
val endPointRef: RpcEndpointRef =
rpcEnv.setupEndpointRef(RpcAddress(host, port), endpointName)
log.info(s"created $endPointRef")
runBlock(endPointRef)
} catch {
Expand All @@ -130,4 +141,4 @@ abstract class BaseRpcTest extends FlatSpec with BeforeAndAfter with Matchers {
if (rpcEnv != null) rpcEnv.shutdown()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package net.neoremind.kraps

import net.neoremind.kraps.rpc.netty.NettyRpcEnvFactory
import net.neoremind.kraps.rpc._
import net.neoremind.kraps.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcEnvClientConfig}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
package net.neoremind.kraps


import net.neoremind.kraps.rpc._
import net.neoremind.kraps.rpc.netty.NettyRpcEnvFactory
import net.neoremind.kraps.rpc._

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
Expand Down
Loading