import subprocess import netCDF4 import numpy as np import re from autosubmit.config.log import Log from cdo import Cdo from nco import Nco import tempfile import os import shutil class Utils(object): """ Container class for miscellaneous utility methods """ nco = Nco() """An instance of Nco class ready to be used""" cdo = Cdo() """An instance of Cdo class ready to be used""" @staticmethod def setminmax(filename, variable_list): """ Sets the valid_max and valid_min values to the current max and min values on the file :param filename: path to file :type filename: str :param variable_list: list of variables in which valid_min and valid_max will be set :type variable_list: str | list """ if isinstance(variable_list, basestring): variable_list = variable_list.split() Log.info('Getting max and min values for {0}', ' '.join(variable_list)) handler = Utils.openCdf(filename) for variable in variable_list: var = handler.variables[variable] values = [np.max(var), np.min(var)] Utils.nco.ncatted(input=filename, output=filename, options='-h -a valid_max,{0},m,f,{1}'.format(variable, values[0])) Utils.nco.ncatted(input=filename, output=filename, options='-h -a valid_min,{0},m,f,{1}'.format(variable, values[1])) handler.close() @staticmethod def rename_variable(filepath, old_name, new_name, must_exist=True, rename_dimension=False): """ Rename multiple variables from a NetCDF file :param filepath: path to file :type filepath: str :param old_name: variable's name to change :type old_name: str :param new_name: new name :type new_name: str :param must_exist: if True, the function will raise an exception if the variable name does not exist :type must_exist: bool :param rename_dimension: if True, also rename dimensions with the same name :type rename_dimension: bool """ Utils.rename_variables(filepath, {old_name: new_name}, must_exist, rename_dimension) @staticmethod def rename_variables(filepath, dic_names, must_exist=True, rename_dimension=False): """ Rename multiple variables from a NetCDF file :param filepath: path to file :type filepath: str :param dic_names: dictionary containing old names as keys and new names as values :type dic_names: dict :param must_exist: if True, the function will raise an exception if the variable name does not exist :type must_exist: bool :param rename_dimension: if True, also rename dimensions with the same name :type rename_dimension: bool """ handler = Utils.openCdf(filepath) for old_name, new_name in dic_names.items(): if old_name in handler.variables: if new_name not in handler.variables: handler.renameVariable(old_name, new_name) elif must_exist: raise Exception("Variable {0} does not exist in file {1}".format(old_name, filepath)) if rename_dimension: if old_name in handler.dimensions: handler.renameDimension(old_name, new_name) elif must_exist: raise Exception("Dimension {0} does not exist in file {1}".format(old_name, filepath)) handler.sync() handler.close() @staticmethod def move_file(source, destiny): """ Moves a file from source to destiny, creating dirs if necessary :param source: path to source :type source: str :param destiny: path to destiny :type destiny: str """ if not os.path.exists(os.path.dirname(destiny)): try: os.makedirs(os.path.dirname(destiny)) except OSError as ex: # This can be due to a race condition. If directory already exists, we don have to do nothing if not os.path.exists(os.path.dirname(destiny)): raise ex shutil.move(source, destiny) @staticmethod def execute_shell_command(command, log_level=Log.DEBUG): """ Executes a sheel command :param command: command to execute :type command: str|list :param log_level: log level to use for command output :type log_level: int :return: command output :rtype: list """ if isinstance(command, basestring): command = command.split() process = subprocess.Popen(command, stdout=subprocess.PIPE) output = list() comunicate = process.communicate() if log_level != Log.NO_LOG: for line in comunicate: if not line: continue Log.log.log(log_level, line) output.append(line) if process.returncode != 0: raise Exception('Error executing {0}\n Return code: {1}', ' '.join(command), process.returncode) return output _cpu_count = None @staticmethod def available_cpu_count(): """ Number of available virtual or physical CPUs on this system, i.e. user/real as output by time(1) when called with an optimally scaling userspace-only program""" if Utils._cpu_count is None: try: m = re.search(r'(?m)^Cpus_allowed:\s*(.*)$', open('/proc/self/status').read()) if m: res = bin(int(m.group(1).replace(',', ''), 16)).count('1') if res > 0: Utils._cpu_count = res except IOError: try: import multiprocessing Utils._cpu_count = multiprocessing.cpu_count() return Utils._cpu_count except (ImportError, NotImplementedError): Utils._cpu_count = -1 Log.info('Available cores: {0}', Utils._cpu_count) return Utils._cpu_count @staticmethod def convert2netcdf4(filetoconvert): temp = TempFile.get() handler = Utils.openCdf(filetoconvert) if handler.file_format == 'NETCDF4': handler.close() return handler.close() Log.debug('Reformatting to netCDF-4') Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filetoconvert, temp]) shutil.move(temp, filetoconvert) # noinspection PyPep8Naming @staticmethod def openCdf(filepath, mode='a'): return netCDF4.Dataset(filepath, mode) @staticmethod def get_datetime_from_netcdf(handler, time_variable='time'): nctime = handler.variables[time_variable][:] # get values units = handler.variables[time_variable].units # get unit "days since 1950-01-01T00:00:00Z" try: cal_temps = handler.variables[time_variable].calendar except AttributeError: # Attribute doesn't exist cal_temps = u"gregorian" # or standard return netCDF4.num2date(nctime, units=units, calendar=cal_temps) @staticmethod def copy_variable(source, destiny, variable, must_exist=True): if not must_exist and variable not in source.variables.keys(): return if variable in destiny.variables.keys(): return original_var = source.variables[variable] new_var = destiny.createVariable(variable, original_var.datatype, original_var.dimensions) new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()}) new_var[:] = original_var[:] class TempFile(object): """ Class to manage temporal files """ autoclean = True """ If True, new temporary files are added to the list for future cleaning """ files = list() """ List of files to clean automatically """ scratch_folder = '' """ Scratch folder to create temporary files on it """ prefix = 'temp' """ Prefix for temporary filenames """ @staticmethod def get(filename=None, clean=None, suffix='.nc'): """ Gets a new temporal filename, storing it for automated cleaning :param suffix: :param filename: if it is not none, the function will use this filename instead of a random one :type filename: str :param clean: if true, stores filename for cleaning :type clean: bool :return: path to the temporal file :rtype: str """ if clean is None: clean = TempFile.autoclean if filename: path = os.path.join(TempFile.scratch_folder, filename) else: fd, path = tempfile.mkstemp(dir=TempFile.scratch_folder, prefix=TempFile.prefix, suffix=suffix) os.close(fd) if clean: TempFile.files.append(path) return path @staticmethod def clean(): """ Removes all temporary files created with Tempfile until now """ for temp_file in TempFile.files: if os.path.exists(temp_file): os.remove(temp_file) TempFile.files = list()