Unverified commit e54d4310, authored by Paul Lin, committed by Kent Yao

[KYUUBI #1883] Support max result rows for Flink queries

### _Why are the changes needed?_

Currently, the Flink engine pulls all result rows into memory before returning them to the client. This is problematic for large or infinite result sets.

This is a sub-task of KPIP-2 https://github.com/apache/incubator-kyuubi/issues/1322.
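The approach taken by this patch can be sketched as follows. This is a simplified, self-contained illustration, not the engine code itself: `fetchPage` is a hypothetical stand-in for Flink's paged result retrieval, and end-of-stream handling is elided.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified sketch of the bounded fetch loop introduced by this patch.
// `fetchPage` stands in for the Flink executor's retrieveResultPage; here it
// serves pages from a hypothetical unbounded row source.
object BoundedFetchSketch {
  def collectCapped(fetchPage: Int => Seq[Long], resultMaxRows: Int): Seq[Long] = {
    val rows = ArrayBuffer.empty[Long]
    var page = 1
    var loop = true
    while (loop) {
      if (rows.size < resultMaxRows) {
        // still under the cap: pull the next page into the buffer
        rows ++= fetchPage(page)
        page += 1
      } else {
        // cap reached: stop fetching (a streaming query would be canceled here)
        loop = false
      }
    }
    // pages are fetched whole, so trim any overshoot from the last page
    rows.take(resultMaxRows).toSeq
  }

  def main(args: Array[String]): Unit = {
    val pageSize = 500
    def fetchPage(page: Int): Seq[Long] =
      ((page - 1) * pageSize until page * pageSize).map(_.toLong)
    println(BoundedFetchSketch.collectCapped(fetchPage, 1200).size) // prints 1200
  }
}
```

Because pages are appended whole, the buffer can briefly exceed the cap by up to one page; the final `take` trims it back, mirroring the `rows.slice(0, resultMaxRows)` call in the actual change.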

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1938 from link3280/feature/FLINK-1883.

Closes #1883

80020cee [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
1b958221 [Paul Lin] [KYUUBI #1883] Avoid allocating too much buffer space
5be7535c [Paul Lin] [KYUUBI #1883] Support max result rows for Flink queries
Authored-by: Paul Lin <paullin3280@gmail.com>
Signed-off-by: Kent Yao <yao@apache.org>
Showing with 43 additions and 7 deletions
@@ -318,6 +318,7 @@ Key | Default | Meaning | Type | Since
 <code>kyuubi.session.conf.restrict.list</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of restricted keys. If the client connection contains any of them, the connection will be rejected explicitly during engine bootstrap and connection setup. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering but will not forbid users to set dynamic configurations via SET syntax.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.2.0</div>
 <code>kyuubi.session.engine.check.interval</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT1M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The check interval for engine timeout</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
 <code>kyuubi.session.engine.flink.main.resource</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The package used to create Flink SQL engine remote job. If it is undefined, Kyuubi will use the default</div>|<div style='width: 30pt'>string</div>|<div style='width: 20pt'>1.4.0</div>
+<code>kyuubi.session.engine.flink.max.rows</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>1000000</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Max rows of Flink query results. For batch queries, rows that exceed the limit would be ignored. For streaming queries, the query would be canceled if the limit is reached.</div>|<div style='width: 30pt'>int</div>|<div style='width: 20pt'>1.5.0</div>
 <code>kyuubi.session.engine.idle.timeout</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT30M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>engine timeout, the engine will self-terminate when it's not accessed for this duration. 0 or negative means not to self-terminate.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
 <code>kyuubi.session.engine.initialize.timeout</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT3M</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>Timeout for starting the background engine, e.g. SparkSQLEngine.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.0.0</div>
 <code>kyuubi.session.engine.launch.async</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>When opening kyuubi session, whether to launch backend engine asynchronously. When true, the Kyuubi server will set up the connection with the client without delay as the backend engine will be created asynchronously.</div>|<div style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.4.0</div>
......
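For reference, the new cap can be tuned server-wide or per session. A hypothetical `kyuubi-defaults.conf` fragment (the key and default come from the table row added above):

```properties
# Cap Flink query results at 200 rows instead of the 1000000 default.
# Batch queries drop rows beyond the cap; streaming queries are canceled
# once the cap is reached.
kyuubi.session.engine.flink.max.rows=200
```

Since the engine reads the value from the session's normalized conf (see `FlinkSQLOperationManager` below), a client-supplied session setting of the same key overrides this server default.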
@@ -41,7 +41,8 @@ class ExecuteStatement(
     session: Session,
     override val statement: String,
     override val shouldRunAsync: Boolean,
-    queryTimeout: Long)
+    queryTimeout: Long,
+    resultMaxRows: Int)
   extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) with Logging {

   private val operationLog: OperationLog =
@@ -132,12 +133,16 @@ class ExecuteStatement(
       while (loop) {
         Thread.sleep(50) // slow the processing down
-        val result = executor.snapshotResult(sessionId, resultId, 2)
+        val pageSize = Math.min(500, resultMaxRows)
+        val result = executor.snapshotResult(sessionId, resultId, pageSize)
         result.getType match {
           case TypedResult.ResultType.PAYLOAD =>
-            rows.clear()
             (1 to result.getPayload).foreach { page =>
-              rows ++= executor.retrieveResultPage(resultId, page).asScala
+              if (rows.size < resultMaxRows) {
+                rows ++= executor.retrieveResultPage(resultId, page).asScala
+              } else {
+                loop = false
+              }
             }
           case TypedResult.ResultType.EOS => loop = false
           case TypedResult.ResultType.EMPTY =>
@@ -147,7 +152,7 @@ class ExecuteStatement(
       resultSet = ResultSet.builder
         .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
         .columns(resultDescriptor.getResultSchema.getColumns)
-        .data(rows.toArray[Row])
+        .data(rows.slice(0, resultMaxRows).toArray[Row])
         .build
     } finally {
       if (resultId != null) {
......
@@ -33,6 +33,8 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
   private lazy val operationModeDefault = getConf.get(OPERATION_PLAN_ONLY)

+  private lazy val resultMaxRowsDefault = getConf.get(ENGINE_FLINK_MAX_ROWS)
+
   override def newExecuteStatementOperation(
       session: Session,
       statement: String,
@@ -43,9 +45,13 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
     val mode = flinkSession.sessionContext.getConfigMap.getOrDefault(
       OPERATION_PLAN_ONLY.key,
       operationModeDefault)
+    val resultMaxRows =
+      flinkSession.normalizedConf.getOrElse(
+        ENGINE_FLINK_MAX_ROWS.key,
+        resultMaxRowsDefault.toString).toInt
     val op = OperationModes.withName(mode.toUpperCase(Locale.ROOT)) match {
       case NONE =>
-        new ExecuteStatement(session, statement, runAsync, queryTimeout)
+        new ExecuteStatement(session, statement, runAsync, queryTimeout, resultMaxRows)
       case mode =>
         new PlanOnlyStatement(session, statement, mode)
     }
......
@@ -761,4 +761,18 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
         .getStringVal.getValues.get(0) === "tmp.hello")
     }
   }
+
+  test("ensure result max rows") {
+    withSessionConf()(Map(KyuubiConf.ENGINE_FLINK_MAX_ROWS.key -> "200"))(Map.empty) {
+      withJdbcStatement() { statement =>
+        statement.execute("create table tbl_src (a bigint) with ('connector' = 'datagen')")
+        val resultSet = statement.executeQuery(s"select a from tbl_src")
+        var rows = 0
+        while (resultSet.next()) {
+          rows += 1
+        }
+        assert(rows === 200)
+      }
+    }
+  }
 }
@@ -116,7 +116,8 @@ class LegacyFlinkOperationSuite extends KyuubiFunSuite {
   }

   test("execute statement - select column name with dots") {
-    val executeStatementOp = new ExecuteStatement(flinkSession, "select 'tmp.hello'", false, -1)
+    val executeStatementOp =
+      new ExecuteStatement(flinkSession, "select 'tmp.hello'", false, -1, 500)
     val executor = createLocalExecutor
     executor.openSession("test-session")
     executeStatementOp.setExecutor(executor)
......
@@ -594,6 +594,15 @@ object KyuubiConf {
       .stringConf
       .createOptional

+  val ENGINE_FLINK_MAX_ROWS: ConfigEntry[Int] =
+    buildConf("session.engine.flink.max.rows")
+      .doc("Max rows of Flink query results. For batch queries, rows that exceed the limit " +
+        "would be ignored. For streaming queries, the query would be canceled if the limit " +
+        "is reached.")
+      .version("1.5.0")
+      .intConf
+      .createWithDefault(1000000)
+
   val ENGINE_TRINO_MAIN_RESOURCE: OptionalConfigEntry[String] =
     buildConf("session.engine.trino.main.resource")
       .doc("The package used to create Trino engine remote job. If it is undefined," +
......