From 99647fb2b1d5a16b76ae471ff49de2b1acd56a87 Mon Sep 17 00:00:00 2001 From: sloosvel Date: Wed, 8 Jan 2020 13:06:25 +0100 Subject: [PATCH 1/2] Do not require i, j vars to be present --- earthdiagnostics/ocean/heatcontentlayer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/earthdiagnostics/ocean/heatcontentlayer.py b/earthdiagnostics/ocean/heatcontentlayer.py index b5093e83..1f64bcfc 100644 --- a/earthdiagnostics/ocean/heatcontentlayer.py +++ b/earthdiagnostics/ocean/heatcontentlayer.py @@ -197,8 +197,8 @@ class HeatContentLayer(Diagnostic): lon_name = next(alias for alias in ('lon', 'longitude') if alias in handler.variables.keys()) Utils.copy_variable(handler, handler_results, 'time', True, True) - Utils.copy_variable(handler, handler_results, 'i', True, True) - Utils.copy_variable(handler, handler_results, 'j', True, True) + Utils.copy_variable(handler, handler_results, 'i', False, True) + Utils.copy_variable(handler, handler_results, 'j', False, True) Utils.copy_variable(handler, handler_results, lat_name, True, True) Utils.copy_variable(handler, handler_results, lon_name, True, True) var = handler_results.createVariable('heatc', float, -- GitLab From a9ae76d794c4abffa9e15e096fc00eff6a44474d Mon Sep 17 00:00:00 2001 From: sloosvel Date: Wed, 8 Jan 2020 13:08:39 +0100 Subject: [PATCH 2/2] Check presence of files in diags and cmorfiles --- earthdiagnostics/datafile.py | 93 +++++++++++++++++--------------- earthdiagnostics/work_manager.py | 2 +- src/mixdiags | 1 + 3 files changed, 53 insertions(+), 43 deletions(-) create mode 160000 src/mixdiags diff --git a/earthdiagnostics/datafile.py b/earthdiagnostics/datafile.py index 95503447..4d02204c 100644 --- a/earthdiagnostics/datafile.py +++ b/earthdiagnostics/datafile.py @@ -214,8 +214,6 @@ class DataFile(Publisher): def to_storage(cls, remote_file, data_convention): """Create a new datafile object for a file that is going to be generated and stored""" new_object = cls() - if 'cmorfiles' in remote_file: - remote_file = remote_file.replace('cmorfiles', 'diags') new_object.remote_file = remote_file new_object.storage_status = StorageStatus.PENDING new_object.data_convention = data_convention @@ -266,10 +264,13 @@ class DataFile(Publisher): def upload(self): """Send a loal file to the storage""" self.storage_status = StorageStatus.UPLOADING + remote_file = self.remote_file try: - Utils.copy_file(self.local_file, self.remote_file, save_hash=True) + if '/cmorfiles/' in remote_file: + remote_file = remote_file.replace('/cmorfiles/', '/diags/') + Utils.copy_file(self.local_file, remote_file, save_hash=True) except (OSError, Exception) as ex: - Log.error('File {0} can not be uploaded: {1}', self.remote_file, ex) + Log.error('File {0} can not be uploaded: {1}', remote_file, ex) self.storage_status = StorageStatus.FAILED return @@ -645,46 +646,54 @@ class NetCDFFile(DataFile): def download(self): """Get data from remote storage to the local one""" - try: - self.local_status = LocalStatus.DOWNLOADING - Log.debug('Downloading file {0}...', self.remote_file) - if not self.local_file: - self.local_file = TempFile.get() - # Utils.get_file_hash(self.remote_file, use_stored=True, save=True) - try: - Utils.copy_file(self.remote_file, self.local_file, retrials=1) - except Utils.CopyException: - # Utils.get_file_hash(self.remote_file, use_stored=False, save=True) - Utils.copy_file(self.remote_file, self.local_file, retrials=2) - - if self.data_convention == 'meteofrance': - Log.debug('Converting variable names from meteofrance convention') - alt_coord_names = {'time_counter': 'time', 'time_counter_bounds': 'time_bnds', - 'tbnds': 'bnds', 'nav_lat': 'lat', 'nav_lon': 'lon', 'x': 'i', - 'y': 'j'} - Utils.rename_variables(self.local_file, alt_coord_names, must_exist=False) - Log.info('File {0} ready!', self.remote_file) - self.local_status = LocalStatus.READY - - except Exception as ex: - if os.path.isfile(self.local_file): - os.remove(self.local_file) - Log.error('File {0} not available: {1}', self.remote_file, ex) - self.local_status = LocalStatus.FAILED - - def check_is_in_storage(self): - if os.path.isfile(self.remote_file): - if self.region: + for path in (self.remote_file.replace('/cmorfiles/', '/diags/'), self.remote_file): + if os.path.isfile(path): try: - cubes = iris.load(self.remote_file) - self._check_regions(cubes) - except iris.exceptions.TranslationError as ex: - # If the check goes wrong, we must execute everything - os.remove(self.remote_file) + self.local_status = LocalStatus.DOWNLOADING + Log.debug('Downloading file {0}...', path) + if not self.local_file: + self.local_file = TempFile.get() + # Utils.get_file_hash(self.remote_file, use_stored=True, save=True) + try: + Utils.copy_file(path, self.local_file, retrials=1) + except Utils.CopyException: + # Utils.get_file_hash(self.remote_file, use_stored=False, save=True) + Utils.copy_file(path, self.local_file, retrials=2) + + if self.data_convention == 'meteofrance': + Log.debug('Converting variable names from meteofrance convention') + alt_coord_names = {'time_counter': 'time', 'time_counter_bounds': 'time_bnds', + 'tbnds': 'bnds', 'nav_lat': 'lat', 'nav_lon': 'lon', 'x': 'i', + 'y': 'j'} + Utils.rename_variables(self.local_file, alt_coord_names, must_exist=False) + Log.info('File {0} ready!', path) + self.local_status = LocalStatus.READY + return + except Exception as ex: - Log.debug('Exception when checking file {0}: {1}', self.remote_file, ex) - else: - self.storage_status = StorageStatus.READY + if os.path.isfile(self.local_file): + os.remove(self.local_file) + Log.error('File {0} not available: {1}', path, ex) + self.local_status = LocalStatus.FAILED + return + Log.error('File {0} not available: {1}', self.remote_file, 'FileNotFound') + self.local_status = LocalStatus.FAILED + + def check_is_in_storage(self): + for path in (self.remote_file, self.remote_file.replace('/cmorfiles/', '/diags/')): + if os.path.isfile(path): + if self.region: + try: + cubes = iris.load(path) + self._check_regions(cubes) + except iris.exceptions.TranslationError as ex: + # If the check goes wrong, we must execute everything + os.remove(path) + except Exception as ex: + Log.debug('Exception when checking file {0}: {1}', path, ex) + else: + self.storage_status = StorageStatus.READY + return def _check_regions(self, cubes): for cube in cubes: diff --git a/earthdiagnostics/work_manager.py b/earthdiagnostics/work_manager.py index 018dd601..c3d2ccd9 100644 --- a/earthdiagnostics/work_manager.py +++ b/earthdiagnostics/work_manager.py @@ -97,7 +97,7 @@ class WorkManager(object): self.lock = threading.Lock() self.lock.acquire() - for job in self.jobs[DiagnosticStatus.WAITING]: + for job in self.jobs[DiagnosticStatus.WAITING].copy(): job.request_data() job.declare_data_generated() job.subscribe(self, self._job_status_changed) diff --git a/src/mixdiags b/src/mixdiags new file mode 160000 index 00000000..19997970 --- /dev/null +++ b/src/mixdiags @@ -0,0 +1 @@ +Subproject commit 199979700e38d3918a82bd2052855d46375e48ab -- GitLab