If you are using the diagnostics just to CMORize, leave it # empty #DIAGS = discretize,atmos,sfcWind,,0,40 -DIAGS = climpercent,atmos,sfcWind,2010,2012,11 daysover,atmos,sfcWind,2010,2012,11 -#DIAGS = OHC +DIAGS = interpcdo,ocean,tas,r240x121,bilinear,False,ecmwf,False +# DIAGS = monmean,ocean,uovmean0.0-30.0m,day monmean,ocean,vovmean0.0-30.0m,day # Frequency of the data you want to use by default. Some diagnostics do not use this value: i.e. monmean always stores # its results at monthly frequency (obvious) and has a parameter to specify input's frequency. -FREQUENCY = 6hr +FREQUENCY = weekly # Path to CDFTOOLS binaries CDFTOOLS_PATH = ~jvegas/CDFTOOLS/bin # If true, copies the mesh files regardless of presence in scratch dir @@ -73,7 +73,7 @@ SERVER_URL = https://earth.bsc.es/thredds INSTITUTE = ecmwf MODEL = erainterim # Model version: Available versions -MODEL_VERSION = Ec3.2_O1L75 +MODEL_VERSION = # Atmospheric output timestep in hours ATMOS_TIMESTEP = 6 # Ocean output timestep in hours @@ -88,8 +88,9 @@ OCEAN_TIMESTEP = 6 # CHUNK_SIZE is the size of each data file, given in months # CHUNKS is the number of chunks. """ -from cdo import Cdo -from nco import Nco -from earthdiagnostics.cdftools import CDFTools import os -cdo = Cdo() -nco = Nco() -cdftools = CDFTools('/home/Earth/jvegas/CDFTOOLS_3.0/bin') +from earthdiagnostics.cdftools import CDFTools + +cdftools = CDFTools() DEVNULL = open(os.devnull, 'wb') diff --git a/earthdiagnostics/box.py b/earthdiagnostics/box.py index f3dc76693e3130c51c453294f593561cc28cbbec..ea7ccc15feee5661b4cdb0e54ab965fbaf10f33a 100644 --- a/earthdiagnostics/box.py +++ b/earthdiagnostics/box.py @@ -153,10 +153,10 @@ class Box(object): else: suffix = '' - string = str(abs(self.max_depth)) + suffix - if self.min_depth != self.max_depth: - string = '{0}-{1}'.format(str(abs(self.min_depth)), string) + string = '{0:d}-{1:d}{2}'.format(int(abs(self.min_depth)), int(abs(self.max_depth)), suffix) + else: + string = '{0:d}{1}'.format(int(abs(self.max_depth)), suffix) return string diff --git a/earthdiagnostics/cdftools.py b/earthdiagnostics/cdftools.py index 36fffdaacce1d61d790900f21e8c75d626c7ebf9..ad0355c38e811b3051e939d51e7b86629d0ddded 100644 --- a/earthdiagnostics/cdftools.py +++ b/earthdiagnostics/cdftools.py @@ -1,8 +1,10 @@ # coding=utf-8 -from earthdiagnostics.utils import Utils import os -from bscearth.utils.log import Log + import six +from bscearth.utils.log import Log + +from earthdiagnostics.utils import Utils class CDFTools(object): @@ -15,6 +17,7 @@ class CDFTools(object): def __init__(self, path=''): self.path = path + self.data_convention = '' # noinspection PyShadowingBuiltins def run(self, command, input, output=None, options=None, log_level=Log.INFO, input_option=None): @@ -52,6 +55,7 @@ class CDFTools(object): line.append('-o') line.append(output) Log.debug('Executing {0}', ' '.join(line)) + shell_output = Utils.execute_shell_command(line, log_level) self._check_output_was_created(line, output) diff --git a/earthdiagnostics/cmorizer.py b/earthdiagnostics/cmorizer.py index 7cf8c5c354410fc4024f7d73ea2f605f1ecb00f9..4bfde9ded0e205fec05e44eca3eca3cab2231ee1 100644 --- a/earthdiagnostics/cmorizer.py +++ b/earthdiagnostics/cmorizer.py @@ -8,6 +8,7 @@ from datetime import datetime from bscearth.utils.date import parse_date, chunk_end_date, previous_day, date2str, add_months from bscearth.utils.log import Log +from cdo import CDOException from earthdiagnostics.datafile import NetCDFFile from earthdiagnostics.frequency import Frequency, Frequencies @@ -28,7 +29,8 @@ class Cmorizer(object): """ - NON_DATA_VARIABLES = ('lon', 'lat', 'time', 'time_bnds', 'leadtime', 'lev', 'lev_2', 'icethi', + NON_DATA_VARIABLES = ('lon', 'lat', 'longitude', 'latitude', 'plev', 'time', 'time_bnds', 'leadtime', 'lev', + 'lev_2', 'icethi', 'deptht', 'depthu', 'depthw', 'depthv', 'time_centered', 'time_centered_bounds', 'deptht_bounds', 'depthu_bounds', 'depthv_bounds', 'depthw_bounds', 'deptht_bnds', 'depthu_bnds', 'depthv_bnds', 'depthw_bnds', @@ -38,9 +40,6 @@ class Cmorizer(object): 'depth_bnds', 'depth_2_bnds', 'depth_3_bnds', 'depth_4_bnds', 'mlev', 'hyai', 'hybi', 'hyam', 'hybm') - ALT_COORD_NAMES = {'time_counter': 'time', 'time_counter_bnds': 'time_bnds', 'time_counter_bounds': 'time_bnds', - 'tbnds': 'bnds', 'nav_lat': 'lat', 'nav_lon': 'lon', 'x': 'i', 'y': 'j'} - def __init__(self, data_manager, startdate, member): self.data_manager = data_manager self.startdate = startdate @@ -54,6 +53,17 @@ class Cmorizer(object): self.atmos_timestep = None self.cmor_scratch = str(os.path.join(self.config.scratch_dir, 'CMOR', self.startdate, self.member_str)) + if self.config.data_convention in ('primavera', 'cmip6'): + self.lon_name = 'longitude' + self.lat_name = 'latitude' + else: + self.lon_name = 'lon' + self.lat_name = 'lat' + + self.alt_coord_names = {'time_counter': 'time', 'time_counter_bnds': 'time_bnds', + 'time_counter_bounds': 'time_bnds', + 'tbnds': 'bnds', 'nav_lat': self.lat_name, 'nav_lon': self.lon_name, 'x': 'i', 'y': 'j'} + def cmorize_ocean(self): """ CMORizes ocean files from MMO files @@ -73,7 +83,6 @@ class Cmorizer(object): tar_files.sort() if len(tar_files) > 0: break - if not len(tar_files): Log.error('No {1} files found in {0}'.format(self.original_files_path, args)) @@ -94,6 +103,8 @@ class Cmorizer(object): except Exception as ex: Log.error('Could not CMORize oceanic file {0}: {1}', count, ex) count += 1 + if count > self.experiment.num_chunks: + return def _filter_files(self, file_list): if not self.cmor.filter_files: @@ -248,18 +259,17 @@ class Cmorizer(object): self._obtain_atmos_timestep(gribfile) full_file = self._get_monthly_grib(current_date, gribfile, grid) - self._unpack_grib(full_file, gribfile, grid) + if not self._unpack_grib(full_file, gribfile, grid): + os.remove(gribfile) + return next_gribfile = self.get_original_grib_path(add_months(current_date, 1, self.experiment.calendar), grid) - if not os.path.exists(next_gribfile): os.remove(gribfile) - cdo_reftime = parse_date(self.startdate).strftime('%Y-%m-%d,00:00') - - self._ungrib_vars(cdo_reftime, gribfile, current_date.month, Frequency('{0}hr'.format(self.atmos_timestep))) - self._ungrib_vars(cdo_reftime, gribfile, current_date.month, Frequencies.daily) - self._ungrib_vars(cdo_reftime, gribfile, current_date.month, Frequencies.monthly) + self._ungrib_vars(gribfile, current_date.month, Frequency('{0}hr'.format(self.atmos_timestep))) + self._ungrib_vars(gribfile, current_date.month, Frequencies.daily) + self._ungrib_vars(gribfile, current_date.month, Frequencies.monthly) for splited_file in glob.glob('{0}_*.128.nc'.format(gribfile)): os.remove(splited_file) @@ -271,19 +281,37 @@ class Cmorizer(object): self._merge_and_cmorize_atmos(chunk_start, chunk_end, grid, '{0}hr'.format(self.atmos_timestep)) - @staticmethod - def _unpack_grib(full_file, gribfile, grid): + def _unpack_grib(self, full_file, gribfile, grid): Log.info('Unpacking... ') # remap on regular Gauss grid - if grid == 'SH': - Utils.cdo.splitparam(input='-sp2gpl {0}'.format(full_file), output=gribfile + '_', options='-f nc4') - else: - Utils.cdo.splitparam(input=full_file, output=gribfile + '_', options='-R -f nc4') - # total precipitation (remove negative values) - Utils.cdo.setcode(228, - input='-setmisstoc,0 -setvrange,0,Inf -add {0}_{{142,143}}.128.nc'.format(gribfile), - output='{0}_228.128.nc'.format(gribfile)) - Utils.remove_file('ICM') + + codes = self.cmor.get_requested_codes() + if 228 in codes: + codes.update(142, 143) + codes_str = ','.join([str(code) for code in codes]) + try: + if grid == 'SH': + Utils.cdo.splitparam(input='-sp2gpl -selcode,{0} {1} '.format(codes_str, full_file), + output=gribfile + '_', + options='-f nc4') + + else: + Utils.cdo.splitparam(input='-selcode,{0} {1}'.format(codes_str, full_file), + output=gribfile + '_', + options='-R -f nc4') + # total precipitation (remove negative values) + if 228 in codes: + Utils.cdo.setcode(228, + input='-setmisstoc,0 -setvrange,0,Inf ' + '-add {0}_{{142,143}}.128.nc'.format(gribfile), + output='{0}_228.128.nc'.format(gribfile), + options='-f nc4') + return True + except CDOException: + Log.info('No requested codes found in {0} file'.format(grid)) + return False + finally: + Utils.remove_file('ICM') def _get_monthly_grib(self, current_date, gribfile, grid): prev_gribfile = self.get_scratch_grib_path(add_months(current_date, -1, self.experiment.calendar), grid) @@ -430,7 +458,6 @@ class Cmorizer(object): netcdf_file.data_convention = self.config.data_convention netcdf_file.region = region - netcdf_file.frequency = frequency netcdf_file.domain = var_cmor.domain netcdf_file.var = var_cmor.short_name @@ -473,11 +500,10 @@ class Cmorizer(object): raise Exception('File {0} start date is not a valid chunk start date'.format(file_path)) return chunk - @staticmethod - def _add_coordinate_variables(handler, temp): + def _add_coordinate_variables(self, handler, temp): handler_cmor = Utils.openCdf(temp) - Utils.copy_variable(handler, handler_cmor, 'lon', False) - Utils.copy_variable(handler, handler_cmor, 'lat', False) + Utils.copy_variable(handler, handler_cmor, self.lon_name, False) + Utils.copy_variable(handler, handler_cmor, self.lat_name, False) if 'time' in handler_cmor.dimensions.keys(): Utils.copy_variable(handler, handler_cmor, 'leadtime', False) handler_cmor.close() @@ -508,12 +534,14 @@ class Cmorizer(object): os.remove('rules_files') Utils.remove_file(prev_gribfile) - def _ungrib_vars(self, cdo_reftime, gribfile, month, frequency): + def _ungrib_vars(self, gribfile, month, frequency): + cdo_reftime = parse_date(self.startdate).strftime('%Y-%m-%d,00:00') + Log.info('Preparing {0} variables'.format(frequency)) var_codes = self.config.cmor.get_variables(frequency) for var_code in var_codes: if not os.path.exists('{0}_{1}.128.nc'.format(gribfile, var_code)): - continue + continue new_units = None cdo_operator = '-selmon,{0}'.format(month) @@ -598,14 +626,16 @@ class Cmorizer(object): def _update_time_variables(self, filename): handler = Utils.openCdf(filename) time_var = handler.variables['time'] + if hasattr(time_var, 'calendar'): + calendar = time_var.calendar + else: + calendar = 'standard' if "time_bnds" in handler.variables: time_var.bounds = "time_bnds" handler.variables['time_bnds'].units = time_var.units + Utils.convert_units(handler.variables['time_bnds'], 'days since 1850-01-01 00:00:00', calendar, calendar) + Utils.convert_units(time_var, 'days since 1850-1-1 00:00:00', calendar) handler.close() - temp = TempFile.get() - Utils.cdo.setreftime('1850-01-01,00:00:00,days', input=filename, output=temp) - Utils.move_file(temp, filename) - self._set_leadtime_var(filename) def _set_leadtime_var(self, filename): diff --git a/earthdiagnostics/cmormanager.py b/earthdiagnostics/cmormanager.py index 58ce7e2307b2ff418d670df3c1c0bf721e1be1d6..cfa67caefe57fce5578f933a1057dcd8b0dddcd3 100644 --- a/earthdiagnostics/cmormanager.py +++ b/earthdiagnostics/cmormanager.py @@ -282,8 +282,9 @@ class CMORManager(DataManager): grid = self.config.cmor.default_ocean_grid else: grid = self.config.cmor.default_atmos_grid - file_name = '{0}_{1}_{2}_{3}_{4}_{5}{6}'.format(var, cmor_table.name, self.experiment.experiment_name, - self.experiment.model, self._get_member_str(member), + file_name = '{0}_{1}_{2}_{3}_{4}_{5}{6}'.format(var, cmor_table.name, self.experiment.model, + self.experiment.experiment_name, + self._get_member_str(member), grid, time_bound) else: raise Exception('Data convention {0} not supported'.format(self.config.data_convention)) @@ -303,8 +304,12 @@ class CMORManager(DataManager): grid = self.config.cmor.default_ocean_grid else: grid = self.config.cmor.default_atmos_grid + if cmor_var is None: + table_name = domain.get_table(frequency, self.config.data_convention).name + else: + table_name = cmor_var.get_table(frequency, self.config.data_convention).name folder_path = os.path.join(self._get_startdate_path(startdate), self._get_member_str(member), - cmor_var.get_table(frequency, self.config.data_convention).name, var, + table_name, var, grid, self.config.cmor.version) return folder_path @@ -440,44 +445,38 @@ class CMORManager(DataManager): def _unpack_cmor_files(self, startdate, member): if self.config.cmor.force: return False - chunk = 1 cmorized = False - - if not self.config.cmor.force_untar: - while self.is_cmorized(startdate, member, chunk, ModelingRealms.atmos) or \ - self.is_cmorized(startdate, member, chunk, ModelingRealms.ocean): - chunk += 1 - - while self._unpack_chunk(startdate, member, chunk): - chunk += 1 - cmorized = True - - if self.experiment.num_chunks <= chunk: - cmorized = True + for chunk in range(1, self.experiment.num_chunks + 1): + if not self.config.cmor.force_untar: + if self.is_cmorized(startdate, member, chunk, ModelingRealms.atmos) or \ + self.is_cmorized(startdate, member, chunk, ModelingRealms.ocean): + cmorized = True + continue + + if self._unpack_chunk(startdate, member, chunk): + cmorized = True if cmorized: Log.info('Startdate {0} member {1} ready', startdate, member) return cmorized def _unpack_chunk(self, startdate, member, chunk): + if not self.config.cmor.chunk_cmorization_requested(chunk): + return True filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar.gz') if len(filepaths) > 0: - if self.config.cmor.chunk_cmorization_requested(chunk): - Log.info('Unzipping cmorized data for {0} {1} {2}...', startdate, member, chunk) - Utils.unzip(filepaths, True) - else: - return True + Log.info('Unzipping cmorized data for {0} {1} {2}...', startdate, member, chunk) + Utils.unzip(filepaths, True) if not os.path.exists(self.cmor_path): os.mkdir(self.cmor_path) filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar') if len(filepaths) > 0: - if self.config.cmor.chunk_cmorization_requested(chunk): - Log.info('Unpacking cmorized data for {0} {1} {2}...', startdate, member, chunk) - Utils.untar(filepaths, self.cmor_path) - self._correct_paths(startdate) - self.create_links(startdate, member) + Log.info('Unpacking cmorized data for {0} {1} {2}...', startdate, member, chunk) + Utils.untar(filepaths, self.cmor_path) + self._correct_paths(startdate) + self.create_links(startdate, member) return True return False @@ -485,14 +484,16 @@ class CMORManager(DataManager): tar_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', 'cmorfiles') tar_original_files = os.path.join(self.config.data_dir, 'original_files', self.experiment.expid, 'cmorfiles') - file_name = 'CMOR?_{0}_{1}_{2}_{3}-*.{4}'.format(self.experiment.expid, startdate, - self.experiment.get_member_str(member), - self.experiment.get_chunk_start_str(startdate, chunk), - extension) - filepaths = glob.glob(os.path.join(tar_path, file_name)) - filepaths += glob.glob(os.path.join(tar_path, 'outputs', file_name)) - filepaths += glob.glob(os.path.join(tar_original_files, file_name)) - filepaths += glob.glob(os.path.join(tar_original_files, 'outputs', file_name)) + filepaths = [] + for cmor_prefix in ('CMOR?', 'CMOR'): + file_name = '{5}_{0}_{1}_{2}_{3}-*.{4}'.format(self.experiment.expid, startdate, + self.experiment.get_member_str(member), + self.experiment.get_chunk_start_str(startdate, chunk), + extension, cmor_prefix) + filepaths += glob.glob(os.path.join(tar_path, file_name)) + filepaths += glob.glob(os.path.join(tar_path, 'outputs', file_name)) + filepaths += glob.glob(os.path.join(tar_original_files, file_name)) + filepaths += glob.glob(os.path.join(tar_original_files, 'outputs', file_name)) return filepaths def _correct_paths(self, startdate): @@ -538,6 +539,16 @@ class CMORManager(DataManager): member_str = None Log.info('Creating links for CMOR files ({0})', startdate) path = self._get_startdate_path(startdate) + if self.config.data_convention.upper() in ('SPECS', 'APPLICATE'): + self._create_links_CMIP5(member_str, path) + elif self.config.data_convention.upper() in ('CMIP6', 'PRIMAVERA'): + self._create_links_CMIP6(member_str, path) + else: + raise ValueError('Dataset convention {0} not supported for massive ' + 'link creation'.format(self.config.data_convention)) + Log.debug('Links ready') + + def _create_links_CMIP5(self, member_str, path): for freq in os.listdir(path): frequency = Frequency.parse(freq) for domain in os.listdir(os.path.join(path, freq)): @@ -554,7 +565,66 @@ class CMORManager(DataManager): for filename in os.listdir(filepath): self.create_link(domain, os.path.join(filepath, filename), frequency, var, "", False, vartype=VariableType.MEAN) - Log.debug('Links ready') + + def _create_links_CMIP6(self, member_str, path): + for member in os.listdir(path): + for table in os.listdir(os.path.join(path, member)): + frequency = self.variable_list.tables[table].frequency + domain = None + for var in os.listdir(os.path.join(path, member, table)): + for grid in os.listdir(os.path.join(path, member, table, var)): + if member_str != member: + continue + for name in os.listdir(os.path.join(path, member, table, var, grid)): + filepath = os.path.join(path, member, table, var, grid, name) + if os.path.isfile(filepath): + original_handler = Utils.openCdf(filepath) + if original_handler.dimensions['i'].size < original_handler.dimensions['j'].size: + original_handler.close() + Utils.rename_variables(filepath, {'i': 'j', 'j': 'i'}, False, True) + else: + original_handler.close() + self.create_link(domain, filepath, frequency, var, "", False, + vartype=VariableType.MEAN) + else: + for filename in os.listdir(filepath): + if not filename.endswith('.nc') or filename.startswith('.'): + return + cmorfile = os.path.join(filepath, filename) + self._fix_ij_swap(cmorfile) + self.create_link(domain, cmorfile, frequency, var, "", + False, vartype=VariableType.MEAN) + + def _fix_ij_swap(self, cmorfile): + return + original_handler = Utils.openCdf(cmorfile) + if original_handler.dimensions['i'].size < original_handler.dimensions['j'].size: + temp = TempFile.get() + new_handler = Utils.openCdf(temp, 'w') + for attribute in original_handler.ncattrs(): + original = getattr(original_handler, attribute) + setattr(new_handler, attribute, + Utils.convert_to_ASCII_if_possible(original)) + for dimension in original_handler.dimensions.keys(): + if dimension == 'i': + new_name = 'j' + elif dimension == 'j': + new_name = 'i' + else: + new_name = dimension + new_handler.createDimension(new_name, original_handler.dimensions[dimension].size) + for variable in original_handler.variables.keys(): + original_var = original_handler.variables[variable] + translated_dimensions = Utils._translate(original_var.dimensions, + {'i': 'j', 'j': 'i'}) + new_var = new_handler.createVariable(variable, original_var.datatype, + translated_dimensions) + Utils.copy_attributes(new_var, original_var) + new_var[:] = original_var[:] + original_handler.close() + new_handler.close() + Utils.move_file(temp, cmorfile, save_hash=True) + Log.debug('File {0} translated', cmorfile) def _get_startdate_path(self, startdate): """ diff --git a/earthdiagnostics/config.py b/earthdiagnostics/config.py index 133bd6daa4945a79fd7a8c97c8f738cd05e2cfd7..f3c8d63876bb289f68ccfb385af2d52c7ba01e30 100644 --- a/earthdiagnostics/config.py +++ b/earthdiagnostics/config.py @@ -6,6 +6,7 @@ from bscearth.utils.config_parser import ConfigParser from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, date2str from bscearth.utils.log import Log +from earthdiagnostics import cdftools from earthdiagnostics.frequency import Frequency, Frequencies from earthdiagnostics.variable import VariableManager from modelingrealm import ModelingRealm @@ -63,9 +64,20 @@ class Config(object): self.data_convention = parser.get_choice_option('DIAGNOSTICS', 'DATA_CONVENTION', ('specs', 'primavera', 'cmip6', 'preface'), 'specs', ignore_case=True) + + if self.data_convention in ('primavera', 'cmip6'): + self.scratch_masks = os.path.join(self.scratch_masks, self.data_convention) + cdftools.data_convention = self.data_convention + namelist_file = os.path.join(os.path.dirname(__file__), 'CDFTOOLS_{0}.namlist'.format(self.data_convention)) + Log.debug(namelist_file) + if os.path.isfile(namelist_file): + Log.debug('Setting namelist {0}', namelist_file) + os.environ['NAM_CDF_NAMES'] = namelist_file + self.var_manager = VariableManager() self.var_manager.load_variables(self.data_convention) self._diags = parser.get_option('DIAGNOSTICS', 'DIAGS') + self.skip_diags_done = parser.get_bool_option('DIAGNOSTICS', 'SKIP_DIAGS_DONE', True) self.frequency = Frequency(parser.get_option('DIAGNOSTICS', 'FREQUENCY')) "Default data frequency to be used by the diagnostics" @@ -233,6 +245,9 @@ class CMORConfig(object): return self._var_monthly raise ValueError('Frequency not recognized: {0}'.format(frequency)) + def get_requested_codes(self): + return set(list(self._var_hourly.keys()) + list(self._var_daily.keys()) + list(self._var_monthly.keys())) + def get_levels(self, frequency, variable): return self.get_variables(frequency)[variable] diff --git a/earthdiagnostics/constants.py b/earthdiagnostics/constants.py index c1e51013879b1417bbb0960b3963bc949a2d691d..229a61b0f0bbc0f92cca9895c34646f51e92b145 100644 --- a/earthdiagnostics/constants.py +++ b/earthdiagnostics/constants.py @@ -3,6 +3,7 @@ Contains the enumeration-like classes used by the diagnostics """ import netCDF4 + from singleton import SingletonType @@ -47,6 +48,7 @@ class Basins(object): def __init__(self): self.aliases = { + 'Global': ('Global', 'glob'), 'Atlantic_Ocean': ('atl', 'atlantic'), 'North_Atlantic_Ocean': ('natl', 'north_atlantic'), 'Tropical_Atlantic_Ocean': ('tatl', 'tropical_atlantic'), @@ -125,7 +127,9 @@ class Basins(object): self.Indian = Basin('Indian_Ocean') self._known_aliases = {} self._add_alias('glob', self.Global) - self._add_alias(self.Global.name, self.Global) + for basin in (self.Global, self.Atlantic, self.Pacific, self.IndoPacific, self.Indian): + for alias in self.aliases[basin.name]: + self._add_alias(alias, basin) def get_available_basins(self, handler): """ @@ -135,7 +139,7 @@ class Basins(object): """ basin_names = handler.variables.keys() - ignored_names = ('lat', 'lon', 'i', 'j', 'time', 'lev') + ignored_names = ('lat', 'latitude', 'lon', 'longitude', 'i', 'j', 'time', 'lev') for basin in basin_names: if basin in ignored_names: diff --git a/earthdiagnostics/datafile.py b/earthdiagnostics/datafile.py index ddaf1b102e54e16dc8bce3e3faaf5cbd3f4adb50..3b38e0771a390de9a8f7cecf65debbe672f5ea2b 100644 --- a/earthdiagnostics/datafile.py +++ b/earthdiagnostics/datafile.py @@ -4,6 +4,7 @@ import os import shutil from datetime import datetime +import iris import numpy as np from bscearth.utils.log import Log @@ -51,22 +52,30 @@ class DataFile(Publisher): self._storage_status = StorageStatus.READY self.job_added = False self._modifiers = [] + self._size = None def __str__(self): return 'Data file for {0}'.format(self.remote_file) def unsubscribe(self, who): super(DataFile, self).unsubscribe(who) - self._clean_local() @property def size(self): - if self.local_status == LocalStatus.READY: - os.path.getsize(self.local_file) - return None + if self._size is None: + self._get_size() + return self._size - def _clean_local(self): - if self.local_status != LocalStatus.READY or len(self.suscribers) > 0 or self.upload_required(): + def _get_size(self): + try: + if self.local_status == LocalStatus.READY: + self._size = os.path.getsize(self.local_file) + except Exception: + self._size = None + + def clean_local(self): + if self.local_status != LocalStatus.READY or len(self.suscribers) > 0 or self.upload_required() or \ + self.storage_status == StorageStatus.UPLOADING: return Log.debug('File {0} no longer needed. Deleting from scratch...'.format(self.remote_file)) os.remove(self.local_file) @@ -74,6 +83,11 @@ class DataFile(Publisher): self.local_file = None self.local_status = LocalStatus.PENDING + def only_suscriber(self, who): + if len(self._subscribers) != 1: + return + return who in self._subscribers + def upload_required(self): return self.local_status == LocalStatus.READY and self.storage_status == StorageStatus.PENDING @@ -109,6 +123,7 @@ class DataFile(Publisher): if self._local_status == value: return self._local_status = value + self._size = None self.dispatch(self) @property @@ -120,6 +135,7 @@ class DataFile(Publisher): if self._storage_status == value: return self._storage_status = value + self._size = None self.dispatch(self) @classmethod @@ -140,6 +156,13 @@ class DataFile(Publisher): raise NotImplementedError('Class must implement the download method') def prepare_to_upload(self, rename_var): + if self.data_convention in ('primavera', 'cmip6'): + self.lon_name = 'longitude' + self.lat_name = 'latitude' + else: + self.lon_name = 'longitude' + self.lat_name = 'latitude' + Utils.convert2netcdf4(self.local_file) if rename_var: original_name = rename_var @@ -150,8 +173,9 @@ class DataFile(Publisher): self._rename_coordinate_variables() self._correct_metadata() self._prepare_region() - self.add_diagnostic_history() + if self.region is not None: + self.upload() def upload(self): self.storage_status = StorageStatus.UPLOADING @@ -168,11 +192,14 @@ class DataFile(Publisher): except Exception as ex: Log.warning('Link for file {0} can not be created: {1}', self.remote_file, ex) self.storage_status = StorageStatus.READY - self._clean_local() - def set_local_file(self, local_file, diagnostic=None, rename_var=''): + def set_local_file(self, local_file, diagnostic=None, rename_var='', region=None): if diagnostic in self._modifiers: self._modifiers.remove(diagnostic) + if region is not None: + self.region = region.name + else: + self.region = None self.local_file = local_file self.prepare_to_upload(rename_var) self.local_status = LocalStatus.READY @@ -183,7 +210,7 @@ class DataFile(Publisher): def _correct_metadata(self): handler = Utils.openCdf(self.local_file) var_handler = handler.variables[self.final_name] - coords = set.intersection({'time', 'lev', 'lat', 'lon'}, set(handler.variables.keys())) + coords = set.intersection({'time', 'lev', self.lat_name, self.lon_name}, set(handler.variables.keys())) var_handler.coordinates = ' '.join(coords) if not self.cmor_var: handler.close() @@ -204,18 +231,21 @@ class DataFile(Publisher): def _fix_variable_name(self, var_handler): var_handler.standard_name = self.cmor_var.standard_name var_handler.long_name = self.cmor_var.long_name - # var_handler.short_name = self.cmor_var.short_name - def _fix_values_metadata(self, var_type): - if self.cmor_var.valid_min != '': - valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(self.final_name, var_type.char, self.cmor_var.valid_min) - else: - valid_min = '' - if self.cmor_var.valid_max != '': - valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(self.final_name, var_type.char, self.cmor_var.valid_max) - else: - valid_max = '' - Utils.nco.ncatted(input=self.local_file, output=self.local_file, + def _fix_values_metadata(self, var_type, file_path=None): + if file_path is None: + file_path = self.local_file + valid_min = '' + valid_max = '' + + if self.cmor_var is not None: + if self.cmor_var.valid_min: + valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(self.final_name, var_type.char, self.cmor_var.valid_min) + + if self.cmor_var.valid_max: + valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(self.final_name, var_type.char, self.cmor_var.valid_max) + + Utils.nco.ncatted(input=file_path, output=file_path, options=('-O -a _FillValue,{0},o,{1},"1.e20" ' '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.final_name, var_type.char, valid_min, valid_max),)) @@ -225,12 +255,12 @@ class DataFile(Publisher): handler.variables['lev'].short_name = 'lev' if self.domain == ModelingRealms.ocean: handler.variables['lev'].standard_name = 'depth' - if 'lon' in handler.variables: - handler.variables['lon'].short_name = 'lon' - handler.variables['lon'].standard_name = 'longitude' - if 'lat' in handler.variables: - handler.variables['lat'].short_name = 'lat' - handler.variables['lat'].standard_name = 'latitude' + if self.lon_name in handler.variables: + handler.variables[self.lon_name].short_name = self.lon_name + handler.variables[self.lon_name].standard_name = 'longitude' + if self.lat_name in handler.variables: + handler.variables[self.lat_name].short_name = self.lat_name + handler.variables[self.lat_name].standard_name = 'latitude' def _fix_units(self, var_handler): if 'units' not in var_handler.ncattrs(): @@ -269,14 +299,48 @@ class DataFile(Publisher): self._update_var_with_region_data() self._correct_metadata() Utils.nco.ncks(input=self.local_file, output=self.local_file, options=['--fix_rec_dmn region']) + handler = Utils.openCdf(self.local_file) + regions = handler.variables['region'][...].tolist() + if len(regions) > 1: + ordered_regions = sorted(regions) + print(regions) + print(ordered_regions) + new_indexes = [regions.index(region) for region in ordered_regions] + print(new_indexes) + + for var in handler.variables.values(): + if 'region' not in var.dimensions: + continue + index_region = var.dimensions.index('region') + var_values = var[...] + var_ordered = np.take(var_values, new_indexes, index_region) + var[...] = var_ordered + handler.close() def _update_var_with_region_data(self): temp = TempFile.get() shutil.copyfile(self.remote_file, temp) + handler = Utils.openCdf(temp) + var_handler = handler.variables[self.final_name] + var_type = var_handler.dtype + handler.close() + self._fix_values_metadata(var_type, temp) + Utils.nco.ncks(input=temp, output=temp, options=['--mk_rec_dmn region']) handler = Utils.openCdf(temp) - handler_send = Utils.openCdf(self.local_file) - value = handler_send.variables[self.final_name][:] + var_handler = handler.variables[self.final_name] + if hasattr(var_handler, 'valid_min'): + del var_handler.valid_min + if hasattr(var_handler, 'valid_max'): + del var_handler.valid_max + handler.sync() + cubes = iris.load(self.local_file) + for cube in cubes: + if self.final_name == cube.var_name: + value = cube.data + break + if isinstance(value, np.ma.MaskedArray): + value = np.ma.getdata(value) var_region = handler.variables['region'] basin_index = np.where(var_region[:] == self.region) if len(basin_index[0]) == 0: @@ -285,9 +349,8 @@ class DataFile(Publisher): else: basin_index = basin_index[0][0] - handler.variables[self.final_name][..., basin_index] = value + handler.variables[self.final_name][..., basin_index] = np.multiply(np.ones(value.shape), value) handler.close() - handler_send.close() Utils.move_file(temp, self.local_file) def _add_region_dimension_to_var(self): @@ -302,19 +365,19 @@ class DataFile(Publisher): value = original_var[:] new_var[..., 0] = value handler.close() - Utils.nco.ncks(input=self.local_file, output=self.local_file, options='-O -x -v {0}'.format(self.final_name)) + Utils.nco.ncks(input=self.local_file, output=self.local_file, options=('-x -v {0}'.format(self.final_name),)) Utils.rename_variable(self.local_file, 'new_var', self.final_name) def _rename_coordinate_variables(self): variables = dict() variables['x'] = 'i' variables['y'] = 'j' - variables['nav_lat_grid_V'] = 'lat' - variables['nav_lon_grid_V'] = 'lon' - variables['nav_lat_grid_U'] = 'lat' - variables['nav_lon_grid_U'] = 'lon' - variables['nav_lat_grid_T'] = 'lat' - variables['nav_lon_grid_T'] = 'lon' + variables['nav_lat_grid_V'] = self.lat_name + variables['nav_lon_grid_V'] = self.lon_name + variables['nav_lat_grid_U'] = self.lat_name + variables['nav_lon_grid_U'] = self.lon_name + variables['nav_lat_grid_T'] = self.lat_name + variables['nav_lon_grid_T'] = self.lon_name Utils.rename_variables(self.local_file, variables, False, True) def add_diagnostic_history(self): @@ -464,12 +527,13 @@ class NetCDFFile(DataFile): except Exception as ex: Log.error('Can not create link to {1}: {0}'.format(ex, self.remote_file)) - @property - def size(self): - if self.local_status == LocalStatus.READY: - return os.path.getsize(self.local_file) - if self.storage_status == StorageStatus.READY: - return os.path.getsize(self.remote_file) - return None + def _get_size(self): + try: + if self.local_status == LocalStatus.READY: + self._size = os.path.getsize(self.local_file) + if self.storage_status == StorageStatus.READY: + self._size = os.path.getsize(self.remote_file) + except Exception: + self._size = None diff --git a/earthdiagnostics/datamanager.py b/earthdiagnostics/datamanager.py index e3799d31ef13dc1ac1aaf7396ee3cbb1e8914f6d..b4ab012486095e8058845a99b50abada456fdb11 100644 --- a/earthdiagnostics/datamanager.py +++ b/earthdiagnostics/datamanager.py @@ -58,14 +58,20 @@ class DataManager(object): var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str() return var - def get_varfolder(self, domain, var, grid=None): + def get_varfolder(self, domain, var, grid=None, frequency=None): if grid: var = '{0}-{1}'.format(var, grid) if domain in [ModelingRealms.ocean, ModelingRealms.seaIce, ModelingRealms.ocnBgchem]: - return '{0}_f{1}h'.format(var, self.experiment.ocean_timestep) + return self._apply_fxh(var, self.experiment.ocean_timestep, frequency) else: - return '{0}_f{1}h'.format(var, self.experiment.atmos_timestep) + return self._apply_fxh(var, self.experiment.atmos_timestep, frequency) + + def _apply_fxh(self, folder_name, timestep, frequency=None): + is_base_frequency = frequency is not None and frequency.frequency.endswith('hr') + if not is_base_frequency and timestep > 0: + return '{0}_f{1}h'.format(folder_name, timestep) + return folder_name def create_link(self, domain, filepath, frequency, var, grid, move_old, vartype): freq_str = frequency.folder_name(vartype) diff --git a/earthdiagnostics/diagnostic.py b/earthdiagnostics/diagnostic.py index 4e90373dbbd6e79389ad0d9210c2751bbac1956e..0b1dbf57a346ca408883b904281ed6deff6c8502 100644 --- a/earthdiagnostics/diagnostic.py +++ b/earthdiagnostics/diagnostic.py @@ -1,12 +1,14 @@ # coding=utf-8 import datetime -from datafile import StorageStatus, LocalStatus +from bscearth.utils.log import Log + from earthdiagnostics.constants import Basins, Basin +from earthdiagnostics.datafile import StorageStatus, LocalStatus from earthdiagnostics.frequency import Frequency from earthdiagnostics.modelingrealm import ModelingRealms +from earthdiagnostics.publisher import Publisher from earthdiagnostics.variable_type import VariableType -from publisher import Publisher class DiagnosticStatus(object): @@ -41,6 +43,15 @@ class Diagnostic(Publisher): self.consumed_time = datetime.timedelta() self.subjobs = [] + def can_skip_run(self): + for file_generated in self._generated_files: + if file_generated.storage_status != StorageStatus.READY: + return False + if file_generated.has_modifiers(): + Log.warning('Can not skip diagnostics run when data is going to be modified: {0}'.format(self)) + return False + + def __repr__(self): return str(self) @@ -136,6 +147,8 @@ class Diagnostic(Publisher): region = region.name generated_chunk = self.data_manager.declare_chunk(domain, var, startdate, member, chunk, grid, region, box, diagnostic=self, vartype=vartype, frequency=frequency) + if region is not None: + generated_chunk.add_modifier(self) self._generated_files.append(generated_chunk) return generated_chunk @@ -199,6 +212,19 @@ class Diagnostic(Publisher): """ return 'Developer must override base class __str__ method' + def add_subjob(self, subjob): + """ + Adds a subjob + :param subjob: + :type subjob: Diagnostic + :return: + """ + self.subjobs.append(subjob) + subjob.subscribe(self, self._subjob_status_changed) + + def _subjob_status_changed(self, job): + self.check_is_ready() + def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, to_modify=False, vartype=VariableType.MEAN): request = self.data_manager.request_chunk(domain, var, startdate, member, chunk, grid, box, frequency, vartype) @@ -229,7 +255,8 @@ class Diagnostic(Publisher): self.check_is_ready() def check_is_ready(self): - if all([request.ready_to_run(self) for request in self._requests]): + if all([request.ready_to_run(self) for request in self._requests]) and\ + all([subjob.status == DiagnosticStatus.COMPLETED for subjob in self.subjobs]): self.status = DiagnosticStatus.READY def _unsuscribe_requests(self): @@ -240,7 +267,8 @@ class Diagnostic(Publisher): return self.pending_requests() == 0 def pending_requests(self): - return len([request.storage_status != StorageStatus.READY for request in self._requests]) + return len([request.storage_status != StorageStatus.READY or request.local_status != LocalStatus.READY + for request in self._requests]) class DiagnosticOption(object): @@ -352,7 +380,7 @@ class DiagnosticVariableListOption(DiagnosticOption): def parse(self, option_value): option_value = self.check_default(option_value) var_names = [] - for value in option_value.split('-'): + for value in option_value.split(':'): real_name = self.var_manager.get_variable(value, False) if real_name is None: var_names.append(value) @@ -379,7 +407,11 @@ class DiagnosticFrequencyOption(DiagnosticOption): class DiagnosticBasinOption(DiagnosticOption): def parse(self, option_value): - return Basins().parse(self.check_default(option_value)) + value = self.check_default(option_value) + basin = Basins().parse(value) + if basin is None: + raise DiagnosticOptionError('Basin {0} not recognized'.format(value)) + return basin class DiagnosticComplexStrOption(DiagnosticOption): diff --git a/earthdiagnostics/earthdiags.py b/earthdiagnostics/earthdiags.py index b9ef5aa2935518a28a7094b32e64be73afc7c14e..44c7fba56a2bf986fb916bc7d7928f966e9bbca4 100755 --- a/earthdiagnostics/earthdiags.py +++ b/earthdiagnostics/earthdiags.py @@ -129,8 +129,12 @@ class EarthDiags(object): self.dic_variables['x'] = 'i' self.dic_variables['y'] = 'j' self.dic_variables['z'] = 'lev' - self.dic_variables['nav_lon'] = 'lon' - self.dic_variables['nav_lat'] = 'lat' + if self.config.data_convention.lower() in ['primavera', 'cmip6']: + self.dic_variables['nav_lon'] = 'longitude' + self.dic_variables['nav_lat'] = 'latitude' + else: + self.dic_variables['nav_lon'] = 'lon' + self.dic_variables['nav_lat'] = 'lat' self.dic_variables['nav_lev'] = 'lev' self.dic_variables['time_counter'] = 'time' self.dic_variables['t'] = 'time' @@ -370,7 +374,10 @@ class EarthDiags(object): if os.path.lexists(destiny): try: os.remove(destiny) - except OSError: + except OSError as ex: + if ex.errno == 13: #Permission denied + Log.info('Link already created') + return pass os.symlink(source, destiny) diff --git a/earthdiagnostics/frequency.py b/earthdiagnostics/frequency.py index 09ebe3de5d3b3e972cadcd60b9ff68308c1e81fc..9a95f26e00fabd86d278eb43df8687a27209f4b8 100644 --- a/earthdiagnostics/frequency.py +++ b/earthdiagnostics/frequency.py @@ -9,6 +9,7 @@ class Frequency(object): 'dec': 'dec', 'decadal': 'dec', 'y': 'year', 'yr': 'year', 'year': 'year', 'yearly': 'year', 'm': 'mon', '1m': 'mon', 'mon': 'mon', 'monthly': 'mon', 'mm': 'mon', + 'w': 'week', '1w': 'week', 'week': 'week', 'weekly': 'week', 'd': 'day', '1d': 'day', 'daily': 'day', 'day': 'day', '15': '15hr', '15h': '15hr', '15hr': '15hr', '15_hourly': '15hr', '15hourly': '15hr', '15 hourly': '15hr', @@ -28,12 +29,17 @@ class Frequency(object): def __eq__(self, other): return self.frequency == other.frequency + def __ne__(self, other): + return not self == other + def __str__(self): return self.frequency def folder_name(self, vartype): if self == Frequencies.daily: freq_str = 'daily_{0}'.format(VariableType.to_str(vartype)) + elif self == Frequencies.weekly: + freq_str = 'weekly_{0}'.format(VariableType.to_str(vartype)) elif self == Frequencies.climatology: freq_str = 'clim' elif self in (Frequencies.three_hourly, Frequencies.six_hourly, Frequencies.hourly): @@ -56,6 +62,7 @@ class Frequencies(object): climatology = Frequency('clim') yearly = Frequency('year') monthly = Frequency('mon') + weekly = Frequency('week') daily = Frequency('day') six_hourly = Frequency('6hr') three_hourly = Frequency('3hr') diff --git a/earthdiagnostics/general/simplify_dimensions.py b/earthdiagnostics/general/simplify_dimensions.py index 579a5473faf0dcc0804ba30c361688de6b03c535..c903af0a7258259efdadf8ef6f415f9400eb8f34 100644 --- a/earthdiagnostics/general/simplify_dimensions.py +++ b/earthdiagnostics/general/simplify_dimensions.py @@ -1,9 +1,10 @@ # coding=utf-8 +import numpy as np + from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOption, DiagnosticDomainOption, \ DiagnosticVariableListOption from earthdiagnostics.modelingrealm import ModelingRealm from earthdiagnostics.utils import Utils, TempFile -import numpy as np class SimplifyDimensions(Diagnostic): @@ -32,7 +33,7 @@ class SimplifyDimensions(Diagnostic): alias = 'simdim' "Diagnostic alias for the configuration file" - def __init__(self, data_manager, startdate, member, chunk, domain, variable, grid): + def __init__(self, data_manager, startdate, member, chunk, domain, variable, grid, data_convention): Diagnostic.__init__(self, data_manager) self.startdate = startdate self.member = member @@ -40,6 +41,12 @@ class SimplifyDimensions(Diagnostic): self.variable = variable self.domain = domain self.grid = grid + if data_convention in ('cmip6', 'primavera'): + self.lon_name = 'longitude' + self.lat_name = 'latitude' + else: + self.lon_name = 'lon' + self.lat_name = 'lat' def __str__(self): return 'Simplify dimension Startdate: {0} Member: {1} Chunk: {2} ' \ @@ -71,7 +78,8 @@ class SimplifyDimensions(Diagnostic): for startdate, member, chunk in diags.config.experiment.get_chunk_list(): job_list.append(SimplifyDimensions(diags.data_manager, startdate, member, chunk, - options['domain'], var, options['grid'])) + options['domain'], var, options['grid'], + diags.config.data_convention)) return job_list def request_data(self): @@ -89,14 +97,14 @@ class SimplifyDimensions(Diagnostic): handler = Utils.openCdf(self.variable_file.local_file) if 'i' not in handler.dimensions: raise Exception('Variable {0.domain}:{0.variable} does not have i,j dimensions'.format(self)) - lat = handler.variables['lat'] + lat = handler.variables[self.lat_name] lat_values = lat[:, 0:1] # noinspection PyTypeChecker if np.any((lat[:] - lat_values) != 0): raise Exception('Latitude is not constant over i dimension for variable ' '{0.domain}:{0.variable}'.format(self)) - lon = handler.variables['lon'] + lon = handler.variables[self.lon_name] lon_values = lon[0:1, :] # noinspection PyTypeChecker if np.any((lon[:] - lon) != 0): @@ -106,21 +114,22 @@ class SimplifyDimensions(Diagnostic): temp = TempFile.get() new_file = Utils.openCdf(temp, 'w') for dim in handler.dimensions.keys(): - if dim in ('lat', 'lon', 'i', 'j', 'vertices'): + if dim in (self.lon_name, self.lat_name, 'i', 'j', 'vertices'): continue - Utils.copy_dimension(handler, new_file, dim, new_names={'i': 'lon', 'j': 'lat'}) + Utils.copy_dimension(handler, new_file, dim, new_names={'i': self.lon_name, 'j': self.lat_name}) - new_file.createDimension('lon', handler.dimensions['i'].size) - new_file.createDimension('lat', handler.dimensions['j'].size) + new_file.createDimension(self.lon_name, handler.dimensions['i'].size) + new_file.createDimension(self.lat_name, handler.dimensions['j'].size) new_file.createDimension('vertices', 2) for var in handler.variables.keys(): - if var in ('lat', 'lon', 'i', 'j', 'lat_vertices', 'lon_vertices'): + if var in (self.lon_name, self.lat_name, 'i', 'j', + '{0}_vertices'.format(self.lon_name), '{0}_vertices'.format(self.lat_name)): continue - Utils.copy_variable(handler, new_file, var, new_names={'i': 'lon', 'j': 'lat'}) + Utils.copy_variable(handler, new_file, var, new_names={'i': self.lon_name, 'j': self.lat_name}) - self._create_var('lon', lon_values, handler, new_file) - self._create_var('lat', lat_values, handler, new_file) + self._create_var(self.lon_name, lon_values, handler, new_file) + self._create_var(self.lat_name, lat_values, handler, new_file) handler.close() new_file.close() diff --git a/earthdiagnostics/general/verticalmeanmetersiris.py b/earthdiagnostics/general/verticalmeanmetersiris.py index 92de0bef9d587e16d2d452f0fc88b9b74fba8f95..23dce4d8c686047e3f67385d5367ac2789725eaf 100644 --- a/earthdiagnostics/general/verticalmeanmetersiris.py +++ b/earthdiagnostics/general/verticalmeanmetersiris.py @@ -4,10 +4,10 @@ import iris.analysis import iris.exceptions from earthdiagnostics.box import Box -from earthdiagnostics.diagnostic import Diagnostic, DiagnosticFloatOption, DiagnosticDomainOption, \ - DiagnosticVariableOption -from earthdiagnostics.utils import TempFile +from earthdiagnostics.diagnostic import Diagnostic, DiagnosticFloatOption, \ + DiagnosticDomainOption, DiagnosticVariableListOption from earthdiagnostics.modelingrealm import ModelingRealms +from earthdiagnostics.utils import TempFile class VerticalMeanMetersIris(Diagnostic): @@ -66,10 +66,10 @@ class VerticalMeanMetersIris(Diagnostic): :type options: list[str] :return: """ - options_available = (DiagnosticVariableOption(diags.data_manager.config.var_manager), + options_available = (DiagnosticDomainOption(), + DiagnosticVariableListOption(diags.data_manager.config.var_manager, 'variable'), DiagnosticFloatOption('min_depth', -1), - DiagnosticFloatOption('max_depth', -1), - DiagnosticDomainOption(default_value=ModelingRealms.ocean)) + DiagnosticFloatOption('max_depth', -1)) options = cls.process_options(options, options_available) box = Box(True) @@ -79,9 +79,10 @@ class VerticalMeanMetersIris(Diagnostic): box.max_depth = options['max_depth'] job_list = list() - for startdate, member, chunk in diags.config.experiment.get_chunk_list(): - job_list.append(VerticalMeanMetersIris(diags.data_manager, startdate, member, chunk, - options['domain'], options['variable'], box)) + for var in options['variable']: + for startdate, member, chunk in diags.config.experiment.get_chunk_list(): + job_list.append(VerticalMeanMetersIris(diags.data_manager, startdate, member, chunk, + options['domain'], var, box)) return job_list def request_data(self): diff --git a/earthdiagnostics/obsreconmanager.py b/earthdiagnostics/obsreconmanager.py index 661986c9b9d515045e1aca11b56b126c7a1478b9..356eac7732ec956711e95fc30b43839ffd1331a6 100644 --- a/earthdiagnostics/obsreconmanager.py +++ b/earthdiagnostics/obsreconmanager.py @@ -4,6 +4,7 @@ import os from bscearth.utils.log import Log from earthdiagnostics.datamanager import DataManager +from earthdiagnostics.frequency import Frequencies from earthdiagnostics.variable_type import VariableType @@ -88,21 +89,15 @@ class ObsReconManager(DataManager): """ if not frequency: frequency = self.config.frequency - var = self._get_final_var_name(box, var) folder_path = self._get_folder_path(frequency, domain, var, grid, vartype) - file_name = self._get_file_name(var, startdate) + file_name = self._get_file_name(var, startdate, frequency) filepath = os.path.join(folder_path, file_name) return filepath def _get_folder_path(self, frequency, domain, variable, grid, vartype): - - if not frequency.frequency.endswith('hr'): - var_folder = self.get_varfolder(domain, variable, grid) - else: - var_folder = variable - + var_folder = self.get_varfolder(domain, variable, grid, frequency) folder_path = os.path.join(self.config.data_dir, self.config.data_type, self.experiment.institute.lower(), self.experiment.model.lower(), @@ -153,12 +148,12 @@ class ObsReconManager(DataManager): var = self._get_final_var_name(box, var) full_path = os.path.join(self.config.data_dir, self.config.data_type, self.experiment.institute, self.experiment.model, frequency.folder_name(vartype)) - full_path = os.path.join(full_path, var, self._get_file_name(var, startdate)) + full_path = os.path.join(full_path, var, self._get_file_name(var, startdate, frequency)) return full_path - def _get_file_name(self, var, startdate): + def _get_file_name(self, var, startdate, frequency): if startdate: - if self.config.data_type != 'exp': + if self.config.data_type != 'exp' and frequency != Frequencies.weekly: startdate = startdate[0:6] return '{0}_{1}.nc'.format(var, startdate) else: @@ -222,7 +217,7 @@ class ObsReconManager(DataManager): :rtype: str """ var = self._get_final_var_name(box, var) - filepath = self.get_file_path(startdate, domain, var, frequency, vartype, grid, box) + filepath = self.get_file_path(startdate, domain, var, frequency, vartype, box, grid) Log.debug('{0} requested', filepath) return self._get_file_from_storage(filepath) @@ -261,7 +256,7 @@ class ObsReconManager(DataManager): cmor_var = self.variable_list.get_variable(var) if cmor_var: var = cmor_var.short_name - final_name = var + final_name = self._get_final_var_name(box, var) filepath = self.get_file_path(startdate, domain, final_name, frequency, vartype, box, grid) netcdf_file = self._declare_generated_file(filepath, domain, final_name, cmor_var, self.config.data_convention, diff --git a/earthdiagnostics/ocean/areamoc.py b/earthdiagnostics/ocean/areamoc.py index 80c18474a3cb5f3d163467a3f73e04a052edf20c..c73895fd434d03ee6d726cdfbd51434fc5dadb87 100644 --- a/earthdiagnostics/ocean/areamoc.py +++ b/earthdiagnostics/ocean/areamoc.py @@ -1,12 +1,13 @@ # coding=utf-8 +import os + import numpy as np + +from earthdiagnostics.box import Box from earthdiagnostics.constants import Basins from earthdiagnostics.diagnostic import Diagnostic, DiagnosticIntOption, DiagnosticBasinOption -from earthdiagnostics.box import Box -from earthdiagnostics.utils import Utils, TempFile -import os - from earthdiagnostics.modelingrealm import ModelingRealms +from earthdiagnostics.utils import Utils, TempFile class AreaMoc(Diagnostic): @@ -116,10 +117,15 @@ class AreaMoc(Diagnostic): handler = Utils.openCdf(temp) basin_index = np.where(handler.variables['basin'][:] == self.basin.name) - lat_values = handler.variables['lat'][:] - lat_type = handler.variables['lat'].dtype - lat_units = handler.variables['lat'].units - lat_long_name = handler.variables['lat'].long_name + if 'lat' in handler.variables: + lat_name = 'lat' + else: + lat_name = 'latitude' + var_lat = handler.variables[lat_name] + lat_values = var_lat[:] + lat_type = var_lat.dtype + lat_units = var_lat.units + lat_long_name = var_lat.long_name handler.close() @@ -134,29 +140,31 @@ class AreaMoc(Diagnostic): Utils.copy_dimension(source, destiny, 'time') Utils.copy_dimension(source, destiny, 'lev') - Utils.copy_dimension(source, destiny, 'j', new_names={'j': 'lat'}) + Utils.copy_dimension(source, destiny, 'j', new_names={'j': lat_name}) - lat_variable = destiny.createVariable('lat', lat_type, 'lat') + lat_variable = destiny.createVariable(lat_name, lat_type, lat_name) lat_variable[:] = lat_values[:] lat_variable.units = lat_units lat_variable.long_name = lat_long_name Utils.copy_variable(source, destiny, 'lev') Utils.copy_variable(source, destiny, 'time') - Utils.copy_variable(source, destiny, 'vsftmyz', new_names={'j': 'lat'}) + Utils.copy_variable(source, destiny, 'vsftmyz', new_names={'j': lat_name}) source.close() destiny.close() nco.ncks(input=temp2, output=temp, - options='-O -d lev,{0:.1f},{1:.1f} -d lat,{2:.1f},{3:.1f}'.format(self.box.min_depth, - self.box.max_depth, - self.box.min_lat, - self.box.max_lat)) + options=('-d lev,{0:.1f},{1:.1f} -d {4},{2:.1f},{3:.1f}'.format(self.box.min_depth, + self.box.max_depth, + self.box.min_lat, + self.box.max_lat, + lat_name),)) cdo.vertmean(input=temp, output=temp2) os.remove(temp) - nco.ncap2(input=temp2, output=temp2, options='-O -s "coslat[lat]=cos(lat[lat]*3.141592657/180.0)"') - nco.ncwa(input=temp2, output=temp2, options='-w coslat -a lat') - nco.ncks(input=temp2, output=temp2, options='-O -v vsftmyz,time') + nco.ncap2(input=temp2, output=temp2, + options=('-s "coslat[{0}]=cos({0}[{0}]*3.141592657/180.0)"'.format(lat_name),)) + nco.ncwa(input=temp2, output=temp2, options=('-w coslat -a {0}'.format(lat_name),)) + nco.ncks(input=temp2, output=temp2, options=('-v vsftmyz,time',)) self.results.set_local_file(temp2) diff --git a/earthdiagnostics/ocean/heatcontent.py b/earthdiagnostics/ocean/heatcontent.py index 31ac9268b1e302515744091ec3ab87b6d1c08a9a..81ef23ccd1cdcc0d19483bd540ca13bef7ef5610 100644 --- a/earthdiagnostics/ocean/heatcontent.py +++ b/earthdiagnostics/ocean/heatcontent.py @@ -1,13 +1,14 @@ # coding=utf-8 import shutil +import numpy as np + from earthdiagnostics import cdftools +from earthdiagnostics.box import Box from earthdiagnostics.constants import Basins -from earthdiagnostics.utils import Utils, TempFile from earthdiagnostics.diagnostic import Diagnostic, DiagnosticBasinOption, DiagnosticIntOption -from earthdiagnostics.box import Box from earthdiagnostics.modelingrealm import ModelingRealms -import numpy as np +from earthdiagnostics.utils import Utils, TempFile class HeatContent(Diagnostic): @@ -50,6 +51,8 @@ class HeatContent(Diagnostic): self.max_level = max_level def __eq__(self, other): + if not isinstance(other, HeatContent): + return False return self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and \ self.box == other.box and self.basin == other.basin and self.mxloption == other.mxloption @@ -74,7 +77,7 @@ class HeatContent(Diagnostic): DiagnosticIntOption('min_depth'), DiagnosticIntOption('max_depth')) options = cls.process_options(options, options_available) - box = Box(True) + box = Box() box.min_depth = options['min_depth'] box.max_depth = options['max_depth'] min_level = 0 @@ -159,9 +162,9 @@ class HeatContent(Diagnostic): box_save = self.box self.heatcsum = self.declare_chunk(ModelingRealms.ocean, 'heatcsum', self.startdate, self.member, self.chunk, - box=box_save, region=self.basin.fullname) + box=box_save, region=self.basin.name) self.heatcmean = self.declare_chunk(ModelingRealms.ocean, 'heatcvmean', self.startdate, self.member, self.chunk, - box=box_save, region=self.basin.fullname) + box=box_save, region=self.basin.name) def compute(self): """ diff --git a/earthdiagnostics/ocean/interpolate.py b/earthdiagnostics/ocean/interpolate.py index 262f87676c896957bd0320752ae5163ec0bb8e50..dab72b363f7499d4a7157471be8632a03d20417d 100644 --- a/earthdiagnostics/ocean/interpolate.py +++ b/earthdiagnostics/ocean/interpolate.py @@ -1,14 +1,14 @@ # coding=utf-8 +import os import shutil import threading -import os from bscearth.utils.log import Log + from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOption, DiagnosticDomainOption, DiagnosticBoolOption, \ DiagnosticVariableListOption - -from earthdiagnostics.utils import Utils, TempFile from earthdiagnostics.modelingrealm import ModelingRealms +from earthdiagnostics.utils import Utils, TempFile class Interpolate(Diagnostic): diff --git a/earthdiagnostics/ocean/interpolatecdo.py b/earthdiagnostics/ocean/interpolatecdo.py index 0bc7709b51a8e5b4162562b8be1b19778d273e1d..b8b816207853e3529837b2dfc319cbdf5685fca9 100644 --- a/earthdiagnostics/ocean/interpolatecdo.py +++ b/earthdiagnostics/ocean/interpolatecdo.py @@ -97,64 +97,86 @@ class InterpolateCDO(Diagnostic): job_list = list() weights = TempFile.get() method = options['method'].lower() - cls._compute_weights(diags, method, options, target_grid, weights) + if options['weights_from_mask']: + temp = cls.get_sample_grid_file() + cls.compute_weights(method, target_grid, temp, weights) + os.remove(temp) + weights_job = None + else: + startdate, member, chunk = diags.config.experiment.get_chunk_list()[0] + weights_job = ComputeWeights(diags.data_manager, startdate, member, chunk, options['domain'], + options['variables'][0],target_grid, options['original_grid'], weights, + options['method']) + for var in options['variables']: for startdate, member, chunk in diags.config.experiment.get_chunk_list(): - job_list.append(InterpolateCDO(diags.data_manager, startdate, member, chunk, - options['domain'], var, target_grid, - diags.config.experiment.model_version, options['mask_oceans'], - options['original_grid'], weights)) + job = InterpolateCDO(diags.data_manager, startdate, member, chunk, options['domain'], var, target_grid, + diags.config.experiment.model_version, options['mask_oceans'], + options['original_grid'], weights) + if weights_job is not None: + job.add_subjob(weights_job) + job_list.append(job) return job_list @classmethod - def _compute_weights(cls, diags, method, options, target_grid, weights): - if options['weights_from_mask']: - temp = cls.get_sample_grid_file() - else: - startdate, member, chunk = diags.config.experiment.get_chunk_list()[0] - temp = diags.data_manager.get_file(options['domain'], options['variable'], startdate, member, chunk, - grid=options['original_grid']) + def compute_weights(cls, method, target_grid, sample_file, weights): if method == InterpolateCDO.BILINEAR: - Utils.cdo.genbil(target_grid, input=temp, output=weights) + Utils.cdo.genbil(target_grid, input=sample_file, output=weights) elif method == InterpolateCDO.BICUBIC: - Utils.cdo.genbic(target_grid, input=temp, output=weights) + Utils.cdo.genbic(target_grid, input=sample_file, output=weights) elif method == InterpolateCDO.CONSERVATIVE: - Utils.cdo.genycon(target_grid, input=temp, output=weights) + Utils.cdo.genycon(target_grid, input=sample_file, output=weights) elif method == InterpolateCDO.CONSERVATIVE2: - Utils.cdo.gencon2(target_grid, input=temp, output=weights) - os.remove(temp) + Utils.cdo.gencon2(target_grid, input=sample_file, output=weights) @classmethod def get_sample_grid_file(cls): temp = TempFile.get() - Utils.nco.ncks(input='mask.nc', output=temp, options=('-O -v tmask,lat,lon,gphif,glamf',)) + + lat_name = 'lat' + handler = Utils.openCdf('mask.nc') + for lat_alias in ['lat', 'latitude']: + if lat_alias in handler.variables: + lat_name = lat_alias + break + + lon_name = None + for lon_alias in ['lon', 'longitude']: + if lon_alias in handler.variables: + lon_name = lon_alias + break + lon_bnds_name = '{0}_bnds'.format(lon_name) + lat_bnds_name = '{0}_bnds'.format(lat_name) + + Utils.nco.ncks(input='mask.nc', output=temp, + options=('-O -v tmask,{0},{1},gphif,glamf'.format(lat_name, lon_name),)) handler = Utils.openCdf(temp) - lon = handler.variables['lon'] + lon = handler.variables[lon_name] lon.units = "degrees_east" lon.long_name = "Longitude" lon.nav_model = "Default grid" lon.standard_name = "longitude" - lon.short_name = "lon" - lon.bounds = 'lon_bnds' + lon.short_name = lon_name + lon.bounds = lon_bnds_name - lat = handler.variables['lat'] + lat = handler.variables[lat_name] lat.units = "degrees_north" lat.long_name = "Latitude" lat.nav_model = "Default grid" lat.standard_name = "latitude" - lat.short_name = "lat" - lat.bounds = 'lat_bnds' + lat.short_name = lat_name + lat.bounds = lat_bnds_name handler.createDimension('bounds', 4) - lon_bnds = handler.createVariable('lon_bnds', lon.datatype, ('j', 'i', 'bounds')) + lon_bnds = handler.createVariable(lon_bnds_name, lon.datatype, ('j', 'i', 'bounds')) corner_lat = handler.variables['glamf'][0, ...] lon_bnds[:, :, 0] = corner_lat lon_bnds[:, :, 1] = np.roll(corner_lat, 1, 0) lon_bnds[:, :, 2] = np.roll(corner_lat, -1, 1) lon_bnds[:, :, 3] = np.roll(lon_bnds[:, :, 1], -1, 1) - lat_bnds = handler.createVariable('lat_bnds', lat.datatype, ('j', 'i', 'bounds')) + lat_bnds = handler.createVariable(lat_bnds_name, lat.datatype, ('j', 'i', 'bounds')) corner_lat = handler.variables['gphif'][0, ...] lat_bnds[:, :, 0] = corner_lat lat_bnds[:, :, 1] = np.roll(corner_lat, 1, 0) @@ -164,7 +186,7 @@ class InterpolateCDO(Diagnostic): lat_bnds[0, :, 3] = lat_bnds[1, 0, 3] - 1 tmask = handler.variables['tmask'] - tmask.coordinates = 'time lev lat lon' + tmask.coordinates = 'time lev {0} {1}'.format(lat_name, lon_name) handler.close() @@ -235,6 +257,38 @@ class InterpolateCDO(Diagnostic): self.regridded.set_local_file(temp) +class ComputeWeights(Diagnostic): + alias = 'computeinterpcdoweights' + "Diagnostic alias for the configuration file" + + @classmethod + def generate_jobs(cls, diags, options): + pass + + def __init__(self, data_manager, startdate, member, chunk, domain, variable, target_grid, + original_grid, weights_file, method): + Diagnostic.__init__(self, data_manager) + self.startdate = startdate + self.member = member + self.chunk = chunk + self.variable = variable + self.domain = domain + self.grid = target_grid + self.original_grid = original_grid + self.weights_file = weights_file + self.method = method + + def __str__(self): + return 'Computing weights for CDO interpolation: Method {0.method} Target grid: {0.grid}'.format(self) + + def compute(self): + InterpolateCDO.compute_weights(self.method, self.grid, self.sample_data.local_file, self.weights_file) + def request_data(self): + self.sample_data = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk, + grid=self.original_grid) + + def declare_data_generated(self): + pass diff --git a/earthdiagnostics/ocean/maxmoc.py b/earthdiagnostics/ocean/maxmoc.py index 2be7110e75edca01ff46cc3cbc877e5807990621..fb86ecb5c636a968871486038871a083a9decb71 100644 --- a/earthdiagnostics/ocean/maxmoc.py +++ b/earthdiagnostics/ocean/maxmoc.py @@ -2,11 +2,12 @@ import netCDF4 import numpy as np from bscearth.utils.log import Log -from earthdiagnostics.constants import Basins + from earthdiagnostics.box import Box +from earthdiagnostics.constants import Basins from earthdiagnostics.diagnostic import Diagnostic, DiagnosticBasinOption, DiagnosticFloatOption -from earthdiagnostics.utils import Utils, TempFile from earthdiagnostics.modelingrealm import ModelingRealms +from earthdiagnostics.utils import Utils, TempFile from earthdiagnostics.variable_type import VariableType @@ -134,7 +135,10 @@ class MaxMoc(Diagnostic): basin_index = basin_index[0][0] lev = handler.variables['lev'][:] - lat = handler.variables['lat'][:] + if "lat" in handler.variables: + lat = handler.variables['lat'][:] + else: + lat = handler.variables['latitude'][:] if self.box.min_lat == self.box.max_lat: lat_inds = ((np.abs(lat - self.box.min_lat)).argmin(),) diff --git a/earthdiagnostics/ocean/mxl.py b/earthdiagnostics/ocean/mxl.py index fe531b76cd0ce4e30a27a7eee4bee4206368ffa8..b5e7c980355beccbf485606bbeaed5216c498375 100644 --- a/earthdiagnostics/ocean/mxl.py +++ b/earthdiagnostics/ocean/mxl.py @@ -3,8 +3,8 @@ import os from earthdiagnostics import cdftools from earthdiagnostics.diagnostic import Diagnostic -from earthdiagnostics.utils import Utils, TempFile from earthdiagnostics.modelingrealm import ModelingRealms +from earthdiagnostics.utils import Utils, TempFile class Mxl(Diagnostic): @@ -71,8 +71,10 @@ class Mxl(Diagnostic): source = Utils.openCdf(temp) destiny = Utils.openCdf(temp2, 'w') Utils.copy_variable(source, destiny, 'somxl010', must_exist=True, add_dimensions=True) - Utils.copy_variable(source, destiny, 'lat') - Utils.copy_variable(source, destiny, 'lon') + Utils.copy_variable(source, destiny, 'lat', must_exist=False) + Utils.copy_variable(source, destiny, 'latitude', must_exist=False) + Utils.copy_variable(source, destiny, 'lon', must_exist=False) + Utils.copy_variable(source, destiny, 'longitude', must_exist=False) source.close() destiny.close() self.mlotst_file.set_local_file(temp2, rename_var='somxl010') diff --git a/earthdiagnostics/ocean/regionmean.py b/earthdiagnostics/ocean/regionmean.py index 1d6cebb8d7961a60f2ce172b5cfd0e5a5d8727b9..edc5e212884106d87483fe94adfce5545da0bc51 100644 --- a/earthdiagnostics/ocean/regionmean.py +++ b/earthdiagnostics/ocean/regionmean.py @@ -60,7 +60,7 @@ class RegionMean(Diagnostic): def __str__(self): return 'Region mean Startdate: {0.startdate} Member: {0.member} Chunk: {0.chunk} Variable: {0.variable} ' \ 'Grid point: {0.grid_point} Box: {0.box} Save 3D: {0.save3d} Save variance: {0.variance} ' \ - 'Original grid: {0.grid}'.format(self) + 'Original grid: {0.grid} Basin: {0.basin}'.format(self) @classmethod def generate_jobs(cls, diags, options): @@ -159,7 +159,14 @@ class RegionMean(Diagnostic): temp2 = TempFile.get() Utils.nco.ncks(input=mean_file, output=temp2, options=('-v {0},lat,lon{1}'.format(original_name, levels),)) - self.declared[final_name].set_local_file(temp2, rename_var=original_name) + handler = Utils.openCdf(temp2) + var_handler = handler.variables[original_name] + if hasattr(var_handler, 'valid_min'): + del var_handler.valid_min + if hasattr(var_handler, 'valid_max'): + del var_handler.valid_max + handler.close() + self.declared[final_name].set_local_file(temp2, diagnostic=self, rename_var=original_name, region=self.basin) def declare_var(self, var, threed, box_save): if threed: diff --git a/earthdiagnostics/ocean/regionsum.py b/earthdiagnostics/ocean/regionsum.py new file mode 100644 index 0000000000000000000000000000000000000000..52b49f91935b65e8485a6c86f58b0e0725f83f5f --- /dev/null +++ b/earthdiagnostics/ocean/regionsum.py @@ -0,0 +1,172 @@ +# coding=utf-8 +import os + +from earthdiagnostics import cdftools +from earthdiagnostics.box import Box +from earthdiagnostics.constants import Basins +from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOption, DiagnosticIntOption, DiagnosticDomainOption, \ + DiagnosticBoolOption, DiagnosticBasinOption, DiagnosticVariableOption +from earthdiagnostics.modelingrealm import ModelingRealms +from earthdiagnostics.utils import Utils, TempFile + + +class RegionSum(Diagnostic): + """ + Computes the mean value of the field (3D, weighted). For 3D fields, + a horizontal mean for each level is also given. If a spatial window + is specified, the mean value is computed only in this window. + + :original author: Javier Vegas-Regidor + + :created: March 2017 + + :param data_manager: data management object + :type data_manager: DataManager + :param startdate: startdate + :type startdate: str + :param member: member number + :type member: int + :param chunk: chunk's number + :type chunk: int + :param variable: variable to average + :type variable: str + :param box: box used to restrict the vertical mean + :type box: Box + """ + + alias = 'regsum' + "Diagnostic alias for the configuration file" + + def __init__(self, data_manager, startdate, member, chunk, domain, variable, grid_point, box, save3d, basin, + variance, grid): + Diagnostic.__init__(self, data_manager) + self.startdate = startdate + self.member = member + self.chunk = chunk + self.domain = domain + self.variable = variable + self.grid_point = grid_point.upper() + self.box = box + self.save3d = save3d + self.basin = basin + self.variance = variance + self.grid = grid + self.declared = {} + + def __eq__(self, other): + return self.startdate == other.startdate and self.member == other.member and self.chunk == other.chunk and \ + self.box == other.box and self.variable == other.variable + + def __str__(self): + return 'Region sum Startdate: {0.startdate} Member: {0.member} Chunk: {0.chunk} Variable: {0.variable} ' \ + 'Grid point: {0.grid_point} Box: {0.box} Save 3D: {0.save3d}' \ + 'Original grid: {0.grid} Basin: {0.basin}'.format(self) + + @classmethod + def generate_jobs(cls, diags, options): + """ + Creates a job for each chunk to compute the diagnostic + + :param diags: Diagnostics manager class + :type diags: Diags + :param options: variable, minimum depth (level), maximum depth (level) + :type options: list[str] + :return: + """ + options_available = (DiagnosticDomainOption(), + DiagnosticVariableOption(diags.data_manager.config.var_manager), + DiagnosticOption('grid_point', 'T'), + DiagnosticBasinOption('basin', Basins().Global), + DiagnosticIntOption('min_depth', 0), + DiagnosticIntOption('max_depth', 0), + DiagnosticBoolOption('save3D', True), + DiagnosticOption('grid', '')) + options = cls.process_options(options, options_available) + + box = Box() + box.min_depth = options['min_depth'] + box.max_depth = options['max_depth'] + + job_list = list() + for startdate, member, chunk in diags.config.experiment.get_chunk_list(): + job_list.append(RegionSum(diags.data_manager, startdate, member, chunk, + options['domain'], options['variable'], options['grid_point'], box, + options['save3D'], options['basin'], options['variance'], options['grid'])) + return job_list + + def request_data(self): + self.variable_file = self.request_chunk(self.domain, self.variable, self.startdate, self.member, self.chunk, + grid=self.grid) + + def declare_data_generated(self): + if self.box.min_depth == 0: + # To cdftools, this means all levels + box_save = None + else: + box_save = self.box + + self.declare_var('sum', False, box_save) + self.declare_var('sum', True, box_save) + + def compute(self): + """ + Runs the diagnostic + """ + mean_file = TempFile.get() + + variable_file = self.variable_file.local_file + + handler = Utils.openCdf(variable_file) + self.save3d &= 'lev' in handler.dimensions + handler.close() + + cdfmean_options = [self.variable, self.grid_point, 0, 0, 0, 0, self.box.min_depth, self.box.max_depth] + if self.variance: + cdfmean_options += ['-var'] + if self.basin != Basins().Global: + cdfmean_options.append('-M') + cdfmean_options.append('mask_regions.3d.nc') + cdfmean_options.append(self.basin.name) + + cdftools.run('cdfsum', input=variable_file, output=mean_file, options=cdfmean_options) + Utils.rename_variables(mean_file, {'gdept': 'lev', 'gdepw': 'lev'}, must_exist=False, rename_dimension=True) + + self.send_var('mean', False, mean_file) + self.send_var('mean', True, mean_file) + + os.remove(mean_file) + + def send_var(self, var, threed, mean_file): + if threed: + if not self.save3d: + return False + original_name = '{0}_{1}'.format(var, self.variable) + final_name = '{1}3d{0}'.format(var, self.variable) + levels = ',lev' + else: + original_name = '{0}_3D{1}'.format(var, self.variable) + final_name = '{1}{0}'.format(var, self.variable) + levels = '' + + temp2 = TempFile.get() + Utils.nco.ncks(input=mean_file, output=temp2, options=('-v {0},lat,lon{1}'.format(original_name, levels),)) + handler = Utils.openCdf(temp2) + var_handler = handler.variables[original_name] + if hasattr(var_handler, 'valid_min'): + del var_handler.valid_min + if hasattr(var_handler, 'valid_max'): + del var_handler.valid_max + handler.close() + self.declared[final_name].set_local_file(temp2, diagnostic=self, rename_var=original_name, region=self.basin) + + def declare_var(self, var, threed, box_save): + if threed: + if not self.save3d: + return False + final_name = '{1}3d{0}'.format(var, self.variable) + else: + final_name = '{1}{0}'.format(var, self.variable) + + self.declared[final_name] = self.declare_chunk(ModelingRealms.ocean, final_name, self.startdate, self.member, + self.chunk, box=box_save, region=self.basin, grid=self.grid) + diff --git a/earthdiagnostics/ocean/siasiesiv.py b/earthdiagnostics/ocean/siasiesiv.py index d16295a458be9d6b8dec18a8a5a71720f07ba745..bd2c41c8f3c08d9c895edfd23f72b172be7340b1 100644 --- a/earthdiagnostics/ocean/siasiesiv.py +++ b/earthdiagnostics/ocean/siasiesiv.py @@ -1,17 +1,16 @@ # coding=utf-8 -import netCDF4 import os -from bscearth.utils.log import Log - -from earthdiagnostics.diagnostic import Diagnostic, DiagnosticBasinOption -from earthdiagnostics.utils import Utils, TempFile # noinspection PyUnresolvedReferences -import earthdiagnostics.cdftoolspython as cdftoolspython + +import netCDF4 import numpy as np +from bscearth.utils.log import Log -from earthdiagnostics.modelingrealm import ModelingRealms from earthdiagnostics.constants import Basins +from earthdiagnostics.diagnostic import Diagnostic, DiagnosticBasinOption +from earthdiagnostics.modelingrealm import ModelingRealms +from earthdiagnostics.utils import Utils, TempFile class Siasiesiv(Diagnostic): @@ -35,7 +34,7 @@ class Siasiesiv(Diagnostic): e2t = None gphit = None - def __init__(self, data_manager, startdate, member, chunk, basin, mask): + def __init__(self, data_manager, startdate, member, chunk, basin, mask, var_manager): """ :param data_manager: data management object :type data_manager: DataManager @@ -55,6 +54,9 @@ class Siasiesiv(Diagnostic): self.chunk = chunk self.mask = mask self.generated = {} + self.var_manager = var_manager + self.sic_varname = self.var_manager.get_variable('sic').short_name + self.sit_varname = self.var_manager.get_variable('sit').short_name def __str__(self): return 'Siasiesiv Startdate: {0} Member: {1} Chunk: {2} Basin: {3}'.format(self.startdate, self.member, @@ -82,7 +84,8 @@ class Siasiesiv(Diagnostic): job_list = list() for startdate, member, chunk in diags.config.experiment.get_chunk_list(): - job_list.append(Siasiesiv(diags.data_manager, startdate, member, chunk, options['basin'], mask)) + job_list.append(Siasiesiv(diags.data_manager, startdate, member, chunk, options['basin'], mask, + diags.config.var_manager)) mesh_handler = Utils.openCdf('mesh_hgr.nc') Siasiesiv.e1t = np.asfortranarray(mesh_handler.variables['e1t'][0, :]) Siasiesiv.e2t = np.asfortranarray(mesh_handler.variables['e2t'][0, :]) @@ -92,8 +95,10 @@ class Siasiesiv(Diagnostic): return job_list def request_data(self): - self.sit = self.request_chunk(ModelingRealms.seaIce, 'sit', self.startdate, self.member, self.chunk) - self.sic = self.request_chunk(ModelingRealms.seaIce, 'sic', self.startdate, self.member, self.chunk) + self.sit = self.request_chunk(ModelingRealms.seaIce, self.sit_varname, + self.startdate, self.member, self.chunk) + self.sic = self.request_chunk(ModelingRealms.seaIce, self.sic_varname, + self.startdate, self.member, self.chunk) def declare_data_generated(self): self._declare_var('sivols') @@ -106,20 +111,21 @@ class Siasiesiv(Diagnostic): def _declare_var(self, var_name): self.generated[var_name] = self.declare_chunk(ModelingRealms.seaIce, var_name, self.startdate, self.member, - self.chunk, region=self.basin.fullname) + self.chunk, region=self.basin.name) def compute(self): """ Runs the diagnostic """ + import earthdiagnostics.cdftoolspython as cdftoolspython sit_handler = Utils.openCdf(self.sit.local_file) - sit = np.asfortranarray(sit_handler.variables['sit'][:]) + sit = np.asfortranarray(sit_handler.variables[self.sit_varname][:]) timesteps = sit_handler.dimensions['time'].size sit_handler.close() sic_handler = Utils.openCdf(self.sic.local_file) - Utils.convert_units(sic_handler.variables['sic'], '1.0') - sic = np.asfortranarray(sic_handler.variables['sic'][:]) + Utils.convert_units(sic_handler.variables[self.sic_varname], '1.0') + sic = np.asfortranarray(sic_handler.variables[self.sic_varname][:]) sic_handler.close() result = np.empty((8, timesteps)) diff --git a/earthdiagnostics/statistics/discretize.py b/earthdiagnostics/statistics/discretize.py index 81ed8232b578f0e4a682b0c3972c1a1fb50fc485..c997e5ec73f3697f320deed2ee853210ef66876b 100644 --- a/earthdiagnostics/statistics/discretize.py +++ b/earthdiagnostics/statistics/discretize.py @@ -1,11 +1,11 @@ # coding=utf-8 import math +import cf_units import iris import iris.coord_categorisation import iris.coords import iris.exceptions -import iris.unit import numpy as np import psutil import six @@ -199,7 +199,7 @@ class Discretize(Diagnostic): var_name='leadtime', units='months')) lead_date = add_months(date, leadtime - 1, self.data_manager.config.experiment.calendar) - leadtime_cube.add_aux_coord(iris.coords.AuxCoord(iris.unit.date2num(lead_date, + leadtime_cube.add_aux_coord(iris.coords.AuxCoord(cf_units.date2num(lead_date, unit='days since 1950-01-01', calendar="standard"), var_name='time', diff --git a/earthdiagnostics/utils.py b/earthdiagnostics/utils.py index 42304aa02ed840edb038165a8b7a4fc8757d0990..7c70b56e4d0e69a69d63a0a185e33d399db572be 100644 --- a/earthdiagnostics/utils.py +++ b/earthdiagnostics/utils.py @@ -150,12 +150,12 @@ class Utils(object): try: Utils._rename_vars_directly(dic_names, filepath, handler, must_exist, rename_dimension) - except RuntimeError: + except RuntimeError as ex: + Log.debug('Renaming error: {0}', ex) error = True - handler.close() - if not Utils.check_netcdf_file(temp): + if not error and not Utils.check_netcdf_file(temp): error = True if error: @@ -169,9 +169,27 @@ class Utils(object): def check_netcdf_file(filepath): with suppress_stdout(): try: + handler = Utils.openCdf(filepath) + if 'time' in handler.variables: + if handler.variables['time'].dimensions != ('time', ): + handler.close() + return False + handler.close() + iris.FUTURE.netcdf_promote = True - iris.load(filepath) - except iris.exceptions.IrisError: + cubes = iris.load(filepath) + if len(cubes) == 0: + return False + except iris.exceptions.IrisError as ex: + Log.debug('netCDF checks failed: {0}', ex) + return False + except RuntimeError as ex: + # HDF error, usually + Log.debug('netCDF checks failed: {0}', ex) + return False + except Exception as ex: + # HDF error, usually + Log.debug('netCDF checks failed: {0}', ex) return False return True @@ -226,7 +244,7 @@ class Utils(object): handler.sync() @staticmethod - def copy_file(source, destiny, save_hash=False): + def copy_file(source, destiny, save_hash=False, use_stored_hash=True): """ Copies a file from source to destiny, creating dirs if necessary @@ -248,7 +266,7 @@ class Utils(object): raise ex hash_destiny = None Log.debug('Hashing original file... {0}', datetime.datetime.now()) - hash_original = Utils.get_file_hash(source, use_stored=True) + hash_original = Utils.get_file_hash(source, use_stored=use_stored_hash) retrials = 3 while hash_original != hash_destiny: @@ -259,7 +277,7 @@ class Utils(object): Log.debug('Hashing copy ... {0}', datetime.datetime.now()) hash_destiny = Utils.get_file_hash(destiny, save=save_hash) retrials -= 1 - Log.info('Finished {0}', datetime.datetime.now()) + Log.debug('Finished {0}', datetime.datetime.now()) @staticmethod def move_file(source, destiny, save_hash=False): @@ -372,8 +390,7 @@ class Utils(object): Log.log.log(log_level, line) output.append(line) if process.returncode != 0: - raise Utils.ExecutionError('Error executing {0}\n Return code: {1}'.format(' '.join(command), - process.returncode)) + raise Utils.ExecutionError('Error executing {0}\n Return code: {1}'.format(' '.join(command), process.returncode)) return output _cpu_count = None @@ -642,11 +659,15 @@ class Utils(object): os.chmod(path, st.st_mode | stat.S_IWGRP) @staticmethod - def convert_units(var_handler, new_units): + def convert_units(var_handler, new_units, calendar=None, old_calendar=None): if new_units == var_handler.units: return - new_unit = Units(new_units) - old_unit = Units(var_handler.units) + + if hasattr(var_handler, 'calendar'): + old_calendar = var_handler.calendar + + new_unit = Units(new_units, calendar=calendar) + old_unit = Units(var_handler.units, calendar=old_calendar) var_handler[:] = Units.conform(var_handler[:], old_unit, new_unit, inplace=True) if 'valid_min' in var_handler.ncattrs(): var_handler.valid_min = Units.conform(float(var_handler.valid_min), old_unit, new_unit, diff --git a/earthdiagnostics/variable.py b/earthdiagnostics/variable.py index 9a623469694438ea5bf71afd49c703fd868cb448..588d18e24c9338f925fd481de9797f468daaef9c 100644 --- a/earthdiagnostics/variable.py +++ b/earthdiagnostics/variable.py @@ -10,7 +10,6 @@ from bscearth.utils.log import Log from earthdiagnostics.constants import Basins from earthdiagnostics.frequency import Frequency from earthdiagnostics.modelingrealm import ModelingRealms -from singleton import SingletonType class VariableJsonException(Exception): @@ -22,6 +21,7 @@ class VariableManager(object): self._cmor_tables_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'cmor_tables') self._aliases_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'variable_alias') self._dict_variables = {} + self.tables = {} def get_variable(self, original_name, silent=False): """ @@ -132,9 +132,9 @@ class VariableManager(object): continue if 'variable_entry' in data: Log.debug('Parsing file {0}'.format(file_name)) - table = CMORTable(data['Header']['table_id'][6:], - Frequency(data['Header']['frequency']), - data['Header']['table_date']) + table_id = data['Header']['table_id'][6:] + table = CMORTable(table_id, Frequency(data['Header']['frequency']), data['Header']['table_date']) + self.tables[table_id] = table self._load_json_variables(data['variable_entry'], table) def _load_json_variables(self, json_data, table): @@ -247,6 +247,7 @@ class VariableManager(object): continue table_frequency, table_date = table_data[sheet.title] table = CMORTable(sheet.title, table_frequency, table_date) + self.tables[sheet.title] = table for row in sheet.rows: if row[0].value == 'Priority' or not row[5].value: continue diff --git a/earthdiagnostics/work_manager.py b/earthdiagnostics/work_manager.py index ef3a62ea119458e629c15ecc752d412f837be9f0..33576f7c67a6eb48d9115bf095b1ef124f87ee40 100644 --- a/earthdiagnostics/work_manager.py +++ b/earthdiagnostics/work_manager.py @@ -1,8 +1,10 @@ # coding=utf-8 import datetime import operator +import sys import threading import time +import traceback from bscearth.utils.log import Log # noinspection PyCompatibility @@ -37,6 +39,8 @@ class WorkManager(object): try: for job in diag_class.generate_jobs(self, diag_options): list_jobs.append(job) + for subjob in job.subjobs: + list_jobs.append(subjob) continue except DiagnosticOptionError as ex: Log.error('Can not configure diagnostic {0}: {1}', diag_options[0], ex) @@ -47,8 +51,12 @@ class WorkManager(object): self.jobs = list_jobs def run(self): - time = datetime.datetime.now() - Log.info("Starting to compute at {0}", time) + if len(self.jobs) == 0: + Log.result('No diagnostics to run') + return True + + start_time = datetime.datetime.now() + Log.info("Starting to compute at {0}", start_time) self.threads = Utils.available_cpu_count() if 0 < self.config.max_cores < self.threads: self.threads = self.config.max_cores @@ -62,10 +70,14 @@ class WorkManager(object): job.request_data() job.declare_data_generated() job.subscribe(self, self._job_status_changed) - for subjob in job.subjobs: - subjob.subscribe(self, self._job_status_changed) job.check_is_ready() + if self.config.skip_diags_done: + for job in self.jobs: + if job.can_skip_run(): + Log.info('Diagnostic {0} already done. Skipping !', job) + job.status = DiagnosticStatus.COMPLETED + for file_object in self.data_manager.requested_files.values(): file_object.subscribe(self, self._file_object_status_changed) if file_object.download_required(): @@ -75,7 +87,7 @@ class WorkManager(object): self.lock = threading.Lock() self.lock.acquire() - self.check_completion() + # self.check_completion() self.lock.acquire() self.downloader.shutdown() @@ -85,7 +97,7 @@ class WorkManager(object): TempFile.clean() finish_time = datetime.datetime.now() Log.result("Diagnostics finished at {0}", finish_time) - Log.result("Elapsed time: {0}\n", finish_time - time) + Log.result("Elapsed time: {0}\n", finish_time - start_time) self.print_errors() self.print_stats() return not self.had_errors @@ -93,18 +105,40 @@ class WorkManager(object): def _job_status_changed(self, job): if job.status == DiagnosticStatus.READY: self.executor.submit(self._run_job, job) + for request in job._requests: + if request.only_suscriber(self): + del self.data_manager.requested_files[request.remote_file] + request.unsubscribe(self) + request.clean_local() self.check_completion() def _file_object_status_changed(self, file_object): + Log.debug('Checking file {0}. Local status {0.local_status} Storage status{0.storage_status}', file_object) if file_object.download_required(): self.downloader.submit(file_object) return if file_object.upload_required(): + file_object.storage_status = StorageStatus.UPLOADING self.uploader.submit(file_object.upload) return + if file_object.local_status != LocalStatus.COMPUTING and \ + file_object.storage_status != StorageStatus.UPLOADING and \ + file_object.only_suscriber(self): + del self.data_manager.requested_files[file_object.remote_file] + file_object.unsubscribe(self) + file_object.clean_local() self.check_completion() def check_completion(self): + counter = 0 + for job in self.jobs: + if job.status == DiagnosticStatus.READY: + counter += 1 + if counter > 20: + break + + self.downloader.on_hold = counter > 20 + for job in self.jobs: if job.status in (DiagnosticStatus.READY, DiagnosticStatus.RUNNING): return False @@ -115,14 +149,17 @@ class WorkManager(object): for request in self.data_manager.requested_files.values(): if request.storage_status == StorageStatus.UPLOADING: - return + return False if request.local_status == LocalStatus.DOWNLOADING: - return + return False if request.upload_required(): - return + return False if request.download_required(): - return - self.lock.release() + return False + try: + self.lock.release() + except Exception: + pass return True def print_stats(self): @@ -161,8 +198,9 @@ class WorkManager(object): job.compute() except Exception as ex: job.consumed_time = datetime.datetime.now() - time - job.message = str(ex) - Log.error('Job {0} failed: {1}', job, ex) + exc_type, exc_value, exc_traceback = sys.exc_info() + job.message = '{0}\n{1}'.format(ex, ''.join(traceback.format_tb(exc_traceback))) + Log.error('Job {0} failed: {1}', job, job.message ) job.status = DiagnosticStatus.FAILED return False @@ -228,8 +266,8 @@ class Downloader(object): def __init__(self): self._downloads = [] self._lock = threading.Lock() - self._wait = threading.Semaphore() self.stop = False + self.on_hold = False def start(self): self._thread = threading.Thread(target=self.downloader) @@ -260,6 +298,13 @@ class Downloader(object): if suscribers: return -suscribers + if datafile1.size is None: + if datafile2.size is None: + return 0 + else: + return -1 + elif datafile2.size is None: + return 1 size = datafile1.size - datafile2.size if size: return -size @@ -267,7 +312,7 @@ class Downloader(object): while True: with self._lock: - if len(self._downloads) == 0: + if len(self._downloads) == 0 or self.on_hold: if self.stop: return time.sleep(0.01) @@ -277,7 +322,7 @@ class Downloader(object): self._downloads.remove(datafile) datafile.download() except Exception as ex: - Log.critical('Unhandled error at downloader: {0}', ex) + Log.critical('Unhandled error at downloader: {0}\n{1}', ex, traceback.print_exc()) def shutdown(self): self.stop = True diff --git a/environment.yml b/environment.yml new file mode 100644 index 0000000000000000000000000000000000000000..4fa44d1373b1f8f66db6d573a40c57125b2909b1 --- /dev/null +++ b/environment.yml @@ -0,0 +1,31 @@ +--- + +name: earthdiagnostics +channels: +- conda-forge + +dependencies: +- iris +- netcdf4 +- numpy +- cdo +- nco +- python-cdo +- coverage +- pygrib +- psutil +- six +- cf_units +- openpyxl +- mock +- cmake +- cfunits +- coverage + +- pip: + - bscearth.utils + - futures + - nco + - exrex + - xxhash + - codacy-coverage diff --git a/setup.py b/setup.py index 411833062675cf4e5a4c926c2a544d0d5a599b32..a9ff71b6b038d66ffbab52b0947462bf16b146fd 100644 --- a/setup.py +++ b/setup.py @@ -5,6 +5,7 @@ Installation script for EarthDiagnostics package """ from os import path + from setuptools import find_packages from setuptools import setup @@ -26,7 +27,7 @@ setup( keywords=['climate', 'weather', 'diagnostic'], setup_requires=['pyproj'], install_requires=['numpy', 'netCDF4', 'bscearth.utils', 'cdo>=1.3.4', 'nco>=0.0.3', 'iris>=1.12.0', 'coverage', - 'pygrib', 'openpyxl', 'mock', 'futures', 'cf_units', 'cfunits', 'xxhash', 'six', 'psutil', + 'pygrib', 'openpyxl', 'mock', 'futures', 'cf_units', 'xxhash', 'six', 'psutil', 'exrex'], packages=find_packages(), include_package_data=True, diff --git a/test/run_test.py b/test/run_test.py index 59eec7e98a18694833f660a50431d2ea696de630..765057a5785dcf0d6f89b009fd12cc4b663584b8 100644 --- a/test/run_test.py +++ b/test/run_test.py @@ -3,16 +3,15 @@ Script to run the tests for EarthDiagnostics and generate the code coverage report """ + +import os +work_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) +os.chdir(work_path) +print(work_path) import coverage import unittest -import os -work_path = os.path.abspath('.') -source_path = os.path.join(work_path, '..', 'earthdiagnostics', '*') -print(source_path) -cov = coverage.Coverage(include=source_path) -cov.set_option("run:branch", True) -cov.set_option("html:title", 'Coverage report for EarthDiagnostics') +cov = coverage.Coverage() cov.start() suite = unittest.TestLoader().discover('.') unittest.TextTestRunner(verbosity=2).run(suite) diff --git a/test/unit/general/test_simplify_dimensions.py b/test/unit/general/test_simplify_dimensions.py index 429ad6f2344ed5d5dd4f67d38c89ed53b1ffd807..48ac6f7ccb4bc765fe429252f177acb13c09cfed 100644 --- a/test/unit/general/test_simplify_dimensions.py +++ b/test/unit/general/test_simplify_dimensions.py @@ -19,6 +19,7 @@ class TestSimplifyDimensions(TestCase): self.diags.config.experiment.get_chunk_list.return_value = (('20010101', 0, 0), ('20010101', 0, 1)) self.diags.config.experiment.startdates = ['20010101', ] self.diags.config.frequency = Frequencies.monthly + self.diags.config.data_convention = 'convention' self.box = Box() self.box.min_depth = 0 @@ -32,24 +33,24 @@ class TestSimplifyDimensions(TestCase): jobs = SimplifyDimensions.generate_jobs(self.diags, ['diagnostic', 'atmos', 'var']) self.assertEqual(len(jobs), 2) self.assertEqual(jobs[0], SimplifyDimensions(self.data_manager, '20010101', 0, 0, - ModelingRealms.atmos, 'var', '')) + ModelingRealms.atmos, 'var', '', 'convention')) self.assertEqual(jobs[1], SimplifyDimensions(self.data_manager, '20010101', 0, 1, - ModelingRealms.atmos, 'var', '')) + ModelingRealms.atmos, 'var', '', 'convention')) jobs = SimplifyDimensions.generate_jobs(self.diags, ['diagnostic', 'atmos', 'var', 'grid']) self.assertEqual(len(jobs), 2) self.assertEqual(jobs[0], SimplifyDimensions(self.data_manager, '20010101', 0, 0, - ModelingRealms.atmos, 'var', 'grid')) + ModelingRealms.atmos, 'var', 'grid', 'convention')) self.assertEqual(jobs[1], SimplifyDimensions(self.data_manager, '20010101', 0, 1, - ModelingRealms.atmos, 'var', 'grid')) + ModelingRealms.atmos, 'var', 'grid', 'convention')) with self.assertRaises(DiagnosticOptionError): SimplifyDimensions.generate_jobs(self.diags, ['diagnostic']) - with self.assertRaises(DiagnosticOptionError): - SimplifyDimensions.generate_jobs(self.diags, ['diagnostic', 'atmos', 'var', 'grid', 'extra']) + with self.assertRaises(DiagnosticOptionError): + SimplifyDimensions.generate_jobs(self.diags, ['diagnostic', 'atmos', 'var', 'grid', 'extra']) def test_str(self): - mixed = SimplifyDimensions(self.data_manager, '20010101', 0, 0, ModelingRealms.atmos, 'var', 'grid') + mixed = SimplifyDimensions(self.data_manager, '20010101', 0, 0, ModelingRealms.atmos, 'var', 'grid', 'convention') self.assertEquals(str(mixed), 'Simplify dimension Startdate: 20010101 Member: 0 Chunk: 0 Variable: atmos:var ' 'Grid: grid') diff --git a/test/unit/general/test_verticalmeanmetersiris.py b/test/unit/general/test_verticalmeanmetersiris.py index cd2876fe9f2a62b91ee28616a5dee862b8f79dbf..179b54fa20b70a802796fa26055418aa9f276411 100644 --- a/test/unit/general/test_verticalmeanmetersiris.py +++ b/test/unit/general/test_verticalmeanmetersiris.py @@ -1,7 +1,7 @@ # coding=utf-8 from unittest import TestCase -from earthdiagnostics.diagnostic import DiagnosticVariableOption, DiagnosticOptionError +from earthdiagnostics.diagnostic import DiagnosticVariableListOption, DiagnosticOptionError from earthdiagnostics.box import Box from earthdiagnostics.general.verticalmeanmetersiris import VerticalMeanMetersIris from earthdiagnostics.frequency import Frequencies @@ -27,14 +27,14 @@ class TestVerticalMeanMetersIris(TestCase): def fake_parse(self, value): if not value: raise DiagnosticOptionError - return value + return [value] - @patch.object(DiagnosticVariableOption, 'parse', fake_parse) + @patch.object(DiagnosticVariableListOption, 'parse', fake_parse) def test_generate_jobs(self): box = Box(True) - jobs = VerticalMeanMetersIris.generate_jobs(self.diags, ['diagnostic', 'var']) + jobs = VerticalMeanMetersIris.generate_jobs(self.diags, ['diagnostic', 'ocean', 'var']) self.assertEqual(len(jobs), 2) self.assertEqual(jobs[0], VerticalMeanMetersIris(self.data_manager, '20010101', 0, 0, ModelingRealms.ocean, 'var', box)) @@ -45,14 +45,14 @@ class TestVerticalMeanMetersIris(TestCase): box.min_depth = 0 box.max_depth = 100 - jobs = VerticalMeanMetersIris.generate_jobs(self.diags, ['diagnostic', 'var', '0', '100']) + jobs = VerticalMeanMetersIris.generate_jobs(self.diags, ['diagnostic', 'ocean', 'var', '0', '100']) self.assertEqual(len(jobs), 2) self.assertEqual(jobs[0], VerticalMeanMetersIris(self.data_manager, '20010101', 0, 0, ModelingRealms.ocean, 'var', box)) self.assertEqual(jobs[1], VerticalMeanMetersIris(self.data_manager, '20010101', 0, 1, ModelingRealms.ocean, 'var', box)) - jobs = VerticalMeanMetersIris.generate_jobs(self.diags, ['diagnostic', 'var', '0', '100', 'seaIce']) + jobs = VerticalMeanMetersIris.generate_jobs(self.diags, ['diagnostic', 'seaice', 'var', '0', '100']) self.assertEqual(len(jobs), 2) self.assertEqual(jobs[0], VerticalMeanMetersIris(self.data_manager, '20010101', 0, 0, ModelingRealms.seaIce, 'var', box)) @@ -63,7 +63,8 @@ class TestVerticalMeanMetersIris(TestCase): VerticalMeanMetersIris.generate_jobs(self.diags, ['diagnostic']) with self.assertRaises(DiagnosticOptionError): - VerticalMeanMetersIris.generate_jobs(self.diags, ['diagnostic', 'var', '0', '100', 'seaIce', 'extra']) + VerticalMeanMetersIris.generate_jobs(self.diags, ['diagnostic', 'ocean', 'var', '0', '100', 'seaIce', + 'extra']) def test_str(self): box = Box(True) diff --git a/test/unit/ocean/test_heatcontent.py b/test/unit/ocean/test_heatcontent.py index c58d1217a9338e0dd422a21a3d3d3fb6926ef124..f842bc13c2dcedcdc0dc586216da4aabc7b9529f 100644 --- a/test/unit/ocean/test_heatcontent.py +++ b/test/unit/ocean/test_heatcontent.py @@ -1,10 +1,11 @@ # coding=utf-8 from unittest import TestCase +from mock import Mock, patch + from earthdiagnostics.box import Box from earthdiagnostics.constants import Basins from earthdiagnostics.ocean.heatcontent import HeatContent -from mock import Mock, patch # noinspection PyUnusedLocal @@ -21,7 +22,7 @@ class TestHeatContent(TestCase): self.diags.model_version = 'model_version' self.diags.config.experiment.get_chunk_list.return_value = (('20010101', 0, 0), ('20010101', 0, 1)) - self.box = Box(True) + self.box = Box() self.box.min_depth = 0 self.box.max_depth = 100 @@ -43,5 +44,5 @@ class TestHeatContent(TestCase): def test_str(self): diag = HeatContent(self.data_manager, '20010101', 0, 0, Basins().Global, -1, self.box, 1, 20) - self.assertEquals(str(diag), 'Heat content Startdate: 20010101 Member: 0 Chunk: 0 Mixed layer: -1 Box: 0-100m ' + self.assertEquals(str(diag), 'Heat content Startdate: 20010101 Member: 0 Chunk: 0 Mixed layer: -1 Box: 0-100 ' 'Basin: Global') diff --git a/test/unit/ocean/test_interpolatecdo.py b/test/unit/ocean/test_interpolatecdo.py index 04b085521407b34572a6edc95f238906de21b8b0..ba238f28edfa7feabcaf8f887c6e881b2f0d86a7 100644 --- a/test/unit/ocean/test_interpolatecdo.py +++ b/test/unit/ocean/test_interpolatecdo.py @@ -24,9 +24,11 @@ class TestInterpolate(TestCase): raise DiagnosticOptionError return value.split('-') - @patch('earthdiagnostics.ocean.interpolatecdo.InterpolateCDO._compute_weights') + @patch('earthdiagnostics.ocean.interpolatecdo.InterpolateCDO.compute_weights') + @patch('earthdiagnostics.ocean.interpolatecdo.InterpolateCDO.get_sample_grid_file') @patch.object(DiagnosticVariableListOption, 'parse', fake_parse) - def test_generate_jobs(self, mock_weights): + @patch('os.remove') + def test_generate_jobs(self, mock_weights, mock_grid_file, mock_remove): mock_weights.return_value = None jobs = InterpolateCDO.generate_jobs(self.diags, ['interpcdo', 'ocean', 'var']) diff --git a/test/unit/ocean/test_maxmoc.py b/test/unit/ocean/test_maxmoc.py index f6f12b3468a050c5ad118ff86664b664ceae29c9..a1141d01c047a4f00fa2a16f27a67b5327e70bf5 100644 --- a/test/unit/ocean/test_maxmoc.py +++ b/test/unit/ocean/test_maxmoc.py @@ -1,10 +1,11 @@ # coding=utf-8 from unittest import TestCase +from mock import Mock, patch + from earthdiagnostics.box import Box from earthdiagnostics.constants import Basins, Basin from earthdiagnostics.ocean.maxmoc import MaxMoc -from mock import Mock, patch class TestMaxMoc(TestCase): @@ -63,4 +64,4 @@ class TestMaxMoc(TestCase): def test_str(self): self.assertEquals(str(self.maxmoc), 'Max moc Startdate: 20000101 Member: 1 Year: 2000 ' - 'Box: 0.0N0.0m Basin: Global') + 'Box: 0.0N0m Basin: Global') diff --git a/test/unit/ocean/test_region_mean.py b/test/unit/ocean/test_region_mean.py index e527feda328b3103615680a4387d155ef29d3f63..efeeb7d70b8da45b3baaceeff3e79a9bb0193305 100644 --- a/test/unit/ocean/test_region_mean.py +++ b/test/unit/ocean/test_region_mean.py @@ -1,11 +1,13 @@ # coding=utf-8 from unittest import TestCase -from earthdiagnostics.ocean.regionmean import RegionMean -from earthdiagnostics.modelingrealm import ModelingRealms -from earthdiagnostics.constants import Basins + +from mock import Mock, patch + from earthdiagnostics.box import Box +from earthdiagnostics.constants import Basins from earthdiagnostics.diagnostic import DiagnosticOptionError, DiagnosticVariableOption -from mock import Mock, patch +from earthdiagnostics.modelingrealm import ModelingRealms +from earthdiagnostics.ocean.regionmean import RegionMean class TestRegionMean(TestCase): @@ -49,8 +51,8 @@ class TestRegionMean(TestCase): box, True, Basins().Global, False, '')) box = Box() - box.min_depth = 1 - box.max_depth = 10 + box.min_depth = 1.0 + box.max_depth = 10.0 jobs = RegionMean.generate_jobs(self.diags, ['diagnostic', 'ocean', 'var', 'U', 'global', '1', '10']) self.assertEqual(len(jobs), 2) @@ -97,4 +99,4 @@ class TestRegionMean(TestCase): diag = RegionMean(self.data_manager, '20010101', 0, 0, ModelingRealms.ocean, 'var', 'U', box, False, Basins().Global, True, 'grid') self.assertEquals(str(diag), 'Region mean Startdate: 20010101 Member: 0 Chunk: 0 Variable: var Grid point: U ' - 'Box: 1-10 Save 3D: False Save variance: True Original grid: grid') + 'Box: 1-10 Save 3D: False Save variance: True Original grid: grid Basin: Global') diff --git a/test/unit/ocean/test_siasiesiv.py b/test/unit/ocean/test_siasiesiv.py index 00f7c68ca53e22c4ca2f10d90b1d4d89e4176236..2457b72d2308371134564cb3be5211ac6d5f711a 100644 --- a/test/unit/ocean/test_siasiesiv.py +++ b/test/unit/ocean/test_siasiesiv.py @@ -15,7 +15,8 @@ class TestSiasiesiv(TestCase): self.diags.config.experiment.get_chunk_list.return_value = (('20010101', 0, 0), ('20010101', 0, 1)) self.mask = Mock() - self.psi = Siasiesiv(self.data_manager, '20000101', 1, 1, Basins().Global, self.mask) + self.var_manager = Mock() + self.psi = Siasiesiv(self.data_manager, '20000101', 1, 1, Basins().Global, self.mask, self.var_manager) def test_str(self): self.assertEquals(str(self.psi), 'Siasiesiv Startdate: 20000101 Member: 1 Chunk: 1 Basin: Global') diff --git a/test/unit/statistics/test_daysoverpercentile.py b/test/unit/statistics/test_daysoverpercentile.py index cc225cd64a589cb1f453e2582777841cc6b2c2ff..26e0506a0c23654d26db584d175be5fe67afd4ab 100644 --- a/test/unit/statistics/test_daysoverpercentile.py +++ b/test/unit/statistics/test_daysoverpercentile.py @@ -13,14 +13,15 @@ class TestDaysOverPercentile(TestCase): self.data_manager = Mock() self.diags = Mock() self.diags.config.experiment.get_chunk_list.return_value = (('20011101', 0, 0), ('20011101', 0, 1)) + self.diags.config.experiment.startdates = ('20001101', '20011101') def test_generate_jobs(self): jobs = DaysOverPercentile.generate_jobs(self.diags, ['monpercent', 'ocean', 'var', '2000', '2001', '11']) self.assertEqual(len(jobs), 2) self.assertEqual(jobs[0], DaysOverPercentile(self.data_manager, ModelingRealms.ocean, 'var', 2000, 2001, - 2000, 11)) + '20001101', 11)) self.assertEqual(jobs[1], DaysOverPercentile(self.data_manager, ModelingRealms.ocean, 'var', 2000, 2001, - 2001, 11)) + '20011101', 11)) with self.assertRaises(Exception): DaysOverPercentile.generate_jobs(self.diags, ['monpercent', 'ocean', 'var', '2000', '2001']) diff --git a/test/unit/test_config.py b/test/unit/test_config.py index 419be475362582c7df34169f4a640748ad298e9e..a1c8d25b8dfd0e6769e60f5c624cf75983efadfb 100644 --- a/test/unit/test_config.py +++ b/test/unit/test_config.py @@ -273,6 +273,7 @@ class TestReportConfig(TestCase): self.assertEquals(config.path, 'new_path') def test_priority(self): + self.mock_parser.add_value('REPORT', 'MAXIMUM_PRIORITY', 3) config = ReportConfig(self.mock_parser) self.assertEquals(config.maximum_priority, 3) @@ -294,12 +295,6 @@ class TestExperimentConfig(TestCase): self.assertEquals(config.atmos_timestep, 6) self.assertEquals(config.ocean_timestep, 6) - def test_cmor_version_required(self): - self.mock_parser.add_value('CMOR', 'VERSION', '20001101') - self.mock_parser.add_value('EXPERIMENT', 'DATA_CONVENTION', 'Primavera') - config = ExperimentConfig(self.mock_parser) - self.assertEquals(config.path, 'new_path') - def test_startdates(self): self.mock_parser.add_value('EXPERIMENT', 'STARTDATES', '20001101 20011101') config = ExperimentConfig(self.mock_parser) diff --git a/test/unit/test_diagnostic.py b/test/unit/test_diagnostic.py index ae9921fa920affc0833b87dffbdf5d5899c6b806..e3cdcd83687e17c9aeed62be527c9f0d877d34af 100644 --- a/test/unit/test_diagnostic.py +++ b/test/unit/test_diagnostic.py @@ -271,7 +271,7 @@ class TestDiagnosticVariableListOption(TestCase): var_manager_mock = Mock() var_manager_mock.get_variable.side_effect = (self.get_var_mock('var1'), self.get_var_mock('var2')) diag = DiagnosticVariableListOption(var_manager_mock, 'variables') - self.assertEqual(['var1', 'var2'], diag.parse('var1-var2')) + self.assertEqual(['var1', 'var2'], diag.parse('var1:var2')) def test_parse_one(self): var_manager_mock = Mock()