XXwhite成长笔记 (XXwhite's growth notes)
import java.util.Arrays;
import java.util.PriorityQueue;

public class Top3 {
    public static void main(String[] args) {
        int[] arr = { 1, 5, 6, 4, 2, 3, 9, 8, 7 };
        // Min-heap of capacity 3: it keeps the 3 largest values seen so far.
        PriorityQueue<Integer> priorityQueue = new PriorityQueue<Integer>(3);
        for (int i = 0; i < arr.length; i++) {
            if (priorityQueue.size() < 3) {
                priorityQueue.offer(arr[i]);
            } else if (priorityQueue.peek() < arr[i]) {
                // Current value beats the smallest of the top 3: replace it.
                priorityQueue.poll();
                priorityQueue.offer(arr[i]);
            }
        }
        // Prints the three largest values in heap (not sorted) order.
        System.out.println(Arrays.toString(priorityQueue.toArray()));
    }
}
df.createOrReplaceTempView("temp_date_province_adv_count")
val sql =
"""
SELECT
date,province,advid,count
FROM
(
SELECT
date,province,advid,count,
ROW_NUMBER() OVER(PARTITION BY province ORDER BY count DESC) rank
FROM
temp_date_province_adv_count
) tmp
WHERE tmp.rank <= 10
"""
val top10 = spark.sql(sql)
def realCountAdvClickByWindow(filterDStream: DStream[String]): Unit = {
  filterDStream.map(log => {
    val fields = log.split(",")
    val y_day_hour_min = DateUtils.formatTimeMinute(new Date(fields(0).toLong))
    val advid = fields(4).toLong
    (y_day_hour_min + "_" + advid, 1L)
  }).reduceByKeyAndWindow((a: Long, b: Long) => a + b, Minutes(60), Seconds(20)) // every 20 seconds, aggregate the data of the last hour
    .foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        // obtain a database connection here, one per partition
        partition.foreach(record => {
          // persist the record to the database
        })
      })
    })
}
/**
* Real-time count of ad clicks per day, per province and per city
*/
val resultDStream: DStream[(String, Long)] = date_province_city_advid_1.updateStateByKey(updateFunc)

def updateFunc(values: Seq[Long], status: Option[Long]): Option[Long] = {
  // Add this batch's clicks to the running total carried in the state.
  val currentCount = values.sum
  val lastCount = status.getOrElse(0L)
  Some(currentCount + lastCount)
}
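For context, here is a minimal sketch of how the keyed DStream used above (date_province_city_advid_1) could be built from the raw click logs. The field layout (timestamp,province,city,userid,advid) and the date formatting are assumptions based on the surrounding snippets, not part of the original code; also remember that updateStateByKey only works if a checkpoint directory has been set on the StreamingContext.

import java.util.Date
import java.text.SimpleDateFormat
import org.apache.spark.streaming.dstream.DStream

// Assumed log layout: timestamp,province,city,userid,advid (field positions are an assumption)
def toDateProvinceCityAdvidOnes(filterDStream: DStream[String]): DStream[(String, Long)] = {
  filterDStream.map { log =>
    val fields = log.split(",")
    val day = new SimpleDateFormat("yyyy-MM-dd").format(new Date(fields(0).toLong))
    (day + "_" + fields(1) + "_" + fields(2) + "_" + fields(4), 1L)
  }
}

// ssc.checkpoint("hdfs://namenode:8020/checkpoint")  // required by updateStateByKey; path is a placeholder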
Running spark-shell again may produce errors like:
19/05/14 10:49:14 WARN component.AbstractLifeCycle: FAILED Spark@4b56b031{HTTP/1.1}{0.0.0.0:4040}: java.net.BindException: Address already in use
19/05/14 10:49:14 WARN component.AbstractLifeCycle: FAILED org.spark_project.jetty.server.Server@4ef4f627: java.net.BindException: Address already in use
This means port 4040 is already occupied; it does not actually break spark-shell, but the fix is to kill the earlier process still holding port 4040. In my case netstat -qa|grep 4040 showed nothing, while lsof -i :4040 did find the process; after killing it and rerunning spark-shell the error was gone.
The proper way to exit spark-shell in the first place is ":quit".
1. The message is 1088783 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
Fix: set the Kafka producer parameter buffer.memory larger than the largest message (the default is 32 MB); in the Flume configuration add max.request.size larger than the largest message; in the Kafka broker configuration add replica.fetch.max.bytes and message.max.bytes, both larger than the largest message; and in the consumer configuration add max.partition.fetch.bytes, also larger than the largest message.
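A producer-side sketch in Scala showing where max.request.size and buffer.memory are set; the broker address, topic name and the 10 MB limit are placeholders, not values from the original notes. The broker-side settings (message.max.bytes, replica.fetch.max.bytes) go in server.properties and max.partition.fetch.bytes in the consumer configuration.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object LargeMessageProducer {
  def main(args: Array[String]): Unit = {
    val maxMessageBytes = 10 * 1024 * 1024 // assumed upper bound on one serialized message
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092") // placeholder broker
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    // Both limits must be larger than the largest message being sent.
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxMessageBytes.toString)
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, (64 * 1024 * 1024).toString)
    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord[String, String]("test_topic", "key", "large payload ...")) // placeholder topic
    producer.close()
  }
}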
2.[ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:158)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: close() called when transaction is OPEN - you must either commit or rollback first
Modify the flume-ng startup script: JAVA_OPTS="-Xmx100m" # set this higher; the default is only 20 MB
The symptom is that initialization fails after a service restart: only the very first start succeeds, or the service does not come up at all and throws a NullPointerException right away. When picking versions, make sure Kafka and Schema Registry are not built against different Scala versions (e.g. one on 2.10 and the other on 2.11).
Flume error: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity, or increasing thread count
Three Flume parameters are involved here: batchSize, transactionCapacity and capacity. transactionCapacity and capacity both default to 100, and batchSize must never be configured larger than transactionCapacity.
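As an illustration, this is roughly how the three parameters could appear in an agent configuration (agent/channel/source names a1, c1, r1 are placeholders; the exact batch-size property name depends on the source or sink type):

a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
# the source/sink batch size must not exceed transactionCapacity
a1.sources.r1.batchSize = 1000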
private static final MultipartConfigElement MULTI_PART_CONFIG = new MultipartConfigElement("c:/temp");
request.setAttribute(Request.__MULTIPART_CONFIG_ELEMENT, MULTI_PART_CONFIG);
Make sure nothing reads request.getInputStream() before request.getParts() is called; otherwise this error is thrown. A nasty pitfall.
Exception in thread "main" tech.allegro.schema.json2avro.converter.AvroConversionException: Failed to convert JSON to Avro
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:58)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:47)
    at tech.allegro.schema.json2avro.converter.JsonAvroConverter.convertToGenericDataRecord(JsonAvroConverter.java:56)
    at Test2.main(Test2.java:94)
Caused by: org.apache.avro.AvroTypeException: Unsupported type: BYTES
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:121)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.lambda$readRecord$0(JsonGenericRecordReader.java:67)
    at java.util.LinkedHashMap$LinkedEntrySet.forEach(LinkedHashMap.java:671)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.readRecord(JsonGenericRecordReader.java:64)
    at tech.allegro.schema.json2avro.converter.JsonGenericRecordReader.read(JsonGenericRecordReader.java:56)
    ... 3 more
The message says the BYTES type is not supported. The schema does contain a bytes field, and the source of version 0.2.7 confirms bytes really is unsupported there. After a lot of digging, upgrading to the latest version, tech.allegro.schema.json2avro:converter:0.2.9, fixed it.
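For reference, the upgraded dependency as Maven coordinates (version taken from the note above):

<dependency>
<groupId>tech.allegro.schema.json2avro</groupId>
<artifactId>converter</artifactId>
<version>0.2.9</version>
</dependency>

The avro4s-json dependency below is a separate library, used in the following snippet to derive an Avro schema from a sample JSON document.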
<dependency>
<groupId>com.sksamuel.avro4s</groupId>
<artifactId>avro4s-json_2.11</artifactId>
<version>1.6.2</version>
</dependency>
package J2AS
import com.sksamuel.avro4s.json.JsonToAvroConverter
import org.apache.avro.Schema
object Json2AvroSchema {
  def main(args: Array[String]): Unit = {
    val converter = new JsonToAvroConverter("", true)
    val str = "{\"employees\":[{\"firstName\":\"Bill\",\"lastName\":\"Gates\"},{\"firstName\":\"George\",\"lastName\":\"Bush\"},{\"firstName\":\"Thomas\",\"lastName\":\"Carter\"}]}"
    val schema: Schema = converter.convert("mainbody", str)
    println("Avro Schema:")
    println(schema.toString)
  }
}
[root@node2 ~]# yum install nginx
Loaded plugins: fastestmirror, langpacks
Repodata is over 2 weeks old. Install yum-cron? Or run: yum makecache fast
base | 3.6 kB 00:00:00
extras | 2.9 kB 00:00:00
updates | 2.9 kB 00:00:00
(1/2): extras/7/x86_64/primary_db | 153 kB 00:00:00
(2/2): updates/7/x86_64/primary_db | 2.8 MB 00:00:00
Determining fastest mirrors
- base: mirrors.tuna.tsinghua.edu.cn
- extras: mirrors.tuna.tsinghua.edu.cn
- updates: mirrors.tuna.tsinghua.edu.cn
No package nginx available. Error: Nothing to do
Fix: install the official nginx yum repository first, then run yum install nginx again:
rpm -ivh http://nginx.org/packages/centos/7/noarch/RPMS/nginx-release-centos-7-0.el7.ngx.noarch.rpm
Recently I load-tested a new project. The downstream agent consists of a Kafka source and an HTTP sink, with roughly 1 GB of heap, capacity set to 10000, and each event around 9 MB; Flume went OOM almost instantly. In hindsight, with 1 GB of heap and 9 MB events the capacity can be at most about 111 (note: that is the total across all channels of the agent). The Kafka source eagerly consumed a large amount of data into the channel, and Flume died before the sink could drain it. After reducing capacity to 100 the OOM was gone, but KafkaSource then kept throwing "EXCEPTION, {}", presumably because the channel was full and could accept no more events, so the thread was interrupted. In that situation you either raise capacity or speed up the sink; after weighing it, neither could be changed here. So I tuned the keep-alive parameter instead: when the channel is full, it is the time to wait before giving up, and if some of the events in the channel are consumed during that wait, the put can still go through. This avoids the failure caused by a large burst of data arriving right at startup.
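A sketch of the memory-channel settings discussed above (agent/channel names are placeholders; for the memory channel, keep-alive is the number of seconds a put waits for free space before failing):

a1.channels.c1.type = memory
# with ~1 GB of heap and ~9 MB events, keep the total capacity across all channels around 100
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 100
# wait up to 30 s for the sink to drain events before a put gives up
a1.channels.c1.keep-alive = 30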