Newer
Older
Javier Vegas-Regidor
committed
#!/usr/bin/env python
Javier Vegas-Regidor
committed
import threading
import netCDF4
Javier Vegas-Regidor
committed
from earthdiagnostics.ocean import Salinity, Circulation, Heat, General, Siasiesiv
from autosubmit.date.chunk_date_lib import *
from parser import Parser
from autosubmit.config.log import Log
import shutil
import os
class Diags:
    """
    :param config_file: path to the configuration file
    :type config_file: str
    """
def __init__(self, config_file):
    """
    Read the configuration and wire up the helpers used by the diagnostics.

    :param config_file: path to the configuration file
    :type config_file: str
    """
    self._read_config(config_file)

    # Configure the data manager completely before publishing it on the instance
    manager = DataManager(self.institute, self.model, self.expid, self.data_dir,
                          self.frequency, self.chunk_size, self.experiment_name)
    manager.add_startdate = self.add_startdate
    manager.add_name = self.add_name
    self.datamanager = manager

    # Settings shared through the TempFile and cdftools helpers
    TempFile.scratch_folder = self.scratch_dir
    cdftools.path = self.cdftools_path
def _create_dic_variables(self):
self.dic_variables = dict()
self.dic_variables['x'] = 'i'
self.dic_variables['y'] = 'j'
self.dic_variables['z'] = 'lev'
self.dic_variables['nav_lon'] = 'lon'
self.dic_variables['nav_lat'] = 'lat'
self.dic_variables['nav_lev'] = 'lev'
self.dic_variables['time_counter'] = 'time'
self.dic_variables['t'] = 'time'
# Body that drives a complete diagnostics run: it logs the netCDF library version and the
# start time, moves into the scratch directory, prepares the CMOR files, runs every
# requested diagnostic and finally logs the finish time and the elapsed time.
# NOTE(review): the `def` line owning these statements is not visible in this view and the
# indentation has been lost. The "Javier Vegas-Regidor" / "committed" lines are not Python;
# they look like residue from a repository blame page and should be removed.
Log.debug('Using netCDF version {0}', netCDF4.getlibversion())
# Start timestamp; reused at the end to compute the elapsed time
time = datetime.datetime.now()
Log.info("Starting diagnostics at {0}", time)
# Work from inside the scratch directory (created first if it does not exist)
if not os.path.exists(self.scratch_dir):
os.makedirs(self.scratch_dir)
os.chdir(self.scratch_dir)
self.datamanager.prepare_CMOR_files(self.startdates, self.members)
# Run diagnostics
Log.info('Running diagnostics')
Javier Vegas-Regidor
committed
# Jobs collected here are executed by the worker threads started further down
list_jobs = list()
# NOTE(review): `fulldiag` is not assigned in any visible line; the loop header that
# provides it (and that the `continue` below belongs to) appears to be missing here.
Log.info("Running {0}", fulldiag)
Javier Vegas-Regidor
committed
# A diagnostic is a comma-separated string: its name first, then its options
diag_options = fulldiag.split(',')
# In the visible code, siasiesiv is the only diagnostic that adds job objects to list_jobs
if diag_options[0] == 'siasiesiv':
list_jobs += Siasiesiv.generate_jobs(self.datamanager, self.startdates, self.members, self.chunks,
diag_options)
continue
# Any other diagnostic is executed directly for every startdate, member and chunk;
# chunks are numbered from 1 to self.chunks inclusive
for startdate in self.startdates:
for member in self.members:
for chunk in range(1, self.chunks+1):
Javier Vegas-Regidor
committed
self._execute_diagnostic(diag_options, startdate, member, chunk)
# Never more threads than max_cores, nor than the count Utils.available_cpu_count() reports
numthreads = min(Utils.available_cpu_count(), self.max_cores)
Javier Vegas-Regidor
committed
threads = list()
# Thread k takes jobs k, k + numthreads, k + 2 * numthreads, ...
# `args` is a one-element list, so _run_jobs receives that slice as its single argument
for numthread in range(0, numthreads):
t = threading.Thread(target=Diags._run_jobs,
args=([list_jobs[numthread:len(list_jobs): numthreads]]))
threads.append(t)
t.start()
# Wait for every worker before cleaning up and reporting
for t in threads:
t.join()
Javier Vegas-Regidor
committed
# presumably removes the temporary files created during the run — confirm in TempFile
TempFile.clean()
finsih_time = datetime.datetime.now()
Log.result("Diagnostics finished at {0}", finsih_time)
Log.result("Time ellapsed: {0}", finsih_time - time)
Javier Vegas-Regidor
committed
@staticmethod
def _run_jobs(jobs):
for job in jobs:
job.compute()
def _execute_diagnostic(self, diag_options, startdate, member, chunk):
# Run one diagnostic for a single startdate, member and chunk.
#
# diag_options: list of strings; item 0 selects the diagnostic and the remaining items
#               are its options (the positions are described on each branch below)
# startdate, member, chunk: forwarded unchanged to self.datamanager.get_files
#
# Most branches ask the data manager for the files of a tuple of variable names and
# unpack each returned item into one file per name. An unknown diagnostic name only
# logs a warning.
# NOTE(review): indentation has been lost in this view and some statements are missing or
# cut off (flagged below). The "Javier Vegas-Regidor" / "committed" lines are not Python;
# they look like residue from a repository blame page.
diag = diag_options[0]
# vertmeanmeters,<variable>,<depth_min>,<depth_max>: output variable is <variable>mean<min>m-<max>m
if diag == 'vertmeanmeters':
variable = diag_options[1]
depth_min = int(diag_options[2])
depth_max = int(diag_options[3])
depth = '{0}m-{1}m'.format(depth_min, depth_max)
variables = (variable, '{0}mean{1}'.format(variable, depth))
for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
General.vertical_mean_meters(input_file, output_file, variable, depth_min, depth_max)
# vertmean,<variable>,<lev_min>,<lev_max>: output variable is <variable>mean<min>-<max>
elif diag == 'vertmean':
variable = diag_options[1]
lev_min = int(diag_options[2])
lev_max = int(diag_options[3])
lev = '{0}-{1}'.format(lev_min, lev_max)
variables = (variable, '{0}mean{1}'.format(variable, lev))
for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
General.vertical_mean(input_file, output_file, variable, lev_min, lev_max)
# convection: mlotst -> mlotstsites
elif diag == 'convection':
variables = ('mlotst', 'mlotstsites')
for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
Circulation.convection_sites(input_file, self.nemo_version, output_file)
# psi: uo and vo -> vsftbarot
elif diag == 'psi':
Javier Vegas-Regidor
committed
variables = ('uo', 'vo', 'vsftbarot')
for [u_file, v_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean',
variables):
Circulation.psi(u_file, v_file, output_file)
# gyres: vsftbarot -> vsftbarotgyres; the file names are passed through str() here
elif diag == 'gyres':
Javier Vegas-Regidor
committed
variables = ('vsftbarot', 'vsftbarotgyres')
for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
Circulation.gyres(str(input_file), self.nemo_version, str(output_file))
# ohc,<basin>,<mixed_layer>,<depth_min>,<depth_max>
elif diag == 'ohc':
basin = diag_options[1]
mixed_layer = int(diag_options[2])
depth_min = int(diag_options[3])
depth_max = int(diag_options[4])
# The output suffix is either 'mlotst' (mixed_layer == 1) or the depth range (mixed_layer == 0)
# NOTE(review): mxl and depth are only assigned for mixed_layer 0 or 1; any other value
# appears to leave them undefined at the format() call below.
if mixed_layer == 1:
mxl = 'mlotst'
depth = ''
elif mixed_layer == 0:
mxl = ''
depth = '{0}-{1}'.format(depth_min, depth_max)
Javier Vegas-Regidor
committed
# NOTE(review): only the last tuple element goes through .format(); 'ohcsum{0}{1}' is used
# literally, placeholders included — this looks unintended, confirm.
variables = ('thetao', 'mlotst', 'ohcsum{0}{1}', 'ohcvmean{0}{1}'.format(mxl, depth))
for [input_file, mlotst_file, ohcsum_file, ohcvmean_file] in self.datamanager.get_files(startdate, member,
chunk, 'ocean',
variables):
Heat.total(input_file, mlotst_file, ohcsum_file, ohcvmean_file, basin, mixed_layer, depth_min, depth_max)
# ohclayer,<depth_min>,<depth_max>: thetao -> ohc<min>-<max>
elif diag == 'ohclayer':
depth_min = int(diag_options[1])
depth_max = int(diag_options[2])
depth = '{0}-{1}'.format(depth_min, depth_max)
variables = ('thetao', 'ohc{0}'.format(depth))
for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
Heat.layer(input_file, output_file, depth_min, depth_max)
Javier Vegas-Regidor
committed
# moc (alias vsftmyz): vo -> vsftmyz
elif diag in ['moc', 'vsftmyz']:
variables = ('vo', 'vsftmyz')
for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
Circulation.moc(input_file, output_file)
Javier Vegas-Regidor
committed
# mocmax (alias vsftmyzmax),<lat_min>,<lat_max>,<depth_min>,<depth_max>
elif diag in ['mocmax', 'vsftmyzmax']:
Javier Vegas-Regidor
committed
# NOTE(review): as shown, this warning is logged on every call; the argument-count check
# that presumably guards it is not visible in this view.
Log.warning('vsftmyzmax requires 4 arguments. Skipping!')
lat_min = int(diag_options[1])
lat_max = int(diag_options[2])
lat = '{0}-{1}'.format(lat_min, lat_max)
depth_min = int(diag_options[3])
depth_max = int(diag_options[4])
depth = '{0}-{1}'.format(depth_min, depth_max)
Javier Vegas-Regidor
committed
variables = ('vsftmyz', 'vsftmyzmax')
for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
# The output name gets the latitude and depth ranges appended to 'vsftmyzmax'
# NOTE(review): this call is cut off in this view — its remaining arguments and the
# closing parenthesis are missing.
Circulation.max_moc(input_file, lat_min, lat_max,
Javier Vegas-Regidor
committed
output_file.replace('vsftmyzmax', 'vsftmyzmax{0}-{1}'.format(lat, depth)),
Javier Vegas-Regidor
committed
# mocarea (alias vsftmyzarea),<lat_min>,<lat_max>,<depth_min>,<depth_max>,<basin>
elif diag in ['mocarea', 'vsftmyzarea']:
Javier Vegas-Regidor
committed
# NOTE(review): logged unconditionally as shown; the guarding check is not visible.
Log.warning('vsftmyzarea requires between 5 arguments. Skipping!')
lat_min = int(diag_options[1])
lat_max = int(diag_options[2])
lat = '{0}{1}'.format(Utils.get_cardinal_coordinate(lat_min, True),
Utils.get_cardinal_coordinate(lat_max, True))
depth_min = int(diag_options[3])
depth_max = int(diag_options[4])
depth = '{0}-{1}'.format(depth_min, depth_max)
basin = diag_options[5]
Javier Vegas-Regidor
committed
variables = ('vsftmyz', 'vsftmyz{0}{1}{2}'.format(lat, depth, basin))
for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
Circulation.area_moc(input_file, lat_min, lat_max, output_file, depth_min, depth_max, basin)
# mlotsthc: thetao and mlotst -> ohcvertsummlotst
elif diag == 'mlotsthc':
Javier Vegas-Regidor
committed
variables = ('thetao', 'mlotst', 'ohcvertsummlotst')
for [input_file, mlotst_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean',
variables):
Heat.mixed_layer_content(input_file, mlotst_file, output_file)
# mlotstsc: so and mlotst -> scvertsummlotst
elif diag == 'mlotstsc':
Javier Vegas-Regidor
committed
variables = ('so', 'mlotst', 'scvertsummlotst')
for [input_file, mlotst_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean',
variables):
Salinity.mixed_layer_content(input_file, mlotst_file, output_file)
# interp3d,<variable>[,<domain>]: the third item, when present, is the domain and
# 'seaice' is rewritten as 'seaIce'; 'ocean' appears to be the default when it is omitted
# (the `else` pairing cannot be confirmed without indentation).
# NOTE(review): `variable` is not assigned in this branch in the visible lines.
elif diag == 'interp3d':
if len(diag_options) == 3:
domain = diag_options[2]
if domain == 'seaice':
domain = 'seaIce'
else:
domain = 'ocean'
# Same request twice; the second one carries the extra 'regular' argument and provides the outputs
input_files = self.datamanager.get_files(startdate, member, chunk, domain, [variable])
output_files = self.datamanager.get_files(startdate, member, chunk, domain, [variable], 'regular')
Javier Vegas-Regidor
committed
# Input and output files are paired by position
for x in range(0, len(input_files[0])):
General.interpolate(input_files[0][x], output_files[0][x], variable, self.nemo_version)
# cutsection,<variable>,<z for zonal, anything else otherwise>,<value>
elif diag == 'cutsection':
variable = diag_options[1]
zonal = diag_options[2] == 'z'
value = int(diag_options[3])
coordinate = Utils.get_cardinal_coordinate(value, zonal)
variables = (variable, '{0}{1}'.format(variable, coordinate))
for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables,
'regular'):
General.cut_section(input_file, output_file, variable, zonal, value)
# avgsection,<variable>,<lon_min>,<lon_max>,<lat_min>,<lat_max>: the output name is the
# variable followed by the four bounds as formatted by Utils.get_cardinal_coordinate
elif diag == 'avgsection':
variable = diag_options[1]
lon_min = int(diag_options[2])
lon_max = int(diag_options[3])
lat_min = int(diag_options[4])
lat_max = int(diag_options[5])
output_name = '{0}{1}{2}{3}{4}'.format(variable, Utils.get_cardinal_coordinate(lon_min, False),
Utils.get_cardinal_coordinate(lon_max, False),
Utils.get_cardinal_coordinate(lat_min, True),
Utils.get_cardinal_coordinate(lat_max, True))
variables = (variable, output_name)
for [input_file, output_file] in self.datamanager.get_files(startdate, member, chunk, 'ocean', variables):
General.avgsection(input_file, output_file, lon_min, lon_max, lat_min, lat_max)
# Unknown diagnostic name: warn and do nothing
else:
Log.warning('Diagnostic {0} not available', diag)
return
def _get_commands(self):
    """
    Build the list of diagnostics to run, expanding the configured aliases.

    :return: whitespace-separated entries of ``self.diags`` with every alias found
             replaced by the commands it stands for
    :rtype: list
    """
    Log.debug('Preparing command list')
    requested = self.diags.split()
    for alias, expansion in self._aliases.items():
        if alias not in requested:
            continue
        Log.debug('Changing alias {0} for {1}', alias, ' '.join(expansion))
        # Only the first occurrence of the alias is dropped; its commands go to the end
        requested.remove(alias)
        requested.extend(expansion)
    Log.debug('Command list ready ')
    return requested
def _prepare_mesh_files(self):
# Bring the mesh and mask files for self.nemo_version from self.con_files into the
# current directory, under the fixed names given as second argument of each call.
# NOTE(review): indentation has been lost in this view and the last call is cut off.
Log.info('Copying mesh files')
# The mesh mask is copied once as mesh_hgr.nc; mesh_zgr.nc and mask.nc are links to that copy
self._copy_file(os.path.join(self.con_files, 'mesh_mask_nemo.{0}.nc'.format(self.nemo_version)), 'mesh_hgr.nc')
self._link_file('mesh_hgr.nc', 'mesh_zgr.nc')
self._link_file('mesh_hgr.nc', 'mask.nc')
self._copy_file(os.path.join(self.con_files, 'new_maskglo.{0}.nc'.format(self.nemo_version)), 'new_maskglo.nc')
self._copy_file(os.path.join(self.con_files, 'mask.regions.{0}.nc'.format(self.nemo_version)),
'mask_regions.nc')
# NOTE(review): the destination argument and closing parenthesis of this call are missing
self._copy_file(os.path.join(self.con_files, 'mask.regions.3d.{0}.nc'.format(self.nemo_version)),
def _copy_file(self, source, destiny):
    """
    Copy a file into place unless a copy of the same size is already there.

    Nothing is copied when the source is missing (a user warning is logged instead) or
    when the destination already exists with the same size as the source.

    :param source: path of the file to copy
    :type source: str
    :param destiny: path where the copy is created
    :type destiny: str
    """
    if not os.path.exists(source):
        Log.user_warning('File {0} is not available for {1}', destiny, self.nemo_version)
        # Stop here: the size check and the copy below would raise on the missing source
        return

    # Equal size is taken as "already copied"; the contents are not compared
    if os.path.exists(destiny) and os.stat(source).st_size == os.stat(destiny).st_size:
        Log.info('File {0} already exists', destiny)
        return

    Log.info('Creating file {0}', destiny)
    shutil.copy(source, destiny)
    Log.info('File {0} ready', destiny)
    # NOTE(review): the rename always targets 'mesh_hgr.nc', whatever `destiny` is — confirm
    Utils.rename_variables('mesh_hgr.nc', self.dic_variables, False, True)
def _link_file(self, source, destiny):
    """
    Create a symbolic link to a file unless an equivalent target already exists.

    Nothing is linked when the source is missing (a user warning is logged instead) or
    when the destination already exists with the same size as the source. A destination
    of a different size is removed and replaced by the link.

    :param source: path the link points to
    :type source: str
    :param destiny: path of the link to create
    :type destiny: str
    """
    if not os.path.exists(source):
        Log.user_warning('File {0} is not available for {1}', destiny, self.nemo_version)
        # Stop here: the size check below would raise on the missing source, and
        # otherwise a dangling link would be created
        return

    if os.path.exists(destiny):
        # Equal size is taken as "already in place"; the contents are not compared
        if os.stat(source).st_size == os.stat(destiny).st_size:
            Log.info('File {0} already exists', destiny)
            return
        # Different size: remove the old target so the link can be created
        os.remove(destiny)

    os.symlink(source, destiny)
    Log.info('File {0} ready', destiny)
def _read_config(self, config_file):
    """
    Load every setting used by the diagnostics from the configuration file.

    The values are stored as attributes of the instance. Afterwards the scratch directory
    for this experiment is created if needed and made the current working directory.

    :param config_file: path to the configuration file
    :type config_file: str
    """
    self.parser = Parser()
    # presumably keeps option names case-sensitive, as in configparser — confirm in Parser
    self.parser.optionxform = str
    self.parser.read(config_file)

    text_option = self.parser.get_option
    int_option = self.parser.get_int_option
    bool_option = self.parser.get_bool_option

    # Read diags config
    self.scratch_dir = text_option('DIAGNOSTICS', 'SCRATCH_DIR')
    self.data_dir = text_option('DIAGNOSTICS', 'DATA_DIR')
    self.con_files = text_option('DIAGNOSTICS', 'CON_FILES')
    self.diags = text_option('DIAGNOSTICS', 'DIAGS').lower()
    self.frequency = text_option('DIAGNOSTICS', 'FREQUENCY')
    self.cdftools_path = text_option('DIAGNOSTICS', 'CDFTOOLS_PATH')
    self.max_cores = int_option('DIAGNOSTICS', 'MAX_CORES', 100000)

    # Read experiment config
    self.institute = text_option('EXPERIMENT', 'INSTITUTE')
    self.expid = text_option('EXPERIMENT', 'EXPID')
    self.experiment_name = text_option('EXPERIMENT', 'NAME')
    # Members are given as whitespace-separated integers
    self.members = [int(member) for member in text_option('EXPERIMENT', 'MEMBERS').split()]
    self.startdates = text_option('EXPERIMENT', 'STARTDATES').split()
    self.chunk_size = int_option('EXPERIMENT', 'CHUNK_SIZE')
    self.chunks = int_option('EXPERIMENT', 'CHUNKS')
    self.model = text_option('EXPERIMENT', 'MODEL')
    self.nemo_version = text_option('EXPERIMENT', 'NEMO_VERSION')

    # Read CMOR config
    self.add_name = bool_option('CMOR', 'ADD_NAME')
    self.add_startdate = bool_option('CMOR', 'ADD_STARTDATE')

    # Read aliases: lower-cased alias name -> list of lower-cased commands
    self._aliases = dict()
    if self.parser.has_section('ALIAS'):
        self._aliases = {name.lower(): text_option('ALIAS', name).lower().split()
                         for name in self.parser.options('ALIAS')}

    # Every experiment works in its own folder below the configured scratch directory
    experiment_scratch = os.path.join(self.scratch_dir, 'diags', self.expid)
    self.scratch_dir = experiment_scratch
    if not os.path.exists(experiment_scratch):
        os.makedirs(experiment_scratch)
    os.chdir(experiment_scratch)
# Command-line entry: parse the options, set the log levels and build the launcher.
# Both log-level options accept the same set of names.
log_levels = ('EVERYTHING', 'DEBUG', 'INFO', 'RESULT', 'USER_WARNING',
              'WARNING', 'ERROR', 'CRITICAL', 'NO_LOG')

parser = argparse.ArgumentParser(description='Main executable for Earth Diagnostics.')
parser.add_argument('-v', '--version', action='version', version='0.1',
                    help="returns Earth Diagnostics's version number and exit")
parser.add_argument('-lf', '--logfile', choices=log_levels, default='DEBUG', type=str,
                    help="sets file's log level.")
parser.add_argument('-lc', '--logconsole', choices=log_levels, default='INFO', type=str,
                    help="sets console's log level")
parser.add_argument('-f', '--configfile', default='diags.conf', type=str)
args = parser.parse_args()

# Console level first, then file level
Log.set_console_level(args.logconsole)
Log.set_file_level(args.logfile)

diags = Diags(args.configfile)
if __name__ == "__main__":