"""Module to manage the data conventions supported by EarthDiagnostics"""
import shutil
import re
import threading
from bscearth.utils.date import (
parse_date,
chunk_start_date,
chunk_end_date,
previous_day,
add_hours,
)
from bscearth.utils.log import Log
from earthdiagnostics.frequency import Frequency, Frequencies
from earthdiagnostics.modelingrealm import ModelingRealms
from earthdiagnostics.variable import VariableType
def __init__(self, name, config):
    """
    Store the convention settings and initialise its bookkeeping state

    Parameters
    ----------
    name: str
    config: Config
    """
    self.name = name
    self.config = config
    self.lat_name, self.lon_name = "lat", "lon"
    self.time_separator = "-"
    # Link folders already inspected by create_link when move_old is set
    self._checked_vars = []
    # Held by create_link while it modifies the links on disk
    self.lock = threading.Lock()
"""
Get the final scratch_masks path
Parameters
----------
scratch_masks: str
Returns
-------
str
"""
def get_file_path(
    self,
    startdate,
    member,
    domain,
    var,
    cmor_var,
    chunk,
    frequency,
    grid=None,
    year=None,
    date_str=None,
):
    """
    Build the full path of a concrete file

    The path is the convention's CMOR folder joined with the convention's
    file name for the same configuration.

    Parameters
    ----------
    startdate: str
    member: int
    domain: ModelingRealm
    var: str
    cmor_var: Variable
    chunk: int or None
    frequency: Frequency or str
        When None, the frequency from the configuration is used
    grid: str or None
    year: int or None
    date_str: str or None

    Returns
    -------
    str

    Raises
    ------
    ValueError
        If you provide two or more parameters from chunk, year or
        date_str or none at all

    """
    # Fall back to the configured frequency, then normalise it
    requested = self.config.frequency if frequency is None else frequency
    frequency = Frequency.parse(requested)

    directory = self.get_cmor_folder_path(
        startdate, member, domain, var, frequency, grid, cmor_var
    )
    name = self.get_file_name(
        startdate,
        member,
        domain,
        var,
        cmor_var,
        frequency,
        chunk,
        year,
        date_str,
        grid,
    )
    return os.path.join(directory, name)
def get_file_name(
    self,
    startdate,
    member,
    domain,
    var,
    cmor_var,
    frequency,
    chunk,
    year,
    date_str,
    grid,
):
    """
    Get filename for a given configuration

    Hook to be overridden by each concrete convention; this base
    implementation only signals that no naming scheme is available.

    Parameters
    ----------
    startdate: str
    member: int
    domain: ModelingRealm
    var: str
    cmor_var: Variable
    frequency: Frequency
    chunk: int or None
    year: int or None
    date_str: str or None
    grid: str or None

    Returns
    -------
    str

    Raises
    ------
    NotImplementedError:
        If not implemented by derived classes

    """
    # Fail loudly instead of returning None, as the docstring promises
    raise NotImplementedError
def get_cmor_folder_path(
    self, startdate, member, domain, var, frequency, grid, cmor_var
):
    """
    Get the folder path following current CMOR convention

    This base implementation provides no folder layout; it always raises.

    Parameters
    ----------
    startdate: str
    member: int
    domain: ModelingRealm
    var: str
    frequency: Frequency
    grid: str
    cmor_var: Variable

    Returns
    -------
    str

    Raises
    ------
    NotImplementedError:
        If not implemented by derived classes

    """
    raise NotImplementedError()
def get_startdate_path(self, startdate):
    """
    Return the path to the startdate's CMOR folder

    The path is built as
    data_dir/expid/cmorfiles/activity/institute/model/experiment_name.

    Parameters
    ----------
    startdate: str

    Returns
    -------
    str

    """
    return os.path.join(
        self.config.data_dir,
        self.config.experiment.expid,
        "cmorfiles",
        self.config.cmor.activity,
        self.config.experiment.institute,
        self.config.experiment.model,
        self.experiment_name(startdate),
    )
def experiment_name(self, startdate):
    """
    Get experiment name, appending startdate if needed

    The startdate is appended after an "S" only when the CMOR
    configuration enables append_startdate.

    Parameters
    ----------
    startdate: str

    Returns
    -------
    str

    """
    if not self.config.cmor.append_startdate:
        return self.config.experiment.experiment_name
    return "{}S{}".format(self.config.experiment.experiment_name, startdate)
def get_member_str(self, member):
    """
    Translate member number to member string

    Hook to be overridden by each concrete convention.

    Parameters
    ----------
    member: int

    Returns
    -------
    str

    Raises
    ------
    NotImplementedError:
        If not implemented by derived classes

    """
    # Fail loudly instead of returning None, as the docstring promises
    raise NotImplementedError
def create_links(self, startdate, member=None):
    """
    Create links for a given startdate or member

    Parameters
    ----------
    startdate: str
    member: int or None
        When None, no member filter is passed on to the linking step

    """
    member_str = None if member is None else self.get_member_str(member)
    Log.info("Creating links for CMOR files ({0})", startdate)
    self._link_startdate(self.get_startdate_path(startdate), member_str)
def create_link(
self, domain, filepath, frequency, var, grid, move_old, vartype
):
"""
Create file link

Links filepath into a per-variable folder located under
data_dir/expid/<frequency folder>. The whole operation runs while
holding self.lock.

Parameters
----------
domain: ModelingRealm
filepath: str
frequency: Frequency
var: str
grid: str
    An empty value is treated as "original"
move_old: bool
vartype: VariableType

Raises
------
ValueError
    With the message "Original file ... does not exists"
"""
freq_str = frequency.folder_name(vartype)
if not grid:
grid = "original"
# Folder name without grid information
variable_folder = domain.get_varfolder(
var,
self.config.experiment.ocean_timestep,
self.config.experiment.atmos_timestep,
)
# Folder name for the requested grid
vargrid_folder = domain.get_varfolder(
var,
self.config.experiment.ocean_timestep,
self.config.experiment.atmos_timestep,
grid=grid,
)
self.lock.acquire()
try:
expid = self.config.experiment.expid
if grid == "original":
link_path = os.path.join(
self.config.data_dir, expid, freq_str, variable_folder
)
Utils.create_folder_tree(link_path)
else:
link_path = os.path.join(
self.config.data_dir, expid, freq_str, vargrid_folder
)
default_path = os.path.join(
self.config.data_dir, expid, freq_str, variable_folder
)
# Same folder name with the grid suffix swapped for "-original_f"
original_path = os.path.join(
self.config.data_dir,
expid,
freq_str,
vargrid_folder.replace(
"-{0}_f".format(grid), "-original_f"
),
)
# Replace an existing link, or move a real folder out of the way, so the
# grid-less folder name can become a symlink to the grid folder
if os.path.islink(default_path):
os.remove(default_path)
elif os.path.isdir(default_path):
shutil.move(default_path, original_path)
os.symlink(link_path, default_path)
# Each link folder is inspected at most once per instance
if move_old and link_path not in self._checked_vars:
self._checked_vars.append(link_path)
old_path = os.path.join(
self.config.data_dir,
expid,
freq_str,
"old_{0}".format(os.path.basename(link_path)),
)
# Matches names made of var, an underscore, 6 to 8 digits and ".nc"
regex = re.compile(var + "_[0-9]{6,8}[.]nc")
for filename in os.listdir(link_path):
if regex.match(filename):
Utils.create_folder_tree(old_path)
# NOTE(review): only the creation of old_path is visible here; the step
# that relocates the matching file appears to be missing - TODO confirm
link_path = os.path.join(link_path, os.path.basename(filepath))
if os.path.lexists(link_path):
try:
os.remove(link_path)
except OSError:
pass
# NOTE(review): the condition guarding this raise is not visible; as
# written it is unconditional - TODO confirm against the upstream file
raise ValueError(
"Original file {0} does not exists".format(filepath)
)
# Link relative to the link's own folder
relative_path = os.path.relpath(
filepath, os.path.dirname(link_path)
)
try:
os.symlink(relative_path, link_path)
except OSError:
# NOTE(review): the body of this OSError handler is missing - TODO confirm
except Exception:
raise
finally:
self.lock.release()
def _get_time_component(self, chunk, date_str, frequency, startdate, year):
"""
Build the time part of a file name from chunk, year or date_str

Raises
------
ValueError
    If more than one of chunk, date_str and year is not None, or if
    year is given with a frequency other than Frequencies.yearly
"""
# Count how many of the three alternatives were actually supplied
if len([x for x in (chunk, date_str, year) if x is not None]) > 1:
raise ValueError(
"Only one of the parameters chunk, year or date_str may be "
"provided"
)
# NOTE(review): the condition that selects the chunk-based bounds is not
# visible; the `elif` below has no matching `if` - TODO confirm
time_bound = self._get_chunk_time_bounds(
startdate, chunk, frequency
)
elif year:
if frequency != Frequencies.yearly:
raise ValueError(
'Year may be provided instead of chunk only if '
'frequency is "yr"'
)
# NOTE(review): the remaining branches and the return statement are not
# visible here - TODO confirm
def _get_chunk_time_bounds(self, startdate, chunk, frequency):
    """
    Build the time bounds string of a chunk

    The result is the chunk's start year and month, the time separator
    and the chunk's end year and month (YYYYMM<sep>YYYYMM).

    Parameters
    ----------
    startdate: str
    chunk: int
    frequency: Frequency
        Not used by this implementation

    Returns
    -------
    str

    """
    # startdate arrives as a string; the chunk helpers take a parsed date
    start = parse_date(startdate)
    chunk_start = chunk_start_date(
        start,
        chunk,
        self.config.experiment.chunk_size,
        "month",
        self.config.experiment.calendar,
    )
    chunk_end = chunk_end_date(
        chunk_start,
        self.config.experiment.chunk_size,
        "month",
        self.config.experiment.calendar,
    )
    # Step back one day before reading the end year and month
    chunk_end = previous_day(chunk_end, self.config.experiment.calendar)
    return "{0:04}{1:02}{4}{2:04}{3:02}".format(
        chunk_start.year,
        chunk_start.month,
        chunk_end.year,
        chunk_end.month,
        self.time_separator,
    )
def _check_var_presence(
self, folder, current_count, startdate, member, domain, chunk, freq
):
"""
Count the entries of folder whose expected file exists

Each entry name in folder is treated as a variable name. The expected
file path is checked both as computed and with '/original_files/'
replaced by '/'.

Returns
-------
int
    current_count plus the number of files found
"""
# NOTE(review): indentation is lost in this view, so the exact nesting of
# the inner loop, the early break and the return cannot be confirmed
for var in os.listdir(folder):
cmor_var = self.config.var_manager.get_variable(var, True)
var_path = self.get_file_path(
startdate, member, domain, var, cmor_var, chunk, frequency=freq
)
for path in (var_path, var_path.replace('/original_files/', '/')):
if os.path.isfile(path):
current_count += 1
# Stop early once the configured minimum has been reached
if current_count >= self.config.cmor.min_cmorized_vars:
break
return current_count
def is_cmorized(self, startdate, member, chunk, domain):
    """
    Check if a given chunk is cmorized for a given domain

    Hook to be overridden by each concrete convention.

    Parameters
    ----------
    startdate: str
    member: str
    chunk: int
    domain: ModelingRealm

    Returns
    -------
    bool

    Raises
    ------
    NotImplementedError:
        If not implemented by the derived classes

    """
    # Fail loudly instead of returning None, as the docstring promises
    raise NotImplementedError
class Cmor2Convention(DataConvention):
def get_file_name(
    self,
    startdate,
    member,
    domain,
    var,
    cmor_var,
    frequency,
    chunk,
    year,
    date_str,
    grid,
):
    """
    Get filename for a given configuration

    The name is built as
    var_table_model_experiment_Sstartdate_member_timebounds.nc

    Parameters
    ----------
    startdate: str
    member: int
    domain: ModelingRealm
    var: str
    cmor_var: Variable
        When None, the table is taken from the domain instead
    frequency: Frequency
    chunk: int or None
    year: int or None
    date_str: str or None
    grid: str or None
        Not used by this implementation

    Returns
    -------
    str

    """
    # Only ask the variable for its table when a variable is available
    if cmor_var is None:
        cmor_table = domain.get_table(
            frequency, self.config.data_convention
        )
    else:
        cmor_table = cmor_var.get_table(
            frequency, self.config.data_convention
        )

    time_bound = self._get_time_component(
        chunk, date_str, frequency, startdate, year
    )
    time_bound = "_{0}.nc".format(time_bound)

    return "{0}_{1}_{2}_{3}_S{4}_{5}{6}".format(
        var,
        cmor_table.name,
        self.config.experiment.model,
        self.experiment_name(startdate),
        startdate,
        self.get_member_str(member),
        time_bound,
    )
def get_cmor_folder_path(
    self, startdate, member, domain, var, frequency, grid, cmor_var
):
    """
    Get the folder path following this convention

    Layout: startdate path / frequency / domain name / var, then the
    grid (unless empty or "original"), the member string and, when
    configured, the CMOR version.

    Parameters
    ----------
    startdate: str
    member: int
    domain: ModelingRealm
    var: str
    frequency: Frequency
    grid: str
    cmor_var: Variable
        Not used by this implementation

    Returns
    -------
    str

    """
    parts = [
        self.get_startdate_path(startdate),
        str(frequency),
        domain.name,
        var,
    ]
    if grid and grid != "original":
        parts.append(grid)
    parts.append(self.get_member_str(member))
    if self.config.cmor.version:
        parts.append(self.config.cmor.version)
    return os.path.join(*parts)
def get_member_str(self, member):
    """
    Translate member number to member string

    The string has the form r<N>i<M>p1, where N is the member number
    shifted so that member_count_start maps to 1 and M is the configured
    initialization number.

    Parameters
    ----------
    member: int

    Returns
    -------
    str

    """
    realization = member + 1 - self.config.experiment.member_count_start
    initialization = self.config.cmor.initialization_number
    return "r{0}i{1}p1".format(realization, initialization)
def _link_startdate(self, path, member_str):
"""
Create links for every file found under a startdate folder

Walks path as frequency / domain / variable / member / name and calls
create_link for the files found. Members other than member_str are
skipped when member_str is not None.
"""
# NOTE(review): `freq` is never bound in the visible code; the loop that
# iterates over the frequency folders appears to be missing - TODO confirm
Log.debug("Creating links for frequency {0}", freq)
frequency = Frequency.parse(freq)
for domain in os.listdir(os.path.join(path, freq)):
Log.debug("Creating links for domain {0}", domain)
for var in os.listdir(os.path.join(path, freq, domain)):
for member in os.listdir(
os.path.join(path, freq, domain, var)
):
if member_str is not None and member_str != member:
continue
for name in os.listdir(
os.path.join(path, freq, domain, var, member)
):
filepath = os.path.join(
path, freq, domain, var, member, name
)
# NOTE(review): the condition choosing between this call and the `else`
# branch below is not visible - TODO confirm
self.create_link(
ModelingRealms.parse(domain),
filepath,
frequency,
var,
"",
False,
vartype=VariableType.MEAN,
)
else:
# Here filepath is listed as a folder and each entry is linked
for filename in os.listdir(filepath):
self.create_link(
ModelingRealms.parse(domain),
os.path.join(filepath, filename),
frequency,
var,
"",
False,
vartype=VariableType.MEAN,
)
def is_cmorized(self, startdate, member, chunk, domain):
    """
    Check if a given chunk is cmorized for a given domain

    Every frequency folder of the startdate is scanned for the domain,
    accumulating found variables until the configured minimum is reached.

    Parameters
    ----------
    startdate: str
    member: str
    chunk: int
    domain: ModelingRealm

    Returns
    -------
    bool

    """
    root = self.get_startdate_path(startdate)
    if not os.path.isdir(root):
        return False

    found = 0
    for freq in os.listdir(root):
        domain_path = os.path.join(root, freq, domain.name)
        if not os.path.isdir(domain_path):
            continue
        found = self._check_var_presence(
            domain_path, found, startdate, member, domain, chunk, freq
        )
        if found >= self.config.cmor.min_cmorized_vars:
            return True
    return False
def get_startdate_path(self, startdate):
    """
    Return the path to the startdate's CMOR folder

    The path is built as
    data_dir/expid/cmorfiles/institute/model/experiment_name/S<startdate>.

    Parameters
    ----------
    startdate: str

    Returns
    -------
    str

    """
    return os.path.join(
        self.config.data_dir,
        self.config.experiment.expid,
        "cmorfiles",
        self.config.experiment.institute,
        self.config.experiment.model,
        self.experiment_name(startdate),
        "S" + startdate,
    )
class PrefaceConvention(Cmor2Convention):
    """
    Convention derived from Cmor2Convention with its own startdate folder

    Parameters
    ----------
    name: str
    config: Config
    """

    def __init__(self, name, config):
        super(PrefaceConvention, self).__init__(name, config)

    def get_startdate_path(self, startdate):
        """
        Return the path to the startdate's CMOR folder

        The path is built as
        data_dir/expid/cmorfiles/institute/experiment_name/S<startdate>.

        Parameters
        ----------
        startdate: str

        Returns
        -------
        str

        """
        return os.path.join(
            self.config.data_dir,
            self.config.experiment.expid,
            "cmorfiles",
            self.config.experiment.institute,
            self.experiment_name(startdate),
            "S" + startdate,
        )
class Cmor3Convention(DataConvention):
"""
Base class for CMOR3-based conventions
Parameters
----------
name: str
config: Config
"""
def __init__(self, name, config):
    """
    Initialise the convention and override the coordinate names

    Parameters
    ----------
    name: str
    config: Config
    """
    super(Cmor3Convention, self).__init__(name, config)
    # Replace the coordinate names set by the parent initialiser
    self.lat_name, self.lon_name = "latitude", "longitude"
def get_scratch_masks(self, scratch_masks):
"""
Get the final scratch_masks path

Adds a folder matching the convention name to the configured path

Parameters
----------
scratch_masks: str

Returns
-------
str
"""
# NOTE(review): no statements follow the docstring in this view, so as
# written the method returns None - TODO confirm the missing body
def get_file_name(
self,
startdate,
member,
domain,
var,
cmor_var,
frequency,
chunk,
year,
date_str,
grid,
):
"""
Get filename for a given configuration

The name is assembled from var, the table name, the model, the
experiment name, a sub-experiment prefix, the member string, the grid
and the time bounds.

Parameters
----------
startdate: str
member: int
domain: ModelingRealm
var: str
cmor_var: Variable
frequency: Frequency
chunk: int or None
year: int or None
date_str: str or None
grid: str or None

Returns
-------
str
"""
# NOTE(review): both assignments run unconditionally in this view; the
# choice between them presumably depends on cmor_var - TODO confirm
cmor_table = domain.get_table(
frequency, self.config.data_convention
)
cmor_table = cmor_var.get_table(
frequency, self.config.data_convention
)
time_bound = self._get_time_component(
chunk, date_str, frequency, startdate, year
)
time_bound = "_{0}.nc".format(time_bound)
# Ocean-like realms use the default ocean grid, the rest the atmos grid
# NOTE(review): as written the grid argument is always overwritten; a
# guard on grid may be missing - TODO confirm
if domain in [
ModelingRealms.ocnBgchem,
ModelingRealms.seaIce,
ModelingRealms.ocean,
]:
grid = self.config.cmor.default_ocean_grid
else:
grid = self.config.cmor.default_atmos_grid
if self.config.cmor.append_startdate:
if self.config.cmor.append_startdate_year_only:
# Keep only the first four characters of the startdate
startdate = startdate[0:4]
# NOTE(review): only the empty sub-experiment prefix is visible; the
# branch that builds it from startdate appears to be missing
subexp_id = ""
file_name = (
f"{var}_{cmor_table.name}_{self.config.experiment.model}"
f"_{self.experiment_name(startdate)}_{subexp_id}"
f"{self.get_member_str(member)}_{grid}{time_bound}"
)
# NOTE(review): no return statement is visible, so file_name is never
# handed back to the caller - TODO confirm
def _get_chunk_time_bounds(self, startdate, chunk, frequency):
"""
Build the time bounds string of a chunk for the given frequency

The precision of the bounds follows the frequency: years for yearly,
year and month for monthly, down to hours and minutes for frequencies
whose name ends in "hr".
"""
start = parse_date(startdate)
chunk_start = chunk_start_date(
start,
chunk,
self.config.experiment.chunk_size,
"month",
self.config.experiment.calendar,
)
chunk_end = chunk_end_date(
chunk_start,
self.config.experiment.chunk_size,
"month",
self.config.experiment.calendar,
)
if frequency == Frequencies.yearly:
chunk_end = previous_day(
chunk_end, self.config.experiment.calendar
)
# YYYY<sep>YYYY
time_bound = "{0:04}{2}{1:04}".format(
chunk_start.year,
chunk_end.year,
self.time_separator,
)
elif frequency == Frequencies.monthly:
chunk_end = previous_day(
chunk_end, self.config.experiment.calendar
)
# YYYYMM<sep>YYYYMM
time_bound = "{0:04}{1:02}{4}{2:04}{3:02}".format(
chunk_start.year,
chunk_start.month,
chunk_end.year,
chunk_end.month,
self.time_separator,
)
# NOTE(review): the branch header for the day-precision bounds below is
# not visible; as written they follow the monthly ones - TODO confirm
chunk_end = previous_day(
chunk_end, self.config.experiment.calendar
)
# YYYYMMDD<sep>YYYYMMDD
time_bound = (
"{0.year:04}{0.month:02}{0.day:02}{2}"
"{1.year:04}{1.month:02}{1.day:02}".format(
chunk_start, chunk_end, self.time_separator
)
)
elif frequency.frequency.endswith("hr"):
# Move the end back by the number of hours that prefixes "hr"
chunk_end = add_hours(
chunk_end,
-int(frequency.frequency[:-2]),
self.config.experiment.calendar,
)
# YYYYMMDDhhmm<sep>YYYYMMDDhhmm
time_bound = (
"{0.year:04}{0.month:02}{0.day:02}{0.hour:02}{0.minute:02}{2}"
"{1.year:04}{1.month:02}{1.day:02}{1.hour:02}"
"{1.minute:02}".format(
chunk_start, chunk_end, self.time_separator
)
)
# NOTE(review): no return statement is visible, so time_bound is never
# handed back to the caller - TODO confirm
def get_cmor_folder_path(
self, startdate, member, domain, var, frequency, grid, cmor_var
):
"""
Get the folder path following this convention

Layout: startdate path / member string / table name / var / grid /
CMOR version. A configured version of "latest" is resolved to the
entry that sorts last in the folder containing the versions.

Raises
------
ValueError
    With the message "CMOR version is mandatory for PRIMAVERA and CMIP6"
"""
# NOTE(review): the condition guarding this raise is not visible; the
# message suggests it tests the configured CMOR version - TODO confirm
raise ValueError(
"CMOR version is mandatory for PRIMAVERA and CMIP6"
)
# Ocean-like realms use the default ocean grid, the rest the atmos grid
if domain in [
ModelingRealms.ocnBgchem,
ModelingRealms.seaIce,
ModelingRealms.ocean,
]:
grid = self.config.cmor.default_ocean_grid
else:
grid = self.config.cmor.default_atmos_grid
if cmor_var is None:
table_name = domain.get_table(
frequency, self.config.data_convention
).name
# NOTE(review): the `else:` separating the two lookups appears to be
# missing; as written the second one always runs - TODO confirm
table_name = cmor_var.get_table(
frequency, self.config.data_convention
).name
folder_path = os.path.join(
self.get_startdate_path(startdate),
self.get_member_str(member),
table_name,
var,
grid,
self.config.cmor.version,
)
if self.config.cmor.version == "latest":
versions = os.listdir(os.path.dirname(folder_path))
# Reverse sort so the entry that sorts last comes first
versions.sort(reverse=True)
# The resolved version is written back into the configuration
self.config.cmor.version = versions[0]
folder_path = folder_path.replace('/latest/', f'/{versions[0]}/')
# NOTE(review): no return statement is visible, so folder_path is never
# handed back to the caller - TODO confirm
def _link_startdate(self, path, member_str):
"""
Create links for every file found under a startdate folder

Walks path as member / table / variable / grid / name and calls
create_link for the files found. Members other than member_str are
skipped when member_str is not None.
"""
for member in os.listdir(path):
for table in os.listdir(os.path.join(path, member)):
# The frequency comes from the table definition
frequency = self.config.var_manager.tables[table].frequency
for var in os.listdir(os.path.join(path, member, table)):
cmor_var = self.config.var_manager.get_variable(
var, silent=True
)
# NOTE(review): both domain assignments run unconditionally in this
# view; the table's domain is presumably a fallback - TODO confirm
domain = cmor_var.domain
domain = self.config.var_manager.tables[table].domain
for grid in os.listdir(
os.path.join(path, member, table, var)
):
if member_str is not None and member_str != member:
continue
for name in os.listdir(
os.path.join(path, member, table, var, grid)
):
filepath = os.path.join(
path, member, table, var, grid, name
)
# NOTE(review): the condition choosing between this call and the `else`
# branch below is not visible - TODO confirm
self.create_link(
domain,
filepath,
frequency,
var,
"",
False,
vartype=VariableType.MEAN,
)
else:
# Here filepath is listed as a folder and each entry is linked
for filename in os.listdir(filepath):
cmorfile = os.path.join(filepath, filename)
self.create_link(
domain,
cmorfile,
frequency,
var,
"",
False,
vartype=VariableType.MEAN,
)
def experiment_name(self, startdate):
    """
    Get experiment name.

    CMOR3 ignores append startdates as they are added to the member str

    Parameters
    ----------
    startdate: str
        Not used by this implementation

    Returns
    -------
    str

    """
    experiment = self.config.experiment
    return experiment.experiment_name
def get_member_str(self, member):
    """
    Translate member number to member string

    The string has the form r<N>i<M>p1f1, where N is the member number
    shifted so that member_count_start maps to 1 and M is the configured
    initialization number.

    Parameters
    ----------
    member: int

    Returns
    -------
    str

    """
    template = "r{0}i{1}p1f1"
    return template.format(
        member + 1 - self.config.experiment.member_count_start,
        self.config.cmor.initialization_number,
    )
def is_cmorized(self, startdate, member, chunk, domain):
    """
    Check if a given chunk is cmorized for a given domain

    Only the domain's monthly table folder of the member is inspected.
    If the startdate folder does not exist, the same path with
    '/original_files/' replaced by '/' is tried.

    Parameters
    ----------
    startdate: str
    member: str
    chunk: int
    domain: ModelingRealm

    Returns
    -------
    bool

    """
    root = self.get_startdate_path(startdate)
    if not os.path.isdir(root):
        # Retry with the '/original_files/' path component removed
        root = root.replace('/original_files/', '/')
        if not os.path.isdir(root):
            return False

    member_path = os.path.join(root, self.get_member_str(member))
    if not os.path.isdir(member_path):
        return False

    freq = Frequencies.monthly
    table = domain.get_table(freq, self.config.data_convention)
    table_dir = os.path.join(member_path, table.name)
    if not os.path.isdir(table_dir):
        return False

    found = self._check_var_presence(
        table_dir, 0, startdate, member, domain, chunk, freq
    )
    return found >= self.config.cmor.min_cmorized_vars
class CMIP6Convention(Cmor3Convention):
class PrimaveraConvention(Cmor3Convention):
class MeteoFranceConvention(DataConvention):
def get_file_name(
    self,
    startdate,
    member,
    domain,
    var,
    cmor_var,
    frequency,
    chunk,
    year,
    date_str,
    grid,
):
    """
    Get filename for a given configuration

    The name is built as var_frequency_timebounds_member.nc and can only
    be derived from a chunk.

    Parameters
    ----------
    startdate: str
    member: int
    domain: ModelingRealm
    var: str
    cmor_var: Variable
    frequency: Frequency
    chunk: int or None
    year: int or None
    date_str: str or None
    grid: str or None

    Returns
    -------
    str

    Raises
    ------
    ValueError
        If year or date_str is provided, or if chunk is missing

    """
    # Each rejection is guarded so valid chunk-based requests get through
    if year:
        raise ValueError("Year not supported with MeteoFrance convention")
    if date_str:
        raise ValueError(
            "Date_str not supported with MeteoFrance convention"
        )
    if chunk is None:
        raise ValueError(
            "Chunk must be provided in MeteoFrance convention"
        )

    time_bound = self._get_chunk_time_bounds(startdate, chunk, frequency)
    return "{0}_{1}_{2}_{3}.nc".format(
        var, frequency, time_bound, self.get_member_str(member)
    )
def get_cmor_folder_path(
    self, startdate, member, domain, var, frequency, grid, cmor_var
):
    """
    Get the folder path following this convention

    Layout: data_dir / experiment name / H<letter> / year, where the
    letter encodes characters 5-6 of startdate (1 maps to "A") and the
    year is its first four characters. Only startdate is used.

    Parameters
    ----------
    startdate: str
    member: int
    domain: ModelingRealm
    var: str
    frequency: Frequency
    grid: str
    cmor_var: Variable

    Returns
    -------
    str

    """
    base_dir = self.config.data_dir
    experiment = self.experiment_name(startdate)
    # chr(64 + n) yields "A" for 1, "B" for 2, and so on
    letter_folder = "H{0}".format(chr(64 + int(startdate[4:6])))
    year_folder = startdate[0:4]
    return os.path.join(base_dir, experiment, letter_folder, year_folder)
def get_member_str(self, member):
"""
Translate member number to member string

Parameters
----------
member: int

Returns
-------
str
"""
# NOTE(review): no statements follow the docstring in this view, so as
# written the method returns None - TODO confirm the missing body
def _get_chunk_time_bounds(self, startdate, chunk, frequency):
    """
    Build the time bounds string of a chunk

    Only the start of the chunk is used: the result is its year and
    month formatted as YYYYMM.

    Parameters
    ----------
    startdate: str
    chunk: int
    frequency: Frequency
        Not used by this implementation

    Returns
    -------
    str

    """
    # startdate arrives as a string; the chunk helper takes a parsed date
    start = parse_date(startdate)
    chunk_start = chunk_start_date(
        start,
        chunk,
        self.config.experiment.chunk_size,
        "month",
        self.config.experiment.calendar,
    )
    time_bound = "{0:04}{1:02}".format(chunk_start.year, chunk_start.month)
    return time_bound
def create_link(
    self, domain, filepath, frequency, var, grid, move_old, vartype
):
    """
    Create file link

    In this convention, it does nothing

    Parameters
    ----------
    domain: ModelingRealm
    filepath: str
    frequency: Frequency
    var: str
    grid: str
    move_old: bool
    vartype: VariableType

    """
    return None
def create_links(self, startdate, member=None):
    """
    Create links for a given startdate or member

    In this convention, it does nothing

    Parameters
    ----------
    startdate: str
    member: int or None

    """
    return None
def is_cmorized(self, startdate, member, chunk, domain):
"""
Check if a given chunk is cmorized for a given domain
Parameters
----------
startdate: str
member: str
chunk: int
domain: ModelingRealm
Returns
-------
bool
Raises
------
NotImplementedError:
If not implemented by the derived classes
"""