# coding: utf-8 import csv import glob import re import shutil import threading import uuid import pygrib from cfunits import Units from datetime import datetime import netCDF4 import numpy as np import os from autosubmit.config.log import Log from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day, add_months, \ date2str from earthdiagnostics.constants import Basins from earthdiagnostics.utils import Utils, TempFile class DataManager(object): """ Class to manage the data repositories :param config: :type config: Config """ def __init__(self, config): self.config = config self.experiment = config.experiment self._checked_vars = list() Variable.load_variables() UnitConversion.load_conversions() # noinspection PyPep8Naming def prepare_CMOR_files(self): """ Prepares the data to be used by the diagnostic. If CMOR data is not created, it show a warning and closes. In the future, an automatic cmorization procedure will be launched If CMOR data is available but packed, the procedure will unpack it. :return: """ # Check if cmorized and convert if not created = False errors = list() for startdate, member in self.experiment.get_member_list(): member_str = self.experiment.get_member_str(member) if self.config.cmor.force or not self._is_cmorized(startdate, member): created = True Log.info('CMORizing startdate {0} member {1}', startdate, member_str) if self.config.cmor.ocean: errors += self._unpack_ocean_files('MMO', startdate, member) errors += self._unpack_ocean_files('diags', startdate, member) if not self.config.cmor.atmosphere: continue grb_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', startdate, member_str, 'outputs', '*.grb') gribfiles = glob.glob(grb_path) if len(gribfiles) == 0: tar_files = glob.glob( os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', startdate, member_str, 'outputs', 'MMA*')) tar_files.sort() count = 1 for tarfile in tar_files: Log.info('Unpacking atmospheric file {0}/{1}'.format(count, len(tar_files))) errors += self._unpack_tar(tarfile, startdate, member) Log.result('Atmospheric file {0}/{1} finished'.format(count, len(tar_files))) count += 1 else: self._cmorize_grib(startdate, member) Log.result('CMORized startdate {0} member {1}!\n\n', startdate, member_str) if created: for error in errors: Log.error('File {0} could not be unzipped.', error) return for startdate, member in self.experiment.get_member_list(): member_path = os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles') Log.info('Preparing CMOR files for startdate {0} and member {1}'.format(startdate, member)) filepaths = glob.glob(os.path.join(member_path, '*.gz')) if len(filepaths) == 0: continue self._unpack_cmorfiles(filepaths, member_path) def _unpack_ocean_files(self, prefix, startdate, member): tar_folder = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', startdate, self.experiment.get_member_str(member), 'outputs', '{0}*'.format(prefix)) tar_files = glob.glob(tar_folder) tar_files.sort() errors = list() count = 1 for tarfile in tar_files: Log.info('Unpacking oceanic file {0}/{1}'.format(count, len(tar_files))) errors += self._unpack_tar(tarfile, startdate, member) Log.result('Oceanic file {0}/{1} finished'.format(count, len(tar_files))) count += 1 return errors def _get_grib_filename(self, grid, month): return 'ICM{0}{1}+{2}.grb'.format(grid, self.experiment.expid, date2str(month)[:-2]) def _cmorize_grib(self, startdate, member): count = 1 atmos_timestep = None chunk_start = parse_date(startdate) member_str = self.experiment.get_member_str(member) data_folder = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', startdate, member_str, 'outputs') while os.path.exists(os.path.join(data_folder, self._get_grib_filename('GG', chunk_start))) or \ os.path.exists(os.path.join(data_folder, self._get_grib_filename('SH', chunk_start))): chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', 'standard') chunk_end = previous_day(chunk_end, 'standard') Log.info('CMORizing chunk {0}-{1}', date2str(chunk_start), date2str(chunk_end)) for grid in ('SH', 'GG'): Log.info('Processing {0} variables', grid) if not os.path.exists(os.path.join(data_folder, self._get_grib_filename(grid, chunk_start))): continue for month in range(0, self.experiment.chunk_size): current_month = add_months(chunk_start, month, 'standard') original_gribfile = os.path.join(data_folder, self._get_grib_filename(grid, current_month)) Log.info('Processing month {1}', grid, date2str(current_month)) gribfile = os.path.join(self.config.scratch_dir, os.path.basename(original_gribfile)) if not os.path.isfile(gribfile): Log.info('Copying file...', grid, date2str(current_month)) shutil.copy(original_gribfile, gribfile) if atmos_timestep is None: atmos_timestep = self._get_atmos_timestep(gribfile) prev_gribfile = os.path.join(self.config.scratch_dir, self._get_grib_filename(grid, add_months(current_month, -1, 'standard'))) if os.path.exists(prev_gribfile): self._merge_grib_files(current_month, prev_gribfile, gribfile) full_file = 'ICM' else: full_file = gribfile Log.info('Unpacking... ') # remap on regular Gauss grid if grid == 'SH': Utils.cdo.splitparam(input='-sp2gpl {0}'.format(full_file), output=gribfile + '_', options='-f nc4') else: Utils.cdo.splitparam(input=full_file, output=gribfile + '_', options='-R -f nc4') # total precipitation (remove negative values) Utils.cdo.setcode(228, input='-setmisstoc,0 -setvrange,0,Inf -add ' '{0}_{{142,143}}.128.nc'.format(gribfile), output='{0}_228.128.nc'.format(gribfile)) Utils.remove_file('ICM') next_gribfile = os.path.join(data_folder, self._get_grib_filename(grid, add_months(current_month, 1, 'standard'))) if not os.path.exists(next_gribfile): os.remove(gribfile) cdo_reftime = parse_date(startdate).strftime('%Y-%m-%d,00:00') self._ungrib_vars(cdo_reftime, gribfile, current_month.month, '{0}hr'.format(atmos_timestep)) self._ungrib_vars(cdo_reftime, gribfile, current_month.month, '1d') self._ungrib_vars(cdo_reftime, gribfile, current_month.month, '1m') for splited_file in glob.glob('{0}_*.128.nc'.format(gribfile)): os.remove(splited_file) Log.result('Month {0}, {1} variables finished', date2str(current_month), grid) count += 1 self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, grid, '1m') self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, grid, '1d') self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, grid, '{0}hr'.format(atmos_timestep)) chunk_start = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', 'standard') @staticmethod def _merge_grib_files(current_month, prev_gribfile, gribfile): Log.info('Merging data from different files...') fd = open('rules_files', 'w') fd.write('if (dataDate >= {0.year}{0.month:02}01) {{ write ; }}\n'.format(current_month)) fd.close() # get first timestep for each month from previous file (if possible) if os.path.exists('ICM'): os.remove('ICM') Utils.execute_shell_command('grib_filter -o ICM rules_files ' '{0} {1}'.format(os.path.basename(prev_gribfile), os.path.basename(gribfile))) os.remove('rules_files') Utils.remove_file(prev_gribfile) def _get_atmos_timestep(self, gribfile): Log.info('Getting timestep...') grib_handler = pygrib.open(gribfile) mes1 = grib_handler.message(1) mes2 = grib_handler.readline() while mes2.analDate == mes1.analDate: mes2 = grib_handler.readline() atmos_timestep = mes2.analDate - mes1.analDate atmos_timestep = int(atmos_timestep.total_seconds() / 3600) self.atmos_timestep = atmos_timestep grib_handler.close() return atmos_timestep def _ungrib_vars(self, cdo_reftime, gribfile, month, frequency): Log.info('Preparing {0} variables'.format(frequency)) var_codes = self.config.cmor.get_variables(frequency) for var_code in var_codes: if not os.path.exists('{0}_{1}.128.nc'.format(gribfile, var_code)): continue new_units = None cdo_operator = '-selmon,{0}'.format(month) if frequency in ('month', 'monthly', 'mon', '1m'): if var_code == 201: cdo_operator = "-monmean -daymax {0}".format(cdo_operator) elif var_code == 202: cdo_operator = "-monmean -daymax {0}".format(cdo_operator) else: cdo_operator = "-monmean {0} ".format(cdo_operator) elif frequency in ('day', 'daily', '1d'): if var_code == 201: cdo_operator = "-daymax {0} ".format(cdo_operator) elif var_code == 202: cdo_operator = "-daymin {0} ".format(cdo_operator) else: cdo_operator = "-daymean {0} ".format(cdo_operator) if var_code in (144, 146, 147, 169, 175, 176, 177, 179, 180, 181, 182, 201, 202, 205, 212, 228): cdo_operator = '{0} -shifttime,-{1}hours'.format(cdo_operator, self.atmos_timestep) if var_code == 129: # geopotential new_units = "m" cdo_operator = "-divc,9.81 {0}".format(cdo_operator) elif var_code in (146, 147, 169, 175, 176, 177, 179, 212): # radiation new_units = "W m-2" cdo_operator = "-divc,{0} {1}".format(self.atmos_timestep * 3600, cdo_operator) elif var_code in (180, 181): # momentum flux new_units = "N m-2" cdo_operator = "-divc,{0} {1}".format(self.atmos_timestep * 3600, cdo_operator) elif var_code in (144, 182, 205, 228): # precipitation/evaporation/runoff new_units = "kg m-2 s-1" cdo_operator = "-mulc,1000 -divc,{0}".format(self.atmos_timestep * 3600) levels = self.config.cmor.get_levels(frequency, var_code) if levels: cdo_operator = "{0} -sellevel,{1}".format(cdo_operator, levels) Utils.execute_shell_command('cdo -t ecmwf setreftime,{0} ' '{1} {2}_{3}.128.nc ' '{2}_{3}_{4}.nc'.format(cdo_reftime, cdo_operator, gribfile, var_code, frequency)) h_var_file = '{0}_{1}_{2}.nc'.format(gribfile, var_code, frequency) handler = Utils.openCdf(h_var_file) if new_units: for var in handler.variables.values(): if 'code' in var.ncattrs() and var.code == var_code: var.units = new_units break var_name = None for key in handler.variables.keys(): if key + '_2' in handler.variables and key not in handler.dimensions: var_name = key handler.close() if var_name is not None: Utils.nco.ncks(input='{0}_{1}_1m.nc'.format(gribfile, var_code), output='{0}_{1}_1m.nc'.format(gribfile, var_code), options='-O -v {0}'.format(var_name)) def _merge_and_cmorize_atmos(self, startdate, member, chunk_start, chunk_end, grid, frequency): merged_file = 'MMA_{0}_{1}_{2}_{3}.nc'.format(frequency, date2str(chunk_start), date2str(chunk_end), grid) files = glob.glob(os.path.join(self.config.scratch_dir, '{0}_*_{1}.nc'.format(self._get_grib_filename(grid, chunk_start), frequency))) for first_file in files: shutil.move(first_file, merged_file) current_month = add_months(chunk_start, 1, 'standard') while current_month < chunk_end: month_file = first_file.replace('+{0}.grb'.format(date2str(chunk_start)[:-2]), '+{0}.grb'.format(date2str(current_month)[:-2])) Utils.concat_variables(month_file, merged_file, True) current_month = add_months(current_month, 1, 'standard') self._cmorize_nc_file(merged_file, member, startdate) def _unpack_cmorfiles(self, filepaths, member_path): threads = list() numthreads = Utils.available_cpu_count() for numthread in range(0, numthreads): t = threading.Thread(target=DataManager._unzip, args=([filepaths[numthread::numthreads]])) threads.append(t) t.start() for t in threads: t.join() filepaths = glob.glob(os.path.join(member_path, '*.tar')).sort() for numthread in range(0, numthreads): t = threading.Thread(target=DataManager._untar, args=(filepaths[numthread::numthreads], member_path)) threads.append(t) t.start() for t in threads: t.join() if self.experiment.experiment_name != self.experiment.model: bad_path = os.path.join(member_path, self.experiment.institute, self.experiment.model, self.experiment.model) for (dirpath, dirnames, filenames) in os.walk(bad_path, False): for filename in filenames: filepath = os.path.join(dirpath, filename) good = filepath.replace('_{0}_output_'.format(self.experiment.model), '_{0}_{1}_'.format(self.experiment.model, self.experiment.experiment_name)) good = good.replace('/{0}/{0}'.format(self.experiment.model), '/{0}/{1}'.format(self.experiment.model, self.experiment.experiment_name)) Utils.move_file(filepath, good) os.rmdir(dirpath) good_dir = os.path.join(member_path, self.experiment.institute, self.experiment.model, self.experiment.experiment_name) for sdate in os.listdir(good_dir): for (dirpath, dirnames, filenames) in os.walk(os.path.join(good_dir, sdate), False): for filename in filenames: filepath = os.path.join(dirpath, filename) good = filepath.replace('_{0}_{1}_r'.format(self.experiment.model, self.experiment.experiment_name, sdate), '_{0}_{1}_{2}_r'.format(self.experiment.model, self.experiment.experiment_name, sdate)) if good != filepath: Log.info('Moving {0} to {1}'.format(filename, good)) Utils.move_file(filepath, good) def _unpack_tar(self, tarfile, startdate, member): Log.info('Unpacking {0}', tarfile) scratch_dir = os.path.join(self.config.scratch_dir, 'CMOR') if os.path.exists(scratch_dir): shutil.rmtree(scratch_dir) os.makedirs(scratch_dir) self._untar((tarfile,), scratch_dir) errors = self._unzip(glob.glob(os.path.join(scratch_dir, '*.gz'))) if os.path.basename(tarfile).startswith('MMA'): temp = TempFile.get() for filename in glob.glob(os.path.join(scratch_dir, 'MMA_*_SH_*.nc')): Utils.cdo.sp2gpl(options='-O', input=filename, output=temp) shutil.move(temp, filename) sh_files = glob.glob(os.path.join(scratch_dir, 'MMA_*_SH_*.nc')) Utils.cdo.mergetime(input=sh_files, output=os.path.join(scratch_dir, 'sh.nc')) gg_files = glob.glob(os.path.join(scratch_dir, 'MMA_*_GG_*.nc')) Utils.cdo.mergetime(input=gg_files, output=os.path.join(scratch_dir, 'gg.nc')) for filename in sh_files + gg_files: os.remove(filename) Utils.nco.ncks(input=os.path.join(scratch_dir, 'sh.nc'), output=os.path.join(scratch_dir, 'gg.nc'), options='-A') os.remove(os.path.join(scratch_dir, 'sh.nc')) tar_startdate = tarfile[0:-4].split('_')[5].split('-') new_name = 'MMA_1m_{0[0]}_{0[1]}.nc'.format(tar_startdate) shutil.move(os.path.join(scratch_dir, 'gg.nc'), os.path.join(scratch_dir, new_name)) for filename in glob.glob(os.path.join(scratch_dir, '*.nc')): self._cmorize_nc_file(filename, member, startdate) return errors def _cmorize_nc_file(self, filename, member, startdate): Log.info('Processing file {0}', filename) temp = TempFile.get() Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filename, temp]) shutil.move(temp, filename) file_parts = os.path.basename(filename).split('_') if self.experiment.expid in [file_parts[1], file_parts[2]]: frequency = 'm' else: frequency = file_parts[1][1].lower() variables = dict() variables['time_counter'] = 'time' variables['time_counter_bnds'] = 'time_bnds' variables['tbnds'] = 'bnds' variables['nav_lat'] = 'lat' variables['nav_lon'] = 'lon' variables['x'] = 'i' variables['y'] = 'j' Utils.rename_variables(filename, variables, False, True) handler = Utils.openCdf(filename) self._add_common_attributes(frequency, handler, member, startdate) self._update_time_variables(handler, startdate) handler.sync() temp = TempFile.get() Log.info('Splitting file {0}', filename) for variable in handler.variables.keys(): if variable in ('lon', 'lat', 'time', 'time_bnds', 'leadtime', 'lev', 'icethi', 'deptht', 'depthu', 'depthw', 'depthv', 'time_centered', 'time_centered_bounds', 'deptht_bounds', 'depthu_bounds', 'depthv_bounds', 'depthw_bounds', 'time_counter_bounds', 'ncatice', 'nav_lat_grid_V', 'nav_lat_grid_U', 'nav_lat_grid_T', 'nav_lon_grid_V', 'nav_lon_grid_U', 'nav_lon_grid_T', 'depth', 'depth_2', 'depth_3', 'depth_4', 'mlev', 'hyai', 'hybi', 'hyam', 'hybm'): continue self.extract_variable(filename, handler, frequency, member, startdate, temp, variable) Log.result('File {0} cmorized!', filename) handler.close() os.remove(filename) def extract_variable(self, file_path, handler, frequency, member, startdate, temp, variable): """ Extracts a variable from a file and creates the CMOR file :param file_path: path to the file :type file_path: str :param handler: netCDF4 handler for the file :type handler: netCDF$.Dataset :param frequency: variable's frequency :type frequency: str :param member: member :type member: int :param startdate: startdate :type startdate: str :param temp: temporal file to use :type temp: str :param variable: variable's name :type variable: str """ file_parts = os.path.basename(file_path).split('_') var_cmor = Variable.get_variable(variable) if var_cmor is None: return if frequency == 'd': frequency = 'day' elif frequency == 'm': frequency = 'mon' elif frequency == 'h': frequency = '6hr' else: raise Exception('Frequency {0} not supported'.format(frequency)) Utils.nco.ncks(input=file_path, output=temp, options='-v {0}'.format(variable)) if var_cmor.domain == 'ocean': Utils.rename_variables(temp, {'deptht': 'lev', 'depthu': 'lev', 'depthw': 'lev', 'depthv': 'lev', 'depth': 'lev'}, False, True) elif var_cmor.domain in ('land', 'landIce'): Utils.rename_variables(temp, {'depth': 'sdepth', 'depth_2': 'sdepth', 'depth_3': 'sdepth', 'depth_4': 'sdepth'}, False, True) elif var_cmor.domain == 'atmos': Utils.rename_variables(temp, {'depth': 'plev'}, False, True) handler_cmor = Utils.openCdf(temp) Utils.copy_variable(handler, handler_cmor, 'lon', False) Utils.copy_variable(handler, handler_cmor, 'lat', False) if 'time' in handler_cmor.dimensions.keys(): Utils.copy_variable(handler, handler_cmor, 'leadtime', False) handler_cmor.close() if var_cmor.basin is None: region = None else: region = var_cmor.basin.fullname if file_parts[0] == self.experiment.expid or file_parts[0].startswith('ORCA') or \ file_parts[0] in ('MMA', 'MMO'): # Model output date_str = '{0}-{1}'.format(file_parts[2][0:6], file_parts[3][0:6]) elif file_parts[1] == self.experiment.expid: # Files generated by the old version of the diagnostics date_str = '{0}-{1}'.format(file_parts[4][0:6], file_parts[5][0:6]) else: Log.error('Variable {0} can not be cmorized. Original filename does not match a recognized pattern', var_cmor.short_name) return self.send_file(temp, var_cmor.domain, var_cmor.short_name, startdate, member, frequency=frequency, rename_var=variable, date_str=date_str, region=region, move_older=True) @staticmethod def _update_time_variables(handler, startdate): time_var = handler.variables['time'] times = Utils.get_datetime_from_netcdf(handler) if type(times[0]) is not datetime: for x in range(0, times.shape[0]): # noinspection PyProtectedMember times[x] = times[x]._to_real_datetime() time_var[:] = netCDF4.date2num(times, 'days since 1850-01-01', 'standard') if 'axis_nbounds' in handler.dimensions: handler.renameDimension('axis_nbounds', 'bnds') if 'time_counter_bounds' in handler.variables: handler.renameVariable('time_counter_bounds', 'time_bnds') handler.sync() if 'time_bnds' in handler.variables: time_bounds_var = handler.variables['time_bnds'] time_var.bounds = "time_bnds" time_bounds = Utils.get_datetime_from_netcdf(handler, 'time_bnds') if type(time_bounds[0, 0]) is not datetime: for x in range(0, time_bounds.shape[0]): for y in range(0, time_bounds.shape[1]): # noinspection PyProtectedMember time_bounds[x, y] = time_bounds[x, y]._to_real_datetime() time_bounds_var[:] = netCDF4.date2num(time_bounds, 'days since 1850-01-01', 'standard') time_var.units = 'days since 1850-01-01' time_var.time_origin = "1850-01-01" time_var.calendar = 'standard' time_var.long_name = "Verification time of the forecast" time_var.standard_name = "time" time_var.axis = "T" if 'leadtime' in handler.variables: var = handler.variables['leadtime'] else: var = handler.createVariable('leadtime', float, 'time') var.units = "days" var.long_name = "Time elapsed since the start of the forecast" var.standard_name = "forecast_period" leadtime = (Utils.get_datetime_from_netcdf(handler) - parse_date(startdate)) for lt in range(0, leadtime.shape[0]): var[lt] = leadtime[lt].days def _add_common_attributes(self, frequency, handler, member, startdate): cmor = self.config.cmor experiment = self.config.experiment handler.associated_experiment = cmor.associated_experiment handler.batch = '{0}{1}'.format(experiment.institute, datetime.now().strftime('%Y-%m-%d(T%H:%M:%SZ)')) handler.contact = 'Pierre-Antoine Bretonnière, pierre-antoine.bretonniere@bsc.es , ' \ 'Javier Vegas-Regidor, javier.vegas@bsc.es ' handler.Conventions = 'CF-1.6' handler.creation_date = datetime.now().strftime('%Y-%m-%d(T%H:%M:%SZ)') handler.experiment_id = experiment.experiment_name handler.forecast_reference_time = parse_date(startdate).strftime('%Y-%m-%d(T%H:%M:%SZ)') if frequency == 'd': handler.frequency = 'day' elif frequency == 'm': handler.frequency = 'mon' handler.institute_id = experiment.institute handler.institution = experiment.institute handler.initialization_method = cmor.initialization_method handler.initialization_description = cmor.initialization_description handler.physics_version = cmor.physics_version handler.physics_description = cmor.physics_description handler.model_id = experiment.model handler.associated_model = cmor.associated_model handler.project_id = 'SPECS' handler.realization = str(member + 1) handler.source = cmor.source handler.startdate = 'S{0}'.format(startdate) handler.tracking_id = str(uuid.uuid1()) handler.title = "{0} model output prepared for SPECS {1}".format(experiment.model, experiment.experiment_name) @staticmethod def _unzip(files): errors = list() for filepath in files: Log.debug('Unzipping {0}', filepath) try: Utils.execute_shell_command('gunzip {0}'.format(filepath)) except Exception as ex: Log.error('Can not unzip {0}: {1}', filepath, ex) errors.append(filepath) return errors @staticmethod def _untar(files, member_path): for filepath in files: Log.debug('Unpacking {0}', filepath) Utils.execute_shell_command('tar -xvf {0} -C {1}'.format(filepath, member_path)) def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None): """ Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy :param domain: CMOR domain :type domain: str :param var: variable name :type var: str :param startdate: file's startdate :type startdate: str :param member: file's member :type member: int :param chunk: file's chunk :type chunk: int :param grid: file's grid (only needed if it is not the original) :type grid: str :param box: file's box (only needed to retrieve sections or averages) :type box: Box :param frequency: file's frequency (only needed if it is different from the default) :type frequency: str :return: path to the copy created on the scratch folder :rtype: str """ if not frequency: frequency = self.config.frequency domain = DataManager.correct_domain(domain) domain_abbreviation = self.domain_abbreviation(domain, frequency) start = parse_date(startdate) member_plus = str(member + 1) member_path = os.path.join(self.get_startdate_path(startdate), frequency, domain) chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', 'standard') chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', 'standard') chunk_end = previous_day(chunk_end, 'standard') if box: var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str() if grid: var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus)) else: var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus)) filepath = os.path.join(var_path, '{0}_{1}_{3}_{4}_S{5}_r{6}i1p1_' '{7}-{8}.nc'.format(var, domain_abbreviation, frequency, self.experiment.model, self.experiment.experiment_name, startdate, member_plus, "{0:04}{1:02}".format(chunk_start.year, chunk_start.month), "{0:04}{1:02}".format(chunk_end.year, chunk_end.month))) temp_path = TempFile.get() shutil.copyfile(filepath, temp_path) return temp_path def get_startdate_path(self, startdate): """ Returns the path to the startdate's CMOR folder :param startdate: target startdate :type startdate: str :return: path to the startdate's CMOR folder :rtype: str """ return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute, self.experiment.model, self.experiment.experiment_name, 'S' + startdate) def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None, rename_var=None, frequency=None, year=None, date_str=None, move_older=False): """ Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge with already existing ones as needed :param date_str: :param year: if frequency is yearly, this parameter is used to give the corresponding year :type year: int :param rename_var: if exists, the given variable will be renamed to the one given by var :type rename_var: str :param filetosend: path to the file to send to the CMOR repository :type filetosend: str :param region: specifies the region represented by the file. If it is defined, the data will be appended to the CMOR repository as a new region in the file or will overwrite if region was already present :type region: str :param domain: CMOR domain :type domain: str :param var: variable name :type var: str :param startdate: file's startdate :type startdate: str :param member: file's member :type member: int :param chunk: file's chunk :type chunk: int :param grid: file's grid (only needed if it is not the original) :type grid: str :param box: file's box (only needed to retrieve sections or averages) :type box: Box :param frequency: file's frequency (only needed if it is different from the default) :type frequency: str :return: path to the copy created on the scratch folder :rtype: str """ original_var = var cmor_var = Variable.get_variable(var) if box: var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str() if rename_var: Utils.rename_variable(filetosend, rename_var, var) elif original_var != var: Utils.rename_variable(filetosend, original_var, var) if not frequency: frequency = self.config.frequency domain = DataManager.correct_domain(domain) domain_abreviattion = self.domain_abbreviation(domain, frequency) start = parse_date(startdate) member_plus = str(member + 1) member_path = os.path.join(self.get_startdate_path(startdate), frequency, domain) if chunk is not None: chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', 'standard') chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', 'standard') chunk_end = previous_day(chunk_end, 'standard') time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year, chunk_end.month) elif year is not None: if frequency is not 'yr': raise ValueError('Year may be provided instead of chunk only if frequency is "yr"') time_bound = str(year) elif date_str is not None: time_bound = date_str else: raise ValueError('Chunk and year can not be None at the same time') if grid: var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus)) else: var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus)) filepath = os.path.join(var_path, '{0}_{1}_{2}_{3}_S{4}_r{5}i1p1_' '{6}.nc'.format(var, domain_abreviattion, self.experiment.model, self.experiment.experiment_name, startdate, member_plus, time_bound)) if region: Utils.convert2netcdf4(filetosend) if not os.path.exists(filepath): handler = Utils.openCdf(filetosend) handler.createDimension('region') var_region = handler.createVariable('region', str, 'region') var_region[0] = region original_var = handler.variables[var] new_var = handler.createVariable('new_var', original_var.datatype, original_var.dimensions + ('region',)) new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()}) value = original_var[:] new_var[..., 0] = value handler.close() Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var)) Utils.rename_variable(filetosend, 'new_var', var) else: temp = TempFile.get() shutil.copyfile(filepath, temp) Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region') handler = Utils.openCdf(temp) handler_send = Utils.openCdf(filetosend) value = handler_send.variables[var][:] var_region = handler.variables['region'] basin_index = np.where(var_region[:] == region) if len(basin_index[0]) == 0: var_region[var_region.shape[0]] = region basin_index = var_region.shape[0] - 1 else: basin_index = basin_index[0][0] handler.variables[var][..., basin_index] = value handler.close() handler_send.close() Utils.move_file(temp, filetosend) Utils.nco.ncks(input=filetosend, output=filetosend, options='-O --fix_rec_dmn region') temp = TempFile.get() Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filetosend, temp]) shutil.move(temp, filetosend) if cmor_var: handler = Utils.openCdf(filetosend) var_handler = handler.variables[var] var_handler.standard_name = cmor_var.standard_name var_handler.long_name = cmor_var.long_name var_handler.short_name = cmor_var.short_name var_type = var_handler.dtype handler.modeling_realm = cmor_var.domain handler.table_id = 'Table {0} (December 2013)'.format(self.domain_abbreviation(cmor_var.domain, frequency)) if cmor_var.units: if 'units' in var_handler.ncattrs(): if var_handler.units == 'PSU': var_handler.units = 'psu' if var_handler.units == 'C' and cmor_var.units == 'K': var_handler.units = 'deg_C' if cmor_var.units != var_handler.units: try: new_unit = Units(cmor_var.units) old_unit = Units(var_handler.units) var_handler[:] = Units.conform(var_handler[:], old_unit, new_unit, inplace=True) if 'valid_min' in var_handler.ncattrs(): var_handler.valid_min = Units.conform(float(var_handler.valid_min), old_unit, new_unit, inplace=True) if 'valid_max' in var_handler.ncattrs(): var_handler.valid_max = Units.conform(float(var_handler.valid_max), old_unit, new_unit, inplace=True) except ValueError: factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units, cmor_var.units) var_handler[:] = var_handler[:] * factor + offset if 'valid_min' in var_handler.ncattrs(): var_handler.valid_min = float(var_handler.valid_min) * factor + offset if 'valid_max' in var_handler.ncattrs(): var_handler.valid_max = float(var_handler.valid_max) * factor + offset var_handler.units = cmor_var.units handler.sync() if 'lev' in handler.variables: handler.variables['lev'].short_name = 'lev' if domain == 'ocean': handler.variables['lev'].standard_name = 'depth' if 'lon' in handler.variables: handler.variables['lon'].short_name = 'lon' handler.variables['lon'].standard_name = 'longitude' if 'lat' in handler.variables: handler.variables['lat'].short_name = 'lat' handler.variables['lat'].standard_name = 'latitude' handler.close() if cmor_var.valid_min != '': valid_min = '-a valid_min, {0}, o, {1}, "{2}" '.format(var, var_type.char, cmor_var.valid_min) else: valid_min = '' if cmor_var.valid_max != '': valid_max = '-a valid_max, {0}, o, {1}, "{2}" '.format(var, var_type.char, cmor_var.valid_max) else: valid_max = '' Utils.nco.ncatted(input=filetosend, output=filetosend, options='-O -a _FillValue,{0},o,{1},"1.e20" ' '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(var, var_type.char, valid_min, valid_max)) variables = dict() variables['x'] = 'i' variables['y'] = 'j' variables['nav_lat_grid_V'] = 'lat' variables['nav_lon_grid_V'] = 'lon' variables['nav_lat_grid_U'] = 'lat' variables['nav_lon_grid_U'] = 'lon' variables['nav_lat_grid_T'] = 'lat' variables['nav_lon_grid_T'] = 'lon' Utils.rename_variables(filetosend, variables, False, True) Utils.move_file(filetosend, filepath) self._create_link(domain, filepath, frequency, var, grid, move_older) @staticmethod def correct_domain(domain): """ Corrects domain capitalization :param domain: domain name :type domain: str :return: domain with correct capitalization :rtype: str """ domain = domain.lower() if domain == 'seaice': return 'seaIce' elif domain == 'landice': return 'landIce' return domain def _create_link(self, domain, filepath, frequency, var, grid, move_older): if frequency in ('d', 'daily', 'day'): freq_str = 'daily_mean' else: freq_str = 'monthly_mean' if grid: var = '{0}-{1}'.format(var, grid) if domain in ['ocean', 'seaIce']: variable_folder = '{0}_f{1}h'.format(var, self.experiment.ocean_timestep) else: variable_folder = '{0}_f{1}h'.format(var, self.atmos_timestep) link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder) if not os.path.exists(link_path): # This can be a race condition # noinspection PyBroadException try: os.makedirs(link_path) except Exception: pass elif move_older: if link_path not in self._checked_vars: old_path = link_path + '_old' regex = re.compile(var + '_[0-9]{6,8}\.nc') for filename in os.listdir(link_path): if regex.match(filename): if not os.path.exists(old_path): # This can be a race condition # noinspection PyBroadException try: os.makedirs(old_path) except Exception: pass Utils.move_file(os.path.join(link_path, filename), os.path.join(old_path, filename)) self._checked_vars.append(link_path) link_path = os.path.join(link_path, os.path.basename(filepath)) if os.path.lexists(link_path): os.remove(link_path) os.symlink(filepath, link_path) @staticmethod def domain_abbreviation(domain, frequency): """ Returns the table name for a domain-frequency pair :param domain: variable's domain :type domain: str :param frequency: variable's frequency :type frequency: str :return: variable's table name :rtype: str """ if frequency == 'mon': if domain == 'seaIce': domain_abreviattion = 'OImon' elif domain == 'landIce': domain_abreviattion = 'LImon' else: domain_abreviattion = domain[0].upper() + 'mon' elif frequency == '6hr': domain_abreviattion = '6hrPlev' else: domain_abreviattion = 'day' return domain_abreviattion def get_year(self, domain, var, startdate, member, year, grid=None, box=None): """ Gets all the data corresponding to a given year from the CMOR repository to the scratch folder as one file and returns the path to the scratch's copy. :param year: year to retrieve :type year: int :param domain: CMOR domain :type domain: str :param var: variable name :type var: str :param startdate: file's startdate :type startdate: str :param member: file's member :type member: int :param grid: file's grid (only needed if it is not the original) :type grid: str :param box: file's box (only needed to retrieve sections or averages) :type box: Box :return: path to the copy created on the scratch folder :rtype: str """ chunk_files = list() for chunk in self.experiment.get_year_chunks(startdate, year): chunk_files.append(self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box)) if len(chunk_files) > 1: temp = TempFile.get() Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp) for chunk_file in chunk_files: os.remove(chunk_file) else: temp = chunk_files[0] temp2 = TempFile.get() handler = Utils.openCdf(temp) time = Utils.get_datetime_from_netcdf(handler) handler.close() start = None end = None for x in range(0, len(time)): date = time[x] if date.year == year: if date.month == 1: start = x elif date.month == 12: end = x Utils.nco.ncks(input=temp, output=temp2, options='-O -d time,{0},{1}'.format(start, end)) os.remove(temp) return temp2 def _is_cmorized(self, startdate, member): startdate_path = self.get_startdate_path(startdate) if not os.path.exists(startdate_path): return False for freq in os.listdir(startdate_path): freq_path = os.path.join(startdate_path, freq) for domain in os.listdir(freq_path): domain_path = os.path.join(freq_path, domain) for var in os.listdir(domain_path): member_path = os.path.join(domain_path, var, 'r{0}i1p1'.format(member + 1)) if os.path.exists(member_path): return True return False class Variable(object): """ Class to characterize a CMOR variable. It also contains the static method to make the match between thje original name and the standard name. Requires cmor_table.csv to work. """ _dict_variables = None def __init__(self, line): self.short_name = line[1].strip() self.standard_name = line[2].strip() self.long_name = line[3].strip() self.domain = line[4].strip() self.basin = Basins.parse(line[5]) self.units = line[6].strip() self.valid_min = line[7].strip() self.valid_max = line[8].strip() @classmethod def get_variable(cls, original_name): """ Returns the cmor variable instance given a variable name :param original_name: original variable's name :type original_name: str :return: CMOR variable :rtype: Variable """ try: return cls._dict_variables[original_name.lower()] except KeyError: Log.warning('Variable {0} is not defined in the CMOR table. Please add it'.format(original_name)) return None @classmethod def load_variables(cls): """ Loads the cmor_table.csv and creates the variables dictionary """ Variable._dict_variables = dict() with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'cmor_table.csv'), 'rb') as csvfile: reader = csv.reader(csvfile, dialect='excel') for line in reader: if line[0] == 'Variable': continue var = Variable(line) if not var.short_name: continue for old_name in line[0].split(':'): Variable._dict_variables[old_name] = var Variable._dict_variables[var.short_name] = var class UnitConversion(object): """ Class to manage unit conversions """ _dict_conversions = None @classmethod def load_conversions(cls): """ Load conversions from the configuration file """ cls._dict_conversions = dict() with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile: reader = csv.reader(csvfile, dialect='excel') for line in reader: if line[0] == 'original': continue cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3])) @classmethod def add_conversion(cls, conversion): """ Adds a conversion to the dictionary :param conversion: conversion to add :type conversion: UnitConversion """ cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion def __init__(self, source, destiny, factor, offset): self.source = source self.destiny = destiny self.factor = float(factor) self.offset = float(offset) @classmethod def get_conversion_factor_offset(cls, input_units, output_units): """ Gets the conversion factor and offset for two units . The conversion has to be done in the following way: converted = original * factor + offset :param input_units: original units :type input_units: str :param output_units: destiny units :type output_units: str :return: factor and offset :rtype: [float, float] """ units = input_units.split() if len(units) == 1: scale_unit = 1 unit = units[0] else: if '^' in units[0]: values = units[0].split('^') scale_unit = pow(int(values[0]), int(values[1])) else: scale_unit = float(units[0]) unit = units[1] units = output_units.split() if len(units) == 1: scale_new_unit = 1 new_unit = units[0] else: if '^' in units[0]: values = units[0].split('^') scale_new_unit = pow(int(values[0]), int(values[1])) else: scale_new_unit = float(units[0]) new_unit = units[1] factor, offset = UnitConversion._get_factor(new_unit, unit) if factor is None: return None, None factor = factor * scale_unit / float(scale_new_unit) offset /= float(scale_new_unit) return factor, offset @classmethod def _get_factor(cls, new_unit, unit): # Add only the conversions with a factor greater than 1 if unit == new_unit: return 1, 0 elif (unit, new_unit) in cls._dict_conversions: conversion = cls._dict_conversions[(unit, new_unit)] return conversion.factor, conversion.offset elif (new_unit, unit) in cls._dict_conversions: conversion = cls._dict_conversions[(new_unit, unit)] return 1 / conversion.factor, -conversion.offset else: return None, None