Commit 690e315c authored by Javier Vegas-Regidor
Browse files

Optimized hash routines. Added option to pass max and min values to climatological percentiles

parent 3c6eb551
......@@ -143,7 +143,7 @@ class DataFile(Publisher):
def upload(self):
self.storage_status = StorageStatus.UPLOADING
try:
Utils.copy_file(self.local_file, self.remote_file)
Utils.copy_file(self.local_file, self.remote_file, save_hash=True)
except Exception as ex:
Log.error('File {0} can not be uploaded: {1}', self.remote_file, ex)
self.storage_status = StorageStatus.FAILED
......@@ -431,6 +431,7 @@ class NetCDFFile(DataFile):
self.local_status = LocalStatus.DOWNLOADING
if not self.local_file:
self.local_file = TempFile.get()
Utils.get_file_hash(self.remote_file, use_stored=True, save=True)
Utils.copy_file(self.remote_file, self.local_file)
Log.info('File {0} ready!', self.remote_file)
self.local_status = LocalStatus.READY
......
......@@ -4,7 +4,7 @@ from bscearth.utils.date import parse_date, add_months
from bscearth.utils.log import Log
from earthdiagnostics.diagnostic import Diagnostic, DiagnosticVariableOption, DiagnosticDomainOption, \
DiagnosticIntOption, DiagnosticListIntOption
DiagnosticIntOption, DiagnosticListIntOption, DiagnosticFloatOption
from earthdiagnostics.frequency import Frequencies
from earthdiagnostics.utils import Utils, TempFile
from earthdiagnostics.variable_type import VariableType
......@@ -14,6 +14,8 @@ import iris.coord_categorisation
from iris.time import PartialDateTime
import iris.exceptions
import iris.coords
import math
class ClimatologicalPercentile(Diagnostic):
......@@ -33,8 +35,8 @@ class ClimatologicalPercentile(Diagnostic):
Percentiles = np.array([0.1, 0.25, 0.33, 0.5, 0.66, 0.75, 0.9])
def __init__(self, data_manager, domain, variable, num_bins, start_year, end_year, forecast_month,
experiment_config):
def __init__(self, data_manager, domain, variable, num_bins, start_year, end_year, min_value, max_value,
forecast_month, experiment_config):
Diagnostic.__init__(self, data_manager)
self.variable = variable
self.domain = domain
......@@ -49,14 +51,26 @@ class ClimatologicalPercentile(Diagnostic):
self.end_year = end_year
self.forecast_month = forecast_month
self.cmor_var = data_manager.variable_list.get_variable(variable, silent=True)
if self.cmor_var and self.cmor_var.valid_max and self.cmor_var.valid_min:
self.max_value = float(self.cmor_var.valid_max)
if not math.isnan(min_value):
self.min_value = min_value
self.check_min_value = False
elif self.cmor_var and self.cmor_var.valid_min:
self.min_value = float(self.cmor_var.valid_min)
self.check_limit_values = False
self.check_min_value = False
else:
self.min_value = None
self.check_min_value = True
if not math.isnan(max_value):
self.max_value = max_value
self.check_max_value = False
elif self.cmor_var and self.cmor_var.valid_min:
self.max_value = float(self.cmor_var.valid_max)
self.check_max_value = False
else:
self.max_value = None
self.check_limit_values = True
self.check_max_value = True
def __eq__(self, other):
return self.domain == other.domain and self.variable == other.variable and self.num_bins == other.num_bins
......@@ -82,6 +96,8 @@ class ClimatologicalPercentile(Diagnostic):
DiagnosticIntOption('end_year'),
DiagnosticListIntOption('forecast_month'),
DiagnosticIntOption('bins', 2000),
DiagnosticFloatOption('min_value', float('nan')),
DiagnosticFloatOption('max_value', float('nan')),
)
options = cls.process_options(options, options_available)
......@@ -89,6 +105,7 @@ class ClimatologicalPercentile(Diagnostic):
for forecast_month in options['forecast_month']:
job_list.append(ClimatologicalPercentile(diags.data_manager, options['domain'], options['variable'],
options['bins'], options['start_year'], options['end_year'],
options['min_value'], options['max_value'],
forecast_month, diags.config.experiment))
return job_list
......@@ -122,7 +139,7 @@ class ClimatologicalPercentile(Diagnostic):
self.units = data_cube.units
self.lat_coord = data_cube.coord('latitude')
self.lon_coord = data_cube.coord('longitude')
Log.info('Range: [{0}, {1}]', self.min_value, self.max_value)
distribution = self._get_distribution()
percentile_values = self._calculate_percentiles(distribution)
self._save_results(percentile_values)
......@@ -229,16 +246,25 @@ class ClimatologicalPercentile(Diagnostic):
Log.warning('Different number of realizations in the data used by diagnostic {0}', self)
def _get_value_interval(self, data_cube):
    """
    Widens the value range with the extremes found in the given cube

    Only the bounds flagged as unknown are touched: ``min_value`` is updated when
    ``check_min_value`` is set and ``max_value`` when ``check_max_value`` is set.
    A bound whose flag is not set is left exactly as it is.

    :param data_cube: cube to scan; it is traversed one 'time' slice at a time
    """
    if not (self.check_min_value or self.check_max_value):
        # Both limits are already fixed: avoid reading the data at all
        return

    for time_slice in data_cube.slices_over('time'):
        data = time_slice.data
        if self.check_min_value:
            file_min = np.amin(data)
            # min_value starts as None when it has to be discovered from the data
            if self.min_value is None or file_min < self.min_value:
                self.min_value = file_min
        if self.check_max_value:
            file_max = np.amax(data)
            # max_value starts as None when it has to be discovered from the data
            if self.max_value is None or file_max > self.max_value:
                self.max_value = file_max
def _calculate_distribution(self, data_cube):
def calculate_histogram(time_series):
......
# coding=utf-8
import hashlib
import shutil
import subprocess
import tarfile
import datetime
import netCDF4
import numpy as np
......@@ -20,6 +20,7 @@ from nco import Nco
from earthdiagnostics.constants import Basins
from contextlib import contextmanager
import sys
import xxhash
@contextmanager
......@@ -219,7 +220,7 @@ class Utils(object):
handler.sync()
@staticmethod
def copy_file(source, destiny):
def copy_file(source, destiny, save_hash=False):
"""
Copies a file from source to destiny, creating dirs if necessary
......@@ -238,18 +239,22 @@ class Utils(object):
if not os.path.exists(dirname_path):
raise ex
hash_destiny = None
hash_original = Utils.get_file_hash(source)
Log.debug('Hashing original file... {0}', datetime.datetime.now())
hash_original = Utils.get_file_hash(source, use_stored=True)
retrials = 3
while hash_original != hash_destiny:
if retrials == 0:
raise Exception('Can not copy {0} to {1}'.format(source, destiny))
Log.debug('Copying... {0}', datetime.datetime.now())
shutil.copyfile(source, destiny)
hash_destiny = Utils.get_file_hash(destiny)
Log.debug('Hashing copy ... {0}', datetime.datetime.now())
hash_destiny = Utils.get_file_hash(destiny, save=save_hash)
retrials -= 1
Log.info('Finished {0}', datetime.datetime.now())
@staticmethod
def move_file(source, destiny):
def move_file(source, destiny, save_hash=False):
"""
Moves a file from source to destiny, creating dirs if necessary
......@@ -258,7 +263,7 @@ class Utils(object):
:param destiny: path to destiny
:type destiny: str
"""
Utils.copy_file(source, destiny)
Utils.copy_file(source, destiny, save_hash)
os.remove(source)
@staticmethod
......@@ -292,27 +297,46 @@ class Utils(object):
shutil.rmtree(source)
@staticmethod
def get_file_hash(filepath, use_stored=False, save=False):
    """
    Returns the xxHash hash for the given filepath

    :param filepath: path to the file to compute hash on
    :type filepath: str
    :param use_stored: if True and a hash file exists for filepath, return the value stored in it
        instead of hashing the file contents. The stored value is not re-validated against the file
    :type use_stored: bool
    :param save: if True, write the computed hash to the hash file for filepath
    :type save: bool
    :return: file's xxHash hash
    :rtype: str
    """
    if use_stored:
        hash_file = Utils._get_hash_filename(filepath)
        if os.path.isfile(hash_file):
            with open(hash_file, 'r') as stored:
                hash_value = stored.readline().strip()
            # An empty hash file carries no information: fall through and hash the file itself
            if hash_value:
                return hash_value

    # Read in 100 MiB chunks so the file is never loaded whole into memory
    blocksize = 104857600
    hasher = xxhash.xxh64()
    with open(filepath, 'rb') as afile:
        buf = afile.read(blocksize)
        while len(buf) > 0:
            hasher.update(buf)
            buf = afile.read(blocksize)
    hash_value = hasher.hexdigest()

    if save:
        with open(Utils._get_hash_filename(filepath), 'w') as hash_file:
            hash_file.write(hash_value)
    return hash_value
@staticmethod
def _get_hash_filename(filepath):
    """
    Returns the path of the file used to store the hash of the given file

    The hash file lives in the same folder as the original file and is named
    '.<filename>.xxhash64.hash'

    :param filepath: path to the file whose hash is stored
    :type filepath: str
    :return: path to the hash file
    :rtype: str
    """
    folder = os.path.dirname(filepath)
    filename = os.path.basename(filepath)
    return os.path.join(folder, '.{0}.xxhash64.hash'.format(filename))
@staticmethod
def execute_shell_command(command, log_level=Log.DEBUG):
"""
Executes a shell command
Executes a shell command
:param command: command to execute
Log.info('Detailed time for diagnostic class')
......@@ -729,3 +753,4 @@ class TempFile(object):
if os.path.exists(temp_file):
os.remove(temp_file)
TempFile.files = list()
......@@ -26,7 +26,7 @@ setup(
keywords=['climate', 'weather', 'diagnostic'],
setup_requires=['pyproj'],
install_requires=['numpy', 'netCDF4', 'bscearth.utils', 'cdo', 'nco>=0.0.3', 'iris>=1.12.0', 'coverage',
'pygrib', 'openpyxl', 'mock', 'futures', 'cf_units', 'cfunits'],
'pygrib', 'openpyxl', 'mock', 'futures', 'cf_units', 'cfunits', 'xxhash'],
packages=find_packages(),
include_package_data=True,
scripts=['bin/earthdiags']
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment