Skip to content

Commit

Permalink
[KYUUBI #369] Using a lightweight zk client for kyuubi session
Browse files Browse the repository at this point in the history
![yaooqinn](https://badgen.net/badge/Hello/yaooqinn/green) [![Closes #369](https://badgen.net/badge/Preview/Closes%20%23369/blue)](https://github.com/yaooqinn/kyuubi/pull/369) ![60](https://badgen.net/badge/%2B/60/red) ![34](https://badgen.net/badge/-/34/green) ![3](https://badgen.net/badge/commits/3/yellow) ![Target Issue](https://badgen.net/badge/Missing/Target%20Issue/ff0000) [&#10088;?&#10089;](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT -->

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/yaooqinn/kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

1. limit zkClient to method wide
2. a more lightweight retry policy
3. add some trace log for Kyuubi session
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request

Closes #369 from yaooqinn/zk.

5cbe14f [Kent Yao] typo
731bae8 [Kent Yao] typo
51365da [Kent Yao] Using a lightweight zk client for kyuubi session

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
  • Loading branch information
yaooqinn committed Feb 25, 2021
1 parent 4b13f1c commit d94b1c4
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ abstract class AbstractSession(
val conf: Map[String, String],
val sessionManager: SessionManager) extends Session with Logging {

protected def logSessionInfo(msg: String): Unit = info(s"[$user:$ipAddress] $handle - $msg")

private final val _handle: SessionHandle = SessionHandle(protocol)
override def handle: SessionHandle = _handle

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import javax.security.auth.login.Configuration

import scala.collection.JavaConverters._

import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode
import org.apache.curator.framework.state.{ConnectionState, ConnectionStateListener}
Expand Down Expand Up @@ -216,7 +218,16 @@ object ServiceDiscovery {
* Use the [[ZooKeeperACLProvider]] to create appropriate ACLs
*/
def startZookeeperClient(conf: KyuubiConf): CuratorFramework = {
val client = buildZookeeperClient(conf)
val connectionStr = conf.get(HA_ZK_QUORUM)
val sessionTimeout = conf.get(HA_ZK_SESSION_TIMEOUT)
val connectionTimeout = conf.get(HA_ZK_CONN_TIMEOUT)
val retryPolicy = new ExponentialBackoffRetry(1000, 3)
val client = CuratorFrameworkFactory.builder()
.connectString(connectionStr)
.sessionTimeoutMs(sessionTimeout)
.connectionTimeoutMs(connectionTimeout)
.retryPolicy(retryPolicy)
.build()
client.start()
client
}
Expand Down Expand Up @@ -250,4 +261,22 @@ object ServiceDiscovery {
val zkEnsemble = conf.get(HA_ZK_QUORUM)
zkEnsemble != null && zkEnsemble.nonEmpty
}

def getServerHost(zkClient: CuratorFramework, namespace: String): Option[(String, Int)] = {
try {
val hosts = zkClient.getChildren.forPath(namespace)
// TODO: use last one because to avoid touching some maybe-crashed engines
// We need a big improvement here.
hosts.asScala.lastOption.map { p =>
val path = ZKPaths.makePath(namespace, p)
val hostPort = new String(zkClient.getData.forPath(path), StandardCharsets.UTF_8)
val strings = hostPort.split(":")
val host = strings.head
val port = strings(1).toInt
(host, port)
}
} catch {
case _: Exception => None
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ abstract class KyuubiOperation(
override def getOperationLog: Option[OperationLog] = None

protected def onError(action: String = "operating"): PartialFunction[Throwable, Unit] = {
case e: Exception =>
case e: Throwable =>
state.synchronized {
if (isTerminalState(state)) {
warn(s"Ignore exception in terminal state with $statementId: $e")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
package org.apache.kyuubi.session

import java.io.IOException
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._

import org.apache.curator.utils.ZKPaths
import org.apache.hive.service.rpc.thrift.{TCLIService, TCloseSessionReq, TOpenSessionReq, TProtocolVersion, TSessionHandle}
import org.apache.thrift.TException
import org.apache.thrift.protocol.TBinaryProtocol
Expand All @@ -36,7 +34,7 @@ import org.apache.kyuubi.engine.{ShareLevel, SQLEngineAppName}
import org.apache.kyuubi.engine.ShareLevel.{SERVER, ShareLevel}
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.ServiceDiscovery
import org.apache.kyuubi.ha.client.ServiceDiscovery._
import org.apache.kyuubi.service.authentication.PlainSASLHelper

class KyuubiSessionImpl(
Expand Down Expand Up @@ -84,37 +82,18 @@ class KyuubiSessionImpl(

private val appZkNamespace: String = boundAppName.getZkNamespace(sessionConf.get(HA_ZK_NAMESPACE))

private lazy val zkClient = ServiceDiscovery.startZookeeperClient(sessionConf)
private val timeout: Long = sessionConf.get(ENGINE_INIT_TIMEOUT)

private var transport: TTransport = _
private var client: TCLIService.Client = _
private var remoteSessionHandle: TSessionHandle = _

private def getServerHost: Option[(String, Int)] = {
try {
val hosts = zkClient.getChildren.forPath(appZkNamespace)
// TODO: use last one because to avoid touching some maybe-crashed engines
// We need a big improvement here.
hosts.asScala.lastOption.map { p =>
val path = ZKPaths.makePath(appZkNamespace, p)
val hostPort = new String(zkClient.getData.forPath(path), StandardCharsets.UTF_8)
val strings = hostPort.split(":")
val host = strings.head
val port = strings(1).toInt
(host, port)
}
} catch {
case _: Exception => None
}
}

override def open(): Unit = {
super.open()
// Init zookeeper client here to capture errors
zkClient
val zkClient = startZookeeperClient(sessionConf)
logSessionInfo(s"Connected to Zookeeper")
try {
getServerHost match {
getServerHost(zkClient, appZkNamespace) match {
case Some((host, port)) => openSession(host, port)
case None =>
sessionConf.setIfMissing(SparkProcessBuilder.APP_KEY, boundAppName.toString)
Expand All @@ -125,9 +104,9 @@ class KyuubiSessionImpl(
sessionConf.set(HA_ZK_NAMESPACE, appZkNamespace)
val builder = new SparkProcessBuilder(appUser, sessionConf.toSparkPrefixedConf)
try {
info(s"Launching SQL engine: $builder")
logSessionInfo(s"Launching SQL engine:\n$builder")
val process = builder.start
var sh = getServerHost
var sh = getServerHost(zkClient, appZkNamespace)
val started = System.currentTimeMillis()
var exitValue: Option[Int] = None
while (sh.isEmpty) {
Expand All @@ -142,7 +121,7 @@ class KyuubiSessionImpl(
throw KyuubiSQLException(s"Timed out($timeout ms) to launched Spark with $builder",
builder.getError)
}
sh = getServerHost
sh = getServerHost(zkClient, appZkNamespace)
}
val Some((host, port)) = sh
openSession(host, port)
Expand All @@ -166,13 +145,19 @@ class KyuubiSessionImpl(
val loginTimeout = sessionConf.get(ENGINE_LOGIN_TIMEOUT).toInt
transport = PlainSASLHelper.getPlainTransport(
user, passwd, new TSocket(host, port, loginTimeout))
if (!transport.isOpen) transport.open()
if (!transport.isOpen) {
logSessionInfo(s"Connecting to engine [$host:$port]")
transport.open()
logSessionInfo(s"Connected to engine [$host:$port]")
}
client = new TCLIService.Client(new TBinaryProtocol(transport))
val req = new TOpenSessionReq()
req.setUsername(user)
req.setPassword(passwd)
req.setConfiguration(conf.asJava)
logSessionInfo(s"Sending TOpenSessionReq to engine [$host:$port]")
val resp = client.OpenSession(req)
logSessionInfo(s"Received TOpenSessionResp from engine [$host:$port]")
ThriftUtils.verifyTStatus(resp.getStatus)
remoteSessionHandle = resp.getSessionHandle
sessionManager.operationManager.setConnection(handle, client, remoteSessionHandle)
Expand Down
6 changes: 3 additions & 3 deletions kyuubi-main/src/test/resources/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ log4j.rootLogger=DEBUG, CA, FA
log4j.appender.CA=org.apache.log4j.ConsoleAppender
log4j.appender.CA.layout=org.apache.log4j.PatternLayout
log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
log4j.appender.CA.Threshold = WARN
log4j.appender.CA.Threshold = INFO


#File Appender
log4j.appender.FA=org.apache.log4j.FileAppender
log4j.appender.FA.append=false
log4j.appender.FA.file=target/unit-tests.log
log4j.appender.FA.layout=org.apache.log4j.PatternLayout
log4j.appender.FA.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss.SSS} %t %p %c{2}: %m%n
log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{2}: %m%n

# Set the logger level of File Appender to WARN
log4j.appender.FA.Threshold = DEBUG
log4j.appender.FA.Threshold = DEBUG
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.kyuubi.operation

import java.nio.file.Files

import org.apache.kyuubi.{KyuubiFunSuite, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ZK_ACL_ENABLED, HA_ZK_QUORUM}
Expand All @@ -29,8 +31,10 @@ trait WithKyuubiServer extends KyuubiFunSuite {

private var zkServer: EmbeddedZkServer = _
private var server: KyuubiServer = _
private val metastore = Utils.createTempDir()

override def beforeAll(): Unit = {
Files.delete(metastore)
zkServer = new EmbeddedZkServer()
conf.set(KyuubiConf.EMBEDDED_ZK_PORT, -1)
val zkData = Utils.createTempDir()
Expand All @@ -39,6 +43,8 @@ trait WithKyuubiServer extends KyuubiFunSuite {
zkServer.start()

conf.set("spark.ui.enabled", "false")
conf.set("spark.hadoop.javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=$metastore;create=true")
conf.set(KyuubiConf.FRONTEND_BIND_PORT, 0)
conf.set(KyuubiConf.ENGINE_CHECK_INTERVAL, 4000L)
conf.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 10000L)
Expand Down
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -925,9 +925,13 @@
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>TestSuite.txt</filereports>
<environmentVariables>
<KYUUBI_WORK_DIR_ROOT>${project.build.directory}/work</KYUUBI_WORK_DIR_ROOT>
</environmentVariables>
<systemProperties>
<log4j.configuration>file:src/test/resources/log4j.properties</log4j.configuration>
<java.io.tmpdir>${project.build.directory}/tmp</java.io.tmpdir>
<spark.driver.memory>2g</spark.driver.memory>
</systemProperties>
</configuration>
<executions>
Expand Down

0 comments on commit d94b1c4

Please sign in to comment.