Commit d00c3cd8 authored by Javier Vegas-Regidor
Browse files

Merge branch 'master' into 'production'

Master

See merge request !43
parents cfce2bb4 b9d52989
*.pyc
.idea/*
doc/build/*
test/report/*
*.err
*.out
*.nc
.coverage
htmlcov
\ No newline at end of file
htmlcov
.pytest_cache
prof
before_script:
- export GIT_SSL_NO_VERIFY=1
- git submodule sync --recursive
- git submodule update --init --recursive
- export PATH="$HOME/miniconda2/bin:$PATH"
stages:
- prepare
- test
- report
- clean
cache:
paths:
- test/report
prepare:
stage: prepare
script:
- conda update conda
test_python2:
stage: test
script:
- conda env update -f environment.yml -n earthdiagnostics2 python=2
- source activate earthdiagnostics
- coverage run -m unittest discover
- git submodule sync --recursive
- git submodule update --init --recursive
- conda env update -f environment.yml -n earthdiagnostics2 python=2.7
- source activate earthdiagnostics2
- python run_test.py
test_python3:
stage: test
script:
- git submodule sync --recursive
- git submodule update --init --recursive
- conda env update -f environment.yml -n earthdiagnostics3 python=3.6
- source activate earthdiagnostics3
- python run_test.py
report_codacy:
stage: report
script:
- source activate earthdiagnostics3
- pip install codacy-coverage --upgrade
- python-codacy-coverage -r test/report/python3/coverage.xml
clean:
stage: clean
script:
- conda env update -f environment.yml -n earthdiagnostics3 python=3
- source activate earthdiagnostics
- coverage run -m unittest discover
- coverage xml
- python-codacy-coverage -r coverage.xml
- conda clean --all --yes
......@@ -18,7 +18,7 @@ def main():
"""
Entry point for the Earth Diagnostics
"""
if not EarthDiags.parse_args():
if not EarthDiags.parse_args(sys.argv[1:]):
os._exit(1)
os._exit(0)
......
[DIAGNOSTICS]
# Data adaptor type: CMOR (for our experiments), THREDDS (for other experiments), OBSRECON (for observations
# and reconstructions)
DATA_ADAPTOR = CMOR
DATA_ADAPTOR = OBSRECON
# Path to the folder where you want to create the temporary files
SCRATCH_DIR = /scratch/Earth/$USER
# Root path for the cmorized data to use
DATA_DIR = /esnas:/esarchive
DATA_DIR = /esarchive
# Specify whether your data comes from an experiment (exp), observations (obs) or reconstructions (recon)
DATA_TYPE = exp
DATA_TYPE = recon
# CMORization type to use. Important also for THREDDS as it affects variable name conventions.
# Options: SPECS (default), PRIMAVERA, CMIP6
DATA_CONVENTION = PRIMAVERA
DATA_CONVENTION = SPECS
# Path to NEMO's mask and grid files needed for CDFTools
CON_FILES = /esnas/autosubmit/con_files/
# Diagnostics to run, space separated. For each one you must provide either its name and parameters (comma
# separated) or an alias defined in the ALIAS section (see below). If you are using the diagnostics only to
# CMORize, leave it empty
DIAGS = regmean,ocean,thetao
DIAGS = interpcdo,atmos,prlr,r240x121,bilinear,False,,False
# DIAGS = OHC
# Frequency of the data you want to use by default. Some diagnostics do not use this value: e.g. monmean always stores
# its results at monthly frequency (obvious) and has a parameter to specify the input's frequency.
FREQUENCY = mon
FREQUENCY = weekly
# Path to CDFTOOLS binaries
CDFTOOLS_PATH = ~jvegas/CDFTOOLS/bin
# If true, copies the mesh files regardless of presence in scratch dir
......@@ -73,11 +73,11 @@ SERVER_URL = https://earth.bsc.es/thredds
[EXPERIMENT]
# Experiments parameters as defined in CMOR standard
INSTITUTE = EC-Earth-Consortium
MODEL = EC-Earth3-HR
NAME = historical
INSTITUTE = gloh2o
MODEL = mswep
NAME = wekkly_means
# Model version: Available versions
MODEL_VERSION =Ec3.2_O25L75
MODEL_VERSION =
# Atmospheric output timestep in hours
ATMOS_TIMESTEP = 3
# Ocean output timestep in hours
......@@ -91,9 +91,10 @@ OCEAN_TIMESTEP = 3
# if 2, fc00
# CHUNK_SIZE is the size of each data file, given in months
# CHUNKS is the number of chunks. You can specify fewer chunks than are present in the experiment
EXPID = a0n8
STARTDATES = 19900101
MEMBERS = fc0
EXPID = mswep
# STARTDATES = {19970101,20161231,1D}
STARTDATES = 19970322 20010425
MEMBERS = 0
MEMBER_DIGITS = 1
CHUNK_SIZE = 1
CHUNKS = 1
......
......@@ -4,6 +4,8 @@ import glob
import os
import shutil
import uuid
import traceback
import pygrib
from datetime import datetime
from bscearth.utils.date import parse_date, chunk_end_date, previous_day, date2str, add_months
......@@ -63,6 +65,11 @@ class Cmorizer(object):
'time_counter_bounds': 'time_bnds',
'tbnds': 'bnds', 'nav_lat': self.lat_name, 'nav_lon': self.lon_name, 'x': 'i', 'y': 'j'}
@property
def path_icm(self):
"""Path to the ICM file"""
return os.path.join(self.config.scratch_dir, 'ICM')
def cmorize_ocean(self):
"""Cmorize ocean files from MMO files"""
if not self.cmor.ocean:
......@@ -119,7 +126,7 @@ class Cmorizer(object):
def _cmorize_nc_files(self):
nc_files = glob.glob(os.path.join(self.cmor_scratch, '*.nc'))
for filename in nc_files:
for filename in self._filter_files(nc_files):
self._cmorize_nc_file(filename)
self._clean_cmor_scratch()
......@@ -156,11 +163,12 @@ class Cmorizer(object):
Log.debug('Moving file {0}', filepath)
shutil.move(filepath, filepath.replace('/backup/', '/'))
zip_files = glob.glob(os.path.join(self.cmor_scratch, '*.gz'))
for zip_file in self._filter_files(zip_files):
try:
Utils.unzip(zip_file)
except Utils.UnzipException as ex:
Log.error('File {0} could not be unzipped: {1}', tarfile, ex)
if zip_files:
for zip_file in self._filter_files(zip_files):
try:
Utils.unzip(zip_file)
except Utils.UnzipException as ex:
Log.error('File {0} could not be unzipped: {1}', tarfile, ex)
def _clean_cmor_scratch(self):
if os.path.exists(self.cmor_scratch):
......@@ -181,7 +189,7 @@ class Cmorizer(object):
Utils.cdo.mergetime(input=gg_files, output=merged_gg)
for filename in sh_files + gg_files:
os.remove(filename)
tar_startdate = tarfile[0:-4].split('_')[5].split('-')
tar_startdate = os.path.basename(tarfile[0:-4]).split('_')[4].split('-')
shutil.move(merged_gg, os.path.join(self.cmor_scratch, 'MMAGG_1m_{0[0]}_{0[1]}.nc'.format(tar_startdate)))
shutil.move(merged_sh, os.path.join(self.cmor_scratch, 'MMASH_1m_{0[0]}_{0[1]}.nc'.format(tar_startdate)))
......@@ -216,7 +224,7 @@ class Cmorizer(object):
self._cmorize_nc_files()
Log.result('Atmospheric file {0}/{1} finished'.format(count, len(tar_files)))
except Exception as ex:
Log.error('Could not cmorize atmospheric file {0}: {1}', count, ex)
Log.error('Could not cmorize atmospheric file {0}: {1}\n {2}', count, ex, traceback.format_exc())
count += 1
......@@ -235,7 +243,13 @@ class Cmorizer(object):
for grid in ('SH', 'GG'):
Log.info('Processing {0} variables', grid)
if not os.path.exists(self._get_original_grib_path(chunk_start, grid)):
first_grib = self._get_original_grib_path(chunk_start, grid)
if not os.path.exists(first_grib):
continue
var_list = Utils.cdo.showvar(input=first_grib)[0]
codes = {int(var.replace('var', '')) for var in var_list.split()}
if not codes.intersection(self.config.cmor.get_requested_codes()):
Log.info('No requested variables found in {0}. Skipping...', grid)
continue
self._cmorize_grib_file(chunk_end, chunk_start, grid)
except Exception as ex:
......@@ -256,7 +270,7 @@ class Cmorizer(object):
self._obtain_atmos_timestep(gribfile)
full_file = self._get_monthly_grib(current_date, gribfile, grid)
if not self._unpack_grib(full_file, gribfile, grid):
if not self._unpack_grib(full_file, gribfile, grid, current_date.month):
os.remove(gribfile)
return
......@@ -278,29 +292,29 @@ class Cmorizer(object):
self._merge_and_cmorize_atmos(chunk_start, chunk_end, grid,
'{0}hr'.format(self.atmos_timestep))
def _unpack_grib(self, full_file, gribfile, grid):
def _unpack_grib(self, full_file, gribfile, grid, month):
Log.info('Unpacking... ')
# remap on regular Gauss grid
codes = self.cmor.get_requested_codes()
if 228 in codes:
codes.update(142, 143)
codes.update((142, 143))
codes_str = ','.join([str(code) for code in codes])
try:
if grid == 'SH':
Utils.cdo.splitparam(input='-sp2gpl -selcode,{0} {1} '.format(codes_str, full_file),
Utils.cdo.splitparam(input='-sp2gpl -selmon,{2} -selcode,{0} {1} '.format(codes_str, full_file, month),
output=gribfile + '_',
options='-f nc4')
else:
Utils.cdo.splitparam(input='-selcode,{0} {1}'.format(codes_str, full_file),
Utils.cdo.splitparam(input='-selmon,{2} -selcode,{0} {1}'.format(codes_str, full_file, month),
output=gribfile + '_',
options='-R -f nc4')
# total precipitation (remove negative values)
if 228 in codes:
Utils.cdo.setcode(228,
input='-setmisstoc,0 -setvrange,0,Inf '
'-add {0}_{{142,143}}.128.nc'.format(gribfile),
'-add {0}_142.128.nc {0}_143.128.nc'.format(gribfile),
output='{0}_228.128.nc'.format(gribfile),
options='-f nc4')
return True
......@@ -308,13 +322,14 @@ class Cmorizer(object):
Log.info('No requested codes found in {0} file'.format(grid))
return False
finally:
Utils.remove_file('ICM')
Utils.remove_file(self.path_icm)
def _get_monthly_grib(self, current_date, gribfile, grid):
prev_gribfile = self._get_scratch_grib_path(add_months(current_date, -1, self.experiment.calendar), grid)
if os.path.exists(prev_gribfile):
self._merge_grib_files(current_date, prev_gribfile, gribfile)
full_file = 'ICM'
full_file = self.path_icm
else:
full_file = gribfile
return full_file
......@@ -335,15 +350,10 @@ class Cmorizer(object):
def _get_atmos_timestep(self, gribfile):
Log.info('Getting timestep...')
import pygrib
grib_handler = pygrib.open(gribfile)
dates = set()
try:
while True:
mes = grib_handler.next()
dates.add(mes.analDate)
except StopIteration:
pass
for mes in grib_handler:
dates.add(mes.validDate)
dates = list(dates)
dates.sort()
atmos_timestep = dates[1] - dates[0]
......@@ -359,37 +369,37 @@ class Cmorizer(object):
os.remove(filename)
return
Utils.convert2netcdf4(filename)
# Utils.convert2netcdf4(filename)
frequency = self._get_nc_file_frequency(filename)
Utils.rename_variables(filename, self.alt_coord_names, False, True)
handler = Utils.open_cdf(filename)
Cmorizer._remove_valid_limits(handler)
self._add_common_attributes(handler, frequency)
self._update_time_variables(handler)
handler.sync()
variables = handler.variables.keys()
handler.close()
Log.info('Splitting file {0}', filename)
for variable in handler.variables.keys():
for variable in variables:
if variable in Cmorizer.NON_DATA_VARIABLES:
continue
try:
self.extract_variable(filename, handler, frequency, variable)
self.extract_variable(filename, frequency, variable)
except Exception as ex:
Log.error('Variable {0} can not be cmorized: {1}', variable, ex)
Log.result('File {0} cmorized!', filename)
handler.close()
os.remove(filename)
@staticmethod
def _remove_valid_limits(filename):
handler = Utils.open_cdf(filename)
def _remove_valid_limits(handler):
for variable in handler.variables.keys():
var = handler.variables[variable]
if 'valid_min' in var.ncattrs():
del var.valid_min
if 'valid_max' in var.ncattrs():
del var.valid_max
handler.close()
handler.sync()
def _get_nc_file_frequency(self, filename):
file_parts = os.path.basename(filename).split('_')
......@@ -409,14 +419,13 @@ class Cmorizer(object):
variables = Utils.get_file_variables(filename)
return self.cmor.any_required(variables)
def extract_variable(self, file_path, handler, frequency, variable):
def extract_variable(self, file_path, frequency, variable):
"""
Extract a variable from a file and creates the CMOR file
Parameters
----------
file_path:str
handler: netCDF4.Dataset
frequency: Frequency
variable: str
......@@ -437,7 +446,7 @@ class Cmorizer(object):
Utils.nco.ncks(input=file_path, output=temp, options=('-v {0}'.format(variable),))
self._rename_level_variables(temp, var_cmor)
self._add_coordinate_variables(handler, temp)
self._add_coordinate_variables(file_path, temp)
if alias.basin is None:
region = None
......@@ -504,13 +513,15 @@ class Cmorizer(object):
raise Exception('File {0} start date is not a valid chunk start date'.format(file_path))
return chunk
def _add_coordinate_variables(self, handler, temp):
def _add_coordinate_variables(self, file_path, temp):
handler_cmor = Utils.open_cdf(temp)
handler = Utils.open_cdf(file_path, 'r')
Utils.copy_variable(handler, handler_cmor, self.lon_name, False)
Utils.copy_variable(handler, handler_cmor, self.lat_name, False)
if 'time' in handler_cmor.dimensions.keys():
Utils.copy_variable(handler, handler_cmor, 'leadtime', False)
handler_cmor.close()
handler.close()
@staticmethod
def _rename_level_variables(temp, var_cmor):
......@@ -523,19 +534,18 @@ class Cmorizer(object):
if var_cmor.domain == ModelingRealms.atmos:
Utils.rename_variables(temp, {'depth': 'plev'}, False, True)
@staticmethod
def _merge_grib_files(current_month, prev_gribfile, gribfile):
def _merge_grib_files(self, current_month, prev_gribfile, gribfile):
Log.info('Merging data from different files...')
fd = open('rules_files', 'w')
rules_path = os.path.join(self.config.scratch_dir, 'rules_files')
fd = open(rules_path, 'w')
fd.write('if (dataDate >= {0.year}{0.month:02}01) {{ write ; }}\n'.format(current_month))
fd.close()
# get first timestep for each month from previous file (if possible)
if os.path.exists('ICM'):
os.remove('ICM')
Utils.execute_shell_command('grib_filter -o ICM rules_files '
'{0} {1}'.format(os.path.basename(prev_gribfile),
os.path.basename(gribfile)))
os.remove('rules_files')
if os.path.exists(self.path_icm):
os.remove(self.path_icm)
Utils.execute_shell_command('grib_filter -o {2} {3} '
'{0} {1}'.format(prev_gribfile, gribfile, self.path_icm, rules_path))
os.remove(rules_path)
Utils.remove_file(prev_gribfile)
def _ungrib_vars(self, gribfile, month, frequency):
......@@ -548,21 +558,19 @@ class Cmorizer(object):
continue
new_units = None
cdo_operator = '-selmon,{0}'.format(month)
cdo_operator = ''
cdo_operator = self._get_time_average(cdo_operator, frequency, var_code)
cdo_operator = self._fix_time_shift(cdo_operator, var_code)
cdo_operator, new_units = self._change_units(cdo_operator, new_units, var_code)
levels = self.config.cmor.get_levels(frequency, var_code)
if levels:
cdo_operator = "{0} -sellevel,{1}".format(cdo_operator, levels)
Utils.execute_shell_command('cdo -t ecmwf setreftime,{0} '
'{1} {2}_{3}.128.nc '
'{2}_{3}_{4}.nc'.format(cdo_reftime, cdo_operator,
gribfile, var_code, frequency))
Utils.cdo.setreftime(cdo_reftime,
input='{2} {0}_{1}.128.nc '.format(gribfile, var_code, cdo_operator),
output='{0}_{1}_{2}.nc'.format(gribfile, var_code, frequency),
options='-t ecmwf')
h_var_file = '{0}_{1}_{2}.nc'.format(gribfile, var_code, frequency)
handler = Utils.open_cdf(h_var_file)
......@@ -594,10 +602,10 @@ class Cmorizer(object):
if var_code == 201:
cdo_operator = "-monmean -daymax {0}".format(cdo_operator)
elif var_code == 202:
cdo_operator = "-monmean -daymax {0}".format(cdo_operator)
cdo_operator = "-monmean -daymin {0}".format(cdo_operator)
else:
cdo_operator = "-monmean {0} ".format(cdo_operator)
if frequency == Frequencies.daily:
elif frequency == Frequencies.daily:
if var_code == 201:
cdo_operator = "-daymax {0} ".format(cdo_operator)
elif var_code == 202:
......@@ -622,7 +630,7 @@ class Cmorizer(object):
elif var_code in (144, 182, 205, 228):
# precipitation/evaporation/runoff
new_units = "kg m-2 s-1"
cdo_operator = "-mulc,1000 -divc,{0}".format(self.experiment.atmos_timestep * 3600)
cdo_operator = "-mulc,1000 -divc,{0} {1}".format(self.experiment.atmos_timestep * 3600, cdo_operator)
return cdo_operator, new_units
def _merge_and_cmorize_atmos(self, chunk_start, chunk_end, grid, frequency):
......@@ -665,8 +673,8 @@ class Cmorizer(object):
startdate = parse_date(self.startdate)
leadtime = [datetime(time.year, time.month, time.day, time.hour, time.minute, time.second) - startdate
for time in leadtime]
for lt in range(0, len(leadtime)):
var[lt] = leadtime[lt].days
for lt, lead in enumerate(leadtime):
var[lt] = lead.days
def _add_common_attributes(self, handler, frequency):
cmor = self.config.cmor
......
......@@ -496,8 +496,25 @@ class CMORManager(DataManager):
return
for startdate, member in self.experiment.get_member_list():
if not self._unpack_cmor_files(startdate, member):
self._cmorize_member(startdate, member)
Log.info('Checking data for startdate {0} member {1}', startdate, member)
if not self.config.cmor.force:
cmorized = False
for chunk in range(1, self.experiment.num_chunks + 1):
if not self.config.cmor.chunk_cmorization_requested(chunk):
Log.debug('Skipping chunk {0}', chunk)
continue
if not self.config.cmor.force_untar:
Log.debug('Checking chunk {0}...', chunk)
for domain in (ModelingRealms.atmos, ModelingRealms.ocean, ModelingRealms.seaIce):
if self.is_cmorized(startdate, member, chunk, domain):
Log.debug('Chunk {0} ready', chunk)
continue
if self._unpack_chunk(startdate, member, chunk):
cmorized = True
if cmorized:
Log.info('Startdate {0} member {1} ready', startdate, member)
return
self._cmorize_member(startdate, member)
def is_cmorized(self, startdate, member, chunk, domain):
"""
......@@ -564,34 +581,14 @@ class CMORManager(DataManager):
def _cmorize_member(self, startdate, member):
start_time = datetime.now()
member_str = self.experiment.get_member_str(member)
Log.info('CMORizing startdate {0} member {1}. Starting at {0}', startdate, member_str, start_time)
Log.info('CMORizing startdate {0} member {1}. Starting at {2}', startdate, member_str, start_time)
cmorizer = Cmorizer(self, startdate, member)
cmorizer.cmorize_ocean()
cmorizer.cmorize_atmos()
Log.result('CMORized startdate {0} member {1}! Elapsed time: {2}\n\n', startdate, member_str,
datetime.now() - start_time)
def _unpack_cmor_files(self, startdate, member):
if self.config.cmor.force:
return False
cmorized = False
for chunk in range(1, self.experiment.num_chunks + 1):
if not self.config.cmor.force_untar:
if self.is_cmorized(startdate, member, chunk, ModelingRealms.atmos) or \
self.is_cmorized(startdate, member, chunk, ModelingRealms.ocean):
cmorized = True
continue
if self._unpack_chunk(startdate, member, chunk):
cmorized = True
if cmorized:
Log.info('Startdate {0} member {1} ready', startdate, member)
return cmorized
def _unpack_chunk(self, startdate, member, chunk):
if not self.config.cmor.chunk_cmorization_requested(chunk):
return True
filepaths = self._get_transferred_cmor_data_filepaths(startdate, member, chunk, 'tar.gz')
if len(filepaths) > 0:
Log.info('Unzipping cmorized data for {0} {1} {2}...', startdate, member, chunk)
......
......@@ -6,6 +6,7 @@ import six
from bscearth.utils.config_parser import ConfigParser
from bscearth.utils.date import parse_date, chunk_start_date, chunk_end_date, date2str, add_years, add_months, add_days
from bscearth.utils.log import Log
import bscearth.utils.path
from earthdiagnostics import cdftools
from earthdiagnostics.frequency import Frequency, Frequencies
......@@ -109,9 +110,13 @@ class Config(object):
----------
path: str
"""
config_file_path = bscearth.utils.path.expand_path(path)
if not os.path.isfile(config_file_path):
Log.critical('Configuration file {0} can not be found', config_file_path)
raise ValueError('Configuration file {0} can not be found'.format(config_file_path))
parser = ConfigParser()
parser.optionxform = str
parser.read(path)
parser.read(config_file_path)
# Read diags config
self.data_adaptor = parser.get_choice_option('DIAGNOSTICS', 'DATA_ADAPTOR', ('CMOR', 'THREDDS', 'OBSRECON'),
......
......@@ -26,6 +26,12 @@ class Basin(object):
def __str__(self):
return self._name
def __repr__(self):
return str(self)
def __hash__(self):
return hash(str(self))
@property
def name(self):
"""
......
......@@ -6,6 +6,7 @@ import shutil
from datetime import datetime
import iris
import iris.coords
import numpy as np
from bscearth.utils.log import Log
......@@ -238,8 +239,8 @@ class DataFile(Publisher):
self.lon_name = 'longitude'
self.lat_name = 'latitude'
else:
self.lon_name = 'longitude'
self.lat_name = 'latitude'
self.lon_name = 'lon'
self.lat_name = 'lat'