# coding=utf-8
import hashlib
import os
import re
import shutil
import subprocess
import tempfile

import netCDF4
import numpy as np
from autosubmit.config.log import Log
from cdo import Cdo
from nco import Nco


class Utils(object):
    """
    Container class for miscellaneous utility methods
    """

    nco = Nco()
    """An instance of Nco class ready to be used"""
    cdo = Cdo()
    """An instance of Cdo class ready to be used"""

    @staticmethod
    def setminmax(filename, variable_list):
        """
        Sets the valid_max and valid_min values to the current max and min values on the file

        :param filename: path to file
        :type filename: str
        :param variable_list: list of variables in which valid_min and valid_max will be set. A string
            containing whitespace-separated variable names is also accepted
        :type variable_list: str | list
        """
        if isinstance(variable_list, basestring):
            variable_list = variable_list.split()

        Log.info('Getting max and min values for {0}', ' '.join(variable_list))
        handler = Utils.openCdf(filename)
        try:
            for variable in variable_list:
                var = handler.variables[variable]
                values = [np.max(var), np.min(var)]
                Utils.nco.ncatted(input=filename, output=filename,
                                  options='-h -a valid_max,{0},m,f,{1}'.format(variable, values[0]))
                Utils.nco.ncatted(input=filename, output=filename,
                                  options='-h -a valid_min,{0},m,f,{1}'.format(variable, values[1]))
        finally:
            # Release the file even if a variable is missing or ncatted fails
            handler.close()

    @staticmethod
    def rename_variable(filepath, old_name, new_name, must_exist=True, rename_dimension=False):
        """
        Rename a single variable from a NetCDF file

        Convenience wrapper that delegates to Utils.rename_variables with a one-entry dictionary

        :param filepath: path to file
        :type filepath: str
        :param old_name: current name of the variable
        :type old_name: str
        :param new_name: name to give to the variable
        :type new_name: str
        :param must_exist: if True, the function will raise an exception if the variable name does not exist
        :type must_exist: bool
        :param rename_dimension: if True, also rename dimensions with the same name
        :type rename_dimension: bool
        """
        single_rename = {old_name: new_name}
        Utils.rename_variables(filepath, single_rename, must_exist, rename_dimension)
    @staticmethod
    def rename_variables(filepath, dic_names, must_exist=True, rename_dimension=False):
        """
        Rename multiple variables from a NetCDF file

        :param filepath: path to file
        :type filepath: str
        :param dic_names: dictionary containing old names as keys and new names as values
        :type dic_names: dict
        :param must_exist: if True, the function will raise an exception if the variable name does not exist
        :type must_exist: bool
        :param rename_dimension: if True, also rename dimensions with the same name
        :type rename_dimension: bool
        :raises Exception: if must_exist is True and a variable (or a dimension, when rename_dimension is True)
            named as one of the keys is not present in the file
        """
        handler = Utils.openCdf(filepath)
        try:
            for old_name, new_name in dic_names.items():
                if old_name in handler.variables:
                    # If the new name is already taken the variable is left untouched
                    if new_name not in handler.variables:
                        handler.renameVariable(old_name, new_name)
                elif must_exist:
                    raise Exception("Variable {0} does not exist in file {1}".format(old_name, filepath))

                if rename_dimension:
                    if old_name in handler.dimensions:
                        handler.renameDimension(old_name, new_name)
                    elif must_exist:
                        raise Exception("Dimension {0} does not exist in file {1}".format(old_name, filepath))
                handler.sync()
        finally:
            # Always release the file, including when a missing name aborts the renaming
            handler.close()


    @staticmethod
    def move_file(source, destiny):
        """
        Moves a file from source to destiny, creating dirs if necessary

        The file is copied and the MD5 hash of the copy is compared with the hash of the original. The copy is
        retried up to 5 times and the source is removed only after a copy has been verified.

        :param source: path to source
        :type source: str
        :param destiny:  path to destiny
        :type destiny: str
        :raises Exception: if no verified copy could be produced after all the attempts
        """
        destiny_dir = os.path.dirname(destiny)
        # An empty dirname means that destiny is in the current directory: there is nothing to create
        if destiny_dir and not os.path.exists(destiny_dir):
            try:
                os.makedirs(destiny_dir)
            except OSError:
                # This can be due to a race condition. If the directory already exists, there is nothing to do
                if not os.path.exists(destiny_dir):
                    raise
        hash_destiny = None
        hash_original = Utils.get_file_hash(source)

        retrials = 5
        while hash_original != hash_destiny:
            if retrials == 0:
                raise Exception('Can not move {0} to {1}'.format(source, destiny))
            shutil.copy(source, destiny)
            hash_destiny = Utils.get_file_hash(destiny)
            # Consume one attempt so a persistent mismatch ends in the exception above instead of looping forever
            retrials -= 1
        os.remove(source)

    @staticmethod
    def get_file_hash(filepath):
        """
        Returns the MD5 hash for the given filepath

        :param filepath: path to the file to compute hash on
        :type filepath: str
        :return: file's MD5 hash, as a hexadecimal string
        :rtype: str
        """
        blocksize = 65536
        hasher = hashlib.md5()
        with open(filepath, 'rb') as afile:
            # Feed the hasher in fixed-size chunks so the whole file is never held in memory;
            # read() returns an empty bytes object at end of file, which stops the iteration
            for buf in iter(lambda: afile.read(blocksize), b''):
                hasher.update(buf)
        return hasher.hexdigest()


    @staticmethod
    def execute_shell_command(command, log_level=Log.DEBUG):
        """
        Executes a shell command

        Only the standard output of the command is captured; the standard error stream is not redirected.

        :param command: command to execute. A string is split on whitespace to build the argument list
        :type command: str|list
        :param log_level: log level to use for command output. Use Log.NO_LOG to disable logging
        :type log_level: int
        :return: command output
        :rtype: list
        :raises Exception: if the command finishes with a return code different from zero
        """
        if isinstance(command, basestring):
            command = command.split()
        process = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = list()
        # communicate() returns the (stdout, stderr) pair; streams that were not piped come back as None
        comunicate = process.communicate()
        for line in comunicate:
            if not line:
                continue
            if log_level != Log.NO_LOG:
                Log.log.log(log_level, line)
            # The output is returned to the caller whether or not it was logged
            output.append(line)
        if process.returncode != 0:
            raise Exception('Error executing {0}\n Return code: {1}'.format(' '.join(command),
                                                                            process.returncode))
        return output
    _cpu_count = None

    @staticmethod
    def available_cpu_count():
        """
        Number of available virtual or physical CPUs on this system

        The value is computed on the first call and cached in Utils._cpu_count. It is read from the
        Cpus_allowed mask in /proc/self/status when that is possible, and from multiprocessing.cpu_count()
        otherwise.

        :return: number of available CPUs, or -1 if it can not be determined
        :rtype: int
        """
        if Utils._cpu_count is None:
            count = None
            try:
                with open('/proc/self/status') as status_file:
                    m = re.search(r'(?m)^Cpus_allowed:\s*(.*)$', status_file.read())
                if m:
                    # The mask is a comma-separated hexadecimal number with one bit set per allowed CPU
                    res = bin(int(m.group(1).replace(',', ''), 16)).count('1')
                    if res > 0:
                        count = res
            except IOError:
                # /proc is not available on this platform: use the fallback below
                pass

            if count is None:
                try:
                    import multiprocessing
                    count = multiprocessing.cpu_count()
                except (ImportError, NotImplementedError):
                    count = -1
            Utils._cpu_count = count
        Log.info('Available cores: {0}', Utils._cpu_count)
        return Utils._cpu_count

    @staticmethod
    def convert2netcdf4(filetoconvert):
        """
        Checks if a file is in netCDF4 format and converts to netCDF4 if not

        The conversion is done with nccopy on a temporary file, which then replaces the original one.

        :param filetoconvert: file to convert
        :type filetoconvert: str
        """
        handler = Utils.openCdf(filetoconvert)
        try:
            already_netcdf4 = handler.file_format == 'NETCDF4'
        finally:
            handler.close()
        if already_netcdf4:
            return

        Log.debug('Reformatting to netCDF-4')
        # nccopy needs a separate output file: write to a temporary one and move it over the original
        temp = TempFile.get()
        Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filetoconvert, temp])
        shutil.move(temp, filetoconvert)

    # noinspection PyPep8Naming
    @staticmethod
    def openCdf(filepath, mode='a'):
        """
        Builds a netCDF4.Dataset for the given path

        :param filepath: path to the file
        :type filepath: str
        :param mode: mode passed to netCDF4.Dataset. Defaults to 'a' (append)
        :type mode: str
        :return: handler to the file
        :rtype: netCDF4.Dataset
        """
        dataset = netCDF4.Dataset(filepath, mode)
        return dataset

    @staticmethod
    def get_datetime_from_netcdf(handler, time_variable='time'):
        """
        Converts the values of a time variable from a netCDF file into dates

        :param handler: file to read
        :type handler: netCDF4.Dataset
        :param time_variable: variable to read, by default 'time'
        :type time_variable: str
        :return: Datetime numpy array created from the values stored at the netCDF file
        :rtype: np.array
        """
        time_var = handler.variables[time_variable]
        raw_values = time_var[:]
        time_units = time_var.units  # e.g. "days since 1950-01-01T00:00:00Z"

        # The calendar attribute may be absent: use gregorian (standard) in that case
        calendar_name = getattr(time_var, 'calendar', u"gregorian")
        return netCDF4.num2date(raw_values, units=time_units, calendar=calendar_name)

    @staticmethod
    def copy_variable(source, destiny, variable, must_exist=True):
        """
        Copies the given variable from source to destiny

        Nothing is done if the variable is already present in destiny. The new variable is created with the
        datatype, dimensions, attributes and values of the original one.

        :param source: origin file
        :type source: netCDF4.Dataset
        :param destiny: destiny file
        :type destiny: netCDF4.Dataset
        :param variable: variable to copy
        :type variable: str
        :param must_exist: if False, does not raise an error if the variable does not exist in source
        :type must_exist: bool
        :return:
        """
        optional_and_absent = (not must_exist) and variable not in source.variables.keys()
        if optional_and_absent or variable in destiny.variables.keys():
            return

        template = source.variables[variable]
        copied = destiny.createVariable(variable, template.datatype, template.dimensions)
        attributes = dict((name, template.getncattr(name)) for name in template.ncattrs())
        copied.setncatts(attributes)
        copied[:] = template[:]

class TempFile(object):
    """
    Class to manage temporal files
    """

    autoclean = True
    """
    If True, new temporary files are added to the list for future cleaning
    """

    files = list()
    """
    List of files to clean automatically
    """

    scratch_folder = ''
    """
    Scratch folder to create temporary files on it
    """

    prefix = 'temp'
    """
    Prefix for temporary filenames
    """

    @staticmethod
    def get(filename=None, clean=None, suffix='.nc'):
        """
        Gets a new temporal filename, storing it for automated cleaning

        :param filename: if it is not none, the function will use this filename instead of a random one
        :type filename: str
        :param clean: if true, stores filename for cleaning. If None, the value of TempFile.autoclean is used
        :type clean: bool
        :param suffix: suffix for the random filename. Not used when filename is given
        :type suffix: str
        :return: path to the temporal file
        :rtype: str
        """
        if clean is None:
            clean = TempFile.autoclean

        if filename:
            path = os.path.join(TempFile.scratch_folder, filename)
        else:
            fd, path = tempfile.mkstemp(dir=TempFile.scratch_folder, prefix=TempFile.prefix, suffix=suffix)
            # Only the name is needed: close the descriptor that mkstemp leaves open
            os.close(fd)

        if clean:
            TempFile.files.append(path)

        return path

    @staticmethod
    def clean():
        """
        Removes all temporary files created with Tempfile until now
        """
        for temp_file in TempFile.files:
            if os.path.exists(temp_file):
                os.remove(temp_file)
        TempFile.files = list()