# coding=utf-8
"""Entry point for EarthDiagnostics"""
from datetime import datetime
import iris

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
from bscearth.utils.log import Log
from earthdiagnostics import cdftools
from earthdiagnostics.cmormanager import CMORManager
from earthdiagnostics.config import Config
from earthdiagnostics.constants import Basins
from earthdiagnostics.obsreconmanager import ObsReconManager
from earthdiagnostics.threddsmanager import THREDDSManager
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
from earthdiagnostics.utils import TempFile, Utils
from earthdiagnostics.work_manager import WorkManager
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
class EarthDiags(object):
    """
    Launcher class for the diagnostics
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    Parameters
    ----------
    config_file: str
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    # Get the version number from the relevant file.
    # If not, from earthdiagnostics package
    scriptdir = os.path.abspath(os.path.dirname(__file__))
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    if not os.path.exists(os.path.join(scriptdir, "VERSION")):
        scriptdir = os.path.join(scriptdir, os.path.pardir)

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    version_path = os.path.join(scriptdir, "VERSION")
    readme_path = os.path.join(scriptdir, "README")
    changes_path = os.path.join(scriptdir, "CHANGELOG")
    if os.path.isfile(version_path):
        with open(version_path) as f:
        version = pkg_resources.require("earthdiagnostics")[0].version
    def __init__(self):
        """Create a launcher with an empty state and a fresh configuration"""
        # Mapping with no entries yet; nothing in this method fills it
        self.time = {}
        # Neither the data manager nor the threads are known at this point
        self.data_manager = None
        self.threads = None
        self.had_errors = False
        self.config = Config()

    def read_config(self, config_file):
        """
        Read config file and initialize earthdiagnostics

        Parses the configuration, propagates the scratch folder and the
        cdftools path to the helpers that need them and builds the variable
        renaming dictionary.

        Parameters
        ----------
        config_file
            Configuration file, handed as-is to the configuration parser

        """
        Log.info(
            "Initialising Earth Diagnostics Version {0}", EarthDiags.version
        )
        config = self.config
        config.parse(config_file)

        # Exported for the HDF5 library; the value is read from the
        # environment by whatever opens the files later on
        os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
        TempFile.scratch_folder = config.scratch_dir
        cdftools.path = config.cdftools_path
        self._create_dic_variables()

        Log.debug("Diags ready")
        experiment = config.experiment
        Log.info(
            "Running diags for experiment {0}, startdates {1}, members {2}",
            experiment.expid,
            experiment.startdates,
            experiment.members,
        )
    def parse_args(args):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Entry point for the Earth Diagnostics.

        For more detailed documentation, use -h option
        parser = argparse.ArgumentParser(
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            description="Main executable for Earth Diagnostics."
        )
        parser.add_argument(
            "-v",
            "--version",
            action="version",
            version=EarthDiags.version,
            help="returns Earth Diagnostics's version number and exit",
        )
        parser.add_argument(
            "--doc", action="store_true", help="opens documentation and exits"
        )
        parser.add_argument(
            "--clean",
            action="store_true",
            help="clean the scratch folder and exits",
        )
        parser.add_argument(
            "--report",
            action="store_true",
            help="generates a report about the available files",
        )
        parser.add_argument(
            "-lf",
            "--logfile",
            choices=(
                "EVERYTHING",
                "DEBUG",
                "INFO",
                "RESULT",
                "USER_WARNING",
                "WARNING",
                "ERROR",
                "CRITICAL",
                "NO_LOG",
            ),
            default="DEBUG",
            type=str,
            help="sets file's log level.",
        )
        parser.add_argument(
            "-lc",
            "--logconsole",
            choices=(
                "EVERYTHING",
                "DEBUG",
                "INFO",
                "RESULT",
                "USER_WARNING",
                "WARNING",
                "ERROR",
                "CRITICAL",
                "NO_LOG",
            ),
            default="INFO",
            type=str,
            help="sets console's log level",
        )

        parser.add_argument("-log", "--logfilepath", default=None, type=str)

        parser.add_argument(
            "-f", "--configfile", default="diags.conf", type=str
        )
        args = parser.parse_args(args)
            return EarthDiags.open_documentation()
        Log.set_console_level(args.logconsole)
        Log.set_file_level(args.logfile)
            Log.set_file(bscearth.utils.path.expand_path(args.logfilepath))
            diags = EarthDiags()
            diags.read_config(args.configfile)
            if args.clean:
                result = diags.clean()
            elif args.report:
                result = diags.report()
            else:
                result = diags.run()
        except Exception:
            raise
    @staticmethod
    def open_documentation():
        """
        Open Earthdiagnostics online documentation

        The documentation URL is handed to ``xdg-open``.

        Returns
        -------
        bool:
            True if successful
        """
        Log.info("Opening documentation...")
        doc_path = "http://earthdiagnostics.readthedocs.io/en/latest"
        Utils.execute_shell_command(("xdg-open", doc_path))
        Log.result("Documentation opened!")
        # Callers use the return value as a success flag, so report success
        # explicitly instead of falling through with None
        return True
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def _create_dic_variables(self):
        self.dic_variables = dict()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self.dic_variables["x"] = "i"
        self.dic_variables["y"] = "j"
        self.dic_variables["z"] = "lev"
        self.dic_variables["nav_lon"] = self.config.data_convention.lon_name
        self.dic_variables["nav_lat"] = self.config.data_convention.lat_name
        self.dic_variables["nav_lev"] = "lev"
        self.dic_variables["time_counter"] = "time"
        self.dic_variables["t"] = "time"
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

        """
        Run the diagnostics
        start = datetime.now()
        self.had_errors = False
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Log.debug("Using netCDF version {0}", netCDF4.getlibversion())

        self._prepare_scratch_dir()
        self._prepare_mesh_files()
        self._initialize_basins()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Log.info("Time to prepare: {}", datetime.now() - start)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self._prepare_data_manager()

        # Run diagnostics
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Log.info("Running diagnostics")
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        work_manager = WorkManager(self.config, self.data_manager)
        work_manager.prepare_job_list()

        result = work_manager.run()

        if self.config.auto_clean:
            self._remove_scratch_dir()
    def _initialize_basins(self):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        self._read_basins_from_file("basins.nc")
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    @staticmethod
    def _read_basins_from_file(filename):
        if not os.path.isfile(filename):
            return
        Basins().get_available_basins(iris.load_cube(filename))
    def _prepare_scratch_dir(self):
        if self.config.use_ramdisk:
            self._remove_scratch_dir()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            tempfile.mkdtemp(dir="/dev/shm")
            os.symlink(
                tempfile.mkdtemp(dir="/dev/shm"), self.config.scratch_dir
            )
        else:
            if not os.path.exists(self.config.scratch_dir):
                os.makedirs(self.config.scratch_dir)
        os.chdir(self.config.scratch_dir)

    def _prepare_data_manager(self):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        if self.config.data_adaptor == "CMOR":
            self.data_manager = CMORManager(self.config)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        elif self.config.data_adaptor == "THREDDS":
            self.data_manager = THREDDSManager(self.config)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        elif self.config.data_adaptor == "OBSRECON":
            self.data_manager = ObsReconManager(self.config)
        self.data_manager.prepare()

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        """
        Clean scratch folder

        Returns
        -------
        bool

        """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Log.info("Removing scratch folder...")
        self._remove_scratch_dir()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Log.result("Scratch folder removed")
    def _remove_scratch_dir(self):
        if os.path.islink(self.config.scratch_dir):
            # time.sleep(4)
            # shutil.rmtree(os.path.realpath(self.config.scratch_dir))
            Utils.execute_shell_command(
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                "rm -r {0}".format(os.path.realpath(self.config.scratch_dir))
            )
            os.remove(self.config.scratch_dir)
        elif os.path.isdir(self.config.scratch_dir):
            # time.sleep(4)
            # shutil.rmtree(self.config.scratch_dir)
            Utils.execute_shell_command(
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                "rm -r {0}".format(self.config.scratch_dir)
            )
    def report(self):
        """
        Create a report of missing variables for a given experiment

        One report is produced for every startdate and member combination.
        Reports go to the configured report path or, when that is empty, to
        the scratch folder.

        Returns
        -------
        bool
            Always True
        """
        Log.info("Looking for existing vars...")
        self._prepare_data_manager()

        base_folder = self.config.report.path or self.config.scratch_dir
        Utils.create_folder_tree(base_folder)

        experiment = self.config.experiment
        for startdate in experiment.startdates:
            for member in experiment.members:
                results = self._get_variable_report(startdate, member)
                report_name = "{0}_{1}.report".format(
                    startdate, experiment.get_member_str(member)
                )
                self._create_report(
                    os.path.join(base_folder, report_name), results
                )

        Log.result("Report finished")
        return True

    def _get_variable_report(self, startdate, member):
        var_manager = self.config.var_manager
        results = list()
        for var in var_manager.get_all_variables():
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            if var.domain is None:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            for table, priority in var.tables:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                if (
                    priority is None
                    or priority > self.config.report.maximum_priority
                ):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                    continue
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                if not self.data_manager.file_exists(
                    var.domain,
                    var.short_name,
                    startdate,
                    member,
                    1,
                    frequency=table.frequency,
                ):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                    results.append((var, table, priority))
                    Log.debug(
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                        "Variable {0.short_name} not found in {1.name}",
                        var,
                        table,
                    )
                    Log.result(
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                        "Variable {0.short_name} found in {1.name}", var, table
                    )
        return results
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    @staticmethod
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def _create_report(report_path, results):
        tables = set([result[1].name for result in results])
        for table in tables:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            file_handler = open("{0}.{1}".format(report_path, table), "w")
            table_results = [
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                result for result in results if result[1].name == table
            ]
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            file_handler.write("\nTable {0}\n".format(table))
            file_handler.write("===================================\n")
            priorities = set([result[2] for result in table_results])
            priorities = sorted(priorities)
            for priority in priorities:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                priority_results = [
                    result[0]
                    for result in table_results
                    if result[2] == priority
                ]
                priority_results = sorted(
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                    priority_results, key=lambda v: v.short_name
                )
                file_handler.write(
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                    "\nMissing variables with priority {0}:\n".format(priority)
                )
                file_handler.write("--------------------------------------\n")
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                for var in priority_results:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                    file_handler.write(
                        "{0:12}: {1}\n".format(
                            var.short_name, var.standard_name
                        )
                    )
            file_handler.flush()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            file_handler.close()
    def _prepare_mesh_files(self):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        model_version = self.config.experiment.model_version
        if not model_version:
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            Log.info("No model version defined. Skipping mesh files copy!")
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            return
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Log.info("Copying mesh files")
        con_files = self.config.con_files
        model_version = self.config.experiment.model_version
        restore_meshes = self.config.restore_meshes
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        mesh_mask = "mesh_mask_nemo.{0}.nc".format(model_version)
        new_mask_glo = "new_maskglo.{0}.nc".format(model_version)
        basins = "basins.{0}.nc".format(model_version)

        if self.config.mesh_mask:
            mesh_mask_path = self.config.mesh_mask
        else:
            mesh_mask_path = os.path.join(con_files, mesh_mask)

        if self.config.new_mask_glo:
            new_mask_glo_path = self.config.new_mask_glo
        else:
            new_mask_glo_path = os.path.join(con_files, new_mask_glo)
        if self.config.basins:
            basins_path = self.config.basins
            basins_path = os.path.join(con_files, basins)
        if self.config.scratch_masks:
            self._prepare_mesh_using_scratch(
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                basins,
                basins_path,
                mesh_mask,
                mesh_mask_path,
                new_mask_glo,
                new_mask_glo_path,
                restore_meshes,
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            self._copy_file(mesh_mask_path, "mesh_hgr.nc", restore_meshes)
            self._link_file("mesh_hgr.nc", "mesh_zgr.nc")
            self._link_file("mesh_hgr.nc", "mask.nc")
            self._copy_file(
                new_mask_glo_path, "new_maskglo.nc", restore_meshes
            )
            self._copy_file(basins_path, "basins.nc", restore_meshes)

        Log.result("Mesh files ready!")

    def _prepare_mesh_using_scratch(
        self,
        basins,
        basins_path,
        mesh_mask,
        mesh_mask_path,
        new_mask_glo,
        new_mask_glo_path,
        restore_meshes,
    ):
        """
        Copy the mesh files to the shared masks folder and link them

        Each file is copied into ``config.scratch_masks``. When the copy is
        reported as successful, the copy gets group write permissions and is
        linked under the fixed names the diagnostics use.

        Parameters
        ----------
        basins: str
            File name for the basins copy
        basins_path: str
            Source path of the basins file
        mesh_mask: str
            File name for the mesh mask copy
        mesh_mask_path: str
            Source path of the mesh mask file
        new_mask_glo: str
            File name for the new_maskglo copy
        new_mask_glo_path: str
            Source path of the new_maskglo file
        restore_meshes
            Passed to ``_copy_file`` as its ``force`` argument
        """
        scratch_masks = self.config.scratch_masks
        Utils.create_folder_tree(scratch_masks)
        Utils.give_group_write_permissions(scratch_masks)

        # (source path, name inside scratch_masks, link names to create)
        mesh_files = (
            (
                mesh_mask_path,
                mesh_mask,
                ("mesh_hgr.nc", "mesh_zgr.nc", "mask.nc"),
            ),
            (new_mask_glo_path, new_mask_glo, ("new_maskglo.nc",)),
            (basins_path, basins, ("basins.nc",)),
        )
        for source_path, file_name, link_names in mesh_files:
            scratch_path = os.path.join(scratch_masks, file_name)
            if self._copy_file(source_path, scratch_path, restore_meshes):
                # Permissions go on the scratch copy, never on the source
                # file, which may live in a read-only location
                Utils.give_group_write_permissions(scratch_path)
                for link_name in link_names:
                    self._link_file(scratch_path, link_name)
    def _copy_file(self, source, destiny, force):
        if not os.path.exists(source):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            Log.user_warning(
                "File {0} is not available for {1}",
                destiny,
                self.config.experiment.model_version,
            )
            Log.debug("Looking for it in {0}", source)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

        if not force and os.path.exists(destiny):
            # Small size differences can be due to the renaming of variables
            reference_size = os.stat(source).st_size
            delta_size = abs(reference_size - os.stat(destiny).st_size)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            if (
                delta_size < 2048
                or delta_size / reference_size < 1 / 1000
                or True
            ):
                Log.info("File {0} already exists", destiny)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

        Log.info("Copying file {0}", os.path.abspath(destiny))
        shutil.copyfile(source, destiny)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Log.info("File {0} ready", destiny)
        Utils.rename_variables(destiny, self.dic_variables, False)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    def _link_file(self, source, destiny):
        if not os.path.exists(source):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            Log.user_warning(
                "File {0} is not available for {1}",
                destiny,
                self.config.experiment.model_version,
            )
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

        if os.path.lexists(destiny):
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
            try:
                os.remove(destiny)
            except OSError as ex:
                if ex.errno == 13:  # Permission denied
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
                    Log.info("Link already created")
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

        os.symlink(source, destiny)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        Log.info("File {0} ready", destiny)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    """Main for earthdiagnostics"""
    if not EarthDiags.parse_args(sys.argv[1:]):


if __name__ == "__main__":
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    main()