# coding: latin-1
import Queue
import csv
import uuid
from datetime import datetime

import netCDF4
import numpy as np
import glob
import os
import shutil
import threading
from autosubmit.config.log import Log
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day, add_months, \
    date2str
from basins import Basins
from earthdiagnostics.utils import Utils, TempFile


class DataManager(object):
    def __init__(self, institution, model, expid, datafolder, frequency, chunk_size, experiment_name, num_chunks,
                 scratch_dir, nfrp, member_digits, calendar='standard'):
        """

        :param institution:
        :param model:
        :param expid:
        :param datafolder:
        :param frequency:
        :param chunk_size:
        :param experiment_name:
        :param num_chunks:
        :param calendar:
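
        A construction sketch (every value below is hypothetical)::

            manager = DataManager('BSC-ES', 'EC-Earth', 'a001', '/esarchive/exp', 'mon',
                                  12, 'historical', 2, '/scratch/tmp', 6, 1)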
        """
        self.initialization_method = 'to be filled'
        self.initialization_description = 'to be filled'
        self.physics_version = 'to be filled'
        self.physics_description = 'to be filled'
        self.associated_model = 'to be filled'
        self.source = 'to be filled'
        self.associated_experiment = 'to be filled'
        self.institution = institution
        self.model = model
        self.expid = expid
        self.data_dir = datafolder
        self.frequency = frequency
        self.chunk_size = chunk_size
        self.experiment_name = experiment_name
        self.add_startdate = True
        self.add_name = True
        self.num_chunks = num_chunks
        self.calendar = calendar
        self.scratch_dir = scratch_dir
        self.nfrp = nfrp
        self.member_digits = member_digits

    # noinspection PyPep8Naming
    def prepare_CMOR_files(self, startdates, members, force_rebuild):
        """
        Prepares the data to be used by the diagnostic.

        If the CMOR data has not been created, it shows a warning and exits. In the future, an automatic
        cmorization procedure will be launched.

        If CMOR data is available but packed, the procedure will unpack it.

        :param force_rebuild: if True, forces the creation of the CMOR files
        :type force_rebuild: bool
        :param startdates: list of startdates that will be used by the diagnostics
        :type startdates: list[str]
        :param members: lists of members that will be used by the diagnostics
        :type members: list[int]
        :return:
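
        Illustrative call (the startdates and members below are hypothetical)::

            data_manager.prepare_CMOR_files(['19900101', '19910101'], [0, 1], force_rebuild=False)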
        """
        # Check if cmorized and convert if not
        if force_rebuild or not os.path.exists(os.path.join(self.data_dir, self.expid, 'cmorfiles')):
            for startdate in startdates:
                for member in members:
                    Log.info('Untarring member S{0} {1}', startdate, self.get_member_str(member))
                    for tarfile in glob.glob(os.path.join(self.data_dir, self.expid, 'original_files', startdate,
                                                          'outputs', 'MMO*')):
                        self._unpack_tar(member, startdate, tarfile)
                    grb_path = os.path.join(self.data_dir, self.expid, 'original_files', startdate,
                                            self.get_member_str(member), 'outputs', '*.grb')
                    gribfiles = glob.glob(grb_path)
                    if len(gribfiles) == 0:
                        for tarfile in glob.glob(os.path.join(self.data_dir, self.expid, 'original_files', startdate,
                                                              self.get_member_str(member),
                                                              'outputs', 'MMA*')):
                            self._unpack_tar(member, startdate, tarfile)
                    else:
                        gribfiles.sort()
                        copied_gribfiles = list()
                        for gribfile in gribfiles:
                            shutil.copy(gribfile, os.path.join(self.scratch_dir, os.path.basename(gribfile)))
                            copied_gribfiles.append(os.path.join(self.scratch_dir, os.path.basename(gribfile)))

                        for gribfile in copied_gribfiles:
                            cdo = Utils.cdo
                            start = parse_date(gribfile[-10:-4])
                            month = '{0:02}'.format(start.month)
                            times = cdo.showtimestamp(input=gribfile)
                            times = times[0].split()[0:2]
                            time_diff = datetime.strptime(times[1], '%Y-%m-%dT%H:%M:%S') - datetime.strptime(times[0],
                                                                                                             '%Y-%m-%dT%H:%M:%S')
                            NFRP = (time_diff.seconds // 3600)

                            param_6hr = (151, 167, 168, 164, 165, 166, 129,)
                            param_day = (167, 165, 166, 151, 164, 168, 169, 177, 179, 228, 201, 202, 130)
                            param_mon = (167, 201, 202, 165, 166, 151, 144, 228, 205, 182, 164, 146, 147, 176, 169,
                                         177, 175, 212, 141, 180, 181, 179, 168, 243, 129, 130, 131, 132, 133)

                            grid = os.path.basename(gribfile)[3:5]

                            if os.path.exists('ICM{0}{1}+{2.year}{2.month:02}.grb'.format(grid, self.expid,
                                                                                          add_months(start, -1,
                                                                                                     'standard'))):
                                fd = open('rules_files', 'w')
                                fd.write('if (dataDate >= {0.year}{0.month:02}01) {{ write ; }}\n'.format(start))
                                fd.close()
                                # get first timestep for each month from previous file (if possible)
                                if os.path.exists('ICM'):
                                    os.remove('ICM')
                                Utils.execute_shell_command('grib_filter -o ICM rules_files '
                                                            'ICM{0}{1}+{2.year}{2.month:02}.grb '
                                                            '{3}'.format(grid, self.expid,
                                                                         add_months(start, -1, 'standard'),
                                                                         gribfile))
                                os.remove('rules_files')

                            else:
                                shutil.copy(gribfile, 'ICM')

                            # remap on regular Gauss grid
                            if grid == 'SH':
                                Utils.cdo.splitparam(input='-sp2gpl ICM', output=gribfile + '_')
                            else:
                                Utils.cdo.splitparam(input='ICM', output=gribfile + '_', options='-R')
                                # total precipitation (remove negative values)
                                Utils.cdo.setcode(228, input='-setmisstoc,0 -setvrange,0,Inf -add '
                                                             '{0}_{{142,143}}.128.grb'.format(gribfile),
                                                  output='{0}_228.128.grb'.format(gribfile))
                            os.remove('ICM')

                            cdo_reftime = parse_date(startdate).strftime('%Y-%m-%d,00:00')

                            # daily variables
                            for param in param_day:
                                if not os.path.exists('{0}_{1}.128.grb'.format(gribfile, param)):
                                    continue
                                new_units = None
                                if param in (169, 177, 179):
                                    # radiation
                                    new_units = "W m-2"
                                    cdo_operator = "-divc,{0} -daymean -selmon,{2} " \
                                                   "-shifttime,-{1}hours".format(NFRP * 3600, NFRP, month)
                                elif param == 228:
                                    # precipitation
                                    new_units = "kg m-2 -s"
                                    cdo_operator = "-mulc,1000 -divc,{0} -daymean -selmon,{2} " \
                                                   "-shifttime,-{1}hours".format(NFRP * 3600, NFRP, month)
                                elif param == 201:
                                    # maximum
                                    cdo_operator = "-daymax -selmon,{1} -shifttime,-{0}hours".format(NFRP, month)
                                elif param == 202:
                                    # minimum
                                    cdo_operator = "-daymin -selmon,{1} -shifttime,-{0}hours".format(NFRP, month)
                                elif param == 130:
                                    # 850 hPa
                                    cdo_operator = "-daymean -sellevel,85000 -selmon,{0}".format(month)
                                else:
                                    # default, plain daily mean
                                    cdo_operator = "-daymean -selmon,{0}".format(month)

                                Utils.execute_shell_command('cdo -f nc -t ecmwf setreftime,{0} '
                                                            '{1} {2}_{3}.128.grb '
                                                            '{2}_{3}_day.nc'.format(cdo_reftime, cdo_operator,
                                                                                    gribfile, param))

                                if new_units:
                                    handler = Utils.openCdf('{0}_{1}_day.nc'.format(gribfile, param))
                                    for var in handler.variables.values():
                                        if 'code' in var.ncattrs() and var.code == param:
                                            var.units = new_units
                                            break
                                    handler.close()
                                # concat all vars in one file for day
                                Utils.nco.ncks(input='{0}_{1}_day.nc'.format(gribfile, param),
                                               output='{0}_day.nc'.format(gribfile), options='-A')
                                os.remove('{0}_{1}_day.nc'.format(gribfile, param))

                            # monthly variables
                            for param in param_mon:
                                if not os.path.exists('{0}_{1}.128.grb'.format(gribfile, param)):
                                    continue
                                new_units = None
                                if param in (146, 147, 176, 169, 177, 175, 179, 212):
                                    # radiation/heat
                                    new_units = "W m-2"
                                    cdo_operator = "-divc,{0} -monmean -selmon,{2} " \
                                                   "-shifttime,-{1}hours".format(NFRP * 3600, NFRP, month)
                                elif param in (180, 181):
                                    # momentum flux
                                    new_units = "N m-2"
                                    cdo_operator = "-divc,{0} -monmean -selmon,{2} " \
                                                   "-shifttime,-{1}hours".format(NFRP * 3600, NFRP, month)
                                elif param in (144, 228, 205, 182):
                                    # precipitation/evaporation/runoff
                                    new_units = "kg m-2 s-1"
                                    cdo_operator = "-mulc,1000 -divc,{0} -monmean -selmon,{2} " \
                                                   "-shifttime,-{1}hours".format(NFRP * 3600, NFRP, month)
                                elif param == 201:
                                    # mean daily maximum
                                    cdo_operator = "-monmean -daymax -selmon,{1} " \
                                                   "-shifttime,-{0}hours".format(NFRP, month)
                                elif param == 202:
                                    # mean daily minimum
                                    cdo_operator = "-monmean -daymin -selmon,{1} " \
                                                   "-shifttime,-{0}hours".format(NFRP, month)
                                elif param in (130, 131, 132, 133):
                                    # upper-air
                                    cdo_operator = "-monmean -sellevel,5000,20000,50000,85000 " \
                                                   "-selmon,{0}".format(month)
                                elif param == 129:
                                    # upper-air geopotential
                                    new_units = "m"
                                    cdo_operator = "-divc,9.81 -timmean -sellevel,5000,20000,50000,85000 " \
                                                   "-selmon,{0}".format(month)
                                else:
                                    # default, plain monthly mean
                                    cdo_operator = "-monmean -selmon,{0}".format(month)

                                Utils.execute_shell_command('cdo -f nc -t ecmwf setreftime,{0} '
                                                            '{1} {2}_{3}.128.grb '
                                                            '{2}_{3}_mon.nc'.format(cdo_reftime, cdo_operator,
                                                                                    gribfile, param))
                                handler = Utils.openCdf('{0}_{1}_mon.nc'.format(gribfile, param))
                                if new_units:
                                    for var in handler.variables.values():
                                        if 'code' in var.ncattrs() and var.code == param:
                                            var.units = new_units
                                            break
                                var_name = None
                                for key in handler.variables.keys():
                                    if key + '_2' in handler.variables and key not in handler.dimensions:
                                        var_name = key

                                handler.close()
                                if var_name is not None:
                                    Utils.nco.ncks(input='{0}_{1}_mon.nc'.format(gribfile, param),
                                                   output='{0}_{1}_mon.nc'.format(gribfile, param),
                                                   options='-O -v {0}'.format(var_name))

                                # concat all vars in one file for mon
                                Utils.nco.ncks(input='{0}_{1}_mon.nc'.format(gribfile, param),
                                               output='{0}_mon.nc'.format(gribfile), options='-A')
                                os.remove('{0}_{1}_mon.nc'.format(gribfile, param))

                            # 6-hourly variables
                            for param in param_6hr:
                                if not os.path.exists('{0}_{1}.128.grb'.format(gribfile, param)):
                                    continue
                                new_units = None
                                if param == 129:
                                    # geopotential
                                    new_units = "m"
                                    cdo_operator = "-divc,9.81 -sellevel,50000 -selmon,{0}".format(month)
                                else:
                                    # default, plain 6-hourly values
                                    cdo_operator = "-selmon,{0}".format(month)

                                Utils.execute_shell_command('cdo -f nc -t ecmwf setreftime,{0} '
                                                            '{1} {2}_{3}.128.grb '
                                                            '{2}_{3}_6hr.nc'.format(cdo_reftime, cdo_operator,
                                                                                    gribfile, param))
                                if new_units:
                                    handler = Utils.openCdf('{0}_{1}_6hr.nc'.format(gribfile, param))
                                    for var in handler.variables.values():
                                        if 'code' in var.ncattrs() and var.code == param:
                                            var.units = new_units
                                            break
                                    handler.close()
                                # concat all vars in one file for 6hr
                                Utils.nco.ncks(input='{0}_{1}_6hr.nc'.format(gribfile, param),
                                               output='{0}_6hr.nc'.format(gribfile), options='-A')
                                os.remove('{0}_{1}_6hr.nc'.format(gribfile, param))

                            for split_file in glob.glob('{0}_???.128.grb'.format(gribfile)):
                                os.remove(split_file)

                        chunk_start = parse_date(startdate)
                        while os.path.exists(os.path.join(self.scratch_dir,
                                                          'ICMGG{0}+{1}.grb'.format(self.expid,
                                                                                    date2str(chunk_start)[:-2]))):
                            chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
                            chunk_end = previous_day(chunk_end, 'standard')
                            chunk_files_gg_mon = list()
                            chunk_files_gg_day = list()
                            chunk_files_gg_6h = list()

                            chunk_files_sh_mon = list()
                            chunk_files_sh_day = list()
                            chunk_files_sh_6h = list()

                            for month in range(0, self.chunk_size):
                                chunk_file = 'ICMGG{0}+{1}.grb'.format(self.expid,
                                                                       date2str(add_months(chunk_start, month,
                                                                                           'standard'))[:-2])
                                os.remove(chunk_file)
                                os.remove('ICMSH' + chunk_file[5:])
                                chunk_files_gg_mon.append(chunk_file + '_mon.nc')
                                chunk_files_gg_day.append(chunk_file + '_day.nc')
                                chunk_files_gg_6h.append(chunk_file + '_6hr.nc')
                                chunk_files_sh_mon.append('ICMSH' + chunk_file[5:] + '_mon.nc')
                                chunk_files_sh_day.append('ICMSH' + chunk_file[5:] + '_day.nc')
                                chunk_files_sh_6h.append('ICMSH' + chunk_file[5:] + '_6hr.nc')

                            self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_sh_mon,
                                                          'SH', '1m')
                            self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_sh_day,
                                                          'SH', '1d')
                            self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_sh_6h,
                                                          'SH', '6hr')
                            self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_gg_mon,
                                                          'GG', '1m')
                            self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_gg_day,
                                                          'GG', '1d')
                            self._merge_and_cmorize_atmos(startdate, member, chunk_start, chunk_end, chunk_files_gg_6h,
                                                          'GG', '6hr')
                            chunk_start = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
            return
        for startdate in startdates:
            for member in members:
                member_path = os.path.join(self.data_dir, self.expid, 'cmorfiles', startdate,
                                           self.get_member_str(member), 'outputs')
                Log.info('Preparing CMOR files for startdate {0} and member {1}'.format(startdate, member))

                filepaths = glob.glob(os.path.join(member_path, '*.gz'))

                if len(filepaths) == 0:
                    continue

                self._unpack_cmorfiles(filepaths, member_path)

    def _merge_and_cmorize_atmos(self, startdate, member, chunk_start, chunk_end, chunk_files, grid, frequency):
        merged_file = 'MMA_{0}_{1}_{2}_{3}.nc'.format(frequency, date2str(chunk_start), date2str(chunk_end), grid)
        for x in range(0, len(chunk_files)):
            chunk_files[x] = os.path.join(self.scratch_dir, chunk_files[x])
        Utils.cdo.mergetime(input=' '.join(chunk_files), output=merged_file, options='-O')
        for filepath in chunk_files:
            os.remove(filepath)
        self._cmorize_nc_file(merged_file, member, startdate)

    def _unpack_cmorfiles(self, filepaths, member_path):
        threads = list()
        numthreads = Utils.available_cpu_count()
        for numthread in range(0, numthreads):
            t = threading.Thread(target=DataManager._unzip,
                                 args=([filepaths[numthread::numthreads]]))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        filepaths = glob.glob(os.path.join(member_path, '*.tar'))
        for numthread in range(0, numthreads):
            t = threading.Thread(target=DataManager._untar,
                                 args=(filepaths[numthread::numthreads], member_path))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()
        if self.experiment_name != self.model:
            bad_path = os.path.join(member_path, 'output', self.institution, self.model, self.model)
            for (dirpath, dirnames, filenames) in os.walk(bad_path, False):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    good = filepath.replace('_{0}_output_'.format(self.model),
                                            '_{0}_{1}_'.format(self.model, self.experiment_name))

                    good = good.replace('/{0}/{0}'.format(self.model),
                                        '/{0}/{1}'.format(self.model,
                                                          self.experiment_name))

                    Utils.move_file(filepath, good)
                os.rmdir(dirpath)
        good_dir = os.path.join(member_path, 'output', self.institution, self.model, self.experiment_name)
        for sdate in os.listdir(good_dir):
            for (dirpath, dirnames, filenames) in os.walk(os.path.join(good_dir, sdate), False):
                for filename in filenames:
                    filepath = os.path.join(dirpath, filename)
                    good = filepath.replace('_{0}_{1}_r'.format(self.model, self.experiment_name),
                                            '_{0}_{1}_{2}_r'.format(self.model, self.experiment_name, sdate))
                    if good != filepath:
                        Log.info('Moving {0} to {1}'.format(filename, good))
                        Utils.move_file(filepath, good)

    def _unpack_tar(self, member, startdate, tarfile):
        Log.info('Unpacking {0}', tarfile)

        scratch_dir = os.path.join(self.scratch_dir, 'CMOR')
        if not os.path.exists(scratch_dir):
            os.makedirs(scratch_dir)
        self._untar((tarfile,), scratch_dir)
        self._unzip(glob.glob(os.path.join(scratch_dir, '*.gz')))

        if os.path.basename(tarfile).startswith('MMA'):
            temp = TempFile.get()
            for filename in glob.glob(os.path.join(scratch_dir, 'MMA_*_SH_*.nc')):
                Utils.cdo.sp2gpl(options='-O', input=filename, output=temp)
                shutil.move(temp, filename)

            sh_files = glob.glob(os.path.join(scratch_dir, 'MMA_*_SH_*.nc'))
            Utils.cdo.mergetime(input=sh_files, output=os.path.join(scratch_dir, 'sh.nc'))

            gg_files = glob.glob(os.path.join(scratch_dir, 'MMA_*_GG_*.nc'))
            Utils.cdo.mergetime(input=gg_files, output=os.path.join(scratch_dir, 'gg.nc'))

            for filename in sh_files + gg_files:
                os.remove(filename)

            Utils.nco.ncks(input=os.path.join(scratch_dir, 'sh.nc'),
                           output=os.path.join(scratch_dir, 'gg.nc'), options='-A')
            os.remove(os.path.join(scratch_dir, 'sh.nc'))

            tar_startdate = tarfile[0:-4].split('_')[5].split('-')
            new_name = 'MMA_1m_{0[0]}_{0[1]}.nc'.format(tar_startdate)
            shutil.move(os.path.join(scratch_dir, 'gg.nc'), os.path.join(scratch_dir, new_name))

        for filename in glob.glob(os.path.join(scratch_dir, '*.nc')):
            self._cmorize_nc_file(filename, member, startdate)

    def _cmorize_nc_file(self, filename, member, startdate):
        Log.info('Processing file {0}', filename)
        file_parts = os.path.basename(filename).split('_')
        frequency = file_parts[1][1].lower()
        variables = dict()
        variables['time_counter'] = 'time'
        variables['time_counter_bnds'] = 'time_bnds'
        # variables['time_counter_bounds'] = 'time_bnds'
        variables['tbnds'] = 'bnds'
        # variables['axis_nbounds'] = 'bnds'
        variables['nav_lat'] = 'lat'
        variables['nav_lon'] = 'lon'
        variables['x'] = 'i'
        variables['y'] = 'j'
        Utils.rename_variables(filename, variables, False, True)
        handler = Utils.openCdf(filename)
        self._add_common_attributes(frequency, handler, member, startdate)
        self._update_time_variables(handler, startdate)
        handler.sync()
        temp = TempFile.get()
        Log.info('Splitting file {0}', filename)
        for variable in handler.variables.keys():
            if variable in ('lon', 'lat', 'time', 'time_bnds', 'leadtime', 'lev', 'icethi',
                            'deptht', 'depthu', 'depthw', 'depthv', 'time_centered', 'time_centered_bounds',
                            'deptht_bounds', 'depthu_bounds', 'depthv_bounds', 'depthw_bounds',
                            'time_counter_bounds', 'ncatice',
                            'nav_lat_grid_V', 'nav_lat_grid_U', 'nav_lat_grid_T',
                            'nav_lon_grid_V', 'nav_lon_grid_U', 'nav_lon_grid_T',):
                continue
            self.extract_variable(file_parts, filename, frequency, handler, member, startdate, temp,
                                  variable)
        Log.result('File {0} cmorized!', filename)
        handler.close()
        os.remove(filename)

    def extract_variable(self, file_parts, filename, frequency, handler, member, startdate, temp, variable):
        var_cmor = Variable.get_variable(variable)
        Utils.nco.ncks(input=filename, output=temp, options='-v {0}'.format(variable))
        Utils.rename_variables(temp, {'deptht': 'lev', 'depthu': 'lev', 'depthw': 'lev', 'depthv': 'lev'}, False, True)
        handler_cmor = Utils.openCdf(temp)
        Utils.copy_variable(handler, handler_cmor, 'lon', False)
        Utils.copy_variable(handler, handler_cmor, 'lat', False)
        if 'time' in handler_cmor.dimensions.keys():
            Utils.copy_variable(handler, handler_cmor, 'leadtime', False)
        # Convert the frequency to its long form before using it to compute the table id
        if frequency == 'd':
            frequency = 'day'
        elif frequency == 'm':
            frequency = 'mon'
        elif frequency == 'h':
            frequency = '6hr'
        else:
            raise Exception('Frequency {0} not supported'.format(frequency))
        handler_cmor.modeling_realm = var_cmor.domain
        handler_cmor.table_id = 'SPECS_' + self.domain_abbreviation(var_cmor.domain, frequency)
        var_handler = handler_cmor.variables[variable]
        var_handler.short_name = var_cmor.short_name
        var_handler.standard_name = var_cmor.standard_name
        var_handler.long_name = var_cmor.long_name
        handler_cmor.close()

        if var_cmor.basin is None:
            region = None
        else:
            region = var_cmor.basin.fullname

        self.send_file(temp, var_cmor.domain, var_cmor.short_name, startdate, member,
                       frequency=frequency, rename_var=variable,
                       date_str='{0}-{1}'.format(file_parts[2][0:6], file_parts[3][0:6]),
                       region=region)

    @staticmethod
    def _update_time_variables(handler, startdate):
        time_var = handler.variables['time']
        times = netCDF4.num2date(time_var[:], time_var.units, time_var.calendar)
        if type(times[0]) is not datetime:
            for x in range(0, times.shape[0]):
                times[x] = times[x]._to_real_datetime()
        time_var[:] = netCDF4.date2num(times, 'days since 1850-01-01', 'standard')
        if 'axis_nbounds' in handler.dimensions:
            handler.renameDimension('axis_nbounds', 'bnds')

        if 'time_counter_bounds' in handler.variables:
            handler.renameVariable('time_counter_bounds', 'time_bnds')
            handler.sync()
        if 'time_bnds' in handler.variables:
            time_bounds_var = handler.variables['time_bnds']
            time_var.bounds = "time_bnds"

            time_bounds = netCDF4.num2date(time_bounds_var[:], time_var.units, time_var.calendar)
            if type(time_bounds[0, 0]) is not datetime:
                for x in range(0, time_bounds.shape[0]):
                    for y in range(0, time_bounds.shape[1]):
                        time_bounds[x, y] = time_bounds[x, y]._to_real_datetime()
            time_bounds_var[:] = netCDF4.date2num(time_bounds, 'days since 1850-01-01', 'standard')
        time_var.units = 'days since 1850-01-01'
        time_var.time_origin = "1850-01-01"
        time_var.calendar = 'standard'
        time_var.long_name = "Verification time of the forecast"
        time_var.standard_name = "time"
        time_var.axis = "T"
        if 'leadtime' in handler.variables:
            var = handler.variables['leadtime']
        else:
            var = handler.createVariable('leadtime', float, 'time')
        var.units = "days"
        var.long_name = "Time elapsed since the start of the forecast"
        var.standard_name = "forecast_period"
        leadtime = (netCDF4.num2date(time_var[:], time_var.units, time_var.calendar) - parse_date(startdate))
        for lt in range(0, leadtime.shape[0]):
            var[lt] = leadtime[lt].days

    def _add_common_attributes(self, frequency, handler, member, startdate):
        handler.associated_experiment = self.associated_experiment
        handler.batch = '{0}{1}'.format(self.institution, datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ'))
        handler.contact = 'Pierre-Antoine Bretonnière, pierre-antoine.bretonniere@bsc.es, ' \
                          'Javier Vegas-Regidor, javier.vegas@bsc.es'
        handler.Conventions = 'CF-1.6'
        handler.creation_date = datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')
        handler.experiment_id = self.experiment_name
        handler.forecast_reference_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')
        if frequency == 'd':
            handler.frequency = 'daily'
        elif frequency == 'm':
            handler.frequency = 'monthly'
        handler.institute_id = self.institution
        handler.institution = self.institution
        handler.initialization_method = self.initialization_method
        handler.initialization_description = self.initialization_description
        handler.physics_version = self.physics_version
        handler.physics_description = self.physics_description
        handler.model_id = self.model
        handler.associated_model = self.associated_model
        handler.project_id = 'SPECS'
        handler.realization = member
        handler.startdate = 'S{0}'.format(startdate)
        handler.tracking_id = str(uuid.uuid1())
        handler.title = "{0} model output prepared for SPECS {1}".format(self.model, self.experiment_name)

    @staticmethod
    def _unzip(files):
        for filepath in files:
            Log.debug('Unzipping {0}', filepath)
            Utils.execute_shell_command('gunzip {0}'.format(filepath))

    @staticmethod
    def _untar(files, member_path):
        for filepath in files:
            Log.debug('Unpacking {0}', filepath)
            Utils.execute_shell_command('tar -xvf {0} -C {1}'.format(filepath, member_path))
            # os.remove(filepath)

    def get_files(self, startdate, member, chunk, domain, variables, grid=None):
        """
        Returns a list of filenames for different variables

        :param startdate: startdate to retrieve
        :type startdate: str
        :param member: member to retrieve
        :type member: int
        :param chunk: chunk to retrieve
        :type chunk: int
        :param domain: variable's CMOR domain
        :type domain: str
        :param variables: variables list
        :type variables: list[str], tuple[str]
        :param grid: specifies if the variable must be in an interpolated grid
        :type grid: str
        :return: list with the filenames for the requested variables
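
        Illustrative call (the domain and variable names are hypothetical)::

            paths = data_manager.get_files('19900101', 0, 1, 'ocean', ('tos', 'sos'))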
        """

        file_names = list()

        domain_abbreviation = self.domain_abbreviation(domain, self.frequency)

        start = parse_date(startdate)
        member_plus = str(member + 1)
        member_path = os.path.join(self.data_dir, self.expid, 'cmorfiles', startdate, self.get_member_str(member),
                                   'outputs', 'output', self.institution, self.model, self.experiment_name,
                                   'S' + startdate, self.frequency, domain)

        chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
        chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
        chunk_end = previous_day(chunk_end, 'standard')

        var_file = list()
        for var in variables:
            if grid:
                var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
            else:
                var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))

            var_file.append(os.path.join(var_path,
                                         '{0}_{1}_{3}_{4}_S{5}_r{6}i1p1_'
                                         '{7}-{8}.nc'.format(var, domain_abbreviation,
                                                             self.frequency, self.model, self.experiment_name,
                                                             startdate, member_plus,
                                                             "{0:04}{1:02}".format(chunk_start.year,
                                                                                   chunk_start.month),
                                                             "{0:04}{1:02}".format(chunk_end.year,
                                                                                   chunk_end.month))))
        file_names.append(var_file)

        return file_names

    def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
        """
        Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy

        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
        :return: path to the copy created on the scratch folder
        :rtype: str
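
        Illustrative call (the domain and variable are hypothetical)::

            path = data_manager.get_file('ocean', 'tos', '19900101', 0, 1)
            handler = Utils.openCdf(path)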
        """
        if not frequency:
            frequency = self.frequency

        domain_abbreviation = self.domain_abbreviation(domain, frequency)

        start = parse_date(startdate)
        member_plus = str(member + 1)
        member_path = os.path.join(self.data_dir, self.expid, 'cmorfiles', startdate, self.get_member_str(member),
                                   'outputs', 'output', self.institution, self.model, self.experiment_name,
                                   'S' + startdate, frequency, domain)

        chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
        chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
        chunk_end = previous_day(chunk_end, 'standard')

        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()

        if grid:
            var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
        else:
            var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))

        filepath = os.path.join(var_path, '{0}_{1}_{3}_{4}_S{5}_r{6}i1p1_'
                                          '{7}-{8}.nc'.format(var, domain_abbreviation, frequency, self.model,
                                                              self.experiment_name, startdate, member_plus,
                                                              "{0:04}{1:02}".format(chunk_start.year,
                                                                                    chunk_start.month),
                                                              "{0:04}{1:02}".format(chunk_end.year,
                                                                                    chunk_end.month)))

        temp_path = TempFile.get()
        shutil.copyfile(filepath, temp_path)
        return temp_path

    def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
                  rename_var=None, frequency=None, year=None, date_str=None):
        """
        Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
        with already existing ones as needed

        :param date_str: date string to use in the output filename, as an alternative to chunk or year
        :param year: if frequency is yearly, this parameter is used to give the corresponding year
        :type year: int
        :param rename_var: if exists, the given variable will be renamed to the one given by var
        :type rename_var: str
        :param filetosend: path to the file to send to the CMOR repository
        :type filetosend: str
        :param region: specifies the region represented by the file. If it is defined, the data will be appended to the
            CMOR repository as a new region in the file or will overwrite if region was already present
        :type region: str
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param chunk: file's chunk
        :type chunk: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :param frequency: file's frequency (only needed if it is different from the default)
        :type frequency: str
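
        Illustrative call (the file path, domain and variable are hypothetical)::

            data_manager.send_file('/scratch/tmp/tos_computed.nc', 'ocean', 'tos',
                                   '19900101', 0, chunk=1)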
        """
        if box:
            var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()

        if rename_var:
            Utils.rename_variable(filetosend, rename_var, var)

        if not frequency:
            frequency = self.frequency
        domain_abbreviation = self.domain_abbreviation(domain, frequency)
        start = parse_date(startdate)
        member_plus = str(member + 1)
        member_path = os.path.join(self.data_dir, self.expid, 'cmorfiles', startdate, self.get_member_str(member),
                                   'outputs', 'output', self.institution, self.model, self.experiment_name,
                                   'S' + startdate,
                                   frequency, domain)
        if chunk is not None:
            chunk_start = chunk_start_date(start, chunk, self.chunk_size, 'month', 'standard')
            chunk_end = chunk_end_date(chunk_start, self.chunk_size, 'month', 'standard')
            chunk_end = previous_day(chunk_end, 'standard')

            time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
                                                            chunk_end.month)

        elif year is not None:
            if frequency != 'yr':
                raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
            time_bound = str(year)
        elif date_str is not None:
            time_bound = date_str
        else:
            raise ValueError('Either chunk, year or date_str must be provided')

        if grid:
            var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
        else:
            var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))

        filepath = os.path.join(var_path, '{0}_{1}_{2}_{3}_S{4}_r{5}i1p1_'
                                          '{6}.nc'.format(var, domain_abbreviation, self.model,
                                                          self.experiment_name, startdate, member_plus, time_bound))
        if region:
            if not os.path.exists(filepath):

                handler = Utils.openCdf(filetosend)
                handler.createDimension('region')
                var_region = handler.createVariable('region', str, 'region')
                var_region[0] = region

                original_var = handler.variables[var]
                new_var = handler.createVariable('new_var', original_var.datatype,
                                                 original_var.dimensions + ('region',))
                new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
                value = original_var[:]
                new_var[..., 0] = value
                handler.close()

                Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var))
                Utils.rename_variable(filetosend, 'new_var', var)
            else:
                temp = TempFile.get()
                shutil.copyfile(filepath, temp)
                Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
                handler = Utils.openCdf(temp)
                handler_send = Utils.openCdf(filetosend)
                value = handler_send.variables[var][:]
                var_region = handler.variables['region']
                basin_index = np.where(var_region[:] == region)
                if len(basin_index[0]) == 0:
                    var_region[var_region.shape[0]] = region
                    basin_index = var_region.shape[0] - 1

                else:
                    basin_index = basin_index[0][0]

                handler.variables[var][..., basin_index] = value
                handler.close()
                handler_send.close()
                Utils.move_file(temp, filetosend)
            Utils.nco.ncks(input=filetosend, output=filetosend, options='-O --fix_rec_dmn region')

        temp = TempFile.get()
        Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filetosend, temp])
        shutil.move(temp, filetosend)

        if frequency in ('d', 'daily', 'day'):
            freq_str = 'daily_mean'
        else:
            freq_str = 'monthly_mean'

        if domain in ['ocean', 'seaIce']:
            link_path = os.path.join(self.data_dir, self.expid, freq_str, '{0}_f6h'.format(var))
        else:
            link_path = os.path.join(self.data_dir, self.expid, freq_str, '{0}_f{1}h'.format(var, self.nfrp))

        if not os.path.exists(link_path):
            # This can be a race condition
            # noinspection PyBroadException
            try:
                os.makedirs(link_path)
            except Exception:
                pass

        link_path = os.path.join(link_path, os.path.basename(filepath))
        if os.path.lexists(link_path):
            os.remove(link_path)
        # Store the file in the CMOR repository and refresh the link to it
        Utils.move_file(filetosend, filepath)
        os.symlink(filepath, link_path)

    @staticmethod
    def domain_abbreviation(domain, frequency):
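        """
        Returns the CMOR table abbreviation for a given domain and frequency, e.g. 'Omon' for
        monthly ocean data, 'OImon' for monthly sea ice, '6hrPlev' for 6-hourly data and 'day'
        for daily data.
        """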
        if frequency == 'mon':
            if domain == 'seaIce':
                domain_abbreviation = 'OImon'
            else:
                domain_abbreviation = domain[0].upper() + 'mon'
        elif frequency == '6hr':
            domain_abbreviation = '6hrPlev'
        else:
            domain_abbreviation = 'day'
        return domain_abbreviation

    def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
        """
        Gets all the data corresponding to a given year from the CMOR repository to the scratch folder as one file and
        returns the path to the scratch's copy.

        :param year: year to retrieve
        :type year: int
        :param domain: CMOR domain
        :type domain: str
        :param var: variable name
        :type var: str
        :param startdate: file's startdate
        :type startdate: str
        :param member: file's member
        :type member: int
        :param grid: file's grid (only needed if it is not the original)
        :type grid: str
        :param box: file's box (only needed to retrieve sections or averages)
        :type box: Box
        :return: path to the copy created on the scratch folder
        :rtype: str
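
        Illustrative call (the domain, variable and year are hypothetical)::

            path = data_manager.get_year('ocean', 'tos', '19900101', 0, 1995)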
        """
        chunk_files = list()
        for chunk in self.get_year_chunks(startdate, year):
            chunk_files.append(self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box))

        if len(chunk_files) > 1:
            temp = TempFile.get()
            Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
            for chunk_file in chunk_files:
                os.remove(chunk_file)
        else:
            temp = chunk_files[0]
        temp2 = TempFile.get()
        handler = Utils.openCdf(temp)
        time = Utils.get_datetime_from_netcdf(handler)
        handler.close()
        start = None
        end = None
        for x in range(0, len(time)):
            date = time[x]
            if date.year == year:
                if date.month == 1:
                    start = x
                elif date.month == 12:
                    end = x

        Utils.nco.ncks(input=temp, output=temp2, options='-O -d time,{0},{1}'.format(start, end))
        os.remove(temp)
        return temp2

    def get_year_chunks(self, startdate, year):
        """
        Get the list of chunks containing timesteps from the given year
        :param startdate: startdate to use
        :type startdate: str
        :param year: reference year
        :type year: int
        :return: list of chunks containing data from the given year
        :rtype: list[int]
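
        Illustrative call (the startdate and year are hypothetical)::

            chunks = data_manager.get_year_chunks('19900101', 1991)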
        """
        date = parse_date(startdate)
        chunks = list()
        for chunk in range(1, self.num_chunks + 1):
            chunk_start = chunk_start_date(date, chunk, self.chunk_size, 'month', self.calendar)
            if chunk_start.year > year:
                break
            elif chunk_start.year == year or chunk_end_date(chunk_start, self.chunk_size, 'month',
                                                            self.calendar).year == year:
                chunks.append(chunk)

        return chunks

    def get_full_years(self, startdate):
        """
        Returns the list of full years that are in the given startdate
        :param startdate: startdate to use
        :type startdate: str
        :return: list of full years
        :rtype: list[int]
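
        Illustrative call (the startdate is hypothetical)::

            years = data_manager.get_full_years('19900101')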
        """
        chunks_per_year = 12 / self.chunk_size
        date = parse_date(startdate)
        first_january = 0
        first_year = date.year
        if date.month != 1:
            month = date.month
            first_year += 1
            while month + self.chunk_size < 12:
                month += self.chunk_size
                first_january += 1

        years = list()
        for chunk in range(first_january, self.num_chunks - chunks_per_year, chunks_per_year):
            years.append(first_year)
            first_year += 1
        return years

    def get_member_str(self, member):
        return 'fc{0}'.format(str(member).zfill(self.member_digits))


class Variable(object):
    def __init__(self, line):
        self.short_name = line[1]
        self.standard_name = line[2]
        self.long_name = line[3]
        self.domain = line[4]
        self.basin = Basins.parse(line[5])

    @classmethod
    def get_variable(cls, original_name):
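        """
        Returns the Variable instance matching the given original name, loading the definitions
        from cmor_table.csv on first use. Returns None if the variable is not defined in the table.
        """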
        try:
            return Variable._dict_variables[original_name.lower()]

        except AttributeError:
            Variable._dict_variables = dict()
            with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'cmor_table.csv'), 'rb') as csvfile:
                reader = csv.reader(csvfile, dialect='excel')
                for line in reader:
                    if line[0] == 'variable':
                        continue
                    var = Variable(line)
                    if not var.short_name:
                        continue
                    for old_name in line[0].split(':'):
                        Variable._dict_variables[old_name.lower()] = var
                    Variable._dict_variables[var.short_name.lower()] = var
            return Variable.get_variable(original_name)

        except KeyError:
            Log.error('Variable {0} is not defined'.format(original_name))
            return None