Newer
Older
Javier Vegas-Regidor
committed
import shutil
import threading
from datetime import datetime
import numpy as np
import os
from autosubmit.date.chunk_date_lib import parse_date, chunk_start_date, chunk_end_date, previous_day
from cfunits import Units
from earthdiagnostics.cmorizer import Cmorizer
from earthdiagnostics.variable import Variable
"""
Class to manage the data repositories
Javier Vegas-Regidor
committed
:param config:
:type config: Config
Javier Vegas-Regidor
committed
def __init__(self, config):
self.config = config
self.experiment = config.experiment
self._checked_vars = list()
Javier Vegas-Regidor
committed
Variable.load_variables()
Javier Vegas-Regidor
committed
self.lock = threading.Lock()
Javier Vegas-Regidor
committed
self.cmor_path = os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles')
def get_file(self, domain, var, startdate, member, chunk, grid=None, box=None, frequency=None):
"""
Copies a given file from the CMOR repository to the scratch folder and returns the path to the scratch's copy
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:return: path to the copy created on the scratch folder
:rtype: str
"""
Javier Vegas-Regidor
committed
filepath = self.get_file_path(startdate, member, domain, var, chunk, frequency, box, grid, None, None)
Javier Vegas-Regidor
committed
temp_path = TempFile.get()
shutil.copyfile(filepath, temp_path)
return temp_path
def send_file(self, filetosend, domain, var, startdate, member, chunk=None, grid=None, region=None, box=None,
Javier Vegas-Regidor
committed
rename_var=None, frequency=None, year=None, date_str=None, move_old=False):
"""
Copies a given file to the CMOR repository. It also automatically converts to netCDF 4 if needed and can merge
with already existing ones as needed
Javier Vegas-Regidor
committed
:param move_old: if true, moves files following older conventions that may be found on the links folder
:type move_old: bool
:param date_str: exact date_str to use in the cmorized file
:type: str
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int
:param rename_var: if exists, the given variable will be renamed to the one given by var
:type rename_var: str
:param filetosend: path to the file to send to the CMOR repository
:type filetosend: str
:param region: specifies the region represented by the file. If it is defined, the data will be appended to the
CMOR repository as a new region in the file or will overwrite if region was already present
:type region: str
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:return: path to the copy created on the scratch folder
:rtype: str
"""
Javier Vegas-Regidor
committed
cmor_var = Variable.get_variable(var)
Javier Vegas-Regidor
committed
if rename_var and rename_var != var:
Utils.rename_variable(filetosend, rename_var, var)
elif original_var != var:
Utils.rename_variable(filetosend, original_var, var)
if not frequency:
Javier Vegas-Regidor
committed
frequency = self.config.frequency
Javier Vegas-Regidor
committed
domain = DataManager.correct_domain(domain)
filepath = self.get_file_path(startdate, member, domain, cmor_var.short_name, chunk, frequency, box,
grid, year, date_str)
Javier Vegas-Regidor
committed
if region:
self._prepare_region(filepath, filetosend, region, var)
Javier Vegas-Regidor
committed
Javier Vegas-Regidor
committed
temp = TempFile.get()
Utils.execute_shell_command(["nccopy", "-4", "-d4", "-s", filetosend, temp])
shutil.move(temp, filetosend)
if cmor_var:
handler = Utils.openCdf(filetosend)
var_handler = handler.variables[var]
var_handler.standard_name = cmor_var.standard_name
var_handler.long_name = cmor_var.long_name
var_handler.short_name = cmor_var.short_name
Javier Vegas-Regidor
committed
var_type = var_handler.dtype
handler.modeling_realm = cmor_var.domain
handler.table_id = 'Table {0} (December 2013)'.format(self.get_domain_abbreviation(cmor_var.domain,
frequency))
Javier Vegas-Regidor
committed
if cmor_var.units:
Javier Vegas-Regidor
committed
handler.sync()
if 'lev' in handler.variables:
handler.variables['lev'].short_name = 'lev'
if domain == 'ocean':
handler.variables['lev'].standard_name = 'depth'
if 'lon' in handler.variables:
handler.variables['lon'].short_name = 'lon'
handler.variables['lon'].standard_name = 'longitude'
if 'lat' in handler.variables:
handler.variables['lat'].short_name = 'lat'
handler.variables['lat'].standard_name = 'latitude'
handler.close()
Javier Vegas-Regidor
committed
if cmor_var.valid_min != '':
valid_min = '-a valid_min, {0}, o, {1}, "{2}" '.format(var, var_type.char, cmor_var.valid_min)
else:
valid_min = ''
if cmor_var.valid_max != '':
valid_max = '-a valid_max, {0}, o, {1}, "{2}" '.format(var, var_type.char, cmor_var.valid_max)
else:
valid_max = ''
Utils.nco.ncatted(input=filetosend, output=filetosend,
options='-O -a _FillValue,{0},o,{1},"1.e20" '
Javier Vegas-Regidor
committed
'-a missingValue,{0},o,{1},"1.e20" {2}{3}'.format(var, var_type.char,
valid_min, valid_max))
variables = dict()
variables['x'] = 'i'
variables['y'] = 'j'
variables['nav_lat_grid_V'] = 'lat'
variables['nav_lon_grid_V'] = 'lon'
variables['nav_lat_grid_U'] = 'lat'
variables['nav_lon_grid_U'] = 'lon'
variables['nav_lat_grid_T'] = 'lat'
variables['nav_lon_grid_T'] = 'lon'
Utils.rename_variables(filetosend, variables, False, True)
Javier Vegas-Regidor
committed
Utils.move_file(filetosend, filepath)
Javier Vegas-Regidor
committed
self._create_link(domain, filepath, frequency, var, grid, move_old)
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
@staticmethod
def _fix_units(cmor_var, var_handler):
if 'units' in var_handler.ncattrs():
if var_handler.units == 'PSU':
var_handler.units = 'psu'
if var_handler.units == 'C' and cmor_var.units == 'K':
var_handler.units = 'deg_C'
if cmor_var.units != var_handler.units:
try:
new_unit = Units(cmor_var.units)
old_unit = Units(var_handler.units)
var_handler[:] = Units.conform(var_handler[:], old_unit, new_unit, inplace=True)
if 'valid_min' in var_handler.ncattrs():
var_handler.valid_min = Units.conform(float(var_handler.valid_min), old_unit, new_unit,
inplace=True)
if 'valid_max' in var_handler.ncattrs():
var_handler.valid_max = Units.conform(float(var_handler.valid_max), old_unit, new_unit,
inplace=True)
except ValueError:
factor, offset = UnitConversion.get_conversion_factor_offset(var_handler.units,
cmor_var.units)
var_handler[:] = var_handler[:] * factor + offset
if 'valid_min' in var_handler.ncattrs():
var_handler.valid_min = float(var_handler.valid_min) * factor + offset
if 'valid_max' in var_handler.ncattrs():
var_handler.valid_max = float(var_handler.valid_max) * factor + offset
var_handler.units = cmor_var.units
@staticmethod
def _prepare_region(filepath, filetosend, region, var):
Utils.convert2netcdf4(filetosend)
if not os.path.exists(filepath):
handler = Utils.openCdf(filetosend)
handler.createDimension('region')
var_region = handler.createVariable('region', str, 'region')
var_region[0] = region
original_var = handler.variables[var]
new_var = handler.createVariable('new_var', original_var.datatype,
original_var.dimensions + ('region',))
new_var.setncatts({k: original_var.getncattr(k) for k in original_var.ncattrs()})
value = original_var[:]
new_var[..., 0] = value
handler.close()
Utils.nco.ncks(input=filetosend, output=filetosend, options='-O -x -v {0}'.format(var))
Utils.rename_variable(filetosend, 'new_var', var)
else:
temp = TempFile.get()
shutil.copyfile(filepath, temp)
Utils.nco.ncks(input=temp, output=temp, options='-O --mk_rec_dmn region')
handler = Utils.openCdf(temp)
handler_send = Utils.openCdf(filetosend)
value = handler_send.variables[var][:]
var_region = handler.variables['region']
basin_index = np.where(var_region[:] == region)
if len(basin_index[0]) == 0:
var_region[var_region.shape[0]] = region
basin_index = var_region.shape[0] - 1
else:
basin_index = basin_index[0][0]
handler.variables[var][..., basin_index] = value
handler.close()
handler_send.close()
Utils.move_file(temp, filetosend)
Utils.nco.ncks(input=filetosend, output=filetosend, options='-O --fix_rec_dmn region')
@staticmethod
def _get_final_var_name(box, var):
if box:
var += box.get_lon_str() + box.get_lat_str() + box.get_depth_str()
return var
Javier Vegas-Regidor
committed
@staticmethod
def correct_domain(domain):
"""
Corrects domain capitalization
:param domain: domain name
:type domain: str
:return: domain with correct capitalization
:rtype: str
"""
Javier Vegas-Regidor
committed
domain = domain.lower()
if domain == 'seaice':
return 'seaIce'
elif domain == 'landice':
return 'landIce'
return domain
Javier Vegas-Regidor
committed
def _create_link(self, domain, filepath, frequency, var, grid, move_old):
if frequency in ('d', 'daily', 'day'):
freq_str = 'daily_mean'
elif frequency.endswith('hr'):
freq_str = frequency[:-2] + 'hourly'
if grid:
var = '{0}-{1}'.format(var, grid)
Javier Vegas-Regidor
committed
variable_folder = '{0}_f{1}h'.format(var, self.experiment.ocean_timestep)
variable_folder = '{0}_f{1}h'.format(var, self.experiment.atmos_timestep)
Javier Vegas-Regidor
committed
link_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str, variable_folder)
Javier Vegas-Regidor
committed
if not os.path.exists(link_path):
# This can be a race condition
# noinspection PyBroadException
try:
os.makedirs(link_path)
except Exception:
pass
Javier Vegas-Regidor
committed
elif move_old:
if self.lock.acquire(False):
if link_path not in self._checked_vars:
self._checked_vars.append(link_path)
Javier Vegas-Regidor
committed
self.lock.release()
old_path = os.path.join(self.config.data_dir, self.experiment.expid, freq_str,
'old_{0}_f{1}h'.format(var, self.experiment.atmos_timestep))
regex = re.compile(var + '_[0-9]{6,8}\.nc')
for filename in os.listdir(link_path):
if regex.match(filename):
if not os.path.exists(old_path):
# This can be a race condition
# noinspection PyBroadException
try:
os.makedirs(old_path)
except Exception:
pass
Utils.move_file(os.path.join(link_path, filename),
os.path.join(old_path, filename))
Javier Vegas-Regidor
committed
link_path = os.path.join(link_path, os.path.basename(filepath))
if os.path.lexists(link_path):
os.remove(link_path)
if not os.path.exists(filepath):
raise ValueError('Original file {0} does not exists')
Javier Vegas-Regidor
committed
os.symlink(filepath, link_path)
def get_domain_abbreviation(domain, frequency):
"""
Returns the table name for a domain-frequency pair
:param domain: variable's domain
:type domain: str
:param frequency: variable's frequency
:type frequency: str
:return: variable's table name
:rtype: str
"""
if frequency == 'mon':
if domain == 'seaIce':
domain_abreviattion = 'OImon'
elif domain == 'landIce':
domain_abreviattion = 'LImon'
else:
domain_abreviattion = domain[0].upper() + 'mon'
elif frequency == '6hr':
domain_abreviattion = '6hrPlev'
else:
domain_abreviattion = 'day'
return domain_abreviattion
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
# Abstract methods
def get_file_path(self, startdate, member, domain, var, chunk, frequency,
box=None, grid=None, year=None, date_str=None):
"""
:param date_str: exact date_str to use in the cmorized file
:type: str
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int | None
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:return: file absolute path
:rtype: str
"""
raise NotImplementedError()
def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
raise NotImplementedError()
# Overridable methods (not mandatory)
def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None,
frequency=None, year=None, date_str=None, move_old=False):
pass
class CMORManager(DataManager):
def get_file_path(self, startdate, member, domain, var, chunk, frequency,
box=None, grid=None, year=None, date_str=None):
if not frequency:
frequency = self.config.frequency
var = self._get_final_var_name(box, var)
domain_abreviattion = self.get_domain_abbreviation(domain, frequency)
start = parse_date(startdate)
member_plus = str(member + 1)
member_path = os.path.join(self._get_startdate_path(startdate), frequency, domain)
if chunk is not None:
chunk_start = chunk_start_date(start, chunk, self.experiment.chunk_size, 'month', 'standard')
chunk_end = chunk_end_date(chunk_start, self.experiment.chunk_size, 'month', 'standard')
chunk_end = previous_day(chunk_end, 'standard')
time_bound = "{0:04}{1:02}-{2:04}{3:02}".format(chunk_start.year, chunk_start.month, chunk_end.year,
chunk_end.month)
elif year:
if frequency is not 'yr':
raise ValueError('Year may be provided instead of chunk only if frequency is "yr"')
time_bound = str(year)
elif date_str:
time_bound = date_str
else:
raise ValueError('Chunk, year and date_str can not be None at the same time')
if grid:
var_path = os.path.join(member_path, var, grid, 'r{0}i1p1'.format(member_plus))
else:
var_path = os.path.join(member_path, var, 'r{0}i1p1'.format(member_plus))
filepath = os.path.join(var_path, '{0}_{1}_{2}_{3}_S{4}_r{5}i1p1_'
'{6}.nc'.format(var, domain_abreviattion, self.experiment.model,
self.experiment.experiment_name,
startdate, member_plus, time_bound))
return filepath
def link_file(self, domain, var, startdate, member, chunk=None, grid=None, box=None,
frequency=None, year=None, date_str=None, move_old=False):
"""
Creates the link of a given file from the CMOR repository.
:param move_old:
:param date_str:
:param year: if frequency is yearly, this parameter is used to give the corresponding year
:type year: int
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param chunk: file's chunk
:type chunk: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:param frequency: file's frequency (only needed if it is different from the default)
:type frequency: str
:return: path to the copy created on the scratch folder
:rtype: str
"""
var = self._get_final_var_name(box, var)
if not frequency:
frequency = self.config.frequency
domain = DataManager.correct_domain(domain)
filepath = self.get_file_path(startdate, member, domain, var, chunk, frequency, grid, year, date_str)
self._create_link(domain, filepath, frequency, var, grid, move_old)
def get_year(self, domain, var, startdate, member, year, grid=None, box=None):
Gets all the data corresponding to a given year from the CMOR repository to the scratch folder as one file and
returns the path to the scratch's copy.
:param year: year to retrieve
:type year: int
:param domain: CMOR domain
:type domain: str
:param var: variable name
:type var: str
:param startdate: file's startdate
:type startdate: str
:param member: file's member
:type member: int
:param grid: file's grid (only needed if it is not the original)
:type grid: str
:param box: file's box (only needed to retrieve sections or averages)
:type box: Box
:return: path to the copy created on the scratch folder
:rtype: str
"""
chunk_files = list()
Javier Vegas-Regidor
committed
for chunk in self.experiment.get_year_chunks(startdate, year):
chunk_files.append(self.get_file(domain, var, startdate, member, chunk, grid=grid, box=box))
if len(chunk_files) > 1:
temp = TempFile.get()
Utils.nco.ncrcat(input=' '.join(chunk_files), output=temp)
for chunk_file in chunk_files:
os.remove(chunk_file)
else:
temp = chunk_files[0]
temp2 = TempFile.get()
handler = Utils.openCdf(temp)
time = Utils.get_datetime_from_netcdf(handler)
handler.close()
start = None
end = None
for x in range(0, len(time)):
date = time[x]
if date.year == year:
if date.month == 1:
start = x
elif date.month == 12:
end = x
Utils.nco.ncks(input=temp, output=temp2, options='-O -d time,{0},{1}'.format(start, end))
os.remove(temp)
return temp2
def _is_cmorized(self, startdate, member):
startdate_path = self._get_startdate_path(startdate)
if not os.path.exists(startdate_path):
return False
for freq in os.listdir(startdate_path):
freq_path = os.path.join(startdate_path, freq)
for domain in os.listdir(freq_path):
domain_path = os.path.join(freq_path, domain)
for var in os.listdir(domain_path):
Javier Vegas-Regidor
committed
member_path = os.path.join(domain_path, var, 'r{0}i1p1'.format(member + 1))
if os.path.exists(member_path):
return True
return False
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
# noinspection PyPep8Naming
def prepare_CMOR_files(self):
"""
Prepares the data to be used by the diagnostic.
If CMOR data is not created, it show a warning and closes. In the future, an automatic cmorization procedure
will be launched
If CMOR data is available but packed, the procedure will unpack it.
:return:
"""
# Check if cmorized and convert if not
for startdate, member in self.experiment.get_member_list():
if not self.config.cmor.force and self._is_cmorized(startdate, member):
continue
member_str = self.experiment.get_member_str(member)
if not self.config.cmor.force:
tar_path = os.path.join(self.config.data_dir, self.experiment.expid, 'original_files', 'cmorfiles')
tar_original_files = os.path.join(self.config.data_dir, 'original_files', self.experiment.expid,
'cmorfiles')
file_name = 'CMOR?_{0}_{1}_*.tar.gz'.format(self.experiment.expid, startdate, member_str)
filepaths = glob.glob(os.path.join(tar_path, file_name))
filepaths += glob.glob(os.path.join(tar_path, 'outputs', file_name))
filepaths += glob.glob(os.path.join(tar_original_files, file_name))
filepaths += glob.glob(os.path.join(tar_original_files, 'outputs', file_name))
if len(filepaths) > 0:
Log.info('Unzipping cmorized data...')
Utils.unzip(filepaths, True)
if not os.path.exists(self.cmor_path):
os.mkdir(self.cmor_path)
file_name = 'CMOR?_{0}_{1}_*.tar'.format(self.experiment.expid, startdate, member_str)
filepaths = glob.glob(os.path.join(tar_path, file_name))
filepaths += glob.glob(os.path.join(tar_path, 'outputs', file_name))
filepaths += glob.glob(os.path.join(tar_original_files, file_name))
filepaths += glob.glob(os.path.join(tar_original_files, 'outputs', file_name))
if len(filepaths) > 0:
Log.info('Unpacking cmorized data...')
Utils.untar(filepaths, self.cmor_path)
self._correct_paths(startdate)
self._create_links(startdate)
continue
else:
start_time = datetime.now()
Log.info('CMORizing startdate {0} member {1}. Starting at {0}', startdate, member_str, start_time)
cmorizer = Cmorizer(self, startdate, member)
cmorizer.cmorize_ocean()
cmorizer.cmorize_atmos()
Log.result('CMORized startdate {0} member {1}!\n\n', startdate, member_str,
datetime.now() - start_time)
def _correct_paths(self, startdate):
bad_path = os.path.join(self.cmor_path, 'output', self.experiment.institute)
if os.path.exists(bad_path):
Log.debug('Moving CMOR files out of the output folder')
Utils.execute_shell_command(['mv', bad_path, os.path.join(bad_path, '..', '..')])
os.rmdir(os.path.join(self.cmor_path, 'output'))
if self.experiment.experiment_name != self.experiment.model:
bad_path = os.path.join(self.cmor_path, self.experiment.institute, self.experiment.model,
self.experiment.model)
Log.debug('Correcting double model appearance')
for (dirpath, dirnames, filenames) in os.walk(bad_path, False):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
good = filepath.replace('_{0}_output_'.format(self.experiment.model),
'_{0}_{1}_S{2}_'.format(self.experiment.model,
self.experiment.experiment_name,
startdate))
good = good.replace('/{0}/{0}'.format(self.experiment.model),
'/{0}/{1}'.format(self.experiment.model,
self.experiment.experiment_name))
Utils.move_file(filepath, good)
os.rmdir(dirpath)
Log.debug('Done')
def _create_links(self, startdate):
Log.info('Creating links for CMOR files ()')
path = self._get_startdate_path(startdate)
for freq in os.listdir(path):
for domain in os.listdir(os.path.join(path, freq)):
for var in os.listdir(os.path.join(path, freq, domain)):
for member in os.listdir(os.path.join(path, freq, domain, var)):
for name in os.listdir(os.path.join(path, freq, domain, var, member)):
filepath = os.path.join(path, freq, domain, var, member, name)
if os.path.isfile(filepath):
self._create_link(domain, filepath, freq, var, "", False)
else:
for filename in os.listdir(filepath):
self._create_link(domain, os.path.join(filepath, filename), freq, var, "", False)
Log.info('Creating lings for CMOR files')
def _get_startdate_path(self, startdate):
"""
Returns the path to the startdate's CMOR folder
:param startdate: target startdate
:type startdate: str
:return: path to the startdate's CMOR folder
:rtype: str
"""
return os.path.join(self.config.data_dir, self.experiment.expid, 'cmorfiles', self.experiment.institute,
self.experiment.model, self.experiment.experiment_name, 'S' + startdate)
"""
Class to manage unit conversions
"""
_dict_conversions = None
@classmethod
def load_conversions(cls):
"""
Load conversions from the configuration file
"""
with open(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'conversions.csv'), 'rb') as csvfile:
reader = csv.reader(csvfile, dialect='excel')
for line in reader:
if line[0] == 'original':
continue
cls.add_conversion(UnitConversion(line[0], line[1], line[2], line[3]))
@classmethod
def add_conversion(cls, conversion):
"""
Adds a conversion to the dictionary
:param conversion: conversion to add
:type conversion: UnitConversion
"""
cls._dict_conversions[(conversion.source, conversion.destiny)] = conversion
def __init__(self, source, destiny, factor, offset):
self.source = source
self.destiny = destiny
self.factor = float(factor)
self.offset = float(offset)
@classmethod
def get_conversion_factor_offset(cls, input_units, output_units):
"""
Gets the conversion factor and offset for two units . The conversion has to be done in the following way:
converted = original * factor + offset
:param input_units: original units
:type input_units: str
:param output_units: destiny units
:type output_units: str
:return: factor and offset
:rtype: [float, float]
"""
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
units = input_units.split()
if len(units) == 1:
scale_unit = 1
unit = units[0]
else:
if '^' in units[0]:
values = units[0].split('^')
scale_unit = pow(int(values[0]), int(values[1]))
else:
scale_unit = float(units[0])
unit = units[1]
units = output_units.split()
if len(units) == 1:
scale_new_unit = 1
new_unit = units[0]
else:
if '^' in units[0]:
values = units[0].split('^')
scale_new_unit = pow(int(values[0]), int(values[1]))
else:
scale_new_unit = float(units[0])
new_unit = units[1]
factor, offset = UnitConversion._get_factor(new_unit, unit)
if factor is None:
factor = factor * scale_unit / float(scale_new_unit)
offset /= float(scale_new_unit)
return factor, offset
@classmethod
def _get_factor(cls, new_unit, unit):
# Add only the conversions with a factor greater than 1
if unit == new_unit:
return 1, 0
elif (unit, new_unit) in cls._dict_conversions:
conversion = cls._dict_conversions[(unit, new_unit)]
return conversion.factor, conversion.offset
elif (new_unit, unit) in cls._dict_conversions:
conversion = cls._dict_conversions[(new_unit, unit)]
Javier Vegas-Regidor
committed
return 1 / conversion.factor, -conversion.offset