Skip to content

XXwhite成长笔记

XXwhite edited this page Jan 8, 2020 · 26 revisions

小根堆实现TopK问题-PriorityQueue:

public static void main(String[] args) {
	int[] arr = { 1, 5, 6, 4, 2, 3, 9, 8, 7 };

	PriorityQueue<Integer> priorityQueue = new PriorityQueue<Integer>(3);
	for (int i = 0; i < arr.length; i++) {
		if (priorityQueue.size() < 3) {
			priorityQueue.offer(arr[i]);
		} else if (priorityQueue.peek() < arr[i]) {
			priorityQueue.poll();
			priorityQueue.offer(arr[i]);
		}
	}
	System.out.println(Arrays.toString(priorityQueue.toArray()));
}

SparkSql TopN案例:

  df.createOrReplaceTempView("temp_date_province_adv_count")
  val sql =
    """
       SELECT
             date,province,advid,count
       FROM
       (
       SELECT
             date,province,advid,count,
             ROW_NUMBER() OVER(PARTITION BY provnice order by count desc) rank
       FROM
             temp_date_province_adv_count
         ) tmp
       WHERE tmp.rank <= 10

    """
  val top10 = spark.sql(sql)

SparkStreaming 窗口函数ReduceByKeyAndWindow的使用:

def realCountAdvClickByWindow(filterDStream: DStream[String]): Unit ={ filterDStream.map( log =>{ val fields = log.split(",") val y_day_hour_min = DateUtils.formatTimeMinute(new Date( fields(0).toLong)) val advid = fields(4).toLong (y_day_hour_min+"_"+advid,1L) }).reduceByKeyAndWindow((a:Long,b:Long) => a+b,Minutes(60),Seconds(20))//每隔20秒统计最近一小时内的数据情况 .foreachRDD( rdd =>{ //获取数据库链接 rdd.foreachPartition( partition =>{ partition.foreach( record =>{ /////持久化到数据库即可 }) }) }) }

SparkStreaming 中updateStateByKey的使用:

/**
  * 实时统计每天各省各城市广告的点击次数
  */
val resultDStream: DStream[(String, Long)] = date_province_city_advid_1.updateStateByKey(updateFunc)

def updateFunc(values:Seq[Long], status: Option[Long]):Option[Long]={
  val currentCount = values.sum
  val lastCount = status.getOrElse(0)
  Some(currentCount+lastCount)
}

GroupByKey和ReduceByKey的区别:

ReduceByKey

GroupByKey

使用Spark-shell 非正常退出可能产生的问题:

再次使用Spark-shell时可能会报错: 19/05/14 10:49:14 WARN component.AbstractLifeCycle: FAILED Spark@4b56b031{HTTP/1.1}{0.0.0.0:4040}: java.net.BindException: Address already in use

19/05/14 10:49:14 WARN component.AbstractLifeCycle: FAILED org.spark_project.jetty.server.Server@4ef4f627: java.net.BindException: Address already in use

显示端口占用,但并不影响使用,杀掉之前端口为4040的进程即可,但是 netstat -qa|grep 4040 后未显示任何内容,lsof -i :4040 才可以找到该进程,kill掉后重试spark-shell,不再报错。

正确姿势:":quit"

Flume发送大文件给kafka的故障解决

1.The message is 1088783 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

设置Kafka的producer参数 buffer.memory 要大于最大文件大小 默认32m 在flume的配置文件中增加 max.request.size = 要大于最大文件大小 在kafka的配置文件中增加 replica.fetch.max.bytes = 要大于最大文件大小 和 message.max.bytes = 要大于最大文件大小 在消费者配置中添加 max.partition.fetch.bytes = 要大于最大文件大小

2.[ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.

java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first

修改flume-ng文件: JAVA_OPTS="-Xmx100m" #设置大一点,默认是20M

使用Schema-Registry 与kafka 的版本不兼容

主要体现在重启服务后初始化失败,只有第一次可以启动成功,还有就是根本起不来,直接报空指针异常,选择版本时要确保kafka 和 Schema-Registry的scala版本一定不要出现一个时2.10一个是2.11的情况。

Flume报错Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity, or increasing thread count

在flume的配置中主要涉及三个参数batchSize,transactionCapacity,capacity。transactionCapacity,capacity的默认值是100,配置时batchSize一定不能大于transactionCapacity。

Jetty使用request.getParts()要进行的配置

private static final MultipartConfigElement MULTI_PART_CONFIG = new MultipartConfigElement("c:/temp"); request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, MULTI_PART_CONFIG);

Selvet文件上传插件报错Missing content for multipart request

在request.getParts()方法之前,一定不要有读取request.getInputStream()的操作,否则就报这个错误,坑死。

使用JsonAvroConverter将Json转Avro过程中报错

Exception in thread "main" tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:58) at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:47) at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToGenericDataRecord(JsonAvroConverter.java:56) at Test2.main(Test2.java:94) Caused by: org.apache.avro.AvroTypeException: Unsupported type: BYTES at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:121) at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.lambda$readRecord$0(JsonGenericRecordReader.java:67) at java.util.LinkedHashMap$LinkedEntrySet.forEach(LinkedHashMap.java:671) at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readRecord(JsonGenericRecordReader.java:64) at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:56) ... 3 more

提示不支持BYTES,schema中确实有bytes,看了源码,也确实不支持bytes,版本是0.2.7,搞了好久最后升级到最新版本就可以了。 tech.allegro.schema.json2avro converter 0.2.9

记录找到一个从Json数据转换成Avro Schema的一个工具,帮了大忙省了很多事,就是生成的schema没有加默认值,scala写的。

  <dependency>
        <groupId>com.sksamuel.avro4s</groupId>
        <artifactId>avro4s-json_2.11</artifactId>
        <version>1.6.2</version>
    </dependency>

package J2AS

import com.sksamuel.avro4s.json.JsonToAvroConverter
import org.apache.avro.Schema

object Json2AvroSchema {
  def main(args: Array[String]): Unit = {
  var l = new JsonToAvroConverter("", true);
  var str = "{\"employees\":[{\"firstName\":\"Bill\",\"lastName\":\"Gates\"{\"firstName\":\"George\",\"lastName\":\"Bush\"},{\"firstName\":\"Thomas\",\"lastName\":\"Carter\"}]}"
  val schema: Schema = l.convert("mainbody", str)
  println("Avro Schema:")
  println(schema.toString())
}

}

yum -y install nginx 报错

[root@node2 ~]# yum install nginx Loaded plugins: fastestmirror, langpacks Repodata is over 2 weeks old. Install yum-cron? Or run: yum makecache fast base | 3.6 kB 00:00:00
extras | 2.9 kB 00:00:00
updates | 2.9 kB 00:00:00
(1/2): extras/7/x86_64/primary_db | 153 kB 00:00:00
(2/2): updates/7/x86_64/primary_db | 2.8 MB 00:00:00
Determining fastest mirrors

  • base: mirrors.tuna.tsinghua.edu.cn
  • extras: mirrors.tuna.tsinghua.edu.cn
  • updates: mirrors.tuna.tsinghua.edu.cn No package nginx available. Error: Nothing to do

解决:rpm -ivh http://nginx.org/packages/centos/7/noarch/RPMS/nginx-release-centos-7-0.el7.ngx.noarch.rpm

记录flume的一次性能调优

最近对新项目进行压力测试,下游的Agent是由kafkasource和httpsink组成,给的内存有1G左右,capacity参数为10000,每条数据大小9MB, flume瞬间OOM,总结后发现,根据1G内存,每条数据9MB,capacity最大为111(注意:是当前agent的所有channel的总和),所以kafkasource一股脑消费类大量数据到channel中,还没等sink channel中的数据,flume就挂掉了,然后改小了capacity,改为100,虽然没有OOM的错误了,但是还会KafkaSource EXCEPTION, {}。,这应该是由于channel中的event数量满了,不能再放入了,就中断了线程,这种情况要么就增加capacity的数量,要么就加快sink的速度。考量后两者都无法再改变了。于是调节了这个参数 keep-alive ,这个参数的作用是在channel满的时候,在停止进程之前等待的时间,如果等待时间内channel中的event被消费掉了一部分就可以继续放入event了,这样就能避免启动瞬间数据量很大,导致异常的情况发生。