Newer
Older
"""Common utilities for multiple topics that are not big enough to have their own module"""
import subprocess
import tempfile
from contextlib import contextmanager
Javier Vegas-Regidor
committed
import numpy as np
from bscearth.utils.log import Log
Javier Vegas-Regidor
committed
from nco import Nco
Javier Vegas-Regidor
committed
@contextmanager
def suppress_stdout():
    """Context manager that temporarily redirects stdout to os.devnull."""
    devnull = open(os.devnull, "w")
    saved_stdout = sys.stdout
    sys.stdout = devnull
    try:
        yield
    finally:
        # Always restore the real stdout, even if the body raised
        sys.stdout = saved_stdout
        devnull.close()
Javier Vegas-Regidor
committed
class Utils(object):
    """Container class for miscellaneous utility methods"""

    # Shared helper instance used by the static methods below
    nco = Nco()
    """An instance of Nco class ready to be used"""
    # NOTE(review): the orphaned docstring below suggests a `cdo = Cdo()`
    # attribute (and its import) was lost from this file — TODO restore.
    """An instance of Cdo class ready to be used"""
Javier Vegas-Regidor
committed
Parameters
----------
basin: Basin
Returns
-------
numpy.array
Raises
------
Exception: If mask.regions.nc is not available
"""
basin = Basins().parse(basin)
if basin != Basins().Global:
if with_levels:
mask_handler = Utils.open_cdf('mask_regions.3d.nc')
mask = mask_handler.variables[basin.name][0, ...]
else:
mask_handler = Utils.open_cdf('mask_regions.nc')
mask = mask_handler.variables[basin.name][:, 0, :]
mask_handler.close()
except IOError:
raise Exception('File mask.regions.nc is required for basin {0}'.format(basin))
mask = mask_handler.variables['tmask'][0, 0, :]
mask_handler.close()
return mask
Javier Vegas-Regidor
committed
@staticmethod
def setminmax(filename, variable_list):
"""
Set the valid_max and valid_min values to the current max and min values on the file
Parameters
----------
filename: str
variable_list: str or iterable of str
Javier Vegas-Regidor
committed
Log.info('Getting max and min values for {0}', ' '.join(variable_list))
Javier Vegas-Regidor
committed
for variable in variable_list:
Javier Vegas-Regidor
committed
values = [np.max(var), np.min(var)]
Utils.nco.ncatted(input=filename, output=filename,
options=('-h -a valid_max,{0},m,f,{1}'.format(variable, values[0]),))
Javier Vegas-Regidor
committed
Utils.nco.ncatted(input=filename, output=filename,
options=('-h -a valid_min,{0},m,f,{1}'.format(variable, values[1]),))
Javier Vegas-Regidor
committed
def rename_variable(filepath, old_name, new_name, must_exist=True, rename_dimension=False):
"""
Rename variable from a NetCDF file
This function is just a wrapper around Utils.rename_variables
Parameters
----------
filepath: str
old_name: str
new_name: str
must_exist: bool, optional
rename_dimension: bool, optional
See Also
--------
Utils.rename_variables
"""
Utils.rename_variables(filepath, {old_name: new_name}, must_exist, rename_dimension)
def rename_variables(filepath, dic_names, must_exist=True, rename_dimension=False):
"""
Rename multiple variables from a NetCDF file
Parameters
----------
filepath: str
dic_names: dict of str: str
Gives the renaming to do in the form old_name: new_name
must_exist: bool, optional
rename_dimension: bool, optional
Raises
-------
ValueError
If any original name is the same as the new
Exception
If any requested variable does not exist and must_exist is True
if old == new:
raise ValueError('{0} original name is the same as the new')
Javier Vegas-Regidor
committed
original_names = set(handler.variables.keys()).union(handler.dimensions.keys())
if not any((True for x in dic_names.keys() if x in original_names)):
handler.close()
if must_exist:
raise Exception("Variables {0} does not exist in file {1}".format(','.join(dic_names.keys()), filepath))
return
handler.close()
temp = TempFile.get()
shutil.copyfile(filepath, temp)
error = False
try:
Javier Vegas-Regidor
committed
Utils._rename_vars_directly(dic_names, filepath, handler, must_exist, rename_dimension)
except RuntimeError as ex:
Log.debug('Renaming error: {0}', ex)
Javier Vegas-Regidor
committed
handler.close()
Javier Vegas-Regidor
committed
if not error and not Utils.check_netcdf_file(temp):
error = True
if error:
Log.debug('First attemp to rename failed. Using secondary rename method for netCDF')
Utils._rename_by_new_file(dic_names, filepath, temp)
Utils.move_file(temp, filepath)
@staticmethod
def check_netcdf_file(filepath):
"""
Check if a NetCDF file is well stored
This functions is used to check if a NetCDF file is corrupted. It prefers to raise a false postive than
to have false negatives.
Parameters
----------
filepath
Returns
-------
bool
"""
Javier Vegas-Regidor
committed
if 'time' in handler.variables:
if handler.variables['time'].dimensions != ('time', ):
handler.close()
return False
handler.close()
Javier Vegas-Regidor
committed
cubes = iris.load(filepath)
if len(cubes) == 0:
return False
except (iris.exceptions.IrisError, RuntimeError, OSError) as ex:
@staticmethod
def get_file_variables(filename):
"""
Get all the variables in a file
Parameters
----------
filename
Returns
-------
iterable of str
"""
variables = handler.variables.keys()
handler.close()
return variables
Javier Vegas-Regidor
committed
@staticmethod
def _rename_by_new_file(dic_names, filepath, temp):
original_handler = Utils.open_cdf(filepath)
new_handler = Utils.open_cdf(temp, 'w')
Javier Vegas-Regidor
committed
for attribute in original_handler.ncattrs():
Javier Vegas-Regidor
committed
original = getattr(original_handler, attribute)
setattr(new_handler, attribute, Utils.convert_to_ascii_if_possible(original))
Javier Vegas-Regidor
committed
for dimension in original_handler.dimensions.keys():
Utils.copy_dimension(original_handler, new_handler, dimension, new_names=dic_names)
for variable in original_handler.variables.keys():
Utils.copy_variable(original_handler, new_handler, variable, new_names=dic_names)
original_handler.close()
new_handler.close()
Javier Vegas-Regidor
committed
@staticmethod
def convert_to_ascii_if_possible(string, encoding='ascii'):
Convert an Unicode string to ASCII if all characters can be translated.
If a string can not be translated it is unchanged. It also automatically
replaces Bretonnière with Bretonniere
Parameters
----------
string: unicode
encoding: str, optional
Returns
-------
str
"""
Javier Vegas-Regidor
committed
try:
return string.encode(encoding)
except UnicodeEncodeError:
string = string.replace(u'Bretonnière', 'Bretonnière')
return Utils.convert_to_ascii_if_possible(string, encoding)
Javier Vegas-Regidor
committed
return string
Javier Vegas-Regidor
committed
@staticmethod
def _rename_vars_directly(dic_names, filepath, handler, must_exist, rename_dimension):
for old_name, new_name in dic_names.items():
if rename_dimension:
if old_name in handler.dimensions:
handler.renameDimension(old_name, new_name)
elif must_exist:
raise Exception("Dimension {0} does not exist in file {1}".format(old_name, filepath))
if old_name in handler.variables:
if new_name not in handler.variables:
handler.renameVariable(old_name, new_name)
for var in handler.variables:
if hasattr(var, 'coordinates') and " {0} ".format(old_name) in var.coordinates:
new_coordinates = var.coordinates.replace(" {0} ".format(old_name),
" {0} ".format(new_name))
var.coordinates = Utils.convert_to_ascii_if_possible(new_coordinates)
Javier Vegas-Regidor
committed
elif must_exist:
raise Exception("Variable {0} does not exist in file {1}".format(old_name, filepath))
handler.sync()
def copy_file(source, destiny, save_hash=False, use_stored_hash=True, retrials=3):
Copy a file and compute a hash to check if the copy is equal to the source
Parameters
----------
source: str
destiny: str
save_hash: bool, optional
If True, stores a copy of the hash
use_stored_hash: bool, optional
If True, try to use the stored value of the source hash instead of computing it
retrials: int, optional
Minimum value is 1
See Also
--------
move_file
dirname_path = os.path.dirname(destiny)
if dirname_path and not os.path.exists(dirname_path):
try:
os.makedirs(dirname_path)
Utils.give_group_write_permissions(dirname_path)
except OSError as ex:
# This can be due to a race condition. If directory already exists, we don have to do nothing
if not os.path.exists(dirname_path):
raise ex
Javier Vegas-Regidor
committed
Log.debug('Hashing original file... {0}', datetime.datetime.now())
hash_original = Utils.get_file_hash(source, use_stored=use_stored_hash)
while hash_original != hash_destiny:
if retrials == 0:
raise Exception('Can not copy {0} to {1}'.format(source, destiny))
Javier Vegas-Regidor
committed
Log.debug('Copying... {0}', datetime.datetime.now())
shutil.copyfile(source, destiny)
Javier Vegas-Regidor
committed
Log.debug('Hashing copy ... {0}', datetime.datetime.now())
hash_destiny = Utils.get_file_hash(destiny, save=save_hash)
Log.debug('Finished {0}', datetime.datetime.now())
def move_file(source, destiny, save_hash=False, retrials=3):
Move a file and compute a hash to check if the copy is equal to the source
It is just a call to Utils.copy_file followed bu
Parameters
----------
source: str
destiny: str
save_hash: bool, optional
If True, stores a copy of the hash
retrials: int, optional
Minimum value is 1
See Also
--------
copy_file
Utils.copy_file(source, destiny, save_hash, retrials)
Javier Vegas-Regidor
committed
@staticmethod
def remove_file(path):
"""
Delete a file only if it previously exists
Parameters
----------
path: str
Javier Vegas-Regidor
committed
"""
if os.path.isfile(path):
os.remove(path)
@staticmethod
def copy_tree(source, destiny):
Parameters
----------
source: str
destiny: str
See Also
--------
move_tree
"""
if not os.path.exists(destiny):
os.makedirs(destiny)
shutil.copystat(source, destiny)
lst = os.listdir(source)
for item in lst:
item_source = os.path.join(source, item)
item_destiny = os.path.join(destiny, item)
if os.path.isdir(item_source):
Utils.copy_tree(item_source, item_destiny)
else:
shutil.copy2(item_source, item_destiny)
@staticmethod
def move_tree(source, destiny):
Parameters
----------
source: str
destiny: str
See Also
-------
copy_tree
"""
Utils.copy_tree(source, destiny)
shutil.rmtree(source)
Javier Vegas-Regidor
committed
def get_file_hash(filepath, use_stored=False, save=False):
Get the xxHash hash for a given file
Parameters
----------
filepath: str
use_stored: bool, optional
If True, tries to use the stored hash before computing it
save: bool, optional
If True, saves the hash to a file
Javier Vegas-Regidor
committed
if use_stored:
hash_file = Utils._get_hash_filename(filepath)
if os.path.isfile(hash_file):
hash_value = open(hash_file, 'r').readline()
return hash_value
blocksize = 104857600
hasher = xxhash.xxh64()
with open(filepath, 'rb') as afile:
while len(buf) > 0:
hasher.update(buf)
Javier Vegas-Regidor
committed
hash_value = hasher.hexdigest()
if save:
hash_file = open(Utils._get_hash_filename(filepath), 'w')
hash_file.write(hash_value)
hash_file.close()
Javier Vegas-Regidor
committed
@staticmethod
def _get_hash_filename(filepath):
Javier Vegas-Regidor
committed
filename = os.path.basename(filepath)
hash_file = os.path.join(folder, '.{0}.xxhash64.hash'.format(filename))
Javier Vegas-Regidor
committed
return hash_file
@staticmethod
def execute_shell_command(command, log_level=Log.DEBUG):
Execute shell command
Writes the output to the log with the specified level
Parameters
----------
command: str or iterable of str
log_level: int, optional
Returns
-------
iterable of str
Standard output of the command
Raises
------
Utils.ExecutionError
If the command return value is non zero
Javier Vegas-Regidor
committed
command = command.split()
process = subprocess.Popen(command, stdout=subprocess.PIPE)
output = list()
comunicate = process.communicate()
for line in comunicate:
if not line:
continue
if six.PY3:
line = str(line, encoding='UTF-8')
if log_level != Log.NO_LOG:
output.append(line)
if process.returncode != 0:
raise Utils.ExecutionError('Error executing {0}\n Return code: {1}'.format(' '.join(command),
str(process.returncode)))
_cpu_count = None
@staticmethod
def available_cpu_count():
"""Number of available virtual or physical CPUs on this system"""
match = re.search(r'(?m)^Cpus_allowed:\s*(.*)$', open('/proc/self/status').read())
if match:
res = bin(int(match.group(1).replace(',', ''), 16)).count('1')
if res > 0:
Utils._cpu_count = res
except IOError:
try:
import multiprocessing
Utils._cpu_count = multiprocessing.cpu_count()
return Utils._cpu_count
except (ImportError, NotImplementedError):
Utils._cpu_count = -1
return Utils._cpu_count
@staticmethod
def convert2netcdf4(filetoconvert):
Conversion only performed if required. Deflation level set to 4 and shuffle activated.
Parameters
----------
filetoconvert: str
"""
if Utils._is_compressed_netcdf4(filetoconvert):
return
Log.debug('Reformatting to netCDF-4')
temp = TempFile.get()
Javier Vegas-Regidor
committed
Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filetoconvert, temp])
shutil.move(temp, filetoconvert)
@classmethod
def _is_compressed_netcdf4(cls, filetoconvert):
is_compressed = True
if not handler.file_format == 'NETCDF4':
is_compressed = False
ncdump_result = Utils.execute_shell_command('ncdump -hs {0}'.format(filetoconvert), Log.NO_LOG)
ncdump_result = ncdump_result[0].replace('\t', '').split('\n')
for var in handler.variables:
if not '{0}:_DeflateLevel = 4 ;'.format(var) in ncdump_result:
is_compressed = False
break
if not '{0}:_Shuffle = "true" ;'.format(var) in ncdump_result:
is_compressed = False
break
return is_compressed
# noinspection PyPep8Naming
Parameters
----------
filepath: str
mode: str, optional
Returns
-------
netCDF4.Dataset
return netCDF4.Dataset(filepath, mode)
@staticmethod
def get_datetime_from_netcdf(handler, time_variable='time'):
Get time from NetCDF files
Parameters
----------
handler: netCDF4.Dataset
time_variable: str, optional
Returns
-------
numpy.array of Datetime
Javier Vegas-Regidor
committed
var_time = handler.variables[time_variable]
nctime = var_time[:] # get values
units = var_time.units
try:
Javier Vegas-Regidor
committed
cal_temps = var_time.calendar
except AttributeError:
cal_temps = u"standard"
return netCDF4.num2date(nctime, units=units, calendar=cal_temps)
def copy_variable(source, destiny, variable, must_exist=True, add_dimensions=False, new_names=None):
Copy the given variable from source to destiny
Parameters
----------
source: netCDF4.Dataset
destiny: netCDF4.Dataset
variable: str
must_exist: bool, optional
add_dimensions: bool, optional
new_names: dict of str: str
Raises
------
Exception
If dimensions are not correct in the destiny file and add_dimensions is False
if not must_exist and variable not in source.variables.keys():
return
if not new_names:
new_names = dict()
if variable in new_names:
new_name = new_names[variable]
else:
new_name = variable
if new_name in destiny.variables.keys():
return
translated_dimensions = Utils._translate(source.variables[variable].dimensions, new_names)
if not set(translated_dimensions).issubset(destiny.dimensions):
if not add_dimensions:
raise Exception('Variable {0} can not be added because dimensions does not match: '
'{1} {2}'.format(variable, translated_dimensions, destiny.dimensions))
for dimension in source.variables[variable].dimensions:
Utils.copy_dimension(source, destiny, dimension, must_exist, new_names)
if new_name in destiny.variables.keys():
# Just in case the variable we are copying match a dimension name
return
original_var = source.variables[variable]
new_var = destiny.createVariable(new_name, original_var.datatype, translated_dimensions)
Utils.copy_attributes(new_var, original_var)
if hasattr(new_var, 'coordinates'):
coords = [new_names[coord] if coord in new_names else coord for coord in new_var.coordinates.split(' ')]
new_var.coordinates = Utils.convert_to_ascii_if_possible(' '.join(coords))
new_var[:] = original_var[:]
@staticmethod
def copy_attributes(new_var, original_var, omitted_attributtes=None):
"""
Copy attributtes from one variable to another
Parameters
----------
new_var: netCDF4.Variable
original_var: netCDF4.Variable
omitted_attributtes: iterable of str
Collection of attributtes that should not be copied
"""
if omitted_attributtes is None:
omitted_attributtes = []
new_var.setncatts({k: Utils.convert_to_ascii_if_possible(original_var.getncattr(k))
for k in original_var.ncattrs() if k not in omitted_attributtes})
@staticmethod
def copy_dimension(source, destiny, dimension, must_exist=True, new_names=None):
"""
Copy the given dimension from source to destiny, including dimension variables if present
Parameters
----------
source: netCDF4.Dataset
destiny: netCDF4.Dataset
dimension: str
must_exist: bool, optional
new_names: dict of str: str or None, optional
"""
if not must_exist and dimension not in source.dimensions.keys():
return
if not new_names:
new_names = dict()
if dimension in new_names:
new_name = new_names[dimension]
else:
new_name = dimension
if new_name in destiny.dimensions.keys():
return
if not new_name:
new_name = dimension
destiny.createDimension(new_name, source.dimensions[dimension].size)
if dimension in source.variables:
Utils.copy_variable(source, destiny, dimension, new_names=new_names)
@staticmethod
def concat_variables(source, destiny, remove_source=False):
"""
Add variables from a nc file to another
Parameters
----------
source: str
destiny: str
remove_source: bool
if True, removes source file
if os.path.exists(destiny):
handler_total = Utils.open_cdf(destiny)
handler_variable = Utils.open_cdf(source)
concatenated = dict()
for var in handler_variable.variables:
if var not in handler_total.variables:
Utils.copy_variable(handler_variable, handler_total, var, add_dimensions=True)
else:
variable = handler_variable.variables[var]
if 'time' not in variable.dimensions:
continue
concatenated[var] = np.concatenate((handler_total.variables[var][:], variable[:]),
axis=variable.dimensions.index('time'))
handler_total.variables[var][:] = array
handler_total.close()
handler_variable.close()
if remove_source:
os.remove(source)
else:
if remove_source:
Utils.move_file(source, destiny)
else:
shutil.copy(source, destiny)
Utils.convert2netcdf4(destiny)
class ExecutionError(Exception):
"""Exception to raise when a command execution fails"""
pass
@classmethod
def _translate(cls, dimensions, new_names):
translated = list()
for dim in dimensions:
if dim in new_names:
translated.append(new_names[dim])
else:
translated.append(dim)
return translated
Javier Vegas-Regidor
committed
@staticmethod
def create_folder_tree(path):
"""
Create a folder path with all parent directories if needed.
Parameters
----------
path: str
Javier Vegas-Regidor
committed
"""
if not os.path.exists(path):
Javier Vegas-Regidor
committed
try:
os.makedirs(path)
except OSError:
# This could happen if two threads are tying to create the folder.
# Let's check again for existence and rethrow if still not exists
Javier Vegas-Regidor
committed
@staticmethod
def give_group_write_permissions(path):
stats = os.stat(path)
if stats.st_mode & stat.S_IWGRP:
def convert_units(var_handler, new_units, calendar=None, old_calendar=None):
"""
Convert units
Parameters
----------
var_handler: Dataset
new_units: str
calendar: str
old_calendar: str
"""
if new_units == var_handler.units:
return
Javier Vegas-Regidor
committed
if hasattr(var_handler, 'calendar'):
old_calendar = var_handler.calendar
new_unit = cf_units.Unit(new_units, calendar=calendar)
old_unit = cf_units.Unit(var_handler.units, calendar=old_calendar)
var_handler[:] = old_unit.convert(var_handler[:], new_unit, inplace=True)
if 'valid_min' in var_handler.ncattrs():
var_handler.valid_min = old_unit.convert(float(var_handler.valid_min), new_unit,
inplace=True)
if 'valid_max' in var_handler.ncattrs():
var_handler.valid_max = old_unit.convert(float(var_handler.valid_max), new_unit,
inplace=True)
var_handler.units = new_units
def untar(files, destiny_path):
"""
Untar files to a given destiny
Parameters
----------
files: iterable of str
destiny_path: str
for filepath in files:
Log.debug('Unpacking {0}', filepath)
tar = tarfile.open(filepath)
for file_compressed in tar.getmembers():
if file_compressed.isdir():
if os.path.isdir(os.path.join(destiny_path, file_compressed.name)):
continue
else:
if os.path.exists(os.path.join(destiny_path, file_compressed.name)):
os.remove(os.path.join(destiny_path, file_compressed.name))
tar.extract(file_compressed, destiny_path)
tar.close()
files: str or iterable of str
force: bool, optional
if True, it will overwrite unzipped files
files = [files]
for filepath in files:
Log.debug('Unzipping {0}', filepath)
if force:
option = ' -f'
else:
option = ''
Utils.execute_shell_command('gunzip{1} {0}'.format(filepath, option))
except Exception as ex:
raise Utils.UnzipException('Can not unzip {0}: {1}'.format(filepath, ex))
class UnzipException(Exception):
"""
If True, new temporary files are added to the list for future cleaning
"""
"""
List of files to clean automatically
"""
Javier Vegas-Regidor
committed
scratch_folder = ''
"""
Scratch folder to create temporary files on it
"""
"""
Prefix for temporary filenames
"""
def get(filename=None, clean=None, suffix='.nc'):
Get a new temporal filename, storing it for automated cleaning
:param filename: if it is not none, the function will use this filename instead of a random one
:type filename: str
:param clean: if true, stores filename for cleaning
:type clean: bool
:return: path to the temporal file
:rtype: str
"""
if clean is None:
clean = TempFile.autoclean
if filename:
path = os.path.join(TempFile.scratch_folder, filename)
else:
file_descriptor, path = tempfile.mkstemp(dir=TempFile.scratch_folder, prefix=TempFile.prefix, suffix=suffix)
if clean:
TempFile.files.append(path)
return path
@staticmethod
def clean():
"""Remove all temporary files created with Tempfile until now"""
for temp_file in TempFile.files:
if os.path.exists(temp_file):
os.remove(temp_file)
TempFile.files = list()