Newer
Older
import subprocess
Javier Vegas-Regidor
committed
import numpy as np
from autosubmit.config.log import Log
Javier Vegas-Regidor
committed
from cdo import Cdo
from nco import Nco
Javier Vegas-Regidor
committed
class Utils(object):
    """
    Namespace class grouping miscellaneous helper routines
    """

    #: Shared Nco instance, built once when the class body is executed
    nco = Nco()

    #: Shared Cdo instance, built once when the class body is executed
    cdo = Cdo()
Javier Vegas-Regidor
committed
@staticmethod
def get_mask(basin):
    """
    Returns a numpy array containing the mask for the given basin

    For any basin other than ``Basins.Global`` the mask is read from the variable named ``basin.fullname`` of
    the file ``mask_regions.nc``. For the global basin it is read from the ``tmask`` variable of ``mask.nc`` and
    converted with ``np.asfortranarray``. Both file names are relative paths.

    :param basin: basin to retrieve
    :type basin: Basin
    :return: mask
    :rtype: numpy.array
    """
    if basin != Basins.Global:
        mask_handler = Utils.openCdf('mask_regions.nc')
        try:
            mask = mask_handler.variables[basin.fullname][:, 0, :]
        finally:
            # Close the file even if the variable is missing or the slicing fails
            mask_handler.close()
    else:
        mask_handler = Utils.openCdf('mask.nc')
        try:
            mask = np.asfortranarray(mask_handler.variables['tmask'][0, 0, :])
        finally:
            mask_handler.close()
    return mask
Javier Vegas-Regidor
committed
@staticmethod
def setminmax(filename, variable_list):
    """
    Sets the valid_max and valid_min values to the current max and min values on the file

    :param filename: path to file
    :type filename: str
    :param variable_list: list of variables in which valid_min and valid_max will be set. A single string is
                          split on whitespace to obtain the list
    :type variable_list: str | list
    """
    if isinstance(variable_list, basestring):
        variable_list = variable_list.split()

    Log.info('Getting max and min values for {0}', ' '.join(variable_list))

    # NOTE(review): the listing used an undefined name ``var``. It is assumed to be the netCDF variable read from
    # ``filename``; confirm against the upstream history.
    limits = list()
    handler = Utils.openCdf(filename)
    try:
        for variable in variable_list:
            var = handler.variables[variable]
            limits.append((variable, np.max(var), np.min(var)))
    finally:
        handler.close()

    # The attributes are written once the handler is closed, because ncatted rewrites the same file in place
    for variable, maximum, minimum in limits:
        Utils.nco.ncatted(input=filename, output=filename,
                          options='-h -a valid_max,{0},m,f,{1}'.format(variable, maximum))
        Utils.nco.ncatted(input=filename, output=filename,
                          options='-h -a valid_min,{0},m,f,{1}'.format(variable, minimum))
Javier Vegas-Regidor
committed
@staticmethod
def rename_variable(filepath, old_name, new_name, must_exist=True, rename_dimension=False):
    """
    Rename a single variable from a NetCDF file

    Thin wrapper that forwards to Utils.rename_variables with a one-entry dictionary.

    :param filepath: path to file
    :type filepath: str
    :param old_name: variable's name to change
    :type old_name: str
    :param new_name: new name
    :type new_name: str
    :param must_exist: if True, the function will raise an exception if the variable name does not exist
    :type must_exist: bool
    :param rename_dimension: if True, also rename dimensions with the same name
    :type rename_dimension: bool
    """
    Utils.rename_variables(filepath, {old_name: new_name}, must_exist, rename_dimension)
@staticmethod
def rename_variables(filepath, dic_names, must_exist=True, rename_dimension=False):
    """
    Rename multiple variables from a NetCDF file

    The renaming is done on a temporary copy that replaces the original file at the end. If none of the old
    names is present as a variable or dimension, the file is left untouched and no exception is raised.

    :param filepath: path to file
    :type filepath: str
    :param dic_names: dictionary containing old names as keys and new names as values
    :type dic_names: dict
    :param must_exist: if True, the function will raise an exception if the variable name does not exist
    :type must_exist: bool
    :param rename_dimension: if True, also rename dimensions with the same name
    :type rename_dimension: bool
    :raises Exception: if must_exist is True and a name to rename is not found
    """
    handler = Utils.openCdf(filepath)
    try:
        original_names = set(handler.variables.keys()).union(handler.dimensions.keys())
    finally:
        handler.close()
    if not any(name in original_names for name in dic_names):
        # Nothing to rename: avoid copying the file
        return

    temp = TempFile.get()
    shutil.copyfile(filepath, temp)

    handler = Utils.openCdf(temp)
    try:
        for old_name, new_name in dic_names.items():
            if old_name in handler.variables:
                if new_name not in handler.variables:
                    handler.renameVariable(old_name, new_name)
            elif must_exist:
                raise Exception("Variable {0} does not exist in file {1}".format(old_name, filepath))

            # Dimensions are only touched on request; otherwise every plain variable would trigger the
            # "Dimension ... does not exist" error below
            if rename_dimension:
                if old_name in handler.dimensions:
                    handler.renameDimension(old_name, new_name)
                elif must_exist:
                    raise Exception("Dimension {0} does not exist in file {1}".format(old_name, filepath))
            handler.sync()
    finally:
        handler.close()

    try:
        # ncdump is used as a sanity check on the renamed file
        Utils.execute_shell_command(['ncdump', '-h', temp])
    except Utils.ExecutionError:
        # The renamed copy is not readable: rebuild it from the original file, applying the new names
        original_handler = Utils.openCdf(filepath)
        try:
            new_handler = Utils.openCdf(temp, 'w')
            try:
                for attribute in original_handler.ncattrs():
                    setattr(new_handler, attribute, getattr(original_handler, attribute))
                for dimension in original_handler.dimensions.keys():
                    Utils.copy_dimension(original_handler, new_handler, dimension, new_names=dic_names)
                for variable in original_handler.variables.keys():
                    Utils.copy_variable(original_handler, new_handler, variable, new_names=dic_names)
            finally:
                new_handler.close()
        finally:
            original_handler.close()

    Utils.move_file(temp, filepath)
@staticmethod
def move_file(source, destiny):
    """
    Moves a file from source to destiny, creating dirs if necessary

    The file is copied and the MD5 hashes of both files are compared. The copy is retried up to five times
    before giving up. The source is only removed after a verified copy.

    :param source: path to source
    :type source: str
    :param destiny: path to destiny
    :type destiny: str
    :raises Exception: if the copy can not be verified after all the retrials
    """
    destiny_dir = os.path.dirname(destiny)
    # A bare filename has an empty dirname and os.makedirs('') would fail
    if destiny_dir:
        try:
            os.makedirs(destiny_dir)
        except OSError:
            # This can be due to a race condition. If directory already exists, we don't have to do anything
            if not os.path.exists(destiny_dir):
                raise

    hash_original = Utils.get_file_hash(source)
    retrials = 5
    for _ in range(retrials):
        shutil.copyfile(source, destiny)
        if Utils.get_file_hash(destiny) == hash_original:
            break
    else:
        # No attempt produced an identical copy. The source file is kept.
        raise Exception('Can not move {0} to {1}'.format(source, destiny))
    os.remove(source)
Javier Vegas-Regidor
committed
@staticmethod
def remove_file(path):
    """
    Removes a file, checking before if it exists

    Paths that are not existing regular files (missing paths, directories) are silently ignored.

    :param path: path to file
    :type path: str
    """
    if not os.path.isfile(path):
        return
    try:
        os.remove(path)
    except OSError:
        # Another process may have removed the file between the check and the call: only propagate the
        # error if the file is still there
        if os.path.exists(path):
            raise
@staticmethod
def get_file_hash(filepath):
    """
    Returns the MD5 hash for the given filepath

    The file is read in fixed-size chunks so it does not need to fit in memory.

    :param filepath: path to the file to compute hash on
    :type filepath: str
    :return: file's MD5 hash, as an hexadecimal string
    :rtype: str
    """
    blocksize = 65536
    hasher = hashlib.md5()
    with open(filepath, 'rb') as afile:
        # iter() with a sentinel stops at the first empty read, that is, at end of file
        for buf in iter(lambda: afile.read(blocksize), b''):
            hasher.update(buf)
    return hasher.hexdigest()
@staticmethod
def execute_shell_command(command, log_level=Log.DEBUG):
    """
    Executes a shell command

    Only the standard output is captured: standard error is not redirected. The output is returned
    whatever the log level is; the log level only decides whether it is also sent to the log.

    :param command: command to execute. A single string is split on whitespace to get the argument list
    :type command: str | list
    :param log_level: log level to use for command output
    :type log_level: int
    :return: command output
    :rtype: list
    :raises Utils.ExecutionError: if the command finishes with a return code different from zero
    """
    if isinstance(command, basestring):
        command = command.split()
    process = subprocess.Popen(command, stdout=subprocess.PIPE)
    output = list()
    # communicate() returns a (stdout, stderr) pair; the entries that are empty or None are skipped
    for line in process.communicate():
        if not line:
            continue
        if log_level != Log.NO_LOG:
            Log.log.log(log_level, line)
        output.append(line)
    if process.returncode != 0:
        raise Utils.ExecutionError('Error executing {0}\n Return code: {1}'.format(' '.join(command),
                                                                                   process.returncode))
    return output
_cpu_count = None

@staticmethod
def available_cpu_count():
    """
    Number of available virtual or physical CPUs on this system

    The value is computed on the first call and cached in ``Utils._cpu_count``. It is taken from the
    ``Cpus_allowed`` mask in ``/proc/self/status`` when that yields a positive count, and from
    ``multiprocessing.cpu_count()`` otherwise.

    :return: number of CPUs, or -1 if it can not be determined
    :rtype: int
    """
    if Utils._cpu_count is None:
        try:
            with open('/proc/self/status') as status_file:
                m = re.search(r'(?m)^Cpus_allowed:\s*(.*)$', status_file.read())
            if m:
                # The mask is hexadecimal, possibly comma separated: each bit set is an allowed CPU
                res = bin(int(m.group(1).replace(',', ''), 16)).count('1')
                if res > 0:
                    Utils._cpu_count = res
        except IOError:
            pass

    if Utils._cpu_count is None:
        # /proc was not readable or gave no usable value
        try:
            import multiprocessing
            Utils._cpu_count = multiprocessing.cpu_count()
        except (ImportError, NotImplementedError):
            Utils._cpu_count = -1
    return Utils._cpu_count
@staticmethod
def convert2netcdf4(filetoconvert, force=True):
    """
    Rewrites a file with nccopy, optionally skipping files that already are in netCDF4 format

    The conversion is written to a temporary file that is then moved over the original one.

    :param filetoconvert: file to convert
    :type filetoconvert: str
    :param force: if true, converts the file regardless of original encoding. Useful to make sure that a file has
                  deflation and shuffle activated
    :type force: bool
    """
    if not force:
        handler = Utils.openCdf(filetoconvert)
        already_netcdf4 = handler.file_format == 'NETCDF4'
        handler.close()
        if already_netcdf4:
            return

    Log.debug('Reformatting to netCDF-4')
    converted = TempFile.get()
    nccopy_call = ["nccopy", "-4", "-d4", "-s", filetoconvert, converted]
    Utils.execute_shell_command(nccopy_call)
    shutil.move(converted, filetoconvert)
# noinspection PyPep8Naming
@staticmethod
def openCdf(filepath, mode='a'):
    """
    Builds a netCDF4.Dataset for the given file

    :param filepath: path to the file
    :type filepath: str
    :param mode: mode used to open the file. Append ('a') if not given
    :type mode: str
    :return: handler to the file
    :rtype: netCDF4.Dataset
    """
    dataset = netCDF4.Dataset(filepath, mode)
    return dataset
@staticmethod
def get_datetime_from_netcdf(handler, time_variable='time'):
    """
    Converts the values of a time variable of a netCDF file to dates

    The conversion uses the ``units`` attribute of the variable and its ``calendar`` attribute, falling back
    to the gregorian calendar when the variable does not define one.

    :param handler: file to read
    :type handler: netCDF4.Dataset
    :param time_variable: variable to read, by default 'time'
    :type time_variable: str
    :return: Datetime numpy array created from the values stored at the netCDF file
    :rtype: np.array
    """
    time_var = handler.variables[time_variable]
    values = time_var[:]
    time_units = time_var.units  # something like "days since 1950-01-01T00:00:00Z"
    calendar = getattr(time_var, 'calendar', u"gregorian")
    return netCDF4.num2date(values, units=time_units, calendar=calendar)
@staticmethod
def copy_variable(source, destiny, variable, must_exist=True, add_dimensions=False, new_names=None):
    """
    Copies the given variable from source to destiny

    Nothing is done if destiny already contains a variable with the original or with the translated name.

    :param add_dimensions: if it's true, dimensions required by the variable will be automatically added to the
                           file. It will also add the dimension variable
    :type add_dimensions: bool
    :param source: origin file
    :type source: netCDF4.Dataset
    :param destiny: destiny file
    :type destiny: netCDF4.Dataset
    :param variable: variable to copy
    :type variable: str
    :param must_exist: if false, does not raise an error if variable does not exist
    :type must_exist: bool
    :param new_names: dictionary with original names as keys and the names to use in destiny as values. It is
                      applied to the variable and to its dimensions
    :type new_names: dict
    :raises Exception: if destiny lacks some dimension of the variable and add_dimensions is False
    """
    if not must_exist and variable not in source.variables.keys():
        return

    if not new_names:
        new_names = dict()
    if variable in new_names:
        new_name = new_names[variable]
    else:
        new_name = variable

    # Destiny stores the variable under its translated name, so that name has to be checked too
    if variable in destiny.variables.keys() or new_name in destiny.variables.keys():
        return

    translated_dimensions = Utils._translate(source.variables[variable].dimensions, new_names)
    if not set(translated_dimensions).issubset(destiny.dimensions):
        if not add_dimensions:
            raise Exception('Variable {0} can not be added because dimensions does not match'.format(variable))
        for dimension in source.variables[variable].dimensions:
            # new_names goes by keyword: the fourth positional parameter of copy_dimension is must_exist
            Utils.copy_dimension(source, destiny, dimension, new_names=new_names)
    if variable in destiny.variables.keys() or new_name in destiny.variables.keys():
        # Just in case the variable we are copying match a dimension name
        return

    original_var = source.variables[variable]
    new_var = destiny.createVariable(new_name, original_var.datatype, translated_dimensions)
    new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
    new_var[:] = original_var[:]
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
@staticmethod
def copy_dimension(source, destiny, dimension, must_exist=True, new_names=None):
    """
    Creates in destiny a dimension with the size it has in source, copying also the variable with the same
    name if source has one

    Nothing is done if destiny already has a dimension with the target name.

    :param source: origin file
    :type source: netCDF4.Dataset
    :param destiny: destiny file
    :type destiny: netCDF4.Dataset
    :param dimension: dimension to copy
    :type dimension: str
    :param must_exist: if false, returns silently when the dimension is not present in source
    :type must_exist: bool
    :param new_names: dictionary with original names as keys and the names to use in destiny as values
    :type new_names: dict
    :return:
    """
    missing_allowed = not must_exist
    if missing_allowed and dimension not in source.dimensions.keys():
        return

    renames = new_names if new_names else dict()
    target_name = renames[dimension] if dimension in renames else dimension
    if target_name in destiny.dimensions.keys():
        return

    # An empty translation falls back to the original name
    target_name = target_name or dimension
    size = source.dimensions[dimension].size
    destiny.createDimension(target_name, size)

    if dimension in source.variables:
        Utils.copy_variable(source, destiny, dimension, new_names=renames)
@staticmethod
def concat_variables(source, destiny, remove_source=False):
"""
Add variables from a nc file to another

If destiny already exists, every variable of source that destiny lacks is copied to it (adding the required
dimensions), and every variable present in both files that has a 'time' dimension is concatenated along it.
Variables present in both files without a 'time' dimension are skipped.
If destiny does not exist, source is moved or copied to it, depending on remove_source.

:param source: path to source file
:type source: str
:param destiny: path to destiny file
:type destiny: str
:param remove_source: if True, removes source file
:type remove_source: bool
"""
# NOTE(review): indentation was lost in this listing, so block nesting below can not be read from the text.
# In particular, confirm whether the final convert2netcdf4 call belongs only to the "destiny does not exist"
# branch or runs in both cases.
if os.path.exists(destiny):
handler_total = Utils.openCdf(destiny)
handler_variable = Utils.openCdf(source)
# Concatenated arrays are kept here and written back in a later loop
concatenated = dict()
for var in handler_variable.variables:
if var not in handler_total.variables:
Utils.copy_variable(handler_variable, handler_total, var, add_dimensions=True)
else:
variable = handler_variable.variables[var]
if 'time' not in variable.dimensions:
continue
# Data already in destiny goes first, followed by the data from source, along the axis that 'time'
# occupies in the source variable
concatenated[var] = np.concatenate((handler_total.variables[var][:], variable[:]),
axis=variable.dimensions.index('time'))
for var, array in concatenated.iteritems():
handler_total.variables[var][:] = array
handler_total.close()
handler_variable.close()
if remove_source:
os.remove(source)
else:
if remove_source:
Utils.move_file(source, destiny)
else:
shutil.copy(source, destiny)
# The second argument is convert2netcdf4's "force" flag
Utils.convert2netcdf4(destiny, True)
@staticmethod
def expand_path(path):
    """
    Resolves the ~ character and the environment variables referenced in a path

    :param path: path to expand
    :type path: str
    :return: path after the expansion
    """
    with_home = os.path.expanduser(path)
    return os.path.expandvars(with_home)
class ExecutionError(Exception):
    """
    Exception raised by execute_shell_command when the command ends with a non-zero return code
    """
@classmethod
def _translate(cls, dimensions, new_names):
    """
    Applies a renaming dictionary to a sequence of names

    :param dimensions: names to translate
    :type dimensions: iterable
    :param new_names: dictionary with original names as keys and new names as values
    :type new_names: dict
    :return: names in the original order, replaced by their new name when they have one
    :rtype: list
    """
    return [new_names[name] if name in new_names else name for name in dimensions]
class TempFile(object):
    """
    Class to manage temporal files
    """

    # NOTE(review): the class header and the ``autoclean``, ``files`` and ``prefix`` attributes were missing from
    # this listing. They are reconstructed from their docstrings and from how the methods use them; the default
    # values of ``autoclean`` and ``prefix`` are assumptions to be confirmed against the upstream history.
    autoclean = True
    """
    If True, new temporary files are added to the list for future cleaning
    """
    files = list()
    """
    List of files to clean automatically
    """
    scratch_folder = ''
    """
    Scratch folder to create temporary files on it
    """
    prefix = 'temp'
    """
    Prefix for temporary filenames
    """

    @staticmethod
    def get(filename=None, clean=None, suffix='.nc'):
        """
        Gets a new temporal filename, storing it for automated cleaning

        When no filename is given, an empty file with a random name is created in the scratch folder.
        When a filename is given, only the path is built: no file is created.

        :param filename: if it is not none, the function will use this filename instead of a random one
        :type filename: str
        :param clean: if true, stores filename for cleaning. If None, TempFile.autoclean is used
        :type clean: bool
        :param suffix: suffix for the random filename. Ignored if filename is given
        :type suffix: str
        :return: path to the temporal file
        :rtype: str
        """
        if clean is None:
            clean = TempFile.autoclean

        if filename:
            path = os.path.join(TempFile.scratch_folder, filename)
        else:
            fd, path = tempfile.mkstemp(dir=TempFile.scratch_folder, prefix=TempFile.prefix, suffix=suffix)
            # mkstemp returns an open descriptor that the caller must close; only the path is needed here
            os.close(fd)

        if clean:
            TempFile.files.append(path)
        return path

    @staticmethod
    def clean():
        """
        Removes all temporary files created with Tempfile until now
        """
        for temp_file in TempFile.files:
            if os.path.exists(temp_file):
                os.remove(temp_file)
        TempFile.files = list()
Javier Vegas-Regidor
committed