import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PairRDDFunctions
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/** A raw posting, either a question or an answer */
case class Posting(postingType: Int, id: Int, parentId: Option[Int], score: Int, tags: Option[String]) extends Serializable
/** The main class */
object Assignment2 extends Assignment2 {

  @transient lazy val conf: SparkConf = new SparkConf().setMaster("local").setAppName("Assignment2")
  @transient lazy val sc: SparkContext = new SparkContext(conf)
  // sc.setLogLevel("WARN")

  /** Main function */
  def main(args: Array[String]): Unit = {
    val lines = sc.textFile(args(0))
    val raw = rawPostings(lines)
    val Groupedpairs = groupedPosting(raw)
    val Vec: RDD[(Int, (String, Int))] = scoredPosting(Groupedpairs)
    val TupleAll = vectorPosting(Vec)
    // Each result row: (cluster centroid, size of cluster, median score, average score)
    printResults(Kmeans(TupleAll))
  }
}
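
// A hypothetical invocation, assuming the compiled jar is named assignment2.jar and the
// postings CSV lives at data/posts.csv (both names are illustrative, not part of the code):
//   spark-submit --class Assignment2 assignment2.jar data/posts.csv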
/** The parsing and k-means methods */
class Assignment2 extends Serializable {

  /** Languages */
  val Domains =
    List(
      "Machine-Learning", "Compute-Science", "Algorithm", "Big-Data", "Data-Analysis", "Security", "Silicon Valley", "Computer-Systems",
      "Deep-learning", "Internet-Service-Providers", "Programming-Language", "Cloud-services", "Software-Engineering", "Embedded-System", "Architecture")

  /** K-means parameter: how "far apart" the languages are placed for the k-means algorithm */
  def DomainSpread = 50000
  assert(DomainSpread > 0)

  /** K-means parameter: number of clusters */
  def kmeansKernels = 45

  /** K-means parameter: convergence criterion; stop once the total movement of all centroids is below kmeansEta */
  def kmeansEta: Double = 20.0D

  /** K-means parameter: maximum number of iterations */
  def kmeansMaxIterations = 4
  /** Load postings from the given file */
  // Split each raw line on commas and emit it in Posting format:
  // (postingType (question or answer), id, parentId, score, tags)
  def rawPostings(lines: RDD[String]): RDD[Posting] =
    lines.map(line => {
      val arr = line.split(",")
      Posting(postingType = arr(0).toInt,
        id = arr(1).toInt,
        parentId = if (arr(2) == "") None else Some(arr(2).toInt),
        score = arr(3).toInt,
        tags = if (arr.length >= 5) Some(arr(4).intern()) else None)
    })
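
  // Hypothetical sample lines illustrating the assumed CSV layout (the ids are invented):
  //   "1,101,,5,Big-Data"  -> Posting(1, 101, None, 5, Some("Big-Data"))   (a question)
  //   "2,202,101,7"        -> Posting(2, 202, Some(101), 7, None)          (an answer to question 101)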
  /** Group the questions and answers together */
  def groupedPosting(lines: RDD[Posting]) = {
    // Filter the questions and answers separately, then prepare them for a join
    // by putting the question id in the first element of a tuple.
    val QRaw: RDD[Posting] = lines.filter(itr => {
      itr.postingType == 1
    })
    val ARaw = lines.filter(itr => {
      itr.postingType == 2
    })
    // For questions, keep the id and the tags
    val QTuple = QRaw.map(item => {
      Tuple2(item.id, item.tags.getOrElse(""))
    })
    // For answers, keep the parentId and the answer's score
    val ATuple = ARaw.map(itr => {
      Tuple2(itr.parentId.getOrElse(0), itr.score)
    })
    // Join the pairs where QTuple's id equals ATuple's parentId.
    // Output GroupedPairs: RDD[(Int, (String, Int))]
    //                    = RDD[(question id, (question tags, score of one answer to that question))]
    val GroupedPairs: RDD[(Int, (String, Int))] = QTuple.join(ATuple)
    GroupedPairs
  }
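
  // A hypothetical trace of the join above: given the question (101, "Big-Data") and the
  // answers (101, 7) and (101, 3), QTuple.join(ATuple) yields (101, ("Big-Data", 7)) and
  // (101, ("Big-Data", 3)). Being an inner join, it drops questions with no answers.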
  /** Compute the maximum score for each posting */
  // GroupedPairs: RDD[(question id, (question tags, answer score))] contains one entry per
  // answer, e.g. [(Question_a, (tags_b, score_c))] and [(Question_a, (tags_b, score_d))];
  // keep only the entry with the larger of score_c and score_d.
  def scoredPosting(lines: RDD[(Int, (String, Int))]) = {
    val MaxValue = lines.reduceByKey((x, y) => {
      if (x._2 < y._2) y else x
    })
    // Output MaxValue: RDD[(question id, (question tags, highest score among that question's answers))]
    MaxValue
  }
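
  // Continuing the hypothetical trace: (101, ("Big-Data", 7)) and (101, ("Big-Data", 3))
  // reduce to the single pair (101, ("Big-Data", 7)).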
  /** Compute the vectors for the kmeans */
  // Replace the question tags in MaxValue with (index of the tag in Domains) * DomainSpread
  def vectorPosting(lines: RDD[(Int, (String, Int))]) = {
    val TupleAll = lines.map(item => {
      val Vec1 = Domains.indexOf(item._2._1) * DomainSpread
      Tuple2(Vec1, item._2._2)
    })
    // Output only TupleAll: RDD[(Int, Int)] = RDD[(D*X, Score)]
    TupleAll
  }
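
  // For instance, "Big-Data" sits at index 3 in Domains, so a question tagged "Big-Data"
  // whose best answer scored 7 becomes (3 * 50000, 7) = (150000, 7). A tag missing from
  // Domains gives indexOf == -1 and hence the vector (-50000, score).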

  //
  //
  // Kmeans method:
  //
  //

  /** Main kmeans computation */
  final def Kmeans(TupleAll: RDD[(Int, Int)]) = {
    // Pick the initial centroids
    val kPoints: Array[(Int, Int)] = TupleAll.distinct().takeSample(false, kmeansKernels, 1)
    // The final output (only filled in once the loop terminates)
    var finaloutput = ArrayBuffer[((Int, Int), Int, Int, Int)]()
    // Total movement of the centroids between iteration n and iteration n+1
    var distance = Double.PositiveInfinity
    // Iteration counter
    var iterations = 0
    // Stop when the centroid movement between two consecutive iterations falls below
    // the threshold, or when the iteration count exceeds the limit
    while (!converged(distance) && iterations <= kmeansMaxIterations) {
      // Re-cluster the points around the previous iteration's centroids.
      // findClosest returns the index, within the centroid list, of the centroid
      // nearest to the given point.
      // Output WaitForClustering: RDD[(Int, (Int, Int))]
      //                         = RDD[(index of nearest centroid, (D*X, Score))]
      // All the (D*X, Score) points sharing one centroid index form one new cluster.
      val WaitForClustering = TupleAll.map(item => {
        (findClosest((item._1, item._2), kPoints), item)
      })
      // Count the number of points in each new cluster with one map/reduce pass
      val CountMap: RDD[(Int, Int)] = WaitForClustering.map(itr => {
        Tuple2(itr._1, 1)
      })
      val CountReduce = CountMap.reduceByKey((v1: Int, v2: Int) => {
        v1 + v2
      })
      // Prepare to compute each new cluster's centroid:
      // group the points of each cluster together with groupByKey
      val NewClusterRDD = WaitForClustering.groupByKey()
      // Average all the points of each cluster to obtain its new centroid.
      // Output NewCentroids: RDD[(Int, (Int, Int))] = RDD[(index of cluster, (D*X, Score))]
      val NewCentroids: RDD[(Int, (Int, Int))] = NewClusterRDD.map(itr => {
        Tuple2(itr._1, averageVectors(itr._2))
      })
      // Compute each cluster's median score
      val Median = NewClusterRDD.map(item => {
        Tuple2(item._1, computeMedian(item._2))
      })
      val LengthOfNewCentroids = NewCentroids.keys.count().toInt
      var ArrayNewCentroids = new Array[(Int, Int)](kmeansKernels)
      // If some clusters received no points, fewer than kmeansKernels centroids were
      // produced; keep the old centroids for the missing slots
      if (LengthOfNewCentroids < kPoints.length) {
        val temp = kPoints.drop(LengthOfNewCentroids)
        val temp_1 = NewCentroids.values.collect()
        ArrayNewCentroids = temp_1 ++ temp
      }
      else {
        ArrayNewCentroids = NewCentroids.values.collect()
      }
      // Distance between the new centroids and the old centroids
      distance = euclideanDistance(kPoints, ArrayNewCentroids)
      iterations = iterations + 1
      for (i <- 0 until kmeansKernels) {
        kPoints(i) = ArrayNewCentroids(i)
      }
      if (converged(distance) || iterations > kmeansMaxIterations) {
        // Join NewCentroids, the cluster sizes and the medians on the cluster index
        val FinalOutput = NewCentroids.join(CountReduce).join(Median).map(itr => {
          val tuple = Tuple2(itr._2._1._1._1, itr._2._1._1._2)
          // (cluster centroid, size of cluster, median score, avg score); the centroid's
          // second component is the cluster's average score
          Tuple4(tuple, itr._2._1._2, itr._2._2, itr._2._1._1._2)
        }).collect()
        FinalOutput.foreach(item => {
          finaloutput.+=(item)
        })
      }
    }
    finaloutput
  }
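
  // A minimal sketch of driving the whole pipeline outside main, assuming the postings
  // CSV sits at data/posts.csv (a hypothetical path):
  //   import Assignment2._
  //   printResults(Kmeans(vectorPosting(scoredPosting(groupedPosting(rawPostings(sc.textFile("data/posts.csv")))))))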

  //
  //
  /** Decide whether the kmeans clustering converged */
  def converged(distance: Double) = distance < kmeansEta

  /** Return the squared euclidean distance between two points (no square root is taken) */
  def euclideanDistance(v1: (Int, Int), v2: (Int, Int)): Double = {
    val part1 = (v1._1 - v2._1).toDouble * (v1._1 - v2._1)
    val part2 = (v1._2 - v2._2).toDouble * (v1._2 - v2._2)
    part1 + part2
  }
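
  // e.g. euclideanDistance((0, 0), (3, 4)) == 25.0 (the squared distance, not 5.0)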
  /** Return the summed euclidean distance between two arrays of points, position by position */
  def euclideanDistance(a1: Array[(Int, Int)], a2: Array[(Int, Int)]): Double = {
    assert(a1.length == a2.length)
    var sum = 0d
    var idx = 0
    while (idx < a1.length) {
      sum += euclideanDistance(a1(idx), a2(idx))
      idx += 1
    }
    sum
  }
  /** Return the index of the closest centroid */
  def findClosest(p: (Int, Int), centers: Array[(Int, Int)]): Int = {
    var bestIndex = 0
    var closest = Double.PositiveInfinity
    for (i <- 0 until centers.length) {
      val tempDist = euclideanDistance(p, centers(i))
      if (tempDist < closest) {
        closest = tempDist
        bestIndex = i
      }
    }
    bestIndex
  }
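
  // e.g. findClosest((150000, 7), Array((0, 0), (150000, 10))) == 1, since the point is
  // far closer to the second centroid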
  /** Average the vectors */
  def averageVectors(ps: Iterable[(Int, Int)]): (Int, Int) = {
    val iter = ps.iterator
    var count = 0
    var comp1: Long = 0
    var comp2: Long = 0
    while (iter.hasNext) {
      val item = iter.next
      comp1 += item._1
      comp2 += item._2
      count += 1
    }
    ((comp1 / count).toInt, (comp2 / count).toInt)
  }
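
  // e.g. averageVectors(Seq((1, 2), (3, 4))) == (2, 3); the components are averaged
  // independently, and the integer division truncates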
  /** Median of the scores (second components) in a cluster */
  def computeMedian(a: Iterable[(Int, Int)]) = {
    val s = a.map(x => x._2).toArray
    val length = s.length
    val (lower, upper) = s.sortWith(_ < _).splitAt(length / 2)
    if (length % 2 == 0) (lower.last + upper.head) / 2 else upper.head
  }
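
  // e.g. computeMedian(Seq((0, 1), (0, 3), (0, 5))) == 3; for an even number of scores
  // the two middle values are averaged with integer division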
  /** Print one result row per cluster */
  def printResults(item: ArrayBuffer[((Int, Int), Int, Int, Int)]) = {
    item.foreach(println)
  }
}