-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathS5_Exam_PySpark.py
88 lines (61 loc) · 2.84 KB
/
S5_Exam_PySpark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# Import de SparkSession et de SparkContext
from pyspark.sql import SparkSession
from pyspark import SparkContext
# Création d'un SparkContext
sc = SparkContext.getOrCreate()
# Création d'une session Spark
spark = SparkSession \
.builder \
.appName("Exam PySpark SCH") \
.getOrCreate()
spark
# Importer le fichier creditcard.scv
df_raw = spark.read.csv('data/creditcard.csv', header=True)
# Afficher un extrait du DataFrame df_raw
df_raw.sample(False, .001, seed = 222).toPandas()
##########################
distinct_classes = df_raw.select('Class').distinct()
# Convertir le résultat en DataFrame Pandas pour l'affichage
distinct_classes_df = distinct_classes.toPandas()
print(distinct_classes_df)
##########################
# importer col
from pyspark.sql.functions import col
#Créer un DataFrame df à partir de df_raw en changeant les colonnes des variables explicatives en double
#et la variable cible, Class, en int.
exprs = [col(c).cast("double") for c in df_raw.columns[1:30]]
df = df_raw.select(df_raw.Class.cast('int'),
*exprs)
# Afficher le schema
print("Schema du DataFrame df :")
df.printSchema()
# supprimer les lignes manquantes:
df = df.dropna()
# Importer la focntion DenseVector
from pyspark.ml.linalg import DenseVector
# Conversion de la base de données au format svmlib
#Créer un rdd rdd_ml séparant la variable à expliquer des features (à mettre sous forme DenseVector)
rdd_ml = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
#Créer un DataFrame df_ml contenant notre base de données sous deux variables : 'labels' et 'features'
df_ml = spark.createDataFrame(rdd_ml, ['label', 'features'])
df_ml.show()
# Créer deux DataFrames appelés train et test contenant chacun respectivement 80% et 20% des données
train, test = df_ml.randomSplit([0.8, 0.2], seed=222)
# Import de la fonction RandomForestClassifier
from pyspark.ml.classification import RandomForestClassifier
# Créer un classificateur Forest cf
clf = RandomForestClassifier(featuresCol='features', labelCol='label', predictionCol='prediction',seed = 222)
# Entraîner le modèle sur l'ensemble d'entraînement
model = clf.fit(train)
# Faire des prédictions sur l'ensemble de test
predictions = model.transform(test)
# Affichage d'un extrait des prédictions
predictions.sample(False, 0.001 , seed = 222).toPandas()
# Import d'un évaluateur MulticlassClassificationEvaluator du package pyspark.ml.evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Création d'un évaluateur
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
# Calcul et affichage de la précision du modèle
accuracy = evaluator.evaluate(predictions)
print(accuracy)
spark.stop()