Commit 06178e52 authored by Kent Yao's avatar Kent Yao
Browse files

KyuubiServer Part 1

parent 6dc41157
Showing with 619 additions and 158 deletions
+619 -158
......@@ -17,7 +17,7 @@
#
export KYUUBI_HOME="${KYUUBI_HOME:-"$(cd "`dirname $0`"/..; pwd)"}"
export KYUUBI_HOME="${KYUUBI_HOME:-"$(cd "$(dirname "$0")"/.. || exit; pwd)"}"
export KYUUBI_CONF_DIR="${KYUUBI_CONF_DIR:-"${KYUUBI_HOME}"/conf}"
......@@ -25,7 +25,7 @@ KYUUBI_ENV_SH="${KYUUBI_CONF_DIR}"/kyuubi-env.sh
if [[ -f ${KYUUBI_ENV_SH} ]]; then
set -a
echo "Using kyuubi.sh environment file ${KYUUBI_ENV_SH} to initialize..."
. ${KYUUBI_ENV_SH}
. "${KYUUBI_ENV_SH}"
set +a
else
echo "Warn: Not find kyuubi.sh environment file ${KYUUBI_ENV_SH}, using default ones..."
......@@ -49,7 +49,13 @@ fi
export KYUUBI_SCALA_VERSION="${KYUUBI_SCALA_VERSION:-"2.12"}"
export SPARK_HOME="${SPARK_HOME:-"${KYUUBI_HOME}/externals/spark-3.0.0-bin-hadoop2.7"}"
SPARK_BUILTIN="${KYUUBI_HOME}/externals/spark-3.0.0-bin-hadoop2.7"
if [[ ! -d ${SPARK_BUILTIN} ]]; then
SPARK_BUILTIN="${KYUUBI_HOME}/externals/kyuubi-download/target/spark-3.0.0-bin-hadoop2.7"
fi
export SPARK_HOME="${SPARK_HOME:-"${SPARK_BUILTIN}"}"
# Print essential environment variables to console
echo "JAVA_HOME: ${JAVA_HOME}"
......
......@@ -18,57 +18,34 @@
package org.apache.kyuubi.engine.spark
import java.time.LocalDateTime
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.service.{CompositeService, FrontendService}
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.ServiceDiscovery
import org.apache.kyuubi.service.SeverLike
import org.apache.kyuubi.util.SignalRegister
class SparkSQLEngine(name: String, spark: SparkSession) extends CompositeService(name) {
private val started = new AtomicBoolean(false)
private val OOMHook = new Runnable { override def run(): Unit = stop() }
private val backendService = new SparkSQLBackendService(spark)
private val frontendService = new FrontendService(backendService, OOMHook)
def connectionUrl: String = frontendService.connectionUrl
private[spark] final class SparkSQLEngine(name: String, spark: SparkSession)
extends SeverLike(name) {
def this(spark: SparkSession) = this(classOf[SparkSQLEngine].getSimpleName, spark)
override def initialize(conf: KyuubiConf): Unit = {
addService(backendService)
addService(frontendService)
super.initialize(conf)
}
override def start(): Unit = {
super.start()
started.set(true)
}
override protected val backendService = new SparkSQLBackendService(spark)
override def stop(): Unit = {
try {
spark.stop()
} catch {
case t: Throwable =>
warn(s"Error stopping spark ${t.getMessage}", t)
} finally {
if (started.getAndSet(false)) {
super.stop()
}
}
}
override protected def stopServer(): Unit = spark.stop()
}
private object SparkSQLEngine extends Logging {
object SparkSQLEngine extends Logging {
val kyuubiConf: KyuubiConf = KyuubiConf()
private val user = Utils.currentUser
def createSpark(): SparkSession = {
val sparkConf = new SparkConf()
sparkConf.setIfMissing("spark.master", "local")
......@@ -76,7 +53,7 @@ private object SparkSQLEngine extends Logging {
val appName = Seq(
"kyuubi",
Utils.currentUser,
user,
classOf[SparkSQLEngine].getSimpleName,
LocalDateTime.now).mkString("_")
......@@ -96,10 +73,9 @@ private object SparkSQLEngine extends Logging {
}
}
SparkSession.builder()
.config(sparkConf)
.enableHiveSupport()
.getOrCreate()
val session = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
session.sql("SHOW DATABASES")
session
}
......@@ -111,6 +87,27 @@ private object SparkSQLEngine extends Logging {
engine
}
def exposeEngine(engine: SparkSQLEngine): Unit = {
val needExpose = kyuubiConf.get(HA_ZK_QUORUM).nonEmpty
if (needExpose) {
val instance = engine.connectionUrl
val zkNamespacePrefix = kyuubiConf.get(HA_ZK_NAMESPACE)
val postHook = new Thread {
override def run(): Unit = {
while (engine.backendService.sessionManager.getOpenSessionCount > 0) {
Thread.sleep(60 * 1000)
}
engine.stop()
}
}
val serviceDiscovery = new ServiceDiscovery(instance, s"$zkNamespacePrefix-$user", postHook)
serviceDiscovery.initialize(kyuubiConf)
serviceDiscovery.start()
sys.addShutdownHook(serviceDiscovery.stop())
}
}
def main(args: Array[String]): Unit = {
SignalRegister.registerLogger(logger)
......@@ -119,6 +116,7 @@ private object SparkSQLEngine extends Logging {
try {
spark = createSpark()
engine = startEngine(spark)
exposeEngine(engine)
} catch {
case t: Throwable =>
error("Error start SparkSQLEngine", t)
......
......@@ -21,13 +21,15 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
import org.apache.kyuubi.session.Session
class GetCatalogs(spark: SparkSession, session: Session)
extends SparkOperation(spark, OperationType.GET_CATALOGS, session) {
override protected def resultSchema: StructType = {
new StructType()
.add("TABLE_CAT", "string", nullable = true, "Catalog name. NULL if not applicable.")
.add(TABLE_CAT, "string", nullable = true, "Catalog name. NULL if not applicable.")
}
override protected def runInternal(): Unit = {
......
......@@ -20,9 +20,10 @@ package org.apache.kyuubi.engine.spark.operation
import java.util.regex.Pattern
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, NullType, NumericType, ShortType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, CalendarIntervalType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, NullType, NumericType, ShortType, StringType, StructField, StructType, TimestampType}
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetColumns(
......@@ -61,7 +62,8 @@ class GetColumns(
* For array, map, string, and binaries, the column size is variable, return null as unknown.
*/
private def getColumnSize(typ: DataType): Option[Int] = typ match {
case dt @ (BooleanType | _: NumericType | DateType | TimestampType) => Some(dt.defaultSize)
case dt @ (BooleanType | _: NumericType | DateType | TimestampType | CalendarIntervalType) =>
Some(dt.defaultSize)
case StructType(fields) =>
val sizeArr = fields.map(f => getColumnSize(f.dataType))
if (sizeArr.contains(None)) {
......@@ -123,42 +125,42 @@ class GetColumns(
}
override protected def resultSchema: StructType = {
new StructType()
.add("TABLE_CAT", "string", nullable = true, "Catalog name. NULL if not applicable")
.add("TABLE_SCHEM", "string", nullable = true, "Schema name")
.add("TABLE_NAME", "string", nullable = true, "Table name")
.add("COLUMN_NAME", "string", nullable = true, "Column name")
.add("DATA_TYPE", "int", nullable = true, "SQL type from java.sql.Types")
.add("TYPE_NAME", "string", nullable = true, "Data source dependent type name, for a UDT" +
.add(TABLE_CAT, "string", nullable = true, "Catalog name. NULL if not applicable")
.add(TABLE_SCHEM, "string", nullable = true, "Schema name")
.add(TABLE_NAME, "string", nullable = true, "Table name")
.add(COLUMN_NAME, "string", nullable = true, "Column name")
.add(DATA_TYPE, "int", nullable = true, "SQL type from java.sql.Types")
.add(TYPE_NAME, "string", nullable = true, "Data source dependent type name, for a UDT" +
" the type name is fully qualified")
.add("COLUMN_SIZE", "int", nullable = true, "Column size. For char or date types this is" +
.add(COLUMN_SIZE, "int", nullable = true, "Column size. For char or date types this is" +
" the maximum number of characters, for numeric or decimal types this is precision.")
.add("BUFFER_LENGTH", "tinyint", nullable = true, "Unused")
.add("DECIMAL_DIGITS", "int", nullable = true, "he number of fractional digits")
.add("NUM_PREC_RADIX", "int", nullable = true, "Radix (typically either 10 or 2)")
.add("NULLABLE", "int", nullable = true, "Is NULL allowed")
.add("REMARKS", "string", nullable = true, "Comment describing column (may be null)")
.add("COLUMN_DEF", "string", nullable = true, "Default value (may be null)")
.add("SQL_DATA_TYPE", "int", nullable = true, "Unused")
.add("SQL_DATETIME_SUB", "int", nullable = true, "Unused")
.add("CHAR_OCTET_LENGTH", "int", nullable = true,
.add(BUFFER_LENGTH, "tinyint", nullable = true, "Unused")
.add(DECIMAL_DIGITS, "int", nullable = true, "he number of fractional digits")
.add(NUM_PREC_RADIX, "int", nullable = true, "Radix (typically either 10 or 2)")
.add(NULLABLE, "int", nullable = true, "Is NULL allowed")
.add(REMARKS, "string", nullable = true, "Comment describing column (may be null)")
.add(COLUMN_DEF, "string", nullable = true, "Default value (may be null)")
.add(SQL_DATA_TYPE, "int", nullable = true, "Unused")
.add(SQL_DATETIME_SUB, "int", nullable = true, "Unused")
.add(CHAR_OCTET_LENGTH, "int", nullable = true,
"For char types the maximum number of bytes in the column")
.add("ORDINAL_POSITION", "int", nullable = true, "Index of column in table (starting at 1)")
.add("IS_NULLABLE", "string", nullable = true,
.add(ORDINAL_POSITION, "int", nullable = true, "Index of column in table (starting at 1)")
.add(IS_NULLABLE, "string", nullable = true,
"'NO' means column definitely does not allow NULL values; 'YES' means the column might" +
" allow NULL values. An empty string means nobody knows.")
.add("SCOPE_CATALOG", "string", nullable = true,
.add(SCOPE_CATALOG, "string", nullable = true,
"Catalog of table that is the scope of a reference attribute "
+ "(null if DATA_TYPE isn't REF)")
.add("SCOPE_SCHEMA", "string", nullable = true,
.add(SCOPE_SCHEMA, "string", nullable = true,
"Schema of table that is the scope of a reference attribute "
+ "(null if the DATA_TYPE isn't REF)")
.add("SCOPE_TABLE", "string", nullable = true,
.add(SCOPE_TABLE, "string", nullable = true,
"Table name that this the scope of a reference attribure "
+ "(null if the DATA_TYPE isn't REF)")
.add("SOURCE_DATA_TYPE", "smallint", nullable = true,
.add(SOURCE_DATA_TYPE, "smallint", nullable = true,
"Source type of a distinct type or user-generated Ref type, "
+ "SQL type from java.sql.Types (null if DATA_TYPE isn't DISTINCT or user-generated REF)")
.add("IS_AUTO_INCREMENT", "string", nullable = true,
.add(IS_AUTO_INCREMENT, "string", nullable = true,
"Indicates whether this column is auto incremented.")
}
......
......@@ -23,6 +23,7 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetFunctions(
......@@ -34,13 +35,13 @@ class GetFunctions(
extends SparkOperation(spark, OperationType.GET_FUNCTIONS, session) {
override protected def resultSchema: StructType = {
new StructType()
.add("FUNCTION_CAT", "string", nullable = true, "Function catalog (may be null)")
.add("FUNCTION_SCHEM", "string", nullable = true, "Function schema (may be null)")
.add("FUNCTION_NAME", "string", nullable = true, "Function name. This is the name used to" +
.add(FUNCTION_CAT, "string", nullable = true, "Function catalog (may be null)")
.add(FUNCTION_SCHEM, "string", nullable = true, "Function schema (may be null)")
.add(FUNCTION_NAME, "string", nullable = true, "Function name. This is the name used to" +
" invoke the function")
.add("REMARKS", "string", nullable = true, "Explanatory comment on the function")
.add("FUNCTION_TYPE", "int", nullable = true, "Kind of function.")
.add("SPECIFIC_NAME", "string", nullable = true, "The name which uniquely identifies this" +
.add(REMARKS, "string", nullable = true, "Explanatory comment on the function")
.add(FUNCTION_TYPE, "int", nullable = true, "Kind of function.")
.add(SPECIFIC_NAME, "string", nullable = true, "The name which uniquely identifies this" +
" function within its schema")
}
......@@ -56,11 +57,7 @@ class GetFunctions(
"",
info.getDb,
info.getName,
"\nUsage:\n" + info.getUsage +
"\nArgs:\n" + info.getArguments +
"\nExamples:\n" + info.getExamples +
"\nSince:\n" + info.getSince +
"\nNote:\n" + info.getNote,
s"Usage: ${info.getUsage}\nExtended Usage:${info.getExtended}",
DatabaseMetaData.functionResultUnknown,
info.getClassName)
}
......
......@@ -23,6 +23,7 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetSchemas(spark: SparkSession, session: Session, catalogName: String, schema: String)
......@@ -34,8 +35,8 @@ class GetSchemas(spark: SparkSession, session: Session, catalogName: String, sch
override protected def resultSchema: StructType = {
new StructType()
.add("TABLE_SCHEM", "string", nullable = true, "Schema name.")
.add("TABLE_CATALOG", "string", nullable = true, "Catalog name.")
.add(TABLE_SCHEM, "string", nullable = true, "Schema name.")
.add(TABLE_CATALOG, "string", nullable = true, "Catalog name.")
}
override protected def runInternal(): Unit = {
......@@ -44,7 +45,7 @@ class GetSchemas(spark: SparkSession, session: Session, catalogName: String, sch
val databases = spark.sessionState.catalog.listDatabases(schemaPattern)
val globalTmpViewDb = spark.sessionState.catalog.globalTempViewManager.database
if (schema == null ||
Pattern.compile(convertIdentifierPattern(schema, false))
Pattern.compile(convertSchemaPattern(schema, false))
.matcher(globalTmpViewDb).matches()) {
iter = (databases :+ globalTmpViewDb).map(Row(_, "")).toList.iterator
} else {
......
......@@ -22,13 +22,14 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetTableTypes(spark: SparkSession, session: Session)
extends SparkOperation(spark, OperationType.GET_TABLE_TYPES, session) {
override protected def resultSchema: StructType = {
new StructType()
.add("TABLE_TYPE", "string", nullable = true, "Table type name.")
.add(TABLE_TYPE, "string", nullable = true, "Table type name.")
}
override protected def runInternal(): Unit = {
......
......@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetTables(
......@@ -33,7 +34,7 @@ class GetTables(
schema: String,
tableName: String,
tableTypes: java.util.List[String])
extends SparkOperation(spark, OperationType.GET_SCHEMAS, session) {
extends SparkOperation(spark, OperationType.GET_TABLES, session) {
private def matched(tableType: CatalogTableType): Boolean = {
val commonTableType = tableType match {
......@@ -45,11 +46,11 @@ class GetTables(
override protected def resultSchema: StructType = {
new StructType()
.add("TABLE_CAT", "string", nullable = true, "Catalog name. NULL if not applicable.")
.add("TABLE_SCHEM", "string", nullable = true, "Schema name.")
.add("TABLE_NAME", "string", nullable = true, "Table name.")
.add("TABLE_TYPE", "string", nullable = true, "The table type, e.g. \"TABLE\", \"VIEW\"")
.add("REMARKS", "string", nullable = true, "Comments about the table.")
.add(TABLE_CAT, "string", nullable = true, "Catalog name. NULL if not applicable.")
.add(TABLE_SCHEM, "string", nullable = true, "Schema name.")
.add(TABLE_NAME, "string", nullable = true, "Table name.")
.add(TABLE_TYPE, "string", nullable = true, "The table type, e.g. \"TABLE\", \"VIEW\"")
.add(REMARKS, "string", nullable = true, "Comments about the table.")
.add("TYPE_CAT", "string", nullable = true, "The types catalog." )
.add("TYPE_SCHEM", "string", nullable = true, "The types catalog")
.add("TYPE_NAME", "string", nullable = true, "Type name.")
......@@ -81,7 +82,8 @@ class GetTables(
val views = if (matched(CatalogTableType.VIEW)) {
val globalTempViewDb = catalog.globalTempViewManager.database
(if (Pattern.compile(schemaPattern).matcher(globalTempViewDb).matches()) {
(if (Pattern.compile(convertSchemaPattern(schema, datanucleusFormat = false))
.matcher(globalTempViewDb).matches()) {
catalog.listTables(globalTempViewDb, tablePattern, includeLocalTempViews = true)
} else {
catalog.listLocalTempViews(tablePattern)
......
......@@ -23,24 +23,25 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetTypeInfo(spark: SparkSession, session: Session)
extends SparkOperation(spark, OperationType.GET_TYPE_INFO, session) {
override protected def resultSchema: StructType = {
new StructType()
.add("TYPE_NAME", "string", nullable = false, "Type name")
.add("DATA_TYPE", "int", nullable = false, "SQL data type from java.sql.Types")
.add("PRECISION", "int", nullable = false, "Maximum precision")
.add(TYPE_NAME, "string", nullable = false, "Type name")
.add(DATA_TYPE, "int", nullable = false, "SQL data type from java.sql.Types")
.add(PRECISION, "int", nullable = false, "Maximum precision")
.add("LITERAL_PREFIX", "string", nullable = true, "Prefix used to quote a literal (may be" +
" null)")
.add("LITERAL_SUFFIX", "string", nullable = true, "Suffix used to quote a literal (may be" +
" null)")
.add("CREATE_PARAMS", "string", nullable = true, "Parameters used in creating the type (may" +
" be null)")
.add("NULLABLE", "smallint", nullable = false, "Can you use NULL for this type")
.add("CASE_SENSITIVE", "boolean", nullable = false, "Is it case sensitive")
.add("SEARCHABLE", "smallint", nullable = false, "Can you use \"WHERE\" based on this type")
.add(NULLABLE, "smallint", nullable = false, "Can you use NULL for this type")
.add(CASE_SENSITIVE, "boolean", nullable = false, "Is it case sensitive")
.add(SEARCHABLE, "smallint", nullable = false, "Can you use 'WHERE' based on this type")
.add("UNSIGNED_ATTRIBUTE", "boolean", nullable = false, "Is it unsigned")
.add("FIXED_PREC_SCALE", "boolean", nullable = false, "Can it be a money value")
.add("AUTO_INCREMENT", "boolean", nullable = false, "Can it be used for an auto-increment" +
......@@ -51,7 +52,7 @@ class GetTypeInfo(spark: SparkSession, session: Session)
.add("MAXIMUM_SCALE", "smallint", nullable = false, "Maximum scale supported")
.add("SQL_DATA_TYPE", "int", nullable = true, "Unused")
.add("SQL_DATETIME_SUB", "int", nullable = true, "Unused")
.add("NUM_PREC_RADIX", "int", nullable = false, "Usually 2 or 10")
.add(NUM_PREC_RADIX, "int", nullable = false, "Usually 2 or 10")
}
private def isNumericType(javaType: Int): Boolean = {
......@@ -59,16 +60,16 @@ class GetTypeInfo(spark: SparkSession, session: Session)
javaType == FLOAT || javaType == FLOAT || javaType == DOUBLE || javaType == DECIMAL
}
private def toRow(name: String, javaType: Int, precision: Option[Int] = None) = {
private def toRow(name: String, javaType: Int, precision: Integer = null): Row = {
Row(name, // TYPE_NAME
javaType, // DATA_TYPE
precision.orNull, // PRECISION
precision, // PRECISION
null, // LITERAL_PREFIX
null, // LITERAL_SUFFIX
null, // CREATE_PARAMS
1.toShort, // NULLABLE
javaType == VARCHAR, // CASE_SENSITIVE
if (javaType <= 2000) 3.toShort else 0.toShort, // SEARCHABLE
if (javaType < 1111) 3.toShort else 0.toShort, // SEARCHABLE
!isNumericType(javaType), // UNSIGNED_ATTRIBUTE
false, // FIXED_PREC_SCALE
false, // AUTO_INCREMENT
......@@ -84,20 +85,22 @@ class GetTypeInfo(spark: SparkSession, session: Session)
override protected def runInternal(): Unit = {
iter = Seq(
toRow("VOID", NULL),
toRow("BOOLEAN", BOOLEAN, null),
toRow("TINYINT", TINYINT, Some(3)),
toRow("SMALLINT", SMALLINT, Some(5)),
toRow("INTEGER", INTEGER, Some(10)),
toRow("BIGINT", BIGINT, Some(19)),
toRow("FLOAT", FLOAT, Some(7)),
toRow("DOUBLE", DOUBLE, Some(15)),
toRow("BOOLEAN", BOOLEAN),
toRow("TINYINT", TINYINT, 3),
toRow("SMALLINT", SMALLINT, 5),
toRow("INTEGER", INTEGER, 10),
toRow("BIGINT", BIGINT, 19),
toRow("FLOAT", FLOAT, 7),
toRow("DOUBLE", DOUBLE, 15),
toRow("VARCHAR", VARCHAR),
toRow("DECIMAL", DECIMAL, Some(38)),
toRow("BINARY", BINARY),
toRow("DECIMAL", DECIMAL, 38),
toRow("DATE", DATE),
toRow("TIMESTAMP", TIMESTAMP),
toRow("ARRAY", ARRAY),
toRow("MAP", JAVA_OBJECT),
toRow("STRUCT", STRUCT)
toRow("STRUCT", STRUCT),
toRow("INTERVAL", OTHER)
).toList.iterator
}
}
......@@ -65,11 +65,11 @@ abstract class SparkOperation(spark: SparkSession, opType: OperationType, sessio
* Convert wildcards and escape sequence of schema pattern from JDBC format to datanucleous/regex
* The schema pattern treats empty string also as wildcard
*/
protected def convertSchemaPattern(pattern: String): String = {
protected def convertSchemaPattern(pattern: String, datanucleusFormat: Boolean = true): String = {
if (StringUtils.isEmpty(pattern)) {
convertPattern("%", datanucleusFormat = true)
convertPattern("%", datanucleusFormat)
} else {
convertPattern(pattern, datanucleusFormat = true)
convertPattern(pattern, datanucleusFormat)
}
}
......
......@@ -36,7 +36,7 @@ object SchemaHelper {
case FloatType => TTypeId.FLOAT_TYPE
case DoubleType => TTypeId.DOUBLE_TYPE
case StringType => TTypeId.STRING_TYPE
case DecimalType() => TTypeId.DECIMAL_TYPE
case _: DecimalType => TTypeId.DECIMAL_TYPE
case DateType => TTypeId.DATE_TYPE
case TimestampType => TTypeId.TIMESTAMP_TYPE
case BinaryType => TTypeId.BINARY_TYPE
......@@ -85,4 +85,6 @@ object SchemaHelper {
}
tTableSchema
}
}
......@@ -22,12 +22,25 @@ import java.util.Locale
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hive.service.rpc.thrift.TCLIService
import org.apache.spark.sql.SparkSession
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TSocket
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.{KyuubiFunSuite, Utils}
import org.apache.kyuubi.service.authentication.PlainSASLHelper
trait WithSparkSQLEngine extends KyuubiFunSuite {
val warehousePath = Utils.createTempDir()
val metastorePath = Utils.createTempDir()
warehousePath.toFile.delete()
metastorePath.toFile.delete()
System.setProperty("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=$metastorePath;create=true")
System.setProperty("spark.sql.warehouse.dir", warehousePath.toString)
System.setProperty("spark.sql.hive.metastore.sharedPrefixes", "org.apache.hive.jdbc")
protected val spark: SparkSession = SparkSQLEngine.createSpark()
protected var engine: SparkSQLEngine = _
......@@ -92,4 +105,21 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
protected def withJdbcStatement(tableNames: String*)(f: Statement => Unit): Unit = {
withMultipleConnectionJdbcStatement(tableNames: _*)(f)
}
protected def withThriftClient(f: TCLIService.Iface => Unit): Unit = {
val hostAndPort = connectionUrl.split(":")
val host = hostAndPort.head
val port = hostAndPort(1).toInt
val socket = new TSocket(host, port)
val transport = PlainSASLHelper.getPlainTransport(Utils.currentUser, "anonymous", socket)
val protocol = new TBinaryProtocol(transport)
val client = new TCLIService.Client(protocol)
transport.open()
try {
f(client)
} finally {
socket.close()
}
}
}
......@@ -17,19 +17,25 @@
package org.apache.kyuubi.engine.spark.operation
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DecimalType, DoubleType, FloatType, IntegerType, MapType, NumericType, StringType, StructType, TimestampType}
import org.apache.hive.service.rpc.thrift.TOpenSessionReq
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.types._
import org.apache.kyuubi.Utils
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
class SparkOperationSuite extends WithSparkSQLEngine {
private val currentCatalog = spark.sessionState.catalogManager.currentCatalog
private val dftSchema = "default"
test("get catalogs") {
withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
val catalogs = metaData.getCatalogs
catalogs.next()
assert(catalogs.getString("TABLE_CAT") === currentCatalog.name())
assert(catalogs.getString(TABLE_CAT) === currentCatalog.name())
assert(!catalogs.next())
}
}
......@@ -43,14 +49,243 @@ class SparkOperationSuite extends WithSparkSQLEngine {
val expected =
Seq("db1", "db2", "default", spark.sharedState.globalTempViewManager.database).iterator
while(resultSet.next()) {
assert(resultSet.getString("TABLE_SCHEM") === expected.next)
assert(resultSet.getString("TABLE_CATALOG").isEmpty)
assert(resultSet.getString(TABLE_SCHEM) === expected.next)
assert(resultSet.getString(TABLE_CATALOG).isEmpty)
}
}
}
test("get tables") {
val table_test = "table_1_test"
val table_external_test = "table_2_test"
val view_test = "view_1_test"
val view_global_test = "view_2_test"
val tables = Seq(table_test, table_external_test, view_test, view_global_test)
val schemas = Seq("default", "default", "default", "global_temp")
val tableTypes = Seq("MANAGED", "EXTERNAL", "VIEW", "VIEW")
withJdbcStatement(view_test, view_global_test, table_test, view_test) { statement =>
statement.execute(
s"CREATE TABLE IF NOT EXISTS $table_test(key int) USING parquet COMMENT '$table_test'")
val loc = Utils.createTempDir()
statement.execute(s"CREATE EXTERNAL TABLE IF NOT EXISTS $table_external_test(key int)" +
s" COMMENT '$table_external_test' LOCATION '$loc'")
statement.execute(s"CREATE VIEW IF NOT EXISTS $view_test COMMENT '$view_test'" +
s" AS SELECT * FROM $table_test")
statement.execute(s"CREATE GLOBAL TEMP VIEW $view_global_test" +
s" COMMENT '$view_global_test' AS SELECT * FROM $table_test")
val metaData = statement.getConnection.getMetaData
val rs1 = metaData.getTables(null, null, null, null)
var i = 0
while(rs1.next()) {
assert(rs1.getString(TABLE_CAT).isEmpty)
assert(rs1.getString(TABLE_SCHEM) === schemas(i))
assert(rs1.getString(TABLE_NAME) == tables(i))
assert(rs1.getString(TABLE_TYPE) == tableTypes(i))
assert(rs1.getString(REMARKS) === tables(i).replace(view_global_test, ""))
i += 1
}
assert(i === 4)
val rs2 = metaData.getTables(null, null, null, Array("VIEW"))
i = 2
while(rs2.next()) {
assert(rs2.getString(TABLE_NAME) == tables(i))
i += 1
}
assert(i === 4)
val rs3 = metaData.getTables(null, null, "table%", Array("VIEW"))
assert(!rs3.next())
val rs4 = metaData.getTables(null, null, "table%", Array("TABLE"))
i = 0
while(rs4.next()) {
assert(rs4.getString(TABLE_NAME) == tables(i))
i += 1
}
assert(i === 2)
val rs5 = metaData.getTables(null, "default", "%", Array("VIEW"))
i = 2
while(rs5.next()) {
assert(rs5.getString(TABLE_NAME) == view_test)
}
}
}
test("get table types") {
withJdbcStatement() { statement =>
val meta = statement.getConnection.getMetaData
val types = meta.getTableTypes
val expected = CatalogTableType.tableTypes.toIterator
while (types.next()) {
assert(types.getString(TABLE_TYPE) === expected.next().name)
}
assert(!expected.hasNext)
assert(!types.next())
}
}
test("get type info") {
withJdbcStatement() { statement =>
val typeInfo = statement.getConnection.getMetaData.getTypeInfo
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "VOID")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.NULL)
assert(typeInfo.getInt(PRECISION) === 0)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(!typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 3)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 0)
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "BOOLEAN")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.BOOLEAN)
assert(typeInfo.getInt(PRECISION) === 0)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(!typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 3)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 0)
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "TINYINT")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.TINYINT)
assert(typeInfo.getInt(PRECISION) === 3)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(!typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 3)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 10)
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "SMALLINT")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.SMALLINT)
assert(typeInfo.getInt(PRECISION) === 5)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(!typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 3)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 10)
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "INTEGER")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.INTEGER)
assert(typeInfo.getInt(PRECISION) === 10)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(!typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 3)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 10)
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "BIGINT")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.BIGINT)
assert(typeInfo.getInt(PRECISION) === 19)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(!typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 3)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 10)
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "FLOAT")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.FLOAT)
assert(typeInfo.getInt(PRECISION) === 7)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(!typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 3)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 10)
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "DOUBLE")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.DOUBLE)
assert(typeInfo.getInt(PRECISION) === 15)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(!typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 3)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 10)
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "VARCHAR")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.VARCHAR)
assert(typeInfo.getInt(PRECISION) === 0)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 3)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 0)
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "BINARY")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.BINARY)
assert(typeInfo.getInt(PRECISION) === 0)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(!typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 3)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 0)
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "DECIMAL")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.DECIMAL)
assert(typeInfo.getInt(PRECISION) === 38)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(!typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 3)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 10)
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "DATE")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.DATE)
assert(typeInfo.getInt(PRECISION) === 0)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(!typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 3)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 0)
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "TIMESTAMP")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.TIMESTAMP)
assert(typeInfo.getInt(PRECISION) === 0)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(!typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 3)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 0)
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "ARRAY")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.ARRAY)
assert(typeInfo.getInt(PRECISION) === 0)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(!typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 0)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 0)
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "MAP")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.JAVA_OBJECT)
assert(typeInfo.getInt(PRECISION) === 0)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(!typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 0)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 0)
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "STRUCT")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.STRUCT)
assert(typeInfo.getInt(PRECISION) === 0)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(!typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 0)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 0)
typeInfo.next()
assert(typeInfo.getString(TYPE_NAME) === "INTERVAL")
assert(typeInfo.getInt(DATA_TYPE) === java.sql.Types.OTHER)
assert(typeInfo.getInt(PRECISION) === 0)
assert(typeInfo.getShort(NULLABLE) === 1)
assert(!typeInfo.getBoolean(CASE_SENSITIVE))
assert(typeInfo.getShort(SEARCHABLE) === 0)
assert(typeInfo.getInt(NUM_PREC_RADIX) === 0)
}
}
test("get columns operation") {
val schemaName = "default"
val tableName = "spark_get_col_operation"
val schema = new StructType()
.add("c0", "boolean", nullable = false, "0")
......@@ -73,7 +308,7 @@ class SparkOperationSuite extends WithSparkSQLEngine {
val ddl =
s"""
|CREATE TABLE IF NOT EXISTS $schemaName.$tableName (
|CREATE TABLE IF NOT EXISTS $dftSchema.$tableName (
| ${schema.toDDL}
|)
|using parquet""".stripMargin
......@@ -82,7 +317,7 @@ class SparkOperationSuite extends WithSparkSQLEngine {
statement.execute(ddl)
val databaseMetaData = statement.getConnection.getMetaData
val rowSet = databaseMetaData.getColumns("", schemaName, tableName, null)
val rowSet = databaseMetaData.getColumns("", dftSchema, tableName, null)
import java.sql.Types._
val expectedJavaTypes = Seq(BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE,
......@@ -91,21 +326,21 @@ class SparkOperationSuite extends WithSparkSQLEngine {
var pos = 0
while (rowSet.next()) {
assert(rowSet.getString("TABLE_CAT") === null)
assert(rowSet.getString("TABLE_SCHEM") === schemaName)
assert(rowSet.getString("TABLE_NAME") === tableName)
assert(rowSet.getString("COLUMN_NAME") === schema(pos).name)
assert(rowSet.getInt("DATA_TYPE") === expectedJavaTypes(pos))
assert(rowSet.getString("TYPE_NAME") === schema(pos).dataType.sql)
val colSize = rowSet.getInt("COLUMN_SIZE")
assert(rowSet.getString(TABLE_CAT) === null)
assert(rowSet.getString(TABLE_SCHEM) === dftSchema)
assert(rowSet.getString(TABLE_NAME) === tableName)
assert(rowSet.getString(COLUMN_NAME) === schema(pos).name)
assert(rowSet.getInt(DATA_TYPE) === expectedJavaTypes(pos))
assert(rowSet.getString(TYPE_NAME) === schema(pos).dataType.sql)
val colSize = rowSet.getInt(COLUMN_SIZE)
schema(pos).dataType match {
case StringType | BinaryType | _: ArrayType | _: MapType => assert(colSize === 0)
case o => assert(colSize === o.defaultSize)
}
assert(rowSet.getInt("BUFFER_LENGTH") === 0) // not used
val decimalDigits = rowSet.getInt("DECIMAL_DIGITS")
assert(rowSet.getInt(BUFFER_LENGTH) === 0) // not used
val decimalDigits = rowSet.getInt(DECIMAL_DIGITS)
schema(pos).dataType match {
case BooleanType | _: IntegerType => assert(decimalDigits === 0)
case d: DecimalType => assert(decimalDigits === d.scale)
......@@ -115,17 +350,17 @@ class SparkOperationSuite extends WithSparkSQLEngine {
case _ => assert(decimalDigits === 0) // nulls
}
val radix = rowSet.getInt("NUM_PREC_RADIX")
val radix = rowSet.getInt(NUM_PREC_RADIX)
schema(pos).dataType match {
case _: NumericType => assert(radix === 10)
case _ => assert(radix === 0) // nulls
}
assert(rowSet.getInt("NULLABLE") === 1)
assert(rowSet.getString("REMARKS") === pos.toString)
assert(rowSet.getInt("ORDINAL_POSITION") === pos)
assert(rowSet.getString("IS_NULLABLE") === "YES")
assert(rowSet.getString("IS_AUTO_INCREMENT") === "NO")
assert(rowSet.getInt(NULLABLE) === 1)
assert(rowSet.getString(REMARKS) === pos.toString)
assert(rowSet.getInt(ORDINAL_POSITION) === pos)
assert(rowSet.getString(IS_NULLABLE) === "YES")
assert(rowSet.getString(IS_AUTO_INCREMENT) === "NO")
pos += 1
}
......@@ -133,4 +368,35 @@ class SparkOperationSuite extends WithSparkSQLEngine {
}
}
test("get functions") {
withJdbcStatement() { statement =>
val metaData = statement.getConnection.getMetaData
Seq("to_timestamp", "date_part", "lpad", "date_format", "cos", "sin").foreach { func =>
val resultSet = metaData.getFunctions("", dftSchema, func)
while (resultSet.next()) {
val exprInfo = FunctionRegistry.expressions(func)._1
assert(resultSet.getString(FUNCTION_CAT).isEmpty)
assert(resultSet.getString(FUNCTION_SCHEM) === null)
assert(resultSet.getString(FUNCTION_NAME) === exprInfo.getName)
assert(resultSet.getString(REMARKS) ===
s"Usage: ${exprInfo.getUsage}\nExtended Usage:${exprInfo.getExtended}")
assert(resultSet.getString(SPECIFIC_NAME) === exprInfo.getClassName)
}
}
}
}
test("get functions operation") {
withThriftClient { client =>
val req = new TOpenSessionReq()
req.setUsername("kentyao")
req.setPassword("anonymous")
// req.setClient_protocol(BackendService.SERVER_VERSION)
req
val resp = client.OpenSession(req)
val sessionHandle = resp.getSessionHandle
}
}
}
......@@ -73,6 +73,12 @@
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
......@@ -116,10 +122,6 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
......@@ -140,10 +142,6 @@
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
</exclusion>
<exclusion>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
......@@ -156,14 +154,6 @@
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
</exclusion>
<exclusion>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</exclusion>
<exclusion>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
......
......@@ -97,6 +97,15 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
case (k, v) => (k.substring(dropped.length), v)
}
}
/** Copy this object */
override def clone: KyuubiConf = {
val cloned = KyuubiConf(false)
settings.entrySet().asScala.foreach { e =>
cloned.set(e.getKey, e.getValue)
}
cloned
}
}
object KyuubiConf {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.operation.meta
object ResultSetSchemaConstant {
/**
* String type.
* Catalog name. NULL if not applicable
*/
final val TABLE_CAT = "TABLE_CAT"
final val TABLE_CATALOG = "TABLE_CATALOG"
/**
* String.
* Schema name
*/
final val TABLE_SCHEM = "TABLE_SCHEM"
/**
* String.
* Table Name
*/
final val TABLE_NAME = "TABLE_NAME"
final val TABLE_TYPE = "TABLE_TYPE"
final val REMARKS = "REMARKS"
final val COLUMN_NAME = "COLUMN_NAME"
final val DATA_TYPE = "DATA_TYPE"
final val TYPE_NAME = "TYPE_NAME"
final val COLUMN_SIZE = "COLUMN_SIZE"
final val BUFFER_LENGTH = "BUFFER_LENGTH"
final val DECIMAL_DIGITS = "DECIMAL_DIGITS"
final val NUM_PREC_RADIX = "NUM_PREC_RADIX"
/**
* Short
* Can you use NULL for this type?
*/
final val NULLABLE = "NULLABLE"
final val COLUMN_DEF = "COLUMN_DEF"
final val SQL_DATA_TYPE = "SQL_DATA_TYPE"
final val SQL_DATETIME_SUB = "SQL_DATETIME_SUB"
final val CHAR_OCTET_LENGTH = "CHAR_OCTET_LENGTH"
final val ORDINAL_POSITION = "ORDINAL_POSITION"
final val IS_NULLABLE = "IS_NULLABLE"
final val SCOPE_CATALOG = "SCOPE_CATALOG"
final val SCOPE_SCHEMA = "SCOPE_SCHEMA"
final val SCOPE_TABLE = "SCOPE_TABLE"
final val SOURCE_DATA_TYPE = "SOURCE_DATA_TYPE"
final val IS_AUTO_INCREMENT = "IS_AUTO_INCREMENT"
/**
* Maximum precision
*/
final val PRECISION = "PRECISION"
/**
* Boolean
* Is it case sensitive?
*/
final val CASE_SENSITIVE = "CASE_SENSITIVE"
/**
* Short
* Can you use 'WHERE' based on this type?
*/
final val SEARCHABLE = "SEARCHABLE"
final val FUNCTION_CAT = "FUNCTION_CAT"
final val FUNCTION_SCHEM = "FUNCTION_SCHEM"
final val FUNCTION_NAME = "FUNCTION_NAME"
final val FUNCTION_TYPE = "FUNCTION_TYPE"
final val SPECIFIC_NAME = "SPECIFIC_NAME"
}
......@@ -17,8 +17,6 @@
package org.apache.kyuubi.service
import scala.collection.JavaConverters._
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion, TRowSet, TTableSchema}
import org.apache.kyuubi.config.KyuubiConf
......@@ -37,8 +35,8 @@ abstract class AbstractBackendService(name: String)
user: String,
password: String,
ipAddr: String,
configs: java.util.Map[String, String]): SessionHandle = {
sessionManager.openSession(protocol, user, password, ipAddr, configs.asScala.toMap)
configs: Map[String, String]): SessionHandle = {
sessionManager.openSession(protocol, user, password, ipAddr, configs)
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
......
......@@ -41,7 +41,7 @@ trait BackendService {
user: String,
password: String,
ipAddr: String,
configs: java.util.Map[String, String]): SessionHandle
configs: Map[String, String]): SessionHandle
def closeSession(sessionHandle: SessionHandle): Unit
def getInfo(sessionHandle: SessionHandle, infoType: TGetInfoType): TGetInfoValue
......
......@@ -20,6 +20,8 @@ package org.apache.kyuubi.service
import java.net.{InetAddress, ServerSocket}
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hive.service.rpc.thrift._
import org.apache.thrift.protocol.{TBinaryProtocol, TProtocol}
......@@ -161,8 +163,10 @@ class FrontendService private (name: String, be: BackendService, oomHook: Runnab
val userName = getUserName(req)
val ipAddress = authFactory.getIpAddress.orNull
val protocol = getMinVersion(BackendService.SERVER_VERSION, req.getClient_protocol)
val configuration =
Option(req.getConfiguration).map(_.asScala.toMap).getOrElse(Map.empty[String, String])
val sessionHandle = be.openSession(
protocol, userName, req.getPassword, ipAddress, req.getConfiguration)
protocol, userName, req.getPassword, ipAddress, configuration)
res.setServerProtocolVersion(protocol)
sessionHandle
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.service
import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kyuubi.config.KyuubiConf
abstract class SeverLike(name: String) extends CompositeService(name) {
private val OOMHook = new Runnable { override def run(): Unit = stop() }
private val started = new AtomicBoolean(false)
protected val backendService: AbstractBackendService
private lazy val frontendService = new FrontendService(backendService, OOMHook)
def connectionUrl: String = frontendService.connectionUrl
override def initialize(conf: KyuubiConf): Unit = {
addService(backendService)
addService(frontendService)
super.initialize(conf)
}
override def start(): Unit = {
super.start()
started.set(true)
}
protected def stopServer(): Unit
override def stop(): Unit = {
try {
stopServer()
} catch {
case t: Throwable =>
warn(s"Error stopping spark ${t.getMessage}", t)
} finally {
if (started.getAndSet(false)) {
super.stop()
}
}
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment