Skip to content

Latest commit

 

History

History
100 lines (69 loc) · 11.2 KB

README.md

File metadata and controls

100 lines (69 loc) · 11.2 KB

Apache Spark

Apache Spark is a unified computing engine and a set of libraries for parallel data processing on computer clusters. Spark is the most actively developed open source engine for this task, making it a standard tool for any developer or data scientist interested in big data.

spark起源于2007年其创始人Matei ZahariaUC Berkeley读博,期间其对数据中心级别的分布式计算非常感兴趣。当时一些互联网公司开始用几千台机器计算并存储数据,Matei开始和Yahoo以及Facebook的团队合作,来解决工业界中的大数据问题。但大多数技术都仅局限于批处理,还缺少对交互式查询的支持并且不支持机器学习等迭代式计算。

另外,MateiBerkeley继续研究时发现,Berkeley的研究团体同样需要可扩展的数据处理器,特别是机器学习研究。2009Matei着手开发Spark,并在最初就收到了许多好评,Berkeley的许多大数据研究者们都使用Spark进行大数据应用开发和研究。其两篇论文很好的阐述了spark的设计理念:Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster ComputingAn Architecture for Fast and General Data Processing on Large Clusters

(RDDs) - a distributed memory abstraction

RDDs允许应用开发者在大型集群上执行in-memory计算,同时保留MapReduce等数据流模型的容错能力。RDDs的设计受当前数据流系统无法有效处理的两种应用程序的驱动影响:迭代式计算(iterative algorithms)—被广泛用于图计算以及机器学习,以及交互式的数据挖掘工具。为了有效地实现容错,RDDs提供了高度受限的共享内存形式:其为只读的数据集(datasets)、分区的数据集合(partitioned collections of records),并且只能通过其它RDD上确定性转换(mapjoingroup by等)来创建。 RDD可通过血统(lineage)重新构建丢失的分区数据,其有足够的信息表明如何从其它RDD进行转换。

Programming Modelsparkrdd由对象表示,并调用这些对象上的方法进行转换。在定义一个或者多个RDD之后,开发者可在操作(action)中使用它们,其会将数值返回给应用程序或将数据导出到存储系统。RDD仅在action中首次使用时才进行计算(they are lazily evaluated),在构建RDD时允许在流水线上运行多个转换。cachingpartitioning是开发者经常控制RDD的两个操作,计算后的RDD分区数据进行缓存之后再使用时会加快计算速度。RDD通常缓存在内存中,当内存不足时会spill到磁盘上。此外,RDD通常还允许用户执行分区策略(partition strategy),目前支持hashrange两种方式进行分区。例如,应用程序可对两个RDD采用相同的hash分区(相同keyrecord放在同一台机器),用于加快join连接速度。

简而言之,每个RDD都有一组分区,这些分区都是数据集的原子部分。转换依赖关系构成了其与父RDD的血缘关系:基于其父代计算RDD计算的功能及有关其分区方案和数据放置的原数据。spark使用narrow dependencieswide dependencies来表示RDD间的数据依赖,narrow dependencies指子RDD仅依赖于父RDD的固定分区的数据(each partition of the child RDD depends on a constant number of partitions of parent),一般为一些map()filter()union()mapValues()等转换操作;wide dependencies指生成RDD的数据依赖与父RDD的所有分区数据,常为一些聚合类的转换操作groupByKey()groupByKey()reduceByKey(func, [numPartitions])

narrow dependencies 允许在集群单台node结点流水线执行父RDD所有分区的数据,相比之下,wide dependencies则要求所有父RDD分区中数据都必须是可用的,以便使用map-reduce类似的操作执行跨nodeshuffle操作。除此之外,在node计算失败后使用narrow dependencies会更加高效,因为其只需计算部分父RDD缺失数据的parition,并且重新计算的过程可在不同结点上并行进行。而wide dependencies则会要求重新计算整个父RDD中的所有数据,重新进行完整的计算。

DataFrames, Datasets, and Spark SQL

Sprak SQL以及它的DataFramesDataSets接口是Spark性能优化的未来,其自带更高效的storage optionsadvanced optimizer以及对序列化数据的直接操作。和RDDs一样,DataFramesDatasets代表分布式集合,并附带有在RDDs上没有schema信息。这些信息被用来提供一个更有效的存储层Tungsten,并在优化器中执行其它优化。除了schema之外,在DataFramesDataSets上执行时Catalyst可以检查其逻辑语意,而并不仅仅是执行functions内容。DataFramesDataSets中一种特殊的Row对象,其并不提供任何编译期的类型检查(type checking)。强类型的DataSet API特别适合用于更多像RDDs functions一样的操作。

可直接使用RDD[T].toDF(column1, column2, ..)创建DataFrames,在RDD[T]类型明确的情况下,使用spark.s qlContext.createDataFrame(flightRDD)进行创建。在之前性能评测中,执行reduceByKey()操作时,DataFrames执行相同数据性能评测是远远优于RDD性能:

val flight: Flight = Flight("United States", "Romania", 264)
val toDfDataFrame = spark.sparkContext.parallelize(Seq(flight))
      .toDF("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count")
/*root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: decimal(38,0) (nullable = true)*/
toDfDataFrame.printSchema()

创建schema约束指定DataFrames的结构,在StructType中指定字段列表及类型之后创建DataFrame。对DataFrame调用.rdd方法可将DataFrames转换为RDD[Row]结构。可以在spark SQL中执行指定函数avg()filter()max()min()操作:

val flightSchema = StructType(
  StructField("DEST_COUNTRY_NAME", StringType, true) ::
  StructField("ORIGIN_COUNTRY_NAME", StringType, true) ::
  StructField("count", IntegerType, true) :: Nil)
val flightRdd = spark.sparkContext.parallelize(Seq(
  Row("United States", "Romania", 264)
))
val dataFrame = spark.sqlContext.createDataFrame(flightRdd, flightSchema)

Datasetsspark SQL中一个令人激动的扩展—提供了编译期compile time的类型检查。从Spark 2.0开始,DataFrames成为DataSets的一个特殊版本,用于直接操作genericrow对象。像DataFrames一样,Datasets也由Catalyst优化器使用逻辑计划进行优化,缓存的数据可以用spark SQL内核编码方式进行存储。创建DataSet的方式与Dataframe类似,使用createDateset(rdd)rdd.toDS()的方式进行创建:

val flightDataset = spark.sqlContext.createDataset(flightRdd)
flightDataset.printSchema()
val parallelizeDataset = spark.sparkContext.parallelize(Seq(flight)).toDS()

sparkrdds之前的转换多为Wide transformations or Narrow transformationswide数据转换大多需要进行shuffle操作。rdd中的数据以partitions的方式保存,当要调整分区数量时,可使用rdd.coalesce(numPartitions)调整。这种方式比较高效,重新分区使用narrow的方式,可以避免无用的shuffles操作。

Minimizing Object Creation优化,spark是运行在JVM上的,JVM的内存管理、large data structures及垃圾回收会很快成为Spark Job运行时耗费较大的一部分。在进行迭代式计算时,可使用rdd.cache()checkpointshuffile files临时缓存处理后的结果。spark中存储在内存或硬盘上的RDD数据并不是自动unpersist()spark使用LRU策略,在其executors内存快用完的时进行数据清理。

Spark Components and Packages, Structured Streaming and Graphx

spark具有大量组件,这些组件设计为作为一个集成系统一起工作,并且其中许多组件作为spark的一部分都是分布式的。Spark Streaming有两种API,其中一种是基于RDDs的叫做DStreams,另一种是基于Spark SQL/DataFrames的为Structured Streaming。许多关于RDDs转换、DataFrames、DataSet操作的性能考虑也同样适用于streaming上下文,大多数的operations都具有相同的名称,但也有一部分来自批处理api的操作在流api中并没有直接提供支持。

批处理间隔表示分布式流系统中吞吐量和延迟之间的传统权衡,Spark Streaming在处理完每一个批次间隔数据后才会处理第二个批次。因此你应将批次间隔设置的足够高,以便在安排下一个批次开始之前要处理上一个批次。批处理时间的设置依赖与你的应用程序,对此很难提供一般准则。在进行流数据处理时,可使用checkpoint将数据内容写入到磁盘上。

val batchInterval = Second(1)
new StreamingContext(sc, batchInterval)

GraphxApache spark上的一个遗留组件,已经有很长时间没有再更新。Graphx有遇到一些明显的性能问题:在某些情况下,执行迭代式计算时没有进行checkpoint以让spark清理DAG。目前最有希望的替代Graphx的是社区提供的GraphFrames组件,它旨在利用Spark DataFrames提供GraphX功能和扩展功能。这种扩展的功能包括主题查找,基于DataFrame的序列化和高度表达的图形查询。

Pregel: A System for Larg e-Scale Graph Processinggoogle工程师发表Pregal关于分布式计算论文,分布式图框架就是将大型图的各种操作封装成接口,让分布式存储、并行计算等复杂问题对上层透明,从而使工程师将焦点放在图相关的模型设计和使用上,而不用关心底层的实现细节。图计算框架基本上都遵循分布式批同步(Bulk Synchronous Parallel, BSP)计算模式,基于BSP模式,目前比较成熟的图计算框架Pregel框架和GraphLab框架。

Spark Graphx中主要使用Graph.aggregateMessage()Pregel接口进行图相关的计算,依据BSP模型处理流程中比较核心是发sendMsgmergeMsg过程:消息的发送与graph中的广度优先算法类似(graph所有结点同时发),每次都向相邻的点发送消息,对于收到消息的结点需要进行消息合并并更新graph并进行下一轮次的计算,直到最大迭代次数或graph中已没有要发送的消息为止。

/* @example We can use this function to compute the in-degree of each
* vertex
* {{{
* val rawGraph: Graph[_, _] = Graph.textFile("twittergraph")
* val inDeg: RDD[(VertexId, Int)] =
*   rawGraph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
* }}}
*/
def aggregateMessages[A: ClassTag](
    sendMsg: EdgeContext[VD, ED, A] => Unit,
    mergeMsg: (A, A) => A,
    tripletFields: TripletFields = TripletFields.All)
  : VertexRDD[A] = {
    aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
}