- Fighting a dozen opponents at once, some even holding knives, a master handles them by classification: defeat the weaker opponents first, and the strongest master last. First you must listen through the bridge (chi sau) for each opponent's weakness: where his centerline is, and which weakness lets you break through his centerline
- Clojure Sparkling & statistics, machine learning, kung fu
- Spark context
- Spark Streaming context
- Socket Text Stream
- Handling Spark closures is the crux; the key to Clojure-Spark interop is function serialization
- The core of functional operations, the Force of SICP unleashed: map and reduce
- Clojure-Scala interop: tuple and untuple
- Stream hello world: print the entire data stream
- Kafka Stream
- Spark SQL
- Foreach RDD
- Linear regression with SGD
- Naive Bayes
- ALS (alternating least squares) collaborative filtering: learning a recommendation engine
- Random simulation: from random trees to random forests, and the kung fu feint: use the feint to discover what is real
- SVM: lift the enemy into a higher dimension to classify him, then reduce the dimension again
- Every attack-and-defense move is a higher-order function
- A combination of punches or kicks is function composition
- Only a friend who will chi sau (sticky hands) with you is a true friend
- The ability to fight a dozen, then a million: start sparring with the people closest to you, dad, mom, big sister
- Everything is kung fu, everything is a higher-order function, and all of it can be thrown as a strike
- One against many distributed opponents: the opponents are distributed data sources and streams, and combined attack-and-defense moves are the stream's pipeline
- Kung fu weapons such as the nunchaku are the helper tools of the data-stream pipeline
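The combo-as-composition idea above can be sketched in plain Clojure. The move names and the opponent map are invented for illustration:

```clojure
;; Each move is an ordinary function from opponent-state to opponent-state;
;; a combination is just comp, applied right-to-left.
(defn jab      [opponent] (update opponent :balance - 1))
(defn low-kick [opponent] (update opponent :balance - 2))

(def combo (comp low-kick jab)) ;; jab lands first, then the low kick

(combo {:balance 10}) ;; => {:balance 7}
```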
- Wing Chun fan sau (Siu Nim Tau's fan sau is in the second section; Chum Kiu's adds the turning stance): the hands work like two shields, striking while deflecting. Lin siu dai da (deflect and hit at once), loi lau hoi sung (receive what comes, escort what goes), lat sau jik chung (when the hand frees, thrust straight forward)... not "he strikes, I step back". Defense without attack is useless => fan sau training: practice the fan sau of Siu Nim Tau and Chum Kiu with one hand in biu and one in fuk, the two hands issuing power as one unit, not split apart, though with different roles
- The moment he leaves my centerline, or advances off-center, I can strike. Use the hits received in chi sau to flag training errors: power must come from a low elbow, relaxed shoulders, and the centered triangle structure, or attack and defense will not work
- When his hand leaves mine, hit him: that is lat sau jik chung
- Loi lau hoi sung is like flying a kite: follow his force and then strike. If he pushes forward and struggles against me, pull and hit; if he pulls me, I surge in and hit
- The centerline and sunken-elbow principles apply in daily life too. Guard the center, use the center: centering plus deflect-and-hit works just as well on friends and on masters, and on parents. Whoever created Wing Chun was a grandmaster for the ages: the centerline principle
- K-nearest neighbors (KNN) classification
- Naive Bayes classification
- Decision tree classification
- Predicting numeric data: generalized regression methods
- Neural networks and support vector machines
- K-means clustering
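Before reaching for MLlib, the nearest-neighbor idea from the list above fits in a few lines of plain Clojure; the toy training data here is invented:

```clojure
;; Classify a point by majority vote among its k nearest labelled neighbours.
(defn distance [a b]
  (Math/sqrt (reduce + (map #(let [d (- %1 %2)] (* d d)) a b))))

(defn knn-classify [k labelled point]
  (->> labelled
       (sort-by (fn [[p _]] (distance p point)))
       (take k)
       (map second)
       frequencies
       (apply max-key val)
       key))

(def training
  [[[1.0 1.0] :ham] [[1.2 0.8] :ham]
   [[8.0 9.0] :spam] [[9.0 8.5] :spam]])

(knn-classify 3 training [1.1 0.9]) ;; => :ham
```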
(spark/with-context context
(-> (conf/spark-conf)
(conf/master "local[*]")
(conf/app-name "Consumer"))
(do ... ))
(let [streaming-context (JavaStreamingContext. context (Duration. 1000))
... ]
(do ... ))
(def socket-stream (.socketTextStream streaming-context "localhost" 9999))
(def dv (Vectors/dense (double-array (list 0.1 0.15 0.2 0.3 0.25)))) ;; renamed from vec to avoid shadowing clojure.core/vec
(def mat (Matrices/dense 3 2 (double-array (list 1.0 3.0 5.0 2.0 4.0 6.0))))
(def sm (Matrices/sparse 3 2 (int-array (list 0 1 3)) (int-array (list 0 2 1)) (double-array (list 9 6 8))))
(def sv (Vectors/sparse 3 (int-array (list 0 2)) (double-array (list 1.0 3.0))))
(def pos (LabeledPoint. 1.0 (Vectors/dense (double-array (list 1.0 0.0 3.0)))))
(def mat-t (.transpose mat))
(def mat-multiply (.multiply mat mat-t))
;; => #object[org.apache.spark.mllib.linalg.DenseMatrix 0x63608adf "5.0 11.0 17.0 \n11.0 25.0 39.0 \n17.0 39.0 61.0 "]
;; Distributed matrix
(spark/with-context sc
(-> (conf/spark-conf)
(conf/master "local[*]")
(conf/app-name "Consumer"))
(let [data (Arrays/asList (into-array (list 1 2 3 4 5 6)))
rdd-dist-data (.parallelize sc data)
mat (RowMatrix. (.rdd rdd-dist-data))
mat2 (IndexedRowMatrix. (.rdd rdd-dist-data))
row-mat (.toRowMatrix mat2)]
;;(list (.numRows mat) (.numCols mat))
(.numRows mat) ;;=> 6
))
- Every Sparkling data-stream operation must run inside with-context, otherwise Spark throws serialization errors
- A mismatched Spark version can also cause serialization and closure errors
import clojure.lang.IFn;
public class Function2 extends sparkling.serialization.AbstractSerializableWrappedIFn implements org.apache.spark.api.java.function.Function2, org.apache.spark.sql.api.java.UDF2 {
public Function2(IFn func) {
super(func);
}
public Object call(Object v1, Object v2) throws Exception {
return f.invoke(v1, v2);
}
}
(gen-function Function function)
(gen-function Function2 function2)
(gen-function Function3 function3)
(gen-function VoidFunction void-function)
(gen-function FlatMapFunction flat-map-function)
(gen-function FlatMapFunction2 flat-map-function2)
(gen-function PairFlatMapFunction pair-flat-map-function)
(gen-function PairFunction pair-function)
(defn foreach-rdd [dstream f]
(.foreachRDD dstream (function2 f)))
(foreach-rdd
stream
(fn [rdd arg2] ...))
(defn map
[f rdd]
(-> (.map rdd (function f))
(u/set-auto-name (u/unmangle-fn f))))
(defn map-to-pair
[f rdd]
(-> (.mapToPair rdd (pair-function f))
(u/set-auto-name (u/unmangle-fn f))))
(defn reduce
[f rdd]
(u/set-auto-name (.reduce rdd (function2 f)) (u/unmangle-fn f)))
(defn foreach-partition
[f rdd]
(.foreachPartition rdd (void-function (comp f iterator-seq))))
(defn partition-by
[^Partitioner partitioner ^JavaPairRDD rdd]
(.partitionBy rdd partitioner))
(spark/map-to-pair
(fn [lp]
(spark/tuple (.label lp) (.features lp)))
labeled-stream)
(defn untuple [^Tuple2 t]
(persistent!
(conj!
(conj! (transient []) (._1 t))
(._2 t))))
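A quick REPL check of untuple; scala.Tuple2 is on the classpath whenever Spark is:

```clojure
(import 'scala.Tuple2)

(untuple (Tuple2. "user" 42)) ;; => ["user" 42]
```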
(defn -main
[& args]
(do
(.print socket-stream) ;; or another stream, e.g. Kafka
(.start streaming-context)
(.awaitTermination streaming-context)))
(let [parameters (HashMap. {"metadata.broker.list" "127.0.0.1:9092"})
topics (Collections/singleton "abc_messages")
stream (KafkaUtils/createDirectStream streaming-context String String StringDecoder StringDecoder parameters topics)
... ]
(do ... ))
- TODO: rewrite the Spark query over the 4 GB of data records
(defn select
[cols data-frame]
(.select data-frame
(into-array cols)))
(defn where
"Calls .where on data-frame with an SQL expression string"
[expression data-frame]
(.where data-frame expression))
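A hypothetical use of the two wrappers above; both take the DataFrame last, so they compose with ->> (the df binding and column names are invented):

```clojure
;; Filter rows, project two columns, and print the result.
(->> df
     (where "age > 30")
     (select ["name" "age"])
     .show)
```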
public static StreamingLinearRegressionWithSGD linearRegressionModel(double[] args, int num, float size) {
    StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD()
        .setStepSize(size)
        .setNumIterations(num)
        .setInitialWeights(Vectors.dense(args));
    return model;
}
public static LabeledPoint labeledPoint(double label, double[] args) {
    return new LabeledPoint(label, Vectors.dense(args));
}
(def model (VectorClojure/linearRegressionModel (double-array (repeat 100 0.0)) 1 0.01))
(def labeled-stream
  (spark/map
    (fn [record]
      (let [[label-str features-str] (clojure.string/split record #"\t")
            y (Double/parseDouble label-str)
            features (->> (clojure.string/split features-str #",")
                          (map #(Double/parseDouble %))
                          double-array)]
        (VectorClojure/labeledPoint y features)))
    stream))
(do
(.trainOn model labeled-stream)
(.print
(.predictOnValues
model
(spark/map-to-pair
(fn [lp]
(spark/tuple (.label lp) (.features lp)))
labeled-stream)))
(do ... start ...))
public static Vector tftransform(HashingTF tf, String data) {
Vector tfres = tf.transform(Arrays.asList(data.split(" ")));
return tfres;
}
(defn tftransform
[tf x]
(.transform tf (-> x (clojure.string/split #" ") into-array Arrays/asList)))
(let [spam (spark/text-file context "files/spam.txt")
ham (spark/text-file context "files/ham.txt")
tf (HashingTF. 100)
spam-features (spark/map (fn [x] (tftransform tf x)) spam)
ham-features (spark/map (fn [x] (tftransform tf x)) ham)
positive-examples (spark/map (fn [x] (LabeledPoint. 1 x)) spam-features)
negative-examples (spark/map (fn [x] (LabeledPoint. 0 x)) ham-features)
training-data (spark/union (.rdd positive-examples) (.rdd negative-examples))
model (NaiveBayes/train training-data 1.0)
predict (fn [x] (.predict model (tftransform tf x)))]
(do ... ))
(defn to-mllib-rdd [rdd]
(.rdd rdd))
(defn alternating-least-squares [data {:keys [rank num-iter lambda]}]
(ALS/train (to-mllib-rdd data) rank num-iter lambda 10))
(defn parse-ratings [sc]
(->> (spark/text-file sc "resources/data/ml-100k/ua.base")
(spark/map-to-pair parse-rating)))
(defn training-ratings [ratings]
(->> ratings
(spark/filter (fn [tuple]
(< (s-de/key tuple) 8)))
(spark/values)))
(let [options {:rank 10
:num-iter 10
:lambda 1.0}
model (-> (parse-ratings sc)
(training-ratings)
(alternating-least-squares options))]
(into [] (.recommendProducts model 1 3)))
(.predict model 789 123) ;; predict user 789's rating for movie 123 ;;=> 2.401917277364834
(into [] (.recommendProducts model 789 5)) ;;=> recommend 5 movies for user 789
;; Rating(789,814,3.7114404312220763)
;; Rating(789,1500,3.642514446544692) ...
;; Rating(789,1449,3.484917824309928)