import glob
import shutil
import threading
# import netCDF4
import os

import numpy as np

from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day

from earthdiagnostics import Utils, TempFile


class DataManager(object):
    """Manages the storage and retrieval of the CMORized output of an experiment."""

    def __init__(self, institution, model, expid, datafolder, frequency, chunk_size, experiment_name, num_chunks,
                 calendar='standard'):
        self.institution = institution
        self.model = model
        self.expid = expid
        self.data_dir = datafolder
        self.frequency = frequency
        self.chunk_size = chunk_size
        self.experiment_name = experiment_name
        self.add_startdate = True
        self.add_name = True
        self.num_chunks = num_chunks
        self.calendar = calendar

    # noinspection PyPep8Naming
    def prepare_CMOR_files(self, startdates, members):
        """Unpacks the CMORized outputs for the given startdates and members and fixes the file names
        so that they contain the experiment name."""
        # Check if cmorized and convert if not
        if not os.path.exists(os.path.join(self.data_dir, self.expid)):
            raise Exception('The experiment {0} is not CMORized. '
                            'Please, CMORize it and launch again.'.format(self.expid))
        for startdate in startdates:
            for member in members:
                member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc{0}'.format(member), 'outputs')
                Log.info('Preparing CMOR files for startdate {0} and member {1}'.format(startdate, member))
                numthreads = Utils.available_cpu_count()

                # Unzip the gzipped files, distributing them evenly among the available threads
                threads = list()
                filepaths = glob.glob(os.path.join(member_path, '*.gz'))
                for numthread in range(0, numthreads):
                    t = threading.Thread(target=DataManager._unzip, args=(filepaths[numthread::numthreads],))
                    threads.append(t)
                    t.start()
                for t in threads:
                    t.join()

                # Unpack the tar files in the same way
                threads = list()
                filepaths = glob.glob(os.path.join(member_path, '*.tar'))
                for numthread in range(0, numthreads):
                    t = threading.Thread(target=DataManager._untar,
                                         args=(filepaths[numthread::numthreads], member_path))
                    threads.append(t)
                    t.start()
                for t in threads:
                    t.join()

                if self.experiment_name != self.model:
                    # Move the files out of the '<model>/<model>' folder, replacing the 'output' token
                    # in their names with the experiment name
                    bad_path = os.path.join(member_path, 'output', self.institution, self.model, self.model)
                    for (dirpath, dirnames, filenames) in os.walk(bad_path, False):
                        for filename in filenames:
                            filepath = os.path.join(dirpath, filename)
                            good = filepath.replace('_{0}_output_'.format(self.model),
                                                    '_{0}_{1}_'.format(self.model, self.experiment_name))
                            good = good.replace('/{0}/{0}'.format(self.model),
                                                '/{0}/{1}'.format(self.model, self.experiment_name))
                            Utils.move_file(filepath, good)
                        os.rmdir(dirpath)

                # Add the startdate to the file names that still lack it
                good_dir = os.path.join(member_path, 'output', self.institution, self.model, self.experiment_name)
                for sdate in os.listdir(good_dir):
                    for (dirpath, dirnames, filenames) in os.walk(os.path.join(good_dir, sdate), False):
                        for filename in filenames:
                            filepath = os.path.join(dirpath, filename)
                            good = filepath.replace('_{0}_{1}_r'.format(self.model, self.experiment_name),
                                                    '_{0}_{1}_{2}_r'.format(self.model, self.experiment_name,
                                                                            sdate))
                            if good != filepath:
                                Log.info('Moving {0} to {1}'.format(filename, good))
                                Utils.move_file(filepath, good)

    @staticmethod
    def _unzip(files):
        for filepath in files:
            Log.debug('Unzipping {0}', filepath)
            Utils.execute_shell_command('gunzip {0}'.format(filepath))

    @staticmethod
    def _untar(files, member_path):
        for filepath in files:
            Log.debug('Unpacking {0}', filepath)
            Utils.execute_shell_command('tar -xvf {0} -C {1}'.format(filepath, member_path))
            os.remove(filepath)

    def get_files(self, startdate, member, chunk, domain, variables, grid=None):
        """Returns the paths to the CMORized files for the given variables at one startdate, member and chunk."""
        file_names = list()
        if domain == 'seaIce':
            domain_abbreviation = 'OI'
        else:
            domain_abbreviation = domain[0].upper()
        start = parse_date(startdate)
        member_plus = str(member + 1)
        member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                                   self.institution, self.model, self.experiment_name, 'S' + startdate,
                                   self.frequency, domain)
        chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
        chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
        chunk_end = previous_day(chunk_end, 'standard')
        var_file = list()
        for var in variables:
            if grid:
                var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
            else:
                var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))
            var_file.append(os.path.join(var_path,
                                         '{0}_{1}{2}_{3}_{4}_S{5}_r{6}i1p1_'
                                         '{7}-{8}.nc'.format(var, domain_abbreviation, self.frequency, self.model,
                                                             self.experiment_name, startdate, member_plus,
                                                             "{0:04}{1:02}".format(chunk_start.year,
                                                                                   chunk_start.month),
                                                             "{0:04}{1:02}".format(chunk_end.year,
                                                                                   chunk_end.month))))
        file_names.append(var_file)
        return file_names
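    # The path builders above and below assemble CMOR-style file names of the form
    #   <var>_<table><frequency>_<model>_<experiment>_S<startdate>_r<member+1>i1p1_<start>-<end>.nc
    # where <table> is the domain abbreviation. With illustrative values (variable 'tos', ocean domain,
    # monthly frequency), a name would look like: tos_Omon_model_experiment_S19900101_r2i1p1_199001-199003.nc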
    def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
        """Copies the requested CMORized file to a temporary location and returns the path to the copy."""
        if domain == 'seaIce':
            domain_abbreviation = 'OI'
        else:
            domain_abbreviation = domain[0].upper()
        if not frequency:
            frequency = self.frequency
        start = parse_date(startdate)
        member_plus = str(member + 1)
        member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                                   self.institution, self.model, self.experiment_name, 'S' + startdate, frequency,
                                   domain)
        chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
        chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
        chunk_end = previous_day(chunk_end, 'standard')

        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()

        if grid:
            var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
        else:
            var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))
        filepath = os.path.join(var_path,
                                '{0}_{1}{2}_{3}_{4}_S{5}_r{6}i1p1_'
                                '{7}-{8}.nc'.format(var, domain_abbreviation, frequency, self.model,
                                                    self.experiment_name, startdate, member_plus,
                                                    "{0:04}{1:02}".format(chunk_start.year, chunk_start.month),
                                                    "{0:04}{1:02}".format(chunk_end.year, chunk_end.month)))
        temp_path = TempFile.get()
        shutil.copyfile(filepath, temp_path)
        return temp_path

    def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
                  rename_var=None, frequency=None, year=None):
        """Stores filetosend in its final CMORized location. Either chunk or year must be provided
        to define the file's time bounds."""
        Utils.convert2netcdf4(filetosend)
        if domain == 'seaIce':
            domain_abbreviation = 'OI'
        else:
            domain_abbreviation = domain[0].upper()
        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
        if rename_var:
            Utils.rename_variable(filetosend, rename_var, var)
        if not frequency:
            frequency = self.frequency
        start = parse_date(startdate)
        member_plus = str(member + 1)
        member_path = os.path.join(self.data_dir, self.expid, startdate, 'fc' + str(member), 'outputs', 'output',
                                   self.institution, self.model, self.experiment_name, 'S' + startdate, frequency,
                                   domain)
        if chunk is not None:
            chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
            chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
            chunk_end = previous_day(chunk_end, 'standard')
            time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month,
                                                            chunk_end.year, chunk_end.month)
        elif year is not None:
            if frequency != 'yr':
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            time_bound = str(year)
        else:
            raise ValueError('Chunk and year can not be None at the same time')
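        # Build the destination path, following the same CMOR naming convention used in get_file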
        if grid:
            var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
        else:
            var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))
        filepath = os.path.join(var_path,
                                '{0}_{1}{2}_{3}_{4}_S{5}_r{6}i1p1_'
                                '{7}.nc'.format(var, domain_abbreviation, frequency, self.model,
                                                self.experiment_name, startdate, member_plus, time_bound))

        if region:
            if not os.path.exists(filepath):
                # First file for this variable: add a 'region' dimension and store the data in its first slot
                handler = Utils.openCdf(filetosend)
                handler.createDimension('region')
                var_region = handler.createVariable('region', str, 'region')
                var_region[0] = region
                original_var = handler.variables[var]
                new_var = handler.createVariable('new_var', original_var.datatype,
                                                 original_var.dimensions + ('region',))
                new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
                value = original_var[:]
                new_var[..., 0] = value
                handler.close()
                Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var))
                Utils.rename_variable(filetosend, 'new_var', var)
            else:
                # The destination file already exists: write the data into the slot for this region,
                # appending a new one if the region is not present yet
                temp = TempFile.get()
                shutil.copyfile(filepath, temp)
                handler = Utils.openCdf(temp)
                handler_send = Utils.openCdf(filetosend)
                value = handler_send.variables[var][:]
                var_region = handler.variables['region']
                basin_index = np.where(var_region[:] == region)
                if len(basin_index[0]) == 0:
                    var_region[var_region.shape[0]] = region
                    basin_index = var_region.shape[0] - 1
                else:
                    basin_index = basin_index[0][0]
                handler.variables[var][..., basin_index] = value
                handler.close()
                handler_send.close()
                Utils.move_file(temp, filetosend)
        Utils.move_file(filetosend, filepath)

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """Concatenates all the chunks that overlap the given year and returns a file trimmed to that year."""
        chunk_files = list()
        for chunk in self.get_year_chunks(startdate, year):
            chunk_files.append(self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box))
        if len(chunk_files) > 1:
            temp = TempFile.get()
            Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
            for chunk_file in chunk_files:
                os.remove(chunk_file)
        else:
            temp = chunk_files[0]

        # Locate the first and last timesteps belonging to the requested year and extract that slice
        temp2 = TempFile.get()
        handler = Utils.openCdf(temp)
        time = Utils.get_datetime_from_netcdf(handler)
        handler.close()
        start = None
        end = None
        for x in range(0, len(time)):
            date = time[x]
            if date.year == year:
                if date.month == 1:
                    start = x
                elif date.month == 12:
                    end = x
        Utils.nco.ncks(input=temp, output=temp2, options='-O -d time,{0},{1}'.format(start, end))
        os.remove(temp)
        return temp2

    def get_year_chunks(self, startdate, year):
        """Returns the chunk numbers that overlap the given year."""
        date = parse_date(startdate)
        chunks = list()
        for chunk in range(1, self.num_chunks + 1):
            chunk_start = chunk_start_date(date, chunk, self.chunk_size, 'month', self.calendar)
            if chunk_start.year > year:
                break
            elif chunk_start.year == year or chunk_end_date(chunk_start, self.chunk_size, 'month',
                                                            self.calendar).year == year:
                chunks.append(chunk)
        return chunks

    def get_full_years(self, startdate):
        """Returns the list of calendar years that are fully covered by the experiment's chunks."""
        chunks_per_year = 12 // self.chunk_size
        date = parse_date(startdate)
        first_january = 0
        first_year = date.year
        if date.month != 1:
            # The first chunk does not start in January: skip ahead to the first chunk containing a 1st of January
            month = date.month
            first_year += 1
            while month + self.chunk_size < 12:
                month += self.chunk_size
                first_january += 1

        years = list()
        for chunk in range(first_january, self.num_chunks - chunks_per_year, chunks_per_year):
            years.append(first_year)
            first_year += 1
        return years
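

# Minimal usage sketch (not part of the module's API). All the identifiers below
# (institution, model, expid, data folder, dates) are made-up illustrative values;
# point them at an actual CMORized experiment before running.
if __name__ == '__main__':
    manager = DataManager(institution='institution', model='model', expid='expid',
                          datafolder='/path/to/data', frequency='mon', chunk_size=3,
                          experiment_name='experiment', num_chunks=4)
    # Unpack and rename the raw CMOR output for one startdate and the first member
    manager.prepare_CMOR_files(['19900101'], [0])
    # Get a temporary working copy of the sea surface temperature file for the first chunk
    temperature_file = manager.get_file('ocean', 'tos', '19900101', 0, 1)
    print(temperature_file)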