#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
:Description: A modular benchmark, which optionally generates and pre-processes
(shuffles, i.e. reorders nodes in the networks) datasets using the specified
executable, optionally executes specified applications (clustering algorithms)
with specified parameters on the specified datasets, and optionally evaluates
results of the execution using the specified executable(s).
All executions are traced and resource consumption is logged:
CPU (user, kernel, etc.) and memory (RSS RAM).
Traces are saved even in case of internal / external interruptions and crashes.
= Overlapping Hierarchical Clustering Benchmark =
Implemented:
- synthetic datasets are generated using extended LFR Framework (origin:
https://sites.google.com/site/santofortunato/inthepress2, which is
"Benchmarks for testing community detection algorithms on directed and
weighted graphs with overlapping communities" by Andrea Lancichinetti
and Santo Fortunato) and producing the specified number of instances per
each set of parameters (there can be varying network instances for the
same set of generating parameters);
- networks are shuffled (nodes are reordered) to evaluate stability /
determinism of the clustering algorithm;
- executes HiReCS (http://www.lumais.com/hirecs), Louvain (original
https://sites.google.com/site/findcommunities/ and igraph implementations),
Oslom2 (http://www.oslom.org/software.htm),
Ganxis/SLPA (https://sites.google.com/site/communitydetectionslpa/) and
SCP (http://www.lce.hut.fi/~mtkivela/kclique.html) clustering algorithms
on the generated synthetic networks and real world networks;
- evaluates results using NMI for overlapping communities, extended versions of:
* gecmi (https://bitbucket.org/dsign/gecmi/wiki/Home, "Comparing network covers
using mutual information" by Alcides Viamontes Esquivel, Martin Rosvall),
* onmi (https://github.com/aaronmcdaid/Overlapping-NMI, "Normalized Mutual
Information to evaluate overlapping community finding algorithms" by
Aaron F. McDaid, Derek Greene, Neil Hurley);
- resource consumption is measured using the exectime profiler (https://bitbucket.org/lumais/exectime/).
:Authors: (c) Artem Lutov <artem@exascale.info>
:Organizations: eXascale Infolab <http://exascale.info/>, Lumais <http://www.lumais.com/>,
ScienceWise <http://sciencewise.info/>
:Date: 2015-04
"""
from __future__ import print_function, division # Required for stderr output, must be the first import
# External API (exported functions)
__all__ = ['generateNets', 'shuffleNets', 'convertNets', 'runApps', 'evalResults', 'benchmark']
# Use the efficient iterator-based range() in both Python 2 and 3
try:
from future.builtins import range
except ImportError:
# Replace range() implementation for Python2
try:
range = xrange
except NameError:
pass # xrange is not defined in Python3, which is fine
import atexit # At exit termination handling
import sys
import os
import shutil
import signal # Intercept kill signals
import glob
import traceback # Stacktrace
import copy
import itertools # chain
import time
from numbers import Number # To verify that a variable is a number (int or float)
# Consider time interface compatibility for Python before v3.3
if not hasattr(time, 'perf_counter'): #pylint: disable=C0413
time.perf_counter = time.time
from math import sqrt
from multiprocessing import cpu_count # Returns the number of logical CPU units (HW threads) if defined
import benchapps # Required for the functions name mapping to/from the app names
from benchapps import PYEXEC, EXTCLSNDS, aggexec, reduceLevels # , ALGSDIR
from benchutils import IntEnum, viewitems, timeSeed, dirempty, tobackup, dhmsSec, syncedTime, \
secDhms, delPathSuffix, parseName, funcToAppName, staticTrace, PREFEXEC, \
SEPPARS, SEPINST, SEPLRD, SEPSHF, SEPPATHID, SEPSUBTASK, UTILDIR, \
TIMESTAMP_START_STR, TIMESTAMP_START_HEADER, ALEVSMAX, ALGLEVS
# PYEXEC - current Python interpreter
import benchevals # Required for the functions name mapping to/from the quality measures names
from benchevals import aggEvals, RESDIR, CLSDIR, QMSDIR, EXTRESCONS, QMSRAFN, QMSINTRIN, QMSRUNS, \
SATTRNINS, SATTRNSHF, SATTRNLEV, SUFULEV, QualitySaver, NetInfo, SMeta
from utils.mpepool import AffinityMask, ExecPool, Job, Task, secondsToHms
from utils.mpewui import WebUiApp #, bottle
from algorithms.utils.parser_nsl import asymnet, dflnetext
# if not bottle.TEMPLATE_PATH:
# bottle.TEMPLATE_PATH = []
# bottle.TEMPLATE_PATH.append('utils/views')
# Note: the trailing '/' in a directory name is required to check whether the directory already exists and to distinguish it from a file
_SYNTDIR = 'syntnets/' # Default base directory for the synthetic datasets (both networks, params and seeds)
_SYNTDIR_MIXED = 'syntnets_mixed/' # Default base directory for the synthetic datasets varying only the mixing parameter
_SYNTDIR_LREDUCT = 'syntnets_lreduct/' # Default base directory for the synthetic datasets reducing the number of network links
_NETSDIR = 'networks/' # Networks sub-directory of the synthetic networks (inside _SYNTDIR)
assert RESDIR.endswith('/'), 'A directory should have a valid terminator'
_SEEDFILE = RESDIR + 'seed.txt'
_PATHIDFILE = RESDIR + 'pathid.map' # Path id map file for the results interpretation (mapping back to the input networks)
_TIMEOUT = 36 * 60*60 # Default execution timeout for each algorithm for a single network instance
_GENSEPSHF = '%' # Shuffle number separator in the synthetic networks generation parameters
_WPROCSMAX = max(cpu_count()-1, 1) # Maximal number of the worker processes, should be >= 1
assert _WPROCSMAX >= 1, 'At least one worker process is expected'
_VMLIMIT = 4096 # Set 4 TB, it is automatically decreased to the physical memory of the computer
_HOST = None # 'localhost'; Note: start without the WebUI by default
_PORT = 8080 # Default port for the WebUI, Note: port 80 accessible only from the root in NIX
_RUNTIMEOUT = 10*24*60*60 # Clustering execution timeout, 10 days
_EVALTIMEOUT = 5*24*60*60 # Results evaluation timeout, 5 days
# Set the memory limit per algorithm to half of the available RAM because
# some of them (Scp and Java-based) consume a huge amount of memory
_MEMLIM = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') / (1024**3 * 2.) # Half of the RAM (physical memory) size in GB
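# Worked example (for illustration): on a host with 16 GiB of physical RAM,
# SC_PAGE_SIZE * SC_PHYS_PAGES = 16 * 1024**3 bytes, hence _MEMLIM = 16 / 2 = 8.0 GB.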
_QSEPGROUP=';' # Quality aggregation options group separator
_QSEPMSR='/' # Quality aggregation option separator for measures section
_QSEPNET=':' # Quality aggregation option separator for networks section
#_TRACE = 1 # Tracing level: 0 - none, 1 - lightweight, 2 - debug, 3 - detailed
_DEBUG_TRACE = False # Trace start / stop and other events to stderr
_webuiapp = None # Global WebUI application
# Pool of executors to process jobs, the global variable is required to terminate
# the worker processes on external signal (TERM, KILL, etc.)
_execpool = None
# Data structures --------------------------------------------------------------
class PathOpts(object):
"""Paths parameters"""
__slots__ = ('path', 'flat', 'asym', 'shfnum', '_reshuffle', '_revalue', 'ppeval')
def __init__(self, path, flat=False, asym=False, shfnum=None, reshuffle=None, revalue=None, ppeval=None):
"""Sets default values for the input parameters
path: str|unicode - path (directory or file), a wildcard is allowed
flat: bool - use flat derivatives or create the dedicated directory on shuffling
to avoid flooding of the base directory.
NOTE: variance over the shuffles of each network instance is evaluated
only for the non-flat structure.
asym: bool - the network is asymmetric (specified by arcs rather than edges),
which is considered only for the non-standard file extensions (not .nsL)
shfnum: int - the number of shuffles of each network instance to be produced, >= 0;
0 means do not produce any shuffles but process all existent
reshuffle: bool - overwrite or skip the shuffles generation if they already exist. Missing instances are always generated regardless.
revalue: bool - revaluate existing results for this path instead of omitting them
ppeval: bool - per-pair evaluation for the middle levels of the clustered networks
instead of the evaluation vs the ground-truth. Relevant for the link-reduced synthetic networks.
"""
# Note: flat should not be used with ppeval
assert (shfnum is None or shfnum >= 0) and (not flat or not ppeval), (
'Invalid arguments, shfnum: {}, flat: {}, revalue: {}, ppeval: {}'.format(shfnum, flat, revalue, ppeval))
# assert isinstance(path, str)
self.path = path
self.flat = flat
self.asym = asym
self.shfnum = shfnum # Number of shuffles for each network instance to be produced, >= 0
self._reshuffle = reshuffle
self._revalue = revalue or reshuffle
self.ppeval = ppeval
@property
def reshuffle(self):
"""Read property for the reshuffle attribute"""
return self._reshuffle
@reshuffle.setter
def reshuffle(self, val):
"""Write property for the reshuffle attribute
val: bool - overwrite or skip the generation if the synthetic network instances
already exist
NOTE: the shuffling also respects this flag
"""
self._reshuffle = val
self._revalue = self._revalue or val
@property
def revalue(self):
"""Read property for the revalue attribute"""
return self._revalue
@revalue.setter
def revalue(self, val):
"""Write property for the revalue attribute
val: bool - revaluate existing results for this path instead of omitting them
"""
self._revalue = val or self.reshuffle
def __str__(self):
"""String conversion"""
# return ', '.join(': '.join((name, str(val))) for name, val in viewitems(self.__dict__))
return ', '.join(': '.join((name, str(self.__getattribute__(name)))) for name in self.__slots__)
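# Example (illustrative; the dataset path is hypothetical): options for a set of
# asymmetric input networks with 4 shuffles per network instance:
#   popt = PathOpts('realnets/*.nsa', asym=True, shfnum=4)
#   print(popt)  # -> path: realnets/*.nsa, flat: False, asym: True, shfnum: 4, ...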
SyntPolicy = IntEnum('SyntPolicy', 'ordinary mixed lreduct')
"""Synthetic network generation policy"""
class SyntPathOpts(PathOpts):
"""Paths parameters for the synthetic networks"""
__slots__ = ('policy', 'netins')
def __init__(self, policy, path, netins=3, overwrite=False, flat=False, asym=False, shfnum=0, ppeval=False):
"""Sets default values for the input parameters
path: str|unicode - path (directory or file), a wildcard is allowed
policy: SyntPolicy - policy for the synthetic networks generation
netins: int - the number of network instances to generate, >= 0
overwrite: bool - overwrite or skip the generation if the synthetic network instances
already exist. NOTE: the shuffling also respects this flag
flat: bool - use flat derivatives or create the dedicated directory on shuffling
to avoid flooding of the base directory.
NOTE: variance over the shuffles of each network instance is evaluated
only for the non-flat structure.
asym: bool - the network is asymmetric (specified by arcs rather than edges)
shfnum: int - the number of shuffles of each network instance to be produced, >= 0
ppeval: bool - per-pair evaluation for the middle levels of the clustered networks
instead of the evaluation vs the ground-truth. Relevant for the link-reduced synthetic networks.
"""
assert isinstance(policy, SyntPolicy), 'Unexpected policy type: ' + type(policy).__name__
super(SyntPathOpts, self).__init__(path, flat=flat, asym=asym, shfnum=shfnum
, reshuffle=overwrite, revalue=overwrite, ppeval=ppeval)
self.policy = policy
self.netins = netins
# self.overwrite = overwrite
@property
def overwrite(self):
"""Read property for the overwrite attribute"""
return self.reshuffle
@overwrite.setter
def overwrite(self, val):
"""Write property for the overwrite attribute
val: bool - overwrite or skip the generation if the synthetic network instances
already exist
NOTE: the shuffling also respects this flag
"""
self.reshuffle = val
assert not val or self.revalue, '.revalue should be synced with the .overwrite'
def __str__(self):
"""String conversion"""
return ', '.join(': '.join((name, str(self.__getattribute__(name))))
for name in itertools.chain(super(SyntPathOpts, self).__slots__, self.__slots__))
class QAggMeta(object):
"""Quality aggregation options metadata"""
__slots__ = ('exclude', 'seeded', 'plot')
def __init__(self, exclude=False, seeded=True, plot=False):
"""Sets values for the input parameters
exclude: bool - include in (filter by) or exclude from (filter out)
the output according to the specified options
seeded: bool - aggregate results only from the HDF5 storage having current seed or from all the matching storages
plot: bool - plot the aggregated results besides storing them
"""
self.exclude = exclude
self.seeded = seeded
self.plot = plot
def __str__(self):
"""String conversion"""
# return ', '.join(': '.join((name, str(val))) for name, val in viewitems(self.__dict__))
return ', '.join(': '.join((name, str(self.__getattribute__(name)))) for name in self.__slots__)
class QAggOpt(object):
"""Quality aggregation option"""
__slots__ = ('alg', 'msrs', 'nets')
def __init__(self, alg, msrs=None, nets=None):
"""Sets values for the input parameters
Specified networks and measures that do not exist in the algorithm output are omitted
alg: str - algorithm name
msrs: iterable(str) or None - quality measure names or their prefixes in the format <appname>[:<metric>][+u]
(like: "Xmeasures:MF1h_w+u")
Note: there are few measures, so linear search is the fastest
nets: set(str) or None - wildcards of the network names
"""
assert isinstance(alg, str) and (msrs is None or isinstance(msrs, (tuple, list))) and (
nets is None or isinstance(nets, (tuple, list))), (
'Invalid type of the argument, alg: {}, msrs: {}, nets: {}'.format(
type(alg).__name__, type(msrs).__name__, type(nets).__name__))
self.alg = alg
self.msrs = msrs
self.nets = nets
@staticmethod
def parse(text):
"""Parse text to QAggOpt
text: str - text representation of QAggOpt in the format: <algname>[/<metric1>,<metric2>...][:<net1>,<net2>,...]
seeded: bool - aggregate results only from the HDF5 storage having current seed or from all the matching storages
plot: bool - plot the aggregated results besides storing them
return list(QAggOpt) - parsed QAggOpts
"""
if not text:
raise ValueError('A valid text is expected')
groups = text.split(_QSEPGROUP)
res = []
for gr in groups:
# Reset per group to avoid leaking measures / networks from the previous group
msrs = None
nets = None
parts = gr.split(_QSEPMSR)
if len(parts) >= 2:
# Fetch pure alg
alg = parts[0].strip()
# Fetch msrs and nets
parts = parts[1].split(_QSEPNET)
msrs = parts[0].strip().split(',')
if len(parts) >= 2:
nets = parts[1].strip().split(',')
else:
# Fetch pure alg and nets
parts = parts[0].split(_QSEPNET)
alg = parts[0].strip()
if len(parts) >= 2:
nets = parts[1].strip().split(',')
res.append(QAggOpt(alg, msrs, nets))
return res
def __str__(self):
"""String conversion"""
res = self.alg
if self.msrs:
res = _QSEPMSR.join((res, ','.join(self.msrs)))
if self.nets:
res = _QSEPNET.join((res, ','.join(self.nets)))
return res
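# Example (illustrative; 'NMI' and 'F1' are hypothetical measure names, '1K5' and
# '1K25' are synthetic network names as produced by generateNets()):
#   opts = QAggOpt.parse('Louvain/NMI,F1:1K5,1K25;Oslom2')
# yields two QAggOpt items: the first aggregates the NMI and F1 measures of
# Louvain on the networks 1K5 and 1K25, the second covers all Oslom2 results.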
class Params(object):
"""Input parameters"""
def __init__(self):
"""Sets default values for the input parameters
syntpos: list(SyntPathOpts) - synthetic networks path options, SyntPathOpts
runalgs - execute algorithm or not
qmeasures: list(list(str)) - quality measures with their parameters to be evaluated
on the clustering results. None means do not evaluate.
qupdate - update the quality evaluations storage (add the lacking evaluations,
omitting the existent ones unless qrevalue is set) instead of creating a new storage
for the quality measures evaluation, applicable only for the same seed.
Otherwise a new storage is created and the existent one is backed up.
qrevalue - revalue all values from scratch instead of retaining the existent values
and (evaluating and) adding only the non-existent (lacking) ones; makes sense
only if qupdate is set, otherwise all values are computed anyway.
datas: PathOpts - list of datasets to be run with asym flag (asymmetric
/ symmetric links weights):
[PathOpts, ...] , where path is either dir or file [wildcard]
timeout - execution timeout in sec per each algorithm
algorithms - algorithms to be executed (just names as in the code)
seedfile - seed file name
convnets: bits - convert existing networks into the .rcg format, DEPRECATED
0 - do not convert
0b001 - convert:
0b01 - convert only if this network does not exist
0b11 - force conversion (overwrite all)
0b100 - resolve duplicated links on conversion
TODO: replace with IntEnum like in mpewui
qaggmeta: QAggMeta - quality aggregation meta options
qaggopts: list(QAggOpt) or None - aggregate evaluations of the algorithms for the
specified targets or for all algorithms on all networks if only the list is empty,
the aggregation is omitted if the value is None
host: str - WebUI host, None to disable WebUI
port: int - WebUI port
runtimeout: uint - clustering algorithms execution timeout
memlim: ufloat - max amount of memory in GB allowed for each executing application, half of RAM by default
evaltimeout: uint - resulting clusterings evaluations timeout
"""
self.syntpos = [] # SyntPathOpts()
self.runalgs = False
self.qmeasures = None # Evaluating quality measures with their parameters
self.qupdate = True
self.qrevalue = False
self.datas = [] # Input datasets, list of PathOpts, where path is either dir or file wildcard
self.timeout = _TIMEOUT
self.algorithms = []
self.seedfile = _SEEDFILE # Seed value for the synthetic networks generation and stochastic algorithms, integer
self.convnets = 0
self.qaggmeta = QAggMeta()
self.qaggopts = None # None means omit the aggregation
# self.aggrespaths = [] # Paths for the evaluated results aggregation (to be done for already existent evaluations)
# WebUI host and port
self.host = _HOST
self.port = _PORT
self.runtimeout = _RUNTIMEOUT
self.evaltimeout = _EVALTIMEOUT
self.memlim = _MEMLIM
def unquote(text):
"""Unqoute the text from ' and "
text: str - text to be unquoted
return text: str - unquoted text
>>> unquote('dfhreh')
'dfhreh'
>>> unquote('"dfhreh"')
'dfhreh'
>>> unquote('"df \\'rtj\\'"') == "df 'rtj'"
True
>>> unquote('"df" x "a"')
'"df" x "a"'
>>> unquote("'df' 'rtj'") == "'df' 'rtj'"
True
>>> unquote('"dfhreh"\\'') == '"dfhreh"\\''
True
>>> unquote('"rtj\\'a "dfh" qw\\'sd"') == 'rtj\\'a "dfh" qw\\'sd'
True
>>> unquote('"\\'dfhreh\\'"')
'dfhreh'
"""
# Ensure that the text is quoted
quotes = '"\'' # Kinds of the resolving quotes
tlen = 0 if not text else len(text) # Text length
if tlen <= 1 or text[0] not in quotes or text[-1] != text[0]:
return text
q = [] # Current quotation with its position
qnum = 0 # The number of quotations to remove
for i in range(tlen):
c = text[i] # Current character (symbol)
if c not in quotes:
continue
# Count opening quotation
if not q or q[-1][0] != c:
q.append((c, i))
continue
# Closing quotation compensates the last opening one
if len(q) == tlen - i and tlen - i - 1 == q[-1][1]:
qnum += 1
else:
qnum = 0
q.pop()
return text[qnum:tlen-qnum] # Unquoted text
# Input parameters processing --------------------------------------------------
def parseParams(args):
"""Parse user-specified parameters
return params: Params
"""
assert isinstance(args, (tuple, list)) and args, 'Input arguments must be specified'
opts = Params()
timemul = 1 # Time multiplier, sec by default
# Always output TIMESTAMP_START_HEADER to both stdout and stderr (once, not per argument)
print(TIMESTAMP_START_HEADER)
print(TIMESTAMP_START_HEADER, file=sys.stderr)
for arg in args:
# Validate input format
if arg[0] != '-':
raise ValueError('Unexpected argument: ' + arg)
# Process long args
if arg[1] == '-':
# Exclusive long options
if arg.startswith('--quality-noupdate'):
opts.qupdate = False
continue
elif arg.startswith('--quality-revalue'):
opts.qrevalue = True
continue
elif arg.startswith('--runtimeout'):
nend = len('--runtimeout')
if len(arg) <= nend + 1 or arg[nend] != '=':
raise ValueError('Unexpected argument: ' + arg)
opts.runtimeout = dhmsSec(arg[nend+1:])
continue
elif arg.startswith('--evaltimeout'):
nend = len('--evaltimeout')
if len(arg) <= nend + 1 or arg[nend] != '=':
raise ValueError('Unexpected argument: ' + arg)
opts.evaltimeout = dhmsSec(arg[nend+1:])
continue
elif arg.startswith('--memlimit'):
nend = len('--memlimit')
if len(arg) <= nend + 1 or arg[nend] != '=':
raise ValueError('Unexpected argument: ' + arg)
opts.memlim = float(arg[nend+1:])
if opts.memlim < 0:
raise ValueError('Non-negative memlim value is expected: ' + arg)
continue
# Normal options
# eif arg.startswith('--std'):
# if arg == '--stderr-stamp': # or arg == '--stdout-stamp':
# #if len(args) == 1:
# # raise ValueError('More input arguments are expected besides: ' + arg)
# print(TIMESTAMP_START_HEADER, file=sys.stderr if arg == '--stderr-stamp' else sys.stdout)
# continue
# raise ValueError('Unexpected argument: ' + arg)
# Note: test the longer '--generate-*' prefixes before '--generate',
# otherwise they would be shadowed by the shorter prefix
elif arg.startswith('--generate-mixed'):
arg = '-m' + arg[len('--generate-mixed'):]
elif arg.startswith('--generate-lreduct'):
arg = '-l' + arg[len('--generate-lreduct'):]
elif arg.startswith('--generate'):
arg = '-g' + arg[len('--generate'):]
elif arg.startswith('--input'):
arg = '-i' + arg[len('--input'):]
elif arg.startswith('--apps'):
arg = '-a' + arg[len('--apps'):]
elif arg.startswith('--runapps'):
arg = '-r' + arg[len('--runapps'):]
elif arg.startswith('--quality'):
arg = '-q' + arg[len('--quality'):]
elif arg.startswith('--timeout'):
arg = '-t' + arg[len('--timeout'):]
elif arg.startswith('--seedfile'):
arg = '-d' + arg[len('--seedfile'):]
elif arg.startswith('--convert'):
arg = '-c' + arg[len('--convert'):]
elif arg.startswith('--summary'):
arg = '-s' + arg[len('--summary'):]
elif arg.startswith('--webaddr'):
arg = '-w' + arg[len('--webaddr'):]
else:
raise ValueError('Unexpected argument: ' + arg)
if arg[1] in 'gml':
# [-g[o][a]=[<number>][{gensepshuf}<shuffles_number>][=<outpdir>]
ib = 2 # Begin index of the argument subparameters
ppeval = False
if arg[1] == 'g':
policy = SyntPolicy.ordinary
syntdir = _SYNTDIR
elif arg[1] == 'm':
policy = SyntPolicy.mixed
syntdir = _SYNTDIR_MIXED
elif arg[1] == 'l':
policy = SyntPolicy.lreduct
syntdir = _SYNTDIR_LREDUCT
if ib < len(arg) and arg[ib] == 'p':
ppeval = True
ib += 1
else:
raise ValueError('Unexpected argument for the SyntPolicy: ' + arg)
syntpo = SyntPathOpts(policy, syntdir, ppeval=ppeval)
opts.syntpos.append(syntpo)
if ib == len(arg):
continue
pos = arg.find('=', ib)
ieflags = pos if pos != -1 else len(arg) # End index of the prefix flags
for i in range(ib, ieflags):
if arg[i] == 'o':
syntpo.overwrite = True # Forced generation (overwrite)
elif arg[i] == 'a':
syntpo.asym = True # Generate asymmetric (directed) networks
else:
raise ValueError('Unexpected argument: ' + arg)
if pos != -1:
# Parse number of instances, shuffles and outpdir: [<instances>][.<shuffles>][=<outpdir>]
val = arg[pos+1:].split('=', 1)
if val[0]:
# Parse number of instances
nums = val[0].split(_GENSEPSHF, 1)
# Now [instances][shuffles][outpdir]
if nums[0]:
syntpo.netins = int(nums[0])
else:
syntpo.netins = 0 # Zero if omitted when shuffles are specified
# Parse shuffles
if len(nums) > 1:
syntpo.shfnum = int(nums[1])
if syntpo.netins < 0 or syntpo.shfnum < 0:
raise ValueError('Value is out of range: netins: {netins} >= 0, shfnum: {shfnum} >= 0'
.format(netins=syntpo.netins, shfnum=syntpo.shfnum))
# Parse outpdir
if len(val) > 1:
if not val[1]: # arg ended with '=' symbol
raise ValueError('Unexpected argument: ' + arg)
syntpo.path = val[1]
syntpo.path = unquote(syntpo.path)
if not syntpo.path.endswith('/'):
syntpo.path += '/'
elif arg[1] == 'i':
# [-i[f][a][{gensepshuf}<shuffles_number>]=<datasets_{{dir,file}}_wildcard>
pos = arg.find('=', 2)
if pos == -1 or arg[2] not in 'pfar=' + _GENSEPSHF or len(arg) == pos + 1:
raise ValueError('Unexpected argument: ' + arg)
# flat - Use flat derivatives or generate the dedicated dir for the derivatives of this network(s)
# asym - asymmetric (directed): None - not specified (symmetric is assumed), False - symmetric, True - asymmetric
# shfnum - the number of shuffles
popt = PathOpts(unquote(arg[pos+1:]), flat=False, asym=False, shfnum=0, ppeval=False) # Remove quotes if they exist
for i in range(2, pos):
if arg[i] == 'p':
popt.ppeval = True
elif arg[i] == 'f':
popt.flat = True
elif arg[i] == 'a':
popt.asym = True
elif arg[i] == 'r':
popt.reshuffle = True
elif arg[i] == _GENSEPSHF:
popt.shfnum = int(arg[i+1:pos])
if popt.shfnum < 0:
raise ValueError('Value is out of range: shfnum: {} >= 0'.format(popt.shfnum))
break
else:
raise ValueError('Unexpected argument: ' + arg)
# Note: flat should not be used with ppeval
assert not popt.flat or not popt.ppeval, 'flat option should not be used with ppeval'
opts.datas.append(popt)
elif arg[1] == 'c':
opts.convnets = 1
for i in range(2, 4):
if len(arg) > i and (arg[i] not in 'fr'):
raise ValueError('Unexpected argument: ' + arg)
arg = arg[2:]
if 'f' in arg:
opts.convnets |= 0b10
if 'r' in arg:
opts.convnets |= 0b100
elif arg[1] == 'a':
if not (arg.startswith('-a=') and len(arg) >= 4):
raise ValueError('Unexpected argument: ' + arg)
inverse = arg[3] == '-' # Consider inversing (run all except)
if inverse and len(arg) <= 4:
raise ValueError('Unexpected argument: ' + arg)
opts.algorithms = unquote(arg[3 + inverse:]).split() # Note: argparse automatically performs this escaping
# Exclude algorithms if required
if opts.algorithms and inverse:
algs = fetchAppnames(benchapps)
try:
for alg in opts.algorithms:
algs.remove(alg)
except ValueError as err:
print('ERROR, "{}" does not exist: {}'.format(alg, err))
raise
opts.algorithms = algs
# Note: all algs are run if not specified
elif arg[1] == 'r':
if len(arg) > 2:
raise ValueError('Unexpected argument: ' + arg)
opts.runalgs = True
elif arg[1] == 'q':
if not (arg == '-q' or (arg.startswith('-q=') and len(arg) >= 4)):
raise ValueError('Unexpected argument: ' + arg)
# Note: this option can be supplied multiple times with various values
if opts.qmeasures is None:
opts.qmeasures = []
# Note: each argument is stored as an array item, which is either a parameter or its value
opts.qmeasures.append(unquote(arg[3:]).split())
elif arg[1] == 's':
if opts.qaggopts is None: # Aggregate all algs on all networks
opts.qaggopts = [] # QAggOpt array
iv0 = 2
if len(arg) == iv0:
continue
# Parse the flags (plot, seeded, exclude) for the summarization
ival = arg.find('=', iv0)
# Parse even if the value part is not specified
if ival == -1:
ival = len(arg)
for i in range(iv0, ival):
if arg[i] == 'p':
opts.qaggmeta.plot = True
elif arg[i] == '*':
opts.qaggmeta.seeded = False
elif arg[i] == '-':
opts.qaggmeta.exclude = True
else:
raise ValueError('Bad argument [{}]: {}'.format(i, arg))
# Parse the values
if ival < len(arg):
opts.qaggopts = QAggOpt.parse(arg[ival+1:])
elif arg[1] == 't':
pos = arg.find('=', 2)
if pos == -1 or arg[2] not in 'smh=' or len(arg) == pos + 1:
raise ValueError('Unexpected argument: ' + arg)
pos += 1
if arg[2] == '=':
opts.timeout = dhmsSec(arg[pos:])
else:
if arg[2] == 'm':
timemul = 60 # Minutes
elif arg[2] == 'h':
timemul = 3600 # Hours
opts.timeout = float(arg[pos:]) * timemul
assert opts.timeout >= 0, 'Non-negative timeout is expected'
elif arg[1] == 'd':
if len(arg) <= 3 or arg[2] != '=':
raise ValueError('Unexpected argument: ' + arg)
opts.seedfile = arg[3:]
elif arg[1] == 'w':
if len(arg) <= 3 or arg[2] != '=':
raise ValueError('Unexpected argument: ' + arg)
# Parse host and port
host = arg[3:]
if host:
isep = host.rfind(':')
if isep != -1:
try:
opts.port = int(host[isep+1:])
opts.host = host[:isep]
except ValueError:
opts.port = _PORT
opts.host = host
else:
opts.host = host
# print('>>> Webaddr specified: {}, parsed host: {}, port: {}'.format(
# host, opts.host, opts.port), file=sys.stderr)
else:
raise ValueError('Unexpected argument: ' + arg)
return opts
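# Example (illustrative): a typical full run could be parameterized as:
#   opts = parseParams(['-g=3%5', '-a=Louvain Oslom2', '-r', '-q', '-sp'])
# which generates 3 instances with 5 shuffles each of the ordinary synthetic
# networks, selects two algorithms to run, requests the quality evaluation
# and aggregates the summary with plotting.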
# Networks processing ----------------------------------------------------------
def generateNets(genbin, policy, insnum, asym=False, basedir=_SYNTDIR, netsdir=_NETSDIR
, overwrite=False, seedfile=_SEEDFILE, gentimeout=3*60*60): # 2-4 hours
"""Generate synthetic networks with ground-truth communities and save generation params.
Previously existed paths with the same name are backed up before being updated.
genbin - the binary used to generate the data (full path or relative to the base benchmark dir)
policy: SyntPolicy - synthetic networks generation policy
insnum - the number of instances of each network to be generated, >= 1
asym - generate asymmetric (specified by arcs, directed) instead of undirected networks
basedir - base directory where data will be generated
netsdir - relative directory for the synthetic networks, contains subdir-s,
each contains all instances of each network and all shuffles of each instance
overwrite - whether to overwrite existing networks or use them
seedfile - seed file name
gentimeout - timeout for all networks generation in parallel mode, >= 0,
0 means unlimited time
"""
paramsdir = 'params/' # Contains networks generation parameters per each network type
seedsdir = 'seeds/' # Contains network generation seeds per each network instance
# Store all instances of each network with generation parameters in the dedicated directory
assert isinstance(policy, SyntPolicy), 'Unexpected policy type: ' + type(policy).__name__
assert insnum >= 1, 'Number of the network instances to be generated must be positive'
assert ((basedir == '' or basedir[-1] == '/') and paramsdir[-1] == '/' and seedsdir[-1] == '/' and netsdir[-1] == '/'
), 'Directory name must have valid terminator'
assert os.path.exists(seedfile), 'The seed file should exist'
paramsdirfull = basedir + paramsdir
seedsdirfull = basedir + seedsdir
netsdirfull = basedir + netsdir
# Initialize backup path suffix if required
if overwrite:
bcksuffix = syncedTime(lock=False) # Use the same backup suffix for multiple paths
# Create dirs if required
for dirname in (basedir, paramsdirfull, seedsdirfull, netsdirfull):
if not os.path.exists(dirname):
os.mkdir(dirname) # Note: mkdir does not create intermediate (non-leaf) dirs
# Backup target dirs on rewriting, removing backed up content
elif overwrite and not dirempty(dirname):
tobackup(dirname, False, bcksuffix, move=True) # Move to the backup
os.mkdir(dirname)
# Initial options for the networks generation
N0 = 1000 # Starting number of nodes
rmaxK = 3 # Min ratio of the max degree relative to the avg degree
# 1K ** 0.618 -> 71, 100K -> 1.2K
def evalmuw(mut):
"""Evaluate LFR muw"""
assert isinstance(mut, Number), 'A number is expected'
return mut * 1.05 # 0.75
def evalmaxk(genopts):
"""Evaluate LFR maxk"""
# 0.618 is 1/golden_ratio; sqrt(n), but not less than rmaxK times of the average degree
# => average degree should be <= N/rmaxK
return int(max(genopts['N'] ** 0.618, genopts['k']*rmaxK))
def evalminc(genopts):
"""Evaluate LFR minc"""
return 2 + int(sqrt(genopts['N'] / N0))
def evalmaxc(genopts):
"""Evaluate LFR maxc"""
return int(genopts['N'] / 3)
def evalon(genopts, mixed):
"""Evaluate LFR on
mixed: bool - the number of overlapping nodes 'on' depends on the topology mixing
"""
if mixed:
return int(genopts['N'] * genopts['mut']**2) # The number of overlapping nodes
return int(genopts['N'] ** 0.618)
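# Worked example (for illustration): with N=1000, k=5 and mut=0.275 the evaluators
# above yield muw = 0.275 * 1.05 ~= 0.289, maxk = int(max(1000**0.618, 5*3)) = 71,
# minc = 2 + int(sqrt(1000/1000)) = 3, maxc = int(1000/3) = 333, and
# on = int(1000 * 0.275**2) = 75 if it depends on the topology mixing, else int(1000**0.618) = 71.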
# Template of the generating options files
# mut: external cluster links / total links
# Note: currently the same options template is used for all generation policies
genopts = {'beta': 1.5, 't1': 1.75, 't2': 1.35, 'om': 2, 'cnl': 1} # beta: 1.35, 1.2 ... 1.618; t1: 1.65,
# Defaults: beta: 1.5, t1: 2, t2: 1
# Generate options for the networks generation using chosen variations of params
if policy == SyntPolicy.ordinary:
varNmul = (1, 5, 20, 50) # *N0 - sizes of the generating networks in thousands of nodes; Note: 100K on max degree works more than 30 min; 50K -> 15 min
vark = (5, 25, 75) # Average node degree (density of the network links)
varMut = (0.275,)
else:
varNmul = (10,) # *N0 - sizes of the generating networks in thousands of nodes; Note: 100K on max degree works more than 30 min; 50K -> 15 min
vark = (20,) # Average node degree (density of the network links)
varMut = tuple(0.05 * i for i in range(10)) if policy == SyntPolicy.mixed else (0.275,)
#varNmul = (1, 5) # *N0 - sizes of the generating networks in thousands of nodes; Note: 100K on max degree works more than 30 min; 50K -> 15 min
#vark = (5, 25) # Average node degree (density of the network links)
assert vark[-1] <= round(varNmul[0] * 1000 / rmaxK), 'Avg vs max degree validation failed'
#varkr = (0.5, 1, 5) #, 20) # Average relative density of network links in percents of the number of nodes
global _execpool
assert _execpool is None, 'The global execution pool should not exist'
# Note: AffinityMask.CORE_THREADS - set affinity in a way to maximize the CPU cache L1/2 for each process
# 1 - maximizes parallelization => overall execution speed
with ExecPool(_WPROCSMAX, afnmask=AffinityMask(1)
, memlimit=_VMLIMIT, name='gennets_' + policy.name, webuiapp=_webuiapp) as _execpool:
bmname = os.path.split(genbin)[1] # Benchmark name
genbin = os.path.relpath(genbin, basedir) # Update path to the executable relative to the job workdir
# Copy benchmark seed to the syntnets seed
randseed = basedir + 'lastseed.txt' # Random seed file name
shutil.copy2(seedfile, randseed)
# namepref = '' if policy == SyntPolicy.ordinary else policy.name[0]
# namesuf = '' if policy != SyntPolicy.lreduct else '_0'
netext = dflnetext(asym) # Network file extension (should have the leading '.')
asymarg = ['-a', '1'] if asym else None # Whether to generate directed (specified by arcs) or undirected (specified by edges) network
for nm in varNmul:
N = nm * N0
for k in vark:
for mut in varMut:
netgenTimeout = max(nm * k / 1.5, 30) # ~ up to 30 min (>= 30 sec) per network instance (50K nodes on K=75 takes ~15-35 min)
name = 'K'.join((str(nm), str(k))) #.join((namepref, namesuf))
if len(varMut) >= 2:
name += 'm{:02}'.format(int(round(mut*100))) # Omit '0.' prefix and show exactly 2 digits padded with 0: 0.05 -> m05
ext = '.ngp' # Network generation parameters
# Generate network parameters files if not exist
fnamex = name.join((paramsdirfull, ext))
if overwrite or not os.path.exists(fnamex):
print('Generating {} parameters file...'.format(fnamex))
with open(fnamex, 'w') as fout:
genopts.update({'N': N, 'k': k, 'mut': mut, 'muw': evalmuw(mut)})
genopts.update({'maxk': evalmaxk(genopts), 'minc': evalminc(genopts), 'maxc': evalmaxc(genopts)
, 'on': evalon(genopts, policy == SyntPolicy.ordinary), 'name': name})
for opt in viewitems(genopts):
fout.write(''.join(('-', opt[0], ' ', str(opt[1]), '\n')))
else:
assert os.path.isfile(fnamex), '{} should be a file'.format(fnamex)
# Recover the seed file if it exists
netseed = name.join((seedsdirfull, '.ngs'))
if os.path.isfile(netseed):
shutil.copy2(netseed, randseed)
if _DEBUG_TRACE:
print('The seed {netseed} is retained (but inapplicable for the shuffles)'.format(netseed=netseed))
# Generate networks with ground truth corresponding to the parameters
if os.path.isfile(fnamex):
netpath = name.join((netsdir, '/')) # syntnets/networks/<netname>/ netname.*
netparams = name.join((paramsdir, ext)) # syntnets/params/<netname>.<ext>
xtimebin = os.path.relpath(UTILDIR + 'exectime', basedir)
jobseed = os.path.relpath(netseed, basedir)
# Generate required number of network instances
netpathfull = basedir + netpath
if not os.path.exists(netpathfull):
os.mkdir(netpathfull)
startdelay = 0.1 # Required to start execution of the LFR benchmark before copying the time_seed for the following process
def startgen(name, inst=0):
"""Start generation of the synthetic network instance
name: str - base network name
inst: int - instance index
"""
print(' Starting generation {}^{}'.format(name, inst))
assert isinstance(name, str) and isinstance(inst, int), ('Unexpected arguments type, name: {}, inst: {}'.
format(type(name).__name__, type(inst).__name__))
netname = name if not inst else ''.join((name, SEPINST, str(inst)))
netfile = netpath + netname
if not overwrite and os.path.exists(netfile.join((basedir, netext))):
return
print(' > Starting the generation job')
args = [xtimebin, '-n=' + netname, ''.join(('-o=', bmname, EXTRESCONS)) # Output .rcp in the current dir, basedir
, genbin, '-f', netparams, '-name', netfile, '-seed', jobseed]
if asymarg:
args.extend(asymarg)
# Consider links reduction policy, which should produce multiple instances with reduced links
if policy == SyntPolicy.lreduct:
args = (PYEXEC, '-c',
# Network instance generation with subsequent links reduction
"""from __future__ import print_function #, division # Required for stderr output, must be the first import
import subprocess
import os
import sys
import traceback # Required by the exception handler below (format_exc)
sys.path.append('{benchdir}')
from utils.remlinks import remlinks
subprocess.check_call({args}) # Raises exception on failed call
# Form the path and file name for the network with reduced links
netfile = '{netfile}{netext}'
path, netname = os.path.split('{netfile}')
basepath, dirname = os.path.split(path)
iinst = netname.rfind('{SEPINST}') # Index of the instance suffix
if iinst == -1:
iinst = len(netname)
for i in range(1, 16, 2): # 1 .. 15% with step 2
# Use zero padding of number to have proper ordering of the filename for easier per-pair comparison
istr = '{{:02}}'.format(i) # str(i)
rlsuf = ''.join(('{SEPLRD}', istr, 'p'))
rlname = ''.join((netname[:iinst], rlsuf, netname[iinst:]))
rlpath = '/'.join((basepath, dirname + rlsuf))
frlname = '/'.join((rlpath, rlname))
# Produce file with the reduced links
try:
remlinks(istr + '%', netfile, frlname + '{netext}')
# Link the ground-truth with updated name
os.symlink(os.path.relpath(os.path.splitext(netfile)[0] + '{EXTCLSNDS}', rlpath), frlname + '{EXTCLSNDS}')
except Exception as err: #pylint: disable=W0703
print('ERROR on links reduction making {{}}: {{}}, discarded. {{}}'
.format(frlname + '{netext}', err, traceback.format_exc(5)), file=sys.stderr)
""".format(benchdir=os.getcwd(), args=args, netfile=netfile, netext=netext, SEPLRD=SEPLRD, SEPINST=SEPINST, EXTCLSNDS=EXTCLSNDS)) # Skip the shuffling if the respective file already exists
#Job(name, workdir, args, timeout=0, rsrtonto=False, onstart=None, ondone=None, tstart=None)
# , workdir=basedir
_execpool.execute(Job(name=netname, workdir=basedir, args=args, timeout=netgenTimeout, rsrtonto=True
#, onstart=lambda job: shutil.copy2(randseed, job.name.join((seedsdirfull, '.ngs'))) # Network generation seed
, onstart=lambda job: shutil.copy2(randseed, netseed) #pylint: disable=W0640; Network generation seed
#, ondone=shuffle if shfnum > 0 else None
, startdelay=startdelay, category='generate_' + str(k), size=N))
netfile = netpath + name
if _DEBUG_TRACE:
print('Generating {netfile} as {name} by {netparams}'.format(netfile=netfile, name=name, netparams=netparams))
# if insnum and overwrite or not os.path.exists(netfile.join((basedir, netext))):
# args = [xtimebin, '-n=' + name, ''.join(('-o=', bmname, EXTRESCONS)) # Output .rcp in the current dir, basedir
# , genbin, '-f', netparams, '-name', netfile, '-seed', jobseed]
# if asymarg:
# args.extend(asymarg)
# #Job(name, workdir, args, timeout=0, rsrtonto=False, onstart=None, ondone=None, tstart=None)
# _execpool.execute(Job(name=name, workdir=basedir, args=args, timeout=netgenTimeout, rsrtonto=True
# #, onstart=lambda job: shutil.copy2(randseed, job.name.join((seedsdirfull, '.ngs'))) # Network generation seed
# , onstart=lambda job: shutil.copy2(randseed, netseed) #pylint: disable=W0640; Network generation seed
# #, ondone=shuffle if shfnum > 0 else None
# , startdelay=startdelay, category='generate_' + str(k), size=N))
for i in range(insnum):
startgen(name, i)
# nameinst = ''.join((name, SEPINST, str(i)))
# netfile = netpath + nameinst
# if overwrite or not os.path.exists(netfile.join((basedir, netext))):
# args = [xtimebin, '-n=' + nameinst, ''.join(('-o=', bmname, EXTRESCONS))
# , genbin, '-f', netparams, '-name', netfile, '-seed', jobseed]
# if asymarg:
# args.extend(asymarg)
# #Job(name, workdir, args, timeout=0, rsrtonto=False, onstart=None, ondone=None, tstart=None)
# _execpool.execute(Job(name=nameinst, workdir=basedir, args=args, timeout=netgenTimeout, rsrtonto=True
# #, onstart=lambda job: shutil.copy2(randseed, job.name.join((seedsdirfull, '.ngs'))) # Network generation seed
# , onstart=lambda job: shutil.copy2(randseed, netseed) #pylint: disable=W0640; Network generation seed
# #, ondone=shuffle if shfnum > 0 else None
# , startdelay=startdelay, category='generate_' + str(k), size=N))
else:
print('ERROR, network parameters file "{}" does not exist'.format(fnamex), file=sys.stderr)
print('Parameter files generation completed')
if gentimeout <= 0:
gentimeout = insnum * netgenTimeout
# Note: insnum*netgenTimeout is max time required for the largest instances generation,
# insnum*2 to consider all smaller networks
try:
_execpool.join(min(gentimeout, insnum*2*netgenTimeout))
except BaseException as err: # Consider also system interruptions not captured by the Exception
print('WARNING, network generation execution pool is interrupted by: {}. {}'
.format(err, traceback.format_exc(5)), file=sys.stderr)
raise
_execpool = None
print('Synthetic networks files generation completed')
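# Example (illustrative; the extended LFR binary path is hypothetical):
#   generateNets('./lfrbench_udwov', SyntPolicy.ordinary, insnum=3)
# generates 3 instances of each ordinary synthetic network under _SYNTDIR using
# the seed from _SEEDFILE, reusing already existing networks since overwrite=False.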
def shuffleNets(datas, timeout1=7*60, shftimeout=30*60): # 7, 30 min
"""Shuffle specified networks backing up and updating existent shuffles.
Existing shuffles with the target name are skipped, redundant are deleted,
lacked are formed.
datas - input datasets, PathOpts with wildcards of files or directories
containing files of the default extensions .ns{{e,a}}
timeout1 - timeout for a single shuffle, >= 0
shftimeout - total shuffling timeout, >= 0, 0 means unlimited time
"""
if not datas:
return
assert isinstance(datas[0], PathOpts), 'datas must be a container of PathOpts'
assert timeout1 + 0 >= 0, 'Non-negative shuffling timeout is expected'