Skip to content

Latest commit

 

History

History
404 lines (331 loc) · 11.1 KB

readme.md

File metadata and controls

404 lines (331 loc) · 11.1 KB

说明

基于scala 2.11、spark 2.3.2

spark-streaming

主要是消费kafka数据,直接写入HBase

单条

每个partition一个连接,单条数据写入到HBase: com.example.spark.streaming.KafkaHBaseApp

/**
  * Writes every Kafka record of each micro-batch to the HBase table `test_bo`,
  * one `Put` per record, then asynchronously commits the batch's offsets to Kafka.
  *
  * One HBase connection and one table handle are opened per non-empty partition and
  * are always released, even when a write fails.
  *
  * @param stream direct Kafka stream of String key/value records; the record key is
  *               used as the HBase row id
  */
def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    // Capture the offset ranges of this batch before processing it.
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.foreachPartition { records =>
      // Skip empty partitions so no connection is opened for nothing.
      if (records.hasNext) {
        val conf = createHBaseConf
        val conn = ConnectionFactory.createConnection(conf)
        try {
          // One table handle per partition instead of one per record.
          val table = conn.getTable(TableName.valueOf("test_bo"))
          try {
            // Loop-invariant column coordinates.
            val family = Bytes.toBytes("info")
            val qualifier = Bytes.toBytes("value")
            records.foreach { record =>
              // The Kafka key is the row id.
              val put = new Put(Bytes.toBytes(record.key()))
              put.addColumn(family, qualifier, Bytes.toBytes(record.value()))
              table.put(put)
            }
          } finally {
            table.close()
          }
        } finally {
          // Release the connection even if a put failed.
          conn.close()
        }
      }
    }
    // Commit the offsets asynchronously once the partition writes above have run.
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
}

备注: 性能最低

批量

每个partition一个连接,数据批量写入到HBase: com.example.spark.streaming.KafkaHBaseBulkApp:

/**
  * Converts every Kafka record of each micro-batch into a `Put` and writes each
  * partition to the HBase table `test_bo` with a single batched `put` call, then
  * asynchronously commits the batch's offsets to Kafka.
  *
  * The connection and table handle are opened per non-empty partition and are
  * always released, even when the write fails.
  *
  * @param stream direct Kafka stream of String key/value records; the record key is
  *               used as the HBase row id
  */
def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    // Capture the offset ranges of this batch before processing it.
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // Turn each record into a Put, then write partition by partition.
    rdd.map { record =>
      val put = new Put(Bytes.toBytes(record.key()))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("value"), Bytes.toBytes(record.value()))
      put
    }.foreachPartition { puts =>
      // Skip empty partitions so no connection is opened for nothing.
      if (puts.hasNext) {
        // Explicit converters instead of the implicit JavaConversions.
        import scala.collection.JavaConverters._
        // NOTE(review): `conf` is not defined in this snippet and must come from the
        // enclosing scope - confirm it is usable inside a partition closure.
        val conn = ConnectionFactory.createConnection(conf)
        try {
          val table = conn.getTable(TableName.valueOf("test_bo"))
          try {
            // The whole partition is materialised and sent in one call.
            table.put(puts.toList.asJava)
          } finally {
            table.close()
          }
        } finally {
          // Release the connection even if the batched put failed.
          conn.close()
        }
      }
    }
    // Commit the offsets asynchronously once the partition writes above have run.
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
}

备注: 性能快了一大截,但要防止一批数据量太大,建议是每一批次的间隔小一点

saveAsNewAPIHadoopDataset

每个partition一个连接,数据通过saveAsNewAPIHadoopDataset写入到HBase: com.example.spark.streaming.KafkaHBaseUseSaveAsNewAPIHadoopDatasetApp:

/**
  * Writes each micro-batch to HBase through `saveAsNewAPIHadoopDataset`, then
  * asynchronously commits the batch's offsets to Kafka.
  *
  * @param stream direct Kafka stream of String key/value records; the record key is
  *               used as the HBase row id
  */
def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  // The Hadoop job is only used to carry the output-format configuration.
  val outputJob = Job.getInstance(createHBaseConf)
  outputJob.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
  outputJob.setOutputKeyClass(classOf[ImmutableBytesWritable])
  // NOTE(review): the values emitted below are Put instances, not Result - confirm
  // whether this setting is intentional.
  outputJob.setOutputValueClass(classOf[Result])

  stream.foreachRDD { batch =>
    // Capture the offset ranges of this batch before processing it.
    val ranges = batch.asInstanceOf[HasOffsetRanges].offsetRanges

    // Pair every Put with an empty key, as the output format expects key/value pairs.
    val keyedPuts = batch.map { message =>
      val mutation = new Put(Bytes.toBytes(message.key()))
      mutation.addColumn(
        Bytes.toBytes("info"),
        Bytes.toBytes("value"),
        Bytes.toBytes(message.value())
      )
      (new ImmutableBytesWritable, mutation)
    }
    keyedPuts.saveAsNewAPIHadoopDataset(outputJob.getConfiguration)

    // Commit the offsets asynchronously after the save above.
    stream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
  }
}

备注: 性能快了一大截,略快于批量,实际中还需更精确的测试。

kafka偏移量保存到redis

这里使用jedis,作为redis的客户端。

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

定义一个连接池

package com.example.spark.streaming

import java.util.Objects

import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{Jedis, JedisPool}

/**
  * Holder for a single, lazily created Jedis connection pool.
  *
  * Call [[makePool]] once before [[getResource]]; every borrowed client must be
  * handed back through [[returnResource]].
  *
  * @author 奔波儿灞
  * @since 1.0
  */
object InternalRedisClient extends Serializable {

  @transient
  private var pool: JedisPool = _

  /**
    * Creates the pool on the first call; later calls are no-ops and their arguments
    * are ignored. A shutdown hook destroys the pool when the JVM exits.
    *
    * The null check is not synchronized, so call this from a single thread.
    *
    * @param redisHost     redis host name
    * @param redisPort     redis port
    * @param redisTimeout  timeout passed to the `JedisPool` constructor
    * @param maxTotal      pool `maxTotal` setting
    * @param maxIdle       pool `maxIdle` setting
    * @param minIdle       pool `minIdle` setting
    * @param testOnBorrow  pool `testOnBorrow` setting
    * @param testOnReturn  pool `testOnReturn` setting
    * @param maxWaitMillis pool `maxWaitMillis` setting
    */
  def makePool(redisHost: String = "localhost",
               redisPort: Int = 6379,
               redisTimeout: Int = 3000,
               maxTotal: Int = 16,
               maxIdle: Int = 8,
               minIdle: Int = 2,
               testOnBorrow: Boolean = true,
               testOnReturn: Boolean = false,
               maxWaitMillis: Long = 10000): Unit = {
    if (pool == null) {
      val poolConfig = new GenericObjectPoolConfig()
      poolConfig.setMaxTotal(maxTotal)
      poolConfig.setMaxIdle(maxIdle)
      poolConfig.setMinIdle(minIdle)
      poolConfig.setTestOnBorrow(testOnBorrow)
      poolConfig.setTestOnReturn(testOnReturn)
      poolConfig.setMaxWaitMillis(maxWaitMillis)
      pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)

      sys.addShutdownHook {
        pool.destroy()
      }
    }
  }

  /**
    * Returns the pool.
    *
    * @throws NullPointerException if [[makePool]] has not been called yet
    */
  private def getPool: JedisPool = {
    // Same exception type as before, but now it says what went wrong.
    Objects.requireNonNull(pool, "redis pool is not initialized, call makePool first")
  }

  /**
    * Borrows a client from the pool.
    *
    * @return a Jedis client that must be given back via [[returnResource]]
    */
  def getResource: Jedis = {
    getPool.getResource
  }

  /**
    * Gives a borrowed client back by closing it; `null` is ignored.
    *
    * @param jedis client obtained from [[getResource]], may be null
    */
  def returnResource(jedis: Jedis): Unit = {
    if (jedis != null) {
      jedis.close()
    }
  }
}

kafka stream从redis中获取偏移量,并将offset提交到redis

package com.example.spark.streaming

import com.typesafe.scalalogging.Logger
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

import scala.collection.mutable

/**
  * Spark Streaming application that consumes Kafka and keeps its offsets in Redis.
  *
  * The start offsets are read from Redis when the stream is created; after each
  * micro-batch the record count and the new offsets are written back to Redis
  * inside one MULTI/EXEC block.
  */
object KafkaConsumerOffsetApp {

  private val log = Logger(LoggerFactory.getLogger(KafkaConsumerOffsetApp.getClass))

  /**
    * Micro-batch interval: one batch every 3 seconds.
    */
  private val batchDuration = Seconds(3)

  /**
    * Kafka consumer parameters. Auto commit is switched off; the offsets are
    * stored in Redis by [[consume]].
    */
  private val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "crpprdap25:6667,crpprdap26:6667,crpprdap27:6667",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "use_a_separate_group_id_for_each_stream",
    "auto.offset.reset" -> "none",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  /**
    * Topic name mapped to the number of partitions to consume (0 until n).
    */
  private val topicPartitions = Map[String, Int]("bobo" -> 2)


  def main(args: Array[String]): Unit = {
    // Initialise the Redis pool.
    InternalRedisClient.makePool(redisHost = "172.16.213.79")
    // Create the streaming context.
    val ssc = createSparkContext()
    // Create the Kafka stream.
    val stream = createKafkaStream(ssc)
    // Register the per-batch processing.
    consume(stream)
    // Start and block until termination.
    startAndWait(ssc)
  }

  /**
    * Creates the streaming context with a `local[2]` master and [[batchDuration]].
    *
    * @return StreamingContext
    */
  def createSparkContext(): StreamingContext = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaConsumerOffsetApp")
    val ssc = new StreamingContext(conf, batchDuration)
    ssc
  }

  /**
    * Creates a direct Kafka stream assigned to the partitions returned by
    * [[getOffsets]], starting at the offsets stored for them.
    *
    * @param ssc StreamingContext
    * @return InputDStream
    */
  def createKafkaStream(ssc: StreamingContext): InputDStream[ConsumerRecord[String, String]] = {
    val offsets = getOffsets

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String, String](offsets.keys.toList, kafkaParams, offsets)
    )
    stream
  }

  /**
    * Redis key under which the offset of one topic partition is stored.
    * Shared by [[getOffsets]] and [[consume]] so both always use the same format.
    */
  private def offsetKey(topic: String, partition: Int): String = s"$topic:$partition"

  /**
    * Parses an offset read from Redis; logs the error and exits the JVM with
    * status 1 when the stored value is not a number.
    */
  private def parseSavedOffset(saved: String): Long = {
    try {
      saved.toLong
    } catch {
      case e: NumberFormatException =>
        log.error("get lastSavedOffset error", e)
        sys.exit(1)
    }
  }

  /**
    * Reads the stored offset of every configured topic partition from Redis.
    * A partition without a stored value starts at offset 0.
    *
    * @return Map[TopicPartition, Long]
    */
  def getOffsets: Map[TopicPartition, Long] = {
    val jedis = InternalRedisClient.getResource
    try {
      val offsets = for {
        (topic, partitions) <- topicPartitions.toSeq
        partition <- 0 until partitions
      } yield {
        // `jedis.get` returns null for a missing key; Option maps that to the default 0.
        val lastOffset = Option(jedis.get(offsetKey(topic, partition))).fold(0L)(parseSavedOffset)
        log.info("from redis topic: {}, partition: {}, lastOffset: {}", topic, partition, lastOffset)
        new TopicPartition(topic, partition) -> lastOffset
      }
      offsets.toMap
    } finally {
      // Give the client back even if a Redis call failed.
      InternalRedisClient.returnResource(jedis)
    }
  }

  /**
    * Counts the records of every batch, then in one MULTI/EXEC block increments
    * `totalRecords` by that count and stores the `untilOffset` of each partition.
    *
    * @param stream InputDStream
    */
  def consume(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
    stream.foreachRDD { rdd =>
      // Capture the offset ranges of this batch before processing it.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // The only metric computed in this example is the record count.
      val total = rdd.count()

      val jedis = InternalRedisClient.getResource
      try {
        val pipeline = jedis.pipelined()
        try {
          // Queue the metric and the offsets inside one MULTI block.
          pipeline.multi()

          pipeline.incrBy("totalRecords", total)

          offsetRanges.foreach { offsetRange =>
            log.info("save offsets, topic: {}, partition: {}, offset: {}", offsetRange.topic, offsetRange.partition, offsetRange.untilOffset)
            pipeline.set(offsetKey(offsetRange.topic, offsetRange.partition), offsetRange.untilOffset.toString)
          }

          // Execute the queued commands and flush the pipeline.
          pipeline.exec()
          pipeline.sync()
        } finally {
          pipeline.close()
        }
      } finally {
        // Give the client back even if a Redis call failed.
        InternalRedisClient.returnResource(jedis)
      }
    }
  }

  /**
    * Starts the streaming context and blocks until it terminates.
    *
    * @param ssc StreamingContext
    */
  def startAndWait(ssc: StreamingContext): Unit = {
    ssc.start()
    ssc.awaitTermination()
  }

}

主要是:

  • getOffsets方法从redis中获取到topic对应的partition中的offset
  • consume消费完数据后,将offset提交到redis

参考:实时流计算、Spark Streaming、Kafka、Redis、Exactly-once、实时去重

核心是利用redis的pipeline和multi,来保证spark streaming的计算结果与offset写入redis是原子性的。

spark-sql

连接hive

经过测试,HDP集群中,需要添加 hive.metastore.uris、spark.sql.warehouse.dir、metastore.catalog.default 这三个配置,即可访问hive集群

// Build a Hive-enabled session named "HiveApp".
val spark = {
  // Settings applied, in this order, to reach the Hive of the cluster.
  val hiveSettings = Seq(
    "hive.metastore.uris" -> "thrift://crpprdap25:9083",
    "spark.sql.warehouse.dir" -> "/data/warehouse/tablespace/managed/hive",
    "metastore.catalog.default" -> "hive"
  )
  hiveSettings
    .foldLeft(SparkSession.builder().appName("HiveApp")) {
      case (builder, (key, value)) => builder.config(key, value)
    }
    .enableHiveSupport()
    .getOrCreate()
}

// Run a SQL statement and print the result.
spark.sql("show databases").show()

注意需要添加spark-hive依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>