Skip to content

Commit

Permalink
updated combined workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
LucR31 committed Nov 6, 2023
1 parent 649878a commit dcc9fdf
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 35 deletions.
65 changes: 46 additions & 19 deletions aiida_flexpart/workflows/multi_dates_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
FlexpartCalculation = plugins.CalculationFactory('flexpart.cosmo')
FlexpartIfsCalculation = plugins.CalculationFactory('flexpart.ifs')

#possible models
cosmo_models = ['cosmo7', 'cosmo1', 'kenda1']
ECMWF_models = ['IFS_GL_05', 'IFS_GL_1', 'IFS_EU_02', 'IFS_EU_01']

def get_simulation_period(date,
age_class_time,
release_duration,
Expand Down Expand Up @@ -45,6 +49,7 @@ def define(cls, spec):
spec.input('simulation_dates', valid_type=orm.List,
help='A list of the starting dates of the simulations')
spec.input('model', valid_type=orm.Str, required=True)
spec.input('model_offline', valid_type=orm.Str, required=True)
spec.input('offline_integration_time', valid_type=orm.Int)
spec.input('integration_time', valid_type=orm.Int, help='Integration time in hours')
spec.input("parent_calc_folder",
Expand All @@ -53,9 +58,6 @@ def define(cls, spec):
help="Working directory of a previously ran calculation to restart from."
)

#outline variables
spec.input('run_cosmo',valid_type=orm.Bool, required=True)

#model settings
spec.input('input_phy', valid_type=orm.Dict)
spec.input('command', valid_type=orm.Dict)
Expand All @@ -66,10 +68,12 @@ def define(cls, spec):
#meteo related inputs
spec.input('meteo_inputs', valid_type=orm.Dict,
help='Meteo models input params.')
spec.input('meteo_inputs_offline', valid_type=orm.Dict,required=False,
help='Meteo models input params.')
spec.input('meteo_path', valid_type=orm.RemoteData,
required=True, help='Path to the folder containing the meteorological input data.')
spec.input('meteo_path_ifs', valid_type=orm.RemoteData,
required=True, help='Path to the folder containing the meteorological input data.')
spec.input('meteo_path_offline', valid_type=orm.RemoteData,
required=False, help='Path to the folder containing the meteorological input data.')
spec.input('gribdir', valid_type=orm.Str, required=True)

#others
Expand Down Expand Up @@ -101,7 +105,13 @@ def define(cls, spec):
if_(cls.prepare_meteo_folder_cosmo)(
cls.run_cosmo_simulation
)
).else_(
),
if_(cls.run_ifs)(
if_(cls.prepare_meteo_folder_ifs)(
cls.run_ifs_simulation
)
),
if_(cls.run_offline)(
if_(cls.prepare_meteo_folder_ifs)(
cls.run_ifs_simulation
)
Expand All @@ -114,7 +124,18 @@ def condition(self):
return True if self.ctx.index < len(self.ctx.simulation_dates) else False

def run_cosmo(self):
return True if self.inputs.run_cosmo else False
return True if self.inputs.model in cosmo_models else False

def run_ifs(self):
return True if self.inputs.model in ECMWF_models else False

def run_offline(self):
if self.inputs.model_offline in ECMWF_models and self.inputs.model is not None:
self.ctx.index-=1
return True
elif self.inputs.model_offline in ECMWF_models and self.inputs.model is None:
return True
return False

def setup(self):
"""Prepare a simulation."""
Expand Down Expand Up @@ -227,7 +248,7 @@ def run_cosmo_simulation(self):
# Ask the workflow to continue when the results are ready and store them in the context
running = self.submit(builder)
self.to_context(calculations=engine.append_(running))

self.ctx.index += 1

def run_ifs_simulation(self):
Expand All @@ -236,35 +257,41 @@ def run_ifs_simulation(self):
self.report(f'Running FIFS for {self.ctx.simulation_dates[self.ctx.index]}')
builder = FlexpartIfsCalculation.get_builder()
builder.code = self.inputs.fifs_code

#changes in the command file
new_dict = self.ctx.command.get_dict()
new_dict['simulation_date'] = self.ctx.simulation_dates[self.ctx.index]
if self.ctx.offline_integration_time>0:

if self.inputs.model_offline in ECMWF_models:
new_dict['age_class'] = self.ctx.offline_integration_time * 3600
new_dict['dumped_particle_data'] = True

if self.inputs.model is not None:
self.ctx.parent_calc_folder = self.ctx.calculations[-1].outputs.remote_folder
self.report(f'starting from: {self.ctx.parent_calc_folder}')
else:
self.ctx.parent_calc_folder = self.inputs.parent_calc_folder

builder.meteo_path = self.inputs.meteo_path_offline
builder.parent_calc_folder = self.ctx.parent_calc_folder
new_dict.update(self.inputs.meteo_inputs_offline)
else:
new_dict['age_class'] = self.inputs.integration_time * 3600
new_dict.update(self.inputs.meteo_inputs)
new_dict['convection_parametrization'] = 1
builder.meteo_path = self.inputs.meteo_path
new_dict.update(self.inputs.meteo_inputs)

#model settings
builder.model_settings = {
'release_settings': self.ctx.release_settings,
'locations': self.ctx.locations,
'command': orm.Dict(dict=new_dict),
}

builder.outgrid = self.ctx.outgrid
builder.outgrid_nest = self.ctx.outgrid_nest
builder.species = self.ctx.species
builder.land_use = self.inputs.land_use_ifs
builder.meteo_path = self.inputs.meteo_path_ifs

#remote folder from cosmo calc
if 'parent_calc_folder' in self.inputs:
builder.parent_calc_folder = self.inputs.parent_calc_folder


# Walltime, memory, and resources.
builder.metadata.description = 'Test workflow to submit a flexpart calculation'
builder.metadata.options = self.inputs.flexpartifs.metadata.options
Expand Down
44 changes: 28 additions & 16 deletions examples/example_workflow_combi.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ def simulation_dates_parser(date_list: list) -> list:
def test_run(flexpart_code):
"""Run workflow."""

simulation_dates = simulation_dates_parser(['2021-01-03'])
simulation_dates = simulation_dates_parser(['2021-01-07, 2021-01-08'])
model = 'cosmo7'
model_offline = 'IFS_GL_05'
username='lfernand'
users_address=f'/users/{username}/resources/flexpart/'
scratch_address=f'/scratch/snx3000/{username}/FLEXPART_input/'
Expand All @@ -75,14 +77,9 @@ def test_run(flexpart_code):
surfdepo = orm.RemoteData(
remote_path = users_address+'surfdepo.t',
computer=flexpart_code.computer)
meteo_path = orm.RemoteData(
remote_path=scratch_address+'cosmo7/',
computer = flexpart_code.computer)
meteo_path_ifs = orm.RemoteData(
remote_path = scratch_address+'IFS_GL_05',
computer=flexpart_code.computer)
#parent_folder = orm.load_node(pk previous tsk)
parent_folder = orm.RemoteData(
remote_path = scratch_address+'/',
remote_path = '/scratch/snx3000/lfernand/aiida/76/8d/cb2c-2fc6-46c4-b609-1d33fce0f60c',
computer=flexpart_code.computer)

#builder starts
Expand All @@ -96,18 +93,33 @@ def test_run(flexpart_code):
#basic settings
builder.simulation_dates = simulation_dates
builder.integration_time = orm.Int(24)
builder.run_cosmo = orm.Bool(False)
builder.offline_integration_time = orm.Int(0)
builder.offline_integration_time = orm.Int(48)

#meteo realted settings
builder.model=orm.Str('cosmo7')
builder.model = orm.Str(model)
builder.model_offline = orm.Str(model_offline)

meteo_path = orm.RemoteData(
remote_path=scratch_address+model+'/',
computer = flexpart_code.computer)
builder.meteo_path = meteo_path
builder.meteo_path_ifs = meteo_path_ifs
builder.gribdir=orm.Str(scratch_address)
builder.meteo_inputs = orm.Dict(
dict=read_yaml_data('inputs/meteo_inputs.yaml', names=[
'cosmo7',
])['cosmo7'])
model,
])[model])

if model_offline is not None:
meteo_path_offline = orm.RemoteData(
remote_path = scratch_address+model_offline,
computer=flexpart_code.computer)
builder.meteo_path_offline = meteo_path_offline
builder.meteo_inputs_offline = orm.Dict(
dict=read_yaml_data('inputs/meteo_inputs.yaml', names=[
model_offline,
])[model_offline])

builder.gribdir=orm.Str(scratch_address)


#model settings
builder.command = orm.Dict(
Expand Down Expand Up @@ -141,7 +153,7 @@ def test_run(flexpart_code):
'surfdata': surfdata,
'surfdepo': surfdepo,
}
#builder.parent_calc_folder = parent_folder
builder.parent_calc_folder = parent_folder
builder.flexpart.metadata.options.stash = {
'source_list': ['aiida.out', 'grid_time_*.nc'],
'target_base': f'/store/empa/em05/{username}/aiida_stash',
Expand Down

0 comments on commit dcc9fdf

Please sign in to comment.