Newer
Older
import subprocess
Javier Vegas-Regidor
committed
import numpy as np
from autosubmit.config.log import Log
Javier Vegas-Regidor
committed
from cdo import Cdo
from nco import Nco
import tempfile
import os
Javier Vegas-Regidor
committed
class Utils(object):
"""
Container class for miscellaneous utility methods
"""
Javier Vegas-Regidor
committed
nco = Nco()
"""An instance of Nco class ready to be used"""
Javier Vegas-Regidor
committed
cdo = Cdo()
"""An instance of Cdo class ready to be used"""
Javier Vegas-Regidor
committed
@staticmethod
def setminmax(filename, variable_list):
"""
Sets the valid_max and valid_min values to the current max and min values on the file
:param filename: path to file
:type filename: str
:param variable_list: list of variables in which valid_min and valid_max will be set
:type variable_list: str | list
"""
if isinstance(variable_list, basestring):
variable_list = variable_list.split()
Javier Vegas-Regidor
committed
Log.info('Getting max and min values for {0}', ' '.join(variable_list))
Javier Vegas-Regidor
committed
dataset = Utils.nco.readCdf(filename)
for variable in variable_list:
var = dataset.variables[variable]
values = [np.max(var), np.min(var)]
Utils.nco.ncatted(input=filename, output=filename,
options='-h -a valid_max,{0},m,f,{1}'.format(variable, values[0]))
Utils.nco.ncatted(input=filename, output=filename,
options='-h -a valid_min,{0},m,f,{1}'.format(variable, values[1]))
def rename_variable(filepath, old_name, new_name, must_exist=True, rename_dimension=False):
"""
Rename multiple variables from a NetCDF file
:param filepath: path to file
:type filepath: str
:param old_name: variable's name to change
:type old_name: str
:param new_name: new name
:type new_name: str
:param must_exist: if True, the function will raise an exception if the variable name does not exist
:type must_exist: bool
:param rename_dimension: if True, also rename dimensions with the same name
:type rename_dimension: bool
"""
Utils.rename_variables(filepath, {old_name: new_name}, must_exist, rename_dimension)
def rename_variables(filepath, dic_names, must_exist=True, rename_dimension=False):
"""
Rename multiple variables from a NetCDF file
:param filepath: path to file
:type filepath: str
:param dic_names: dictionary containing old names as keys and new names as values
:type dic_names: dict
:param must_exist: if True, the function will raise an exception if the variable name does not exist
:type must_exist: bool
:param rename_dimension: if True, also rename dimensions with the same name
:type rename_dimension: bool
"""
Javier Vegas-Regidor
committed
handler = Utils.nco.openCdf(filepath)
for old_name, new_name in dic_names.items():
Javier Vegas-Regidor
committed
if old_name in handler.variables:
if new_name not in handler.variables:
handler.renameVariable(old_name, new_name)
Javier Vegas-Regidor
committed
elif must_exist:
raise Exception("Variable {0} does not exist in file {1}".format(old_name, filepath))
Javier Vegas-Regidor
committed
if old_name in handler.dimensions:
handler.renameDimension(old_name, new_name)
elif must_exist:
raise Exception("Variable {0} does not exist in file {1}".format(old_name, filepath))
handler.close()
@staticmethod
def move_file(source, destiny):
"""
Moves a file from source to destiny, creating dirs if necessary
:param source: path to source
:type source: str
:param destiny: path to destiny
:type destiny: str
"""
if not os.path.exists(os.path.dirname(destiny)):
os.makedirs(os.path.dirname(destiny))
shutil.move(source, destiny)
@staticmethod
def execute_shell_command(command, log_level=Log.DEBUG):
"""
Executes a sheel command
:param command: command to execute
:type command: str|list
:param log_level: log level to use for command output
:type log_level: int
:return: command output
:rtype: list
"""
if isinstance(command, basestring):
command = command.split()
process = subprocess.Popen(command, stdout=subprocess.PIPE)
output = list()
comunicate = process.communicate()
if log_level != Log.NO_LOG:
for line in comunicate:
if not line:
continue
Log.log.log(log_level, line)
output.append(line)
if process.returncode != 0:
raise Exception('Error executing {0}\n Return code: {1}', ' '.join(command), process.returncode)
@staticmethod
def get_cardinal_coordinate(value, latitude):
"""
Translates a degree value into degrees NSEO
:param value: value in degress
:type value: float
:param latitude: if True, value is latitude, Otherwise value is longitude
:type latitude: bool
:return: transformed value
:rtype: str
"""
if latitude:
if value < 0:
direction = 'S'
else:
direction = 'N'
else:
if value < 0:
direction = 'W'
else:
direction = 'E'
return str(abs(value)) + direction
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
_cpu_count = None
@staticmethod
def available_cpu_count():
""" Number of available virtual or physical CPUs on this system, i.e.
user/real as output by time(1) when called with an optimally scaling
userspace-only program"""
if Utils._cpu_count is None:
try:
m = re.search(r'(?m)^Cpus_allowed:\s*(.*)$',
open('/proc/self/status').read())
if m:
res = bin(int(m.group(1).replace(',', ''), 16)).count('1')
if res > 0:
Utils._cpu_count = res
except IOError:
try:
import multiprocessing
Utils._cpu_count = multiprocessing.cpu_count()
return Utils._cpu_count
except (ImportError, NotImplementedError):
Utils._cpu_count = -1
Log.info('Available cores: {0}', Utils._cpu_count)
return Utils._cpu_count
"""
Class to manage temporal files
"""
"""
If True, new temporary files are added to the list for future cleaning
"""
"""
List of files to clean automatically
"""
scratch_folder = os.path.join('/scratch', pwd.getpwuid(os.getuid())[0])
"""
Scratch folder to create temporary files on it
"""
"""
Prefix for temporary filenames
"""
def get(filename=None, clean=None):
"""
Gets a new temporal filename, storing it for automated cleaning
:param filename: if it is not none, the function will use this filename instead of a random one
:type filename: str
:param clean: if true, stores filename for cleaning
:type clean: bool
:return: path to the temporal file
:rtype: str
"""
if clean is None:
clean = TempFile.autoclean
if filename:
path = os.path.join(TempFile.scratch_folder, filename)
else:
fd, path = tempfile.mkstemp(dir=TempFile.scratch_folder, prefix=TempFile.prefix, suffix='.nc')
os.close(fd)
if clean:
TempFile.files.append(path)
return path
@staticmethod
def clean():
"""
Removes all temporary files created with Tempfile until now
"""
for temp_file in TempFile.files:
if os.path.exists(temp_file):
os.remove(temp_file)
TempFile.files = list()