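"""spark_ml_tests.py

Timing benchmark for Spark ML: fits a linear regression on a sequence of
progressively larger CSV datasets and records the total and average fit time
for each. Judging by the repository this was forked from, it appears to be
the Spark half of a RAPIDS/cuML comparison; a zero-time k-nearest-neighbours
placeholder row is recorded so the output lines up with that companion run.
"""
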
from pyspark.sql import SparkSession, functions, types
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
import time
import sys

NUM_START_ROWS = 2500          # rows in the smallest dataset (unused here; presumably
                               # consumed by the data-generation step)
NUM_EXECUTIONS_PER_TEST = 3    # fits per dataset; times are averaged over these runs
NUM_DSIZE_DOUBLINGS = 9        # number of datasets: test_0 .. test_8
NUM_FEATURES = 399 + 1         # 399 feature columns plus one label column ('399')
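
# Expected input layout (inferred from the read below, not verified here):
#   <input>/test_0/ ... <input>/test_8/, each a directory of headered CSV files
#   with 400 double columns named "0" through "399". Each test_i presumably
#   doubles the rows of its predecessor, starting from NUM_START_ROWS.

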
def main(input, output):
    # Every input column, features and label alike, is a double named "0" .. "399".
    schema = types.StructType([
        types.StructField(f"{i}", types.DoubleType()) for i in range(0, NUM_FEATURES)
    ])

    results = []
    for i in range(0, NUM_DSIZE_DOUBLINGS):
        print('Test:', i, '***************************************************************************************************************')
        X = spark.read.csv(
            f"{input}/test_{i}/",
            schema=schema,
            header=True)

        test = {'Test': 'Linear Regression', 'Test Number': i}

        # ******************************************************************************
        # LINEAR REGRESSION TEST
        # Starting timer
        t0 = time.time()
        for j in range(0, NUM_EXECUTIONS_PER_TEST):
            # Spark ML estimators take a single vector column, so the 399 feature
            # columns are assembled into one; column '399' serves as the label.
            assemble_features = VectorAssembler(
                inputCols=[f"{c}" for c in range(0, NUM_FEATURES - 1)],
                outputCol='features')
            X_vector = assemble_features.transform(X)
            X_vector = X_vector.select(['features', '399'])
            ols = LinearRegression(
                featuresCol='features', labelCol='399', fitIntercept=True)
            # pipeline_ols = Pipeline(stages=[assemble_features, ols])
            ols_model = ols.fit(X_vector)
            # Report the intercept as a sanity check on each fit.
            print("Intercept: " + str(ols_model.intercept))

        # Stopping clock
        t1 = time.time()

        # Recording results
        total_time = t1 - t0
        avg_time = total_time / NUM_EXECUTIONS_PER_TEST
        test['Total'] = total_time
        test['Average'] = avg_time
        results.append(test)

        # Spark ML ships no k-nearest-neighbours estimator, so a zero-time
        # placeholder row keeps the output aligned with the companion benchmark.
        test = {'Test': 'K Nearest Neighbour', 'Test Number': i, 'Total': 0.0, 'Average': 0.0}
        results.append(test)

    # The results are written via Spark below rather than pandas:
    # pd.DataFrame(results).to_csv('spark_ml_results.csv')
    schema = types.StructType([
        types.StructField('Test', types.StringType()),
        types.StructField('Test Number', types.IntegerType()),
        types.StructField('Total', types.DoubleType()),
        types.StructField('Average', types.DoubleType())
    ])
    # numSlices=1 keeps everything in one partition, producing a single CSV part file.
    results_rdd = sc.parallelize(results, numSlices=1)
    spark.createDataFrame(data=results_rdd, schema=schema).write.csv(output, header=True, mode='overwrite')
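

# Example invocation (hypothetical paths, for illustration only):
#   spark-submit spark_ml_tests.py /path/to/lr_datasets /path/to/results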

if __name__ == '__main__':
    input = sys.argv[1]   # directory containing the test_0/ .. test_8/ datasets
    output = sys.argv[2]  # output directory for the results CSV
    spark = SparkSession.builder.appName('spark ml tests').getOrCreate()
    assert spark.version >= '3.0'  # make sure we have Spark 3.0+
    spark.sparkContext.setLogLevel('WARN')
    # main() reads `spark` and `sc` as module-level globals.
    sc = spark.sparkContext
    main(input, output)