# coding: latin-1
import csv
import glob
import shutil
import threading
import uuid
from datetime import datetime

import netCDF4
import numpy as np
import os
from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day, add_months, \
    date2str

from basins import Basins
from earthdiagnostics.utils import Utils, TempFile


class DataManager(object):
    """
    Gives access to the CMOR repository of an experiment: builds the CMOR files from the original model output
    and copies files from / to the repository.
    """

    def __init__(self, institution, model, expid, datafolder, frequency, chunk_size, experiment_name, num_chunks,
                 scratch_dir, nfrp, member_digits, calendar='standard'):
        """
        :param institution: institution name, used in the repository paths and file attributes
        :type institution: str
        :param model: model name
        :type model: str
        :param expid: experiment identifier, used as a folder below datafolder
        :type expid: str
        :param datafolder: root folder of the data repository
        :type datafolder: str
        :param frequency: default frequency used when a method does not receive one
        :type frequency: str
        :param chunk_size: chunk length, in months
        :type chunk_size: int
        :param experiment_name: experiment name, used in the repository paths and file names
        :type experiment_name: str
        :param num_chunks: number of chunks of each startdate
        :type num_chunks: int
        :param scratch_dir: folder used for temporary copies and unpacking
        :type scratch_dir: str
        :param nfrp: value used to build the link folder name ('<var>_f<nfrp>h') of non-ocean variables
        :type nfrp: int
        :param member_digits: number of digits used to build the member name (see get_member_str)
        :type member_digits: int
        :param calendar: calendar used by get_year_chunks
        :type calendar: str
        """
        self.initialization_method = 'to be filled'
        self.initialization_description = 'to be filled'
        self.physics_version = 'to be filled'
        self.physics_description = 'to be filled'
        self.associated_model = 'to be filled'
        self.source = 'to be filled'
        self.associated_experiment = 'to be filled'
        self.institution = institution
        self.model = model
        self.expid = expid
        self.data_dir = datafolder
        self.frequency = frequency
        self.chunk_size = chunk_size
        self.experiment_name = experiment_name
        self.add_startdate = True
        self.add_name = True
        self.num_chunks = num_chunks
        self.calendar = calendar
        self.scratch_dir = scratch_dir
        self.nfrp = nfrp
        self.member_digits = member_digits

    # noinspection PyPep8Naming
    def prepare_CMOR_files(self, startdates, members, force_rebuild, ocean, atmosphere):
        """
        Prepares the data to be used by the diagnostic.

        Every startdate/member whose 'cmorfiles' folder does not exist yet (or all of them if force_rebuild is
        set) is cmorized from the original files: ocean tar files (MMO*) are unpacked and cmorized, and the
        atmosphere is cmorized from the grib files if present or from the MMA* tar files otherwise.

        :param atmosphere: activates atmosphere files cmorization
        :type atmosphere: bool
        :param ocean: activates ocean files cmorization
        :type ocean: bool
        :param force_rebuild: if True, forces the creation of the CMOR files
        :type force_rebuild: bool
        :param startdates: list of startdates that will be used by the diagnostics
        :type startdates: list[str]
        :param members: lists of members that will be used by the diagnostics
        :type members: list[int]
        :return:
        """
        # Check if cmorized and convert if not
        for startdate in startdates:
            for member in members:
                member_str = self.get_member_str(member)
                cmor_path = os.path.join(self.data_dir, self.expid, 'cmorfiles', startdate, member_str)
                if not force_rebuild and os.path.exists(cmor_path):
                    continue
                Log.info('CMORizing startdate {0} member {1}', startdate, member_str)
                original_path = os.path.join(self.data_dir, self.expid, 'original_files', startdate, member_str,
                                             'outputs')
                if ocean:
                    for tarfile in glob.glob(os.path.join(original_path, 'MMO*')):
                        self._unpack_tar(member, startdate, tarfile)
                if not atmosphere:
                    continue
                gribfiles = glob.glob(os.path.join(original_path, '*.grb'))
                if len(gribfiles) == 0:
                    for tarfile in glob.glob(os.path.join(original_path, 'MMA*')):
                        self._unpack_tar(member, startdate, tarfile)
                else:
                    self._cmorize_grib(startdate, member, gribfiles)

        # NOTE(review): this return makes the unpacking of compressed (.gz) CMOR files below unreachable.
        # Kept to preserve the current behaviour; confirm whether it is intentional.
        return

        for startdate in startdates:
            for member in members:
                member_path = os.path.join(self.data_dir, self.expid, 'cmorfiles', startdate,
                                           self.get_member_str(member), 'outputs')
                Log.info('Preparing CMOR files for startdate {0} and member {1}'.format(startdate, member))

                filepaths = glob.glob(os.path.join(member_path, '*.gz'))
                if len(filepaths) == 0:
                    continue
                self._unpack_cmorfiles(filepaths, member_path)

    def _cmorize_grib(self, startdate, member, gribfiles):
        """
        Cmorizes the atmospheric grib files of a startdate/member.

        Each grib file is copied to the scratch folder, split by parameter and converted to daily, monthly and
        6-hourly netCDF files with cdo. Afterwards, the monthly files of each chunk are merged and cmorized.

        :param startdate: startdate of the files
        :type startdate: str
        :param member: member of the files
        :type member: int
        :param gribfiles: paths of the original grib files (the list is sorted in place)
        :type gribfiles: list[str]
        """
        # GRIB parameter codes processed for each output frequency
        param_6hr = (151, 167, 168, 164, 165, 166, 129,)
        param_day = (167, 165, 166, 151, 164, 168, 169, 177, 179, 228, 201, 202, 130)
        param_mon = (167, 201, 202, 165, 166, 151, 144, 228, 205, 182, 164, 146, 147, 176, 169, 177, 175, 212,
                     141, 180, 181, 179, 168, 243, 129, 130, 131, 132, 133)
        cdo_reftime = parse_date(startdate).strftime('%Y-%m-%d,00:00')

        gribfiles.sort()
        copied_gribfiles = list()
        for gribfile in gribfiles:
            destination = os.path.join(self.scratch_dir, os.path.basename(gribfile))
            shutil.copy(gribfile, destination)
            copied_gribfiles.append(destination)

        for gribfile in copied_gribfiles:
            cdo = Utils.cdo
            # File names end with '+YYYYMM.grb'
            start = parse_date(gribfile[-10:-4])
            month = '{0:02}'.format(start.month)

            # Output frequency (in hours) deduced from the first two timestamps of the file
            times = cdo.showtimestamp(input=gribfile)
            times = times[0].split()[0:2]
            time_diff = (datetime.strptime(times[1], '%Y-%m-%dT%H:%M:%S') -
                         datetime.strptime(times[0], '%Y-%m-%dT%H:%M:%S'))
            nfrp = (time_diff.seconds // 3600)

            # 'GG' or 'SH', taken from the 'ICMGG...' / 'ICMSH...' file name
            grid = os.path.basename(gribfile)[3:5]
            previous_month = add_months(start, -1, 'standard')

            # NOTE(review): 'ICM', 'rules_files' and the previous month file are relative to the current
            # working directory, not to self.scratch_dir - presumably the caller runs from the scratch folder.
            if os.path.exists('ICM{0}{1}+{2.year}{2.month:02}.grb'.format(grid, self.expid, previous_month)):
                with open('rules_files', 'w') as fd:
                    fd.write('if (dataDate >= {0.year}{0.month:02}01) {{ write ; }}\n'.format(start))
                # get first timestep for each month from previous file (if possible)
                if os.path.exists('ICM'):
                    os.remove('ICM')
                Utils.execute_shell_command('grib_filter -o ICM rules_files '
                                            'ICM{0}{1}+{2.year}{2.month:02}.grb '
                                            '{3}'.format(grid, self.expid, previous_month, gribfile))
                os.remove('rules_files')
            else:
                shutil.copy(gribfile, 'ICM')

            # remap on regular Gauss grid
            if grid == 'SH':
                Utils.cdo.splitparam(input='-sp2gpl ICM', output=gribfile + '_')
            else:
                Utils.cdo.splitparam(input='ICM', output=gribfile + '_', options='-R')
            # total precipitation (remove negative values)
            Utils.cdo.setcode(228, input='-setmisstoc,0 -setvrange,0,Inf -add '
                                         '{0}_{{142,143}}.128.grb'.format(gribfile),
                              output='{0}_228.128.grb'.format(gribfile))
            os.remove('ICM')

            # daily variables
            for param in param_day:
                if not os.path.exists('{0}_{1}.128.grb'.format(gribfile, param)):
                    continue
                new_units = None
                if param in (169, 177, 179):  # radiation
                    new_units = "W m-2"
                    cdo_operator = "-divc,{0} -daymean -selmon,{2} " \
                                   "-shifttime,-{1}hours".format(nfrp * 3600, nfrp, month)
                elif param == 228:  # precipitation
                    # Same units as the monthly precipitation below (was the typo "kg m-2 -s")
                    new_units = "kg m-2 s-1"
                    cdo_operator = "-mulc,1000 -divc,{0} -daymean -selmon,{2} " \
                                   "-shifttime,-{1}hours".format(nfrp * 3600, nfrp, month)
                elif param == 201:  # maximum
                    cdo_operator = "-daymax -selmon,{1} -shifttime,-{0}hours".format(nfrp, month)
                elif param == 202:  # minimum
                    cdo_operator = "-daymin -selmon,{1} -shifttime,-{0}hours".format(nfrp, month)
                elif param == 130:  # 850 hPa
                    cdo_operator = "-daymean -sellevel,85000 -selmon,{0}".format(month)
                else:  # default, plain daily mean
                    cdo_operator = "-daymean -selmon,{0}".format(month)
                Utils.execute_shell_command('cdo -f nc -t ecmwf setreftime,{0} '
                                            '{1} {2}_{3}.128.grb '
                                            '{2}_{3}_day.nc'.format(cdo_reftime, cdo_operator, gribfile, param))
                if new_units:
                    handler = Utils.openCdf('{0}_{1}_day.nc'.format(gribfile, param))
                    for var in handler.variables.values():
                        if 'code' in var.ncattrs() and var.code == param:
                            var.units = new_units
                            break
                    handler.close()
                # concat all vars in one file for day
                Utils.nco.ncks(input='{0}_{1}_day.nc'.format(gribfile, param),
                               output='{0}_day.nc'.format(gribfile), options='-A')
                os.remove('{0}_{1}_day.nc'.format(gribfile, param))

            # monthly variables
            for param in param_mon:
                if not os.path.exists('{0}_{1}.128.grb'.format(gribfile, param)):
                    continue
                new_units = None
                if param in (146, 147, 176, 169, 177, 175, 179, 212):  # radiation/heat
                    new_units = "W m-2"
                    cdo_operator = "-divc,{0} -monmean -selmon,{2} " \
                                   "-shifttime,-{1}hours".format(nfrp * 3600, nfrp, month)
                elif param in (180, 181):  # momentum flux
                    new_units = "N m-2"
                    cdo_operator = "-divc,{0} -monmean -selmon,{2} " \
                                   "-shifttime,-{1}hours".format(nfrp * 3600, nfrp, month)
                elif param in (144, 228, 205, 182):  # precipitation/evaporation/runoff
                    new_units = "kg m-2 s-1"
                    cdo_operator = "-mulc,1000 -divc,{0} -monmean -selmon,{2} " \
                                   "-shifttime,-{1}hours".format(nfrp * 3600, nfrp, month)
                elif param == 201:  # mean daily maximum
                    cdo_operator = "-monmean -daymax -selmon,{1} " \
                                   "-shifttime,-{0}hours".format(nfrp, month)
                elif param == 202:  # mean daily minimum
                    cdo_operator = "-monmean -daymin -selmon,{1} " \
                                   "-shifttime,-{0}hours".format(nfrp, month)
                elif param in (130, 131, 132, 133):  # upper-air
                    cdo_operator = "-monmean -sellevel,5000,20000,50000,85000 " \
                                   "-selmon,{0}".format(month)
                elif param == 129:  # upper-air geopotential
                    new_units = "m"
                    cdo_operator = "-divc,9.81 -timmean -sellevel,5000,20000,50000,85000 " \
                                   "-selmon,{0}".format(month)
                else:  # default, plain monthly mean
                    cdo_operator = "-monmean -selmon,{0}".format(month)
                Utils.execute_shell_command('cdo -f nc -t ecmwf setreftime,{0} '
                                            '{1} {2}_{3}.128.grb '
                                            '{2}_{3}_mon.nc'.format(cdo_reftime, cdo_operator, gribfile, param))

                handler = Utils.openCdf('{0}_{1}_mon.nc'.format(gribfile, param))
                if new_units:
                    for var in handler.variables.values():
                        if 'code' in var.ncattrs() and var.code == param:
                            var.units = new_units
                            break
                # If cdo produced a duplicated '<name>_2' variable, keep only '<name>'
                var_name = None
                for key in handler.variables.keys():
                    if key + '_2' in handler.variables and key not in handler.dimensions:
                        var_name = key
                handler.close()

                if var_name is not None:
                    Utils.nco.ncks(input='{0}_{1}_mon.nc'.format(gribfile, param),
                                   output='{0}_{1}_mon.nc'.format(gribfile, param),
                                   options='-O -v {0}'.format(var_name))

                # concat all vars in one file for mon
                Utils.nco.ncks(input='{0}_{1}_mon.nc'.format(gribfile, param),
                               output='{0}_mon.nc'.format(gribfile), options='-A')
                os.remove('{0}_{1}_mon.nc'.format(gribfile, param))

            # 6-hourly variables
            for param in param_6hr:
                if not os.path.exists('{0}_{1}.128.grb'.format(gribfile, param)):
                    continue
                new_units = None
                if param == 129:  # geopotential
                    new_units = "m"
                    cdo_operator = "-divc,9.81 -sellevel,50000 -selmon,{0}".format(month)
                else:  # default, plain selection of the month
                    cdo_operator = "-selmon,{0}".format(month)
                Utils.execute_shell_command('cdo -f nc -t ecmwf setreftime,{0} '
                                            '{1} {2}_{3}.128.grb '
                                            '{2}_{3}_6hr.nc'.format(cdo_reftime, cdo_operator, gribfile, param))
                if new_units:
                    handler = Utils.openCdf('{0}_{1}_6hr.nc'.format(gribfile, param))
                    for var in handler.variables.values():
                        if 'code' in var.ncattrs() and var.code == param:
                            var.units = new_units
                            break
                    handler.close()
                # concat all vars in one file for 6hr
                Utils.nco.ncks(input='{0}_{1}_6hr.nc'.format(gribfile, param),
                               output='{0}_6hr.nc'.format(gribfile), options='-A')
                os.remove('{0}_{1}_6hr.nc'.format(gribfile, param))

            for splited_file in glob.glob('{0}_???.128.grb'.format(gribfile)):
                os.remove(splited_file)

        # Merge the monthly files of each chunk, as long as the chunk's first GG file is in the scratch folder
        chunk_start = parse_date(startdate)
        while os.path.exists(os.path.join(self.scratch_dir,
                                          'ICMGG{0}+{1}.grb'.format(self.expid, date2str(chunk_start)[:-2]))):
            chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
            chunk_end = previous_day(chunk_end, 'standard')
            chunk_files_gg_mon = list()
            chunk_files_gg_day = list()
            chunk_files_gg_6h = list()

            chunk_files_sh_mon = list()
            chunk_files_sh_day = list()
            chunk_files_sh_6h = list()

            for month_offset in range(0, self.chunk_size):
                chunk_file = 'ICMGG{0}+{1}.grb'.format(self.expid,
                                                       date2str(add_months(chunk_start, month_offset,
                                                                           'standard'))[:-2])
                sh_file = 'ICMSH' + chunk_file[5:]
                # NOTE(review): removed relative to the current working directory while the loop condition
                # looks in self.scratch_dir - presumably both are the same folder; confirm.
                os.remove(chunk_file)
                os.remove(sh_file)
                chunk_files_gg_mon.append(chunk_file + '_mon.nc')
                chunk_files_gg_day.append(chunk_file + '_day.nc')
                chunk_files_gg_6h.append(chunk_file + '_6hr.nc')

                chunk_files_sh_mon.append(sh_file + '_mon.nc')
                chunk_files_sh_day.append(sh_file + '_day.nc')
                chunk_files_sh_6h.append(sh_file + '_6hr.nc')

            self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_sh_mon, 'SH', '1m')
            self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_sh_day, 'SH', '1d')
            self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_sh_6h, 'SH', '6hr')

            self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_gg_mon, 'GG', '1m')
            self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_gg_day, 'GG', '1d')
            self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_gg_6h, 'GG', '6hr')

            # chunk_end_date returns the first day after the chunk, i.e. the start of the next one
            chunk_start = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')

    def _merge_and_cmorize_atmos(self, startdate, member, chunk_start, chunk_end, chunk_files, grid, frequency):
        """
        Merges in time the files of a chunk, deletes them and cmorizes the merged file.

        :param chunk_files: file names, relative to the scratch folder; the list is updated in place with the
                            full paths
        :type chunk_files: list[str]
        :param grid: 'GG' or 'SH', added to the merged file name
        :type grid: str
        :param frequency: frequency tag added to the merged file name ('1m', '1d' or '6hr')
        :type frequency: str
        """
        merged_file = 'MMA_{0}_{1}_{2}_{3}.nc'.format(frequency, date2str(chunk_start), date2str(chunk_end), grid)
        chunk_files[:] = [os.path.join(self.scratch_dir, chunk_file) for chunk_file in chunk_files]
        Utils.cdo.mergetime(input=' '.join(chunk_files), output=merged_file, options='-O')
        for filepath in chunk_files:
            os.remove(filepath)
        self._cmorize_nc_file(merged_file, member, startdate)

    def _unpack_cmorfiles(self, filepaths, member_path):
        """
        Unzips and untars already cmorized files and fixes their paths and names.

        :param filepaths: paths of the gzipped files
        :type filepaths: list[str]
        :param member_path: folder where the files are unpacked
        :type member_path: str
        """
        numthreads = Utils.available_cpu_count()

        # Unzip in parallel: each thread takes every numthreads-th file
        threads = list()
        for numthread in range(0, numthreads):
            t = threading.Thread(target=DataManager._unzip, args=(filepaths[numthread::numthreads],))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()

        # Untar in parallel
        filepaths = glob.glob(os.path.join(member_path, '*.tar'))
        threads = list()
        for numthread in range(0, numthreads):
            t = threading.Thread(target=DataManager._untar, args=(filepaths[numthread::numthreads], member_path))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()

        if self.experiment_name != self.model:
            # Files stored under output/<institution>/<model>/<model> are moved under
            # output/<institution>/<model>/<experiment_name>, renaming '_<model>_output_' accordingly
            bad_path = os.path.join(member_path, 'output', self.institution, self.model, self.model)
            for (dirpath, dirnames, filenames) in os.walk(bad_path, False):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    good = filepath.replace('_{0}_output_'.format(self.model),
                                            '_{0}_{1}_'.format(self.model, self.experiment_name))

                    good = good.replace('/{0}/{0}'.format(self.model),
                                        '/{0}/{1}'.format(self.model, self.experiment_name))

                    Utils.move_file(filepath, good)
                os.rmdir(dirpath)

        # Add the startdate folder name to the file names that lack it
        good_dir = os.path.join(member_path, 'output', self.institution, self.model, self.experiment_name)
        for sdate in os.listdir(good_dir):
            for (dirpath, dirnames, filenames) in os.walk(os.path.join(good_dir, sdate), False):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    good = filepath.replace('_{0}_{1}_r'.format(self.model, self.experiment_name),
                                            '_{0}_{1}_{2}_r'.format(self.model, self.experiment_name, sdate))
                    if good != filepath:
                        Log.info('Moving {0} to {1}'.format(filename, good))
                        Utils.move_file(filepath, good)

    def _unpack_tar(self, member, startdate, tarfile):
        """
        Unpacks an original tar file in the scratch folder and cmorizes every netCDF file it contains.

        For atmospheric (MMA) files, SH files are converted to a Gaussian grid and all SH and GG files are merged
        into a single monthly file before cmorization.

        :param member: member of the file
        :type member: int
        :param startdate: startdate of the file
        :type startdate: str
        :param tarfile: path to the tar file
        :type tarfile: str
        """
        Log.info('Unpacking {0}', tarfile)
        scratch_dir = os.path.join(self.scratch_dir, 'CMOR')
        if os.path.exists(scratch_dir):
            shutil.rmtree(scratch_dir)
        os.makedirs(scratch_dir)
        self._untar((tarfile,), scratch_dir)
        self._unzip(glob.glob(os.path.join(scratch_dir, '*.gz')))

        tar_name = os.path.basename(tarfile)
        if tar_name.startswith('MMA'):
            temp = TempFile.get()
            for filename in glob.glob(os.path.join(scratch_dir, 'MMA_*_SH_*.nc')):
                Utils.cdo.sp2gpl(options='-O', input=filename, output=temp)
                shutil.move(temp, filename)
            sh_files = glob.glob(os.path.join(scratch_dir, 'MMA_*_SH_*.nc'))
            Utils.cdo.mergetime(input=sh_files, output=os.path.join(scratch_dir, 'sh.nc'))
            gg_files = glob.glob(os.path.join(scratch_dir, 'MMA_*_GG_*.nc'))
            Utils.cdo.mergetime(input=gg_files, output=os.path.join(scratch_dir, 'gg.nc'))
            for filename in sh_files + gg_files:
                os.remove(filename)
            Utils.nco.ncks(input=os.path.join(scratch_dir, 'sh.nc'),
                           output=os.path.join(scratch_dir, 'gg.nc'), options='-A')
            os.remove(os.path.join(scratch_dir, 'sh.nc'))

            # The date range is the fifth '_'-separated field of the tar name (without '.tar'). Parsing the
            # basename keeps this independent of underscores in the folder names.
            tar_startdate = tar_name[0:-4].split('_')[4].split('-')
            new_name = 'MMA_1m_{0[0]}_{0[1]}.nc'.format(tar_startdate)
            shutil.move(os.path.join(scratch_dir, 'gg.nc'), os.path.join(scratch_dir, new_name))
        for filename in glob.glob(os.path.join(scratch_dir, '*.nc')):
            self._cmorize_nc_file(filename, member, startdate)

    def _cmorize_nc_file(self, filename, member, startdate):
        """
        Cmorizes a netCDF file and deletes it.

        Coordinates are renamed, common attributes and time variables are fixed and each variable found in the
        CMOR table is sent to the repository.

        :param filename: path to the file. The second '_'-separated field of its name gives the frequency by its
                         second character (e.g. '1m' -> 'm'); the third and fourth give the date range.
        :type filename: str
        :param member: member of the file
        :type member: int
        :param startdate: startdate of the file
        :type startdate: str
        """
        Log.info('Processing file {0}', filename)
        file_parts = os.path.basename(filename).split('_')
        frequency = file_parts[1][1].lower()
        variables = dict()
        variables['time_counter'] = 'time'
        variables['time_counter_bnds'] = 'time_bnds'
        variables['tbnds'] = 'bnds'
        variables['nav_lat'] = 'lat'
        variables['nav_lon'] = 'lon'
        variables['x'] = 'i'
        variables['y'] = 'j'
        # 'time_counter_bounds' and 'axis_nbounds' are renamed in _update_time_variables
        Utils.rename_variables(filename, variables, False, True)

        handler = Utils.openCdf(filename)
        self._add_common_attributes(frequency, handler, member, startdate)
        self._update_time_variables(handler, startdate)
        handler.sync()

        temp = TempFile.get()
        Log.info('Splitting file {0}', filename)
        # Coordinates and auxiliary variables that must not be extracted as CMOR variables
        skipped = ('lon', 'lat', 'time', 'time_bnds', 'leadtime', 'lev', 'icethi',
                   'deptht', 'depthu', 'depthw', 'depthv', 'time_centered', 'time_centered_bounds',
                   'deptht_bounds', 'depthu_bounds', 'depthv_bounds', 'depthw_bounds',
                   'time_counter_bounds', 'ncatice',
                   'nav_lat_grid_V', 'nav_lat_grid_U', 'nav_lat_grid_T',
                   'nav_lon_grid_V', 'nav_lon_grid_U', 'nav_lon_grid_T',
                   'depth', 'depth_2', 'depth_3', 'depth_4',
                   'mlev', 'hyai', 'hybi', 'hyam', 'hybm')
        for variable in handler.variables.keys():
            if variable in skipped:
                continue
            self.extract_variable(file_parts, filename, frequency, handler, member, startdate, temp, variable)
        Log.result('File {0} cmorized!', filename)
        handler.close()
        os.remove(filename)

    def extract_variable(self, file_parts, filename, frequency, handler, member, startdate, temp, variable):
        """
        Extracts one variable from a file, adds its CMOR metadata and sends it to the repository.

        Variables not found in the CMOR table are ignored.

        :param file_parts: '_'-separated parts of the file name; parts 2 and 3 give the date range
        :type file_parts: list[str]
        :param filename: path to the source file
        :type filename: str
        :param frequency: 'd', 'm' or 'h'
        :type frequency: str
        :param handler: open handler of the source file
        :param member: member of the file
        :type member: int
        :param startdate: startdate of the file
        :type startdate: str
        :param temp: path of the temporary file used for the extraction
        :type temp: str
        :param variable: variable name in the source file
        :type variable: str
        :raises Exception: if frequency is not supported
        """
        var_cmor = Variable.get_variable(variable)
        if var_cmor is None:
            return
        Utils.nco.ncks(input=filename, output=temp, options='-v {0}'.format(variable))

        # Rename the vertical coordinate according to the variable's domain
        if var_cmor.domain == 'ocean':
            Utils.rename_variables(temp, {'deptht': 'lev', 'depthu': 'lev', 'depthw': 'lev', 'depthv': 'lev',
                                          'depth': 'lev'}, False, True)
        elif var_cmor.domain in ('land', 'landIce'):
            Utils.rename_variables(temp, {'depth': 'sdepth', 'depth_2': 'sdepth', 'depth_3': 'sdepth',
                                          'depth_4': 'sdepth'}, False, True)
        elif var_cmor.domain == 'atmos':
            Utils.rename_variables(temp, {'depth': 'plev'}, False, True)

        handler_cmor = Utils.openCdf(temp)
        Utils.copy_variable(handler, handler_cmor, 'lon', False)
        Utils.copy_variable(handler, handler_cmor, 'lat', False)
        if 'time' in handler_cmor.dimensions.keys():
            Utils.copy_variable(handler, handler_cmor, 'leadtime', False)
        handler_cmor.modeling_realm = var_cmor.domain
        handler_cmor.table_id = 'SPECS_' + self.domain_abbreviation(var_cmor.domain, frequency)
        var_handler = handler_cmor.variables[variable]
        var_handler.short_name = var_cmor.short_name
        var_handler.standard_name = var_cmor.standard_name
        var_handler.long_name = var_cmor.long_name
        handler_cmor.close()

        if frequency == 'd':
            frequency = 'day'
        elif frequency == 'm':
            frequency = 'mon'
        elif frequency == 'h':
            frequency = '6hr'
        else:
            raise Exception('Frequency {0} not supported'.format(frequency))

        if var_cmor.basin is None:
            region = None
        else:
            region = var_cmor.basin.fullname
        self.send_file(temp, var_cmor.domain, var_cmor.short_name, startdate, member,
                       frequency=frequency, rename_var=variable,
                       date_str='{0}-{1}'.format(file_parts[2][0:6], file_parts[3][0:6]),
                       region=region)

    @staticmethod
    def _update_time_variables(handler, startdate):
        """
        Converts time (and time bounds, if present) to 'days since 1850-01-01' in the standard calendar, sets
        the time attributes and creates or fills a 'leadtime' variable with the days elapsed since startdate.

        :param handler: open netCDF handler with a 'time' variable
        :param startdate: startdate of the file
        :type startdate: str
        """
        time_var = handler.variables['time']
        times = netCDF4.num2date(time_var[:], time_var.units, time_var.calendar)
        if type(times[0]) is not datetime:
            for x in range(0, times.shape[0]):
                # noinspection PyProtectedMember
                times[x] = times[x]._to_real_datetime()
        time_var[:] = netCDF4.date2num(times, 'days since 1850-01-01', 'standard')

        if 'axis_nbounds' in handler.dimensions:
            handler.renameDimension('axis_nbounds', 'bnds')

        if 'time_counter_bounds' in handler.variables:
            handler.renameVariable('time_counter_bounds', 'time_bnds')
            handler.sync()
        if 'time_bnds' in handler.variables:
            time_bounds_var = handler.variables['time_bnds']
            time_var.bounds = "time_bnds"
            # time_var.units still holds the original units here: they are updated below
            time_bounds = netCDF4.num2date(time_bounds_var[:], time_var.units, time_var.calendar)
            if type(time_bounds[0, 0]) is not datetime:
                for x in range(0, time_bounds.shape[0]):
                    for y in range(0, time_bounds.shape[1]):
                        # noinspection PyProtectedMember
                        time_bounds[x, y] = time_bounds[x, y]._to_real_datetime()
            time_bounds_var[:] = netCDF4.date2num(time_bounds, 'days since 1850-01-01', 'standard')

        time_var.units = 'days since 1850-01-01'
        time_var.time_origin = "1850-01-01"
        time_var.calendar = 'standard'
        time_var.long_name = "Verification time of the forecast"
        time_var.standard_name = "time"
        time_var.axis = "T"

        if 'leadtime' in handler.variables:
            var = handler.variables['leadtime']
        else:
            var = handler.createVariable('leadtime', float, 'time')
        var.units = "days"
        var.long_name = "Time elapsed since the start of the forecast"
        var.standard_name = "forecast_period"
        leadtime = (netCDF4.num2date(time_var[:], time_var.units, time_var.calendar) - parse_date(startdate))
        for lt in range(0, leadtime.shape[0]):
            var[lt] = leadtime[lt].days

    def _add_common_attributes(self, frequency, handler, member, startdate):
        """
        Sets the global attributes shared by all the cmorized files.

        :param frequency: 'd' or 'm' set the frequency attribute; other values leave it unset
        :type frequency: str
        :param handler: open netCDF handler
        :param member: member of the file (realization is member + 1)
        :type member: int
        :param startdate: startdate of the file
        :type startdate: str
        """
        handler.associated_experiment = self.associated_experiment
        handler.batch = '{0}{1}'.format(self.institution, datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ'))
        handler.contact = 'Pierre-Antoine Bretonnière, pierre-antoine.bretonniere@bsc.es , ' \
                          'Javier Vegas-Regidor, javier.vegas@bsc.es '
        handler.conventions = 'CF-1.6'
        handler.creation_date = datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')
        handler.experiment_id = self.experiment_name
        handler.forecast_reference_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')
        if frequency == 'd':
            handler.frequency = 'daily'
        elif frequency == 'm':
            handler.frequency = 'monthly'
        handler.institute_id = self.institution
        handler.institution = self.institution
        handler.initialization_method = self.initialization_method
        handler.initialization_description = self.initialization_description
        handler.physics_version = self.physics_version
        handler.physics_description = self.physics_description
        handler.model_id = self.model
        handler.associated_model = self.associated_model
        handler.project_id = 'SPECS'
        handler.realization = member + 1
        handler.source = self.source
        handler.startdate = 'S{0}'.format(startdate)
        handler.tracking_id = str(uuid.uuid1())
        handler.title = "{0} model output prepared for SPECS {1}".format(self.model, self.experiment_name)

    @staticmethod
    def _unzip(files):
        """
        Decompresses each file in place with gunzip.

        :param files: paths of the .gz files
        :type files: list[str]
        """
        for filepath in files:
            Log.debug('Unzipping {0}', filepath)
            Utils.execute_shell_command('gunzip {0}'.format(filepath))

    @staticmethod
    def _untar(files, member_path):
        """
        Extracts each tar file into member_path. The tar files are kept.

        :param files: paths of the tar files
        :type files: list[str] | tuple[str]
        :param member_path: destination folder
        :type member_path: str
        """
        for filepath in files:
            Log.debug('Unpacking {0}', filepath)
            Utils.execute_shell_command('tar -xvf {0} -C {1}'.format(filepath, member_path))

    def get_files(self, startdate, member, chunk, domain, variables, grid=None):
        """
        Returns the repository paths of the given variables for a startdate, member and chunk

        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve
        :type member: int
        :param chunk: chunk to retrieve
        :type chunk: int
        :param domain: variable's CMOR domain
        :type domain:str
        :param variables: variables list
        :type variables: list[str], tuple[str]
        :param grid: specifies if the variable must be in a interpolated grid
        :type grid: str
        :return: a list with a single element: the list of paths, one per variable.
                 NOTE(review): callers may expect a flat list - confirm before changing.
        :rtype: list[list[str]]
        """
        file_names = list()

        domain_abreviattion = self.domain_abbreviation(domain, self.frequency)

        start = parse_date(startdate)
        member_plus = str(member + 1)
        member_path = os.path.join(self.data_dir, self.expid, 'cmorfiles', startdate, self.get_member_str(member),
                                   'outputs', 'output', self.institution, self.model, self.experiment_name,
                                   'S' + startdate, self.frequency, domain)

        chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
        chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
        chunk_end = previous_day(chunk_end, 'standard')

        var_file = list()
        for var in variables:
            if grid:
                var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
            else:
                var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))
            var_file.append(os.path.join(var_path, '{0}_{1}_{3}_{4}_S{5}_r{6}i1p1_'
                                                   '{7}-{8}.nc'.format(var, domain_abreviattion, self.frequency,
                                                                       self.model, self.experiment_name, startdate,
                                                                       member_plus,
                                                                       "{0:04}{1:02}".format(chunk_start.year,
                                                                                             chunk_start.month),
                                                                       "{0:04}{1:02}".format(chunk_end.year,
                                                                                             chunk_end.month))))
        file_names.append(var_file)
        return file_names

    def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
        """
        Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's
        copy

        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :return: path to the copy created on the scratch folder
        :rtype: str
        """
        if not frequency:
            frequency = self.frequency
        domain_abbreviation = self.domain_abbreviation(domain, frequency)

        start = parse_date(startdate)
        member_plus = str(member + 1)
        member_path = os.path.join(self.data_dir, self.expid, 'cmorfiles', startdate, self.get_member_str(member),
                                   'outputs', 'output', self.institution, self.model, self.experiment_name,
                                   'S' + startdate, frequency, domain)

        chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
        chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
        chunk_end = previous_day(chunk_end, 'standard')

        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()

        if grid:
            var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
        else:
            var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))
        filepath = os.path.join(var_path, '{0}_{1}_{3}_{4}_S{5}_r{6}i1p1_'
                                          '{7}-{8}.nc'.format(var, domain_abbreviation, frequency, self.model,
                                                              self.experiment_name, startdate, member_plus,
                                                              "{0:04}{1:02}".format(chunk_start.year,
                                                                                    chunk_start.month),
                                                              "{0:04}{1:02}".format(chunk_end.year,
                                                                                    chunk_end.month)))

        temp_path = TempFile.get()
        shutil.copyfile(filepath, temp_path)
        return temp_path

    def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None,
                  box=None, rename_var=None, frequency=None, year=None, date_str=None):
        """
        Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can
        merge with already existing ones as needed

        :param date_str: time range used in the file name when neither chunk nor year is given
        :type date_str: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param rename_var: if exists, the given variable will be renamed to the one given by var
        :type rename_var: str
        :param filetosend: path to the file to send to the CMOR repository
        :type filetosend: str
        :param region: specifies the region represented by the file. If it is defined, the data will be appended
                       to the CMOR repository as a new region in the file or will overwrite if region was already
                       present
        :type region: str
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :raises ValueError: if year is given for a frequency other than 'yr', or if chunk, year and date_str
                            are all None
        """
        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()

        if rename_var:
            Utils.rename_variable(filetosend, rename_var, var)

        if not frequency:
            frequency = self.frequency

        domain_abreviattion = self.domain_abbreviation(domain, frequency)

        start = parse_date(startdate)
        member_plus = str(member + 1)
        member_path = os.path.join(self.data_dir, self.expid, 'cmorfiles', startdate, self.get_member_str(member),
                                   'outputs', 'output', self.institution, self.model, self.experiment_name,
                                   'S' + startdate, frequency, domain)

        if chunk is not None:
            chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
            chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
            chunk_end = previous_day(chunk_end, 'standard')
            time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
                                                            chunk_end.month)
        elif year is not None:
            # Value comparison: 'is not' tests object identity, which is unreliable for strings
            if frequency != 'yr':
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            time_bound = str(year)
        elif date_str is not None:
            time_bound = date_str
        else:
            raise ValueError('Chunk and year can not be None at the same time')

        if grid:
            var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
        else:
            var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))

        filepath = os.path.join(var_path, '{0}_{1}_{2}_{3}_S{4}_r{5}i1p1_'
                                          '{6}.nc'.format(var, domain_abreviattion, self.model, self.experiment_name,
                                                          startdate, member_plus, time_bound))

        if region:
            Utils.convert2netcdf4(filetosend)
            if not os.path.exists(filepath):
                # First region for this file: add a 'region' dimension and move the data into a new variable
                # that has it as last dimension
                handler = Utils.openCdf(filetosend)
                handler.createDimension('region')
                var_region = handler.createVariable('region', str, 'region')
                var_region[0] = region
                original_var = handler.variables[var]
                new_var = handler.createVariable('new_var', original_var.datatype,
                                                 original_var.dimensions + ('region',))
                new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
                value = original_var[:]
                new_var[..., 0] = value
                handler.close()
                Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var))
                Utils.rename_variable(filetosend, 'new_var', var)
            else:
                # The file exists: write the data in the region's slot of a copy of the existing file, adding
                # the region if it is not present yet
                temp = TempFile.get()
                shutil.copyfile(filepath, temp)
                Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
                handler = Utils.openCdf(temp)
                handler_send = Utils.openCdf(filetosend)
                value = handler_send.variables[var][:]
                var_region = handler.variables['region']
                basin_index = np.where(var_region[:] == region)
                if len(basin_index[0]) == 0:
                    var_region[var_region.shape[0]] = region
                    basin_index = var_region.shape[0] - 1
                else:
                    basin_index = basin_index[0][0]
                handler.variables[var][..., basin_index] = value
                handler.close()
                handler_send.close()
                Utils.move_file(temp, filetosend)
            Utils.nco.ncks(input=filetosend, output=filetosend, options='-O --fix_rec_dmn region')

        # netCDF-4 format, deflate level 4 and shuffle filter
        temp = TempFile.get()
        Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filetosend, temp])
        shutil.move(temp, filetosend)

        Utils.move_file(filetosend, filepath)

        # NOTE(review): every frequency other than daily (e.g. '6hr') is linked under 'monthly_mean' - confirm
        if frequency in ('d', 'daily', 'day'):
            freq_str = 'daily_mean'
        else:
            freq_str = 'monthly_mean'

        if domain in ['ocean', 'seaIce']:
            link_path = os.path.join(self.data_dir, self.expid, freq_str, '{0}_f6h'.format(var))
        else:
            link_path = os.path.join(self.data_dir, self.expid, freq_str, '{0}_f{1}h'.format(var, self.nfrp))
        if not os.path.exists(link_path):
            # Another process may create the folder between the check and the creation: ignore that error
            try:
                os.makedirs(link_path)
            except OSError:
                pass
        link_path = os.path.join(link_path, os.path.basename(filepath))
        if os.path.lexists(link_path):
            os.remove(link_path)
        os.symlink(filepath, link_path)

    @staticmethod
    def domain_abbreviation(domain, frequency):
        """
        Returns the table name for a domain-frequency pair

        :param domain: variable's domain
        :type domain: str
        :param frequency: variable's frequency
        :type frequency: str
        :return: variable's table name
        :rtype: str
        """
        if frequency == 'mon':
            if domain == 'seaIce':
                domain_abreviattion = 'OImon'
            elif domain == 'landIce':
                domain_abreviattion = 'LImon'
            else:
                domain_abreviattion = domain[0].upper() + 'mon'
        elif frequency == '6hr':
            domain_abreviattion = '6hrPlev'
        else:
            domain_abreviattion = 'day'
        return domain_abreviattion

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """
        Gets all the data corresponding to a given year from the CMOR repository to the scratch folder as one file
        and returns the path to the scratch's copy.

        :param year: year to retrieve
        :type year: int
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :return: path to the copy created on the scratch folder
        :rtype: str
        :raises ValueError: if the retrieved data has no timestep in the given year
        """
        chunk_files = list()
        for chunk in self.get_year_chunks(startdate, year):
            chunk_files.append(self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box))

        if len(chunk_files) > 1:
            temp = TempFile.get()
            Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
            for chunk_file in chunk_files:
                os.remove(chunk_file)
        else:
            temp = chunk_files[0]
        temp2 = TempFile.get()
        handler = Utils.openCdf(temp)
        times = Utils.get_datetime_from_netcdf(handler)
        handler.close()

        # First and last timesteps of the year. Taking the first match (instead of the last January
        # timestep) keeps the whole of January for sub-monthly data.
        start = None
        end = None
        for x, date in enumerate(times):
            if date.year == year:
                if start is None:
                    start = x
                end = x
        if start is None:
            os.remove(temp)
            raise ValueError('Year {0} not found in the data'.format(year))

        Utils.nco.ncks(input=temp, output=temp2, options='-O -d time,{0},{1}'.format(start, end))
        os.remove(temp)
        return temp2

    def get_year_chunks(self, startdate, year):
        """
        Get the list of chunks containing timesteps from the given year

        :param startdate: startdate to use
        :type startdate: str
        :param year: reference year
        :type year: int
        :return: list of chunks containing data from the given year
        :rtype: list[int]
        """
        date = parse_date(startdate)
        chunks = list()
        for chunk in range(1, self.num_chunks + 1):
            chunk_start = chunk_start_date(date, chunk, self.chunk_size, 'month', self.calendar)
            if chunk_start.year > year:
                break
            # chunk_end_date returns the first day after the chunk: step back one day to get its last day,
            # otherwise a chunk ending on December 31st would be reported for the following year
            chunk_end = previous_day(chunk_end_date(chunk_start, self.chunk_size, 'month', self.calendar),
                                     self.calendar)
            # chunk_start.year <= year here, so this also catches chunks spanning more than one year
            if chunk_end.year >= year:
                chunks.append(chunk)
        return chunks

    def get_full_years(self, startdate):
        """
        Returns the list of full years that are in the given startdate

        :param startdate: startdate to use
        :type startdate: str
        :return: list of full years
        :rtype: list[int]
        """
        # Integer division: chunk_size is in months
        chunks_per_year = 12 // self.chunk_size
        date = parse_date(startdate)
        first_january = 0
        first_year = date.year
        if date.month != 1:
            # The first full year starts on the following January: count the chunks (0-based index) that
            # start before it
            month = date.month
            first_year += 1
            while month <= 12:
                month += self.chunk_size
                first_january += 1

        years = list()
        # A year whose first chunk has index k is full when k + chunks_per_year <= num_chunks
        for _ in range(first_january, self.num_chunks - chunks_per_year + 1, chunks_per_year):
            years.append(first_year)
            first_year += 1
        return years

    def get_member_str(self, member):
        """
        Returns the member name for a given member number.

        :param member: member's number
        :type member: int
        :return: member's name ('fc' followed by the number zero-padded to member_digits)
        :rtype: str
        """
        return 'fc{0}'.format(str(member).zfill(self.member_digits))


class Variable(object):
    """
    Class to characterize a CMOR variable. It also contains the static method to make the match between the
    original name and the standard name. Requires cmor_table.csv to work.
    """

    def __init__(self, line):
        """
        :param line: row of cmor_table.csv: original names (':'-separated), short name, standard name,
                     long name, domain and basin
        :type line: list[str]
        """
        self.short_name = line[1]
        self.standard_name = line[2]
        self.long_name = line[3]
        self.domain = line[4]
        self.basin = Basins.parse(line[5])

    @classmethod
    def get_variable(cls, original_name):
        """
        Returns the CMOR variable matching an original or short name (case-insensitive).

        The table is loaded from cmor_table.csv, next to this module, on the first call.

        :param original_name: original or CMOR short name
        :type original_name: str
        :return: the matching variable, or None (an error is logged) if it is not defined
        :rtype: Variable | None
        """
        try:
            return Variable._dict_variables[original_name.lower()]
        except AttributeError:
            # First call: the class-level dictionary does not exist yet, so load the table
            Variable._dict_variables = dict()
            with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'cmor_table.csv'), 'rb') as csvfile:
                reader = csv.reader(csvfile, dialect='excel')
                for line in reader:
                    if line[0] == 'variable':
                        # header row
                        continue

                    var = Variable(line)
                    if not var.short_name:
                        continue
                    # Keys are lower-cased because the lookup lower-cases the requested name
                    for old_name in line[0].split(':'):
                        Variable._dict_variables[old_name.lower()] = var
                    Variable._dict_variables[var.short_name.lower()] = var
            return Variable.get_variable(original_name)

        except KeyError:
            Log.error('Variable {0} is not defined'.format(original_name))
            return None