Newer
Older
"""
Common utilities for multiple topics that are not big enough to have their own module
"""
import datetime
import os
import re
import shutil
import stat
import subprocess
import sys
import tarfile
import tempfile
from contextlib import contextmanager

import cf_units
import iris
import iris.exceptions
import netCDF4
import numpy as np
import xxhash
from bscearth.utils.log import Log
from cdo import Cdo
from nco import Nco
@contextmanager
def suppress_stdout():
"""Redirects the standard output to devnull"""
with open(os.devnull, "w") as devnull:
old_stdout = sys.stdout
sys.stdout = devnull
try:
yield
finally:
sys.stdout = old_stdout
Javier Vegas-Regidor
committed
class Utils(object):
"""Container class for miscellaneous utility methods"""
Javier Vegas-Regidor
committed
nco = Nco()
"""An instance of Nco class ready to be used"""
Javier Vegas-Regidor
committed
cdo = Cdo()
"""An instance of Cdo class ready to be used"""
Javier Vegas-Regidor
committed
@staticmethod
def get_mask(basin):
Parameters
----------
basin: Basin
Returns
-------
numpy.array
Raises
------
Exception: If mask.regions.nc is not available
"""
basin = Basins().parse(basin)
if basin != Basins().Global:
mask = mask_handler.variables[basin.name][:, 0, :]
mask_handler.close()
except IOError:
raise Exception('File mask.regions.nc is required for basin {0}'.format(basin))
mask = mask_handler.variables['tmask'][0, 0, :]
mask_handler.close()
return mask
Javier Vegas-Regidor
committed
@staticmethod
def setminmax(filename, variable_list):
"""
Set the valid_max and valid_min values to the current max and min values on the file
Parameters
----------
filename: str
variable_list: str or iterable of str
Javier Vegas-Regidor
committed
Log.info('Getting max and min values for {0}', ' '.join(variable_list))
Javier Vegas-Regidor
committed
for variable in variable_list:
Javier Vegas-Regidor
committed
values = [np.max(var), np.min(var)]
Utils.nco.ncatted(input=filename, output=filename,
options=('-h -a valid_max,{0},m,f,{1}'.format(variable, values[0]),))
Javier Vegas-Regidor
committed
Utils.nco.ncatted(input=filename, output=filename,
options=('-h -a valid_min,{0},m,f,{1}'.format(variable, values[1]),))
Javier Vegas-Regidor
committed
def rename_variable(filepath, old_name, new_name, must_exist=True, rename_dimension=False):
"""
Rename variable from a NetCDF file
This function is just a wrapper around Utils.rename_variables
Parameters
----------
filepath: str
old_name: str
new_name: str
must_exist: bool, optional
rename_dimension: bool, optional
See Also
--------
Utils.rename_variables
"""
Utils.rename_variables(filepath, {old_name: new_name}, must_exist, rename_dimension)
def rename_variables(filepath, dic_names, must_exist=True, rename_dimension=False):
"""
Rename multiple variables from a NetCDF file
Parameters
----------
filepath: str
dic_names: dict of str: str
Gives the renaming to do in the form old_name: new_name
must_exist: bool, optional
rename_dimension: bool, optional
Raises
-------
ValueError
If any original name is the same as the new
Exception
If any requested variable does not exist and must_exist is True
if old == new:
raise ValueError('{0} original name is the same as the new')
Javier Vegas-Regidor
committed
original_names = set(handler.variables.keys()).union(handler.dimensions.keys())
if not any((True for x in dic_names.keys() if x in original_names)):
handler.close()
if must_exist:
raise Exception("Variables {0} does not exist in file {1}".format(','.join(dic_names.keys()), filepath))
return
handler.close()
temp = TempFile.get()
shutil.copyfile(filepath, temp)
error = False
try:
Javier Vegas-Regidor
committed
Utils._rename_vars_directly(dic_names, filepath, handler, must_exist, rename_dimension)
except RuntimeError as ex:
Log.debug('Renaming error: {0}', ex)
Javier Vegas-Regidor
committed
handler.close()
Javier Vegas-Regidor
committed
if not error and not Utils.check_netcdf_file(temp):
error = True
if error:
Log.debug('First attemp to rename failed. Using secondary rename method for netCDF')
Utils._rename_by_new_file(dic_names, filepath, temp)
Utils.move_file(temp, filepath)
@staticmethod
def check_netcdf_file(filepath):
"""
Check if a NetCDF file is well stored
This functions is used to check if a NetCDF file is corrupted. It prefers to raise a false postive than
to have false negatives.
Parameters
----------
filepath
Returns
-------
bool
"""
Javier Vegas-Regidor
committed
if 'time' in handler.variables:
if handler.variables['time'].dimensions != ('time', ):
handler.close()
return False
handler.close()
Javier Vegas-Regidor
committed
cubes = iris.load(filepath)
if len(cubes) == 0:
return False
except (iris.exceptions.IrisError, RuntimeError) as ex:
@staticmethod
def get_file_variables(filename):
"""
Get all the variables in a file
Parameters
----------
filename
Returns
-------
iterable of str
"""
variables = handler.variables.keys()
handler.close()
return variables
Javier Vegas-Regidor
committed
@staticmethod
def _rename_by_new_file(dic_names, filepath, temp):
original_handler = Utils.open_cdf(filepath)
new_handler = Utils.open_cdf(temp, 'w')
Javier Vegas-Regidor
committed
for attribute in original_handler.ncattrs():
Javier Vegas-Regidor
committed
original = getattr(original_handler, attribute)
setattr(new_handler, attribute, Utils.convert_to_ascii_if_possible(original))
Javier Vegas-Regidor
committed
for dimension in original_handler.dimensions.keys():
Utils.copy_dimension(original_handler, new_handler, dimension, new_names=dic_names)
for variable in original_handler.variables.keys():
Utils.copy_variable(original_handler, new_handler, variable, new_names=dic_names)
original_handler.close()
new_handler.close()
Javier Vegas-Regidor
committed
@staticmethod
def convert_to_ascii_if_possible(string, encoding='ascii'):
Convert an Unicode string to ASCII if all characters can be translated.
If a string can not be translated it is unchanged. It also automatically
replaces Bretonnière with Bretonniere
Parameters
----------
string: unicode
encoding: str, optional
Returns
-------
str
"""
Javier Vegas-Regidor
committed
try:
return string.encode(encoding)
except UnicodeEncodeError:
string = string.replace(u'Bretonnière', 'Bretonnière')
return Utils.convert_to_ascii_if_possible(string, encoding)
Javier Vegas-Regidor
committed
return string
Javier Vegas-Regidor
committed
@staticmethod
def _rename_vars_directly(dic_names, filepath, handler, must_exist, rename_dimension):
for old_name, new_name in dic_names.items():
if rename_dimension:
if old_name in handler.dimensions:
handler.renameDimension(old_name, new_name)
elif must_exist:
raise Exception("Dimension {0} does not exist in file {1}".format(old_name, filepath))
if old_name in handler.variables:
if new_name not in handler.variables:
handler.renameVariable(old_name, new_name)
for var in handler.variables:
if hasattr(var, 'coordinates') and " {0} ".format(old_name) in var.coordinates:
new_coordinates = var.coordinates.replace(" {0} ".format(old_name),
" {0} ".format(new_name))
var.coordinates = Utils.convert_to_ascii_if_possible(new_coordinates)
Javier Vegas-Regidor
committed
elif must_exist:
raise Exception("Variable {0} does not exist in file {1}".format(old_name, filepath))
handler.sync()
def copy_file(source, destiny, save_hash=False, use_stored_hash=True, retrials=3):
Copy a file and compute a hash to check if the copy is equal to the source
Parameters
----------
source: str
destiny: str
save_hash: bool, optional
If True, stores a copy of the hash
use_stored_hash: bool, optional
If True, try to use the stored value of the source hash instead of computing it
retrials: int, optional
Minimum value is 1
See Also
--------
move_file
dirname_path = os.path.dirname(destiny)
if dirname_path and not os.path.exists(dirname_path):
try:
os.makedirs(dirname_path)
Utils.give_group_write_permissions(dirname_path)
except OSError as ex:
# This can be due to a race condition. If directory already exists, we don have to do nothing
if not os.path.exists(dirname_path):
raise ex
Javier Vegas-Regidor
committed
Log.debug('Hashing original file... {0}', datetime.datetime.now())
hash_original = Utils.get_file_hash(source, use_stored=use_stored_hash)
while hash_original != hash_destiny:
if retrials == 0:
raise Exception('Can not copy {0} to {1}'.format(source, destiny))
Javier Vegas-Regidor
committed
Log.debug('Copying... {0}', datetime.datetime.now())
shutil.copyfile(source, destiny)
Javier Vegas-Regidor
committed
Log.debug('Hashing copy ... {0}', datetime.datetime.now())
hash_destiny = Utils.get_file_hash(destiny, save=save_hash)
Log.debug('Finished {0}', datetime.datetime.now())
def move_file(source, destiny, save_hash=False, retrials=3):
Move a file and compute a hash to check if the copy is equal to the source
It is just a call to Utils.copy_file followed bu
Parameters
----------
source: str
destiny: str
save_hash: bool, optional
If True, stores a copy of the hash
retrials: int, optional
Minimum value is 1
See Also
--------
copy_file
Utils.copy_file(source, destiny, save_hash, retrials)
Javier Vegas-Regidor
committed
@staticmethod
def remove_file(path):
"""
Delete a file only if it previously exists
Parameters
----------
path: str
Javier Vegas-Regidor
committed
"""
if os.path.isfile(path):
os.remove(path)
@staticmethod
def copy_tree(source, destiny):
"""
Copies a full tree to a new location
Parameters
----------
source: str
destiny: str
See Also
--------
move_tree
"""
if not os.path.exists(destiny):
os.makedirs(destiny)
shutil.copystat(source, destiny)
lst = os.listdir(source)
for item in lst:
item_source = os.path.join(source, item)
item_destiny = os.path.join(destiny, item)
if os.path.isdir(item_source):
Utils.copy_tree(item_source, item_destiny)
else:
shutil.copy2(item_source, item_destiny)
@staticmethod
def move_tree(source, destiny):
Parameters
----------
source: str
destiny: str
See Also
-------
copy_tree
"""
Utils.copy_tree(source, destiny)
shutil.rmtree(source)
Javier Vegas-Regidor
committed
def get_file_hash(filepath, use_stored=False, save=False):
Get the xxHash hash for a given file
Parameters
----------
filepath: str
use_stored: bool, optional
If True, tries to use the stored hash before computing it
save: bool, optional
If True, saves the hash to a file
Javier Vegas-Regidor
committed
if use_stored:
hash_file = Utils._get_hash_filename(filepath)
if os.path.isfile(hash_file):
hash_value = open(hash_file, 'r').readline()
return hash_value
blocksize = 104857600
hasher = xxhash.xxh64()
with open(filepath, 'rb') as afile:
while len(buf) > 0:
hasher.update(buf)
Javier Vegas-Regidor
committed
hash_value = hasher.hexdigest()
if save:
hash_file = open(Utils._get_hash_filename(filepath), 'w')
hash_file.write(hash_value)
hash_file.close()
Javier Vegas-Regidor
committed
@staticmethod
def _get_hash_filename(filepath):
Javier Vegas-Regidor
committed
filename = os.path.basename(filepath)
hash_file = os.path.join(folder, '.{0}.xxhash64.hash'.format(filename))
Javier Vegas-Regidor
committed
return hash_file
@staticmethod
def execute_shell_command(command, log_level=Log.DEBUG):
Execute shell command
Writes the output to the log with the specified level
Parameters
----------
command: str or iterable of str
log_level: int, optional
Returns
-------
iterable of str
Standard output of the command
Raises
------
Utils.ExecutionError
If the command return value is non zero
Javier Vegas-Regidor
committed
command = command.split()
process = subprocess.Popen(command, stdout=subprocess.PIPE)
output = list()
comunicate = process.communicate()
for line in comunicate:
if not line:
continue
if six.PY3:
line = str(line, encoding='UTF-8')
if log_level != Log.NO_LOG:
output.append(line)
if process.returncode != 0:
raise Utils.ExecutionError('Error executing {0}\n Return code: {1}'.format(' '.join(command),
str(process.returncode)))
_cpu_count = None
@staticmethod
def available_cpu_count():
"""Number of available virtual or physical CPUs on this system"""
match = re.search(r'(?m)^Cpus_allowed:\s*(.*)$', open('/proc/self/status').read())
if match:
res = bin(int(match.group(1).replace(',', ''), 16)).count('1')
if res > 0:
Utils._cpu_count = res
except IOError:
try:
import multiprocessing
Utils._cpu_count = multiprocessing.cpu_count()
return Utils._cpu_count
except (ImportError, NotImplementedError):
Utils._cpu_count = -1
return Utils._cpu_count
@staticmethod
def convert2netcdf4(filetoconvert):
Conversion only performed if required. Deflation level set to 4 and shuffle activated.
Parameters
----------
filetoconvert: str
"""
if Utils._is_compressed_netcdf4(filetoconvert):
return
Log.debug('Reformatting to netCDF-4')
temp = TempFile.get()
Javier Vegas-Regidor
committed
Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filetoconvert, temp])
shutil.move(temp, filetoconvert)
@classmethod
def _is_compressed_netcdf4(cls, filetoconvert):
is_compressed = True
if not handler.file_format == 'NETCDF4':
is_compressed = False
ncdump_result = Utils.execute_shell_command('ncdump -hs {0}'.format(filetoconvert), Log.NO_LOG)
ncdump_result = ncdump_result[0].replace('\t', '').split('\n')
for var in handler.variables:
if not '{0}:_DeflateLevel = 4 ;'.format(var) in ncdump_result:
is_compressed = False
break
if not '{0}:_Shuffle = "true" ;'.format(var) in ncdump_result:
is_compressed = False
break
return is_compressed
# noinspection PyPep8Naming
Parameters
----------
filepath: str
mode: str, optional
Returns
-------
netCDF4.Dataset
return netCDF4.Dataset(filepath, mode)
@staticmethod
def get_datetime_from_netcdf(handler, time_variable='time'):
Get time from NetCDF files
Parameters
----------
handler: netCDF4.Dataset
time_variable: str, optional
Returns
-------
numpy.array of Datetime
Javier Vegas-Regidor
committed
var_time = handler.variables[time_variable]
nctime = var_time[:] # get values
units = var_time.units
try:
Javier Vegas-Regidor
committed
cal_temps = var_time.calendar
except AttributeError:
cal_temps = u"standard"
return netCDF4.num2date(nctime, units=units, calendar=cal_temps)
def copy_variable(source, destiny, variable, must_exist=True, add_dimensions=False, new_names=None):
Copy the given variable from source to destiny
Parameters
----------
source: netCDF4.Dataset
destiny: netCDF4.Dataset
variable: str
must_exist: bool, optional
add_dimensions: bool, optional
new_names: dict of str: str
Raises
------
Exception
If dimensions are not correct in the destiny file and add_dimensions is False
if not must_exist and variable not in source.variables.keys():
return
if not new_names:
new_names = dict()
if variable in new_names:
new_name = new_names[variable]
else:
new_name = variable
if new_name in destiny.variables.keys():
return
translated_dimensions = Utils._translate(source.variables[variable].dimensions, new_names)
if not set(translated_dimensions).issubset(destiny.dimensions):
if not add_dimensions:
raise Exception('Variable {0} can not be added because dimensions does not match: '
'{1} {2}'.format(variable, translated_dimensions, destiny.dimensions))
for dimension in source.variables[variable].dimensions:
Utils.copy_dimension(source, destiny, dimension, must_exist, new_names)
if new_name in destiny.variables.keys():
# Just in case the variable we are copying match a dimension name
return
original_var = source.variables[variable]
new_var = destiny.createVariable(new_name, original_var.datatype, translated_dimensions)
Utils.copy_attributes(new_var, original_var)
if hasattr(new_var, 'coordinates'):
coords = [new_names[coord] if coord in new_names else coord for coord in new_var.coordinates.split(' ')]
new_var.coordinates = Utils.convert_to_ascii_if_possible(' '.join(coords))
new_var[:] = original_var[:]
@staticmethod
def copy_attributes(new_var, original_var, omitted_attributtes=None):
"""
Copy attributtes from one variable to another
Parameters
----------
new_var: netCDF4.Variable
original_var: netCDF4.Variable
omitted_attributtes: iterable of str
Collection of attributtes that should not be copied
"""
if omitted_attributtes is None:
omitted_attributtes = []
new_var.setncatts({k: Utils.convert_to_ascii_if_possible(original_var.getncattr(k))
for k in original_var.ncattrs() if k not in omitted_attributtes})
@staticmethod
def copy_dimension(source, destiny, dimension, must_exist=True, new_names=None):
"""
Copy the given dimension from source to destiny, including dimension variables if present
Parameters
----------
source: netCDF4.Dataset
destiny: netCDF4.Dataset
dimension: str
must_exist: bool, optional
new_names: dict of str: str or NoneType, optional
"""
if not must_exist and dimension not in source.dimensions.keys():
return
if not new_names:
new_names = dict()
if dimension in new_names:
new_name = new_names[dimension]
else:
new_name = dimension
if new_name in destiny.dimensions.keys():
return
if not new_name:
new_name = dimension
destiny.createDimension(new_name, source.dimensions[dimension].size)
if dimension in source.variables:
Utils.copy_variable(source, destiny, dimension, new_names=new_names)
@staticmethod
def concat_variables(source, destiny, remove_source=False):
"""
Add variables from a nc file to another
:param source: path to source file
:type source: str
:param destiny: path to destiny file
:type destiny: str
:param remove_source: if True, removes source file
:type remove_source: bool
"""
if os.path.exists(destiny):
handler_total = Utils.open_cdf(destiny)
handler_variable = Utils.open_cdf(source)
concatenated = dict()
for var in handler_variable.variables:
if var not in handler_total.variables:
Utils.copy_variable(handler_variable, handler_total, var, add_dimensions=True)
else:
variable = handler_variable.variables[var]
if 'time' not in variable.dimensions:
continue
concatenated[var] = np.concatenate((handler_total.variables[var][:], variable[:]),
axis=variable.dimensions.index('time'))
handler_total.variables[var][:] = array
handler_total.close()
handler_variable.close()
if remove_source:
os.remove(source)
else:
if remove_source:
Utils.move_file(source, destiny)
else:
shutil.copy(source, destiny)
Utils.convert2netcdf4(destiny)
class ExecutionError(Exception):
"""
Exception to raise when a command execution fails
"""
pass
@classmethod
def _translate(cls, dimensions, new_names):
translated = list()
for dim in dimensions:
if dim in new_names:
translated.append(new_names[dim])
else:
translated.append(dim)
return translated
Javier Vegas-Regidor
committed
@staticmethod
def create_folder_tree(path):
"""
Createas a fodle path will and parent directories if needed.
:param path: folder's path
:type path: str
"""
if not os.path.exists(path):
Javier Vegas-Regidor
committed
try:
os.makedirs(path)
except OSError:
# This could happen if two threads are tying to create the folder.
# Let's check again for existence and rethrow if still not exists
Javier Vegas-Regidor
committed
@staticmethod
def give_group_write_permissions(path):
stats = os.stat(path)
if stats.st_mode & stat.S_IWGRP:
def convert_units(var_handler, new_units, calendar=None, old_calendar=None):
"""
Convert units
Parameters
----------
var_handler: Dataset
new_units: str
calendar: str
old_calendar: str
"""
if new_units == var_handler.units:
return
Javier Vegas-Regidor
committed
if hasattr(var_handler, 'calendar'):
old_calendar = var_handler.calendar
new_unit = cf_units.Unit(new_units, calendar=calendar)
old_unit = cf_units.Unit(var_handler.units, calendar=old_calendar)
var_handler[:] = old_unit.convert(var_handler[:], new_unit, inplace=True)
if 'valid_min' in var_handler.ncattrs():
var_handler.valid_min = old_unit.convert(float(var_handler.valid_min), new_unit,
inplace=True)
if 'valid_max' in var_handler.ncattrs():
var_handler.valid_max = old_unit.convert(float(var_handler.valid_max), new_unit,
inplace=True)
var_handler.units = new_units
def untar(files, destiny_path):
"""
Untar files to a given destiny
:param files: files to unzip
Javier Vegas-Regidor
committed
:type files: list[Any] | Tuple[Any]
:param destiny_path: path to destination folder
:type destiny_path: str
"""
for filepath in files:
Log.debug('Unpacking {0}', filepath)
tar = tarfile.open(filepath)
for file_compressed in tar.getmembers():
if file_compressed.isdir():
if os.path.isdir(os.path.join(destiny_path, file_compressed.name)):
continue
else:
if os.path.exists(os.path.join(destiny_path, file_compressed.name)):
os.remove(os.path.join(destiny_path, file_compressed.name))
tar.extract(file_compressed, destiny_path)
tar.close()
"""
Unzip a list of files
:param files: files to unzip
:type files: list | str
:param force: if True, it will overwrite unzipped files
:type force: bool
"""
files = [files]
for filepath in files:
Log.debug('Unzipping {0}', filepath)
if force:
option = ' -f'
else:
option = ''
Utils.execute_shell_command('gunzip{1} {0}'.format(filepath, option))
except Exception as ex:
raise Utils.UnzipException('Can not unzip {0}: {1}'.format(filepath, ex))
class UnzipException(Exception):
"""
Excpetion raised when unzip fails
"""
"""
Class to manage temporal files
"""
"""
If True, new temporary files are added to the list for future cleaning
"""
"""
List of files to clean automatically
"""
Javier Vegas-Regidor
committed
scratch_folder = ''
"""
Scratch folder to create temporary files on it
"""
"""
Prefix for temporary filenames
"""
def get(filename=None, clean=None, suffix='.nc'):
"""
Gets a new temporal filename, storing it for automated cleaning
:param filename: if it is not none, the function will use this filename instead of a random one
:type filename: str
:param clean: if true, stores filename for cleaning
:type clean: bool
:return: path to the temporal file
:rtype: str
"""
if clean is None:
clean = TempFile.autoclean
if filename:
path = os.path.join(TempFile.scratch_folder, filename)
else:
file_descriptor, path = tempfile.mkstemp(dir=TempFile.scratch_folder, prefix=TempFile.prefix, suffix=suffix)
if clean:
TempFile.files.append(path)
return path
@staticmethod
def clean():
"""
Removes all temporary files created with Tempfile until now
"""
for temp_file in TempFile.files:
if os.path.exists(temp_file):
os.remove(temp_file)
TempFile.files = list()