Unverified Commit c9ea7fac authored by Nick Song, committed by Kent Yao

[KYUUBI #2289] Use unique tag to kill applications

### _Why are the changes needed?_

Use a unique tag to kill applications instead of parsing the application id out of the engine log.
#2289
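
In short: rather than scraping the `application_<cluster>_<seq>` id from captured `spark-submit` output, the engine is now submitted with a unique YARN application tag (`spark.yarn.tags=KYUUBI,<engineRefId>`) and located through the YARN API by that tag. A minimal sketch of the lookup side (illustrative only — `findAppByTag` is not part of this patch, and it assumes a reachable ResourceManager):

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Sketch: find a YARN application by the unique tag Kyuubi attached at
// submit time. The real code builds the configuration from Kyuubi's
// Hadoop conf; a default YarnConfiguration is used here for brevity.
def findAppByTag(engineRefId: String): Option[String] = {
  val yarnClient = YarnClient.createYarnClient
  yarnClient.init(new YarnConfiguration())
  yarnClient.start()
  try {
    // getApplications(applicationTypes, applicationStates, applicationTags)
    val apps = yarnClient.getApplications(null, null, Set(engineRefId).asJava)
    apps.asScala.headOption.map(_.getApplicationId.toString)
  } finally {
    yarnClient.stop()
  }
}
```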

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #2314 from Nick-0723/kill_app.

Closes #2289

28d5e7c9 [宋财礼] use eventually
8b295197 [宋财礼] Update SparkProcessBuilder.scala
e849cae9 [宋财礼] Update SparkProcessBuilderOnYarnSuite.scala
11780d91 [Nick Song] Unused import
e9b19703 [Nick Song] no need changes
a2bb13e7 [Nick Song] resolve conflicts
1d937f74 [Nick Song] add application killed successfully test case
e6481412 [Nick Song] use yarn tags kill application
Lead-authored-by: Nick Song <chun2184@163.com>
Co-authored-by: 宋财礼 <31242104+Nick-0723@users.noreply.github.com>
Co-authored-by: 宋财礼 <caili.song@nio.com>
Signed-off-by: Kent Yao <yao@apache.org>
parent 20af38ee
Showing with 123 additions and 56 deletions
@@ -170,7 +170,8 @@ private[kyuubi] class EngineRef(
         // tag is a seq type with comma-separated
         conf.set(
           SparkProcessBuilder.TAG_KEY,
-          conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") + "KYUUBI")
+          conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") +
+            "KYUUBI," + engineRefId)
         new SparkProcessBuilder(appUser, conf, extraEngineLog)
       case FLINK_SQL =>
         conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName)
@@ -203,7 +204,10 @@ private[kyuubi] class EngineRef(
         }
       }
       if (started + timeout <= System.currentTimeMillis()) {
-        val killMessage = builder.killApplication()
+        val killMessage = engineType match {
+          case SPARK_SQL => builder.killApplication(Left(engineRefId))
+          case _ => builder.killApplication()
+        }
         process.destroyForcibly()
         MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
         throw KyuubiSQLException(
......
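
The tag value above is a comma-separated list, so any user-supplied `spark.yarn.tags` value is preserved rather than overwritten. A standalone illustration of the composition (`composeTags` is a hypothetical helper for this example, not code from the patch):

```scala
import java.util.UUID

// Illustration only: how the comma-separated spark.yarn.tags value is
// built so that user tags survive and the engine gets a unique marker.
def composeTags(existing: Option[String], engineRefId: String): String =
  existing.map(_ + ",").getOrElse("") + "KYUUBI," + engineRefId

val engineRefId = UUID.randomUUID().toString
composeTags(None, engineRefId)          // => "KYUUBI,<engineRefId>"
composeTags(Some("myTag"), engineRefId) // => "myTag,KYUUBI,<engineRefId>"
```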
@@ -237,7 +237,11 @@ trait ProcBuilder {
     process
   }

-  def killApplication(line: String = lastRowsOfLog.toArray.mkString("\n")): String = ""
+  /**
+   * Use Left to represent engineRefId and Right to represent line.
+   */
+  def killApplication(clue: Either[String, String] = Right(lastRowsOfLog.toArray.mkString("\n")))
+    : String = ""

   def close(): Unit = synchronized {
     if (logCaptureThread != null) {
......
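
The `Either[String, String]` clue acts as a small discriminated union: `Left` carries the unique engine ref id for the tag-based path, `Right` carries captured log text for the legacy log-parsing path. A hedged sketch of the dispatch with simplified bodies (the real implementations live in the builders below):

```scala
// Sketch only: how an implementation pattern-matches on the clue.
// Left = engineRefId (kill by YARN tag), Right = log text (kill by parsing).
def killApplication(clue: Either[String, String]): String = clue match {
  case Left(engineRefId) => s"kill by tag: $engineRefId"
  case Right(logText)    => s"kill by log: $logText"
}

killApplication(Left("8f2c0d2e-engine-ref-id")) // caller holds an engine ref id
killApplication(Right("captured submit log"))   // caller only has log output
```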
@@ -78,7 +78,12 @@ class FlinkProcessBuilder(
   override protected def commands: Array[String] = Array(executable)

-  override def killApplication(line: String = lastRowsOfLog.toArray.mkString("\n")): String = {
+  override def killApplication(clue: Either[String, String]): String = clue match {
+    case Left(_) => ""
+    case Right(line) => killApplicationByLog(line)
+  }
+
+  def killApplicationByLog(line: String = lastRowsOfLog.toArray.mkString("\n")): String = {
     "Job ID: .*".r.findFirstIn(line) match {
       case Some(jobIdLine) =>
         val jobId = jobIdLine.split("Job ID: ")(1).trim
......
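
The Flink path still parses submit output. A standalone worked example of the extraction above, using the same log text the updated FlinkProcessBuilderSuite feeds in:

```scala
// Worked example (mirrors killApplicationByLog): pull the Flink job id
// out of captured submit output with the same regex and split.
val line =
  """
    |[INFO] SQL update statement has been successfully submitted to the cluster:
    |Job ID: 6b1af540c0c0bb3fcfcad50ac037c862
    |""".stripMargin

"Job ID: .*".r.findFirstIn(line) match {
  case Some(jobIdLine) =>
    // split on the label; index 1 is everything after "Job ID: "
    val jobId = jobIdLine.split("Job ID: ")(1).trim
    println(jobId) // prints 6b1af540c0c0bb3fcfcad50ac037c862
  case None =>
    println("no job id found in the log")
}
```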
@@ -20,8 +20,8 @@ package org.apache.kyuubi.engine.spark
 import java.io.{File, IOException}
 import java.nio.file.Paths

+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
-import scala.util.matching.Regex

 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.records.ApplicationId
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_INIT_TIMEOUT, ENGINE_TYPE}
 import org.apache.kyuubi.engine.ProcBuilder
 import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.ha.client.AuthTypes
@@ -44,12 +45,11 @@ class SparkProcessBuilder(
   import SparkProcessBuilder._

-  val yarnClient = getYarnClient
-
   def getYarnClient: YarnClient = YarnClient.createYarnClient

+  private val sparkHome = getEngineHome(shortName)
+
   override protected val executable: String = {
-    val sparkHome = getEngineHome("spark")
     Paths.get(sparkHome, "bin", SPARK_SUBMIT_FILE).toFile.getCanonicalPath
   }
@@ -101,8 +101,6 @@ class SparkProcessBuilder(
   override protected def module: String = "kyuubi-spark-sql-engine"

-  val YARN_APP_NAME_REGEX: Regex = "application_\\d+_\\d+".r
-
   private def useKeytab(): Boolean = {
     val principal = conf.getOption(PRINCIPAL)
     val keytab = conf.getOption(KEYTAB)
@@ -141,33 +139,46 @@ class SparkProcessBuilder(
     }
   }

-  override def killApplication(line: String = lastRowsOfLog.toArray.mkString("\n")): String =
-    YARN_APP_NAME_REGEX.findFirstIn(line) match {
-      case Some(appId) =>
+  override def killApplication(clue: Either[String, String]): String = clue match {
+    case Left(engineRefId) => killApplicationByTag(engineRefId)
+    case Right(_) => ""
+  }
+
+  private def killApplicationByTag(engineRefId: String): String = {
+    conf.getOption(MASTER_KEY).orElse(getSparkDefaultsConf().get(MASTER_KEY)) match {
+      case Some("yarn") =>
+        var applicationId: ApplicationId = null
+        val yarnClient = getYarnClient
         try {
           val yarnConf = new YarnConfiguration(KyuubiHadoopUtils.newHadoopConf(conf))
           yarnClient.init(yarnConf)
           yarnClient.start()
-          val applicationId = ApplicationId.fromString(appId)
-          yarnClient.killApplication(applicationId)
-          s"Killed Application $appId successfully."
+          val apps = yarnClient.getApplications(null, null, Set(engineRefId).asJava)
+          if (apps.isEmpty) return s"There are no Application tagged with $engineRefId," +
+            s" please kill it manually."
+          applicationId = apps.asScala.head.getApplicationId
+          yarnClient.killApplication(
+            applicationId,
+            s"Kyuubi killed this caused by: Timeout(${conf.get(ENGINE_INIT_TIMEOUT)} ms) to" +
+              s" launched ${conf.get(ENGINE_TYPE)} engine with $this.")
+          s"Killed Application $applicationId tagged with $engineRefId successfully."
         } catch {
           case e: Throwable =>
-            s"Failed to kill Application $appId, please kill it manually." +
-              s" Caused by ${e.getMessage}."
+            s"Failed to kill Application $applicationId tagged with $engineRefId," +
+              s" please kill it manually. Caused by ${e.getMessage}."
         } finally {
-          if (yarnClient != null) {
-            yarnClient.stop()
-          }
+          yarnClient.stop()
         }
-      case None => ""
+      case _ => "Kill Application only works with YARN, please kill it manually." +
+          s" Application tagged with $engineRefId"
     }
+  }

   override protected def shortName: String = "spark"

   protected def getSparkDefaultsConf(): Map[String, String] = {
     val sparkDefaultsConfFile = env.get(SPARK_CONF_DIR)
       .orElse(env.get(SPARK_HOME).map(_ + File.separator + "conf"))
       .orElse(Option(s"$sparkHome${File.separator}conf"))
       .map(_ + File.separator + SPARK_CONF_FILE_NAME)
       .map(new File(_)).filter(_.exists())
     Utils.getPropertiesFromFile(sparkDefaultsConfFile)
@@ -187,7 +198,6 @@ object SparkProcessBuilder {
   final private[spark] val KEYTAB = "spark.kerberos.keytab"
   // Get the appropriate spark-submit file
   final private val SPARK_SUBMIT_FILE = if (Utils.isWindows) "spark-submit.cmd" else "spark-submit"
   final private val SPARK_HOME = "SPARK_HOME"
   final private val SPARK_CONF_DIR = "SPARK_CONF_DIR"
   final private val SPARK_CONF_FILE_NAME = "spark-defaults.conf"
 }
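
Note that `getYarnClient` stays an overridable method rather than an inline `YarnClient.createYarnClient` call: that is the seam the suites below use to inject a mock. A sketch of the pattern, assuming ScalaTest with scalatestplus-mockito as the tests do (with Mockito's default answers the mocked client returns an empty application list, so the tag-based kill reports that no tagged application exists, per the message in the patch):

```scala
import org.apache.hadoop.yarn.client.api.YarnClient
import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.SparkProcessBuilder

// Sketch: inject a mock YarnClient through the getYarnClient seam so the
// tag-based kill path runs without a real ResourceManager.
val conf = KyuubiConf(false).set("spark.master", "yarn")
val builder = new SparkProcessBuilder(Utils.currentUser, conf) {
  override def getYarnClient: YarnClient = mock[YarnClient]
}
val msg = builder.killApplication(Left("engine-ref-id"))
// msg == "There are no Application tagged with engine-ref-id, please kill it manually."
```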
@@ -34,14 +34,14 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
       override protected def env: Map[String, String] = Map("FLINK_HOME" -> "")
     }
     val exit1 = processBuilder.killApplication(
-      """
-        |[INFO] SQL update statement has been successfully submitted to the cluster:
-        |Job ID: 6b1af540c0c0bb3fcfcad50ac037c862
-        |""".stripMargin)
+      Right("""
+        |[INFO] SQL update statement has been successfully submitted to the cluster:
+        |Job ID: 6b1af540c0c0bb3fcfcad50ac037c862
+        |""".stripMargin))
     assert(exit1.contains("6b1af540c0c0bb3fcfcad50ac037c862")
       && !exit1.contains("FLINK_HOME is not set!"))
-    val exit2 = processBuilder.killApplication("unknow")
+    val exit2 = processBuilder.killApplication(Right("unknow"))
     assert(exit2.equals(""))
   }
 }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.spark

import java.util.UUID

import scala.concurrent.duration.DurationInt

import org.apache.hadoop.yarn.client.api.YarnClient
import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.kyuubi.{Utils, WithKyuubiServerOnYarn}
import org.apache.kyuubi.config.KyuubiConf

class SparkProcessBuilderOnYarnSuite extends WithKyuubiServerOnYarn {

  override protected val kyuubiServerConf: KyuubiConf = KyuubiConf()

  override protected val connectionConf: Map[String, String] = Map(
    "spark.master" -> "yarn",
    "spark.executor.instances" -> "1")

  test("test kill application") {
    val engineRefId = UUID.randomUUID().toString
    conf.set(
      SparkProcessBuilder.TAG_KEY,
      conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("") +
        "KYUUBI," + engineRefId)
    val builder = new SparkProcessBuilder(Utils.currentUser, conf)
    val proc = builder.start
    eventually(timeout(3.minutes), interval(1.seconds)) {
      val killMsg = builder.killApplication(Left(engineRefId))
      assert(killMsg.contains(s"tagged with $engineRefId successfully."))
    }
    proc.destroyForcibly()

    val pb1 = new FakeSparkProcessBuilder(conf.clone) {
      override protected def env: Map[String, String] = Map()
      override def getYarnClient: YarnClient = mock[YarnClient]
    }
    val exit1 = pb1.killApplication(Left(engineRefId))
    assert(exit1.equals(s"There are no Application tagged with $engineRefId," +
      s" please kill it manually."))

    val pb2 = new FakeSparkProcessBuilder(conf.clone) {
      override protected def env: Map[String, String] = Map()
      override def getYarnClient: YarnClient = mock[YarnClient]
    }
    pb2.conf.set("spark.master", "local")
    val exit2 = pb2.killApplication(Left(engineRefId))
    assert(exit2.equals("Kill Application only works with YARN, please kill it manually." +
      s" Application tagged with $engineRefId"))
  }
}
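
The positive path in the suite above polls with ScalaTest's `Eventually`, because the application only becomes visible to the ResourceManager some time after `spark-submit` starts. A self-contained illustration of that retry pattern (`EventuallyDemo` exists only for this example):

```scala
import scala.concurrent.duration.DurationInt

import org.scalatest.Assertions.assert
import org.scalatest.concurrent.Eventually._

object EventuallyDemo extends App {
  // Re-run the block until the assertion passes, probing every interval
  // until the timeout; failures inside the block trigger a retry.
  val readyAt = System.currentTimeMillis() + 2000
  eventually(timeout(10.seconds), interval(100.millis)) {
    // Stands in for "the tagged YARN app is now visible and killable".
    assert(System.currentTimeMillis() > readyAt, "application not visible yet")
  }
  println("condition became true within the timeout")
}
```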
@@ -22,7 +22,6 @@ import java.nio.file.{Files, Path, Paths, StandardOpenOption}
 import java.time.Duration
 import java.util.concurrent.{Executors, TimeUnit}

-import org.apache.hadoop.yarn.client.api.YarnClient
 import org.scalatest.time.SpanSugar._
 import org.scalatestplus.mockito.MockitoSugar
@@ -239,32 +238,6 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
     assert(b2.mainResource.getOrElse("") != jarPath.toString)
   }

-  test("kill application") {
-    val pb1 = new FakeSparkProcessBuilder(conf) {
-      override protected def env: Map[String, String] = Map()
-      override def getYarnClient: YarnClient = mock[YarnClient]
-    }
-    val exit1 = pb1.killApplication("21/09/30 17:12:47 INFO yarn.Client: " +
-      "Application report for application_1593587619692_20149 (state: ACCEPTED)")
-    assert(exit1.contains("Killed Application application_1593587619692_20149 successfully."))
-
-    val pb2 = new FakeSparkProcessBuilder(conf) {
-      override protected def env: Map[String, String] = Map()
-      override def getYarnClient: YarnClient = null
-    }
-    val exit2 = pb2.killApplication("21/09/30 17:12:47 INFO yarn.Client: " +
-      "Application report for application_1593587619692_20149 (state: ACCEPTED)")
-    assert(exit2.contains("Failed to kill Application application_1593587619692_20149")
-      && exit2.contains("Caused by"))
-
-    val pb3 = new FakeSparkProcessBuilder(conf) {
-      override protected def env: Map[String, String] = Map()
-      override def getYarnClient: YarnClient = mock[YarnClient]
-    }
-    val exit3 = pb3.killApplication("unknow")
-    assert(exit3.equals(""))
-  }

   test("add spark prefix for conf") {
     val conf = KyuubiConf(false)
     conf.set("kyuubi.kent", "yao")
......