# coding=utf-8 """Classes to manage cmorized datasets""" import glob import os import re import shutil from datetime import datetime from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, previous_day from bscearth.utils.log import Log from earthdiagnostics.datafile import StorageStatus from earthdiagnostics.diagnostic import Diagnostic from earthdiagnostics.cmorizer import Cmorizer from earthdiagnostics.datamanager import DataManager from earthdiagnostics.frequency import Frequencies, Frequency from earthdiagnostics.modelingrealm import ModelingRealms from earthdiagnostics.utils import TempFile, Utils from earthdiagnostics.variable import VariableType class CMORManager(DataManager): """ Data manager class for CMORized experiments Parameters ---------- config: earthdiagnostics.config.Config """ def __init__(self, config): super(CMORManager, self).__init__(config) self._dic_cmorized = dict() self.find_model_data() self.cmor_path = os.path.join(self.config.data_dir, self.experiment.expid) def find_model_data(self): """ Seek the configured data folders for the experiment data For each folder, it looks at: -/ -// -/// Model has any '-' character removed and is passed to lower """ data_folders = self.config.data_dir.split(':') experiment_folder = self.experiment.model.lower() if experiment_folder.startswith('ec-earth'): experiment_folder = 'ecearth' self.config.data_dir = None for data_folder in data_folders: if os.path.isdir(os.path.join(data_folder, self.experiment.expid)): self.config.data_dir = data_folder break test_folder = os.path.join(data_folder, self.experiment.model.lower().replace('-', '')) if os.path.isdir(os.path.join(test_folder, self.experiment.expid)): self.config.data_dir = test_folder break test_folder = os.path.join(data_folder, self.config.data_type, experiment_folder) if os.path.isdir(os.path.join(test_folder, self.experiment.expid)): self.config.data_dir = test_folder break if not self.config.data_dir: raise Exception('Can not find model data') # noinspection PyUnusedLocal def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, vartype=VariableType.MEAN, possible_versions=None): """ Check if a file exists in the storage Parameters ---------- domain: ModelingRealm var: str startdate: str member: int chunk: int grid: str or None box: Box or None frequency: Frequency or None vartype: VariableType possible_versions: iterable od str or None Returns ------- bool """ cmor_var = self.variable_list.get_variable(var) filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid, None, None) if possible_versions is None: return os.path.isfile(filepath) else: for version in possible_versions: if os.path.isfile(filepath.replace(self.config.cmor.version, version)): return True return False def request_chunk(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None, vartype=None): """ Request a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy Parameters ---------- domain: ModelingRealm var: str startdate: str member: int chunk: int grid: str or None box: Box or None frequency: Frequency or None vartype: VariableType or None Returns ------- DataFile """ if frequency is None: frequency = self.config.frequency cmor_var = self.variable_list.get_variable(var) var = self._get_final_var_name(box, var) filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid, None, None) return self._get_file_from_storage(filepath) def request_year(self, diagnostic, domain, var, startdate, member, year, grid=None, box=None, frequency=None): """ Request a given year for a variavle from a CMOR repository Parameters ---------- diagnostic: Diagnostic domain: ModelingRealm var: str startdate: str member: int year: int grid: str or None box: Box or None frequency: Frequency or None Returns ------- DataFile """ job = MergeYear(self, domain, var, startdate, member, year, grid, box, frequency) job.request_data() job.declare_data_generated() if not job.year_file.job_added: diagnostic.subjobs.append(job) job.year_file.job_added = True return job.year_file def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None, vartype=VariableType.MEAN, diagnostic=None): """ Declare a variable chunk to be generated by a diagnostic Parameters ---------- domain: ModelingRealm var: str startdate: str member: int chunk: int grid: str or None region: Basin or None box: Box or None frequency: Frequency or None vartype: VariableType diagnostic: Diagnostic Returns ------- DataFile """ if frequency is None: frequency = self.config.frequency original_name = var cmor_var = self.variable_list.get_variable(var) if cmor_var: var = cmor_var.short_name final_name = self._get_final_var_name(box, var) filepath = self.get_file_path(startdate, member, domain, final_name, cmor_var, chunk, frequency, grid) netcdf_file = self._declare_generated_file(filepath, domain, final_name, cmor_var, self.config.data_convention, region, diagnostic, grid, vartype, original_name) netcdf_file.frequency = frequency return netcdf_file def declare_year(self, domain, var, startdate, member, year, grid=None, box=None, vartype=VariableType.MEAN, diagnostic=None): """ Declare a variable year to be generated by a diagnostic Parameters ---------- domain: ModelingRealm var: str startdate: str member: int year: int grid: str or None box: Box or None vartype: VariableType diagnostic: Diagnostic Returns ------- DataFile """ original_name = var cmor_var = self.variable_list.get_variable(var) if cmor_var: var = cmor_var.short_name final_name = self._get_final_var_name(box, var) filepath = self.get_file_path(startdate, member, domain, final_name, cmor_var, None, Frequencies.yearly, grid, year=year) netcdf_file = self._declare_generated_file(filepath, domain, final_name, cmor_var, self.config.data_convention, None, diagnostic, grid, vartype, original_name) netcdf_file.frequency = Frequencies.yearly return netcdf_file def get_file_path(self, startdate, member, domain, var, cmor_var, chunk, frequency, grid=None, year=None, date_str=None): """ Return the path to a concrete file Parameters ---------- startdate: str member: int domain: ModelingRealm var: str cmor_var: Variable chunk: int or None frequency: Frequency or str grid: str or None year: int or None date_str: str or None Returns ------- str Raises ------ ValueError If you provide two or more parameters from chunk, year or date_str or none at all """ if frequency is None: frequency = self.config.frequency frequency = Frequency.parse(frequency) options = sum(x is not None for x in (chunk, year, date_str)) if options == 0: raise ValueError('You must provide chunk, year or date_str') elif options > 1: raise ValueError('You must provide only one parameter in chunk, year or date_str') if frequency is None: frequency = self.config.frequency else: frequency = Frequency.parse(frequency) folder_path = self._get_full_cmor_folder_path(startdate, member, domain, var, frequency, grid, cmor_var) file_name = self._get_cmor_file_name(startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid) filepath = os.path.join(folder_path, file_name) return filepath def _get_cmor_file_name(self, startdate, member, domain, var, cmor_var, frequency, chunk, year, date_str, grid, ): if cmor_var is None: cmor_table = domain.get_table(frequency, self.config.data_convention) else: cmor_table = cmor_var.get_table(frequency, self.config.data_convention) time_bound = self._get_time_component(chunk, date_str, frequency, startdate, year) time_bound = '_{0}.nc'.format(time_bound) if self.config.data_convention in ('specs', 'preface'): file_name = '{0}_{1}_{2}_{3}_S{4}_{5}{6}'.format(var, cmor_table.name, self.experiment.model, self.experiment.experiment_name, startdate, self._get_member_str(member), time_bound) elif self.config.data_convention in ('primavera', 'cmip6'): if not grid: if domain in [ModelingRealms.ocnBgchem, ModelingRealms.seaIce, ModelingRealms.ocean]: grid = self.config.cmor.default_ocean_grid else: grid = self.config.cmor.default_atmos_grid file_name = '{0}_{1}_{2}_{3}_{4}_{5}{6}'.format(var, cmor_table.name, self.experiment.model, self.experiment.experiment_name, self._get_member_str(member), grid, time_bound) # Meteofrance else: time_bound = self._get_chunk_time_bounds(startdate, chunk) file_name = '{0}_{1}_{2}_{3}.nc'.format(var, frequency, time_bound, self._get_member_str(member)) return file_name def _get_time_component(self, chunk, date_str, frequency, startdate, year): if chunk is not None: time_bound = self._get_chunk_time_bounds(startdate, chunk) elif year: if frequency != Frequencies.yearly: raise ValueError('Year may be provided instead of chunk only if frequency is "yr"') time_bound = str(year) else: time_bound = date_str return time_bound def _get_full_cmor_folder_path(self, startdate, member, domain, var, frequency, grid, cmor_var): if self.config.data_convention in ('specs', 'preface'): folder_path = os.path.join(self._get_startdate_path(startdate), str(frequency), domain.name, var) if grid: folder_path = os.path.join(folder_path, grid) folder_path = os.path.join(folder_path, self._get_member_str(member)) if self.config.cmor.version: folder_path = os.path.join(folder_path, self.config.cmor.version) elif self.config.data_convention in ('primavera', 'cmip6'): if not self.config.cmor.version: raise ValueError('CMOR version is mandatory for PRIMAVERA and CMIP6') if not grid: if domain in [ModelingRealms.ocnBgchem, ModelingRealms.seaIce, ModelingRealms.ocean]: grid = self.config.cmor.default_ocean_grid else: grid = self.config.cmor.default_atmos_grid if cmor_var is None: table_name = domain.get_table(frequency, self.config.data_convention).name else: table_name = cmor_var.get_table(frequency, self.config.data_convention).name folder_path = os.path.join(self._get_startdate_path(startdate), self._get_member_str(member), table_name, var, grid, self.config.cmor.version) elif self.config.data_convention == 'meteofrance': folder_path = os.path.join(self.config.data_dir, self.experiment.experiment_name, 'H{0}'.format(chr(64 + int(startdate[4:6]))), startdate[0:4]) else: raise ValueError('Data convention {0} not supported'.format(self.config.data_convention)) return folder_path def _get_chunk_time_bounds(self, startdate, chunk): start = parse_date(startdate) chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', self.experiment.calendar) chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', self.experiment.calendar) chunk_end = previous_day(chunk_end, self.experiment.calendar) if self.config.data_convention == 'preface': separator = '_' else: separator = '-' if self.config.data_convention == 'meteofrance': time_bound = "{0:04}{1:02}".format(chunk_start.year, chunk_start.month) else: time_bound = "{0:04}{1:02}{4}{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year, chunk_end.month, separator) return time_bound def link_file(self, domain, var, cmor_var, startdate, member, chunk=None, grid=None, frequency=None, year=None, date_str=None, move_old=False, vartype=VariableType.MEAN): """ Create the link of a given file from the CMOR repository. Parameters ---------- domain: ModelingRealm var: str cmor_var: startdate: str member: int chunk: int or None, optional grid: str or None, optional frequency: Frequency or None, optional year: int or None, optional date_str: str or None, optional move_old: bool, optional vartype: VariableType, optional """ if frequency is None: frequency = self.config.frequency filepath = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency, grid=grid, year=year, date_str=date_str) self.create_link(domain, filepath, frequency, var, grid, move_old, vartype) def create_link(self, domain, filepath, frequency, var, grid, move_old, vartype): """ Create file link Parameters ---------- domain: ModelingRealm filepath: str frequency: Frequency var: str grid: str move_old: bool vartype: VariableType """ if self.config.data_convention == 'meteofrance': return freq_str = frequency.folder_name(vartype) if not grid: grid = 'original' variable_folder = self.get_varfolder(domain, var) vargrid_folder = self.get_varfolder(domain, var, grid) self.lock.acquire() try: if grid == 'original': link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder) Utils.create_folder_tree(link_path) else: link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder) Utils.create_folder_tree(link_path) default_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder) original_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, vargrid_folder.replace('-{0}_f'.format(grid), '-original_f')) if os.path.islink(default_path): os.remove(default_path) elif os.path.isdir(default_path): shutil.move(default_path, original_path) os.symlink(link_path, default_path) if move_old and link_path not in self._checked_vars: self._checked_vars.append(link_path) old_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, 'old_{0}'.format(os.path.basename(link_path))) regex = re.compile(var + '_[0-9]{6,8}\.nc') for filename in os.listdir(link_path): if regex.match(filename): Utils.create_folder_tree(old_path) Utils.move_file(os.path.join(link_path, filename), os.path.join(old_path, filename)) link_path = os.path.join(link_path, os.path.basename(filepath)) if os.path.lexists(link_path): os.remove(link_path) if not os.path.exists(filepath): raise ValueError('Original file {0} does not exists'.format(filepath)) relative_path = os.path.relpath(filepath, os.path.dirname(link_path)) os.symlink(relative_path, link_path) except Exception: raise finally: self.lock.release() def prepare(self): """ Prepare the data to be used by the diagnostic. If CMOR data is not created, it show a warning and closes. In the future, an automatic cmorization procedure will be launched If CMOR data is available but packed, the procedure will unpack it. """ # Check if cmorized and convert if not if self.config.data_convention == 'meteofrance': return for startdate, member in self.experiment.get_member_list(): if not self._unpack_cmor_files(startdate, member): self._cmorize_member(startdate, member) def is_cmorized(self, startdate, member, chunk, domain): """ Check if a chunk domain is cmorized A cache is maintained so only the first check is costly Parameters ---------- startdate: str member: int chunk: int domain: ModelingRealm Returns ------- bool """ identifier = (startdate, member, chunk) if identifier not in self._dic_cmorized: self._dic_cmorized[identifier] = {} self._dic_cmorized[identifier][domain] = self._is_cmorized(startdate, member, chunk, domain) elif domain not in self._dic_cmorized[identifier]: self._dic_cmorized[identifier][domain] = self._is_cmorized(startdate, member, chunk, domain) return self._dic_cmorized[identifier][domain] def _is_cmorized(self, startdate, member, chunk, domain): startdate_path = self._get_startdate_path(startdate) if not os.path.isdir(startdate_path): return False count = 0 if self.config.data_convention == 'specs': for freq in os.listdir(startdate_path): domain_path = os.path.join(startdate_path, freq, domain.name) if os.path.isdir(domain_path): count = self._check_var_presence(domain_path, count, startdate, member, domain, chunk, freq) if count >= self.config.cmor.min_cmorized_vars: return True else: member_path = os.path.join(startdate_path, self._get_member_str(member)) if not os.path.isdir(member_path): return False freq = Frequencies.monthly table = domain.get_table(freq, self.config.data_convention) table_dir = os.path.join(member_path, table.name) if not os.path.isdir(table_dir): return False count = self._check_var_presence(table_dir, count, startdate, member, domain, chunk, freq) if count >= self.config.cmor.min_cmorized_vars: return True return False def _check_var_presence(self, folder, current_count, startdate, member, domain, chunk, freq): for var in os.listdir(folder): cmor_var = self.variable_list.get_variable(var, True) var_path = self.get_file_path(startdate, member, domain, var, cmor_var, chunk, frequency=freq) if os.path.isfile(var_path): current_count += 1 if current_count >= self.config.cmor.min_cmorized_vars: break return current_count def _cmorize_member(self, startdate, member): start_time = datetime.now() member_str = self.experiment.get_member_str(member) Log.info('CMORizing startdate {0} member {1}. Starting at {0}', startdate, member_str, start_time) cmorizer = Cmorizer(self, startdate, member) cmorizer.cmorize_ocean() cmorizer.cmorize_atmos() Log.result('CMORized startdate {0} member {1}! Elapsed time: {2}\n\n', startdate, member_str, datetime.now() - start_time) def _unpack_cmor_files(self, startdate, member): if self.config.cmor.force: return False cmorized = False for chunk in range(1, self.experiment.num_chunks + 1): if not self.config.cmor.force_untar: if self.is_cmorized(startdate, member, chunk, ModelingRealms.atmos) or \ self.is_cmorized(startdate, member, chunk, ModelingRealms.ocean): cmorized = True continue if self._unpack_chunk(startdate, member, chunk): cmorized = True if cmorized: Log.info('Startdate {0} member {1} ready', startdate, member) return cmorized def _unpack_chunk(self, startdate, member, chunk): if not self.config.cmor.chunk_cmorization_requested(chunk): return True filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar.gz') if len(filepaths) > 0: Log.info('Unzipping cmorized data for {0} {1} {2}...', startdate, member, chunk) Utils.unzip(filepaths, True) filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar') if len(filepaths) > 0: Log.info('Unpacking cmorized data for {0} {1} {2}...', startdate, member, chunk) Utils.untar(filepaths, self.cmor_path) self._correct_paths(startdate) self.create_links(startdate, member) return True return False def _get_transferred_cmor_data_filepaths(self, startdate, member, chunk, extension): tar_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', 'cmorfiles') tar_original_files = os.path.join(self.config.data_dir, 'original_files', self.experiment.expid, 'cmorfiles') filepaths = [] for cmor_prefix in ('CMOR?', 'CMOR'): file_name = '{5}_{0}_{1}_{2}_{3}-*.{4}'.format(self.experiment.expid, startdate, self.experiment.get_member_str(member), self.experiment.get_chunk_start_str(startdate, chunk), extension, cmor_prefix) filepaths += glob.glob(os.path.join(tar_path, file_name)) filepaths += glob.glob(os.path.join(tar_path, 'outputs', file_name)) filepaths += glob.glob(os.path.join(tar_original_files, file_name)) filepaths += glob.glob(os.path.join(tar_original_files, 'outputs', file_name)) return filepaths def _correct_paths(self, startdate): self._remove_extra_output_folder() self._fix_model_as_experiment_error(startdate) def _fix_model_as_experiment_error(self, startdate): if self.experiment.experiment_name != self.experiment.model: bad_path = os.path.join(self.cmor_path, self.experiment.institute, self.experiment.model, self.experiment.model) Log.debug('Correcting double model appearance') for (dirpath, _, filenames) in os.walk(bad_path, False): for filename in filenames: if '_S{0}_'.format(startdate) in filename: continue filepath = os.path.join(dirpath, filename) good = filepath good = good.replace('_{0}_output_'.format(self.experiment.model), '_{0}_{1}_S{2}_'.format(self.experiment.model, self.experiment.experiment_name, startdate)) good = good.replace('/{0}/{0}'.format(self.experiment.model), '/{0}/{1}'.format(self.experiment.model, self.experiment.experiment_name)) Utils.move_file(filepath, good) if self.experiment.model != self.experiment.experiment_name: os.rmdir(dirpath) Log.debug('Done') def _remove_extra_output_folder(self): bad_path = os.path.join(self.cmor_path, 'output') if os.path.exists(bad_path): Log.debug('Moving CMOR files out of the output folder') Utils.move_tree(bad_path, self.cmor_path) Log.debug('Done') def create_links(self, startdate, member=None): """ Create links for a gicen startdate or member Parameters ---------- startdate: str member: int or None Returns ------- ValueError: If the data convention is not supported """ if member is not None: member_str = self._get_member_str(member) else: member_str = None Log.info('Creating links for CMOR files ({0})', startdate) path = self._get_startdate_path(startdate) if self.config.data_convention.upper() in ('SPECS', 'APPLICATE'): self._create_links_cmip5(member_str, path) elif self.config.data_convention.upper() in ('CMIP6', 'PRIMAVERA'): self._create_links_cmip6(member_str, path) else: raise ValueError('Dataset convention {0} not supported for massive ' 'link creation'.format(self.config.data_convention)) Log.debug('Links ready') def _create_links_cmip5(self, member_str, path): for freq in os.listdir(path): frequency = Frequency.parse(freq) for domain in os.listdir(os.path.join(path, freq)): for var in os.listdir(os.path.join(path, freq, domain)): for member in os.listdir(os.path.join(path, freq, domain, var)): if member_str is not None and member_str != member: continue for name in os.listdir(os.path.join(path, freq, domain, var, member)): filepath = os.path.join(path, freq, domain, var, member, name) if os.path.isfile(filepath): self.create_link(domain, filepath, frequency, var, "", False, vartype=VariableType.MEAN) else: for filename in os.listdir(filepath): self.create_link(domain, os.path.join(filepath, filename), frequency, var, "", False, vartype=VariableType.MEAN) def _create_links_cmip6(self, member_str, path): for member in os.listdir(path): for table in os.listdir(os.path.join(path, member)): frequency = self.variable_list.tables[table].frequency domain = None for var in os.listdir(os.path.join(path, member, table)): for grid in os.listdir(os.path.join(path, member, table, var)): if member_str is not None and member_str != member: continue for name in os.listdir(os.path.join(path, member, table, var, grid)): filepath = os.path.join(path, member, table, var, grid, name) if os.path.isfile(filepath): self.create_link(domain, filepath, frequency, var, "", False, vartype=VariableType.MEAN) else: for filename in os.listdir(filepath): cmorfile = os.path.join(filepath, filename) self.create_link(domain, cmorfile, frequency, var, "", False, vartype=VariableType.MEAN) def _get_startdate_path(self, startdate): """ Return the path to the startdate's CMOR folder Parameters ---------- startdate: str Returns ------- str """ if self.config.data_convention == 'specs': return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute, self.experiment.model, self.experiment.experiment_name, 'S' + startdate) elif self.config.data_convention == 'preface': return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute, self.experiment.experiment_name, 'S' + startdate) else: return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.config.cmor.activity, self.experiment.institute, self.experiment.model, self.experiment.experiment_name) def _get_member_str(self, member): if self.config.data_convention in ('specs', 'preface'): template = 'r{0}i{1}p1' elif self.config.data_convention in ('primavera', 'cmip6'): template = 'r{0}i{1}p1f1' elif self.config.data_convention == 'meteofrance': return '{0:02d}'.format(member) return template.format(member + 1 - self.experiment.member_count_start, self.config.cmor.initialization_number) class MergeYear(Diagnostic): """ Diagnostic to get all the data for a given year and merge it in a file Parameters ---------- data_manager: DataManager domain: ModelingRealm var: str startdate: str member: int year: int grid: str or None, optional box: Box or None, optional frequency: Frequency or None, optional """ @classmethod def generate_jobs(cls, diags, options): """ Method to generate the required diagnostics from a section of the configuration file Required by the interface, does nothing as this diagnostic is not meant to be configured in the usal way """ pass def __init__(self, data_manager, domain, var, startdate, member, year, grid=None, box=None, frequency=None): super(MergeYear, self).__init__(data_manager) self.chunk_files = [] self.experiment = self.data_manager.experiment self.domain = domain self.var = var self.startdate = startdate self.member = member self.year = year self.grid = grid self.box = box self.frequency = frequency def __str__(self): return 'Merge year data Variable: {0.domain}:{0.var} Startdate: {0.startdate} Member: {0.member} ' \ 'Year: {0.year} Grid: {0.grid} Box: {0.box} Frequency: {0.frequency}'.format(self) def __eq__(self, other): return self.domain == other.domain and self.var == other.var and self.startdate == other.startdate and \ self.member == other.member and self.year == other.year and self.grid == other.grid and \ self.box == other.box and self.frequency == other.frequency def __hash__(self): return hash(str(self)) def request_data(self): """Request all the data required by the diagnostic""" for chunk in self.experiment.get_year_chunks(self.startdate, self.year): self.chunk_files.append(self.request_chunk(self.domain, self.var, self.startdate, self.member, chunk, grid=self.grid, box=self.box, frequency=self.frequency)) def declare_data_generated(self): """Declare all the data generated by the diagnostic""" self.year_file = self.declare_year(self.domain, self.var, self.startdate, self.member, self.year, grid=self.grid, box=self.box) self.year_file.storage_status = StorageStatus.NO_STORE def compute(self): """Create the yearly file for the data""" temp = self._merge_chunk_files() temp2 = self._select_data_of_given_year(temp) self.year_file.set_local_file(temp2) def _select_data_of_given_year(self, data_file): temp2 = TempFile.get() handler = Utils.open_cdf(data_file) times = Utils.get_datetime_from_netcdf(handler) x = 0 first_index = None last_index = None while x < times.size: if times[x].year == self.year: first_index = x break else: x += 1 while x < times.size: if times[x].year != self.year: last_index = x break else: x += 1 if last_index is None: last_index = times.size Utils.nco.ncks(input=data_file, output=temp2, options=['-d time,{0},{1}'.format(first_index, last_index - 1)]) return temp2 def _merge_chunk_files(self): temp = TempFile.get() if len(self.chunk_files) == 1: Utils.copy_file(self.chunk_files[0].local_file, temp) return temp Utils.nco.ncrcat(input=' '.join(self.chunk_files), output=temp) for chunk_file in self.chunk_files: os.remove(chunk_file) return temp