# NOTE(review): "Newer"/"Older" version-control-viewer artifacts removed; not part of the source.
"""Module for classes to manage storage manipulation"""
import csv
import os
from datetime import datetime

import iris
import iris.coords
import iris.cube
import iris.exceptions
import iris.experimental.equalise_cubes

from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.publisher import Publisher
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.variable import VariableType
class LocalStatus(object):
    """Status of a file in the local (scratch) storage."""

    PENDING = 0
    DOWNLOADING = 1
    READY = 2
    FAILED = 3
    NOT_REQUESTED = 4


class StorageStatus(object):
    """Status of a file in the remote storage."""

    PENDING = 0
    UPLOADING = 1
    READY = 2
    FAILED = 3
"""
Represent a data file
Must be derived for each concrete data file format
"""
def __init__(self):
    """Initialize an empty data file descriptor."""
    super(DataFile, self).__init__()
    self.remote_file = None
    self.local_file = None
    self.domain = None
    self.var = None
    self.cmor_var = None
    self.frequency = None
    self.data_convention = None
    # The following attributes are read elsewhere in this class but
    # were never initialized, risking AttributeError before first use:
    self.diagnostic = None
    self.grid = None
    self.final_name = None
    # presumably the default variable type is the mean — TODO confirm
    self.var_type = VariableType.MEAN
    self._local_status = LocalStatus.NOT_REQUESTED
    self._storage_status = StorageStatus.READY
    self._modifiers = []
    self._size = None  # lazily computed by _get_size() via `size`
return "Data file for {0}".format(self.remote_file)
@property
def size(self):
    """Size of the file in bytes, computed on first access and cached."""
    if self._size is not None:
        return self._size
    self._get_size()
    return self._size
def _get_size(self):
try:
if self.local_status == LocalStatus.READY:
self._size = os.path.getsize(self.local_file)
"""Check if a local file is still needed and remove it if not"""
if (
self.local_status != LocalStatus.READY
or self.suscribers
or self.upload_required()
or self.storage_status == StorageStatus.UPLOADING
):
Log.debug(
"File {0} no longer needed. Deleting from scratch...".format(
self.remote_file
)
)
Log.debug("File {0} deleted from scratch".format(self.remote_file))
self.local_file = None
self.local_status = LocalStatus.PENDING
def upload_required(self):
    """
    Get if an upload is needed for this file

    Returns
    -------
    bool
    """
    # Upload only files that exist locally and are still pending storage
    if self.local_status != LocalStatus.READY:
        return False
    return self.storage_status == StorageStatus.PENDING
def download_required(self):
    """
    Get if a download is required for this file

    Returns
    -------
    bool
    """
    # Only files still pending locally are candidates for download
    if self.local_status != LocalStatus.PENDING:
        return False
    # Fetch when the data is in storage, or when a registered modifier
    # needs the stored version as its starting point
    if self.storage_status == StorageStatus.READY:
        return True
    return bool(self.has_modifiers())
def add_modifier(self, diagnostic):
    """
    Register a diagnostic as a modifier of this data

    A modifier diagnostic is a diagnostic that read this data and changes
    it in any way. The diagnostic must be a modifier even if it only
    affects the metadata

    Parameters
    ----------
    diagnostic: Diagnostic
        Diagnostic that will modify this file's data or metadata
    """
    # Modifiers are kept in registration order; ready_to_run() only
    # releases the data to the first pending modifier.
    self._modifiers.append(diagnostic)
"""
Check if it has registered modifiers
Returns
-------
bool
def ready_to_run(self, diagnostic):
    """
    Check if the data is ready to run for a given diagnostics

    To be ready to run, the datafile should be in the local storage
    and no modifiers can be pending.

    Parameters
    ----------
    diagnostic: Diagnostic

    Returns
    -------
    bool
    """
    if not self.local_status == LocalStatus.READY:
        return False
    # BUG FIX: restored the missing "no pending modifiers" branch; the
    # fragment jumped straight from the docstring to the final check.
    if not self._modifiers:
        return True
    # Data with pending modifiers is only released to the first of them
    return self._modifiers[0] is diagnostic
@property
def remote_diags_file(self):
    """Path of this file inside the diagnostics storage area."""
    diags_path = self.remote_file.replace("/cmorfiles/", "/diags/")
    return diags_path.replace("/original_files/", "/")
@property
def local_status(self):
    """Get local storage status."""
    return self._local_status

@local_status.setter
def local_status(self, value):
    # Skip notification when the status does not actually change
    if self._local_status == value:
        return
    # BUG FIX: the new value was never stored, so the status froze at
    # its initial value and subscribers were notified spuriously.
    self._local_status = value
    self.dispatch(self)
@property
def storage_status(self):
    """Get remote storage status."""
    return self._storage_status

@storage_status.setter
def storage_status(self, value):
    # Skip notification when the status does not actually change
    if self._storage_status == value:
        return
    # BUG FIX: the new value was never stored, so the status froze at
    # its initial value and subscribers were notified spuriously.
    self._storage_status = value
    self.dispatch(self)
@classmethod
def from_storage(cls, filepath, data_convention):
    """Create a new datafile to be downloaded from the storage"""
    # BUG FIX: the @classmethod decorator was missing although the
    # method takes `cls` (its counterpart `to_storage` has it).
    file_object = cls()
    file_object.remote_file = filepath
    file_object.local_status = LocalStatus.PENDING
    file_object.data_convention = data_convention
    return file_object
@classmethod
def to_storage(cls, remote_file, data_convention):
    """Creates new datafile object for a file to be generated and stored"""
    # BUG FIX: `new_object` was referenced without ever being created,
    # and the method returned None.
    new_object = cls()
    new_object.remote_file = remote_file
    new_object.storage_status = StorageStatus.PENDING
    new_object.data_convention = data_convention
    return new_object
"""
Get data from remote storage to the local one
Must be overriden by the derived classes
Raises
------
NotImplementedError
If the derived classes do not override this
"""
raise NotImplementedError("Class must implement the download method")
def prepare_to_upload(self, rename_var):
    """
    Prepare a local file to be uploaded

    This includes renaming the variable if necessary,
    updating the metadata and adding the history and
    managing the possibility of multiple regions

    Parameters
    ----------
    rename_var: str
        Name the variable currently has inside the file; empty when it
        already matches ``self.var``
    """
    if rename_var:
        original_name = rename_var
    else:
        original_name = self.var
    if self.final_name != original_name:
        # BUG FIX: the Utils.rename_variable call line was missing,
        # leaving a dangling argument tuple (syntax error).
        Utils.rename_variable(
            self.local_file, original_name, self.final_name
        )
    self._correct_metadata()
    self._prepare_region()
    Utils.convert2netcdf4(self.local_file)
"""Send a local file to the storage"""
self.storage_status = StorageStatus.UPLOADING
remote_file = self.remote_file
Utils.copy_file(
self.local_file, self.remote_diags_file, save_hash=True)
Log.error("File {0} can not be uploaded: {1}", remote_file, ex)
self.storage_status = StorageStatus.FAILED
return
def set_local_file(self, local_file, diagnostic=None, rename_var=""):
    """
    Set the local file generated by EarthDiagnostics

    This also prepares it for the upload

    Parameters
    ----------
    local_file: str
        Path to the freshly generated file
    diagnostic: Diagnostic or None
        Diagnostic that produced the file; removed from the pending
        modifiers when registered
    rename_var: str
        Name the variable currently has inside the file, if it differs
        from the final one

    Returns
    -------
    None
    """
    self.storage_status = StorageStatus.PENDING
    if diagnostic in self._modifiers:
        self._modifiers.remove(diagnostic)
    # BUG FIX: the received path was never stored, so the file was
    # prepared and marked READY while still pointing at the old path.
    self.local_file = local_file
    self.prepare_to_upload(rename_var)
    self.local_status = LocalStatus.READY
"""Create a link from the original in <frequency>_<var_type> folder"""
var_handler = handler.variables[self.final_name]
coords = set.intersection(
{
'time', 'lev',
self.data_convention.lat_name, self.data_convention.lon_name,
'leadtime', 'region', 'time_centered'
},
)
var_handler.coordinates = Utils.convert_to_ascii_if_possible(
" ".join(coords)
)
if "time_centered" in handler.variables:
if hasattr(handler.variables["time_centered"], "standard_name"):
del handler.variables["time_centered"].standard_name
if not self.cmor_var:
handler.close()
return
self._fix_variable_name(var_handler)
handler.modeling_realm = self.cmor_var.domain.name
table = self.cmor_var.get_table(self.frequency, self.data_convention)
handler.table_id = "Table {0} ({1})".format(table.name, table.date)
if self.cmor_var.units:
self._fix_units(var_handler)
handler.sync()
self._fix_coordinate_variables_metadata(handler)
var_type = var_handler.dtype
handler.close()
self._fix_values_metadata(var_type)
def _fix_variable_name(self, var_handler):
    """Copy standard_name and long_name from the CMOR variable definition."""
    var_handler.standard_name = self.cmor_var.standard_name
    var_handler.long_name = self.cmor_var.long_name
def _fix_values_metadata(self, var_type, file_path=None):
if file_path is None:
file_path = self.local_file
if self.cmor_var is not None:
if self.cmor_var.valid_min:
valid_min = '-a valid_min,{0},o,{1},"{2}" '.format(
self.final_name, var_type.char, self.cmor_var.valid_min
)
if self.cmor_var.valid_max:
valid_max = '-a valid_max,{0},o,{1},"{2}" '.format(
self.final_name, var_type.char, self.cmor_var.valid_max
)
options=(
'-O -a _FillValue,{0},o,{1},"1.e20" '
'-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(
self.final_name, var_type.char, valid_min, valid_max
),
),
def _fix_coordinate_variables_metadata(self, handler):
lat_name = self.data_convention.lat_name
lon_name = self.data_convention.lon_name
if 'lev' in handler.variables:
handler.variables['lev'].short_name = 'lev'
if self.domain == ModelingRealms.ocean:
handler.variables['lev'].standard_name = 'depth'
if lon_name in handler.variables:
handler.variables[lon_name].short_name = lon_name
handler.variables[lon_name].standard_name = 'longitude'
if lat_name in handler.variables:
handler.variables[lat_name].short_name = lat_name
handler.variables[lat_name].standard_name = 'latitude'
if var_handler.units == "-":
var_handler.units = "1.0"
if var_handler.units == "PSU":
var_handler.units = "psu"
if var_handler.units == "C" and self.cmor_var.units == "K":
var_handler.units = "deg_C"
if self.cmor_var.units != var_handler.units:
self._convert_units(var_handler)
var_handler.units = self.cmor_var.units
def _convert_units(self, var_handler):
    """Convert the variable data to the units defined by the CMOR variable.

    Tries the generic Utils.convert_units first; on failure falls back
    to the factor/offset table maintained by UnitConversion and rescales
    the data and its valid range attributes in place.
    """
    try:
        Utils.convert_units(var_handler, self.cmor_var.units)
    except ValueError as ex:
        Log.warning(
            "Can not convert {3} from {0} to {1}: {2}",
            var_handler.units,
            self.cmor_var.units,
            ex,
            self.cmor_var.short_name,
        )
        # NOTE(review): if no conversion is registered, factor/offset
        # are None and the rescale below raises TypeError — confirm a
        # known conversion is guaranteed upstream.
        factor, offset = UnitConversion.get_conversion_factor_offset(
            var_handler.units, self.cmor_var.units
        )
        # converted = original * factor + offset
        var_handler[:] = var_handler[:] * factor + offset
        if "valid_min" in var_handler.ncattrs():
            var_handler.valid_min = (
                float(var_handler.valid_min) * factor + offset
            )
        if "valid_max" in var_handler.ncattrs():
            var_handler.valid_max = (
                float(var_handler.valid_max) * factor + offset
            )
if not self.check_is_in_storage(update_status=False):
cube = iris.load_cube(self.local_file)
try:
except iris.exceptions.CoordinateNotFoundError:
# NOTE(review): committer-attribution lines from a VCS viewer removed; not part of the source.
return
old_cube = iris.load_cube(self.remote_diags_file)
old_cube = iris.load_cube(self.remote_file)
except Exception:
# Bad data, overwrite
return
for region_slice in cube.slices_over("region"):
Log.debug(region_slice.coord("region").points[0])
new_data[region_slice.coord("region").points[0]] = region_slice
for region_slice in old_cube.slices_over("region"):
region = region_slice.coord("region").points[0]
Log.debug(region)
if region not in new_data:
new_data[region] = region_slice
cube_list = iris.cube.CubeList(
[new_data[region] for region in sorted(new_data)]
if len(cube_list) == 1:
return
iris.experimental.equalise_cubes.equalise_attributes(cube_list)
final_cube = cube_list.merge_cube()
temp = TempFile.get()
iris.save(final_cube, temp, zlib=True)
handler = Utils.open_cdf(temp)
renames = {}
for dim in handler.dimensions:
if dim.startswith('dim'):
renames[dim] = 'region'
if dim.startswith('string'):
renames[dim] = 'region_length'
renames[final_cube.var_name.replace('-', '_')] = \
final_cube.var_name
Utils.rename_variables(
temp, renames, must_exist=False, rename_dimension=True)
Utils.move_file(temp, self.local_file)
handler2 = Utils.open_cdf(self.local_file)
region_var = handler2.variables['region']
for i, cube in enumerate(cube_list):
encode = 'utf-8'
name = region_var[i, ...].tobytes().strip().decode(encode)
length = handler2.dimensions['region_length'].size
region_var[i, ...] = netCDF4.stringtoarr(name, length)
handler2.close()
def _rename_coordinate_variables(self):
variables = dict()
variables['x'] = 'i'
variables['y'] = 'j'
variables['nav_lat_grid_V'] = self.data_convention.lat_name
variables['nav_lon_grid_V'] = self.data_convention.lon_name
variables['nav_lat_grid_U'] = self.data_convention.lat_name
variables['nav_lon_grid_U'] = self.data_convention.lon_name
variables['nav_lat_grid_T'] = self.data_convention.lat_name
variables['nav_lon_grid_T'] = self.data_convention.lon_name
Utils.rename_variables(self.local_file, variables, False)
def add_diagnostic_history(self):
    """Record in the file history which diagnostic generated it."""
    if not self.diagnostic:
        # Nothing to record for files not produced by a diagnostic
        return
    from earthdiagnostics.earthdiags import EarthDiags
    self._add_history_line(
        f"Diagnostic {self.diagnostic} calculated with EarthDiagnostics "
        f"version {EarthDiags.version}"
    )
def add_cmorization_history(self):
    """Record in the file history the CMORization tool version."""
    from earthdiagnostics.earthdiags import EarthDiags
    self._add_history_line(
        "CMORized with Earthdiagnostics version {0}".format(
            EarthDiags.version
        )
    )
def _add_history_line(self, history_line):
utc_datetime = "UTC " + datetime.utcnow().isoformat()
history_line = "{0}: {1};".format(utc_datetime, history_line)
try:
history_line = history_line + handler.history
except AttributeError:
history_line = history_line
handler.history = Utils.convert_to_ascii_if_possible(history_line)
handler.close()
class UnitConversion(object):
    """
    Class to manage unit conversions

    Parameters
    ----------
    source: str
    destiny: str
    factor: float
    offset: float
    """

    # Registry of known conversions, keyed by (source, destiny).
    # BUG FIX: this class attribute was missing although the class
    # methods below read and write it.
    _dict_conversions = dict()

    def __init__(self, source, destiny, factor, offset):
        self.source = source
        self.destiny = destiny
        self.factor = float(factor)
        self.offset = float(offset)

    @classmethod
    def load_conversions(cls):
        """Load the conversion table from conversions.csv next to this module.

        NOTE(review): the original fragment lacked the method header and
        reader loop; reconstructed around the visible csv.reader call.
        """
        cls._dict_conversions = dict()
        csv_path = os.path.join(
            os.path.dirname(os.path.realpath(__file__)), "conversions.csv"
        )
        with open(csv_path, "r") as csvfile:
            reader = csv.reader(csvfile, dialect="excel")
            for line in reader:
                cls.add_conversion(
                    UnitConversion(line[0], line[1], line[2], line[3])
                )

    @classmethod
    def add_conversion(cls, conversion):
        """
        Register a conversion in the lookup table

        :param conversion: conversion to add
        :type conversion: UnitConversion
        """
        cls._dict_conversions[
            (conversion.source, conversion.destiny)
        ] = conversion

    @classmethod
    def get_conversion_factor_offset(cls, input_units, output_units):
        """
        Get the conversion factor and offset for two units.

        The conversion has to be done in the following way:
        converted = original * factor + offset

        :param input_units: original units
        :type input_units: str
        :param output_units: destiny units
        :type output_units: str
        :return: factor and offset (None, None when unknown)
        :rtype: [float, float]
        """
        scale_unit, unit = cls._parse_units(input_units)
        scale_new_unit, new_unit = cls._parse_units(output_units)
        factor, offset = UnitConversion._get_factor(new_unit, unit)
        if factor is None:
            return None, None
        factor = factor * scale_unit / float(scale_new_unit)
        offset /= float(scale_new_unit)
        return factor, offset

    @staticmethod
    def _parse_units(unit_string):
        """Split a '10^3 m'-style unit string into (scale, unit)."""
        parts = unit_string.split()
        if len(parts) == 1:
            return 1, parts[0]
        if "^" in parts[0]:
            base, exponent = parts[0].split("^")
            return pow(int(base), int(exponent)), parts[1]
        return float(parts[0]), parts[1]

    @classmethod
    def _get_factor(cls, new_unit, unit):
        """Look up the (factor, offset) pair converting unit -> new_unit."""
        # Only the conversions with a factor greater than 1 are stored;
        # the inverse direction is derived on the fly
        if unit == new_unit:
            return 1, 0
        if (unit, new_unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(unit, new_unit)]
            return conversion.factor, conversion.offset
        if (new_unit, unit) in cls._dict_conversions:
            conversion = cls._dict_conversions[(new_unit, unit)]
            return 1 / conversion.factor, -conversion.offset
        return None, None
def check_is_in_storage(self):
    """Check if the file is available in the remote storage.

    NOTE(review): this base implementation is a no-op returning None;
    concrete DataFile implementations appear to override it (see the
    ``update_status`` variant further down) — confirm intent.
    """
    return
"""Implementation of DataFile for netCDF files"""
"""Get data from remote storage to the local one"""
self.local_status = LocalStatus.DOWNLOADING
if not self.local_file:
self.local_file = TempFile.get()
try:
Utils.copy_file(path, self.local_file, retrials=1)
except Utils.CopyException:
Utils.copy_file(path, self.local_file, retrials=2)
if self.data_convention == "meteofrance":
Log.debug(
"Converting variable names from "
"meteofrance convention"
)
"time_counter": "time",
"time_counter_bounds": "time_bnds",
"tbnds": "bnds",
"nav_lat": "lat",
"nav_lon": "lon",
"x": "i",
"y": "j",
}
Utils.rename_variables(
self.local_file, alt_coord_names, must_exist=False
)
Log.info("File {0} ready!", path)
self.local_status = LocalStatus.READY
return
if os.path.isfile(self.local_file):
os.remove(self.local_file)
Log.error("File {0} not available: {1}", path, ex)
self.local_status = LocalStatus.FAILED
return
Log.error(
"File {0} not available: {1}", self.remote_file, "FileNotFound"
)
self.local_status = LocalStatus.FAILED
def check_is_in_storage(self, update_status=True):
self.storage_status = StorageStatus.READY
"""Create a link from original data to <frequency>_<var_type> folder"""
self.data_convention.create_link(
self.domain,
self.remote_file,
self.frequency,
self.final_name,
self.grid,
True,
self.var_type,
)
Log.error(
"Can not create link to {1}: {0}".format(ex, self.remote_file)
)
try:
if self.local_status == LocalStatus.READY:
self._size = os.path.getsize(self.local_file)
if self.storage_status == StorageStatus.READY:
self._size = os.path.getsize(self.remote_file)