Commit 2a6378d7 authored by sloosvel's avatar sloosvel
Browse files

Merge branch 'master' into 'production'

Merge branch master into production to update the module

See merge request !80
parents 88b04246 564d660d
......@@ -56,16 +56,16 @@ master_doc = 'index'
# General information about the project.
project = u'Earth Diagnostics'
copyright = u'2018, BSC-CNS Earth Sciences Department'
copyright = u'2019, BSC-CNS Earth Sciences Department'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.source ~/vi
#
# The short X.Y version.
version = '3.1'
version = '3.2'
# The full version, including alpha/beta/rc tags.
release = '3.1.1'
release = '3.2.0'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
......
......@@ -584,22 +584,22 @@ Options:
2. Variable:
Variable to average
3. Grid_point:
NEMO grid point used to store the variable: T, U, V ...
4. Basin = Global:
3. Basin = Global:
Basin to compute
5. Save 3d = True:
If True, it also stores the average per level
4. Grid_point:
NEMO grid point used to store the variable: T, U, V ...
6. Min depth:
5. Min depth:
Minimum depth to compute in levels. If -1, average from the surface
7. Max depth:
6. Max depth:
Maximum depth to compute in levels. If -1, average to the bottom
7. Save 3d = True:
If True, it also stores the average per level
8. Variance = False:
If True, it also stores the variance
......@@ -607,6 +607,10 @@ Options:
Source grid to choose. By default this is the original data, but sometimes you will want to use another
(for example, the 'rotated' one produced by the rotation diagnostic)
10. Original frequency:
Original frequency to use
rotate
~~~~~~
......
......@@ -46,10 +46,8 @@ class Config(object):
"Custom mesh mask file to use"
self.new_mask_glo = None
"Custom new mask glo file to use"
self.mask_regions = None
"Custom mask regions file to use"
self.mask_regions_3d = None
"Custom mask regions 3D file to use"
self.basins = None
"Custom basins file to use"
self.data_convention = None
"Data convention to use"
self.var_manager = None
......
......@@ -153,20 +153,15 @@ class Basins(object):
self.__initiated = False
def get_available_basins(self, handler):
def get_available_basins(self, cube):
"""
Read available basins from file
:param handler:
:type handler: netCDF4.Dataset
"""
basin_names = handler.variables.keys()
ignored_names = ('lat', 'latitude', 'lon', 'longitude', 'i', 'j', 'time', 'lev')
for basin in basin_names:
if basin in ignored_names:
continue
for basin in cube.coord('region').points:
basin_object = Basin(basin)
setattr(self, basin, basin_object)
self._add_alias(basin, basin_object)
......
......@@ -264,15 +264,18 @@ class DataFile(Publisher):
def upload(self):
"""Send a loal file to the storage"""
self.storage_status = StorageStatus.UPLOADING
remote_file = self.remote_file
try:
Utils.copy_file(self.local_file, self.remote_file, save_hash=True)
if '/cmorfiles/' in remote_file:
remote_file = remote_file.replace('/cmorfiles/', '/diags/')
Utils.copy_file(self.local_file, remote_file, save_hash=True)
except (OSError, Exception) as ex:
Log.error('File {0} can not be uploaded: {1}', self.remote_file, ex)
Log.error('File {0} can not be uploaded: {1}', remote_file, ex)
self.storage_status = StorageStatus.FAILED
return
Log.info('File {0} uploaded!', self.remote_file)
self.create_link()
# self.create_link()
self.storage_status = StorageStatus.READY
def set_local_file(self, local_file, diagnostic=None, rename_var='', region=None):
......@@ -643,46 +646,54 @@ class NetCDFFile(DataFile):
def download(self):
"""Get data from remote storage to the local one"""
try:
self.local_status = LocalStatus.DOWNLOADING
Log.debug('Downloading file {0}...', self.remote_file)
if not self.local_file:
self.local_file = TempFile.get()
# Utils.get_file_hash(self.remote_file, use_stored=True, save=True)
try:
Utils.copy_file(self.remote_file, self.local_file, retrials=1)
except Utils.CopyException:
# Utils.get_file_hash(self.remote_file, use_stored=False, save=True)
Utils.copy_file(self.remote_file, self.local_file, retrials=2)
if self.data_convention == 'meteofrance':
Log.debug('Converting variable names from meteofrance convention')
alt_coord_names = {'time_counter': 'time', 'time_counter_bounds': 'time_bnds',
'tbnds': 'bnds', 'nav_lat': 'lat', 'nav_lon': 'lon', 'x': 'i',
'y': 'j'}
Utils.rename_variables(self.local_file, alt_coord_names, must_exist=False)
Log.info('File {0} ready!', self.remote_file)
self.local_status = LocalStatus.READY
except Exception as ex:
if os.path.isfile(self.local_file):
os.remove(self.local_file)
Log.error('File {0} not available: {1}', self.remote_file, ex)
self.local_status = LocalStatus.FAILED
def check_is_in_storage(self):
if os.path.isfile(self.remote_file):
if self.region:
for path in (self.remote_file.replace('/cmorfiles/', '/diags/'), self.remote_file):
if os.path.isfile(path):
try:
cubes = iris.load(self.remote_file)
self._check_regions(cubes)
except iris.exceptions.TranslationError as ex:
# If the check goes wrong, we must execute everything
os.remove(self.remote_file)
self.local_status = LocalStatus.DOWNLOADING
Log.debug('Downloading file {0}...', path)
if not self.local_file:
self.local_file = TempFile.get()
# Utils.get_file_hash(self.remote_file, use_stored=True, save=True)
try:
Utils.copy_file(path, self.local_file, retrials=1)
except Utils.CopyException:
# Utils.get_file_hash(self.remote_file, use_stored=False, save=True)
Utils.copy_file(path, self.local_file, retrials=2)
if self.data_convention == 'meteofrance':
Log.debug('Converting variable names from meteofrance convention')
alt_coord_names = {'time_counter': 'time', 'time_counter_bounds': 'time_bnds',
'tbnds': 'bnds', 'nav_lat': 'lat', 'nav_lon': 'lon', 'x': 'i',
'y': 'j'}
Utils.rename_variables(self.local_file, alt_coord_names, must_exist=False)
Log.info('File {0} ready!', path)
self.local_status = LocalStatus.READY
return
except Exception as ex:
Log.debug('Exception when checking file {0}: {1}', self.remote_file, ex)
else:
self.storage_status = StorageStatus.READY
if os.path.isfile(self.local_file):
os.remove(self.local_file)
Log.error('File {0} not available: {1}', path, ex)
self.local_status = LocalStatus.FAILED
return
Log.error('File {0} not available: {1}', self.remote_file, 'FileNotFound')
self.local_status = LocalStatus.FAILED
def check_is_in_storage(self):
for path in (self.remote_file, self.remote_file.replace('/cmorfiles/', '/diags/')):
if os.path.isfile(path):
if self.region:
try:
cubes = iris.load(path)
self._check_regions(cubes)
except iris.exceptions.TranslationError as ex:
# If the check goes wrong, we must execute everything
os.remove(path)
except Exception as ex:
Log.debug('Exception when checking file {0}: {1}', path, ex)
else:
self.storage_status = StorageStatus.READY
return
def _check_regions(self, cubes):
for cube in cubes:
......
......@@ -3,7 +3,6 @@
"""Entry point for EarthDiagnostics"""
import argparse
import os
import time
import sys
import shutil
import tempfile
......@@ -11,6 +10,8 @@ from datetime import datetime
import netCDF4
import pkg_resources
import iris
import bscearth.utils.path
from bscearth.utils.log import Log
......@@ -66,7 +67,8 @@ class EarthDiags(object):
-------
"""
Log.info('Initialising Earth Diagnostics Version {0}', EarthDiags.version)
Log.info(
'Initialising Earth Diagnostics Version {0}', EarthDiags.version)
self.config.parse(config_file)
os.environ['HDF5_USE_FILE_LOCKING'] = 'FALSE'
TempFile.scratch_folder = self.config.scratch_dir
......@@ -84,7 +86,8 @@ class EarthDiags(object):
For more detailed documentation, use -h option
"""
# try:
parser = argparse.ArgumentParser(description='Main executable for Earth Diagnostics.')
parser = argparse.ArgumentParser(
description='Main executable for Earth Diagnostics.')
parser.add_argument('-v', '--version', action='version', version=EarthDiags.version,
help="returns Earth Diagnostics's version number and exit")
parser.add_argument('--doc', action='store_true',
......@@ -104,7 +107,8 @@ class EarthDiags(object):
parser.add_argument('-log', '--logfilepath', default=None, type=str)
parser.add_argument('-f', '--configfile', default='diags.conf', type=str)
parser.add_argument('-f', '--configfile',
default='diags.conf', type=str)
args = parser.parse_args(args)
if args.doc:
......@@ -142,7 +146,8 @@ class EarthDiags(object):
True if successful
"""
Log.info('Opening documentation...')
doc_path = os.path.join('http://earthdiagnostics.readthedocs.io/en/latest')
doc_path = os.path.join(
'http://earthdiagnostics.readthedocs.io/en/latest')
Utils.execute_shell_command(('xdg-open', doc_path))
Log.result('Documentation opened!')
return True
......@@ -192,22 +197,20 @@ class EarthDiags(object):
return result
def _initialize_basins(self):
self._read_basins_from_file('mask_regions.nc')
self._read_basins_from_file('mask_regions.3d.nc')
self._read_basins_from_file('basins.nc')
@staticmethod
def _read_basins_from_file(filename):
if not os.path.isfile(filename):
return
handler = Utils.open_cdf(filename)
Basins().get_available_basins(handler)
handler.close()
Basins().get_available_basins(iris.load_cube(filename))
def _prepare_scratch_dir(self):
if self.config.use_ramdisk:
self._remove_scratch_dir()
tempfile.mkdtemp(dir='/dev/shm')
os.symlink(tempfile.mkdtemp(dir='/dev/shm'), self.config.scratch_dir)
os.symlink(tempfile.mkdtemp(dir='/dev/shm'),
self.config.scratch_dir)
else:
if not os.path.exists(self.config.scratch_dir):
os.makedirs(self.config.scratch_dir)
......@@ -240,12 +243,14 @@ class EarthDiags(object):
if os.path.islink(self.config.scratch_dir):
# time.sleep(4)
# shutil.rmtree(os.path.realpath(self.config.scratch_dir))
Utils.execute_shell_command('rm -r {0}'.format(os.path.realpath(self.config.scratch_dir)))
Utils.execute_shell_command(
'rm -r {0}'.format(os.path.realpath(self.config.scratch_dir)))
os.remove(self.config.scratch_dir)
elif os.path.isdir(self.config.scratch_dir):
# time.sleep(4)
# shutil.rmtree(self.config.scratch_dir)
Utils.execute_shell_command('rm -r {0}'.format(self.config.scratch_dir))
Utils.execute_shell_command(
'rm -r {0}'.format(self.config.scratch_dir))
def report(self):
"""
......@@ -286,9 +291,11 @@ class EarthDiags(object):
if not self.data_manager.file_exists(var.domain, var.short_name, startdate, member, 1,
frequency=table.frequency):
results.append((var, table, priority))
Log.debug('Variable {0.short_name} not found in {1.name}', var, table)
Log.debug(
'Variable {0.short_name} not found in {1.name}', var, table)
else:
Log.result('Variable {0.short_name} found in {1.name}', var, table)
Log.result(
'Variable {0.short_name} found in {1.name}', var, table)
return results
......@@ -297,7 +304,8 @@ class EarthDiags(object):
tables = set([result[1].name for result in results])
for table in tables:
file_handler = open('{0}.{1}'.format(report_path, table), 'w')
table_results = [result for result in results if result[1].name == table]
table_results = [
result for result in results if result[1].name == table]
file_handler.write('\nTable {0}\n'.format(table))
file_handler.write('===================================\n')
......@@ -305,13 +313,17 @@ class EarthDiags(object):
priorities = set([result[2] for result in table_results])
priorities = sorted(priorities)
for priority in priorities:
priority_results = [result[0] for result in table_results if result[2] == priority]
priority_results = sorted(priority_results, key=lambda v: v.short_name)
file_handler.write('\nMissing variables with priority {0}:\n'.format(priority))
priority_results = [result[0]
for result in table_results if result[2] == priority]
priority_results = sorted(
priority_results, key=lambda v: v.short_name)
file_handler.write(
'\nMissing variables with priority {0}:\n'.format(priority))
file_handler.write('--------------------------------------\n')
for var in priority_results:
file_handler.write('{0:12}: {1}\n'.format(var.short_name, var.standard_name))
file_handler.write('{0:12}: {1}\n'.format(
var.short_name, var.standard_name))
file_handler.flush()
file_handler.close()
......@@ -327,8 +339,7 @@ class EarthDiags(object):
mesh_mask = 'mesh_mask_nemo.{0}.nc'.format(model_version)
new_mask_glo = 'new_maskglo.{0}.nc'.format(model_version)
mask_regions = 'mask.regions.{0}.nc'.format(model_version)
mask_regions_3d = 'mask.regions.3d.{0}.nc'.format(model_version)
basins = 'basins.{0}.nc'.format(model_version)
if self.config.mesh_mask:
mesh_mask_path = self.config.mesh_mask
......@@ -340,64 +351,60 @@ class EarthDiags(object):
else:
new_mask_glo_path = os.path.join(con_files, new_mask_glo)
if self.config.mask_regions:
mask_regions_path = self.config.mask_regions
else:
mask_regions_path = os.path.join(con_files, mask_regions)
if self.config.mask_regions_3d:
mask_regions_3d_path = self.config.mask_regions_3d
if self.config.basins:
basins_path = self.config.basins
else:
mask_regions_3d_path = os.path.join(con_files, mask_regions_3d)
basins_path = os.path.join(con_files, basins)
if self.config.scratch_masks:
self._prepare_mesh_using_scratch(mask_regions, mask_regions_3d, mask_regions_3d_path, mask_regions_path,
mesh_mask, mesh_mask_path, new_mask_glo, new_mask_glo_path,
restore_meshes)
self._prepare_mesh_using_scratch(
basins, basins_path,
mesh_mask, mesh_mask_path,
new_mask_glo, new_mask_glo_path,
restore_meshes
)
else:
self._copy_file(mesh_mask_path, 'mesh_hgr.nc', restore_meshes)
self._link_file('mesh_hgr.nc', 'mesh_zgr.nc')
self._link_file('mesh_hgr.nc', 'mask.nc')
self._copy_file(new_mask_glo_path, 'new_maskglo.nc',
restore_meshes)
self._copy_file(mask_regions_path,
'mask_regions.nc', restore_meshes)
self._copy_file(mask_regions_3d_path,
'mask_regions.3d.nc', restore_meshes)
self._copy_file(basins_path, 'basins.nc', restore_meshes)
Log.result('Mesh files ready!')
def _prepare_mesh_using_scratch(self, mask_regions, mask_regions_3d, mask_regions_3d_path, mask_regions_path,
mesh_mask, mesh_mask_path, new_mask_glo, new_mask_glo_path,
def _prepare_mesh_using_scratch(self, basins, basins_path,
mesh_mask, mesh_mask_path,
new_mask_glo, new_mask_glo_path,
restore_meshes):
Utils.create_folder_tree(self.config.scratch_masks)
Utils.give_group_write_permissions(self.config.scratch_masks)
mesh_mask_scratch_path = os.path.join(self.config.scratch_masks, mesh_mask)
mesh_mask_scratch_path = os.path.join(
self.config.scratch_masks, mesh_mask)
if self._copy_file(mesh_mask_path, mesh_mask_scratch_path,
restore_meshes):
Utils.give_group_write_permissions(mesh_mask_scratch_path)
self._link_file(mesh_mask_scratch_path, 'mesh_hgr.nc')
self._link_file(mesh_mask_scratch_path, 'mesh_zgr.nc')
self._link_file(mesh_mask_scratch_path, 'mask.nc')
new_maskglo_scratch_path = os.path.join(self.config.scratch_masks, new_mask_glo)
new_maskglo_scratch_path = os.path.join(
self.config.scratch_masks, new_mask_glo)
if self._copy_file(new_mask_glo_path,
new_maskglo_scratch_path, restore_meshes):
Utils.give_group_write_permissions(new_maskglo_scratch_path)
self._link_file(new_maskglo_scratch_path, 'new_maskglo.nc')
mask_regions_scratch_path = os.path.join(self.config.scratch_masks, mask_regions)
if self._copy_file(mask_regions_path,
mask_regions_scratch_path, restore_meshes):
Utils.give_group_write_permissions(mask_regions_scratch_path)
self._link_file(mask_regions_scratch_path, 'mask_regions.nc')
mask_regions3d_scratch_path = os.path.join(self.config.scratch_masks, mask_regions_3d)
if self._copy_file(mask_regions_3d_path,
mask_regions3d_scratch_path, restore_meshes):
Utils.give_group_write_permissions(mask_regions3d_scratch_path)
self._link_file(mask_regions3d_scratch_path, 'mask_regions.3d.nc')
basins_scratch_path = os.path.join(
self.config.scratch_masks, basins)
if self._copy_file(basins_path, basins_scratch_path, restore_meshes):
Utils.give_group_write_permissions(basins_path)
self._link_file(basins_scratch_path, 'basins.nc')
def _copy_file(self, source, destiny, force):
if not os.path.exists(source):
Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
Log.user_warning('File {0} is not available for {1}',
destiny, self.config.experiment.model_version)
Log.debug('Looking for it in {0}', source)
return False
......@@ -417,7 +424,8 @@ class EarthDiags(object):
def _link_file(self, source, destiny):
if not os.path.exists(source):
Log.user_warning('File {0} is not available for {1}', destiny, self.config.experiment.model_version)
Log.user_warning('File {0} is not available for {1}',
destiny, self.config.experiment.model_version)
return
if os.path.lexists(destiny):
......
......@@ -47,7 +47,7 @@ class HeatContentLayer(Diagnostic):
"Diagnostic alias for the configuration file"
def __init__(self, data_manager, startdate, member, chunk, box, areas,
weight, layers, basins, data_convention):
weight, layers, basins, data_convention, min_level, max_level):
Diagnostic.__init__(self, data_manager)
self.startdate = startdate
self.member = member
......@@ -57,9 +57,9 @@ class HeatContentLayer(Diagnostic):
self.weight = weight
self.layers = layers
self.basins = basins
self.required_vars = ['so', 'mlotst']
self.generated_vars = ['scvertsum']
self.data_convention = data_convention
self.min_level = min_level
self.max_level = max_level
def __str__(self):
return 'Heat content layer Startdate: {0} Member: {1} Chunk: {2} Box: {3}'.format(self.startdate, self.member,
......@@ -108,6 +108,8 @@ class HeatContentLayer(Diagnostic):
mask = mesh.get_landsea_mask()
depth = mesh.get_depth(cell_point='W')
weight = ohc.get_weights(layers, mask, e3t, depth)
max_level, min_level = cls._get_used_levels(weight)
weight[0] = weight[0][:, min_level:max_level,:, :]
del mask, depth, e3t
......@@ -115,7 +117,7 @@ class HeatContentLayer(Diagnostic):
job_list.append(HeatContentLayer(
diags.data_manager, startdate, member, chunk, box,
areas, weight, layers, basins,
diags.config.data_convention
diags.config.data_convention, min_level, max_level
))
return job_list
......@@ -161,6 +163,18 @@ class HeatContentLayer(Diagnostic):
self.startdate, self.member,
self.chunk, box=self.box)
@classmethod
def _get_used_levels(cls, weight):
# Now we will reduce to the levels with any weight != 0 to avoid loading too much data on memory
levels = weight[0].shape[1]
min_level = 0
while min_level < levels and not weight[0][:, min_level, :].any():
min_level += 1
max_level = min_level
while max_level < (levels - 1) and weight[0][:, max_level + 1, :].any():
max_level += 1
return max_level, min_level
def compute(self):
"""Run the diagnostic"""
thetao_file = TempFile.get()
......@@ -171,23 +185,30 @@ class HeatContentLayer(Diagnostic):
handler = Utils.open_cdf(thetao_file)
Utils.convert_units(handler.variables['thetao'], 'K')
heatc_sl, heatc_sl1D = ohc.compute(self.layers, self.weight,
handler.variables['thetao'][:],
handler.variables['thetao'][:,self.min_level:self.max_level,:,:],
self.areas)
handler.sync()
handler.renameVariable('thetao', 'heatc_sl')
results = TempFile.get()
handler_results = Utils.open_cdf(results, 'w')
lat_name = next(alias for alias in ('lat', 'latitude')
if alias in handler.variables.keys())
lon_name = next(alias for alias in ('lon', 'longitude')
if alias in handler.variables.keys())
Utils.copy_variable(handler, handler_results, 'time', True, True)
Utils.copy_variable(handler, handler_results, 'i', True, True)
Utils.copy_variable(handler, handler_results, 'j', True, True)
# Utils.rename_variables(results, {'x': 'i', 'y': 'j'}, False)
Utils.copy_variable(handler, handler_results, 'i', False, True)
Utils.copy_variable(handler, handler_results, 'j', False, True)
Utils.copy_variable(handler, handler_results, lat_name, True, True)
Utils.copy_variable(handler, handler_results, lon_name, True, True)
var = handler_results.createVariable('heatc', float,
('time', 'j', 'i'),
fill_value=1.e20)
var.units = 'J m-2'
var.units = 'J m-2'''
var.coordinates = ' '.join((lat_name, lon_name))
handler_results.sync()
handler_results.variables['heatc'][:] = heatc_sl[0] # temporary fix, needs to loop over layers
# temporary fix, needs to loop over layers
handler_results.variables['heatc'][:] = heatc_sl[0]
handler_results.close()
results1D = TempFile.get()
......@@ -195,8 +216,10 @@ class HeatContentLayer(Diagnostic):
Utils.copy_variable(handler, handler_results1D, 'time', True, True)
handler_results1D.createDimension('region', len(self.basins))
handler_results1D.createDimension('region_length', 50)
var_region = handler_results1D.createVariable('region', 'S1', ('region', 'region_length'))
var_ohc1D = handler_results1D.createVariable('heatcsum', float, ('time', 'region',),)
var_region = handler_results1D.createVariable(
'region', 'S1', ('region', 'region_length'))
var_ohc1D = handler_results1D.createVariable(
'heatcsum', float, ('time', 'region',),)
handler_results1D.sync()
for i, basin in enumerate(self.basins):
var_region[i, ...] = netCDF4.stringtoarr(basin.name, 50)
......@@ -206,3 +229,4 @@ class HeatContentLayer(Diagnostic):
Utils.setminmax(results, 'heatc')
self.heatc.set_local_file(results)
self.heatcsum.set_local_file(results1D)
......@@ -14,7 +14,8 @@ from earthdiagnostics.box import Box
from earthdiagnostics.constants import Basins
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticOption, \