-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy path07-parallelisation.Rmd
332 lines (264 loc) · 11.8 KB
/
07-parallelisation.Rmd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
# Parallélisation du code `R`
## Introduction à l'execution parallèle sous `R`
En dehors de l'optimisation du code et des algorithmes, une autre façon
d'obtenir un code performant est de tirer profit des architectures parallèles
des ordinateurs modernes. Il s'agit alors de **paralléliser** son code afin de
faire des opérations simultanées sur des parties distinctes d'un même problème,
en utilisant différent cœurs de calcul. On ne réduit pas le temps de calcul
total nécessaire, mais l'ensemble des opérations s’exécute plus rapidement.
Il existe un nombre non négligeable d'algorithmes qui sont d'un "parallélisme
embarrassant", c'est-à-dire dont les calculs peuvent se décomposer en plusieurs
sous-calculs indépendants. En statistique, il est ainsi souvent facile et direct
de paralléliser selon les différentes observations ou selon les différentes
dimensions. Typiquement, il s'agit d'opérations que l'on peut écrire sous la
forme de boucle dont les opérations sont indépendantes d'une itération de la
boucle à l'autre.
**Les opérations nécessaires pour l'établissement d'un code parallèle sont les suivantes :**
1. Démarrer $m$ processus "travailleurs" (i.e. cœurs de calcul) et les initialiser
2. Envoyer les fonctions et données nécessaires pour chaque tache aux travailleurs
3. Séparer les taches en $m$ opérations d'envergure similaire et les envoyer aux travailleurs
4. Attendre que tous les travailleurs aient terminer leurs calculs et obtenir leurs résultats
5. Rassembler les résultats des différents travailleurs
6. Arrêter les processus travailleurs
Selon les plateformes, plusieurs protocoles de communications sont disponibles
entre les cœurs. Sous les systèmes UNIX, le protocole *Fork* est le plus
utilisé, mais il n'est pas disponible sous Windows où on utilise
préférentiellement le protocole *PSOCK*. Enfin, pour les architecture de calcul
distribuée où les cœurs ne se trouvent pas nécessairement sur le même processeur
physique, on utilise généralement le protocole *MPI*. L'avantage des packages
`future` et `future.apply` est que le même code pourra être exécuté quelque soit
la configuration matérielle.
Il existe un nombre important de packages et d'initiatives permettant de faire
du calcul en R. Depuis `R 2.14.0`, le package
[`parallel`](https://stat.ethz.ch/R-manual/R-devel/library/parallel/doc/parallel.pdf)
est inclus directement dans `R` et permet de démarrer et d'arrêter un "cluster"
de plusieurs processus travailleur (étape 1 et 6). En plus du package
`parallel`, on va donc utiliser le package `future` qui permet de gérer les
processus travailleurs et la communication et l'articulation avec le package
`future.apply`qui permet lui de gérer le dialogue avec les travailleurs (envois,
réception et rassemblement des résultats - étapes 2, 3, 4 et 5).
## Première fonction parallèle en `R`
> ***À vous de jouer !***
>
> Commencez par écrire une fonction simple qui calcule le logarithme de $n$
nombres:
>
> 1. Déterminez combien de coeurs sont disponibles sur votre marchine grâce à
la fonction `future::availableCores()`.
>
> 2. À l'aide de la fonction `future::plan(multisession(workers = XX))`,
déclarez un "plan" de calculs parallèles sur votre ordinateur (en prenant garde
à **laisser un coeur disponible** pour traiter les autres processus).
>
> 3. À l'aide d'une fonction de type apply `future.apply::future_*apply()`,
calculez le log des $n$ nombres en parallèle et concaténez les résultats dans un
vecteur.
>
> 4. Comparez le temps d'éxecution avec celui d'une fonction séquentielle
sur les 100 premiers entiers, grâce à la commande :
`microbenchmark(log_par(1:100), log_seq(1:100), times=10)`
```{r echo = TRUE, message = FALSE, cache=TRUE}
library(microbenchmark)
library(future.apply)
log_seq <- function(x){
# try this yourself (spoiler alert: it is quite long...):
# res <- numeric(length(x))
# for(i in 1:length(x)){
# res[i] <- log(x[i])
# }
# return(res)
return(log(x))
}
log_par <- function(x){
res <- future_sapply(1:length(x), FUN = function(i) {
log(x[i])
})
return(res)
}
plan(multisession(workers = 3))
mb <- microbenchmark(log_par(1:100), log_seq(1:100), times = 50)
```
```{r, echo=FALSE, eval =FALSE}
log_parIter <- function(x, ncores = 1){
iter <- itertools::isplitIndices(n=length(x), chunks=ncores)
res <- future_sapply(iter, FUN = function(i) {
log(x[i])
})
return(unlist(res))
}
mb <- microbenchmark(log_par(1:100), log_parIter(1:100), times=50)
```
```{r, echo=FALSE}
library(ggplot2)
data2plot <- cbind.data.frame("Time" = mb$time/10^6, "Expression" = mb$expr)
rangeTime <- c(floor(log10(min(data2plot$Time))):ceiling(log10(max(data2plot$Time))))
brk <- NULL
for(i in rangeTime){
brk <- c(brk, seq(from = 10^i, to = 10^(i+1), by=10^i))
}
ggplot(data2plot) + geom_violin(aes(x=Expression, y=Time, fill=Expression), alpha=0.8) +
scale_fill_manual(guide="none", values=viridis::viridis(6)[1:4]) +
scale_y_log10(minor_breaks=brk) +
ylab("Log-temps de calcul (milli-sec)") +
xlab("Expression évaluée") +
theme_bw() +
ggtitle("Spoiler alert")
```
La version parallèle tourne beaucoup plus lentement... Car en fait, si les
tâches individuelles sont trop rapides, `R` va passer plus de temps à
communiquer avec les cœurs, qu'à faire les calculs effectifs.
**Il faut qu'une itération de la boucle soit relativement longue pour que le
calcul parallèle apporte un gain en temps de calcul !**
En augmentant $n$, on observe une réduction de la différence entre les 2
implémentations (le temps de calcul en parallèle augmente très lentement comparé
à l'augmentation de celui de la fonction séquentielle).
**NB :** les itérateurs d'`itertools` sont très performants mais ne peuvent
servir que lorsque le code à l'intérieur de `future_*apply()` est vectorisé (il
est toujours possible de vectoriser le code à l'intérieur, par exemple avec une
fonction de type `apply`). Ils minimisent le nombre de communication entre les
coeurs.
## Parallélisation efficace
On va maintenant se pencher sur un autre cas d'utilisation. Imaginons que l'on
ait un grand tableau de données de taille comportant 10 observations pour 100
000 variables (e.g. des mesures de génomique), et que l'on veuille calculer la
médiane pour chacune de ces variables.
```{r}
x <- matrix(rnorm(1e6), nrow = 10)
dim(x)
```
Pour un utilisateur averti de `R`, une telle opération se programme facilement
à l'aide de la fonction `apply` :
```{r}
colmedian_apply <- function(x){
return(apply(x, 2, median))
}
system.time(colmedian_apply(x))
```
En réalité, une boucle `for` n'est pas plus lente à condition d'être bien
programmée :
```{r}
colmedian_for <- function(x){
ans <- rep(0, ncol(x))
for (i in 1:ncol(x)) {
ans[i] <- median(x[, i])
}
return(ans)
}
system.time(colmedian_for(x))
```
>***À vous de jouer !***
> Essayez d'améliorer encore ce temps de calcul en parallélisant :
>
> 1. Parallélisez le calcul de la médiane de chacune des 100 000 variables.
Observe-t-on un gain en temps de calcul ?
>
> 2. Proposez une implémentation alternative grâce à la fonction
`itertools::isplitIndices()` qui permet de séparer vos données (les $n$ nombres)
en autant de groupes que vous avez de coeurs. Comparez à nouveau les temps de
calcul.
```{r, error=TRUE, cache=TRUE}
colmedian_par <- function(x){
res <- future_sapply(1:ncol(x), FUN = function(i) {
median(x[, i])
})
return(res)
}
plan(multisession(workers = 3))
system.time(colmedian_par(x))
colmedian_parIter <- function(x, ncores = 1){
iter <- itertools::isplitIndices(n = ncol(x), chunks = ncores)
res <- future_sapply(iter, FUN = function(i) {
apply(x[, i], 2, median)
})
return(unlist(res))
}
system.time(colmedian_parIter(x, ncores = 3))
mb <- microbenchmark(colmedian_apply(x),
colmedian_for(x),
colmedian_par(x),
colmedian_parIter(x, ncores = 3), times = 10)
mb
```
```{r, echo=FALSE}
library(ggplot2)
data2plot <- cbind.data.frame("Time" = mb$time/10^9, "Expression" = mb$expr)
rangeTime <- c(floor(log10(min(data2plot$Time))):ceiling(log10(max(data2plot$Time))))
brk <- NULL
for(i in rangeTime){
brk <- c(brk, seq(from = 10^i, to = 10^(i+1), by=10^i))
}
ggplot(data2plot) + geom_violin(aes(x=Expression, y=Time, fill=Expression), alpha=0.8) +
scale_fill_manual(guide="none", values=viridis::viridis(6)[1:4]) +
#scale_y_log10(minor_breaks=brk) +
ylab("Temps de calcul (sec)") +
xlab("Expression évaluée") +
theme_bw() +
ylim(0, max(data2plot$Time)) +
ggtitle("La parallélisation ça marche ! 3 coeurs disponibles pour la parallélisation")
```
### Les itérateurs
Le package `itertools` permet de séparer facilement des données ou des
taches (étape 3) tout en minimisant les communications avec les différents
travailleurs. Il s'appuie sur une implémentation des itérateurs en `R`.
Son utilisation nécessite néanmoins de vectoriser le code à l'intérieur de
`future_*apply()`. Expérimentez avec le petit code ci-dessous :
```{r, error = TRUE}
myiter <- itertools::isplitIndices(n = 30, chunks = 3)
# Une première fois
iterators::nextElem(myiter)
# Une deuxième fois... Oh ?!
iterators::nextElem(myiter)
# Encore !
iterators::nextElem(myiter)
# Encore ?
iterators::nextElem(myiter)
```
### Les autres "plans" de calculs parallèle
Pour exécuter votre code (exactement le même code, c'est un des avantages
du packages de la famille `future*`), vous devez régler un "plan" de calculs :
- sur un ordinateur (ou un unique serveur de calcul) sous Unix (Linux, Mac OS),
vous pouvez utiliser `plan(multicore(workers = XX))` qui est souvent plus
performant. Le plan multisession fonctionne toujours.
- sur un cluster de calculs (type Avakas à Bordeaux), nous renvoyons au package
[`future.batchtools`](https://cran.r-project.org/web/packages/future.batchtools/vignettes/future.batchtools.html)
## Parallélisation dans notre exemple fil rouge
>***À vous de jouer !***
>
> 1. À partir de la fonction `mvnpdfoptim()` et/ou `mvnpdfsmart()`, proposez
une implémentation parallélisant les calculs sur les observations (colonnes de $x$)
>
> 2. Comparez les temps de calcul sur 10 000 observations
```{r, cache=TRUE}
plan(multisession(workers = 3))
n <- 10000
mb <- microbenchmark::microbenchmark(
mvtnorm::dmvnorm(matrix(1.96, nrow = n, ncol = 2)),
mypkgr::mvnpdfoptim(x=matrix(1.96, nrow = 2, ncol = n), Log=FALSE),
mypkgr::mvnpdfoptim_par(x=matrix(1.96, nrow = 2, ncol = n), Log=FALSE),
mypkgr::mvnpdfoptim_parIter(x=matrix(1.96, nrow = 2, ncol = n), Log=FALSE, ncores = 3),
times=20L)
mb
```
```{r, echo=FALSE}
library(ggplot2)
data2plot <- cbind.data.frame("Time" = mb$time/10^6, "Expression" = mb$expr)
rangeTime <- c(floor(log10(min(data2plot$Time))):ceiling(log10(max(data2plot$Time))))
brk <- NULL
for(i in rangeTime){
brk <- c(brk, seq(from = 10^i, to = 10^(i+1), by=10^i))
}
levels(data2plot$Expression) <- gsub("mvtnorm::", "", sapply(strsplit(levels(data2plot$Expression), "(", fixed=T), "[", 1))
ggplot(data2plot) + geom_violin(aes(x=Expression, y=Time, fill=Expression), alpha=0.8) +
scale_fill_manual(guide="none", values=viridis::viridis(6)[1:4]) +
scale_y_log10(minor_breaks=brk) +
ylab("Log-temps de calcul (milli-sec)") +
xlab("Expression évaluée") +
theme_bw() +
ggtitle("Spoiler alert 2")
```
Notre proposition d'implementation pour `mvnpdfoptim_par` est téléchargeable [ici](https://r-dev-perf.borishejblum.science/FormationRavancee_dev_perf_files/mvnpdfoptim_par.R).
## Conclusion
La parallélisation permet de gagner du temps, mais il faut d'abord bien
optimiser son code. Quand on parallélise un code, le gain sur la durée
d’exécution dépend avant tout du ratio entre le temps de communication et le
temps de calcul effectif pour chaque tache.