utils.py 9.25 KB
Newer Older
import netCDF4
import re
from autosubmit.config.log import Log
from cdo import Cdo
from nco import Nco
import tempfile
import os
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
import pwd
import shutil
    """
    Container class for miscellaneous utility methods
    """
    """An instance of Nco class ready to be used"""
    """An instance of Cdo class ready to be used"""

    @staticmethod
    def setminmax(filename, variable_list):
        """
        Sets the valid_max and valid_min values to the current max and min values on the file
        :param filename: path to file
        :type filename: str
        :param variable_list: list of variables in which valid_min and valid_max will be set
        :type variable_list: str | list
        """
        if isinstance(variable_list, basestring):
            variable_list = variable_list.split()

        Log.info('Getting max and min values for {0}', ' '.join(variable_list))
        handler = Utils.openCdf(filename)
            var = handler.variables[variable]
            values = [np.max(var), np.min(var)]
            Utils.nco.ncatted(input=filename, output=filename,
                              options='-h -a valid_max,{0},m,f,{1}'.format(variable, values[0]))
            Utils.nco.ncatted(input=filename, output=filename,
                              options='-h -a valid_min,{0},m,f,{1}'.format(variable, values[1]))
        handler.close()
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    @staticmethod
    def rename_variable(filepath, old_name, new_name, must_exist=True, rename_dimension=False):
        """
        Rename multiple variables from a NetCDF file
        :param filepath: path to file
        :type filepath: str
        :param old_name: variable's name to change
        :type old_name: str
        :param new_name: new name
        :type new_name: str
        :param must_exist: if True, the function will raise an exception if the variable name does not exist
        :type must_exist: bool
        :param rename_dimension: if True, also rename dimensions with the same name
        :type rename_dimension: bool
        """
        Utils.rename_variables(filepath, {old_name: new_name}, must_exist, rename_dimension)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    def rename_variables(filepath, dic_names, must_exist=True, rename_dimension=False):
        """
        Rename multiple variables from a NetCDF file
        :param filepath: path to file
        :type filepath: str
        :param dic_names: dictionary containing old names as keys and new names as values
        :type dic_names: dict
        :param must_exist: if True, the function will raise an exception if the variable name does not exist
        :type must_exist: bool
        :param rename_dimension: if True, also rename dimensions with the same name
        :type rename_dimension: bool
        """
        handler = Utils.openCdf(filepath)
        for old_name, new_name in dic_names.items():
                if new_name not in handler.variables:
                    handler.renameVariable(old_name, new_name)
            elif must_exist:
                raise Exception("Variable {0} does not exist in file {1}".format(old_name, filepath))

            if rename_dimension:
                if old_name in handler.dimensions:
                    handler.renameDimension(old_name, new_name)
                elif must_exist:
                    raise Exception("Dimension {0} does not exist in file {1}".format(old_name, filepath))
            handler.sync()


Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    @staticmethod
    def move_file(source, destiny):
        """
        Moves a file from source to destiny, creating dirs if necessary

        :param source: path to source
        :type source: str
        :param destiny:  path to destiny
        :type destiny: str
        """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        if not os.path.exists(os.path.dirname(destiny)):
            try:
                os.makedirs(os.path.dirname(destiny))
            except OSError as ex:
                # This can be due to a race condition. If directory already exists, we don have to do nothing
                if not os.path.exists(os.path.dirname(destiny)):
                    raise ex
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        shutil.move(source, destiny)

    def execute_shell_command(command, log_level=Log.DEBUG):
        """
        Executes a sheel command
        :param command: command to execute
        :type command: str|list
        :param log_level: log level to use for command output
        :type log_level: int
        :return: command output
        :rtype: list
        """
        if isinstance(command, basestring):
            command = command.split()
        process = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = list()
        comunicate = process.communicate()
        if log_level != Log.NO_LOG:
            for line in comunicate:
                if not line:
                    continue
                Log.log.log(log_level, line)
                output.append(line)
            raise Exception('Error executing {0}\n Return code: {1}', ' '.join(command), process.returncode)
        return output
    _cpu_count = None

    @staticmethod
    def available_cpu_count():
        """ Number of available virtual or physical CPUs on this system, i.e.
        user/real as output by time(1) when called with an optimally scaling
        userspace-only program"""
        if Utils._cpu_count is None:
            try:
                m = re.search(r'(?m)^Cpus_allowed:\s*(.*)$',
                              open('/proc/self/status').read())
                if m:
                    res = bin(int(m.group(1).replace(',', ''), 16)).count('1')
                    if res > 0:
                        Utils._cpu_count = res
            except IOError:
                try:
                    import multiprocessing
                    Utils._cpu_count = multiprocessing.cpu_count()
                    return Utils._cpu_count
                except (ImportError, NotImplementedError):
                    Utils._cpu_count = -1
        Log.info('Available cores: {0}', Utils._cpu_count)
        return Utils._cpu_count

    @staticmethod
    def convert2netcdf4(filetoconvert):
        temp = TempFile.get()
        handler = Utils.openCdf(filetoconvert)
        if handler.file_format == 'NETCDF4':
            handler.close()
            return
        handler.close()
        Log.debug('Reformatting to netCDF-4')
        Utils.execute_shell_command(["nccopy", "-4", filetoconvert, temp])
        shutil.move(temp, filetoconvert)

    # noinspection PyPep8Naming
    @staticmethod
    def openCdf(filepath, mode='a'):
        return netCDF4.Dataset(filepath, mode)

    @staticmethod
    def get_datetime_from_netcdf(handler, time_variable='time'):
        nctime = handler.variables[time_variable][:]  # get values
        units = handler.variables[time_variable].units  # get unit  "days since 1950-01-01T00:00:00Z"

        try:
            cal_temps = handler.variables[time_variable].calendar
        except AttributeError:  # Attribute doesn't exist
            cal_temps = u"gregorian"  # or standard
        return netCDF4.num2date(nctime, units=units, calendar=cal_temps)

    @staticmethod
    def copy_variable(source, destiny, variable, must_exist=True):
        if not must_exist and variable not in source.variables.keys():
            return
        if variable in destiny.variables.keys():
            return
        original_var = source.variables[variable]
        new_var = destiny.createVariable(variable, original_var.datatype, original_var.dimensions)
        new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
        new_var[:] = original_var[:]

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
class TempFile(object):
    """
    Class to manage temporal files
    """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    autoclean = True
    """
    If True, new temporary files are added to the list for future cleaning
    """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    files = list()
    """
    List of files to clean automatically
    """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    scratch_folder = os.path.join('/scratch', pwd.getpwuid(os.getuid())[0])
    """
    Scratch folder to create temporary files on it
    """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
    prefix = 'temp'
    """
    Prefix for temporary filenames
    """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

    @staticmethod
    def get(filename=None, clean=None, suffix='.nc'):
        """
        Gets a new temporal filename, storing it for automated cleaning

        :param filename: if it is not none, the function will use this filename instead of a random one
        :type filename: str
        :param clean: if true, stores filename for cleaning
        :type clean: bool
        :return: path to the temporal file
        :rtype: str
        """
        if clean is None:
            clean = TempFile.autoclean

Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        if filename:
            path = os.path.join(TempFile.scratch_folder, filename)
        else:
            fd, path = tempfile.mkstemp(dir=TempFile.scratch_folder, prefix=TempFile.prefix, suffix=suffix)
            os.close(fd)
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed

        if clean:
            TempFile.files.append(path)

        return path

    @staticmethod
    def clean():
        """
        Removes all temporary files created with Tempfile until now
        """
Javier Vegas-Regidor's avatar
Javier Vegas-Regidor committed
        for temp_file in TempFile.files:
            if os.path.exists(temp_file):
                os.remove(temp_file)
        TempFile.files = list()