import os
import pwd
import shutil
import subprocess
import tempfile

import numpy as np
from autosubmit.config.log import Log
from cdo import Cdo
from nco import Nco

# ``basestring`` only exists on Python 2; fall back to ``str`` so the string
# checks below do not raise NameError on Python 3.
try:
    _STRING_TYPES = basestring
except NameError:
    _STRING_TYPES = str


class Utils(object):
    """
    Container class for miscellaneous utility methods
    """

    nco = Nco()
    """An instance of Nco class ready to be used"""
    cdo = Cdo()
    """An instance of Cdo class ready to be used"""

    @staticmethod
    def setminmax(filename, variable_list):
        """
        Sets the valid_max and valid_min attributes to the current max and min values of each variable

        :param filename: path to file
        :type filename: str
        :param variable_list: variables in which valid_min and valid_max will be set. A string is split on
            whitespace to obtain the list of names
        :type variable_list: str | list
        """
        if isinstance(variable_list, _STRING_TYPES):
            variable_list = variable_list.split()

        Log.info('Getting max and min values for {0}', ' '.join(variable_list))

        dataset = Utils.nco.readCdf(filename)
        try:
            for variable in variable_list:
                var = dataset.variables[variable]
                max_value = np.max(var)
                min_value = np.min(var)
                Utils.nco.ncatted(input=filename, output=filename,
                                  options='-h -a valid_max,{0},m,f,{1}'.format(variable, max_value))
                Utils.nco.ncatted(input=filename, output=filename,
                                  options='-h -a valid_min,{0},m,f,{1}'.format(variable, min_value))
        finally:
            # The read handle was previously never released
            dataset.close()

    @staticmethod
    def rename_variable(filepath, old_name, new_name, must_exist=True, rename_dimension=False):
        """
        Rename a single variable from a NetCDF file. Delegates to :func:`Utils.rename_variables`

        :param filepath: path to file
        :type filepath: str
        :param old_name: variable's name to change
        :type old_name: str
        :param new_name: new name
        :type new_name: str
        :param must_exist: if True, the function will raise an exception if the variable name does not exist
        :type must_exist: bool
        :param rename_dimension: if True, also rename dimensions with the same name
        :type rename_dimension: bool
        """
        Utils.rename_variables(filepath, {old_name: new_name}, must_exist, rename_dimension)

    @staticmethod
    def rename_variables(filepath, dic_names, must_exist=True, rename_dimension=False):
        """
        Rename multiple variables from a NetCDF file

        :param filepath: path to file
        :type filepath: str
        :param dic_names: dictionary containing old names as keys and new names as values
        :type dic_names: dict
        :param must_exist: if True, the function will raise an exception if a name to rename does not exist
        :type must_exist: bool
        :param rename_dimension: if True, also rename dimensions with the same name
        :type rename_dimension: bool
        :raises Exception: if must_exist is True and a variable (or, when rename_dimension is True,
            a dimension) named as one of the keys is not present in the file
        """
        handler = Utils.nco.openCdf(filepath)
        try:
            for old_name, new_name in dic_names.items():
                if old_name in handler.variables:
                    handler.renameVariable(old_name, new_name)
                elif must_exist:
                    raise Exception("Variable {0} does not exist in file {1}".format(old_name, filepath))

                if rename_dimension:
                    if old_name in handler.dimensions:
                        handler.renameDimension(old_name, new_name)
                    elif must_exist:
                        raise Exception("Dimension {0} does not exist in file {1}".format(old_name, filepath))
        finally:
            # Close the file even when one of the checks above raises
            handler.close()

    @staticmethod
    def move_file(source, destiny):
        """
        Moves a file from source to destiny, creating dirs if necessary

        :param source: path to source
        :type source: str
        :param destiny: path to destiny
        :type destiny: str
        """
        destiny_folder = os.path.dirname(destiny)
        # dirname is '' for a bare filename; os.makedirs('') would raise, so skip it
        if destiny_folder and not os.path.exists(destiny_folder):
            os.makedirs(destiny_folder)
        shutil.move(source, destiny)

    @staticmethod
    def execute_shell_command(command, log_level=Log.DEBUG):
        """
        Executes a shell command

        :param command: command to execute. A string is split on whitespace to build the argument list
        :type command: str|list
        :param log_level: log level to use for command output
        :type log_level: int
        :return: list holding the captured standard output as a single element (empty if there was no output)
        :rtype: list
        :raises Exception: if the command finishes with a non-zero return code
        """
        if isinstance(command, _STRING_TYPES):
            command = command.split()
        process = subprocess.Popen(command, stdout=subprocess.PIPE)
        output = list()
        # communicate() returns (stdout, stderr). stderr is not piped, so it is None and gets skipped
        for chunk in process.communicate():
            if not chunk:
                continue
            Log.log.log(log_level, chunk)
            output.append(chunk)
        if process.returncode != 0:
            raise Exception('Error executing {0}\n Return code: {1}'.format(' '.join(command),
                                                                            process.returncode))
        return output

    @staticmethod
    def get_cardinal_coordinate(value, latitude):
        """
        Translates a signed degree value into an absolute value followed by its cardinal direction (N, S, E or W)

        :param value: value in degrees
        :type value: float
        :param latitude: if True, value is latitude, otherwise value is longitude
        :type latitude: bool
        :return: transformed value
        :rtype: str
        """
        if latitude:
            direction = 'S' if value < 0 else 'N'
        else:
            direction = 'W' if value < 0 else 'E'
        return str(abs(value)) + direction


class TempFile(object):
    """
    Class to manage temporal files
    """

    autoclean = True
    """
    If True, new temporary files are added to the list for future cleaning
    """
    files = list()
    """
    List of files to clean automatically
    """
    scratch_folder = os.path.join('/scratch', pwd.getpwuid(os.getuid())[0])
    """
    Scratch folder to create temporary files on it
    """
    prefix = 'temp'
    """
    Prefix for temporary filenames
    """

    @staticmethod
    def get(filename=None, clean=None):
        """
        Gets a new temporal filename, storing it for automated cleaning

        :param filename: if it is not none, the function will use this filename instead of a random one
        :type filename: str
        :param clean: if true, stores filename for cleaning. If None, TempFile.autoclean is used
        :type clean: bool
        :return: path to the temporal file
        :rtype: str
        """
        if clean is None:
            clean = TempFile.autoclean

        if filename:
            path = os.path.join(TempFile.scratch_folder, filename)
        else:
            file_descriptor, path = tempfile.mkstemp(dir=TempFile.scratch_folder, prefix=TempFile.prefix,
                                                     suffix='.nc')
            # mkstemp leaves the descriptor open; only the path is needed, so release it to avoid a leak
            os.close(file_descriptor)

        if clean:
            TempFile.files.append(path)

        return path

    @staticmethod
    def clean():
        """
        Removes all temporary files created with TempFile until now
        """
        for temp_file in TempFile.files:
            if os.path.exists(temp_file):
                os.remove(temp_file)
        TempFile.files = list()