diff --git a/diags.conf b/diags.conf index 1b3cae917fed1291ea99893d84efe08996a2001f..0c17c47d198c03bedc86f45cca3386b6cc4c4e22 100644 --- a/diags.conf +++ b/diags.conf @@ -4,14 +4,16 @@ DATA_ADAPTOR = THREDDS # Path to the folder where you want to create the temporary files SCRATCH_DIR = /scratch/Earth/$USER # Root path for the cmorized data to use -DATA_DIR = /esnas/exp/:/esarchive/exp/ +DATA_DIR = /esnas:/esarchive +# Specify if your data is from an experiment (exp), observation (obs) or a reconstruction (recon) +DATA_TYPE = exp # Path to NEMO's mask and grid files needed for CDFTools CON_FILES = /esnas/autosubmit/con_files/ # Diagnostics to run, space separated. You must provide for each one the name and the parameters (comma separated) or # an alias defined in the ALIAS section (see more below). If you are using the diagnpostics just to CMORize, leave it # empty -DIAGS = monpercent,atmos,sfcWind,90 monpercent,atmos,sfcWind,10 +DIAGS = climpercent,atmos,sfcWind,1 # DIAGS = OHC # Frequency of the data you want to use by default. Some diagnostics do not use this value: i.e. monmean always stores # its results at monthly frequency (obvious) and has a parameter to specify input's frequency. @@ -59,11 +61,11 @@ ATMOS_MONTHLY_VARS = 167, 201, 202, 165, 166, 151, 144, 228, 205, 182, 164, 146, # SOURCE = 'EC-Earthv2.3.0, ocean: Nemo3.1, ifs31r1, lim2 [THREDDS] -SERVER_URL = http://earth.bsc.es/thredds +SERVER_URL = https://earth.bsc.es/thredds [EXPERIMENT] # Experiments parameters as defined in CMOR standard -INSTITUTE = meteofrance +INSTITUTE = ecmwf MODEL = system4_m1 # Model version: Available versions MODEL_VERSION =Ec2.3_O1L46 @@ -81,7 +83,7 @@ OCEAN_TIMESTEP = 6 # CHUNK_SIZE is the size of each data file, given in months # CHUNKS is the number of chunks. 
You can specify less chunks than present on the experiment EXPID = resilience -STARTDATES = 19911101 +STARTDATES = 19810101 MEMBERS = 0 MEMBER_DIGITS = 1 CHUNK_SIZE = 7 diff --git a/earthdiagnostics/config.py b/earthdiagnostics/config.py index 152fbce49cf46b47aa927026ceaa8f80fc9530b9..ab206c12ab8d8953939ff066950c3d3ccf4c0d31 100644 --- a/earthdiagnostics/config.py +++ b/earthdiagnostics/config.py @@ -29,13 +29,17 @@ class Config(object): "Scratch folder path" self.data_dir = Utils.expand_path(parser.get_option('DIAGNOSTICS', 'DATA_DIR')) "Root data folder path" + self.data_type = Utils.expand_path(parser.get_option('DIAGNOSTICS', 'DATA_TYPE', 'exp')).lower() + "Data type (experiment, observation or reconstruction)" + if self.data_type not in ('exp', 'obs', 'recon'): + raise Exception('Data type must be exp, obs or recon') self.con_files = Utils.expand_path(parser.get_option('DIAGNOSTICS', 'CON_FILES')) "Mask and meshes folder path" self._diags = parser.get_option('DIAGNOSTICS', 'DIAGS') self.frequency = parser.get_option('DIAGNOSTICS', 'FREQUENCY').lower() + "Default data frequency to be used by the diagnostics" if self.frequency == 'month': self.frequency = 'mon' - "Default data frequency to be used by the diagnostics" self.cdftools_path = Utils.expand_path(parser.get_option('DIAGNOSTICS', 'CDFTOOLS_PATH')) "Path to CDFTOOLS executables" self.max_cores = parser.get_int_option('DIAGNOSTICS', 'MAX_CORES', 100000) diff --git a/earthdiagnostics/statistics/climatologicalpercentile.py b/earthdiagnostics/statistics/climatologicalpercentile.py index 1e52049cb47c2a7e897ddeb6caa4299e6e68b2eb..5df474194dbcfc818ce629d0930a25fe6f5b5bfc 100644 --- a/earthdiagnostics/statistics/climatologicalpercentile.py +++ b/earthdiagnostics/statistics/climatologicalpercentile.py @@ -22,7 +22,7 @@ class ClimatologicalPercentile(Diagnostic): alias = 'climpercent' "Diagnostic alias for the configuration file" - def __init__(self, data_manager, domain, variable, leadtimes, experiment_config): + 
def __init__(self, data_manager, domain, variable, leadtimes, num_bins, experiment_config): Diagnostic.__init__(self, data_manager) self.variable = variable self.domain = domain @@ -32,7 +32,7 @@ class ClimatologicalPercentile(Diagnostic): self.realizations = None self.lat_len = None self.lon_len = None - self.num_bins = 2000 + self.num_bins = num_bins self._bins = None self.percentiles = np.array([0.1, 0.25, 0.33, 0.5, 0.66, 0.75, 0.9]) self.cmor_var = Variable.get_variable(variable, silent=True) @@ -67,14 +67,18 @@ class ClimatologicalPercentile(Diagnostic): if num_options < 3: raise Exception('You must specify the variable (and its domain) and the leadtimes you want to compute ' 'the percentiles on') - if num_options > 3: - raise Exception('You must specify three parameters for the climatological percentiles') + if num_options > 4: + raise Exception('You must specify between three and four parameters for the climatological percentiles') domain = Domain(options[1]) variable = options[2] leadtimes = [int(i) for i in options[3].split('-')] + if num_options > 3: + num_bins = int(options[4]) + else: + num_bins = 2000 job_list = list() - job_list.append(ClimatologicalPercentile(diags.data_manager, domain, variable, leadtimes, + job_list.append(ClimatologicalPercentile(diags.data_manager, domain, variable, leadtimes, num_bins, diags.config.experiment)) return job_list @@ -98,16 +102,16 @@ class ClimatologicalPercentile(Diagnostic): percentile_var = handler.createVariable('percentile', float, ('percentile',)) percentile_var[:] = self.percentiles - handler.createDimension('lat', self.lat_len) + handler.createDimension('lat', self.lat.size) lat_var = handler.createVariable('lat', float, ('lat',)) lat_var[:] = self.lat - handler.createDimension('lon', self.lon_len) + handler.createDimension('lon', self.lon.size) lon_var = handler.createVariable('lon', float, ('lon',)) lon_var[:] = self.lon - p75_var = handler.createVariable('percent', float, ('percentile', 'lat', 'lon')) - 
p75_var[...] = percentile_values + percentile_var = handler.createVariable('percent', float, ('percentile', 'lat', 'lon')) + percentile_var[...] = percentile_values handler.close() @@ -117,14 +121,14 @@ class ClimatologicalPercentile(Diagnostic): def _calculate_percentiles(self, distribution): Log.debug('Calculating percentiles') - def calculate_percentiles(point_distribution): + def calculate(point_distribution): cs = np.cumsum(point_distribution) total = cs[-1] percentile_values = self.percentiles * total index = np.searchsorted(cs, percentile_values) return [(self._bins[i + 1] + self._bins[i]) / 2 for i in index] - distribution = np.apply_along_axis(calculate_percentiles, 0, distribution) + distribution = np.apply_along_axis(calculate, 0, distribution) return distribution def _get_distribution(self, member_files): @@ -189,18 +193,16 @@ class ClimatologicalPercentile(Diagnostic): return histogram var = handler.variables[self.variable] - return np.apply_along_axis(calculate_histogram, 0, var[:, realization, ...]) + if 'realization' in var.dimensions or 'ensemble' in var.dimensions: + return np.apply_along_axis(calculate_histogram, 0, var[:, realization, ...]) + else: + return np.apply_along_axis(calculate_histogram, 0, var[:]) def _get_var_size(self, handler): if self.lat_len is not None: return - self.lat = handler.variables['latitude'][:] self.lon = handler.variables['longitude'][:] - self.lat = handler.dimensions['latitude'].size - self.lon = handler.dimensions['longitude'].size - - diff --git a/earthdiagnostics/threddsmanager.py b/earthdiagnostics/threddsmanager.py index cb60f22be46055b93100ec97e4b679fc31313768..ae8acb004b6ecbad6c41e67e0b00d3e3a4629d59 100644 --- a/earthdiagnostics/threddsmanager.py +++ b/earthdiagnostics/threddsmanager.py @@ -1,10 +1,10 @@ # coding=utf-8 import os -from autosubmit.date.chunk_date_lib import parse_date, add_months +from autosubmit.date.chunk_date_lib import parse_date, add_months, chunk_start_date, chunk_end_date, date2str 
from earthdiagnostics.datamanager import DataManager, NetCDFFile from earthdiagnostics.utils import TempFile, Utils -import urllib +from datetime import datetime from earthdiagnostics.variable import Variable, VarType @@ -19,7 +19,7 @@ class THREDDSManager(DataManager): data_folders = self.config.data_dir.split(':') self.config.data_dir = None for data_folder in data_folders: - if os.path.isdir(os.path.join(data_folder, self.experiment.institute.lower(), + if os.path.isdir(os.path.join(data_folder, self.config.data_type, self.experiment.institute.lower(), self.experiment.model.lower())): self.config.data_dir = data_folder break @@ -27,16 +27,26 @@ class THREDDSManager(DataManager): if not self.config.data_dir: raise Exception('Can not find model data') + if self.config.data_type in ('obs', 'recon') and self.experiment.chunk_size !=1 : + raise Exception('For obs and recon data chunk_size must be always 1') + def get_leadtimes(self, domain, variable, startdate, member, leadtimes, frequency=None, vartype=VarType.MEAN): - if not frequency: - frequency = self.config.frequency - aggregation_path = self.get_var_url(variable, startdate, frequency, None, False, vartype) - temp = TempFile.get() + + aggregation_path = self.get_var_url(variable, startdate, frequency, None, vartype) startdate = parse_date(startdate) + start_chunk = chunk_start_date(startdate, self.experiment.num_chunks, self.experiment.chunk_size, + 'month', 'standard') + end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', 'standard') + + thredds_subset = THREDDSSubset(aggregation_path, variable, startdate, end_chunk).get_url() selected_months = ','.join([str(add_months(startdate, i, 'standard').month) for i in leadtimes]) - select_months = '-selmonth,{0} {1}'.format(selected_months, aggregation_path) - selected_years = ','.join([str(add_months(startdate, i, 'standard').year) for i in leadtimes]) - Utils.cdo.selyear(selected_years, input=select_months, output=temp) + temp = 
TempFile.get() + if self.config.data_type == 'exp': + select_months = '-selmonth,{0} {1}'.format(selected_months, thredds_subset) + selected_years = ','.join([str(add_months(startdate, i, 'standard').year) for i in leadtimes]) + Utils.cdo.selyear(selected_years, input=select_months, output=temp) + else: + Utils.cdo.selmonth(selected_months, input=thredds_subset, output=temp) return temp def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, @@ -63,14 +73,14 @@ class THREDDSManager(DataManager): :return: path to the copy created on the scratch folder :rtype: str """ - if not frequency: - frequency = self.config.frequency - aggregation_path = self.get_var_url(var, startdate, frequency, box, True, vartype) - temp = TempFile.get() - urllib.urlretrieve(aggregation_path, temp) - if not Utils.check_netcdf_file(temp): - raise THREDDSError('Can not retrieve {0} from server'.format(aggregation_path)) - return temp + aggregation_path = self.get_var_url(var, startdate, frequency, box, vartype) + + start_chunk = chunk_start_date(parse_date(startdate), chunk, self.experiment.chunk_size, 'month', 'standard') + end_chunk = chunk_end_date(start_chunk, self.experiment.chunk_size, 'month', 'standard') + + thredds_subset = THREDDSSubset(aggregation_path, var, start_chunk, end_chunk) + return thredds_subset.download() + def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None, rename_var=None, frequency=None, year=None, date_str=None, move_old=False, @@ -160,23 +170,26 @@ class THREDDSManager(DataManager): var = self._get_final_var_name(box, var) folder_path = self._get_folder_path(frequency, domain, var, grid, vartype) - if startdate: - file_name = '{0}_{1}.nc'.format(var, startdate) - else: - file_name = '{0}.nc'.format(var) + file_name = self._get_file_name(startdate, var) filepath = os.path.join(folder_path, file_name) return filepath def _get_folder_path(self, frequency, domain, variable, 
grid, vartype): - folder_path = os.path.join(self.config.data_dir, + + if self.config.data_type == 'exp': + var_folder = self.get_varfolder(domain, variable, grid) + else: + var_folder = variable + + folder_path = os.path.join(self.config.data_dir, self.config.data_type, self.experiment.institute.lower(), self.experiment.model.lower(), self.frequency_folder_name(frequency, vartype), - self.get_varfolder(domain, variable, grid)) + var_folder) return folder_path - def get_year(self, domain, var, startdate, member, year, grid=None, box=None): + def get_year(self, domain, var, startdate, member, year, grid=None, box=None, vartype=VarType.MEAN): """ Ge a file containing all the data for one year for one variable :param domain: variable's domain @@ -195,16 +208,30 @@ class THREDDSManager(DataManager): :type box: Box :return: """ + aggregation_path = self.get_var_url(var, startdate, None, box, vartype) + thredds_subset = THREDDSSubset(aggregation_path, var, datetime(year, 1, 1), datetime(year+1, 1, 1)) + return thredds_subset.download() - def get_var_url(self, var, startdate, frequency, box, fileserver, vartype): + + def get_var_url(self, var, startdate, frequency, box, vartype): + if not frequency: + frequency = self.config.frequency var = self._get_final_var_name(box, var) - if fileserver: - protocol = 'fileServer' + full_path = os.path.join(self.server_url, 'dodsC', self.config.data_type, self.experiment.institute, + self.experiment.model, self.frequency_folder_name(frequency, vartype)) + if self.config.data_type == 'exp': + full_path = os.path.join(full_path, var, self._get_file_name(startdate, var)) else: - protocol = 'dodsC' - return os.path.join(self.server_url, protocol, 'exp', self.experiment.institute, - self.experiment.model, self.frequency_folder_name(frequency, vartype), - var, '{0}_{1}.nc'.format(var, startdate)) + full_path = os.path.join(full_path, self._get_file_name(None, var)) + return full_path + + def _get_file_name(self, startdate, var): + if 
startdate: + if self.config.data_type != 'exp': + startdate = startdate[0:6] + return '{0}_{1}.nc'.format(var, startdate) + else: + return '{0}.nc'.format(var) def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None, frequency=None, year=None, date_str=None, move_old=False, vartype=VarType.MEAN): @@ -241,3 +268,77 @@ class THREDDSManager(DataManager): class THREDDSError(Exception): pass + +class THREDDSSubset: + def __init__(self, thredds_path, var, start_time, end_time): + self.thredds_path = thredds_path + self.var = var + self.dimension_indexes = {} + self.handler = None + self.start_time = start_time + self.end_time = end_time + + def get_url(self): + self.handler = Utils.openCdf(self.thredds_path) + self._read_metadata() + self.handler.close() + + self._get_time_indexes() + + return self._get_subset_url() + + def download(self): + url = self.get_url() + return self._download_url(url) + + def _read_metadata(self): + self.var_dimensions = self.handler.variables[self.var].dimensions + for dimension in self.var_dimensions: + if dimension == 'time': + continue + self.dimension_indexes[dimension] = (0, self.handler.dimensions[dimension].size - 1) + + if 'time' in self.var_dimensions: + self.times = Utils.get_datetime_from_netcdf(self.handler) + + def _get_time_indexes(self): + if 'time' not in self.var_dimensions: + return + + time_start = 0 + while time_start < self.times.size and self.times[time_start] < self.start_time: + time_start += 1 + if time_start == self.times.size: + raise Exception('Timesteps not available for interval {0}-{1}'.format(self.start_time, self.end_time)) + time_end = time_start + if self.times[time_end] >= self.end_time: + raise Exception('Timesteps not available for interval {0}-{1}'.format(self.start_time, self.end_time)) + while time_end < self.times.size - 1 and self.times[time_end + 1] < self.end_time: + time_end += 1 + self.dimension_indexes['time'] = (time_start, time_end) + + def _download_url(self, url): 
+ temp = TempFile.get() + Utils.execute_shell_command(['nccopy', url, temp]) + if not Utils.check_netcdf_file(temp): + raise THREDDSError('Can not retrieve {0} from server'.format(url)) + return temp + + def _get_subset_url(self): + var_slice = self.var + dimensions_slice = '' + + for dimension in self.var_dimensions: + slice_index = self._get_slice_index(self.dimension_indexes[dimension]) + var_slice += slice_index + if dimension == 'ensemble': + dimension = 'realization' + dimensions_slice += '{0}{1},'.format(dimension, slice_index) + + return '{0}?{1}{2}'.format(self.thredds_path, dimensions_slice, var_slice) + + def _get_slice_index(self, index_tuple): + return '[{0[0]}:1:{0[1]}]'.format(index_tuple) + + +