xlisp/jim-emacs-kungfu-sparkling-lisp
Clojure Sparkling & statistics, machine learning, kungfu

  • A master who fights a dozen opponents, some holding knives, handles them by classification: defeat the weak opponents first and the strongest master last. First you must "listen" through the bridge (chi sau) for each opponent's weakness, find where his centerline is, and learn how that weakness lets you break his centerline.

Spark context

;; assumes (require '[sparkling.core :as spark] '[sparkling.conf :as conf])
(spark/with-context context
  (-> (conf/spark-conf)
      (conf/master "local[*]")
      (conf/app-name "Consumer"))
  (do ... ))

Spark Streaming context

;; assumes (import '[org.apache.spark.streaming.api.java JavaStreamingContext]
;;                 '[org.apache.spark.streaming Duration])
(let [streaming-context (JavaStreamingContext. context (Duration. 1000))
      ...]
  (do ... ))

Socket Text Stream

(def socket-stream (.socketTextStream streaming-context "localhost" 9999))

Matrix computation in Spark (linalg + breeze)

;; assumes (import '[org.apache.spark.mllib.linalg Vectors Matrices])
(def dense-vec (Vectors/dense (double-array (list 0.1 0.15 0.2 0.3 0.25)))) ; named to avoid shadowing clojure.core/vec

(def mat (Matrices/dense 3 2 (double-array (list 1.0 3.0 5.0 2.0 4.0 6.0))))

(def sm (Matrices/sparse 3 2 (int-array (list 0 1 3)) (int-array (list 0 2 1)) (double-array (list 9 6 8))))
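The three-index form of Matrices/sparse above is CSC (compressed sparse column) storage: colPtrs marks where each column's entries start in values, and rowIndices gives each entry's row. A pure-Python sketch of the decoding (not Spark code; the function name is illustrative) shows which dense 3x2 matrix the arguments `[0 1 3]`, `[0 2 1]`, `[9 6 8]` describe:

```python
def csc_to_dense(num_rows, num_cols, col_ptrs, row_indices, values):
    """Expand CSC (compressed sparse column) storage into a dense row-major matrix."""
    dense = [[0.0] * num_cols for _ in range(num_rows)]
    for col in range(num_cols):
        # entries of this column live in values[col_ptrs[col]:col_ptrs[col + 1]]
        for k in range(col_ptrs[col], col_ptrs[col + 1]):
            dense[row_indices[k]][col] = values[k]
    return dense

# the same arguments as the Matrices/sparse call above
print(csc_to_dense(3, 2, [0, 1, 3], [0, 2, 1], [9.0, 6.0, 8.0]))
# → [[9.0, 0.0], [0.0, 8.0], [0.0, 6.0]]
```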

(def sv (Vectors/sparse 3 (int-array (list 0 2)) (double-array (list 1.0 3.0))))

(def pos (LabeledPoint. 1.0 (Vectors/dense (double-array (list 1.0 0.0 3.0)))))

(def mat-t (.transpose mat))

(def mat-multiply (.multiply mat mat-t))
;; => #object[org.apache.spark.mllib.linalg.DenseMatrix 0x63608adf "5.0   11.0  17.0  \n11.0  25.0  39.0  \n17.0  39.0  61.0  "]

;; Distributed matrices: RowMatrix needs an RDD of Vectors, and IndexedRowMatrix
;; an RDD of IndexedRow (both in org.apache.spark.mllib.linalg.distributed)
(spark/with-context sc
  (-> (conf/spark-conf)
      (conf/master "local[*]")
      (conf/app-name "Consumer"))
  (let [rows [[1.0 2.0] [3.0 4.0] [5.0 6.0]]
        data (Arrays/asList (into-array (map #(Vectors/dense (double-array %)) rows)))
        rdd-dist-data (.parallelize sc data)
        mat (RowMatrix. (.rdd rdd-dist-data))
        indexed (Arrays/asList (into-array (map-indexed #(IndexedRow. %1 (Vectors/dense (double-array %2))) rows)))
        mat2 (IndexedRowMatrix. (.rdd (.parallelize sc indexed)))
        row-mat (.toRowMatrix mat2)]
    (list (.numRows mat) (.numCols mat)) ;;=> (3 2)
    ))

Handling Spark closures is the key to Clojure/Spark interop: function serialization

  • Every Sparkling data-flow operation must run inside with-context, otherwise Spark raises serialization errors
  • Mismatched Spark versions can also cause serialization and closure errors
import clojure.lang.IFn;

public class Function2 extends sparkling.serialization.AbstractSerializableWrappedIFn
        implements org.apache.spark.api.java.function.Function2, org.apache.spark.sql.api.java.UDF2 {
    public Function2(IFn func) {
        super(func);
    }
    public Object call(Object v1, Object v2) throws Exception {
        return f.invoke(v1, v2);
    }
}
(gen-function Function function)
(gen-function Function2 function2)
(gen-function Function3 function3)
(gen-function VoidFunction void-function)
(gen-function FlatMapFunction flat-map-function)
(gen-function FlatMapFunction2 flat-map-function2)
(gen-function PairFlatMapFunction pair-flat-map-function)
(gen-function PairFunction pair-function)
(defn foreach-rdd [dstream f]
  (.foreachRDD dstream (function2 f)))
(foreach-rdd
 stream
 (fn [rdd arg2] ...))

The core of functional operations, SICP's force awakened: map and reduce

(defn map
  [f rdd]
  (-> (.map rdd (function f))
      (u/set-auto-name (u/unmangle-fn f))))
(defn map-to-pair
  [f rdd]
  (-> (.mapToPair rdd (pair-function f))
      (u/set-auto-name (u/unmangle-fn f))))
(defn reduce
  [f rdd]
  (u/set-auto-name (.reduce rdd (function2 f)) (u/unmangle-fn f)))
(defn foreach-partition
  [f rdd]
  (.foreachPartition rdd (void-function (comp f iterator-seq))))
(defn partition-by
  [^Partitioner partitioner ^JavaPairRDD rdd]
  (.partitionBy rdd partitioner))
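These wrappers only adapt plain Clojure functions to Spark's serializable function interfaces; the data flow itself is the classic map / map-to-pair / reduce-by-key pattern. A pure-Python word-count sketch of that pattern (no Spark; the helper names are illustrative):

```python
from functools import reduce
from collections import defaultdict

def map_to_pair(f, rdd):
    """Like Spark's mapToPair: turn each element into a (key, value) pair."""
    return [f(x) for x in rdd]

def reduce_by_key(f, pairs):
    """Like Spark's reduceByKey: group values by key, then fold each group with f."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return {k: reduce(f, vs) for k, vs in groups.items()}

lines = ["to be", "or not to be"]
words = [w for line in lines for w in line.split()]  # flat-map
pairs = map_to_pair(lambda w: (w, 1), words)         # map-to-pair
counts = reduce_by_key(lambda a, b: a + b, pairs)    # reduce-by-key
print(counts)  # → {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```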

Clojure and Scala interop: tuple and untuple

(spark/map-to-pair
 (fn [lp]
   (spark/tuple (.label lp) (.features lp)))
 labeled-stream)

(defn untuple [^Tuple2 t]
  (persistent!
   (conj!
    (conj! (transient []) (._1 t))
    (._2 t))))

Stream hello world: print the whole data stream

(defn -main
  [& args]
  (do
    (.print socket-stream) ;; or any other stream, e.g. a Kafka stream
    (.start streaming-context)
    (.awaitTermination streaming-context)))

Kafka Stream

(let [parameters (HashMap. {"metadata.broker.list" "127.0.0.1:9092"})
      topics (Collections/singleton "abc_messages")
      stream (KafkaUtils/createDirectStream streaming-context String String StringDecoder StringDecoder parameters topics)
      ...]
  (do ... ))

Spark SQL

  • TODO: rewrite the Spark queries over 4 GB of data records
(defn select
  [cols data-frame]
  (.select data-frame
           (into-array cols)))
(defn where
  "Calls .where on a DataFrame with an SQL expression string."
  [expression data-frame]
  (.where data-frame expression))


Linear regression with SGD

public static StreamingLinearRegressionWithSGD linearRegressionModel(double[] args, int num, float size) {
    StreamingLinearRegressionWithSGD model = new StreamingLinearRegressionWithSGD()
        .setStepSize(size)
        .setNumIterations(num)
        .setInitialWeights(Vectors.dense(args));
    return model;
}

public static LabeledPoint labeledPoint(double label, double[] args) {
    return new LabeledPoint(label, Vectors.dense(args));
}

(def model (VectorClojure/linearRegressionModel (double-array (repeat 100 0.0)) 1 0.01))
(def labeled-stream
  (spark/map
   (fn [record]
     (let [split (clojure.string/split record #"\t")
           y (Double/parseDouble (nth split 0))
           features (-> (nth split 1)
                        (clojure.string/split #",")
                        (->> (map #(Double/parseDouble %)))
                        double-array)]
       (VectorClojure/labeledPoint y features)))
   stream))
(do
  (.trainOn model labeled-stream)
  (.print
   (.predictOnValues
    model
    (spark/map-to-pair
     (fn [lp]
       (spark/tuple (.label lp) (.features lp)))
     labeled-stream)))
  (do ... start ...))
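StreamingLinearRegressionWithSGD updates its weights by stochastic gradient descent on each batch, controlled by the same three knobs as setStepSize, setNumIterations and setInitialWeights above. A pure-Python sketch of that update rule (toy data, no Spark; not the library's actual implementation):

```python
def sgd_linear_regression(points, weights, step_size, num_iterations):
    """points: list of (label, features); minimise squared error with SGD."""
    w = list(weights)
    for _ in range(num_iterations):
        for y, x in points:
            pred = sum(wi * xi for wi, xi in zip(w, x))
            err = pred - y
            # gradient of 0.5 * err^2 w.r.t. w is err * x
            w = [wi - step_size * err * xi for wi, xi in zip(w, x)]
    return w

# consistent toy system: y = 2*x0 + 1*x1, starting from zero weights
data = [(5.0, [2.0, 1.0]), (4.0, [1.0, 2.0]), (8.0, [3.0, 2.0])]
w = sgd_linear_regression(data, [0.0, 0.0], step_size=0.05, num_iterations=200)
print([round(wi, 2) for wi in w])
```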

Naive Bayes

public static Vector tftransform(HashingTF tf, String data) {
    Vector tfres = tf.transform(Arrays.asList(data.split(" ")));
    return tfres;
}
(defn tftransform
  [tf x]
  (.transform tf (-> x (clojure.string/split #" ") into-array Arrays/asList)))

(let [spam (spark/text-file context "files/spam.txt")
      ham (spark/text-file context "files/ham.txt")
      tf (HashingTF. 100)
      spam-features (spark/map (fn [x] (tftransform tf x)) spam)
      ham-features (spark/map (fn [x] (tftransform tf x)) ham)
      positive-examples (spark/map (fn [x] (LabeledPoint. 1 x)) spam-features)
      negative-examples (spark/map (fn [x] (LabeledPoint. 0 x)) ham-features)
      training-data (spark/union (.rdd positive-examples) (.rdd negative-examples))
      model (NaiveBayes/train training-data 1.0)
      predict (fn [x] (.predict model (tftransform tf x)))]
  (do ... ))
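HashingTF maps each word to one of a fixed number of buckets (the hashing trick), and NaiveBayes/train fits per-class term counts with smoothing. A self-contained pure-Python sketch of both steps (the bucket hash and the tiny corpus are made up for illustration; this is not Spark's actual hash function):

```python
import math
from collections import defaultdict

NUM_FEATURES = 10

def hashing_tf(text):
    """Hashing trick: bucket = hash(word) mod NUM_FEATURES (like HashingTF)."""
    vec = [0.0] * NUM_FEATURES
    for word in text.split():
        vec[sum(ord(c) for c in word) % NUM_FEATURES] += 1.0
    return vec

def train_naive_bayes(examples, smoothing=1.0):
    """Multinomial naive Bayes: per-class log priors and feature log-likelihoods."""
    class_counts = defaultdict(float)
    feature_counts = defaultdict(lambda: [0.0] * NUM_FEATURES)
    for label, vec in examples:
        class_counts[label] += 1.0
        for i, v in enumerate(vec):
            feature_counts[label][i] += v
    model = {}
    total = sum(class_counts.values())
    for label in class_counts:
        prior = math.log(class_counts[label] / total)
        denom = sum(feature_counts[label]) + smoothing * NUM_FEATURES
        likelihood = [math.log((c + smoothing) / denom) for c in feature_counts[label]]
        model[label] = (prior, likelihood)
    return model

def predict(model, vec):
    def score(label):
        prior, likelihood = model[label]
        return prior + sum(v * l for v, l in zip(vec, likelihood))
    return max(model, key=score)

spam = ["win money now", "free money offer"]
ham = ["meeting at noon", "see you at lunch"]
examples = [(1, hashing_tf(t)) for t in spam] + [(0, hashing_tf(t)) for t in ham]
model = train_naive_bayes(examples)
print(predict(model, hashing_tf("free money")))  # → 1
```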

Collaborative filtering with ALS (alternating least squares): learning a recommendation engine

(defn to-mllib-rdd [rdd]
  (.rdd rdd))
(defn alternating-least-squares [data {:keys [rank num-iter lambda]}]
  (ALS/train (to-mllib-rdd data) rank num-iter lambda 10))
(defn parse-ratings [sc]
  ;; parse-rating (defined elsewhere) turns a "user item rating" line into a key/value tuple
  (->> (spark/text-file sc "resources/data/ml-100k/ua.base")
       (spark/map-to-pair parse-rating)))
(defn training-ratings [ratings]
  (->> ratings
       (spark/filter (fn [tuple]
                       (< (s-de/key tuple) 8)))
       (spark/values)))

(let [options {:rank 10
               :num-iter 10
               :lambda 1.0}
      model (-> (parse-ratings sc)
                (training-ratings)
                (alternating-least-squares options))]
  (into [] (.recommendProducts model 1 3)))

(.predict model 789 123) ;; predict user 789's rating for movie 123 ;;=> 2.401917277364834

(into [] (.recommendProducts model 789 5)) ;;=> recommend 5 movies to user 789
;; Rating(789,814,3.7114404312220763)
;; Rating(789,1500,3.642514446544692) ... 
;; Rating(789,1449,3.484917824309928)
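ALS alternates between solving for the user factors with the item factors fixed, then the reverse; with regularization lambda each sub-problem is ridge regression with a closed form. A rank-1 pure-Python sketch of that alternation (toy ratings and names are illustrative, not MLlib's implementation):

```python
def als_rank1(ratings, num_users, num_items, num_iter=100, lam=0.1):
    """ratings: list of (user, item, value); factorize r[u][i] ~ uf[u] * itf[i]."""
    uf = [1.0] * num_users
    itf = [1.0] * num_items
    for _ in range(num_iter):
        # fix item factors, ridge-solve each user factor in closed form
        for u in range(num_users):
            num = sum(v * itf[i] for uu, i, v in ratings if uu == u)
            den = lam + sum(itf[i] ** 2 for uu, i, v in ratings if uu == u)
            uf[u] = num / den
        # fix user factors, ridge-solve each item factor
        for i in range(num_items):
            num = sum(v * uf[u] for u, ii, v in ratings if ii == i)
            den = lam + sum(uf[u] ** 2 for u, ii, v in ratings if ii == i)
            itf[i] = num / den
    return uf, itf

# toy rank-1 ratings matrix [[4, 2], [2, 1]]
ratings = [(0, 0, 4.0), (0, 1, 2.0), (1, 0, 2.0), (1, 1, 1.0)]
uf, itf = als_rank1(ratings, num_users=2, num_items=2)
print(round(uf[0] * itf[0], 2))  # predicted rating of user 0 for item 0, close to 4.0
```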

Random simulation, from random trees to random forests, and kung fu's feinting hands: use the feint to discover what is real

SVM: raise the opponents into a higher dimension, classify them, then reduce the dimension again

Every offensive and defensive move is a higher-order function

A combination of punches or kicks is function composition

Only a friend you can practice chi sau (sticky hands) with is a true friend

The ability to fight a dozen people, or a million, starts with the people beside you: father, mother, elder sister

Everything is kung fu, everything is a higher-order function, and all of it can be struck outward

Fighting many distributed opponents at once is a distributed data source and data stream; the combined attack-and-defense moves are the stream's pipeline

Weapons like the nunchaku are the helper tools of the data-stream pipeline

Wing Chun's fan sau (in Siu Nim Tau it appears in the second section; Chum Kiu adds the turning stance): the hands work like two shields, attacking while deflecting, receiving what comes, escorting what leaves, and rushing in when the hands are freed. It is not "he strikes, I step back": defense without attack is useless. => Fan sau training in Siu Nim Tau and Chum Kiu: one hand thrusting (biu), one controlling (fook), the two hands issuing force as one unit rather than split apart, though their division of labor differs

Once the opponent leaves my centerline, or is off the centerline and not driving forward, I can strike. Use strikes during chi sau to flag faulty force generation: drive force from a low elbow, relax the shoulders, and return to the centered triangle structure; only then do attack and defense work

When he detaches, strike him at once: lat sau jik chung

"Receive what comes, escort what leaves": like flying a kite, follow his force and then hit him; if he pushes forward, pull and strike, and if he pulls me, I crash forward and strike

The centerline and sunken-elbow principles in everyday life, guarding and using the center: returning to center and deflect-while-striking work just as well on friends, on masters, and on parents. Whoever created Wing Chun was a true grandmaster: the centerline principle

Nearest-neighbor classification (KNN)

Naive Bayes classification

Decision-tree classification

Predicting numeric data: generalized regression methods

Neural networks and support vector machines

K-means clustering
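Of the topics above, nearest-neighbor classification is the easiest to prototype end to end; a pure-Python KNN sketch (the points, labels and k are made up for illustration):

```python
import math
from collections import Counter

def knn_predict(train, query, k=3):
    """train: list of (features, label); classify query by majority vote of the k nearest."""
    dist = lambda a, b: math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))
    nearest = sorted(train, key=lambda ex: dist(ex[0], query))[:k]
    votes = Counter(label for _, label in nearest)
    return votes.most_common(1)[0][0]

train = [([1.0, 1.0], "a"), ([1.2, 0.8], "a"), ([0.9, 1.1], "a"),
         ([5.0, 5.0], "b"), ([5.5, 4.5], "b")]
print(knn_predict(train, [1.1, 1.0]))  # → a
```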
