[KYUUBI apache#4544] Initial implement Kyuubi Chat Engine
### _Why are the changes needed?_

Introduce a brand new CHAT engine. It is intended to support different backends, e.g. ChatGPT, 文心一言 (ERNIE Bot), etc.

This PR implements the following providers:

- ECHO: simply replies with a welcome message.
- GPT: a.k.a. ChatGPT, powered by OpenAI; requires an API key for authentication (see https://platform.openai.com/account/api-keys).

Add the following configurations to `kyuubi-defaults.conf`:
```
kyuubi.engine.chat.provider=[ECHO|GPT]
kyuubi.engine.chat.gpt.apiKey=<chat-gpt-api-key>
```

Open a beeline session against an ECHO chat engine:
```
beeline -u 'jdbc:hive2://localhost:10009/?kyuubi.engine.type=CHAT;kyuubi.engine.chat.provider=ECHO'
```

```
Connecting to jdbc:hive2://localhost:10009/
Connected to: Kyuubi Chat Engine (version 1.8.0-SNAPSHOT)
Driver: Kyuubi Project Hive JDBC Client (version 1.7.0)
Beeline version 1.7.0 by Apache Kyuubi
0: jdbc:hive2://localhost:10009/> Hello, Kyuubi!;
+----------------------------------------+
|                 reply                  |
+----------------------------------------+
| This is ChatKyuubi, nice to meet you!  |
+----------------------------------------+
1 row selected (0.397 seconds)
```
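
The connection URLs in this description share one pattern: the engine type and the provider are passed as `;`-separated configs after `?`. As a minimal sketch of that pattern, the helper below assembles such a URL. The function name `chat_jdbc_url` is hypothetical (it is not part of Kyuubi), and the host and port defaults are simply the values used in the examples here.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Kyuubi): builds a JDBC URL that asks
# Kyuubi for a CHAT engine backed by the given provider.
chat_jdbc_url() {
  local provider="$1"            # ECHO or GPT, as listed above
  local host="${2:-localhost}"   # assumed default, taken from the examples
  local port="${3:-10009}"       # assumed default, taken from the examples
  printf 'jdbc:hive2://%s:%s/?kyuubi.engine.type=CHAT;kyuubi.engine.chat.provider=%s\n' \
    "$host" "$port" "$provider"
}

# Print the URL for the ECHO provider.
chat_jdbc_url ECHO
```

The result can be handed to beeline, for example `beeline -u "$(chat_jdbc_url GPT)"`; quoting the substitution keeps the shell from treating `;` as a command separator.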

Open a beeline session against a ChatGPT chat engine (make sure your network can reach the OpenAI API and that the API key is configured):
```
beeline -u 'jdbc:hive2://localhost:10009/?kyuubi.engine.type=CHAT;kyuubi.engine.chat.provider=GPT'
```

<img width="1109" alt="image" src="https://user-images.githubusercontent.com/26535726/225813625-a002e6e2-3b0d-4194-b061-2e215d58ba94.png">

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes apache#4544 from pan3793/chatgpt.

Closes apache#4544

87bdebb [Cheng Pan] nit
f7dee18 [Cheng Pan] Update docs
9beb551 [cxzl25] chat api (nexr#1)
af38bdc [Cheng Pan] update docs
9aa6d83 [Cheng Pan] Initial implement Kyuubi Chat Engine

Lead-authored-by: Cheng Pan <chengpan@apache.org>
Co-authored-by: cxzl25 <cxzl25@users.noreply.github.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
pan3793 and cxzl25 committed Mar 17, 2023
1 parent 7709cd1 commit f1eb449
Showing 25 changed files with 1,409 additions and 57 deletions.
13 changes: 13 additions & 0 deletions build/dist
@@ -256,6 +256,7 @@ mkdir -p "$DISTDIR/externals/engines/spark"
mkdir -p "$DISTDIR/externals/engines/trino"
mkdir -p "$DISTDIR/externals/engines/hive"
mkdir -p "$DISTDIR/externals/engines/jdbc"
mkdir -p "$DISTDIR/externals/engines/chat"
echo "Kyuubi $VERSION $GITREVSTRING built for" > "$DISTDIR/RELEASE"
echo "Java $JAVA_VERSION" >> "$DISTDIR/RELEASE"
echo "Scala $SCALA_VERSION" >> "$DISTDIR/RELEASE"
@@ -313,6 +314,18 @@ for jar in $(ls "$DISTDIR/jars/"); do
fi
done

# Copy chat engines
cp "$KYUUBI_HOME/externals/kyuubi-chat-engine/target/kyuubi-chat-engine_${SCALA_VERSION}-${VERSION}.jar" "$DISTDIR/externals/engines/chat/"
cp -r "$KYUUBI_HOME"/externals/kyuubi-chat-engine/target/scala-$SCALA_VERSION/jars/*.jar "$DISTDIR/externals/engines/chat/"

# Share the jars w/ server to reduce binary size
# shellcheck disable=SC2045
for jar in $(ls "$DISTDIR/jars/"); do
if [[ -f "$DISTDIR/externals/engines/chat/$jar" ]]; then
(cd $DISTDIR/externals/engines/chat; ln -snf "../../../jars/$jar" "$DISTDIR/externals/engines/chat/$jar")
fi
done

# Copy kyuubi tools
if [[ -f "$KYUUBI_HOME/tools/spark-block-cleaner/target/spark-block-cleaner_${SCALA_VERSION}-${VERSION}.jar" ]]; then
mkdir -p "$DISTDIR/tools/spark-block-cleaner/kubernetes"
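
The jar-sharing step in the `build/dist` hunk above can be tried in isolation. The following is a minimal sketch, not the project's script: it works in a temporary directory with made-up jar names, and replaces every engine jar that also exists in the server's `jars/` directory with a relative symlink, which is the technique the hunk uses to reduce the binary size.

```shell
#!/usr/bin/env bash
set -euo pipefail

# Throwaway layout that mimics the dist tree; all file names are made up.
DISTDIR="$(mktemp -d)"
mkdir -p "$DISTDIR/jars" "$DISTDIR/externals/engines/chat"
echo "shared"      > "$DISTDIR/jars/common.jar"
echo "shared"      > "$DISTDIR/externals/engines/chat/common.jar"
echo "engine-only" > "$DISTDIR/externals/engines/chat/chat-only.jar"

# Replace engine jars that the server already ships with relative symlinks.
for path in "$DISTDIR"/jars/*.jar; do
  jar="$(basename "$path")"
  if [[ -f "$DISTDIR/externals/engines/chat/$jar" ]]; then
    ln -snf "../../../jars/$jar" "$DISTDIR/externals/engines/chat/$jar"
  fi
done

# common.jar is now a symlink; chat-only.jar is still a regular file.
ls -l "$DISTDIR/externals/engines/chat"
```

The link target is relative (`../../../jars/...`), so the distribution directory can be moved or unpacked elsewhere without breaking the links.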
113 changes: 59 additions & 54 deletions docs/deployment/settings.md

Large diffs are not rendered by default.

89 changes: 89 additions & 0 deletions externals/kyuubi-chat-engine/pom.xml
@@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-parent</artifactId>
<version>1.8.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>kyuubi-chat-engine_2.12</artifactId>
<packaging>jar</packaging>
<name>Kyuubi Project Engine Chat</name>
<url>https://kyuubi.apache.org/</url>

<dependencies>
<!-- kyuubi dependency -->
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-ha_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>${hive.jdbc.artifact}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<id>prepare-test-jar</id>
<goals>
<goal>test-jar</goal>
</goals>
<phase>test-compile</phase>
</execution>
</executions>
</plugin>
</plugins>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>

</project>
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.chat

import org.apache.kyuubi.engine.chat.session.ChatSessionManager
import org.apache.kyuubi.service.AbstractBackendService
import org.apache.kyuubi.session.SessionManager

class ChatBackendService
extends AbstractBackendService("ChatBackendService") {

override val sessionManager: SessionManager = new ChatSessionManager()

}
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.chat

import ChatEngine.currentEngine

import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.Utils.{addShutdownHook, JDBC_ENGINE_SHUTDOWN_PRIORITY}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_RETRY_POLICY
import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.SignalRegister

class ChatEngine extends Serverable("ChatEngine") {

override val backendService = new ChatBackendService()
override val frontendServices = Seq(new ChatTBinaryFrontendService(this))

override def start(): Unit = {
super.start()
// Start engine self-terminating checker after all services are ready and it can be reached by
// all servers in engine spaces.
backendService.sessionManager.startTerminatingChecker(() => {
currentEngine.foreach(_.stop())
})
}

override protected def stopServer(): Unit = {}
}

object ChatEngine extends Logging {

val kyuubiConf: KyuubiConf = KyuubiConf()

var currentEngine: Option[ChatEngine] = None

def startEngine(): Unit = {
currentEngine = Some(new ChatEngine())
currentEngine.foreach { engine =>
engine.initialize(kyuubiConf)
engine.start()
addShutdownHook(
() => {
engine.stop()
},
JDBC_ENGINE_SHUTDOWN_PRIORITY + 1)
}
}

def main(args: Array[String]): Unit = {
SignalRegister.registerLogger(logger)

try {
Utils.fromCommandLineArgs(args, kyuubiConf)
kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
kyuubiConf.setIfMissing(HA_ZK_CONN_RETRY_POLICY, RetryPolicies.N_TIME.toString)

startEngine()
} catch {
case t: Throwable if currentEngine.isDefined =>
currentEngine.foreach { engine =>
engine.stop()
}
error("Failed to create Chat Engine", t)
throw t
case t: Throwable =>
error("Failed to create Chat Engine.", t)
throw t
}
}
}
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.chat

import org.apache.kyuubi.ha.client.{EngineServiceDiscovery, ServiceDiscovery}
import org.apache.kyuubi.service.{Serverable, Service, TBinaryFrontendService}

class ChatTBinaryFrontendService(override val serverable: Serverable)
extends TBinaryFrontendService("ChatTBinaryFrontend") {

/**
* An optional `ServiceDiscovery` for [[FrontendService]] to expose itself
*/
override lazy val discoveryService: Option[Service] =
if (ServiceDiscovery.supportServiceDiscovery(conf)) {
Some(new EngineServiceDiscovery(this))
} else {
None
}
}
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.chat.operation

import org.apache.hive.service.rpc.thrift._

import org.apache.kyuubi.{KyuubiSQLException, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.chat.schema.{RowSet, SchemaHelper}
import org.apache.kyuubi.operation.{AbstractOperation, FetchIterator, OperationState}
import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR, FetchOrientation}
import org.apache.kyuubi.session.Session

abstract class ChatOperation(session: Session) extends AbstractOperation(session) {

protected var iter: FetchIterator[Array[String]] = _

protected lazy val conf: KyuubiConf = session.sessionManager.getConf

override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
order match {
case FETCH_NEXT =>
iter.fetchNext()
case FETCH_PRIOR =>
iter.fetchPrior(rowSetSize)
case FETCH_FIRST =>
iter.fetchAbsolute(0)
}

val taken = iter.take(rowSetSize)
val resultRowSet = RowSet.toTRowSet(taken.toSeq, 1, getProtocolVersion)
resultRowSet.setStartRowOffset(iter.getPosition)
resultRowSet
}

override def cancel(): Unit = {
cleanup(OperationState.CANCELED)
}

override def close(): Unit = {
cleanup(OperationState.CLOSED)
}

protected def onError(cancel: Boolean = false): PartialFunction[Throwable, Unit] = {
// We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError`
// could be thrown.
case e: Throwable =>
state.synchronized {
val errMsg = Utils.stringifyException(e)
if (state == OperationState.TIMEOUT) {
val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
setOperationException(ke)
throw ke
} else if (isTerminalState(state)) {
setOperationException(KyuubiSQLException(errMsg))
warn(s"Ignore exception in terminal state with $statementId: $errMsg")
} else {
error(s"Error operating $opType: $errMsg", e)
val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
setOperationException(ke)
setState(OperationState.ERROR)
throw ke
}
}
}

override protected def beforeRun(): Unit = {
setState(OperationState.PENDING)
setHasResultSet(true)
}

override protected def afterRun(): Unit = {}

override def getResultSetMetadata: TGetResultSetMetadataResp = {
val tTableSchema = SchemaHelper.stringTTableSchema("reply")
val resp = new TGetResultSetMetadataResp
resp.setSchema(tTableSchema)
resp.setStatus(OK_STATUS)
resp
}

override def shouldRunAsync: Boolean = false
}