# cmorizer.py
"""Cmorization classes"""
import glob
import os
import shutil
import uuid
import traceback
import time
from datetime import datetime

from bscearth.utils.date import (
    parse_date,
    chunk_end_date,
    previous_day,
    date2str,
    add_months,
)
from bscearth.utils.log import Log
from cdo import CDOException
import iris
from iris.coords import DimCoord
import iris.coord_categorisation
import iris.analysis
import iris.exceptions
import iris.util
from earthdiagnostics.datafile import NetCDFFile
from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.utils import TempFile, Utils


class Cmorizer(object):
    """
    Class to manage CMORization

    Parameters
    ----------
    data_manager: DataManager
    startdate: str
    member: int

    """

    # Variable names that are skipped when a file is split into one CMOR
    # file per variable (see _cmorize_nc_file).
    NON_DATA_VARIABLES = (
        "lon",
        "lat",
        "longitude",
        "latitude",
        "plev",
        "time",
        "time_bnds",
        "leadtime",
        "lev",
        "lev_2",
        "icethi",
        "deptht",
        "depthu",
        "depthw",
        "depthv",
        "time_centered",
        "time_centered_bounds",
        "deptht_bounds",
        "depthu_bounds",
        "depthv_bounds",
        "depthw_bounds",
        "deptht_bnds",
        "depthu_bnds",
        "depthv_bnds",
        "depthw_bnds",
        "time_counter_bounds",
        "ncatice",
        "nav_lat_grid_V",
        "nav_lat_grid_U",
        "nav_lat_grid_T",
        "nav_lon_grid_V",
        "nav_lon_grid_U",
        "nav_lon_grid_T",
        "depth",
        "depth_2",
        "depth_3",
        "depth_4",
        "depth_bnds",
        "depth_2_bnds",
        "depth_3_bnds",
        "depth_4_bnds",
        "mlev",
        "hyai",
        "hybi",
        "hyam",
        "hybm",
    )

    def __init__(self, data_manager, startdate, member):
        self.data_manager = data_manager
        self.startdate = startdate
        self.member = member
        self.config = data_manager.config
        self.experiment = self.config.experiment
        self.cmor = self.config.cmor
        self.convetion = self.config.data_convention
        self.member_str = self.experiment.get_member_str(member)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.original_files_path = os.path.join(
            self.config.data_dir,
            self.experiment.expid,
            "original_files",
            self.startdate,
            self.member_str,
            "outputs",
        )
        self.atmos_timestep = None
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.cmor_scratch = str(
            os.path.join(
                self.config.scratch_dir,
                "CMOR",
                self.startdate,
                self.member_str,
            )
        )
        self.lon_name = self.config.data_convention.lon_name
        self.lat_name = self.config.data_convention.lat_name
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

        self.alt_coord_names = {
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            "time_counter": "time",
            "time_counter_bnds": "time_bnds",
            "time_counter_bounds": "time_bnds",
            "tbnds": "bnds",
            "nav_lat": self.lat_name,
            "nav_lon": self.lon_name,
            "x": "i",
            "y": "j",
        }
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    def path_icm(self):
        """Path to the ICM file"""
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        return os.path.join(self.config.scratch_dir, "ICM")
    def cmorize_ocean(self):
        """
        Cmorize ocean files from MMO, PPO or diags files

        Returns
        -------
        bool
            True if ocean cmorization is disabled in the configuration,
            otherwise the result of _cmorize_ocean_files

        """
        if not self.cmor.ocean:
            Log.info("Skipping ocean cmorization due to configuration")
            return True
        Log.info("\nCMORizing ocean\n")
        return self._cmorize_ocean_files("MMO", "PPO", "diags")
    def _cmorize_ocean_files(self, *args):
        """
        Unpack and cmorize the ocean files of the first prefix that exists

        Parameters
        ----------
        *args: str
            File name prefixes, tried in order. Only the files of the first
            prefix that matches something in original_files_path are used

        Returns
        -------
        bool
            False if no file was found or any file failed, True otherwise

        """
        tar_files = []
        for prefix in args:
            tar_folder = os.path.join(
                self.original_files_path, "{0}*".format(prefix)
            )
            tar_files = sorted(glob.glob(tar_folder))
            if tar_files:
                break
        if not tar_files:
            Log.error(
                "No {1} files found in {0}".format(
                    self.original_files_path, args
                )
            )
            return False

        realms = (
            ModelingRealms.ocean,
            ModelingRealms.seaIce,
            ModelingRealms.ocnBgchem,
        )
        result = True
        # count is the 1-based position of the file, used only in messages
        for count, tarfile in enumerate(tar_files, start=1):
            chunk = self._get_chunk(os.path.basename(tarfile))
            if not self._cmorization_required(chunk, realms):
                Log.info(
                    "No need to unpack file {0}/{1}".format(
                        count, len(tar_files)
                    )
                )
                continue
            Log.info(
                "Unpacking oceanic file {0}/{1}".format(
                    count, len(tar_files)
                )
            )
            try:
                self._unpack_tar_file(tarfile)
                self._cmorize_nc_files()
                Log.result(
                    "Oceanic file {0}/{1} finished".format(
                        count, len(tar_files)
                    )
                )
            except Exception as ex:
                # One failing file must not stop the remaining ones
                Log.error(
                    "Could not CMORize oceanic file {0}: {1}", count, ex
                )
                result = False
        return result
    def _filter_files(self, file_list):
        filtered = list()
        for file_path in file_list:
            filename = os.path.basename(file_path)
            if any(f in filename for f in self.cmor.filter_files):
                self._remove(file_path)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        if not filtered:
            raise CMORException(
                f"Filters {self.cmor.filter_files} do not match any file",
            )
        return filtered

    def _remove(self, file_path):
        os.remove(file_path)

    def _cmorize_nc_files(self):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        nc_files = glob.glob(os.path.join(self.cmor_scratch, "*.nc"))
        for filename in self._filter_files(nc_files):
            self._cmorize_nc_file(filename)
        self._clean_cmor_scratch()
    def _correct_fluxes(self):
        """
        Divide the flux variables of the scratch files by the timestep

        Every listed flux variable found in cmor_scratch/*.nc is divided by
        the atmospheric timestep in seconds (experiment.atmos_timestep *
        3600) and " s-1" is appended to its units. hfss and hfls also get
        their sign changed.
        """
        variable_list = self.data_manager.variable_list
        fluxes_vars = [
            variable_list.get_variable(cmor_var, True).short_name
            for cmor_var in (
                "prc",
                "prs",
                "prsn",
                "rss",
                "rls",
                "rsscs",
                "rsds",
                "rlds",
                "hfss",
                "hfls",
            )
        ]
        change_sign_vars = [
            variable_list.get_variable(cmor_var, True).short_name
            for cmor_var in ("hfss", "hfls")
        ]
        total_seconds = self.experiment.atmos_timestep * 3600
        for filename in glob.glob(os.path.join(self.cmor_scratch, "*.nc")):
            handler = Utils.open_cdf(filename)
            try:
                for varname in handler.variables.keys():
                    cmor_var = variable_list.get_variable(varname, True)
                    if (
                        cmor_var is None
                        or cmor_var.short_name not in fluxes_vars
                    ):
                        # Not a flux variable: leave it untouched
                        continue

                    if cmor_var.short_name in change_sign_vars:
                        sign = -1
                    else:
                        sign = 1

                    var_handler = handler.variables[varname]
                    var_handler[:] = sign * var_handler[:] / total_seconds
                    var_handler.units = "{0} {1}".format(
                        var_handler.units, "s-1"
                    )
            finally:
                # Close even if a variable fails so the file is not left open
                handler.close()
    def _unpack_tar_file(self, tarfile):
        """
        Extract a tar file into a freshly created CMOR scratch folder

        Files extracted under a "backup" subfolder are moved up to the
        scratch folder, and any *.gz file that passes the filters is
        unzipped. A file that can not be unzipped is logged and skipped.

        Parameters
        ----------
        tarfile: str

        """
        self._clean_cmor_scratch()
        os.makedirs(self.cmor_scratch)
        Utils.untar(
            (tarfile,), self.cmor_scratch, self.config.cmor.filter_files
        )
        if os.path.isdir(os.path.join(self.cmor_scratch, "backup")):
            for filepath in glob.glob(
                os.path.join(self.cmor_scratch, "backup", "*")
            ):
                Log.debug("Moving file {0}", filepath)
                shutil.move(filepath, filepath.replace("/backup/", "/"))
        zip_files = glob.glob(os.path.join(self.cmor_scratch, "*.gz"))
        if zip_files:
            for zip_file in self._filter_files(zip_files):
                try:
                    Utils.unzip(zip_file)
                except Utils.UnzipException as ex:
                    Log.error(
                        "File {0} could not be unzipped: {1}", tarfile, ex
                    )
    def _clean_cmor_scratch(self):
            time.sleep(2)
    def _merge_mma_files(self, tarfile):
        """
        Merge the MMA files of each grid (SH, GG) into a single file

        For every grid with files in the scratch folder: SH files are first
        passed through cdo sp2gpl, then the variables accepted by
        self.cmor.cmorize are merged in time into
        MMA<grid>_1m_<start>_<end>.nc and the input files are deleted.

        Parameters
        ----------
        tarfile: str
            Name of the tar file the data came from. Its fifth
            "_"-separated field, split on "-", supplies the dates used in
            the output file name

        """
        temp = TempFile.get()
        for grid in ["SH", "GG"]:
            files = glob.glob(
                os.path.join(self.cmor_scratch, "MMA_*_{}_*.nc".format(grid))
            )
            if not files:
                continue
            merged = TempFile.get()
            if grid == "SH":
                for filename in files:
                    Utils.cdo().sp2gpl(
                        options="-O", input=filename, output=temp
                    )
                    # Put the converted data back under the original name
                    # so the merge below reads it
                    shutil.move(temp, filename)
            cmorize_vars = set()
            var_manager = self.config.var_manager
            for filename in files:
                handler = Utils.open_cdf(filename)
                for variable in handler.variables.keys():

                    _, var_cmor = var_manager.get_variable_and_alias(
                        variable, silent=True,
                    )
                    if self.cmor.cmorize(var_cmor):
                        cmorize_vars.add(variable)
                handler.close()
            if not cmorize_vars:
                continue
            var_str = ",".join([str(var) for var in cmorize_vars])
            Utils.cdo().mergetime(
                input=[f"-selvar,{var_str} {filepath}" for filepath in files],
                output=merged
            )
            for filename in files:
                self._remove(filename)
            tar_startdate = (
                os.path.basename(tarfile[0:-4]).split("_")[4].split("-")
            )
            filename = "MMA{0}_1m_{1[0]}_{1[1]}.nc".format(grid, tar_startdate)
            shutil.move(merged, os.path.join(self.cmor_scratch, filename))
    def cmorize_atmos(self):
        """
        Cmorize atmospheric data, from grib or MMA files

        Returns
        -------
        bool
            True if atmosphere cmorization is disabled in the configuration,
            otherwise the result of the grib or MMA processing

        """
        if not self.cmor.atmosphere:
            Log.info("Skipping atmosphere cmorization due to configuration")
            return True
        Log.info("\nCMORizing atmosphere\n")
        if self.cmor.use_grib and self._gribfiles_available():
            return self._cmorize_grib_files()
        # Fall back to MMA files when grib is disabled or not available
        return self._cmorize_mma_files()

    def _cmorize_mma_files(self):
        """
        Unpack and cmorize every MMA*.tar file of the member

        Returns
        -------
        bool
            False if no MMA file was found or any file failed, True
            otherwise

        """
        tar_files = glob.glob(
            os.path.join(self.original_files_path, "MMA*.tar")
        )
        tar_files.sort()
        if not tar_files:
            Log.error(
                "MMA files not found in {0}".format(self.original_files_path)
            )
            return False

        result = True
        # count is the 1-based position of the file, used only in messages
        for count, tarfile in enumerate(tar_files, start=1):
            if not self._cmorization_required(
                self._get_chunk(os.path.basename(tarfile)),
                (ModelingRealms.atmos,),
            ):
                Log.info(
                    "No need to unpack file {0}/{1}".format(
                        count, len(tar_files)
                    )
                )
                continue
            Log.info(
                "Unpacking atmospheric file {0}/{1}".format(
                    count, len(tar_files)
                )
            )
            try:
                self._unpack_tar_file(tarfile)
                self._merge_mma_files(tarfile)
                self._correct_fluxes()
                self._cmorize_nc_files()
                Log.result(
                    "Atmospheric file {0}/{1} finished".format(
                        count, len(tar_files)
                    )
                )
            except Exception as ex:
                # One failing file must not stop the remaining ones
                Log.error(
                    "Could not cmorize atmospheric file {0}: {1}\n {2}",
                    count,
                    ex,
                    traceback.format_exc(),
                )
                result = False
        return result

    def _cmorize_grib_files(self):
        """
        Cmorize the GRIB files chunk by chunk, starting at the startdate

        Chunks are visited for as long as a GG or SH file exists for the
        first month of the chunk. Inside a chunk, a grid is skipped when
        its first file is missing or holds none of the requested codes.

        Returns
        -------
        bool
            False if any chunk raised an error, True otherwise

        """
        chunk = 1
        chunk_start = parse_date(self.startdate)
        result = True
        while os.path.exists(
            self._get_original_grib_path(chunk_start, "GG")
        ) or os.path.exists(self._get_original_grib_path(chunk_start, "SH")):
            if self._cmorization_required(chunk, (ModelingRealms.atmos,)):
                chunk_end = chunk_end_date(
                    chunk_start,
                    self.experiment.chunk_size,
                    "month",
                    self.experiment.calendar,
                )
                chunk_end = previous_day(chunk_end, self.experiment.calendar)
                Log.info(
                    "CMORizing chunk {0}-{1}",
                    date2str(chunk_start),
                    date2str(chunk_end),
                )
                try:
                    for grid in ("SH", "GG"):
                        Log.info("Processing {0} variables", grid)
                        first_grib = self._get_original_grib_path(
                            chunk_start, grid
                        )
                        # The loop condition only guarantees one of the two
                        # grids; skip the one that is not there
                        if not os.path.exists(first_grib):
                            continue
                        var_list = Utils.cdo().showvar(input=first_grib)[0]
                        codes = {
                            int(var.replace("var", ""))
                            for var in var_list.split()
                        }
                        if not codes.intersection(
                            self.config.cmor.get_requested_codes()
                        ):
                            Log.info(
                                "No requested variables found in {0}. "
                                "Skipping...",
                                grid,
                            )
                            continue
                        self._cmorize_grib_file(chunk_end, chunk_start, grid)
                except Exception as ex:
                    Log.error(
                        "Can not cmorize GRIB file for chunk {0}-{1}: {2}",
                        date2str(chunk_start),
                        date2str(chunk_end),
                        ex,
                    )
                    result = False
            chunk_start = chunk_end_date(
                chunk_start,
                self.experiment.chunk_size,
                "month",
                self.experiment.calendar,
            )
            chunk += 1
        return result
    def _cmorize_grib_file(self, chunk_end, chunk_start, grid):
        """
        Unpack the GRIB files of one chunk and grid month by month

        After all months are processed, the results are merged and
        cmorized at monthly, daily and "<atmos_timestep>hr" frequencies.
        If a month can not be unpacked, its scratch copy is removed and the
        method returns without merging.

        Parameters
        ----------
        chunk_end: datetime
        chunk_start: datetime
        grid: str

        """
        for month in range(0, self.experiment.chunk_size):
            current_date = add_months(
                chunk_start, month, self.experiment.calendar
            )
            original_gribfile = self._get_original_grib_path(
                current_date, grid
            )
            Log.info("Processing month {1}", grid, date2str(current_date))
            gribfile = self._get_scratch_grib_path(current_date, grid)
            if not os.path.isfile(gribfile):
                Log.info("Copying file...", grid, date2str(current_date))
                Utils.copy_file(original_gribfile, gribfile)

            self._obtain_atmos_timestep(gribfile)
            full_file = self._get_monthly_grib(current_date, gribfile, grid)
            if not self._unpack_grib(
                full_file, gribfile, grid, current_date.month
            ):
                self._remove(gribfile)
                # Stop here: going on would try to remove gribfile again
                return

            next_gribfile = self._get_original_grib_path(
                add_months(current_date, 1, self.experiment.calendar), grid
            )
            # The scratch copy is kept only while a next month exists;
            # _get_monthly_grib looks for the previous month in scratch
            if not os.path.exists(next_gribfile):
                self._remove(gribfile)

            self._ungrib_vars(gribfile, current_date.month)

            for splited_file in glob.glob("{0}_*.128.nc".format(gribfile)):
                self._remove(splited_file)

            Log.result(
                "Month {0}, {1} variables finished",
                date2str(current_date),
                grid,
            )

        self._merge_and_cmorize_atmos(
            chunk_start, chunk_end, grid, Frequencies.monthly
        )
        self._merge_and_cmorize_atmos(
            chunk_start, chunk_end, grid, Frequencies.daily
        )
        self._merge_and_cmorize_atmos(
            chunk_start, chunk_end, grid, "{0}hr".format(self.atmos_timestep)
        )
    def _unpack_grib(self, full_file, gribfile, grid, month):
        """
        Split the requested codes of a GRIB file into NetCDF files

        Parameters
        ----------
        full_file: str
            File to read
        gribfile: str
            Base path of the outputs; cdo writes to gribfile + "_"...
        grid: str
            "SH" adds the sp2gpl operator; any other value uses the -R
            option instead
        month: int
            Not used by this method

        Returns
        -------
        bool
            False if cdo raised a CDOException, True otherwise

        """
        Log.info("Unpacking... ")
        # remap on regular Gauss grid

        codes = self.cmor.get_requested_codes()
        codes_str = ",".join([str(code) for code in codes])
        try:
            if grid == "SH":
                Utils.cdo().splitparam(
                    input="-sp2gpl -selcode,{0} {1} ".format(
                        codes_str, full_file
                    ),
                    output=gribfile + "_",
                    options="-f nc4 -t ecmwf",
                )
            else:
                Utils.cdo().splitparam(
                    input="-selcode,{0} {1}".format(codes_str, full_file),
                    output=gribfile + "_",
                    options="-R -f nc4 -t ecmwf",
                )
            return True
        except CDOException:
            Log.info("No requested codes found in {0} file".format(grid))
            return False
        finally:
            # The ICM file is removed whatever the outcome
            Utils.remove_file(self.path_icm)

    def _get_monthly_grib(self, current_date, gribfile, grid):
        """
        Get the file to unpack for a month

        Parameters
        ----------
        current_date: datetime
        gribfile: str
        grid: str

        Returns
        -------
        str
            self.path_icm, after merging with the previous month, if the
            previous month's file exists in scratch; gribfile otherwise

        """
        prev_gribfile = self._get_scratch_grib_path(
            add_months(current_date, -1, self.experiment.calendar), grid
        )
        if not os.path.exists(prev_gribfile):
            return gribfile
        self._merge_grib_files(current_date, prev_gribfile, gribfile)
        return self.path_icm

    def _get_scratch_grib_path(self, current_date, grid):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        return os.path.join(
            self.config.scratch_dir,
            self._get_grib_filename(grid, current_date),
        )

    def _obtain_atmos_timestep(self, gribfile):
        if self.atmos_timestep is None:
            self.atmos_timestep = self._get_atmos_timestep(gribfile)

    def _get_original_grib_path(self, current_date, grid):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        return os.path.join(
            self.original_files_path,
            self._get_grib_filename(grid, current_date),
        )
    def _get_grib_filename(self, grid, month):
        """
        Build the GRIB file name: ICM<grid><expid>+<date>.grb

        The date part is date2str(month) without its last two characters.
        """
        return "ICM{0}{1}+{2}.grb".format(
            grid, self.experiment.expid, date2str(month)[:-2]
        )

    def _get_atmos_timestep(self, gribfile):
        """
        Compute the atmospheric timestep, in hours, from a GRIB file

        The timestep is the difference between the two earliest distinct
        "valid_time" values. It is also stored in
        self.experiment.atmos_timestep.

        Parameters
        ----------
        gribfile: str

        Returns
        -------
        int

        """
        Log.info("Getting timestep...")
        # Imported here, like cfgrib, so they are only needed on this path
        import cf_units
        import cfgrib

        grib = cfgrib.open_file(gribfile)
        valid_time = grib.variables["valid_time"]
        dates = set()
        for t in valid_time.data:
            dates.add(
                cf_units.num2date(
                    t,
                    valid_time.attributes["units"],
                    valid_time.attributes["calendar"],
                )
            )
        dates = sorted(dates)
        atmos_timestep = dates[1] - dates[0]
        atmos_timestep = int(atmos_timestep.total_seconds() / 3600)
        self.experiment.atmos_timestep = atmos_timestep
        return atmos_timestep

    def _cmorize_nc_file(self, filename):
        """
        Cmorize every data variable of a NetCDF file, then delete the file

        A file without requested variables is deleted right away. A variable
        that fails is logged and does not stop the remaining ones.

        Parameters
        ----------
        filename: str

        """
        Log.info("Processing file {0}", filename)

        if not self._contains_requested_variables(filename):
            self._remove(filename)
            return

        # Utils.convert2netcdf4(filename)
        frequency = self._get_nc_file_frequency(filename)
        Utils.rename_variables(filename, self.alt_coord_names, False)
        handler = Utils.open_cdf(filename)
        Cmorizer._remove_valid_limits(handler)
        self._add_common_attributes(handler, frequency)
        self._update_time_variables(handler)

        # Copy the names: the handler is closed before they are used
        variables = list(handler.variables.keys())
        handler.close()

        Log.info("Splitting file {0}", filename)
        for variable in variables:
            if variable in Cmorizer.NON_DATA_VARIABLES:
                continue
            try:
                Log.debug("Checking variable {0}", variable)
                self.extract_variable(filename, frequency, variable)
            except Exception as ex:
                Log.error(
                    "Variable {0} can not be cmorized: {1}", variable, ex
                )
        Log.result("File {0} cmorized!", filename)
        self._remove(filename)
    @staticmethod
    def _remove_valid_limits(handler):
        for variable in handler.variables.keys():
            var = handler.variables[variable]
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            if "valid_min" in var.ncattrs():
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            if "valid_max" in var.ncattrs():
        handler.sync()
    def _get_nc_file_frequency(self, filename):
        """
        Deduce the frequency of a file from its "_"-separated name

        Parameters
        ----------
        filename: str

        Returns
        -------
        Frequency
            Frequency("m") if the expid is the second or third field, or if
            it is the first field and the second one parses as a date;
            otherwise the frequency named by the second field

        """
        file_parts = os.path.basename(filename).split("_")
        if self.experiment.expid in [file_parts[1], file_parts[2]]:
            frequency = Frequency("m")
        elif self.experiment.expid == file_parts[0]:
            try:
                parse_date(file_parts[1])
                frequency = Frequency("m")
            except ValueError:
                # Second field is not a date, so read it as the frequency
                frequency = Frequency(file_parts[1])
        else:
            frequency = Frequency(file_parts[1])
        return frequency

    def _contains_requested_variables(self, filename):
        """Ask the CMOR config if any variable of the file is required"""
        return self.cmor.any_required(Utils.get_file_variables(filename))

    def extract_variable(self, file_path, frequency, variable):
        """
        Extract a variable from a file and creates the CMOR file

        Nothing is done if the variable is unknown to the variable manager
        or is not selected for cmorization.

        Parameters
        ----------
        file_path:str
        frequency: Frequency
        variable: str

        Raises
        ------
        CMORException
            If the filename does not match any of the recognized patterns

        """
        alias, var_cmor = self.config.var_manager.get_variable_and_alias(
            variable
        )
        if var_cmor is None:
            return
        if not self.cmor.cmorize(var_cmor):
            return

        # Temporary file that receives the extracted variable
        temp = TempFile.get()
        lev_dimensions = self._set_coordinates_attribute(
            file_path, var_cmor, variable
        )
        self._rename_level_coords(file_path, lev_dimensions, temp, variable)

        if alias.basin is None:
            region = None
        else:
            region = alias.basin.name

        date_str = self._get_date_str(file_path)
        if date_str is None:
            Log.error(
                f"Variable {var_cmor.short_name} can not be cmorized. "
                "Original filename does not match a recognized pattern",
            )
            raise CMORException(
                f"Variable {var_cmor.domain}:{var_cmor.short_name} can not "
                "be cmorized. Original filename does not match a recognized "
                "pattern"
            )

        netcdf_file = NetCDFFile()
        netcdf_file.data_manager = self.data_manager
        netcdf_file.local_file = temp
        netcdf_file.remote_file = self.config.data_convention.get_file_path(
            self.startdate,
            self.member,
            var_cmor.domain,
            var_cmor.short_name,
            var_cmor,
            None,
            frequency,
            grid=alias.grid,
            year=None,
            date_str=date_str,
        )

        netcdf_file.data_convention = self.config.data_convention
        netcdf_file.region = region
        netcdf_file.frequency = frequency
        netcdf_file.domain = var_cmor.domain
        netcdf_file.var = var_cmor.short_name
        netcdf_file.final_name = var_cmor.short_name

        netcdf_file.prepare_to_upload(rename_var=variable)
        netcdf_file.add_cmorization_history()
        netcdf_file.upload()

        if region:
            region_str = " (Region {})".format(region)
        else:
            region_str = ""
        Log.info(
            "Variable {0.domain}:{0.short_name} processed{1}",
            var_cmor,
            region_str,
        )

    def _rename_level_coords(self, file_path, lev_dimensions, temp, variable):
        """
        Save one variable to temp with renamed level coordinates

        Level coordinates are renamed following lev_dimensions; those not
        present in the cube are ignored. When latitude has more than one
        dimension, "j" and "i" index coordinates are added on the last two
        dimensions unless they already exist.

        Parameters
        ----------
        file_path: str
        lev_dimensions: dict
            Original coordinate var_name -> target var_name
        temp: str
            Path of the file to write
        variable: str
            var_name of the cube to load from file_path

        """
        cube = iris.load_cube(
            file_path,
            iris.Constraint(cube_func=lambda c: c.var_name == variable),
        )
        for lev_original, lev_target in lev_dimensions.items():
            try:
                cube.coord(var_name=lev_original).var_name = lev_target
            except iris.exceptions.CoordinateNotFoundError:
                pass
        if cube.coord("latitude").ndim > 1:
            # (var_name, long_name, position of the dimension from the end)
            index_coords = (
                ("j", "Cell index along second dimension", 2),
                ("i", "Cell index along first dimension", 1),
            )
            for name, long_name, offset in index_coords:
                try:
                    cube.coord(name)
                except iris.exceptions.CoordinateNotFoundError:
                    cube.add_dim_coord(
                        DimCoord(
                            range(cube.shape[-offset]),
                            var_name=name,
                            long_name=long_name,
                        ),
                        len(cube.shape) - offset,
                    )
        iris.save(cube, temp, zlib=True)

    def _set_coordinates_attribute(self, file_path, var_cmor, variable):
        """
        Write the "coordinates" attribute of a variable in the file

        The attribute lists longitude, latitude and time, plus leadtime if
        the file has it, plus every level dimension the variable uses.

        Parameters
        ----------
        file_path: str
        var_cmor:
            CMOR variable definition; its domain selects the level names
        variable: str

        Returns
        -------
        dict
            Level dimension mapping from _get_lev_dimensions

        """
        handler = Utils.open_cdf(file_path)
        try:
            coords = [self.lon_name, self.lat_name, "time"]
            if "leadtime" in handler.variables.keys():
                coords.append("leadtime")
            lev_dimensions = self._get_lev_dimensions(var_cmor)
            for lev_dim in lev_dimensions.keys():
                if lev_dim in handler.variables[variable].dimensions:
                    coords.append(lev_dim)
            # dict.fromkeys drops duplicates like set() did, but keeps a
            # stable order so the attribute is the same on every run
            handler.variables[variable].coordinates = " ".join(
                dict.fromkeys(coords)
            )
        finally:
            handler.close()
        return lev_dimensions

    def _get_lev_dimensions(self, var_cmor):
        """
        Get the level dimension renaming that applies to a variable

        Parameters
        ----------
        var_cmor:
            CMOR variable definition; only its domain is read

        Returns
        -------
        dict
            Original name -> target name. Targets are "lev" for ocean,
            "sdepth" for landIce and land, "plev" for atmos. Empty for any
            other domain

        """
        if var_cmor.domain == ModelingRealms.ocean:
            lev_dimensions = {
                "deptht": "lev",
                "depthu": "lev",
                "depthw": "lev",
                "depthv": "lev",
                "depth": "lev",
            }
        elif var_cmor.domain in [ModelingRealms.landIce, ModelingRealms.land]:
            lev_dimensions = {
                "depth": "sdepth",
                "depth_2": "sdepth",
                "depth_3": "sdepth",
                "depth_4": "sdepth",
            }
        elif var_cmor.domain == ModelingRealms.atmos:
            lev_dimensions = {"depth": "plev"}
        else:
            lev_dimensions = {}
        return lev_dimensions

    def _get_date_str(self, file_path):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        file_parts = os.path.basename(file_path).split("_")
        valid_starts = (self.experiment.expid, "MMA", "MMASH", "MMAGG", "MMO")
        if file_parts[0] in valid_starts or file_parts[0].startswith("ORCA"):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            # Model output
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            if file_parts[-1].endswith(".tar"):
                file_parts = file_parts[-1][0:-4].split("-")
                return "{0}-{1}".format(file_parts[0][0:6], file_parts[1][0:6])
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                return "{0}-{1}".format(file_parts[2][0:6], file_parts[3][0:6])
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        elif file_parts[1] == self.experiment.expid:
            # Files generated by the old version of the diagnostics
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            return "{0}-{1}".format(file_parts[4][0:6], file_parts[5][0:6])
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        else:
            return None

    def _get_chunk(self, file_path):
        """
        Compute the chunk number a file belongs to from its start date

        Starting at the experiment start date, the date is advanced one chunk
        at a time until it reaches the start date encoded in the file name.

        Parameters
        ----------
        file_path: str

        Returns
        -------
        int
            Chunk number, starting at 1

        Raises
        ------
        Exception
            If the file start date does not fall exactly on a chunk boundary

        """
        chunk_start = parse_date(self._get_date_str(file_path).split("-")[0])
        current_date = parse_date(self.startdate)
        chunk = 1
        while current_date < chunk_start:
            current_date = chunk_end_date(
                current_date,
                self.experiment.chunk_size,
                "month",
                self.experiment.calendar,
            )
            chunk += 1

        # Overshooting means the file does not start on a chunk boundary
        if current_date != chunk_start:
            raise Exception(
                "File {0} start date is not a valid chunk start date".format(
                    file_path
                )
            )
        return chunk

    def _merge_grib_files(self, current_month, prev_gribfile, gribfile):
        """
        Merge the current month's data held in the previous GRIB file

        The records of ``current_month`` are selected from ``prev_gribfile``
        into a temporary file, which is then merged in time with ``gribfile``
        into ``self.path_icm``. The previous file and the temporary file are
        removed afterwards.

        Parameters
        ----------
        current_month
            Object exposing a ``month`` attribute (e.g. a date)
        prev_gribfile: str
        gribfile: str

        """
        Log.info("Merging data from different files...")
        temp = TempFile.get(suffix=".grb")
        Utils.cdo().selmon(
            current_month.month, input=prev_gribfile, output=temp
        )
        Utils.cdo().mergetime(input=[temp, gribfile], output=self.path_icm)
        self._remove(prev_gribfile)
        self._remove(temp)

    def _ungrib_vars(self, gribfile, month):
        """
        Produce one NetCDF file per requested variable code and frequency

        For every requested code, ``<gribfile>_<code>.128.nc`` is loaded if it
        exists, its time axis and units are fixed, and only the given month is
        kept. The result is then averaged at monthly, daily and
        ``<atmos_timestep>hr`` frequency, for the frequencies at which the
        code is requested, and saved as ``<gribfile>_<code>_<frequency>.nc``.

        Parameters
        ----------
        gribfile: str
            Common prefix of the input and output files
        month: int
            Month number to keep

        """
        for var_code in self.cmor.get_requested_codes():
            file_path = "{0}_{1}.128.nc".format(gribfile, var_code)
            if not os.path.exists(file_path):
                # Nothing was extracted for this code
                continue
            cube = iris.load_cube(file_path)
            cube = self._fix_time_coord(cube, var_code)
            cube = cube.extract(iris.Constraint(month_number=month))
            cube = self._change_units(cube, var_code)

            for frequency in (
                Frequencies.monthly,
                Frequencies.daily,
                Frequency("{0}hr".format(self.atmos_timestep)),
            ):
                if var_code not in self.cmor.get_variables(frequency):
                    continue
                time_cube = self._get_time_average(cube, frequency, var_code)

                levels = self.config.cmor.get_levels(frequency, var_code)
                if levels:
                    # NOTE(review): extract() is called with a ``level``
                    # keyword; confirm the iris version in use accepts it
                    time_cube = time_cube.extract(level=levels)

                # Strip the "_2" suffix from the variable name on output
                if cube.var_name.endswith("_2"):
                    time_cube.var_name = cube.var_name[:-2]

                out_file = "{0}_{1}_{2}.nc".format(
                    gribfile, var_code, frequency
                )
                # Helper coordinates added by _fix_time_coord are not saved
                time_cube.remove_coord("month_number")
                time_cube.remove_coord("day_of_month")
                iris.save(time_cube, out_file, zlib=True)

    def _fix_time_coord(self, cube, var_code):
        """
        Normalise the time coordinate of a cube and add helper coordinates

        The time coordinate is converted to ``days since 1950-01-01 00:00:00``
        keeping its calendar. For a fixed list of variable codes the time
        points are moved back by ``atmos_timestep / 24`` days. Finally the
        ``day_of_month`` and ``month_number`` categorised coordinates are
        added to the cube.

        Parameters
        ----------
        cube: iris.cube.Cube
            Modified in place
        var_code: int

        Returns
        -------
        iris.cube.Cube
            The same cube that was received

        """
        # Not named "time" to avoid shadowing the module imported by the file
        time_coord = cube.coord("time")
        target_units = "days since 1950-01-01 00:00:00"
        time_coord.convert_units(
            cf_units.Unit(target_units, calendar=time_coord.units.calendar)
        )
        # NOTE(review): units are re-assigned from the plain string after the
        # conversion; presumably to normalise the unit object. Confirm
        time_coord.units = target_units

        shifted_codes = (
            144,
            146,
            147,
            169,
            175,
            176,
            177,
            179,
            180,
            181,
            182,
            201,
            202,
            205,
            212,
            228,
        )
        if var_code in shifted_codes:
            # atmos_timestep is presumably expressed in hours, hence the
            # division by 24 to get days
            time_coord.points = time_coord.points - (
                self.experiment.atmos_timestep / 24.0
            )

        iris.coord_categorisation.add_day_of_month(cube, "time")
        iris.coord_categorisation.add_month_number(cube, "time")
        return cube

    @staticmethod
    def _get_time_average(cube, frequency, var_code):
        """
        Aggregate a cube in time for the requested frequency

        Codes 201 and 202 are reduced per day with the maximum and the
        minimum respectively; any other code uses the mean. Monthly values
        are the mean over the month, taken from the daily extremes for codes
        201 and 202. Other frequencies leave the cube untouched.

        Parameters
        ----------
        cube: iris.cube.Cube
            Must carry the ``month_number`` and ``day_of_month`` coordinates
        frequency: Frequency
        var_code: int

        Returns
        -------
        iris.cube.Cube

        """
        daily_keys = ["month_number", "day_of_month"]
        if var_code == 201:
            daily_extreme = iris.analysis.MAX
        elif var_code == 202:
            daily_extreme = iris.analysis.MIN
        else:
            daily_extreme = None

        if frequency == Frequencies.monthly:
            if daily_extreme is not None:
                cube = cube.aggregated_by(daily_keys, daily_extreme)
            cube = cube.aggregated_by(["month_number"], iris.analysis.MEAN)
        elif frequency == Frequencies.daily:
            if daily_extreme is not None:
                cube = cube.aggregated_by(daily_keys, daily_extreme)
            else:
                cube = cube.aggregated_by(daily_keys, iris.analysis.MEAN)
        return cube

    def _change_units(self, cube, var_code):
        var_name = cube.var_name
        if var_code == 129:
            # geopotential
            cube = cube / 9.81
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            cube.units = "m"
        elif var_code in (146, 147, 169, 175, 176, 177, 179, 212):
            # radiation
            cube = cube / (self.experiment.atmos_timestep * 3600)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            cube.units = "W m-2"
        elif var_code in (180, 181):
            # momentum flux
            cube = cube / (self.experiment.atmos_timestep * 3600)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            cube.units = "N m-2"
        elif var_code in (144, 182, 205, 228):
            # precipitation/evaporation/runoff
            cube = cube * 1000 / (self.experiment.atmos_timestep * 3600)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            cube.units = "kg m-2 s-1"
        cube.var_name = var_name
        return cube
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def _merge_and_cmorize_atmos(
        self, chunk_start, chunk_end, grid, frequency
    ):
        merged_file = "MMA_{0}_{1}_{2}_{3}.nc".format(
            frequency, date2str(chunk_start), date2str(chunk_end), grid
        )
        files = glob.glob(
            os.path.join(
                self.config.scratch_dir,
                "{0}_*_{1}.nc".format(
                    self._get_grib_filename(grid, chunk_start), frequency
                ),
            )
        )

        def _load_cube(cube, field, filename):