import glob
import os
import shutil
import threading

import numpy as np
from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day

from earthdiagnostics.utils import Utils, TempFile


class DataManager(object):
    """
    Locates, retrieves and stores files of a CMORized experiment.

    Variable files are looked up under::

        <datafolder>/<expid>/<startdate>/fc<member>/outputs/output/<institution>/<model>/<experiment_name>/
            S<startdate>/<frequency>/<domain>/<var>[/<grid>]/r<member + 1>i1p1/
    """

    def __init__(self, institution, model, expid, datafolder, frequency, chunk_size, experiment_name, num_chunks,
                 calendar='standard'):
        """
        :param institution: institution name, used as a folder of the CMOR tree
        :type institution: str
        :param model: model name, used as a folder and as part of the file names
        :type model: str
        :param expid: experiment identifier, folder directly under datafolder
        :type expid: str
        :param datafolder: root folder of the CMOR repository
        :type datafolder: str
        :param frequency: default frequency of the files
        :type frequency: str
        :param chunk_size: chunk length, in months
        :type chunk_size: int
        :param experiment_name: experiment name, used as a folder and as part of the file names
        :type experiment_name: str
        :param num_chunks: number of chunks of each startdate
        :type num_chunks: int
        :param calendar: calendar passed to the chunk date computations
        :type calendar: str
        """
        self.institution = institution
        self.model = model
        self.expid = expid
        self.data_dir = datafolder
        self.frequency = frequency
        self.chunk_size = chunk_size
        self.experiment_name = experiment_name
        self.add_startdate = True
        self.add_name = True
        self.num_chunks = num_chunks
        self.calendar = calendar

    # noinspection PyPep8Naming
    def prepare_CMOR_files(self, startdates, members):
        """
        Prepares the data to be used by the diagnostic.

        If CMOR data is not created, it raises an exception. In the future, an automatic cmorization procedure will
        be launched.

        If CMOR data is available but packed, the procedure will unpack it (gunzip, then untar) and normalize the
        folder and file names.

        :param startdates: list of startdates that will be used by the diagnostics
        :type startdates: list[str]
        :param members: lists of members that will be used by the diagnostics
        :type members: list[int]
        :raises Exception: if the experiment folder does not exist
        """
        # Check if cmorized and convert if not
        if not os.path.exists(os.path.join(self.data_dir, self.expid)):
            raise Exception('The experiment {0} is not CMORized. '
                            'Please, CMORize it and launch again.'.format(self.expid))

        for startdate in startdates:
            for member in members:
                member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc{0}'.format(member), 'outputs')
                Log.info('Preparing CMOR files for startdate {0} and member {1}'.format(startdate, member))

                filepaths = glob.glob(os.path.join(member_path, '*.gz'))
                # No compressed files: this member is skipped entirely (presumably already prepared)
                if len(filepaths) == 0:
                    continue

                DataManager._run_threaded(DataManager._unzip, filepaths)
                # Gunzipping yields the .tar files unpacked here
                DataManager._run_threaded(DataManager._untar, glob.glob(os.path.join(member_path, '*.tar')),
                                          member_path)

                if self.experiment_name != self.model:
                    self._move_model_folder_to_experiment(member_path)
                self._add_startdate_to_filenames(member_path)

    @staticmethod
    def _run_threaded(target, filepaths, *extra_args):
        """Splits filepaths into one interleaved slice per CPU and calls target on each slice in its own thread."""
        numthreads = Utils.available_cpu_count()
        threads = list()
        for numthread in range(numthreads):
            thread = threading.Thread(target=target, args=(filepaths[numthread::numthreads],) + extra_args)
            threads.append(thread)
            thread.start()
        for thread in threads:
            thread.join()

    def _move_model_folder_to_experiment(self, member_path):
        """
        Moves every file stored under <model>/<model> to <model>/<experiment_name>.

        The '_<model>_output_' token in the file names becomes '_<model>_<experiment_name>_'. The emptied folders
        are removed. The walk is bottom-up, so subfolders are removed before their parents.
        """
        bad_path = os.path.join(member_path, 'output', self.institution, self.model, self.model)
        for (dirpath, dirnames, filenames) in os.walk(bad_path, False):
            for filename in filenames:
                filepath = os.path.join(dirpath, filename)
                good = filepath.replace('_{0}_output_'.format(self.model),
                                        '_{0}_{1}_'.format(self.model, self.experiment_name))
                good = good.replace('/{0}/{0}'.format(self.model),
                                    '/{0}/{1}'.format(self.model, self.experiment_name))
                Utils.move_file(filepath, good)
            os.rmdir(dirpath)

    def _add_startdate_to_filenames(self, member_path):
        """Inserts the startdate folder name into every file name of the form '..._<model>_<experiment_name>_r...'."""
        good_dir = os.path.join(member_path, 'output', self.institution, self.model, self.experiment_name)
        old_token = '_{0}_{1}_r'.format(self.model, self.experiment_name)
        for sdate in os.listdir(good_dir):
            new_token = '_{0}_{1}_{2}_r'.format(self.model, self.experiment_name, sdate)
            for (dirpath, dirnames, filenames) in os.walk(os.path.join(good_dir, sdate), False):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    good = filepath.replace(old_token, new_token)
                    if good != filepath:
                        Log.info('Moving {0} to {1}'.format(filename, good))
                        Utils.move_file(filepath, good)

    @staticmethod
    def _unzip(files):
        """Gunzips each of the given files in place."""
        for filepath in files:
            Log.debug('Unzipping {0}', filepath)
            Utils.execute_shell_command('gunzip {0}'.format(filepath))

    @staticmethod
    def _untar(files, member_path):
        """Extracts each of the given tar files into member_path, then deletes the tar file."""
        for filepath in files:
            Log.debug('Unpacking {0}', filepath)
            Utils.execute_shell_command('tar -xvf {0} -C {1}'.format(filepath, member_path))
            os.remove(filepath)

    @staticmethod
    def _get_domain_abbreviation(domain):
        """Returns the domain prefix used in file names: 'OI' for seaIce, else the domain's first letter upper-cased."""
        if domain == 'seaIce':
            return 'OI'
        return domain[0].upper()

    def _get_chunk_time_bound(self, startdate, chunk):
        """Returns the 'YYYYMM-YYYYMM' span (first and last month) of the given chunk, as used in file names."""
        chunk_start = chunk_start_date(parse_date(startdate), chunk, self.chunk_size, 'month', self.calendar)
        # chunk_end_date gives the day after the chunk; step back one day to get its last day
        chunk_end = previous_day(chunk_end_date(chunk_start, self.chunk_size, 'month', self.calendar), self.calendar)
        return "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month,
                                                  chunk_end.year, chunk_end.month)

    def _get_file_path(self, domain, var, startdate, member, frequency, time_bound, grid=None):
        """Builds the repository path of a variable file covering time_bound ('YYYYMM-YYYYMM' or 'YYYY')."""
        member_plus = str(member + 1)
        member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                                   self.institution, self.model, self.experiment_name, 'S' + startdate, frequency,
                                   domain)
        if grid:
            var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
        else:
            var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))
        filename = '{0}_{1}{2}_{3}_{4}_S{5}_r{6}i1p1_{7}.nc'.format(var, self._get_domain_abbreviation(domain),
                                                                   frequency, self.model, self.experiment_name,
                                                                   startdate, member_plus, time_bound)
        return os.path.join(var_path, filename)

    def get_files(self, startdate, member, chunk, domain, variables, grid=None):
        """
        Returns the repository paths of the files of several variables for one chunk.

        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve
        :type member: int
        :param chunk: chunk to retrieve
        :type chunk: int
        :param domain: variable's CMOR domain
        :type domain: str
        :param variables: variables list
        :type variables: list[str] | tuple[str]
        :param grid: specifies if the variable must be in a interpolated grid
        :type grid: str
        :return: a list holding one inner list with one path per variable, in the order of variables
        :rtype: list[list[str]]
        """
        time_bound = self._get_chunk_time_bound(startdate, chunk)
        var_files = [self._get_file_path(domain, var, startdate, member, self.frequency, time_bound, grid)
                     for var in variables]
        return [var_files]

    def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
        """
        Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy

        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :return: path to the copy created on the scratch folder
        :rtype: str
        """
        if not frequency:
            frequency = self.frequency
        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
        filepath = self._get_file_path(domain, var, startdate, member, frequency,
                                       self._get_chunk_time_bound(startdate, chunk), grid)
        temp_path = TempFile.get()
        shutil.copyfile(filepath, temp_path)
        return temp_path

    def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
                  rename_var=None, frequency=None, year=None):
        """
        Moves a given file into the CMOR repository.

        The file is converted to netCDF 4 first. It can also be merged with an already existing file as a new or
        updated region.

        :param filetosend: path to the file to send to the CMOR repository; it is modified in place and then moved
        :type filetosend: str
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param region: specifies the region represented by the file. If it is defined, the data will be appended to the
         CMOR repository as a new region in the file or will overwrite if region was already present
        :type region: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param rename_var: if exists, the given variable will be renamed to the one given by var
        :type rename_var: str
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :raises ValueError: if neither chunk nor year is given, or year is given with a frequency other than 'yr'
        """
        if not frequency:
            frequency = self.frequency

        # Validate the time arguments before touching filetosend, so bad calls leave it untouched
        if chunk is not None:
            time_bound = self._get_chunk_time_bound(startdate, chunk)
        elif year is not None:
            if frequency != 'yr':
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            time_bound = str(year)
        else:
            raise ValueError('Chunk and year can not be None at the same time')

        Utils.convert2netcdf4(filetosend)

        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
        if rename_var:
            Utils.rename_variable(filetosend, rename_var, var)

        filepath = self._get_file_path(domain, var, startdate, member, frequency, time_bound, grid)

        if region:
            if not os.path.exists(filepath):
                DataManager._add_region_dimension(filetosend, var, region)
            else:
                DataManager._merge_region(filetosend, filepath, var, region)
        Utils.move_file(filetosend, filepath)

    @staticmethod
    def _add_region_dimension(filetosend, var, region):
        """Rewrites var in filetosend with an extra trailing 'region' dimension that holds only the given region."""
        handler = Utils.openCdf(filetosend)
        try:
            handler.createDimension('region')
            var_region = handler.createVariable('region', str, 'region')
            var_region[0] = region
            original_var = handler.variables[var]
            new_var = handler.createVariable('new_var', original_var.datatype,
                                             original_var.dimensions + ('region',))
            new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
            new_var[..., 0] = original_var[:]
        finally:
            handler.close()
        # Drop the original variable and put the region-aware one in its place
        Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var))
        Utils.rename_variable(filetosend, 'new_var', var)

    @staticmethod
    def _merge_region(filetosend, filepath, var, region):
        """
        Writes var from filetosend as the given region of a copy of the existing repository file filepath.

        The region is appended if the file does not have it yet. The merged copy then replaces filetosend.
        """
        temp = TempFile.get()
        shutil.copyfile(filepath, temp)
        # Make region the record (unlimited) dimension so a new region can be appended
        Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
        handler = Utils.openCdf(temp)
        try:
            handler_send = Utils.openCdf(filetosend)
            try:
                value = handler_send.variables[var][:]
                var_region = handler.variables['region']
                basin_index = np.where(var_region[:] == region)
                if len(basin_index[0]) == 0:
                    var_region[var_region.shape[0]] = region
                    basin_index = var_region.shape[0] - 1
                else:
                    basin_index = basin_index[0][0]
                handler.variables[var][..., basin_index] = value
            finally:
                handler_send.close()
        finally:
            handler.close()
        Utils.move_file(temp, filetosend)
        Utils.nco.ncks(input=filetosend, output=filetosend, options='-O --fix_rec_dmn region')

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """
        Gets all the data corresponding to a given year from the CMOR repository.

        The data is copied to the scratch folder as one file, and the path to that copy is returned.

        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param year: year to retrieve
        :type year: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :return: path to the copy created on the scratch folder
        :rtype: str
        :raises ValueError: if no chunk contains the year, or the year's January or December timesteps are missing
        """
        chunk_files = [self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box)
                       for chunk in self.get_year_chunks(startdate, year)]
        if not chunk_files:
            raise ValueError('No chunk of startdate {0} contains data for year {1}'.format(startdate, year))

        if len(chunk_files) > 1:
            temp = TempFile.get()
            Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
            for chunk_file in chunk_files:
                os.remove(chunk_file)
        else:
            temp = chunk_files[0]

        handler = Utils.openCdf(temp)
        try:
            times = Utils.get_datetime_from_netcdf(handler)
        finally:
            handler.close()

        # First timestep of January and last timestep of December of the requested year
        start = None
        end = None
        for index, date in enumerate(times):
            if date.year != year:
                continue
            if date.month == 1:
                if start is None:
                    start = index
            elif date.month == 12:
                end = index

        if start is None or end is None:
            os.remove(temp)
            raise ValueError('Could not find January and December of year {0} '
                             'for startdate {1}'.format(year, startdate))

        temp2 = TempFile.get()
        Utils.nco.ncks(input=temp, output=temp2, options='-O -d time,{0},{1}'.format(start, end))
        os.remove(temp)
        return temp2

    def get_year_chunks(self, startdate, year):
        """
        Get the list of chunks containing timesteps from the given year

        :param startdate: startdate to use
        :type startdate: str
        :param year: reference year
        :type year: int
        :return: list of chunks (1-based) containing data from the given year
        :rtype: list[int]
        """
        date = parse_date(startdate)
        chunks = list()
        for chunk in range(1, self.num_chunks + 1):
            chunk_start = chunk_start_date(date, chunk, self.chunk_size, 'month', self.calendar)
            if chunk_start.year > year:
                break
            # chunk_end_date gives the day after the chunk; step back one day to get its last day
            chunk_end = previous_day(chunk_end_date(chunk_start, self.chunk_size, 'month', self.calendar),
                                     self.calendar)
            # chunk_start.year <= year is guaranteed by the break above
            if chunk_end.year >= year:
                chunks.append(chunk)
        return chunks

    def get_full_years(self, startdate):
        """
        Returns the list of full years (January to December) covered by the chunks of the given startdate

        Assumes that chunk_size divides 12.

        :param startdate: startdate to use
        :type startdate: str
        :return: list of full years
        :rtype: list[int]
        """
        chunks_per_year = 12 // self.chunk_size
        date = parse_date(startdate)
        # first_january: 0-based offset of the first chunk starting in January
        if date.month == 1:
            first_year = date.year
            first_january = 0
        else:
            first_year = date.year + 1
            # Ceiling division: chunks needed to go from the start month to the next January
            first_january = -(-(13 - date.month) // self.chunk_size)

        years = list()
        # A year is full when all of its chunks (offsets first .. first + chunks_per_year - 1) exist
        for _ in range(first_january, self.num_chunks - chunks_per_year + 1, chunks_per_year):
            years.append(first_year)
            first_year += 1
        return years