-
Notifications
You must be signed in to change notification settings - Fork 4
/
main.nf
476 lines (401 loc) · 16.6 KB
/
main.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
#!/usr/bin/env nextflow
nextflow.enable.dsl = 2
// Sub workflows
include { get_input_files } from "./workflows/get_input_files"
include { encyclopedia_search as encyclopeda_export_elib } from "./workflows/encyclopedia_search"
include { encyclopedia_search as encyclopedia_quant } from "./workflows/encyclopedia_search"
include { diann_search } from "./workflows/diann_search"
include { get_mzmls as get_narrow_mzmls } from "./workflows/get_mzmls"
include { get_mzmls as get_wide_mzmls } from "./workflows/get_mzmls"
include { skyline_import } from "./workflows/skyline_import"
include { skyline_reports } from "./workflows/skyline_run_reports"
include { generate_dia_qc_report } from "./workflows/generate_qc_report"
include { panorama_upload_results } from "./workflows/panorama_upload"
include { panorama_upload_mzmls } from "./workflows/panorama_upload"
include { save_run_details } from "./workflows/save_run_details"
include { get_pdc_files } from "./workflows/get_pdc_files"
include { combine_file_hashes } from "./workflows/combine_file_hashes"
// modules
include { ENCYCLOPEDIA_BLIB_TO_DLIB } from "./modules/encyclopedia"
include { ENCYCLOPEDIA_DLIB_TO_TSV } from "./modules/encyclopedia"
include { BLIB_BUILD_LIBRARY } from "./modules/diann"
include { GET_AWS_USER_ID } from "./modules/aws"
include { BUILD_AWS_SECRETS } from "./modules/aws"
include { EXPORT_GENE_REPORTS } from "./modules/qc_report"
// useful functions and variables
include { param_to_list } from "./workflows/get_input_files"
// Check if old Skyline parameter variables are defined.
// If the old variable is defnied, return the params value of the old variable,
// otherwise return the params value of the new variable
def check_old_param_name(old_var, new_var) {
def(section, param) = new_var.split(/\./)
if(params[old_var] != null) {
if(params[section][param] != null) {
log.warn "Both params.$old_var and params.$new_var are defined!"
}
log.warn "Setting params.$new_var = params.$old_var"
return params[old_var]
}
return params[section][param]
}
//
// The main workflow
//
workflow {
all_mzml_ch = null // hold all mzml files generated
all_elib_ch = null // hold all elibs generated by encyclopedia
all_diann_file_ch = null // all files generated by diann to upload
// version file channles
encyclopedia_version = null
diann_version = null
proteowizard_version = null
dia_qc_version = null
config_file = file(workflow.configFiles[1]) // the config file used
// check for old param variable names
params.skyline.document_name = check_old_param_name('skyline_document_name',
'skyline.document_name')
params.skyline.skip = check_old_param_name('skip_skyline',
'skyline.skip')
params.skyline.template_file = check_old_param_name('skyline_template_file',
'skyline.template_file')
params.skyline.skyr_file = check_old_param_name('skyline_skyr_file',
'skyline.skyr_file')
// check for required params or incompatible params
if(params.panorama.upload && !params.panorama.upload_url) {
error "Panorama upload requested, but missing param: \'panorama.upload_url\'."
}
if(params.panorama.import_skyline) {
if(!params.panorama.upload) {
error "Import of Skyline document in Panorama requested, but \'panorama.upload\' is not set to true."
}
if(params.skyline.skip) {
error "Import of Skyline document in Panorama requested, but \'skyline.skip\' is set to true."
}
}
// if accessing panoramaweb and running on aws, set up an aws secret
if(workflow.profile == 'aws' && is_panorama_used) {
GET_AWS_USER_ID()
BUILD_AWS_SECRETS(GET_AWS_USER_ID.out)
aws_secret_id = BUILD_AWS_SECRETS.out.aws_secret_id
} else {
aws_secret_id = Channel.of('none').collect() // ensure this is a value channel
}
// get mzML files
if(params.pdc.study_id) {
get_pdc_files()
wide_mzml_ch = get_pdc_files.out.wide_mzml_ch
pdc_study_name = get_pdc_files.out.study_name
} else{
get_wide_mzmls(params.quant_spectra_dir, params.quant_spectra_glob, aws_secret_id)
wide_mzml_ch = get_wide_mzmls.out.mzml_ch
}
narrow_mzml_ch = null
if(params.chromatogram_library_spectra_dir != null) {
get_narrow_mzmls(params.chromatogram_library_spectra_dir,
params.chromatogram_library_spectra_glob,
aws_secret_id)
narrow_mzml_ch = get_narrow_mzmls.out.mzml_ch
all_mzml_ch = wide_mzml_ch.concat(narrow_mzml_ch)
} else {
all_mzml_ch = wide_mzml_ch
}
// only perform msconvert and terminate
if(params.msconvert_only) {
// save details about this run
input_files = all_mzml_ch.map{ it -> ['Spectra File', it.baseName] }
version_files = Channel.empty()
save_run_details(input_files.collect(), version_files.collect())
run_details_file = save_run_details.out.run_details
// if requested, upload mzMLs to panorama
if(params.panorama.upload) {
panorama_upload_mzmls(
params.panorama.upload_url,
all_mzml_ch,
run_details_file,
config_file,
aws_secret_id
)
}
return
}
get_input_files(aws_secret_id) // get input files
// set up some convenience variables
if(params.spectral_library) {
spectral_library = get_input_files.out.spectral_library
} else {
spectral_library = Channel.empty()
}
if(params.pdc.study_id) {
if(params.replicate_metadata) {
log.warn "params.replicate_metadata will be overritten by PDC metadata"
}
replicate_metadata = get_pdc_files.out.annotations_csv
} else {
replicate_metadata = get_input_files.out.replicate_metadata
}
fasta = get_input_files.out.fasta
skyline_template_zipfile = get_input_files.out.skyline_template_zipfile
skyr_file_ch = get_input_files.out.skyr_files
final_elib = null
if(params.search_engine.toLowerCase() == 'encyclopedia') {
if(!params.spectral_library) {
error "The parameter \'spectral_library\' is required when using EncyclopeDIA."
}
all_diann_file_ch = Channel.empty() // will be no diann
diann_version = Channel.empty()
// convert blib to dlib if necessary
if(params.spectral_library.endsWith(".blib")) {
ENCYCLOPEDIA_BLIB_TO_DLIB(
fasta,
spectral_library
)
spectral_library_to_use = ENCYCLOPEDIA_BLIB_TO_DLIB.out.dlib
} else {
spectral_library_to_use = spectral_library
}
// create elib if requested
if(params.chromatogram_library_spectra_dir != null) {
// create chromatogram library
encyclopeda_export_elib(
narrow_mzml_ch,
fasta,
spectral_library_to_use,
'false',
'narrow',
params.encyclopedia.chromatogram.params
)
quant_library = encyclopeda_export_elib.out.elib
spec_lib_hashes = encyclopeda_export_elib.out.output_file_stats
all_elib_ch = encyclopeda_export_elib.out.elib.concat(
encyclopeda_export_elib.out.individual_elibs
)
} else {
quant_library = spectral_library_to_use
spec_lib_hashes = Channel.empty()
all_mzml_ch = wide_mzml_ch
all_elib_ch = Channel.empty()
}
// search wide-window data using chromatogram library
encyclopedia_quant(
wide_mzml_ch,
fasta,
quant_library,
'true',
'wide',
params.encyclopedia.quant.params
)
encyclopedia_version = encyclopedia_quant.out.encyclopedia_version
search_file_stats = encyclopedia_quant.out.output_file_stats.concat(spec_lib_hashes)
final_elib = encyclopedia_quant.out.elib
all_elib_ch = all_elib_ch.concat(
encyclopedia_quant.out.individual_elibs,
encyclopedia_quant.out.elib,
encyclopedia_quant.out.peptide_quant,
encyclopedia_quant.out.protein_quant
)
} else if(params.search_engine.toLowerCase() == 'diann') {
if (params.chromatogram_library_spectra_dir != null) {
log.warn "The parameter 'chromatogram_library_spectra_dir' is set to a value (${params.chromatogram_library_spectra_dir}) but will be ignored."
}
if (params.encyclopedia.quant.params != null) {
log.warn "The parameter 'encyclopedia.quant.params' is set to a value (${params.encyclopedia.quant.params}) but will be ignored."
}
if (params.encyclopedia.chromatogram.params != null) {
log.warn "The parameter 'encyclopedia.chromatogram.params' is set to a value (${params.encyclopedia.chromatogram.params}) but will be ignored."
}
if(params.spectral_library) {
// convert spectral library to required format for dia-nn
if(params.spectral_library.endsWith(".blib")) {
ENCYCLOPEDIA_BLIB_TO_DLIB(
fasta,
spectral_library
)
ENCYCLOPEDIA_DLIB_TO_TSV(
ENCYCLOPEDIA_BLIB_TO_DLIB.out.dlib
)
spectral_library_to_use = ENCYCLOPEDIA_DLIB_TO_TSV.out.tsv
} else if(params.spectral_library.endsWith(".dlib")) {
ENCYCLOPEDIA_DLIB_TO_TSV(
spectral_library
)
spectral_library_to_use = ENCYCLOPEDIA_DLIB_TO_TSV.out.tsv
} else {
spectral_library_to_use = spectral_library
}
} else {
// no spectral library
spectral_library_to_use = Channel.empty()
}
all_elib_ch = Channel.empty() // will be no encyclopedia
encyclopedia_version = Channel.empty()
all_mzml_ch = wide_mzml_ch
diann_search(
wide_mzml_ch,
fasta,
spectral_library_to_use
)
diann_version = diann_search.out.diann_version
search_file_stats = diann_search.out.output_file_stats
// create compatible spectral library for Skyline, if needed
if(!params.skyline.skip) {
BLIB_BUILD_LIBRARY(diann_search.out.speclib,
diann_search.out.precursor_tsv)
final_elib = BLIB_BUILD_LIBRARY.out.blib
} else {
final_elib = Channel.empty()
}
// all files to upload to panoramaweb (if requested)
all_diann_file_ch = diann_search.out.speclib.concat(
diann_search.out.precursor_tsv
).concat(
diann_search.out.quant_files.flatten()
).concat(
final_elib
).concat(
diann_search.out.stdout
).concat(
diann_search.out.stderr
).concat(
diann_search.out.predicted_speclib
)
} else {
error "'${params.search_engine}' is an invalid argument for params.search_engine!"
}
if(!params.skyline.skip) {
// create Skyline document
if(skyline_template_zipfile != null) {
skyline_import(
skyline_template_zipfile,
fasta,
final_elib,
wide_mzml_ch,
replicate_metadata
)
proteowizard_version = skyline_import.out.proteowizard_version
}
final_skyline_file = skyline_import.out.skyline_results
final_skyline_hash = skyline_import.out.skyline_results_hash
// generate QC report
if(!params.qc_report.skip) {
generate_dia_qc_report(final_skyline_file, replicate_metadata)
dia_qc_version = generate_dia_qc_report.out.dia_qc_version
qc_report_files = generate_dia_qc_report.out.qc_reports.concat(
generate_dia_qc_report.out.qc_report_qmd,
generate_dia_qc_report.out.qc_report_db,
generate_dia_qc_report.out.qc_tables
)
// Export PDC gene tables
if(params.pdc.gene_level_data != null) {
EXPORT_GENE_REPORTS(generate_dia_qc_report.out.qc_report_db,
params.pdc.gene_level_data,
pdc_study_name)
EXPORT_GENE_REPORTS.out.gene_reports | flatten | set{ gene_reports }
} else {
gene_reports = Channel.empty()
}
} else {
dia_qc_version = Channel.empty()
qc_report_files = Channel.empty()
gene_reports = Channel.empty()
}
// run reports if requested
skyline_reports_ch = null;
if(params.skyline.skyr_file) {
skyline_reports(
final_skyline_file,
skyr_file_ch
)
skyline_reports_ch = skyline_reports.out.skyline_report_files.flatten()
} else {
skyline_reports_ch = Channel.empty()
}
} else {
// skip skyline
skyline_reports_ch = Channel.empty()
skyr_file_ch = Channel.empty()
final_skyline_file = Channel.empty()
qc_report_files = Channel.empty()
proteowizard_version = Channel.empty()
final_skyline_hash = Channel.empty()
dia_qc_version = Channel.empty()
gene_reports = Channel.empty()
}
version_files = encyclopedia_version.concat(diann_version,
proteowizard_version,
dia_qc_version).splitText()
input_files = fasta.map{ it -> ['Fasta file', it.name] }.concat(
spectral_library.map{ it -> ['Spectra library', it.baseName] },
all_mzml_ch.map{ it -> ['Spectra file', it.baseName] })
save_run_details(input_files.collect(), version_files.collect())
run_details_file = save_run_details.out.run_details
combine_file_hashes(fasta, spectral_library,
search_file_stats,
final_skyline_file,
final_skyline_hash,
skyline_reports_ch,
qc_report_files,
gene_reports,
run_details_file)
// upload results to Panorama
if(params.panorama.upload) {
panorama_upload_results(
params.panorama.upload_url,
all_elib_ch,
all_diann_file_ch,
final_skyline_file,
all_mzml_ch,
fasta,
spectral_library,
run_details_file,
config_file,
skyr_file_ch,
skyline_reports_ch,
aws_secret_id
)
}
}
// return true if any entry in the list created from the param is a panoramaweb URL
def any_entry_is_panorama(param) {
values = param_to_list(param)
return values.any { it.startsWith(params.panorama.domain) }
}
// return true if panoramaweb will be accessed by this Nextflow run
def is_panorama_used() {
return params.panorama.upload ||
(params.fasta && params.fasta.startsWith(params.panorama.domain)) ||
(params.spectral_library && params.spectral_library.startsWith(params.panorama.domain)) ||
(params.replicate_metadata && params.replicate_metadata.startsWith(params.panorama.domain)) ||
(params.skyline.template_file && params.skyline.template_file.startsWith(params.panorama.domain)) ||
(params.quant_spectra_dir && any_entry_is_panorama(params.quant_spectra_dir)) ||
(params.chromatogram_library_spectra_dir && any_entry_is_panorama(params.chromatogram_library_spectra_dir)) ||
(params.skyline_skyr_file && any_entry_is_panorama(params.skyline_skyr_file))
}
//
// Used for email notifications
//
def email() {
// Create the email text:
def (subject, msg) = EmailTemplate.email(workflow, params)
// Send the email:
if (params.email) {
sendMail(
to: "$params.email",
subject: subject,
body: msg
)
}
}
//
// This is a dummy workflow for testing
//
workflow dummy {
println "This is a workflow that doesn't do anything."
}
// Email notifications:
workflow.onComplete {
try {
email()
} catch (Exception e) {
println "Warning: Error sending completion email."
}
}