# coding=utf-8
import glob
import shutil
import uuid

import os
from datetime import datetime

import pygrib
from bscearth.utils.log import Log
from bscearth.utils.date import parse_date, chunk_end_date, previous_day, date2str, add_months
from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.variable import VariableManager
from earthdiagnostics.datafile import NetCDFFile


class Cmorizer(object):
    """
    Class to manage CMORization

    :param data_manager: experiment's data manager. It must expose the configuration as `config`
    :param startdate: startdate to cmorize
    :type startdate: str
    :param member: member to cmorize
    :type member: int

    """
    # Variables that are never extracted to their own file when a NetCDF file is split by variable
    NON_DATA_VARIABLES = ('lon', 'lat', 'time', 'time_bnds', 'leadtime', 'lev', 'lev_2', 'icethi',
                          'deptht', 'depthu', 'depthw', 'depthv', 'time_centered', 'time_centered_bounds',
                          'deptht_bounds', 'depthu_bounds', 'depthv_bounds', 'depthw_bounds',
                          'deptht_bnds', 'depthu_bnds', 'depthv_bnds', 'depthw_bnds',
                          'time_counter_bounds', 'ncatice', 'nav_lat_grid_V', 'nav_lat_grid_U',
                          'nav_lat_grid_T', 'nav_lon_grid_V', 'nav_lon_grid_U', 'nav_lon_grid_T',
                          'depth', 'depth_2', 'depth_3', 'depth_4',
                          'depth_bnds', 'depth_2_bnds', 'depth_3_bnds', 'depth_4_bnds',
                          'mlev', 'hyai', 'hybi', 'hyam', 'hybm')

    # Renaming applied to the coordinates of every file before it is split (original name -> new name)
    ALT_COORD_NAMES = {'time_counter': 'time', 'time_counter_bnds': 'time_bnds', 'time_counter_bounds': 'time_bnds',
                       'tbnds': 'bnds', 'nav_lat': 'lat', 'nav_lon': 'lon', 'x': 'i', 'y': 'j'}

    def __init__(self, data_manager, startdate, member):
        self.data_manager = data_manager
        self.startdate = startdate
        self.member = member
        self.config = data_manager.config
        self.experiment = self.config.experiment
        self.cmor = self.config.cmor
        self.member_str = self.experiment.get_member_str(member)
        self.original_files_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files',
                                                self.startdate, self.member_str, 'outputs')
        self.atmos_timestep = None
        self.cmor_scratch = str(os.path.join(self.config.scratch_dir, 'CMOR', self.startdate, self.member_str))

    def cmorize_ocean(self):
        """
        CMORizes ocean files from MMO files

        Nothing is processed when ocean CMORization is disabled in the configuration. Otherwise the files
        are searched using the 'MMO', 'PPO' and 'diags' prefixes, in that order.

        :return: None
        """
        if not self.cmor.ocean:
            Log.info('Skipping ocean cmorization due to configuration')
            # Without this return the files were processed even when the configuration disabled them
            return
        Log.info('\nCMORizing ocean\n')
        self._cmorize_ocean_files('MMO', 'PPO', 'diags')
    def _cmorize_ocean_files(self, *args):
        """
        Unpack and CMORize the oceanic tar files of the first prefix that matches any file

        :param args: filename prefixes to look for in the original files folder, in order of preference
        :type args: str
        """
        tar_files = []
        for prefix in args:
            tar_files = sorted(glob.glob(os.path.join(self.original_files_path, '{0}*'.format(prefix))))
            if tar_files:
                break

        if not tar_files:
            Log.error('No {1} files found in {0}'.format(self.original_files_path, args))

        ocean_realms = (ModelingRealms.ocean, ModelingRealms.seaIce, ModelingRealms.ocnBgchem)
        total = len(tar_files)
        # enumerate keeps the progress counter in step on every path, including skipped and failed files
        for count, tarfile in enumerate(tar_files, 1):
            if not self.cmorization_required(self.get_chunk(os.path.basename(tarfile)), ocean_realms):
                Log.info('No need to unpack file {0}/{1}'.format(count, total))
                continue

            Log.info('Unpacking oceanic file {0}/{1}'.format(count, total))
            try:
                self._unpack_tar_file(tarfile)
                self._cmorize_nc_files()
                Log.result('Oceanic file {0}/{1} finished'.format(count, total))
            except Exception as ex:
                # A failing file must not prevent the remaining ones from being processed
                Log.error('Could not CMORize oceanic file {0}: {1}', count, ex)
    def _filter_files(self, file_list):
        filtered = list()
        filters = self.cmor.filter_files.split(' ')
        for file_path in file_list:
            filename = os.path.basename(file_path)
            if any(f in filename for f in filters):
        if len(filtered) == 0:
            Log.warning('Filters {0} do not match any of the files', filters)
        return filtered

    def _cmorize_nc_files(self):
        nc_files = glob.glob(os.path.join(self.cmor_scratch, '*.nc'))
        for filename in nc_files:
            self._cmorize_nc_file(filename)
        self._clean_cmor_scratch()
    def _correct_fluxes(self):
        """
        Divide the flux variables of the scratch NetCDF files by the atmospheric timestep in seconds

        Every file in the CMOR scratch folder is modified in place: flux variables are divided by
        `atmos_timestep * 3600`, 'hfss' and 'hfls' additionally change sign, and ' s-1' is appended to
        the units of each modified variable. Variables that are not fluxes are left untouched.
        """
        variable_list = self.data_manager.variable_list
        fluxes_vars = [variable_list.get_variable(cmor_var, True).short_name
                       for cmor_var in ('prc', "prsn", "rss", "rls", "rsscs", "rsds", "rlds", "hfss", 'hfls')]
        change_sign_vars = [variable_list.get_variable(cmor_var, True).short_name
                            for cmor_var in ("hfss", 'hfls')]
        total_seconds = (self.experiment.atmos_timestep * 3600)
        for filename in glob.glob(os.path.join(self.cmor_scratch, '*.nc')):
            handler = Utils.openCdf(filename)
            try:
                for varname in handler.variables.keys():
                    cmor_var = variable_list.get_variable(varname, True)
                    if cmor_var is None or cmor_var.short_name not in fluxes_vars:
                        # Unknown variables and non-flux variables are not modified
                        continue

                    sign = -1 if cmor_var.short_name in change_sign_vars else 1

                    var_handler = handler.variables[varname]
                    var_handler[:] = sign * var_handler[:] / total_seconds
                    var_handler.units = '{0} {1}'.format(var_handler.units, 's-1')
            finally:
                # Close the file even if a variable fails, so the changes already made are not left open
                handler.close()
    def _unpack_tar_file(self, tarfile):
        """
        Extract a tar file into a freshly created CMOR scratch folder and unzip the selected '.gz' files

        :param tarfile: path to the tar file to unpack
        :type tarfile: str
        """
        scratch = self.cmor_scratch
        # Start from an empty folder so files from a previous tar are never mixed in
        self._clean_cmor_scratch()
        os.makedirs(scratch)
        Utils.untar((tarfile,), scratch)

        compressed = glob.glob(os.path.join(scratch, '*.gz'))
        for gz_path in self._filter_files(compressed):
            try:
                Utils.unzip(gz_path)
            except Utils.UnzipException as ex:
                Log.error('File {0} could not be unzipped: {1}', tarfile, ex)
    def _clean_cmor_scratch(self):
        if os.path.exists(self.cmor_scratch):
            shutil.rmtree(self.cmor_scratch)

    def _merge_mma_files(self, tarfile):
        """
        Merge the unpacked MMA files of a tar into one file per grid type

        The 'SH' files are first converted with cdo sp2gpl. Then the 'SH' and 'GG' files are merged in time
        into 'MMASH_1m_<start>_<end>.nc' and 'MMAGG_1m_<start>_<end>.nc' and the individual files removed.

        :param tarfile: path of the tar file the files were unpacked from. The last '_' separated part of
                        its name must be '<start>-<end>' followed by a four character extension
        :type tarfile: str
        """
        temp = TempFile.get()
        sh_files = glob.glob(os.path.join(self.cmor_scratch, 'MMA_*_SH_*.nc'))
        gg_files = glob.glob(os.path.join(self.cmor_scratch, 'MMA_*_GG_*.nc'))

        merged_sh = TempFile.get()
        merged_gg = TempFile.get()

        for filename in sh_files:
            Utils.cdo.sp2gpl(options='-O', input=filename, output=temp)
            shutil.move(temp, filename)
        Utils.cdo.mergetime(input=sh_files, output=merged_sh)
        Utils.cdo.mergetime(input=gg_files, output=merged_gg)
        for filename in sh_files + gg_files:
            os.remove(filename)

        # Work on the file name only: a fixed index on the full path depends on how many underscores the
        # folders contain. The date range is the last part of the name, as get_date_str assumes for tars.
        tar_name = os.path.basename(tarfile)[0:-4]
        tar_startdate = tar_name.split('_')[-1].split('-')
        shutil.move(merged_gg, os.path.join(self.cmor_scratch, 'MMAGG_1m_{0[0]}_{0[1]}.nc'.format(tar_startdate)))
        shutil.move(merged_sh, os.path.join(self.cmor_scratch, 'MMASH_1m_{0[0]}_{0[1]}.nc'.format(tar_startdate)))

    def cmorize_atmos(self):
        """
        CMORizes atmospheric data, from grib or MMA files

        GRIB files are used when the configuration requests it and there is any available. In any other
        case the MMA files are used. Nothing is processed when atmosphere CMORization is disabled.

        :return: None
        """
        if not self.cmor.atmosphere:
            Log.info('Skipping atmosphere cmorization due to configuration')
            # Without this return the files were processed even when the configuration disabled them
            return

        Log.info('\nCMORizing atmosphere\n')
        if self.cmor.use_grib and self.gribfiles_available():
            self._cmorize_grib_files()
        else:
            # The two sources are alternatives: MMA files are only the fallback
            self._cmorize_mma_files()

    def _cmorize_mma_files(self):
        """
        Unpack, merge, correct and CMORize every atmospheric MMA tar file that still requires it
        """
        mma_tars = glob.glob(os.path.join(self.original_files_path, 'MMA*'))
        mma_tars.sort()
        total = len(mma_tars)
        if total == 0:
            Log.error('MMA files not found in {0}'.format(self.original_files_path))

        for position, tarfile in enumerate(mma_tars, 1):
            chunk = self.get_chunk(os.path.basename(tarfile))
            if not self.cmorization_required(chunk, (ModelingRealms.atmos,)):
                Log.info('No need to unpack file {0}/{1}'.format(position, total))
                continue

            Log.info('Unpacking atmospheric file {0}/{1}'.format(position, total))
            try:
                self._unpack_tar_file(tarfile)
                self._merge_mma_files(tarfile)
                self._correct_fluxes()
                self._cmorize_nc_files()
                Log.result('Atmospheric file {0}/{1} finished'.format(position, total))
            except Exception as ex:
                # Report the failure and go on with the next file
                Log.error('Could not cmorize atmospheric file {0}: {1}', position, ex)

    def _cmorize_grib_files(self):
        """
        CMORize the GRIB files chunk by chunk, starting at the startdate

        Chunks are numbered from 1. The loop goes on while a 'GG' or 'SH' file exists for the first month
        of the chunk. Chunks that do not require CMORization are skipped.
        """
        chunk = 1
        chunk_start = parse_date(self.startdate)

        while os.path.exists(self.get_original_grib_path(chunk_start, 'GG')) or \
                os.path.exists(self.get_original_grib_path(chunk_start, 'SH')):
            next_chunk_start = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month',
                                              self.experiment.calendar)

            if self.cmorization_required(chunk, (ModelingRealms.atmos,)):
                chunk_end = previous_day(next_chunk_start, self.experiment.calendar)
                Log.info('CMORizing chunk {0}-{1}', date2str(chunk_start), date2str(chunk_end))
                try:
                    for grid in ('SH', 'GG'):
                        Log.info('Processing {0} variables', grid)

                        if not os.path.exists(self.get_original_grib_path(chunk_start, grid)):
                            continue
                        self.cmorize_grib_file(chunk_end, chunk_start, grid)
                except Exception as ex:
                    Log.error('Can not cmorize GRIB file for chunk {0}-{1}: {2}',
                              date2str(chunk_start), date2str(chunk_end), ex)

            # Advance to the next chunk whether or not this one was processed
            chunk_start = next_chunk_start
            chunk += 1
    def cmorize_grib_file(self, chunk_end, chunk_start, grid):
        """
        CMORize the GRIB files of one chunk for the given grid

        Each month of the chunk is copied to the scratch folder if needed, unpacked to one file per
        variable and prepared at timestep, daily and monthly frequency. Once all months are ready, the
        monthly files are merged and CMORized for each frequency.

        :param chunk_end: last day of the chunk
        :type chunk_end: datetime.datetime
        :param chunk_start: first day of the chunk
        :type chunk_start: datetime.datetime
        :param grid: grid type of the files, 'SH' or 'GG'
        :type grid: str
        """
        for month in range(0, self.experiment.chunk_size):
            current_date = add_months(chunk_start, month, self.experiment.calendar)
            original_gribfile = self.get_original_grib_path(current_date, grid)
            Log.info('Processing month {1}', grid, date2str(current_date))
            gribfile = self.get_scratch_grib_path(current_date, grid)
            if not os.path.isfile(gribfile):
                Log.info('Copying file...')
                Utils.copy_file(original_gribfile, gribfile)

            self._obtain_atmos_timestep(gribfile)
            full_file = self._get_monthly_grib(current_date, gribfile, grid)
            self._unpack_grib(full_file, gribfile, grid)
            next_gribfile = self.get_original_grib_path(add_months(current_date, 1, self.experiment.calendar), grid)

            # The scratch copy is removed here only when there is no file for the following month
            if not os.path.exists(next_gribfile):
                os.remove(gribfile)

            cdo_reftime = parse_date(self.startdate).strftime('%Y-%m-%d,00:00')

            self._ungrib_vars(cdo_reftime, gribfile, current_date.month, Frequency('{0}hr'.format(self.atmos_timestep)))
            self._ungrib_vars(cdo_reftime, gribfile, current_date.month, Frequencies.daily)
            self._ungrib_vars(cdo_reftime, gribfile, current_date.month, Frequencies.monthly)

            # The per parameter files are no longer needed once all frequencies have been prepared
            for splited_file in glob.glob('{0}_*.128.nc'.format(gribfile)):
                os.remove(splited_file)

            Log.result('Month {0}, {1} variables finished', date2str(current_date), grid)

        self._merge_and_cmorize_atmos(chunk_start, chunk_end, grid, Frequencies.monthly)
        self._merge_and_cmorize_atmos(chunk_start, chunk_end, grid, Frequencies.daily)
        self._merge_and_cmorize_atmos(chunk_start, chunk_end, grid,
                                      '{0}hr'.format(self.atmos_timestep))

    @staticmethod
    def _unpack_grib(full_file, gribfile, grid):
        """
        Split a GRIB file with cdo splitparam, writing NetCDF4 files prefixed with the GRIB file name

        'SH' files are passed through sp2gpl before being split. Any other grid is split with the '-R'
        option and a file with code 228 is created by adding the files of codes 142 and 143.

        :param full_file: path to the GRIB file to split
        :type full_file: str
        :param gribfile: path used as prefix for the output files
        :type gribfile: str
        :param grid: grid type of the file, 'SH' or 'GG'
        :type grid: str
        """
        Log.info('Unpacking... ')
        # remap on regular Gauss grid
        if grid == 'SH':
            Utils.cdo.splitparam(input='-sp2gpl {0}'.format(full_file), output=gribfile + '_', options='-f nc4')
        else:
            # The two branches are alternatives: a spectral file must not be split a second time
            Utils.cdo.splitparam(input=full_file, output=gribfile + '_', options='-R -f nc4')
            # total precipitation (remove negative values)
            Utils.cdo.setcode(228,
                              input='-setmisstoc,0 -setvrange,0,Inf -add {0}_{{142,143}}.128.nc'.format(gribfile),
                              output='{0}_228.128.nc'.format(gribfile))
        Utils.remove_file('ICM')

    def _get_monthly_grib(self, current_date, gribfile, grid):
        """
        Get the GRIB file to unpack for a month, merging in the previous month's scratch file if it exists

        :param current_date: first day of the month to process
        :type current_date: datetime.datetime
        :param gribfile: path to the month's GRIB file in the scratch folder
        :type gribfile: str
        :param grid: grid type of the file
        :type grid: str
        :return: 'ICM' when a merged file was created, `gribfile` otherwise
        :rtype: str
        """
        previous_month = add_months(current_date, -1, self.experiment.calendar)
        prev_gribfile = self.get_scratch_grib_path(previous_month, grid)
        if not os.path.exists(prev_gribfile):
            return gribfile
        self._merge_grib_files(current_date, prev_gribfile, gribfile)
        return 'ICM'

    def get_scratch_grib_path(self, current_date, grid):
        """
        Get the path of a month's GRIB file inside the scratch folder

        :param current_date: date inside the requested month
        :type current_date: datetime.datetime
        :param grid: grid type of the file
        :type grid: str
        :return: path to the file
        :rtype: str
        """
        grib_name = self._get_grib_filename(grid, current_date)
        return os.path.join(self.config.scratch_dir, grib_name)

    def _obtain_atmos_timestep(self, gribfile):
        if self.atmos_timestep is None:
            self.atmos_timestep = self._get_atmos_timestep(gribfile)

    def get_original_grib_path(self, current_date, grid):
        """
        Get the path of a month's GRIB file inside the original files folder

        :param current_date: date inside the requested month
        :type current_date: datetime.datetime
        :param grid: grid type of the file
        :type grid: str
        :return: path to the file
        :rtype: str
        """
        grib_name = self._get_grib_filename(grid, current_date)
        return os.path.join(self.original_files_path, grib_name)

    def _get_grib_filename(self, grid, month):
        """
        Build the name of a GRIB file: 'ICM', grid, expid, '+', the date string without its last two
        characters and the '.grb' extension

        :param grid: grid type of the file
        :type grid: str
        :param month: date inside the requested month
        :type month: datetime.datetime
        :return: name of the file
        :rtype: str
        """
        month_str = date2str(month)[:-2]
        return 'ICM{0}{1}+{2}.grb'.format(grid, self.experiment.expid, month_str)

    def _get_atmos_timestep(self, gribfile):
        """
        Compute the atmospheric timestep as the hours between the two earliest analysis dates of a file

        The value is also stored in `self.experiment.atmos_timestep`.

        :param gribfile: path to the GRIB file
        :type gribfile: str
        :return: timestep in hours
        :rtype: int
        :raises CMORException: if the file contains less than two different analysis dates
        """
        Log.info('Getting timestep...')
        grib_handler = pygrib.open(gribfile)
        try:
            # Iterating the handler works the same way on Python 2 and 3, unlike calling .next()
            dates = sorted({mes.analDate for mes in grib_handler})
        finally:
            grib_handler.close()

        if len(dates) < 2:
            raise CMORException('Can not get the timestep from {0}: it contains less than two different '
                                'analysis dates'.format(gribfile))

        atmos_timestep = int((dates[1] - dates[0]).total_seconds() / 3600)
        self.experiment.atmos_timestep = atmos_timestep
        return atmos_timestep

    def _cmorize_nc_file(self, filename):
        """
        CMORize a NetCDF file: normalize it, extract each data variable to its own file and remove it

        Files that do not contain any requested variable are skipped and left in place.

        :param filename: path to the NetCDF file
        :type filename: str
        """
        Log.info('Processing file {0}', filename)

        if not self._contains_requested_variables(filename):
            # Nothing to extract: avoid the cost of converting and splitting the file
            return

        Utils.convert2netcdf4(filename)
        frequency = self._get_nc_file_frequency(filename)
        Utils.rename_variables(filename, Cmorizer.ALT_COORD_NAMES, False, True)
        self._add_common_attributes(filename, frequency)
        self._update_time_variables(filename)

        handler = Utils.openCdf(filename)
        try:
            Log.info('Splitting file {0}', filename)
            for variable in handler.variables.keys():
                if variable in Cmorizer.NON_DATA_VARIABLES:
                    continue
                try:
                    self.extract_variable(filename, handler, frequency, variable)
                except Exception as ex:
                    # One failing variable must not prevent the extraction of the others
                    Log.error('Variable {0} can not be cmorized: {1}', variable, ex)
            Log.result('File {0} cmorized!', filename)
        finally:
            handler.close()
        os.remove(filename)

    # noinspection PyMethodMayBeStatic
    def _remove_valid_limits(self, filename):
        """
        Delete the 'valid_min' and 'valid_max' attributes from every variable of a NetCDF file

        :param filename: path to the NetCDF file
        :type filename: str
        """
        dataset = Utils.openCdf(filename)
        for name in dataset.variables.keys():
            nc_var = dataset.variables[name]
            for limit in ('valid_min', 'valid_max'):
                if limit in nc_var.ncattrs():
                    delattr(nc_var, limit)
        dataset.close()

    def _get_nc_file_frequency(self, filename):
        """
        Get the frequency of a NetCDF file from its '_' separated name

        - expid in second or third position: monthly
        - expid in first position: monthly if the second part is a date, else the second part
        - any other case: the second part

        :param filename: path to the NetCDF file
        :type filename: str
        :return: frequency of the file
        :rtype: Frequency
        """
        file_parts = os.path.basename(filename).split('_')
        if self.experiment.expid in [file_parts[1], file_parts[2]]:
            frequency = Frequency('m')
        elif self.experiment.expid == file_parts[0]:
            try:
                parse_date(file_parts[1])
                frequency = Frequency('m')
            except ValueError:
                # NOTE(review): assumes parse_date raises ValueError for strings that are not dates - confirm
                frequency = Frequency(file_parts[1])
        else:
            frequency = Frequency(file_parts[1])
        return frequency

    def _contains_requested_variables(self, filename):
        """
        Check if the CMOR configuration requires any of the variables of a file

        :param filename: path to the file
        :type filename: str
        :return: result of `self.cmor.any_required` for the variables of the file
        """
        return self.cmor.any_required(Utils.get_file_variables(filename))

    def extract_variable(self, file_path, handler, frequency, variable):
        """
        Extracts a variable from a file and creates the CMOR file

        Variables that are not recognized, or that the configuration does not ask to cmorize, are ignored.

        :param file_path: path to the file
        :type file_path: str
        :param handler: netCDF4 handler for the file
        :type handler: netCDF4.Dataset
        :param frequency: variable's frequency
        :type frequency: Frequency
        :param variable: variable's name
        :type variable: str
        :raises CMORException: if the dates can not be obtained from the name of the file
        """
        alias, var_cmor = VariableManager().get_variable_and_alias(variable)
        if var_cmor is None:
            return

        if not self.cmor.cmorize(var_cmor):
            return

        # The variable is extracted to a temporary file, which is the one that gets uploaded
        temp = TempFile.get()
        Utils.nco.ncks(input=file_path, output=temp, options=('-v {0}'.format(variable),))
        self._rename_level_variables(temp, var_cmor)

        self._add_coordinate_variables(handler, temp)

        # Only aliases with a basin define a region
        if alias.basin is None:
            region = None
        else:
            region = alias.basin.name

        date_str = self.get_date_str(file_path)
        if date_str is None:
            Log.error('Variable {0} can not be cmorized. Original filename does not match a recognized pattern',
                      var_cmor.short_name)
            raise CMORException('Variable {0}:{1} can not be cmorized. Original filename does not match a recognized '
                                'pattern'.format(var_cmor.domain, var_cmor.short_name))

        netcdf_file = NetCDFFile()
        netcdf_file.data_manager = self.data_manager
        netcdf_file.local_file = temp
        netcdf_file.remote_file = self.data_manager.get_file_path(self.startdate, self.member,
                                                                  var_cmor.domain, var_cmor.short_name, var_cmor,
                                                                  None, frequency,
                                                                  grid=alias.grid, year=None, date_str=date_str)

        netcdf_file.data_convention = self.config.data_convention
        netcdf_file.region = region

        netcdf_file.frequency = frequency
        netcdf_file.domain = var_cmor.domain
        netcdf_file.var = var_cmor.short_name
        netcdf_file.final_name = var_cmor.short_name

        netcdf_file.prepare_to_upload(rename_var=variable)
        netcdf_file.add_cmorization_history()
        netcdf_file.upload()

        if region:
            region_str = ' (Region {})'.format(region)
        else:
            region_str = ''
        Log.info('Variable {0.domain}:{0.short_name} processed{1}', var_cmor, region_str)

    def get_date_str(self, file_path):
        file_parts = os.path.basename(file_path).split('_')
        if file_parts[0] in (self.experiment.expid, 'MMA', 'MMASH', 'MMAGG', 'MMO') or file_parts[0].startswith('ORCA'):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            # Model output
            if file_parts[-1].endswith('.tar'):
                file_parts = file_parts[-1][0:-4].split('-')
                return '{0}-{1}'.format(file_parts[0][0:6], file_parts[1][0:6])
            else:
                return '{0}-{1}'.format(file_parts[2][0:6], file_parts[3][0:6])
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        elif file_parts[1] == self.experiment.expid:
            # Files generated by the old version of the diagnostics
            return '{0}-{1}'.format(file_parts[4][0:6], file_parts[5][0:6])
        else:
            return None

    def get_chunk(self, file_path):
        """
        Get the number of the chunk that starts at the first date encoded in the name of a file

        Chunks are counted from 1, advancing from the startdate in steps of the experiment's chunk size.

        :param file_path: path or name of the file
        :type file_path: str
        :return: chunk number
        :rtype: int
        :raises Exception: if the first date of the file is not the start of a chunk
        """
        first_date_str = self.get_date_str(file_path).split('-')[0]
        target_start = parse_date(first_date_str)
        cursor = parse_date(self.startdate)
        chunk_number = 1
        while cursor < target_start:
            cursor = chunk_end_date(cursor, self.experiment.chunk_size, 'month', self.experiment.calendar)
            chunk_number += 1

        if cursor == target_start:
            return chunk_number
        raise Exception('File {0} start date is not a valid chunk start date'.format(file_path))

    @staticmethod
    def _add_coordinate_variables(handler, temp):
        """
        Copy 'lon', 'lat' and, if the target has a 'time' dimension, 'leadtime' to the extracted file

        :param handler: handler of the file to copy the variables from
        :type handler: netCDF4.Dataset
        :param temp: path to the file to copy the variables to
        :type temp: str
        """
        target = Utils.openCdf(temp)
        for coordinate in ('lon', 'lat'):
            Utils.copy_variable(handler, target, coordinate, False)
        has_time = 'time' in target.dimensions.keys()
        if has_time:
            Utils.copy_variable(handler, target, 'leadtime', False)
        target.close()

    @staticmethod
    def _rename_level_variables(temp, var_cmor):
        """
        Rename the vertical coordinate of a file according to the domain of its variable

        The new name is 'lev' for ocean, 'sdepth' for landIce and land and 'plev' for atmos.

        :param temp: path to the file to modify
        :type temp: str
        :param var_cmor: definition of the variable stored in the file; only its `domain` is used
        """
        domain = var_cmor.domain
        if domain == ModelingRealms.ocean:
            ocean_levels = {'deptht': 'lev', 'depthu': 'lev', 'depthw': 'lev', 'depthv': 'lev',
                            'depth': 'lev'}
            Utils.rename_variables(temp, ocean_levels, False, True)
        if domain in [ModelingRealms.landIce, ModelingRealms.land]:
            soil_levels = {'depth': 'sdepth', 'depth_2': 'sdepth', 'depth_3': 'sdepth',
                           'depth_4': 'sdepth'}
            Utils.rename_variables(temp, soil_levels, False, True)
        if domain == ModelingRealms.atmos:
            pressure_levels = {'depth': 'plev'}
            Utils.rename_variables(temp, pressure_levels, False, True)

    @staticmethod
    def _merge_grib_files(current_month, prev_gribfile, gribfile):
        """
        Create the 'ICM' file with the messages of two GRIB files dated from the first day of a month on

        The selection is done by grib_filter with a temporary 'rules_files' file written in the current
        working directory. The previous month's file is removed afterwards.

        :param current_month: date inside the month to select
        :type current_month: datetime.datetime
        :param prev_gribfile: path to the previous month's GRIB file
        :type prev_gribfile: str
        :param gribfile: path to the current month's GRIB file
        :type gribfile: str
        """
        Log.info('Merging data from different files...')
        # The context manager closes the rules file even if the write fails
        with open('rules_files', 'w') as fd:
            fd.write('if (dataDate >= {0.year}{0.month:02}01) {{ write ; }}\n'.format(current_month))
        # get first timestep for each month from previous file (if possible)
        if os.path.exists('ICM'):
            os.remove('ICM')
        # NOTE(review): only the base names are passed, so this presumably relies on the working
        # directory being the folder that holds both files - confirm
        Utils.execute_shell_command('grib_filter -o ICM rules_files '
                                    '{0} {1}'.format(os.path.basename(prev_gribfile),
                                                     os.path.basename(gribfile)))
        os.remove('rules_files')
        Utils.remove_file(prev_gribfile)

    def _ungrib_vars(self, cdo_reftime, gribfile, month, frequency):
        """
        Create the files of a month at the given frequency from the per parameter files of a GRIB file

        For every code requested for the frequency whose '<gribfile>_<code>.128.nc' file exists, a cdo
        command selects the month, applies the time statistics of the frequency, shifts the time and
        converts the units where needed, and writes '<gribfile>_<code>_<frequency>.nc'.

        :param cdo_reftime: reference time to set, in the format expected by cdo setreftime
        :type cdo_reftime: str
        :param gribfile: path to the GRIB file, used as prefix of the input and output files
        :type gribfile: str
        :param month: number of the month to select
        :type month: int
        :param frequency: frequency of the files to create
        :type frequency: Frequency
        """
        Log.info('Preparing {0} variables'.format(frequency))
        var_codes = self.config.cmor.get_variables(frequency)
        for var_code in var_codes:
            if not os.path.exists('{0}_{1}.128.nc'.format(gribfile, var_code)):
                continue
            new_units = None

            # cdo runs the chained operators from right to left, so the month selection goes first
            cdo_operator = '-selmon,{0}'.format(month)
            if frequency == Frequencies.monthly:
                if var_code == 201:
                    cdo_operator = "-monmean -daymax {0}".format(cdo_operator)
                elif var_code == 202:
                    # Monthly mean of the daily minimum, matching what the daily branch does for this code
                    cdo_operator = "-monmean -daymin {0}".format(cdo_operator)
                else:
                    cdo_operator = "-monmean {0} ".format(cdo_operator)
            if frequency == Frequencies.daily:
                if var_code == 201:
                    cdo_operator = "-daymax {0} ".format(cdo_operator)
                elif var_code == 202:
                    cdo_operator = "-daymin {0} ".format(cdo_operator)
                else:
                    cdo_operator = "-daymean {0} ".format(cdo_operator)

            if var_code in (144, 146, 147, 169, 175, 176, 177, 179, 180, 181, 182, 201, 202, 205, 212, 228):
                cdo_operator = '{0} -shifttime,-{1}hours'.format(cdo_operator, self.experiment.atmos_timestep)

            if var_code == 129:
                # geopotential
                new_units = "m"
                cdo_operator = "-divc,9.81 {0}".format(cdo_operator)
            elif var_code in (146, 147, 169, 175, 176, 177, 179, 212):
                # radiation
                new_units = "W m-2"
                cdo_operator = "-divc,{0} {1}".format(self.experiment.atmos_timestep * 3600, cdo_operator)
            elif var_code in (180, 181):
                # momentum flux
                new_units = "N m-2"
                cdo_operator = "-divc,{0} {1}".format(self.experiment.atmos_timestep * 3600, cdo_operator)
            elif var_code in (144, 182, 205, 228):
                # precipitation/evaporation/runoff
                new_units = "kg m-2 s-1"
                # Keep the operators built so far: month selection, time statistics and time shift
                cdo_operator = "-mulc,1000 -divc,{0} {1}".format(self.experiment.atmos_timestep * 3600,
                                                                 cdo_operator)

            levels = self.config.cmor.get_levels(frequency, var_code)
            if levels:
                cdo_operator = "{0} -sellevel,{1}".format(cdo_operator, levels)

            Utils.execute_shell_command('cdo -t ecmwf setreftime,{0} '
                                        '{1} {2}_{3}.128.nc '
                                        '{2}_{3}_{4}.nc'.format(cdo_reftime, cdo_operator,
                                                                gribfile, var_code, frequency))
            h_var_file = '{0}_{1}_{2}.nc'.format(gribfile, var_code, frequency)

            handler = Utils.openCdf(h_var_file)
            try:
                if new_units:
                    for var in handler.variables.values():
                        if 'code' in var.ncattrs() and var.code == var_code:
                            var.units = new_units
                            break

                # Look for a variable that has a second version, named with the '_2' suffix
                var_name = None
                for key in handler.variables.keys():
                    if key + '_2' in handler.variables and key not in handler.dimensions:
                        var_name = key
            finally:
                handler.close()

            if var_name is not None:
                # Keep only that variable in the file just created, whatever its frequency
                Utils.nco.ncks(input=h_var_file, output=h_var_file,
                               options=('-O -v {0}'.format(var_name)))

    def _merge_and_cmorize_atmos(self, chunk_start, chunk_end, grid, frequency):
        """
        Join the monthly files of each variable of a chunk and CMORize the result

        For every file of the chunk's first month at the given frequency, the files of the following months
        are concatenated to it. The merged file is written to the current working directory.

        :param chunk_start: first day of the chunk
        :type chunk_start: datetime.datetime
        :param chunk_end: last day of the chunk
        :type chunk_end: datetime.datetime
        :param grid: grid type of the files
        :type grid: str
        :param frequency: frequency of the files to merge
        :type frequency: Frequency | str
        """
        calendar = self.experiment.calendar
        merged_file = 'MMA_{0}_{1}_{2}_{3}.nc'.format(frequency, date2str(chunk_start), date2str(chunk_end), grid)
        first_month_pattern = '{0}_*_{1}.nc'.format(self._get_grib_filename(grid, chunk_start), frequency)
        first_month_files = glob.glob(os.path.join(self.config.scratch_dir, first_month_pattern))
        first_month_tag = '+{0}.grb'.format(date2str(chunk_start)[:-2])

        for first_file in first_month_files:
            shutil.move(first_file, merged_file)
            month_start = add_months(chunk_start, 1, calendar)
            while month_start < chunk_end:
                # The files of the other months only differ in the date part of the name
                month_tag = '+{0}.grb'.format(date2str(month_start)[:-2])
                month_file = first_file.replace(first_month_tag, month_tag)
                Utils.concat_variables(month_file, merged_file, True)
                month_start = add_months(month_start, 1, calendar)

            self._cmorize_nc_file(merged_file)

    def _update_time_variables(self, filename):
        """
        Link the time bounds to the time variable, set the reference time to 1850-01-01 and add the leadtime

        :param filename: path to the NetCDF file to modify
        :type filename: str
        """
        bounds_name = "time_bnds"
        dataset = Utils.openCdf(filename)
        time_var = dataset.variables['time']
        if bounds_name in dataset.variables:
            time_var.bounds = bounds_name
            dataset.variables[bounds_name].units = time_var.units
        dataset.close()

        # cdo writes to a temporary file that then replaces the original one
        converted = TempFile.get()
        Utils.cdo.setreftime('1850-01-01,00:00:00,days', input=filename, output=converted)
        Utils.move_file(converted, filename)

        self._set_leadtime_var(filename)
    def _set_leadtime_var(self, filename):
        """
        Create or update the 'leadtime' variable with the whole days elapsed since the startdate

        :param filename: path to the NetCDF file to modify
        :type filename: str
        """
        handler = Utils.openCdf(filename)
        try:
            if 'leadtime' in handler.variables:
                var = handler.variables['leadtime']
            else:
                var = handler.createVariable('leadtime', float, 'time')
            var.units = "days"
            var.long_name = "Time elapsed since the start of the forecast"
            var.standard_name = "forecast_period"
            startdate = parse_date(self.startdate)
            # Rebuild each time as a plain datetime so it can be subtracted from the startdate
            leadtime = [datetime(time.year, time.month, time.day, time.hour, time.minute, time.second) - startdate
                        for time in Utils.get_datetime_from_netcdf(handler)]
            # NOTE(review): .days drops the sub-daily part of the lead time - confirm this is intended
            for index, elapsed in enumerate(leadtime):
                var[index] = elapsed.days
        finally:
            # The handler was never closed, so the values written were not guaranteed to reach the file
            handler.close()
    def _add_common_attributes(self, filename, frequency):
        """
        Set the global attributes shared by all the CMORized files on a NetCDF file

        :param filename: path to the NetCDF file to modify
        :type filename: str
        :param frequency: frequency of the data stored in the file
        :type frequency: Frequency
        """
        cmor = self.config.cmor
        experiment = self.config.experiment
        handler = Utils.openCdf(filename)
        try:
            handler.associated_experiment = cmor.associated_experiment
            handler.batch = '{0}{1}'.format(experiment.institute, datetime.now().strftime('%Y-%m-%d(T%H:%M:%SZ)'))
            handler.contact = 'Pierre-Antoine Bretonniere, pierre-antoine.bretonniere@bsc.es , ' \
                              'Javier Vegas-Regidor, javier.vegas@bsc.es '
            handler.Conventions = 'CF-1.6'
            handler.creation_date = datetime.now().strftime('%Y-%m-%d(T%H:%M:%SZ)')
            handler.experiment_id = experiment.experiment_name
            handler.forecast_reference_time = parse_date(self.startdate).strftime('%Y-%m-%d(T%H:%M:%SZ)')
            handler.frequency = frequency.frequency
            handler.institute_id = experiment.institute
            handler.institution = experiment.institute
            handler.initialization_method = cmor.initialization_method
            handler.initialization_description = cmor.initialization_description
            handler.physics_version = cmor.physics_version
            handler.physics_description = cmor.physics_description
            handler.model_id = experiment.model
            handler.associated_model = cmor.associated_model
            handler.project_id = self.config.data_convention.upper()
            # Members are stored counting from 0, while the realization attribute counts from 1
            handler.realization = str(self.member + 1)
            handler.source = cmor.source
            handler.startdate = 'S{0}'.format(self.startdate)
            handler.tracking_id = str(uuid.uuid1())
            handler.title = "{0} model output prepared for {2} {1}".format(experiment.model,
                                                                           experiment.experiment_name,
                                                                           self.config.data_convention.upper())
        finally:
            handler.close()

    def gribfiles_available(self):
        grb_path = os.path.join(self.original_files_path, '*.grb')
        gribfiles = glob.glob(grb_path)
        return len(gribfiles) > 0

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def cmorization_required(self, chunk, domains):
        if not self.config.cmor.chunk_cmorization_requested(chunk):
            return False
        if self.config.cmor.force:
            return True
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        for domain in domains:
            if self.data_manager.is_cmorized(self.startdate, self.member, chunk, domain):
                return False
        return True

class CMORException(Exception):
    """Error raised when a variable can not be cmorized"""