diff --git a/eMERLIN_CASA_pipeline.py b/eMERLIN_CASA_pipeline.py index 333bf8a..671d00c 100644 --- a/eMERLIN_CASA_pipeline.py +++ b/eMERLIN_CASA_pipeline.py @@ -49,31 +49,6 @@ def load_obj(name): return pickle.load(f) -def join_lists(x=[]): - x1 = ','.join(x) - x2 = set(x1.split(',')) - return ','.join(x2) - -# Sources to process -sources = {} -sources['targets'] = inputs['targets'] -sources['phscals'] = inputs['phscals'] -sources['fluxcal'] = inputs['fluxcal'] -sources['bpcal'] = inputs['bpcal'] -sources['ptcal'] = inputs['ptcal'] -sources['calsources'] = join_lists([sources['phscals'], sources['fluxcal'],sources['bpcal'], sources['ptcal']]) -sources['maincal'] = join_lists([sources['fluxcal'],sources['bpcal'], sources['ptcal']]) -sources['allsources'] = join_lists([sources['calsources'], sources['targets']]) -sources['no_fluxcal'] = sources['allsources'].replace(sources['fluxcal'], '').replace(',,',',').strip(',') -sources['cals_no_fluxcal'] = sources['calsources'].replace(sources['fluxcal'], '').replace(',,',',').strip(',') -sources['targets_phscals'] = join_lists([sources['targets'],sources['phscals']]) - -logger.info('Targets: {0}'.format(sources['targets'])) -logger.info('Phasecals: {0}'.format(sources['phscals'])) -logger.info('Fluxcal: {0}'.format(sources['fluxcal'])) -logger.info('Bandpass: {0}'.format(sources['bpcal'])) -logger.info('Pointcal: {0}'.format(sources['ptcal'])) - # Flags applied to the data by the pipeline try: flags = load_obj('./flags') @@ -95,8 +70,6 @@ def join_lists(x=[]): fitsfile = inputs['inbase']+'.fits' msfile = inputs['inbase']+'.ms' -save_obj(sources, 'sources') - ## Check for measurement sets in current directory otherwise drag from defined data directory if os.path.isdir(inputs['inbase']+'.ms') == False and os.path.isdir(inputs['inbase']+'.mms') == False: if os.path.isdir(data_dir+inputs['inbase']+'.mms') == True: @@ -108,12 +81,16 @@ def join_lists(x=[]): else: logger.info('Measurement set found: {}. Continuing with your inputs'.format(msfile)) + +################################# +### LOAD AND PREPROCESS DATA ### +################################# + ## Pipeline processes, inputs are read from the inputs dictionary if inputs['run_importfits'] == 1: em.run_importfitsIDI(data_dir,msfile) em.check_mixed_mode(msfile,mode='split') - if inputs['hanning'] == 1: em.hanning(inputvis=msfile,deloriginal=True) @@ -127,24 +104,33 @@ def join_lists(x=[]): else: em.ms2mms(vis=msfile,mode='parallel') -## check for parallelisation +### check for parallelisation if os.path.isdir('./'+inputs['inbase']+'.mms') == True: msfile = inputs['inbase']+'.mms' +### Create dictionary with ms information. +# It will try to remake the msinfo dictionary from the msfile. If the msfile is +# not there, it will try to load the pkl file. If nothing there (means that we +# don't care about the unaveraged data), nothing is done. +if os.path.isdir(msfile): + msinfo = em.get_msinfo(msfile, inputs) +else: + try: + msinfo = load_ob(msfile) + except: + pass + ### Run AOflagger if inputs['flag_0_aoflagger'] == 1: flags = em.run_aoflagger_fields(vis=msfile, flags=flags, fields='all', pipeline_path = pipeline_path) - ### Produce some initial plots ### if inputs['prediag'] == 1: em.do_prediagnostics(msfile,plots_dir) - ### A-priori flagdata: Lo&Mk2, edge channels, standard quack if inputs['flag_1_apriori'] == 1: - flags = em.flagdata1_apriori(msfile=msfile, sources=sources, flags=flags, - antennas=antennas, do_quack=True) + flags = em.flagdata1_apriori(msfile=msfile, msinfo=msinfo, flags=flags, do_quack=True) ### Load manual flagging file if inputs['flag_2a_manual'] == 1: @@ -153,7 +139,7 @@ def join_lists(x=[]): ### Average data ### if inputs['average_1'] == 1: - em.run_split(msfile, fields=sources['allsources'], sources=sources, width=4, timebin='2s') + em.run_split(msfile, msinfo, width=4, timebin='2s') # Check if averaged data already generated if os.path.isdir('./'+inputs['inbase']+'_avg.mms') == True: @@ -165,40 +151,17 @@ def join_lists(x=[]): logger.info('Using MS file: {0}'.format(msfile)) +### Create dictionary with ms information. +if os.path.isdir(msfile): + msinfo = em.get_msinfo(msfile, inputs) ### Load manual flagging file if inputs['flag_2b_manual'] == 1: flags = em.flagdata2_manual(msfile=msfile, inpfile=inputs['manual_flags_b'], flags=flags) +### Defining reference antenna +msinfo['refant'] = em.define_refant(msfile, msinfo, inputs) -### Retrieve MS information -# Sources in the MS -sources['msfile_fields'] = ','.join(vishead(msfile,mode='list',listitems='field')['field'][0]) -logger.info('Sources in MS {0}: {1}'.format(msfile, sources['msfile_fields'])) - -# Antenna list and reference antenna -ms.open(msfile) -d = ms.getdata(['axis_info'],ifraxis=True) -ms.close() -antennas = np.unique('-'.join(d['axis_info']['ifr_axis']['ifr_name']).split('-')) - -logger.info('Antennas in MS: {0}'.format(antennas)) - -# Defining reference antenna -refant = inputs['refant'] -refant_user = refant.replace(' ', '').split(',') -refant_in_ms = (np.array([ri in antennas for ri in refant_user])).all() - -if not refant_in_ms: - if refant != '': - logger.warning('Selected reference antenna(s) {0} not in MS! User selection will be ignored'.format(refant)) - # Finding best antennas for refant - refant, refant_pref = em.find_refant(msfile, field=sources['bpcal'], - antennas='Mk2,Pi,Da,Kn', spws='2,3', scan='') -else: - refant = ','.join(refant_user) - -logger.info('Refant: {}'.format(refant)) ################### @@ -211,54 +174,52 @@ def join_lists(x=[]): caltables = load_obj(calib_dir+'caltables') logger.info('Loaded previous calibration tables from: {0}'.format(calib_dir+'caltables.pkl')) except: - num_spw = len(vishead(msfile, mode = 'list', listitems = ['spw_name'])['spw_name'][0]) caltables = {} caltables['inbase'] = inputs['inbase'] caltables['plots_dir'] = plots_dir caltables['calib_dir'] = calib_dir - caltables['num_spw'] = num_spw - caltables['refant'] = refant + caltables['num_spw'] = msinfo['num_spw'] + caltables['refant'] = msinfo['refant'] logger.info('New caltables dictionary created. Saved to: {0}'.format(calib_dir+'caltables.pkl')) ### Initialize models ### if inputs['init_models'] == 1: # Need to add parameter to GUI models_path = pipeline_path+'calibrator_models/' - em.run_initialize_models(msfile=msfile, fluxcal=sources['fluxcal'], + em.run_initialize_models(msfile=msfile, fluxcal=msinfo['sources']['fluxcal'], models_path=models_path, - delmod_sources=sources['no_fluxcal']) + delmod_sources=msinfo['sources']['no_fluxcal']) ### Initial BandPass calibration ### if inputs['bandpass_0'] > 0: caltables = em.initial_bp_cal(msfile=msfile, caltables=caltables, - previous_cal=[], bpcal=sources['bpcal']) + previous_cal=[], bpcal=msinfo['sources']['bpcal']) save_obj(caltables, calib_dir+'caltables') save_obj(caltables, calib_dir+'caltables_bandpass0') if inputs['bandpass_0'] == 2: - em.run_applycal(msfile=msfile, caltables=caltables, sources=sources, - previous_cal=['bpcal.B0'], - previous_cal_targets=['bpcal.B0']) + em.run_applycal(msfile=msfile, caltables=caltables, + sources=msinfo['sources'], previous_cal=['bpcal.B0'], + previous_cal_targets=['bpcal.B0']) ### Flagdata using TFCROP and bandpass shape B0 if inputs['flag_3_tfcropBP'] == 1: # If B0 has not been applied before, do it now if inputs['bandpass_0'] != 2: - em.run_applycal(msfile=msfile, caltables=caltables, sources=sources, - previous_cal=['bpcal.B0'], - previous_cal_targets=['bpcal.B0']) - flags = em.flagdata3_tfcropBP(msfile=msfile, sources=sources, flags=flags) + em.run_applycal(msfile=msfile, caltables=caltables, sources=msinfo['sources'], + previous_cal=['bpcal.B0'], previous_cal_targets=['bpcal.B0']) + flags = em.flagdata3_tfcropBP(msfile=msfile, msinfo=msinfo, flags=flags) ### Delay calibration ### if inputs['delay'] > 0: caltables = em.solve_delays(msfile=msfile, caltables=caltables, - previous_cal=['bpcal.B0'], calsources=sources['calsources']) + previous_cal=['bpcal.B0'], calsources=msinfo['sources']['calsources']) # Should the previous_cal be bpcal.B0? Probably better delay fit, but later # delay.K1 is applied without bpcal.B0, when bpcal_sp.B1 is computed save_obj(caltables, calib_dir+'caltables') save_obj(caltables, calib_dir+'caltables_delay') if inputs['delay'] == 2: - em.run_applycal(msfile=msfile, caltables=caltables, sources=sources, + em.run_applycal(msfile=msfile, caltables=caltables, sources=msinfo['sources'], previous_cal=['bpcal.B0','delay.K1'], previous_cal_targets=['bpcal.B0','delay.K1']) @@ -266,36 +227,39 @@ def join_lists(x=[]): ### Initial gain calibration ### if inputs['gain_0_p_ap'] > 0: caltables = em.initial_gaincal(msfile=msfile, caltables=caltables, - previous_cal=['delay.K1', 'bpcal.B0'], - calsources=sources['calsources'], phscals=sources['phscals']) + previous_cal=['delay.K1', 'bpcal.B0'], + calsources=msinfo['sources']['calsources'], + phscals=msinfo['sources']['phscals']) save_obj(caltables, calib_dir+'caltables') save_obj(caltables, calib_dir+'caltables_gaincal') if inputs['gain_0_p_ap'] == 2: - em.run_applycal(msfile=msfile, caltables=caltables, sources=sources, + em.run_applycal(msfile=msfile, caltables=caltables, sources=msinfo['sources'], previous_cal=['delay.K1','allcal_p.G0','allcal_ap.G1','bpcal.B0'], previous_cal_targets=['delay.K1','phscal_p_scan.G2','allcal_ap.G1','bpcal.B0']) ### Flux scale ### if inputs['fluxscale'] > 0: caltables = em.eM_fluxscale(msfile=msfile, caltables=caltables, - sources=sources, - ampcal_table='allcal_ap.G1', antennas=antennas) + sources=msinfo['sources'], + ampcal_table='allcal_ap.G1', + antennas=msinfo['antennas']) save_obj(caltables, calib_dir+'caltables') save_obj(caltables, calib_dir+'caltables_fluxscale') if inputs['fluxscale'] == 2: - em.run_applycal(msfile=msfile, caltables=caltables, sources=sources, - previous_cal=['delay.K1','allcal_p.G0','allcal_ap.G1_fluxscaled','bpcal.B0'], - previous_cal_targets=['delay.K1','phscal_p_scan.G2','allcal_ap.G1_fluxscaled','bpcal.B0']) + em.run_applycal(msfile=msfile, caltables=caltables, + sources=msinfo['sources'], + previous_cal=['delay.K1','allcal_p.G0','allcal_ap.G1_fluxscaled','bpcal.B0'], + previous_cal_targets=['delay.K1','phscal_p_scan.G2','allcal_ap.G1_fluxscaled','bpcal.B0']) ### BandPass calibration with spectral index information ### if inputs['bandpass_1_sp'] > 0: caltables = em.bandpass_sp(msfile=msfile, caltables=caltables, previous_cal=['delay.K1','allcal_p.G0','allcal_ap.G1_fluxscaled'], - bpcal=sources['bpcal']) + bpcal=msinfo['sources']['bpcal']) save_obj(caltables, calib_dir+'caltables') save_obj(caltables, calib_dir+'caltables_bandpass_sp') if inputs['bandpass_1_sp'] == 2: - em.run_applycal(msfile=msfile, caltables=caltables, sources=sources, + em.run_applycal(msfile=msfile, caltables=caltables, sources=msinfo['sources'], previous_cal=['delay.K1','allcal_p.G0','allcal_ap.G1_fluxscaled','bpcal_sp.B1'], previous_cal_targets=['delay.K1','phscal_p_scan.G2','allcal_ap.G1_fluxscaled','bpcal_sp.B1']) @@ -303,22 +267,32 @@ def join_lists(x=[]): if inputs['gain_1_amp_sp'] > 0: caltables = em.sp_amp_gaincal(msfile=msfile, caltables=caltables, previous_cal=['delay.K1','allcal_p.G0','bpcal_sp.B1'], - calsources=sources['calsources']) + calsources=msinfo['sources']['calsources']) save_obj(caltables, calib_dir+'caltables') save_obj(caltables, calib_dir+'caltables_gaincal') if inputs['gain_1_amp_sp'] == 2: - em.run_applycal(msfile=msfile, caltables=caltables, sources=sources, - previous_cal=['delay.K1','bpcal_sp.B1','allcal_p.G0','allcal_ap.G3'], - previous_cal_targets=['delay.K1','bpcal_sp.B1','phscal_p_scan.G2','allcal_ap.G3']) + em.run_applycal(msfile=msfile, caltables=caltables, + sources=msinfo['sources'], + previous_cal=['delay.K1','bpcal_sp.B1','allcal_p.G0','allcal_ap.G3'], + previous_cal_targets=['delay.K1','bpcal_sp.B1','phscal_p_scan.G2','allcal_ap.G3']) ### Apply calibration ### if inputs['applycal_all'] > 0: - em.run_applycal(msfile=msfile, caltables=caltables, sources=sources, + em.run_applycal(msfile=msfile, caltables=caltables, sources=msinfo['sources'], previous_cal=['delay.K1','bpcal_sp.B1','allcal_p.G0','allcal_ap.G3'], previous_cal_targets=['delay.K1','bpcal_sp.B1','phscal_p_scan.G2','allcal_ap.G3']) +### Run monitoring for bright sources: +try: + if inputs['monitoring'] == 1: + flags, caltables = em.monitoring(msfile=msfile, msinfo=msinfo, + flags=flags, caltables=caltables, + previous_cal=['']) +except: + pass + logger.info('Pipeline finished') logger.info('#################') diff --git a/functions/eMERLIN_CASA_functions.py b/functions/eMERLIN_CASA_functions.py index f20ac4c..a98df5f 100644 --- a/functions/eMERLIN_CASA_functions.py +++ b/functions/eMERLIN_CASA_functions.py @@ -15,6 +15,7 @@ from tasks import * from casa import * from recipes.setOrder import setToCasaOrder +import datetime import logging logger = logging.getLogger('logger') @@ -181,6 +182,109 @@ def check_band(msfile): band = 'K' return band +def get_baselines(msfile): + ms.open(msfile) + baselines0 = ms.getdata(['axis_info'],ifraxis=True)['axis_info']['ifr_axis']['ifr_name'] + ms.close() + baselines = [] + for bsl_name in baselines0: + ant = bsl_name.split('-') + bsl = bsl_name.replace('-', '&') + if ant[0] != ant[1]: + baselines.append(bsl) + return baselines + +def get_dates(d): + t_mjd = d['axis_info']['time_axis']['MJDseconds']/60./60./24. + t = np.array([mjdtodate(timei) for timei in t_mjd]) + return t_mjd, t + + +def mjdtodate(mjd): + origin = datetime.datetime(1858,11,17) + date = origin + datetime.timedelta(mjd) + return date + +def join_lists(x=[]): + x1 = ','.join(x) + x2 = set(x1.split(',')) + return ','.join(x2) + +def user_sources(inputs): + sources = {} + sources['targets'] = inputs['targets'] + sources['phscals'] = inputs['phscals'] + sources['fluxcal'] = inputs['fluxcal'] + sources['bpcal'] = inputs['bpcal'] + sources['ptcal'] = inputs['ptcal'] + sources['calsources'] = join_lists([sources['phscals'], sources['fluxcal'],sources['bpcal'], sources['ptcal']]) + sources['maincal'] = join_lists([sources['fluxcal'],sources['bpcal'], sources['ptcal']]) + sources['allsources'] = join_lists([sources['calsources'], sources['targets']]) + sources['no_fluxcal'] = sources['allsources'].replace(sources['fluxcal'], '').replace(',,',',').strip(',') + sources['cals_no_fluxcal'] = sources['calsources'].replace(sources['fluxcal'], '').replace(',,',',').strip(',') + sources['targets_phscals'] = join_lists([sources['targets'],sources['phscals']]) + logger.info('Targets: {0}'.format(sources['targets'])) + logger.info('Phasecals: {0}'.format(sources['phscals'])) + logger.info('Fluxcal: {0}'.format(sources['fluxcal'])) + logger.info('Bandpass: {0}'.format(sources['bpcal'])) + logger.info('Pointcal: {0}'.format(sources['ptcal'])) + return sources + +def get_antennas(msfile): + # Antenna list + ms.open(msfile) + d = ms.getdata(['axis_info'],ifraxis=True) + ms.close() + antennas = np.unique('-'.join(d['axis_info']['ifr_axis']['ifr_name']).split('-')) + logger.info('Antennas in MS {0}: {1}'.format(msfile, antennas)) + return antennas + +def prt_dict(d): + for key in d.keys(): + if type(d[key]) == dict: + prt_dict(d[key]) + else: + print('{0:20s} {1}'.format(key, d[key])) + +def get_timefreq(msfile): + # Date and time of observation + ms.open(msfile) + axis_info = ms.getdata(['axis_info'],ifraxis=True) + ms.close() + t_mjd, t = get_dates(axis_info) + freq_ini = np.min(axis_info['axis_info']['freq_axis']['chan_freq'])/1e9 + freq_end = np.max(axis_info['axis_info']['freq_axis']['chan_freq'])/1e9 + chan_res = np.mean(axis_info['axis_info']['freq_axis']['resolution'])/1e9 + return t[0], t[-1], freq_ini, freq_end, chan_res + +def ms_sources(msfile): + mssources = ','.join(vishead(msfile,mode='list',listitems='field')['field'][0]) + logger.info('Sources in MS {0}: {1}'.format(msfile, mssources)) + return mssources + +def get_msinfo(msfile, inputs, doprint=False): + logger.info('Reading ms file information for MS: {0}'.format(msfile)) + msinfo = {} + msinfo['msfile'] = msfile + msinfo['sources'] = user_sources(inputs) + msinfo['mssources'] = ms_sources(msfile) + msinfo['antennas'] = get_antennas(msfile) + msinfo['band'] = check_band(msfile) + msinfo['baselines'] = get_baselines(msfile) + msinfo['num_spw'] = len(vishead(msfile, mode = 'list', listitems = ['spw_name'])['spw_name'][0]) + t_ini, t_end, freq_ini, freq_end, chan_res = get_timefreq(msfile) + msinfo['t_ini'] = t_ini + msinfo['t_end'] = t_end + msinfo['freq_ini'] = freq_ini + msinfo['freq_end'] = freq_end + msinfo['chan_res'] = chan_res + save_obj(msinfo, msfile) + logger.info('Saving information of MS {0} in: {1}'.format(msfile, msfile+'.pkl')) + if doprint: + prt_dict(msinfo) + return msinfo + + def run_importfitsIDI(data_dir,vis, setorder=False): logger.info('Starting importfitsIDI procedure') os.system('rm -r '+vis) @@ -391,7 +495,7 @@ def do_prediagnostics(vis,plot_dir): logger.info('End prediagnostics') -def flagdata1_apriori(msfile, sources, flags, antennas, do_quack=True): +def flagdata1_apriori(msfile, msinfo, flags, do_quack=True): logger.info('Start flagdata1_apriori') # Find number of channels in MS: ms.open(msfile) @@ -400,21 +504,21 @@ def flagdata1_apriori(msfile, sources, flags, antennas, do_quack=True): nchan = len(d['axis_info']['freq_axis']['chan_freq'][:,0]) # Flag Lo-Mk2 - if 'Lo' in antennas and 'Mk2' in antennas: + if 'Lo' in msinfo['antennas'] and 'Mk2' in msinfo['antennas']: logger.info('Flagging Lo-Mk2 baseline') - flagdata(vis=msfile, mode='manual', field=sources['allsources'], antenna='Lo*&Mk2*') + flagdata(vis=msfile, mode='manual', field=msinfo['sources']['allsources'], antenna='Lo*&Mk2*') # Subband edges channels_to_flag = '*:0~{0};{1}~{2}'.format(nchan/128-1, nchan-nchan/128, nchan-1) logger.info('MS has {} channels'.format(nchan)) logger.info('Flagging edge channels {0}'.format(channels_to_flag)) - flagdata(vis=msfile, mode='manual', field=sources['allsources'], spw=channels_to_flag) + flagdata(vis=msfile, mode='manual', field=msinfo['sources']['allsources'], spw=channels_to_flag) # Slewing (typical): # Main calibrators, 5 min logger.info('Flagging 5 min from bright calibrators') - flagdata(vis=msfile, field=sources['maincal'], mode='quack', quackinterval=300) + flagdata(vis=msfile, field=msinfo['sources']['maincal'], mode='quack', quackinterval=300) # Target and phase reference, 20 sec logger.info('Flagging first 20 sec of target and phasecal') - flagdata(vis=msfile, field=sources['targets_phscals'], mode='quack', quackinterval=20) + flagdata(vis=msfile, field=msinfo['sources']['targets_phscals'], mode='quack', quackinterval=20) # We can add more (Lo is slower, etc). flag_applied(flags, 'flagdata1_apriori') logger.info('End flagdata1_apriori') @@ -428,14 +532,14 @@ def flagdata2_manual(msfile, inpfile, flags): logger.info('End flagdata_manual') return flags -def flagdata3_tfcropBP(msfile, sources, flags): +def flagdata3_tfcropBP(msfile, msinfo, flags): logger.info('Start flagdata3_tfcropBP') logger.info("Running flagdata, mode = 'tfcrop'") logger.info("correlation='ABS_ALL'") logger.info("ntime='90min', combinescans=True, datacolumn='corrected'") logger.info("winsize=3, timecutoff=4.0, freqcutoff=3.0, maxnpieces=1") logger.info("usewindowstats='sum', halfwin=3, extendflags=True") - flagdata(vis=msfile, mode='tfcrop', field=sources['allsources'], + flagdata(vis=msfile, mode='tfcrop', field=msinfo['sources']['allsources'], antenna='', scan='',spw='', correlation='ABS_ALL', ntime='90min', combinescans=True, datacolumn='corrected', winsize=3, timecutoff=4.0, freqcutoff=3.0, maxnpieces=1, @@ -445,6 +549,25 @@ def flagdata3_tfcropBP(msfile, sources, flags): logger.info('End flagdata3_tfcropBP') return flags +def flagdata_tfcrop_bright(msfile, sources, flags, datacolumn='DATA'): + logger.info('Start flagdata_tfcrop_bright') + logger.info("Running flagdata, mode = 'tfcrop'") + logger.info("correlation='ABS_ALL'") + logger.info("ntime='90min', combinescans=True, datacolumn='{}'".format(datacolumn)) + logger.info("winsize=3, timecutoff=4.0, freqcutoff=3.0, maxnpieces=5") + logger.info("usewindowstats='sum', halfwin=3, extendflags=True") + flagdata(vis=msfile, mode='tfcrop', field=sources['allsources'], + antenna='', scan='',spw='', correlation='ABS_ALL', + ntime='90min', combinescans=True, datacolumn=datacolumn, + winsize=3, timecutoff=3.6, freqcutoff=3.6, maxnpieces=2, + usewindowstats='sum', halfwin=3, extendflags=True, + action='apply', display='', flagbackup=True) + flagdata(vis=msfile, mode='extend', ntime='90min', extendpols=False, + growtime=50, growfreq=0) + flag_applied(flags, 'flagdata_tfcrop_bright') + logger.info('End flagdata_tfcrop_bright') + return flags + def flag_applied(flags, new_flag): if new_flag in flags: logger.warning('Flags from {0} were already applied by the pipeline! They are applied more than once.'.format(new_flag)) @@ -454,6 +577,22 @@ def flag_applied(flags, new_flag): save_obj(flags, './flags') return flags +def define_refant(msfile, msinfo, inputs): + refant0 = inputs['refant'] + refant_user = refant0.replace(' ', '').split(',') + refant_in_ms = (np.array([ri in msinfo['antennas'] for ri in refant_user])).all() + if not refant_in_ms: + if refant0 != '': + logger.warning('Selected reference antenna(s) {0} not in MS! User selection will be ignored'.format(refant0)) + # Finding best antennas for refant + refant, refant_pref = find_refant(msfile, field=msinfo['sources']['bpcal'], + antennas='Mk2,Pi,Da,Kn', spws='2,3', scan='') + else: + refant = ','.join(refant_user) + logger.info('Refant: {}'.format(refant)) + return refant + + def find_refant(msfile, field, antennas='', spws='', scan=''): logger.info('Searching refant automatically') ms.open(msfile) @@ -482,10 +621,13 @@ def find_refant(msfile, field, antennas='', spws='', scan=''): snrj = [] for basel in antennas: if basel != anten: - d2 = visstat(msfile, antenna='{}&{}'.format(anten, basel), - field=field, axis='amplitude', scan=scan, - spw=str(spw)+channels, correlation=corr) - snrj.append(d2['DATA']['median']/d2['DATA']['stddev']) + try: + d2 = visstat(msfile, antenna='{}&{}'.format(anten, basel), + field=field, axis='amplitude', scan=scan, + spw=str(spw)+channels, correlation=corr) + snrj.append(d2['DATA']['median']/d2['DATA']['stddev']) + except: + snrj.append(0.0) snr[i, j, k] = np.mean(snrj) # Sorting antennas by average S/N, averaged in polarization and spw. @@ -513,14 +655,18 @@ def find_refant(msfile, field, antennas='', spws='', scan=''): ### Run CASA calibration functions -def run_split(msfile, fields, sources, width, timebin, datacolumn='data'): +def run_split(msfile, msinfo, width, timebin, datacolumn='data'): logger.info('Start split') # Check that all sources are there: - sources_not_in_msfile = [s for s in sources['allsources'].split(',') if s not in sources['msfile_fields']] + sources_not_in_msfile = [s for s in + msinfo['sources']['allsources'].split(',') + if s not in msinfo['mssources'].split(',')] if len(sources_not_in_msfile) > 0: - fields = ','.join(sources['msfile_fields']) + fields = msinfo['mssources'] logger.warning('Fields {} not present in MS but listed in inputs file.'.format(','.join(sources_not_in_msfile))) logger.warning('All fields will be included in the averaged MS.') + else: + fields = msinfo['sources']['allsources'] name = '.'.join(msfile.split('.')[:-1]) exte = ''.join(msfile.split('.')[-1]) outputmsfile = name+'_avg.'+exte @@ -710,7 +856,7 @@ def run_applycal(msfile, caltables, sources, previous_cal, previous_cal_targets= ### Calibration steps -def solve_delays(msfile, caltables, previous_cal, calsources): +def solve_delays(msfile, caltables, previous_cal, calsources, solint='600s'): logger.info('Start solve_delays') caltable_name = 'delay.K1' caltables[caltable_name] = {} @@ -719,7 +865,7 @@ def solve_delays(msfile, caltables, previous_cal, calsources): caltables[caltable_name]['field'] = calsources caltables[caltable_name]['gaintype'] = 'K' caltables[caltable_name]['calmode'] = 'p' - caltables[caltable_name]['solint'] = '600s' + caltables[caltable_name]['solint'] = solint caltables[caltable_name]['interp'] = 'linear' caltables[caltable_name]['spwmap'] = [0]*caltables['num_spw'] caltables[caltable_name]['combine'] = 'spw' @@ -1064,6 +1210,93 @@ def sp_amp_gaincal(msfile, caltables, previous_cal, calsources): logger.info('End gaincal_amp_sp') return caltables +def compile_statistics(msfile, tablename=''): + logger.info('Start compile_stats') + # Num of spw and baselines + num_spw = len(vishead(msfile, mode = 'list', listitems = ['spw_name'])['spw_name'][0]) + baselines = get_baselines(msfile) + # Date and time of observation + ms.open(msfile) + axis_info = ms.getdata(['axis_info'],ifraxis=True) + ms.close() + vis_field = vishead(msfile,mode='list',listitems='field')['field'][0][0] # Only one expected + t_mjd, t = get_dates(axis_info) + freq_ini = np.min(axis_info['axis_info']['freq_axis']['chan_freq'])/1e9 + freq_end = np.max(axis_info['axis_info']['freq_axis']['chan_freq'])/1e9 + chan_res = np.mean(axis_info['axis_info']['freq_axis']['resolution'])/1e9 + band = check_band(msfile) + data_stats = [] + for bsl in baselines: + print('Processing baseline: {}'.format(bsl)) + for spw in range(num_spw): + for corr in ['RR', 'LL']: + d = visstat(msfile, antenna=bsl, axis='amplitude', + spw=str(spw)+':100~400', correlation=corr) + data_stats.append([t[0], + t[-1], + np.mean(t_mjd), + band, + freq_ini, + freq_end, + chan_res, + vis_field, + bsl, + spw, + corr, + d['DATA']['mean'], + d['DATA']['npts'], + d['DATA']['median'], + d['DATA']['stddev']]) + print t[0], vis_field, bsl, spw, corr, d['DATA']['median'], d['DATA']['stddev'] + data_stats = np.asarray(data_stats) + ini_date = str(t[0]).replace('-', '').replace(':','').replace(' ','_').split('.')[0] + outname = '{0}_{1}_{2}.npy'.format(ini_date, vis_field, band) + np.save('data_'+outname, data_stats) + logger.info('Data statistics saved to: {0}'.format(outname)) + if tablename != '': + compile_delays(tablename, outname) + logger.info('End compile_stats') + + +def compile_delays(tablename, outname): + tb.open(tablename+'/ANTENNA') + antennas = tb.getcol('NAME') + tb.close() + tb.open(tablename) + a = tb.getcol('ANTENNA1') + times = tb.getcol('TIME') + delays = tb.getcol('FPARAM') + tb.close() + delay_stats = [] + for i in range(len(times)): + delay_stats.append([antennas[a[i]], 'RR', delays[0,0,i]]) + delay_stats.append([antennas[a[i]], 'LL', delays[1,0,i]]) + delay_stats = np.asarray(delay_stats) + np.save('delay_'+outname, delay_stats) + logger.info('Delay statistics saved to: {0}'.format(outname)) + + +def monitoring(msfile, msinfo, flags, caltables, previous_cal): + # This is intented to run on a single-source file for daily monitoring on + # unaveraged data + logger.info('Starting monitoring') + band = check_band(msfile) + if band == 'L': + hanning(inputvis=msfile,deloriginal=True) + + flags = flagdata1_apriori(msfile=msfile, msinfo=msinfo, flags=flags, do_quack=True) + + flags = flagdata_tfcrop_bright(msfile=msfile, sources=msinfo['sources'], flags=flags) + caltables = solve_delays(msfile=msfile, caltables=caltables, + previous_cal=[], calsources=msinfo['sources']['calsources'], + solint='60s') + run_applycal(msfile=msfile, caltables=caltables, sources=msinfo['sources'], + previous_cal=['delay.K1'], + previous_cal_targets=['delay.K1']) + compile_statistics(msfile, tablename=caltables['delay.K1']['table']) + return flags, caltables + + def dfluxpy(freq,baseline): #######