datamanager.py 52.1 KB
Newer Older
import csv
import uuid
from cfunits import Units
from datetime import datetime

import netCDF4
import numpy as np
import os
from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day, add_months, \
    date2str
from earthdiagnostics.constants import Basins
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
from earthdiagnostics.utils import Utils, TempFile


class DataManager(object):
    """
    Class to manage the data repositories
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    def __init__(self, config):
        self.config = config
        self.experiment = config.experiment
        self._checked_vars = list()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        UnitConversion.load_conversions()
    # noinspection PyPep8Naming
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """
        Prepares the data to be used by the diagnostic.

        If CMOR data is not created, it show a warning and closes. In the future, an automatic cmorization procedure
        will be launched

        If CMOR data is available but packed, the procedure will unpack it.

        :return:
        """
        # Check if cmorized and convert if not
        created = False
        for startdate, member in self.experiment.get_member_list():
            member_str = self.experiment.get_member_str(member)
            if self.config.cmor.force or not self._is_cmorized(startdate, member):
                created = True
                Log.info('CMORizing startdate {0} member {1}', startdate, member_str)
                    errors += self._unpack_ocean_files('MMO', startdate, member)
                    errors += self._unpack_ocean_files('diags', startdate, member)
                grb_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', startdate,
                                        member_str, 'outputs', '*.grb')
                gribfiles = glob.glob(grb_path)
                if len(gribfiles) == 0:
                        os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', startdate, 
                                     member_str, 'outputs', 'MMA*'))
                    tar_files.sort()
                    count = 1
                    for tarfile in tar_files:
                        Log.info('Unpacking atmospheric file {0}/{1}'.format(count, len(tar_files)))
                        errors += self._unpack_tar(tarfile, startdate, member)
                        Log.result('Atmospheric file {0}/{1} finished'.format(count, len(tar_files)))
                    self._cmorize_grib(startdate, member)
                Log.result('CMORized startdate {0} member {1}!\n\n', startdate, member_str)

        if created:
            for error in errors:
                Log.error('File {0} could not be unzipped.', error)
        for startdate, member in self.experiment.get_member_list():
            member_path = os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles')
            Log.info('Preparing CMOR files for startdate {0} and member {1}'.format(startdate, member))
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

            filepaths = glob.glob(os.path.join(member_path, '*.gz'))
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

            if len(filepaths) == 0:
                continue

            self._unpack_cmorfiles(filepaths, member_path)
    def _unpack_ocean_files(self, prefix, startdate, member):
        tar_folder = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', startdate,
                                  self.experiment.get_member_str(member), 'outputs', '{0}*'.format(prefix))
        tar_files = glob.glob(tar_folder)
        tar_files.sort()
            Log.info('Unpacking oceanic file {0}/{1}'.format(count, len(tar_files)))
            errors += self._unpack_tar(tarfile, startdate, member)
            Log.result('Oceanic file {0}/{1} finished'.format(count, len(tar_files)))
    def _get_grib_filename(self, grid, month):
        return 'ICM{0}{1}+{2}.grb'.format(grid, self.experiment.expid, date2str(month)[:-2])
    def _cmorize_grib(self, startdate, member):
        atmos_timestep = None
        chunk_start = parse_date(startdate)
        member_str = self.experiment.get_member_str(member)
        data_folder = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', startdate, 
                                   member_str, 'outputs')
        while os.path.exists(os.path.join(data_folder, self._get_grib_filename('GG', chunk_start))) or \
                os.path.exists(os.path.join(data_folder, self._get_grib_filename('SH', chunk_start))):
            chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', 'standard')
            chunk_end = previous_day(chunk_end, 'standard')
            Log.info('CMORizing chunk {0}-{1}', date2str(chunk_start), date2str(chunk_end))
                Log.info('Processing {0} variables', grid)
                if not os.path.exists(os.path.join(data_folder, self._get_grib_filename(grid, chunk_start))):
                    continue

                    current_month = add_months(chunk_start, month, 'standard')
                    original_gribfile = os.path.join(data_folder, self._get_grib_filename(grid, current_month))
                    Log.info('Processing month {1}', grid, date2str(current_month))
                    gribfile = os.path.join(self.config.scratch_dir, os.path.basename(original_gribfile))
                    if not os.path.isfile(gribfile):
                        Log.info('Copying file...', grid, date2str(current_month))
                        shutil.copy(original_gribfile, gribfile)

                    if atmos_timestep is None:
                        atmos_timestep = self._get_atmos_timestep(gribfile)
                                                 self._get_grib_filename(grid,
                                                                         add_months(current_month, -1, 'standard')))
                    if os.path.exists(prev_gribfile):
                        self._merge_grib_files(current_month, prev_gribfile, gribfile)
                        full_file = 'ICM'
                    else:
                        full_file = gribfile

                    # remap on regular Gauss grid
                    if grid == 'SH':
                        Utils.cdo.splitparam(input='-sp2gpl {0}'.format(full_file), output=gribfile + '_',
                                             options='-f nc4')
                        Utils.cdo.splitparam(input=full_file, output=gribfile + '_', options='-R -f nc4')
                        # total precipitation (remove negative values)
                        Utils.cdo.setcode(228, input='-setmisstoc,0 -setvrange,0,Inf -add '
                                                     '{0}_{{142,143}}.128.nc'.format(gribfile),
                                          output='{0}_228.128.nc'.format(gribfile))
                    Utils.remove_file('ICM')
                    next_gribfile = os.path.join(data_folder,
                                                 self._get_grib_filename(grid,
                                                                         add_months(current_month, 1, 'standard')))

                    if not os.path.exists(next_gribfile):
                        os.remove(gribfile)

                    cdo_reftime = parse_date(startdate).strftime('%Y-%m-%d,00:00')

                    self._ungrib_vars(cdo_reftime, gribfile, current_month.month, '{0}hr'.format(atmos_timestep))
                    self._ungrib_vars(cdo_reftime, gribfile, current_month.month, '1d')
                    self._ungrib_vars(cdo_reftime, gribfile, current_month.month, '1m')
                    for splited_file in glob.glob('{0}_*.128.nc'.format(gribfile)):
                    Log.result('Month {0}, {1} variables finished', date2str(current_month), grid)
                self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, grid, '1m')
                self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, grid, '1d')
                self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, grid,
                                              '{0}hr'.format(atmos_timestep))
            chunk_start = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', 'standard')
    @staticmethod
    def _merge_grib_files(current_month, prev_gribfile, gribfile):
        Log.info('Merging data from different files...')
        fd = open('rules_files', 'w')
        fd.write('if (dataDate >= {0.year}{0.month:02}01) {{ write ; }}\n'.format(current_month))
        fd.close()
        # get first timestep for each month from previous file (if possible)
        if os.path.exists('ICM'):
            os.remove('ICM')
        Utils.execute_shell_command('grib_filter -o ICM rules_files '
                                    '{0} {1}'.format(os.path.basename(prev_gribfile),
                                                     os.path.basename(gribfile)))
    def _get_atmos_timestep(self, gribfile):
        Log.info('Getting timestep...')
        grib_handler = pygrib.open(gribfile)
        mes1 = grib_handler.message(1)
        mes2 = grib_handler.readline()
        while mes2.analDate == mes1.analDate:
        atmos_timestep = mes2.analDate - mes1.analDate
        atmos_timestep = int(atmos_timestep.total_seconds() / 3600)
        self.atmos_timestep = atmos_timestep
        return atmos_timestep

    def _ungrib_vars(self, cdo_reftime, gribfile, month, frequency):
        Log.info('Preparing {0} variables'.format(frequency))
        var_codes = self.config.cmor.get_variables(frequency)
        for var_code in var_codes:
            if not os.path.exists('{0}_{1}.128.nc'.format(gribfile, var_code)):
            cdo_operator = '-selmon,{0}'.format(month)
            if frequency in ('month', 'monthly', 'mon', '1m'):
                if var_code == 201:
                    cdo_operator = "-monmean -daymax {0}".format(cdo_operator)
                elif var_code == 202:
                    cdo_operator = "-monmean -daymax {0}".format(cdo_operator)
                else:
                    cdo_operator = "-monmean {0} ".format(cdo_operator)
            elif frequency in ('day', 'daily', '1d'):
                if var_code == 201:
                    cdo_operator = "-daymax {0} ".format(cdo_operator)
                elif var_code == 202:
                    cdo_operator = "-daymin {0} ".format(cdo_operator)
                else:
                    cdo_operator = "-daymean {0} ".format(cdo_operator)
            if var_code in (144, 146, 147, 169, 175, 176, 177, 179, 180, 181, 182, 201, 202, 205, 212, 228):
                cdo_operator = '{0} -shifttime,-{1}hours'.format(cdo_operator, self.atmos_timestep)
            if var_code == 129:
                # geopotential
                new_units = "m"
                cdo_operator = "-divc,9.81 {0}".format(cdo_operator)
            elif var_code in (146, 147, 169, 175, 176, 177, 179, 212):
                # radiation
                cdo_operator = "-divc,{0} {1}".format(self.atmos_timestep * 3600, cdo_operator)
                cdo_operator = "-divc,{0} {1}".format(self.atmos_timestep * 3600, cdo_operator)
                # precipitation/evaporation/runoff
                new_units = "kg m-2 s-1"
                cdo_operator = "-mulc,1000 -divc,{0}".format(self.atmos_timestep * 3600)
            levels = self.config.cmor.get_levels(frequency, var_code)
            if levels:
                cdo_operator = "{0} -sellevel,{1}".format(cdo_operator, levels)

            Utils.execute_shell_command('cdo -t ecmwf setreftime,{0} '
                                        '{2}_{3}_{4}.nc'.format(cdo_reftime, cdo_operator,
                                                                gribfile, var_code, frequency))
            h_var_file = '{0}_{1}_{2}.nc'.format(gribfile, var_code, frequency)

            handler = Utils.openCdf(h_var_file)
            var_name = None
            for key in handler.variables.keys():
                if key + '_2' in handler.variables and key not in handler.dimensions:
                    var_name = key
            handler.close()
                Utils.nco.ncks(input='{0}_{1}_1m.nc'.format(gribfile, var_code),
                               output='{0}_{1}_1m.nc'.format(gribfile, var_code),
    def _merge_and_cmorize_atmos(self, startdate, member, chunk_start, chunk_end, grid, frequency):
        merged_file = 'MMA_{0}_{1}_{2}_{3}.nc'.format(frequency, date2str(chunk_start), date2str(chunk_end), grid)
        files = glob.glob(os.path.join(self.config.scratch_dir,
                                       '{0}_*_{1}.nc'.format(self._get_grib_filename(grid, chunk_start), frequency)))
        for first_file in files:
            shutil.move(first_file, merged_file)
            current_month = add_months(chunk_start, 1, 'standard')
            while current_month < chunk_end:
                month_file = first_file.replace('+{0}.grb'.format(date2str(chunk_start)[:-2]),
                                                '+{0}.grb'.format(date2str(current_month)[:-2]))
                Utils.concat_variables(month_file, merged_file, True)
                current_month = add_months(current_month, 1, 'standard')

            self._cmorize_nc_file(merged_file, member, startdate)
    def _unpack_cmorfiles(self, filepaths, member_path):
        threads = list()
        numthreads = Utils.available_cpu_count()
        for numthread in range(0, numthreads):
            t = threading.Thread(target=DataManager._unzip,
                                 args=([filepaths[numthread::numthreads]]))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        filepaths = glob.glob(os.path.join(member_path, '*.tar')).sort()
        for numthread in range(0, numthreads):
            t = threading.Thread(target=DataManager._untar,
                                 args=(filepaths[numthread::numthreads], member_path))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        if self.experiment.experiment_name != self.experiment.model:
            bad_path = os.path.join(member_path, self.experiment.institute, self.experiment.model, 
                                    self.experiment.model)
            for (dirpath, dirnames, filenames) in os.walk(bad_path, False):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    good = filepath.replace('_{0}_output_'.format(self.experiment.model),
                                            '_{0}_{1}_'.format(self.experiment.model, self.experiment.experiment_name))
                    good = good.replace('/{0}/{0}'.format(self.experiment.model),
                                        '/{0}/{1}'.format(self.experiment.model,
                                                          self.experiment.experiment_name))

                    Utils.move_file(filepath, good)
                os.rmdir(dirpath)
        good_dir = os.path.join(member_path, self.experiment.institute, self.experiment.model,
                                self.experiment.experiment_name)
        for sdate in os.listdir(good_dir):
            for (dirpath, dirnames, filenames) in os.walk(os.path.join(good_dir, sdate), False):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    good = filepath.replace('_{0}_{1}_r'.format(self.experiment.model,
                                                                self.experiment.experiment_name, sdate),
                                            '_{0}_{1}_{2}_r'.format(self.experiment.model,
                                                                    self.experiment.experiment_name, sdate))
                    if good != filepath:
                        Log.info('Moving {0} to {1}'.format(filename, good))
                        Utils.move_file(filepath, good)

    def _unpack_tar(self, tarfile, startdate, member):
        Log.info('Unpacking {0}', tarfile)

        scratch_dir = os.path.join(self.config.scratch_dir, 'CMOR')

        if os.path.exists(scratch_dir):
            shutil.rmtree(scratch_dir)
        self._untar((tarfile,), scratch_dir)
        errors = self._unzip(glob.glob(os.path.join(scratch_dir, '*.gz')))
        if os.path.basename(tarfile).startswith('MMA'):
            temp = TempFile.get()
            for filename in glob.glob(os.path.join(scratch_dir, 'MMA_*_SH_*.nc')):
                Utils.cdo.sp2gpl(options='-O', input=filename, output=temp)
                shutil.move(temp, filename)

            sh_files = glob.glob(os.path.join(scratch_dir, 'MMA_*_SH_*.nc'))
            Utils.cdo.mergetime(input=sh_files, output=os.path.join(scratch_dir, 'sh.nc'))

            gg_files = glob.glob(os.path.join(scratch_dir, 'MMA_*_GG_*.nc'))
            Utils.cdo.mergetime(input=gg_files, output=os.path.join(scratch_dir, 'gg.nc'))

            for filename in sh_files + gg_files:
                os.remove(filename)

            Utils.nco.ncks(input=os.path.join(scratch_dir, 'sh.nc'),
                           output=os.path.join(scratch_dir, 'gg.nc'), options='-A')
            os.remove(os.path.join(scratch_dir, 'sh.nc'))

            tar_startdate = tarfile[0:-4].split('_')[5].split('-')
            new_name = 'MMA_1m_{0[0]}_{0[1]}.nc'.format(tar_startdate)
            shutil.move(os.path.join(scratch_dir, 'gg.nc'), os.path.join(scratch_dir, new_name))

        for filename in glob.glob(os.path.join(scratch_dir, '*.nc')):
            self._cmorize_nc_file(filename, member, startdate)

    def _cmorize_nc_file(self, filename, member, startdate):
        Log.info('Processing file {0}', filename)
        temp = TempFile.get()
        Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filename, temp])
        shutil.move(temp, filename)
        file_parts = os.path.basename(filename).split('_')
        if self.experiment.expid in [file_parts[1], file_parts[2]]:
            frequency = 'm'
        else:
            frequency = file_parts[1][1].lower()
        variables = dict()
        variables['time_counter'] = 'time'
        variables['time_counter_bnds'] = 'time_bnds'
        variables['tbnds'] = 'bnds'
        variables['nav_lat'] = 'lat'
        variables['nav_lon'] = 'lon'
        variables['x'] = 'i'
        variables['y'] = 'j'
        Utils.rename_variables(filename, variables, False, True)
        handler = Utils.openCdf(filename)
        self._add_common_attributes(frequency, handler, member, startdate)
        self._update_time_variables(handler, startdate)
        handler.sync()
        temp = TempFile.get()
        Log.info('Splitting file {0}', filename)
        for variable in handler.variables.keys():
            if variable in ('lon', 'lat', 'time', 'time_bnds', 'leadtime', 'lev', 'icethi',
                            'deptht', 'depthu', 'depthw', 'depthv', 'time_centered', 'time_centered_bounds',
                            'deptht_bounds', 'depthu_bounds', 'depthv_bounds', 'depthw_bounds',
                            'time_counter_bounds', 'ncatice',
                            'nav_lat_grid_V', 'nav_lat_grid_U', 'nav_lat_grid_T',
                            'nav_lon_grid_V', 'nav_lon_grid_U', 'nav_lon_grid_T',
                            'depth', 'depth_2', 'depth_3', 'depth_4',
                            'mlev', 'hyai', 'hybi', 'hyam', 'hybm'):
            self.extract_variable(filename, handler, frequency, member, startdate, temp, variable)
        Log.result('File {0} cmorized!', filename)
        handler.close()
        os.remove(filename)
    def extract_variable(self, file_path, handler, frequency, member, startdate, temp, variable):
        """
        Extracts a variable from a file and creates the CMOR file

        :param file_path: path to the file
        :type file_path: str
        :param handler: netCDF4 handler for the file
        :type handler: netCDF$.Dataset
        :param frequency: variable's frequency
        :type frequency: str
        :param member: member
        :type member: int
        :param startdate: startdate
        :type startdate: str
        :param temp: temporal file to use
        :type temp: str
        :param variable: variable's name
        :type variable: str
        """
        file_parts = os.path.basename(file_path).split('_')
        var_cmor = Variable.get_variable(variable)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        if frequency == 'd':
            frequency = 'day'
        elif frequency == 'm':
            frequency = 'mon'
        elif frequency == 'h':
            frequency = '6hr'
        else:
            raise Exception('Frequency {0} not supported'.format(frequency))
        Utils.nco.ncks(input=file_path, output=temp, options='-v {0}'.format(variable))
        if var_cmor.domain == 'ocean':
            Utils.rename_variables(temp, {'deptht': 'lev', 'depthu': 'lev', 'depthw': 'lev', 'depthv': 'lev',
                                          'depth': 'lev'}, False, True)
        elif var_cmor.domain in ('land', 'landIce'):
            Utils.rename_variables(temp, {'depth': 'sdepth', 'depth_2': 'sdepth', 'depth_3': 'sdepth',
                                          'depth_4': 'sdepth'}, False, True)
            Utils.rename_variables(temp, {'depth': 'plev'}, False, True)

        handler_cmor = Utils.openCdf(temp)
        Utils.copy_variable(handler, handler_cmor, 'lon', False)
        Utils.copy_variable(handler, handler_cmor, 'lat', False)
        if 'time' in handler_cmor.dimensions.keys():
            Utils.copy_variable(handler, handler_cmor, 'leadtime', False)
        handler_cmor.close()

        if var_cmor.basin is None:
            region = None
        else:
            region = var_cmor.basin.fullname

        if file_parts[0] == self.experiment.expid or file_parts[0].startswith('ORCA') or \
                file_parts[0] in ('MMA', 'MMO'):
            # Model output
            date_str = '{0}-{1}'.format(file_parts[2][0:6], file_parts[3][0:6])
            # Files generated by the old version of the diagnostics
            date_str = '{0}-{1}'.format(file_parts[4][0:6], file_parts[5][0:6])
        else:
            Log.error('Variable {0} can not be cmorized. Original filename does not match a recognized pattern',
                      var_cmor.short_name)
        self.send_file(temp, var_cmor.domain, var_cmor.short_name, startdate, member,
                       frequency=frequency, rename_var=variable,
                       region=region, move_older=True)

    @staticmethod
    def _update_time_variables(handler, startdate):
        time_var = handler.variables['time']
        times = Utils.get_datetime_from_netcdf(handler)
        if type(times[0]) is not datetime:
            for x in range(0, times.shape[0]):
                times[x] = times[x]._to_real_datetime()
        time_var[:] = netCDF4.date2num(times, 'days since 1850-01-01', 'standard')
        if 'axis_nbounds' in handler.dimensions:
            handler.renameDimension('axis_nbounds', 'bnds')

        if 'time_counter_bounds' in handler.variables:
            handler.renameVariable('time_counter_bounds', 'time_bnds')
            handler.sync()
        if 'time_bnds' in handler.variables:
            time_bounds_var = handler.variables['time_bnds']
            time_var.bounds = "time_bnds"

            time_bounds = Utils.get_datetime_from_netcdf(handler, 'time_bnds')
            if type(time_bounds[0, 0]) is not datetime:
                for x in range(0, time_bounds.shape[0]):
                    for y in range(0, time_bounds.shape[1]):
                        time_bounds[x, y] = time_bounds[x, y]._to_real_datetime()
            time_bounds_var[:] = netCDF4.date2num(time_bounds, 'days since 1850-01-01', 'standard')
        time_var.units = 'days since 1850-01-01'
        time_var.time_origin = "1850-01-01"
        time_var.calendar = 'standard'
        time_var.long_name = "Verification time of the forecast"
        time_var.standard_name = "time"
        time_var.axis = "T"
        if 'leadtime' in handler.variables:
            var = handler.variables['leadtime']
        else:
            var = handler.createVariable('leadtime', float, 'time')
        var.units = "days"
        var.long_name = "Time elapsed since the start of the forecast"
        var.standard_name = "forecast_period"
        leadtime = (Utils.get_datetime_from_netcdf(handler) - parse_date(startdate))
        for lt in range(0, leadtime.shape[0]):
            var[lt] = leadtime[lt].days

    def _add_common_attributes(self, frequency, handler, member, startdate):
        experiment = self.config.experiment
        handler.associated_experiment = cmor.associated_experiment
        handler.batch = '{0}{1}'.format(experiment.institute, datetime.now().strftime('%Y-%m-%d(T%H:%M:%SZ)'))
        handler.contact = 'Pierre-Antoine Bretonnière, pierre-antoine.bretonniere@bsc.es , ' \
                          'Javier Vegas-Regidor, javier.vegas@bsc.es '
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler.Conventions = 'CF-1.6'
        handler.creation_date = datetime.now().strftime('%Y-%m-%d(T%H:%M:%SZ)')
        handler.experiment_id = experiment.experiment_name
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler.forecast_reference_time = parse_date(startdate).strftime('%Y-%m-%d(T%H:%M:%SZ)')
        if frequency == 'd':
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            handler.frequency = 'day'
        elif frequency == 'm':
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            handler.frequency = 'mon'
        handler.institute_id = experiment.institute
        handler.institution = experiment.institute
        handler.initialization_method = cmor.initialization_method
        handler.initialization_description = cmor.initialization_description
        handler.physics_version = cmor.physics_version
        handler.physics_description = cmor.physics_description
        handler.model_id = experiment.model
        handler.project_id = 'SPECS'
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        handler.realization = str(member + 1)
        handler.startdate = 'S{0}'.format(startdate)
        handler.tracking_id = str(uuid.uuid1())
        handler.title = "{0} model output prepared for SPECS {1}".format(experiment.model, experiment.experiment_name)
        for filepath in files:
            Log.debug('Unzipping {0}', filepath)
            try:
                Utils.execute_shell_command('gunzip {0}'.format(filepath))
            except Exception as ex:
                Log.error('Can not unzip {0}: {1}', filepath, ex)
                errors.append(filepath)
        return errors

    @staticmethod
    def _untar(files, member_path):
        for filepath in files:
            Log.debug('Unpacking {0}', filepath)
            Utils.execute_shell_command('tar -xvf {0} -C {1}'.format(filepath, member_path))
    def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """
        Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy

        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :return: path to the copy created on the scratch folder
        :rtype: str
        """
        domain_abbreviation = self.domain_abbreviation(domain, frequency)

        start = parse_date(startdate)
        member_plus = str(member + 1)
        member_path = os.path.join(self.get_startdate_path(startdate), frequency, domain)
        chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', 'standard')
        chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', 'standard')
        chunk_end = previous_day(chunk_end, 'standard')

        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()

        if grid:
            var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
        else:
            var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))

        filepath = os.path.join(var_path, '{0}_{1}_{3}_{4}_S{5}_r{6}i1p1_'
                                          '{7}-{8}.nc'.format(var, domain_abbreviation, frequency,
                                                              self.experiment.model,
                                                              self.experiment.experiment_name, startdate, member_plus,
                                                              "{0:04}{1:02}".format(chunk_start.year,
                                                                                    chunk_start.month),
                                                              "{0:04}{1:02}".format(chunk_end.year,
                                                                                    chunk_end.month)))

        temp_path = TempFile.get()
        shutil.copyfile(filepath, temp_path)
        return temp_path

    def get_startdate_path(self, startdate):
        """
        Returns the path to the startdate's CMOR folder
        :param startdate: target startdate
        :type startdate: str
        :return: path to the startdate's CMOR folder
        :rtype: str
        """
        return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute,
                            self.experiment.model, self.experiment.experiment_name, 'S' + startdate)
    def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
                  rename_var=None, frequency=None, year=None, date_str=None, move_older=False):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """
        Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
        with already existing ones as needed

        :param date_str:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param rename_var: if exists, the given variable will be renamed to the one given by var
        :type rename_var: str
        :param filetosend: path to the file to send to the CMOR repository
        :type filetosend: str
        :param region: specifies the region represented by the file. If it is defined, the data will be appended to the
            CMOR repository as a new region in the file or will overwrite if region was already present
        :type region: str
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :return: path to the copy created on the scratch folder
        :rtype: str
        """
        original_var = var
        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()

        if rename_var:
            Utils.rename_variable(filetosend, rename_var, var)
        elif original_var != var:
            Utils.rename_variable(filetosend, original_var, var)
        domain_abreviattion = self.domain_abbreviation(domain, frequency)
        start = parse_date(startdate)
        member_plus = str(member + 1)
        member_path = os.path.join(self.get_startdate_path(startdate), frequency, domain)
            chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', 'standard')
            chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', 'standard')
            chunk_end = previous_day(chunk_end, 'standard')

            time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
                                                            chunk_end.month)

        elif year is not None:
            if frequency is not 'yr':
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            time_bound = str(year)
        elif date_str is not None:
            time_bound = date_str
        else:
            raise ValueError('Chunk and year can not be None at the same time')

        if grid:
            var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
        else:
            var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))

        filepath = os.path.join(var_path, '{0}_{1}_{2}_{3}_S{4}_r{5}i1p1_'
                                          '{6}.nc'.format(var, domain_abreviattion, self.experiment.model,
                                                          self.experiment.experiment_name,
            Utils.convert2netcdf4(filetosend)
                handler = Utils.openCdf(filetosend)
                handler.createDimension('region')
                var_region = handler.createVariable('region', str, 'region')
                var_region[0] = region

                original_var = handler.variables[var]
                new_var = handler.createVariable('new_var', original_var.datatype,
                                                 original_var.dimensions + ('region',))
                new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
                value = original_var[:]
                new_var[..., 0] = value
                handler.close()

                Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var))
                Utils.rename_variable(filetosend, 'new_var', var)
            else:
                temp = TempFile.get()
                shutil.copyfile(filepath, temp)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
                handler = Utils.openCdf(temp)
                handler_send = Utils.openCdf(filetosend)
                value = handler_send.variables[var][:]
                var_region = handler.variables['region']
                basin_index = np.where(var_region[:] == region)
                if len(basin_index[0]) == 0:
                    var_region[var_region.shape[0]] = region
                    basin_index = var_region.shape[0] - 1

                else:
                    basin_index = basin_index[0][0]

                handler.variables[var][..., basin_index] = value
                handler.close()
                handler_send.close()
                Utils.move_file(temp, filetosend)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            Utils.nco.ncks(input=filetosend, output=filetosend, options='-O --fix_rec_dmn region')
        temp = TempFile.get()
        Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filetosend, temp])
        shutil.move(temp, filetosend)

        if cmor_var:
            handler = Utils.openCdf(filetosend)
            var_handler = handler.variables[var]
            var_handler.standard_name = cmor_var.standard_name
            var_handler.long_name = cmor_var.long_name
            var_handler.short_name = cmor_var.short_name
            handler.modeling_realm = cmor_var.domain
            handler.table_id = 'Table {0} (December 2013)'.format(self.domain_abbreviation(cmor_var.domain, frequency))
            if cmor_var.units:
                if 'units' in var_handler.ncattrs():
                    if var_handler.units == 'PSU':
                        var_handler.units = 'psu'
                    if var_handler.units == 'C' and cmor_var.units == 'K':
                        var_handler.units = 'deg_C'
                        try:
                            new_unit = Units(cmor_var.units)
                            old_unit = Units(var_handler.units)

                            var_handler[:] = Units.conform(var_handler[:], old_unit, new_unit, inplace=True)

                            if 'valid_min' in var_handler.ncattrs():
                                var_handler.valid_min = Units.conform(float(var_handler.valid_min), old_unit, new_unit,
                                                                      inplace=True)
                            if 'valid_max' in var_handler.ncattrs():
                                var_handler.valid_max = Units.conform(float(var_handler.valid_max), old_unit, new_unit,
                                                                      inplace=True)
                        except ValueError:
                            factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
                                                                                         cmor_var.units)
                            var_handler[:] = var_handler[:] * factor + offset
                            if 'valid_min' in var_handler.ncattrs():
                                var_handler.valid_min = float(var_handler.valid_min) * factor + offset
                            if 'valid_max' in var_handler.ncattrs():
                                var_handler.valid_max = float(var_handler.valid_max) * factor + offset
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                    var_handler.units = cmor_var.units
            if 'lev' in handler.variables:
                handler.variables['lev'].short_name = 'lev'
                if domain == 'ocean':
                    handler.variables['lev'].standard_name = 'depth'
            if 'lon' in handler.variables:
                handler.variables['lon'].short_name = 'lon'
                handler.variables['lon'].standard_name = 'longitude'
            if 'lat' in handler.variables:
                handler.variables['lat'].short_name = 'lat'
                handler.variables['lat'].standard_name = 'latitude'

            if cmor_var.valid_min != '':
                valid_min = '-a valid_min, {0}, o, {1}, "{2}" '.format(var, var_type.char, cmor_var.valid_min)
            else:
                valid_min = ''

            if cmor_var.valid_max != '':
                valid_max = '-a valid_max, {0}, o, {1}, "{2}" '.format(var, var_type.char, cmor_var.valid_max)
            else:
                valid_max = ''

            Utils.nco.ncatted(input=filetosend, output=filetosend,
                              options='-O -a _FillValue,{0},o,{1},"1.e20" '
                                      '-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(var, var_type.char,
                                                                                        valid_min, valid_max))
        variables = dict()
        variables['x'] = 'i'
        variables['y'] = 'j'
        variables['nav_lat_grid_V'] = 'lat'
        variables['nav_lon_grid_V'] = 'lon'
        variables['nav_lat_grid_U'] = 'lat'
        variables['nav_lon_grid_U'] = 'lon'
        variables['nav_lat_grid_T'] = 'lat'
        variables['nav_lon_grid_T'] = 'lon'
        Utils.rename_variables(filetosend, variables, False, True)

        Utils.move_file(filetosend, filepath)
        self._create_link(domain, filepath, frequency, var, grid, move_older)
        """
        Corrects domain capitalization
        :param domain: domain name
        :type domain: str
        :return: domain with correct capitalization
        :rtype: str
        """
        domain = domain.lower()
        if domain == 'seaice':
            return 'seaIce'
        elif domain == 'landice':
            return 'landIce'
        return domain

    def _create_link(self, domain, filepath, frequency, var, grid, move_older):
        if frequency in ('d', 'daily', 'day'):
            freq_str = 'daily_mean'
        else:
            freq_str = 'monthly_mean'

        if grid:
            var = '{0}-{1}'.format(var, grid)

        if domain in ['ocean', 'seaIce']:
            variable_folder = '{0}_f{1}h'.format(var, self.experiment.ocean_timestep)
            variable_folder = '{0}_f{1}h'.format(var, self.atmos_timestep)
        link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
        if not os.path.exists(link_path):
            # This can be a race condition
            # noinspection PyBroadException
            try:
                os.makedirs(link_path)
            except Exception:
                pass
        elif move_older:
            if link_path not in self._checked_vars:
                old_path = link_path + '_old'
                regex = re.compile(var + '_[0-9]{6,8}\.nc')
                for filename in os.listdir(link_path):
                    if regex.match(filename):
                        if not os.path.exists(old_path):
                            # This can be a race condition
                            # noinspection PyBroadException
                            try:
                                os.makedirs(old_path)
                            except Exception:
                                pass
                        Utils.move_file(os.path.join(link_path, filename),
                                        os.path.join(old_path, filename))
                    self._checked_vars.append(link_path)

        link_path = os.path.join(link_path, os.path.basename(filepath))
        if os.path.lexists(link_path):
            os.remove(link_path)

    @staticmethod
    def domain_abbreviation(domain, frequency):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """
        Returns the table name for a domain-frequency pair
        :param domain: variable's domain
        :type domain: str
        :param frequency: variable's frequency
        :type frequency: str
        :return: variable's table name
        :rtype: str
        """
        if frequency == 'mon':
            if domain == 'seaIce':
                domain_abreviattion = 'OImon'
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            elif domain == 'landIce':
                domain_abreviattion = 'LImon'
            else:
                domain_abreviattion = domain[0].upper() + 'mon'
        elif frequency == '6hr':
            domain_abreviattion = '6hrPlev'
        else:
            domain_abreviattion = 'day'
        return domain_abreviattion

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """
        Gets all the data corresponding to a given year from the CMOR repository to the scratch folder as one file and
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        returns the path to the scratch's copy.

        :param year: year to retrieve
        :type year: int
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :return: path to the copy created on the scratch folder
        :rtype: str
        """
        for chunk in self.experiment.get_year_chunks(startdate, year):
            chunk_files.append(self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box))

        if len(chunk_files) > 1:
            temp = TempFile.get()
            Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
            for chunk_file in chunk_files:
                os.remove(chunk_file)
        else:
            temp = chunk_files[0]
        temp2 = TempFile.get()
        handler = Utils.openCdf(temp)
        time = Utils.get_datetime_from_netcdf(handler)
        handler.close()
        start = None
        end = None
        for x in range(0, len(time)):
            date = time[x]
            if date.year == year:
                if date.month == 1: