Skip to content

Latest commit

 

History

History
282 lines (190 loc) · 10.4 KB

spark_repl.markdown

File metadata and controls

282 lines (190 loc) · 10.4 KB

spark repl 图解(v0.1)

author: 玄畅 时金魁 2016.1.12

本文基于spark master代码分析, 当前最新的spark版本为:2.0.0-SNAPSHOT, 最新commit id: 8cfa218, Commits on Jan 12, 2016

overview


repl: Read! Eval! Print! Loop.., 顾名思义就是: 读取输入-求值-打印,无限循环上述过程。

jline2是一个java实现的repl, 有个example,可以感受下是repl怎么回事。

spark repl鲜有人说,大概因为repl是非必需品,在生产和调试spark时几乎用不到repl。在刚接触spark时,跑一下 Spark Examples时, 一般会直接在spark-shell里跑一下样例。

下文就是从spark-shell入口剖析下spark repl的运行路径。

used by


用到repl的应用:

  1. spark shell
  2. hue livy
  3. spark-notebook

适用于spark交互式场景, 操作界面一般是notebook, 在web上写spark代码, 直接在web上运行,输出结果。

为什么notebook比较受欢迎?
配置好集群后,直接通过web界面(notebook)运行spark作业,渲染输出结果,特别适用于作业调试,方便、快速。算法工程师焦点于job作业,而不需要关心底下的spark集群。

full graph


entrance


bin/spark-shell文件作为研究spark repl的入口。

export SPARK_SUBMIT_OPTS
"$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"

bin/spark-shellbin/spark-submit提交main为org.apache.spark.repl.Main的scala object。Main是spark-repl包里的类,所以不需要添加jar,spark自带的。

bin/spark-submit

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

/bin/spark-class负责读取环境配置,所需的jar, 然后执行org.apache.spark.deploy.SparkSubmit.main(), SparkSubmit会:

  1. 生成指定的类加载器
  2. 把jar添加到classpath
  3. 设置系统属性 System.setProperty
  4. --class指定的类, invoke it

经过前面的spark运行环境准备工作,后面进入到org.apache.spark.repl.Main.main(), 这个Main对象就是repl的入口了。

repl


org.apache.spark.repl.Main.main()函数很简单, new一个SparkILoop对象, 调用它的process()函数。

SparkILoop是repl处理输入-求值-打印的主要地方.

一个repl的过程大致有以下4个步骤:

  1. 读取控制台输入
  2. 编译输入的代码, 生成AST
  3. apply, 执行编译后的字节码
  4. 输出结果

SparkILoop对象有两个关键的成员变量:

  1. intp: SparkIMain 解释器
  2. in: InteractiveReader 控制台输入reader

一个读取用户输入, 一个解释执行输入的代码,打印结果。

step 1:

第一个进入的是process()函数, 这个主要是把输入参数转化成SparkCommandLine对象, 如果输入参数不是帮助说明参数, 进入process(command.settings)函数。

  /** process command-line arguments and do as they request */
  def process(args: Array[String]): Boolean = {
    val command = new SparkCommandLine(args.toList, msg => echo(msg))
    def neededHelp(): String =
      (if (command.settings.help.value) command.usageMsg + "\n" else "") +
      (if (command.settings.Xhelp.value) command.xusageMsg + "\n" else "")

    // if they asked for no help and command is valid, we call the real main
    neededHelp() match {
      case ""     => command.ok && process(command.settings)
      case help   => echoNoNL(help) ; true
    }
  }

step 2:

  1. 创建解释器

    process(command.settings)函数,首先会检查master是否是yarn-client,如果是则设置系统变量SPARK_YARN_MODE=true, 标示当前是yarn模式。

    接着会执行函数createInterpreter()创建一个解释器对象, 这个解释器会准备基本的运行环境。后续用户输入的代码,都是通过这个解释器执行。

    查询是否设置了系统变量spark.jars,如果不存在则读取另外一个系统变量ADD_JARS是否存在。由此得到一个逗号,分割的jar列表, 解析每个jar的URL, 添加到classpath中。

    构建一个解释器对象SparkILoopInterpreter, 赋值给SparkILoop的变量intp

    解释器继承自InteractiveReader, 固定的生命周期为:

    val interactive: Boolean
    def init(): Unit
    def reset(): Unit
    def history: History
    def completion: Completion
    def eraseLine(): Unit
    def redrawLine(): Unit
    def currentLine: String
    def readYesOrNo(prompt: String, alt: => Boolean): Boolean
    def readAssumingNo(prompt: String)
    def readAssumingYes(prompt: String) 
    def readLine(prompt: String): String
    

    规范见这里

    2.1 创建解释器:

  2. 创建控制台输入reader

    有两类reader:

    • 用户指定readerBuffer了, new一个SimpleReader对象
    • 根据settings配置选择一个reader对象,new一个SparkJLineReaderSimpleReader对象

    reader对象生成后, 赋值给SparkILoop的变量in, 后续读取输入。

    2.2 选择reader:

  3. 添加bind绑定到执行列表

    3,4,5,6都是把要执行的函数放到pendingThunks:List[() => Unit]中, 这些函数都是需要在解释器初始化后被执行。

    名为绑定,要绑定个啥?
    把一个key-value设置到intp解释器中,在后面的用户输入的表达式中可以直接引用这个key-value。在后续代码片段生成的class中引入intp作为成员变量,这样就可以直接用了。

    先new一个val bindRep = new ReadEvalPrint()对象, ReadEvalPrint表达一个repl的过程。bindRep首先编译object类,再调用set函数, 把SparkIMain对象set进去。

    // todo CodeAssembler, ObjectSourceCode, ResultObjectSour ceCode

    object ${bindRep.evalName} {
      var value: ${boundType} = _
      def set(x: Any) = value = x.asInstanceOf[${boundType}]
    }
    

    上面这个静态类就是要编译的代码, 查询系统属性scala.repl.name.eval, 如果不存在就以$eval作为类名。成员变量boundType就是SparkIMain了。

    bindRep.callEither()调用上面生成的object的set函数,把SparkIMain对象set进去。

    这样,直接调用生成object对象${bindRep.evalName}的value属性就可以用SparkIMain了。

    2.3 绑定

  4. 添加repl自动执行代码到执行列表

    scala.tools.nsc.ReplProps中定义的变量replAutorunCode, 会引用系统变量scala.repl.autoruncode, 如果用户设置了这个属性, 则会读取对应的value, value一般是指向一个代码文件, 如果确实存在这个源代码文件, 则调用编译执行函数。

    上述整个过程添加到待执行列表。

  5. 添加欢迎信息函数到执行列表

    控制台输出欢迎字符串:

          ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
    /___/ .__/\_,_/_/ /_/\_\   version %s
      /_/
    
  6. 添加初始化spark环境函数到执行列表

    初始化spark环境initializeSpark,编译执行以下代码块:

    //	1. command
    @transient val sc = {
      val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
      println("Spark context available as sc " +
        s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
      _sc
    }
    
    //	2. command
    @transient val sqlContext = {
      val _sqlContext = org.apache.spark.repl.Main.interp.createSQLContext()
      println("SQL context available as sqlContext.")
      _sqlContext
    }
    
    //	3. command
    import org.apache.spark.SparkContext._
    //	4. command
    import sqlContext.implicits._
    //	5. command
    import sqlContext.sql
    //	6. command
    import org.apache.spark.sql.functions._
    
    

    导入必需的类, 声明sc和sqlContext变量, 直接暴露这俩变量给用户使用。 notebook也主要以此两个变量作为调试job代码的入口。

    上述整个过程添加到待执行列表。

  7. 解释器初始化

    解释器intp根据用户的设定是否为异步执行同步初始化或异步初始化。目前固定为同步初始化。

    异步初始化的设置位置为: scala.tools.ScalaSettings
    val Yreplsync = BooleanSetting ("-Yrepl-sync", "Do not use asynchronous code for repl startup")

    初始化的过程就是编译执行名为<init>代码:class $repl_$init { },一个空类。

    如果这个空类编译执行报错,那么整个repl就会hang死翘翘了。更像是个探针,测试下scala编译执行环境是否可用。

  8. 执行3,4,5,6添加到执行列表中的函数

    初始化完毕后, 遍历之前添加到pendingThunks列表中的待执行函数,apply执行之。

  9. loop开始干活

    解释器初始化正常,执行系统定义的代码:绑定、自动执行代码、welcome、初始化spark变量,一切正常则开始无尽循环的正事: read-eval-print, readLine => processLine。

    loop退出条件为:

    • 读到的行为null
    • 行命令执行结果ResultkeepRunning==false
      case class Result(val keepRunning: Boolean, val lineToRecord: Option[String])

    读到行代码, 执行代码内容, 执行函数路径为:

    1. 解析 command(line)

    2. 开始调用解释器 interpretStartingWith(code: String)

    3. 调用解释器 intp.interpret(code)

    4. 生成语法树 requestFromLine(line, synthetic)

    5. 加载上下文环境, 执行语法树。loadAndRunReq(req: Request)

    6. 调用用户输入的代码 call()
      m.invoke(evalClass, args.map(_.asInstanceOf[AnyRef]): _*) 调用

      这里的Method是把line source code放到一个函数中, 生成一个class, 然后调用执行。

    2.9 repl

summary


从SparkSubmit起,初始化解释器,执行代码: 绑定、自动执行代码、输出欢迎、初始化spark环境,进入loop状态:read-eval-print。

如果想让repl的过程有更多的自定义交互操作,可以提交SparkSubmit一个自定义的类,做一个中间代理,包装一下SparkILoop。弊端就是要兼容spark各个版本的代码。

后续会分析hue livy和spark notebook怎么与spark交互。

转载请注明原作者

--------EOF---------