Skip to content

Commit

Permalink
[ 💥 ] The script supports multi-language (python, sql, sh) developmen…
Browse files Browse the repository at this point in the history
…t mode. #10
  • Loading branch information
bebee4java committed Feb 1, 2021
1 parent 076905e commit c0283db
Show file tree
Hide file tree
Showing 33 changed files with 5,211 additions and 3,208 deletions.
8 changes: 4 additions & 4 deletions core/src/main/java/tech/ides/core/ScriptQueryExecute.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package tech.ides.core

import ides.dsl.parser.{IdesDslBaseListener, IdesDslLexer, IdesDslParser}
import ides.dsl.parser._
import org.antlr.v4.runtime.tree.ParseTreeWalker
import org.antlr.v4.runtime.CommonTokenStream
import tech.ides.dsl.listener.ScriptQueryExecListener
Expand Down Expand Up @@ -77,11 +77,11 @@ object ScriptQueryExecute extends Logging {
}


private def parse(script: String, listener: IdesDslBaseListener): Unit = {
private def parse(script: String, listener: IdesParserBaseListener): Unit = {
val charStream = new CaseChangeCharStream(script)
val idesDslLexer = new IdesDslLexer(charStream)
val idesDslLexer = new IdesLexer(charStream)
val tokenStream = new CommonTokenStream(idesDslLexer)
val parser = new IdesDslParser(tokenStream)
val parser = new IdesParser(tokenStream)

// add syntax error listener
parser.addErrorListener(new SyntaxErrorListener)
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/java/tech/ides/dsl/adaptor/LoadAdaptor.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package tech.ides.dsl.adaptor

import ides.dsl.parser.IdesDslParser
import ides.dsl.parser.IdesDslParser.LoadContext
import ides.dsl.parser.IdesParser
import ides.dsl.parser.IdesParser.LoadContext
import org.apache.spark.sql.{DataFrame, DataFrameReader}
import tech.ides.core.ScriptQueryExecute
import tech.ides.datasource.{DataSourceConfig, DataSourceFactory}
Expand All @@ -15,7 +15,7 @@ import tech.ides.dsl.utils.DslUtil._
*/
case class LoadAdaptor(scriptQueryExecListener: ScriptQueryExecListener) extends ScriptDslAdaptor {

override def parse(context: IdesDslParser.QueryContext): SqlStatement = {
override def parse(context: IdesParser.QueryContext): SqlStatement = {

val loadContext = context.asInstanceOf[LoadContext]

Expand All @@ -31,7 +31,7 @@ case class LoadAdaptor(scriptQueryExecListener: ScriptQueryExecListener) extends
LoadSqlStatement(sql, format, path, options, tableName)
}

override def enterContext(context: IdesDslParser.QueryContext): Unit = {
override def enterContext(context: IdesParser.QueryContext): Unit = {
val LoadSqlStatement(_, format, path, options, tableName) = parse(context)
val sparkSession = scriptQueryExecListener.sparkSession
val reader = sparkSession.read
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/java/tech/ides/dsl/adaptor/SaveAdaptor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package tech.ides.dsl.adaptor

import java.util.UUID

import ides.dsl.parser.IdesDslParser
import ides.dsl.parser.IdesDslParser.SaveContext
import ides.dsl.parser.IdesParser
import ides.dsl.parser.IdesParser.SaveContext
import org.apache.spark.sql._
import tech.ides.constants.ScriptConstants.PARTITION_BY_COL
import tech.ides.core.ScriptQueryExecute
import tech.ides.datasource.{DataSinkConfig, DataSourceFactory}
import tech.ides.dsl.listener.ScriptQueryExecListener
import tech.ides.dsl.statement.{SaveSqlStatement, SqlStatement}
import tech.ides.dsl.utils.DslUtil.{cleanStr, currentText, resourceRealPath, whereExpressionsToMap, parseAssetName}
import tech.ides.dsl.utils.DslUtil.{cleanStr, currentText, parseAssetName, resourceRealPath, whereExpressionsToMap}
import tech.ides.job.ScriptJobManager

import scala.collection.mutable.ListBuffer
Expand All @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
*/
case class SaveAdaptor(scriptQueryExecListener: ScriptQueryExecListener) extends ScriptDslAdaptor {

override def parse(context: IdesDslParser.QueryContext): SqlStatement = {
override def parse(context: IdesParser.QueryContext): SqlStatement = {
val saveContext = context.asInstanceOf[SaveContext]

val sql = currentText(saveContext)
Expand Down Expand Up @@ -58,7 +58,7 @@ case class SaveAdaptor(scriptQueryExecListener: ScriptQueryExecListener) extends

}

override def enterContext(context: IdesDslParser.QueryContext): Unit = {
override def enterContext(context: IdesParser.QueryContext): Unit = {
val SaveSqlStatement(_, inputTableName, saveMode, format, path, options, partitionByCol) = parse(context)

val spark = scriptQueryExecListener.sparkSession
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package tech.ides.dsl.adaptor

import ides.dsl.parser.IdesDslParser.QueryContext
import ides.dsl.parser.IdesParser.QueryContext
import tech.ides.dsl.listener.ScriptQueryExecListener
import tech.ides.dsl.statement.SqlStatement

Expand Down
8 changes: 4 additions & 4 deletions core/src/main/java/tech/ides/dsl/adaptor/SelectAdaptor.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package tech.ides.dsl.adaptor

import ides.dsl.parser.IdesDslParser
import ides.dsl.parser.IdesDslParser.SelectContext
import ides.dsl.parser.IdesParser
import ides.dsl.parser.IdesParser.SelectContext
import tech.ides.dsl.listener.ScriptQueryExecListener
import tech.ides.dsl.statement.{SelectSqlStatement, SqlStatement}
import tech.ides.dsl.utils.DslUtil.{currentText, parseAssetName}
Expand All @@ -11,7 +11,7 @@ import tech.ides.dsl.utils.DslUtil.{currentText, parseAssetName}
* Created by songgr on 2020/11/06.
*/
case class SelectAdaptor(scriptQueryExecListener: ScriptQueryExecListener) extends ScriptDslAdaptor {
override def parse(context: IdesDslParser.QueryContext): SqlStatement = {
override def parse(context: IdesParser.QueryContext): SqlStatement = {

val selectContext = context.asInstanceOf[SelectContext]
val sql = currentText(selectContext)
Expand All @@ -21,7 +21,7 @@ case class SelectAdaptor(scriptQueryExecListener: ScriptQueryExecListener) exten
SelectSqlStatement(sql, tableName)
}

override def enterContext(context: IdesDslParser.QueryContext): Unit = {
override def enterContext(context: IdesParser.QueryContext): Unit = {

val SelectSqlStatement(_sql, tableName) = parse(context)
val sparkSession = scriptQueryExecListener.sparkSession
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package tech.ides.dsl.listener

import java.util.concurrent.atomic.AtomicReference

import ides.dsl.parser.{IdesDslBaseListener, IdesDslParser}
import ides.dsl.parser.{IdesParser, IdesParserBaseListener}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.SparkSession
import tech.ides.core.ScriptStage
Expand All @@ -16,7 +16,7 @@ import scala.collection.mutable
* 脚本执行监听器类
* Created by songgr on 2020/10/28.
*/
class ScriptQueryExecListener(val sparkSession: SparkSession, val defaultPathPrefix:String, val owner:String) extends IdesDslBaseListener with Logging {
class ScriptQueryExecListener(val sparkSession: SparkSession, val defaultPathPrefix:String, val owner:String) extends IdesParserBaseListener with Logging {

logInfo(s"create ScriptQueryExecListener for $owner.")

Expand Down Expand Up @@ -69,31 +69,122 @@ class ScriptQueryExecListener(val sparkSession: SparkSession, val defaultPathPre
/**
* 整个脚本的context
*/
override def exitStatement(ctx: IdesDslParser.StatementContext): Unit = {}
override def exitStatement(ctx: IdesParser.StatementContext): Unit = {}

/**
* 单条sql的context
* python 代码的context
*
* example:
* %python
* # use table
* a=1
* print(a)
* % > output
*/
override def exitSql(ctx: IdesDslParser.SqlContext): Unit = {}
override def exitPy(ctx: IdesParser.PyContext): Unit = {
// todo 执行python代码
val pyMode = ctx.getStart.getText
val bracket_l = pyMode.indexOf('(')
val bracket_r = pyMode.indexOf(')')

val table = if ( bracket_l > 0 && bracket_r > bracket_l + 1) {
Some( pyMode.substring(bracket_l + 1, bracket_r) )
} else None

// todo table需要format
if (table.isDefined)
println("input table: " + table.get)

val context = ctx.pythonCode()
val pys = context.pyStatement()
println("total line: " + pys.size())
val s = context.getText
println("py: \n" + s)

// todo table需要format
if (ctx.outTable() != null) {
val tb = ctx.outTable().assetName()
println("py output: \n" + tb.getText)
}

}

/**
* sql脚本(jdbc语句)的context
* example:
* %sql
* # use table
* select 1 from test;
* % > output
*/
override def exitSql(ctx: IdesParser.SqlContext): Unit = {
// todo 执行sql代码
val sqlMode = ctx.getStart.getText
val bracket_l = sqlMode.indexOf('(')
val bracket_r = sqlMode.indexOf(')')

val connect = if ( bracket_l > 0 && bracket_r > bracket_l + 1) {
Some( sqlMode.substring(bracket_l + 1, bracket_r) )
} else None

// todo connect需要format
if (connect.isDefined)
println("sql connect: " + connect.get)

val context = ctx.sqlCode()
val sqls = context.sqlStatement()
println("total line: " + sqls.size())
val s = context.getText
println("sql: \n" + s)

// todo table需要format
if (ctx.outTable() != null) {
val tb = ctx.outTable().assetName()
println("sql output: \n" + tb.getText)
}
}


/**
* shell脚本的context
* example:
* %sh
* # test
* ls -las /;
* % > output
*/
override def exitSh(ctx: IdesParser.ShContext): Unit = {
val context = ctx.shellCode()
val shs = context.shellStatement()
println("total line: " + shs.size())
val s = context.getText
println("shell: \n" + s)

// todo table需要format
if (ctx.outTable() != null) {
val tb = ctx.outTable().assetName()
println("shell output: \n" + tb.getText)
}
}

/**
* load语句的context
*/
override def exitLoad(ctx: IdesDslParser.LoadContext): Unit = {
override def exitLoad(ctx: IdesParser.LoadContext): Unit = {
LoadAdaptor(this).enterContext(ctx)
}

/**
* select语句的context
*/
override def exitSelect(ctx: IdesDslParser.SelectContext): Unit = {
override def exitSelect(ctx: IdesParser.SelectContext): Unit = {
SelectAdaptor(this).enterContext(ctx)
}

/**
* save语句的context
*/
override def exitSave(ctx: IdesDslParser.SaveContext): Unit = {
override def exitSave(ctx: IdesParser.SaveContext): Unit = {
SaveAdaptor(this).enterContext(ctx)
}
}
16 changes: 8 additions & 8 deletions core/src/main/java/tech/ides/dsl/utils/DslUtil.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package tech.ides.dsl.utils

import ides.dsl.parser.{IdesDslLexer, IdesDslParser}
import ides.dsl.parser.IdesDslParser._
import ides.dsl.parser.{IdesLexer, IdesParser}
import ides.dsl.parser.IdesParser._
import org.antlr.v4.runtime.{ParserRuleContext, Token}
import org.antlr.v4.runtime.misc.Interval
import org.antlr.v4.runtime.tree.TerminalNode
Expand All @@ -18,7 +18,7 @@ object DslUtil {

def currentText(ctx: QueryContext):String = {
if ( ctx == null ) return null
val input = ctx.start.getTokenSource.asInstanceOf[IdesDslLexer]._input
val input = ctx.start.getTokenSource.asInstanceOf[IdesLexer]._input

val start = ctx.start.getStartIndex
val stop = ctx.stop.getStopIndex
Expand Down Expand Up @@ -50,13 +50,13 @@ object DslUtil {
}

tTpye match {
case IdesDslParser.MUMERIC |
IdesDslParser.IDENTIFIER
case IdesParser.MUMERIC |
IdesParser.IDENTIFIER
=> pt.getText
case
IdesDslParser.STRING_TEXT |
IdesDslParser.BLOCK_STRING_TEXT |
IdesDslParser.QUOTED_TEXT
IdesParser.STRING_TEXT |
IdesParser.BLOCK_STRING_TEXT |
IdesParser.QUOTED_TEXT
=> cleanStr(pt.getText)
case _
=> pt.getText
Expand Down
22 changes: 12 additions & 10 deletions core/src/test/java/tech/ides/core/test/ListenerTest.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package tech.ides.core.test;

import ides.dsl.parser.IdesDslBaseListener;
import ides.dsl.parser.IdesDslLexer;
import ides.dsl.parser.IdesDslParser;
import ides.dsl.parser.*;
import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CodePointCharStream;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.tree.ParseTreeWalker;
import tech.ides.dsl.CaseChangeCharStream;
Expand All @@ -19,22 +15,28 @@
public class ListenerTest {
public static void main(String[] args) throws IOException {
// String expr = "load hive.`a.bc` where a.aa.a=1 and a.b = 's' and a='''ssdsde.sdede''' as table1;";
String expr = "select 1 AS \n" +
String expr = "%py \n" +
"print('>')" +
"exit 1 " +
"\n%\n" +
"> abc\n" +
"select 1 AS \n" +
"\n" +
"\n" +
"\n" +
"\n" +
"\n" +
" Tb1";
" Tb1;";
System.out.println(expr);
CharStream cpcs = new CaseChangeCharStream(expr);
IdesDslLexer idesDslLexer = new IdesDslLexer(cpcs);
IdesLexer idesDslLexer = new IdesLexer(cpcs);

CommonTokenStream tokenStream = new CommonTokenStream(idesDslLexer);

IdesDslParser parser = new IdesDslParser(tokenStream);
IdesParser parser = new IdesParser(tokenStream);
ScriptQueryExecListener listener = new ScriptQueryExecListener(null, "", "test");

IdesDslParser.StatementContext statement = parser.statement();
IdesParser.StatementContext statement = parser.statement();

ParseTreeWalker.DEFAULT.walk(listener, statement);
}
Expand Down
Loading

0 comments on commit c0283db

Please sign in to comment.