# coding=utf-8 """Cmorization classes""" import glob import os import shutil import uuid import traceback import pygrib from datetime import datetime import six from bscearth.utils.date import parse_date, chunk_end_date, previous_day, date2str, add_months from bscearth.utils.log import Log from cdo import CDOException import iris import iris.coord_categorisation import iris.analysis import iris.util import iris.exceptions import cf_units from earthdiagnostics.datafile import NetCDFFile from earthdiagnostics.frequency import Frequency, Frequencies from earthdiagnostics.modelingrealm import ModelingRealms from earthdiagnostics.utils import TempFile, Utils class Cmorizer(object): """ Class to manage CMORization Parameters ---------- data_manager: DataManager startdate: str member: int """ NON_DATA_VARIABLES = ('lon', 'lat', 'longitude', 'latitude', 'plev', 'time', 'time_bnds', 'leadtime', 'lev', 'lev_2', 'icethi', 'deptht', 'depthu', 'depthw', 'depthv', 'time_centered', 'time_centered_bounds', 'deptht_bounds', 'depthu_bounds', 'depthv_bounds', 'depthw_bounds', 'deptht_bnds', 'depthu_bnds', 'depthv_bnds', 'depthw_bnds', 'time_counter_bounds', 'ncatice', 'nav_lat_grid_V', 'nav_lat_grid_U', 'nav_lat_grid_T', 'nav_lon_grid_V', 'nav_lon_grid_U', 'nav_lon_grid_T', 'depth', 'depth_2', 'depth_3', 'depth_4', 'depth_bnds', 'depth_2_bnds', 'depth_3_bnds', 'depth_4_bnds', 'mlev', 'hyai', 'hybi', 'hyam', 'hybm') def __init__(self, data_manager, startdate, member): self.data_manager = data_manager self.startdate = startdate self.member = member self.config = data_manager.config self.experiment = self.config.experiment self.cmor = self.config.cmor self.member_str = self.experiment.get_member_str(member) self.original_files_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', self.startdate, self.member_str, 'outputs') self.atmos_timestep = None self.cmor_scratch = str(os.path.join(self.config.scratch_dir, 'CMOR', self.startdate, self.member_str)) if self.config.data_convention in ('primavera', 'cmip6'): self.lon_name = 'longitude' self.lat_name = 'latitude' else: self.lon_name = 'lon' self.lat_name = 'lat' self.alt_coord_names = {'time_counter': 'time', 'time_counter_bnds': 'time_bnds', 'time_counter_bounds': 'time_bnds', 'tbnds': 'bnds', 'nav_lat': self.lat_name, 'nav_lon': self.lon_name, 'x': 'i', 'y': 'j'} @property def path_icm(self): """Path to the ICM file""" return os.path.join(self.config.scratch_dir, 'ICM') def cmorize_ocean(self): """Cmorize ocean files from MMO files""" if not self.cmor.ocean: Log.info('Skipping ocean cmorization due to configuration') return Log.info('\nCMORizing ocean\n') self._cmorize_ocean_files('MMO', 'PPO', 'diags') def _cmorize_ocean_files(self, *args): tar_files = () for prefix in args: tar_folder = os.path.join(self.original_files_path, '{0}*'.format(prefix)) tar_files = glob.glob(tar_folder) tar_files.sort() if len(tar_files) > 0: break if not len(tar_files): Log.error('No {1} files found in {0}'.format(self.original_files_path, args)) count = 1 for tarfile in tar_files: if not self._cmorization_required(self._get_chunk(os.path.basename(tarfile)), (ModelingRealms.ocean, ModelingRealms.seaIce, ModelingRealms.ocnBgchem)): Log.info('No need to unpack file {0}/{1}'.format(count, len(tar_files))) count += 1 continue Log.info('Unpacking oceanic file {0}/{1}'.format(count, len(tar_files))) try: self._unpack_tar_file(tarfile) self._cmorize_nc_files() Log.result('Oceanic file {0}/{1} finished'.format(count, len(tar_files))) except Exception as ex: Log.error('Could not CMORize oceanic file {0}: {1}', count, ex) count += 1 if count > self.experiment.num_chunks: return def _filter_files(self, file_list): if not self.cmor.filter_files: return file_list filtered = list() filters = self.cmor.filter_files.split(' ') for file_path in file_list: filename = os.path.basename(file_path) if any(f in filename for f in filters): filtered.append(file_path) else: os.remove(file_path) if len(filtered) == 0: Log.warning('Filters {0} do not match any of the files', filters) return filtered def _cmorize_nc_files(self): nc_files = glob.glob(os.path.join(self.cmor_scratch, '*.nc')) for filename in self._filter_files(nc_files): self._cmorize_nc_file(filename) self._clean_cmor_scratch() def _correct_fluxes(self): fluxes_vars = [self.data_manager.variable_list.get_variable(cmor_var, True).short_name for cmor_var in ('prc', "prsn", "rss", "rls", "rsscs", "rsds", "rlds", "hfss", 'hfls')] change_sign_vars = [self.data_manager.variable_list.get_variable(cmor_var, True).short_name for cmor_var in ("hfss", 'hfls')] total_seconds = (self.experiment.atmos_timestep * 3600) for filename in glob.glob(os.path.join(self.cmor_scratch, '*.nc')): handler = Utils.open_cdf(filename) for varname in handler.variables.keys(): cmor_var = self.data_manager.variable_list.get_variable(varname, True) if cmor_var is None or cmor_var.short_name not in fluxes_vars: continue if cmor_var.short_name in change_sign_vars: sign = -1 else: sign = 1 var_handler = handler.variables[varname] var_handler[:] = sign * var_handler[:] / total_seconds var_handler.units = '{0} {1}'.format(var_handler.units, 's-1') handler.close() def _unpack_tar_file(self, tarfile): self._clean_cmor_scratch() os.makedirs(self.cmor_scratch) Utils.untar((tarfile,), self.cmor_scratch) if os.path.isdir(os.path.join(self.cmor_scratch, 'backup')): for filepath in glob.glob(os.path.join(self.cmor_scratch, 'backup', '*')): Log.debug('Moving file {0}', filepath) shutil.move(filepath, filepath.replace('/backup/', '/')) zip_files = glob.glob(os.path.join(self.cmor_scratch, '*.gz')) if zip_files: for zip_file in self._filter_files(zip_files): try: Utils.unzip(zip_file) except Utils.UnzipException as ex: Log.error('File {0} could not be unzipped: {1}', tarfile, ex) def _clean_cmor_scratch(self): if os.path.exists(self.cmor_scratch): shutil.rmtree(self.cmor_scratch) def _merge_mma_files(self, tarfile): temp = TempFile.get() for grid in ['SH', 'GG']: files = glob.glob(os.path.join(self.cmor_scratch, 'MMA_*_{}_*.nc'.format(grid))) if not files: continue merged = TempFile.get() if grid == 'SH': for filename in files: Utils.cdo.sp2gpl(options='-O', input=filename, output=temp) shutil.move(temp, filename) Utils.cdo.mergetime(input=files, output=merged) for filename in files: os.remove(filename) tar_startdate = os.path.basename(tarfile[0:-4]).split('_')[4].split('-') filename = 'MMA{0}_1m_{1[0]}_{1[1]}.nc'.format(grid, tar_startdate) shutil.move(merged, os.path.join(self.cmor_scratch, filename)) def cmorize_atmos(self): """Cmorize atmospheric data, from grib or MMA files""" if not self.cmor.atmosphere: Log.info('Skipping atmosphere cmorization due to configuration') return Log.info('\nCMORizing atmosphere\n') if self.cmor.use_grib and self._gribfiles_available(): self._cmorize_grib_files() else: self._cmorize_mma_files() def _cmorize_mma_files(self): tar_files = glob.glob(os.path.join(self.original_files_path, 'MMA*')) tar_files.sort() count = 1 if len(tar_files) == 0: Log.error('MMA files not found in {0}'.format(self.original_files_path)) for tarfile in tar_files: if not self._cmorization_required(self._get_chunk(os.path.basename(tarfile)), (ModelingRealms.atmos,)): Log.info('No need to unpack file {0}/{1}'.format(count, len(tar_files))) count += 1 continue Log.info('Unpacking atmospheric file {0}/{1}'.format(count, len(tar_files))) try: self._unpack_tar_file(tarfile) self._merge_mma_files(tarfile) self._correct_fluxes() self._cmorize_nc_files() Log.result('Atmospheric file {0}/{1} finished'.format(count, len(tar_files))) except Exception as ex: Log.error('Could not cmorize atmospheric file {0}: {1}\n {2}', count, ex, traceback.format_exc()) count += 1 def _cmorize_grib_files(self): chunk = 1 chunk_start = parse_date(self.startdate) while os.path.exists(self._get_original_grib_path(chunk_start, 'GG')) or \ os.path.exists(self._get_original_grib_path(chunk_start, 'SH')): if self._cmorization_required(chunk, (ModelingRealms.atmos,)): chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', self.experiment.calendar) chunk_end = previous_day(chunk_end, self.experiment.calendar) Log.info('CMORizing chunk {0}-{1}', date2str(chunk_start), date2str(chunk_end)) try: for grid in ('SH', 'GG'): Log.info('Processing {0} variables', grid) first_grib = self._get_original_grib_path(chunk_start, grid) if not os.path.exists(first_grib): continue var_list = Utils.cdo.showvar(input=first_grib)[0] codes = {int(var.replace('var', '')) for var in var_list.split()} if not codes.intersection(self.config.cmor.get_requested_codes()): Log.info('No requested variables found in {0}. Skipping...', grid) continue self._cmorize_grib_file(chunk_end, chunk_start, grid) except Exception as ex: Log.error('Can not cmorize GRIB file for chunk {0}-{1}: {2}', date2str(chunk_start), date2str(chunk_end), ex) chunk_start = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', self.experiment.calendar) chunk += 1 def _cmorize_grib_file(self, chunk_end, chunk_start, grid): for month in range(0, self.experiment.chunk_size): current_date = add_months(chunk_start, month, self.experiment.calendar) original_gribfile = self._get_original_grib_path(current_date, grid) Log.info('Processing month {1}', grid, date2str(current_date)) gribfile = self._get_scratch_grib_path(current_date, grid) if not os.path.isfile(gribfile): Log.info('Copying file...', grid, date2str(current_date)) Utils.copy_file(original_gribfile, gribfile) self._obtain_atmos_timestep(gribfile) full_file = self._get_monthly_grib(current_date, gribfile, grid) if not self._unpack_grib(full_file, gribfile, grid, current_date.month): os.remove(gribfile) return next_gribfile = self._get_original_grib_path(add_months(current_date, 1, self.experiment.calendar), grid) if not os.path.exists(next_gribfile): os.remove(gribfile) self._ungrib_vars(gribfile, current_date.month) for splited_file in glob.glob('{0}_*.128.nc'.format(gribfile)): os.remove(splited_file) Log.result('Month {0}, {1} variables finished', date2str(current_date), grid) self._merge_and_cmorize_atmos(chunk_start, chunk_end, grid, Frequencies.monthly) self._merge_and_cmorize_atmos(chunk_start, chunk_end, grid, Frequencies.daily) self._merge_and_cmorize_atmos(chunk_start, chunk_end, grid, '{0}hr'.format(self.atmos_timestep)) def _unpack_grib(self, full_file, gribfile, grid, month): Log.info('Unpacking... ') # remap on regular Gauss grid codes = self.cmor.get_requested_codes() if 228 in codes: codes.update((142, 143)) codes_str = ','.join([str(code) for code in codes]) try: if grid == 'SH': Utils.cdo.splitparam(input='-sp2gpl -selcode,{0} {1} '.format(codes_str, full_file), output=gribfile + '_', options='-f nc4 -t ecmwf') else: Utils.cdo.splitparam(input='-selcode,{0} {1}'.format(codes_str, full_file), output=gribfile + '_', options='-R -f nc4 -t ecmwf') # total precipitation (remove negative values) if 228 in codes: Utils.cdo.setcode(228, input='-chname,LSP,TP -setmisstoc,0 -setvrange,0,Inf ' '-add {0}_142.128.nc {0}_143.128.nc'.format(gribfile), output='{0}_228.128.nc'.format(gribfile), options='-f nc4') return True except CDOException: Log.info('No requested codes found in {0} file'.format(grid)) return False finally: Utils.remove_file(self.path_icm) def _get_monthly_grib(self, current_date, gribfile, grid): prev_gribfile = self._get_scratch_grib_path(add_months(current_date, -1, self.experiment.calendar), grid) if os.path.exists(prev_gribfile): self._merge_grib_files(current_date, prev_gribfile, gribfile) full_file = self.path_icm else: full_file = gribfile return full_file def _get_scratch_grib_path(self, current_date, grid): return os.path.join(self.config.scratch_dir, self._get_grib_filename(grid, current_date)) def _obtain_atmos_timestep(self, gribfile): if self.atmos_timestep is None: self.atmos_timestep = self._get_atmos_timestep(gribfile) def _get_original_grib_path(self, current_date, grid): return os.path.join(self.original_files_path, self._get_grib_filename(grid, current_date)) def _get_grib_filename(self, grid, month): return 'ICM{0}{1}+{2}.grb'.format(grid, self.experiment.expid, date2str(month)[:-2]) def _get_atmos_timestep(self, gribfile): Log.info('Getting timestep...') grib_handler = pygrib.open(gribfile) dates = set() for mes in grib_handler: dates.add(mes.validDate) dates = list(dates) dates.sort() atmos_timestep = dates[1] - dates[0] atmos_timestep = int(atmos_timestep.total_seconds() / 3600) self.experiment.atmos_timestep = atmos_timestep grib_handler.close() return atmos_timestep def _cmorize_nc_file(self, filename): Log.info('Processing file {0}', filename) if not self._contains_requested_variables(filename): os.remove(filename) return # Utils.convert2netcdf4(filename) frequency = self._get_nc_file_frequency(filename) Utils.rename_variables(filename, self.alt_coord_names, False) handler = Utils.open_cdf(filename) Cmorizer._remove_valid_limits(handler) self._add_common_attributes(handler, frequency) self._update_time_variables(handler) variables = handler.variables.keys() handler.close() Log.info('Splitting file {0}', filename) for variable in variables: if variable in Cmorizer.NON_DATA_VARIABLES: continue try: self.extract_variable(filename, frequency, variable) except Exception as ex: Log.error('Variable {0} can not be cmorized: {1}', variable, ex) Log.result('File {0} cmorized!', filename) os.remove(filename) @staticmethod def _remove_valid_limits(handler): for variable in handler.variables.keys(): var = handler.variables[variable] if 'valid_min' in var.ncattrs(): del var.valid_min if 'valid_max' in var.ncattrs(): del var.valid_max handler.sync() def _get_nc_file_frequency(self, filename): file_parts = os.path.basename(filename).split('_') if self.experiment.expid in [file_parts[1], file_parts[2]]: frequency = Frequency('m') elif self.experiment.expid == file_parts[0]: try: parse_date(file_parts[1]) frequency = Frequency('m') except ValueError: frequency = Frequency(file_parts[1]) else: frequency = Frequency(file_parts[1]) return frequency def _contains_requested_variables(self, filename): variables = Utils.get_file_variables(filename) return self.cmor.any_required(variables) def extract_variable(self, file_path, frequency, variable): """ Extract a variable from a file and creates the CMOR file Parameters ---------- file_path:str frequency: Frequency variable: str Raises ------ CMORException If the filename does not match any of the recognized patterns """ alias, var_cmor = self.config.var_manager.get_variable_and_alias(variable) if var_cmor is None: return if not self.cmor.cmorize(var_cmor): return temp = TempFile.get() handler = Utils.open_cdf(file_path) coords = [self.lon_name, self.lat_name, 'time'] if 'leadtime' in handler.variables.keys(): coords.append('leadtime') if var_cmor.domain == ModelingRealms.ocean: lev_dimensions = {'deptht': 'lev', 'depthu': 'lev', 'depthw': 'lev', 'depthv': 'lev', 'depth': 'lev'} elif var_cmor.domain in [ModelingRealms.landIce, ModelingRealms.land]: lev_dimensions = {'depth': 'sdepth', 'depth_2': 'sdepth', 'depth_3': 'sdepth', 'depth_4': 'sdepth'} elif var_cmor.domain == ModelingRealms.atmos: lev_dimensions = {'depth': 'plev'} else: lev_dimensions = {} for lev_dim in lev_dimensions.keys(): if lev_dim in handler.variables[variable].dimensions: coords.append(lev_dim) handler.variables[variable].coordinates = ' '.join(set(coords)) handler.close() cube = iris.load_cube(file_path, iris.Constraint(cube_func=lambda c: c.var_name == variable)) for lev_original, lev_target in six.iteritems(lev_dimensions): try: cube.coord(lev_original).var_name = lev_target except iris.exceptions.CoordinateNotFoundError: pass iris.save(cube, temp, zlib=True) if alias.basin is None: region = None else: region = alias.basin.name date_str = self._get_date_str(file_path) if date_str is None: Log.error('Variable {0} can not be cmorized. Original filename does not match a recognized pattern', var_cmor.short_name) raise CMORException('Variable {0}:{1} can not be cmorized. Original filename does not match a recognized ' 'pattern'.format(var_cmor.domain, var_cmor.short_name)) netcdf_file = NetCDFFile() netcdf_file.data_manager = self.data_manager netcdf_file.local_file = temp netcdf_file.remote_file = self.data_manager.get_file_path(self.startdate, self.member, var_cmor.domain, var_cmor.short_name, var_cmor, None, frequency, grid=alias.grid, year=None, date_str=date_str) netcdf_file.data_convention = self.config.data_convention netcdf_file.region = region netcdf_file.frequency = frequency netcdf_file.domain = var_cmor.domain netcdf_file.var = var_cmor.short_name netcdf_file.final_name = var_cmor.short_name netcdf_file.prepare_to_upload(rename_var=variable) netcdf_file.add_cmorization_history() netcdf_file.upload() if region: region_str = ' (Region {})'.format(region) else: region_str = '' Log.info('Variable {0.domain}:{0.short_name} processed{1}', var_cmor, region_str) def _get_date_str(self, file_path): file_parts = os.path.basename(file_path).split('_') valid_starts = (self.experiment.expid, 'MMA', 'MMASH', 'MMAGG', 'MMO') if file_parts[0] in valid_starts or file_parts[0].startswith('ORCA'): # Model output if file_parts[-1].endswith('.tar'): file_parts = file_parts[-1][0:-4].split('-') return '{0}-{1}'.format(file_parts[0][0:6], file_parts[1][0:6]) else: return '{0}-{1}'.format(file_parts[2][0:6], file_parts[3][0:6]) elif file_parts[1] == self.experiment.expid: # Files generated by the old version of the diagnostics return '{0}-{1}'.format(file_parts[4][0:6], file_parts[5][0:6]) else: return None def _get_chunk(self, file_path): chunk_start = parse_date(self._get_date_str(file_path).split('-')[0]) current_date = parse_date(self.startdate) chunk = 1 while current_date < chunk_start: current_date = chunk_end_date(current_date, self.experiment.chunk_size, 'month', self.experiment.calendar) chunk += 1 if current_date != chunk_start: raise Exception('File {0} start date is not a valid chunk start date'.format(file_path)) return chunk def _merge_grib_files(self, current_month, prev_gribfile, gribfile): Log.info('Merging data from different files...') temp = TempFile.get(suffix='.grb') Utils.cdo.selmon(current_month.month, input=prev_gribfile, output=temp) Utils.cdo.mergetime(input=[temp, gribfile], output=self.path_icm) os.remove(prev_gribfile) os.remove(temp) def _ungrib_vars(self, gribfile, month): for var_code in self.cmor.get_requested_codes(): file_path = '{0}_{1}.128.nc'.format(gribfile, var_code) if not os.path.exists(file_path): continue cube = iris.load_cube(file_path) cube = self._fix_time_coord(cube, var_code) cube = cube.extract(iris.Constraint(month_number=month)) cube = self._change_units(cube, var_code) for frequency in (Frequencies.monthly, Frequencies.daily, Frequency('{0}hr'.format(self.atmos_timestep))): if var_code not in self.cmor.get_variables(frequency): continue time_cube = self._get_time_average(cube, frequency, var_code) levels = self.config.cmor.get_levels(frequency, var_code) if levels: time_cube = time_cube.extract(level=levels) if cube.var_name.endswith('_2'): time_cube.var_name = cube.var_name[:-2] out_file = '{0}_{1}_{2}.nc'.format(gribfile, var_code, frequency) time_cube.remove_coord('month_number') time_cube.remove_coord('day_of_month') iris.save(time_cube, out_file, zlib=True) def _fix_time_coord(self, cube, var_code): time = cube.coord('time') target_units = 'days since 1950-01-01 00:00:00' time.convert_units(cf_units.Unit(target_units, calendar=time.units.calendar)) time.units = target_units if var_code in (144, 146, 147, 169, 175, 176, 177, 179, 180, 181, 182, 201, 202, 205, 212, 228): time.points = time.points - (self.experiment.atmos_timestep / 24.0) iris.coord_categorisation.add_day_of_month(cube, 'time') iris.coord_categorisation.add_month_number(cube, 'time') return cube @staticmethod def _get_time_average(cube, frequency, var_code): if frequency == Frequencies.monthly: if var_code == 201: cube = cube.aggregated_by(['month_number', 'day_of_month'], iris.analysis.MAX) elif var_code == 202: cube = cube.aggregated_by(['month_number', 'day_of_month'], iris.analysis.MIN) cube = cube.aggregated_by(['month_number'], iris.analysis.MEAN) elif frequency == Frequencies.daily: if var_code == 201: cube = cube.aggregated_by(['month_number', 'day_of_month'], iris.analysis.MAX) elif var_code == 202: cube = cube.aggregated_by(['month_number', 'day_of_month'], iris.analysis.MIN) else: cube = cube.aggregated_by(['month_number', 'day_of_month'], iris.analysis.MEAN) return cube def _change_units(self, cube, var_code): var_name = cube.var_name if var_code == 129: # geopotential cube = cube / 9.81 cube.units = 'm' elif var_code in (146, 147, 169, 175, 176, 177, 179, 212): # radiation cube = cube / (self.experiment.atmos_timestep * 3600) cube.units = 'W m-2' elif var_code in (180, 181): # momentum flux cube = cube / (self.experiment.atmos_timestep * 3600) cube.units = 'N m-2' elif var_code in (144, 182, 205, 228): # precipitation/evaporation/runoff cube = cube * 1000 / (self.experiment.atmos_timestep * 3600) cube.units = 'kg m-2 s-1' cube.var_name = var_name return cube def _merge_and_cmorize_atmos(self, chunk_start, chunk_end, grid, frequency): merged_file = 'MMA_{0}_{1}_{2}_{3}.nc'.format(frequency, date2str(chunk_start), date2str(chunk_end), grid) files = glob.glob(os.path.join(self.config.scratch_dir, '{0}_*_{1}.nc'.format(self._get_grib_filename(grid, chunk_start), frequency))) def _load_cube(cube, field, filename): if 'history' in cube.attributes: del cube.attributes['history'] for first_file in files: var_files = [] current_month = chunk_start while current_month < chunk_end: var_files.append(first_file.replace('+{0}.grb'.format(date2str(chunk_start)[:-2]), '+{0}.grb'.format(date2str(current_month)[:-2]))) current_month = add_months(current_month, 1, self.experiment.calendar) var_cubes = iris.load(var_files, callback=_load_cube) iris.util.unify_time_units(var_cubes) var_cube = var_cubes.concatenate_cube() iris.save(var_cube, merged_file, zlib=True) for var_file in var_files: os.remove(var_file) self._cmorize_nc_file(merged_file) def _update_time_variables(self, handler): time_var = handler.variables['time'] if hasattr(time_var, 'calendar'): calendar = time_var.calendar else: calendar = 'standard' if "time_bnds" in handler.variables: time_var.bounds = "time_bnds" handler.variables['time_bnds'].units = time_var.units Utils.convert_units(handler.variables['time_bnds'], 'days since 1850-01-01 00:00:00', calendar, calendar) Utils.convert_units(time_var, 'days since 1850-1-1 00:00:00', calendar) self._set_leadtime_var(handler) def _set_leadtime_var(self, handler): if 'leadtime' in handler.variables: var = handler.variables['leadtime'] else: var = handler.createVariable('leadtime', float, 'time') var.units = "days" var.long_name = "Time elapsed since the start of the forecast" var.standard_name = "forecast_period" leadtime = Utils.get_datetime_from_netcdf(handler) startdate = parse_date(self.startdate) leadtime = [datetime(time.year, time.month, time.day, time.hour, time.minute, time.second) - startdate for time in leadtime] for lt, lead in enumerate(leadtime): var[lt] = lead.days def _add_common_attributes(self, handler, frequency): cmor = self.config.cmor experiment = self.config.experiment handler.associated_experiment = cmor.associated_experiment handler.batch = '{0}{1}'.format(experiment.institute, datetime.now().strftime('%Y-%m-%d(T%H:%M:%SZ)')) handler.contact = 'Pierre-Antoine Bretonniere, pierre-antoine.bretonniere@bsc.es , ' \ 'Javier Vegas-Regidor, javier.vegas@bsc.es ' handler.Conventions = 'CF-1.6' handler.creation_date = datetime.now().strftime('%Y-%m-%d(T%H:%M:%SZ)') handler.experiment_id = experiment.experiment_name startdate = parse_date(self.startdate) handler.forecast_reference_time = '{0.year}-{0.month:02}-{0.day:02}' \ '(T{0.hour:02}:{0.minute:02}:{0.second:02}Z)'.format(startdate) handler.frequency = frequency.frequency handler.institute_id = experiment.institute handler.institution = experiment.institute handler.initialization_method = cmor.initialization_method handler.initialization_description = cmor.initialization_description handler.physics_version = cmor.physics_version handler.physics_description = cmor.physics_description handler.model_id = experiment.model handler.associated_model = cmor.associated_model handler.project_id = self.config.data_convention.upper() handler.realization = str(self.member + 1) handler.source = cmor.source handler.startdate = 'S{0}'.format(self.startdate) handler.tracking_id = str(uuid.uuid1()) handler.title = "{0} model output prepared for {2} {1}".format(experiment.model, experiment.experiment_name, self.config.data_convention.upper()) def _gribfiles_available(self): grb_path = os.path.join(self.original_files_path, '*.grb') gribfiles = glob.glob(grb_path) return len(gribfiles) > 0 def _cmorization_required(self, chunk, domains): if not self.config.cmor.chunk_cmorization_requested(chunk): return False if self.config.cmor.force: return True for domain in domains: if self.data_manager.is_cmorized(self.startdate, self.member, chunk, domain): return False return True class CMORException(Exception): """Exception to be launched when an error is encountered during cmorization""" pass