# coding: utf-8 import csv import glob import shutil import threading import uuid import pygrib from cf_units import Unit from datetime import datetime import netCDF4 import numpy as np import os from autosubmit.config.log import Log from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day, add_months, \ date2str from earthdiagnostics.constants import Basins from earthdiagnostics.utils import Utils, TempFile class DataManager(object): """ Class to manage the data repositories :param exp_manager: :type exp_manager: ExperimentManager :param institution: CMOR's institution name :type institution: str :param model: CMOR's model name :type model: str :param expid: experiment identifier :type expid: str :param datafolder: Folder containing the data :type datafolder: str :param frequency: default frequency :type frequency: str :param experiment_name: CMOR's experiment name :type experiment_name: str :param scratch_dir: path to scratch folder :type scratch_dir: str :param nfrp: sample frequency :type nfrp: int :param calendar: experiment's calendar :type calendar: str """ def __init__(self, exp_manager, institution, model, expid, datafolder, frequency, experiment_name, scratch_dir, nfrp, calendar='standard'): self.initialization_method = 1 self.initialization_description = 'to be filled' self.physics_version = 1 self.physics_description = 'to be filled' self.associated_model = 'to be filled' self.source = 'to be filled' self.associated_experiment = 'to be filled' self.institution = institution self.model = model self.expid = expid self.data_dir = datafolder self.frequency = frequency self.experiment_name = experiment_name self.add_startdate = True self.add_name = True self.calendar = calendar self.scratch_dir = scratch_dir self.nfrp = nfrp self.exp_manager = exp_manager Variable.load_variables() UnitConversion.load_conversions() # noinspection PyPep8Naming def prepare_CMOR_files(self, force_rebuild, ocean, atmosphere): """ Prepares the data to be used by the diagnostic. If CMOR data is not created, it show a warning and closes. In the future, an automatic cmorization procedure will be launched If CMOR data is available but packed, the procedure will unpack it. :param atmosphere: activates atmosphere files cmorization :type atmosphere: bool :param ocean: activates ocean files cmorization :type ocean: bool :param force_rebuild: if True, forces the creation of the CMOR files :type force_rebuild: bool :return: """ # Check if cmorized and convert if not created = False errors = list() for startdate, member in self.exp_manager.get_member_list(): member_str = self.exp_manager.get_member_str(member) if force_rebuild or not self._is_cmorized(startdate, member): created = True Log.info('CMORizing startdate {0} member {1}', startdate, member_str) if ocean: errors += self._unpack_ocean_files('MMO', startdate, member) errors += self._unpack_ocean_files('diags', startdate, member) if not atmosphere: continue grb_path = os.path.join(self.data_dir, self.expid, 'original_files', startdate, member_str, 'outputs', '*.grb') gribfiles = glob.glob(grb_path) if len(gribfiles) == 0: tar_files = glob.glob( os.path.join(self.data_dir, self.expid, 'original_files', startdate, member_str, 'outputs', 'MMA*')) tar_files.sort() count = 1 for tarfile in tar_files: Log.info('Unpacking atmospheric file {0}/{1}'.format(count, len(tar_files))) errors += self._unpack_tar(tarfile, startdate, member) Log.result('Atmospheric file {0}/{1} finished'.format(count, len(tar_files))) count += 1 else: self._cmorize_grib(startdate, member, gribfiles) Log.result('CMORized startdate {0} member {1}!\n\n', startdate, member_str) if created: for error in errors: Log.error('File {0} could not be unzipped.', error) return for startdate, member in self.exp_manager.get_member_list(): member_path = os.path.join(self.data_dir, self.expid, 'cmorfiles') Log.info('Preparing CMOR files for startdate {0} and member {1}'.format(startdate, member)) filepaths = glob.glob(os.path.join(member_path, '*.gz')) if len(filepaths) == 0: continue self._unpack_cmorfiles(filepaths, member_path) def _unpack_ocean_files(self, prefix, startdate, member): tar_folder = os.path.join(self.data_dir, self.expid, 'original_files', startdate, self.exp_manager.get_member_str(member), 'outputs', '{0}*'.format(prefix)) tar_files = glob.glob(tar_folder) tar_files.sort() errors = list() count = 1 for tarfile in tar_files: Log.info('Unpacking oceanic file {0}/{1}'.format(count, len(tar_files))) errors += self._unpack_tar(tarfile, startdate, member) Log.result('Oceanic file {0}/{1} finished'.format(count, len(tar_files))) count += 1 return errors def _cmorize_grib(self, startdate, member, gribfiles): gribfiles.sort() copied_gribfiles = list() for gribfile in gribfiles: shutil.copy(gribfile, os.path.join(self.scratch_dir, os.path.basename(gribfile))) copied_gribfiles.append(os.path.join(self.scratch_dir, os.path.basename(gribfile))) count = 1 for gribfile in copied_gribfiles: Log.info('Unpacking atmospheric grib file {0}/{1}'.format(count, len(copied_gribfiles))) start = parse_date(gribfile[-10:-4]) month = '{0:02}'.format(start.month) grib_handler = pygrib.open(gribfile) mes1 = grib_handler.message(1) mes2 = grib_handler.readline() while mes2['name'] != mes1['name']: mes2 = grib_handler.readline() nfrp = mes2.analDate - mes1.analDate nfrp = int(nfrp.total_seconds() / 3600) grib_handler.close() grid = os.path.basename(gribfile)[3:5] if os.path.exists('ICM{0}{1}+{2.year}{2.month:02}.grb'.format(grid, self.expid, add_months(start, -1, 'standard'))): fd = open('rules_files', 'w') fd.write('if (dataDate >= {0.year}{0.month:02}01) {{ write ; }}\n'.format(start)) fd.close() # get first timestep for each month from previous file (if possible) if os.path.exists('ICM'): os.remove('ICM') Utils.execute_shell_command('grib_filter -o ICM rules_files ' 'ICM{0}{1}+{2.year}{2.month:02}.grb ' '{3}'.format(grid, self.expid, add_months(start, -1, 'standard'), gribfile)) os.remove('rules_files') else: shutil.copy(gribfile, 'ICM') # remap on regular Gauss grid if grid == 'SH': Utils.cdo.splitparam(input='-sp2gpl ICM', output=gribfile + '_') else: Utils.cdo.splitparam(input='ICM', output=gribfile + '_', options='-R') # total precipitation (remove negative values) Utils.cdo.setcode(228, input='-setmisstoc,0 -setvrange,0,Inf -add ' '{0}_{{142,143}}.128.grb'.format(gribfile), output='{0}_228.128.grb'.format(gribfile)) os.remove('ICM') cdo_reftime = parse_date(startdate).strftime('%Y-%m-%d,00:00') self._ungrib_6_hourly_vars(cdo_reftime, gribfile, month) self._ungrib_daily_vars(cdo_reftime, gribfile, month, nfrp) self._ungrib_monthly_files(cdo_reftime, gribfile, month, nfrp) for splited_file in glob.glob('{0}_???.128.grb'.format(gribfile)): os.remove(splited_file) Log.result('Atmospheric grib file {0}/{1} finished'.format(count, len(copied_gribfiles))) count += 1 chunk_start = parse_date(startdate) while os.path.exists(os.path.join(self.scratch_dir, 'ICMGG{0}+{1}.grb'.format(self.expid, date2str(chunk_start)[:-2]))): chunk_end = chunk_end_date(chunk_start, self.exp_manager.chunk_size, 'month', 'standard') chunk_end = previous_day(chunk_end, 'standard') chunk_files_gg_mon = list() chunk_files_gg_day = list() chunk_files_gg_6h = list() chunk_files_sh_mon = list() chunk_files_sh_day = list() chunk_files_sh_6h = list() for month in range(0, self.exp_manager.chunk_size): chunk_file = 'ICMGG{0}+{1}.grb'.format(self.expid, date2str(add_months(chunk_start, month, 'standard'))[:-2]) os.remove(chunk_file) os.remove('ICMSH' + chunk_file[5:]) chunk_files_gg_mon.append(chunk_file + '_mon.nc') chunk_files_gg_day.append(chunk_file + '_day.nc') chunk_files_gg_6h.append(chunk_file + '_6hr.nc') chunk_files_sh_mon.append('ICMSH' + chunk_file[5:] + '_mon.nc') chunk_files_sh_day.append('ICMSH' + chunk_file[5:] + '_day.nc') chunk_files_sh_6h.append('ICMSH' + chunk_file[5:] + '_6hr.nc') self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_sh_mon, 'SH', '1m') self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_sh_day, 'SH', '1d') self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_sh_6h, 'SH', '6hr') self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_gg_mon, 'GG', '1m') self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_gg_day, 'GG', '1d') self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_gg_6h, 'GG', '6hr') chunk_start = chunk_end_date(chunk_start, self.exp_manager.chunk_size, 'month', 'standard') @staticmethod def _ungrib_6_hourly_vars(cdo_reftime, gribfile, month): Log.info('Preparing 6-hourly variables') var_codes = (129, 130, 131, 132, 151, 167, 168, 164, 165, 166, 129) for param in var_codes: if not os.path.exists('{0}_{1}.128.grb'.format(gribfile, param)): continue new_units = None if param == 129: # geopotential new_units = "m" cdo_operator = "-divc,9.81 -sellevel,{1} -selmon,{0}".format(month, ','.join(map(str, range(30000, 90000, 5000)))) elif param == 129: # upper air temperature new_units = "m" cdo_operator = "-sellevel,{1} -selmon,{0}".format(month, ','.join(map(str, range(30000, 50000, 5000)))) elif param in (131, 132): # upper air wind new_units = "m" cdo_operator = "-sellevel,{1} -selmon,{0}".format(month, ','.join(map(str, (50000, 70000, 85000)))) else: # default, plain monthly mean cdo_operator = "-selmon,{0}".format(month) Utils.execute_shell_command('cdo -f nc -t ecmwf setreftime,{0} ' '{1} {2}_{3}.128.grb ' '{2}_{3}_6hr.nc'.format(cdo_reftime, cdo_operator, gribfile, param)) h_var_file = '{0}_{1}_6hr.nc'.format(gribfile, param) if new_units: handler = Utils.openCdf(h_var_file) for var in handler.variables.values(): if 'code' in var.ncattrs() and var.code == param: var.units = new_units break handler.close() # concat all vars in one file for 6hr h_file = '{0}_6hr.nc'.format(gribfile) Utils.concat_variables(h_var_file, h_file, True) @staticmethod def _ungrib_daily_vars(cdo_reftime, gribfile, month, nfrp): Log.info('Preparing daily variables') var_codes = (167, 165, 166, 151, 164, 168, 169, 177, 179, 228, 201, 202, 130) for param in var_codes: if not os.path.exists('{0}_{1}.128.grb'.format(gribfile, param)): continue new_units = None if param in (169, 177, 179): # radiation new_units = "W m-2" cdo_operator = "-divc,{0} -daymean -selmon,{2} " \ "-shifttime,-{1}hours".format(nfrp * 3600, int(nfrp), month) elif param == 228: # precipitation new_units = "kg m-2 -s" cdo_operator = "-mulc,1000 -divc,{0} -daymean -selmon,{2} " \ "-shifttime,-{1}hours".format(nfrp * 3600, int(nfrp), month) elif param == 201: # maximum cdo_operator = "-daymax -selmon,{1} -shifttime,-{0}hours".format(int(nfrp), month) elif param == 202: # minmimum cdo_operator = "-daymin -selmon,{1} -shifttime,-{0}hours".format(int(nfrp), month) elif param == 130: # 850 hPa cdo_operator = "-daymean -sellevel,85000 -selmon,{0}".format(month) else: # default, plain daily mean cdo_operator = "-daymean -selmon,{0}".format(month) Utils.execute_shell_command('cdo -f nc -t ecmwf setreftime,{0} ' '{1} {2}_{3}.128.grb ' '{2}_{3}_day.nc'.format(cdo_reftime, cdo_operator, gribfile, param)) daily_variable_file = '{0}_{1}_day.nc'.format(gribfile, param) if new_units: handler = Utils.openCdf(daily_variable_file) for var in handler.variables.values(): if 'code' in var.ncattrs() and var.code == param: var.units = new_units break handler.close() # concat all vars in one file for day daily_file = '{0}_day.nc'.format(gribfile) Utils.concat_variables(daily_variable_file, daily_file, True) @staticmethod def _ungrib_monthly_files(cdo_reftime, gribfile, month, nfrp): Log.info('Preparing monthly variables') var_codes = (167, 201, 202, 165, 166, 151, 144, 228, 205, 182, 164, 146, 147, 176, 169, 177, 175, 212, 141, 180, 181, 179, 168, 243, 129, 130, 131, 132, 133) for param in var_codes: if not os.path.exists('{0}_{1}.128.grb'.format(gribfile, param)): continue new_units = None if param in (146, 147, 176, 169, 177, 175, 179, 212): # radiation/heat new_units = "W m-2" cdo_operator = "-divc,{0} -monmean -selmon,{2} " \ "-shifttime,-{1}hours".format(nfrp * 3600, int(nfrp), month) elif param in (180, 181): # momentum flux new_units = "N m-2" cdo_operator = "-divc,{0} -monmean -selmon,{2} " \ "-shifttime,-{1}hours".format(nfrp * 3600, int(nfrp), month) elif param in (144, 228, 205, 182): # precipitation/evaporation/runoff new_units = "kg m-2 s-1" cdo_operator = "-mulc,1000 -divc,{0} -monmean -selmon,{2} " \ "-shifttime,-{1}hours".format(nfrp * 3600, int(nfrp), month) elif param == 201: # mean daily maximum cdo_operator = "-monmean -daymax -selmon,{1} " \ "-shifttime,-{0}hours".format(nfrp, month) elif param == 202: # mean daily minmimum cdo_operator = "-monmean -daymin -selmon,{1} " \ "-shifttime,-{0}hours".format(nfrp, month) elif param in (130, 131, 132, 133): # upper-air cdo_operator = "-monmean -sellevel,5000,20000,50000,85000 " \ "-selmon,{0}".format(month) elif param == 129: # upper-air geopotential new_units = "m" cdo_operator = "-divc,9.81 -timmean -sellevel,5000,20000,50000,85000 " \ "-selmon,{0}".format(month) else: # default, plain monthly mean cdo_operator = "-monmean -selmon,{0}".format(month) Utils.execute_shell_command('cdo -f nc -t ecmwf setreftime,{0} ' '{1} {2}_{3}.128.grb ' '{2}_{3}_mon.nc'.format(cdo_reftime, cdo_operator, gribfile, param)) handler = Utils.openCdf('{0}_{1}_mon.nc'.format(gribfile, param)) if new_units: for var in handler.variables.values(): if 'code' in var.ncattrs() and var.code == param: var.units = new_units break var_name = None for key in handler.variables.keys(): if key + '_2' in handler.variables and key not in handler.dimensions: var_name = key handler.close() if var_name is not None: Utils.nco.ncks(input='{0}_{1}_mon.nc'.format(gribfile, param), output='{0}_{1}_mon.nc'.format(gribfile, param), options='-O -v {0}'.format(var_name)) # concat all vars in one file for mon Utils.concat_variables('{0}_{1}_mon.nc'.format(gribfile, param), '{0}_mon.nc'.format(gribfile), True) def _merge_and_cmorize_atmos(self, startdate, member, chunk_start, chunk_end, chunk_files, grid, frequency): merged_file = 'MMA_{0}_{1}_{2}_{3}.nc'.format(frequency, date2str(chunk_start), date2str(chunk_end), grid) for x in range(0, len(chunk_files)): chunk_files[x] = os.path.join(self.scratch_dir, chunk_files[x]) if len(chunk_files) > 1: Log.info('Merging...') Utils.cdo.mergetime(input=' '.join(chunk_files), output=merged_file, options='-O') for filepath in chunk_files: os.remove(filepath) else: shutil.move(chunk_files[0], merged_file) self._cmorize_nc_file(merged_file, member, startdate) def _unpack_cmorfiles(self, filepaths, member_path): threads = list() numthreads = Utils.available_cpu_count() for numthread in range(0, numthreads): t = threading.Thread(target=DataManager._unzip, args=([filepaths[numthread::numthreads]])) threads.append(t) t.start() for t in threads: t.join() filepaths = glob.glob(os.path.join(member_path, '*.tar')).sort() for numthread in range(0, numthreads): t = threading.Thread(target=DataManager._untar, args=(filepaths[numthread::numthreads], member_path)) threads.append(t) t.start() for t in threads: t.join() if self.experiment_name != self.model: bad_path = os.path.join(member_path, self.institution, self.model, self.model) for (dirpath, dirnames, filenames) in os.walk(bad_path, False): for filename in filenames: filepath = os.path.join(dirpath, filename) good = filepath.replace('_{0}_output_'.format(self.model), '_{0}_{1}_'.format(self.model, self.experiment_name)) good = good.replace('/{0}/{0}'.format(self.model), '/{0}/{1}'.format(self.model, self.experiment_name)) Utils.move_file(filepath, good) os.rmdir(dirpath) good_dir = os.path.join(member_path, self.institution, self.model, self.experiment_name) for sdate in os.listdir(good_dir): for (dirpath, dirnames, filenames) in os.walk(os.path.join(good_dir, sdate), False): for filename in filenames: filepath = os.path.join(dirpath, filename) good = filepath.replace('_{0}_{1}_r'.format(self.model, self.experiment_name, sdate), '_{0}_{1}_{2}_r'.format(self.model, self.experiment_name, sdate)) if good != filepath: Log.info('Moving {0} to {1}'.format(filename, good)) Utils.move_file(filepath, good) def _unpack_tar(self, tarfile, startdate, member): Log.info('Unpacking {0}', tarfile) scratch_dir = os.path.join(self.scratch_dir, 'CMOR') if os.path.exists(scratch_dir): shutil.rmtree(scratch_dir) os.makedirs(scratch_dir) self._untar((tarfile,), scratch_dir) errors = self._unzip(glob.glob(os.path.join(scratch_dir, '*.gz'))) if os.path.basename(tarfile).startswith('MMA'): temp = TempFile.get() for filename in glob.glob(os.path.join(scratch_dir, 'MMA_*_SH_*.nc')): Utils.cdo.sp2gpl(options='-O', input=filename, output=temp) shutil.move(temp, filename) sh_files = glob.glob(os.path.join(scratch_dir, 'MMA_*_SH_*.nc')) Utils.cdo.mergetime(input=sh_files, output=os.path.join(scratch_dir, 'sh.nc')) gg_files = glob.glob(os.path.join(scratch_dir, 'MMA_*_GG_*.nc')) Utils.cdo.mergetime(input=gg_files, output=os.path.join(scratch_dir, 'gg.nc')) for filename in sh_files + gg_files: os.remove(filename) Utils.nco.ncks(input=os.path.join(scratch_dir, 'sh.nc'), output=os.path.join(scratch_dir, 'gg.nc'), options='-A') os.remove(os.path.join(scratch_dir, 'sh.nc')) tar_startdate = tarfile[0:-4].split('_')[5].split('-') new_name = 'MMA_1m_{0[0]}_{0[1]}.nc'.format(tar_startdate) shutil.move(os.path.join(scratch_dir, 'gg.nc'), os.path.join(scratch_dir, new_name)) for filename in glob.glob(os.path.join(scratch_dir, '*.nc')): self._cmorize_nc_file(filename, member, startdate) return errors def _cmorize_nc_file(self, filename, member, startdate): Log.info('Processing file {0}', filename) temp = TempFile.get() Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filename, temp]) shutil.move(temp, filename) file_parts = os.path.basename(filename).split('_') if self.expid in [file_parts[1], file_parts[2]]: frequency = 'm' else: frequency = file_parts[1][1].lower() variables = dict() variables['time_counter'] = 'time' variables['time_counter_bnds'] = 'time_bnds' variables['tbnds'] = 'bnds' variables['nav_lat'] = 'lat' variables['nav_lon'] = 'lon' variables['x'] = 'i' variables['y'] = 'j' Utils.rename_variables(filename, variables, False, True) handler = Utils.openCdf(filename) self._add_common_attributes(frequency, handler, member, startdate) self._update_time_variables(handler, startdate) handler.sync() temp = TempFile.get() Log.info('Splitting file {0}', filename) for variable in handler.variables.keys(): if variable in ('lon', 'lat', 'time', 'time_bnds', 'leadtime', 'lev', 'icethi', 'deptht', 'depthu', 'depthw', 'depthv', 'time_centered', 'time_centered_bounds', 'deptht_bounds', 'depthu_bounds', 'depthv_bounds', 'depthw_bounds', 'time_counter_bounds', 'ncatice', 'nav_lat_grid_V', 'nav_lat_grid_U', 'nav_lat_grid_T', 'nav_lon_grid_V', 'nav_lon_grid_U', 'nav_lon_grid_T', 'depth', 'depth_2', 'depth_3', 'depth_4', 'mlev', 'hyai', 'hybi', 'hyam', 'hybm'): continue self.extract_variable(filename, handler, frequency, member, startdate, temp, variable) Log.result('File {0} cmorized!', filename) handler.close() os.remove(filename) def extract_variable(self, file_path, handler, frequency, member, startdate, temp, variable): """ Extracts a variable from a file and creates the CMOR file :param file_path: path to the file :type file_path: str :param handler: netCDF4 handler for the file :type handler: netCDF$.Dataset :param frequency: variable's frequency :type frequency: str :param member: member :type member: int :param startdate: startdate :type startdate: str :param temp: temporal file to use :type temp: str :param variable: variable's name :type variable: str """ file_parts = os.path.basename(file_path).split('_') var_cmor = Variable.get_variable(variable) if var_cmor is None: return if frequency == 'd': frequency = 'day' elif frequency == 'm': frequency = 'mon' elif frequency == 'h': frequency = '6hr' else: raise Exception('Frequency {0} not supported'.format(frequency)) Utils.nco.ncks(input=file_path, output=temp, options='-v {0}'.format(variable)) if var_cmor.domain == 'ocean': Utils.rename_variables(temp, {'deptht': 'lev', 'depthu': 'lev', 'depthw': 'lev', 'depthv': 'lev', 'depth': 'lev'}, False, True) elif var_cmor.domain in ('land', 'landIce'): Utils.rename_variables(temp, {'depth': 'sdepth', 'depth_2': 'sdepth', 'depth_3': 'sdepth', 'depth_4': 'sdepth'}, False, True) elif var_cmor.domain == 'atmos': Utils.rename_variables(temp, {'depth': 'plev'}, False, True) handler_cmor = Utils.openCdf(temp) Utils.copy_variable(handler, handler_cmor, 'lon', False) Utils.copy_variable(handler, handler_cmor, 'lat', False) if 'time' in handler_cmor.dimensions.keys(): Utils.copy_variable(handler, handler_cmor, 'leadtime', False) handler_cmor.close() if var_cmor.basin is None: region = None else: region = var_cmor.basin.fullname if file_parts[0] == self.expid or file_parts[0].startswith('ORCA') or file_parts[0] in ('MMA', 'MMO'): # Model output date_str = '{0}-{1}'.format(file_parts[2][0:6], file_parts[3][0:6]) elif file_parts[1] == self.expid: # Files generated by the old version of the diagnostics date_str = '{0}-{1}'.format(file_parts[4][0:6], file_parts[5][0:6]) else: Log.error('Variable {0} can not be cmorized. Original filename does not match a recognized pattern', var_cmor.short_name) return self.send_file(temp, var_cmor.domain, var_cmor.short_name, startdate, member, frequency=frequency, rename_var=variable, date_str=date_str, region=region) @staticmethod def _update_time_variables(handler, startdate): time_var = handler.variables['time'] times = netCDF4.num2date(time_var[:], time_var.units, time_var.calendar) if type(times[0]) is not datetime: for x in range(0, times.shape[0]): # noinspection PyProtectedMember times[x] = times[x]._to_real_datetime() time_var[:] = netCDF4.date2num(times, 'days since 1850-01-01', 'standard') if 'axis_nbounds' in handler.dimensions: handler.renameDimension('axis_nbounds', 'bnds') if 'time_counter_bounds' in handler.variables: handler.renameVariable('time_counter_bounds', 'time_bnds') handler.sync() if 'time_bnds' in handler.variables: time_bounds_var = handler.variables['time_bnds'] time_var.bounds = "time_bnds" time_bounds = netCDF4.num2date(time_bounds_var[:], time_var.units, time_var.calendar) if type(time_bounds[0, 0]) is not datetime: for x in range(0, time_bounds.shape[0]): for y in range(0, time_bounds.shape[1]): # noinspection PyProtectedMember time_bounds[x, y] = time_bounds[x, y]._to_real_datetime() time_bounds_var[:] = netCDF4.date2num(time_bounds, 'days since 1850-01-01', 'standard') time_var.units = 'days since 1850-01-01' time_var.time_origin = "1850-01-01" time_var.calendar = 'standard' time_var.long_name = "Verification time of the forecast" time_var.standard_name = "time" time_var.axis = "T" if 'leadtime' in handler.variables: var = handler.variables['leadtime'] else: var = handler.createVariable('leadtime', float, 'time') var.units = "days" var.long_name = "Time elapsed since the start of the forecast" var.standard_name = "forecast_period" leadtime = (netCDF4.num2date(time_var[:], time_var.units, time_var.calendar) - parse_date(startdate)) for lt in range(0, leadtime.shape[0]): var[lt] = leadtime[lt].days def _add_common_attributes(self, frequency, handler, member, startdate): handler.associated_experiment = self.associated_experiment handler.batch = '{0}{1}'.format(self.institution, datetime.now().strftime('%Y-%m-%d(T%H:%M:%SZ)')) handler.contact = 'Pierre-Antoine Bretonnière, pierre-antoine.bretonniere@bsc.es , ' \ 'Javier Vegas-Regidor, javier.vegas@bsc.es ' handler.Conventions = 'CF-1.6' handler.creation_date = datetime.now().strftime('%Y-%m-%d(T%H:%M:%SZ)') handler.experiment_id = self.experiment_name handler.forecast_reference_time = parse_date(startdate).strftime('%Y-%m-%d(T%H:%M:%SZ)') if frequency == 'd': handler.frequency = 'day' elif frequency == 'm': handler.frequency = 'mon' handler.institute_id = self.institution handler.institution = self.institution handler.initialization_method = self.initialization_method handler.initialization_description = self.initialization_description handler.physics_version = self.physics_version handler.physics_description = self.physics_description handler.model_id = self.model handler.associated_model = self.associated_model handler.project_id = 'SPECS' handler.realization = str(member + 1) handler.source = self.source handler.startdate = 'S{0}'.format(startdate) handler.tracking_id = str(uuid.uuid1()) handler.title = "{0} model output prepared for SPECS {1}".format(self.model, self.experiment_name) @staticmethod def _unzip(files): errors = list() for filepath in files: Log.debug('Unzipping {0}', filepath) try: Utils.execute_shell_command('gunzip {0}'.format(filepath)) except Exception as ex: Log.error('Can not unzip {0}: {1}', filepath, ex) errors.append(filepath) return errors @staticmethod def _untar(files, member_path): for filepath in files: Log.debug('Unpacking {0}', filepath) Utils.execute_shell_command('tar -xvf {0} -C {1}'.format(filepath, member_path)) def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None): """ Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy :param domain: CMOR domain :type domain: str :param var: variable name :type var: str :param startdate: file's startdate :type startdate: str :param member: file's member :type member: int :param chunk: file's chunk :type chunk: int :param grid: file's grid (only needed if it is not the original) :type grid: str :param box: file's box (only needed to retrieve sections or averages) :type box: Box :param frequency: file's frequency (only needed if it is different from the default) :type frequency: str :return: path to the copy created on the scratch folder :rtype: str """ if not frequency: frequency = self.frequency domain_abbreviation = self.domain_abbreviation(domain, frequency) start = parse_date(startdate) member_plus = str(member + 1) member_path = os.path.join(self.get_startdate_path(startdate), frequency, domain) chunk_start = chunk_start_date(start, chunk, self.exp_manager.chunk_size, 'month', 'standard') chunk_end = chunk_end_date(chunk_start, self.exp_manager.chunk_size, 'month', 'standard') chunk_end = previous_day(chunk_end, 'standard') if box: var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str() if grid: var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus)) else: var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus)) filepath = os.path.join(var_path, '{0}_{1}_{3}_{4}_S{5}_r{6}i1p1_' '{7}-{8}.nc'.format(var, domain_abbreviation, frequency, self.model, self.experiment_name, startdate, member_plus, "{0:04}{1:02}".format(chunk_start.year, chunk_start.month), "{0:04}{1:02}".format(chunk_end.year, chunk_end.month))) temp_path = TempFile.get() shutil.copyfile(filepath, temp_path) return temp_path def get_startdate_path(self, startdate): """ Returns the path to the startdate's CMOR folder :param startdate: target startdate :type startdate: str :return: path to the startdate's CMOR folder :rtype: str """ return os.path.join(self.data_dir, self.expid, 'cmorfiles', self.institution, self.model, self.experiment_name, 'S' + startdate) def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None, rename_var=None, frequency=None, year=None, date_str=None): """ Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge with already existing ones as needed :param date_str: :param year: if frequency is yearly, this parameter is used to give the corresponding year :type year: int :param rename_var: if exists, the given variable will be renamed to the one given by var :type rename_var: str :param filetosend: path to the file to send to the CMOR repository :type filetosend: str :param region: specifies the region represented by the file. If it is defined, the data will be appended to the CMOR repository as a new region in the file or will overwrite if region was already present :type region: str :param domain: CMOR domain :type domain: str :param var: variable name :type var: str :param startdate: file's startdate :type startdate: str :param member: file's member :type member: int :param chunk: file's chunk :type chunk: int :param grid: file's grid (only needed if it is not the original) :type grid: str :param box: file's box (only needed to retrieve sections or averages) :type box: Box :param frequency: file's frequency (only needed if it is different from the default) :type frequency: str :return: path to the copy created on the scratch folder :rtype: str """ original_var = var cmor_var = Variable.get_variable(var) if box: var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str() if rename_var: Utils.rename_variable(filetosend, rename_var, var) elif original_var != var: Utils.rename_variable(filetosend, original_var, var) if not frequency: frequency = self.frequency domain_abreviattion = self.domain_abbreviation(domain, frequency) start = parse_date(startdate) member_plus = str(member + 1) member_path = os.path.join(self.get_startdate_path(startdate), frequency, domain) if chunk is not None: chunk_start = chunk_start_date(start, chunk, self.exp_manager.chunk_size, 'month', 'standard') chunk_end = chunk_end_date(chunk_start, self.exp_manager.chunk_size, 'month', 'standard') chunk_end = previous_day(chunk_end, 'standard') time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year, chunk_end.month) elif year is not None: if frequency is not 'yr': raise ValueError('Year may be provided instead of chunk only if frequency is "yr"') time_bound = str(year) elif date_str is not None: time_bound = date_str else: raise ValueError('Chunk and year can not be None at the same time') if grid: var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus)) else: var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus)) filepath = os.path.join(var_path, '{0}_{1}_{2}_{3}_S{4}_r{5}i1p1_' '{6}.nc'.format(var, domain_abreviattion, self.model, self.experiment_name, startdate, member_plus, time_bound)) if region: Utils.convert2netcdf4(filetosend) if not os.path.exists(filepath): handler = Utils.openCdf(filetosend) handler.createDimension('region') var_region = handler.createVariable('region', str, 'region') var_region[0] = region original_var = handler.variables[var] new_var = handler.createVariable('new_var', original_var.datatype, original_var.dimensions + ('region',)) new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()}) value = original_var[:] new_var[..., 0] = value handler.close() Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var)) Utils.rename_variable(filetosend, 'new_var', var) else: temp = TempFile.get() shutil.copyfile(filepath, temp) Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region') handler = Utils.openCdf(temp) handler_send = Utils.openCdf(filetosend) value = handler_send.variables[var][:] var_region = handler.variables['region'] basin_index = np.where(var_region[:] == region) if len(basin_index[0]) == 0: var_region[var_region.shape[0]] = region basin_index = var_region.shape[0] - 1 else: basin_index = basin_index[0][0] handler.variables[var][..., basin_index] = value handler.close() handler_send.close() Utils.move_file(temp, filetosend) Utils.nco.ncks(input=filetosend, output=filetosend, options='-O --fix_rec_dmn region') temp = TempFile.get() Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filetosend, temp]) shutil.move(temp, filetosend) if cmor_var: handler = Utils.openCdf(filetosend) var_handler = handler.variables[var] var_handler.standard_name = cmor_var.standard_name var_handler.long_name = cmor_var.long_name var_handler.short_name = cmor_var.short_name var_type = var_handler.dtype handler.modeling_realm = cmor_var.domain handler.table_id = 'Table {0} (December 2013)'.format(self.domain_abbreviation(cmor_var.domain, frequency)) if cmor_var.units: if 'units' in var_handler.ncattrs(): if cmor_var.units != var_handler.units: new_unit = Unit(cmor_var.units) old_unit = Unit(var_handler.units) var_handler[:] = new_unit.convert(var_handler[:], old_unit, inplace=True) if 'valid_min' in var_handler.ncattrs(): var_handler.valid_min = new_unit.convert(float(var_handler.valid_min), old_unit) if 'valid_max' in var_handler.ncattrs(): var_handler.valid_max = new_unit.convert(float(var_handler.valid_max), old_unit) var_handler.units = cmor_var.units handler.sync() if 'lev' in handler.variables: handler.variables['lev'].short_name = 'lev' if domain == 'ocean': handler.variables['lev'].standard_name = 'depth' if 'lon' in handler.variables: handler.variables['lon'].short_name = 'lon' handler.variables['lon'].standard_name = 'longitude' if 'lat' in handler.variables: handler.variables['lat'].short_name = 'lat' handler.variables['lat'].standard_name = 'latitude' handler.close() if cmor_var.valid_min != '': valid_min = '-a valid_min, {0}, o, {1}, "{2}" '.format(var, var_type.char, cmor_var.valid_min) else: valid_min = '' if cmor_var.valid_max != '': valid_max = '-a valid_max, {0}, o, {1}, "{2}" '.format(var, var_type.char, cmor_var.valid_max) else: valid_max = '' Utils.nco.ncatted(input=filetosend, output=filetosend, options='-O -a _FillValue,{0},o,{1},"1.e20" ' '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(var, var_type.char, valid_min, valid_max)) variables = dict() variables['x'] = 'i' variables['y'] = 'j' variables['nav_lat_grid_V'] = 'lat' variables['nav_lon_grid_V'] = 'lon' variables['nav_lat_grid_U'] = 'lat' variables['nav_lon_grid_U'] = 'lon' variables['nav_lat_grid_T'] = 'lat' variables['nav_lon_grid_T'] = 'lon' Utils.rename_variables(filetosend, variables, False, True) Utils.move_file(filetosend, filepath) self._create_link(domain, filepath, frequency, var) def _create_link(self, domain, filepath, frequency, var): if frequency in ('d', 'daily', 'day'): freq_str = 'daily_mean' else: freq_str = 'monthly_mean' if domain in ['ocean', 'seaIce']: variable_folder = '{0}_f6h'.format(var) else: variable_folder = '{0}_f{1}h'.format(var, self.nfrp) link_path = os.path.join(self.data_dir, self.expid, freq_str, domain.lower(), variable_folder) if not os.path.exists(link_path): # This can be a race condition # noinspection PyBroadException try: os.makedirs(link_path) except Exception: pass link_path = os.path.join(link_path, os.path.basename(filepath)) if os.path.lexists(link_path): os.remove(link_path) os.symlink(filepath, link_path) @staticmethod def domain_abbreviation(domain, frequency): """ Returns the table name for a domain-frequency pair :param domain: variable's domain :type domain: str :param frequency: variable's frequency :type frequency: str :return: variable's table name :rtype: str """ if frequency == 'mon': if domain == 'seaIce': domain_abreviattion = 'OImon' elif domain == 'landIce': domain_abreviattion = 'LImon' else: domain_abreviattion = domain[0].upper() + 'mon' elif frequency == '6hr': domain_abreviattion = '6hrPlev' else: domain_abreviattion = 'day' return domain_abreviattion def get_year(self, domain, var, startdate, member, year, grid=None, box=None): """ Gets all the data corresponding to a given year from the CMOR repository to the scratch folder as one file and returns the path to the scratch's copy. :param year: year to retrieve :type year: int :param domain: CMOR domain :type domain: str :param var: variable name :type var: str :param startdate: file's startdate :type startdate: str :param member: file's member :type member: int :param grid: file's grid (only needed if it is not the original) :type grid: str :param box: file's box (only needed to retrieve sections or averages) :type box: Box :return: path to the copy created on the scratch folder :rtype: str """ chunk_files = list() for chunk in self.exp_manager.get_year_chunks(startdate, year): chunk_files.append(self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box)) if len(chunk_files) > 1: temp = TempFile.get() Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp) for chunk_file in chunk_files: os.remove(chunk_file) else: temp = chunk_files[0] temp2 = TempFile.get() handler = Utils.openCdf(temp) time = Utils.get_datetime_from_netcdf(handler) handler.close() start = None end = None for x in range(0, len(time)): date = time[x] if date.year == year: if date.month == 1: start = x elif date.month == 12: end = x Utils.nco.ncks(input=temp, output=temp2, options='-O -d time,{0},{1}'.format(start, end)) os.remove(temp) return temp2 def _is_cmorized(self, startdate, member): if not os.path.exists(self.get_startdate_path(startdate)): return False startdate_path = self.get_startdate_path(startdate) for freq in os.listdir(startdate_path): freq_path = os.path.join(startdate_path, freq) for domain in os.listdir(freq_path): domain_path = os.path.join(freq_path, domain) for var in os.listdir(domain_path): member_path = os.path.join(domain_path, var, 'r{0}i1p1'.format(member+1)) if os.path.exists(member_path): return True return False class Variable(object): """ Class to characterize a CMOR variable. It also contains the static method to make the match between thje original name and the standard name. Requires cmor_table.csv to work. """ _dict_variables = None def __init__(self, line): self.short_name = line[1].strip() self.standard_name = line[2].strip() self.long_name = line[3].strip() self.domain = line[4].strip() self.basin = Basins.parse(line[5]) self.units = line[6].strip() self.valid_min = line[7].strip() self.valid_max = line[8].strip() @classmethod def get_variable(cls, original_name): """ Returns the cmor variable instance given a variable name :param original_name: original variable's name :type original_name: str :return: CMOR variable :rtype: Variable """ try: return cls._dict_variables[original_name.lower()] except KeyError: Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name)) return None @classmethod def load_variables(cls): """ Loads the cmor_table.csv and creates the variables dictionary """ Variable._dict_variables = dict() with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'cmor_table.csv'), 'rb') as csvfile: reader = csv.reader(csvfile, dialect='excel') for line in reader: if line[0] == 'variable': continue var = Variable(line) if not var.short_name: continue for old_name in line[0].split(':'): Variable._dict_variables[old_name] = var Variable._dict_variables[var.short_name] = var class UnitConversion(object): """ Class to manage unit conversions """ _dict_conversions = None @classmethod def load_conversions(cls): """ Load conversions from the configuration file """ cls._dict_conversions = dict() with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile: reader = csv.reader(csvfile, dialect='excel') for line in reader: if line[0] == 'original': continue cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3])) @classmethod def add_conversion(cls, conversion): """ Adds a conversion to the dictionary :param conversion: conversion to add :type conversion: UnitConversion """ cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion def __init__(self, source, destiny, factor, offset): self.source = source self.destiny = destiny self.factor = float(factor) self.offset = float(offset) @classmethod def get_conversion_factor_offset(cls, input_units, output_units): """ Gets the conversion factor and offset for two units . The conversion has to be done in the following way: converted = original * factor + offset :param input_units: original units :type input_units: str :param output_units: destiny units :type output_units: str :return: factor and offset :rtype: [float, float] """ units = input_units.split() if len(units) == 1: scale_unit = 1 unit = units[0] else: if '^' in units[0]: values = units[0].split('^') scale_unit = pow(int(values[0]), int(values[1])) else: scale_unit = float(units[0]) unit = units[1] units = output_units.split() if len(units) == 1: scale_new_unit = 1 new_unit = units[0] else: if '^' in units[0]: values = units[0].split('^') scale_new_unit = pow(int(values[0]), int(values[1])) else: scale_new_unit = float(units[0]) new_unit = units[1] factor, offset = UnitConversion._get_factor(new_unit, unit) if factor is None: return None, None factor = factor * scale_unit / float(scale_new_unit) offset /= float(scale_new_unit) return factor, offset @classmethod def _get_factor(cls, new_unit, unit): # Add only the conversions with a factor greater than 1 if unit == new_unit: return 1, 0 elif (unit, new_unit) in cls._dict_conversions: conversion = cls._dict_conversions[(unit, new_unit)] return conversion.factor, conversion.offset elif (new_unit, unit) in cls._dict_conversions: conversion = cls._dict_conversions[(unit, new_unit)] return 1/conversion.factor, -conversion.offset else: return None, None