From f1eb449c42b820b8ad3f4d388bb6320acf70bb1b Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 17 Mar 2023 13:39:30 +0800 Subject: [PATCH] [KYUUBI #4544] Initial implement Kyuubi Chat Engine MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### _Why are the changes needed?_ Introduce a brand new CHAT engine, it's supposed to support different backends, e.g. ChatGPT, 文心一言, etc. This PR implements the following providers: - ECHO, simply replies a welcome message. - GPT: a.k.a ChatGPT, powered by OpenAI, which requires a API key for authentication. https://platform.openai.com/account/api-keys Add the following configurations in `kyuubi-defaults.conf` ``` kyuubi.engine.chat.provider=[ECHO|GPT] kyuubi.engine.chat.gpt.apiKey= ``` Open an ECHO beeline chat engine. ``` beeline -u 'jdbc:hive2://localhost:10009/?kyuubi.engine.type=CHAT;kyuubi.engine.chat.provider=ECHO' ``` ``` Connecting to jdbc:hive2://localhost:10009/ Connected to: Kyuubi Chat Engine (version 1.8.0-SNAPSHOT) Driver: Kyuubi Project Hive JDBC Client (version 1.7.0) Beeline version 1.7.0 by Apache Kyuubi 0: jdbc:hive2://localhost:10009/> Hello, Kyuubi!; +----------------------------------------+ | reply | +----------------------------------------+ | This is ChatKyuubi, nice to meet you! | +----------------------------------------+ 1 row selected (0.397 seconds) ``` Open a ChatGPT beeline chat engine. (make sure your network can connect the open API and configure the API key) ``` beeline -u 'jdbc:hive2://localhost:10009/?kyuubi.engine.type=CHAT;kyuubi.engine.chat.provider=GPT' ``` image ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [x] Add screenshots for manual tests if appropriate - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request Closes #4544 from pan3793/chatgpt. Closes #4544 87bdebb6d [Cheng Pan] nit f7dee18f3 [Cheng Pan] Update docs 9beb55162 [cxzl25] chat api (#1) af38bdc7c [Cheng Pan] update docs 9aa6d83a6 [Cheng Pan] Initial implement Kyuubi Chat Engine Lead-authored-by: Cheng Pan Co-authored-by: cxzl25 Signed-off-by: Cheng Pan --- build/dist | 13 ++ docs/deployment/settings.md | 113 +++++++-------- externals/kyuubi-chat-engine/pom.xml | 89 ++++++++++++ .../engine/chat/ChatBackendService.scala | 28 ++++ .../kyuubi/engine/chat/ChatEngine.scala | 86 ++++++++++++ .../chat/ChatTBinaryFrontendService.scala | 34 +++++ .../engine/chat/operation/ChatOperation.scala | 100 ++++++++++++++ .../chat/operation/ChatOperationManager.scala | 130 ++++++++++++++++++ .../chat/operation/ExecuteStatement.scala | 66 +++++++++ .../chat/provider/ChatGPTProvider.scala | 84 +++++++++++ .../engine/chat/provider/ChatProvider.scala | 59 ++++++++ .../engine/chat/provider/EchoProvider.scala | 28 ++++ .../kyuubi/engine/chat/provider/Message.scala | 20 +++ .../kyuubi/engine/chat/schema/RowSet.scala | 108 +++++++++++++++ .../engine/chat/schema/SchemaHelper.scala | 56 ++++++++ .../engine/chat/session/ChatSessionImpl.scala | 68 +++++++++ .../chat/session/ChatSessionManager.scala | 71 ++++++++++ .../src/test/resources/log4j2-test.xml | 42 ++++++ .../kyuubi/engine/chat/WithChatEngine.scala | 60 ++++++++ .../chat/operation/ChatOperationSuite.scala | 40 ++++++ .../org/apache/kyuubi/config/KyuubiConf.scala | 46 +++++++ .../org/apache/kyuubi/engine/EngineType.scala | 2 +- .../org/apache/kyuubi/engine/EngineRef.scala | 5 +- .../engine/chat/ChatProcessBuilder.scala | 115 ++++++++++++++++ pom.xml | 3 +- 25 files changed, 1409 insertions(+), 57 deletions(-) create mode 100644 externals/kyuubi-chat-engine/pom.xml create mode 100644 externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/ChatBackendService.scala create mode 100644 externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/ChatEngine.scala create mode 100644 externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/ChatTBinaryFrontendService.scala create mode 100644 externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala create mode 100644 externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperationManager.scala create mode 100644 externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ExecuteStatement.scala create mode 100644 externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/provider/ChatGPTProvider.scala create mode 100644 externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/provider/ChatProvider.scala create mode 100644 externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/provider/EchoProvider.scala create mode 100644 externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/provider/Message.scala create mode 100644 externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/schema/RowSet.scala create mode 100644 externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/schema/SchemaHelper.scala create mode 100644 externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/session/ChatSessionImpl.scala create mode 100644 externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/session/ChatSessionManager.scala create mode 100644 externals/kyuubi-chat-engine/src/test/resources/log4j2-test.xml create mode 100644 externals/kyuubi-chat-engine/src/test/scala/org/apache/kyuubi/engine/chat/WithChatEngine.scala create mode 100644 externals/kyuubi-chat-engine/src/test/scala/org/apache/kyuubi/engine/chat/operation/ChatOperationSuite.scala create mode 100644 kyuubi-server/src/main/scala/org/apache/kyuubi/engine/chat/ChatProcessBuilder.scala diff --git a/build/dist b/build/dist index c155dce3ff4..e0dae3479b8 100755 --- a/build/dist +++ b/build/dist @@ -256,6 +256,7 @@ mkdir -p "$DISTDIR/externals/engines/spark" mkdir -p "$DISTDIR/externals/engines/trino" mkdir -p "$DISTDIR/externals/engines/hive" mkdir -p "$DISTDIR/externals/engines/jdbc" +mkdir -p "$DISTDIR/externals/engines/chat" echo "Kyuubi $VERSION $GITREVSTRING built for" > "$DISTDIR/RELEASE" echo "Java $JAVA_VERSION" >> "$DISTDIR/RELEASE" echo "Scala $SCALA_VERSION" >> "$DISTDIR/RELEASE" @@ -313,6 +314,18 @@ for jar in $(ls "$DISTDIR/jars/"); do fi done +# Copy chat engines +cp "$KYUUBI_HOME/externals/kyuubi-chat-engine/target/kyuubi-chat-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/chat/" +cp -r "$KYUUBI_HOME"/externals/kyuubi-chat-engine/target/scala-$SCALA_VERSION/jars/*.jar "$DISTDIR/externals/engines/chat/" + +# Share the jars w/ server to reduce binary size +# shellcheck disable=SC2045 +for jar in $(ls "$DISTDIR/jars/"); do + if [[ -f "$DISTDIR/externals/engines/chat/$jar" ]]; then + (cd $DISTDIR/externals/engines/chat; ln -snf "../../../jars/$jar" "$DISTDIR/externals/engines/chat/$jar") + fi +done + # Copy kyuubi tools if [[ -f "$KYUUBI_HOME/tools/spark-block-cleaner/target/spark-block-cleaner_${SCALA_VERSION}-${VERSION}.jar" ]]; then mkdir -p "$DISTDIR/tools/spark-block-cleaner/kubernetes" diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md index a2474a50603..9c5f7671209 100644 --- a/docs/deployment/settings.md +++ b/docs/deployment/settings.md @@ -118,60 +118,65 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co ### Engine -| Key | Default | Meaning | Type | Since | -|----------------------------------------------------------|---------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------| -| kyuubi.engine.connection.url.use.hostname | true | (deprecated) When true, the engine registers with hostname to zookeeper. When Spark runs on K8s with cluster mode, set to false to ensure that server can connect to engine | boolean | 1.3.0 | -| kyuubi.engine.deregister.exception.classes || A comma-separated list of exception classes. If there is any exception thrown, whose class matches the specified classes, the engine would deregister itself. | seq | 1.2.0 | -| kyuubi.engine.deregister.exception.messages || A comma-separated list of exception messages. If there is any exception thrown, whose message or stacktrace matches the specified message list, the engine would deregister itself. | seq | 1.2.0 | -| kyuubi.engine.deregister.exception.ttl | PT30M | Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself and wait for self-terminated. Otherwise, we suppose that the engine has recovered from temporary failures. | duration | 1.2.0 | -| kyuubi.engine.deregister.job.max.failures | 4 | Number of failures of job before deregistering the engine. | int | 1.2.0 | -| kyuubi.engine.event.json.log.path | file:///tmp/kyuubi/events | The location where all the engine events go for the built-in JSON logger.
  • Local Path: start with 'file://'
  • HDFS Path: start with 'hdfs://'
| string | 1.3.0 | -| kyuubi.engine.event.loggers | SPARK | A comma-separated list of engine history loggers, where engine/session/operation etc events go.
  • SPARK: the events will be written to the Spark listener bus.
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: User-defined event handlers.
Note that: Kyuubi supports custom event handlers with the Java SPI. To register a custom event handler, the user needs to implement a subclass of `org.apache.kyuubi.events.handler.CustomEventHandlerProvider` which has a zero-arg constructor. | seq | 1.3.0 | -| kyuubi.engine.flink.extra.classpath | <undefined> | The extra classpath for the Flink SQL engine, for configuring the location of hadoop client jars, etc | string | 1.6.0 | -| kyuubi.engine.flink.java.options | <undefined> | The extra Java options for the Flink SQL engine | string | 1.6.0 | -| kyuubi.engine.flink.memory | 1g | The heap memory for the Flink SQL engine | string | 1.6.0 | -| kyuubi.engine.hive.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go.
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: to be done.
| seq | 1.7.0 | -| kyuubi.engine.hive.extra.classpath | <undefined> | The extra classpath for the Hive query engine, for configuring location of the hadoop client jars and etc. | string | 1.6.0 | -| kyuubi.engine.hive.java.options | <undefined> | The extra Java options for the Hive query engine | string | 1.6.0 | -| kyuubi.engine.hive.memory | 1g | The heap memory for the Hive query engine | string | 1.6.0 | -| kyuubi.engine.initialize.sql | SHOW DATABASES | SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. i.e. use `SHOW DATABASES` to eagerly active HiveClient. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver. | seq | 1.2.0 | -| kyuubi.engine.jdbc.connection.password | <undefined> | The password is used for connecting to server | string | 1.6.0 | -| kyuubi.engine.jdbc.connection.properties || The additional properties are used for connecting to server | seq | 1.6.0 | -| kyuubi.engine.jdbc.connection.provider | <undefined> | The connection provider is used for getting a connection from the server | string | 1.6.0 | -| kyuubi.engine.jdbc.connection.url | <undefined> | The server url that engine will connect to | string | 1.6.0 | -| kyuubi.engine.jdbc.connection.user | <undefined> | The user is used for connecting to server | string | 1.6.0 | -| kyuubi.engine.jdbc.driver.class | <undefined> | The driver class for JDBC engine connection | string | 1.6.0 | -| kyuubi.engine.jdbc.extra.classpath | <undefined> | The extra classpath for the JDBC query engine, for configuring the location of the JDBC driver and etc. | string | 1.6.0 | -| kyuubi.engine.jdbc.java.options | <undefined> | The extra Java options for the JDBC query engine | string | 1.6.0 | -| kyuubi.engine.jdbc.memory | 1g | The heap memory for the JDBC query engine | string | 1.6.0 | -| kyuubi.engine.jdbc.type | <undefined> | The short name of JDBC type | string | 1.6.0 | -| kyuubi.engine.operation.convert.catalog.database.enabled | true | When set to true, The engine converts the JDBC methods of set/get Catalog and set/get Schema to the implementation of different engines | boolean | 1.6.0 | -| kyuubi.engine.operation.log.dir.root | engine_operation_logs | Root directory for query operation log at engine-side. | string | 1.4.0 | -| kyuubi.engine.pool.name | engine-pool | The name of the engine pool. | string | 1.5.0 | -| kyuubi.engine.pool.selectPolicy | RANDOM | The select policy of an engine from the corresponding engine pool engine for a session.
  • RANDOM - Randomly use the engine in the pool
  • POLLING - Polling use the engine in the pool
| string | 1.7.0 | -| kyuubi.engine.pool.size | -1 | The size of the engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold). | int | 1.4.0 | -| kyuubi.engine.pool.size.threshold | 9 | This parameter is introduced as a server-side parameter controlling the upper limit of the engine pool. | int | 1.4.0 | -| kyuubi.engine.session.initialize.sql || SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver. | seq | 1.3.0 | -| kyuubi.engine.share.level | USER | Engines will be shared in different levels, available configs are:
  • CONNECTION: engine will not be shared but only used by the current client connection
  • USER: engine will be shared by all sessions created by a unique username, see also kyuubi.engine.share.level.subdomain
  • GROUP: the engine will be shared by all sessions created by all users belong to the same primary group name. The engine will be launched by the group name as the effective username, so here the group name is in value of special user who is able to visit the computing resources/data of the team. It follows the [Hadoop GroupsMapping](https://reurl.cc/xE61Y5) to map user to a primary group. If the primary group is not found, it fallback to the USER level.
  • SERVER: the App will be shared by Kyuubi servers
| string | 1.2.0 | -| kyuubi.engine.share.level.sub.domain | <undefined> | (deprecated) - Using kyuubi.engine.share.level.subdomain instead | string | 1.2.0 | -| kyuubi.engine.share.level.subdomain | <undefined> | Allow end-users to create a subdomain for the share level of an engine. A subdomain is a case-insensitive string values that must be a valid zookeeper subpath. For example, for the `USER` share level, an end-user can share a certain engine within a subdomain, not for all of its clients. End-users are free to create multiple engines in the `USER` share level. When disable engine pool, use 'default' if absent. | string | 1.4.0 | -| kyuubi.engine.single.spark.session | false | When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database. | boolean | 1.3.0 | -| kyuubi.engine.spark.event.loggers | SPARK | A comma-separated list of engine loggers, where engine/session/operation etc events go.
  • SPARK: the events will be written to the Spark listener bus.
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: to be done.
| seq | 1.7.0 | -| kyuubi.engine.spark.python.env.archive | <undefined> | Portable Python env archive used for Spark engine Python language mode. | string | 1.7.0 | -| kyuubi.engine.spark.python.env.archive.exec.path | bin/python | The Python exec path under the Python env archive. | string | 1.7.0 | -| kyuubi.engine.spark.python.home.archive | <undefined> | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode. | string | 1.7.0 | -| kyuubi.engine.submit.timeout | PT30S | Period to tolerant Driver Pod ephemerally invisible after submitting. In some Resource Managers, e.g. K8s, the Driver Pod is not invisible immediately after `spark-submit` is returned. | duration | 1.7.1 | -| kyuubi.engine.trino.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go.
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: to be done.
| seq | 1.7.0 | -| kyuubi.engine.trino.extra.classpath | <undefined> | The extra classpath for the Trino query engine, for configuring other libs which may need by the Trino engine | string | 1.6.0 | -| kyuubi.engine.trino.java.options | <undefined> | The extra Java options for the Trino query engine | string | 1.6.0 | -| kyuubi.engine.trino.memory | 1g | The heap memory for the Trino query engine | string | 1.6.0 | -| kyuubi.engine.type | SPARK_SQL | Specify the detailed engine supported by Kyuubi. The engine type bindings to SESSION scope. This configuration is experimental. Currently, available configs are:
  • SPARK_SQL: specify this engine type will launch a Spark engine which can provide all the capacity of the Apache Spark. Note, it's a default engine type.
  • FLINK_SQL: specify this engine type will launch a Flink engine which can provide all the capacity of the Apache Flink.
  • TRINO: specify this engine type will launch a Trino engine which can provide all the capacity of the Trino.
  • HIVE_SQL: specify this engine type will launch a Hive engine which can provide all the capacity of the Hive Server2.
  • JDBC: specify this engine type will launch a JDBC engine which can provide a MySQL protocol connector, for now we only support Doris dialect.
| string | 1.4.0 | -| kyuubi.engine.ui.retainedSessions | 200 | The number of SQL client sessions kept in the Kyuubi Query Engine web UI. | int | 1.4.0 | -| kyuubi.engine.ui.retainedStatements | 200 | The number of statements kept in the Kyuubi Query Engine web UI. | int | 1.4.0 | -| kyuubi.engine.ui.stop.enabled | true | When true, allows Kyuubi engine to be killed from the Spark Web UI. | boolean | 1.3.0 | -| kyuubi.engine.user.isolated.spark.session | true | When set to false, if the engine is running in a group or server share level, all the JDBC/ODBC connections will be isolated against the user. Including the temporary views, function registries, SQL configuration, and the current database. Note that, it does not affect if the share level is connection or user. | boolean | 1.6.0 | -| kyuubi.engine.user.isolated.spark.session.idle.interval | PT1M | The interval to check if the user-isolated Spark session is timeout. | duration | 1.6.0 | -| kyuubi.engine.user.isolated.spark.session.idle.timeout | PT6H | If kyuubi.engine.user.isolated.spark.session is false, we will release the Spark session if its corresponding user is inactive after this configured timeout. | duration | 1.6.0 | +| Key | Default | Meaning | Type | Since | +|----------------------------------------------------------|---------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------| +| kyuubi.engine.chat.extra.classpath | <undefined> | The extra classpath for the Chat engine, for configuring the location of the SDK and etc. | string | 1.8.0 | +| kyuubi.engine.chat.gpt.apiKey | <undefined> | The key to access OpenAI open API, which could be got at https://platform.openai.com/account/api-keys | string | 1.8.0 | +| kyuubi.engine.chat.java.options | <undefined> | The extra Java options for the Chat engine | string | 1.8.0 | +| kyuubi.engine.chat.memory | 1g | The heap memory for the Chat engine | string | 1.8.0 | +| kyuubi.engine.chat.provider | ECHO | The provider for the Chat engine. Candidates:
  • ECHO: simply replies a welcome message.
  • GPT: a.k.a ChatGPT, powered by OpenAI.
| string | 1.8.0 | +| kyuubi.engine.connection.url.use.hostname | true | (deprecated) When true, the engine registers with hostname to zookeeper. When Spark runs on K8s with cluster mode, set to false to ensure that server can connect to engine | boolean | 1.3.0 | +| kyuubi.engine.deregister.exception.classes || A comma-separated list of exception classes. If there is any exception thrown, whose class matches the specified classes, the engine would deregister itself. | seq | 1.2.0 | +| kyuubi.engine.deregister.exception.messages || A comma-separated list of exception messages. If there is any exception thrown, whose message or stacktrace matches the specified message list, the engine would deregister itself. | seq | 1.2.0 | +| kyuubi.engine.deregister.exception.ttl | PT30M | Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself and wait for self-terminated. Otherwise, we suppose that the engine has recovered from temporary failures. | duration | 1.2.0 | +| kyuubi.engine.deregister.job.max.failures | 4 | Number of failures of job before deregistering the engine. | int | 1.2.0 | +| kyuubi.engine.event.json.log.path | file:///tmp/kyuubi/events | The location where all the engine events go for the built-in JSON logger.
  • Local Path: start with 'file://'
  • HDFS Path: start with 'hdfs://'
| string | 1.3.0 | +| kyuubi.engine.event.loggers | SPARK | A comma-separated list of engine history loggers, where engine/session/operation etc events go.
  • SPARK: the events will be written to the Spark listener bus.
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: User-defined event handlers.
Note that: Kyuubi supports custom event handlers with the Java SPI. To register a custom event handler, the user needs to implement a subclass of `org.apache.kyuubi.events.handler.CustomEventHandlerProvider` which has a zero-arg constructor. | seq | 1.3.0 | +| kyuubi.engine.flink.extra.classpath | <undefined> | The extra classpath for the Flink SQL engine, for configuring the location of hadoop client jars, etc | string | 1.6.0 | +| kyuubi.engine.flink.java.options | <undefined> | The extra Java options for the Flink SQL engine | string | 1.6.0 | +| kyuubi.engine.flink.memory | 1g | The heap memory for the Flink SQL engine | string | 1.6.0 | +| kyuubi.engine.hive.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go.
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: to be done.
| seq | 1.7.0 | +| kyuubi.engine.hive.extra.classpath | <undefined> | The extra classpath for the Hive query engine, for configuring location of the hadoop client jars and etc. | string | 1.6.0 | +| kyuubi.engine.hive.java.options | <undefined> | The extra Java options for the Hive query engine | string | 1.6.0 | +| kyuubi.engine.hive.memory | 1g | The heap memory for the Hive query engine | string | 1.6.0 | +| kyuubi.engine.initialize.sql | SHOW DATABASES | SemiColon-separated list of SQL statements to be initialized in the newly created engine before queries. i.e. use `SHOW DATABASES` to eagerly active HiveClient. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver. | seq | 1.2.0 | +| kyuubi.engine.jdbc.connection.password | <undefined> | The password is used for connecting to server | string | 1.6.0 | +| kyuubi.engine.jdbc.connection.properties || The additional properties are used for connecting to server | seq | 1.6.0 | +| kyuubi.engine.jdbc.connection.provider | <undefined> | The connection provider is used for getting a connection from the server | string | 1.6.0 | +| kyuubi.engine.jdbc.connection.url | <undefined> | The server url that engine will connect to | string | 1.6.0 | +| kyuubi.engine.jdbc.connection.user | <undefined> | The user is used for connecting to server | string | 1.6.0 | +| kyuubi.engine.jdbc.driver.class | <undefined> | The driver class for JDBC engine connection | string | 1.6.0 | +| kyuubi.engine.jdbc.extra.classpath | <undefined> | The extra classpath for the JDBC query engine, for configuring the location of the JDBC driver and etc. | string | 1.6.0 | +| kyuubi.engine.jdbc.java.options | <undefined> | The extra Java options for the JDBC query engine | string | 1.6.0 | +| kyuubi.engine.jdbc.memory | 1g | The heap memory for the JDBC query engine | string | 1.6.0 | +| kyuubi.engine.jdbc.type | <undefined> | The short name of JDBC type | string | 1.6.0 | +| kyuubi.engine.operation.convert.catalog.database.enabled | true | When set to true, The engine converts the JDBC methods of set/get Catalog and set/get Schema to the implementation of different engines | boolean | 1.6.0 | +| kyuubi.engine.operation.log.dir.root | engine_operation_logs | Root directory for query operation log at engine-side. | string | 1.4.0 | +| kyuubi.engine.pool.name | engine-pool | The name of the engine pool. | string | 1.5.0 | +| kyuubi.engine.pool.selectPolicy | RANDOM | The select policy of an engine from the corresponding engine pool engine for a session.
  • RANDOM - Randomly use the engine in the pool
  • POLLING - Polling use the engine in the pool
| string | 1.7.0 | +| kyuubi.engine.pool.size | -1 | The size of the engine pool. Note that, if the size is less than 1, the engine pool will not be enabled; otherwise, the size of the engine pool will be min(this, kyuubi.engine.pool.size.threshold). | int | 1.4.0 | +| kyuubi.engine.pool.size.threshold | 9 | This parameter is introduced as a server-side parameter controlling the upper limit of the engine pool. | int | 1.4.0 | +| kyuubi.engine.session.initialize.sql || SemiColon-separated list of SQL statements to be initialized in the newly created engine session before queries. This configuration can not be used in JDBC url due to the limitation of Beeline/JDBC driver. | seq | 1.3.0 | +| kyuubi.engine.share.level | USER | Engines will be shared in different levels, available configs are:
  • CONNECTION: engine will not be shared but only used by the current client connection
  • USER: engine will be shared by all sessions created by a unique username, see also kyuubi.engine.share.level.subdomain
  • GROUP: the engine will be shared by all sessions created by all users belong to the same primary group name. The engine will be launched by the group name as the effective username, so here the group name is in value of special user who is able to visit the computing resources/data of the team. It follows the [Hadoop GroupsMapping](https://reurl.cc/xE61Y5) to map user to a primary group. If the primary group is not found, it fallback to the USER level.
  • SERVER: the App will be shared by Kyuubi servers
| string | 1.2.0 | +| kyuubi.engine.share.level.sub.domain | <undefined> | (deprecated) - Using kyuubi.engine.share.level.subdomain instead | string | 1.2.0 | +| kyuubi.engine.share.level.subdomain | <undefined> | Allow end-users to create a subdomain for the share level of an engine. A subdomain is a case-insensitive string values that must be a valid zookeeper subpath. For example, for the `USER` share level, an end-user can share a certain engine within a subdomain, not for all of its clients. End-users are free to create multiple engines in the `USER` share level. When disable engine pool, use 'default' if absent. | string | 1.4.0 | +| kyuubi.engine.single.spark.session | false | When set to true, this engine is running in a single session mode. All the JDBC/ODBC connections share the temporary views, function registries, SQL configuration and the current database. | boolean | 1.3.0 | +| kyuubi.engine.spark.event.loggers | SPARK | A comma-separated list of engine loggers, where engine/session/operation etc events go.
  • SPARK: the events will be written to the Spark listener bus.
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: to be done.
| seq | 1.7.0 | +| kyuubi.engine.spark.python.env.archive | <undefined> | Portable Python env archive used for Spark engine Python language mode. | string | 1.7.0 | +| kyuubi.engine.spark.python.env.archive.exec.path | bin/python | The Python exec path under the Python env archive. | string | 1.7.0 | +| kyuubi.engine.spark.python.home.archive | <undefined> | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode. | string | 1.7.0 | +| kyuubi.engine.submit.timeout | PT30S | Period to tolerant Driver Pod ephemerally invisible after submitting. In some Resource Managers, e.g. K8s, the Driver Pod is not invisible immediately after `spark-submit` is returned. | duration | 1.7.1 | +| kyuubi.engine.trino.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go.
  • JSON: the events will be written to the location of kyuubi.engine.event.json.log.path
  • JDBC: to be done
  • CUSTOM: to be done.
| seq | 1.7.0 | +| kyuubi.engine.trino.extra.classpath | <undefined> | The extra classpath for the Trino query engine, for configuring other libs which may need by the Trino engine | string | 1.6.0 | +| kyuubi.engine.trino.java.options | <undefined> | The extra Java options for the Trino query engine | string | 1.6.0 | +| kyuubi.engine.trino.memory | 1g | The heap memory for the Trino query engine | string | 1.6.0 | +| kyuubi.engine.type | SPARK_SQL | Specify the detailed engine supported by Kyuubi. The engine type bindings to SESSION scope. This configuration is experimental. Currently, available configs are:
  • SPARK_SQL: specify this engine type will launch a Spark engine which can provide all the capacity of the Apache Spark. Note, it's a default engine type.
  • FLINK_SQL: specify this engine type will launch a Flink engine which can provide all the capacity of the Apache Flink.
  • TRINO: specify this engine type will launch a Trino engine which can provide all the capacity of the Trino.
  • HIVE_SQL: specify this engine type will launch a Hive engine which can provide all the capacity of the Hive Server2.
  • JDBC: specify this engine type will launch a JDBC engine which can provide a MySQL protocol connector, for now we only support Doris dialect.
  • CHAT: specify this engine type will launch a Chat engine.
| string | 1.4.0 | +| kyuubi.engine.ui.retainedSessions | 200 | The number of SQL client sessions kept in the Kyuubi Query Engine web UI. | int | 1.4.0 | +| kyuubi.engine.ui.retainedStatements | 200 | The number of statements kept in the Kyuubi Query Engine web UI. | int | 1.4.0 | +| kyuubi.engine.ui.stop.enabled | true | When true, allows Kyuubi engine to be killed from the Spark Web UI. | boolean | 1.3.0 | +| kyuubi.engine.user.isolated.spark.session | true | When set to false, if the engine is running in a group or server share level, all the JDBC/ODBC connections will be isolated against the user. Including the temporary views, function registries, SQL configuration, and the current database. Note that, it does not affect if the share level is connection or user. | boolean | 1.6.0 | +| kyuubi.engine.user.isolated.spark.session.idle.interval | PT1M | The interval to check if the user-isolated Spark session is timeout. | duration | 1.6.0 | +| kyuubi.engine.user.isolated.spark.session.idle.timeout | PT6H | If kyuubi.engine.user.isolated.spark.session is false, we will release the Spark session if its corresponding user is inactive after this configured timeout. | duration | 1.6.0 | ### Event diff --git a/externals/kyuubi-chat-engine/pom.xml b/externals/kyuubi-chat-engine/pom.xml new file mode 100644 index 00000000000..7e217891848 --- /dev/null +++ b/externals/kyuubi-chat-engine/pom.xml @@ -0,0 +1,89 @@ + + + + 4.0.0 + + org.apache.kyuubi + kyuubi-parent + 1.8.0-SNAPSHOT + ../../pom.xml + + + kyuubi-chat-engine_2.12 + jar + Kyuubi Project Engine Chat + https://kyuubi.apache.org/ + + + + + org.apache.kyuubi + kyuubi-common_${scala.binary.version} + ${project.version} + + + + org.apache.kyuubi + kyuubi-ha_${scala.binary.version} + ${project.version} + + + + org.apache.httpcomponents + httpclient + + + + org.apache.kyuubi + kyuubi-common_${scala.binary.version} + ${project.version} + test-jar + test + + + + org.apache.kyuubi + ${hive.jdbc.artifact} + ${project.version} + test + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + prepare-test-jar + + test-jar + + test-compile + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/ChatBackendService.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/ChatBackendService.scala new file mode 100644 index 00000000000..fdc710e2ccd --- /dev/null +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/ChatBackendService.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.chat + +import org.apache.kyuubi.engine.chat.session.ChatSessionManager +import org.apache.kyuubi.service.AbstractBackendService +import org.apache.kyuubi.session.SessionManager + +class ChatBackendService + extends AbstractBackendService("ChatBackendService") { + + override val sessionManager: SessionManager = new ChatSessionManager() + +} diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/ChatEngine.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/ChatEngine.scala new file mode 100644 index 00000000000..c1fdea9538c --- /dev/null +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/ChatEngine.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.chat + +import ChatEngine.currentEngine + +import org.apache.kyuubi.{Logging, Utils} +import org.apache.kyuubi.Utils.{addShutdownHook, JDBC_ENGINE_SHUTDOWN_PRIORITY} +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_RETRY_POLICY +import org.apache.kyuubi.ha.client.RetryPolicies +import org.apache.kyuubi.service.Serverable +import org.apache.kyuubi.util.SignalRegister + +class ChatEngine extends Serverable("ChatEngine") { + + override val backendService = new ChatBackendService() + override val frontendServices = Seq(new ChatTBinaryFrontendService(this)) + + override def start(): Unit = { + super.start() + // Start engine self-terminating checker after all services are ready and it can be reached by + // all servers in engine spaces. + backendService.sessionManager.startTerminatingChecker(() => { + currentEngine.foreach(_.stop()) + }) + } + + override protected def stopServer(): Unit = {} +} + +object ChatEngine extends Logging { + + val kyuubiConf: KyuubiConf = KyuubiConf() + + var currentEngine: Option[ChatEngine] = None + + def startEngine(): Unit = { + currentEngine = Some(new ChatEngine()) + currentEngine.foreach { engine => + engine.initialize(kyuubiConf) + engine.start() + addShutdownHook( + () => { + engine.stop() + }, + JDBC_ENGINE_SHUTDOWN_PRIORITY + 1) + } + } + + def main(args: Array[String]): Unit = { + SignalRegister.registerLogger(logger) + + try { + Utils.fromCommandLineArgs(args, kyuubiConf) + kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) + kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY, RetryPolicies.N_TIME.toString) + + startEngine() + } catch { + case t: Throwable if currentEngine.isDefined => + currentEngine.foreach { engine => + engine.stop() + } + error("Failed to create Chat Engine", t) + throw t + case t: Throwable => + error("Failed to create Chat Engine.", t) + throw t + } + } +} diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/ChatTBinaryFrontendService.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/ChatTBinaryFrontendService.scala new file mode 100644 index 00000000000..80702c97c3c --- /dev/null +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/ChatTBinaryFrontendService.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.chat + +import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery} +import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService} + +class ChatTBinaryFrontendService(override val serverable: Serverable) + extends TBinaryFrontendService("ChatTBinaryFrontend") { + + /** + * An optional `ServiceDiscovery` for [[FrontendService]] to expose itself + */ + override lazy val discoveryService: Option[Service] = + if (ServiceDiscovery.supportServiceDiscovery(conf)) { + Some(new EngineServiceDiscovery(this)) + } else { + None + } +} diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala new file mode 100644 index 00000000000..38527cbf1f8 --- /dev/null +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperation.scala @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.chat.operation + +import org.apache.hive.service.rpc.thrift._ + +import org.apache.kyuubi.{KyuubiSQLException, Utils} +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.chat.schema.{RowSet, SchemaHelper} +import org.apache.kyuubi.operation.{AbstractOperation, FetchIterator, OperationState} +import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR, FetchOrientation} +import org.apache.kyuubi.session.Session + +abstract class ChatOperation(session: Session) extends AbstractOperation(session) { + + protected var iter: FetchIterator[Array[String]] = _ + + protected lazy val conf: KyuubiConf = session.sessionManager.getConf + + override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = { + validateDefaultFetchOrientation(order) + assertState(OperationState.FINISHED) + setHasResultSet(true) + order match { + case FETCH_NEXT => + iter.fetchNext() + case FETCH_PRIOR => + iter.fetchPrior(rowSetSize) + case FETCH_FIRST => + iter.fetchAbsolute(0) + } + + val taken = iter.take(rowSetSize) + val resultRowSet = RowSet.toTRowSet(taken.toSeq, 1, getProtocolVersion) + resultRowSet.setStartRowOffset(iter.getPosition) + resultRowSet + } + + override def cancel(): Unit = { + cleanup(OperationState.CANCELED) + } + + override def close(): Unit = { + cleanup(OperationState.CLOSED) + } + + protected def onError(cancel: Boolean = false): PartialFunction[Throwable, Unit] = { + // We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError` + // could be thrown. + case e: Throwable => + state.synchronized { + val errMsg = Utils.stringifyException(e) + if (state == OperationState.TIMEOUT) { + val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg") + setOperationException(ke) + throw ke + } else if (isTerminalState(state)) { + setOperationException(KyuubiSQLException(errMsg)) + warn(s"Ignore exception in terminal state with $statementId: $errMsg") + } else { + error(s"Error operating $opType: $errMsg", e) + val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e) + setOperationException(ke) + setState(OperationState.ERROR) + throw ke + } + } + } + + override protected def beforeRun(): Unit = { + setState(OperationState.PENDING) + setHasResultSet(true) + } + + override protected def afterRun(): Unit = {} + + override def getResultSetMetadata: TGetResultSetMetadataResp = { + val tTableSchema = SchemaHelper.stringTTableSchema("reply") + val resp = new TGetResultSetMetadataResp + resp.setSchema(tTableSchema) + resp.setStatus(OK_STATUS) + resp + } + + override def shouldRunAsync: Boolean = false +} diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperationManager.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperationManager.scala new file mode 100644 index 00000000000..1e89165176e --- /dev/null +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ChatOperationManager.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.chat.operation + +import java.util + +import org.apache.kyuubi.KyuubiSQLException +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.chat.provider.ChatProvider +import org.apache.kyuubi.operation.{Operation, OperationManager} +import org.apache.kyuubi.session.Session + +class ChatOperationManager( + conf: KyuubiConf, + chatProvider: ChatProvider) extends OperationManager("ChatOperationManager") { + + override def newExecuteStatementOperation( + session: Session, + statement: String, + confOverlay: Map[String, String], + runAsync: Boolean, + queryTimeout: Long): Operation = { + val executeStatement = + new ExecuteStatement( + session, + statement, + runAsync, + queryTimeout, + chatProvider) + addOperation(executeStatement) + } + + override def newGetTypeInfoOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def newGetCatalogsOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def newGetSchemasOperation( + session: Session, + catalog: String, + schema: String): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def newGetTablesOperation( + session: Session, + catalogName: String, + schemaName: String, + tableName: String, + tableTypes: util.List[String]): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def newGetTableTypesOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def newGetColumnsOperation( + session: Session, + catalogName: String, + schemaName: String, + tableName: String, + columnName: String): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def newGetFunctionsOperation( + session: Session, + catalogName: String, + schemaName: String, + functionName: String): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def newGetPrimaryKeysOperation( + session: Session, + catalogName: String, + schemaName: String, + tableName: String): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def newGetCrossReferenceOperation( + session: Session, + primaryCatalog: String, + primarySchema: String, + primaryTable: String, + foreignCatalog: String, + foreignSchema: String, + foreignTable: String): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def getQueryId(operation: Operation): String = { + throw KyuubiSQLException.featureNotSupported() + } + + override def newSetCurrentCatalogOperation(session: Session, catalog: String): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def newGetCurrentCatalogOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def newSetCurrentDatabaseOperation(session: Session, database: String): Operation = { + throw KyuubiSQLException.featureNotSupported() + } + + override def newGetCurrentDatabaseOperation(session: Session): Operation = { + throw KyuubiSQLException.featureNotSupported() + } +} diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ExecuteStatement.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ExecuteStatement.scala new file mode 100644 index 00000000000..754a519324f --- /dev/null +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/operation/ExecuteStatement.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.chat.operation + +import org.apache.kyuubi.Logging +import org.apache.kyuubi.engine.chat.provider.ChatProvider +import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState} +import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.session.Session + +class ExecuteStatement( + session: Session, + override val statement: String, + override val shouldRunAsync: Boolean, + queryTimeout: Long, + chatProvider: ChatProvider) + extends ChatOperation(session) with Logging { + + private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle) + override def getOperationLog: Option[OperationLog] = Option(operationLog) + + override protected def runInternal(): Unit = { + addTimeoutMonitor(queryTimeout) + if (shouldRunAsync) { + val asyncOperation = new Runnable { + override def run(): Unit = { + executeStatement() + } + } + val chatSessionManager = session.sessionManager + val backgroundHandle = chatSessionManager.submitBackgroundOperation(asyncOperation) + setBackgroundHandle(backgroundHandle) + } else { + executeStatement() + } + } + + private def executeStatement(): Unit = { + setState(OperationState.RUNNING) + + try { + val reply = chatProvider.ask(session.handle.identifier.toString, statement) + iter = new ArrayFetchIterator(Array(Array(reply))) + + setState(OperationState.FINISHED) + } catch { + onError(true) + } finally { + shutdownTimeoutMonitor() + } + } +} diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/provider/ChatGPTProvider.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/provider/ChatGPTProvider.scala new file mode 100644 index 00000000000..f948eb1540d --- /dev/null +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/provider/ChatGPTProvider.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.chat.provider + +import java.util +import java.util.concurrent.TimeUnit + +import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} +import org.apache.http.HttpStatus +import org.apache.http.client.methods.HttpPost +import org.apache.http.entity.StringEntity +import org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder} +import org.apache.http.util.EntityUtils + +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.engine.chat.provider.ChatProvider.mapper + +class ChatGPTProvider(conf: KyuubiConf) extends ChatProvider { + + val token = conf.get(KyuubiConf.ENGINE_CHAT_GPT_API_KEY).get + + val httpClient: CloseableHttpClient = HttpClientBuilder.create().build() + + private val chatHistory: LoadingCache[String, util.ArrayDeque[Message]] = + CacheBuilder.newBuilder() + .expireAfterWrite(10, TimeUnit.MINUTES) + .build(new CacheLoader[String, util.ArrayDeque[Message]] { + override def load(sessionId: String): util.ArrayDeque[Message] = + new util.ArrayDeque[Message] + }) + + override def open(sessionId: String): Unit = { + chatHistory.getIfPresent(sessionId) + } + + override def ask(sessionId: String, q: String): String = { + val messages = chatHistory.get(sessionId) + messages.addLast(Message("user", q)) + + val request = new HttpPost("https://api.openai.com/v1/chat/completions") + request.addHeader("Content-Type", "application/json") + request.addHeader("Authorization", "Bearer " + token) + + val req = Map( + "messages" -> messages, + "model" -> "gpt-3.5-turbo", + "max_tokens" -> 200, + "temperature" -> 0.5, + "top_p" -> 1) + + request.setEntity(new StringEntity(mapper.writeValueAsString(req))) + val responseEntity = httpClient.execute(request) + val respJson = mapper.readTree(EntityUtils.toString(responseEntity.getEntity)) + val statusCode = responseEntity.getStatusLine.getStatusCode + if (responseEntity.getStatusLine.getStatusCode == HttpStatus.SC_OK) { + val replyMessage = mapper.treeToValue[Message]( + respJson.get("choices").get(0).get("message")) + messages.addLast(replyMessage) + replyMessage.content + } else { + messages.removeLast() + s"Chat failed. Status: $statusCode. ${respJson.get("error").get("message").asText}" + } + } + + override def close(sessionId: String): Unit = { + chatHistory.invalidate(sessionId) + } +} diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/provider/ChatProvider.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/provider/ChatProvider.scala new file mode 100644 index 00000000000..af1ba434bea --- /dev/null +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/provider/ChatProvider.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.chat.provider + +import scala.util.control.NonFatal + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule} + +import org.apache.kyuubi.{KyuubiException, Logging} +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.reflection.DynConstructors + +trait ChatProvider { + + def open(sessionId: String): Unit + + def ask(sessionId: String, q: String): String + + def close(sessionId: String): Unit +} + +object ChatProvider extends Logging { + + val mapper: ObjectMapper with ClassTagExtensions = + new ObjectMapper().registerModule(DefaultScalaModule) :: ClassTagExtensions + + def load(conf: KyuubiConf): ChatProvider = { + val groupProviderClass = conf.get(KyuubiConf.ENGINE_CHAT_PROVIDER) + try { + DynConstructors.builder(classOf[ChatProvider]) + .impl(groupProviderClass, classOf[KyuubiConf]) + .impl(groupProviderClass) + .buildChecked + .newInstanceChecked(conf) + } catch { + case _: ClassCastException => + throw new KyuubiException( + s"Class $groupProviderClass is not a child of '${classOf[ChatProvider].getName}'.") + case NonFatal(e) => + throw new IllegalArgumentException(s"Error while instantiating '$groupProviderClass': ", e) + } + } +} diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/provider/EchoProvider.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/provider/EchoProvider.scala new file mode 100644 index 00000000000..31ad3b8e390 --- /dev/null +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/provider/EchoProvider.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.chat.provider + +class EchoProvider extends ChatProvider { + + override def open(sessionId: String): Unit = {} + + override def ask(sessionId: String, q: String): String = + "This is ChatKyuubi, nice to meet you!" + + override def close(sessionId: String): Unit = {} +} diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/provider/Message.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/provider/Message.scala new file mode 100644 index 00000000000..e2162be9f1a --- /dev/null +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/provider/Message.scala @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.chat.provider + +case class Message(role: String, content: String) diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/schema/RowSet.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/schema/RowSet.scala new file mode 100644 index 00000000000..3bb4ba7dfa9 --- /dev/null +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/schema/RowSet.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.chat.schema + +import java.util + +import org.apache.hive.service.rpc.thrift._ + +import org.apache.kyuubi.util.RowSetUtils._ + +object RowSet { + + def emptyTRowSet(): TRowSet = { + new TRowSet(0, new java.util.ArrayList[TRow](0)) + } + + def toTRowSet( + rows: Seq[Array[String]], + columnSize: Int, + protocolVersion: TProtocolVersion): TRowSet = { + if (protocolVersion.getValue < TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) { + toRowBasedSet(rows, columnSize) + } else { + toColumnBasedSet(rows, columnSize) + } + } + + def toRowBasedSet(rows: Seq[Array[String]], columnSize: Int): TRowSet = { + val rowSize = rows.length + val tRows = new java.util.ArrayList[TRow](rowSize) + var i = 0 + while (i < rowSize) { + val row = rows(i) + val tRow = new TRow() + var j = 0 + val columnSize = row.length + while (j < columnSize) { + val columnValue = stringTColumnValue(j, row) + tRow.addToColVals(columnValue) + j += 1 + } + i += 1 + tRows.add(tRow) + } + new TRowSet(0, tRows) + } + + def toColumnBasedSet(rows: Seq[Array[String]], columnSize: Int): TRowSet = { + val rowSize = rows.length + val tRowSet = new TRowSet(0, new util.ArrayList[TRow](rowSize)) + var i = 0 + while (i < columnSize) { + val tColumn = toTColumn(rows, i) + tRowSet.addToColumns(tColumn) + i += 1 + } + tRowSet + } + + private def toTColumn(rows: Seq[Array[String]], ordinal: Int): TColumn = { + val nulls = new java.util.BitSet() + val values = getOrSetAsNull[String](rows, ordinal, nulls, "") + TColumn.stringVal(new TStringColumn(values, nulls)) + } + + private def getOrSetAsNull[String]( + rows: Seq[Array[String]], + ordinal: Int, + nulls: util.BitSet, + defaultVal: String): util.List[String] = { + val size = rows.length + val ret = new util.ArrayList[String](size) + var idx = 0 + while (idx < size) { + val row = rows(idx) + val isNull = row(ordinal) == null + if (isNull) { + nulls.set(idx, true) + ret.add(idx, defaultVal) + } else { + ret.add(idx, row(ordinal)) + } + idx += 1 + } + ret + } + + private def stringTColumnValue(ordinal: Int, row: Array[String]): TColumnValue = { + val tStringValue = new TStringValue + if (row(ordinal) != null) tStringValue.setValue(row(ordinal)) + TColumnValue.stringVal(tStringValue) + } +} diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/schema/SchemaHelper.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/schema/SchemaHelper.scala new file mode 100644 index 00000000000..8ccfdda2fe9 --- /dev/null +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/schema/SchemaHelper.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.chat.schema + +import java.util.Collections + +import org.apache.hive.service.rpc.thrift._ + +object SchemaHelper { + + def stringTTypeQualifiers: TTypeQualifiers = { + val ret = new TTypeQualifiers() + val qualifiers = Collections.emptyMap[String, TTypeQualifierValue]() + ret.setQualifiers(qualifiers) + ret + } + + def stringTTypeDesc: TTypeDesc = { + val typeEntry = new TPrimitiveTypeEntry(TTypeId.STRING_TYPE) + typeEntry.setTypeQualifiers(stringTTypeQualifiers) + val tTypeDesc = new TTypeDesc() + tTypeDesc.addToTypes(TTypeEntry.primitiveEntry(typeEntry)) + tTypeDesc + } + + def stringTColumnDesc(fieldName: String, pos: Int): TColumnDesc = { + val tColumnDesc = new TColumnDesc() + tColumnDesc.setColumnName(fieldName) + tColumnDesc.setTypeDesc(stringTTypeDesc) + tColumnDesc.setPosition(pos) + tColumnDesc + } + + def stringTTableSchema(fieldsName: String*): TTableSchema = { + val tTableSchema = new TTableSchema() + fieldsName.zipWithIndex.foreach { case (f, i) => + tTableSchema.addToColumns(stringTColumnDesc(f, i)) + } + tTableSchema + } +} diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/session/ChatSessionImpl.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/session/ChatSessionImpl.scala new file mode 100644 index 00000000000..29f42076822 --- /dev/null +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/session/ChatSessionImpl.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.chat.session + +import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion} + +import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException} +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY +import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager} + +class ChatSessionImpl( + protocol: TProtocolVersion, + user: String, + password: String, + ipAddress: String, + conf: Map[String, String], + sessionManager: SessionManager) + extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) { + + override val handle: SessionHandle = + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle()) + + private val chatProvider = sessionManager.asInstanceOf[ChatSessionManager].chatProvider + + override def open(): Unit = { + info(s"Starting to open chat session.") + chatProvider.open(handle.identifier.toString) + super.open() + info(s"The chat session is started.") + } + + override def getInfo(infoType: TGetInfoType): TGetInfoValue = withAcquireRelease() { + infoType match { + case TGetInfoType.CLI_SERVER_NAME | TGetInfoType.CLI_DBMS_NAME => + TGetInfoValue.stringValue("Kyuubi Chat Engine") + case TGetInfoType.CLI_DBMS_VER => + TGetInfoValue.stringValue(KYUUBI_VERSION) + case TGetInfoType.CLI_ODBC_KEYWORDS => TGetInfoValue.stringValue("Unimplemented") + case TGetInfoType.CLI_MAX_COLUMN_NAME_LEN => + TGetInfoValue.lenValue(128) + case TGetInfoType.CLI_MAX_SCHEMA_NAME_LEN => + TGetInfoValue.lenValue(128) + case TGetInfoType.CLI_MAX_TABLE_NAME_LEN => + TGetInfoValue.lenValue(128) + case _ => throw KyuubiSQLException(s"Unrecognized GetInfoType value: $infoType") + } + } + + override def close(): Unit = { + chatProvider.close(handle.identifier.toString) + super.close() + } + +} diff --git a/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/session/ChatSessionManager.scala b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/session/ChatSessionManager.scala new file mode 100644 index 00000000000..33a9dd45066 --- /dev/null +++ b/externals/kyuubi-chat-engine/src/main/scala/org/apache/kyuubi/engine/chat/session/ChatSessionManager.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.chat.session + +import org.apache.hive.service.rpc.thrift.TProtocolVersion + +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY +import org.apache.kyuubi.engine.ShareLevel +import org.apache.kyuubi.engine.chat.ChatEngine +import org.apache.kyuubi.engine.chat.operation.ChatOperationManager +import org.apache.kyuubi.engine.chat.provider.ChatProvider +import org.apache.kyuubi.operation.OperationManager +import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager} + +class ChatSessionManager(name: String) + extends SessionManager(name) { + + def this() = this(classOf[ChatSessionManager].getSimpleName) + + override protected def isServer: Boolean = false + + lazy val chatProvider: ChatProvider = ChatProvider.load(conf) + + override lazy val operationManager: OperationManager = + new ChatOperationManager(conf, chatProvider) + + override def initialize(conf: KyuubiConf): Unit = { + this.conf = conf + super.initialize(conf) + } + + override protected def createSession( + protocol: TProtocolVersion, + user: String, + password: String, + ipAddress: String, + conf: Map[String, String]): Session = { + conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID) + .flatMap(getSessionOption).getOrElse { + new ChatSessionImpl(protocol, user, password, ipAddress, conf, this) + } + } + + override def closeSession(sessionHandle: SessionHandle): Unit = { + super.closeSession(sessionHandle) + if (conf.get(ENGINE_SHARE_LEVEL) == ShareLevel.CONNECTION.toString) { + info("Session stopped due to shared level is Connection.") + stopSession() + } + } + + private def stopSession(): Unit = { + ChatEngine.currentEngine.foreach(_.stop()) + } +} diff --git a/externals/kyuubi-chat-engine/src/test/resources/log4j2-test.xml b/externals/kyuubi-chat-engine/src/test/resources/log4j2-test.xml new file mode 100644 index 00000000000..585a12c6f99 --- /dev/null +++ b/externals/kyuubi-chat-engine/src/test/resources/log4j2-test.xml @@ -0,0 +1,42 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/externals/kyuubi-chat-engine/src/test/scala/org/apache/kyuubi/engine/chat/WithChatEngine.scala b/externals/kyuubi-chat-engine/src/test/scala/org/apache/kyuubi/engine/chat/WithChatEngine.scala new file mode 100644 index 00000000000..287fdde2fb5 --- /dev/null +++ b/externals/kyuubi-chat-engine/src/test/scala/org/apache/kyuubi/engine/chat/WithChatEngine.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.engine.chat + +import org.apache.kyuubi.KyuubiFunSuite +import org.apache.kyuubi.config.KyuubiConf + +trait WithChatEngine extends KyuubiFunSuite { + + protected var engine: ChatEngine = _ + protected var connectionUrl: String = _ + + protected val kyuubiConf: KyuubiConf = ChatEngine.kyuubiConf + + def withKyuubiConf: Map[String, String] + + override def beforeAll(): Unit = { + super.beforeAll() + startChatEngine() + } + + override def afterAll(): Unit = { + stopChatEngine() + super.afterAll() + } + + def stopChatEngine(): Unit = { + if (engine != null) { + engine.stop() + engine = null + } + } + + def startChatEngine(): Unit = { + withKyuubiConf.foreach { case (k, v) => + System.setProperty(k, v) + kyuubiConf.set(k, v) + } + ChatEngine.startEngine() + engine = ChatEngine.currentEngine.get + connectionUrl = engine.frontendServices.head.connectionUrl + } + + protected def jdbcConnectionUrl: String = s"jdbc:hive2://$connectionUrl/;" + +} diff --git a/externals/kyuubi-chat-engine/src/test/scala/org/apache/kyuubi/engine/chat/operation/ChatOperationSuite.scala b/externals/kyuubi-chat-engine/src/test/scala/org/apache/kyuubi/engine/chat/operation/ChatOperationSuite.scala new file mode 100644 index 00000000000..b14407a267b --- /dev/null +++ b/externals/kyuubi-chat-engine/src/test/scala/org/apache/kyuubi/engine/chat/operation/ChatOperationSuite.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.chat.operation + +import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.engine.chat.WithChatEngine +import org.apache.kyuubi.operation.HiveJDBCTestHelper + +class ChatOperationSuite extends HiveJDBCTestHelper with WithChatEngine { + + override def withKyuubiConf: Map[String, String] = Map( + ENGINE_CHAT_PROVIDER.key -> "echo") + + override protected def jdbcUrl: String = jdbcConnectionUrl + + test("test echo chat provider") { + withJdbcStatement() { stmt => + val result = stmt.executeQuery("Hello, Kyuubi") + assert(result.next()) + val expected = "This is ChatKyuubi, nice to meet you!" + assert(result.getString("reply") === expected) + assert(!result.next()) + } + } +} diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 1b7737344bf..b39bee307e6 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1817,6 +1817,7 @@ object KyuubiConf { " all the capacity of the Hive Server2." + "
  • JDBC: specify this engine type will launch a JDBC engine which can provide" + " a MySQL protocol connector, for now we only support Doris dialect.
  • " + + "
  • CHAT: specify this engine type will launch a Chat engine.
  • " + "") .version("1.4.0") .stringConf @@ -2621,6 +2622,51 @@ object KyuubiConf { Map(configs.map { cfg => cfg.key -> cfg }: _*) } + val ENGINE_CHAT_MEMORY: ConfigEntry[String] = + buildConf("kyuubi.engine.chat.memory") + .doc("The heap memory for the Chat engine") + .version("1.8.0") + .stringConf + .createWithDefault("1g") + + val ENGINE_CHAT_JAVA_OPTIONS: OptionalConfigEntry[String] = + buildConf("kyuubi.engine.chat.java.options") + .doc("The extra Java options for the Chat engine") + .version("1.8.0") + .stringConf + .createOptional + + val ENGINE_CHAT_PROVIDER: ConfigEntry[String] = + buildConf("kyuubi.engine.chat.provider") + .doc("The provider for the Chat engine. Candidates:
      " + + "
    • ECHO: simply replies a welcome message.
    • " + + "
    • GPT: a.k.a ChatGPT, powered by OpenAI.
    • " + + "
    ") + .version("1.8.0") + .stringConf + .transform { + case "ECHO" | "echo" => "org.apache.kyuubi.engine.chat.provider.EchoProvider" + case "GPT" | "gpt" | "ChatGPT" => "org.apache.kyuubi.engine.chat.provider.ChatGPTProvider" + case other => other + } + .createWithDefault("ECHO") + + val ENGINE_CHAT_GPT_API_KEY: OptionalConfigEntry[String] = + buildConf("kyuubi.engine.chat.gpt.apiKey") + .doc("The key to access OpenAI open API, which could be got at " + + "https://platform.openai.com/account/api-keys") + .version("1.8.0") + .stringConf + .createOptional + + val ENGINE_CHAT_EXTRA_CLASSPATH: OptionalConfigEntry[String] = + buildConf("kyuubi.engine.chat.extra.classpath") + .doc("The extra classpath for the Chat engine, for configuring the location " + + "of the SDK and etc.") + .version("1.8.0") + .stringConf + .createOptional + val ENGINE_JDBC_MEMORY: ConfigEntry[String] = buildConf("kyuubi.engine.jdbc.memory") .doc("The heap memory for the JDBC query engine") diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineType.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineType.scala index 88680a8c757..3d850ba14f5 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineType.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/EngineType.scala @@ -23,5 +23,5 @@ package org.apache.kyuubi.engine object EngineType extends Enumeration { type EngineType = Value - val SPARK_SQL, FLINK_SQL, TRINO, HIVE_SQL, JDBC = Value + val SPARK_SQL, FLINK_SQL, CHAT, TRINO, HIVE_SQL, JDBC = Value } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index e2ddb422154..5f3af28f8e6 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -29,8 +29,9 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY -import org.apache.kyuubi.engine.EngineType.{EngineType, FLINK_SQL, HIVE_SQL, JDBC, SPARK_SQL, TRINO} +import org.apache.kyuubi.engine.EngineType._ import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, SERVER, ShareLevel} +import org.apache.kyuubi.engine.chat.ChatProcessBuilder import org.apache.kyuubi.engine.flink.FlinkProcessBuilder import org.apache.kyuubi.engine.hive.HiveProcessBuilder import org.apache.kyuubi.engine.jdbc.JdbcProcessBuilder @@ -192,6 +193,8 @@ private[kyuubi] class EngineRef( new HiveProcessBuilder(appUser, conf, engineRefId, extraEngineLog) case JDBC => new JdbcProcessBuilder(appUser, conf, engineRefId, extraEngineLog) + case CHAT => + new ChatProcessBuilder(appUser, conf, engineRefId, extraEngineLog) } MetricsSystem.tracing(_.incCount(ENGINE_TOTAL)) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/chat/ChatProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/chat/ChatProcessBuilder.scala new file mode 100644 index 00000000000..532c5dddfa8 --- /dev/null +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/chat/ChatProcessBuilder.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.engine.chat + +import java.io.File +import java.nio.file.{Files, Paths} +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import com.google.common.annotations.VisibleForTesting + +import org.apache.kyuubi.{Logging, SCALA_COMPILE_VERSION, Utils} +import org.apache.kyuubi.Utils.REDACTION_REPLACEMENT_TEXT +import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY +import org.apache.kyuubi.engine.ProcBuilder +import org.apache.kyuubi.operation.log.OperationLog + +class ChatProcessBuilder( + override val proxyUser: String, + override val conf: KyuubiConf, + val engineRefId: String, + val extraEngineLog: Option[OperationLog] = None) + extends ProcBuilder with Logging { + + @VisibleForTesting + def this(proxyUser: String, conf: KyuubiConf) { + this(proxyUser, conf, "") + } + + /** + * The short name of the engine process builder, we use this for form the engine jar paths now + * see `mainResource` + */ + override def shortName: String = "chat" + + override protected def module: String = "kyuubi-chat-engine" + + /** + * The class containing the main method + */ + override protected def mainClass: String = "org.apache.kyuubi.engine.chat.ChatEngine" + + override protected val commands: Array[String] = { + val buffer = new ArrayBuffer[String]() + buffer += executable + + val memory = conf.get(ENGINE_CHAT_MEMORY) + buffer += s"-Xmx$memory" + + val javaOptions = conf.get(ENGINE_CHAT_JAVA_OPTIONS) + javaOptions.foreach(buffer += _) + + buffer += "-cp" + val classpathEntries = new util.LinkedHashSet[String] + mainResource.foreach(classpathEntries.add) + mainResource.foreach { path => + val parent = Paths.get(path).getParent + val chatDevDepDir = parent + .resolve(s"scala-$SCALA_COMPILE_VERSION") + .resolve("jars") + if (Files.exists(chatDevDepDir)) { + // add dev classpath + classpathEntries.add(s"$chatDevDepDir${File.separator}*") + } else { + // add prod classpath + classpathEntries.add(s"$parent${File.separator}*") + } + } + + val extraCp = conf.get(ENGINE_CHAT_EXTRA_CLASSPATH) + extraCp.foreach(classpathEntries.add) + buffer += classpathEntries.asScala.mkString(File.pathSeparator) + buffer += mainClass + + buffer += "--conf" + buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser" + + for ((k, v) <- conf.getAll) { + buffer += "--conf" + buffer += s"$k=$v" + } + buffer.toArray + } + + override def toString: String = { + if (commands == null) { + super.toString() + } else { + Utils.redactCommandLineArgs(conf, commands).map { + case arg if arg.contains(ENGINE_CHAT_GPT_API_KEY.key) => + s"${ENGINE_CHAT_GPT_API_KEY.key}=$REDACTION_REPLACEMENT_TEXT" + case arg => arg + }.mkString("\n") + } + } +} diff --git a/pom.xml b/pom.xml index 2082499dc29..a2de5d29de3 100644 --- a/pom.xml +++ b/pom.xml @@ -60,10 +60,11 @@ extensions/spark/kyuubi-spark-connector-tpcds extensions/spark/kyuubi-spark-connector-tpch extensions/spark/kyuubi-spark-lineage + externals/kyuubi-chat-engine externals/kyuubi-download externals/kyuubi-flink-sql-engine - externals/kyuubi-jdbc-engine externals/kyuubi-hive-sql-engine + externals/kyuubi-jdbc-engine externals/kyuubi-spark-sql-engine externals/kyuubi-trino-engine integration-tests