-
Notifications
You must be signed in to change notification settings - Fork 0
/
simulator.py
executable file
·1326 lines (1199 loc) · 61.6 KB
/
simulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/python3
from configparser import ConfigParser
from os import path, makedirs, get_terminal_size
import argparse
import h5py
import time
import warnings
import numpy as np
import bisect
import re
import itertools
import random
from scipy.stats import poisson
# Import general configuration
general_config = ConfigParser()
config_path = path.join(path.dirname(__file__), 'config.ini')
general_config.read(config_path)
# Import model configuration
model_config = ConfigParser()
model_config.read(path.join(path.dirname(__file__), 'models', general_config['simulation']['model'] + '.ini'))
py_module = 'models.' + model_config['module']['name']
model_module = __import__(py_module, fromlist=[''])
# Print HDF5 file contents
def print_name_type(name, obj):
print(name, type(obj))
# Increments number of file name if already exists
def uniquify(path_name):
"""
Uniquify the name of a file by adding a 0 to the end or incrementing the final number.
:param str path_name: Path to the file which name is going to be uniquified.
:return: Path for the new file name.
"""
filename, extension = path.splitext(path_name)
counter = 0
while path.exists(path_name):
path_name = filename + "_" + str(counter) + extension
counter += 1
return path_name
# Delete random elements from a list, used for emigration
def delete_random_elems(input_list, n):
"""
Delete random elements from a list.
:param list input_list: List from which random elements will be deleted.
:param int n: Number of elements to delete.
:return: New list with random elements deleted.
"""
for i in range(n):
group_index = random.randrange(len(input_list))
ind_index = random.randrange(len(input_list[group_index]))
input_list[group_index].pop(ind_index)
return input_list
# Length of a nested list, used for calculating the size of a groups structured population
def nested_len(nested_list):
"""
Calculate length of a nested list.
:param list[list] nested_list: It is mandatory that the list contains only lists with the objects to be counted.
:return: The number of elements in the lists of the input list.
"""
length = 0
for group in nested_list:
length += len(group)
return length
class Chromosome:
"""
One copy of the two from a locus, contains a list of SNVs and an allele.
:param str allele: Name of the allele of this Chromosome.
:ivar np.array snvs: List of floats representing mutations in an infinite sited model genome.
"""
def __init__(self, allele):
self.__snvs = np.array([])
self.__allele = allele
@property
def snvs(self):
return self.__snvs
@snvs.setter
def snvs(self, value):
self.__snvs = value
@property
def allele(self):
return self.__allele
def mutate(self, locus_size, mutation_rate):
"""
Adds mutations as floating point numbers to the SNVs list.
:param int locus_size:
:param float mutation_rate:
"""
# Number of mutations calculated based on a Poisson distribution
mean_mutations = mutation_rate * locus_size
n_mutations = poisson.rvs(mean_mutations)
mutations = np.sort([random.random() for _ in range(n_mutations)])
mutations_indexes = np.searchsorted(self.__snvs, mutations)
self.__snvs = np.insert(self.__snvs, mutations_indexes, mutations)
if not list(np.sort(self.__snvs)) == list(self.__snvs):
print(np.sort(self.__snvs), self.__snvs)
raise Exception('SNVs list is not ordered')
# Discontinued: SNVs are now floats instead of objects for performance
def snvs_to_positions(self):
return np.array([snv.position for snv in self.__snvs])
def snvs_to_sequence(self, locus_size):
"""
Save SNVs lists to haplotype format with 0 representing ancestral variant and 1 representing mutated
:param int locus_size: Size of the locus used for calculating the position of the floats mutations
:return: The chromosome's SNVs as a haplotype
"""
# Position of the mutations
mutations = [round(snv * locus_size) for snv in self.__snvs]
sequence = np.zeros(locus_size)
for mutation in mutations:
if mutation != locus_size:
# If the position is already mutated, to avoid recurrent mutations, the next position is mutated
if sequence[mutation] == 1 and mutation + 1 != locus_size:
sequence[mutation + 1] = 1
else:
sequence[mutation] = 1
return sequence
class Locus:
"""
Locus object of the genome containing two chromosome, each one with an allele.
:param list[Chromosome] chromosomes: Lists of the two Chromosome objects of the locus.
:param str name: Name of the locus.
:param int locus_size: Size of the locus for calculating mutation and recombination based on rate.
:param float mutation_rate: Probability of mutation in every position in parts per unit.
:param float recombination_rate: Probability of recombination in every position in parts per unit.
"""
def __init__(self, chromosomes, name, locus_size, mutation_rate, recombination_rate):
self.__chromosomes = chromosomes
self.__name = name
self.__locus_size = locus_size
self.__mutation_rate = mutation_rate
self.__recombination_rate = recombination_rate
@property
def chromosomes(self):
return self.__chromosomes
@chromosomes.setter
def chromosomes(self, value):
self.__chromosomes = value
@property
def name(self):
return self.__name
@property
def locus_size(self):
return self.__locus_size
@property
def mutation_rate(self):
return self.__mutation_rate
@property
def recombination_rate(self):
return self.__recombination_rate
def recombine(self, crossovers=None):
"""
Recombine the chromosomes of the locus based on its recombination rate and size.
"""
mean_crossovers = self.__recombination_rate * self.__locus_size
# Crossover can be determined only for testing, in the simulation they are calculated at random
if not crossovers:
number_crossovers = poisson.rvs(mean_crossovers)
crossovers = [random.random() for _ in range(number_crossovers)]
for crossover_point in crossovers:
# Index of crossover in each of the chromosomes
crossover_index_0 = bisect.bisect_left(self.__chromosomes[0].snvs, crossover_point)
crossover_index_1 = bisect.bisect_left(self.__chromosomes[1].snvs, crossover_point)
# Chromosomes 0 gets the first part of itself and the second part of chromosome 1
chromosome_0 = np.concatenate((
self.__chromosomes[0].snvs[:crossover_index_0],
self.__chromosomes[1].snvs[crossover_index_1:]))
# Chromosomes 1 gets the first part of itself and the second part of chromosome 0
chromosome_1 = np.concatenate((
self.__chromosomes[1].snvs[:crossover_index_1],
self.__chromosomes[0].snvs[crossover_index_0:]))
# Recombined chromosomes are set to the attribute to be recombined again
self.__chromosomes[0].snvs = chromosome_0
self.__chromosomes[1].snvs = chromosome_1
class Genome:
"""
Genome object which has all the genetic information. It contains all the loci of the genetic model.
:param list[Locus] loci: List of Locus objects.
"""
def __init__(self, loci):
self.__loci = loci
@property
def loci(self):
return self.__loci
@property
def locus_names(self):
locus_names = []
for locus in self.__loci:
locus_names.append(locus.name)
return locus_names
def haplotype(self):
"""
Haplotypes of both the copies of each locus.
:return: Tuple of haplotypes.
"""
full_haplotype = []
for locus in self.__loci:
locus_haplotype = []
for chromosome in locus.chromosomes:
locus_haplotype.append(chromosome.snvs_to_sequence(locus.locus_size))
full_haplotype.append(locus_haplotype)
return tuple(full_haplotype)
class Individual:
"""
Individual object.
:param Simulation simulation: Object of the simulation which contains information needed from the Individual object.
:ivar int age: Age of the individual, each generation increases in 1.
:ivar int life_expectancy: Age at which the individual will die calculated from a normal distribution.
:ivar Genome genome: Genome object initialized with empty Locus objects.
:ivar list genotype: List of lists containing the alleles in each locus.
:ivar list phenotype: Lists of the phenotype for each locus.
:ivar float initial_survival_probability: The initial survival probability does not change across the simulation,
is calculated from a normal distribution and is inherited.
:ivar float survival_probability: Variable survival probability, can change with altruism.
:ivar int id: ID of the individual used for calculating relatedness, automatic incremental number.
:ivar list ancestry: List with the IDs of the ancestors of the individual, each sublist represents a generation
"""
def __init__(self, simulation):
self.__simulation = simulation
self.__age = 0
self.__life_expectancy = round(np.random.normal(simulation.life_expectancy, simulation.life_expectancy_sd))
if self.__life_expectancy == 0:
self.__life_expectancy = 1
loci = []
for locus in simulation.loci:
locus_size = int(simulation.model_config_dict[locus]['locus_size'])
mutation_rate = float(simulation.model_config_dict[locus]['mutation_rate'])
recombination_rate = float(simulation.model_config_dict[locus]['recombination_rate'])
loci.append(Locus([], locus, locus_size, mutation_rate, recombination_rate))
self.__genome = Genome(loci)
self.__genotype = []
self.__phenotype = []
self.__initial_survival_probability = np.random.normal(simulation.survival_probability_mean,
simulation.survival_probability_sd)
self.__survival_probability = self.__initial_survival_probability
self.__id = simulation.newest_ind_id + 1
# The newest ID of the simulation is updated for the next initialization of an individual
simulation.newest_ind_id = self.__id
self.__ancestry = []
for i in range(simulation.ancestry_generations):
# List of lists with the ancestors in each generation
self.__ancestry.append([0] * 2 ** (i + 1))
@property
def age(self):
return self.__age
@property
def life_expectancy(self):
return self.__life_expectancy
@life_expectancy.setter
def life_expectancy(self, value):
self.__life_expectancy = value
@property
def genome(self):
return self.__genome
@genome.setter
def genome(self, value):
self.__genome = value
@property
def genotype(self):
return self.__genotype
@genotype.setter
def genotype(self, value):
"""
The genotype is a lists with a list for each locus with the alleles of that locus. With the genotype,
the phenotype is calculated and the Genome object is completed
:param list[list[str]] value: Genotype of the individual as a list of lists containing the alleles of each locus
"""
self.__genotype = value
self.generate_phenotype()
self.generate_genome()
@property
def phenotype(self):
return self.__phenotype
@phenotype.setter
def phenotype(self, value):
self.__phenotype = value
@property
def initial_survival_probability(self):
return self.__initial_survival_probability
@initial_survival_probability.setter
def initial_survival_probability(self, value):
self.__initial_survival_probability = value
self.__survival_probability = value
@property
def survival_probability(self):
return self.__survival_probability
@survival_probability.setter
def survival_probability(self, value):
self.__survival_probability = value
@property
def id(self):
return self.__id
@id.setter
def id(self, value):
self.__id = value
@property
def ancestry(self):
return self.__ancestry
@ancestry.setter
def ancestry(self, value):
self.__ancestry = value
def generate_phenotype(self):
"""
Calculates the phenotype considering the genotype and inheritance pattern of the locus.\n
Inheritance symbols are:\n
> : Left dominant\n
= : codominant
"""
phenotype = []
for i in range(len(self.__simulation.loci)):
locus = self.__simulation.loci[i]
# If both alleles are the same the phenotype is that allele
if self.__genotype[i][0] == self.__genotype[i][1]:
phenotype.append(self.__genotype[i][0])
else:
# ?: Non capturing group
# Matches >, =, any whitespace character, end of line or beginning of line
inheritance_divider = r'(?:>|=|\s|$|^)'
characters_between_alleles = re.search(
f'{inheritance_divider}{self.__genotype[i][0]}{inheritance_divider}(.*){inheritance_divider}'
f'{self.__genotype[i][1]}{inheritance_divider}',
self.__simulation.model_config_dict[locus]['alleles'])
reverse = False
if not characters_between_alleles:
characters_between_alleles = re.search(
f'{inheritance_divider}{self.__genotype[i][1]}{inheritance_divider}(.*){inheritance_divider}'
f'{self.__genotype[i][0]}{inheritance_divider}',
self.__simulation.model_config_dict[locus]['alleles'])
reverse = True
# Dominance-recessiveness
if '>' in characters_between_alleles.group(1):
if reverse:
phenotype.append(self.__genotype[i][1])
else:
phenotype.append(self.__genotype[i][0])
# Codominance
else:
chosen_phenotype = self.__genotype[i][0] + '_' + self.__genotype[i][1]
# [locus][0] is equal to locus's alleles
if chosen_phenotype in self.__simulation.loci_properties[locus][0]:
phenotype.append(chosen_phenotype)
else:
phenotype.append(self.__genotype[i][1] + '_' + self.__genotype[i][0])
self.phenotype = phenotype
def generate_genome(self):
"""
Adds Chromosome objects to the genome with the genotype assigned to the individual.
:return:
"""
for locus, genotype in zip(self.__genome.loci, self.__genotype):
chromosomes = []
for allele in genotype:
chromosomes.append(Chromosome(allele))
locus.chromosomes = chromosomes
def age_individual(self):
"""
Adds one to the age of the individual.
:return: True if individual has reached its life expectancy.
"""
self.__age += 1
return self.__age == self.__life_expectancy
class Simulation:
"""
Simulation object that contains all the information that the simulations
needs to run and all the individuals structured in groups.
:param int generations: Number of generations in the simulation.
:param int group_number: Number of groups in the simulation.
:param int group_size: Initial size of all groups in generation 0.
:param int group_size_limit: Size limit of a group.
:param int population_size_limit: Limit of the size of the whole population.
:param float descendants_per_survivor: Average number of descendants that each survivor will have.
:param float group_migration: Proportion of migrating individuals in each group.
:param float emigration: Proportion of emigrants of the whole populations.
:param float immigration: Proportion of immigrants of the whole populations.
:param str immigration_phenotype: List of phenotypes per locus of the immigrants.
:param int life_expectancy: Mean of the life expectancy normal distribution.
:param float life_expectancy_sd: Standard deviation of the life expectancy normal distribution.
:param float survival_probability_mean: Mean of the survival probability normal distribution.
:param float life_expectancy_sd: Standard deviation of the survival probability normal distribution.
:param int ancestry_generations: Number of generations taken into account in the pedigree for relatedness.
:param str output_file_name: Name of the HDF5 file where the summary of the simulation will be stored.
:param dictionary model_config_dict: Dictionary with the configuration of the genetic model.
:param bool test: If True prevents creating output file when running tests.
"""
def __init__(
self,
generations,
group_number,
group_size,
group_size_limit,
population_size_limit,
descendants_per_survivor,
group_migration,
emigration,
immigration,
immigration_phenotype,
life_expectancy,
life_expectancy_sd,
survival_probability_mean,
survival_probability_sd,
ancestry_generations,
output_file_name,
model_config_dict,
test):
self.__generations = generations + 1
self.__group_number = group_number
self.__group_size = group_size
self.__groups = []
self.__calc_avg_survival_prob = []
self.__exp_avg_survival_prob = []
self.__groups_indices = np.arange(self.__group_number)
self.__total_groups = self.__group_number
self.__group_size_limit = group_size_limit
self.__population_size_limit = population_size_limit
self.__descendants_per_survivor = descendants_per_survivor
self.__group_migration = group_migration
self.__emigration = emigration
self.__immigration = immigration
if immigration_phenotype:
self.__immigration_phenotype = immigration_phenotype.replace(' ', '').split(',')
self.__life_expectancy = life_expectancy
self.__life_expectancy_sd = life_expectancy_sd
self.__survival_probability_mean = survival_probability_mean
self.__survival_probability_sd = survival_probability_sd
self.__generation = 0
self.__ancestry_generations = ancestry_generations
self.__model_config_dict = model_config_dict
self.__loci_properties = {}
self.__alleles_combinations = []
self.__newest_ind_id = 0
self.__output_file_name = output_file_name
self.__stop = False
self.__test = test
'''
Example of model_config_dict:
{'module': {'name': 'blind_altruism_genomes'}, # Model name
'behaviour': {'alleles': 'selfish > altruistic', # Locus' alleles and inheritance pattern
'initial_frequencies': '0.5, 0.5', # Initial allele's frequencies in order of appearance
'locus_size': '0', # Size of the loci for mutation and recombination
'mutation_rate': '0', # Mutation probability on a position of the genome
'recombination_rate': '0'}, # Recombination probability on a position of the genome
'neutral': {'alleles': 'neutral', # Locus' alleles and inheritance pattern
'initial_frequencies': '1', # Initial allele's frequencies in order of appearance
'locus_size': '0', # Size of the loci for mutation and recombination
'mutation_rate': '0', # Mutation probability on a position of the genome
'recombination_rate': '0'} # Recombination probability on a position of the genome
} '''
for locus in self.__model_config_dict.keys():
# Ignore module name
if locus != 'module':
# Invalid allele's name
incorrect_character = re.search(r'[^\w\s=>]', self.__model_config_dict[locus]['alleles'])
if incorrect_character:
raise Exception(f'Character "{incorrect_character.group(0)}" in allele not valid')
# Saving allele names for each locus
allele_options = re.split(r'\s*>\s*|\s*=\s*', self.__model_config_dict[locus]['alleles'])
noncodominant_allele_options = allele_options.copy()
# Saving codominant phenotypes for each locus
codominant_alleles_raw = re.findall(r'[\w=]*=[\w=]*',
self.__model_config_dict[locus]['alleles'].replace(' ', ''))
for element in codominant_alleles_raw:
for combination in itertools.combinations(element.split('='), 2):
codominant_phenotype = combination[0] + '_' + combination[1]
allele_options.append(codominant_phenotype)
for allele in allele_options:
if re.search(r'\s', allele):
raise Exception(f'Missing inheritance symbol in "{allele}", allele names can\'t have spaces')
initial_frequencies = re.split(r'\s*,\s*', self.__model_config_dict[locus]['initial_frequencies'])
if len(initial_frequencies) != len(noncodominant_allele_options) and \
len(initial_frequencies) != len(noncodominant_allele_options) - 1:
raise Exception(f'Incorrect number of allele frequencies given in locus "{locus}",'
f'there must be same number as alleles or one less')
# Initial frequencies are defined by a portion of the range from 0 to 1
# First frequency range is set manually as it needn't be calculated
initial_frequencies_ranges = [float(initial_frequencies[0])]
for i in range(len(initial_frequencies))[1:]:
new_range = float(initial_frequencies[i]) + initial_frequencies_ranges[i - 1]
if new_range > 1:
raise Exception(f'Error in locus "{locus}", the sum of allele frequencies cannot exceed 1')
initial_frequencies_ranges.append(new_range)
if len(initial_frequencies_ranges) != len(noncodominant_allele_options):
initial_frequencies_ranges.append(1)
elif initial_frequencies_ranges[-1] < 1:
raise Exception(f'Error in locus "{locus}", the sum of allele frequencies must be 1')
self.__loci_properties[locus] = (allele_options, initial_frequencies_ranges)
# Saving allele combinations of the different loci and their order
all_alleles = []
for locus in self.loci:
# [locus][0] are the locus's alleles
all_alleles.append(self.__loci_properties[locus][0])
all_alleles_combinations = list(itertools.product(*all_alleles))
for i in range(len(all_alleles_combinations)):
allele_combination_name = ''
for element in all_alleles_combinations[i]:
allele_combination_name += element + '&'
if allele_combination_name[:-1] not in self.__alleles_combinations:
self.__alleles_combinations.append(allele_combination_name[:-1])
if self.__immigration != 0:
if type(self.__immigration_phenotype) == str:
self.__immigration_phenotype = [self.__immigration_phenotype]
self.__immigrants_genotype = []
print(self.loci, self.__immigration_phenotype)
if len(self.loci) != len(self.__immigration_phenotype):
raise Exception('Incorrect number of phenotypes specified for immigrants')
for allele, locus_i in zip(self.__immigration_phenotype, range(len(self.__immigration_phenotype))):
if allele not in self.__loci_properties[self.loci[locus_i]][0]:
raise Exception(f'{allele} invalid immigrant phenotype in locus {self.loci[locus_i]}')
self.__immigrants_genotype.append([allele, allele])
@property
def generations(self):
return self.__generations
@property
def current_generation(self):
return self.__generation
@property
def generation(self):
return self.__generation
@generation.setter
def generation(self, value):
self.__generation = value
@property
def ancestry_generations(self):
return self.__ancestry_generations
@property
def groups(self):
return self.__groups
@groups.setter
def groups(self, value):
self.__groups = value
@property
def total_groups(self):
return self.__total_groups
@property
def life_expectancy(self):
return self.__life_expectancy
@property
def life_expectancy_sd(self):
return self.__life_expectancy_sd
@property
def survival_probability_mean(self):
return self.__survival_probability_mean
@property
def survival_probability_sd(self):
return self.__survival_probability_sd
@property
def loci_properties(self):
return self.__loci_properties
@property
def loci(self):
return list(self.__loci_properties.keys())
@property
def dict_allele_options(self):
alleles = {}
for locus in self.loci:
# [locus][0] is equal to locus's alleles
alleles[locus] = self.__loci_properties[locus][0]
return alleles
@property
def alleles_combinations(self):
return self.__alleles_combinations
@property
def newest_ind_id(self):
return self.__newest_ind_id
@newest_ind_id.setter
def newest_ind_id(self, value):
self.__newest_ind_id = value
@property
def model_config_dict(self):
return self.__model_config_dict
@property
def stop(self):
return self.__stop
def generate_individual_genotype(self):
"""
Randomize a genotype based on initial locus frequencies.
"""
allele_pairs = []
for locus in self.loci:
alleles = []
for i in range(2):
random_picker = random.random()
# [locus][1] is equal to locus's alleles ranges
for j in range(len(self.__loci_properties[locus][1])):
if random_picker < self.__loci_properties[locus][1][j]:
# [locus][0] is equal to locus's alleles
alleles.append(self.__loci_properties[locus][0][j])
break
allele_pairs.append(alleles)
return allele_pairs
def populate_groups(self):
"""
Initializes the groups with the specified number of individuals in each one and creates the output HDF5 file.
"""
if not self.__test:
# Creates the output file if the class was not called from a test
with h5py.File(self.__output_file_name, 'w') as _:
pass
for group_i in range(self.__group_number):
group = []
for ind_i in range(self.__group_size):
individual = Individual(self)
individual.genotype = self.generate_individual_genotype()
group.append(individual)
self.__groups.append(group)
def reset_survival_prob(self):
"""
Resets the survival probability of each individual to its initial state.
The effect of altruism in one generation only lasts for that generation
"""
for group in self.__groups:
for ind in group:
ind.survival_probability = ind.initial_survival_probability
def selection_event(self):
"""
Survival or death based on the survival probability of the individual. Represents foraging, predators, etc.
"""
survivors = []
# np.array for saving which individuals have survived and which haven't
survived = np.zeros(0)
survived_list_index = 0
viable_groups = False
for group in self.__groups:
survivors_groups = []
# The np.array is resized to add the rest of the individuals
new_survived_size = survived_list_index + len(group)
survived.resize((new_survived_size,), refcheck=False)
for ind_i in range(len(group)):
picker = random.random()
# The individual will survive if a random generated number
# between 0 and 1 is below its survival probability
if picker < group[ind_i].survival_probability:
survivors_groups.append(group[ind_i])
# 1 indicates that the individual has survived
survived[survived_list_index + ind_i] = 1
else:
# 0 indicates that the individual has died
survived[survived_list_index + ind_i] = 0
survivors.append(survivors_groups)
survived_list_index += len(group)
# If there is at least one viable group the simulation will continue
if len(survivors_groups) > 2:
viable_groups = True
if not viable_groups:
self.__stop = True
self.__groups = survivors
# If the method was not called from a test, the survivors data is stored in the output file
if not self.__test:
with h5py.File(self.__output_file_name, 'a') as f:
generation_group = f[f'/generation_{str(self.current_generation).zfill(len(str(self.__generations)))}']
generation_group.create_dataset('survivors', data=survived)
def save_avg_survival_prob(self):
"""
The average survival probability of the individuals before and after altruism is stored. Groups where the
difference between expected and gotten is higher will be rewarded because they have helped each other
for the benefit of the group.
"""
calc_avg_survival_prob = []
exp_avg_survival_prob = []
for group in self.__groups:
calc_survival_prob = np.zeros(len(group))
exp_survival_prob = np.zeros(len(group))
for ind_i in range(len(group)):
# survival_probability can be modified by altruism
calc_survival_prob[ind_i] = group[ind_i].survival_probability
# initial_survival_probability can not be modified by altruism and is inherited
exp_survival_prob[ind_i] = group[ind_i].initial_survival_probability
calc_avg_survival_prob.append(np.mean(calc_survival_prob))
exp_avg_survival_prob.append(np.mean(exp_survival_prob))
self.__calc_avg_survival_prob = calc_avg_survival_prob
self.__exp_avg_survival_prob = exp_avg_survival_prob
def generate_immigrant(self):
"""
Generates a new individuals with the configured immigrants' genotype.
:return: Immigrant individual with the configured genotype and no SNVs in genome
"""
individual = Individual(self)
# Immigrants are homozygous
genotype = [[allele, allele] for allele in self.__immigration_phenotype]
individual.genotype = genotype
return individual
def migration(self):
"""
Emigration and immigration at the population level. The number of emigrants and immigrants are calculated
based on the population size and the proportion configured. The emigrants come from random groups and
immigrants enter random groups, the size of the group is not considered.
"""
population_size = nested_len(self.__groups)
if self.__emigration != 0:
# Groups are selected at random
self.__groups = delete_random_elems(self.__groups, round(population_size * self.__emigration))
if self.__immigration != 0:
immigrants = round(population_size * self.__immigration)
for i in range(immigrants):
# Groups are selected at random
group_index = random.choice(range(len(self.__groups)))
self.__groups[group_index].append(self.generate_immigrant())
def delete_empty_groups(self):
"""
Groups with zero individuals are deleted from the groups list. As the group of each individual is saved
as output data, the original indices of the groups are stored.
"""
for group_i in range(len(self.__groups) - 1, - 1, - 1):
if len(self.__groups[group_i]) < 2:
self.__groups.pop(group_i)
np.delete(self.__groups_indices, group_i)
def discard_old_individuals(self):
"""
Deleted the individuals that will reach their life expectancy in the next generation. These individuals will
leave offspring but will be deleted before the beginning of the next generation
:return: Groups without the individuals that will reach their life expectancy in the next generation
"""
# If all the individuals have a life expectancy of 1,
# the generations are not overlapped and all of them will die
if self.__life_expectancy == 1 and self.__life_expectancy_sd == 0:
new_groups = [[] for _ in range(len(self.__groups))]
else:
new_groups = []
for group in self.__groups:
new_group = []
for ind in group:
reached_life_expectancy = ind.age_individual()
if not reached_life_expectancy:
new_group.append(ind)
new_groups.append(new_group)
return new_groups
def calc_new_groups_missing(self, new_groups):
"""
Calculates the number of individuals that will be born in each group.
:param list[list[Individual]] new_groups: Groups with the survivors of the generations
and without the individuals that will
die of old age in the next generation.
:return: List of missing individuals per group.
"""
# Descendants per survivor
expect_new_inds = [len(group) * self.__descendants_per_survivor for group in self.__groups]
# Modified survival probabilities / Initial survival probabilities ratio
calc_exp_sp_ratios = [self.__calc_avg_survival_prob[group_i] / self.__exp_avg_survival_prob[group_i]
for group_i in range(len(self.__groups))]
# Modified number of newborns based on ratio Modified survival probabilities / Initial survival probabilities
calc_new_inds = [expect_new_inds[group_i] * calc_exp_sp_ratios[group_i]
for group_i in range(len(self.__groups))]
# Scaled number of newborns to be the same as the expected calculated
scaled_new_inds = [round(group_size * sum(expect_new_inds) / sum(calc_new_inds))
for group_size in calc_new_inds]
missing_population = self.__population_size_limit - nested_len(new_groups)
# If adding the newborns is going to make the population exceed the size limit, all the sizes are scaled down.
if sum(scaled_new_inds) > missing_population:
scaled_new_inds = [round(group_size * missing_population / sum(scaled_new_inds))
for group_size in scaled_new_inds]
return scaled_new_inds
def generate_offspring_genome(self, reproducers, descendant):
"""
Generates the genome of the offspring based on the reproducers genomes.
For each locus, there is a 50/50 chance of inherit one of the two chromosomes per parent.
Before the chromosome is inherited, it is mutated and recombined.
:param list[Individual] reproducers: List with the sire and the dam of the descendant, the order is irrelevant.
:param Individual descendant: Offspring that the generated genome will be assigned to.
"""
# Genotype in name of alleles format used in the altruistic step
genotype = []
# Actual Locus objects for generating the Genome object
loci = []
for locus_index in range(len(reproducers[0].genome.loci)):
genotype.append([])
chromosomes = []
# For each locus, each reproducer contribute with one allele (Chromosome object)
for reproducer in reproducers:
# The locus of the parent is cloned
locus = reproducer.genome.loci.copy()[locus_index]
if locus.recombination_rate != 0:
locus.recombine()
# The chromosome that will be inherited is selected at random between the two
chromosome = random.choice(locus.chromosomes)
if locus.mutation_rate != 0:
if locus.mutation_rate * locus.locus_size == 0:
warnings.warn(f'Mutation rate times locus size is equal to 0 in locus {locus.name}, '
f'there will be no mutations')
chromosome.mutate(locus.locus_size, locus.mutation_rate)
chromosomes.append(chromosome)
genotype[locus_index].append(chromosome.allele)
# The chromosomes list is ready to initialize the Locus object
name = self.loci[locus_index]
# The information of the locus is extracted from one of the parents, it is the same for every individual
locus_size = reproducers[0].genome.loci[locus_index].locus_size
mutation_rate = reproducers[0].genome.loci[locus_index].mutation_rate
recombination_rate = reproducers[0].genome.loci[locus_index].recombination_rate
# The Locus object is initialized and stored in the list that will be passed to the Genome object
loci.append(Locus(chromosomes, name, locus_size, mutation_rate, recombination_rate))
descendant.genotype = genotype
descendant.genome = Genome(loci)
def generate_offspring_ancestry(self, reproducers, descendant):
"""
Generates the ancestry list of the offspring based on the reproducers ancestries. The first sublist will be
the IDs of the reproducers and the rest will be extracted from their own ancestry.
:param list[Individual] reproducers: List with the sire and the dam of the descendant, the order is irrelevant.
:param Individual descendant: Offspring that the generated ancestry will be assigned to.
"""
for generation in range(self.__ancestry_generations):
if generation == 0:
# First generation of the ancestry are the parents
descendant.ancestry[generation] = [reproducers[0].id, reproducers[1].id]
else:
# The rest are extracted from the parents, the first half is one parent and the second the other
descendant.ancestry[generation] = reproducers[0].ancestry[generation - 1] + \
reproducers[1].ancestry[generation - 1]
def divide_big_groups(self, groups):
"""
Groups that have reach the group size limit are divided in two at random. Families are not kept together.
:param list[Individuals] groups: Groups of individuals.
"""
divided_groups = []
# If any group is divided the indices for saving the summary of the simulation need to be updated
new_groups_indices = []
for group_i in range(len(groups)):
if len(groups[group_i]) > self.__group_size_limit:
# The index of the group to be divided is deleted, the two new groups will have new indices
np.delete(self.__groups_indices, group_i)
# Individuals are added to the group by age, the group is shuffled to avoid grouping
# older individuals with older individuals and younger with younger
random.shuffle(groups[group_i])
# First half of the group is added as another group
divided_groups.append(groups[group_i][:round(len(groups[group_i]) / 2)].copy())
new_groups_indices.append(self.__groups_indices[group_i])
# The original group index will be the second half
divided_groups.append(groups[group_i][round(len(groups[group_i]) / 2):].copy())
new_groups_indices.append(max(max(self.__groups_indices), max(new_groups_indices)) + 1)
self.__total_groups += 1
else:
divided_groups.append(groups[group_i])
new_groups_indices.append(self.__groups_indices[group_i])
self.__groups_indices = np.array(new_groups_indices)
self.__groups = divided_groups
def group_exchange(self, survivors, newborns):
"""
Migration at group level. The individuals born in the current generation, if any, will be the migrants.
:param list[list[Individual]] survivors: List of groups of individuals that will pass to the next generation.
:param list[list[Individual]] newborns: List of groups of individuals born in the current generation.
:return: Lists of joined survivors and newborns with group exchange.
"""
# If there is more than one group and there is group migration, there will be exchange between groups
if len(survivors) > 1 and self.__group_migration > 0:
for from_group_index in range(len(survivors)):
group_size = len(survivors[from_group_index]) + len(newborns[from_group_index])
# Number of emigrants of the group
emigrants = round(group_size * self.__group_migration)
target_indexes = list(range(len(survivors)))
# Possible target groups
target_indexes.remove(from_group_index)
# If there have been newborn in the group, the migrants will be selected from them
if len(newborns[from_group_index]) != 0:
# If there are enough newborns
if len(newborns[from_group_index]) > emigrants:
# The emigrants are selected at random from the group and sent to a random group
for i in range(emigrants):
emigrant = newborns[from_group_index].pop(random.randrange(len(newborns[from_group_index])))
target_group_index = random.choice(target_indexes)
survivors[target_group_index].append(emigrant)
# If there are not enough newborns, the rest will be selected from the individuals
# that were already in the population
else:
emigrants_left = emigrants - len(newborns[from_group_index])
for i in range(len(newborns[from_group_index])):
emigrant = newborns[from_group_index].pop(random.randrange(len(newborns[from_group_index])))
target_group_index = random.choice(target_indexes)
survivors[target_group_index].append(emigrant)
for i in range(emigrants_left):
emigrant = survivors[from_group_index].pop(
random.randrange(len(survivors[from_group_index])))
target_group_index = random.choice(target_indexes)
survivors[target_group_index].append(emigrant)
# If there are no newborns, but there are individuals from the previous generation (no individuals
# were born) the migrants will be selected from them.
elif len(survivors[from_group_index]) != 0:
for i in range(emigrants):
emigrant = survivors[from_group_index].pop(random.randrange(len(survivors[from_group_index])))
target_group_index = random.choice(target_indexes)
survivors[target_group_index].append(emigrant)
# The survivors groups are joined with the newborns after the migration
for group_i in range(len(survivors)):
survivors[group_i].extend(newborns[group_i])
return survivors
# There will be no migration
else:
for group_i in range(len(survivors)):
survivors[group_i].extend(newborns[group_i])
return survivors
def reproduce(self):
"""
The reproduction occurs inside groups and is independent of the other groups. Parents are selected at random
from the individuals of the group.
"""
self.delete_empty_groups()
new_groups = self.discard_old_individuals()
offspring = [[] for _ in range(len(self.__groups))]
new_group_missing = self.calc_new_groups_missing(new_groups)
for group_index in range(len(self.__groups)):
group_size = new_group_missing[group_index]
while len(offspring[group_index]) < group_size:
reproducers = random.sample(self.__groups[group_index], 2)