Newer
Older
import subprocess
Javier Vegas-Regidor
committed
import numpy as np
from autosubmit.config.log import Log
Javier Vegas-Regidor
committed
from cdo import Cdo
from nco import Nco
import tempfile
import os
Javier Vegas-Regidor
committed
class Utils(object):
"""
Container class for miscellaneous utility methods
"""
Javier Vegas-Regidor
committed
nco = Nco()
"""An instance of Nco class ready to be used"""
Javier Vegas-Regidor
committed
cdo = Cdo()
"""An instance of Cdo class ready to be used"""
Javier Vegas-Regidor
committed
@staticmethod
def setminmax(filename, variable_list):
"""
Sets the valid_max and valid_min values to the current max and min values on the file
:param filename: path to file
:type filename: str
:param variable_list: list of variables in which valid_min and valid_max will be set
:type variable_list: str | list
"""
if isinstance(variable_list, basestring):
variable_list = variable_list.split()
Javier Vegas-Regidor
committed
Log.info('Getting max and min values for {0}', ' '.join(variable_list))
Javier Vegas-Regidor
committed
dataset = Utils.nco.readCdf(filename)
for variable in variable_list:
var = dataset.variables[variable]
values = [np.max(var), np.min(var)]
Utils.nco.ncatted(input=filename, output=filename,
options='-h -a valid_max,{0},m,f,{1}'.format(variable, values[0]))
Utils.nco.ncatted(input=filename, output=filename,
options='-h -a valid_min,{0},m,f,{1}'.format(variable, values[1]))
def rename_variable(filepath, old_name, new_name, must_exist=True, rename_dimension=False):
"""
Rename multiple variables from a NetCDF file
:param filepath: path to file
:type filepath: str
:param old_name: variable's name to change
:type old_name: str
:param new_name: new name
:type new_name: str
:param must_exist: if True, the function will raise an exception if the variable name does not exist
:type must_exist: bool
:param rename_dimension: if True, also rename dimensions with the same name
:type rename_dimension: bool
"""
Utils.rename_variables(filepath, {old_name: new_name}, must_exist, rename_dimension)
def rename_variables(filepath, dic_names, must_exist=True, rename_dimension=False):
"""
Rename multiple variables from a NetCDF file
:param filepath: path to file
:type filepath: str
:param dic_names: dictionary containing old names as keys and new names as values
:type dic_names: dict
:param must_exist: if True, the function will raise an exception if the variable name does not exist
:type must_exist: bool
:param rename_dimension: if True, also rename dimensions with the same name
:type rename_dimension: bool
"""
Javier Vegas-Regidor
committed
handler = Utils.nco.openCdf(filepath)
for old_name, new_name in dic_names.items():
Javier Vegas-Regidor
committed
if old_name in handler.variables:
if new_name not in handler.variables:
handler.renameVariable(old_name, new_name)
Javier Vegas-Regidor
committed
elif must_exist:
raise Exception("Variable {0} does not exist in file {1}".format(old_name, filepath))
Javier Vegas-Regidor
committed
if old_name in handler.dimensions:
handler.renameDimension(old_name, new_name)
elif must_exist:
raise Exception("Variable {0} does not exist in file {1}".format(old_name, filepath))
handler.close()
@staticmethod
def move_file(source, destiny):
"""
Moves a file from source to destiny, creating dirs if necessary
:param source: path to source
:type source: str
:param destiny: path to destiny
:type destiny: str
"""
if not os.path.exists(os.path.dirname(destiny)):
os.makedirs(os.path.dirname(destiny))
shutil.move(source, destiny)
@staticmethod
def execute_shell_command(command, log_level=Log.DEBUG):
"""
Executes a sheel command
:param command: command to execute
:type command: str|list
:param log_level: log level to use for command output
:type log_level: int
:return: command output
:rtype: list
"""
if isinstance(command, basestring):
command = command.split()
process = subprocess.Popen(command, stdout=subprocess.PIPE)
output = list()
comunicate = process.communicate()
if log_level != Log.NO_LOG:
for line in comunicate:
if not line:
continue
Log.log.log(log_level, line)
output.append(line)
if process.returncode != 0:
raise Exception('Error executing {0}\n Return code: {1}', ' '.join(command), process.returncode)
@staticmethod
def get_cardinal_coordinate(value, latitude):
"""
Translates a degree value into degrees NSEO
:param value: value in degress
:type value: float
:param latitude: if True, value is latitude, Otherwise value is longitude
:type latitude: bool
:return: transformed value
:rtype: str
"""
if latitude:
if value < 0:
direction = 'S'
else:
direction = 'N'
else:
if value < 0:
direction = 'W'
else:
direction = 'E'
return str(abs(value)) + direction
_cpu_count = None
@staticmethod
def available_cpu_count():
""" Number of available virtual or physical CPUs on this system, i.e.
user/real as output by time(1) when called with an optimally scaling
userspace-only program"""
if Utils._cpu_count is None:
try:
m = re.search(r'(?m)^Cpus_allowed:\s*(.*)$',
open('/proc/self/status').read())
if m:
res = bin(int(m.group(1).replace(',', ''), 16)).count('1')
if res > 0:
Utils._cpu_count = res
except IOError:
try:
import multiprocessing
Utils._cpu_count = multiprocessing.cpu_count()
return Utils._cpu_count
except (ImportError, NotImplementedError):
Utils._cpu_count = -1
Log.info('Available cores: {0}', Utils._cpu_count)
return Utils._cpu_count
@staticmethod
def convert2netcdf4(filetoconvert):
temp = TempFile.get()
handler = Utils.cdo.openCdf(filetoconvert)
if handler.file_format == 'NETCDF4':
handler.close()
return
handler.close()
Log.debug('Reformating to netCDF-4')
Utils.execute_shell_command(["nccopy", "-4", filetoconvert, temp])
shutil.move(temp, filetoconvert)
"""
Class to manage temporal files
"""
"""
If True, new temporary files are added to the list for future cleaning
"""
"""
List of files to clean automatically
"""
scratch_folder = os.path.join('/scratch', pwd.getpwuid(os.getuid())[0])
"""
Scratch folder to create temporary files on it
"""
"""
Prefix for temporary filenames
"""
def get(filename=None, clean=None):
"""
Gets a new temporal filename, storing it for automated cleaning
:param filename: if it is not none, the function will use this filename instead of a random one
:type filename: str
:param clean: if true, stores filename for cleaning
:type clean: bool
:return: path to the temporal file
:rtype: str
"""
if clean is None:
clean = TempFile.autoclean
if filename:
path = os.path.join(TempFile.scratch_folder, filename)
else:
Javier Vegas-Regidor
committed
fd, path = tempfile.mkstemp(dir=TempFile.scratch_folder, prefix=TempFile.prefix, suffix='.nc')
os.close(fd)
if clean:
TempFile.files.append(path)
return path
@staticmethod
def clean():
"""
Removes all temporary files created with Tempfile until now
"""
for temp_file in TempFile.files:
if os.path.exists(temp_file):
os.remove(temp_file)
TempFile.files = list()