Commit 24be5636 authored by Javier Vegas-Regidor

Cleaned code

parent ecd86a04
......@@ -27,7 +27,7 @@ CDFTOOLS_PATH = ~jvegas/CDFTOOLS/bin
# If true, copies the mesh files regardless of presence in scratch dir
RESTORE_MESHES = False
# Limits the maximum amount of threads used. Default: 0 (no limitation, one per virtual core available)
-MAX_CORES = 1
+#MAX_CORES = 1
[CMOR]
# If true, recreates CMOR files regardless of presence. Default = False
......@@ -88,8 +88,8 @@ OCEAN_TIMESTEP = 6
# CHUNK_SIZE is the size of each data file, given in months
# CHUNKS is the number of chunks. You can specify less chunks than present on the experiment
EXPID = testing_erainterim
-# STARTDATES = 19931101 19941101 19951101 19961101 19971101 19981101 19991101 20001101 20011101 20021101 20031101
-STARTDATES = 19840101 19850101
+STARTDATES = 19801101 19811101 19821101 19831101 19841101 19851101 19861101 19871101 19881101 19891101 19701101 19711101 19721101 19731101 19741101 19751101 19761101 19771101 19781101 19791101
+# STARTDATES = 19840101 19850101
MEMBERS = 0
MEMBER_DIGITS = 1
CHUNK_SIZE = 1
......
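For context, a minimal sketch of how a MAX_CORES-style option is typically resolved: 0 (or a commented-out option) means one worker per virtual core. This is illustrative only; the section name 'DIAGNOSTICS' and function name are assumptions, not the actual earthdiagnostics code.

```python
# Hypothetical sketch: resolving a MAX_CORES option from an INI config.
import configparser
import multiprocessing

def resolve_max_cores(config_path):
    parser = configparser.ConfigParser()
    parser.read(config_path)
    # Section name 'DIAGNOSTICS' is a guess; fallback=0 covers a missing
    # or commented-out option, meaning "no limit".
    max_cores = parser.getint('DIAGNOSTICS', 'MAX_CORES', fallback=0)
    if max_cores <= 0:
        # No limit: one thread per virtual core available.
        return multiprocessing.cpu_count()
    return max_cores
```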
......@@ -28,7 +28,7 @@ class CDFTools(object):
:param output: output file. Not all tools support this parameter
:type options: str
:param options: options for the tool.
-    :type options: str | [str] | Tuple[str]
+    :type options: str | [str] | Tuple[str] | NoneType
:param log_level: log level at which the output of the cdftool command will be added
:type log_level: int
:param input_option: option to add before input file
......
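The broadened `options` annotation (`str | [str] | Tuple[str] | NoneType`) suggests the runner normalizes whatever it receives into a flat argument list before invoking the tool. A hedged sketch of such a helper; the name and exact behavior are illustrative, not the actual CDFTools code:

```python
# Illustrative helper: accept a string, a list/tuple of strings, or None,
# and return a flat list of CLI arguments.
def normalize_options(options):
    if options is None:
        return []
    if isinstance(options, str):
        return options.split()
    return list(options)

# normalize_options(None)          -> []
# normalize_options('-full -vvl')  -> ['-full', '-vvl']
# normalize_options(('-full',))    -> ['-full']
```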
......@@ -47,6 +47,7 @@ class CMORManager(DataManager):
raise Exception('Can not find model data')
self.cmor_path = os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles')
+    # noinspection PyUnusedLocal
def file_exists(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None,
vartype=VariableType.MEAN, possible_versions=None):
cmor_var = self.variable_list.get_variable(var)
......@@ -386,7 +387,6 @@ class CMORManager(DataManager):
self._dic_cmorized[identifier][domain] = self._is_cmorized(startdate, member, chunk, domain)
return self._dic_cmorized[identifier][domain]
def _is_cmorized(self, startdate, member, chunk, domain):
startdate_path = self._get_startdate_path(startdate)
if not os.path.isdir(startdate_path):
......
......@@ -204,7 +204,7 @@ class DataFile(Publisher):
Utils.nco.ncatted(input=self.local_file, output=self.local_file,
options=('-O -a _FillValue,{0},o,{1},"1.e20" '
'-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(self.final_name, var_type.char,
-                                            valid_min, valid_max),))
+                                                              valid_min, valid_max),))
def _fix_coordinate_variables_metadata(self, handler):
if 'lev' in handler.variables:
......
......@@ -118,7 +118,7 @@ class DataManager(object):
Utils.create_folder_tree(os.path.dirname(link_path))
relative_path = os.path.relpath(filepath, os.path.dirname(link_path))
os.symlink(relative_path, link_path)
-        except:
+        except Exception:
raise
finally:
self.lock.release()
......@@ -183,6 +183,8 @@ class DataManager(object):
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: Frequency|NoneType
:return: path to the copy created on the scratch folder
+        :param vartype: Variable type (mean, statistic)
+        :type vartype: VariableType
:rtype: str
"""
raise NotImplementedError('Class must override request_chunk method')
......
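The linking code above stores the target relative to the link's directory and serializes the operation with a lock; the bare `except:` was tightened to `except Exception:` so that `KeyboardInterrupt` and `SystemExit` are not swallowed. A minimal standalone sketch of the same relative-symlink pattern (paths and names are made up):

```python
import os
import threading

lock = threading.Lock()

def create_relative_link(filepath, link_path):
    # Serialize link creation so concurrent diagnostics do not race.
    with lock:
        link_dir = os.path.dirname(link_path)
        if not os.path.isdir(link_dir):
            os.makedirs(link_dir)
        # Store the target relative to the link's folder so the tree
        # stays valid if the whole root is moved or remounted.
        relative_path = os.path.relpath(filepath, link_dir)
        if os.path.lexists(link_path):
            os.remove(link_path)
        os.symlink(relative_path, link_path)
```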
......@@ -109,4 +109,4 @@ class Module(Diagnostic):
component_u.close()
component_v.close()
-        self.module_file.set_local_file(temp, rename_var=self.componentu)
\ No newline at end of file
+        self.module_file.set_local_file(temp, rename_var=self.componentu)
......@@ -3,7 +3,6 @@ import iris
import iris.analysis
import iris.exceptions
from diagnostic import DiagnosticOption
from earthdiagnostics.box import Box
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticFloatOption, DiagnosticDomainOption, \
DiagnosticVariableOption
......
......@@ -226,6 +226,7 @@ class ObsReconManager(DataManager):
Log.debug('{0} requested', filepath)
return self._get_file_from_storage(filepath)
+    # noinspection PyUnusedLocal
def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
vartype=VariableType.MEAN, diagnostic=None):
"""
......
......@@ -95,7 +95,7 @@ class Interpolate(Diagnostic):
for startdate, member, chunk in diags.config.experiment.get_chunk_list():
job_list.append(
Interpolate(diags.data_manager, startdate, member, chunk,
-                            options['domain'], var , options['target_grid'],
+                            options['domain'], var, options['target_grid'],
diags.config.experiment.model_version, options['invert_lat'], options['original_grid']))
return job_list
......@@ -174,7 +174,7 @@ class Interpolate(Diagnostic):
shutil.copy(input_file, temp)
weights_file = '/esnas/autosubmit/con_files/weigths/{0}/rmp_{0}_to_{1}_lev{2}.nc'.format(self.model_version,
-                                                self.grid, lev + 1)
+                                                                        self.grid, lev + 1)
if not os.path.isfile(weights_file):
raise Exception('Level {0} weights file does not exist for model {1} '
'and grid {2}'.format(lev+1, self.model_version, self.grid))
......
......@@ -71,7 +71,7 @@ class InterpolateCDO(Diagnostic):
'Mask ocean: {0.mask_oceans} Model: {0.model_version}'.format(self)
@classmethod
-    def generate_jobs(cls, diags, options ):
+    def generate_jobs(cls, diags, options):
"""
Creates a job for each chunk to compute the diagnostic
......
......@@ -35,10 +35,11 @@ class Publisher(object):
:param args: arguments to pass
"""
for subscriber, callback in self._subscribers.items():
+            # noinspection PyCallingNonCallable
            callback(*args)
    @property
-    def suscribers(self, *args):
+    def suscribers(self):
"""
List of suscribers of this publisher
"""
......
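Publisher is a small observer-pattern helper: callbacks are registered per subscriber and invoked with whatever arguments the event carries, and the stray `*args` is dropped from the `suscribers` property since properties take no extra arguments. A minimal self-contained version of the same pattern, for reference (a sketch mirroring the class above, not its full API):

```python
class Publisher(object):
    """Minimal observer-pattern sketch mirroring the class above."""

    def __init__(self):
        self._subscribers = {}

    def subscribe(self, subscriber, callback):
        self._subscribers[subscriber] = callback

    def unsubscribe(self, subscriber):
        del self._subscribers[subscriber]

    def dispatch(self, *args):
        # Notify every registered subscriber with the event's arguments.
        for subscriber, callback in self._subscribers.items():
            callback(*args)

    @property
    def suscribers(self):
        # Spelling follows the original code's identifier.
        return list(self._subscribers.keys())
```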
# coding=utf-8
import six
from bscearth.utils.date import parse_date, add_months
from bscearth.utils.log import Log
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticVariableOption, DiagnosticDomainOption, \
-    DiagnosticIntOption, DiagnosticListIntOption, DiagnosticFloatOption
+    DiagnosticIntOption, DiagnosticListIntOption
from earthdiagnostics.frequency import Frequencies
-from earthdiagnostics.utils import Utils, TempFile
+from earthdiagnostics.utils import TempFile
from earthdiagnostics.variable_type import VariableType
import numpy as np
import iris
import iris.coord_categorisation
from iris.time import PartialDateTime
import iris.exceptions
import iris.coords
import math
class ClimatologicalPercentile(Diagnostic):
......@@ -49,7 +46,7 @@ class ClimatologicalPercentile(Diagnostic):
self.leadtime_files = {}
def __eq__(self, other):
-        return self.domain == other.domain and self.variable == other.variable and \
+        return self.domain == other.domain and self.variable == other.variable and \
self.start_year == other.start_year and self.end_year == other.end_year and \
self.forecast_month == other.forecast_month
......@@ -140,11 +137,11 @@ class ClimatologicalPercentile(Diagnostic):
for leadtime_slice in self.distribution.slices_over('leadtime'):
leadtime = leadtime_slice.coord('leadtime').points[0]
-            percentiles[leadtime]=np.apply_along_axis(calculate, 0, leadtime_slice.data)
+            percentiles[leadtime] = np.apply_along_axis(calculate, 0, leadtime_slice.data)
return percentiles
def _get_distribution(self):
-        for startdate, startdate_file in self.leadtime_files.iteritems():
+        for startdate, startdate_file in six.iteritems(self.leadtime_files):
Log.info('Getting data for startdate {0}', startdate)
data_cube = iris.load_cube(startdate_file.local_file)
if self.distribution is None:
......
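The switch from `dict.iteritems()` to `six.iteritems(...)` keeps the loop working on both Python 2 and 3, since `iteritems` was removed in Python 3. A quick illustration with made-up data:

```python
import six

# Made-up startdate -> file mapping for illustration.
leadtime_files = {'19801101': 'file_a.nc', '19811101': 'file_b.nc'}

# six.iteritems maps to dict.iteritems() on Python 2 and dict.items()
# on Python 3, so the same loop runs unchanged on both.
for startdate, startdate_file in six.iteritems(leadtime_files):
    print(startdate, startdate_file)
```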
......@@ -21,12 +21,6 @@ class DaysOverPercentile(Diagnostic):
:param data_manager: data management object
:type data_manager: DataManager
-        :param startdate: startdate
-        :type startdate: str
-        :param member: member number
-        :type member: int
-        :param chunk: chunk's number
-        :type chunk: int
:param variable: variable to average
:type variable: str
"""
......@@ -94,11 +88,13 @@ class DaysOverPercentile(Diagnostic):
self.days_over_file = {}
self.days_below_file = {}
for perc in ClimatologicalPercentile.Percentiles:
-            self.days_over_file[perc] = self.declare_chunk(self.domain, var_over.format(int(perc * 100)), self.startdate, None,
+            self.days_over_file[perc] = self.declare_chunk(self.domain, var_over.format(int(perc * 100)),
+                                                           self.startdate, None,
None, frequency=Frequencies.monthly,
vartype=VariableType.STATISTIC)
-            self.days_below_file[perc] = self.declare_chunk(self.domain, var_below.format(int(perc * 100)), self.startdate, None,
+            self.days_below_file[perc] = self.declare_chunk(self.domain, var_below.format(int(perc * 100)),
+                                                            self.startdate, None,
None, frequency=Frequencies.monthly,
vartype=VariableType.STATISTIC)
......@@ -119,6 +115,7 @@ class DaysOverPercentile(Diagnostic):
leadtimes = {1: PartialDateTime(lead_date.year, lead_date.month, lead_date.day)}
def assign_leadtime(coord, x):
+            # noinspection PyBroadException
try:
leadtime_month = 1
partial_date = leadtimes[leadtime_month]
......@@ -140,8 +137,8 @@ class DaysOverPercentile(Diagnostic):
realization_coord = var.coord('realization')
except iris.exceptions.CoordinateNotFoundError:
realization_coord = None
-        lat_coord = var.coord('latitude')
-        lon_coord = var.coord('longitude')
+        self.lat_coord = var.coord('latitude')
+        self.lon_coord = var.coord('longitude')
results_over = {perc: iris.cube.CubeList() for perc in ClimatologicalPercentile.Percentiles}
results_below = {perc: iris.cube.CubeList() for perc in ClimatologicalPercentile.Percentiles}
......@@ -154,7 +151,7 @@ class DaysOverPercentile(Diagnostic):
for leadtime in leadtimes.keys():
leadtime_slice = var.extract(iris.Constraint(leadtime=leadtime))
-            if len(percentiles.coords('leadtime')) >0:
+            if len(percentiles.coords('leadtime')) > 0:
percentiles_leadtime = percentiles.extract(iris.Constraint(leadtime=leadtime))
else:
percentiles_leadtime = percentiles
......@@ -166,13 +163,15 @@ class DaysOverPercentile(Diagnostic):
for percentile_slice in percentiles_leadtime.slices_over('percentile'):
percentile = percentile_slice.coord('percentile').points[0]
+                # noinspection PyTypeChecker
                days_over = np.sum(leadtime_slice.data > percentile_slice.data, 0) / float(timesteps)
-                result = self.create_results_cube(days_over, lat_coord, lon_coord, percentile, realization_coord,
+                result = self.create_results_cube(days_over, percentile, realization_coord,
                                                  time_coord, var_daysover, long_name_days_over)
                results_over[percentile].append(result)
+                # noinspection PyTypeChecker
                days_below = np.sum(leadtime_slice.data < percentile_slice.data, 0) / float(timesteps)
-                result = self.create_results_cube(days_below, lat_coord, lon_coord, percentile, realization_coord,
+                result = self.create_results_cube(days_below, percentile, realization_coord,
                                                  time_coord, var_days_below, long_name_days_below)
                results_below[percentile].append(result)
time_coord, var_days_below, long_name_days_below)
results_below[percentile].append(result)
......@@ -190,16 +189,16 @@ class DaysOverPercentile(Diagnostic):
must_exist=False, rename_dimension=True)
self.days_below_file[perc].set_local_file(temp, rename_var='daysbelow')
def create_results_cube(self, days_over, lat_coord, lon_coord, percentile, realization_coord, time_coord,
def create_results_cube(self, days_over, percentile, realization_coord, time_coord,
var_name, long_name):
result = iris.cube.Cube(days_over.astype(np.float32), var_name=var_name, long_name=long_name, units=1.0)
if realization_coord is not None:
result.add_aux_coord(realization_coord, 0)
-            result.add_dim_coord(lat_coord, 1)
-            result.add_dim_coord(lon_coord, 2)
+            result.add_dim_coord(self.lat_coord, 1)
+            result.add_dim_coord(self.lon_coord, 2)
else:
-            result.add_dim_coord(lat_coord, 0)
-            result.add_dim_coord(lon_coord, 1)
+            result.add_dim_coord(self.lat_coord, 0)
+            result.add_dim_coord(self.lon_coord, 1)
result.add_aux_coord(iris.coords.AuxCoord(percentile, long_name='percentile'))
result.add_aux_coord(time_coord)
return result
......
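The core of this diagnostic is a vectorized count: for each grid point, the fraction of timesteps whose value exceeds (or falls below) the percentile threshold. A standalone NumPy sketch of that calculation with synthetic data (shapes and the 90th percentile are arbitrary choices for illustration):

```python
import numpy as np

timesteps, nlat, nlon = 30, 4, 5
data = np.random.rand(timesteps, nlat, nlon)   # synthetic daily values
threshold = np.percentile(data, 90, axis=0)    # per-point 90th percentile

# Fraction of days above/below the threshold at each grid point,
# mirroring np.sum(... > ..., 0) / float(timesteps) in the diagnostic.
days_over = np.sum(data > threshold, 0) / float(timesteps)
days_below = np.sum(data < threshold, 0) / float(timesteps)

assert days_over.shape == (nlat, nlon)
```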
......@@ -68,7 +68,7 @@ class Discretize(Diagnostic):
self.process = psutil.Process()
def print_memory_used(self):
-        Log.user_warning('Memory: {0:.2f} GB'.format(self.process.memory_info().rss / 1024.0**3))
+        Log.debug('Memory: {0:.2f} GB'.format(self.process.memory_info().rss / 1024.0**3))
@property
def bins(self):
......@@ -190,7 +190,8 @@ class Discretize(Diagnostic):
leadtime_cube.add_dim_coord(bins_coord, 0)
leadtime_cube.add_dim_coord(self.data_cube.coord('latitude'), 1)
leadtime_cube.add_dim_coord(self.data_cube.coord('longitude'), 2)
-            leadtime_cube.add_aux_coord(iris.coords.AuxCoord(np.array((leadtime,), np.int8), var_name='leadtime', units='months'))
+            leadtime_cube.add_aux_coord(iris.coords.AuxCoord(np.array((leadtime,), np.int8), var_name='leadtime',
+                                                             units='months'))
cubes.append(leadtime_cube)
temp = TempFile.get()
iris.FUTURE.netcdf_no_unlimited = True
......@@ -218,6 +219,7 @@ class Discretize(Diagnostic):
else:
self.distribution[leadtime] += self._calculate_distribution(realization_cube)
+    # noinspection PyTypeChecker
def _get_value_interval(self):
if self.check_min_value or self.check_max_value:
Log.debug('Calculating max and min values...')
......
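Demoting the memory report from `Log.user_warning` to `Log.debug` keeps it out of normal runs; the figure itself comes from psutil's resident set size. The equivalent measurement in isolation:

```python
import psutil

process = psutil.Process()  # defaults to the current process

# memory_info().rss is the resident set size in bytes; convert to GiB.
memory_gb = process.memory_info().rss / 1024.0 ** 3
print('Memory: {0:.2f} GB'.format(memory_gb))
```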
......@@ -140,6 +140,7 @@ class THREDDSManager(DataManager):
var_folder)
return folder_path
+    # noinspection PyUnusedLocal
def get_year(self, domain, var, startdate, member, year, grid=None, box=None, vartype=VariableType.MEAN):
"""
        Get a file containing all the data for one year for one variable
......@@ -270,6 +271,7 @@ class THREDDSManager(DataManager):
self.requested_files[file_path] = thredds_subset
return thredds_subset
+    # noinspection PyUnusedLocal
def declare_chunk(self, domain, var, startdate, member, chunk, grid=None, region=None, box=None, frequency=None,
vartype=VariableType.MEAN, diagnostic=None):
"""
......@@ -368,6 +370,7 @@ class THREDDSSubset(DataFile):
Log.error('Can not retrieve {0} from server: {1}'.format(self, ex))
self.local_status = LocalStatus.FAILED
+    # noinspection PyUnusedLocal,PyMethodMayBeStatic
def _correct_cube(self, cube, field, filename):
if not cube.coords('time'):
return
......
......@@ -34,6 +34,10 @@ def suppress_stdout():
sys.stdout = old_stdout
+class File(object):
+    pass
class Utils(object):
"""
Container class for miscellaneous utility methods
......@@ -224,6 +228,8 @@ class Utils(object):
"""
Copies a file from source to destiny, creating dirs if necessary
+        :param save_hash: if True, stores hash value in a file
+        :type save_hash: bool
:param source: path to source
:type source: str
:param destiny: path to destiny
......@@ -262,6 +268,8 @@ class Utils(object):
:type source: str
:param destiny: path to destiny
:type destiny: str
+        :param save_hash: if True, stores hash value in a file
+        :type save_hash: bool
"""
Utils.copy_file(source, destiny, save_hash)
os.remove(source)
......@@ -302,6 +310,10 @@ class Utils(object):
Returns the xxHash hash for the given filepath
:param filepath: path to the file to compute hash on
        :type filepath: str
+        :param use_stored: if True, try to read the hash value from file
+        :type use_stored: bool
+        :param save: if True, stores hash value in a file
+        :type save: bool
:return: file's xxHash hash
:rtype: str
"""
......@@ -328,9 +340,9 @@ class Utils(object):
@staticmethod
def _get_hash_filename(filepath):
-        dir = os.path.dirname(filepath)
+        folder = os.path.dirname(filepath)
        filename = os.path.basename(filepath)
-        hash_file = os.path.join(dir, '.{0}.xxhash64.hash'.format(filename))
+        hash_file = os.path.join(folder, '.{0}.xxhash64.hash'.format(filename))
return hash_file
@staticmethod
......@@ -615,7 +627,7 @@ class Utils(object):
# noinspection PyBroadException
try:
os.makedirs(path)
-        except:
+        except Exception:
# Here we can have a race condition. Let's check again for existence and rethrow if still not exists
if not os.path.isdir(path):
raise
......
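The folder-creation pattern above tolerates the race where another process creates the directory between the check and the `os.makedirs` call: the exception is swallowed only if the directory now exists. A sketch of the same idea, with the Python 3 shortcut noted:

```python
import os

def create_folder_tree(path):
    # Python 2-compatible form, as in the code above: attempt the creation
    # and re-raise only if the directory still does not exist afterwards.
    try:
        os.makedirs(path)
    except Exception:
        if not os.path.isdir(path):
            raise

# Python 3 alternative: exist_ok makes makedirs race-safe directly.
# os.makedirs(path, exist_ok=True)
```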
......@@ -3,6 +3,7 @@ import datetime
import operator
from bscearth.utils.log import Log
+# noinspection PyCompatibility
from concurrent.futures import ThreadPoolExecutor
from earthdiagnostics.datafile import StorageStatus, LocalStatus
......
#!/usr/bin/env bash
-#SBATCH -n 1
+#SBATCH -n 4
#SBATCH --time 7-00:00:00
#SBATCH --error=job.%J.err
#SBATCH --output=job.%J.out
......